Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
file.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/data/file.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015-2016 Timo Bingmann <[email protected]>
7  *
8  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
9  ******************************************************************************/
10 
11 #pragma once
12 #ifndef THRILL_DATA_FILE_HEADER
13 #define THRILL_DATA_FILE_HEADER
14 
16 #include <thrill/common/logger.hpp>
17 #include <thrill/data/block.hpp>
22 
23 #include <tlx/die.hpp>
24 
25 #include <cassert>
26 #include <deque>
27 #include <functional>
28 #include <limits>
29 #include <string>
30 #include <vector>
31 
32 namespace thrill {
33 namespace data {
34 
35 //! \addtogroup data_layer
36 //! \{
37 
38 class FileBlockSink;
39 class KeepFileBlockSource;
40 class ConsumeFileBlockSource;
41 
42 /*!
43  * A File is an ordered sequence of Block objects for storing items. By using
44  * the Block indirection, the File can be composed using existing Block objects
45  * (via reference counting), but only contain a subset of the items in those
46  * Blocks. This may be used for Zip() and Repartition().
47  *
48  * A File can be written using a BlockWriter instance, which is delivered by
49  * GetWriter(). Thereafter it can be read (multiple times) using a BlockReader,
50  * delivered by GetReader().
51  *
52  * Using a prefixsum over the number of items in a Block, one can seek to the
53  * block containing any item offset in log_2(Blocks) time, though seeking within
54  * the Block goes sequentially.
55  */
56 class File : public BlockSink, public tlx::ReferenceCounter
57 {
58 public:
63 
64  //! external static variable containing the default number of bytes to
65  //! prefetch in File readers
66  static size_t default_prefetch_size_;
67 
68  //! Constructor from BlockPool
69  File(BlockPool& block_pool, size_t local_worker_id, size_t dia_id);
70 
71  //! non-copyable: delete copy-constructor
72  File(const File&) = delete;
73  //! non-copyable: delete assignment operator
74  File& operator = (const File&) = delete;
75  //! move-constructor: default
76  File(File&&) = default;
77  //! move-assignment operator: default
78  File& operator = (File&&) = default;
79 
80  //! Return a copy of the File (explicit copy-constructor)
81  File Copy() const;
82 
83  //! \name Methods of a BlockSink
84  //! \{
85 
86  //! Append a copy of block b to this file. Blocks with size() == 0 are
87  //! ignored; otherwise the item prefixsum, size_bytes_ and stats are updated.
88  void AppendBlock(const Block& b) {
89  if (b.size() == 0) return;
90  num_items_sum_.push_back(num_items() + b.num_items());
91  size_bytes_ += b.size();
92  stats_bytes_ += b.size();
93  stats_items_ += b.num_items();
94  blocks_.push_back(b);
95  }
96 
97  //! Move block b into this file. Blocks with size() == 0 are ignored;
98  //! otherwise the item prefixsum, size_bytes_ and stats are updated.
99  void AppendBlock(Block&& b) {
100  if (b.size() == 0) return;
101  num_items_sum_.push_back(num_items() + b.num_items());
102  size_bytes_ += b.size();
103  stats_bytes_ += b.size();
104  stats_items_ += b.num_items();
105  blocks_.emplace_back(std::move(b));
106  }
107 
108  //! BlockSink interface: append a block to this file. The is_last_block
109  //! flag is ignored; the call is forwarded to AppendBlock(b).
110  void AppendBlock(const Block& b, bool /* is_last_block */) final {
111  return AppendBlock(b);
112  }
113 
114  //! BlockSink interface: append a block to this file. The is_last_block
115  //! flag is ignored; the call is forwarded to AppendBlock(std::move(b)).
116  void AppendBlock(Block&& b, bool /* is_last_block */) final {
117  return AppendBlock(std::move(b));
118  }
119 
120  void Close() final;
121 
122  //! write out stats
123  ~File();
124 
125  //! Free all Blocks in the File and deallocate vectors
126  void Clear();
127 
128  //! boolean flag whether AllocateByteBlock can fail in any subclass (if
129  //! false, BlockWriter is accelerated by not having to cope with a
130  //! nullptr).
131  static constexpr bool allocate_can_fail_ = false;
132 
133  //! \}
134 
135  //! \name Writers and Readers
136  //! \{
137 
138  //! Get BlockWriter.
139  Writer GetWriter(size_t block_size = default_block_size);
140 
141  /*!
142  * Get BlockReader or a consuming BlockReader for beginning of File
143  *
144  * \attention If consume is true, the reader consumes the File's contents
145  * UNCONDITIONALLY, the File will always be emptied whether all items were
146  * read via the Reader or not.
147  */
149  bool consume, size_t prefetch_size = File::default_prefetch_size_);
150 
151  //! Get BlockReader for beginning of File
153  size_t prefetch_size = File::default_prefetch_size_) const;
154 
155  /*!
156  * Get consuming BlockReader for beginning of File
157  *
158  * \attention The reader consumes the File's contents UNCONDITIONALLY, the
159  * File will always be emptied whether all items were read via the Reader or
160  * not.
161  */
163  size_t prefetch_size = File::default_prefetch_size_);
164 
165  //! Get BlockReader seeked to the corresponding item index
166  template <typename ItemType>
168  size_t index, size_t prefetch = default_prefetch_size_) const;
169 
170  //! Read the complete File into a std::string; this should only be used
171  //! for debugging!
172  std::string ReadComplete() const;
173 
174  //! \}
175 
176  //! Return the number of blocks
177  size_t num_blocks() const { return blocks_.size(); }
178 
179  //! Return the number of items in the file
180  size_t num_items() const {
181  return num_items_sum_.size() ? num_items_sum_.back() : 0;
182  }
183 
184  //! Returns true if the File is empty.
185  bool empty() const { return blocks_.empty(); }
186 
187  //! Return the number of bytes of user data in this file.
188  size_t size_bytes() const { return size_bytes_; }
189 
190  //! Return reference to a block
191  const Block& block(size_t i) const {
192  assert(i < blocks_.size());
193  return blocks_[i];
194  }
195 
196  //! Returns constant reference to all Blocks in the File.
197  const std::deque<Block>& blocks() const { return blocks_; }
198 
199  //! Return number of items starting in block i
200  size_t ItemsStartIn(size_t i) const {
201  assert(i < blocks_.size());
202  return num_items_sum_[i] - (i == 0 ? 0 : num_items_sum_[i - 1]);
203  }
204 
205  //! Get item at the corresponding position. Do not use this
206  //! method for reading multiple successive items.
207  template <typename ItemType>
208  ItemType GetItemAt(size_t index) const;
209 
210  /*!
211  * Get index of the given item, or the next greater item, in the range
212  * [left, right) of this file. The file has to be ordered according to the
213  * given compare function. The tie value can be used to make a decision in
214  * case of many successive equal elements; it is compared with the index.
215  *
216  * WARNING: This method uses GetItemAt combined with a binary search and is
217  * therefore not efficient. The method should be reimplemented in near
218  * future.
219  */
220  template <typename ItemType, typename CompareFunction = std::less<ItemType> >
221  size_t GetIndexOf(const ItemType& item, size_t tie,
222  size_t left, size_t right,
223  const CompareFunction& func = CompareFunction()) const;
224 
225  /*!
226  * Get index of the given item, or the next greater item, in this file. The
227  * file has to be ordered according to the given compare function. The tie
228  * value can be used to make a decision in case of many successive equal
229  * elements; it is compared with the index. Searches all of [0, num_items()).
230  *
231  * WARNING: This method uses GetItemAt combined with a binary search and is
232  * therefore not efficient. The method should be reimplemented in near
233  * future.
234  */
235  template <typename ItemType, typename CompareFunction = std::less<ItemType> >
236  size_t GetIndexOf(const ItemType& item, size_t tie,
237  const CompareFunction& less = CompareFunction()) const {
238  // start binary search with range [0,num_items)
239  return GetIndexOf(item, tie, 0, num_items(), less);
240  }
241 
242  //! Seek in File: return a Block range containing the items [begin, end)
243  //! of the given ItemType.
244  template <typename ItemType>
245  std::vector<Block> GetItemRange(size_t begin, size_t end) const;
246 
247  //! Output the Block objects contained in this File.
248  friend std ::ostream& operator << (std::ostream& os, const File& f);
249 
250  //! change dia_id after construction (needed because it may be unknown at
251  //! construction)
252  void set_dia_id(size_t dia_id) { dia_id_ = dia_id; }
253 
254 private:
255  //! unique file id
256  size_t id_;
257 
258  //! optionally associated DIANode id
259  size_t dia_id_;
260 
261  //! container holding Blocks and thus shared pointers to all byte blocks.
262  std::deque<Block> blocks_;
263 
264  //! inclusive prefixsum of number of items starting in each block, hence
265  //! num_items_sum_[i] is the number of items starting in all blocks
266  //! preceding and including the i-th block.
267  std::deque<size_t> num_items_sum_;
268 
269  //! Total size of this file in bytes. Sum of all block sizes.
270  size_t size_bytes_ = 0;
271 
272  //! Total number of bytes ever appended to the File via AppendBlock: for
273  //! stats, never decreases.
274  size_t stats_bytes_ = 0;
275 
276  //! Total number of items ever appended to the File via AppendBlock: for
277  //! stats, never decreases.
278  size_t stats_items_ = 0;
279 
280  //! for access to blocks_ and num_items_sum_
281  friend class KeepFileBlockSource;
283 };
284 
286 
287 /*!
288  * BlockSink which forwards all appended Blocks and Close() to a File.
289  */
290 class FileBlockSink final : public BlockSink
291 {
292  static constexpr bool debug = false;
293 
294 public:
296  : BlockSink(nullptr, -1), file_(nullptr)
297  { }
298 
300  : BlockSink(file->block_pool(), file->local_worker_id()),
301  file_(std::move(file)) {
302  LOG << "FileBlockSink() new for " << file_.get();
303  }
304 
305  //! default copy-constructor
306  FileBlockSink(const FileBlockSink&) = default;
307  //! default assignment operator
308  FileBlockSink& operator = (const FileBlockSink&) = default;
309 
311  LOG << "~FileBlockSink() for " << file_.get();
312  }
313 
314  //! \name Methods of a BlockSink
315  //! \{
316 
317  //! Forward a copy of the block to the attached File (asserted to be set);
318  //! is_last_block is passed through unchanged.
319  void AppendBlock(const Block& b, bool is_last_block) final {
320  assert(file_);
321  return file_->AppendBlock(b, is_last_block);
322  }
323 
324  //! Move the block into the attached File (asserted to be set);
325  //! is_last_block is passed through unchanged.
326  void AppendBlock(Block&& b, bool is_last_block) final {
327  assert(file_);
328  return file_->AppendBlock(std::move(b), is_last_block);
329  }
330 
331  void Close() final {
332  if (file_) {
333  file_->Close();
334  file_.reset();
335  }
336  }
337 
338  //! \}
339 
340 private:
342 };
343 
344 /*!
345  * A BlockSource to read Blocks from a File. The KeepFileBlockSource mainly
346  * contains an index to the current block, which is advanced whenever
347  * NextBlock() has to deliver the next Block.
348  */
350 {
351 public:
352  //! Start reading a File
354  const File& file, size_t local_worker_id,
355  size_t prefetch_size = File::default_prefetch_size_,
356  size_t first_block = 0, size_t first_item = keep_first_item);
357 
358  //! Advance to next block of file, delivers current_ and end_ for
359  //! BlockReader
361 
362  //! Perform prefetch
363  void Prefetch(size_t prefetch_size);
364 
365 protected:
366  //! Determine current unpinned Block to deliver via NextBlock()
368 
369 private:
370  //! sentinel value for first_item: leave the first item unchanged
371  static constexpr size_t keep_first_item = size_t(-1);
372 
373  //! file to read blocks from
374  const File& file_;
375 
376  //! local worker id reading the File
378 
379  //! number of bytes of prefetch for reader
381 
382  //! current prefetch operations
383  std::deque<PinRequestPtr> fetching_blocks_;
384 
385  //! current number of bytes in prefetch
387 
388  //! index of the first block to read
389  size_t first_block_;
390 
391  //! index of current block.
393 
394  //! offset of first item in the first block read (or keep_first_item)
395  size_t first_item_;
396 };
397 
398 /*!
399  * A BlockSource to read and simultaneously consume Blocks from a File. The
400  * ConsumeFileBlockSource always returns the first block of the File and removes
401  * it, thereby consuming Blocks from the File.
402  *
403  * \attention The reader consumes the File's contents UNCONDITIONALLY, the File
404  * will always be emptied whether all items were read via the Reader or not.
405  */
407 {
408 public:
409  //! Start reading a File. Creates a source for the given file and sets the
410  //! number of bytes that should be prefetched. 0 means that no blocks are
411  //! prefetched.
413  File* file, size_t local_worker_id,
414  size_t prefetch_size = File::default_prefetch_size_);
415 
416  //! non-copyable: delete copy-constructor
418  //! non-copyable: delete assignment operator
420  //! move-constructor: default
422 
423  //! Perform prefetch
424  void Prefetch(size_t prefetch_size);
425 
426  //! Get the next block of file.
428 
429  //! Consume unread blocks and reset File to zero items.
431 
432 private:
433  //! file to consume blocks from (ptr to make moving easier)
435 
436  //! local worker id reading the File
438 
439  //! number of bytes of prefetch for reader
441 
442  //! current prefetch operations
443  std::deque<PinRequestPtr> fetching_blocks_;
444 
445  //! current number of bytes in prefetch
447 };
448 
449 //! Get a KeepReader positioned at item number index of this File.
450 template <typename ItemType>
451 typename File::KeepReader
452 File::GetReaderAt(size_t index, size_t prefetch_size) const {
453  static constexpr bool debug = false;
454 
455  // binary search: first block whose inclusive item prefixsum is >= index;
456  // the skip below then reads forward from that block's first item.
457  auto it =
458  std::lower_bound(num_items_sum_.begin(), num_items_sum_.end(), index);
459 
460  if (it == num_items_sum_.end())
461  die("Access beyond end of File?");
462 
463  size_t begin_block = it - num_items_sum_.begin();
464 
465  sLOG << "File::GetReaderAt()"
466  << "item" << index << "in block" << begin_block
467  << "psum" << num_items_sum_[begin_block]
468  << "first_item" << blocks_[begin_block].first_item_absolute();
469 
470  // start Reader at given first valid item in located block
471  KeepReader fr(
472  KeepFileBlockSource(*this, local_worker_id_, prefetch_size,
473  begin_block,
474  blocks_[begin_block].first_item_absolute()));
475 
476  // skip over the items that start before index, counted from this block
477  size_t items_before = it == num_items_sum_.begin() ? 0 : *(it - 1);
478 
479  sLOG << "File::GetReaderAt()"
480  << "items_before" << items_before << "index" << index
481  << "delta" << (index - items_before);
482  assert(items_before <= index);
483 
484  // use fixed_size information to accelerate jump.
486  {
487  // fetch a Block to get typecode_verify flag
488  fr.HasNext();
489 
490  const size_t skip_items = index - items_before;
491  const size_t bytes_per_item =
492  (fr.typecode_verify() ? sizeof(size_t) : 0)
494 
495  fr.Skip(skip_items, skip_items * bytes_per_item);
496  }
497  else
498  {
499  for (size_t i = items_before; i < index; ++i) {
500  if (!fr.HasNext())
501  die("Underflow in GetItemRange()"); // NOTE(review): message names GetItemRange(), but this is GetReaderAt()
502  fr.template Next<ItemType>();
503  }
504  }
505 
506  sLOG << "File::GetReaderAt()"
507  << "after seek at" << fr.CopyBlock();
508 
509  return fr;
510 }
511 
512 template <typename ItemType>
513 ItemType File::GetItemAt(size_t index) const {
514  KeepReader reader = this->GetReaderAt<ItemType>(index, /* prefetch */ 0);
515  return reader.Next<ItemType>();
516 }
517 
518 template <typename ItemType, typename CompareFunction>
520  const ItemType& item, size_t tie, size_t left, size_t right,
521  const CompareFunction& less) const {
522 
523  static constexpr bool debug = false;
524 
525  static_assert(
526  std::is_convertible<
527  bool, typename common::FunctionTraits<CompareFunction>::result_type
528  >::value,
529  "Comperator must return boolean.");
530 
531  LOG << "File::GetIndexOf() looking for item " << item << " tie " << tie
532  << " in range [" << left << "," << right << ") ="
533  << " size " << right - left;
534 
535  assert(left <= right);
536  assert(left <= num_items());
537  assert(right <= num_items());
538 
539  // Use a binary search to find the item.
540  while (left < right) {
541  size_t mid = (right + left) >> 1;
542  LOG << "left: " << left << "right: " << right << "mid: " << mid;
543  ItemType cur = GetItemAt<ItemType>(mid);
544  LOG << "Item at mid: " << cur;
545  if (less(item, cur) ||
546  (!less(item, cur) && !less(cur, item) && tie <= mid)) {
547  right = mid;
548  }
549  else {
550  left = mid + 1;
551  }
552  }
553 
554  LOG << "found insert position at: " << left;
555  return left;
556 }
557 
558 //! Seek in File: return a Block range containing items begin, end of
559 //! given type.
560 template <typename ItemType>
561 std::vector<Block> File::GetItemRange(size_t begin, size_t end) const {
562  assert(begin <= end);
563  // deliver array of remaining blocks
564  return GetReaderAt<ItemType>(begin)
565  .template GetItemBatch<ItemType>(end - begin);
566 }
567 
568 //! Take a vector of Readers and prefetch equally from them
569 template <typename Reader>
570 void StartPrefetch(std::vector<Reader>& readers, size_t prefetch_size) {
571  for (size_t p = default_block_size; p < prefetch_size;
572  p += default_block_size)
573  {
574  for (Reader& r : readers)
575  r.source().Prefetch(p);
576  }
577  for (Reader& r : readers)
578  r.source().Prefetch(prefetch_size);
579 }
580 
581 //! \}
582 
583 } // namespace data
584 } // namespace thrill
585 
586 #endif // !THRILL_DATA_FILE_HEADER
587 
588 /******************************************************************************/
void StartPrefetch(std::vector< Reader > &readers, size_t prefetch_size)
Take a vector of Readers and prefetch equally from them.
Definition: file.hpp:570
size_t local_worker_id_
local worker id reading the File
Definition: file.hpp:377
Block combines a reference to a read-only ByteBlock and book-keeping information. ...
Definition: block.hpp:52
void Prefetch(size_t prefetch_size)
Perform prefetch.
Definition: file.cpp:197
size_t ItemsStartIn(size_t i) const
Return number of items starting in block i.
Definition: file.hpp:200
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:34
size_t num_items() const
Return the number of items in the file.
Definition: file.hpp:180
FileBlockSink & operator=(const FileBlockSink &)=default
default assignment operator
BlockPool * block_pool() const
Returns block_pool_.
Definition: block_sink.hpp:69
std::deque< Block > blocks_
container holding Blocks and thus shared pointers to all byte blocks.
Definition: file.hpp:262
static constexpr size_t keep_first_item
sentinel value for not changing the first_item item
Definition: file.hpp:371
size_t id_
unique file id
Definition: file.hpp:256
FileBlockSink(tlx::CountingPtrNoDelete< File > file)
Definition: file.hpp:299
size_t default_block_size
default size of blocks in File, Channel, BlockQueue, etc.
Definition: byte_block.cpp:25
const std::deque< Block > & blocks() const
Returns constant reference to all Blocks in the File.
Definition: file.hpp:197
A File is an ordered sequence of Block objects for storing items.
Definition: file.hpp:56
void Clear()
Free all Blocks in the File and deallocate vectors.
Definition: file.cpp:57
size_t fetching_bytes_
current number of bytes in prefetch
Definition: file.hpp:446
size_t current_block_
index of current block.
Definition: file.hpp:392
PinnedBlock NextBlock()
Get the next block of file.
Definition: file.cpp:214
std::deque< PinRequestPtr > fetching_blocks_
current prefetch operations
Definition: file.hpp:443
size_t dia_id_
optionally associated DIANode id
Definition: file.hpp:259
ConsumeFileBlockSource(File *file, size_t local_worker_id, size_t prefetch_size=File::default_prefetch_size_)
Definition: file.cpp:181
TLX_ATTRIBUTE_ALWAYS_INLINE T Next()
Next() reads a complete item T.
const Block & block(size_t i) const
Return reference to a block.
Definition: file.hpp:191
size_t num_items() const
return number of items beginning in this block
Definition: block.hpp:85
BlockWriter contains a temporary Block object into which a) any serializable item can be stored or b)...
#define die(msg)
Instead of calling std::terminate(), throw the message via an exception.
Definition: die.hpp:22
tlx::CountingPtrNoDelete< File > file_
Definition: file.hpp:341
size_t GetIndexOf(const ItemType &item, size_t tie, size_t left, size_t right, const CompareFunction &func=CompareFunction()) const
Get index of the given item, or the next greater item, in this file.
Definition: file.hpp:519
void AppendBlock(Block &&b)
Definition: file.hpp:99
void AppendBlock(Block &&b, bool) final
Definition: file.hpp:116
std::vector< Block > GetItemRange(size_t begin, size_t end) const
Definition: file.hpp:561
ConsumeFileBlockSource & operator=(const ConsumeFileBlockSource &)=delete
non-copyable: delete assignment operator
void set_dia_id(size_t dia_id)
Definition: file.hpp:252
std::string ReadComplete() const
Definition: file.cpp:87
~ConsumeFileBlockSource()
Consume unread blocks and reset File to zero items.
Definition: file.cpp:241
size_t first_item_
offset of first item in first block read
Definition: file.hpp:395
BlockSink which interfaces to a File.
Definition: file.hpp:290
size_t local_worker_id_
local worker id to associate pinned block with
Definition: block_sink.hpp:103
size_t size_bytes() const
Return the number of bytes of user data in this file.
Definition: file.hpp:188
size_t first_block_
number of the first block
Definition: file.hpp:389
void AppendBlock(Block &&b, bool is_last_block) final
Definition: file.hpp:326
std::deque< PinRequestPtr > fetching_blocks_
current prefetch operations
Definition: file.hpp:383
size_t stats_bytes_
Definition: file.hpp:274
ConsumeReader GetConsumeReader(size_t prefetch_size=File::default_prefetch_size_)
Get consuming BlockReader for beginning of File.
Definition: file.cpp:73
void AppendBlock(const Block &b)
Definition: file.hpp:88
File Copy() const
Return a copy of the File (explicit copy-constructor)
Definition: file.cpp:42
void Close() final
Closes the sink. Must not be called multiple times.
Definition: file.cpp:52
void AppendBlock(const Block &b, bool is_last_block) final
Definition: file.hpp:319
ItemType GetItemAt(size_t index) const
Definition: file.hpp:513
Pool to allocate, keep, swap out/in, and free all ByteBlocks on the host.
Definition: block_pool.hpp:42
void AppendBlock(const Block &b, bool) final
Definition: file.hpp:110
int value
Definition: gen_data.py:41
void Close() final
Closes the sink. Must not be called multiple times.
Definition: file.hpp:331
BlockReader takes Block objects from BlockSource and allows reading of a) serializable Items or b) ar...
size_t stats_items_
Definition: file.hpp:278
size_t num_blocks() const
Return the number of blocks.
Definition: file.hpp:177
size_t GetIndexOf(const ItemType &item, size_t tie, const CompareFunction &less=CompareFunction()) const
Get index of the given item, or the next greater item, in this file.
Definition: file.hpp:236
size_t prefetch_size_
number of bytes of prefetch for reader
Definition: file.hpp:380
size_t size() const
return length of valid data in bytes.
Definition: block.hpp:100
static constexpr bool debug
const File & file_
file to read blocks from
Definition: file.hpp:374
File(BlockPool &block_pool, size_t local_worker_id, size_t dia_id)
Constructor from BlockPool.
Definition: file.cpp:22
friend std::ostream & operator<<(std::ostream &os, const File &f)
Output the Block objects contained in this File.
Definition: file.cpp:94
void Prefetch(size_t prefetch_size)
Perform prefetch.
Definition: file.cpp:117
PinnedBlock CopyBlock() const
return current block for debugging
Block NextUnpinnedBlock()
Determine current unpinned Block to deliver via NextBlock()
Definition: file.cpp:165
bool empty() const
Returns true if the File is empty.
Definition: file.hpp:185
static constexpr bool allocate_can_fail_
Definition: file.hpp:131
Reader GetReader(bool consume, size_t prefetch_size=File::default_prefetch_size_)
Get BlockReader or a consuming BlockReader for beginning of File.
Definition: file.cpp:78
KeepReader GetReaderAt(size_t index, size_t prefetch=default_prefetch_size_) const
Get BlockReader seeked to the corresponding item index.
Definition: file.hpp:452
static size_t default_prefetch_size_
Definition: file.hpp:66
size_t local_worker_id_
local worker id reading the File
Definition: file.hpp:437
Pure virtual base class for all things that can receive Blocks from a BlockWriter.
Definition: block_sink.hpp:28
friend class KeepFileBlockSource
for access to blocks_ and num_items_sum_
Definition: file.hpp:281
A pinned / pin-counted derivative of a Block.
Definition: block.hpp:157
size_t local_worker_id() const
local worker id to associate pinned block with
Definition: block_sink.hpp:94
File * file_
file to consume blocks from (ptr to make moving easier)
Definition: file.hpp:434
KeepFileBlockSource(const File &file, size_t local_worker_id, size_t prefetch_size=File::default_prefetch_size_, size_t first_block=0, size_t first_item=keep_first_item)
Start reading a File.
Definition: file.cpp:107
TLX_ATTRIBUTE_ALWAYS_INLINE bool HasNext()
HasNext() returns true if at least one more item is available.
std::deque< size_t > num_items_sum_
Definition: file.hpp:267
A BlockSource to read and simultaneously consume Blocks from a File.
Definition: file.hpp:406
BlockReader & Skip(size_t items, size_t bytes)
Advance the cursor given number of bytes without reading them.
size_t fetching_bytes_
current number of bytes in prefetch
Definition: file.hpp:386
size_t prefetch_size_
number of bytes of prefetch for reader
Definition: file.hpp:440
BlockReader< DynBlockSource > DynBlockReader
Instantiation of BlockReader for reading from the polymorphic source.
A BlockSource to read Blocks from a File.
Definition: file.hpp:349
KeepReader GetKeepReader(size_t prefetch_size=File::default_prefetch_size_) const
Get BlockReader for beginning of File.
Definition: file.cpp:68
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:24
static constexpr bool debug
Definition: file.hpp:292
size_t size_bytes_
Total size of this file in bytes. Sum of all block sizes.
Definition: file.hpp:270
Writer GetWriter(size_t block_size=default_block_size)
Get BlockWriter.
Definition: file.cpp:63
Provides reference counting abilities for use with CountingPtr.
size_t typecode_verify() const
Returns typecode_verify_.
File & operator=(const File &)=delete
non-copyable: delete assignment operator