#ifndef THRILL_API_PREFIX_SUM_HEADER
#define THRILL_API_PREFIX_SUM_HEADER

template <typename ValueType, typename SumFunction, bool Inclusive>
static constexpr bool debug = false;
template <typename ParentDIA>
const SumFunction& sum_function,
const ValueType& initial_element)
: Super(parent.ctx(), label, { parent.id() }, { parent.node() }),
auto pre_op_fn = [this](const ValueType& input) {
auto lop_chain = parent.stack().push(pre_op_fn).fold();
parent.node()->AddChild(this, lop_chain);
void PreOp(const ValueType& input) {
LOG << "Input: " << input;
<< "PrefixSum rejected File from parent "
<< "due to non-empty function stack.";
while (reader.HasNext()) {
local_sum_, reader.template Next<ValueType>());
LOG << "MainOp processing";
for (size_t i = 0; i < num_items; ++i) {
for (size_t i = 0; i < num_items; ++i) {
template <typename ValueType, typename Stack>
template <typename SumFunction>
const SumFunction& sum_function, const ValueType& initial_element) const {
"SumFunction has the wrong input type");
"SumFunction has the wrong input type");
"SumFunction has the wrong input type");
auto node = tlx::make_counting<PrefixSumNode>(
    *this, "PrefixSum", sum_function, initial_element);
#endif // !THRILL_API_PREFIX_SUM_HEADER
net::FlowControlChannel & net
DIA is the interface between the user and the Thrill framework.
ValueType local_sum_
Local sum to be used in the all-reduce operation.
void PushItem(const ValueType &item) const
Method for derived classes to Push a single item to all children.
KeepReader GetKeepReader(size_t prefetch_size=File::default_prefetch_size_) const
Get BlockReader for beginning of File.
void StopPreOp(size_t) final
Virtual method for preparing end of PushData.
A File is an ordered sequence of Block objects for storing items.
void Clear()
Free all Blocks in the File and deallocate vectors.
TLX_ATTRIBUTE_ALWAYS_INLINE T Next()
Next() reads a complete item T.
const char * label() const
Returns the label() of the DIANode subclass as stored by StatsNode.
T TLX_ATTRIBUTE_WARN_UNUSED_RESULT ExPrefixSum(const T &value, const BinarySumOp &sum_op=BinarySumOp(), const T &initial=T())
Calculates the exclusive prefix sum over all workers, given a certain sum operation.
static constexpr bool debug
BlockWriter contains a temporary Block object into which a) any serializable item can be stored or b)...
static constexpr bool g_debug_push_file
data::File::Writer writer_
Data writer to local file (only active in PreOp).
auto PrefixSum(const SumFunction &sum_function=SumFunction(), const ValueType &initial_element=ValueType()) const
PrefixSum is a DOp, which computes the (inclusive) prefix sum of all elements.
const bool parent_stack_empty_
Whether the parent stack is empty.
const ValueType initial_element_
Initial element.
void Dispose() final
Virtual clear method. Triggers actual disposing in sub-classes.
void PreOp(const ValueType &input)
PreOp: compute local prefixsum and store items.
BlockReader takes Block objects from BlockSource and allows reading of a) serializable Items or b) ar...
common::FunctionTraits< Function > FunctionTraits
alias for convenience.
void PushData(bool consume) final
Virtual method for pushing data. Triggers actual pushing in sub-classes.
data::File GetFile(size_t dia_id)
Returns a new File object containing a sequence of local Blocks.
A DOpNode is a typed node representing distributed operations in Thrill.
Reader GetReader(bool consume, size_t prefetch_size=File::default_prefetch_size_)
Get BlockReader or a consuming BlockReader for beginning of File.
void Execute() final
Executes the prefixsum operation.
File Copy() const
Return a copy of the File (explicit copy-constructor)
TLX_ATTRIBUTE_ALWAYS_INLINE BlockWriter & Put(const T &x)
Put appends a complete item, or fails with a FullException.
data::File file_
Local data file.
SumFunction sum_function_
The sum function which is applied to two elements.
size_t num_items() const
Return the number of items in the file.
void Close()
Explicitly close the writer.
#define LOG
Default logging method: output if the local debug variable is true.
PrefixSumNode(const ParentDIA &parent, const char *label, const SumFunction &sum_function, const ValueType &initial_element)
Writer GetWriter(size_t block_size=default_block_size)
Get BlockWriter.
bool OnPreOpFile(const data::File &file, size_t) final
Context & context_
associated Context
#define LOGC(cond)
Explicitly specify the condition for logging.
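
The member descriptions above suggest a three-phase scheme: PreOp accumulates a local sum while storing items, Execute obtains an exclusive prefix sum of those local sums over all workers, and PushData re-reads the stored items and emits running sums. The following is a minimal single-process sketch of that idea, not Thrill's implementation. The names ExclusiveOverWorkers and SimulatePrefixSum are hypothetical, workers are modelled as plain vectors, and the sketch assumes that a default-constructed T is a neutral element for the sum function (as 0 is for addition).

```cpp
#include <cstddef>
#include <functional>
#include <vector>

// Hypothetical helper (not part of Thrill): exclusive prefix sum over one
// total per worker. Worker w receives `initial` combined with the totals
// of workers 0..w-1.
template <typename T, typename SumFunction>
std::vector<T> ExclusiveOverWorkers(const std::vector<T>& totals,
                                    const SumFunction& sum,
                                    const T& initial) {
    std::vector<T> offsets;
    offsets.reserve(totals.size());
    T running = initial;
    for (const T& total : totals) {
        offsets.push_back(running);
        running = sum(running, total);
    }
    return offsets;
}

// Hypothetical simulation: each inner vector stands for one worker's items.
// Assumes T() is neutral for `sum`.
template <typename T, typename SumFunction>
std::vector<std::vector<T>> SimulatePrefixSum(
    const std::vector<std::vector<T>>& workers,
    const SumFunction& sum, const T& initial, bool inclusive) {
    // Phase 1 (compare PreOp): reduce each worker's items to a local sum.
    std::vector<T> totals;
    totals.reserve(workers.size());
    for (const auto& items : workers) {
        T local_sum = T();
        for (const T& item : items) local_sum = sum(local_sum, item);
        totals.push_back(local_sum);
    }

    // Phase 2 (compare Execute): exclusive prefix sum over the local sums.
    std::vector<T> offsets = ExclusiveOverWorkers(totals, sum, initial);

    // Phase 3 (compare PushData): scan the stored items from the offset.
    std::vector<std::vector<T>> result(workers.size());
    for (std::size_t w = 0; w < workers.size(); ++w) {
        T running = offsets[w];
        for (const T& item : workers[w]) {
            if (inclusive) {
                running = sum(running, item);
                result[w].push_back(running);
            } else {
                result[w].push_back(running);
                running = sum(running, item);
            }
        }
    }
    return result;
}
```

With four simulated workers holding {1, 2, 3}, {4, 5}, {} and {6}, addition as the sum function and an initial element of 0, the inclusive variant yields {1, 3, 6}, {10, 15}, {} and {21}. Only one value per worker crosses worker boundaries in phase 2, which is why the items themselves can stay in a local file between PreOp and PushData.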