Thrill  0.1
group.cpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/net/group.cpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Timo Bingmann <[email protected]>
7  *
8  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
9  ******************************************************************************/
10 
12 #include <thrill/net/group.hpp>
13 #include <thrill/net/manager.hpp>
15 
16 #if THRILL_HAVE_NET_TCP
17 #include <thrill/net/tcp/group.hpp>
18 #endif
19 
20 #include <functional>
21 #include <utility>
22 #include <vector>
23 
24 namespace thrill {
25 namespace net {
26 
28  size_t num_hosts,
29  const std::function<void(Group*)>& thread_function) {
30 #if THRILL_HAVE_NET_TCP
31  // construct local tcp network mesh and run threads
34  thread_function);
35 #else
36  // construct mock network mesh and run threads
39  thread_function);
40 #endif
41 }
42 
43 std::ostream& operator << (std::ostream& os, const Traffic& t) {
44  return os << t.total();
45 }
46 
47 /******************************************************************************/
48 // Manager
49 
50 Manager::Manager(std::array<GroupPtr, kGroupCount>&& groups,
51  common::JsonLogger& logger) noexcept
52  : groups_(std::move(groups)), logger_(logger) { }
53 
54 Manager::Manager(std::vector<GroupPtr>&& groups,
55  common::JsonLogger& logger) noexcept
56  : logger_(logger) {
57  assert(groups.size() == kGroupCount);
58  std::move(groups.begin(), groups.end(), groups_.begin());
59 }
60 
62  for (size_t i = 0; i < kGroupCount; i++) {
63  groups_[i]->Close();
64  }
65 }
66 
68  size_t total_tx = 0, total_rx = 0;
69 
70  for (size_t g = 0; g < kGroupCount; ++g) {
71  Group& group = *groups_[g];
72 
73  for (size_t h = 0; h < group.num_hosts(); ++h) {
74  if (h == group.my_host_rank()) continue;
75 
76  total_tx += group.connection(h).tx_bytes_;
77  total_rx += group.connection(h).rx_bytes_;
78  }
79  }
80 
81  return net::Traffic(total_tx, total_rx);
82 }
83 
84 void Manager::RunTask(const std::chrono::steady_clock::time_point& tp) {
85 
86  common::JsonLine line = logger_.line();
87  line << "class" << "NetManager"
88  << "event" << "profile";
89 
90  double elapsed = static_cast<double>(
91  std::chrono::duration_cast<std::chrono::microseconds>(
92  tp - tp_last_).count()) / 1e6;
93 
94  size_t total_tx = 0, total_rx = 0;
95  size_t prev_total_tx = 0, prev_total_rx = 0;
96  size_t total_tx_active = 0, total_rx_active = 0;
97 
98  for (size_t g = 0; g < kGroupCount; ++g) {
99  Group& group = *groups_[g];
100 
101  size_t group_tx = 0, group_rx = 0;
102  size_t prev_group_tx = 0, prev_group_rx = 0;
103  size_t group_tx_active = 0, group_rx_active = 0;
104  std::vector<size_t> tx_per_host(group.num_hosts());
105  std::vector<size_t> rx_per_host(group.num_hosts());
106 
107  for (size_t h = 0; h < group.num_hosts(); ++h) {
108  if (h == group.my_host_rank()) continue;
109 
110  Connection& conn = group.connection(h);
111 
112  size_t tx = conn.tx_bytes_.load(std::memory_order_relaxed);
113  size_t rx = conn.rx_bytes_.load(std::memory_order_relaxed);
114  size_t prev_tx = conn.prev_tx_bytes_;
115  size_t prev_rx = conn.prev_rx_bytes_;
116 
117  group_tx += tx;
118  prev_group_tx += prev_tx;
119  group.connection(h).prev_tx_bytes_ = tx;
120  group_tx_active += conn.tx_active_;
121 
122  group_rx += rx;
123  prev_group_rx += prev_rx;
124  group.connection(h).prev_rx_bytes_ = rx;
125  group_rx_active += conn.rx_active_;
126 
127  tx_per_host[h] = tx;
128  rx_per_host[h] = rx;
129  }
130 
131  line.sub(g == 0 ? "flow" : g == 1 ? "data" : "???")
132  << "tx_bytes" << group_tx
133  << "rx_bytes" << group_rx
134  << "tx_speed"
135  << static_cast<double>(group_tx - prev_group_tx) / elapsed
136  << "rx_speed"
137  << static_cast<double>(group_rx - prev_group_rx) / elapsed
138  << "tx_per_host" << tx_per_host
139  << "rx_per_host" << rx_per_host;
140 
141  total_tx += group_tx;
142  total_rx += group_rx;
143  prev_total_tx += prev_group_tx;
144  prev_total_rx += prev_group_rx;
145  total_tx_active += group_tx_active;
146  total_rx_active += group_rx_active;
147 
148  tp_last_ = tp;
149  }
150 
151  // write out totals
152  line
153  << "tx_bytes" << total_tx
154  << "rx_bytes" << total_rx
155  << "tx_speed"
156  << static_cast<double>(total_tx - prev_total_tx) / elapsed
157  << "rx_speed"
158  << static_cast<double>(total_rx - prev_total_rx) / elapsed
159  << "tx_active" << total_tx_active
160  << "rx_active" << total_rx_active;
161 }
162 
163 /******************************************************************************/
164 // Group
165 
167  return 0;
168 }
169 
170 /*[[[perl
171  for my $e (
172  ["int", "Int"], ["unsigned int", "UnsignedInt"],
173  ["long", "Long"], ["unsigned long", "UnsignedLong"],
174  ["long long", "LongLong"], ["unsigned long long", "UnsignedLongLong"])
175  {
176  print "void Group::PrefixSumPlus$$e[1]($$e[0]& value, const $$e[0]& initial) {\n";
177  print " return PrefixSumSelect(value, std::plus<$$e[0]>(), initial, true);\n";
178  print "}\n";
179 
180  print "void Group::ExPrefixSumPlus$$e[1]($$e[0]& value, const $$e[0]& initial) {\n";
181  print " return PrefixSumSelect(value, std::plus<$$e[0]>(), initial, false);\n";
182  print "}\n";
183 
184  print "void Group::Broadcast$$e[1]($$e[0]& value, size_t origin) {\n";
185  print " return BroadcastSelect(value, origin);\n";
186  print "}\n";
187 
188  print "void Group::AllReducePlus$$e[1]($$e[0]& value) {\n";
189  print " return AllReduceSelect(value, std::plus<$$e[0]>());\n";
190  print "}\n";
191 
192  print "void Group::AllReduceMinimum$$e[1]($$e[0]& value) {\n";
193  print " return AllReduceSelect(value, common::minimum<$$e[0]>());\n";
194  print "}\n";
195 
196  print "void Group::AllReduceMaximum$$e[1]($$e[0]& value) {\n";
197  print " return AllReduceSelect(value, common::maximum<$$e[0]>());\n";
198  print "}\n";
199  }
200 ]]]*/
201 void Group::PrefixSumPlusInt(int& value, const int& initial) {
202  return PrefixSumSelect(value, std::plus<int>(), initial, true);
203 }
204 void Group::ExPrefixSumPlusInt(int& value, const int& initial) {
205  return PrefixSumSelect(value, std::plus<int>(), initial, false);
206 }
207 void Group::BroadcastInt(int& value, size_t origin) {
208  return BroadcastSelect(value, origin);
209 }
211  return AllReduceSelect(value, std::plus<int>());
212 }
214  return AllReduceSelect(value, common::minimum<int>());
215 }
217  return AllReduceSelect(value, common::maximum<int>());
218 }
219 void Group::PrefixSumPlusUnsignedInt(unsigned int& value, const unsigned int& initial) {
220  return PrefixSumSelect(value, std::plus<unsigned int>(), initial, true);
221 }
222 void Group::ExPrefixSumPlusUnsignedInt(unsigned int& value, const unsigned int& initial) {
223  return PrefixSumSelect(value, std::plus<unsigned int>(), initial, false);
224 }
225 void Group::BroadcastUnsignedInt(unsigned int& value, size_t origin) {
226  return BroadcastSelect(value, origin);
227 }
229  return AllReduceSelect(value, std::plus<unsigned int>());
230 }
232  return AllReduceSelect(value, common::minimum<unsigned int>());
233 }
235  return AllReduceSelect(value, common::maximum<unsigned int>());
236 }
237 void Group::PrefixSumPlusLong(long& value, const long& initial) {
238  return PrefixSumSelect(value, std::plus<long>(), initial, true);
239 }
240 void Group::ExPrefixSumPlusLong(long& value, const long& initial) {
241  return PrefixSumSelect(value, std::plus<long>(), initial, false);
242 }
243 void Group::BroadcastLong(long& value, size_t origin) {
244  return BroadcastSelect(value, origin);
245 }
247  return AllReduceSelect(value, std::plus<long>());
248 }
250  return AllReduceSelect(value, common::minimum<long>());
251 }
253  return AllReduceSelect(value, common::maximum<long>());
254 }
255 void Group::PrefixSumPlusUnsignedLong(unsigned long& value, const unsigned long& initial) {
256  return PrefixSumSelect(value, std::plus<unsigned long>(), initial, true);
257 }
258 void Group::ExPrefixSumPlusUnsignedLong(unsigned long& value, const unsigned long& initial) {
259  return PrefixSumSelect(value, std::plus<unsigned long>(), initial, false);
260 }
261 void Group::BroadcastUnsignedLong(unsigned long& value, size_t origin) {
262  return BroadcastSelect(value, origin);
263 }
265  return AllReduceSelect(value, std::plus<unsigned long>());
266 }
268  return AllReduceSelect(value, common::minimum<unsigned long>());
269 }
271  return AllReduceSelect(value, common::maximum<unsigned long>());
272 }
273 void Group::PrefixSumPlusLongLong(long long& value, const long long& initial) {
274  return PrefixSumSelect(value, std::plus<long long>(), initial, true);
275 }
276 void Group::ExPrefixSumPlusLongLong(long long& value, const long long& initial) {
277  return PrefixSumSelect(value, std::plus<long long>(), initial, false);
278 }
279 void Group::BroadcastLongLong(long long& value, size_t origin) {
280  return BroadcastSelect(value, origin);
281 }
283  return AllReduceSelect(value, std::plus<long long>());
284 }
286  return AllReduceSelect(value, common::minimum<long long>());
287 }
289  return AllReduceSelect(value, common::maximum<long long>());
290 }
291 void Group::PrefixSumPlusUnsignedLongLong(unsigned long long& value, const unsigned long long& initial) {
292  return PrefixSumSelect(value, std::plus<unsigned long long>(), initial, true);
293 }
294 void Group::ExPrefixSumPlusUnsignedLongLong(unsigned long long& value, const unsigned long long& initial) {
295  return PrefixSumSelect(value, std::plus<unsigned long long>(), initial, false);
296 }
297 void Group::BroadcastUnsignedLongLong(unsigned long long& value, size_t origin) {
298  return BroadcastSelect(value, origin);
299 }
300 void Group::AllReducePlusUnsignedLongLong(unsigned long long& value) {
301  return AllReduceSelect(value, std::plus<unsigned long long>());
302 }
304  return AllReduceSelect(value, common::minimum<unsigned long long>());
305 }
307  return AllReduceSelect(value, common::maximum<unsigned long long>());
308 }
309 // [[[end]]]
310 
311 } // namespace net
312 } // namespace thrill
313 
314 /******************************************************************************/
virtual void AllReducePlusLongLong(long long &value)
Definition: group.cpp:282
virtual void BroadcastLong(long &value, size_t origin)
Definition: group.cpp:243
virtual void AllReducePlusUnsignedInt(unsigned int &value)
Definition: group.cpp:228
virtual void PrefixSumPlusLongLong(long long &value, const long long &initial)
Definition: group.cpp:273
virtual void AllReduceMinimumUnsignedInt(unsigned int &value)
Definition: group.cpp:231
Manager(const Manager &)=delete
non-copyable: delete copy-constructor
virtual void ExPrefixSumPlusLongLong(long long &value, const long long &initial)
Definition: group.cpp:276
virtual size_t num_parallel_async() const
Definition: group.cpp:166
common::JsonLogger & logger_
JsonLogger for statistics output.
Definition: manager.hpp:111
virtual void ExPrefixSumPlusInt(int &value, const int &initial)
Definition: group.cpp:204
std::atomic< size_t > tx_bytes_
sent bytes
Definition: connection.hpp:609
size_t prev_tx_bytes_
previous read of sent bytes
Definition: connection.hpp:615
virtual void ExPrefixSumPlusUnsignedLongLong(unsigned long long &value, const unsigned long long &initial)
Definition: group.cpp:294
virtual void AllReducePlusInt(int &value)
Definition: group.cpp:210
virtual void BroadcastInt(int &value, size_t origin)
Definition: group.cpp:207
void ExecuteGroupThreads(const std::vector< std::unique_ptr< Group > > &groups, const std::function< void(GroupCalled *)> &thread_function)
Definition: group.hpp:299
virtual Connection & connection(size_t id)=0
Return Connection to client id.
virtual void AllReduceMinimumInt(int &value)
Definition: group.cpp:213
JsonLine line()
create new JsonLine instance which will be written to this logger.
Definition: json_logger.cpp:57
virtual void BroadcastUnsignedLong(unsigned long &value, size_t origin)
Definition: group.cpp:261
virtual void ExPrefixSumPlusLong(long &value, const long &initial)
Definition: group.cpp:240
virtual void PrefixSumPlusUnsignedLong(unsigned long &value, const unsigned long &initial)
Definition: group.cpp:255
virtual void PrefixSumPlusUnsignedLongLong(unsigned long long &value, const unsigned long long &initial)
Definition: group.cpp:291
virtual void AllReduceMinimumLong(long &value)
Definition: group.cpp:249
size_t total() const
both transmitted and received bytes
Definition: manager.hpp:39
virtual void AllReduceMaximumUnsignedLong(unsigned long &value)
Definition: group.cpp:270
std::ostream & operator<<(std::ostream &os, const Traffic &t)
Definition: group.cpp:43
virtual void AllReducePlusUnsignedLongLong(unsigned long long &value)
Definition: group.cpp:300
virtual void AllReduceMaximumLongLong(long long &value)
Definition: group.cpp:288
virtual void AllReduceMinimumUnsignedLong(unsigned long &value)
Definition: group.cpp:267
int value
Definition: gen_data.py:41
A Connection represents a link to another peer in a network group.
Definition: connection.hpp:49
virtual void BroadcastUnsignedLongLong(unsigned long long &value, size_t origin)
Definition: group.cpp:297
virtual void ExPrefixSumPlusUnsignedInt(unsigned int &value, const unsigned int &initial)
Definition: group.cpp:222
static std::vector< std::unique_ptr< Group > > ConstructLoopbackMesh(size_t num_hosts)
Construct a mock network with num_hosts peers and deliver Group contexts for each of them...
Definition: group.cpp:162
virtual void AllReduceMaximumUnsignedLongLong(unsigned long long &value)
Definition: group.cpp:306
static std::vector< std::unique_ptr< Group > > ConstructLoopbackMesh(size_t num_hosts)
Construct a test network with an underlying full mesh of local loopback stream sockets for testing...
Definition: group.cpp:36
virtual void ExPrefixSumPlusUnsignedLong(unsigned long &value, const unsigned long &initial)
Definition: group.cpp:258
net::Traffic Traffic() const
calculate overall traffic for final stats
Definition: group.cpp:67
void RunLoopbackGroupTest(size_t num_hosts, const std::function< void(Group *)> &thread_function)
Definition: group.cpp:27
virtual void BroadcastUnsignedInt(unsigned int &value, size_t origin)
Definition: group.cpp:225
virtual void AllReduceMaximumUnsignedInt(unsigned int &value)
Definition: group.cpp:234
A network Group is a collection of enumerated communication links, which provides point-to-point comm...
Definition: group.hpp:47
virtual void AllReduceMaximumLong(long &value)
Definition: group.cpp:252
JsonLine sub(const Key &key)
return JsonLine has sub-dictionary of this one
size_t prev_rx_bytes_
previous read of received bytes
Definition: connection.hpp:618
static constexpr size_t kGroupCount
The count of net::Groups to initialize.
Definition: manager.hpp:61
virtual void AllReduceMaximumInt(int &value)
Definition: group.cpp:216
virtual void AllReduceMinimumUnsignedLongLong(unsigned long long &value)
Definition: group.cpp:303
JsonLogger is a receiver of JSON output objects for logging.
Definition: json_logger.hpp:69
std::atomic< size_t > tx_active_
active send requests
Definition: connection.hpp:621
virtual void PrefixSumPlusUnsignedInt(unsigned int &value, const unsigned int &initial)
Definition: group.cpp:219
size_t my_host_rank() const
Return our rank among hosts in this group.
Definition: group.hpp:69
virtual void AllReduceMinimumLongLong(long long &value)
Definition: group.cpp:285
virtual void PrefixSumPlusInt(int &value, const int &initial)
Definition: group.cpp:201
virtual void AllReducePlusLong(long &value)
Definition: group.cpp:246
std::array< GroupPtr, kGroupCount > groups_
The Groups initialized and managed by this Manager.
Definition: manager.hpp:108
virtual size_t num_hosts() const =0
Return number of connections in this group (= number of computing hosts)
virtual void AllReducePlusUnsignedLong(unsigned long &value)
Definition: group.cpp:264
virtual void BroadcastLongLong(long long &value, size_t origin)
Definition: group.cpp:279
std::chrono::steady_clock::time_point tp_last_
last time statistics were output
Definition: manager.hpp:114
void RunTask(const std::chrono::steady_clock::time_point &tp) final
method called by ProfileThread.
Definition: group.cpp:84
virtual void PrefixSumPlusLong(long &value, const long &initial)
Definition: group.cpp:237
std::atomic< size_t > rx_active_
active recv requests
Definition: connection.hpp:624
JsonLine is an object used to aggregate a set of key:value pairs for output into a JSON log...
std::atomic< size_t > rx_bytes_
received bytes
Definition: connection.hpp:612