Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
prefixsum.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/api/prefixsum.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Alexander Noe <[email protected]>
7  * Copyright (C) 2015 Timo Bingmann <[email protected]>
8  *
9  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10  ******************************************************************************/
11 
12 #pragma once
13 #ifndef THRILL_API_PREFIXSUM_HEADER
14 #define THRILL_API_PREFIXSUM_HEADER
15 
16 #include <thrill/api/dia.hpp>
17 #include <thrill/api/dop_node.hpp>
18 #include <thrill/common/logger.hpp>
19 #include <thrill/data/file.hpp>
20 
21 namespace thrill {
22 namespace api {
23 
24 /*!
25  * \ingroup api_layer
26  */
27 template <typename ValueType, typename SumFunction, bool Inclusive>
28 class PrefixSumNode final : public DOpNode<ValueType>
29 {
30  static constexpr bool debug = false;
31 
33  using Super::context_;
34 
35 public:
36  template <typename ParentDIA>
37  PrefixSumNode(const ParentDIA& parent,
38  const char* label,
39  const SumFunction& sum_function,
40  const ValueType& initial_element)
41  : Super(parent.ctx(), label, { parent.id() }, { parent.node() }),
42  sum_function_(sum_function),
43  local_sum_(),
44  initial_element_(initial_element),
45  parent_stack_empty_(ParentDIA::stack_empty)
46  {
47  // Hook PreOp(s)
48  auto pre_op_fn = [this](const ValueType& input) {
49  PreOp(input);
50  };
51 
52  auto lop_chain = parent.stack().push(pre_op_fn).fold();
53  parent.node()->AddChild(this, lop_chain);
54  }
55 
56  //! PreOp: compute local prefixsum and store items.
57  void PreOp(const ValueType& input) {
58  LOG << "Input: " << input;
59  local_sum_ = sum_function_(local_sum_, input);
60  writer_.Put(input);
61  }
62 
63  bool OnPreOpFile(const data::File& file, size_t /* parent_index */) final {
64  if (!parent_stack_empty_) {
66  << "PrefixSum rejected File from parent "
67  << "due to non-empty function stack.";
68  return false;
69  }
70  // copy complete Block references to writer_
71  file_ = file.Copy();
72  // read File for prefix sum.
73  auto reader = file_.GetKeepReader();
74  while (reader.HasNext()) {
75  local_sum_ = sum_function_(
76  local_sum_, reader.template Next<ValueType>());
77  }
78  return true;
79  }
80 
81  void StopPreOp(size_t /* parent_index */) final {
82  writer_.Close();
83  }
84 
85  //! Executes the prefixsum operation.
86  void Execute() final {
87  LOG << "MainOp processing";
88 
90  local_sum_, sum_function_, initial_element_);
91  }
92 
93  void PushData(bool consume) final {
94  data::File::Reader reader = file_.GetReader(consume);
95  size_t num_items = file_.num_items();
96 
97  if (Inclusive) {
98  ValueType sum = local_sum_;
99  for (size_t i = 0; i < num_items; ++i) {
100  sum = sum_function_(sum, reader.Next<ValueType>());
101  this->PushItem(sum);
102  }
103  }
104  else {
105  ValueType sum = local_sum_;
106  for (size_t i = 0; i < num_items; ++i) {
107  this->PushItem(sum);
108  sum = sum_function_(sum, reader.Next<ValueType>());
109  }
110  }
111  }
112 
113  void Dispose() final {
114  file_.Clear();
115  }
116 
117 private:
118  //! The sum function which is applied to two elements.
119  SumFunction sum_function_;
120  //! Local sum to be used in all reduce operation.
121  ValueType local_sum_;
122  //! Initial element.
123  const ValueType initial_element_;
124  //! Whether the parent stack is empty
126 
127  //! Local data file
129  //! Data writer to local file (only active in PreOp).
131 };
132 
133 template <typename ValueType, typename Stack>
134 template <typename SumFunction>
136  const SumFunction& sum_function, const ValueType& initial_element) const {
137  assert(IsValid());
138 
140  ValueType, SumFunction, /* Inclusive */ true>;
141 
142  static_assert(
143  std::is_convertible<
144  ValueType,
146  >::value,
147  "SumFunction has the wrong input type");
148 
149  static_assert(
150  std::is_convertible<
151  ValueType,
152  typename FunctionTraits<SumFunction>::template arg<1> >::value,
153  "SumFunction has the wrong input type");
154 
155  static_assert(
156  std::is_convertible<
158  ValueType>::value,
159  "SumFunction has the wrong input type");
160 
161  auto node = tlx::make_counting<PrefixSumNode>(
162  *this, "PrefixSum", sum_function, initial_element);
163 
164  return DIA<ValueType>(node);
165 }
166 
167 template <typename ValueType, typename Stack>
168 template <typename SumFunction>
170  const SumFunction& sum_function, const ValueType& initial_element) const {
171  assert(IsValid());
172 
174  ValueType, SumFunction, /* Inclusive */ false>;
175 
176  static_assert(
177  std::is_convertible<
178  ValueType,
180  >::value,
181  "SumFunction has the wrong input type");
182 
183  static_assert(
184  std::is_convertible<
185  ValueType,
186  typename FunctionTraits<SumFunction>::template arg<1> >::value,
187  "SumFunction has the wrong input type");
188 
189  static_assert(
190  std::is_convertible<
192  ValueType>::value,
193  "SumFunction has the wrong input type");
194 
195  auto node = tlx::make_counting<PrefixSumNode>(
196  *this, "ExPrefixSum", sum_function, initial_element);
197 
198  return DIA<ValueType>(node);
199 }
200 
201 } // namespace api
202 } // namespace thrill
203 
204 #endif // !THRILL_API_PREFIXSUM_HEADER
205 
206 /******************************************************************************/
net::FlowControlChannel & net
Definition: context.hpp:443
virtual void Dispose()
Virtual clear method. Triggers actual disposing in sub-classes.
Definition: dia_base.hpp:188
size_t num_items() const
Return the number of items in the file.
Definition: file.hpp:180
DIA is the interface between the user and the Thrill framework.
Definition: dia.hpp:141
ValueType local_sum_
Local sum to be used in all reduce operation.
Definition: prefixsum.hpp:41
ValueType_ ValueType
Definition: dia.hpp:152
A File is an ordered sequence of Block objects for storing items.
Definition: file.hpp:56
void Clear()
Free all Blocks in the File and deallocate vectors.
Definition: file.cpp:57
T TLX_ATTRIBUTE_WARN_UNUSED_RESULT ExPrefixSum(const T &value, const BinarySumOp &sum_op=BinarySumOp(), const T &initial=T())
Calculates the exclusive prefix sum over all workers, given a certain sum operation.
static constexpr bool debug
Definition: prefixsum.hpp:30
BlockWriter contains a temporary Block object into which a) any serializable item can be stored or b)...
static constexpr bool g_debug_push_file
Definition: config.hpp:44
const char * label() const
return label() of DIANode subclass as stored by StatsNode
Definition: dia_base.hpp:218
virtual void PushData(bool consume)=0
Virtual method for pushing data. Triggers actual pushing in sub-classes.
data::File::Writer writer_
Data writer to local file (only active in PreOp).
Definition: prefixsum.hpp:130
const bool parent_stack_empty_
Whether the parent stack is empty.
Definition: prefixsum.hpp:125
const ValueType initial_element_
Initial element.
Definition: prefixsum.hpp:123
void PushItem(const ValueType &item) const
Method for derived classes to Push a single item to all children.
Definition: dia_node.hpp:147
auto PrefixSum(const SumFunction &sum_function=SumFunction(), const ValueType &initial_element=ValueType()) const
PrefixSum is a DOp, which computes the (inclusive) prefix sum of all elements.
Definition: prefixsum.hpp:135
File Copy() const
Return a copy of the File (explicit copy-constructor)
Definition: file.cpp:42
virtual void StopPreOp(size_t)
Virtual method for preparing end of PushData.
Definition: dia_base.hpp:173
virtual void Execute()=0
Virtual execution method. Triggers actual computation in sub-classes.
int value
Definition: gen_data.py:41
common::FunctionTraits< Function > FunctionTraits
alias for convenience.
Definition: dia.hpp:147
data::File GetFile(size_t dia_id)
Returns a new File object containing a sequence of local Blocks.
Definition: context.hpp:280
A DOpNode is a typed node representing and distributed operations in Thrill.
Definition: dop_node.hpp:32
Reader GetReader(bool consume, size_t prefetch_size=File::default_prefetch_size_)
Get BlockReader or a consuming BlockReader for beginning of File.
Definition: file.cpp:78
TLX_ATTRIBUTE_ALWAYS_INLINE BlockWriter & Put(const T &x)
Put appends a complete item, or fails with a FullException.
data::File file_
Local data file.
Definition: prefixsum.hpp:128
virtual bool OnPreOpFile(const data::File &, size_t)
Definition: dia_base.hpp:168
void Close()
Explicitly close the writer.
auto ExPrefixSum(const SumFunction &sum_function=SumFunction(), const ValueType &initial_element=ValueType()) const
ExPrefixSum is a DOp, which computes the exclusive prefix sum of all elements.
Definition: prefixsum.hpp:169
DynBlockReader Reader
Definition: file.hpp:60
KeepReader GetKeepReader(size_t prefetch_size=File::default_prefetch_size_) const
Get BlockReader for beginning of File.
Definition: file.cpp:68
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:141
PrefixSumNode(const ParentDIA &parent, const char *label, const SumFunction &sum_function, const ValueType &initial_element)
Definition: prefixsum.hpp:37
Writer GetWriter(size_t block_size=default_block_size)
Get BlockWriter.
Definition: file.cpp:63
Context & context_
associated Context
Definition: dia_base.hpp:293
#define LOGC(cond)
Explicitly specify the condition for logging.
Definition: logger.hpp:137