Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
dia.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/api/dia.hpp
3  *
4  * Interface for Operations, holds pointer to node and lambda from node to state
5  *
6  * Part of Project Thrill - http://project-thrill.org
7  *
8  * Copyright (C) 2015 Alexander Noe <[email protected]>
9  * Copyright (C) 2015 Sebastian Lamm <[email protected]>
10  * Copyright (C) 2015 Timo Bingmann <[email protected]>
11  * Copyright (C) 2015 Huyen Chau Nguyen <[email protected]>
12  *
13  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
14  ******************************************************************************/
15 
16 #pragma once
17 #ifndef THRILL_API_DIA_HEADER
18 #define THRILL_API_DIA_HEADER
19 
21 #include <thrill/api/context.hpp>
22 #include <thrill/api/dia_node.hpp>
26 
27 #include <cassert>
28 #include <functional>
29 #include <ostream>
30 #include <string>
31 #include <utility>
32 #include <vector>
33 
34 namespace thrill {
35 namespace api {
36 
37 //! \ingroup api_layer
38 //! \{
39 
//! Tag structure for ReduceByKey(), and ReduceToIndex(). The boolean template
//! argument is exposed as the compile-time constant `value`, so overloads can
//! be selected on VolatileKeyFlag<true> versus VolatileKeyFlag<false>.
template <bool Value>
struct VolatileKeyFlag {
    //! user-provided default constructor, so that the const global instances
    //! below can be declared without an initializer.
    VolatileKeyFlag() { }
    //! the flag value carried by this tag type
    static const bool value = Value;
};

//! global const VolatileKeyFlag instance
const struct VolatileKeyFlag<true> VolatileKeyTag;

//! global const VolatileKeyFlag instance
const struct VolatileKeyFlag<false> NoVolatileKeyTag;
52 
//! Tag structure for ReduceToIndex(). Empty type, used only to select an
//! overload.
struct SkipPreReducePhaseTag {
    //! user-provided default constructor, so that the const global instance
    //! below can be declared without an initializer.
    SkipPreReducePhaseTag() { }
};

//! global const SkipPreReducePhaseTag instance
const struct SkipPreReducePhaseTag SkipPreReducePhaseTag;
60 
//! Tag structure for Window() and FlatWindow(). Empty type, used only to
//! select an overload.
struct DisjointTag {
    //! user-provided default constructor, so that the const global instance
    //! below can be declared without an initializer.
    DisjointTag() { }
};

//! global const DisjointTag instance
const struct DisjointTag DisjointTag;
68 
//! Tag structure for Zip(). Empty type, used only to select an overload.
struct CutTag {
    //! user-provided default constructor, so that the const global instance
    //! below can be declared without an initializer.
    CutTag() { }
};

//! global const CutTag instance
const struct CutTag CutTag;
76 
//! Tag structure for Zip(). Empty type, used only to select an overload.
struct PadTag {
    //! user-provided default constructor, so that the const global instance
    //! below can be declared without an initializer.
    PadTag() { }
};

//! global const PadTag instance
const struct PadTag PadTag;
84 
//! Tag structure for Zip(). Empty type, used only to select an overload.
struct NoRebalanceTag {
    //! user-provided default constructor, so that the const global instance
    //! below can be declared without an initializer.
    NoRebalanceTag() { }
};

//! global const NoRebalanceTag instance
const struct NoRebalanceTag NoRebalanceTag;
92 
//! Tag structure for Read(). Empty type, used only to select an overload.
struct LocalStorageTag {
    //! user-provided default constructor, so that the const global instance
    //! below can be declared without an initializer.
    LocalStorageTag() { }
};

//! global const LocalStorageTag instance
const struct LocalStorageTag LocalStorageTag;
100 
//! Tag structure for ReduceByKey(). The boolean template argument is exposed
//! as the compile-time constant `value`.
template <bool Value>
struct DuplicateDetectionFlag {
    //! user-provided default constructor, so that the const global instances
    //! below can be declared without an initializer.
    DuplicateDetectionFlag() { }
    //! the flag value carried by this tag type
    static const bool value = Value;
};

// NOTE(review): the two instance declarations were missing from this listing;
// their names follow the VolatileKeyTag / NoVolatileKeyTag pattern -- confirm
// against the upstream header.

//! global const DuplicateDetectionFlag instance
const struct DuplicateDetectionFlag<true> DuplicateDetectionTag;

//! global const DuplicateDetectionFlag instance
const struct DuplicateDetectionFlag<false> NoDuplicateDetectionTag;
113 
//! Tag structure for GroupByKey(), and InnerJoin(). The boolean template
//! argument is exposed as the compile-time constant `value`.
template <bool Value>
struct LocationDetectionFlag {
    //! user-provided default constructor, so that the const global instances
    //! below can be declared without an initializer.
    LocationDetectionFlag() { }
    //! the flag value carried by this tag type
    static const bool value = Value;
};

// NOTE(review): the two instance declarations were missing from this listing;
// their names follow the VolatileKeyTag / NoVolatileKeyTag pattern -- confirm
// against the upstream header.

//! global const LocationDetectionFlag instance
const struct LocationDetectionFlag<true> LocationDetectionTag;

//! global const LocationDetectionFlag instance
const struct LocationDetectionFlag<false> NoLocationDetectionTag;
126 
127 /*!
128  * DIA is the interface between the user and the Thrill framework. A DIA can be
129  * imagined as an immutable array, even though the data does not need to be
130  * materialized. A DIA holds a pointer to a DIANode, which represents the state
131  * after the previous DOp or Action. Additionally, a DIA stores the local lambda
132  * function chain of type Stack, which can transform elements of the DIANode to
133  * elements of this DIA. DOps/Actions create a DIA and a new DIANode, to which
134  * the DIA links. LOps only create a new DIA, which links to the previous DIANode.
135  *
136  * \tparam ValueType Type of elements currently in this DIA.
137  * \tparam Stack Type of the function chain.
138  */
139 template <typename ValueType_,
140  typename Stack_ = tlx::FunctionStack<ValueType_> >
141 class DIA
142 {
143  friend class Context;
144 
145  //! alias for convenience.
146  template <typename Function>
147  using FunctionTraits = common::FunctionTraits<Function>;
148 
149 public:
150  //! type of the items virtually in the DIA, which is the type emitted by the
151  //! current LOp stack.
152  using ValueType = ValueType_;
153 
154  //! Type of this function stack
155  using Stack = Stack_;
156 
157  //! type of the items delivered by the DOp, and pushed down the function
158  //! stack towards the next nodes. If the function stack contains LOps nodes,
159  //! these may transform the type.
160  using StackInput = typename Stack::Input;
161 
162  //! boolean indication whether this FunctionStack is empty
163  static constexpr bool stack_empty = Stack::empty;
164 
165  //! type of pointer to the real node object implementation. This object has
166  //! base item type StackInput which is transformed by the function stack
167  //! lambdas further. But even pushing more lambdas does not change the stack
168  //! input type.
170 
171  //! default-constructor: invalid DIA
172  DIA() = default;
173 
174  //! Return whether the DIA is valid.
175  bool IsValid() const { return node_.get() != nullptr; }
176 
177  //! Assert that the DIA is valid.
178  void AssertValid() const { assert(IsValid()); }
179 
180  /*!
181  * Constructor of a new DIA with a pointer to a DIANode and a
182  * function chain from the DIANode to this DIA.
183  *
184  * \param node Pointer to the last DIANode, DOps and Actions create a new
185  * DIANode, LOps link to the DIANode of the previous DIA.
186  *
187  * \param stack Function stack consisting of functions between last DIANode
188  * and this DIA.
189  *
190  * \param id Serial id of DIA, which includes LOps
191  *
192  * \param label static string label of DIA.
193  */
194  DIA(const DIANodePtr& node, const Stack& stack,
195  size_t dia_id, const char* label)
196  : node_(node), stack_(stack),
197  dia_id_(dia_id), label_(label) { }
198 
199  /*!
200  * Constructor of a new DIA supporting move semantics of nodes.
201  *
202  * \param node Pointer to the last DIANode, DOps and Actions create a new
203  * DIANode, LOps link to the DIANode of the previous DIA.
204  *
205  * \param stack Function stack consisting of functions between last DIANode
206  * and this DIA.
207  *
208  * \param id Serial id of DIA, which includes LOps
209  *
210  * \param label static string label of DIA.
211  */
213  size_t dia_id, const char* label)
214  : node_(std::move(node)), stack_(stack),
215  dia_id_(dia_id), label_(label) { }
216 
217  /*!
218  * Constructor of a new DIA with a real backing DIABase.
219  *
220  * \param node Pointer to the last DIANode, DOps and Actions create a new
221  * DIANode, LOps link to the DIANode of the previous DIA.
222  */
223  explicit DIA(DIANodePtr&& node)
224  : DIA(std::move(node), tlx::FunctionStack<ValueType>(),
225  node->dia_id(), node->label()) { }
226 
227  /*!
228  * Copy-Constructor of a DIA with empty function chain from a DIA with
229  * a non-empty chain. The functionality of the chain is stored in a newly
230  * created LOpNode. The current DIA than points to this LOpNode. This
231  * is needed to support assignment operations between DIA's.
232  */
233 #ifdef THRILL_DOXYGEN_IGNORE
234  template <typename AnyStack>
235  DIA(const DIA<ValueType, AnyStack>& rhs);
236 #else
237  template <typename AnyStack>
238  DIA(const DIA<ValueType, AnyStack>& rhs)
239 #if __GNUC__ && !__clang__
240  // the attribute warning does not work with gcc?
241  __attribute__ ((warning( // NOLINT
242  "Casting to DIA creates LOpNode instead of inline chaining.\n"
243  "Consider whether you can use auto instead of DIA.")));
244 #elif __GNUC__ && __clang__
245  __attribute__ ((deprecated)); // NOLINT
246 #else
247  ; // NOLINT
248 #endif
249 #endif // THRILL_DOXYGEN_IGNORE
250 
251  //! \name Const Accessors
252  //! \{
253 
254  //! Returns a pointer to the according DIANode.
255  const DIANodePtr& node() const {
256  assert(IsValid());
257  return node_;
258  }
259 
260  //! Returns the number of references to the according DIANode.
261  size_t node_refcount() const {
262  assert(IsValid());
263  return node_->reference_count();
264  }
265 
266  //! Returns the stored function chain.
267  const Stack& stack() const {
268  assert(IsValid());
269  return stack_;
270  }
271 
272  //! Return context_ of DIANode, e.g. for creating new LOps and DOps
273  Context& context() const {
274  assert(IsValid());
275  return node_->context();
276  }
277 
278  //! Return context_ of DIANode, e.g. for creating new LOps and DOps
279  Context& ctx() const {
280  assert(IsValid());
281  return node_->context();
282  }
283 
284  //! Returns id_
285  size_t id() const { return dia_id_; }
286 
287  //! Returns label_
288  const char * label() const { return label_; }
289 
290  //! \}
291 
292  /*!
293  * Mark the referenced DIANode for keeping, which makes children not consume
294  * the data when executing. This does not create a new DIA, but returns the
295  * existing one.
296  */
297  const DIA& Keep(size_t increase = 1) const {
298  assert(IsValid());
299  if (node_->context().consume() && node_->consume_counter() == 0) {
300  die("Keep() called on "
301  << *node_ << " which was already consumed.");
302  }
303  node_->IncConsumeCounter(increase);
304  return *this;
305  }
306 
307  /*!
308  * Mark the referenced DIANode for keeping forever, which makes children not
309  * consume the data when executing. This does not create a new DIA, but
310  * returns the existing one.
311  */
312  const DIA& KeepForever() const {
313  assert(IsValid());
315  return *this;
316  }
317 
318  /*!
319  * Execute DIA's scope and parents such that this (Action)Node is
320  * Executed. This does not create a new DIA, but returns the existing one.
321  */
322  const DIA& Execute() const {
323  assert(IsValid());
324  node_->RunScope();
325  return *this;
326  }
327 
328  //! \name Local Operations (LOps)
329  //! \{
330 
331  /*!
332  * Map applies `map_function` : \f$ A \to B \f$ to each item of a DIA and
333  * delivers a new DIA contains the returned values, which may be of a
334  * different type.
335  *
336  * The function chain of the returned DIA is this DIA's stack_ chained with
337  * map_fn.
338  *
339  * \param map_function Map function of type MapFunction, which maps each
340  * element to an element of a possibly different type.
341  *
342  * \ingroup dia_lops
343  */
344  template <typename MapFunction>
345  auto Map(const MapFunction& map_function) const {
346  assert(IsValid());
347 
348  using MapArgument
349  = typename FunctionTraits<MapFunction>::template arg_plain<0>;
350  using MapResult
352  auto conv_map_function =
353  [map_function](const MapArgument& input, auto emit_func) {
354  emit_func(map_function(input));
355  };
356 
357  static_assert(
359  "MapFunction has the wrong input type");
360 
361  size_t new_id = context().next_dia_id();
362 
364  << "dia_id" << new_id
365  << "label" << "Map"
366  << "class" << "DIA"
367  << "event" << "create"
368  << "type" << "LOp"
369  << "parents" << (common::Array<size_t>{ dia_id_ });
370 
371  auto new_stack = stack_.push(conv_map_function);
373  node_, new_stack, new_id, "Map");
374  }
375 
376  /*!
377  * Each item of a DIA is tested using `filter_function` : \f$ A \to
378  * \textrm{bool} \f$ to determine whether it is copied into the output DIA
379  * or excluded.
380  *
381  * The function chain of the returned DIA is this DIA's stack_ chained with
382  * filter_function.
383  *
384  * \param filter_function Filter function of type FilterFunction, which maps
385  * each element to a boolean.
386  *
387  * \ingroup dia_lops
388  */
389  template <typename FilterFunction>
390  auto Filter(const FilterFunction& filter_function) const {
391  assert(IsValid());
392 
393  using FilterArgument
394  = typename FunctionTraits<FilterFunction>::template arg_plain<0>;
395  auto conv_filter_function =
396  [filter_function](const FilterArgument& input, auto emit_func) {
397  if (filter_function(input)) emit_func(input);
398  };
399 
400  static_assert(
402  "FilterFunction has the wrong input type");
403 
404  size_t new_id = context().next_dia_id();
405 
407  << "dia_id" << new_id
408  << "label" << "Filter"
409  << "class" << "DIA"
410  << "event" << "create"
411  << "type" << "LOp"
412  << "parents" << (common::Array<size_t>{ dia_id_ });
413 
414  auto new_stack = stack_.push(conv_filter_function);
416  node_, new_stack, new_id, "Filter");
417  }
418 
419  /*!
420  * \brief Each item of a DIA is expanded by the `flatmap_function` : \f$ A
421  * \to \textrm{array}(B) \f$ to zero or more items of different type, which
422  * are concatenated in the resulting DIA. The return type of
423  * `flatmap_function` must be specified as template parameter.
424  *
425  * FlatMap is a LOp, which maps this DIA according to the flatmap_function
426  * given by the user. The flatmap_function maps each element to elements of
427  * a possibly different type. The flatmap_function has an emitter function
428  * as it's second parameter. This emitter is called once for each element to
429  * be emitted. The function chain of the returned DIA is this DIA's stack_
430  * chained with flatmap_function.
431 
432  * \tparam ResultType ResultType of the FlatmapFunction, if different from
433  * item type of DIA.
434  *
435  * \param flatmap_function Map function of type FlatmapFunction, which maps
436  * each element to elements of a possibly different type.
437  *
438  * \ingroup dia_lops
439  */
440  template <typename ResultType = ValueType, typename FlatmapFunction>
441  auto FlatMap(const FlatmapFunction& flatmap_function) const {
442  assert(IsValid());
443 
444  size_t new_id = context().next_dia_id();
445 
447  << "dia_id" << new_id
448  << "label" << "FlatMap"
449  << "class" << "DIA"
450  << "event" << "create"
451  << "type" << "LOp"
452  << "parents" << (common::Array<size_t>{ dia_id_ });
453 
454  auto new_stack = stack_.push(flatmap_function);
456  node_, new_stack, new_id, "FlatMap");
457  }
458 
459  /*!
460  * Each item of a DIA is copied into the output DIA with success probability
461  * p (an independent Bernoulli trial).
462  *
463  * \ingroup dia_lops
464  */
465  auto BernoulliSample(double p) const;
466 
467  /*!
468  * Union is a LOp, which creates the union of all items from any number of
469  * DIAs as a single DIA, where the items are in an arbitrary order. All
470  * input DIAs must contain the same type, which is also the output DIA's
471  * type.
472  *
473  * The Union operation concatenates all _local_ pieces of a DIA, no
474  * rebalancing is performed, and no communication is needed.
475  *
476  * \ingroup dia_lops
477  */
478  template <typename SecondDIA>
479  auto Union(const SecondDIA& second_dia) const;
480 
481  //! \}
482 
483  //! \name Actions
484  //! \{
485 
486  /*!
487  * Computes the total size of all elements across all workers.
488  *
489  * \ingroup dia_actions
490  */
491  size_t Size() const;
492 
493  /*!
494  * Lazily computes the total size of all elements across all workers.
495  *
496  * \ingroup dia_actions
497  */
498  Future<size_t> SizeFuture() const;
499 
500  /*!
501  * Returns the whole DIA in an std::vector on each worker. This is only for
502  * testing purposes and should not be used on large datasets.
503  *
504  * \ingroup dia_actions
505  */
506  std::vector<ValueType> AllGather() const;
507 
508  /**
509  * \brief AllGather is an Action, which returns the whole DIA in an
510  * std::vector on each worker. This is only for testing purposes and should
511  * not be used on large datasets.
512  *
513  * \ingroup dia_actions
514  */
515  void AllGather(std::vector<ValueType>* out_vector) const;
516 
517  /*!
518  * Returns the whole DIA in an std::vector on each worker. This is only for
519  * testing purposes and should not be used on large datasets.
520  *
521  * \ingroup dia_actions
522  */
524 
525  /*!
526  * Print is an Action, which collects all data of the DIA at the worker 0
527  * and prints using ostream serialization. It is implemented using Gather().
528  *
529  * \ingroup dia_actions
530  */
531  void Print(const std::string& name) const;
532 
533  /*!
534  * Print is an Action, which collects all data of the DIA at the worker 0
535  * and prints using ostream serialization. It is implemented using Gather().
536  *
537  * \ingroup dia_actions
538  */
539  void Print(const std::string& name, std::ostream& out) const;
540 
541  /*!
542  * Gather is an Action, which collects all data of the DIA into a vector at
543  * the given worker. This should only be done if the received data can fit
544  * into RAM of the one worker.
545  *
546  * \ingroup dia_actions
547  */
548  std::vector<ValueType> Gather(size_t target_id = 0) const;
549 
550  /*!
551  * Gather is an Action, which collects all data of the DIA into a vector at
552  * the given worker. This should only be done if the received data can fit
553  * into RAM of the one worker.
554  *
555  * \ingroup dia_actions
556  */
557  void Gather(size_t target_id, std::vector<ValueType>* out_vector) const;
558 
559  /*!
560  * Select up to sample_size items uniformly at random and return a new
561  * DIA<T>.
562  */
563  auto Sample(size_t sample_size) const;
564 
565  /*!
566  * AllReduce is an Action, which computes the reduction sum of all elements
567  * globally and delivers the same value on all workers.
568  *
569  * \param reduce_function Reduce function.
570  *
571  * \param initial_value Initial value of the reduction.
572  *
573  * \ingroup dia_actions
574  */
575  template <typename ReduceFunction>
576  ValueType AllReduce(const ReduceFunction& reduce_function,
577  const ValueType& initial_value = ValueType()) const;
578 
579  /*!
580  * AllReduce is an ActionFuture, which computes the reduction sum of
581  * all elements globally and delivers the same value on all workers.
582  *
583  * \param reduce_function Reduce function.
584  *
585  * \param initial_value Initial value of the reduction.
586  *
587  * \ingroup dia_actions
588  */
589  template <typename ReduceFunction>
591  const ReduceFunction& reduce_function,
592  const ValueType& initial_value = ValueType()) const;
593 
594  /*!
595  * Sum is an Action, which computes the sum of all elements globally.
596  *
597  * \param sum_function Sum function.
598  *
599  * \param initial_value Initial value of the sum.
600  *
601  * \ingroup dia_actions
602  */
603  template <typename SumFunction = std::plus<ValueType> >
604  ValueType Sum(const SumFunction& sum_function = SumFunction(),
605  const ValueType& initial_value = ValueType()) const;
606 
607  /*!
608  * Sum is an ActionFuture, which computes the sum of all elements
609  * globally.
610  *
611  * \param sum_function Sum function.
612  *
613  * \param initial_value Initial value of the sum.
614  *
615  * \ingroup dia_actions
616  */
617  template <typename SumFunction = std::plus<ValueType> >
619  const SumFunction& sum_function = SumFunction(),
620  const ValueType& initial_value = ValueType()) const;
621 
622  /*!
623  * Min is an Action, which computes the minimum of all elements globally.
624  *
625  * \param initial_value Initial value of the min.
626  *
627  * \ingroup dia_actions
628  */
629  ValueType Min(const ValueType& initial_value = ValueType()) const;
630 
631  /*!
632  * Min is an ActionFuture, which computes the minimum of all elements
633  * globally.
634  *
635  * \param initial_value Initial value of the min.
636  *
637  * \ingroup dia_actions
638  */
640  const ValueType& initial_value = ValueType()) const;
641 
642  /*!
643  * Max is an Action, which computes the maximum of all elements globally.
644  *
645  * \param initial_value Initial value of the max.
646  *
647  * \ingroup dia_actions
648  */
649  ValueType Max(const ValueType& initial_value = ValueType()) const;
650 
651  /*!
652  * Max is an ActionFuture, which computes the maximum of all elements
653  * globally.
654  *
655  * \param initial_value Initial value of the max.
656  *
657  * \ingroup dia_actions
658  */
660  const ValueType& initial_value = ValueType()) const;
661 
662  /*!
663  * Compute the approximate number of distinct elements in the DIA.
664  *
665  * \tparam p Number of bits to use for index. Should be between 4 and 16.
666  * \ingroup dia_actions
667  */
668  template <size_t p>
669  double HyperLogLog() const;
670 
671  /*!
672  * WriteLinesOne is an Action, which writes std::strings to a single output
673  * file.
674  *
675  * \param filepath Destination of the output file.
676  *
677  * \ingroup dia_actions
678  */
679  void WriteLinesOne(const std::string& filepath) const;
680 
681  /*!
682  * WriteLinesOne is an ActionFuture, which writes std::strings to a single
683  * output file.
684  *
685  * \param filepath Destination of the output file.
686  *
687  * \ingroup dia_actions
688  */
690  const std::string& filepath) const;
691 
692  /*!
693  * WriteLines is an Action, which writes std::strings to multiple output
694  * files. Strings are written using fstream with a newline after each
695  * entry. Each worker creates its individual file.
696  *
697  * \param filepath Destination of the output file. This filepath must
698  * contain two special substrings: "$$$$$" is replaced by the worker id and
699  * "#####" will be replaced by the file chunk id. The last occurrences of
700  * "$" and "#" are replaced, otherwise "$$$$" and/or "##########" are
701  * automatically appended.
702  *
703  * \param target_file_size target size of each individual file.
704  *
705  * \ingroup dia_actions
706  */
707  void WriteLines(const std::string& filepath,
708  size_t target_file_size = 128* 1024* 1024) const;
709 
710  /*!
711  * WriteLines is an ActionFuture, which writes std::strings to multiple
712  * output files. Strings are written using fstream with a newline after each
713  * entry. Each worker creates its individual file.
714  *
715  * \param filepath Destination of the output file. This filepath must
716  * contain two special substrings: "$$$$$" is replaced by the worker id and
717  * "#####" will be replaced by the file chunk id. The last occurrences of
718  * "$" and "#" are replaced, otherwise "$$$$" and/or "##########" are
719  * automatically appended.
720  *
721  * \param target_file_size target size of each individual file.
722  *
723  * \ingroup dia_actions
724  */
726  const std::string& filepath,
727  size_t target_file_size = 128* 1024* 1024) const;
728 
729  /*!
730  * WriteBinary is a function, which writes a DIA to many files per
731  * worker. The input DIA can be recreated with ReadBinary and equal
732  * filepath.
733  *
734  * \param filepath Destination of the output file. This filepath must
735  * contain two special substrings: "$$$$$" is replaced by the worker id and
736  * "#####" will be replaced by the file chunk id. The last occurrences of
737  * "$" and "#" are replaced, otherwise "$$$$" and/or "##########" are
738  * automatically appended.
739  *
740  * \param max_file_size size limit of individual file.
741  *
742  * \ingroup dia_actions
743  */
744  void WriteBinary(const std::string& filepath,
745  size_t max_file_size = 128* 1024* 1024) const;
746 
747  /*!
748  * WriteBinary is a function, which writes a DIA to many files per
749  * worker. The input DIA can be recreated with ReadBinary and equal
750  * filepath.
751  *
752  * \param filepath Destination of the output file. This filepath must
753  * contain two special substrings: "$$$$$" is replaced by the worker id and
754  * "#####" will be replaced by the file chunk id. The last occurrences of
755  * "$" and "#" are replaced, otherwise "$$$$" and/or "##########" are
756  * automatically appended.
757  *
758  * \param max_file_size size limit of individual file.
759  *
760  * \ingroup dia_actions
761  */
763  const std::string& filepath,
764  size_t max_file_size = 128* 1024* 1024) const;
765 
766  //! \}
767 
768  //! \name Distributed Operations (DOps)
769  //! \{
770 
771  /*!
772  * ReduceByKey is a DOp, which groups elements of the DIA with the
773  * key_extractor and reduces each key-bucket to a single element using the
774  * associative reduce_function. The reduce_function defines how two elements
775  * can be reduced to a single element of equal type.
776  *
777  * The key of the reduced element has to be equal to the keys of the input
778  * elements. Since ReduceBy is a DOp, it creates a new DIANode. The DIA
779  * returned by Reduce links to this newly created DIANode. The stack_ of the
780  * returned DIA consists of the PostOp of Reduce, as a reduced element can
781  * directly be chained to the following LOps.
782  *
783  * \param key_extractor Key extractor function, which maps each element to a
784  * key of possibly different type.
785  *
786  * \tparam ReduceFunction Type of the reduce_function. This is a function
787  * reducing two elements of L's result type to a single element of equal
788  * type.
789  *
790  * \param reduce_function Reduce function, which defines how the key buckets
791  * are reduced to a single element. This function is applied associative but
792  * not necessarily commutative.
793  *
794  * \param reduce_config Reduce configuration.
795  *
796  * \ingroup dia_dops
797  */
798  template <typename KeyExtractor, typename ReduceFunction,
799  typename ReduceConfig = class DefaultReduceConfig>
800  auto ReduceByKey(
801  const KeyExtractor& key_extractor,
802  const ReduceFunction& reduce_function,
803  const ReduceConfig& reduce_config = ReduceConfig()) const;
804 
805  /*!
806  * ReduceByKey is a DOp, which groups elements of the DIA with the
807  * key_extractor and reduces each key-bucket to a single element using the
808  * associative reduce_function. The reduce_function defines how two elements
809  * can be reduced to a single element of equal type.
810  *
811  * The key of the reduced element has to be equal to the keys of the input
812  * elements. Since ReduceBy is a DOp, it creates a new DIANode. The DIA
813  * returned by Reduce links to this newly created DIANode. The stack_ of the
814  * returned DIA consists of the PostOp of Reduce, as a reduced element can
815  * directly be chained to the following LOps.
816  *
817  * \param key_extractor Key extractor function, which maps each element to a
818  * key of possibly different type.
819  *
820  * \tparam ReduceFunction Type of the reduce_function. This is a function
821  * reducing two elements of L's result type to a single element of equal
822  * type.
823  *
824  * \param reduce_function Reduce function, which defines how the key buckets
825  * are reduced to a single element. This function is applied associative but
826  * not necessarily commutative.
827  *
828  * \param reduce_config Reduce configuration.
829  *
830  * \param key_hash_function Function to hash keys extracted by KeyExtractor.
831  *
832  * \ingroup dia_dops
833  */
834  template <typename KeyExtractor, typename ReduceFunction,
835  typename ReduceConfig, typename KeyHashFunction>
836  auto ReduceByKey(
837  const KeyExtractor& key_extractor,
838  const ReduceFunction& reduce_function,
839  const ReduceConfig& reduce_config,
840  const KeyHashFunction& key_hash_function) const;
841 
842  /*!
843  * ReduceByKey is a DOp, which groups elements of the DIA with the
844  * key_extractor and reduces each key-bucket to a single element using the
845  * associative reduce_function. The reduce_function defines how two elements
846  * can be reduced to a single element of equal type.
847  *
848  * The key of the reduced element has to be equal to the keys of the input
849  * elements. Since ReduceBy is a DOp, it creates a new DIANode. The DIA
850  * returned by Reduce links to this newly created DIANode. The stack_ of the
851  * returned DIA consists of the PostOp of Reduce, as a reduced element can
852  * directly be chained to the following LOps.
853  *
854  * \param key_extractor Key extractor function, which maps each element to a
855  * key of possibly different type.
856  *
857  * \tparam ReduceFunction Type of the reduce_function. This is a function
858  * reducing two elements of L's result type to a single element of equal
859  * type.
860  *
861  * \param reduce_function Reduce function, which defines how the key buckets
862  * are reduced to a single element. This function is applied associative but
863  * not necessarily commutative.
864  *
865  * \param reduce_config Reduce configuration.
866  *
867  * \param key_hash_function Function to hash keys extracted by KeyExtractor.
868  *
869  * \param key_equal_function Function to compare keys in reduce hash tables.
870  *
871  * \ingroup dia_dops
872  */
873  template <typename KeyExtractor, typename ReduceFunction,
874  typename ReduceConfig,
875  typename KeyHashFunction, typename KeyEqualFunction>
876  auto ReduceByKey(
877  const KeyExtractor& key_extractor,
878  const ReduceFunction& reduce_function,
879  const ReduceConfig& reduce_config,
880  const KeyHashFunction& key_hash_function,
881  const KeyEqualFunction& key_equal_function) const;
882 
883  /*!
884  * ReduceByKey is a DOp, which groups elements of the DIA with the
885  * key_extractor and reduces each key-bucket to a single element using the
886  * associative reduce_function. The reduce_function defines how two elements
887  * can be reduced to a single element of equal type.
888  *
889  * In contrast to ReduceBy, the reduce_function is allowed to change the key
890  * (Example: Integers with modulo function as key_extractor). Creates
891  * overhead as both key and value have to be sent in shuffle step. Since
892  * ReduceByKey is a DOp, it creates a new DIANode. The DIA returned by
893  * Reduce links to this newly created DIANode. The stack_ of the returned
894  * DIA consists of the PostOp of Reduce, as a reduced element can directly
895  * be chained to the following LOps.
896  *
897  * \param volatile_key_flag tag
898  *
899  * \param key_extractor Key extractor function, which maps each element to a
900  * key of possibly different type.
901  *
902  * \tparam ReduceFunction Type of the reduce_function. This is a function
903  * reducing two elements of L's result type to a single element of equal
904  * type.
905  *
906  * \param reduce_function Reduce function, which defines how the key buckets
907  * are reduced to a single element. This function is applied associative but
908  * not necessarily commutative.
909  *
910  * \param reduce_config Reduce configuration.
911  *
912  * \param key_hash_function Function to hash keys extracted by KeyExtractor.
913  *
914  * \param key_equal_function Function to compare keys in reduce hash tables.
915  *
916  * \ingroup dia_dops
917  */
918  template <bool VolatileKeyValue,
919  typename KeyExtractor, typename ReduceFunction,
920  typename ReduceConfig = class DefaultReduceConfig,
921  typename KeyHashFunction =
922  std::hash<typename FunctionTraits<KeyExtractor>::result_type>,
923  typename KeyEqualFunction =
924  std::equal_to<typename FunctionTraits<KeyExtractor>::result_type> >
925  auto ReduceByKey(
927  const KeyExtractor& key_extractor,
928  const ReduceFunction& reduce_function,
929  const ReduceConfig& reduce_config = ReduceConfig(),
930  const KeyHashFunction& key_hash_function = KeyHashFunction(),
931  const KeyEqualFunction& key_equal_function = KeyEqualFunction()) const;
932 
933  /*!
934  * ReduceByKey is a DOp, which groups elements of the DIA with the
935  * key_extractor and reduces each key-bucket to a single element using the
936  * associative reduce_function. The reduce_function defines how two elements
937  * can be reduced to a single element of equal type.
938  *
939  * In contrast to ReduceBy, the reduce_function is allowed to change the key
940  * (Example: Integers with modulo function as key_extractor). This creates
941  * overhead as both key and value have to be sent in the shuffle step. Since
942  * ReduceByKey is a DOp, it creates a new DIANode. The DIA returned by
943  * ReduceByKey links to this newly created DIANode. The stack_ of the
944  * returned DIA consists of the PostOp of ReduceByKey, as a reduced element
945  * can directly be chained to the following LOps.
946  *
947  * \param duplicate_detection_flag Tag argument; presumably the type from
     * which DuplicateDetectionValue is deduced. NOTE(review): no such parameter
     * is visible in the declaration below (listing line 976 is absent between
     * 975 and 977) -- confirm against the original header.
948  *
949  * \param key_extractor Key extractor function, which maps each element to a
950  * key of possibly different type.
951  *
952  * \tparam ReduceFunction Type of the reduce_function. This is a function
953  * reducing two elements of L's result type to a single element of equal
954  * type.
955  *
956  * \param reduce_function Reduce function, which defines how the key buckets
957  * are reduced to a single element. This function is applied associatively
958  * but not necessarily commutatively.
959  *
960  * \param reduce_config Reduce configuration. Defaults to a
     * default-constructed ReduceConfig.
961  *
962  * \param key_hash_function Function to hash keys extracted by KeyExtractor.
     * Defaults to std::hash of the key_extractor's result type.
963  *
964  * \param key_equal_function Function to compare keys in reduce hash tables.
     * Defaults to std::equal_to of the key_extractor's result type.
965  *
966  * \ingroup dia_dops
967  */
968  template <bool DuplicateDetectionValue,
969  typename KeyExtractor, typename ReduceFunction,
970  typename ReduceConfig = class DefaultReduceConfig,
971  typename KeyHashFunction =
972  std::hash<typename FunctionTraits<KeyExtractor>::result_type>,
973  typename KeyEqualFunction =
974  std::equal_to<typename FunctionTraits<KeyExtractor>::result_type> >
975  auto ReduceByKey(
977  const KeyExtractor& key_extractor,
978  const ReduceFunction& reduce_function,
979  const ReduceConfig& reduce_config = ReduceConfig(),
980  const KeyHashFunction& key_hash_function = KeyHashFunction(),
981  const KeyEqualFunction& key_equal_function = KeyEqualFunction()) const;
982 
983  /*!
984  * ReduceByKey is a DOp, which groups elements of the DIA with the
985  * key_extractor and reduces each key-bucket to a single element using the
986  * associative reduce_function. The reduce_function defines how two elements
987  * can be reduced to a single element of equal type.
988  *
989  * In contrast to ReduceBy, the reduce_function is allowed to change the key
990  * (Example: Integers with modulo function as key_extractor). This creates
991  * overhead as both key and value have to be sent in the shuffle step. Since
992  * ReduceByKey is a DOp, it creates a new DIANode. The DIA returned by
993  * ReduceByKey links to this newly created DIANode. The stack_ of the
994  * returned DIA consists of the PostOp of ReduceByKey, as a reduced element
995  * can directly be chained to the following LOps.
996  *
     * NOTE(review): listing lines 1025 and 1026 are absent from the declaration
     * below. They presumably held the two tag parameters from which
     * VolatileKeyValue and DuplicateDetectionValue are deduced -- confirm
     * against the original header.
     *
997  * \param key_extractor Key extractor function, which maps each element to a
998  * key of possibly different type.
999  *
1000  * \tparam ReduceFunction Type of the reduce_function. This is a function
1001  * reducing two elements of L's result type to a single element of equal
1002  * type.
1003  *
1004  * \param reduce_function Reduce function, which defines how the key buckets
1005  * are reduced to a single element. This function is applied associatively
1006  * but not necessarily commutatively.
1007  *
1008  * \param reduce_config Reduce configuration. Defaults to a
      * default-constructed ReduceConfig.
1009  *
1010  * \param key_hash_function Function to hash keys extracted by KeyExtractor.
      * Defaults to std::hash of the key_extractor's result type.
1011  *
1012  * \param key_equal_function Function to compare keys in reduce hash tables.
      * Defaults to std::equal_to of the key_extractor's result type.
1013  *
1014  * \ingroup dia_dops
1015  */
1016  template <bool VolatileKeyValue,
1017  bool DuplicateDetectionValue,
1018  typename KeyExtractor, typename ReduceFunction,
1019  typename ReduceConfig = class DefaultReduceConfig,
1020  typename KeyHashFunction =
1021  std::hash<typename FunctionTraits<KeyExtractor>::result_type>,
1022  typename KeyEqualFunction =
1023  std::equal_to<typename FunctionTraits<KeyExtractor>::result_type> >
1024  auto ReduceByKey(
1027  const KeyExtractor& key_extractor,
1028  const ReduceFunction& reduce_function,
1029  const ReduceConfig& reduce_config = ReduceConfig(),
1030  const KeyHashFunction& key_hash_function = KeyHashFunction(),
1031  const KeyEqualFunction& key_equal_function = KeyEqualFunction()) const;
1032 
1033  /*!
1034  * ReducePair is a DOp, which groups key-value-pairs in the input DIA by
1035  * their key and reduces each key-bucket to a single element using the
1036  * associative reduce_function. The reduce_function defines how two elements
1037  * can be reduced to a single element of equal type. The reduce_function is
1038  * allowed to change the key. Since ReducePair is a DOp, it creates a new
1039  * DIANode. The DIA returned by ReducePair links to this newly created
1040  * DIANode. The stack_ of the returned DIA consists of the PostOp of
1041  * ReducePair, as a reduced element can directly be chained to the following
      * LOps.
1042  *
1043  * \tparam ReduceFunction Type of the reduce_function. This is a function
1044  * reducing two elements of L's result type to a single element of equal
1045  * type.
1046  *
1047  * \param reduce_function Reduce function, which defines how the key buckets
1048  * are reduced to a single element. This function is applied associatively
1049  * but not necessarily commutatively.
1050  *
1051  * \param reduce_config Reduce configuration. Defaults to a
      * default-constructed ReduceConfig.
1052  *
1053  * \ingroup dia_dops
1054  */
1055  template <typename ReduceFunction,
1056  typename ReduceConfig = class DefaultReduceConfig>
1057  auto ReducePair(
1058  const ReduceFunction& reduce_function,
1059  const ReduceConfig& reduce_config = ReduceConfig()) const;
1060 
1061  /*!
1062  * ReducePair is a DOp, which groups key-value-pairs in the input DIA by
1063  * their key and reduces each key-bucket to a single element using the
1064  * associative reduce_function. The reduce_function defines how two elements
1065  * can be reduced to a single element of equal type. The reduce_function is
1066  * allowed to change the key. Since ReducePair is a DOp, it creates a new
1067  * DIANode. The DIA returned by ReducePair links to this newly created
1068  * DIANode. The stack_ of the returned DIA consists of the PostOp of
1069  * ReducePair, as a reduced element can directly be chained to the following
      * LOps.
1070  *
1071  * \tparam ReduceFunction Type of the reduce_function. This is a function
1072  * reducing two elements of L's result type to a single element of equal
1073  * type.
1074  *
1075  * \param reduce_function Reduce function, which defines how the key buckets
1076  * are reduced to a single element. This function is applied associatively
1077  * but not necessarily commutatively.
1078  *
1079  * \param reduce_config Reduce configuration. Unlike the two-argument
      * overload, this one has no default for it.
1080  *
1081  * \param key_hash_function Function to hash the keys of the key-value pairs
      * (this overload takes no key extractor).
1082  *
1083  * \ingroup dia_dops
1084  */
1085  template <typename ReduceFunction, typename ReduceConfig,
1086  typename KeyHashFunction>
1087  auto ReducePair(
1088  const ReduceFunction& reduce_function,
1089  const ReduceConfig& reduce_config,
1090  const KeyHashFunction& key_hash_function) const;
1091 
1092  /*!
1093  * ReducePair is a DOp, which groups key-value-pairs in the input DIA by
1094  * their key and reduces each key-bucket to a single element using the
1095  * associative reduce_function. The reduce_function defines how two elements
1096  * can be reduced to a single element of equal type. The reduce_function is
1097  * allowed to change the key. Since ReducePair is a DOp, it creates a new
1098  * DIANode. The DIA returned by ReducePair links to this newly created
1099  * DIANode. The stack_ of the returned DIA consists of the PostOp of
1100  * ReducePair, as a reduced element can directly be chained to the following
      * LOps.
1101  *
1102  * \tparam ReduceFunction Type of the reduce_function. This is a function
1103  * reducing two elements of L's result type to a single element of equal
1104  * type.
1105  *
1106  * \param reduce_function Reduce function, which defines how the key buckets
1107  * are reduced to a single element. This function is applied associatively
1108  * but not necessarily commutatively.
1109  *
1110  * \param reduce_config Reduce configuration. No default in this overload.
1111  *
1112  * \param key_hash_function Function to hash the keys of the key-value pairs
      * (this overload takes no key extractor).
1113  *
1114  * \param key_equal_function Function to compare keys in reduce hash tables.
1115  *
1116  * \ingroup dia_dops
1117  */
1118  template <typename ReduceFunction, typename ReduceConfig,
1119  typename KeyHashFunction, typename KeyEqualFunction>
1120  auto ReducePair(
1121  const ReduceFunction& reduce_function,
1122  const ReduceConfig& reduce_config,
1123  const KeyHashFunction& key_hash_function,
1124  const KeyEqualFunction& key_equal_function) const;
1125 
1126  /*!
1127  * ReducePair is a DOp, which groups key-value-pairs in the input DIA by
1128  * their key and reduces each key-bucket to a single element using the
1129  * associative reduce_function. The reduce_function defines how two elements
1130  * can be reduced to a single element of equal type. The reduce_function is
1131  * allowed to change the key. Since ReducePair is a DOp, it creates a new
1132  * DIANode. The DIA returned by ReducePair links to this newly created
1133  * DIANode. The stack_ of the returned DIA consists of the PostOp of
1134  * ReducePair, as a reduced element can directly be chained to the following
      * LOps.
1135  *
      * NOTE(review): listing line 1159 is absent from the declaration below. It
      * presumably held the tag parameter from which DuplicateDetectionValue is
      * deduced -- confirm against the original header.
      *
      * NOTE(review): KeyHashFunction and KeyEqualFunction have no default
      * template argument, and a template parameter is not deduced from a
      * default function argument. The defaults of key_hash_function and
      * key_equal_function are therefore only usable when those types are
      * spelled out explicitly -- confirm whether that is intended.
      *
1136  * \tparam ReduceFunction Type of the reduce_function. This is a function
1137  * reducing two elements of L's result type to a single element of equal
1138  * type.
1139  *
1140  * \param reduce_function Reduce function, which defines how the key buckets
1141  * are reduced to a single element. This function is applied associatively
1142  * but not necessarily commutatively.
1143  *
1144  * \param reduce_config Reduce configuration. Defaults to a
      * default-constructed ReduceConfig.
1145  *
1146  * \param key_hash_function Function to hash the keys of the key-value pairs
      * (this overload takes no key extractor).
1147  *
1148  * \param key_equal_function Function to compare keys in reduce hash tables.
1149  *
1150  * \ingroup dia_dops
1151  */
1152  template <bool DuplicateDetectionValue,
1153  typename ReduceFunction,
1154  typename ReduceConfig = class DefaultReduceConfig,
1155  typename KeyHashFunction,
1156  typename KeyEqualFunction
1157  >
1158  auto ReducePair(
1160  const ReduceFunction& reduce_function,
1161  const ReduceConfig& reduce_config = ReduceConfig(),
1162  const KeyHashFunction& key_hash_function = KeyHashFunction(),
1163  const KeyEqualFunction& key_equal_function = KeyEqualFunction()) const;
1164 
1165  /*!
1166  * ReduceToIndex is a DOp, which groups elements of the DIA with the
1167  * key_extractor returning an unsigned integer and reduces each key-bucket
1168  * to a single element using the associative reduce_function. In contrast
1169  * to ReduceBy, ReduceToIndex returns a DIA in a defined order, which has
1170  * the reduced element with key i in position i.
1171  *
1172  * The reduce_function defines how two elements can be reduced to a single
1173  * element of equal type. The key of the reduced element has to be equal to
1174  * the keys of the input elements. Since ReduceToIndex is a DOp, it creates
1175  * a new DIANode. The DIA returned by ReduceToIndex links to this newly
1176  * created DIANode. The stack_ of the returned DIA consists of the PostOp of
1177  * ReduceToIndex, as a reduced element can directly be chained to the
1178  * following LOps.
1179  *
1180  * \param key_extractor Key extractor function, which maps each element to
1181  * its index key (an unsigned integer, see above).
1182  *
1183  * \tparam ReduceFunction Type of the reduce_function. This is a function
1184  * reducing two elements of L's result type to a single element of equal
1185  * type.
1186  *
1187  * \param reduce_function Reduce function, which defines how the key buckets
1188  * are reduced to a single element. This function is applied associatively
1189  * but not necessarily commutatively.
1190  *
1191  * \param size Resulting DIA size. Consequently, the key_extractor function
1192  * must always return a value < size for any element in the input DIA.
1193  *
1194  * \param neutral_element Item value with which to start the reduction in
1195  * each array cell. Defaults to a value-initialized ValueType.
1196  *
1197  * \param reduce_config Reduce configuration. Defaults to a
      * default-constructed ReduceConfig.
1198  *
1199  * \ingroup dia_dops
1200  */
1201  template <typename KeyExtractor, typename ReduceFunction,
1202  typename ReduceConfig = class DefaultReduceToIndexConfig>
1203  auto ReduceToIndex(
1204  const KeyExtractor& key_extractor,
1205  const ReduceFunction& reduce_function,
1206  size_t size,
1207  const ValueType& neutral_element = ValueType(),
1208  const ReduceConfig& reduce_config = ReduceConfig()) const;
1209 
1210  /*!
1211  * ReduceToIndex is a DOp, which groups elements of the DIA with the
1212  * key_extractor returning an unsigned integer and reduces each key-bucket
1213  * to a single element using the associative reduce_function. In contrast
1214  * to ReduceByKey, ReduceToIndex returns a DIA in a defined order, which has
1215  * the reduced element with key i in position i. The reduce_function
1216  * defines how two elements can be reduced to a single element of equal
1217  * type.
1218  *
1219  * ReduceToIndex is the equivalent of ReduceByKey, as the
1220  * reduce_function is allowed to change the key. Since ReduceToIndex
1221  * is a DOp, it creates a new DIANode. The DIA returned by ReduceToIndex
1222  * links to this newly created DIANode. The stack_ of the returned DIA
1223  * consists of the PostOp of ReduceToIndex, as a reduced element can
1224  * directly be chained to the following LOps.
1225  *
      * NOTE(review): listing line 1251 is absent from the declaration below. It
      * presumably held the VolatileKeyFlag<VolatileKeyValue> tag parameter from
      * which VolatileKeyValue is deduced -- confirm against the original header.
      *
1226  * \param key_extractor Key extractor function, which maps each element to
1227  * its index key (an unsigned integer, see above).
1228  *
1229  * \tparam ReduceFunction Type of the reduce_function. This is a function
1230  * reducing two elements of L's result type to a single element of equal
1231  * type.
1232  *
1233  * \param reduce_function Reduce function, which defines how the key buckets
1234  * are reduced to a single element. This function is applied associatively
1235  * but not necessarily commutatively.
1236  *
1237  * \param size Resulting DIA size. Consequently, the key_extractor function
1238  * must always return a value < size for any element in the input DIA.
1239  *
1240  * \param neutral_element Item value with which to start the reduction in
1241  * each array cell. Defaults to a value-initialized ValueType.
1242  *
1243  * \param reduce_config Reduce configuration. Defaults to a
      * default-constructed ReduceConfig.
1244  *
1245  * \ingroup dia_dops
1246  */
1247  template <bool VolatileKeyValue,
1248  typename KeyExtractor, typename ReduceFunction,
1249  typename ReduceConfig = class DefaultReduceToIndexConfig>
1250  auto ReduceToIndex(
1252  const KeyExtractor& key_extractor,
1253  const ReduceFunction& reduce_function,
1254  size_t size,
1255  const ValueType& neutral_element = ValueType(),
1256  const ReduceConfig& reduce_config = ReduceConfig()) const;
1257 
1258  /*!
1259  * ReduceToIndex is a DOp, which groups elements of the DIA with the
1260  * key_extractor returning an unsigned integer and reduces each key-bucket
1261  * to a single element using the associative reduce_function. In contrast
1262  * to ReduceByKey, ReduceToIndex returns a DIA in a defined order, which has
1263  * the reduced element with key i in position i. The reduce_function
1264  * defines how two elements can be reduced to a single element of equal
1265  * type.
1266  *
1267  * ReduceToIndex is the equivalent of ReduceByKey, as the
1268  * reduce_function is allowed to change the key. Since ReduceToIndex
1269  * is a DOp, it creates a new DIANode. The DIA returned by ReduceToIndex
1270  * links to this newly created DIANode. The stack_ of the returned DIA
1271  * consists of the PostOp of ReduceToIndex, as a reduced element can
1272  * directly be chained to the following LOps.
1273  *
      * This overload is selected by passing a SkipPreReducePhaseTag as the
      * first (unnamed) argument; presumably it skips the pre-reduce phase, as
      * the tag name suggests -- TODO confirm against the implementation.
      *
1274  * \param key_extractor Key extractor function, which maps each element to
1275  * its index key (an unsigned integer, see above).
1276  *
1277  * \tparam ReduceFunction Type of the reduce_function. This is a function
1278  * reducing two elements of L's result type to a single element of equal
1279  * type.
1280  *
1281  * \param reduce_function Reduce function, which defines how the key buckets
1282  * are reduced to a single element. This function is applied associatively
1283  * but not necessarily commutatively.
1284  *
1285  * \param size Resulting DIA size. Consequently, the key_extractor function
1286  * must always return a value < size for any element in the input DIA.
1287  *
1288  * \param neutral_element Item value with which to start the reduction in
1289  * each array cell. Defaults to a value-initialized ValueType.
1290  *
1291  * \param reduce_config Reduce configuration. Defaults to a
      * default-constructed ReduceConfig.
1292  *
1293  * \ingroup dia_dops
1294  */
1295  template <typename KeyExtractor, typename ReduceFunction,
1296  typename ReduceConfig = class DefaultReduceToIndexConfig>
1297  auto ReduceToIndex(
1298  const struct SkipPreReducePhaseTag&,
1299  const KeyExtractor& key_extractor,
1300  const ReduceFunction& reduce_function,
1301  size_t size,
1302  const ValueType& neutral_element = ValueType(),
1303  const ReduceConfig& reduce_config = ReduceConfig()) const;
1304 
1305  /*!
1306  * GroupByKey is a DOp, which groups elements of the DIA by their key.
1307  * After having grouped all elements of one key, all elements of one key
1308  * are processed by the GroupByFunction, which returns an output.
1309  * Contrary to Reduce, GroupByKey allows usage of functions that require all
1310  * elements of one key at once, as the GroupByFunction is applied _after_
1311  * all elements with the same key have been grouped. However, for this
1312  * reason, the communication overhead is also higher. If possible, usage of
1313  * Reduce is therefore recommended.
1314  *
1315  * As GroupByKey is a DOp, it creates a new DIANode. The DIA returned by
1316  * GroupByKey links to this newly created DIANode. The stack_ of the returned
1317  * DIA consists of the PostOp of GroupByKey, as an output element can
1318  * directly be chained to the following LOps.
1319  *
      * \tparam ValueOut Not deducible from the function arguments, so it must
      * be specified explicitly; presumably the output type of the
      * groupby_function -- TODO confirm.
      *
1320  * \tparam KeyExtractor Type of the key_extractor function.
1321  * The key_extractor function is equal to a map function.
1322  *
1323  * \param key_extractor Key extractor function, which maps each element to a
1324  * key of possibly different type.
1325  *
1326  * \tparam GroupByFunction Type of the groupby_function. This is a function
1327  * taking an iterator for all elements of the same key as input.
1328  *
1329  * \param groupby_function Group function, which defines how the key
1330  * buckets are grouped and processed.
1331  * input param: api::GroupByReader with functions HasNext() and Next()
1332  *
1333  * \ingroup dia_dops
1334  */
1335  template <typename ValueOut, typename KeyExtractor,
1336  typename GroupByFunction>
1337  auto GroupByKey(const KeyExtractor& key_extractor,
1338  const GroupByFunction& groupby_function) const;
1339 
1340  /*!
1341  * GroupByKey is a DOp, which groups elements of the DIA by their key.
1342  * After having grouped all elements of one key, all elements of one key
1343  * are processed by the GroupByFunction, which returns an output.
1344  * Contrary to Reduce, GroupByKey allows usage of functions that require all
1345  * elements of one key at once, as the GroupByFunction is applied _after_
1346  * all elements with the same key have been grouped. However, for this
1347  * reason, the communication overhead is also higher. If possible, usage of
1348  * Reduce is therefore recommended.
1349  *
1350  * As GroupByKey is a DOp, it creates a new DIANode. The DIA returned by
1351  * GroupByKey links to this newly created DIANode. The stack_ of the returned
1352  * DIA consists of the PostOp of GroupByKey, as an output element can
1353  * directly be chained to the following LOps.
1354  *
      * \tparam ValueOut Not deducible from the function arguments, so it must
      * be specified explicitly; presumably the output type of the
      * groupby_function -- TODO confirm.
      *
1355  * \tparam KeyExtractor Type of the key_extractor function.
1356  * The key_extractor function is equal to a map function.
1357  *
1358  * \param key_extractor Key extractor function, which maps each element to a
1359  * key of possibly different type.
1360  *
1361  * \tparam GroupByFunction Type of the groupby_function. This is a function
1362  * taking an iterator for all elements of the same key as input.
1363  *
1364  * \param groupby_function Group function, which defines how the key
1365  * buckets are grouped and processed.
1366  * input param: api::GroupByReader with functions HasNext() and Next()
1367  *
1368  * \param hash_function Hash function for keys. No default in this overload.
1369  *
1370  * \ingroup dia_dops
1371  */
1372  template <typename ValueOut, typename KeyExtractor,
1373  typename GroupByFunction, typename HashFunction>
1374  auto GroupByKey(const KeyExtractor& key_extractor,
1375  const GroupByFunction& groupby_function,
1376  const HashFunction& hash_function) const;
1377 
1378  /*!
1379  * GroupByKey is a DOp, which groups elements of the DIA by their key.
1380  * After having grouped all elements of one key, all elements of one key
1381  * are processed by the GroupByFunction, which returns an output.
1382  * Contrary to Reduce, GroupByKey allows usage of functions that require all
1383  * elements of one key at once, as the GroupByFunction is applied _after_
1384  * all elements with the same key have been grouped. However, for this
1385  * reason, the communication overhead is also higher. If possible, usage of
1386  * Reduce is therefore recommended.
1387  *
1388  * As GroupByKey is a DOp, it creates a new DIANode. The DIA returned by
1389  * GroupByKey links to this newly created DIANode. The stack_ of the returned
1390  * DIA consists of the PostOp of GroupByKey, as an output element can
1391  * directly be chained to the following LOps.
1392  *
      * NOTE(review): listing line 1415 is absent from the declaration below, so
      * the line carrying the return type, the function name and the opening
      * parenthesis is missing. It presumably also held a tag parameter from
      * which LocationDetectionTagValue is deduced -- restore it from the
      * original header.
      *
      * \tparam ValueOut Not deducible from the visible function arguments, so
      * it must be specified explicitly; presumably the output type of the
      * groupby_function -- TODO confirm.
      *
1393  * \tparam KeyExtractor Type of the key_extractor function.
1394  * The key_extractor function is equal to a map function.
1395  *
1396  * \param key_extractor Key extractor function, which maps each element to a
1397  * key of possibly different type.
1398  *
1399  * \tparam GroupByFunction Type of the groupby_function. This is a function
1400  * taking an iterator for all elements of the same key as input.
1401  *
1402  * \param groupby_function Group function, which defines how the key
1403  * buckets are grouped and processed.
1404  * input param: api::GroupByReader with functions HasNext() and Next()
1405  *
1406  * \param hash_function Hash function for keys. Defaults to std::hash of the
      * key_extractor's result type.
1407  *
1408  * \ingroup dia_dops
1409  */
1410  template <typename ValueOut, bool LocationDetectionTagValue,
1411  typename KeyExtractor, typename GroupByFunction,
1412  typename HashFunction =
1413  std::hash<typename FunctionTraits<KeyExtractor>::result_type>
1414  >
1416  const KeyExtractor& key_extractor,
1417  const GroupByFunction& groupby_function,
1418  const HashFunction& hash_function = HashFunction()) const;
1419 
1420  /*!
1421  * GroupToIndex is a DOp, which groups elements of the DIA by their key.
1422  * After having grouped all elements of one key, all elements of one key
1423  * are processed by the GroupByFunction, which returns an output.
1424  * Contrary to Reduce, GroupToIndex allows usage of functions that require
1425  * all elements of one key at once, as the GroupByFunction is applied _after_
1426  * all elements with the same key have been grouped. However, for this
1427  * reason, the communication overhead is also higher. If possible, usage of
1428  * Reduce is therefore recommended.
1429  *
1430  * In contrast to GroupBy, GroupToIndex returns a DIA in a defined order,
1431  * which has the output element for key i in position i.
1432  * As GroupToIndex is a DOp, it creates a new DIANode. The DIA returned by
1433  * GroupToIndex links to this newly created DIANode. The stack_ of the
1434  * returned DIA consists of the PostOp of GroupToIndex, as an output element
1435  * can directly be chained to the following LOps.
1436  *
      * \tparam ValueOut Type of neutral_element; presumably also the output
      * type of the groupby_function -- TODO confirm.
      *
1437  * \tparam KeyExtractor Type of the key_extractor function.
1438  * The key_extractor function is equal to a map function.
1439  *
1440  * \param key_extractor Key extractor function, which maps each element to
1441  * its index key.
1442  *
1443  * \tparam GroupByFunction Type of the groupby_function. This is a function
1444  * taking an iterator for all elements of the same key as input.
1445  *
1446  * \param groupby_function Group function, which defines how the key
1447  * buckets are grouped and processed.
1448  * input param: api::GroupByReader with functions HasNext() and Next()
1449  *
1450  * \param size Resulting DIA size. Consequently, the key_extractor function
1451  * must always return a value < size for any element in the input DIA.
1452  *
1453  * \param neutral_element Item value with which to start the reduction in
1454  * each array cell. Defaults to a value-initialized ValueOut.
1455  *
1456  * \ingroup dia_dops
1457  */
1458  template <typename ValueOut, typename KeyExtractor,
1459  typename GroupByFunction>
1460  auto GroupToIndex(const KeyExtractor& key_extractor,
1461  const GroupByFunction& groupby_function,
1462  const size_t size,
1463  const ValueOut& neutral_element = ValueOut()) const;
1464 
1465  /*!
1466  * Zips two DIAs of equal size in style of functional programming by
1467  * applying zip_function to the i-th elements of both input DIAs to form the
1468  * i-th element of the output DIA. The type of the output DIA can be
1469  * inferred from the zip_function.
1470  *
1471  * The two input DIAs are required to be of equal size; otherwise use the
1472  * CutTag or PadTag variant.
1473  *
1474  * \tparam ZipFunction Type of the zip_function. This is a function with two
1475  * input elements, both of the local type, and one output element, which is
1476  * the type of the Zip node.
1477  *
      * \tparam SecondDIA Type of second_dia.
      *
1478  * \param second_dia DIA, which is zipped together with the original
1479  * DIA.
1480  *
1481  * \param zip_function Zip function, which zips two elements together.
1482  *
1483  * \ingroup dia_dops
1484  */
1485  template <typename ZipFunction, typename SecondDIA>
1486  auto Zip(const SecondDIA& second_dia,
1487  const ZipFunction& zip_function) const;
1488 
1489  /*!
1490  * Zips two DIAs in style of functional programming by applying zip_function
1491  * to the i-th elements of both input DIAs to form the i-th element of the
1492  * output DIA. The type of the output DIA can be inferred from the
1493  * zip_function.
1494  *
1495  * If the two input DIAs are of unequal size, the result has the length of
1496  * the shorter one. To pad instead, use the PadTag variant. This overload is
      * selected by passing a CutTag as the first (unnamed) argument.
1497  *
1498  * \tparam ZipFunction Type of the zip_function. This is a function with two
1499  * input elements, both of the local type, and one output element, which is
1500  * the type of the Zip node.
1501  *
      * \tparam SecondDIA Type of second_dia.
      *
1502  * \param second_dia DIA, which is zipped together with the original
1503  * DIA.
1504  *
1505  * \param zip_function Zip function, which zips two elements together.
1506  *
1507  * \ingroup dia_dops
1508  */
1509  template <typename ZipFunction, typename SecondDIA>
1510  auto Zip(struct CutTag const&, const SecondDIA& second_dia,
1511  const ZipFunction& zip_function) const;
1512 
1513  /*!
1514  * Zips two DIAs in style of functional programming by applying zip_function
1515  * to the i-th elements of both input DIAs to form the i-th element of the
1516  * output DIA. The type of the output DIA can be inferred from the
1517  * zip_function.
1518  *
1519  * The output DIA's length is the *maximum* of all input DIAs; shorter DIAs
1520  * are padded with default-constructed items. This overload is selected by
      * passing a PadTag as the first (unnamed) argument.
1521  *
1522  * \tparam ZipFunction Type of the zip_function. This is a function with two
1523  * input elements, both of the local type, and one output element, which is
1524  * the type of the Zip node.
1525  *
      * \tparam SecondDIA Type of second_dia.
      *
1526  * \param second_dia DIA, which is zipped together with the original
1527  * DIA.
1528  *
1529  * \param zip_function Zip function, which zips two elements together.
1530  *
1531  * \ingroup dia_dops
1532  */
1533  template <typename ZipFunction, typename SecondDIA>
1534  auto Zip(struct PadTag const&, const SecondDIA& second_dia,
1535  const ZipFunction& zip_function) const;
1536 
1537  /*!
1538  * Zips two DIAs in style of functional programming by applying zip_function
1539  * to the i-th elements of both input DIAs to form the i-th element of the
1540  * output DIA. The type of the output DIA can be inferred from the
1541  * zip_function.
1542  *
1543  * In this variant, the DIA partitions on all PEs must have matching
1544  * length. No rebalancing is performed, and the program will die if any
1545  * partition mismatches. This enables Zip to proceed without any
1546  * communication. This overload is selected by passing a NoRebalanceTag as
      * the first (unnamed) argument.
1547  *
1548  * \tparam ZipFunction Type of the zip_function. This is a function with two
1549  * input elements, both of the local type, and one output element, which is
1550  * the type of the Zip node.
1551  *
      * \tparam SecondDIA Type of second_dia.
      *
1552  * \param second_dia DIA, which is zipped together with the original
1553  * DIA.
1554  *
1555  * \param zip_function Zip function, which zips two elements together.
1556  *
1557  * \ingroup dia_dops
1558  */
1559  template <typename ZipFunction, typename SecondDIA>
1560  auto Zip(struct NoRebalanceTag const&, const SecondDIA& second_dia,
1561  const ZipFunction& zip_function) const;
1562 
1563  /*!
1564  * Zips each item of a DIA with its zero-based array index. This requires a
1565  * full data store/retrieve cycle because the input DIA's size is generally
1566  * unknown.
1567  *
      * \tparam ZipFunction Type of the zip_function.
      *
1568  * \param zip_function Zip function, which gets each element together with
1569  * its array index.
1570  *
1571  * \ingroup dia_dops
1572  */
1573  template <typename ZipFunction>
1574  auto ZipWithIndex(const ZipFunction& zip_function) const;
1575 
1576  /*!
1577  * Sort is a DOp, which sorts a given DIA according to the given
      * compare_function.
1578  *
1579  * \tparam CompareFunction Type of the compare_function.
1580  * Should be (ValueType,ValueType)->bool. Defaults to std::less<ValueType>.
1581  *
1582  * \param compare_function Function, which compares two elements. Returns
1583  * true, if first element is smaller than second. False otherwise.
      * Defaults to a default-constructed CompareFunction.
1584  *
1585  * \ingroup dia_dops
1586  */
1587  template <typename CompareFunction = std::less<ValueType> >
1588  auto Sort(const CompareFunction& compare_function = CompareFunction()) const;
1589 
1590  /*!
1591  * Sort is a DOp, which sorts a given DIA according to the given
      * compare_function, using a caller-supplied sort algorithm.
1592  *
1593  * \tparam CompareFunction Type of the compare_function.
1594  * Should be (ValueType,ValueType)->bool
1595  *
      * \tparam SortAlgorithm Type of the sort_algorithm.
      *
1596  * \param compare_function Function, which compares two elements. Returns
1597  * true, if first element is smaller than second. False otherwise.
1598  *
1599  * \param sort_algorithm Algorithm class used to sort items. Merging is
1600  * always done using a tournament tree with compare_function.
1601  *
1602  * \ingroup dia_dops
1603  */
1604  template <typename CompareFunction, typename SortAlgorithm>
1605  auto Sort(const CompareFunction& compare_function,
1606  const SortAlgorithm& sort_algorithm) const;
1607 
1608  /*!
1609  * SortStable is a DOp, which sorts a given DIA stably according to the
1610  * given compare_function.
1611  *
1612  * \tparam CompareFunction Type of the compare_function.
1613  * Should be (ValueType,ValueType)->bool. Defaults to std::less<ValueType>.
1614  *
1615  * \param compare_function Function, which compares two elements. Returns
1616  * true, if first element is smaller than second. False otherwise.
      * Defaults to a default-constructed CompareFunction.
1617  *
1618  * \ingroup dia_dops
1619  */
1620  template <typename CompareFunction = std::less<ValueType> >
1621  auto SortStable(const CompareFunction& compare_function = CompareFunction()) const;
1622 
1623  /*!
1624  * SortStable is a DOp, which sorts a given DIA stably according to the
1625  * given compare_function, using a caller-supplied sort algorithm.
1626  *
1627  * \tparam CompareFunction Type of the compare_function.
1628  * Should be (ValueType,ValueType)->bool
1629  *
      * \tparam SortAlgorithm Type of the sort_algorithm.
      *
1630  * \param compare_function Function, which compares two elements. Returns
1631  * true, if first element is smaller than second. False otherwise.
1632  *
1633  * \param sort_algorithm Algorithm class used to stably sort items. Merging
1634  * is always done using a tournament tree with compare_function. In order
1635  * for the sorting to be stable, this must be a stable sorting algorithm.
1636  *
1637  * \ingroup dia_dops
1638  */
1639  template <typename CompareFunction, typename SortAlgorithm>
1640  auto SortStable(const CompareFunction& compare_function,
1641  const SortAlgorithm& sort_algorithm) const;
1642 
1643  /*!
1644  * Merge is a DOp, which merges two sorted DIAs to a single sorted DIA.
1645  * Both input DIAs must already be sorted according to the given comparator.
1646  * The type of the output DIA will be the type of this DIA.
1647  *
1648  * The merge operation balances all input data, so that each worker will
1649  * have an equal number of elements when the merge completes.
1650  *
      * \tparam Comparator Type of the comparator. Defaults to
      * std::less<ValueType>.
      *
      * \tparam SecondDIA Type of second_dia.
      *
1651  * \param second_dia DIA, which is merged with this DIA.
1652  *
1653  * \param comparator Comparator to specify the order of input and output.
      * Defaults to a default-constructed Comparator.
1654  *
1655  * \ingroup dia_dops
1656  */
1657  template <typename Comparator = std::less<ValueType>, typename SecondDIA>
1658  auto Merge(const SecondDIA& second_dia,
1659  const Comparator& comparator = Comparator()) const;
1660 
1661  /*!
1662  * PrefixSum is a DOp, which computes the (inclusive) prefix sum of all
1663  * elements. The sum function defines how two elements are combined to a
1664  * single element.
1665  *
1666  * \param sum_function Sum function (any associative function). Defaults to a default-constructed SumFunction (std::plus<ValueType> unless specified).
1667  *
1668  * \param initial_element Initial element of the sum function. Defaults to a value-initialized ValueType.
1669  *
1670  * \ingroup dia_dops
1671  */
1672  template <typename SumFunction = std::plus<ValueType> >
1673  auto PrefixSum(const SumFunction& sum_function = SumFunction(),
1674  const ValueType& initial_element = ValueType()) const;
1675 
1676  /*!
1677  * ExPrefixSum is a DOp, which computes the exclusive prefix sum of all
1678  * elements. The sum function defines how two elements are combined to a
1679  * single element.
1680  *
1681  * \param sum_function Sum function (any associative function). Defaults to a default-constructed SumFunction (std::plus<ValueType> unless specified).
1682  *
1683  * \param initial_element Initial element of the sum function. Defaults to a value-initialized ValueType.
1684  *
1685  * \ingroup dia_dops
1686  */
1687  template <typename SumFunction = std::plus<ValueType> >
1688  auto ExPrefixSum(const SumFunction& sum_function = SumFunction(),
1689  const ValueType& initial_element = ValueType()) const;
1690 
1691  /*!
1692  * Window is a DOp, which applies a window function to every k
1693  * consecutive items in a DIA. The window function is also given the index
1694  * of the first item, and can output zero or more items via an Emitter.
1695  *
1696  * \param window_size The size k of the delivered window.
1697  *
1698  * \param window_function Window function applied to each window of k items. Signature: TODO(tb). If omitted, WindowFunction must be specified explicitly (it cannot be deduced from the default argument) and a default-constructed instance is used.
1699  *
1700  * \ingroup dia_dops
1701  */
1702  template <typename WindowFunction>
1703  auto Window(size_t window_size,
1704  const WindowFunction& window_function = WindowFunction()) const;
1705 
1706  /*!
1707  * Window is a DOp, which applies a window function to every k
1708  * consecutive items in a DIA. The window function is also given the index
1709  * of the first item, and can output zero or more items via an Emitter.
1710  *
1711  * \param window_size The size k of the delivered window.
1712  *
1713  * \param window_function Window function applied to each window of k items. Signature: TODO(tb).
1714  *
1715  * \param partial_window_function Window function applied to windows of
1716  * fewer than k items.
1717  *
1718  * \ingroup dia_dops
1719  */
1720  template <typename WindowFunction, typename PartialWindowFunction>
1721  auto Window(size_t window_size,
1722  const WindowFunction& window_function,
1723  const PartialWindowFunction& partial_window_function) const;
1724 
1725  /*!
1726  * Window is a DOp, which applies a window function to every k
1727  * consecutive items in a DIA. The window function is also given the index
1728  * of the first item, and can output zero or more items via an Emitter. This overload is selected by passing a DisjointTag as the first (unnamed) argument. NOTE(review): presumably the windows are then disjoint (non-overlapping) -- confirm in window.hpp.
1729  *
1730  * \param window_size The size k of the delivered window.
1731  *
1732  * \param window_function Window function applied to each window of k items.
1733  *
1734  * \ingroup dia_dops
1735  */
1736  template <typename WindowFunction>
1737  auto Window(struct DisjointTag const&, size_t window_size,
1738  const WindowFunction& window_function) const;
1739 
1740  /*!
1741  * FlatWindow is a DOp, which applies a window function to every k
1742  * consecutive items in a DIA. The window function is also given the index
1743  * of the first item, and can output zero or more items via an Emitter. ValueOut cannot be deduced from the arguments and must be specified explicitly by the caller.
1744  *
1745  * \param window_size The size k of the delivered window.
1746  *
1747  * \param window_function Window function applied to each window of k items. Signature: TODO(tb). If omitted, WindowFunction must also be specified explicitly and a default-constructed instance is used.
1748  *
1749  * \ingroup dia_dops
1750  */
1751  template <typename ValueOut, typename WindowFunction>
1752  auto FlatWindow(
1753  size_t window_size,
1754  const WindowFunction& window_function = WindowFunction()) const;
1755 
1756  /*!
1757  * FlatWindow is a DOp, which applies a window function to every k
1758  * consecutive items in a DIA. The window function is also given the index
1759  * of the first item, and can output zero or more items via an Emitter. ValueOut cannot be deduced from the arguments and must be specified explicitly by the caller.
1760  *
1761  * \param window_size The size k of the delivered window.
1762  *
1763  * \param window_function Window function applied to each window of k items. Signature: TODO(tb).
1764  *
1765  * \param partial_window_function Window function applied to windows of
1766  * fewer than k items.
1767  *
1768  * \ingroup dia_dops
1769  */
1770  template <typename ValueOut, typename WindowFunction,
1771  typename PartialWindowFunction>
1772  auto FlatWindow(size_t window_size,
1773  const WindowFunction& window_function,
1774  const PartialWindowFunction& partial_window_function) const;
1775 
1776  /*!
1777  * FlatWindow is a DOp, which applies a window function to every k
1778  * consecutive items in a DIA. The window function is also given the index
1779  * of the first item, and can output zero or more items via an Emitter. This overload is selected by passing a DisjointTag as the first (unnamed) argument. NOTE(review): presumably the windows are then disjoint (non-overlapping) -- confirm in window.hpp. ValueOut cannot be deduced from the arguments and must be specified explicitly by the caller.
1780  *
1781  * \param window_size The size k of the delivered window.
1782  *
1783  * \param window_function Window function applied to each window of k items. Signature: TODO(tb).
1784  *
1785  * \ingroup dia_dops
1786  */
1787  template <typename ValueOut, typename WindowFunction>
1788  auto FlatWindow(struct DisjointTag const&, size_t window_size,
1789  const WindowFunction& window_function) const;
1790 
1791  /*!
1792  * Concat is a DOp, which concatenates DIAs to a single DIA; this member
1793  * overload concatenates this DIA with second_dia. All input DIAs must contain the same type, which is also the output DIA's
1794  * type.
1795  *
1796  * The concat operation balances all input data, so that each worker will
1797  * have an equal number of elements when the concat completes.
1798  *
1799  * \ingroup dia_dops
1800  */
1801  template <typename SecondDIA>
1802  auto Concat(const SecondDIA& second_dia) const;
1803 
1804  /*!
1805  * Rebalance is a DOp, which rebalances a single DIA among all workers. In
1806  * general, this operation is needed only if previous steps are known to
1807  * create heavy imbalance (e.g. Filter()s which cut DIAs to ranges).
1808  *
1809  * \ingroup dia_dops
1810  */
1811  auto Rebalance() const;
1812 
1813  /*!
1814  * Create a CollapseNode which is mainly used to collapse the LOp chain into
1815  * a DIA<T> with an empty stack. This is most often necessary for iterative
1816  * algorithms, where a DIA<T> reference variable is updated in each
1817  * iteration. The result is returned as a DIA<ValueType>.
1818  *
1819  * \ingroup dia_dops
1820  */
1821  DIA<ValueType> Collapse() const;
1822 
1823  /*!
1824  * Create a CacheNode which contains all items of a DIA in calculated plain
1825  * format. This is needed if a DIA is reused many times, in order to avoid
1826  * recalculating a PostOp multiple times. The result is returned as a DIA<ValueType>.
1827  *
1828  * \ingroup dia_dops
1829  */
1830  DIA<ValueType> Cache() const;
1831 
1832  //! \}
1833 
1834 private:
1835  //! The DIANode which DIA points to. The node represents the latest DOp
1836  //! or Action performed previously.
1838 
1839  //! The local function chain, which stores the chained lambda function from
1840  //! the last DIANode to this DIA.
1842 
1843  //! DIA serial id for logging, matches DIANode::id_ for DOps. Initialized to 0 by default.
1844  size_t dia_id_ = 0;
1845 
1846  //! static DIA (LOp or DOp) node label string, may match DIANode::label_. Initialized to nullptr by default.
1847  const char* label_ = nullptr;
1848 
1849  //! deliver next DIA serial id, as obtained from the Context; const-qualified since it only forwards through the const accessor context(), so const operations of this class can call it too.
1850  size_t next_dia_id() const { return context().next_dia_id(); }
1851 };
1852 
1853 //! \}
1854 
1855 } // namespace api
1856 
1857 //! imported from api namespace
1858 using api::DIA;
1859 
1860 //! imported from api namespace
1861 using api::DisjointTag;
1862 
1863 //! imported from api namespace
1864 using api::VolatileKeyFlag;
1865 
1866 //! imported from api namespace
1867 using api::VolatileKeyTag;
1868 
1869 //! imported from api namespace
1870 using api::NoVolatileKeyTag;
1871 
1872 //! imported from api namespace
1874 
1875 //! imported from api namespace
1876 using api::CutTag;
1877 
1878 //! imported from api namespace
1879 using api::PadTag;
1880 
1881 //! imported from api namespace
1882 using api::NoRebalanceTag;
1883 
1884 //! imported from api namespace
1885 using api::DuplicateDetectionFlag;
1886 
1887 //! imported from api namespace
1889 
1890 //! imported from api namespace
1892 
1893 //! imported from api namespace
1894 using api::LocationDetectionFlag;
1895 
1896 //! imported from api namespace
1898 
1899 //! imported from api namespace
1901 
1902 } // namespace thrill
1903 
1904 #endif // !THRILL_API_DIA_HEADER
1905 
1906 /******************************************************************************/
std::vector< ValueType > AllGather() const
Returns the whole DIA in an std::vector on each worker.
Definition: all_gather.hpp:114
const DIA & Keep(size_t increase=1) const
Mark the referenced DIANode for keeping, which makes children not consume the data when executing...
Definition: dia.hpp:297
DIA is the interface between the user and the Thrill framework.
Definition: dia.hpp:141
ValueType_ ValueType
Definition: dia.hpp:152
Future< std::vector< ValueType > > AllGatherFuture() const
Returns the whole DIA in an std::vector on each worker.
Definition: all_gather.hpp:143
Context & context()
Returns the api::Context of this DIABase.
Definition: dia_base.hpp:208
Type[] Array
A template to make writing temporary arrays easy: Array<int>{ 1, 2, 3 }.
Definition: json_logger.hpp:64
Future< ValueType > AllReduceFuture(const ReduceFunction &reduce_function, const ValueType &initial_value=ValueType()) const
AllReduce is an ActionFuture, which computes the reduction sum of all elements globally and delivers ...
Definition: all_reduce.hpp:121
const DIA & Execute() const
Execute DIA's scope and parents such that this (Action)Node is Executed.
Definition: dia.hpp:322
auto Sample(size_t sample_size) const
Select up to sample_size items uniformly at random and return a new DIA<T>.
Definition: sample.hpp:247
const struct LocationDetectionFlag< true > LocationDetectionTag
global const LocationDetectionFlag instance
Definition: dia.hpp:122
bool consume() const
return value of consume flag.
Definition: context.hpp:378
typename Stack::Input StackInput
Definition: dia.hpp:160
size_t dia_id_
DIA serial id for logging, matches DIANode::id_ for DOps.
Definition: dia.hpp:1844
const struct PadTag PadTag
global const PadTag instance
Definition: dia.hpp:83
DIANodePtr node_
Definition: dia.hpp:1837
const char * label_
static DIA (LOp or DOp) node label string, may match DIANode::label_.
Definition: dia.hpp:1847
std::vector< ValueType > Gather(size_t target_id=0) const
Gather is an Action, which collects all data of the DIA into a vector at the given worker...
Definition: gather.hpp:93
virtual size_t consume_counter() const
Returns consume_counter_.
Definition: dia_base.hpp:226
DIA()=default
default-constructor: invalid DIA
const DIANodePtr & node() const
Returns a pointer to the according DIANode.
Definition: dia.hpp:255
bool IsValid() const
Return whether the DIA is valid.
Definition: dia.hpp:175
const Stack & stack() const
Returns the stored function chain.
Definition: dia.hpp:267
auto Zip(const SecondDIA &second_dia, const ZipFunction &zip_function) const
Zips two DIAs of equal size in style of functional programming by applying zip_function to the i-th e...
Definition: zip.hpp:674
auto Concat(const SecondDIA &second_dia) const
Concat is a DOp, which concatenates any number of DIAs to a single DIA.
Definition: concat.hpp:388
const struct VolatileKeyFlag< true > VolatileKeyTag
global const VolatileKeyFlag instance
Definition: dia.hpp:48
DIA(DIANodePtr &&node, const Stack &stack, size_t dia_id, const char *label)
Constructor of a new DIA supporting move semantics of nodes.
Definition: dia.hpp:212
const char * label() const
Returns label_.
Definition: dia.hpp:288
tag structure for ReduceToIndex()
Definition: dia.hpp:54
const struct SkipPreReducePhaseTag SkipPreReducePhaseTag
global const SkipPreReducePhaseTag instance
Definition: dia.hpp:59
void WriteLines(const std::string &filepath, size_t target_file_size=128 *1024 *1024) const
WriteLines is an Action, which writes std::strings to multiple output files.
#define die(msg)
Instead of std::terminate(), throw the output the message via an exception.
Definition: die.hpp:22
auto Filter(const FilterFunction &filter_function) const
Each item of a DIA is tested using filter_function : to determine whether it is copied into the outp...
Definition: dia.hpp:390
ValueType AllReduce(const ReduceFunction &reduce_function, const ValueType &initial_value=ValueType()) const
AllReduce is an Action, which computes the reduction sum of all elements globally and delivers the sa...
Definition: all_reduce.hpp:87
The Context of a job is a unique instance per worker which holds references to all underlying parts o...
Definition: context.hpp:221
Context & context() const
Return context_ of DIANode, e.g. for creating new LOps and DOps.
Definition: dia.hpp:273
const DIA & KeepForever() const
Mark the referenced DIANode for keeping forever, which makes children not consume the data when execu...
Definition: dia.hpp:312
auto SortStable(const CompareFunction &compare_function=CompareFunction()) const
SortStable is a DOp, which sorts a given DIA stably according to the given compare_function.
Definition: sort.hpp:873
DIA(const DIANodePtr &node, const Stack &stack, size_t dia_id, const char *label)
Constructor of a new DIA with a pointer to a DIANode and a function chain from the DIANode to this DI...
Definition: dia.hpp:194
auto Sort(const CompareFunction &compare_function=CompareFunction()) const
Sort is a DOp, which sorts a given DIA according to the given compare_function.
Definition: sort.hpp:800
Type * get() const noexcept
return the enclosed pointer.
Specialized template class for ActionFuture which return void.
tag structure for Zip()
Definition: dia.hpp:78
auto Rebalance() const
Rebalance is a DOp, which rebalances a single DIA among all workers; in general, this operation is ne...
Definition: rebalance.hpp:122
A FunctionStack is a chain of functor that can be folded to a single functor (which is usually optimi...
double HyperLogLog() const
Compute the approximate number of distinct elements in the DIA.
Definition: hyperloglog.hpp:64
size_t next_dia_id()
deliver next DIA serial id
Definition: dia.hpp:1850
const struct VolatileKeyFlag< false > NoVolatileKeyTag
global const VolatileKeyFlag instance
Definition: dia.hpp:51
auto PrefixSum(const SumFunction &sum_function=SumFunction(), const ValueType &initial_element=ValueType()) const
PrefixSum is a DOp, which computes the (inclusive) prefix sum of all elements.
Definition: prefixsum.hpp:134
Future< size_t > SizeFuture() const
Lazily computes the total size of all elements across all workers.
Definition: size.hpp:97
auto ReduceToIndex(const KeyExtractor &key_extractor, const ReduceFunction &reduce_function, size_t size, const ValueType &neutral_element=ValueType(), const ReduceConfig &reduce_config=ReduceConfig()) const
ReduceToIndex is a DOp, which groups elements of the DIA with the key_extractor returning an unsigned...
tag structure for ReduceByKey(), and ReduceToIndex()
Definition: dia.hpp:42
auto ReducePair(const ReduceFunction &reduce_function, const ReduceConfig &reduce_config=ReduceConfig()) const
ReducePair is a DOp, which groups key-value-pairs in the input DIA by their key and reduces each key-...
size_t next_dia_id()
Returns next_dia_id_ to generate DIA::id_ serial.
Definition: context.hpp:391
ValueType Sum(const SumFunction &sum_function=SumFunction(), const ValueType &initial_value=ValueType()) const
Sum is an Action, which computes the sum of all elements globally.
Definition: sum.hpp:23
Context & ctx() const
Return context_ of DIANode, e.g. for creating new LOps and DOps.
Definition: dia.hpp:279
int value
Definition: gen_data.py:41
tag structure for Zip()
Definition: dia.hpp:70
size_t reference_count() const noexcept
Return the number of references to this object (for debugging)
void Print(const std::string &name) const
Print is an Action, which collects all data of the DIA at the worker 0 and prints using ostream seria...
Definition: print.hpp:48
common::JsonLogger logger_
Definition: context.hpp:462
common::FunctionTraits< Function > FunctionTraits
alias for convenience.
Definition: dia.hpp:147
auto Window(size_t window_size, const WindowFunction &window_function=WindowFunction()) const
Window is a DOp, which applies a window function to every k consecutive items in a DIA...
Definition: window.hpp:286
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
Definition: allocator.hpp:220
auto Merge(const SecondDIA &second_dia, const Comparator &comparator=Comparator()) const
Merge is a DOp, which merges two sorted DIAs to a single sorted DIA.
Definition: merge.hpp:716
void WriteLinesOne(const std::string &filepath) const
WriteLinesOne is an Action, which writes std::strings to a single output file.
Stack stack_
Definition: dia.hpp:1841
The return type class for all ActionFutures.
Definition: action_node.hpp:83
auto GroupByKey(const KeyExtractor &key_extractor, const GroupByFunction &groupby_function) const
GroupByKey is a DOp, which groups elements of the DIA by its key.
const struct LocationDetectionFlag< false > NoLocationDetectionTag
global const LocationDetectionFlag instance
Definition: dia.hpp:125
static constexpr bool stack_empty
boolean indication whether this FunctionStack is empty
Definition: dia.hpp:163
tag structure for Zip()
Definition: dia.hpp:86
const struct DuplicateDetectionFlag< true > DuplicateDetectionTag
global const DuplicateDetectionFlag instance
Definition: dia.hpp:109
auto ReduceByKey(const KeyExtractor &key_extractor, const ReduceFunction &reduce_function, const ReduceConfig &reduce_config=ReduceConfig()) const
ReduceByKey is a DOp, which groups elements of the DIA with the key_extractor and reduces each key-bu...
virtual void SetConsumeCounter(size_t counter)
Definition: dia_base.hpp:248
const struct DuplicateDetectionFlag< false > NoDuplicateDetectionTag
global const DuplicateDetectionFlag instance
Definition: dia.hpp:112
auto Map(const MapFunction &map_function) const
Map applies map_function : to each item of a DIA and delivers a new DIA contains the returned values...
Definition: dia.hpp:345
auto GroupToIndex(const KeyExtractor &key_extractor, const GroupByFunction &groupby_function, const size_t size, const ValueOut &neutral_element=ValueOut()) const
GroupBy is a DOp, which groups elements of the DIA by its key.
auto FlatMap(const FlatmapFunction &flatmap_function) const
Each item of a DIA is expanded by the flatmap_function : to zero or more items of different type...
Definition: dia.hpp:441
DIA< ValueType > Cache() const
Create a CacheNode which contains all items of a DIA in calculated plain format.
Definition: cache.hpp:93
static const bool value
Definition: dia.hpp:44
tag structure for ReduceByKey()
Definition: dia.hpp:103
Future< void > WriteLinesOneFuture(const std::string &filepath) const
WriteLinesOne is an ActionFuture, which writes std::strings to a single output file.
auto BernoulliSample(double p) const
Each item of a DIA is copied into the output DIA with success probability p (an independent Bernoulli...
size_t id() const
Returns id_.
Definition: dia.hpp:285
Future< ValueType > SumFuture(const SumFunction &sum_function=SumFunction(), const ValueType &initial_value=ValueType()) const
Sum is an ActionFuture, which computes the sum of all elements globally.
Definition: sum.hpp:57
size_t node_refcount() const
Returns the number of references to the according DIANode.
Definition: dia.hpp:261
ValueType Min(const ValueType &initial_value=ValueType()) const
Min is an Action, which computes the minimum of all elements globally.
Definition: min.hpp:22
tag structure for Read()
Definition: dia.hpp:94
void WriteBinary(const std::string &filepath, size_t max_file_size=128 *1024 *1024) const
WriteBinary is a function, which writes a DIA to many files per worker.
auto ExPrefixSum(const SumFunction &sum_function=SumFunction(), const ValueType &initial_element=ValueType()) const
ExPrefixSum is a DOp, which computes the exclusive prefix sum of all elements.
Definition: prefixsum.hpp:168
DIA(DIANodePtr &&node)
Constructor of a new DIA with a real backing DIABase.
Definition: dia.hpp:223
ValueType Max(const ValueType &initial_value=ValueType()) const
Max is an Action, which computes the maximum of all elements globally.
Definition: max.hpp:22
auto FlatWindow(size_t window_size, const WindowFunction &window_function=WindowFunction()) const
FlatWindow is a DOp, which applies a window function to every k consecutive items in a DIA...
Definition: window.hpp:271
Future< ValueType > MaxFuture(const ValueType &initial_value=ValueType()) const
Max is an ActionFuture, which computes the maximum of all elements globally.
Definition: max.hpp:32
auto ZipWithIndex(const ZipFunction &zip_function) const
Zips each item of a DIA with its zero-based array index.
tag structure for Window() and FlatWindow()
Definition: dia.hpp:62
Future< void > WriteBinaryFuture(const std::string &filepath, size_t max_file_size=128 *1024 *1024) const
WriteBinary is a function, which writes a DIA to many files per worker.
tag structure for GroupByKey(), and InnerJoin()
Definition: dia.hpp:116
void AssertValid() const
Assert that the DIA is valid.
Definition: dia.hpp:178
size_t Size() const
Computes the total size of all elements across all workers.
Definition: size.hpp:87
virtual void IncConsumeCounter(size_t counter)
Definition: dia_base.hpp:230
const struct CutTag CutTag
global const CutTag instance
Definition: dia.hpp:75
Stack_ Stack
Type of this function stack.
Definition: dia.hpp:155
const struct NoRebalanceTag NoRebalanceTag
global const NoRebalanceTag instance
Definition: dia.hpp:91
const struct DisjointTag DisjointTag
global const DisjointTag instance
Definition: dia.hpp:67
static const bool value
Definition: dia.hpp:118
auto Union(const SecondDIA &second_dia) const
Union is a LOp, which creates the union of all items from any number of DIAs as a single DIA...
Definition: union.hpp:378
static constexpr size_t kNeverConsume
Never full consume.
Definition: dia_base.hpp:324
Future< ValueType > MinFuture(const ValueType &initial_value=ValueType()) const
Min is an ActionFuture, which computes the minimum of all elements globally.
Definition: min.hpp:32
Future< void > WriteLinesFuture(const std::string &filepath, size_t target_file_size=128 *1024 *1024) const
WriteLines is an ActionFuture, which writes std::strings to multiple output files.
DIA< ValueType > Collapse() const
Create a CollapseNode which is mainly used to collapse the LOp chain into a DIA<T> with an empty stac...
Definition: collapse.hpp:159