12 #ifndef THRILL_API_DIA_NODE_HEADER 13 #define THRILL_API_DIA_NODE_HEADER 36 template <
typename ValueType>
58 const std::initializer_list<size_t>&
parent_ids,
59 const std::initializer_list<DIABasePtr>&
parents)
60 :
DIABase(ctx, label, parent_ids, parents) { }
68 std::vector<DIABasePtr>&&
parents)
84 typename std::vector<Child>::iterator swap_end =
87 [
node](
const Child& c) {
return c.node ==
node; });
98 [
this](Child& child) {
99 if (child.node->ForwardDataOnly())
101 child.node->RemoveParent(
this);
108 child.node->RemoveAllChildren();
113 std::vector<DIABase*> out;
116 out.emplace_back(child.node);
124 bool need_callback =
false;
126 need_callback = need_callback || (child.callback !=
nullptr);
127 if (!need_callback) {
128 LOG0 <<
"RunPushData(): skip PushData as no callback";
132 for (
const Child& child : children_)
133 child.node->StartPreOp(child.parent_index);
142 for (
const Child& child : children_)
143 child.node->StopPreOp(child.parent_index);
150 child.callback(item);
158 std::vector<Child> nonfile_children;
160 if (child.node->OnPreOpFile(file, child.parent_index))
161 LOG0 <<
"PushFile: direct push accepted by " << *child.node;
163 nonfile_children.push_back(child);
166 if (nonfile_children.size() == 0)
return;
171 ValueType item = reader.
Next<ValueType>();
172 for (
const Child& child : nonfile_children) {
174 child.callback(item);
189 #endif // !THRILL_API_DIA_NODE_HEADER virtual void Dispose()
Virtual clear method. Triggers actual disposing in sub-classes.
std::vector< size_t > parent_ids() const
Returns the parents of this DIABase.
void PushItem(const ValueType &item) const
Method for derived classes to Push a single item to all children.
Context & context()
Returns the api::Context of this DIABase.
virtual void DecConsumeCounter(size_t counter)
A File is an ordered sequence of Block objects for storing items.
tlx::delegate< void(const ValueType &)> Callback
#define LOG0
Override default output: never or always output log.
const std::vector< DIABasePtr > & parents() const
Returns the parents of this DIABase.
TLX_ATTRIBUTE_ALWAYS_INLINE T Next()
Next() reads a complete item T.
const char * label() const
return label() of DIANode subclass as stored by StatsNode
The Context of a job is a unique instance per worker which holds references to all underlying parts o...
virtual void PushData(bool consume)=0
Virtual method for pushing data. Triggers actual pushing in sub-classes.
A DIANode is a typed node representing and operation in Thrill.
void RemoveChild(DIABase *node) override
The DIABase is the untyped super class of DIANode.
void RunPushData() override
Callback callback
callback to invoke (currently for each item)
virtual size_t consume_counter() const
Returns consume_counter_.
virtual void AddChild(DIABase *node, const Callback &callback=Callback(), size_t parent_index=0)
Enables children to push their "folded" function chains to their parent.
BlockReader takes Block objects from BlockSource and allows reading of a) serializable Items or b) ar...
bool consume() const
return value of consume flag.
DIANode(Context &ctx, const char *label, std::vector< size_t > &&parent_ids, std::vector< DIABasePtr > &&parents)
Constructor for a DIANode, which sets references to the parent nodes.
Reader GetReader(bool consume, size_t prefetch_size=File::default_prefetch_size_)
Get BlockReader or a consuming BlockReader for beginning of File.
std::vector< Child > children_
Callback functions from the child nodes.
DIABase * node
reference to child node
void RemoveAllChildren() override
TLX_ATTRIBUTE_ALWAYS_INLINE bool HasNext()
HasNext() returns true if at least one more item is available.
DIANode(Context &ctx, const char *label, const std::initializer_list< size_t > &parent_ids, const std::initializer_list< DIABasePtr > &parents)
Constructor for a DIANode, which sets references to the parent nodes.
std::vector< DIABase * > children() const override
Returns the children of this DIABase.
void PushFile(data::File &file, bool consume) const
static constexpr size_t kNeverConsume
Never full consume.