Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
zip.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/api/zip.hpp
3  *
4  * DIANode for a zip operation. Performs the actual zip operation
5  *
6  * Part of Project Thrill - http://project-thrill.org
7  *
8  * Copyright (C) 2015-2016 Timo Bingmann <[email protected]>
9  * Copyright (C) 2015 Matthias Stumpp <[email protected]>
10  * Copyright (C) 2015 Sebastian Lamm <[email protected]>
11  *
12  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
13  ******************************************************************************/
14 
15 #pragma once
16 #ifndef THRILL_API_ZIP_HEADER
17 #define THRILL_API_ZIP_HEADER
18 
19 #include <thrill/api/dia.hpp>
20 #include <thrill/api/dop_node.hpp>
22 #include <thrill/common/logger.hpp>
23 #include <thrill/common/string.hpp>
24 #include <thrill/data/file.hpp>
25 #include <tlx/meta/apply_tuple.hpp>
28 #include <tlx/meta/vexpand.hpp>
30 
31 #include <algorithm>
32 #include <array>
33 #include <functional>
34 #include <tuple>
35 #include <vector>
36 
37 namespace thrill {
38 namespace api {
39 
40 /*!
41  * A DIANode which performs a Zip operation. Zip combines several DIAs
42  * element-by-element. The ZipNode stores the zip_function UDF. The chainable
43  * LOps are stored in the Stack.
44  *
45  * <pre>
46  * ParentStack0 ParentStack1
47  * +--------+ +--------+
48  * | | | | A ParentStackX is called with
49  * | | | | ParentInputX, and must deliver
50  * | | | | a ZipArgX item.
51  * +-+--------+---+--------+-+
52  * | | PreOp0 | | PreOp1 | |
53  * | +--------+ +--------+ |
54  * DIA<T> --> | Zip |
55  * | +-------+ |
56  * | |PostOp | |
57  * +--------+-------+--------+
58  * | | New DIA<T>::stack_ is started
59  * | | with PostOp to chain next nodes.
60  * +-------+
61  * </pre>
62  *
63  * \tparam ValueType Output type of the Zip operation.
64  *
65  * \tparam ZipFunction Type of the ZipFunction, which takes one argument per
66  * input DIA.
67  *
68  * \tparam Pad Pad shorter inputs instead of cutting to the shortest input.
69  * \tparam UnequalCheck Die if the total input sizes differ (only if !Pad).
70  * \tparam NoRebalance Zip local partitions as-is, without communication.
71  * \tparam kNumInputs Number of input DIAs.
72  *
73  * \ingroup api_layer
74  */
75 template <typename ValueType, typename ZipFunction,
76  bool Pad, bool UnequalCheck, bool NoRebalance, size_t kNumInputs>
77 class ZipNode final : public DOpNode<ValueType>
78 {
79  static constexpr bool debug = false;
80 
81  //! Set this variable to true to enable generation and output of stats
82  static constexpr bool stats_enabled = false;
83 
85  using Super::context_;
86 
87  template <size_t Index>
88  using ZipArgN =
89  typename common::FunctionTraits<ZipFunction>::template arg_plain<Index>;
90  using ZipArgsTuple =
91  typename common::FunctionTraits<ZipFunction>::args_tuple_plain;
92 
93 public:
94  /*!
95  * Constructor for a ZipNode.
96  */
97  template <typename ParentDIA0, typename... ParentDIAs>
98  ZipNode(const ZipFunction& zip_function, const ZipArgsTuple& padding,
99  const ParentDIA0& parent0, const ParentDIAs& ... parents)
100  : Super(parent0.ctx(), "Zip",
101  { parent0.id(), parents.id() ... },
102  { parent0.node(), parents.node() ... }),
103  zip_function_(zip_function),
104  padding_(padding),
105  // this weirdness is due to a MSVC2015 parser bug
107  std::array<bool, kNumInputs>{
108  { ParentDIA0::stack_empty, (ParentDIAs::stack_empty)... }
109  })
110  {
111  // allocate files.
112  files_.reserve(kNumInputs);
113  for (size_t i = 0; i < kNumInputs; ++i)
114  files_.emplace_back(context_.GetFile(this));
115 
116  // Hook PreOp(s)
118  RegisterParent(this), parent0, parents...);
119  }
120 
121  void StartPreOp(size_t parent_index) final {
122  writers_[parent_index] = files_[parent_index].GetWriter();
123  }
124 
125  //! Receive a whole data::File of ValueType, but only if our stack is empty.
126  bool OnPreOpFile(const data::File& file, size_t parent_index) final {
127  assert(parent_index < kNumInputs);
128  if (!parent_stack_empty_[parent_index]) {
130  << "Zip rejected File from parent "
131  << "due to non-empty function stack.";
132  return false;
133  }
134 
135  // accept file
136  assert(files_[parent_index].num_items() == 0);
137  files_[parent_index] = file.Copy();
138  return true;
139  }
140 
141  void StopPreOp(size_t parent_index) final {
142  LOG << *this << " StopPreOp() parent_index=" << parent_index;
143  writers_[parent_index].Close();
144  }
145 
146  void Execute() final {
147  MainOp();
148  }
149 
150  void PushData(bool consume) final {
151  size_t result_count = 0;
152 
153  if (result_size_ != 0) {
154  if (NoRebalance) {
155  // get inbound readers from all Streams
156  std::array<data::File::Reader, kNumInputs> readers;
157  for (size_t i = 0; i < kNumInputs; ++i)
158  readers[i] = files_[i].GetReader(consume);
159 
160  ReaderNext<data::File::Reader> reader_next(*this, readers);
161 
162  while (reader_next.HasNext()) {
163  auto v = tlx::vmap_for_range<kNumInputs>(reader_next);
164  this->PushItem(tlx::apply_tuple(zip_function_, v));
165  ++result_count;
166  }
167  }
168  else {
169  // get inbound readers from all Streams
170  std::array<data::CatStream::CatReader, kNumInputs> readers;
171  for (size_t i = 0; i < kNumInputs; ++i)
172  readers[i] = streams_[i]->GetCatReader(consume);
173 
174  ReaderNext<data::CatStream::CatReader> reader_next(*this, readers);
175 
176  while (reader_next.HasNext()) {
177  auto v = tlx::vmap_for_range<kNumInputs>(reader_next);
178  this->PushItem(tlx::apply_tuple(zip_function_, v));
179  ++result_count;
180  }
181  }
182  }
183 
184  if (stats_enabled) {
186  "Zip() result_count", result_count);
187  }
188  }
189 
190  void Dispose() final {
191  files_.clear();
192  }
193 
194 private:
195  //! Zip function
196  ZipFunction zip_function_;
197 
198  //! padding for shorter DIAs
199  const ZipArgsTuple padding_;
200 
201  //! Whether the parent stack is empty
202  const std::array<bool, kNumInputs> parent_stack_empty_;
203 
204  //! Files for intermediate storage
205  std::vector<data::File> files_;
206 
207  //! Writers to intermediate files
209 
210  //! Array of inbound CatStreams
212 
213  //! \name Variables for Calculating Exchange
214  //! \{
215 
216  //! exclusive prefix sum over the number of items in workers
217  std::array<size_t, kNumInputs> size_prefixsum_;
218 
219  //! shortest size of Zipped inputs
220  size_t result_size_;
221 
222  //! \}
223 
224  //! Register Parent PreOp Hooks, instantiated and called for each Zip parent
226  {
227  public:
228  explicit RegisterParent(ZipNode* node) : node_(node) { }
229 
230  template <typename Index, typename Parent>
231  void operator () (const Index&, Parent& parent) {
232 
233  // get the ZipFunction's argument for this index
234  using ZipArg = ZipArgN<Index::index>;
235 
236  // check that the parent's type is convertible to the ZipFunction
237  // argument.
238  static_assert(
240  "ZipFunction argument does not match input DIA");
241 
242  // construct lambda with only the writer in the closure
243  data::File::Writer* writer = &node_->writers_[Index::index];
244  auto pre_op_fn = [writer](const ZipArg& input) -> void {
245  writer->Put(input);
246  };
247 
248  // close the function stacks with our pre ops and register it at
249  // parent nodes for output
250  auto lop_chain = parent.stack().push(pre_op_fn).fold();
251  parent.node()->AddChild(node_, lop_chain, Index::index);
252  }
253 
254  private:
256  };
257 
258  //! Scatter items from DIA "Index" to other workers if necessary.
259  template <size_t Index>
260  void DoScatter() {
261  const size_t workers = context_.num_workers();
262 
263  // range of items on local node
264  size_t local_begin = std::min(result_size_, size_prefixsum_[Index]);
265  size_t local_end = std::min(
266  result_size_, size_prefixsum_[Index] + files_[Index].num_items());
267 
268  // number of elements per worker (double)
269  double per_pe =
270  static_cast<double>(result_size_) / static_cast<double>(workers);
271  // offsets for scattering
272  std::vector<size_t> offsets(workers + 1, 0);
273 
274  for (size_t i = 0; i <= workers; ++i) {
275  // calculate range we have to send to each PE
276  size_t cut = static_cast<size_t>(std::ceil(i * per_pe));
277  offsets[i] =
278  cut < local_begin ? 0 : std::min(cut, local_end) - local_begin;
279  }
280 
281  LOG << "per_pe=" << per_pe
282  << " offsets[" << Index << "] = " << offsets;
283 
284  // target stream id
285  streams_[Index] = context_.GetNewCatStream(this);
286 
287  // scatter elements to other workers, if necessary
288  using ZipArg = ZipArgN<Index>;
289  streams_[Index]->template Scatter<ZipArg>(
290  files_[Index], offsets, /* consume */ true);
291  }
292 
293  //! Receive elements from other workers.
294  void MainOp() {
295  if (NoRebalance) {
296  // no communication: everyone just checks that all input DIAs have
297  // the same local size.
298  result_size_ = files_[0].num_items();
299  for (size_t i = 1; i < kNumInputs; ++i) {
300  if (result_size_ != files_[i].num_items()) {
301  die("Zip() input DIA " << i << " partition does not match.");
302  }
303  }
304  return;
305  }
306 
307  // first: calculate total size of the DIAs to Zip
308 
309  using ArraySizeT = std::array<size_t, kNumInputs>;
310 
311  // number of elements of this worker
312  ArraySizeT local_size;
313  for (size_t i = 0; i < kNumInputs; ++i) {
314  local_size[i] = files_[i].num_items();
315  sLOG << "input" << i << "local_size" << local_size[i];
316 
317  if (stats_enabled) {
319  "Zip() local_size", local_size[i]);
320  }
321  }
322 
323  // exclusive prefixsum of number of elements: we have items from
324  // [size_prefixsum, size_prefixsum + local_size). And get the total
325  // number of items in each DIAs, over all worker.
326  size_prefixsum_ = local_size;
327  ArraySizeT total_size = context_.net.ExPrefixSumTotal(
328  size_prefixsum_, common::ComponentSum<ArraySizeT>());
329 
330  size_t max_total_size =
331  *std::max_element(total_size.begin(), total_size.end());
332 
333  // return only the minimum size of all DIAs.
334  result_size_ =
335  Pad ? max_total_size
336  : *std::min_element(total_size.begin(), total_size.end());
337 
338  // warn if DIAs have unequal size
339  if (!Pad && UnequalCheck && result_size_ != max_total_size) {
340  die("Zip(): input DIAs have unequal size: "
341  << common::VecToStr(total_size));
342  }
343 
344  if (result_size_ == 0) return;
345 
346  // perform scatters to exchange data, with different types.
347  tlx::call_for_range<kNumInputs>(
348  [=](auto index) {
349  (void)index;
350  this->DoScatter<decltype(index)::index>();
351  });
352  }
353 
354  //! Access CatReaders for different different parents.
355  template <typename Reader>
357  {
358  public:
359  ReaderNext(ZipNode& zip_node,
360  std::array<Reader, kNumInputs>& readers)
361  : zip_node_(zip_node), readers_(readers) { }
362 
363  //! helper for PushData() which checks all inputs
364  bool HasNext() {
365  if (Pad) {
366  for (size_t i = 0; i < kNumInputs; ++i) {
367  if (readers_[i].HasNext()) return true;
368  }
369  return false;
370  }
371  else {
372  for (size_t i = 0; i < kNumInputs; ++i) {
373  if (!readers_[i].HasNext()) return false;
374  }
375  return true;
376  }
377  }
378 
379  template <typename Index>
380  auto operator () (const Index&) {
381 
382  // get the ZipFunction's argument for this index
383  using ZipArg = ZipArgN<Index::index>;
384 
385  if (Pad && !readers_[Index::index].HasNext()) {
386  // take padding_ if next is not available.
387  return std::get<Index::index>(zip_node_.padding_);
388  }
389  return readers_[Index::index].template Next<ZipArg>();
390  }
391 
392  private:
394 
395  //! reference to the reader array in PushData().
396  std::array<Reader, kNumInputs>& readers_;
397  };
398 };
399 
400 /*!
401  * Zips two DIAs of equal size in style of functional programming by applying
402  * zip_function to the i-th elements of both input DIAs to form the i-th element
403  * of the output DIA. The type of the output DIA can be inferred from the
404  * zip_function.
405  *
406  * The two input DIAs are required to be of equal size, otherwise use the
407  * CutTag variant.
408  *
409  * \tparam ZipFunction Type of the zip_function. This is a function with two
410  * input elements, both of the local type, and one output element, which is
411  * the type of the Zip node.
412  *
413  * \param zip_function Zip function, which zips two elements together
414  *
415  * \param first_dia the initial DIA.
416  *
417  * \param dias DIAs, which is zipped together with the original DIA.
418  *
419  * \ingroup dia_dops
420  */
421 template <typename ZipFunction, typename FirstDIAType, typename FirstDIAStack,
422  typename... DIAs>
423 auto Zip(const ZipFunction& zip_function,
424  const DIA<FirstDIAType, FirstDIAStack>& first_dia,
425  const DIAs& ... dias) {
426 
427  tlx::vexpand((first_dia.AssertValid(), 0), (dias.AssertValid(), 0) ...);
428 
429  static_assert(
430  std::is_convertible<
431  FirstDIAType,
432  typename common::FunctionTraits<ZipFunction>::template arg<0>
433  >::value,
434  "ZipFunction has the wrong input type in DIA 0");
435 
436  using ZipResult
437  = typename common::FunctionTraits<ZipFunction>::result_type;
438 
439  using ZipArgsTuple =
440  typename common::FunctionTraits<ZipFunction>::args_tuple_plain;
441 
442  using ZipNode = api::ZipNode<
443  ZipResult, ZipFunction,
444  /* Pad */ false, /* UnequalCheck */ true, /* NoRebalance */ false,
445  1 + sizeof ... (DIAs)>;
446 
447  auto node = tlx::make_counting<ZipNode>(
448  zip_function, ZipArgsTuple(), first_dia, dias...);
449 
450  return DIA<ZipResult>(node);
451 }
452 
453 /*!
454  * Zips any number of DIAs of equal size in style of functional programming by
455  * applying zip_function to the i-th elements of both input DIAs to form the
456  * i-th element of the output DIA. The type of the output DIA can be inferred
457  * from the zip_function.
458  *
459  * If the two input DIAs are of unequal size, the result is the shorter of
460  * both. Otherwise use the PadTag variant.
461  *
462  * \tparam ZipFunction Type of the zip_function. This is a function with two
463  * input elements, both of the local type, and one output element, which is the
464  * type of the Zip node.
465  *
466  * \param zip_function Zip function, which zips two elements together
467  *
468  * \param first_dia the initial DIA.
469  *
470  * \param dias DIAs, which is zipped together with the original DIA.
471  *
472  * \ingroup dia_dops
473  */
474 template <typename ZipFunction, typename FirstDIAType, typename FirstDIAStack,
475  typename... DIAs>
476 auto Zip(struct CutTag,
477  const ZipFunction& zip_function,
478  const DIA<FirstDIAType, FirstDIAStack>& first_dia,
479  const DIAs& ... dias) {
480 
481  tlx::vexpand((first_dia.AssertValid(), 0), (dias.AssertValid(), 0) ...);
482 
483  static_assert(
484  std::is_convertible<
485  FirstDIAType,
486  typename common::FunctionTraits<ZipFunction>::template arg<0>
487  >::value,
488  "ZipFunction has the wrong input type in DIA 0");
489 
490  using ZipResult
491  = typename common::FunctionTraits<ZipFunction>::result_type;
492 
493  using ZipArgsTuple =
494  typename common::FunctionTraits<ZipFunction>::args_tuple_plain;
495 
496  using ZipNode = api::ZipNode<
497  ZipResult, ZipFunction,
498  /* Pad */ false, /* UnequalCheck */ false, /* NoRebalance */ false,
499  1 + sizeof ... (DIAs)>;
500 
501  auto node = tlx::make_counting<ZipNode>(
502  zip_function, ZipArgsTuple(), first_dia, dias...);
503 
504  return DIA<ZipResult>(node);
505 }
506 
507 /*!
508  * Zips any number of DIAs in style of functional programming by applying
509  * zip_function to the i-th elements of both input DIAs to form the i-th element
510  * of the output DIA. The type of the output DIA can be inferred from the
511  * zip_function.
512  *
513  * The output DIA's length is the *maximum* of all input DIAs, shorter DIAs are
514  * padded with default-constructed items.
515  *
516  * \tparam ZipFunction Type of the zip_function. This is a function with two
517  * input elements, both of the local type, and one output element, which is
518  * the type of the Zip node.
519  *
520  * \param zip_function Zip function, which zips two elements together
521  *
522  * \param first_dia the initial DIA.
523  *
524  * \param dias DIAs, which is zipped together with the first DIA.
525  *
526  * \ingroup dia_dops
527  */
528 template <typename ZipFunction, typename FirstDIAType, typename FirstDIAStack,
529  typename... DIAs>
530 auto Zip(struct PadTag,
531  const ZipFunction& zip_function,
532  const DIA<FirstDIAType, FirstDIAStack>& first_dia,
533  const DIAs& ... dias) {
534 
535  tlx::vexpand((first_dia.AssertValid(), 0), (dias.AssertValid(), 0) ...);
536 
537  static_assert(
538  std::is_convertible<
539  FirstDIAType,
540  typename common::FunctionTraits<ZipFunction>::template arg<0>
541  >::value,
542  "ZipFunction has the wrong input type in DIA 0");
543 
544  using ZipResult =
545  typename common::FunctionTraits<ZipFunction>::result_type;
546 
547  using ZipArgsTuple =
548  typename common::FunctionTraits<ZipFunction>::args_tuple_plain;
549 
550  using ZipNode = api::ZipNode<
551  ZipResult, ZipFunction,
552  /* Pad */ true, /* UnequalCheck */ false, /* NoRebalance */ false,
553  1 + sizeof ... (DIAs)>;
554 
555  auto node = tlx::make_counting<ZipNode>(
556  zip_function, ZipArgsTuple(), first_dia, dias...);
557 
558  return DIA<ZipResult>(node);
559 }
560 
561 /*!
562  * Zips any number of DIAs in style of functional programming by applying
563  * zip_function to the i-th elements of both input DIAs to form the i-th element
564  * of the output DIA. The type of the output DIA can be inferred from the
565  * zip_function.
566  *
567  * The output DIA's length is the *maximum* of all input DIAs, shorter DIAs are
568  * padded with items given by the padding parameter.
569  *
570  * \tparam ZipFunction Type of the zip_function. This is a function with two
571  * input elements, both of the local type, and one output element, which is
572  * the type of the Zip node.
573  *
574  * \param zip_function Zip function, which zips two elements together
575  *
576  * \param padding std::tuple<args> of padding sentinels delivered to ZipFunction
577  * if an input dia is too short.
578  *
579  * \param first_dia the initial DIA.
580  *
581  * \param dias DIAs, which is zipped together with the original DIA.
582  *
583  * \ingroup dia_dops
584  */
585 template <typename ZipFunction, typename FirstDIAType, typename FirstDIAStack,
586  typename... DIAs>
587 auto Zip(
588  struct PadTag,
589  const ZipFunction& zip_function,
590  const typename common::FunctionTraits<ZipFunction>::args_tuple_plain& padding,
591  const DIA<FirstDIAType, FirstDIAStack>& first_dia,
592  const DIAs& ... dias) {
593 
594  tlx::vexpand((first_dia.AssertValid(), 0), (dias.AssertValid(), 0) ...);
595 
596  static_assert(
597  std::is_convertible<
598  FirstDIAType,
599  typename common::FunctionTraits<ZipFunction>::template arg<0>
600  >::value,
601  "ZipFunction has the wrong input type in DIA 0");
602 
603  using ZipResult =
604  typename common::FunctionTraits<ZipFunction>::result_type;
605 
606  using ZipNode = api::ZipNode<
607  ZipResult, ZipFunction,
608  /* Pad */ true, /* UnequalCheck */ false, /* NoRebalance */ false,
609  1 + sizeof ... (DIAs)>;
610 
611  auto node = tlx::make_counting<ZipNode>(
612  zip_function, padding, first_dia, dias...);
613 
614  return DIA<ZipResult>(node);
615 }
616 
617 /*!
618  * Zips any number of DIAs in style of functional programming by applying
619  * zip_function to the i-th elements of both input DIAs to form the i-th element
620  * of the output DIA. The type of the output DIA can be inferred from the
621  * zip_function.
622  *
623  * In this variant, the DIA partitions on all PEs must have matching length. No
624  * rebalancing is performed, and the program will die if any partition
625  * mismatches. This enables Zip to proceed without any communication.
626  *
627  * \tparam ZipFunction Type of the zip_function. This is a function with two
628  * input elements, both of the local type, and one output element, which is
629  * the type of the Zip node.
630  *
631  * \param zip_function Zip function, which zips two elements together
632  *
633  * \param first_dia the initial DIA.
634  *
635  * \param dias DIAs, which is zipped together with the original DIA.
636  *
637  * \ingroup dia_dops
638  */
639 template <typename ZipFunction, typename FirstDIAType, typename FirstDIAStack,
640  typename... DIAs>
641 auto Zip(
642  struct NoRebalanceTag,
643  const ZipFunction& zip_function,
644  const DIA<FirstDIAType, FirstDIAStack>& first_dia,
645  const DIAs& ... dias) {
646 
647  tlx::vexpand((first_dia.AssertValid(), 0), (dias.AssertValid(), 0) ...);
648 
649  static_assert(
650  std::is_convertible<
651  FirstDIAType,
652  typename common::FunctionTraits<ZipFunction>::template arg<0>
653  >::value,
654  "ZipFunction has the wrong input type in DIA 0");
655 
656  using ZipResult =
657  typename common::FunctionTraits<ZipFunction>::result_type;
658 
659  using ZipArgsTuple =
660  typename common::FunctionTraits<ZipFunction>::args_tuple_plain;
661 
662  using ZipNode = api::ZipNode<
663  ZipResult, ZipFunction,
664  /* Pad */ false, /* UnequalCheck */ false, /* NoRebalance */ true,
665  1 + sizeof ... (DIAs)>;
666 
667  auto node = tlx::make_counting<ZipNode>(
668  zip_function, ZipArgsTuple(), first_dia, dias...);
669 
670  return DIA<ZipResult>(node);
671 }
672 
673 template <typename ValueType, typename Stack>
674 template <typename ZipFunction, typename SecondDIA>
676  const SecondDIA& second_dia, const ZipFunction& zip_function) const {
677  return api::Zip(zip_function, *this, second_dia);
678 }
679 
680 template <typename ValueType, typename Stack>
681 template <typename ZipFunction, typename SecondDIA>
683  struct CutTag const&, const SecondDIA& second_dia,
684  const ZipFunction& zip_function) const {
685  return api::Zip(CutTag, zip_function, *this, second_dia);
686 }
687 
688 template <typename ValueType, typename Stack>
689 template <typename ZipFunction, typename SecondDIA>
691  struct PadTag const&, const SecondDIA& second_dia,
692  const ZipFunction& zip_function) const {
693  return api::Zip(PadTag, zip_function, *this, second_dia);
694 }
695 
696 template <typename ValueType, typename Stack>
697 template <typename ZipFunction, typename SecondDIA>
699  struct NoRebalanceTag const&, const SecondDIA& second_dia,
700  const ZipFunction& zip_function) const {
701  return api::Zip(NoRebalanceTag, zip_function, *this, second_dia);
702 }
703 
704 //! \}
705 
706 } // namespace api
707 
708 //! imported from api namespace
709 using api::Zip;
710 
711 } // namespace thrill
712 
713 #endif // !THRILL_API_ZIP_HEADER
714 
715 /******************************************************************************/
const ZipArgsTuple padding_
padding for shorter DIAs
Definition: zip.hpp:102
net::FlowControlChannel & net
Definition: context.hpp:443
virtual void Dispose()
Virtual clear method. Triggers actual disposing in sub-classes.
Definition: dia_base.hpp:188
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:34
DIA is the interface between the user and the Thrill framework.
Definition: dia.hpp:141
std::array< Reader, kNumInputs > & readers_
reference to the reader array in PushData().
Definition: zip.hpp:396
static constexpr bool debug
Definition: zip.hpp:79
Register Parent PreOp Hooks, instantiated and called for each Zip parent.
Definition: zip.hpp:225
void PrintCollectiveMeanStdev(const char *text, const Type &local)
Definition: context.hpp:349
const std::array< bool, kNumInputs > parent_stack_empty_
Whether the parent stack is empty.
Definition: zip.hpp:202
Access CatReaders for the different parents.
Definition: zip.hpp:356
void vexpand(Types &&...)
Definition: vexpand.hpp:24
auto Zip(const SecondDIA &second_dia, const ZipFunction &zip_function) const
Zips two DIAs of equal size in style of functional programming by applying zip_function to the i-th e...
Definition: zip.hpp:675
BlockWriter contains a temporary Block object into which a) any serializable item can be stored or b)...
static constexpr bool g_debug_push_file
Definition: config.hpp:44
auto operator()(const Index &)
Definition: zip.hpp:380
#define die(msg)
Instead of calling std::terminate(), throw the output message via an exception.
Definition: die.hpp:22
virtual void PushData(bool consume)=0
Virtual method for pushing data. Triggers actual pushing in sub-classes.
auto Zip(const ZipFunction &zip_function, const DIA< FirstDIAType, FirstDIAStack > &first_dia, const DIAs &...dias)
Zips two DIAs of equal size in style of functional programming by applying zip_function to the i-th e...
Definition: zip.hpp:423
template for computing the component-wise sum of std::array or std::vector.
Definition: functional.hpp:114
tag structure for Zip()
Definition: dia.hpp:78
typename common::FunctionTraits< ZipFunction >::template arg_plain< Index > ZipArgN
Definition: zip.hpp:89
data::CatStreamPtr GetNewCatStream(size_t dia_id)
Definition: context.cpp:1120
void PushItem(const ValueType &item) const
Method for derived classes to Push a single item to all children.
Definition: dia_node.hpp:147
ReaderNext(ZipNode &zip_node, std::array< Reader, kNumInputs > &readers)
Definition: zip.hpp:359
void operator()(const Index &, Parent &parent)
Definition: zip.hpp:231
virtual void StopPreOp(size_t)
Virtual method for preparing end of PushData.
Definition: dia_base.hpp:173
static std::string VecToStr(const std::array< T, N > &data)
Logging helper to print arrays as [a1,a2,a3,...].
Definition: string.hpp:179
virtual void Execute()=0
Virtual execution method. Triggers actual computation in sub-classes.
int value
Definition: gen_data.py:41
auto apply_tuple(Functor &&f, Tuple &&t)
Call the functor f with the contents of t as arguments.
Definition: apply_tuple.hpp:40
std::vector< data::File > files_
Files for intermediate storage.
Definition: zip.hpp:205
bool HasNext()
helper for PushData() which checks all inputs
Definition: zip.hpp:364
tag structure for Zip()
Definition: dia.hpp:70
typename common::FunctionTraits< ZipFunction >::args_tuple_plain ZipArgsTuple
Definition: zip.hpp:91
void MainOp()
Receive elements from other workers.
Definition: zip.hpp:294
tag structure for Zip()
Definition: dia.hpp:86
virtual void StartPreOp(size_t)
Virtual method for preparing start of PushData.
Definition: dia_base.hpp:163
static constexpr size_t padding
data::File GetFile(size_t dia_id)
Returns a new File object containing a sequence of local Blocks.
Definition: context.hpp:280
T TLX_ATTRIBUTE_WARN_UNUSED_RESULT ExPrefixSumTotal(T &value, const BinarySumOp &sum_op=BinarySumOp(), const T &initial=T())
Calculates the exclusive prefix sum over all workers, and delivers the total sum as well...
A DOpNode is a typed node representing a distributed operation in Thrill.
Definition: dop_node.hpp:32
static uint_pair min()
return a uint_pair instance containing the smallest possible value
Definition: uint_types.hpp:217
void DoScatter()
Scatter items from DIA "Index" to other workers if necessary.
Definition: zip.hpp:260
TLX_ATTRIBUTE_ALWAYS_INLINE BlockWriter & Put(const T &x)
Put appends a complete item, or fails with a FullException.
virtual bool OnPreOpFile(const data::File &, size_t)
Definition: dia_base.hpp:168
ZipNode(const ZipFunction &zip_function, const ZipArgsTuple &padding, const ParentDIA0 &parent0, const ParentDIAs &...parents)
Constructor for a ZipNode.
Definition: zip.hpp:98
const std::vector< DIABasePtr > & parents() const
Returns the parents of this DIABase.
Definition: dia_base.hpp:253
void call_foreach_with_index(Functor &&f, Args &&...args)
void Close()
Explicitly close the writer.
size_t num_workers() const
Global number of workers in the system.
Definition: context.hpp:248
static constexpr bool stats_enabled
Set this variable to true to enable generation and output of stats.
Definition: zip.hpp:82
std::array< size_t, kNumInputs > size_prefixsum_
exclusive prefix sum over the number of items in workers
Definition: zip.hpp:217
data::CatStreamPtr streams_[kNumInputs]
Array of inbound CatStreams.
Definition: zip.hpp:211
void AssertValid() const
Assert that the DIA is valid.
Definition: dia.hpp:178
data::File::Writer writers_[kNumInputs]
Writers to intermediate files.
Definition: zip.hpp:208
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:24
size_t result_size_
shortest size of Zipped inputs
Definition: zip.hpp:220
Context & context_
associated Context
Definition: dia_base.hpp:293
#define LOGC(cond)
Explicitly specify the condition for logging.
Definition: logger.hpp:21
A DIANode which performs a Zip operation.
Definition: zip.hpp:77