13 #ifndef THRILL_API_READ_LINES_HEADER 14 #define THRILL_API_READ_LINES_HEADER 43 static constexpr
bool debug =
false;
52 :
Super(ctx,
"ReadLines"),
58 die(
"ReadLines: no files found in globs: " +
tlx::join(
' ', globlist));
60 sLOG <<
"ReadLines: creating for" << globlist.size() <<
"globs" 61 <<
"matching" <<
filelist_.size() <<
"files";
152 current_ = buffer.
begin();
153 total_bytes_ +=
bytes;
155 LOG <<
"ReadLines: read block containing " << bytes <<
" bytes.";
161 <<
"class" <<
"ReadLinesNode" 163 <<
"total_bytes" << total_bytes_
164 <<
"total_reads" << total_reads_
165 <<
"total_lines" << total_elements_
201 <<
"my_range_" <<
my_range_ <<
"offset_" << offset_;
212 bool found_n =
false;
233 data_.reserve(4 * 1024);
254 LOG <<
"ReadLines: opening next file";
271 if (
data_.length()) {
323 LOG <<
"Start behind last file, not reading anything!";
328 if ((
files_[i].size_inc_psum() +
files_[i].size_ex_psum) / 2
332 if ((
files_[i].size_inc_psum() +
files_[i].size_ex_psum) / 2
348 sLOG <<
"ReadLines: opening compressed file" <<
file_nr_ 355 data_.reserve(4 * 1024);
376 LOG <<
"ReadLines: opening new compressed file!";
385 LOG <<
"ReadLines: reached last file";
389 if (
data_.length()) {
390 LOG <<
"ReadLines: end - returning string of length" 408 LOG <<
"ReadLines: new buffer in HasNext()";
414 LOG <<
"ReadLines: opening new file in HasNext()";
456 tlx::make_counting<ReadLinesNode>(
457 ctx, filepath,
false));
474 tlx::make_counting<ReadLinesNode>(
475 ctx, filepath,
true));
490 Context& ctx,
const std::vector<std::string>& filepaths) {
492 tlx::make_counting<ReadLinesNode>(
493 ctx, filepaths,
false));
508 const std::vector<std::string>& filepaths) {
510 tlx::make_counting<ReadLinesNode>(
511 ctx, filepaths,
true));
521 #endif // !THRILL_API_READ_LINES_HEADER size_t size() const
Return the currently used length in bytes.
DIAMemUse PushDataMemUse() final
Amount of RAM used by PushData()
#define sLOG
Default logging method: output if the local debug variable is true.
DIA is the interface between the user and the Thrill framework.
void PushItem(const std::string &item) const
Method for derived classes to Push a single item to all children.
Description of the amount of RAM the internal data structures of a DIANode require.
FileList Glob(const std::vector< std::string > &globlist, const GlobType &gtype)
Reads a glob path list and delivers a file list, sizes, and prefix sums (in bytes) for all matching files.
uint64_t total_size
total size of files
iterator begin()
return mutable iterator to first element
common::Range CalculateLocalRange(size_t global_size) const
void PushData(bool) final
Virtual method for pushing data. Triggers actual pushing in sub-classes.
size_t default_block_size
default size of blocks in File, Channel, BlockQueue, etc.
An Exception which is thrown on system errors and contains errno information.
ReadStreamPtr OpenReadStream(const std::string &path, const common::Range &range)
Construct reader for given path uri.
represents a 1 dimensional range (interval) [begin,end)
#define die(msg)
Instead of calling std::terminate(), throw the message via an exception.
static constexpr bool debug
The Context of a job is a unique instance per worker which holds references to all underlying parts o...
DIA< std::string > ReadLines(Context &ctx, const std::string &filepath)
ReadLines is a DOp, which reads a file from the file system and creates an ordered DIA according to a...
const Byte * data() const
Return a pointer to the currently kept memory area.
ReadLinesNode(Context &ctx, const std::vector< std::string > &globlist, bool local_storage)
Constructor for a ReadLinesNode. Sets the Context and file path.
uint64_t size_inc_psum(size_t i) const
inclusive prefix sum of file sizes (only for symmetry with ex_psum)
uint64_t size_ex_psum(size_t i) const
exclusive prefix sum of file sizes with total_size as sentinel
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
std::vector< T, Allocator< T > > vector
vector with Manager tracking
static const size_t bytes
number of bytes in uint_pair
BufferBuilder & Reserve(size_t n)
Make sure that at least n bytes are allocated.
BufferBuilder represents a dynamically growable area of memory, which can be modified by appending in...
common::JsonLogger logger_
List of file info and additional overall info.
common::Range CalculateLocalRangeOnHost(size_t global_size) const
std::string join(char glue, const std::vector< std::string > &parts)
Join a vector of strings by some glue character between each pair from the sequence.
ReadLinesNode(Context &ctx, const std::string &glob, bool local_storage)
Constructor for a ReadLinesNode. Sets the Context and file path.
bool contains_compressed
whether the list contains a compressed file.
#define LOG
Default logging method: output if the local debug variable is true.
iterator end()
return mutable iterator beyond last element
BufferBuilder & set_size(size_t n)
Context & context_
associated Context
A DIANode which performs a line-based Read operation.