//! Local debug switch. LOG only produces output when the local debug variable
//! is true, so with this set to false the LOG statements below print nothing.
34 static constexpr
bool debug =
false;
//! Creates a construction helper bound to a dispatcher and a set of group
//! slots.
//! \param dispatcher  dispatcher kept by reference in dispatcher_
//! \param groups      pointer to the caller's array of Group slots
//! \param group_count number of slots, stored in group_count_
//! NOTE(review): the initializer that stores `groups` and the constructor body
//! are on lines missing from this excerpt.
37 Construction(SelectDispatcher& dispatcher,
38 std::unique_ptr<Group>* groups,
size_t group_count)
39 : dispatcher_(dispatcher),
41 group_count_(group_count)
// NOTE(review): the first line of this signature is missing from the excerpt;
// the call `.Initialize(my_rank, endpoints)` further down suggests this is
// Initialize().
// Visible behaviour: stores the rank, creates one Group per slot, binds and
// listens on this rank's own endpoint, starts asynchronous connects to every
// higher-ranked endpoint, runs the dispatcher until IsInitializationFinished()
// holds, and finally logs the fd of each link.
53 const std::vector<std::string>& endpoints) {
// The right-hand my_rank_ is presumably a parameter shadowing the member; its
// declaration is not visible here.
55 this->my_rank_ = my_rank_;
58 LOG <<
"Client " << my_rank_ <<
" starting: " << endpoints[my_rank_];
// Each group is constructed with our rank and the number of endpoints.
60 for (
size_t i = 0; i < group_count_; i++) {
61 groups_[i] = std::make_unique<Group>(my_rank_, endpoints.size());
// Turn the endpoint strings into socket addresses (same order as endpoints).
65 std::vector<SocketAddress> address_list
66 = GetAddressList(endpoints);
// NOTE(review): the creation of listen_socket is on a line not visible here.
71 listen_socket.SetReuseAddr();
// We listen on the address that belongs to our own rank.
73 SocketAddress& lsa = address_list[my_rank_];
75 if (!listen_socket.bind(lsa))
76 throw Exception(
"Could not bind listen socket to " 77 + lsa.ToStringHostPort(), errno);
79 if (!listen_socket.listen())
80 throw Exception(
"Could not listen on socket " 81 + lsa.ToStringHostPort(), errno);
// The socket is moved into the listener_ connection; listen_socket must not be
// used after this point.
83 listener_ = Connection(std::move(listen_socket));
86 LOG <<
"Client " << my_rank_ <<
" listening: " << endpoints[my_rank_];
// For every group, actively connect only to ranks above our own; lower ranks
// are presumably expected to reach us through the listener (confirm).
89 for (uint32_t g = 0; g < group_count_; g++) {
90 for (
size_t id = my_rank_ + 1;
id < address_list.size(); ++id) {
91 AsyncConnect(g,
id, address_list[
id]);
// Register a read callback on the listener that forwards to
// OnIncomingConnection().
96 dispatcher_.AddRead(listener_,
98 return OnIncomingConnection(listener_);
// Keep dispatching events until IsInitializationFinished() reports completion.
102 while (!IsInitializationFinished())
104 LOG <<
"Client " << my_rank_ <<
" dispatching.";
105 dispatcher_.Dispatch();
// Initialization is done: drop the dispatcher registration for the listener.
108 dispatcher_.Cancel(listener_);
113 LOG <<
"Client " << my_rank_ <<
" done";
// Log the socket fd of the link to every other rank, for each group.
115 for (
size_t j = 0; j < group_count_; j++) {
117 for (
size_t i = 0; i != address_list.size(); ++i) {
118 if (i == my_rank_)
continue;
120 <<
" link " << my_rank_ <<
" -> " << i <<
" = fd " 121 << groups_[j]->tcp_connection(i).GetSocket().fd();
//! Memory manager for this object, constructed from nullptr and the string
//! "Construction".
128 mem::Manager mem_manager_ {
nullptr,
"Construction" };
//! Dispatcher passed to the constructor; used for all read/write/timer
//! registrations in this class.
131 SelectDispatcher& dispatcher_;
//! Pointer to the array of Group slots, indexed by group number.
134 std::unique_ptr<Group>* groups_;
//! Our rank; size_t(-1) until the initialization routine assigns it.
140 size_t my_rank_ = size_t(-1);
//! Connection wrapping the listening socket.
143 Connection listener_;
//! Key type for per-peer state: (group, id).
146 using GroupNodeIdPair = std::pair<size_t, size_t>;
//! Connections accepted from the listener. Presumably a deque because
//! references to its elements are handed to the dispatcher and std::deque
//! keeps element references valid across emplace_back (confirm intent).
151 std::deque<Connection> connections_;
//! Current reconnect timeout per (group, id) pair; see NextConnectTimeout().
155 std::map<GroupNodeIdPair, size_t> timeouts_;
//! Timeout value stored when a (group, id) pair is first seen; the value is
//! passed to std::chrono::milliseconds when the retry timer is armed.
158 const size_t initial_timeout_ = 10;
//! Once a pair's timeout reaches this value, NextConnectTimeout() throws.
162 const size_t final_timeout_ = 40960;
//! NOTE(review): presumably the signature field of the WelcomeMsg struct (it is
//! read as msg->thrill_sign below); the enclosing struct lines are not visible.
168 uint64_t thrill_sign;
//! Signature constant placed in every hello message and checked on receipt.
178 static constexpr uint64_t thrill_sign = 0x0C7A0C7A0C7A0C7A;
//! Builds a list of SocketAddress objects, one per endpoint string.
//! \param endpoints endpoint strings to convert
//! An address that is not valid is reported together with its resolve error.
//! NOTE(review): the loop header, the statement consuming the error text, and
//! the return are on lines missing from this excerpt.
186 std::vector<SocketAddress> GetAddressList(
187 const std::vector<std::string>& endpoints) {
189 std::vector<SocketAddress> addressList;
// Append first, then validate the element that was just added.
192 addressList.push_back(SocketAddress(endp));
193 if (!addressList.back().IsValid()) {
195 "Error resolving endpoint " + endp
196 +
": " + addressList.back().GetResolveError());
//! Inspects the state of the connection to every other host in every group.
//! NOTE(review): the state comparison and the return statements are truncated
//! in this excerpt, so the exact completion criterion is not visible.
210 bool IsInitializationFinished() {
212 for (
size_t g = 0; g < group_count_; g++) {
214 for (
size_t id = 0;
id < groups_[g]->num_hosts(); ++id) {
// Our own rank is skipped.
215 if (
id == my_rank_)
continue;
219 if (groups_[g]->tcp_connection(
id).state()
//! Starts a non-blocking connect of nc to address and routes the outcome to
//! OnConnected().
//! \param nc      connection to use; asserted to be a tcp Connection
//! \param address peer address to connect to
//! Throws Exception on the remaining error path (its guarding condition is on
//! a line not visible here).
236 void AsyncConnect(net::Connection& nc,
const SocketAddress& address) {
// Debug-only type check; the static_cast below relies on it.
237 assert(dynamic_cast<Connection*>(&nc));
238 Connection& tcp =
static_cast<Connection&
>(nc);
241 tcp.GetSocket().SetNonBlocking(
true);
242 int res = tcp.GetSocket().connect(address);
// NOTE(review): the condition guarding this early-success branch is missing
// from the excerpt.
247 LOG <<
"Early connect success. This should not happen.";
249 OnConnected(tcp, address);
251 else if (errno == EINPROGRESS) {
// Connect is pending: OnConnected() is invoked from the write callback. The
// lambda captures address and tcp by reference, so both must outlive it.
253 dispatcher_.AddWrite(tcp, [
this, &address, &tcp]() {
254 return OnConnected(tcp, address);
257 else if (errno == ECONNREFUSED) {
258 LOG <<
"Early connect refused.";
// Pass errno explicitly so OnConnected() handles the refusal.
260 OnConnected(tcp, address, errno);
267 throw Exception(
"Error starting async connect client " 269 + address.ToStringHostPort(), errno);
// NOTE(review): the line naming this function is missing from the excerpt; the
// three-argument calls AsyncConnect(g, id, address) elsewhere in this file
// suggest it is that overload.
// Visible behaviour: takes the group's connection for peer `id`, closes it if
// it is still valid, tags it with the group id, and starts the asynchronous
// connect. Statements on the missing lines in between are not covered here.
282 size_t group,
size_t id,
const SocketAddress& address) {
285 Connection& nc = groups_[group]->tcp_connection(
id);
286 if (nc.IsValid()) nc.Close();
289 nc.set_group_id(group);
292 AsyncConnect(nc, address);
//! Write-completion callback registered for the hello messages.
//! \param conn connection the hello was written on; asserted to be a tcp
//!             Connection
//! NOTE(review): only the cast prologue is visible; the rest of the body is on
//! lines missing from this excerpt.
301 void OnHelloSent(net::Connection& conn) {
302 assert(dynamic_cast<Connection*>(&conn));
303 Connection& tcp =
static_cast<Connection&
>(conn);
//! Determines the next reconnect timeout for the peer (group, id).
//! \param group   group number of the peer
//! \param id      peer id within the group
//! \param address peer address, used only in the error message
//! Throws Exception once the stored timeout reaches final_timeout_.
//! NOTE(review): lines between the insert and the doubling are missing, so it
//! is not visible whether a freshly inserted entry is also doubled; the return
//! statement is not visible either.
317 size_t NextConnectTimeout(
size_t group,
size_t id,
318 const SocketAddress& address) {
319 GroupNodeIdPair gnip(group,
id);
320 auto it = timeouts_.find(gnip);
// First attempt for this peer: start from initial_timeout_.
321 if (it == timeouts_.end()) {
322 it = timeouts_.insert(std::make_pair(gnip, initial_timeout_)).first;
// Double the stored timeout.
326 it->second = 2 * it->second;
// Give up once the timeout has grown to final_timeout_.
328 if (it->second >= final_timeout_) {
329 throw Exception(
"Timeout error connecting to client " 331 + address.ToStringHostPort());
//! Handles the result of an asynchronous connect on conn.
//! On ECONNREFUSED or ETIMEDOUT a timer is armed with the value from
//! NextConnectTimeout() and the connect is retried; on success a WelcomeMsg is
//! written and a read of the reply is queued for OnIncomingWelcome().
//! \param conn    connection that was being connected; asserted to be a tcp
//!                Connection
//! \param address peer address, used for logging, errors and reconnects
//! NOTE(review): the third parameter (read below as _err) is declared on a line
//! missing from this excerpt, as are the return statements.
350 bool OnConnected(net::Connection& conn,
const SocketAddress& address,
352 assert(dynamic_cast<Connection*>(&conn));
353 Connection& tcp =
static_cast<Connection&
>(conn);
// An explicitly passed non-zero error wins; otherwise ask the socket.
356 int err = _err ? _err : tcp.GetSocket().GetError();
// NOTE(review): the state check guarding this diagnostic is not visible.
359 LOG <<
"Client " << my_rank_
361 <<
" but got " << tcp.state();
362 die(
"FAULTY STATE DETECTED");
365 if (err == ECONNREFUSED || err == ETIMEDOUT) {
// May throw if the peer's timeout has reached its limit.
369 size_t next_timeout = NextConnectTimeout(
370 tcp.group_id(), tcp.peer_id(), address);
372 LOG <<
"Connect to " << address.ToStringHostPort()
373 <<
" fd=" << tcp.GetSocket().fd()
374 <<
" timed out or refused with error " << err <<
"." 375 <<
" Attempting reconnect in " << next_timeout <<
"msec";
// Retry later via the (group, id, address) AsyncConnect overload.
377 dispatcher_.AddTimer(
378 std::chrono::milliseconds(next_timeout),
382 AsyncConnect(tcp.group_id(), tcp.peer_id(), address);
// NOTE(review): the condition selecting this error path is not visible.
392 throw Exception(
"Error connecting asynchronously to client " 394 + address.ToStringHostPort(), err);
401 LOG <<
"OnConnected() " << my_rank_ <<
" connected" 402 <<
" fd=" << tcp.GetSocket().fd()
403 <<
" to=" << tcp.GetSocket().GetPeerAddress()
405 <<
" group=" << tcp.group_id();
// Hello message: signature, group id, our rank.
408 const WelcomeMsg hello = { thrill_sign, tcp.group_id(), my_rank_ };
// hello is a stack local; AsyncWriteCopy presumably copies it before
// returning (confirm against the dispatcher).
410 dispatcher_.AsyncWriteCopy(
411 tcp, 0, &hello,
sizeof(hello),
412 AsyncWriteCallback::make<
413 Construction, &Construction::OnHelloSent>(
this));
415 LOG <<
"Client " << my_rank_ <<
" sent active hello to " 416 <<
"client " << tcp.peer_id() <<
" group id " << tcp.group_id();
// Expect a reply of the same size as the hello we sent.
418 dispatcher_.AsyncRead(
419 tcp, 0,
sizeof(hello),
420 AsyncReadBufferCallback::make<
421 Construction, &Construction::OnIncomingWelcome>(
this));
//! Read callback for the peer's reply to our active hello: checks the message
//! signature and compares the sender id with the peer id of this connection.
//! \param conn   connection the reply arrived on; asserted to be a tcp
//!               Connection
//! \param buffer received bytes, interpreted as a WelcomeMsg
//! NOTE(review): no size check on buffer is visible before the cast; confirm
//! one exists on the lines missing from this excerpt.
434 void OnIncomingWelcome(net::Connection& conn, Buffer&& buffer) {
435 assert(dynamic_cast<Connection*>(&conn));
436 Connection& tcp =
static_cast<Connection&
>(conn);
442 const WelcomeMsg* msg
443 =
reinterpret_cast<const WelcomeMsg*
>(buffer.data());
// Abort if the signature does not match ours.
444 die_unequal(msg->thrill_sign, uint64_t(thrill_sign));
// The reply must come from the peer we connected to.
447 if (tcp.peer_id() != msg->id) {
448 LOG <<
"FAULTY ID DETECTED";
451 LOG <<
"client " << my_rank_ <<
" expected signature from client " 452 << tcp.peer_id() <<
" and got signature " 453 <<
"from client " << msg->id;
//! Read callback for the hello on an accepted connection: validates the
//! message, tags the connection with the peer's id and group, assigns it to
//! that group, and answers with our own hello.
//! \param conn   accepted connection; asserted to be a tcp Connection
//! \param buffer received bytes, interpreted as a WelcomeMsg
//! NOTE(review): no size check on buffer is visible before the cast; confirm
//! one exists on the lines missing from this excerpt.
469 void OnIncomingWelcomeAndReply(net::Connection& conn, Buffer&& buffer) {
470 assert(dynamic_cast<Connection*>(&conn));
471 Connection& tcp =
static_cast<Connection&
>(conn);
476 const WelcomeMsg* msg_in =
reinterpret_cast<const WelcomeMsg*
>(buffer.data());
477 die_unequal(msg_in->thrill_sign, uint64_t(thrill_sign));
479 LOG <<
"client " << my_rank_ <<
" got signature from client" 480 <<
" group " << msg_in->group_id
481 <<
" id " << msg_in->id;
// The announced id must be a valid host index of the announced group.
// NOTE(review): msg_in->group_id is used as an index into groups_ here; a
// range check against group_count_ is not visible; confirm it exists on a
// missing line.
484 die_unless(msg_in->id < groups_[msg_in->group_id]->num_hosts());
486 die_unequal(groups_[msg_in->group_id]->tcp_connection(msg_in->id).state(),
492 tcp.set_peer_id(msg_in->id);
493 tcp.set_group_id(msg_in->group_id);
// From here on the reply uses the connection returned by the group, not tcp.
495 Connection& c = groups_[msg_in->group_id]->AssignConnection(tcp);
// Reply with our signature, the same group id, and our rank.
499 const WelcomeMsg msg_out = { thrill_sign, msg_in->group_id, my_rank_ };
501 dispatcher_.AsyncWriteCopy(
502 c, 0, &msg_out,
sizeof(msg_out),
503 AsyncWriteCallback::make<
504 Construction, &Construction::OnHelloSent>(
this));
506 LOG <<
"Client " << my_rank_
507 <<
" sent passive hello to client " << msg_in->id;
//! Listener callback: accepts a connection, stores it in connections_, and
//! queues a read of one WelcomeMsg handled by OnIncomingWelcomeAndReply().
//! \param conn the listening connection; asserted to be a tcp Connection
//! NOTE(review): the return statement is on a line missing from this excerpt.
516 bool OnIncomingConnection(net::Connection& conn) {
517 assert(dynamic_cast<Connection*>(&conn));
518 Connection& tcp =
static_cast<Connection&
>(conn);
// Store the accepted socket as a new connection; abort if it is not valid.
521 connections_.emplace_back(tcp.GetSocket().accept());
522 die_unless(connections_.back().GetSocket().IsValid());
526 LOG <<
"OnIncomingConnection() " << my_rank_ <<
" accepted connection" 527 <<
" fd=" << connections_.back().GetSocket().fd()
528 <<
" from=" << connections_.back().GetPeerAddress();
// Wait for the peer's hello on the connection just stored.
531 dispatcher_.AsyncRead(
532 connections_.back(), 0,
sizeof(WelcomeMsg),
533 AsyncReadBufferCallback::make<
534 Construction, &Construction::OnIncomingWelcomeAndReply>(
this));
// NOTE(review): the first line of this signature is missing from the excerpt.
// Visible behaviour: builds a temporary Construction over the caller-provided
// group array and runs Initialize(my_rank, endpoints) on it.
544 const std::vector<std::string>& endpoints,
545 std::unique_ptr<Group>* groups,
size_t group_count) {
546 Construction(dispatcher, groups, group_count)
547 .Initialize(my_rank, endpoints);
//! Variant that allocates the group slots itself: creates group_count
//! tcp::Group slots, builds a Construction over them, and moves the slots into
//! a vector of net::Group pointers.
//! NOTE(review): the line with the function name, the call chained onto the
//! Construction temporary, and the return are missing from this excerpt.
//! NOTE(review): &tcp_groups[0] is taken unconditionally, which indexes an
//! empty vector when group_count is 0; confirm callers never pass 0.
552 std::vector<std::unique_ptr<net::Group> >
554 const std::vector<std::string>& endpoints,
size_t group_count) {
555 std::vector<std::unique_ptr<tcp::Group> > tcp_groups(group_count);
556 Construction(dispatcher, &tcp_groups[0], tcp_groups.size())
// Element-wise move; each tcp::Group pointer is converted to a net::Group
// pointer.
558 std::vector<std::unique_ptr<net::Group> > groups(group_count);
559 std::move(tcp_groups.begin(), tcp_groups.end(), groups.begin());
#define die(msg)
Instead of calling std::terminate(), throw the output message via an exception.
SelectDispatcher is a higher level wrapper for select().
static by_string to_string(int val)
convert to string
#define die_unequal(X, Y)
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
static constexpr bool debug
static Socket Create()
Create a new stream socket.
void Construct(SelectDispatcher &dispatcher, size_t my_rank, const std::vector< std::string > &endpoints, std::unique_ptr< Group > *groups, size_t group_count)
#define LOG
Default logging method: output if the local debug variable is true.