Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
group.cpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/net/group.cpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Timo Bingmann <[email protected]>
7  *
8  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
9  ******************************************************************************/
10 
12 #include <thrill/net/group.hpp>
13 #include <thrill/net/manager.hpp>
15 
16 #if THRILL_HAVE_NET_TCP
17 #include <thrill/net/tcp/group.hpp>
18 #endif
19 
20 #include <functional>
21 #include <utility>
22 #include <vector>
23 
24 namespace thrill {
25 namespace net {
26 
28  size_t num_hosts,
29  const std::function<void(Group*)>& thread_function) {
30 #if THRILL_HAVE_NET_TCP
31  // construct local tcp network mesh and run threads
34  thread_function);
35 #else
36  // construct mock network mesh and run threads
39  thread_function);
40 #endif
41 }
42 
43 std::ostream& operator << (std::ostream& os, const Traffic& t) {
44  return os << t.total();
45 }
46 
47 /******************************************************************************/
48 // Manager
49 
50 Manager::Manager(std::array<GroupPtr, kGroupCount>&& groups,
51  common::JsonLogger& logger) noexcept
52  : groups_(std::move(groups)), logger_(logger) { }
53 
54 Manager::Manager(std::vector<GroupPtr>&& groups,
55  common::JsonLogger& logger) noexcept
56  : logger_(logger) {
57  assert(groups.size() == kGroupCount);
58  std::move(groups.begin(), groups.end(), groups_.begin());
59 }
60 
62  for (size_t i = 0; i < kGroupCount; i++) {
63  groups_[i]->Close();
64  }
65 }
66 
68  size_t total_tx = 0, total_rx = 0;
69 
70  for (size_t g = 0; g < kGroupCount; ++g) {
71  Group& group = *groups_[g];
72 
73  for (size_t h = 0; h < group.num_hosts(); ++h) {
74  if (h == group.my_host_rank()) continue;
75 
76  total_tx += group.connection(h).tx_bytes_;
77  total_rx += group.connection(h).rx_bytes_;
78  }
79  }
80 
81  return net::Traffic(total_tx, total_rx);
82 }
83 
84 void Manager::RunTask(const std::chrono::steady_clock::time_point& tp) {
85 
86  common::JsonLine line = logger_.line();
87  line << "class" << "NetManager"
88  << "event" << "profile";
89 
90  double elapsed = static_cast<double>(
91  std::chrono::duration_cast<std::chrono::microseconds>(
92  tp - tp_last_).count()) / 1e6;
93 
94  size_t total_tx = 0, total_rx = 0;
95  size_t prev_total_tx = 0, prev_total_rx = 0;
96 
97  for (size_t g = 0; g < kGroupCount; ++g) {
98  Group& group = *groups_[g];
99 
100  size_t group_tx = 0, group_rx = 0;
101  size_t prev_group_tx = 0, prev_group_rx = 0;
102  std::vector<size_t> tx_per_host(group.num_hosts());
103  std::vector<size_t> rx_per_host(group.num_hosts());
104 
105  for (size_t h = 0; h < group.num_hosts(); ++h) {
106  if (h == group.my_host_rank()) continue;
107 
108  Connection& conn = group.connection(h);
109 
110  size_t tx = conn.tx_bytes_.load(std::memory_order_relaxed);
111  size_t rx = conn.rx_bytes_.load(std::memory_order_relaxed);
112  size_t prev_tx = conn.prev_tx_bytes_;
113  size_t prev_rx = conn.prev_rx_bytes_;
114 
115  group_tx += tx;
116  prev_group_tx += prev_tx;
117  group.connection(h).prev_tx_bytes_ = tx;
118 
119  group_rx += rx;
120  prev_group_rx += prev_rx;
121  group.connection(h).prev_rx_bytes_ = rx;
122 
123  tx_per_host[h] = tx;
124  rx_per_host[h] = rx;
125  }
126 
127  line.sub(g == 0 ? "flow" : g == 1 ? "data" : "???")
128  << "tx_bytes" << group_tx
129  << "rx_bytes" << group_rx
130  << "tx_speed"
131  << static_cast<double>(group_tx - prev_group_tx) / elapsed
132  << "rx_speed"
133  << static_cast<double>(group_rx - prev_group_rx) / elapsed
134  << "tx_per_host" << tx_per_host
135  << "rx_per_host" << rx_per_host;
136 
137  total_tx += group_tx;
138  total_rx += group_rx;
139  prev_total_tx += prev_group_tx;
140  prev_total_rx += prev_group_rx;
141 
142  tp_last_ = tp;
143  }
144 
145  // write out totals
146  line
147  << "tx_bytes" << total_tx
148  << "rx_bytes" << total_rx
149  << "tx_speed"
150  << static_cast<double>(total_tx - prev_total_tx) / elapsed
151  << "rx_speed"
152  << static_cast<double>(total_rx - prev_total_rx) / elapsed;
153 }
154 
155 /******************************************************************************/
156 // Group
157 
159  return 0;
160 }
161 
162 /*[[[perl
163  for my $e (
164  ["int", "Int"], ["unsigned int", "UnsignedInt"],
165  ["long", "Long"], ["unsigned long", "UnsignedLong"],
166  ["long long", "LongLong"], ["unsigned long long", "UnsignedLongLong"])
167  {
168  print "void Group::PrefixSumPlus$$e[1]($$e[0]& value, const $$e[0]& initial) {\n";
169  print " return PrefixSumSelect(value, std::plus<$$e[0]>(), initial, true);\n";
170  print "}\n";
171 
172  print "void Group::ExPrefixSumPlus$$e[1]($$e[0]& value, const $$e[0]& initial) {\n";
173  print " return PrefixSumSelect(value, std::plus<$$e[0]>(), initial, false);\n";
174  print "}\n";
175 
176  print "void Group::Broadcast$$e[1]($$e[0]& value, size_t origin) {\n";
177  print " return BroadcastSelect(value, origin);\n";
178  print "}\n";
179 
180  print "void Group::AllReducePlus$$e[1]($$e[0]& value) {\n";
181  print " return AllReduceSelect(value, std::plus<$$e[0]>());\n";
182  print "}\n";
183 
184  print "void Group::AllReduceMinimum$$e[1]($$e[0]& value) {\n";
185  print " return AllReduceSelect(value, common::minimum<$$e[0]>());\n";
186  print "}\n";
187 
188  print "void Group::AllReduceMaximum$$e[1]($$e[0]& value) {\n";
189  print " return AllReduceSelect(value, common::maximum<$$e[0]>());\n";
190  print "}\n";
191  }
192 ]]]*/
193 void Group::PrefixSumPlusInt(int& value, const int& initial) {
194  return PrefixSumSelect(value, std::plus<int>(), initial, true);
195 }
196 void Group::ExPrefixSumPlusInt(int& value, const int& initial) {
197  return PrefixSumSelect(value, std::plus<int>(), initial, false);
198 }
199 void Group::BroadcastInt(int& value, size_t origin) {
200  return BroadcastSelect(value, origin);
201 }
203  return AllReduceSelect(value, std::plus<int>());
204 }
206  return AllReduceSelect(value, common::minimum<int>());
207 }
209  return AllReduceSelect(value, common::maximum<int>());
210 }
211 void Group::PrefixSumPlusUnsignedInt(unsigned int& value, const unsigned int& initial) {
212  return PrefixSumSelect(value, std::plus<unsigned int>(), initial, true);
213 }
214 void Group::ExPrefixSumPlusUnsignedInt(unsigned int& value, const unsigned int& initial) {
215  return PrefixSumSelect(value, std::plus<unsigned int>(), initial, false);
216 }
217 void Group::BroadcastUnsignedInt(unsigned int& value, size_t origin) {
218  return BroadcastSelect(value, origin);
219 }
221  return AllReduceSelect(value, std::plus<unsigned int>());
222 }
225 }
228 }
229 void Group::PrefixSumPlusLong(long& value, const long& initial) {
230  return PrefixSumSelect(value, std::plus<long>(), initial, true);
231 }
232 void Group::ExPrefixSumPlusLong(long& value, const long& initial) {
233  return PrefixSumSelect(value, std::plus<long>(), initial, false);
234 }
235 void Group::BroadcastLong(long& value, size_t origin) {
236  return BroadcastSelect(value, origin);
237 }
239  return AllReduceSelect(value, std::plus<long>());
240 }
242  return AllReduceSelect(value, common::minimum<long>());
243 }
245  return AllReduceSelect(value, common::maximum<long>());
246 }
247 void Group::PrefixSumPlusUnsignedLong(unsigned long& value, const unsigned long& initial) {
248  return PrefixSumSelect(value, std::plus<unsigned long>(), initial, true);
249 }
250 void Group::ExPrefixSumPlusUnsignedLong(unsigned long& value, const unsigned long& initial) {
251  return PrefixSumSelect(value, std::plus<unsigned long>(), initial, false);
252 }
253 void Group::BroadcastUnsignedLong(unsigned long& value, size_t origin) {
254  return BroadcastSelect(value, origin);
255 }
257  return AllReduceSelect(value, std::plus<unsigned long>());
258 }
261 }
264 }
265 void Group::PrefixSumPlusLongLong(long long& value, const long long& initial) {
266  return PrefixSumSelect(value, std::plus<long long>(), initial, true);
267 }
268 void Group::ExPrefixSumPlusLongLong(long long& value, const long long& initial) {
269  return PrefixSumSelect(value, std::plus<long long>(), initial, false);
270 }
271 void Group::BroadcastLongLong(long long& value, size_t origin) {
272  return BroadcastSelect(value, origin);
273 }
275  return AllReduceSelect(value, std::plus<long long>());
276 }
279 }
282 }
283 void Group::PrefixSumPlusUnsignedLongLong(unsigned long long& value, const unsigned long long& initial) {
284  return PrefixSumSelect(value, std::plus<unsigned long long>(), initial, true);
285 }
286 void Group::ExPrefixSumPlusUnsignedLongLong(unsigned long long& value, const unsigned long long& initial) {
287  return PrefixSumSelect(value, std::plus<unsigned long long>(), initial, false);
288 }
289 void Group::BroadcastUnsignedLongLong(unsigned long long& value, size_t origin) {
290  return BroadcastSelect(value, origin);
291 }
292 void Group::AllReducePlusUnsignedLongLong(unsigned long long& value) {
293  return AllReduceSelect(value, std::plus<unsigned long long>());
294 }
297 }
300 }
301 // [[[end]]]
302 
303 } // namespace net
304 } // namespace thrill
305 
306 /******************************************************************************/
virtual void AllReducePlusLongLong(long long &value)
Definition: group.cpp:274
virtual void BroadcastLong(long &value, size_t origin)
Definition: group.cpp:235
virtual void AllReducePlusUnsignedInt(unsigned int &value)
Definition: group.cpp:220
virtual void PrefixSumPlusLongLong(long long &value, const long long &initial)
Definition: group.cpp:265
virtual void AllReduceMinimumUnsignedInt(unsigned int &value)
Definition: group.cpp:223
Manager(const Manager &)=delete
non-copyable: delete copy-constructor
virtual void ExPrefixSumPlusLongLong(long long &value, const long long &initial)
Definition: group.cpp:268
void BroadcastSelect(T &value, size_t origin=0)
select broadcast implementation (often due to total number of processors)
Definition: collective.hpp:238
common::JsonLogger & logger_
JsonLogger for statistics output.
Definition: manager.hpp:111
virtual void ExPrefixSumPlusInt(int &value, const int &initial)
Definition: group.cpp:196
std::atomic< size_t > tx_bytes_
sent bytes
Definition: connection.hpp:609
size_t prev_tx_bytes_
previous read of sent bytes
Definition: connection.hpp:615
virtual void ExPrefixSumPlusUnsignedLongLong(unsigned long long &value, const unsigned long long &initial)
Definition: group.cpp:286
virtual void AllReducePlusInt(int &value)
Definition: group.cpp:202
virtual void BroadcastInt(int &value, size_t origin)
Definition: group.cpp:199
void ExecuteGroupThreads(const std::vector< std::unique_ptr< Group > > &groups, const std::function< void(GroupCalled *)> &thread_function)
Definition: group.hpp:299
virtual Connection & connection(size_t id)=0
Return Connection to client id.
virtual void AllReduceMinimumInt(int &value)
Definition: group.cpp:205
JsonLine line()
create new JsonLine instance which will be written to this logger.
Definition: json_logger.cpp:57
virtual void BroadcastUnsignedLong(unsigned long &value, size_t origin)
Definition: group.cpp:253
virtual void ExPrefixSumPlusLong(long &value, const long &initial)
Definition: group.cpp:232
virtual void PrefixSumPlusUnsignedLong(unsigned long &value, const unsigned long &initial)
Definition: group.cpp:247
virtual void PrefixSumPlusUnsignedLongLong(unsigned long long &value, const unsigned long long &initial)
Definition: group.cpp:283
virtual void AllReduceMinimumLong(long &value)
Definition: group.cpp:241
virtual void AllReduceMaximumUnsignedLong(unsigned long &value)
Definition: group.cpp:262
std::ostream & operator<<(std::ostream &os, const Traffic &t)
Definition: group.cpp:43
size_t my_host_rank() const
Return our rank among hosts in this group.
Definition: group.hpp:69
size_t total() const
both transmitted and received bytes
Definition: manager.hpp:39
virtual void AllReducePlusUnsignedLongLong(unsigned long long &value)
Definition: group.cpp:292
virtual void AllReduceMaximumLongLong(long long &value)
Definition: group.cpp:280
virtual void AllReduceMinimumUnsignedLong(unsigned long &value)
Definition: group.cpp:259
int value
Definition: gen_data.py:41
A Connection represents a link to another peer in a network group.
Definition: connection.hpp:49
virtual void BroadcastUnsignedLongLong(unsigned long long &value, size_t origin)
Definition: group.cpp:289
virtual size_t num_parallel_async() const
Definition: group.cpp:158
virtual void ExPrefixSumPlusUnsignedInt(unsigned int &value, const unsigned int &initial)
Definition: group.cpp:214
static std::vector< std::unique_ptr< Group > > ConstructLoopbackMesh(size_t num_hosts)
Construct a mock network with num_hosts peers and deliver Group contexts for each of them...
Definition: group.cpp:160
virtual void AllReduceMaximumUnsignedLongLong(unsigned long long &value)
Definition: group.cpp:298
net::Traffic Traffic() const
calculate overall traffic for final stats
Definition: group.cpp:67
static std::vector< std::unique_ptr< Group > > ConstructLoopbackMesh(size_t num_hosts)
Construct a test network with an underlying full mesh of local loopback stream sockets for testing...
Definition: group.cpp:36
virtual void ExPrefixSumPlusUnsignedLong(unsigned long &value, const unsigned long &initial)
Definition: group.cpp:250
void RunLoopbackGroupTest(size_t num_hosts, const std::function< void(Group *)> &thread_function)
Definition: group.cpp:27
virtual void BroadcastUnsignedInt(unsigned int &value, size_t origin)
Definition: group.cpp:217
virtual void AllReduceMaximumUnsignedInt(unsigned int &value)
Definition: group.cpp:226
A network Group is a collection of enumerated communication links, which provides point-to-point comm...
Definition: group.hpp:47
virtual void AllReduceMaximumLong(long &value)
Definition: group.cpp:244
JsonLine sub(const Key &key)
return JsonLine has sub-dictionary of this one
size_t prev_rx_bytes_
previous read of received bytes
Definition: connection.hpp:618
static constexpr size_t kGroupCount
The count of net::Groups to initialize.
Definition: manager.hpp:61
virtual void AllReduceMaximumInt(int &value)
Definition: group.cpp:208
virtual void AllReduceMinimumUnsignedLongLong(unsigned long long &value)
Definition: group.cpp:295
JsonLogger is a receiver of JSON output objects for logging.
Definition: json_logger.hpp:69
void PrefixSumSelect(T &value, BinarySumOp sum_op=BinarySumOp(), const T &initial=T(), bool inclusive=true)
select prefixsum implementation (often due to total number of processors)
Definition: collective.hpp:153
virtual void PrefixSumPlusUnsignedInt(unsigned int &value, const unsigned int &initial)
Definition: group.cpp:211
virtual void AllReduceMinimumLongLong(long long &value)
Definition: group.cpp:277
virtual void PrefixSumPlusInt(int &value, const int &initial)
Definition: group.cpp:193
virtual void AllReducePlusLong(long &value)
Definition: group.cpp:238
std::array< GroupPtr, kGroupCount > groups_
The Groups initialized and managed by this Manager.
Definition: manager.hpp:108
virtual size_t num_hosts() const =0
Return number of connections in this group (= number computing hosts)
virtual void AllReducePlusUnsignedLong(unsigned long &value)
Definition: group.cpp:256
void AllReduceSelect(T &value, BinarySumOp sum_op=BinarySumOp())
select allreduce implementation (often due to total number of processors)
Definition: collective.hpp:550
virtual void BroadcastLongLong(long long &value, size_t origin)
Definition: group.cpp:271
std::chrono::steady_clock::time_point tp_last_
last time statistics where outputted
Definition: manager.hpp:114
void RunTask(const std::chrono::steady_clock::time_point &tp) final
method called by ProfileThread.
Definition: group.cpp:84
virtual void PrefixSumPlusLong(long &value, const long &initial)
Definition: group.cpp:229
JsonLine is an object used to aggregate a set of key:value pairs for output into a JSON log...
std::atomic< size_t > rx_bytes_
received bytes
Definition: connection.hpp:612