file_(block_pool, local_worker_id, dia_id),
close_callback_(close_callback) {
sLOG << "BlockQueue::GetBlockSource() consume, from queue.";
return ConstructDynBlockSource<ConsumeBlockQueueSource>(

sLOG << "BlockQueue::GetBlockSource() consume, from cache:"
return ConstructDynBlockSource<ConsumeFileBlockSource>(

sLOG << "BlockQueue::GetBlockSource() non-consume, from queue.";
return ConstructDynBlockSource<CacheBlockQueueSource>(

sLOG << "BlockQueue::GetBlockSource() non-consume, from cache:"
return ConstructDynBlockSource<KeepFileBlockSource>(
LOG << "ConsumeBlockQueueSource::NextBlock() " << b;

LOG << "CacheBlockQueueSource[" << this << "]::NextBlock() " << b;
Block combines a reference to a read-only ByteBlock and book-keeping information. ...
PinnedBlock PinWait() const = delete
Not available in PinnedBlock.
#define sLOG
Default logging method: output if the local debug variable is true.
BlockWriter< BlockQueueSink > Writer
BlockQueue * queue_
Reference to BlockQueue.
ConsumeBlockQueueSource(BlockQueue &queue, size_t local_worker_id)
Start reading from a BlockQueue.
size_t block_counter_
number of blocks transferred by the queue
ConsumeReader GetConsumeReader(size_t local_worker_id)
return BlockReader specifically for a BlockQueue
BlockWriter contains a temporary Block object into which a) any serializable item can be stored or b)...
common::ConcurrentBoundedQueue< Block > queue_
File file_
File to cache blocks for implementing CacheBlockQueueSource.
size_t local_worker_id_
local worker id to associate pinned block with
BlockSink which interfaces to a File.
size_t local_worker_id_
local worker id of the thread reading the BlockQueue
void AppendBlock(const Block &b)
common::AtomicMovable< bool > write_closed_
Reader GetReader(bool consume, size_t local_worker_id)
return polymorphic BlockReader variant
Pool to allocate, keep, swap out/in, and free all ByteBlocks on the host.
size_t workers_per_host() const
return number of workers per host
BlockQueue & queue_
BlockQueue that blocks are retrieved from.
size_t local_worker_id() const
local worker id to associate pinned block with
Writer GetWriter(size_t block_size=default_block_size)
Return a BlockWriter delivering to this BlockQueue.
BlockReader takes Block objects from BlockSource and allows reading of a) serializable Items or b) ar...
tlx::delegate< void(BlockQueue &)> CloseCallback
size_t local_worker_id_
local worker id of the thread reading the BlockQueue
CacheBlockQueueSource(BlockQueue *queue, size_t local_worker_id)
Start reading from a BlockQueue.
CloseCallback close_callback_
bool read_closed() const
check if reader side has returned a closing sentinel block
~CacheBlockQueueSource()
Consume remaining blocks and cache them in the File.
Pure virtual base class for all things that can receive Blocks from a BlockWriter.
A pinned / pin-counted derivative of a Block.
BlockQueue(BlockPool &block_pool, size_t local_worker_id, size_t dia_id, const CloseCallback &close_callback=CloseCallback())
Constructor from BlockPool.
size_t num_items() const
Return the number of items in the file.
PinnedBlock NextBlock()
Fetch the next block from the BlockQueue, store it in the caching File, and return it.
BlockReader< DynBlockSource > DynBlockReader
Instantiation of BlockReader for reading from the polymorphic source.
BlockReader< ConsumeBlockQueueSource > ConsumeReader
#define LOG
Default logging method: output if the local debug variable is true.
DynBlockSource GetBlockSource(bool consume, size_t local_worker_id)
return polymorphic BlockSource variant
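The four log messages emitted by GetBlockSource() suggest a two-by-two choice: consume or non-consume, and read from the queue or from the cache. The listing does not show the branch conditions, so the following is only a minimal sketch under an assumption: the "from cache" branches are taken once the queue has already been read to its end. `SourceKind`, `ChooseSource`, and `already_read` are hypothetical names introduced for illustration and are not part of the library.

```cpp
#include <cassert>

// Hypothetical tags standing in for the four source types named in the listing.
enum class SourceKind {
    ConsumeBlockQueueSource,  // consume, from queue
    ConsumeFileBlockSource,   // consume, from cache
    CacheBlockQueueSource,    // non-consume, from queue
    KeepFileBlockSource       // non-consume, from cache
};

// Assumption: the cache is used when the queue was already read completely.
// The real condition is not visible in the listing.
constexpr SourceKind ChooseSource(bool consume, bool already_read) {
    return consume
           ? (already_read ? SourceKind::ConsumeFileBlockSource
                           : SourceKind::ConsumeBlockQueueSource)
           : (already_read ? SourceKind::KeepFileBlockSource
                           : SourceKind::CacheBlockQueueSource);
}

// Compile-time checks of two of the four combinations.
static_assert(ChooseSource(true, false) == SourceKind::ConsumeBlockQueueSource,
              "consume, from queue");
static_assert(ChooseSource(false, true) == SourceKind::KeepFileBlockSource,
              "non-consume, from cache");
```

Under this assumption, a first non-consuming read would go through the caching source, and any later read would be served from the cached file.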
A BlockQueue is a thread-safe queue used to hand over Block objects between threads.
bool IsValid() const
Return whether the enclosed ByteBlock is valid.
A BlockSource to read Blocks from a BlockQueue using a BlockReader, and at the same time CACHE all it...
A BlockSource to read Blocks from a BlockQueue using a BlockReader.
void Close() final
Close called by BlockWriter.
This is the actual BlockSource used to instantiate BlockReader.
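The descriptions of CacheBlockQueueSource above (NextBlock() stores each block into the caching File, and the destructor consumes and caches the remaining blocks) can be illustrated with a single-threaded toy model. This is a sketch only: `ToyQueue`, `ToyCachingSource`, and `ReadSomeThenDrop` are hypothetical names, an `int` stands in for a Block, a `std::vector` stands in for the File, and the thread safety and blocking behavior of the real queue are not modeled.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <vector>

// Toy stand-in for a queue of blocks; an int plays the role of a Block.
struct ToyQueue {
    std::deque<int> blocks;

    // Pops the next block into out; returns false once the queue is drained.
    bool Pop(int& out) {
        if (blocks.empty()) return false;
        out = blocks.front();
        blocks.pop_front();
        return true;
    }
};

// Reads from the queue and appends every block it sees to a cache.
class ToyCachingSource {
public:
    ToyCachingSource(ToyQueue& queue, std::vector<int>& cache)
        : queue_(queue), cache_(cache) { }

    // Drain whatever the reader did not fetch, so the cache ends up complete.
    ~ToyCachingSource() {
        int b;
        while (NextBlock(b)) { }
    }

    // Fetch the next block, store it in the cache, and hand it to the caller.
    bool NextBlock(int& out) {
        if (!queue_.Pop(out)) return false;
        cache_.push_back(out);
        return true;
    }

private:
    ToyQueue& queue_;
    std::vector<int>& cache_;
};

// Hypothetical helper: read at most `limit` blocks, then drop the source.
inline std::vector<int> ReadSomeThenDrop(ToyQueue& queue, std::size_t limit,
                                         std::vector<int>& cache) {
    std::vector<int> seen;
    {
        ToyCachingSource source(queue, cache);
        int b;
        while (seen.size() < limit && source.NextBlock(b)) seen.push_back(b);
    }  // the destructor caches the blocks that were not read
    return seen;
}
```

Draining in the destructor means a reader that stops early still leaves a complete cache behind, so a later reader can replay every block from the cache instead of the now-empty queue.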