Thrill  0.1
linuxaio_queue.cpp
Go to the documentation of this file.
1 /***************************************************************************
2  * foxxll/io/linuxaio_queue.cpp
3  *
4  * Part of FOXXLL. See http://foxxll.org
5  *
6  * Copyright (C) 2011 Johannes Singler <[email protected]>
7  * Copyright (C) 2014 Timo Bingmann <[email protected]>
8  * Copyright (C) 2018 Manuel Penschuck <[email protected]>
9  *
10  * Distributed under the Boost Software License, Version 1.0.
11  * (See accompanying file LICENSE_1_0.txt or copy at
12  * http://www.boost.org/LICENSE_1_0.txt)
13  **************************************************************************/
14 
16 
17 #if FOXXLL_HAVE_LINUXAIO_FILE
18 
19 #include <sys/syscall.h>
20 #include <unistd.h>
21 
22 #include <algorithm>
23 
24 #include <tlx/define/likely.hpp>
25 #include <tlx/die/core.hpp>
26 #include <tlx/logger/core.hpp>
27 
31 
32 namespace foxxll {
33 
34 linuxaio_queue::linuxaio_queue(int desired_queue_length)
35  : num_waiting_requests_(0), num_free_events_(0), num_posted_requests_(0),
36  post_thread_state_(NOT_RUNNING), wait_thread_state_(NOT_RUNNING)
37 {
38  if (desired_queue_length == 0) {
39  // default value, 64 entries per queue (i.e. usually per disk) should
40  // be enough
41  max_events_ = 64;
42  }
43  else
44  max_events_ = desired_queue_length;
45 
46  // negotiate maximum number of simultaneous events with the OS
47  context_ = 0;
48  long result;
49  while ((result = syscall(SYS_io_setup, max_events_, &context_)) == -1 &&
50  errno == EAGAIN && max_events_ > 1)
51  {
52  max_events_ <<= 1; // try with half as many events
53  }
54  if (result != 0) {
56  io_error, "linuxaio_queue::linuxaio_queue"
57  " io_setup() nr_events=" << max_events_
58  );
59  }
60 
61  num_free_events_.signal(max_events_);
62 
63  TLX_LOG1 << "Set up an linuxaio queue with " << max_events_ << " entries.";
64 
65  start_thread(post_async, static_cast<void*>(this), post_thread_, post_thread_state_);
66  start_thread(wait_async, static_cast<void*>(this), wait_thread_, wait_thread_state_);
67 }
68 
69 linuxaio_queue::~linuxaio_queue()
70 {
71  stop_thread(post_thread_, post_thread_state_, num_waiting_requests_);
72  stop_thread(wait_thread_, wait_thread_state_, num_posted_requests_);
73  syscall(SYS_io_destroy, context_);
74 }
75 
76 void linuxaio_queue::add_request(request_ptr& req)
77 {
78  if (req.empty())
79  FOXXLL_THROW_INVALID_ARGUMENT("Empty request submitted to disk_queue.");
80  if (post_thread_state_() != RUNNING)
81  tlx_die("Request submitted to stopped queue.");
82  if (!dynamic_cast<linuxaio_request*>(req.get()))
83  tlx_die("Non-LinuxAIO request submitted to LinuxAIO queue.");
84 
85  std::unique_lock<std::mutex> lock(waiting_mtx_);
86  waiting_requests_.push_back(req);
87  lock.unlock();
88 
89  num_waiting_requests_.signal();
90 }
91 
92 bool linuxaio_queue::cancel_request(request_ptr& req)
93 {
94  if (req.empty())
95  FOXXLL_THROW_INVALID_ARGUMENT("Empty request canceled disk_queue.");
96  if (post_thread_state_() != RUNNING)
97  tlx_die("Request canceled in stopped queue.");
98 
99  linuxaio_request* areq = dynamic_cast<linuxaio_request*>(req.get());
100  if (!areq)
101  tlx_die("Non-LinuxAIO request submitted to LinuxAIO queue.");
102 
103  queue_type::iterator pos;
104  {
105  std::unique_lock<std::mutex> lock(waiting_mtx_);
106 
107  pos = std::find(
108  waiting_requests_.begin(), waiting_requests_.end(), req
109  );
110  if (pos != waiting_requests_.end())
111  {
112  waiting_requests_.erase(pos);
113  lock.unlock();
114 
115  // request is canceled, but was not yet posted.
116  areq->completed(false, true);
117 
118  num_waiting_requests_.wait(); // will never block
119  return true;
120  }
121  }
122 
123  std::unique_lock<std::mutex> lock(waiting_mtx_);
124 
125  // perform syscall to cancel I/O
126  bool canceled_io_operation = areq->cancel_aio(this);
127 
128  if (canceled_io_operation)
129  {
130  lock.unlock();
131  num_free_events_.signal();
132 
133  // request is canceled, already posted, but canceled in kernel
134  areq->completed(true, true);
135 
136  num_posted_requests_.wait(); // will never block
137  return true;
138  }
139 
140  return false;
141 }
142 
143 // internal routines, run by the posting thread
144 void linuxaio_queue::post_requests()
145 {
146  tlx::simple_vector<io_event> events(max_events_);
147 
148  for ( ; ; ) // as long as thread is running
149  {
150  // block until next request or message comes in
151  int num_currently_waiting_requests = num_waiting_requests_.wait();
152 
153  // terminate if termination has been requested
154  if (post_thread_state_() == TERMINATING &&
155  num_currently_waiting_requests == 0)
156  break;
157 
158  std::unique_lock<std::mutex> lock(waiting_mtx_);
159  if (TLX_UNLIKELY(waiting_requests_.empty())) {
160  // unlock queue
161  lock.unlock();
162 
163  // num_waiting_requests_-- was premature, compensate for that
164  num_waiting_requests_.signal();
165  continue;
166  }
167 
168  // collect requests from waiting queue: first is there
169  std::vector<request_ptr> reqs;
170 
171  request_ptr req = waiting_requests_.front();
172  waiting_requests_.pop_front();
173  reqs.emplace_back(std::move(req));
174 
175  // collect additional requests
176  while (!waiting_requests_.empty()) {
177  // acquire one free event, but keep one in slack
178  if (!num_free_events_.try_acquire(/* delta */ 1, /* slack */ 1))
179  break;
180  if (!num_waiting_requests_.try_acquire()) {
181  num_free_events_.signal();
182  break;
183  }
184 
185  request_ptr req = waiting_requests_.front();
186  waiting_requests_.pop_front();
187  reqs.emplace_back(std::move(req));
188  }
189 
190  lock.unlock();
191 
192  // the last free_event must be acquired outside of the lock.
193  num_free_events_.wait();
194 
195  // construct batch iocb
196  tlx::simple_vector<iocb*> cbs(reqs.size());
197 
198  for (size_t i = 0; i < reqs.size(); ++i) {
199  // polymorphic_downcast
200  auto ar = dynamic_cast<linuxaio_request*>(reqs[i].get());
201  cbs[i] = ar->fill_control_block();
202  }
203  reqs.clear();
204 
205  // io_submit loop
206  size_t cb_done = 0;
207  while (cb_done < cbs.size()) {
208  long success = syscall(
209  SYS_io_submit, context_,
210  cbs.size() - cb_done,
211  cbs.data() + cb_done
212  );
213 
214  if (success <= 0 && errno != EAGAIN) {
216  io_error, "linuxaio_request::post io_submit()"
217  );
218  }
219  if (success > 0) {
220  // request is posted
221  num_posted_requests_.signal(success);
222 
223  cb_done += success;
224  if (cb_done == cbs.size())
225  break;
226  }
227 
228  // post failed, so first handle events to make queues (more) empty,
229  // then try again.
230 
231  // wait for at least one event to complete, no time limit
232  long num_events = syscall(
233  SYS_io_getevents, context_, 0,
234  max_events_, events.data(), nullptr
235  );
236  if (num_events < 0) {
238  io_error, "linuxaio_queue::post_requests"
239  " io_getevents() nr_events=" << num_events
240  );
241  }
242  if (num_events > 0)
243  handle_events(events.data(), num_events, false);
244  }
245  }
246 }
247 
248 void linuxaio_queue::handle_events(io_event* events, long num_events, bool canceled)
249 {
250  // first mark all events as free
251  num_free_events_.signal(num_events);
252 
253  for (int e = 0; e < num_events; ++e)
254  {
255  request* r = reinterpret_cast<request*>(
256  static_cast<uintptr_t>(events[e].data));
257  r->completed(canceled);
258  // release counting_ptr reference, this may delete the request object
259  r->dec_reference();
260  }
261 
262  num_posted_requests_.wait(num_events); // will never block
263 }
264 
265 // internal routines, run by the waiting thread
266 void linuxaio_queue::wait_requests()
267 {
268  tlx::simple_vector<io_event> events(max_events_);
269 
270  for ( ; ; ) // as long as thread is running
271  {
272  // might block until next request is posted or message comes in
273  int num_currently_posted_requests = num_posted_requests_.wait();
274 
275  // terminate if termination has been requested
276  if (wait_thread_state_() == TERMINATING &&
277  num_currently_posted_requests == 0)
278  break;
279 
280  // wait for at least one of them to finish
281  long num_events;
282  while (1) {
283  num_events = syscall(
284  SYS_io_getevents, context_, 1,
285  max_events_, events.data(), nullptr
286  );
287 
288  if (num_events < 0) {
289  if (errno == EINTR) {
290  // io_getevents may return prematurely in case a signal is
291  // received
292  continue;
293  }
294 
296  io_error, "linuxaio_queue::wait_requests"
297  " io_getevents() nr_events=" << max_events_
298  );
299  }
300  break;
301  }
302 
303  // compensate for the one eaten prematurely above
304  num_posted_requests_.signal();
305 
306  handle_events(events.data(), num_events, false);
307  }
308 }
309 
310 void* linuxaio_queue::post_async(void* arg)
311 {
312  (static_cast<linuxaio_queue*>(arg))->post_requests();
313 
314  self_type* pthis = static_cast<self_type*>(arg);
315  pthis->post_thread_state_.set_to(TERMINATED);
316 
317 #if FOXXLL_MSVC >= 1700 && FOXXLL_MSVC <= 1800
318  // Workaround for deadlock bug in Visual C++ Runtime 2012 and 2013, see
319  // request_queue_impl_worker.cpp. -tb
320  ExitThread(nullptr);
321 #else
322  return nullptr;
323 #endif
324 }
325 
326 void* linuxaio_queue::wait_async(void* arg)
327 {
328  (static_cast<linuxaio_queue*>(arg))->wait_requests();
329 
330  self_type* pthis = static_cast<self_type*>(arg);
331  pthis->wait_thread_state_.set_to(TERMINATED);
332 
333 #if FOXXLL_MSVC >= 1700 && FOXXLL_MSVC <= 1800
334  // Workaround for deadlock bug in Visual C++ Runtime 2012 and 2013, see
335  // request_queue_impl_worker.cpp. -tb
336  ExitThread(nullptr);
337 #else
338  return nullptr;
339 #endif
340 }
341 
342 } // namespace foxxll
343 
344 #endif // #if FOXXLL_HAVE_LINUXAIO_FILE
345 
346 /**************************************************************************/
#define FOXXLL_THROW_INVALID_ARGUMENT(error_message)
Throws std::invalid_argument with "Error in [function] : [error_message]".
tlx::counting_ptr< request > request_ptr
A reference counting pointer for request.
Definition: request.hpp:43
Simpler non-growing vector without initialization.
#define TLX_UNLIKELY(c)
Definition: likely.hpp:24
#define FOXXLL_THROW_ERRNO(exception_type, error_message)
Throws exception_type with "Error in [function] : [error_message] : [errno message]".
FOXXLL library namespace
#define tlx_die(msg)
Instead of calling std::terminate(), throw the output message via an exception.
Definition: core.hpp:44
#define TLX_LOG1
Definition: core.hpp:145