Thrill  0.1
FlowControlChannel Class Reference

Detailed Description

Provides a blocking collection for communication.

This wraps a raw net group, adds multi-worker/thread support, and should be used for flow control with integral types.

Important notice on threading: It is not allowed to call two different methods of two different instances of FlowControlChannel simultaneously from different threads, since the internal synchronization state (the barrier) is shared globally.

Definition at line 48 of file flow_control_channel.hpp.

#include <flow_control_channel.hpp>

Classes

class  LocalData
 

Public Member Functions

 FlowControlChannel (Group &group, size_t local_id, size_t thread_count, common::ThreadBarrier &barrier, LocalData *shmem, std::atomic< size_t > &generation)
 Creates a new instance of this class, wrapping a net::Group. More...
 
 FlowControlChannel (const FlowControlChannel &)=delete
 non-copyable: delete copy-constructor More...
 
 FlowControlChannel (FlowControlChannel &&)=default
 move-constructor: default More...
 
 ~FlowControlChannel ()
 
template<typename T >
std::shared_ptr< std::vector< T > > TLX_ATTRIBUTE_WARN_UNUSED_RESULT AllGather (const T &value)
 Gathers the value of a serializable type T over all workers and provides result to all workers as a shared pointer to a vector. More...
 
template<typename T , typename BinarySumOp = std::plus<T>>
T TLX_ATTRIBUTE_WARN_UNUSED_RESULT AllReduce (const T &value, const BinarySumOp &sum_op=BinarySumOp())
 Reduces a value of a serializable type T over all workers given a certain reduce function. More...
 
void Barrier ()
 A trivial global barrier. More...
 
template<typename T >
T TLX_ATTRIBUTE_WARN_UNUSED_RESULT Broadcast (const T &value, size_t origin=0)
 Broadcasts a value of a serializable type T from the worker with id origin (by default the master, worker 0) to all other workers. More...
 
template<typename T , typename BinarySumOp = std::plus<T>>
T TLX_ATTRIBUTE_WARN_UNUSED_RESULT ExPrefixSum (const T &value, const BinarySumOp &sum_op=BinarySumOp(), const T &initial=T())
 Calculates the exclusive prefix sum over all workers, given a certain sum operation. More...
 
template<typename T , typename BinarySumOp = std::plus<T>>
T TLX_ATTRIBUTE_WARN_UNUSED_RESULT ExPrefixSumTotal (T &value, const BinarySumOp &sum_op=BinarySumOp(), const T &initial=T())
 Calculates the exclusive prefix sum over all workers, and delivers the total sum as well. More...
 
Group & group ()
 Return the associated net::Group. USE AT YOUR OWN RISK. More...
 
void LocalBarrier ()
 A trivial local thread barrier. More...
 
size_t my_rank () const
 Return the worker's global rank. More...
 
size_t num_workers () const
 Return the total number of workers. More...
 
FlowControlChannel & operator= (const FlowControlChannel &)=delete
 non-copyable: delete assignment operator More...
 
template<typename T >
std::vector< T > Predecessor (size_t k, const std::vector< T > &my_values)
 Collects up to k predecessors of type T from preceding PEs. More...
 
template<typename T , typename BinarySumOp = std::plus<T>>
T TLX_ATTRIBUTE_WARN_UNUSED_RESULT PrefixSum (const T &value, const BinarySumOp &sum_op=BinarySumOp(), const T &initial=T())
 Calculates the inclusive prefix sum over all workers, given a certain sum operation. More...
 
template<typename T , typename BinarySumOp = std::plus<T>>
T TLX_ATTRIBUTE_WARN_UNUSED_RESULT PrefixSumBase (const T &value, const BinarySumOp &sum_op=BinarySumOp(), const T &initial=T(), bool inclusive=true)
 Calculates the prefix sum over all workers, given a certain sum operation. More...
 
template<typename T , typename BinarySumOp = std::plus<T>>
T TLX_ATTRIBUTE_WARN_UNUSED_RESULT Reduce (const T &value, size_t root=0, const BinarySumOp &sum_op=BinarySumOp())
 Reduces a value of a serializable type T over all workers to the given worker, provided a certain reduce function. More...
 

Private Types

using RunTimer = common::RunTimer< Timer >
 RAII class for running the timer. More...
 
using Timer = common::StatsTimerBaseStopped< enable_stats >
 Timer or FakeTimer. More...
 

Private Member Functions

Pointer Casting
size_t GetNextStep ()
 
template<typename T >
void SetLocalShared (size_t step, const T *value)
 
template<typename T >
T * GetLocalShared (size_t step, size_t idx)
 
template<typename T >
T * GetLocalShared (size_t step)
 

Private Attributes

common::ThreadBarrier & barrier_
 
common::AtomicMovable< size_t > count_allreduce_ { 0 }
 
common::AtomicMovable< size_t > count_barrier_ { 0 }
 
common::AtomicMovable< size_t > count_broadcast_ { 0 }
 
common::AtomicMovable< size_t > count_predecessor_ { 0 }
 
common::AtomicMovable< size_t > count_prefixsum_ { 0 }
 Synchronization counters. More...
 
common::AtomicMovable< size_t > count_reduce_ { 0 }
 
std::atomic< size_t > & generation_
 Host-global shared generation counter. More...
 
Group & group_
 The group associated with this channel. More...
 
size_t host_rank_
 The local host rank. More...
 
size_t local_id_
 The id of the worker thread associated with this flow channel. More...
 
size_t num_hosts_
 The count of all hosts connected to this group. More...
 
LocalData * shmem_
 The global shared local data memory location to work upon. More...
 
size_t thread_count_
 The count of worker threads on this host. More...
 
Timer timer_allreduce_
 
Timer timer_barrier_
 
Timer timer_broadcast_
 
Timer timer_communication_
 
Timer timer_predecessor_
 
Timer timer_prefixsum_
 Synchronization timer. More...
 
Timer timer_reduce_
 

Static Private Attributes

static constexpr bool debug = false
 
static constexpr bool enable_stats = false
 

Member Typedef Documentation

◆ RunTimer

using RunTimer = common::RunTimer<Timer>
private

RAII class for running the timer.

Definition at line 72 of file flow_control_channel.hpp.

◆ Timer

Timer or FakeTimer.

Definition at line 70 of file flow_control_channel.hpp.

Constructor & Destructor Documentation

◆ FlowControlChannel() [1/3]

FlowControlChannel ( Group &  group,
size_t  local_id,
size_t  thread_count,
common::ThreadBarrier &  barrier,
LocalData *  shmem,
std::atomic< size_t > &  generation 
)

Creates a new instance of this class, wrapping a net::Group.

Definition at line 21 of file flow_control_channel.cpp.

Referenced by FlowControlChannel::GetLocalShared(), and FlowControlChannel::num_workers().

◆ FlowControlChannel() [2/3]

FlowControlChannel ( const FlowControlChannel & )
delete

non-copyable: delete copy-constructor

◆ FlowControlChannel() [3/3]

move-constructor: default

◆ ~FlowControlChannel()

Member Function Documentation

◆ AllGather()

std::shared_ptr<std::vector<T> > TLX_ATTRIBUTE_WARN_UNUSED_RESULT AllGather ( const T &  value)
inline

◆ Barrier()

◆ ExPrefixSum()

T TLX_ATTRIBUTE_WARN_UNUSED_RESULT ExPrefixSum ( const T &  value,
const BinarySumOp &  sum_op = BinarySumOp(),
const T &  initial = T() 
)
inline

Calculates the exclusive prefix sum over all workers, given a certain sum operation.

This method blocks until the sum is calculated. Values are applied in worker order, that is, sum_op(sum_op(a, b), c) if a, b, c are the values of workers 0, 1, 2.

Parameters
value    The local value of this worker.
sum_op   The operation to use for calculating the prefix sum. The default operation is a normal addition.
initial  The initial element of the body defined by T and SumOp.
Returns
The prefix sum for the position of this worker.

Definition at line 329 of file flow_control_channel.hpp.

References FlowControlChannel::PrefixSumBase(), and TLX_ATTRIBUTE_WARN_UNUSED_RESULT.

Referenced by WriteLinesOneNode< ValueType >::Execute(), PrefixSumNode< ValueType, SumFunction, Inclusive >::Execute(), ZipWithIndexNode< ValueType, ZipFunction >::Execute(), OverlapWindowNode< ValueType, Input, WindowFunction, PartialWindowFunction >::Execute(), and DisjointWindowNode< ValueType, Input, WindowFunction, PartialWindowFunction >::Execute().
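
The ordering rule described for ExPrefixSum() can be illustrated with a purely sequential model. This is a minimal sketch, not Thrill code: ModelExPrefixSum is a hypothetical helper, values[i] stands for the local value of worker i, and it is an assumption that worker 0 receives initial unchanged.

```cpp
#include <functional>
#include <vector>

// Hypothetical helper, not part of Thrill: a sequential model of an
// exclusive prefix sum. values[i] plays the role of worker i's local value.
template <typename T, typename BinarySumOp = std::plus<T>>
std::vector<T> ModelExPrefixSum(const std::vector<T>& values,
                                const BinarySumOp& sum_op = BinarySumOp(),
                                const T& initial = T()) {
    std::vector<T> result;
    result.reserve(values.size());
    T running = initial;
    for (const T& v : values) {
        // Worker i sees only the contributions of workers 0..i-1
        // (assumption: worker 0 therefore sees just `initial`).
        result.push_back(running);
        running = sum_op(running, v);
    }
    return result;
}
```

With the default addition and worker values 3, 1, 4, 1, the model yields 0, 3, 4, 8: each worker's own value is excluded from its result.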

◆ GetLocalShared() [1/2]

T* GetLocalShared ( size_t  step,
size_t  idx 
)
inline private

◆ GetLocalShared() [2/2]

T* GetLocalShared ( size_t  step)
inline private

◆ GetNextStep()

◆ group()

Group& group ( )
inline

Return the associated net::Group. USE AT YOUR OWN RISK.

Definition at line 194 of file flow_control_channel.hpp.

References FlowControlChannel::group_.

Referenced by FlowControlChannel::AllGather(), and FlowControlChannel::GetLocalShared().

◆ LocalBarrier()

◆ my_rank()

size_t my_rank ( ) const
inline

◆ num_workers()

◆ operator=()

FlowControlChannel& operator= ( const FlowControlChannel & )
delete

non-copyable: delete assignment operator

Referenced by FlowControlChannel::num_workers().

◆ Predecessor()

std::vector<T> Predecessor ( size_t  k,
const std::vector< T > &  my_values 
)
inline

Collects up to k predecessors of type T from preceding PEs.

k must be equal on all PEs.

Assume each worker has at most k items. Predecessor collects up to k items from the preceding PEs. If the directly preceding PE has fewer than k items, then it waits for that PE's predecessor to deliver items, in the hope of getting up to k.

This is used by the Window() transformation, but may in future also be useful to get a single predecessor item in other distributed operations.

Definition at line 653 of file flow_control_channel.hpp.

References FlowControlChannel::AllReduce(), FlowControlChannel::Barrier(), FlowControlChannel::barrier_, FlowControlChannel::Broadcast(), FlowControlChannel::count_predecessor_, FlowControlChannel::debug, FlowControlChannel::enable_stats, FlowControlChannel::ExPrefixSumTotal(), FlowControlChannel::GetNextStep(), FlowControlChannel::group_, FlowControlChannel::host_rank_, FlowControlChannel::LocalData::IncCounter(), FlowControlChannel::local_id_, FlowControlChannel::LocalBarrier(), LOG, min(), FlowControlChannel::num_hosts_, FlowControlChannel::PrefixSumBase(), Group::ReceiveFrom(), Group::SendTo(), FlowControlChannel::SetLocalShared(), FlowControlChannel::thread_count_, FlowControlChannel::timer_predecessor_, ThreadBarrierSpin::wait(), and FlowControlChannel::LocalData::WaitCounter().

Referenced by OverlapWindowNode< ValueType, Input, WindowFunction, PartialWindowFunction >::Execute(), and DisjointWindowNode< ValueType, Input, WindowFunction, PartialWindowFunction >::Execute().
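
One reading of the Predecessor() description is that each worker receives the last up-to-k items that precede its own items in global worker order, reaching further back when the directly preceding PE holds fewer than k. The sketch below models that reading sequentially. ModelPredecessor is a hypothetical helper, not part of Thrill, and the exact semantics of the real method may differ.

```cpp
#include <cstddef>
#include <vector>

// Hypothetical helper, not part of Thrill: a sequential model of collecting
// up to k predecessor items. per_worker[i] holds the items of worker i and is
// assumed to contain at most k items.
template <typename T>
std::vector<std::vector<T>> ModelPredecessor(
    std::size_t k, const std::vector<std::vector<T>>& per_worker) {
    std::vector<std::vector<T>> result;
    result.reserve(per_worker.size());
    std::vector<T> window;  // last up-to-k items seen so far, in global order
    for (const std::vector<T>& items : per_worker) {
        result.push_back(window);  // what this worker would receive
        window.insert(window.end(), items.begin(), items.end());
        if (window.size() > k) {
            // Drop the oldest items so that only the last k remain.
            window.erase(window.begin(),
                         window.begin() +
                             static_cast<std::ptrdiff_t>(window.size() - k));
        }
    }
    return result;
}
```

For k = 2 and workers holding {1, 2}, {3}, {}, {4, 5}, the model gives {}, {1, 2}, {2, 3}, {2, 3}: worker 2 needs one item from worker 0 because worker 1 holds only one, and worker 3 skips over the empty worker 2 entirely.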

◆ PrefixSum()

T TLX_ATTRIBUTE_WARN_UNUSED_RESULT PrefixSum ( const T &  value,
const BinarySumOp &  sum_op = BinarySumOp(),
const T &  initial = T() 
)
inline

Calculates the inclusive prefix sum over all workers, given a certain sum operation.

This method blocks until the sum is calculated. Values are applied in worker order, that is, sum_op(sum_op(a, b), c) if a, b, c are the values of workers 0, 1, 2.

Parameters
value    The local value of this worker.
sum_op   The operation to use for calculating the prefix sum. The default operation is a normal addition.
initial  The initial element of the body defined by T and SumOp.
Returns
The prefix sum for the position of this worker.

Definition at line 308 of file flow_control_channel.hpp.

References FlowControlChannel::PrefixSumBase(), and TLX_ATTRIBUTE_WARN_UNUSED_RESULT.

Referenced by ConcatNode< ValueType >::Execute().
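
Because values are applied in worker order, the result of PrefixSum() is well defined even for operations that are not commutative. The following sequential model illustrates this with string concatenation. It is only a sketch: ModelPrefixSum is a hypothetical helper, not part of Thrill, and folding initial in before worker 0's value is an assumption.

```cpp
#include <functional>
#include <string>
#include <vector>

// Hypothetical helper, not part of Thrill: a sequential model of an
// inclusive prefix sum. values[i] plays the role of worker i's local value.
template <typename T, typename BinarySumOp = std::plus<T>>
std::vector<T> ModelPrefixSum(const std::vector<T>& values,
                              const BinarySumOp& sum_op = BinarySumOp(),
                              const T& initial = T()) {
    std::vector<T> result;
    result.reserve(values.size());
    T running = initial;  // assumption: combined before worker 0's value
    for (const T& v : values) {
        // Worker i's result includes its own value, applied after all
        // lower-ranked workers: sum_op(sum_op(a, b), c) for workers 0, 1, 2.
        running = sum_op(running, v);
        result.push_back(running);
    }
    return result;
}
```

With std::string values "a", "b", "c" and the default std::plus, the model yields "a", "ab", "abc", which shows the left-to-right application order.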

◆ Reduce()

T TLX_ATTRIBUTE_WARN_UNUSED_RESULT Reduce ( const T &  value,
size_t  root = 0,
const BinarySumOp &  sum_op = BinarySumOp() 
)
inline

Reduces a value of a serializable type T over all workers to the given worker, provided a certain reduce function.

This method is blocking. The reduce happens in order as with prefix sum. The operation is assumed to be associative.

Parameters
value    The value to use for the reduce operation.
root     The destination worker of the reduce.
sum_op   The operation to use for calculating the reduced value. The default operation is a normal addition.
Returns
The result of the reduce operation.

Definition at line 543 of file flow_control_channel.hpp.

References FlowControlChannel::barrier_, FlowControlChannel::count_reduce_, FlowControlChannel::debug, FlowControlChannel::enable_stats, FlowControlChannel::GetNextStep(), FlowControlChannel::group_, LOG, Group::my_host_rank(), FlowControlChannel::num_workers(), Group::Reduce(), FlowControlChannel::SetLocalShared(), FlowControlChannel::thread_count_, FlowControlChannel::timer_communication_, FlowControlChannel::timer_reduce_, TLX_ATTRIBUTE_WARN_UNUSED_RESULT, gen_data::value, and ThreadBarrierSpin::wait().

Referenced by Context::Launch().
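
The in-order reduce can be modelled as a left fold over the workers' values. The sketch below computes only the value that the root worker would receive. ModelReduce is a hypothetical helper, not part of Thrill, and what non-root workers obtain from the real method is not modelled here.

```cpp
#include <functional>
#include <numeric>
#include <string>
#include <vector>

// Hypothetical helper, not part of Thrill: a sequential model of the value
// delivered to the root by an in-order reduce. values[i] plays the role of
// worker i's local value; sum_op is assumed to be associative.
template <typename T, typename BinarySumOp = std::plus<T>>
T ModelReduce(const std::vector<T>& values,
              const BinarySumOp& sum_op = BinarySumOp()) {
    if (values.empty()) {
        return T();  // no workers: fall back to a default-constructed value
    }
    // Left fold in worker order, starting from worker 0's value.
    return std::accumulate(values.begin() + 1, values.end(),
                           values.front(), sum_op);
}
```

For worker values 1, 2, 3, 4 with the default addition the model returns 10. For the strings "x", "y", "z" it returns "xyz", since concatenation is associative but order-sensitive.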

◆ SetLocalShared()

Member Data Documentation

◆ barrier_

◆ count_allreduce_

common::AtomicMovable<size_t> count_allreduce_ { 0 }
private

◆ count_barrier_

common::AtomicMovable<size_t> count_barrier_ { 0 }
private

◆ count_broadcast_

common::AtomicMovable<size_t> count_broadcast_ { 0 }
private

◆ count_predecessor_

common::AtomicMovable<size_t> count_predecessor_ { 0 }
private

◆ count_prefixsum_

common::AtomicMovable<size_t> count_prefixsum_ { 0 }
private

◆ count_reduce_

◆ debug

◆ enable_stats

◆ generation_

std::atomic<size_t>& generation_
private

Host-global shared generation counter.

Definition at line 154 of file flow_control_channel.hpp.

◆ group_

◆ host_rank_

size_t host_rank_
private

The local host rank.

Definition at line 58 of file flow_control_channel.hpp.

Referenced by FlowControlChannel::ExPrefixSumTotal(), and FlowControlChannel::Predecessor().

◆ local_id_

size_t local_id_
private

◆ num_hosts_

size_t num_hosts_
private

The count of all hosts connected to this group.

Definition at line 61 of file flow_control_channel.hpp.

Referenced by FlowControlChannel::ExPrefixSumTotal(), and FlowControlChannel::Predecessor().

◆ shmem_

LocalData* shmem_
private

The global shared local data memory location to work upon.

Definition at line 151 of file flow_control_channel.hpp.

◆ thread_count_

◆ timer_allreduce_

Timer timer_allreduce_
private

◆ timer_barrier_

Timer timer_barrier_
private

◆ timer_broadcast_

Timer timer_broadcast_
private

◆ timer_communication_

◆ timer_predecessor_

Timer timer_predecessor_
private

◆ timer_prefixsum_

Timer timer_prefixsum_
private

◆ timer_reduce_


The documentation for this class was generated from the following files: