Thrill
0.1
A Stream is a virtual set of connections to all other worker instances; a "Stream" thus bundles them into one logical communication context.
We call an individual connection from one worker to another worker a "Host".
To use a Stream, one can get a vector of BlockWriter via OpenWriters() of the outbound Stream. The vector has one entry per worker in the system. One can then write items destined for the corresponding worker. The written items are buffered into a Block and only sent when the Block is full. To force a send, use BlockWriter::Flush(). When all items are sent, the BlockWriters must be closed using BlockWriter::Close().
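The buffering behaviour described above can be illustrated with a small self-contained model. This is a sketch only: `ToyBlockWriter`, `OpenToyWriters`, `Put` and `sent_blocks` are hypothetical names invented for illustration and are not part of Thrill; the model merely assumes that a block is emitted when it is full, on Flush(), or on Close().

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for a BlockWriter (NOT Thrill's implementation).
// Items are buffered and only "sent" (moved into sent_blocks_) when the
// block is full, when Flush() is called, or when the writer is closed.
class ToyBlockWriter {
public:
    // Assumption: block_capacity is at least 1.
    explicit ToyBlockWriter(std::size_t block_capacity)
        : capacity_(block_capacity) {}

    void Put(int item) {
        assert(!closed_ && "writer already closed");
        buffer_.push_back(item);
        if (buffer_.size() == capacity_) Flush();  // block is full: send it
    }

    // Force the partially filled block out; a no-op if nothing is buffered.
    void Flush() {
        if (buffer_.empty()) return;
        sent_blocks_.push_back(buffer_);
        buffer_.clear();
    }

    // Send the remaining items and mark the writer as finished.
    void Close() {
        Flush();
        closed_ = true;
    }

    bool closed() const { return closed_; }
    const std::vector<std::vector<int>>& sent_blocks() const {
        return sent_blocks_;
    }

private:
    std::size_t capacity_;
    bool closed_ = false;
    std::vector<int> buffer_;
    std::vector<std::vector<int>> sent_blocks_;
};

// One writer per worker, mirroring the "one entry per worker" vector.
inline std::vector<ToyBlockWriter> OpenToyWriters(std::size_t num_workers,
                                                  std::size_t block_capacity) {
    return std::vector<ToyBlockWriter>(num_workers,
                                       ToyBlockWriter(block_capacity));
}
```

In this model, writing fewer items than the block capacity sends nothing until Flush() or Close() is called, which is why every writer has to be closed even if no items were written to it.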
To read the items arriving on the inbound connections, one can get a vector of BlockReader via OpenReaders(); each reader delivers the items sent by one individual worker.
Alternatively, one can use OpenReader() to get a single BlockReader which delivers all items from all workers in worker order (concatenating all inbound connections).
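The "worker order" concatenation can be sketched as follows. `ToyCatReader` and `ReadAll` are hypothetical names, not Thrill classes, and the model assumes that all inbound queues are already complete; a real reader would instead wait for further Blocks or for the remote close.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <utility>
#include <vector>

// Hypothetical model of a concatenating reader (NOT Thrill's CatBlockReader).
// queues[i] holds the items received from worker i; the reader drains
// queue 0 completely, then queue 1, and so on.
class ToyCatReader {
public:
    explicit ToyCatReader(std::vector<std::deque<int>> queues)
        : queues_(std::move(queues)) {}

    // Skip exhausted queues; true if another item is available.
    bool HasNext() {
        while (current_ < queues_.size() && queues_[current_].empty())
            ++current_;
        return current_ < queues_.size();
    }

    // Precondition: HasNext() returned true.
    int Next() {
        assert(current_ < queues_.size() && !queues_[current_].empty());
        int item = queues_[current_].front();
        queues_[current_].pop_front();
        return item;
    }

private:
    std::vector<std::deque<int>> queues_;
    std::size_t current_ = 0;
};

// Convenience helper: read everything into one vector.
inline std::vector<int> ReadAll(ToyCatReader reader) {
    std::vector<int> out;
    while (reader.HasNext()) out.push_back(reader.Next());
    return out;
}
```

Items from worker 0 always precede items from worker 1, regardless of the order in which they arrived; that ordering guarantee is what distinguishes the concatenating reader from reading the per-worker readers independently.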
As soon as all attached streams of the Stream have been closed via Close(), the number of expected streams is reached: the Stream is marked as finished and no more data will arrive.
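The completion rule can be pictured as a simple countdown. The sketch below is an assumption-laden illustration, not Thrill's bookkeeping: `ToyCloseTracker` and `OnPeerClosed` are hypothetical names, and the model is single-threaded, whereas the member list on this page suggests that the real class uses an atomic counter and a semaphore.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical countdown of expected close notifications
// (NOT Thrill's implementation).
class ToyCloseTracker {
public:
    explicit ToyCloseTracker(std::size_t expected_closes)
        : remaining_(expected_closes) {}

    // Record one close notification. Returns true if this notification
    // was the last expected one, i.e. the stream is now finished.
    bool OnPeerClosed() {
        assert(remaining_ > 0 && "more closes than expected");
        --remaining_;
        return remaining_ == 0;
    }

    // True once every expected close has been seen.
    bool finished() const { return remaining_ == 0; }

private:
    std::size_t remaining_;
};
```

With three expected closes, the first two calls to `OnPeerClosed()` return false and the third returns true, after which `finished()` stays true.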
Definition at line 54 of file cat_stream.hpp.
#include <cat_stream.hpp>
Public Types | |
using | BlockQueueReader = BlockReader< BlockQueueSource > |
using | BlockQueueSource = ConsumeBlockQueueSource |
using | CatBlockReader = BlockReader< CatBlockSource > |
using | CatBlockSource = data::CatBlockSource< DynBlockSource > |
using | CatReader = CatBlockReader |
using | Handle = CatStream |
using | Reader = BlockQueueReader |
Public Types inherited from StreamData | |
using | Writer = BlockWriter< StreamSink > |
Public Member Functions | |
CatStreamData (StreamSetBase *stream_set_base, Multiplexer &multiplexer, size_t send_size_limit, const StreamId &id, size_t local_worker_id, size_t dia_id) | |
Creates a new stream instance. More... | |
CatStreamData (const CatStreamData &)=delete | |
non-copyable: delete copy-constructor More... | |
CatStreamData (CatStreamData &&)=default | |
move-constructor: default More... | |
~CatStreamData () final | |
void | Close () final |
shuts the stream down. More... | |
bool | closed () const final |
CatBlockSource | GetCatBlockSource (bool consume) |
Gets a CatBlockSource which includes all incoming queues of this stream. More... | |
CatReader | GetCatReader (bool consume) |
CatReader | GetReader (bool consume) |
Open a CatReader (function name matches a method in File and MixStream). More... | |
std::vector< Reader > | GetReaders () |
Writers | GetWriters () final |
bool | is_queue_closed (size_t from) |
check if inbound queue is closed More... | |
CatStreamData & | operator= (const CatStreamData &)=delete |
non-copyable: delete assignment operator More... | |
void | set_dia_id (size_t dia_id) |
const char * | stream_type () final |
return stream type string More... | |
Public Member Functions inherited from StreamData | |
StreamData (StreamSetBase *stream_set_base, Multiplexer &multiplexer, size_t send_size_limit, const StreamId &id, size_t local_worker_id, size_t dia_id) | |
virtual | ~StreamData () |
const StreamId & | id () const |
Return stream id. More... | |
size_t | my_host_rank () const |
Returns my_host_rank. More... | |
size_t | my_worker_rank () const |
Returns my_worker_rank_. More... | |
size_t | num_hosts () const |
Number of hosts in system. More... | |
size_t | num_workers () const |
Number of workers in system. More... | |
void | OnAllWritersClosed () |
method called when all StreamSink writers have finished More... | |
void | OnWriterClosed (size_t peer_worker_rank, bool sent) |
size_t | workers_per_host () const |
Returns workers_per_host. More... | |
Public Member Functions inherited from ReferenceCounter | |
ReferenceCounter () noexcept | |
new objects have zero reference count More... | |
ReferenceCounter (const ReferenceCounter &) noexcept | |
copying still creates a new object with zero reference count More... | |
~ReferenceCounter () | |
bool | dec_reference () const noexcept |
Call whenever resetting (i.e. More... | |
void | inc_reference () const noexcept |
Call whenever setting a pointer to the object. More... | |
ReferenceCounter & | operator= (const ReferenceCounter &) noexcept |
assignment operator, leaves pointers unchanged More... | |
size_t | reference_count () const noexcept |
Return the number of references to this object (for debugging) More... | |
bool | unique () const noexcept |
Test if the ReferenceCounter is referenced by only one CountingPtr. More... | |
Static Public Attributes | |
static constexpr bool | debug = false |
static constexpr bool | debug_data = false |
Static Public Attributes inherited from StreamData | |
static constexpr bool | debug = false |
Private Member Functions | |
BlockQueue * | loopback_queue (size_t from_worker_id) |
Returns the loopback queue for the worker of this stream. More... | |
void | OnStreamBlock (size_t from, uint32_t seq, Block &&b) |
void | OnStreamBlockOrdered (size_t from, Block &&b) |
Private Attributes | |
bool | is_closed_ = false |
std::vector< BlockQueue > | queues_ |
BlockQueues to store incoming Blocks with no attached destination. More... | |
std::vector< SeqReordering > | seq_ |
Block Sequence numbers. More... | |
Additional Inherited Members | |
Public Attributes inherited from StreamData | |
std::atomic< size_t > | rx_int_blocks_ { 0 } |
std::atomic< size_t > | rx_int_bytes_ { 0 } |
std::atomic< size_t > | rx_int_items_ { 0 } |
common::StatsTimerStart | rx_lifetime_ |
std::atomic< size_t > | rx_net_blocks_ { 0 } |
std::atomic< size_t > | rx_net_bytes_ { 0 } |
std::atomic< size_t > | rx_net_items_ { 0 } |
common::StatsTimerStopped | rx_timespan_ |
tlx::Semaphore | sem_queue_ |
std::atomic< size_t > | tx_int_blocks_ { 0 } |
std::atomic< size_t > | tx_int_bytes_ { 0 } |
std::atomic< size_t > | tx_int_items_ { 0 } |
common::StatsTimerStart | tx_lifetime_ |
Timers from creation of stream until rx / tx direction is closed. More... | |
std::atomic< size_t > | tx_net_blocks_ { 0 } |
std::atomic< size_t > | tx_net_bytes_ { 0 } |
std::atomic< size_t > | tx_net_items_ { 0 } |
common::StatsTimerStopped | tx_timespan_ |
Timers from first rx / tx package until rx / tx direction is closed. More... | |
Protected Attributes inherited from StreamData | |
bool | all_writers_closed_ = false |
bool if all writers were closed More... | |
size_t | dia_id_ |
associated DIANode id. More... | |
StreamId | id_ |
our own stream id. More... | |
size_t | local_worker_id_ |
local worker id More... | |
Multiplexer & | multiplexer_ |
reference to multiplexer More... | |
std::atomic< size_t > | remaining_closing_blocks_ |
tlx::Semaphore | sem_closing_blocks_ |
number of received stream closing Blocks. More... | |
StreamSetBase * | stream_set_base_ |
pointer to StreamSetBase containing this StreamData More... | |
size_t | writers_closed_ = 0 |
number of writers closed via StreamSink. More... | |
using BlockQueueReader = BlockReader<BlockQueueSource>
Definition at line 61 of file cat_stream.hpp.
using BlockQueueSource = ConsumeBlockQueueSource
Definition at line 60 of file cat_stream.hpp.
using CatBlockReader = BlockReader<CatBlockSource>
Definition at line 64 of file cat_stream.hpp.
using CatBlockSource = data::CatBlockSource<DynBlockSource>
Definition at line 63 of file cat_stream.hpp.
using CatReader = CatBlockReader
Definition at line 67 of file cat_stream.hpp.
using Handle = CatStream
Definition at line 69 of file cat_stream.hpp.
using Reader = BlockQueueReader
Definition at line 66 of file cat_stream.hpp.
CatStreamData (StreamSetBase *stream_set_base, Multiplexer &multiplexer, size_t send_size_limit, const StreamId &id, size_t local_worker_id, size_t dia_id)
Creates a new stream instance.
Definition at line 28 of file cat_stream.cpp.
References BlockQueue::block_counter(), Multiplexer::block_pool_, BlockQueue::byte_counter(), StreamData::id_, BlockQueue::item_counter(), Multiplexer::logger(), StreamData::multiplexer_, StreamData::my_host_rank(), StreamData::my_worker_rank(), StreamData::num_hosts(), StreamData::num_workers(), CatStreamData::queues_, StreamData::remaining_closing_blocks_, CatStreamData::seq_, and StreamData::workers_per_host().
CatStreamData (const CatStreamData &)
delete |
non-copyable: delete copy-constructor
CatStreamData (CatStreamData &&)
default |
move-constructor: default
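The combination of a deleted copy constructor, a deleted copy assignment operator and a defaulted move constructor is a common C++ idiom for resource-owning objects. The following sketch shows it on a hypothetical `ToyStreamData` class (not the real CatStreamData) and checks the resulting properties at compile time.

```cpp
#include <cassert>
#include <type_traits>
#include <utility>
#include <vector>

// Hypothetical class using the same special-member declarations as listed
// above: copying is forbidden, move construction is allowed.
class ToyStreamData {
public:
    ToyStreamData() = default;
    ToyStreamData(const ToyStreamData&) = delete;             // non-copyable
    ToyStreamData& operator=(const ToyStreamData&) = delete;  // non-copyable
    ToyStreamData(ToyStreamData&&) = default;                 // movable

    std::vector<int> queue;  // stands in for the owned inbound queues
};

static_assert(!std::is_copy_constructible<ToyStreamData>::value,
              "copy construction must be rejected");
static_assert(!std::is_copy_assignable<ToyStreamData>::value,
              "copy assignment must be rejected");
static_assert(std::is_move_constructible<ToyStreamData>::value,
              "move construction must be available");

// Moving transfers the owned queue to the new object.
inline ToyStreamData MoveInto(ToyStreamData&& source) {
    return ToyStreamData(std::move(source));
}
```

Forbidding copies avoids two objects believing they own the same queues, while the defaulted move constructor still allows such objects to be handed over to a new owner.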
~CatStreamData ()
final |
Definition at line 74 of file cat_stream.cpp.
References LOG.
void Close ()
final, virtual
shuts the stream down.
Implements StreamData.
Definition at line 193 of file cat_stream.cpp.
References Multiplexer::active_streams_, StreamData::all_writers_closed_, die_unless, StreamData::id(), StreamData::id_, Multiplexer::IntReleaseCatStream(), CatStreamData::is_closed_, StreamData::local_worker_id_, LOG, StreamData::multiplexer_, Multiplexer::mutex_, StreamData::my_host_rank(), StreamData::my_worker_rank(), CatStreamData::queues_, StreamData::sem_closing_blocks_, sLOG, Semaphore::wait(), and StreamData::workers_per_host().
bool closed () const
final, virtual
Indicates whether the stream is closed, meaning that all remaining streams have been closed. This does not include the loopback stream.
Implements StreamData.
Definition at line 223 of file cat_stream.cpp.
References CatStreamData::queues_.
CatStreamData::CatBlockSource GetCatBlockSource (bool consume)
Gets a CatBlockSource which includes all incoming queues of this stream.
Definition at line 169 of file cat_stream.cpp.
References StreamData::local_worker_id_, StreamData::num_workers(), CatStreamData::queues_, and StreamData::rx_timespan_.
Referenced by CatStreamData::GetCatReader().
CatStreamData::CatReader GetCatReader (bool consume)
Creates a BlockReader which concatenates items from all workers in worker rank order. The BlockReader is attached to one CatBlockSource which includes all incoming queues of this stream.
Definition at line 185 of file cat_stream.cpp.
References CatStreamData::GetCatBlockSource().
Referenced by CatStreamData::GetReader().
CatStreamData::CatReader GetReader (bool consume)
Open a CatReader (function name matches a method in File and MixStream).
Definition at line 189 of file cat_stream.cpp.
References CatStreamData::GetCatReader().
std::vector< CatStreamData::Reader > GetReaders ()
Creates a BlockReader for each worker. The BlockReaders are attached to the BlockQueues in the Stream and wait for further Blocks to arrive or the Stream's remote close. These Readers always consume!
Definition at line 154 of file cat_stream.cpp.
References StreamData::local_worker_id_, StreamData::num_workers(), CatStreamData::queues_, and StreamData::rx_timespan_.
Writers GetWriters ()
final, virtual
Creates a BlockWriter for each worker. The BlockWriters can only be opened once; otherwise the block sequence is interleaved incorrectly!
Implements StreamData.
Definition at line 89 of file cat_stream.cpp.
References Multiplexer::active_streams_, Multiplexer::block_pool_, Multiplexer::CatLoopback(), thrill::data::CatStreamBlock, Group::connection(), thrill::data::default_block_size, Multiplexer::group_, BlockPool::hard_ram_limit(), StreamData::id_, StreamData::local_worker_id_, LOGC, max(), Multiplexer::max_active_streams_, StreamData::multiplexer_, Multiplexer::mutex_, StreamData::my_host_rank(), StreamData::my_worker_rank(), StreamData::num_hosts(), Multiplexer::num_workers(), StreamData::num_workers(), tlx::round_down_to_power_of_two(), StreamData::StreamSink, StreamData::tx_timespan_, Multiplexer::workers_per_host(), and StreamData::workers_per_host().
bool is_queue_closed (size_t from)
check if inbound queue is closed
Definition at line 231 of file cat_stream.cpp.
References CatStreamData::queues_, and CatStreamData::seq_.
BlockQueue * loopback_queue (size_t from_worker_id)
private |
Returns the loopback queue for the worker of this stream.
Definition at line 316 of file cat_stream.cpp.
References StreamData::local_worker_id_, StreamData::my_host_rank(), CatStreamData::queues_, sLOG, and StreamData::workers_per_host().
void OnStreamBlock (size_t from, uint32_t seq, Block &&b)
private
Called from the Multiplexer when there is a new Block on a Stream.
Definition at line 243 of file cat_stream.cpp.
References CatStreamData::debug_data, die_unless, StreamMultiplexerHeader::final_seq, tlx::hexdump(), StreamData::id_, StreamData::local_worker_id_, LOG, CatStreamData::OnStreamBlockOrdered(), CatStreamData::queues_, StreamData::rx_timespan_, CatStreamData::seq_, sLOG, and TLX_UNLIKELY.
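The signatures suggest that OnStreamBlock() receives Blocks tagged with a sequence number and hands them to OnStreamBlockOrdered() once they are in order. How Thrill implements this is not shown on this page; the sketch below only illustrates the general reordering technique with a hypothetical `ToySeqReordering` class that uses strings in place of Blocks and assumes sequence numbers start at 0 and are never repeated.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical per-sender reorder buffer (NOT Thrill's SeqReordering).
class ToySeqReordering {
public:
    // Accept a block with sequence number `seq` and return every block
    // that can now be delivered in sequence order (possibly none).
    std::vector<std::string> OnBlock(std::uint32_t seq, std::string block) {
        std::vector<std::string> ready;
        if (seq != next_seq_) {
            // Arrived early: park it until the gap before it is filled.
            assert(seq > next_seq_ && "duplicate or stale sequence number");
            waiting_.emplace(seq, std::move(block));
            return ready;
        }
        ready.push_back(std::move(block));
        ++next_seq_;
        // Drain parked blocks that directly follow the delivered one.
        auto it = waiting_.begin();
        while (it != waiting_.end() && it->first == next_seq_) {
            ready.push_back(std::move(it->second));
            it = waiting_.erase(it);
            ++next_seq_;
        }
        return ready;
    }

    // Number of blocks still parked because an earlier one is missing.
    std::size_t waiting() const { return waiting_.size(); }

private:
    std::uint32_t next_seq_ = 0;
    std::map<std::uint32_t, std::string> waiting_;
};
```

Keeping one such buffer per sending worker would match the `std::vector< SeqReordering > seq_` member in shape, although that correspondence is an inference from the member list rather than something this page states.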
void OnStreamBlockOrdered (size_t from, Block &&b)
private |
Definition at line 287 of file cat_stream.cpp.
References die_unless, StreamData::id_, StreamData::my_worker_rank(), CatStreamData::queues_, StreamData::remaining_closing_blocks_, StreamData::rx_lifetime_, StreamData::rx_net_blocks_, StreamData::rx_net_bytes_, StreamData::rx_net_items_, StreamData::rx_timespan_, StreamData::sem_closing_blocks_, CatStreamData::seq_, Semaphore::signal(), and sLOG.
Referenced by CatStreamData::OnStreamBlock().
CatStreamData & operator= (const CatStreamData &)
delete |
non-copyable: delete assignment operator
void set_dia_id (size_t dia_id)
change dia_id after construction (needed because it may be unknown at construction)
Definition at line 78 of file cat_stream.cpp.
References StreamData::dia_id_, and CatStreamData::queues_.
const char * stream_type ()
final, virtual
return stream type string
constexpr bool debug = false
static |
Definition at line 57 of file cat_stream.hpp.
constexpr bool debug_data = false
static |
Definition at line 58 of file cat_stream.hpp.
Referenced by CatStreamData::OnStreamBlock().
bool is_closed_ = false
private |
Definition at line 123 of file cat_stream.hpp.
Referenced by CatStreamData::Close().
std::vector< BlockQueue > queues_
private |
BlockQueues to store incoming Blocks with no attached destination.
Definition at line 131 of file cat_stream.hpp.
Referenced by CatStreamData::CatStreamData(), CatStreamData::Close(), CatStreamData::closed(), CatStreamData::GetCatBlockSource(), CatStreamData::GetReaders(), CatStreamData::is_queue_closed(), CatStreamData::loopback_queue(), CatStreamData::OnStreamBlock(), CatStreamData::OnStreamBlockOrdered(), and CatStreamData::set_dia_id().
std::vector< SeqReordering > seq_
private |
Block Sequence numbers.
Definition at line 125 of file cat_stream.hpp.
Referenced by CatStreamData::CatStreamData(), CatStreamData::is_queue_closed(), CatStreamData::OnStreamBlock(), and CatStreamData::OnStreamBlockOrdered().