Thrill  0.1
block_prefetcher.hpp
Go to the documentation of this file.
1 /***************************************************************************
2  * foxxll/mng/block_prefetcher.hpp
3  *
4  * Part of FOXXLL. See http://foxxll.org
5  *
6  * Copyright (C) 2002-2004 Roman Dementiev <[email protected]>
7  * Copyright (C) 2009, 2010 Johannes Singler <[email protected]>
8  * Copyright (C) 2009 Andreas Beckmann <[email protected]>
9  *
10  * Distributed under the Boost Software License, Version 1.0.
11  * (See accompanying file LICENSE_1_0.txt or copy at
12  * http://www.boost.org/LICENSE_1_0.txt)
13  **************************************************************************/
14 
15 #ifndef FOXXLL_MNG_BLOCK_PREFETCHER_HEADER
16 #define FOXXLL_MNG_BLOCK_PREFETCHER_HEADER
17 
18 #include <algorithm>
19 #include <queue>
20 #include <vector>
21 
23 #include <foxxll/io/iostats.hpp>
24 #include <foxxll/io/request.hpp>
25 
26 namespace foxxll {
27 
28 //! \addtogroup foxxll_schedlayer
29 //! \{
30 
32 {
35 
36 public:
38  onoff_switch& _switch, const completion_handler& on_complete)
39  : switch_(_switch), on_complete_(on_complete) { }
40 
41  void operator () (request* req, bool success)
42  {
43  // call before setting switch to on, otherwise, user has no way to wait
44  // for the completion handler to be executed
45  if (on_complete_)
46  on_complete_(req, success);
47  switch_.on();
48  }
49 };
50 
51 //! Encapsulates asynchronous prefetching engine.
52 //!
53 //! \c block_prefetcher overlaps I/Os with consumption of read data.
54 //! Utilizes optimal asynchronous prefetch scheduling (by Peter Sanders et.al.)
55 template <typename BlockType, typename BidIteratorType>
57 {
58  constexpr static bool debug = false;
59 
60 public:
61  using block_type = BlockType;
62  using bid_iterator_type = BidIteratorType;
63 
64  using bid_type = typename block_type::bid_type;
65 
66 protected:
69  size_t seq_length;
70 
71  size_t* prefetch_seq;
72 
73  size_t nextread;
74  size_t nextconsume;
75 
76  const size_t nreadblocks;
77 
81 
83  size_t* pref_buffer;
84 
86 
87  block_type * wait(size_t iblock)
88  {
89  TLX_LOG << "block_prefetcher: waiting block " << iblock;
90  {
92 
93  completed[iblock].wait_for_on();
94  }
95  TLX_LOG << "block_prefetcher: finished waiting block " << iblock;
96  size_t ibuffer = pref_buffer[iblock];
97  TLX_LOG << "block_prefetcher: returning buffer " << ibuffer;
98  assert(ibuffer >= 0 && ibuffer < nreadblocks);
99  return (read_buffers + ibuffer);
100  }
101 
102 public:
103  //! Constructs an object and immediately starts prefetching.
104  //! \param _cons_begin \c bid_iterator pointing to the \c bid of the first block to be consumed
105  //! \param _cons_end \c bid_iterator pointing to the \c bid of the ( \b last + 1 ) block of consumption sequence
106  //! \param _pref_seq gives the prefetch order, is a pointer to the integer array that contains
107  //! the indices of the blocks in the consumption sequence
108  //! \param _prefetch_buf_size amount of prefetch buffers to use
109  //! \param do_after_fetch unknown
111  bid_iterator_type _cons_begin,
112  bid_iterator_type _cons_end,
113  size_t* _pref_seq,
114  size_t _prefetch_buf_size,
115  completion_handler do_after_fetch = completion_handler())
116  : consume_seq_begin(_cons_begin),
117  consume_seq_end(_cons_end),
118  seq_length(_cons_end - _cons_begin),
119  prefetch_seq(_pref_seq),
120  nextread(std::min(_prefetch_buf_size, seq_length)),
121  nextconsume(0),
122  nreadblocks(nextread),
123  do_after_fetch(do_after_fetch)
124  {
125  TLX_LOG << "block_prefetcher: seq_length=" << seq_length;
126  TLX_LOG << "block_prefetcher: _prefetch_buf_size=" << _prefetch_buf_size;
127  assert(seq_length > 0);
128  assert(_prefetch_buf_size > 0);
129  size_t i;
130  read_buffers = new block_type[nreadblocks];
131  read_reqs = new request_ptr[nreadblocks];
132  read_bids = new bid_type[nreadblocks];
133  pref_buffer = new size_t[seq_length];
134 
135  std::fill(pref_buffer, pref_buffer + seq_length, -1);
136 
137  completed = new onoff_switch[seq_length];
138 
139  for (i = 0; i < nreadblocks; ++i)
140  {
141  assert(prefetch_seq[i] < seq_length);
142  read_bids[i] = *(consume_seq_begin + prefetch_seq[i]);
143  TLX_LOG << "block_prefetcher: reading block " << i <<
144  " prefetch_seq[" << i << "]=" << prefetch_seq[i] <<
145  " @ " << &read_buffers[i] <<
146  " @ " << read_bids[i];
147  read_reqs[i] = read_buffers[i].read(
148  read_bids[i],
149  set_switch_handler(*(completed + prefetch_seq[i]), do_after_fetch)
150  );
151  pref_buffer[prefetch_seq[i]] = i;
152  }
153  }
154 
155  //! non-copyable: delete copy-constructor
156  block_prefetcher(const block_prefetcher&) = delete;
157  //! non-copyable: delete assignment operator
158  block_prefetcher& operator = (const block_prefetcher&) = delete;
159 
160  //! Pulls next unconsumed block from the consumption sequence.
161  //! \return Pointer to the already prefetched block from the internal buffer pool
163  {
164  TLX_LOG << "block_prefetcher: pulling a block";
165  return wait(nextconsume++);
166  }
167  //! Exchanges buffers between prefetcher and application.
168  //! \param buffer pointer to the consumed buffer. After call if return value is true \c buffer
169  //! contains valid pointer to the next unconsumed prefetched buffer.
170  //! \remark parameter \c buffer must be value returned by \c pull_block() or \c block_consumed() methods
171  //! \return \c false if there are no blocks to prefetch left, \c true if consumption sequence is not emptied
172  bool block_consumed(block_type*& buffer)
173  {
174  size_t ibuffer = buffer - read_buffers;
175  TLX_LOG << "block_prefetcher: buffer " << ibuffer << " consumed";
176  if (read_reqs[ibuffer].valid())
177  read_reqs[ibuffer]->wait();
178 
179  read_reqs[ibuffer] = nullptr;
180 
181  if (nextread < seq_length)
182  {
183  assert(ibuffer >= 0 && ibuffer < nreadblocks);
184  size_t next_2_prefetch = prefetch_seq[nextread++];
185  TLX_LOG << "block_prefetcher: prefetching block " << next_2_prefetch;
186 
187  assert(next_2_prefetch < seq_length);
188  assert(!completed[next_2_prefetch].is_on());
189 
190  pref_buffer[next_2_prefetch] = ibuffer;
191  read_bids[ibuffer] =
192  bid_type(*(consume_seq_begin + next_2_prefetch));
193  read_reqs[ibuffer] = read_buffers[ibuffer].read(
194  read_bids[ibuffer],
195  set_switch_handler(*(completed + next_2_prefetch), do_after_fetch)
196  );
197  }
198 
199  if (nextconsume >= seq_length)
200  return false;
201 
202  buffer = wait(nextconsume++);
203 
204  return true;
205  }
206 
207  //! No more consumable blocks available, but can't delete the prefetcher,
208  //! because not all blocks may have been returned, yet.
209  bool empty() const
210  {
211  return nextconsume >= seq_length;
212  }
213 
214  //! Index of the next element in the consume sequence.
215  size_t pos() const
216  {
217  return nextconsume;
218  }
219 
220  //! Frees used memory.
222  {
223  for (size_t i = 0; i < nreadblocks; ++i)
224  if (read_reqs[i].valid())
225  read_reqs[i]->wait();
226 
227  delete[] read_reqs;
228  delete[] read_bids;
229  delete[] completed;
230  delete[] pref_buffer;
231  delete[] read_buffers;
232  }
233 };
234 
235 //! \}
236 
237 } // namespace foxxll
238 
239 #endif // !FOXXLL_MNG_BLOCK_PREFETCHER_HEADER
240 
241 /**************************************************************************/
void wait_for_on()
wait for switch to turn ON
BidIteratorType bid_iterator_type
bid_iterator_type consume_seq_begin
block_type * wait(size_t iblock)
block_prefetcher(bid_iterator_type _cons_begin, bid_iterator_type _cons_end, size_t *_pref_seq, size_t _prefetch_buf_size, completion_handler do_after_fetch=completion_handler())
STL namespace.
~block_prefetcher()
Frees used memory.
tlx::delegate< void(request *r, bool success)> completion_handler
completion handler
Definition: request.hpp:46
void on()
turn switch ON and notify one waiter
completion_handler do_after_fetch
FOXXLL library namespace
static constexpr bool debug
bool block_consumed(block_type *&buffer)
High-performance smart pointer used as a wrapping reference counting pointer.
void operator()(request *req, bool success)
typename block_type::bid_type bid_type
size_t pos() const
Index of the next element in the consume sequence.
static uint_pair min()
return an uint_pair instance containing the smallest value possible
Definition: uint_types.hpp:217
set_switch_handler(onoff_switch &_switch, const completion_handler &on_complete)
completion_handler on_complete_
Request object encapsulating basic properties like file and offset.
Definition: request.hpp:49
bid_iterator_type consume_seq_end
#define TLX_LOG
Default logging method: output if the local debug variable is true.
Definition: core.hpp:141