|
Thrill
0.1
|
Base class for common structures for ConcatStream and MixedStream.
This is also a virtual base class use by Multiplexer to pass blocks to streams! Instead, it contains common items like stats.
Definition at line 46 of file stream_data.hpp.
Inheritance diagram for StreamData:
Collaboration diagram for StreamData:#include <stream_data.hpp>
Classes | |
| class | Writers |
| An extra class derived from std::vector<> for delivery of the BlockWriters of a Stream. More... | |
Public Types | |
| using | Writer = BlockWriter< StreamSink > |
Public Member Functions | |
| StreamData (StreamSetBase *stream_set_base, Multiplexer &multiplexer, size_t send_size_limit, const StreamId &id, size_t local_worker_id, size_t dia_id) | |
| virtual | ~StreamData () |
| virtual void | Close ()=0 |
| shuts the stream down. More... | |
| virtual bool | closed () const =0 |
| virtual Writers | GetWriters ()=0 |
| const StreamId & | id () const |
| Return stream id. More... | |
| size_t | my_host_rank () const |
| Returns my_host_rank. More... | |
| size_t | my_worker_rank () const |
| Returns my_worker_rank_. More... | |
| size_t | num_hosts () const |
| Number of hosts in system. More... | |
| size_t | num_workers () const |
| Number of workers in system. More... | |
| void | OnAllWritersClosed () |
| method called when all StreamSink writers have finished More... | |
| void | OnWriterClosed (size_t peer_worker_rank, bool sent) |
| virtual const char * | stream_type ()=0 |
| return stream type string More... | |
| size_t | workers_per_host () const |
| Returns workers_per_host. More... | |
Public Member Functions inherited from ReferenceCounter | |
| ReferenceCounter () noexcept | |
| new objects have zero reference count More... | |
| ReferenceCounter (const ReferenceCounter &) noexcept | |
| coping still creates a new object with zero reference count More... | |
| ~ReferenceCounter () | |
| bool | dec_reference () const noexcept |
| Call whenever resetting (i.e. More... | |
| void | inc_reference () const noexcept |
| Call whenever setting a pointer to the object. More... | |
| ReferenceCounter & | operator= (const ReferenceCounter &) noexcept |
| assignment operator, leaves pointers unchanged More... | |
| size_t | reference_count () const noexcept |
| Return the number of references to this object (for debugging) More... | |
| bool | unique () const noexcept |
| Test if the ReferenceCounter is referenced by only one CountingPtr. More... | |
Public Attributes | |
| std::atomic< size_t > | rx_int_blocks_ { 0 } |
| std::atomic< size_t > | rx_int_bytes_ { 0 } |
| std::atomic< size_t > | rx_int_items_ { 0 } |
| common::StatsTimerStart | rx_lifetime_ |
| std::atomic< size_t > | rx_net_blocks_ { 0 } |
| std::atomic< size_t > | rx_net_bytes_ { 0 } |
| std::atomic< size_t > | rx_net_items_ { 0 } |
| common::StatsTimerStopped | rx_timespan_ |
| tlx::Semaphore | sem_queue_ |
| std::atomic< size_t > | tx_int_blocks_ { 0 } |
| std::atomic< size_t > | tx_int_bytes_ { 0 } |
| std::atomic< size_t > | tx_int_items_ { 0 } |
| common::StatsTimerStart | tx_lifetime_ |
| Timers from creation of stream until rx / tx direction is closed. More... | |
| std::atomic< size_t > | tx_net_blocks_ { 0 } |
| std::atomic< size_t > | tx_net_bytes_ { 0 } |
| std::atomic< size_t > | tx_net_items_ { 0 } |
| common::StatsTimerStopped | tx_timespan_ |
| Timers from first rx / tx package until rx / tx direction is closed. More... | |
Static Public Attributes | |
| static constexpr bool | debug = false |
Protected Attributes | |
| bool | all_writers_closed_ = false |
| bool if all writers were closed More... | |
| size_t | dia_id_ |
| associated DIANode id. More... | |
| StreamId | id_ |
| our own stream id. More... | |
| size_t | local_worker_id_ |
| local worker id More... | |
| Multiplexer & | multiplexer_ |
| reference to multiplexer More... | |
| std::atomic< size_t > | remaining_closing_blocks_ |
| tlx::Semaphore | sem_closing_blocks_ |
| number of received stream closing Blocks. More... | |
| StreamSetBase * | stream_set_base_ |
| pointer to StreamSetBase containing this StreamData More... | |
| size_t | writers_closed_ = 0 |
| number of writers closed via StreamSink. More... | |
| using Writer = BlockWriter<StreamSink> |
Definition at line 51 of file stream_data.hpp.
| StreamData | ( | StreamSetBase * | stream_set_base, |
| Multiplexer & | multiplexer, | ||
| size_t | send_size_limit, | ||
| const StreamId & | id, | ||
| size_t | local_worker_id, | ||
| size_t | dia_id | ||
| ) |
Definition at line 24 of file stream_data.cpp.
References StreamData::~StreamData().
|
virtualdefault |
Referenced by StreamData::StreamData().
|
pure virtual |
shuts the stream down.
Implemented in CatStreamData, and MixStreamData.
Referenced by Stream::Close().
|
pure virtual |
Implemented in CatStreamData, and MixStreamData.
|
pure virtual |
Creates BlockWriters for each worker. BlockWriter can only be opened once, otherwise the block sequence is incorrectly interleaved!
Implemented in CatStreamData, and MixStreamData.
|
inline |
Return stream id.
Definition at line 91 of file stream_data.hpp.
Referenced by CatStreamData::Close().
|
inline |
Returns my_host_rank.
Definition at line 97 of file stream_data.hpp.
Referenced by CatStreamData::CatStreamData(), CatStreamData::Close(), MixStreamData::GetWriters(), CatStreamData::GetWriters(), CatStreamData::loopback_queue(), and StreamData::OnAllWritersClosed().
|
inline |
Returns my_worker_rank_.
Definition at line 106 of file stream_data.hpp.
Referenced by CatStreamData::CatStreamData(), CatStreamData::Close(), MixStreamData::GetWriters(), CatStreamData::GetWriters(), MixStreamData::OnStreamBlock(), MixStreamData::OnStreamBlockOrdered(), CatStreamData::OnStreamBlockOrdered(), StreamData::OnWriterClosed(), and Stream::ScatterConsume().
|
inline |
Number of hosts in system.
Definition at line 99 of file stream_data.hpp.
Referenced by CatStreamData::CatStreamData(), MixStreamData::Close(), MixStreamData::GetWriters(), CatStreamData::GetWriters(), MixStreamData::MixStreamData(), and StreamData::OnWriterClosed().
|
inline |
Number of workers in system.
Definition at line 101 of file stream_data.hpp.
Referenced by CatStreamData::CatStreamData(), CatStreamData::GetCatBlockSource(), CatStreamData::GetReaders(), MixStreamData::GetWriters(), CatStreamData::GetWriters(), and MixStreamData::OnStreamBlock().
| void OnAllWritersClosed | ( | ) |
method called when all StreamSink writers have finished
Definition at line 61 of file stream_data.cpp.
References StreamData::dia_id_, StreamData::id_, StreamData::local_worker_id_, Multiplexer::logger(), StreamData::multiplexer_, StreamData::my_host_rank(), StreamData::rx_int_blocks_, StreamData::rx_int_bytes_, StreamData::rx_int_items_, StreamData::rx_net_blocks_, StreamData::rx_net_bytes_, StreamData::rx_net_items_, StreamData::stream_type(), StreamData::tx_int_blocks_, StreamData::tx_int_bytes_, StreamData::tx_int_items_, StreamData::tx_net_blocks_, StreamData::tx_net_bytes_, StreamData::tx_net_items_, and Multiplexer::workers_per_host().
Referenced by StreamData::OnWriterClosed().
| void OnWriterClosed | ( | size_t | peer_worker_rank, |
| bool | sent | ||
| ) |
method called from StreamSink when it is closed, used to aggregate Close messages to remote hosts
Definition at line 38 of file stream_data.cpp.
References StreamData::all_writers_closed_, die_unless, LOG, StreamData::my_worker_rank(), StreamData::num_hosts(), StreamData::OnAllWritersClosed(), StreamSetBase::OnWriterClosed(), StreamData::stream_set_base_, StreamData::tx_lifetime_, StreamData::tx_timespan_, StreamData::workers_per_host(), and StreamData::writers_closed_.
|
pure virtual |
return stream type string
Implemented in CatStreamData, and MixStreamData.
Referenced by StreamData::OnAllWritersClosed().
|
inline |
Returns workers_per_host.
Definition at line 104 of file stream_data.hpp.
Referenced by CatStreamData::CatStreamData(), MixStreamData::Close(), CatStreamData::Close(), MixStreamData::GetWriters(), CatStreamData::GetWriters(), CatStreamData::loopback_queue(), MixStreamData::MixStreamData(), and StreamData::OnWriterClosed().
|
protected |
bool if all writers were closed
Definition at line 190 of file stream_data.hpp.
Referenced by MixStreamData::Close(), CatStreamData::Close(), and StreamData::OnWriterClosed().
|
static |
Definition at line 49 of file stream_data.hpp.
|
protected |
associated DIANode id.
Definition at line 174 of file stream_data.hpp.
Referenced by StreamData::OnAllWritersClosed(), MixStreamData::set_dia_id(), and CatStreamData::set_dia_id().
|
protected |
our own stream id.
Definition at line 165 of file stream_data.hpp.
Referenced by CatStreamData::CatStreamData(), MixStreamData::Close(), CatStreamData::Close(), MixStreamData::GetWriters(), CatStreamData::GetWriters(), StreamData::OnAllWritersClosed(), MixStreamData::OnStreamBlock(), CatStreamData::OnStreamBlock(), MixStreamData::OnStreamBlockOrdered(), and CatStreamData::OnStreamBlockOrdered().
|
protected |
local worker id
Definition at line 171 of file stream_data.hpp.
Referenced by MixStreamData::Close(), CatStreamData::Close(), CatStreamData::GetCatBlockSource(), MixStreamData::GetMixReader(), CatStreamData::GetReaders(), MixStreamData::GetWriters(), CatStreamData::GetWriters(), CatStreamData::loopback_queue(), StreamData::OnAllWritersClosed(), and CatStreamData::OnStreamBlock().
|
protected |
reference to multiplexer
Definition at line 177 of file stream_data.hpp.
Referenced by CatStreamData::CatStreamData(), MixStreamData::Close(), CatStreamData::Close(), MixStreamData::GetWriters(), CatStreamData::GetWriters(), and StreamData::OnAllWritersClosed().
|
protected |
number of remaining expected stream closing operations. Required to know when to stop rx_lifetime
Definition at line 181 of file stream_data.hpp.
Referenced by CatStreamData::CatStreamData(), MixStreamData::MixStreamData(), MixStreamData::OnStreamBlockOrdered(), and CatStreamData::OnStreamBlockOrdered().
| std::atomic<size_t> rx_int_blocks_ { 0 } |
Definition at line 144 of file stream_data.hpp.
Referenced by StreamData::OnAllWritersClosed(), and Stream::rx_int_blocks().
| std::atomic<size_t> rx_int_bytes_ { 0 } |
Definition at line 144 of file stream_data.hpp.
Referenced by StreamData::OnAllWritersClosed(), and Stream::rx_int_bytes().
| std::atomic<size_t> rx_int_items_ { 0 } |
StatsCounter for incoming data transfer. Exclusively contains only loopback (internal) data transfer
Definition at line 144 of file stream_data.hpp.
Referenced by StreamData::OnAllWritersClosed(), and Stream::rx_int_items().
| common::StatsTimerStart rx_lifetime_ |
Definition at line 152 of file stream_data.hpp.
Referenced by MixStreamData::OnStreamBlockOrdered(), and CatStreamData::OnStreamBlockOrdered().
| std::atomic<size_t> rx_net_blocks_ { 0 } |
Definition at line 134 of file stream_data.hpp.
Referenced by StreamData::OnAllWritersClosed(), MixStreamData::OnStreamBlockOrdered(), CatStreamData::OnStreamBlockOrdered(), and Stream::rx_net_blocks().
| std::atomic<size_t> rx_net_bytes_ { 0 } |
Definition at line 134 of file stream_data.hpp.
Referenced by StreamData::OnAllWritersClosed(), MixStreamData::OnStreamBlockOrdered(), CatStreamData::OnStreamBlockOrdered(), and Stream::rx_net_bytes().
| std::atomic<size_t> rx_net_items_ { 0 } |
StatsCounter for incoming data transfer. Does not include loopback data transfer
Definition at line 134 of file stream_data.hpp.
Referenced by StreamData::OnAllWritersClosed(), MixStreamData::OnStreamBlockOrdered(), CatStreamData::OnStreamBlockOrdered(), and Stream::rx_net_items().
| common::StatsTimerStopped rx_timespan_ |
Definition at line 155 of file stream_data.hpp.
Referenced by CatStreamData::GetCatBlockSource(), MixStreamData::GetMixReader(), CatStreamData::GetReaders(), MixStreamData::OnStreamBlock(), CatStreamData::OnStreamBlock(), MixStreamData::OnStreamBlockOrdered(), and CatStreamData::OnStreamBlockOrdered().
|
protected |
number of received stream closing Blocks.
Definition at line 184 of file stream_data.hpp.
Referenced by MixStreamData::Close(), CatStreamData::Close(), MixStreamData::OnStreamBlockOrdered(), and CatStreamData::OnStreamBlockOrdered().
| tlx::Semaphore sem_queue_ |
semaphore to stall the amount of PinnedBlocks (measured in bytes) passed to the network layer for transmission.
Definition at line 159 of file stream_data.hpp.
|
protected |
pointer to StreamSetBase containing this StreamData
Definition at line 168 of file stream_data.hpp.
Referenced by StreamData::OnWriterClosed().
| std::atomic<size_t> tx_int_blocks_ { 0 } |
Definition at line 149 of file stream_data.hpp.
Referenced by StreamData::OnAllWritersClosed(), and Stream::tx_int_blocks().
| std::atomic<size_t> tx_int_bytes_ { 0 } |
Definition at line 149 of file stream_data.hpp.
Referenced by StreamData::OnAllWritersClosed(), and Stream::tx_int_bytes().
| std::atomic<size_t> tx_int_items_ { 0 } |
StatsCounters for outgoing data transfer - shared by all sinks. Exclusively contains only loopback (internal) data transfer
Definition at line 149 of file stream_data.hpp.
Referenced by StreamData::OnAllWritersClosed(), and Stream::tx_int_items().
| common::StatsTimerStart tx_lifetime_ |
Timers from creation of stream until rx / tx direction is closed.
Definition at line 152 of file stream_data.hpp.
Referenced by StreamData::OnWriterClosed().
| std::atomic<size_t> tx_net_blocks_ { 0 } |
Definition at line 139 of file stream_data.hpp.
Referenced by StreamData::OnAllWritersClosed(), and Stream::tx_net_blocks().
| std::atomic<size_t> tx_net_bytes_ { 0 } |
Definition at line 139 of file stream_data.hpp.
Referenced by StreamData::OnAllWritersClosed(), and Stream::tx_net_bytes().
| std::atomic<size_t> tx_net_items_ { 0 } |
StatsCounters for outgoing data transfer - shared by all sinks. Does not include loopback data transfer
Definition at line 139 of file stream_data.hpp.
Referenced by StreamData::OnAllWritersClosed(), and Stream::tx_net_items().
| common::StatsTimerStopped tx_timespan_ |
Timers from first rx / tx package until rx / tx direction is closed.
Definition at line 155 of file stream_data.hpp.
Referenced by MixStreamData::GetWriters(), CatStreamData::GetWriters(), and StreamData::OnWriterClosed().
|
protected |
number of writers closed via StreamSink.
Definition at line 187 of file stream_data.hpp.
Referenced by StreamData::OnWriterClosed().