Thrill  0.1
buf_istream_reverse.hpp
Go to the documentation of this file.
1 /***************************************************************************
2  * foxxll/mng/buf_istream_reverse.hpp
3  *
4  * Part of FOXXLL. See http://foxxll.org
5  *
6  * Copyright (C) 2002-2004 Roman Dementiev <[email protected]>
7  * Copyright (C) 2013 Timo Bingmann <[email protected]>
8  *
9  * Distributed under the Boost Software License, Version 1.0.
10  * (See accompanying file LICENSE_1_0.txt or copy at
11  * http://www.boost.org/LICENSE_1_0.txt)
12  **************************************************************************/
13 
14 #ifndef FOXXLL_MNG_BUF_ISTREAM_REVERSE_HEADER
15 #define FOXXLL_MNG_BUF_ISTREAM_REVERSE_HEADER
16 
17 #include <algorithm>
18 
20 #include <foxxll/mng/bid.hpp>
22 #include <foxxll/mng/config.hpp>
23 
24 #include <tlx/define/likely.hpp>
25 
26 namespace foxxll {
27 
28 //! \addtogroup foxxll_schedlayer
29 //! \{
30 
31 // paranoid check: when defined, assert that the stream is not read past its end
32 #define BUF_ISTREAM_CHECK_END
33 
34 //! Buffered input stream, reading the items in the blocks in reverse order.
35 //!
36 //! Reads data records from the stream of blocks in reverse order.
37 //! \remark Reading performed in the background, i.e. with overlapping of I/O and computation
38 template <typename BlockType, typename BidIteratorType>
40 {
41 public:
42  using block_type = BlockType;
43  using bid_iterator_type = BidIteratorType;
44 
45  //-tb note that we redefine the BID type here, because there is no way to
46  //-derive it from BidIteratorType (which is usually just a POD pointer).
48 
49 private:
51 
52 protected:
55  size_t current_elem;
57  size_t* prefetch_seq;
58 #ifdef BUF_ISTREAM_CHECK_END
60 #endif
62 
63 public:
64  using reference = typename block_type::reference;
66 
67  //! Constructs input stream object, reading the blocks [begin,end) in reverse order.
68  //! \param begin \c bid_iterator pointing to the first block of the stream
69  //! \param end \c bid_iterator pointing one past the last block of the stream
70  //! \param nbuffers number of buffers for internal use
72  : current_elem(0),
74  not_finished(true),
75 #endif
76  bids_(end - begin)
77  {
78  // copy list of bids in reverse
79  std::reverse_copy(begin, end, bids_.begin());
80 
81  // calculate prefetch sequence
82  const size_t ndisks = config::get_instance()->disks_number();
83  const size_t mdevid = config::get_instance()->max_device_id();
84 
85  prefetch_seq = new size_t[bids_.size()];
86 
87  // optimal schedule
88  nbuffers = std::max(2 * ndisks, nbuffers - 1);
90  bids_.begin(), bids_.end(), prefetch_seq,
91  nbuffers, mdevid
92  );
93 
94  // create stream prefetcher
95  prefetcher = new prefetcher_type(bids_.begin(), bids_.end(), prefetch_seq, nbuffers);
96 
97  // fetch block: last in sequence
98  current_blk = prefetcher->pull_block();
99  current_elem = block_type::size - 1;
100  }
101 
102  //! non-copyable: delete copy-constructor
103  buf_istream_reverse(const buf_istream_reverse&) = delete;
104  //! non-copyable: delete assignment operator
106 
107  //! Input stream operator, reads the current record into \c record.
108  //! \param record reference to the block record type; after the call it
109  //! contains the value of the current record, and the stream has moved on to the previous record
110  //! \return reference to itself (stream object)
112  {
113 #ifdef BUF_ISTREAM_CHECK_END
114  assert(not_finished);
115 #endif
116 
117  record = current_blk->elem[current_elem];
118 
119  if (TLX_UNLIKELY(current_elem == 0))
120  {
121  current_elem = block_type::size - 1;
122 #ifdef BUF_ISTREAM_CHECK_END
123  not_finished = prefetcher->block_consumed(current_blk);
124 #else
125  prefetcher->block_consumed(current_blk);
126 #endif
127  }
128  else
129  {
130  current_elem--;
131  }
132 
133  return (*this);
134  }
135 
136  //! Returns reference to the current record in the stream.
137  reference current() /* const */
138  {
139  return current_blk->elem[current_elem]; // element at index current_elem of current_blk; the stream position is not changed
140  }
141 
142  //! Returns reference to the current record in the stream.
143  reference operator * () /* const */
144  {
145  return current_blk->elem[current_elem]; // same expression as current(): element at index current_elem of current_blk
146  }
147 
148  //! Moves to the _previous_ record in the stream.
149  //! \return reference to itself after the advance
151  {
152 #ifdef BUF_ISTREAM_CHECK_END
153  assert(not_finished);
154 #endif
155 
156  if (TLX_UNLIKELY(current_elem == 0))
157  {
158  current_elem = block_type::size - 1;
159 #ifdef BUF_ISTREAM_CHECK_END
160  not_finished = prefetcher->block_consumed(current_blk);
161 #else
162  prefetcher->block_consumed(current_blk);
163 #endif
164  }
165  else
166  {
167  current_elem--;
168  }
169 
170  return *this;
171  }
172 
173  //! Frees used internal objects.
175  {
176  delete prefetcher;
177  delete[] prefetch_seq;
178  }
179 };
180 
181 //! \}
182 
183 } // namespace foxxll
184 
185 #endif // !FOXXLL_MNG_BUF_ISTREAM_REVERSE_HEADER
186 
187 /**************************************************************************/
buf_istream_reverse(bid_iterator_type begin, bid_iterator_type end, size_t nbuffers)
iterator end() noexcept
return mutable iterator beyond last element
static uint_pair max()
return an uint_pair instance containing the largest value possible
Definition: uint_types.hpp:226
reference operator*()
Returns reference to the current record in the stream.
self_type & operator>>(reference record)
unsigned int max_device_id()
Returns automatic physical device id counter.
Definition: config.cpp:189
Simpler non-growing vector without initialization.
void compute_prefetch_schedule(const size_t *first, const size_t *last, size_t *out_first, size_t m, size_t D)
#define TLX_UNLIKELY(c)
Definition: likely.hpp:24
reference current()
Returns reference to the current record in the stream.
#define BUF_ISTREAM_CHECK_END
buf_istream_reverse & operator=(const buf_istream_reverse &)=delete
non-copyable: delete assignment operator
iterator begin() noexcept
return mutable iterator to first element
block_prefetcher< block_type, typename bid_vector_type::iterator > prefetcher_type
size_t disks_number()
Definition: config.hpp:195
size_type size() const noexcept
return number of items in vector
FOXXLL library namespace
static instance_pointer get_instance()
return instance or create base instance if empty
Definition: singleton.hpp:41
typename block_type::reference reference
bool block_consumed(block_type *&buffer)
~buf_istream_reverse()
Frees used internal objects.