Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
window.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/api/window.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015-2016 Timo Bingmann <[email protected]>
7  *
8  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
9  ******************************************************************************/
10 
11 #pragma once
12 #ifndef THRILL_API_WINDOW_HEADER
13 #define THRILL_API_WINDOW_HEADER
14 
#include <thrill/api/dia.hpp>
#include <thrill/api/dop_node.hpp>
#include <thrill/common/logger.hpp>
#include <thrill/common/ring_buffer.hpp>
#include <thrill/data/file.hpp>

#include <algorithm>
#include <vector>
23 
24 namespace thrill {
25 namespace api {
26 
27 /*!
28  * \ingroup api_layer
29  */
30 template <typename ValueType, typename Input,
31  typename WindowFunction, typename PartialWindowFunction>
32 class BaseWindowNode : public DOpNode<ValueType>
33 {
34 protected:
35  static constexpr bool debug = false;
36 
38  using Super::context_;
39 
40  //! RingBuffer used and passed to user-defined function.
42 
43 public:
44  template <typename ParentDIA>
45  BaseWindowNode(const ParentDIA& parent,
46  const char* label, size_t window_size,
47  const WindowFunction& window_function,
48  const PartialWindowFunction& partial_window_function)
49  : Super(parent.ctx(), label, { parent.id() }, { parent.node() }),
50  parent_stack_empty_(ParentDIA::stack_empty),
51  window_size_(window_size),
52  window_function_(window_function),
53  partial_window_function_(partial_window_function) {
54  // Hook PreOp(s)
55  auto pre_op_fn = [this](const Input& input) {
56  PreOp(input);
57  };
58 
59  auto lop_chain = parent.stack().push(pre_op_fn).fold();
60  parent.node()->AddChild(this, lop_chain);
61  }
62 
63  DIAMemUse PreOpMemUse() final {
64  return window_size_ * sizeof(Input);
65  }
66 
67  void StartPreOp(size_t /* parent_index */) final {
69  }
70 
71  bool OnPreOpFile(const data::File& file, size_t /* parent_index */) final {
72  if (!parent_stack_empty_) {
74  << "Window rejected File from parent "
75  << "due to non-empty function stack.";
76  return false;
77  }
78  // accept file
79  assert(file_.num_items() == 0);
80  file_ = file.Copy();
81  if (file_.num_items() != 0) {
82  // read last k - 1 items from File
83  size_t pos = file_.num_items() > window_size_ - 1 ?
84  file_.num_items() - window_size_ + 1 : 0;
85  auto reader = file_.GetReaderAt<Input>(pos);
86  while (reader.HasNext())
87  window_.push_back(reader.template Next<Input>());
88  }
89  return true;
90  }
91 
92  //! PreOp: keep last k - 1 items (local window) and store items.
93  void PreOp(const Input& input) {
94  if (window_.size() >= window_size_ - 1)
96  window_.push_back(input);
97 
98  writer_.Put(input);
99  }
100 
101  void StopPreOp(size_t /* parent_index */) final {
102  writer_.Close();
103  }
104 
105  DIAMemUse PushDataMemUse() final {
106  // window_ is copied in PushData()
107  return 2 * window_size_ * sizeof(Input);
108  }
109 
110  void Dispose() final {
112  file_.Clear();
113  }
114 
115 protected:
116  //! Whether the parent stack is empty
117  const bool parent_stack_empty_;
118  //! Size k of the window
119  size_t window_size_;
120  //! The window function which is applied to k elements.
121  WindowFunction window_function_;
122  //! The window function which is applied to the last < k elements.
123  PartialWindowFunction partial_window_function_;
124 
125  //! cache the last k - 1 items for transmission
127 
128  //! Local data file
129  data::File file_ { context_.GetFile(this) };
130  //! Data writer to local file (only active in PreOp).
131  data::File::Writer writer_ { file_.GetWriter() };
132 
133  //! rank of our first element in file_
134  size_t first_rank_;
135 };
136 
137 /*!
138  * \ingroup api_layer
139  */
140 template <typename ValueType, typename Input,
141  typename WindowFunction, typename PartialWindowFunction>
142 class OverlapWindowNode final
143  : public BaseWindowNode<
144  ValueType, Input, WindowFunction, PartialWindowFunction>
145 {
146  using Super = BaseWindowNode<
147  ValueType, Input, WindowFunction, PartialWindowFunction>;
148  using Super::debug;
149  using Super::context_;
150 
151  using typename Super::RingBuffer;
152 
153 public:
154  template <typename ParentDIA>
155  OverlapWindowNode(const ParentDIA& parent,
156  const char* label, size_t window_size,
157  const WindowFunction& window_function,
158  const PartialWindowFunction& partial_window_function)
159  : Super(parent, label, window_size,
160  window_function, partial_window_function) { }
161 
162  //! Executes the window operation by receiving k - 1 items from our
163  //! preceding worker.
164  void Execute() final {
165  // get rank of our first element
166  first_rank_ = context_.net.ExPrefixSum(file_.num_items());
167 
168  // copy our last elements into a vector
169  std::vector<Input> my_last;
170  my_last.reserve(window_size_ - 1);
171 
172  assert(window_.size() < window_size_);
173  window_.move_to(&my_last);
174 
175  // collective operation: get k - 1 predecessors
176  std::vector<Input> pre =
177  context_.net.Predecessor(window_size_ - 1, my_last);
178 
179  sLOG << "Window::MainOp()"
180  << "first_rank_" << first_rank_
181  << "window_size_" << window_size_
182  << "pre.size()" << pre.size();
183 
184  assert(pre.size() == std::min(window_size_ - 1, first_rank_));
185 
186  // put k - 1 predecessors back into window_
187  for (size_t i = 0; i < pre.size(); ++i)
188  window_.push_back(pre[i]);
189  }
190 
191  void PushData(bool consume) final {
192  data::File::Reader reader = file_.GetReader(consume);
193 
194  // copy window ring buffer containing first items
195  RingBuffer window = window_;
196  // this may wrap around, but that is okay. -tb
197  size_t rank = first_rank_ - (window_size_ - 1);
198 
199  size_t num_items = file_.num_items();
200 
201  sLOG << "WindowNode::PushData()"
202  << "window.size()" << window.size()
203  << "first_rank_" << first_rank_
204  << "rank" << rank
205  << "num_items" << num_items;
206 
207  for (size_t i = 0; i < num_items; ++i, ++rank) {
208  // append an item.
209  window.emplace_back(reader.Next<Input>());
210 
211  // only issue full window frames
212  if (window.size() != window_size_) continue;
213 
214  // call window user-defined function
216  rank, window, [this](const ValueType& output) {
217  this->PushItem(output);
218  });
219 
220  // return to window size - 1
221  if (window.size() >= window_size_ - 1)
222  window.pop_front();
223  }
224 
225  if (context_.my_rank() == context_.num_workers() - 1) {
226  if (window.size() < window_size_ - 1)
227  rank = 0;
228  while (window.size()) {
230  rank, window, [this](const ValueType& output) {
231  this->PushItem(output);
232  });
233  ++rank;
234  window.pop_front();
235  }
236  }
237  }
238 
239 private:
240  using Super::file_;
241  using Super::first_rank_;
242  using Super::window_;
243  using Super::window_size_;
246 };
247 
248 template <typename ValueType, typename Stack>
249 template <typename ValueOut,
250  typename WindowFunction, typename PartialWindowFunction>
252  size_t window_size, const WindowFunction& window_function,
253  const PartialWindowFunction& partial_window_function) const {
254  assert(IsValid());
255 
256  using WindowNode = api::OverlapWindowNode<
257  ValueOut, ValueType, WindowFunction, PartialWindowFunction>;
258 
259  // cannot check WindowFunction's arguments, since it is a template methods
260  // due to the auto emitter.
261 
262  auto node = tlx::make_counting<WindowNode>(
263  *this, "FlatWindow", window_size,
264  window_function, partial_window_function);
265 
266  return DIA<ValueOut>(node);
267 }
268 
269 template <typename ValueType, typename Stack>
270 template <typename ValueOut, typename WindowFunction>
272  size_t window_size, const WindowFunction& window_function) const {
273  assert(IsValid());
274 
275  auto no_operation_function =
276  [](size_t /* index */,
277  const common::RingBuffer<ValueType>& /* window */,
278  auto /* emit */) { };
279 
280  return FlatWindow<ValueOut>(
281  window_size, window_function, no_operation_function);
282 }
283 
284 template <typename ValueType, typename Stack>
285 template <typename WindowFunction>
287  size_t window_size, const WindowFunction& window_function) const {
288  assert(IsValid());
289 
290  using Result
292 
293  static_assert(
294  std::is_convertible<
295  size_t,
297  >::value,
298  "WindowFunction's first argument must be size_t (index)");
299 
300  static_assert(
301  std::is_convertible<
304  >::value,
305  "WindowFunction's second argument must be common::RingBuffer<T>");
306 
307  // transform Map-like function into FlatMap-like function
308  auto flatwindow_function =
309  [window_function](size_t index,
310  const common::RingBuffer<ValueType>& window,
311  auto emit) {
312  emit(window_function(index, window));
313  };
314 
315  auto no_operation_function =
316  [](size_t /* index */,
317  const common::RingBuffer<ValueType>& /* window */,
318  auto /* emit */) { };
319 
320  using WindowNode = api::OverlapWindowNode<
321  Result, ValueType,
322  decltype(flatwindow_function), decltype(no_operation_function)>;
323 
324  auto node = tlx::make_counting<WindowNode>(
325  *this, "Window", window_size,
326  flatwindow_function, no_operation_function);
327 
328  return DIA<Result>(node);
329 }
330 
331 template <typename ValueType, typename Stack>
332 template <typename WindowFunction, typename PartialWindowFunction>
334  size_t window_size, const WindowFunction& window_function,
335  const PartialWindowFunction& partial_window_function) const {
336  assert(IsValid());
337 
338  using Result
340 
341  static_assert(
342  std::is_convertible<
343  size_t,
345  >::value,
346  "WindowFunction's first argument must be size_t (index)");
347 
348  static_assert(
349  std::is_convertible<
352  >::value,
353  "WindowFunction's second argument must be common::RingBuffer<T>");
354 
355  // transform Map-like function into FlatMap-like function
356  auto flatwindow_function =
357  [window_function](size_t index,
358  const common::RingBuffer<ValueType>& window,
359  auto emit) {
360  emit(window_function(index, window));
361  };
362 
363  // transform Map-like function into FlatMap-like function
364  auto flatwindow_partial_function =
365  [partial_window_function](size_t index,
366  const common::RingBuffer<ValueType>& window,
367  auto emit) {
368  emit(partial_window_function(index, window));
369  };
370 
371  using WindowNode = api::OverlapWindowNode<
372  Result, ValueType,
373  decltype(flatwindow_function), decltype(flatwindow_partial_function)>;
374 
375  auto node = tlx::make_counting<WindowNode>(
376  *this, "Window", window_size,
377  flatwindow_function, flatwindow_partial_function);
378 
379  return DIA<Result>(node);
380 }
381 
382 /******************************************************************************/
383 
384 /*!
385  * \ingroup api_layer
386  */
387 template <typename ValueType, typename Input,
388  typename WindowFunction, typename PartialWindowFunction>
390  : public BaseWindowNode<
391  ValueType, Input, WindowFunction, PartialWindowFunction>
392 {
393  using Super = BaseWindowNode<
394  ValueType, Input, WindowFunction, PartialWindowFunction>;
395  using Super::debug;
396  using Super::context_;
397 
398  using typename Super::RingBuffer;
399 
400 public:
401  template <typename ParentDIA>
402  DisjointWindowNode(const ParentDIA& parent,
403  const char* label, size_t window_size,
404  const WindowFunction& window_function,
405  const PartialWindowFunction& partial_window_function)
406  : Super(parent, label, window_size,
407  window_function, partial_window_function) { }
408 
409  //! Executes the window operation by receiving k - 1 items from our
410  //! preceding worker.
411  void Execute() final {
412  // get rank of our first element
413  first_rank_ = context_.net.ExPrefixSum(file_.num_items());
414 
415  // copy our last elements into a vector
416  std::vector<Input> my_last;
417  my_last.reserve(window_size_ - 1);
418 
419  assert(window_.size() < window_size_);
420  window_.move_to(&my_last);
421 
422  // collective operation: get k - 1 predecessors
423  std::vector<Input> pre =
424  context_.net.Predecessor(window_size_ - 1, my_last);
425 
426  assert(pre.size() == std::min(window_size_ - 1, first_rank_));
427 
428  // calculate how many (up to k - 1) predecessors to put into window_
429 
430  size_t fill_size = first_rank_ % window_size_;
431 
432  sLOG << "Window::MainOp()"
433  << "first_rank_" << first_rank_
434  << "file_.size()" << file_.num_items()
435  << "window_size_" << window_size_
436  << "pre.size()" << pre.size()
437  << "fill_size" << fill_size;
438 
439  assert(first_rank_ < window_size_ ||
440  (first_rank_ - fill_size) % window_size_ == 0);
441 
442  // put those predecessors into window_ for PushData() to start with.
443  for (size_t i = pre.size() - fill_size; i < pre.size(); ++i)
444  window_.push_back(pre[i]);
445  }
446 
447  void PushData(bool consume) final {
448  data::File::Reader reader = file_.GetReader(consume);
449 
450  // copy window into vector containing first items
451  std::vector<Input> window;
452  window.reserve(window_size_);
453  window_.copy_to(&window);
454  assert(window.size() < window_size_);
455 
456  size_t rank = first_rank_ - (window_size_ - 1);
457  size_t num_items = file_.num_items();
458 
459  sLOG << "WindowNode::PushData()"
460  << "window.size()" << window.size()
461  << "rank" << rank
462  << "rank+window+1" << (rank + window.size() + 1)
463  << "num_items" << num_items;
464 
465  for (size_t i = 0; i < num_items; ++i, ++rank) {
466  // append an item.
467  window.emplace_back(reader.Next<Input>());
468 
469  sLOG << "rank" << rank << "window.size()" << window.size();
470 
471  // only issue full window frames
472  if (window.size() != window_size_) continue;
473 
474  // call window user-defined function
476  rank, window, [this](const ValueType& output) {
477  this->PushItem(output);
478  });
479 
480  // clear window
481  window.clear();
482  }
483 
484  // call user-defined function for last incomplete window
485  if (context_.my_rank() == context_.num_workers() - 1 &&
486  window.size() != 0)
487  {
488  rank += window_size_ - window.size() - 1;
490  rank, window, [this](const ValueType& output) {
491  this->PushItem(output);
492  });
493  }
494  }
495 
496 private:
497  using Super::file_;
498  using Super::first_rank_;
499  using Super::window_;
500  using Super::window_size_;
503 };
504 
505 template <typename ValueType, typename Stack>
506 template <typename ValueOut, typename WindowFunction>
508  struct DisjointTag const&, size_t window_size,
509  const WindowFunction& window_function) const {
510  assert(IsValid());
511 
512  using WindowNode = api::DisjointWindowNode<
513  ValueOut, ValueType, WindowFunction, WindowFunction>;
514 
515  // cannot check WindowFunction's arguments, since it is a template methods
516  // due to the auto emitter.
517 
518  auto node = tlx::make_counting<WindowNode>(
519  *this, "FlatWindow", window_size, window_function, window_function);
520 
521  return DIA<ValueOut>(node);
522 }
523 
524 template <typename ValueType, typename Stack>
525 template <typename WindowFunction>
527  struct DisjointTag const&, size_t window_size,
528  const WindowFunction& window_function) const {
529  assert(IsValid());
530 
531  using Result
533 
534  static_assert(
535  std::is_convertible<
536  size_t,
538  >::value,
539  "WindowFunction's first argument must be size_t (index)");
540 
541  static_assert(
542  std::is_convertible<
543  std::vector<ValueType>,
545  >::value,
546  "WindowFunction's second argument must be std::vector<T>");
547 
548  // transform Map-like function into FlatMap-like function
549  auto flatwindow_function =
550  [window_function](size_t index,
551  const std::vector<ValueType>& window,
552  auto emit) {
553  emit(window_function(index, window));
554  };
555 
556  using WindowNode = api::DisjointWindowNode<
557  Result, ValueType,
558  decltype(flatwindow_function), decltype(flatwindow_function)>;
559 
560  auto node = tlx::make_counting<WindowNode>(
561  *this, "Window", window_size, flatwindow_function, flatwindow_function);
562 
563  return DIA<Result>(node);
564 }
565 
566 } // namespace api
567 } // namespace thrill
568 
569 #endif // !THRILL_API_WINDOW_HEADER
570 
571 /******************************************************************************/
DisjointWindowNode(const ParentDIA &parent, const char *label, size_t window_size, const WindowFunction &window_function, const PartialWindowFunction &partial_window_function)
Definition: window.hpp:402
OverlapWindowNode(const ParentDIA &parent, const char *label, size_t window_size, const WindowFunction &window_function, const PartialWindowFunction &partial_window_function)
Definition: window.hpp:155
net::FlowControlChannel & net
Definition: context.hpp:446
virtual void Dispose()
Virtual clear method. Triggers actual disposing in sub-classes.
Definition: dia_base.hpp:188
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:34
size_t num_items() const
Return the number of items in the file.
Definition: file.hpp:180
DIA is the interface between the user and the Thrill framework.
Definition: dia.hpp:141
ValueType_ ValueType
Definition: dia.hpp:152
size_t window_size_
Size k of the window.
Definition: window.hpp:49
size_t my_rank() const
Global rank of this worker among all other workers in the system.
Definition: context.hpp:243
void push_back(const value_type &t)
add element at the end
WindowFunction window_function_
The window function which is applied to k elements.
Definition: window.hpp:121
A ring (circular) buffer of static (non-growing) size.
Definition: ring_buffer.hpp:36
A File is an ordered sequence of Block objects for storing items.
Definition: file.hpp:56
TLX_ATTRIBUTE_ALWAYS_INLINE T Next()
Next() reads a complete item T.
T TLX_ATTRIBUTE_WARN_UNUSED_RESULT ExPrefixSum(const T &value, const BinarySumOp &sum_op=BinarySumOp(), const T &initial=T())
Calculates the exclusive prefix sum over all workers, given a certain sum operation.
BlockWriter contains a temporary Block object into which a) any serializable item can be stored or b)...
static constexpr bool g_debug_push_file
Definition: config.hpp:44
const char * label() const
return label() of DIANode subclass as stored by StatsNode
Definition: dia_base.hpp:218
virtual DIAMemUse PreOpMemUse()
Amount of RAM used by PreOp after StartPreOp()
Definition: dia_base.hpp:160
void deallocate()
deallocate buffer
size_type size() const noexcept
return the number of items in the buffer
void copy_to(std::vector< value_type > *out) const
copy all elements into the vector
void PushItem(const ValueType &item) const
Method for derived classes to Push a single item to all children.
Definition: dia_node.hpp:147
common::RingBuffer< Input > RingBuffer
RingBuffer used and passed to user-defined function.
Definition: window.hpp:41
data::File::Writer writer_
Data writer to local file (only active in PreOp).
Definition: window.hpp:131
File Copy() const
Return a copy of the File (explicit copy-constructor)
Definition: file.cpp:42
virtual void StopPreOp(size_t)
Virtual method for preparing end of PushData.
Definition: dia_base.hpp:173
std::vector< T > Predecessor(size_t k, const std::vector< T > &my_values)
Collects up to k predecessors of type T from preceding PEs.
int value
Definition: gen_data.py:41
void pop_front()
remove element at the beginning
BlockReader takes Block objects from BlockSource and allows reading of a) serializable Items or b) ar...
size_t first_rank_
rank of our first element in file_
Definition: window.hpp:134
common::FunctionTraits< Function > FunctionTraits
alias for convenience.
Definition: dia.hpp:147
auto Window(size_t window_size, const WindowFunction &window_function=WindowFunction()) const
Window is a DOp, which applies a window function to every k consecutive items in a DIA...
Definition: window.hpp:286
void PushData(bool consume) final
Virtual method for pushing data. Triggers actual pushing in sub-classes.
Definition: window.hpp:447
virtual void StartPreOp(size_t)
Virtual method for preparing start of PushData.
Definition: dia_base.hpp:163
data::File GetFile(size_t dia_id)
Returns a new File object containing a sequence of local Blocks.
Definition: context.hpp:283
void allocate(size_t max_size)
allocate buffer
A DOpNode is a typed node representing a distributed operation in Thrill.
Definition: dop_node.hpp:32
static uint_pair min()
return an uint_pair instance containing the smallest value possible
Definition: uint_types.hpp:217
data::File file_
Local data file.
Definition: window.hpp:129
TLX_ATTRIBUTE_ALWAYS_INLINE BlockWriter & Put(const T &x)
Put appends a complete item, or fails with a FullException.
virtual bool OnPreOpFile(const data::File &, size_t)
Definition: dia_base.hpp:168
void Close()
Explicitly close the writer.
size_t num_workers() const
Global number of workers in the system.
Definition: context.hpp:251
auto FlatWindow(size_t window_size, const WindowFunction &window_function=WindowFunction()) const
FlatWindow is a DOp, which applies a window function to every k consecutive items in a DIA...
Definition: window.hpp:271
RingBuffer window_
cache the last k - 1 items for transmission
Definition: window.hpp:126
tag structure for Window() and FlatWindow()
Definition: dia.hpp:62
PartialWindowFunction partial_window_function_
The window function which is applied to the last < k elements.
Definition: window.hpp:123
void emplace_back(Args &&...args)
emplace element at the end
void PushData(bool consume) final
Virtual method for pushing data. Triggers actual pushing in sub-classes.
Definition: window.hpp:191
BaseWindowNode(const ParentDIA &parent, const char *label, size_t window_size, const WindowFunction &window_function, const PartialWindowFunction &partial_window_function)
Definition: window.hpp:45
static constexpr bool debug
Definition: window.hpp:35
Context & context_
associated Context
Definition: dia_base.hpp:293
#define LOGC(cond)
Explicitly specify the condition for logging.
Definition: logger.hpp:21
virtual DIAMemUse PushDataMemUse()
Amount of RAM used by PushData()
Definition: dia_base.hpp:182
void move_to(std::vector< value_type > *out)
move all elements from the RingBuffer into the vector