Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
write_lines_one.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/api/write_lines_one.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Matthias Stumpp <[email protected]>
7  * Copyright (C) 2015 Timo Bingmann <[email protected]>
8  * Copyright (C) 2015 Alexander Noe <[email protected]>
9  *
10  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
11  ******************************************************************************/
12 
13 #pragma once
14 #ifndef THRILL_API_WRITE_LINES_ONE_HEADER
15 #define THRILL_API_WRITE_LINES_ONE_HEADER
16 
#include <thrill/api/action_node.hpp>
#include <thrill/api/dia.hpp>
#include <thrill/data/file.hpp>

#include <cassert>
#include <fstream>
#include <string>
#include <type_traits>
23 
24 namespace thrill {
25 namespace api {
26 
27 /*!
28  * \ingroup api_layer
29  */
30 template <typename ValueType>
31 class WriteLinesOneNode final : public ActionNode
32 {
33  static constexpr bool debug = false;
34 
35 public:
36  using Super = ActionNode;
37  using Super::context_;
38 
39  template <typename ParentDIA>
40  WriteLinesOneNode(const ParentDIA& parent,
41  const std::string& path_out)
42  : ActionNode(parent.ctx(), "WriteLinesOne",
43  { parent.id() }, { parent.node() }),
44  path_out_(path_out),
45  file_(path_out_, std::ios::binary)
46  {
47  sLOG << "Creating write node.";
48 
49  auto pre_op_fn = [this](const ValueType& input) {
50  PreOp(input);
51  };
52  // close the function stack with our pre op and register it at parent
53  // node for output
54  auto lop_chain = parent.stack().push(pre_op_fn).fold();
55  parent.node()->AddChild(this, lop_chain);
56  }
57 
58  void PreOp(const ValueType& input) {
59  writer_.Put(input);
60  local_size_ += input.size() + 1;
61  local_lines_++;
62  }
63 
64  void StopPreOp(size_t /* id */) final {
65  writer_.Close();
66  }
67 
68  //! Closes the output file
69  void Execute() final {
71  << "class" << "WriteLinesOneNode"
72  << "total_bytes" << local_size_
73  << "total_lines" << local_lines_;
74 
75  // (Portable) allocation of output file, setting individual file pointers.
76  size_t prefix_elem = context_.net.ExPrefixSum(local_size_);
77  if (context_.my_rank() == context_.num_workers() - 1) {
78  file_.seekp(prefix_elem + local_size_ - 1);
79  file_.put('\0');
80  }
81  file_.seekp(prefix_elem);
83 
85  size_t num_items = temp_file_.num_items();
86 
87  for (size_t i = 0; i < num_items; ++i) {
88  file_ << reader.Next<ValueType>() << '\n';
89  }
90  }
91 
92 private:
93  //! Path of the output file.
94  std::string path_out_;
95 
96  //! File to write to
97  std::ofstream file_;
98 
99  //! Local file size
100  size_t local_size_ = 0;
101 
102  //! Temporary File for splitting correctly?
104 
105  //! File writer used.
107 
108  size_t local_lines_ = 0;
109 };
110 
111 template <typename ValueType, typename Stack>
113  const std::string& filepath) const {
114  assert(IsValid());
115 
117  "WriteLinesOne needs an std::string as input parameter");
118 
120 
121  auto node = tlx::make_counting<WriteLinesOneNode>(*this, filepath);
122 
123  node->RunScope();
124 }
125 
126 template <typename ValueType, typename Stack>
128  const std::string& filepath) const {
129  assert(IsValid());
130 
132  "WriteLinesOne needs an std::string as input parameter");
133 
135 
136  auto node = tlx::make_counting<WriteLinesOneNode>(*this, filepath);
137 
138  return Future<void>(node);
139 }
140 
141 } // namespace api
142 } // namespace thrill
143 
144 #endif // !THRILL_API_WRITE_LINES_ONE_HEADER
145 
146 /******************************************************************************/
net::FlowControlChannel & net
Definition: context.hpp:443
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:34
data::File::Writer writer_
File writer used.
size_t num_items() const
Return the number of items in the file.
Definition: file.hpp:180
size_t my_rank() const
Global rank of this worker among all other workers in the system.
Definition: context.hpp:240
A File is an ordered sequence of Block objects for storing items.
Definition: file.hpp:56
void Barrier()
A trivial global barrier.
T TLX_ATTRIBUTE_WARN_UNUSED_RESULT ExPrefixSum(const T &value, const BinarySumOp &sum_op=BinarySumOp(), const T &initial=T())
Calculates the exclusive prefix sum over all workers, given a certain sum operation.
BlockWriter contains a temporary Block object into which a) any serializable item can be stored or b)...
WriteLinesOneNode(const ParentDIA &parent, const std::string &path_out)
std::ofstream file_
File to write to.
Specialized template class for ActionFuture which return void.
data::File temp_file_
Temporary File that buffers the local input items until Execute() writes them to the output file.
ConsumeReader GetConsumeReader(size_t prefetch_size=File::default_prefetch_size_)
Get consuming BlockReader for beginning of File.
Definition: file.cpp:73
BlockReader< ConsumeFileBlockSource > ConsumeReader
Definition: file.hpp:62
virtual void StopPreOp(size_t)
Virtual method for preparing end of PushData.
Definition: dia_base.hpp:173
virtual void Execute()=0
Virtual execution method. Triggers actual computation in sub-classes.
int value
Definition: gen_data.py:41
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
Definition: allocator.hpp:220
void WriteLinesOne(const std::string &filepath) const
WriteLinesOne is an Action, which writes std::strings to a single output file.
data::File GetFile(size_t dia_id)
Returns a new File object containing a sequence of local Blocks.
Definition: context.hpp:280
ActionNode(Context &ctx, const char *label, const std::initializer_list< size_t > &parent_ids, const std::initializer_list< DIABasePtr > &parents)
Definition: action_node.hpp:30
common::JsonLogger logger_
Definition: dia_base.hpp:329
Future< void > WriteLinesOneFuture(const std::string &filepath) const
WriteLinesOne is an ActionFuture, which writes std::strings to a single output file.
TLX_ATTRIBUTE_ALWAYS_INLINE BlockWriter & Put(const T &x)
Put appends a complete item, or fails with a FullException.
void Close()
Explicitly close the writer.
size_t num_workers() const
Global number of workers in the system.
Definition: context.hpp:248
Writer GetWriter(size_t block_size=default_block_size)
Get BlockWriter.
Definition: file.cpp:63
Context & context_
associated Context
Definition: dia_base.hpp:293
size_t local_size_
Local file size.