Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
thread_pool.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * tlx/thread_pool.hpp
3  *
4  * Part of tlx - http://panthema.net/tlx
5  *
6  * Copyright (C) 2015 Timo Bingmann <[email protected]>
7  *
8  * All rights reserved. Published under the Boost Software License, Version 1.0
9  ******************************************************************************/
10 
11 #ifndef TLX_THREAD_POOL_HEADER
12 #define TLX_THREAD_POOL_HEADER
13 
14 #include <atomic>
15 #include <cassert>
16 #include <condition_variable>
17 #include <deque>
18 #include <mutex>
19 #include <thread>
20 
21 #include <tlx/delegate.hpp>
22 #include <tlx/simple_vector.hpp>
23 
24 namespace tlx {
25 
26 /*!
27  * ThreadPool starts a fixed number p of std::threads which process Jobs that
28  * are \ref enqueue "enqueued" into a concurrent job queue. The jobs
29  * themselves can enqueue more jobs that will be processed when a thread is
30  * ready.
31  *
32  * The ThreadPool can either run until
33  *
34  * 1. all jobs are done AND all threads are idle, when called with
35  * loop_until_empty(), or
36  *
37  * 2. terminate() is called, when run with loop_until_terminate().
38  *
39  * Jobs are plain tlx::Delegate<void()> objects, hence the pool user must pass
40  * in ALL CONTEXT himself. The best method to pass parameters to Jobs is to use
41  * lambda captures. Alternatively, old-school objects implementing operator(),
42  * or std::binds can be used.
43  *
44  * The ThreadPool uses a condition variable to wait for new jobs and does not
45  * remain busy waiting.
46  *
47  * Note that the threads in the pool start **before** the two loop functions are
48  * called. In case of loop_until_empty() the threads continue to be idle
49  * afterwards, and can be reused, until the ThreadPool is destroyed.
50 
51 \code
52 ThreadPool pool(4); // pool with 4 threads
53 
54 int value = 0;
55 pool.enqueue([&value]() {
56  // increment value in another thread.
57  ++value;
58 });
59 
60 pool.loop_until_empty();
61 \endcode
62 
63  */
65 {
66 public:
68 
69 private:
70  //! Deque of scheduled jobs.
71  std::deque<Job> jobs_;
72 
73  //! Mutex used to access the queue of scheduled jobs.
74  std::mutex mutex_;
75 
76  //! threads in pool
78 
79  //! Condition variable used to notify that a new job has been inserted in
80  //! the queue.
81  std::condition_variable cv_jobs_;
82  //! Condition variable to signal when a job finishes.
83  std::condition_variable cv_finished_;
84 
85  //! Counter for number of threads busy.
86  std::atomic<size_t> busy_ = { 0 };
87  //! Counter for total number of jobs executed
88  std::atomic<size_t> done_ = { 0 };
89 
90  //! Flag whether to terminate
91  std::atomic<bool> terminate_ = { false };
92 
93 public:
94  //! Construct running thread pool of num_threads
95  explicit ThreadPool(
96  size_t num_threads = std::thread::hardware_concurrency());
97 
98  //! non-copyable: delete copy-constructor
99  ThreadPool(const ThreadPool&) = delete;
100  //! non-copyable: delete assignment operator
101  ThreadPool& operator = (const ThreadPool&) = delete;
102 
103  //! Stop processing jobs, terminate threads.
104  ~ThreadPool();
105 
106  //! enqueue a Job, the caller must pass in all context using captures.
107  void enqueue(Job&& job) { // takes the job by rvalue reference and appends it to jobs_
108  std::unique_lock<std::mutex> lock(mutex_); // hold mutex_ for the rest of the function while jobs_ is modified
109  jobs_.emplace_back(std::move(job)); // move the job to the back of the deque
110  cv_jobs_.notify_all(); // notify every thread waiting on cv_jobs_; issued while mutex_ is still held
111  } // lock goes out of scope here, releasing mutex_
112 
113  //! Loop until no more jobs are in the queue AND all threads are idle. When
114  //! this occurs, this method exits, however, the threads remain active.
115  void loop_until_empty();
116 
117  //! Loop until terminate flag was set.
118  void loop_until_terminate();
119 
120  //! Terminate thread pool gracefully, wait until currently running jobs
121  //! finish and then exit. This should be called from within one of the
122  //! enqueued jobs or from an outside thread.
123  void terminate();
124 
125  //! Return number of jobs currently completed.
126  size_t done() const { return done_; } // done_ is std::atomic<size_t>; the conversion to size_t performs an atomic load
127 
128  //! Return number of threads in pool
129  size_t size() const { return threads_.size(); } // forwards to threads_.size(); no lock is taken here
130 
131  //! Return thread handle to thread i
132  std::thread& thread(size_t i) { // returns a mutable reference, not a copy, to the i-th thread object
133  assert(i < threads_.size()); // bounds check via assert(); compiled out when NDEBUG is defined
134  return threads_[i]; // index into threads_
135  }
136 
137 private:
138  //! Worker function, one per thread is started.
139  void worker();
140 };
141 
142 } // namespace tlx
143 
144 #endif // !TLX_THREAD_POOL_HEADER
145 
146 /******************************************************************************/
std::atomic< size_t > done_
Counter for total number of jobs executed.
Definition: thread_pool.hpp:88
std::deque< Job > jobs_
Deque of scheduled jobs.
Definition: thread_pool.hpp:71
size_t done() const
Return number of jobs currently completed.
ThreadPool(size_t num_threads=std::thread::hardware_concurrency())
Construct running thread pool of num_threads.
Definition: thread_pool.cpp:17
size_t size() const
Return number of threads in pool.
std::thread & thread(size_t i)
Return thread handle to thread i.
std::atomic< bool > terminate_
Flag whether to terminate.
Definition: thread_pool.hpp:91
std::condition_variable cv_jobs_
Definition: thread_pool.hpp:81
void loop_until_empty()
Definition: thread_pool.cpp:36
SimpleVector< std::thread > threads_
threads in pool
Definition: thread_pool.hpp:77
~ThreadPool()
Stop processing jobs, terminate threads.
Definition: thread_pool.cpp:24
void enqueue(Job &&job)
enqueue a Job, the caller must pass in all context using captures.
void worker()
Worker function, one per thread is started.
Definition: thread_pool.cpp:58
std::atomic< size_t > busy_
Counter for number of threads busy.
Definition: thread_pool.hpp:86
size_type size() const noexcept
return number of items in vector
ThreadPool & operator=(const ThreadPool &)=delete
non-copyable: delete assignment operator
std::condition_variable cv_finished_
Condition variable to signal when a job finishes.
Definition: thread_pool.hpp:83
void loop_until_terminate()
Loop until terminate flag was set.
Definition: thread_pool.cpp:42
ThreadPool starts a fixed number p of std::threads which process Jobs that are enqueued into a concur...
Definition: thread_pool.hpp:64
std::mutex mutex_
Mutex used to access the queue of scheduled jobs.
Definition: thread_pool.hpp:74