Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
request_queue_impl_1q.cpp
Go to the documentation of this file.
1 /***************************************************************************
2  * foxxll/io/request_queue_impl_1q.cpp
3  *
4  * Part of FOXXLL. See http://foxxll.org
5  *
6  * Copyright (C) 2002-2005 Roman Dementiev <[email protected]>
7  * Copyright (C) 2009 Andreas Beckmann <[email protected]>
8  * Copyright (C) 2009 Johannes Singler <[email protected]>
9  * Copyright (C) 2013 Timo Bingmann <[email protected]>
10  *
11  * Distributed under the Boost Software License, Version 1.0.
12  * (See accompanying file LICENSE_1_0.txt or copy at
13  * http://www.boost.org/LICENSE_1_0.txt)
14  **************************************************************************/
15 
16 #include <algorithm>
17 #include <functional>
18 
19 #include <tlx/logger.hpp>
20 
22 #include <foxxll/config.hpp>
25 
26 #if FOXXLL_MSVC >= 1700 && FOXXLL_MSVC <= 1800
27  #include <windows.h>
28 #endif
29 
30 #ifndef FOXXLL_CHECK_FOR_PENDING_REQUESTS_ON_SUBMISSION
31 #define FOXXLL_CHECK_FOR_PENDING_REQUESTS_ON_SUBMISSION 1
32 #endif
33 
34 namespace foxxll {
35 
36 struct file_offset_match
37  : public std::binary_function<request_ptr, request_ptr, bool>
38 {
39  bool operator () (
40  const request_ptr& a,
41  const request_ptr& b) const
42  {
43  // matching file and offset are enough to cause problems
44  return (a->offset() == b->offset()) &&
45  (a->get_file() == b->get_file());
46  }
47 };
48 
    : thread_state_(NOT_RUNNING), sem_(0)
{
    // The queue starts in the NOT_RUNNING state with a semaphore count of 0,
    // i.e. no pending work is announced to the worker yet.

    // n is accepted but not used by this implementation.
    tlx::unused(n);

    // Hand this object to the worker as its opaque argument.
    // NOTE(review): start_thread() presumably spawns thread_ and advances
    // thread_state_ to RUNNING -- confirm against its definition.
    start_thread(worker, static_cast<void*>(this), thread_, thread_state_);
}
55 
56 void request_queue_impl_1q::set_priority_op(const priority_op& op)
57 {
58  //_priority_op = op;
59  tlx::unused(op);
60 }
61 
63 {
64  if (req.empty())
65  FOXXLL_THROW_INVALID_ARGUMENT("Empty request submitted to disk_queue.");
66  if (thread_state_() != RUNNING)
67  FOXXLL_THROW_INVALID_ARGUMENT("Request submitted to not running queue.");
68  if (!dynamic_cast<serving_request*>(req.get()))
69  LOG1 << "Incompatible request submitted to running queue.";
70 
71 #if FOXXLL_CHECK_FOR_PENDING_REQUESTS_ON_SUBMISSION
72  {
73  std::unique_lock<std::mutex> lock(queue_mutex_);
74  if (std::find_if(
75  queue_.begin(), queue_.end(),
76  bind2nd(file_offset_match(), req)
77  )
78  != queue_.end())
79  {
80  LOG1 << "request submitted for a BID with a pending request";
81  }
82  }
83 #endif
84  std::unique_lock<std::mutex> lock(queue_mutex_);
85  queue_.push_back(req);
86 
87  sem_.signal();
88 }
89 
91 {
92  if (req.empty())
93  FOXXLL_THROW_INVALID_ARGUMENT("Empty request canceled disk_queue.");
94  if (thread_state_() != RUNNING)
95  FOXXLL_THROW_INVALID_ARGUMENT("Request canceled to not running queue.");
96  if (!dynamic_cast<serving_request*>(req.get()))
97  LOG1 << "Incompatible request submitted to running queue.";
98 
99  bool was_still_in_queue = false;
100  {
101  std::unique_lock<std::mutex> lock(queue_mutex_);
102  queue_type::iterator pos
103  = std::find(queue_.begin(), queue_.end(), req);
104 
105  if (pos != queue_.end())
106  {
107  queue_.erase(pos);
108  was_still_in_queue = true;
109  lock.unlock();
110  sem_.wait();
111  }
112  }
113 
114  return was_still_in_queue;
115 }
116 
118 {
120 }
121 
{
    // Thread entry point. arg is the queue object that was passed as an
    // opaque void* when the thread was started; the loop below serves queued
    // requests one at a time until termination is requested.
    self* pthis = static_cast<self*>(arg);

    for ( ; ; )
    {
        // Block until a token is available on the semaphore.
        pthis->sem_.wait();

        {
            std::unique_lock<std::mutex> lock(pthis->queue_mutex_);
            if (!pthis->queue_.empty())
            {
                // Take the request at the front of the queue.
                request_ptr req = pthis->queue_.front();
                pthis->queue_.pop_front();

                // Drop the mutex before serving so that the queue is not
                // locked while the request is being processed.
                lock.unlock();

                //assert(req->nref() > 1);
                // NOTE(review): the dynamic_cast result is dereferenced
                // without a null check, while the submit path only logs when
                // a request is not a serving_request -- confirm such a
                // request can never reach this point.
                dynamic_cast<serving_request*>(req.get())->serve();
            }
            else
            {
                lock.unlock();

                // Woken up with nothing queued: hand the consumed token back.
                pthis->sem_.signal();
            }
        }

        // terminate if it has been requested and queues are empty
        if (pthis->thread_state_() == TERMINATING) {
            // NOTE(review): this relies on wait() returning the semaphore
            // value left after the decrement (0 meaning no further work is
            // announced) -- confirm against tlx::semaphore. Otherwise the
            // token is returned and the loop continues.
            if (pthis->sem_.wait() == 0)
                break;
            else
                pthis->sem_.signal();
        }
    }

    // Publish that the worker has left its loop.
    pthis->thread_state_.set_to(TERMINATED);

#if FOXXLL_MSVC >= 1700 && FOXXLL_MSVC <= 1800
    // Workaround for deadlock bug in Visual C++ Runtime 2012 and 2013, see
    // request_queue_impl_worker.cpp. -tb
    ExitThread(nullptr);
#else
    return nullptr;
#endif
}
169 
170 } // namespace foxxll
171 
172 /**************************************************************************/
#define FOXXLL_THROW_INVALID_ARGUMENT(error_message)
Throws std::invalid_argument with "Error in [function] : [error_message]".
shared_state< thread_state > thread_state_
tlx::counting_ptr< request > request_ptr
A reference counting pointer for request.
Definition: request.hpp:43
#define LOG1
Definition: logger.hpp:145
void add_request(request_ptr &req) final
size_t wait(size_t delta=1)
Definition: semaphore.hpp:60
void unused(Types &&...)
Definition: unused.hpp:20
Type * get() const noexcept
return the enclosed pointer.
void set_priority_op(const priority_op &op) final
bool cancel_request(request_ptr &req) final
size_t signal()
Definition: semaphore.hpp:44
void stop_thread(std::thread &t, shared_state< thread_state > &s, tlx::semaphore &sem)
High-performance smart pointer used as a wrapping reference counting pointer.
Request which serves an I/O by calling the synchronous routine of the file.
bool empty() const noexcept
test for a nullptr pointer
void start_thread(void *(*worker)(void *), void *arg, std::thread &t, shared_state< thread_state > &s)