Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
select_dispatcher.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/net/tcp/select_dispatcher.hpp
3  *
4  * Asynchronous callback wrapper around select()
5  *
6  * Part of Project Thrill - http://project-thrill.org
7  *
8  * Copyright (C) 2015 Timo Bingmann <[email protected]>
9  *
10  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
11  ******************************************************************************/
12 
13 #pragma once
14 #ifndef THRILL_NET_TCP_SELECT_DISPATCHER_HEADER
15 #define THRILL_NET_TCP_SELECT_DISPATCHER_HEADER
16 
17 #include <thrill/common/config.hpp>
18 #include <thrill/common/logger.hpp>
20 #include <thrill/mem/allocator.hpp>
23 #include <thrill/net/exception.hpp>
27 #include <tlx/delegate.hpp>
28 #include <tlx/die.hpp>
29 
30 #include <unistd.h>
31 
32 #include <cerrno>
33 #include <chrono>
34 #include <csignal>
35 #include <deque>
36 #include <functional>
37 #include <vector>
38 
39 namespace thrill {
40 namespace net {
41 namespace tcp {
42 
43 //! \addtogroup net_tcp TCP Socket API
44 //! \{
45 
46 /*!
47  * SelectDispatcher is a higher level wrapper for select(). One can register
48  * Socket objects for readability and writability checks, buffered reads and
49  * writes with completion callbacks, and also timer functions.
50  */
51 class SelectDispatcher final : public net::Dispatcher
52 {
53  static constexpr bool debug = false;
54 
55  static constexpr bool self_verify_ = common::g_self_verify;
56 
57 public:
58  //! type for file descriptor readiness callbacks
60 
61  //! constructor
62  explicit SelectDispatcher() : net::Dispatcher() {
63  // allocate self-pipe
65 
66  if (!Socket::SetNonBlocking(self_pipe_[0], true)) {
67  LOG1 << "SelectDispatcher() cannot set up self-pipe for non-blocking reads";
68  }
69 
70  // Ignore PIPE signals (received when writing to closed sockets)
71  signal(SIGPIPE, SIG_IGN);
72 
73  // wait interrupts via self-pipe.
75  Callback::make<SelectDispatcher,
77  }
78 
79  //! non-copyable: delete copy-constructor
80  SelectDispatcher(const SelectDispatcher&) = delete;
81  //! non-copyable: delete assignment operator
83  //! move-constructor: default
85  //! move-assignment operator: default
87 
89  ::close(self_pipe_[0]);
90  ::close(self_pipe_[1]);
91  }
92 
93  //! Grow table if needed
94  void CheckSize(int fd) {
95  assert(fd >= 0);
96  assert(fd <= 32000); // this is an arbitrary limit to catch errors.
97  if (static_cast<size_t>(fd) >= watch_.size())
98  watch_.resize(fd + 1);
99  }
100 
101  //! Register a buffered read callback and a default exception callback.
102  void AddRead(int fd, const Callback& read_cb) {
103  CheckSize(fd);
104  if (!watch_[fd].read_cb.size()) {
105  select_.SetRead(fd);
106  select_.SetException(fd);
107  }
108  watch_[fd].active = true;
109  watch_[fd].read_cb.emplace_back(read_cb);
110  }
111 
112  //! Register a buffered read callback and a default exception callback.
113  void AddRead(net::Connection& c, const Callback& read_cb) final {
114  assert(dynamic_cast<Connection*>(&c));
115  Connection& tc = static_cast<Connection&>(c);
116  int fd = tc.GetSocket().fd();
117  return AddRead(fd, read_cb);
118  }
119 
120  //! Register a buffered write callback and a default exception callback.
121  void AddWrite(net::Connection& c, const Callback& write_cb) final {
122  assert(dynamic_cast<Connection*>(&c));
123  Connection& tc = static_cast<Connection&>(c);
124  int fd = tc.GetSocket().fd();
125  CheckSize(fd);
126  if (!watch_[fd].write_cb.size()) {
127  select_.SetWrite(fd);
128  select_.SetException(fd);
129  }
130  watch_[fd].active = true;
131  watch_[fd].write_cb.emplace_back(write_cb);
132  }
133 
134  //! Register a buffered write callback and a default exception callback.
135  void SetExcept(net::Connection& c, const Callback& except_cb) {
136  assert(dynamic_cast<Connection*>(&c));
137  Connection& tc = static_cast<Connection&>(c);
138  int fd = tc.GetSocket().fd();
139  CheckSize(fd);
140  if (!watch_[fd].except_cb) {
141  select_.SetException(fd);
142  }
143  watch_[fd].active = true;
144  watch_[fd].except_cb = except_cb;
145  }
146 
147  //! Cancel all callbacks on a given fd.
148  void Cancel(net::Connection& c) final {
149  assert(dynamic_cast<Connection*>(&c));
150  Connection& tc = static_cast<Connection&>(c);
151  int fd = tc.GetSocket().fd();
152  CheckSize(fd);
153 
154  if (watch_[fd].read_cb.size() == 0 &&
155  watch_[fd].write_cb.size() == 0)
156  LOG << "SelectDispatcher::Cancel() fd=" << fd
157  << " called with no callbacks registered.";
158 
159  select_.ClearRead(fd);
160  select_.ClearWrite(fd);
162 
163  Watch& w = watch_[fd];
164  w.read_cb.clear();
165  w.write_cb.clear();
166  w.except_cb = Callback();
167  w.active = false;
168  }
169 
170  //! Run one iteration of dispatching select().
171  void DispatchOne(const std::chrono::milliseconds& timeout) final;
172 
173  //! Interrupt the current select via self-pipe
174  void Interrupt() final;
175 
176 private:
177  //! select() manager object
179 
180  //! self-pipe to wake up select.
181  int self_pipe_[2];
182 
183  //! buffer to receive one byte signals from self-pipe
185 
186  //! callback vectors per watched file descriptor
187  struct Watch {
188  //! boolean check whether any callbacks are registered
189  bool active = false;
190  //! queue of callbacks for fd.
191  std::deque<Callback, mem::GPoolAllocator<Callback> >
192  read_cb, write_cb;
193  //! only one exception callback for the fd.
195  };
196 
197  //! handlers for all registered file descriptors. the fd integer range
198  //! should be small enough, otherwise a more complicated data structure is
199  //! needed.
200  std::vector<Watch> watch_;
201 
202  //! Default exception handler
203  static bool DefaultExceptionCallback() {
204  throw Exception("SelectDispatcher() exception on socket!", errno);
205  }
206 
207  //! Self-pipe callback
208  bool SelfPipeCallback();
209 };
210 
211 //! \}
212 
213 } // namespace tcp
214 } // namespace net
215 } // namespace thrill
216 
217 #endif // !THRILL_NET_TCP_SELECT_DISPATCHER_HEADER
218 
219 /******************************************************************************/
void Interrupt() final
Interrupt the current select via self-pipe.
Socket & GetSocket()
Return the raw socket object for more low-level network programming.
Definition: connection.hpp:130
Select & ClearException(int fd)
Clear a file descriptor from the exception set.
Definition: select.hpp:89
Select & ClearWrite(int fd)
Clear a file descriptor from the write set.
Definition: select.hpp:85
void AddRead(net::Connection &c, const Callback &read_cb) final
Register a buffered read callback and a default exception callback.
Callback except_cb
only one exception callback for the fd.
#define LOG1
Definition: logger.hpp:28
void SetExcept(net::Connection &c, const Callback &except_cb)
Register a buffered write callback and a default exception callback.
void DispatchOne(const std::chrono::milliseconds &timeout) final
Run one iteration of dispatching select().
void CheckSize(int fd)
Grow table if needed.
void Cancel(net::Connection &c) final
Cancel all callbacks on a given fd.
Select is an object-oriented wrapper for select().
Definition: select.hpp:34
void MakePipe(int out_pipefds[2])
create a pair of pipe file descriptors
Definition: porting.cpp:51
tlx::delegate< bool(), mem::GPoolAllocator< char > > AsyncCallback
Signature of async connection readability/writability callbacks.
Definition: dispatcher.hpp:45
SelectDispatcher is a higher level wrapper for select().
std::deque< Callback, mem::GPoolAllocator< Callback > > write_cb
bool active
boolean check whether any callbacks are registered
Connection is a rich point-to-point socket connection to another client (worker, master, or whatever).
Definition: connection.hpp:54
static constexpr bool g_self_verify
Definition: config.hpp:32
Select select_
select() manager object
A Exception is thrown by Connection on all errors instead of returning error codes.
Definition: exception.hpp:30
Select & SetRead(int fd)
Add a socket to the read and exception selection set.
Definition: select.hpp:45
static bool DefaultExceptionCallback()
Default exception handler.
bool SelfPipeCallback()
Self-pipe callback.
Select & SetWrite(int fd)
Add a socket to the write and exception selection set.
Definition: select.hpp:53
A Connection represents a link to another peer in a network group.
Definition: connection.hpp:49
int fd() const
Return the associated file descriptor.
Definition: socket.hpp:154
static bool SetNonBlocking(int fd, bool non_blocking)
Turn socket into non-blocking state.
Definition: socket.hpp:165
int self_pipe_[2]
self-pipe to wake up select.
void AddRead(int fd, const Callback &read_cb)
Register a buffered read callback and a default exception callback.
Select & SetException(int fd)
Add a socket to the exception selection set.
Definition: select.hpp:61
AsyncCallback Callback
type for file descriptor readiness callbacks
SelectDispatcher & operator=(const SelectDispatcher &)=delete
non-copyable: delete assignment operator
char self_pipe_buffer_[32]
buffer to receive one byte signals from self-pipe
std::deque< Callback, mem::GPoolAllocator< Callback > > read_cb
queue of callbacks for fd.
callback vectors per watched file descriptor
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:24
Dispatcher is a high level wrapper for asynchronous callback processing.
Definition: dispatcher.hpp:480
Select & ClearRead(int fd)
Clear a file descriptor from the read set.
Definition: select.hpp:81
void AddWrite(net::Connection &c, const Callback &write_cb) final
Register a buffered write callback and a default exception callback.