Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
thread_pool.cpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * tlx/thread_pool.cpp
3  *
4  * Part of tlx - http://panthema.net/tlx
5  *
6  * Copyright (C) 2015 Timo Bingmann <[email protected]>
7  *
8  * All rights reserved. Published under the Boost Software License, Version 1.0
9  ******************************************************************************/
10 
11 #include <tlx/thread_pool.hpp>
12 
13 #include <iostream>
14 
15 namespace tlx {
16 
17 ThreadPool::ThreadPool(size_t num_threads)
18  : threads_(num_threads) {
19  // immediately construct worker threads
20  for (size_t i = 0; i < num_threads; ++i)
21  threads_[i] = std::thread(&ThreadPool::worker, this);
22 }
23 
25  std::unique_lock<std::mutex> lock(mutex_);
26  // set stop-condition
27  terminate_ = true;
28  cv_jobs_.notify_all();
29  lock.unlock();
30 
31  // all threads terminate, then we're done
32  for (size_t i = 0; i < threads_.size(); ++i)
33  threads_[i].join();
34 }
35 
37  std::unique_lock<std::mutex> lock(mutex_);
38  cv_finished_.wait(lock, [this]() { return jobs_.empty() && (busy_ == 0); });
39  std::atomic_thread_fence(std::memory_order_seq_cst);
40 }
41 
43  std::unique_lock<std::mutex> lock(mutex_);
44  cv_finished_.wait(lock, [this]() { return terminate_ && (busy_ == 0); });
45  std::atomic_thread_fence(std::memory_order_seq_cst);
46 }
47 
49  std::unique_lock<std::mutex> lock(mutex_);
50  // flag termination
51  terminate_ = true;
52  // wake up all worker threads and let them terminate.
53  cv_jobs_.notify_all();
54  // notify LoopUntilTerminate in case all threads are idle.
55  cv_finished_.notify_one();
56 }
57 
59  // lock mutex, it is released during condition waits
60  std::unique_lock<std::mutex> lock(mutex_);
61 
62  while (true) {
63  // wait on condition variable until job arrives, frees lock
64  cv_jobs_.wait(lock, [this]() { return terminate_ || !jobs_.empty(); });
65 
66  if (terminate_)
67  break;
68 
69  if (!jobs_.empty()) {
70  // got work. set busy.
71  ++busy_;
72 
73  {
74  // pull job.
75  Job job = std::move(jobs_.front());
76  jobs_.pop_front();
77 
78  // release lock.
79  lock.unlock();
80 
81  // execute job.
82  try {
83  job();
84  }
85  catch (std::exception& e) {
86  std::cerr << "EXCEPTION: " << e.what() << std::endl;
87  }
88  // destroy job by closing scope
89  }
90 
91  // release memory the Job changed
92  std::atomic_thread_fence(std::memory_order_seq_cst);
93 
94  ++done_;
95  --busy_;
96 
97  // relock mutex before signaling condition.
98  lock.lock();
99  cv_finished_.notify_one();
100  }
101  }
102 }
103 
104 } // namespace tlx
105 
106 /******************************************************************************/
std::atomic< size_t > done_
Counter for total number of jobs executed.
Definition: thread_pool.hpp:88
std::deque< Job > jobs_
Deque of scheduled jobs.
Definition: thread_pool.hpp:71
ThreadPool(size_t num_threads=std::thread::hardware_concurrency())
Construct running thread pool of num_threads.
Definition: thread_pool.cpp:17
std::atomic< bool > terminate_
Flag whether to terminate.
Definition: thread_pool.hpp:91
std::condition_variable cv_jobs_
Definition: thread_pool.hpp:81
void loop_until_empty()
Definition: thread_pool.cpp:36
SimpleVector< std::thread > threads_
threads in pool
Definition: thread_pool.hpp:77
~ThreadPool()
Stop processing jobs, terminate threads.
Definition: thread_pool.cpp:24
void worker()
Worker function, one per thread is started.
Definition: thread_pool.cpp:58
std::atomic< size_t > busy_
Counter for number of threads busy.
Definition: thread_pool.hpp:86
size_type size() const noexcept
return number of items in vector
std::string join(char glue, const std::vector< std::string > &parts)
Join a vector of strings by some glue character between each pair from the sequence.
Definition: join.cpp:16
std::condition_variable cv_finished_
Condition variable to signal when a job finishes.
Definition: thread_pool.hpp:83
void loop_until_terminate()
Loop until terminate flag was set.
Definition: thread_pool.cpp:42
std::mutex mutex_
Mutex used to access the queue of scheduled jobs.
Definition: thread_pool.hpp:74