Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
group.cpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/net/mock/group.cpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Timo Bingmann <[email protected]>
7  *
8  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
9  ******************************************************************************/
10 
13 
14 #include <tlx/die.hpp>
15 #include <tlx/string/hexdump.hpp>
16 
17 #include <algorithm>
18 #include <condition_variable>
19 #include <deque>
20 #include <map>
21 #include <mutex>
22 #include <set>
23 #include <string>
24 #include <vector>
25 
26 namespace thrill {
27 namespace net {
28 namespace mock {
29 
30 /******************************************************************************/
31 // mock::Connection
32 
33 struct Connection::Data {
34  //! Mutex to lock access to inbound message queue
35  std::mutex mutex_;
36 
37  //! Condition variable to wake up threads synchronously blocking on
38  //! messages.
39  std::condition_variable cv_;
40 
41  //! Set of watching dispatchers.
42  std::set<Dispatcher*> watcher_;
43 
44  //! type of message queue
45  using DataQueue = std::deque<net::Buffer>;
46 
47  //! inbound message queue the virtual network peer
48  DataQueue inbound_;
49 };
50 
51 void Connection::Initialize(Group* group, size_t peer) {
52  d_ = std::make_unique<Data>();
53  group_ = group;
54  peer_ = peer;
55  is_loopback_ = true;
56 }
57 
59  std::unique_lock<std::mutex> lock(d_->mutex_);
60  d_->inbound_.emplace_back(std::move(msg));
61  d_->cv_.notify_all();
62  for (Dispatcher* d : d_->watcher_)
63  d->Notify(this);
64 }
65 
67  return "peer: " + std::to_string(peer_);
68 }
69 
70 std::ostream& Connection::OutputOstream(std::ostream& os) const {
71  return os << "[mock::Connection"
72  << " group=" << group_
73  << " peer=" << peer_
74  << "]";
75 }
76 void Connection::SyncSend(const void* data, size_t size, Flags /* flags */) {
77  // set errno : success (unconditionally)
78  errno = 0;
79  group_->Send(peer_, net::Buffer(data, size));
80  tx_bytes_ += size;
81 }
82 
83 ssize_t Connection::SendOne(const void* data, size_t size, Flags flags) {
84  SyncSend(data, size, flags);
85  return size;
86 }
87 
89  std::unique_lock<std::mutex> lock(d_->mutex_);
90  while (d_->inbound_.empty())
91  d_->cv_.wait(lock);
92  net::Buffer msg = std::move(d_->inbound_.front());
93  d_->inbound_.pop_front();
94 
95  // set errno : success (other syscalls may have failed)
96  errno = 0;
97  rx_bytes_ += msg.size();
98 
99  return msg;
100 }
101 
102 void Connection::SyncRecv(void* out_data, size_t size) {
103  net::Buffer msg = RecvNext();
104  die_unequal(msg.size(), size);
105  char* out_cdata = reinterpret_cast<char*>(out_data);
106  std::copy(msg.begin(), msg.end(), out_cdata);
107 }
108 
109 ssize_t Connection::RecvOne(void* out_data, size_t size) {
110  SyncRecv(out_data, size);
111  return size;
112 }
113 
114 void Connection::SyncSendRecv(const void* send_data, size_t send_size,
115  void* recv_data, size_t recv_size) {
116  SyncSend(send_data, send_size, NoFlags);
117  SyncRecv(recv_data, recv_size);
118 }
119 
120 void Connection::SyncRecvSend(const void* send_data, size_t send_size,
121  void* recv_data, size_t recv_size) {
122  SyncRecv(recv_data, recv_size);
123  SyncSend(send_data, send_size, NoFlags);
124 }
125 
126 /******************************************************************************/
127 // mock::Group
128 
129 Group::Group(size_t my_rank, size_t group_size)
130  : net::Group(my_rank) {
131  peers_.resize(group_size);
132  // create virtual connections, due to complications with non-movable
133  // mutexes, use a plain new.
134  conns_ = new Connection[group_size];
135  for (size_t i = 0; i < group_size; ++i)
136  conns_[i].Initialize(this, i);
137 }
138 
140  delete[] conns_;
141 }
142 
143 size_t Group::num_hosts() const {
144  return peers_.size();
145 }
146 
148  assert(peer < peers_.size());
149  return conns_[peer];
150 }
151 
152 void Group::Close() { }
153 
154 std::unique_ptr<net::Dispatcher> Group::ConstructDispatcher() const {
155  // construct mock::Dispatcher
156  return std::make_unique<Dispatcher>();
157 }
158 
159 std::vector<std::unique_ptr<Group> >
160 Group::ConstructLoopbackMesh(size_t num_hosts) {
161 
162  std::vector<std::unique_ptr<Group> > groups(num_hosts);
163 
164  // first construct all the Group objects.
165  for (size_t i = 0; i < groups.size(); ++i) {
166  groups[i] = std::make_unique<Group>(i, num_hosts);
167  }
168 
169  // then interconnect them
170  for (size_t i = 0; i < groups.size(); ++i) {
171  for (size_t j = 0; j < groups.size(); ++j) {
172  groups[i]->peers_[j] = groups[j].get();
173  }
174  }
175 
176  return groups;
177 }
178 
179 std::string Group::MaybeHexdump(const void* data, size_t size) {
180  if (debug_data)
181  return tlx::hexdump(data, size);
182  else
183  return "[data]";
184 }
185 
186 void Group::Send(size_t tgt, net::Buffer&& msg) {
187  assert(tgt < peers_.size());
188 
189  if (debug) {
190  sLOG << "Sending" << my_rank_ << "->" << tgt
191  << "msg" << MaybeHexdump(msg.data(), msg.size());
192  }
193 
194  peers_[tgt]->conns_[my_rank_].InboundMsg(std::move(msg));
195 }
196 
197 /******************************************************************************/
198 
199 struct Dispatcher::Data {
200  //! Mutex to lock access to watch lists
201  std::mutex mutex_;
202 
203  //! Notification queue for Dispatch
205  notify_;
206 
207  using Map = std::map<Connection*, Watch>;
208 
209  //! map from Connection to its watch list
210  Map map_;
211 };
212 
213 struct Dispatcher::Watch {
214  //! boolean check whether Watch is registered at Connection
215  bool active = false;
216  //! queue of callbacks for fd.
217  std::deque<Callback, mem::GPoolAllocator<Callback> >
218  read_cb, write_cb;
219  //! only one exception callback for the fd.
220  Callback except_cb;
221 };
222 
224  : net::Dispatcher(),
225  d_(std::make_unique<Data>())
226 { }
227 
229 { }
230 
231 //! Register a buffered read callback and a default exception callback.
232 void Dispatcher::AddRead(net::Connection& _c, const Callback& read_cb) {
233  assert(dynamic_cast<Connection*>(&_c));
234  Connection& c = static_cast<Connection&>(_c);
235 
236  std::unique_lock<std::mutex> d_lock(d_->mutex_);
237  Watch& w = GetWatch(&c);
238  w.read_cb.emplace_back(read_cb);
239  if (!w.active) {
240  std::unique_lock<std::mutex> c_lock(c.d_->mutex_);
241  c.d_->watcher_.insert(this);
242  w.active = true;
243  // if already have a packet, issue notification.
244  if (c.d_->inbound_.size())
245  Notify(&c);
246  }
247 }
248 
249 void Dispatcher::AddWrite(net::Connection& _c, const Callback& write_cb) {
250  assert(dynamic_cast<Connection*>(&_c));
251  Connection& c = static_cast<Connection&>(_c);
252 
253  std::unique_lock<std::mutex> d_lock(d_->mutex_);
254  Watch& w = GetWatch(&c);
255  w.write_cb.emplace_back(write_cb);
256  if (!w.active) {
257  std::unique_lock<std::mutex> c_lock(c.d_->mutex_);
258  c.d_->watcher_.insert(this);
259  w.active = true;
260  }
261  // our virtual sockets are always writable: issue notification.
262  Notify(&c);
263 }
264 
266  abort();
267 }
268 
270  d_->notify_.emplace(c);
271 }
272 
274  Notify(nullptr);
275 }
276 
277 Dispatcher::Watch& Dispatcher::GetWatch(Connection* c) {
278  Data::Map::iterator it = d_->map_.find(c);
279  if (it == d_->map_.end())
280  it = d_->map_.emplace(c, Watch()).first;
281  return it->second;
282 }
283 
284 void Dispatcher::DispatchOne(const std::chrono::milliseconds& timeout) {
285 
286  Connection* c = nullptr;
287  if (!d_->notify_.pop_for(c, timeout)) {
288  sLOG << "DispatchOne timeout";
289  return;
290  }
291 
292  if (c == nullptr) {
293  sLOG << "DispatchOne interrupt";
294  return;
295  }
296 
297  sLOG << "DispatchOne run";
298 
299  std::unique_lock<std::mutex> d_lock(d_->mutex_);
300 
301  Data::Map::iterator it = d_->map_.find(c);
302  if (it == d_->map_.end()) {
303  sLOG << "DispatchOne expired connection?";
304  return;
305  }
306 
307  Watch& w = it->second;
308  assert(w.active);
309 
310  std::unique_lock<std::mutex> c_lock(c->d_->mutex_);
311 
312  // check for readability
313  if (w.read_cb.size() && c->d_->inbound_.size()) {
314 
315  while (c->d_->inbound_.size() && w.read_cb.size()) {
316  c_lock.unlock();
317  d_lock.unlock();
318 
319  bool ret = true;
320  try {
321  ret = w.read_cb.front()();
322  }
323  catch (std::exception& e) {
324  LOG1 << "Dispatcher: exception " << typeid(e).name()
325  << "in read callback.";
326  LOG1 << " what(): " << e.what();
327  throw;
328  }
329 
330  d_lock.lock();
331  c_lock.lock();
332 
333  if (ret) break;
334  w.read_cb.pop_front();
335  }
336 
337  if (w.read_cb.size() == 0 && w.write_cb.size() == 0) {
338  // if all callbacks are done, listen no longer.
339  c->d_->watcher_.erase(this);
340  d_->map_.erase(it);
341  return;
342  }
343  }
344 
345  // "check" for writable. virtual sockets are always writable.
346  if (w.write_cb.size()) {
347 
348  while (w.write_cb.size()) {
349  c_lock.unlock();
350  d_lock.unlock();
351 
352  bool ret = true;
353  try {
354  ret = w.write_cb.front()();
355  }
356  catch (std::exception& e) {
357  LOG1 << "Dispatcher: exception " << typeid(e).name()
358  << "in write callback.";
359  LOG1 << " what(): " << e.what();
360  throw;
361  }
362 
363  d_lock.lock();
364  c_lock.lock();
365 
366  if (ret) break;
367  w.write_cb.pop_front();
368  }
369 
370  if (w.read_cb.size() == 0 && w.write_cb.size() == 0) {
371  // if all callbacks are done, listen no longer.
372  c->d_->watcher_.erase(this);
373  d_->map_.erase(it);
374  return;
375  }
376  }
377 }
378 
379 } // namespace mock
380 } // namespace net
381 } // namespace thrill
382 
383 /******************************************************************************/
void Cancel(net::Connection &) final
Cancel all callbacks on a given connection.
Definition: group.cpp:265
void DispatchOne(const std::chrono::milliseconds &timeout) final
Definition: group.cpp:284
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:34
This is a queue, similar to std::queue and tbb::concurrent_bounded_queue, except that it uses mutexes...
size_t peer_
Outgoing peer id of this Connection.
Definition: group.hpp:96
The central object of a mock network: the Group containing links to other mock Group forming the netw...
Definition: group.hpp:116
std::atomic< size_t > tx_bytes_
sent bytes
Definition: connection.hpp:609
#define LOG1
Definition: logger.hpp:28
std::unique_ptr< net::Dispatcher > ConstructDispatcher() const final
Definition: group.cpp:154
ssize_t RecvOne(void *out_data, size_t size) final
Definition: group.cpp:109
size_t my_rank_
our rank in the network group
Definition: group.hpp:232
Connection * conns_
vector of virtual connection objects to remote peers
Definition: group.hpp:157
~Group()
virtual destructor
Definition: group.cpp:139
std::string ToString() const final
return a string representation of this connection, for user output.
Definition: group.cpp:66
void SyncRecv(void *out_data, size_t size) final
Definition: group.cpp:102
A virtual connection through the mock network: each Group has p Connections to its peers...
Definition: group.hpp:41
ssize_t SendOne(const void *data, size_t size, Flags flags=NoFlags) final
Definition: group.cpp:83
iterator begin() noexcept
return mutable iterator to first element
Definition: buffer.hpp:160
A virtual Dispatcher which waits for messages to arrive in the mock network.
Definition: group.hpp:171
std::unique_ptr< Data > d_
pimpl data struct with complex components
Definition: group.hpp:201
static by_string to_string(int val)
convert to string
static std::string MaybeHexdump(const void *data, size_t size)
return hexdump or just [data] if not debugging
Definition: group.cpp:179
iterator end() noexcept
return mutable iterator beyond last element
Definition: buffer.hpp:170
static constexpr bool debug_data
Definition: group.hpp:119
static constexpr bool debug
Definition: group.hpp:118
void SyncSendRecv(const void *send_data, size_t send_size, void *recv_data, size_t recv_size) final
Definition: group.cpp:114
#define die_unequal(X, Y)
Definition: die.hpp:40
void AddRead(net::Connection &_c, const Callback &read_cb) final
Register a buffered read callback and a default exception callback.
Definition: group.cpp:232
unique_ptr< T > make_unique(Manager &manager, Args &&...args)
make_unique with Manager tracking
Definition: allocator.hpp:212
std::unique_ptr< Data > d_
pimpl data struct with complex components
Definition: group.hpp:102
A Connection represents a link to another peer in a network group.
Definition: connection.hpp:49
AsyncCallback Callback
type for file descriptor readiness callbacks
Definition: group.hpp:177
std::ostream & OutputOstream(std::ostream &os) const final
virtual method to output to a std::ostream
Definition: group.cpp:70
static std::vector< std::unique_ptr< Group > > ConstructLoopbackMesh(size_t num_hosts)
Construct a mock network with num_hosts peers and deliver Group contexts for each of them...
Definition: group.cpp:160
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
Definition: allocator.hpp:220
Watch & GetWatch(Connection *c)
lookup method
Definition: group.cpp:277
void Close() final
Close.
Definition: group.cpp:152
void Initialize(Group *group, size_t peer)
construct from mock::Group
Definition: group.cpp:51
size_t num_hosts() const final
Return number of connections in this group (= number computing hosts)
Definition: group.cpp:143
void SyncSend(const void *data, size_t size, Flags=NoFlags) final
Definition: group.cpp:76
void InboundMsg(net::Buffer &&msg)
Method which is called by other peers to enqueue a message.
Definition: group.cpp:58
std::vector< Group * > peers_
vector of peers for delivery of messages.
Definition: group.hpp:154
void Send(size_t tgt, net::Buffer &&msg)
Send a buffer to peer tgt. Blocking, ... sort of.
Definition: group.cpp:186
Simple buffer of characters without initialization or growing functionality.
Definition: buffer.hpp:40
void AddWrite(net::Connection &_c, const Callback &write_cb) final
Register a buffered write callback and a default exception callback.
Definition: group.cpp:249
std::string hexdump(const void *const data, size_t size)
Dump a (binary) string as a sequence of uppercase hexadecimal pairs.
Definition: hexdump.cpp:21
net::Buffer RecvNext()
somewhat internal function to extract the next packet from the queue.
Definition: group.cpp:88
static void Initialize()
run MPI_Init() if not already done (can be called multiple times).
Definition: group.cpp:641
Flags
Additional flags for sending or receiving.
Definition: connection.hpp:61
net::Connection & connection(size_t peer) final
Return Connection to client id.
Definition: group.cpp:147
void SyncRecvSend(const void *send_data, size_t send_size, void *recv_data, size_t recv_size) final
Definition: group.cpp:120
Group(size_t my_rank, size_t group_size)
Initialize a Group for the given rank and group size.
Definition: group.cpp:129
Group * group_
Reference to our group.
Definition: group.hpp:93
void Notify(Connection *c)
Definition: group.cpp:269
~Dispatcher()
virtual destructor
Definition: group.cpp:228
size_type size() const noexcept
return number of items in Buffer
Definition: buffer.hpp:141
void Interrupt() final
Interrupt current dispatch.
Definition: group.cpp:273
std::atomic< size_t > rx_bytes_
received bytes
Definition: connection.hpp:612