Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
collapse.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/api/collapse.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Sebastian Lamm <[email protected]>
7  * Copyright (C) 2016 Timo Bingmann <[email protected]>
8  *
9  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10  ******************************************************************************/
11 
12 #pragma once
13 #ifndef THRILL_API_COLLAPSE_HEADER
14 #define THRILL_API_COLLAPSE_HEADER
15 
16 #include <thrill/api/dia.hpp>
17 #include <thrill/api/dia_node.hpp>
19 
20 #include <algorithm>
21 
22 namespace thrill {
23 namespace api {
24 
25 /*!
26  * \ingroup api_layer
27  */
28 template <typename ValueType>
29 class CollapseNode final : public DIANode<ValueType>
30 {
31 public:
33  using Super::context_;
34 
35  /*!
36  * Constructor for a LOpNode. Sets the Context, parents and stack.
37  */
38  template <typename ParentDIA>
39  explicit CollapseNode(const ParentDIA& parent)
40  : Super(parent.ctx(), "Collapse", { parent.id() }, { parent.node() }),
41  parent_stack_empty_(ParentDIA::stack_empty)
42  {
43  auto propagate_fn = [this](const ValueType& input) {
44  this->PushItem(input);
45  };
46  auto lop_chain = parent.stack().push(propagate_fn).fold();
47  parent.node()->AddChild(this, lop_chain);
48  }
49 
50  //! A CollapseNode cannot be executed, it never contains any data.
51  bool ForwardDataOnly() const final { return true; }
52 
53  bool RequireParentPushData(size_t /* parent_index */) const final
54  { return true; }
55 
56  void Execute() final { abort(); }
57 
58  void StartPreOp(size_t /* id */) final {
59  for (typename Super::Child& child : Super::children_)
60  child.node->StartPreOp(child.parent_index);
61  }
62 
63  //! Receive a whole data::File of ValueType, but only if our stack is empty.
64  bool OnPreOpFile(const data::File& file, size_t /* parent_index */) final {
65  if (!parent_stack_empty_) {
66  LOGC(context_.my_rank() == 0)
67  << "Collapse rejected File from parent "
68  << "due to non-empty function stack.";
69  return false;
70  }
71 
72  // forward file
73  LOGC(context_.my_rank() == 0)
74  << "Collapse accepted File from parent";
75  data::File file_copy = file.Copy();
76  this->PushFile(file_copy, /* consume */ true);
77  return true;
78  }
79 
80  void StopPreOp(size_t /* id */) final {
81  for (typename Super::Child& child : Super::children_)
82  child.node->StopPreOp(child.parent_index);
83  }
84 
85  void PushData(bool /* consume */) final { }
86 
87  size_t consume_counter() const final {
88  // calculate consumption of parents
89  size_t c = Super::kNeverConsume;
90  for (auto& p : Super::parents_) {
91  c = std::min(c, p->consume_counter());
92  }
93  return c;
94  }
95 
96  void IncConsumeCounter(size_t consume) final {
97  // propagate consumption up to parents.
98  for (auto& p : Super::parents_) {
99  p->IncConsumeCounter(consume);
100  }
101  }
102 
103  void DecConsumeCounter(size_t consume) final {
104  // propagate consumption up to parents.
105  for (auto& p : Super::parents_) {
106  p->DecConsumeCounter(consume);
107  }
108  }
109 
110  void SetConsumeCounter(size_t consume) final {
111  // propagate consumption up to parents.
112  for (auto& p : Super::parents_) {
113  p->SetConsumeCounter(consume);
114  }
115  }
116 
117 private:
118  //! Whether the parent stack is empty
119  const bool parent_stack_empty_;
120 };
121 
122 #ifndef THRILL_DOXYGEN_IGNORE
123 
124 template <typename ValueType, typename Stack>
125 template <typename AnyStack>
126 DIA<ValueType, Stack>::DIA(const DIA<ValueType, AnyStack>& rhs)
127 // Create new CollapseNode. Transfer stack from rhs to CollapseNode. Build new
128 // DIA with empty stack and CollapseNode
129  : DIA(tlx::make_counting<api::CollapseNode<ValueType> >(rhs)) {
130  LOG0 << "WARNING: cast to DIA creates CollapseNode instead of inline chaining.";
131  LOG0 << "Consider whether you can use auto instead of DIA.";
132 }
133 
134 #endif // THRILL_DOXYGEN_IGNORE
135 
136 //! Template switch to generate a CollapseNode if there is a non-empty Stack
137 template <typename ValueType, typename Stack>
140  assert(dia.IsValid());
141 
142  // Create new CollapseNode. Transfer stack from rhs to
143  // CollapseNode. Build new DIA with empty stack and CollapseNode
145 
146  return DIA<ValueType>(tlx::make_counting<CollapseNode>(dia));
147  }
148 };
149 
150 //! Template switch to NOT generate a CollapseNode if there is an empty Stack.
151 template <typename ValueType>
152 struct CollapseSwitch<ValueType, tlx::FunctionStack<ValueType> >{
154  const DIA<ValueType, tlx::FunctionStack<ValueType> >& dia) {
155  return dia;
156  }
157 };
158 
159 template <typename ValueType, typename Stack>
162 }
163 
164 } // namespace api
165 } // namespace thrill
166 
167 #endif // !THRILL_API_COLLAPSE_HEADER
168 
169 /******************************************************************************/
void PushFile(data::File &file, bool consume) const
Definition: dia_node.hpp:156
Template switch to generate a CollapseNode if there is a non-empty Stack.
Definition: collapse.hpp:138
CountingPtr< Type > make_counting(Args &&...args)
method analogous to std::make_shared and std::make_unique.
DIA is the interface between the user and the Thrill framework.
Definition: dia.hpp:141
size_t my_rank() const
Global rank of this worker among all other workers in the system.
Definition: context.hpp:240
virtual void DecConsumeCounter(size_t counter)
Definition: dia_base.hpp:237
virtual size_t consume_counter() const
Returns consume_counter_.
Definition: dia_base.hpp:226
#define LOG0
Override default output: never or always output log.
Definition: logger.hpp:144
DIA()=default
default-constructor: invalid DIA
bool IsValid() const
Return whether the DIA is valid.
Definition: dia.hpp:175
virtual void PushData(bool consume)=0
Virtual method for pushing data. Triggers actual pushing in sub-classes.
A DIANode is a typed node representing an operation in Thrill.
Definition: dia_node.hpp:37
A FunctionStack is a chain of functors that can be folded into a single functor (which is usually optimized by the compiler).
CollapseNode(const ParentDIA &parent)
Constructor for a CollapseNode.
Definition: collapse.hpp:39
static DIA< ValueType > MakeCollapse(const DIA< ValueType, Stack > &dia)
Definition: collapse.hpp:139
void PushItem(const ValueType &item) const
Method for derived classes to Push a single item to all children.
Definition: dia_node.hpp:147
std::vector< DIABasePtr > parents_
Parents of this DIABase.
Definition: dia_base.hpp:310
virtual void StopPreOp(size_t)
Virtual method for preparing end of PushData.
Definition: dia_base.hpp:173
virtual void Execute()=0
Virtual execution method. Triggers actual computation in sub-classes.
static DIA< ValueType > MakeCollapse(const DIA< ValueType, tlx::FunctionStack< ValueType > > &dia)
Definition: collapse.hpp:153
virtual void StartPreOp(size_t)
Virtual method for preparing start of PushData.
Definition: dia_base.hpp:163
virtual void SetConsumeCounter(size_t counter)
Definition: dia_base.hpp:248
static uint_pair min()
return an uint_pair instance containing the smallest value possible
Definition: uint_types.hpp:217
std::vector< Child > children_
Callback functions from the child nodes.
Definition: dia_node.hpp:181
virtual bool OnPreOpFile(const data::File &, size_t)
Definition: dia_base.hpp:168
virtual bool ForwardDataOnly() const
Definition: dia_base.hpp:148
virtual bool RequireParentPushData(size_t) const
Definition: dia_base.hpp:153
virtual void IncConsumeCounter(size_t counter)
Definition: dia_base.hpp:230
static constexpr size_t kNeverConsume
Never full consume.
Definition: dia_base.hpp:324
Context & context_
associated Context
Definition: dia_base.hpp:293
#define LOGC(cond)
Explicitly specify the condition for logging.
Definition: logger.hpp:137
DIA< ValueType > Collapse() const
Create a CollapseNode which is mainly used to collapse the LOp chain into a DIA<T> with an empty stack.
Definition: collapse.hpp:160