Thrill  0.1
stream_sink.cpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/data/stream_sink.cpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Timo Bingmann <[email protected]>
7  *
8  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
9  ******************************************************************************/
10 
12 
16 #include <thrill/data/stream.hpp>
17 
18 #include <tlx/string/hexdump.hpp>
19 
20 namespace thrill {
21 namespace data {
22 
24  : BlockSink(nullptr, -1), closed_(true) { }
25 
27  net::Connection* connection,
28  MagicByte magic, StreamId stream_id,
29  size_t host_rank, size_t host_local_worker,
30  size_t peer_rank, size_t peer_local_worker)
31  : BlockSink(block_pool, host_local_worker),
32  stream_(std::move(stream)),
33  connection_(connection),
34  magic_(magic),
35  id_(stream_id),
36  host_rank_(host_rank),
37  peer_rank_(peer_rank),
38  peer_local_worker_(peer_local_worker) {
39  logger()
40  << "class" << "StreamSink"
41  << "event" << "open"
42  << "id" << id_
43  << "peer_host" << peer_rank_
44  << "src_worker" << my_worker_rank()
45  << "tgt_worker" << peer_worker_rank();
46 }
47 
49  BlockQueue* block_queue,
50  StreamId stream_id,
51  size_t host_rank, size_t host_local_worker,
52  size_t peer_rank, size_t peer_local_worker)
53  : BlockSink(block_pool, host_local_worker),
54  stream_(std::move(stream)),
55  block_queue_(block_queue),
56  id_(stream_id),
57  host_rank_(host_rank),
58  peer_rank_(peer_rank),
59  peer_local_worker_(peer_local_worker) {
60  logger()
61  << "class" << "StreamSink"
62  << "event" << "open"
63  << "id" << id_
64  << "peer_host" << peer_rank_
65  << "src_worker" << my_worker_rank()
66  << "tgt_worker" << peer_worker_rank();
67 }
68 
70  MixStreamDataPtr target,
71  StreamId stream_id,
72  size_t host_rank, size_t host_local_worker,
73  size_t peer_rank, size_t peer_local_worker)
74  : BlockSink(block_pool, host_local_worker),
75  stream_(std::move(stream)),
76  target_mix_stream_(target),
77  id_(stream_id),
78  host_rank_(host_rank),
79  peer_rank_(peer_rank),
80  peer_local_worker_(peer_local_worker) {
81  logger()
82  << "class" << "StreamSink"
83  << "event" << "open"
84  << "id" << id_
85  << "peer_host" << peer_rank_
86  << "src_worker" << my_worker_rank()
87  << "tgt_worker" << peer_worker_rank();
88 }
89 
92 }
93 
96 }
97 
98 void StreamSink::AppendBlock(const Block& block, bool is_last_block) {
99  if (block.size() == 0) return;
100 
101  if (block_queue_) {
102  LOG << "StreamSink::AppendBlock()"
103  << " block=" << block
104  << " is_last_block=" << is_last_block
105  << " id_= " << id_
106  << " host_rank_=" << host_rank_
107  << " local_worker_id_=" << local_worker_id_
108  << " peer_rank_=" << peer_rank_
109  << " peer_local_worker_=" << peer_local_worker_
110  << " item_counter_=" << item_counter_
111  << " byte_counter_=" << byte_counter_
112  << " block_counter_=" << block_counter_;
113 
114  // StreamSink statistics
115  item_counter_ += block.num_items();
116  byte_counter_ += block.size();
117  block_counter_++;
118 
119  // StreamData statistics for internal transfer
120  stream_->tx_int_items_ += block.num_items();
121  stream_->tx_int_bytes_ += block.size();
122  stream_->tx_int_blocks_++;
123 
124  return block_queue_->AppendBlock(block, is_last_block);
125  }
126  if (target_mix_stream_) {
127  LOG << "StreamSink::AppendBlock()"
128  << " block=" << block
129  << " is_last_block=" << is_last_block
130  << " id_= " << id_
131  << " host_rank_=" << host_rank_
132  << " local_worker_id_=" << local_worker_id_
133  << " peer_rank_=" << peer_rank_
134  << " peer_local_worker_=" << peer_local_worker_
135  << " item_counter_=" << item_counter_
136  << " byte_counter_=" << byte_counter_
137  << " block_counter_=" << block_counter_;
138 
139  // StreamSink statistics
140  item_counter_ += block.num_items();
141  byte_counter_ += block.size();
142  block_counter_++;
143 
144  // StreamData statistics for internal transfer
145  stream_->tx_int_items_ += block.num_items();
146  stream_->tx_int_bytes_ += block.size();
147  stream_->tx_int_blocks_++;
148 
149  return target_mix_stream_->OnStreamBlock(
150  my_worker_rank(), block_counter_ - 1, Block(block));
151  }
152 
153  // otherwise: pin for network transfer
154  return AppendPinnedBlock(block.PinWait(local_worker_id()), is_last_block);
155 }
156 
157 void StreamSink::AppendBlock(Block&& block, bool is_last_block) {
158  die("FIXME: this should never be used?");
159  return AppendBlock(block, is_last_block);
160 }
161 
162 void StreamSink::AppendPinnedBlock(PinnedBlock&& block, bool is_last_block) {
163  if (block.size() == 0) return;
164 
165  LOG << "StreamSink::AppendPinnedBlock()"
166  << " block=" << block
167  << " is_last_block=" << is_last_block
168  << " id_= " << id_
169  << " host_rank_=" << host_rank_
170  << " local_worker_id_=" << local_worker_id_
171  << " peer_rank_=" << peer_rank_
172  << " peer_local_worker_=" << peer_local_worker_
173  << " item_counter_=" << item_counter_
174  << " byte_counter_=" << byte_counter_
175  << " block_counter_=" << block_counter_;
176 
177  // StreamSink statistics
178  item_counter_ += block.num_items();
179  byte_counter_ += block.size();
180  block_counter_++;
181 
182  if (block_queue_) {
183  // StreamData statistics for internal transfer
184  stream_->tx_int_items_ += block.num_items();
185  stream_->tx_int_bytes_ += block.size();
186  stream_->tx_int_blocks_++;
187 
188  return block_queue_->AppendPinnedBlock(std::move(block), is_last_block);
189  }
190  if (target_mix_stream_) {
191  // StreamData statistics for internal transfer
192  stream_->tx_int_items_ += block.num_items();
193  stream_->tx_int_bytes_ += block.size();
194  stream_->tx_int_blocks_++;
195 
196  return target_mix_stream_->OnStreamBlock(
197  my_worker_rank(), block_counter_ - 1,
198  std::move(block).MoveToBlock());
199  }
200 
201  LOG0 << "StreamSink::AppendPinnedBlock()"
202  << " data=" << tlx::hexdump(block.ToString());
203 
204  StreamMultiplexerHeader header(magic_, block);
205  header.stream_id = id_;
206  header.sender_worker = my_worker_rank();
207  header.receiver_local_worker = peer_local_worker_;
208  header.seq = block_counter_ - 1;
209  header.is_last_block = is_last_block;
210 
212  header.Serialize(bb);
213 
214  net::Buffer buffer = bb.ToBuffer();
215  assert(buffer.size() == MultiplexerHeader::total_size);
216 
217  size_t send_size = buffer.size() + block.size();
218  // stream_->sem_queue_.wait(send_size);
219 
220  // StreamData statistics for network transfer
221  stream_->tx_net_items_ += block.num_items();
222  stream_->tx_net_bytes_ += send_size;
223  stream_->tx_net_blocks_++;
224  byte_counter_ += buffer.size();
225 
226  stream_->multiplexer_.dispatcher_.AsyncWrite(
227  *connection_, 42 + (connection_->tx_seq_.fetch_add(2) & 0xFFFF),
228  // send out Buffer and Block, guaranteed to be successive
229  std::move(buffer), std::move(block),
230  [s = stream_, send_size](net::Connection&) {
231  s->sem_queue_.signal(send_size);
232  });
233 
234  if (is_last_block) {
235  assert(!closed_);
236  closed_ = true;
237 
238  LOG << "StreamSink::AppendPinnedBlock()"
239  << " sent 'piggy-backed close stream' id=" << id_
240  << " from=" << my_worker_rank()
241  << " (host=" << host_rank_ << ")"
242  << " to=" << peer_worker_rank()
243  << " (host=" << peer_rank_ << ")";
244 
245  stream_->OnWriterClosed(peer_worker_rank(), /* sent */ true);
246 
247  Finalize();
248  }
249 }
250 
252  if (closed_) return;
253  closed_ = true;
254 
255  LOG << "StreamSink::Close() sending 'close stream' id=" << id_
256  << " from=" << my_worker_rank()
257  << " (host=" << host_rank_ << ")"
258  << " to=" << peer_worker_rank()
259  << " (host=" << peer_rank_ << ")";
260 
261  block_counter_++;
262 
263  if (block_queue_) {
264  // StreamData statistics for internal transfer
265  stream_->tx_int_blocks_++;
266  stream_->OnWriterClosed(peer_worker_rank(), /* sent */ true);
267  return block_queue_->Close();
268  }
269  if (target_mix_stream_) {
270  // StreamData statistics for internal transfer
271  stream_->tx_int_blocks_++;
272  stream_->OnWriterClosed(peer_worker_rank(), /* sent */ true);
273  return target_mix_stream_->OnStreamBlock(
275  }
276 
277  stream_->OnWriterClosed(peer_worker_rank(), /* sent */ false);
278 
279  Finalize();
280 }
281 
283  logger()
284  << "class" << "StreamSink"
285  << "event" << "close"
286  << "id" << id_
287  << "peer_host" << peer_rank_
288  << "src_worker" << my_worker_rank()
289  << "tgt_worker" << peer_worker_rank()
290  << "items" << item_counter_
291  << "bytes" << byte_counter_
292  << "blocks" << block_counter_
293  << "timespan" << timespan_;
294 }
295 
296 } // namespace data
297 } // namespace thrill
298 
299 /******************************************************************************/
PinnedBlock PinWait(size_t local_worker_id) const
Convenience function to call Pin() and wait for the future.
Definition: block.cpp:35
Block combines a reference to a read-only ByteBlock and book-keeping information. ...
Definition: block.hpp:52
virtual void AppendPinnedBlock(PinnedBlock &&b, bool is_last_block)
Appends the PinnedBlock.
Definition: block_sink.hpp:89
size_t my_worker_rank() const
return local worker rank
Definition: stream_sink.cpp:90
#define LOG0
Override default output: never or always output log.
Definition: logger.hpp:27
void AppendBlock(const Block &block, bool is_last_block) final
Appends data to the StreamSink. Data may be sent but may be delayed.
Definition: stream_sink.cpp:98
STL namespace.
Buffer ToBuffer()
Explicit conversion to Buffer MOVING the memory ownership.
#define die(msg)
Instead of calling std::terminate(), report the message by throwing an exception.
Definition: die.hpp:22
size_t size() const
return length of valid data in bytes.
Definition: block.hpp:100
void Close() final
Closes the connection.
static constexpr size_t total_size
size_t local_worker_id_
local worker id to associate pinned block with
Definition: block_sink.hpp:103
Pool to allocate, keep, swap out/in, and free all ByteBlocks on the host.
Definition: block_pool.hpp:42
Block header is sent before a sequence of blocks it indicates the number of elements and their bounda...
size_t local_worker_id() const
local worker id to associate pinned block with
Definition: block_sink.hpp:94
A Connection represents a link to another peer in a network group.
Definition: connection.hpp:49
BlockPool * block_pool() const
Returns block_pool_.
Definition: block_sink.hpp:69
void AppendPinnedBlock(PinnedBlock &&block, bool is_last_block) final
Appends data to the StreamSink. Data may be sent but may be delayed.
void AppendBlock(const Block &b, bool) final
Appends the (unpinned) Block.
Definition: block_queue.hpp:72
size_t num_items() const
return number of items beginning in this block
Definition: block.hpp:85
BufferBuilder represents a dynamically growable area of memory, which can be modified by appending in...
Pure virtual base class for all things that can receive Blocks from a BlockWriter.
Definition: block_sink.hpp:28
A pinned / pin-counted derivative of a Block.
Definition: block.hpp:157
MixStreamDataPtr target_mix_stream_
destination mix stream
std::atomic< uint32_t > tx_seq_
send sequence
Definition: connection.hpp:598
Simple buffer of characters without initialization or growing functionality.
Definition: buffer.hpp:40
std::string hexdump(const void *const data, size_t size)
Dump a (binary) string as a sequence of uppercase hexadecimal pairs.
Definition: hexdump.cpp:21
void Finalize()
Finalize structure after sending the piggybacked or explicit close.
net::Connection * connection_
size_t peer_worker_rank() const
return remote worker rank
Definition: stream_sink.cpp:94
size_t workers_per_host() const
return number of workers per host
Definition: block_sink.hpp:72
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:24
common::JsonLogger & logger()
Returns BlockPool.logger_.
Definition: block_sink.hpp:66
A BlockQueue is a thread-safe queue used to hand-over Block objects between threads.
Definition: block_queue.hpp:47
size_type size() const noexcept
return number of items in Buffer
Definition: buffer.hpp:141
void Close() final
Close called by BlockWriter.
Definition: block_queue.cpp:28
common::StatsTimerStart timespan_