Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
zip_window.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/api/zip_window.hpp
3  *
4  * DIANode for a zip operation. Performs the actual zip operation
5  *
6  * Part of Project Thrill - http://project-thrill.org
7  *
8  * Copyright (C) 2015-2016 Timo Bingmann <[email protected]>
9  * Copyright (C) 2015 Matthias Stumpp <[email protected]>
10  * Copyright (C) 2015 Sebastian Lamm <[email protected]>
11  *
12  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
13  ******************************************************************************/
14 
15 #pragma once
16 #ifndef THRILL_API_ZIP_WINDOW_HEADER
17 #define THRILL_API_ZIP_WINDOW_HEADER
18 
19 #include <thrill/api/dia.hpp>
20 #include <thrill/api/dop_node.hpp>
22 #include <thrill/common/logger.hpp>
23 #include <thrill/common/string.hpp>
24 #include <thrill/data/file.hpp>
25 #include <tlx/meta/apply_tuple.hpp>
28 #include <tlx/meta/vexpand.hpp>
30 
31 #include <algorithm>
32 #include <array>
33 #include <functional>
34 #include <tuple>
35 #include <vector>
36 
37 namespace thrill {
38 namespace api {
39 
40 /******************************************************************************/
41 
42 //! tag structure for ZipWindow()
43 struct ArrayTag {
44  ArrayTag() { }
45 };
46 
47 //! global const ArrayTag instance
48 const struct ArrayTag ArrayTag;
49 
50 /******************************************************************************/
51 // ZipWindowTraits - Helper to create std::tuple<std::vector<Args> ...>
52 
53 #ifndef THRILL_DOXYGEN_IGNORE
54 
55 // taken from: http://stackoverflow.com/questions/7943525/
56 // is-it-possible-to-figure-out-the-parameter-type-and-return-type-of-a-lambda
57 template <typename T>
58 struct ZipWindowTraits : public ZipWindowTraits<decltype(&T::operator ())> { };
59 // For generic types, directly use the result of the signature of its 'operator()'
60 
61 #endif
62 
63 //! specialize for pointers to const member function
64 template <typename ClassType, typename ReturnType, typename... Args>
65 struct ZipWindowTraits<ReturnType (ClassType::*)(Args...) const> {
66 
67  //! arity is the number of arguments.
68  static constexpr size_t arity = sizeof ... (Args);
69 
70  using result_type = ReturnType;
71  using is_const = std::true_type;
72 
73  //! the tuple of value_type inside the vectors
74  using value_type_tuple = std::tuple<
75  typename std::remove_cv<
76  typename std::remove_reference<Args>::type
77  >::type::value_type...>;
78 
79  //! the tuple of value_types: with remove_cv and remove_reference applied.
80  using value_type_tuple_plain = std::tuple<
81  typename std::remove_cv<
82  typename std::remove_reference<
83  typename std::remove_cv<
84  typename std::remove_reference<Args>::type
85  >::type::value_type>::type>::type...>;
86 
87  //! the i-th argument is equivalent to the i-th tuple element of a tuple
88  //! composed of those arguments.
89  template <size_t i>
90  using value_type = typename std::tuple_element<i, value_type_tuple>::type;
91 
92  //! return i-th argument reduced to plain type: remove_cv and
93  //! remove_reference.
94  template <size_t i>
95  using value_type_plain =
96  typename std::remove_cv<
97  typename std::remove_reference<value_type<i> >::type>::type;
98 
99  //! the tuple of std::vector<>s: with remove_cv and remove_reference applied
100  using vector_tuple_plain = std::tuple<
101  typename std::remove_cv<
102  typename std::remove_reference<Args>::type>::type...>;
103 
104  //! the i-th argument is equivalent to the i-th tuple element of a tuple
105  //! composed of those arguments.
106  template <size_t i>
107  using vector_plain = typename std::tuple_element<
109 };
110 
111 //! specialize for pointers to mutable member function
112 template <typename ClassType, typename ReturnType, typename... Args>
113 struct ZipWindowTraits<ReturnType (ClassType::*)(Args...)>
114  : public ZipWindowTraits<ReturnType (ClassType::*)(Args...) const> {
115  using is_const = std::false_type;
116 };
117 
118 //! specialize for function pointers
119 template <typename ReturnType, typename... Args>
120 struct ZipWindowTraits<ReturnType (*)(Args...)> {
121 
122  //! arity is the number of arguments.
123  static constexpr size_t arity = sizeof ... (Args);
124 
125  using result_type = ReturnType;
126  using is_const = std::true_type;
127 
128  //! the tuple of value_type inside the vectors
129  using value_type_tuple = std::tuple<
130  typename std::remove_cv<
131  typename std::remove_reference<Args>::type
132  >::type::value_type...>;
133 
134  //! the tuple of value_types: with remove_cv and remove_reference applied.
135  using value_type_tuple_plain = std::tuple<
136  typename std::remove_cv<
137  typename std::remove_reference<
138  typename std::remove_cv<
139  typename std::remove_reference<Args>::type
140  >::type::value_type>::type>::type...>;
141 
142  //! the i-th argument is equivalent to the i-th tuple element of a tuple
143  //! composed of those arguments.
144  template <size_t i>
145  using value_type = typename std::tuple_element<i, value_type_tuple>::type;
146 
147  //! return i-th argument reduced to plain type: remove_cv and
148  //! remove_reference.
149  template <size_t i>
150  using value_type_plain =
151  typename std::remove_cv<
152  typename std::remove_reference<value_type<i> >::type>::type;
153 
154  //! the tuple of std::vector<>s: with remove_cv and remove_reference applied
155  using vector_tuple_plain = std::tuple<
156  typename std::remove_cv<
157  typename std::remove_reference<Args>::type>::type...>;
158 
159  //! the i-th argument is equivalent to the i-th tuple element of a tuple
160  //! composed of those arguments.
161  template <size_t i>
162  using vector_plain = typename std::tuple_element<
164 };
165 
166 /******************************************************************************/
167 
168 template <typename ZipWindowNode, bool UseArray>
170 
171 /******************************************************************************/
172 
173 template <typename ValueType, typename ZipFunction_,
174  bool Pad_, bool UnequalCheck, bool UseStdArray, size_t kNumInputs_>
175 class ZipWindowNode final : public DOpNode<ValueType>
176 {
177  static constexpr bool debug = false;
178 
179  //! Set this variable to true to enable generation and output of stats
180  static constexpr bool stats_enabled = false;
181 
183  using Super::context_;
184 
185 public:
186  using ZipFunction = ZipFunction_;
187  static constexpr bool Pad = Pad_;
188  static constexpr size_t kNumInputs = kNumInputs_;
189 
190  template <size_t Index>
191  using ZipArgN =
192  typename ZipWindowTraits<ZipFunction>::template value_type_plain<Index>;
193  using ZipArgsTuple =
194  typename ZipWindowTraits<ZipFunction>::value_type_tuple_plain;
195 
196 public:
197  /*!
198  * Constructor for a ZipWindowNode.
199  */
200  template <typename ParentDIA0, typename... ParentDIAs>
201  ZipWindowNode(const std::array<size_t, kNumInputs>& window_size,
202  const ZipFunction& zip_function, const ZipArgsTuple& padding,
203  const ParentDIA0& parent0, const ParentDIAs& ... parents)
204  : Super(parent0.ctx(), "ZipWindow",
205  { parent0.id(), parents.id() ... },
206  { parent0.node(), parents.node() ... }),
207  window_size_(window_size),
208  zip_function_(zip_function),
209  padding_(padding),
210  // this weirdness is due to a MSVC2015 parser bug
212  std::array<bool, kNumInputs>{
213  { ParentDIA0::stack_empty, (ParentDIAs::stack_empty)... }
214  }) {
215  // allocate files.
216  files_.reserve(kNumInputs);
217  for (size_t i = 0; i < kNumInputs; ++i)
218  files_.emplace_back(context_.GetFile(this));
219 
220  // Hook PreOp(s)
222  RegisterParent(this), parent0, parents...);
223  }
224 
225  void StartPreOp(size_t parent_index) final {
226  writers_[parent_index] = files_[parent_index].GetWriter();
227  }
228 
229  //! Receive a whole data::File of ValueType, but only if our stack is empty.
230  bool OnPreOpFile(const data::File& file, size_t parent_index) final {
231  assert(parent_index < kNumInputs);
232  if (!parent_stack_empty_[parent_index]) {
233  LOGC(context_.my_rank() == 0)
234  << "ZipWindow rejected File from parent "
235  << "due to non-empty function stack.";
236  return false;
237  }
238 
239  // accept file
240  LOGC(context_.my_rank() == 0)
241  << "ZipWindow accepted File from parent " << parent_index;
242  assert(files_[parent_index].num_items() == 0);
243  files_[parent_index] = file.Copy();
244  return true;
245  }
246 
247  void StopPreOp(size_t parent_index) final {
248  LOG << *this << " StopPreOp() parent_index=" << parent_index;
249  writers_[parent_index].Close();
250  }
251 
252  void Execute() final {
253  MainOp();
254  }
255 
256  void PushData(bool consume) final {
257  size_t result_count = 0;
258 
259  if (result_window_count_ != 0) {
260  // get inbound readers from all Streams
261  std::array<data::CatStream::CatReader, kNumInputs> readers;
262  for (size_t i = 0; i < kNumInputs; ++i)
263  readers[i] = streams_[i]->GetCatReader(consume);
264 
265  ZipWindowReader<ZipWindowNode, UseStdArray> reader_next(
266  *this, readers);
267 
268  while (reader_next.HasNext()) {
269  auto v = tlx::vmap_for_range<kNumInputs>(reader_next);
271  ++result_count;
272  }
273  }
274 
275  if (stats_enabled) {
277  "ZipWindow() result_count", result_count);
278  }
279  }
280 
281  void Dispose() final {
282  files_.clear();
283  for (size_t i = 0; i < kNumInputs; ++i)
284  streams_[i].reset();
285  }
286 
287 private:
288  //! Size k of the windows
289  const std::array<size_t, kNumInputs> window_size_;
290 
291  //! Zip function
293 
294  //! padding for shorter DIAs
296 
297  //! Whether the parent stack is empty
298  const std::array<bool, kNumInputs> parent_stack_empty_;
299 
300  //! Files for intermediate storage
301  std::vector<data::File> files_;
302 
303  //! Writers to intermediate files
305 
306  //! Array of inbound CatStreams
308 
309  //! \name Variables for Calculating Exchange
310  //! \{
311 
312  //! exclusive prefix sum over the number of items in workers
313  std::array<size_t, kNumInputs> size_prefixsum_;
314 
315  //! shortest size of Zipped inputs
317 
318  //! \}
319 
320  //! Register Parent PreOp Hooks, instantiated and called for each Zip parent
322  {
323  public:
324  explicit RegisterParent(ZipWindowNode* node) : node_(node) { }
325 
326  template <typename Index, typename Parent>
327  void operator () (const Index&, Parent& parent) {
328 
329  // get the ZipFunction's argument for this index
330  using ZipArg = ZipArgN<Index::index>;
331 
332  // check that the parent's type is convertible to the ZipFunction
333  // argument.
334 
335  static_assert(
337  "ZipFunction argument does not match input DIA");
338 
339  // construct lambda with only the writer in the closure
340  data::File::Writer* writer = &node_->writers_[Index::index];
341  auto pre_op_fn = [writer](const ZipArg& input) -> void {
342  writer->Put(input);
343  };
344 
345  // close the function stacks with our pre ops and register it at
346  // parent nodes for output
347  auto lop_chain = parent.stack().push(pre_op_fn).fold();
348  parent.node()->AddChild(node_, lop_chain, Index::index);
349  }
350 
351  private:
353  };
354 
355  //! Scatter items from DIA "Index" to other workers if necessary.
356  template <size_t Index>
357  void DoScatter() {
358  const size_t workers = context_.num_workers();
359 
360  size_t result_size = result_window_count_ * window_size_[Index];
361 
362  // range of items on local node
363  size_t local_begin = std::min(result_size, size_prefixsum_[Index]);
364  size_t local_end = std::min(
365  result_size, size_prefixsum_[Index] + files_[Index].num_items());
366 
367  // number of elements per worker (double)
368  double per_pe =
369  static_cast<double>(result_window_count_) / static_cast<double>(workers);
370  // offsets for scattering
371  std::vector<size_t> offsets(workers + 1, 0);
372 
373  for (size_t i = 0; i <= workers; ++i) {
374  // calculate range we have to send to each PE
375  size_t cut =
376  static_cast<size_t>(std::ceil(i * per_pe)) * window_size_[Index];
377  offsets[i] =
378  cut < local_begin ? 0 : std::min(cut, local_end) - local_begin;
379  }
380 
381  LOG << "per_pe=" << per_pe
382  << " offsets[" << Index << "] = " << offsets;
383 
384  // target stream id
385  streams_[Index] = context_.GetNewCatStream(this);
386 
387  // scatter elements to other workers, if necessary
388  using ZipArg = ZipArgN<Index>;
389  streams_[Index]->template ScatterConsume<ZipArg>(
390  files_[Index], offsets);
391  }
392 
393  //! Receive elements from other workers.
394  void MainOp() {
395  // first: calculate total size of the DIAs to Zip
396 
397  using ArraySizeT = std::array<size_t, kNumInputs>;
398 
399  // number of elements of this worker
400  ArraySizeT local_size;
401  for (size_t i = 0; i < kNumInputs; ++i) {
402  local_size[i] = files_[i].num_items();
403  sLOG << "input" << i << "local_size" << local_size[i];
404 
405  if (stats_enabled) {
407  "ZipWindow() local_size", local_size[i]);
408  }
409  }
410 
411  // exclusive prefixsum of number of elements: we have items from
412  // [size_prefixsum, size_prefixsum + local_size). And get the total
413  // number of items in each DIAs, over all worker.
414  size_prefixsum_ = local_size;
415  ArraySizeT total_size = context_.net.ExPrefixSumTotal(
416  size_prefixsum_, common::ComponentSum<ArraySizeT>());
417 
418  // calculate number of full windows in each DIA
419  ArraySizeT total_window_count;
420  for (size_t i = 0; i < kNumInputs; ++i) {
421  total_window_count[i] =
422  (total_size[i] + window_size_[i] - 1) / window_size_[i];
423  }
424 
425  size_t max_total_window_count = *std::max_element(
426  total_window_count.begin(), total_window_count.end());
427 
428  // return only the minimum window count of all DIAs.
429  result_window_count_ =
430  Pad ? max_total_window_count : *std::min_element(
431  total_window_count.begin(), total_window_count.end());
432 
433  sLOG << "ZipWindow() total_size" << total_size
434  << "total_window_count" << total_window_count
435  << "max_total_window_count" << max_total_window_count
436  << "result_window_count_" << result_window_count_;
437 
438  // warn if DIAs have unequal window size
439  if (!Pad && UnequalCheck && result_window_count_ != max_total_window_count) {
440  die("ZipWindow(): input DIAs have unequal size: "
441  << common::VecToStr(total_size));
442  }
443 
444  if (result_window_count_ == 0) return;
445 
446  // perform scatters to exchange data, with different types.
447  tlx::call_for_range<kNumInputs>(
448  [=](auto index) {
449  tlx::unused(index);
450  this->DoScatter<decltype(index)::index>();
451  });
452  }
453 
454  //! for access to internal members
455  template <typename ZipWindowNode, bool UseArray>
456  friend class ZipWindowReader;
457 };
458 
459 //! template specialization Reader which delivers std::vector<>s to ZipFunction
460 template <typename ZipWindowNode>
461 class ZipWindowReader<ZipWindowNode, /* UseArray */ false>
462 {
463 public:
466  static constexpr size_t Pad = ZipWindowNode::Pad;
467  static constexpr size_t kNumInputs = ZipWindowNode::kNumInputs;
468 
469  template <size_t Index>
470  using ZipArgN =
471  typename ZipWindowTraits<ZipFunction>
472  ::template value_type_plain<Index>;
473 
474  template <size_t Index>
475  using ZipVectorN =
476  typename ZipWindowTraits<ZipFunction>
477  ::template vector_plain<Index>;
478 
480  std::array<Reader, kNumInputs>& readers)
481  : zip_node_(zip_node), readers_(readers) { }
482 
483  //! helper for PushData() which checks all inputs
484  bool HasNext() {
485  if (Pad) {
486  for (size_t i = 0; i < kNumInputs; ++i) {
487  if (readers_[i].HasNext()) return true;
488  }
489  return false;
490  }
491  else {
492  for (size_t i = 0; i < kNumInputs; ++i) {
493  if (!readers_[i].HasNext()) return false;
494  }
495  return true;
496  }
497  }
498 
499  template <typename Index>
500  const ZipVectorN<Index::index>& operator () (const Index&) {
501 
502  // get the ZipFunction's argument for this index
503  using ZipArg = ZipArgN<Index::index>;
504 
505  std::vector<ZipArg>& vec = std::get<Index::index>(vectors_);
506  vec.clear();
507 
508  for (size_t i = 0; i < zip_node_.window_size_[Index::index]; ++i) {
509  if (Pad && !readers_[Index::index].HasNext()) {
510  // take padding_ if next is not available.
511  vec.emplace_back(
512  std::get<Index::index>(zip_node_.padding_));
513  }
514  else {
515  vec.emplace_back(
516  readers_[Index::index].template Next<ZipArg>());
517  }
518  }
519 
520  return vec;
521  }
522 
523 private:
525 
526  //! reference to the reader array in PushData().
527  std::array<Reader, kNumInputs>& readers_;
528 
529  //! tuple of std::vector<>s
530  typename ZipWindowTraits<ZipFunction>::vector_tuple_plain vectors_;
531 };
532 
533 //! template specialization Reader which delivers std::array<>s to ZipFunction
534 template <typename ZipWindowNode>
535 class ZipWindowReader<ZipWindowNode, /* UseArray */ true>
536 {
537 public:
540  static constexpr size_t Pad = ZipWindowNode::Pad;
541  static constexpr size_t kNumInputs = ZipWindowNode::kNumInputs;
542 
543  template <size_t Index>
544  using ZipArgN =
545  typename ZipWindowTraits<ZipFunction>
546  ::template value_type_plain<Index>;
547 
548  template <size_t Index>
549  using ZipVectorN =
550  typename ZipWindowTraits<ZipFunction>
551  ::template vector_plain<Index>;
552 
554  std::array<Reader, kNumInputs>& readers)
555  : zip_node_(zip_node), readers_(readers) { }
556 
557  //! helper for PushData() which checks all inputs
558  bool HasNext() {
559  if (Pad) {
560  for (size_t i = 0; i < kNumInputs; ++i) {
561  if (readers_[i].HasNext()) return true;
562  }
563  return false;
564  }
565  else {
566  for (size_t i = 0; i < kNumInputs; ++i) {
567  if (!readers_[i].HasNext()) return false;
568  }
569  return true;
570  }
571  }
572 
573  template <typename Index>
574  const ZipVectorN<Index::index>& operator () (const Index&) {
575 
576  // get the ZipFunction's argument for this index
577  using ZipArg = ZipArgN<Index::index>;
578  using ZipVector = ZipVectorN<Index::index>;
579 
580  ZipVector& vec = std::get<Index::index>(vectors_);
581 
582  for (size_t i = 0; i < zip_node_.window_size_[Index::index]; ++i) {
583  if (Pad && !readers_[Index::index].HasNext()) {
584  // take padding_ if next is not available.
585  vec[i] = std::get<Index::index>(zip_node_.padding_);
586  }
587  else {
588  vec[i] = readers_[Index::index].template Next<ZipArg>();
589  }
590  }
591 
592  return vec;
593  }
594 
595 private:
597 
598  //! reference to the reader array in PushData().
599  std::array<Reader, kNumInputs>& readers_;
600 
601  //! tuple of std::vector<>s
602  typename ZipWindowTraits<ZipFunction>::vector_tuple_plain vectors_;
603 };
604 
605 /******************************************************************************/
606 
607 /*!
608  * Zips two DIAs of equal size in style of functional programming by applying
609  * zip_function to the i-th fixed-sized windows of both input DIAs to form the
610  * i-th element of the output DIA. The input DIAs length must be multiples of
611  * the corresponding window size. The type of the output DIA can be inferred
612  * from the zip_function.
613  *
614  * The two input DIAs are required to be of equal window multiples, otherwise
615  * use the CutTag variant.
616  *
617  * \ingroup dia_dops
618  */
619 template <typename ZipFunction, typename FirstDIAType, typename FirstDIAStack,
620  typename... DIAs>
621 auto ZipWindow(const std::array<size_t, 1 + sizeof ... (DIAs)>& window_size,
622  const ZipFunction& zip_function,
623  const DIA<FirstDIAType, FirstDIAStack>& first_dia,
624  const DIAs& ... dias) {
625 
626  tlx::vexpand((first_dia.AssertValid(), 0), (dias.AssertValid(), 0) ...);
627 
628  static_assert(
629  std::is_convertible<
630  std::vector<FirstDIAType>,
631  typename ZipWindowTraits<ZipFunction>::template vector_plain<0>
632  >::value,
633  "ZipFunction has the wrong input type in DIA 0");
634 
635  using ZipResult
636  = typename ZipWindowTraits<ZipFunction>::result_type;
637 
638  using ZipArgsTuple =
639  typename ZipWindowTraits<ZipFunction>::value_type_tuple_plain;
640 
642  ZipResult, ZipFunction,
643  /* Pad */ false, /* UnequalCheck */ true,
644  /* UseStdArray */ false,
645  1 + sizeof ... (DIAs)>;
646 
647  auto node = tlx::make_counting<ZipWindowNode>(
648  window_size, zip_function, ZipArgsTuple(), first_dia, dias...);
649 
650  return DIA<ZipResult>(node);
651 }
652 
653 /*!
654  * Zips two DIAs of equal size in style of functional programming by applying
655  * zip_function to the i-th fixed-sized windows of both input DIAs to form the
656  * i-th element of the output DIA. The input DIAs length must be multiples of
657  * the corresponding window size. The type of the output DIA can be inferred
658  * from the zip_function.
659  *
660  * If the two input DIAs are of unequal size, the result is the shorter of
661  * both. Otherwise use the PadTag variant.
662  *
663  * \ingroup dia_dops
664  */
665 template <typename ZipFunction, typename FirstDIAType, typename FirstDIAStack,
666  typename... DIAs>
667 auto ZipWindow(struct CutTag,
668  const std::array<size_t, 1 + sizeof ... (DIAs)>& window_size,
669  const ZipFunction& zip_function,
670  const DIA<FirstDIAType, FirstDIAStack>& first_dia,
671  const DIAs& ... dias) {
672 
673  tlx::vexpand((first_dia.AssertValid(), 0), (dias.AssertValid(), 0) ...);
674 
675  static_assert(
676  std::is_convertible<
677  std::vector<FirstDIAType>,
678  typename ZipWindowTraits<ZipFunction>::template vector_plain<0>
679  >::value,
680  "ZipFunction has the wrong input type in DIA 0");
681 
682  using ZipResult
683  = typename ZipWindowTraits<ZipFunction>::result_type;
684 
685  using ZipArgsTuple =
686  typename ZipWindowTraits<ZipFunction>::value_type_tuple_plain;
687 
689  ZipResult, ZipFunction,
690  /* Pad */ false, /* UnequalCheck */ false,
691  /* UseStdArray */ false,
692  1 + sizeof ... (DIAs)>;
693 
694  auto node = tlx::make_counting<ZipWindowNode>(
695  window_size, zip_function, ZipArgsTuple(), first_dia, dias...);
696 
697  return DIA<ZipResult>(node);
698 }
699 
700 /*!
701  * Zips two DIAs of equal size in style of functional programming by applying
702  * zip_function to the i-th fixed-sized windows of both input DIAs to form the
703  * i-th element of the output DIA. The input DIAs length must be multiples of
704  * the corresponding window size. The type of the output DIA can be inferred
705  * from the zip_function.
706  *
707  * The output DIA's length is the *maximum* of all input DIAs, shorter DIAs are
708  * padded with items given by the padding parameter.
709  *
710  * \ingroup dia_dops
711  */
712 template <typename ZipFunction, typename FirstDIAType, typename FirstDIAStack,
713  typename... DIAs>
715  struct PadTag,
716  const std::array<size_t, 1 + sizeof ... (DIAs)>& window_size,
717  const ZipFunction& zip_function,
718  const typename ZipWindowTraits<ZipFunction>::value_type_tuple_plain& padding,
719  const DIA<FirstDIAType, FirstDIAStack>& first_dia,
720  const DIAs& ... dias) {
721 
722  tlx::vexpand((first_dia.AssertValid(), 0), (dias.AssertValid(), 0) ...);
723 
724  static_assert(
725  std::is_convertible<
726  std::vector<FirstDIAType>,
727  typename ZipWindowTraits<ZipFunction>::template vector_plain<0>
728  >::value,
729  "ZipFunction has the wrong input type in DIA 0");
730 
731  using ZipResult =
732  typename common::FunctionTraits<ZipFunction>::result_type;
733 
735  ZipResult, ZipFunction,
736  /* Pad */ true, /* UnequalCheck */ false,
737  /* UseStdArray */ false,
738  1 + sizeof ... (DIAs)>;
739 
740  auto node = tlx::make_counting<ZipWindowNode>(
741  window_size, zip_function, padding, first_dia, dias...);
742 
743  return DIA<ZipResult>(node);
744 }
745 
746 /*!
747  * Zips two DIAs of equal size in style of functional programming by applying
748  * zip_function to the i-th fixed-sized windows of both input DIAs to form the
749  * i-th element of the output DIA. The input DIAs length must be multiples of
750  * the corresponding window size. The type of the output DIA can be inferred
751  * from the zip_function.
752  *
753  * The output DIA's length is the *maximum* of all input DIAs, shorter DIAs are
754  * padded with default-constructed items.
755  *
756  * \ingroup dia_dops
757  */
758 template <typename ZipFunction, typename FirstDIAType, typename FirstDIAStack,
759  typename... DIAs>
760 auto ZipWindow(struct PadTag,
761  const std::array<size_t, 1 + sizeof ... (DIAs)>& window_size,
762  const ZipFunction& zip_function,
763  const DIA<FirstDIAType, FirstDIAStack>& first_dia,
764  const DIAs& ... dias) {
765 
766  using ZipArgsTuple =
767  typename ZipWindowTraits<ZipFunction>::value_type_tuple_plain;
768 
769  return ZipWindow(PadTag, window_size, zip_function,
770  ZipArgsTuple(), first_dia, dias...);
771 }
772 
773 /******************************************************************************/
774 
775 /*!
776  * Zips two DIAs of equal size in style of functional programming by applying
777  * zip_function to the i-th fixed-sized windows of both input DIAs to form the
778  * i-th element of the output DIA. The input DIAs length must be multiples of
779  * the corresponding window size. The type of the output DIA can be inferred
780  * from the zip_function.
781  *
782  * The output DIA's length is the *maximum* of all input DIAs, shorter DIAs are
783  * padded with items given by the padding parameter.
784  *
785  * \ingroup dia_dops
786  */
787 template <typename ZipFunction, typename FirstDIAType, typename FirstDIAStack,
788  typename... DIAs>
790  struct ArrayTag,
791  struct PadTag,
792  const std::array<size_t, 1 + sizeof ... (DIAs)>& window_size,
793  const ZipFunction& zip_function,
794  const typename ZipWindowTraits<ZipFunction>::value_type_tuple_plain& padding,
795  const DIA<FirstDIAType, FirstDIAStack>& first_dia,
796  const DIAs& ... dias) {
797 
798  tlx::vexpand((first_dia.AssertValid(), 0), (dias.AssertValid(), 0) ...);
799 
800  // static_assert(
801  // std::is_convertible<
802  // std::array<FirstDIAType, >,
803  // typename ZipWindowTraits<ZipFunction>::template vector_plain<0>
804  // >::value,
805  // "ZipFunction has the wrong input type in DIA 0");
806 
807  using ZipResult =
808  typename common::FunctionTraits<ZipFunction>::result_type;
809 
811  ZipResult, ZipFunction,
812  /* Pad */ true, /* UnequalCheck */ false,
813  /* UseStdArray */ true,
814  1 + sizeof ... (DIAs)>;
815 
816  auto node = tlx::make_counting<ZipWindowNode>(
817  window_size, zip_function, padding, first_dia, dias...);
818 
819  return DIA<ZipResult>(node);
820 }
821 
822 /*!
823  * Zips two DIAs of equal size in style of functional programming by applying
824  * zip_function to the i-th fixed-sized windows of both input DIAs to form the
825  * i-th element of the output DIA. The input DIAs length must be multiples of
826  * the corresponding window size. The type of the output DIA can be inferred
827  * from the zip_function.
828  *
829  * The output DIA's length is the *maximum* of all input DIAs, shorter DIAs are
830  * padded with default constructed items.
831  *
832  * \ingroup dia_dops
833  */
834 template <typename ZipFunction, typename FirstDIAType, typename FirstDIAStack,
835  typename... DIAs>
837  struct ArrayTag,
838  struct PadTag,
839  const std::array<size_t, 1 + sizeof ... (DIAs)>& window_size,
840  const ZipFunction& zip_function,
841  const DIA<FirstDIAType, FirstDIAStack>& first_dia,
842  const DIAs& ... dias) {
843 
844  using ZipArgsTuple =
845  typename ZipWindowTraits<ZipFunction>::value_type_tuple_plain;
846 
847  return ZipWindow(ArrayTag, PadTag, window_size, zip_function,
848  ZipArgsTuple(), first_dia, dias...);
849 }
850 
851 /******************************************************************************/
852 
853 //! \}
854 
855 } // namespace api
856 
857 //! imported from api namespace
858 using api::ZipWindow;
859 
860 //! imported from api namespace
861 using api::ArrayTag;
862 
863 } // namespace thrill
864 
865 #endif // !THRILL_API_ZIP_WINDOW_HEADER
866 
867 /******************************************************************************/
typename ZipWindowNode::ZipFunction ZipFunction
Definition: zip_window.hpp:465
tag structure for ZipWindow()
Definition: zip_window.hpp:43
net::FlowControlChannel & net
Definition: context.hpp:446
virtual void Dispose()
Virtual clear method. Triggers actual disposing in sub-classes.
Definition: dia_base.hpp:188
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:34
DIA is the interface between the user and the Thrill framework.
Definition: dia.hpp:141
std::tuple< typename std::remove_cv< typename std::remove_reference< Args >::type >::type::value_type...> value_type_tuple
the tuple of value_type inside the vectors
Definition: zip_window.hpp:77
ZipWindowNode(const std::array< size_t, kNumInputs > &window_size, const ZipFunction &zip_function, const ZipArgsTuple &padding, const ParentDIA0 &parent0, const ParentDIAs &...parents)
Constructor for a ZipWindowNode.
Definition: zip_window.hpp:201
void PrintCollectiveMeanStdev(const char *text, const Type &local)
Definition: context.hpp:352
size_t my_rank() const
Global rank of this worker among all other workers in the system.
Definition: context.hpp:243
void operator()(const Index &, Parent &parent)
Definition: zip_window.hpp:327
void vexpand(Types &&...)
Definition: vexpand.hpp:24
CatStreamData::CatReader CatReader
Definition: cat_stream.hpp:154
std::array< Reader, kNumInputs > & readers_
reference to the reader array in PushData().
Definition: zip_window.hpp:599
static constexpr size_t kNumInputs
Definition: zip_window.hpp:188
typename ZipWindowTraits< ZipFunction >::value_type_tuple_plain ZipArgsTuple
Definition: zip_window.hpp:194
static constexpr bool stats_enabled
Set this variable to true to enable generation and output of stats.
Definition: zip_window.hpp:180
typename std::tuple_element< i, value_type_tuple >::type value_type
Definition: zip_window.hpp:90
typename ZipWindowTraits< ZipFunction >::template vector_plain< Index > ZipVectorN
Definition: zip_window.hpp:551
ZipWindowTraits< ZipFunction >::vector_tuple_plain vectors_
tuple of std::vector<>s
Definition: zip_window.hpp:530
BlockWriter contains a temporary Block object into which a) any serializable item can be stored or b)...
#define die(msg)
Instead of std::terminate(), throw the output the message via an exception.
Definition: die.hpp:22
typename std::tuple_element< i, vector_tuple_plain >::type vector_plain
Definition: zip_window.hpp:108
std::tuple< typename std::remove_cv< typename std::remove_reference< Args >::type >::type...> vector_tuple_plain
the tuple of std::vector<>s: with remove_cv and remove_reference applied
Definition: zip_window.hpp:102
virtual void PushData(bool consume)=0
Virtual method for pushing data. Triggers actual pushing in sub-classes.
const ZipArgsTuple padding_
padding for shorter DIAs
Definition: zip_window.hpp:295
const std::array< bool, kNumInputs > parent_stack_empty_
Whether the parent stack is empty.
Definition: zip_window.hpp:298
typename std::tuple_element< i, vector_tuple_plain >::type vector_plain
Definition: zip_window.hpp:163
void unused(Types &&...)
Definition: unused.hpp:20
template for computing the component-wise sum of std::array or std::vector.
Definition: functional.hpp:114
void DoScatter()
Scatter items from DIA "Index" to other workers if necessary.
Definition: zip_window.hpp:357
tag structure for Zip()
Definition: dia.hpp:78
ZipFunction zip_function_
Zip function.
Definition: zip_window.hpp:292
typename ZipWindowTraits< ZipFunction >::template value_type_plain< Index > ZipArgN
Definition: zip_window.hpp:546
size_t result_window_count_
shortest size of Zipped inputs
Definition: zip_window.hpp:316
typename std::remove_cv< typename std::remove_reference< value_type< i > >::type >::type value_type_plain
Definition: zip_window.hpp:97
data::CatStreamPtr GetNewCatStream(size_t dia_id)
Definition: context.cpp:1151
void PushItem(const ValueType &item) const
Method for derived classes to Push a single item to all children.
Definition: dia_node.hpp:147
Register Parent PreOp Hooks, instantiated and called for each Zip parent.
Definition: zip_window.hpp:321
auto ZipWindow(const std::array< size_t, 1+sizeof...(DIAs)> &window_size, const ZipFunction &zip_function, const DIA< FirstDIAType, FirstDIAStack > &first_dia, const DIAs &...dias)
Zips two DIAs of equal size in style of functional programming by applying zip_function to the i-th f...
Definition: zip_window.hpp:621
virtual void StopPreOp(size_t)
Virtual method for preparing end of PushData.
Definition: dia_base.hpp:173
static constexpr bool debug
Definition: zip_window.hpp:177
static std::string VecToStr(const std::array< T, N > &data)
Logging helper to print arrays as [a1,a2,a3,...].
Definition: string.hpp:121
virtual void Execute()=0
Virtual execution method. Triggers actual computation in sub-classes.
int value
Definition: gen_data.py:41
auto apply_tuple(Functor &&f, Tuple &&t)
Call the functor f with the contents of t as arguments.
Definition: apply_tuple.hpp:40
bool HasNext()
helper for PushData() which checks all inputs
Definition: zip_window.hpp:558
tag structure for Zip()
Definition: dia.hpp:70
data::CatStreamPtr streams_[kNumInputs]
Array of inbound CatStreams.
Definition: zip_window.hpp:307
ZipWindowReader(ZipWindowNode &zip_node, std::array< Reader, kNumInputs > &readers)
Definition: zip_window.hpp:553
std::array< Reader, kNumInputs > & readers_
reference to the reader array in PushData().
Definition: zip_window.hpp:527
typename ZipWindowNode::ZipFunction ZipFunction
Definition: zip_window.hpp:539
std::tuple< typename std::remove_cv< typename std::remove_reference< Args >::type >::type...> vector_tuple_plain
the tuple of std::vector<>s: with remove_cv and remove_reference applied
Definition: zip_window.hpp:157
virtual void StartPreOp(size_t)
Virtual method for preparing start of PushData.
Definition: dia_base.hpp:163
static constexpr size_t padding
data::File GetFile(size_t dia_id)
Returns a new File object containing a sequence of local Blocks.
Definition: context.hpp:283
typename ZipWindowTraits< ZipFunction >::template vector_plain< Index > ZipVectorN
Definition: zip_window.hpp:477
static constexpr bool Pad
Definition: zip_window.hpp:187
T TLX_ATTRIBUTE_WARN_UNUSED_RESULT ExPrefixSumTotal(T &value, const BinarySumOp &sum_op=BinarySumOp(), const T &initial=T())
Calculates the exclusive prefix sum over all workers, and delivers the total sum as well...
A DOpNode is a typed node representing and distributed operations in Thrill.
Definition: dop_node.hpp:32
ZipWindowReader(ZipWindowNode &zip_node, std::array< Reader, kNumInputs > &readers)
Definition: zip_window.hpp:479
static uint_pair min()
return an uint_pair instance containing the smallest value possible
Definition: uint_types.hpp:217
std::tuple< typename std::remove_cv< typename std::remove_reference< typename std::remove_cv< typename std::remove_reference< Args >::type >::type::value_type >::type >::type...> value_type_tuple_plain
the tuple of value_types: with remove_cv and remove_reference applied.
Definition: zip_window.hpp:140
std::tuple< typename std::remove_cv< typename std::remove_reference< Args >::type >::type::value_type...> value_type_tuple
the tuple of value_type inside the vectors
Definition: zip_window.hpp:132
TLX_ATTRIBUTE_ALWAYS_INLINE BlockWriter & Put(const T &x)
Put appends a complete item, or fails with a FullException.
virtual bool OnPreOpFile(const data::File &, size_t)
Definition: dia_base.hpp:168
const std::vector< DIABasePtr > & parents() const
Returns the parents of this DIABase.
Definition: dia_base.hpp:253
void call_foreach_with_index(Functor &&f, Args &&...args)
void Close()
Explicitly close the writer.
std::vector< data::File > files_
Files for intermediate storage.
Definition: zip_window.hpp:301
size_t num_workers() const
Global number of workers in the system.
Definition: context.hpp:251
typename ZipWindowTraits< ZipFunction >::template value_type_plain< Index > ZipArgN
Definition: zip_window.hpp:192
void MainOp()
Receive elements from other workers.
Definition: zip_window.hpp:394
typename ZipWindowTraits< ZipFunction >::template value_type_plain< Index > ZipArgN
Definition: zip_window.hpp:472
void AssertValid() const
Assert that the DIA is valid.
Definition: dia.hpp:178
std::array< size_t, kNumInputs > size_prefixsum_
exclusive prefix sum over the number of items in workers
Definition: zip_window.hpp:313
ZipWindowTraits< ZipFunction >::vector_tuple_plain vectors_
tuple of std::vector<>s
Definition: zip_window.hpp:602
bool HasNext()
helper for PushData() which checks all inputs
Definition: zip_window.hpp:484
typename std::remove_cv< typename std::remove_reference< value_type< i > >::type >::type value_type_plain
Definition: zip_window.hpp:152
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:24
const struct ArrayTag ArrayTag
global const ArrayTag instance
Definition: zip_window.hpp:48
data::File::Writer writers_[kNumInputs]
Writers to intermediate files.
Definition: zip_window.hpp:304
std::tuple< typename std::remove_cv< typename std::remove_reference< typename std::remove_cv< typename std::remove_reference< Args >::type >::type::value_type >::type >::type...> value_type_tuple_plain
the tuple of value_types: with remove_cv and remove_reference applied.
Definition: zip_window.hpp:85
Context & context_
associated Context
Definition: dia_base.hpp:293
#define LOGC(cond)
Explicitly specify the condition for logging.
Definition: logger.hpp:21
typename std::tuple_element< i, value_type_tuple >::type value_type
Definition: zip_window.hpp:145