Thrill  0.1
write_pool.hpp
Go to the documentation of this file.
1 /***************************************************************************
2  * foxxll/mng/write_pool.hpp
3  *
4  * Part of FOXXLL. See http://foxxll.org
5  *
6  * Copyright (C) 2003-2004 Roman Dementiev <[email protected]>
7  * Copyright (C) 2009 Andreas Beckmann <[email protected]>
8  *
9  * Distributed under the Boost Software License, Version 1.0.
10  * (See accompanying file LICENSE_1_0.txt or copy at
11  * http://www.boost.org/LICENSE_1_0.txt)
12  **************************************************************************/
13 
14 #ifndef FOXXLL_MNG_WRITE_POOL_HEADER
15 #define FOXXLL_MNG_WRITE_POOL_HEADER
16 
17 #include <cassert>
18 
19 #include <algorithm>
20 #include <list>
21 #include <utility>
22 
23 #include <tlx/define.hpp>
24 
25 #include <foxxll/config.hpp>
27 
28 #define FOXXLL_VERBOSE_WPOOL(msg) \
29  TLX_LOG << "write_pool[" << static_cast<void*>(this) << "]" << msg
30 
31 namespace foxxll {
32 
33 //! \addtogroup foxxll_schedlayer
34 //! \{
35 
36 //! Implements dynamically resizable buffered writing pool.
37 template <class BlockType>
39 {
40  constexpr static bool debug = false;
41 
42 public:
43  using block_type = BlockType;
44  using bid_type = typename block_type::bid_type;
45 
46  // a hack to make wait_any work with busy_entry type
47  struct busy_entry
48  {
52 
53  busy_entry() : block(nullptr) { }
54  busy_entry(const busy_entry& a) : block(a.block), req(a.req), bid(a.bid) { }
56  : block(bl), req(r), bid(bi) { }
57 
58  operator request_ptr () { return req; }
59  };
60  using free_blocks_iterator = typename std::list<block_type*>::iterator;
61  using busy_blocks_iterator = typename std::list<busy_entry>::iterator;
62 
63 protected:
64  // contains free write blocks
65  std::list<block_type*> free_blocks;
66  // blocks that are in writing
67  std::list<busy_entry> busy_blocks;
68 
69 public:
70  //! Constructs pool.
71  //! \param init_size initial number of blocks in the pool
72  explicit write_pool(size_t init_size = 1)
73  {
74  for (size_t i = 0; i < init_size; ++i)
75  {
76  free_blocks.push_back(new block_type);
77  FOXXLL_VERBOSE_WPOOL(" create block=" << free_blocks.back());
78  }
79  }
80 
81  //! non-copyable: delete copy-constructor
82  write_pool(const write_pool&) = delete;
83  //! non-copyable: delete assignment operator
84  write_pool& operator = (const write_pool&) = delete;
85 
86  void swap(write_pool& obj)
87  {
88  std::swap(free_blocks, obj.free_blocks);
89  std::swap(busy_blocks, obj.busy_blocks);
90  }
91 
92  //! Waits for completion of all ongoing write requests and frees memory.
94  {
96  "::~write_pool free_blocks.size()=" << free_blocks.size() <<
97  " busy_blocks.size()=" << busy_blocks.size()
98  );
99  while (!free_blocks.empty())
100  {
101  FOXXLL_VERBOSE_WPOOL(" delete free block=" << free_blocks.back());
102  delete free_blocks.back();
103  free_blocks.pop_back();
104  }
105 
106  try
107  {
108  for (busy_blocks_iterator i2 = busy_blocks.begin(); i2 != busy_blocks.end(); ++i2)
109  {
110  i2->req->wait();
111  if (free_blocks.empty())
112  FOXXLL_VERBOSE_WPOOL(" delete busy block=(empty)");
113  else
114  FOXXLL_VERBOSE_WPOOL(" delete busy block=" << free_blocks.back());
115  delete i2->block;
116  }
117  }
118  catch (...)
119  { }
120  }
121 
122  //! Returns number of owned blocks.
123  size_t size() const { return free_blocks.size() + busy_blocks.size(); }
124 
125  //! Passes a block to the pool for writing.
126  //! \param block block to write. Ownership of the block goes to the pool.
127  //! \c block must be allocated dynamically with using \c new .
128  //! \param bid location, where to write
129  //! \warning \c block must be allocated dynamically with using \c new .
130  //! \return request object of the write operation
132  {
133  FOXXLL_VERBOSE_WPOOL("::write: " << block << " @ " << bid);
134  for (busy_blocks_iterator i2 = busy_blocks.begin(); i2 != busy_blocks.end(); ++i2)
135  {
136  if (i2->bid == bid) {
137  assert(i2->block != block);
138  FOXXLL_VERBOSE_WPOOL("WAW dependency");
139  // try to cancel the obsolete request
140  i2->req->cancel();
141  // invalidate the bid of the stale write request,
142  // prevents prefetch_pool from stealing a stale block
143  i2->bid.storage = 0;
144  }
145  }
146  request_ptr result = block->write(bid);
147  busy_blocks.push_back(busy_entry(block, result, bid));
148  block = nullptr; // prevent caller from using the block any further
149  return result;
150  }
151 
152  //! Take out a block from the pool.
153  //! \return pointer to the block. Ownership of the block goes to the caller.
155  {
156  assert(size() > 0);
157  if (!free_blocks.empty())
158  {
159  block_type* p = free_blocks.back();
160  FOXXLL_VERBOSE_WPOOL("::steal : " << free_blocks.size() << " free blocks available, serve block=" << p);
161  free_blocks.pop_back();
162  return p;
163  }
164  FOXXLL_VERBOSE_WPOOL("::steal : all " << busy_blocks.size() << " are busy");
165  busy_blocks_iterator completed = wait_any(busy_blocks.begin(), busy_blocks.end());
166  assert(completed != busy_blocks.end()); // we got something reasonable from wait_any
167  assert(completed->req->poll()); // and it is *really* completed
168  block_type* p = completed->block;
169  busy_blocks.erase(completed);
170  check_all_busy(); // for debug
171  FOXXLL_VERBOSE_WPOOL(" serve block=" << p);
172  return p;
173  }
174 
175  //! Resizes size of the pool.
176  //! \param new_size new size of the pool after the call
177  void resize(size_t new_size)
178  {
179  int64_t diff = int64_t(new_size) - int64_t(size());
180  if (diff > 0)
181  {
182  while (--diff >= 0)
183  {
184  free_blocks.push_back(new block_type);
185  FOXXLL_VERBOSE_WPOOL(" create block=" << free_blocks.back());
186  }
187 
188  return;
189  }
190 
191  while (++diff <= 0)
192  delete steal();
193  }
194 
196  {
197  for (busy_blocks_iterator i2 = busy_blocks.begin(); i2 != busy_blocks.end(); ++i2)
198  {
199  if (i2->bid == bid)
200  return true;
201  }
202  return false;
203  }
204 
205  // returns a block and a (potentially unfinished) I/O request associated with it
206  std::pair<block_type*, request_ptr> steal_request(bid_type bid)
207  {
208  for (busy_blocks_iterator i2 = busy_blocks.begin(); i2 != busy_blocks.end(); ++i2)
209  {
210  if (i2->bid == bid)
211  {
212  // remove busy block from list, request has not yet been waited for!
213  block_type* blk = i2->block;
214  request_ptr req = i2->req;
215  busy_blocks.erase(i2);
216 
217  FOXXLL_VERBOSE_WPOOL("::steal_request block=" << blk);
218  // hand over block and (unfinished) request to caller
219  return std::pair<block_type*, request_ptr>(blk, req);
220  }
221  }
222  FOXXLL_VERBOSE_WPOOL("::steal_request NOT FOUND");
223  // not matching request found, return a dummy
224  return std::pair<block_type*, request_ptr>(nullptr, request_ptr());
225  }
226 
228  {
229  FOXXLL_VERBOSE_WPOOL("::add " << block);
230  free_blocks.push_back(block);
231  block = nullptr; // prevent caller from using the block any further
232  }
233 
234 protected:
236  {
237  busy_blocks_iterator cur = busy_blocks.begin();
238  size_t cnt = 0;
239  while (cur != busy_blocks.end())
240  {
241  if (cur->req->poll())
242  {
243  free_blocks.push_back(cur->block);
244  cur = busy_blocks.erase(cur);
245  ++cnt;
246  continue;
247  }
248  ++cur;
249  }
251  "::check_all_busy : " << cnt <<
252  " are completed out of " << busy_blocks.size() + cnt << " busy blocks"
253  );
254  }
255 };
256 
257 #undef FOXXLL_VERBOSE_WPOOL
258 
259 //! \}
260 
261 } // namespace foxxll
262 
263 namespace std {
264 
265 template <class BlockType>
268 {
269  a.swap(b);
270 }
271 
272 } // namespace std
273 
274 #endif // !FOXXLL_MNG_WRITE_POOL_HEADER
275 
276 /**************************************************************************/
BlockType block_type
Definition: write_pool.hpp:43
#define FOXXLL_VERBOSE_WPOOL(msg)
Definition: write_pool.hpp:28
bool has_request(bid_type bid)
Definition: write_pool.hpp:195
typename std::list< block_type * >::iterator free_blocks_iterator
Definition: write_pool.hpp:60
Definition: write_pool.hpp:47
tlx::counting_ptr< request > request_ptr
A reference counting pointer for request.
Definition: request.hpp:43
Implements dynamically resizable buffered writing pool.
Definition: write_pool.hpp:38
~write_pool()
Waits for completion of all ongoing write requests and frees memory.
Definition: write_pool.hpp:93
void resize(size_t new_size)
Definition: write_pool.hpp:177
static constexpr bool debug
Definition: write_pool.hpp:40
write_pool & operator=(const write_pool &)=delete
non-copyable: delete assignment operator
request_ptr req
Definition: write_pool.hpp:50
std::pair< block_type *, request_ptr > steal_request(bid_type bid)
Definition: write_pool.hpp:206
STL namespace.
std::list< block_type * > free_blocks
Definition: write_pool.hpp:65
busy_entry()
Definition: write_pool.hpp:53
block_type * block
Definition: write_pool.hpp:49
busy_entry(const busy_entry &a)
Definition: write_pool.hpp:54
typename block_type::bid_type bid_type
Definition: write_pool.hpp:44
FOXXLL library namespace
std::list< busy_entry > busy_blocks
Definition: write_pool.hpp:67
High-performance smart pointer used as a wrapping reference counting pointer.
bid_type bid
Definition: write_pool.hpp:51
void add(block_type *&block)
Definition: write_pool.hpp:227
RequestIterator wait_any(RequestIterator reqs_begin, RequestIterator reqs_end)
write_pool(size_t init_size=1)
Definition: write_pool.hpp:72
size_t size() const
Returns number of owned blocks.
Definition: write_pool.hpp:123
request_ptr write(block_type *&block, bid_type bid)
Definition: write_pool.hpp:131
block_type * steal()
Definition: write_pool.hpp:154
void swap(write_pool &obj)
Definition: write_pool.hpp:86
busy_entry(block_type *&bl, request_ptr &r, bid_type &bi)
Definition: write_pool.hpp:55
typename std::list< busy_entry >::iterator busy_blocks_iterator
Definition: write_pool.hpp:61