36 #if THRILL_NET_MPI_QUEUES 45 LOG <<
"~mpi::Dispatcher()" 49 std::unique_lock<std::mutex> lock(g_mutex);
55 LOG1 <<
"Error during MPI_Cancel()";
62 Connection& c, uint32_t seq,
const void* data,
size_t size) {
64 std::unique_lock<std::mutex> lock(g_mutex);
67 int r = MPI_Isend(const_cast<void*>(data), static_cast<int>(size), MPI_BYTE,
68 c.
peer(),
static_cast<int>(seq),
69 MPI_COMM_WORLD, &request);
72 throw Exception(
"Error during MPI_Isend()", r);
74 LOG <<
"MPI_Isend() data=" << data <<
" size=" << size
75 <<
" peer=" << c.
peer() <<
" seq=" << seq
76 <<
" request=" << request;
84 Connection& c, uint32_t seq,
void* data,
size_t size) {
86 std::unique_lock<std::mutex> lock(g_mutex);
89 int r = MPI_Irecv(data, static_cast<int>(size), MPI_BYTE,
90 c.
peer(),
static_cast<int>(seq),
91 MPI_COMM_WORLD, &request);
94 throw Exception(
"Error during MPI_Irecv()", r);
96 LOG <<
"MPI_Irecv() data=" << data <<
" size=" << size
97 <<
" peer=" << c.
peer() <<
" seq=" << seq
98 <<
" request=" << request;
108 LOG <<
"AddAsyncRequest() req=" << req;
118 #if THRILL_NET_MPI_QUEUES 119 assert(dynamic_cast<Connection*>(&c));
122 int peer = mpic->
peer();
138 #if THRILL_NET_MPI_QUEUES 139 assert(dynamic_cast<Connection*>(&c));
142 int peer = mpic->
peer();
159 #if THRILL_NET_MPI_QUEUES 166 LOG <<
"Dispatcher::PumpSendQueue() send remaining=" 175 #if THRILL_NET_MPI_QUEUES 182 LOG <<
"Dispatcher::PumpRecvQueue(). recv remaining=" 194 assert(dynamic_cast<Connection*>(r.
connection()));
205 #if THRILL_NET_MPI_QUEUES 212 assert(dynamic_cast<Connection*>(r.
connection()));
223 #if THRILL_NET_MPI_QUEUES 230 assert(dynamic_cast<Connection*>(r.
connection()));
241 #if THRILL_NET_MPI_QUEUES 248 assert(dynamic_cast<Connection*>(r.
connection()));
259 #if THRILL_NET_MPI_QUEUES 275 std::unique_lock<std::mutex> lock(g_mutex);
280 sLOG <<
"DispatchOne(): MPI_Testsome()" 283 int r = MPI_Testsome(
299 if (r != MPI_SUCCESS)
300 throw Exception(
"Error during MPI_Testsome()", r);
302 if (out_count == MPI_UNDEFINED) {
305 else if (out_count > 0) {
306 sLOG <<
"DispatchOne(): MPI_Testsome() out_count=" << out_count;
323 for (
size_t i = 0; i <
mpi_async_.size(); ++i)
327 sLOG <<
"Working #" << k
333 #if THRILL_NET_MPI_QUEUES 343 LOG <<
"DispatchOne() send_active_[" << peer <<
"]=" 352 LOG <<
"DispatchOne() recv_active_[" << peer <<
"]=" 377 int out_index = 0, out_flag = 0;
378 MPI_Status out_status;
381 sLOG0 <<
"DispatchOne(): MPI_Testany()" 396 if (r != MPI_SUCCESS)
397 throw Exception(
"Error during MPI_Testany()", r);
407 LOG <<
"DispatchOne(): MPI_Testany() out_flag=" << out_flag
408 <<
" done #" << out_index
409 <<
" out_tag=" << out_status.MPI_TAG;
414 #if THRILL_NET_MPI_QUEUES 425 #if THRILL_NET_MPI_QUEUES 431 LOG <<
"DispatchOne() send_active_[" << peer <<
"]=" 440 LOG <<
"DispatchOne() recv_active_[" << peer <<
"]=" 457 std::unique_lock<std::mutex> lock(g_mutex);
459 int r = MPI_Iprobe(MPI_ANY_SOURCE, 0,
460 MPI_COMM_WORLD, &flag, &status);
462 if (r != MPI_SUCCESS)
463 throw Exception(
"Error during MPI_Iprobe()", r);
467 if (flag == 0)
return;
470 int p = status.MPI_SOURCE;
471 assert(p >= 0 && static_cast<size_t>(p) <
watch_.size());
476 sLOG <<
"Got Iprobe() for unwatched peer" << p;
480 sLOG <<
"Got iprobe for peer" << p;
482 if (w.read_cb.size()) {
485 while (w.read_cb.size() && w.read_cb.front()() ==
false) {
486 w.read_cb.pop_front();
489 if (w.read_cb.size() == 0) {
495 LOG <<
"Dispatcher: got MPI_Iprobe() for peer " 496 << p <<
" without a read handler.";
Connection * connection() const
Returns conn_.
void PerformAsync(MpiAsync &&a)
Issue the encapsulated request to the MPI layer.
#define sLOG
Default logging method: output if the local debug variable is true.
Connection * connection()
Return mpi Connection pointer.
uint8_t * data()
underlying buffer pointer
MPI_Request IRecv(Connection &c, uint32_t seq, void *data, size_t size)
Connection * connection() const
Returns conn_.
std::vector< size_t > recv_active_
number of active requests
std::atomic< size_t > tx_bytes_
sent bytes
void AddAsyncRequest(const MPI_Request &req, const AsyncRequestCallback &callback)
Dispatcher()=default
default constructor
A derived exception class which looks up MPI error strings.
const uint8_t * data() const
underlying buffer pointer
callback vectors per peer
std::deque< std::deque< MpiAsync > > recv_queue_
queue of delayed requests for each peer
std::vector< int > mpi_async_out_
array of output integer of finished requests for MPI_Testsome().
void PumpSendQueue(int peer)
Check send queue and perform waiting requests.
void PumpRecvQueue(int peer)
Check recv queue and perform waiting requests.
void QueueAsyncRecv(net::Connection &c, MpiAsync &&a)
Enqueue and run the encapsulated result.
size_t size() const
underlying buffer size
size_t size() const
underlying buffer size
void QueueAsyncSend(net::Connection &c, MpiAsync &&a)
Enqueue and run the encapsulated result.
#define sLOG0
Override default output: never or always output log.
std::vector< MPI_Status > mpi_status_out_
array of output MPI_Status of finished requests for MPI_Testsome().
std::mutex g_mutex
The Grand MPI Library Invocation Mutex (The GMLIM)
std::vector< MpiAsync > mpi_async_
size_t watch_active_
counter of active watches
uint8_t * data()
underlying buffer pointer
std::vector< size_t > send_active_
number of active requests
void DispatchOne(const std::chrono::milliseconds &timeout) final
Run one iteration of dispatching using MPI_Iprobe().
A Connection represents a link to another peer in a network group.
This is the big answer to what happens when an MPI async request is signaled as complete: it unifies ...
int peer() const
return the MPI peer number
Connection * connection() const
Returns conn_.
size_t size() const
underlying buffer size
std::vector< Watch > watch_
callback watch vector
size_t size() const
underlying buffer size
static const size_t g_simultaneous
number of simultaneous transfers
Connection * connection() const
Returns conn_.
Virtual MPI connection class.
const uint8_t * data() const
underlying buffer pointer
Type type_
type of this async
MPI_Request ISend(Connection &c, uint32_t seq, const void *data, size_t size)
#define LOG
Default logging method: output if the local debug variable is true.
std::deque< std::deque< MpiAsync > > send_queue_
queue of delayed requests for each peer
std::vector< MPI_Request > mpi_async_requests_
array of current async MPI_Request for MPI_Testsome().
std::atomic< size_t > rx_bytes_
received bytes