Thrill  0.1
MixStreamData Class Reference final

Detailed Description

A Stream is a virtual set of connections to all other worker instances; a "Stream" thus bundles them into one logical communication context.

We call an individual connection from a worker to another worker a "Host".

To use a Stream, obtain a vector of BlockWriters via OpenWriters() of the outbound Stream. The vector has one entry per worker in the system, and items written to an entry are destined for the corresponding worker. Written items are buffered in a Block and sent only when the Block is full. To force a send, use BlockWriter::Flush(). Once all items have been written, the BlockWriters must be closed using BlockWriter::Close().
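
The writer behavior described above (one writer per worker, buffering into a Block, sending on a full Block, on Flush(), or on Close()) can be illustrated with a small standalone model. This is only a sketch and does not use the Thrill API: ToyWriter, OpenToyWriters() and DemoWriters() are hypothetical names, items are plain ints, and "sending" is modeled by appending the block to a list.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical stand-in for a BlockWriter (not the Thrill class): buffers
// items and "sends" a block when it is full, on Flush(), or on Close().
class ToyWriter {
public:
    explicit ToyWriter(size_t block_capacity) : capacity_(block_capacity) {}

    // Buffer one item; a full block is sent automatically.
    void Put(int item) {
        assert(!closed_);
        buffer_.push_back(item);
        if (buffer_.size() == capacity_) Flush();
    }

    // Force sending of a partially filled block (no-op if nothing is buffered).
    void Flush() {
        if (buffer_.empty()) return;
        sent_blocks_.push_back(std::move(buffer_));
        buffer_.clear();  // bring the moved-from buffer back to a known state
    }

    // Send whatever is still buffered and mark the writer as closed.
    void Close() {
        Flush();
        closed_ = true;
    }

    bool closed() const { return closed_; }
    const std::vector<std::vector<int>>& sent_blocks() const {
        return sent_blocks_;
    }

private:
    size_t capacity_;
    bool closed_ = false;
    std::vector<int> buffer_;
    std::vector<std::vector<int>> sent_blocks_;
};

// Hypothetical stand-in for OpenWriters(): one writer per worker.
std::vector<ToyWriter> OpenToyWriters(size_t num_workers,
                                      size_t block_capacity) {
    return std::vector<ToyWriter>(num_workers, ToyWriter(block_capacity));
}

// Usage: distribute the items 0..8 round-robin over 3 workers with a block
// capacity of 2 items, then close all writers.
std::vector<ToyWriter> DemoWriters() {
    std::vector<ToyWriter> writers = OpenToyWriters(3, 2);
    for (int i = 0; i < 9; ++i) writers[i % 3].Put(i);
    for (ToyWriter& w : writers) w.Close();
    return writers;
}
```

In this model each worker receives three items, so every writer sends one full block automatically and one partial block when Close() is called.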

The MixStream allows reading of items from all workers in an unordered sequence, without waiting for any of the workers to complete sending items.
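
The unordered reading described above can be sketched with a single queue that stores items tagged with their source worker. This is a simplified, single-threaded model and not the Thrill implementation: ToyMixQueue and DemoMixRead() are hypothetical names, and the real MixBlockQueue works on Blocks rather than ints.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <utility>
#include <vector>

// Hypothetical model of a mixing queue: items from all workers land in one
// queue in arrival order, each tagged with the worker it came from.
class ToyMixQueue {
public:
    explicit ToyMixQueue(size_t num_workers)
        : closed_(num_workers, false), open_(num_workers) {}

    // A worker delivers an item; no per-worker ordering barrier is involved.
    void Push(size_t from, int item) {
        assert(!closed_[from]);
        queue_.emplace_back(from, item);
    }

    // A worker signals that it will send no more items.
    void Close(size_t from) {
        if (!closed_[from]) {
            closed_[from] = true;
            --open_;
        }
    }

    bool HasNext() const { return !queue_.empty(); }
    bool AllClosed() const { return open_ == 0; }

    // Pop the oldest item together with its source worker.
    std::pair<size_t, int> Next() {
        assert(!queue_.empty());
        std::pair<size_t, int> front = queue_.front();
        queue_.pop_front();
        return front;
    }

private:
    std::deque<std::pair<size_t, int>> queue_;
    std::vector<bool> closed_;
    size_t open_;
};

// Usage: the reader consumes items as soon as they are available, before any
// worker has closed its side.
std::vector<std::pair<size_t, int>> DemoMixRead() {
    ToyMixQueue q(3);
    std::vector<std::pair<size_t, int>> out;

    q.Push(2, 20);
    q.Push(0, 1);
    while (q.HasNext()) out.push_back(q.Next());  // no worker is closed yet

    q.Push(1, 10);
    q.Push(2, 21);
    for (size_t w = 0; w < 3; ++w) q.Close(w);
    while (q.HasNext()) out.push_back(q.Next());

    assert(q.AllClosed());
    return out;
}
```

The result interleaves the workers in arrival order, which is the property that distinguishes a mixing reader from one that concatenates the workers' outputs in rank order.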

Public Types

using Handle = MixStream
using MixReader = MixBlockQueueReader
- Public Types inherited from StreamData
using Writer = BlockWriter< StreamSink >

Public Member Functions

 MixStreamData (StreamSetBase *stream_set_base, Multiplexer &multiplexer, size_t send_size_limit, const StreamId &id, size_t local_worker_id, size_t dia_id)
 Creates a new stream instance. More...
 MixStreamData (const MixStreamData &)=delete
 non-copyable: delete copy-constructor More...
 MixStreamData (MixStreamData &&)=default
 move-constructor: default More...
 ~MixStreamData () final
void Close () final
 shuts the stream down. More...
bool closed () const final
MixReader GetMixReader (bool consume)
 Creates a BlockReader which mixes items from all workers. More...
MixReader GetReader (bool consume)
 Open a MixReader (function name matches a method in File and CatStream). More...
Writers GetWriters () final
bool is_queue_closed (size_t from)
 check if inbound queue is closed More...
MixStreamData & operator= (const MixStreamData &)=delete
 non-copyable: delete assignment operator More...
void set_dia_id (size_t dia_id)
const char * stream_type () final
 return stream type string More...
- Public Member Functions inherited from StreamData
 StreamData (StreamSetBase *stream_set_base, Multiplexer &multiplexer, size_t send_size_limit, const StreamId &id, size_t local_worker_id, size_t dia_id)
virtual ~StreamData ()
const StreamId & id () const
 Return stream id. More...
size_t my_host_rank () const
 Returns my_host_rank. More...
size_t my_worker_rank () const
 Returns my_worker_rank_. More...
size_t num_hosts () const
 Number of hosts in system. More...
size_t num_workers () const
 Number of workers in system. More...
void OnAllWritersClosed ()
 method called when all StreamSink writers have finished More...
void OnWriterClosed (size_t peer_worker_rank, bool sent)
size_t workers_per_host () const
 Returns workers_per_host. More...
- Public Member Functions inherited from ReferenceCounter
 ReferenceCounter () noexcept
 new objects have zero reference count More...
 ReferenceCounter (const ReferenceCounter &) noexcept
 copying still creates a new object with zero reference count More...
 ~ReferenceCounter ()
bool dec_reference () const noexcept
 Call whenever resetting (i.e. More...
void inc_reference () const noexcept
 Call whenever setting a pointer to the object. More...
ReferenceCounter & operator= (const ReferenceCounter &) noexcept
 assignment operator, leaves pointers unchanged More...
size_t reference_count () const noexcept
 Return the number of references to this object (for debugging) More...
bool unique () const noexcept
 Test if the ReferenceCounter is referenced by only one CountingPtr. More...

Private Member Functions

void OnStreamBlock (size_t from, uint32_t seq, Block &&b)
 called from Multiplexer when there is a new Block for this Stream. More...
void OnStreamBlockOrdered (size_t from, Block &&b)
 called to process PinnedBlock in sequence More...

Private Attributes

bool is_closed_ = false
 flag if Close() was completed More...
MixBlockQueue queue_
 BlockQueue to store incoming Blocks with source. More...
std::vector< SeqReordering > seq_
 Block Sequence numbers. More...

Static Private Attributes

static constexpr bool debug = false

Additional Inherited Members

- Public Attributes inherited from StreamData
std::atomic< size_t > rx_int_blocks_ { 0 }
std::atomic< size_t > rx_int_bytes_ { 0 }
std::atomic< size_t > rx_int_items_ { 0 }
common::StatsTimerStart rx_lifetime_
std::atomic< size_t > rx_net_blocks_ { 0 }
std::atomic< size_t > rx_net_bytes_ { 0 }
std::atomic< size_t > rx_net_items_ { 0 }
common::StatsTimerStopped rx_timespan_
tlx::Semaphore sem_queue_
std::atomic< size_t > tx_int_blocks_ { 0 }
std::atomic< size_t > tx_int_bytes_ { 0 }
std::atomic< size_t > tx_int_items_ { 0 }
common::StatsTimerStart tx_lifetime_
 Timers from creation of stream until rx / tx direction is closed. More...
std::atomic< size_t > tx_net_blocks_ { 0 }
std::atomic< size_t > tx_net_bytes_ { 0 }
std::atomic< size_t > tx_net_items_ { 0 }
common::StatsTimerStopped tx_timespan_
 Timers from first rx / tx package until rx / tx direction is closed. More...
- Static Public Attributes inherited from StreamData
static constexpr bool debug = false
- Protected Attributes inherited from StreamData
bool all_writers_closed_ = false
 bool if all writers were closed More...
size_t dia_id_
 associated DIANode id. More...
StreamId id_
 our own stream id. More...
size_t local_worker_id_
 local worker id More...
Multiplexer & multiplexer_
 reference to multiplexer More...
std::atomic< size_t > remaining_closing_blocks_
tlx::Semaphore sem_closing_blocks_
 number of received stream closing Blocks. More...
StreamSetBase * stream_set_base_
 pointer to StreamSetBase containing this StreamData More...
size_t writers_closed_ = 0
 number of writers closed via StreamSink. More...

Member Typedef Documentation

◆ Handle

using Handle = MixStream

◆ MixReader

Constructor & Destructor Documentation

◆ MixStreamData() [1/3]

MixStreamData ( StreamSetBase * stream_set_base,
Multiplexer & multiplexer,
size_t  send_size_limit,
const StreamId & id,
size_t  local_worker_id,
size_t  dia_id )

Creates a new stream instance.

References StreamData::num_hosts(), StreamData::remaining_closing_blocks_, and StreamData::workers_per_host().

◆ MixStreamData() [2/3]

MixStreamData ( const MixStreamData & )

non-copyable: delete copy-constructor

◆ MixStreamData() [3/3]

MixStreamData ( MixStreamData &&  )

move-constructor: default

◆ ~MixStreamData()

~MixStreamData ( )

References LOG.

Member Function Documentation

◆ Close()

◆ closed()

bool closed ( ) const

Indicates if the stream is closed - meaning all remaining outbound queues have been closed.

Implements StreamData.

References MixStreamData::is_closed_, MixStreamData::queue_, and MixBlockQueue::write_closed().

◆ GetMixReader()

MixStreamData::MixReader GetMixReader ( bool  consume)

Creates a BlockReader which mixes items from all workers.

References StreamData::local_worker_id_, MixStreamData::queue_, and StreamData::rx_timespan_.

Referenced by MixStreamData::GetReader().

◆ GetReader()

MixStreamData::MixReader GetReader ( bool  consume)

Open a MixReader (function name matches a method in File and CatStream).

References MixStreamData::GetMixReader().

◆ GetWriters()

◆ is_queue_closed()

bool is_queue_closed ( size_t  from)

check if inbound queue is closed

Definition at line 156 of file mix_stream.cpp.

References MixBlockQueue::is_queue_closed(), MixStreamData::queue_, and MixStreamData::seq_.

◆ OnStreamBlock()

void OnStreamBlock ( size_t  from,
uint32_t  seq,
Block &&  b )

◆ OnStreamBlockOrdered()

◆ operator=()

MixStreamData& operator= ( const MixStreamData & )

non-copyable: delete assignment operator

◆ set_dia_id()

void set_dia_id ( size_t  dia_id)

change dia_id after construction (needed because it may be unknown at construction)

References StreamData::dia_id_, MixStreamData::queue_, and MixBlockQueue::set_dia_id().

◆ stream_type()

const char * stream_type ( )

return stream type string

Implements StreamData.

Member Data Documentation

◆ debug

constexpr bool debug = false

◆ is_closed_

bool is_closed_ = false

flag if Close() was completed

Referenced by MixStreamData::Close(), and MixStreamData::closed().

◆ queue_

◆ seq_

std::vector<SeqReordering> seq_
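
Block Sequence numbers, one entry per source.

The signatures of OnStreamBlock(from, seq, b) and OnStreamBlockOrdered(from, b), together with seq_, suggest that Blocks arriving out of order are held back until all earlier sequence numbers from the same source have been processed. The internals of SeqReordering are not shown on this page, so the following is only a sketch under that assumption: ToySeqReordering, ToyOrderedReceiver and DemoReordering() are hypothetical names, Blocks are modeled as strings, and sequence numbers are assumed to start at 0.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical per-source reordering state (assumed layout, not Thrill's).
struct ToySeqReordering {
    uint32_t next_seq = 0;                    // next sequence number expected
    std::map<uint32_t, std::string> waiting;  // blocks that arrived too early
};

class ToyOrderedReceiver {
public:
    explicit ToyOrderedReceiver(size_t num_sources) : seq_(num_sources) {}

    // Counterpart of OnStreamBlock(): may be called with out-of-order seq.
    void OnBlock(size_t from, uint32_t seq, std::string block) {
        assert(from < seq_.size());
        ToySeqReordering& s = seq_[from];
        if (seq != s.next_seq) {
            // Too early: hold the block back until its predecessors arrive.
            s.waiting.emplace(seq, std::move(block));
            return;
        }
        OnBlockOrdered(from, std::move(block));
        ++s.next_seq;
        // Release held-back blocks that are now in sequence.
        auto it = s.waiting.begin();
        while (it != s.waiting.end() && it->first == s.next_seq) {
            OnBlockOrdered(from, std::move(it->second));
            ++s.next_seq;
            it = s.waiting.erase(it);
        }
    }

    const std::vector<std::pair<size_t, std::string>>& delivered() const {
        return delivered_;
    }

private:
    // Counterpart of OnStreamBlockOrdered(): sees each source's blocks in order.
    void OnBlockOrdered(size_t from, std::string block) {
        delivered_.emplace_back(from, std::move(block));
    }

    std::vector<ToySeqReordering> seq_;
    std::vector<std::pair<size_t, std::string>> delivered_;
};

// Usage: source 0 delivers seq 1 and 2 before seq 0; source 1 is in order.
std::vector<std::pair<size_t, std::string>> DemoReordering() {
    ToyOrderedReceiver rx(2);
    rx.OnBlock(0, 1, "b");  // held back
    rx.OnBlock(1, 0, "x");  // delivered immediately
    rx.OnBlock(0, 2, "c");  // held back
    rx.OnBlock(0, 0, "a");  // delivers a, then releases b and c
    return rx.delivered();
}
```

Ordering is enforced per source only; blocks from different sources may still interleave freely, which matches the unordered nature of the mix reader.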

