Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
write_pool.hpp
Go to the documentation of this file.
1 /***************************************************************************
2  * foxxll/mng/write_pool.hpp
3  *
4  * Part of FOXXLL. See http://foxxll.org
5  *
6  * Copyright (C) 2003-2004 Roman Dementiev <[email protected]>
7  * Copyright (C) 2009 Andreas Beckmann <[email protected]>
8  *
9  * Distributed under the Boost Software License, Version 1.0.
10  * (See accompanying file LICENSE_1_0.txt or copy at
11  * http://www.boost.org/LICENSE_1_0.txt)
12  **************************************************************************/
13 
14 #ifndef FOXXLL_MNG_WRITE_POOL_HEADER
15 #define FOXXLL_MNG_WRITE_POOL_HEADER
16 
17 #include <cassert>
18 
19 #include <algorithm>
20 #include <list>
21 #include <utility>
22 
23 #include <tlx/define.hpp>
24 
25 #include <foxxll/config.hpp>
27 
28 #define FOXXLL_VERBOSE_WPOOL(msg) LOG << "write_pool[" << static_cast<void*>(this) << "]" << msg
29 
30 namespace foxxll {
31 
32 //! \addtogroup foxxll_schedlayer
33 //! \{
34 
35 //! Implements dynamically resizable buffered writing pool.
36 template <class BlockType>
38 {
39  constexpr static bool debug = false;
40 
41 public:
42  using block_type = BlockType;
43  using bid_type = typename block_type::bid_type;
44 
45  // a hack to make wait_any work with busy_entry type
46  struct busy_entry
47  {
51 
52  busy_entry() : block(nullptr) { }
53  busy_entry(const busy_entry& a) : block(a.block), req(a.req), bid(a.bid) { }
55  : block(bl), req(r), bid(bi) { }
56 
57  operator request_ptr () { return req; }
58  };
59  using free_blocks_iterator = typename std::list<block_type*>::iterator;
60  using busy_blocks_iterator = typename std::list<busy_entry>::iterator;
61 
62 protected:
63  // contains free write blocks
64  std::list<block_type*> free_blocks;
65  // blocks that are in writing
66  std::list<busy_entry> busy_blocks;
67 
68 public:
69  //! Constructs pool.
70  //! \param init_size initial number of blocks in the pool
71  explicit write_pool(size_t init_size = 1)
72  {
73  for (size_t i = 0; i < init_size; ++i)
74  {
75  free_blocks.push_back(new block_type);
76  FOXXLL_VERBOSE_WPOOL(" create block=" << free_blocks.back());
77  }
78  }
79 
80  //! non-copyable: delete copy-constructor
81  write_pool(const write_pool&) = delete;
82  //! non-copyable: delete assignment operator
83  write_pool& operator = (const write_pool&) = delete;
84 
85  void swap(write_pool& obj)
86  {
87  std::swap(free_blocks, obj.free_blocks);
88  std::swap(busy_blocks, obj.busy_blocks);
89  }
90 
91  //! Waits for completion of all ongoing write requests and frees memory.
93  {
95  "::~write_pool free_blocks.size()=" << free_blocks.size() <<
96  " busy_blocks.size()=" << busy_blocks.size()
97  );
98  while (!free_blocks.empty())
99  {
100  FOXXLL_VERBOSE_WPOOL(" delete free block=" << free_blocks.back());
101  delete free_blocks.back();
102  free_blocks.pop_back();
103  }
104 
105  try
106  {
107  for (busy_blocks_iterator i2 = busy_blocks.begin(); i2 != busy_blocks.end(); ++i2)
108  {
109  i2->req->wait();
110  if (free_blocks.empty())
111  FOXXLL_VERBOSE_WPOOL(" delete busy block=(empty)");
112  else
113  FOXXLL_VERBOSE_WPOOL(" delete busy block=" << free_blocks.back());
114  delete i2->block;
115  }
116  }
117  catch (...)
118  { }
119  }
120 
121  //! Returns number of owned blocks.
122  size_t size() const { return free_blocks.size() + busy_blocks.size(); }
123 
124  //! Passes a block to the pool for writing.
125  //! \param block block to write. Ownership of the block goes to the pool.
126  //! \c block must be allocated dynamically with using \c new .
127  //! \param bid location, where to write
128  //! \warning \c block must be allocated dynamically with using \c new .
129  //! \return request object of the write operation
131  {
132  FOXXLL_VERBOSE_WPOOL("::write: " << block << " @ " << bid);
133  for (busy_blocks_iterator i2 = busy_blocks.begin(); i2 != busy_blocks.end(); ++i2)
134  {
135  if (i2->bid == bid) {
136  assert(i2->block != block);
137  FOXXLL_VERBOSE_WPOOL("WAW dependency");
138  // try to cancel the obsolete request
139  i2->req->cancel();
140  // invalidate the bid of the stale write request,
141  // prevents prefetch_pool from stealing a stale block
142  i2->bid.storage = 0;
143  }
144  }
145  request_ptr result = block->write(bid);
146  busy_blocks.push_back(busy_entry(block, result, bid));
147  block = nullptr; // prevent caller from using the block any further
148  return result;
149  }
150 
151  //! Take out a block from the pool.
152  //! \return pointer to the block. Ownership of the block goes to the caller.
154  {
155  assert(size() > 0);
156  if (!free_blocks.empty())
157  {
158  block_type* p = free_blocks.back();
159  FOXXLL_VERBOSE_WPOOL("::steal : " << free_blocks.size() << " free blocks available, serve block=" << p);
160  free_blocks.pop_back();
161  return p;
162  }
163  FOXXLL_VERBOSE_WPOOL("::steal : all " << busy_blocks.size() << " are busy");
164  busy_blocks_iterator completed = wait_any(busy_blocks.begin(), busy_blocks.end());
165  assert(completed != busy_blocks.end()); // we got something reasonable from wait_any
166  assert(completed->req->poll()); // and it is *really* completed
167  block_type* p = completed->block;
168  busy_blocks.erase(completed);
169  check_all_busy(); // for debug
170  FOXXLL_VERBOSE_WPOOL(" serve block=" << p);
171  return p;
172  }
173 
174  //! Resizes size of the pool.
175  //! \param new_size new size of the pool after the call
176  void resize(size_t new_size)
177  {
178  int64_t diff = int64_t(new_size) - int64_t(size());
179  if (diff > 0)
180  {
181  while (--diff >= 0)
182  {
183  free_blocks.push_back(new block_type);
184  FOXXLL_VERBOSE_WPOOL(" create block=" << free_blocks.back());
185  }
186 
187  return;
188  }
189 
190  while (++diff <= 0)
191  delete steal();
192  }
193 
195  {
196  for (busy_blocks_iterator i2 = busy_blocks.begin(); i2 != busy_blocks.end(); ++i2)
197  {
198  if (i2->bid == bid)
199  return true;
200  }
201  return false;
202  }
203 
204  // returns a block and a (potentially unfinished) I/O request associated with it
205  std::pair<block_type*, request_ptr> steal_request(bid_type bid)
206  {
207  for (busy_blocks_iterator i2 = busy_blocks.begin(); i2 != busy_blocks.end(); ++i2)
208  {
209  if (i2->bid == bid)
210  {
211  // remove busy block from list, request has not yet been waited for!
212  block_type* blk = i2->block;
213  request_ptr req = i2->req;
214  busy_blocks.erase(i2);
215 
216  FOXXLL_VERBOSE_WPOOL("::steal_request block=" << blk);
217  // hand over block and (unfinished) request to caller
218  return std::pair<block_type*, request_ptr>(blk, req);
219  }
220  }
221  FOXXLL_VERBOSE_WPOOL("::steal_request NOT FOUND");
222  // not matching request found, return a dummy
223  return std::pair<block_type*, request_ptr>(nullptr, request_ptr());
224  }
225 
226  void add(block_type*& block)
227  {
228  FOXXLL_VERBOSE_WPOOL("::add " << block);
229  free_blocks.push_back(block);
230  block = nullptr; // prevent caller from using the block any further
231  }
232 
233 protected:
235  {
236  busy_blocks_iterator cur = busy_blocks.begin();
237  size_t cnt = 0;
238  while (cur != busy_blocks.end())
239  {
240  if (cur->req->poll())
241  {
242  free_blocks.push_back(cur->block);
243  cur = busy_blocks.erase(cur);
244  ++cnt;
245  continue;
246  }
247  ++cur;
248  }
250  "::check_all_busy : " << cnt <<
251  " are completed out of " << busy_blocks.size() + cnt << " busy blocks"
252  );
253  }
254 };
255 
256 #undef FOXXLL_VERBOSE_WPOOL
257 
258 //! \}
259 
260 } // namespace foxxll
261 
262 namespace std {
263 
264 template <class BlockType>
267 {
268  a.swap(b);
269 }
270 
271 } // namespace std
272 
273 #endif // !FOXXLL_MNG_WRITE_POOL_HEADER
274 
275 /**************************************************************************/
BlockType block_type
Definition: write_pool.hpp:42
#define FOXXLL_VERBOSE_WPOOL(msg)
Definition: write_pool.hpp:28
bool has_request(bid_type bid)
Definition: write_pool.hpp:194
typename std::list< block_type * >::iterator free_blocks_iterator
Definition: write_pool.hpp:59
Definition: write_pool.hpp:46
tlx::counting_ptr< request > request_ptr
A reference counting pointer for request.
Definition: request.hpp:43
Implements dynamically resizable buffered writing pool.
Definition: write_pool.hpp:37
~write_pool()
Waits for completion of all ongoing write requests and frees memory.
Definition: write_pool.hpp:92
void resize(size_t new_size)
Definition: write_pool.hpp:176
static constexpr bool debug
Definition: write_pool.hpp:39
write_pool & operator=(const write_pool &)=delete
non-copyable: delete assignment operator
request_ptr req
Definition: write_pool.hpp:49
std::pair< block_type *, request_ptr > steal_request(bid_type bid)
Definition: write_pool.hpp:205
std::list< block_type * > free_blocks
Definition: write_pool.hpp:64
busy_entry()
Definition: write_pool.hpp:52
block_type * block
Definition: write_pool.hpp:48
busy_entry(const busy_entry &a)
Definition: write_pool.hpp:53
void swap(CountingPtr< A, D > &a1, CountingPtr< A, D > &a2) noexcept
typename block_type::bid_type bid_type
Definition: write_pool.hpp:43
std::list< busy_entry > busy_blocks
Definition: write_pool.hpp:66
High-performance smart pointer used as a wrapping reference counting pointer.
bid_type bid
Definition: write_pool.hpp:50
void add(block_type *&block)
Definition: write_pool.hpp:226
RequestIterator wait_any(RequestIterator reqs_begin, RequestIterator reqs_end)
size_t size() const
Returns number of owned blocks.
Definition: write_pool.hpp:122
write_pool(size_t init_size=1)
Definition: write_pool.hpp:71
request_ptr write(block_type *&block, bid_type bid)
Definition: write_pool.hpp:130
block_type * steal()
Definition: write_pool.hpp:153
void swap(write_pool &obj)
Definition: write_pool.hpp:85
busy_entry(block_type *&bl, request_ptr &r, bid_type &bi)
Definition: write_pool.hpp:54
typename std::list< busy_entry >::iterator busy_blocks_iterator
Definition: write_pool.hpp:60