Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
buf_istream.hpp
Go to the documentation of this file.
1 /***************************************************************************
2  * foxxll/mng/buf_istream.hpp
3  *
4  * Part of FOXXLL. See http://foxxll.org
5  *
6  * Copyright (C) 2002-2004 Roman Dementiev <[email protected]>
7  *
8  * Distributed under the Boost Software License, Version 1.0.
9  * (See accompanying file LICENSE_1_0.txt or copy at
10  * http://www.boost.org/LICENSE_1_0.txt)
11  **************************************************************************/
12 
13 #ifndef FOXXLL_MNG_BUF_ISTREAM_HEADER
14 #define FOXXLL_MNG_BUF_ISTREAM_HEADER
15 
16 #include <algorithm>
17 
20 #include <foxxll/mng/config.hpp>
21 
22 namespace foxxll {
23 
24 //! \addtogroup foxxll_schedlayer
25 //! \{
26 
27 // a paranoid check
28 #define BUF_ISTREAM_CHECK_END
29 
30 //! Buffered input stream.
31 //!
32 //! Reads data records from the stream of blocks.
33 //! \remark Reading performed in the background, i.e. with overlapping of I/O and computation
34 template <typename BlockType, typename BidIteratorType>
36 {
37 public:
38  using block_type = BlockType;
39  using bid_iterator_type = BidIteratorType;
40 
41 private:
43 
44 protected:
47  size_t current_elem;
49  size_t* prefetch_seq;
50 #ifdef BUF_ISTREAM_CHECK_END
52 #endif
53 
54 public:
55  using reference = typename block_type::reference;
57 
58  //! Constructs input stream object.
59  //! \param begin \c bid_iterator pointing to the first block of the stream
60  //! \param end \c bid_iterator pointing to the ( \b last + 1 ) block of the stream
61  //! \param nbuffers number of buffers for internal use
62  buf_istream(bid_iterator_type begin, bid_iterator_type end, size_t nbuffers)
63  : current_elem(0)
65  , not_finished(true)
66 #endif
67  {
68  const size_t ndisks = config::get_instance()->disks_number();
69  const size_t mdevid = config::get_instance()->max_device_id();
70  const size_t seq_length = end - begin;
71  prefetch_seq = new size_t[seq_length];
72 
73  // obvious schedule
74  //for(size_t i = 0; i < seq_length; ++i)
75  // prefetch_seq[i] = i;
76 
77  // optimal schedule
78  nbuffers = std::max(2 * ndisks, size_t(nbuffers - 1));
80  begin, end, prefetch_seq,
81  nbuffers, mdevid
82  );
83 
84  prefetcher = new prefetcher_type(begin, end, prefetch_seq, nbuffers);
85 
87  }
88 
89  //! non-copyable: delete copy-constructor
90  buf_istream(const buf_istream&) = delete;
91  //! non-copyable: delete assignment operator
92  buf_istream& operator = (const buf_istream&) = delete;
93 
94  //! Input stream operator, reads in \c record.
95  //! \param record reference to the block record type,
96  //! contains value of the next record in the stream after the call of the operator
97  //! \return reference to itself (stream object)
99  {
100 #ifdef BUF_ISTREAM_CHECK_END
101  assert(not_finished);
102 #endif
103 
104  record = current_blk->elem[current_elem++];
105 
106  if (UNLIKELY(current_elem >= block_type::size))
107  {
108  current_elem = 0;
109 #ifdef BUF_ISTREAM_CHECK_END
111 #else
113 #endif
114  }
115 
116  return (*this);
117  }
118 
119  //! Returns reference to the current record in the stream.
120  reference current() /* const */
121  {
122  return current_blk->elem[current_elem];
123  }
124 
125  //! Returns reference to the current record in the stream.
126  reference operator * () /* const */
127  {
128  return current_blk->elem[current_elem];
129  }
130 
131  //! Moves to the next record in the stream.
132  //! \return reference to itself after the advance
134  {
135 #ifdef BUF_ISTREAM_CHECK_END
136  assert(not_finished);
137 #endif
138 
139  current_elem++;
140 
141  if (UNLIKELY(current_elem >= block_type::size))
142  {
143  current_elem = 0;
144 #ifdef BUF_ISTREAM_CHECK_END
146 #else
148 #endif
149  }
150  return *this;
151  }
152 
153  //! Frees used internal objects.
155  {
156  delete prefetcher;
157  delete[] prefetch_seq;
158  }
159 };
160 
161 //! \}
162 
163 } // namespace foxxll
164 
165 #endif // !FOXXLL_MNG_BUF_ISTREAM_HEADER
166 
167 /**************************************************************************/
typename block_type::reference reference
Definition: buf_istream.hpp:55
static uint_pair max()
return an uint_pair instance containing the largest value possible
Definition: uint_types.hpp:226
buf_istream & operator=(const buf_istream &)=delete
non-copyable: delete assignment operator
reference operator*()
Returns reference to the current record in the stream.
block_type * current_blk
Definition: buf_istream.hpp:48
unsigned int max_device_id()
Returns automatic physical device id counter.
Definition: config.cpp:186
~buf_istream()
Frees used internal objects.
self_type & operator>>(reference record)
Definition: buf_istream.hpp:98
void compute_prefetch_schedule(const size_t *first, const size_t *last, size_t *out_first, size_t m, size_t D)
#define BUF_ISTREAM_CHECK_END
Definition: buf_istream.hpp:28
BidIteratorType bid_iterator_type
Definition: buf_istream.hpp:39
reference current()
Returns reference to the current record in the stream.
size_t disks_number()
Definition: config.hpp:195
static instance_pointer get_instance()
Definition: singleton.hpp:44
bool block_consumed(block_type *&buffer)
#define UNLIKELY(c)
Definition: utils.hpp:88
buf_istream(bid_iterator_type begin, bid_iterator_type end, size_t nbuffers)
Definition: buf_istream.hpp:62
block_prefetcher< block_type, bid_iterator_type > prefetcher_type
Definition: buf_istream.hpp:45
self_type & operator++()
prefetcher_type * prefetcher
Definition: buf_istream.hpp:46