Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
stream_sink.cpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/data/stream_sink.cpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Timo Bingmann <[email protected]>
7  *
8  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
9  ******************************************************************************/
10 
12 
16 #include <thrill/data/stream.hpp>
17 
18 #include <tlx/string/hexdump.hpp>
19 
20 namespace thrill {
21 namespace data {
22 
24  : BlockSink(nullptr, -1), closed_(true) { }
25 
27  net::Connection* connection,
28  MagicByte magic, StreamId stream_id,
29  size_t host_rank, size_t host_local_worker,
30  size_t peer_rank, size_t peer_local_worker)
31  : BlockSink(block_pool, host_local_worker),
32  stream_(std::move(stream)),
33  connection_(connection),
34  magic_(magic),
35  id_(stream_id),
36  host_rank_(host_rank),
37  peer_rank_(peer_rank),
38  peer_local_worker_(peer_local_worker) {
39  logger()
40  << "class" << "StreamSink"
41  << "event" << "open"
42  << "id" << id_
43  << "peer_host" << peer_rank_
44  << "src_worker" << my_worker_rank()
45  << "tgt_worker" << peer_worker_rank();
46 }
47 
49  BlockQueue* block_queue,
50  StreamId stream_id,
51  size_t host_rank, size_t host_local_worker,
52  size_t peer_rank, size_t peer_local_worker)
53  : BlockSink(block_pool, host_local_worker),
54  stream_(std::move(stream)),
55  block_queue_(block_queue),
56  id_(stream_id),
57  host_rank_(host_rank),
58  peer_rank_(peer_rank),
59  peer_local_worker_(peer_local_worker) {
60  logger()
61  << "class" << "StreamSink"
62  << "event" << "open"
63  << "id" << id_
64  << "peer_host" << peer_rank_
65  << "src_worker" << my_worker_rank()
66  << "tgt_worker" << peer_worker_rank();
67 }
68 
70  MixStreamDataPtr target,
71  StreamId stream_id,
72  size_t host_rank, size_t host_local_worker,
73  size_t peer_rank, size_t peer_local_worker)
74  : BlockSink(block_pool, host_local_worker),
75  stream_(std::move(stream)),
76  target_mix_stream_(target),
77  id_(stream_id),
78  host_rank_(host_rank),
79  peer_rank_(peer_rank),
80  peer_local_worker_(peer_local_worker) {
81  logger()
82  << "class" << "StreamSink"
83  << "event" << "open"
84  << "id" << id_
85  << "peer_host" << peer_rank_
86  << "src_worker" << my_worker_rank()
87  << "tgt_worker" << peer_worker_rank();
88 }
89 
92 }
93 
96 }
97 
98 void StreamSink::AppendBlock(const Block& block, bool is_last_block) {
99  return AppendPinnedBlock(block.PinWait(local_worker_id()), is_last_block);
100 }
101 
102 void StreamSink::AppendBlock(Block&& block, bool is_last_block) {
103  return AppendPinnedBlock(block.PinWait(local_worker_id()), is_last_block);
104 }
105 
106 void StreamSink::AppendPinnedBlock(PinnedBlock&& block, bool is_last_block) {
107  if (block.size() == 0) return;
108 
109  LOG << "StreamSink::AppendPinnedBlock()"
110  << " block=" << block
111  << " is_last_block=" << is_last_block
112  << " id_= " << id_
113  << " host_rank_=" << host_rank_
114  << " local_worker_id_=" << local_worker_id_
115  << " peer_rank_=" << peer_rank_
116  << " peer_local_worker_=" << peer_local_worker_
117  << " item_counter_=" << item_counter_
118  << " byte_counter_=" << byte_counter_
119  << " block_counter_=" << block_counter_;
120 
121  // StreamSink statistics
122  item_counter_ += block.num_items();
123  byte_counter_ += block.size();
124  block_counter_++;
125 
126  if (block_queue_) {
127  // StreamData statistics for internal transfer
128  stream_->tx_int_items_ += block.num_items();
129  stream_->tx_int_bytes_ += block.size();
130  stream_->tx_int_blocks_++;
131 
132  return block_queue_->AppendPinnedBlock(std::move(block), is_last_block);
133  }
134  if (target_mix_stream_) {
135  // StreamData statistics for internal transfer
136  stream_->tx_int_items_ += block.num_items();
137  stream_->tx_int_bytes_ += block.size();
138  stream_->tx_int_blocks_++;
139 
140  return target_mix_stream_->OnStreamBlock(
141  my_worker_rank(), block_counter_ - 1, std::move(block));
142  }
143 
144  LOG0 << "StreamSink::AppendPinnedBlock()"
145  << " data=" << tlx::hexdump(block.ToString());
146 
147  StreamMultiplexerHeader header(magic_, block);
148  header.stream_id = id_;
149  header.sender_worker = my_worker_rank();
150  header.receiver_local_worker = peer_local_worker_;
151  header.seq = block_counter_ - 1;
152  header.is_last_block = is_last_block;
153 
155  header.Serialize(bb);
156 
157  net::Buffer buffer = bb.ToBuffer();
158  assert(buffer.size() == MultiplexerHeader::total_size);
159 
160  size_t send_size = buffer.size() + block.size();
161  stream_->sem_queue_.wait(send_size);
162 
163  // StreamData statistics for network transfer
164  stream_->tx_net_items_ += block.num_items();
165  stream_->tx_net_bytes_ += send_size;
166  stream_->tx_net_blocks_++;
167  byte_counter_ += buffer.size();
168 
169  stream_->multiplexer_.dispatcher_.AsyncWrite(
170  *connection_, 42 + (connection_->tx_seq_.fetch_add(2) & 0xFFFF),
171  // send out Buffer and Block, guaranteed to be successive
172  std::move(buffer), std::move(block),
173  [s = stream_, send_size](net::Connection&) {
174  s->sem_queue_.signal(send_size);
175  });
176 
177  if (is_last_block) {
178  assert(!closed_);
179  closed_ = true;
180 
181  LOG << "StreamSink::AppendPinnedBlock()"
182  << " sent 'piggy-backed close stream' id=" << id_
183  << " from=" << my_worker_rank()
184  << " (host=" << host_rank_ << ")"
185  << " to=" << peer_worker_rank()
186  << " (host=" << peer_rank_ << ")";
187 
188  Finalize();
189  }
190 }
191 
193  if (closed_) return;
194  closed_ = true;
195 
196  LOG << "StreamSink::Close() sending 'close stream' id=" << id_
197  << " from=" << my_worker_rank()
198  << " (host=" << host_rank_ << ")"
199  << " to=" << peer_worker_rank()
200  << " (host=" << peer_rank_ << ")";
201 
202  block_counter_++;
203 
204  if (block_queue_) {
205  // StreamData statistics for internal transfer
206  stream_->tx_int_blocks_++;
207  return block_queue_->Close();
208  }
209  if (target_mix_stream_) {
210  // StreamData statistics for internal transfer
211  stream_->tx_int_blocks_++;
212  return target_mix_stream_->OnStreamBlock(
214  }
215 
217  header.magic = magic_;
218  header.stream_id = id_;
221  header.seq = block_counter_ - 1;
222 
224  header.Serialize(bb);
225 
226  net::Buffer buffer = bb.ToBuffer();
227  assert(buffer.size() == MultiplexerHeader::total_size);
228 
229  stream_->sem_queue_.wait(MultiplexerHeader::total_size);
230 
231  // StreamData statistics for network transfer
232  stream_->tx_net_bytes_ += buffer.size();
233  stream_->tx_net_blocks_++;
234  byte_counter_ += buffer.size();
235 
236  stream_->multiplexer_.dispatcher_.AsyncWrite(
237  *connection_, 42 + (connection_->tx_seq_.fetch_add(2) & 0xFFFF),
238  std::move(buffer),
239  [s = stream_](net::Connection&) {
240  s->sem_queue_.signal(MultiplexerHeader::total_size);
241  });
242 
243  Finalize();
244 }
245 
247  logger()
248  << "class" << "StreamSink"
249  << "event" << "close"
250  << "id" << id_
251  << "peer_host" << peer_rank_
252  << "src_worker" << my_worker_rank()
253  << "tgt_worker" << peer_worker_rank()
254  << "items" << item_counter_
255  << "bytes" << byte_counter_
256  << "blocks" << block_counter_
257  << "timespan" << timespan_;
258 }
259 
260 } // namespace data
261 } // namespace thrill
262 
263 /******************************************************************************/
Block combines a reference to a read-only ByteBlock and book-keeping information. ...
Definition: block.hpp:52
virtual void AppendPinnedBlock(PinnedBlock &&b, bool is_last_block)
Appends the PinnedBlock.
Definition: block_sink.hpp:89
uint32_t sender_worker
global worker rank of sender
#define LOG0
Override default output: never or always output log.
Definition: logger.hpp:27
void AppendBlock(const Block &block, bool is_last_block) final
Appends data to the StreamSink. Data may be sent but may be delayed.
Definition: stream_sink.cpp:98
Buffer ToBuffer()
Explicit conversion to Buffer MOVING the memory ownership.
void Serialize(net::BufferBuilder &bb) const
Serializes the whole block struct into a buffer.
uint32_t seq
sequence number in Stream
void Close() final
Closes the sink by sending a 'close stream' message to the peer.
static constexpr size_t total_size
size_t local_worker_id_
local worker id to associate pinned block with
Definition: block_sink.hpp:103
Pool to allocate, keep, swap out/in, and free all ByteBlocks on the host.
Definition: block_pool.hpp:42
Block header is sent before a sequence of blocks; it indicates the number of elements and their bounda...
PinnedBlock PinWait(size_t local_worker_id) const
Convenience function to call Pin() and wait for the future.
Definition: block.cpp:35
A Connection represents a link to another peer in a network group.
Definition: connection.hpp:49
size_t my_worker_rank() const
return the global worker rank of this sink's (local) worker
Definition: stream_sink.cpp:90
void AppendPinnedBlock(PinnedBlock &&block, bool is_last_block) final
Appends data to the StreamSink. Data may be sent but may be delayed.
BufferBuilder represents a dynamically growable area of memory, which can be modified by appending in...
Pure virtual base class for all things that can receive Blocks from a BlockWriter.
Definition: block_sink.hpp:28
size_t peer_worker_rank() const
return remote worker rank
Definition: stream_sink.cpp:94
A pinned / pin-counted derivative of a Block.
Definition: block.hpp:157
MixStreamDataPtr target_mix_stream_
destination mix stream
size_t local_worker_id() const
local worker id to associate pinned block with
Definition: block_sink.hpp:94
std::atomic< uint32_t > tx_seq_
send sequence
Definition: connection.hpp:598
Simple buffer of characters without initialization or growing functionality.
Definition: buffer.hpp:40
std::string hexdump(const void *const data, size_t size)
Dump a (binary) string as a sequence of uppercase hexadecimal pairs.
Definition: hexdump.cpp:21
void Finalize()
Finalize structure after sending the piggybacked or explicit close.
size_t workers_per_host() const
return number of workers per host
Definition: block_sink.hpp:72
net::Connection * connection_
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:24
common::JsonLogger & logger()
Returns BlockPool.logger_.
Definition: block_sink.hpp:66
A BlockQueue is a thread-safe queue used to hand-over Block objects between threads.
Definition: block_queue.hpp:47
size_type size() const noexcept
return number of items in Buffer
Definition: buffer.hpp:141
void Close() final
Close called by BlockWriter.
Definition: block_queue.cpp:28
common::StatsTimerStart timespan_