12 #ifndef THRILL_DATA_FILE_HEADER 13 #define THRILL_DATA_FILE_HEADER 39 class KeepFileBlockSource;
40 class ConsumeFileBlockSource;
89 if (b.
size() == 0)
return;
100 if (b.size() == 0)
return;
105 blocks_.emplace_back(std::move(b));
166 template <
typename ItemType>
168 size_t index,
size_t prefetch = default_prefetch_size_)
const;
207 template <
typename ItemType>
// Get the index of the given item, or of the next greater item, in this file.
//
// The out-of-line definition performs a binary search over the half-open
// index range [left, right) and asserts left <= right.
//
// item  - the item to look up.
// tie   - index used as a tie-breaker: when the probed element compares
//         equivalent to item, the search moves left only if tie <= probed
//         index.
// left  - inclusive lower bound of the index range to search.
// right - exclusive upper bound of the index range to search.
// func  - strict-weak-order comparator; the definition static_asserts that
//         its result type is bool. Defaults to std::less<ItemType>.
220 template <
typename ItemType,
typename CompareFunction = std::less<ItemType> >
221 size_t GetIndexOf(
const ItemType& item,
size_t tie,
222 size_t left,
size_t right,
223 const CompareFunction& func = CompareFunction())
const;
235 template <
typename ItemType,
typename CompareFunction = std::less<ItemType> >
237 const CompareFunction& less = CompareFunction())
const {
// Return the Blocks holding the items with indexes in [begin, end).
//
// The out-of-line definition asserts begin <= end, obtains a reader seeked
// to item `begin` via GetReaderAt<ItemType>(begin), and asks it for a batch
// of (end - begin) items with GetItemBatch<ItemType>().
244 template <
typename ItemType>
245 std::vector<Block>
GetItemRange(
size_t begin,
size_t end)
const;
292 static constexpr
bool debug =
false;
301 file_(
std::move(file)) {
302 LOG <<
"FileBlockSink() new for " << file_.get();
311 LOG <<
"~FileBlockSink() for " << file_.get();
321 return file_->AppendBlock(b, is_last_block);
328 return file_->AppendBlock(std::move(b), is_last_block);
356 size_t first_block = 0,
size_t first_item = keep_first_item);
359 void Prefetch(
size_t prefetch_size);
367 Block NextBlockUnpinned();
375 Block MakeNextBlock();
379 static constexpr
size_t keep_first_item = size_t(-1);
432 void Prefetch(
size_t prefetch_size);
439 Block NextBlockUnpinned();
465 template <
typename ItemType>
468 static constexpr
bool debug =
false;
476 die(
"Access beyond end of File?");
480 sLOG <<
"File::GetReaderAt()" 481 <<
"item" << index <<
"in block" << begin_block
483 <<
"first_item" <<
blocks_[begin_block].first_item_absolute();
489 blocks_[begin_block].first_item_absolute()));
492 size_t items_before = it ==
num_items_sum_.begin() ? 0 : *(it - 1);
494 sLOG <<
"File::GetReaderAt()" 495 <<
"items_before" << items_before <<
"index" << index
496 <<
"delta" << (index - items_before);
497 assert(items_before <= index);
505 const size_t skip_items = index - items_before;
506 const size_t bytes_per_item =
510 fr.
Skip(skip_items, skip_items * bytes_per_item);
514 for (
size_t i = items_before; i < index; ++i) {
516 die(
"Underflow in GetItemRange()");
517 fr.template Next<ItemType>();
521 sLOG <<
"File::GetReaderAt()" 527 template <
typename ItemType>
529 KeepReader reader = this->GetReaderAt<ItemType>(index, 0);
530 return reader.
Next<ItemType>();
533 template <
typename ItemType,
typename CompareFunction>
535 const ItemType& item,
size_t tie,
size_t left,
size_t right,
536 const CompareFunction& less)
const {
538 static constexpr
bool debug =
false;
542 bool,
typename common::FunctionTraits<CompareFunction>::result_type
544 "Comperator must return boolean.");
546 LOG <<
"File::GetIndexOf() looking for item " << item <<
" tie " << tie
547 <<
" in range [" << left <<
"," << right <<
") =" 548 <<
" size " << right - left;
550 assert(left <= right);
555 while (left < right) {
556 size_t mid = (right + left) >> 1;
557 LOG <<
"left: " << left <<
"right: " << right <<
"mid: " << mid;
558 ItemType cur = GetItemAt<ItemType>(mid);
559 LOG <<
"Item at mid: " << cur;
560 if (less(item, cur) ||
561 (!less(item, cur) && !less(cur, item) && tie <= mid)) {
569 LOG <<
"found insert position at: " << left;
575 template <
typename ItemType>
577 assert(begin <= end);
579 return GetReaderAt<ItemType>(begin)
580 .
template GetItemBatch<ItemType>(end - begin);
584 template <
typename Reader>
590 r.source().Prefetch(p);
593 r.source().Prefetch(prefetch_size);
601 #endif // !THRILL_DATA_FILE_HEADER void StartPrefetch(std::vector< Reader > &readers, size_t prefetch_size)
Take a vector of Readers and prefetch equally from them.
size_t local_worker_id_
local worker id reading the File
Block combines a reference to a read-only ByteBlock and book-keeping information. ...
#define sLOG
Default logging method: output if the local debug variable is true.
bool empty() const
Returns true if the File is empty.
std::vector< Block > GetItemRange(size_t begin, size_t end) const
KeepReader GetReaderAt(size_t index, size_t prefetch=default_prefetch_size_) const
Get a BlockReader positioned at the corresponding item index.
size_t size_bytes() const
Return the number of bytes of user data in this file.
size_t ItemsStartIn(size_t i) const
Return number of items starting in block i.
KeepReader GetKeepReader(size_t prefetch_size=File::default_prefetch_size_) const
Get BlockReader for beginning of File.
std::deque< Block > blocks_
container holding Blocks and thus shared pointers to all byte blocks.
FileBlockSink(tlx::CountingPtrNoDelete< File > file)
size_t default_block_size
default size of blocks in File, Channel, BlockQueue, etc.
A File is an ordered sequence of Block objects for storing items.
void Clear()
Free all Blocks in the File and deallocate vectors.
size_t fetching_bytes_
current number of bytes in prefetch
size_t current_block_
index of current block.
std::deque< PinRequestPtr > fetching_blocks_
current prefetch operations
size_t dia_id_
optionally associated DIANode id
PinnedBlock CopyBlock() const
return current block for debugging
TLX_ATTRIBUTE_ALWAYS_INLINE T Next()
Next() reads a complete item T.
size_t GetIndexOf(const ItemType &item, size_t tie, size_t left, size_t right, const CompareFunction &func=CompareFunction()) const
Get index of the given item, or the next greater item, in this file.
BlockWriter contains a temporary Block object into which a) any serializable item can be stored or b)...
#define die(msg)
Instead of calling std::terminate(), throw the message as an exception.
size_t size() const
return length of valid data in bytes.
tlx::CountingPtrNoDelete< File > file_
void AppendBlock(Block &&b)
void AppendBlock(Block &&b, bool) final
void set_dia_id(size_t dia_id)
const std::deque< Block > & blocks() const
Returns constant reference to all Blocks in the File.
size_t first_item_
offset of first item in first block read
BlockSink which interfaces to a File.
size_t local_worker_id_
local worker id to associate pinned block with
friend class ConsumeFileBlockSource
size_t first_block_
number of the first block
void AppendBlock(Block &&b, bool is_last_block) final
std::deque< PinRequestPtr > fetching_blocks_
current prefetch operations
ConsumeReader GetConsumeReader(size_t prefetch_size=File::default_prefetch_size_)
Get consuming BlockReader for beginning of File.
void AppendBlock(const Block &b)
void Close() final
Closes the sink. Must not be called multiple times.
void AppendBlock(const Block &b, bool is_last_block) final
Pool to allocate, keep, swap out/in, and free all ByteBlocks on the host.
void AppendBlock(const Block &b, bool) final
size_t local_worker_id() const
local worker id to associate pinned block with
void Close() final
Closes the sink. Must not be called multiple times.
BlockReader takes Block objects from BlockSource and allows reading of a) serializable Items or b) ar...
BlockPool * block_pool() const
Returns block_pool_.
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
friend std::ostream & operator<<(std::ostream &os, const File &f)
Output the Block objects contained in this File.
size_t prefetch_size_
number of bytes of prefetch for reader
static constexpr bool debug
const File & file_
file to read blocks from
File(BlockPool &block_pool, size_t local_worker_id, size_t dia_id)
Constructor from BlockPool.
size_t num_items() const
return number of items beginning in this block
static constexpr bool allocate_can_fail_
Reader GetReader(bool consume, size_t prefetch_size=File::default_prefetch_size_)
Get BlockReader or a consuming BlockReader for beginning of File.
static size_t default_prefetch_size_
size_t local_worker_id_
local worker id reading the File
Pure virtual base class for all things that can receive Blocks from a BlockWriter.
friend class KeepFileBlockSource
for access to blocks_ and num_items_sum_
size_t GetIndexOf(const ItemType &item, size_t tie, const CompareFunction &less=CompareFunction()) const
Get index of the given item, or the next greater item, in this file.
File Copy() const
Return a copy of the File (explicit copy-constructor)
A pinned / pin-counted derivative of a Block.
size_t num_items() const
Return the number of items in the file.
File * file_
file to consume blocks from (ptr to make moving easier)
std::string ReadComplete() const
TLX_ATTRIBUTE_ALWAYS_INLINE bool HasNext()
HasNext() returns true if at least one more item is available.
std::deque< size_t > num_items_sum_
A BlockSource to read and simultaneously consume Blocks from a File.
BlockReader & Skip(size_t items, size_t bytes)
Advance the cursor given number of bytes without reading them.
size_t fetching_bytes_
current number of bytes in prefetch
size_t num_blocks() const
Return the number of blocks.
size_t prefetch_size_
number of bytes of prefetch for reader
BlockReader< DynBlockSource > DynBlockReader
Instantiation of BlockReader for reading from the polymorphic source.
A BlockSource to read Blocks from a File.
#define LOG
Default logging method: output if the local debug variable is true.
ItemType GetItemAt(size_t index) const
size_t size_bytes_
Total size of this file in bytes. Sum of all block sizes.
Writer GetWriter(size_t block_size=default_block_size)
Get BlockWriter.
const Block & block(size_t i) const
Return reference to a block.
size_t typecode_verify() const
Returns typecode_verify_.
Provides reference counting abilities for use with CountingPtr.
File & operator=(const File &)=delete
non-copyable: delete assignment operator