Thrill
0.1
Provides a blocking collection for communication.
This wraps a raw net group, adds multi-worker/thread support, and should be used for flow control with integral types.
Important notice on threading: Different threads must not simultaneously call methods of two different FlowControlChannel instances, because the internal synchronization state (the barrier) is shared globally.
Definition at line 48 of file flow_control_channel.hpp.
#include <flow_control_channel.hpp>
Classes
  class LocalData
Public Member Functions
  FlowControlChannel (Group &group, size_t local_id, size_t thread_count, common::ThreadBarrier &barrier, LocalData *shmem, std::atomic< size_t > &generation)
      Creates a new instance of this class, wrapping a net::Group.
  FlowControlChannel (const FlowControlChannel &)=delete
      non-copyable: delete copy-constructor
  FlowControlChannel (FlowControlChannel &&)=default
      move-constructor: default
  ~FlowControlChannel ()
  template<typename T > std::shared_ptr< std::vector< T > > TLX_ATTRIBUTE_WARN_UNUSED_RESULT AllGather (const T &value)
      Gathers the value of a serializable type T over all workers and provides the result to all workers as a shared pointer to a vector.
  template<typename T , typename BinarySumOp = std::plus<T>> T TLX_ATTRIBUTE_WARN_UNUSED_RESULT AllReduce (const T &value, const BinarySumOp &sum_op=BinarySumOp())
      Reduces a value of a serializable type T over all workers given a certain reduce function.
  void Barrier ()
      A trivial global barrier.
  template<typename T > T TLX_ATTRIBUTE_WARN_UNUSED_RESULT Broadcast (const T &value, size_t origin=0)
      Broadcasts a value of a serializable type T from the worker with rank origin (by default the master, worker 0) to all other workers.
  template<typename T , typename BinarySumOp = std::plus<T>> T TLX_ATTRIBUTE_WARN_UNUSED_RESULT ExPrefixSum (const T &value, const BinarySumOp &sum_op=BinarySumOp(), const T &initial=T())
      Calculates the exclusive prefix sum over all workers, given a certain sum operation.
  template<typename T , typename BinarySumOp = std::plus<T>> T TLX_ATTRIBUTE_WARN_UNUSED_RESULT ExPrefixSumTotal (T &value, const BinarySumOp &sum_op=BinarySumOp(), const T &initial=T())
      Calculates the exclusive prefix sum over all workers, and delivers the total sum as well.
  Group & group ()
      Return the associated net::Group. USE AT YOUR OWN RISK.
  void LocalBarrier ()
      A trivial local thread barrier.
  size_t my_rank () const
      Return the worker's global rank.
  size_t num_workers () const
      Return the total number of workers.
  FlowControlChannel & operator= (const FlowControlChannel &)=delete
      non-copyable: delete assignment operator
  template<typename T > std::vector< T > Predecessor (size_t k, const std::vector< T > &my_values)
      Collects up to k predecessors of type T from preceding PEs.
  template<typename T , typename BinarySumOp = std::plus<T>> T TLX_ATTRIBUTE_WARN_UNUSED_RESULT PrefixSum (const T &value, const BinarySumOp &sum_op=BinarySumOp(), const T &initial=T())
      Calculates the inclusive prefix sum over all workers, given a certain sum operation.
  template<typename T , typename BinarySumOp = std::plus<T>> T TLX_ATTRIBUTE_WARN_UNUSED_RESULT PrefixSumBase (const T &value, const BinarySumOp &sum_op=BinarySumOp(), const T &initial=T(), bool inclusive=true)
      Calculates the prefix sum over all workers, given a certain sum operation.
  template<typename T , typename BinarySumOp = std::plus<T>> T TLX_ATTRIBUTE_WARN_UNUSED_RESULT Reduce (const T &value, size_t root=0, const BinarySumOp &sum_op=BinarySumOp())
      Reduces a value of a serializable type T over all workers to the given worker, given a certain reduce function.
Private Types
  using RunTimer = common::RunTimer< Timer >
      RAII class for running the timer.
  using Timer = common::StatsTimerBaseStopped< enable_stats >
      Timer or FakeTimer.
Private Member Functions
  Pointer Casting
    size_t GetNextStep ()
    template<typename T > void SetLocalShared (size_t step, const T *value)
    template<typename T > T * GetLocalShared (size_t step, size_t idx)
    template<typename T > T * GetLocalShared (size_t step)
Private Attributes
  common::ThreadBarrier & barrier_
  common::AtomicMovable< size_t > count_allreduce_ { 0 }
  common::AtomicMovable< size_t > count_barrier_ { 0 }
  common::AtomicMovable< size_t > count_broadcast_ { 0 }
  common::AtomicMovable< size_t > count_predecessor_ { 0 }
  common::AtomicMovable< size_t > count_prefixsum_ { 0 }
      Synchronization counters.
  common::AtomicMovable< size_t > count_reduce_ { 0 }
  std::atomic< size_t > & generation_
      Host-global shared generation counter.
  Group & group_
      The group associated with this channel.
  size_t host_rank_
      The local host rank.
  size_t local_id_
      The id of the worker thread associated with this flow channel.
  size_t num_hosts_
      The number of hosts in the group.
  LocalData * shmem_
      The global shared local data memory location to work upon.
  size_t thread_count_
      The number of worker threads on each host.
  Timer timer_allreduce_
  Timer timer_barrier_
  Timer timer_broadcast_
  Timer timer_communication_
  Timer timer_predecessor_
  Timer timer_prefixsum_
      Synchronization timer.
  Timer timer_reduce_
Static Private Attributes
  static constexpr bool debug = false
  static constexpr bool enable_stats = false
using RunTimer = common::RunTimer< Timer >  [private]
RAII class for running the timer.
Definition at line 72 of file flow_control_channel.hpp.
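RunTimer wraps a Timer (per the Timer typedef, either a real stats timer or a no-op FakeTimer depending on enable_stats) in an RAII guard. The following is a minimal sketch of that idea using only the standard library; `AccumTimer` and `ScopedRun` are hypothetical names and not Thrill's common::RunTimer or common::StatsTimerBaseStopped.

```cpp
#include <cassert>
#include <chrono>

// Hypothetical accumulating stopwatch, standing in for a stats timer.
class AccumTimer {
public:
    using Clock = std::chrono::steady_clock;

    void Start() {
        assert(!running_);
        running_ = true;
        start_ = Clock::now();
    }
    void Stop() {
        assert(running_);
        total_ += Clock::now() - start_;  // add the elapsed interval
        running_ = false;
    }
    bool running() const { return running_; }
    Clock::duration total() const { return total_; }

private:
    bool running_ = false;
    Clock::time_point start_;
    Clock::duration total_ = Clock::duration::zero();
};

// Hypothetical RAII guard: starts the timer in the constructor and stops it
// in the destructor, so an early return or exception cannot leave it running.
template <typename Timer>
class ScopedRun {
public:
    explicit ScopedRun(Timer& timer) : timer_(timer) { timer_.Start(); }
    ~ScopedRun() { timer_.Stop(); }
    ScopedRun(const ScopedRun&) = delete;
    ScopedRun& operator=(const ScopedRun&) = delete;

private:
    Timer& timer_;
};
```

Declaring `ScopedRun<AccumTimer> run(timer);` at the top of a collective operation times the whole call, regardless of how the function is left.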
using Timer = common::StatsTimerBaseStopped< enable_stats >  [private]
Timer or FakeTimer.
Definition at line 70 of file flow_control_channel.hpp.
FlowControlChannel (Group &group,
                    size_t local_id,
                    size_t thread_count,
                    common::ThreadBarrier &barrier,
                    LocalData *shmem,
                    std::atomic< size_t > &generation)
Creates a new instance of this class, wrapping a net::Group.
Definition at line 21 of file flow_control_channel.cpp.
Referenced by FlowControlChannel::GetLocalShared(), and FlowControlChannel::num_workers().
FlowControlChannel (const FlowControlChannel &)  [delete]
non-copyable: delete copy-constructor
FlowControlChannel (FlowControlChannel &&)  [default]
move-constructor: default
~FlowControlChannel ()
Definition at line 31 of file flow_control_channel.cpp.
References FlowControlChannel::count_allreduce_, FlowControlChannel::count_barrier_, FlowControlChannel::count_broadcast_, FlowControlChannel::count_predecessor_, FlowControlChannel::count_prefixsum_, FlowControlChannel::count_reduce_, FlowControlChannel::enable_stats, FlowControlChannel::my_rank(), sLOGC, FlowControlChannel::timer_allreduce_, FlowControlChannel::timer_barrier_, FlowControlChannel::timer_broadcast_, FlowControlChannel::timer_communication_, FlowControlChannel::timer_predecessor_, FlowControlChannel::timer_prefixsum_, and FlowControlChannel::timer_reduce_.
Referenced by FlowControlChannel::num_workers().
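template<typename T > std::shared_ptr< std::vector< T > > TLX_ATTRIBUTE_WARN_UNUSED_RESULT AllGather (const T &value)  [inline]
Gathers the value of a serializable type T over all workers and provides the result to all workers as a shared pointer to a vector.
Parameters:
  value  The value this worker contributes to the allgather operation.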
Definition at line 477 of file flow_control_channel.hpp.
References Group::AllGatherBruck(), Group::AllGatherRecursiveDoublingPowerOfTwo(), FlowControlChannel::barrier_, FlowControlChannel::count_reduce_, FlowControlChannel::enable_stats, FlowControlChannel::GetLocalShared(), FlowControlChannel::GetNextStep(), FlowControlChannel::group(), FlowControlChannel::group_, tlx::is_power_of_two(), Group::my_host_rank(), FlowControlChannel::num_workers(), FlowControlChannel::SetLocalShared(), FlowControlChannel::thread_count_, FlowControlChannel::timer_communication_, FlowControlChannel::timer_reduce_, TLX_ATTRIBUTE_WARN_UNUSED_RESULT, and ThreadBarrierSpin::wait().
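The references above show that AllGather() uses Group::AllGatherRecursiveDoublingPowerOfTwo() or Group::AllGatherBruck(), apparently selected via tlx::is_power_of_two(). As a rough illustration of the first strategy, the following single-process model simulates a textbook recursive-doubling allgather among a power-of-two number of participants. `ModelRecursiveDoublingAllGather` is a hypothetical helper; it is not Thrill's code.

```cpp
#include <cassert>
#include <cstddef>
#include <optional>
#include <vector>

// Single-process model of a recursive-doubling allgather (hypothetical
// helper). contributions[r] is the value of participant r; the number of
// participants must be a power of two.
template <typename T>
std::vector<std::vector<T>> ModelRecursiveDoublingAllGather(
    const std::vector<T>& contributions) {
    const std::size_t p = contributions.size();
    assert(p > 0 && (p & (p - 1)) == 0);

    // known[r][i] is set once participant r holds participant i's value.
    std::vector<std::vector<std::optional<T>>> known(
        p, std::vector<std::optional<T>>(p));
    for (std::size_t r = 0; r < p; ++r) known[r][r] = contributions[r];

    // In the round with distance d, participant r exchanges everything it
    // knows with partner r XOR d, doubling the data each participant holds.
    for (std::size_t d = 1; d < p; d <<= 1) {
        const auto before = known;  // exchanges within a round are simultaneous
        for (std::size_t r = 0; r < p; ++r) {
            const std::size_t partner = r ^ d;
            for (std::size_t i = 0; i < p; ++i) {
                if (before[partner][i]) known[r][i] = before[partner][i];
            }
        }
    }

    // After log2(p) rounds every participant holds all values in rank order.
    std::vector<std::vector<T>> result(p);
    for (std::size_t r = 0; r < p; ++r) {
        for (std::size_t i = 0; i < p; ++i) {
            assert(known[r][i].has_value());
            result[r].push_back(*known[r][i]);
        }
    }
    return result;
}
```

Bruck's algorithm, the other strategy named in the references, also works for participant counts that are not powers of two; it is not modelled here.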
void Barrier ()
A trivial global barrier.
Definition at line 50 of file flow_control_channel.cpp.
References Group::AllReduce(), FlowControlChannel::barrier_, FlowControlChannel::count_barrier_, FlowControlChannel::debug, FlowControlChannel::enable_stats, FlowControlChannel::group_, LOG, FlowControlChannel::timer_barrier_, FlowControlChannel::timer_communication_, and ThreadBarrierSpin::wait().
Referenced by CountTrianglesGenerated(), WriteLinesOneNode< ValueType >::Execute(), JoinTPCH4(), main(), Percentiles(), FlowControlChannel::Predecessor(), RunHashWordCount(), RunKMeansFile(), RunKMeansGenerated(), and RunWordCount().
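Barrier() references both ThreadBarrierSpin::wait() and Group::AllReduce(), which suggests a two-level scheme: the worker threads of one host meet at the shared thread barrier, and a single thread synchronizes with the other hosts over the network. The host-local half can be sketched as a reusable, generation-counting barrier whose last arriving thread runs a callback before releasing the others. `GenerationBarrier` and `RunOnThreads` are hypothetical names, and unlike Thrill's spinning barrier this sketch blocks on std::condition_variable.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical reusable thread barrier built on a generation counter.
class GenerationBarrier {
public:
    explicit GenerationBarrier(std::size_t thread_count)
        : thread_count_(thread_count) {
        assert(thread_count > 0);
    }

    // Blocks until thread_count threads have called wait(). The last thread
    // to arrive runs `on_last` (while holding the lock) and then releases
    // the others by advancing the generation.
    template <typename Callback>
    void wait(Callback on_last) {
        std::unique_lock<std::mutex> lock(mutex_);
        const std::size_t my_generation = generation_;
        if (++waiting_ == thread_count_) {
            on_last();
            waiting_ = 0;
            ++generation_;  // a new round begins, so the barrier is reusable
            cv_.notify_all();
        } else {
            // The predicate guards against spurious wake-ups.
            cv_.wait(lock, [&] { return generation_ != my_generation; });
        }
    }

    void wait() { wait([] {}); }

    std::size_t generation() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return generation_;
    }

private:
    mutable std::mutex mutex_;
    std::condition_variable cv_;
    const std::size_t thread_count_;
    std::size_t waiting_ = 0;
    std::size_t generation_ = 0;
};

// Hypothetical helper: runs fn(local_id) on n threads and joins them all.
template <typename Fn>
void RunOnThreads(std::size_t n, Fn fn) {
    std::vector<std::thread> threads;
    for (std::size_t i = 0; i < n; ++i) threads.emplace_back(fn, i);
    for (auto& t : threads) t.join();
}
```

To turn this into a global barrier, `on_last` could perform the cross-host synchronization step, so that no thread leaves the barrier before every host has arrived.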
template<typename T , typename BinarySumOp = std::plus<T>> T TLX_ATTRIBUTE_WARN_UNUSED_RESULT ExPrefixSum (const T &value, const BinarySumOp &sum_op=BinarySumOp(), const T &initial=T())  [inline]
Calculates the exclusive prefix sum over all workers, given a certain sum operation.
This method blocks until the sum is calculated. Values are applied in rank order, i.e. they are combined as sum_op(sum_op(a, b), c) if a, b, c are the values of workers 0, 1, 2.
Parameters:
  value    The local value of this worker.
  sum_op   The operation to use for calculating the prefix sum. The default operation is a normal addition.
  initial  The initial element of the algebraic structure defined by T and BinarySumOp.
Definition at line 329 of file flow_control_channel.hpp.
References FlowControlChannel::PrefixSumBase(), and TLX_ATTRIBUTE_WARN_UNUSED_RESULT.
Referenced by WriteLinesOneNode< ValueType >::Execute(), PrefixSumNode< ValueType, SumFunction, Inclusive >::Execute(), ZipWithIndexNode< ValueType, ZipFunction >::Execute(), OverlapWindowNode< ValueType, Input, WindowFunction, PartialWindowFunction >::Execute(), and DisjointWindowNode< ValueType, Input, WindowFunction, PartialWindowFunction >::Execute().
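The semantics described above can be modelled in a single process by treating `values[r]` as the local value of the worker with global rank r. `ModelExPrefixSum` is a hypothetical helper, not part of Thrill; it assumes that worker 0 receives `initial`, as in the usual definition of an exclusive prefix sum.

```cpp
#include <cassert>
#include <functional>
#include <vector>

// Single-process model of ExPrefixSum() semantics (hypothetical helper).
// Worker r receives sum_op(...sum_op(initial, values[0])..., values[r-1]),
// so worker 0 receives `initial` alone.
template <typename T, typename BinarySumOp = std::plus<T>>
std::vector<T> ModelExPrefixSum(const std::vector<T>& values,
                                const BinarySumOp& sum_op = BinarySumOp(),
                                const T& initial = T()) {
    std::vector<T> result;
    result.reserve(values.size());
    T running = initial;
    for (const T& v : values) {
        result.push_back(running);     // exclusive: own value not yet added
        running = sum_op(running, v);  // left fold preserves rank order
    }
    return result;
}
```

For example, worker values {3, 1, 4, 1} with the default addition yield {0, 3, 4, 8}; with std::multiplies<int> and initial 1, the values {2, 3, 4} yield {1, 2, 6}.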
template<typename T > T * GetLocalShared (size_t step, size_t idx)  [inline, private]
Definition at line 173 of file flow_control_channel.hpp.
References FlowControlChannel::LocalData::ptr, and FlowControlChannel::thread_count_.
Referenced by FlowControlChannel::AllGather().
template<typename T > T * GetLocalShared (size_t step)  [inline, private]
Definition at line 180 of file flow_control_channel.hpp.
References FlowControlChannel::FlowControlChannel(), FlowControlChannel::group(), and FlowControlChannel::local_id_.
size_t GetNextStep ()  [inline, private]
Definition at line 159 of file flow_control_channel.hpp.
References FlowControlChannel::barrier_, and ThreadBarrierSpin::step().
Referenced by FlowControlChannel::AllGather(), FlowControlChannel::AllReduce(), FlowControlChannel::Broadcast(), FlowControlChannel::ExPrefixSumTotal(), FlowControlChannel::Predecessor(), FlowControlChannel::PrefixSumBase(), and FlowControlChannel::Reduce().
Group & group ()  [inline]
Return the associated net::Group. USE AT YOUR OWN RISK.
Definition at line 194 of file flow_control_channel.hpp.
References FlowControlChannel::group_.
Referenced by FlowControlChannel::AllGather(), and FlowControlChannel::GetLocalShared().
void LocalBarrier ()
A trivial local thread barrier.
Definition at line 74 of file flow_control_channel.cpp.
References FlowControlChannel::AllReduce(), FlowControlChannel::barrier_, FlowControlChannel::Broadcast(), FlowControlChannel::ExPrefixSumTotal(), FlowControlChannel::PrefixSumBase(), and ThreadBarrierSpin::wait().
Referenced by FlowControlChannel::Predecessor().
size_t my_rank () const  [inline]
Return the worker's global rank.
Definition at line 197 of file flow_control_channel.hpp.
References FlowControlChannel::group_, FlowControlChannel::local_id_, Group::my_host_rank(), and FlowControlChannel::thread_count_.
Referenced by FlowControlChannel::~FlowControlChannel().
size_t num_workers () const  [inline]
Return the total number of workers.
Definition at line 202 of file flow_control_channel.hpp.
References FlowControlChannel::FlowControlChannel(), FlowControlChannel::group_, Group::num_hosts(), FlowControlChannel::operator=(), FlowControlChannel::thread_count_, TLX_ATTRIBUTE_WARN_UNUSED_RESULT, and FlowControlChannel::~FlowControlChannel().
Referenced by FlowControlChannel::AllGather(), and FlowControlChannel::Reduce().
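The references of my_rank() (Group::my_host_rank(), local_id_, thread_count_) and num_workers() (Group::num_hosts(), thread_count_) suggest that global ranks are assigned host by host, with the threads of one host numbered consecutively. Under that assumption, the mapping can be sketched as follows; all helper names are hypothetical.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical helpers, assuming rank = host_rank * thread_count + local_id.
constexpr std::size_t GlobalRank(std::size_t host_rank,
                                 std::size_t thread_count,
                                 std::size_t local_id) {
    return host_rank * thread_count + local_id;
}

constexpr std::size_t NumWorkers(std::size_t num_hosts,
                                 std::size_t thread_count) {
    return num_hosts * thread_count;
}

// Inverse mapping: recover the host and the thread id from a global rank.
constexpr std::size_t HostOfRank(std::size_t rank, std::size_t thread_count) {
    return rank / thread_count;
}

constexpr std::size_t LocalIdOfRank(std::size_t rank,
                                    std::size_t thread_count) {
    return rank % thread_count;
}

// With 3 hosts of 4 threads each, thread 1 on host 2 is worker 9 of 12.
static_assert(NumWorkers(3, 4) == 12, "3 hosts x 4 threads");
static_assert(GlobalRank(2, 4, 1) == 9, "host 2, thread 1");
```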
FlowControlChannel & operator= (const FlowControlChannel &)  [delete]
non-copyable: delete assignment operator
Referenced by FlowControlChannel::num_workers().
template<typename T > std::vector< T > Predecessor (size_t k, const std::vector< T > &my_values)
Collects up to k predecessors of type T from preceding PEs.
k must be equal on all PEs.
Each worker is assumed to hold at most k items. Predecessor collects up to k items from the preceding PEs. If the directly preceding PE holds fewer than k items, it waits for its own predecessor to deliver items, in the hope of collecting up to k.
This is used by the Window() transformation, but may in the future also be useful for fetching a single predecessor item in other distributed operations.
Definition at line 653 of file flow_control_channel.hpp.
References FlowControlChannel::AllReduce(), FlowControlChannel::Barrier(), FlowControlChannel::barrier_, FlowControlChannel::Broadcast(), FlowControlChannel::count_predecessor_, FlowControlChannel::debug, FlowControlChannel::enable_stats, FlowControlChannel::ExPrefixSumTotal(), FlowControlChannel::GetNextStep(), FlowControlChannel::group_, FlowControlChannel::host_rank_, FlowControlChannel::LocalData::IncCounter(), FlowControlChannel::local_id_, FlowControlChannel::LocalBarrier(), LOG, min(), FlowControlChannel::num_hosts_, FlowControlChannel::PrefixSumBase(), Group::ReceiveFrom(), Group::SendTo(), FlowControlChannel::SetLocalShared(), FlowControlChannel::thread_count_, FlowControlChannel::timer_predecessor_, ThreadBarrierSpin::wait(), and FlowControlChannel::LocalData::WaitCounter().
Referenced by OverlapWindowNode< ValueType, Input, WindowFunction, PartialWindowFunction >::Execute(), and DisjointWindowNode< ValueType, Input, WindowFunction, PartialWindowFunction >::Execute().
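The intended result of Predecessor() can be modelled in one process: conceptually, the workers' items are concatenated in rank order, and each worker receives up to k items that immediately precede its own. `ModelPredecessor` is a hypothetical helper that ignores the actual communication pattern; it also assumes the returned items keep their original order.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Single-process model of Predecessor() results (hypothetical helper).
// items[r] holds the items of the worker with rank r; each worker is
// assumed to hold at most k items. result[r] contains the last (up to) k
// items of workers 0..r-1, in their original order.
template <typename T>
std::vector<std::vector<T>> ModelPredecessor(
    std::size_t k, const std::vector<std::vector<T>>& items) {
    std::vector<std::vector<T>> result(items.size());
    std::vector<T> window;  // trailing items seen so far, at most k of them
    for (std::size_t r = 0; r < items.size(); ++r) {
        assert(items[r].size() <= k);
        result[r] = window;
        // Append this worker's items, then keep only the last k.
        window.insert(window.end(), items[r].begin(), items[r].end());
        if (window.size() > k) {
            window.erase(window.begin(),
                         window.end() - static_cast<std::ptrdiff_t>(k));
        }
    }
    return result;
}
```

For example, with k = 2 and workers holding {1, 2}, {}, {3} and {4, 5}, worker 2 receives {1, 2} by reaching past the empty worker 1, and worker 3 receives {2, 3}.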
template<typename T , typename BinarySumOp = std::plus<T>> T TLX_ATTRIBUTE_WARN_UNUSED_RESULT PrefixSum (const T &value, const BinarySumOp &sum_op=BinarySumOp(), const T &initial=T())  [inline]
Calculates the inclusive prefix sum over all workers, given a certain sum operation.
This method blocks until the sum is calculated. Values are applied in rank order, i.e. they are combined as sum_op(sum_op(a, b), c) if a, b, c are the values of workers 0, 1, 2.
Parameters:
  value    The local value of this worker.
  sum_op   The operation to use for calculating the prefix sum. The default operation is a normal addition.
  initial  The initial element of the algebraic structure defined by T and BinarySumOp.
Definition at line 308 of file flow_control_channel.hpp.
References FlowControlChannel::PrefixSumBase(), and TLX_ATTRIBUTE_WARN_UNUSED_RESULT.
Referenced by ConcatNode< ValueType >::Execute().
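The inclusive semantics can likewise be modelled in a single process. `ModelPrefixSum` is a hypothetical helper; it folds `initial` in front of worker 0's value, an assumption that makes no difference when `initial` is the identity of sum_op, as with the defaults. A non-commutative operation such as string concatenation shows why the application order matters.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <vector>

// Single-process model of PrefixSum() semantics (hypothetical helper).
// Worker r receives
//   sum_op(...sum_op(sum_op(initial, values[0]), values[1])..., values[r]).
template <typename T, typename BinarySumOp = std::plus<T>>
std::vector<T> ModelPrefixSum(const std::vector<T>& values,
                              const BinarySumOp& sum_op = BinarySumOp(),
                              const T& initial = T()) {
    std::vector<T> result;
    result.reserve(values.size());
    T running = initial;
    for (const T& v : values) {
        running = sum_op(running, v);  // inclusive: own value is added first
        result.push_back(running);
    }
    return result;
}
```

With std::string values {"a", "b", "c"} and the default std::plus, workers 0, 1 and 2 receive "a", "ab" and "abc"; combining in the opposite order would give "ba" on worker 1 instead.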
template<typename T , typename BinarySumOp = std::plus<T>> T TLX_ATTRIBUTE_WARN_UNUSED_RESULT Reduce (const T &value, size_t root=0, const BinarySumOp &sum_op=BinarySumOp())  [inline]
Reduces a value of a serializable type T over all workers to the given worker, given a certain reduce function.
This method is blocking. As with the prefix sum, values are combined in rank order. The operation is assumed to be associative.
Parameters:
  value   The value to use for the reduce operation.
  root    The destination worker of the reduce.
  sum_op  The operation to use for calculating the reduced value. The default operation is a normal addition.
Definition at line 543 of file flow_control_channel.hpp.
References FlowControlChannel::barrier_, FlowControlChannel::count_reduce_, FlowControlChannel::debug, FlowControlChannel::enable_stats, FlowControlChannel::GetNextStep(), FlowControlChannel::group_, LOG, Group::my_host_rank(), FlowControlChannel::num_workers(), Group::Reduce(), FlowControlChannel::SetLocalShared(), FlowControlChannel::thread_count_, FlowControlChannel::timer_communication_, FlowControlChannel::timer_reduce_, TLX_ATTRIBUTE_WARN_UNUSED_RESULT, gen_data::value, and ThreadBarrierSpin::wait().
Referenced by Context::Launch().
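Why associativity is required can be illustrated in one process: a reduction that combines neighbouring ranks pairwise, as tree-shaped reductions typically do, yields the same result as a left-to-right fold whenever sum_op is associative, even if it is not commutative. Both helpers below are hypothetical and do not reproduce Group::Reduce(); they also ignore `root`, which only decides where the result is delivered.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Left-to-right fold in rank order: sum_op(sum_op(v0, v1), v2) ...
template <typename T, typename BinarySumOp = std::plus<T>>
T ModelLeftFold(const std::vector<T>& values,
                const BinarySumOp& sum_op = BinarySumOp()) {
    assert(!values.empty());
    T acc = values[0];
    for (std::size_t r = 1; r < values.size(); ++r) acc = sum_op(acc, values[r]);
    return acc;
}

// Pairwise (tree-shaped) reduction that keeps the lower rank on the left in
// every combination; an unpaired last element is carried up unchanged.
template <typename T, typename BinarySumOp = std::plus<T>>
T ModelTreeReduce(std::vector<T> values,
                  const BinarySumOp& sum_op = BinarySumOp()) {
    assert(!values.empty());
    while (values.size() > 1) {
        std::vector<T> next;
        next.reserve((values.size() + 1) / 2);
        for (std::size_t i = 0; i + 1 < values.size(); i += 2) {
            next.push_back(sum_op(values[i], values[i + 1]));
        }
        if (values.size() % 2 == 1) next.push_back(values.back());
        values = std::move(next);
    }
    return values.front();
}
```

Concatenating the strings "a" through "e" gives "abcde" either way, but with the non-associative std::minus<int> on {10, 3, 2, 1} the fold yields 4 while the tree yields 6.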
template<typename T > void SetLocalShared (size_t step, const T *value)  [inline, private]
Definition at line 164 of file flow_control_channel.hpp.
References FlowControlChannel::local_id_, and FlowControlChannel::LocalData::ptr.
Referenced by FlowControlChannel::AllGather(), FlowControlChannel::AllReduce(), FlowControlChannel::Broadcast(), FlowControlChannel::ExPrefixSumTotal(), FlowControlChannel::Predecessor(), FlowControlChannel::PrefixSumBase(), and FlowControlChannel::Reduce().
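Judging by their references, GetNextStep(), SetLocalShared() and GetLocalShared() appear to implement a host-local exchange: each worker thread publishes a pointer to its local value in the shared LocalData slots, and after the thread barrier one thread reads all slots. The sketch below models this pointer-casting pattern in a single thread. `LocalSlots` and `CombineSlots` are hypothetical, and recording the step in each slot to catch stale pointers is an assumption made here, not necessarily how Thrill's LocalData is laid out.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <vector>

// Hypothetical per-host table of published pointers, one slot per worker
// thread. Each slot records the step in which it was written.
class LocalSlots {
public:
    explicit LocalSlots(std::size_t thread_count) : slots_(thread_count) {}

    template <typename T>
    void Set(std::size_t local_id, std::size_t step, const T* value) {
        assert(local_id < slots_.size());
        slots_[local_id].step = step;
        slots_[local_id].ptr = value;  // type-erased to const void*
    }

    template <typename T>
    const T* Get(std::size_t local_id, std::size_t step) const {
        assert(local_id < slots_.size());
        assert(slots_[local_id].step == step);  // reject stale pointers
        return static_cast<const T*>(slots_[local_id].ptr);
    }

    std::size_t size() const { return slots_.size(); }

private:
    struct Slot {
        std::size_t step = 0;
        const void* ptr = nullptr;
    };
    std::vector<Slot> slots_;
};

// After a barrier, a single thread could combine all published values in
// local-id order like this.
template <typename T, typename BinarySumOp = std::plus<T>>
T CombineSlots(const LocalSlots& slots, std::size_t step,
               const BinarySumOp& sum_op = BinarySumOp()) {
    T acc = *slots.Get<T>(0, step);
    for (std::size_t i = 1; i < slots.size(); ++i) {
        acc = sum_op(acc, *slots.Get<T>(i, step));
    }
    return acc;
}
```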
common::ThreadBarrier & barrier_  [private]
The shared barrier used to synchronize between worker threads on this node.
Definition at line 94 of file flow_control_channel.hpp.
Referenced by FlowControlChannel::AllGather(), FlowControlChannel::AllReduce(), FlowControlChannel::Barrier(), FlowControlChannel::Broadcast(), FlowControlChannel::ExPrefixSumTotal(), FlowControlChannel::GetNextStep(), FlowControlChannel::LocalBarrier(), FlowControlChannel::Predecessor(), FlowControlChannel::PrefixSumBase(), and FlowControlChannel::Reduce().
common::AtomicMovable< size_t > count_allreduce_ { 0 }  [private]
Definition at line 88 of file flow_control_channel.hpp.
Referenced by FlowControlChannel::AllReduce(), and FlowControlChannel::~FlowControlChannel().
common::AtomicMovable< size_t > count_barrier_ { 0 }  [private]
Definition at line 90 of file flow_control_channel.hpp.
Referenced by FlowControlChannel::Barrier(), and FlowControlChannel::~FlowControlChannel().
common::AtomicMovable< size_t > count_broadcast_ { 0 }  [private]
Definition at line 86 of file flow_control_channel.hpp.
Referenced by FlowControlChannel::Broadcast(), and FlowControlChannel::~FlowControlChannel().
common::AtomicMovable< size_t > count_predecessor_ { 0 }  [private]
Definition at line 89 of file flow_control_channel.hpp.
Referenced by FlowControlChannel::Predecessor(), and FlowControlChannel::~FlowControlChannel().
common::AtomicMovable< size_t > count_prefixsum_ { 0 }  [private]
Synchronization counters.
Definition at line 85 of file flow_control_channel.hpp.
Referenced by FlowControlChannel::ExPrefixSumTotal(), FlowControlChannel::PrefixSumBase(), and FlowControlChannel::~FlowControlChannel().
common::AtomicMovable< size_t > count_reduce_ { 0 }  [private]
Definition at line 87 of file flow_control_channel.hpp.
Referenced by FlowControlChannel::AllGather(), FlowControlChannel::Reduce(), and FlowControlChannel::~FlowControlChannel().
constexpr bool debug = false  [static, private]
Definition at line 51 of file flow_control_channel.hpp.
Referenced by FlowControlChannel::AllReduce(), FlowControlChannel::Barrier(), FlowControlChannel::Broadcast(), FlowControlChannel::ExPrefixSumTotal(), FlowControlChannel::Predecessor(), FlowControlChannel::PrefixSumBase(), and FlowControlChannel::Reduce().
constexpr bool enable_stats = false  [static, private]
Definition at line 52 of file flow_control_channel.hpp.
Referenced by FlowControlChannel::AllGather(), FlowControlChannel::AllReduce(), FlowControlChannel::Barrier(), FlowControlChannel::Broadcast(), FlowControlChannel::ExPrefixSumTotal(), FlowControlChannel::Predecessor(), FlowControlChannel::PrefixSumBase(), FlowControlChannel::Reduce(), and FlowControlChannel::~FlowControlChannel().
std::atomic< size_t > & generation_  [private]
Host-global shared generation counter.
Definition at line 154 of file flow_control_channel.hpp.
Group & group_  [private]
The group associated with this channel.
Definition at line 55 of file flow_control_channel.hpp.
Referenced by FlowControlChannel::AllGather(), FlowControlChannel::AllReduce(), FlowControlChannel::Barrier(), FlowControlChannel::Broadcast(), FlowControlChannel::ExPrefixSumTotal(), FlowControlChannel::group(), FlowControlChannel::my_rank(), FlowControlChannel::num_workers(), FlowControlChannel::Predecessor(), FlowControlChannel::PrefixSumBase(), and FlowControlChannel::Reduce().
size_t host_rank_  [private]
The local host rank.
Definition at line 58 of file flow_control_channel.hpp.
Referenced by FlowControlChannel::ExPrefixSumTotal(), and FlowControlChannel::Predecessor().
size_t local_id_  [private]
The id of the worker thread associated with this flow channel.
Definition at line 64 of file flow_control_channel.hpp.
Referenced by FlowControlChannel::Broadcast(), FlowControlChannel::GetLocalShared(), FlowControlChannel::my_rank(), FlowControlChannel::Predecessor(), and FlowControlChannel::SetLocalShared().
size_t num_hosts_  [private]
The number of hosts in the group.
Definition at line 61 of file flow_control_channel.hpp.
Referenced by FlowControlChannel::ExPrefixSumTotal(), and FlowControlChannel::Predecessor().
LocalData * shmem_  [private]
The global shared local data memory location to work upon.
Definition at line 151 of file flow_control_channel.hpp.
size_t thread_count_  [private]
The number of worker threads on each host.
Definition at line 67 of file flow_control_channel.hpp.
Referenced by FlowControlChannel::AllGather(), FlowControlChannel::AllReduce(), FlowControlChannel::Broadcast(), FlowControlChannel::ExPrefixSumTotal(), FlowControlChannel::GetLocalShared(), FlowControlChannel::my_rank(), FlowControlChannel::num_workers(), FlowControlChannel::Predecessor(), FlowControlChannel::PrefixSumBase(), and FlowControlChannel::Reduce().
Timer timer_allreduce_  [private]
Definition at line 78 of file flow_control_channel.hpp.
Referenced by FlowControlChannel::AllReduce(), and FlowControlChannel::~FlowControlChannel().
Timer timer_barrier_  [private]
Definition at line 80 of file flow_control_channel.hpp.
Referenced by FlowControlChannel::Barrier(), and FlowControlChannel::~FlowControlChannel().
Timer timer_broadcast_  [private]
Definition at line 76 of file flow_control_channel.hpp.
Referenced by FlowControlChannel::Broadcast(), and FlowControlChannel::~FlowControlChannel().
Timer timer_communication_  [private]
Definition at line 82 of file flow_control_channel.hpp.
Referenced by FlowControlChannel::AllGather(), FlowControlChannel::AllReduce(), FlowControlChannel::Barrier(), FlowControlChannel::Broadcast(), FlowControlChannel::ExPrefixSumTotal(), FlowControlChannel::PrefixSumBase(), FlowControlChannel::Reduce(), and FlowControlChannel::~FlowControlChannel().
Timer timer_predecessor_  [private]
Definition at line 79 of file flow_control_channel.hpp.
Referenced by FlowControlChannel::Predecessor(), and FlowControlChannel::~FlowControlChannel().
Timer timer_prefixsum_  [private]
Synchronization timer.
Definition at line 75 of file flow_control_channel.hpp.
Referenced by FlowControlChannel::ExPrefixSumTotal(), FlowControlChannel::PrefixSumBase(), and FlowControlChannel::~FlowControlChannel().
Timer timer_reduce_  [private]
Definition at line 77 of file flow_control_channel.hpp.
Referenced by FlowControlChannel::AllGather(), FlowControlChannel::Reduce(), and FlowControlChannel::~FlowControlChannel().