Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
thread_barrier.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/common/thread_barrier.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Emanuel Jöbstl <[email protected]>
7  * Copyright (C) 2015 Timo Bingmann <[email protected]>
8  *
9  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10  ******************************************************************************/
11 
12 #pragma once
13 #ifndef THRILL_COMMON_THREAD_BARRIER_HEADER
14 #define THRILL_COMMON_THREAD_BARRIER_HEADER
15 
19 #include <thrill/common/logger.hpp>
21 
22 #include <atomic>
23 #include <condition_variable>
24 #include <mutex>
25 #include <thread>
26 
27 namespace thrill {
28 namespace common {
29 
30 /*!
31  * Implements a cyclic barrier using mutex locking and condition variables that
32  * can be used to synchronize threads.
33  */
35 {
36 public:
37  /*!
38  * Creates a new barrier that waits for n threads.
39  */
40  explicit ThreadBarrierLocking(size_t thread_count)
41  : thread_count_(thread_count) { }
42 
43  /*!
44  * Waits for n threads to arrive.
45  *
46  * This method blocks and returns as soon as n threads are waiting inside
47  * the method.
48  */
49  template <typename Lambda = NoOperation<void> >
50  void Await(Lambda lambda = Lambda()) {
51  std::unique_lock<std::mutex> lock(mutex_);
52 
53  size_t local_ = step_;
54  counts_[local_]++;
55 
56  if (counts_[local_] < thread_count_) {
57  while (counts_[local_] < thread_count_) {
58  cv_.wait(lock);
59  }
60  }
61  else {
62  step_ = step_ ? 0 : 1;
63  counts_[step_] = 0;
64  lambda();
65  cv_.notify_all();
66  }
67  }
68 
69  //! Return generation step counter
70  size_t step() const {
71  return step_;
72  }
73 
74 protected:
75  std::mutex mutex_;
76  std::condition_variable cv_;
77 
78  //! number of threads
79  const size_t thread_count_;
80 
81  //! two counters: switch between them every run.
82  size_t counts_[2] = { 0, 0 };
83 
84  //! current counter used.
85  size_t step_ = 0;
86 };
87 
88 /*!
89  * Implements a cyclic barrier using atomics and a spin lock that can be used to
90  * synchronize threads.
91  *
92  * This ThreadBarrier implementation was a lot faster in tests, but
93  * ThreadSanitizer shows data races (probably due to the generation counter).
94  */
96 {
97 public:
98  /*!
99  * Creates a new barrier that waits for n threads.
100  */
101  explicit ThreadBarrierSpinning(size_t thread_count)
102  : thread_count_(thread_count) { }
103 
105  LOG0 << "ThreadBarrierSpinning() needed "
106  << wait_time_.load() << " us for " << thread_count_ << " threads "
107  << " = "
108  << wait_time_.load() / static_cast<double>(thread_count_) / 1e6
109  << " us avg";
110  }
111 
112  /*!
113  * Waits for n threads to arrive. When they have arrive, execute lambda on
114  * the one thread, which arrived last. After lambda, step the generation
115  * counter.
116  *
117  * This method blocks and returns as soon as n threads are waiting inside
118  * the method.
119  */
120  template <typename Lambda = NoOperation<void> >
121  void Await(Lambda lambda = Lambda()) {
122  // get synchronization generation step counter.
123  size_t this_step = step_.load(std::memory_order_acquire);
124 
125  if (waiting_.fetch_add(1, std::memory_order_acq_rel) == thread_count_ - 1) {
126  // we are the last thread to Await() -> reset and increment step.
127  waiting_.store(0, std::memory_order_release);
128  // step other generation counters.
129  lambda();
130  // the following statement releases all threads from busy waiting.
131  step_.fetch_add(1, std::memory_order_acq_rel);
132  }
133  else {
134  // FakeStatsTimerStart timer;
135  // spin lock awaiting the last thread to increment the step counter.
136  while (step_.load(std::memory_order_acquire) == this_step) {
137  // std::this_thread::yield();
138  }
139  // wait_time_ += timer.Microseconds();
140  }
141  }
142 
143  //! Return generation step counter
144  size_t step() const {
145  return step_.load(std::memory_order_acquire);
146  }
147 
148 protected:
149  //! number of threads
150  const size_t thread_count_;
151 
152  //! number of threads in spin lock
153  std::atomic<size_t> waiting_ { 0 };
154 
155  //! barrier synchronization generation
156  std::atomic<size_t> step_ { 0 };
157 
159 };
160 
161 // select thread barrier implementation.
162 #if THRILL_HAVE_THREAD_SANITIZER
163 using ThreadBarrier = ThreadBarrierLocking;
164 #else
166 #endif
167 
168 } // namespace common
169 } // namespace thrill
170 
171 #endif // !THRILL_COMMON_THREAD_BARRIER_HEADER
172 
173 /******************************************************************************/
size_t step_
current counter used.
std::atomic< size_t > waiting_
number of threads in spin lock
#define LOG0
Override default output: never or always output log.
Definition: logger.hpp:27
const size_t thread_count_
number of threads
ThreadBarrierSpinning(size_t thread_count)
Creates a new barrier that waits for n threads.
Implements a cyclic barrier using mutex locking and condition variables that can be used to synchronize threads.
size_t step() const
Return generation step counter.
size_t counts_[2]
two counters: switch between them every run.
size_t step() const
Return generation step counter.
std::atomic< size_t > step_
barrier synchronization generation
void Await(Lambda lambda=Lambda())
Waits for n threads to arrive.
ThreadBarrierSpinning ThreadBarrier
const size_t thread_count_
number of threads
ThreadBarrierLocking(size_t thread_count)
Creates a new barrier that waits for n threads.
void Await(Lambda lambda=Lambda())
Waits for n threads to arrive.
Implements a cyclic barrier using atomics and a spin lock that can be used to synchronize threads.
AtomicMovable< uint64_t > wait_time_