23 size_t local_worker_id,
size_t dia_id)
24 : block_pool_(block_pool),
25 local_worker_id_(local_worker_id),
26 num_workers_(num_workers),
27 write_closed_(num_workers) {
29 for (
size_t w = 0; w < num_workers; ++w) {
35 for (
size_t i = 0; i <
queues_.size(); ++i) {
41 LOG <<
"MixBlockQueue::AppendBlock" 42 <<
" src=" << src <<
" block=" << block;
47 LOG <<
"MixBlockQueue::AppendBlock" 48 <<
" src=" << src <<
" block=" << block;
53 LOG <<
"MixBlockQueue::Close()" 77 LOG <<
"MixBlockQueue()" 88 MixBlockQueue& mix_queue,
bool consume,
size_t local_worker_id)
103 std::vector<DynBlockSource> result;
106 consume, local_worker_id));
122 LOG <<
"MixBlockQueueReader::PullBlock()" 123 <<
" still open_=" <<
open_ 124 <<
" src=" << src_blk.
src <<
" block=" << src_blk.
block 139 std::move(src_blk.
block),
false);
142 available_at_[src_blk.
src] += num_items;
153 std::move(src_blk.
block),
false);
156 if (available_at_[src_blk.
src]) {
157 assert(available_at_[src_blk.
src] == 1);
162 else if (
open_ == 0)
return false;
165 LOG <<
"MixBlockQueueReader::PullBlock() afterwards" 168 <<
" available_at_[src]=" << available_at_[src_blk.
src];
data::CatBlockSource< DynBlockSource > CatBlockSource
void Close(size_t src)
append closing sentinel block from src (also delivered via the network).
MixBlockQueue(BlockPool &block_pool, size_t num_workers, size_t local_worker_id, size_t dia_id)
Constructor from BlockPool.
Block combines a reference to a read-only ByteBlock and book-keeping information. ...
std::vector< unsigned char > write_closed_
flag to test for closed sources
void set_dia_id(size_t dia_id)
std::vector< BlockQueue > queues_
BlockQueues to which blocks from the mix queue are delivered.
MixBlockQueueReader(MixBlockQueue &mix_queue, bool consume, size_t local_worker_id)
pair of (source worker, Block) stored in the main mix queue.
size_t available_
number of available items on the selected reader
size_t selected_
reader currently selected
size_t open_
number of readers still open
~MixBlockQueueReader()
Possibly consume unread blocks.
Implements reading an unordered sequence of items from multiple workers, which send Blocks...
MixBlockQueue & mix_queue_
reference to mix queue
std::vector< BlockQueue::Reader > readers_
sub-readers for each block queue in mix queue
Pool to allocate, keep, swap out/in, and free all ByteBlocks on the host.
size_t num_workers_
total number of workers in the system.
void AppendBlock(size_t src, const Block &block)
append block delivered via the network from src.
CatBlockReader cat_reader_
bool is_queue_closed(size_t src)
check if inbound queue is closed
BlockReader< CatBlockSource > CatBlockReader
size_t num_items() const
return number of items beginning in this block
SrcBlockPair Pop()
Blocking retrieval of a (source,block) pair.
bool read_closed() const
check if reader side has returned a closing sentinel block
std::vector< size_t > available_at_
#define LOG
Default logging method: output if the local debug variable is true.
common::AtomicMovable< size_t > write_open_count_
counter of the number of writers still open.
bool IsValid() const
Return whether the enclosed ByteBlock is valid.
common::ConcurrentBoundedQueue< SrcBlockPair > mix_queue_
the main mix queue, containing the blocks in reception order.