Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
write_binary.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/api/write_binary.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Timo Bingmann <[email protected]>
7  * Copyright (C) 2015 Alexander Noe <[email protected]>
8  *
9  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10  ******************************************************************************/
11 
12 #pragma once
13 #ifndef THRILL_API_WRITE_BINARY_HEADER
14 #define THRILL_API_WRITE_BINARY_HEADER
15 
17 #include <thrill/api/context.hpp>
18 #include <thrill/api/dia.hpp>
19 #include <thrill/common/string.hpp>
22 #include <thrill/vfs/file_io.hpp>
24 
25 #include <algorithm>
26 #include <string>
27 #include <vector>
28 
29 namespace thrill {
30 namespace api {
31 
32 /*!
33  * \ingroup api_layer
34  */
35 template <typename ValueType>
36 class WriteBinaryNode final : public ActionNode
37 {
38  static constexpr bool debug = false;
39 
40 public:
41  using Super = ActionNode;
42  using Super::context_;
43 
44  template <typename ParentDIA>
45  WriteBinaryNode(const ParentDIA& parent,
46  const std::string& path_out,
47  size_t max_file_size)
48  : ActionNode(parent.ctx(), "WriteBinary",
49  { parent.id() }, { parent.node() }),
50  out_pathbase_(path_out),
51  max_file_size_(max_file_size) {
52  sLOG << "Creating write node.";
53 
55  tlx::round_up_to_power_of_two(max_file_size));
56  sLOG << "block_size_" << block_size_;
57 
58  auto pre_op_fn = [=](const ValueType& input) {
59  return PreOp(input);
60  };
61  // close the function stack with our pre op and register it at parent
62  // node for output
63  auto lop_chain = parent.stack().push(pre_op_fn).fold();
64  parent.node()->AddChild(this, lop_chain);
65  }
66 
67  DIAMemUse PreOpMemUse() final {
69  }
70 
71  //! writer preop: put item into file, create files as needed.
72  void PreOp(const ValueType& input) {
74 
75  if (!writer_) OpenNextFile();
76 
77  try {
78  writer_->PutNoSelfVerify(input);
79  }
80  catch (data::FullException&) {
81  // sink is full. flush it. and repeat, which opens new file.
82  OpenNextFile();
83 
84  try {
85  writer_->PutNoSelfVerify(input);
86  }
87  catch (data::FullException&) {
88  throw std::runtime_error(
89  "Error in WriteBinary: "
90  "an item is larger than the file size limit");
91  }
92  }
93  }
94 
95  //! Closes the output file
96  void StopPreOp(size_t /* parent_index */) final {
97  sLOG << "closing file" << out_pathbase_;
98  writer_.reset();
99 
101  << "class" << "WriteBinaryNode"
102  << "total_elements" << stats_total_elements_
103  << "total_writes" << stats_total_writes_;
104  }
105 
106  void Execute() final { }
107 
108 private:
109  //! Implements BlockSink class writing to files with size limit.
110  class SysFileSink final : public data::BoundedBlockSink
111  {
112  public:
113  SysFileSink(api::Context& context,
114  size_t local_worker_id,
115  const std::string& path, size_t max_file_size,
116  size_t& stats_total_elements,
117  size_t& stats_total_writes)
118  : BlockSink(context.block_pool(), local_worker_id),
119  BoundedBlockSink(context.block_pool(), local_worker_id, max_file_size),
120  stream_(vfs::OpenWriteStream(path)),
121  stats_total_elements_(stats_total_elements),
122  stats_total_writes_(stats_total_writes) { }
123 
124  void AppendPinnedBlock(
125  data::PinnedBlock&& b, bool /* is_last_block */) final {
126  sLOG << "SysFileSink::AppendBlock()" << b;
128  stream_->write(b.data_begin(), b.size());
129  }
130 
131  void AppendBlock(const data::Block& block, bool is_last_block) {
132  return AppendPinnedBlock(
133  block.PinWait(local_worker_id()), is_last_block);
134  }
135 
136  void AppendBlock(data::Block&& block, bool is_last_block) {
137  return AppendPinnedBlock(
138  block.PinWait(local_worker_id()), is_last_block);
139  }
140 
141  void Close() final {
142  stream_->close();
143  }
144 
145  private:
146  vfs::WriteStreamPtr stream_;
147  size_t& stats_total_elements_;
148  size_t& stats_total_writes_;
149  };
150 
152 
153  //! Base path of the output file.
155 
156  //! File serial number for this worker
157  size_t out_serial_ = 0;
158 
159  //! Maximum file size
161 
162  //! Block size used by BlockWriter
163  size_t block_size_ = data::default_block_size;
164 
165  //! BlockWriter to sink.
166  std::unique_ptr<Writer> writer_;
167 
170 
171  //! Function to create sink_ and writer_ for next file
172  void OpenNextFile() {
173  writer_.reset();
174 
175  // construct path from pattern containing ### and $$$
177  out_pathbase_, context_.my_rank(), out_serial_++);
178 
179  sLOG << "OpenNextFile() out_path" << out_path;
180 
181  writer_ = std::make_unique<Writer>(
182  SysFileSink(
184  out_path, max_file_size_,
186  block_size_);
187  }
188 };
189 
190 template <typename ValueType, typename Stack>
192  const std::string& filepath, size_t max_file_size) const {
193 
195 
196  auto node = tlx::make_counting<WriteBinaryNode>(
197  *this, filepath, max_file_size);
198 
199  node->RunScope();
200 }
201 
202 template <typename ValueType, typename Stack>
204  const std::string& filepath, size_t max_file_size) const {
205 
207 
208  auto node = tlx::make_counting<WriteBinaryNode>(
209  *this, filepath, max_file_size);
210 
211  return Future<void>(node);
212 }
213 
214 } // namespace api
215 } // namespace thrill
216 
217 #endif // !THRILL_API_WRITE_BINARY_HEADER
218 
219 /******************************************************************************/
std::unique_ptr< Writer > writer_
BlockWriter to sink.
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:34
Context & context()
Returns the api::Context of this DIABase.
Definition: dia_base.hpp:208
size_t my_rank() const
Global rank of this worker among all other workers in the system.
Definition: context.hpp:243
tlx::CountingPtr< WriteStream > WriteStreamPtr
Definition: file_io.hpp:146
size_t default_block_size
default size of blocks in File, Channel, BlockQueue, etc.
Definition: byte_block.cpp:25
std::string FillFilePattern(const std::string &pathbase, size_t worker, size_t file_part)
Definition: file_io.cpp:71
void OpenNextFile()
Function to create sink_ and writer_ for next file.
BlockWriter contains a temporary Block object into which a) any serializable item can be stored or b)...
size_t out_serial_
File serial number for this worker.
virtual DIAMemUse PreOpMemUse()
Amount of RAM used by PreOp after StartPreOp()
Definition: dia_base.hpp:160
Specialized template class for ActionFuture which return void.
virtual void StopPreOp(size_t)
Virtual method for preparing end of PushData.
Definition: dia_base.hpp:173
std::string out_pathbase_
Base path of the output file.
virtual void Execute()=0
Virtual execution method. Triggers actual computation in sub-classes.
size_t block_size_
Block size used by BlockWriter.
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
Definition: allocator.hpp:220
ActionNode(Context &ctx, const char *label, const std::initializer_list< size_t > &parent_ids, const std::initializer_list< DIABasePtr > &parents)
Definition: action_node.hpp:30
WriteBinaryNode(const ParentDIA &parent, const std::string &path_out, size_t max_file_size)
static uint_pair min()
return an uint_pair instance containing the smallest value possible
Definition: uint_types.hpp:217
common::JsonLogger logger_
Definition: dia_base.hpp:329
size_t local_worker_id() const
Definition: context.hpp:263
void WriteBinary(const std::string &filepath, size_t max_file_size=128 *1024 *1024) const
WriteBinary is a function, which writes a DIA to many files per worker.
static int round_up_to_power_of_two(int i)
does what it says: round up to next power of two
static constexpr bool debug
Future< void > WriteBinaryFuture(const std::string &filepath, size_t max_file_size=128 *1024 *1024) const
WriteBinary is a function, which writes a DIA to many files per worker.
size_t max_file_size_
Maximum file size.
WriteStreamPtr OpenWriteStream(const std::string &path)
Definition: file_io.cpp:211
Context & context_
associated Context
Definition: dia_base.hpp:293