/*******************************************************************************
 * thrill/net/mpi/group.cpp
 *
 * Part of Project Thrill - http://project-thrill.org
 *
 * Copyright (C) 2015 Timo Bingmann <[email protected]>
 *
 * All rights reserved. Published under the BSD-2 license in the LICENSE file.
 ******************************************************************************/

#include <thrill/net/mpi/dispatcher.hpp>
#include <thrill/net/mpi/group.hpp>

#include <mpi.h>

#include <limits>
#include <mutex>
#include <string>
#include <vector>

namespace thrill {
namespace net {
namespace mpi {

//! The Grand MPI Library Invocation Mutex (The GMLIM)
std::mutex g_mutex;

/******************************************************************************/
// mpi::Exception

Exception::Exception(const std::string& what, int error_code)
    : net::Exception(what + ": [" + std::to_string(error_code) + "] "
                     + GetErrorString(error_code))
{ }

std::string Exception::GetErrorString(int error_code) {
    char string[MPI_MAX_ERROR_STRING];
    int resultlen;
    MPI_Error_string(error_code, string, &resultlen);
    return std::string(string, resultlen);
}

/******************************************************************************/
// mpi::Connection

std::string Connection::ToString() const {
    return "peer: " + std::to_string(peer_);
}

std::ostream& Connection::OutputOstream(std::ostream& os) const {
    return os << "[mpi::Connection"
              << " group_tag_=" << group_->group_tag()
              << " peer_=" << peer_
              << "]";
}

void Connection::SyncSend(const void* data, size_t size, Flags /* flags */) {

    LOG << "SyncSend()"
        << " data=" << data
        << " size=" << size
        << " peer_=" << peer_
        << " group_tag_=" << group_->group_tag();

    assert(size <= std::numeric_limits<int>::max());

    bool done = false;
    group_->dispatcher().RunInThread(
        [=, &done](net::Dispatcher& dispatcher) {

            auto& disp = static_cast<mpi::Dispatcher&>(dispatcher);

            MPI_Request request =
                disp.ISend(*this, /* seq */ 0, data, size);

            disp.AddAsyncRequest(
                request, [&done](MPI_Status&) { done = true; });
        });

    while (!done)
        std::this_thread::yield();

    tx_bytes_ += size;
}

void Connection::SyncRecv(void* out_data, size_t size) {

    LOG << "SyncRecv()"
        << " out_data=" << out_data
        << " size=" << size
        << " peer_=" << peer_
        << " group_tag_=" << group_->group_tag();

    assert(size <= std::numeric_limits<int>::max());

    bool done = false;
    group_->dispatcher().RunInThread(
        [=, &done](net::Dispatcher& dispatcher) {

            auto& disp = static_cast<mpi::Dispatcher&>(dispatcher);

            MPI_Request request =
                disp.IRecv(*this, /* seq */ 0, out_data, size);

            disp.AddAsyncRequest(
                request, [&done, size](MPI_Status& status) {
                    int count;
                    int r = MPI_Get_count(&status, MPI_BYTE, &count);
                    if (r != MPI_SUCCESS)
                        throw Exception("Error during MPI_Get_count()", r);

                    if (static_cast<size_t>(count) != size)
                        throw Exception("Error during SyncRecv(): message truncated?");

                    done = true;
                });
        });

    while (!done)
        std::this_thread::yield();

    rx_bytes_ += size;
}

void Connection::SyncSendRecv(const void* send_data, size_t send_size,
                              void* recv_data, size_t recv_size) {

    LOG << "SyncSendRecv()"
        << " send_data=" << send_data
        << " send_size=" << send_size
        << " recv_data=" << recv_data
        << " recv_size=" << recv_size
        << " peer_=" << peer_
        << " group_tag_=" << group_->group_tag();

    assert(send_size <= std::numeric_limits<int>::max());
    assert(recv_size <= std::numeric_limits<int>::max());

    unsigned done = 0;
    group_->dispatcher().RunInThread(
        [=, &done](net::Dispatcher& dispatcher) {
            auto& disp = static_cast<mpi::Dispatcher&>(dispatcher);

            MPI_Request send_request =
                disp.ISend(*this, /* seq */ 0, send_data, send_size);

            MPI_Request recv_request =
                disp.IRecv(*this, /* seq */ 0, recv_data, recv_size);

            disp.AddAsyncRequest(
                send_request, [&done](MPI_Status&) { ++done; });
            disp.AddAsyncRequest(
                recv_request, [&done, recv_size](MPI_Status& status) {
                    int count;
                    int r = MPI_Get_count(&status, MPI_BYTE, &count);
                    if (r != MPI_SUCCESS)
                        throw Exception("Error during MPI_Get_count()", r);

                    if (static_cast<size_t>(count) != recv_size)
                        throw Exception("Error during SyncSendRecv(): message truncated?");

                    ++done;
                });
        });

    while (done != 2)
        std::this_thread::yield();

    tx_bytes_ += send_size;
    rx_bytes_ += recv_size;
}

void Connection::SyncRecvSend(const void* send_data, size_t send_size,
                              void* recv_data, size_t recv_size) {
    SyncSendRecv(send_data, send_size, recv_data, recv_size);
}

/******************************************************************************/
// mpi::Group

size_t Group::num_parallel_async() const {
    return 16;
}

std::unique_ptr<net::Dispatcher> Group::ConstructDispatcher() const {
    // construct mpi::Dispatcher
    return std::make_unique<Dispatcher>(num_hosts());
}

void Group::Barrier() {

    bool done = false;
    dispatcher_.RunInThread(
        [=, &done](net::Dispatcher& dispatcher) {
            std::unique_lock<std::mutex> lock(g_mutex);

            MPI_Request request;
            int r = MPI_Ibarrier(MPI_COMM_WORLD, &request);
            if (r != MPI_SUCCESS)
                throw Exception("Error during MPI_Ibarrier()", r);

            lock.unlock();

            auto& disp = static_cast<mpi::Dispatcher&>(dispatcher);
            disp.AddAsyncRequest(
                request, [&done](MPI_Status&) { done = true; });
        });

    while (!done)
        std::this_thread::yield();
}

template <typename MpiCall>
void Group::WaitForRequest(MpiCall call) {

    bool done = false;
    dispatcher_.RunInThread(
        [=, &done](net::Dispatcher& dispatcher) {
            std::unique_lock<std::mutex> lock(g_mutex);

            MPI_Request request;
            int r = call(request);

            if (r != MPI_SUCCESS)
                throw Exception("Error during WaitForRequest", r);

            lock.unlock();

            auto& disp = static_cast<mpi::Dispatcher&>(dispatcher);
            disp.AddAsyncRequest(
                request, [&done](MPI_Status&) { done = true; });
        });

    while (!done)
        std::this_thread::yield();
}
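
// Illustrative note (not part of the original file): WaitForRequest() wraps any
// MPI call that produces an MPI_Request and yields until the dispatcher reports
// its completion. As a minimal sketch using only names from this file, the
// nonblocking barrier in Group::Barrier() above could equivalently be written:
//
//   WaitForRequest(
//       [&](MPI_Request& request) {
//           return MPI_Ibarrier(MPI_COMM_WORLD, &request);
//       });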

/******************************************************************************/
// mpi::Group Collective Overrides

/*[[[perl
  for my $e (
    ["int", "Int", "INT"],
    ["unsigned int", "UnsignedInt", "UNSIGNED"],
    ["long", "Long", "LONG"],
    ["unsigned long", "UnsignedLong", "UNSIGNED_LONG"],
    ["long long", "LongLong", "LONG_LONG"],
    ["unsigned long long", "UnsignedLongLong", "UNSIGNED_LONG_LONG"])
  {
    print "void Group::PrefixSumPlus$$e[1]($$e[0]& value, const $$e[0]& initial) {\n";
    print "    LOG << \"Group::PrefixSumPlus($$e[0]);\";\n";
    print "    WaitForRequest(\n";
    print "        [&](MPI_Request& request) {\n";
    print "            return MPI_Iscan(MPI_IN_PLACE, &value, 1, MPI_$$e[2],\n";
    print "                             MPI_SUM, MPI_COMM_WORLD, &request);\n";
    print "        });\n";
    print "    value += initial;\n";
    print "}\n";

    print "void Group::ExPrefixSumPlus$$e[1]($$e[0]& value, const $$e[0]& initial) {\n";
    print "    LOG << \"Group::ExPrefixSumPlus($$e[0]);\";\n";
    print "    WaitForRequest(\n";
    print "        [&](MPI_Request& request) {\n";
    print "            return MPI_Iexscan(MPI_IN_PLACE, &value, 1, MPI_$$e[2],\n";
    print "                               MPI_SUM, MPI_COMM_WORLD, &request);\n";
    print "        });\n";
    print "    value = (my_rank_ == 0 ? initial : value + initial);\n";
    print "}\n";

    print "void Group::Broadcast$$e[1]($$e[0]& value, size_t origin) {\n";
    print "    LOG << \"Group::Broadcast($$e[0]);\";\n";
    print "    WaitForRequest(\n";
    print "        [&](MPI_Request& request) {\n";
    print "            return MPI_Ibcast(&value, 1, MPI_$$e[2], origin,\n";
    print "                              MPI_COMM_WORLD, &request);\n";
    print "        });\n";
    print "}\n";

    print "void Group::AllReducePlus$$e[1]($$e[0]& value) {\n";
    print "    LOG << \"Group::AllReducePlus($$e[0]);\";\n";
    print "    WaitForRequest(\n";
    print "        [&](MPI_Request& request) {\n";
    print "            return MPI_Iallreduce(\n";
    print "                MPI_IN_PLACE, &value, 1, MPI_$$e[2],\n";
    print "                MPI_SUM, MPI_COMM_WORLD, &request);\n";
    print "        });\n";
    print "}\n";

    print "void Group::AllReduceMinimum$$e[1]($$e[0]& value) {\n";
    print "    LOG << \"Group::AllReduceMinimum($$e[0]);\";\n";
    print "    WaitForRequest(\n";
    print "        [&](MPI_Request& request) {\n";
    print "            return MPI_Iallreduce(\n";
    print "                MPI_IN_PLACE, &value, 1, MPI_$$e[2],\n";
    print "                MPI_MIN, MPI_COMM_WORLD, &request);\n";
    print "        });\n";
    print "}\n";

    print "void Group::AllReduceMaximum$$e[1]($$e[0]& value) {\n";
    print "    LOG << \"Group::AllReduceMaximum($$e[0]);\";\n";
    print "    WaitForRequest(\n";
    print "        [&](MPI_Request& request) {\n";
    print "            return MPI_Iallreduce(\n";
    print "                MPI_IN_PLACE, &value, 1, MPI_$$e[2],\n";
    print "                MPI_MAX, MPI_COMM_WORLD, &request);\n";
    print "        });\n";
    print "}\n";
  }
]]]*/
void Group::PrefixSumPlusInt(int& value, const int& initial) {
    LOG << "Group::PrefixSumPlus(int);";
    WaitForRequest(
        [&](MPI_Request& request) {
            return MPI_Iscan(MPI_IN_PLACE, &value, 1, MPI_INT,
                             MPI_SUM, MPI_COMM_WORLD, &request);
        });
    value += initial;
}
void Group::ExPrefixSumPlusInt(int& value, const int& initial) {
    LOG << "Group::ExPrefixSumPlus(int);";
    WaitForRequest(
        [&](MPI_Request& request) {
            return MPI_Iexscan(MPI_IN_PLACE, &value, 1, MPI_INT,
                               MPI_SUM, MPI_COMM_WORLD, &request);
        });
    value = (my_rank_ == 0 ? initial : value + initial);
}
void Group::BroadcastInt(int& value, size_t origin) {
    LOG << "Group::Broadcast(int);";
    WaitForRequest(
        [&](MPI_Request& request) {
            return MPI_Ibcast(&value, 1, MPI_INT, origin,
                              MPI_COMM_WORLD, &request);
        });
}
void Group::AllReducePlusInt(int& value) {
    LOG << "Group::AllReducePlus(int);";
    WaitForRequest(
        [&](MPI_Request& request) {
            return MPI_Iallreduce(
                MPI_IN_PLACE, &value, 1, MPI_INT,
                MPI_SUM, MPI_COMM_WORLD, &request);
        });
}
void Group::AllReduceMinimumInt(int& value) {
    LOG << "Group::AllReduceMinimum(int);";
    WaitForRequest(
        [&](MPI_Request& request) {
            return MPI_Iallreduce(
                MPI_IN_PLACE, &value, 1, MPI_INT,
                MPI_MIN, MPI_COMM_WORLD, &request);
        });
}
void Group::AllReduceMaximumInt(int& value) {
    LOG << "Group::AllReduceMaximum(int);";
    WaitForRequest(
        [&](MPI_Request& request) {
            return MPI_Iallreduce(
                MPI_IN_PLACE, &value, 1, MPI_INT,
                MPI_MAX, MPI_COMM_WORLD, &request);
        });
}
void Group::PrefixSumPlusUnsignedInt(unsigned int& value, const unsigned int& initial) {
    LOG << "Group::PrefixSumPlus(unsigned int);";
    WaitForRequest(
        [&](MPI_Request& request) {
            return MPI_Iscan(MPI_IN_PLACE, &value, 1, MPI_UNSIGNED,
                             MPI_SUM, MPI_COMM_WORLD, &request);
        });
    value += initial;
}
void Group::ExPrefixSumPlusUnsignedInt(unsigned int& value, const unsigned int& initial) {
    LOG << "Group::ExPrefixSumPlus(unsigned int);";
    WaitForRequest(
        [&](MPI_Request& request) {
            return MPI_Iexscan(MPI_IN_PLACE, &value, 1, MPI_UNSIGNED,
                               MPI_SUM, MPI_COMM_WORLD, &request);
        });
    value = (my_rank_ == 0 ? initial : value + initial);
}
void Group::BroadcastUnsignedInt(unsigned int& value, size_t origin) {
    LOG << "Group::Broadcast(unsigned int);";
    WaitForRequest(
        [&](MPI_Request& request) {
            return MPI_Ibcast(&value, 1, MPI_UNSIGNED, origin,
                              MPI_COMM_WORLD, &request);
        });
}
void Group::AllReducePlusUnsignedInt(unsigned int& value) {
    LOG << "Group::AllReducePlus(unsigned int);";
    WaitForRequest(
        [&](MPI_Request& request) {
            return MPI_Iallreduce(
                MPI_IN_PLACE, &value, 1, MPI_UNSIGNED,
                MPI_SUM, MPI_COMM_WORLD, &request);
        });
}
void Group::AllReduceMinimumUnsignedInt(unsigned int& value) {
    LOG << "Group::AllReduceMinimum(unsigned int);";
    WaitForRequest(
        [&](MPI_Request& request) {
            return MPI_Iallreduce(
                MPI_IN_PLACE, &value, 1, MPI_UNSIGNED,
                MPI_MIN, MPI_COMM_WORLD, &request);
        });
}
void Group::AllReduceMaximumUnsignedInt(unsigned int& value) {
    LOG << "Group::AllReduceMaximum(unsigned int);";
    WaitForRequest(
        [&](MPI_Request& request) {
            return MPI_Iallreduce(
                MPI_IN_PLACE, &value, 1, MPI_UNSIGNED,
                MPI_MAX, MPI_COMM_WORLD, &request);
        });
}
void Group::PrefixSumPlusLong(long& value, const long& initial) {
    LOG << "Group::PrefixSumPlus(long);";
    WaitForRequest(
        [&](MPI_Request& request) {
            return MPI_Iscan(MPI_IN_PLACE, &value, 1, MPI_LONG,
                             MPI_SUM, MPI_COMM_WORLD, &request);
        });
    value += initial;
}
void Group::ExPrefixSumPlusLong(long& value, const long& initial) {
    LOG << "Group::ExPrefixSumPlus(long);";
    WaitForRequest(
        [&](MPI_Request& request) {
            return MPI_Iexscan(MPI_IN_PLACE, &value, 1, MPI_LONG,
                               MPI_SUM, MPI_COMM_WORLD, &request);
        });
    value = (my_rank_ == 0 ? initial : value + initial);
}
void Group::BroadcastLong(long& value, size_t origin) {
    LOG << "Group::Broadcast(long);";
    WaitForRequest(
        [&](MPI_Request& request) {
            return MPI_Ibcast(&value, 1, MPI_LONG, origin,
                              MPI_COMM_WORLD, &request);
        });
}
void Group::AllReducePlusLong(long& value) {
    LOG << "Group::AllReducePlus(long);";
    WaitForRequest(
        [&](MPI_Request& request) {
            return MPI_Iallreduce(
                MPI_IN_PLACE, &value, 1, MPI_LONG,
                MPI_SUM, MPI_COMM_WORLD, &request);
        });
}
void Group::AllReduceMinimumLong(long& value) {
    LOG << "Group::AllReduceMinimum(long);";
    WaitForRequest(
        [&](MPI_Request& request) {
            return MPI_Iallreduce(
                MPI_IN_PLACE, &value, 1, MPI_LONG,
                MPI_MIN, MPI_COMM_WORLD, &request);
        });
}
void Group::AllReduceMaximumLong(long& value) {
    LOG << "Group::AllReduceMaximum(long);";
    WaitForRequest(
        [&](MPI_Request& request) {
            return MPI_Iallreduce(
                MPI_IN_PLACE, &value, 1, MPI_LONG,
                MPI_MAX, MPI_COMM_WORLD, &request);
        });
}
void Group::PrefixSumPlusUnsignedLong(unsigned long& value, const unsigned long& initial) {
    LOG << "Group::PrefixSumPlus(unsigned long);";
    WaitForRequest(
        [&](MPI_Request& request) {
            return MPI_Iscan(MPI_IN_PLACE, &value, 1, MPI_UNSIGNED_LONG,
                             MPI_SUM, MPI_COMM_WORLD, &request);
        });
    value += initial;
}
void Group::ExPrefixSumPlusUnsignedLong(unsigned long& value, const unsigned long& initial) {
    LOG << "Group::ExPrefixSumPlus(unsigned long);";
    WaitForRequest(
        [&](MPI_Request& request) {
            return MPI_Iexscan(MPI_IN_PLACE, &value, 1, MPI_UNSIGNED_LONG,
                               MPI_SUM, MPI_COMM_WORLD, &request);
        });
    value = (my_rank_ == 0 ? initial : value + initial);
}
void Group::BroadcastUnsignedLong(unsigned long& value, size_t origin) {
    LOG << "Group::Broadcast(unsigned long);";
    WaitForRequest(
        [&](MPI_Request& request) {
            return MPI_Ibcast(&value, 1, MPI_UNSIGNED_LONG, origin,
                              MPI_COMM_WORLD, &request);
        });
}
void Group::AllReducePlusUnsignedLong(unsigned long& value) {
    LOG << "Group::AllReducePlus(unsigned long);";
    WaitForRequest(
        [&](MPI_Request& request) {
            return MPI_Iallreduce(
                MPI_IN_PLACE, &value, 1, MPI_UNSIGNED_LONG,
                MPI_SUM, MPI_COMM_WORLD, &request);
        });
}
void Group::AllReduceMinimumUnsignedLong(unsigned long& value) {
    LOG << "Group::AllReduceMinimum(unsigned long);";
    WaitForRequest(
        [&](MPI_Request& request) {
            return MPI_Iallreduce(
                MPI_IN_PLACE, &value, 1, MPI_UNSIGNED_LONG,
                MPI_MIN, MPI_COMM_WORLD, &request);
        });
}
void Group::AllReduceMaximumUnsignedLong(unsigned long& value) {
    LOG << "Group::AllReduceMaximum(unsigned long);";
    WaitForRequest(
        [&](MPI_Request& request) {
            return MPI_Iallreduce(
                MPI_IN_PLACE, &value, 1, MPI_UNSIGNED_LONG,
                MPI_MAX, MPI_COMM_WORLD, &request);
        });
}
void Group::PrefixSumPlusLongLong(long long& value, const long long& initial) {
    LOG << "Group::PrefixSumPlus(long long);";
    WaitForRequest(
        [&](MPI_Request& request) {
            return MPI_Iscan(MPI_IN_PLACE, &value, 1, MPI_LONG_LONG,
                             MPI_SUM, MPI_COMM_WORLD, &request);
        });
    value += initial;
}
void Group::ExPrefixSumPlusLongLong(long long& value, const long long& initial) {
    LOG << "Group::ExPrefixSumPlus(long long);";
    WaitForRequest(
        [&](MPI_Request& request) {
            return MPI_Iexscan(MPI_IN_PLACE, &value, 1, MPI_LONG_LONG,
                               MPI_SUM, MPI_COMM_WORLD, &request);
        });
    value = (my_rank_ == 0 ? initial : value + initial);
}
void Group::BroadcastLongLong(long long& value, size_t origin) {
    LOG << "Group::Broadcast(long long);";
    WaitForRequest(
        [&](MPI_Request& request) {
            return MPI_Ibcast(&value, 1, MPI_LONG_LONG, origin,
                              MPI_COMM_WORLD, &request);
        });
}
void Group::AllReducePlusLongLong(long long& value) {
    LOG << "Group::AllReducePlus(long long);";
    WaitForRequest(
        [&](MPI_Request& request) {
            return MPI_Iallreduce(
                MPI_IN_PLACE, &value, 1, MPI_LONG_LONG,
                MPI_SUM, MPI_COMM_WORLD, &request);
        });
}
void Group::AllReduceMinimumLongLong(long long& value) {
    LOG << "Group::AllReduceMinimum(long long);";
    WaitForRequest(
        [&](MPI_Request& request) {
            return MPI_Iallreduce(
                MPI_IN_PLACE, &value, 1, MPI_LONG_LONG,
                MPI_MIN, MPI_COMM_WORLD, &request);
        });
}
void Group::AllReduceMaximumLongLong(long long& value) {
    LOG << "Group::AllReduceMaximum(long long);";
    WaitForRequest(
        [&](MPI_Request& request) {
            return MPI_Iallreduce(
                MPI_IN_PLACE, &value, 1, MPI_LONG_LONG,
                MPI_MAX, MPI_COMM_WORLD, &request);
        });
}
void Group::PrefixSumPlusUnsignedLongLong(unsigned long long& value, const unsigned long long& initial) {
    LOG << "Group::PrefixSumPlus(unsigned long long);";
    WaitForRequest(
        [&](MPI_Request& request) {
            return MPI_Iscan(MPI_IN_PLACE, &value, 1, MPI_UNSIGNED_LONG_LONG,
                             MPI_SUM, MPI_COMM_WORLD, &request);
        });
    value += initial;
}
void Group::ExPrefixSumPlusUnsignedLongLong(unsigned long long& value, const unsigned long long& initial) {
    LOG << "Group::ExPrefixSumPlus(unsigned long long);";
    WaitForRequest(
        [&](MPI_Request& request) {
            return MPI_Iexscan(MPI_IN_PLACE, &value, 1, MPI_UNSIGNED_LONG_LONG,
                               MPI_SUM, MPI_COMM_WORLD, &request);
        });
    value = (my_rank_ == 0 ? initial : value + initial);
}
void Group::BroadcastUnsignedLongLong(unsigned long long& value, size_t origin) {
    LOG << "Group::Broadcast(unsigned long long);";
    WaitForRequest(
        [&](MPI_Request& request) {
            return MPI_Ibcast(&value, 1, MPI_UNSIGNED_LONG_LONG, origin,
                              MPI_COMM_WORLD, &request);
        });
}
void Group::AllReducePlusUnsignedLongLong(unsigned long long& value) {
    LOG << "Group::AllReducePlus(unsigned long long);";
    WaitForRequest(
        [&](MPI_Request& request) {
            return MPI_Iallreduce(
                MPI_IN_PLACE, &value, 1, MPI_UNSIGNED_LONG_LONG,
                MPI_SUM, MPI_COMM_WORLD, &request);
        });
}
void Group::AllReduceMinimumUnsignedLongLong(unsigned long long& value) {
    LOG << "Group::AllReduceMinimum(unsigned long long);";
    WaitForRequest(
        [&](MPI_Request& request) {
            return MPI_Iallreduce(
                MPI_IN_PLACE, &value, 1, MPI_UNSIGNED_LONG_LONG,
                MPI_MIN, MPI_COMM_WORLD, &request);
        });
}
void Group::AllReduceMaximumUnsignedLongLong(unsigned long long& value) {
    LOG << "Group::AllReduceMaximum(unsigned long long);";
    WaitForRequest(
        [&](MPI_Request& request) {
            return MPI_Iallreduce(
                MPI_IN_PLACE, &value, 1, MPI_UNSIGNED_LONG_LONG,
                MPI_MAX, MPI_COMM_WORLD, &request);
        });
}
// [[[end]]]

/******************************************************************************/
// mpi::Construct

//! atexit() method to deinitialize the MPI library.
static inline void Deinitialize() {
    std::unique_lock<std::mutex> lock(g_mutex);

    MPI_Finalize();
}

//! run MPI_Init() if not already done (can be called multiple times).
static inline void Initialize() {

    int flag;
    int r = MPI_Initialized(&flag);
    if (r != MPI_SUCCESS)
        throw Exception("Error during MPI_Initialized()", r);

    if (!flag) {
        // fake command line
        int argc = 1;
        const char* argv[] = { "thrill", nullptr };

        int provided;
        int r = MPI_Init_thread(&argc, reinterpret_cast<char***>(&argv),
                                MPI_THREAD_SERIALIZED, &provided);
        if (r != MPI_SUCCESS)
            throw Exception("Error during MPI_Init_thread()", r);

        if (provided < MPI_THREAD_SERIALIZED)
            die("ERROR: MPI_Init_thread() only provided= " << provided);

        // register atexit method
        atexit(&Deinitialize);
    }
}

/*!
 * Construct Group which connects to peers using MPI. As the MPI environment
 * already defines the connections, no hosts or parameters can be
 * given. Constructs group_count mpi::Group objects at once. Within each Group
 * this host has its MPI rank.
 *
 * Returns true if this Thrill host participates in the Group.
 */
bool Construct(size_t group_size, DispatcherThread& dispatcher,
               std::unique_ptr<Group>* groups, size_t group_count) {
    std::unique_lock<std::mutex> lock(g_mutex);

    Initialize();

    int my_rank;
    int r = MPI_Comm_rank(MPI_COMM_WORLD, &my_rank);
    if (r != MPI_SUCCESS)
        throw Exception("Error during MPI_Comm_rank()", r);

    int num_mpi_hosts;
    r = MPI_Comm_size(MPI_COMM_WORLD, &num_mpi_hosts);
    if (r != MPI_SUCCESS)
        throw Exception("Error during MPI_Comm_size()", r);

    if (group_size > static_cast<size_t>(num_mpi_hosts))
        throw Exception("mpi::Construct(): fewer MPI processes than hosts requested.");

    for (size_t i = 0; i < group_count; i++) {
        groups[i] = std::make_unique<Group>(my_rank, i, group_size, dispatcher);
    }

    return (static_cast<size_t>(my_rank) < group_size);
}
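
// Illustrative usage sketch (not part of the original file): a hedged example of
// how a backend might call mpi::Construct(), assuming a DispatcherThread named
// `dispatcher` has already been set up elsewhere; its construction is elided and
// everything not defined in this file should be read as an assumption.
//
//   std::vector<std::unique_ptr<mpi::Group> > groups(1);
//   if (mpi::Construct(mpi::NumMpiProcesses(), dispatcher,
//                      groups.data(), /* group_count */ 1)) {
//       // this MPI rank participates and may communicate through *groups[0]
//   }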

size_t NumMpiProcesses() {
    std::unique_lock<std::mutex> lock(g_mutex);

    Initialize();

    int num_mpi_hosts;
    int r = MPI_Comm_size(MPI_COMM_WORLD, &num_mpi_hosts);
    if (r != MPI_SUCCESS)
        throw Exception("Error during MPI_Comm_size()", r);

    return static_cast<size_t>(num_mpi_hosts);
}

size_t MpiRank() {
    std::unique_lock<std::mutex> lock(g_mutex);

    Initialize();

    int mpi_rank;
    int r = MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank);
    if (r != MPI_SUCCESS)
        throw Exception("Error during MPI_Comm_rank()", r);

    return static_cast<size_t>(mpi_rank);
}

} // namespace mpi
} // namespace net
} // namespace thrill

/******************************************************************************/