Thrill  0.1
MixBlockQueue Class Reference

Detailed Description

Implements reading an unordered sequence of items from multiple workers, each of which sends Blocks.

This class is mainly used to implement MixChannel.

When Blocks arrive from the net, the Multiplexer pushes (src, Blocks) pairs to MixChannel, which pushes them into a MixBlockQueue. The MixBlockQueue stores these in a ConcurrentBoundedQueue for atomic reading.
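The push side described above can be pictured with a minimal sketch. It is not the Thrill implementation: `BlockSketch`, `SrcBlockPairSketch` and `MixQueueSketch` are hypothetical names, and a mutex-protected, unbounded `std::deque` stands in for `ConcurrentBoundedQueue`. It only shows that (src, Block) pairs from all sources share one queue and come out in reception order.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical stand-in for a Block: just an opaque payload.
struct BlockSketch {
    std::string data;
};

// Pair of (source worker, block), as stored in the main mix queue.
struct SrcBlockPairSketch {
    std::size_t src;
    BlockSketch block;
};

// Simplified, unbounded model of the main mix queue (assumption: the real
// class uses common::ConcurrentBoundedQueue instead of this hand-rolled FIFO).
class MixQueueSketch {
public:
    // Called by the network side: enqueue a block received from worker src.
    void AppendBlock(std::size_t src, BlockSketch block) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push_back(SrcBlockPairSketch{ src, std::move(block) });
        }
        cv_.notify_one();
    }

    // Called by the reader: blocks until a pair is available, then returns
    // the oldest pair regardless of which source sent it.
    SrcBlockPairSketch Pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return !queue_.empty(); });
        SrcBlockPairSketch pair = std::move(queue_.front());
        queue_.pop_front();
        return pair;
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    std::deque<SrcBlockPairSketch> queue_;
};
```

Because every source appends to the same queue, a single `Pop()` call waits on all sources at once rather than on one particular worker.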

When the MixChannel should be read, MixBlockQueueReader is used, which retrieves Blocks from the queue. The Reader contains one complete BlockReader for each inbound worker, and these BlockReaders are attached to BlockQueue instances inside the MixBlockQueue.

To enable unordered reading from multiple workers, the only remaining thing to do is to fetch Blocks from the main mix queue and put them into the right BlockQueue for the sub-readers to consume. By taking the Blocks from the main mix queue, the Reader only blocks when no inbound Blocks are available.
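The routing step in the paragraph above can be sketched as follows. This is a single-threaded illustration under stated assumptions: `RoutedPair` and `RouteOne` are hypothetical names, plain `std::deque` containers stand in for the mix queue and the per-source BlockQueue instances, and a `std::string` stands in for a Block.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <string>
#include <utility>
#include <vector>

// Hypothetical (source worker, block) pair; the block is an opaque string.
struct RoutedPair {
    std::size_t src;
    std::string block;
};

// Takes the oldest pair from the mix queue and appends its block to the
// sub-queue of the worker that sent it. Returns that worker's index.
// Precondition: mix_queue is not empty (the real Pop() would block instead).
inline std::size_t RouteOne(std::deque<RoutedPair>& mix_queue,
                            std::vector<std::deque<std::string>>& sub_queues) {
    assert(!mix_queue.empty());
    RoutedPair pair = std::move(mix_queue.front());
    mix_queue.pop_front();
    assert(pair.src < sub_queues.size());
    sub_queues[pair.src].push_back(std::move(pair.block));
    return pair.src;
}
```

After each call, the sub-reader for the returned source index has one more block to consume, while the order of blocks within each source is preserved.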

To enable switching between items from different workers, the MixBlockQueueReader keeps track of how many whole items are available on each reader. This number is simply one less than the number of items known to start in the received Blocks. The last item may span further Blocks and cannot be fetched without blocking the sub-reader indefinitely, since no thread would deliver the next Block.
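The "one less" rule can be illustrated with a small counter. `WholeItemTracker` is a hypothetical helper, not part of Thrill. It also assumes that once a source closes, its last pending item is complete and may be read, which follows from the reasoning above but is not stated in this page.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical bookkeeping for how many whole items may be read per source.
class WholeItemTracker {
public:
    explicit WholeItemTracker(std::size_t num_workers)
        : pending_(num_workers, 0) { }

    // A block from src arrived in which items_started items begin.
    // Returns how many whole items may now be read from src: all known
    // items except the last one, which might continue in a later block.
    std::size_t OnBlock(std::size_t src, std::size_t items_started) {
        assert(src < pending_.size());
        pending_[src] += items_started;
        if (pending_[src] == 0) return 0;
        std::size_t readable = pending_[src] - 1;
        pending_[src] = 1; // the held-back last item stays pending
        return readable;
    }

    // The closing sentinel from src arrived (assumption: the held-back
    // item is now complete, so it becomes readable).
    std::size_t OnClose(std::size_t src) {
        assert(src < pending_.size());
        std::size_t readable = pending_[src];
        pending_[src] = 0;
        return readable;
    }

private:
    // Items known to have started at each source but not yet released.
    std::vector<std::size_t> pending_;
};
```

For example, a block in which three items start releases two of them; the third is only released when a later block or the close message shows that it is complete.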

Definition at line 64 of file mix_block_queue.hpp.

#include <mix_block_queue.hpp>

Classes

struct  SrcBlockPair
 pair of (source worker, Block) stored in the main mix queue.
 

Public Types

using Reader = MixBlockQueueReader
 

Public Member Functions

 MixBlockQueue (BlockPool &block_pool, size_t num_workers, size_t local_worker_id, size_t dia_id)
 Constructor from BlockPool.
 
 MixBlockQueue (const MixBlockQueue &)=delete
 non-copyable: delete copy-constructor
 
 MixBlockQueue (MixBlockQueue &&)=default
 move-constructor: default
 
void AppendBlock (size_t src, const Block &block)
 append block delivered via the network from src.
 
void AppendBlock (size_t src, Block &&block)
 append block delivered via the network from src.
 
BlockPool & block_pool ()
 return block pool
 
void Close (size_t src)
 append closing sentinel block from src (also delivered via the network).
 
bool is_queue_closed (size_t src)
 check if inbound queue is closed
 
MixBlockQueue & operator= (const MixBlockQueue &)=delete
 non-copyable: delete assignment operator
 
MixBlockQueue & operator= (MixBlockQueue &&)=default
 move-assignment operator: default
 
SrcBlockPair Pop ()
 Blocking retrieval of a (source,block) pair.
 
bool read_closed () const
 check if reader side has returned a closing sentinel block
 
void set_dia_id (size_t dia_id)
 
bool write_closed () const
 check if writer side Close() was called.
 

Private Attributes

BlockPool & block_pool_
 
size_t local_worker_id_
 
common::ConcurrentBoundedQueue< SrcBlockPair > mix_queue_
 the main mix queue, containing the block in the reception order.
 
size_t num_workers_
 total number of workers in system.
 
std::vector< BlockQueue > queues_
 BlockQueues to deliver blocks to from mix queue.
 
size_t read_open_ = num_workers_
 
std::vector< unsigned char > write_closed_
 flag to test for closed sources
 
common::AtomicMovable< size_t > write_open_count_ { num_workers_ }
 counter on number of writers still open.
 

Static Private Attributes

static constexpr bool debug = false
 

Member Typedef Documentation

◆ Reader

Definition at line 75 of file mix_block_queue.hpp.

Constructor & Destructor Documentation

◆ MixBlockQueue() [1/3]

MixBlockQueue ( BlockPool &  block_pool,
size_t  num_workers,
size_t  local_worker_id,
size_t  dia_id 
)

Constructor from BlockPool.

Definition at line 22 of file mix_block_queue.cpp.

References MixBlockQueue::block_pool_, and MixBlockQueue::queues_.

◆ MixBlockQueue() [2/3]

MixBlockQueue ( const MixBlockQueue &  )
delete

non-copyable: delete copy-constructor

◆ MixBlockQueue() [3/3]

MixBlockQueue ( MixBlockQueue &&  )
default

move-constructor: default

Member Function Documentation

◆ AppendBlock() [1/2]

void AppendBlock ( size_t  src,
const Block &  block 
)

append block delivered via the network from src.

Definition at line 40 of file mix_block_queue.cpp.

References LOG, and MixBlockQueue::mix_queue_.

Referenced by MixBlockQueue::block_pool(), and MixStreamData::OnStreamBlockOrdered().

◆ AppendBlock() [2/2]

void AppendBlock ( size_t  src,
Block &&  block 
)

append block delivered via the network from src.

Definition at line 46 of file mix_block_queue.cpp.

References LOG, and MixBlockQueue::mix_queue_.

◆ block_pool()

◆ Close()

void Close ( size_t  src)

append closing sentinel block from src (also delivered via the network).

Definition at line 52 of file mix_block_queue.cpp.

References MixBlockQueue::local_worker_id_, LOG, MixBlockQueue::mix_queue_, MixBlockQueue::write_closed_, and MixBlockQueue::write_open_count_.

Referenced by MixBlockQueue::block_pool(), and MixStreamData::OnStreamBlockOrdered().
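The write-side bookkeeping that Close() touches can be sketched as below. `CloseTracker` is a hypothetical class that reuses the member names from this page; `std::atomic` stands in for `common::AtomicMovable`, and the rule that each source closes at most once is an assumption of the sketch.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical model of the per-source close flags and the open-writer count.
class CloseTracker {
public:
    explicit CloseTracker(std::size_t num_workers)
        : write_closed_(num_workers, 0), write_open_count_(num_workers) { }

    // Record that worker src has sent its closing sentinel.
    void Close(std::size_t src) {
        assert(src < write_closed_.size());
        assert(!write_closed_[src]); // assumption: a source closes only once
        write_closed_[src] = 1;
        --write_open_count_;
    }

    // True if worker src has already closed its inbound queue.
    bool is_queue_closed(std::size_t src) const {
        assert(src < write_closed_.size());
        return write_closed_[src] != 0;
    }

    // True once every worker has called Close().
    bool write_closed() const {
        return write_open_count_.load() == 0;
    }

private:
    std::vector<unsigned char> write_closed_;   // one flag per source
    std::atomic<std::size_t> write_open_count_; // writers still open
};
```

The flag vector answers per-source queries, while the counter gives a cheap check for "all writers are done" without scanning the flags.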

◆ is_queue_closed()

bool is_queue_closed ( size_t  src)

check if inbound queue is closed

Definition at line 65 of file mix_block_queue.cpp.

References MixBlockQueue::write_closed_.

Referenced by MixStreamData::is_queue_closed(), and MixBlockQueue::read_closed().

◆ operator=() [1/2]

MixBlockQueue& operator= ( const MixBlockQueue &  )
delete

non-copyable: delete assignment operator

◆ operator=() [2/2]

MixBlockQueue& operator= ( MixBlockQueue &&  )
default

move-assignment operator: default

◆ Pop()

◆ read_closed()

bool read_closed ( ) const
inline

check if reader side has returned a closing sentinel block

Definition at line 113 of file mix_block_queue.hpp.

References MixBlockQueue::is_queue_closed(), and MixBlockQueue::read_open_.

◆ set_dia_id()

void set_dia_id ( size_t  dia_id)

change dia_id after construction (needed because it may be unknown at construction)

Definition at line 34 of file mix_block_queue.cpp.

References MixBlockQueue::queues_.

Referenced by MixStreamData::set_dia_id().

◆ write_closed()

bool write_closed ( ) const
inline

check if writer side Close() was called.

Definition at line 110 of file mix_block_queue.hpp.

References MixBlockQueue::write_open_count_.

Referenced by MixStreamData::closed().

Member Data Documentation

◆ block_pool_

BlockPool& block_pool_
private

Definition at line 119 of file mix_block_queue.hpp.

Referenced by MixBlockQueue::block_pool(), and MixBlockQueue::MixBlockQueue().

◆ debug

constexpr bool debug = false
static private

Definition at line 66 of file mix_block_queue.hpp.

◆ local_worker_id_

size_t local_worker_id_
private

Definition at line 121 of file mix_block_queue.hpp.

Referenced by MixBlockQueue::Close().

◆ mix_queue_

common::ConcurrentBoundedQueue<SrcBlockPair> mix_queue_
private

the main mix queue, containing the block in the reception order.

Definition at line 124 of file mix_block_queue.hpp.

Referenced by MixBlockQueue::AppendBlock(), MixBlockQueue::Close(), and MixBlockQueue::Pop().

◆ num_workers_

size_t num_workers_
private

total number of workers in system.

Definition at line 127 of file mix_block_queue.hpp.

Referenced by MixBlockQueueReader::MixBlockQueueReader().

◆ queues_

std::vector<BlockQueue> queues_
private

BlockQueues to deliver blocks to from mix queue.

Definition at line 140 of file mix_block_queue.hpp.

Referenced by MixBlockQueue::MixBlockQueue(), MixBlockQueueReader::MixBlockQueueReader(), MixBlockQueueReader::PullBlock(), and MixBlockQueue::set_dia_id().

◆ read_open_

size_t read_open_ = num_workers_
private

number of closing Blocks that Pop() has not yet returned, i.e. the number of writers whose close message has not yet been read.

Definition at line 137 of file mix_block_queue.hpp.

Referenced by MixBlockQueue::Pop(), and MixBlockQueue::read_closed().
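The relationship between Pop(), read_open_ and read_closed() can be pictured with a minimal single-threaded sketch. `PairSketch` and `ReadSideSketch` are hypothetical names, the `is_sentinel` flag stands in for an invalid closing Block, and the comparison against zero in `read_closed()` is an assumption about how the counter is evaluated.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// Hypothetical (source, block) pair; only the sentinel flag matters here.
struct PairSketch {
    std::size_t src;
    bool is_sentinel; // true for the closing block of a source
};

// Hypothetical model of the read side: counts outstanding closing blocks.
class ReadSideSketch {
public:
    explicit ReadSideSketch(std::size_t num_workers)
        : read_open_(num_workers) { }

    void Push(PairSketch pair) { queue_.push_back(pair); }

    // Non-blocking stand-in for Pop(); precondition: queue is not empty.
    PairSketch Pop() {
        assert(!queue_.empty());
        PairSketch pair = queue_.front();
        queue_.pop_front();
        if (pair.is_sentinel) {
            assert(read_open_ > 0);
            --read_open_; // one fewer source left to close
        }
        return pair;
    }

    // True once the closing block of every source has been returned.
    bool read_closed() const { return read_open_ == 0; }

private:
    std::deque<PairSketch> queue_;
    std::size_t read_open_; // starts at the number of workers
};
```

With two workers, the reader is only considered closed after both sentinels have passed through `Pop()`, even if one worker closes long before the other.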

◆ write_closed_

std::vector<unsigned char> write_closed_
private

flag to test for closed sources

Definition at line 133 of file mix_block_queue.hpp.

Referenced by MixBlockQueue::Close(), and MixBlockQueue::is_queue_closed().

◆ write_open_count_

common::AtomicMovable<size_t> write_open_count_ { num_workers_ }
private

counter on number of writers still open.

Definition at line 130 of file mix_block_queue.hpp.

Referenced by MixBlockQueue::Close(), and MixBlockQueue::write_closed().


The documentation for this class was generated from the following files: