Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
select_dispatcher.cpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/net/tcp/select_dispatcher.cpp
3  *
4  * Lightweight wrapper around BSD socket API.
5  *
6  * Part of Project Thrill - http://project-thrill.org
7  *
8  * Copyright (C) 2015 Timo Bingmann <[email protected]>
9  *
10  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
11  ******************************************************************************/
12 
14 
15 #include <sstream>
16 
17 namespace thrill {
18 namespace net {
19 namespace tcp {
20 
21 //! Run one iteration of dispatching select().
//!
//! Calls select_timeout() on a copy of select_ with the given timeout
//! (passed on as a millisecond count), then walks watch_ from fd 3 upwards
//! and runs the read, write and exception callbacks of every active fd that
//! is reported in the resulting sets.
//!
//! Returns without running any callback when select_timeout() fails with
//! EINTR or returns 0. Throws Exception for any other negative return value.
22 void SelectDispatcher::DispatchOne(const std::chrono::milliseconds& timeout) {
23 
24  // select on a copy: fdset is queried for the results below, while select_
 // keeps the registered sets and is the object the Clear*() calls modify.
25  Select fdset = select_;
26 
 // optional consistency check: an active fd must be in select_'s read
 // (write) set exactly when it has read (write) callbacks queued.
27  if (self_verify_)
28  {
29  for (int fd = 3; fd < static_cast<int>(watch_.size()); ++fd) {
30  Watch& w = watch_[fd];
31 
32  if (!w.active) continue;
33 
34  assert((w.read_cb.size() == 0) != select_.InRead(fd));
35  assert((w.write_cb.size() == 0) != select_.InWrite(fd));
36  }
37  }
38 
 // debug output: list every active fd as r<fd> / w<fd> / e<fd>, one entry
 // per set of select_ that contains it.
39  if (debug)
40  {
41  std::ostringstream oss;
42  oss << "| ";
43 
44  for (int fd = 3; fd < static_cast<int>(watch_.size()); ++fd) {
45  Watch& w = watch_[fd];
46 
47  if (!w.active) continue;
48 
49  if (select_.InRead(fd))
50  oss << "r" << fd << " ";
51  if (select_.InWrite(fd))
52  oss << "w" << fd << " ";
53  if (select_.InException(fd))
54  oss << "e" << fd << " ";
55  }
56 
57  LOG << "Performing select() on " << oss.str();
58  }
59 
60  int r = fdset.select_timeout(static_cast<double>(timeout.count()));
61 
62  if (r < 0) {
63  // if we caught a signal, this is intended to interrupt a select().
64  if (errno == EINTR) {
65  LOG << "Dispatch(): select() was interrupted due to a signal.";
66  return;
67  }
68 
69  throw Exception("Dispatch::Select() failed!", errno);
70  }
 // nothing to dispatch when select_timeout() returns 0.
71  if (r == 0) return;
72 
73  // start running through the table at fd 3. 0 = stdin, 1 = stdout, 2 =
74  // stderr.
75 
76  for (int fd = 3; fd < static_cast<int>(watch_.size()); ++fd)
77  {
78  // we use a pointer into the watch_ table. however, since the
79  // std::vector may regrow when callback handlers are called, this
80  // pointer is re-fetched from watch_[fd] after every callback.
81  Watch* w = &watch_[fd];
82 
83  if (!w->active) continue;
84 
85  if (fdset.InRead(fd))
86  {
87  if (w->read_cb.size()) {
88  // run read callbacks until one returns true (in which case
89  // it wants to be called again), or the read_cb list is
90  // empty. callbacks that return false are popped.
91  while (w->read_cb.size() && w->read_cb.front()() == false) {
92  w = &watch_[fd];
93  w->read_cb.pop_front();
94  }
95  w = &watch_[fd];
96 
97  if (w->read_cb.size() == 0) {
98  // if all read callbacks are done, listen no longer.
99  select_.ClearRead(fd);
100  if (w->write_cb.size() == 0 && !w->except_cb) {
101  // if there are also no write callbacks and no exception
102  // callback, stop listening on this fd and deactivate it.
103  select_.ClearWrite(fd);
 // NOTE(review): listing line 104 is missing from this
 // extract; presumably select_.ClearException(fd) --
 // confirm against the original file.
105  w->active = false;
106  }
107  }
108  }
109  else {
110  LOG << "SelectDispatcher: got read event for fd "
111  << fd << " without a read handler.";
112 
113  select_.ClearRead(fd);
114  }
115  }
116 
117  if (fdset.InWrite(fd))
118  {
119  if (w->write_cb.size()) {
120  // run write callbacks until one returns true (in which case
121  // it wants to be called again), or the write_cb list is
122  // empty. callbacks that return false are popped.
123  while (w->write_cb.size() && w->write_cb.front()() == false) {
124  w = &watch_[fd];
125  w->write_cb.pop_front();
126  }
127  w = &watch_[fd];
128 
129  if (w->write_cb.size() == 0) {
130  // if all write callbacks are done, listen no longer.
131  select_.ClearWrite(fd);
132  if (w->read_cb.size() == 0 && !w->except_cb) {
133  // if there are also no read callbacks and no exception
134  // callback, stop listening on this fd and deactivate it.
135  select_.ClearRead(fd);
 // NOTE(review): listing line 136 is missing from this
 // extract; presumably select_.ClearException(fd) --
 // confirm against the original file.
137  w->active = false;
138  }
139  }
140  }
141  else {
142  LOG << "SelectDispatcher: got write event for fd "
143  << fd << " without a write handler.";
144 
145  select_.ClearWrite(fd);
146  }
147  }
148 
149  if (fdset.InException(fd))
150  {
151  if (w->except_cb) {
152  if (!w->except_cb()) {
153  // callback returned false: remove fd from set
 // NOTE(review): listing line 154 is missing from this
 // extract; presumably select_.ClearException(fd) --
 // confirm against the original file.
155  }
156  }
157  else {
 // NOTE(review): listing line 158 is missing from this extract;
 // presumably a call to DefaultExceptionCallback() -- confirm
 // against the original file.
159  }
160  }
161  }
162 }
163 
165  // there are multiple very platform-dependent ways to do this. we'll try
166  // to use the self-pipe trick for now. The select() method waits on
167  // another fd, which we write one byte to when we need to interrupt the
168  // select().
169 
170  // another method would be to send a signal() via pthread_kill() to the
171  // select thread, but that had a race condition for waking up the other
172  // thread. -tb
173 
174  // send one byte to wake up the select() handler.
175  ssize_t wb;
176  while ((wb = write(self_pipe_[1], this, 1)) == 0) {
177  LOG1 << "WakeUp: error sending to self-pipe: " << errno;
178  }
179  die_unless(wb == 1);
180 }
181 
183  while (read(self_pipe_[0],
184  self_pipe_buffer_, sizeof(self_pipe_buffer_)) > 0) {
185  /* repeat, until empty pipe */
186  }
187  return true;
188 }
189 
190 } // namespace tcp
191 } // namespace net
192 } // namespace thrill
193 
194 /******************************************************************************/
void Interrupt() final
Interrupt the current select via self-pipe.
Select & ClearException(int fd)
Clear a file descriptor from the exception set.
Definition: select.hpp:89
bool InException(int fd) const
Check if a file descriptor is in the resulting exception set.
Definition: select.hpp:77
#define die_unless(X)
Definition: die.hpp:27
Select & ClearWrite(int fd)
Clear a file descriptor from the write set.
Definition: select.hpp:85
Callback except_cb
only one exception callback for the fd.
#define LOG1
Definition: logger.hpp:28
void DispatchOne(const std::chrono::milliseconds &timeout) final
Run one iteration of dispatching select().
Select is an object-oriented wrapper for select().
Definition: select.hpp:34
int select_timeout(double timeout)
Do a select() with timeout (in ms)
Definition: select.hpp:103
std::deque< Callback, mem::GPoolAllocator< Callback > > write_cb
bool active
boolean check whether any callbacks are registered
Select select_
select() manager object
An Exception is thrown by Connection on all errors instead of returning error codes.
Definition: exception.hpp:30
static bool DefaultExceptionCallback()
Default exception handler.
bool SelfPipeCallback()
Self-pipe callback.
bool InWrite(int fd) const
Check if a file descriptor is in the resulting Write set.
Definition: select.hpp:73
int self_pipe_[2]
self-pipe to wake up select.
bool InRead(int fd) const
Check if a file descriptor is in the resulting read set.
Definition: select.hpp:69
char self_pipe_buffer_[32]
buffer to receive one byte signals from self-pipe
std::deque< Callback, mem::GPoolAllocator< Callback > > read_cb
queue of callbacks for fd.
callback vectors per watched file descriptor
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:24
Select & ClearRead(int fd)
Clear a file descriptor from the read set.
Definition: select.hpp:81