Thrill  0.1
flow_control_channel.cpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/net/flow_control_channel.cpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2016 Timo Bingmann <[email protected]>
7  *
8  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
9  ******************************************************************************/
10 
12 
13 #include <functional>
14 
15 namespace thrill {
16 namespace net {
17 
18 /******************************************************************************/
19 // FlowControlChannel
20 
22  Group& group, size_t local_id, size_t thread_count,
23  common::ThreadBarrier& barrier, LocalData* shmem,
24  std::atomic<size_t>& generation)
25  : group_(group),
26  host_rank_(group_.my_host_rank()), num_hosts_(group_.num_hosts()),
27  local_id_(local_id),
28  thread_count_(thread_count),
29  barrier_(barrier), shmem_(shmem), generation_(generation) { }
30 
33  << "FCC worker" << my_rank() << ":"
34  << "prefixsum"
36  << "broadcast"
38  << "reduce"
39  << count_reduce_ << "in" << timer_reduce_
40  << "allreduce"
42  << "predecessor"
44  << "barrier"
45  << count_barrier_ << "in" << timer_barrier_
46  << "communication"
48 }
49 
51  RunTimer run_timer(timer_barrier_);
53 
54  LOG << "FCC::Barrier() ENTER count=" << count_barrier_;
55 
56  barrier_.wait(
57  [&]() {
58  RunTimer net_timer(timer_communication_);
59 
60  LOG << "FCC::Barrier() COMMUNICATE BEGIN"
61  << " count=" << count_barrier_;
62 
63  // Global all reduce
64  size_t i = 0;
65  group_.AllReduce(i);
66 
67  LOG << "FCC::Barrier() COMMUNICATE END"
68  << " count=" << count_barrier_;
69  });
70 
71  LOG << "FCC::Barrier() EXIT count=" << count_barrier_;
72 }
73 
75  barrier_.wait();
76 }
77 
78 /******************************************************************************/
79 // template instantiations
80 
81 template size_t FlowControlChannel::PrefixSumBase(
82  const size_t&, const std::plus<size_t>&, const size_t&, bool);
83 
84 template std::array<size_t, 2> FlowControlChannel::PrefixSumBase(
85  const std::array<size_t, 2>&,
86  const common::ComponentSum<std::array<size_t, 2> >&,
87  const std::array<size_t, 2>&, bool);
88 template std::array<size_t, 3> FlowControlChannel::PrefixSumBase(
89  const std::array<size_t, 3>&,
90  const common::ComponentSum<std::array<size_t, 3> >&,
91  const std::array<size_t, 3>&, bool);
92 template std::array<size_t, 4> FlowControlChannel::PrefixSumBase(
93  const std::array<size_t, 4>&,
94  const common::ComponentSum<std::array<size_t, 4> >&,
95  const std::array<size_t, 4>&, bool);
96 
98  size_t&, const std::plus<size_t>&, const size_t&);
99 
100 template std::array<size_t, 2> FlowControlChannel::ExPrefixSumTotal(
101  std::array<size_t, 2>&,
102  const common::ComponentSum<std::array<size_t, 2> >&,
103  const std::array<size_t, 2>&);
104 template std::array<size_t, 3> FlowControlChannel::ExPrefixSumTotal(
105  std::array<size_t, 3>&,
106  const common::ComponentSum<std::array<size_t, 3> >&,
107  const std::array<size_t, 3>&);
108 template std::array<size_t, 4> FlowControlChannel::ExPrefixSumTotal(
109  std::array<size_t, 4>&,
110  const common::ComponentSum<std::array<size_t, 4> >&,
111  const std::array<size_t, 4>&);
112 
113 template size_t FlowControlChannel::Broadcast(const size_t&, size_t);
114 
115 template std::array<size_t, 2> FlowControlChannel::Broadcast(
116  const std::array<size_t, 2>&, size_t);
117 template std::array<size_t, 3> FlowControlChannel::Broadcast(
118  const std::array<size_t, 3>&, size_t);
119 template std::array<size_t, 4> FlowControlChannel::Broadcast(
120  const std::array<size_t, 4>&, size_t);
121 
122 template size_t FlowControlChannel::AllReduce(
123  const size_t&, const std::plus<size_t>&);
124 
125 } // namespace net
126 } // namespace thrill
127 
128 /******************************************************************************/
common::AtomicMovable< size_t > count_broadcast_
void Barrier()
A trivial global barrier.
Timer timer_prefixsum_
Synchronization timer.
#define sLOGC(cond)
Explicitly specify the condition for logging.
Definition: logger.hpp:31
common::AtomicMovable< size_t > count_barrier_
RIAA class for running the timer until destruction.
common::AtomicMovable< size_t > count_allreduce_
common::AtomicMovable< size_t > count_prefixsum_
Synchronization counters.
T TLX_ATTRIBUTE_WARN_UNUSED_RESULT AllReduce(const T &value, const BinarySumOp &sum_op=BinarySumOp())
Reduces a value of a serializable type T over all workers given a certain reduce function.
T TLX_ATTRIBUTE_WARN_UNUSED_RESULT PrefixSumBase(const T &value, const BinarySumOp &sum_op=BinarySumOp(), const T &initial=T(), bool inclusive=true)
Calculates the prefix sum over all workers, given a certain sum operation.
template for computing the component-wise sum of std::array or std::vector.
Definition: functional.hpp:94
void AllReduce(T &value, BinarySumOp sum_op=BinarySumOp())
Reduce a value from all workers to all workers.
Definition: collective.hpp:568
Group & group_
The group associated with this channel.
common::AtomicMovable< size_t > count_predecessor_
void LocalBarrier()
A trivial local thread barrier.
T TLX_ATTRIBUTE_WARN_UNUSED_RESULT ExPrefixSumTotal(T &value, const BinarySumOp &sum_op=BinarySumOp(), const T &initial=T())
Calculates the exclusive prefix sum over all workers, and delivers the total sum as well...
A network Group is a collection of enumerated communication links, which provides point-to-point comm...
Definition: group.hpp:47
void wait(Lambda lambda=Lambda())
Waits for n threads to arrive.
size_t my_rank() const
Return the worker&#39;s global rank.
T TLX_ATTRIBUTE_WARN_UNUSED_RESULT Broadcast(const T &value, size_t origin=0)
Broadcasts a value of a serializable type T from the master (the worker with id 0) to all other worke...
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:24
common::AtomicMovable< size_t > count_reduce_
Implements a thread barrier using atomics and a spin lock that can be used to synchronize threads...
FlowControlChannel(Group &group, size_t local_id, size_t thread_count, common::ThreadBarrier &barrier, LocalData *shmem, std::atomic< size_t > &generation)
Creates a new instance of this class, wrapping a net::Group.