Thrill  0.1
multiplexer.cpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/data/multiplexer.cpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Tobias Sturm <[email protected]>
7  * Copyright (C) 2015 Timo Bingmann <[email protected]>
8  *
9  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10  ******************************************************************************/
11 
13 
17 #include <thrill/data/stream.hpp>
19 
21 
22 #include <algorithm>
23 #include <unordered_map>
24 #include <vector>
25 
26 namespace thrill {
27 namespace data {
28 
29 /******************************************************************************/
30 // Repository
31 
32 /*!
33  * A Repository holds objects that are shared among workers. Each object is
34  * addressed via an Id. Workers can allocate new Ids independently but
35  * deterministically (the repository will issue the same id sequence to all
36  * workers). Objects are created in place via argument forwarding.
 *
 * NOTE(review): the class does no locking of its own; callers must serialize
 * access (most Multiplexer entry points lock mutex_ before using it).
37  */
38 template <typename Object>
39 class Repository
40 {
41 public:
42  using Id = size_t;
43  using ObjectPtr = tlx::CountingPtr<Object>;
44 
45  //! construct with all per-worker id counters at 0, so the first id
 //! returned by AllocateId() for each local worker is 1.
46  explicit Repository(size_t num_workers_per_node)
47  : next_id_(num_workers_per_node, 0) { }
48 
49  //! Allocates the next object id for the given local worker.
50  //! Calls to this method alter the internal state -> order of calls is
51  //! important and must be deterministic
 //! \param local_worker_id index of the local worker; must be below the
 //!        num_workers_per_node passed to the constructor (asserted).
 //! \return the new id (the counter is pre-incremented, so ids start at 1)
52  size_t AllocateId(size_t local_worker_id) {
53  assert(local_worker_id < next_id_.size());
54  return ++next_id_[local_worker_id];
55  }
56 
57  //! Get object with given id, if it does not exist, create it.
58  //! \param object_id of the object
59  //! \param construction parameters forwarded to the Subclass constructor
 //!        when a new object is created
 //! Dies (die_unless) if an object with this id exists but is not a
 //! Subclass (or derived from it).
60  template <typename Subclass = Object, typename... Types>
62  GetOrCreate(Id object_id, Types&& ... construction) {
63  auto it = map_.find(object_id);
64 
65  if (it != map_.end()) {
 // existing object: it must be (derived from) the requested Subclass
66  die_unless(dynamic_cast<Subclass*>(it->second.get()));
68  dynamic_cast<Subclass*>(it->second.get()));
69  }
70 
71  // construct new object
72  tlx::CountingPtr<Subclass> value = tlx::make_counting<Subclass>(
73  std::forward<Types>(construction) ...);
74 
 // store it type-erased as ObjectPtr; the caller gets the typed pointer
75  map_.insert(std::make_pair(object_id, ObjectPtr(value)));
76  return value;
77  }
78 
 //! Get object with given id, checked to be a Subclass. Dies if the id is
 //! not in the repository or the stored object is not a Subclass.
79  template <typename Subclass = Object>
80  tlx::CountingPtr<Subclass> GetOrDie(Id object_id) {
81  auto it = map_.find(object_id);
82 
83  if (it != map_.end()) {
84  die_unless(dynamic_cast<Subclass*>(it->second.get()));
86  dynamic_cast<Subclass*>(it->second.get()));
87  }
88 
89  die("object " + std::to_string(object_id) + " not in repository");
90  }
91 
92  //! Remove id from map; dies if the id is not in the repository.
93  void EraseOrDie(Id object_id) {
94  auto it = map_.find(object_id);
95  if (it != map_.end()) {
96  map_.erase(it);
97  return;
98  }
99  die("object " + std::to_string(object_id) + " not in repository");
100  }
101 
102  //! return mutable reference to map of objects (id -> object).
103  std::unordered_map<Id, ObjectPtr>& map() { return map_; }
104 
105 private:
106  //! Last id issued to each local worker (initially 0). AllocateId()
 //! pre-increments it, so the next id handed out is this value + 1.
107  std::vector<size_t> next_id_;
108 
109  //! map from object id to the shared (type-erased) object
110  std::unordered_map<Id, ObjectPtr> map_;
111 };
112 
113 /******************************************************************************/
114 // Multiplexer
115 
116 struct Multiplexer::Data {
117  //! Streams have an ID in block headers. (worker id, stream id)
118  Repository<StreamSetBase> stream_sets_;
119 
120  //! array of number of open requests
121  std::vector<std::atomic<size_t> > ongoing_requests_;
122 
123  explicit Data(size_t num_hosts, size_t workers_per_host)
124  : stream_sets_(workers_per_host),
125  ongoing_requests_(num_hosts) { }
126 };
127 
129  net::DispatcherThread& dispatcher, net::Group& group,
130  size_t workers_per_host)
131  : mem_manager_(mem_manager),
132  block_pool_(block_pool),
133  dispatcher_(dispatcher),
134  group_(group),
135  workers_per_host_(workers_per_host),
136  d_(std::make_unique<Data>(group_.num_hosts(), workers_per_host)) {
137 
139  if (num_parallel_async_ == 0) {
140  // one async at a time (for TCP and mock backends)
142  }
143  else {
144  // k/2 asyncs at a time (for MPI backend)
145  num_parallel_async_ /= 2;
146  // at least one
148  }
149 
150  // calculate send queue size limit for StreamData semaphores
151  send_size_limit_ = block_pool.hard_ram_limit() / workers_per_host / 4;
154 
155  // launch initial async reads
156  for (size_t id = 0; id < group_.num_hosts(); id++) {
157  if (id == group_.my_host_rank()) continue;
159  }
160 }
161 
163  std::unique_lock<std::mutex> lock(mutex_);
164 
165  if (!d_->stream_sets_.map().empty()) {
166  LOG1 << "Multiplexer::Close()"
167  << " remaining_streams=" << d_->stream_sets_.map().size();
168  die_unless(d_->stream_sets_.map().empty());
169  }
170 
171  // destroy all still open Streams
172  d_->stream_sets_.map().clear();
173 
174  closed_ = true;
175 }
176 
178  if (!closed_)
179  Close();
180 
181  group_.Close();
182 }
183 
184 size_t Multiplexer::AllocateCatStreamId(size_t local_worker_id) {
185  std::unique_lock<std::mutex> lock(mutex_);
186  return d_->stream_sets_.AllocateId(local_worker_id);
187 }
188 
190  size_t id, size_t local_worker_id, size_t dia_id) {
191  std::unique_lock<std::mutex> lock(mutex_);
192  return IntGetOrCreateCatStreamData(id, local_worker_id, dia_id);
193 }
194 
195 CatStreamPtr Multiplexer::GetNewCatStream(size_t local_worker_id, size_t dia_id) {
196  std::unique_lock<std::mutex> lock(mutex_);
197  return tlx::make_counting<CatStream>(
199  d_->stream_sets_.AllocateId(local_worker_id),
200  local_worker_id, dia_id));
201 }
202 
204  size_t id, size_t local_worker_id, size_t dia_id) {
205  CatStreamDataPtr ptr =
206  d_->stream_sets_.GetOrCreate<CatStreamSet>(
207  id, *this, send_size_limit_, id,
208  workers_per_host_, dia_id)->Peer(local_worker_id);
209  // update dia_id: the stream may have been created before the DIANode
210  // associated with it.
211  if (ptr && ptr->dia_id_ == 0)
212  ptr->set_dia_id(dia_id);
213  return ptr;
214 }
215 
216 size_t Multiplexer::AllocateMixStreamId(size_t local_worker_id) {
217  std::unique_lock<std::mutex> lock(mutex_);
218  return d_->stream_sets_.AllocateId(local_worker_id);
219 }
220 
222  size_t id, size_t local_worker_id, size_t dia_id) {
223  std::unique_lock<std::mutex> lock(mutex_);
224  return IntGetOrCreateMixStreamData(id, local_worker_id, dia_id);
225 }
226 
227 MixStreamPtr Multiplexer::GetNewMixStream(size_t local_worker_id, size_t dia_id) {
228  std::unique_lock<std::mutex> lock(mutex_);
229  return tlx::make_counting<MixStream>(
231  d_->stream_sets_.AllocateId(local_worker_id),
232  local_worker_id, dia_id));
233 }
234 
236  size_t id, size_t local_worker_id, size_t dia_id) {
237  MixStreamDataPtr ptr =
238  d_->stream_sets_.GetOrCreate<MixStreamSet>(
239  id, *this, send_size_limit_, id,
240  workers_per_host_, dia_id)->Peer(local_worker_id);
241  // update dia_id: the stream may have been created before the DIANode
242  // associated with it.
243  if (ptr && ptr->dia_id_ == 0)
244  ptr->set_dia_id(dia_id);
245  return ptr;
246 }
247 
248 void Multiplexer::IntReleaseCatStream(size_t id, size_t local_worker_id) {
249 
251  d_->stream_sets_.GetOrDie<CatStreamSet>(id);
252 
253  sLOG << "Multiplexer::IntReleaseCatStream() release"
254  << "stream" << id << "local_worker_id" << local_worker_id;
255 
256  if (set->Release(local_worker_id)) {
257  LOG << "Multiplexer::IntReleaseCatStream() destroy stream " << id;
258  d_->stream_sets_.EraseOrDie(id);
259  }
260 }
261 
262 void Multiplexer::IntReleaseMixStream(size_t id, size_t local_worker_id) {
263 
265  d_->stream_sets_.GetOrDie<MixStreamSet>(id);
266 
267  sLOG << "Multiplexer::IntReleaseMixStream() release"
268  << "stream" << id << "local_worker_id" << local_worker_id;
269 
270  if (set->Release(local_worker_id)) {
271  LOG << "Multiplexer::IntReleaseMixStream() destroy stream " << id;
272  d_->stream_sets_.EraseOrDie(id);
273  }
274 }
275 
277  return block_pool_.logger();
278 }
279 
280 /******************************************************************************/
281 
283 
284  while (d_->ongoing_requests_[peer] < num_parallel_async_) {
285  uint32_t seq = 42 + (s.rx_seq_.fetch_add(2) & 0xFFFF);
288  [this, peer, seq](Connection& s, net::Buffer&& buffer) {
289  return OnMultiplexerHeader(peer, seq, s, std::move(buffer));
290  });
291 
292  d_->ongoing_requests_[peer]++;
293  }
294 }
295 
297  size_t peer, uint32_t seq, Connection& s, net::Buffer&& buffer) {
298 
299  die_unless(d_->ongoing_requests_[peer] > 0);
300  d_->ongoing_requests_[peer]--;
301 
302  // received invalid Buffer: the connection has closed?
303  if (!buffer.IsValid()) return;
304 
305  net::BufferReader br(buffer);
307 
308  LOG << "OnMultiplexerHeader() header"
309  << " magic=" << unsigned(header.magic)
310  << " size=" << header.size
311  << " num_items=" << header.num_items
312  << " first_item=" << header.first_item
313  << " typecode_verify=" << header.typecode_verify
314  << " stream_id=" << header.stream_id;
315 
316  // received stream id
317  StreamId id = header.stream_id;
318  size_t local_worker = header.receiver_local_worker;
319 
320  // round of allocation size to next power of two
321  size_t alloc_size = header.size;
322  if (alloc_size < THRILL_DEFAULT_ALIGN) alloc_size = THRILL_DEFAULT_ALIGN;
323  alloc_size = tlx::round_up_to_power_of_two(alloc_size);
324 
325  if (header.magic == MagicByte::CatStreamBlock)
326  {
327  if (header.IsAllWorkers()) {
328  sLOG << "end of all stream on" << s << "CatStream" << id
329  << " my_host_rank=" << my_host_rank()
330  << " peer_host_rank=" << header.sender_worker / workers_per_host();
331 
332  for (size_t w = 0; w < workers_per_host(); ++w) {
334  id, w, /* dia_id (unknown at this time) */ 0);
335  if (!stream)
336  continue;
337 
338  for (size_t sender_worker = 0;
339  sender_worker < workers_per_host(); ++sender_worker) {
340  if (!stream->is_queue_closed(
341  header.sender_worker + sender_worker)) {
342  stream->OnStreamBlock(
343  header.sender_worker + sender_worker, header.seq,
344  Block());
345  }
346  }
347  }
348  }
349  else if (header.IsEnd()) {
350  sLOG << "end of stream on" << s << "in CatStream" << id
351  << "from worker" << header.sender_worker
352  << "seq" << header.seq;
353 
355  id, local_worker, /* dia_id (unknown at this time) */ 0);
356  stream->rx_net_bytes_ += buffer.size();
357 
358  stream->OnStreamBlock(
359  header.sender_worker, header.seq, Block());
360  }
361  else {
363  id, local_worker, /* dia_id (unknown at this time) */ 0);
364  stream->rx_net_bytes_ += buffer.size();
365 
366  sLOG << "stream header from" << s << "on CatStream" << id
367  << "from worker" << header.sender_worker
368  << "for local_worker" << local_worker
369  << "seq" << header.seq
370  << "size" << header.size;
371 
373  alloc_size, local_worker);
374  sLOG << "new PinnedByteBlockPtr bytes=" << *bytes;
375 
376  d_->ongoing_requests_[peer]++;
377 
379  s, seq + 1, header.size, std::move(bytes),
380  [this, peer, header, stream](
381  Connection& s, PinnedByteBlockPtr&& bytes) {
382  OnCatStreamBlock(peer, s, header, stream, std::move(bytes));
383  });
384  }
385  }
386  else if (header.magic == MagicByte::MixStreamBlock)
387  {
388  if (header.IsAllWorkers()) {
389  sLOG << "end of all stream on" << s << "MixStream" << id
390  << " my_host_rank=" << my_host_rank()
391  << " peer_host_rank=" << header.sender_worker / workers_per_host();
392 
393  for (size_t w = 0; w < workers_per_host(); ++w) {
395  id, w, /* dia_id (unknown at this time) */ 0);
396  if (!stream)
397  continue;
398 
399  for (size_t sender_worker = 0;
400  sender_worker < workers_per_host(); ++sender_worker) {
401  if (!stream->is_queue_closed(
402  header.sender_worker + sender_worker)) {
403  stream->OnStreamBlock(
404  header.sender_worker + sender_worker, header.seq,
405  Block());
406  }
407  }
408  }
409  }
410  else if (header.IsEnd()) {
411  sLOG << "end of stream on" << s << "in MixStream" << id
412  << "from worker" << header.sender_worker;
413 
415  id, local_worker, /* dia_id (unknown at this time) */ 0);
416  stream->rx_net_bytes_ += buffer.size();
417 
418  stream->OnStreamBlock(header.sender_worker, header.seq, Block());
419  }
420  else {
422  id, local_worker, /* dia_id (unknown at this time) */ 0);
423  stream->rx_net_bytes_ += buffer.size();
424 
425  sLOG << "stream header from" << s << "on MixStream" << id
426  << "from worker" << header.sender_worker
427  << "for local_worker" << local_worker
428  << "seq" << header.seq
429  << "size" << header.size;
430 
432  alloc_size, local_worker);
433 
434  d_->ongoing_requests_[peer]++;
435 
437  s, seq + 1, header.size, std::move(bytes),
438  [this, peer, header, stream](
439  Connection& s, PinnedByteBlockPtr&& bytes) mutable {
440  OnMixStreamBlock(peer, s, header, stream, std::move(bytes));
441  });
442  }
443  }
444  else {
445  die("Invalid magic byte in MultiplexerHeader");
446  }
447 
449 }
450 
452  size_t peer, Connection& s, const StreamMultiplexerHeader& header,
453  const CatStreamDataPtr& stream, PinnedByteBlockPtr&& bytes) {
454 
455  die_unless(d_->ongoing_requests_[peer] > 0);
456  d_->ongoing_requests_[peer]--;
457 
458  sLOG << "Multiplexer::OnCatStreamBlock()"
459  << "got block" << *bytes << "seq" << header.seq << "on" << s
460  << "in CatStream" << header.stream_id
461  << "from worker" << header.sender_worker;
462 
463  stream->OnStreamBlock(
464  header.sender_worker, header.seq,
465  Block(std::move(bytes).ReleasePin(), /* begin */ 0, header.size,
466  header.first_item, header.num_items,
467  header.typecode_verify));
468 
469  if (header.is_last_block)
470  stream->OnStreamBlock(header.sender_worker, header.seq + 1, Block());
471 
473 }
474 
476  size_t peer, Connection& s, const StreamMultiplexerHeader& header,
477  const MixStreamDataPtr& stream, PinnedByteBlockPtr&& bytes) {
478 
479  die_unless(d_->ongoing_requests_[peer] > 0);
480  d_->ongoing_requests_[peer]--;
481 
482  sLOG << "Multiplexer::OnMixStreamBlock()"
483  << "got block" << *bytes << "seq" << header.seq << "on" << s
484  << "in MixStream" << header.stream_id
485  << "from worker" << header.sender_worker;
486 
487  stream->OnStreamBlock(
488  header.sender_worker, header.seq,
489  Block(std::move(bytes).ReleasePin(), /* begin */ 0, header.size,
490  header.first_item, header.num_items,
491  header.typecode_verify));
492 
493  if (header.is_last_block)
494  stream->OnStreamBlock(header.sender_worker, header.seq + 1, Block());
495 
497 }
498 
500  size_t stream_id, size_t to_worker_id) {
501  std::unique_lock<std::mutex> lock(mutex_);
502  return d_->stream_sets_.GetOrDie<CatStreamSet>(stream_id)
503  ->Peer(to_worker_id);
504 }
505 
507  size_t stream_id, size_t to_worker_id) {
508  std::unique_lock<std::mutex> lock(mutex_);
509  return d_->stream_sets_.GetOrDie<MixStreamSet>(stream_id)
510  ->Peer(to_worker_id);
511 }
512 
513 } // namespace data
514 } // namespace thrill
515 
516 /******************************************************************************/
CatStreamPtr GetNewCatStream(size_t local_worker_id, size_t dia_id)
Request next stream.
Block combines a reference to a read-only ByteBlock and book-keeping information. ...
Definition: block.hpp:52
common::JsonLogger & logger()
Get the JsonLogger from the BlockPool.
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:34
static uint_pair max()
return an uint_pair instance containing the largest value possible
Definition: uint_types.hpp:226
virtual size_t num_parallel_async() const
Definition: group.cpp:166
size_t AllocateMixStreamId(size_t local_worker_id)
Allocate the next stream.
uint32_t sender_worker
global worker rank of sender
#define die_unless(X)
Definition: die.hpp:27
std::mutex mutex_
protects critical sections
bool IsAllWorkers() const
Indicates if this message is for all local workers.
size_t hard_ram_limit() noexcept
Hard limit on amount of memory used for ByteBlock.
Definition: block_pool.cpp:886
void IntReleaseCatStream(size_t id, size_t local_worker_id)
release pointer onto a CatStreamData object
#define LOG1
Definition: logger.hpp:28
size_t default_block_size
default size of blocks in File, Channel, BlockQueue, etc.
Definition: byte_block.cpp:25
bool IsEnd() const
Indicates if this is the end-of-line block header.
CatStreamDataPtr GetOrCreateCatStreamData(size_t id, size_t local_worker_id, size_t dia_id)
Get stream with given id, if it does not exist, create it.
MixStreamDataPtr MixLoopback(size_t stream_id, size_t to_worker_id)
static StreamMultiplexerHeader Parse(net::BufferReader &br)
Reads the stream id and the number of elements in this block.
A pinned / pin-counted pointer to a ByteBlock.
Definition: byte_block.hpp:205
STL namespace.
MixStreamDataPtr IntGetOrCreateMixStreamData(size_t id, size_t local_worker_id, size_t dia_id)
uint32_t seq
sequence number in Stream
~Multiplexer()
Closes all client connections.
#define die(msg)
Instead of calling std::terminate(), throw the message via an exception.
Definition: die.hpp:22
virtual Connection & connection(size_t id)=0
Return Connection to client id.
Simple structure that holds all stream instances for the workers on the local host for a given stre...
Definition: multiplexer.hpp:33
#define THRILL_DEFAULT_ALIGN
static constexpr size_t total_size
void OnCatStreamBlock(size_t peer, Connection &s, const StreamMultiplexerHeader &header, const CatStreamDataPtr &stream, PinnedByteBlockPtr &&bytes)
Receives and dispatches a Block to a CatStreamData.
BufferReader represents a BufferRef with an additional cursor with which the memory can be read incre...
static by_string to_string(int val)
convert to string
PinnedByteBlockPtr AllocateByteBlock(size_t size, size_t local_worker_id)
Definition: block_pool.cpp:484
void IntReleaseMixStream(size_t id, size_t local_worker_id)
release pointer onto a MixStream object
uint32_t typecode_verify
typecode self verify
virtual void Close()=0
Close.
size_t workers_per_host_
Number of workers per host.
uint32_t is_last_block
is last block piggybacked indicator
size_t my_host_rank() const
my rank among the hosts.
Definition: multiplexer.hpp:93
void OnMixStreamBlock(size_t peer, Connection &s, const StreamMultiplexerHeader &header, const MixStreamDataPtr &stream, PinnedByteBlockPtr &&bytes)
Receives and dispatches a Block to a MixStream.
size_t num_parallel_async_
number of parallel recv requests
Pool to allocate, keep, swap out/in, and free all ByteBlocks on the host.
Definition: block_pool.hpp:42
Block header is sent before a sequence of blocks; it indicates the number of elements and their bounda...
void AsyncRead(Connection &c, uint32_t seq, size_t size, const AsyncReadCallback &done_cb)
asynchronously read n bytes and deliver them to the callback
CatStreamDataPtr IntGetOrCreateCatStreamData(size_t id, size_t local_worker_id, size_t dia_id)
Multiplexer(mem::Manager &mem_manager, BlockPool &block_pool, net::DispatcherThread &dispatcher, net::Group &group, size_t workers_per_host)
int value
Definition: gen_data.py:41
A Connection represents a link to another peer in a network group.
Definition: connection.hpp:49
unique_ptr< T > make_unique(Manager &manager, Args &&... args)
make_unique with Manager tracking
Definition: allocator.hpp:212
std::atomic< uint32_t > rx_seq_
receive sequence
Definition: connection.hpp:601
net::Group & group_
Holds NetConnections for outgoing Streams.
size_t workers_per_host() const
number of workers per host
size_t send_size_limit_
Calculated send queue size limit for StreamData semaphores.
MixStreamPtr GetNewMixStream(size_t local_worker_id, size_t dia_id)
Request next stream.
common::JsonLogger & logger()
Returns logger_.
Definition: block_pool.hpp:85
CatStreamDataPtr CatLoopback(size_t stream_id, size_t to_worker_id)
High-performance smart pointer used as a wrapping reference counting pointer.
size_t AllocateCatStreamId(size_t local_worker_id)
Allocate the next stream.
static const size_t bytes
number of bytes in uint_pair
Definition: uint_types.hpp:75
std::unique_ptr< Data > d_
pimpl data structure
A network Group is a collection of enumerated communication links, which provides point-to-point comm...
Definition: group.hpp:47
net::DispatcherThread & dispatcher_
Object shared by allocators and other classes to track memory allocations.
Definition: manager.hpp:28
size_t StreamId
Definition: stream_data.hpp:32
Simple buffer of characters without initialization or growing functionality.
Definition: buffer.hpp:40
void OnMultiplexerHeader(size_t peer, uint32_t seq, Connection &s, net::Buffer &&buffer)
static int round_up_to_power_of_two(int i)
does what it says: round up to next power of two
JsonLogger is a receiver of JSON output objects for logging.
Definition: json_logger.hpp:69
void AsyncReadMultiplexerHeader(size_t peer, Connection &s)
BlockPool & block_pool_
reference to host-global BlockPool.
void Close()
Closes all client connections.
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:24
size_t my_host_rank() const
Return our rank among hosts in this group.
Definition: group.hpp:69
virtual size_t num_hosts() const =0
Return number of connections in this group (= number of computing hosts)
DispatcherThread contains a net::Dispatcher object and an associated thread that runs in the dispatch...
MixStreamDataPtr GetOrCreateMixStreamData(size_t id, size_t local_worker_id, size_t dia_id)
Get stream with given id, if it does not exist, create it.