Thrill  0.1
MixStreamData Class Referencefinal

Detailed Description

A Stream is a virtual set of connections to all other worker instances, hence a "Stream" bundles them to a logical communication context.

We call an individual connection from a worker to another worker a "Host".

To use a Stream, one can get a vector of BlockWriter via OpenWriters() of outbound Stream. The vector is of size of workers in the system. One can then write items destined to the corresponding worker. The written items are buffered into a Block and only sent when the Block is full. To force a send, use BlockWriter::Flush(). When all items are sent, the BlockWriters must be closed using BlockWriter::Close().

The MixStream allows reading of items from all workers in an unordered sequence, without waiting for any of the workers to complete sending items.

Definition at line 44 of file mix_stream.hpp.

+ Inheritance diagram for MixStreamData:
+ Collaboration diagram for MixStreamData:

#include <mix_stream.hpp>

Public Types

using Handle = MixStream
 
using MixReader = MixBlockQueueReader
 
- Public Types inherited from StreamData
using Writer = BlockWriter< StreamSink >
 

Public Member Functions

 MixStreamData (StreamSetBase *stream_set_base, Multiplexer &multiplexer, size_t send_size_limit, const StreamId &id, size_t local_worker_id, size_t dia_id)
 Creates a new stream instance. More...
 
 MixStreamData (const MixStreamData &)=delete
 non-copyable: delete copy-constructor More...
 
 MixStreamData (MixStreamData &&)=default
 move-constructor: default More...
 
 ~MixStreamData () final
 
void Close () final
 shuts the stream down. More...
 
bool closed () const final
 
MixReader GetMixReader (bool consume)
 Creates a BlockReader which mixes items from all workers. More...
 
MixReader GetReader (bool consume)
 Open a MixReader (function name matches a method in File and CatStream). More...
 
Writers GetWriters () final
 
bool is_queue_closed (size_t from)
 check if inbound queue is closed More...
 
MixStreamDataoperator= (const MixStreamData &)=delete
 non-copyable: delete assignment operator More...
 
void set_dia_id (size_t dia_id)
 
const char * stream_type () final
 return stream type string More...
 
- Public Member Functions inherited from StreamData
 StreamData (StreamSetBase *stream_set_base, Multiplexer &multiplexer, size_t send_size_limit, const StreamId &id, size_t local_worker_id, size_t dia_id)
 
virtual ~StreamData ()
 
const StreamIdid () const
 Return stream id. More...
 
size_t my_host_rank () const
 Returns my_host_rank. More...
 
size_t my_worker_rank () const
 Returns my_worker_rank_. More...
 
size_t num_hosts () const
 Number of hosts in system. More...
 
size_t num_workers () const
 Number of workers in system. More...
 
void OnAllWritersClosed ()
 method called when all StreamSink writers have finished More...
 
void OnWriterClosed (size_t peer_worker_rank, bool sent)
 
size_t workers_per_host () const
 Returns workers_per_host. More...
 
- Public Member Functions inherited from ReferenceCounter
 ReferenceCounter () noexcept
 new objects have zero reference count More...
 
 ReferenceCounter (const ReferenceCounter &) noexcept
 coping still creates a new object with zero reference count More...
 
 ~ReferenceCounter ()
 
bool dec_reference () const noexcept
 Call whenever resetting (i.e. More...
 
void inc_reference () const noexcept
 Call whenever setting a pointer to the object. More...
 
ReferenceCounteroperator= (const ReferenceCounter &) noexcept
 assignment operator, leaves pointers unchanged More...
 
size_t reference_count () const noexcept
 Return the number of references to this object (for debugging) More...
 
bool unique () const noexcept
 Test if the ReferenceCounter is referenced by only one CountingPtr. More...
 

Private Member Functions

void OnStreamBlock (size_t from, uint32_t seq, Block &&b)
 called from Multiplexer when there is a new Block for this Stream. More...
 
void OnStreamBlockOrdered (size_t from, Block &&b)
 called to process PinnedBlock in sequence More...
 

Private Attributes

bool is_closed_ = false
 flag if Close() was completed More...
 
MixBlockQueue queue_
 BlockQueue to store incoming Blocks with source. More...
 
std::vector< SeqReordering > seq_
 Block Sequence numbers. More...
 

Static Private Attributes

static constexpr bool debug = false
 

Additional Inherited Members

- Public Attributes inherited from StreamData
std::atomic< size_t > rx_int_blocks_ { 0 }
 
std::atomic< size_t > rx_int_bytes_ { 0 }
 
std::atomic< size_t > rx_int_items_ { 0 }
 
common::StatsTimerStart rx_lifetime_
 
std::atomic< size_t > rx_net_blocks_ { 0 }
 
std::atomic< size_t > rx_net_bytes_ { 0 }
 
std::atomic< size_t > rx_net_items_ { 0 }
 
common::StatsTimerStopped rx_timespan_
 
tlx::Semaphore sem_queue_
 
std::atomic< size_t > tx_int_blocks_ { 0 }
 
std::atomic< size_t > tx_int_bytes_ { 0 }
 
std::atomic< size_t > tx_int_items_ { 0 }
 
common::StatsTimerStart tx_lifetime_
 Timers from creation of stream until rx / tx direction is closed. More...
 
std::atomic< size_t > tx_net_blocks_ { 0 }
 
std::atomic< size_t > tx_net_bytes_ { 0 }
 
std::atomic< size_t > tx_net_items_ { 0 }
 
common::StatsTimerStopped tx_timespan_
 Timers from first rx / tx package until rx / tx direction is closed. More...
 
- Static Public Attributes inherited from StreamData
static constexpr bool debug = false
 
- Protected Attributes inherited from StreamData
bool all_writers_closed_ = false
 bool if all writers were closed More...
 
size_t dia_id_
 associated DIANode id. More...
 
StreamId id_
 our own stream id. More...
 
size_t local_worker_id_
 local worker id More...
 
Multiplexermultiplexer_
 reference to multiplexer More...
 
std::atomic< size_t > remaining_closing_blocks_
 
tlx::Semaphore sem_closing_blocks_
 number of received stream closing Blocks. More...
 
StreamSetBasestream_set_base_
 pointer to StreamSetBase containing this StreamData More...
 
size_t writers_closed_ = 0
 number of writers closed via StreamSink. More...
 

Member Typedef Documentation

◆ Handle

using Handle = MixStream

Definition at line 51 of file mix_stream.hpp.

◆ MixReader

Definition at line 49 of file mix_stream.hpp.

Constructor & Destructor Documentation

◆ MixStreamData() [1/3]

MixStreamData ( StreamSetBase stream_set_base,
Multiplexer multiplexer,
size_t  send_size_limit,
const StreamId id,
size_t  local_worker_id,
size_t  dia_id 
)

Creates a new stream instance.

Definition at line 27 of file mix_stream.cpp.

References StreamData::num_hosts(), StreamData::remaining_closing_blocks_, and StreamData::workers_per_host().

◆ MixStreamData() [2/3]

MixStreamData ( const MixStreamData )
delete

non-copyable: delete copy-constructor

◆ MixStreamData() [3/3]

MixStreamData ( MixStreamData &&  )
default

move-constructor: default

◆ ~MixStreamData()

~MixStreamData ( )
final

Definition at line 39 of file mix_stream.cpp.

References LOG.

Member Function Documentation

◆ Close()

◆ closed()

bool closed ( ) const
finalvirtual

Indicates if the stream is closed - meaning all remaining outbound queues have been closed.

Implements StreamData.

Definition at line 149 of file mix_stream.cpp.

References MixStreamData::is_closed_, MixStreamData::queue_, and MixBlockQueue::write_closed().

◆ GetMixReader()

MixStreamData::MixReader GetMixReader ( bool  consume)

Creates a BlockReader which mixes items from all workers.

Definition at line 115 of file mix_stream.cpp.

References StreamData::local_worker_id_, MixStreamData::queue_, and StreamData::rx_timespan_.

Referenced by MixStreamData::GetReader().

◆ GetReader()

MixStreamData::MixReader GetReader ( bool  consume)

Open a MixReader (function name matches a method in File and CatStream).

Definition at line 120 of file mix_stream.cpp.

References MixStreamData::GetMixReader().

◆ GetWriters()

◆ is_queue_closed()

bool is_queue_closed ( size_t  from)

check if inbound queue is closed

Definition at line 156 of file mix_stream.cpp.

References MixBlockQueue::is_queue_closed(), MixStreamData::queue_, and MixStreamData::seq_.

◆ OnStreamBlock()

void OnStreamBlock ( size_t  from,
uint32_t  seq,
Block &&  b 
)
private

◆ OnStreamBlockOrdered()

◆ operator=()

MixStreamData& operator= ( const MixStreamData )
delete

non-copyable: delete assignment operator

◆ set_dia_id()

void set_dia_id ( size_t  dia_id)

change dia_id after construction (needed because it may be unknown at construction)

Definition at line 43 of file mix_stream.cpp.

References StreamData::dia_id_, MixStreamData::queue_, and MixBlockQueue::set_dia_id().

◆ stream_type()

const char * stream_type ( )
finalvirtual

return stream type string

Implements StreamData.

Definition at line 48 of file mix_stream.cpp.

Member Data Documentation

◆ debug

constexpr bool debug = false
staticprivate

Definition at line 46 of file mix_stream.hpp.

◆ is_closed_

bool is_closed_ = false
private

flag if Close() was completed

Definition at line 96 of file mix_stream.hpp.

Referenced by MixStreamData::Close(), and MixStreamData::closed().

◆ queue_

◆ seq_

std::vector<SeqReordering> seq_
private

The documentation for this class was generated from the following files: