Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
zip.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/api/zip.hpp
3  *
4  * DIANode for a zip operation. Performs the actual zip operation
5  *
6  * Part of Project Thrill - http://project-thrill.org
7  *
8  * Copyright (C) 2015-2016 Timo Bingmann <[email protected]>
9  * Copyright (C) 2015 Matthias Stumpp <[email protected]>
10  * Copyright (C) 2015 Sebastian Lamm <[email protected]>
11  *
12  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
13  ******************************************************************************/
14 
15 #pragma once
16 #ifndef THRILL_API_ZIP_HEADER
17 #define THRILL_API_ZIP_HEADER
18 
19 #include <thrill/api/dia.hpp>
20 #include <thrill/api/dop_node.hpp>
22 #include <thrill/common/logger.hpp>
23 #include <thrill/common/string.hpp>
24 #include <thrill/data/file.hpp>
25 #include <tlx/meta/apply_tuple.hpp>
28 #include <tlx/meta/vexpand.hpp>
30 
31 #include <algorithm>
32 #include <array>
33 #include <functional>
34 #include <tuple>
35 #include <vector>
36 
37 namespace thrill {
38 namespace api {
39 
40 /*!
41  * A DIANode which performs a Zip operation. Zip combines two DIAs
42  * element-by-element. The ZipNode stores the zip_function UDF. The chainable
43  * LOps are stored in the Stack.
44  *
45  * <pre>
46  * ParentStack0 ParentStack1
47  * +--------+ +--------+
48  * | | | | A ParentStackX is called with
49  * | | | | ParentInputX, and must deliver
50  * | | | | a ZipArgX item.
51  * +-+--------+---+--------+-+
52  * | | PreOp0 | | PreOp1 | |
53  * | +--------+ +--------+ |
54  * DIA<T> --> | Zip |
55  * | +-------+ |
56  * | |PostOp | |
57  * +--------+-------+--------+
58  * | | New DIA<T>::stack_ is started
59  * | | with PostOp to chain next nodes.
60  * +-------+
61  * </pre>
62  *
63  * \tparam ValueType Output type of the Zip operation.
64  *
65  * \tparam ParentDIA0 Function stack, which contains the chained lambdas
66  * between the last and this DIANode for first input DIA.
67  *
68  * \tparam ParentDIA1 Function stack, which contains the chained lambdas
69  * between the last and this DIANode for second input DIA.
70  *
71  * \tparam ZipFunction Type of the ZipFunction.
72  *
73  * \ingroup api_layer
74  */
75 template <typename ValueType, typename ZipFunction,
76  bool Pad, bool UnequalCheck, bool NoRebalance, size_t kNumInputs>
77 class ZipNode final : public DOpNode<ValueType>
78 {
79  static constexpr bool debug = false;
80 
81  //! Set this variable to true to enable generation and output of stats
82  static constexpr bool stats_enabled = false;
83 
85  using Super::context_;
86 
87  template <size_t Index>
88  using ZipArgN =
89  typename common::FunctionTraits<ZipFunction>::template arg_plain<Index>;
90  using ZipArgsTuple =
91  typename common::FunctionTraits<ZipFunction>::args_tuple_plain;
92 
93 public:
94  /*!
95  * Constructor for a ZipNode.
96  */
97  template <typename ParentDIA0, typename... ParentDIAs>
98  ZipNode(const ZipFunction& zip_function, const ZipArgsTuple& padding,
99  const ParentDIA0& parent0, const ParentDIAs& ... parents)
100  : Super(parent0.ctx(), "Zip",
101  { parent0.id(), parents.id() ... },
102  { parent0.node(), parents.node() ... }),
103  zip_function_(zip_function),
104  padding_(padding),
105  // this weirdness is due to a MSVC2015 parser bug
107  std::array<bool, kNumInputs>{
108  { ParentDIA0::stack_empty, (ParentDIAs::stack_empty)... }
109  }) {
110  // allocate files.
111  files_.reserve(kNumInputs);
112  for (size_t i = 0; i < kNumInputs; ++i)
113  files_.emplace_back(context_.GetFile(this));
114 
115  // Hook PreOp(s)
117  RegisterParent(this), parent0, parents...);
118  }
119 
120  void StartPreOp(size_t parent_index) final {
121  writers_[parent_index] = files_[parent_index].GetWriter();
122  }
123 
124  //! Accept a whole data::File from a parent, but only if that parent's function stack is empty.
125  bool OnPreOpFile(const data::File& file, size_t parent_index) final {
126  assert(parent_index < kNumInputs);
127  if (!parent_stack_empty_[parent_index]) {
129  << "Zip rejected File from parent "
130  << "due to non-empty function stack.";
131  return false;
132  }
133 
134  // accept file
135  assert(files_[parent_index].num_items() == 0);
136  files_[parent_index] = file.Copy();
137  return true;
138  }
139 
140  void StopPreOp(size_t parent_index) final {
141  LOG << *this << " StopPreOp() parent_index=" << parent_index;
142  writers_[parent_index].Close();
143  }
144 
145  void Execute() final {
146  MainOp();
147  }
148 
149  void PushData(bool consume) final {
150  size_t result_count = 0;
151 
152  if (result_size_ != 0) {
153  if (NoRebalance) {
154  // get readers for all local Files
155  std::array<data::File::Reader, kNumInputs> readers;
156  for (size_t i = 0; i < kNumInputs; ++i)
157  readers[i] = files_[i].GetReader(consume);
158 
159  ReaderNext<data::File::Reader> reader_next(*this, readers);
160 
161  while (reader_next.HasNext()) {
162  auto v = tlx::vmap_for_range<kNumInputs>(reader_next);
163  this->PushItem(tlx::apply_tuple(zip_function_, v));
164  ++result_count;
165  }
166  }
167  else {
168  // get inbound readers from all Streams
169  std::array<data::CatStream::CatReader, kNumInputs> readers;
170  for (size_t i = 0; i < kNumInputs; ++i)
171  readers[i] = streams_[i]->GetCatReader(consume);
172 
173  ReaderNext<data::CatStream::CatReader> reader_next(*this, readers);
174 
175  while (reader_next.HasNext()) {
176  auto v = tlx::vmap_for_range<kNumInputs>(reader_next);
177  this->PushItem(tlx::apply_tuple(zip_function_, v));
178  ++result_count;
179  }
180  }
181  }
182 
183  if (stats_enabled) {
185  "Zip() result_count", result_count);
186  }
187  }
188 
189  void Dispose() final {
190  files_.clear();
191  }
192 
193 private:
194  //! Zip function
195  ZipFunction zip_function_;
196 
197  //! padding for shorter DIAs
198  const ZipArgsTuple padding_;
199 
200  //! Whether the parent stack is empty
201  const std::array<bool, kNumInputs> parent_stack_empty_;
202 
203  //! Files for intermediate storage
204  std::vector<data::File> files_;
205 
206  //! Writers to intermediate files
208 
209  //! Array of inbound CatStreams
211 
212  //! \name Variables for Calculating Exchange
213  //! \{
214 
215  //! exclusive prefix sum over the number of items in workers
216  std::array<size_t, kNumInputs> size_prefixsum_;
217 
218  //! shortest size of Zipped inputs
219  size_t result_size_;
220 
221  //! \}
222 
223  //! Register Parent PreOp Hooks, instantiated and called for each Zip parent
225  {
226  public:
227  explicit RegisterParent(ZipNode* node) : node_(node) { }
228 
229  template <typename Index, typename Parent>
230  void operator () (const Index&, Parent& parent) {
231 
232  // get the ZipFunction's argument for this index
233  using ZipArg = ZipArgN<Index::index>;
234 
235  // check that the parent's type is convertible to the ZipFunction
236  // argument.
237  static_assert(
239  "ZipFunction argument does not match input DIA");
240 
241  // construct lambda with only the writer in the closure
242  data::File::Writer* writer = &node_->writers_[Index::index];
243  auto pre_op_fn = [writer](const ZipArg& input) -> void {
244  writer->Put(input);
245  };
246 
247  // close the function stacks with our pre ops and register it at
248  // parent nodes for output
249  auto lop_chain = parent.stack().push(pre_op_fn).fold();
250  parent.node()->AddChild(node_, lop_chain, Index::index);
251  }
252 
253  private:
255  };
256 
257  //! Scatter items from DIA "Index" to other workers if necessary.
258  template <size_t Index>
259  void DoScatter() {
260  const size_t workers = context_.num_workers();
261 
262  // range of items on local node
263  size_t local_begin = std::min(result_size_, size_prefixsum_[Index]);
264  size_t local_end = std::min(
265  result_size_, size_prefixsum_[Index] + files_[Index].num_items());
266 
267  // number of elements per worker (double)
268  double per_pe =
269  static_cast<double>(result_size_) / static_cast<double>(workers);
270  // offsets for scattering
271  std::vector<size_t> offsets(workers + 1, 0);
272 
273  for (size_t i = 0; i <= workers; ++i) {
274  // calculate range we have to send to each PE
275  size_t cut = static_cast<size_t>(std::ceil(i * per_pe));
276  offsets[i] =
277  cut < local_begin ? 0 : std::min(cut, local_end) - local_begin;
278  }
279 
280  LOG << "per_pe=" << per_pe
281  << " offsets[" << Index << "] = " << offsets;
282 
283  // target stream id
284  streams_[Index] = context_.GetNewCatStream(this);
285 
286  // scatter elements to other workers, if necessary
287  using ZipArg = ZipArgN<Index>;
288  streams_[Index]->template Scatter<ZipArg>(
289  files_[Index], offsets, /* consume */ true);
290  }
291 
292  //! Receive elements from other workers.
293  void MainOp() {
294  if (NoRebalance) {
295  // no communication: everyone just checks that all input DIAs have
296  // the same local size.
297  result_size_ = files_[0].num_items();
298  for (size_t i = 1; i < kNumInputs; ++i) {
299  if (result_size_ != files_[i].num_items()) {
300  die("Zip() input DIA " << i << " partition does not match.");
301  }
302  }
303  return;
304  }
305 
306  // first: calculate total size of the DIAs to Zip
307 
308  using ArraySizeT = std::array<size_t, kNumInputs>;
309 
310  // number of elements of this worker
311  ArraySizeT local_size;
312  for (size_t i = 0; i < kNumInputs; ++i) {
313  local_size[i] = files_[i].num_items();
314  sLOG << "input" << i << "local_size" << local_size[i];
315 
316  if (stats_enabled) {
318  "Zip() local_size", local_size[i]);
319  }
320  }
321 
322  // exclusive prefixsum of number of elements: we have items from
323  // [size_prefixsum, size_prefixsum + local_size). And get the total
324  // number of items in each DIAs, over all worker.
325  size_prefixsum_ = local_size;
326  ArraySizeT total_size = context_.net.ExPrefixSumTotal(
327  size_prefixsum_, common::ComponentSum<ArraySizeT>());
328 
329  size_t max_total_size =
330  *std::max_element(total_size.begin(), total_size.end());
331 
332  // result size: maximum size of all DIAs if Pad, otherwise the minimum.
333  result_size_ =
334  Pad ? max_total_size
335  : *std::min_element(total_size.begin(), total_size.end());
336 
337  // die if DIAs have unequal size and UnequalCheck is requested
338  if (!Pad && UnequalCheck && result_size_ != max_total_size) {
339  die("Zip(): input DIAs have unequal size: "
340  << common::VecToStr(total_size));
341  }
342 
343  if (result_size_ == 0) return;
344 
345  // perform scatters to exchange data, with different types.
346  tlx::call_for_range<kNumInputs>(
347  [=](auto index) {
348  (void)index;
349  this->DoScatter<decltype(index)::index>();
350  });
351  }
352 
353  //! Access CatReaders for different parents.
354  template <typename Reader>
356  {
357  public:
358  ReaderNext(ZipNode& zip_node,
359  std::array<Reader, kNumInputs>& readers)
360  : zip_node_(zip_node), readers_(readers) { }
361 
362  //! helper for PushData() which checks all inputs
363  bool HasNext() {
364  if (Pad) {
365  for (size_t i = 0; i < kNumInputs; ++i) {
366  if (readers_[i].HasNext()) return true;
367  }
368  return false;
369  }
370  else {
371  for (size_t i = 0; i < kNumInputs; ++i) {
372  if (!readers_[i].HasNext()) return false;
373  }
374  return true;
375  }
376  }
377 
378  template <typename Index>
379  auto operator () (const Index&) {
380 
381  // get the ZipFunction's argument for this index
382  using ZipArg = ZipArgN<Index::index>;
383 
384  if (Pad && !readers_[Index::index].HasNext()) {
385  // take padding_ if next is not available.
386  return std::get<Index::index>(zip_node_.padding_);
387  }
388  return readers_[Index::index].template Next<ZipArg>();
389  }
390 
391  private:
393 
394  //! reference to the reader array in PushData().
395  std::array<Reader, kNumInputs>& readers_;
396  };
397 };
398 
399 /*!
400  * Zips any number of DIAs of equal size in style of functional programming by
401  * applying zip_function to the i-th elements of all input DIAs to form the
402  * i-th element of the output DIA. The type of the output DIA can be inferred
403  * from the zip_function.
404  *
405  * The input DIAs are required to be of equal size, otherwise the program dies;
406  * use the CutTag or PadTag variant for inputs of unequal size.
407  *
408  * \tparam ZipFunction Type of the zip_function. This is a function with one
409  * input element per zipped DIA and one output element, which is the type of
410  * the Zip node.
411  *
412  * \param zip_function Zip function, which zips two elements together
413  *
414  * \param first_dia the initial DIA.
415  *
416  * \param dias DIAs, which are zipped together with the first DIA.
417  *
418  * \ingroup dia_dops
419  */
420 template <typename ZipFunction, typename FirstDIAType, typename FirstDIAStack,
421  typename... DIAs>
422 auto Zip(const ZipFunction& zip_function,
423  const DIA<FirstDIAType, FirstDIAStack>& first_dia,
424  const DIAs& ... dias) {
425 
426  tlx::vexpand((first_dia.AssertValid(), 0), (dias.AssertValid(), 0) ...);
427 
428  static_assert(
429  std::is_convertible<
430  FirstDIAType,
431  typename common::FunctionTraits<ZipFunction>::template arg<0>
432  >::value,
433  "ZipFunction has the wrong input type in DIA 0");
434 
435  using ZipResult
436  = typename common::FunctionTraits<ZipFunction>::result_type;
437 
438  using ZipArgsTuple =
439  typename common::FunctionTraits<ZipFunction>::args_tuple_plain;
440 
441  using ZipNode = api::ZipNode<
442  ZipResult, ZipFunction,
443  /* Pad */ false, /* UnequalCheck */ true, /* NoRebalance */ false,
444  1 + sizeof ... (DIAs)>;
445 
446  auto node = tlx::make_counting<ZipNode>(
447  zip_function, ZipArgsTuple(), first_dia, dias...);
448 
449  return DIA<ZipResult>(node);
450 }
451 
452 /*!
453  * Zips any number of DIAs in style of functional programming by
454  * applying zip_function to the i-th elements of all input DIAs to form the
455  * i-th element of the output DIA. The type of the output DIA can be inferred
456  * from the zip_function.
457  *
458  * If the input DIAs are of unequal size, the result has the length of the
459  * shortest one. To keep all items, use the PadTag variant.
460  *
461  * \tparam ZipFunction Type of the zip_function. This is a function with one
462  * input element per zipped DIA and one output element, which is the
463  * type of the Zip node.
464  *
465  * \param zip_function Zip function, which zips two elements together
466  *
467  * \param first_dia the initial DIA.
468  *
469  * \param dias DIAs, which are zipped together with the first DIA.
470  *
471  * \ingroup dia_dops
472  */
473 template <typename ZipFunction, typename FirstDIAType, typename FirstDIAStack,
474  typename... DIAs>
475 auto Zip(struct CutTag,
476  const ZipFunction& zip_function,
477  const DIA<FirstDIAType, FirstDIAStack>& first_dia,
478  const DIAs& ... dias) {
479 
480  tlx::vexpand((first_dia.AssertValid(), 0), (dias.AssertValid(), 0) ...);
481 
482  static_assert(
483  std::is_convertible<
484  FirstDIAType,
485  typename common::FunctionTraits<ZipFunction>::template arg<0>
486  >::value,
487  "ZipFunction has the wrong input type in DIA 0");
488 
489  using ZipResult
490  = typename common::FunctionTraits<ZipFunction>::result_type;
491 
492  using ZipArgsTuple =
493  typename common::FunctionTraits<ZipFunction>::args_tuple_plain;
494 
495  using ZipNode = api::ZipNode<
496  ZipResult, ZipFunction,
497  /* Pad */ false, /* UnequalCheck */ false, /* NoRebalance */ false,
498  1 + sizeof ... (DIAs)>;
499 
500  auto node = tlx::make_counting<ZipNode>(
501  zip_function, ZipArgsTuple(), first_dia, dias...);
502 
503  return DIA<ZipResult>(node);
504 }
505 
506 /*!
507  * Zips any number of DIAs in style of functional programming by applying
508  * zip_function to the i-th elements of all input DIAs to form the i-th element
509  * of the output DIA. The type of the output DIA can be inferred from the
510  * zip_function.
511  *
512  * The output DIA's length is the *maximum* of all input DIAs, shorter DIAs are
513  * padded with default-constructed items.
514  *
515  * \tparam ZipFunction Type of the zip_function. This is a function with one
516  * input element per zipped DIA and one output element, which is
517  * the type of the Zip node.
518  *
519  * \param zip_function Zip function, which zips two elements together
520  *
521  * \param first_dia the initial DIA.
522  *
523  * \param dias DIAs, which are zipped together with the first DIA.
524  *
525  * \ingroup dia_dops
526  */
527 template <typename ZipFunction, typename FirstDIAType, typename FirstDIAStack,
528  typename... DIAs>
529 auto Zip(struct PadTag,
530  const ZipFunction& zip_function,
531  const DIA<FirstDIAType, FirstDIAStack>& first_dia,
532  const DIAs& ... dias) {
533 
534  tlx::vexpand((first_dia.AssertValid(), 0), (dias.AssertValid(), 0) ...);
535 
536  static_assert(
537  std::is_convertible<
538  FirstDIAType,
539  typename common::FunctionTraits<ZipFunction>::template arg<0>
540  >::value,
541  "ZipFunction has the wrong input type in DIA 0");
542 
543  using ZipResult =
544  typename common::FunctionTraits<ZipFunction>::result_type;
545 
546  using ZipArgsTuple =
547  typename common::FunctionTraits<ZipFunction>::args_tuple_plain;
548 
549  using ZipNode = api::ZipNode<
550  ZipResult, ZipFunction,
551  /* Pad */ true, /* UnequalCheck */ false, /* NoRebalance */ false,
552  1 + sizeof ... (DIAs)>;
553 
554  auto node = tlx::make_counting<ZipNode>(
555  zip_function, ZipArgsTuple(), first_dia, dias...);
556 
557  return DIA<ZipResult>(node);
558 }
559 
560 /*!
561  * Zips any number of DIAs in style of functional programming by applying
562  * zip_function to the i-th elements of all input DIAs to form the i-th element
563  * of the output DIA. The type of the output DIA can be inferred from the
564  * zip_function.
565  *
566  * The output DIA's length is the *maximum* of all input DIAs, shorter DIAs are
567  * padded with items given by the padding parameter.
568  *
569  * \tparam ZipFunction Type of the zip_function. This is a function with one
570  * input element per zipped DIA and one output element, which is
571  * the type of the Zip node.
572  *
573  * \param zip_function Zip function, which zips two elements together
574  *
575  * \param padding std::tuple<args> of padding sentinels delivered to ZipFunction
576  * if an input dia is too short.
577  *
578  * \param first_dia the initial DIA.
579  *
580  * \param dias DIAs, which are zipped together with the first DIA.
581  *
582  * \ingroup dia_dops
583  */
584 template <typename ZipFunction, typename FirstDIAType, typename FirstDIAStack,
585  typename... DIAs>
586 auto Zip(
587  struct PadTag,
588  const ZipFunction& zip_function,
589  const typename common::FunctionTraits<ZipFunction>::args_tuple_plain& padding,
590  const DIA<FirstDIAType, FirstDIAStack>& first_dia,
591  const DIAs& ... dias) {
592 
593  tlx::vexpand((first_dia.AssertValid(), 0), (dias.AssertValid(), 0) ...);
594 
595  static_assert(
596  std::is_convertible<
597  FirstDIAType,
598  typename common::FunctionTraits<ZipFunction>::template arg<0>
599  >::value,
600  "ZipFunction has the wrong input type in DIA 0");
601 
602  using ZipResult =
603  typename common::FunctionTraits<ZipFunction>::result_type;
604 
605  using ZipNode = api::ZipNode<
606  ZipResult, ZipFunction,
607  /* Pad */ true, /* UnequalCheck */ false, /* NoRebalance */ false,
608  1 + sizeof ... (DIAs)>;
609 
610  auto node = tlx::make_counting<ZipNode>(
611  zip_function, padding, first_dia, dias...);
612 
613  return DIA<ZipResult>(node);
614 }
615 
616 /*!
617  * Zips any number of DIAs in style of functional programming by applying
618  * zip_function to the i-th elements of all input DIAs to form the i-th element
619  * of the output DIA. The type of the output DIA can be inferred from the
620  * zip_function.
621  *
622  * In this variant, the DIA partitions on all PEs must have matching length. No
623  * rebalancing is performed, and the program will die if any partition
624  * mismatches. This enables Zip to proceed without any communication.
625  *
626  * \tparam ZipFunction Type of the zip_function. This is a function with one
627  * input element per zipped DIA and one output element, which is
628  * the type of the Zip node.
629  *
630  * \param zip_function Zip function, which zips two elements together
631  *
632  * \param first_dia the initial DIA.
633  *
634  * \param dias DIAs, which are zipped together with the first DIA.
635  *
636  * \ingroup dia_dops
637  */
638 template <typename ZipFunction, typename FirstDIAType, typename FirstDIAStack,
639  typename... DIAs>
640 auto Zip(
641  struct NoRebalanceTag,
642  const ZipFunction& zip_function,
643  const DIA<FirstDIAType, FirstDIAStack>& first_dia,
644  const DIAs& ... dias) {
645 
646  tlx::vexpand((first_dia.AssertValid(), 0), (dias.AssertValid(), 0) ...);
647 
648  static_assert(
649  std::is_convertible<
650  FirstDIAType,
651  typename common::FunctionTraits<ZipFunction>::template arg<0>
652  >::value,
653  "ZipFunction has the wrong input type in DIA 0");
654 
655  using ZipResult =
656  typename common::FunctionTraits<ZipFunction>::result_type;
657 
658  using ZipArgsTuple =
659  typename common::FunctionTraits<ZipFunction>::args_tuple_plain;
660 
661  using ZipNode = api::ZipNode<
662  ZipResult, ZipFunction,
663  /* Pad */ false, /* UnequalCheck */ false, /* NoRebalance */ true,
664  1 + sizeof ... (DIAs)>;
665 
666  auto node = tlx::make_counting<ZipNode>(
667  zip_function, ZipArgsTuple(), first_dia, dias...);
668 
669  return DIA<ZipResult>(node);
670 }
671 
672 template <typename ValueType, typename Stack>
673 template <typename ZipFunction, typename SecondDIA>
675  const SecondDIA& second_dia, const ZipFunction& zip_function) const {
676  return api::Zip(zip_function, *this, second_dia);
677 }
678 
679 template <typename ValueType, typename Stack>
680 template <typename ZipFunction, typename SecondDIA>
682  struct CutTag const&, const SecondDIA& second_dia,
683  const ZipFunction& zip_function) const {
684  return api::Zip(CutTag, zip_function, *this, second_dia);
685 }
686 
687 template <typename ValueType, typename Stack>
688 template <typename ZipFunction, typename SecondDIA>
690  struct PadTag const&, const SecondDIA& second_dia,
691  const ZipFunction& zip_function) const {
692  return api::Zip(PadTag, zip_function, *this, second_dia);
693 }
694 
695 template <typename ValueType, typename Stack>
696 template <typename ZipFunction, typename SecondDIA>
698  struct NoRebalanceTag const&, const SecondDIA& second_dia,
699  const ZipFunction& zip_function) const {
700  return api::Zip(NoRebalanceTag, zip_function, *this, second_dia);
701 }
702 
703 //! \}
704 
705 } // namespace api
706 
707 //! imported from api namespace
708 using api::Zip;
709 
710 } // namespace thrill
711 
712 #endif // !THRILL_API_ZIP_HEADER
713 
714 /******************************************************************************/
const ZipArgsTuple padding_
padding for shorter DIAs
Definition: zip.hpp:102
net::FlowControlChannel & net
Definition: context.hpp:446
virtual void Dispose()
Virtual clear method. Triggers actual disposing in sub-classes.
Definition: dia_base.hpp:188
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:34
DIA is the interface between the user and the Thrill framework.
Definition: dia.hpp:141
std::array< Reader, kNumInputs > & readers_
reference to the reader array in PushData().
Definition: zip.hpp:395
static constexpr bool debug
Definition: zip.hpp:79
Register Parent PreOp Hooks, instantiated and called for each Zip parent.
Definition: zip.hpp:224
void PrintCollectiveMeanStdev(const char *text, const Type &local)
Definition: context.hpp:352
const std::array< bool, kNumInputs > parent_stack_empty_
Whether the parent stack is empty.
Definition: zip.hpp:201
Access CatReaders for different parents.
Definition: zip.hpp:355
void vexpand(Types &&...)
Definition: vexpand.hpp:24
auto Zip(const SecondDIA &second_dia, const ZipFunction &zip_function) const
Zips two DIAs of equal size in style of functional programming by applying zip_function to the i-th e...
Definition: zip.hpp:674
BlockWriter contains a temporary Block object into which a) any serializable item can be stored or b)...
static constexpr bool g_debug_push_file
Definition: config.hpp:44
auto operator()(const Index &)
Definition: zip.hpp:379
#define die(msg)
Instead of calling std::terminate(), throw the output message via an exception.
Definition: die.hpp:22
virtual void PushData(bool consume)=0
Virtual method for pushing data. Triggers actual pushing in sub-classes.
auto Zip(const ZipFunction &zip_function, const DIA< FirstDIAType, FirstDIAStack > &first_dia, const DIAs &...dias)
Zips two DIAs of equal size in style of functional programming by applying zip_function to the i-th e...
Definition: zip.hpp:422
template for computing the component-wise sum of std::array or std::vector.
Definition: functional.hpp:114
tag structure for Zip()
Definition: dia.hpp:78
typename common::FunctionTraits< ZipFunction >::template arg_plain< Index > ZipArgN
Definition: zip.hpp:89
data::CatStreamPtr GetNewCatStream(size_t dia_id)
Definition: context.cpp:1147
void PushItem(const ValueType &item) const
Method for derived classes to Push a single item to all children.
Definition: dia_node.hpp:147
ReaderNext(ZipNode &zip_node, std::array< Reader, kNumInputs > &readers)
Definition: zip.hpp:358
void operator()(const Index &, Parent &parent)
Definition: zip.hpp:230
virtual void StopPreOp(size_t)
Virtual method for preparing end of PushData.
Definition: dia_base.hpp:173
static std::string VecToStr(const std::array< T, N > &data)
Logging helper to print arrays as [a1,a2,a3,...].
Definition: string.hpp:179
virtual void Execute()=0
Virtual execution method. Triggers actual computation in sub-classes.
int value
Definition: gen_data.py:41
auto apply_tuple(Functor &&f, Tuple &&t)
Call the functor f with the contents of t as arguments.
Definition: apply_tuple.hpp:40
std::vector< data::File > files_
Files for intermediate storage.
Definition: zip.hpp:204
bool HasNext()
helper for PushData() which checks all inputs
Definition: zip.hpp:363
tag structure for Zip()
Definition: dia.hpp:70
typename common::FunctionTraits< ZipFunction >::args_tuple_plain ZipArgsTuple
Definition: zip.hpp:91
void MainOp()
Receive elements from other workers.
Definition: zip.hpp:293
tag structure for Zip()
Definition: dia.hpp:86
virtual void StartPreOp(size_t)
Virtual method for preparing start of PushData.
Definition: dia_base.hpp:163
static constexpr size_t padding
data::File GetFile(size_t dia_id)
Returns a new File object containing a sequence of local Blocks.
Definition: context.hpp:283
T TLX_ATTRIBUTE_WARN_UNUSED_RESULT ExPrefixSumTotal(T &value, const BinarySumOp &sum_op=BinarySumOp(), const T &initial=T())
Calculates the exclusive prefix sum over all workers, and delivers the total sum as well...
A DOpNode is a typed node representing distributed operations in Thrill.
Definition: dop_node.hpp:32
static uint_pair min()
return an uint_pair instance containing the smallest value possible
Definition: uint_types.hpp:217
void DoScatter()
Scatter items from DIA "Index" to other workers if necessary.
Definition: zip.hpp:259
TLX_ATTRIBUTE_ALWAYS_INLINE BlockWriter & Put(const T &x)
Put appends a complete item, or fails with a FullException.
virtual bool OnPreOpFile(const data::File &, size_t)
Definition: dia_base.hpp:168
ZipNode(const ZipFunction &zip_function, const ZipArgsTuple &padding, const ParentDIA0 &parent0, const ParentDIAs &...parents)
Constructor for a ZipNode.
Definition: zip.hpp:98
const std::vector< DIABasePtr > & parents() const
Returns the parents of this DIABase.
Definition: dia_base.hpp:253
void call_foreach_with_index(Functor &&f, Args &&...args)
void Close()
Explicitly close the writer.
size_t num_workers() const
Global number of workers in the system.
Definition: context.hpp:251
static constexpr bool stats_enabled
Set this variable to true to enable generation and output of stats.
Definition: zip.hpp:82
std::array< size_t, kNumInputs > size_prefixsum_
exclusive prefix sum over the number of items in workers
Definition: zip.hpp:216
data::CatStreamPtr streams_[kNumInputs]
Array of inbound CatStreams.
Definition: zip.hpp:210
void AssertValid() const
Assert that the DIA is valid.
Definition: dia.hpp:178
data::File::Writer writers_[kNumInputs]
Writers to intermediate files.
Definition: zip.hpp:207
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:24
size_t result_size_
shortest size of Zipped inputs
Definition: zip.hpp:219
Context & context_
associated Context
Definition: dia_base.hpp:293
#define LOGC(cond)
Explicitly specify the condition for logging.
Definition: logger.hpp:21
A DIANode which performs a Zip operation.
Definition: zip.hpp:77