Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
dia.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/api/dia.hpp
3  *
4  * Interface for Operations, holds pointer to node and lambda from node to state
5  *
6  * Part of Project Thrill - http://project-thrill.org
7  *
8  * Copyright (C) 2015 Alexander Noe <[email protected]>
9  * Copyright (C) 2015 Sebastian Lamm <[email protected]>
10  * Copyright (C) 2015 Timo Bingmann <[email protected]>
11  * Copyright (C) 2015 Huyen Chau Nguyen <[email protected]>
12  *
13  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
14  ******************************************************************************/
15 
16 #pragma once
17 #ifndef THRILL_API_DIA_HEADER
18 #define THRILL_API_DIA_HEADER
19 
21 #include <thrill/api/context.hpp>
22 #include <thrill/api/dia_node.hpp>
26 
27 #include <cassert>
28 #include <functional>
29 #include <ostream>
30 #include <string>
31 #include <utility>
32 #include <vector>
33 
34 namespace thrill {
35 namespace api {
36 
37 //! \ingroup api_layer
38 //! \{
39 
40 //! tag structure for ReduceByKey(), and ReduceToIndex()
41 template <bool Value>
44  static const bool value = Value;
45 };
46 
47 //! global const VolatileKeyFlag instance
48 const struct VolatileKeyFlag<true> VolatileKeyTag;
49 
50 //! global const VolatileKeyFlag instance
51 const struct VolatileKeyFlag<false> NoVolatileKeyTag;
52 
53 //! tag structure for ReduceToIndex()
56 };
57 
58 //! global const SkipPreReducePhaseTag instance
60 
61 //! tag structure for Window() and FlatWindow()
62 struct DisjointTag {
64 };
65 
66 //! global const DisjointTag instance
67 const struct DisjointTag DisjointTag;
68 
//! tag structure for Zip()
struct CutTag {
    //! user-provided default constructor; presumably present so that the
    //! const global instance below can be default-initialized.
    CutTag() { }
};

//! global const CutTag instance, the tag object passed to Zip().
const struct CutTag CutTag;
76 
//! tag structure for Zip()
struct PadTag {
    //! user-provided default constructor; presumably present so that the
    //! const global instance below can be default-initialized.
    PadTag() { }
};

//! global const PadTag instance, the tag object passed to Zip().
const struct PadTag PadTag;
84 
85 //! tag structure for Zip()
88 };
89 
90 //! global const NoRebalanceTag instance
92 
93 //! tag structure for Read()
96 };
97 
98 //! global const LocalStorageTag instance
100 
101 //! tag structure for ReduceByKey()
102 template <bool Value>
105  static const bool value = Value;
106 };
107 
108 //! global const DuplicateDetectionFlag instance
110 
111 //! global const DuplicateDetectionFlag instance
113 
114 //! tag structure for GroupByKey(), and InnerJoin()
115 template <bool Value>
118  static const bool value = Value;
119 };
120 
121 //! global const LocationDetectionFlag instance
123 
124 //! global const LocationDetectionFlag instance
126 
127 /*!
128  * DIA is the interface between the user and the Thrill framework. A DIA can be
129  * imagined as an immutable array, even though the data does not need to be materialized.
130  * A DIA links to a DIANode, which represents the state after the previous DOp or Action. Additionally, a
131  * DIA stores the local lambda function chain of type Stack, which can transform
132  * elements of the DIANode to elements of this DIA. DOps/Actions create a DIA
133  * and a new DIANode, to which the DIA links to. LOps only create a new DIA,
134  * which link to the previous DIANode.
135  *
136  * \tparam ValueType Type of elements currently in this DIA.
137  * \tparam Stack Type of the function chain.
138  */
139 template <typename ValueType_,
140  typename Stack_ = tlx::FunctionStack<ValueType_> >
141 class DIA
142 {
143  friend class Context;
144 
145  //! alias for convenience.
146  template <typename Function>
147  using FunctionTraits = common::FunctionTraits<Function>;
148 
149 public:
150  //! type of the items virtually in the DIA, which is the type emitted by the
151  //! current LOp stack.
152  using ValueType = ValueType_;
153 
154  //! Type of this function stack
155  using Stack = Stack_;
156 
157  //! type of the items delivered by the DOp, and pushed down the function
158  //! stack towards the next nodes. If the function stack contains LOps nodes,
159  //! these may transform the type.
160  using StackInput = typename Stack::Input;
161 
162  //! boolean indication whether this FunctionStack is empty
163  static constexpr bool stack_empty = Stack::empty;
164 
165  //! type of pointer to the real node object implementation. This object has
166  //! base item type StackInput which is transformed by the function stack
167  //! lambdas further. But even pushing more lambdas does not change the stack
168  //! input type.
170 
171  //! default-constructor: invalid DIA
172  DIA() = default;
173 
174  //! Return whether the DIA is valid.
175  bool IsValid() const { return node_.get() != nullptr; }
176 
177  //! Assert that the DIA is valid.
178  void AssertValid() const { assert(IsValid()); }
179 
180  /*!
181  * Constructor of a new DIA with a pointer to a DIANode and a
182  * function chain from the DIANode to this DIA.
183  *
184  * \param node Pointer to the last DIANode, DOps and Actions create a new
185  * DIANode, LOps link to the DIANode of the previous DIA.
186  *
187  * \param stack Function stack consisting of functions between last DIANode
188  * and this DIA.
189  *
190  * \param id Serial id of DIA, which includes LOps
191  *
192  * \param label static string label of DIA.
193  */
194  DIA(const DIANodePtr& node, const Stack& stack, size_t id, const char* label)
195  : node_(node), stack_(stack), id_(id), label_(label) { }
196 
197  /*!
198  * Constructor of a new DIA supporting move semantics of nodes.
199  *
200  * \param node Pointer to the last DIANode, DOps and Actions create a new
201  * DIANode, LOps link to the DIANode of the previous DIA.
202  *
203  * \param stack Function stack consisting of functions between last DIANode
204  * and this DIA.
205  *
206  * \param id Serial id of DIA, which includes LOps
207  *
208  * \param label static string label of DIA.
209  */
210  DIA(DIANodePtr&& node, const Stack& stack, size_t id, const char* label)
211  : node_(std::move(node)), stack_(stack), id_(id), label_(label) { }
212 
213  /*!
214  * Constructor of a new DIA with a real backing DIABase.
215  *
216  * \param node Pointer to the last DIANode, DOps and Actions create a new
217  * DIANode, LOps link to the DIANode of the previous DIA.
218  */
219  explicit DIA(DIANodePtr&& node)
220  : DIA(std::move(node), tlx::FunctionStack<ValueType>(),
221  node->id(), node->label()) { }
222 
223  /*!
224  * Copy-Constructor of a DIA with empty function chain from a DIA with
225  * a non-empty chain. The functionality of the chain is stored in a newly
226  * created LOpNode. The current DIA than points to this LOpNode. This
227  * is needed to support assignment operations between DIA's.
228  */
229 #ifdef THRILL_DOXYGEN_IGNORE
230  template <typename AnyStack>
231  DIA(const DIA<ValueType, AnyStack>& rhs);
232 #else
233  template <typename AnyStack>
234  DIA(const DIA<ValueType, AnyStack>& rhs)
235 #if __GNUC__ && !__clang__
236  // the attribute warning does not work with gcc?
237  __attribute__ ((warning( // NOLINT
238  "Casting to DIA creates LOpNode instead of inline chaining.\n"
239  "Consider whether you can use auto instead of DIA.")));
240 #elif __GNUC__ && __clang__
241  __attribute__ ((deprecated)); // NOLINT
242 #else
243  ; // NOLINT
244 #endif
245 #endif // THRILL_DOXYGEN_IGNORE
246 
247  //! \name Const Accessors
248  //! \{
249 
250  //! Returns a pointer to the according DIANode.
251  const DIANodePtr& node() const {
252  assert(IsValid());
253  return node_;
254  }
255 
256  //! Returns the number of references to the according DIANode.
257  size_t node_refcount() const {
258  assert(IsValid());
259  return node_->reference_count();
260  }
261 
262  //! Returns the stored function chain.
263  const Stack& stack() const {
264  assert(IsValid());
265  return stack_;
266  }
267 
268  //! Return context_ of DIANode, e.g. for creating new LOps and DOps
269  Context& context() const {
270  assert(IsValid());
271  return node_->context();
272  }
273 
274  //! Return context_ of DIANode, e.g. for creating new LOps and DOps
275  Context& ctx() const {
276  assert(IsValid());
277  return node_->context();
278  }
279 
280  //! Returns id_
281  size_t id() const { return id_; }
282 
283  //! Returns label_
284  const char * label() const { return label_; }
285 
286  //! \}
287 
288  /*!
289  * Mark the referenced DIANode for keeping, which makes children not consume
290  * the data when executing. This does not create a new DIA, but returns the
291  * existing one.
292  */
293  const DIA& Keep(size_t increase = 1) const {
294  assert(IsValid());
295  if (node_->context().consume() && node_->consume_counter() == 0) {
296  die("Keep() called on "
297  << *node_ << " which was already consumed.");
298  }
299  node_->IncConsumeCounter(increase);
300  return *this;
301  }
302 
303  /*!
304  * Mark the referenced DIANode for keeping forever, which makes children not
305  * consume the data when executing. This does not create a new DIA, but
306  * returns the existing one.
307  */
308  const DIA& KeepForever() const {
309  assert(IsValid());
311  return *this;
312  }
313 
314  /*!
315  * Execute DIA's scope and parents such that this (Action)Node is
316  * Executed. This does not create a new DIA, but returns the existing one.
317  */
318  const DIA& Execute() const {
319  assert(IsValid());
320  node_->RunScope();
321  return *this;
322  }
323 
324  //! \name Local Operations (LOps)
325  //! \{
326 
327  /*!
328  * Map applies `map_function` : \f$ A \to B \f$ to each item of a DIA and
329  * delivers a new DIA contains the returned values, which may be of a
330  * different type.
331  *
332  * The function chain of the returned DIA is this DIA's stack_ chained with
333  * map_fn.
334  *
335  * \param map_function Map function of type MapFunction, which maps each
336  * element to an element of a possibly different type.
337  *
338  * \ingroup dia_lops
339  */
340  template <typename MapFunction>
341  auto Map(const MapFunction& map_function) const {
342  assert(IsValid());
343 
344  using MapArgument
345  = typename FunctionTraits<MapFunction>::template arg_plain<0>;
346  using MapResult
348  auto conv_map_function =
349  [map_function](const MapArgument& input, auto emit_func) {
350  emit_func(map_function(input));
351  };
352 
353  static_assert(
355  "MapFunction has the wrong input type");
356 
357  size_t new_id = context().next_dia_id();
358 
360  << "id" << new_id
361  << "label" << "Map"
362  << "class" << "DIA"
363  << "event" << "create"
364  << "type" << "LOp"
365  << "parents" << (common::Array<size_t>{ id_ });
366 
367  auto new_stack = stack_.push(conv_map_function);
369  node_, new_stack, new_id, "Map");
370  }
371 
372  /*!
373  * Each item of a DIA is tested using `filter_function` : \f$ A \to
374  * \textrm{bool} \f$ to determine whether it is copied into the output DIA
375  * or excluded.
376  *
377  * The function chain of the returned DIA is this DIA's stack_ chained with
378  * filter_function.
379  *
380  * \param filter_function Filter function of type FilterFunction, which maps
381  * each element to a boolean.
382  *
383  * \ingroup dia_lops
384  */
385  template <typename FilterFunction>
386  auto Filter(const FilterFunction& filter_function) const {
387  assert(IsValid());
388 
389  using FilterArgument
390  = typename FunctionTraits<FilterFunction>::template arg_plain<0>;
391  auto conv_filter_function =
392  [filter_function](const FilterArgument& input, auto emit_func) {
393  if (filter_function(input)) emit_func(input);
394  };
395 
396  static_assert(
398  "FilterFunction has the wrong input type");
399 
400  size_t new_id = context().next_dia_id();
401 
403  << "id" << new_id
404  << "label" << "Filter"
405  << "class" << "DIA"
406  << "event" << "create"
407  << "type" << "LOp"
408  << "parents" << (common::Array<size_t>{ id_ });
409 
410  auto new_stack = stack_.push(conv_filter_function);
412  node_, new_stack, new_id, "Filter");
413  }
414 
415  /*!
416  * \brief Each item of a DIA is expanded by the `flatmap_function` : \f$ A
417  * \to \textrm{array}(B) \f$ to zero or more items of different type, which
418  * are concatenated in the resulting DIA. The return type of
419  * `flatmap_function` must be specified as template parameter.
420  *
421  * FlatMap is a LOp, which maps this DIA according to the flatmap_function
422  * given by the user. The flatmap_function maps each element to elements of
423  * a possibly different type. The flatmap_function has an emitter function
424  * as it's second parameter. This emitter is called once for each element to
425  * be emitted. The function chain of the returned DIA is this DIA's stack_
426  * chained with flatmap_function.
427 
428  * \tparam ResultType ResultType of the FlatmapFunction, if different from
429  * item type of DIA.
430  *
431  * \param flatmap_function Map function of type FlatmapFunction, which maps
432  * each element to elements of a possibly different type.
433  *
434  * \ingroup dia_lops
435  */
436  template <typename ResultType = ValueType, typename FlatmapFunction>
437  auto FlatMap(const FlatmapFunction& flatmap_function) const {
438  assert(IsValid());
439 
440  size_t new_id = context().next_dia_id();
441 
443  << "id" << new_id
444  << "label" << "FlatMap"
445  << "class" << "DIA"
446  << "event" << "create"
447  << "type" << "LOp"
448  << "parents" << (common::Array<size_t>{ id_ });
449 
450  auto new_stack = stack_.push(flatmap_function);
452  node_, new_stack, new_id, "FlatMap");
453  }
454 
455  /*!
456  * Each item of a DIA is copied into the output DIA with success probability
457  * p (an independent Bernoulli trial).
458  *
459  * \ingroup dia_lops
460  */
461  auto BernoulliSample(double p) const;
462 
463  /*!
464  * Union is a LOp, which creates the union of all items from any number of
465  * DIAs as a single DIA, where the items are in an arbitrary order. All
466  * input DIAs must contain the same type, which is also the output DIA's
467  * type.
468  *
469  * The Union operation concatenates all _local_ pieces of a DIA, no
470  * rebalancing is performed, and no communication is needed.
471  *
472  * \ingroup dia_lops
473  */
474  template <typename SecondDIA>
475  auto Union(const SecondDIA& second_dia) const;
476 
477  //! \}
478 
479  //! \name Actions
480  //! \{
481 
482  /*!
483  * Computes the total size of all elements across all workers.
484  *
485  * \ingroup dia_actions
486  */
487  size_t Size() const;
488 
489  /*!
490  * Lazily computes the total size of all elements across all workers.
491  *
492  * \ingroup dia_actions
493  */
494  Future<size_t> SizeFuture() const;
495 
496  /*!
497  * Returns the whole DIA in an std::vector on each worker. This is only for
498  * testing purposes and should not be used on large datasets.
499  *
500  * \ingroup dia_actions
501  */
502  std::vector<ValueType> AllGather() const;
503 
504  /**
505  * \brief AllGather is an Action, which returns the whole DIA in an
506  * std::vector on each worker. This is only for testing purposes and should
507  * not be used on large datasets.
508  *
509  * \ingroup dia_actions
510  */
511  void AllGather(std::vector<ValueType>* out_vector) const;
512 
513  /*!
514  * Returns the whole DIA in an std::vector on each worker. This is only for
515  * testing purposes and should not be used on large datasets.
516  *
517  * \ingroup dia_actions
518  */
520 
521  /*!
522  * Print is an Action, which collects all data of the DIA at the worker 0
523  * and prints using ostream serialization. It is implemented using Gather().
524  *
525  * \ingroup dia_actions
526  */
527  void Print(const std::string& name) const;
528 
529  /*!
530  * Print is an Action, which collects all data of the DIA at the worker 0
531  * and prints using ostream serialization. It is implemented using Gather().
532  *
533  * \ingroup dia_actions
534  */
535  void Print(const std::string& name, std::ostream& out) const;
536 
537  /*!
538  * Gather is an Action, which collects all data of the DIA into a vector at
539  * the given worker. This should only be done if the received data can fit
540  * into RAM of the one worker.
541  *
542  * \ingroup dia_actions
543  */
544  std::vector<ValueType> Gather(size_t target_id = 0) const;
545 
546  /*!
547  * Gather is an Action, which collects all data of the DIA into a vector at
548  * the given worker. This should only be done if the received data can fit
549  * into RAM of the one worker.
550  *
551  * \ingroup dia_actions
552  */
553  void Gather(size_t target_id, std::vector<ValueType>* out_vector) const;
554 
555  /*!
556  * Select up to sample_size items uniformly at random and return a new
557  * DIA<T>.
558  */
559  auto Sample(size_t sample_size) const;
560 
561  /*!
562  * AllReduce is an Action, which computes the reduction sum of all elements
563  * globally and delivers the same value on all workers.
564  *
565  * \param reduce_function Reduce function.
566  *
567  * \param initial_value Initial value of the reduction.
568  *
569  * \ingroup dia_actions
570  */
571  template <typename ReduceFunction>
572  ValueType AllReduce(const ReduceFunction& reduce_function,
573  const ValueType& initial_value = ValueType()) const;
574 
575  /*!
576  * AllReduce is an ActionFuture, which computes the reduction sum of
577  * all elements globally and delivers the same value on all workers.
578  *
579  * \param reduce_function Reduce function.
580  *
581  * \param initial_value Initial value of the reduction.
582  *
583  * \ingroup dia_actions
584  */
585  template <typename ReduceFunction>
587  const ReduceFunction& reduce_function,
588  const ValueType& initial_value = ValueType()) const;
589 
590  /*!
591  * Sum is an Action, which computes the sum of all elements globally.
592  *
593  * \param sum_function Sum function.
594  *
595  * \param initial_value Initial value of the sum.
596  *
597  * \ingroup dia_actions
598  */
599  template <typename SumFunction = std::plus<ValueType> >
600  ValueType Sum(const SumFunction& sum_function = SumFunction(),
601  const ValueType& initial_value = ValueType()) const;
602 
603  /*!
604  * Sum is an ActionFuture, which computes the sum of all elements
605  * globally.
606  *
607  * \param sum_function Sum function.
608  *
609  * \param initial_value Initial value of the sum.
610  *
611  * \ingroup dia_actions
612  */
613  template <typename SumFunction = std::plus<ValueType> >
615  const SumFunction& sum_function = SumFunction(),
616  const ValueType& initial_value = ValueType()) const;
617 
618  /*!
619  * Min is an Action, which computes the minimum of all elements globally.
620  *
621  * \param initial_value Initial value of the min.
622  *
623  * \ingroup dia_actions
624  */
625  ValueType Min(const ValueType& initial_value = ValueType()) const;
626 
627  /*!
628  * Min is an ActionFuture, which computes the minimum of all elements
629  * globally.
630  *
631  * \param initial_value Initial value of the min.
632  *
633  * \ingroup dia_actions
634  */
636  const ValueType& initial_value = ValueType()) const;
637 
638  /*!
639  * Max is an Action, which computes the maximum of all elements globally.
640  *
641  * \param initial_value Initial value of the max.
642  *
643  * \ingroup dia_actions
644  */
645  ValueType Max(const ValueType& initial_value = ValueType()) const;
646 
647  /*!
648  * Max is an ActionFuture, which computes the maximum of all elements
649  * globally.
650  *
651  * \param initial_value Initial value of the max.
652  *
653  * \ingroup dia_actions
654  */
656  const ValueType& initial_value = ValueType()) const;
657 
658  /*!
659  * Compute the approximate number of distinct elements in the DIA.
660  *
661  * \tparam p Number of bits to use for index. Should be between 4 and 16.
662  * \ingroup dia_actions
663  */
664  template <size_t p>
665  double HyperLogLog() const;
666 
667  /*!
668  * WriteLinesOne is an Action, which writes std::strings to a single output
669  * file.
670  *
671  * \param filepath Destination of the output file.
672  *
673  * \ingroup dia_actions
674  */
675  void WriteLinesOne(const std::string& filepath) const;
676 
677  /*!
678  * WriteLinesOne is an ActionFuture, which writes std::strings to a single
679  * output file.
680  *
681  * \param filepath Destination of the output file.
682  *
683  * \ingroup dia_actions
684  */
686  const std::string& filepath) const;
687 
688  /*!
689  * WriteLines is an Action, which writes std::strings to multiple output
690  * files. Strings are written using fstream with a newline after each
691  * entry. Each worker creates its individual file.
692  *
693  * \param filepath Destination of the output file. This filepath must
694  * contain two special substrings: "$$$$$" is replaced by the worker id and
695  * "#####" will be replaced by the file chunk id. The last occurrences of
696  * "$" and "#" are replaced, otherwise "$$$$" and/or "##########" are
697  * automatically appended.
698  *
699  * \param target_file_size target size of each individual file.
700  *
701  * \ingroup dia_actions
702  */
703  void WriteLines(const std::string& filepath,
704  size_t target_file_size = 128* 1024* 1024) const;
705 
706  /*!
707  * WriteLines is an ActionFuture, which writes std::strings to multiple
708  * output files. Strings are written using fstream with a newline after each
709  * entry. Each worker creates its individual file.
710  *
711  * \param filepath Destination of the output file. This filepath must
712  * contain two special substrings: "$$$$$" is replaced by the worker id and
713  * "#####" will be replaced by the file chunk id. The last occurrences of
714  * "$" and "#" are replaced, otherwise "$$$$" and/or "##########" are
715  * automatically appended.
716  *
717  * \param target_file_size target size of each individual file.
718  *
719  * \ingroup dia_actions
720  */
722  const std::string& filepath,
723  size_t target_file_size = 128* 1024* 1024) const;
724 
725  /*!
726  * WriteBinary is a function, which writes a DIA to many files per
727  * worker. The input DIA can be recreated with ReadBinary and equal
728  * filepath.
729  *
730  * \param filepath Destination of the output file. This filepath must
731  * contain two special substrings: "$$$$$" is replaced by the worker id and
732  * "#####" will be replaced by the file chunk id. The last occurrences of
733  * "$" and "#" are replaced, otherwise "$$$$" and/or "##########" are
734  * automatically appended.
735  *
736  * \param max_file_size size limit of individual file.
737  *
738  * \ingroup dia_actions
739  */
740  void WriteBinary(const std::string& filepath,
741  size_t max_file_size = 128* 1024* 1024) const;
742 
743  /*!
744  * WriteBinary is a function, which writes a DIA to many files per
745  * worker. The input DIA can be recreated with ReadBinary and equal
746  * filepath.
747  *
748  * \param filepath Destination of the output file. This filepath must
749  * contain two special substrings: "$$$$$" is replaced by the worker id and
750  * "#####" will be replaced by the file chunk id. The last occurrences of
751  * "$" and "#" are replaced, otherwise "$$$$" and/or "##########" are
752  * automatically appended.
753  *
754  * \param max_file_size size limit of individual file.
755  *
756  * \ingroup dia_actions
757  */
759  const std::string& filepath,
760  size_t max_file_size = 128* 1024* 1024) const;
761 
762  //! \}
763 
764  //! \name Distributed Operations (DOps)
765  //! \{
766 
767  /*!
768  * ReduceByKey is a DOp, which groups elements of the DIA with the
769  * key_extractor and reduces each key-bucket to a single element using the
770  * associative reduce_function. The reduce_function defines how two elements
771  * can be reduced to a single element of equal type.
772  *
773  * The key of the reduced element has to be equal to the keys of the input
774  * elements. Since ReduceBy is a DOp, it creates a new DIANode. The DIA
775  * returned by Reduce links to this newly created DIANode. The stack_ of the
776  * returned DIA consists of the PostOp of Reduce, as a reduced element can
777  * directly be chained to the following LOps.
778  *
779  * \param key_extractor Key extractor function, which maps each element to a
780  * key of possibly different type.
781  *
782  * \tparam ReduceFunction Type of the reduce_function. This is a function
783  * reducing two elements of L's result type to a single element of equal
784  * type.
785  *
786  * \param reduce_function Reduce function, which defines how the key buckets
787  * are reduced to a single element. This function is applied associative but
788  * not necessarily commutative.
789  *
790  * \param reduce_config Reduce configuration.
791  *
792  * \ingroup dia_dops
793  */
794  template <typename KeyExtractor, typename ReduceFunction,
795  typename ReduceConfig = class DefaultReduceConfig>
796  auto ReduceByKey(
797  const KeyExtractor& key_extractor,
798  const ReduceFunction& reduce_function,
799  const ReduceConfig& reduce_config = ReduceConfig()) const;
800 
801  /*!
802  * ReduceByKey is a DOp, which groups elements of the DIA with the
803  * key_extractor and reduces each key-bucket to a single element using the
804  * associative reduce_function. The reduce_function defines how two elements
805  * can be reduced to a single element of equal type.
806  *
807  * The key of the reduced element has to be equal to the keys of the input
808  * elements. Since ReduceBy is a DOp, it creates a new DIANode. The DIA
809  * returned by Reduce links to this newly created DIANode. The stack_ of the
810  * returned DIA consists of the PostOp of Reduce, as a reduced element can
811  * directly be chained to the following LOps.
812  *
813  * \param key_extractor Key extractor function, which maps each element to a
814  * key of possibly different type.
815  *
816  * \tparam ReduceFunction Type of the reduce_function. This is a function
817  * reducing two elements of L's result type to a single element of equal
818  * type.
819  *
820  * \param reduce_function Reduce function, which defines how the key buckets
821  * are reduced to a single element. This function is applied associative but
822  * not necessarily commutative.
823  *
824  * \param reduce_config Reduce configuration.
825  *
826  * \param key_hash_function Function to hash keys extracted by KeyExtractor.
827  *
828  * \ingroup dia_dops
829  */
830  template <typename KeyExtractor, typename ReduceFunction,
831  typename ReduceConfig, typename KeyHashFunction>
832  auto ReduceByKey(
833  const KeyExtractor& key_extractor,
834  const ReduceFunction& reduce_function,
835  const ReduceConfig& reduce_config,
836  const KeyHashFunction& key_hash_function) const;
837 
838  /*!
839  * ReduceByKey is a DOp, which groups elements of the DIA with the
840  * key_extractor and reduces each key-bucket to a single element using the
841  * associative reduce_function. The reduce_function defines how two elements
842  * can be reduced to a single element of equal type.
843  *
844  * The key of the reduced element has to be equal to the keys of the input
845  * elements. Since ReduceBy is a DOp, it creates a new DIANode. The DIA
846  * returned by Reduce links to this newly created DIANode. The stack_ of the
847  * returned DIA consists of the PostOp of Reduce, as a reduced element can
848  * directly be chained to the following LOps.
849  *
850  * \param key_extractor Key extractor function, which maps each element to a
851  * key of possibly different type.
852  *
853  * \tparam ReduceFunction Type of the reduce_function. This is a function
854  * reducing two elements of L's result type to a single element of equal
855  * type.
856  *
857  * \param reduce_function Reduce function, which defines how the key buckets
858  * are reduced to a single element. This function is applied associative but
859  * not necessarily commutative.
860  *
861  * \param reduce_config Reduce configuration.
862  *
863  * \param key_hash_function Function to hash keys extracted by KeyExtractor.
864  *
865  * \param key_equal_function Function to compare keys in reduce hash tables.
866  *
867  * \ingroup dia_dops
868  */
869  template <typename KeyExtractor, typename ReduceFunction,
870  typename ReduceConfig,
871  typename KeyHashFunction, typename KeyEqualFunction>
872  auto ReduceByKey(
873  const KeyExtractor& key_extractor,
874  const ReduceFunction& reduce_function,
875  const ReduceConfig& reduce_config,
876  const KeyHashFunction& key_hash_function,
877  const KeyEqualFunction& key_equal_function) const;
878 
879  /*!
880  * ReduceByKey is a DOp, which groups elements of the DIA with the
881  * key_extractor and reduces each key-bucket to a single element using the
882  * associative reduce_function. The reduce_function defines how two elements
883  * can be reduced to a single element of equal type.
884  *
885  * In contrast to ReduceBy, the reduce_function is allowed to change the key
886  * (Example: Integers with modulo function as key_extractor). Creates
887  * overhead as both key and value have to be sent in shuffle step. Since
888  * ReduceByKey is a DOp, it creates a new DIANode. The DIA returned by
889  * Reduce links to this newly created DIANode. The stack_ of the returned
890  * DIA consists of the PostOp of Reduce, as a reduced element can directly
891  * be chained to the following LOps.
892  *
893  * \param volatile_key_flag tag
894  *
895  * \param key_extractor Key extractor function, which maps each element to a
896  * key of possibly different type.
897  *
898  * \tparam ReduceFunction Type of the reduce_function. This is a function
899  * reducing two elements of L's result type to a single element of equal
900  * type.
901  *
902  * \param reduce_function Reduce function, which defines how the key buckets
903  * are reduced to a single element. This function is applied associative but
904  * not necessarily commutative.
905  *
906  * \param reduce_config Reduce configuration.
907  *
908  * \param key_hash_function Function to hash keys extracted by KeyExtractor.
909  *
910  * \param key_equal_function Function to compare keys in reduce hash tables.
911  *
912  * \ingroup dia_dops
913  */
914  template <bool VolatileKeyValue,
915  typename KeyExtractor, typename ReduceFunction,
916  typename ReduceConfig = class DefaultReduceConfig,
917  typename KeyHashFunction =
918  std::hash<typename FunctionTraits<KeyExtractor>::result_type>,
919  typename KeyEqualFunction =
920  std::equal_to<typename FunctionTraits<KeyExtractor>::result_type> >
921  auto ReduceByKey(
923  const KeyExtractor& key_extractor,
924  const ReduceFunction& reduce_function,
925  const ReduceConfig& reduce_config = ReduceConfig(),
926  const KeyHashFunction& key_hash_function = KeyHashFunction(),
927  const KeyEqualFunction& key_equal_function = KeyEqualFunction()) const;
928 
929  /*!
930  * ReduceByKey is a DOp, which groups elements of the DIA with the
931  * key_extractor and reduces each key-bucket to a single element using the
932  * associative reduce_function. The reduce_function defines how two elements
933  * can be reduced to a single element of equal type.
934  *
935  * In contrast to ReduceBy, the reduce_function is allowed to change the key
936  * (Example: Integers with modulo function as key_extractor). Creates
937  * overhead as both key and value have to be sent in the shuffle step. Since
938  * ReduceByKey is a DOp, it creates a new DIANode. The DIA returned by
939  * ReduceByKey links to this newly created DIANode. The stack_ of the returned
940  * DIA consists of the PostOp of ReduceByKey, as a reduced element can directly
941  * be chained to the following LOps.
942  *
943  * \param duplicate_detection_flag Tag carrying the compile-time flag DuplicateDetectionValue, which selects whether duplicate detection is used.
944  *
945  * \param key_extractor Key extractor function, which maps each element to a
946  * key of possibly different type.
947  *
948  * \tparam ReduceFunction Type of the reduce_function. This is a function
949  * reducing two elements of the DIA's ValueType to a single element of equal
950  * type.
951  *
952  * \param reduce_function Reduce function, which defines how the key buckets
953  * are reduced to a single element. This function must be associative but
954  * need not be commutative.
955  *
956  * \param reduce_config Reduce configuration.
957  *
958  * \param key_hash_function Function to hash keys extracted by KeyExtractor. Defaults to std::hash of the key type.
959  *
960  * \param key_equal_function Function to compare keys in reduce hash tables. Defaults to std::equal_to of the key type.
961  *
962  * \ingroup dia_dops
963  */
964  template <bool DuplicateDetectionValue,
965  typename KeyExtractor, typename ReduceFunction,
966  typename ReduceConfig = class DefaultReduceConfig,
967  typename KeyHashFunction =
968  std::hash<typename FunctionTraits<KeyExtractor>::result_type>,
969  typename KeyEqualFunction =
970  std::equal_to<typename FunctionTraits<KeyExtractor>::result_type> >
971  auto ReduceByKey(
973  const KeyExtractor& key_extractor,
974  const ReduceFunction& reduce_function,
975  const ReduceConfig& reduce_config = ReduceConfig(),
976  const KeyHashFunction& key_hash_function = KeyHashFunction(),
977  const KeyEqualFunction& key_equal_function = KeyEqualFunction()) const;
978 
979  /*!
980  * ReduceByKey is a DOp, which groups elements of the DIA with the
981  * key_extractor and reduces each key-bucket to a single element using the
982  * associative reduce_function. The reduce_function defines how two elements
983  * can be reduced to a single element of equal type.
984  *
985  * In contrast to ReduceBy, the reduce_function is allowed to change the key
986  * (Example: Integers with modulo function as key_extractor). Creates
987  * overhead as both key and value have to be sent in the shuffle step. Since
988  * ReduceByKey is a DOp, it creates a new DIANode. The DIA returned by
989  * ReduceByKey links to this newly created DIANode. The stack_ of the returned
990  * DIA consists of the PostOp of ReduceByKey, as a reduced element can directly
991  * be chained to the following LOps. This overload combines the compile-time flags VolatileKeyValue and DuplicateDetectionValue of the other ReduceByKey variants.
992  *
993  * \param key_extractor Key extractor function, which maps each element to a
994  * key of possibly different type.
995  *
996  * \tparam ReduceFunction Type of the reduce_function. This is a function
997  * reducing two elements of the DIA's ValueType to a single element of equal
998  * type.
999  *
1000  * \param reduce_function Reduce function, which defines how the key buckets
1001  * are reduced to a single element. This function must be associative but
1002  * need not be commutative.
1003  *
1004  * \param reduce_config Reduce configuration.
1005  *
1006  * \param key_hash_function Function to hash keys extracted by KeyExtractor. Defaults to std::hash of the key type.
1007  *
1008  * \param key_equal_function Function to compare keys in reduce hash tables. Defaults to std::equal_to of the key type.
1009  *
1010  * \ingroup dia_dops
1011  */
1012  template <bool VolatileKeyValue,
1013  bool DuplicateDetectionValue,
1014  typename KeyExtractor, typename ReduceFunction,
1015  typename ReduceConfig = class DefaultReduceConfig,
1016  typename KeyHashFunction =
1017  std::hash<typename FunctionTraits<KeyExtractor>::result_type>,
1018  typename KeyEqualFunction =
1019  std::equal_to<typename FunctionTraits<KeyExtractor>::result_type> >
1020  auto ReduceByKey(
1023  const KeyExtractor& key_extractor,
1024  const ReduceFunction& reduce_function,
1025  const ReduceConfig& reduce_config = ReduceConfig(),
1026  const KeyHashFunction& key_hash_function = KeyHashFunction(),
1027  const KeyEqualFunction& key_equal_function = KeyEqualFunction()) const;
1028 
1029  /*!
1030  * ReducePair is a DOp, which groups key-value-pairs in the input DIA by
1031  * their key and reduces each key-bucket to a single element using the
1032  * associative reduce_function. The reduce_function defines how two elements
1033  * can be reduced to a single element of equal type. The reduce_function is
1034  * allowed to change the key. Since ReducePair is a DOp, it creates a new
1035  * DIANode. The DIA returned by ReducePair links to this newly created
1036  * DIANode. The stack_ of the returned DIA consists of the PostOp of ReducePair,
1037  * as a reduced element can directly be chained to the following LOps.
1038  *
1039  * \tparam ReduceFunction Type of the reduce_function. This is a function
1040  * reducing two elements of L's result type to a single element of equal
1041  * type.
1042  *
1043  * \param reduce_function Reduce function, which defines how the key buckets
1044  * are reduced to a single element. This function must be associative but
1045  * need not be commutative.
1046  *
1047  * \param reduce_config Reduce configuration.
1048  *
1049  * \ingroup dia_dops
1050  */
1051  template <typename ReduceFunction,
1052  typename ReduceConfig = class DefaultReduceConfig>
1053  auto ReducePair(
1054  const ReduceFunction& reduce_function,
1055  const ReduceConfig& reduce_config = ReduceConfig()) const;
1056 
1057  /*!
1058  * ReducePair is a DOp, which groups key-value-pairs in the input DIA by
1059  * their key and reduces each key-bucket to a single element using the
1060  * associative reduce_function. The reduce_function defines how two elements
1061  * can be reduced to a single element of equal type. The reduce_function is
1062  * allowed to change the key. Since ReducePair is a DOp, it creates a new
1063  * DIANode. The DIA returned by ReducePair links to this newly created
1064  * DIANode. The stack_ of the returned DIA consists of the PostOp of ReducePair,
1065  * as a reduced element can directly be chained to the following LOps.
1066  *
1067  * \tparam ReduceFunction Type of the reduce_function. This is a function
1068  * reducing two elements of L's result type to a single element of equal
1069  * type.
1070  *
1071  * \param reduce_function Reduce function, which defines how the key buckets
1072  * are reduced to a single element. This function must be associative but
1073  * need not be commutative.
1074  *
1075  * \param reduce_config Reduce configuration.
1076  *
1077  * \param key_hash_function Function to hash the keys of the input key-value pairs.
1078  *
1079  * \ingroup dia_dops
1080  */
1081  template <typename ReduceFunction, typename ReduceConfig,
1082  typename KeyHashFunction>
1083  auto ReducePair(
1084  const ReduceFunction& reduce_function,
1085  const ReduceConfig& reduce_config,
1086  const KeyHashFunction& key_hash_function) const;
1087 
1088  /*!
1089  * ReducePair is a DOp, which groups key-value-pairs in the input DIA by
1090  * their key and reduces each key-bucket to a single element using the
1091  * associative reduce_function. The reduce_function defines how two elements
1092  * can be reduced to a single element of equal type. The reduce_function is
1093  * allowed to change the key. Since ReducePair is a DOp, it creates a new
1094  * DIANode. The DIA returned by ReducePair links to this newly created
1095  * DIANode. The stack_ of the returned DIA consists of the PostOp of ReducePair,
1096  * as a reduced element can directly be chained to the following LOps.
1097  *
1098  * \tparam ReduceFunction Type of the reduce_function. This is a function
1099  * reducing two elements of L's result type to a single element of equal
1100  * type.
1101  *
1102  * \param reduce_function Reduce function, which defines how the key buckets
1103  * are reduced to a single element. This function must be associative but
1104  * need not be commutative.
1105  *
1106  * \param reduce_config Reduce configuration.
1107  *
1108  * \param key_hash_function Function to hash the keys of the input key-value pairs.
1109  *
1110  * \param key_equal_function Function to compare keys in reduce hash tables.
1111  *
1112  * \ingroup dia_dops
1113  */
1114  template <typename ReduceFunction, typename ReduceConfig,
1115  typename KeyHashFunction, typename KeyEqualFunction>
1116  auto ReducePair(
1117  const ReduceFunction& reduce_function,
1118  const ReduceConfig& reduce_config,
1119  const KeyHashFunction& key_hash_function,
1120  const KeyEqualFunction& key_equal_function) const;
1121 
1122  /*!
1123  * ReducePair is a DOp, which groups key-value-pairs in the input DIA by
1124  * their key and reduces each key-bucket to a single element using the
1125  * associative reduce_function. The reduce_function defines how two elements
1126  * can be reduced to a single element of equal type. The reduce_function is
1127  * allowed to change the key. Since ReducePair is a DOp, it creates a new
1128  * DIANode. The DIA returned by ReducePair links to this newly created
1129  * DIANode. The stack_ of the returned DIA consists of the PostOp of ReducePair,
1130  * as a reduced element can directly be chained to the following LOps. This overload is the DuplicateDetectionValue variant of ReducePair.
1131  *
1132  * \tparam ReduceFunction Type of the reduce_function. This is a function
1133  * reducing two elements of L's result type to a single element of equal
1134  * type.
1135  *
1136  * \param reduce_function Reduce function, which defines how the key buckets
1137  * are reduced to a single element. This function must be associative but
1138  * need not be commutative.
1139  *
1140  * \param reduce_config Reduce configuration.
1141  *
1142  * \param key_hash_function Function to hash the keys of the input key-value pairs. Defaults to std::hash<ValueType::first_type>.
1143  *
1144  * \param key_equal_function Function to compare keys in reduce hash tables. Defaults to std::equal_to<ValueType::first_type>.
1145  *
1146  * \ingroup dia_dops
1147  */
1148  template <bool DuplicateDetectionValue,
1149  typename ReduceFunction,
1150  typename ReduceConfig = class DefaultReduceConfig,
1151  typename KeyHashFunction = std::hash<typename ValueType::first_type>, // template default required: types are never deduced from default function arguments
1152  typename KeyEqualFunction = std::equal_to<typename ValueType::first_type>
1153  >
1154  auto ReducePair(
1156  const ReduceFunction& reduce_function,
1157  const ReduceConfig& reduce_config = ReduceConfig(),
1158  const KeyHashFunction& key_hash_function = KeyHashFunction(),
1159  const KeyEqualFunction& key_equal_function = KeyEqualFunction()) const;
1160 
1161  /*!
1162  * ReduceToIndex is a DOp, which groups elements of the DIA with the
1163  * key_extractor returning an unsigned integer and reduces each key-bucket
1164  * to a single element using the associative reduce_function. In contrast
1165  * to ReduceBy, ReduceToIndex returns a DIA in a defined order, which has
1166  * the reduced element with key i in position i.
1167  *
1168  * The reduce_function defines how two elements can be reduced to a single
1169  * element of equal type. The key of the reduced element has to be equal to
1170  * the keys of the input elements. Since ReduceToIndex is a DOp, it creates
1171  * a new DIANode. The DIA returned by ReduceToIndex links to this newly
1172  * created DIANode. The stack_ of the returned DIA consists of the PostOp of
1173  * ReduceToIndex, as a reduced element can directly be chained to the
1174  * following LOps.
1175  *
1176  * \param key_extractor Key extractor function, which maps each element to
1177  * its unsigned integer key, i.e. its target index in the output DIA.
1178  *
1179  * \tparam ReduceFunction Type of the reduce_function. This is a function
1180  * reducing two elements of the DIA's ValueType to a single element of equal
1181  * type.
1182  *
1183  * \param reduce_function Reduce function, which defines how the key buckets
1184  * are reduced to a single element. This function must be associative but
1185  * need not be commutative.
1186  *
1187  * \param size Resulting DIA size. Consequently, the key_extractor function
1188  * must always return < size for any element in the input DIA.
1189  *
1190  * \param neutral_element Item value with which to start the reduction in
1191  * each array cell.
1192  *
1193  * \param reduce_config Reduce configuration.
1194  *
1195  * \ingroup dia_dops
1196  */
1197  template <typename KeyExtractor, typename ReduceFunction,
1198  typename ReduceConfig = class DefaultReduceToIndexConfig>
1199  auto ReduceToIndex(
1200  const KeyExtractor& key_extractor,
1201  const ReduceFunction& reduce_function,
1202  size_t size,
1203  const ValueType& neutral_element = ValueType(),
1204  const ReduceConfig& reduce_config = ReduceConfig()) const;
1205 
1206  /*!
1207  * ReduceToIndex is a DOp, which groups elements of the DIA with the
1208  * key_extractor returning an unsigned integer and reduces each key-bucket
1209  * to a single element using the associative reduce_function. In contrast
1210  * to ReduceByKey, ReduceToIndex returns a DIA in a defined order, which has
1211  * the reduced element with key i in position i. The reduce_function
1212  * defines how two elements can be reduced to a single element of equal
1213  * type.
1214  *
1215  * This VolatileKeyValue overload of ReduceToIndex is the equivalent of ReduceByKey, as the
1216  * reduce_function is allowed to change the key. Since ReduceToIndex
1217  * is a DOp, it creates a new DIANode. The DIA returned by ReduceToIndex
1218  * links to this newly created DIANode. The stack_ of the returned DIA
1219  * consists of the PostOp of ReduceToIndex, as a reduced element can
1220  * directly be chained to the following LOps.
1221  *
1222  * \param key_extractor Key extractor function, which maps each element to
1223  * its unsigned integer key, i.e. its target index in the output DIA.
1224  *
1225  * \tparam ReduceFunction Type of the reduce_function. This is a function
1226  * reducing two elements of the DIA's ValueType to a single element of equal
1227  * type.
1228  *
1229  * \param reduce_function Reduce function, which defines how the key buckets
1230  * are reduced to a single element. This function must be associative but
1231  * need not be commutative.
1232  *
1233  * \param size Resulting DIA size. Consequently, the key_extractor function
1234  * must always return < size for any element in the input DIA.
1235  *
1236  * \param neutral_element Item value with which to start the reduction in
1237  * each array cell.
1238  *
1239  * \param reduce_config Reduce configuration.
1240  *
1241  * \ingroup dia_dops
1242  */
1243  template <bool VolatileKeyValue,
1244  typename KeyExtractor, typename ReduceFunction,
1245  typename ReduceConfig = class DefaultReduceToIndexConfig>
1246  auto ReduceToIndex(
1248  const KeyExtractor& key_extractor,
1249  const ReduceFunction& reduce_function,
1250  size_t size,
1251  const ValueType& neutral_element = ValueType(),
1252  const ReduceConfig& reduce_config = ReduceConfig()) const;
1253 
1254  /*!
1255  * ReduceToIndex is a DOp, which groups elements of the DIA with the
1256  * key_extractor returning an unsigned integer and reduces each key-bucket
1257  * to a single element using the associative reduce_function. In contrast
1258  * to ReduceByKey, ReduceToIndex returns a DIA in a defined order, which has
1259  * the reduced element with key i in position i. The reduce_function
1260  * defines how two elements can be reduced to a single element of equal
1261  * type.
1262  *
1263  * This overload takes a SkipPreReducePhaseTag as its first argument (presumably
1264  * skipping the local pre-reduce phase; TODO confirm). NOTE(review): the claim that ReduceToIndex is the equivalent to ReduceByKey, as the reduce_function is allowed to change the key, looks copied from the VolatileKeyValue overload; confirm it holds here. Since ReduceToIndex
1265  * is a DOp, it creates a new DIANode. The DIA returned by ReduceToIndex
1266  * links to this newly created DIANode. The stack_ of the returned DIA
1267  * consists of the PostOp of ReduceToIndex, as a reduced element can
1268  * directly be chained to the following LOps.
1269  *
1270  * \param key_extractor Key extractor function, which maps each element to
1271  * its unsigned integer key, i.e. its target index in the output DIA.
1272  *
1273  * \tparam ReduceFunction Type of the reduce_function. This is a function
1274  * reducing two elements of the DIA's ValueType to a single element of equal
1275  * type.
1276  *
1277  * \param reduce_function Reduce function, which defines how the key buckets
1278  * are reduced to a single element. This function must be associative but
1279  * need not be commutative.
1280  *
1281  * \param size Resulting DIA size. Consequently, the key_extractor function
1282  * must always return < size for any element in the input DIA.
1283  *
1284  * \param neutral_element Item value with which to start the reduction in
1285  * each array cell.
1286  *
1287  * \param reduce_config Reduce configuration.
1288  *
1289  * \ingroup dia_dops
1290  */
1291  template <typename KeyExtractor, typename ReduceFunction,
1292  typename ReduceConfig = class DefaultReduceToIndexConfig>
1293  auto ReduceToIndex(
1294  const struct SkipPreReducePhaseTag&,
1295  const KeyExtractor& key_extractor,
1296  const ReduceFunction& reduce_function,
1297  size_t size,
1298  const ValueType& neutral_element = ValueType(),
1299  const ReduceConfig& reduce_config = ReduceConfig()) const;
1300 
1301  /*!
1302  * GroupByKey is a DOp, which groups elements of the DIA by their key.
1303  * After having grouped all elements of one key, all elements of one key
1304  * will be processed according to the GroupByFunction, which returns an output element.
1305  * Contrary to Reduce, GroupBy allows usage of functions that require all
1306  * elements of one key at once, as the GroupByFunction will be applied _after_
1307  * all elements with the same key have been grouped. However, for this
1308  * reason the communication overhead is also higher. If possible, usage of
1309  * Reduce is therefore recommended.
1310  *
1311  * As GroupByKey is a DOp, it creates a new DIANode. The DIA returned by
1312  * GroupByKey links to this newly created DIANode. The stack_ of the returned
1313  * DIA consists of the PostOp of GroupByKey, as an output element can
1314  * directly be chained to the following LOps.
1315  *
1316  * \tparam KeyExtractor Type of the key_extractor function.
1317  * The key_extractor function works like a map function.
1318  *
1319  * \param key_extractor Key extractor function, which maps each element to a
1320  * key of possibly different type.
1321  *
1322  * \tparam GroupByFunction Type of the groupby_function. This is a function
1323  * taking an iterator for all elements of the same key as input.
1324  *
1325  * \param groupby_function GroupBy function, which defines how the key
1326  * buckets are grouped and processed.
1327  * Its input parameter is an api::GroupByReader with functions HasNext() and Next().
1328  *
1329  * \ingroup dia_dops
1330  */
1331  template <typename ValueOut, typename KeyExtractor,
1332  typename GroupByFunction>
1333  auto GroupByKey(const KeyExtractor& key_extractor,
1334  const GroupByFunction& groupby_function) const;
1335 
1336  /*!
1337  * GroupByKey is a DOp, which groups elements of the DIA by their key.
1338  * After having grouped all elements of one key, all elements of one key
1339  * will be processed according to the GroupByFunction, which returns an output element.
1340  * Contrary to Reduce, GroupBy allows usage of functions that require all
1341  * elements of one key at once, as the GroupByFunction will be applied _after_
1342  * all elements with the same key have been grouped. However, for this
1343  * reason the communication overhead is also higher. If possible, usage of
1344  * Reduce is therefore recommended.
1345  *
1346  * As GroupByKey is a DOp, it creates a new DIANode. The DIA returned by
1347  * GroupByKey links to this newly created DIANode. The stack_ of the returned
1348  * DIA consists of the PostOp of GroupByKey, as an output element can
1349  * directly be chained to the following LOps.
1350  *
1351  * \tparam KeyExtractor Type of the key_extractor function.
1352  * The key_extractor function works like a map function.
1353  *
1354  * \param key_extractor Key extractor function, which maps each element to a
1355  * key of possibly different type.
1356  *
1357  * \tparam GroupByFunction Type of the groupby_function. This is a function
1358  * taking an iterator for all elements of the same key as input.
1359  *
1360  * \param groupby_function GroupBy function, which defines how the key
1361  * buckets are grouped and processed.
1362  * Its input parameter is an api::GroupByReader with functions HasNext() and Next().
1363  *
1364  * \param hash_function Hash function for keys.
1365  *
1366  * \ingroup dia_dops
1367  */
1368  template <typename ValueOut, typename KeyExtractor,
1369  typename GroupByFunction, typename HashFunction>
1370  auto GroupByKey(const KeyExtractor& key_extractor,
1371  const GroupByFunction& groupby_function,
1372  const HashFunction& hash_function) const;
1373 
1374  /*!
1375  * GroupByKey is a DOp, which groups elements of the DIA by their key.
1376  * After having grouped all elements of one key, all elements of one key
1377  * will be processed according to the GroupByFunction, which returns an output element.
1378  * Contrary to Reduce, GroupBy allows usage of functions that require all
1379  * elements of one key at once, as the GroupByFunction will be applied _after_
1380  * all elements with the same key have been grouped. However, for this
1381  * reason the communication overhead is also higher. If possible, usage of
1382  * Reduce is therefore recommended.
1383  *
1384  * As GroupByKey is a DOp, it creates a new DIANode. The DIA returned by
1385  * GroupByKey links to this newly created DIANode. The stack_ of the returned
1386  * DIA consists of the PostOp of GroupByKey, as an output element can
1387  * directly be chained to the following LOps. This overload is the LocationDetectionTagValue variant of GroupByKey.
1388  *
1389  * \tparam KeyExtractor Type of the key_extractor function.
1390  * The key_extractor function works like a map function.
1391  *
1392  * \param key_extractor Key extractor function, which maps each element to a
1393  * key of possibly different type.
1394  *
1395  * \tparam GroupByFunction Type of the groupby_function. This is a function
1396  * taking an iterator for all elements of the same key as input.
1397  *
1398  * \param groupby_function GroupBy function, which defines how the key
1399  * buckets are grouped and processed.
1400  * Its input parameter is an api::GroupByReader with functions HasNext() and Next().
1401  *
1402  * \param hash_function Hash function for keys. Defaults to std::hash of the key type.
1403  *
1404  * \ingroup dia_dops
1405  */
1406  template <typename ValueOut, bool LocationDetectionTagValue,
1407  typename KeyExtractor, typename GroupByFunction,
1408  typename HashFunction =
1409  std::hash<typename FunctionTraits<KeyExtractor>::result_type>
1410  >
1412  const KeyExtractor& key_extractor,
1413  const GroupByFunction& groupby_function,
1414  const HashFunction& hash_function = HashFunction()) const;
1415 
1416  /*!
1417  * GroupToIndex is a DOp, which groups elements of the DIA by their key.
1418  * After having grouped all elements of one key, all elements of one key
1419  * will be processed according to the GroupByFunction, which returns an output element.
1420  * Contrary to Reduce, GroupBy allows usage of functions that require all
1421  * elements of one key at once, as the GroupByFunction will be applied _after_
1422  * all elements with the same key have been grouped. However, for this
1423  * reason the communication overhead is also higher. If possible, usage of
1424  * Reduce is therefore recommended.
1425  *
1426  * In contrast to GroupByKey, GroupToIndex returns a DIA in a defined order,
1427  * which has the output element for key i in position i.
1428  * As GroupToIndex is a DOp, it creates a new DIANode. The DIA returned by
1429  * GroupToIndex links to this newly created DIANode. The stack_ of the returned
1430  * DIA consists of the PostOp of GroupToIndex, as an output element can
1431  * directly be chained to the following LOps.
1432  *
1433  * \tparam KeyExtractor Type of the key_extractor function.
1434  * The key_extractor function works like a map function.
1435  *
1436  * \param key_extractor Key extractor function, which maps each element to
1437  * its unsigned integer key, i.e. its target index in the output DIA.
1438  *
1439  * \tparam GroupByFunction Type of the groupby_function. This is a function
1440  * taking an iterator for all elements of the same key as input.
1441  *
1442  * \param groupby_function GroupBy function, which defines how the key
1443  * buckets are grouped and processed.
1444  * Its input parameter is an api::GroupByReader with functions HasNext() and Next().
1445  *
1446  * \param size Resulting DIA size. Consequently, the key_extractor function
1447  * must always return < size for any element in the input DIA.
1448  *
1449  * \param neutral_element Value of the output type ValueOut for array cells
1450  * (NOTE(review): presumably used for indices that receive no elements; confirm).
1451  *
1452  * \ingroup dia_dops
1453  */
1454  template <typename ValueOut, typename KeyExtractor,
1455  typename GroupByFunction>
1456  auto GroupToIndex(const KeyExtractor& key_extractor,
1457  const GroupByFunction& groupby_function,
1458  const size_t size,
1459  const ValueOut& neutral_element = ValueOut()) const;
1460 
1461  /*!
1462  * Zips two DIAs of equal size in style of functional programming by
1463  * applying zip_function to the i-th elements of both input DIAs to form the
1464  * i-th element of the output DIA. The type of the output DIA can be
1465  * inferred from the zip_function.
1466  *
1467  * The two input DIAs are required to be of equal size; otherwise use the
1468  * CutTag or PadTag variant.
1469  *
1470  * \tparam ZipFunction Type of the zip_function. This is a function with two
1471  * input elements, one from each input DIA, and one output element, which is
1472  * the type of the Zip node.
1473  *
1474  * \param second_dia DIA, which is zipped together with the original
1475  * DIA.
1476  *
1477  * \param zip_function Zip function, which zips two elements together.
1478  *
1479  * \ingroup dia_dops
1480  */
1481  template <typename ZipFunction, typename SecondDIA>
1482  auto Zip(const SecondDIA& second_dia,
1483  const ZipFunction& zip_function) const;
1484 
1485  /*!
1486  * Zips two DIAs in style of functional programming by applying zip_function
1487  * to the i-th elements of both input DIAs to form the i-th element of the
1488  * output DIA. The type of the output DIA can be inferred from the
1489  * zip_function.
1490  *
1491  * If the two input DIAs are of unequal size, the result has the length of the
1492  * shorter one. Use PadTag() instead to pad the shorter DIA.
1493  *
1494  * \tparam ZipFunction Type of the zip_function. This is a function with two
1495  * input elements, one from each input DIA, and one output element, which is
1496  * the type of the Zip node.
1497  *
1498  * \param second_dia DIA, which is zipped together with the original
1499  * DIA.
1500  *
1501  * \param zip_function Zip function, which zips two elements together.
1502  *
1503  * \ingroup dia_dops
1504  */
1505  template <typename ZipFunction, typename SecondDIA>
1506  auto Zip(struct CutTag const&, const SecondDIA& second_dia,
1507  const ZipFunction& zip_function) const;
1508 
1509  /*!
1510  * Zips two DIAs in style of functional programming by applying zip_function
1511  * to the i-th elements of both input DIAs to form the i-th element of the
1512  * output DIA. The type of the output DIA can be inferred from the
1513  * zip_function.
1514  *
1515  * The output DIA's length is the *maximum* of the input DIAs' lengths; shorter DIAs
1516  * are padded with default-constructed items.
1517  *
1518  * \tparam ZipFunction Type of the zip_function. This is a function with two
1519  * input elements, one from each input DIA, and one output element, which is
1520  * the type of the Zip node.
1521  *
1522  * \param second_dia DIA, which is zipped together with the original
1523  * DIA.
1524  *
1525  * \param zip_function Zip function, which zips two elements together.
1526  *
1527  * \ingroup dia_dops
1528  */
1529  template <typename ZipFunction, typename SecondDIA>
1530  auto Zip(struct PadTag const&, const SecondDIA& second_dia,
1531  const ZipFunction& zip_function) const;
1532 
1533  /*!
1534  * Zips two DIAs in style of functional programming by applying zip_function
1535  * to the i-th elements of both input DIAs to form the i-th element of the
1536  * output DIA. The type of the output DIA can be inferred from the
1537  * zip_function.
1538  *
1539  * In this variant, the DIA partitions on all PEs must have matching
1540  * length. No rebalancing is performed, and the program will die if the
1541  * partition lengths on any PE mismatch. This enables Zip to proceed without any
1542  * communication.
1543  *
1544  * \tparam ZipFunction Type of the zip_function. This is a function with two
1545  * input elements, one from each input DIA, and one output element, which is
1546  * the type of the Zip node.
1547  *
1548  * \param second_dia DIA, which is zipped together with the original
1549  * DIA.
1550  *
1551  * \param zip_function Zip function, which zips two elements together.
1552  *
1553  * \ingroup dia_dops
1554  */
1555  template <typename ZipFunction, typename SecondDIA>
1556  auto Zip(struct NoRebalanceTag const&, const SecondDIA& second_dia,
1557  const ZipFunction& zip_function) const;
1558 
1559  /*!
1560  * Zips each item of a DIA with its zero-based array index. This requires a
1561  * full data store/retrieve cycle because the input DIA's size is generally
1562  * unknown.
1563  *
1564  * \param zip_function Zip function, which is called with each element together
1565  * with its zero-based array index.
1566  *
1567  * \ingroup dia_dops
1568  */
1569  template <typename ZipFunction>
1570  auto ZipWithIndex(const ZipFunction& zip_function) const;
1571 
1572  /*!
1573  * Sort is a DOp, which sorts a given DIA according to the given compare_function.
1574  *
1575  * \tparam CompareFunction Type of the compare_function.
1576  * Should be (ValueType, ValueType) -> bool. Defaults to std::less<ValueType>.
1577  *
1578  * \param compare_function Function, which compares two elements. Returns
1579  * true if the first element is smaller than the second, false otherwise.
1580  *
1581  * \ingroup dia_dops
1582  */
1583  template <typename CompareFunction = std::less<ValueType> >
1584  auto Sort(const CompareFunction& compare_function = CompareFunction()) const;
1585 
1586  /*!
1587  * Sort is a DOp, which sorts a given DIA according to the given compare_function.
1588  *
1589  * \tparam CompareFunction Type of the compare_function.
1590  * Should be (ValueType, ValueType) -> bool.
1591  *
1592  * \param compare_function Function, which compares two elements. Returns
1593  * true if the first element is smaller than the second, false otherwise.
1594  *
1595  * \param sort_algorithm Algorithm object (of type SortFunction) used to sort items. Merging is
1596  * always done using a tournament tree with compare_function.
1597  *
1598  * \ingroup dia_dops
1599  */
1600  template <typename CompareFunction, typename SortFunction>
1601  auto Sort(const CompareFunction& compare_function,
1602  const SortFunction& sort_algorithm) const;
1603 
1604  /*!
1605  * Merge is a DOp, which merges two sorted DIAs to a single sorted DIA.
1606  * Both input DIAs must already be sorted according to the given comparator.
1607  * The type of the output DIA will be the type of this DIA.
1608  *
1609  * The merge operation balances all input data, so that each worker will
1610  * have an equal number of elements when the merge completes.
1611  *
1612  * \param second_dia DIA, which is merged with this DIA.
1613  *
1614  * \param comparator Comparator to specify the order of input and output. Defaults to std::less<ValueType>.
1615  *
1616  * \ingroup dia_dops
1617  */
1618  template <typename Comparator = std::less<ValueType>, typename SecondDIA>
1619  auto Merge(const SecondDIA& second_dia,
1620  const Comparator& comparator = Comparator()) const;
1621 
1622  /*!
1623  * PrefixSum is a DOp, which computes the prefix sum of all elements. The sum
1624  * function defines how two elements are combined into a single element.
1625  *
1626  * \param sum_function Sum function (any associative function). Defaults to std::plus<ValueType>.
1627  *
1628  * \param initial_element Initial element of the sum function. Defaults to a value-initialized ValueType.
1629  *
1630  * \ingroup dia_dops
1631  */
1632  template <typename SumFunction = std::plus<ValueType> >
1633  auto PrefixSum(const SumFunction& sum_function = SumFunction(),
1634  const ValueType& initial_element = ValueType()) const;
1635 
1636  /*!
1637  * Window is a DOp, which applies a window function to every k
1638  * consecutive items in a DIA. The window function is also given the index
1639  * of the first item, and can output zero or more items via an Emitter.
1640  *
1641  * \param window_size The size k of the delivered windows.
1642  *
1643  * \param window_function Window function applied to each window of k consecutive items. Signature: TODO(tb).
1644  *
1645  * \ingroup dia_dops
1646  */
1647  template <typename WindowFunction>
1648  auto Window(size_t window_size,
1649  const WindowFunction& window_function = WindowFunction()) const;
1650 
1651  /*!
1652  * Window is a DOp, which applies a window function to every k
1653  * consecutive items in a DIA. The window function is also given the index
1654  * of the first item, and can output zero or more items via an Emitter.
1655  *
1656  * \param window_size the size of the delivered window. Signature: TODO(tb).
1657  *
1658  * \param window_function Window function applied to each k item.
1659  *
1660  * \param partial_window_function Window function applied to less than k
1661  * items.
1662  *
1663  * \ingroup dia_dops
1664  */
1665  template <typename WindowFunction, typename PartialWindowFunction>
1666  auto Window(size_t window_size,
1667  const WindowFunction& window_function,
1668  const PartialWindowFunction& partial_window_function) const;
1669 
1670  /*!
1671  * Window is a DOp, which applies a window function to every k
1672  * consecutive items in a DIA. The window function is also given the index
1673  * of the first item, and can output zero or more items via an Emitter.
1674  *
1675  * \param window_size the size of the delivered window.
1676  *
1677  * \param window_function Window function applied to each k item.
1678  *
1679  * \ingroup dia_dops
1680  */
1681  template <typename WindowFunction>
1682  auto Window(struct DisjointTag const&, size_t window_size,
1683  const WindowFunction& window_function) const;
1684 
1685  /*!
1686  * FlatWindow is a DOp, which applies a window function to every k
1687  * consecutive items in a DIA. The window function is also given the index
1688  * of the first item, and can output zero or more items via an Emitter.
1689  *
1690  * \param window_size the size of the delivered window. Signature: TODO(tb).
1691  *
1692  * \param window_function Window function applied to each k item.
1693  *
1694  * \ingroup dia_dops
1695  */
1696  template <typename ValueOut, typename WindowFunction>
1697  auto FlatWindow(
1698  size_t window_size,
1699  const WindowFunction& window_function = WindowFunction()) const;
1700 
1701  /*!
1702  * FlatWindow is a DOp, which applies a window function to every k
1703  * consecutive items in a DIA. The window function is also given the index
1704  * of the first item, and can output zero or more items via an Emitter.
1705  *
1706  * \param window_size the size of the delivered window. Signature: TODO(tb).
1707  *
1708  * \param window_function Window function applied to each k item.
1709  *
1710  * \param partial_window_function Window function applied to less than k
1711  * items.
1712  *
1713  * \ingroup dia_dops
1714  */
1715  template <typename ValueOut, typename WindowFunction,
1716  typename PartialWindowFunction>
1717  auto FlatWindow(size_t window_size,
1718  const WindowFunction& window_function,
1719  const PartialWindowFunction& partial_window_function) const;
1720 
1721  /*!
1722  * FlatWindow is a DOp, which applies a window function to every k
1723  * consecutive items in a DIA. The window function is also given the index
1724  * of the first item, and can output zero or more items via an Emitter.
1725  *
1726  * \param window_size the size of the delivered window. Signature: TODO(tb).
1727  *
1728  * \param window_function Window function applied to each k item.
1729  *
1730  * \ingroup dia_dops
1731  */
1732  template <typename ValueOut, typename WindowFunction>
1733  auto FlatWindow(struct DisjointTag const&, size_t window_size,
1734  const WindowFunction& window_function) const;
1735 
1736  /*!
1737  * Concat is a DOp, which concatenates any number of DIAs to a single DIA.
1738  * All input DIAs must contain the same type, which is also the output DIA's
1739  * type.
1740  *
1741  * The concat operation balances all input data, so that each worker will
1742  * have an equal number of elements when the concat completes.
1743  *
1744  * \ingroup dia_dops
1745  */
1746  template <typename SecondDIA>
1747  auto Concat(const SecondDIA& second_dia) const;
1748 
1749  /*!
1750  * Rebalance is a DOp, which rebalances a single DIA among all workers; in
1751  * general, this operation is needed only if previous steps are known to
1752  * create heavy imbalance (e.g. like Filter()s which cut DIAs to ranges).
1753  *
1754  * \ingroup dia_dops
1755  */
1756  auto Rebalance() const;
1757 
1758  /*!
1759  * Create a CollapseNode which is mainly used to collapse the LOp chain into
1760  * a DIA<T> with an empty stack. This is most often necessary for iterative
1761  * algorithms, where a DIA<T> reference variable is updated in each
1762  * iteration.
1763  *
1764  * \ingroup dia_dops
1765  */
1766  DIA<ValueType> Collapse() const;
1767 
1768  /*!
1769  * Create a CacheNode which contains all items of a DIA in calculated plain
1770  * format. This is needed if a DIA is reused many times, in order to avoid
1771  * recalculating a PostOp multiple times.
1772  *
1773  * \ingroup dia_dops
1774  */
1775  DIA<ValueType> Cache() const;
1776 
1777  //! \}
1778 
1779 private:
1780  //! The DIANode which DIA points to. The node represents the latest DOp
1781  //! or Action performed previously.
1783 
1784  //! The local function chain, which stores the chained lambda function from
1785  //! the last DIANode to this DIA.
1787 
1788  //! DIA serial id for logging, matches DIANode::id_ for DOps.
1789  size_t id_ = 0;
1790 
1791  //! static DIA (LOp or DOp) node label string, may match DIANode::label_.
1792  const char* label_ = nullptr;
1793 
1794  //! deliver next DIA serial id
1795  size_t next_dia_id() { return context().next_dia_id(); }
1796 };
1797 
1798 //! \}
1799 
1800 } // namespace api
1801 
1802 //! imported from api namespace
1803 using api::DIA;
1804 
1805 //! imported from api namespace
1806 using api::DisjointTag;
1807 
1808 //! imported from api namespace
1809 using api::VolatileKeyFlag;
1810 
1811 //! imported from api namespace
1812 using api::VolatileKeyTag;
1813 
1814 //! imported from api namespace
1815 using api::NoVolatileKeyTag;
1816 
1817 //! imported from api namespace
1819 
1820 //! imported from api namespace
1821 using api::CutTag;
1822 
1823 //! imported from api namespace
1824 using api::PadTag;
1825 
1826 //! imported from api namespace
1827 using api::NoRebalanceTag;
1828 
1829 //! imported from api namespace
1830 using api::DuplicateDetectionFlag;
1831 
1832 //! imported from api namespace
1834 
1835 //! imported from api namespace
1837 
1838 //! imported from api namespace
1839 using api::LocationDetectionFlag;
1840 
1841 //! imported from api namespace
1843 
1844 //! imported from api namespace
1846 
1847 } // namespace thrill
1848 
1849 #endif // !THRILL_API_DIA_HEADER
1850 
1851 /******************************************************************************/
std::vector< ValueType > AllGather() const
Returns the whole DIA in an std::vector on each worker.
Definition: all_gather.hpp:115
const DIA & Keep(size_t increase=1) const
Mark the referenced DIANode for keeping, which makes children not consume the data when executing...
Definition: dia.hpp:293
DIA is the interface between the user and the Thrill framework.
Definition: dia.hpp:141
ValueType_ ValueType
Definition: dia.hpp:152
Future< std::vector< ValueType > > AllGatherFuture() const
Returns the whole DIA in an std::vector on each worker.
Definition: all_gather.hpp:144
Context & context()
Returns the api::Context of this DIABase.
Definition: dia_base.hpp:208
Type[] Array
A template to make writing temporary arrays easy: Array<int>{ 1, 2, 3 }.
Definition: json_logger.hpp:64
Future< ValueType > AllReduceFuture(const ReduceFunction &reduce_function, const ValueType &initial_value=ValueType()) const
AllReduce is an ActionFuture, which computes the reduction sum of all elements globally and delivers ...
Definition: all_reduce.hpp:122
const DIA & Execute() const
Execute DIA's scope and parents such that this (Action)Node is Executed.
Definition: dia.hpp:318
auto Sample(size_t sample_size) const
Select up to sample_size items uniformly at random and return a new DIA<T>.
Definition: sample.hpp:246
const struct LocationDetectionFlag< true > LocationDetectionTag
global const LocationDetectionFlag instance
Definition: dia.hpp:122
bool consume() const
return value of consume flag.
Definition: context.hpp:375
typename Stack::Input StackInput
Definition: dia.hpp:160
const struct PadTag PadTag
global const PadTag instance
Definition: dia.hpp:83
DIANodePtr node_
Definition: dia.hpp:1782
const char * label_
static DIA (LOp or DOp) node label string, may match DIANode::label_.
Definition: dia.hpp:1792
std::vector< ValueType > Gather(size_t target_id=0) const
Gather is an Action, which collects all data of the DIA into a vector at the given worker...
Definition: gather.hpp:94
virtual size_t consume_counter() const
Returns consume_counter_.
Definition: dia_base.hpp:226
DIA()=default
default-constructor: invalid DIA
const DIANodePtr & node() const
Returns a pointer to the according DIANode.
Definition: dia.hpp:251
bool IsValid() const
Return whether the DIA is valid.
Definition: dia.hpp:175
const Stack & stack() const
Returns the stored function chain.
Definition: dia.hpp:263
auto Zip(const SecondDIA &second_dia, const ZipFunction &zip_function) const
Zips two DIAs of equal size in style of functional programming by applying zip_function to the i-th e...
Definition: zip.hpp:676
auto Concat(const SecondDIA &second_dia) const
Concat is a DOp, which concatenates any number of DIAs to a single DIA.
Definition: concat.hpp:389
const struct VolatileKeyFlag< true > VolatileKeyTag
global const VolatileKeyFlag instance
Definition: dia.hpp:48
const char * label() const
Returns label_.
Definition: dia.hpp:284
tag structure for ReduceToIndex()
Definition: dia.hpp:54
const struct SkipPreReducePhaseTag SkipPreReducePhaseTag
global const SkipPreReducePhaseTag instance
Definition: dia.hpp:59
void WriteLines(const std::string &filepath, size_t target_file_size=128 *1024 *1024) const
WriteLines is an Action, which writes std::strings to multiple output files.
#define die(msg)
Instead of abort(), throw the output the message via an exception.
Definition: die.hpp:42
auto Filter(const FilterFunction &filter_function) const
Each item of a DIA is tested using filter_function : $A \to \textrm{bool}$ to determine whether it is copied into the outp...
Definition: dia.hpp:386
ValueType AllReduce(const ReduceFunction &reduce_function, const ValueType &initial_value=ValueType()) const
AllReduce is an Action, which computes the reduction sum of all elements globally and delivers the sa...
Definition: all_reduce.hpp:88
The Context of a job is a unique instance per worker which holds references to all underlying parts o...
Definition: context.hpp:218
Context & context() const
Return context_ of DIANode, e.g. for creating new LOps and DOps.
Definition: dia.hpp:269
DIA(const DIANodePtr &node, const Stack &stack, size_t id, const char *label)
Constructor of a new DIA with a pointer to a DIANode and a function chain from the DIANode to this DI...
Definition: dia.hpp:194
const DIA & KeepForever() const
Mark the referenced DIANode for keeping forever, which makes children not consume the data when execu...
Definition: dia.hpp:308
auto Sort(const CompareFunction &compare_function=CompareFunction()) const
Sort is a DOp, which sorts a given DIA according to the given compare_function.
Definition: sort.hpp:755
Type * get() const noexcept
return the enclosed pointer.
Specialized template class for ActionFuture which return void.
tag structure for Zip()
Definition: dia.hpp:78
auto Rebalance() const
Rebalance is a DOp, which rebalances a single DIA among all workers; in general, this operation is ne...
Definition: rebalance.hpp:123
A FunctionStack is a chain of functor that can be folded to a single functor (which is usually optimi...
double HyperLogLog() const
Compute the approximate number of distinct elements in the DIA.
Definition: hyperloglog.hpp:64
size_t next_dia_id()
deliver next DIA serial id
Definition: dia.hpp:1795
const struct VolatileKeyFlag< false > NoVolatileKeyTag
global const VolatileKeyFlag instance
Definition: dia.hpp:51
auto PrefixSum(const SumFunction &sum_function=SumFunction(), const ValueType &initial_element=ValueType()) const
PrefixSum is a DOp, which computes the prefix sum of all elements.
Definition: prefixsum.hpp:132
Future< size_t > SizeFuture() const
Lazily computes the total size of all elements across all workers.
Definition: size.hpp:110
auto ReduceToIndex(const KeyExtractor &key_extractor, const ReduceFunction &reduce_function, size_t size, const ValueType &neutral_element=ValueType(), const ReduceConfig &reduce_config=ReduceConfig()) const
ReduceToIndex is a DOp, which groups elements of the DIA with the key_extractor returning an unsigned...
tag structure for ReduceByKey(), and ReduceToIndex()
Definition: dia.hpp:42
auto ReducePair(const ReduceFunction &reduce_function, const ReduceConfig &reduce_config=ReduceConfig()) const
ReducePair is a DOp, which groups key-value-pairs in the input DIA by their key and reduces each key-...
size_t next_dia_id()
Returns next_dia_id_ to generate DIA::id_ serial.
Definition: context.hpp:388
ValueType Sum(const SumFunction &sum_function=SumFunction(), const ValueType &initial_value=ValueType()) const
Sum is an Action, which computes the sum of all elements globally.
Definition: sum.hpp:23
Context & ctx() const
Return context_ of DIANode, e.g. for creating new LOps and DOps.
Definition: dia.hpp:275
int value
Definition: gen_data.py:41
tag structure for Zip()
Definition: dia.hpp:70
size_t reference_count() const noexcept
Return the number of references to this object (for debugging)
void Print(const std::string &name) const
Print is an Action, which collects all data of the DIA at the worker 0 and prints using ostream seria...
Definition: print.hpp:48
common::JsonLogger logger_
Definition: context.hpp:459
common::FunctionTraits< Function > FunctionTraits
alias for convenience.
Definition: dia.hpp:147
auto Window(size_t window_size, const WindowFunction &window_function=WindowFunction()) const
Window is a DOp, which applies a window function to every k consecutive items in a DIA...
Definition: window.hpp:287
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
Definition: allocator.hpp:220
auto Merge(const SecondDIA &second_dia, const Comparator &comparator=Comparator()) const
Merge is a DOp, which merges two sorted DIAs to a single sorted DIA.
Definition: merge.hpp:717
void WriteLinesOne(const std::string &filepath) const
WriteLinesOne is an Action, which writes std::strings to a single output file.
Stack stack_
Definition: dia.hpp:1786
The return type class for all ActionFutures.
Definition: action_node.hpp:83
auto GroupByKey(const KeyExtractor &key_extractor, const GroupByFunction &groupby_function) const
GroupByKey is a DOp, which groups elements of the DIA by its key.
const struct LocationDetectionFlag< false > NoLocationDetectionTag
global const LocationDetectionFlag instance
Definition: dia.hpp:125
static constexpr bool stack_empty
boolean indication whether this FunctionStack is empty
Definition: dia.hpp:163
tag structure for Zip()
Definition: dia.hpp:86
const struct DuplicateDetectionFlag< true > DuplicateDetectionTag
global const DuplicateDetectionFlag instance
Definition: dia.hpp:109
auto ReduceByKey(const KeyExtractor &key_extractor, const ReduceFunction &reduce_function, const ReduceConfig &reduce_config=ReduceConfig()) const
ReduceByKey is a DOp, which groups elements of the DIA with the key_extractor and reduces each key-bu...
virtual void SetConsumeCounter(size_t counter)
Definition: dia_base.hpp:248
const struct DuplicateDetectionFlag< false > NoDuplicateDetectionTag
global const DuplicateDetectionFlag instance
Definition: dia.hpp:112
auto Map(const MapFunction &map_function) const
Map applies map_function : $A \to B$ to each item of a DIA and delivers a new DIA containing the returned values...
Definition: dia.hpp:341
auto GroupToIndex(const KeyExtractor &key_extractor, const GroupByFunction &groupby_function, const size_t size, const ValueOut &neutral_element=ValueOut()) const
GroupBy is a DOp, which groups elements of the DIA by its key.
auto FlatMap(const FlatmapFunction &flatmap_function) const
Each item of a DIA is expanded by the flatmap_function : $A \to \textrm{array}B$ to zero or more items of different type...
Definition: dia.hpp:437
DIA< ValueType > Cache() const
Create a CacheNode which contains all items of a DIA in calculated plain format.
Definition: cache.hpp:94
static const bool value
Definition: dia.hpp:44
tag structure for ReduceByKey()
Definition: dia.hpp:103
Future< void > WriteLinesOneFuture(const std::string &filepath) const
WriteLinesOne is an ActionFuture, which writes std::strings to a single output file.
auto BernoulliSample(double p) const
Each item of a DIA is copied into the output DIA with success probability p (an independent Bernoulli...
size_t id() const
Returns id_.
Definition: dia.hpp:281
Future< ValueType > SumFuture(const SumFunction &sum_function=SumFunction(), const ValueType &initial_value=ValueType()) const
Sum is an ActionFuture, which computes the sum of all elements globally.
Definition: sum.hpp:57
size_t node_refcount() const
Returns the number of references to the according DIANode.
Definition: dia.hpp:257
ValueType Min(const ValueType &initial_value=ValueType()) const
Min is an Action, which computes the minimum of all elements globally.
Definition: min.hpp:22
tag structure for Read()
Definition: dia.hpp:94
void WriteBinary(const std::string &filepath, size_t max_file_size=128 *1024 *1024) const
WriteBinary is a function, which writes a DIA to many files per worker.
size_t id_
DIA serial id for logging, matches DIANode::id_ for DOps.
Definition: dia.hpp:1789
DIA(DIANodePtr &&node)
Constructor of a new DIA with a real backing DIABase.
Definition: dia.hpp:219
ValueType Max(const ValueType &initial_value=ValueType()) const
Max is an Action, which computes the maximum of all elements globally.
Definition: max.hpp:22
auto FlatWindow(size_t window_size, const WindowFunction &window_function=WindowFunction()) const
FlatWindow is a DOp, which applies a window function to every k consecutive items in a DIA...
Definition: window.hpp:272
Future< ValueType > MaxFuture(const ValueType &initial_value=ValueType()) const
Max is an ActionFuture, which computes the maximum of all elements globally.
Definition: max.hpp:32
auto ZipWithIndex(const ZipFunction &zip_function) const
Zips each item of a DIA with its zero-based array index.
tag structure for Window() and FlatWindow()
Definition: dia.hpp:62
Future< void > WriteBinaryFuture(const std::string &filepath, size_t max_file_size=128 *1024 *1024) const
WriteBinary is a function, which writes a DIA to many files per worker.
tag structure for GroupByKey(), and InnerJoin()
Definition: dia.hpp:116
void AssertValid() const
Assert that the DIA is valid.
Definition: dia.hpp:178
size_t Size() const
Computes the total size of all elements across all workers.
Definition: size.hpp:100
virtual void IncConsumeCounter(size_t counter)
Definition: dia_base.hpp:230
const struct CutTag CutTag
global const CutTag instance
Definition: dia.hpp:75
Stack_ Stack
Type of this function stack.
Definition: dia.hpp:155
const struct NoRebalanceTag NoRebalanceTag
global const NoRebalanceTag instance
Definition: dia.hpp:91
const struct DisjointTag DisjointTag
global const DisjointTag instance
Definition: dia.hpp:67
static const bool value
Definition: dia.hpp:118
auto Union(const SecondDIA &second_dia) const
Union is a LOp, which creates the union of all items from any number of DIAs as a single DIA...
Definition: union.hpp:379
static constexpr size_t kNeverConsume
Never full consume.
Definition: dia_base.hpp:324
Future< ValueType > MinFuture(const ValueType &initial_value=ValueType()) const
Min is an ActionFuture, which computes the minimum of all elements globally.
Definition: min.hpp:32
Future< void > WriteLinesFuture(const std::string &filepath, size_t target_file_size=128 *1024 *1024) const
WriteLines is an ActionFuture, which writes std::strings to multiple output files.
DIA< ValueType > Collapse() const
Create a CollapseNode which is mainly used to collapse the LOp chain into a DIA<T> with an empty stac...
Definition: collapse.hpp:158
DIA(DIANodePtr &&node, const Stack &stack, size_t id, const char *label)
Constructor of a new DIA supporting move semantics of nodes.
Definition: dia.hpp:210