16 #if THRILL_HAVE_NET_TCP 29 const std::function<
void(
Group*)>& thread_function) {
30 #if THRILL_HAVE_NET_TCP 44 return os << t.
total();
52 : groups_(std::move(groups)), logger_(logger) { }
57 assert(groups.size() == kGroupCount);
58 std::move(groups.begin(), groups.end(), groups_.begin());
68 size_t total_tx = 0, total_rx = 0;
73 for (
size_t h = 0; h < group.
num_hosts(); ++h) {
87 line <<
"class" <<
"NetManager" 88 <<
"event" <<
"profile";
90 double elapsed =
static_cast<double>(
91 std::chrono::duration_cast<std::chrono::microseconds>(
94 size_t total_tx = 0, total_rx = 0;
95 size_t prev_total_tx = 0, prev_total_rx = 0;
96 size_t total_tx_active = 0, total_rx_active = 0;
101 size_t group_tx = 0, group_rx = 0;
102 size_t prev_group_tx = 0, prev_group_rx = 0;
103 size_t group_tx_active = 0, group_rx_active = 0;
104 std::vector<size_t> tx_per_host(group.
num_hosts());
105 std::vector<size_t> rx_per_host(group.
num_hosts());
107 for (
size_t h = 0; h < group.
num_hosts(); ++h) {
112 size_t tx = conn.
tx_bytes_.load(std::memory_order_relaxed);
113 size_t rx = conn.
rx_bytes_.load(std::memory_order_relaxed);
118 prev_group_tx += prev_tx;
123 prev_group_rx += prev_rx;
131 line.
sub(g == 0 ?
"flow" : g == 1 ?
"data" :
"???")
132 <<
"tx_bytes" << group_tx
133 <<
"rx_bytes" << group_rx
135 <<
static_cast<double>(group_tx - prev_group_tx) / elapsed
137 << static_cast<double>(group_rx - prev_group_rx) / elapsed
138 <<
"tx_per_host" << tx_per_host
139 <<
"rx_per_host" << rx_per_host;
141 total_tx += group_tx;
142 total_rx += group_rx;
143 prev_total_tx += prev_group_tx;
144 prev_total_rx += prev_group_rx;
145 total_tx_active += group_tx_active;
146 total_rx_active += group_rx_active;
153 <<
"tx_bytes" << total_tx
154 <<
"rx_bytes" << total_rx
156 <<
static_cast<double>(total_tx - prev_total_tx) / elapsed
158 << static_cast<double>(total_rx - prev_total_rx) / elapsed
159 <<
"tx_active" << total_tx_active
160 <<
"rx_active" << total_rx_active;
202 return PrefixSumSelect(value, std::plus<int>(), initial,
true);
205 return PrefixSumSelect(value, std::plus<int>(), initial,
false);
208 return BroadcastSelect(value, origin);
211 return AllReduceSelect(value, std::plus<int>());
220 return PrefixSumSelect(value, std::plus<unsigned int>(), initial,
true);
223 return PrefixSumSelect(value, std::plus<unsigned int>(), initial,
false);
226 return BroadcastSelect(value, origin);
229 return AllReduceSelect(value, std::plus<unsigned int>());
238 return PrefixSumSelect(value, std::plus<long>(), initial,
true);
241 return PrefixSumSelect(value, std::plus<long>(), initial,
false);
244 return BroadcastSelect(value, origin);
247 return AllReduceSelect(value, std::plus<long>());
256 return PrefixSumSelect(value, std::plus<unsigned long>(), initial,
true);
259 return PrefixSumSelect(value, std::plus<unsigned long>(), initial,
false);
262 return BroadcastSelect(value, origin);
265 return AllReduceSelect(value, std::plus<unsigned long>());
274 return PrefixSumSelect(value, std::plus<long long>(), initial,
true);
277 return PrefixSumSelect(value, std::plus<long long>(), initial,
false);
280 return BroadcastSelect(value, origin);
283 return AllReduceSelect(value, std::plus<long long>());
292 return PrefixSumSelect(value, std::plus<unsigned long long>(), initial,
true);
295 return PrefixSumSelect(value, std::plus<unsigned long long>(), initial,
false);
298 return BroadcastSelect(value, origin);
301 return AllReduceSelect(value, std::plus<unsigned long long>());
virtual void AllReducePlusLongLong(long long &value)
virtual void BroadcastLong(long &value, size_t origin)
virtual void AllReducePlusUnsignedInt(unsigned int &value)
virtual void PrefixSumPlusLongLong(long long &value, const long long &initial)
virtual void AllReduceMinimumUnsignedInt(unsigned int &value)
Manager(const Manager &)=delete
non-copyable: delete copy-constructor
virtual void ExPrefixSumPlusLongLong(long long &value, const long long &initial)
virtual size_t num_parallel_async() const
common::JsonLogger & logger_
JsonLogger for statistics output.
virtual void ExPrefixSumPlusInt(int &value, const int &initial)
std::atomic< size_t > tx_bytes_
sent bytes
size_t prev_tx_bytes_
previous read of sent bytes
virtual void ExPrefixSumPlusUnsignedLongLong(unsigned long long &value, const unsigned long long &initial)
virtual void AllReducePlusInt(int &value)
virtual void BroadcastInt(int &value, size_t origin)
void ExecuteGroupThreads(const std::vector< std::unique_ptr< Group > > &groups, const std::function< void(GroupCalled *)> &thread_function)
virtual Connection & connection(size_t id)=0
Return Connection to client id.
virtual void AllReduceMinimumInt(int &value)
JsonLine line()
create new JsonLine instance which will be written to this logger.
virtual void BroadcastUnsignedLong(unsigned long &value, size_t origin)
virtual void ExPrefixSumPlusLong(long &value, const long &initial)
virtual void PrefixSumPlusUnsignedLong(unsigned long &value, const unsigned long &initial)
virtual void PrefixSumPlusUnsignedLongLong(unsigned long long &value, const unsigned long long &initial)
virtual void AllReduceMinimumLong(long &value)
size_t total() const
both transmitted and received bytes
virtual void AllReduceMaximumUnsignedLong(unsigned long &value)
std::ostream & operator<<(std::ostream &os, const Traffic &t)
virtual void AllReducePlusUnsignedLongLong(unsigned long long &value)
virtual void AllReduceMaximumLongLong(long long &value)
virtual void AllReduceMinimumUnsignedLong(unsigned long &value)
A Connection represents a link to another peer in a network group.
virtual void BroadcastUnsignedLongLong(unsigned long long &value, size_t origin)
virtual void ExPrefixSumPlusUnsignedInt(unsigned int &value, const unsigned int &initial)
static std::vector< std::unique_ptr< Group > > ConstructLoopbackMesh(size_t num_hosts)
Construct a mock network with num_hosts peers and deliver Group contexts for each of them...
virtual void AllReduceMaximumUnsignedLongLong(unsigned long long &value)
static std::vector< std::unique_ptr< Group > > ConstructLoopbackMesh(size_t num_hosts)
Construct a test network with an underlying full mesh of local loopback stream sockets for testing...
virtual void ExPrefixSumPlusUnsignedLong(unsigned long &value, const unsigned long &initial)
net::Traffic Traffic() const
calculate overall traffic for final stats
void RunLoopbackGroupTest(size_t num_hosts, const std::function< void(Group *)> &thread_function)
virtual void BroadcastUnsignedInt(unsigned int &value, size_t origin)
virtual void AllReduceMaximumUnsignedInt(unsigned int &value)
A network Group is a collection of enumerated communication links, which provides point-to-point comm...
virtual void AllReduceMaximumLong(long &value)
JsonLine sub(const Key &key)
return a JsonLine as a sub-dictionary of this one
size_t prev_rx_bytes_
previous read of received bytes
static constexpr size_t kGroupCount
The count of net::Groups to initialize.
virtual void AllReduceMaximumInt(int &value)
virtual void AllReduceMinimumUnsignedLongLong(unsigned long long &value)
JsonLogger is a receiver of JSON output objects for logging.
std::atomic< size_t > tx_active_
active send requests
virtual void PrefixSumPlusUnsignedInt(unsigned int &value, const unsigned int &initial)
size_t my_host_rank() const
Return our rank among hosts in this group.
virtual void AllReduceMinimumLongLong(long long &value)
virtual void PrefixSumPlusInt(int &value, const int &initial)
virtual void AllReducePlusLong(long &value)
std::array< GroupPtr, kGroupCount > groups_
The Groups initialized and managed by this Manager.
virtual size_t num_hosts() const =0
Return number of connections in this group (= number of computing hosts)
virtual void AllReducePlusUnsignedLong(unsigned long &value)
virtual void BroadcastLongLong(long long &value, size_t origin)
std::chrono::steady_clock::time_point tp_last_
last time statistics were output
void RunTask(const std::chrono::steady_clock::time_point &tp) final
method called by ProfileThread.
virtual void PrefixSumPlusLong(long &value, const long &initial)
std::atomic< size_t > rx_active_
active recv requests
JsonLine is an object used to aggregate a set of key:value pairs for output into a JSON log...
std::atomic< size_t > rx_bytes_
received bytes