Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
group.cpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/net/tcp/group.cpp
3  *
4  * net::Group is a collection of Connections providing simple MPI-like
5  * collectives and point-to-point communication.
6  *
7  * Part of Project Thrill - http://project-thrill.org
8  *
9  * Copyright (C) 2015 Timo Bingmann <[email protected]>
10  * Copyright (C) 2015 Emanuel Jöbstl <[email protected]>
11  *
12  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
13  ******************************************************************************/
14 
15 #include <thrill/common/logger.hpp>
17 #include <thrill/net/tcp/group.hpp>
19 
20 #include <random>
21 #include <string>
22 #include <thread>
23 #include <utility>
24 #include <vector>
25 
26 namespace thrill {
27 namespace net {
28 namespace tcp {
29 
30 std::unique_ptr<Dispatcher>
32  // construct tcp::SelectDispatcher
33  return std::make_unique<SelectDispatcher>();
34 }
35 
36 std::vector<std::unique_ptr<Group> > Group::ConstructLoopbackMesh(
37  size_t num_hosts) {
38 
39  // construct a group of num_hosts
40  std::vector<std::unique_ptr<Group> > group(num_hosts);
41 
42  for (size_t i = 0; i < num_hosts; ++i) {
43  group[i] = std::make_unique<Group>(i, num_hosts);
44  }
45 
46  // construct a stream socket pair for (i,j) with i < j
47  for (size_t i = 0; i != num_hosts; ++i) {
48  for (size_t j = i + 1; j < num_hosts; ++j) {
49  LOG << "doing Socket::CreatePair() for i=" << i << " j=" << j;
50 
51  std::pair<Socket, Socket> sp = Socket::CreatePair();
52 
53  group[i]->connections_[j] = Connection(std::move(sp.first));
54  group[j]->connections_[i] = Connection(std::move(sp.second));
55 
56  group[i]->connections_[j].is_loopback_ = true;
57  group[j]->connections_[i].is_loopback_ = true;
58  }
59  }
60 
61  return group;
62 }
63 
64 std::vector<std::unique_ptr<Group> > Group::ConstructLocalRealTCPMesh(
65  size_t num_hosts) {
66 
67  // randomize base port number for test
68  std::default_random_engine generator(std::random_device { } ());
69  std::uniform_int_distribution<int> distribution(10000, 30000);
70  const size_t port_base = distribution(generator);
71 
72  std::vector<std::string> endpoints;
73 
74  for (size_t i = 0; i < num_hosts; ++i) {
75  endpoints.push_back("127.0.0.1:" + std::to_string(port_base + i));
76  }
77 
78  sLOG << "Group test uses ports" << port_base << "-" << port_base + num_hosts;
79 
80  std::vector<std::thread> threads(num_hosts);
81 
82  // we have to create and run threads to construct Group because these create
83  // real connections.
84 
85  std::vector<std::unique_ptr<Group> > groups(num_hosts);
86 
87  for (size_t i = 0; i < num_hosts; i++) {
88  threads[i] = std::thread(
89  [i, &endpoints, &groups]() {
90  // construct Group i with endpoints -- with temporary Dispatcher
91  net::tcp::SelectDispatcher dispatcher;
92  Construct(dispatcher, i, endpoints, groups.data() + i, 1);
93  });
94  }
95 
96  for (size_t i = 0; i < num_hosts; i++) {
97  threads[i].join();
98  }
99 
100  return groups;
101 }
102 
103 } // namespace tcp
104 } // namespace net
105 } // namespace thrill
106 
107 /******************************************************************************/
static std::vector< std::unique_ptr< Group > > ConstructLocalRealTCPMesh(size_t num_hosts)
Construct a test network with an underlying full mesh of REAL tcp streams interconnected via localhos...
Definition: group.cpp:64
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:34
size_t num_hosts() const final
Return number of connections in this group (= number of computing hosts)
Definition: group.hpp:130
static std::pair< Socket, Socket > CreatePair()
Definition: socket.hpp:117
SelectDispatcher is a higher level wrapper for select().
Connection is a rich point-to-point socket connection to another client (worker, master, or whatever).
Definition: connection.hpp:54
static by_string to_string(int val)
convert to string
std::unique_ptr< net::Dispatcher > ConstructDispatcher() const final
Definition: group.cpp:31
static std::vector< std::unique_ptr< Group > > ConstructLoopbackMesh(size_t num_hosts)
Construct a test network with an underlying full mesh of local loopback stream sockets for testing...
Definition: group.cpp:36
void Construct(SelectDispatcher &dispatcher, size_t my_rank, const std::vector< std::string > &endpoints, std::unique_ptr< Group > *groups, size_t group_count)
Definition: construct.cpp:543
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:24