Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
cat_stream.cpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/data/cat_stream.cpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Timo Bingmann <[email protected]>
7  * Copyright (C) 2015 Tobias Sturm <[email protected]>
8  *
9  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10  ******************************************************************************/
11 
13 
17 
19 #include <tlx/string/hexdump.hpp>
20 
21 #include <algorithm>
22 #include <map>
23 #include <vector>
24 
25 namespace thrill {
26 namespace data {
27 
28 CatStreamData::CatStreamData(Multiplexer& multiplexer, size_t send_size_limit,
29  const StreamId& id,
30  size_t local_worker_id, size_t dia_id)
31  : StreamData(multiplexer, send_size_limit, id, local_worker_id, dia_id) {
32 
34 
35  queues_.reserve(num_workers());
36  seq_.resize(num_workers());
37 
38  // construct StreamSink array
39  for (size_t host = 0; host < num_hosts(); ++host) {
40  for (size_t worker = 0; worker < workers_per_host(); worker++) {
41  if (host == my_host_rank()) {
42  // construct loopback queue
43 
45  << "class" << "StreamSink"
46  << "event" << "open"
47  << "id" << id_
48  << "peer_host" << host
49  << "src_worker" << my_worker_rank()
50  << "tgt_worker" << (host * workers_per_host() + worker)
51  << "loopback" << true;
52 
53  queues_.emplace_back(
54  multiplexer_.block_pool_, local_worker_id, dia_id,
55  // OnClose callback to BlockQueue to deliver stats, keep a
56  // smart pointer reference to this
57  [p = CatStreamDataPtr(this)](BlockQueue& queue) {
58  p->rx_int_items_ += queue.item_counter();
59  p->rx_int_bytes_ += queue.byte_counter();
60  p->rx_int_blocks_ += queue.block_counter();
61  });
62  }
63  else {
64  // construct inbound BlockQueues
65  queues_.emplace_back(
66  multiplexer_.block_pool_, local_worker_id, dia_id);
67  }
68  }
69  }
70 }
71 
73  LOG << "~CatStreamData() deleted";
74 }
75 
76 void CatStreamData::set_dia_id(size_t dia_id) {
77  dia_id_ = dia_id;
78  for (size_t i = 0; i < queues_.size(); ++i) {
79  queues_[i].set_dia_id(dia_id);
80  }
81 }
82 
84  size_t hard_ram_limit = multiplexer_.block_pool_.hard_ram_limit();
85  size_t block_size_base = hard_ram_limit / 4
87  size_t block_size = tlx::round_down_to_power_of_two(block_size_base);
88  if (block_size == 0 || block_size > default_block_size)
89  block_size = default_block_size;
90 
91  {
92  std::unique_lock<std::mutex> lock(multiplexer_.mutex_);
97  }
98 
99  LOGC(my_worker_rank() == 0 && 1)
100  << "CatStreamData::GetWriters()"
101  << " hard_ram_limit=" << hard_ram_limit
102  << " block_size_base=" << block_size_base
103  << " block_size=" << block_size
104  << " active_streams=" << multiplexer_.active_streams_
105  << " max_active_streams=" << multiplexer_.max_active_streams_;
106 
107  tx_timespan_.StartEventually();
108 
109  Writers result(my_worker_rank());
110  result.reserve(num_workers());
111 
112  for (size_t host = 0; host < num_hosts(); ++host) {
113  for (size_t worker = 0; worker < workers_per_host(); ++worker) {
114  if (host == my_host_rank()) {
115  // construct loopback queue writer
116  auto target_stream_ptr = multiplexer_.CatLoopback(id_, worker);
117  BlockQueue* sink_queue_ptr =
118  target_stream_ptr->loopback_queue(local_worker_id_);
119  result.emplace_back(
120  StreamSink(
121  StreamDataPtr(this),
123  sink_queue_ptr,
124  id_,
126  host, worker),
127  block_size);
128  }
129  else {
130  result.emplace_back(
131  StreamSink(
132  StreamDataPtr(this),
136  id_,
138  host, worker),
139  block_size);
140  }
141  }
142  }
143 
144  assert(result.size() == num_workers());
145  return result;
146 }
147 
148 std::vector<CatStreamData::Reader> CatStreamData::GetReaders() {
149  rx_timespan_.StartEventually();
150 
151  std::vector<BlockQueueReader> result;
152  result.reserve(num_workers());
153 
154  for (size_t worker = 0; worker < num_workers(); ++worker) {
155  result.emplace_back(
157  }
158 
159  assert(result.size() == num_workers());
160  return result;
161 }
162 
164  rx_timespan_.StartEventually();
165 
166  // construct vector of BlockSources to read from queues_.
167  std::vector<DynBlockSource> result;
168  result.reserve(num_workers());
169 
170  for (size_t worker = 0; worker < num_workers(); ++worker) {
171  result.emplace_back(
172  queues_[worker].GetBlockSource(consume, local_worker_id_));
173  }
174 
175  // move BlockQueueSources into concatenation BlockSource, and to Reader.
176  return CatBlockSource(std::move(result));
177 }
178 
180  return CatBlockReader(GetCatBlockSource(consume));
181 }
182 
184  return GetCatReader(consume);
185 }
186 
188  if (is_closed_) return;
189  is_closed_ = true;
190 
191  sLOG << "CatStreamData" << id() << "close"
192  << "host" << my_host_rank()
193  << "local_worker_id_" << local_worker_id_;
194 
195  // close loop-back queue from this worker to itself
196  auto my_global_worker_id = my_worker_rank();
197  if (!queues_[my_global_worker_id].write_closed())
198  queues_[my_global_worker_id].Close();
199 
200  // wait for close packets to arrive
201  for (size_t i = 0; i < queues_.size() - workers_per_host(); ++i)
203 
204  tx_lifetime_.StopEventually();
205  tx_timespan_.StopEventually();
206  OnAllClosed("CatStreamData");
207 
208  {
209  std::unique_lock<std::mutex> lock(multiplexer_.mutex_);
211  multiplexer_.IntReleaseCatStream(id_, local_worker_id_);
212  }
213 
214  LOG << "CatStreamData::Close() finished"
215  << " id_=" << id_
216  << " local_worker_id_=" << local_worker_id_;
217 }
218 
219 bool CatStreamData::closed() const {
220  bool closed = true;
221  for (auto& q : queues_) {
222  closed = closed && q.write_closed();
223  }
224  return closed;
225 }
226 
227 struct CatStreamData::SeqReordering {
228  //! current top sequence number
229  uint32_t seq_ = 0;
230 
231  //! queue of waiting Blocks, ordered by sequence number
232  std::map<uint32_t, PinnedBlock> waiting_;
233 };
234 
235 void CatStreamData::OnStreamBlock(size_t from, uint32_t seq, PinnedBlock&& b) {
236  assert(from < queues_.size());
237  rx_timespan_.StartEventually();
238 
239  rx_net_items_ += b.num_items();
240  rx_net_bytes_ += b.size();
241  rx_net_blocks_++;
242 
243  LOG << "OnCatStreamBlock"
244  << " from=" << from
245  << " seq=" << seq
246  << " b=" << b;
247 
248  if (debug_data) {
249  sLOG << "stream" << id_ << "receive from" << from << ":"
250  << tlx::hexdump(b.ToString());
251  }
252 
253  if (TLX_UNLIKELY(seq != seq_[from].seq_)) {
254  // sequence mismatch: put into queue
255  die_unless(seq >= seq_[from].seq_);
256 
257  seq_[from].waiting_.insert(
258  std::make_pair(seq, std::move(b)));
259 
260  return;
261  }
262 
263  OnStreamBlockOrdered(from, std::move(b));
264 
265  // try to process additional queued blocks
266  while (!seq_[from].waiting_.empty() &&
267  seq_[from].waiting_.begin()->first == seq_[from].seq_)
268  {
269  sLOG << "CatStreamData::OnStreamBlock"
270  << "processing delayed block with seq"
271  << seq_[from].waiting_.begin()->first;
272 
274  from, std::move(seq_[from].waiting_.begin()->second));
275 
276  seq_[from].waiting_.erase(
277  seq_[from].waiting_.begin());
278  }
279 }
280 
282  if (b.IsValid()) {
283  queues_[from].AppendPinnedBlock(std::move(b), /* is_last_block */ false);
284  }
285  else {
286  sLOG << "CatStreamData::OnCloseStream"
287  << "stream" << id_
288  << "from" << from
289  << "for worker" << my_worker_rank()
290  << "remaining_closing_blocks_" << remaining_closing_blocks_;
291 
292  queues_[from].Close();
293 
294  die_unless(remaining_closing_blocks_ > 0);
295  if (--remaining_closing_blocks_ == 0) {
296  rx_lifetime_.StopEventually();
297  rx_timespan_.StopEventually();
298  }
299 
301  }
302 
303  seq_[from].seq_++;
304 }
305 
306 BlockQueue* CatStreamData::loopback_queue(size_t from_worker_id) {
307  assert(from_worker_id < workers_per_host());
308  size_t global_worker_rank = workers_per_host() * my_host_rank() + from_worker_id;
309  sLOG << "expose loopback queue for" << from_worker_id << "->" << local_worker_id_;
310  return &(queues_[global_worker_rank]);
311 }
312 
313 /******************************************************************************/
314 // CatStream
315 
317  : ptr_(ptr) { }
318 
320  ptr_->Close();
321 }
322 
323 const StreamId& CatStream::id() const {
324  return ptr_->id();
325 }
326 
328  return *ptr_;
329 }
330 
331 const StreamData& CatStream::data() const {
332  return *ptr_;
333 }
334 
336  return ptr_->GetWriters();
337 }
338 
339 std::vector<CatStream::Reader> CatStream::GetReaders() {
340  return ptr_->GetReaders();
341 }
342 
344  return ptr_->GetCatReader(consume);
345 }
346 
348  return ptr_->GetReader(consume);
349 }
350 
351 } // namespace data
352 } // namespace thrill
353 
354 /******************************************************************************/
std::atomic< size_t > rx_net_items_
Writers GetWriters() final
Definition: cat_stream.cpp:83
std::vector< BlockQueue > queues_
BlockQueues to store incoming Blocks with no attached destination.
Definition: cat_stream.hpp:124
common::JsonLogger & logger()
Get the JsonLogger from the BlockPool.
common::StatsTimerStopped tx_timespan_
Timers from first rx / tx package until rx / tx direction is closed.
const StreamId & id() const final
Return stream id.
Definition: cat_stream.cpp:323
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:34
Base class for common structures for ConcatStream and MixedStream.
Definition: stream_data.hpp:45
static uint_pair max()
return an uint_pair instance containing the largest value possible
Definition: uint_types.hpp:226
size_t workers_per_host() const
Returns workers_per_host.
Definition: stream_data.hpp:97
CatReader GetReader(bool consume)
Open a CatReader (function name matches a method in File and MixStream).
Definition: cat_stream.cpp:347
#define die_unless(X)
Definition: die.hpp:27
std::mutex mutex_
protects critical sections
int round_down_to_power_of_two(int i)
does what it says: round down to next power of two
size_t hard_ram_limit() noexcept
Hard limit on amount of memory used for ByteBlock.
Definition: block_pool.cpp:886
void IntReleaseCatStream(size_t id, size_t local_worker_id)
release pointer to a CatStreamData object
size_t default_block_size
default size of blocks in File, Channel, BlockQueue, etc.
Definition: byte_block.cpp:25
CatReader GetCatReader(bool consume)
Definition: cat_stream.cpp:179
size_t wait(size_t delta=1)
Definition: semaphore.hpp:60
size_t num_hosts() const
Number of hosts in system.
Definition: stream_data.hpp:92
CatBlockSource is a BlockSource which concatenates all Blocks available from a vector of BlockSources...
std::atomic< size_t > rx_net_bytes_
Multiplexer & multiplexer_
reference to multiplexer
size_t block_counter() const
Returns block_counter_.
#define TLX_UNLIKELY(c)
Definition: likely.hpp:24
CatReader GetReader(bool consume)
Open a CatReader (function name matches a method in File and MixStream).
Definition: cat_stream.cpp:183
size_t my_host_rank() const
Returns my_host_rank.
Definition: stream_data.hpp:90
virtual Connection & connection(size_t id)=0
Return Connection to client id.
tlx::CountingPtr< StreamData > StreamDataPtr
StreamData & data() final
Return stream data reference.
Definition: cat_stream.cpp:327
Multiplexes virtual Connections on Dispatcher.
Definition: multiplexer.hpp:67
std::atomic< size_t > remaining_closing_blocks_
std::atomic< size_t > max_active_streams_
maximum number of active Cat/MixStreams
size_t num_workers() const
total number of workers.
Definition: multiplexer.hpp:98
common::StatsTimerStart tx_lifetime_
Timers from creation of stream until rx / tx direction is closed.
void OnStreamBlock(size_t from, uint32_t seq, PinnedBlock &&b)
Definition: cat_stream.cpp:235
friend class StreamSink
friends for access to multiplexer_
CatReader GetCatReader(bool consume)
Definition: cat_stream.cpp:343
size_t my_worker_rank() const
Returns my_worker_rank_.
Definition: stream_data.hpp:99
std::vector< Reader > GetReaders()
Definition: cat_stream.cpp:339
common::StatsTimerStopped rx_timespan_
An extra class derived from std::vector<> for delivery of the BlockWriters of a Stream.
Definition: stream_data.hpp:56
std::vector< Reader > GetReaders()
Definition: cat_stream.cpp:148
void Close() final
shuts the stream down.
Definition: cat_stream.cpp:187
size_t item_counter() const
Returns item_counter_.
CatBlockSource GetCatBlockSource(bool consume)
Gets a CatBlockSource which includes all incoming queues of this stream.
Definition: cat_stream.cpp:163
size_t signal()
Definition: semaphore.hpp:44
ConsumeBlockQueueSource BlockQueueSource
Definition: cat_stream.hpp:60
CatStreamData(Multiplexer &multiplexer, size_t send_size_limit, const StreamId &id, size_t local_worker_id, size_t dia_id)
Creates a new stream instance.
Definition: cat_stream.cpp:28
tlx::Semaphore sem_closing_blocks_
number of received stream closing Blocks.
data::CatBlockSource< DynBlockSource > CatBlockSource
Definition: cat_stream.hpp:63
CatStreamDataPtr ptr_
Definition: cat_stream.hpp:188
CatStreamDataPtr CatLoopback(size_t stream_id, size_t to_worker_id)
BlockQueue * loopback_queue(size_t from_worker_id)
Returns the loopback queue for the worker of this stream.
Definition: cat_stream.cpp:306
StreamId id_
our own stream id.
BlockReader< CatBlockSource > CatBlockReader
Definition: cat_stream.hpp:64
static constexpr bool debug_data
Definition: cat_stream.hpp:58
void OnAllClosed(const char *stream_type)
Definition: stream_data.cpp:35
size_t dia_id_
Associated DIANode id.
size_t byte_counter() const
Returns byte_counter_.
size_t workers_per_host() const
number of workers per host
A pinned / pin-counted derivative of a Block.
Definition: block.hpp:157
size_t StreamId
Definition: stream_data.hpp:32
bool closed() const final
Definition: cat_stream.cpp:219
void set_dia_id(size_t dia_id)
Definition: cat_stream.cpp:76
std::string hexdump(const void *const data, size_t size)
Dump a (binary) string as a sequence of uppercase hexadecimal pairs.
Definition: hexdump.cpp:21
const StreamId & id() const
Return stream id.
Definition: stream_data.hpp:87
std::atomic< size_t > rx_net_blocks_
Writers GetWriters() final
Definition: cat_stream.cpp:335
BlockPool & block_pool_
reference to host-global BlockPool.
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:24
std::atomic< size_t > active_streams_
number of active Cat/MixStreams
void OnStreamBlockOrdered(size_t from, PinnedBlock &&b)
Definition: cat_stream.cpp:281
CatStream(const CatStreamDataPtr &ptr)
Definition: cat_stream.cpp:316
size_t num_workers() const
Number of workers in system.
Definition: stream_data.hpp:94
A BlockQueue is a thread-safe queue used to hand-over Block objects between threads.
Definition: block_queue.hpp:47
#define LOGC(cond)
Explicitly specify the condition for logging.
Definition: logger.hpp:21
common::StatsTimerStart rx_lifetime_
std::vector< SeqReordering > seq_
Block Sequence numbers.
Definition: cat_stream.hpp:118