12 #ifndef THRILL_COMMON_CONCURRENT_BOUNDED_QUEUE_HEADER 13 #define THRILL_COMMON_CONCURRENT_BOUNDED_QUEUE_HEADER 17 #include <condition_variable> 49 std::condition_variable
cv_;
57 std::unique_lock<std::mutex> lock(other.mutex_);
58 queue_ = std::move(other.queue_);
63 std::unique_lock<std::mutex> lock(mutex_);
71 std::unique_lock<std::mutex> lock(mutex_);
72 queue_.push(std::move(elem));
78 template <
typename... Arguments>
80 std::unique_lock<std::mutex> lock(mutex_);
81 queue_.emplace(std::forward<Arguments>(args) ...);
87 std::unique_lock<std::mutex> lock(mutex_);
88 return queue_.empty();
93 std::unique_lock<std::mutex> lock(mutex_);
100 std::unique_lock<std::mutex> lock(mutex_);
104 destination = std::move(queue_.front());
112 std::unique_lock<std::mutex> lock(mutex_);
113 while (queue_.empty())
115 destination = std::move(queue_.front());
122 template <
typename Rep,
typename Period>
124 const std::chrono::duration<Rep, Period>& timeout) {
125 std::unique_lock<std::mutex> lock(mutex_);
126 if (!cv_.wait_for(lock, timeout, [=]() { return !queue_.empty(); })) {
129 destination = std::move(queue_.front());
137 std::unique_lock<std::mutex> lock(mutex_);
138 return queue_.size();
145 #endif // !THRILL_COMMON_CONCURRENT_BOUNDED_QUEUE_HEADER
std::mutex mutex_
the mutex to lock before accessing the queue
ConcurrentBoundedQueue()=default
default constructor
pair of (source worker, Block) stored in the main mix queue.
void emplace(Arguments &&... args)
void push(const T &source)
Pushes a copy of source onto the back of the queue.
std::condition_variable cv_
condition variable signaled when an item arrives
std::ptrdiff_t difference_type
bool pop_for(T &destination, const std::chrono::duration< Rep, Period > &timeout)
This is a queue, similar to std::queue and tbb::concurrent_bounded_queue, except that it uses mutexes for synchronization.
bool try_pop(T &destination)
ConcurrentBoundedQueue(ConcurrentBoundedQueue &&other)
move-constructor
void clear()
Clears the queue.
bool empty() const
Returns: true if queue has no items; false otherwise.
std::queue< T > queue_
the actual data queue