12 #ifndef THRILL_DATA_MIX_BLOCK_QUEUE_HEADER 13 #define THRILL_DATA_MIX_BLOCK_QUEUE_HEADER 34 class MixBlockQueueReader;
66 static constexpr
bool debug =
false;
79 size_t local_worker_id,
size_t dia_id);
104 void Close(
size_t src);
163 static constexpr
bool debug =
false;
170 bool consume,
size_t local_worker_id);
186 if (reread_)
return cat_reader_.HasNext();
188 if (available_)
return true;
189 if (open_ == 0)
return false;
195 template <
typename T>
200 return cat_reader_.template Next<T>();
205 throw std::runtime_error(
206 "Data underflow in MixBlockQueueReader.");
210 assert(available_ > 0);
211 assert(selected_ < readers_.size());
214 return readers_[selected_].template Next<T>();
233 size_t selected_ = size_t(-1);
236 size_t available_ = 0;
259 #endif // !THRILL_DATA_MIX_BLOCK_QUEUE_HEADER void Close(size_t src)
append closing sentinel block from src (also delivered via the network).
MixBlockQueue(BlockPool &block_pool, size_t num_workers, size_t local_worker_id, size_t dia_id)
Constructor from BlockPool.
bool HasNext()
HasNext() returns true if at least one more item is available.
Block combines a reference to a read-only ByteBlock and book-keeping information. ...
std::vector< unsigned char > write_closed_
flag to test for closed sources
void set_dia_id(size_t dia_id)
std::vector< BlockQueue > queues_
BlockQueues to which blocks from the mix queue are delivered.
pair of (source worker, Block) stored in the main mix queue.
CatBlockSource is a BlockSource which concatenates all Blocks available from a vector of BlockSources...
friend class MixBlockQueueReader
for access to queues_ and other internals.
BlockPool & block_pool()
return block pool
Implements reading an unordered sequence of items from multiple workers, which send Blocks...
MixBlockQueue & operator=(const MixBlockQueue &)=delete
non-copyable: delete assignment operator
Reader to retrieve items in unordered sequence from a MixBlockQueue.
MixBlockQueue & mix_queue_
reference to mix queue
std::vector< BlockQueue::Reader > readers_
sub-readers for each block queue in mix queue
Pool to allocate, keep, swap out/in, and free all ByteBlocks on the host.
bool write_closed() const
check if writer side Close() was called.
static constexpr bool debug
This is a queue, similar to std::queue and tbb::concurrent_bounded_queue, except that it uses mutexes...
T Next()
Next() reads a complete item T.
size_t num_workers_
total number of workers in system.
void AppendBlock(size_t src, const Block &block)
append block delivered via the network from src.
bool is_queue_closed(size_t src)
check if inbound queue is closed
SrcBlockPair Pop()
Blocking retrieval of a (source,block) pair.
bool read_closed() const
check if reader side has returned a closing sentinel block
std::vector< size_t > available_at_
common::AtomicMovable< size_t > write_open_count_
counter on number of writers still open.
common::ConcurrentBoundedQueue< SrcBlockPair > mix_queue_
the main mix queue, containing the block in the reception order.