Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
write_lines.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/api/write_lines.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Matthias Stumpp <[email protected]>
7  * Copyright (C) 2015 Timo Bingmann <[email protected]>
8  * Copyright (C) 2015 Alexander Noe <[email protected]>
9  *
10  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
11  ******************************************************************************/
12 
13 #pragma once
14 #ifndef THRILL_API_WRITE_LINES_HEADER
15 #define THRILL_API_WRITE_LINES_HEADER
16 
18 #include <thrill/api/dia.hpp>
19 #include <thrill/common/math.hpp>
21 #include <thrill/vfs/file_io.hpp>
22 
23 #include <algorithm>
24 #include <string>
25 
26 namespace thrill {
27 namespace api {
28 
29 /*!
30  * \ingroup api_layer
31  */
32 template <typename ValueType>
33 class WriteLinesNode final : public ActionNode
34 {
35  static constexpr bool debug = false;
36 
37 public:
38  using Super = ActionNode;
39  using Super::context_;
40 
41  //! input type is the parent's output value type.
42  using ValueType_ = ValueType;
43 
44  template <typename ParentDIA>
45  WriteLinesNode(const ParentDIA& parent,
46  const std::string& path_out,
47  size_t target_file_size)
48  : Super(parent.ctx(), "WriteLines",
49  { parent.id() }, { parent.node() }),
50  out_pathbase_(path_out),
53  out_pathbase_, context_.my_rank(), 0))),
54  target_file_size_(target_file_size)
55  {
56  sLOG << "Creating write node.";
57 
58  auto pre_op_fn = [this](const std::string& input) {
59  PreOp(input);
60  };
61 
65 
66  // close the function stack with our pre op and register it at parent
67  // node for output
68  auto lop_chain = parent.stack().push(pre_op_fn).fold();
69  parent.node()->AddChild(this, lop_chain);
70  }
71 
72  DIAMemUse PreOpMemUse() final {
73  return max_buffer_size_;
74  }
75 
76  void StartPreOp(size_t /* id */) final {
78  }
79 
80  void PreOp(const std::string& input) {
82 
83  if (TLX_UNLIKELY(current_buffer_size_ + input.size() + 1
84  >= max_buffer_size_)) {
87  timer.Start();
89  timer.Stop();
94  LOG << "Closing file" << out_serial_;
95  stream_->close();
97  out_pathbase_, context_.my_rank(), out_serial_++);
98  stream_ = vfs::OpenWriteStream(new_path);
99  LOG << "Opening file: " << new_path;
100  current_file_size_ = 0;
101  }
102  // String is too long to fit into buffer, write directly, add '\n' to
103  // start of next buffer.
104  if (TLX_UNLIKELY(input.size() >= max_buffer_size_)) {
106  stats_total_bytes_ += input.size();
107  current_file_size_ += input.size() + 1;
108  timer.Start();
109  stream_->write(input.data(), input.size());
110  timer.Stop();
112  write_buffer_.PutByte('\n');
113  return;
114  }
115  }
116  current_buffer_size_ += input.size() + 1;
118  write_buffer_.PutByte('\n');
120  }
121 
122  //! Writes the last buffer and closes the output file
123  void StopPreOp(size_t /* id */) final {
124  sLOG << "closing file";
127  timer.Start();
129  timer.Stop();
130  stream_->close();
131 
133  << "class" << "WriteLinesNode"
134  << "event" << "done"
135  << "total_bytes" << stats_total_bytes_
136  << "total_lines" << stats_total_elements_
137  << "total_writes" << stats_total_writes_
138  << "total_files" << out_serial_
139  << "write_time" << timer;
140  }
141 
142  void Execute() final { }
143 
144 private:
145  //! Base path of the output file.
146  std::string out_pathbase_;
147 
148  //! Current file size in bytes
149  size_t current_file_size_ = 0;
150 
151  //! File serial number for this worker
152  size_t out_serial_ = 1;
153 
154  //! File to write to
156 
157  //! Write buffer
159 
160  //! Maximum buffer size
162 
163  //! Current buffer size
165 
166  //! Target file size in bytes
168 
170 
171  size_t stats_total_bytes_ = 0;
174 };
175 
176 template <typename ValueType, typename Stack>
178  const std::string& filepath, size_t target_file_size) const {
179  assert(IsValid());
180 
182  "WriteLines needs an std::string as input parameter");
183 
185 
186  auto node = tlx::make_counting<WriteLinesNode>(
187  *this, filepath, target_file_size);
188 
189  node->RunScope();
190 }
191 
192 template <typename ValueType, typename Stack>
194  const std::string& filepath, size_t target_file_size) const {
195  assert(IsValid());
196 
198  "WriteLines needs an std::string as input parameter");
199 
201 
202  auto node = tlx::make_counting<WriteLinesNode>(
203  *this, filepath, target_file_size);
204 
205  return Future<void>(node);
206 }
207 
208 } // namespace api
209 } // namespace thrill
210 
211 #endif // !THRILL_API_WRITE_LINES_HEADER
212 
213 /******************************************************************************/
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:34
common::StatsTimerStopped timer
BufferBuilder & AppendString(const std::string &s)
size_t my_rank() const
Global rank of this worker among all other workers in the system.
Definition: context.hpp:240
WriteLinesNode(const ParentDIA &parent, const std::string &path_out, size_t target_file_size)
Definition: write_lines.hpp:45
size_t default_block_size
default size of blocks in File, Channel, BlockQueue, etc.
Definition: byte_block.cpp:25
std::string FillFilePattern(const std::string &pathbase, size_t worker, size_t file_part)
Definition: file_io.cpp:70
BufferBuilder & PutByte(Byte data)
Put a single byte to the buffer (used via CRTP from ItemWriterToolsBase)
int round_up_to_power_of_two(int i)
does what it says: round up to next power of two
void WriteLines(const std::string &filepath, size_t target_file_size=128 *1024 *1024) const
WriteLines is an Action, which writes std::strings to multiple output files.
#define TLX_UNLIKELY(c)
Definition: likely.hpp:24
size_t current_buffer_size_
Current buffer size.
ValueType ValueType_
input type is the parent's output value type.
Definition: write_lines.hpp:42
size_t target_file_size_
Target file size in bytes.
size_t out_serial_
File serial number for this worker.
virtual DIAMemUse PreOpMemUse()
Amount of RAM used by PreOp after StartPreOp()
Definition: dia_base.hpp:160
Specialized template class for ActionFuture which returns void.
net::BufferBuilder write_buffer_
Write buffer.
size_t max_buffer_size_
Maximum buffer size.
size_t size() const
Return the currently used length in bytes.
virtual void StopPreOp(size_t)
Virtual method for preparing end of PushData.
Definition: dia_base.hpp:173
virtual void Execute()=0
Virtual execution method. Triggers actual computation in sub-classes.
int value
Definition: gen_data.py:41
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
Definition: allocator.hpp:220
virtual void StartPreOp(size_t)
Virtual method for preparing start of PushData.
Definition: dia_base.hpp:163
ActionNode(Context &ctx, const char *label, const std::initializer_list< size_t > &parent_ids, const std::initializer_list< DIABasePtr > &parents)
Definition: action_node.hpp:30
const Byte * data() const
Return a pointer to the currently kept memory area.
static uint_pair min()
return an uint_pair instance containing the smallest value possible
Definition: uint_types.hpp:217
BufferBuilder & Reserve(size_t n)
Make sure that at least n bytes are allocated.
BufferBuilder represents a dynamically growable area of memory, which can be modified by appending in...
common::JsonLogger logger_
Definition: dia_base.hpp:329
size_t current_file_size_
Current file size in bytes.
vfs::WriteStreamPtr stream_
File to write to.
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:24
WriteStreamPtr OpenWriteStream(const std::string &path)
Definition: file_io.cpp:210
BufferBuilder & set_size(size_t n)
Context & context_
associated Context
Definition: dia_base.hpp:293
static constexpr bool debug
Definition: write_lines.hpp:35
Future< void > WriteLinesFuture(const std::string &filepath, size_t target_file_size=128 *1024 *1024) const
WriteLines is an ActionFuture, which writes std::strings to multiple output files.