Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
construct.cpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/net/tcp/construct.cpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Emanuel Jöbstl <[email protected]>
7  * Copyright (C) 2015 Timo Bingmann <[email protected]>
8  *
9  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10  ******************************************************************************/
11 
14 #include <thrill/net/tcp/group.hpp>
16 
17 #include <tlx/die.hpp>
18 
19 #include <deque>
20 #include <map>
21 #include <string>
22 #include <utility>
23 #include <vector>
24 
25 namespace thrill {
26 namespace net {
27 namespace tcp {
28 
29 //! \addtogroup net_tcp TCP Socket API
30 //! \{
31 
32 class Construction
33 {
34  static constexpr bool debug = false;
35 
36 public:
37  Construction(SelectDispatcher& dispatcher,
38  std::unique_ptr<Group>* groups, size_t group_count)
39  : dispatcher_(dispatcher),
40  groups_(groups),
41  group_count_(group_count)
42  { }
43 
44  /*!
45  * Initializes this Manager and initializes all Groups. When this method
46  * returns, the network system is ready to use.
47  *
48  * \param my_rank_ The rank of the worker that owns this Manager.
49  * \param endpoints The ordered list of all endpoints, including the local worker,
50  * where the endpoint at position i corresponds to the worker with id i.
51  */
52  void Initialize(size_t my_rank_,
53  const std::vector<std::string>& endpoints) {
54 
55  this->my_rank_ = my_rank_;
56  die_unless(my_rank_ < endpoints.size());
57 
58  LOG << "Client " << my_rank_ << " starting: " << endpoints[my_rank_];
59 
60  for (size_t i = 0; i < group_count_; i++) {
61  groups_[i] = std::make_unique<Group>(my_rank_, endpoints.size());
62  }
63 
64  // Parse endpoints.
65  std::vector<SocketAddress> address_list
66  = GetAddressList(endpoints);
67 
68  // Create listening socket.
69  {
70  Socket listen_socket = Socket::Create();
71  listen_socket.SetReuseAddr();
72 
73  SocketAddress& lsa = address_list[my_rank_];
74 
75  if (!listen_socket.bind(lsa))
76  throw Exception("Could not bind listen socket to "
77  + lsa.ToStringHostPort(), errno);
78 
79  if (!listen_socket.listen())
80  throw Exception("Could not listen on socket "
81  + lsa.ToStringHostPort(), errno);
82 
83  listener_ = Connection(std::move(listen_socket));
84  }
85 
86  LOG << "Client " << my_rank_ << " listening: " << endpoints[my_rank_];
87 
88  // Initiate connections to all hosts with higher id.
89  for (uint32_t g = 0; g < group_count_; g++) {
90  for (size_t id = my_rank_ + 1; id < address_list.size(); ++id) {
91  AsyncConnect(g, id, address_list[id]);
92  }
93  }
94 
95  // Add reads to the dispatcher to accept new connections.
96  dispatcher_.AddRead(listener_,
97  [=]() {
98  return OnIncomingConnection(listener_);
99  });
100 
101  // Dispatch until everything is connected.
102  while (!IsInitializationFinished())
103  {
104  LOG << "Client " << my_rank_ << " dispatching.";
105  dispatcher_.Dispatch();
106  }
107 
108  dispatcher_.Cancel(listener_);
109 
110  // All connected, Dispose listener.
111  listener_.Close();
112 
113  LOG << "Client " << my_rank_ << " done";
114 
115  for (size_t j = 0; j < group_count_; j++) {
116  // output list of file descriptors connected to partners
117  for (size_t i = 0; i != address_list.size(); ++i) {
118  if (i == my_rank_) continue;
119  LOG << "Group " << j
120  << " link " << my_rank_ << " -> " << i << " = fd "
121  << groups_[j]->tcp_connection(i).GetSocket().fd();
122  }
123  }
124  }
125 
126 private:
127  //! Temporary Manager for construction
128  mem::Manager mem_manager_ { nullptr, "Construction" };
129 
130  //! Dispatcher instance used by this Manager to perform async operations.
131  SelectDispatcher& dispatcher_;
132 
133  //! Link to groups to initialize
134  std::unique_ptr<Group>* groups_;
135 
136  //! number of groups to initialize
137  size_t group_count_;
138 
139  //! The rank associated with the local worker.
140  size_t my_rank_ = size_t(-1);
141 
142  //! The Connections responsible for listening to incoming connections.
143  Connection listener_;
144 
145  //! Some definitions for convenience
146  using GroupNodeIdPair = std::pair<size_t, size_t>;
147 
148  //! Array of opened connections that are not assigned to any (group,id)
149  //! client, yet. This must be a deque. When welcomes are received the
150  //! Connection is moved out of the deque into the right Group.
151  std::deque<Connection> connections_;
152 
153  //! Array of connect timeouts which are exponentially increased from 10msec
154  //! on failed connects.
155  std::map<GroupNodeIdPair, size_t> timeouts_;
156 
157  //! start connect backoff at 10msec
158  const size_t initial_timeout_ = 10;
159 
160  //! maximum connect backoff, after which the program fails. Total waiting
161  //! time is about 2 * final_timeout_ (in millisec).
162  const size_t final_timeout_ = 40960;
163 
164  //! Represents a welcome message that is exchanged by Connections during
165  //! network initialization.
166  struct WelcomeMsg {
167  //! the Thrill signature flag.
168  uint64_t thrill_sign;
169 
170  //! the id of the Group associated with the sending Connection.
171  size_t group_id;
172 
173  //! the id of the worker associated with the sending Connection.
174  size_t id;
175  };
176 
177  //! The Thrill signature flag - introduced by Master Timo.
178  static constexpr uint64_t thrill_sign = 0x0C7A0C7A0C7A0C7A;
179 
180  /*!
181  * Converts a Thrill endpoint list into a list of socket address.
182  *
183  * \param endpoints The endpoint list to convert.
184  * \return The socket addresses to use internally.
185  */
186  std::vector<SocketAddress> GetAddressList(
187  const std::vector<std::string>& endpoints) {
188 
189  std::vector<SocketAddress> addressList;
190  for (const std::string& endp : endpoints)
191  {
192  addressList.push_back(SocketAddress(endp));
193  if (!addressList.back().IsValid()) {
194  throw Exception(
195  "Error resolving endpoint " + endp
196  + ": " + addressList.back().GetResolveError());
197  }
198  }
199 
200  return addressList;
201  }
202 
203  /*!
204  * Returns wether the initialization is completed. Checks the Groups
205  * associated with this Manager and returns true or false wether the
206  * initialization is finished.
207  *
208  * \return True if initialization is finished, else false.
209  */
210  bool IsInitializationFinished() {
211 
212  for (size_t g = 0; g < group_count_; g++) {
213 
214  for (size_t id = 0; id < groups_[g]->num_hosts(); ++id) {
215  if (id == my_rank_) continue;
216 
217  // Just checking the state works since this implicitey checks the
218  // size. Unset connections have state ConnectionState::Invalid.
219  if (groups_[g]->tcp_connection(id).state()
221  return false;
222  }
223  }
224 
225  return true;
226  }
227 
228  /*!
229  * Starts connecting to the net connection specified. Starts connecting to
230  * the endpoint specified by the parameters. This method executes
231  * asynchronously.
232  *
233  * \param nc The connection to connect.
234  * \param address The address of the endpoint to connect to.
235  */
236  void AsyncConnect(net::Connection& nc, const SocketAddress& address) {
237  assert(dynamic_cast<Connection*>(&nc));
238  Connection& tcp = static_cast<Connection&>(nc);
239 
240  // Start asynchronous connect.
241  tcp.GetSocket().SetNonBlocking(true);
242  int res = tcp.GetSocket().connect(address);
243 
244  tcp.set_state(ConnectionState::Connecting);
245 
246  if (res == 0) {
247  LOG << "Early connect success. This should not happen.";
248  // connect() already successful? this should not be.
249  OnConnected(tcp, address);
250  }
251  else if (errno == EINPROGRESS) {
252  // connect is in progress, will wait for completion.
253  dispatcher_.AddWrite(tcp, [this, &address, &tcp]() {
254  return OnConnected(tcp, address);
255  });
256  }
257  else if (errno == ECONNREFUSED) {
258  LOG << "Early connect refused.";
259  // connect() already refused connection?
260  OnConnected(tcp, address, errno);
261  }
262  else {
263  // Failed to even try the connection - this might be a permanent
264  // error.
265  tcp.set_state(ConnectionState::Invalid);
266 
267  throw Exception("Error starting async connect client "
268  + std::to_string(tcp.peer_id()) + " via "
269  + address.ToStringHostPort(), errno);
270  }
271  }
272 
273  /*!
274  * Starts connecting to the endpoint specified by the parameters. This
275  * method executes asynchronously.
276  *
277  * \param group The id of the Group to connect to.
278  * \param id The id of the worker to connect to.
279  * \param address The address of the endpoint to connect to.
280  */
281  void AsyncConnect(
282  size_t group, size_t id, const SocketAddress& address) {
283 
284  // Construct a new socket (old one is destroyed)
285  Connection& nc = groups_[group]->tcp_connection(id);
286  if (nc.IsValid()) nc.Close();
287 
288  nc = Connection(Socket::Create());
289  nc.set_group_id(group);
290  nc.set_peer_id(id);
291 
292  AsyncConnect(nc, address);
293  }
294 
295  /*!
296  * Is called whenever a hello is sent. For outgoing connections, this is
297  * the final step in the state machine.
298  *
299  * \param conn The connection for which the hello is sent.
300  */
301  void OnHelloSent(net::Connection& conn) {
302  assert(dynamic_cast<Connection*>(&conn));
303  Connection& tcp = static_cast<Connection&>(conn);
304 
305  if (tcp.state() == ConnectionState::TransportConnected) {
306  tcp.set_state(ConnectionState::HelloSent);
307  }
308  else if (tcp.state() == ConnectionState::HelloReceived) {
309  tcp.set_state(ConnectionState::Connected);
310  }
311  else {
312  die("State mismatch: " + std::to_string(tcp.state()));
313  }
314  }
315 
316  //! calculate the next timeout on connect() errors
317  size_t NextConnectTimeout(size_t group, size_t id,
318  const SocketAddress& address) {
319  GroupNodeIdPair gnip(group, id);
320  auto it = timeouts_.find(gnip);
321  if (it == timeouts_.end()) {
322  it = timeouts_.insert(std::make_pair(gnip, initial_timeout_)).first;
323  }
324  else {
325  // exponential backoff of reconnects.
326  it->second = 2 * it->second;
327 
328  if (it->second >= final_timeout_) {
329  throw Exception("Timeout error connecting to client "
330  + std::to_string(id) + " via "
331  + address.ToStringHostPort());
332  }
333  }
334  return it->second;
335  }
336 
337  /*!
338  * Called when a connection initiated by us succeeds on a network level.
339  * The Thrill welcome messages still have to be exchanged.
340  *
341  * \param conn The connection that was connected successfully.
342  *
343  * \param address The associated address. This parameter is needed in case
344  * we need to reconnect.
345  *
346  * \param _err An errno state if called synchronously after a connect().
347  *
348  * \return A bool indicating wether this callback should stay registered.
349  */
350  bool OnConnected(net::Connection& conn, const SocketAddress& address,
351  int _err = 0) {
352  assert(dynamic_cast<Connection*>(&conn));
353  Connection& tcp = static_cast<Connection&>(conn);
354 
355  // First, check if everything went well.
356  int err = _err ? _err : tcp.GetSocket().GetError();
357 
358  if (tcp.state() != ConnectionState::Connecting) {
359  LOG << "Client " << my_rank_
360  << " expected connection state " << ConnectionState::Connecting
361  << " but got " << tcp.state();
362  die("FAULTY STATE DETECTED");
363  }
364 
365  if (err == ECONNREFUSED || err == ETIMEDOUT) {
366 
367  // Connection refused. The other workers might not be online yet.
368 
369  size_t next_timeout = NextConnectTimeout(
370  tcp.group_id(), tcp.peer_id(), address);
371 
372  LOG << "Connect to " << address.ToStringHostPort()
373  << " fd=" << tcp.GetSocket().fd()
374  << " timed out or refused with error " << err << "."
375  << " Attempting reconnect in " << next_timeout << "msec";
376 
377  dispatcher_.AddTimer(
378  std::chrono::milliseconds(next_timeout),
379  [&]() {
380  // Construct a new connection since the socket might not be
381  // reusable.
382  AsyncConnect(tcp.group_id(), tcp.peer_id(), address);
383  return false;
384  });
385 
386  return false;
387  }
388  else if (err != 0) {
389  // Other failure. Fail hard.
390  tcp.set_state(ConnectionState::Invalid);
391 
392  throw Exception("Error connecting asynchronously to client "
393  + std::to_string(tcp.peer_id()) + " via "
394  + address.ToStringHostPort(), err);
395  }
396 
397  die_unless(tcp.GetSocket().IsValid());
398 
400 
401  LOG << "OnConnected() " << my_rank_ << " connected"
402  << " fd=" << tcp.GetSocket().fd()
403  << " to=" << tcp.GetSocket().GetPeerAddress()
404  << " err=" << err
405  << " group=" << tcp.group_id();
406 
407  // send welcome message
408  const WelcomeMsg hello = { thrill_sign, tcp.group_id(), my_rank_ };
409 
410  dispatcher_.AsyncWriteCopy(
411  tcp, /* seq */ 0, &hello, sizeof(hello),
412  AsyncWriteCallback::make<
413  Construction, &Construction::OnHelloSent>(this));
414 
415  LOG << "Client " << my_rank_ << " sent active hello to "
416  << "client " << tcp.peer_id() << " group id " << tcp.group_id();
417 
418  dispatcher_.AsyncRead(
419  tcp, /* seq */ 0, sizeof(hello),
420  AsyncReadBufferCallback::make<
421  Construction, &Construction::OnIncomingWelcome>(this));
422 
423  return false;
424  }
425 
426  /*!
427  * Receives and handels a hello message without sending a reply.
428  *
429  * \param conn The connection the hello was received for.
430  * \param buffer The buffer containing the welcome message.
431  *
432  * \return A boolean indicating wether this handler should stay attached.
433  */
434  void OnIncomingWelcome(net::Connection& conn, Buffer&& buffer) {
435  assert(dynamic_cast<Connection*>(&conn));
436  Connection& tcp = static_cast<Connection&>(conn);
437 
438  die_unless(tcp.GetSocket().IsValid());
439  die_unequal(buffer.size(), sizeof(WelcomeMsg));
441 
442  const WelcomeMsg* msg
443  = reinterpret_cast<const WelcomeMsg*>(buffer.data());
444  die_unequal(msg->thrill_sign, uint64_t(thrill_sign));
445  // We already know those values since we connected actively. So, check
446  // for any errors.
447  if (tcp.peer_id() != msg->id) {
448  LOG << "FAULTY ID DETECTED";
449  }
450 
451  LOG << "client " << my_rank_ << " expected signature from client "
452  << tcp.peer_id() << " and got signature "
453  << "from client " << msg->id;
454 
455  die_unequal(tcp.peer_id(), msg->id);
456  die_unequal(tcp.group_id(), msg->group_id);
457 
458  tcp.set_state(ConnectionState::Connected);
459  }
460 
461  /*!
462  * Receives and handles a welcome message. Also sends a reply.
463  *
464  * \param conn The connection that received the welcome message.
465  * \param buffer The buffer containing the welcome message.
466  *
467  * \return A boolean indicating wether this handler should stay attached.
468  */
469  void OnIncomingWelcomeAndReply(net::Connection& conn, Buffer&& buffer) {
470  assert(dynamic_cast<Connection*>(&conn));
471  Connection& tcp = static_cast<Connection&>(conn);
472 
473  die_unless(tcp.GetSocket().IsValid());
475 
476  const WelcomeMsg* msg_in = reinterpret_cast<const WelcomeMsg*>(buffer.data());
477  die_unequal(msg_in->thrill_sign, uint64_t(thrill_sign));
478 
479  LOG << "client " << my_rank_ << " got signature from client"
480  << " group " << msg_in->group_id
481  << " id " << msg_in->id;
482 
483  die_unless(msg_in->group_id < group_count_);
484  die_unless(msg_in->id < groups_[msg_in->group_id]->num_hosts());
485 
486  die_unequal(groups_[msg_in->group_id]->tcp_connection(msg_in->id).state(),
488 
489  // move connection into Group.
490 
491  tcp.set_state(ConnectionState::HelloReceived);
492  tcp.set_peer_id(msg_in->id);
493  tcp.set_group_id(msg_in->group_id);
494 
495  Connection& c = groups_[msg_in->group_id]->AssignConnection(tcp);
496 
497  // send welcome message (via new connection's place)
498 
499  const WelcomeMsg msg_out = { thrill_sign, msg_in->group_id, my_rank_ };
500 
501  dispatcher_.AsyncWriteCopy(
502  c, /* seq */ 0, &msg_out, sizeof(msg_out),
503  AsyncWriteCallback::make<
504  Construction, &Construction::OnHelloSent>(this));
505 
506  LOG << "Client " << my_rank_
507  << " sent passive hello to client " << msg_in->id;
508  }
509 
510  /*!
511  * Handles incoming connections.
512  *
513  * \param conn The listener connection.
514  * \return A boolean indicating wether this handler should stay attached.
515  */
516  bool OnIncomingConnection(net::Connection& conn) {
517  assert(dynamic_cast<Connection*>(&conn));
518  Connection& tcp = static_cast<Connection&>(conn);
519 
520  // accept listening socket
521  connections_.emplace_back(tcp.GetSocket().accept());
522  die_unless(connections_.back().GetSocket().IsValid());
523 
525 
526  LOG << "OnIncomingConnection() " << my_rank_ << " accepted connection"
527  << " fd=" << connections_.back().GetSocket().fd()
528  << " from=" << connections_.back().GetPeerAddress();
529 
530  // wait for welcome message from other side
531  dispatcher_.AsyncRead(
532  connections_.back(), /* seq */ 0, sizeof(WelcomeMsg),
533  AsyncReadBufferCallback::make<
534  Construction, &Construction::OnIncomingWelcomeAndReply>(this));
535 
536  // wait for more connections.
537  return true;
538  }
539 };
540 
541 //! Connect to peers via endpoints using TCP sockets. Construct a group_count
542 //! tcp::Group objects at once. Within each Group this host has my_rank.
543 void Construct(SelectDispatcher& dispatcher, size_t my_rank,
544  const std::vector<std::string>& endpoints,
545  std::unique_ptr<Group>* groups, size_t group_count) {
546  Construction(dispatcher, groups, group_count)
547  .Initialize(my_rank, endpoints);
548 }
549 
550 //! Connect to peers via endpoints using TCP sockets. Construct a group_count
551 //! net::Group objects at once. Within each Group this host has my_rank.
552 std::vector<std::unique_ptr<net::Group> >
553 Construct(SelectDispatcher& dispatcher, size_t my_rank,
554  const std::vector<std::string>& endpoints, size_t group_count) {
555  std::vector<std::unique_ptr<tcp::Group> > tcp_groups(group_count);
556  Construction(dispatcher, &tcp_groups[0], tcp_groups.size())
557  .Initialize(my_rank, endpoints);
558  std::vector<std::unique_ptr<net::Group> > groups(group_count);
559  std::move(tcp_groups.begin(), tcp_groups.end(), groups.begin());
560  return groups;
561 }
562 
563 //! \}
564 
565 } // namespace tcp
566 } // namespace net
567 } // namespace thrill
568 
569 /******************************************************************************/
#define die_unless(X)
Definition: die.hpp:27
#define die(msg)
Instead of std::terminate(), throw the output message via an exception.
Definition: die.hpp:22
SelectDispatcher is a higher level wrapper for select().
static by_string to_string(int val)
convert to string
#define die_unequal(X, Y)
Definition: die.hpp:40
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
Definition: allocator.hpp:220
static constexpr bool debug
static Socket Create()
Create a new stream socket.
Definition: socket.hpp:93
void Construct(SelectDispatcher &dispatcher, size_t my_rank, const std::vector< std::string > &endpoints, std::unique_ptr< Group > *groups, size_t group_count)
Definition: construct.cpp:543
static bool Initialize()
Definition: context.cpp:273
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:24