Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
dispatcher.cpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/net/mpi/dispatcher.cpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Timo Bingmann <[email protected]>
7  *
8  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
9  ******************************************************************************/
10 
#include <thrill/net/mpi/dispatcher.hpp>

#include <tlx/die.hpp>

#include <mpi.h>

#include <algorithm>
#include <cassert>
#include <limits>
#include <mutex>
#include <vector>
18 
19 namespace thrill {
20 namespace net {
21 namespace mpi {
22 
23 //! The Grand MPI Library Invocation Mutex (The GMLIM)
24 extern std::mutex g_mutex;
25 
26 //! number of simultaneous transfers
27 static const size_t g_simultaneous = 32;
28 
29 /******************************************************************************/
30 // mpi::Dispatcher
31 
32 Dispatcher::Dispatcher(size_t group_size)
33  : net::Dispatcher() {
34 
35  watch_.resize(group_size);
36 #if THRILL_NET_MPI_QUEUES
37  send_queue_.resize(group_size);
38  send_active_.resize(group_size);
39  recv_queue_.resize(group_size);
40  recv_active_.resize(group_size);
41 #endif
42 }
43 
45  LOG << "~mpi::Dispatcher()"
46  << " mpi_async_.size()=" << mpi_async_.size();
47 
48  // lock the GMLIM
49  std::unique_lock<std::mutex> lock(g_mutex);
50 
51  for (size_t i = 0; i < mpi_async_requests_.size(); ++i) {
52  int r = MPI_Cancel(&mpi_async_requests_[i]);
53 
54  if (r != MPI_SUCCESS)
55  LOG1 << "Error during MPI_Cancel()";
56 
57  MPI_Request_free(&mpi_async_requests_[i]);
58  }
59 }
60 
61 MPI_Request Dispatcher::ISend(
62  Connection& c, uint32_t seq, const void* data, size_t size) {
63  // lock the GMLIM
64  std::unique_lock<std::mutex> lock(g_mutex);
65 
66  MPI_Request request;
67  int r = MPI_Isend(const_cast<void*>(data), static_cast<int>(size), MPI_BYTE,
68  c.peer(), static_cast<int>(seq),
69  MPI_COMM_WORLD, &request);
70 
71  if (r != MPI_SUCCESS)
72  throw Exception("Error during MPI_Isend()", r);
73 
74  LOG << "MPI_Isend() data=" << data << " size=" << size
75  << " peer=" << c.peer() << " seq=" << seq
76  << " request=" << request;
77 
78  c.tx_bytes_ += size;
79 
80  return request;
81 }
82 
83 MPI_Request Dispatcher::IRecv(
84  Connection& c, uint32_t seq, void* data, size_t size) {
85  // lock the GMLIM
86  std::unique_lock<std::mutex> lock(g_mutex);
87 
88  MPI_Request request;
89  int r = MPI_Irecv(data, static_cast<int>(size), MPI_BYTE,
90  c.peer(), static_cast<int>(seq),
91  MPI_COMM_WORLD, &request);
92 
93  if (r != MPI_SUCCESS)
94  throw Exception("Error during MPI_Irecv()", r);
95 
96  LOG << "MPI_Irecv() data=" << data << " size=" << size
97  << " peer=" << c.peer() << " seq=" << seq
98  << " request=" << request;
99 
100  c.rx_bytes_ += size;
101 
102  return request;
103 }
104 
106  const MPI_Request& req, const AsyncRequestCallback& callback) {
107 
108  LOG << "AddAsyncRequest() req=" << req;
109 
110  // store request and associated buffer (Isend needs memory).
111  mpi_async_requests_.emplace_back(req);
112  mpi_async_.emplace_back(MpiAsync(callback));
113  mpi_async_out_.emplace_back();
114  mpi_status_out_.emplace_back();
115 }
116 
118 #if THRILL_NET_MPI_QUEUES
119  assert(dynamic_cast<Connection*>(&c));
120  Connection* mpic = static_cast<Connection*>(&c);
121 
122  int peer = mpic->peer();
123  if (send_active_[peer] < g_simultaneous) {
124  // perform immediately
125  PerformAsync(std::move(a));
126  }
127  else {
128  send_queue_[peer].emplace_back(std::move(a));
129  }
130 #else
131  tlx::unused(c);
132  // perform immediately
133  PerformAsync(std::move(a));
134 #endif
135 }
136 
138 #if THRILL_NET_MPI_QUEUES
139  assert(dynamic_cast<Connection*>(&c));
140  Connection* mpic = static_cast<Connection*>(&c);
141 
142  int peer = mpic->peer();
143 
144  if (recv_active_[peer] < g_simultaneous) {
145  // perform immediately
146  PerformAsync(std::move(a));
147  }
148  else {
149  recv_queue_[peer].emplace_back(std::move(a));
150  }
151 #else
152  tlx::unused(c);
153  // perform immediately
154  PerformAsync(std::move(a));
155 #endif
156 }
157 
159 #if THRILL_NET_MPI_QUEUES
160  while (send_active_[peer] < g_simultaneous && !send_queue_[peer].empty()) {
161  MpiAsync a = std::move(send_queue_[peer].front());
162  send_queue_[peer].pop_front();
163  PerformAsync(std::move(a));
164  }
165  if (!send_queue_[peer].empty()) {
166  LOG << "Dispatcher::PumpSendQueue() send remaining="
167  << send_queue_[peer].size();
168  }
169 #else
170  tlx::unused(peer);
171 #endif
172 }
173 
175 #if THRILL_NET_MPI_QUEUES
176  while (recv_active_[peer] < g_simultaneous && !recv_queue_[peer].empty()) {
177  MpiAsync a = std::move(recv_queue_[peer].front());
178  recv_queue_[peer].pop_front();
179  PerformAsync(std::move(a));
180  }
181  if (!recv_queue_[peer].empty()) {
182  LOG << "Dispatcher::PumpRecvQueue(). recv remaining="
183  << recv_queue_[peer].size();
184  }
185 #else
186  tlx::unused(peer);
187 #endif
188 }
189 
191  if (a.type_ == MpiAsync::WRITE_BUFFER)
192  {
193  AsyncWriteBuffer& r = a.write_buffer_;
194  assert(dynamic_cast<Connection*>(r.connection()));
195  Connection* c = static_cast<Connection*>(r.connection());
196 
197  MPI_Request req = ISend(*c, a.seq_, r.data(), r.size());
198 
199  // store request and associated buffer (Isend needs memory).
200  mpi_async_requests_.emplace_back(req);
201  mpi_async_.emplace_back(std::move(a));
202  mpi_async_out_.emplace_back();
203  mpi_status_out_.emplace_back();
204 
205 #if THRILL_NET_MPI_QUEUES
206  send_active_[c->peer()]++;
207 #endif
208  }
209  else if (a.type_ == MpiAsync::WRITE_BLOCK)
210  {
211  AsyncWriteBlock& r = a.write_block_;
212  assert(dynamic_cast<Connection*>(r.connection()));
213  Connection* c = static_cast<Connection*>(r.connection());
214 
215  MPI_Request req = ISend(*c, a.seq_, r.data(), r.size());
216 
217  // store request and associated buffer (Isend needs memory).
218  mpi_async_requests_.emplace_back(req);
219  mpi_async_.emplace_back(std::move(a));
220  mpi_async_out_.emplace_back();
221  mpi_status_out_.emplace_back();
222 
223 #if THRILL_NET_MPI_QUEUES
224  send_active_[c->peer()]++;
225 #endif
226  }
227  else if (a.type_ == MpiAsync::READ_BUFFER)
228  {
229  AsyncReadBuffer& r = a.read_buffer_;
230  assert(dynamic_cast<Connection*>(r.connection()));
231  Connection* c = static_cast<Connection*>(r.connection());
232 
233  MPI_Request req = IRecv(*c, a.seq_, r.data(), r.size());
234 
235  // store request and associated buffer (Irecv needs memory).
236  mpi_async_requests_.emplace_back(req);
237  mpi_async_.emplace_back(std::move(a));
238  mpi_async_out_.emplace_back();
239  mpi_status_out_.emplace_back();
240 
241 #if THRILL_NET_MPI_QUEUES
242  recv_active_[c->peer()]++;
243 #endif
244  }
245  else if (a.type_ == MpiAsync::READ_BYTE_BLOCK)
246  {
247  AsyncReadByteBlock& r = a.read_byte_block_;
248  assert(dynamic_cast<Connection*>(r.connection()));
249  Connection* c = static_cast<Connection*>(r.connection());
250 
251  MPI_Request req = IRecv(*c, a.seq_, r.data(), r.size());
252 
253  // store request and associated buffer (Irecv needs memory).
254  mpi_async_requests_.emplace_back(req);
255  mpi_async_.emplace_back(std::move(a));
256  mpi_async_out_.emplace_back();
257  mpi_status_out_.emplace_back();
258 
259 #if THRILL_NET_MPI_QUEUES
260  recv_active_[c->peer()]++;
261 #endif
262  }
263 }
264 
265 void Dispatcher::DispatchOne(const std::chrono::milliseconds& /* timeout */) {
266 
267  // use MPI_Testsome() to check for finished writes
268  if (mpi_async_requests_.size())
269  {
270  die_unless(mpi_async_.size() == mpi_async_requests_.size());
271  die_unless(mpi_async_.size() == mpi_async_out_.size());
272  die_unless(mpi_async_.size() == mpi_status_out_.size());
273 
274  // lock the GMLIM
275  std::unique_lock<std::mutex> lock(g_mutex);
276 
277 #if 1
278  int out_count;
279 
280  sLOG << "DispatchOne(): MPI_Testsome()"
281  << " mpi_async_requests_=" << mpi_async_requests_.size();
282 
283  int r = MPI_Testsome(
284  // in: Length of array_of_requests (integer).
285  static_cast<int>(mpi_async_requests_.size()),
286  // in: Array of requests (array of handles).
287  mpi_async_requests_.data(),
288  // out: Number of completed requests (integer).
289  &out_count,
290  // out: Array of indices of operations that completed (array of
291  // integers).
292  mpi_async_out_.data(),
293  // out: Array of status objects for operations that completed (array
294  // of status).
295  mpi_status_out_.data());
296 
297  lock.unlock();
298 
299  if (r != MPI_SUCCESS)
300  throw Exception("Error during MPI_Testsome()", r);
301 
302  if (out_count == MPI_UNDEFINED) {
303  // nothing returned
304  }
305  else if (out_count > 0) {
306  sLOG << "DispatchOne(): MPI_Testsome() out_count=" << out_count;
307 
308  die_unless(std::is_sorted(mpi_async_out_.begin(),
309  mpi_async_out_.begin() + out_count));
310 
311  // rewrite the arrays, process and remove all finished requests.
312  {
313  // output index for shifting unfinished requests to (left of i)
314  size_t out = 0;
315  // index into output arrays mpi_async_out_ and mpi_status_out_.
316  int k = 0;
317 
318  // Callbacks or pumping the queue may add new requests to the
319  // end of the arrays. However, since these indexes cannot be in
320  // the result set (mpi_async_out_), the loop will just copy them
321  // correctly; hence no special handling is needed.
322 
323  for (size_t i = 0; i < mpi_async_.size(); ++i)
324  {
325  if (k < out_count && mpi_async_out_[k] == static_cast<int>(i)) {
326 
327  sLOG << "Working #" << k
328  << "which is $" << mpi_async_out_[k];
329 
330  // perform callback
331  mpi_async_[i].DoCallback(mpi_status_out_[k]);
332 
333 #if THRILL_NET_MPI_QUEUES
334  MpiAsync& a = mpi_async_[i];
335  int peer = a.connection() ? a.connection()->peer() : 0;
336  MpiAsync::Type a_type = a.type_;
337 
338  if (a_type == MpiAsync::WRITE_BUFFER ||
339  a_type == MpiAsync::WRITE_BLOCK)
340  {
341  die_unless(send_active_[peer] > 0);
342  send_active_[peer]--;
343  LOG << "DispatchOne() send_active_[" << peer << "]="
344  << send_active_[peer];
345  PumpSendQueue(peer);
346  }
347  else if (a_type == MpiAsync::READ_BUFFER ||
348  a_type == MpiAsync::READ_BYTE_BLOCK)
349  {
350  die_unless(recv_active_[peer] > 0);
351  recv_active_[peer]--;
352  LOG << "DispatchOne() recv_active_[" << peer << "]="
353  << recv_active_[peer];
354  PumpRecvQueue(peer);
355  }
356 #endif
357  // skip over finished request
358  ++k;
359  continue;
360  }
361  if (i != out) {
362  // shift unfinished requests from back of array
363  mpi_async_[out] = std::move(mpi_async_[i]);
364  mpi_async_requests_[out] = std::move(mpi_async_requests_[i]);
365  }
366  ++out;
367  }
368 
369  // shrink arrays
370  mpi_async_.resize(out);
371  mpi_async_requests_.resize(out);
372  mpi_async_out_.resize(out);
373  mpi_status_out_.resize(out);
374  }
375  }
376 #else
377  int out_index = 0, out_flag = 0;
378  MPI_Status out_status;
379 
380  // (mpi_async_requests_.size() >= 10)
381  sLOG0 << "DispatchOne(): MPI_Testany()"
382  << " mpi_async_requests_=" << mpi_async_requests_.size();
383 
384  int r = MPI_Testany(
385  // in: Length of array_of_requests (integer).
386  static_cast<int>(mpi_async_requests_.size()),
387  // in: Array of requests (array of handles).
388  mpi_async_requests_.data(),
389  // out: Number of completed request (integer).
390  &out_index,
391  // out: True if one of the operations is complete (logical).
392  &out_flag,
393  // out: Status object (status).
394  &out_status /* MPI_STATUS_IGNORE */);
395 
396  if (r != MPI_SUCCESS)
397  throw Exception("Error during MPI_Testany()", r);
398 
399  if (out_flag == 0) {
400  // nothing returned
401  lock.unlock();
402  }
403  else {
404  die_unless((unsigned)out_index < mpi_async_requests_.size());
405  lock.unlock();
406 
407  LOG << "DispatchOne(): MPI_Testany() out_flag=" << out_flag
408  << " done #" << out_index
409  << " out_tag=" << out_status.MPI_TAG;
410 
411  // perform callback
412  mpi_async_[out_index].DoCallback(out_status);
413 
414 #if THRILL_NET_MPI_QUEUES
415  MpiAsync& a = mpi_async_[out_index];
416  int peer = a.connection() ? a.connection()->peer() : 0;
417  MpiAsync::Type a_type = a.type_;
418 #endif
419 
420  mpi_async_.erase(mpi_async_.begin() + out_index);
421  mpi_async_requests_.erase(mpi_async_requests_.begin() + out_index);
422  mpi_async_out_.erase(mpi_async_out_.begin() + out_index);
423  mpi_status_out_.erase(mpi_status_out_.begin() + out_index);
424 
425 #if THRILL_NET_MPI_QUEUES
426  if (a_type == MpiAsync::WRITE_BUFFER ||
427  a_type == MpiAsync::WRITE_BLOCK)
428  {
429  die_unless(send_active_[peer] > 0);
430  send_active_[peer]--;
431  LOG << "DispatchOne() send_active_[" << peer << "]="
432  << send_active_[peer];
433  PumpSendQueue(peer);
434  }
435  else if (a_type == MpiAsync::READ_BUFFER ||
436  a_type == MpiAsync::READ_BYTE_BLOCK)
437  {
438  die_unless(recv_active_[peer] > 0);
439  recv_active_[peer]--;
440  LOG << "DispatchOne() recv_active_[" << peer << "]="
441  << recv_active_[peer];
442  PumpRecvQueue(peer);
443  }
444 #endif
445  }
446 #endif
447  }
448 
449  if (watch_active_ && 0)
450  {
451  // use MPI_Iprobe() to check for a new message on this MPI tag.
452  int flag = 0;
453  MPI_Status status;
454 
455  {
456  // lock the GMLIM
457  std::unique_lock<std::mutex> lock(g_mutex);
458 
459  int r = MPI_Iprobe(MPI_ANY_SOURCE, /* group_tag */ 0,
460  MPI_COMM_WORLD, &flag, &status);
461 
462  if (r != MPI_SUCCESS)
463  throw Exception("Error during MPI_Iprobe()", r);
464  }
465 
466  // check whether probe was successful
467  if (flag == 0) return;
468 
469  // get the right watch
470  int p = status.MPI_SOURCE;
471  assert(p >= 0 && static_cast<size_t>(p) < watch_.size());
472 
473  Watch& w = watch_[p];
474 
475  if (!w.active) {
476  sLOG << "Got Iprobe() for unwatched peer" << p;
477  return;
478  }
479 
480  sLOG << "Got iprobe for peer" << p;
481 
482  if (w.read_cb.size()) {
483  // run read callbacks until one returns true (in which case it wants
484  // to be called again), or the read_cb list is empty.
485  while (w.read_cb.size() && w.read_cb.front()() == false) {
486  w.read_cb.pop_front();
487  }
488 
489  if (w.read_cb.size() == 0) {
490  w.active = false;
491  watch_active_--;
492  }
493  }
494  else {
495  LOG << "Dispatcher: got MPI_Iprobe() for peer "
496  << p << " without a read handler.";
497  }
498  }
499 }
500 
501 } // namespace mpi
502 } // namespace net
503 } // namespace thrill
504 
505 /******************************************************************************/
void PerformAsync(MpiAsync &&a)
Issue the encapsulated request to the MPI layer.
Definition: dispatcher.cpp:190
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:34
Connection * connection()
Return mpi Connection pointer.
Definition: dispatcher.hpp:361
uint8_t * data()
underlying buffer pointer
Definition: dispatcher.hpp:146
MPI_Request IRecv(Connection &c, uint32_t seq, void *data, size_t size)
Definition: dispatcher.cpp:83
int peer() const
return the MPI peer number
Definition: group.hpp:79
#define die_unless(X)
Definition: die.hpp:27
size_t size() const
underlying buffer size
Definition: dispatcher.hpp:152
std::vector< size_t > recv_active_
number of active requests
Definition: dispatcher.hpp:437
std::atomic< size_t > tx_bytes_
sent bytes
Definition: connection.hpp:609
void AddAsyncRequest(const MPI_Request &req, const AsyncRequestCallback &callback)
Definition: dispatcher.cpp:105
#define LOG1
Definition: logger.hpp:28
Dispatcher()=default
default constructor
A derived exception class which looks up MPI error strings.
Definition: group.hpp:45
callback vectors per peer
Definition: dispatcher.hpp:225
std::deque< std::deque< MpiAsync > > recv_queue_
queue of delayed requests for each peer
Definition: dispatcher.hpp:431
std::vector< int > mpi_async_out_
array of output integer of finished requests for MPI_Testsome().
Definition: dispatcher.hpp:421
void PumpSendQueue(int peer)
Check send queue and perform waiting requests.
Definition: dispatcher.cpp:158
void PumpRecvQueue(int peer)
Check recv queue and perform waiting requests.
Definition: dispatcher.cpp:174
size_t size() const
underlying buffer size
Definition: dispatcher.hpp:356
void QueueAsyncRecv(net::Connection &c, MpiAsync &&a)
Enqueue and run the encapsulated result.
Definition: dispatcher.cpp:137
Connection * connection() const
Returns conn_.
Definition: dispatcher.hpp:347
void unused(Types &&...)
Definition: unused.hpp:20
void QueueAsyncSend(net::Connection &c, MpiAsync &&a)
Enqueue and run the encapsulated result.
Definition: dispatcher.cpp:117
#define sLOG0
Override default output: never or always output log.
Definition: logger.hpp:37
std::vector< MPI_Status > mpi_status_out_
array of output MPI_Status of finished requests for MPI_Testsome().
Definition: dispatcher.hpp:424
std::mutex g_mutex
The Grand MPI Library Invocation Mutex (The GMLIM)
Definition: group.cpp:26
std::vector< MpiAsync > mpi_async_
Definition: dispatcher.hpp:415
size_t watch_active_
counter of active watches
Definition: dispatcher.hpp:239
uint8_t * data()
underlying buffer pointer
Definition: dispatcher.hpp:350
std::vector< size_t > send_active_
number of active requests
Definition: dispatcher.hpp:434
void DispatchOne(const std::chrono::milliseconds &timeout) final
Run one iteration of dispatching using MPI_Iprobe().
Definition: dispatcher.cpp:265
A Connection represents a link to another peer in a network group.
Definition: connection.hpp:49
Connection * connection() const
Returns conn_.
Definition: dispatcher.hpp:239
const uint8_t * data() const
underlying buffer pointer
Definition: dispatcher.hpp:453
This is the big answer to what happens when an MPI async request is signaled as complete: it unifies ...
Definition: dispatcher.hpp:254
size_t size() const
underlying buffer size
Definition: dispatcher.hpp:456
std::vector< Watch > watch_
callback watch vector
Definition: dispatcher.hpp:236
Connection * connection() const
Returns conn_.
Definition: dispatcher.hpp:450
Connection * connection() const
Returns conn_.
Definition: dispatcher.hpp:143
static const size_t g_simultaneous
number of simultaneous transfers
Definition: dispatcher.cpp:27
size_t size() const
underlying buffer size
Definition: dispatcher.hpp:245
Virtual MPI connection class.
Definition: group.hpp:62
MPI_Request ISend(Connection &c, uint32_t seq, const void *data, size_t size)
Definition: dispatcher.cpp:61
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:24
std::deque< std::deque< MpiAsync > > send_queue_
queue of delayed requests for each peer
Definition: dispatcher.hpp:428
std::vector< MPI_Request > mpi_async_requests_
array of current async MPI_Request for MPI_Testsome().
Definition: dispatcher.hpp:418
std::atomic< size_t > rx_bytes_
received bytes
Definition: connection.hpp:612
const uint8_t * data() const
underlying buffer pointer
Definition: dispatcher.hpp:242