Thrill  0.1
thread_pool.cpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * tlx/thread_pool.cpp
3  *
4  * Part of tlx - http://panthema.net/tlx
5  *
6  * Copyright (C) 2015-2019 Timo Bingmann <[email protected]>
7  *
8  * All rights reserved. Published under the Boost Software License, Version 1.0
9  ******************************************************************************/
10 
11 #include <tlx/thread_pool.hpp>
12 
13 #include <iostream>
14 
15 namespace tlx {
16 
17 ThreadPool::ThreadPool(size_t num_threads, InitThread&& init_thread)
18  : threads_(num_threads),
19  init_thread_(std::move(init_thread)) {
20  // immediately construct worker threads
21  for (size_t i = 0; i < num_threads; ++i)
22  threads_[i] = std::thread(&ThreadPool::worker, this, i);
23 }
24 
26  std::unique_lock<std::mutex> lock(mutex_);
27  // set stop-condition
28  terminate_ = true;
29  cv_jobs_.notify_all();
30  lock.unlock();
31 
32  // all threads terminate, then we're done
33  for (size_t i = 0; i < threads_.size(); ++i)
34  threads_[i].join();
35 }
36 
37 void ThreadPool::enqueue(Job&& job) {
38  std::unique_lock<std::mutex> lock(mutex_);
39  jobs_.emplace_back(std::move(job));
40  cv_jobs_.notify_one();
41 }
42 
44  std::unique_lock<std::mutex> lock(mutex_);
45  cv_finished_.wait(lock, [this]() { return jobs_.empty() && (busy_ == 0); });
46  std::atomic_thread_fence(std::memory_order_seq_cst);
47 }
48 
50  std::unique_lock<std::mutex> lock(mutex_);
51  cv_finished_.wait(lock, [this]() { return terminate_ && (busy_ == 0); });
52  std::atomic_thread_fence(std::memory_order_seq_cst);
53 }
54 
56  std::unique_lock<std::mutex> lock(mutex_);
57  // flag termination
58  terminate_ = true;
59  // wake up all worker threads and let them terminate.
60  cv_jobs_.notify_all();
61  // notify LoopUntilTerminate in case all threads are idle.
62  cv_finished_.notify_one();
63 }
64 
65 size_t ThreadPool::done() const {
66  return done_;
67 }
68 
69 size_t ThreadPool::size() const {
70  return threads_.size();
71 }
72 
73 size_t ThreadPool::idle() const {
74  return idle_;
75 }
76 
77 bool ThreadPool::has_idle() const {
78  return (idle_.load(std::memory_order_relaxed) != 0);
79 }
80 
81 std::thread& ThreadPool::thread(size_t i) {
82  assert(i < threads_.size());
83  return threads_[i];
84 }
85 
86 void ThreadPool::worker(size_t p) {
87  if (init_thread_)
88  init_thread_(p);
89 
90  // lock mutex, it is released during condition waits
91  std::unique_lock<std::mutex> lock(mutex_);
92 
93  while (true) {
94  // wait on condition variable until job arrives, frees lock
95  if (!terminate_ && jobs_.empty()) {
96  ++idle_;
97  cv_jobs_.wait(
98  lock, [this]() { return terminate_ || !jobs_.empty(); });
99  --idle_;
100  }
101 
102  if (terminate_)
103  break;
104 
105  if (!jobs_.empty()) {
106  // got work. set busy.
107  ++busy_;
108 
109  {
110  // pull job.
111  Job job = std::move(jobs_.front());
112  jobs_.pop_front();
113 
114  // release lock.
115  lock.unlock();
116 
117  // execute job.
118  try {
119  job();
120  }
121  catch (std::exception& e) {
122  std::cerr << "EXCEPTION: " << e.what() << std::endl;
123  }
124  // destroy job by closing scope
125  }
126 
127  // release memory the Job changed
128  std::atomic_thread_fence(std::memory_order_seq_cst);
129 
130  ++done_;
131  --busy_;
132 
133  // relock mutex before signaling condition.
134  lock.lock();
135  cv_finished_.notify_one();
136  }
137  }
138 }
139 
140 } // namespace tlx
141 
142 /******************************************************************************/
std::atomic< size_t > done_
Counter for total number of jobs executed.
Definition: thread_pool.hpp:91
std::deque< Job > jobs_
Deque of scheduled jobs.
Definition: thread_pool.hpp:72
simple_vector< std::thread > threads_
threads in pool
Definition: thread_pool.hpp:78
size_t done() const
Return number of jobs currently completed.
Definition: thread_pool.cpp:65
size_t size() const
Return number of threads in pool.
Definition: thread_pool.cpp:69
std::atomic< bool > terminate_
Flag whether to terminate.
Definition: thread_pool.hpp:94
std::condition_variable cv_jobs_
Definition: thread_pool.hpp:82
STL namespace.
void worker(size_t p)
Worker function, one per thread is started.
Definition: thread_pool.cpp:86
void loop_until_empty()
Definition: thread_pool.cpp:43
~ThreadPool()
Stop processing jobs, terminate threads.
Definition: thread_pool.cpp:25
ThreadPool(size_t num_threads=std::thread::hardware_concurrency(), InitThread &&init_thread=InitThread())
Construct running thread pool of num_threads.
Definition: thread_pool.cpp:17
void enqueue(Job &&job)
enqueue a Job, the caller must pass in all context using captures.
Definition: thread_pool.cpp:37
std::thread & thread(size_t i)
Return thread handle to thread i.
Definition: thread_pool.cpp:81
bool has_idle() const
true if any thread is idle (= waiting for jobs)
Definition: thread_pool.cpp:77
std::atomic< size_t > busy_
Counter for number of threads busy.
Definition: thread_pool.hpp:87
std::atomic< size_t > idle_
Counter for number of idle threads waiting for a job.
Definition: thread_pool.hpp:89
std::string join(char glue, const std::vector< std::string > &parts)
Join a vector of strings by some glue character between each pair from the sequence.
Definition: join.cpp:16
std::condition_variable cv_finished_
Condition variable to signal when a jobs finishes.
Definition: thread_pool.hpp:84
void loop_until_terminate()
Loop until terminate flag was set.
Definition: thread_pool.cpp:49
size_t idle() const
return number of idle threads in pool
Definition: thread_pool.cpp:73
std::mutex mutex_
Mutex used to access the queue of scheduled jobs.
Definition: thread_pool.hpp:75
InitThread init_thread_
Run once per worker thread.
Definition: thread_pool.hpp:97