Thrill  0.1
BlockQueue Class Referencefinal

Detailed Description

A BlockQueue is a thread-safe queue used to hand-over Block objects between threads.

It is currently used by the Multiplexer to queue received Blocks and deliver them (later) to their destination.

The BlockQueue itself is also a BlockSink (so one can attach a BlockWriter to it). To read items from the queue, one needs to use a BlockReader instantiated with a BlockQueueSource. Both are easily available via GetWriter() and GetReader(). Each block is available only once via the BlockQueueSource.

Definition at line 47 of file block_queue.hpp.

+ Inheritance diagram for BlockQueue:
+ Collaboration diagram for BlockQueue:

#include <block_queue.hpp>

Public Types

using CloseCallback = tlx::delegate< void(BlockQueue &)>
 
using ConsumeReader = BlockReader< ConsumeBlockQueueSource >
 
using Reader = DynBlockReader
 
using Writer = BlockWriter< BlockQueueSink >
 

Public Member Functions

 BlockQueue (BlockPool &block_pool, size_t local_worker_id, size_t dia_id, const CloseCallback &close_callback=CloseCallback())
 Constructor from BlockPool. More...
 
 BlockQueue (const BlockQueue &)=delete
 non-copyable: delete copy-constructor More...
 
 BlockQueue (BlockQueue &&)=default
 move-constructor: default More...
 
void AppendBlock (const Block &b, bool) final
 Appends the (unpinned) Block. More...
 
void AppendBlock (Block &&b, bool) final
 Appends the (unpinned) Block. More...
 
size_t block_counter () const
 Returns block_counter_. More...
 
size_t byte_counter () const
 Returns byte_counter_. More...
 
void Close () final
 Close called by BlockWriter. More...
 
bool empty () const
 
DynBlockSource GetBlockSource (bool consume, size_t local_worker_id)
 return polymorphic BlockSource variant More...
 
ConsumeReader GetConsumeReader (size_t local_worker_id)
 return BlockReader specifically for a BlockQueue More...
 
Reader GetReader (bool consume, size_t local_worker_id)
 return polymorphic BlockReader variant More...
 
Writer GetWriter (size_t block_size=default_block_size)
 Return a BlockWriter delivering to this BlockQueue. More...
 
size_t item_counter () const
 Returns item_counter_. More...
 
BlockQueueoperator= (const BlockQueue &)=delete
 non-copyable: delete assignment operator More...
 
BlockQueueoperator= (BlockQueue &&)=default
 move-assignment operator: default More...
 
Block Pop ()
 
bool read_closed () const
 check if reader side has returned a closing sentinel block More...
 
void set_close_callback (const CloseCallback &cb)
 set the close callback More...
 
void set_dia_id (size_t dia_id)
 
size_t size ()
 return number of block in the queue. Use this ONLY for DEBUGGING! More...
 
const common::StatsTimertimespan () const
 Returns timespan_. More...
 
bool write_closed () const
 check if writer side Close() was called. More...
 
- Public Member Functions inherited from BlockSink
 BlockSink (BlockPool &block_pool, size_t local_worker_id)
 constructor with reference to BlockPool More...
 
 BlockSink (BlockPool *block_pool, size_t local_worker_id)
 constructor with reference to BlockPool More...
 
 BlockSink (const BlockSink &)=default
 default copy-constructor More...
 
 BlockSink (BlockSink &&)=default
 move-constructor: default More...
 
virtual ~BlockSink ()
 required virtual destructor More...
 
virtual PinnedByteBlockPtr AllocateByteBlock (size_t block_size)
 
virtual void AppendPinnedBlock (PinnedBlock &&b, bool is_last_block)
 Appends the PinnedBlock. More...
 
BlockPoolblock_pool () const
 Returns block_pool_. More...
 
size_t local_worker_id () const
 local worker id to associate pinned block with More...
 
common::JsonLoggerlogger ()
 Returns BlockPool.logger_. More...
 
BlockSinkoperator= (const BlockSink &)=default
 default assignment operator More...
 
BlockSinkoperator= (BlockSink &&)=default
 move-assignment operator: default More...
 
virtual void ReleaseByteBlock (ByteBlockPtr &block)
 Release an unused ByteBlock with n bytes backing memory. More...
 
size_t workers_per_host () const
 return number of workers per host More...
 

Static Public Attributes

static constexpr bool allocate_can_fail_ = false
 
static constexpr bool debug = false
 
- Static Public Attributes inherited from BlockSink
static constexpr bool allocate_can_fail_ = false
 

Private Attributes

size_t block_counter_ = 0
 number of blocks transfered by the Queue More...
 
size_t byte_counter_ = 0
 number of bytes transfered by the Queue More...
 
CloseCallback close_callback_
 
File file_
 File to cache blocks for implementing CacheBlockQueueSource. More...
 
size_t item_counter_ = 0
 number of items transfered by the Queue More...
 
common::ConcurrentBoundedQueue< Blockqueue_
 
bool read_closed_ = false
 
common::StatsTimerStart timespan_
 timespan of existance More...
 
common::AtomicMovable< bool > write_closed_ = { false }
 

Additional Inherited Members

- Protected Attributes inherited from BlockSink
size_t local_worker_id_
 local worker id to associate pinned block with More...
 

Member Typedef Documentation

◆ CloseCallback

Definition at line 56 of file block_queue.hpp.

◆ ConsumeReader

Definition at line 54 of file block_queue.hpp.

◆ Reader

Definition at line 53 of file block_queue.hpp.

◆ Writer

Definition at line 52 of file block_queue.hpp.

Constructor & Destructor Documentation

◆ BlockQueue() [1/3]

BlockQueue ( BlockPool block_pool,
size_t  local_worker_id,
size_t  dia_id,
const CloseCallback close_callback = CloseCallback() 
)

Constructor from BlockPool.

Definition at line 19 of file block_queue.cpp.

References BlockPool::workers_per_host().

◆ BlockQueue() [2/3]

BlockQueue ( const BlockQueue )
delete

non-copyable: delete copy-constructor

◆ BlockQueue() [3/3]

BlockQueue ( BlockQueue &&  )
default

move-constructor: default

Member Function Documentation

◆ AppendBlock() [1/2]

void AppendBlock ( const Block b,
bool  is_last_block 
)
inlinefinalvirtual

Appends the (unpinned) Block.

Implements BlockSink.

Definition at line 72 of file block_queue.hpp.

References BlockQueue::block_counter_, BlockQueue::byte_counter_, BlockQueue::item_counter_, LOG, and BlockQueue::queue_.

Referenced by StreamSink::AppendBlock().

◆ AppendBlock() [2/2]

void AppendBlock ( Block &&  b,
bool  is_last_block 
)
inlinefinalvirtual

◆ block_counter()

size_t block_counter ( ) const
inline

Returns block_counter_.

Definition at line 127 of file block_queue.hpp.

References BlockQueue::block_counter_.

Referenced by CatStreamData::CatStreamData().

◆ byte_counter()

size_t byte_counter ( ) const
inline

Returns byte_counter_.

Definition at line 125 of file block_queue.hpp.

References BlockQueue::byte_counter_.

Referenced by CatStreamData::CatStreamData().

◆ Close()

void Close ( )
finalvirtual

◆ empty()

bool empty ( ) const
inline

Definition at line 114 of file block_queue.hpp.

References BlockQueue::queue_.

◆ GetBlockSource()

DynBlockSource GetBlockSource ( bool  consume,
size_t  local_worker_id 
)

return polymorphic BlockSource variant

Definition at line 52 of file block_queue.cpp.

References BlockQueue::file_, BlockSink::local_worker_id(), File::num_items(), BlockQueue::read_closed_, and sLOG.

Referenced by BlockQueue::GetReader(), and BlockQueue::timespan().

◆ GetConsumeReader()

BlockQueue::ConsumeReader GetConsumeReader ( size_t  local_worker_id)

return BlockReader specifically for a BlockQueue

Definition at line 47 of file block_queue.cpp.

References BlockQueue::read_closed_.

Referenced by BlockQueue::timespan().

◆ GetReader()

BlockQueue::Reader GetReader ( bool  consume,
size_t  local_worker_id 
)

return polymorphic BlockReader variant

Definition at line 85 of file block_queue.cpp.

References BlockQueue::GetBlockSource().

Referenced by BlockQueue::timespan().

◆ GetWriter()

BlockQueue::Writer GetWriter ( size_t  block_size = default_block_size)

Return a BlockWriter delivering to this BlockQueue.

Definition at line 43 of file block_queue.cpp.

Referenced by BlockQueue::timespan().

◆ item_counter()

size_t item_counter ( ) const
inline

Returns item_counter_.

Definition at line 123 of file block_queue.hpp.

References BlockQueue::item_counter_.

Referenced by CatStreamData::CatStreamData().

◆ operator=() [1/2]

BlockQueue& operator= ( const BlockQueue )
delete

non-copyable: delete assignment operator

Referenced by BlockQueueSink::BlockQueueSink().

◆ operator=() [2/2]

BlockQueue& operator= ( BlockQueue &&  )
default

move-assignment operator: default

◆ Pop()

◆ read_closed()

bool read_closed ( ) const
inline

check if reader side has returned a closing sentinel block

Definition at line 117 of file block_queue.hpp.

References BlockQueue::read_closed_.

Referenced by CacheBlockQueueSource::NextBlock(), and CacheBlockQueueSource::~CacheBlockQueueSource().

◆ set_close_callback()

void set_close_callback ( const CloseCallback cb)
inline

set the close callback

Definition at line 107 of file block_queue.hpp.

References BlockQueue::close_callback_.

◆ set_dia_id()

void set_dia_id ( size_t  dia_id)
inline

change dia_id after construction (needed because it may be unknown at construction)

Definition at line 102 of file block_queue.hpp.

References BlockQueue::file_, and File::set_dia_id().

◆ size()

size_t size ( )
inline

return number of block in the queue. Use this ONLY for DEBUGGING!

Definition at line 120 of file block_queue.hpp.

References BlockQueue::queue_, and BlockQueue::write_closed().

◆ timespan()

◆ write_closed()

bool write_closed ( ) const
inline

check if writer side Close() was called.

Definition at line 112 of file block_queue.hpp.

References BlockQueue::write_closed_.

Referenced by BlockQueue::size().

Member Data Documentation

◆ allocate_can_fail_

constexpr bool allocate_can_fail_ = false
static

Definition at line 90 of file block_queue.hpp.

◆ block_counter_

size_t block_counter_ = 0
private

number of blocks transfered by the Queue

Definition at line 157 of file block_queue.hpp.

Referenced by BlockQueue::AppendBlock(), BlockQueue::block_counter(), and BlockQueue::Close().

◆ byte_counter_

size_t byte_counter_ = 0
private

number of bytes transfered by the Queue

Definition at line 155 of file block_queue.hpp.

Referenced by BlockQueue::AppendBlock(), and BlockQueue::byte_counter().

◆ close_callback_

CloseCallback close_callback_
private

callback to issue when the writer closes the Queue – for delivering stats

Definition at line 166 of file block_queue.hpp.

Referenced by BlockQueue::Close(), and BlockQueue::set_close_callback().

◆ debug

constexpr bool debug = false
static

Definition at line 50 of file block_queue.hpp.

◆ file_

File file_
private

File to cache blocks for implementing CacheBlockQueueSource.

Definition at line 162 of file block_queue.hpp.

Referenced by BlockQueue::GetBlockSource(), CacheBlockQueueSource::NextBlock(), and BlockQueue::set_dia_id().

◆ item_counter_

size_t item_counter_ = 0
private

number of items transfered by the Queue

Definition at line 153 of file block_queue.hpp.

Referenced by BlockQueue::AppendBlock(), and BlockQueue::item_counter().

◆ queue_

◆ read_closed_

bool read_closed_ = false
private

whether Pop() has returned a closing Block; hence, if we received the close message from the writer

Definition at line 150 of file block_queue.hpp.

Referenced by BlockQueue::GetBlockSource(), BlockQueue::GetConsumeReader(), BlockQueue::Pop(), and BlockQueue::read_closed().

◆ timespan_

common::StatsTimerStart timespan_
private

timespan of existance

Definition at line 159 of file block_queue.hpp.

Referenced by BlockQueue::timespan().

◆ write_closed_

common::AtomicMovable<bool> write_closed_ = { false }
private

Definition at line 146 of file block_queue.hpp.

Referenced by BlockQueue::Close(), and BlockQueue::write_closed().


The documentation for this class was generated from the following files: