18 #ifndef THRILL_NET_GROUP_HEADER 19 #define THRILL_NET_GROUP_HEADER 78 virtual void Close() = 0;
111 template <
typename T>
122 template <
typename T>
133 template <
typename T,
typename BinarySumOp = std::plus<T> >
135 const T& initial =
T());
138 template <
typename T,
typename BinarySumOp = std::plus<T> >
139 void ExPrefixSum(
T& value, BinarySumOp sum_op = BinarySumOp(),
140 const T& initial =
T());
143 template <
typename T>
147 template <
typename T,
typename BinarySumOp = std::plus<T> >
148 void Reduce(
T& value,
size_t root = 0, BinarySumOp sum_op = BinarySumOp());
151 template <
typename T,
typename BinarySumOp = std::plus<T> >
152 void AllReduce(
T& value, BinarySumOp sum_op = BinarySumOp());
160 template <
typename T,
typename BinarySumOp = std::plus<T> >
162 const T& initial =
T(),
bool inclusive =
true);
164 template <
typename T,
typename BinarySumOp = std::plus<T> >
166 const T& initial =
T(),
bool inclusive =
true);
168 template <
typename T,
typename BinarySumOp = std::plus<T> >
173 template <
typename T>
176 template <
typename T>
179 template <
typename T>
184 template <
typename T>
187 template <
typename T>
192 template <
typename T,
typename BinarySumOp = std::plus<T> >
195 template <
typename T,
typename BinarySumOp = std::plus<T> >
198 template <
typename T,
typename BinarySumOp = std::plus<T> >
201 template <
typename T,
typename BinarySumOp = std::plus<T> >
204 template <
typename T,
typename BinarySumOp = std::plus<T> >
219 template <
typename T,
typename BinarySumOp>
223 template <
typename T,
typename BinarySumOp>
225 size_t host_id,
size_t group_size,
size_t remaining_hosts,
226 size_t send_to,
T& value, BinarySumOp sum_op);
298 template <
typename Group,
typename GroupCalled>
300 const std::vector<std::unique_ptr<Group> >& groups,
301 const std::function<
void(GroupCalled*)>& thread_function) {
305 std::vector<std::thread> threads(num_hosts);
308 threads[i] = std::thread(
309 [thread_function, g = groups[i].
get()]() {
310 return thread_function(g);
328 const std::function<
void(
Group*)>& thread_function);
491 inline void Group::PrefixSum(
unsigned long long&
value, std::plus<unsigned long long>,
const unsigned long long& initial) {
523 #endif // !THRILL_NET_GROUP_HEADER virtual void AllReducePlusLongLong(long long &value)
virtual void BroadcastLong(long &value, size_t origin)
virtual void AllReducePlusUnsignedInt(unsigned int &value)
virtual void PrefixSumPlusLongLong(long long &value, const long long &initial)
Group(size_t my_rank)
initializing constructor
virtual void AllReduceMinimumUnsignedInt(unsigned int &value)
virtual void ExPrefixSumPlusLongLong(long long &value, const long long &initial)
void BroadcastSelect(T &value, size_t origin=0)
select broadcast implementation (often due to total number of processors)
virtual size_t num_parallel_async() const
size_t OneFactorPeer(size_t round) const
size_t OneFactorSize() const
Number of 1-factor iterations.
virtual void ExPrefixSumPlusInt(int &value, const int &initial)
Group & operator=(const Group &)=delete
non-copyable: delete assignment operator
virtual void ExPrefixSumPlusUnsignedLongLong(unsigned long long &value, const unsigned long long &initial)
size_t my_rank_
our rank in the network group
virtual void AllReducePlusInt(int &value)
virtual void BroadcastInt(int &value, size_t origin)
virtual std::unique_ptr< class Dispatcher > ConstructDispatcher() const =0
static size_t CalcOneFactorPeer(size_t r, size_t p, size_t n)
Calculate a Perfect Matching (1-Factor) on a Complete Graph.
void ExecuteGroupThreads(const std::vector< std::unique_ptr< Group > > &groups, const std::function< void(GroupCalled *)> &thread_function)
virtual Connection & connection(size_t id)=0
Return Connection to client id.
void ExPrefixSum(T &value, BinarySumOp sum_op=BinarySumOp(), const T &initial=T())
Calculate exclusive prefix sum.
virtual void AllReduceMinimumInt(int &value)
void AllReduceHypercube(T &value, BinarySumOp sum_op=BinarySumOp())
Perform an All-Reduce for powers of two.
std::enable_if< std::is_pod< T >::value, void >::type Send(const T &value)
void BroadcastBinomialTree(T &value, size_t origin=0)
Broadcasts the value of the worker with index "origin" to all the others.
virtual void BroadcastUnsignedLong(unsigned long &value, size_t origin)
virtual void ExPrefixSumPlusLong(long &value, const long &initial)
void AllReduce(T &value, BinarySumOp sum_op=BinarySumOp())
Reduce a value from all workers to all workers.
virtual void PrefixSumPlusUnsignedLong(unsigned long &value, const unsigned long &initial)
virtual void PrefixSumPlusUnsignedLongLong(unsigned long long &value, const unsigned long long &initial)
virtual void AllReduceMinimumLong(long &value)
virtual void AllReduceMaximumUnsignedLong(unsigned long &value)
virtual void Close()=0
Close.
virtual void AllReducePlusUnsignedLongLong(unsigned long long &value)
virtual void AllReduceMaximumLongLong(long long &value)
virtual void AllReduceMinimumUnsignedLong(unsigned long &value)
A Connection represents a link to another peer in a network group.
void SendTo(size_t dest, const T &data)
Sends a serializable type to the given peer.
virtual void BroadcastUnsignedLongLong(unsigned long long &value, size_t origin)
void AllReduceElimination(T &value, BinarySumOp sum_op=BinarySumOp())
Perform an All-Reduce using the elimination protocol described in R.
void PrefixSumHypercube(T &value, BinarySumOp sum_op=BinarySumOp())
Calculate the prefix sum for every worker.
virtual void ExPrefixSumPlusUnsignedInt(unsigned int &value, const unsigned int &initial)
void Broadcast(T &value, size_t origin=0)
Broadcast a value from the worker "origin".
virtual void AllReduceMaximumUnsignedLongLong(unsigned long long &value)
std::vector< T, Allocator< T > > vector
vector with Manager tracking
void BroadcastTrivial(T &value, size_t origin=0)
Broadcasts the value of the peer with index "origin" to all the others.
virtual void ExPrefixSumPlusUnsignedLong(unsigned long &value, const unsigned long &initial)
std::unique_ptr< Group > GroupPtr
unique pointer to a Group.
void AllGatherBruck(T *values, size_t n)
void RunLoopbackGroupTest(size_t num_hosts, const std::function< void(Group *)> &thread_function)
virtual void BroadcastUnsignedInt(unsigned int &value, size_t origin)
virtual void AllReduceMaximumUnsignedInt(unsigned int &value)
std::enable_if< std::is_pod< T >::value, void >::type Receive(T *out_value)
Receive any serializable POD item T.
void AllReduceAtRoot(T &value, BinarySumOp sum_op=BinarySumOp())
Broadcasts the value of the peer with index 0 to all the others.
A network Group is a collection of enumerated communication links, which provides point-to-point comm...
virtual void AllReduceMaximumLong(long &value)
void AllGatherRecursiveDoublingPowerOfTwo(T *values, size_t n)
void Reduce(T &value, size_t root=0, BinarySumOp sum_op=BinarySumOp())
Reduce a value from all workers to the worker with index "root".
virtual void AllReduceMaximumInt(int &value)
static size_t CalcOneFactorSize(size_t n)
Number of rounds in Perfect Matching (1-Factor).
void AllReduceSimple(T &value, BinarySumOp sum_op=BinarySumOp())
Perform an All-Reduce on the workers.
void AllReduceEliminationProcess(size_t host_id, size_t group_size, size_t remaining_hosts, size_t send_to, T &value, BinarySumOp sum_op)
Helper method for AllReduce().
void PrefixSum(T &value, BinarySumOp sum_op=BinarySumOp(), const T &initial=T())
Calculate inclusive prefix sum.
virtual void AllReduceMinimumUnsignedLongLong(unsigned long long &value)
void ReceiveFrom(size_t src, T *data)
Receives a serializable type from the given peer.
virtual ~Group()
virtual destructor
void PrefixSumSelect(T &value, BinarySumOp sum_op=BinarySumOp(), const T &initial=T(), bool inclusive=true)
select prefixsum implementation (often due to total number of processors)
virtual void PrefixSumPlusUnsignedInt(unsigned int &value, const unsigned int &initial)
size_t my_host_rank() const
Return our rank among hosts in this group.
virtual void AllReduceMinimumLongLong(long long &value)
virtual void PrefixSumPlusInt(int &value, const int &initial)
virtual void AllReducePlusLong(long &value)
virtual size_t num_hosts() const =0
Return number of connections in this group (= number of computing hosts)
virtual void AllReducePlusUnsignedLong(unsigned long &value)
void AllReduceSelect(T &value, BinarySumOp sum_op=BinarySumOp())
select allreduce implementation (often due to total number of processors)
virtual void BroadcastLongLong(long long &value, size_t origin)
void PrefixSumDoubling(T &value, BinarySumOp sum_op=BinarySumOp(), const T &initial=T(), bool inclusive=true)
Calculate the prefix sum for every worker.
virtual void PrefixSumPlusLong(long &value, const long &initial)
T SendReceiveReduce(size_t peer, const T &value, BinarySumOp sum_op)
Helper method for AllReduce().