Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
linuxaio_queue.cpp
Go to the documentation of this file.
1 /***************************************************************************
2  * foxxll/io/linuxaio_queue.cpp
3  *
4  * Part of FOXXLL. See http://foxxll.org
5  *
6  * Copyright (C) 2011 Johannes Singler <[email protected]>
7  * Copyright (C) 2014 Timo Bingmann <[email protected]>
8  * Copyright (C) 2018 Manuel Penschuck <[email protected]>
9  *
10  * Distributed under the Boost Software License, Version 1.0.
11  * (See accompanying file LICENSE_1_0.txt or copy at
12  * http://www.boost.org/LICENSE_1_0.txt)
13  **************************************************************************/
14 
16 
17 #if FOXXLL_HAVE_LINUXAIO_FILE
18 
#include <sys/syscall.h>
#include <unistd.h>

#include <algorithm>
#include <cerrno>
#include <memory>

#include <tlx/die.hpp>
#include <tlx/logger/core.hpp>
26 
30 
31 namespace foxxll {
32 
33 linuxaio_queue::linuxaio_queue(int desired_queue_length)
34  : num_waiting_requests_(0), num_free_events_(0), num_posted_requests_(0),
35  post_thread_state_(NOT_RUNNING), wait_thread_state_(NOT_RUNNING)
36 {
37  if (desired_queue_length == 0) {
38  // default value, 64 entries per queue (i.e. usually per disk) should
39  // be enough
40  max_events_ = 64;
41  }
42  else
43  max_events_ = desired_queue_length;
44 
45  // negotiate maximum number of simultaneous events with the OS
46  context_ = 0;
47  long result;
48  while ((result = syscall(SYS_io_setup, max_events_, &context_)) == -1 &&
49  errno == EAGAIN && max_events_ > 1)
50  {
51  max_events_ <<= 1; // try with half as many events
52  }
53  if (result != 0) {
55  io_error, "linuxaio_queue::linuxaio_queue"
56  " io_setup() nr_events=" << max_events_
57  );
58  }
59 
60  num_free_events_.signal(max_events_);
61 
62  TLX_LOG1 << "Set up an linuxaio queue with " << max_events_ << " entries.";
63 
64  start_thread(post_async, static_cast<void*>(this), post_thread_, post_thread_state_);
65  start_thread(wait_async, static_cast<void*>(this), wait_thread_, wait_thread_state_);
66 }
67 
//! Destroy the queue: drain and join both worker threads, then release the
//! kernel AIO context. Order matters — the threads must be stopped (each
//! stop_thread wakes its thread via the matching semaphore) before the
//! context they use is destroyed.
linuxaio_queue::~linuxaio_queue()
{
    stop_thread(post_thread_, post_thread_state_, num_waiting_requests_);
    stop_thread(wait_thread_, wait_thread_state_, num_posted_requests_);
    syscall(SYS_io_destroy, context_);
}
74 
75 void linuxaio_queue::add_request(request_ptr& req)
76 {
77  if (req.empty())
78  FOXXLL_THROW_INVALID_ARGUMENT("Empty request submitted to disk_queue.");
79  if (post_thread_state_() != RUNNING)
80  die("Request submitted to stopped queue.");
81  if (!dynamic_cast<linuxaio_request*>(req.get()))
82  die("Non-LinuxAIO request submitted to LinuxAIO queue.");
83 
84  std::unique_lock<std::mutex> lock(waiting_mtx_);
85 
86  waiting_requests_.push_back(req);
87  num_waiting_requests_.signal();
88 }
89 
//! Try to cancel a previously added request.
//!
//! Two-phase cancellation: first look for the request in the not-yet-posted
//! waiting list; if absent, ask the kernel to cancel the already-posted I/O.
//! \param req non-empty linuxaio_request to cancel.
//! \return true if the request was canceled (either before posting or via a
//!         successful kernel cancel); false otherwise.
bool linuxaio_queue::cancel_request(request_ptr& req)
{
    if (req.empty())
        FOXXLL_THROW_INVALID_ARGUMENT("Empty request canceled disk_queue.");
    if (post_thread_state_() != RUNNING)
        die("Request canceled in stopped queue.");

    linuxaio_request* areq = dynamic_cast<linuxaio_request*>(req.get());
    if (!areq)
        die("Non-LinuxAIO request submitted to LinuxAIO queue.");

    queue_type::iterator pos;
    {
        // phase 1: remove the request from the waiting list if it has not
        // been handed to the kernel yet
        std::unique_lock<std::mutex> lock(waiting_mtx_);

        pos = std::find(
            waiting_requests_.begin(), waiting_requests_.end(), req
        );
        if (pos != waiting_requests_.end())
        {
            waiting_requests_.erase(pos);
            // unlock before the completion callback to avoid holding the
            // queue mutex during user code
            lock.unlock();

            // request is canceled, but was not yet posted.
            areq->completed(false, true);

            // consume the signal add_request emitted for this entry
            num_waiting_requests_.wait(); // will never block
            return true;
        }
    }

    // phase 2: the request was already posted; re-acquire the mutex and
    // ask the kernel to cancel the in-flight I/O
    std::unique_lock<std::mutex> lock(waiting_mtx_);

    // perform syscall to cancel I/O
    bool canceled_io_operation = areq->cancel_aio(this);

    if (canceled_io_operation)
    {
        // again, run the completion callback without the mutex held
        lock.unlock();

        // request is canceled, already posted, but canceled in kernel
        areq->completed(true, true);

        // return the event slot and balance the posted-requests counter
        num_free_events_.signal();
        num_posted_requests_.wait(); // will never block
        return true;
    }

    return false;
}
140 
141 // internal routines, run by the posting thread
142 void linuxaio_queue::post_requests()
143 {
144  request_ptr req;
145  io_event* events = new io_event[max_events_];
146 
147  for ( ; ; ) // as long as thread is running
148  {
149  // might block until next request or message comes in
150  int num_currently_waiting_requests = num_waiting_requests_.wait();
151 
152  // terminate if termination has been requested
153  if (post_thread_state_() == TERMINATING &&
154  num_currently_waiting_requests == 0)
155  break;
156 
157  std::unique_lock<std::mutex> lock(waiting_mtx_);
158  if (!waiting_requests_.empty())
159  {
160  req = waiting_requests_.front();
161  waiting_requests_.pop_front();
162  lock.unlock();
163 
164  num_free_events_.wait(); // might block because too many requests are posted
165 
166  // polymorphic_downcast
167  while (!dynamic_cast<linuxaio_request*>(req.get())->post())
168  {
169  // post failed, so first handle events to make queues (more)
170  // empty, then try again.
171 
172  // wait for at least one event to complete, no time limit
173  long num_events = syscall(
174  SYS_io_getevents, context_, 1, max_events_, events, nullptr
175  );
176  if (num_events < 0) {
178  io_error, "linuxaio_queue::post_requests"
179  " io_getevents() nr_events=" << num_events
180  );
181  }
182 
183  handle_events(events, num_events, false);
184  }
185 
186  // request is finally posted
187  num_posted_requests_.signal();
188  }
189  else
190  {
191  lock.unlock();
192 
193  // num_waiting_requests_-- was premature, compensate for that
194  num_waiting_requests_.signal();
195  }
196  }
197 
198  delete[] events;
199 }
200 
201 void linuxaio_queue::handle_events(io_event* events, long num_events, bool canceled)
202 {
203  for (int e = 0; e < num_events; ++e)
204  {
205  request* r = reinterpret_cast<request*>(
206  static_cast<uintptr_t>(events[e].data));
207  r->completed(canceled);
208  // release counting_ptr reference, this may delete the request object
209  r->dec_reference();
210  num_free_events_.signal();
211  num_posted_requests_.wait(); // will never block
212  }
213 }
214 
215 // internal routines, run by the waiting thread
216 void linuxaio_queue::wait_requests()
217 {
218  request_ptr req;
219  io_event* events = new io_event[max_events_];
220 
221  for ( ; ; ) // as long as thread is running
222  {
223  // might block until next request is posted or message comes in
224  int num_currently_posted_requests = num_posted_requests_.wait();
225 
226  // terminate if termination has been requested
227  if (wait_thread_state_() == TERMINATING &&
228  num_currently_posted_requests == 0)
229  break;
230 
231  // wait for at least one of them to finish
232  long num_events;
233  while (1) {
234  num_events = syscall(
235  SYS_io_getevents, context_, 1, max_events_, events, nullptr
236  );
237 
238  if (num_events < 0) {
239  if (errno == EINTR) {
240  // io_getevents may return prematurely in case a signal is received
241  continue;
242  }
243 
245  io_error, "linuxaio_queue::wait_requests"
246  " io_getevents() nr_events=" << max_events_
247  );
248  }
249  break;
250  }
251 
252  num_posted_requests_.signal(); // compensate for the one eaten prematurely above
253 
254  handle_events(events, num_events, false);
255  }
256 
257  delete[] events;
258 }
259 
260 void* linuxaio_queue::post_async(void* arg)
261 {
262  (static_cast<linuxaio_queue*>(arg))->post_requests();
263 
264  self_type* pthis = static_cast<self_type*>(arg);
265  pthis->post_thread_state_.set_to(TERMINATED);
266 
267 #if FOXXLL_MSVC >= 1700 && FOXXLL_MSVC <= 1800
268  // Workaround for deadlock bug in Visual C++ Runtime 2012 and 2013, see
269  // request_queue_impl_worker.cpp. -tb
270  ExitThread(nullptr);
271 #else
272  return nullptr;
273 #endif
274 }
275 
276 void* linuxaio_queue::wait_async(void* arg)
277 {
278  (static_cast<linuxaio_queue*>(arg))->wait_requests();
279 
280  self_type* pthis = static_cast<self_type*>(arg);
281  pthis->wait_thread_state_.set_to(TERMINATED);
282 
283 #if FOXXLL_MSVC >= 1700 && FOXXLL_MSVC <= 1800
284  // Workaround for deadlock bug in Visual C++ Runtime 2012 and 2013, see
285  // request_queue_impl_worker.cpp. -tb
286  ExitThread(nullptr);
287 #else
288  return nullptr;
289 #endif
290 }
291 
292 } // namespace foxxll
293 
294 #endif // #if FOXXLL_HAVE_LINUXAIO_FILE
295 
296 /**************************************************************************/
#define FOXXLL_THROW_INVALID_ARGUMENT(error_message)
Throws std::invalid_argument with "Error in [function] : [error_message]".
tlx::counting_ptr< request > request_ptr
A reference counting pointer for request.
Definition: request.hpp:43
#define die(msg)
Instead of calling std::terminate(), throw the message via an exception.
Definition: die.hpp:22
#define FOXXLL_THROW_ERRNO(exception_type, error_message)
Throws exception_type with "Error in [function] : [error_message] : [errno message]".
#define TLX_LOG1
Definition: core.hpp:145