Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
collapse.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/api/collapse.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Sebastian Lamm <[email protected]>
7  * Copyright (C) 2016 Timo Bingmann <[email protected]>
8  *
9  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10  ******************************************************************************/
11 
12 #pragma once
13 #ifndef THRILL_API_COLLAPSE_HEADER
14 #define THRILL_API_COLLAPSE_HEADER
15 
16 #include <thrill/api/dia.hpp>
17 #include <thrill/api/dia_node.hpp>
19 
20 #include <algorithm>
21 
22 namespace thrill {
23 namespace api {
24 
25 /*!
26  * \ingroup api_layer
27  */
28 template <typename ValueType>
29 class CollapseNode final : public DIANode<ValueType>
30 {
31 public:
33  using Super::context_;
34 
35  /*!
36  * Constructor for a LOpNode. Sets the Context, parents and stack.
37  */
38  template <typename ParentDIA>
39  explicit CollapseNode(const ParentDIA& parent)
40  : Super(parent.ctx(), "Collapse", { parent.id() }, { parent.node() }),
41  parent_stack_empty_(ParentDIA::stack_empty)
42  {
43  auto propagate_fn = [this](const ValueType& input) {
44  this->PushItem(input);
45  };
46  auto lop_chain = parent.stack().push(propagate_fn).fold();
47  parent.node()->AddChild(this, lop_chain);
48  }
49 
50  //! A CollapseNode cannot be executed, it never contains any data.
51  bool ForwardDataOnly() const final { return true; }
52 
53  bool RequireParentPushData(size_t /* parent_index */) const final
54  { return true; }
55 
56  void Execute() final { abort(); }
57 
58  void StartPreOp(size_t /* id */) final {
59  for (typename Super::Child& child : Super::children_)
60  child.node->StartPreOp(child.parent_index);
61  }
62 
63  //! Receive a whole data::File of ValueType, but only if our stack is empty.
64  bool OnPreOpFile(const data::File& file, size_t /* parent_index */) final {
65  if (!parent_stack_empty_) {
66  LOG1 << "Collapse rejected File from parent "
67  << "due to non-empty function stack.";
68  return false;
69  }
70 
71  // forward file
72  LOG1 << "Collapse accepted File from parent";
73  data::File file_copy = file.Copy();
74  this->PushFile(file_copy, /* consume */ true);
75  return true;
76  }
77 
78  void StopPreOp(size_t /* id */) final {
79  for (typename Super::Child& child : Super::children_)
80  child.node->StopPreOp(child.parent_index);
81  }
82 
83  void PushData(bool /* consume */) final { }
84 
85  size_t consume_counter() const final {
86  // calculate consumption of parents
87  size_t c = Super::kNeverConsume;
88  for (auto& p : Super::parents_) {
89  c = std::min(c, p->consume_counter());
90  }
91  return c;
92  }
93 
94  void IncConsumeCounter(size_t consume) final {
95  // propagate consumption up to parents.
96  for (auto& p : Super::parents_) {
97  p->IncConsumeCounter(consume);
98  }
99  }
100 
101  void DecConsumeCounter(size_t consume) final {
102  // propagate consumption up to parents.
103  for (auto& p : Super::parents_) {
104  p->DecConsumeCounter(consume);
105  }
106  }
107 
108  void SetConsumeCounter(size_t consume) final {
109  // propagate consumption up to parents.
110  for (auto& p : Super::parents_) {
111  p->SetConsumeCounter(consume);
112  }
113  }
114 
115 private:
116  //! Whether the parent stack is empty
117  const bool parent_stack_empty_;
118 };
119 
120 #ifndef THRILL_DOXYGEN_IGNORE
121 
122 template <typename ValueType, typename Stack>
123 template <typename AnyStack>
124 DIA<ValueType, Stack>::DIA(const DIA<ValueType, AnyStack>& rhs)
125 // Create new CollapseNode. Transfer stack from rhs to CollapseNode. Build new
126 // DIA with empty stack and CollapseNode
127  : DIA(tlx::make_counting<api::CollapseNode<ValueType> >(rhs)) {
128  LOG0 << "WARNING: cast to DIA creates CollapseNode instead of inline chaining.";
129  LOG0 << "Consider whether you can use auto instead of DIA.";
130 }
131 
132 #endif // THRILL_DOXYGEN_IGNORE
133 
134 //! Template switch to generate a CollapseNode if there is a non-empty Stack
135 template <typename ValueType, typename Stack>
138  assert(dia.IsValid());
139 
140  // Create new CollapseNode. Transfer stack from rhs to
141  // CollapseNode. Build new DIA with empty stack and CollapseNode
143 
144  return DIA<ValueType>(tlx::make_counting<CollapseNode>(dia));
145  }
146 };
147 
148 //! Template switch to NOT generate a CollapseNode if there is an empty Stack.
149 template <typename ValueType>
150 struct CollapseSwitch<ValueType, tlx::FunctionStack<ValueType> >{
152  const DIA<ValueType, tlx::FunctionStack<ValueType> >& dia) {
153  return dia;
154  }
155 };
156 
157 template <typename ValueType, typename Stack>
160 }
161 
162 } // namespace api
163 } // namespace thrill
164 
165 #endif // !THRILL_API_COLLAPSE_HEADER
166 
167 /******************************************************************************/
void PushFile(data::File &file, bool consume) const
Definition: dia_node.hpp:156
Template switch to generate a CollapseNode if there is a non-empty Stack.
Definition: collapse.hpp:136
CountingPtr< Type > make_counting(Args &&...args)
method analogous to std::make_shared and std::make_unique.
DIA is the interface between the user and the Thrill framework.
Definition: dia.hpp:141
#define LOG0
Override default output: never or always output log.
Definition: logger.hpp:175
virtual void DecConsumeCounter(size_t counter)
Definition: dia_base.hpp:237
#define LOG1
Definition: logger.hpp:176
virtual size_t consume_counter() const
Returns consume_counter_.
Definition: dia_base.hpp:226
DIA()=default
default-constructor: invalid DIA
bool IsValid() const
Return whether the DIA is valid.
Definition: dia.hpp:175
virtual void PushData(bool consume)=0
Virtual method for pushing data. Triggers actual pushing in sub-classes.
A DIANode is a typed node representing and operation in Thrill.
Definition: dia_node.hpp:37
A FunctionStack is a chain of functor that can be folded to a single functor (which is usually optimi...
CollapseNode(const ParentDIA &parent)
Constructor for a LOpNode.
Definition: collapse.hpp:39
static DIA< ValueType > MakeCollapse(const DIA< ValueType, Stack > &dia)
Definition: collapse.hpp:137
void PushItem(const ValueType &item) const
Method for derived classes to Push a single item to all children.
Definition: dia_node.hpp:147
std::vector< DIABasePtr > parents_
Parents of this DIABase.
Definition: dia_base.hpp:310
virtual void StopPreOp(size_t)
Virtual method for preparing end of PushData.
Definition: dia_base.hpp:173
virtual void Execute()=0
Virtual execution method. Triggers actual computation in sub-classes.
static DIA< ValueType > MakeCollapse(const DIA< ValueType, tlx::FunctionStack< ValueType > > &dia)
Definition: collapse.hpp:151
virtual void StartPreOp(size_t)
Virtual method for preparing start of PushData.
Definition: dia_base.hpp:163
virtual void SetConsumeCounter(size_t counter)
Definition: dia_base.hpp:248
std::vector< Child > children_
Callback functions from the child nodes.
Definition: dia_node.hpp:181
virtual bool OnPreOpFile(const data::File &, size_t)
Definition: dia_base.hpp:168
virtual bool ForwardDataOnly() const
Definition: dia_base.hpp:148
virtual bool RequireParentPushData(size_t) const
Definition: dia_base.hpp:153
virtual void IncConsumeCounter(size_t counter)
Definition: dia_base.hpp:230
static constexpr const T & min(const T &a, const T &b)
template for constexpr min, because std::min is not good enough.
Definition: functional.hpp:59
static constexpr size_t kNeverConsume
Never full consume.
Definition: dia_base.hpp:324
Context & context_
associated Context
Definition: dia_base.hpp:293
DIA< ValueType > Collapse() const
Create a CollapseNode which is mainly used to collapse the LOp chain into a DIA<T> with an empty stac...
Definition: collapse.hpp:158