24 :
BlockSink(nullptr, -1), closed_(true) { }
29 size_t host_rank,
size_t host_local_worker,
30 size_t peer_rank,
size_t peer_local_worker)
31 :
BlockSink(block_pool, host_local_worker),
40 <<
"class" <<
"StreamSink" 51 size_t host_rank,
size_t host_local_worker,
52 size_t peer_rank,
size_t peer_local_worker)
53 :
BlockSink(block_pool, host_local_worker),
61 <<
"class" <<
"StreamSink" 72 size_t host_rank,
size_t host_local_worker,
73 size_t peer_rank,
size_t peer_local_worker)
74 :
BlockSink(block_pool, host_local_worker),
82 <<
"class" <<
"StreamSink" 99 if (block.
size() == 0)
return;
102 LOG <<
"StreamSink::AppendBlock()" 103 <<
" block=" << block
104 <<
" is_last_block=" << is_last_block
127 LOG <<
"StreamSink::AppendBlock()" 128 <<
" block=" << block
129 <<
" is_last_block=" << is_last_block
158 die(
"FIXME: this should never be used?");
163 if (block.size() == 0)
return;
165 LOG <<
"StreamSink::AppendPinnedBlock()" 166 <<
" block=" << block
167 <<
" is_last_block=" << is_last_block
184 stream_->tx_int_items_ += block.num_items();
185 stream_->tx_int_bytes_ += block.size();
192 stream_->tx_int_items_ += block.num_items();
193 stream_->tx_int_bytes_ += block.size();
198 std::move(block).MoveToBlock());
201 LOG0 <<
"StreamSink::AppendPinnedBlock()" 205 header.stream_id =
id_;
208 header.seq = block_counter_ - 1;
209 header.is_last_block = is_last_block;
212 header.Serialize(bb);
217 size_t send_size = buffer.
size() + block.size();
221 stream_->tx_net_items_ += block.num_items();
222 stream_->tx_net_bytes_ += send_size;
226 stream_->multiplexer_.dispatcher_.AsyncWrite(
229 std::move(buffer), std::move(block),
231 s->sem_queue_.signal(send_size);
238 LOG <<
"StreamSink::AppendPinnedBlock()" 239 <<
" sent 'piggy-backed close stream' id=" <<
id_ 255 LOG <<
"StreamSink::Close() sending 'close stream' id=" <<
id_ 284 <<
"class" <<
"StreamSink" 285 <<
"event" <<
"close" BlockQueue * block_queue_
PinnedBlock PinWait(size_t local_worker_id) const
Convenience function to call Pin() and wait for the future.
Block combines a reference to a read-only ByteBlock and book-keeping information. ...
virtual void AppendPinnedBlock(PinnedBlock &&b, bool is_last_block)
Appends the PinnedBlock.
size_t peer_local_worker_
size_t my_worker_rank() const
return local worker rank
#define LOG0
Override default output: never or always output log.
void AppendBlock(const Block &block, bool is_last_block) final
Appends data to the StreamSink. Data may be sent but may be delayed.
Buffer ToBuffer()
Explicit conversion to Buffer MOVING the memory ownership.
#define die(msg)
Instead of calling std::terminate(), throw the message via an exception.
size_t size() const
return length of valid data in bytes.
void Close() final
Closes the connection.
size_t local_worker_id_
local worker id to associate pinned block with
Pool to allocate, keep, swap out/in, and free all ByteBlocks on the host.
size_t local_worker_id() const
local worker id to associate pinned block with
A Connection represents a link to another peer in a network group.
BlockPool * block_pool() const
Returns block_pool_.
void AppendPinnedBlock(PinnedBlock &&block, bool is_last_block) final
Appends data to the StreamSink. Data may be sent but may be delayed.
void AppendBlock(const Block &b, bool) final
Appends the (unpinned) Block.
size_t num_items() const
return number of items beginning in this block
BufferBuilder represents a dynamically growable area of memory, which can be modified by appending in...
Pure virtual base class for all things that can receive Blocks from a BlockWriter.
A pinned / pin-counted derivative of a Block.
MixStreamDataPtr target_mix_stream_
destination mix stream
std::atomic< uint32_t > tx_seq_
send sequence
Simple buffer of characters without initialization or growing functionality.
std::string hexdump(const void *const data, size_t size)
Dump a (binary) string as a sequence of uppercase hexadecimal pairs.
void Finalize()
Finalize structure after sending the piggybacked or explicit close.
net::Connection * connection_
size_t peer_worker_rank() const
return remote worker rank
size_t workers_per_host() const
return number of workers per host
#define LOG
Default logging method: output if the local debug variable is true.
common::JsonLogger & logger()
Returns BlockPool.logger_.
A BlockQueue is a thread-safe queue used to hand-over Block objects between threads.
size_type size() const noexcept
return number of items in Buffer
void Close() final
Close called by BlockWriter.
common::StatsTimerStart timespan_