Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
write_lines.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/api/write_lines.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Matthias Stumpp <[email protected]>
7  * Copyright (C) 2015 Timo Bingmann <[email protected]>
8  * Copyright (C) 2015 Alexander Noe <[email protected]>
9  *
10  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
11  ******************************************************************************/
12 
13 #pragma once
14 #ifndef THRILL_API_WRITE_LINES_HEADER
15 #define THRILL_API_WRITE_LINES_HEADER
16 
18 #include <thrill/api/dia.hpp>
19 #include <thrill/common/math.hpp>
21 #include <thrill/vfs/file_io.hpp>
22 
23 #include <algorithm>
24 #include <string>
25 
26 namespace thrill {
27 namespace api {
28 
29 /*!
30  * \ingroup api_layer
31  */
32 template <typename ValueType>
33 class WriteLinesNode final : public ActionNode
34 {
35  static constexpr bool debug = false;
36 
37 public:
38  using Super = ActionNode;
39  using Super::context_;
40 
41  //! input type is the parent's output value type.
42  using ValueType_ = ValueType;
43 
44  template <typename ParentDIA>
45  WriteLinesNode(const ParentDIA& parent,
46  const std::string& path_out,
47  size_t target_file_size)
48  : Super(parent.ctx(), "WriteLines",
49  { parent.id() }, { parent.node() }),
50  out_pathbase_(path_out),
53  out_pathbase_, context_.my_rank(), 0))),
54  target_file_size_(target_file_size) {
55  sLOG << "Creating write node.";
56 
57  auto pre_op_fn = [this](const std::string& input) {
58  PreOp(input);
59  };
60 
64 
65  // close the function stack with our pre op and register it at parent
66  // node for output
67  auto lop_chain = parent.stack().push(pre_op_fn).fold();
68  parent.node()->AddChild(this, lop_chain);
69  }
70 
71  DIAMemUse PreOpMemUse() final {
72  return max_buffer_size_;
73  }
74 
75  void StartPreOp(size_t /* parent_index */) final {
77  }
78 
79  void PreOp(const std::string& input) {
81 
82  if (TLX_UNLIKELY(current_buffer_size_ + input.size() + 1
83  >= max_buffer_size_)) {
86  timer.Start();
88  timer.Stop();
93  LOG << "Closing file" << out_serial_;
94  stream_->close();
96  out_pathbase_, context_.my_rank(), out_serial_++);
97  stream_ = vfs::OpenWriteStream(new_path);
98  LOG << "Opening file: " << new_path;
100  }
101  // String is too long to fit into buffer, write directly, add '\n' to
102  // start of next buffer.
103  if (TLX_UNLIKELY(input.size() >= max_buffer_size_)) {
105  stats_total_bytes_ += input.size();
106  current_file_size_ += input.size() + 1;
107  timer.Start();
108  stream_->write(input.data(), input.size());
109  timer.Stop();
111  write_buffer_.PutByte('\n');
112  return;
113  }
114  }
115  current_buffer_size_ += input.size() + 1;
117  write_buffer_.PutByte('\n');
119  }
120 
121  //! Closes the output file, write last buffer
122  void StopPreOp(size_t /* parent_index */) final {
123  sLOG << "closing file";
126  timer.Start();
128  timer.Stop();
129  stream_->close();
130 
132  << "class" << "WriteLinesNode"
133  << "event" << "done"
134  << "total_bytes" << stats_total_bytes_
135  << "total_lines" << stats_total_elements_
136  << "total_writes" << stats_total_writes_
137  << "total_files" << out_serial_
138  << "write_time" << timer;
139  }
140 
141  void Execute() final { }
142 
143 private:
144  //! Base path of the output file.
145  std::string out_pathbase_;
146 
147  //! Current file size in bytes
148  size_t current_file_size_ = 0;
149 
150  //! File serial number for this worker
151  size_t out_serial_ = 1;
152 
153  //! File to write to
155 
156  //! Write buffer
158 
159  //! Maximum buffer size
161 
162  //! Current buffer size
164 
165  //! Target file size in bytes
167 
169 
170  size_t stats_total_bytes_ = 0;
173 };
174 
175 template <typename ValueType, typename Stack>
177  const std::string& filepath, size_t target_file_size) const {
178  assert(IsValid());
179 
181  "WriteLines needs an std::string as input parameter");
182 
184 
185  auto node = tlx::make_counting<WriteLinesNode>(
186  *this, filepath, target_file_size);
187 
188  node->RunScope();
189 }
190 
191 template <typename ValueType, typename Stack>
193  const std::string& filepath, size_t target_file_size) const {
194  assert(IsValid());
195 
197  "WriteLines needs an std::string as input parameter");
198 
200 
201  auto node = tlx::make_counting<WriteLinesNode>(
202  *this, filepath, target_file_size);
203 
204  return Future<void>(node);
205 }
206 
207 } // namespace api
208 } // namespace thrill
209 
210 #endif // !THRILL_API_WRITE_LINES_HEADER
211 
212 /******************************************************************************/
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:34
common::StatsTimerStopped timer
BufferBuilder & AppendString(const std::string &s)
size_t my_rank() const
Global rank of this worker among all other workers in the system.
Definition: context.hpp:243
WriteLinesNode(const ParentDIA &parent, const std::string &path_out, size_t target_file_size)
Definition: write_lines.hpp:45
size_t default_block_size
default size of blocks in File, Channel, BlockQueue, etc.
Definition: byte_block.cpp:25
std::string FillFilePattern(const std::string &pathbase, size_t worker, size_t file_part)
Definition: file_io.cpp:71
BufferBuilder & PutByte(Byte data)
Put a single byte to the buffer (used via CRTP from ItemWriterToolsBase)
void WriteLines(const std::string &filepath, size_t target_file_size=128 *1024 *1024) const
WriteLines is an Action, which writes std::strings to multiple output files.
#define TLX_UNLIKELY(c)
Definition: likely.hpp:24
size_t current_buffer_size_
Current buffer size.
ValueType ValueType_
input type is the parent's output value type.
Definition: write_lines.hpp:42
size_t target_file_size_
Target file size in bytes.
size_t out_serial_
File serial number for this worker.
virtual DIAMemUse PreOpMemUse()
Amount of RAM used by PreOp after StartPreOp()
Definition: dia_base.hpp:160
Specialized template class for ActionFuture which returns void.
net::BufferBuilder write_buffer_
Write buffer.
size_t max_buffer_size_
Maximum buffer size.
size_t size() const
Return the currently used length in bytes.
virtual void StopPreOp(size_t)
Virtual method for preparing end of PushData.
Definition: dia_base.hpp:173
virtual void Execute()=0
Virtual execution method. Triggers actual computation in sub-classes.
int value
Definition: gen_data.py:41
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
Definition: allocator.hpp:220
virtual void StartPreOp(size_t)
Virtual method for preparing start of PushData.
Definition: dia_base.hpp:163
ActionNode(Context &ctx, const char *label, const std::initializer_list< size_t > &parent_ids, const std::initializer_list< DIABasePtr > &parents)
Definition: action_node.hpp:30
const Byte * data() const
Return a pointer to the currently kept memory area.
static uint_pair min()
return an uint_pair instance containing the smallest value possible
Definition: uint_types.hpp:217
BufferBuilder & Reserve(size_t n)
Make sure that at least n bytes are allocated.
BufferBuilder represents a dynamically growable area of memory, which can be modified by appending in...
common::JsonLogger logger_
Definition: dia_base.hpp:329
static int round_up_to_power_of_two(int i)
does what it says: round up to next power of two
size_t current_file_size_
Current file size in bytes.
vfs::WriteStreamPtr stream_
File to write to.
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:24
WriteStreamPtr OpenWriteStream(const std::string &path)
Definition: file_io.cpp:211
BufferBuilder & set_size(size_t n)
Context & context_
associated Context
Definition: dia_base.hpp:293
static constexpr bool debug
Definition: write_lines.hpp:35
Future< void > WriteLinesFuture(const std::string &filepath, size_t target_file_size=128 *1024 *1024) const
WriteLines is an ActionFuture, which writes std::strings to multiple output files.