18 #ifndef THRILL_NET_COLLECTIVE_HEADER 19 #define THRILL_NET_COLLECTIVE_HEADER 51 template <
typename T,
typename BinarySumOp>
53 const T& initial,
bool inclusive) {
54 static constexpr
bool debug =
false;
59 value = sum_op(initial, value);
67 for (
size_t d = 1; d <
num_hosts(); d <<= 1) {
82 to_forward = sum_op(recv_value, to_forward);
84 if (!first || inclusive) {
85 value = sum_op(recv_value, value);
112 template <
typename T,
typename BinarySumOp>
116 static constexpr
bool debug =
false;
118 for (
size_t d = 1; d <
num_hosts(); d <<= 1)
127 <<
": sending to peer" << peer;
137 total_sum = sum_op(recv_data, total_sum);
139 total_sum = sum_op(total_sum, recv_data);
143 value = sum_op(recv_data, value);
145 <<
": received from peer" << peer;
153 template <
typename T,
typename BinarySumOp>
155 const T& initial,
bool inclusive) {
159 template <
typename T,
typename BinarySumOp>
164 template <
typename T,
typename BinarySumOp>
180 template <
typename T>
185 for (
size_t p = 0; p <
num_hosts(); ++p) {
204 template <
typename T>
206 static constexpr
bool debug =
false;
210 size_t my_rank = (
my_host_rank() + num_hosts - origin) % num_hosts;
218 size_t from = ((my_rank ^ d) + origin) %
num_hosts;
219 sLOG <<
"Broadcast: rank" << my_rank <<
"receiving from" << from
227 for (d >>= 1; d > 0; d >>= 1, ++r) {
228 if (my_rank + d < num_hosts) {
229 size_t to = (my_rank + d + origin) % num_hosts;
230 sLOG <<
"Broadcast: rank" << my_rank <<
"round" << r <<
"sending to" 238 template <
typename T>
251 template <
typename T>
259 template <
typename T>
265 for (
size_t j = 0; j < d; ++j) {
266 size_t peer = my_rank ^ (0x1 << j);
268 size_t snd_pos = (~((0x1 << j) - 1) & my_rank) * n;
270 size_t rcv_pos = (~((0x1 << j) - 1) & peer) * n;
272 size_t ins_n = (0x1 << j) * n;
278 template <
typename T>
282 size_t size = num_hosts * n;
283 std::vector<T> temp(size);
285 for (
size_t i = 0; i < n; ++i) {
289 for (
size_t j = 0; (0x1U << j) < num_hosts; ++j) {
290 size_t snd_peer = (my_rank + num_hosts - (0x1 << j)) %
num_hosts;
291 size_t rcv_peer = (my_rank + (0x1 << j)) %
num_hosts;
293 size_t ins_pos = (0x1 << j) * n;
295 size_t ins_n =
std::min((0x1 << j) * n, size - ins_pos);
297 if ((0x1 << j) & my_rank) {
308 for (
size_t i = 0; i < size; ++i) {
309 values[i] = temp[(i + size - my_rank * n) % size];
330 template <
typename T,
typename BinarySumOp>
332 static constexpr
bool debug =
false;
335 const size_t shifted_rank = (my_rank - root) % num_hosts;
338 for (
size_t d = 1; d <
num_hosts; d <<= 1) {
339 if (shifted_rank & d) {
341 <<
"->" << (my_rank - d) % num_hosts <<
"/" 342 << shifted_rank <<
"->" << shifted_rank - d;
343 SendTo((my_rank - d) % num_hosts, value);
346 else if (shifted_rank + d < num_hosts) {
348 <<
"<-" << (my_rank + d) % num_hosts <<
"/" 349 << shifted_rank <<
"<-" << shifted_rank + d;
351 ReceiveFrom((my_rank + d) % num_hosts, &recv_data);
352 value = sum_op(value, recv_data);
367 template <
typename T,
typename BinarySumOp>
381 template <
typename T,
typename BinarySumOp>
386 for (
size_t p = 1; p <
num_hosts(); ++p) {
389 value = sum_op(value, recv_value);
392 for (
size_t p = 1; p <
num_hosts(); ++p) {
413 template <
typename T,
typename BinarySumOp>
420 for (
size_t d = 1; d <
num_hosts(); d <<= 1) {
434 value = sum_op(recv_data, value);
438 value = sum_op(value, recv_data);
458 template <
typename T,
typename BinarySumOp>
464 template <
typename T,
typename BinarySumOp>
469 return sum_op(recv_data, value);
473 return sum_op(value, recv_data);
478 template <
typename T,
typename BinarySumOp>
480 size_t host_id,
size_t group_size,
size_t remaining_hosts,
481 size_t send_to,
T&
value, BinarySumOp sum_op) {
488 size_t group_count = remaining_hosts / group_size;
489 if (group_count % 2 == 0) {
491 size_t peer = host_id ^ group_size;
492 if (peer < remaining_hosts) {
498 size_t host_group = host_id / group_size;
499 if (host_group >= group_count - 3) {
501 if (host_group == group_count - 1) {
502 size_t peer = (host_id ^ group_size) - 2 * group_size;
506 else if (host_group == group_count - 2) {
507 size_t peer = (host_id ^ group_size) + 2 * group_size;
512 value = sum_op(recv_data, value);
514 value = sum_op(value, recv_data);
519 peer = host_id ^ group_size;
522 else if (host_group == group_count - 3) {
523 size_t peer = host_id ^ group_size;
529 size_t peer = host_id ^ group_size;
530 if (peer < remaining_hosts) {
534 remaining_hosts -= group_size;
539 if (group_size < remaining_hosts) {
541 host_id, group_size, remaining_hosts, send_to,
544 else if (send_to != 0) {
550 template <
typename T,
typename BinarySumOp>
567 template <
typename T,
typename BinarySumOp>
577 #endif // !THRILL_NET_COLLECTIVE_HEADER #define sLOG
Default logging method: output if the local debug variable is true.
void BroadcastSelect(T &value, size_t origin=0)
Select the broadcast implementation (often based on the total number of processors).
std::enable_if< std::is_pod< T >::value, void >::type ReceiveSend(const T &value, T *out_value)
static unsigned ffs(int i)
find first set bit in integer, or zero if none are set.
virtual Connection & connection(size_t id)=0
Return Connection to client id.
void ExPrefixSum(T &value, BinarySumOp sum_op=BinarySumOp(), const T &initial=T())
Calculate exclusive prefix sum.
void AllReduceHypercube(T &value, BinarySumOp sum_op=BinarySumOp())
Perform an All-Reduce for a number of workers that is a power of two.
void BroadcastBinomialTree(T &value, size_t origin=0)
Broadcasts the value of the worker with index "origin" to all the others.
void AllReduce(T &value, BinarySumOp sum_op=BinarySumOp())
Reduce a value from all workers to all workers.
std::enable_if< std::is_pod< T >::value, void >::type SendN(const T *value, size_t n)
void SendTo(size_t dest, const T &data)
Sends a serializable type to the given peer.
void AllReduceElimination(T &value, BinarySumOp sum_op=BinarySumOp())
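Perform an All-Reduce using the elimination protocol described in R. Rabenseifner and J. L. Träff, "More Efficient Reduction Algorithms for Non-Power-of-Two Number of Processors in Message-Passing Parallel Systems" (EuroPVM/MPI 2004, LNCS 3241).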
void PrefixSumHypercube(T &value, BinarySumOp sum_op=BinarySumOp())
Calculate the prefix sum for every worker.
void Broadcast(T &value, size_t origin=0)
Broadcast a value from the worker "origin".
static constexpr bool debug
void BroadcastTrivial(T &value, size_t origin=0)
Broadcasts the value of the peer with index origin (default 0) to all the others.
void AllGatherBruck(T *values, size_t n)
void AllReduceAtRoot(T &value, BinarySumOp sum_op=BinarySumOp())
Perform an All-Reduce by reducing all values at the worker with index 0 and sending the result back to all the others.
void AllGatherRecursiveDoublingPowerOfTwo(T *values, size_t n)
static uint_pair min()
Return a uint_pair instance containing the smallest value possible.
static unsigned integer_log2_ceil(int i)
Calculate the log2 ceiling of an integer type.
void Reduce(T &value, size_t root=0, BinarySumOp sum_op=BinarySumOp())
Reduce a value from all workers to the worker with index root (default 0).
void AllReduceSimple(T &value, BinarySumOp sum_op=BinarySumOp())
Perform an All-Reduce on the workers.
void AllReduceEliminationProcess(size_t host_id, size_t group_size, size_t remaining_hosts, size_t send_to, T &value, BinarySumOp sum_op)
Helper method for AllReduce().
void PrefixSum(T &value, BinarySumOp sum_op=BinarySumOp(), const T &initial=T())
Calculate inclusive prefix sum.
static int round_up_to_power_of_two(int i)
Round up to the next power of two.
std::enable_if< std::is_pod< T >::value, void >::type SendReceive(const T *value, T *out_value, size_t n=1)
SendReceive any serializable POD item T.
void ReceiveFrom(size_t src, T *data)
Receives a serializable type from the given peer.
void PrefixSumSelect(T &value, BinarySumOp sum_op=BinarySumOp(), const T &initial=T(), bool inclusive=true)
Select the prefix-sum implementation (often based on the total number of processors).
size_t my_host_rank() const
Return our rank among hosts in this group.
std::enable_if< std::is_pod< T >::value, void >::type ReceiveN(T *out_value, size_t n)
Receive an array of serializable POD items T.
virtual size_t num_hosts() const =0
Return the number of connections in this group (= the number of computing hosts).
void AllReduceSelect(T &value, BinarySumOp sum_op=BinarySumOp())
Select the All-Reduce implementation (often based on the total number of processors).
void PrefixSumDoubling(T &value, BinarySumOp sum_op=BinarySumOp(), const T &initial=T(), bool inclusive=true)
Calculate the prefix sum for every worker.
T SendReceiveReduce(size_t peer, const T &value, BinarySumOp sum_op)
Helper method for AllReduce().