Thrill
0.1
A BlockQueue is a thread-safe queue used to hand over Block objects between threads.
It is currently used by the Multiplexer to queue received Blocks and deliver them (later) to their destination.
The BlockQueue itself is also a BlockSink (so one can attach a BlockWriter to it). To read items from the queue, one needs to use a BlockReader instantiated with a BlockQueueSource. Both are easily available via GetWriter() and GetReader(). Each block is available only once via the BlockQueueSource.
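The hand-over pattern described above can be illustrated with a minimal sketch. It does not use Thrill's classes: `ToyBlock`, `ToyBlockQueue`, and `RunHandOver` are hypothetical stand-ins, and the assumption that an invalid (empty) block serves as the closing sentinel is only inferred from the member descriptions on this page.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <initializer_list>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for a Block: an empty payload plays the role of the
// closing sentinel (an "invalid" block).
struct ToyBlock {
    std::string payload;
    bool IsValid() const { return !payload.empty(); }
};

class ToyBlockQueue {
public:
    // Writer side: enqueue one block and wake a waiting reader.
    void AppendBlock(ToyBlock b) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push_back(std::move(b));
        }
        cv_.notify_one();
    }

    // Writer side: mark the queue closed and enqueue the sentinel.
    void Close() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            write_closed_ = true;
            queue_.push_back(ToyBlock{});
        }
        cv_.notify_one();
    }

    // Reader side: wait for the next block; each block is handed out once.
    // After the sentinel has been seen, keep returning invalid blocks.
    ToyBlock Pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        if (read_closed_) return ToyBlock{};
        cv_.wait(lock, [this] { return !queue_.empty(); });
        ToyBlock b = std::move(queue_.front());
        queue_.pop_front();
        if (!b.IsValid()) read_closed_ = true;
        return b;
    }

    bool write_closed() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return write_closed_;
    }

    bool read_closed() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return read_closed_;
    }

private:
    mutable std::mutex mutex_;
    std::condition_variable cv_;
    std::deque<ToyBlock> queue_;
    bool write_closed_ = false;
    bool read_closed_ = false;
};

// One writer thread hands three blocks to the calling (reader) thread.
inline std::vector<std::string> RunHandOver() {
    ToyBlockQueue queue;
    std::thread writer([&queue] {
        for (const char* s : {"a", "b", "c"}) queue.AppendBlock(ToyBlock{s});
        queue.Close();
    });
    std::vector<std::string> received;
    for (ToyBlock b = queue.Pop(); b.IsValid(); b = queue.Pop())
        received.push_back(b.payload);
    writer.join();
    // The reader has seen the sentinel, so both sides are closed now.
    assert(queue.write_closed() && queue.read_closed());
    return received;
}
```

Calling `RunHandOver()` returns the payloads in the order the writer appended them. The real BlockQueue is backed by `common::ConcurrentBoundedQueue< Block >` and is read through BlockReader objects, so this sketch only mirrors the general idea.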
Definition at line 47 of file block_queue.hpp.
#include <block_queue.hpp>
Public Types | |
using | CloseCallback = tlx::delegate< void(BlockQueue &)> |
using | ConsumeReader = BlockReader< ConsumeBlockQueueSource > |
using | Reader = DynBlockReader |
using | Writer = BlockWriter< BlockQueueSink > |
Public Member Functions | |
BlockQueue (BlockPool &block_pool, size_t local_worker_id, size_t dia_id, const CloseCallback &close_callback=CloseCallback()) | |
Constructor from BlockPool. More... | |
BlockQueue (const BlockQueue &)=delete | |
non-copyable: delete copy-constructor More... | |
BlockQueue (BlockQueue &&)=default | |
move-constructor: default More... | |
void | AppendBlock (const Block &b, bool) final |
Appends the (unpinned) Block. More... | |
void | AppendBlock (Block &&b, bool) final |
Appends the (unpinned) Block. More... | |
size_t | block_counter () const |
Returns block_counter_. More... | |
size_t | byte_counter () const |
Returns byte_counter_. More... | |
void | Close () final |
Close called by BlockWriter. More... | |
bool | empty () const |
DynBlockSource | GetBlockSource (bool consume, size_t local_worker_id) |
return polymorphic BlockSource variant More... | |
ConsumeReader | GetConsumeReader (size_t local_worker_id) |
return BlockReader specifically for a BlockQueue More... | |
Reader | GetReader (bool consume, size_t local_worker_id) |
return polymorphic BlockReader variant More... | |
Writer | GetWriter (size_t block_size=default_block_size) |
Return a BlockWriter delivering to this BlockQueue. More... | |
size_t | item_counter () const |
Returns item_counter_. More... | |
BlockQueue & | operator= (const BlockQueue &)=delete |
non-copyable: delete assignment operator More... | |
BlockQueue & | operator= (BlockQueue &&)=default |
move-assignment operator: default More... | |
Block | Pop () |
bool | read_closed () const |
check if reader side has returned a closing sentinel block More... | |
void | set_close_callback (const CloseCallback &cb) |
set the close callback More... | |
void | set_dia_id (size_t dia_id) |
size_t | size () |
return number of blocks in the queue. Use this ONLY for DEBUGGING! More... | |
const common::StatsTimer & | timespan () const |
Returns timespan_. More... | |
bool | write_closed () const |
check if writer side Close() was called. More... | |
Public Member Functions inherited from BlockSink | |
BlockSink (BlockPool &block_pool, size_t local_worker_id) | |
constructor with reference to BlockPool More... | |
BlockSink (BlockPool *block_pool, size_t local_worker_id) | |
constructor with pointer to BlockPool More... | |
BlockSink (const BlockSink &)=default | |
default copy-constructor More... | |
BlockSink (BlockSink &&)=default | |
move-constructor: default More... | |
virtual | ~BlockSink () |
required virtual destructor More... | |
virtual PinnedByteBlockPtr | AllocateByteBlock (size_t block_size) |
virtual void | AppendPinnedBlock (PinnedBlock &&b, bool is_last_block) |
Appends the PinnedBlock. More... | |
BlockPool * | block_pool () const |
Returns block_pool_. More... | |
size_t | local_worker_id () const |
local worker id to associate pinned block with More... | |
common::JsonLogger & | logger () |
Returns BlockPool.logger_. More... | |
BlockSink & | operator= (const BlockSink &)=default |
default assignment operator More... | |
BlockSink & | operator= (BlockSink &&)=default |
move-assignment operator: default More... | |
virtual void | ReleaseByteBlock (ByteBlockPtr &block) |
Release an unused ByteBlock with n bytes backing memory. More... | |
size_t | workers_per_host () const |
return number of workers per host More... | |
Static Public Attributes | |
static constexpr bool | allocate_can_fail_ = false |
static constexpr bool | debug = false |
Static Public Attributes inherited from BlockSink | |
static constexpr bool | allocate_can_fail_ = false |
Private Attributes | |
size_t | block_counter_ = 0 |
number of blocks transferred by the Queue More... | |
size_t | byte_counter_ = 0 |
number of bytes transferred by the Queue More... | |
CloseCallback | close_callback_ |
File | file_ |
File to cache blocks for implementing CacheBlockQueueSource. More... | |
size_t | item_counter_ = 0 |
number of items transferred by the Queue More... | |
common::ConcurrentBoundedQueue< Block > | queue_ |
bool | read_closed_ = false |
common::StatsTimerStart | timespan_ |
timespan of existence More... | |
common::AtomicMovable< bool > | write_closed_ = { false } |
Additional Inherited Members | |
Protected Attributes inherited from BlockSink | |
size_t | local_worker_id_ |
local worker id to associate pinned block with More... | |
using CloseCallback = tlx::delegate<void (BlockQueue&)> |
Definition at line 56 of file block_queue.hpp.
using ConsumeReader = BlockReader<ConsumeBlockQueueSource> |
Definition at line 54 of file block_queue.hpp.
using Reader = DynBlockReader |
Definition at line 53 of file block_queue.hpp.
using Writer = BlockWriter<BlockQueueSink> |
Definition at line 52 of file block_queue.hpp.
BlockQueue | ( | BlockPool & | block_pool, |
size_t | local_worker_id, | ||
size_t | dia_id, | ||
const CloseCallback & | close_callback = CloseCallback() |
||
) |
Constructor from BlockPool.
Definition at line 19 of file block_queue.cpp.
References BlockPool::workers_per_host().
BlockQueue (const BlockQueue &) | delete |
non-copyable: delete copy-constructor
BlockQueue (BlockQueue &&) | default |
move-constructor: default
void AppendBlock (const Block &b, bool) | inline final virtual |
Appends the (unpinned) Block.
Implements BlockSink.
Definition at line 72 of file block_queue.hpp.
References BlockQueue::block_counter_, BlockQueue::byte_counter_, BlockQueue::item_counter_, LOG, and BlockQueue::queue_.
Referenced by StreamSink::AppendBlock().
void AppendBlock (Block &&b, bool) | inline final virtual |
Appends the (unpinned) Block.
Implements BlockSink.
Definition at line 79 of file block_queue.hpp.
References BlockQueue::block_counter_, BlockQueue::byte_counter_, BlockQueue::Close(), BlockQueue::item_counter_, LOG, and BlockQueue::queue_.
size_t block_counter () const | inline |
Returns block_counter_.
Definition at line 127 of file block_queue.hpp.
References BlockQueue::block_counter_.
Referenced by CatStreamData::CatStreamData().
size_t byte_counter () const | inline |
Returns byte_counter_.
Definition at line 125 of file block_queue.hpp.
References BlockQueue::byte_counter_.
Referenced by CatStreamData::CatStreamData().
void Close () | final virtual |
Close called by BlockWriter.
Implements BlockSink.
Definition at line 28 of file block_queue.cpp.
References BlockQueue::block_counter_, BlockQueue::close_callback_, BlockQueue::queue_, and BlockQueue::write_closed_.
Referenced by BlockQueue::AppendBlock(), and StreamSink::Close().
bool empty () const | inline |
Definition at line 114 of file block_queue.hpp.
References BlockQueue::queue_.
DynBlockSource GetBlockSource | ( | bool | consume, |
size_t | local_worker_id | ||
) |
return polymorphic BlockSource variant
Definition at line 52 of file block_queue.cpp.
References BlockQueue::file_, BlockSink::local_worker_id(), File::num_items(), BlockQueue::read_closed_, and sLOG.
Referenced by BlockQueue::GetReader(), and BlockQueue::timespan().
BlockQueue::ConsumeReader GetConsumeReader | ( | size_t | local_worker_id | ) |
return BlockReader specifically for a BlockQueue
Definition at line 47 of file block_queue.cpp.
References BlockQueue::read_closed_.
Referenced by BlockQueue::timespan().
BlockQueue::Reader GetReader | ( | bool | consume, |
size_t | local_worker_id | ||
) |
return polymorphic BlockReader variant
Definition at line 85 of file block_queue.cpp.
References BlockQueue::GetBlockSource().
Referenced by BlockQueue::timespan().
BlockQueue::Writer GetWriter | ( | size_t | block_size = default_block_size | ) |
Return a BlockWriter delivering to this BlockQueue.
Definition at line 43 of file block_queue.cpp.
Referenced by BlockQueue::timespan().
size_t item_counter () const | inline |
Returns item_counter_.
Definition at line 123 of file block_queue.hpp.
References BlockQueue::item_counter_.
Referenced by CatStreamData::CatStreamData().
BlockQueue & operator= (const BlockQueue &) | delete |
non-copyable: delete assignment operator
Referenced by BlockQueueSink::BlockQueueSink().
BlockQueue & operator= (BlockQueue &&) | default |
move-assignment operator: default
Block Pop () | inline |
Definition at line 92 of file block_queue.hpp.
References Block::IsValid(), BlockQueue::queue_, and BlockQueue::read_closed_.
Referenced by ConsumeBlockQueueSource::NextBlock(), and CacheBlockQueueSource::NextBlock().
bool read_closed () const | inline |
check if reader side has returned a closing sentinel block
Definition at line 117 of file block_queue.hpp.
References BlockQueue::read_closed_.
Referenced by CacheBlockQueueSource::NextBlock(), and CacheBlockQueueSource::~CacheBlockQueueSource().
void set_close_callback (const CloseCallback &cb) | inline |
set the close callback
Definition at line 107 of file block_queue.hpp.
References BlockQueue::close_callback_.
void set_dia_id (size_t dia_id) | inline |
change dia_id after construction (needed because it may be unknown at construction)
Definition at line 102 of file block_queue.hpp.
References BlockQueue::file_, and File::set_dia_id().
size_t size () | inline |
return number of blocks in the queue. Use this ONLY for DEBUGGING!
Definition at line 120 of file block_queue.hpp.
References BlockQueue::queue_, and BlockQueue::write_closed().
const common::StatsTimer & timespan () const | inline |
Returns timespan_.
Definition at line 129 of file block_queue.hpp.
References thrill::data::default_block_size, BlockQueue::GetBlockSource(), BlockQueue::GetConsumeReader(), BlockQueue::GetReader(), BlockQueue::GetWriter(), and BlockQueue::timespan_.
bool write_closed () const | inline |
check if writer side Close() was called.
Definition at line 112 of file block_queue.hpp.
References BlockQueue::write_closed_.
Referenced by BlockQueue::size().
constexpr bool allocate_can_fail_ = false | static |
Definition at line 90 of file block_queue.hpp.
size_t block_counter_ = 0 | private |
number of blocks transferred by the Queue
Definition at line 157 of file block_queue.hpp.
Referenced by BlockQueue::AppendBlock(), BlockQueue::block_counter(), and BlockQueue::Close().
size_t byte_counter_ = 0 | private |
number of bytes transferred by the Queue
Definition at line 155 of file block_queue.hpp.
Referenced by BlockQueue::AppendBlock(), and BlockQueue::byte_counter().
CloseCallback close_callback_ | private |
callback to issue when the writer closes the Queue – for delivering stats
Definition at line 166 of file block_queue.hpp.
Referenced by BlockQueue::Close(), and BlockQueue::set_close_callback().
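As an illustration of a close callback that delivers statistics, the following is a minimal sketch under stated assumptions. `CountingQueue`, `QueueStats`, and `CollectStatsOnClose` are hypothetical names, `std::function` stands in for `tlx::delegate`, and the behavior of a repeated `Close()` is an arbitrary choice of this sketch, not something this page documents.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <utility>

// Hypothetical queue that only tracks counters and fires a callback on Close().
class CountingQueue {
public:
    // Same shape as the documented CloseCallback: void(Queue&).
    using CloseCallback = std::function<void(CountingQueue&)>;

    explicit CountingQueue(CloseCallback cb = CloseCallback())
        : close_callback_(std::move(cb)) {}

    void set_close_callback(CloseCallback cb) { close_callback_ = std::move(cb); }

    // Account for one appended block (the block itself is not stored here).
    void AppendBlock(const std::string& bytes, std::size_t num_items) {
        item_counter_ += num_items;
        byte_counter_ += bytes.size();
        ++block_counter_;
    }

    // Writer side: close once, then hand the queue to the callback.
    void Close() {
        if (write_closed_) return;  // sketch-only choice: ignore repeated Close()
        write_closed_ = true;
        if (close_callback_) close_callback_(*this);
    }

    std::size_t item_counter() const { return item_counter_; }
    std::size_t byte_counter() const { return byte_counter_; }
    std::size_t block_counter() const { return block_counter_; }
    bool write_closed() const { return write_closed_; }

private:
    std::size_t item_counter_ = 0;
    std::size_t byte_counter_ = 0;
    std::size_t block_counter_ = 0;
    bool write_closed_ = false;
    CloseCallback close_callback_;
};

struct QueueStats {
    std::size_t items = 0;
    std::size_t bytes = 0;
    std::size_t blocks = 0;
};

// Append two blocks, close, and return what the callback observed.
inline QueueStats CollectStatsOnClose() {
    QueueStats stats;
    CountingQueue queue;
    queue.set_close_callback([&stats](CountingQueue& q) {
        stats.items = q.item_counter();
        stats.bytes = q.byte_counter();
        stats.blocks = q.block_counter();
    });
    queue.AppendBlock("abcd", 2);
    queue.AppendBlock("ef", 1);
    queue.Close();
    assert(queue.write_closed());
    return stats;
}
```

Passing the queue by reference to the callback lets the receiver read the final counters at the moment the writer closes, without the queue needing to know who consumes the statistics.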
constexpr bool debug = false | static |
Definition at line 50 of file block_queue.hpp.
File file_ | private |
File to cache blocks for implementing CacheBlockQueueSource.
Definition at line 162 of file block_queue.hpp.
Referenced by BlockQueue::GetBlockSource(), CacheBlockQueueSource::NextBlock(), and BlockQueue::set_dia_id().
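Since each block can be taken from the queue only once, a reader that must keep the data has to store the blocks somewhere as it pops them. The sketch below shows that idea in isolation; it is one plausible reading of the `consume` flag and the cache File, not Thrill's implementation. `OneShotQueue`, `ReadConsume`, `ReadAndCache`, and `DemoCachedReRead` are hypothetical, and a `std::vector` stands in for the File.

```cpp
#include <cassert>
#include <deque>
#include <string>
#include <utility>
#include <vector>

// Hypothetical single-threaded stand-in for the queue: a block can be popped
// exactly once.
struct OneShotQueue {
    std::deque<std::string> blocks;

    bool Pop(std::string& out) {
        if (blocks.empty()) return false;
        out = std::move(blocks.front());
        blocks.pop_front();
        return true;
    }
};

// Consuming read: drain the queue and keep no copy behind.
inline std::vector<std::string> ReadConsume(OneShotQueue& q) {
    std::vector<std::string> seen;
    std::string b;
    while (q.Pop(b)) seen.push_back(b);
    return seen;
}

// Caching read: every popped block is also appended to `cache`, so the data
// can be read again later even though the queue is empty afterwards.
inline std::vector<std::string> ReadAndCache(OneShotQueue& q,
                                             std::vector<std::string>& cache) {
    std::vector<std::string> seen;
    std::string b;
    while (q.Pop(b)) {
        cache.push_back(b);
        seen.push_back(b);
    }
    return seen;
}

// Read a queue once with caching and return the cache for a later re-read.
inline std::vector<std::string> DemoCachedReRead() {
    OneShotQueue q;
    q.blocks = {"x", "y"};
    std::vector<std::string> cache;
    std::vector<std::string> first = ReadAndCache(q, cache);
    assert(q.blocks.empty());  // the queue itself has been drained
    assert(first == cache);    // the cache holds everything that was read
    return cache;
}
```

The trade-off in this sketch is memory for re-readability: the consuming path keeps nothing, while the caching path retains every block it has seen.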
size_t item_counter_ = 0 | private |
number of items transferred by the Queue
Definition at line 153 of file block_queue.hpp.
Referenced by BlockQueue::AppendBlock(), and BlockQueue::item_counter().
common::ConcurrentBoundedQueue< Block > queue_ | private |
Definition at line 144 of file block_queue.hpp.
Referenced by BlockQueue::AppendBlock(), BlockQueueSink::AppendBlock(), BlockQueue::Close(), BlockQueueSink::Close(), BlockQueue::empty(), BlockQueue::Pop(), BlockQueue::size(), and BlockQueueSink::~BlockQueueSink().
bool read_closed_ = false | private |
whether Pop() has returned a closing Block, i.e., whether the close message from the writer has been received
Definition at line 150 of file block_queue.hpp.
Referenced by BlockQueue::GetBlockSource(), BlockQueue::GetConsumeReader(), BlockQueue::Pop(), and BlockQueue::read_closed().
common::StatsTimerStart timespan_ | private |
timespan of existence
Definition at line 159 of file block_queue.hpp.
Referenced by BlockQueue::timespan().
common::AtomicMovable< bool > write_closed_ = { false } | private |
Definition at line 146 of file block_queue.hpp.
Referenced by BlockQueue::Close(), and BlockQueue::write_closed().