15 #ifndef THRILL_NET_MPI_DISPATCHER_HEADER 16 #define THRILL_NET_MPI_DISPATCHER_HEADER 37 #define THRILL_NET_MPI_QUEUES 1 81 static constexpr
bool debug =
false;
97 assert(dynamic_cast<Connection*>(&c));
100 assert(p < watch_.size());
101 watch_[p].active =
true;
102 watch_[p].read_cb.emplace_back(read_cb);
114 assert(dynamic_cast<Connection*>(&c));
116 size_t p = mc.
peer();
117 assert(p < watch_.size());
118 watch_[p].active =
true;
119 watch_[p].except_cb = except_cb;
125 assert(dynamic_cast<Connection*>(&c));
127 size_t p = mc.
peer();
128 assert(p < watch_.size());
130 if (watch_[p].read_cb.size() == 0)
131 LOG <<
"SelectDispatcher::Cancel() peer=" << p
132 <<
" called with no callbacks registered.";
134 Watch& w = watch_[p];
142 Connection& c, uint32_t seq,
const void* data,
size_t size);
144 Connection& c, uint32_t seq,
void* data,
size_t size);
146 void AddAsyncRequest(
154 if (buffer.size() == 0) {
155 if (done_cb) done_cb(c);
159 QueueAsyncSend(c,
MpiAsync(c, seq, std::move(buffer), done_cb));
167 if (block.size() == 0) {
168 if (done_cb) done_cb(c);
172 QueueAsyncSend(c,
MpiAsync(c, seq, std::move(block), done_cb));
181 if (done_cb) done_cb(c,
Buffer());
185 QueueAsyncRecv(c,
MpiAsync(c, seq, size, done_cb));
192 assert(block.valid());
194 if (block->size() == 0) {
195 if (done_cb) done_cb(c, std::move(block));
199 QueueAsyncRecv(c,
MpiAsync(c, seq, size, std::move(block), done_cb));
212 void PumpSendQueue(
int peer);
215 void PumpRecvQueue(
int peer);
218 void DispatchOne(
const std::chrono::milliseconds& timeout)
final;
229 std::deque<Callback, mem::GPoolAllocator<Callback> >
239 size_t watch_active_ { 0 };
243 throw Exception(
"SelectDispatcher() exception on socket!", errno);
269 : type_(REQUEST), seq_(0),
270 arequest_(callback) { }
276 : type_(WRITE_BUFFER), seq_(seq),
277 write_buffer_(conn,
std::move(buffer), callback) { }
282 : type_(READ_BUFFER), seq_(seq),
283 read_buffer_(conn, buffer_size, callback) { }
289 : type_(WRITE_BLOCK), seq_(seq),
290 write_block_(conn,
std::move(block), callback) { }
297 : type_(READ_BYTE_BLOCK), seq_(seq),
298 read_byte_block_(conn, size,
std::move(block), callback) { }
307 Acquire(std::move(ma));
313 if (
this == &ma)
return *
this;
319 Acquire(std::move(ma));
328 if (type_ == REQUEST)
329 arequest_.~AsyncRequest();
330 else if (type_ == WRITE_BUFFER)
331 write_buffer_.~AsyncWriteBuffer();
332 else if (type_ == READ_BUFFER)
333 read_buffer_.~AsyncReadBuffer();
334 else if (type_ == WRITE_BLOCK)
335 write_block_.~AsyncWriteBlock();
336 else if (type_ == READ_BYTE_BLOCK)
337 read_byte_block_.~AsyncReadByteBlock();
342 if (type_ == REQUEST)
343 arequest_.DoCallback(s);
344 else if (type_ == WRITE_BUFFER)
345 write_buffer_.DoCallback();
346 else if (type_ == READ_BUFFER) {
348 MPI_Get_count(&s, MPI_BYTE, &size);
349 read_buffer_.DoCallback(size);
351 else if (type_ == WRITE_BLOCK)
352 write_block_.DoCallback();
353 else if (type_ == READ_BYTE_BLOCK) {
355 MPI_Get_count(&s, MPI_BYTE, &size);
356 read_byte_block_.DoCallback(size);
362 if (type_ == REQUEST)
364 else if (type_ == WRITE_BUFFER)
365 return static_cast<Connection*
>(write_buffer_.connection());
366 else if (type_ == READ_BUFFER)
367 return static_cast<Connection*
>(read_buffer_.connection());
368 else if (type_ == WRITE_BLOCK)
369 return static_cast<Connection*
>(write_block_.connection());
370 else if (type_ == READ_BYTE_BLOCK)
371 return static_cast<Connection*
>(read_byte_block_.connection());
372 die(
"Unknown Buffer type");
394 assert(type_ == ma.
type_);
397 if (type_ == REQUEST)
399 else if (type_ == WRITE_BUFFER)
401 else if (type_ == READ_BUFFER)
403 else if (type_ == WRITE_BLOCK)
405 else if (type_ == READ_BYTE_BLOCK)
426 #if THRILL_NET_MPI_QUEUES 447 #endif // !THRILL_NET_MPI_DISPATCHER_HEADER void DoCallback(MPI_Status &s)
Dispatch done message to correct callback.
bool active
boolean check whether any callbacks are registered
Connection * connection()
Return mpi Connection pointer.
void AddRead(net::Connection &c, const Callback &read_cb) final
Register a buffered read callback and a default exception callback.
virtual bool IsValid() const =0
check whether the connection is (still) valid.
AsyncWriteBuffer write_buffer_
std::vector< size_t > recv_active_
number of active requests
tlx::delegate< void(Connection &), mem::GPoolAllocator< char > > AsyncWriteCallback
Signature of async write callbacks.
void Interrupt() final
Interrupt does nothing.
A derived exception class which looks up MPI error strings.
callback vectors per peer
A pinned / pin-counted pointer to a ByteBlock.
void AsyncWrite(net::Connection &c, uint32_t seq, data::PinnedBlock &&block, const AsyncWriteCallback &done_cb=AsyncWriteCallback()) final
std::deque< std::deque< MpiAsync > > recv_queue_
queue of delayed requests for each peer
#define die(msg)
Instead of std::terminate(), throw the output the message via an exception.
tlx::delegate< bool(), mem::GPoolAllocator< char > > AsyncCallback
Signature of async connection readability/writability callbacks.
std::vector< int > mpi_async_out_
array of output integer of finished requests for MPI_Testsome().
AsyncRequest & operator=(const AsyncRequest &)=delete
non-copyable: delete assignment operator
MpiAsync(const AsyncRequestCallback &callback)
construct generic MPI async request
AsyncRequestCallback callback_
functional object to call once data is complete
std::deque< Callback, mem::GPoolAllocator< Callback > > read_cb
queue of callbacks for peer.
void AsyncRead(net::Connection &c, uint32_t seq, size_t size, data::PinnedByteBlockPtr &&block, const AsyncReadByteBlockCallback &done_cb) final
asynchronously read the full ByteBlock and deliver it to the callback
AsyncReadBuffer read_buffer_
void AddWrite(net::Connection &, const Callback &) final
Register a buffered write callback and a default exception callback.
void DoCallback(MPI_Status &s)
MpiAsync(net::Connection &conn, uint32_t seq, size_t buffer_size, const AsyncReadBufferCallback &callback)
Construct AsyncRead with Buffer.
std::vector< MPI_Status > mpi_status_out_
array of output MPI_Status of finished requests for MPI_Testsome().
void AsyncRead(net::Connection &c, uint32_t seq, size_t size, const AsyncReadBufferCallback &done_cb=AsyncReadBufferCallback()) final
asynchronously read n bytes and deliver them to the callback
MpiAsync(net::Connection &conn, uint32_t seq, data::PinnedBlock &&block, const AsyncWriteCallback &callback)
Construct AsyncWrite with Block.
std::vector< MpiAsync > mpi_async_
tlx::delegate< void(MPI_Status &), mem::GPoolAllocator< char > > AsyncRequestCallback
Signature of async MPI request callbacks.
void Acquire(MpiAsync &&ma) noexcept
assign myself the other object's content
static bool DefaultExceptionCallback()
Default exception handler.
std::vector< size_t > send_active_
number of active requests
MpiAsync(net::Connection &conn, uint32_t seq, size_t size, data::PinnedByteBlockPtr &&block, const AsyncReadByteBlockCallback &callback)
Construct AsyncRead with ByteBuffer.
A Connection represents a link to another peer in a network group.
static constexpr bool debug
void AsyncWrite(net::Connection &c, uint32_t seq, Buffer &&buffer, const AsyncWriteCallback &done_cb=AsyncWriteCallback()) final
AsyncRequest(const AsyncRequestCallback &callback)
Construct buffered reader with callback.
void Cancel(net::Connection &c) final
Cancel all callbacks on a given peer.
MpiAsync()
default constructor for resize
void SetExcept(net::Connection &c, const Callback &except_cb)
Register a buffered write callback and a default exception callback.
MpiAsync(net::Connection &conn, uint32_t seq, Buffer &&buffer, const AsyncWriteCallback &callback)
Construct AsyncWrite with Buffer.
This is the big answer to what happens when an MPI async request is signaled as complete: it unifies ...
Callback except_cb
only one exception callback for the peer.
int peer() const
return the MPI peer number
A pinned / pin-counted derivative of a Block.
static constexpr bool debug_async
std::vector< Watch > watch_
callback watch vector
Simple buffer of characters without initialization or growing functionality.
Virtual MPI connection class.
MpiAsync(MpiAsync &&ma)
move-constructor: move item
Type type_
type of this async
#define LOG
Default logging method: output if the local debug variable is true.
Dispatcher is a high level wrapper for asynchronous callback processing.
AsyncWriteBlock write_block_
std::deque< std::deque< MpiAsync > > send_queue_
queue of delayed requests for each peer
AsyncReadByteBlock read_byte_block_
tlx::delegate< void(Connection &c, Buffer &&buffer), mem::GPoolAllocator< char > > AsyncReadBufferCallback
Signature of async read Buffer callbacks.
std::vector< MPI_Request > mpi_async_requests_
array of current async MPI_Request for MPI_Testsome().
#define LOGC(cond)
Explicitly specify the condition for logging.