Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
request_queue_impl_qwqr.cpp
Go to the documentation of this file.
1 /***************************************************************************
2  * foxxll/io/request_queue_impl_qwqr.cpp
3  *
4  * Part of FOXXLL. See http://foxxll.org
5  *
6  * Copyright (C) 2002-2005 Roman Dementiev <[email protected]>
7  * Copyright (C) 2008, 2009 Andreas Beckmann <[email protected]>
8  * Copyright (C) 2009 Johannes Singler <[email protected]>
9  * Copyright (C) 2013 Timo Bingmann <[email protected]>
10  *
11  * Distributed under the Boost Software License, Version 1.0.
12  * (See accompanying file LICENSE_1_0.txt or copy at
13  * http://www.boost.org/LICENSE_1_0.txt)
14  **************************************************************************/
15 
16 #include <algorithm>
17 #include <functional>
18 
19 #include <tlx/logger/core.hpp>
20 
24 
25 #if FOXXLL_MSVC >= 1700 && FOXXLL_MSVC <= 1800
26  #include <windows.h>
27 #endif
28 
29 #ifndef FOXXLL_CHECK_FOR_PENDING_REQUESTS_ON_SUBMISSION
30 #define FOXXLL_CHECK_FOR_PENDING_REQUESTS_ON_SUBMISSION 1
31 #endif
32 
33 namespace foxxll {
34 
35 struct file_offset_match
36  : public std::binary_function<request_ptr, request_ptr, bool>
37 {
38  bool operator () (
39  const request_ptr& a,
40  const request_ptr& b) const
41  {
42  // matching file and offset are enough to cause problems
43  return (a->offset() == b->offset()) &&
44  (a->get_file() == b->get_file());
45  }
46 };
47 
// NOTE(review): the signature line (listing line 48) is missing from this
// listing; the member-initializer list below indicates a constructor body.
// Initializes thread_state_ to NOT_RUNNING and sem_ to 0, then passes the
// worker function and `this` to start_thread().
49  : thread_state_(NOT_RUNNING), sem_(0)
50 {
// n is not used by this body; it is only handed to tlx::unused()
// (presumably to silence an unused-parameter warning).
51  tlx::unused(n);
52  start_thread(worker, static_cast<void*>(this), thread_, thread_state_);
53 }
54 
// NOTE(review): the signature line (listing line 55) is missing from this
// listing; presumably this is the body of set_priority_op(const priority_op&),
// the only listed member taking `op` -- confirm.
// The argument is discarded: the assignment is commented out and `op` is only
// handed to tlx::unused(). The worker loop in this file reads priority_op_,
// which this body never modifies.
56 {
57  //_priority_op = op;
58  tlx::unused(op);
59 }
60 
// NOTE(review): the signature line (listing line 61) is missing from this
// listing; presumably this is the body of add_request(request_ptr& req) --
// confirm.
//
// Appends req to the read queue (if req's op() is request::READ) or to the
// write queue (otherwise) and then signals sem_ once.
//
// Raises via FOXXLL_THROW_INVALID_ARGUMENT when req is empty or when
// thread_state_() is not RUNNING.
//
// NOTE(review): a request that is not a serving_request is only logged and is
// still queued; the worker body in this file calls serve() on the result of
// the same dynamic_cast without a null check.
// NOTE(review): bind2nd was removed from the standard library in C++17; a
// lambda capturing req would be the modern equivalent.
62 {
63  if (req.empty())
64  FOXXLL_THROW_INVALID_ARGUMENT("Empty request submitted to disk_queue.");
65  if (thread_state_() != RUNNING)
66  FOXXLL_THROW_INVALID_ARGUMENT("Request submitted to not running queue.");
67  if (!dynamic_cast<serving_request*>(req.get()))
68  TLX_LOG1 << "Incompatible request submitted to running queue.";
69 
70  if (req.get()->op() == request::READ)
71  {
72 #if FOXXLL_CHECK_FOR_PENDING_REQUESTS_ON_SUBMISSION
// Diagnostic only: log if a queued WRITE targets the same file and offset.
// The inner scope releases write_mutex_ before read_mutex_ is taken below.
73  {
74  std::unique_lock<std::mutex> lock(write_mutex_);
75  if (std::find_if(
76  write_queue_.begin(), write_queue_.end(),
77  bind2nd(file_offset_match(), req)
78  )
79  != write_queue_.end())
80  {
81  TLX_LOG1 << "READ request submitted for a BID with a pending WRITE request";
82  }
83  }
84 #endif
85  std::unique_lock<std::mutex> lock(read_mutex_);
86  read_queue_.push_back(req);
87  }
88  else
89  {
90 #if FOXXLL_CHECK_FOR_PENDING_REQUESTS_ON_SUBMISSION
// Diagnostic only: log if a queued READ targets the same file and offset.
// The inner scope releases read_mutex_ before write_mutex_ is taken below.
91  {
92  std::unique_lock<std::mutex> lock(read_mutex_);
93  if (std::find_if(
94  read_queue_.begin(), read_queue_.end(),
95  bind2nd(file_offset_match(), req)
96  )
97  != read_queue_.end())
98  {
99  TLX_LOG1 << "WRITE request submitted for a BID with a pending READ request";
100  }
101  }
102 #endif
103  std::unique_lock<std::mutex> lock(write_mutex_);
104  write_queue_.push_back(req);
105  }
106 
// One signal per queued request; the worker loop waits on sem_ before
// looking at the queues.
107  sem_.signal();
108 }
109 
// NOTE(review): the signature line (listing line 110) is missing from this
// listing; presumably this is the body of
// bool cancel_request(request_ptr& req) -- confirm.
//
// Searches the read queue (if req's op() is request::READ) or the write queue
// (otherwise) for req and erases it if found.
//
// Returns true if req was found in its queue and removed, false otherwise.
//
// Raises via FOXXLL_THROW_INVALID_ARGUMENT when req is empty or when
// thread_state_() is not RUNNING.
//
// NOTE(review): the log text below says "submitted" although this is the
// cancel path; it looks copied from the submission code.
111 {
112  if (req.empty())
113  FOXXLL_THROW_INVALID_ARGUMENT("Empty request canceled disk_queue.");
114  if (thread_state_() != RUNNING)
115  FOXXLL_THROW_INVALID_ARGUMENT("Request canceled to not running queue.");
116  if (!dynamic_cast<serving_request*>(req.get()))
117  TLX_LOG1 << "Incompatible request submitted to running queue.";
118 
119  bool was_still_in_queue = false;
120  if (req.get()->op() == request::READ)
121  {
122  std::unique_lock<std::mutex> lock(read_mutex_);
123  queue_type::iterator pos
124  = std::find(read_queue_.begin(), read_queue_.end(), req);
125  if (pos != read_queue_.end())
126  {
127  read_queue_.erase(pos);
128  was_still_in_queue = true;
// The mutex is released before waiting on the semaphore. The wait presumably
// takes back the count that was signalled when the request was queued --
// confirm against the submission path.
129  lock.unlock();
130  sem_.wait();
131  }
132  }
133  else
134  {
135  std::unique_lock<std::mutex> lock(write_mutex_);
136  queue_type::iterator pos
137  = std::find(write_queue_.begin(), write_queue_.end(), req);
138  if (pos != write_queue_.end())
139  {
140  write_queue_.erase(pos);
141  was_still_in_queue = true;
// Same as the read branch: unlock first, then wait on the semaphore.
142  lock.unlock();
143  sem_.wait();
144  }
145  }
146 
147  return was_still_in_queue;
148 }
149 
// NOTE(review): listing line 150 (the signature) and listing line 152 (the
// only statement of this body) are missing from this listing, so what this
// definition does cannot be determined from here.
151 {
153 }
154 
// NOTE(review): the signature line (listing line 155) is missing from this
// listing; the body matches the `void* (*)(void*)` worker expected by
// start_thread(), with `arg` being the queue object.
//
// Thread main loop. Each iteration waits on sem_, then looks at either the
// write queue or the read queue (selected by write_phase), pops the front
// request if there is one and calls serve() on it outside the queue mutex.
// If the selected queue is empty, sem_ is signalled again and no request is
// served in that iteration.
//
// Phase switching as written below:
//  - priority_op_ == NONE:  the phase flips after every iteration.
//  - priority_op_ == WRITE: stays in the write phase until the write queue is
//    found empty; the read phase lasts a single iteration.
//  - priority_op_ == READ:  the mirror image of WRITE.
//
// The loop ends once thread_state_() is TERMINATING and the extra
// sem_.wait() returns 0; thread_state_ is then set to TERMINATED.
156 {
157  self* pthis = static_cast<self*>(arg);
158 
159  bool write_phase = true;
160  for ( ; ; )
161  {
162  pthis->sem_.wait();
163 
164  if (write_phase)
165  {
166  std::unique_lock<std::mutex> write_lock(pthis->write_mutex_);
167  if (!pthis->write_queue_.empty())
168  {
169  request_ptr req = pthis->write_queue_.front();
170  pthis->write_queue_.pop_front();
171 
// Release the queue mutex before serving the request.
172  write_lock.unlock();
173 
174  //assert(req->get_reference_count()) > 1);
// NOTE(review): the dynamic_cast result is dereferenced without a null check.
175  dynamic_cast<serving_request*>(req.get())->serve();
176  }
177  else
178  {
179  write_lock.unlock();
180 
// Nothing was taken from the write queue: signal sem_ again to undo the
// wait() at the top of the loop.
181  pthis->sem_.signal();
182 
183  if (pthis->priority_op_ == WRITE)
184  write_phase = false;
185  }
186 
187  if (pthis->priority_op_ == NONE || pthis->priority_op_ == READ)
188  write_phase = false;
189  }
190  else
191  {
192  std::unique_lock<std::mutex> read_lock(pthis->read_mutex_);
193 
194  if (!pthis->read_queue_.empty())
195  {
196  request_ptr req = pthis->read_queue_.front();
197  pthis->read_queue_.pop_front();
198 
// Release the queue mutex before serving the request.
199  read_lock.unlock();
200 
201  TLX_LOG << "queue: before serve request has "
202  << req->reference_count() << " references ";
203  //assert(req->get_reference_count() > 1);
// NOTE(review): the dynamic_cast result is dereferenced without a null check.
204  dynamic_cast<serving_request*>(req.get())->serve();
205  TLX_LOG << "queue: after serve request has "
206  << req->reference_count() << " references ";
207  }
208  else
209  {
210  read_lock.unlock();
211 
// Nothing was taken from the read queue: signal sem_ again to undo the
// wait() at the top of the loop.
212  pthis->sem_.signal();
213 
214  if (pthis->priority_op_ == READ)
215  write_phase = true;
216  }
217 
218  if (pthis->priority_op_ == NONE || pthis->priority_op_ == WRITE)
219  write_phase = true;
220  }
221 
222  // terminate if it has been requested and queues are empty
// NOTE(review): presumably wait() returns the semaphore value left after the
// decrement, so 0 means no queued request remains -- confirm against
// tlx::semaphore. Otherwise the count just taken is given back via signal().
223  if (pthis->thread_state_() == TERMINATING) {
224  if (pthis->sem_.wait() == 0)
225  break;
226  else
227  pthis->sem_.signal();
228  }
229  }
230 
231  pthis->thread_state_.set_to(TERMINATED);
232 
233 #if FOXXLL_MSVC >= 1700 && FOXXLL_MSVC <= 1800
234  // Workaround for deadlock bug in Visual C++ Runtime 2012 and 2013, see
235  // request_queue_impl_worker.cpp. -tb
// NOTE(review): Win32 ExitThread() takes a DWORD exit code; passing nullptr
// is likely ill-formed (nullptr does not convert to an integer) -- confirm,
// and consider ExitThread(0).
236  ExitThread(nullptr);
237 #else
238  return nullptr;
239 #endif
240 }
241 
242 } // namespace foxxll
243 
244 /**************************************************************************/
#define FOXXLL_THROW_INVALID_ARGUMENT(error_message)
Throws std::invalid_argument with "Error in [function] : [error_message]".
tlx::counting_ptr< request > request_ptr
A reference counting pointer for request.
Definition: request.hpp:43
size_t wait(size_t delta=1)
Definition: semaphore.hpp:60
void add_request(request_ptr &req) final
void unused(Types &&...)
Definition: unused.hpp:20
Type * get() const noexcept
return the enclosed pointer.
void set_priority_op(const priority_op &op) final
size_t signal()
Definition: semaphore.hpp:44
void stop_thread(std::thread &t, shared_state< thread_state > &s, tlx::semaphore &sem)
High-performance smart pointer used as a wrapping reference counting pointer.
bool cancel_request(request_ptr &req) final
Request which serves an I/O by calling the synchronous routine of the file.
bool empty() const noexcept
test for a nullptr pointer
shared_state< thread_state > thread_state_
#define TLX_LOG1
Definition: core.hpp:145
void start_thread(void *(*worker)(void *), void *arg, std::thread &t, shared_state< thread_state > &s)
#define TLX_LOG
Default logging method: output if the local debug variable is true.
Definition: core.hpp:141