Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
window.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/api/window.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015-2016 Timo Bingmann <[email protected]>
7  *
8  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
9  ******************************************************************************/
10 
11 #pragma once
12 #ifndef THRILL_API_WINDOW_HEADER
13 #define THRILL_API_WINDOW_HEADER
14 
#include <thrill/api/dia.hpp>
#include <thrill/api/dop_node.hpp>
#include <thrill/common/logger.hpp>
#include <thrill/common/ring_buffer.hpp>
#include <thrill/data/file.hpp>

#include <algorithm>
#include <cassert>
#include <type_traits>
#include <vector>
23 
24 namespace thrill {
25 namespace api {
26 
27 /*!
28  * \ingroup api_layer
29  */
30 template <typename ValueType, typename Input,
31  typename WindowFunction, typename PartialWindowFunction>
32 class BaseWindowNode : public DOpNode<ValueType>
33 {
34 protected:
35  static constexpr bool debug = false;
36 
38  using Super::context_;
39 
40  //! RingBuffer used and passed to user-defined function.
42 
43 public:
44  template <typename ParentDIA>
45  BaseWindowNode(const ParentDIA& parent,
46  const char* label, size_t window_size,
47  const WindowFunction& window_function,
48  const PartialWindowFunction& partial_window_function)
49  : Super(parent.ctx(), label, { parent.id() }, { parent.node() }),
50  parent_stack_empty_(ParentDIA::stack_empty),
51  window_size_(window_size),
52  window_function_(window_function),
53  partial_window_function_(partial_window_function)
54  {
55  // Hook PreOp(s)
56  auto pre_op_fn = [this](const Input& input) {
57  PreOp(input);
58  };
59 
60  auto lop_chain = parent.stack().push(pre_op_fn).fold();
61  parent.node()->AddChild(this, lop_chain);
62  }
63 
64  DIAMemUse PreOpMemUse() final {
65  return window_size_ * sizeof(Input);
66  }
67 
68  void StartPreOp(size_t /* id */) final {
70  }
71 
72  bool OnPreOpFile(const data::File& file, size_t /* parent_index */) final {
73  if (!parent_stack_empty_) {
75  << "Window rejected File from parent "
76  << "due to non-empty function stack.";
77  return false;
78  }
79  // accept file
80  assert(file_.num_items() == 0);
81  file_ = file.Copy();
82  if (file_.num_items() != 0) {
83  // read last k - 1 items from File
84  size_t pos = file_.num_items() > window_size_ - 1 ?
85  file_.num_items() - window_size_ + 1 : 0;
86  auto reader = file_.GetReaderAt<Input>(pos);
87  while (reader.HasNext())
88  window_.push_back(reader.template Next<Input>());
89  }
90  return true;
91  }
92 
93  //! PreOp: keep last k - 1 items (local window) and store items.
94  void PreOp(const Input& input) {
95  if (window_.size() >= window_size_ - 1)
97  window_.push_back(input);
98 
99  writer_.Put(input);
100  }
101 
102  void StopPreOp(size_t /* id */) final {
103  writer_.Close();
104  }
105 
106  DIAMemUse PushDataMemUse() final {
107  // window_ is copied in PushData()
108  return 2 * window_size_ * sizeof(Input);
109  }
110 
111  void Dispose() final {
113  file_.Clear();
114  }
115 
116 protected:
117  //! Whether the parent stack is empty
118  const bool parent_stack_empty_;
119  //! Size k of the window
120  size_t window_size_;
121  //! The window function which is applied to k elements.
122  WindowFunction window_function_;
123  //! The window function which is applied to the last < k elements.
124  PartialWindowFunction partial_window_function_;
125 
126  //! cache the last k - 1 items for transmission
128 
129  //! Local data file
130  data::File file_ { context_.GetFile(this) };
131  //! Data writer to local file (only active in PreOp).
132  data::File::Writer writer_ { file_.GetWriter() };
133 
134  //! rank of our first element in file_
135  size_t first_rank_;
136 };
137 
138 /*!
139  * \ingroup api_layer
140  */
141 template <typename ValueType, typename Input,
142  typename WindowFunction, typename PartialWindowFunction>
143 class OverlapWindowNode final
144  : public BaseWindowNode<
145  ValueType, Input, WindowFunction, PartialWindowFunction>
146 {
147  using Super = BaseWindowNode<
148  ValueType, Input, WindowFunction, PartialWindowFunction>;
149  using Super::debug;
150  using Super::context_;
151 
152  using typename Super::RingBuffer;
153 
154 public:
155  template <typename ParentDIA>
156  OverlapWindowNode(const ParentDIA& parent,
157  const char* label, size_t window_size,
158  const WindowFunction& window_function,
159  const PartialWindowFunction& partial_window_function)
160  : Super(parent, label, window_size,
161  window_function, partial_window_function) { }
162 
163  //! Executes the window operation by receiving k - 1 items from our
164  //! preceding worker.
165  void Execute() final {
166  // get rank of our first element
167  first_rank_ = context_.net.ExPrefixSum(file_.num_items());
168 
169  // copy our last elements into a vector
170  std::vector<Input> my_last;
171  my_last.reserve(window_size_ - 1);
172 
173  assert(window_.size() < window_size_);
174  window_.move_to(&my_last);
175 
176  // collective operation: get k - 1 predecessors
177  std::vector<Input> pre =
178  context_.net.Predecessor(window_size_ - 1, my_last);
179 
180  sLOG << "Window::MainOp()"
181  << "first_rank_" << first_rank_
182  << "window_size_" << window_size_
183  << "pre.size()" << pre.size();
184 
185  assert(pre.size() == std::min(window_size_ - 1, first_rank_));
186 
187  // put k - 1 predecessors back into window_
188  for (size_t i = 0; i < pre.size(); ++i)
189  window_.push_back(pre[i]);
190  }
191 
192  void PushData(bool consume) final {
193  data::File::Reader reader = file_.GetReader(consume);
194 
195  // copy window ring buffer containing first items
196  RingBuffer window = window_;
197  // this may wrap around, but that is okay. -tb
198  size_t rank = first_rank_ - (window_size_ - 1);
199 
200  size_t num_items = file_.num_items();
201 
202  sLOG << "WindowNode::PushData()"
203  << "window.size()" << window.size()
204  << "first_rank_" << first_rank_
205  << "rank" << rank
206  << "num_items" << num_items;
207 
208  for (size_t i = 0; i < num_items; ++i, ++rank) {
209  // append an item.
210  window.emplace_back(reader.Next<Input>());
211 
212  // only issue full window frames
213  if (window.size() != window_size_) continue;
214 
215  // call window user-defined function
217  rank, window, [this](const ValueType& output) {
218  this->PushItem(output);
219  });
220 
221  // return to window size - 1
222  if (window.size() >= window_size_ - 1)
223  window.pop_front();
224  }
225 
226  if (context_.my_rank() == context_.num_workers() - 1) {
227  if (window.size() < window_size_ - 1)
228  rank = 0;
229  while (window.size()) {
231  rank, window, [this](const ValueType& output) {
232  this->PushItem(output);
233  });
234  ++rank;
235  window.pop_front();
236  }
237  }
238  }
239 
240 private:
241  using Super::file_;
242  using Super::first_rank_;
243  using Super::window_;
244  using Super::window_size_;
247 };
248 
249 template <typename ValueType, typename Stack>
250 template <typename ValueOut,
251  typename WindowFunction, typename PartialWindowFunction>
253  size_t window_size, const WindowFunction& window_function,
254  const PartialWindowFunction& partial_window_function) const {
255  assert(IsValid());
256 
257  using WindowNode = api::OverlapWindowNode<
258  ValueOut, ValueType, WindowFunction, PartialWindowFunction>;
259 
260  // cannot check WindowFunction's arguments, since it is a template methods
261  // due to the auto emitter.
262 
263  auto node = tlx::make_counting<WindowNode>(
264  *this, "FlatWindow", window_size,
265  window_function, partial_window_function);
266 
267  return DIA<ValueOut>(node);
268 }
269 
270 template <typename ValueType, typename Stack>
271 template <typename ValueOut, typename WindowFunction>
273  size_t window_size, const WindowFunction& window_function) const {
274  assert(IsValid());
275 
276  auto no_operation_function =
277  [](size_t /* index */,
278  const common::RingBuffer<ValueType>& /* window */,
279  auto /* emit */) { };
280 
281  return FlatWindow<ValueOut>(
282  window_size, window_function, no_operation_function);
283 }
284 
285 template <typename ValueType, typename Stack>
286 template <typename WindowFunction>
288  size_t window_size, const WindowFunction& window_function) const {
289  assert(IsValid());
290 
291  using Result
293 
294  static_assert(
295  std::is_convertible<
296  size_t,
298  >::value,
299  "WindowFunction's first argument must be size_t (index)");
300 
301  static_assert(
302  std::is_convertible<
305  >::value,
306  "WindowFunction's second argument must be common::RingBuffer<T>");
307 
308  // transform Map-like function into FlatMap-like function
309  auto flatwindow_function =
310  [window_function](size_t index,
311  const common::RingBuffer<ValueType>& window,
312  auto emit) {
313  emit(window_function(index, window));
314  };
315 
316  auto no_operation_function =
317  [](size_t /* index */,
318  const common::RingBuffer<ValueType>& /* window */,
319  auto /* emit */) { };
320 
321  using WindowNode = api::OverlapWindowNode<
322  Result, ValueType,
323  decltype(flatwindow_function), decltype(no_operation_function)>;
324 
325  auto node = tlx::make_counting<WindowNode>(
326  *this, "Window", window_size,
327  flatwindow_function, no_operation_function);
328 
329  return DIA<Result>(node);
330 }
331 
332 template <typename ValueType, typename Stack>
333 template <typename WindowFunction, typename PartialWindowFunction>
335  size_t window_size, const WindowFunction& window_function,
336  const PartialWindowFunction& partial_window_function) const {
337  assert(IsValid());
338 
339  using Result
341 
342  static_assert(
343  std::is_convertible<
344  size_t,
346  >::value,
347  "WindowFunction's first argument must be size_t (index)");
348 
349  static_assert(
350  std::is_convertible<
353  >::value,
354  "WindowFunction's second argument must be common::RingBuffer<T>");
355 
356  // transform Map-like function into FlatMap-like function
357  auto flatwindow_function =
358  [window_function](size_t index,
359  const common::RingBuffer<ValueType>& window,
360  auto emit) {
361  emit(window_function(index, window));
362  };
363 
364  // transform Map-like function into FlatMap-like function
365  auto flatwindow_partial_function =
366  [partial_window_function](size_t index,
367  const common::RingBuffer<ValueType>& window,
368  auto emit) {
369  emit(partial_window_function(index, window));
370  };
371 
372  using WindowNode = api::OverlapWindowNode<
373  Result, ValueType,
374  decltype(flatwindow_function), decltype(flatwindow_partial_function)>;
375 
376  auto node = tlx::make_counting<WindowNode>(
377  *this, "Window", window_size,
378  flatwindow_function, flatwindow_partial_function);
379 
380  return DIA<Result>(node);
381 }
382 
383 /******************************************************************************/
384 
385 /*!
386  * \ingroup api_layer
387  */
388 template <typename ValueType, typename Input,
389  typename WindowFunction, typename PartialWindowFunction>
391  : public BaseWindowNode<
392  ValueType, Input, WindowFunction, PartialWindowFunction>
393 {
394  using Super = BaseWindowNode<
395  ValueType, Input, WindowFunction, PartialWindowFunction>;
396  using Super::debug;
397  using Super::context_;
398 
399  using typename Super::RingBuffer;
400 
401 public:
402  template <typename ParentDIA>
403  DisjointWindowNode(const ParentDIA& parent,
404  const char* label, size_t window_size,
405  const WindowFunction& window_function,
406  const PartialWindowFunction& partial_window_function)
407  : Super(parent, label, window_size,
408  window_function, partial_window_function) { }
409 
410  //! Executes the window operation by receiving k - 1 items from our
411  //! preceding worker.
412  void Execute() final {
413  // get rank of our first element
414  first_rank_ = context_.net.ExPrefixSum(file_.num_items());
415 
416  // copy our last elements into a vector
417  std::vector<Input> my_last;
418  my_last.reserve(window_size_ - 1);
419 
420  assert(window_.size() < window_size_);
421  window_.move_to(&my_last);
422 
423  // collective operation: get k - 1 predecessors
424  std::vector<Input> pre =
425  context_.net.Predecessor(window_size_ - 1, my_last);
426 
427  assert(pre.size() == std::min(window_size_ - 1, first_rank_));
428 
429  // calculate how many (up to k - 1) predecessors to put into window_
430 
431  size_t fill_size = first_rank_ % window_size_;
432 
433  sLOG << "Window::MainOp()"
434  << "first_rank_" << first_rank_
435  << "file_.size()" << file_.num_items()
436  << "window_size_" << window_size_
437  << "pre.size()" << pre.size()
438  << "fill_size" << fill_size;
439 
440  assert(first_rank_ < window_size_ ||
441  (first_rank_ - fill_size) % window_size_ == 0);
442 
443  // put those predecessors into window_ for PushData() to start with.
444  for (size_t i = pre.size() - fill_size; i < pre.size(); ++i)
445  window_.push_back(pre[i]);
446  }
447 
448  void PushData(bool consume) final {
449  data::File::Reader reader = file_.GetReader(consume);
450 
451  // copy window into vector containing first items
452  std::vector<Input> window;
453  window.reserve(window_size_);
454  window_.copy_to(&window);
455  assert(window.size() < window_size_);
456 
457  size_t rank = first_rank_ - (window_size_ - 1);
458  size_t num_items = file_.num_items();
459 
460  sLOG << "WindowNode::PushData()"
461  << "window.size()" << window.size()
462  << "rank" << rank
463  << "rank+window+1" << (rank + window.size() + 1)
464  << "num_items" << num_items;
465 
466  for (size_t i = 0; i < num_items; ++i, ++rank) {
467  // append an item.
468  window.emplace_back(reader.Next<Input>());
469 
470  sLOG << "rank" << rank << "window.size()" << window.size();
471 
472  // only issue full window frames
473  if (window.size() != window_size_) continue;
474 
475  // call window user-defined function
477  rank, window, [this](const ValueType& output) {
478  this->PushItem(output);
479  });
480 
481  // clear window
482  window.clear();
483  }
484 
485  // call user-defined function for last incomplete window
486  if (context_.my_rank() == context_.num_workers() - 1 &&
487  window.size() != 0)
488  {
489  rank += window_size_ - window.size() - 1;
491  rank, window, [this](const ValueType& output) {
492  this->PushItem(output);
493  });
494  }
495  }
496 
497 private:
498  using Super::file_;
499  using Super::first_rank_;
500  using Super::window_;
501  using Super::window_size_;
504 };
505 
506 template <typename ValueType, typename Stack>
507 template <typename ValueOut, typename WindowFunction>
509  struct DisjointTag const&, size_t window_size,
510  const WindowFunction& window_function) const {
511  assert(IsValid());
512 
513  using WindowNode = api::DisjointWindowNode<
514  ValueOut, ValueType, WindowFunction, WindowFunction>;
515 
516  // cannot check WindowFunction's arguments, since it is a template methods
517  // due to the auto emitter.
518 
519  auto node = tlx::make_counting<WindowNode>(
520  *this, "FlatWindow", window_size, window_function, window_function);
521 
522  return DIA<ValueOut>(node);
523 }
524 
525 template <typename ValueType, typename Stack>
526 template <typename WindowFunction>
528  struct DisjointTag const&, size_t window_size,
529  const WindowFunction& window_function) const {
530  assert(IsValid());
531 
532  using Result
534 
535  static_assert(
536  std::is_convertible<
537  size_t,
539  >::value,
540  "WindowFunction's first argument must be size_t (index)");
541 
542  static_assert(
543  std::is_convertible<
544  std::vector<ValueType>,
546  >::value,
547  "WindowFunction's second argument must be std::vector<T>");
548 
549  // transform Map-like function into FlatMap-like function
550  auto flatwindow_function =
551  [window_function](size_t index,
552  const std::vector<ValueType>& window,
553  auto emit) {
554  emit(window_function(index, window));
555  };
556 
557  using WindowNode = api::DisjointWindowNode<
558  Result, ValueType,
559  decltype(flatwindow_function), decltype(flatwindow_function)>;
560 
561  auto node = tlx::make_counting<WindowNode>(
562  *this, "Window", window_size, flatwindow_function, flatwindow_function);
563 
564  return DIA<Result>(node);
565 }
566 
567 } // namespace api
568 } // namespace thrill
569 
570 #endif // !THRILL_API_WINDOW_HEADER
571 
572 /******************************************************************************/
DisjointWindowNode(const ParentDIA &parent, const char *label, size_t window_size, const WindowFunction &window_function, const PartialWindowFunction &partial_window_function)
Definition: window.hpp:403
OverlapWindowNode(const ParentDIA &parent, const char *label, size_t window_size, const WindowFunction &window_function, const PartialWindowFunction &partial_window_function)
Definition: window.hpp:156
net::FlowControlChannel & net
Definition: context.hpp:443
virtual void Dispose()
Virtual clear method. Triggers actual disposing in sub-classes.
Definition: dia_base.hpp:188
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:34
size_t num_items() const
Return the number of items in the file.
Definition: file.hpp:180
DIA is the interface between the user and the Thrill framework.
Definition: dia.hpp:141
ValueType_ ValueType
Definition: dia.hpp:152
size_t window_size_
Size k of the window.
Definition: window.hpp:49
size_t my_rank() const
Global rank of this worker among all other workers in the system.
Definition: context.hpp:240
void push_back(const value_type &t)
add element at the end
WindowFunction window_function_
The window function which is applied to k elements.
Definition: window.hpp:122
A ring (circular) buffer of static (non-growing) size.
Definition: ring_buffer.hpp:36
A File is an ordered sequence of Block objects for storing items.
Definition: file.hpp:56
TLX_ATTRIBUTE_ALWAYS_INLINE T Next()
Next() reads a complete item T.
T TLX_ATTRIBUTE_WARN_UNUSED_RESULT ExPrefixSum(const T &value, const BinarySumOp &sum_op=BinarySumOp(), const T &initial=T())
Calculates the exclusive prefix sum over all workers, given a certain sum operation.
BlockWriter contains a temporary Block object into which a) any serializable item can be stored or b)...
static constexpr bool g_debug_push_file
Definition: config.hpp:44
const char * label() const
return label() of DIANode subclass as stored by StatsNode
Definition: dia_base.hpp:218
virtual DIAMemUse PreOpMemUse()
Amount of RAM used by PreOp after StartPreOp()
Definition: dia_base.hpp:160
void deallocate()
deallocate buffer
size_type size() const noexcept
return the number of items in the buffer
void copy_to(std::vector< value_type > *out) const
copy all elements into the vector
void PushItem(const ValueType &item) const
Method for derived classes to Push a single item to all children.
Definition: dia_node.hpp:147
common::RingBuffer< Input > RingBuffer
RingBuffer used and passed to user-defined function.
Definition: window.hpp:41
data::File::Writer writer_
Data writer to local file (only active in PreOp).
Definition: window.hpp:132
File Copy() const
Return a copy of the File (explicit copy-constructor)
Definition: file.cpp:42
virtual void StopPreOp(size_t)
Virtual method for preparing end of PushData.
Definition: dia_base.hpp:173
std::vector< T > Predecessor(size_t k, const std::vector< T > &my_values)
Collects up to k predecessors of type T from preceding PEs.
int value
Definition: gen_data.py:41
void pop_front()
remove element at the beginning
BlockReader takes Block objects from BlockSource and allows reading of a) serializable Items or b) ar...
size_t first_rank_
rank of our first element in file_
Definition: window.hpp:135
common::FunctionTraits< Function > FunctionTraits
alias for convenience.
Definition: dia.hpp:147
auto Window(size_t window_size, const WindowFunction &window_function=WindowFunction()) const
Window is a DOp, which applies a window function to every k consecutive items in a DIA...
Definition: window.hpp:287
void PushData(bool consume) final
Virtual method for pushing data. Triggers actual pushing in sub-classes.
Definition: window.hpp:448
virtual void StartPreOp(size_t)
Virtual method for preparing start of PushData.
Definition: dia_base.hpp:163
data::File GetFile(size_t dia_id)
Returns a new File object containing a sequence of local Blocks.
Definition: context.hpp:280
void allocate(size_t max_size)
allocate buffer
A DOpNode is a typed node representing distributed operations in Thrill.
Definition: dop_node.hpp:32
static uint_pair min()
return a uint_pair instance containing the smallest value possible
Definition: uint_types.hpp:217
data::File file_
Local data file.
Definition: window.hpp:130
TLX_ATTRIBUTE_ALWAYS_INLINE BlockWriter & Put(const T &x)
Put appends a complete item, or fails with a FullException.
virtual bool OnPreOpFile(const data::File &, size_t)
Definition: dia_base.hpp:168
void Close()
Explicitly close the writer.
size_t num_workers() const
Global number of workers in the system.
Definition: context.hpp:248
auto FlatWindow(size_t window_size, const WindowFunction &window_function=WindowFunction()) const
FlatWindow is a DOp, which applies a window function to every k consecutive items in a DIA...
Definition: window.hpp:272
RingBuffer window_
cache the last k - 1 items for transmission
Definition: window.hpp:127
tag structure for Window() and FlatWindow()
Definition: dia.hpp:62
PartialWindowFunction partial_window_function_
The window function which is applied to the last < k elements.
Definition: window.hpp:124
void emplace_back(Args &&...args)
emplace element at the end
void PushData(bool consume) final
Virtual method for pushing data. Triggers actual pushing in sub-classes.
Definition: window.hpp:192
BaseWindowNode(const ParentDIA &parent, const char *label, size_t window_size, const WindowFunction &window_function, const PartialWindowFunction &partial_window_function)
Definition: window.hpp:45
static constexpr bool debug
Definition: window.hpp:35
Context & context_
associated Context
Definition: dia_base.hpp:293
#define LOGC(cond)
Explicitly specify the condition for logging.
Definition: logger.hpp:21
virtual DIAMemUse PushDataMemUse()
Amount of RAM used by PushData()
Definition: dia_base.hpp:182
void move_to(std::vector< value_type > *out)
move all elements from the RingBuffer into the vector