27 size_t local_worker_id,
size_t dia_id)
28 : sem_queue_(send_size_limit),
30 stream_set_base_(stream_set_base),
31 local_worker_id_(local_worker_id),
33 multiplexer_(multiplexer)
41 LOG <<
"StreamData::OnWriterClosed()" 43 <<
" peer_worker_rank=" << peer_worker_rank
51 LOG <<
"StreamData::OnWriterClosed() final close received";
63 <<
"class" <<
"StreamData" 89 : my_worker_rank_(my_worker_rank)
95 for (
size_t i = 0; i < s; ++i) {
107 template <
typename StreamData>
113 tlx::make_counting<StreamData>(
114 this, multiplexer, send_size_limit,
id, i, dia_id));
121 template <
typename StreamData>
123 assert(local_worker_id <
streams_.size());
127 template <
typename StreamData>
129 std::unique_lock<std::mutex> lock(
mutex_);
130 assert(local_worker_id <
streams_.size());
139 template <
typename StreamData>
145 template <
typename StreamData>
147 std::unique_lock<std::mutex> lock(
mutex_);
156 LOG <<
"StreamSet::OnWriterClosed()" 158 <<
" peer_host_rank=" << peer_host_rank
159 <<
" peer_worker_rank=" << peer_worker_rank
160 <<
" writers_closed_per_host_[]=" 161 << writers_closed_per_host_[peer_host_rank];
163 die_unless(writers_closed_per_host_[peer_host_rank] <=
166 if (writers_closed_per_host_[peer_host_rank] ==
172 if (writers_closed_per_host_[peer_host_rank] ==
175 LOG <<
"StreamSet::OnWriterClosed() final close already-done" 177 <<
" peer_host_rank=" << peer_host_rank
178 <<
" writers_closed_per_host_[]=" 179 << writers_closed_per_host_[peer_host_rank];
183 LOG <<
"StreamSet::OnWriterClosed() final close " 185 <<
" peer_host_rank=" << peer_host_rank
186 <<
" writers_closed_per_host_[]=" 187 << writers_closed_per_host_[peer_host_rank];
205 conn, 42 + (conn.tx_seq_.fetch_add(2) & 0xFFFF),
std::atomic< size_t > rx_net_items_
bool all_writers_closed_
bool if all writers were closed
common::JsonLogger & logger()
Get the JsonLogger from the BlockPool.
common::StatsTimerStopped tx_timespan_
Timers from first rx / tx package until rx / tx direction is closed.
void OnWriterClosed(size_t peer_worker_rank, bool sent)
std::atomic< size_t > tx_net_items_
StreamSetBase * stream_set_base_
pointer to StreamSetBase containing this StreamData
size_t my_host_rank() const
Returns my_host_rank.
size_t my_worker_rank() const
Returns my_worker_rank_.
std::atomic< size_t > rx_int_items_
bool Release(size_t local_worker_id)
Buffer ToBuffer()
Explicit conversion to Buffer MOVING the memory ownership.
Multiplexer & multiplexer_
reference to multiplexer
std::atomic< size_t > rx_net_bytes_
Multiplexer & multiplexer_
reference to multiplexer
virtual Connection & connection(size_t id)=0
Return Connection to client id.
std::atomic< size_t > tx_int_bytes_
std::vector< StreamDataPtr > streams_
'owns' all streams belonging to one stream id for all local workers.
Multiplexes virtual Connections on Dispatcher.
Simple structure that holds a all stream instances for the workers on the local host for a given stre...
virtual void OnWriterClosed(size_t peer_worker_rank, bool sent)=0
StreamSet(Multiplexer &multiplexer, size_t send_size_limit, StreamId id, size_t workers_per_host, size_t dia_id)
void Close()
custom destructor to close writers is a cyclic fashion
std::atomic< size_t > rx_int_bytes_
common::StatsTimerStart tx_lifetime_
Timers from creation of stream until rx / tx direction is closed.
size_t local_worker_id_
local worker id
StreamDataPtr Peer(size_t local_worker_id)
size_t writers_closed_
number of writers closed via StreamSink.
std::vector< size_t > writers_closed_per_host_sent_
number of writers closed per host, message is set when all are closed
size_t my_worker_rank_
rank of this worker
void OnAllWritersClosed()
method called when all StreamSink writers have finished
virtual const char * stream_type()=0
return stream type string
std::vector< size_t > writers_closed_per_host_
number of writers closed per host, message is set when all are closed
A Connection represents a link to another peer in a network group.
size_t num_hosts() const
Number of hosts in system.
size_t workers_per_host() const
Returns workers_per_host.
size_t remaining_
countdown to destruction
std::atomic< size_t > tx_net_bytes_
size_t workers_per_host() const
number of workers per host
~Writers()
custom destructor to close writers is a cyclic fashion
net::DispatcherThread & dispatcher()
get network dispatcher
size_t num_hosts() const
Number of hosts in system.
Base class for StreamSet.
net::Group & group()
get network group connection
MagicByte magic_byte() const
Writers(size_t my_worker_rank=0)
void OnWriterClosed(size_t peer_worker_rank, bool sent)
StreamId id_
our own stream id.
size_t dia_id_
associated DIANode id.
std::atomic< size_t > rx_int_blocks_
void AsyncWrite(Connection &c, uint32_t seq, Buffer &&buffer, const AsyncWriteCallback &done_cb=AsyncWriteCallback())
std::atomic< size_t > tx_net_blocks_
size_t my_host_rank() const
Returns my_host_rank.
BufferBuilder represents a dynamically growable area of memory, which can be modified by appending in...
void Close() final
Close all StreamData objects.
Simple buffer of characters without initialization or growing functionality.
std::atomic< size_t > tx_int_blocks_
std::atomic< size_t > rx_net_blocks_
StreamData(StreamSetBase *stream_set_base, Multiplexer &multiplexer, size_t send_size_limit, const StreamId &id, size_t local_worker_id, size_t dia_id)
std::atomic< size_t > tx_int_items_
#define LOG
Default logging method: output if the local debug variable is true.
std::mutex mutex_
mutex for working on the data structure
size_type size() const noexcept
return number of items in Buffer
size_t workers_per_host() const
Returns workers_per_host.