Thrill  0.1
buf_istream.hpp
Go to the documentation of this file.
1 /***************************************************************************
2  * foxxll/mng/buf_istream.hpp
3  *
4  * Part of FOXXLL. See http://foxxll.org
5  *
6  * Copyright (C) 2002-2004 Roman Dementiev <[email protected]>
7  *
8  * Distributed under the Boost Software License, Version 1.0.
9  * (See accompanying file LICENSE_1_0.txt or copy at
10  * http://www.boost.org/LICENSE_1_0.txt)
11  **************************************************************************/
12 
13 #ifndef FOXXLL_MNG_BUF_ISTREAM_HEADER
14 #define FOXXLL_MNG_BUF_ISTREAM_HEADER
15 
16 #include <algorithm>
17 
20 #include <foxxll/mng/config.hpp>
21 
22 #include <tlx/define/likely.hpp>
23 
24 namespace foxxll {
25 
26 //! \addtogroup foxxll_schedlayer
27 //! \{
28 
29 // a paranoid check: when defined, the advance operators assert that the stream is not finished
30 #define BUF_ISTREAM_CHECK_END
31 
32 //! Buffered input stream.
33 //!
34 //! Reads data records from the stream of blocks.
35 //! \remark Reading is performed in the background, i.e. I/O overlaps with computation.
36 template <typename BlockType, typename BidIteratorType>
38 {
39 public:
40  using block_type = BlockType;
41  using bid_iterator_type = BidIteratorType;
42 
43 private:
45 
46 protected:
49  size_t current_elem;
51  size_t* prefetch_seq;
52 #ifdef BUF_ISTREAM_CHECK_END
54 #endif
55 
56 public:
57  using reference = typename block_type::reference;
59 
60  //! Constructs input stream object.
61  //! \param begin \c bid_iterator pointing to the first block of the stream
62  //! \param end \c bid_iterator pointing to the ( \b last + 1 ) block of the stream
63  //! \param nbuffers number of buffers for internal use
64  buf_istream(bid_iterator_type begin, bid_iterator_type end, size_t nbuffers)
65  : current_elem(0)
67  , not_finished(true)
68 #endif
69  {
70  const size_t ndisks = config::get_instance()->disks_number();
71  const size_t mdevid = config::get_instance()->max_device_id();
72  const size_t seq_length = end - begin;
73  prefetch_seq = new size_t[seq_length];
74 
75  // obvious schedule
76  //for(size_t i = 0; i < seq_length; ++i)
77  // prefetch_seq[i] = i;
78 
79  // optimal schedule
80  nbuffers = std::max(2 * ndisks, size_t(nbuffers - 1));
82  begin, end, prefetch_seq,
83  nbuffers, mdevid
84  );
85 
86  prefetcher = new prefetcher_type(begin, end, prefetch_seq, nbuffers);
87 
88  current_blk = prefetcher->pull_block();
89  }
90 
91  //! non-copyable: delete copy-constructor
92  buf_istream(const buf_istream&) = delete;
93  //! non-copyable: delete assignment operator
94  buf_istream& operator = (const buf_istream&) = delete;
95 
96  //! Input stream operator, reads in \c record.
97  //! \param record reference to a record of the block's record type;
98  //! after the call it holds the value of the next record read from the stream
99  //! \return reference to itself (stream object)
101  {
102 #ifdef BUF_ISTREAM_CHECK_END
103  assert(not_finished);
104 #endif
105 
106  record = current_blk->elem[current_elem++];
107 
108  if (TLX_UNLIKELY(current_elem >= block_type::size))
109  {
110  current_elem = 0;
111 #ifdef BUF_ISTREAM_CHECK_END
112  not_finished = prefetcher->block_consumed(current_blk);
113 #else
114  prefetcher->block_consumed(current_blk);
115 #endif
116  }
117 
118  return (*this);
119  }
120 
121  //! Returns a reference to the current record in the stream.
     //!
     //! The record is the element at index \c current_elem of the block
     //! \c current_blk. The call does not advance the stream and performs no
     //! end-of-stream check of its own.
     //! \return reference to the current record
122  reference current() /* const */
123  {
124  return current_blk->elem[current_elem];
125  }
126 
127  //! Returns a reference to the current record in the stream.
     //!
     //! Dereference-style accessor: yields the element at index \c current_elem
     //! of the block \c current_blk, i.e. the same expression that current()
     //! returns. The stream position is left unchanged.
     //! \return reference to the current record
128  reference operator * () /* const */
129  {
130  return current_blk->elem[current_elem];
131  }
132 
133  //! Moves to the next record in the stream.
134  //! \return reference to itself after the advance
136  {
137 #ifdef BUF_ISTREAM_CHECK_END
138  assert(not_finished);
139 #endif
140 
141  current_elem++;
142 
143  if (TLX_UNLIKELY(current_elem >= block_type::size))
144  {
145  current_elem = 0;
146 #ifdef BUF_ISTREAM_CHECK_END
147  not_finished = prefetcher->block_consumed(current_blk);
148 #else
149  prefetcher->block_consumed(current_blk);
150 #endif
151  }
152  return *this;
153  }
154 
155  //! Frees used internal objects.
157  {
158  delete prefetcher;
159  delete[] prefetch_seq;
160  }
161 };
162 
163 //! \}
164 
165 } // namespace foxxll
166 
167 #endif // !FOXXLL_MNG_BUF_ISTREAM_HEADER
168 
169 /**************************************************************************/
typename block_type::reference reference
Definition: buf_istream.hpp:57
static uint_pair max()
return an uint_pair instance containing the largest value possible
Definition: uint_types.hpp:226
buf_istream & operator=(const buf_istream &)=delete
non-copyable: delete assignment operator
reference operator*()
Returns reference to the current record in the stream.
block_type * current_blk
Definition: buf_istream.hpp:50
unsigned int max_device_id()
Returns automatic physical device id counter.
Definition: config.cpp:189
~buf_istream()
Frees used internal objects.
self_type & operator>>(reference record)
void compute_prefetch_schedule(const size_t *first, const size_t *last, size_t *out_first, size_t m, size_t D)
#define TLX_UNLIKELY(c)
Definition: likely.hpp:24
#define BUF_ISTREAM_CHECK_END
Definition: buf_istream.hpp:30
BidIteratorType bid_iterator_type
Definition: buf_istream.hpp:41
reference current()
Returns reference to the current record in the stream.
size_t disks_number()
Definition: config.hpp:195
FOXXLL library namespace
static instance_pointer get_instance()
return instance or create base instance if empty
Definition: singleton.hpp:41
bool block_consumed(block_type *&buffer)
buf_istream(bid_iterator_type begin, bid_iterator_type end, size_t nbuffers)
Definition: buf_istream.hpp:64
block_prefetcher< block_type, bid_iterator_type > prefetcher_type
Definition: buf_istream.hpp:47
self_type & operator++()
prefetcher_type * prefetcher
Definition: buf_istream.hpp:48