Thrill  0.1
block_reader.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/data/block_reader.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Timo Bingmann <[email protected]>
7  *
8  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
9  ******************************************************************************/
10 
11 #pragma once
12 #ifndef THRILL_DATA_BLOCK_READER_HEADER
13 #define THRILL_DATA_BLOCK_READER_HEADER
14 
15 #include <thrill/common/config.hpp>
17 #include <thrill/common/logger.hpp>
18 #include <thrill/data/block.hpp>
20 
21 #include <tlx/define.hpp>
22 #include <tlx/die.hpp>
23 #include <tlx/string/hexdump.hpp>
24 
25 #include <algorithm>
26 #include <string>
27 #include <vector>
28 
29 namespace thrill {
30 namespace data {
31 
32 //! \addtogroup data_layer
33 //! \{
34 
35 /*!
36  * BlockReader takes Block objects from BlockSource and allows reading of
37  * a) serializable Items or b) arbitrary data from the Block sequence. It takes
38  * care of fetching the next Block when the previous one underruns and also of
39  * data items split between two Blocks.
40  */
41 template <typename BlockSource>
43  : public common::ItemReaderToolsBase<BlockReader<BlockSource> >
44 {
45 public:
46  static constexpr bool self_verify = common::g_self_verify;
47 
48  //! Start reading a BlockSource
49  explicit BlockReader(BlockSource&& source)
50  : source_(std::move(source)) { }
51 
52  //! default constructor
53  BlockReader() = default;
54 
55  //! Return reference to enclosed BlockSource
56  BlockSource& source() { return source_; }
57 
58  //! non-copyable: delete copy-constructor
59  BlockReader(const BlockReader&) = delete;
60  //! non-copyable: delete assignment operator
61  BlockReader& operator = (const BlockReader&) = delete;
62 
63  //! move-constructor: default
64  BlockReader(BlockReader&&) = default;
65  //! move-assignment operator: default
66  BlockReader& operator = (BlockReader&&) = default;
67 
68  //! return current block for debugging
70  if (!block_.byte_block()) return PinnedBlock();
71  return PinnedBlock(
75  }
76 
77  //! return current ByteBlock
78  ByteBlockPtr byte_block() const { return block_.byte_block(); }
79 
80  //! Returns typecode_verify_
81  size_t typecode_verify() const { return typecode_verify_; }
82 
83  //! \name Reading (Generic) Items
84  //! \{
85 
86  //! Next() reads a complete item T
87  template <typename T>
89  T Next() {
90  assert(HasNext());
91  assert(num_items_ > 0);
92  --num_items_;
93 
94  if (self_verify && typecode_verify_) {
95  // for self-verification, T is prefixed with its hash code
96  size_t code = GetRaw<size_t>();
97  if (code != typeid(T).hash_code()) {
98  die("BlockReader::Next() attempted to retrieve item "
99  "with different typeid! - expected "
100  << tlx::hexdump_type(typeid(T).hash_code())
101  << " got " << tlx::hexdump_type(code));
102  }
103  }
105  }
106 
107  //! Next() reads a complete item T, without item counter or self
108  //! verification
109  template <typename T>
112  assert(HasNext());
114  }
115 
116  //! HasNext() returns true if at least one more item is available.
118  bool HasNext() {
119  while (current_ == end_) {
120  if (!NextBlock()) {
121  return false;
122  }
123  }
124  return true;
125  }
126 
127  //! Return complete contents until empty as a std::vector<T>. Use this only
128  //! if you are sure that it will fit into memory, -> only use it for tests.
129  template <typename ItemType>
130  std::vector<ItemType> ReadComplete() {
131  std::vector<ItemType> out;
132  while (HasNext()) out.emplace_back(Next<ItemType>());
133  return out;
134  }
135 
136  //! Read n items, however, do not deserialize them but deliver them as a
137  //! vector of (unpinned) Block objects. This is used to take out a range of
138  //! items, the internal item cursor is advanced by n.
139  template <typename ItemType>
140  std::vector<Block> GetItemBatch(size_t n) {
141  static constexpr bool debug = false;
142 
143  std::vector<Block> out;
144  if (n == 0) return out;
145 
146  die_unless(HasNext());
147  assert(block_.IsValid());
148 
149  const Byte* begin_output = current_;
150  size_t first_output = current_ - byte_block()->begin();
151 
152  // inside the if-clause the current_ may not point to a valid item
153  // boundary.
154  if (n >= num_items_)
155  {
156  // *** if the current block still contains items, push it partially
157 
158  // construct first Block using current_ pointer
159  out.emplace_back(
160  byte_block(),
161  // valid range: excludes preceding items.
162  current_ - byte_block()->begin(),
163  end_ - byte_block()->begin(),
164  // first item is at begin_ (we may have dropped some)
165  current_ - byte_block()->begin(),
166  // remaining items in this block
167  num_items_,
168  // typecode verify flag
170 
171  sLOG << "partial first:" << out.back();
172 
173  n -= num_items_;
174 
175  // get next block. if not possible -> may be okay since last
176  // item might just terminate the current block.
177  block_.Reset();
178  Block next_block = source_.NextBlockUnpinned();
179  if (!next_block.IsValid()) {
180  assert(n == 0);
181  sLOG << "exit1 after batch.";
182  return out;
183  }
184 
185  // *** then append complete blocks without deserializing or pinning
186 
187  while (n >= next_block.num_items()) {
188  n -= next_block.num_items();
189 
190  out.emplace_back(std::move(next_block));
191  sLOG << "middle:" << out.back();
192 
193  next_block = source_.NextBlockUnpinned();
194  if (!next_block.IsValid()) {
195  assert(n == 0);
196  sLOG << "exit2 after batch.";
197  return out;
198  }
199  }
200 
201  // *** no more complete blocks, so load the current one
202 
203  LoadBlock(source_.AcquirePin(next_block));
204 
205  // move current_ to the first valid item of the block we got (at
206  // least one NextBlock() has been called). But when constructing the
207  // last Block, we have to include the partial item in the
208  // front.
209  begin_output = current_;
210  first_output = block_.first_item_absolute();
211 
213  }
214 
215  // put prospective last block into vector.
216 
217  out.emplace_back(
218  byte_block(),
219  // full range is valid.
220  begin_output - byte_block()->begin(), end_ - byte_block()->begin(),
221  first_output, n,
222  // typecode verify flag
224 
225  // skip over remaining items in this block, there while collect all
226  // blocks needed for those items via block_collect_. There can be more
227  // than one block necessary for Next if an item is large!
228 
229  std::vector<PinnedBlock> out_pinned;
230 
231  block_collect_ = &out_pinned;
233  Skip(n, n * ((self_verify && typecode_verify_ ? sizeof(size_t) : 0) +
235  }
236  else {
237  while (n > 0) {
238  Next<ItemType>();
239  --n;
240  }
241  }
242  block_collect_ = nullptr;
243 
244  for (PinnedBlock& pb : out_pinned)
245  out.emplace_back(std::move(pb).MoveToBlock());
246  out_pinned.clear();
247 
248  out.back().set_end(current_ - byte_block()->begin());
249 
250  sLOG << "partial last:" << out.back();
251 
252  sLOG << "exit3 after batch:"
253  << "current_=" << current_ - byte_block()->begin();
254 
255  return out;
256  }
257 
258  //! \}
259 
260  //! \name Cursor Reading Methods
261  //! \{
262 
263  //! Fetch a number of unstructured bytes from the current block, advancing
264  //! the cursor.
265  BlockReader& Read(void* outdata, size_t size) {
266 
267  Byte* cdata = reinterpret_cast<Byte*>(outdata);
268 
269  while (TLX_UNLIKELY(current_ + size > end_)) {
270  // partial copy of remainder of block
271  size_t partial_size = end_ - current_;
272  std::copy(current_, current_ + partial_size, cdata);
273 
274  cdata += partial_size;
275  size -= partial_size;
276 
277  if (!NextBlock())
278  throw std::runtime_error("Data underflow in BlockReader.");
279  }
280 
281  // copy rest from current block
282  std::copy(current_, current_ + size, cdata);
283  current_ += size;
284 
285  return *this;
286  }
287 
288  //! Fetch a number of unstructured bytes from the buffer as std::string,
289  //! advancing the cursor.
290  std::string Read(size_t datalen) {
291  std::string out(datalen, 0);
292  Read(const_cast<char*>(out.data()), out.size());
293  return out;
294  }
295 
296  //! Advance the cursor given number of bytes without reading them.
297  BlockReader& Skip(size_t items, size_t bytes) {
298  while (TLX_UNLIKELY(current_ + bytes > end_)) {
299  bytes -= end_ - current_;
300  // deduct number of remaining items in skipped block from item skip
301  // counter.
302  items -= num_items_;
303  if (!NextBlock())
304  throw std::runtime_error("Data underflow in BlockReader.");
305  }
306  current_ += bytes;
307  // the last line skipped over the remaining "items" number of items.
308  num_items_ -= items;
309  return *this;
310  }
311 
312  //! Fetch a single byte from the current block, advancing the cursor.
314  // loop, since blocks can actually be empty.
315  while (TLX_UNLIKELY(current_ == end_)) {
316  if (!NextBlock())
317  throw std::runtime_error("Data underflow in BlockReader.");
318  }
319  return *current_++;
320  }
321 
322  //! Fetch a single item of the template type Type from the buffer,
323  //! advancing the cursor. Be careful with implicit type conversions!
324  template <typename Type>
327  static_assert(std::is_pod<Type>::value,
328  "You only want to GetRaw() POD types as raw values.");
329 
330  Type ret;
331 
332  // fast path for reading item from block if it fits.
333  if (TLX_LIKELY(current_ + sizeof(Type) <= end_)) {
334  ret = *reinterpret_cast<const Type*>(current_);
335  current_ += sizeof(Type);
336  }
337  else {
338  Read(&ret, sizeof(ret));
339  }
340 
341  return ret;
342  }
343 
344  //! \}
345 
346 private:
347  //! Instance of BlockSource. This is NOT a reference, as to enable embedding
348  //! of FileBlockSource to compose classes into File::Reader.
349  BlockSource source_;
350 
351  //! The current block being read, this holds a shared pointer reference.
353 
354  //! current read pointer into current block of file.
355  const Byte* current_ = nullptr;
356 
357  //! pointer to end of current block.
358  const Byte* end_ = nullptr;
359 
360  //! remaining number of items starting in this block
361  size_t num_items_ = 0;
362 
363  //! pointer to vector to collect blocks in GetItemBatch.
364  std::vector<PinnedBlock>* block_collect_ = nullptr;
365 
366  //! flag whether the underlying data contains self verify type codes from
367  //! BlockReader; this must be false to read external files.
369 
370  //! Call source_.NextBlock with appropriate parameters
371  bool NextBlock() {
372  // first release old pin.
373  block_.Reset();
374  // request next pinned block
375  block_ = source_.NextBlock();
376  sLOG0 << "BlockReader::NextBlock" << block_;
377 
378  if (!block_.IsValid()) return false;
379 
380  if (block_collect_)
381  block_collect_->emplace_back(block_);
382 
383  current_ = block_.data_begin();
384  end_ = block_.data_end();
385  num_items_ = block_.num_items();
386  typecode_verify_ = block_.typecode_verify();
387  return true;
388  }
389 
390  //! Load Block into Reader
391  void LoadBlock(PinnedBlock&& block) {
392  block_ = std::move(block);
393  current_ = block_.data_begin();
394  end_ = block_.data_end();
395  num_items_ = block_.num_items();
396  typecode_verify_ = block_.typecode_verify();
397  }
398 };
399 
400 //! \}
401 
402 } // namespace data
403 } // namespace thrill
404 
405 #endif // !THRILL_DATA_BLOCK_READER_HEADER
406 
407 /******************************************************************************/
Block combines a reference to a read-only ByteBlock and book-keeping information. ...
Definition: block.hpp:52
std::vector< Block > GetItemBatch(size_t n)
bool typecode_verify() const
return typecode_verify from Block
Definition: block.hpp:254
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:34
std::vector< ItemType > ReadComplete()
#define die_unless(X)
Definition: die.hpp:27
#define TLX_ATTRIBUTE_ALWAYS_INLINE
double T
size_t num_items() const
return number of items beginning in this block
Definition: block.hpp:232
size_t first_item_absolute() const
accessor to first_item_ (absolute in ByteBlock)
Definition: block.hpp:248
const Byte * data_end() const
return pointer to end of valid data
Definition: block.hpp:265
Type
VFS object type.
Definition: file_io.hpp:52
std::string Read(size_t datalen)
STL namespace.
PinnedBlock CopyBlock() const
return current block for debugging
TLX_ATTRIBUTE_ALWAYS_INLINE T Next()
Next() reads a complete item T.
BlockReader & Read(void *outdata, size_t size)
BlockReader(BlockSource &&source)
Start reading a BlockSource.
A non-pinned counting pointer to a ByteBlock.
Definition: byte_block.hpp:176
#define die(msg)
Instead of calling std::terminate(), throw the message via an exception.
Definition: die.hpp:22
CRTP class to enhance item/memory reader classes with Varint decoding and String decoding.
#define TLX_UNLIKELY(c)
Definition: likely.hpp:24
BlockReader & operator=(const BlockReader &)=delete
non-copyable: delete assignment operator
bool NextBlock()
Call source_.NextBlock with appropriate parameters.
size_t num_items_
remaining number of items starting in this block
static constexpr bool g_self_verify
Definition: config.hpp:32
#define TLX_LIKELY(c)
Definition: likely.hpp:23
void Reset()
release pin on block and reset Block pointer to nullptr
Definition: block.hpp:271
#define sLOG0
Override default output: never or always output log.
Definition: logger.hpp:37
uint8_t Byte
type of underlying memory area
Definition: byte_block.hpp:37
BlockReader()=default
default constructor
int value
Definition: gen_data.py:41
BlockReader takes Block objects from BlockSource and allows reading of a) serializable Items or b) ar...
ByteBlockPtr byte_block() const
return current ByteBlock
BlockSource & source()
Return reference to enclosed BlockSource.
Byte GetByte()
Fetch a single byte from the current block, advancing the cursor.
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
Definition: allocator.hpp:220
static constexpr bool debug
const ByteBlockPtr & byte_block() const
access to byte_block_
Definition: block.hpp:226
TLX_ATTRIBUTE_ALWAYS_INLINE T NextNoSelfVerify()
static const size_t bytes
number of bytes in uint_pair
Definition: uint_types.hpp:75
const Byte * end_
pointer to end of current block.
size_t num_items() const
return number of items beginning in this block
Definition: block.hpp:85
const Byte * current_
current read pointer into current block of file.
std::string hexdump_type(const Type &t)
Dump a (binary) item as a sequence of uppercase hexadecimal pairs.
Definition: hexdump.hpp:52
A pinned / pin-counted derivative of a Block.
Definition: block.hpp:157
std::vector< PinnedBlock > * block_collect_
pointer to vector to collect blocks in GetItemBatch.
TLX_ATTRIBUTE_ALWAYS_INLINE bool HasNext()
HasNext() returns true if at least one more item is available.
BlockReader & Skip(size_t items, size_t bytes)
Advance the cursor given number of bytes without reading them.
static constexpr bool self_verify
PinnedByteBlockPtr CopyPinnedByteBlock() const
Definition: block.hpp:298
void LoadBlock(PinnedBlock &&block)
Load Block into Reader.
PinnedBlock block_
The current block being read, this holds a shared pointer reference.
bool IsValid() const
Return whether the enclosed ByteBlock is valid.
Definition: block.hpp:74
const Byte * data_begin() const
return pointer to beginning of valid data
Definition: block.hpp:259
size_t typecode_verify() const
Returns typecode_verify_.
bool IsValid() const
Return whether the enclosed ByteBlock is valid.
Definition: block.hpp:223
TLX_ATTRIBUTE_ALWAYS_INLINE Type GetRaw()