Thrill  0.1
read_binary.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/api/read_binary.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Alexander Noe <[email protected]>
7  * Copyright (C) 2015 Timo Bingmann <[email protected]>
8  *
9  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10  ******************************************************************************/
11 
12 #pragma once
13 #ifndef THRILL_API_READ_BINARY_HEADER
14 #define THRILL_API_READ_BINARY_HEADER
15 
16 #include <thrill/api/context.hpp>
17 #include <thrill/api/dia.hpp>
20 #include <thrill/common/logger.hpp>
21 #include <thrill/data/block.hpp>
24 #include <thrill/vfs/file_io.hpp>
25 
27 #include <tlx/string/join.hpp>
28 #include <tlx/vector_free.hpp>
29 
30 #include <algorithm>
31 #include <limits>
32 #include <string>
33 #include <vector>
34 
35 namespace thrill {
36 namespace api {
37 
38 /*!
39  * A DIANode which performs a binary Read operation. ReadBinary reads files
40  * written by WriteBinary from the file system and emits their items as a DIA.
41  *
42  * \ingroup api_layer
43  */
44 template <typename ValueType>
45 class ReadBinaryNode final : public SourceNode<ValueType>
46 {
47  static constexpr bool debug = false;
48 
49  //! for testing old method of pushing items instead of PushFile().
50  static constexpr bool debug_no_extfile = false;
51 
52 private:
53  class VfsFileBlockSource;
54 
55 public:
57  using Super::context_;
58 
60 
61  //! flag whether ValueType is fixed size
62  static constexpr bool is_fixed_size_ =
64 
65  //! fixed size of ValueType or zero.
66  static constexpr size_t fixed_size_ =
68 
69  //! structure to store info on what to read from files
70  struct FileInfo {
72  //! begin and end offsets in file.
74  //! whether file is compressed
76  };
77 
78  //! sentinel to disable size limit
79  static constexpr uint64_t no_size_limit_ =
81 
82  ReadBinaryNode(Context& ctx, const std::vector<std::string>& globlist,
83  uint64_t size_limit, bool local_storage)
84  : Super(ctx, "ReadBinary") {
85 
86  vfs::FileList files = vfs::Glob(globlist, vfs::GlobType::File);
87 
88  if (files.size() == 0)
89  die("ReadBinary: no files found in globs: " + tlx::join(' ', globlist));
90 
91  if (size_limit != no_size_limit_)
92  files.total_size = std::min(files.total_size, size_limit);
93 
94  if (is_fixed_size_ && !files.contains_compressed)
95  {
96  // use fixed_size information to split binary files.
97 
98  // check that files have acceptable sizes
99  for (size_t i = 0; i < files.size(); ++i) {
100  if (files[i].size % fixed_size_ == 0) continue;
101 
102  die("ReadBinary: path " + files[i].path +
103  " size is not a multiple of " << size_t(fixed_size_));
104  }
105 
106  common::Range my_range;
107 
108  if (local_storage) {
110  files.total_size / fixed_size_);
111  }
112  else {
113  my_range = context_.CalculateLocalRange(
114  files.total_size / fixed_size_);
115  }
116 
117  my_range.begin *= fixed_size_;
118  my_range.end *= fixed_size_;
119 
120  sLOG << "ReadBinaryNode:" << ctx.num_workers()
121  << "my_range" << my_range;
122 
123  size_t i = 0;
124  while (i < files.size() &&
125  files[i].size_inc_psum() <= my_range.begin) {
126  i++;
127  }
128 
129  for ( ; i < files.size() &&
130  files.size_ex_psum(i) <= my_range.end; ++i) {
131 
132  size_t file_begin = files.size_ex_psum(i);
133  size_t file_end = files.size_inc_psum(i);
134  size_t file_size = files[i].size;
135 
136  FileInfo fi;
137  fi.path = files[i].path;
138  fi.range = common::Range(
139  my_range.begin <= file_begin ? 0 : my_range.begin - file_begin,
140  my_range.end >= file_end ? file_size : my_range.end - file_begin);
141  fi.is_compressed = false;
142 
143  sLOG << "ReadBinary: fileinfo"
144  << "path" << fi.path << "range" << fi.range;
145 
146  if (fi.range.begin == fi.range.end) continue;
147 
148  if (files.contains_remote_uri || debug_no_extfile) {
149  // push file and range into file list for remote files
150  // (these cannot be mapped using the io layer)
151  my_files_.push_back(fi);
152  }
153  else {
154  // new method: map blocks into a File using io layer
155 
156  foxxll::file_ptr file =
157  tlx::make_counting<foxxll::syscall_file>(
158  fi.path,
160 
161  size_t item_off = 0;
162 
163  for (size_t off = fi.range.begin; off < fi.range.end;
164  off += data::default_block_size) {
165 
166  size_t bsize = std::min(
167  off + data::default_block_size, fi.range.end) - off;
168 
169  data::ByteBlockPtr bbp =
171  file, off, bsize);
172 
173  size_t item_num =
174  (bsize - item_off + fixed_size_ - 1) / fixed_size_;
175 
176  data::Block block(
177  std::move(bbp), 0, bsize, item_off, item_num,
178  /* typecode_verify */ false);
179 
180  item_off += item_num * fixed_size_ - bsize;
181 
182  LOG << "ReadBinary: adding Block " << block;
183  ext_file_.AppendBlock(std::move(block));
184  }
185 
186  use_ext_file_ = true;
187  }
188  }
189  }
190  else
191  {
192  // split filelist by whole files.
193  size_t i = 0;
194 
195  common::Range my_range;
196 
197  if (local_storage) {
199  files.total_size);
200  }
201  else {
202  my_range = context_.CalculateLocalRange(files.total_size);
203  }
204 
205  while (i < files.size() &&
206  files[i].size_inc_psum() <= my_range.begin) {
207  i++;
208  }
209 
210  while (i < files.size() &&
211  files[i].size_inc_psum() <= my_range.end) {
212  my_files_.push_back(
213  FileInfo { files[i].path,
215  files[i].IsCompressed() });
216  i++;
217  }
218 
219  sLOG << "ReadBinary:" << my_files_.size() << "files,"
220  << "my_range" << my_range;
221  }
222  }
223 
224  ReadBinaryNode(Context& ctx, const std::string& glob, uint64_t size_limit,
225  bool local_storage)
226  : ReadBinaryNode(ctx, std::vector<std::string>{ glob }, size_limit,
227  local_storage) { } // single-glob convenience: delegates to the glob-list constructor with a one-element list
228 
229  void PushData(bool consume) final {
230  LOG << "ReadBinaryNode::PushData() start " << *this
231  << " consume=" << consume
232  << " use_ext_file_=" << use_ext_file_;
233 
234  if (use_ext_file_)
235  return this->PushFile(ext_file_, consume);
236 
237  // Hook Read
238  for (const FileInfo& file : my_files_) {
239  LOG << "ReadBinaryNode::PushData() opening " << file.path;
240 
244 
245  while (br.HasNext()) {
246  this->PushItem(br.template NextNoSelfVerify<ValueType>());
247  }
248  }
249 
251  << "class" << "ReadBinaryNode"
252  << "event" << "done"
253  << "total_bytes" << stats_total_bytes
254  << "total_reads" << stats_total_reads;
255  }
256 
257  void Dispose() final {
259  ext_file_.Clear();
260  }
261 
262 private:
263  //! list of files for non-mapped File push
264  std::vector<FileInfo> my_files_;
265 
266  //! File containing Blocks mapped directly to an io fileimpl.
267  bool use_ext_file_ = false;
269 
270  size_t stats_total_bytes = 0;
271  size_t stats_total_reads = 0;
272 
274  {
275  public:
276  const size_t block_size = data::default_block_size;
277 
278  VfsFileBlockSource(const FileInfo& fileinfo,
279  Context& ctx,
280  size_t& stats_total_bytes,
281  size_t& stats_total_reads)
282  : context_(ctx),
283  remain_size_(fileinfo.range.size()), // size of the range assigned in fileinfo
284  is_compressed_(fileinfo.is_compressed),
285  stats_total_bytes_(stats_total_bytes),
286  stats_total_reads_(stats_total_reads) {
287  // open file: uncompressed files are opened with the range from fileinfo; compressed files are opened without a range
288  if (!is_compressed_) {
289  stream_ = vfs::OpenReadStream(fileinfo.path, fileinfo.range);
290  }
291  else {
292  stream_ = vfs::OpenReadStream(fileinfo.path);
293  }
294  }
295 
297  if (done_ || remain_size_ == 0)
298  return data::PinnedBlock();
299 
302  block_size, context_.local_worker_id());
303 
304  size_t rb = is_compressed_
305  ? block_size : std::min(block_size, remain_size_);
306 
307  ssize_t size = stream_->read(bytes->data(), rb);
308  stats_total_bytes_ += size;
309  stats_total_reads_++;
310 
311  LOG << "VfsFileBlockSource::NextBlock() size " << size;
312 
313  if (size > 0) {
314  if (!is_compressed_) {
315  assert(remain_size_ >= rb);
316  remain_size_ -= rb;
317  }
318  return data::PinnedBlock(std::move(bytes), 0, size, 0, 0,
319  /* typecode_verify */ false);
320  }
321  else if (size < 0) {
322  throw common::ErrnoException("Error reading vfs file");
323  }
324  else {
325  // size == 0 -> read finished
326  stream_->close();
327  done_ = true;
328  return data::PinnedBlock();
329  }
330  }
331 
332  private:
335  size_t remain_size_;
339  bool done_ = false;
340  };
341 };
342 
343 /*!
344  * ReadBinary is a DOp, which reads a file written by WriteBinary from the file
345  * system and creates a DIA.
346  *
347  * \image html dia_ops/ReadBinary.svg
348  *
349  * \param ctx Reference to the context object
350  * \param filepath List of paths or glob patterns of the files in the file system
351  * \param size_limit Optional limit to the total file size (e.g. for testing
352  * algorithms on prefixes)
353  *
354  * \ingroup dia_sources
355  */
356 template <typename ValueType>
358  Context& ctx, const std::vector<std::string>& filepath,
359  uint64_t size_limit = ReadBinaryNode<ValueType>::no_size_limit_) {
360 
361  auto node = tlx::make_counting<ReadBinaryNode<ValueType> >(
362  ctx, filepath, size_limit, /* local_storage */ false);
363 
364  return DIA<ValueType>(node);
365 }
366 
367 /*!
368  * ReadBinary is a DOp, which reads a file written by WriteBinary from the file
369  * system and creates a DIA.
370  *
371  * \image html dia_ops/ReadBinary.svg
372  *
373  * \param ctx Reference to the context object
374  * \param filepath List of paths or glob patterns of the files in the file system
375  * \param size_limit Optional limit to the total file size (e.g. for testing
376  * algorithms on prefixes)
377  *
378  * \ingroup dia_sources
379  */
380 template <typename ValueType>
382  struct LocalStorageTag, Context& ctx,
383  const std::vector<std::string>& filepath,
384  uint64_t size_limit = ReadBinaryNode<ValueType>::no_size_limit_) {
385 
386  auto node = tlx::make_counting<ReadBinaryNode<ValueType> >(
387  ctx, filepath, size_limit, /* local_storage */ true);
388 
389  return DIA<ValueType>(node);
390 }
391 
392 /*!
393  * ReadBinary is a DOp, which reads a file written by WriteBinary from the file
394  * system and creates a DIA.
395  *
396  * \image html dia_ops/ReadBinary.svg
397  *
398  * \param ctx Reference to the context object
399  * \param filepath Path or glob pattern of the file(s) in the file system
400  * \param size_limit Optional limit to the total file size (e.g. for testing
401  * algorithms on prefixes)
402  *
403  * \ingroup dia_sources
404  */
405 template <typename ValueType>
407  Context& ctx, const std::string& filepath,
408  uint64_t size_limit = ReadBinaryNode<ValueType>::no_size_limit_) {
409 
410  auto node = tlx::make_counting<ReadBinaryNode<ValueType> >(
411  ctx, filepath, size_limit, /* local_storage */ false);
412 
413  return DIA<ValueType>(node);
414 }
415 
416 /*!
417  * ReadBinary is a DOp, which reads a file written by WriteBinary from the file
418  * system and creates a DIA.
419  *
420  * \image html dia_ops/ReadBinary.svg
421  *
422  * \param ctx Reference to the context object
423  * \param filepath Path or glob pattern of the file(s) in the file system
424  * \param size_limit Optional limit to the total file size (e.g. for testing
425  * algorithms on prefixes)
426  *
427  * \ingroup dia_sources
428  */
429 template <typename ValueType>
431  struct LocalStorageTag, Context& ctx, const std::string& filepath,
432  uint64_t size_limit = ReadBinaryNode<ValueType>::no_size_limit_) {
433 
434  auto node = tlx::make_counting<ReadBinaryNode<ValueType> >(
435  ctx, filepath, size_limit, /* local_storage */ true);
436 
437  return DIA<ValueType>(node);
438 }
439 
440 } // namespace api
441 
442 //! imported from api namespace
443 using api::ReadBinary;
444 
445 } // namespace thrill
446 
447 #endif // !THRILL_API_READ_BINARY_HEADER
448 
449 /******************************************************************************/
Block combines a reference to a read-only ByteBlock and book-keeping information. ...
Definition: block.hpp:52
only reading of the file is allowed
Definition: file.hpp:67
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:34
DIA is the interface between the user and the Thrill framework.
Definition: dia.hpp:141
static uint_pair max()
return an uint_pair instance containing the largest value possible
Definition: uint_types.hpp:226
void PushItem(const ValueType &item) const
Method for derived classes to Push a single item to all children.
Definition: dia_node.hpp:147
FileList Glob(const std::vector< std::string > &globlist, const GlobType &gtype)
Reads a glob path list and delivers a file list, sizes, and prefix sums (in bytes) for all matching files.
Definition: file_io.cpp:128
uint64_t total_size
total size of files
Definition: file_io.hpp:81
common::Range CalculateLocalRange(size_t global_size) const
Definition: context.hpp:339
size_t default_block_size
default size of blocks in File, Channel, BlockQueue, etc.
Definition: byte_block.cpp:25
size_t num_workers() const
Global number of workers in the system.
Definition: context.hpp:251
A File is an ordered sequence of Block objects for storing items.
Definition: file.hpp:56
void Clear()
Free all Blocks in the File and deallocate vectors.
Definition: file.cpp:57
do not acquire an exclusive lock by default
Definition: file.hpp:84
An Exception which is thrown on system errors and contains errno information.
ReadStreamPtr OpenReadStream(const std::string &path, const common::Range &range)
Construct reader for given path uri.
Definition: file_io.cpp:180
A pinned / pin-counted pointer to a ByteBlock.
Definition: byte_block.hpp:205
STL namespace.
represents a 1 dimensional range (interval) [begin,end)
Definition: math.hpp:41
bool contains_remote_uri
whether the list contains a remote-uri file.
Definition: file_io.hpp:87
static constexpr bool is_fixed_size_
flag whether ValueType is fixed size
Definition: read_binary.hpp:62
A non-pinned counting pointer to a ByteBlock.
Definition: byte_block.hpp:176
#define die(msg)
Instead of calling std::terminate(), throw the message via an exception.
Definition: die.hpp:22
static constexpr uint64_t no_size_limit_
sentinel to disable size limit
Definition: read_binary.hpp:79
The Context of a job is a unique instance per worker which holds references to all underlying parts o...
Definition: context.hpp:221
std::vector< FileInfo > my_files_
list of files for non-mapped File push
DIA< ValueType > ReadBinary(Context &ctx, const std::vector< std::string > &filepath, uint64_t size_limit=ReadBinaryNode< ValueType >::no_size_limit_)
ReadBinary is a DOp, which reads a file written by WriteBinary from the file system and creates a DIA...
PinnedByteBlockPtr AllocateByteBlock(size_t size, size_t local_worker_id)
Definition: block_pool.cpp:484
void PushData(bool consume) final
Virtual method for pushing data. Triggers actual pushing in sub-classes.
uint64_t size_inc_psum(size_t i) const
inclusive prefix sum of file sizes (only for symmetry with ex_psum)
Definition: file_io.hpp:90
void AppendBlock(const Block &b)
Definition: file.hpp:88
BlockReader takes Block objects from BlockSource and allows reading of a) serializable Items or b) ar...
size_t end
end index
Definition: math.hpp:58
ReadBinaryNode(Context &ctx, const std::string &glob, uint64_t size_limit, bool local_storage)
uint64_t size_ex_psum(size_t i) const
exclusive prefix sum of file sizes with total_size as sentinel
Definition: file_io.hpp:94
bool use_ext_file_
File containing Blocks mapped directly to a io fileimpl.
ReadBinaryNode(Context &ctx, const std::vector< std::string > &globlist, uint64_t size_limit, bool local_storage)
Definition: read_binary.hpp:82
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
Definition: allocator.hpp:220
std::vector< T, Allocator< T > > vector
vector with Manager tracking
Definition: allocator.hpp:228
size_t local_worker_id() const
Definition: context.hpp:263
High-performance smart pointer used as a wrapping reference counting pointer.
A DIANode which performs a binary Read operation.
Definition: read_binary.hpp:45
data::File GetFile(size_t dia_id)
Returns a new File object containing a sequence of local Blocks.
Definition: context.hpp:283
static const size_t bytes
number of bytes in uint_pair
Definition: uint_types.hpp:75
static constexpr size_t fixed_size_
fixed size of ValueType or zero.
Definition: read_binary.hpp:66
void vector_free(std::vector< Type > &v)
Definition: vector_free.hpp:21
static constexpr bool debug_no_extfile
for testing old method of pushing items instead of PushFile().
Definition: read_binary.hpp:50
static uint_pair min()
return an uint_pair instance containing the smallest value possible
Definition: uint_types.hpp:217
common::Range range
begin and end offsets in file.
Definition: read_binary.hpp:73
common::JsonLogger logger_
Definition: dia_base.hpp:329
List of file info and additional overall info.
Definition: file_io.hpp:79
common::Range CalculateLocalRangeOnHost(size_t global_size) const
Definition: context.hpp:344
A pinned / pin-counted derivative of a Block.
Definition: block.hpp:157
tag structure for Read()
Definition: dia.hpp:94
bool is_compressed
whether file is compressed
Definition: read_binary.hpp:75
size_t begin
begin index
Definition: math.hpp:56
std::string join(char glue, const std::vector< std::string > &parts)
Join a vector of strings by some glue character between each pair from the sequence.
Definition: join.cpp:16
TLX_ATTRIBUTE_ALWAYS_INLINE bool HasNext()
HasNext() returns true if at least one more item is available.
structure to store info on what to read from files
Definition: read_binary.hpp:70
void Dispose() final
Virtual clear method. Triggers actual disposing in sub-classes.
bool contains_compressed
whether the list contains a compressed file.
Definition: file_io.hpp:84
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:24
void PushFile(data::File &file, bool consume) const
Definition: dia_node.hpp:156
static constexpr bool debug
Definition: read_binary.hpp:47
Context & context_
associated Context
Definition: dia_base.hpp:293
VfsFileBlockSource(const FileInfo &fileinfo, Context &ctx, size_t &stats_total_bytes, size_t &stats_total_reads)
data::BlockPool & block_pool()
the block manager keeps all data blocks moving through the system.
Definition: context.hpp:324
ByteBlockPtr MapExternalBlock(const foxxll::file_ptr &file, uint64_t offset, size_t size)
Definition: block_pool.cpp:528