Thrill  0.1
thread_pool.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * tlx/thread_pool.hpp
3  *
4  * Part of tlx - http://panthema.net/tlx
5  *
6  * Copyright (C) 2015-2019 Timo Bingmann <[email protected]>
7  *
8  * All rights reserved. Published under the Boost Software License, Version 1.0
9  ******************************************************************************/
10 
11 #ifndef TLX_THREAD_POOL_HEADER
12 #define TLX_THREAD_POOL_HEADER
13 
14 #include <atomic>
15 #include <cassert>
16 #include <condition_variable>
17 #include <deque>
18 #include <mutex>
19 #include <thread>
20 
22 #include <tlx/delegate.hpp>
23 
24 namespace tlx {
25 
26 /*!
27  * ThreadPool starts a fixed number p of std::threads which process Jobs that
28  * are \ref enqueue "enqueued" into a concurrent job queue. The jobs
29  * themselves can enqueue more jobs that will be processed when a thread is
30  * ready.
31  *
32  * The ThreadPool can either run until
33  *
34  * 1. all jobs are done AND all threads are idle, when called with
35  * loop_until_empty(), or
36  *
37  * 2. terminate() is called, when run with loop_until_terminate().
38  *
39  * Jobs are plain tlx::Delegate<void()> objects, hence the pool user must pass
40  * in ALL CONTEXT themselves. The best way to pass parameters to Jobs is to use
41  * lambda captures. Alternatively, function objects implementing operator()
42  * or std::bind expressions can be used.
43  *
44  * The ThreadPool uses a condition variable to wait for new jobs and does not
45  * busy-wait.
46  *
47  * Note that the threads in the pool start **before** the two loop functions are
48  * called. In case of loop_until_empty() the threads continue to be idle
49  * afterwards, and can be reused, until the ThreadPool is destroyed.
50 
51 \code
52 ThreadPool pool(4); // pool with 4 threads
53 
54 int value = 0;
55 pool.enqueue([&value]() {
56  // increment value in another thread.
57  ++value;
58 });
59 
60 pool.loop_until_empty();
61 \endcode
62 
63  */
65 {
66 public:
69 
70 private:
71  //! Deque of scheduled jobs.
72  std::deque<Job> jobs_;
73 
74  //! Mutex used to access the queue of scheduled jobs.
75  std::mutex mutex_;
76 
77  //! threads in pool
79 
80  //! Condition variable used to notify that a new job has been inserted in
81  //! the queue.
82  std::condition_variable cv_jobs_;
83  //! Condition variable to signal when a job finishes.
84  std::condition_variable cv_finished_;
85 
86  //! Counter for number of threads busy.
87  std::atomic<size_t> busy_ = { 0 };
88  //! Counter for number of idle threads waiting for a job.
89  std::atomic<size_t> idle_ = { 0 };
90  //! Counter for total number of jobs executed
91  std::atomic<size_t> done_ = { 0 };
92 
93  //! Flag whether to terminate
94  std::atomic<bool> terminate_ = { false };
95 
96  //! Run once per worker thread
98 
99 public:
100  //! Construct running thread pool of num_threads
101  ThreadPool(
102  size_t num_threads = std::thread::hardware_concurrency(),
103  InitThread&& init_thread = InitThread());
104 
105  //! non-copyable: delete copy-constructor
106  ThreadPool(const ThreadPool&) = delete;
107  //! non-copyable: delete assignment operator
108  ThreadPool& operator = (const ThreadPool&) = delete;
109 
110  //! Stop processing jobs, terminate threads.
111  ~ThreadPool();
112 
113  //! enqueue a Job, the caller must pass in all context using captures.
114  void enqueue(Job&& job);
115 
116  //! Loop until no more jobs are in the queue AND all threads are idle. When
117  //! this occurs, this method exits; the threads, however, remain active.
118  void loop_until_empty();
119 
120  //! Loop until terminate flag was set.
121  void loop_until_terminate();
122 
123  //! Terminate thread pool gracefully: wait until currently running jobs
124  //! finish and then exit. This should be called from within one of the
125  //! enqueued jobs or from an outside thread.
126  void terminate();
127 
128  //! Return number of jobs currently completed.
129  size_t done() const;
130 
131  //! Return number of threads in pool
132  size_t size() const;
133 
134  //! return number of idle threads in pool
135  size_t idle() const;
136 
137  //! true if any thread is idle (= waiting for jobs)
138  bool has_idle() const;
139 
140  //! Return thread handle to thread i
141  std::thread& thread(size_t i);
142 
143 private:
144  //! Worker function, one per thread is started.
145  void worker(size_t p);
146 };
147 
148 } // namespace tlx
149 
150 #endif // !TLX_THREAD_POOL_HEADER
151 
152 /******************************************************************************/
Delegate< void(size_t)> InitThread
Definition: thread_pool.hpp:68
std::atomic< size_t > done_
Counter for total number of jobs executed.
Definition: thread_pool.hpp:91
std::deque< Job > jobs_
Deque of scheduled jobs.
Definition: thread_pool.hpp:72
simple_vector< std::thread > threads_
threads in pool
Definition: thread_pool.hpp:78
size_t done() const
Return number of jobs currently completed.
Definition: thread_pool.cpp:65
size_t size() const
Return number of threads in pool.
Definition: thread_pool.cpp:69
Simpler non-growing vector without initialization.
std::atomic< bool > terminate_
Flag whether to terminate.
Definition: thread_pool.hpp:94
std::condition_variable cv_jobs_
Definition: thread_pool.hpp:82
void worker(size_t p)
Worker function, one per thread is started.
Definition: thread_pool.cpp:86
void loop_until_empty()
Definition: thread_pool.cpp:43
~ThreadPool()
Stop processing jobs, terminate threads.
Definition: thread_pool.cpp:25
ThreadPool(size_t num_threads=std::thread::hardware_concurrency(), InitThread &&init_thread=InitThread())
Construct running thread pool of num_threads.
Definition: thread_pool.cpp:17
void enqueue(Job &&job)
enqueue a Job, the caller must pass in all context using captures.
Definition: thread_pool.cpp:37
std::thread & thread(size_t i)
Return thread handle to thread i.
Definition: thread_pool.cpp:81
bool has_idle() const
true if any thread is idle (= waiting for jobs)
Definition: thread_pool.cpp:77
std::atomic< size_t > busy_
Counter for number of threads busy.
Definition: thread_pool.hpp:87
ThreadPool & operator=(const ThreadPool &)=delete
non-copyable: delete assignment operator
std::atomic< size_t > idle_
Counter for number of idle threads waiting for a job.
Definition: thread_pool.hpp:89
std::condition_variable cv_finished_
Condition variable to signal when a job finishes.
Definition: thread_pool.hpp:84
void loop_until_terminate()
Loop until terminate flag was set.
Definition: thread_pool.cpp:49
ThreadPool starts a fixed number p of std::threads which process Jobs that are enqueued into a concurrent job queue.
Definition: thread_pool.hpp:64
size_t idle() const
return number of idle threads in pool
Definition: thread_pool.cpp:73
std::mutex mutex_
Mutex used to access the queue of scheduled jobs.
Definition: thread_pool.hpp:75
InitThread init_thread_
Run once per worker thread.
Definition: thread_pool.hpp:97