31 size_t local_worker_id,
size_t dia_id)
33 send_size_limit, id, local_worker_id, dia_id) {
41 for (
size_t host = 0; host <
num_hosts(); ++host) {
47 <<
"class" <<
"StreamSink" 50 <<
"peer_host" << host
53 <<
"loopback" <<
true;
75 LOG <<
"~CatStreamData() deleted";
80 for (
size_t i = 0; i <
queues_.size(); ++i) {
91 size_t block_size_base = hard_ram_limit / 4
106 <<
"CatStreamData::GetWriters()" 107 <<
" hard_ram_limit=" << hard_ram_limit
108 <<
" block_size_base=" << block_size_base
109 <<
" block_size=" << block_size
118 for (
size_t host = 0; host <
num_hosts(); ++host) {
157 std::vector<BlockQueueReader> result;
160 for (
size_t worker = 0; worker <
num_workers(); ++worker) {
173 std::vector<DynBlockSource> result;
176 for (
size_t worker = 0; worker <
num_workers(); ++worker) {
197 sLOG <<
"CatStreamData" <<
id() <<
"close" 203 if (!
queues_[my_global_worker_id].write_closed())
204 queues_[my_global_worker_id].Close();
218 LOG <<
"CatStreamData::Close() finished" 226 closed = closed && q.write_closed();
232 return queues_[from].write_closed();
235 struct CatStreamData::SeqReordering {
240 std::map<uint32_t, Block> waiting_;
247 LOG <<
"OnCatStreamBlock" 253 sLOG <<
"stream" <<
id_ <<
"receive from" << from <<
":" 262 seq_[from].waiting_.insert(
263 std::make_pair(seq, std::move(b)));
271 while (!
seq_[from].waiting_.empty() &&
272 (
seq_[from].waiting_.begin()->first ==
seq_[from].seq_ ||
275 sLOG <<
"CatStreamData::OnStreamBlock" 276 <<
"processing delayed block with seq" 277 <<
seq_[from].waiting_.begin()->first;
280 from, std::move(
seq_[from].waiting_.begin()->second));
282 seq_[from].waiting_.erase(
283 seq_[from].waiting_.begin());
293 queues_[from].AppendBlock(std::move(b),
false);
296 sLOG <<
"CatStreamData::OnCloseStream" 305 if (--remaining_closing_blocks_ == 0) {
320 return &(
queues_[global_worker_rank]);
346 return ptr_->GetWriters();
350 return ptr_->GetReaders();
354 return ptr_->GetCatReader(consume);
358 return ptr_->GetReader(consume);
std::atomic< size_t > rx_net_items_
Writers GetWriters() final
bool all_writers_closed_
whether all writers were closed
Block combines a reference to a read-only ByteBlock and book-keeping information. ...
std::vector< BlockQueue > queues_
BlockQueues to store incoming Blocks with no attached destination.
common::JsonLogger & logger()
Get the JsonLogger from the BlockPool.
common::StatsTimerStopped tx_timespan_
Timers from first rx / tx package until rx / tx direction is closed.
const StreamId & id() const final
Return stream id.
#define sLOG
Default logging method: output if the local debug variable is true.
bool is_queue_closed(size_t from)
check if inbound queue is closed
Base class for common structures for ConcatStream and MixedStream.
static uint_pair max()
return an uint_pair instance containing the largest value possible
CatReader GetReader(bool consume)
Open a CatReader (function name matches a method in File and MixStream).
std::mutex mutex_
protects critical sections
size_t hard_ram_limit() noexcept
Hard limit on amount of memory used for ByteBlock.
void IntReleaseCatStream(size_t id, size_t local_worker_id)
release pointer onto a CatStreamData object
size_t default_block_size
default size of blocks in File, Channel, BlockQueue, etc.
size_t my_worker_rank() const
Returns my_worker_rank_.
CatReader GetCatReader(bool consume)
const char * stream_type() final
return stream type string
CatBlockSource is a BlockSource which concatenates all Blocks available from a vector of BlockSources...
std::atomic< size_t > rx_net_bytes_
Multiplexer & multiplexer_
reference to multiplexer
CatReader GetReader(bool consume)
Open a CatReader (function name matches a method in File and MixStream).
virtual Connection & connection(size_t id)=0
Return Connection to client id.
tlx::CountingPtr< StreamData > StreamDataPtr
StreamData & data() final
Return stream data reference.
Multiplexes virtual Connections on Dispatcher.
std::atomic< size_t > remaining_closing_blocks_
std::atomic< size_t > max_active_streams_
maximum number of active Cat/MixStreams
friend class StreamSink
friends for access to multiplexer_
size_t local_worker_id_
local worker id
CatReader GetCatReader(bool consume)
std::vector< Reader > GetReaders()
common::StatsTimerStopped rx_timespan_
An extra class derived from std::vector<> for delivery of the BlockWriters of a Stream.
size_t item_counter() const
Returns item_counter_.
std::vector< Reader > GetReaders()
void Close() final
shuts the stream down.
size_t num_hosts() const
Number of hosts in system.
CatBlockSource GetCatBlockSource(bool consume)
Gets a CatBlockSource which includes all incoming queues of this stream.
size_t wait(size_t delta=1, size_t slack=0)
net::Group & group_
Holds NetConnections for outgoing Streams.
size_t workers_per_host() const
number of workers per host
void OnStreamBlock(size_t from, uint32_t seq, Block &&b)
ConsumeBlockQueueSource BlockQueueSource
size_t block_counter() const
Returns block_counter_.
tlx::Semaphore sem_closing_blocks_
number of received stream closing Blocks.
data::CatBlockSource< DynBlockSource > CatBlockSource
Base class for StreamSet.
CatStreamData(StreamSetBase *stream_set_base, Multiplexer &multiplexer, size_t send_size_limit, const StreamId &id, size_t local_worker_id, size_t dia_id)
Creates a new stream instance.
CatStreamDataPtr CatLoopback(size_t stream_id, size_t to_worker_id)
size_t byte_counter() const
Returns byte_counter_.
const StreamId & id() const
Return stream id.
BlockQueue * loopback_queue(size_t from_worker_id)
Returns the loopback queue for the worker of this stream.
void OnStreamBlockOrdered(size_t from, Block &&b)
StreamId id_
our own stream id.
BlockReader< CatBlockSource > CatBlockReader
static constexpr bool debug_data
size_t dia_id_
associated DIANode id.
size_t num_workers() const
Number of workers in system.
size_t my_host_rank() const
Returns my_host_rank.
static int round_down_to_power_of_two(int i)
does what it says: round down to next power of two
bool closed() const final
void set_dia_id(size_t dia_id)
size_t num_workers() const
total number of workers.
std::string hexdump(const void *const data, size_t size)
Dump a (binary) string as a sequence of uppercase hexadecimal pairs.
std::atomic< size_t > rx_net_blocks_
Writers GetWriters() final
BlockPool & block_pool_
reference to host-global BlockPool.
#define LOG
Default logging method: output if the local debug variable is true.
std::atomic< size_t > active_streams_
number of active Cat/MixStreams
CatStream(const CatStreamDataPtr &ptr)
A BlockQueue is a thread-safe queue used to hand-over Block objects between threads.
#define LOGC(cond)
Explicitly specify the condition for logging.
size_t workers_per_host() const
Returns workers_per_host.
common::StatsTimerStart rx_lifetime_
std::vector< SeqReordering > seq_
Block Sequence numbers.