12 #ifndef THRILL_API_UNION_HEADER 13 #define THRILL_API_UNION_HEADER 21 #include <initializer_list> 52 template <
typename ValueType>
55 static constexpr
bool debug =
false;
80 for (
const size_t& i : pushed_inputs)
88 template <
typename ParentDIA0,
typename... ParentDIAs>
91 :
Super(parent0.ctx(),
"Union",
92 { parent0.id(), parents.id() ... },
93 { parent0.node(),
parents.node() ... }),
101 template <
typename ParentDIA>
103 :
Super(parents.front().ctx(),
"Union",
105 parents, [](const ParentDIA& d) {
return d.id(); }),
107 parents, [](
const ParentDIA& d) {
114 auto propagate_fn = [
this, i](
const ValueType& input) {
120 auto lop_chain =
parents[i].stack().push(propagate_fn).fold();
121 parents[i].node()->AddChild(
this, lop_chain, i);
127 template <
typename ParentDIA>
137 : union_node_(union_node) { }
139 template <
typename Index,
typename Parent>
140 void operator () (
const Index&, Parent& parent) {
143 auto propagate_fn = [union_node](
const ValueType& input) {
144 union_node->
PushItem(input, Index::index);
149 auto lop_chain = parent.stack().push(propagate_fn).fold();
150 parent.node()->AddChild(union_node_, lop_chain, Index::index);
163 size_t parent_index = 0) final {
165 node, callback, parent_index,
173 typename std::vector<UnionChild>::iterator swap_end =
176 [node](
const UnionChild& c) {
return c.node == node; });
190 if (child.node->ForwardDataOnly())
192 child.node->RemoveParent(
this);
199 if (!child.node->ForwardDataOnly())
201 child.node->RemoveAllChildren();
207 std::vector<DIABase*> out;
210 out.emplace_back(child.node);
220 if (!child.pushed_inputs[parent_index])
return true;
228 LOG0 <<
"UnionNode::StartPreOp parent_index=" << parent_index;
232 LOG <<
"UnionNode::StartPreOp triggered" 233 <<
" StartPreOp on child " << *child.node;
234 child.node->StartPreOp(child.parent_index);
241 void PushItem(
const ValueType& item,
size_t parent_index)
const {
243 if (!child.pushed_inputs[parent_index])
244 child.callback(item);
249 LOG0 <<
"UnionNode::StopPreOp parent_index=" << parent_index;
252 assert(!child.pushed_inputs[parent_index]);
253 child.pushed_inputs[parent_index] = 1;
255 if (child.AllInputsDone()) {
256 LOG <<
"UnionNode::StopPreOp triggered" 257 <<
" StopPreOp on child " << *child.node;
258 child.node->StopPreOp(child.parent_index);
272 c =
std::min(c, p->consume_counter());
280 p->IncConsumeCounter(consume);
287 p->DecConsumeCounter(consume);
294 p->SetConsumeCounter(consume);
318 template <
typename FirstDIA,
typename... DIAs>
319 auto Union(
const FirstDIA& first_dia,
const DIAs& ... dias) {
321 tlx::vexpand((first_dia.AssertValid(), 0), (dias.AssertValid(), 0) ...);
323 using ValueType =
typename FirstDIA::ValueType;
327 return DIA<ValueType>(tlx::make_counting<UnionNode>(first_dia, dias...));
342 template <
typename ValueType>
365 template <
typename ValueType>
376 template <
typename ValueType,
typename Stack>
377 template <
typename SecondDIA>
379 const SecondDIA& second_dia)
const {
390 #endif // !THRILL_API_UNION_HEADER void SetConsumeCounter(size_t consume) final
void Execute() final
Virtual execution method. Triggers actual computation in sub-classes.
DIA is the interface between the user and the Thrill framework.
DIABase * node
reference to child node
auto Union(const FirstDIA &first_dia, const DIAs &... dias)
Union is a LOp, which creates the union of all items from any number of DIAs as a single DIA...
static constexpr bool debug
tlx::delegate< void(const ValueType &)> Callback
#define LOG0
Override default output: never or always output log.
bool RequireParentPushData(size_t parent_index) const final
Check whether we need PushData() from the specific parent.
const std::vector< DIABasePtr > & parents() const
Returns the parents of this DIABase.
ChildStatus status
status of the child.
void IncConsumeCounter(size_t consume) final
RegisterParent(UnionNode *union_node)
UnionNode(const ParentDIA0 &parent0, const ParentDIAs &... parents)
bool AllInputsDone() const
check if all inputs were pushed to the child
auto MapVector(const std::vector< Type > &input, const Functor &f) -> std::vector< typename std::result_of< Functor(Type)>::type >
bool ForwardDataOnly() const final
A UnionNode cannot be executed, it never contains any data.
void StopPreOp(size_t parent_index) final
Virtual method for preparing end of PushData.
A DIANode is a typed node representing and operation in Thrill.
std::vector< size_t > pushed_inputs
vector of inputs which were delivered to this child.
The DIABase is the untyped super class of DIANode.
void RemoveChild(DIABase *node) final
auto Union(const SecondDIA &second_dia) const
Union is a LOp, which creates the union of all items from any number of DIAs as a single DIA...
Implements a Union operation by hooking any number of input DIANodes and forwarding the output immedi...
std::vector< DIABasePtr > parents_
Parents of this DIABase.
void PushItem(const ValueType &item, size_t parent_index) const
Method for derived classes to Push a single item to all children.
std::vector< DIABase * > children() const final
Returns the children of this DIABase.
UnionNode(const std::initializer_list< ParentDIA > &parents)
std::vector< T, Allocator< T > > vector
vector with Manager tracking
void DecConsumeCounter(size_t consume) final
void RemoveAllChildren() final
UnionNode(const std::vector< ParentDIA > &parents)
Callback callback
callback to invoke (currently for each item)
static uint_pair min()
return an uint_pair instance containing the smallest value possible
tlx::CountingPtr< DIABase > DIABasePtr
size_t consume_counter() const final
Returns consume_counter_.
void PushData(bool) final
Virtual method for pushing data. Triggers actual pushing in sub-classes.
void StartPreOp(size_t parent_index) final
Virtual method for preparing start of PushData.
std::vector< UnionChild > children_
Callback functions from the child nodes.
#define LOG
Default logging method: output if the local debug variable is true.
static constexpr size_t kNeverConsume
Never full consume.
Context & context_
associated Context
void AddChild(DIABase *node, const Callback &callback, size_t parent_index=0) final
Enables children to push their "folded" function chains to their parent.