Thrill  0.1
union.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/api/union.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2016 Timo Bingmann <[email protected]>
7  *
8  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
9  ******************************************************************************/
10 
11 #pragma once
12 #ifndef THRILL_API_UNION_HEADER
13 #define THRILL_API_UNION_HEADER
14 
15 #include <thrill/api/dia.hpp>
16 #include <thrill/api/dia_node.hpp>
18 #include <tlx/meta/vexpand.hpp>
19 
20 #include <algorithm>
21 #include <initializer_list>
22 #include <vector>
23 
24 namespace thrill {
25 namespace api {
26 
27 /*!
28  * Implements a Union operation by hooking any number of input DIANodes and
29  * forwarding the output immediately to all children.
30  *
31  * The problem with Union is that children can be added to the node after
32  * creation (as with all other nodes). This however requires the UnionNode to
33  * remember which of its children has already got which input's items. This is
34  * recorded in each UnionChild's pushed_inputs array.
35  *
36  * For example this occurs in the following DIA graph:
37  *
38  * [Gen.1] ------v
39  * [Union.3] --> [Sort.4] -------> [Size.6]
40  * +---^ +-------------------------------> [Size.7]
41  * [Gen.2] --+
42  * +------------------------> [Size.5]
43  *
44  * Size.5 triggers Execute and PushData such that Sort.4's PreOp gets all data
45  * from Gen.2, but not from Gen.1. Then, running Size.6 requires Union.3 to get
46  * data from Gen.1 and NOT run Gen.2 again.
47  *
48  * Another situation then occur, when [Size.7] is added later.
49  *
50  * \ingroup api_layer
51  */
52 template <typename ValueType>
53 class UnionNode final : public DIANode<ValueType>
54 {
55  static constexpr bool debug = false;
56 
57 public:
59  using Super::context_;
60  using Callback = typename Super::Callback;
61 
62  enum class ChildStatus { NEW, PUSHING, DONE };
63 
64  struct UnionChild {
65  //! reference to child node
67  //! callback to invoke (currently for each item)
69  //! index this node has among the parents of the child (passed to
70  //! callbacks), e.g. for ZipNode which has multiple parents and their
71  //! order is important.
72  size_t parent_index;
73  //! status of the child.
75  //! vector of inputs which were delivered to this child.
76  std::vector<size_t> pushed_inputs;
77 
78  //! check if all inputs were pushed to the child
79  bool AllInputsDone() const {
80  for (const size_t& i : pushed_inputs)
81  if (!i) return false;
82  return true;
83  }
84  };
85 
86  //! Constructor for variant with variadic parent parameter pack, which each
87  //! parent may have a different FunctionStack.
88  template <typename ParentDIA0, typename... ParentDIAs>
89  explicit UnionNode(const ParentDIA0& parent0,
90  const ParentDIAs& ... parents)
91  : Super(parent0.ctx(), "Union",
92  { parent0.id(), parents.id() ... },
93  { parent0.node(), parents.node() ... }),
94  num_inputs_(1 + sizeof ... (ParentDIAs)) {
96  RegisterParent(this), parent0, parents...);
97  }
98 
99  //! Constructor for variant with a std::vector of parents all with the same
100  //! (usually empty) FunctionStack.
101  template <typename ParentDIA>
102  explicit UnionNode(const std::vector<ParentDIA>& parents)
103  : Super(parents.front().ctx(), "Union",
104  common::MapVector(
105  parents, [](const ParentDIA& d) { return d.id(); }),
107  parents, [](const ParentDIA& d) {
108  return DIABasePtr(d.node().get());
109  })),
110  num_inputs_(parents.size())
111  {
112  for (size_t i = 0; i < num_inputs_; ++i)
113  {
114  auto propagate_fn = [this, i](const ValueType& input) {
115  this->PushItem(input, i);
116  };
117 
118  // close the function stacks with our pre ops and register it at
119  // parent nodes for output
120  auto lop_chain = parents[i].stack().push(propagate_fn).fold();
121  parents[i].node()->AddChild(this, lop_chain, i);
122  }
123  }
124 
125  //! Constructor for variant with a std::initializer_list of parents all with
126  //! the same (usually empty) FunctionStack.
127  template <typename ParentDIA>
128  explicit UnionNode(const std::initializer_list<ParentDIA>& parents)
129  : UnionNode(std::vector<ParentDIA>(parents)) { }
130 
131  //! Register Parent Hooks, operator() is instantiated and called for each
132  //! Union parent
134  {
135  public:
136  explicit RegisterParent(UnionNode* union_node)
137  : union_node_(union_node) { }
138 
139  template <typename Index, typename Parent>
140  void operator () (const Index&, Parent& parent) {
141 
142  UnionNode* union_node = union_node_;
143  auto propagate_fn = [union_node](const ValueType& input) {
144  union_node->PushItem(input, Index::index);
145  };
146 
147  // close the function stacks with our pre ops and register it at
148  // parent nodes for output
149  auto lop_chain = parent.stack().push(propagate_fn).fold();
150  parent.node()->AddChild(union_node_, lop_chain, Index::index);
151  }
152 
153  private:
155  };
156 
157  /*!
158  * Enables children to push their "folded" function chains to their parent.
159  * This way the parent can push all its result elements to each of the
160  * children. This procedure enables the minimization of IO-accesses.
161  */
162  void AddChild(DIABase* node, const Callback& callback,
163  size_t parent_index = 0) final {
164  children_.emplace_back(UnionChild {
165  node, callback, parent_index,
166  ChildStatus::NEW, std::vector<size_t>(num_inputs_)
167  });
168  }
169 
170  //! Remove a child from the vector of children. This method is called by the
171  //! destructor of children.
172  void RemoveChild(DIABase* node) final {
173  typename std::vector<UnionChild>::iterator swap_end =
174  std::remove_if(
175  children_.begin(), children_.end(),
176  [node](const UnionChild& c) { return c.node == node; });
177 
178  // assert(swap_end != children_.end());
179  children_.erase(swap_end, children_.end());
180  }
181 
182  void RemoveAllChildren() final {
183  // remove all children other than Collapse and Union nodes
184  children_.erase(
185  std::remove_if(
186  children_.begin(), children_.end(),
187  [this](UnionChild& child) {
188  if (child.status != ChildStatus::DONE)
189  return false;
190  if (child.node->ForwardDataOnly())
191  return false;
192  child.node->RemoveParent(this);
193  return true;
194  }),
195  children_.end());
196 
197  // recurse into remaining nodes (CollapseNode and UnionNode)
198  for (UnionChild& child : children_) {
199  if (!child.node->ForwardDataOnly())
200  continue;
201  child.node->RemoveAllChildren();
202  }
203  }
204 
205  //! Returns the children of this DIABase.
206  std::vector<DIABase*> children() const final {
207  std::vector<DIABase*> out;
208  out.reserve(children_.size());
209  for (const UnionChild& child : children_)
210  out.emplace_back(child.node);
211  return out;
212  }
213 
214  //! A UnionNode cannot be executed, it never contains any data.
215  bool ForwardDataOnly() const final { return true; }
216 
217  //! Check whether we need PushData() from the specific parent
218  bool RequireParentPushData(size_t parent_index) const final {
219  for (const UnionChild& child : children_) {
220  if (!child.pushed_inputs[parent_index]) return true;
221  }
222  return false;
223  }
224 
225  void Execute() final { abort(); }
226 
227  void StartPreOp(size_t parent_index) final {
228  LOG0 << "UnionNode::StartPreOp parent_index=" << parent_index;
229  for (UnionChild& child : children_) {
230  if (child.status == ChildStatus::NEW) {
231  // start push to NEW child
232  LOG << "UnionNode::StartPreOp triggered"
233  << " StartPreOp on child " << *child.node;
234  child.node->StartPreOp(child.parent_index);
235  child.status = ChildStatus::PUSHING;
236  }
237  }
238  }
239 
240  //! Method for derived classes to Push a single item to all children.
241  void PushItem(const ValueType& item, size_t parent_index) const {
242  for (const UnionChild& child : children_) {
243  if (!child.pushed_inputs[parent_index])
244  child.callback(item);
245  }
246  }
247 
248  void StopPreOp(size_t parent_index) final {
249  LOG0 << "UnionNode::StopPreOp parent_index=" << parent_index;
250  for (UnionChild& child : children_) {
251  if (child.status == ChildStatus::PUSHING) {
252  assert(!child.pushed_inputs[parent_index]);
253  child.pushed_inputs[parent_index] = 1;
254  }
255  if (child.AllInputsDone()) {
256  LOG << "UnionNode::StopPreOp triggered"
257  << " StopPreOp on child " << *child.node;
258  child.node->StopPreOp(child.parent_index);
259  child.status = ChildStatus::DONE;
260  }
261  }
262  }
263 
264  void RunPushData() final { abort(); }
265 
266  void PushData(bool /* consume */) final { abort(); }
267 
268  size_t consume_counter() const final {
269  // calculate consumption of parents
270  size_t c = Super::kNeverConsume;
271  for (auto& p : Super::parents_) {
272  c = std::min(c, p->consume_counter());
273  }
274  return c;
275  }
276 
277  void IncConsumeCounter(size_t consume) final {
278  // propagate consumption up to parents.
279  for (auto& p : Super::parents_) {
280  p->IncConsumeCounter(consume);
281  }
282  }
283 
284  void DecConsumeCounter(size_t consume) final {
285  // propagate consumption up to parents.
286  for (auto& p : Super::parents_) {
287  p->DecConsumeCounter(consume);
288  }
289  }
290 
291  void SetConsumeCounter(size_t consume) final {
292  // propagate consumption up to parents.
293  for (auto& p : Super::parents_) {
294  p->SetConsumeCounter(consume);
295  }
296  }
297 
298 private:
299  size_t num_inputs_;
300 
301  //! Callback functions from the child nodes.
302  std::vector<UnionChild> children_;
303 };
304 
305 /*!
306  * Union is a LOp, which creates the union of all items from any number of DIAs
307  * as a single DIA, where the items are in an arbitrary order. All input DIAs
308  * must contain the same type, which is also the output DIA's type.
309  *
310  * The Union operation concatenates all _local_ pieces of a DIA, no rebalancing
311  * is performed, and no communication is needed.
312  *
313  * \param first_dia first DIA
314  * \param dias DIAs, which are unified with the first DIA.
315  *
316  * \ingroup dia_lops
317  */
318 template <typename FirstDIA, typename... DIAs>
319 auto Union(const FirstDIA& first_dia, const DIAs& ... dias) {
320 
321  tlx::vexpand((first_dia.AssertValid(), 0), (dias.AssertValid(), 0) ...);
322 
323  using ValueType = typename FirstDIA::ValueType;
324 
326 
327  return DIA<ValueType>(tlx::make_counting<UnionNode>(first_dia, dias...));
328 }
329 
330 /*!
331  * Union is a LOp, which creates the union of all items from any number of DIAs
332  * as a single DIA, where the items are in an arbitrary order. All input DIAs
333  * must contain the same type, which is also the output DIA's type.
334  *
335  * The Union operation concatenates all _local_ pieces of a DIA, no rebalancing
336  * is performed, and no communication is needed.
337  *
338  * \param dias DIAs, which are unified.
339  *
340  * \ingroup dia_lops
341  */
342 template <typename ValueType>
343 auto Union(const std::initializer_list<DIA<ValueType> >& dias) {
344 
345  for (const DIA<ValueType>& d : dias)
346  d.AssertValid();
347 
349 
350  return DIA<ValueType>(tlx::make_counting<UnionNode>(dias));
351 }
352 
353 /*!
354  * Union is a LOp, which creates the union of all items from any number of DIAs
355  * as a single DIA, where the items are in an arbitrary order. All input DIAs
356  * must contain the same type, which is also the output DIA's type.
357  *
358  * The Union operation concatenates all _local_ pieces of a DIA, no rebalancing
359  * is performed, and no communication is needed.
360  *
361  * \param dias DIAs, which are unified.
362  *
363  * \ingroup dia_lops
364  */
365 template <typename ValueType>
366 auto Union(const std::vector<DIA<ValueType> >& dias) {
367 
368  for (const DIA<ValueType>& d : dias)
369  d.AssertValid();
370 
372 
373  return DIA<ValueType>(tlx::make_counting<UnionNode>(dias));
374 }
375 
376 template <typename ValueType, typename Stack>
377 template <typename SecondDIA>
379  const SecondDIA& second_dia) const {
380  return api::Union(*this, second_dia);
381 }
382 
383 } // namespace api
384 
385 //! imported from api namespace
386 using api::Union;
387 
388 } // namespace thrill
389 
390 #endif // !THRILL_API_UNION_HEADER
391 
392 /******************************************************************************/
void SetConsumeCounter(size_t consume) final
Definition: union.hpp:291
void call_foreach_with_index(Functor &&f, Args &&... args)
void Execute() final
Virtual execution method. Triggers actual computation in sub-classes.
Definition: union.hpp:225
DIA is the interface between the user and the Thrill framework.
Definition: dia.hpp:141
DIABase * node
reference to child node
Definition: union.hpp:66
auto Union(const FirstDIA &first_dia, const DIAs &... dias)
Union is a LOp, which creates the union of all items from any number of DIAs as a single DIA...
Definition: union.hpp:319
static constexpr bool debug
Definition: union.hpp:55
void vexpand(Types &&...)
Definition: vexpand.hpp:24
tlx::delegate< void(const ValueType &)> Callback
Definition: dia_node.hpp:40
#define LOG0
Override default output: never or always output log.
Definition: logger.hpp:27
bool RequireParentPushData(size_t parent_index) const final
Check whether we need PushData() from the specific parent.
Definition: union.hpp:218
const std::vector< DIABasePtr > & parents() const
Returns the parents of this DIABase.
Definition: dia_base.hpp:253
STL namespace.
ChildStatus status
status of the child.
Definition: union.hpp:74
void IncConsumeCounter(size_t consume) final
Definition: union.hpp:277
RegisterParent(UnionNode *union_node)
Definition: union.hpp:136
UnionNode(const ParentDIA0 &parent0, const ParentDIAs &... parents)
Definition: union.hpp:89
bool AllInputsDone() const
check if all inputs were pushed to the child
Definition: union.hpp:79
void RunPushData() final
Definition: union.hpp:264
auto MapVector(const std::vector< Type > &input, const Functor &f) -> std::vector< typename std::result_of< Functor(Type)>::type >
Definition: functional.hpp:78
bool ForwardDataOnly() const final
A UnionNode cannot be executed, it never contains any data.
Definition: union.hpp:215
void StopPreOp(size_t parent_index) final
Virtual method for preparing end of PushData.
Definition: union.hpp:248
A DIANode is a typed node representing an operation in Thrill.
Definition: dia_node.hpp:37
std::vector< size_t > pushed_inputs
vector of inputs which were delivered to this child.
Definition: union.hpp:76
The DIABase is the untyped super class of DIANode.
Definition: dia_base.hpp:87
void RemoveChild(DIABase *node) final
Definition: union.hpp:172
auto Union(const SecondDIA &second_dia) const
Union is a LOp, which creates the union of all items from any number of DIAs as a single DIA...
Definition: union.hpp:378
Implements a Union operation by hooking any number of input DIANodes and forwarding the output immedi...
Definition: union.hpp:53
std::vector< DIABasePtr > parents_
Parents of this DIABase.
Definition: dia_base.hpp:310
void PushItem(const ValueType &item, size_t parent_index) const
Method for derived classes to Push a single item to all children.
Definition: union.hpp:241
std::vector< DIABase * > children() const final
Returns the children of this DIABase.
Definition: union.hpp:206
UnionNode(const std::initializer_list< ParentDIA > &parents)
Definition: union.hpp:128
std::vector< T, Allocator< T > > vector
vector with Manager tracking
Definition: allocator.hpp:228
void DecConsumeCounter(size_t consume) final
Definition: union.hpp:284
void RemoveAllChildren() final
Definition: union.hpp:182
UnionNode(const std::vector< ParentDIA > &parents)
Definition: union.hpp:102
Callback callback
callback to invoke (currently for each item)
Definition: union.hpp:68
static uint_pair min()
return an uint_pair instance containing the smallest value possible
Definition: uint_types.hpp:217
tlx::CountingPtr< DIABase > DIABasePtr
Definition: dia_base.hpp:90
size_t consume_counter() const final
Returns consume_counter_.
Definition: union.hpp:268
void PushData(bool) final
Virtual method for pushing data. Triggers actual pushing in sub-classes.
Definition: union.hpp:266
void StartPreOp(size_t parent_index) final
Virtual method for preparing start of PushData.
Definition: union.hpp:227
std::vector< UnionChild > children_
Callback functions from the child nodes.
Definition: union.hpp:302
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:24
static constexpr size_t kNeverConsume
Never full consume.
Definition: dia_base.hpp:324
Context & context_
associated Context
Definition: dia_base.hpp:293
void AddChild(DIABase *node, const Callback &callback, size_t parent_index=0) final
Enables children to push their "folded" function chains to their parent.
Definition: union.hpp:162