30 size_t local_worker_id,
size_t dia_id)
32 send_size_limit, id, local_worker_id, dia_id),
34 queue_(multiplexer_.block_pool_, num_workers(),
35 local_worker_id, dia_id) {
40 LOG <<
"~MixStreamData() deleted";
54 size_t block_size_base = hard_ram_limit / 4
69 <<
"MixStreamData::GetWriters()" 70 <<
" hard_ram_limit=" << hard_ram_limit
71 <<
" block_size_base=" << block_size_base
72 <<
" block_size=" << block_size
81 for (
size_t host = 0; host <
num_hosts(); ++host) {
130 LOG <<
"MixStreamData::Close() wait for closing block" 144 LOG <<
"MixStreamData::Close() finished" 160 struct MixStreamData::SeqReordering {
165 std::map<uint32_t, Block> waiting_;
172 sLOG <<
"MixStreamData::OnStreamBlock" << b
182 seq_[from].waiting_.insert(std::make_pair(seq, std::move(b)));
190 while (!
seq_[from].waiting_.empty() &&
191 (
seq_[from].waiting_.begin()->first ==
seq_[from].seq_ ||
194 sLOG <<
"MixStreamData::OnStreamBlock" 195 <<
"processing delayed block with seq" 196 <<
seq_[from].waiting_.begin()->first;
199 from, std::move(
seq_[from].waiting_.begin()->second));
201 seq_[from].waiting_.erase(
202 seq_[from].waiting_.begin());
216 sLOG <<
"MixStreamData::OnCloseStream" 225 if (--remaining_closing_blocks_ == 0) {
259 return ptr_->GetWriters();
263 return ptr_->GetMixReader(consume);
267 return ptr_->GetReader(consume);
std::atomic< size_t > rx_net_items_
bool all_writers_closed_
bool if all writers were closed
void Close(size_t src)
append closing sentinel block from src (also delivered via the network).
Block combines a reference to a read-only ByteBlock and book-keeping information. ...
common::StatsTimerStopped tx_timespan_
Timers from first rx / tx package until rx / tx direction is closed.
#define sLOG
Default logging method: output if the local debug variable is true.
Base class for common structures for ConcatStream and MixedStream.
static uint_pair max()
return an uint_pair instance containing the largest value possible
size_t value() const
return the current value – should only be used for debugging.
void set_dia_id(size_t dia_id)
StreamData & data() final
Return stream data reference.
void Close() final
shuts the stream down.
std::mutex mutex_
protects critical sections
MixBlockQueue queue_
BlockQueue to store incoming Blocks with source.
size_t hard_ram_limit() noexcept
Hard limit on amount of memory used for ByteBlock.
size_t default_block_size
default size of blocks in File, Channel, BlockQueue, etc.
size_t my_worker_rank() const
Returns my_worker_rank_.
MixStreamDataPtr MixLoopback(size_t stream_id, size_t to_worker_id)
bool is_closed_
flag if Close() was completed
std::atomic< size_t > rx_net_bytes_
Multiplexer & multiplexer_
reference to multiplexer
MixStream(const MixStreamDataPtr &ptr)
MixBlockQueueReader MixReader
virtual Connection & connection(size_t id)=0
Return Connection to client id.
tlx::CountingPtr< StreamData > StreamDataPtr
Multiplexes virtual Connections on Dispatcher.
std::atomic< size_t > remaining_closing_blocks_
std::atomic< size_t > max_active_streams_
maximu number of active Cat/MixStreams
MixReader GetReader(bool consume)
Open a MixReader (function name matches a method in File and CatStream).
const StreamId & id() const final
Return stream id.
size_t local_worker_id_
local worker id
MixReader GetMixReader(bool consume)
Writers GetWriters() final
void IntReleaseMixStream(size_t id, size_t local_worker_id)
release pointer onto a MixStream object
MixReader GetReader(bool consume)
Open a MixReader (function name matches a method in File and CatStream).
common::StatsTimerStopped rx_timespan_
An extra class derived from std::vector<> for delivery of the BlockWriters of a Stream.
Reader to retrieve items in unordered sequence from a MixBlockQueue.
std::vector< SeqReordering > seq_
Block Sequence numbers.
bool write_closed() const
check if writer side Close() was called.
size_t num_hosts() const
Number of hosts in system.
bool is_queue_closed(size_t from)
check if inbound queue is closed
size_t wait(size_t delta=1, size_t slack=0)
net::Group & group_
Holds NetConnections for outgoing Streams.
size_t workers_per_host() const
number of workers per host
bool closed() const final
tlx::Semaphore sem_closing_blocks_
number of received stream closing Blocks.
Base class for StreamSet.
void AppendBlock(size_t src, const Block &block)
append block delivered via the network from src.
MixStreamData(StreamSetBase *stream_set_base, Multiplexer &multiplexer, size_t send_size_limit, const StreamId &id, size_t local_worker_id, size_t dia_id)
Creates a new stream instance.
bool is_queue_closed(size_t src)
check if inbound queue is closed
StreamId id_
our own stream id.
size_t dia_id_
associated DIANode id.
size_t num_workers() const
Number of workers in system.
size_t my_host_rank() const
Returns my_host_rank.
static int round_down_to_power_of_two(int i)
does what it says: round down to next power of two
size_t num_workers() const
total number of workers.
std::atomic< size_t > rx_net_blocks_
BlockPool & block_pool_
reference to host-global BlockPool.
void OnStreamBlockOrdered(size_t from, Block &&b)
called to process PinnedBlock in sequence
MixReader GetMixReader(bool consume)
Creates a BlockReader which mixes items from all workers.
void OnStreamBlock(size_t from, uint32_t seq, Block &&b)
called from Multiplexer when there is a new Block for this Stream.
#define LOG
Default logging method: output if the local debug variable is true.
const char * stream_type() final
return stream type string
std::atomic< size_t > active_streams_
number of active Cat/MixStreams
#define LOGC(cond)
Explicitly specify the condition for logging.
size_t workers_per_host() const
Returns workers_per_host.
common::StatsTimerStart rx_lifetime_
void set_dia_id(size_t dia_id)