24 id_(block_pool.next_file_id()), dia_id_(dia_id) { }
29 die(
"File[" <<
this <<
"]::~File() called " 31 "handles are still open.");
80 return ConstructDynBlockReader<ConsumeFileBlockSource>(
83 return ConstructDynBlockReader<KeepFileBlockSource>(
90 output += b.PinWait(0).ToString();
95 os <<
"[File " << std::hex << &f << std::dec
100 os <<
"\n " << i++ <<
" " << b;
109 size_t prefetch_size,
110 size_t first_block,
size_t first_item)
112 prefetch_size_(prefetch_size),
114 first_block_(first_block), current_block_(first_block),
115 first_item_(first_item)
183 return std::move(b).MoveToBlock();
200 File* file,
size_t local_worker_id,
size_t prefetch_size)
219 while (fetching_bytes_ < prefetch_size_ && !file_->blocks_.empty()) {
245 while (fetching_bytes_ < prefetch_size_ && !file_->blocks_.empty()) {
267 return std::move(b).MoveToBlock();
283 if (
file_ !=
nullptr)
PinnedBlock PinWait(size_t local_worker_id) const
Convenience function to call Pin() and wait for the future.
BlockReader< KeepFileBlockSource > KeepReader
size_t local_worker_id_
local worker id reading the File
Block combines a reference to a read-only ByteBlock and book-keeping information. ...
void Prefetch(size_t prefetch_size)
Perform prefetch.
KeepReader GetKeepReader(size_t prefetch_size=File::default_prefetch_size_) const
Get BlockReader for beginning of File.
std::deque< Block > blocks_
container holding Blocks and thus shared pointers to all byte blocks.
static constexpr size_t keep_first_item
sentinel value for not changing the first_item item
PinnedBlock AcquirePin(const Block &block)
Acquire Pin for Block returned from NextBlockUnpinned.
A File is an ordered sequence of Block objects for storing items.
void Clear()
Free all Blocks in the File and deallocate vectors.
size_t fetching_bytes_
current number of bytes in prefetch
PinnedBlock AcquirePin(const Block &block)
Acquire Pin for Block returned from NextBlockUnpinned.
size_t current_block_
index of current block.
PinnedBlock NextBlock()
Get the next block of file.
std::deque< PinRequestPtr > fetching_blocks_
current prefetch operations
size_t dia_id_
optionally associated DIANode id
ConsumeFileBlockSource(File *file, size_t local_worker_id, size_t prefetch_size=File::default_prefetch_size_)
BlockWriter contains a temporary Block object into which a) any serializable item can be stored or b)...
#define die(msg)
Instead of std::terminate(), throw the output the message via an exception.
size_t size() const
return length of valid data in bytes.
BlockWriter< FileBlockSink > Writer
void set_begin(size_t i)
accessor to begin_
~ConsumeFileBlockSource()
Consume unread blocks and reset File to zero items.
size_t first_item_
offset of first item in first block read
size_t size() const
return length of valid data in bytes.
BlockSink which interfaces to a File.
size_t local_worker_id_
local worker id to associate pinned block with
friend class ConsumeFileBlockSource
size_t first_block_
number of the first block
std::deque< PinRequestPtr > fetching_blocks_
current prefetch operations
ConsumeReader GetConsumeReader(size_t prefetch_size=File::default_prefetch_size_)
Get consuming BlockReader for beginning of File.
BlockReader< ConsumeFileBlockSource > ConsumeReader
void Close() final
Closes the sink. Must not be called multiple times.
Pool to allocate, keep, swap out/in, and free all ByteBlocks on the host.
void swap(CountingPtr< A, D > &a1, CountingPtr< A, D > &a2) noexcept
size_t local_worker_id() const
local worker id to associate pinned block with
BlockReader takes Block objects from BlockSource and allows reading of a) serializable Items or b) ar...
size_t reference_count() const noexcept
Return the number of references to this object (for debugging)
BlockPool * block_pool() const
Returns block_pool_.
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
friend std::ostream & operator<<(std::ostream &os, const File &f)
Output the Block objects contained in this File.
size_t prefetch_size_
number of bytes of prefetch for reader
PinRequestPtr Pin(size_t local_worker_id) const
const File & file_
file to read blocks from
File(BlockPool &block_pool, size_t local_worker_id, size_t dia_id)
Constructor from BlockPool.
High-performance smart pointer used as a wrapping reference counting pointer.
void Prefetch(size_t prefetch_size)
Perform prefetch.
Block NextBlockUnpinned()
Reader GetReader(bool consume, size_t prefetch_size=File::default_prefetch_size_)
Get BlockReader or a consuming BlockReader for beginning of File.
size_t local_worker_id_
local worker id reading the File
Pure virtual base class for all things that can receive Blocks from a BlockWriter.
friend class KeepFileBlockSource
for access to blocks_ and num_items_sum_
File Copy() const
Return a copy of the File (explicit copy-constructor)
A pinned / pin-counted derivative of a Block.
File * file_
file to consume blocks from (ptr to make moving easier)
KeepFileBlockSource(const File &file, size_t local_worker_id, size_t prefetch_size=File::default_prefetch_size_, size_t first_block=0, size_t first_item=keep_first_item)
Start reading a File.
std::string ReadComplete() const
std::deque< size_t > num_items_sum_
A BlockSource to read and simultaneously consume Blocks from a File.
size_t fetching_bytes_
current number of bytes in prefetch
size_t num_blocks() const
Return the number of blocks.
size_t prefetch_size_
number of bytes of prefetch for reader
Block NextBlockUnpinned()
size_t size_bytes_
Total size of this file in bytes. Sum of all block sizes.
Writer GetWriter(size_t block_size=default_block_size)
Get BlockWriter.
const Block & block(size_t i) const
Return reference to a block.
common::JsonLogger & logger()
Returns BlockPool.logger_.