12 #ifndef THRILL_API_WINDOW_HEADER 13 #define THRILL_API_WINDOW_HEADER 30 template <
typename ValueType,
typename Input,
31 typename WindowFunction,
typename PartialWindowFunction>
35 static constexpr
bool debug =
false;
44 template <
typename ParentDIA>
46 const char*
label,
size_t window_size,
47 const WindowFunction& window_function,
48 const PartialWindowFunction& partial_window_function)
49 :
Super(parent.ctx(), label, { parent.id() }, { parent.node() }),
55 auto pre_op_fn = [
this](
const Input& input) {
59 auto lop_chain = parent.stack().push(pre_op_fn).fold();
60 parent.node()->AddChild(
this, lop_chain);
74 <<
"Window rejected File from parent " 75 <<
"due to non-empty function stack.";
81 if (file_.num_items() != 0) {
85 auto reader = file_.GetReaderAt<Input>(pos);
86 while (reader.HasNext())
93 void PreOp(
const Input& input) {
140 template <
typename ValueType,
typename Input,
141 typename WindowFunction,
typename PartialWindowFunction>
144 ValueType, Input, WindowFunction, PartialWindowFunction>
147 ValueType, Input, WindowFunction, PartialWindowFunction>;
154 template <
typename ParentDIA>
156 const char*
label,
size_t window_size,
157 const WindowFunction& window_function,
158 const PartialWindowFunction& partial_window_function)
159 : Super(parent, label, window_size,
160 window_function, partial_window_function) { }
169 std::vector<Input> my_last;
176 std::vector<Input> pre =
179 sLOG <<
"Window::MainOp()" 182 <<
"pre.size()" << pre.size();
187 for (
size_t i = 0; i < pre.size(); ++i)
201 sLOG <<
"WindowNode::PushData()" 202 <<
"window.size()" << window.
size()
205 <<
"num_items" << num_items;
207 for (
size_t i = 0; i < num_items; ++i, ++rank) {
216 rank, window, [
this](
const ValueType& output) {
228 while (window.
size()) {
230 rank, window, [
this](
const ValueType& output) {
241 using Super::first_rank_;
242 using Super::window_;
243 using Super::window_size_;
244 using Super::window_function_;
245 using Super::partial_window_function_;
248 template <
typename ValueType,
typename Stack>
249 template <
typename ValueOut,
250 typename WindowFunction,
typename PartialWindowFunction>
252 size_t window_size,
const WindowFunction& window_function,
253 const PartialWindowFunction& partial_window_function)
const {
257 ValueOut,
ValueType, WindowFunction, PartialWindowFunction>;
262 auto node = tlx::make_counting<WindowNode>(
263 *
this,
"FlatWindow", window_size,
264 window_function, partial_window_function);
269 template <
typename ValueType,
typename Stack>
270 template <
typename ValueOut,
typename WindowFunction>
272 size_t window_size,
const WindowFunction& window_function)
const {
275 auto no_operation_function =
280 return FlatWindow<ValueOut>(
281 window_size, window_function, no_operation_function);
284 template <
typename ValueType,
typename Stack>
285 template <
typename WindowFunction>
287 size_t window_size,
const WindowFunction& window_function)
const {
298 "WindowFunction's first argument must be size_t (index)");
305 "WindowFunction's second argument must be common::RingBuffer<T>");
308 auto flatwindow_function =
309 [window_function](
size_t index,
312 emit(window_function(index, window));
315 auto no_operation_function =
322 decltype(flatwindow_function), decltype(no_operation_function)>;
324 auto node = tlx::make_counting<WindowNode>(
325 *
this,
"Window", window_size,
326 flatwindow_function, no_operation_function);
331 template <
typename ValueType,
typename Stack>
332 template <
typename WindowFunction,
typename PartialWindowFunction>
334 size_t window_size,
const WindowFunction& window_function,
335 const PartialWindowFunction& partial_window_function)
const {
346 "WindowFunction's first argument must be size_t (index)");
353 "WindowFunction's second argument must be common::RingBuffer<T>");
356 auto flatwindow_function =
357 [window_function](
size_t index,
360 emit(window_function(index, window));
364 auto flatwindow_partial_function =
365 [partial_window_function](
size_t index,
368 emit(partial_window_function(index, window));
373 decltype(flatwindow_function), decltype(flatwindow_partial_function)>;
375 auto node = tlx::make_counting<WindowNode>(
376 *
this,
"Window", window_size,
377 flatwindow_function, flatwindow_partial_function);
387 template <
typename ValueType,
typename Input,
388 typename WindowFunction,
typename PartialWindowFunction>
391 ValueType, Input, WindowFunction, PartialWindowFunction>
394 ValueType, Input, WindowFunction, PartialWindowFunction>;
401 template <
typename ParentDIA>
403 const char*
label,
size_t window_size,
404 const WindowFunction& window_function,
405 const PartialWindowFunction& partial_window_function)
406 : Super(parent, label, window_size,
407 window_function, partial_window_function) { }
416 std::vector<Input> my_last;
423 std::vector<Input> pre =
432 sLOG <<
"Window::MainOp()" 436 <<
"pre.size()" << pre.size()
437 <<
"fill_size" << fill_size;
443 for (
size_t i = pre.size() - fill_size; i < pre.size(); ++i)
451 std::vector<Input> window;
459 sLOG <<
"WindowNode::PushData()" 460 <<
"window.size()" << window.size()
462 <<
"rank+window+1" << (rank + window.size() + 1)
463 <<
"num_items" << num_items;
465 for (
size_t i = 0; i < num_items; ++i, ++rank) {
467 window.emplace_back(reader.
Next<Input>());
469 sLOG <<
"rank" << rank <<
"window.size()" << window.size();
476 rank, window, [
this](
const ValueType& output) {
490 rank, window, [
this](
const ValueType& output) {
498 using Super::first_rank_;
499 using Super::window_;
500 using Super::window_size_;
501 using Super::window_function_;
502 using Super::partial_window_function_;
505 template <
typename ValueType,
typename Stack>
506 template <
typename ValueOut,
typename WindowFunction>
509 const WindowFunction& window_function)
const {
513 ValueOut, ValueType, WindowFunction, WindowFunction>;
518 auto node = tlx::make_counting<WindowNode>(
519 *
this,
"FlatWindow", window_size, window_function, window_function);
524 template <
typename ValueType,
typename Stack>
525 template <
typename WindowFunction>
528 const WindowFunction& window_function)
const {
539 "WindowFunction's first argument must be size_t (index)");
543 std::vector<ValueType>,
546 "WindowFunction's second argument must be std::vector<T>");
549 auto flatwindow_function =
550 [window_function](
size_t index,
551 const std::vector<ValueType>& window,
553 emit(window_function(index, window));
558 decltype(flatwindow_function), decltype(flatwindow_function)>;
560 auto node = tlx::make_counting<WindowNode>(
561 *
this,
"Window", window_size, flatwindow_function, flatwindow_function);
569 #endif // !THRILL_API_WINDOW_HEADER tlx::RingBuffer< Type, Allocator > RingBuffer
DisjointWindowNode(const ParentDIA &parent, const char *label, size_t window_size, const WindowFunction &window_function, const PartialWindowFunction &partial_window_function)
OverlapWindowNode(const ParentDIA &parent, const char *label, size_t window_size, const WindowFunction &window_function, const PartialWindowFunction &partial_window_function)
net::FlowControlChannel & net
#define sLOG
Default logging method: output if the local debug variable is true.
DIA is the interface between the user and the Thrill framework.
void PushItem(const ValueType &item) const
Method for derived classes to Push a single item to all children.
Description of the amount of RAM the internal data structures of a DIANode require.
void Dispose() final
Virtual clear method. Triggers actual disposing in sub-classes.
size_t window_size_
Size k of the window.
void push_back(const value_type &t)
add element at the end
WindowFunction window_function_
The window function which is applied to k elements.
A ring (circular) buffer of static (non-growing) size.
size_t num_workers() const
Global number of workers in the system.
A File is an ordered sequence of Block objects for storing items.
void Clear()
Free all Blocks in the File and deallocate vectors.
TLX_ATTRIBUTE_ALWAYS_INLINE T Next()
Next() reads a complete item T.
const char * label() const
return label() of DIANode subclass as stored by StatsNode
T TLX_ATTRIBUTE_WARN_UNUSED_RESULT ExPrefixSum(const T &value, const BinarySumOp &sum_op=BinarySumOp(), const T &initial=T())
Calculates the exclusive prefix sum over all workers, given a certain sum operation.
const bool parent_stack_empty_
Whether the parent stack is empty.
BlockWriter contains a temporary Block object into which a) any serializable item can be stored or b)...
static constexpr bool g_debug_push_file
void emplace_back(Args &&... args)
emplace element at the end
DIAMemUse PreOpMemUse() final
Amount of RAM used by PreOp after StartPreOp()
void deallocate()
deallocate buffer
size_type size() const noexcept
return the number of items in the buffer
void copy_to(std::vector< value_type > *out) const
copy all elements into the vector
DIAMemUse PushDataMemUse() final
Amount of RAM used by PushData()
data::File::Writer writer_
Data writer to local file (only active in PreOp).
void StartPreOp(size_t) final
Virtual method for preparing start of PushData.
std::vector< T > Predecessor(size_t k, const std::vector< T > &my_values)
Collects up to k predecessors of type T from preceding PEs.
void pop_front()
remove element at the beginning
BlockReader takes Block objects from BlockSource and allows reading of a) serializable Items or b) ar...
size_t first_rank_
rank of our first element in file_
auto Window(size_t window_size, const WindowFunction &window_function=WindowFunction()) const
Window is a DOp, which applies a window function to every k consecutive items in a DIA...
common::FunctionTraits< Function > FunctionTraits
alias for convenience.
static constexpr bool debug
void PushData(bool consume) final
Virtual method for pushing data. Triggers actual pushing in sub-classes.
size_t my_rank() const
Global rank of this worker among all other workers in the system.
data::File GetFile(size_t dia_id)
Returns a new File object containing a sequence of local Blocks.
void allocate(size_t max_size)
allocate buffer
A DOpNode is a typed node representing a distributed operation in Thrill.
static uint_pair min()
return an uint_pair instance containing the smallest value possible
Reader GetReader(bool consume, size_t prefetch_size=File::default_prefetch_size_)
Get BlockReader or a consuming BlockReader for beginning of File.
data::File file_
Local data file.
File Copy() const
Return a copy of the File (explicit copy-constructor)
TLX_ATTRIBUTE_ALWAYS_INLINE BlockWriter & Put(const T &x)
Put appends a complete item, or fails with a FullException.
bool OnPreOpFile(const data::File &file, size_t) final
size_t num_items() const
Return the number of items in the file.
void Close()
Explicitly close the writer.
RingBuffer window_
cache the last k - 1 items for transmission
tag structure for Window() and FlatWindow()
void PreOp(const Input &input)
PreOp: keep last k - 1 items (local window) and store items.
PartialWindowFunction partial_window_function_
The window function which is applied to the last < k elements.
auto FlatWindow(size_t window_size, const WindowFunction &window_function=WindowFunction()) const
FlatWindow is a DOp, which applies a window function to every k consecutive items in a DIA...
void PushData(bool consume) final
Virtual method for pushing data. Triggers actual pushing in sub-classes.
Writer GetWriter(size_t block_size=default_block_size)
Get BlockWriter.
BaseWindowNode(const ParentDIA &parent, const char *label, size_t window_size, const WindowFunction &window_function, const PartialWindowFunction &partial_window_function)
static constexpr bool debug
Context & context_
associated Context
void StopPreOp(size_t) final
Virtual method for preparing end of PushData.
#define LOGC(cond)
Explicitly specify the condition for logging.
void move_to(std::vector< value_type > *out)
move all elements from the RingBuffer into the vector