Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
file.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/data/file.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015-2016 Timo Bingmann <[email protected]>
7  *
8  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
9  ******************************************************************************/
10 
11 #pragma once
12 #ifndef THRILL_DATA_FILE_HEADER
13 #define THRILL_DATA_FILE_HEADER
14 
16 #include <thrill/common/logger.hpp>
17 #include <thrill/data/block.hpp>
22 
23 #include <tlx/die.hpp>
24 
25 #include <cassert>
26 #include <deque>
27 #include <functional>
28 #include <limits>
29 #include <string>
30 #include <vector>
31 
32 namespace thrill {
33 namespace data {
34 
35 //! \addtogroup data_layer
36 //! \{
37 
38 class FileBlockSink;
39 class KeepFileBlockSource;
40 class ConsumeFileBlockSource;
41 
42 /*!
43  * A File is an ordered sequence of Block objects for storing items. By using
44  * the Block indirection, the File can be composed using existing Block objects
45  * (via reference counting), but only contain a subset of the items in those
46  * Blocks. This may be used for Zip() and Repartition().
47  *
48  * A File can be written using a BlockWriter instance, which is delivered by
49  * GetWriter(). Thereafter it can be read (multiple times) using a BlockReader,
50  * delivered by GetReader().
51  *
52  * Using a prefixsum over the number of items in a Block, one can seek to the
53  * block containing any item offset in log_2(Blocks) time, though seeking within
54  * the Block goes sequentially.
55  */
56 class File : public BlockSink, public tlx::ReferenceCounter
57 {
58 public:
63 
64  //! external static variable containing the default number of bytes to
65  //! prefetch in File readers
66  static size_t default_prefetch_size_;
67 
68  //! Constructor from BlockPool
69  File(BlockPool& block_pool, size_t local_worker_id, size_t dia_id);
70 
71  //! non-copyable: delete copy-constructor
72  File(const File&) = delete;
73  //! non-copyable: delete assignment operator
74  File& operator = (const File&) = delete;
75  //! move-constructor: default
76  File(File&&) = default;
77  //! move-assignment operator: default
78  File& operator = (File&&) = default;
79 
80  //! Return a copy of the File (explicit copy-constructor)
81  File Copy() const;
82 
83  //! \name Methods of a BlockSink
84  //! \{
85 
86  //! Append a block to this file, the block must contain given number of
87  //! items after the offset first.
88  void AppendBlock(const Block& b) {
89  if (b.size() == 0) return;
90  num_items_sum_.push_back(num_items() + b.num_items());
91  size_bytes_ += b.size();
92  stats_bytes_ += b.size();
93  stats_items_ += b.num_items();
94  blocks_.push_back(b);
95  }
96 
97  //! Append a block to this file, the block must contain given number of
98  //! items after the offset first.
99  void AppendBlock(Block&& b) {
100  if (b.size() == 0) return;
101  num_items_sum_.push_back(num_items() + b.num_items());
102  size_bytes_ += b.size();
103  stats_bytes_ += b.size();
104  stats_items_ += b.num_items();
105  blocks_.emplace_back(std::move(b));
106  }
107 
108  //! Append a block to this file, the block must contain given number of
109  //! items after the offset first.
110  void AppendBlock(const Block& b, bool /* is_last_block */) final {
111  return AppendBlock(b);
112  }
113 
114  //! Append a block to this file, the block must contain given number of
115  //! items after the offset first.
116  void AppendBlock(Block&& b, bool /* is_last_block */) final {
117  return AppendBlock(std::move(b));
118  }
119 
120  void Close() final;
121 
122  //! write out stats
123  ~File();
124 
125  //! Free all Blocks in the File and deallocate vectors
126  void Clear();
127 
128  //! boolean flag whether to check if AllocateByteBlock can fail in any
129  //! subclass (if false: accelerate BlockWriter to not be able to cope with
130  //! nullptr).
131  static constexpr bool allocate_can_fail_ = false;
132 
133  //! \}
134 
135  //! \name Writers and Readers
136  //! \{
137 
138  //! Get BlockWriter.
139  Writer GetWriter(size_t block_size = default_block_size);
140 
141  /*!
142  * Get BlockReader or a consuming BlockReader for beginning of File
143  *
144  * \attention If consume is true, the reader consumes the File's contents
145  * UNCONDITIONALLY, the File will always be emptied whether all items were
146  * read via the Reader or not.
147  */
149  bool consume, size_t prefetch_size = File::default_prefetch_size_);
150 
151  //! Get BlockReader for beginning of File
153  size_t prefetch_size = File::default_prefetch_size_) const;
154 
155  /*!
156  * Get consuming BlockReader for beginning of File
157  *
158  * \attention The reader consumes the File's contents UNCONDITIONALLY, the
159  * File will always be emptied whether all items were read via the Reader or
160  * not.
161  */
163  size_t prefetch_size = File::default_prefetch_size_);
164 
165  //! Get BlockReader seeked to the corresponding item index
166  template <typename ItemType>
168  size_t index, size_t prefetch = default_prefetch_size_) const;
169 
170  //! Read complete File into a std::string, obviously, this should only be
171  //! used for debugging!
172  std::string ReadComplete() const;
173 
174  //! \}
175 
176  //! Return the number of blocks
177  size_t num_blocks() const { return blocks_.size(); }
178 
179  //! Return the number of items in the file
180  size_t num_items() const {
181  return num_items_sum_.size() ? num_items_sum_.back() : 0;
182  }
183 
184  //! Returns true if the File is empty.
185  bool empty() const { return blocks_.empty(); }
186 
187  //! Return the number of bytes of user data in this file.
188  size_t size_bytes() const { return size_bytes_; }
189 
190  //! Return reference to a block
191  const Block& block(size_t i) const {
192  assert(i < blocks_.size());
193  return blocks_[i];
194  }
195 
196  //! Returns constant reference to all Blocks in the File.
197  const std::deque<Block>& blocks() const { return blocks_; }
198 
199  //! Return number of items starting in block i
200  size_t ItemsStartIn(size_t i) const {
201  assert(i < blocks_.size());
202  return num_items_sum_[i] - (i == 0 ? 0 : num_items_sum_[i - 1]);
203  }
204 
205  //! Get item at the corresponding position. Do not use this
206  //! method for reading multiple successive items.
207  template <typename ItemType>
208  ItemType GetItemAt(size_t index) const;
209 
210  /*!
211  * Get index of the given item, or the next greater item, in this file. The
212  * file has to be ordered according to the given compare function. The tie
213  * value can be used to make a decision in case of many successive equal
214  * elements. The tie is compared with the local rank of the element.
215  *
216  * WARNING: This method uses GetItemAt combined with a binary search and is
217  * therefore not efficient. The method should be reimplemented in the near
218  * future.
219  */
220  template <typename ItemType, typename CompareFunction = std::less<ItemType> >
221  size_t GetIndexOf(const ItemType& item, size_t tie,
222  size_t left, size_t right,
223  const CompareFunction& func = CompareFunction()) const;
224 
225  /*!
226  * Get index of the given item, or the next greater item, in this file. The
227  * file has to be ordered according to the given compare function. The tie
228  * value can be used to make a decision in case of many successive equal
229  * elements. The tie is compared with the local rank of the element.
230  *
231  * WARNING: This method uses GetItemAt combined with a binary search and is
232  * therefore not efficient. The method should be reimplemented in the near
233  * future.
234  */
235  template <typename ItemType, typename CompareFunction = std::less<ItemType> >
236  size_t GetIndexOf(const ItemType& item, size_t tie,
237  const CompareFunction& less = CompareFunction()) const {
238  // start binary search with range [0,num_items)
239  return GetIndexOf(item, tie, 0, num_items(), less);
240  }
241 
242  //! Seek in File: return a Block range containing items begin, end of
243  //! given type.
244  template <typename ItemType>
245  std::vector<Block> GetItemRange(size_t begin, size_t end) const;
246 
247  //! Output the Block objects contained in this File.
248  friend std::ostream& operator << (std::ostream& os, const File& f);
249 
250  //! change dia_id after construction (needed because it may be unknown at
251  //! construction)
252  void set_dia_id(size_t dia_id) { dia_id_ = dia_id; }
253 
254 private:
255  //! unique file id
256  size_t id_;
257 
258  //! optionally associated DIANode id
259  size_t dia_id_;
260 
261  //! container holding Blocks and thus shared pointers to all byte blocks.
262  std::deque<Block> blocks_;
263 
264  //! inclusive prefixsum of number of elements of blocks, hence
265  //! num_items_sum_[i] is the number of items starting in all blocks
266  //! preceding and including the i-th block.
267  std::deque<size_t> num_items_sum_;
268 
269  //! Total size of this file in bytes. Sum of all block sizes.
270  size_t size_bytes_ = 0;
271 
272  //! Total number of bytes stored in the File by a Writer: for stats, never
273  //! decreases.
274  size_t stats_bytes_ = 0;
275 
276  //! Total number of items stored in the File by a Writer: for stats, never
277  //! decreases.
278  size_t stats_items_ = 0;
279 
280  //! for access to blocks_ and num_items_sum_
281  friend class KeepFileBlockSource;
283 };
284 
286 
287 /*!
288  * BlockSink which interfaces to a File
289  */
290 class FileBlockSink final : public BlockSink
291 {
292  static constexpr bool debug = false;
293 
294 public:
296  : BlockSink(nullptr, -1), file_(nullptr)
297  { }
298 
300  : BlockSink(file->block_pool(), file->local_worker_id()),
301  file_(std::move(file)) {
302  LOG << "FileBlockSink() new for " << file_.get();
303  }
304 
305  //! default copy-constructor
306  FileBlockSink(const FileBlockSink&) = default;
307  //! default assignment operator
308  FileBlockSink& operator = (const FileBlockSink&) = default;
309 
311  LOG << "~FileBlockSink() for " << file_.get();
312  }
313 
314  //! \name Methods of a BlockSink
315  //! \{
316 
317  //! Append a block to this file, the block must contain given number of
318  //! items after the offset first.
319  void AppendBlock(const Block& b, bool is_last_block) final {
320  assert(file_);
321  return file_->AppendBlock(b, is_last_block);
322  }
323 
324  //! Append a block to this file, the block must contain given number of
325  //! items after the offset first.
326  void AppendBlock(Block&& b, bool is_last_block) final {
327  assert(file_);
328  return file_->AppendBlock(std::move(b), is_last_block);
329  }
330 
331  void Close() final {
332  if (file_) {
333  file_->Close();
334  file_.reset();
335  }
336  }
337 
338  //! \}
339 
340 private:
342 };
343 
344 /*!
345  * A BlockSource to read Blocks from a File. The KeepFileBlockSource mainly
346  * contains an index to the current block, which is incremented when the
347  * NextBlock() must be delivered.
348  */
350 {
351 public:
352  //! Start reading a File
354  const File& file, size_t local_worker_id,
355  size_t prefetch_size = File::default_prefetch_size_,
356  size_t first_block = 0, size_t first_item = keep_first_item);
357 
358  //! Perform prefetch
359  void Prefetch(size_t prefetch_size);
360 
361  //! Advance to next block of file, delivers current_ and end_ for
362  //! BlockReader
364 
365  //! Get next block unpinned, used by GetItemBatch to read Blocks without
366  //! pinning them
368 
369  //! Acquire Pin for Block returned from NextBlockUnpinned
370  PinnedBlock AcquirePin(const Block& block);
371 
372 protected:
373  //! Determine current unpinned Block to deliver via NextBlock() or
374  //! NextBlockUnpinned().
376 
377 private:
378  //! sentinel value for not changing the first_item
379  static constexpr size_t keep_first_item = size_t(-1);
380 
381  //! file to read blocks from
382  const File& file_;
383 
384  //! local worker id reading the File
386 
387  //! number of bytes of prefetch for reader
389 
390  //! current prefetch operations
391  std::deque<PinRequestPtr> fetching_blocks_;
392 
393  //! current number of bytes in prefetch
395 
396  //! number of the first block
397  size_t first_block_;
398 
399  //! index of current block.
401 
402  //! offset of first item in first block read
403  size_t first_item_;
404 };
405 
406 /*!
407  * A BlockSource to read and simultaneously consume Blocks from a File. The
408  * ConsumeFileBlockSource always returns the first block of the File and removes
409  * it, hence, consuming Blocks from the File.
410  *
411  * \attention The reader consumes the File's contents UNCONDITIONALLY, the File
412  * will always be emptied whether all items were read via the Reader or not.
413  */
415 {
416 public:
417  //! Start reading a File. Creates a source for the given file and sets the
418  //! number of bytes that should be prefetched. 0 means that no blocks are
419  //! prefetched.
421  File* file, size_t local_worker_id,
422  size_t prefetch_size = File::default_prefetch_size_);
423 
424  //! non-copyable: delete copy-constructor
426  //! non-copyable: delete assignment operator
428  //! move-constructor: default
430 
431  //! Perform prefetch
432  void Prefetch(size_t prefetch_size);
433 
434  //! Get the next block of file.
436 
437  //! Get next block unpinned, used by GetItemBatch to read Blocks without
438  //! pinning them
440 
441  //! Acquire Pin for Block returned from NextBlockUnpinned
442  PinnedBlock AcquirePin(const Block& block);
443 
444  //! Consume unread blocks and reset File to zero items.
446 
447 private:
448  //! file to consume blocks from (ptr to make moving easier)
450 
451  //! local worker id reading the File
453 
454  //! number of bytes of prefetch for reader
456 
457  //! current prefetch operations
458  std::deque<PinRequestPtr> fetching_blocks_;
459 
460  //! current number of bytes in prefetch
462 };
463 
464 //! Get BlockReader seeked to the corresponding item index
465 template <typename ItemType>
466 typename File::KeepReader
467 File::GetReaderAt(size_t index, size_t prefetch_size) const {
468  static constexpr bool debug = false;
469 
470  // perform binary search for the first block whose inclusive item
471  // prefixsum is greater or equal to index.
472  auto it =
473  std::lower_bound(num_items_sum_.begin(), num_items_sum_.end(), index);
474 
475  if (it == num_items_sum_.end())
476  die("Access beyond end of File?");
477 
478  size_t begin_block = it - num_items_sum_.begin();
479 
480  sLOG << "File::GetReaderAt()"
481  << "item" << index << "in block" << begin_block
482  << "psum" << num_items_sum_[begin_block]
483  << "first_item" << blocks_[begin_block].first_item_absolute();
484 
485  // start Reader at given first valid item in located block
486  KeepReader fr(
487  KeepFileBlockSource(*this, local_worker_id_, prefetch_size,
488  begin_block,
489  blocks_[begin_block].first_item_absolute()));
490 
491  // skip over extra items in beginning of block
492  size_t items_before = it == num_items_sum_.begin() ? 0 : *(it - 1);
493 
494  sLOG << "File::GetReaderAt()"
495  << "items_before" << items_before << "index" << index
496  << "delta" << (index - items_before);
497  assert(items_before <= index);
498 
499  // use fixed_size information to accelerate jump.
501  {
502  // fetch a Block to get typecode_verify flag
503  fr.HasNext();
504 
505  const size_t skip_items = index - items_before;
506  const size_t bytes_per_item =
507  (fr.typecode_verify() ? sizeof(size_t) : 0)
509 
510  fr.Skip(skip_items, skip_items * bytes_per_item);
511  }
512  else
513  {
514  for (size_t i = items_before; i < index; ++i) {
515  if (!fr.HasNext())
516  die("Underflow in GetItemRange()");
517  fr.template Next<ItemType>();
518  }
519  }
520 
521  sLOG << "File::GetReaderAt()"
522  << "after seek at" << fr.CopyBlock();
523 
524  return fr;
525 }
526 
//! Return the item stored at position index of the File. Every call seeks a
//! fresh KeepReader to index via GetReaderAt() with a prefetch size of 0, so
//! this is not suited for reading multiple successive items.
527 template <typename ItemType>
528 ItemType File::GetItemAt(size_t index) const {
    // only one item is read from the seeked position, hence prefetch size 0.
529  KeepReader reader = this->GetReaderAt<ItemType>(index, /* prefetch */ 0);
    // NOTE(review): Next() is called without a preceding HasNext() check;
    // presumably callers guarantee index < num_items() -- confirm.
530  return reader.Next<ItemType>();
531 }
532 
533 template <typename ItemType, typename CompareFunction>
535  const ItemType& item, size_t tie, size_t left, size_t right,
536  const CompareFunction& less) const {
537 
538  static constexpr bool debug = false;
539 
540  static_assert(
541  std::is_convertible<
542  bool, typename common::FunctionTraits<CompareFunction>::result_type
543  >::value,
544  "Comperator must return boolean.");
545 
546  LOG << "File::GetIndexOf() looking for item " << item << " tie " << tie
547  << " in range [" << left << "," << right << ") ="
548  << " size " << right - left;
549 
550  assert(left <= right);
551  assert(left <= num_items());
552  assert(right <= num_items());
553 
554  // Use a binary search to find the item.
555  while (left < right) {
556  size_t mid = (right + left) >> 1;
557  LOG << "left: " << left << "right: " << right << "mid: " << mid;
558  ItemType cur = GetItemAt<ItemType>(mid);
559  LOG << "Item at mid: " << cur;
560  if (less(item, cur) ||
561  (!less(item, cur) && !less(cur, item) && tie <= mid)) {
562  right = mid;
563  }
564  else {
565  left = mid + 1;
566  }
567  }
568 
569  LOG << "found insert position at: " << left;
570  return left;
571 }
572 
573 //! Seek in File: return the Blocks holding the items [begin, end) of the
574 //! given type.
//!
//! A KeepReader is positioned at item begin via GetReaderAt() (using its
//! default prefetch size) and is then asked for a batch of end - begin items.
575 template <typename ItemType>
576 std::vector<Block> File::GetItemRange(size_t begin, size_t end) const {
    // end - begin is computed in size_t below, so begin must not exceed end.
577  assert(begin <= end);
578  // deliver array of remaining blocks
579  return GetReaderAt<ItemType>(begin)
580  .template GetItemBatch<ItemType>(end - begin);
581 }
582 
583 //! Take a vector of Readers and prefetch equally from them
//!
//! The size passed to Prefetch() of each reader's source is raised in steps
//! of default_block_size, visiting all readers at every step, before each
//! source finally receives the full prefetch_size. Presumably this interleaves
//! the readers' prefetch requests instead of letting the first reader issue
//! all of its requests first -- confirm against Prefetch().
//!
//! \param readers Readers whose source() offers Prefetch(size_t)
//! \param prefetch_size final prefetch size handed to every source
584 template <typename Reader>
585 void StartPrefetch(std::vector<Reader>& readers, size_t prefetch_size) {
    // ramp-up phase: skipped entirely if prefetch_size <= default_block_size.
    // NOTE(review): does not terminate if default_block_size is 0 while
    // prefetch_size > 0.
586  for (size_t p = default_block_size; p < prefetch_size;
587  p += default_block_size)
588  {
589  for (Reader& r : readers)
590  r.source().Prefetch(p);
591  }
    // final pass: hand the full prefetch_size to every reader's source.
592  for (Reader& r : readers)
593  r.source().Prefetch(prefetch_size);
594 }
595 
596 //! \}
597 
598 } // namespace data
599 } // namespace thrill
600 
601 #endif // !THRILL_DATA_FILE_HEADER
602 
603 /******************************************************************************/
void StartPrefetch(std::vector< Reader > &readers, size_t prefetch_size)
Take a vector of Readers and prefetch equally from them.
Definition: file.hpp:585
size_t local_worker_id_
local worker id reading the File
Definition: file.hpp:385
Block combines a reference to a read-only ByteBlock and book-keeping information. ...
Definition: block.hpp:52
void Prefetch(size_t prefetch_size)
Perform prefetch.
Definition: file.cpp:215
size_t ItemsStartIn(size_t i) const
Return number of items starting in block i.
Definition: file.hpp:200
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:34
size_t num_items() const
Return the number of items in the file.
Definition: file.hpp:180
FileBlockSink & operator=(const FileBlockSink &)=default
default assignment operator
BlockPool * block_pool() const
Returns block_pool_.
Definition: block_sink.hpp:69
std::deque< Block > blocks_
container holding Blocks and thus shared pointers to all byte blocks.
Definition: file.hpp:262
static constexpr size_t keep_first_item
sentinel value for not changing the first_item item
Definition: file.hpp:379
PinnedBlock AcquirePin(const Block &block)
Acquire Pin for Block returned from NextBlockUnpinned.
Definition: file.cpp:278
size_t id_
unique file id
Definition: file.hpp:256
FileBlockSink(tlx::CountingPtrNoDelete< File > file)
Definition: file.hpp:299
size_t default_block_size
default size of blocks in File, Channel, BlockQueue, etc.
Definition: byte_block.cpp:25
const std::deque< Block > & blocks() const
Returns constant reference to all Blocks in the File.
Definition: file.hpp:197
A File is an ordered sequence of Block objects for storing items.
Definition: file.hpp:56
void Clear()
Free all Blocks in the File and deallocate vectors.
Definition: file.cpp:57
size_t fetching_bytes_
current number of bytes in prefetch
Definition: file.hpp:461
PinnedBlock AcquirePin(const Block &block)
Acquire Pin for Block returned from NextBlockUnpinned.
Definition: file.cpp:192
size_t current_block_
index of current block.
Definition: file.hpp:400
PinnedBlock NextBlock()
Get the next block of file.
Definition: file.cpp:232
std::deque< PinRequestPtr > fetching_blocks_
current prefetch operations
Definition: file.hpp:458
size_t dia_id_
optionally associated DIANode id
Definition: file.hpp:259
ConsumeFileBlockSource(File *file, size_t local_worker_id, size_t prefetch_size=File::default_prefetch_size_)
Definition: file.cpp:199
TLX_ATTRIBUTE_ALWAYS_INLINE T Next()
Next() reads a complete item T.
const Block & block(size_t i) const
Return reference to a block.
Definition: file.hpp:191
size_t num_items() const
return number of items beginning in this block
Definition: block.hpp:85
BlockWriter contains a temporary Block object into which a) any serializable item can be stored or b)...
#define die(msg)
Instead of std::terminate(), throw the output message via an exception.
Definition: die.hpp:22
tlx::CountingPtrNoDelete< File > file_
Definition: file.hpp:341
size_t GetIndexOf(const ItemType &item, size_t tie, size_t left, size_t right, const CompareFunction &func=CompareFunction()) const
Get index of the given item, or the next greater item, in this file.
Definition: file.hpp:534
void AppendBlock(Block &&b)
Definition: file.hpp:99
void AppendBlock(Block &&b, bool) final
Definition: file.hpp:116
std::vector< Block > GetItemRange(size_t begin, size_t end) const
Definition: file.hpp:576
ConsumeFileBlockSource & operator=(const ConsumeFileBlockSource &)=delete
non-copyable: delete assignment operator
void set_dia_id(size_t dia_id)
Definition: file.hpp:252
std::string ReadComplete() const
Definition: file.cpp:87
~ConsumeFileBlockSource()
Consume unread blocks and reset File to zero items.
Definition: file.cpp:282
size_t first_item_
offset of first item in first block read
Definition: file.hpp:403
BlockSink which interfaces to a File.
Definition: file.hpp:290
size_t local_worker_id_
local worker id to associate pinned block with
Definition: block_sink.hpp:103
size_t size_bytes() const
Return the number of bytes of user data in this file.
Definition: file.hpp:188
size_t first_block_
number of the first block
Definition: file.hpp:397
void AppendBlock(Block &&b, bool is_last_block) final
Definition: file.hpp:326
std::deque< PinRequestPtr > fetching_blocks_
current prefetch operations
Definition: file.hpp:391
size_t stats_bytes_
Definition: file.hpp:274
ConsumeReader GetConsumeReader(size_t prefetch_size=File::default_prefetch_size_)
Get consuming BlockReader for beginning of File.
Definition: file.cpp:73
void AppendBlock(const Block &b)
Definition: file.hpp:88
File Copy() const
Return a copy of the File (explicit copy-constructor)
Definition: file.cpp:42
void Close() final
Closes the sink. Must not be called multiple times.
Definition: file.cpp:52
void AppendBlock(const Block &b, bool is_last_block) final
Definition: file.hpp:319
ItemType GetItemAt(size_t index) const
Definition: file.hpp:528
Pool to allocate, keep, swap out/in, and free all ByteBlocks on the host.
Definition: block_pool.hpp:42
void AppendBlock(const Block &b, bool) final
Definition: file.hpp:110
int value
Definition: gen_data.py:41
void Close() final
Closes the sink. Must not be called multiple times.
Definition: file.hpp:331
BlockReader takes Block objects from BlockSource and allows reading of a) serializable Items or b) ar...
size_t stats_items_
Definition: file.hpp:278
size_t num_blocks() const
Return the number of blocks.
Definition: file.hpp:177
friend std::ostream & operator<<(std::ostream &os, const File &f)
Output the Block objects contained in this File.
Definition: file.cpp:94
size_t GetIndexOf(const ItemType &item, size_t tie, const CompareFunction &less=CompareFunction()) const
Get index of the given item, or the next greater item, in this file.
Definition: file.hpp:236
size_t prefetch_size_
number of bytes of prefetch for reader
Definition: file.hpp:388
size_t size() const
return length of valid data in bytes.
Definition: block.hpp:100
static constexpr bool debug
const File & file_
file to read blocks from
Definition: file.hpp:382
File(BlockPool &block_pool, size_t local_worker_id, size_t dia_id)
Constructor from BlockPool.
Definition: file.cpp:22
void Prefetch(size_t prefetch_size)
Perform prefetch.
Definition: file.cpp:131
PinnedBlock CopyBlock() const
return current block for debugging
bool empty() const
Returns true if the File is empty.
Definition: file.hpp:185
static constexpr bool allocate_can_fail_
Definition: file.hpp:131
Reader GetReader(bool consume, size_t prefetch_size=File::default_prefetch_size_)
Get BlockReader or a consuming BlockReader for beginning of File.
Definition: file.cpp:78
KeepReader GetReaderAt(size_t index, size_t prefetch=default_prefetch_size_) const
Get BlockReader seeked to the corresponding item index.
Definition: file.hpp:467
static size_t default_prefetch_size_
Definition: file.hpp:66
size_t local_worker_id_
local worker id reading the File
Definition: file.hpp:452
Pure virtual base class for all things that can receive Blocks from a BlockWriter.
Definition: block_sink.hpp:28
friend class KeepFileBlockSource
for access to blocks_ and num_items_sum_
Definition: file.hpp:281
A pinned / pin-counted derivative of a Block.
Definition: block.hpp:157
size_t local_worker_id() const
local worker id to associate pinned block with
Definition: block_sink.hpp:94
File * file_
file to consume blocks from (ptr to make moving easier)
Definition: file.hpp:449
KeepFileBlockSource(const File &file, size_t local_worker_id, size_t prefetch_size=File::default_prefetch_size_, size_t first_block=0, size_t first_item=keep_first_item)
Start reading a File.
Definition: file.cpp:107
TLX_ATTRIBUTE_ALWAYS_INLINE bool HasNext()
HasNext() returns true if at least one more item is available.
std::deque< size_t > num_items_sum_
Definition: file.hpp:267
A BlockSource to read and simultaneously consume Blocks from a File.
Definition: file.hpp:414
BlockReader & Skip(size_t items, size_t bytes)
Advance the cursor given number of bytes without reading them.
size_t fetching_bytes_
current number of bytes in prefetch
Definition: file.hpp:394
size_t prefetch_size_
number of bytes of prefetch for reader
Definition: file.hpp:455
BlockReader< DynBlockSource > DynBlockReader
Instantiation of BlockReader for reading from the polymorphic source.
A BlockSource to read Blocks from a File.
Definition: file.hpp:349
KeepReader GetKeepReader(size_t prefetch_size=File::default_prefetch_size_) const
Get BlockReader for beginning of File.
Definition: file.cpp:68
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:24
static constexpr bool debug
Definition: file.hpp:292
size_t size_bytes_
Total size of this file in bytes. Sum of all block sizes.
Definition: file.hpp:270
Writer GetWriter(size_t block_size=default_block_size)
Get BlockWriter.
Definition: file.cpp:63
Provides reference counting abilities for use with CountingPtr.
size_t typecode_verify() const
Returns typecode_verify_.
File & operator=(const File &)=delete
non-copyable: delete assignment operator