Thrill  0.1
dispatcher.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/net/dispatcher.hpp
3  *
4  * Asynchronous callback wrapper around select(), epoll(), or other kernel-level
5  * dispatchers.
6  *
7  * Part of Project Thrill - http://project-thrill.org
8  *
9  * Copyright (C) 2015 Timo Bingmann <[email protected]>
10  *
11  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
12  ******************************************************************************/
13 
14 #pragma once
15 #ifndef THRILL_NET_DISPATCHER_HEADER
16 #define THRILL_NET_DISPATCHER_HEADER
17 
18 #include <thrill/data/block.hpp>
20 #include <thrill/mem/allocator.hpp>
21 #include <thrill/net/buffer.hpp>
23 #include <tlx/delegate.hpp>
24 #include <tlx/die.hpp>
25 
26 #include <atomic>
27 #include <chrono>
28 #include <ctime>
29 #include <deque>
30 #include <functional>
31 #include <queue>
32 #include <string>
33 #include <vector>
34 
35 namespace thrill {
36 namespace net {
37 
38 //! \addtogroup net_layer
39 //! \{
40 
41 //! Signature of timer callbacks.
43 
44 //! Signature of async connection readability/writability callbacks.
46 
47 //! Signature of async read Buffer callbacks.
50 
51 //! Signature of async read ByteBlock callbacks.
55 
56 //! Signature of async write callbacks.
59 
60 /******************************************************************************/
61 
62 static constexpr bool debug_async = false;
63 static constexpr bool debug_async_send = false;
64 static constexpr bool debug_async_recv = false;
65 
67 {
68 public:
69  //! Construct buffered reader with callback
70  AsyncReadBuffer(Connection& conn, size_t buffer_size,
71  const AsyncReadBufferCallback& callback)
72  : conn_(&conn),
73  buffer_(buffer_size),
74  callback_(callback) {
75  LOGC(debug_async)
76  << "AsyncReadBuffer()"
77  << " buffer_.size()=" << buffer_.size();
78  conn_->rx_active_++;
79  }
80 
81  //! non-copyable: delete copy-constructor
82  AsyncReadBuffer(const AsyncReadBuffer&) = delete;
83  //! non-copyable: delete assignment operator
85  //! move-constructor: default
86  AsyncReadBuffer(AsyncReadBuffer&&) = default;
87  //! move-assignment operator: default
89 
91  LOGC(debug_async)
92  << "~AsyncReadBuffer()"
93  << " buffer_.size()=" << buffer_.size();
94  }
95 
96  //! Should be called when the socket is readable
97  bool operator () () {
98  LOGC(debug_async_recv)
99  << "AsyncReadBuffer() recv"
100  << " offset=" << read_size_
101  << " size=" << buffer_.size() - read_size_;
102 
103  ssize_t r = conn_->RecvOne(
105 
106  if (r <= 0) {
107  // these errors are acceptable: just redo the recv later.
108  if (errno == EINTR || errno == EAGAIN) return true;
109 
110  // signal artificial IsDone, for clean up.
111  read_size_ = buffer_.size();
112 
113  // these errors are end-of-file indications (both good and bad)
114  if (errno == 0 || errno == EPIPE || errno == ECONNRESET) {
115  if (callback_) callback_(*conn_, Buffer());
116  return false;
117  }
118  throw Exception("AsyncReadBuffer() error in recv() on "
119  "connection " + conn_->ToString(), errno);
120  }
121 
122  read_size_ += r;
123 
124  if (read_size_ == buffer_.size()) {
125  DoCallback();
126  conn_->rx_active_--;
127  return false;
128  }
129  else {
130  return true;
131  }
132  }
133 
134  bool IsDone() const { return read_size_ == buffer_.size(); }
135 
136  //! reference to Buffer
137  Buffer& buffer() { return buffer_; }
138 
139  void DoCallback() {
140  if (callback_) {
141  callback_(*conn_, std::move(buffer_));
143  }
144  }
145 
146  void DoCallback(size_t size_check) {
147  die_unequal(size_check, buffer_.size());
148  return DoCallback();
149  }
150 
151  //! Returns conn_
152  Connection * connection() const { return conn_; }
153 
154  //! underlying buffer pointer
155  uint8_t * data() { return buffer_.data(); }
156 
157  //! underlying buffer pointer
158  const uint8_t * data() const { return buffer_.data(); }
159 
160  //! underlying buffer size
161  size_t size() const { return buffer_.size(); }
162 
163 private:
164  //! Connection reference
166 
167  //! Receive buffer (allocates memory)
169 
170  //! total size currently read
171  size_t read_size_ = 0;
172 
173  //! functional object to call once data is complete
175 };
176 
177 /******************************************************************************/
178 
180 {
181 public:
182  //! Construct buffered writer with callback
184  const AsyncWriteCallback& callback)
185  : conn_(&conn),
186  buffer_(std::move(buffer)),
187  callback_(callback) {
188  LOGC(debug_async)
189  << "AsyncWriteBuffer()"
190  << " buffer_.size()=" << buffer_.size();
191  conn_->tx_active_++;
192  }
193 
194  //! non-copyable: delete copy-constructor
195  AsyncWriteBuffer(const AsyncWriteBuffer&) = delete;
196  //! non-copyable: delete assignment operator
198  //! move-constructor: default
199  AsyncWriteBuffer(AsyncWriteBuffer&&) = default;
200  //! move-assignment operator: default
202 
204  LOGC(debug_async)
205  << "~AsyncWriteBuffer()"
206  << " buffer_.size()=" << buffer_.size();
207  }
208 
209  //! Should be called when the socket is writable
210  bool operator () () {
211  LOGC(debug_async_recv)
212  << "AsyncWriteBuffer() send"
213  << " offset=" << write_size_
214  << " size=" << buffer_.size() - write_size_;
215 
216  ssize_t r = conn_->SendOne(
217  buffer_.data() + write_size_, buffer_.size() - write_size_);
218 
219  if (r <= 0) {
220  if (errno == EINTR || errno == EAGAIN) return true;
221 
222  // signal artificial IsDone, for clean up.
223  write_size_ = buffer_.size();
224 
225  if (errno == EPIPE) {
226  LOG1 << "AsyncWriteBuffer() got EPIPE";
227  DoCallback();
228  return false;
229  }
230  throw Exception("AsyncWriteBuffer() error in send", errno);
231  }
232 
233  write_size_ += r;
234 
235  if (write_size_ == buffer_.size()) {
236  DoCallback();
237  conn_->tx_active_--;
238  return false;
239  }
240  else {
241  return true;
242  }
243  }
244 
245  bool IsDone() const { return write_size_ == buffer_.size(); }
246 
247  void DoCallback() {
248  if (callback_) {
249  callback_(*conn_);
251  }
252  }
253 
254  //! Returns conn_
255  Connection * connection() const { return conn_; }
256 
257  //! underlying buffer pointer
258  const uint8_t * data() const { return buffer_.data(); }
259 
260  //! underlying buffer size
261  size_t size() const { return buffer_.size(); }
262 
263 private:
264  //! Connection reference
266 
267  //! Send buffer (owned by this writer)
269 
270  //! total size currently written
271  size_t write_size_ = 0;
272 
273  //! functional object to call once data is complete
275 };
276 
277 /******************************************************************************/
278 
280 {
281 public:
282  //! Construct block reader with callback
284  data::PinnedByteBlockPtr&& block,
285  const AsyncReadByteBlockCallback& callback)
286  : conn_(&conn),
287  block_(std::move(block)),
288  size_(size),
289  callback_(callback) {
290  LOGC(debug_async)
291  << "AsyncReadByteBlock()"
292  << " block_=" << block_
293  << " size_=" << size_;
294  conn_->rx_active_++;
295  }
296 
297  //! non-copyable: delete copy-constructor
298  AsyncReadByteBlock(const AsyncReadByteBlock&) = delete;
299  //! non-copyable: delete assignment operator
301  //! move-constructor: default
303  //! move-assignment operator: default
305 
307  LOGC(debug_async)
308  << "~AsyncReadByteBlock()"
309  << " block_=" << block_
310  << " size_=" << size_;
311  }
312 
313  //! Should be called when the socket is readable
314  bool operator () () {
315  LOGC(debug_async_recv)
316  << "AsyncReadByteBlock() recv"
317  << " offset=" << pos_
318  << " size=" << size_ - pos_;
319 
320  ssize_t r = conn_->RecvOne(
321  block_->data() + pos_, size_ - pos_);
322 
323  if (r <= 0) {
324  // these errors are acceptable: just redo the recv later.
325  if (errno == EINTR || errno == EAGAIN) return true;
326 
327  // signal artificial IsDone, for clean up.
328  pos_ = size_;
329 
330  // these errors are end-of-file indications (both good and bad)
331  if (errno == 0 || errno == EPIPE || errno == ECONNRESET) {
332  DoCallback();
333  return false;
334  }
335  throw Exception("AsyncReadBlock() error in recv", errno);
336  }
337 
338  pos_ += r;
339 
340  if (pos_ == size_) {
341  DoCallback();
342  conn_->rx_active_--;
343  return false;
344  }
345  else {
346  return true;
347  }
348  }
349 
350  bool IsDone() const {
351  // done if block is already delivered to callback or size matches
352  return !block_ || pos_ == size_;
353  }
354 
355  data::PinnedByteBlockPtr& byte_block() { return block_; }
356 
357  void DoCallback() {
358  if (callback_) {
359  callback_(*conn_, std::move(block_));
361  }
362  }
363 
364  void DoCallback(size_t size_check) {
365  die_unequal(size_check, size_);
366  return DoCallback();
367  }
368 
369  //! Returns conn_
370  Connection * connection() const { return conn_; }
371 
372  //! underlying buffer pointer
373  uint8_t * data() { return block_->data(); }
374 
375  //! underlying buffer pointer
376  const uint8_t * data() const { return block_->data(); }
377 
378  //! underlying buffer size
379  size_t size() const { return size_; }
380 
381 private:
382  //! Connection reference
384 
385  //! Receive block, holds a pin on the memory.
387 
388  //! size currently read
389  size_t pos_ = 0;
390 
391  //! total size to read
392  size_t size_;
393 
394  //! functional object to call once data is complete
396 };
397 
398 /******************************************************************************/
399 
401 {
402 public:
403  //! Construct block writer with callback
405  const AsyncWriteCallback& callback)
406  : conn_(&conn),
407  block_(std::move(block)),
408  callback_(callback) {
409  LOGC(debug_async)
410  << "AsyncWriteBlock()"
411  << " block_.size()=" << block_.size()
412  << " block_=" << block_;
413  conn_->tx_active_++;
414  }
415 
416  //! non-copyable: delete copy-constructor
417  AsyncWriteBlock(const AsyncWriteBlock&) = delete;
418  //! non-copyable: delete assignment operator
419  AsyncWriteBlock& operator = (const AsyncWriteBlock&) = delete;
420  //! move-constructor: default
421  AsyncWriteBlock(AsyncWriteBlock&&) = default;
422  //! move-assignment operator: default
424 
426  LOGC(debug_async)
427  << "~AsyncWriteBlock()"
428  << " block_=" << block_;
429  }
430 
431  //! Should be called when the socket is writable
432  bool operator () () {
433  LOGC(debug_async_send)
434  << "AsyncWriteBlock() send"
435  << " offset=" << written_size_
436  << " size=" << block_.size() - written_size_;
437 
438  ssize_t r = conn_->SendOne(
439  block_.data_begin() + written_size_,
440  block_.size() - written_size_);
441 
442  if (r <= 0) {
443  if (errno == EINTR || errno == EAGAIN) return true;
444 
445  // signal artificial IsDone, for clean up.
446  written_size_ = block_.size();
447 
448  if (errno == EPIPE) {
449  LOG1 << "AsyncWriteBlock() got EPIPE";
450  DoCallback();
451  return false;
452  }
453  throw Exception("AsyncWriteBlock() error in send", errno);
454  }
455 
456  written_size_ += r;
457 
458  if (written_size_ == block_.size()) {
459  DoCallback();
460  conn_->tx_active_--;
461  return false;
462  }
463  else {
464  return true;
465  }
466  }
467 
468  bool IsDone() const { return written_size_ == block_.size(); }
469 
470  void DoCallback() {
471  if (callback_) {
472  callback_(*conn_);
474  }
475  // release Pin
476  block_.Reset();
477  }
478 
479  //! Returns conn_
480  Connection * connection() const { return conn_; }
481 
482  //! underlying buffer pointer
483  const uint8_t * data() const { return block_.data_begin(); }
484 
485  //! underlying buffer size
486  size_t size() const { return block_.size(); }
487 
488 private:
489  //! Connection reference
491 
492  //! Send block (holds a pin on the underlying ByteBlock)
494 
495  //! total size currently written
496  size_t written_size_ = 0;
497 
498  //! functional object to call once data is complete
500 };
501 
502 /******************************************************************************/
503 
504 /*!
505  * Dispatcher is a high level wrapper for asynchronous callback processing.. One
506  * can register Connection objects for readability and writability checks,
507  * buffered reads and writes with completion callbacks, and also timer
508  * functions.
509  */
511 {
512  static constexpr bool debug = false;
513 
514 private:
515  //! import into class namespace
516  using steady_clock = std::chrono::steady_clock;
517 
518  //! import into class namespace
519  using milliseconds = std::chrono::milliseconds;
520 
521  //! for access to terminate_
522  friend class DispatcherThread;
523 
524 public:
525  //! default constructor
526  Dispatcher() = default;
527 
528  //! non-copyable: delete copy-constructor
529  Dispatcher(const Dispatcher&) = delete;
530  //! non-copyable: delete assignment operator
531  Dispatcher& operator = (const Dispatcher&) = delete;
532 
533  //! virtual destructor
534  virtual ~Dispatcher() { }
535 
536  //! \name Timeout Callbacks
537  //! \{
538 
539  //! Register a relative timeout callback
540  void AddTimer(const std::chrono::milliseconds& timeout,
541  const TimerCallback& cb) {
542  timer_pq_.emplace(steady_clock::now() + timeout,
543  std::chrono::duration_cast<milliseconds>(timeout),
544  cb);
545  }
546 
547  //! \}
548 
549  //! \name Connection Callbacks
550  //! \{
551 
552  //! Register a buffered read callback and a default exception callback.
553  virtual void AddRead(Connection& c, const AsyncCallback& read_cb) = 0;
554 
555  //! Register a buffered write callback and a default exception callback.
556  virtual void AddWrite(Connection& c, const AsyncCallback& write_cb) = 0;
557 
558  //! Cancel all callbacks on a given connection.
559  virtual void Cancel(Connection& c) = 0;
560 
561  //! \}
562 
563  //! \name Asynchronous Data Reader/Writer Callbacks
564  //! \{
565 
566  //! asynchronously read n bytes and deliver them to the callback
567  virtual void AsyncRead(Connection& c, uint32_t /* seq */, size_t size,
568  const AsyncReadBufferCallback& done_cb) {
569  assert(c.IsValid());
570 
571  LOG << "async read on read dispatcher";
572  if (size == 0) {
573  if (done_cb) done_cb(c, Buffer());
574  return;
575  }
576 
577  // add new async reader object
578  async_read_.emplace_back(c, size, done_cb);
579 
580  // register read callback
581  AsyncReadBuffer& arb = async_read_.back();
582  AddRead(c, AsyncCallback::make<
583  AsyncReadBuffer, &AsyncReadBuffer::operator ()>(&arb));
584  }
585 
586  //! asynchronously read the full ByteBlock and deliver it to the callback
587  virtual void AsyncRead(Connection& c, uint32_t /* seq */, size_t size,
588  data::PinnedByteBlockPtr&& block,
589  const AsyncReadByteBlockCallback& done_cb) {
590  assert(c.IsValid());
591 
592  LOG << "async read on read dispatcher";
593  if (block->size() == 0) {
594  if (done_cb) done_cb(c, std::move(block));
595  return;
596  }
597 
598  // add new async reader object
599  async_read_block_.emplace_back(c, size, std::move(block), done_cb);
600 
601  // register read callback
602  AsyncReadByteBlock& arbb = async_read_block_.back();
603  AddRead(c, AsyncCallback::make<
604  AsyncReadByteBlock, &AsyncReadByteBlock::operator ()>(&arbb));
605  }
606 
607  //! asynchronously write buffer and callback when delivered. The buffer is
608  //! MOVED into the async writer.
609  virtual void AsyncWrite(
610  Connection& c, uint32_t /* seq */, Buffer&& buffer,
611  const AsyncWriteCallback& done_cb = AsyncWriteCallback()) {
612  assert(c.IsValid());
613 
614  if (buffer.size() == 0) {
615  if (done_cb) done_cb(c);
616  return;
617  }
618 
619  // add new async writer object
620  async_write_.emplace_back(c, std::move(buffer), done_cb);
621 
622  // register write callback
623  AsyncWriteBuffer& awb = async_write_.back();
624  AddWrite(c, AsyncCallback::make<
625  AsyncWriteBuffer, &AsyncWriteBuffer::operator ()>(&awb));
626  }
627 
628  //! asynchronously write buffer and callback when delivered. The buffer is
629  //! MOVED into the async writer.
630  virtual void AsyncWrite(
631  Connection& c, uint32_t /* seq */, data::PinnedBlock&& block,
632  const AsyncWriteCallback& done_cb = AsyncWriteCallback()) {
633  assert(c.IsValid());
634 
635  if (block.size() == 0) {
636  if (done_cb) done_cb(c);
637  return;
638  }
639 
640  // add new async writer object
641  async_write_block_.emplace_back(c, std::move(block), done_cb);
642 
643  // register write callback
644  AsyncWriteBlock& awb = async_write_block_.back();
645  AddWrite(c, AsyncCallback::make<
646  AsyncWriteBlock, &AsyncWriteBlock::operator ()>(&awb));
647  }
648 
649  //! asynchronously write buffer and callback when delivered. COPIES the data
650  //! into a Buffer!
652  Connection& c, uint32_t seq, const void* buffer, size_t size,
653  const AsyncWriteCallback& done_cb = AsyncWriteCallback()) {
654  return AsyncWrite(c, seq, Buffer(buffer, size), done_cb);
655  }
656 
657  //! asynchronously write buffer and callback when delivered. COPIES the data
658  //! into a Buffer!
660  Connection& c, uint32_t seq, const std::string& str,
661  const AsyncWriteCallback& done_cb = AsyncWriteCallback()) {
662  return AsyncWriteCopy(c, seq, str.data(), str.size(), done_cb);
663  }
664 
665  //! \}
666 
667  //! \name Dispatch
668  //! \{
669 
670  //! Dispatch one or more events
671  void Dispatch() {
672  // process timer events that lie in the past
673  steady_clock::time_point now = steady_clock::now();
674 
675  while (!terminate_ &&
676  !timer_pq_.empty() &&
677  timer_pq_.top().next_timeout <= now)
678  {
679  const Timer& top = timer_pq_.top();
680  if (top.cb()) {
681  // requeue timeout event again.
682  timer_pq_.emplace(top.next_timeout + top.timeout,
683  top.timeout, top.cb);
684  }
685  timer_pq_.pop();
686  }
687 
688  if (terminate_) return;
689 
690  // calculate time until next timer event
691  if (timer_pq_.empty()) {
692  LOG << "Dispatch(): empty timer queue - selecting for 10s";
693  DispatchOne(milliseconds(10000));
694  }
695  else {
696  auto diff = std::chrono::duration_cast<milliseconds>(
697  timer_pq_.top().next_timeout - now);
698 
699  if (diff < milliseconds(1)) diff = milliseconds(1);
700 
701  sLOG << "Dispatch(): waiting" << diff.count() << "ms";
702  DispatchOne(diff);
703  }
704 
705  // clean up finished AsyncRead/Writes
706  while (async_read_.size() && async_read_.front().IsDone()) {
707  async_read_.pop_front();
708  }
709  while (async_write_.size() && async_write_.front().IsDone()) {
710  async_write_.pop_front();
711  }
712 
713  while (async_read_block_.size() && async_read_block_.front().IsDone()) {
714  async_read_block_.pop_front();
715  }
716  while (async_write_block_.size() && async_write_block_.front().IsDone()) {
717  async_write_block_.pop_front();
718  }
719  }
720 
721  //! Loop over Dispatch() until terminate_ flag is set.
722  void Loop() {
723  while (!terminate_) {
724  Dispatch();
725  }
726  }
727 
728  //! Interrupt current dispatch
729  virtual void Interrupt() = 0;
730 
731  //! Causes the dispatcher to break out after the next timeout occurred
732  //! Does not interrupt the currently running read/write operation, but
733  //! breaks after the operation finished or timed out.
734  void Terminate() {
735  terminate_ = true;
736  }
737 
738  //! Check whether there are still AsyncWrite()s in the queue.
739  bool HasAsyncWrites() const {
740  return (async_write_.size() != 0) || (async_write_block_.size() != 0);
741  }
742 
743  //! \}
744 
745 protected:
746  virtual void DispatchOne(const std::chrono::milliseconds& timeout) = 0;
747 
748  //! Default exception handler
749  static bool ExceptionCallback(Connection& c) {
750  // exception on listen socket ?
751  throw Exception(
752  "Dispatcher() exception on socket fd "
753  + c.ToString() + "!", errno);
754  }
755 
756  //! true if dispatcher needs to stop
757  std::atomic<bool> terminate_ { false };
758 
759  /*------------------------------------------------------------------------*/
760 
761  //! struct for timer callbacks
762  struct Timer {
763  //! timepoint of next timeout
764  steady_clock::time_point next_timeout;
765  //! relative timeout for restarting
767  //! callback
769 
770  Timer(const steady_clock::time_point& _next_timeout,
771  const milliseconds& _timeout,
772  const TimerCallback& _cb)
773  : next_timeout(_next_timeout),
774  timeout(_timeout),
775  cb(_cb)
776  { }
777 
778  bool operator < (const Timer& b) const
779  { return next_timeout > b.next_timeout; }
780  };
781 
782  //! priority queue of timer callbacks
783  using TimerPQ = std::priority_queue<
784  Timer, std::vector<Timer, mem::GPoolAllocator<Timer> > >;
785 
786  //! priority queue of timer callbacks, obviously kept in timeout
787  //! order. Currently not addressable.
789 
790  /*------------------------------------------------------------------------*/
791 
792  //! deque of asynchronous readers
795 
796  //! deque of asynchronous writers
799 
800  //! deque of asynchronous readers
803 
804  //! deque of asynchronous writers
807 };
808 
809 //! \}
810 
811 } // namespace net
812 } // namespace thrill
813 
814 #endif // !THRILL_NET_DISPATCHER_HEADER
815 
816 /******************************************************************************/
Connection * connection() const
Returns conn_.
Definition: dispatcher.hpp:480
static bool ExceptionCallback(Connection &c)
Default exception handler.
Definition: dispatcher.hpp:749
std::deque< AsyncReadByteBlock, mem::GPoolAllocator< AsyncReadByteBlock > > async_read_block_
deque of asynchronous readers
Definition: dispatcher.hpp:802
size_t read_size_
total size currently read
Definition: dispatcher.hpp:171
Buffer & buffer()
reference to Buffer
Definition: dispatcher.hpp:137
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:34
Timer(const steady_clock::time_point &_next_timeout, const milliseconds &_timeout, const TimerCallback &_cb)
Definition: dispatcher.hpp:770
std::chrono::milliseconds milliseconds
import into class namespace
Definition: dispatcher.hpp:519
Connection * conn_
Connection reference.
Definition: dispatcher.hpp:383
uint8_t * data()
underlying buffer pointer
Definition: dispatcher.hpp:155
virtual void AsyncWrite(Connection &c, uint32_t, Buffer &&buffer, const AsyncWriteCallback &done_cb=AsyncWriteCallback())
Definition: dispatcher.hpp:609
virtual bool IsValid() const =0
check whether the connection is (still) valid.
struct for timer callbacks
Definition: dispatcher.hpp:762
Connection * connection() const
Returns conn_.
Definition: dispatcher.hpp:255
std::deque< AsyncWriteBlock, mem::GPoolAllocator< AsyncWriteBlock > > async_write_block_
deque of asynchronous writers
Definition: dispatcher.hpp:806
tlx::delegate< void(Connection &), mem::GPoolAllocator< char > > AsyncWriteCallback
Signature of async write callbacks.
Definition: dispatcher.hpp:58
static constexpr bool debug_async_recv
Definition: dispatcher.hpp:64
#define LOG1
Definition: logger.hpp:28
virtual std::string ToString() const =0
return a string representation of this connection, for user output.
const uint8_t * data() const
underlying buffer pointer
Definition: dispatcher.hpp:258
const uint8_t * data() const
underlying buffer pointer
Definition: dispatcher.hpp:158
AsyncWriteCallback callback_
functional object to call once data is complete
Definition: dispatcher.hpp:499
A pinned / pin-counted pointer to a ByteBlock.
Definition: byte_block.hpp:205
STL namespace.
void Dispatch()
Dispatch one or more events.
Definition: dispatcher.hpp:671
bool operator()()
Should be called when the socket is readable.
Definition: dispatcher.hpp:97
virtual void AsyncWrite(Connection &c, uint32_t, data::PinnedBlock &&block, const AsyncWriteCallback &done_cb=AsyncWriteCallback())
Definition: dispatcher.hpp:630
void AsyncWriteCopy(Connection &c, uint32_t seq, const void *buffer, size_t size, const AsyncWriteCallback &done_cb=AsyncWriteCallback())
Definition: dispatcher.hpp:651
virtual void AsyncRead(Connection &c, uint32_t, size_t size, const AsyncReadBufferCallback &done_cb)
asynchronously read n bytes and deliver them to the callback
Definition: dispatcher.hpp:567
std::chrono::steady_clock steady_clock
import into class namespace
Definition: dispatcher.hpp:516
void Loop()
Loop over Dispatch() until terminate_ flag is set.
Definition: dispatcher.hpp:722
size_t size() const
underlying buffer size
Definition: dispatcher.hpp:379
size_t size() const
underlying buffer size
Definition: dispatcher.hpp:486
size_t size_
total size to read
Definition: dispatcher.hpp:392
A Exception is thrown by Connection on all errors instead of returning error codes.
Definition: exception.hpp:30
std::deque< AsyncReadBuffer, mem::GPoolAllocator< AsyncReadBuffer > > async_read_
deque of asynchronous readers
Definition: dispatcher.hpp:794
AsyncReadBuffer & operator=(const AsyncReadBuffer &)=delete
non-copyable: delete assignment operator
TimerCallback cb
callback
Definition: dispatcher.hpp:768
iterator data() noexcept
return iterator to beginning of Buffer
Definition: buffer.hpp:134
void AsyncWriteCopy(Connection &c, uint32_t seq, const std::string &str, const AsyncWriteCallback &done_cb=AsyncWriteCallback())
Definition: dispatcher.hpp:659
AsyncWriteBlock(Connection &conn, data::PinnedBlock &&block, const AsyncWriteCallback &callback)
Construct block writer with callback.
Definition: dispatcher.hpp:404
void DoCallback(size_t size_check)
Definition: dispatcher.hpp:146
virtual ssize_t RecvOne(void *out_data, size_t size)=0
uint8_t * data()
underlying buffer pointer
Definition: dispatcher.hpp:373
#define die_unequal(X, Y)
Definition: die.hpp:50
std::priority_queue< Timer, std::vector< Timer, mem::GPoolAllocator< Timer > > > TimerPQ
priority queue of timer callbacks
Definition: dispatcher.hpp:784
A Connection represents a link to another peer in a network group.
Definition: connection.hpp:49
std::deque< AsyncWriteBuffer, mem::GPoolAllocator< AsyncWriteBuffer > > async_write_
deque of asynchronous writers
Definition: dispatcher.hpp:798
static constexpr bool debug_async_send
Definition: dispatcher.hpp:63
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
Definition: allocator.hpp:220
static constexpr bool debug
data::PinnedBlock block_
Send block (holds a pin on the underlying ByteBlock)
Definition: dispatcher.hpp:493
milliseconds timeout
relative timeout for restarting
Definition: dispatcher.hpp:766
tlx::delegate< void(Connection &c, data::PinnedByteBlockPtr &&bytes), mem::GPoolAllocator< char > > AsyncReadByteBlockCallback
Signature of async read ByteBlock callbacks.
Definition: dispatcher.hpp:54
static const size_t bytes
number of bytes in uint_pair
Definition: uint_types.hpp:75
bool HasAsyncWrites() const
Check whether there are still AsyncWrite()s in the queue.
Definition: dispatcher.hpp:739
std::deque< T, Allocator< T > > deque
deque with Manager tracking
Definition: allocator.hpp:232
steady_clock::time_point next_timeout
timepoint of next timeout
Definition: dispatcher.hpp:764
AsyncWriteCallback callback_
functional object to call once data is complete
Definition: dispatcher.hpp:274
AsyncWriteBuffer(Connection &conn, Buffer &&buffer, const AsyncWriteCallback &callback)
Construct buffered writer with callback.
Definition: dispatcher.hpp:183
data::PinnedByteBlockPtr & byte_block()
Definition: dispatcher.hpp:355
Connection * connection() const
Returns conn_.
Definition: dispatcher.hpp:152
Connection * conn_
Connection reference.
Definition: dispatcher.hpp:265
size_t size() const
underlying buffer size
Definition: dispatcher.hpp:261
A pinned / pin-counted derivative of a Block.
Definition: block.hpp:157
void AddTimer(const std::chrono::milliseconds &timeout, const TimerCallback &cb)
Register a relative timeout callback.
Definition: dispatcher.hpp:540
void DoCallback(size_t size_check)
Definition: dispatcher.hpp:364
static constexpr bool debug_async
Definition: dispatcher.hpp:62
Buffer buffer_
Send buffer (owned by this writer)
Definition: dispatcher.hpp:268
Simple buffer of characters without initialization or growing functionality.
Definition: buffer.hpp:40
AsyncReadByteBlock(Connection &conn, size_t size, data::PinnedByteBlockPtr &&block, const AsyncReadByteBlockCallback &callback)
Construct block reader with callback.
Definition: dispatcher.hpp:283
const uint8_t * data() const
underlying buffer pointer
Definition: dispatcher.hpp:376
size_t size() const
underlying buffer size
Definition: dispatcher.hpp:161
std::atomic< size_t > tx_active_
active send requests
Definition: connection.hpp:621
Connection * conn_
Connection reference.
Definition: dispatcher.hpp:490
Connection * connection() const
Returns conn_.
Definition: dispatcher.hpp:370
data::PinnedByteBlockPtr block_
Receive block, holds a pin on the memory.
Definition: dispatcher.hpp:386
virtual void AsyncRead(Connection &c, uint32_t, size_t size, data::PinnedByteBlockPtr &&block, const AsyncReadByteBlockCallback &done_cb)
asynchronously read the full ByteBlock and deliver it to the callback
Definition: dispatcher.hpp:587
bool operator<(const uint_pair &b) const
less-than comparison operator
Definition: uint_types.hpp:187
Connection * conn_
Connection reference.
Definition: dispatcher.hpp:165
const uint8_t * data() const
underlying buffer pointer
Definition: dispatcher.hpp:483
Buffer buffer_
Receive buffer (allocates memory)
Definition: dispatcher.hpp:168
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:24
Dispatcher is a high level wrapper for asynchronous callback processing.
Definition: dispatcher.hpp:510
virtual ssize_t SendOne(const void *data, size_t size, Flags flags=NoFlags)=0
AsyncReadByteBlockCallback callback_
functional object to call once data is complete
Definition: dispatcher.hpp:395
tlx::delegate< void(Connection &c, Buffer &&buffer), mem::GPoolAllocator< char > > AsyncReadBufferCallback
Signature of async read Buffer callbacks.
Definition: dispatcher.hpp:49
DispatcherThread contains a net::Dispatcher object and an associated thread that runs in the dispatch...
#define LOGC(cond)
Explicitly specify the condition for logging.
Definition: logger.hpp:21
size_type size() const noexcept
return number of items in Buffer
Definition: buffer.hpp:141
std::atomic< size_t > rx_active_
active recv requests
Definition: connection.hpp:624
AsyncReadBuffer(Connection &conn, size_t buffer_size, const AsyncReadBufferCallback &callback)
Construct buffered reader with callback.
Definition: dispatcher.hpp:70
virtual ~Dispatcher()
virtual destructor
Definition: dispatcher.hpp:534
AsyncReadBufferCallback callback_
functional object to call once data is complete
Definition: dispatcher.hpp:174