Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
concurrent_queue.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/common/concurrent_queue.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Timo Bingmann <[email protected]>
7  *
8  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
9  ******************************************************************************/
10 
11 #pragma once
12 #ifndef THRILL_COMMON_CONCURRENT_QUEUE_HEADER
13 #define THRILL_COMMON_CONCURRENT_QUEUE_HEADER
14 
15 #if THRILL_HAVE_INTELTBB
16 
17 #if __clang__ && !defined(_LIBCPP_VERSION)
18 // tbb feature detection is broken for clang + libstdc++
19 #define _LIBCPP_VERSION 4000
20 #define FAKE_LIBCPP_VERSION
21 #endif
22 
23 #include <tbb/concurrent_queue.h>
24 
25 #if defined(FAKE_LIBCPP_VERSION)
26 // undo libc++ version faking
27 #undef _LIBCPP_VERSION
28 #undef FAKE_LIBCPP_VERSION
29 #endif
30 
31 #endif // THRILL_HAVE_INTELTBB
32 
33 #include <atomic>
34 #include <deque>
35 #include <mutex>
36 
37 namespace thrill {
38 namespace common {
39 
/*!
 * A mutex-protected FIFO queue, similar to std::queue and
 * tbb::concurrent_queue. This implementation is only here to be used if the
 * Intel TBB is not available.
 *
 * Every public method acquires the internal mutex for the duration of the call
 * and releases it before returning.
 *
 * Not all methods of tbb::concurrent_queue<> are available here, please add
 * them if you need them. However, NEVER add any other methods that you might
 * need: only methods that also exist in the TBB version belong here.
 *
 * StyleGuide is violated, because signatures MUST match those in the TBB
 * version.
 *
 * \tparam T          type of the elements stored in the queue
 * \tparam Allocator  allocator type handed to the underlying std::deque
 */
template <typename T, typename Allocator>
class OurConcurrentQueue
{
public:
    using value_type = T;
    using reference = T&;
    using const_reference = const T&;
    using size_type = std::size_t;
    using difference_type = std::ptrdiff_t;

private:
    //! the actual data queue
    std::deque<T, Allocator> queue_;

    //! the mutex to lock before accessing the queue; mutable so that const
    //! methods such as empty() can lock it as well.
    mutable std::mutex mutex_;

public:
    //! Constructor: creates an empty queue whose storage uses the given
    //! allocator.
    explicit OurConcurrentQueue(const Allocator& alloc = Allocator())
        : queue_(alloc) { }

    //! Pushes a copy of source onto back of the queue.
    void push(const T& source) {
        std::lock_guard<std::mutex> lock(mutex_);
        queue_.push_back(source);
    }

    //! Pushes given element onto back of the queue by utilizing element's
    //! move constructor.
    void push(T&& elem) {
        std::lock_guard<std::mutex> lock(mutex_);
        queue_.push_back(std::move(elem));
    }

    //! Pushes a new element onto back of the queue. The element is
    //! constructed in place from the given arguments.
    template <typename... Arguments>
    void emplace(Arguments&& ... args) {
        std::lock_guard<std::mutex> lock(mutex_);
        queue_.emplace_back(std::forward<Arguments>(args) ...);
    }

    //! Returns: true if queue has no items; false otherwise.
    bool empty() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return queue_.empty();
    }

    //! If a value is available, move-assigns the front element to
    //! destination, removes it from the queue and returns true. Otherwise
    //! leaves destination untouched and returns false.
    bool try_pop(T& destination) {
        std::lock_guard<std::mutex> lock(mutex_);
        if (queue_.empty())
            return false;

        // Move out first, then drop the (now moved-from) front slot.
        destination = std::move(queue_.front());
        queue_.pop_front();
        return true;
    }

    //! Clears the queue.
    void clear() {
        std::lock_guard<std::mutex> lock(mutex_);
        queue_.clear();
    }
};
118 
119 #if THRILL_HAVE_INTELTBB
120 
121 template <typename T, typename Allocator>
122 using ConcurrentQueue = tbb::concurrent_queue<T, Allocator>;
123 
124 #else // !THRILL_HAVE_INTELTBB
125 
126 template <typename T, typename Allocator>
128 
129 #endif // !THRILL_HAVE_INTELTBB
130 
131 } // namespace common
132 } // namespace thrill
133 
134 #endif // !THRILL_COMMON_CONCURRENT_QUEUE_HEADER
135 
136 /******************************************************************************/
void push(const T &source)
Pushes a copy of source onto back of the queue.
double T
std::mutex mutex_
the mutex to lock before accessing the queue
OurConcurrentQueue(const Allocator &alloc=Allocator())
Constructor.
std::deque< T, Allocator > queue_
the actual data queue
OurConcurrentQueue< T, Allocator > ConcurrentQueue
This is a queue, similar to std::queue and tbb::concurrent_queue, except that it uses mutexes for synchronization.
bool empty() const
Returns: true if queue has no items; false otherwise.
void emplace(Arguments &&...args)