Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
multiplexer.cpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/data/multiplexer.cpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Tobias Sturm <[email protected]>
7  * Copyright (C) 2015 Timo Bingmann <[email protected]>
8  *
9  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10  ******************************************************************************/
11 
13 
17 #include <thrill/data/stream.hpp>
19 
21 
22 #include <algorithm>
23 #include <unordered_map>
24 #include <vector>
25 
26 namespace thrill {
27 namespace data {
28 
29 /******************************************************************************/
30 // Repository
31 
32 /*!
33  * A Repository holds objects that are shared among workers. Each object is
34  * addressed via an Id. Workers can allocate new Ids independently but
35  * deterministically (the repository will issue the same id sequence to all
36  * workers). Objects are created in place via argument forwarding.
37  */
38 template <typename Object>
39 class Repository
40 {
41 public:
    //! Key type used to address objects in the repository.
42  using Id = size_t;
    //! Reference-counted pointer type under which objects are stored.
43  using ObjectPtr = tlx::CountingPtr<Object>;
44 
45  //! Construct with one id counter per local worker, each starting at 0.
46  explicit Repository(size_t num_workers_per_node)
47  : next_id_(num_workers_per_node, 0) { }
48 
49  //! Allocates the next id for the given local worker: returns the current
    //! value of that worker's counter and then increments it.
50  //! Calls to this method alter the internal state -> order of calls is
51  //! important and must be deterministic
52  size_t AllocateId(size_t local_worker_id) {
53  assert(local_worker_id < next_id_.size());
54  return next_id_[local_worker_id]++;
55  }
56 
57  //! Get object with given id, if it does not exist, create it.
    //! An already existing object must be castable to Subclass (checked with
    //! dynamic_cast), otherwise die_unless() fires.
58  //! \param object_id id of the object
59  //! \param construction parameters forwarded to the constructor of Subclass
60  template <typename Subclass = Object, typename... Types>
62  GetOrCreate(Id object_id, Types&& ... construction) {
    // NOTE(review): listing lines 61 and 67 are absent from this extract
    // (presumably the return type and the start of the return statement of
    // the "found" branch) -- confirm against the original file.
63  auto it = map_.find(object_id);
64 
65  if (it != map_.end()) {
    // existing entry: verify its dynamic type before handing it out
66  die_unless(dynamic_cast<Subclass*>(it->second.get()));
68  dynamic_cast<Subclass*>(it->second.get()));
69  }
70 
71  // construct new object
72  tlx::CountingPtr<Subclass> value = tlx::make_counting<Subclass>(
73  std::forward<Types>(construction) ...);
74 
    // store under the base-class pointer type, return the Subclass pointer
75  map_.insert(std::make_pair(object_id, ObjectPtr(value)));
76  return value;
77  }
78 
    //! Get object with given id; die() if the id is not in the repository or
    //! if the stored object is not castable to Subclass.
    //! \param object_id id of the object
79  template <typename Subclass = Object>
80  tlx::CountingPtr<Subclass> GetOrDie(Id object_id) {
    // NOTE(review): listing line 85 is absent from this extract (presumably
    // the start of the return statement) -- confirm against the original.
81  auto it = map_.find(object_id);
82 
83  if (it != map_.end()) {
84  die_unless(dynamic_cast<Subclass*>(it->second.get()));
86  dynamic_cast<Subclass*>(it->second.get()));
87  }
88 
89  die("object " + std::to_string(object_id) + " not in repository");
90  }
91 
92  //! Remove id from map; die() if the id is not in the repository.
93  void EraseOrDie(Id object_id) {
94  auto it = map_.find(object_id);
95  if (it != map_.end()) {
96  map_.erase(it);
97  return;
98  }
99  die("object " + std::to_string(object_id) + " not in repository");
100  }
101 
102  //! return mutable reference to map of objects.
103  std::unordered_map<Id, ObjectPtr>& map() { return map_; }
104 
105 private:
106  //! Next ID to generate, one for each local worker.
107  std::vector<size_t> next_id_;
108 
109  //! map containing value items, keyed by object id
110  std::unordered_map<Id, ObjectPtr> map_;
111 };
112 
113 /******************************************************************************/
114 // Multiplexer
115 
//! Private implementation data of Multiplexer: the repository of stream sets
//! and the per-host counters of open receive requests.
116 struct Multiplexer::Data {
117  //! Streams have an ID in block headers. (worker id, stream id)
118  Repository<StreamSetBase> stream_sets_;
119 
120  //! array of number of open requests, one atomic counter per host
121  std::vector<std::atomic<size_t> > ongoing_requests_;
122 
    //! \param num_hosts number of counters in ongoing_requests_
    //! \param workers_per_host number of id counters in stream_sets_
123  explicit Data(size_t num_hosts, size_t workers_per_host)
124  : stream_sets_(workers_per_host),
125  ongoing_requests_(num_hosts) { }
126 };
127 
129  net::DispatcherThread& dispatcher, net::Group& group,
130  size_t workers_per_host)
131  : mem_manager_(mem_manager),
132  block_pool_(block_pool),
133  dispatcher_(dispatcher),
134  group_(group),
135  workers_per_host_(workers_per_host),
136  d_(std::make_unique<Data>(group_.num_hosts(), workers_per_host)) {
    // Multiplexer constructor: stores the references to memory manager, block
    // pool, dispatcher and network group, and allocates the pimpl Data with
    // one request counter per host and one id counter per local worker.
    // NOTE(review): listing lines 128, 138, 141, 147, 152-153 and 158 are
    // absent from this extract; the statements they held are not visible
    // here -- confirm against the original file.
137 
    // NOTE(review): num_parallel_async_ is presumably initialized on the
    // missing line 138 -- confirm.
139  if (num_parallel_async_ == 0) {
140  // one async at a time (for TCP and mock backends)
142  }
143  else {
144  // k/2 asyncs at a time (for MPI backend)
145  num_parallel_async_ /= 2;
146  // at least one
148  }
149 
150  // calculate send queue size limit for StreamData semaphores:
    // a quarter of the hard RAM limit divided by the number of local workers
151  send_size_limit_ = block_pool.hard_ram_limit() / workers_per_host / 4;
154 
155  // launch initial async reads
156  for (size_t id = 0; id < group_.num_hosts(); id++) {
    // our own host rank is skipped
157  if (id == group_.my_host_rank()) continue;
159  }
160 }
161 
// Body of Multiplexer::Close() (the signature, listing line 162, is absent
// from this extract). Under mutex_: requires that no stream sets remain,
// clears the stream set map and marks the multiplexer as closed.
163  std::unique_lock<std::mutex> lock(mutex_);
164 
    // remaining streams at Close() are treated as fatal: log the count, then
    // fail the die_unless() check
165  if (!d_->stream_sets_.map().empty()) {
166  LOG1 << "Multiplexer::Close()"
167  << " remaining_streams=" << d_->stream_sets_.map().size();
168  die_unless(d_->stream_sets_.map().empty());
169  }
170 
171  // destroy all still open Streams
    // NOTE(review): given the check above, this presumably only ever runs on
    // an already empty map -- confirm whether die_unless() can return.
172  d_->stream_sets_.map().clear();
173 
174  closed_ = true;
175 }
176 
// NOTE(review): the signature (listing line 177) is absent from this extract;
// presumably this is the body of ~Multiplexer() -- confirm.
// Calls Close() unless closed_ is already set, then closes the network group.
178  if (!closed_)
179  Close();
180 
181  group_.Close();
182 }
183 
//! Allocate the next stream id for the given local worker. Takes mutex_ and
//! draws the id from the stream set repository (the same repository that
//! AllocateMixStreamId() uses).
//! \param local_worker_id index of the local worker whose id counter is used
//! \return the allocated id
184 size_t Multiplexer::AllocateCatStreamId(size_t local_worker_id) {
185  std::unique_lock<std::mutex> lock(mutex_);
186  return d_->stream_sets_.AllocateId(local_worker_id);
187 }
188 
190  size_t id, size_t local_worker_id, size_t dia_id) {
    // Locking wrapper (the first signature line, listing line 189, is absent
    // from this extract): takes mutex_ and forwards to
    // IntGetOrCreateCatStreamData(), which does the lookup/creation.
191  std::unique_lock<std::mutex> lock(mutex_);
192  return IntGetOrCreateCatStreamData(id, local_worker_id, dia_id);
193 }
194 
//! Request next stream: under mutex_, allocates a fresh id for the local
//! worker and wraps the resulting object in a new CatStream.
//! \param local_worker_id index of the requesting local worker
//! \param dia_id id passed through to the created stream data
195 CatStreamPtr Multiplexer::GetNewCatStream(size_t local_worker_id, size_t dia_id) {
196  std::unique_lock<std::mutex> lock(mutex_);
    // NOTE(review): listing line 198 is absent from this extract; the two
    // closing parentheses below suggest a nested call opens there (presumably
    // IntGetOrCreateCatStreamData, since mutex_ is already held) -- confirm.
197  return tlx::make_counting<CatStream>(
199  d_->stream_sets_.AllocateId(local_worker_id),
200  local_worker_id, dia_id));
201 }
202 
204  size_t id, size_t local_worker_id, size_t dia_id) {
    // Internal lookup/creation (the first signature line, listing line 203,
    // is absent from this extract). Does not take mutex_ itself;
    // GetOrCreateCatStreamData() calls it with the lock held.
    // The first `id` is the repository key; the remaining arguments are
    // forwarded to the CatStreamSet constructor if the set does not exist
    // yet. Peer() then selects the entry for the given local worker.
205  CatStreamDataPtr ptr =
206  d_->stream_sets_.GetOrCreate<CatStreamSet>(
207  id, *this, send_size_limit_, id,
208  workers_per_host_, dia_id)->Peer(local_worker_id);
209  // update dia_id: the stream may have been created before the DIANode
210  // associated with it.
    // (a dia_id_ of 0 is what OnMultiplexerHeader() passes when the id is
    // unknown)
211  if (ptr->dia_id_ == 0)
212  ptr->set_dia_id(dia_id);
213  return ptr;
214 }
215 
//! Allocate the next stream id for the given local worker. Takes mutex_ and
//! draws the id from the stream set repository (the same repository that
//! AllocateCatStreamId() uses).
//! \param local_worker_id index of the local worker whose id counter is used
//! \return the allocated id
216 size_t Multiplexer::AllocateMixStreamId(size_t local_worker_id) {
217  std::unique_lock<std::mutex> lock(mutex_);
218  return d_->stream_sets_.AllocateId(local_worker_id);
219 }
220 
222  size_t id, size_t local_worker_id, size_t dia_id) {
    // Locking wrapper (the first signature line, listing line 221, is absent
    // from this extract): takes mutex_ and forwards to
    // IntGetOrCreateMixStreamData(), which does the lookup/creation.
223  std::unique_lock<std::mutex> lock(mutex_);
224  return IntGetOrCreateMixStreamData(id, local_worker_id, dia_id);
225 }
226 
//! Request next stream: under mutex_, allocates a fresh id for the local
//! worker and wraps the resulting object in a new MixStream.
//! \param local_worker_id index of the requesting local worker
//! \param dia_id id passed through to the created stream data
227 MixStreamPtr Multiplexer::GetNewMixStream(size_t local_worker_id, size_t dia_id) {
228  std::unique_lock<std::mutex> lock(mutex_);
    // NOTE(review): listing line 230 is absent from this extract; the two
    // closing parentheses below suggest a nested call opens there (presumably
    // IntGetOrCreateMixStreamData, since mutex_ is already held) -- confirm.
229  return tlx::make_counting<MixStream>(
231  d_->stream_sets_.AllocateId(local_worker_id),
232  local_worker_id, dia_id));
233 }
234 
236  size_t id, size_t local_worker_id, size_t dia_id) {
    // Internal lookup/creation (the first signature line, listing line 235,
    // is absent from this extract). Does not take mutex_ itself;
    // GetOrCreateMixStreamData() calls it with the lock held.
    // The first `id` is the repository key; the remaining arguments are
    // forwarded to the MixStreamSet constructor if the set does not exist
    // yet. Peer() then selects the entry for the given local worker.
237  MixStreamDataPtr ptr =
238  d_->stream_sets_.GetOrCreate<MixStreamSet>(
239  id, *this, send_size_limit_, id,
240  workers_per_host_, dia_id)->Peer(local_worker_id);
241  // update dia_id: the stream may have been created before the DIANode
242  // associated with it.
    // (a dia_id_ of 0 is what OnMultiplexerHeader() passes when the id is
    // unknown)
243  if (ptr->dia_id_ == 0)
244  ptr->set_dia_id(dia_id);
245  return ptr;
246 }
247 
//! Release one local worker's hold on the CatStreamSet with the given id;
//! when Release() reports true, the set is erased from the repository.
//! Dies if the id is not in the repository or is not a CatStreamSet.
//! Does not take mutex_ itself.
//! \param id stream id
//! \param local_worker_id local worker releasing the stream
248 void Multiplexer::IntReleaseCatStream(size_t id, size_t local_worker_id) {
249 
    // NOTE(review): listing line 250 is absent from this extract; presumably
    // it declares `set`, initialized by the call below -- confirm.
251  d_->stream_sets_.GetOrDie<CatStreamSet>(id);
252 
253  sLOG << "Multiplexer::IntReleaseCatStream() release"
254  << "stream" << id << "local_worker_id" << local_worker_id;
255 
256  if (set->Release(local_worker_id)) {
257  LOG << "Multiplexer::IntReleaseCatStream() destroy stream " << id;
258  d_->stream_sets_.EraseOrDie(id);
259  }
260 }
261 
//! Release one local worker's hold on the MixStreamSet with the given id;
//! when Release() reports true, the set is erased from the repository.
//! Dies if the id is not in the repository or is not a MixStreamSet.
//! Does not take mutex_ itself.
//! \param id stream id
//! \param local_worker_id local worker releasing the stream
262 void Multiplexer::IntReleaseMixStream(size_t id, size_t local_worker_id) {
263 
    // NOTE(review): listing line 264 is absent from this extract; presumably
    // it declares `set`, initialized by the call below -- confirm.
265  d_->stream_sets_.GetOrDie<MixStreamSet>(id);
266 
267  sLOG << "Multiplexer::IntReleaseMixStream() release"
268  << "stream" << id << "local_worker_id" << local_worker_id;
269 
270  if (set->Release(local_worker_id)) {
271  LOG << "Multiplexer::IntReleaseMixStream() destroy stream " << id;
272  d_->stream_sets_.EraseOrDie(id);
273  }
274 }
275 
// NOTE(review): the signature (listing line 276) is absent from this extract;
// presumably this is Multiplexer::logger() -- confirm.
// Returns the logger of the BlockPool.
277  return block_pool_.logger();
278 }
279 
280 /******************************************************************************/
281 
283 
// NOTE(review): the signature (listing line 282) and listing lines 286-287
// are absent from this extract; presumably this is the body of
// AsyncReadMultiplexerHeader(peer, s) and the missing lines start the async
// read that takes the lambda below as callback -- confirm.
// Issues header reads for `peer` until num_parallel_async_ requests are open.
284  while (d_->ongoing_requests_[peer] < num_parallel_async_) {
    // rx_seq_ advances by two per header: OnMultiplexerHeader() uses seq + 1
    // for the read of the block that follows the header
285  uint32_t seq = 42 + (s.rx_seq_.fetch_add(2) & 0xFFFF);
288  [this, peer, seq](Connection& s, net::Buffer&& buffer) {
289  return OnMultiplexerHeader(peer, seq, s, std::move(buffer));
290  });
291 
    // one more open request for this peer; OnMultiplexerHeader() decrements
292  d_->ongoing_requests_[peer]++;
293  }
294 }
295 
297  size_t peer, uint32_t seq, Connection& s, net::Buffer&& buffer) {
    // Callback for a received multiplexer header (the first signature line,
    // listing line 296, is absent from this extract). Accounts for the
    // finished request, dispatches on the header's magic byte to the Cat- or
    // MixStream of the addressed local worker, and either delivers an
    // end-of-stream marker or starts the async read of the block payload.
    // NOTE(review): listing lines 306, 327, 345 and 351 are also absent;
    // presumably they parse `header` from `br`, declare the CatStream
    // `stream`, allocate `bytes`, and open the dispatcher_.AsyncRead( call
    // (compare the MixStream branch below) -- confirm.
298 
    // the header read that invoked this callback is no longer open
299  die_unless(d_->ongoing_requests_[peer] > 0);
300  d_->ongoing_requests_[peer]--;
301 
302  // received invalid Buffer: the connection has closed?
    // (returns without issuing further reads for this peer)
303  if (!buffer.IsValid()) return;
304 
305  net::BufferReader br(buffer);
307 
308  LOG << "OnMultiplexerHeader() header"
309  << " magic=" << unsigned(header.magic)
310  << " size=" << header.size
311  << " num_items=" << header.num_items
312  << " first_item=" << header.first_item
313  << " typecode_verify=" << header.typecode_verify
314  << " stream_id=" << header.stream_id;
315 
316  // received stream id
317  StreamId id = header.stream_id;
318  size_t local_worker = header.receiver_local_worker;
319 
320  // round up allocation size to next power of two, but allocate at least
    // THRILL_DEFAULT_ALIGN bytes; the read below still requests header.size
321  size_t alloc_size = header.size;
322  if (alloc_size < THRILL_DEFAULT_ALIGN) alloc_size = THRILL_DEFAULT_ALIGN;
323  alloc_size = tlx::round_up_to_power_of_two(alloc_size);
324 
325  if (header.magic == MagicByte::CatStreamBlock)
326  {
328  id, local_worker, /* dia_id (unknown at this time) */ 0);
    // count the header bytes as received network traffic of the stream
329  stream->rx_net_bytes_ += buffer.size();
330 
331  if (header.IsEnd()) {
332  sLOG << "end of stream on" << s << "in CatStream" << id
333  << "from worker" << header.sender_worker;
334 
    // end of stream is delivered as an empty PinnedBlock
335  stream->OnStreamBlock(
336  header.sender_worker, header.seq, PinnedBlock());
337  }
338  else {
339  sLOG << "stream header from" << s << "on CatStream" << id
340  << "from worker" << header.sender_worker
341  << "for local_worker" << local_worker
342  << "seq" << header.seq
343  << "size" << header.size;
344 
346  alloc_size, local_worker);
347  sLOG << "new PinnedByteBlockPtr bytes=" << *bytes;
348 
    // the payload read counts as an open request; OnCatStreamBlock()
    // decrements the counter again
349  d_->ongoing_requests_[peer]++;
350 
    // payload uses the second sequence number reserved for this header
352  s, seq + 1, header.size, std::move(bytes),
353  [this, peer, header, stream]
354  (Connection& s, PinnedByteBlockPtr&& bytes) {
355  OnCatStreamBlock(peer, s, header, stream, std::move(bytes));
356  });
357  }
358  }
359  else if (header.magic == MagicByte::MixStreamBlock)
360  {
361  MixStreamDataPtr stream = GetOrCreateMixStreamData(
362  id, local_worker, /* dia_id (unknown at this time) */ 0);
    // count the header bytes as received network traffic of the stream
363  stream->rx_net_bytes_ += buffer.size();
364 
365  if (header.IsEnd()) {
366  sLOG << "end of stream on" << s << "in MixStream" << id
367  << "from worker" << header.sender_worker;
368 
    // end of stream is delivered as an empty PinnedBlock
369  stream->OnStreamBlock(header.sender_worker, header.seq,
370  PinnedBlock());
371  }
372  else {
373  sLOG << "stream header from" << s << "on MixStream" << id
374  << "from worker" << header.sender_worker
375  << "for local_worker" << local_worker
376  << "seq" << header.seq
377  << "size" << header.size;
378 
379  PinnedByteBlockPtr bytes = block_pool_.AllocateByteBlock(
380  alloc_size, local_worker);
381 
    // the payload read counts as an open request; OnMixStreamBlock()
    // decrements the counter again
382  d_->ongoing_requests_[peer]++;
383 
    // payload uses the second sequence number reserved for this header
384  dispatcher_.AsyncRead(
385  s, seq + 1, header.size, std::move(bytes),
386  [this, peer, header, stream]
387  (Connection& s, PinnedByteBlockPtr&& bytes) mutable {
388  OnMixStreamBlock(peer, s, header, stream, std::move(bytes));
389  });
390  }
391  }
392  else {
393  die("Invalid magic byte in MultiplexerHeader");
394  }
395 
    // refill the header reads for this peer up to num_parallel_async_
396  AsyncReadMultiplexerHeader(peer, s);
397 }
398 
//! Receives and dispatches a Block to a CatStreamData: accounts for the
//! finished payload read, hands the received bytes to the stream as a
//! PinnedBlock described by the header, and issues further header reads.
//! \param peer index of the sending host in ongoing_requests_
//! \param s connection the block was read from
//! \param header header that announced this block
//! \param stream target stream data
//! \param bytes received payload
399 void Multiplexer::OnCatStreamBlock(
400  size_t peer, Connection& s, const StreamMultiplexerHeader& header,
401  const CatStreamDataPtr& stream, PinnedByteBlockPtr&& bytes) {
402 
    // the payload read counted as an open request in OnMultiplexerHeader()
403  die_unless(d_->ongoing_requests_[peer] > 0);
404  d_->ongoing_requests_[peer]--;
405 
406  sLOG << "Multiplexer::OnCatStreamBlock()"
407  << "got block" << *bytes << "seq" << header.seq << "on" << s
408  << "in CatStream" << header.stream_id
409  << "from worker" << header.sender_worker;
410 
411  stream->OnStreamBlock(
412  header.sender_worker, header.seq,
413  PinnedBlock(std::move(bytes), /* begin */ 0, header.size,
414  header.first_item, header.num_items,
415  header.typecode_verify));
416 
    // piggybacked end of stream: deliver an empty PinnedBlock with the next
    // sequence number
417  if (header.is_last_block)
418  stream->OnStreamBlock(header.sender_worker, header.seq + 1,
419  PinnedBlock());
420 
421  AsyncReadMultiplexerHeader(peer, s);
422 }
423 
//! Receives and dispatches a Block to a MixStreamData: accounts for the
//! finished payload read, hands the received bytes to the stream as a
//! PinnedBlock described by the header, and issues further header reads.
//! \param peer index of the sending host in ongoing_requests_
//! \param s connection the block was read from
//! \param header header that announced this block
//! \param stream target stream data
//! \param bytes received payload
424 void Multiplexer::OnMixStreamBlock(
425  size_t peer, Connection& s, const StreamMultiplexerHeader& header,
426  const MixStreamDataPtr& stream, PinnedByteBlockPtr&& bytes) {
427 
    // the payload read counted as an open request in OnMultiplexerHeader()
428  die_unless(d_->ongoing_requests_[peer] > 0);
429  d_->ongoing_requests_[peer]--;
430 
431  sLOG << "Multiplexer::OnMixStreamBlock()"
432  << "got block" << *bytes << "seq" << header.seq << "on" << s
433  << "in MixStream" << header.stream_id
434  << "from worker" << header.sender_worker;
435 
436  stream->OnStreamBlock(
437  header.sender_worker, header.seq,
438  PinnedBlock(std::move(bytes), /* begin */ 0, header.size,
439  header.first_item, header.num_items,
440  header.typecode_verify));
441 
    // piggybacked end of stream: deliver an empty PinnedBlock with the next
    // sequence number
442  if (header.is_last_block)
443  stream->OnStreamBlock(header.sender_worker, header.seq + 1,
444  PinnedBlock());
445 
446  AsyncReadMultiplexerHeader(peer, s);
447 }
448 
//! Under mutex_, look up the existing CatStreamSet with the given id and
//! return the stream data of the given local worker. Dies if the id is not
//! in the repository or is not a CatStreamSet.
//! \param stream_id id of the stream set
//! \param to_worker_id local worker whose stream data is returned
449 CatStreamDataPtr Multiplexer::CatLoopback(
450  size_t stream_id, size_t to_worker_id) {
451  std::unique_lock<std::mutex> lock(mutex_);
452  return d_->stream_sets_.GetOrDie<CatStreamSet>(stream_id)
453  ->Peer(to_worker_id);
454 }
455 
//! Under mutex_, look up the existing MixStreamSet with the given id and
//! return the stream data of the given local worker. Dies if the id is not
//! in the repository or is not a MixStreamSet.
//! \param stream_id id of the stream set
//! \param to_worker_id local worker whose stream data is returned
456 MixStreamDataPtr Multiplexer::MixLoopback(
457  size_t stream_id, size_t to_worker_id) {
458  std::unique_lock<std::mutex> lock(mutex_);
459  return d_->stream_sets_.GetOrDie<MixStreamSet>(stream_id)
460  ->Peer(to_worker_id);
461 }
462 
463 } // namespace data
464 } // namespace thrill
465 
466 /******************************************************************************/
CatStreamPtr GetNewCatStream(size_t local_worker_id, size_t dia_id)
Request next stream.
common::JsonLogger & logger()
Get the JsonLogger from the BlockPool.
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:34
static uint_pair max()
return an uint_pair instance containing the largest value possible
Definition: uint_types.hpp:226
size_t AllocateMixStreamId(size_t local_worker_id)
Allocate the next stream.
uint32_t sender_worker
global worker rank of sender
#define die_unless(X)
Definition: die.hpp:27
std::mutex mutex_
protects critical sections
size_t hard_ram_limit() noexcept
Hard limit on amount of memory used for ByteBlock.
Definition: block_pool.cpp:886
void IntReleaseCatStream(size_t id, size_t local_worker_id)
release pointer onto a CatStreamData object
#define LOG1
Definition: logger.hpp:28
size_t default_block_size
default size of blocks in File, Channel, BlockQueue, etc.
Definition: byte_block.cpp:25
CatStreamDataPtr GetOrCreateCatStreamData(size_t id, size_t local_worker_id, size_t dia_id)
Get stream with given id, if it does not exist, create it.
size_t num_hosts() const
total number of hosts.
Definition: multiplexer.hpp:88
static StreamMultiplexerHeader Parse(net::BufferReader &br)
Reads the stream id and the number of elements in this block.
A pinned / pin-counted pointer to a ByteBlock.
Definition: byte_block.hpp:182
MixStreamDataPtr IntGetOrCreateMixStreamData(size_t id, size_t local_worker_id, size_t dia_id)
int round_up_to_power_of_two(int i)
does what it says: round up to next power of two
uint32_t seq
sequence number in Stream
~Multiplexer()
Closes all client connections.
std::atomic< size_t > rx_net_bytes_
#define die(msg)
Instead of calling std::terminate(), throw the output message as an exception.
Definition: die.hpp:22
virtual Connection & connection(size_t id)=0
Return Connection to client id.
Simple structure that holds all stream instances for the workers on the local host for a given stre...
Definition: multiplexer.hpp:33
#define THRILL_DEFAULT_ALIGN
bool IsEnd() const
Indicates if this is the end-of-line block header.
static constexpr size_t total_size
void OnCatStreamBlock(size_t peer, Connection &s, const StreamMultiplexerHeader &header, const CatStreamDataPtr &stream, PinnedByteBlockPtr &&bytes)
Receives and dispatches a Block to a CatStreamData.
BufferReader represents a BufferRef with an additional cursor with which the memory can be read incre...
static by_string to_string(int val)
convert to string
PinnedByteBlockPtr AllocateByteBlock(size_t size, size_t local_worker_id)
Definition: block_pool.cpp:484
void IntReleaseMixStream(size_t id, size_t local_worker_id)
release pointer onto a MixStream object
uint32_t typecode_verify
typecode self verify
virtual void Close()=0
Close.
size_t workers_per_host_
Number of workers per host.
size_t my_host_rank() const
Return our rank among hosts in this group.
Definition: group.hpp:69
uint32_t is_last_block
is last block piggybacked indicator
size_t num_parallel_async_
number of parallel recv requests
Pool to allocate, keep, swap out/in, and free all ByteBlocks on the host.
Definition: block_pool.hpp:42
Block header is sent before a sequence of blocks; it indicates the number of elements and their bounda...
unique_ptr< T > make_unique(Manager &manager, Args &&...args)
make_unique with Manager tracking
Definition: allocator.hpp:212
void AsyncRead(Connection &c, uint32_t seq, size_t size, const AsyncReadCallback &done_cb)
asynchronously read n bytes and deliver them to the callback
CatStreamDataPtr IntGetOrCreateCatStreamData(size_t id, size_t local_worker_id, size_t dia_id)
Multiplexer(mem::Manager &mem_manager, BlockPool &block_pool, net::DispatcherThread &dispatcher, net::Group &group, size_t workers_per_host)
int value
Definition: gen_data.py:41
A Connection represents a link to another peer in a network group.
Definition: connection.hpp:49
virtual size_t num_parallel_async() const
Definition: group.cpp:158
std::atomic< uint32_t > rx_seq_
receive sequence
Definition: connection.hpp:601
size_t send_size_limit_
Calculated send queue size limit for StreamData semaphores.
MixStreamPtr GetNewMixStream(size_t local_worker_id, size_t dia_id)
Request next stream.
common::JsonLogger & logger()
Returns logger_.
Definition: block_pool.hpp:85
High-performance smart pointer used as a wrapping reference counting pointer.
size_t AllocateCatStreamId(size_t local_worker_id)
Allocate the next stream.
static const size_t bytes
number of bytes in uint_pair
Definition: uint_types.hpp:75
std::unique_ptr< Data > d_
pimpl data structure
A network Group is a collection of enumerated communication links, which provides point-to-point comm...
Definition: group.hpp:47
size_t workers_per_host() const
number of workers per host
net::DispatcherThread & dispatcher_
Object shared by allocators and other classes to track memory allocations.
Definition: manager.hpp:28
A pinned / pin-counted derivative of a Block.
Definition: block.hpp:157
size_t StreamId
Definition: stream_data.hpp:32
Simple buffer of characters without initialization or growing functionality.
Definition: buffer.hpp:40
void OnStreamBlock(size_t from, uint32_t seq, PinnedBlock &&b)
called from Multiplexer when there is a new Block for this Stream.
Definition: mix_stream.cpp:160
void OnMultiplexerHeader(size_t peer, uint32_t seq, Connection &s, net::Buffer &&buffer)
JsonLogger is a receiver of JSON output objects for logging.
Definition: json_logger.hpp:69
void AsyncReadMultiplexerHeader(size_t peer, Connection &s)
BlockPool & block_pool_
reference to host-global BlockPool.
void Close()
Closes all client connections.
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:24
virtual size_t num_hosts() const =0
Return number of connections in this group (= number computing hosts)
DispatcherThread contains a net::Dispatcher object and an associated thread that runs in the dispatch...
MixStreamDataPtr GetOrCreateMixStreamData(size_t id, size_t local_worker_id, size_t dia_id)
Get stream with given id, if it does not exist, create it.