Thrill  0.1
dia_node.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/api/dia_node.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Sebastian Lamm <[email protected]>
7  *
8  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
9  ******************************************************************************/
10 
11 #pragma once
12 #ifndef THRILL_API_DIA_NODE_HEADER
13 #define THRILL_API_DIA_NODE_HEADER
14 
15 #include <thrill/api/dia_base.hpp>
16 #include <thrill/data/file.hpp>
17 #include <tlx/delegate.hpp>
18 
19 #include <algorithm>
20 #include <string>
21 #include <vector>
22 
23 namespace thrill {
24 namespace api {
25 
26 //! \ingroup api_layer
27 //! \{
28 
29 /*!
30  * A DIANode is a typed node representing an operation in Thrill. It is the
31  * super class for all operation nodes and stores the state of the
32  * operation.
33  *
34  * \tparam ValueType The output type of the DIA delivered by this DIANode.
35  */
36 template <typename ValueType>
37 class DIANode : public DIABase
38 {
39 public:
41 
42  struct Child {
43  //! reference to child node
45  //! callback to invoke (currently for each item)
47  //! index this node has among the parents of the child (passed to
48  //! callbacks), e.g. for ZipNode which has multiple parents and their order
49  //! is important.
50  size_t parent_index;
51  };
52 
53  /*!
54  * Constructor for a DIANode, which sets references to the
55  * parent nodes. Calls the constructor of DIABase with the same parameters.
56  */
57  DIANode(Context& ctx, const char* label,
58  const std::initializer_list<size_t>& parent_ids,
59  const std::initializer_list<DIABasePtr>& parents)
60  : DIABase(ctx, label, parent_ids, parents) { }
61 
62  /*!
63  * Constructor for a DIANode, which sets references to the
64  * parent nodes. Calls the constructor of DIABase with the same parameters.
65  */
66  DIANode(Context& ctx, const char* label,
67  std::vector<size_t>&& parent_ids,
68  std::vector<DIABasePtr>&& parents)
69  : DIABase(ctx, label, std::move(parent_ids), std::move(parents)) { }
70 
71  /*!
72  * Enables children to push their "folded" function chains to their parent.
73  * This way the parent can push all its result elements to each of the
74  * children. This procedure enables the minimization of IO-accesses.
75  */
76  virtual void AddChild(DIABase* node, const Callback& callback = Callback(),
77  size_t parent_index = 0) {
78  children_.emplace_back(Child { node, callback, parent_index });
79  }
80 
81  //! Remove a child from the vector of children. This method is called by the
82  //! destructor of children.
83  void RemoveChild(DIABase* node) override {
84  typename std::vector<Child>::iterator swap_end =
85  std::remove_if(
86  children_.begin(), children_.end(),
87  [node](const Child& c) { return c.node == node; });
88 
89  // assert(swap_end != children_.end());
90  children_.erase(swap_end, children_.end());
91  }
92 
93  void RemoveAllChildren() override {
94  // remove all children other than Collapse and Union nodes
95  children_.erase(
96  std::remove_if(
97  children_.begin(), children_.end(),
98  [this](Child& child) {
99  if (child.node->ForwardDataOnly())
100  return false;
101  child.node->RemoveParent(this);
102  return true;
103  }),
104  children_.end());
105 
106  // recurse into remaining nodes (CollapseNode)
107  for (Child& child : children_)
108  child.node->RemoveAllChildren();
109  }
110 
111  //! Returns the children of this DIABase.
112  std::vector<DIABase*> children() const override {
113  std::vector<DIABase*> out;
114  out.reserve(children_.size());
115  for (const Child& child : children_)
116  out.emplace_back(child.node);
117  return out;
118  }
119 
120 //! Performs the push operation of this node. If no child registered a
121 //! callback, the push is skipped entirely. Otherwise every child gets
122 //! StartPreOp(), PushData() runs, and every child gets StopPreOp().
 //! The data is pushed in consuming mode (followed by Dispose()) only when
 //! the context's consume flag is set and consume_counter() is zero.
123  void RunPushData() override {
 // a push is only useful if at least one child has a non-empty callback
124  bool need_callback = false;
125  for (const Child& child : children_)
126  need_callback = need_callback || (child.callback != nullptr);
127  if (!need_callback) {
128  LOG0 << "RunPushData(): skip PushData as no callback";
129  return;
130  }
131 
 // announce the upcoming data to every child, passing this node's index
 // among that child's parents
132  for (const Child& child : children_)
133  child.node->StartPreOp(child.parent_index);
134 
 // NOTE(review): listing lines 135-136 are absent from this extract
 // (presumably consume-counter handling) -- confirm against upstream
 // before changing anything below.
137 
138  bool consume = context().consume() && consume_counter() == 0;
139  PushData(consume);
 // after a consuming push, trigger disposal via Dispose()
140  if (consume) Dispose();
141 
 // tell every child that this parent has finished pushing
142  for (const Child& child : children_)
143  child.node->StopPreOp(child.parent_index);
144  }
145 
146  //! Method for derived classes to Push a single item to all children.
147  void PushItem(const ValueType& item) const {
148  for (const Child& child : children_) {
149  if (child.callback)
150  child.callback(item);
151  }
152  }
153 
154  //! Method for derived classes to Push a whole File of ValueType items to
155  //! all children.
156  void PushFile(data::File& file, bool consume) const {
157  // iterate over children, push directly into those with data:File*
158  std::vector<Child> nonfile_children;
159  for (const Child& child : children_) {
160  if (child.node->OnPreOpFile(file, child.parent_index))
161  LOG0 << "PushFile: direct push accepted by " << *child.node;
162  else
163  nonfile_children.push_back(child);
164  }
165 
166  if (nonfile_children.size() == 0) return;
167 
168  // push into remaining which have a function stack or no direct File*
169  data::File::Reader reader = file.GetReader(consume);
170  while (reader.HasNext()) {
171  ValueType item = reader.Next<ValueType>();
172  for (const Child& child : nonfile_children) {
173  if (child.callback)
174  child.callback(item);
175  }
176  }
177  }
178 
179 protected:
180  //! Registered children of this node: AddChild() appends one Child record
 //! (child node, its callback, and this node's index among its parents).
181  std::vector<Child> children_;
182 };
183 
184 //! \}
185 
186 } // namespace api
187 } // namespace thrill
188 
189 #endif // !THRILL_API_DIA_NODE_HEADER
190 
191 /******************************************************************************/
virtual void Dispose()
Virtual clear method. Triggers actual disposing in sub-classes.
Definition: dia_base.hpp:188
std::vector< size_t > parent_ids() const
Returns the ids of the parents of this DIABase.
Definition: dia_base.hpp:258
void PushItem(const ValueType &item) const
Method for derived classes to Push a single item to all children.
Definition: dia_node.hpp:147
Context & context()
Returns the api::Context of this DIABase.
Definition: dia_base.hpp:208
virtual void DecConsumeCounter(size_t counter)
Definition: dia_base.hpp:237
A File is an ordered sequence of Block objects for storing items.
Definition: file.hpp:56
tlx::delegate< void(const ValueType &)> Callback
Definition: dia_node.hpp:40
#define LOG0
Override default output: never or always output log.
Definition: logger.hpp:27
const std::vector< DIABasePtr > & parents() const
Returns the parents of this DIABase.
Definition: dia_base.hpp:253
STL namespace.
TLX_ATTRIBUTE_ALWAYS_INLINE T Next()
Next() reads a complete item T.
const char * label() const
return label() of DIANode subclass as stored by StatsNode
Definition: dia_base.hpp:218
The Context of a job is a unique instance per worker which holds references to all underlying parts o...
Definition: context.hpp:221
virtual void PushData(bool consume)=0
Virtual method for pushing data. Triggers actual pushing in sub-classes.
A DIANode is a typed node representing an operation in Thrill.
Definition: dia_node.hpp:37
void RemoveChild(DIABase *node) override
Definition: dia_node.hpp:83
The DIABase is the untyped super class of DIANode.
Definition: dia_base.hpp:87
void RunPushData() override
Definition: dia_node.hpp:123
Callback callback
callback to invoke (currently for each item)
Definition: dia_node.hpp:46
virtual size_t consume_counter() const
Returns consume_counter_.
Definition: dia_base.hpp:226
virtual void AddChild(DIABase *node, const Callback &callback=Callback(), size_t parent_index=0)
Enables children to push their "folded" function chains to their parent.
Definition: dia_node.hpp:76
BlockReader takes Block objects from BlockSource and allows reading of a) serializable Items or b) ar...
bool consume() const
return value of consume flag.
Definition: context.hpp:378
DIANode(Context &ctx, const char *label, std::vector< size_t > &&parent_ids, std::vector< DIABasePtr > &&parents)
Constructor for a DIANode, which sets references to the parent nodes.
Definition: dia_node.hpp:66
Reader GetReader(bool consume, size_t prefetch_size=File::default_prefetch_size_)
Get BlockReader or a consuming BlockReader for beginning of File.
Definition: file.cpp:78
std::vector< Child > children_
Callback functions from the child nodes.
Definition: dia_node.hpp:181
DIABase * node
reference to child node
Definition: dia_node.hpp:44
void RemoveAllChildren() override
Definition: dia_node.hpp:93
TLX_ATTRIBUTE_ALWAYS_INLINE bool HasNext()
HasNext() returns true if at least one more item is available.
DIANode(Context &ctx, const char *label, const std::initializer_list< size_t > &parent_ids, const std::initializer_list< DIABasePtr > &parents)
Constructor for a DIANode, which sets references to the parent nodes.
Definition: dia_node.hpp:57
std::vector< DIABase * > children() const override
Returns the children of this DIABase.
Definition: dia_node.hpp:112
void PushFile(data::File &file, bool consume) const
Definition: dia_node.hpp:156
static constexpr size_t kNeverConsume
Never full consume.
Definition: dia_base.hpp:324