Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
zip_window.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/api/zip_window.hpp
3  *
4  * DIANode for a zip operation. Performs the actual zip operation
5  *
6  * Part of Project Thrill - http://project-thrill.org
7  *
8  * Copyright (C) 2015-2016 Timo Bingmann <[email protected]>
9  * Copyright (C) 2015 Matthias Stumpp <[email protected]>
10  * Copyright (C) 2015 Sebastian Lamm <[email protected]>
11  *
12  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
13  ******************************************************************************/
14 
15 #pragma once
16 #ifndef THRILL_API_ZIP_WINDOW_HEADER
17 #define THRILL_API_ZIP_WINDOW_HEADER
18 
19 #include <thrill/api/dia.hpp>
20 #include <thrill/api/dop_node.hpp>
22 #include <thrill/common/logger.hpp>
23 #include <thrill/common/string.hpp>
24 #include <thrill/data/file.hpp>
25 #include <tlx/meta/apply_tuple.hpp>
28 #include <tlx/meta/vexpand.hpp>
30 
31 #include <algorithm>
32 #include <array>
33 #include <functional>
34 #include <tuple>
35 #include <vector>
36 
37 namespace thrill {
38 namespace api {
39 
40 /******************************************************************************/
41 
//! Tag type selecting the ZipWindow() variants that hand each window to the
//! ZipFunction as a std::array<> instead of a std::vector<>.
struct ArrayTag {
    // user-provided (empty) default constructor; presumably kept so that the
    // const global instance below can be declared without an initializer
    ArrayTag() { }
};

//! global const ArrayTag instance, passed as the first argument of ZipWindow()
const struct ArrayTag ArrayTag;
49 
50 /******************************************************************************/
51 // ZipWindowTraits - Helper to create std::tuple<std::vector<Args> ...>
52 
#ifndef THRILL_DOXYGEN_IGNORE

// Generic callables (lambdas, functors): reuse the traits of the type of their
// operator(), which is a pointer to member function handled by the
// specializations below.
// taken from: http://stackoverflow.com/questions/7943525/
// is-it-possible-to-figure-out-the-parameter-type-and-return-type-of-a-lambda
template <typename T>
struct ZipWindowTraits
    : ZipWindowTraits<decltype(& T::operator ())>{ };

#endif
62 
63 //! specialize for pointers to const member function
64 template <typename ClassType, typename ReturnType, typename... Args>
65 struct ZipWindowTraits<ReturnType (ClassType::*)(Args...) const>{
66 
67  //! arity is the number of arguments.
68  static constexpr size_t arity = sizeof ... (Args);
69 
70  using result_type = ReturnType;
71  using is_const = std::true_type;
72 
73  //! the tuple of value_type inside the vectors
74  using value_type_tuple = std::tuple<
75  typename std::remove_cv<
76  typename std::remove_reference<Args>::type
77  >::type::value_type...>;
78 
79  //! the tuple of value_types: with remove_cv and remove_reference applied.
80  using value_type_tuple_plain = std::tuple<
81  typename std::remove_cv<
82  typename std::remove_reference<
83  typename std::remove_cv<
84  typename std::remove_reference<Args>::type
85  >::type::value_type>::type>::type...>;
86 
87  //! the i-th argument is equivalent to the i-th tuple element of a tuple
88  //! composed of those arguments.
89  template <size_t i>
90  using value_type = typename std::tuple_element<i, value_type_tuple>::type;
91 
92  //! return i-th argument reduced to plain type: remove_cv and
93  //! remove_reference.
94  template <size_t i>
95  using value_type_plain =
96  typename std::remove_cv<
97  typename std::remove_reference<value_type<i> >::type>::type;
98 
99  //! the tuple of std::vector<>s: with remove_cv and remove_reference applied
100  using vector_tuple_plain = std::tuple<
101  typename std::remove_cv<
102  typename std::remove_reference<Args>::type>::type...>;
103 
104  //! the i-th argument is equivalent to the i-th tuple element of a tuple
105  //! composed of those arguments.
106  template <size_t i>
107  using vector_plain = typename std::tuple_element<
109 };
110 
111 //! specialize for pointers to mutable member function
112 template <typename ClassType, typename ReturnType, typename... Args>
113 struct ZipWindowTraits<ReturnType (ClassType::*)(Args...)>
114  : public ZipWindowTraits<ReturnType (ClassType::*)(Args...) const>{
115  using is_const = std::false_type;
116 };
117 
118 //! specialize for function pointers
119 template <typename ReturnType, typename... Args>
120 struct ZipWindowTraits<ReturnType (*)(Args...)>{
121 
122  //! arity is the number of arguments.
123  static constexpr size_t arity = sizeof ... (Args);
124 
125  using result_type = ReturnType;
126  using is_const = std::true_type;
127 
128  //! the tuple of value_type inside the vectors
129  using value_type_tuple = std::tuple<
130  typename std::remove_cv<
131  typename std::remove_reference<Args>::type
132  >::type::value_type...>;
133 
134  //! the tuple of value_types: with remove_cv and remove_reference applied.
135  using value_type_tuple_plain = std::tuple<
136  typename std::remove_cv<
137  typename std::remove_reference<
138  typename std::remove_cv<
139  typename std::remove_reference<Args>::type
140  >::type::value_type>::type>::type...>;
141 
142  //! the i-th argument is equivalent to the i-th tuple element of a tuple
143  //! composed of those arguments.
144  template <size_t i>
145  using value_type = typename std::tuple_element<i, value_type_tuple>::type;
146 
147  //! return i-th argument reduced to plain type: remove_cv and
148  //! remove_reference.
149  template <size_t i>
150  using value_type_plain =
151  typename std::remove_cv<
152  typename std::remove_reference<value_type<i> >::type>::type;
153 
154  //! the tuple of std::vector<>s: with remove_cv and remove_reference applied
155  using vector_tuple_plain = std::tuple<
156  typename std::remove_cv<
157  typename std::remove_reference<Args>::type>::type...>;
158 
159  //! the i-th argument is equivalent to the i-th tuple element of a tuple
160  //! composed of those arguments.
161  template <size_t i>
162  using vector_plain = typename std::tuple_element<
164 };
165 
166 /******************************************************************************/
167 
168 template <typename ZipWindowNode, bool UseArray>
170 
171 /******************************************************************************/
172 
173 template <typename ValueType, typename ZipFunction_,
174  bool Pad_, bool UnequalCheck, bool UseStdArray, size_t kNumInputs_>
175 class ZipWindowNode final : public DOpNode<ValueType>
176 {
177  static constexpr bool debug = false;
178 
179  //! Set this variable to true to enable generation and output of stats
180  static constexpr bool stats_enabled = false;
181 
183  using Super::context_;
184 
185 public:
186  using ZipFunction = ZipFunction_;
187  static constexpr bool Pad = Pad_;
188  static constexpr size_t kNumInputs = kNumInputs_;
189 
190  template <size_t Index>
191  using ZipArgN =
192  typename ZipWindowTraits<ZipFunction>::template value_type_plain<Index>;
193  using ZipArgsTuple =
194  typename ZipWindowTraits<ZipFunction>::value_type_tuple_plain;
195 
196 public:
197  /*!
198  * Constructor for a ZipWindowNode.
199  */
200  template <typename ParentDIA0, typename... ParentDIAs>
201  ZipWindowNode(const std::array<size_t, kNumInputs>& window_size,
202  const ZipFunction& zip_function, const ZipArgsTuple& padding,
203  const ParentDIA0& parent0, const ParentDIAs& ... parents)
204  : Super(parent0.ctx(), "ZipWindow",
205  { parent0.id(), parents.id() ... },
206  { parent0.node(), parents.node() ... }),
207  window_size_(window_size),
208  zip_function_(zip_function),
209  padding_(padding),
210  // this weirdness is due to a MSVC2015 parser bug
212  std::array<bool, kNumInputs>{
213  { ParentDIA0::stack_empty, (ParentDIAs::stack_empty)... }
214  })
215  {
216  // allocate files.
217  files_.reserve(kNumInputs);
218  for (size_t i = 0; i < kNumInputs; ++i)
219  files_.emplace_back(context_.GetFile(this));
220 
221  // Hook PreOp(s)
223  RegisterParent(this), parent0, parents...);
224  }
225 
226  void StartPreOp(size_t parent_index) final {
227  writers_[parent_index] = files_[parent_index].GetWriter();
228  }
229 
230  //! Receive a whole data::File of ValueType, but only if our stack is empty.
231  bool OnPreOpFile(const data::File& file, size_t parent_index) final {
232  assert(parent_index < kNumInputs);
233  if (!parent_stack_empty_[parent_index]) {
234  LOGC(context_.my_rank() == 0)
235  << "ZipWindow rejected File from parent "
236  << "due to non-empty function stack.";
237  return false;
238  }
239 
240  // accept file
241  LOGC(context_.my_rank() == 0)
242  << "ZipWindow accepted File from parent " << parent_index;
243  assert(files_[parent_index].num_items() == 0);
244  files_[parent_index] = file.Copy();
245  return true;
246  }
247 
248  void StopPreOp(size_t parent_index) final {
249  LOG << *this << " StopPreOp() parent_index=" << parent_index;
250  writers_[parent_index].Close();
251  }
252 
253  void Execute() final {
254  MainOp();
255  }
256 
257  void PushData(bool consume) final {
258  size_t result_count = 0;
259 
260  if (result_window_count_ != 0) {
261  // get inbound readers from all Streams
262  std::array<data::CatStream::CatReader, kNumInputs> readers;
263  for (size_t i = 0; i < kNumInputs; ++i)
264  readers[i] = streams_[i]->GetCatReader(consume);
265 
266  ZipWindowReader<ZipWindowNode, UseStdArray> reader_next(
267  *this, readers);
268 
269  while (reader_next.HasNext()) {
270  auto v = tlx::vmap_for_range<kNumInputs>(reader_next);
272  ++result_count;
273  }
274  }
275 
276  if (stats_enabled) {
278  "ZipWindow() result_count", result_count);
279  }
280  }
281 
282  void Dispose() final {
283  files_.clear();
284  for (size_t i = 0; i < kNumInputs; ++i)
285  streams_[i].reset();
286  }
287 
288 private:
289  //! Size k of the windows
290  const std::array<size_t, kNumInputs> window_size_;
291 
292  //! Zip function
294 
295  //! padding for shorter DIAs
297 
298  //! Whether the parent stack is empty
299  const std::array<bool, kNumInputs> parent_stack_empty_;
300 
301  //! Files for intermediate storage
302  std::vector<data::File> files_;
303 
304  //! Writers to intermediate files
306 
307  //! Array of inbound CatStreams
309 
310  //! \name Variables for Calculating Exchange
311  //! \{
312 
313  //! exclusive prefix sum over the number of items in workers
314  std::array<size_t, kNumInputs> size_prefixsum_;
315 
316  //! shortest size of Zipped inputs
318 
319  //! \}
320 
321  //! Register Parent PreOp Hooks, instantiated and called for each Zip parent
323  {
324  public:
325  explicit RegisterParent(ZipWindowNode* node) : node_(node) { }
326 
327  template <typename Index, typename Parent>
328  void operator () (const Index&, Parent& parent) {
329 
330  // get the ZipFunction's argument for this index
331  using ZipArg = ZipArgN<Index::index>;
332 
333  // check that the parent's type is convertible to the ZipFunction
334  // argument.
335 
336  static_assert(
338  "ZipFunction argument does not match input DIA");
339 
340  // construct lambda with only the writer in the closure
341  data::File::Writer* writer = &node_->writers_[Index::index];
342  auto pre_op_fn = [writer](const ZipArg& input) -> void {
343  writer->Put(input);
344  };
345 
346  // close the function stacks with our pre ops and register it at
347  // parent nodes for output
348  auto lop_chain = parent.stack().push(pre_op_fn).fold();
349  parent.node()->AddChild(node_, lop_chain, Index::index);
350  }
351 
352  private:
354  };
355 
356  //! Scatter items from DIA "Index" to other workers if necessary.
357  template <size_t Index>
358  void DoScatter() {
359  const size_t workers = context_.num_workers();
360 
361  size_t result_size = result_window_count_ * window_size_[Index];
362 
363  // range of items on local node
364  size_t local_begin = std::min(result_size, size_prefixsum_[Index]);
365  size_t local_end = std::min(
366  result_size, size_prefixsum_[Index] + files_[Index].num_items());
367 
368  // number of elements per worker (double)
369  double per_pe =
370  static_cast<double>(result_window_count_) / static_cast<double>(workers);
371  // offsets for scattering
372  std::vector<size_t> offsets(workers + 1, 0);
373 
374  for (size_t i = 0; i <= workers; ++i) {
375  // calculate range we have to send to each PE
376  size_t cut =
377  static_cast<size_t>(std::ceil(i * per_pe)) * window_size_[Index];
378  offsets[i] =
379  cut < local_begin ? 0 : std::min(cut, local_end) - local_begin;
380  }
381 
382  LOG << "per_pe=" << per_pe
383  << " offsets[" << Index << "] = " << offsets;
384 
385  // target stream id
386  streams_[Index] = context_.GetNewCatStream(this);
387 
388  // scatter elements to other workers, if necessary
389  using ZipArg = ZipArgN<Index>;
390  streams_[Index]->template Scatter<ZipArg>(
391  files_[Index], offsets, /* consume */ true);
392  }
393 
394  //! Receive elements from other workers.
395  void MainOp() {
396  // first: calculate total size of the DIAs to Zip
397 
398  using ArraySizeT = std::array<size_t, kNumInputs>;
399 
400  // number of elements of this worker
401  ArraySizeT local_size;
402  for (size_t i = 0; i < kNumInputs; ++i) {
403  local_size[i] = files_[i].num_items();
404  sLOG << "input" << i << "local_size" << local_size[i];
405 
406  if (stats_enabled) {
408  "ZipWindow() local_size", local_size[i]);
409  }
410  }
411 
412  // exclusive prefixsum of number of elements: we have items from
413  // [size_prefixsum, size_prefixsum + local_size). And get the total
414  // number of items in each DIAs, over all worker.
415  size_prefixsum_ = local_size;
416  ArraySizeT total_size = context_.net.ExPrefixSumTotal(
417  size_prefixsum_, common::ComponentSum<ArraySizeT>());
418 
419  // calculate number of full windows in each DIA
420  ArraySizeT total_window_count;
421  for (size_t i = 0; i < kNumInputs; ++i) {
422  total_window_count[i] =
423  (total_size[i] + window_size_[i] - 1) / window_size_[i];
424  }
425 
426  size_t max_total_window_count = *std::max_element(
427  total_window_count.begin(), total_window_count.end());
428 
429  // return only the minimum window count of all DIAs.
430  result_window_count_ =
431  Pad ? max_total_window_count : *std::min_element(
432  total_window_count.begin(), total_window_count.end());
433 
434  sLOG << "ZipWindow() total_size" << total_size
435  << "total_window_count" << total_window_count
436  << "max_total_window_count" << max_total_window_count
437  << "result_window_count_" << result_window_count_;
438 
439  // warn if DIAs have unequal window size
440  if (!Pad && UnequalCheck && result_window_count_ != max_total_window_count) {
441  die("ZipWindow(): input DIAs have unequal size: "
442  << common::VecToStr(total_size));
443  }
444 
445  if (result_window_count_ == 0) return;
446 
447  // perform scatters to exchange data, with different types.
448  tlx::call_for_range<kNumInputs>(
449  [=](auto index) {
450  tlx::unused(index);
451  this->DoScatter<decltype(index)::index>();
452  });
453  }
454 
455  //! for access to internal members
456  template <typename ZipWindowNode, bool UseArray>
457  friend class ZipWindowReader;
458 };
459 
460 //! template specialization Reader which delivers std::vector<>s to ZipFunction
461 template <typename ZipWindowNode>
462 class ZipWindowReader<ZipWindowNode, /* UseArray */ false>
463 {
464 public:
467  static constexpr size_t Pad = ZipWindowNode::Pad;
468  static constexpr size_t kNumInputs = ZipWindowNode::kNumInputs;
469 
470  template <size_t Index>
471  using ZipArgN =
472  typename ZipWindowTraits<ZipFunction>
473  ::template value_type_plain<Index>;
474 
475  template <size_t Index>
476  using ZipVectorN =
477  typename ZipWindowTraits<ZipFunction>
478  ::template vector_plain<Index>;
479 
481  std::array<Reader, kNumInputs>& readers)
482  : zip_node_(zip_node), readers_(readers) { }
483 
484  //! helper for PushData() which checks all inputs
485  bool HasNext() {
486  if (Pad) {
487  for (size_t i = 0; i < kNumInputs; ++i) {
488  if (readers_[i].HasNext()) return true;
489  }
490  return false;
491  }
492  else {
493  for (size_t i = 0; i < kNumInputs; ++i) {
494  if (!readers_[i].HasNext()) return false;
495  }
496  return true;
497  }
498  }
499 
500  template <typename Index>
501  const ZipVectorN<Index::index>& operator () (const Index&) {
502 
503  // get the ZipFunction's argument for this index
504  using ZipArg = ZipArgN<Index::index>;
505 
506  std::vector<ZipArg>& vec = std::get<Index::index>(vectors_);
507  vec.clear();
508 
509  for (size_t i = 0; i < zip_node_.window_size_[Index::index]; ++i) {
510  if (Pad && !readers_[Index::index].HasNext()) {
511  // take padding_ if next is not available.
512  vec.emplace_back(
513  std::get<Index::index>(zip_node_.padding_));
514  }
515  else {
516  vec.emplace_back(
517  readers_[Index::index].template Next<ZipArg>());
518  }
519  }
520 
521  return vec;
522  }
523 
524 private:
526 
527  //! reference to the reader array in PushData().
528  std::array<Reader, kNumInputs>& readers_;
529 
530  //! tuple of std::vector<>s
531  typename ZipWindowTraits<ZipFunction>::vector_tuple_plain vectors_;
532 };
533 
534 //! template specialization Reader which delivers std::array<>s to ZipFunction
535 template <typename ZipWindowNode>
536 class ZipWindowReader<ZipWindowNode, /* UseArray */ true>
537 {
538 public:
541  static constexpr size_t Pad = ZipWindowNode::Pad;
542  static constexpr size_t kNumInputs = ZipWindowNode::kNumInputs;
543 
544  template <size_t Index>
545  using ZipArgN =
546  typename ZipWindowTraits<ZipFunction>
547  ::template value_type_plain<Index>;
548 
549  template <size_t Index>
550  using ZipVectorN =
551  typename ZipWindowTraits<ZipFunction>
552  ::template vector_plain<Index>;
553 
555  std::array<Reader, kNumInputs>& readers)
556  : zip_node_(zip_node), readers_(readers) { }
557 
558  //! helper for PushData() which checks all inputs
559  bool HasNext() {
560  if (Pad) {
561  for (size_t i = 0; i < kNumInputs; ++i) {
562  if (readers_[i].HasNext()) return true;
563  }
564  return false;
565  }
566  else {
567  for (size_t i = 0; i < kNumInputs; ++i) {
568  if (!readers_[i].HasNext()) return false;
569  }
570  return true;
571  }
572  }
573 
574  template <typename Index>
575  const ZipVectorN<Index::index>& operator () (const Index&) {
576 
577  // get the ZipFunction's argument for this index
578  using ZipArg = ZipArgN<Index::index>;
579  using ZipVector = ZipVectorN<Index::index>;
580 
581  ZipVector& vec = std::get<Index::index>(vectors_);
582 
583  for (size_t i = 0; i < zip_node_.window_size_[Index::index]; ++i) {
584  if (Pad && !readers_[Index::index].HasNext()) {
585  // take padding_ if next is not available.
586  vec[i] = std::get<Index::index>(zip_node_.padding_);
587  }
588  else {
589  vec[i] = readers_[Index::index].template Next<ZipArg>();
590  }
591  }
592 
593  return vec;
594  }
595 
596 private:
598 
599  //! reference to the reader array in PushData().
600  std::array<Reader, kNumInputs>& readers_;
601 
602  //! tuple of std::vector<>s
603  typename ZipWindowTraits<ZipFunction>::vector_tuple_plain vectors_;
604 };
605 
606 /******************************************************************************/
607 
/*!
 * Zips DIAs window-wise in style of functional programming by applying
 * zip_function to the i-th fixed-size windows of all input DIAs to form the
 * i-th element of the output DIA. Each window is passed to zip_function as a
 * std::vector<>. The length of each input DIA must be a multiple of its window
 * size, given in window_size. The type of the output DIA is inferred from the
 * return type of zip_function.
 *
 * All input DIAs must contain the same number of windows; otherwise
 * ZipWindow() dies with an error. Use the CutTag or PadTag variant for inputs
 * with unequal numbers of windows.
 *
 * \ingroup dia_dops
 */
620 template <typename ZipFunction, typename FirstDIAType, typename FirstDIAStack,
621  typename... DIAs>
622 auto ZipWindow(const std::array<size_t, 1 + sizeof ... (DIAs)>& window_size,
623  const ZipFunction& zip_function,
624  const DIA<FirstDIAType, FirstDIAStack>& first_dia,
625  const DIAs& ... dias) {
626 
627  tlx::vexpand((first_dia.AssertValid(), 0), (dias.AssertValid(), 0) ...);
628 
629  static_assert(
630  std::is_convertible<
631  std::vector<FirstDIAType>,
632  typename ZipWindowTraits<ZipFunction>::template vector_plain<0>
633  >::value,
634  "ZipFunction has the wrong input type in DIA 0");
635 
636  using ZipResult
637  = typename ZipWindowTraits<ZipFunction>::result_type;
638 
639  using ZipArgsTuple =
640  typename ZipWindowTraits<ZipFunction>::value_type_tuple_plain;
641 
643  ZipResult, ZipFunction,
644  /* Pad */ false, /* UnequalCheck */ true,
645  /* UseStdArray */ false,
646  1 + sizeof ... (DIAs)>;
647 
648  auto node = tlx::make_counting<ZipWindowNode>(
649  window_size, zip_function, ZipArgsTuple(), first_dia, dias...);
650 
651  return DIA<ZipResult>(node);
652 }
653 
/*!
 * Zips DIAs window-wise in style of functional programming by applying
 * zip_function to the i-th fixed-size windows of all input DIAs to form the
 * i-th element of the output DIA. Each window is passed to zip_function as a
 * std::vector<>. The length of each input DIA must be a multiple of its window
 * size, given in window_size. The type of the output DIA is inferred from the
 * return type of zip_function.
 *
 * If the input DIAs contain different numbers of windows, the result is cut
 * to the shortest of them. Use the PadTag variant to keep all windows instead.
 *
 * \ingroup dia_dops
 */
666 template <typename ZipFunction, typename FirstDIAType, typename FirstDIAStack,
667  typename... DIAs>
668 auto ZipWindow(struct CutTag,
669  const std::array<size_t, 1 + sizeof ... (DIAs)>& window_size,
670  const ZipFunction& zip_function,
671  const DIA<FirstDIAType, FirstDIAStack>& first_dia,
672  const DIAs& ... dias) {
673 
674  tlx::vexpand((first_dia.AssertValid(), 0), (dias.AssertValid(), 0) ...);
675 
676  static_assert(
677  std::is_convertible<
678  std::vector<FirstDIAType>,
679  typename ZipWindowTraits<ZipFunction>::template vector_plain<0>
680  >::value,
681  "ZipFunction has the wrong input type in DIA 0");
682 
683  using ZipResult
684  = typename ZipWindowTraits<ZipFunction>::result_type;
685 
686  using ZipArgsTuple =
687  typename ZipWindowTraits<ZipFunction>::value_type_tuple_plain;
688 
690  ZipResult, ZipFunction,
691  /* Pad */ false, /* UnequalCheck */ false,
692  /* UseStdArray */ false,
693  1 + sizeof ... (DIAs)>;
694 
695  auto node = tlx::make_counting<ZipWindowNode>(
696  window_size, zip_function, ZipArgsTuple(), first_dia, dias...);
697 
698  return DIA<ZipResult>(node);
699 }
700 
/*!
 * Zips DIAs window-wise in style of functional programming by applying
 * zip_function to the i-th fixed-size windows of all input DIAs to form the
 * i-th element of the output DIA. Each window is passed to zip_function as a
 * std::vector<>; window_size holds one window size per input DIA. The type of
 * the output DIA is inferred from the return type of zip_function.
 *
 * The output DIA's length is the *maximum* number of windows of all input
 * DIAs. Missing items of shorter DIAs, including those of an incomplete last
 * window, are filled with the corresponding item of the padding tuple.
 *
 * \ingroup dia_dops
 */
713 template <typename ZipFunction, typename FirstDIAType, typename FirstDIAStack,
714  typename... DIAs>
716  struct PadTag,
717  const std::array<size_t, 1 + sizeof ... (DIAs)>& window_size,
718  const ZipFunction& zip_function,
719  const typename ZipWindowTraits<ZipFunction>::value_type_tuple_plain& padding,
720  const DIA<FirstDIAType, FirstDIAStack>& first_dia,
721  const DIAs& ... dias) {
722 
723  tlx::vexpand((first_dia.AssertValid(), 0), (dias.AssertValid(), 0) ...);
724 
725  static_assert(
726  std::is_convertible<
727  std::vector<FirstDIAType>,
728  typename ZipWindowTraits<ZipFunction>::template vector_plain<0>
729  >::value,
730  "ZipFunction has the wrong input type in DIA 0");
731 
732  using ZipResult =
733  typename common::FunctionTraits<ZipFunction>::result_type;
734 
736  ZipResult, ZipFunction,
737  /* Pad */ true, /* UnequalCheck */ false,
738  /* UseStdArray */ false,
739  1 + sizeof ... (DIAs)>;
740 
741  auto node = tlx::make_counting<ZipWindowNode>(
742  window_size, zip_function, padding, first_dia, dias...);
743 
744  return DIA<ZipResult>(node);
745 }
746 
747 /*!
748  * Zips two DIAs of equal size in style of functional programming by applying
749  * zip_function to the i-th fixed-sized windows of both input DIAs to form the
750  * i-th element of the output DIA. The input DIAs length must be multiples of
751  * the corresponding window size. The type of the output DIA can be inferred
752  * from the zip_function.
753  *
754  * The output DIA's length is the *maximum* of all input DIAs, shorter DIAs are
755  * padded with default-constructed items.
756  *
757  * \ingroup dia_dops
758  */
759 template <typename ZipFunction, typename FirstDIAType, typename FirstDIAStack,
760  typename... DIAs>
761 auto ZipWindow(struct PadTag,
762  const std::array<size_t, 1 + sizeof ... (DIAs)>& window_size,
763  const ZipFunction& zip_function,
764  const DIA<FirstDIAType, FirstDIAStack>& first_dia,
765  const DIAs& ... dias) {
766 
767  using ZipArgsTuple =
768  typename ZipWindowTraits<ZipFunction>::value_type_tuple_plain;
769 
770  return ZipWindow(PadTag, window_size, zip_function,
771  ZipArgsTuple(), first_dia, dias...);
772 }
773 
774 /******************************************************************************/
775 
/*!
 * Zips DIAs window-wise in style of functional programming by applying
 * zip_function to the i-th fixed-size windows of all input DIAs to form the
 * i-th element of the output DIA. Each window is passed to zip_function as a
 * std::array<>, whose length must equal the corresponding entry of
 * window_size. The type of the output DIA is inferred from the return type of
 * zip_function.
 *
 * The output DIA's length is the *maximum* number of windows of all input
 * DIAs. Missing items of shorter DIAs, including those of an incomplete last
 * window, are filled with the corresponding item of the padding tuple.
 *
 * \ingroup dia_dops
 */
788 template <typename ZipFunction, typename FirstDIAType, typename FirstDIAStack,
789  typename... DIAs>
791  struct ArrayTag,
792  struct PadTag,
793  const std::array<size_t, 1 + sizeof ... (DIAs)>& window_size,
794  const ZipFunction& zip_function,
795  const typename ZipWindowTraits<ZipFunction>::value_type_tuple_plain& padding,
796  const DIA<FirstDIAType, FirstDIAStack>& first_dia,
797  const DIAs& ... dias) {
798 
799  tlx::vexpand((first_dia.AssertValid(), 0), (dias.AssertValid(), 0) ...);
800 
801  // static_assert(
802  // std::is_convertible<
803  // std::array<FirstDIAType, >,
804  // typename ZipWindowTraits<ZipFunction>::template vector_plain<0>
805  // >::value,
806  // "ZipFunction has the wrong input type in DIA 0");
807 
808  using ZipResult =
809  typename common::FunctionTraits<ZipFunction>::result_type;
810 
812  ZipResult, ZipFunction,
813  /* Pad */ true, /* UnequalCheck */ false,
814  /* UseStdArray */ true,
815  1 + sizeof ... (DIAs)>;
816 
817  auto node = tlx::make_counting<ZipWindowNode>(
818  window_size, zip_function, padding, first_dia, dias...);
819 
820  return DIA<ZipResult>(node);
821 }
822 
/*!
 * Zips DIAs window-wise in style of functional programming by applying
 * zip_function to the i-th fixed-size windows of all input DIAs to form the
 * i-th element of the output DIA. Each window is passed to zip_function as a
 * std::array<>, whose length must equal the corresponding entry of
 * window_size. The type of the output DIA is inferred from the return type of
 * zip_function.
 *
 * The output DIA's length is the *maximum* number of windows of all input
 * DIAs. Missing items of shorter DIAs, including those of an incomplete last
 * window, are filled with value-initialized (default-constructed) items.
 *
 * \ingroup dia_dops
 */
835 template <typename ZipFunction, typename FirstDIAType, typename FirstDIAStack,
836  typename... DIAs>
838  struct ArrayTag,
839  struct PadTag,
840  const std::array<size_t, 1 + sizeof ... (DIAs)>& window_size,
841  const ZipFunction& zip_function,
842  const DIA<FirstDIAType, FirstDIAStack>& first_dia,
843  const DIAs& ... dias) {
844 
845  using ZipArgsTuple =
846  typename ZipWindowTraits<ZipFunction>::value_type_tuple_plain;
847 
848  return ZipWindow(ArrayTag, PadTag, window_size, zip_function,
849  ZipArgsTuple(), first_dia, dias...);
850 }
851 
852 /******************************************************************************/
853 
854 //! \}
855 
856 } // namespace api
857 
858 //! imported from api namespace
859 using api::ZipWindow;
860 
861 //! imported from api namespace
862 using api::ArrayTag;
863 
864 } // namespace thrill
865 
866 #endif // !THRILL_API_ZIP_WINDOW_HEADER
867 
868 /******************************************************************************/
typename ZipWindowNode::ZipFunction ZipFunction
Definition: zip_window.hpp:466
tag structure for ZipWindow()
Definition: zip_window.hpp:43
net::FlowControlChannel & net
Definition: context.hpp:443
virtual void Dispose()
Virtual clear method. Triggers actual disposing in sub-classes.
Definition: dia_base.hpp:188
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:34
DIA is the interface between the user and the Thrill framework.
Definition: dia.hpp:141
std::tuple< typename std::remove_cv< typename std::remove_reference< Args >::type >::type::value_type...> value_type_tuple
the tuple of value_type inside the vectors
Definition: zip_window.hpp:77
ZipWindowNode(const std::array< size_t, kNumInputs > &window_size, const ZipFunction &zip_function, const ZipArgsTuple &padding, const ParentDIA0 &parent0, const ParentDIAs &...parents)
Constructor for a ZipWindowNode.
Definition: zip_window.hpp:201
void PrintCollectiveMeanStdev(const char *text, const Type &local)
Definition: context.hpp:349
size_t my_rank() const
Global rank of this worker among all other workers in the system.
Definition: context.hpp:240
void operator()(const Index &, Parent &parent)
Definition: zip_window.hpp:328
void vexpand(Types &&...)
Definition: vexpand.hpp:24
CatStreamData::CatReader CatReader
Definition: cat_stream.hpp:154
std::array< Reader, kNumInputs > & readers_
reference to the reader array in PushData().
Definition: zip_window.hpp:600
static constexpr size_t kNumInputs
Definition: zip_window.hpp:188
typename ZipWindowTraits< ZipFunction >::value_type_tuple_plain ZipArgsTuple
Definition: zip_window.hpp:194
static constexpr bool stats_enabled
Set this variable to true to enable generation and output of stats.
Definition: zip_window.hpp:180
typename std::tuple_element< i, value_type_tuple >::type value_type
Definition: zip_window.hpp:90
typename ZipWindowTraits< ZipFunction >::template vector_plain< Index > ZipVectorN
Definition: zip_window.hpp:552
ZipWindowTraits< ZipFunction >::vector_tuple_plain vectors_
tuple of std::vector<>s
Definition: zip_window.hpp:531
BlockWriter contains a temporary Block object into which a) any serializable item can be stored or b)...
#define die(msg)
Instead of std::terminate(), throw the output the message via an exception.
Definition: die.hpp:22
typename std::tuple_element< i, vector_tuple_plain >::type vector_plain
Definition: zip_window.hpp:108
std::tuple< typename std::remove_cv< typename std::remove_reference< Args >::type >::type...> vector_tuple_plain
the tuple of std::vector<>s: with remove_cv and remove_reference applied
Definition: zip_window.hpp:102
virtual void PushData(bool consume)=0
Virtual method for pushing data. Triggers actual pushing in sub-classes.
const ZipArgsTuple padding_
padding for shorter DIAs
Definition: zip_window.hpp:296
const std::array< bool, kNumInputs > parent_stack_empty_
Whether the parent stack is empty.
Definition: zip_window.hpp:299
typename std::tuple_element< i, vector_tuple_plain >::type vector_plain
Definition: zip_window.hpp:163
void unused(Types &&...)
Definition: unused.hpp:20
template for computing the component-wise sum of std::array or std::vector.
Definition: functional.hpp:114
void DoScatter()
Scatter items from DIA "Index" to other workers if necessary.
Definition: zip_window.hpp:358
tag structure for Zip()
Definition: dia.hpp:78
ZipFunction zip_function_
Zip function.
Definition: zip_window.hpp:293
typename ZipWindowTraits< ZipFunction >::template value_type_plain< Index > ZipArgN
Definition: zip_window.hpp:547
size_t result_window_count_
shortest size of Zipped inputs
Definition: zip_window.hpp:317
typename std::remove_cv< typename std::remove_reference< value_type< i > >::type >::type value_type_plain
Definition: zip_window.hpp:97
data::CatStreamPtr GetNewCatStream(size_t dia_id)
Definition: context.cpp:1120
void PushItem(const ValueType &item) const
Method for derived classes to Push a single item to all children.
Definition: dia_node.hpp:147
Register Parent PreOp Hooks, instantiated and called for each Zip parent.
Definition: zip_window.hpp:322
auto ZipWindow(const std::array< size_t, 1+sizeof...(DIAs)> &window_size, const ZipFunction &zip_function, const DIA< FirstDIAType, FirstDIAStack > &first_dia, const DIAs &...dias)
Zips two DIAs of equal size in style of functional programming by applying zip_function to the i-th f...
Definition: zip_window.hpp:622
virtual void StopPreOp(size_t)
Virtual method for preparing end of PushData.
Definition: dia_base.hpp:173
static constexpr bool debug
Definition: zip_window.hpp:177
static std::string VecToStr(const std::array< T, N > &data)
Logging helper to print arrays as [a1,a2,a3,...].
Definition: string.hpp:179
virtual void Execute()=0
Virtual execution method. Triggers actual computation in sub-classes.
int value
Definition: gen_data.py:41
auto apply_tuple(Functor &&f, Tuple &&t)
Call the functor f with the contents of t as arguments.
Definition: apply_tuple.hpp:40
bool HasNext()
helper for PushData() which checks all inputs
Definition: zip_window.hpp:559
tag structure for Zip()
Definition: dia.hpp:70
data::CatStreamPtr streams_[kNumInputs]
Array of inbound CatStreams.
Definition: zip_window.hpp:308
ZipWindowReader(ZipWindowNode &zip_node, std::array< Reader, kNumInputs > &readers)
Definition: zip_window.hpp:554
std::array< Reader, kNumInputs > & readers_
reference to the reader array in PushData().
Definition: zip_window.hpp:528
typename ZipWindowNode::ZipFunction ZipFunction
Definition: zip_window.hpp:540
std::tuple< typename std::remove_cv< typename std::remove_reference< Args >::type >::type...> vector_tuple_plain
the tuple of std::vector<>s: with remove_cv and remove_reference applied
Definition: zip_window.hpp:157
virtual void StartPreOp(size_t)
Virtual method for preparing start of PushData.
Definition: dia_base.hpp:163
static constexpr size_t padding
data::File GetFile(size_t dia_id)
Returns a new File object containing a sequence of local Blocks.
Definition: context.hpp:280
typename ZipWindowTraits< ZipFunction >::template vector_plain< Index > ZipVectorN
Definition: zip_window.hpp:478
static constexpr bool Pad
Definition: zip_window.hpp:187
T TLX_ATTRIBUTE_WARN_UNUSED_RESULT ExPrefixSumTotal(T &value, const BinarySumOp &sum_op=BinarySumOp(), const T &initial=T())
Calculates the exclusive prefix sum over all workers, and delivers the total sum as well...
A DOpNode is a typed node representing and distributed operations in Thrill.
Definition: dop_node.hpp:32
ZipWindowReader(ZipWindowNode &zip_node, std::array< Reader, kNumInputs > &readers)
Definition: zip_window.hpp:480
static uint_pair min()
return an uint_pair instance containing the smallest value possible
Definition: uint_types.hpp:217
std::tuple< typename std::remove_cv< typename std::remove_reference< typename std::remove_cv< typename std::remove_reference< Args >::type >::type::value_type >::type >::type...> value_type_tuple_plain
the tuple of value_types: with remove_cv and remove_reference applied.
Definition: zip_window.hpp:140
std::tuple< typename std::remove_cv< typename std::remove_reference< Args >::type >::type::value_type...> value_type_tuple
the tuple of value_type inside the vectors
Definition: zip_window.hpp:132
TLX_ATTRIBUTE_ALWAYS_INLINE BlockWriter & Put(const T &x)
Put appends a complete item, or fails with a FullException.
virtual bool OnPreOpFile(const data::File &, size_t)
Definition: dia_base.hpp:168
const std::vector< DIABasePtr > & parents() const
Returns the parents of this DIABase.
Definition: dia_base.hpp:253
void call_foreach_with_index(Functor &&f, Args &&...args)
void Close()
Explicitly close the writer.
std::vector< data::File > files_
Files for intermediate storage.
Definition: zip_window.hpp:302
size_t num_workers() const
Global number of workers in the system.
Definition: context.hpp:248
typename ZipWindowTraits< ZipFunction >::template value_type_plain< Index > ZipArgN
Definition: zip_window.hpp:192
void MainOp()
Receive elements from other workers.
Definition: zip_window.hpp:395
typename ZipWindowTraits< ZipFunction >::template value_type_plain< Index > ZipArgN
Definition: zip_window.hpp:473
void AssertValid() const
Assert that the DIA is valid.
Definition: dia.hpp:178
std::array< size_t, kNumInputs > size_prefixsum_
exclusive prefix sum over the number of items in workers
Definition: zip_window.hpp:314
ZipWindowTraits< ZipFunction >::vector_tuple_plain vectors_
tuple of std::vector<>s
Definition: zip_window.hpp:603
bool HasNext()
helper for PushData() which checks all inputs
Definition: zip_window.hpp:485
typename std::remove_cv< typename std::remove_reference< value_type< i > >::type >::type value_type_plain
Definition: zip_window.hpp:152
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:24
const struct ArrayTag ArrayTag
global const ArrayTag instance
Definition: zip_window.hpp:48
data::File::Writer writers_[kNumInputs]
Writers to intermediate files.
Definition: zip_window.hpp:305
std::tuple< typename std::remove_cv< typename std::remove_reference< typename std::remove_cv< typename std::remove_reference< Args >::type >::type::value_type >::type >::type...> value_type_tuple_plain
the tuple of value_types: with remove_cv and remove_reference applied.
Definition: zip_window.hpp:85
Context & context_
associated Context
Definition: dia_base.hpp:293
#define LOGC(cond)
Explicitly specify the condition for logging.
Definition: logger.hpp:21
typename std::tuple_element< i, value_type_tuple >::type value_type
Definition: zip_window.hpp:145