13 #ifndef THRILL_API_READ_BINARY_HEADER 14 #define THRILL_API_READ_BINARY_HEADER 44 template <
typename ValueType>
47 static constexpr
bool debug =
false;
83 uint64_t size_limit,
bool local_storage)
84 :
Super(ctx,
"ReadBinary") {
88 if (files.size() == 0)
89 die(
"ReadBinary: no files found in globs: " +
tlx::join(
' ', globlist));
91 if (size_limit != no_size_limit_)
99 for (
size_t i = 0; i < files.size(); ++i) {
100 if (files[i].size % fixed_size_ == 0)
continue;
102 die(
"ReadBinary: path " + files[i].
path +
103 " size is not a multiple of " <<
size_t(fixed_size_));
121 <<
"my_range" << my_range;
124 while (i < files.size() &&
129 for ( ; i < files.size() &&
134 size_t file_size = files[i].size;
137 fi.
path = files[i].path;
139 my_range.
begin <= file_begin ? 0 : my_range.
begin - file_begin,
140 my_range.
end >= file_end ? file_size : my_range.
end - file_begin);
143 sLOG <<
"ReadBinary: fileinfo" 144 <<
"path" << fi.
path <<
"range" << fi.
range;
157 tlx::make_counting<foxxll::syscall_file>(
174 (bsize - item_off + fixed_size_ - 1) / fixed_size_;
177 std::move(bbp), 0, bsize, item_off, item_num,
180 item_off += item_num * fixed_size_ - bsize;
182 LOG <<
"ReadBinary: adding Block " << block;
205 while (i < files.size() &&
210 while (i < files.size() &&
215 files[i].IsCompressed() });
220 <<
"my_range" << my_range;
230 LOG <<
"ReadBinaryNode::PushData() start " << *
this 231 <<
" consume=" << consume
239 LOG <<
"ReadBinaryNode::PushData() opening " << file.path;
246 this->
PushItem(br.template NextNoSelfVerify<ValueType>());
251 <<
"class" <<
"ReadBinaryNode" 280 size_t& stats_total_bytes,
281 size_t& stats_total_reads)
283 remain_size_(fileinfo.
range.size()),
285 stats_total_bytes_(stats_total_bytes),
286 stats_total_reads_(stats_total_reads) {
288 if (!is_compressed_) {
297 if (done_ || remain_size_ == 0)
304 size_t rb = is_compressed_
305 ? block_size :
std::min(block_size, remain_size_);
307 ssize_t size = stream_->read(bytes->data(), rb);
308 stats_total_bytes_ += size;
309 stats_total_reads_++;
311 LOG <<
"VfsFileBlockSource::NextBlock() size " << size;
314 if (!is_compressed_) {
315 assert(remain_size_ >= rb);
356 template <
typename ValueType>
358 Context& ctx,
const std::vector<std::string>& filepath,
361 auto node = tlx::make_counting<ReadBinaryNode<ValueType> >(
362 ctx, filepath, size_limit,
false);
380 template <
typename ValueType>
383 const std::vector<std::string>& filepath,
386 auto node = tlx::make_counting<ReadBinaryNode<ValueType> >(
387 ctx, filepath, size_limit,
true);
405 template <
typename ValueType>
410 auto node = tlx::make_counting<ReadBinaryNode<ValueType> >(
411 ctx, filepath, size_limit,
false);
429 template <
typename ValueType>
434 auto node = tlx::make_counting<ReadBinaryNode<ValueType> >(
435 ctx, filepath, size_limit,
true);
447 #endif // !THRILL_API_READ_BINARY_HEADER
Block combines a reference to a read-only ByteBlock and book-keeping information. ...
only reading of the file is allowed
#define sLOG
Default logging method: output if the local debug variable is true.
DIA is the interface between the user and the Thrill framework.
static uint_pair max()
return an uint_pair instance containing the largest value possible
void PushItem(const ValueType &item) const
Method for derived classes to Push a single item to all children.
FileList Glob(const std::vector< std::string > &globlist, const GlobType &gtype)
Reads a glob path list and delivers a file list, sizes, and prefix sums (in bytes) for all matching files.
uint64_t total_size
total size of files
common::Range CalculateLocalRange(size_t global_size) const
size_t default_block_size
default size of blocks in File, Channel, BlockQueue, etc.
size_t num_workers() const
Global number of workers in the system.
A File is an ordered sequence of Block objects for storing items.
void Clear()
Free all Blocks in the File and deallocate vectors.
do not acquire an exclusive lock by default
An Exception which is thrown on system errors and contains errno information.
ReadStreamPtr OpenReadStream(const std::string &path, const common::Range &range)
Construct reader for given path uri.
A pinned / pin-counted pointer to a ByteBlock.
Represents a one-dimensional range (interval) [begin,end).
bool contains_remote_uri
whether the list contains a remote-uri file.
static constexpr bool is_fixed_size_
flag whether ValueType is fixed size
A non-pinned counting pointer to a ByteBlock.
#define die(msg)
Instead of calling std::terminate(), throw the output message via an exception.
static constexpr uint64_t no_size_limit_
sentinel to disable size limit
The Context of a job is a unique instance per worker which holds references to all underlying parts of Thrill.
std::vector< FileInfo > my_files_
list of files for non-mapped File push
DIA< ValueType > ReadBinary(Context &ctx, const std::vector< std::string > &filepath, uint64_t size_limit=ReadBinaryNode< ValueType >::no_size_limit_)
ReadBinary is a DOp, which reads a file written by WriteBinary from the file system and creates a DIA...
vfs::ReadStreamPtr stream_
PinnedByteBlockPtr AllocateByteBlock(size_t size, size_t local_worker_id)
void PushData(bool consume) final
Virtual method for pushing data. Triggers actual pushing in sub-classes.
uint64_t size_inc_psum(size_t i) const
inclusive prefix sum of file sizes (only for symmetry with ex_psum)
void AppendBlock(const Block &b)
BlockReader takes Block objects from BlockSource and allows reading of a) serializable Items or b) ar...
size_t & stats_total_bytes_
ReadBinaryNode(Context &ctx, const std::string &glob, uint64_t size_limit, bool local_storage)
uint64_t size_ex_psum(size_t i) const
exclusive prefix sum of file sizes with total_size as sentinel
bool use_ext_file_
File containing Blocks mapped directly to an I/O fileimpl.
ReadBinaryNode(Context &ctx, const std::vector< std::string > &globlist, uint64_t size_limit, bool local_storage)
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
std::vector< T, Allocator< T > > vector
vector with Manager tracking
size_t local_worker_id() const
High-performance smart pointer used as a wrapping reference counting pointer.
A DIANode which performs a line-based Read operation.
data::File GetFile(size_t dia_id)
Returns a new File object containing a sequence of local Blocks.
static const size_t bytes
number of bytes in uint_pair
static constexpr size_t fixed_size_
fixed size of ValueType or zero.
data::PinnedBlock NextBlock()
void vector_free(std::vector< Type > &v)
static constexpr bool debug_no_extfile
for testing old method of pushing items instead of PushFile().
static uint_pair min()
return an uint_pair instance containing the smallest value possible
common::Range range
begin and end offsets in file.
common::JsonLogger logger_
List of file info and additional overall info.
common::Range CalculateLocalRangeOnHost(size_t global_size) const
A pinned / pin-counted derivative of a Block.
bool is_compressed
whether file is compressed
std::string join(char glue, const std::vector< std::string > &parts)
Join a vector of strings by some glue character between each pair from the sequence.
TLX_ATTRIBUTE_ALWAYS_INLINE bool HasNext()
HasNext() returns true if at least one more item is available.
structure to store info on what to read from files
void Dispose() final
Virtual clear method. Triggers actual disposing in sub-classes.
bool contains_compressed
whether the list contains a compressed file.
#define LOG
Default logging method: output if the local debug variable is true.
void PushFile(data::File &file, bool consume) const
static constexpr bool debug
Context & context_
associated Context
VfsFileBlockSource(const FileInfo &fileinfo, Context &ctx, size_t &stats_total_bytes, size_t &stats_total_reads)
data::BlockPool & block_pool()
the block manager keeps all data blocks moving through the system.
size_t & stats_total_reads_
ByteBlockPtr MapExternalBlock(const foxxll::file_ptr &file, uint64_t offset, size_t size)