14 #ifndef THRILL_NET_TCP_SELECT_DISPATCHER_HEADER 15 #define THRILL_NET_TCP_SELECT_DISPATCHER_HEADER 53 static constexpr
bool debug =
false;
67 LOG1 <<
"SelectDispatcher() cannot set up self-pipe for non-blocking reads";
71 signal(SIGPIPE, SIG_IGN);
97 if (static_cast<size_t>(fd) >=
watch_.size())
104 if (!
watch_[fd].read_cb.size()) {
109 watch_[fd].read_cb.emplace_back(read_cb);
114 assert(dynamic_cast<Connection*>(&c));
122 assert(dynamic_cast<Connection*>(&c));
126 if (!
watch_[fd].write_cb.size()) {
131 watch_[fd].write_cb.emplace_back(write_cb);
136 assert(dynamic_cast<Connection*>(&c));
140 if (!
watch_[fd].except_cb) {
144 watch_[fd].except_cb = except_cb;
149 assert(dynamic_cast<Connection*>(&c));
154 if (
watch_[fd].read_cb.size() == 0 &&
155 watch_[fd].write_cb.size() == 0)
156 LOG <<
"SelectDispatcher::Cancel() fd=" << fd
157 <<
" called with no callbacks registered.";
171 void DispatchOne(
const std::chrono::milliseconds& timeout)
final;
191 std::deque<Callback, mem::GPoolAllocator<Callback> >
204 throw Exception(
"SelectDispatcher() exception on socket!", errno);
217 #endif // !THRILL_NET_TCP_SELECT_DISPATCHER_HEADER void Interrupt() final
Interrupt the current select via self-pipe.
Socket & GetSocket()
Return the raw socket object for more low-level network programming.
Select & ClearException(int fd)
Clear a file descriptor from the exception set.
Select & ClearWrite(int fd)
Clear a file descriptor from the write set.
void AddRead(net::Connection &c, const Callback &read_cb) final
Register a buffered read callback and a default exception callback.
Callback except_cb
only one exception callback for the fd.
std::vector< Watch > watch_
void SetExcept(net::Connection &c, const Callback &except_cb)
Register a buffered write callback and a default exception callback.
void DispatchOne(const std::chrono::milliseconds &timeout) final
Run one iteration of dispatching select().
void CheckSize(int fd)
Grow table if needed.
void Cancel(net::Connection &c) final
Cancel all callbacks on a given fd.
Select is an object-oriented wrapper for select().
void MakePipe(int out_pipefds[2])
create a pair of pipe file descriptors
tlx::delegate< bool(), mem::GPoolAllocator< char > > AsyncCallback
Signature of async connection readability/writability callbacks.
SelectDispatcher is a higher level wrapper for select().
SelectDispatcher()
constructor
std::deque< Callback, mem::GPoolAllocator< Callback > > write_cb
bool active
boolean check whether any callbacks are registered
static constexpr bool self_verify_
Connection is a rich point-to-point socket connection to another client (worker, master, or whatever).
static constexpr bool g_self_verify
Select select_
select() manager object
A Exception is thrown by Connection on all errors instead of returning error codes.
Select & SetRead(int fd)
Add a socket to the read and exception selection set.
static bool DefaultExceptionCallback()
Default exception handler.
bool SelfPipeCallback()
Self-pipe callback.
static constexpr bool debug
Select & SetWrite(int fd)
Add a socket to the write and exception selection set.
A Connection represents a link to another peer in a network group.
static bool SetNonBlocking(int fd, bool non_blocking)
Turn socket into non-blocking state.
int self_pipe_[2]
self-pipe to wake up select.
void AddRead(int fd, const Callback &read_cb)
Register a buffered read callback and a default exception callback.
Select & SetException(int fd)
Add a socket to the exception selection set.
int fd() const
Return the associated file descriptor.
AsyncCallback Callback
type for file descriptor readiness callbacks
SelectDispatcher & operator=(const SelectDispatcher &)=delete
non-copyable: delete assignment operator
char self_pipe_buffer_[32]
buffer to receive one byte signals from self-pipe
std::deque< Callback, mem::GPoolAllocator< Callback > > read_cb
queue of callbacks for fd.
callback vectors per watched file descriptor
#define LOG
Default logging method: output if the local debug variable is true.
Dispatcher is a high level wrapper for asynchronous callback processing.
Select & ClearRead(int fd)
Clear a file descriptor from the read set.
void AddWrite(net::Connection &c, const Callback &write_cb) final
Register a buffered write callback and a default exception callback.