Thrill  0.1
cat_stream.cpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/data/cat_stream.cpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Timo Bingmann <[email protected]>
7  * Copyright (C) 2015 Tobias Sturm <[email protected]>
8  *
9  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10  ******************************************************************************/
11 
13 
17 
19 #include <tlx/string/hexdump.hpp>
20 
21 #include <algorithm>
22 #include <map>
23 #include <vector>
24 
25 namespace thrill {
26 namespace data {
27 
29  Multiplexer& multiplexer, size_t send_size_limit,
30  const StreamId& id,
31  size_t local_worker_id, size_t dia_id)
32  : StreamData(stream_set_base, multiplexer,
33  send_size_limit, id, local_worker_id, dia_id) {
34 
36 
37  queues_.reserve(num_workers());
38  seq_.resize(num_workers());
39 
40  // construct StreamSink array
41  for (size_t host = 0; host < num_hosts(); ++host) {
42  for (size_t worker = 0; worker < workers_per_host(); worker++) {
43  if (host == my_host_rank()) {
44  // construct loopback queue
45 
47  << "class" << "StreamSink"
48  << "event" << "open"
49  << "id" << id_
50  << "peer_host" << host
51  << "src_worker" << my_worker_rank()
52  << "tgt_worker" << (host * workers_per_host() + worker)
53  << "loopback" << true;
54 
55  queues_.emplace_back(
56  multiplexer_.block_pool_, local_worker_id, dia_id,
57  // OnClose callback to BlockQueue to deliver stats, keep a
58  // smart pointer reference to this
59  [p = CatStreamDataPtr(this)](BlockQueue& queue) {
60  p->rx_int_items_ += queue.item_counter();
61  p->rx_int_bytes_ += queue.byte_counter();
62  p->rx_int_blocks_ += queue.block_counter();
63  });
64  }
65  else {
66  // construct inbound BlockQueues
67  queues_.emplace_back(
68  multiplexer_.block_pool_, local_worker_id, dia_id);
69  }
70  }
71  }
72 }
73 
75  LOG << "~CatStreamData() deleted";
76 }
77 
78 void CatStreamData::set_dia_id(size_t dia_id) {
79  dia_id_ = dia_id;
80  for (size_t i = 0; i < queues_.size(); ++i) {
81  queues_[i].set_dia_id(dia_id);
82  }
83 }
84 
86  return "CatStream";
87 }
88 
90  size_t hard_ram_limit = multiplexer_.block_pool_.hard_ram_limit();
91  size_t block_size_base = hard_ram_limit / 4
93  size_t block_size = tlx::round_down_to_power_of_two(block_size_base);
94  if (block_size == 0 || block_size > default_block_size)
95  block_size = default_block_size;
96 
97  {
98  std::unique_lock<std::mutex> lock(multiplexer_.mutex_);
103  }
104 
105  LOGC(my_worker_rank() == 0 && 0)
106  << "CatStreamData::GetWriters()"
107  << " hard_ram_limit=" << hard_ram_limit
108  << " block_size_base=" << block_size_base
109  << " block_size=" << block_size
110  << " active_streams=" << multiplexer_.active_streams_
111  << " max_active_streams=" << multiplexer_.max_active_streams_;
112 
113  tx_timespan_.StartEventually();
114 
115  Writers result(my_worker_rank());
116  result.reserve(num_workers());
117 
118  for (size_t host = 0; host < num_hosts(); ++host) {
119  for (size_t worker = 0; worker < workers_per_host(); ++worker) {
120  if (host == my_host_rank()) {
121  // construct loopback queue writer
122  auto target_stream_ptr = multiplexer_.CatLoopback(id_, worker);
123  BlockQueue* sink_queue_ptr =
124  target_stream_ptr->loopback_queue(local_worker_id_);
125  result.emplace_back(
126  StreamSink(
127  StreamDataPtr(this),
129  sink_queue_ptr,
130  id_,
132  host, worker),
133  block_size);
134  }
135  else {
136  result.emplace_back(
137  StreamSink(
138  StreamDataPtr(this),
142  id_,
144  host, worker),
145  block_size);
146  }
147  }
148  }
149 
150  assert(result.size() == num_workers());
151  return result;
152 }
153 
154 std::vector<CatStreamData::Reader> CatStreamData::GetReaders() {
155  rx_timespan_.StartEventually();
156 
157  std::vector<BlockQueueReader> result;
158  result.reserve(num_workers());
159 
160  for (size_t worker = 0; worker < num_workers(); ++worker) {
161  result.emplace_back(
163  }
164 
165  assert(result.size() == num_workers());
166  return result;
167 }
168 
170  rx_timespan_.StartEventually();
171 
172  // construct vector of BlockSources to read from queues_.
173  std::vector<DynBlockSource> result;
174  result.reserve(num_workers());
175 
176  for (size_t worker = 0; worker < num_workers(); ++worker) {
177  result.emplace_back(
178  queues_[worker].GetBlockSource(consume, local_worker_id_));
179  }
180 
181  // move BlockQueueSources into concatenation BlockSource, and to Reader.
182  return CatBlockSource(std::move(result));
183 }
184 
186  return CatBlockReader(GetCatBlockSource(consume));
187 }
188 
190  return GetCatReader(consume);
191 }
192 
194  if (is_closed_) return;
195  is_closed_ = true;
196 
197  sLOG << "CatStreamData" << id() << "close"
198  << "host" << my_host_rank()
199  << "local_worker_id_" << local_worker_id_;
200 
201  // close loop-back queue from this worker to itself
202  auto my_global_worker_id = my_worker_rank();
203  if (!queues_[my_global_worker_id].write_closed())
204  queues_[my_global_worker_id].Close();
205 
206  // wait for close packets to arrive
207  for (size_t i = 0; i < queues_.size() - workers_per_host(); ++i)
209 
211 
212  {
213  std::unique_lock<std::mutex> lock(multiplexer_.mutex_);
215  multiplexer_.IntReleaseCatStream(id_, local_worker_id_);
216  }
217 
218  LOG << "CatStreamData::Close() finished"
219  << " id_=" << id_
220  << " local_worker_id_=" << local_worker_id_;
221 }
222 
223 bool CatStreamData::closed() const {
224  bool closed = true;
225  for (auto& q : queues_) {
226  closed = closed && q.write_closed();
227  }
228  return closed;
229 }
230 
232  return queues_[from].write_closed();
233 }
234 
235 struct CatStreamData::SeqReordering {
236  //! current top sequence number
237  uint32_t seq_ = 0;
238 
239  //! queue of waiting Blocks, ordered by sequence number
240  std::map<uint32_t, Block> waiting_;
241 };
242 
243 void CatStreamData::OnStreamBlock(size_t from, uint32_t seq, Block&& b) {
244  assert(from < queues_.size());
245  rx_timespan_.StartEventually();
246 
247  LOG << "OnCatStreamBlock"
248  << " from=" << from
249  << " seq=" << seq
250  << " b=" << b;
251 
252  if (debug_data) {
253  sLOG << "stream" << id_ << "receive from" << from << ":"
254  << tlx::hexdump(b.PinWait(local_worker_id_).ToString());
255  }
256 
257  if (TLX_UNLIKELY(seq != seq_[from].seq_ &&
259  // sequence mismatch: put into queue
260  die_unless(seq >= seq_[from].seq_);
261 
262  seq_[from].waiting_.insert(
263  std::make_pair(seq, std::move(b)));
264 
265  return;
266  }
267 
268  OnStreamBlockOrdered(from, std::move(b));
269 
270  // try to process additional queued blocks
271  while (!seq_[from].waiting_.empty() &&
272  (seq_[from].waiting_.begin()->first == seq_[from].seq_ ||
273  seq_[from].waiting_.begin()->first == StreamMultiplexerHeader::final_seq))
274  {
275  sLOG << "CatStreamData::OnStreamBlock"
276  << "processing delayed block with seq"
277  << seq_[from].waiting_.begin()->first;
278 
280  from, std::move(seq_[from].waiting_.begin()->second));
281 
282  seq_[from].waiting_.erase(
283  seq_[from].waiting_.begin());
284  }
285 }
286 
288  if (b.IsValid()) {
289  rx_net_items_ += b.num_items();
290  rx_net_bytes_ += b.size();
291  rx_net_blocks_++;
292 
293  queues_[from].AppendBlock(std::move(b), /* is_last_block */ false);
294  }
295  else {
296  sLOG << "CatStreamData::OnCloseStream"
297  << "stream" << id_
298  << "from" << from
299  << "for worker" << my_worker_rank()
300  << "remaining_closing_blocks_" << remaining_closing_blocks_;
301 
302  queues_[from].Close();
303 
304  die_unless(remaining_closing_blocks_ > 0);
305  if (--remaining_closing_blocks_ == 0) {
306  rx_lifetime_.StopEventually();
307  rx_timespan_.StopEventually();
308  }
309 
311  }
312 
313  seq_[from].seq_++;
314 }
315 
316 BlockQueue* CatStreamData::loopback_queue(size_t from_worker_id) {
317  assert(from_worker_id < workers_per_host());
318  size_t global_worker_rank = workers_per_host() * my_host_rank() + from_worker_id;
319  sLOG << "expose loopback queue for" << from_worker_id << "->" << local_worker_id_;
320  return &(queues_[global_worker_rank]);
321 }
322 
323 /******************************************************************************/
324 // CatStream
325 
327  : ptr_(ptr) { }
328 
330  ptr_->Close();
331 }
332 
333 const StreamId& CatStream::id() const {
334  return ptr_->id();
335 }
336 
338  return *ptr_;
339 }
340 
341 const StreamData& CatStream::data() const {
342  return *ptr_;
343 }
344 
346  return ptr_->GetWriters();
347 }
348 
349 std::vector<CatStream::Reader> CatStream::GetReaders() {
350  return ptr_->GetReaders();
351 }
352 
354  return ptr_->GetCatReader(consume);
355 }
356 
358  return ptr_->GetReader(consume);
359 }
360 
361 } // namespace data
362 } // namespace thrill
363 
364 /******************************************************************************/
std::atomic< size_t > rx_net_items_
Writers GetWriters() final
Definition: cat_stream.cpp:89
bool all_writers_closed_
bool if all writers were closed
Block combines a reference to a read-only ByteBlock and book-keeping information. ...
Definition: block.hpp:52
std::vector< BlockQueue > queues_
BlockQueues to store incoming Blocks with no attached destination.
Definition: cat_stream.hpp:131
common::JsonLogger & logger()
Get the JsonLogger from the BlockPool.
common::StatsTimerStopped tx_timespan_
Timers from first rx / tx package until rx / tx direction is closed.
const StreamId & id() const final
Return stream id.
Definition: cat_stream.cpp:333
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:34
bool is_queue_closed(size_t from)
check if inbound queue is closed
Definition: cat_stream.cpp:231
Base class for common structures for ConcatStream and MixedStream.
Definition: stream_data.hpp:46
static uint_pair max()
return an uint_pair instance containing the largest value possible
Definition: uint_types.hpp:226
CatReader GetReader(bool consume)
Open a CatReader (function name matches a method in File and MixStream).
Definition: cat_stream.cpp:357
#define die_unless(X)
Definition: die.hpp:27
std::mutex mutex_
protects critical sections
size_t hard_ram_limit() noexcept
Hard limit on amount of memory used for ByteBlock.
Definition: block_pool.cpp:886
void IntReleaseCatStream(size_t id, size_t local_worker_id)
release pointer onto a CatStreamData object
size_t default_block_size
default size of blocks in File, Channel, BlockQueue, etc.
Definition: byte_block.cpp:25
size_t my_worker_rank() const
Returns my_worker_rank_.
CatReader GetCatReader(bool consume)
Definition: cat_stream.cpp:185
const char * stream_type() final
return stream type string
Definition: cat_stream.cpp:85
CatBlockSource is a BlockSource which concatenates all Blocks available from a vector of BlockSources...
std::atomic< size_t > rx_net_bytes_
Multiplexer & multiplexer_
reference to multiplexer
#define TLX_UNLIKELY(c)
Definition: likely.hpp:24
CatReader GetReader(bool consume)
Open a CatReader (function name matches a method in File and MixStream).
Definition: cat_stream.cpp:189
virtual Connection & connection(size_t id)=0
Return Connection to client id.
tlx::CountingPtr< StreamData > StreamDataPtr
StreamData & data() final
Return stream data reference.
Definition: cat_stream.cpp:337
Multiplexes virtual Connections on Dispatcher.
Definition: multiplexer.hpp:67
std::atomic< size_t > remaining_closing_blocks_
std::atomic< size_t > max_active_streams_
maximum number of active Cat/MixStreams
friend class StreamSink
friends for access to multiplexer_
size_t local_worker_id_
local worker id
CatReader GetCatReader(bool consume)
Definition: cat_stream.cpp:353
std::vector< Reader > GetReaders()
Definition: cat_stream.cpp:349
common::StatsTimerStopped rx_timespan_
An extra class derived from std::vector<> for delivery of the BlockWriters of a Stream.
Definition: stream_data.hpp:59
size_t item_counter() const
Returns item_counter_.
std::vector< Reader > GetReaders()
Definition: cat_stream.cpp:154
void Close() final
shuts the stream down.
Definition: cat_stream.cpp:193
size_t num_hosts() const
Number of hosts in system.
Definition: stream_data.hpp:99
CatBlockSource GetCatBlockSource(bool consume)
Gets a CatBlockSource which includes all incoming queues of this stream.
Definition: cat_stream.cpp:169
size_t wait(size_t delta=1, size_t slack=0)
Definition: semaphore.hpp:60
size_t signal()
Definition: semaphore.hpp:44
net::Group & group_
Holds NetConnections for outgoing Streams.
size_t workers_per_host() const
number of workers per host
void OnStreamBlock(size_t from, uint32_t seq, Block &&b)
Definition: cat_stream.cpp:243
ConsumeBlockQueueSource BlockQueueSource
Definition: cat_stream.hpp:60
size_t block_counter() const
Returns block_counter_.
tlx::Semaphore sem_closing_blocks_
number of received stream closing Blocks.
data::CatBlockSource< DynBlockSource > CatBlockSource
Definition: cat_stream.hpp:63
Base class for StreamSet.
CatStreamData(StreamSetBase *stream_set_base, Multiplexer &multiplexer, size_t send_size_limit, const StreamId &id, size_t local_worker_id, size_t dia_id)
Creates a new stream instance.
Definition: cat_stream.cpp:28
CatStreamDataPtr ptr_
Definition: cat_stream.hpp:195
CatStreamDataPtr CatLoopback(size_t stream_id, size_t to_worker_id)
size_t byte_counter() const
Returns byte_counter_.
const StreamId & id() const
Return stream id.
Definition: stream_data.hpp:91
BlockQueue * loopback_queue(size_t from_worker_id)
Returns the loopback queue for the worker of this stream.
Definition: cat_stream.cpp:316
void OnStreamBlockOrdered(size_t from, Block &&b)
Definition: cat_stream.cpp:287
StreamId id_
our own stream id.
BlockReader< CatBlockSource > CatBlockReader
Definition: cat_stream.hpp:64
static constexpr bool debug_data
Definition: cat_stream.hpp:58
size_t dia_id_
associated DIANode id.
size_t num_workers() const
Number of workers in system.
size_t my_host_rank() const
Returns my_host_rank.
Definition: stream_data.hpp:97
static int round_down_to_power_of_two(int i)
does what it says: round down to next power of two
size_t StreamId
Definition: stream_data.hpp:32
bool closed() const final
Definition: cat_stream.cpp:223
void set_dia_id(size_t dia_id)
Definition: cat_stream.cpp:78
size_t num_workers() const
total number of workers.
Definition: multiplexer.hpp:98
std::string hexdump(const void *const data, size_t size)
Dump a (binary) string as a sequence of uppercase hexadecimal pairs.
Definition: hexdump.cpp:21
std::atomic< size_t > rx_net_blocks_
Writers GetWriters() final
Definition: cat_stream.cpp:345
BlockPool & block_pool_
reference to host-global BlockPool.
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:24
std::atomic< size_t > active_streams_
number of active Cat/MixStreams
CatStream(const CatStreamDataPtr &ptr)
Definition: cat_stream.cpp:326
A BlockQueue is a thread-safe queue used to hand-over Block objects between threads.
Definition: block_queue.hpp:47
#define LOGC(cond)
Explicitly specify the condition for logging.
Definition: logger.hpp:21
size_t workers_per_host() const
Returns workers_per_host.
common::StatsTimerStart rx_lifetime_
static const uint32_t final_seq
final sequence number
std::vector< SeqReordering > seq_
Block Sequence numbers.
Definition: cat_stream.hpp:125