Thrill  0.1
CatStreamData Class Referencefinal

Detailed Description

A Stream is a virtual set of connections to all other worker instances, hence a "Stream" bundles them to a logical communication context.

We call an individual connection from a worker to another worker a "Host".

To use a Stream, one can get a vector of BlockWriter via OpenWriters() of outbound Stream. The vector is of size of workers in the system. One can then write items destined to the corresponding worker. The written items are buffered into a Block and only sent when the Block is full. To force a send, use BlockWriter::Flush(). When all items are sent, the BlockWriters must be closed using BlockWriter::Close().

To read the inbound Connection items, one can get a vector of BlockReader via OpenReaders(), which can then be used to read items sent by individual workers.

Alternatively, one can use OpenReader() to get a BlockReader which delivers all items from all worker in worker order (concatenating all inbound Connections).

As soon as all attached streams of the Stream have been Close() the number of expected streams is reached, the stream is marked as finished and no more data will arrive.

Definition at line 54 of file cat_stream.hpp.

+ Inheritance diagram for CatStreamData:
+ Collaboration diagram for CatStreamData:

#include <cat_stream.hpp>

Public Types

using BlockQueueReader = BlockReader< BlockQueueSource >
 
using BlockQueueSource = ConsumeBlockQueueSource
 
using CatBlockReader = BlockReader< CatBlockSource >
 
using CatBlockSource = data::CatBlockSource< DynBlockSource >
 
using CatReader = CatBlockReader
 
using Handle = CatStream
 
using Reader = BlockQueueReader
 
- Public Types inherited from StreamData
using Writer = BlockWriter< StreamSink >
 

Public Member Functions

 CatStreamData (StreamSetBase *stream_set_base, Multiplexer &multiplexer, size_t send_size_limit, const StreamId &id, size_t local_worker_id, size_t dia_id)
 Creates a new stream instance. More...
 
 CatStreamData (const CatStreamData &)=delete
 non-copyable: delete copy-constructor More...
 
 CatStreamData (CatStreamData &&)=default
 move-constructor: default More...
 
 ~CatStreamData () final
 
void Close () final
 shuts the stream down. More...
 
bool closed () const final
 
CatBlockSource GetCatBlockSource (bool consume)
 Gets a CatBlockSource which includes all incoming queues of this stream. More...
 
CatReader GetCatReader (bool consume)
 
CatReader GetReader (bool consume)
 Open a CatReader (function name matches a method in File and MixStream). More...
 
std::vector< ReaderGetReaders ()
 
Writers GetWriters () final
 
bool is_queue_closed (size_t from)
 check if inbound queue is closed More...
 
CatStreamDataoperator= (const CatStreamData &)=delete
 non-copyable: delete assignment operator More...
 
void set_dia_id (size_t dia_id)
 
const char * stream_type () final
 return stream type string More...
 
- Public Member Functions inherited from StreamData
 StreamData (StreamSetBase *stream_set_base, Multiplexer &multiplexer, size_t send_size_limit, const StreamId &id, size_t local_worker_id, size_t dia_id)
 
virtual ~StreamData ()
 
const StreamIdid () const
 Return stream id. More...
 
size_t my_host_rank () const
 Returns my_host_rank. More...
 
size_t my_worker_rank () const
 Returns my_worker_rank_. More...
 
size_t num_hosts () const
 Number of hosts in system. More...
 
size_t num_workers () const
 Number of workers in system. More...
 
void OnAllWritersClosed ()
 method called when all StreamSink writers have finished More...
 
void OnWriterClosed (size_t peer_worker_rank, bool sent)
 
size_t workers_per_host () const
 Returns workers_per_host. More...
 
- Public Member Functions inherited from ReferenceCounter
 ReferenceCounter () noexcept
 new objects have zero reference count More...
 
 ReferenceCounter (const ReferenceCounter &) noexcept
 coping still creates a new object with zero reference count More...
 
 ~ReferenceCounter ()
 
bool dec_reference () const noexcept
 Call whenever resetting (i.e. More...
 
void inc_reference () const noexcept
 Call whenever setting a pointer to the object. More...
 
ReferenceCounteroperator= (const ReferenceCounter &) noexcept
 assignment operator, leaves pointers unchanged More...
 
size_t reference_count () const noexcept
 Return the number of references to this object (for debugging) More...
 
bool unique () const noexcept
 Test if the ReferenceCounter is referenced by only one CountingPtr. More...
 

Static Public Attributes

static constexpr bool debug = false
 
static constexpr bool debug_data = false
 
- Static Public Attributes inherited from StreamData
static constexpr bool debug = false
 

Private Member Functions

BlockQueueloopback_queue (size_t from_worker_id)
 Returns the loopback queue for the worker of this stream. More...
 
void OnStreamBlock (size_t from, uint32_t seq, Block &&b)
 
void OnStreamBlockOrdered (size_t from, Block &&b)
 

Private Attributes

bool is_closed_ = false
 
std::vector< BlockQueuequeues_
 BlockQueues to store incoming Blocks with no attached destination. More...
 
std::vector< SeqReordering > seq_
 Block Sequence numbers. More...
 

Additional Inherited Members

- Public Attributes inherited from StreamData
std::atomic< size_t > rx_int_blocks_ { 0 }
 
std::atomic< size_t > rx_int_bytes_ { 0 }
 
std::atomic< size_t > rx_int_items_ { 0 }
 
common::StatsTimerStart rx_lifetime_
 
std::atomic< size_t > rx_net_blocks_ { 0 }
 
std::atomic< size_t > rx_net_bytes_ { 0 }
 
std::atomic< size_t > rx_net_items_ { 0 }
 
common::StatsTimerStopped rx_timespan_
 
tlx::Semaphore sem_queue_
 
std::atomic< size_t > tx_int_blocks_ { 0 }
 
std::atomic< size_t > tx_int_bytes_ { 0 }
 
std::atomic< size_t > tx_int_items_ { 0 }
 
common::StatsTimerStart tx_lifetime_
 Timers from creation of stream until rx / tx direction is closed. More...
 
std::atomic< size_t > tx_net_blocks_ { 0 }
 
std::atomic< size_t > tx_net_bytes_ { 0 }
 
std::atomic< size_t > tx_net_items_ { 0 }
 
common::StatsTimerStopped tx_timespan_
 Timers from first rx / tx package until rx / tx direction is closed. More...
 
- Protected Attributes inherited from StreamData
bool all_writers_closed_ = false
 bool if all writers were closed More...
 
size_t dia_id_
 associated DIANode id. More...
 
StreamId id_
 our own stream id. More...
 
size_t local_worker_id_
 local worker id More...
 
Multiplexermultiplexer_
 reference to multiplexer More...
 
std::atomic< size_t > remaining_closing_blocks_
 
tlx::Semaphore sem_closing_blocks_
 number of received stream closing Blocks. More...
 
StreamSetBasestream_set_base_
 pointer to StreamSetBase containing this StreamData More...
 
size_t writers_closed_ = 0
 number of writers closed via StreamSink. More...
 

Member Typedef Documentation

◆ BlockQueueReader

Definition at line 61 of file cat_stream.hpp.

◆ BlockQueueSource

Definition at line 60 of file cat_stream.hpp.

◆ CatBlockReader

Definition at line 64 of file cat_stream.hpp.

◆ CatBlockSource

Definition at line 63 of file cat_stream.hpp.

◆ CatReader

Definition at line 67 of file cat_stream.hpp.

◆ Handle

using Handle = CatStream

Definition at line 69 of file cat_stream.hpp.

◆ Reader

Definition at line 66 of file cat_stream.hpp.

Constructor & Destructor Documentation

◆ CatStreamData() [1/3]

◆ CatStreamData() [2/3]

CatStreamData ( const CatStreamData )
delete

non-copyable: delete copy-constructor

◆ CatStreamData() [3/3]

CatStreamData ( CatStreamData &&  )
default

move-constructor: default

◆ ~CatStreamData()

~CatStreamData ( )
final

Definition at line 74 of file cat_stream.cpp.

References LOG.

Member Function Documentation

◆ Close()

◆ closed()

bool closed ( ) const
finalvirtual

Indicates if the stream is closed - meaning all remaining streams have been closed. This does not include the loopback stream

Implements StreamData.

Definition at line 223 of file cat_stream.cpp.

References CatStreamData::queues_.

◆ GetCatBlockSource()

CatStreamData::CatBlockSource GetCatBlockSource ( bool  consume)

Gets a CatBlockSource which includes all incoming queues of this stream.

Definition at line 169 of file cat_stream.cpp.

References StreamData::local_worker_id_, StreamData::num_workers(), CatStreamData::queues_, and StreamData::rx_timespan_.

Referenced by CatStreamData::GetCatReader().

◆ GetCatReader()

CatStreamData::CatReader GetCatReader ( bool  consume)

Creates a BlockReader which concatenates items from all workers in worker rank order. The BlockReader is attached to one CatBlockSource which includes all incoming queues of this stream.

Definition at line 185 of file cat_stream.cpp.

References CatStreamData::GetCatBlockSource().

Referenced by CatStreamData::GetReader().

◆ GetReader()

CatStreamData::CatReader GetReader ( bool  consume)

Open a CatReader (function name matches a method in File and MixStream).

Definition at line 189 of file cat_stream.cpp.

References CatStreamData::GetCatReader().

◆ GetReaders()

std::vector< CatStreamData::Reader > GetReaders ( )

Creates a BlockReader for each worker. The BlockReaders are attached to the BlockQueues in the Stream and wait for further Blocks to arrive or the Stream's remote close. These Readers always consume!

Definition at line 154 of file cat_stream.cpp.

References StreamData::local_worker_id_, StreamData::num_workers(), CatStreamData::queues_, and StreamData::rx_timespan_.

◆ GetWriters()

◆ is_queue_closed()

bool is_queue_closed ( size_t  from)

check if inbound queue is closed

Definition at line 231 of file cat_stream.cpp.

References CatStreamData::queues_, and CatStreamData::seq_.

◆ loopback_queue()

BlockQueue * loopback_queue ( size_t  from_worker_id)
private

Returns the loopback queue for the worker of this stream.

Definition at line 316 of file cat_stream.cpp.

References StreamData::local_worker_id_, StreamData::my_host_rank(), CatStreamData::queues_, sLOG, and StreamData::workers_per_host().

◆ OnStreamBlock()

◆ OnStreamBlockOrdered()

◆ operator=()

CatStreamData& operator= ( const CatStreamData )
delete

non-copyable: delete assignment operator

◆ set_dia_id()

void set_dia_id ( size_t  dia_id)

change dia_id after construction (needed because it may be unknown at construction)

Definition at line 78 of file cat_stream.cpp.

References StreamData::dia_id_, and CatStreamData::queues_.

◆ stream_type()

const char * stream_type ( )
finalvirtual

return stream type string

Implements StreamData.

Definition at line 85 of file cat_stream.cpp.

Member Data Documentation

◆ debug

constexpr bool debug = false
static

Definition at line 57 of file cat_stream.hpp.

◆ debug_data

constexpr bool debug_data = false
static

Definition at line 58 of file cat_stream.hpp.

Referenced by CatStreamData::OnStreamBlock().

◆ is_closed_

bool is_closed_ = false
private

Definition at line 123 of file cat_stream.hpp.

Referenced by CatStreamData::Close().

◆ queues_

◆ seq_

std::vector<SeqReordering> seq_
private

The documentation for this class was generated from the following files: