Thrill
0.1
Implements reading an unordered sequence of items from multiple workers, each of which sends Blocks.
This class is mainly used to implement MixChannel.
When Blocks arrive from the net, the Multiplexer pushes (src, Blocks) pairs to MixChannel, which pushes them into a MixBlockQueue. The MixBlockQueue stores these in a ConcurrentBoundedQueue for atomic reading.
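The flow of (src, Block) pairs into the main mix queue can be pictured with a minimal sketch. Everything named `Sketch...` here is hypothetical and not part of Thrill: the real class stores pairs in a `common::ConcurrentBoundedQueue`, whereas this sketch assumes a plain mutex-guarded `std::deque`, and it assumes that an empty payload plays the role of the closing sentinel Block.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a Block; an empty payload marks the closing sentinel.
struct SketchBlock {
    std::string payload;
    bool IsValid() const { return !payload.empty(); }
};

// (source worker, Block) pair, playing the role of SrcBlockPair.
struct SketchSrcBlockPair {
    std::size_t src;
    SketchBlock block;
};

// Simplified mix queue (assumption: unbounded and mutex-guarded).
class SketchMixQueue {
public:
    explicit SketchMixQueue(std::size_t num_workers)
        : write_closed_(num_workers, 0),
          write_open_count_(num_workers),
          read_open_(num_workers) { }

    // Append a data block that arrived from worker src.
    void AppendBlock(std::size_t src, SketchBlock block) {
        Push({ src, std::move(block) });
    }

    // Mark src as closed and append its closing sentinel.
    void Close(std::size_t src) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            assert(!write_closed_[src]);
            write_closed_[src] = 1;
            --write_open_count_;
        }
        Push({ src, SketchBlock{} });
    }

    // Blocking retrieval of the next pair in reception order.
    SketchSrcBlockPair Pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return !queue_.empty(); });
        SketchSrcBlockPair pair = std::move(queue_.front());
        queue_.pop_front();
        if (!pair.block.IsValid()) --read_open_;  // a sentinel was handed out
        return pair;
    }

    // True once Close() was called for every source.
    bool write_closed() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return write_open_count_ == 0;
    }

    // True once Pop() has returned every source's sentinel.
    bool read_closed() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return read_open_ == 0;
    }

private:
    void Push(SketchSrcBlockPair pair) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push_back(std::move(pair));
        }
        cv_.notify_one();
    }

    mutable std::mutex mutex_;
    std::condition_variable cv_;
    std::deque<SketchSrcBlockPair> queue_;
    std::vector<unsigned char> write_closed_;
    std::size_t write_open_count_;
    std::size_t read_open_;
};
```

In this sketch the writer-side counter reaches zero as soon as all sources have called `Close()`, while the reader-side counter only reaches zero after `Pop()` has handed out every sentinel, which is the distinction drawn by `write_closed()` and `read_closed()`.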
When the MixChannel should be read, MixBlockQueueReader is used, which retrieves Blocks from the queue. The Reader contains one complete BlockReader for each inbound worker, and these BlockReaders are attached to BlockQueue instances inside the MixBlockQueue.
To enable unordered reading from multiple workers, the only remaining thing to do is to fetch Blocks from the main mix queue and put them into the right BlockQueue for the sub-readers to consume. By taking the Blocks from the main mix queue, the Reader only blocks when no inbound Blocks are available.
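The dispatch step described above can be sketched as follows. This is a single-threaded illustration under stated assumptions, not Thrill's implementation: `DispatchPair` and `DispatchSketch` are hypothetical names, Blocks are reduced to strings, and an empty main queue makes `PullBlock()` return `false` where the real reader would block.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>
#include <utility>
#include <vector>

// Hypothetical (source worker, block) pair; the block is just a string here.
struct DispatchPair {
    std::size_t src;
    std::string block;
};

// Moves blocks from one main mix queue into one queue per source worker.
class DispatchSketch {
public:
    DispatchSketch(std::deque<DispatchPair>& mix_queue, std::size_t num_workers)
        : mix_queue_(mix_queue), queues_(num_workers) { }

    // Take the next pair in reception order and append its block to the
    // queue of the worker it came from. Returns false if nothing is pending
    // (assumption: the real reader would block at this point instead).
    bool PullBlock() {
        if (mix_queue_.empty()) return false;
        DispatchPair pair = std::move(mix_queue_.front());
        mix_queue_.pop_front();
        assert(pair.src < queues_.size());
        queues_[pair.src].push_back(std::move(pair.block));
        return true;
    }

    // Per-source queue that a sub-reader for src would consume.
    const std::deque<std::string>& queue(std::size_t src) const {
        return queues_[src];
    }

private:
    std::deque<DispatchPair>& mix_queue_;
    std::vector<std::deque<std::string> > queues_;
};
```

Within each per-source queue the blocks keep the order in which that source sent them, even though blocks from different sources interleave arbitrarily in the main queue.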
To enable switching between items from different workers, the MixBlockQueueReader keeps track of how many whole items are available on each reader. This number is one less than the number of items known to start in the received Blocks: the last item may span further Blocks, and it cannot be fetched without blocking the sub-reader indefinitely, since no thread will deliver the next Block.
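The bookkeeping of whole items can be illustrated with a small counter. `WholeItemCounter` is a hypothetical helper, not a Thrill class. It also assumes that once a source's closing sentinel has arrived, its last started item is complete and may be read; the description above does not state this explicitly.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Tracks, per source worker, how many items can be read without blocking.
class WholeItemCounter {
public:
    explicit WholeItemCounter(std::size_t num_workers)
        : started_(num_workers, 0), closed_(num_workers, false) { }

    // A block from src arrived in which num_items items start.
    void OnBlock(std::size_t src, std::size_t num_items) {
        started_[src] += num_items;
    }

    // The closing sentinel from src arrived (assumption: the last started
    // item can no longer continue into a further block).
    void OnClose(std::size_t src) {
        closed_[src] = true;
    }

    // One whole item from src was consumed by the reader.
    void OnItemRead(std::size_t src) {
        assert(Available(src) > 0);
        --started_[src];
    }

    // Number of items from src that are certainly complete.
    std::size_t Available(std::size_t src) const {
        if (closed_[src]) return started_[src];
        // The most recently started item may span into a block that has
        // not arrived yet, so it is withheld.
        return started_[src] > 0 ? started_[src] - 1 : 0;
    }

private:
    std::vector<std::size_t> started_;  // started but not yet read items
    std::vector<bool> closed_;          // closing sentinel seen per source
};
```

A reader built on such a counter would only switch to a source whose `Available()` value is positive, and would otherwise pull another block from the main mix queue.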
Definition at line 64 of file mix_block_queue.hpp.
#include <mix_block_queue.hpp>
Classes
struct SrcBlockPair
pair of (source worker, Block) stored in the main mix queue.
Public Types
using Reader = MixBlockQueueReader
Public Member Functions
MixBlockQueue (BlockPool &block_pool, size_t num_workers, size_t local_worker_id, size_t dia_id)
Constructor from BlockPool.
MixBlockQueue (const MixBlockQueue &)=delete
non-copyable: delete copy-constructor
MixBlockQueue (MixBlockQueue &&)=default
move-constructor: default
void AppendBlock (size_t src, const Block &block)
append block delivered via the network from src.
void AppendBlock (size_t src, Block &&block)
append block delivered via the network from src.
BlockPool & block_pool ()
return block pool
void Close (size_t src)
append closing sentinel block from src (also delivered via the network).
bool is_queue_closed (size_t src)
check if inbound queue is closed
MixBlockQueue & operator= (const MixBlockQueue &)=delete
non-copyable: delete assignment operator
MixBlockQueue & operator= (MixBlockQueue &&)=default
move-assignment operator: default
SrcBlockPair Pop ()
Blocking retrieval of a (source,block) pair.
bool read_closed () const
check if reader side has returned a closing sentinel block
void set_dia_id (size_t dia_id)
bool write_closed () const
check if writer side Close() was called.
Private Attributes
BlockPool & block_pool_
size_t local_worker_id_
common::ConcurrentBoundedQueue< SrcBlockPair > mix_queue_
the main mix queue, containing the block in the reception order.
size_t num_workers_
total number of workers in system.
std::vector< BlockQueue > queues_
BlockQueues to deliver blocks to from mix queue.
size_t read_open_ = num_workers_
std::vector< unsigned char > write_closed_
flag to test for closed sources
common::AtomicMovable< size_t > write_open_count_ { num_workers_ }
counter on number of writers still open.
Static Private Attributes
static constexpr bool debug = false
using Reader = MixBlockQueueReader |
Definition at line 75 of file mix_block_queue.hpp.
MixBlockQueue (BlockPool &block_pool, size_t num_workers, size_t local_worker_id, size_t dia_id)
Constructor from BlockPool.
Definition at line 22 of file mix_block_queue.cpp.
References MixBlockQueue::block_pool_, and MixBlockQueue::queues_.
MixBlockQueue (const MixBlockQueue &)=delete
non-copyable: delete copy-constructor
MixBlockQueue (MixBlockQueue &&)=default
move-constructor: default
void AppendBlock (size_t src, const Block &block)
append block delivered via the network from src.
Definition at line 40 of file mix_block_queue.cpp.
References LOG, and MixBlockQueue::mix_queue_.
Referenced by MixBlockQueue::block_pool(), and MixStreamData::OnStreamBlockOrdered().
void AppendBlock (size_t src, Block &&block)
append block delivered via the network from src.
Definition at line 46 of file mix_block_queue.cpp.
References LOG, and MixBlockQueue::mix_queue_.
BlockPool & block_pool () [inline]
return block pool
Definition at line 95 of file mix_block_queue.hpp.
References MixBlockQueue::AppendBlock(), MixBlockQueue::SrcBlockPair::block, MixBlockQueue::block_pool_, MixBlockQueue::Close(), MixBlockQueue::Pop(), and MixBlockQueue::SrcBlockPair::src.
void Close (size_t src)
append closing sentinel block from src (also delivered via the network).
Definition at line 52 of file mix_block_queue.cpp.
References MixBlockQueue::local_worker_id_, LOG, MixBlockQueue::mix_queue_, MixBlockQueue::write_closed_, and MixBlockQueue::write_open_count_.
Referenced by MixBlockQueue::block_pool(), and MixStreamData::OnStreamBlockOrdered().
bool is_queue_closed (size_t src)
check if inbound queue is closed
Definition at line 65 of file mix_block_queue.cpp.
References MixBlockQueue::write_closed_.
Referenced by MixStreamData::is_queue_closed(), and MixBlockQueue::read_closed().
MixBlockQueue & operator= (const MixBlockQueue &)=delete
non-copyable: delete assignment operator
MixBlockQueue & operator= (MixBlockQueue &&)=default
move-assignment operator: default
MixBlockQueue::SrcBlockPair Pop ()
Blocking retrieval of a (source,block) pair.
Definition at line 69 of file mix_block_queue.cpp.
References MixBlockQueue::SrcBlockPair::block, Block::IsValid(), LOG, MixBlockQueue::mix_queue_, and MixBlockQueue::read_open_.
Referenced by MixBlockQueue::block_pool(), and MixBlockQueueReader::PullBlock().
bool read_closed () const [inline]
check if reader side has returned a closing sentinel block
Definition at line 113 of file mix_block_queue.hpp.
References MixBlockQueue::is_queue_closed(), and MixBlockQueue::read_open_.
void set_dia_id (size_t dia_id)
change dia_id after construction (needed because it may be unknown at construction)
Definition at line 34 of file mix_block_queue.cpp.
References MixBlockQueue::queues_.
Referenced by MixStreamData::set_dia_id().
bool write_closed () const [inline]
check if writer side Close() was called.
Definition at line 110 of file mix_block_queue.hpp.
References MixBlockQueue::write_open_count_.
Referenced by MixStreamData::closed().
BlockPool & block_pool_ [private]
Definition at line 119 of file mix_block_queue.hpp.
Referenced by MixBlockQueue::block_pool(), and MixBlockQueue::MixBlockQueue().
constexpr bool debug = false [static, private]
Definition at line 66 of file mix_block_queue.hpp.
size_t local_worker_id_ [private]
Definition at line 121 of file mix_block_queue.hpp.
Referenced by MixBlockQueue::Close().
common::ConcurrentBoundedQueue< SrcBlockPair > mix_queue_ [private]
the main mix queue, containing the block in the reception order.
Definition at line 124 of file mix_block_queue.hpp.
Referenced by MixBlockQueue::AppendBlock(), MixBlockQueue::Close(), and MixBlockQueue::Pop().
size_t num_workers_ [private]
total number of workers in system.
Definition at line 127 of file mix_block_queue.hpp.
Referenced by MixBlockQueueReader::MixBlockQueueReader().
std::vector< BlockQueue > queues_ [private]
BlockQueues to deliver blocks to from mix queue.
Definition at line 140 of file mix_block_queue.hpp.
Referenced by MixBlockQueue::MixBlockQueue(), MixBlockQueueReader::MixBlockQueueReader(), MixBlockQueueReader::PullBlock(), and MixBlockQueue::set_dia_id().
size_t read_open_ = num_workers_ [private]
number of sources for which Pop() has not yet returned a closing Block, i.e. for which the reader has not yet received the close message from the writer.
Definition at line 137 of file mix_block_queue.hpp.
Referenced by MixBlockQueue::Pop(), and MixBlockQueue::read_closed().
std::vector< unsigned char > write_closed_ [private]
flag to test for closed sources
Definition at line 133 of file mix_block_queue.hpp.
Referenced by MixBlockQueue::Close(), and MixBlockQueue::is_queue_closed().
common::AtomicMovable< size_t > write_open_count_ { num_workers_ } [private]
counter on number of writers still open.
Definition at line 130 of file mix_block_queue.hpp.
Referenced by MixBlockQueue::Close(), and MixBlockQueue::write_closed().