Thrill
0.1
|
Base class for common structures for ConcatStream and MixedStream.
This is also a virtual base class use by Multiplexer to pass blocks to streams! Instead, it contains common items like stats.
Definition at line 46 of file stream_data.hpp.
#include <stream_data.hpp>
Classes | |
class | Writers |
An extra class derived from std::vector<> for delivery of the BlockWriters of a Stream. More... | |
Public Types | |
using | Writer = BlockWriter< StreamSink > |
Public Member Functions | |
StreamData (StreamSetBase *stream_set_base, Multiplexer &multiplexer, size_t send_size_limit, const StreamId &id, size_t local_worker_id, size_t dia_id) | |
virtual | ~StreamData () |
virtual void | Close ()=0 |
shuts the stream down. More... | |
virtual bool | closed () const =0 |
virtual Writers | GetWriters ()=0 |
const StreamId & | id () const |
Return stream id. More... | |
size_t | my_host_rank () const |
Returns my_host_rank. More... | |
size_t | my_worker_rank () const |
Returns my_worker_rank_. More... | |
size_t | num_hosts () const |
Number of hosts in system. More... | |
size_t | num_workers () const |
Number of workers in system. More... | |
void | OnAllWritersClosed () |
method called when all StreamSink writers have finished More... | |
void | OnWriterClosed (size_t peer_worker_rank, bool sent) |
virtual const char * | stream_type ()=0 |
return stream type string More... | |
size_t | workers_per_host () const |
Returns workers_per_host. More... | |
Public Member Functions inherited from ReferenceCounter | |
ReferenceCounter () noexcept | |
new objects have zero reference count More... | |
ReferenceCounter (const ReferenceCounter &) noexcept | |
coping still creates a new object with zero reference count More... | |
~ReferenceCounter () | |
bool | dec_reference () const noexcept |
Call whenever resetting (i.e. More... | |
void | inc_reference () const noexcept |
Call whenever setting a pointer to the object. More... | |
ReferenceCounter & | operator= (const ReferenceCounter &) noexcept |
assignment operator, leaves pointers unchanged More... | |
size_t | reference_count () const noexcept |
Return the number of references to this object (for debugging) More... | |
bool | unique () const noexcept |
Test if the ReferenceCounter is referenced by only one CountingPtr. More... | |
Public Attributes | |
std::atomic< size_t > | rx_int_blocks_ { 0 } |
std::atomic< size_t > | rx_int_bytes_ { 0 } |
std::atomic< size_t > | rx_int_items_ { 0 } |
common::StatsTimerStart | rx_lifetime_ |
std::atomic< size_t > | rx_net_blocks_ { 0 } |
std::atomic< size_t > | rx_net_bytes_ { 0 } |
std::atomic< size_t > | rx_net_items_ { 0 } |
common::StatsTimerStopped | rx_timespan_ |
tlx::Semaphore | sem_queue_ |
std::atomic< size_t > | tx_int_blocks_ { 0 } |
std::atomic< size_t > | tx_int_bytes_ { 0 } |
std::atomic< size_t > | tx_int_items_ { 0 } |
common::StatsTimerStart | tx_lifetime_ |
Timers from creation of stream until rx / tx direction is closed. More... | |
std::atomic< size_t > | tx_net_blocks_ { 0 } |
std::atomic< size_t > | tx_net_bytes_ { 0 } |
std::atomic< size_t > | tx_net_items_ { 0 } |
common::StatsTimerStopped | tx_timespan_ |
Timers from first rx / tx package until rx / tx direction is closed. More... | |
Static Public Attributes | |
static constexpr bool | debug = false |
Protected Attributes | |
bool | all_writers_closed_ = false |
bool if all writers were closed More... | |
size_t | dia_id_ |
associated DIANode id. More... | |
StreamId | id_ |
our own stream id. More... | |
size_t | local_worker_id_ |
local worker id More... | |
Multiplexer & | multiplexer_ |
reference to multiplexer More... | |
std::atomic< size_t > | remaining_closing_blocks_ |
tlx::Semaphore | sem_closing_blocks_ |
number of received stream closing Blocks. More... | |
StreamSetBase * | stream_set_base_ |
pointer to StreamSetBase containing this StreamData More... | |
size_t | writers_closed_ = 0 |
number of writers closed via StreamSink. More... | |
using Writer = BlockWriter<StreamSink> |
Definition at line 51 of file stream_data.hpp.
StreamData | ( | StreamSetBase * | stream_set_base, |
Multiplexer & | multiplexer, | ||
size_t | send_size_limit, | ||
const StreamId & | id, | ||
size_t | local_worker_id, | ||
size_t | dia_id | ||
) |
Definition at line 24 of file stream_data.cpp.
References StreamData::~StreamData().
|
virtualdefault |
Referenced by StreamData::StreamData().
|
pure virtual |
shuts the stream down.
Implemented in CatStreamData, and MixStreamData.
Referenced by Stream::Close().
|
pure virtual |
Implemented in CatStreamData, and MixStreamData.
|
pure virtual |
Creates BlockWriters for each worker. BlockWriter can only be opened once, otherwise the block sequence is incorrectly interleaved!
Implemented in CatStreamData, and MixStreamData.
|
inline |
Return stream id.
Definition at line 91 of file stream_data.hpp.
Referenced by CatStreamData::Close().
|
inline |
Returns my_host_rank.
Definition at line 97 of file stream_data.hpp.
Referenced by CatStreamData::CatStreamData(), CatStreamData::Close(), MixStreamData::GetWriters(), CatStreamData::GetWriters(), CatStreamData::loopback_queue(), and StreamData::OnAllWritersClosed().
|
inline |
Returns my_worker_rank_.
Definition at line 106 of file stream_data.hpp.
Referenced by CatStreamData::CatStreamData(), CatStreamData::Close(), MixStreamData::GetWriters(), CatStreamData::GetWriters(), MixStreamData::OnStreamBlock(), MixStreamData::OnStreamBlockOrdered(), CatStreamData::OnStreamBlockOrdered(), StreamData::OnWriterClosed(), and Stream::ScatterConsume().
|
inline |
Number of hosts in system.
Definition at line 99 of file stream_data.hpp.
Referenced by CatStreamData::CatStreamData(), MixStreamData::Close(), MixStreamData::GetWriters(), CatStreamData::GetWriters(), MixStreamData::MixStreamData(), and StreamData::OnWriterClosed().
|
inline |
Number of workers in system.
Definition at line 101 of file stream_data.hpp.
Referenced by CatStreamData::CatStreamData(), CatStreamData::GetCatBlockSource(), CatStreamData::GetReaders(), MixStreamData::GetWriters(), CatStreamData::GetWriters(), and MixStreamData::OnStreamBlock().
void OnAllWritersClosed | ( | ) |
method called when all StreamSink writers have finished
Definition at line 61 of file stream_data.cpp.
References StreamData::dia_id_, StreamData::id_, StreamData::local_worker_id_, Multiplexer::logger(), StreamData::multiplexer_, StreamData::my_host_rank(), StreamData::rx_int_blocks_, StreamData::rx_int_bytes_, StreamData::rx_int_items_, StreamData::rx_net_blocks_, StreamData::rx_net_bytes_, StreamData::rx_net_items_, StreamData::stream_type(), StreamData::tx_int_blocks_, StreamData::tx_int_bytes_, StreamData::tx_int_items_, StreamData::tx_net_blocks_, StreamData::tx_net_bytes_, StreamData::tx_net_items_, and Multiplexer::workers_per_host().
Referenced by StreamData::OnWriterClosed().
void OnWriterClosed | ( | size_t | peer_worker_rank, |
bool | sent | ||
) |
method called from StreamSink when it is closed, used to aggregate Close messages to remote hosts
Definition at line 38 of file stream_data.cpp.
References StreamData::all_writers_closed_, die_unless, LOG, StreamData::my_worker_rank(), StreamData::num_hosts(), StreamData::OnAllWritersClosed(), StreamSetBase::OnWriterClosed(), StreamData::stream_set_base_, StreamData::tx_lifetime_, StreamData::tx_timespan_, StreamData::workers_per_host(), and StreamData::writers_closed_.
|
pure virtual |
return stream type string
Implemented in CatStreamData, and MixStreamData.
Referenced by StreamData::OnAllWritersClosed().
|
inline |
Returns workers_per_host.
Definition at line 104 of file stream_data.hpp.
Referenced by CatStreamData::CatStreamData(), MixStreamData::Close(), CatStreamData::Close(), MixStreamData::GetWriters(), CatStreamData::GetWriters(), CatStreamData::loopback_queue(), MixStreamData::MixStreamData(), and StreamData::OnWriterClosed().
|
protected |
bool if all writers were closed
Definition at line 190 of file stream_data.hpp.
Referenced by MixStreamData::Close(), CatStreamData::Close(), and StreamData::OnWriterClosed().
|
static |
Definition at line 49 of file stream_data.hpp.
|
protected |
associated DIANode id.
Definition at line 174 of file stream_data.hpp.
Referenced by StreamData::OnAllWritersClosed(), MixStreamData::set_dia_id(), and CatStreamData::set_dia_id().
|
protected |
our own stream id.
Definition at line 165 of file stream_data.hpp.
Referenced by CatStreamData::CatStreamData(), MixStreamData::Close(), CatStreamData::Close(), MixStreamData::GetWriters(), CatStreamData::GetWriters(), StreamData::OnAllWritersClosed(), MixStreamData::OnStreamBlock(), CatStreamData::OnStreamBlock(), MixStreamData::OnStreamBlockOrdered(), and CatStreamData::OnStreamBlockOrdered().
|
protected |
local worker id
Definition at line 171 of file stream_data.hpp.
Referenced by MixStreamData::Close(), CatStreamData::Close(), CatStreamData::GetCatBlockSource(), MixStreamData::GetMixReader(), CatStreamData::GetReaders(), MixStreamData::GetWriters(), CatStreamData::GetWriters(), CatStreamData::loopback_queue(), StreamData::OnAllWritersClosed(), and CatStreamData::OnStreamBlock().
|
protected |
reference to multiplexer
Definition at line 177 of file stream_data.hpp.
Referenced by CatStreamData::CatStreamData(), MixStreamData::Close(), CatStreamData::Close(), MixStreamData::GetWriters(), CatStreamData::GetWriters(), and StreamData::OnAllWritersClosed().
|
protected |
number of remaining expected stream closing operations. Required to know when to stop rx_lifetime
Definition at line 181 of file stream_data.hpp.
Referenced by CatStreamData::CatStreamData(), MixStreamData::MixStreamData(), MixStreamData::OnStreamBlockOrdered(), and CatStreamData::OnStreamBlockOrdered().
std::atomic<size_t> rx_int_blocks_ { 0 } |
Definition at line 144 of file stream_data.hpp.
Referenced by StreamData::OnAllWritersClosed(), and Stream::rx_int_blocks().
std::atomic<size_t> rx_int_bytes_ { 0 } |
Definition at line 144 of file stream_data.hpp.
Referenced by StreamData::OnAllWritersClosed(), and Stream::rx_int_bytes().
std::atomic<size_t> rx_int_items_ { 0 } |
StatsCounter for incoming data transfer. Exclusively contains only loopback (internal) data transfer
Definition at line 144 of file stream_data.hpp.
Referenced by StreamData::OnAllWritersClosed(), and Stream::rx_int_items().
common::StatsTimerStart rx_lifetime_ |
Definition at line 152 of file stream_data.hpp.
Referenced by MixStreamData::OnStreamBlockOrdered(), and CatStreamData::OnStreamBlockOrdered().
std::atomic<size_t> rx_net_blocks_ { 0 } |
Definition at line 134 of file stream_data.hpp.
Referenced by StreamData::OnAllWritersClosed(), MixStreamData::OnStreamBlockOrdered(), CatStreamData::OnStreamBlockOrdered(), and Stream::rx_net_blocks().
std::atomic<size_t> rx_net_bytes_ { 0 } |
Definition at line 134 of file stream_data.hpp.
Referenced by StreamData::OnAllWritersClosed(), MixStreamData::OnStreamBlockOrdered(), CatStreamData::OnStreamBlockOrdered(), and Stream::rx_net_bytes().
std::atomic<size_t> rx_net_items_ { 0 } |
StatsCounter for incoming data transfer. Does not include loopback data transfer
Definition at line 134 of file stream_data.hpp.
Referenced by StreamData::OnAllWritersClosed(), MixStreamData::OnStreamBlockOrdered(), CatStreamData::OnStreamBlockOrdered(), and Stream::rx_net_items().
common::StatsTimerStopped rx_timespan_ |
Definition at line 155 of file stream_data.hpp.
Referenced by CatStreamData::GetCatBlockSource(), MixStreamData::GetMixReader(), CatStreamData::GetReaders(), MixStreamData::OnStreamBlock(), CatStreamData::OnStreamBlock(), MixStreamData::OnStreamBlockOrdered(), and CatStreamData::OnStreamBlockOrdered().
|
protected |
number of received stream closing Blocks.
Definition at line 184 of file stream_data.hpp.
Referenced by MixStreamData::Close(), CatStreamData::Close(), MixStreamData::OnStreamBlockOrdered(), and CatStreamData::OnStreamBlockOrdered().
tlx::Semaphore sem_queue_ |
semaphore to stall the amount of PinnedBlocks (measured in bytes) passed to the network layer for transmission.
Definition at line 159 of file stream_data.hpp.
|
protected |
pointer to StreamSetBase containing this StreamData
Definition at line 168 of file stream_data.hpp.
Referenced by StreamData::OnWriterClosed().
std::atomic<size_t> tx_int_blocks_ { 0 } |
Definition at line 149 of file stream_data.hpp.
Referenced by StreamData::OnAllWritersClosed(), and Stream::tx_int_blocks().
std::atomic<size_t> tx_int_bytes_ { 0 } |
Definition at line 149 of file stream_data.hpp.
Referenced by StreamData::OnAllWritersClosed(), and Stream::tx_int_bytes().
std::atomic<size_t> tx_int_items_ { 0 } |
StatsCounters for outgoing data transfer - shared by all sinks. Exclusively contains only loopback (internal) data transfer
Definition at line 149 of file stream_data.hpp.
Referenced by StreamData::OnAllWritersClosed(), and Stream::tx_int_items().
common::StatsTimerStart tx_lifetime_ |
Timers from creation of stream until rx / tx direction is closed.
Definition at line 152 of file stream_data.hpp.
Referenced by StreamData::OnWriterClosed().
std::atomic<size_t> tx_net_blocks_ { 0 } |
Definition at line 139 of file stream_data.hpp.
Referenced by StreamData::OnAllWritersClosed(), and Stream::tx_net_blocks().
std::atomic<size_t> tx_net_bytes_ { 0 } |
Definition at line 139 of file stream_data.hpp.
Referenced by StreamData::OnAllWritersClosed(), and Stream::tx_net_bytes().
std::atomic<size_t> tx_net_items_ { 0 } |
StatsCounters for outgoing data transfer - shared by all sinks. Does not include loopback data transfer
Definition at line 139 of file stream_data.hpp.
Referenced by StreamData::OnAllWritersClosed(), and Stream::tx_net_items().
common::StatsTimerStopped tx_timespan_ |
Timers from first rx / tx package until rx / tx direction is closed.
Definition at line 155 of file stream_data.hpp.
Referenced by MixStreamData::GetWriters(), CatStreamData::GetWriters(), and StreamData::OnWriterClosed().
|
protected |
number of writers closed via StreamSink.
Definition at line 187 of file stream_data.hpp.
Referenced by StreamData::OnWriterClosed().