// mpi::Exception: construct an exception message that carries the MPI error string
Exception::Exception(const std::string& what, int error_code)
    : Exception(what + ": " + GetErrorString(error_code)) { }

// return the MPI error string for an error code
std::string Exception::GetErrorString(int error_code) {
    char string[MPI_MAX_ERROR_STRING];
    int resultlen;
    MPI_Error_string(error_code, string, &resultlen);
    return std::string(string, resultlen);
}
// mpi::Connection
std::ostream& Connection::OutputOstream(std::ostream& os) const {
    return os << "[mpi::Connection"
              << " group_tag_=" << group_->group_tag()
              << " peer_=" << peer_
              << "]";
}
void Connection::SyncSend(const void* data, size_t size, Flags /* flags */) {
    LOG << "SyncSend()"
        << " data=" << data << " size=" << size
        << " peer_=" << peer_
        << " group_tag_=" << group_->group_tag();

    std::atomic<bool> done { false };
    // post a non-blocking send in the dispatcher thread and register a
    // completion callback for the returned request
    group_->dispatcher().RunInThread(
        [&](Dispatcher& disp) {
            MPI_Request request =
                disp.ISend(*this, 0, data, size);
            disp.AddAsyncRequest(
                request, [&done](MPI_Status&) { done = true; });
        });

    // wait (yielding) until the dispatcher has completed the send
    while (!done)
        std::this_thread::yield();

    tx_bytes_ += size;
}
void Connection::SyncRecv(void* out_data, size_t size) {
    LOG << "SyncRecv()"
        << " out_data=" << out_data << " size=" << size
        << " peer_=" << peer_
        << " group_tag_=" << group_->group_tag();

    std::atomic<bool> done { false };
    // post a non-blocking receive in the dispatcher thread; the completion
    // callback checks that the full message arrived
    group_->dispatcher().RunInThread(
        [&](Dispatcher& disp) {
            MPI_Request request =
                disp.IRecv(*this, 0, out_data, size);
            disp.AddAsyncRequest(
                request, [&done, size](MPI_Status& status) {
                    int count;
                    int r = MPI_Get_count(&status, MPI_BYTE, &count);
                    if (r != MPI_SUCCESS)
                        throw Exception("Error during MPI_Get_count()", r);
                    if (static_cast<size_t>(count) != size)
                        throw Exception("Error during SyncRecv(): message truncated?");
                    done = true;
                });
        });

    while (!done)
        std::this_thread::yield();

    rx_bytes_ += size;
}
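// Illustrative usage sketch (not from the original file): a blocking
// ping-pong over an mpi::Connection. `conn` is assumed to be the connection
// to the peer host and `is_initiator` selects which side sends first; the
// names are invented for this example.
void PingPong(thrill::mpi::Connection& conn, bool is_initiator) {
    unsigned token = 0;
    if (is_initiator) {
        conn.SyncSend(&token, sizeof(token));
        conn.SyncRecv(&token, sizeof(token));  // expect the incremented token back
    }
    else {
        conn.SyncRecv(&token, sizeof(token));
        ++token;
        conn.SyncSend(&token, sizeof(token));
    }
}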
void Connection::SyncSendRecv(const void* send_data, size_t send_size,
                              void* recv_data, size_t recv_size) {
    LOG << "SyncSendRecv()"
        << " send_data=" << send_data << " send_size=" << send_size
        << " recv_data=" << recv_data << " recv_size=" << recv_size
        << " peer_=" << peer_
        << " group_tag_=" << group_->group_tag();

    std::atomic<size_t> done { 0 };
    // post the non-blocking send and receive together in the dispatcher thread
    group_->dispatcher().RunInThread(
        [&](Dispatcher& disp) {
            MPI_Request send_request =
                disp.ISend(*this, 0, send_data, send_size);
            MPI_Request recv_request =
                disp.IRecv(*this, 0, recv_data, recv_size);

            disp.AddAsyncRequest(
                send_request, [&done](MPI_Status&) { ++done; });
            disp.AddAsyncRequest(
                recv_request, [&done, recv_size](MPI_Status& status) {
                    int count;
                    int r = MPI_Get_count(&status, MPI_BYTE, &count);
                    if (r != MPI_SUCCESS)
                        throw Exception("Error during MPI_Get_count()", r);
                    if (static_cast<size_t>(count) != recv_size)
                        throw Exception("Error during SyncSendRecv(): message truncated?");
                    ++done;
                });
        });

    // wait until both the send and the receive have completed
    while (done != 2)
        std::this_thread::yield();

    tx_bytes_ += send_size;
    rx_bytes_ += recv_size;
}
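// Illustrative sketch (not from the original file): a symmetric exchange
// helper. Because SyncSendRecv() posts the send and the receive as a pair of
// non-blocking requests, both peers may call it in the same order without
// deadlocking; ExchangeCounters is an invented name.
void ExchangeCounters(thrill::mpi::Connection& conn,
                      const unsigned long& mine, unsigned long& theirs) {
    conn.SyncSendRecv(&mine, sizeof(mine), &theirs, sizeof(theirs));
}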
void Connection::SyncRecvSend(const void* send_data, size_t send_size,
                              void* recv_data, size_t recv_size) {
    // both transfers are posted asynchronously anyway, so the recv-first
    // variant simply forwards to SyncSendRecv()
    SyncSendRecv(send_data, send_size, recv_data, recv_size);
}
// mpi::Group
std::unique_ptr<net::Dispatcher> Group::ConstructDispatcher() const {
    // construct a Dispatcher sized for this Group's number of hosts
    return std::make_unique<Dispatcher>(num_hosts());
}
void Group::Barrier() {
    std::atomic<bool> done { false };
    // issue MPI_Ibarrier() in the dispatcher thread while holding the MPI
    // mutex, then wait for the request to complete
    dispatcher_.RunInThread(
        [&](Dispatcher& disp) {
            std::unique_lock<std::mutex> lock(g_mutex);
            MPI_Request request;
            int r = MPI_Ibarrier(MPI_COMM_WORLD, &request);
            if (r != MPI_SUCCESS)
                throw Exception("Error during MPI_Ibarrier()", r);
            disp.AddAsyncRequest(
                request, [&done](MPI_Status&) { done = true; });
        });

    while (!done)
        std::this_thread::yield();
}
// run an MPI call (wrapped in a lambda that fills an MPI_Request) in the
// dispatcher thread, then block the calling thread until the request completes
template <typename MpiCall>
void Group::WaitForRequest(MpiCall call) {
    std::atomic<bool> done { false };
    dispatcher_.RunInThread(
        [&](Dispatcher& disp) {
            std::unique_lock<std::mutex> lock(g_mutex);
            MPI_Request request;
            int r = call(request);
            if (r != MPI_SUCCESS)
                throw Exception("Error during WaitForRequest", r);
            disp.AddAsyncRequest(
                request, [&done](MPI_Status&) { done = true; });
        });

    while (!done)
        std::this_thread::yield();
}
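// Illustrative sketch (not part of the original file): WaitForRequest() turns
// any non-blocking MPI call that fills an MPI_Request into a blocking Group
// operation. A hypothetical all-gather of one int per host could look like
// this; AllGatherInt is an invented name and would also need a declaration in
// the Group class.
void Group::AllGatherInt(int& mine, std::vector<int>& out) {
    out.resize(num_hosts());
    WaitForRequest(
        [&](MPI_Request& request) {
            return MPI_Iallgather(&mine, 1, MPI_INT,
                                  out.data(), 1, MPI_INT,
                                  MPI_COMM_WORLD, &request);
        });
}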
// Group collectives: each typed overload maps onto the matching non-blocking
// MPI collective and blocks via WaitForRequest().

void Group::PrefixSumPlusInt(int& value, const int& initial) {
    LOG << "Group::PrefixSumPlus(int);";
    WaitForRequest([&](MPI_Request& request) {
            return MPI_Iscan(MPI_IN_PLACE, &value, 1, MPI_INT,
                             MPI_SUM, MPI_COMM_WORLD, &request);
        });
    value += initial;
}

void Group::ExPrefixSumPlusInt(int& value, const int& initial) {
    LOG << "Group::ExPrefixSumPlus(int);";
    WaitForRequest([&](MPI_Request& request) {
            return MPI_Iexscan(MPI_IN_PLACE, &value, 1, MPI_INT,
                               MPI_SUM, MPI_COMM_WORLD, &request);
        });
    // MPI_Iexscan leaves rank 0's buffer undefined: seed it with initial
    value = (my_rank_ == 0 ? initial : value + initial);
}

void Group::BroadcastInt(int& value, size_t origin) {
    LOG << "Group::Broadcast(int);";
    WaitForRequest([&](MPI_Request& request) {
            return MPI_Ibcast(&value, 1, MPI_INT, origin,
                              MPI_COMM_WORLD, &request);
        });
}

void Group::AllReducePlusInt(int& value) {
    LOG << "Group::AllReducePlus(int);";
    WaitForRequest([&](MPI_Request& request) {
            return MPI_Iallreduce(MPI_IN_PLACE, &value, 1, MPI_INT,
                                  MPI_SUM, MPI_COMM_WORLD, &request);
        });
}

void Group::AllReduceMinimumInt(int& value) {
    LOG << "Group::AllReduceMinimum(int);";
    WaitForRequest([&](MPI_Request& request) {
            return MPI_Iallreduce(MPI_IN_PLACE, &value, 1, MPI_INT,
                                  MPI_MIN, MPI_COMM_WORLD, &request);
        });
}

void Group::AllReduceMaximumInt(int& value) {
    LOG << "Group::AllReduceMaximum(int);";
    WaitForRequest([&](MPI_Request& request) {
            return MPI_Iallreduce(MPI_IN_PLACE, &value, 1, MPI_INT,
                                  MPI_MAX, MPI_COMM_WORLD, &request);
        });
}
void Group::PrefixSumPlusUnsignedInt(unsigned int& value, const unsigned int& initial) {
    LOG << "Group::PrefixSumPlus(unsigned int);";
    WaitForRequest([&](MPI_Request& request) {
            return MPI_Iscan(MPI_IN_PLACE, &value, 1, MPI_UNSIGNED,
                             MPI_SUM, MPI_COMM_WORLD, &request);
        });
    value += initial;
}

void Group::ExPrefixSumPlusUnsignedInt(unsigned int& value, const unsigned int& initial) {
    LOG << "Group::ExPrefixSumPlus(unsigned int);";
    WaitForRequest([&](MPI_Request& request) {
            return MPI_Iexscan(MPI_IN_PLACE, &value, 1, MPI_UNSIGNED,
                               MPI_SUM, MPI_COMM_WORLD, &request);
        });
    value = (my_rank_ == 0 ? initial : value + initial);
}

void Group::BroadcastUnsignedInt(unsigned int& value, size_t origin) {
    LOG << "Group::Broadcast(unsigned int);";
    WaitForRequest([&](MPI_Request& request) {
            return MPI_Ibcast(&value, 1, MPI_UNSIGNED, origin,
                              MPI_COMM_WORLD, &request);
        });
}

void Group::AllReducePlusUnsignedInt(unsigned int& value) {
    LOG << "Group::AllReducePlus(unsigned int);";
    WaitForRequest([&](MPI_Request& request) {
            return MPI_Iallreduce(MPI_IN_PLACE, &value, 1, MPI_UNSIGNED,
                                  MPI_SUM, MPI_COMM_WORLD, &request);
        });
}

void Group::AllReduceMinimumUnsignedInt(unsigned int& value) {
    LOG << "Group::AllReduceMinimum(unsigned int);";
    WaitForRequest([&](MPI_Request& request) {
            return MPI_Iallreduce(MPI_IN_PLACE, &value, 1, MPI_UNSIGNED,
                                  MPI_MIN, MPI_COMM_WORLD, &request);
        });
}

void Group::AllReduceMaximumUnsignedInt(unsigned int& value) {
    LOG << "Group::AllReduceMaximum(unsigned int);";
    WaitForRequest([&](MPI_Request& request) {
            return MPI_Iallreduce(MPI_IN_PLACE, &value, 1, MPI_UNSIGNED,
                                  MPI_MAX, MPI_COMM_WORLD, &request);
        });
}
void Group::PrefixSumPlusLong(long& value, const long& initial) {
    LOG << "Group::PrefixSumPlus(long);";
    WaitForRequest([&](MPI_Request& request) {
            return MPI_Iscan(MPI_IN_PLACE, &value, 1, MPI_LONG,
                             MPI_SUM, MPI_COMM_WORLD, &request);
        });
    value += initial;
}

void Group::ExPrefixSumPlusLong(long& value, const long& initial) {
    LOG << "Group::ExPrefixSumPlus(long);";
    WaitForRequest([&](MPI_Request& request) {
            return MPI_Iexscan(MPI_IN_PLACE, &value, 1, MPI_LONG,
                               MPI_SUM, MPI_COMM_WORLD, &request);
        });
    value = (my_rank_ == 0 ? initial : value + initial);
}

void Group::BroadcastLong(long& value, size_t origin) {
    LOG << "Group::Broadcast(long);";
    WaitForRequest([&](MPI_Request& request) {
            return MPI_Ibcast(&value, 1, MPI_LONG, origin,
                              MPI_COMM_WORLD, &request);
        });
}

void Group::AllReducePlusLong(long& value) {
    LOG << "Group::AllReducePlus(long);";
    WaitForRequest([&](MPI_Request& request) {
            return MPI_Iallreduce(MPI_IN_PLACE, &value, 1, MPI_LONG,
                                  MPI_SUM, MPI_COMM_WORLD, &request);
        });
}

void Group::AllReduceMinimumLong(long& value) {
    LOG << "Group::AllReduceMinimum(long);";
    WaitForRequest([&](MPI_Request& request) {
            return MPI_Iallreduce(MPI_IN_PLACE, &value, 1, MPI_LONG,
                                  MPI_MIN, MPI_COMM_WORLD, &request);
        });
}

void Group::AllReduceMaximumLong(long& value) {
    LOG << "Group::AllReduceMaximum(long);";
    WaitForRequest([&](MPI_Request& request) {
            return MPI_Iallreduce(MPI_IN_PLACE, &value, 1, MPI_LONG,
                                  MPI_MAX, MPI_COMM_WORLD, &request);
        });
}
void Group::PrefixSumPlusUnsignedLong(unsigned long& value, const unsigned long& initial) {
    LOG << "Group::PrefixSumPlus(unsigned long);";
    WaitForRequest([&](MPI_Request& request) {
            return MPI_Iscan(MPI_IN_PLACE, &value, 1, MPI_UNSIGNED_LONG,
                             MPI_SUM, MPI_COMM_WORLD, &request);
        });
    value += initial;
}

void Group::ExPrefixSumPlusUnsignedLong(unsigned long& value, const unsigned long& initial) {
    LOG << "Group::ExPrefixSumPlus(unsigned long);";
    WaitForRequest([&](MPI_Request& request) {
            return MPI_Iexscan(MPI_IN_PLACE, &value, 1, MPI_UNSIGNED_LONG,
                               MPI_SUM, MPI_COMM_WORLD, &request);
        });
    value = (my_rank_ == 0 ? initial : value + initial);
}

void Group::BroadcastUnsignedLong(unsigned long& value, size_t origin) {
    LOG << "Group::Broadcast(unsigned long);";
    WaitForRequest([&](MPI_Request& request) {
            return MPI_Ibcast(&value, 1, MPI_UNSIGNED_LONG, origin,
                              MPI_COMM_WORLD, &request);
        });
}

void Group::AllReducePlusUnsignedLong(unsigned long& value) {
    LOG << "Group::AllReducePlus(unsigned long);";
    WaitForRequest([&](MPI_Request& request) {
            return MPI_Iallreduce(MPI_IN_PLACE, &value, 1, MPI_UNSIGNED_LONG,
                                  MPI_SUM, MPI_COMM_WORLD, &request);
        });
}

void Group::AllReduceMinimumUnsignedLong(unsigned long& value) {
    LOG << "Group::AllReduceMinimum(unsigned long);";
    WaitForRequest([&](MPI_Request& request) {
            return MPI_Iallreduce(MPI_IN_PLACE, &value, 1, MPI_UNSIGNED_LONG,
                                  MPI_MIN, MPI_COMM_WORLD, &request);
        });
}

void Group::AllReduceMaximumUnsignedLong(unsigned long& value) {
    LOG << "Group::AllReduceMaximum(unsigned long);";
    WaitForRequest([&](MPI_Request& request) {
            return MPI_Iallreduce(MPI_IN_PLACE, &value, 1, MPI_UNSIGNED_LONG,
                                  MPI_MAX, MPI_COMM_WORLD, &request);
        });
}
void Group::PrefixSumPlusLongLong(long long& value, const long long& initial) {
    LOG << "Group::PrefixSumPlus(long long);";
    WaitForRequest([&](MPI_Request& request) {
            return MPI_Iscan(MPI_IN_PLACE, &value, 1, MPI_LONG_LONG,
                             MPI_SUM, MPI_COMM_WORLD, &request);
        });
    value += initial;
}

void Group::ExPrefixSumPlusLongLong(long long& value, const long long& initial) {
    LOG << "Group::ExPrefixSumPlus(long long);";
    WaitForRequest([&](MPI_Request& request) {
            return MPI_Iexscan(MPI_IN_PLACE, &value, 1, MPI_LONG_LONG,
                               MPI_SUM, MPI_COMM_WORLD, &request);
        });
    value = (my_rank_ == 0 ? initial : value + initial);
}

void Group::BroadcastLongLong(long long& value, size_t origin) {
    LOG << "Group::Broadcast(long long);";
    WaitForRequest([&](MPI_Request& request) {
            return MPI_Ibcast(&value, 1, MPI_LONG_LONG, origin,
                              MPI_COMM_WORLD, &request);
        });
}

void Group::AllReducePlusLongLong(long long& value) {
    LOG << "Group::AllReducePlus(long long);";
    WaitForRequest([&](MPI_Request& request) {
            return MPI_Iallreduce(MPI_IN_PLACE, &value, 1, MPI_LONG_LONG,
                                  MPI_SUM, MPI_COMM_WORLD, &request);
        });
}

void Group::AllReduceMinimumLongLong(long long& value) {
    LOG << "Group::AllReduceMinimum(long long);";
    WaitForRequest([&](MPI_Request& request) {
            return MPI_Iallreduce(MPI_IN_PLACE, &value, 1, MPI_LONG_LONG,
                                  MPI_MIN, MPI_COMM_WORLD, &request);
        });
}

void Group::AllReduceMaximumLongLong(long long& value) {
    LOG << "Group::AllReduceMaximum(long long);";
    WaitForRequest([&](MPI_Request& request) {
            return MPI_Iallreduce(MPI_IN_PLACE, &value, 1, MPI_LONG_LONG,
                                  MPI_MAX, MPI_COMM_WORLD, &request);
        });
}
void Group::PrefixSumPlusUnsignedLongLong(unsigned long long& value, const unsigned long long& initial) {
    LOG << "Group::PrefixSumPlus(unsigned long long);";
    WaitForRequest([&](MPI_Request& request) {
            return MPI_Iscan(MPI_IN_PLACE, &value, 1, MPI_UNSIGNED_LONG_LONG,
                             MPI_SUM, MPI_COMM_WORLD, &request);
        });
    value += initial;
}

void Group::ExPrefixSumPlusUnsignedLongLong(unsigned long long& value, const unsigned long long& initial) {
    LOG << "Group::ExPrefixSumPlus(unsigned long long);";
    WaitForRequest([&](MPI_Request& request) {
            return MPI_Iexscan(MPI_IN_PLACE, &value, 1, MPI_UNSIGNED_LONG_LONG,
                               MPI_SUM, MPI_COMM_WORLD, &request);
        });
    value = (my_rank_ == 0 ? initial : value + initial);
}

void Group::BroadcastUnsignedLongLong(unsigned long long& value, size_t origin) {
    LOG << "Group::Broadcast(unsigned long long);";
    WaitForRequest([&](MPI_Request& request) {
            return MPI_Ibcast(&value, 1, MPI_UNSIGNED_LONG_LONG, origin,
                              MPI_COMM_WORLD, &request);
        });
}

void Group::AllReducePlusUnsignedLongLong(unsigned long long& value) {
    LOG << "Group::AllReducePlus(unsigned long long);";
    WaitForRequest([&](MPI_Request& request) {
            return MPI_Iallreduce(MPI_IN_PLACE, &value, 1, MPI_UNSIGNED_LONG_LONG,
                                  MPI_SUM, MPI_COMM_WORLD, &request);
        });
}

void Group::AllReduceMinimumUnsignedLongLong(unsigned long long& value) {
    LOG << "Group::AllReduceMinimum(unsigned long long);";
    WaitForRequest([&](MPI_Request& request) {
            return MPI_Iallreduce(MPI_IN_PLACE, &value, 1, MPI_UNSIGNED_LONG_LONG,
                                  MPI_MIN, MPI_COMM_WORLD, &request);
        });
}

void Group::AllReduceMaximumUnsignedLongLong(unsigned long long& value) {
    LOG << "Group::AllReduceMaximum(unsigned long long);";
    WaitForRequest([&](MPI_Request& request) {
            return MPI_Iallreduce(MPI_IN_PLACE, &value, 1, MPI_UNSIGNED_LONG_LONG,
                                  MPI_MAX, MPI_COMM_WORLD, &request);
        });
}
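// Illustrative usage sketch (not part of the original file): the typed
// collectives make all hosts agree on aggregate values. The helper name and
// its embedding are invented; an already constructed mpi::Group is assumed.
void ReportItemRange(thrill::mpi::Group& group, unsigned long local_items) {
    unsigned long total = local_items;
    group.AllReducePlusUnsignedLong(total);          // global sum over all hosts
    unsigned long begin = local_items;
    group.ExPrefixSumPlusUnsignedLong(begin, 0);     // exclusive prefix sum
    // this host owns the global index range [begin, begin + local_items)
    // out of `total` items
}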
//! atexit() method to deinitialize the MPI library.
static void Deinitialize() {
    std::unique_lock<std::mutex> lock(g_mutex);

    MPI_Finalize();
}

//! run MPI_Init() if not already done (can be called multiple times).
static void Initialize() {
    int flag;
    int r = MPI_Initialized(&flag);
    if (r != MPI_SUCCESS)
        throw Exception("Error during MPI_Initialized()", r);

    if (!flag) {
        // fake command line for MPI_Init_thread()
        int argc = 1;
        const char* argv[] = { "thrill", nullptr };

        int provided;
        int r = MPI_Init_thread(&argc, reinterpret_cast<char***>(&argv),
                                MPI_THREAD_SERIALIZED, &provided);
        if (r != MPI_SUCCESS)
            throw Exception("Error during MPI_Init_thread()", r);

        if (provided < MPI_THREAD_SERIALIZED)
            die("ERROR: MPI_Init_thread() only provided= " << provided);

        // deinitialize MPI at program exit
        atexit(&Deinitialize);
    }
}
//! Construct Group which connects to peers using MPI.
bool Construct(size_t group_size, DispatcherThread& dispatcher,
               std::unique_ptr<Group>* groups, size_t group_count) {
    std::unique_lock<std::mutex> lock(g_mutex);

    // ensure MPI is initialized (Initialize() may be called multiple times)
    Initialize();

    int my_rank;
    int r = MPI_Comm_rank(MPI_COMM_WORLD, &my_rank);
    if (r != MPI_SUCCESS)
        throw Exception("Error during MPI_Comm_rank()", r);

    int num_mpi_hosts;
    r = MPI_Comm_size(MPI_COMM_WORLD, &num_mpi_hosts);
    if (r != MPI_SUCCESS)
        throw Exception("Error during MPI_Comm_size()", r);

    if (group_size > static_cast<size_t>(num_mpi_hosts))
        throw Exception(
            "mpi::Construct(): fewer MPI processes than hosts requested.");

    // construct group_count Groups, all sharing the same dispatcher thread
    for (size_t i = 0; i < group_count; i++) {
        groups[i] = std::make_unique<Group>(my_rank, i, group_size, dispatcher);
    }

    // only MPI ranks below group_size participate in the groups
    return (static_cast<size_t>(my_rank) < group_size);
}
//! Return the number of MPI processes.
size_t NumMpiProcesses() {
    std::unique_lock<std::mutex> lock(g_mutex);

    Initialize();

    int num_mpi_hosts;
    int r = MPI_Comm_size(MPI_COMM_WORLD, &num_mpi_hosts);
    if (r != MPI_SUCCESS)
        throw Exception("Error during MPI_Comm_size()", r);

    return static_cast<size_t>(num_mpi_hosts);
}
//! Return the rank of this process in MPI_COMM_WORLD.
size_t MpiRank() {
    std::unique_lock<std::mutex> lock(g_mutex);

    Initialize();

    int mpi_rank;
    int r = MPI_Comm_rank(MPI_COMM_WORLD, &mpi_rank);
    if (r != MPI_SUCCESS)
        throw Exception("Error during MPI_Comm_rank()", r);

    return static_cast<size_t>(mpi_rank);
}
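// Illustrative usage sketch (not part of the original file): a hypothetical
// setup routine around mpi::Construct(). It assumes DispatcherThread refers to
// thrill::net::DispatcherThread and that a single Group is wanted per MPI
// process; all names below are invented for the example.
bool SetupMpiGroup(size_t group_size,
                   thrill::net::DispatcherThread& dispatcher,
                   std::unique_ptr<thrill::mpi::Group>& group) {
    // clamp the requested group size to the number of available MPI processes
    if (group_size > thrill::mpi::NumMpiProcesses())
        group_size = thrill::mpi::NumMpiProcesses();

    if (!thrill::mpi::Construct(group_size, dispatcher, &group, /* group_count */ 1))
        return false;   // this MPI rank (>= group_size) does not participate

    return true;
}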
void SyncSendRecv(const void *send_data, size_t send_size, void *recv_data, size_t recv_size) final
void BroadcastLongLong(long long &value, size_t origin) final
void AllReduceMinimumLongLong(long long &value) final
void AllReducePlusLong(long &value) final
void AllReduceMinimumUnsignedInt(unsigned int &value) final
MPI_Request IRecv(Connection &c, uint32_t seq, void *data, size_t size)
void AllReduceMaximumUnsignedInt(unsigned int &value) final
void AddAsyncRequest(const MPI_Request &req, const AsyncRequestCallback &callback)
void AllReducePlusUnsignedLongLong(unsigned long long &value) final
void PrefixSumPlusLong(long &value, const long &initial) final
void BroadcastUnsignedLong(unsigned long &value, size_t origin) final
void AllReduceMaximumLong(long &value) final
void PrefixSumPlusUnsignedInt(unsigned int &value, const unsigned int &initial) final
A derived exception class which looks up MPI error strings.
void ExPrefixSumPlusLongLong(long long &value, const long long &initial) final
void AllReduceMinimumUnsignedLong(unsigned long &value) final
void PrefixSumPlusUnsignedLongLong(unsigned long long &value, const unsigned long long &initial) final
std::ostream & OutputOstream(std::ostream &os) const final
virtual method to output to a std::ostream
#define die(msg)
Instead of std::terminate(), throw the message as an exception.
void AllReduceMinimumLong(long &value) final
void AllReducePlusUnsignedLong(unsigned long &value) final
static by_string to_string(int val)
convert to string
void BroadcastUnsignedLongLong(unsigned long long &value, size_t origin) final
std::string ToString() const final
mpi::Connection
void AllReduceMinimumInt(int &value) final
std::mutex g_mutex
The Grand MPI Library Invocation Mutex (The GMLIM)
void AllReducePlusLongLong(long long &value) final
void SyncSend(const void *data, size_t size, Flags=NoFlags) final
void Barrier()
run an MPI_Barrier() for synchronization.
std::unique_ptr< net::Dispatcher > ConstructDispatcher() const final
void ExPrefixSumPlusUnsignedLong(unsigned long &value, const unsigned long &initial) final
void WaitForRequest(MpiCall call)
void SyncRecv(void *out_data, size_t size) final
void ExPrefixSumPlusInt(int &value, const int &initial) final
size_t MpiRank()
Return the rank of this process in MPI_COMM_WORLD.
static void Deinitialize()
atexit() method to deinitialize the MPI library.
void ExPrefixSumPlusUnsignedInt(unsigned int &value, const unsigned int &initial) final
void PrefixSumPlusInt(int &value, const int &initial) final
void SyncRecvSend(const void *send_data, size_t send_size, void *recv_data, size_t recv_size) final
void BroadcastUnsignedInt(unsigned int &value, size_t origin) final
size_t num_parallel_async() const final
Number of parallel send or recv requests supported by the net backend.
void AllReduceMinimumUnsignedLongLong(unsigned long long &value) final
void BroadcastLong(long &value, size_t origin) final
Exception(const std::string &what)
static void Initialize()
run MPI_Init() if not already done (can be called multiple times).
void ExPrefixSumPlusUnsignedLongLong(unsigned long long &value, const unsigned long long &initial) final
void PrefixSumPlusUnsignedLong(unsigned long &value, const unsigned long &initial) final
Flags
Additional flags for sending or receiving.
size_t NumMpiProcesses()
Return the number of MPI processes.
static std::string GetErrorString(int error_code)
return the MPI error string
void PrefixSumPlusLongLong(long long &value, const long long &initial) final
void AllReduceMaximumLongLong(long long &value) final
void AllReduceMaximumUnsignedLongLong(unsigned long long &value) final
void AllReduceMaximumUnsignedLong(unsigned long &value) final
void AllReduceMaximumInt(int &value) final
MPI_Request ISend(Connection &c, uint32_t seq, const void *data, size_t size)
#define LOG
Default logging method: output if the local debug variable is true.
Dispatcher is a high-level wrapper for asynchronous callback processing; a usage sketch follows after this list.
void AllReducePlusInt(int &value) final
void BroadcastInt(int &value, size_t origin) final
void AllReducePlusUnsignedInt(unsigned int &value) final
DispatcherThread contains a net::Dispatcher object and an associated thread that runs in the dispatching loop.
bool Construct(size_t group_size, DispatcherThread &dispatcher, std::unique_ptr< Group > *groups, size_t group_count)
Construct Group which connects to peers using MPI.
void ExPrefixSumPlusLong(long &value, const long &initial) final
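The Dispatcher members listed above can also be combined directly for fully asynchronous transfers. The following sketch is hypothetical (names invented, the usual Thrill and MPI headers assumed): it posts a send and keeps the buffer alive until MPI reports completion.

void AsyncSendCopy(thrill::mpi::Dispatcher& disp, thrill::mpi::Connection& conn,
                   std::shared_ptr<std::vector<char>> buffer) {
    MPI_Request req = disp.ISend(conn, /* seq */ 0, buffer->data(), buffer->size());
    // the callback holds a copy of the shared_ptr, so the data outlives the send
    disp.AddAsyncRequest(req, [buffer](MPI_Status&) { /* buffer released here */ });
}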