Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
collapse.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/api/collapse.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Sebastian Lamm <[email protected]>
7  * Copyright (C) 2016 Timo Bingmann <[email protected]>
8  *
9  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10  ******************************************************************************/
11 
12 #pragma once
13 #ifndef THRILL_API_COLLAPSE_HEADER
14 #define THRILL_API_COLLAPSE_HEADER
15 
16 #include <thrill/api/dia.hpp>
17 #include <thrill/api/dia_node.hpp>
19 
20 #include <algorithm>
21 
22 namespace thrill {
23 namespace api {
24 
25 /*!
26  * \ingroup api_layer
27  */
28 template <typename ValueType>
29 class CollapseNode final : public DIANode<ValueType>
30 {
31 public:
33  using Super::context_;
34 
35  /*!
36  * Constructor for a LOpNode. Sets the Context, parents and stack.
37  */
38  template <typename ParentDIA>
39  explicit CollapseNode(const ParentDIA& parent)
40  : Super(parent.ctx(), "Collapse", { parent.id() }, { parent.node() }),
41  parent_stack_empty_(ParentDIA::stack_empty) {
42  auto propagate_fn = [this](const ValueType& input) {
43  this->PushItem(input);
44  };
45  auto lop_chain = parent.stack().push(propagate_fn).fold();
46  parent.node()->AddChild(this, lop_chain);
47  }
48 
49  //! A CollapseNode cannot be executed, it never contains any data.
50  bool ForwardDataOnly() const final { return true; }
51 
52  bool RequireParentPushData(size_t /* parent_index */) const final
53  { return true; }
54 
55  void Execute() final { abort(); }
56 
57  void StartPreOp(size_t /* parent_index */) final {
58  for (typename Super::Child& child : Super::children_)
59  child.node->StartPreOp(child.parent_index);
60  }
61 
62  //! Receive a whole data::File of ValueType, but only if our stack is empty.
63  bool OnPreOpFile(const data::File& file, size_t /* parent_index */) final {
64  if (!parent_stack_empty_) {
65  LOGC(context_.my_rank() == 0)
66  << "Collapse rejected File from parent "
67  << "due to non-empty function stack.";
68  return false;
69  }
70 
71  // forward file
72  LOGC(context_.my_rank() == 0)
73  << "Collapse accepted File from parent";
74  data::File file_copy = file.Copy();
75  this->PushFile(file_copy, /* consume */ true);
76  return true;
77  }
78 
79  void StopPreOp(size_t /* parent_index */) final {
80  for (typename Super::Child& child : Super::children_)
81  child.node->StopPreOp(child.parent_index);
82  }
83 
84  void PushData(bool /* consume */) final { }
85 
86  size_t consume_counter() const final {
87  // calculate consumption of parents
88  size_t c = Super::kNeverConsume;
89  for (auto& p : Super::parents_) {
90  c = std::min(c, p->consume_counter());
91  }
92  return c;
93  }
94 
95  void IncConsumeCounter(size_t consume) final {
96  // propagate consumption up to parents.
97  for (auto& p : Super::parents_) {
98  p->IncConsumeCounter(consume);
99  }
100  }
101 
102  void DecConsumeCounter(size_t consume) final {
103  // propagate consumption up to parents.
104  for (auto& p : Super::parents_) {
105  p->DecConsumeCounter(consume);
106  }
107  }
108 
109  void SetConsumeCounter(size_t consume) final {
110  // propagate consumption up to parents.
111  for (auto& p : Super::parents_) {
112  p->SetConsumeCounter(consume);
113  }
114  }
115 
116 private:
117  //! Whether the parent stack is empty
118  const bool parent_stack_empty_;
119 };
120 
121 #ifndef THRILL_DOXYGEN_IGNORE
122 
123 template <typename ValueType, typename Stack>
124 template <typename AnyStack>
125 DIA<ValueType, Stack>::DIA(const DIA<ValueType, AnyStack>& rhs)
126 // Create new CollapseNode. Transfer stack from rhs to CollapseNode. Build new
127 // DIA with empty stack and CollapseNode
128  : DIA(tlx::make_counting<api::CollapseNode<ValueType> >(rhs)) {
129  LOG0 << "WARNING: cast to DIA creates CollapseNode instead of inline chaining.";
130  LOG0 << "Consider whether you can use auto instead of DIA.";
131 }
132 
133 #endif // THRILL_DOXYGEN_IGNORE
134 
135 //! Template switch to generate a CollapseNode if there is a non-empty Stack
136 template <typename ValueType, typename Stack>
139  assert(dia.IsValid());
140 
141  // Create new CollapseNode. Transfer stack from rhs to
142  // CollapseNode. Build new DIA with empty stack and CollapseNode
144 
145  return DIA<ValueType>(tlx::make_counting<CollapseNode>(dia));
146  }
147 };
148 
149 //! Template switch to NOT generate a CollapseNode if there is an empty Stack.
150 template <typename ValueType>
151 struct CollapseSwitch<ValueType, tlx::FunctionStack<ValueType> > {
153  const DIA<ValueType, tlx::FunctionStack<ValueType> >& dia) {
154  return dia;
155  }
156 };
157 
158 template <typename ValueType, typename Stack>
161 }
162 
163 } // namespace api
164 } // namespace thrill
165 
166 #endif // !THRILL_API_COLLAPSE_HEADER
167 
168 /******************************************************************************/
void PushFile(data::File &file, bool consume) const
Definition: dia_node.hpp:156
Template switch to generate a CollapseNode if there is a non-empty Stack.
Definition: collapse.hpp:137
CountingPtr< Type > make_counting(Args &&...args)
method analogous to std::make_shared and std::make_unique.
DIA is the interface between the user and the Thrill framework.
Definition: dia.hpp:141
size_t my_rank() const
Global rank of this worker among all other workers in the system.
Definition: context.hpp:243
virtual void DecConsumeCounter(size_t counter)
Definition: dia_base.hpp:237
virtual size_t consume_counter() const
Returns consume_counter_.
Definition: dia_base.hpp:226
#define LOG0
Override default output: never or always output log.
Definition: logger.hpp:27
DIA()=default
default-constructor: invalid DIA
bool IsValid() const
Return whether the DIA is valid.
Definition: dia.hpp:175
virtual void PushData(bool consume)=0
Virtual method for pushing data. Triggers actual pushing in sub-classes.
A DIANode is a typed node representing and operation in Thrill.
Definition: dia_node.hpp:37
A FunctionStack is a chain of functor that can be folded to a single functor (which is usually optimi...
CollapseNode(const ParentDIA &parent)
Constructor for a LOpNode.
Definition: collapse.hpp:39
static DIA< ValueType > MakeCollapse(const DIA< ValueType, Stack > &dia)
Definition: collapse.hpp:138
void PushItem(const ValueType &item) const
Method for derived classes to Push a single item to all children.
Definition: dia_node.hpp:147
std::vector< DIABasePtr > parents_
Parents of this DIABase.
Definition: dia_base.hpp:310
virtual void StopPreOp(size_t)
Virtual method for preparing end of PushData.
Definition: dia_base.hpp:173
virtual void Execute()=0
Virtual execution method. Triggers actual computation in sub-classes.
static DIA< ValueType > MakeCollapse(const DIA< ValueType, tlx::FunctionStack< ValueType > > &dia)
Definition: collapse.hpp:152
virtual void StartPreOp(size_t)
Virtual method for preparing start of PushData.
Definition: dia_base.hpp:163
virtual void SetConsumeCounter(size_t counter)
Definition: dia_base.hpp:248
static uint_pair min()
return an uint_pair instance containing the smallest value possible
Definition: uint_types.hpp:217
std::vector< Child > children_
Callback functions from the child nodes.
Definition: dia_node.hpp:181
virtual bool OnPreOpFile(const data::File &, size_t)
Definition: dia_base.hpp:168
virtual bool ForwardDataOnly() const
Definition: dia_base.hpp:148
virtual bool RequireParentPushData(size_t) const
Definition: dia_base.hpp:153
virtual void IncConsumeCounter(size_t counter)
Definition: dia_base.hpp:230
static constexpr size_t kNeverConsume
Never full consume.
Definition: dia_base.hpp:324
Context & context_
associated Context
Definition: dia_base.hpp:293
#define LOGC(cond)
Explicitly specify the condition for logging.
Definition: logger.hpp:21
DIA< ValueType > Collapse() const
Create a CollapseNode which is mainly used to collapse the LOp chain into a DIA<T> with an empty stac...
Definition: collapse.hpp:159