Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
block_reader.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/data/block_reader.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Timo Bingmann <[email protected]>
7  *
8  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
9  ******************************************************************************/
10 
11 #pragma once
12 #ifndef THRILL_DATA_BLOCK_READER_HEADER
13 #define THRILL_DATA_BLOCK_READER_HEADER
14 
15 #include <thrill/common/config.hpp>
17 #include <thrill/common/logger.hpp>
18 #include <thrill/data/block.hpp>
20 
21 #include <tlx/define.hpp>
22 #include <tlx/die.hpp>
23 #include <tlx/string/hexdump.hpp>
24 
25 #include <algorithm>
26 #include <string>
27 #include <vector>
28 
29 namespace thrill {
30 namespace data {
31 
32 //! \addtogroup data_layer
33 //! \{
34 
35 /*!
36  * BlockReader takes Block objects from BlockSource and allows reading of
37  * a) serializable Items or b) arbitrary data from the Block sequence. It takes
38  * care of fetching the next Block when the previous one underruns and also of
39  * data items split between two Blocks.
40  */
41 template <typename BlockSource>
43  : public common::ItemReaderToolsBase<BlockReader<BlockSource> >
44 {
45 public:
46  static constexpr bool self_verify = common::g_self_verify;
47 
48  //! Start reading a BlockSource
49  explicit BlockReader(BlockSource&& source)
50  : source_(std::move(source)) { }
51 
52  //! default constructor
53  BlockReader() = default;
54 
55  //! Return reference to enclosed BlockSource
56  BlockSource& source() { return source_; }
57 
58  //! non-copyable: delete copy-constructor
59  BlockReader(const BlockReader&) = delete;
60  //! non-copyable: delete assignment operator
61  BlockReader& operator = (const BlockReader&) = delete;
62 
63  //! move-constructor: default
64  BlockReader(BlockReader&&) = default;
65  //! move-assignment operator: default
66  BlockReader& operator = (BlockReader&&) = default;
67 
68  //! return current block for debugging
70  if (!block_.byte_block()) return PinnedBlock();
71  return PinnedBlock(
75  }
76 
77  //! return current ByteBlock
78  ByteBlockPtr byte_block() const { return block_.byte_block(); }
79 
80  //! Returns typecode_verify_
81  size_t typecode_verify() const { return typecode_verify_; }
82 
83  //! \name Reading (Generic) Items
84  //! \{
85 
86  //! Next() reads a complete item T
87  template <typename T>
89  T Next() {
90  assert(HasNext());
91  assert(num_items_ > 0);
92  --num_items_;
93 
95  // for self-verification, T is prefixed with its hash code
96  size_t code = GetRaw<size_t>();
97  if (code != typeid(T).hash_code()) {
98  die("BlockReader::Next() attempted to retrieve item "
99  "with different typeid! - expected "
100  << tlx::hexdump_type(typeid(T).hash_code())
101  << " got " << tlx::hexdump_type(code));
102  }
103  }
105  }
106 
107  //! Next() reads a complete item T, without item counter or self
108  //! verification
109  template <typename T>
112  assert(HasNext());
114  }
115 
116  //! HasNext() returns true if at least one more item is available.
118  bool HasNext() {
119  while (current_ == end_) {
120  if (!NextBlock()) {
121  return false;
122  }
123  }
124  return true;
125  }
126 
127  //! Return complete contents until empty as a std::vector<T>. Use this only
128  //! if you are sure that it will fit into memory, -> only use it for tests.
129  template <typename ItemType>
130  std::vector<ItemType> ReadComplete() {
131  std::vector<ItemType> out;
132  while (HasNext()) out.emplace_back(Next<ItemType>());
133  return out;
134  }
135 
136  //! Read n items, however, do not deserialize them but deliver them as a
137  //! vector of (unpinned) Block objects. This is used to take out a range of
138  //! items, the internal item cursor is advanced by n.
139  template <typename ItemType>
140  std::vector<Block> GetItemBatch(size_t n) {
141  static constexpr bool debug = false;
142 
143  std::vector<Block> out;
144  if (n == 0) return out;
145 
146  die_unless(HasNext());
147  assert(block_.IsValid());
148 
149  const Byte* begin_output = current_;
150  size_t first_output = current_ - byte_block()->begin();
151 
152  // inside the if-clause the current_ may not point to a valid item
153  // boundary.
154  if (n >= num_items_)
155  {
156  // *** if the current block still contains items, push it partially
157 
158  if (n >= num_items_) {
159  // construct first Block using current_ pointer
160  out.emplace_back(
161  byte_block(),
162  // valid range: excludes preceding items.
163  current_ - byte_block()->begin(),
164  end_ - byte_block()->begin(),
165  // first item is at begin_ (we may have dropped some)
166  current_ - byte_block()->begin(),
167  // remaining items in this block
168  num_items_,
169  // typecode verify flag
171 
172  sLOG << "partial first:" << out.back();
173 
174  n -= num_items_;
175 
176  // get next block. if not possible -> may be okay since last
177  // item might just terminate the current block.
178  if (!NextBlock()) {
179  assert(n == 0);
180  sLOG << "exit1 after batch.";
181  return out;
182  }
183  }
184 
185  // *** then append complete blocks without deserializing them
186 
187  while (n >= num_items_) {
188  out.emplace_back(
189  byte_block(),
190  // full range is valid.
191  current_ - byte_block()->begin(),
192  end_ - byte_block()->begin(),
194  // typecode verify flag
196 
197  sLOG << "middle:" << out.back();
198 
199  n -= num_items_;
200 
201  if (!NextBlock()) {
202  assert(n == 0);
203  sLOG << "exit2 after batch.";
204  return out;
205  }
206  }
207 
208  // move current_ to the first valid item of the block we got (at
209  // least one NextBlock() has been called). But when constructing the
210  // last Block, we have to include the partial item in the
211  // front.
212  begin_output = current_;
213  first_output = block_.first_item_absolute();
214 
216  }
217 
218  // put prospective last block into vector.
219 
220  out.emplace_back(
221  byte_block(),
222  // full range is valid.
223  begin_output - byte_block()->begin(), end_ - byte_block()->begin(),
224  first_output, n,
225  // typecode verify flag
227 
228  // skip over remaining items in this block, there while collect all
229  // blocks needed for those items via block_collect_. There can be more
230  // than one block necessary for Next if an item is large!
231 
232  std::vector<PinnedBlock> out_pinned;
233 
234  block_collect_ = &out_pinned;
236  Skip(n, n * ((self_verify && typecode_verify_ ? sizeof(size_t) : 0) +
238  }
239  else {
240  while (n > 0) {
241  Next<ItemType>();
242  --n;
243  }
244  }
245  block_collect_ = nullptr;
246 
247  for (PinnedBlock& pb : out_pinned)
248  out.emplace_back(std::move(pb).MoveToBlock());
249  out_pinned.clear();
250 
251  out.back().set_end(current_ - byte_block()->begin());
252 
253  sLOG << "partial last:" << out.back();
254 
255  sLOG << "exit3 after batch:"
256  << "current_=" << current_ - byte_block()->begin();
257 
258  return out;
259  }
260 
261  //! \}
262 
263  //! \name Cursor Reading Methods
264  //! \{
265 
266  //! Fetch a number of unstructured bytes from the current block, advancing
267  //! the cursor.
268  BlockReader& Read(void* outdata, size_t size) {
269 
270  Byte* cdata = reinterpret_cast<Byte*>(outdata);
271 
272  while (TLX_UNLIKELY(current_ + size > end_)) {
273  // partial copy of remainder of block
274  size_t partial_size = end_ - current_;
275  std::copy(current_, current_ + partial_size, cdata);
276 
277  cdata += partial_size;
278  size -= partial_size;
279 
280  if (!NextBlock())
281  throw std::runtime_error("Data underflow in BlockReader.");
282  }
283 
284  // copy rest from current block
285  std::copy(current_, current_ + size, cdata);
286  current_ += size;
287 
288  return *this;
289  }
290 
291  //! Fetch a number of unstructured bytes from the buffer as std::string,
292  //! advancing the cursor.
293  std::string Read(size_t datalen) {
294  std::string out(datalen, 0);
295  Read(const_cast<char*>(out.data()), out.size());
296  return out;
297  }
298 
299  //! Advance the cursor given number of bytes without reading them.
300  BlockReader& Skip(size_t items, size_t bytes) {
301  while (TLX_UNLIKELY(current_ + bytes > end_)) {
302  bytes -= end_ - current_;
303  // deduct number of remaining items in skipped block from item skip
304  // counter.
305  items -= num_items_;
306  if (!NextBlock())
307  throw std::runtime_error("Data underflow in BlockReader.");
308  }
309  current_ += bytes;
310  // the last line skipped over the remaining "items" number of items.
311  num_items_ -= items;
312  return *this;
313  }
314 
315  //! Fetch a single byte from the current block, advancing the cursor.
317  // loop, since blocks can actually be empty.
318  while (TLX_UNLIKELY(current_ == end_)) {
319  if (!NextBlock())
320  throw std::runtime_error("Data underflow in BlockReader.");
321  }
322  return *current_++;
323  }
324 
325  //! Fetch a single item of the template type Type from the buffer,
326  //! advancing the cursor. Be careful with implicit type conversions!
327  template <typename Type>
330  static_assert(std::is_pod<Type>::value,
331  "You only want to GetRaw() POD types as raw values.");
332 
333  Type ret;
334 
335  // fast path for reading item from block if it fits.
336  if (TLX_LIKELY(current_ + sizeof(Type) <= end_)) {
337  ret = *reinterpret_cast<const Type*>(current_);
338  current_ += sizeof(Type);
339  }
340  else {
341  Read(&ret, sizeof(ret));
342  }
343 
344  return ret;
345  }
346 
347  //! \}
348 
349 private:
350  //! Instance of BlockSource. This is NOT a reference, as to enable embedding
351  //! of FileBlockSource to compose classes into File::Reader.
352  BlockSource source_;
353 
354  //! The current block being read, this holds a shared pointer reference.
356 
357  //! current read pointer into current block of file.
358  const Byte* current_ = nullptr;
359 
360  //! pointer to end of current block.
361  const Byte* end_ = nullptr;
362 
363  //! remaining number of items starting in this block
364  size_t num_items_ = 0;
365 
366  //! pointer to vector to collect blocks in GetItemBatch.
367  std::vector<PinnedBlock>* block_collect_ = nullptr;
368 
369  //! flag whether the underlying data contains self verify type codes from
370  //! BlockReader, this is false to needed to read external files.
372 
373  //! Call source_.NextBlock with appropriate parameters
374  bool NextBlock() {
375  // first release old pin.
376  block_.Reset();
377  // request next pinned block
378  block_ = source_.NextBlock();
379  sLOG0 << "BlockReader::NextBlock" << block_;
380 
381  if (!block_.IsValid()) return false;
382 
383  if (block_collect_)
384  block_collect_->emplace_back(block_);
385 
386  current_ = block_.data_begin();
387  end_ = block_.data_end();
388  num_items_ = block_.num_items();
389  typecode_verify_ = block_.typecode_verify();
390  return true;
391  }
392 };
393 
394 //! \}
395 
396 } // namespace data
397 } // namespace thrill
398 
399 #endif // !THRILL_DATA_BLOCK_READER_HEADER
400 
401 /******************************************************************************/
std::vector< Block > GetItemBatch(size_t n)
const ByteBlockPtr & byte_block() const
access to byte_block_
Definition: block.hpp:226
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:34
std::vector< ItemType > ReadComplete()
#define die_unless(X)
Definition: die.hpp:27
PinnedByteBlockPtr CopyPinnedByteBlock() const
Definition: block.hpp:298
#define TLX_ATTRIBUTE_ALWAYS_INLINE
double T
Type
VFS object type.
Definition: file_io.hpp:52
std::string Read(size_t datalen)
TLX_ATTRIBUTE_ALWAYS_INLINE T Next()
Next() reads a complete item T.
BlockReader & Read(void *outdata, size_t size)
BlockReader(BlockSource &&source)
Start reading a BlockSource.
#define die(msg)
Instead of std::terminate(), throw the output the message via an exception.
Definition: die.hpp:22
CRTP class to enhance item/memory reader classes with Varint decoding and String decoding.
#define TLX_UNLIKELY(c)
Definition: likely.hpp:24
BlockReader & operator=(const BlockReader &)=delete
non-copyable: delete assignment operator
bool NextBlock()
Call source_.NextBlock with appropriate parameters.
size_t num_items_
remaining number of items starting in this block
static constexpr bool g_self_verify
Definition: config.hpp:32
#define TLX_LIKELY(c)
Definition: likely.hpp:23
void Reset()
release pin on block and reset Block pointer to nullptr
Definition: block.hpp:271
#define sLOG0
Override default output: never or always output log.
Definition: logger.hpp:37
ByteBlockPtr byte_block() const
return current ByteBlock
uint8_t Byte
type of underlying memory area
Definition: byte_block.hpp:37
BlockReader()=default
default constructor
int value
Definition: gen_data.py:41
BlockReader takes Block objects from BlockSource and allows reading of a) serializable Items or b) ar...
BlockSource & source()
Return reference to enclosed BlockSource.
Byte GetByte()
Fetch a single byte from the current block, advancing the cursor.
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
Definition: allocator.hpp:220
static constexpr bool debug
const Byte * data_begin() const
return pointer to beginning of valid data
Definition: block.hpp:259
TLX_ATTRIBUTE_ALWAYS_INLINE T NextNoSelfVerify()
High-performance smart pointer used as a wrapping reference counting pointer.
PinnedBlock CopyBlock() const
return current block for debugging
static const size_t bytes
number of bytes in uint_pair
Definition: uint_types.hpp:75
const Byte * end_
pointer to end of current block.
const Byte * current_
current read pointer into current block of file.
std::string hexdump_type(const Type &t)
Dump a (binary) item as a sequence of uppercase hexadecimal pairs.
Definition: hexdump.hpp:52
A pinned / pin-counted derivative of a Block.
Definition: block.hpp:157
std::vector< PinnedBlock > * block_collect_
pointer to vector to collect blocks in GetItemBatch.
TLX_ATTRIBUTE_ALWAYS_INLINE bool HasNext()
HasNext() returns true if at least one more item is available.
BlockReader & Skip(size_t items, size_t bytes)
Advance the cursor given number of bytes without reading them.
bool IsValid() const
Return whether the enclosed ByteBlock is valid.
Definition: block.hpp:223
static constexpr bool self_verify
PinnedBlock block_
The current block being read, this holds a shared pointer reference.
size_t first_item_absolute() const
accessor to first_item_ (absolute in ByteBlock)
Definition: block.hpp:248
TLX_ATTRIBUTE_ALWAYS_INLINE Type GetRaw()
size_t typecode_verify() const
Returns typecode_verify_.