Thrill  0.1
dispatcher.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/net/mpi/dispatcher.hpp
3  *
4  * A Thrill network layer Implementation which uses MPI to transmit messages to
5  * peers. See group.hpp for more.
6  *
7  * Part of Project Thrill - http://project-thrill.org
8  *
9  * Copyright (C) 2015 Timo Bingmann <[email protected]>
10  *
11  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
12  ******************************************************************************/
13 
14 #pragma once
15 #ifndef THRILL_NET_MPI_DISPATCHER_HEADER
16 #define THRILL_NET_MPI_DISPATCHER_HEADER
17 
19 #include <thrill/net/group.hpp>
20 #include <thrill/net/mpi/group.hpp>
21 
22 #include <mpi.h>
23 
24 #include <algorithm>
25 #include <deque>
26 #include <string>
27 #include <vector>
28 
29 namespace thrill {
30 namespace net {
31 namespace mpi {
32 
33 //! \addtogroup net_mpi MPI Network API
34 //! \ingroup net
35 //! \{
36 
37 #define THRILL_NET_MPI_QUEUES 1
38 
39 //! Signature of async MPI request callbacks.
41  void (MPI_Status&), mem::GPoolAllocator<char> >;
42 
44 {
45 public:
46  //! Construct buffered reader with callback
48  : callback_(callback) {
50  << "AsyncRequest()";
51  }
52 
53  //! non-copyable: delete copy-constructor
54  AsyncRequest(const AsyncRequest&) = delete;
55  //! non-copyable: delete assignment operator
56  AsyncRequest& operator = (const AsyncRequest&) = delete;
57  //! move-constructor: default
58  AsyncRequest(AsyncRequest&&) = default;
59  //! move-assignment operator: default
61 
64  << "~AsyncRequest()";
65  }
66 
67  void DoCallback(MPI_Status& s) {
68  if (callback_) {
69  callback_(s);
71  }
72  }
73 
74 private:
75  //! functional object to call once data is complete
77 };
78 
79 class Dispatcher final : public net::Dispatcher
80 {
81  static constexpr bool debug = false;
82 
83  class MpiAsync;
84 
85 public:
86  //! type for file descriptor readiness callbacks
88 
89  //! constructor
90  Dispatcher(size_t group_size);
91 
92  //! destructor
93  ~Dispatcher();
94 
95  //! Register a buffered read callback and a default exception callback.
96  void AddRead(net::Connection& c, const Callback& read_cb) final {
97  assert(dynamic_cast<Connection*>(&c));
98  Connection& mc = static_cast<Connection&>(c);
99  size_t p = mc.peer();
100  assert(p < watch_.size());
101  watch_[p].active = true;
102  watch_[p].read_cb.emplace_back(read_cb);
103  watch_active_++;
104  }
105 
106  //! Register a buffered write callback and a default exception callback.
107  void AddWrite(net::Connection& /* c */, const Callback& /* write_cb */) final {
108  // abort: this is not implemented. use AsyncWrites.
109  abort();
110  }
111 
112  //! Register a buffered write callback and a default exception callback.
113  void SetExcept(net::Connection& c, const Callback& except_cb) {
114  assert(dynamic_cast<Connection*>(&c));
115  Connection& mc = static_cast<Connection&>(c);
116  size_t p = mc.peer();
117  assert(p < watch_.size());
118  watch_[p].active = true;
119  watch_[p].except_cb = except_cb;
120  watch_active_++;
121  }
122 
123  //! Cancel all callbacks on a given peer.
124  void Cancel(net::Connection& c) final {
125  assert(dynamic_cast<Connection*>(&c));
126  Connection& mc = static_cast<Connection&>(c);
127  size_t p = mc.peer();
128  assert(p < watch_.size());
129 
130  if (watch_[p].read_cb.size() == 0)
131  LOG << "SelectDispatcher::Cancel() peer=" << p
132  << " called with no callbacks registered.";
133 
134  Watch& w = watch_[p];
135  w.read_cb.clear();
136  w.except_cb = Callback();
137  w.active = false;
138  watch_active_--;
139  }
140 
141  MPI_Request ISend(
142  Connection& c, uint32_t seq, const void* data, size_t size);
143  MPI_Request IRecv(
144  Connection& c, uint32_t seq, void* data, size_t size);
145 
146  void AddAsyncRequest(
147  const MPI_Request& req, const AsyncRequestCallback& callback);
148 
150  net::Connection& c, uint32_t seq, Buffer&& buffer,
151  const AsyncWriteCallback& done_cb = AsyncWriteCallback()) final {
152  assert(c.IsValid());
153 
154  if (buffer.size() == 0) {
155  if (done_cb) done_cb(c);
156  return;
157  }
158 
159  QueueAsyncSend(c, MpiAsync(c, seq, std::move(buffer), done_cb));
160  }
161 
163  net::Connection& c, uint32_t seq, data::PinnedBlock&& block,
164  const AsyncWriteCallback& done_cb = AsyncWriteCallback()) final {
165  assert(c.IsValid());
166 
167  if (block.size() == 0) {
168  if (done_cb) done_cb(c);
169  return;
170  }
171 
172  QueueAsyncSend(c, MpiAsync(c, seq, std::move(block), done_cb));
173  }
174 
175  void AsyncRead(net::Connection& c, uint32_t seq, size_t size,
176  const AsyncReadBufferCallback& done_cb
177  = AsyncReadBufferCallback()) final {
178  assert(c.IsValid());
179 
180  if (size == 0) {
181  if (done_cb) done_cb(c, Buffer());
182  return;
183  }
184 
185  QueueAsyncRecv(c, MpiAsync(c, seq, size, done_cb));
186  }
187 
188  void AsyncRead(net::Connection& c, uint32_t seq, size_t size,
189  data::PinnedByteBlockPtr&& block,
190  const AsyncReadByteBlockCallback& done_cb) final {
191  assert(c.IsValid());
192  assert(block.valid());
193 
194  if (block->size() == 0) {
195  if (done_cb) done_cb(c, std::move(block));
196  return;
197  }
198 
199  QueueAsyncRecv(c, MpiAsync(c, seq, size, std::move(block), done_cb));
200  }
201 
202  //! Enqueue and run the encapsulated result
203  void QueueAsyncSend(net::Connection& c, MpiAsync&& a);
204 
205  //! Enqueue and run the encapsulated result
206  void QueueAsyncRecv(net::Connection& c, MpiAsync&& a);
207 
208  //! Issue the encapsulated request to the MPI layer
209  void PerformAsync(MpiAsync&& a);
210 
211  //! Check send queue and perform waiting requests
212  void PumpSendQueue(int peer);
213 
214  //! Check recv queue and perform waiting requests
215  void PumpRecvQueue(int peer);
216 
217  //! Run one iteration of dispatching using MPI_Iprobe().
218  void DispatchOne(const std::chrono::milliseconds& timeout) final;
219 
220  //! Interrupt does nothing.
221  void Interrupt() final { }
222 
223 private:
224  //! callback vectors per peer
225  struct Watch {
226  //! boolean check whether any callbacks are registered
227  bool active = false;
228  //! queue of callbacks for peer.
229  std::deque<Callback, mem::GPoolAllocator<Callback> >
231  //! only one exception callback for the peer.
233  };
234 
235  //! callback watch vector
236  std::vector<Watch> watch_;
237 
238  //! counter of active watches
239  size_t watch_active_ { 0 };
240 
241  //! Default exception handler
242  static bool DefaultExceptionCallback() {
243  throw Exception("SelectDispatcher() exception on socket!", errno);
244  }
245 
246  /**************************************************************************/
247 
248  /*!
249  * This is the big answer to what happens when an MPI async request is
250  * signaled as complete: it unifies all possible async requests, including
251  * the reference counts they hold on the appropriate buffers, and dispatches
252  * the correct callbacks when done.
253  */
254  class MpiAsync
255  {
256  public:
257  enum Type {
260  WRITE_BUFFER, READ_BUFFER,
261  WRITE_BLOCK, READ_BYTE_BLOCK
262  };
263 
264  //! default constructor for resize
265  MpiAsync() : type_(NONE), seq_(0) { }
266 
267  //! construct generic MPI async request
269  : type_(REQUEST), seq_(0),
270  arequest_(callback) { }
271 
272  //! Construct AsyncWrite with Buffer
273  MpiAsync(net::Connection& conn, uint32_t seq,
274  Buffer&& buffer,
275  const AsyncWriteCallback& callback)
276  : type_(WRITE_BUFFER), seq_(seq),
277  write_buffer_(conn, std::move(buffer), callback) { }
278 
279  //! Construct AsyncRead with Buffer
280  MpiAsync(net::Connection& conn, uint32_t seq,
281  size_t buffer_size, const AsyncReadBufferCallback& callback)
282  : type_(READ_BUFFER), seq_(seq),
283  read_buffer_(conn, buffer_size, callback) { }
284 
285  //! Construct AsyncWrite with Block
286  MpiAsync(net::Connection& conn, uint32_t seq,
287  data::PinnedBlock&& block,
288  const AsyncWriteCallback& callback)
289  : type_(WRITE_BLOCK), seq_(seq),
290  write_block_(conn, std::move(block), callback) { }
291 
292  //! Construct AsyncRead with ByteBuffer
293  MpiAsync(net::Connection& conn, uint32_t seq,
294  size_t size,
295  data::PinnedByteBlockPtr&& block,
296  const AsyncReadByteBlockCallback& callback)
297  : type_(READ_BYTE_BLOCK), seq_(seq),
298  read_byte_block_(conn, size, std::move(block), callback) { }
299 
300  //! copy-constructor: default (work as long as union members are default
301  //! copyable)
302  MpiAsync(const MpiAsync& ma) = default;
303 
304  //! move-constructor: move item
306  : type_(ma.type_) {
307  Acquire(std::move(ma));
308  ma.type_ = NONE;
309  }
310 
311  //! move-assignment
312  MpiAsync& operator = (MpiAsync&& ma) noexcept {
313  if (this == &ma) return *this;
314 
315  // destroy self (yes, the destructor is just a function)
316  this->~MpiAsync();
317  // move item.
318  type_ = ma.type_;
319  Acquire(std::move(ma));
320  // release other
321  ma.type_ = NONE;
322 
323  return *this;
324  }
325 
327  // call the active content's destructor
328  if (type_ == REQUEST)
329  arequest_.~AsyncRequest();
330  else if (type_ == WRITE_BUFFER)
331  write_buffer_.~AsyncWriteBuffer();
332  else if (type_ == READ_BUFFER)
333  read_buffer_.~AsyncReadBuffer();
334  else if (type_ == WRITE_BLOCK)
335  write_block_.~AsyncWriteBlock();
336  else if (type_ == READ_BYTE_BLOCK)
337  read_byte_block_.~AsyncReadByteBlock();
338  }
339 
340  //! Dispatch done message to correct callback.
341  void DoCallback(MPI_Status& s) {
342  if (type_ == REQUEST)
343  arequest_.DoCallback(s);
344  else if (type_ == WRITE_BUFFER)
345  write_buffer_.DoCallback();
346  else if (type_ == READ_BUFFER) {
347  int size;
348  MPI_Get_count(&s, MPI_BYTE, &size);
349  read_buffer_.DoCallback(size);
350  }
351  else if (type_ == WRITE_BLOCK)
352  write_block_.DoCallback();
353  else if (type_ == READ_BYTE_BLOCK) {
354  int size;
355  MPI_Get_count(&s, MPI_BYTE, &size);
356  read_byte_block_.DoCallback(size);
357  }
358  }
359 
360  //! Return mpi Connection pointer
362  if (type_ == REQUEST)
363  return nullptr;
364  else if (type_ == WRITE_BUFFER)
365  return static_cast<Connection*>(write_buffer_.connection());
366  else if (type_ == READ_BUFFER)
367  return static_cast<Connection*>(read_buffer_.connection());
368  else if (type_ == WRITE_BLOCK)
369  return static_cast<Connection*>(write_block_.connection());
370  else if (type_ == READ_BYTE_BLOCK)
371  return static_cast<Connection*>(read_byte_block_.connection());
372  die("Unknown Buffer type");
373  }
374 
375  private:
376  //! type of this async
378 
379  //! sequence id
380  uint32_t seq_;
381 
382  //! the big unification of async receivers. these also hold reference
383  //! counts on the Buffer or Block objects.
384  union {
390  };
391 
392  //! assign myself the other object's content
393  void Acquire(MpiAsync&& ma) noexcept {
394  assert(type_ == ma.type_);
395  seq_ = ma.seq_;
396  // yes, this placement movement into the correct union component.
397  if (type_ == REQUEST)
398  new (&arequest_)AsyncRequest(std::move(ma.arequest_));
399  else if (type_ == WRITE_BUFFER)
400  new (&write_buffer_)AsyncWriteBuffer(std::move(ma.write_buffer_));
401  else if (type_ == READ_BUFFER)
402  new (&read_buffer_)AsyncReadBuffer(std::move(ma.read_buffer_));
403  else if (type_ == WRITE_BLOCK)
404  new (&write_block_)AsyncWriteBlock(std::move(ma.write_block_));
405  else if (type_ == READ_BYTE_BLOCK)
406  new (&read_byte_block_)AsyncReadByteBlock(std::move(ma.read_byte_block_));
407  }
408 
409  //! for direct access to union
410  friend class Dispatcher;
411  };
412 
413  //! array of asynchronous writers and readers (these have to align with
414  //! mpi_async_requests_)
415  std::vector<MpiAsync> mpi_async_;
416 
417  //! array of current async MPI_Request for MPI_Testsome().
418  std::vector<MPI_Request> mpi_async_requests_;
419 
420  //! array of output integer of finished requests for MPI_Testsome().
421  std::vector<int> mpi_async_out_;
422 
423  //! array of output MPI_Status of finished requests for MPI_Testsome().
424  std::vector<MPI_Status> mpi_status_out_;
425 
426 #if THRILL_NET_MPI_QUEUES
427  //! queue of delayed requests for each peer
428  std::deque<std::deque<MpiAsync> > send_queue_;
429 
430  //! queue of delayed requests for each peer
431  std::deque<std::deque<MpiAsync> > recv_queue_;
432 
433  //! number of active requests
434  std::vector<size_t> send_active_;
435 
436  //! number of active requests
437  std::vector<size_t> recv_active_;
438 #endif
439 };
440 
441 //! \}
442 
443 } // namespace mpi
444 } // namespace net
445 } // namespace thrill
446 
447 #endif // !THRILL_NET_MPI_DISPATCHER_HEADER
448 
449 /******************************************************************************/
void DoCallback(MPI_Status &s)
Dispatch done message to correct callback.
Definition: dispatcher.hpp:341
bool active
boolean check whether any callbacks are registered
Definition: dispatcher.hpp:227
Connection * connection()
Return mpi Connection pointer.
Definition: dispatcher.hpp:361
void AddRead(net::Connection &c, const Callback &read_cb) final
Register a buffered read callback and a default exception callback.
Definition: dispatcher.hpp:96
virtual bool IsValid() const =0
check whether the connection is (still) valid.
std::vector< size_t > recv_active_
number of active requests
Definition: dispatcher.hpp:437
tlx::delegate< void(Connection &), mem::GPoolAllocator< char > > AsyncWriteCallback
Signature of async write callbacks.
Definition: dispatcher.hpp:58
void Interrupt() final
Interrupt does nothing.
Definition: dispatcher.hpp:221
A derived exception class which looks up MPI error strings.
Definition: group.hpp:45
callback vectors per peer
Definition: dispatcher.hpp:225
A pinned / pin-counted pointer to a ByteBlock.
Definition: byte_block.hpp:205
STL namespace.
void AsyncWrite(net::Connection &c, uint32_t seq, data::PinnedBlock &&block, const AsyncWriteCallback &done_cb=AsyncWriteCallback()) final
Definition: dispatcher.hpp:162
std::deque< std::deque< MpiAsync > > recv_queue_
queue of delayed requests for each peer
Definition: dispatcher.hpp:431
#define die(msg)
Instead of calling std::terminate(), throw the message as an exception.
Definition: die.hpp:22
tlx::delegate< bool(), mem::GPoolAllocator< char > > AsyncCallback
Signature of async connection readability/writability callbacks.
Definition: dispatcher.hpp:45
std::vector< int > mpi_async_out_
array of output integer of finished requests for MPI_Testsome().
Definition: dispatcher.hpp:421
AsyncRequest & operator=(const AsyncRequest &)=delete
non-copyable: delete assignment operator
MpiAsync(const AsyncRequestCallback &callback)
construct generic MPI async request
Definition: dispatcher.hpp:268
AsyncRequestCallback callback_
functional object to call once data is complete
Definition: dispatcher.hpp:76
std::deque< Callback, mem::GPoolAllocator< Callback > > read_cb
queue of callbacks for peer.
Definition: dispatcher.hpp:230
void AsyncRead(net::Connection &c, uint32_t seq, size_t size, data::PinnedByteBlockPtr &&block, const AsyncReadByteBlockCallback &done_cb) final
asynchronously read the full ByteBlock and deliver it to the callback
Definition: dispatcher.hpp:188
void AddWrite(net::Connection &, const Callback &) final
Register a buffered write callback and a default exception callback.
Definition: dispatcher.hpp:107
void DoCallback(MPI_Status &s)
Definition: dispatcher.hpp:67
MpiAsync(net::Connection &conn, uint32_t seq, size_t buffer_size, const AsyncReadBufferCallback &callback)
Construct AsyncRead with Buffer.
Definition: dispatcher.hpp:280
std::vector< MPI_Status > mpi_status_out_
array of output MPI_Status of finished requests for MPI_Testsome().
Definition: dispatcher.hpp:424
void AsyncRead(net::Connection &c, uint32_t seq, size_t size, const AsyncReadBufferCallback &done_cb=AsyncReadBufferCallback()) final
asynchronously read n bytes and deliver them to the callback
Definition: dispatcher.hpp:175
MpiAsync(net::Connection &conn, uint32_t seq, data::PinnedBlock &&block, const AsyncWriteCallback &callback)
Construct AsyncWrite with Block.
Definition: dispatcher.hpp:286
std::vector< MpiAsync > mpi_async_
Definition: dispatcher.hpp:415
tlx::delegate< void(MPI_Status &), mem::GPoolAllocator< char > > AsyncRequestCallback
Signature of async MPI request callbacks.
Definition: dispatcher.hpp:41
void Acquire(MpiAsync &&ma) noexcept
assign myself the other object's content
Definition: dispatcher.hpp:393
static bool DefaultExceptionCallback()
Default exception handler.
Definition: dispatcher.hpp:242
std::vector< size_t > send_active_
number of active requests
Definition: dispatcher.hpp:434
MpiAsync(net::Connection &conn, uint32_t seq, size_t size, data::PinnedByteBlockPtr &&block, const AsyncReadByteBlockCallback &callback)
Construct AsyncRead with ByteBuffer.
Definition: dispatcher.hpp:293
A Connection represents a link to another peer in a network group.
Definition: connection.hpp:49
static constexpr bool debug
void AsyncWrite(net::Connection &c, uint32_t seq, Buffer &&buffer, const AsyncWriteCallback &done_cb=AsyncWriteCallback()) final
Definition: dispatcher.hpp:149
AsyncRequest(const AsyncRequestCallback &callback)
Construct buffered reader with callback.
Definition: dispatcher.hpp:47
void Cancel(net::Connection &c) final
Cancel all callbacks on a given peer.
Definition: dispatcher.hpp:124
MpiAsync()
default constructor for resize
Definition: dispatcher.hpp:265
void SetExcept(net::Connection &c, const Callback &except_cb)
Set the exception callback for the connection's peer.
Definition: dispatcher.hpp:113
MpiAsync(net::Connection &conn, uint32_t seq, Buffer &&buffer, const AsyncWriteCallback &callback)
Construct AsyncWrite with Buffer.
Definition: dispatcher.hpp:273
This is the big answer to what happens when an MPI async request is signaled as complete: it unifies ...
Definition: dispatcher.hpp:254
Callback except_cb
only one exception callback for the peer.
Definition: dispatcher.hpp:232
int peer() const
return the MPI peer number
Definition: group.hpp:79
A pinned / pin-counted derivative of a Block.
Definition: block.hpp:157
static constexpr bool debug_async
Definition: dispatcher.hpp:62
std::vector< Watch > watch_
callback watch vector
Definition: dispatcher.hpp:236
Simple buffer of characters without initialization or growing functionality.
Definition: buffer.hpp:40
Virtual MPI connection class.
Definition: group.hpp:62
MpiAsync(MpiAsync &&ma)
move-constructor: move item
Definition: dispatcher.hpp:305
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:24
Dispatcher is a high level wrapper for asynchronous callback processing.
Definition: dispatcher.hpp:510
std::deque< std::deque< MpiAsync > > send_queue_
queue of delayed requests for each peer
Definition: dispatcher.hpp:428
tlx::delegate< void(Connection &c, Buffer &&buffer), mem::GPoolAllocator< char > > AsyncReadBufferCallback
Signature of async read Buffer callbacks.
Definition: dispatcher.hpp:49
std::vector< MPI_Request > mpi_async_requests_
array of current async MPI_Request for MPI_Testsome().
Definition: dispatcher.hpp:418
#define LOGC(cond)
Explicitly specify the condition for logging.
Definition: logger.hpp:21