Thrill  0.1
flow_control_channel.hpp
1 /*******************************************************************************
2  * thrill/net/flow_control_channel.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Emanuel Jöbstl <[email protected]>
7  * Copyright (C) 2015 Timo Bingmann <[email protected]>
8  *
9  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10  ******************************************************************************/
11 
12 #pragma once
13 #ifndef THRILL_NET_FLOW_CONTROL_CHANNEL_HEADER
14 #define THRILL_NET_FLOW_CONTROL_CHANNEL_HEADER
15 
20 #include <thrill/net/group.hpp>
21 
22 #include <algorithm>
23 #include <array>
24 #include <condition_variable>
25 #include <functional>
26 #include <mutex>
27 #include <string>
28 #include <utility>
29 #include <vector>
30 
31 namespace thrill {
32 namespace net {
33 
34 //! \addtogroup net_layer
35 //! \{
36 
37 /*!
38  * Provides a blocking collection for communication.
39  *
40  * This wraps a raw net group, adds multi-worker/thread support, and should be
41  * used for flow control with integral types.
42  *
43  * Important notice on threading: two different threads must not call
44  * methods of two different instances of FlowControlChannel simultaneously,
45  * since the internal synchronization state (the barrier) is shared
46  * globally.
47  */
48 class FlowControlChannel
49 {
50 private:
51  static constexpr bool debug = false;
52  static constexpr bool enable_stats = false;
53 
54  //! The group associated with this channel.
55  Group& group_;
56 
57  //! The local host rank.
58  size_t host_rank_;
59 
60  //! The count of all hosts connected to this group.
61  size_t num_hosts_;
62 
63  //! The id of the worker thread associated with this flow channel.
64  size_t local_id_;
65 
66  //! The count of worker threads on each host of this group.
67  size_t thread_count_;
68 
69  //! Timer or FakeTimer
71  //! RAII class for running the timer
73 
74  //! Synchronization timer
81 
83 
84  //! Synchronization counters
91 
92  //! The shared barrier used to synchronize between worker threads on this
93  //! node.
94  common::ThreadBarrier& barrier_;
95 
96  //! Thread local data structure: aligned such that no cache line is
97  //! shared. The actual vector is in the FlowControlChannelManager.
98  class LocalData
99  {
100  public:
101  //! pointer to some thread-owned data type
103  std::atomic<void*> ptr[2] = { { nullptr }, { nullptr } };
104 
105  //! atomic generation counter, compare this to generation_.
106  std::atomic<size_t> counter { 0 };
107 
108 #if THRILL_HAVE_THREAD_SANITIZER
109  // workarounds because ThreadSanitizer reports false positives when
110  // working with generation counters.
111 
112  //! mutex for locking condition variable
113  std::mutex mutex;
114 
115  //! condition variable for signaling incrementing of counter.
116  std::condition_variable cv;
117 #endif
118 
119  //! \name Generation Counting
120  //! \{
121 
122  void WaitCounter(size_t this_step) {
123 #if THRILL_HAVE_THREAD_SANITIZER
124  std::unique_lock<std::mutex> lock(mutex);
125  while (counter != this_step)
126  cv.wait(lock);
127 #else
128  // busy wait on generation counter of predecessor
129  while (counter.load(std::memory_order_relaxed) != this_step) { }
130 #endif
131  }
132 
133  void IncCounter() {
134  ++counter;
135 #if THRILL_HAVE_THREAD_SANITIZER
136  std::unique_lock<std::mutex> lock(mutex);
137  cv.notify_one();
138 #endif
139  }
140 
141  //! \}
142  };
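
 /* A quick illustration of the generation-counter handshake that WaitCounter()
  * and IncCounter() provide: a producing thread publishes a pointer for the
  * current step and then bumps its counter; the consuming thread spins (or
  * blocks under ThreadSanitizer) until the counter reaches the expected
  * generation before it dereferences the pointer. Minimal sketch, assuming
  * `shmem` points to the per-thread LocalData array, `p` is the producer's
  * thread id, and `gen` is the expected generation:
  *
  * \code
  * // producer (thread p): publish data for this step, then signal
  * shmem[p].ptr[step].store(&my_data, std::memory_order_release);
  * shmem[p].IncCounter();
  *
  * // consumer (thread p + 1): wait for generation gen, then read
  * shmem[p].WaitCounter(gen);
  * T* data = reinterpret_cast<T*>(
  *     shmem[p].ptr[step].load(std::memory_order_acquire));
  * \endcode
  *
  * Predecessor() below uses exactly this pattern between neighboring threads.
  */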
143 
144  static_assert(sizeof(LocalData) % common::g_cache_line_size == 0,
145  "struct LocalData has incorrect size.");
146 
147  //! for access to struct LocalData
148  friend class FlowControlChannelManager;
149 
150  //! The global shared local data memory location to work upon.
151  LocalData* shmem_;
152 
153  //! Host-global shared generation counter
154  std::atomic<size_t>& generation_;
155 
156  //! \name Pointer Casting
157  //! \{
158 
159  size_t GetNextStep() {
160  return (barrier_.step() + 1) % 2;
161  }
162 
163  template <typename T>
164  void SetLocalShared(size_t step, const T* value) {
165  // We are only allowed to set our own memory location.
166  size_t idx = local_id_;
167  shmem_[idx].ptr[step].store(
168  const_cast<void*>(reinterpret_cast<const void*>(value)),
169  std::memory_order_release);
170  }
171 
172  template <typename T>
173  T * GetLocalShared(size_t step, size_t idx) {
174  assert(idx < thread_count_);
175  return reinterpret_cast<T*>(
176  shmem_[idx].ptr[step].load(std::memory_order_acquire));
177  }
178 
179  template <typename T>
180  T * GetLocalShared(size_t step) {
181  return GetLocalShared<T>(step, local_id_);
182  }
183 
184  //! \}
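
 /* All collectives below follow the same node-local skeleton: every thread
  * publishes a pointer to its stack-local value via SetLocalShared(), then
  * enters barrier_.wait() with a lambda. The barrier runs that lambda exactly
  * once per host; it reads every thread's pointer via GetLocalShared(),
  * combines the values in place and performs one network collective on
  * group_. After the barrier releases, each thread reads back its (possibly
  * updated) local value. Condensed sketch, with combine() as a placeholder
  * for the per-collective logic:
  *
  * \code
  * T local_value = value;
  * size_t step = GetNextStep();
  * SetLocalShared(step, &local_value);
  * barrier_.wait([&]() {
  *     for (size_t i = 0; i < thread_count_; i++)
  *         combine(*GetLocalShared<T>(step, i));
  *     // ... one collective on group_ ...
  * });
  * return local_value;
  * \endcode
  */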
185 
186 public:
187  //! Creates a new instance of this class, wrapping a net::Group.
188  FlowControlChannel(
189  Group& group, size_t local_id, size_t thread_count,
190  common::ThreadBarrier& barrier, LocalData* shmem,
191  std::atomic<size_t>& generation);
192 
193  //! Return the associated net::Group. USE AT YOUR OWN RISK.
194  Group& group() { return group_; }
195 
196  //! Return the worker's global rank
197  size_t my_rank() const {
198  return group_.my_host_rank() * thread_count_ + local_id_;
199  }
200 
201  //! Return the total number of workers.
202  size_t num_workers() const {
203  return group_.num_hosts() * thread_count_;
204  }
205 
206  //! non-copyable: delete copy-constructor
207  FlowControlChannel(const FlowControlChannel&) = delete;
208  //! non-copyable: delete assignment operator
209  FlowControlChannel& operator=(const FlowControlChannel&) = delete;
210  //! move-constructor: default
211  FlowControlChannel(FlowControlChannel&&) = default;
212 
214 
215 #ifdef SWIG
216 #define TLX_ATTRIBUTE_WARN_UNUSED_RESULT
217 #endif
218 
219  /*!
220  * Calculates the prefix sum over all workers, given a certain sum
221  * operation.
222  *
223  * This method blocks until the sum is calculated. Values are combined in
224  * worker order, i.e. as sum_op(sum_op(a, b), c) if a, b, c are the values
225  * of workers 0, 1, 2.
226  *
227  * \param value The local value of this worker.
228  * \param initial The initial (neutral) element of the sum operation over T.
229  * \param sum_op The operation to use for calculating the prefix sum. The
230  * default operation is a normal addition.
231  * \param inclusive Whether the prefix sum is inclusive or exclusive.
232  * \return The prefix sum for the position of this worker.
233  */
234  template <typename T, typename BinarySumOp = std::plus<T> >
235  T TLX_ATTRIBUTE_WARN_UNUSED_RESULT
236  PrefixSumBase(const T& value, const BinarySumOp& sum_op = BinarySumOp(),
237  const T& initial = T(), bool inclusive = true) {
238 
239  RunTimer run_timer(timer_prefixsum_);
240  if (enable_stats || debug) ++count_prefixsum_;
241  LOG << "FCC::PrefixSum() ENTER count=" << count_prefixsum_;
242 
243  T local_value = value;
244 
245  size_t step = GetNextStep();
246 
247  SetLocalShared(step, &local_value);
248 
249  barrier_.wait(
250  [&]() {
251  RunTimer net_timer(timer_communication_);
252 
253  LOG << "FCC::PrefixSum() COMMUNICATE BEGIN"
254  << " count=" << count_prefixsum_;
255 
256  // global prefix
257  T** locals = reinterpret_cast<T**>(alloca(thread_count_ * sizeof(T*)));
258 
259  for (size_t i = 0; i < thread_count_; i++) {
260  locals[i] = GetLocalShared<T>(step, i);
261  }
262 
263  T local_sum = *(locals[0]);
264  for (size_t i = 1; i < thread_count_; i++) {
265  *(locals[i]) = local_sum = sum_op(local_sum, *(locals[i]));
266  }
267 
268  T base_sum = local_sum;
269  group_.ExPrefixSum(base_sum, sum_op, initial);
270 
271  if (inclusive) {
272  for (size_t i = 0; i < thread_count_; i++) {
273  *(locals[i]) = sum_op(base_sum, *(locals[i]));
274  }
275  }
276  else {
277  for (size_t i = thread_count_ - 1; i > 0; i--) {
278  *(locals[i]) = sum_op(base_sum, *(locals[i - 1]));
279  }
280  *(locals[0]) = base_sum;
281  }
282 
283  LOG << "FCC::PrefixSum() COMMUNICATE END"
284  << " count=" << count_prefixsum_;
285  });
286 
287  LOG << "FCC::PrefixSum() EXIT count=" << count_prefixsum_;
288 
289  return local_value;
290  }
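
 /* Worked example for PrefixSumBase() with the default std::plus<size_t> and
  * initial = 0, assuming four workers contribute the values 3, 1, 4, 1 and
  * `fcc` denotes this worker thread's FlowControlChannel:
  *
  * \code
  * // inclusive = true:  workers 0..3 receive 3, 4, 8, 9
  * // inclusive = false: workers 0..3 receive 0, 3, 4, 8
  * size_t inc = fcc.PrefixSumBase(my_value, std::plus<size_t>(), size_t(0), true);
  * size_t exc = fcc.PrefixSumBase(my_value, std::plus<size_t>(), size_t(0), false);
  * \endcode
  *
  * The numbers follow directly from applying sum_op in worker order.
  */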
291 
292  /*!
293  * Calculates the inclusive prefix sum over all workers, given a certain sum
294  * operation.
295  *
296  * This method blocks until the sum is calculated. Values are combined in
297  * worker order, i.e. as sum_op(sum_op(a, b), c) if a, b, c are the values
298  * of workers 0, 1, 2.
299  *
300  * \param value The local value of this worker.
301  * \param sum_op The operation to use for calculating the prefix sum. The
302  * default operation is a normal addition.
303  * \param initial The initial (neutral) element of the sum operation over T.
304  * \return The prefix sum for the position of this worker.
305  */
306  template <typename T, typename BinarySumOp = std::plus<T> >
307  T TLX_ATTRIBUTE_WARN_UNUSED_RESULT
308  PrefixSum(const T& value, const BinarySumOp& sum_op = BinarySumOp(),
309  const T& initial = T()) {
310  return PrefixSumBase(value, sum_op, initial, true);
311  }
312 
313  /*!
314  * Calculates the exclusive prefix sum over all workers, given a certain sum
315  * operation.
316  *
317  * This method blocks until the sum is calculated. Values are combined in
318  * worker order, i.e. as sum_op(sum_op(a, b), c) if a, b, c are the values
319  * of workers 0, 1, 2.
320  *
321  * \param value The local value of this worker.
322  * \param sum_op The operation to use for calculating the prefix sum. The
323  * default operation is a normal addition.
324  * \param initial The initial (neutral) element of the sum operation over T.
325  * \return The prefix sum for the position of this worker.
326  */
327  template <typename T, typename BinarySumOp = std::plus<T> >
328  T TLX_ATTRIBUTE_WARN_UNUSED_RESULT
329  ExPrefixSum(const T& value, const BinarySumOp& sum_op = BinarySumOp(),
330  const T& initial = T()) {
331  return PrefixSumBase(value, sum_op, initial, false);
332  }
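
 /* Usage sketch: computing global output offsets from per-worker item counts,
  * assuming `fcc` denotes this worker thread's FlowControlChannel and `items`
  * is its local std::vector of results:
  *
  * \code
  * size_t local_count = items.size();
  * // index of this worker's first item in the global output order
  * size_t my_begin = fcc.ExPrefixSum(local_count);
  * // one past the index of this worker's last item
  * size_t my_end = fcc.PrefixSum(local_count);
  * \endcode
  */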
333 
334  /*!
335  * Calculates the exclusive prefix sum over all workers, and delivers the
336  * total sum as well. The input value parameter is set to the PE's exclusive
337  * prefix sum value and the total sum is returned.
338  *
339  * This method blocks until the sum is calculated. Values are combined in
340  * worker order, i.e. as sum_op(sum_op(a, b), c) if a, b, c are the values
341  * of workers 0, 1, 2.
342  *
343  * \param value The local value of this worker, overwritten with its
344  * exclusive prefix sum.
345  * \param sum_op The operation used for the prefix sum; defaults to addition.
346  * \param initial The initial (neutral) element of the sum operation over T.
347  * \return The total sum over all workers.
348  */
349  template <typename T, typename BinarySumOp = std::plus<T> >
350  T TLX_ATTRIBUTE_WARN_UNUSED_RESULT
351  ExPrefixSumTotal(T& value, const BinarySumOp& sum_op = BinarySumOp(),
352  const T& initial = T()) {
353 
354  RunTimer run_timer(timer_prefixsum_);
355  if (enable_stats || debug) ++count_prefixsum_;
356  LOG << "FCC::ExPrefixSumTotal() ENTER count=" << count_prefixsum_;
357 
358  using Result = std::pair<T*, T>;
359 
360  Result result { &value, initial };
361  size_t step = GetNextStep();
362  SetLocalShared(step, &result);
363 
364  barrier_.wait(
365  [&]() {
366  RunTimer net_timer(timer_communication_);
367 
368  LOG << "FCC::ExPrefixSumTotal() COMMUNICATE BEGIN"
369  << " count=" << count_prefixsum_;
370 
371  Result** locals = reinterpret_cast<Result**>(
372  alloca(thread_count_ * sizeof(Result*)));
373 
374  for (size_t i = 0; i < thread_count_; ++i) {
375  locals[i] = GetLocalShared<Result>(step, i);
376  }
377 
378  T local_sum = *(locals[0]->first);
379  for (size_t i = 1; i < thread_count_; ++i) {
380  local_sum = sum_op(local_sum, *(locals[i]->first));
381  *(locals[i]->first) = local_sum;
382  }
383 
384  T base_sum = local_sum;
385  group_.ExPrefixSum(base_sum, sum_op, initial);
386 
387  T total_sum;
388  if (host_rank_ + 1 == num_hosts_)
389  total_sum = sum_op(base_sum, local_sum);
390  group_.Broadcast(total_sum, num_hosts_ - 1);
391 
392  for (size_t i = thread_count_ - 1; i > 0; --i) {
393  *(locals[i]->first) = sum_op(base_sum, *(locals[i - 1]->first));
394  locals[i]->second = total_sum;
395  }
396  *(locals[0]->first) = base_sum;
397  locals[0]->second = total_sum;
398 
399  LOG << "FCC::ExPrefixSumTotal() COMMUNICATE END"
400  << " count=" << count_prefixsum_;
401  });
402 
403  LOG << "FCC::ExPrefixSumTotal() EXIT count=" << count_prefixsum_;
404 
405  return result.second;
406  }
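
 /* Usage sketch: ExPrefixSumTotal() delivers the offset and the total in a
  * single synchronization, where a separate ExPrefixSum() plus AllReduce()
  * would need two. Assuming `fcc` denotes this worker thread's
  * FlowControlChannel:
  *
  * \code
  * size_t offset = items.size();                 // input: local item count
  * size_t total = fcc.ExPrefixSumTotal(offset);  // offset now holds this
  *                                               // worker's exclusive sum
  * // total == sum of items.size() over all workers
  * \endcode
  */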
407 
408  /*!
409  * Broadcasts a value of a serializable type T from the origin worker (by
410  * default the master, worker 0) to all other workers.
411  *
412  * This method is blocking on all workers except the origin.
413  *
414  * \param value The value to broadcast. This value is ignored for every
415  * worker except the origin. We use this signature to keep the decision
416  * whether a worker is the origin transparent.
417  *
418  * \param origin Worker number to broadcast value from.
419  *
420  * \return The value broadcast from the origin worker.
421  */
422  template <typename T>
423  T TLX_ATTRIBUTE_WARN_UNUSED_RESULT
424  Broadcast(const T& value, size_t origin = 0) {
425 
426  RunTimer run_timer(timer_broadcast_);
427  if (enable_stats || debug) ++count_broadcast_;
428  LOG << "FCC::Broadcast() ENTER count=" << count_broadcast_;
429 
430  T local = value;
431 
432  size_t step = GetNextStep();
433  SetLocalShared(step, &local);
434 
435  // Select the primary thread of each host to handle the I/O (assumes
436  // all hosts have the same number of threads).
437  size_t primary_pe = origin % thread_count_;
438 
439  if (local_id_ == primary_pe) {
440  RunTimer net_timer(timer_communication_);
441  group_.Broadcast(local, origin / thread_count_);
442  }
443 
444  barrier_.wait(
445  [&]() {
446  LOG << "FCC::Broadcast() COMMUNICATE BEGIN"
447  << " count=" << count_broadcast_;
448 
449  // copy from primary PE to all others
450  T res = *GetLocalShared<T>(step, primary_pe);
451  for (size_t i = 0; i < thread_count_; i++) {
452  *GetLocalShared<T>(step, i) = res;
453  }
454 
455  LOG << "FCC::Broadcast() COMMUNICATE END"
456  << " count=" << count_broadcast_;
457  });
458 
459  LOG << "FCC::Broadcast() EXIT count=" << count_broadcast_;
460 
461  return local;
462  }
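
 /* Usage sketch: distributing a decision made on a single worker, here a
  * sample size picked by worker 0, assuming `fcc` denotes this worker
  * thread's FlowControlChannel and pick_sample_size() is a hypothetical
  * local computation:
  *
  * \code
  * size_t sample_size = 0;
  * if (fcc.my_rank() == 0)
  *     sample_size = pick_sample_size();
  * // afterwards every worker, including worker 0, holds worker 0's value
  * sample_size = fcc.Broadcast(sample_size, 0);
  * \endcode
  */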
463 
464  /*!
465  * Gathers the value of a serializable type T over all workers and
466  * provides the result to all workers as a shared pointer to a
467  * vector.
468  *
469  * \param value
470  * The value this worker contributes to the allgather operation.
471  * \return
472  * The result of the allgather operation as a shared pointer to a
473  * vector.
474  */
475  template <typename T>
476  std::shared_ptr<std::vector<T> > TLX_ATTRIBUTE_WARN_UNUSED_RESULT
477  AllGather(const T& value) {
478  RunTimer run_timer(timer_reduce_);
480 
481  using SharedVectorT = std::shared_ptr<std::vector<T> >;
482 
483  SharedVectorT sp;
484  std::pair<T, SharedVectorT> local(value, sp);
485 
486  size_t step = GetNextStep();
487  SetLocalShared(step, &local);
488 
489  barrier_.wait(
490  [&]() {
491  RunTimer net_timer(timer_communication_);
492 
493  size_t n = num_workers();
494 
495  // allocate shared vector of correct final size
496  auto local_gather = std::make_shared<std::vector<T> >(n);
497 
498  if (tlx::is_power_of_two(group().num_hosts())) {
499  // gather local values and insert at correct final positions in the vector
500  for (size_t i = 0; i < thread_count_; i++) {
501  local_gather->at(thread_count_ * group_.my_host_rank() + i) =
502  GetLocalShared<std::pair<T, SharedVectorT> >(step, i)->first;
503  }
504 
505  // global allgather
507  }
508  else {
509  // gather local values and insert at correct final positions in the vector
510  for (size_t i = 0; i < thread_count_; i++) {
511  local_gather->at(i) =
512  GetLocalShared<std::pair<T, SharedVectorT> >(step, i)->first;
513  }
514 
515  // global allgather
516  group_.AllGatherBruck(local_gather->data(), thread_count_);
517  }
518 
519  // distribute shared pointer to worker threads
520  for (size_t i = 0; i < thread_count_; i++) {
521  GetLocalShared<std::pair<T, SharedVectorT> >(step, i)->second = local_gather;
522  }
523  });
524 
525  return local.second;
526  }
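
 /* Usage sketch: collecting one value from every worker, assuming `fcc`
  * denotes this worker thread's FlowControlChannel:
  *
  * \code
  * std::shared_ptr<std::vector<size_t> > all = fcc.AllGather(local_count);
  * // all->size() == fcc.num_workers(); every worker sees the same vector
  * // contents, with one entry contributed by each worker
  * \endcode
  */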
527 
528  /*!
529  * Reduces a value of a serializable type T over all workers to the given
530  * worker, provided a certain reduce function.
531  *
532  * This method is blocking. The reduce happens in order as with prefix
533  * sum. The operation is assumed to be associative.
534  *
535  * \param value The value to use for the reduce operation.
536  * \param root destination worker of the reduce
537  * \param sum_op The operation to use for
538  * calculating the reduced value. The default operation is a normal addition.
539  * \return The result of the reduce operation.
540  */
541  template <typename T, typename BinarySumOp = std::plus<T> >
542  T TLX_ATTRIBUTE_WARN_UNUSED_RESULT
543  Reduce(const T& value, size_t root = 0,
544  const BinarySumOp& sum_op = BinarySumOp()) {
545  assert(root < num_workers());
546 
547  RunTimer run_timer(timer_reduce_);
548  if (enable_stats || debug) ++count_reduce_;
549  LOG << "FCC::Reduce() ENTER count=" << count_reduce_;
550 
551  T local = value;
552 
553  size_t step = GetNextStep();
554  SetLocalShared(step, &local);
555 
556  barrier_.wait(
557  [&]() {
558  RunTimer net_timer(timer_communication_);
559 
560  LOG << "FCC::Reduce() COMMUNICATE BEGIN"
561  << " count=" << count_reduce_;
562 
563  // local reduce
564  T local_sum = *GetLocalShared<T>(step, 0);
565  for (size_t i = 1; i < thread_count_; i++) {
566  local_sum = sum_op(local_sum, *GetLocalShared<T>(step, i));
567  }
568 
569  // global reduce
570  group_.Reduce(local_sum, root / thread_count_, sum_op);
571 
572  // set the local value only at the root
573  if (root / thread_count_ == group_.my_host_rank())
574  *GetLocalShared<T>(step, root % thread_count_) = local_sum;
575 
576  LOG << "FCC::Reduce() COMMUNICATE END"
577  << " count=" << count_reduce_;
578  });
579 
580  LOG << "FCC::Reduce() EXIT count=" << count_reduce_;
581 
582  return local;
583  }
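
 /* Usage sketch: only the root worker receives the combined value; every
  * other worker gets its own input back unchanged. Assuming `fcc` denotes
  * this worker thread's FlowControlChannel:
  *
  * \code
  * size_t combined = fcc.Reduce(local_error_count, 0);  // root = worker 0
  * // on worker 0, combined is the global sum; elsewhere it equals
  * // local_error_count
  * \endcode
  */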
584 
585  /*!
586  * Reduces a value of a serializable type T over all workers given a certain
587  * reduce function.
588  *
589  * This method is blocking. The reduce happens in order as with prefix
590  * sum. The operation is assumed to be associative.
591  *
592  * \param value The value to use for the reduce operation.
593  * \param sum_op The operation to use for calculating the reduced value. The
594  * default operation is a normal addition.
595  * \return The result of the reduce operation.
596  */
597  template <typename T, typename BinarySumOp = std::plus<T> >
598  T TLX_ATTRIBUTE_WARN_UNUSED_RESULT
599  AllReduce(const T& value, const BinarySumOp& sum_op = BinarySumOp()) {
600 
601  RunTimer run_timer(timer_allreduce_);
602  if (enable_stats || debug) ++count_allreduce_;
603  LOG << "FCC::AllReduce() ENTER count=" << count_allreduce_;
604 
605  T local = value;
606 
607  size_t step = GetNextStep();
608  SetLocalShared(step, &local);
609 
610  barrier_.wait(
611  [&]() {
612  RunTimer net_timer(timer_communication_);
613 
614  LOG << "FCC::AllReduce() COMMUNICATE BEGIN"
615  << " count=" << count_allreduce_;
616 
617  // local reduce
618  T local_sum = *GetLocalShared<T>(step, 0);
619  for (size_t i = 1; i < thread_count_; i++) {
620  local_sum = sum_op(local_sum, *GetLocalShared<T>(step, i));
621  }
622 
623  // global reduce
624  group_.AllReduce(local_sum, sum_op);
625 
626  // distribute back to local workers
627  for (size_t i = 0; i < thread_count_; i++) {
628  *GetLocalShared<T>(step, i) = local_sum;
629  }
630 
631  LOG << "FCC::AllReduce() COMMUNICATE END"
632  << " count=" << count_allreduce_;
633  });
634 
635  LOG << "FCC::AllReduce() EXIT count=" << count_allreduce_;
636 
637  return local;
638  }
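
 /* Usage sketch: a global minimum over all workers, using a custom operation
  * instead of the default addition. Assuming `fcc` denotes this worker
  * thread's FlowControlChannel:
  *
  * \code
  * size_t global_min = fcc.AllReduce(
  *     local_min,
  *     [](const size_t& a, const size_t& b) { return std::min(a, b); });
  * // every worker now holds the same global_min
  * \endcode
  */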
639 
640  /*!
641  * Collects up to k predecessors of type T from preceding PEs. k must be
642  * equal on all PEs.
643  *
644  * Assume each worker has <= k items. Predecessor collects up to k items
645  * from preceding PEs. If the directly preceding PE has fewer than k items,
646  * then it waits for its own predecessor to deliver items, in the hope of
647  * getting up to k.
648  *
649  * This is used by the Window() transformation, but may in future also be
650  * useful to get a single predecessor item in other distributed operations.
651  */
652  template <typename T>
653  std::vector<T> Predecessor(size_t k, const std::vector<T>& my_values) {
654 
655  RunTimer run_timer(timer_predecessor_);
656  if (enable_stats || debug) ++count_predecessor_;
657  LOG << "FCC::Predecessor() ENTER count=" << count_predecessor_;
658 
659  std::vector<T> result;
660  size_t step = GetNextStep();
661 
662  // this vector must live beyond the ThreadBarrier.
663  std::vector<T> send_values;
664 
665  // get generation counter
666  size_t this_gen = generation_.load(std::memory_order_acquire) + 1;
667 
668  if (my_values.size() >= k) {
669  // if we already have k items, then "transmit" them to our successor
670  if (local_id_ + 1 != thread_count_) {
671  SetLocalShared(step, &my_values);
672  // release memory inside vector
673  std::atomic_thread_fence(std::memory_order_release);
674  // increment generation counter to match this_step.
675  shmem_[local_id_].IncCounter();
676  }
677  else if (host_rank_ + 1 != num_hosts_) {
678  if (my_values.size() > k) {
679  std::vector<T> send_values_next(my_values.end() - k, my_values.end());
680  group_.SendTo(host_rank_ + 1, send_values_next);
681  }
682  else {
683  group_.SendTo(host_rank_ + 1, my_values);
684  }
685  // increment generation counter for synchronizing
686  shmem_[local_id_].IncCounter();
687  }
688  else {
689  // increment generation counter for synchronizing
690  shmem_[local_id_].IncCounter();
691  }
692 
693  // and wait for the predecessor to deliver its batch
694  if (local_id_ != 0) {
695  // wait on generation counter of predecessor
696  shmem_[local_id_ - 1].WaitCounter(this_gen);
697 
698  // acquire memory inside vector
699  std::atomic_thread_fence(std::memory_order_acquire);
700 
701  std::vector<T>* pre =
702  GetLocalShared<std::vector<T> >(step, local_id_ - 1);
703 
704  // copy over only k elements (there may be more or less)
705  result = std::vector<T>(
706  pre->size() <= k ? pre->begin() : pre->end() - k, pre->end());
707  }
708  else if (host_rank_ != 0) {
709  group_.ReceiveFrom(host_rank_ - 1, &result);
710  }
711  }
712  else {
713  // we don't have k items, wait for our predecessor to send some.
714  if (local_id_ != 0) {
715  // wait on generation counter of predecessor
716  shmem_[local_id_ - 1].WaitCounter(this_gen);
717 
718  // acquire memory inside vector
719  std::atomic_thread_fence(std::memory_order_acquire);
720 
721  std::vector<T>* pre =
722  GetLocalShared<std::vector<T> >(step, local_id_ - 1);
723 
724  // copy over only k elements (there may be more)
725  result = std::vector<T>(
726  pre->size() <= k ? pre->begin() : pre->end() - k, pre->end());
727  }
728  else if (host_rank_ != 0) {
729  group_.ReceiveFrom(host_rank_ - 1, &result);
730  }
731 
732  // prepend the values received from our predecessor to our local ones,
733  // so that send_values is filled up to at most k items in total
734  size_t fill_size = k - my_values.size();
735  send_values.reserve(std::min(k, fill_size + result.size()));
736  send_values.insert(
737  send_values.end(),
738  // copy the last fill_size items from result. don't compute end - fill_size,
739  // because that may result in unsigned wrap-around.
740  result.size() <= fill_size ? result.begin() : result.end() - fill_size,
741  result.end());
742  send_values.insert(send_values.end(),
743  my_values.begin(), my_values.end());
744  assert(send_values.size() <= k);
745 
746  // now we have k items or as many as we can get, hence, "transmit"
747  // them to our successor
748  if (local_id_ + 1 != thread_count_) {
749  SetLocalShared(step, &send_values);
750  // release memory inside vector
751  std::atomic_thread_fence(std::memory_order_release);
752  // increment generation counter to match this_step.
753  shmem_[local_id_].IncCounter();
754  }
755  else if (host_rank_ + 1 != num_hosts_) {
756  group_.SendTo(host_rank_ + 1, send_values);
757  // increment generation counter for synchronizing
758  shmem_[local_id_].IncCounter();
759  }
760  else {
761  // increment generation counter for synchronizing
762  shmem_[local_id_].IncCounter();
763  }
764  }
765 
766  // await until all threads have retrieved their value.
767  barrier_.wait([this]() {
768  LOG << "FCC::Predecessor() COMMUNICATE"
769  << " count=" << count_predecessor_;
770 
771  generation_++;
772  });
773 
774  LOG << "FCC::Predecessor() EXIT count=" << count_predecessor_;
775 
776  return result;
777  }
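
 /* Usage sketch: fetching the items that precede this worker's first item,
  * as needed for a sliding window of size k that crosses worker boundaries.
  * Assuming `fcc` denotes this worker thread's FlowControlChannel and Item is
  * a hypothetical element type:
  *
  * \code
  * // workers 1..n-1 receive up to k items from their predecessors;
  * // worker 0 receives an empty vector. k must be equal on all PEs.
  * std::vector<Item> preceding = fcc.Predecessor(k, items);
  * \endcode
  */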
778 
779  //! A trivial global barrier.
780  void Barrier();
781 
782  //! A trivial local thread barrier
783  void LocalBarrier();
784 };
785 
786 /******************************************************************************/
787 // extern templates
788 
789 #if !defined(_MSC_VER)
790 
791 extern template size_t FlowControlChannel::PrefixSumBase(
792  const size_t&, const std::plus<size_t>&, const size_t&, bool);
793 
794 extern template std::array<size_t, 2> FlowControlChannel::PrefixSumBase(
795  const std::array<size_t, 2>&,
796  const common::ComponentSum<std::array<size_t, 2> >&,
797  const std::array<size_t, 2>&, bool);
798 extern template std::array<size_t, 3> FlowControlChannel::PrefixSumBase(
799  const std::array<size_t, 3>&,
800  const common::ComponentSum<std::array<size_t, 3> >&,
801  const std::array<size_t, 3>&, bool);
802 extern template std::array<size_t, 4> FlowControlChannel::PrefixSumBase(
803  const std::array<size_t, 4>&,
804  const common::ComponentSum<std::array<size_t, 4> >&,
805  const std::array<size_t, 4>&, bool);
806 
807 extern template size_t FlowControlChannel::ExPrefixSumTotal(
808  size_t&, const std::plus<size_t>&, const size_t&);
809 
810 extern template std::array<size_t, 2> FlowControlChannel::ExPrefixSumTotal(
811  std::array<size_t, 2>&,
812  const common::ComponentSum<std::array<size_t, 2> >&,
813  const std::array<size_t, 2>&);
814 extern template std::array<size_t, 3> FlowControlChannel::ExPrefixSumTotal(
815  std::array<size_t, 3>&,
816  const common::ComponentSum<std::array<size_t, 3> >&,
817  const std::array<size_t, 3>&);
818 extern template std::array<size_t, 4> FlowControlChannel::ExPrefixSumTotal(
819  std::array<size_t, 4>&,
820  const common::ComponentSum<std::array<size_t, 4> >&,
821  const std::array<size_t, 4>&);
822 
823 extern template size_t FlowControlChannel::Broadcast(const size_t&, size_t);
824 
825 extern template std::array<size_t, 2> FlowControlChannel::Broadcast(
826  const std::array<size_t, 2>&, size_t);
827 extern template std::array<size_t, 3> FlowControlChannel::Broadcast(
828  const std::array<size_t, 3>&, size_t);
829 extern template std::array<size_t, 4> FlowControlChannel::Broadcast(
830  const std::array<size_t, 4>&, size_t);
831 
832 extern template size_t FlowControlChannel::AllReduce(
833  const size_t&, const std::plus<size_t>&);
834 
835 #endif // !defined(_MSC_VER)
836 
837 //! \}
838 
839 } // namespace net
840 } // namespace thrill
841 
842 #endif // !THRILL_NET_FLOW_CONTROL_CHANNEL_HEADER
843 
844 /******************************************************************************/