Thrill  0.1
zip.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/api/zip.hpp
3  *
4  * DIANode for a zip operation. Performs the actual zip operation
5  *
6  * Part of Project Thrill - http://project-thrill.org
7  *
8  * Copyright (C) 2015-2016 Timo Bingmann <[email protected]>
9  * Copyright (C) 2015 Matthias Stumpp <[email protected]>
10  * Copyright (C) 2015 Sebastian Lamm <[email protected]>
11  *
12  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
13  ******************************************************************************/
14 
15 #pragma once
16 #ifndef THRILL_API_ZIP_HEADER
17 #define THRILL_API_ZIP_HEADER
18 
19 #include <thrill/api/dia.hpp>
20 #include <thrill/api/dop_node.hpp>
22 #include <thrill/common/logger.hpp>
23 #include <thrill/common/string.hpp>
24 #include <thrill/data/file.hpp>
25 #include <tlx/meta/apply_tuple.hpp>
28 #include <tlx/meta/vexpand.hpp>
30 
31 #include <algorithm>
32 #include <array>
33 #include <functional>
34 #include <tuple>
35 #include <vector>
36 
37 namespace thrill {
38 namespace api {
39 
40 /*!
41  * A DIANode which performs a Zip operation. Zip combines two or more DIAs
42  * element-by-element. The ZipNode stores the zip_function UDF. The chainable
43  * LOps are stored in the Stack.
44  *
45  * <pre>
46  * ParentStack0 ParentStack1
47  * +--------+ +--------+
48  * | | | | A ParentStackX is called with
49  * | | | | ParentInputX, and must deliver
50  * | | | | a ZipArgX item.
51  * +-+--------+---+--------+-+
52  * | | PreOp0 | | PreOp1 | |
53  * | +--------+ +--------+ |
54  * DIA<T> --> | Zip |
55  * | +-------+ |
56  * | |PostOp | |
57  * +--------+-------+--------+
58  * | | New DIA<T>::stack_ is started
59  * | | with PostOp to chain next nodes.
60  * +-------+
61  * </pre>
62  *
63  * \tparam ValueType Output type of the Zip operation.
64  *
65  * \tparam ParentDIA0 Function stack, which contains the chained lambdas
66  * between the last and this DIANode for first input DIA.
67  *
68  * \tparam ParentDIA1 Function stack, which contains the chained lambdas
69  * between the last and this DIANode for second input DIA.
70  *
71  * \tparam ZipFunction Type of the ZipFunction.
72  *
73  * \ingroup api_layer
74  */
75 template <typename ValueType, typename ZipFunction,
76  bool Pad, bool UnequalCheck, bool NoRebalance, size_t kNumInputs>
77 class ZipNode final : public DOpNode<ValueType>
78 {
79  static constexpr bool debug = false;
80 
81  //! Set this variable to true to enable generation and output of stats
82  static constexpr bool stats_enabled = false;
83 
85  using Super::context_;
86 
87  template <size_t Index>
88  using ZipArgN =
89  typename common::FunctionTraits<ZipFunction>::template arg_plain<Index>;
90  using ZipArgsTuple =
91  typename common::FunctionTraits<ZipFunction>::args_tuple_plain;
92 
93 public:
94  /*!
95  * Constructor for a ZipNode.
96  */
97  template <typename ParentDIA0, typename... ParentDIAs>
98  ZipNode(const ZipFunction& zip_function, const ZipArgsTuple& padding,
99  const ParentDIA0& parent0, const ParentDIAs& ... parents)
100  : Super(parent0.ctx(), "Zip",
101  { parent0.id(), parents.id() ... },
102  { parent0.node(), parents.node() ... }),
103  zip_function_(zip_function),
104  padding_(padding),
105  // this weirdness is due to a MSVC2015 parser bug
107  std::array<bool, kNumInputs>{
108  { ParentDIA0::stack_empty, (ParentDIAs::stack_empty)... }
109  }) {
110  // allocate files.
111  files_.reserve(kNumInputs);
112  for (size_t i = 0; i < kNumInputs; ++i)
113  files_.emplace_back(context_.GetFile(this));
114 
115  // Hook PreOp(s)
117  RegisterParent(this), parent0, parents...);
118  }
119 
120  void StartPreOp(size_t parent_index) final {
121  writers_[parent_index] = files_[parent_index].GetWriter();
122  }
123 
124  //! Receive a whole data::File of ValueType, but only if our stack is empty.
125  bool OnPreOpFile(const data::File& file, size_t parent_index) final {
126  assert(parent_index < kNumInputs);
127  if (!parent_stack_empty_[parent_index]) {
129  << "Zip rejected File from parent "
130  << "due to non-empty function stack.";
131  return false;
132  }
133 
134  // accept file
135  assert(files_[parent_index].num_items() == 0);
136  files_[parent_index] = file.Copy();
137  return true;
138  }
139 
140  void StopPreOp(size_t parent_index) final {
141  LOG << *this << " StopPreOp() parent_index=" << parent_index;
142  writers_[parent_index].Close();
143  }
144 
145  void Execute() final {
146  MainOp();
147  }
148 
149  void PushData(bool consume) final {
150  size_t result_count = 0;
151 
152  if (result_size_ != 0) {
153  if (NoRebalance) {
154  // get inbound readers from all Streams
155  std::array<data::File::Reader, kNumInputs> readers;
156  for (size_t i = 0; i < kNumInputs; ++i)
157  readers[i] = files_[i].GetReader(consume);
158 
159  ReaderNext<data::File::Reader> reader_next(*this, readers);
160 
161  while (reader_next.HasNext()) {
162  auto v = tlx::vmap_for_range<kNumInputs>(reader_next);
164  ++result_count;
165  }
166  }
167  else {
168  // get inbound readers from all Streams
169  std::array<data::CatStream::CatReader, kNumInputs> readers;
170  for (size_t i = 0; i < kNumInputs; ++i)
171  readers[i] = streams_[i]->GetCatReader(consume);
172 
173  ReaderNext<data::CatStream::CatReader> reader_next(*this, readers);
174 
175  while (reader_next.HasNext()) {
176  auto v = tlx::vmap_for_range<kNumInputs>(reader_next);
178  ++result_count;
179  }
180  }
181  }
182 
183  if (stats_enabled) {
185  "Zip() result_count", result_count);
186  }
187  }
188 
189  void Dispose() final {
190  files_.clear();
191  }
192 
193 private:
194  //! Zip function
195  ZipFunction zip_function_;
196 
197  //! padding for shorter DIAs
199 
200  //! Whether the parent stack is empty
201  const std::array<bool, kNumInputs> parent_stack_empty_;
202 
203  //! Files for intermediate storage
204  std::vector<data::File> files_;
205 
206  //! Writers to intermediate files
208 
209  //! Array of inbound CatStreams
211 
212  //! \name Variables for Calculating Exchange
213  //! \{
214 
215  //! exclusive prefix sum over the number of items in workers
216  std::array<size_t, kNumInputs> size_prefixsum_;
217 
218  //! size of the Zip result: the shortest input, or the longest if Pad
219  size_t result_size_;
220 
221  //! \}
222 
223  //! Register Parent PreOp Hooks, instantiated and called for each Zip parent
225  {
226  public:
227  explicit RegisterParent(ZipNode* node) : node_(node) { }
228 
229  template <typename Index, typename Parent>
230  void operator () (const Index&, Parent& parent) {
231 
232  // get the ZipFunction's argument for this index
233  using ZipArg = ZipArgN<Index::index>;
234 
235  // check that the parent's type is convertible to the ZipFunction
236  // argument.
237  static_assert(
239  "ZipFunction argument does not match input DIA");
240 
241  // construct lambda with only the writer in the closure
242  data::File::Writer* writer = &node_->writers_[Index::index];
243  auto pre_op_fn = [writer](const ZipArg& input) -> void {
244  writer->Put(input);
245  };
246 
247  // close the function stacks with our pre ops and register it at
248  // parent nodes for output
249  auto lop_chain = parent.stack().push(pre_op_fn).fold();
250  parent.node()->AddChild(node_, lop_chain, Index::index);
251  }
252 
253  private:
255  };
256 
257  //! Scatter items from DIA "Index" to other workers if necessary.
258  template <size_t Index>
259  void DoScatter() {
260  const size_t workers = context_.num_workers();
261 
262  // range of items on local node
263  size_t local_begin = std::min(result_size_, size_prefixsum_[Index]);
264  size_t local_end = std::min(
265  result_size_, size_prefixsum_[Index] + files_[Index].num_items());
266 
267  // number of elements per worker (double)
268  double per_pe =
269  static_cast<double>(result_size_) / static_cast<double>(workers);
270  // offsets for scattering
271  std::vector<size_t> offsets(workers + 1, 0);
272 
273  for (size_t i = 0; i <= workers; ++i) {
274  // calculate range we have to send to each PE
275  size_t cut = static_cast<size_t>(std::ceil(i * per_pe));
276  offsets[i] =
277  cut < local_begin ? 0 : std::min(cut, local_end) - local_begin;
278  }
279 
280  LOG << "per_pe=" << per_pe
281  << " offsets[" << Index << "] = " << offsets;
282 
283  // target stream id
284  streams_[Index] = context_.GetNewCatStream(this);
285 
286  // scatter elements to other workers, if necessary
287  using ZipArg = ZipArgN<Index>;
288  streams_[Index]->template ScatterConsume<ZipArg>(
289  files_[Index], offsets);
290  }
291 
292  //! Receive elements from other workers.
293  void MainOp() {
294  if (NoRebalance) {
295  // no communication: everyone just checks that all input DIAs have
296  // the same local size.
297  result_size_ = files_[0].num_items();
298  for (size_t i = 1; i < kNumInputs; ++i) {
299  if (result_size_ != files_[i].num_items()) {
300  die("Zip() input DIA " << i << " partition does not match.");
301  }
302  }
303  return;
304  }
305 
306  // first: calculate total size of the DIAs to Zip
307 
308  using ArraySizeT = std::array<size_t, kNumInputs>;
309 
310  // number of elements of this worker
311  ArraySizeT local_size;
312  for (size_t i = 0; i < kNumInputs; ++i) {
313  local_size[i] = files_[i].num_items();
314  sLOG << "input" << i << "local_size" << local_size[i];
315 
316  if (stats_enabled) {
318  "Zip() local_size", local_size[i]);
319  }
320  }
321 
322  // exclusive prefixsum of number of elements: we have items from
323  // [size_prefixsum, size_prefixsum + local_size). And get the total
324  // number of items in each DIAs, over all worker.
325  size_prefixsum_ = local_size;
326  ArraySizeT total_size = context_.net.ExPrefixSumTotal(
327  size_prefixsum_, common::ComponentSum<ArraySizeT>());
328 
329  size_t max_total_size =
330  *std::max_element(total_size.begin(), total_size.end());
331 
332  // return only the minimum size of all DIAs.
333  result_size_ =
334  Pad ? max_total_size
335  : *std::min_element(total_size.begin(), total_size.end());
336 
337  // warn if DIAs have unequal size
338  if (!Pad && UnequalCheck && result_size_ != max_total_size) {
339  die("Zip(): input DIAs have unequal size: "
340  << common::VecToStr(total_size));
341  }
342 
343  if (result_size_ == 0) return;
344 
345  // perform scatters to exchange data, with different types.
346  tlx::call_for_range<kNumInputs>(
347  [=](auto index) {
348  (void)index;
349  this->DoScatter<decltype(index)::index>();
350  });
351  }
352 
353  //! Access the Readers (File or CatStream readers) of the different parents.
354  template <typename Reader>
356  {
357  public:
358  ReaderNext(ZipNode& zip_node,
359  std::array<Reader, kNumInputs>& readers)
360  : zip_node_(zip_node), readers_(readers) { }
361 
362  //! helper for PushData() which checks all inputs
363  bool HasNext() {
364  if (Pad) {
365  for (size_t i = 0; i < kNumInputs; ++i) {
366  if (readers_[i].HasNext()) return true;
367  }
368  return false;
369  }
370  else {
371  for (size_t i = 0; i < kNumInputs; ++i) {
372  if (!readers_[i].HasNext()) return false;
373  }
374  return true;
375  }
376  }
377 
378  template <typename Index>
379  auto operator () (const Index&) {
380 
381  // get the ZipFunction's argument for this index
382  using ZipArg = ZipArgN<Index::index>;
383 
384  if (Pad && !readers_[Index::index].HasNext()) {
385  // take padding_ if next is not available.
386  return std::get<Index::index>(zip_node_.padding_);
387  }
388  return readers_[Index::index].template Next<ZipArg>();
389  }
390 
391  private:
393 
394  //! reference to the reader array in PushData().
395  std::array<Reader, kNumInputs>& readers_;
396  };
397 };
398 
399 /******************************************************************************/
400 
401 /*!
402  * Zips two DIAs of equal size in style of functional programming by applying
403  * zip_function to the i-th elements of both input DIAs to form the i-th element
404  * of the output DIA. The type of the output DIA can be inferred from the
405  * zip_function.
406  *
407  * \image html dia_ops/Zip.svg
408  *
409  * The two input DIAs are required to be of equal size, otherwise use the
410  * CutTag variant.
411  *
412  * \tparam ZipFunction Type of the zip_function. This is a function with two
413  * input elements, both of the local type, and one output element, which is
414  * the type of the Zip node.
415  *
416  * \param zip_function Zip function, which zips two elements together
417  *
418  * \param first_dia the initial DIA.
419  *
420  * \param dias DIAs, which is zipped together with the original DIA.
421  *
422  * \ingroup dia_dops_free
423  */
424 template <typename ZipFunction, typename FirstDIAType, typename FirstDIAStack,
425  typename... DIAs>
426 auto Zip(const ZipFunction& zip_function,
427  const DIA<FirstDIAType, FirstDIAStack>& first_dia,
428  const DIAs& ... dias) {
429 
430  tlx::vexpand((first_dia.AssertValid(), 0), (dias.AssertValid(), 0) ...);
431 
432  static_assert(
433  std::is_convertible<
434  FirstDIAType,
435  typename common::FunctionTraits<ZipFunction>::template arg<0>
436  >::value,
437  "ZipFunction has the wrong input type in DIA 0");
438 
439  using ZipResult
440  = typename common::FunctionTraits<ZipFunction>::result_type;
441 
442  using ZipArgsTuple =
443  typename common::FunctionTraits<ZipFunction>::args_tuple_plain;
444 
445  using ZipNode = api::ZipNode<
446  ZipResult, ZipFunction,
447  /* Pad */ false, /* UnequalCheck */ true, /* NoRebalance */ false,
448  1 + sizeof ... (DIAs)>;
449 
450  auto node = tlx::make_counting<ZipNode>(
451  zip_function, ZipArgsTuple(), first_dia, dias...);
452 
453  return DIA<ZipResult>(node);
454 }
455 
456 /*!
457  * Zips any number of DIAs of equal size in style of functional programming by
458  * applying zip_function to the i-th elements of both input DIAs to form the
459  * i-th element of the output DIA. The type of the output DIA can be inferred
460  * from the zip_function.
461  *
462  * \image html dia_ops/Zip.svg
463  *
464  * If the two input DIAs are of unequal size, the result is the shorter of
465  * both. Otherwise use the PadTag variant.
466  *
467  * \tparam ZipFunction Type of the zip_function. This is a function with two
468  * input elements, both of the local type, and one output element, which is the
469  * type of the Zip node.
470  *
471  * \param zip_function Zip function, which zips two elements together
472  *
473  * \param first_dia the initial DIA.
474  *
475  * \param dias DIAs, which is zipped together with the original DIA.
476  *
477  * \ingroup dia_dops_free
478  */
479 template <typename ZipFunction, typename FirstDIAType, typename FirstDIAStack,
480  typename... DIAs>
481 auto Zip(struct CutTag,
482  const ZipFunction& zip_function,
483  const DIA<FirstDIAType, FirstDIAStack>& first_dia,
484  const DIAs& ... dias) {
485 
486  tlx::vexpand((first_dia.AssertValid(), 0), (dias.AssertValid(), 0) ...);
487 
488  static_assert(
489  std::is_convertible<
490  FirstDIAType,
491  typename common::FunctionTraits<ZipFunction>::template arg<0>
492  >::value,
493  "ZipFunction has the wrong input type in DIA 0");
494 
495  using ZipResult
496  = typename common::FunctionTraits<ZipFunction>::result_type;
497 
498  using ZipArgsTuple =
499  typename common::FunctionTraits<ZipFunction>::args_tuple_plain;
500 
501  using ZipNode = api::ZipNode<
502  ZipResult, ZipFunction,
503  /* Pad */ false, /* UnequalCheck */ false, /* NoRebalance */ false,
504  1 + sizeof ... (DIAs)>;
505 
506  auto node = tlx::make_counting<ZipNode>(
507  zip_function, ZipArgsTuple(), first_dia, dias...);
508 
509  return DIA<ZipResult>(node);
510 }
511 
512 /*!
513  * Zips any number of DIAs in style of functional programming by applying
514  * zip_function to the i-th elements of both input DIAs to form the i-th element
515  * of the output DIA. The type of the output DIA can be inferred from the
516  * zip_function.
517  *
518  * \image html dia_ops/Zip.svg
519  *
520  * The output DIA's length is the *maximum* of all input DIAs, shorter DIAs are
521  * padded with default-constructed items.
522  *
523  * \tparam ZipFunction Type of the zip_function. This is a function with two
524  * input elements, both of the local type, and one output element, which is
525  * the type of the Zip node.
526  *
527  * \param zip_function Zip function, which zips two elements together
528  *
529  * \param first_dia the initial DIA.
530  *
531  * \param dias DIAs, which is zipped together with the first DIA.
532  *
533  * \ingroup dia_dops_free
534  */
535 template <typename ZipFunction, typename FirstDIAType, typename FirstDIAStack,
536  typename... DIAs>
537 auto Zip(struct PadTag,
538  const ZipFunction& zip_function,
539  const DIA<FirstDIAType, FirstDIAStack>& first_dia,
540  const DIAs& ... dias) {
541 
542  tlx::vexpand((first_dia.AssertValid(), 0), (dias.AssertValid(), 0) ...);
543 
544  static_assert(
545  std::is_convertible<
546  FirstDIAType,
547  typename common::FunctionTraits<ZipFunction>::template arg<0>
548  >::value,
549  "ZipFunction has the wrong input type in DIA 0");
550 
551  using ZipResult =
552  typename common::FunctionTraits<ZipFunction>::result_type;
553 
554  using ZipArgsTuple =
555  typename common::FunctionTraits<ZipFunction>::args_tuple_plain;
556 
557  using ZipNode = api::ZipNode<
558  ZipResult, ZipFunction,
559  /* Pad */ true, /* UnequalCheck */ false, /* NoRebalance */ false,
560  1 + sizeof ... (DIAs)>;
561 
562  auto node = tlx::make_counting<ZipNode>(
563  zip_function, ZipArgsTuple(), first_dia, dias...);
564 
565  return DIA<ZipResult>(node);
566 }
567 
568 /*!
569  * Zips any number of DIAs in style of functional programming by applying
570  * zip_function to the i-th elements of both input DIAs to form the i-th element
571  * of the output DIA. The type of the output DIA can be inferred from the
572  * zip_function.
573  *
574  * \image html dia_ops/Zip.svg
575  *
576  * The output DIA's length is the *maximum* of all input DIAs, shorter DIAs are
577  * padded with items given by the padding parameter.
578  *
579  * \tparam ZipFunction Type of the zip_function. This is a function with two
580  * input elements, both of the local type, and one output element, which is
581  * the type of the Zip node.
582  *
583  * \param zip_function Zip function, which zips two elements together
584  *
585  * \param padding std::tuple<args> of padding sentinels delivered to ZipFunction
586  * if an input dia is too short.
587  *
588  * \param first_dia the initial DIA.
589  *
590  * \param dias DIAs, which is zipped together with the original DIA.
591  *
592  * \ingroup dia_dops_free
593  */
594 template <typename ZipFunction, typename FirstDIAType, typename FirstDIAStack,
595  typename... DIAs>
596 auto Zip(
597  struct PadTag,
598  const ZipFunction& zip_function,
599  const typename common::FunctionTraits<ZipFunction>::args_tuple_plain& padding,
600  const DIA<FirstDIAType, FirstDIAStack>& first_dia,
601  const DIAs& ... dias) {
602 
603  tlx::vexpand((first_dia.AssertValid(), 0), (dias.AssertValid(), 0) ...);
604 
605  static_assert(
606  std::is_convertible<
607  FirstDIAType,
608  typename common::FunctionTraits<ZipFunction>::template arg<0>
609  >::value,
610  "ZipFunction has the wrong input type in DIA 0");
611 
612  using ZipResult =
613  typename common::FunctionTraits<ZipFunction>::result_type;
614 
615  using ZipNode = api::ZipNode<
616  ZipResult, ZipFunction,
617  /* Pad */ true, /* UnequalCheck */ false, /* NoRebalance */ false,
618  1 + sizeof ... (DIAs)>;
619 
620  auto node = tlx::make_counting<ZipNode>(
621  zip_function, padding, first_dia, dias...);
622 
623  return DIA<ZipResult>(node);
624 }
625 
626 /*!
627  * Zips any number of DIAs in style of functional programming by applying
628  * zip_function to the i-th elements of both input DIAs to form the i-th element
629  * of the output DIA. The type of the output DIA can be inferred from the
630  * zip_function.
631  *
632  * \image html dia_ops/Zip.svg
633  *
634  * In this variant, the DIA partitions on all PEs must have matching length. No
635  * rebalancing is performed, and the program will die if any partition
636  * mismatches. This enables Zip to proceed without any communication.
637  *
638  * \tparam ZipFunction Type of the zip_function. This is a function with two
639  * input elements, both of the local type, and one output element, which is
640  * the type of the Zip node.
641  *
642  * \param zip_function Zip function, which zips two elements together
643  *
644  * \param first_dia the initial DIA.
645  *
646  * \param dias DIAs, which is zipped together with the original DIA.
647  *
648  * \ingroup dia_dops_free
649  */
650 template <typename ZipFunction, typename FirstDIAType, typename FirstDIAStack,
651  typename... DIAs>
652 auto Zip(
653  struct NoRebalanceTag,
654  const ZipFunction& zip_function,
655  const DIA<FirstDIAType, FirstDIAStack>& first_dia,
656  const DIAs& ... dias) {
657 
658  tlx::vexpand((first_dia.AssertValid(), 0), (dias.AssertValid(), 0) ...);
659 
660  static_assert(
661  std::is_convertible<
662  FirstDIAType,
663  typename common::FunctionTraits<ZipFunction>::template arg<0>
664  >::value,
665  "ZipFunction has the wrong input type in DIA 0");
666 
667  using ZipResult =
668  typename common::FunctionTraits<ZipFunction>::result_type;
669 
670  using ZipArgsTuple =
671  typename common::FunctionTraits<ZipFunction>::args_tuple_plain;
672 
673  using ZipNode = api::ZipNode<
674  ZipResult, ZipFunction,
675  /* Pad */ false, /* UnequalCheck */ false, /* NoRebalance */ true,
676  1 + sizeof ... (DIAs)>;
677 
678  auto node = tlx::make_counting<ZipNode>(
679  zip_function, ZipArgsTuple(), first_dia, dias...);
680 
681  return DIA<ZipResult>(node);
682 }
683 
684 template <typename ValueType, typename Stack>
685 template <typename ZipFunction, typename SecondDIA>
687  const SecondDIA& second_dia, const ZipFunction& zip_function) const {
688  return api::Zip(zip_function, *this, second_dia);
689 }
690 
691 template <typename ValueType, typename Stack>
692 template <typename ZipFunction, typename SecondDIA>
694  struct CutTag const&, const SecondDIA& second_dia,
695  const ZipFunction& zip_function) const {
696  return api::Zip(CutTag, zip_function, *this, second_dia);
697 }
698 
699 template <typename ValueType, typename Stack>
700 template <typename ZipFunction, typename SecondDIA>
702  struct PadTag const&, const SecondDIA& second_dia,
703  const ZipFunction& zip_function) const {
704  return api::Zip(PadTag, zip_function, *this, second_dia);
705 }
706 
707 template <typename ValueType, typename Stack>
708 template <typename ZipFunction, typename SecondDIA>
710  struct NoRebalanceTag const&, const SecondDIA& second_dia,
711  const ZipFunction& zip_function) const {
712  return api::Zip(NoRebalanceTag, zip_function, *this, second_dia);
713 }
714 
715 } // namespace api
716 
717 //! imported from api namespace
718 using api::Zip;
719 
720 } // namespace thrill
721 
722 #endif // !THRILL_API_ZIP_HEADER
723 
724 /******************************************************************************/
const ZipArgsTuple padding_
padding for shorter DIAs
Definition: zip.hpp:198
void call_foreach_with_index(Functor &&f, Args &&... args)
net::FlowControlChannel & net
Definition: context.hpp:446
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:34
DIA is the interface between the user and the Thrill framework.
Definition: dia.hpp:141
std::array< Reader, kNumInputs > & readers_
reference to the reader array in PushData().
Definition: zip.hpp:395
void PushItem(const ValueType &item) const
Method for derived classes to Push a single item to all children.
Definition: dia_node.hpp:147
static constexpr bool debug
Definition: zip.hpp:79
Register Parent PreOp Hooks, instantiated and called for each Zip parent.
Definition: zip.hpp:224
void PrintCollectiveMeanStdev(const char *text, const Type &local)
Definition: context.hpp:352
const std::array< bool, kNumInputs > parent_stack_empty_
Whether the parent stack is empty.
Definition: zip.hpp:201
Access the Readers (File or CatStream readers) of the different parents.
Definition: zip.hpp:355
void vexpand(Types &&...)
Definition: vexpand.hpp:24
size_t num_workers() const
Global number of workers in the system.
Definition: context.hpp:251
A File is an ordered sequence of Block objects for storing items.
Definition: file.hpp:56
const std::vector< DIABasePtr > & parents() const
Returns the parents of this DIABase.
Definition: dia_base.hpp:253
void AssertValid() const
Assert that the DIA is valid.
Definition: dia.hpp:178
void StartPreOp(size_t parent_index) final
Virtual method for preparing start of PushData.
Definition: zip.hpp:120
BlockWriter contains a temporary Block object into which a) any serializable item can be stored or b)...
static constexpr bool g_debug_push_file
Definition: config.hpp:44
#define die(msg)
Instead of std::terminate(), throw the output message via an exception.
Definition: die.hpp:22
template for computing the component-wise sum of std::array or std::vector.
Definition: functional.hpp:94
void StopPreOp(size_t parent_index) final
Virtual method for preparing end of PushData.
Definition: zip.hpp:140
tag structure for Zip()
Definition: dia.hpp:78
typename common::FunctionTraits< ZipFunction >::template arg_plain< Index > ZipArgN
Definition: zip.hpp:89
data::CatStreamPtr GetNewCatStream(size_t dia_id)
Definition: context.cpp:1209
ReaderNext(ZipNode &zip_node, std::array< Reader, kNumInputs > &readers)
Definition: zip.hpp:358
void operator()(const Index &, Parent &parent)
Definition: zip.hpp:230
auto Zip(const ZipFunction &zip_function, const DIA< FirstDIAType, FirstDIAStack > &first_dia, const DIAs &... dias)
Zips two DIAs of equal size in style of functional programming by applying zip_function to the i-th e...
Definition: zip.hpp:426
static std::string VecToStr(const std::array< T, N > &data)
Logging helper to print arrays as [a1,a2,a3,...].
Definition: string.hpp:121
void PushData(bool consume) final
Virtual method for pushing data. Triggers actual pushing in sub-classes.
Definition: zip.hpp:149
int value
Definition: gen_data.py:41
auto apply_tuple(Functor &&f, Tuple &&t)
Call the functor f with the contents of t as arguments.
Definition: apply_tuple.hpp:40
std::vector< data::File > files_
Files for intermediate storage.
Definition: zip.hpp:204
bool HasNext()
helper for PushData() which checks all inputs
Definition: zip.hpp:363
tag structure for Zip()
Definition: dia.hpp:70
void Execute() final
Virtual execution method. Triggers actual computation in sub-classes.
Definition: zip.hpp:145
typename common::FunctionTraits< ZipFunction >::args_tuple_plain ZipArgsTuple
Definition: zip.hpp:91
bool OnPreOpFile(const data::File &file, size_t parent_index) final
Receive a whole data::File of ValueType, but only if our stack is empty.
Definition: zip.hpp:125
void MainOp()
Receive elements from other workers.
Definition: zip.hpp:293
tag structure for Zip()
Definition: dia.hpp:86
static constexpr size_t padding
data::File GetFile(size_t dia_id)
Returns a new File object containing a sequence of local Blocks.
Definition: context.hpp:283
T TLX_ATTRIBUTE_WARN_UNUSED_RESULT ExPrefixSumTotal(T &value, const BinarySumOp &sum_op=BinarySumOp(), const T &initial=T())
Calculates the exclusive prefix sum over all workers, and delivers the total sum as well...
ZipFunction zip_function_
Zip function.
Definition: zip.hpp:195
A DOpNode is a typed node representing a distributed operation in Thrill.
Definition: dop_node.hpp:32
static uint_pair min()
return an uint_pair instance containing the smallest value possible
Definition: uint_types.hpp:217
void DoScatter()
Scatter items from DIA "Index" to other workers if necessary.
Definition: zip.hpp:259
auto Zip(const SecondDIA &second_dia, const ZipFunction &zip_function) const
Zips two DIAs of equal size in style of functional programming by applying zip_function to the i-th e...
Definition: zip.hpp:686
TLX_ATTRIBUTE_ALWAYS_INLINE BlockWriter & Put(const T &x)
Put appends a complete item, or fails with a FullException.
void Close()
Explicitly close the writer.
static constexpr bool stats_enabled
Set this variable to true to enable generation and output of stats.
Definition: zip.hpp:82
ZipNode(const ZipFunction &zip_function, const ZipArgsTuple &padding, const ParentDIA0 &parent0, const ParentDIAs &... parents)
Constructor for a ZipNode.
Definition: zip.hpp:98
std::array< size_t, kNumInputs > size_prefixsum_
exclusive prefix sum over the number of items in workers
Definition: zip.hpp:216
data::CatStreamPtr streams_[kNumInputs]
Array of inbound CatStreams.
Definition: zip.hpp:210
data::File::Writer writers_[kNumInputs]
Writers to intermediate files.
Definition: zip.hpp:207
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:24
size_t result_size_
size of the Zip result: the shortest input, or the longest if Pad
Definition: zip.hpp:219
void Dispose() final
Virtual clear method. Triggers actual disposing in sub-classes.
Definition: zip.hpp:189
Context & context_
associated Context
Definition: dia_base.hpp:293
#define LOGC(cond)
Explicitly specify the condition for logging.
Definition: logger.hpp:21
A DIANode which performs a Zip operation.
Definition: zip.hpp:77