Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
read_binary.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/api/read_binary.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Alexander Noe <[email protected]>
7  * Copyright (C) 2015 Timo Bingmann <[email protected]>
8  *
9  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10  ******************************************************************************/
11 
12 #pragma once
13 #ifndef THRILL_API_READ_BINARY_HEADER
14 #define THRILL_API_READ_BINARY_HEADER
15 
16 #include <thrill/api/context.hpp>
17 #include <thrill/api/dia.hpp>
20 #include <thrill/common/logger.hpp>
21 #include <thrill/data/block.hpp>
24 #include <thrill/vfs/file_io.hpp>
25 
27 #include <tlx/string/join.hpp>
28 
29 #include <algorithm>
30 #include <limits>
31 #include <string>
32 #include <vector>
33 
34 namespace thrill {
35 namespace api {
36 
37 /*!
38  * A DIANode which performs a binary Read operation. ReadBinary reads binary
39  * files from the file system and emits the contained items as a DIA.
40  *
41  * \ingroup api_layer
42  */
43 template <typename ValueType>
44 class ReadBinaryNode final : public SourceNode<ValueType>
45 {
    // local switch read by the LOG / sLOG macros used below: they only
    // produce output when this flag is true.
46  static constexpr bool debug = false;
47 
48  //! for testing old method of pushing items instead of PushFile().
49  static constexpr bool debug_no_extfile = false;
50 
51 private:
    // forward declaration; the members of this nested class appear near the
    // end of ReadBinaryNode.
52  class VfsFileBlockSource;
53 
54 public:
    // NOTE(review): listing line 55 is absent from this rendering; presumably
    // it declares the `Super` alias that the next line and the constructor's
    // initializer list rely on -- confirm against the original header.
56  using Super::context_;
57 
    // NOTE(review): listing line 58 is absent here; the cross-reference index
    // of this page places the alias
    // VfsFileBlockReader = data::BlockReader<VfsFileBlockSource> at that line.
59 
60  //! flag whether ValueType is fixed size
61  static constexpr bool is_fixed_size_ =
    // NOTE(review): the initializer (listing line 62) is not shown here.
63 
64  //! fixed size of ValueType or zero.
65  static constexpr size_t fixed_size_ =
    // NOTE(review): the initializer (listing line 66) is not shown here.
67 
68  //! structure to store info on what to read from files
69  struct FileInfo {
    // NOTE(review): the member declarations (listing lines 70, 72, 74) are
    // not shown; the code below accesses the members path, range and
    // is_compressed of this struct.
71  //! begin and end offsets in file.
73  //! whether file is compressed
75  };
76 
77  //! sentinel to disable size limit
78  static constexpr uint64_t no_size_limit_ =
    // NOTE(review): the initializer (listing line 79) is not shown here.
80 
    //! Expands the globs into a file list and determines which part of the
    //! data this worker will read.
    //!
    //! \param ctx           context this node belongs to
    //! \param globlist      glob patterns that are expanded with vfs::Glob()
    //! \param size_limit    upper bound for the total size that is read, or
    //!                      no_size_limit_ to read everything
    //! \param local_storage selects the alternative range calculation in the
    //!                      two `if (local_storage)` branches below
    //!
    //! Calls die() if no file matches, or (in the fixed-size path) if a file
    //! size is not a multiple of fixed_size_.
81  ReadBinaryNode(Context& ctx, const std::vector<std::string>& globlist,
82  uint64_t size_limit, bool local_storage)
83  : Super(ctx, "ReadBinary") {
84 
85  vfs::FileList files = vfs::Glob(globlist, vfs::GlobType::File);
86 
87  if (files.size() == 0)
88  die("ReadBinary: no files found in globs: " + tlx::join(' ', globlist));
89 
    // apply the optional size limit by clamping the total size that is
    // distributed among the workers.
90  if (size_limit != no_size_limit_)
91  files.total_size = std::min(files.total_size, size_limit);
92 
    // Two strategies: fixed-size items in uncompressed files are split at
    // item granularity (byte ranges inside files); everything else is split
    // by whole files.
93  if (is_fixed_size_ && !files.contains_compressed)
94  {
95  // use fixed_size information to split binary files.
96 
97  // check that files have acceptable sizes
98  for (size_t i = 0; i < files.size(); ++i) {
99  if (files[i].size % fixed_size_ == 0) continue;
100 
101  die("ReadBinary: path " + files[i].path +
102  " size is not a multiple of " << size_t(fixed_size_));
103  }
104 
105  common::Range my_range;
106 
    // the range is computed over the number of items
    // (total_size / fixed_size_), not over bytes.
107  if (local_storage) {
    // NOTE(review): listing line 108 is not shown; presumably it assigns
    // my_range from context_.CalculateLocalRangeOnHost( (that function is
    // listed in the cross-reference index) -- confirm.
109  files.total_size / fixed_size_);
110  }
111  else {
112  my_range = context_.CalculateLocalRange(
113  files.total_size / fixed_size_);
114  }
115 
    // convert the item range into a byte range over the concatenated files.
116  my_range.begin *= fixed_size_;
117  my_range.end *= fixed_size_;
118 
119  sLOG << "ReadBinaryNode:" << ctx.num_workers()
120  << "my_range" << my_range;
121 
    // skip all files whose inclusive prefix sum lies at or before the start
    // of this worker's byte range.
122  size_t i = 0;
123  while (i < files.size() &&
124  files[i].size_inc_psum() <= my_range.begin) {
125  i++;
126  }
127 
    // visit every file whose exclusive prefix sum does not exceed the end of
    // the byte range.
128  for ( ; i < files.size() &&
129  files.size_ex_psum(i) <= my_range.end; ++i) {
130 
131  size_t file_begin = files.size_ex_psum(i);
132  size_t file_end = files.size_inc_psum(i);
133  size_t file_size = files[i].size;
134 
    // clip the global byte range to this file and express it as offsets
    // relative to the beginning of the file.
135  FileInfo fi;
136  fi.path = files[i].path;
137  fi.range = common::Range(
138  my_range.begin <= file_begin ? 0 : my_range.begin - file_begin,
139  my_range.end >= file_end ? file_size : my_range.end - file_begin);
140  fi.is_compressed = false;
141 
142  sLOG << "ReadBinary: fileinfo"
143  << "path" << fi.path << "range" << fi.range;
144 
    // nothing to read from this file (empty clipped range).
145  if (fi.range.begin == fi.range.end) continue;
146 
147  if (files.contains_remote_uri || debug_no_extfile) {
148  // push file and range into file list for remote files
149  // (these cannot be mapped using the io layer)
150  my_files_.push_back(fi);
151  }
152  else {
153  // new method: map blocks into a File using io layer
154 
155  foxxll::file_ptr file =
156  tlx::make_counting<foxxll::syscall_file>(
157  fi.path,
    // NOTE(review): listing line 158 (the remaining constructor arguments,
    // presumably the open-mode flags) is not shown here.
159 
    // byte offset, relative to the start of the next block, at which the
    // first item beginning in that block starts. It is non-zero when the
    // previous block ended in the middle of an item.
160  size_t item_off = 0;
161 
    // cut the byte range of this file into pieces of at most
    // data::default_block_size bytes.
162  for (size_t off = fi.range.begin; off < fi.range.end;
163  off += data::default_block_size) {
164 
    // size of this piece; the last piece may be shorter.
165  size_t bsize = std::min(
166  off + data::default_block_size, fi.range.end) - off;
167 
168  data::ByteBlockPtr bbp =
    // NOTE(review): listing line 169 is not shown; presumably the call is
    // BlockPool::MapExternalBlock(file, offset, size), which is listed in the
    // cross-reference index -- confirm.
170  file, off, bsize);
171 
    // number of items that begin inside this piece: the bytes from item_off
    // to the end of the piece, divided by fixed_size_ and rounded up.
172  size_t item_num =
173  (bsize - item_off + fixed_size_ - 1) / fixed_size_;
174 
175  data::Block block(
176  std::move(bbp), 0, bsize, item_off, item_num,
177  /* typecode_verify */ false);
178 
    // the item following the last counted one starts at
    // item_off + item_num * fixed_size_; subtracting bsize gives its offset
    // inside the next piece.
179  item_off += item_num * fixed_size_ - bsize;
180 
181  LOG << "ReadBinary: adding Block " << block;
182  ext_file_.AppendBlock(std::move(block));
183  }
184 
    // PushData() will push ext_file_ instead of reading my_files_.
185  use_ext_file_ = true;
186  }
187  }
188  }
189  else
190  {
191  // split filelist by whole files.
192  size_t i = 0;
193 
194  common::Range my_range;
195 
    // here the range is computed over bytes (files.total_size).
196  if (local_storage) {
    // NOTE(review): listing line 197 is not shown; presumably it assigns
    // my_range from context_.CalculateLocalRangeOnHost( -- confirm.
198  files.total_size);
199  }
200  else {
201  my_range = context_.CalculateLocalRange(files.total_size);
202  }
203 
    // skip files that end at or before the start of this worker's range.
204  while (i < files.size() &&
205  files[i].size_inc_psum() <= my_range.begin) {
206  i++;
207  }
208 
    // take every following file whose inclusive prefix sum does not exceed
    // the end of the range.
209  while (i < files.size() &&
210  files[i].size_inc_psum() <= my_range.end) {
211  my_files_.push_back(
212  FileInfo { files[i].path,
    // NOTE(review): listing line 213 (the initializer for the range member)
    // is not shown here.
214  files[i].IsCompressed() });
215  i++;
216  }
217 
218  sLOG << "ReadBinary:" << my_files_.size() << "files,"
219  << "my_range" << my_range;
220  }
221  }
222 
    //! Convenience constructor for a single glob: wraps it into a
    //! one-element vector and delegates to the constructor above.
223  ReadBinaryNode(Context& ctx, const std::string& glob, uint64_t size_limit,
224  bool local_storage)
225  : ReadBinaryNode(ctx, std::vector<std::string>{ glob }, size_limit,
    // NOTE(review): listing line 226 (the rest of the delegating call and the
    // constructor body) is not shown here.
227 
    //! Pushes the data selected by the constructor: either the prepared
    //! ext_file_ via PushFile(), or every item read from the files in
    //! my_files_ via PushItem().
228  void PushData(bool consume) final {
229  LOG << "ReadBinaryNode::PushData() start " << *this
230  << " consume=" << consume
231  << " use_ext_file_=" << use_ext_file_;
232 
    // mapped-block path: the Blocks were already appended in the constructor.
233  if (use_ext_file_)
234  return this->PushFile(ext_file_, consume);
235 
236  // Hook Read
237  for (const FileInfo& file : my_files_) {
238  LOG << "ReadBinaryNode::PushData() opening " << file.path;
239 
    // NOTE(review): listing lines 240 and 242 are not shown; the loop below
    // uses a reader named `br`, presumably a VfsFileBlockReader constructed
    // from this VfsFileBlockSource and the two stats counters -- confirm.
241  VfsFileBlockSource(file, context_,
243 
    // deserialize and push items until the reader is exhausted.
244  while (br.HasNext()) {
245  this->PushItem(br.template NextNoSelfVerify<ValueType>());
246  }
247  }
248 
    // NOTE(review): listing line 249 (the object these key/value pairs are
    // streamed into; the index lists a common::JsonLogger logger_) is not
    // shown here.
250  << "class" << "ReadBinaryNode"
251  << "event" << "done"
252  << "total_bytes" << stats_total_bytes
253  << "total_reads" << stats_total_reads;
254  }
255 
    //! Releases the data held by this node.
256  void Dispose() final {
    // swapping with an empty temporary hands the vector's storage to the
    // temporary, which is destroyed at the end of the statement.
257  std::vector<FileInfo>().swap(my_files_);
258  ext_file_.Clear();
259  }
260 
261 private:
262  //! list of files for non-mapped File push
263  std::vector<FileInfo> my_files_;
264 
265  //! File containing Blocks mapped directly to an io fileimpl.
    // (use_ext_file_ is set once Blocks were appended to ext_file_; PushData()
    // then pushes that File.)
266  bool use_ext_file_ = false;
    // NOTE(review): listing line 267, presumably the declaration of
    // ext_file_, is not shown here.
268 
    // counters reported at the end of PushData(); they are handed by
    // reference to VfsFileBlockSource.
269  size_t stats_total_bytes = 0;
270  size_t stats_total_reads = 0;
271 
    // NOTE(review): listing line 272 (presumably the `class
    // VfsFileBlockSource` header) is not shown here.
273  {
274  public:
    // NOTE(review): listing line 275 is not shown here.
276 
    //! Stores the context and the references to the stats counters and opens
    //! a read stream: restricted to fileinfo.range for uncompressed files,
    //! over the whole file for compressed ones.
277  VfsFileBlockSource(const FileInfo& fileinfo,
278  Context& ctx,
279  size_t& stats_total_bytes,
280  size_t& stats_total_reads)
281  : context_(ctx),
282  remain_size_(fileinfo.range.size()),
283  is_compressed_(fileinfo.is_compressed),
284  stats_total_bytes_(stats_total_bytes),
285  stats_total_reads_(stats_total_reads) {
286  // open file
287  if (!is_compressed_) {
288  stream_ = vfs::OpenReadStream(fileinfo.path, fileinfo.range);
289  }
290  else {
291  stream_ = vfs::OpenReadStream(fileinfo.path);
292  }
293  }
294 
    // NOTE(review): listing line 295 (the signature of the method logged
    // below as VfsFileBlockSource::NextBlock()) is not shown here.
    // An empty PinnedBlock is returned once the source is finished.
296  if (done_ || remain_size_ == 0)
297  return data::PinnedBlock();
298 
    // NOTE(review): listing lines 299-301 are not shown; they must define
    // `bytes`, presumably via BlockPool::AllocateByteBlock() -- confirm.
302 
    // rb: number of bytes requested from the stream.
303  size_t rb = is_compressed_
    // NOTE(review): listing line 304 (both branches of this conditional
    // expression) is not shown here.
305 
    // size: number of bytes actually delivered, 0 at end of stream, negative
    // on error (see the branches below). The counter is updated before size
    // is inspected.
306  ssize_t size = stream_->read(bytes->data(), rb);
307  stats_total_bytes_ += size;
    // NOTE(review): listing line 308 is not shown here.
309 
310  LOG << "VfsFileBlockSource::NextBlock() size " << size;
311 
312  if (size > 0) {
313  if (!is_compressed_) {
    // NOTE(review): remain_size_ is reduced by the requested count rb, not by
    // the returned `size`. If stream_->read() can return fewer bytes than
    // requested, this accounting would run ahead of the data actually read --
    // confirm the read() contract.
314  assert(remain_size_ >= rb);
315  remain_size_ -= rb;
316  }
    // hand out only the `size` bytes that were read.
317  return data::PinnedBlock(std::move(bytes), 0, size, 0, 0,
318  /* typecode_verify */ false);
319  }
320  else if (size < 0) {
321  throw common::ErrnoException("Error reading vfs file");
322  }
323  else {
324  // size == 0 -> read finished
325  stream_->close();
326  done_ = true;
327  return data::PinnedBlock();
328  }
329  }
330 
331  private:
    // NOTE(review): listing lines 332, 333 and 335-337 are not shown; the
    // code above also uses the members context_, stream_, is_compressed_,
    // stats_total_bytes_ and stats_total_reads_.
    // bytes still to be requested; only decremented for uncompressed files.
334  size_t remain_size_;
    // set once the stream reported end of data (read() returned 0).
338  bool done_ = false;
339  };
340 };
341 
342 /*!
343  * ReadBinary is a DOp, which reads a file written by WriteBinary from the file
344  * system and creates a DIA.
345  *
346  * \param ctx Reference to the context object
347  * \param filepath List of file paths or glob patterns in the file system
348  * \param size_limit Optional limit to the total file size (e.g. for testing
349  * algorithms on prefixes)
350  *
351  * \ingroup dia_sources
352  */
353 template <typename ValueType>
// NOTE(review): listing line 354 (return type and function name) is absent
// from this rendering; the cross-reference index of this page gives the
// signature as DIA<ValueType> ReadBinary(Context&,
// const std::vector<std::string>&, uint64_t size_limit).
355  Context& ctx, const std::vector<std::string>& filepath,
356  uint64_t size_limit = ReadBinaryNode<ValueType>::no_size_limit_) {
357 
    // create the reference-counted source node; this overload always passes
    // local_storage = false.
358  auto node = tlx::make_counting<ReadBinaryNode<ValueType> >(
359  ctx, filepath, size_limit, /* local_storage */ false);
360 
    // wrap the node in a DIA for the caller.
361  return DIA<ValueType>(node);
362 }
363 
// Overload taking a LocalStorageTag as first parameter and a list of paths:
// it builds the same ReadBinaryNode as the overload without the tag, but
// passes local_storage = true to the node constructor.
364 template <typename ValueType>
// NOTE(review): listing line 365 (return type and function name, presumably
// DIA<ValueType> ReadBinary( as for the overload without the tag) is absent
// from this rendering -- confirm.
366  struct LocalStorageTag, Context& ctx,
367  const std::vector<std::string>& filepath,
368  uint64_t size_limit = ReadBinaryNode<ValueType>::no_size_limit_) {
369 
    // create the reference-counted source node with local_storage = true.
370  auto node = tlx::make_counting<ReadBinaryNode<ValueType> >(
371  ctx, filepath, size_limit, /* local_storage */ true);
372 
    // wrap the node in a DIA for the caller.
373  return DIA<ValueType>(node);
374 }
375 
376 /*!
377  * ReadBinary is a DOp, which reads a file written by WriteBinary from the file
378  * system and creates a DIA.
379  *
380  * \param ctx Reference to the context object
381  * \param filepath Path or glob pattern of the file(s) in the file system
382  * \param size_limit Optional limit to the total file size (e.g. for testing
383  * algorithms on prefixes)
384  *
385  * \ingroup dia_sources
386  */
387 template <typename ValueType>
// NOTE(review): listing line 388 (return type and function name, presumably
// DIA<ValueType> ReadBinary( as for the vector overload) is absent from this
// rendering -- confirm.
389  Context& ctx, const std::string& filepath,
390  uint64_t size_limit = ReadBinaryNode<ValueType>::no_size_limit_) {
391 
    // the single string selects the ReadBinaryNode constructor that takes one
    // glob; this overload always passes local_storage = false.
392  auto node = tlx::make_counting<ReadBinaryNode<ValueType> >(
393  ctx, filepath, size_limit, /* local_storage */ false);
394 
    // wrap the node in a DIA for the caller.
395  return DIA<ValueType>(node);
396 }
397 
// Overload taking a LocalStorageTag as first parameter and a single path:
// it builds the same ReadBinaryNode as the overload without the tag, but
// passes local_storage = true to the node constructor.
398 template <typename ValueType>
// NOTE(review): listing line 399 (return type and function name, presumably
// DIA<ValueType> ReadBinary( ) is absent from this rendering -- confirm.
400  struct LocalStorageTag, Context& ctx, const std::string& filepath,
401  uint64_t size_limit = ReadBinaryNode<ValueType>::no_size_limit_) {
402 
    // the single string selects the ReadBinaryNode constructor that takes one
    // glob; local_storage = true.
403  auto node = tlx::make_counting<ReadBinaryNode<ValueType> >(
404  ctx, filepath, size_limit, /* local_storage */ true);
405 
    // wrap the node in a DIA for the caller.
406  return DIA<ValueType>(node);
407 }
408 
409 } // namespace api
410 
411 //! imported from api namespace
412 using api::ReadBinary;
413 
414 } // namespace thrill
415 
416 #endif // !THRILL_API_READ_BINARY_HEADER
417 
418 /******************************************************************************/
Block combines a reference to a read-only ByteBlock and book-keeping information. ...
Definition: block.hpp:52
void PushFile(data::File &file, bool consume) const
Definition: dia_node.hpp:156
only reading of the file is allowed
Definition: file.hpp:67
virtual void Dispose()
Virtual clear method. Triggers actual disposing in sub-classes.
Definition: dia_base.hpp:188
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:152
DIA is the interface between the user and the Thrill framework.
Definition: dia.hpp:141
static uint_pair max()
return an uint_pair instance containing the largest value possible
Definition: uint_types.hpp:226
FileList Glob(const std::vector< std::string > &globlist, const GlobType &gtype)
Reads a glob path list and deliver a file list, sizes, and prefixsums (in bytes) for all matching fil...
Definition: file_io.cpp:127
uint64_t total_size
total size of files
Definition: file_io.hpp:81
data::BlockReader< VfsFileBlockSource > VfsFileBlockReader
Definition: read_binary.hpp:58
size_t default_block_size
default size of blocks in File, Channel, BlockQueue, etc.
Definition: byte_block.cpp:25
A File is an ordered sequence of Block objects for storing items.
Definition: file.hpp:56
void Clear()
Free all Blocks in the File and deallocate vectors.
Definition: file.cpp:57
do not acquire an exclusive lock by default
Definition: file.hpp:84
An Exception which is thrown on system errors and contains errno information.
ReadStreamPtr OpenReadStream(const std::string &path, const common::Range &range)
Construct reader for given path uri.
Definition: file_io.cpp:179
A pinned / pin-counted pointer to a ByteBlock.
Definition: byte_block.hpp:182
represents a 1 dimensional range (interval) [begin,end)
Definition: math.hpp:41
bool contains_remote_uri
whether the list contains a remote-uri file.
Definition: file_io.hpp:87
uint64_t size_ex_psum(size_t i) const
exclusive prefix sum of file sizes with total_size as sentinel
Definition: file_io.hpp:94
static constexpr bool is_fixed_size_
flag whether ValueType is fixed size
Definition: read_binary.hpp:61
#define die(msg)
Instead of calling std::terminate(), throw the output message via an exception.
Definition: die.hpp:43
static constexpr uint64_t no_size_limit_
sentinel to disable size limit
Definition: read_binary.hpp:78
uint64_t size_inc_psum(size_t i) const
inclusive prefix sum of file sizes (only for symmetry with ex_psum)
Definition: file_io.hpp:90
The Context of a job is a unique instance per worker which holds references to all underlying parts o...
Definition: context.hpp:218
virtual void PushData(bool consume)=0
Virtual method for pushing data. Triggers actual pushing in sub-classes.
DIA< ValueType > ReadBinary(Context &ctx, const std::vector< std::string > &filepath, uint64_t size_limit=ReadBinaryNode< ValueType >::no_size_limit_)
ReadBinary is a DOp, which reads a file written by WriteBinary from the file system and creates a DIA...
PinnedByteBlockPtr AllocateByteBlock(size_t size, size_t local_worker_id)
Definition: block_pool.cpp:484
void PushItem(const ValueType &item) const
Method for derived classes to Push a single item to all children.
Definition: dia_node.hpp:147
void AppendBlock(const Block &b)
Definition: file.hpp:88
void swap(CountingPtr< A, D > &a1, CountingPtr< A, D > &a2) noexcept
BlockReader takes Block objects from BlockSource and allows reading of a) serializable Items or b) ar...
size_t end
end index
Definition: math.hpp:58
ReadBinaryNode(Context &ctx, const std::string &glob, uint64_t size_limit, bool local_storage)
bool use_ext_file_
File containing Blocks mapped directly to an io fileimpl.
common::Range CalculateLocalRange(size_t global_size) const
Definition: context.hpp:336
ReadBinaryNode(Context &ctx, const std::vector< std::string > &globlist, uint64_t size_limit, bool local_storage)
Definition: read_binary.hpp:81
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
Definition: allocator.hpp:220
std::vector< T, Allocator< T > > vector
vector with Manager tracking
Definition: allocator.hpp:228
High-performance smart pointer used as a wrapping reference counting pointer.
A DIANode which performs a binary Read operation.
Definition: read_binary.hpp:44
common::Range CalculateLocalRangeOnHost(size_t global_size) const
Definition: context.hpp:341
data::File GetFile(size_t dia_id)
Returns a new File object containing a sequence of local Blocks.
Definition: context.hpp:280
static const size_t bytes
number of bytes in uint_pair
Definition: uint_types.hpp:75
static constexpr size_t fixed_size_
fixed size of ValueType or zero.
Definition: read_binary.hpp:65
static constexpr bool debug_no_extfile
for testing old method of pushing items instead of PushFile().
Definition: read_binary.hpp:49
static uint_pair min()
return an uint_pair instance containing the smallest value possible
Definition: uint_types.hpp:217
common::Range range
begin and end offsets in file.
Definition: read_binary.hpp:72
common::JsonLogger logger_
Definition: dia_base.hpp:329
List of file info and additional overall info.
Definition: file_io.hpp:79
A pinned / pin-counted derivative of a Block.
Definition: block.hpp:157
size_t local_worker_id() const
Definition: context.hpp:260
tag structure for Read()
Definition: dia.hpp:94
bool is_compressed
whether file is compressed
Definition: read_binary.hpp:74
size_t begin
begin index
Definition: math.hpp:56
std::string join(char glue, const std::vector< std::string > &parts)
Join a vector of strings by some glue character between each pair from the sequence.
Definition: join.cpp:16
size_t num_workers() const
Global number of workers in the system.
Definition: context.hpp:248
structure to store info on what to read from files
Definition: read_binary.hpp:69
bool contains_compressed
whether the list contains a compressed file.
Definition: file_io.hpp:84
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:141
static constexpr bool debug
Definition: read_binary.hpp:46
Context & context_
associated Context
Definition: dia_base.hpp:293
VfsFileBlockSource(const FileInfo &fileinfo, Context &ctx, size_t &stats_total_bytes, size_t &stats_total_reads)
data::BlockPool & block_pool()
the block manager keeps all data blocks moving through the system.
Definition: context.hpp:321
ByteBlockPtr MapExternalBlock(const foxxll::file_ptr &file, uint64_t offset, size_t size)
Definition: block_pool.cpp:528