Thrill
0.1
A Stream is a virtual set of connections to all other worker instances; it bundles them into one logical communication context.
We call an individual connection from a worker to another worker a "Host".
To use a Stream, obtain a vector of BlockWriters from the outbound Stream via GetWriters(). The vector has one entry per worker in the system, and items written to an entry are destined for the corresponding worker. Written items are buffered in a Block and sent only when the Block is full. To force a send, use BlockWriter::Flush(). When all items have been sent, every BlockWriter must be closed using BlockWriter::Close().
The MixStream allows reading of items from all workers in an unordered sequence, without waiting for any of the workers to complete sending items.
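The buffering behaviour described above (items collect in a Block, the Block is sent when full, Flush() forces a send, Close() finishes the connection) can be illustrated with a small stand-alone model. This is only a sketch: ToySink, ToyBlockWriter and MakeWriters are hypothetical names invented for the illustration and are not part of the Thrill API, and the fixed item-count capacity is an assumption.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical stand-in for the connection to one worker: it only records
// the blocks that were "sent" so the buffering behaviour can be observed.
struct ToySink {
    std::vector<std::vector<int>> sent_blocks;
    bool closed = false;
};

// Hypothetical simplified writer (not Thrill's BlockWriter): buffers items
// into a block of fixed capacity and hands the block to the sink only when
// it is full, on Flush(), or on Close().
class ToyBlockWriter {
public:
    ToyBlockWriter(ToySink& sink, std::size_t block_capacity)
        : sink_(&sink), capacity_(block_capacity) {}

    void Put(int item) {
        assert(!sink_->closed);  // writing after Close() is a usage error
        block_.push_back(item);
        if (block_.size() == capacity_) Flush();
    }

    // Force out a partially filled block; does nothing if the block is empty.
    void Flush() {
        if (block_.empty()) return;
        sink_->sent_blocks.push_back(std::move(block_));
        block_.clear();  // return the moved-from buffer to a known empty state
    }

    // Send any remaining items and mark the connection as finished.
    void Close() {
        Flush();
        sink_->closed = true;
    }

private:
    ToySink* sink_;
    std::size_t capacity_;
    std::vector<int> block_;
};

// One writer per worker, mirroring "the vector has one entry per worker".
std::vector<ToyBlockWriter> MakeWriters(std::vector<ToySink>& sinks,
                                        std::size_t block_capacity) {
    std::vector<ToyBlockWriter> writers;
    writers.reserve(sinks.size());
    for (ToySink& sink : sinks) writers.emplace_back(sink, block_capacity);
    return writers;
}
```

In this model, writing one item to a writer with capacity two sends nothing until a second item arrives or Flush() is called, and every writer must still be closed even if nothing was written to it.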
Definition at line 44 of file mix_stream.hpp.
#include <mix_stream.hpp>
Public Types
using | Handle = MixStream
using | MixReader = MixBlockQueueReader
Public Types inherited from StreamData
using | Writer = BlockWriter< StreamSink >
Public Member Functions
MixStreamData (StreamSetBase *stream_set_base, Multiplexer &multiplexer, size_t send_size_limit, const StreamId &id, size_t local_worker_id, size_t dia_id)
Creates a new stream instance.
MixStreamData (const MixStreamData &)=delete
non-copyable: delete copy-constructor
MixStreamData (MixStreamData &&)=default
move-constructor: default
~MixStreamData () final
void | Close () final
shuts the stream down.
bool | closed () const final
MixReader | GetMixReader (bool consume)
Creates a BlockReader which mixes items from all workers.
MixReader | GetReader (bool consume)
Open a MixReader (function name matches a method in File and CatStream).
Writers | GetWriters () final
bool | is_queue_closed (size_t from)
check if inbound queue is closed
MixStreamData & | operator= (const MixStreamData &)=delete
non-copyable: delete assignment operator
void | set_dia_id (size_t dia_id)
const char * | stream_type () final
return stream type string
Public Member Functions inherited from StreamData
StreamData (StreamSetBase *stream_set_base, Multiplexer &multiplexer, size_t send_size_limit, const StreamId &id, size_t local_worker_id, size_t dia_id)
virtual | ~StreamData ()
const StreamId & | id () const
Return stream id.
size_t | my_host_rank () const
Returns my_host_rank.
size_t | my_worker_rank () const
Returns my_worker_rank_.
size_t | num_hosts () const
Number of hosts in system.
size_t | num_workers () const
Number of workers in system.
void | OnAllWritersClosed ()
method called when all StreamSink writers have finished
void | OnWriterClosed (size_t peer_worker_rank, bool sent)
size_t | workers_per_host () const
Returns workers_per_host.
Public Member Functions inherited from ReferenceCounter
ReferenceCounter () noexcept
new objects have zero reference count
ReferenceCounter (const ReferenceCounter &) noexcept
copying still creates a new object with zero reference count
~ReferenceCounter ()
bool | dec_reference () const noexcept
Call whenever resetting (i.e. More... | |
void | inc_reference () const noexcept
Call whenever setting a pointer to the object.
ReferenceCounter & | operator= (const ReferenceCounter &) noexcept
assignment operator, leaves pointers unchanged
size_t | reference_count () const noexcept
Return the number of references to this object (for debugging)
bool | unique () const noexcept
Test if the ReferenceCounter is referenced by only one CountingPtr.
Private Member Functions
void | OnStreamBlock (size_t from, uint32_t seq, Block &&b)
called from Multiplexer when there is a new Block for this Stream.
void | OnStreamBlockOrdered (size_t from, Block &&b)
called to process PinnedBlock in sequence
Private Attributes
bool | is_closed_ = false
flag if Close() was completed
MixBlockQueue | queue_
BlockQueue to store incoming Blocks with source.
std::vector< SeqReordering > | seq_
Block Sequence numbers.
Static Private Attributes
static constexpr bool | debug = false
Additional Inherited Members
Public Attributes inherited from StreamData
std::atomic< size_t > | rx_int_blocks_ { 0 }
std::atomic< size_t > | rx_int_bytes_ { 0 }
std::atomic< size_t > | rx_int_items_ { 0 }
common::StatsTimerStart | rx_lifetime_
std::atomic< size_t > | rx_net_blocks_ { 0 }
std::atomic< size_t > | rx_net_bytes_ { 0 }
std::atomic< size_t > | rx_net_items_ { 0 }
common::StatsTimerStopped | rx_timespan_
tlx::Semaphore | sem_queue_
std::atomic< size_t > | tx_int_blocks_ { 0 }
std::atomic< size_t > | tx_int_bytes_ { 0 }
std::atomic< size_t > | tx_int_items_ { 0 }
common::StatsTimerStart | tx_lifetime_
Timers from creation of stream until rx / tx direction is closed.
std::atomic< size_t > | tx_net_blocks_ { 0 }
std::atomic< size_t > | tx_net_bytes_ { 0 }
std::atomic< size_t > | tx_net_items_ { 0 }
common::StatsTimerStopped | tx_timespan_
Timers from first rx / tx package until rx / tx direction is closed.
Static Public Attributes inherited from StreamData
static constexpr bool | debug = false
Protected Attributes inherited from StreamData
bool | all_writers_closed_ = false
bool if all writers were closed
size_t | dia_id_
associated DIANode id.
StreamId | id_
our own stream id.
size_t | local_worker_id_
local worker id
Multiplexer & | multiplexer_
reference to multiplexer
std::atomic< size_t > | remaining_closing_blocks_
tlx::Semaphore | sem_closing_blocks_
number of received stream closing Blocks.
StreamSetBase * | stream_set_base_
pointer to StreamSetBase containing this StreamData
size_t | writers_closed_ = 0
number of writers closed via StreamSink.
using Handle = MixStream
Definition at line 51 of file mix_stream.hpp.
using MixReader = MixBlockQueueReader
Definition at line 49 of file mix_stream.hpp.
MixStreamData (StreamSetBase *stream_set_base, Multiplexer &multiplexer, size_t send_size_limit, const StreamId &id, size_t local_worker_id, size_t dia_id)
Creates a new stream instance.
Definition at line 27 of file mix_stream.cpp.
References StreamData::num_hosts(), StreamData::remaining_closing_blocks_, and StreamData::workers_per_host().
MixStreamData (const MixStreamData &) [delete]
non-copyable: delete copy-constructor
MixStreamData (MixStreamData &&) [default]
move-constructor: default
~MixStreamData () [final]
Definition at line 39 of file mix_stream.cpp.
References LOG.
void Close () [final, virtual]
shuts the stream down.
Implements StreamData.
Definition at line 124 of file mix_stream.cpp.
References Multiplexer::active_streams_, StreamData::all_writers_closed_, die_unless, StreamData::id_, Multiplexer::IntReleaseMixStream(), MixStreamData::is_closed_, StreamData::local_worker_id_, LOG, StreamData::multiplexer_, Multiplexer::mutex_, StreamData::num_hosts(), StreamData::sem_closing_blocks_, Semaphore::value(), Semaphore::wait(), and StreamData::workers_per_host().
bool closed () const [final, virtual]
Indicates if the stream is closed, meaning all remaining outbound queues have been closed.
Implements StreamData.
Definition at line 149 of file mix_stream.cpp.
References MixStreamData::is_closed_, MixStreamData::queue_, and MixBlockQueue::write_closed().
MixStreamData::MixReader GetMixReader (bool consume)
Creates a BlockReader which mixes items from all workers.
Definition at line 115 of file mix_stream.cpp.
References StreamData::local_worker_id_, MixStreamData::queue_, and StreamData::rx_timespan_.
Referenced by MixStreamData::GetReader().
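To make "mixes items from all workers" concrete, the following single-threaded sketch models a mixing queue: items from every source land in one shared queue in arrival order, each tagged with its source, and a reader drains them without waiting for any one source to finish. ToyMixQueue and its members are hypothetical names for this illustration (only is_queue_closed borrows its name from the method documented on this page); the real MixBlockQueue stores Blocks and is used concurrently, which this model does not attempt to reproduce.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <utility>
#include <vector>

// Hypothetical single-threaded model of a mixing queue (not Thrill's
// MixBlockQueue): one shared queue holds (source, item) pairs in the order
// in which they arrived.
class ToyMixQueue {
public:
    explicit ToyMixQueue(std::size_t num_sources)
        : closed_(num_sources, false), open_sources_(num_sources) {}

    // A source delivers one item.
    void Append(std::size_t from, int item) {
        assert(!closed_[from]);
        queue_.emplace_back(from, item);
    }

    // A source announces that it will send nothing more.
    void Close(std::size_t from) {
        assert(!closed_[from]);
        closed_[from] = true;
        --open_sources_;
    }

    bool is_queue_closed(std::size_t from) const { return closed_[from]; }

    bool HasNext() const { return !queue_.empty(); }

    // True once every source has closed and every item has been read.
    bool Exhausted() const { return open_sources_ == 0 && queue_.empty(); }

    // Returns the next (source, item) pair in arrival order, regardless of
    // which source produced it.
    std::pair<std::size_t, int> Next() {
        assert(!queue_.empty());
        std::pair<std::size_t, int> front = queue_.front();
        queue_.pop_front();
        return front;
    }

private:
    std::deque<std::pair<std::size_t, int>> queue_;
    std::vector<bool> closed_;
    std::size_t open_sources_;
};
```

The point of the design is visible in the read loop: an item from source 1 can be consumed before source 0 has sent anything, which is what distinguishes a mixing reader from one that concatenates sources in rank order.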
MixStreamData::MixReader GetReader (bool consume)
Open a MixReader (function name matches a method in File and CatStream).
Definition at line 120 of file mix_stream.cpp.
References MixStreamData::GetMixReader().
Writers GetWriters () [final, virtual]
Creates a BlockWriter for each worker. The BlockWriters can only be opened once; otherwise the block sequence is incorrectly interleaved!
Implements StreamData.
Definition at line 52 of file mix_stream.cpp.
References Multiplexer::active_streams_, Multiplexer::block_pool_, Group::connection(), thrill::data::default_block_size, Multiplexer::group_, BlockPool::hard_ram_limit(), StreamData::id_, StreamData::local_worker_id_, LOGC, max(), Multiplexer::max_active_streams_, Multiplexer::MixLoopback(), thrill::data::MixStreamBlock, StreamData::multiplexer_, Multiplexer::mutex_, StreamData::my_host_rank(), StreamData::my_worker_rank(), StreamData::num_hosts(), Multiplexer::num_workers(), StreamData::num_workers(), tlx::round_down_to_power_of_two(), MixStreamData::StreamSink, StreamData::tx_timespan_, Multiplexer::workers_per_host(), and StreamData::workers_per_host().
bool is_queue_closed (size_t from)
check if inbound queue is closed
Definition at line 156 of file mix_stream.cpp.
References MixBlockQueue::is_queue_closed(), MixStreamData::queue_, and MixStreamData::seq_.
void OnStreamBlock (size_t from, uint32_t seq, Block &&b) [private]
called from Multiplexer when there is a new Block for this Stream.
Definition at line 168 of file mix_stream.cpp.
References die_unless, StreamMultiplexerHeader::final_seq, StreamData::id_, StreamData::my_worker_rank(), StreamData::num_workers(), MixStreamData::OnStreamBlockOrdered(), StreamData::rx_timespan_, MixStreamData::seq_, sLOG, and TLX_UNLIKELY.
void OnStreamBlockOrdered (size_t from, Block &&b) [private]
called to process PinnedBlock in sequence
Definition at line 206 of file mix_stream.cpp.
References MixBlockQueue::AppendBlock(), MixBlockQueue::Close(), die_unless, StreamData::id_, StreamData::my_worker_rank(), MixStreamData::queue_, StreamData::remaining_closing_blocks_, StreamData::rx_lifetime_, StreamData::rx_net_blocks_, StreamData::rx_net_bytes_, StreamData::rx_net_items_, StreamData::rx_timespan_, StreamData::sem_closing_blocks_, MixStreamData::seq_, Semaphore::signal(), and sLOG.
Referenced by MixStreamData::OnStreamBlock().
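The split between OnStreamBlock (which receives a sequence number) and OnStreamBlockOrdered (which processes blocks in sequence) suggests a per-sender reordering step. The sketch below shows one common way to do that: blocks that arrive early are parked until the next expected sequence number shows up. It is an assumption about the general technique, not a description of Thrill's SeqReordering, whose layout is not shown on this page; ToySeqReordering and ToyOrderedReceiver are hypothetical names, and blocks are modelled as strings.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical per-sender state: the next sequence number that may be
// delivered, plus the blocks that arrived ahead of it.
struct ToySeqReordering {
    std::uint32_t next_seq = 0;
    std::map<std::uint32_t, std::string> waiting;
};

// Hypothetical receiver that accepts blocks in any arrival order and
// delivers them per sender in sequence-number order.
class ToyOrderedReceiver {
public:
    explicit ToyOrderedReceiver(std::size_t num_senders)
        : seq_(num_senders), delivered_(num_senders) {}

    // Entry point for blocks in arbitrary arrival order.
    void OnBlock(std::size_t from, std::uint32_t seq, std::string block) {
        ToySeqReordering& state = seq_[from];
        if (seq != state.next_seq) {
            // Too early: park the block until its predecessors arrive.
            state.waiting.emplace(seq, std::move(block));
            return;
        }
        OnBlockOrdered(from, std::move(block));
        ++state.next_seq;
        // Release every parked block that has now become deliverable.
        auto it = state.waiting.find(state.next_seq);
        while (it != state.waiting.end()) {
            OnBlockOrdered(from, std::move(it->second));
            state.waiting.erase(it);
            ++state.next_seq;
            it = state.waiting.find(state.next_seq);
        }
    }

    const std::vector<std::string>& delivered(std::size_t from) const {
        return delivered_[from];
    }

private:
    // Called strictly in sequence order for each sender.
    void OnBlockOrdered(std::size_t from, std::string block) {
        delivered_[from].push_back(std::move(block));
    }

    std::vector<ToySeqReordering> seq_;
    std::vector<std::vector<std::string>> delivered_;
};
```

Ordering is enforced per sender only; blocks from different senders remain free to interleave, which is consistent with the unordered reading the MixStream offers.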
MixStreamData & operator= (const MixStreamData &) [delete]
non-copyable: delete assignment operator
void set_dia_id (size_t dia_id)
change dia_id after construction (needed because it may be unknown at construction)
Definition at line 43 of file mix_stream.cpp.
References StreamData::dia_id_, MixStreamData::queue_, and MixBlockQueue::set_dia_id().
const char * stream_type () [final, virtual]
return stream type string
constexpr bool debug = false [static, private]
Definition at line 46 of file mix_stream.hpp.
bool is_closed_ = false [private]
flag if Close() was completed
Definition at line 96 of file mix_stream.hpp.
Referenced by MixStreamData::Close(), and MixStreamData::closed().
MixBlockQueue queue_ [private]
BlockQueue to store incoming Blocks with source.
Definition at line 104 of file mix_stream.hpp.
Referenced by MixStreamData::closed(), MixStreamData::GetMixReader(), MixStreamData::is_queue_closed(), MixStreamData::OnStreamBlockOrdered(), and MixStreamData::set_dia_id().
std::vector< SeqReordering > seq_ [private]
Block Sequence numbers.
Definition at line 98 of file mix_stream.hpp.
Referenced by MixStreamData::is_queue_closed(), MixStreamData::OnStreamBlock(), and MixStreamData::OnStreamBlockOrdered().