Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
dispatcher.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/net/dispatcher.hpp
3  *
4  * Asynchronous callback wrapper around select(), epoll(), or other kernel-level
5  * dispatchers.
6  *
7  * Part of Project Thrill - http://project-thrill.org
8  *
9  * Copyright (C) 2015 Timo Bingmann <[email protected]>
10  *
11  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
12  ******************************************************************************/
13 
14 #pragma once
15 #ifndef THRILL_NET_DISPATCHER_HEADER
16 #define THRILL_NET_DISPATCHER_HEADER
17 
18 #include <thrill/data/block.hpp>
20 #include <thrill/mem/allocator.hpp>
21 #include <thrill/net/buffer.hpp>
23 #include <tlx/delegate.hpp>
24 #include <tlx/die.hpp>
25 
26 #include <atomic>
27 #include <chrono>
28 #include <ctime>
29 #include <deque>
30 #include <functional>
31 #include <queue>
32 #include <string>
33 #include <vector>
34 
35 namespace thrill {
36 namespace net {
37 
38 //! \addtogroup net_layer
39 //! \{
40 
41 //! Signature of timer callbacks.
43 
44 //! Signature of async connection readability/writability callbacks.
46 
47 //! Signature of async read Buffer callbacks.
50 
51 //! Signature of async read ByteBlock callbacks.
55 
56 //! Signature of async write callbacks.
59 
60 /******************************************************************************/
61 
62 static constexpr bool debug_async = false;
63 
65 {
66 public:
67  //! Construct buffered reader with callback
68  AsyncReadBuffer(Connection& conn, size_t buffer_size,
69  const AsyncReadBufferCallback& callback)
70  : conn_(&conn),
71  buffer_(buffer_size),
72  callback_(callback) {
74  << "AsyncReadBuffer()"
75  << " buffer_.size()=" << buffer_.size();
76  }
77 
78  //! non-copyable: delete copy-constructor
79  AsyncReadBuffer(const AsyncReadBuffer&) = delete;
80  //! non-copyable: delete assignment operator
82  //! move-constructor: default
83  AsyncReadBuffer(AsyncReadBuffer&&) = default;
84  //! move-assignment operator: default
86 
89  << "~AsyncReadBuffer()"
90  << " buffer_.size()=" << buffer_.size();
91  }
92 
93  //! Should be called when the socket is readable
94  bool operator () () {
95  ssize_t r = conn_->RecvOne(
97 
98  if (r <= 0) {
99  // these errors are acceptable: just redo the recv later.
100  if (errno == EINTR || errno == EAGAIN) return true;
101 
102  // signal artificial IsDone, for clean up.
103  read_size_ = buffer_.size();
104 
105  // these errors are end-of-file indications (both good and bad)
106  if (errno == 0 || errno == EPIPE || errno == ECONNRESET) {
107  if (callback_) callback_(*conn_, Buffer());
108  return false;
109  }
110  throw Exception("AsyncReadBuffer() error in recv() on "
111  "connection " + conn_->ToString(), errno);
112  }
113 
114  read_size_ += r;
115 
116  if (read_size_ == buffer_.size()) {
117  DoCallback();
118  return false;
119  }
120  else {
121  return true;
122  }
123  }
124 
125  bool IsDone() const { return read_size_ == buffer_.size(); }
126 
127  //! reference to Buffer
128  Buffer& buffer() { return buffer_; }
129 
130  void DoCallback() {
131  if (callback_) {
132  callback_(*conn_, std::move(buffer_));
134  }
135  }
136 
137  void DoCallback(size_t size_check) {
138  die_unequal(size_check, buffer_.size());
139  return DoCallback();
140  }
141 
142  //! Returns conn_
143  Connection * connection() const { return conn_; }
144 
145  //! underlying buffer pointer
146  uint8_t * data() { return buffer_.data(); }
147 
148  //! underlying buffer pointer
149  const uint8_t * data() const { return buffer_.data(); }
150 
151  //! underlying buffer size
152  size_t size() const { return buffer_.size(); }
153 
154 private:
155  //! Connection reference
157 
158  //! Receive buffer (allocates memory)
160 
161  //! total size currently read
162  size_t read_size_ = 0;
163 
164  //! functional object to call once data is complete
166 };
167 
168 /******************************************************************************/
169 
171 {
172 public:
173  //! Construct buffered writer with callback
175  const AsyncWriteCallback& callback)
176  : conn_(&conn),
177  buffer_(std::move(buffer)),
178  callback_(callback) {
180  << "AsyncWriteBuffer()"
181  << " buffer_.size()=" << buffer_.size();
182  }
183 
184  //! non-copyable: delete copy-constructor
185  AsyncWriteBuffer(const AsyncWriteBuffer&) = delete;
186  //! non-copyable: delete assignment operator
188  //! move-constructor: default
189  AsyncWriteBuffer(AsyncWriteBuffer&&) = default;
190  //! move-assignment operator: default
192 
195  << "~AsyncWriteBuffer()"
196  << " buffer_.size()=" << buffer_.size();
197  }
198 
199  //! Should be called when the socket is writable
200  bool operator () () {
201  ssize_t r = conn_->SendOne(
203 
204  if (r <= 0) {
205  if (errno == EINTR || errno == EAGAIN) return true;
206 
207  // signal artificial IsDone, for clean up.
209 
210  if (errno == EPIPE) {
211  LOG1 << "AsyncWriteBuffer() got EPIPE";
212  DoCallback();
213  return false;
214  }
215  throw Exception("AsyncWriteBuffer() error in send", errno);
216  }
217 
218  write_size_ += r;
219 
220  if (write_size_ == buffer_.size()) {
221  DoCallback();
222  return false;
223  }
224  else {
225  return true;
226  }
227  }
228 
229  bool IsDone() const { return write_size_ == buffer_.size(); }
230 
231  void DoCallback() {
232  if (callback_) {
233  callback_(*conn_);
235  }
236  }
237 
238  //! Returns conn_
239  Connection * connection() const { return conn_; }
240 
241  //! underlying buffer pointer
242  const uint8_t * data() const { return buffer_.data(); }
243 
244  //! underlying buffer size
245  size_t size() const { return buffer_.size(); }
246 
247 private:
248  //! Connection reference
250 
251  //! Send buffer (owned by this writer)
253 
254  //! total size currently written
255  size_t write_size_ = 0;
256 
257  //! functional object to call once data is complete
259 };
260 
261 /******************************************************************************/
262 
264 {
265 public:
266  //! Construct block reader with callback
268  data::PinnedByteBlockPtr&& block,
269  const AsyncReadByteBlockCallback& callback)
270  : conn_(&conn),
271  block_(std::move(block)),
272  size_(size),
273  callback_(callback) {
275  << "AsyncReadByteBlock()"
276  << " block_=" << block_
277  << " size_=" << size_;
278  }
279 
280  //! non-copyable: delete copy-constructor
281  AsyncReadByteBlock(const AsyncReadByteBlock&) = delete;
282  //! non-copyable: delete assignment operator
284  //! move-constructor: default
286  //! move-assignment operator: default
288 
291  << "~AsyncReadByteBlock()"
292  << " block_=" << block_
293  << " size_=" << size_;
294  }
295 
296  //! Should be called when the socket is readable
297  bool operator () () {
298  ssize_t r = conn_->RecvOne(
299  block_->data() + pos_, size_ - pos_);
300 
301  if (r <= 0) {
302  // these errors are acceptable: just redo the recv later.
303  if (errno == EINTR || errno == EAGAIN) return true;
304 
305  // signal artificial IsDone, for clean up.
306  pos_ = size_;
307 
308  // these errors are end-of-file indications (both good and bad)
309  if (errno == 0 || errno == EPIPE || errno == ECONNRESET) {
310  DoCallback();
311  return false;
312  }
313  throw Exception("AsyncReadBlock() error in recv", errno);
314  }
315 
316  pos_ += r;
317 
318  if (pos_ == size_) {
319  DoCallback();
320  return false;
321  }
322  else {
323  return true;
324  }
325  }
326 
327  bool IsDone() const {
328  // done if block is already delivered to callback or size matches
329  return !block_ || pos_ == size_;
330  }
331 
333 
334  void DoCallback() {
335  if (callback_) {
336  callback_(*conn_, std::move(block_));
338  }
339  }
340 
341  void DoCallback(size_t size_check) {
342  die_unequal(size_check, size_);
343  return DoCallback();
344  }
345 
346  //! Returns conn_
347  Connection * connection() const { return conn_; }
348 
349  //! underlying buffer pointer
350  uint8_t * data() { return block_->data(); }
351 
352  //! underlying buffer pointer
353  const uint8_t * data() const { return block_->data(); }
354 
355  //! underlying buffer size
356  size_t size() const { return size_; }
357 
358 private:
359  //! Connection reference
361 
362  //! Receive block, holds a pin on the memory.
364 
365  //! size currently read
366  size_t pos_ = 0;
367 
368  //! total size to read
369  size_t size_;
370 
371  //! functional object to call once data is complete
373 };
374 
375 /******************************************************************************/
376 
378 {
379 public:
380  //! Construct block writer with callback
382  const AsyncWriteCallback& callback)
383  : conn_(&conn),
384  block_(std::move(block)),
385  callback_(callback) {
387  << "AsyncWriteBlock()"
388  << " block_.size()=" << block_.size()
389  << " block_=" << block_;
390  }
391 
392  //! non-copyable: delete copy-constructor
393  AsyncWriteBlock(const AsyncWriteBlock&) = delete;
394  //! non-copyable: delete assignment operator
395  AsyncWriteBlock& operator = (const AsyncWriteBlock&) = delete;
396  //! move-constructor: default
397  AsyncWriteBlock(AsyncWriteBlock&&) = default;
398  //! move-assignment operator: default
400 
403  << "~AsyncWriteBlock()"
404  << " block_=" << block_;
405  }
406 
407  //! Should be called when the socket is writable
408  bool operator () () {
409  ssize_t r = conn_->SendOne(
412 
413  if (r <= 0) {
414  if (errno == EINTR || errno == EAGAIN) return true;
415 
416  // signal artificial IsDone, for clean up.
418 
419  if (errno == EPIPE) {
420  LOG1 << "AsyncWriteBlock() got EPIPE";
421  DoCallback();
422  return false;
423  }
424  throw Exception("AsyncWriteBlock() error in send", errno);
425  }
426 
427  written_size_ += r;
428 
429  if (written_size_ == block_.size()) {
430  DoCallback();
431  return false;
432  }
433  else {
434  return true;
435  }
436  }
437 
438  bool IsDone() const { return written_size_ == block_.size(); }
439 
440  void DoCallback() {
441  if (callback_) {
442  callback_(*conn_);
444  }
445  // release Pin
446  block_.Reset();
447  }
448 
449  //! Returns conn_
450  Connection * connection() const { return conn_; }
451 
452  //! underlying buffer pointer
453  const uint8_t * data() const { return block_.data_begin(); }
454 
455  //! underlying buffer size
456  size_t size() const { return block_.size(); }
457 
458 private:
459  //! Connection reference
461 
462  //! Send block (holds a pin on the underlying ByteBlock)
464 
465  //! total size currently written
466  size_t written_size_ = 0;
467 
468  //! functional object to call once data is complete
470 };
471 
472 /******************************************************************************/
473 
474 /*!
475  * Dispatcher is a high level wrapper for asynchronous callback processing.. One
476  * can register Connection objects for readability and writability checks,
477  * buffered reads and writes with completion callbacks, and also timer
478  * functions.
479  */
481 {
482  static constexpr bool debug = false;
483 
484 private:
485  //! import into class namespace
486  using steady_clock = std::chrono::steady_clock;
487 
488  //! import into class namespace
489  using milliseconds = std::chrono::milliseconds;
490 
491  //! for access to terminate_
492  friend class DispatcherThread;
493 
494 public:
495  //! default constructor
496  Dispatcher() = default;
497 
498  //! non-copyable: delete copy-constructor
499  Dispatcher(const Dispatcher&) = delete;
500  //! non-copyable: delete assignment operator
501  Dispatcher& operator = (const Dispatcher&) = delete;
502 
503  //! virtual destructor
504  virtual ~Dispatcher() { }
505 
506  //! \name Timeout Callbacks
507  //! \{
508 
509  //! Register a relative timeout callback
510  void AddTimer(const std::chrono::milliseconds& timeout,
511  const TimerCallback& cb) {
512  timer_pq_.emplace(steady_clock::now() + timeout,
513  std::chrono::duration_cast<milliseconds>(timeout),
514  cb);
515  }
516 
517  //! \}
518 
519  //! \name Connection Callbacks
520  //! \{
521 
522  //! Register a buffered read callback and a default exception callback.
523  virtual void AddRead(Connection& c, const AsyncCallback& read_cb) = 0;
524 
525  //! Register a buffered write callback and a default exception callback.
526  virtual void AddWrite(Connection& c, const AsyncCallback& write_cb) = 0;
527 
528  //! Cancel all callbacks on a given connection.
529  virtual void Cancel(Connection& c) = 0;
530 
531  //! \}
532 
533  //! \name Asynchronous Data Reader/Writer Callbacks
534  //! \{
535 
536  //! asynchronously read n bytes and deliver them to the callback
537  virtual void AsyncRead(Connection& c, uint32_t /* seq */, size_t size,
538  const AsyncReadBufferCallback& done_cb) {
539  assert(c.IsValid());
540 
541  LOG << "async read on read dispatcher";
542  if (size == 0) {
543  if (done_cb) done_cb(c, Buffer());
544  return;
545  }
546 
547  // add new async reader object
548  async_read_.emplace_back(c, size, done_cb);
549 
550  // register read callback
551  AsyncReadBuffer& arb = async_read_.back();
552  AddRead(c, AsyncCallback::make<
553  AsyncReadBuffer, &AsyncReadBuffer::operator ()>(&arb));
554  }
555 
556  //! asynchronously read the full ByteBlock and deliver it to the callback
557  virtual void AsyncRead(Connection& c, uint32_t /* seq */, size_t size,
558  data::PinnedByteBlockPtr&& block,
559  const AsyncReadByteBlockCallback& done_cb) {
560  assert(c.IsValid());
561 
562  LOG << "async read on read dispatcher";
563  if (block->size() == 0) {
564  if (done_cb) done_cb(c, std::move(block));
565  return;
566  }
567 
568  // add new async reader object
569  async_read_block_.emplace_back(c, size, std::move(block), done_cb);
570 
571  // register read callback
572  AsyncReadByteBlock& arbb = async_read_block_.back();
573  AddRead(c, AsyncCallback::make<
574  AsyncReadByteBlock, &AsyncReadByteBlock::operator ()>(&arbb));
575  }
576 
577  //! asynchronously write buffer and callback when delivered. The buffer is
578  //! MOVED into the async writer.
579  virtual void AsyncWrite(
580  Connection& c, uint32_t /* seq */, Buffer&& buffer,
581  const AsyncWriteCallback& done_cb = AsyncWriteCallback()) {
582  assert(c.IsValid());
583 
584  if (buffer.size() == 0) {
585  if (done_cb) done_cb(c);
586  return;
587  }
588 
589  // add new async writer object
590  async_write_.emplace_back(c, std::move(buffer), done_cb);
591 
592  // register write callback
593  AsyncWriteBuffer& awb = async_write_.back();
594  AddWrite(c, AsyncCallback::make<
595  AsyncWriteBuffer, &AsyncWriteBuffer::operator ()>(&awb));
596  }
597 
598  //! asynchronously write buffer and callback when delivered. The buffer is
599  //! MOVED into the async writer.
600  virtual void AsyncWrite(
601  Connection& c, uint32_t /* seq */, data::PinnedBlock&& block,
602  const AsyncWriteCallback& done_cb = AsyncWriteCallback()) {
603  assert(c.IsValid());
604 
605  if (block.size() == 0) {
606  if (done_cb) done_cb(c);
607  return;
608  }
609 
610  // add new async writer object
611  async_write_block_.emplace_back(c, std::move(block), done_cb);
612 
613  // register write callback
614  AsyncWriteBlock& awb = async_write_block_.back();
615  AddWrite(c, AsyncCallback::make<
616  AsyncWriteBlock, &AsyncWriteBlock::operator ()>(&awb));
617  }
618 
619  //! asynchronously write buffer and callback when delivered. COPIES the data
620  //! into a Buffer!
622  Connection& c, uint32_t seq, const void* buffer, size_t size,
623  const AsyncWriteCallback& done_cb = AsyncWriteCallback()) {
624  return AsyncWrite(c, seq, Buffer(buffer, size), done_cb);
625  }
626 
627  //! asynchronously write buffer and callback when delivered. COPIES the data
628  //! into a Buffer!
630  Connection& c, uint32_t seq, const std::string& str,
631  const AsyncWriteCallback& done_cb = AsyncWriteCallback()) {
632  return AsyncWriteCopy(c, seq, str.data(), str.size(), done_cb);
633  }
634 
635  //! \}
636 
637  //! \name Dispatch
638  //! \{
639 
640  //! Dispatch one or more events
641  void Dispatch() {
642  // process timer events that lie in the past
643  steady_clock::time_point now = steady_clock::now();
644 
645  while (!terminate_ &&
646  !timer_pq_.empty() &&
647  timer_pq_.top().next_timeout <= now)
648  {
649  const Timer& top = timer_pq_.top();
650  if (top.cb()) {
651  // requeue timeout event again.
652  timer_pq_.emplace(top.next_timeout + top.timeout,
653  top.timeout, top.cb);
654  }
655  timer_pq_.pop();
656  }
657 
658  if (terminate_) return;
659 
660  // calculate time until next timer event
661  if (timer_pq_.empty()) {
662  LOG << "Dispatch(): empty timer queue - selecting for 10s";
663  DispatchOne(milliseconds(10000));
664  }
665  else {
666  auto diff = std::chrono::duration_cast<milliseconds>(
667  timer_pq_.top().next_timeout - now);
668 
669  if (diff < milliseconds(1)) diff = milliseconds(1);
670 
671  sLOG << "Dispatch(): waiting" << diff.count() << "ms";
672  DispatchOne(diff);
673  }
674 
675  // clean up finished AsyncRead/Writes
676  while (async_read_.size() && async_read_.front().IsDone()) {
677  async_read_.pop_front();
678  }
679  while (async_write_.size() && async_write_.front().IsDone()) {
680  async_write_.pop_front();
681  }
682 
683  while (async_read_block_.size() && async_read_block_.front().IsDone()) {
684  async_read_block_.pop_front();
685  }
686  while (async_write_block_.size() && async_write_block_.front().IsDone()) {
687  async_write_block_.pop_front();
688  }
689  }
690 
691  //! Loop over Dispatch() until terminate_ flag is set.
692  void Loop() {
693  while (!terminate_) {
694  Dispatch();
695  }
696  }
697 
698  //! Interrupt current dispatch
699  virtual void Interrupt() = 0;
700 
701  //! Causes the dispatcher to break out after the next timeout occurred
702  //! Does not interrupt the currently running read/write operation, but
703  //! breaks after the operation finished or timed out.
704  void Terminate() {
705  terminate_ = true;
706  }
707 
708  //! Check whether there are still AsyncWrite()s in the queue.
709  bool HasAsyncWrites() const {
710  return (async_write_.size() != 0) || (async_write_block_.size() != 0);
711  }
712 
713  //! \}
714 
715 protected:
716  virtual void DispatchOne(const std::chrono::milliseconds& timeout) = 0;
717 
718  //! Default exception handler
719  static bool ExceptionCallback(Connection& c) {
720  // exception on listen socket ?
721  throw Exception(
722  "Dispatcher() exception on socket fd "
723  + c.ToString() + "!", errno);
724  }
725 
726  //! true if dispatcher needs to stop
727  std::atomic<bool> terminate_ { false };
728 
729  /*------------------------------------------------------------------------*/
730 
731  //! struct for timer callbacks
732  struct Timer {
733  //! timepoint of next timeout
734  steady_clock::time_point next_timeout;
735  //! relative timeout for restarting
737  //! callback
739 
740  Timer(const steady_clock::time_point& _next_timeout,
741  const milliseconds& _timeout,
742  const TimerCallback& _cb)
743  : next_timeout(_next_timeout),
744  timeout(_timeout),
745  cb(_cb)
746  { }
747 
748  bool operator < (const Timer& b) const
749  { return next_timeout > b.next_timeout; }
750  };
751 
752  //! priority queue of timer callbacks
753  using TimerPQ = std::priority_queue<
754  Timer, std::vector<Timer, mem::GPoolAllocator<Timer> > >;
755 
756  //! priority queue of timer callbacks, obviously kept in timeout
757  //! order. Currently not addressable.
759 
760  /*------------------------------------------------------------------------*/
761 
762  //! deque of asynchronous readers
765 
766  //! deque of asynchronous writers
769 
770  //! deque of asynchronous readers
773 
774  //! deque of asynchronous writers
777 };
778 
779 //! \}
780 
781 } // namespace net
782 } // namespace thrill
783 
784 #endif // !THRILL_NET_DISPATCHER_HEADER
785 
786 /******************************************************************************/
size_t write_size_
total size currently written
Definition: dispatcher.hpp:255
static bool ExceptionCallback(Connection &c)
Default exception handler.
Definition: dispatcher.hpp:719
std::deque< AsyncReadByteBlock, mem::GPoolAllocator< AsyncReadByteBlock > > async_read_block_
deque of asynchronous readers
Definition: dispatcher.hpp:772
size_t read_size_
total size currently read
Definition: dispatcher.hpp:162
AsyncReadByteBlock & operator=(const AsyncReadByteBlock &)=delete
non-copyable: delete assignment operator
Buffer & buffer()
reference to Buffer
Definition: dispatcher.hpp:128
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:34
Timer(const steady_clock::time_point &_next_timeout, const milliseconds &_timeout, const TimerCallback &_cb)
Definition: dispatcher.hpp:740
const uint8_t * data() const
underlying buffer pointer
Definition: dispatcher.hpp:353
std::chrono::milliseconds milliseconds
import into class namespace
Definition: dispatcher.hpp:489
Connection * conn_
Connection reference.
Definition: dispatcher.hpp:360
uint8_t * data()
underlying buffer pointer
Definition: dispatcher.hpp:146
virtual void AsyncWrite(Connection &c, uint32_t, Buffer &&buffer, const AsyncWriteCallback &done_cb=AsyncWriteCallback())
Definition: dispatcher.hpp:579
virtual bool IsValid() const =0
check whether the connection is (still) valid.
struct for timer callbacks
Definition: dispatcher.hpp:732
virtual void AddWrite(Connection &c, const AsyncCallback &write_cb)=0
Register a buffered write callback and a default exception callback.
size_t size() const
underlying buffer size
Definition: dispatcher.hpp:152
std::deque< AsyncWriteBlock, mem::GPoolAllocator< AsyncWriteBlock > > async_write_block_
deque of asynchronous writers
Definition: dispatcher.hpp:776
tlx::delegate< void(Connection &), mem::GPoolAllocator< char > > AsyncWriteCallback
Signature of async write callbacks.
Definition: dispatcher.hpp:58
virtual void Cancel(Connection &c)=0
Cancel all callbacks on a given connection.
#define LOG1
Definition: logger.hpp:28
virtual std::string ToString() const =0
return a string representation of this connection, for user output.
Dispatcher()=default
default constructor
virtual void AddRead(Connection &c, const AsyncCallback &read_cb)=0
Register a buffered read callback and a default exception callback.
AsyncWriteBuffer & operator=(const AsyncWriteBuffer &)=delete
non-copyable: delete assignment operator
AsyncWriteCallback callback_
functional object to call once data is complete
Definition: dispatcher.hpp:469
bool operator()()
Should be called when the socket is readable.
Definition: dispatcher.hpp:297
A pinned / pin-counted pointer to a ByteBlock.
Definition: byte_block.hpp:182
void Dispatch()
Dispatch one or more events.
Definition: dispatcher.hpp:641
std::atomic< bool > terminate_
true if dispatcher needs to stop
Definition: dispatcher.hpp:727
bool operator()()
Should be called when the socket is readable.
Definition: dispatcher.hpp:94
static constexpr bool debug
Definition: dispatcher.hpp:482
bool operator()()
Should be called when the socket is writable.
Definition: dispatcher.hpp:408
virtual void AsyncWrite(Connection &c, uint32_t, data::PinnedBlock &&block, const AsyncWriteCallback &done_cb=AsyncWriteCallback())
Definition: dispatcher.hpp:600
void AsyncWriteCopy(Connection &c, uint32_t seq, const void *buffer, size_t size, const AsyncWriteCallback &done_cb=AsyncWriteCallback())
Definition: dispatcher.hpp:621
virtual void AsyncRead(Connection &c, uint32_t, size_t size, const AsyncReadBufferCallback &done_cb)
asynchronously read n bytes and deliver them to the callback
Definition: dispatcher.hpp:537
size_t size() const
return length of valid data in bytes.
Definition: block.hpp:245
std::chrono::steady_clock steady_clock
import into class namespace
Definition: dispatcher.hpp:486
size_t size() const
underlying buffer size
Definition: dispatcher.hpp:356
Connection * connection() const
Returns conn_.
Definition: dispatcher.hpp:347
void Loop()
Loop over Dispatch() until terminate_ flag is set.
Definition: dispatcher.hpp:692
void Reset()
release pin on block and reset Block pointer to nullptr
Definition: block.hpp:271
size_t size_
total size to read
Definition: dispatcher.hpp:369
A Exception is thrown by Connection on all errors instead of returning error codes.
Definition: exception.hpp:30
std::deque< AsyncReadBuffer, mem::GPoolAllocator< AsyncReadBuffer > > async_read_
deque of asynchronous readers
Definition: dispatcher.hpp:764
AsyncReadBuffer & operator=(const AsyncReadBuffer &)=delete
non-copyable: delete assignment operator
TimerCallback cb
callback
Definition: dispatcher.hpp:738
iterator data() noexcept
return iterator to beginning of Buffer
Definition: buffer.hpp:134
bool operator<(const Timer &b) const
Definition: dispatcher.hpp:748
void AsyncWriteCopy(Connection &c, uint32_t seq, const std::string &str, const AsyncWriteCallback &done_cb=AsyncWriteCallback())
Definition: dispatcher.hpp:629
AsyncWriteBlock(Connection &conn, data::PinnedBlock &&block, const AsyncWriteCallback &callback)
Construct block writer with callback.
Definition: dispatcher.hpp:381
void DoCallback(size_t size_check)
Definition: dispatcher.hpp:137
virtual ssize_t RecvOne(void *out_data, size_t size)=0
size_t pos_
size currently read
Definition: dispatcher.hpp:366
uint8_t * data()
underlying buffer pointer
Definition: dispatcher.hpp:350
#define die_unequal(X, Y)
Definition: die.hpp:40
std::priority_queue< Timer, std::vector< Timer, mem::GPoolAllocator< Timer > > > TimerPQ
priority queue of timer callbacks
Definition: dispatcher.hpp:754
A Connection represents a link to another peer in a network group.
Definition: connection.hpp:49
std::deque< AsyncWriteBuffer, mem::GPoolAllocator< AsyncWriteBuffer > > async_write_
deque of asynchronous writers
Definition: dispatcher.hpp:768
bool HasAsyncWrites() const
Check whether there are still AsyncWrite()s in the queue.
Definition: dispatcher.hpp:709
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
Definition: allocator.hpp:220
const Byte * data_begin() const
return pointer to beginning of valid data
Definition: block.hpp:259
data::PinnedBlock block_
Send block (holds a pin on the underlying ByteBlock)
Definition: dispatcher.hpp:463
milliseconds timeout
relative timeout for restarting
Definition: dispatcher.hpp:736
Connection * connection() const
Returns conn_.
Definition: dispatcher.hpp:239
tlx::delegate< void(Connection &c, data::PinnedByteBlockPtr &&bytes), mem::GPoolAllocator< char > > AsyncReadByteBlockCallback
Signature of async read ByteBlock callbacks.
Definition: dispatcher.hpp:54
const uint8_t * data() const
underlying buffer pointer
Definition: dispatcher.hpp:453
static const size_t bytes
number of bytes in uint_pair
Definition: uint_types.hpp:75
AsyncWriteBlock & operator=(const AsyncWriteBlock &)=delete
non-copyable: delete assignment operator
std::deque< T, Allocator< T > > deque
deque with Manager tracking
Definition: allocator.hpp:232
steady_clock::time_point next_timeout
timepoint of next timeout
Definition: dispatcher.hpp:734
AsyncWriteCallback callback_
functional object to call once data is complete
Definition: dispatcher.hpp:258
AsyncWriteBuffer(Connection &conn, Buffer &&buffer, const AsyncWriteCallback &callback)
Construct buffered writer with callback.
Definition: dispatcher.hpp:174
bool operator()()
Should be called when the socket is writable.
Definition: dispatcher.hpp:200
data::PinnedByteBlockPtr & byte_block()
Definition: dispatcher.hpp:332
size_t size() const
underlying buffer size
Definition: dispatcher.hpp:456
Connection * conn_
Connection reference.
Definition: dispatcher.hpp:249
A pinned / pin-counted derivative of a Block.
Definition: block.hpp:157
void AddTimer(const std::chrono::milliseconds &timeout, const TimerCallback &cb)
Register a relative timeout callback.
Definition: dispatcher.hpp:510
void DoCallback(size_t size_check)
Definition: dispatcher.hpp:341
static constexpr bool debug_async
Definition: dispatcher.hpp:62
Buffer buffer_
Send buffer (owned by this writer)
Definition: dispatcher.hpp:252
Simple buffer of characters without initialization or growing functionality.
Definition: buffer.hpp:40
AsyncReadByteBlock(Connection &conn, size_t size, data::PinnedByteBlockPtr &&block, const AsyncReadByteBlockCallback &callback)
Construct block reader with callback.
Definition: dispatcher.hpp:267
Connection * connection() const
Returns conn_.
Definition: dispatcher.hpp:450
const uint8_t * data() const
underlying buffer pointer
Definition: dispatcher.hpp:149
Connection * conn_
Connection reference.
Definition: dispatcher.hpp:460
Connection * connection() const
Returns conn_.
Definition: dispatcher.hpp:143
virtual void Interrupt()=0
Interrupt current dispatch.
data::PinnedByteBlockPtr block_
Receive block, holds a pin on the memory.
Definition: dispatcher.hpp:363
size_t size() const
underlying buffer size
Definition: dispatcher.hpp:245
virtual void AsyncRead(Connection &c, uint32_t, size_t size, data::PinnedByteBlockPtr &&block, const AsyncReadByteBlockCallback &done_cb)
asynchronously read the full ByteBlock and deliver it to the callback
Definition: dispatcher.hpp:557
Connection * conn_
Connection reference.
Definition: dispatcher.hpp:156
Buffer buffer_
Receive buffer (allocates memory)
Definition: dispatcher.hpp:159
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:24
Dispatcher is a high level wrapper for asynchronous callback processing.
Definition: dispatcher.hpp:480
virtual void DispatchOne(const std::chrono::milliseconds &timeout)=0
virtual ssize_t SendOne(const void *data, size_t size, Flags flags=NoFlags)=0
AsyncReadByteBlockCallback callback_
functional object to call once data is complete
Definition: dispatcher.hpp:372
Dispatcher & operator=(const Dispatcher &)=delete
non-copyable: delete assignment operator
DispatcherThread contains a net::Dispatcher object and an associated thread that runs in the dispatch...
#define LOGC(cond)
Explicitly specify the condition for logging.
Definition: logger.hpp:21
size_type size() const noexcept
return number of items in Buffer
Definition: buffer.hpp:141
size_t written_size_
total size currently written
Definition: dispatcher.hpp:466
AsyncReadBuffer(Connection &conn, size_t buffer_size, const AsyncReadBufferCallback &callback)
Construct buffered reader with callback.
Definition: dispatcher.hpp:68
virtual ~Dispatcher()
virtual destructor
Definition: dispatcher.hpp:504
tlx::delegate< void(Connection &c, Buffer &&buffer), mem::GPoolAllocator< char > > AsyncReadBufferCallback
Signature of async read Buffer callbacks.
Definition: dispatcher.hpp:49
AsyncReadBufferCallback callback_
functional object to call once data is complete
Definition: dispatcher.hpp:165
const uint8_t * data() const
underlying buffer pointer
Definition: dispatcher.hpp:242