13 #ifndef THRILL_DATA_BLOCK_QUEUE_HEADER 14 #define THRILL_DATA_BLOCK_QUEUE_HEADER 34 class ConsumeBlockQueueSource;
50 static constexpr
bool debug =
false;
73 LOG <<
"BlockQueue::AppendBlock() " << b;
80 LOG <<
"BlockQueue::AppendBlock() move " << b;
84 queue_.emplace(std::move(b));
177 static constexpr
bool debug =
false;
187 LOG <<
"BlockQueueSink() new for " << queue;
196 LOG <<
"~BlockQueueSink() for " <<
queue_;
206 return queue_->AppendBlock(b, is_last_block);
213 return queue_->AppendBlock(std::move(b), is_last_block);
241 void Prefetch(
size_t );
276 void Prefetch(
size_t );
297 #endif // !THRILL_DATA_BLOCK_QUEUE_HEADER Block combines a reference to a read-only ByteBlock and book-keeping information. ...
size_t item_counter_
number of items transfered by the Queue
BlockQueueSink(BlockQueue *queue)
void set_dia_id(size_t dia_id)
BlockQueue * queue_
Reference to BlockQueue.
size_t byte_counter_
number of bytes transfered by the Queue
size_t default_block_size
default size of blocks in File, Channel, BlockQueue, etc.
A File is an ordered sequence of Block objects for storing items.
size_t block_counter_
number of blocks transfered by the Queue
friend class CacheBlockQueueSource
for access to file_
ConsumeReader GetConsumeReader(size_t local_worker_id)
return BlockReader specifically for a BlockQueue
BlockWriter contains a temporary Block object into which a) any serializable item can be stored or b)...
common::ConcurrentBoundedQueue< Block > queue_
File file_
File to cache blocks for implementing CacheBlockQueueSource.
void set_dia_id(size_t dia_id)
void AppendBlock(const Block &b, bool is_last_block) final
static constexpr bool debug
BlockSink which interfaces to a File.
size_t local_worker_id_
local worker id of the thread reading the BlockQueue
size_t item_counter() const
Returns item_counter_.
common::StatsTimerStart timespan_
timespan of existance
common::AtomicMovable< bool > write_closed_
void AppendBlock(Block &&b, bool is_last_block) final
static constexpr bool allocate_can_fail_
Reader GetReader(bool consume, size_t local_worker_id)
return polymorphic BlockReader variant
Pool to allocate, keep, swap out/in, and free all ByteBlocks on the host.
BlockQueue & queue_
BlockQueue that blocks are retrieved from.
size_t local_worker_id() const
local worker id to associate pinned block with
Writer GetWriter(size_t block_size=default_block_size)
Return a BlockWriter delivering to this BlockQueue.
BlockReader takes Block objects from BlockSource and allows reading of a) serializable Items or b) ar...
This is a queue, similar to std::queue and tbb::concurrent_bounded_queue, except that it uses mutexes...
tlx::delegate< void(BlockQueue &)> CloseCallback
BlockPool * block_pool() const
Returns block_pool_.
size_t block_counter() const
Returns block_counter_.
size_t local_worker_id_
local worker id of the thread reading the BlockQueue
void set_close_callback(const CloseCallback &cb)
set the close callback
void AppendBlock(Block &&b, bool) final
Appends the (unpinned) Block.
bool write_closed() const
check if writer side Close() was called.
CloseCallback close_callback_
bool read_closed() const
check if reader side has returned a closing sentinel block
size_t byte_counter() const
Returns byte_counter_.
BlockQueue & operator=(const BlockQueue &)=delete
non-copyable: delete assignment operator
void AppendBlock(const Block &b, bool) final
Appends the (unpinned) Block.
void Close() final
Closes the sink. Must not be called multiple times.
Pure virtual base class for all things that can receive Blocks from a BlockWriter.
A pinned / pin-counted derivative of a Block.
BlockQueue(BlockPool &block_pool, size_t local_worker_id, size_t dia_id, const CloseCallback &close_callback=CloseCallback())
Constructor from BlockPool.
const common::StatsTimer & timespan() const
Returns timespan_.
size_t size()
return number of block in the queue. Use this ONLY for DEBUGGING!
BlockReader< DynBlockSource > DynBlockReader
Instantiation of BlockReader for reading from the polymorphic source.
#define LOG
Default logging method: output if the local debug variable is true.
DynBlockSource GetBlockSource(bool consume, size_t local_worker_id)
return polymorphic BlockSource variant
bool IsValid() const
Return whether the enclosed ByteBlock is valid.
A BlockQueue is a thread-safe queue used to hand-over Block objects between threads.
A BlockSource to read Blocks from a BlockQueue using a BlockReader, and at the same time CACHE all it...
A BlockSource to read Block from a BlockQueue using a BlockReader.
void Close() final
Close called by BlockWriter.
This is the actual BlockSource used to instantiate BlockReader.