Excerpts from the FlowControlChannel header, thrill/net/flow_control_channel.hpp;
elided lines are marked with "...".

#ifndef THRILL_NET_FLOW_CONTROL_CHANNEL_HEADER
#define THRILL_NET_FLOW_CONTROL_CHANNEL_HEADER

#include <condition_variable>
// ...

static constexpr bool debug = false;
// ...

// struct LocalData: per-thread communication slots in host-shared memory
std::atomic<void*> ptr[2] = { { nullptr }, { nullptr } };
// ...
#if THRILL_HAVE_THREAD_SANITIZER
// ...
std::condition_variable cv;
// ...

// WaitCounter() and the step advance: spin on the atomic counter, or use the
// condition variable when built with ThreadSanitizer
#if THRILL_HAVE_THREAD_SANITIZER
std::unique_lock<std::mutex> lock(mutex);
// ...
while (counter.load(std::memory_order_relaxed) != this_step) { }
// ...
#if THRILL_HAVE_THREAD_SANITIZER
std::unique_lock<std::mutex> lock(mutex);
// ...

static_assert(/* ... */, "struct LocalData has incorrect size.");
// SetLocalShared(): publish a pointer to a thread-local value for the given step
template <typename T>
// ...
    shmem_[idx].ptr[step].store(
        const_cast<void*>(reinterpret_cast<const void*>(value)),
        std::memory_order_release);
// ...

// GetLocalShared(): read back the pointer published by thread idx for the given step
template <typename T>
// ...
    return reinterpret_cast<T*>(
        shmem_[idx].ptr[step].load(std::memory_order_acquire));
// ...

template <typename T>
// ...

// constructor parameter list (the full signature appears in the reference list below)
    Group& group, size_t local_id, size_t thread_count,
    /* ... */
    std::atomic<size_t>& generation);
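The SetLocalShared()/GetLocalShared() excerpts above publish a raw pointer to a value
that lives on the calling thread's stack; the type is re-established only by the
reader's cast, and the pointee's lifetime is protected by the surrounding barrier and
generation protocol rather than by the slot itself. A minimal standalone sketch of
that pointer-publishing pattern (illustrative names, not the header's API):

#include <atomic>
#include <cassert>

template <typename T>
void publish(std::atomic<void*>& slot, const T* value) {
    // store a pointer to the caller's value; release pairs with the acquire below
    slot.store(const_cast<void*>(reinterpret_cast<const void*>(value)),
               std::memory_order_release);
}

template <typename T>
T* consume(std::atomic<void*>& slot) {
    // read the published pointer back and cast it to the agreed-upon type
    return reinterpret_cast<T*>(slot.load(std::memory_order_acquire));
}

int main() {
    std::atomic<void*> slot{ nullptr };
    int local_value = 7;             // in the real channel this lives on another thread's stack
    publish(slot, &local_value);
    int* seen = consume<int>(slot);  // a peer would do this only after the step barrier
    assert(seen != nullptr && *seen == 7);
    return 0;
}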
#define TLX_ATTRIBUTE_WARN_UNUSED_RESULT
// ...

// PrefixSumBase(): shared-memory kernel behind the prefix-sum collectives
template <typename T, typename BinarySumOp = std::plus<T> >
T TLX_ATTRIBUTE_WARN_UNUSED_RESULT PrefixSumBase(
    const T& value, const BinarySumOp& sum_op = BinarySumOp(),
    const T& initial = T(), bool inclusive = true) {
    // ...
    LOG << "FCC::PrefixSum() COMMUNICATE BEGIN" /* ... */;
    // ...
    locals[i] = GetLocalShared<T>(step, i);
    // ...
    T local_sum = *(locals[0]);
    // ...
    *(locals[i]) = local_sum = sum_op(local_sum, *(locals[i]));
    // ...
    T base_sum = local_sum;
    // ...
    // inclusive result: combine the base with each thread's running sum
    *(locals[i]) = sum_op(base_sum, *(locals[i]));
    // ...
    // exclusive result: shift by one thread slot
    for (size_t i = thread_count_ - 1; i > 0; i--) {
        *(locals[i]) = sum_op(base_sum, *(locals[i - 1]));
    // ...
    *(locals[0]) = base_sum;
    // ...
    LOG << "FCC::PrefixSum() COMMUNICATE END" /* ... */;
    // ...
}

// PrefixSum(): inclusive prefix sum, forwards to PrefixSumBase()
template <typename T, typename BinarySumOp = std::plus<T> >
T TLX_ATTRIBUTE_WARN_UNUSED_RESULT
PrefixSum(const T& value, const BinarySumOp& sum_op = BinarySumOp(),
          const T& initial = T()) {
    // ...
}

// ExPrefixSum(): exclusive prefix sum, forwards to PrefixSumBase()
template <typename T, typename BinarySumOp = std::plus<T> >
T TLX_ATTRIBUTE_WARN_UNUSED_RESULT ExPrefixSum(
    const T& value, const BinarySumOp& sum_op = BinarySumOp(),
    const T& initial = T()) {
    // ...
}
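A hedged usage sketch of the prefix-sum collectives. It assumes a per-thread
FlowControlChannel reference (here called fcc) handed out by the framework and the
usual thrill/net/flow_control_channel.hpp include path; the function and variable
names are illustrative only:

#include <thrill/net/flow_control_channel.hpp>  // assumed include path
#include <cstddef>

// Called concurrently by every worker thread with its own local count.
void prefix_sum_example(thrill::net::FlowControlChannel& fcc, std::size_t my_item_count) {
    const std::size_t items = my_item_count;

    // Inclusive prefix sum: worker i receives the sum of the counts of workers 0..i.
    std::size_t inclusive = fcc.PrefixSum(items);

    // Exclusive prefix sum: worker i receives the sum of the counts of workers
    // 0..i-1 (worker 0 receives the `initial` value, size_t() == 0 by default).
    std::size_t exclusive = fcc.ExPrefixSum(items);

    // Any associative operation can be plugged in, e.g. a running maximum.
    std::size_t running_max = fcc.PrefixSum(
        items, [](std::size_t a, std::size_t b) { return a > b ? a : b; });

    (void)inclusive; (void)exclusive; (void)running_max;
}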
// ExPrefixSumTotal(): exclusive prefix sum that also returns the global total
template <typename T, typename BinarySumOp = std::plus<T> >
T TLX_ATTRIBUTE_WARN_UNUSED_RESULT ExPrefixSumTotal(
    T& value, const BinarySumOp& sum_op = BinarySumOp(),
    const T& initial = T()) {
    // ...
    using Result = std::pair<T*, T>;
    // ...
    Result result { &value, initial };
    // ...
    LOG << "FCC::ExPrefixSumTotal() COMMUNICATE BEGIN" /* ... */;
    // ...
    Result** locals = reinterpret_cast<Result**>(/* ... */);
    // ...
    locals[i] = GetLocalShared<Result>(step, i);
    // ...
    T local_sum = *(locals[0]->first);
    // ...
    local_sum = sum_op(local_sum, *(locals[i]->first));
    *(locals[i]->first) = local_sum;
    // ...
    T base_sum = local_sum;
    // ...
    total_sum = sum_op(base_sum, local_sum);
    // ...
    for (size_t i = thread_count_ - 1; i > 0; --i) {
        *(locals[i]->first) = sum_op(base_sum, *(locals[i - 1]->first));
        locals[i]->second = total_sum;
    // ...
    *(locals[0]->first) = base_sum;
    locals[0]->second = total_sum;
    // ...
    LOG << "FCC::ExPrefixSumTotal() COMMUNICATE END" /* ... */;
    // ...
    return result.second;
}
// Broadcast(): distribute the value of worker `origin` to all workers
template <typename T>
T TLX_ATTRIBUTE_WARN_UNUSED_RESULT Broadcast(const T& value, size_t origin = 0) {
    // ...
    LOG << "FCC::Broadcast() COMMUNICATE BEGIN" /* ... */;
    // ...
    T res = *GetLocalShared<T>(step, primary_pe);
    // ...
    *GetLocalShared<T>(step, i) = res;
    // ...
    LOG << "FCC::Broadcast() COMMUNICATE END" /* ... */;
    // ...
}

// AllGather(): collect one value per worker into a vector shared by all workers
template <typename T>
std::shared_ptr<std::vector<T> > TLX_ATTRIBUTE_WARN_UNUSED_RESULT AllGather(const T& value) {
    // ...
    using SharedVectorT = std::shared_ptr<std::vector<T> >;
    // ...
    std::pair<T, SharedVectorT> local(value, sp);
    // ...
    auto local_gather = std::make_shared<std::vector<T> >(n);
    // ...
    local_gather->at(i) =
        GetLocalShared<std::pair<T, SharedVectorT> >(step, i)->first;
    // ...
    GetLocalShared<std::pair<T, SharedVectorT> >(step, i)->second = local_gather;
    // ...
}
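A usage sketch for Broadcast() and AllGather(), again with an assumed fcc reference
and illustrative names; the ordering comment reflects a natural reading of the
excerpt rather than a documented guarantee:

#include <thrill/net/flow_control_channel.hpp>  // assumed include path
#include <cstddef>
#include <memory>
#include <vector>

void broadcast_gather_example(thrill::net::FlowControlChannel& fcc) {
    // Broadcast: every worker passes a value, everyone receives the value of worker 0.
    const std::size_t my_rank = fcc.my_rank();
    std::size_t rank_of_origin = fcc.Broadcast(my_rank, /* origin = */ 0);

    // AllGather: every worker contributes one element; all workers receive the whole
    // collection via a shared_ptr, presumably ordered by worker rank.
    std::shared_ptr<std::vector<std::size_t> > all = fcc.AllGather(my_rank);
    // all->size() should equal fcc.num_workers()

    (void)rank_of_origin; (void)all;
}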
// Reduce(): combine values from all workers at the worker `root`
template <typename T, typename BinarySumOp = std::plus<T> >
T TLX_ATTRIBUTE_WARN_UNUSED_RESULT Reduce(const T& value, size_t root = 0,
                                          const BinarySumOp& sum_op = BinarySumOp()) {
    // ...
    LOG << "FCC::Reduce() COMMUNICATE BEGIN" /* ... */;
    // ...
    T local_sum = *GetLocalShared<T>(step, 0);
    // ...
    local_sum = sum_op(local_sum, *GetLocalShared<T>(step, i));
    // ...
    *GetLocalShared<T>(step, root % thread_count_) = local_sum;
    // ...
    LOG << "FCC::Reduce() COMMUNICATE END" /* ... */;
    // ...
}

// AllReduce(): combine values from all workers and deliver the result to everyone
template <typename T, typename BinarySumOp = std::plus<T> >
T TLX_ATTRIBUTE_WARN_UNUSED_RESULT
AllReduce(const T& value, const BinarySumOp& sum_op = BinarySumOp()) {
    // ...
    LOG << "FCC::AllReduce() COMMUNICATE BEGIN" /* ... */;
    // ...
    T local_sum = *GetLocalShared<T>(step, 0);
    // ...
    local_sum = sum_op(local_sum, *GetLocalShared<T>(step, i));
    // ...
    *GetLocalShared<T>(step, i) = local_sum;
    // ...
    LOG << "FCC::AllReduce() COMMUNICATE END" /* ... */;
    // ...
}
// Predecessor(): collect up to k items from the preceding workers
template <typename T>
std::vector<T> Predecessor(size_t k, const std::vector<T>& my_values) {
    // ...
    std::vector<T> result;
    // ...
    std::vector<T> send_values;
    // ...
    size_t this_gen = generation_.load(std::memory_order_acquire) + 1;
    // ...
    // case 1: this worker already owns at least k items and forwards its last k
    if (my_values.size() >= k) {
        // ...
        std::atomic_thread_fence(std::memory_order_release);
        // ...
        if (my_values.size() > k) {
            std::vector<T> send_values_next(my_values.end() - k, my_values.end());
            // ...
        // ...
        std::atomic_thread_fence(std::memory_order_acquire);
        // ...
        std::vector<T>* pre =
            GetLocalShared<std::vector<T> >(step, local_id_ - 1);
        // ...
        result = std::vector<T>(
            pre->size() <= k ? pre->begin() : pre->end() - k, pre->end());
    // ...
    // case 2: fewer than k local items, so items received from the predecessor are
    // forwarded as well
        // ...
        std::atomic_thread_fence(std::memory_order_acquire);
        // ...
        std::vector<T>* pre =
            GetLocalShared<std::vector<T> >(step, local_id_ - 1);
        // ...
        result = std::vector<T>(
            pre->size() <= k ? pre->begin() : pre->end() - k, pre->end());
        // ...
        size_t fill_size = k - my_values.size();
        send_values.reserve(std::min(k, fill_size + result.size()));
        // ...
        send_values.insert(/* ... */
            result.size() <= fill_size ? result.begin() : result.end() - fill_size,
            /* ... */);
        send_values.insert(send_values.end(),
                           my_values.begin(), my_values.end());
        assert(send_values.size() <= k);
        // ...
        std::atomic_thread_fence(std::memory_order_release);
    // ...
    LOG << "FCC::Predecessor() COMMUNICATE" /* ... */;
    // ...
}
// template instantiation declarations for frequently used types (excerpt; the
// argument lists below match PrefixSumBase, ExPrefixSumTotal, Broadcast and
// AllReduce, respectively)
#if !defined(_MSC_VER)
// PrefixSumBase:
    /* ... */(const size_t&, const std::plus<size_t>&, const size_t&, bool);
    /* ... */(const std::array<size_t, 2>&, /* ... */
              const std::array<size_t, 2>&, bool);
    /* ... */(const std::array<size_t, 3>&, /* ... */
              const std::array<size_t, 3>&, bool);
    /* ... */(const std::array<size_t, 4>&, /* ... */
              const std::array<size_t, 4>&, bool);
// ExPrefixSumTotal:
    /* ... */(size_t&, const std::plus<size_t>&, const size_t&);
    /* ... */(std::array<size_t, 2>&, /* ... */
              const std::array<size_t, 2>&);
    /* ... */(std::array<size_t, 3>&, /* ... */
              const std::array<size_t, 3>&);
    /* ... */(std::array<size_t, 4>&, /* ... */
              const std::array<size_t, 4>&);
// Broadcast:
    /* ... */(const std::array<size_t, 2>&, size_t);
    /* ... */(const std::array<size_t, 3>&, size_t);
    /* ... */(const std::array<size_t, 4>&, size_t);
// AllReduce:
    /* ... */(const size_t&, const std::plus<size_t>&);
#endif // !defined(_MSC_VER)

// ...

#endif // !THRILL_NET_FLOW_CONTROL_CHANNEL_HEADER
Declarations referenced by this header, with their brief descriptions:

Class and construction:

    FlowControlChannel
        Provides a blocking collection for communication.
    FlowControlChannel(Group& group, size_t local_id, size_t thread_count,
                       common::ThreadBarrier& barrier, LocalData* shmem,
                       std::atomic<size_t>& generation)
        Creates a new instance of this class, wrapping a net::Group.
    FlowControlChannel& operator=(const FlowControlChannel&) = delete
        non-copyable: deleted assignment operator.

Collective operations:

    T TLX_ATTRIBUTE_WARN_UNUSED_RESULT PrefixSumBase(const T& value,
        const BinarySumOp& sum_op = BinarySumOp(), const T& initial = T(),
        bool inclusive = true)
        Calculates the prefix sum over all workers, given a certain sum operation.
    T TLX_ATTRIBUTE_WARN_UNUSED_RESULT PrefixSum(const T& value,
        const BinarySumOp& sum_op = BinarySumOp(), const T& initial = T())
        Calculates the inclusive prefix sum over all workers, given a certain sum
        operation.
    T TLX_ATTRIBUTE_WARN_UNUSED_RESULT ExPrefixSum(const T& value,
        const BinarySumOp& sum_op = BinarySumOp(), const T& initial = T())
        Calculates the exclusive prefix sum over all workers, given a certain sum
        operation.
    void ExPrefixSum(T& value, BinarySumOp sum_op = BinarySumOp(),
        const T& initial = T())
        Calculate exclusive prefix sum (in-place variant).
    T TLX_ATTRIBUTE_WARN_UNUSED_RESULT ExPrefixSumTotal(T& value,
        const BinarySumOp& sum_op = BinarySumOp(), const T& initial = T())
        Calculates the exclusive prefix sum over all workers, and delivers the total
        sum as well.
    T TLX_ATTRIBUTE_WARN_UNUSED_RESULT Broadcast(const T& value, size_t origin = 0)
        Broadcasts a value of a serializable type T from the master (the worker with
        id 0) to all other workers.
    void Broadcast(T& value, size_t origin = 0)
        Broadcast a value from the worker "origin" (in-place variant).
    std::shared_ptr<std::vector<T> > TLX_ATTRIBUTE_WARN_UNUSED_RESULT
        AllGather(const T& value)
        Gathers the value of a serializable type T over all workers and provides the
        result to all workers as a shared pointer.
    T TLX_ATTRIBUTE_WARN_UNUSED_RESULT Reduce(const T& value, size_t root = 0,
        const BinarySumOp& sum_op = BinarySumOp())
        Reduces a value of a serializable type T over all workers to the given
        worker, provided a certain reduce function.
    void Reduce(T& value, size_t root = 0, BinarySumOp sum_op = BinarySumOp())
        Reduce a value from all workers to the worker 0 (in-place variant).
    T TLX_ATTRIBUTE_WARN_UNUSED_RESULT AllReduce(const T& value,
        const BinarySumOp& sum_op = BinarySumOp())
        Reduces a value of a serializable type T over all workers given a certain
        reduce function.
    void AllReduce(T& value, BinarySumOp sum_op = BinarySumOp())
        Reduce a value from all workers to all workers (in-place variant).
    std::vector<T> Predecessor(size_t k, const std::vector<T>& my_values)
        Collects up to k predecessors of type T from preceding PEs.
    void Barrier()
        A trivial global barrier.
    void LocalBarrier()
        A trivial local thread barrier.

Rank and size queries:

    size_t my_rank() const
        Return the worker's global rank.
    size_t num_workers() const
        Return the total number of workers.
    Group& group()
        Return the associated net::Group. USE AT YOUR OWN RISK.

Shared-memory helpers and data members:

    void SetLocalShared(size_t step, const T* value)
    T* GetLocalShared(size_t step, size_t idx)
    T* GetLocalShared(size_t step)
    size_t step() const
        Return generation step counter.
    void WaitCounter(size_t this_step)
    std::atomic<void*> ptr[2]
        pointer to some thread-owned data type.
    std::atomic<size_t> counter
        atomic generation counter, compare this to generation_.
    LocalData* shmem_
        The global shared local data memory location to work upon.
    std::atomic<size_t>& generation_
        Host-global shared generation counter.
    Group& group_
        The group associated with this channel.
    common::ThreadBarrier& barrier_
    size_t host_rank_
        The local host rank.
    size_t num_hosts_
        The count of all workers connected to this group.
    size_t thread_count_
        The count of all workers connected to this group.
    size_t local_id_
        The id of the worker thread associated with this flow channel.
    static constexpr bool debug
    static constexpr bool enable_stats

Statistics counters and timers:

    common::AtomicMovable<size_t> count_prefixsum_
        Synchronization counters.
    common::AtomicMovable<size_t> count_broadcast_
    common::AtomicMovable<size_t> count_reduce_
    common::AtomicMovable<size_t> count_allreduce_
    common::AtomicMovable<size_t> count_predecessor_
    common::AtomicMovable<size_t> count_barrier_
    Timer timer_prefixsum_
        Synchronization timer.
    Timer timer_communication_

Referenced helpers from other components:

    #define LOG
        Default logging method: output if the local debug variable is true.
    #define TLX_ATTRIBUTE_WARN_UNUSED_RESULT
    net::Group
        A network Group is a collection of enumerated communication links, which
        provides point-to-point communication.
    virtual size_t num_hosts() const = 0
        Return number of connections in this group (= number of computing hosts).
    size_t my_host_rank() const
        Return our rank among hosts in this group.
    void SendTo(size_t dest, const T& data)
        Sends a serializable type to the given peer.
    void ReceiveFrom(size_t src, T* data)
        Receives a serializable type from the given peer.
    void AllGatherBruck(T* values, size_t n)
    void AllGatherRecursiveDoublingPowerOfTwo(T* values, size_t n)
    common::ThreadBarrier
        Implements a thread barrier using atomics and a spin lock that can be used to
        synchronize threads.
    void wait(Lambda lambda = Lambda())
        Waits for n threads to arrive.
    static bool is_power_of_two(int i)
        does what it says: true if i is a power of two.
    static constexpr unsigned g_cache_line_size
        Finding cache line size is hard - we assume 64 bytes.
    static uint_pair min()
        return a uint_pair instance containing the smallest value possible.
    (timer helper, declaration not captured)
        RAII class for running the timer until destruction.
    (sum-operation helper, declaration not captured)
        template for computing the component-wise sum of std::array or std::vector.