Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
concurrent_bounded_queue.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/common/concurrent_bounded_queue.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Timo Bingmann <[email protected]>
7  *
8  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
9  ******************************************************************************/
10 
11 #pragma once
12 #ifndef THRILL_COMMON_CONCURRENT_BOUNDED_QUEUE_HEADER
13 #define THRILL_COMMON_CONCURRENT_BOUNDED_QUEUE_HEADER
14 
15 #if THRILL_HAVE_INTELTBB
16 
17 #if __clang__ && !defined(_LIBCPP_VERSION)
18 // tbb feature detection is broken for clang + libstdc++
19 #define _LIBCPP_VERSION 4000
20 #define FAKE_LIBCPP_VERSION
21 #endif
22 
23 #include <tbb/concurrent_queue.h>
24 
25 #if defined(FAKE_LIBCPP_VERSION)
26 // undo libc++ version faking
27 #undef _LIBCPP_VERSION
28 #undef FAKE_LIBCPP_VERSION
29 #endif
30 
31 #endif // THRILL_HAVE_INTELTBB
32 
33 #include <atomic>
34 #include <chrono>
35 #include <condition_variable>
36 #include <mutex>
37 #include <queue>
38 
39 namespace thrill {
40 namespace common {
41 
42 /*!
43  * This is a queue, similar to std::queue and tbb::concurrent_bounded_queue,
44  * except that it uses mutexes for synchronization. This implementation is only
45  * here to be used if the Intel TBB is not available.
46  *
47  * Not all methods of tbb::concurrent_bounded_queue<> are available here, please
48  * add them if you need them. However, NEVER add any other methods that you
49  * might need.
50  *
51  * StyleGuide is violated, because signatures MUST match those in the TBB
52  * version.
53  */
54 template <typename T>
56 {
57 public:
58  using value_type = T;
59  using reference = T &;
60  using const_reference = const T &;
61  using size_type = std::size_t;
62  using difference_type = std::ptrdiff_t;
63 
64 private:
65  //! the actual data queue
66  std::queue<T> queue_;
67 
68  //! the mutex to lock before accessing the queue
69  mutable std::mutex mutex_;
70 
71  //! condition variable signaled when an item arrives
72  std::condition_variable cv_;
73 
74 public:
75  //! default constructor
76  OurConcurrentBoundedQueue() = default;
77 
78  //! move-constructor
80  std::unique_lock<std::mutex> lock(other.mutex_);
81  queue_ = std::move(other.queue_);
82  }
83 
84  //! Pushes a copy of source onto back of the queue.
85  void push(const T& source) {
86  std::unique_lock<std::mutex> lock(mutex_);
87  queue_.push(source);
88  cv_.notify_one();
89  }
90 
91  //! Pushes given element into the queue by utilizing element's move
92  //! constructor
93  void push(T&& elem) {
94  std::unique_lock<std::mutex> lock(mutex_);
95  queue_.push(std::move(elem));
96  cv_.notify_one();
97  }
98 
99  //! Pushes a new element into the queue. The element is constructed with
100  //! given arguments.
101  template <typename... Arguments>
102  void emplace(Arguments&& ... args) {
103  std::unique_lock<std::mutex> lock(mutex_);
104  queue_.emplace(std::forward<Arguments>(args) ...);
105  cv_.notify_one();
106  }
107 
108  //! Returns: true if queue has no items; false otherwise.
109  bool empty() const {
110  std::unique_lock<std::mutex> lock(mutex_);
111  return queue_.empty();
112  }
113 
114  //! Clears the queue.
115  void clear() {
116  std::unique_lock<std::mutex> lock(mutex_);
117  queue_.clear();
118  }
119 
120  //! If value is available, pops it from the queue, move it to destination,
121  //! destroying the original position. Otherwise does nothing.
122  bool try_pop(T& destination) {
123  std::unique_lock<std::mutex> lock(mutex_);
124  if (queue_.empty())
125  return false;
126 
127  destination = std::move(queue_.front());
128  queue_.pop();
129  return true;
130  }
131 
132  //! If value is available, pops it from the queue, move it to
133  //! destination. If no item is in the queue, wait until there is one.
134  void pop(T& destination) {
135  std::unique_lock<std::mutex> lock(mutex_);
136  while (queue_.empty())
137  cv_.wait(lock);
138  destination = std::move(queue_.front());
139  queue_.pop();
140  }
141 
142  //! If value is available, pops it from the queue, move it to
143  //! destination. If no item is in the queue, wait until there is one, or
144  //! timeout and return false. NOTE: not available in TBB!
145  template <typename Rep, typename Period>
146  bool pop_for(T& destination,
147  const std::chrono::duration<Rep, Period>& timeout) {
148  std::unique_lock<std::mutex> lock(mutex_);
149  if (!cv_.wait_for(lock, timeout, [=]() { return !queue_.empty(); })) {
150  return false;
151  }
152  destination = std::move(queue_.front());
153  queue_.pop();
154  return true;
155  }
156 
157  //! return number of items available in the queue (tbb says "can return
158  //! negative size", due to pending pop()s, but we ignore that here).
159  size_t size() {
160  std::unique_lock<std::mutex> lock(mutex_);
161  return queue_.size();
162  }
163 };
164 
165 #if THRILL_HAVE_INTELTBB
166 
167 template <typename T>
168 using ConcurrentBoundedQueue = tbb::concurrent_bounded_queue<T>;
169 
170 #else // !THRILL_HAVE_INTELTBB
171 
172 template <typename T>
174 
175 #endif // !THRILL_HAVE_INTELTBB
176 
177 } // namespace common
178 } // namespace thrill
179 
180 #endif // !THRILL_COMMON_CONCURRENT_BOUNDED_QUEUE_HEADER
181 
182 /******************************************************************************/
OurConcurrentBoundedQueue()=default
default constructor
This is a queue, similar to std::queue and tbb::concurrent_bounded_queue, except that it uses mutexes...
bool empty() const
Returns: true if queue has no items; false otherwise.
double T
pair of (source worker, Block) stored in the main mix queue.
OurConcurrentBoundedQueue< T > ConcurrentBoundedQueue
std::mutex mutex_
the mutex to lock before accessing the queue
void push(const T &source)
Pushes a copy of source onto back of the queue.
std::queue< T > queue_
the actual data queue
OurConcurrentBoundedQueue(OurConcurrentBoundedQueue &&other)
move-constructor
std::condition_variable cv_
condition variable signaled when an item arrives
bool pop_for(T &destination, const std::chrono::duration< Rep, Period > &timeout)