Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
union.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/api/union.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2016 Timo Bingmann <[email protected]>
7  *
8  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
9  ******************************************************************************/
10 
11 #pragma once
12 #ifndef THRILL_API_UNION_HEADER
13 #define THRILL_API_UNION_HEADER
14 
15 #include <thrill/api/dia.hpp>
16 #include <thrill/api/dia_node.hpp>
18 #include <tlx/meta/vexpand.hpp>
19 
20 #include <algorithm>
21 #include <initializer_list>
22 #include <vector>
23 
24 namespace thrill {
25 namespace api {
26 
27 /*!
28  * Implements a Union operation by hooking any number of input DIANodes and
29  * forwarding the output immediately to all children.
30  *
31  * The problem with Union is that children can be added to the node after
32  * creation (as with all other nodes). This however requires the UnionNode to
33  * remember which of its children has already got which input's items. This is
34  * recorded in each UnionChild's pushed_inputs array.
35  *
36  * For example this occurs in the following DIA graph:
37  *
38  *     [Gen.1] ------v
39  *                   [Union.3] --> [Sort.4] -------> [Size.6]
40  *               +---^         +-------------------------------> [Size.7]
41  *     [Gen.2] --+
42  *               +------------------------> [Size.5]
43  *
44  * Size.5 triggers Execute and PushData such that Sort.4's PreOp gets all data
45  * from Gen.2, but not from Gen.1. Then, running Size.6 requires Union.3 to get
46  * data from Gen.1 and NOT run Gen.2 again.
47  *
48  * Another such situation occurs when [Size.7] is added later.
49  *
50  * \ingroup api_layer
51  */
52 template <typename ValueType>
53 class UnionNode final : public DIANode<ValueType>
54 {
55  static constexpr bool debug = false;
56 
57 public:
    // NOTE(review): listing line 58 is absent from this dump. `Super` is used
    // throughout the class, so an alias for the DIANode<ValueType> base is
    // presumably declared there -- confirm against the original header.
59  using Super::context_;
60  using Callback = typename Super::Callback;
61 
    //! Delivery state of a child: NEW when added via AddChild(), PUSHING once
    //! StartPreOp() has been forwarded to it, DONE once StopPreOp() found all
    //! of its inputs delivered.
62  enum class ChildStatus { NEW, PUSHING, DONE };
63 
    //! Book-keeping record for one child hooked onto this UnionNode.
    // NOTE(review): listing lines 66, 68 and 74 are absent from this dump. The
    // members `node`, `callback` and `status` are referenced elsewhere in the
    // class (see AddChild(), PushItem(), StartPreOp()), so their declarations
    // presumably sit below the three orphaned comments -- confirm.
64  struct UnionChild {
65  //! reference to child node
67  //! callback to invoke (currently for each item)
69  //! index this node has among the parents of the child (passed to
70  //! callbacks), e.g. for ZipNode which has multiple parents and their
71  //! order is important.
72  size_t parent_index;
73  //! status of the child.
75  //! vector of inputs which were delivered to this child. AddChild() sizes
    //! it to num_inputs_ (all zero); StopPreOp() sets entry [id] to 1.
76  std::vector<size_t> pushed_inputs;
77 
78  //! check if all inputs were pushed to the child, i.e. every entry of
    //! pushed_inputs is non-zero (trivially true for an empty vector).
79  bool AllInputsDone() const {
80  for (const size_t& i : pushed_inputs)
81  if (!i) return false;
82  return true;
83  }
84  };
85 
86  //! Constructor for the variant with a variadic parent parameter pack, where
87  //! each parent may have a different FunctionStack.
    //! The context is taken from parent0; ids and nodes of all parents are
    //! passed on to the base class, and num_inputs_ is the pack size plus one.
88  template <typename ParentDIA0, typename... ParentDIAs>
89  explicit UnionNode(const ParentDIA0& parent0,
90  const ParentDIAs& ... parents)
91  : Super(parent0.ctx(), "Union",
92  { parent0.id(), parents.id() ... },
93  { parent0.node(), parents.node() ... }),
94  num_inputs_(1 + sizeof ... (ParentDIAs))
95  {
    // NOTE(review): listing line 96 (the opening of this call) is absent from
    // this dump. A RegisterParent functor is applied to parent0 and every
    // further parent; presumably via tlx::call_foreach_with_index -- confirm.
97  RegisterParent(this), parent0, parents...);
98  }
99 
100  //! Constructor for variant with a std::vector of parents all with the same
101  //! (usually empty) FunctionStack.
    //! `parents` must not be empty: parents.front() is evaluated
    //! unconditionally to obtain the context.
102  template <typename ParentDIA>
103  explicit UnionNode(const std::vector<ParentDIA>& parents)
104  : Super(parents.front().ctx(), "Union",
105  common::MapVector(
106  parents, [](const ParentDIA& d) { return d.id(); }),
    // NOTE(review): listing line 107 is absent from this dump; presumably a
    // second common::MapVector( call that collects the parent node pointers.
108  parents, [](const ParentDIA& d) {
109  return DIABasePtr(d.node().get());
110  })),
111  num_inputs_(parents.size())
112  {
113  for (size_t i = 0; i < num_inputs_; ++i)
114  {
    // items arriving from parent i are forwarded tagged with input index i
115  auto propagate_fn = [this, i](const ValueType& input) {
116  this->PushItem(input, i);
117  };
118 
119  // close the function stacks with our pre ops and register it at
120  // parent nodes for output
121  auto lop_chain = parents[i].stack().push(propagate_fn).fold();
122  parents[i].node()->AddChild(this, lop_chain, i);
123  }
124  }
125 
126  //! Constructor for variant with a std::initializer_list of parents all with
127  //! the same (usually empty) FunctionStack.
    //! Copies the list into a std::vector and delegates to the vector
    //! constructor above.
128  template <typename ParentDIA>
129  explicit UnionNode(const std::initializer_list<ParentDIA>& parents)
130  : UnionNode(std::vector<ParentDIA>(parents)) { }
131 
132  //! Register Parent Hooks, operator() is instantiated and called for each
133  //! Union parent
134  class RegisterParent
135  {
136  public:
    //! Stores the UnionNode the parents are to be hooked onto.
137  explicit RegisterParent(UnionNode* union_node)
138  : union_node_(union_node) { }
139 
    //! Hooks `parent` onto the stored UnionNode; Index::index is used as
    //! the input index for both PushItem() and the parent's AddChild().
140  template <typename Index, typename Parent>
141  void operator () (const Index&, Parent& parent) {
142 
    // local copy so the lambda captures the node pointer by value
143  UnionNode* union_node = union_node_;
144  auto propagate_fn = [union_node](const ValueType& input) {
145  union_node->PushItem(input, Index::index);
146  };
147 
148  // close the function stacks with our pre ops and register it at
149  // parent nodes for output
150  auto lop_chain = parent.stack().push(propagate_fn).fold();
151  parent.node()->AddChild(union_node_, lop_chain, Index::index);
152  }
153 
154  private:
    //! the UnionNode that parents are registered with
155  UnionNode* union_node_;
156  };
157 
158  /*!
159  * Enables children to push their "folded" function chains to their parent.
160  * This way the parent can push all its result elements to each of the
161  * children. This procedure enables the minimization of IO-accesses.
162  */
    // The new child starts in state NEW with a pushed_inputs vector of
    // num_inputs_ zero entries, i.e. no input has been delivered to it yet.
163  void AddChild(DIABase* node, const Callback& callback,
164  size_t parent_index = 0) final {
165  children_.emplace_back(UnionChild {
166  node, callback, parent_index,
167  ChildStatus::NEW, std::vector<size_t>(num_inputs_)
168  });
169  }
170 
171  //! Remove a child from the vector of children. This method is called by the
172  //! destructor of children.
    //! Every entry whose node pointer equals `node` is erased; with the
    //! assertion below commented out, an unknown node is silently ignored.
173  void RemoveChild(DIABase* node) final {
174  typename std::vector<UnionChild>::iterator swap_end =
175  std::remove_if(
176  children_.begin(), children_.end(),
177  [node](const UnionChild& c) { return c.node == node; });
178 
179  // assert(swap_end != children_.end());
180  children_.erase(swap_end, children_.end());
181  }
182 
    //! Drops finished children and recurses into the forwarding-only ones.
183  void RemoveAllChildren() final {
184  // remove all children other than Collapse and Union nodes
    // (children that are not yet DONE are kept as well; each removed child
    // is first told to drop this node via RemoveParent(this))
185  children_.erase(
186  std::remove_if(
187  children_.begin(), children_.end(),
188  [this](UnionChild& child) {
189  if (child.status != ChildStatus::DONE)
190  return false;
191  if (child.node->ForwardDataOnly())
192  return false;
193  child.node->RemoveParent(this);
194  return true;
195  }),
196  children_.end());
197 
198  // recurse into remaining nodes (CollapseNode and UnionNode)
199  for (UnionChild& child : children_) {
200  if (!child.node->ForwardDataOnly())
201  continue;
202  child.node->RemoveAllChildren();
203  }
204  }
205 
206  //! Returns the children of this DIABase.
    //! The result is a freshly built vector of the stored node pointers, in
    //! the order the children are held in children_.
207  std::vector<DIABase*> children() const final {
208  std::vector<DIABase*> out;
209  out.reserve(children_.size());
210  for (const UnionChild& child : children_)
211  out.emplace_back(child.node);
212  return out;
213  }
214 
215  //! A UnionNode cannot be executed, it never contains any data.
216  bool ForwardDataOnly() const final { return true; }
217 
218  //! Check whether we need PushData() from the specific parent
    //! True if at least one child has not yet received input `parent_index`;
    //! false when there are no children at all.
219  bool RequireParentPushData(size_t parent_index) const final {
220  for (const UnionChild& child : children_) {
221  if (!child.pushed_inputs[parent_index]) return true;
222  }
223  return false;
224  }
225 
    //! Must never be called on a UnionNode: aborts the process.
226  void Execute() final { abort(); }
227 
    //! Forwards StartPreOp to every child still in state NEW and marks it
    //! PUSHING. `id` is only used for logging; children already PUSHING or
    //! DONE are left untouched.
228  void StartPreOp(size_t id) final {
229  LOG0 << "UnionNode::StartPreOp id=" << id;
230  for (UnionChild& child : children_) {
231  if (child.status == ChildStatus::NEW) {
232  // start push to NEW child
233  LOG << "UnionNode::StartPreOp triggered"
234  << " StartPreOp on child " << *child.node;
235  child.node->StartPreOp(child.parent_index);
236  child.status = ChildStatus::PUSHING;
237  }
238  }
239  }
240 
241  //! Method for derived classes to Push a single item to all children.
    //! Children that already have input `parent_index` marked as delivered
    //! are skipped, so no child receives the same input twice.
242  void PushItem(const ValueType& item, size_t parent_index) const {
243  for (const UnionChild& child : children_) {
244  if (!child.pushed_inputs[parent_index])
245  child.callback(item);
246  }
247  }
248 
    //! Marks input `id` as delivered for every PUSHING child, then forwards
    //! StopPreOp to each child whose inputs are all delivered and marks it
    //! DONE.
    // NOTE(review): the AllInputsDone() check is not gated on the PUSHING
    // state, so a child that is already DONE has StopPreOp invoked on it
    // again by every later call -- confirm this is intended.
249  void StopPreOp(size_t id) final {
250  LOG0 << "UnionNode::StopPreOp id=" << id;
251  for (UnionChild& child : children_) {
252  if (child.status == ChildStatus::PUSHING) {
253  assert(!child.pushed_inputs[id]);
254  child.pushed_inputs[id] = 1;
255  }
256  if (child.AllInputsDone()) {
257  LOG << "UnionNode::StopPreOp triggered"
258  << " StopPreOp on child " << *child.node;
259  child.node->StopPreOp(child.parent_index);
260  child.status = ChildStatus::DONE;
261  }
262  }
263  }
264 
    //! Must never be called on a UnionNode: aborts the process.
265  void RunPushData() final { abort(); }
266 
    //! Must never be called on a UnionNode: aborts the process.
267  void PushData(bool /* consume */) final { abort(); }
268 
    //! Returns the minimum consume counter over all parents, starting from
    //! Super::kNeverConsume (which is also the result without parents).
269  size_t consume_counter() const final {
270  // calculate consumption of parents
271  size_t c = Super::kNeverConsume;
272  for (auto& p : Super::parents_) {
273  c = std::min(c, p->consume_counter());
274  }
275  return c;
276  }
277 
    //! Forwards the increment to every parent; nothing is stored locally.
278  void IncConsumeCounter(size_t consume) final {
279  // propagate consumption up to parents.
280  for (auto& p : Super::parents_) {
281  p->IncConsumeCounter(consume);
282  }
283  }
284 
    //! Forwards the decrement to every parent; nothing is stored locally.
285  void DecConsumeCounter(size_t consume) final {
286  // propagate consumption up to parents.
287  for (auto& p : Super::parents_) {
288  p->DecConsumeCounter(consume);
289  }
290  }
291 
    //! Forwards the new value to every parent; nothing is stored locally.
292  void SetConsumeCounter(size_t consume) final {
293  // propagate consumption up to parents.
294  for (auto& p : Super::parents_) {
295  p->SetConsumeCounter(consume);
296  }
297  }
298 
299 private:
    //! Number of inputs (parents) of this union, fixed at construction.
300  size_t num_inputs_;
301 
302  //! Callback functions from the child nodes.
303  std::vector<UnionChild> children_;
304 };
305 
306 /*!
307  * Union is a LOp, which creates the union of all items from any number of DIAs
308  * as a single DIA, where the items are in an arbitrary order. All input DIAs
309  * must contain the same type, which is also the output DIA's type.
310  *
311  * The Union operation concatenates all _local_ pieces of a DIA, no rebalancing
312  * is performed, and no communication is needed.
313  *
314  * \param first_dia first DIA
315  * \param dias DIAs, which are unified with the first DIA.
316  *
317  * \ingroup dia_lops
318  */
319 template <typename FirstDIA, typename... DIAs>
320 auto Union(const FirstDIA& first_dia, const DIAs& ... dias) {
321 
    // call AssertValid() on every argument; the comma expressions turn each
    // call into a value so the pack can be expanded as function arguments
322  tlx::vexpand((first_dia.AssertValid(), 0), (dias.AssertValid(), 0) ...);
323 
    // the result's item type is taken from the first DIA
324  using ValueType = typename FirstDIA::ValueType;
325 
    // NOTE(review): listing line 326 is absent from this dump. `UnionNode` is
    // used as a type argument below, so presumably a local alias for
    // api::UnionNode<ValueType> is declared there -- confirm.
327 
    // build the node from all inputs and wrap it in a new DIA
328  return DIA<ValueType>(tlx::make_counting<UnionNode>(first_dia, dias...));
329 }
330 
331 /*!
332  * Union is a LOp, which creates the union of all items from any number of DIAs
333  * as a single DIA, where the items are in an arbitrary order. All input DIAs
334  * must contain the same type, which is also the output DIA's type.
335  *
336  * The Union operation concatenates all _local_ pieces of a DIA, no rebalancing
337  * is performed, and no communication is needed.
338  *
339  * \param dias DIAs, which are unified.
340  *
341  * \ingroup dia_lops
342  */
343 template <typename ValueType>
344 auto Union(const std::initializer_list<DIA<ValueType> >& dias) {
345 
    // call AssertValid() on every input before building the node
346  for (const DIA<ValueType>& d : dias)
347  d.AssertValid();
348 
    // NOTE(review): listing line 349 is absent from this dump; presumably the
    // local alias of UnionNode for api::UnionNode<ValueType> -- confirm.
350 
    // hand the whole list to the UnionNode and wrap the node in a new DIA
351  return DIA<ValueType>(tlx::make_counting<UnionNode>(dias));
352 }
353 
354 /*!
355  * Union is a LOp, which creates the union of all items from any number of DIAs
356  * as a single DIA, where the items are in an arbitrary order. All input DIAs
357  * must contain the same type, which is also the output DIA's type.
358  *
359  * The Union operation concatenates all _local_ pieces of a DIA, no rebalancing
360  * is performed, and no communication is needed.
361  *
362  * \param dias DIAs, which are unified.
363  *
364  * \ingroup dia_lops
365  */
366 template <typename ValueType>
367 auto Union(const std::vector<DIA<ValueType> >& dias) {
368 
    // call AssertValid() on every input before building the node
369  for (const DIA<ValueType>& d : dias)
370  d.AssertValid();
371 
    // NOTE(review): listing line 372 is absent from this dump; presumably the
    // local alias of UnionNode for api::UnionNode<ValueType> -- confirm.
373 
    // hand the whole vector to the UnionNode and wrap the node in a new DIA
374  return DIA<ValueType>(tlx::make_counting<UnionNode>(dias));
375 }
376 
// Out-of-class definition of a DIA member template that unions this DIA with
// one other DIA by delegating to the free function api::Union.
// NOTE(review): listing line 379 (the declarator carrying the return type and
// the qualified function name) is absent from this dump -- confirm against the
// original header.
377 template <typename ValueType, typename Stack>
378 template <typename SecondDIA>
380  const SecondDIA& second_dia) const {
381  return api::Union(*this, second_dia);
382 }
383 
384 } // namespace api
385 
386 //! imported from api namespace
387 using api::Union;
388 
389 } // namespace thrill
390 
391 #endif // !THRILL_API_UNION_HEADER
392 
393 /******************************************************************************/
void SetConsumeCounter(size_t consume) final
Definition: union.hpp:292
void Execute() final
Virtual execution method. Triggers actual computation in sub-classes.
Definition: union.hpp:226
DIA is the interface between the user and the Thrill framework.
Definition: dia.hpp:141
DIABase * node
reference to child node
Definition: union.hpp:66
void StartPreOp(size_t id) final
Virtual method for preparing start of PushData.
Definition: union.hpp:228
static constexpr bool debug
Definition: union.hpp:55
DIANode< ValueType > Super
Definition: union.hpp:58
void vexpand(Types &&...)
Definition: vexpand.hpp:24
#define LOG0
Override default output: never or always output log.
Definition: logger.hpp:27
auto Union(const FirstDIA &first_dia, const DIAs &...dias)
Union is a LOp, which creates the union of all items from any number of DIAs as a single DIA...
Definition: union.hpp:320
bool RequireParentPushData(size_t parent_index) const final
Check whether we need PushData() from the specific parent.
Definition: union.hpp:219
tlx::delegate< void(const ValueType &)> Callback
Definition: dia_node.hpp:40
ChildStatus status
status of the child.
Definition: union.hpp:74
void IncConsumeCounter(size_t consume) final
Definition: union.hpp:278
void RunPushData() final
Definition: union.hpp:265
auto MapVector(const std::vector< Type > &input, const Functor &f) -> std::vector< typename std::result_of< Functor(Type)>::type >
Definition: functional.hpp:98
void StopPreOp(size_t id) final
Virtual method for preparing end of PushData.
Definition: union.hpp:249
bool ForwardDataOnly() const final
A UnionNode cannot be executed, it never contains any data.
Definition: union.hpp:216
A DIANode is a typed node representing an operation in Thrill.
Definition: dia_node.hpp:37
std::vector< size_t > pushed_inputs
vector of inputs which were delivered to this child.
Definition: union.hpp:76
The DIABase is the untyped super class of DIANode.
Definition: dia_base.hpp:87
void RemoveChild(DIABase *node) final
Definition: union.hpp:173
const size_t & id() const
return unique id() of DIANode subclass as stored by StatsNode
Definition: dia_base.hpp:213
Implements a Union operation by hooking any number of input DIANodes and forwarding the output immedi...
Definition: union.hpp:53
std::vector< DIABasePtr > parents_
Parents of this DIABase.
Definition: dia_base.hpp:310
std::vector< DIABase * > children() const final
Returns the children of this DIABase.
Definition: union.hpp:207
std::vector< T, Allocator< T > > vector
vector with Manager tracking
Definition: allocator.hpp:228
void DecConsumeCounter(size_t consume) final
Definition: union.hpp:285
void RemoveAllChildren() final
Definition: union.hpp:183
void PushItem(const ValueType &item, size_t parent_index) const
Method for derived classes to Push a single item to all children.
Definition: union.hpp:242
Callback callback
callback to invoke (currently for each item)
Definition: union.hpp:68
static uint_pair min()
return an uint_pair instance containing the smallest value possible
Definition: uint_types.hpp:217
tlx::CountingPtr< DIABase > DIABasePtr
Definition: dia_base.hpp:90
bool AllInputsDone() const
check if all inputs were pushed to the child
Definition: union.hpp:79
UnionNode(const ParentDIA0 &parent0, const ParentDIAs &...parents)
Definition: union.hpp:89
size_t consume_counter() const final
Returns consume_counter_.
Definition: union.hpp:269
void PushData(bool) final
Virtual method for pushing data. Triggers actual pushing in sub-classes.
Definition: union.hpp:267
const std::vector< DIABasePtr > & parents() const
Returns the parents of this DIABase.
Definition: dia_base.hpp:253
void call_foreach_with_index(Functor &&f, Args &&...args)
std::vector< UnionChild > children_
Callback functions from the child nodes.
Definition: union.hpp:303
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:24
auto Union(const SecondDIA &second_dia) const
Union is a LOp, which creates the union of all items from any number of DIAs as a single DIA...
Definition: union.hpp:379
static constexpr size_t kNeverConsume
Never full consume.
Definition: dia_base.hpp:324
Context & context_
associated Context
Definition: dia_base.hpp:293
void AddChild(DIABase *node, const Callback &callback, size_t parent_index=0) final
Enables children to push their "folded" function chains to their parent.
Definition: union.hpp:163