Thrill  0.1
StreamData Class Reference (abstract)

Detailed Description

Base class for common structures for ConcatStream and MixedStream.

This is also a virtual base class used by Multiplexer to pass blocks to streams. It also contains common items such as statistics.

Definition at line 46 of file stream_data.hpp.


#include <stream_data.hpp>

Classes

class  Writers
 An extra class derived from std::vector<> for delivery of the BlockWriters of a Stream.
 

Public Types

using Writer = BlockWriter< StreamSink >
 

Public Member Functions

 StreamData (StreamSetBase *stream_set_base, Multiplexer &multiplexer, size_t send_size_limit, const StreamId &id, size_t local_worker_id, size_t dia_id)
 
virtual ~StreamData ()
 
virtual void Close ()=0
 shuts the stream down.
 
virtual bool closed () const =0
 
virtual Writers GetWriters ()=0
 
const StreamId & id () const
 Return stream id.
 
size_t my_host_rank () const
 Returns my_host_rank.
 
size_t my_worker_rank () const
 Returns my_worker_rank_.
 
size_t num_hosts () const
 Number of hosts in system.
 
size_t num_workers () const
 Number of workers in system.
 
void OnAllWritersClosed ()
 method called when all StreamSink writers have finished
 
void OnWriterClosed (size_t peer_worker_rank, bool sent)
 
virtual const char * stream_type ()=0
 return stream type string
 
size_t workers_per_host () const
 Returns workers_per_host.
 
Public Member Functions inherited from ReferenceCounter
 ReferenceCounter () noexcept
 new objects have zero reference count
 
 ReferenceCounter (const ReferenceCounter &) noexcept
 copying still creates a new object with zero reference count
 
 ~ReferenceCounter ()
 
bool dec_reference () const noexcept
 Call whenever resetting (i.e. ...
 
void inc_reference () const noexcept
 Call whenever setting a pointer to the object.
 
ReferenceCounter & operator= (const ReferenceCounter &) noexcept
 assignment operator, leaves pointers unchanged
 
size_t reference_count () const noexcept
 Return the number of references to this object (for debugging)
 
bool unique () const noexcept
 Test if the ReferenceCounter is referenced by only one CountingPtr.
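
The inherited members describe an intrusive reference count. The following is a minimal sketch of that idea, not the library's implementation: `SketchRefCounter` is a hypothetical name, and the return value of `dec_reference()` (true once the count reaches zero) is an assumption, since the brief above is truncated.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>

// Hypothetical stand-in for an intrusive reference counter; not the real class.
class SketchRefCounter {
public:
    // New objects start with a reference count of zero.
    SketchRefCounter() noexcept : count_(0) { }
    // Copying creates a new object, again with a count of zero.
    SketchRefCounter(const SketchRefCounter&) noexcept : count_(0) { }
    // Assignment leaves the count (and thus existing pointers) unchanged.
    SketchRefCounter& operator = (const SketchRefCounter&) noexcept {
        return *this;
    }

    // Call whenever a pointer is set to this object.
    void inc_reference() const noexcept { ++count_; }

    // Call whenever a pointer to this object is reset.
    // Assumption: returns true when the count has dropped to zero.
    bool dec_reference() const noexcept {
        assert(count_ > 0);
        return --count_ == 0;
    }

    // True if exactly one pointer references this object.
    bool unique() const noexcept { return count_ == 1; }

    // Current number of references (for debugging).
    std::size_t reference_count() const noexcept { return count_; }

private:
    // mutable so that const objects can still be counted
    mutable std::atomic<std::size_t> count_;
};
```

A smart pointer built on such a counter would call `inc_reference()` when it starts pointing at an object and delete the object when `dec_reference()` reports that no references remain.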
 

Public Attributes

std::atomic< size_t > rx_int_blocks_ { 0 }
 
std::atomic< size_t > rx_int_bytes_ { 0 }
 
std::atomic< size_t > rx_int_items_ { 0 }
 
common::StatsTimerStart rx_lifetime_
 
std::atomic< size_t > rx_net_blocks_ { 0 }
 
std::atomic< size_t > rx_net_bytes_ { 0 }
 
std::atomic< size_t > rx_net_items_ { 0 }
 
common::StatsTimerStopped rx_timespan_
 
tlx::Semaphore sem_queue_
 
std::atomic< size_t > tx_int_blocks_ { 0 }
 
std::atomic< size_t > tx_int_bytes_ { 0 }
 
std::atomic< size_t > tx_int_items_ { 0 }
 
common::StatsTimerStart tx_lifetime_
 Timers from creation of stream until rx / tx direction is closed.
 
std::atomic< size_t > tx_net_blocks_ { 0 }
 
std::atomic< size_t > tx_net_bytes_ { 0 }
 
std::atomic< size_t > tx_net_items_ { 0 }
 
common::StatsTimerStopped tx_timespan_
 Timers from first rx / tx package until rx / tx direction is closed.
 

Static Public Attributes

static constexpr bool debug = false
 

Protected Attributes

bool all_writers_closed_ = false
 true if all writers were closed
 
size_t dia_id_
 associated DIANode id.
 
StreamId id_
 our own stream id.
 
size_t local_worker_id_
 local worker id
 
Multiplexer & multiplexer_
 reference to multiplexer
 
std::atomic< size_t > remaining_closing_blocks_
 
tlx::Semaphore sem_closing_blocks_
 number of received stream closing Blocks.
 
StreamSetBase * stream_set_base_
 pointer to StreamSetBase containing this StreamData
 
size_t writers_closed_ = 0
 number of writers closed via StreamSink.
 

Member Typedef Documentation

◆ Writer

Definition at line 51 of file stream_data.hpp.

Constructor & Destructor Documentation

◆ StreamData()

StreamData ( StreamSetBase * stream_set_base,
Multiplexer & multiplexer,
size_t  send_size_limit,
const StreamId & id,
size_t  local_worker_id,
size_t  dia_id 
)

Definition at line 24 of file stream_data.cpp.

References StreamData::~StreamData().

◆ ~StreamData()

~StreamData ( )
virtual default

Referenced by StreamData::StreamData().

Member Function Documentation

◆ Close()

virtual void Close ( )
pure virtual

shuts the stream down.

Implemented in CatStreamData, and MixStreamData.

Referenced by Stream::Close().

◆ closed()

virtual bool closed ( ) const
pure virtual

Implemented in CatStreamData, and MixStreamData.

◆ GetWriters()

virtual Writers GetWriters ( )
pure virtual

Creates BlockWriters for each worker. BlockWriter can only be opened once, otherwise the block sequence is incorrectly interleaved!

Implemented in CatStreamData, and MixStreamData.

◆ id()

const StreamId& id ( ) const
inline

Return stream id.

Definition at line 91 of file stream_data.hpp.

Referenced by CatStreamData::Close().

◆ my_host_rank()

◆ my_worker_rank()

◆ num_hosts()

size_t num_hosts ( ) const
inline

◆ num_workers()

◆ OnAllWritersClosed()

◆ OnWriterClosed()

void OnWriterClosed ( size_t  peer_worker_rank,
bool  sent 
)

◆ stream_type()

virtual const char* stream_type ( )
pure virtual

return stream type string

Implemented in CatStreamData, and MixStreamData.

Referenced by StreamData::OnAllWritersClosed().
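
The pure virtual members documented above are implemented by CatStreamData and MixStreamData. The sketch below shows the general pattern with a stripped-down interface. All class names and the returned strings are hypothetical; the actual return values of `stream_type()` are not given on this page.

```cpp
#include <cassert>
#include <string>

// Hypothetical, reduced interface mirroring three of the pure virtual members.
class SketchStreamData {
public:
    virtual ~SketchStreamData() = default;
    virtual void Close() = 0;
    virtual bool closed() const = 0;
    virtual const char* stream_type() = 0;
};

// Stand-in for a concatenating stream implementation.
class SketchCatStream final : public SketchStreamData {
public:
    void Close() override { closed_ = true; }
    bool closed() const override { return closed_; }
    // Illustrative string only.
    const char* stream_type() override { return "sketch-cat"; }

private:
    bool closed_ = false;
};

// Stand-in for a mixing stream implementation.
class SketchMixStream final : public SketchStreamData {
public:
    void Close() override { closed_ = true; }
    bool closed() const override { return closed_; }
    // Illustrative string only.
    const char* stream_type() override { return "sketch-mix"; }

private:
    bool closed_ = false;
};

// Code that only knows the base class can shut a stream down and label it.
std::string CloseAndDescribe(SketchStreamData& stream) {
    stream.Close();
    assert(stream.closed());
    return std::string(stream.stream_type()) + ":closed";
}
```

Because callers work through the base class, shared code such as a statistics report can include the stream type without knowing which implementation it is dealing with.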

◆ workers_per_host()
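
This page does not document how the rank accessors relate to each other. One plausible relationship is sketched below under two loudly stated assumptions: every host runs the same number of workers, and global worker ranks are assigned host by host. `SketchLayout` is a hypothetical type; only its field and method names are borrowed from the accessors listed above.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical description of a cluster layout.
struct SketchLayout {
    std::size_t num_hosts;
    std::size_t workers_per_host;
    std::size_t my_host_rank;
    std::size_t local_worker_id;

    // Assumption: every host runs the same number of workers.
    std::size_t num_workers() const {
        return num_hosts * workers_per_host;
    }

    // Assumption: global worker ranks are numbered host by host.
    std::size_t my_worker_rank() const {
        assert(my_host_rank < num_hosts);
        assert(local_worker_id < workers_per_host);
        return my_host_rank * workers_per_host + local_worker_id;
    }
};
```

Under these assumptions, local worker 1 on host 2 of a layout with 3 workers per host would have global rank 2 * 3 + 1.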

Member Data Documentation

◆ all_writers_closed_

bool all_writers_closed_ = false
protected

true if all writers were closed

Definition at line 190 of file stream_data.hpp.

Referenced by MixStreamData::Close(), CatStreamData::Close(), and StreamData::OnWriterClosed().

◆ debug

constexpr bool debug = false
static

Definition at line 49 of file stream_data.hpp.

◆ dia_id_

size_t dia_id_
protected

associated DIANode id.

Definition at line 174 of file stream_data.hpp.

Referenced by StreamData::OnAllWritersClosed(), MixStreamData::set_dia_id(), and CatStreamData::set_dia_id().

◆ id_

◆ local_worker_id_

◆ multiplexer_

◆ remaining_closing_blocks_

std::atomic<size_t> remaining_closing_blocks_
protected

Number of stream closing operations that are still expected. Required to know when to stop rx_lifetime_.

Definition at line 181 of file stream_data.hpp.

Referenced by CatStreamData::CatStreamData(), MixStreamData::MixStreamData(), MixStreamData::OnStreamBlockOrdered(), and CatStreamData::OnStreamBlockOrdered().

◆ rx_int_blocks_

std::atomic<size_t> rx_int_blocks_ { 0 }

Definition at line 144 of file stream_data.hpp.

Referenced by StreamData::OnAllWritersClosed(), and Stream::rx_int_blocks().

◆ rx_int_bytes_

std::atomic<size_t> rx_int_bytes_ { 0 }

Definition at line 144 of file stream_data.hpp.

Referenced by StreamData::OnAllWritersClosed(), and Stream::rx_int_bytes().

◆ rx_int_items_

std::atomic<size_t> rx_int_items_ { 0 }

StatsCounter for incoming data transfer. Contains only loopback (internal) data transfer.

Definition at line 144 of file stream_data.hpp.

Referenced by StreamData::OnAllWritersClosed(), and Stream::rx_int_items().

◆ rx_lifetime_

◆ rx_net_blocks_

std::atomic<size_t> rx_net_blocks_ { 0 }

◆ rx_net_bytes_

◆ rx_net_items_

std::atomic<size_t> rx_net_items_ { 0 }

StatsCounter for incoming data transfer. Does not include loopback data transfer

Definition at line 134 of file stream_data.hpp.

Referenced by StreamData::OnAllWritersClosed(), MixStreamData::OnStreamBlockOrdered(), CatStreamData::OnStreamBlockOrdered(), and Stream::rx_net_items().
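
The rx counters keep loopback (internal) and network traffic apart. A minimal sketch of that bookkeeping follows. `SketchRxStats` and `AddBlock` are hypothetical names; where and how the real counters are updated is not shown on this page.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>

// Hypothetical counter bundle; member names mirror the rx_* attributes.
struct SketchRxStats {
    // Network traffic only, no loopback.
    std::atomic<std::size_t> rx_net_items{ 0 };
    std::atomic<std::size_t> rx_net_bytes{ 0 };
    std::atomic<std::size_t> rx_net_blocks{ 0 };
    // Loopback (internal) traffic only.
    std::atomic<std::size_t> rx_int_items{ 0 };
    std::atomic<std::size_t> rx_int_bytes{ 0 };
    std::atomic<std::size_t> rx_int_blocks{ 0 };

    // Account one received block as either loopback or network traffic.
    void AddBlock(bool loopback, std::size_t items, std::size_t bytes) {
        assert(items == 0 || bytes > 0);
        if (loopback) {
            rx_int_items += items;
            rx_int_bytes += bytes;
            ++rx_int_blocks;
        }
        else {
            rx_net_items += items;
            rx_net_bytes += bytes;
            ++rx_net_blocks;
        }
    }
};
```

Using `std::atomic` members means several threads could call `AddBlock` at the same time without losing increments, although the six counters are not updated as one atomic unit.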

◆ rx_timespan_

◆ sem_closing_blocks_

tlx::Semaphore sem_closing_blocks_
protected

number of received stream closing Blocks.

Definition at line 184 of file stream_data.hpp.

Referenced by MixStreamData::Close(), CatStreamData::Close(), MixStreamData::OnStreamBlockOrdered(), and CatStreamData::OnStreamBlockOrdered().

◆ sem_queue_

tlx::Semaphore sem_queue_

Semaphore that limits the amount of PinnedBlocks (measured in bytes) passed to the network layer for transmission.

Definition at line 159 of file stream_data.hpp.
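
A byte-based limit of this kind can be sketched with a mutex and a condition variable. This is a minimal illustration, not tlx::Semaphore itself: `SketchByteBudget` and its methods are hypothetical names, and the way the real stream sizes its limit is not stated on this page.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>

// Hypothetical byte budget acting like a counting semaphore.
class SketchByteBudget {
public:
    explicit SketchByteBudget(std::size_t limit)
        : limit_(limit), available_(limit) { }

    // Block until `bytes` fit into the budget, then reserve them.
    void Acquire(std::size_t bytes) {
        // A request larger than the whole budget could never succeed.
        assert(bytes <= limit_);
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [&] { return available_ >= bytes; });
        available_ -= bytes;
    }

    // Non-blocking variant: reserve only if the budget allows it right now.
    bool TryAcquire(std::size_t bytes) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (available_ < bytes) return false;
        available_ -= bytes;
        return true;
    }

    // Return bytes to the budget, e.g. after a block has been transmitted.
    void Release(std::size_t bytes) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            available_ += bytes;
        }
        cv_.notify_all();
    }

    // Bytes that can currently be reserved without blocking.
    std::size_t available() {
        std::lock_guard<std::mutex> lock(mutex_);
        return available_;
    }

private:
    const std::size_t limit_;
    std::size_t available_;
    std::mutex mutex_;
    std::condition_variable cv_;
};
```

A sender would call `Acquire(block_size)` before handing a block to the network layer and `Release(block_size)` once it has been sent, so that a fast producer stalls instead of queueing unbounded data.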

◆ stream_set_base_

StreamSetBase* stream_set_base_
protected

pointer to StreamSetBase containing this StreamData

Definition at line 168 of file stream_data.hpp.

Referenced by StreamData::OnWriterClosed().

◆ tx_int_blocks_

std::atomic<size_t> tx_int_blocks_ { 0 }

Definition at line 149 of file stream_data.hpp.

Referenced by StreamData::OnAllWritersClosed(), and Stream::tx_int_blocks().

◆ tx_int_bytes_

std::atomic<size_t> tx_int_bytes_ { 0 }

Definition at line 149 of file stream_data.hpp.

Referenced by StreamData::OnAllWritersClosed(), and Stream::tx_int_bytes().

◆ tx_int_items_

std::atomic<size_t> tx_int_items_ { 0 }

StatsCounters for outgoing data transfer - shared by all sinks. Contains only loopback (internal) data transfer.

Definition at line 149 of file stream_data.hpp.

Referenced by StreamData::OnAllWritersClosed(), and Stream::tx_int_items().

◆ tx_lifetime_

Timers from creation of stream until rx / tx direction is closed.

Definition at line 152 of file stream_data.hpp.

Referenced by StreamData::OnWriterClosed().

◆ tx_net_blocks_

std::atomic<size_t> tx_net_blocks_ { 0 }

Definition at line 139 of file stream_data.hpp.

Referenced by StreamData::OnAllWritersClosed(), and Stream::tx_net_blocks().

◆ tx_net_bytes_

std::atomic<size_t> tx_net_bytes_ { 0 }

Definition at line 139 of file stream_data.hpp.

Referenced by StreamData::OnAllWritersClosed(), and Stream::tx_net_bytes().

◆ tx_net_items_

std::atomic<size_t> tx_net_items_ { 0 }

StatsCounters for outgoing data transfer - shared by all sinks. Does not include loopback data transfer

Definition at line 139 of file stream_data.hpp.

Referenced by StreamData::OnAllWritersClosed(), and Stream::tx_net_items().

◆ tx_timespan_

Timers from first rx / tx package until rx / tx direction is closed.

Definition at line 155 of file stream_data.hpp.

Referenced by MixStreamData::GetWriters(), CatStreamData::GetWriters(), and StreamData::OnWriterClosed().

◆ writers_closed_

size_t writers_closed_ = 0
protected

number of writers closed via StreamSink.

Definition at line 187 of file stream_data.hpp.

Referenced by StreamData::OnWriterClosed().
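
The interplay of writers_closed_, all_writers_closed_, OnWriterClosed() and OnAllWritersClosed() can be pictured as a simple countdown. The sketch below is an assumption about the general pattern, not the library's code: `SketchWriterTracker` is hypothetical, the number of expected writers is passed in explicitly, and the final hook only counts how often it ran.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical bookkeeping for closed writers.
class SketchWriterTracker {
public:
    explicit SketchWriterTracker(std::size_t num_writers)
        : num_writers_(num_writers) { }

    // Called once per writer when that writer has finished.
    void OnWriterClosed() {
        assert(writers_closed_ < num_writers_);
        ++writers_closed_;
        if (writers_closed_ == num_writers_) {
            all_writers_closed_ = true;
            OnAllWritersClosed();
        }
    }

    bool all_writers_closed() const { return all_writers_closed_; }
    std::size_t summaries_emitted() const { return summaries_emitted_; }

private:
    // Stand-in for the final hook, e.g. a place to report statistics.
    void OnAllWritersClosed() { ++summaries_emitted_; }

    std::size_t num_writers_;
    std::size_t writers_closed_ = 0;
    bool all_writers_closed_ = false;
    std::size_t summaries_emitted_ = 0;
};
```

This sketch is not thread-safe. If writers can close from different threads, the counter update and the comparison would need to happen under a lock or through an atomic operation.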


The documentation for this class was generated from the following files: