15 #ifndef THRILL_NET_DISPATCHER_HEADER 16 #define THRILL_NET_DISPATCHER_HEADER 76 <<
"AsyncReadBuffer()" 92 <<
"~AsyncReadBuffer()" 98 LOGC(debug_async_recv)
99 <<
"AsyncReadBuffer() recv" 108 if (errno == EINTR || errno == EAGAIN)
return true;
114 if (errno == 0 || errno == EPIPE || errno == ECONNRESET) {
118 throw Exception(
"AsyncReadBuffer() error in recv() on " 189 <<
"AsyncWriteBuffer()" 205 <<
"~AsyncWriteBuffer()" 211 LOGC(debug_async_recv)
212 <<
"AsyncWriteBuffer() send" 213 <<
" offset=" << write_size_
220 if (errno == EINTR || errno == EAGAIN)
return true;
225 if (errno == EPIPE) {
226 LOG1 <<
"AsyncWriteBuffer() got EPIPE";
230 throw Exception(
"AsyncWriteBuffer() error in send", errno);
271 size_t write_size_ = 0;
287 block_(
std::move(block)),
291 <<
"AsyncReadByteBlock()" 292 <<
" block_=" << block_
293 <<
" size_=" << size_;
308 <<
"~AsyncReadByteBlock()" 309 <<
" block_=" << block_
310 <<
" size_=" << size_;
315 LOGC(debug_async_recv)
316 <<
"AsyncReadByteBlock() recv" 317 <<
" offset=" << pos_
318 <<
" size=" << size_ - pos_;
321 block_->data() + pos_, size_ - pos_);
325 if (errno == EINTR || errno == EAGAIN)
return true;
331 if (errno == 0 || errno == EPIPE || errno == ECONNRESET) {
335 throw Exception(
"AsyncReadBlock() error in recv", errno);
352 return !block_ || pos_ == size_;
373 uint8_t *
data() {
return block_->data(); }
376 const uint8_t *
data()
const {
return block_->data(); }
379 size_t size()
const {
return size_; }
407 block_(
std::move(block)),
410 <<
"AsyncWriteBlock()" 411 <<
" block_.size()=" << block_.size()
412 <<
" block_=" << block_;
427 <<
"~AsyncWriteBlock()" 428 <<
" block_=" << block_;
433 LOGC(debug_async_send)
434 <<
"AsyncWriteBlock() send" 435 <<
" offset=" << written_size_
436 <<
" size=" << block_.size() - written_size_;
439 block_.data_begin() + written_size_,
440 block_.size() - written_size_);
443 if (errno == EINTR || errno == EAGAIN)
return true;
446 written_size_ = block_.size();
448 if (errno == EPIPE) {
449 LOG1 <<
"AsyncWriteBlock() got EPIPE";
453 throw Exception(
"AsyncWriteBlock() error in send", errno);
458 if (written_size_ == block_.size()) {
468 bool IsDone()
const {
return written_size_ == block_.size(); }
483 const uint8_t *
data()
const {
return block_.data_begin(); }
486 size_t size()
const {
return block_.size(); }
496 size_t written_size_ = 0;
512 static constexpr
bool debug =
false;
540 void AddTimer(
const std::chrono::milliseconds& timeout,
542 timer_pq_.emplace(steady_clock::now() + timeout,
543 std::chrono::duration_cast<milliseconds>(timeout),
571 LOG <<
"async read on read dispatcher";
573 if (done_cb) done_cb(c,
Buffer());
578 async_read_.emplace_back(c, size, done_cb);
582 AddRead(c, AsyncCallback::make<
592 LOG <<
"async read on read dispatcher";
593 if (block->size() == 0) {
594 if (done_cb) done_cb(c, std::move(block));
599 async_read_block_.emplace_back(c, size, std::move(block), done_cb);
603 AddRead(c, AsyncCallback::make<
615 if (done_cb) done_cb(c);
620 async_write_.emplace_back(c, std::move(
buffer), done_cb);
624 AddWrite(c, AsyncCallback::make<
635 if (block.size() == 0) {
636 if (done_cb) done_cb(c);
641 async_write_block_.emplace_back(c, std::move(block), done_cb);
645 AddWrite(c, AsyncCallback::make<
654 return AsyncWrite(c, seq,
Buffer(buffer, size), done_cb);
662 return AsyncWriteCopy(c, seq, str.data(), str.size(), done_cb);
673 steady_clock::time_point now = steady_clock::now();
675 while (!terminate_ &&
676 !timer_pq_.empty() &&
677 timer_pq_.top().next_timeout <= now)
679 const Timer& top = timer_pq_.top();
688 if (terminate_)
return;
691 if (timer_pq_.empty()) {
692 LOG <<
"Dispatch(): empty timer queue - selecting for 10s";
697 timer_pq_.top().next_timeout - now);
701 sLOG <<
"Dispatch(): waiting" << diff.count() <<
"ms";
706 while (async_read_.size() && async_read_.front().IsDone()) {
707 async_read_.pop_front();
709 while (async_write_.size() && async_write_.front().IsDone()) {
710 async_write_.pop_front();
713 while (async_read_block_.size() && async_read_block_.front().IsDone()) {
714 async_read_block_.pop_front();
716 while (async_write_block_.size() && async_write_block_.front().IsDone()) {
717 async_write_block_.pop_front();
723 while (!terminate_) {
729 virtual void Interrupt() = 0;
740 return (async_write_.size() != 0) || (async_write_block_.size() != 0);
746 virtual void DispatchOne(
const std::chrono::milliseconds& timeout) = 0;
752 "Dispatcher() exception on socket fd " 757 std::atomic<bool> terminate_ {
false };
770 Timer(
const steady_clock::time_point& _next_timeout,
773 : next_timeout(_next_timeout),
783 using TimerPQ = std::priority_queue<
784 Timer, std::vector<Timer, mem::GPoolAllocator<Timer> > >;
814 #endif // !THRILL_NET_DISPATCHER_HEADER Connection * connection() const
Returns conn_.
static bool ExceptionCallback(Connection &c)
Default exception handler.
std::deque< AsyncReadByteBlock, mem::GPoolAllocator< AsyncReadByteBlock > > async_read_block_
deque of asynchronous readers
size_t read_size_
total size currently read
Buffer & buffer()
reference to Buffer
#define sLOG
Default logging method: output if the local debug variable is true.
Timer(const steady_clock::time_point &_next_timeout, const milliseconds &_timeout, const TimerCallback &_cb)
std::chrono::milliseconds milliseconds
import into class namespace
Connection * conn_
Connection reference.
uint8_t * data()
underlying buffer pointer
virtual void AsyncWrite(Connection &c, uint32_t, Buffer &&buffer, const AsyncWriteCallback &done_cb=AsyncWriteCallback())
virtual bool IsValid() const =0
check whether the connection is (still) valid.
struct for timer callbacks
Connection * connection() const
Returns conn_.
std::deque< AsyncWriteBlock, mem::GPoolAllocator< AsyncWriteBlock > > async_write_block_
deque of asynchronous writers
tlx::delegate< void(Connection &), mem::GPoolAllocator< char > > AsyncWriteCallback
Signature of async write callbacks.
static constexpr bool debug_async_recv
virtual std::string ToString() const =0
return a string representation of this connection, for user output.
const uint8_t * data() const
underlying buffer pointer
const uint8_t * data() const
underlying buffer pointer
AsyncWriteCallback callback_
functional object to call once data is complete
A pinned / pin-counted pointer to a ByteBlock.
void Dispatch()
Dispatch one or more events.
bool operator()()
Should be called when the socket is readable.
virtual void AsyncWrite(Connection &c, uint32_t, data::PinnedBlock &&block, const AsyncWriteCallback &done_cb=AsyncWriteCallback())
void AsyncWriteCopy(Connection &c, uint32_t seq, const void *buffer, size_t size, const AsyncWriteCallback &done_cb=AsyncWriteCallback())
virtual void AsyncRead(Connection &c, uint32_t, size_t size, const AsyncReadBufferCallback &done_cb)
asynchronously read n bytes and deliver them to the callback
std::chrono::steady_clock steady_clock
import into class namespace
void Loop()
Loop over Dispatch() until terminate_ flag is set.
size_t size() const
underlying buffer size
size_t size() const
underlying buffer size
size_t size_
total size to read
An Exception is thrown by Connection on all errors instead of returning error codes.
std::deque< AsyncReadBuffer, mem::GPoolAllocator< AsyncReadBuffer > > async_read_
deque of asynchronous readers
AsyncReadBuffer & operator=(const AsyncReadBuffer &)=delete
non-copyable: delete assignment operator
iterator data() noexcept
return iterator to beginning of Buffer
void AsyncWriteCopy(Connection &c, uint32_t seq, const std::string &str, const AsyncWriteCallback &done_cb=AsyncWriteCallback())
AsyncWriteBlock(Connection &conn, data::PinnedBlock &&block, const AsyncWriteCallback &callback)
Construct block writer with callback.
void DoCallback(size_t size_check)
virtual ssize_t RecvOne(void *out_data, size_t size)=0
uint8_t * data()
underlying buffer pointer
#define die_unequal(X, Y)
std::priority_queue< Timer, std::vector< Timer, mem::GPoolAllocator< Timer > > > TimerPQ
priority queue of timer callbacks
A Connection represents a link to another peer in a network group.
std::deque< AsyncWriteBuffer, mem::GPoolAllocator< AsyncWriteBuffer > > async_write_
deque of asynchronous writers
static constexpr bool debug_async_send
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
static constexpr bool debug
data::PinnedBlock block_
Send block (holds a pin on the underlying ByteBlock)
milliseconds timeout
relative timeout for restarting
tlx::delegate< void(Connection &c, data::PinnedByteBlockPtr &&bytes), mem::GPoolAllocator< char > > AsyncReadByteBlockCallback
Signature of async read ByteBlock callbacks.
static const size_t bytes
number of bytes in uint_pair
bool HasAsyncWrites() const
Check whether there are still AsyncWrite()s in the queue.
std::deque< T, Allocator< T > > deque
deque with Manager tracking
steady_clock::time_point next_timeout
timepoint of next timeout
AsyncWriteCallback callback_
functional object to call once data is complete
AsyncWriteBuffer(Connection &conn, Buffer &&buffer, const AsyncWriteCallback &callback)
Construct buffered writer with callback.
data::PinnedByteBlockPtr & byte_block()
Connection * connection() const
Returns conn_.
Connection * conn_
Connection reference.
size_t size() const
underlying buffer size
A pinned / pin-counted derivative of a Block.
void AddTimer(const std::chrono::milliseconds &timeout, const TimerCallback &cb)
Register a relative timeout callback.
void DoCallback(size_t size_check)
static constexpr bool debug_async
Buffer buffer_
Send buffer (owned by this writer)
Simple buffer of characters without initialization or growing functionality.
AsyncReadByteBlock(Connection &conn, size_t size, data::PinnedByteBlockPtr &&block, const AsyncReadByteBlockCallback &callback)
Construct block reader with callback.
const uint8_t * data() const
underlying buffer pointer
size_t size() const
underlying buffer size
std::atomic< size_t > tx_active_
active send requests
Connection * conn_
Connection reference.
Connection * connection() const
Returns conn_.
data::PinnedByteBlockPtr block_
Receive block, holds a pin on the memory.
virtual void AsyncRead(Connection &c, uint32_t, size_t size, data::PinnedByteBlockPtr &&block, const AsyncReadByteBlockCallback &done_cb)
asynchronously read the full ByteBlock and deliver it to the callback
bool operator<(const uint_pair &b) const
less-than comparison operator
Connection * conn_
Connection reference.
const uint8_t * data() const
underlying buffer pointer
Buffer buffer_
Receive buffer (allocates memory)
#define LOG
Default logging method: output if the local debug variable is true.
Dispatcher is a high level wrapper for asynchronous callback processing.
virtual ssize_t SendOne(const void *data, size_t size, Flags flags=NoFlags)=0
AsyncReadByteBlockCallback callback_
functional object to call once data is complete
tlx::delegate< void(Connection &c, Buffer &&buffer), mem::GPoolAllocator< char > > AsyncReadBufferCallback
Signature of async read Buffer callbacks.
DispatcherThread contains a net::Dispatcher object and an associated thread that runs in the dispatch...
#define LOGC(cond)
Explicitly specify the condition for logging.
size_type size() const noexcept
return number of items in Buffer
std::atomic< size_t > rx_active_
active recv requests
AsyncReadBuffer(Connection &conn, size_t buffer_size, const AsyncReadBufferCallback &callback)
Construct buffered reader with callback.
virtual ~Dispatcher()
virtual destructor
AsyncReadBufferCallback callback_
functional object to call once data is complete