Thrill  0.1
write_lines_one.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/api/write_lines_one.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Matthias Stumpp <[email protected]>
7  * Copyright (C) 2015 Timo Bingmann <[email protected]>
8  * Copyright (C) 2015 Alexander Noe <[email protected]>
9  *
10  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
11  ******************************************************************************/
12 
13 #pragma once
14 #ifndef THRILL_API_WRITE_LINES_ONE_HEADER
15 #define THRILL_API_WRITE_LINES_ONE_HEADER
16 
18 #include <thrill/api/dia.hpp>
19 #include <thrill/data/file.hpp>
20 
21 #include <fstream>
22 #include <string>
23 
24 namespace thrill {
25 namespace api {
26 
27 /*!
 * Action node that writes a DIA of strings as lines into one output file
 * shared by all workers.
 *
 * Each worker buffers its items in PreOp() and counts the bytes they will
 * occupy (item size plus one '\n'). In Execute(), an exclusive prefix sum
 * over these byte counts gives every worker its starting offset in the
 * output file. Each worker then writes its own lines at that offset.
 *
 * ValueType must provide size() and be writable to an std::ostream. The
 * WriteLinesOne entry points only accept std::string (see the message of
 * their static_assert).
 *
28  * \ingroup api_layer
29  */
30 template <typename ValueType>
31 class WriteLinesOneNode final : public ActionNode
32 {
33  static constexpr bool debug = false;
34 
35 public:
36  using Super = ActionNode;
37  using Super::context_;
38 
 //! Opens `path_out` for writing and registers PreOp() as the consumer at
 //! the end of the parent's function stack.
 //! NOTE(review): std::ofstream always adds ios::out, so this open
 //! truncates the file. Each worker's node opens its own stream on the same
 //! path, and the open is not checked for failure anywhere visible here.
39  template <typename ParentDIA>
40  WriteLinesOneNode(const ParentDIA& parent,
41  const std::string& path_out)
42  : ActionNode(parent.ctx(), "WriteLinesOne",
43  { parent.id() }, { parent.node() }),
44  path_out_(path_out),
45  file_(path_out_, std::ios::binary) {
46  sLOG << "Creating write node.";
47 
48  auto pre_op_fn = [this](const ValueType& input) {
49  PreOp(input);
50  };
51  // close the function stack with our pre op and register it at parent
52  // node for output
53  auto lop_chain = parent.stack().push(pre_op_fn).fold();
54  parent.node()->AddChild(this, lop_chain);
55  }
56 
 //! Buffers one item via writer_ and accounts for its output size:
 //! input.size() bytes plus one byte for the trailing '\n'.
57  void PreOp(const ValueType& input) {
58  writer_.Put(input);
59  local_size_ += input.size() + 1;
60  local_lines_++;
61  }
62 
 //! Called when the parent stops pushing data. Closes the buffering writer
 //! so that the buffered items can be read back in Execute().
63  void StopPreOp(size_t /* parent_index */) final {
64  writer_.Close();
65  }
66 
67  //! Writes this worker's buffered lines into its byte range of the output
 //! file. Uses a prefix sum over all workers, so every worker takes part.
68  void Execute() final {
 // NOTE(review): source line 69 is missing from this listing, so the
 // receiver of the following << chain is not visible. It is presumably
 // the node's JSON logger (logger_); confirm against upstream.
70  << "class" << "WriteLinesOneNode"
71  << "total_bytes" << local_size_
72  << "total_lines" << local_lines_;
73 
74  // (Portable) allocation of output file, setting individual file pointers.
 // prefix_elem is the byte count of all lower-ranked workers, i.e. this
 // worker's starting offset. The last worker writes a single '\0' at the
 // final byte (prefix_elem + local_size_ - 1) to extend the file to its
 // full size.
 // NOTE(review): if the total size over all workers is 0, that expression
 // wraps around (size_t). Consider skipping the extension in that case.
75  size_t prefix_elem = context_.net.ExPrefixSum(local_size_);
76  if (context_.my_rank() == context_.num_workers() - 1) {
77  file_.seekp(prefix_elem + local_size_ - 1);
78  file_.put('\0');
79  }
80  file_.seekp(prefix_elem);
 // NOTE(review): source line 81 is missing here. It is presumably a global
 // barrier (context_.net.Barrier()), so that the file has been extended
 // before any worker writes; confirm against upstream.
82 
 // NOTE(review): source line 83 is missing here. It must declare the
 // `reader` used below, presumably via temp_file_.GetConsumeReader().
84  size_t num_items = temp_file_.num_items();
85 
 // Each line is followed by '\n', matching the "+ 1" counted in PreOp().
86  for (size_t i = 0; i < num_items; ++i) {
87  file_ << reader.Next<ValueType>() << '\n';
88  }
89  }
90 
91 private:
92  //! Path of the output file.
 // NOTE(review): source line 93 (the declaration of path_out_, a
 // std::string per the symbol index) is missing from this listing. It must
 // be declared before file_, because file_ is initialized from path_out_.
94 
95  //! File to write to
96  std::ofstream file_;
97 
98  //! Local output size in bytes: sum of line lengths plus one '\n' per line.
99  size_t local_size_ = 0;
100 
101  //! Temporary File holding this worker's lines until Execute() writes them.
 // NOTE(review): source line 102 (declaration of temp_file_, a data::File
 // per the symbol index) is missing from this listing.
103 
104  //! Writer used by PreOp() to buffer lines.
 // NOTE(review): source line 105 (declaration of writer_, a
 // data::File::Writer per the symbol index) is missing from this listing.
 // It is presumably obtained from temp_file_; confirm against upstream.
106 
 //! Number of lines buffered by this worker.
107  size_t local_lines_ = 0;
108 };
109 
 //! Presumably the out-of-class definition of DIA<ValueType, Stack>::
 //! WriteLinesOne, listed in the symbol index as
 //! `void WriteLinesOne(const std::string &filepath) const`. It writes the
 //! DIA's items as lines into the single file `filepath` by creating a
 //! WriteLinesOneNode and calling RunScope() on it.
 //! NOTE(review): source lines 111, 115 and 118 are missing from this
 //! listing. They are presumably:
 //!  - the signature line,
 //!  - the head of the static_assert whose message is on line 116,
 //!  - an alias that lets WriteLinesOneNode be named without template
 //!    arguments on line 120.
 //! Confirm against upstream.
110 template <typename ValueType, typename Stack>
112  const std::string& filepath) const {
113  assert(IsValid());
114 
116  "WriteLinesOne needs an std::string as input parameter");
117 
119 
120  auto node = tlx::make_counting<WriteLinesOneNode>(*this, filepath);
121 
122  node->RunScope();
123 }
124 
 //! Presumably the out-of-class definition of DIA<ValueType, Stack>::
 //! WriteLinesOneFuture, listed in the symbol index as
 //! `Future< void > WriteLinesOneFuture(const std::string &filepath) const`.
 //! It creates a WriteLinesOneNode for `filepath` and, instead of running it,
 //! returns it wrapped in a Future<void>. Execution is presumably deferred
 //! until the future is waited on; confirm against the Future API.
 //! NOTE(review): source lines 126, 130 and 133 are missing from this
 //! listing. They are presumably:
 //!  - the signature line,
 //!  - the head of the static_assert whose message is on line 131,
 //!  - an alias that lets WriteLinesOneNode be named without template
 //!    arguments on line 135.
 //! Confirm against upstream.
125 template <typename ValueType, typename Stack>
127  const std::string& filepath) const {
128  assert(IsValid());
129 
131  "WriteLinesOne needs an std::string as input parameter");
132 
134 
135  auto node = tlx::make_counting<WriteLinesOneNode>(*this, filepath);
136 
137  return Future<void>(node);
138 }
139 
140 } // namespace api
141 } // namespace thrill
142 
143 #endif // !THRILL_API_WRITE_LINES_ONE_HEADER
144 
145 /******************************************************************************/
void WriteLinesOne(const std::string &filepath) const
WriteLinesOne is an Action that writes std::strings to a single output file.
std::string path_out_
Path of the output file.
net::FlowControlChannel & net
Definition: context.hpp:446
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:34
data::File::Writer writer_
File writer used.
size_t num_workers() const
Global number of workers in the system.
Definition: context.hpp:251
A File is an ordered sequence of Block objects for storing items.
Definition: file.hpp:56
void Barrier()
A trivial global barrier.
TLX_ATTRIBUTE_ALWAYS_INLINE T Next()
Next() reads a complete item T.
void StopPreOp(size_t) final
Virtual method for preparing end of PushData.
T TLX_ATTRIBUTE_WARN_UNUSED_RESULT ExPrefixSum(const T &value, const BinarySumOp &sum_op=BinarySumOp(), const T &initial=T())
Calculates the exclusive prefix sum over all workers, given a certain sum operation.
BlockWriter contains a temporary Block object into which a) any serializable item can be stored or b)...
WriteLinesOneNode(const ParentDIA &parent, const std::string &path_out)
std::ofstream file_
File to write to.
Specialized template class for ActionFuture which return void.
data::File temp_file_
Temporary File holding this worker's lines until Execute() writes them to the output file.
ConsumeReader GetConsumeReader(size_t prefetch_size=File::default_prefetch_size_)
Get consuming BlockReader for beginning of File.
Definition: file.cpp:73
int value
Definition: gen_data.py:41
BlockReader takes Block objects from BlockSource and allows reading of a) serializable Items or b) ar...
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
Definition: allocator.hpp:220
size_t my_rank() const
Global rank of this worker among all other workers in the system.
Definition: context.hpp:243
data::File GetFile(size_t dia_id)
Returns a new File object containing a sequence of local Blocks.
Definition: context.hpp:283
ActionNode(Context &ctx, const char *label, const std::initializer_list< size_t > &parent_ids, const std::initializer_list< DIABasePtr > &parents)
Definition: action_node.hpp:30
Future< void > WriteLinesOneFuture(const std::string &filepath) const
WriteLinesOneFuture is an ActionFuture that writes std::strings to a single output file.
common::JsonLogger logger_
Definition: dia_base.hpp:329
TLX_ATTRIBUTE_ALWAYS_INLINE BlockWriter & Put(const T &x)
Put appends a complete item, or fails with a FullException.
void PreOp(const ValueType &input)
size_t num_items() const
Return the number of items in the file.
Definition: file.hpp:180
void Close()
Explicitly close the writer.
Writer GetWriter(size_t block_size=default_block_size)
Get BlockWriter.
Definition: file.cpp:63
void Execute() final
Writes this worker's buffered lines into its byte range of the output file.
Context & context_
associated Context
Definition: dia_base.hpp:293
size_t local_size_
Local file size.