Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
group.cpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/net/mock/group.cpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Timo Bingmann <[email protected]>
7  *
8  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
9  ******************************************************************************/
10 
13 
14 #include <tlx/die.hpp>
15 #include <tlx/string/hexdump.hpp>
16 
17 #include <algorithm>
18 #include <condition_variable>
19 #include <deque>
20 #include <map>
21 #include <mutex>
22 #include <set>
23 #include <string>
24 #include <vector>
25 
26 namespace thrill {
27 namespace net {
28 namespace mock {
29 
30 /******************************************************************************/
31 // mock::Connection
32 
33 class Connection::Data
34 {
35 public:
36  //! Mutex to lock access to inbound message queue
37  std::mutex mutex_;
38 
39  //! Condition variable to wake up threads synchronously blocking on
40  //! messages.
41  std::condition_variable cv_;
42 
43  //! Set of watching dispatchers.
44  std::set<Dispatcher*> watcher_;
45 
46  //! type of message queue
47  using DataQueue = std::deque<net::Buffer>;
48 
49  //! inbound message queue the virtual network peer
50  DataQueue inbound_;
51 };
52 
53 void Connection::Initialize(Group* group, size_t peer) {
54  d_ = std::make_unique<Data>();
55  group_ = group;
56  peer_ = peer;
57  is_loopback_ = true;
58 }
59 
61  std::unique_lock<std::mutex> lock(d_->mutex_);
62  d_->inbound_.emplace_back(std::move(msg));
63  d_->cv_.notify_all();
64  for (Dispatcher* d : d_->watcher_)
65  d->Notify(this);
66 }
67 
69  return "peer: " + std::to_string(peer_);
70 }
71 
72 std::ostream& Connection::OutputOstream(std::ostream& os) const {
73  return os << "[mock::Connection"
74  << " group=" << group_
75  << " peer=" << peer_
76  << "]";
77 }
78 void Connection::SyncSend(const void* data, size_t size, Flags /* flags */) {
79  // set errno : success (unconditionally)
80  errno = 0;
81  group_->Send(peer_, net::Buffer(data, size));
82  tx_bytes_ += size;
83 }
84 
85 ssize_t Connection::SendOne(const void* data, size_t size, Flags flags) {
86  SyncSend(data, size, flags);
87  return size;
88 }
89 
91  std::unique_lock<std::mutex> lock(d_->mutex_);
92  while (d_->inbound_.empty())
93  d_->cv_.wait(lock);
94  net::Buffer msg = std::move(d_->inbound_.front());
95  d_->inbound_.pop_front();
96 
97  // set errno : success (other syscalls may have failed)
98  errno = 0;
99  rx_bytes_ += msg.size();
100 
101  return msg;
102 }
103 
104 void Connection::SyncRecv(void* out_data, size_t size) {
105  net::Buffer msg = RecvNext();
106  die_unequal(msg.size(), size);
107  char* out_cdata = reinterpret_cast<char*>(out_data);
108  std::copy(msg.begin(), msg.end(), out_cdata);
109 }
110 
111 ssize_t Connection::RecvOne(void* out_data, size_t size) {
112  SyncRecv(out_data, size);
113  return size;
114 }
115 
116 void Connection::SyncSendRecv(const void* send_data, size_t send_size,
117  void* recv_data, size_t recv_size) {
118  SyncSend(send_data, send_size, NoFlags);
119  SyncRecv(recv_data, recv_size);
120 }
121 
122 void Connection::SyncRecvSend(const void* send_data, size_t send_size,
123  void* recv_data, size_t recv_size) {
124  SyncRecv(recv_data, recv_size);
125  SyncSend(send_data, send_size, NoFlags);
126 }
127 
128 /******************************************************************************/
129 // mock::Group
130 
131 Group::Group(size_t my_rank, size_t group_size)
132  : net::Group(my_rank) {
133  peers_.resize(group_size);
134  // create virtual connections, due to complications with non-movable
135  // mutexes, use a plain new.
136  conns_ = new Connection[group_size];
137  for (size_t i = 0; i < group_size; ++i)
138  conns_[i].Initialize(this, i);
139 }
140 
142  delete[] conns_;
143 }
144 
145 size_t Group::num_hosts() const {
146  return peers_.size();
147 }
148 
150  assert(peer < peers_.size());
151  return conns_[peer];
152 }
153 
154 void Group::Close() { }
155 
156 std::unique_ptr<net::Dispatcher> Group::ConstructDispatcher() const {
157  // construct mock::Dispatcher
158  return std::make_unique<Dispatcher>();
159 }
160 
161 std::vector<std::unique_ptr<Group> >
162 Group::ConstructLoopbackMesh(size_t num_hosts) {
163 
164  std::vector<std::unique_ptr<Group> > groups(num_hosts);
165 
166  // first construct all the Group objects.
167  for (size_t i = 0; i < groups.size(); ++i) {
168  groups[i] = std::make_unique<Group>(i, num_hosts);
169  }
170 
171  // then interconnect them
172  for (size_t i = 0; i < groups.size(); ++i) {
173  for (size_t j = 0; j < groups.size(); ++j) {
174  groups[i]->peers_[j] = groups[j].get();
175  }
176  }
177 
178  return groups;
179 }
180 
181 std::string Group::MaybeHexdump(const void* data, size_t size) {
182  if (debug_data)
183  return tlx::hexdump(data, size);
184  else
185  return "[data]";
186 }
187 
188 void Group::Send(size_t tgt, net::Buffer&& msg) {
189  assert(tgt < peers_.size());
190 
191  if (debug) {
192  sLOG << "Sending" << my_rank_ << "->" << tgt
193  << "msg" << MaybeHexdump(msg.data(), msg.size());
194  }
195 
196  peers_[tgt]->conns_[my_rank_].InboundMsg(std::move(msg));
197 }
198 
199 /******************************************************************************/
200 
201 class Dispatcher::Data
202 {
203 public:
204  //! Mutex to lock access to watch lists
205  std::mutex mutex_;
206 
207  //! Notification queue for Dispatch
209 
210  using Map = std::map<Connection*, Watch>;
211 
212  //! map from Connection to its watch list
213  Map map_;
214 };
215 
216 class Dispatcher::Watch
217 {
218 public:
219  //! boolean check whether Watch is registered at Connection
220  bool active = false;
221  //! queue of callbacks for fd.
222  std::deque<Callback, mem::GPoolAllocator<Callback> > read_cb, write_cb;
223  //! only one exception callback for the fd.
224  Callback except_cb;
225 };
226 
228  : net::Dispatcher(),
229  d_(std::make_unique<Data>())
230 { }
231 
233 { }
234 
235 //! Register a buffered read callback and a default exception callback.
236 void Dispatcher::AddRead(net::Connection& _c, const Callback& read_cb) {
237  assert(dynamic_cast<Connection*>(&_c));
238  Connection& c = static_cast<Connection&>(_c);
239 
240  std::unique_lock<std::mutex> d_lock(d_->mutex_);
241  Watch& w = GetWatch(&c);
242  w.read_cb.emplace_back(read_cb);
243  if (!w.active) {
244  std::unique_lock<std::mutex> c_lock(c.d_->mutex_);
245  c.d_->watcher_.insert(this);
246  w.active = true;
247  // if already have a packet, issue notification.
248  if (c.d_->inbound_.size())
249  Notify(&c);
250  }
251 }
252 
253 void Dispatcher::AddWrite(net::Connection& _c, const Callback& write_cb) {
254  assert(dynamic_cast<Connection*>(&_c));
255  Connection& c = static_cast<Connection&>(_c);
256 
257  std::unique_lock<std::mutex> d_lock(d_->mutex_);
258  Watch& w = GetWatch(&c);
259  w.write_cb.emplace_back(write_cb);
260  if (!w.active) {
261  std::unique_lock<std::mutex> c_lock(c.d_->mutex_);
262  c.d_->watcher_.insert(this);
263  w.active = true;
264  }
265  // our virtual sockets are always writable: issue notification.
266  Notify(&c);
267 }
268 
270  abort();
271 }
272 
274  d_->notify_.emplace(c);
275 }
276 
278  Notify(nullptr);
279 }
280 
281 Dispatcher::Watch& Dispatcher::GetWatch(Connection* c) {
282  Data::Map::iterator it = d_->map_.find(c);
283  if (it == d_->map_.end())
284  it = d_->map_.emplace(c, Watch()).first;
285  return it->second;
286 }
287 
288 void Dispatcher::DispatchOne(const std::chrono::milliseconds& timeout) {
289 
290  Connection* c = nullptr;
291  if (!d_->notify_.pop_for(c, timeout)) {
292  sLOG << "DispatchOne timeout";
293  return;
294  }
295 
296  if (c == nullptr) {
297  sLOG << "DispatchOne interrupt";
298  return;
299  }
300 
301  sLOG << "DispatchOne run";
302 
303  std::unique_lock<std::mutex> d_lock(d_->mutex_);
304 
305  Data::Map::iterator it = d_->map_.find(c);
306  if (it == d_->map_.end()) {
307  sLOG << "DispatchOne expired connection?";
308  return;
309  }
310 
311  Watch& w = it->second;
312  assert(w.active);
313 
314  std::unique_lock<std::mutex> c_lock(c->d_->mutex_);
315 
316  // check for readability
317  if (w.read_cb.size() && c->d_->inbound_.size()) {
318 
319  while (c->d_->inbound_.size() && w.read_cb.size()) {
320  c_lock.unlock();
321  d_lock.unlock();
322 
323  bool ret = true;
324  try {
325  ret = w.read_cb.front()();
326  }
327  catch (std::exception& e) {
328  LOG1 << "Dispatcher: exception " << typeid(e).name()
329  << "in read callback.";
330  LOG1 << " what(): " << e.what();
331  throw;
332  }
333 
334  d_lock.lock();
335  c_lock.lock();
336 
337  if (ret) break;
338  w.read_cb.pop_front();
339  }
340 
341  if (w.read_cb.size() == 0 && w.write_cb.size() == 0) {
342  // if all callbacks are done, listen no longer.
343  c->d_->watcher_.erase(this);
344  d_->map_.erase(it);
345  return;
346  }
347  }
348 
349  // "check" for writable. virtual sockets are always writable.
350  if (w.write_cb.size()) {
351 
352  while (w.write_cb.size()) {
353  c_lock.unlock();
354  d_lock.unlock();
355 
356  bool ret = true;
357  try {
358  ret = w.write_cb.front()();
359  }
360  catch (std::exception& e) {
361  LOG1 << "Dispatcher: exception " << typeid(e).name()
362  << "in write callback.";
363  LOG1 << " what(): " << e.what();
364  throw;
365  }
366 
367  d_lock.lock();
368  c_lock.lock();
369 
370  if (ret) break;
371  w.write_cb.pop_front();
372  }
373 
374  if (w.read_cb.size() == 0 && w.write_cb.size() == 0) {
375  // if all callbacks are done, listen no longer.
376  c->d_->watcher_.erase(this);
377  d_->map_.erase(it);
378  return;
379  }
380  }
381 }
382 
383 } // namespace mock
384 } // namespace net
385 } // namespace thrill
386 
387 /******************************************************************************/
void Cancel(net::Connection &) final
Cancel all callbacks on a given connection.
Definition: group.cpp:269
void DispatchOne(const std::chrono::milliseconds &timeout) final
Definition: group.cpp:288
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:34
size_t peer_
Outgoing peer id of this Connection.
Definition: group.hpp:96
The central object of a mock network: the Group containing links to other mock Group forming the netw...
Definition: group.hpp:116
std::atomic< size_t > tx_bytes_
sent bytes
Definition: connection.hpp:609
#define LOG1
Definition: logger.hpp:28
std::unique_ptr< net::Dispatcher > ConstructDispatcher() const final
Definition: group.cpp:156
ssize_t RecvOne(void *out_data, size_t size) final
Definition: group.cpp:111
size_t my_rank_
our rank in the network group
Definition: group.hpp:232
Connection * conns_
vector of virtual connection objects to remote peers
Definition: group.hpp:157
~Group()
virtual destructor
Definition: group.cpp:141
std::string ToString() const final
return a string representation of this connection, for user output.
Definition: group.cpp:68
void SyncRecv(void *out_data, size_t size) final
Definition: group.cpp:104
A virtual connection through the mock network: each Group has p Connections to its peers...
Definition: group.hpp:41
ssize_t SendOne(const void *data, size_t size, Flags flags=NoFlags) final
Definition: group.cpp:85
iterator begin() noexcept
return mutable iterator to first element
Definition: buffer.hpp:160
A virtual Dispatcher which waits for messages to arrive in the mock network.
Definition: group.hpp:171
std::unique_ptr< Data > d_
pimpl data struct with complex components
Definition: group.hpp:201
static by_string to_string(int val)
convert to string
static std::string MaybeHexdump(const void *data, size_t size)
return hexdump or just [data] if not debugging
Definition: group.cpp:181
iterator end() noexcept
return mutable iterator beyond last element
Definition: buffer.hpp:170
static constexpr bool debug_data
Definition: group.hpp:119
static constexpr bool debug
Definition: group.hpp:118
void SyncSendRecv(const void *send_data, size_t send_size, void *recv_data, size_t recv_size) final
Definition: group.cpp:116
#define die_unequal(X, Y)
Definition: die.hpp:50
void AddRead(net::Connection &_c, const Callback &read_cb) final
Register a buffered read callback and a default exception callback.
Definition: group.cpp:236
unique_ptr< T > make_unique(Manager &manager, Args &&...args)
make_unique with Manager tracking
Definition: allocator.hpp:212
std::unique_ptr< Data > d_
pimpl data struct with complex components
Definition: group.hpp:102
A Connection represents a link to another peer in a network group.
Definition: connection.hpp:49
This is a queue, similar to std::queue and tbb::concurrent_bounded_queue, except that it uses mutexes...
AsyncCallback Callback
type for file descriptor readiness callbacks
Definition: group.hpp:177
std::ostream & OutputOstream(std::ostream &os) const final
virtual method to output to a std::ostream
Definition: group.cpp:72
static std::vector< std::unique_ptr< Group > > ConstructLoopbackMesh(size_t num_hosts)
Construct a mock network with num_hosts peers and deliver Group contexts for each of them...
Definition: group.cpp:162
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
Definition: allocator.hpp:220
Watch & GetWatch(Connection *c)
lookup method
Definition: group.cpp:281
void Close() final
Close.
Definition: group.cpp:154
void Initialize(Group *group, size_t peer)
construct from mock::Group
Definition: group.cpp:53
size_t num_hosts() const final
Return number of connections in this group (= number computing hosts)
Definition: group.cpp:145
void SyncSend(const void *data, size_t size, Flags=NoFlags) final
Definition: group.cpp:78
void InboundMsg(net::Buffer &&msg)
Method which is called by other peers to enqueue a message.
Definition: group.cpp:60
std::vector< Group * > peers_
vector of peers for delivery of messages.
Definition: group.hpp:154
void Send(size_t tgt, net::Buffer &&msg)
Send a buffer to peer tgt. Blocking, ... sort of.
Definition: group.cpp:188
Simple buffer of characters without initialization or growing functionality.
Definition: buffer.hpp:40
void AddWrite(net::Connection &_c, const Callback &write_cb) final
Register a buffered write callback and a default exception callback.
Definition: group.cpp:253
std::string hexdump(const void *const data, size_t size)
Dump a (binary) string as a sequence of uppercase hexadecimal pairs.
Definition: hexdump.cpp:21
net::Buffer RecvNext()
some-what internal function to extract the next packet from the queue.
Definition: group.cpp:90
static void Initialize()
run MPI_Init() if not already done (can be called multiple times).
Definition: group.cpp:641
Flags
Additional flags for sending or receiving.
Definition: connection.hpp:61
net::Connection & connection(size_t peer) final
Return Connection to client id.
Definition: group.cpp:149
void SyncRecvSend(const void *send_data, size_t send_size, void *recv_data, size_t recv_size) final
Definition: group.cpp:122
Group(size_t my_rank, size_t group_size)
Initialize a Group for the given size rank.
Definition: group.cpp:131
Group * group_
Reference to our group.
Definition: group.hpp:93
void Notify(Connection *c)
Definition: group.cpp:273
~Dispatcher()
virtual destructor
Definition: group.cpp:232
size_type size() const noexcept
return number of items in Buffer
Definition: buffer.hpp:141
void Interrupt() final
Interrupt current dispatch.
Definition: group.cpp:277
std::atomic< size_t > rx_bytes_
received bytes
Definition: connection.hpp:612