Thrill 0.1
A BlockQueue is a thread-safe queue used to hand over Block objects between threads.
It is currently used by the Multiplexer to queue received Blocks and deliver them (later) to their destination.
The BlockQueue itself is also a BlockSink (so one can attach a BlockWriter to it). To read items from the queue, one needs to use a BlockReader instantiated with a BlockQueueSource. Both are easily available via GetWriter() and GetReader(). Each block is available only once via the BlockQueueSource.
Definition at line 47 of file block_queue.hpp.
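The hand-over described above can be illustrated without Thrill itself. The following is a minimal, self-contained sketch: `ToyBlock`, `ToyBlockQueue` and `RunHandOver` are hypothetical stand-ins and not Thrill classes, and the use of an invalid block as closing sentinel is an assumption based on the `Pop()` and `read_closed()` descriptions on this page.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for a Block: a payload plus a validity flag.
struct ToyBlock {
    std::string payload;
    bool valid;
    bool IsValid() const { return valid; }
};

// Hypothetical toy model of the hand-over; NOT Thrill's implementation.
class ToyBlockQueue {
public:
    // Writer side: enqueue one block and wake a waiting reader.
    void AppendBlock(ToyBlock b) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push_back(std::move(b));
        }
        cv_.notify_one();
    }

    // Writer side: enqueue an invalid block as closing sentinel (assumption).
    void Close() { AppendBlock(ToyBlock{std::string(), false}); }

    // Reader side: wait for the next block; each block is returned only once.
    ToyBlock Pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return !queue_.empty(); });
        ToyBlock b = std::move(queue_.front());
        queue_.pop_front();
        if (!b.IsValid()) read_closed_ = true;
        return b;
    }

    bool read_closed() const { return read_closed_; }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    std::deque<ToyBlock> queue_;
    bool read_closed_ = false;  // written and read by the reader thread only
};

// Writes three blocks on a second thread and drains them on the caller.
inline std::vector<std::string> RunHandOver() {
    ToyBlockQueue q;
    std::thread writer([&q] {
        const char* items[] = {"a", "b", "c"};
        for (const char* s : items) q.AppendBlock(ToyBlock{std::string(s), true});
        q.Close();
    });
    std::vector<std::string> received;
    while (true) {
        ToyBlock b = q.Pop();
        if (!b.IsValid()) break;  // closing sentinel: the writer is done
        received.push_back(b.payload);
    }
    writer.join();
    assert(q.read_closed());
    return received;
}
```

In real code the writer and reader would be obtained via GetWriter() and GetReader() rather than by calling AppendBlock() and Pop() directly; the sketch only models the queueing behaviour underneath.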
#include <block_queue.hpp>
Public Types | |
| using | CloseCallback = tlx::delegate< void(BlockQueue &)> |
| using | ConsumeReader = BlockReader< ConsumeBlockQueueSource > |
| using | Reader = DynBlockReader |
| using | Writer = BlockWriter< BlockQueueSink > |
Public Member Functions | |
| BlockQueue (BlockPool &block_pool, size_t local_worker_id, size_t dia_id, const CloseCallback &close_callback=CloseCallback()) | |
| Constructor from BlockPool. More... | |
| BlockQueue (const BlockQueue &)=delete | |
| non-copyable: delete copy-constructor More... | |
| BlockQueue (BlockQueue &&)=default | |
| move-constructor: default More... | |
| void | AppendBlock (const Block &b, bool) final |
| Appends the (unpinned) Block. More... | |
| void | AppendBlock (Block &&b, bool) final |
| Appends the (unpinned) Block. More... | |
| size_t | block_counter () const |
| Returns block_counter_. More... | |
| size_t | byte_counter () const |
| Returns byte_counter_. More... | |
| void | Close () final |
| Close called by BlockWriter. More... | |
| bool | empty () const |
| DynBlockSource | GetBlockSource (bool consume, size_t local_worker_id) |
| return polymorphic BlockSource variant More... | |
| ConsumeReader | GetConsumeReader (size_t local_worker_id) |
| return BlockReader specifically for a BlockQueue More... | |
| Reader | GetReader (bool consume, size_t local_worker_id) |
| return polymorphic BlockReader variant More... | |
| Writer | GetWriter (size_t block_size=default_block_size) |
| Return a BlockWriter delivering to this BlockQueue. More... | |
| size_t | item_counter () const |
| Returns item_counter_. More... | |
| BlockQueue & | operator= (const BlockQueue &)=delete |
| non-copyable: delete assignment operator More... | |
| BlockQueue & | operator= (BlockQueue &&)=default |
| move-assignment operator: default More... | |
| Block | Pop () |
| bool | read_closed () const |
| check if reader side has returned a closing sentinel block More... | |
| void | set_close_callback (const CloseCallback &cb) |
| set the close callback More... | |
| void | set_dia_id (size_t dia_id) |
| size_t | size () |
| return number of blocks in the queue. Use this ONLY for DEBUGGING! More... | |
| const common::StatsTimer & | timespan () const |
| Returns timespan_. More... | |
| bool | write_closed () const |
| check if writer side Close() was called. More... | |
Public Member Functions inherited from BlockSink | |
| BlockSink (BlockPool &block_pool, size_t local_worker_id) | |
| constructor with reference to BlockPool More... | |
| BlockSink (BlockPool *block_pool, size_t local_worker_id) | |
| constructor with pointer to BlockPool More... | |
| BlockSink (const BlockSink &)=default | |
| default copy-constructor More... | |
| BlockSink (BlockSink &&)=default | |
| move-constructor: default More... | |
| virtual | ~BlockSink () |
| required virtual destructor More... | |
| virtual PinnedByteBlockPtr | AllocateByteBlock (size_t block_size) |
| virtual void | AppendPinnedBlock (PinnedBlock &&b, bool is_last_block) |
| Appends the PinnedBlock. More... | |
| BlockPool * | block_pool () const |
| Returns block_pool_. More... | |
| size_t | local_worker_id () const |
| local worker id to associate pinned block with More... | |
| common::JsonLogger & | logger () |
| Returns BlockPool.logger_. More... | |
| BlockSink & | operator= (const BlockSink &)=default |
| default assignment operator More... | |
| BlockSink & | operator= (BlockSink &&)=default |
| move-assignment operator: default More... | |
| virtual void | ReleaseByteBlock (ByteBlockPtr &block) |
| Release an unused ByteBlock with n bytes backing memory. More... | |
| size_t | workers_per_host () const |
| return number of workers per host More... | |
Static Public Attributes | |
| static constexpr bool | allocate_can_fail_ = false |
| static constexpr bool | debug = false |
Static Public Attributes inherited from BlockSink | |
| static constexpr bool | allocate_can_fail_ = false |
Private Attributes | |
| size_t | block_counter_ = 0 |
| number of blocks transferred by the Queue More... | |
| size_t | byte_counter_ = 0 |
| number of bytes transferred by the Queue More... | |
| CloseCallback | close_callback_ |
| File | file_ |
| File to cache blocks for implementing CacheBlockQueueSource. More... | |
| size_t | item_counter_ = 0 |
| number of items transferred by the Queue More... | |
| common::ConcurrentBoundedQueue< Block > | queue_ |
| bool | read_closed_ = false |
| common::StatsTimerStart | timespan_ |
| timespan of existence More... | |
| common::AtomicMovable< bool > | write_closed_ = { false } |
Additional Inherited Members | |
Protected Attributes inherited from BlockSink | |
| size_t | local_worker_id_ |
| local worker id to associate pinned block with More... | |
| using CloseCallback = tlx::delegate<void (BlockQueue&)> |
Definition at line 56 of file block_queue.hpp.
| using ConsumeReader = BlockReader<ConsumeBlockQueueSource> |
Definition at line 54 of file block_queue.hpp.
| using Reader = DynBlockReader |
Definition at line 53 of file block_queue.hpp.
| using Writer = BlockWriter<BlockQueueSink> |
Definition at line 52 of file block_queue.hpp.
| BlockQueue (BlockPool &block_pool, size_t local_worker_id, size_t dia_id, const CloseCallback &close_callback=CloseCallback()) |
Constructor from BlockPool.
Definition at line 19 of file block_queue.cpp.
References BlockPool::workers_per_host().
| BlockQueue (const BlockQueue &) | delete |
non-copyable: delete copy-constructor
| BlockQueue (BlockQueue &&) | default |
move-constructor: default
| void AppendBlock (const Block &b, bool) | inline final virtual |
Appends the (unpinned) Block.
Implements BlockSink.
Definition at line 72 of file block_queue.hpp.
References BlockQueue::block_counter_, BlockQueue::byte_counter_, BlockQueue::item_counter_, LOG, and BlockQueue::queue_.
Referenced by StreamSink::AppendBlock().
| void AppendBlock (Block &&b, bool) | inline final virtual |
Appends the (unpinned) Block.
Implements BlockSink.
Definition at line 79 of file block_queue.hpp.
References BlockQueue::block_counter_, BlockQueue::byte_counter_, BlockQueue::Close(), BlockQueue::item_counter_, LOG, and BlockQueue::queue_.
| size_t block_counter () const | inline |
Returns block_counter_.
Definition at line 127 of file block_queue.hpp.
References BlockQueue::block_counter_.
Referenced by CatStreamData::CatStreamData().
| size_t byte_counter () const | inline |
Returns byte_counter_.
Definition at line 125 of file block_queue.hpp.
References BlockQueue::byte_counter_.
Referenced by CatStreamData::CatStreamData().
| void Close () | final virtual |
Close called by BlockWriter.
Implements BlockSink.
Definition at line 28 of file block_queue.cpp.
References BlockQueue::block_counter_, BlockQueue::close_callback_, BlockQueue::queue_, and BlockQueue::write_closed_.
Referenced by BlockQueue::AppendBlock(), and StreamSink::Close().
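To make the interplay of the counters and the close callback concrete, here is a minimal sketch. `ToyStatsQueue` and `ReportedBytes` are hypothetical names, `std::function` stands in for `tlx::delegate`, and invoking the callback exactly once from `Close()` is an assumption drawn from the description of `close_callback_` as a callback "for delivering stats"; the real Close() may do more.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <utility>

// Hypothetical toy model; std::function stands in for tlx::delegate.
class ToyStatsQueue {
public:
    using CloseCallback = std::function<void(ToyStatsQueue&)>;

    explicit ToyStatsQueue(CloseCallback cb = CloseCallback())
        : close_callback_(std::move(cb)) {}

    void set_close_callback(const CloseCallback& cb) { close_callback_ = cb; }

    // Account for one appended block with the given byte and item counts.
    void AppendBlock(std::size_t bytes, std::size_t items) {
        ++block_counter_;
        byte_counter_ += bytes;
        item_counter_ += items;
    }

    // Mark the writer side as closed and report the final counters
    // (assumed behaviour: the callback runs once, if one is set).
    void Close() {
        assert(!write_closed_);
        write_closed_ = true;
        if (close_callback_) close_callback_(*this);
    }

    std::size_t block_counter() const { return block_counter_; }
    std::size_t byte_counter() const { return byte_counter_; }
    std::size_t item_counter() const { return item_counter_; }
    bool write_closed() const { return write_closed_; }

private:
    std::size_t block_counter_ = 0;
    std::size_t byte_counter_ = 0;
    std::size_t item_counter_ = 0;
    bool write_closed_ = false;
    CloseCallback close_callback_;
};

// Appends two blocks and returns the byte count seen by the callback.
inline std::size_t ReportedBytes() {
    std::size_t reported = 0;
    ToyStatsQueue q;
    q.set_close_callback(
        [&reported](ToyStatsQueue& self) { reported = self.byte_counter(); });
    q.AppendBlock(1024, 10);
    q.AppendBlock(512, 5);
    q.Close();
    return reported;
}
```

Passing the queue itself to the callback, as the `CloseCallback` signature on this page does, lets the receiver read whichever counters it needs without the queue knowing about the statistics consumer.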
| bool empty () const | inline |
Definition at line 114 of file block_queue.hpp.
References BlockQueue::queue_.
| DynBlockSource GetBlockSource (bool consume, size_t local_worker_id) |
return polymorphic BlockSource variant
Definition at line 52 of file block_queue.cpp.
References BlockQueue::file_, BlockSink::local_worker_id(), File::num_items(), BlockQueue::read_closed_, and sLOG.
Referenced by BlockQueue::GetReader(), and BlockQueue::timespan().
| BlockQueue::ConsumeReader GetConsumeReader (size_t local_worker_id) |
return BlockReader specifically for a BlockQueue
Definition at line 47 of file block_queue.cpp.
References BlockQueue::read_closed_.
Referenced by BlockQueue::timespan().
| BlockQueue::Reader GetReader (bool consume, size_t local_worker_id) |
return polymorphic BlockReader variant
Definition at line 85 of file block_queue.cpp.
References BlockQueue::GetBlockSource().
Referenced by BlockQueue::timespan().
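The `consume` flag is not explained on this page. The sketch below shows one plausible reading, suggested by the `file_` member ("File to cache blocks for implementing CacheBlockQueueSource"): a non-consuming read also keeps a copy of every block so that it can be replayed later. `ToyCachingQueue` is a hypothetical, single-threaded stand-in, and the semantics it models are an assumption rather than documented behaviour.

```cpp
#include <cassert>
#include <deque>
#include <vector>

// Hypothetical single-threaded toy; cache_ plays the role of the cache File.
class ToyCachingQueue {
public:
    void Append(int item) { queue_.push_back(item); }
    void Close() { write_closed_ = true; }

    // Read everything that is available. ASSUMED semantics of `consume`:
    //  - true:  items are handed out once and not retained;
    //  - false: items are also copied into cache_ so they can be replayed.
    std::vector<int> ReadAll(bool consume) {
        std::vector<int> out;
        if (read_closed_) {
            // The queue was drained earlier: serve from the cache instead.
            out = cache_;
            if (consume) cache_.clear();
            return out;
        }
        while (!queue_.empty()) {
            int item = queue_.front();
            queue_.pop_front();
            if (!consume) cache_.push_back(item);
            out.push_back(item);
        }
        if (write_closed_) read_closed_ = true;
        return out;
    }

private:
    std::deque<int> queue_;
    std::vector<int> cache_;
    bool write_closed_ = false;
    bool read_closed_ = false;
};

// Walks through caching reads followed by consuming reads.
inline void DemoConsumeVersusCache() {
    ToyCachingQueue q;
    for (int i = 1; i <= 3; ++i) q.Append(i);
    q.Close();
    assert(q.ReadAll(false).size() == 3);  // first pass fills the cache
    assert(q.ReadAll(false).size() == 3);  // second pass replays the cache
    assert(q.ReadAll(true).size() == 3);   // consuming pass empties the cache
    assert(q.ReadAll(true).empty());       // nothing is left afterwards
}
```

Under this reading, a consuming reader is the cheaper choice when the data is needed only once, since nothing has to be retained after delivery.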
| BlockQueue::Writer GetWriter (size_t block_size=default_block_size) |
Return a BlockWriter delivering to this BlockQueue.
Definition at line 43 of file block_queue.cpp.
Referenced by BlockQueue::timespan().
| size_t item_counter () const | inline |
Returns item_counter_.
Definition at line 123 of file block_queue.hpp.
References BlockQueue::item_counter_.
Referenced by CatStreamData::CatStreamData().
| BlockQueue & operator= (const BlockQueue &) | delete |
non-copyable: delete assignment operator
Referenced by BlockQueueSink::BlockQueueSink().
| BlockQueue & operator= (BlockQueue &&) | default |
move-assignment operator: default
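Deleted copy operations combined with defaulted move operations make a class move-only. The following sketch shows what that means for callers; `MoveOnlyQueue` and `MoveIntoContainer` are hypothetical names used for illustration and only mirror the four special members listed for BlockQueue.

```cpp
#include <cassert>
#include <cstddef>
#include <type_traits>
#include <utility>
#include <vector>

// Hypothetical move-only type with the same special members as listed above.
class MoveOnlyQueue {
public:
    MoveOnlyQueue() = default;
    MoveOnlyQueue(const MoveOnlyQueue&) = delete;             // non-copyable
    MoveOnlyQueue& operator=(const MoveOnlyQueue&) = delete;  // non-copyable
    MoveOnlyQueue(MoveOnlyQueue&&) = default;                 // movable
    MoveOnlyQueue& operator=(MoveOnlyQueue&&) = default;      // movable

    void Push(int v) { items_.push_back(v); }
    std::size_t size() const { return items_.size(); }

private:
    std::vector<int> items_;
};

// The compiler enforces the policy; these checks document it.
static_assert(!std::is_copy_constructible<MoveOnlyQueue>::value,
              "copy construction is deleted");
static_assert(!std::is_copy_assignable<MoveOnlyQueue>::value,
              "copy assignment is deleted");
static_assert(std::is_move_constructible<MoveOnlyQueue>::value,
              "move construction is available");
static_assert(std::is_move_assignable<MoveOnlyQueue>::value,
              "move assignment is available");

// Stores a queue in a container by moving it and returns its size there.
inline std::size_t MoveIntoContainer() {
    std::vector<MoveOnlyQueue> queues;
    MoveOnlyQueue q;
    q.Push(1);
    q.Push(2);
    queues.push_back(std::move(q));  // OK: moved, a copy would not compile
    return queues.front().size();
}
```

A move-only queue can still be stored in standard containers or returned from functions, while accidental duplication of its contents is rejected at compile time.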
| Block Pop () | inline |
Definition at line 92 of file block_queue.hpp.
References Block::IsValid(), BlockQueue::queue_, and BlockQueue::read_closed_.
Referenced by ConsumeBlockQueueSource::NextBlock(), and CacheBlockQueueSource::NextBlock().
| bool read_closed () const | inline |
check if reader side has returned a closing sentinel block
Definition at line 117 of file block_queue.hpp.
References BlockQueue::read_closed_.
Referenced by CacheBlockQueueSource::NextBlock(), and CacheBlockQueueSource::~CacheBlockQueueSource().
| void set_close_callback (const CloseCallback &cb) | inline |
set the close callback
Definition at line 107 of file block_queue.hpp.
References BlockQueue::close_callback_.
| void set_dia_id (size_t dia_id) | inline |
change dia_id after construction (needed because it may be unknown at construction)
Definition at line 102 of file block_queue.hpp.
References BlockQueue::file_, and File::set_dia_id().
| size_t size () | inline |
return number of blocks in the queue. Use this ONLY for DEBUGGING!
Definition at line 120 of file block_queue.hpp.
References BlockQueue::queue_, and BlockQueue::write_closed().
| const common::StatsTimer & timespan () const | inline |
Returns timespan_.
Definition at line 129 of file block_queue.hpp.
References thrill::data::default_block_size, BlockQueue::GetBlockSource(), BlockQueue::GetConsumeReader(), BlockQueue::GetReader(), BlockQueue::GetWriter(), and BlockQueue::timespan_.
| bool write_closed () const | inline |
check if writer side Close() was called.
Definition at line 112 of file block_queue.hpp.
References BlockQueue::write_closed_.
Referenced by BlockQueue::size().
| constexpr bool allocate_can_fail_ = false | static |
Definition at line 90 of file block_queue.hpp.
| size_t block_counter_ = 0 | private |
number of blocks transferred by the Queue
Definition at line 157 of file block_queue.hpp.
Referenced by BlockQueue::AppendBlock(), BlockQueue::block_counter(), and BlockQueue::Close().
| size_t byte_counter_ = 0 | private |
number of bytes transferred by the Queue
Definition at line 155 of file block_queue.hpp.
Referenced by BlockQueue::AppendBlock(), and BlockQueue::byte_counter().
| CloseCallback close_callback_ | private |
callback to issue when the writer closes the Queue – for delivering stats
Definition at line 166 of file block_queue.hpp.
Referenced by BlockQueue::Close(), and BlockQueue::set_close_callback().
| constexpr bool debug = false | static |
Definition at line 50 of file block_queue.hpp.
| File file_ | private |
File to cache blocks for implementing CacheBlockQueueSource.
Definition at line 162 of file block_queue.hpp.
Referenced by BlockQueue::GetBlockSource(), CacheBlockQueueSource::NextBlock(), and BlockQueue::set_dia_id().
| size_t item_counter_ = 0 | private |
number of items transferred by the Queue
Definition at line 153 of file block_queue.hpp.
Referenced by BlockQueue::AppendBlock(), and BlockQueue::item_counter().
| common::ConcurrentBoundedQueue< Block > queue_ | private |
Definition at line 144 of file block_queue.hpp.
Referenced by BlockQueue::AppendBlock(), BlockQueueSink::AppendBlock(), BlockQueue::Close(), BlockQueueSink::Close(), BlockQueue::empty(), BlockQueue::Pop(), BlockQueue::size(), and BlockQueueSink::~BlockQueueSink().
| bool read_closed_ = false | private |
whether Pop() has returned a closing Block, i.e. whether the reader has received the close message from the writer
Definition at line 150 of file block_queue.hpp.
Referenced by BlockQueue::GetBlockSource(), BlockQueue::GetConsumeReader(), BlockQueue::Pop(), and BlockQueue::read_closed().
| common::StatsTimerStart timespan_ | private |
timespan of existence
Definition at line 159 of file block_queue.hpp.
Referenced by BlockQueue::timespan().
| common::AtomicMovable< bool > write_closed_ = { false } | private |
Definition at line 146 of file block_queue.hpp.
Referenced by BlockQueue::Close(), and BlockQueue::write_closed().