Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
read_lines.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/api/read_lines.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Alexander Noe <[email protected]>
7  * Copyright (C) 2015 Timo Bingmann <[email protected]>
8  *
9  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10  ******************************************************************************/
11 
12 #pragma once
13 #ifndef THRILL_API_READ_LINES_HEADER
14 #define THRILL_API_READ_LINES_HEADER
15 
16 #include <thrill/api/context.hpp>
17 #include <thrill/api/dia.hpp>
20 #include <thrill/common/logger.hpp>
21 #include <thrill/common/string.hpp>
24 #include <thrill/vfs/file_io.hpp>
25 
26 #include <tlx/string/join.hpp>
27 
28 #include <string>
29 #include <utility>
30 #include <vector>
31 
32 namespace thrill {
33 namespace api {
34 
35 /*!
36  * A DIANode which performs a line-based Read operation. Reads a file from the
37  * file system and delivers it as a DIA.
38  *
39  * \ingroup api_layer
40  */
41 class ReadLinesNode final : public SourceNode<std::string>
42 {
43  static constexpr bool debug = false;
44 
45 public:
47  using Super::context_;
48 
49  //! Constructor for a ReadLinesNode: expands globlist into filelist_ and dies if no file matches.
50  ReadLinesNode(Context& ctx, const std::vector<std::string>& globlist,
51  bool local_storage)
52  : Super(ctx, "ReadLines"),
53  local_storage_(local_storage) {
54 
55  filelist_ = vfs::Glob(globlist, vfs::GlobType::File); // resolve the globs into a concrete file list
56 
57  if (filelist_.size() == 0) // an empty match is reported through die() instead of yielding an empty DIA
58  die("ReadLines: no files found in globs: " + tlx::join(' ', globlist));
59 
60  sLOG << "ReadLines: creating for" << globlist.size() << "globs"
61  << "matching" << filelist_.size() << "files";
62  }
63 
64  //! Constructor for a ReadLinesNode from a single glob; delegates to the glob-list constructor.
66  : ReadLinesNode(ctx, std::vector<std::string>{ glob }, local_storage)
67  { }
68 
69  DIAMemUse PushDataMemUse() final {
70  // InputLineIterators read files block-wise
72  }
73 
74  void PushData(bool /* consume */) final { // pushes every line yielded by the matching iterator via PushItem()
75  if (filelist_.contains_compressed) { // the file list decides which iterator implementation is used
76  InputLineIteratorCompressed it(
77  filelist_, *this, local_storage_);
78 
79  // Drain the compressed-file iterator: one PushItem() per line.
80  while (it.HasNext()) {
81  this->PushItem(it.Next());
82  }
83  }
84  else {
85  InputLineIteratorUncompressed it(
86  filelist_, *this, local_storage_);
87 
88  // Drain the uncompressed-file iterator: one PushItem() per line.
89  while (it.HasNext()) {
90  this->PushItem(it.Next());
91  }
92  }
93  }
94 
95 private:
96  vfs::FileList filelist_;
97 
98  //! true, if files are on a local file system, false: common global file
99  //! system.
101 
103  {
104  public:
106  ReadLinesNode& node)
107  : files_(files), node_(node) { }
108 
109  //! non-copyable: delete copy-constructor
110  InputLineIterator(const InputLineIterator&) = delete;
111  //! non-copyable: delete assignment operator
113  //! move-constructor: default
115  //! move-assignment operator: default
117 
118  protected:
119  //! Block read size
121  //! String, which Next() references to
123  //! Input files with size prefixsum.
125 
126  //! Index of current file in files_
127  size_t file_nr_;
128  //! Byte buffer to create line std::string values.
130  //! Start of next element in current buffer.
131  unsigned char* current_;
132  //! (exclusive) [begin,end) of local block
134  //! Reference to node
136 
138 
139  size_t total_bytes_ = 0;
140  size_t total_reads_ = 0;
141  size_t total_elements_ = 0;
142 
144  net::BufferBuilder& buffer) {
145  read_timer.Start();
146  ssize_t bytes = file->read(buffer.data(), read_size);
147  read_timer.Stop();
148  if (bytes < 0) {
149  throw common::ErrnoException("Read error");
150  }
151  buffer.set_size(bytes);
152  current_ = buffer.begin();
153  total_bytes_ += bytes;
154  total_reads_++;
155  LOG << "ReadLines: read block containing " << bytes << " bytes.";
156  return bytes > 0;
157  }
158 
160  node_.logger_
161  << "class" << "ReadLinesNode"
162  << "event" << "done"
163  << "total_bytes" << total_bytes_
164  << "total_reads" << total_reads_
165  << "total_lines" << total_elements_
166  << "read_time" << read_timer;
167  }
168  };
169 
170  //! InputLineIteratorUncompressed gives you access to the lines of uncompressed files
172  {
173  public:
174  //! Creates an instance of iterator that reads file line based
176  ReadLinesNode& node, bool local_storage)
177  : InputLineIterator(files, node) {
178 
179  // Go to start of 'local part'.
180  if (local_storage) {
182  files.total_size);
183  }
184  else {
186  files.total_size);
187  }
188 
189  assert(my_range_.begin <= my_range_.end);
190  if (my_range_.begin == my_range_.end) return;
191 
192  file_nr_ = 0;
193 
194  while (files_[file_nr_].size_inc_psum() <= my_range_.begin) {
195  file_nr_++;
196  }
197 
199 
200  sLOG << "ReadLines: opening file" << file_nr_
201  << "my_range_" << my_range_ << "offset_" << offset_;
202 
203  // find offset in current file:
204  // offset = start - sum of previous file sizes
206  files_[file_nr_].path, common::Range(offset_, 0));
207 
210 
211  if (offset_ != 0) {
212  bool found_n = false;
213 
214  // find next newline, discard all previous data as previous
215  // worker already covers it
216  while (!found_n) {
217  while (current_ < buffer_.end()) {
218  if (TLX_UNLIKELY(*current_++ == '\n')) {
219  found_n = true;
220  break;
221  }
222  }
223  // no newline found: read new data into buffer_builder
224  if (!found_n) {
225  offset_ += buffer_.size();
226  if (!ReadBlock(stream_, buffer_)) {
227  // EOF = newline per definition
228  found_n = true;
229  }
230  }
231  }
232  }
233  data_.reserve(4 * 1024);
234  }
235 
236  //! returns the next element if one exists
237  //!
238  //! does no checks whether a next element exists!
239  const std::string& Next() {
240  total_elements_++;
241  data_.clear();
242  while (true) {
243  while (TLX_LIKELY(current_ < buffer_.end())) {
244  if (TLX_UNLIKELY(*current_ == '\n')) {
245  current_++;
246  return data_;
247  }
248  else {
249  data_.push_back(*current_++);
250  }
251  }
252  offset_ += buffer_.size();
253  if (!ReadBlock(stream_, buffer_)) {
254  LOG << "ReadLines: opening next file";
255 
256  stream_->close();
257  file_nr_++;
258  offset_ = 0;
259 
260  if (file_nr_ < files_.size()) {
262  files_[file_nr_].path, common::Range(offset_, 0));
263  offset_ += buffer_.size();
265  }
266  else {
267  current_ = buffer_.begin() +
268  files_[file_nr_ - 1].size;
269  }
270 
271  if (data_.length()) {
272  return data_;
273  }
274  }
275  }
276  }
277 
278  //! returns true, if an element is available in local part, i.e. the read position is still inside (or exactly at the end of) my_range_
279  bool HasNext() {
280  size_t pos = current_ - buffer_.begin(); // bytes already consumed from the current buffer
281  assert(current_ >= buffer_.begin());
282  size_t global_index = offset_ + pos + files_.size_ex_psum(file_nr_); // position across all files: block offset + pos + total size of the preceding files
283  return global_index < my_range_.end || // still before the end of the local range
284  (global_index == my_range_.end && // exactly at the range end: still report an element,
285  files_[file_nr_].size > offset_ + pos); // provided the current file has bytes left
286  }
287 
288  private:
289  //! Offset of current block in stream_.
290  size_t offset_ = 0;
291  //! File handle to files_[file_nr_]
293  };
294 
295  //! InputLineIteratorCompressed gives you access to the lines of files when the file list contains compressed files
297  {
298  public:
299  //! Creates an instance of iterator that reads file line based
301  ReadLinesNode& node, bool local_storage)
302  : InputLineIterator(files, node) {
303 
304  // Go to start of 'local part'.
305  if (local_storage) {
307  files.total_size);
308  }
309  else {
311  files.total_size);
312  }
313 
314  file_nr_ = 0;
315 
316  while (file_nr_ < files_.size() &&
319  ++file_nr_;
320  }
321 
322  if (file_nr_ == files_.size()) {
323  LOG << "Start behind last file, not reading anything!";
324  return;
325  }
326 
327  for (size_t i = file_nr_; i < files_.size(); i++) {
328  if ((files_[i].size_inc_psum() + files_[i].size_ex_psum) / 2
329  == my_range_.end) {
330  break;
331  }
332  if ((files_[i].size_inc_psum() + files_[i].size_ex_psum) / 2
333  > my_range_.end) {
335  break;
336  }
337  }
338 
339  if (my_range_.begin >= my_range_.end) {
340  // No local files, set buffer size to 2, so HasNext() does not try to read
341  LOG << "ReadLines: my_range " << my_range_;
342  buffer_.Reserve(2);
343  buffer_.set_size(2);
344  current_ = buffer_.begin();
345  return;
346  }
347 
348  sLOG << "ReadLines: opening compressed file" << file_nr_
349  << "my_range" << my_range_;
350 
352 
355  data_.reserve(4 * 1024);
356  }
357 
358  //! returns the next element if one exists
359  //!
360  //! does no checks whether a next element exists!
361  const std::string& Next() {
362  total_elements_++;
363  data_.clear();
364  while (true) {
365  while (current_ < buffer_.end()) {
366  if (TLX_UNLIKELY(*current_ == '\n')) {
367  current_++;
368  return data_;
369  }
370  else {
371  data_.push_back(*current_++);
372  }
373  }
374 
375  if (!ReadBlock(stream_, buffer_)) {
376  LOG << "ReadLines: opening new compressed file!";
377  stream_->close();
378  file_nr_++;
379 
380  if (file_nr_ < files_.size()) {
383  }
384  else {
385  LOG << "ReadLines: reached last file";
386  current_ = buffer_.begin();
387  }
388 
389  if (data_.length()) {
390  LOG << "ReadLines: end - returning string of length"
391  << data_.length();
392  return data_;
393  }
394  }
395  }
396  }
397 
398  //! returns true, if an element is available in local part
399  bool HasNext() {
401  return false;
402  }
403 
404  // if block is fully read, read next block. needs to be done here
405  // as HasNext() has to know if file is finished
406  // v-- no new line at end || v-- newline at end of file
407  if (current_ >= buffer_.end() || (current_ + 1 >= buffer_.end() && *current_ == '\n')) {
408  LOG << "ReadLines: new buffer in HasNext()";
410  if (buffer_.size() > 1 || (buffer_.size() == 1 && buffer_[0] != '\n')) {
411  return true;
412  }
413  else {
414  LOG << "ReadLines: opening new file in HasNext()";
415  // already at last file
416  if (file_nr_ >= files_.size() - 1) {
417  return false;
418  }
419  stream_->close();
420  // if (this worker reads at least one more file)
422  file_nr_++;
425  return true;
426  }
427  else {
428  return false;
429  }
430  }
431  }
432  else {
433  return true;
434  }
435  }
436 
437  private:
438  //! File handle to files_[file_nr_]
440  };
441 };
442 
443 /*!
444  * ReadLines is a DOp, which reads a file from the file system and
445  * creates an ordered DIA according to a given read function.
446  *
447  * \param ctx Reference to the context object
448  * \param filepath Path of the file in the file system
449  *
450  * \ingroup dia_sources
451  */
453  return DIA<std::string>(
454  tlx::make_counting<ReadLinesNode>(
455  ctx, filepath, /* local_storage */ false));
456 }
457 
458 /*!
459  * ReadLines is a DOp, which reads a file from the file system and
460  * creates an ordered DIA according to a given read function.
461  *
462  * \param ctx Reference to the context object
463  * \param filepaths Paths of the files in the file system
464  *
465  * \ingroup dia_sources
466  */
468  Context& ctx, const std::vector<std::string>& filepaths) {
469  return DIA<std::string>(
470  tlx::make_counting<ReadLinesNode>(
471  ctx, filepaths, /* local_storage */ false));
472 }
473 
475  const std::string& filepath) {
476  return DIA<std::string>(
477  tlx::make_counting<ReadLinesNode>(
478  ctx, filepath, /* local_storage */ true));
479 }
480 
482  const std::vector<std::string>& filepaths) {
483  return DIA<std::string>(
484  tlx::make_counting<ReadLinesNode>(
485  ctx, filepaths, /* local_storage */ true));
486 }
487 
488 } // namespace api
489 
490 //! imported from api namespace
491 using api::ReadLines;
492 
493 } // namespace thrill
494 
495 #endif // !THRILL_API_READ_LINES_HEADER
496 
497 /******************************************************************************/
const size_t read_size
Block read size.
Definition: read_lines.hpp:120
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:152
DIA is the interface between the user and the Thrill framework.
Definition: dia.hpp:141
Description of the amount of RAM the internal data structures of a DIANode require.
Definition: dia_base.hpp:51
FileList Glob(const std::vector< std::string > &globlist, const GlobType &gtype)
Reads a glob path list and delivers a file list, sizes, and prefix sums (in bytes) for all matching files.
Definition: file_io.cpp:127
InputLineIterator & operator=(const InputLineIterator &)=delete
non-copyable: delete assignment operator
InputLineIterator gives you access to lines of a file.
Definition: read_lines.hpp:296
vfs::ReadStreamPtr stream_
File handle to files_[file_nr_].
Definition: read_lines.hpp:292
uint64_t total_size
total size of files
Definition: file_io.hpp:81
iterator begin()
return mutable iterator to first element
size_t default_block_size
default size of blocks in File, Channel, BlockQueue, etc.
Definition: byte_block.cpp:25
const vfs::FileList & files_
Input files with size prefixsum.
Definition: read_lines.hpp:124
An Exception which is thrown on system errors and contains errno information.
ReadStreamPtr OpenReadStream(const std::string &path, const common::Range &range)
Construct reader for given path uri.
Definition: file_io.cpp:179
bool HasNext()
returns true, if an element is available in local part
Definition: read_lines.hpp:279
represents a 1 dimensional range (interval) [begin,end)
Definition: math.hpp:41
uint64_t size_ex_psum(size_t i) const
exclusive prefix sum of file sizes with total_size as sentinel
Definition: file_io.hpp:94
#define die(msg)
Instead of calling std::terminate(), throw the output message via an exception.
Definition: die.hpp:43
#define TLX_UNLIKELY(c)
Definition: likely.hpp:24
uint64_t size_inc_psum(size_t i) const
inclusive prefix sum of file sizes (only for symmetry with ex_psum)
Definition: file_io.hpp:90
static constexpr bool debug
Definition: read_lines.hpp:43
The Context of a job is a unique instance per worker which holds references to all underlying parts o...
Definition: context.hpp:218
virtual void PushData(bool consume)=0
Virtual method for pushing data. Triggers actual pushing in sub-classes.
InputLineIterator(const vfs::FileList &files, ReadLinesNode &node)
Definition: read_lines.hpp:105
size_t offset_
Offset of current block in stream_.
Definition: read_lines.hpp:290
DIA< std::string > ReadLines(Context &ctx, const std::string &filepath)
ReadLines is a DOp, which reads a file from the file system and creates an ordered DIA according to a...
Definition: read_lines.hpp:452
#define TLX_LIKELY(c)
Definition: likely.hpp:23
ReadLinesNode(Context &ctx, const std::vector< std::string > &globlist, bool local_storage)
Constructor for a ReadLinesNode. Sets the Context and file path.
Definition: read_lines.hpp:50
bool ReadBlock(vfs::ReadStreamPtr &file, net::BufferBuilder &buffer)
Definition: read_lines.hpp:143
unsigned char * current_
Start of next element in current buffer.
Definition: read_lines.hpp:131
void PushItem(const std::string &item) const
Method for derived classes to Push a single item to all children.
Definition: dia_node.hpp:147
std::string data_
String, which Next() references to.
Definition: read_lines.hpp:122
net::BufferBuilder buffer_
Byte buffer to create line std::string values.
Definition: read_lines.hpp:129
size_t size() const
Return the currently used length in bytes.
size_t end
end index
Definition: math.hpp:58
ReadLinesNode & node_
Reference to node.
Definition: read_lines.hpp:135
common::Range CalculateLocalRange(size_t global_size) const
Definition: context.hpp:336
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
Definition: allocator.hpp:220
vfs::ReadStreamPtr stream_
File handle to files_[file_nr_].
Definition: read_lines.hpp:439
std::vector< T, Allocator< T > > vector
vector with Manager tracking
Definition: allocator.hpp:228
InputLineIteratorUncompressed(const vfs::FileList &files, ReadLinesNode &node, bool local_storage)
Creates an instance of iterator that reads file line based.
Definition: read_lines.hpp:175
common::Range CalculateLocalRangeOnHost(size_t global_size) const
Definition: context.hpp:341
static const size_t bytes
number of bytes in uint_pair
Definition: uint_types.hpp:75
common::Range my_range_
(exclusive) [begin,end) of local block
Definition: read_lines.hpp:133
size_t file_nr_
Index of current file in files_.
Definition: read_lines.hpp:127
const Byte * data() const
Return a pointer to the currently kept memory area.
BufferBuilder & Reserve(size_t n)
Make sure that at least n bytes are allocated.
BufferBuilder represents a dynamically growable area of memory, which can be modified by appending in...
common::JsonLogger logger_
Definition: dia_base.hpp:329
List of file info and additional overall info.
Definition: file_io.hpp:79
tag structure for Read()
Definition: dia.hpp:94
InputLineIteratorCompressed(const vfs::FileList &files, ReadLinesNode &node, bool local_storage)
Creates an instance of iterator that reads file line based.
Definition: read_lines.hpp:300
size_t begin
begin index
Definition: math.hpp:56
std::string join(char glue, const std::vector< std::string > &parts)
Join a vector of strings by some glue character between each pair from the sequence.
Definition: join.cpp:16
InputLineIterator gives you access to lines of a file.
Definition: read_lines.hpp:171
ReadLinesNode(Context &ctx, const std::string &glob, bool local_storage)
Constructor for a ReadLinesNode. Sets the Context and file path.
Definition: read_lines.hpp:65
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:141
iterator end()
return mutable iterator beyond last element
BufferBuilder & set_size(size_t n)
Context & context_
associated Context
Definition: dia_base.hpp:293
virtual DIAMemUse PushDataMemUse()
Amount of RAM used by PushData()
Definition: dia_base.hpp:182
A DIANode which performs a line-based Read operation.
Definition: read_lines.hpp:41
bool HasNext()
returns true, if an element is available in local part
Definition: read_lines.hpp:399