Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
write_binary.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/api/write_binary.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Timo Bingmann <[email protected]>
7  * Copyright (C) 2015 Alexander Noe <[email protected]>
8  *
9  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10  ******************************************************************************/
11 
12 #pragma once
13 #ifndef THRILL_API_WRITE_BINARY_HEADER
14 #define THRILL_API_WRITE_BINARY_HEADER
15 
#include <thrill/api/action_node.hpp>
#include <thrill/api/context.hpp>
#include <thrill/api/dia.hpp>
#include <thrill/common/string.hpp>
#include <thrill/data/block_sink.hpp>
#include <thrill/data/block_writer.hpp>
#include <thrill/vfs/file_io.hpp>

#include <tlx/math/round_to_power_of_two.hpp>

#include <algorithm>
#include <memory>
#include <stdexcept>
#include <string>
#include <vector>
28 
29 namespace thrill {
30 namespace api {
31 
32 /*!
33  * \ingroup api_layer
34  */
35 template <typename ValueType>
36 class WriteBinaryNode final : public ActionNode
37 {
38  static constexpr bool debug = false;
39 
40 public:
41  using Super = ActionNode;
42  using Super::context_;
43 
44  template <typename ParentDIA>
45  WriteBinaryNode(const ParentDIA& parent,
46  const std::string& path_out,
47  size_t max_file_size)
48  : ActionNode(parent.ctx(), "WriteBinary",
49  { parent.id() }, { parent.node() }),
50  out_pathbase_(path_out),
51  max_file_size_(max_file_size)
52  {
53  sLOG << "Creating write node.";
54 
56  tlx::round_up_to_power_of_two(max_file_size));
57  sLOG << "block_size_" << block_size_;
58 
59  auto pre_op_fn = [=](const ValueType& input) {
60  return PreOp(input);
61  };
62  // close the function stack with our pre op and register it at parent
63  // node for output
64  auto lop_chain = parent.stack().push(pre_op_fn).fold();
65  parent.node()->AddChild(this, lop_chain);
66  }
67 
68  DIAMemUse PreOpMemUse() final {
70  }
71 
72  //! writer preop: put item into file, create files as needed.
73  void PreOp(const ValueType& input) {
75 
76  if (!writer_) OpenNextFile();
77 
78  try {
79  writer_->PutNoSelfVerify(input);
80  }
81  catch (data::FullException&) {
82  // sink is full. flush it. and repeat, which opens new file.
83  OpenNextFile();
84 
85  try {
86  writer_->PutNoSelfVerify(input);
87  }
88  catch (data::FullException&) {
89  throw std::runtime_error(
90  "Error in WriteBinary: "
91  "an item is larger than the file size limit");
92  }
93  }
94  }
95 
96  //! Closes the output file
97  void StopPreOp(size_t /* id */) final {
98  sLOG << "closing file" << out_pathbase_;
99  writer_.reset();
100 
102  << "class" << "WriteBinaryNode"
103  << "total_elements" << stats_total_elements_
104  << "total_writes" << stats_total_writes_;
105  }
106 
107  void Execute() final { }
108 
109 private:
110  //! Implements BlockSink class writing to files with size limit.
111  class SysFileSink final : public data::BoundedBlockSink
112  {
113  public:
114  SysFileSink(api::Context& context,
115  size_t local_worker_id,
116  const std::string& path, size_t max_file_size,
117  size_t& stats_total_elements,
118  size_t& stats_total_writes)
119  : BlockSink(context.block_pool(), local_worker_id),
120  BoundedBlockSink(context.block_pool(), local_worker_id, max_file_size),
121  stream_(vfs::OpenWriteStream(path)),
122  stats_total_elements_(stats_total_elements),
123  stats_total_writes_(stats_total_writes) { }
124 
125  void AppendPinnedBlock(
126  data::PinnedBlock&& b, bool /* is_last_block */) final {
127  sLOG << "SysFileSink::AppendBlock()" << b;
129  stream_->write(b.data_begin(), b.size());
130  }
131 
132  void AppendBlock(const data::Block& block, bool is_last_block) {
133  return AppendPinnedBlock(
134  block.PinWait(local_worker_id()), is_last_block);
135  }
136 
137  void AppendBlock(data::Block&& block, bool is_last_block) {
138  return AppendPinnedBlock(
139  block.PinWait(local_worker_id()), is_last_block);
140  }
141 
142  void Close() final {
143  stream_->close();
144  }
145 
146  private:
147  vfs::WriteStreamPtr stream_;
148  size_t& stats_total_elements_;
149  size_t& stats_total_writes_;
150  };
151 
153 
154  //! Base path of the output file.
156 
157  //! File serial number for this worker
158  size_t out_serial_ = 0;
159 
160  //! Maximum file size
162 
163  //! Block size used by BlockWriter
164  size_t block_size_ = data::default_block_size;
165 
166  //! BlockWriter to sink.
167  std::unique_ptr<Writer> writer_;
168 
171 
172  //! Function to create sink_ and writer_ for next file
173  void OpenNextFile() {
174  writer_.reset();
175 
176  // construct path from pattern containing ### and $$$
178  out_pathbase_, context_.my_rank(), out_serial_++);
179 
180  sLOG << "OpenNextFile() out_path" << out_path;
181 
182  writer_ = std::make_unique<Writer>(
183  SysFileSink(
185  out_path, max_file_size_,
187  block_size_);
188  }
189 };
190 
191 template <typename ValueType, typename Stack>
193  const std::string& filepath, size_t max_file_size) const {
194 
196 
197  auto node = tlx::make_counting<WriteBinaryNode>(
198  *this, filepath, max_file_size);
199 
200  node->RunScope();
201 }
202 
203 template <typename ValueType, typename Stack>
205  const std::string& filepath, size_t max_file_size) const {
206 
208 
209  auto node = tlx::make_counting<WriteBinaryNode>(
210  *this, filepath, max_file_size);
211 
212  return Future<void>(node);
213 }
214 
215 } // namespace api
216 } // namespace thrill
217 
218 #endif // !THRILL_API_WRITE_BINARY_HEADER
219 
220 /******************************************************************************/
std::unique_ptr< Writer > writer_
BlockWriter to sink.
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:34
Context & context()
Returns the api::Context of this DIABase.
Definition: dia_base.hpp:208
size_t my_rank() const
Global rank of this worker among all other workers in the system.
Definition: context.hpp:240
tlx::CountingPtr< WriteStream > WriteStreamPtr
Definition: file_io.hpp:146
size_t default_block_size
default size of blocks in File, Channel, BlockQueue, etc.
Definition: byte_block.cpp:25
std::string FillFilePattern(const std::string &pathbase, size_t worker, size_t file_part)
Definition: file_io.cpp:70
void OpenNextFile()
Function to create sink_ and writer_ for next file.
int round_up_to_power_of_two(int i)
does what it says: round up to next power of two
BlockWriter contains a temporary Block object into which a) any serializable item can be stored or b)...
size_t out_serial_
File serial number for this worker.
virtual DIAMemUse PreOpMemUse()
Amount of RAM used by PreOp after StartPreOp()
Definition: dia_base.hpp:160
Specialized template class for ActionFuture which return void.
virtual void StopPreOp(size_t)
Virtual method for preparing end of PushData.
Definition: dia_base.hpp:173
std::string out_pathbase_
Base path of the output file.
virtual void Execute()=0
Virtual execution method. Triggers actual computation in sub-classes.
size_t block_size_
Block size used by BlockWriter.
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
Definition: allocator.hpp:220
ActionNode(Context &ctx, const char *label, const std::initializer_list< size_t > &parent_ids, const std::initializer_list< DIABasePtr > &parents)
Definition: action_node.hpp:30
WriteBinaryNode(const ParentDIA &parent, const std::string &path_out, size_t max_file_size)
static uint_pair min()
return an uint_pair instance containing the smallest value possible
Definition: uint_types.hpp:217
common::JsonLogger logger_
Definition: dia_base.hpp:329
size_t local_worker_id() const
Definition: context.hpp:260
void WriteBinary(const std::string &filepath, size_t max_file_size=128 *1024 *1024) const
WriteBinary is a function, which writes a DIA to many files per worker.
static constexpr bool debug
Future< void > WriteBinaryFuture(const std::string &filepath, size_t max_file_size=128 *1024 *1024) const
WriteBinary is a function, which writes a DIA to many files per worker.
size_t max_file_size_
Maximum file size.
WriteStreamPtr OpenWriteStream(const std::string &path)
Definition: file_io.cpp:210
Context & context_
associated Context
Definition: dia_base.hpp:293