Thrill  0.1
concurrent_bounded_queue.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/common/concurrent_bounded_queue.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Timo Bingmann <[email protected]>
7  *
8  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
9  ******************************************************************************/
10 
11 #pragma once
12 #ifndef THRILL_COMMON_CONCURRENT_BOUNDED_QUEUE_HEADER
13 #define THRILL_COMMON_CONCURRENT_BOUNDED_QUEUE_HEADER
14 
15 #include <atomic>
16 #include <chrono>
17 #include <condition_variable>
18 #include <mutex>
19 #include <queue>
20 
21 namespace thrill {
22 namespace common {
23 
/*!
 * Thread-safe FIFO queue with an interface similar to std::queue and
 * tbb::concurrent_bounded_queue, except that it uses a mutex and a condition
 * variable for synchronization.
 *
 * Every public member function locks the internal mutex, so the queue may be
 * used from several threads at once. Despite its name, no capacity limit is
 * enforced: push() and emplace() never block, only pop() and pop_for() wait.
 *
 * StyleGuide is violated, because signatures are expected to match those of
 * std::queue.
 *
 * \tparam T element type; the pop functions move-assign into a caller-provided
 * T, so T must be move-assignable.
 */
template <typename T>
class ConcurrentBoundedQueue
{
public:
    using value_type = T;
    using reference = T&;
    using const_reference = const T&;
    using size_type = std::size_t;
    using difference_type = std::ptrdiff_t;

private:
    //! the actual data queue
    std::queue<T> queue_;

    //! the mutex to lock before accessing the queue; mutable so that const
    //! observers such as empty() and size() can lock it.
    mutable std::mutex mutex_;

    //! condition variable signaled when an item arrives
    std::condition_variable cv_;

    //! Moves the front item into destination and removes it from the queue.
    //! The caller must hold mutex_ and must have checked that the queue is
    //! not empty.
    void pop_front_locked(T& destination) {
        destination = std::move(queue_.front());
        queue_.pop();
    }

public:
    //! default constructor: creates an empty queue.
    ConcurrentBoundedQueue() = default;

    //! non-copyable: the mutex and condition variable members cannot be
    //! copied.
    ConcurrentBoundedQueue(const ConcurrentBoundedQueue&) = delete;
    //! non-copyable, see the copy constructor.
    ConcurrentBoundedQueue& operator = (const ConcurrentBoundedQueue&) = delete;

    //! move-constructor: takes over the items of other while holding other's
    //! mutex. The mutex and condition variable themselves are not moved.
    ConcurrentBoundedQueue(ConcurrentBoundedQueue&& other) {
        std::lock_guard<std::mutex> lock(other.mutex_);
        queue_ = std::move(other.queue_);
    }

    //! Pushes a copy of source onto back of the queue and wakes one waiting
    //! consumer.
    void push(const T& source) {
        std::lock_guard<std::mutex> lock(mutex_);
        queue_.push(source);
        cv_.notify_one();
    }

    //! Pushes given element into the queue by utilizing element's move
    //! constructor and wakes one waiting consumer.
    void push(T&& elem) {
        std::lock_guard<std::mutex> lock(mutex_);
        queue_.push(std::move(elem));
        cv_.notify_one();
    }

    //! Pushes a new element into the queue. The element is constructed in
    //! place with the given arguments; one waiting consumer is woken.
    template <typename... Arguments>
    void emplace(Arguments&& ... args) {
        std::lock_guard<std::mutex> lock(mutex_);
        queue_.emplace(std::forward<Arguments>(args) ...);
        cv_.notify_one();
    }

    //! Returns: true if queue has no items; false otherwise.
    bool empty() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return queue_.empty();
    }

    //! Clears the queue.
    void clear() {
        // std::queue has no clear(); swap the contents into a local queue
        // instead. The local is declared before the lock, so the removed
        // items are destroyed only after the mutex has been released.
        std::queue<T> discarded;
        std::lock_guard<std::mutex> lock(mutex_);
        queue_.swap(discarded);
    }

    //! If value is available, pops it from the queue, move it to destination,
    //! destroying the original position. Otherwise does nothing.
    //! \return true if an item was moved into destination, false if the
    //! queue was empty.
    bool try_pop(T& destination) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (queue_.empty())
            return false;

        pop_front_locked(destination);
        return true;
    }

    //! If value is available, pops it from the queue, move it to
    //! destination. If no item is in the queue, wait until there is one.
    void pop(T& destination) {
        std::unique_lock<std::mutex> lock(mutex_);
        // the predicate form re-checks after every wakeup, which also covers
        // spurious wakeups.
        cv_.wait(lock, [this]() { return !queue_.empty(); });
        pop_front_locked(destination);
    }

    //! If value is available, pops it from the queue, move it to
    //! destination. If no item is in the queue, wait until there is one, or
    //! timeout and return false. NOTE: not available in TBB!
    //! \param destination receives the popped item on success and is left
    //! untouched on timeout.
    //! \param timeout maximum duration to wait for an item.
    //! \return true if an item was popped, false if the timeout expired while
    //! the queue was still empty.
    template <typename Rep, typename Period>
    bool pop_for(T& destination,
                 const std::chrono::duration<Rep, Period>& timeout) {
        std::unique_lock<std::mutex> lock(mutex_);
        // capture this explicitly: implicit capture of this through [=] is
        // deprecated since C++20.
        if (!cv_.wait_for(lock, timeout,
                          [this]() { return !queue_.empty(); })) {
            return false;
        }
        pop_front_locked(destination);
        return true;
    }

    //! return number of items available in the queue (tbb says "can return
    //! negative size", due to pending pop()s, but we ignore that here).
    size_type size() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return queue_.size();
    }
};
141 
142 } // namespace common
143 } // namespace thrill
144 
145 #endif // !THRILL_COMMON_CONCURRENT_BOUNDED_QUEUE_HEADER
146 
147 /******************************************************************************/
std::mutex mutex_
the mutex to lock before accessing the queue
ConcurrentBoundedQueue()=default
default constructor
double T
pair of (source worker, Block) stored in the main mix queue.
void push(const T &source)
Pushes a copy of source onto back of the queue.
std::condition_variable cv_
condition variable signaled when an item arrives
bool pop_for(T &destination, const std::chrono::duration< Rep, Period > &timeout)
This is a queue, similar to std::queue and tbb::concurrent_bounded_queue, except that it uses mutexes...
ConcurrentBoundedQueue(ConcurrentBoundedQueue &&other)
move-constructor
bool empty() const
Returns: true if queue has no items; false otherwise.
std::queue< T > queue_
the actual data queue