17 #if FOXXLL_HAVE_LINUXAIO_FILE 19 #include <sys/syscall.h> 34 linuxaio_queue::linuxaio_queue(
int desired_queue_length)
35 : num_waiting_requests_(0), num_free_events_(0), num_posted_requests_(0),
36 post_thread_state_(NOT_RUNNING), wait_thread_state_(NOT_RUNNING)
38 if (desired_queue_length == 0) {
44 max_events_ = desired_queue_length;
49 while ((result = syscall(SYS_io_setup, max_events_, &context_)) == -1 &&
50 errno == EAGAIN && max_events_ > 1)
56 io_error,
"linuxaio_queue::linuxaio_queue" 57 " io_setup() nr_events=" << max_events_
61 num_free_events_.signal(max_events_);
63 TLX_LOG1 <<
"Set up an linuxaio queue with " << max_events_ <<
" entries.";
65 start_thread(post_async, static_cast<void*>(
this), post_thread_, post_thread_state_);
66 start_thread(wait_async, static_cast<void*>(
this), wait_thread_, wait_thread_state_);
69 linuxaio_queue::~linuxaio_queue()
71 stop_thread(post_thread_, post_thread_state_, num_waiting_requests_);
72 stop_thread(wait_thread_, wait_thread_state_, num_posted_requests_);
73 syscall(SYS_io_destroy, context_);
80 if (post_thread_state_() != RUNNING)
81 tlx_die(
"Request submitted to stopped queue.");
82 if (!dynamic_cast<linuxaio_request*>(req.get()))
83 tlx_die(
"Non-LinuxAIO request submitted to LinuxAIO queue.");
85 std::unique_lock<std::mutex> lock(waiting_mtx_);
86 waiting_requests_.push_back(req);
89 num_waiting_requests_.signal();
92 bool linuxaio_queue::cancel_request(
request_ptr& req)
96 if (post_thread_state_() != RUNNING)
97 tlx_die(
"Request canceled in stopped queue.");
99 linuxaio_request* areq =
dynamic_cast<linuxaio_request*
>(req.get());
101 tlx_die(
"Non-LinuxAIO request submitted to LinuxAIO queue.");
103 queue_type::iterator pos;
105 std::unique_lock<std::mutex> lock(waiting_mtx_);
108 waiting_requests_.begin(), waiting_requests_.end(), req
110 if (pos != waiting_requests_.end())
112 waiting_requests_.erase(pos);
116 areq->completed(
false,
true);
118 num_waiting_requests_.wait();
123 std::unique_lock<std::mutex> lock(waiting_mtx_);
126 bool canceled_io_operation = areq->cancel_aio(
this);
128 if (canceled_io_operation)
131 num_free_events_.signal();
134 areq->completed(
true,
true);
136 num_posted_requests_.wait();
144 void linuxaio_queue::post_requests()
151 int num_currently_waiting_requests = num_waiting_requests_.wait();
154 if (post_thread_state_() == TERMINATING &&
155 num_currently_waiting_requests == 0)
158 std::unique_lock<std::mutex> lock(waiting_mtx_);
164 num_waiting_requests_.signal();
169 std::vector<request_ptr> reqs;
172 waiting_requests_.pop_front();
173 reqs.emplace_back(std::move(req));
176 while (!waiting_requests_.empty()) {
178 if (!num_free_events_.try_acquire( 1, 1))
180 if (!num_waiting_requests_.try_acquire()) {
181 num_free_events_.signal();
186 waiting_requests_.pop_front();
187 reqs.emplace_back(std::move(req));
193 num_free_events_.wait();
198 for (
size_t i = 0; i < reqs.size(); ++i) {
200 auto ar =
dynamic_cast<linuxaio_request*
>(reqs[i].get());
201 cbs[i] = ar->fill_control_block();
207 while (cb_done < cbs.size()) {
208 long success = syscall(
209 SYS_io_submit, context_,
210 cbs.size() - cb_done,
214 if (success <= 0 && errno != EAGAIN) {
216 io_error,
"linuxaio_request::post io_submit()" 221 num_posted_requests_.signal(success);
224 if (cb_done == cbs.size())
232 long num_events = syscall(
233 SYS_io_getevents, context_, 0,
234 max_events_, events.data(), nullptr
236 if (num_events < 0) {
238 io_error,
"linuxaio_queue::post_requests" 239 " io_getevents() nr_events=" << num_events
243 handle_events(events.data(), num_events,
false);
248 void linuxaio_queue::handle_events(io_event* events,
long num_events,
bool canceled)
251 num_free_events_.signal(num_events);
253 for (
int e = 0; e < num_events; ++e)
255 request* r =
reinterpret_cast<request*
>(
256 static_cast<uintptr_t
>(events[e].data));
257 r->completed(canceled);
262 num_posted_requests_.wait(num_events);
266 void linuxaio_queue::wait_requests()
273 int num_currently_posted_requests = num_posted_requests_.wait();
276 if (wait_thread_state_() == TERMINATING &&
277 num_currently_posted_requests == 0)
283 num_events = syscall(
284 SYS_io_getevents, context_, 1,
285 max_events_, events.data(), nullptr
288 if (num_events < 0) {
289 if (errno == EINTR) {
296 io_error,
"linuxaio_queue::wait_requests" 297 " io_getevents() nr_events=" << max_events_
304 num_posted_requests_.signal();
306 handle_events(events.data(), num_events,
false);
310 void* linuxaio_queue::post_async(
void* arg)
312 (
static_cast<linuxaio_queue*
>(arg))->post_requests();
314 self_type* pthis =
static_cast<self_type*
>(arg);
315 pthis->post_thread_state_.set_to(TERMINATED);
317 #if FOXXLL_MSVC >= 1700 && FOXXLL_MSVC <= 1800 326 void* linuxaio_queue::wait_async(
void* arg)
328 (
static_cast<linuxaio_queue*
>(arg))->wait_requests();
330 self_type* pthis =
static_cast<self_type*
>(arg);
331 pthis->wait_thread_state_.set_to(TERMINATED);
333 #if FOXXLL_MSVC >= 1700 && FOXXLL_MSVC <= 1800 344 #endif // #if FOXXLL_HAVE_LINUXAIO_FILE
#define FOXXLL_THROW_INVALID_ARGUMENT(error_message)
Throws std::invalid_argument with "Error in [function] : [error_message]".
tlx::counting_ptr< request > request_ptr
A reference counting pointer for request.
Simpler non-growing vector without initialization.
#define FOXXLL_THROW_ERRNO(exception_type, error_message)
Throws exception_type with "Error in [function] : [error_message] : [errno message]".
#define tlx_die(msg)
Instead of std::terminate(), throw the output the message via an exception.