17 #ifndef THRILL_API_DIA_HEADER 18 #define THRILL_API_DIA_HEADER 44 static const bool value = Value;
102 template <
bool Value>
115 template <
bool Value>
139 template <
typename ValueType_,
146 template <
typename Function>
163 static constexpr
bool stack_empty = Stack::empty;
175 bool IsValid()
const {
return node_.get() !=
nullptr; }
195 size_t dia_id,
const char* label)
196 : node_(node), stack_(stack),
197 dia_id_(dia_id), label_(label) { }
213 size_t dia_id,
const char* label)
214 : node_(
std::move(node)), stack_(stack),
215 dia_id_(dia_id), label_(label) { }
225 node->dia_id(), node->label()) { }
233 #ifdef THRILL_DOXYGEN_IGNORE 234 template <
typename AnyStack>
237 template <
typename AnyStack>
239 #if __GNUC__ && !__clang__ 241 __attribute__ ((warning(
242 "Casting to DIA creates LOpNode instead of inline chaining.\n" 243 "Consider whether you can use auto instead of DIA.")));
244 #elif __GNUC__ && __clang__ 245 __attribute__ ((deprecated));
249 #endif // THRILL_DOXYGEN_IGNORE 263 return node_->reference_count();
275 return node_->context();
281 return node_->context();
285 size_t id()
const {
return dia_id_; }
288 const char *
label()
const {
return label_; }
297 if (node_->context().consume() && node_->consume_counter() == 0) {
298 die(
"Dispose() called on " 299 << *node_ <<
" which was already consumed.");
312 if (node_->context().consume() && node_->consume_counter() == 0) {
313 die(
"Keep() called on " 314 << *node_ <<
" which was already consumed.");
316 node_->IncConsumeCounter(increase);
357 template <
typename MapFunction>
358 auto Map(
const MapFunction& map_function)
const {
365 auto conv_map_function =
366 [map_function](
const MapArgument& input,
auto emit_func) {
367 emit_func(map_function(input));
372 "MapFunction has the wrong input type");
374 size_t new_id = context().next_dia_id();
376 node_->context().logger_
377 <<
"dia_id" << new_id
380 <<
"event" <<
"create" 384 auto new_stack = stack_.push(conv_map_function);
386 node_, new_stack, new_id,
"Map");
404 template <
typename FilterFunction>
405 auto Filter(
const FilterFunction& filter_function)
const {
410 auto conv_filter_function =
411 [filter_function](
const FilterArgument& input,
auto emit_func) {
412 if (filter_function(input)) emit_func(input);
417 "FilterFunction has the wrong input type");
419 size_t new_id = context().next_dia_id();
421 node_->context().logger_
422 <<
"dia_id" << new_id
423 <<
"label" <<
"Filter" 425 <<
"event" <<
"create" 429 auto new_stack = stack_.push(conv_filter_function);
431 node_, new_stack, new_id,
"Filter");
457 template <
typename ResultType = ValueType,
typename FlatmapFunction>
458 auto FlatMap(
const FlatmapFunction& flatmap_function)
const {
461 size_t new_id = context().next_dia_id();
463 node_->context().logger_
464 <<
"dia_id" << new_id
465 <<
"label" <<
"FlatMap" 467 <<
"event" <<
"create" 471 auto new_stack = stack_.push(flatmap_function);
473 node_, new_stack, new_id,
"FlatMap");
482 auto BernoulliSample(
double p)
const;
495 template <
typename SecondDIA>
496 auto Union(
const SecondDIA& second_dia)
const;
525 std::vector<ValueType> AllGather()
const;
536 void AllGather(std::vector<ValueType>* out_vector)
const;
562 void Print(
const std::string& name, std::ostream& out)
const;
571 std::vector<ValueType> Gather(
size_t target_id = 0)
const;
580 void Gather(
size_t target_id, std::vector<ValueType>* out_vector)
const;
586 auto Sample(
size_t sample_size)
const;
598 template <
typename ReduceFunction>
599 ValueType AllReduce(
const ReduceFunction& reduce_function)
const;
613 template <
typename ReduceFunction>
614 ValueType AllReduce(
const ReduceFunction& reduce_function,
627 template <
typename ReduceFunction>
629 const ReduceFunction& reduce_function)
const;
643 template <
typename ReduceFunction>
645 const ReduceFunction& reduce_function,
657 template <
typename SumFunction = std::plus<ValueType> >
658 ValueType Sum(
const SumFunction& sum_function = SumFunction())
const;
671 template <
typename SumFunction = std::plus<ValueType> >
672 ValueType Sum(
const SumFunction& sum_function,
687 template <
typename SumFunction = std::plus<ValueType> >
689 const SumFunction& sum_function = SumFunction(),
783 double HyperLogLog()
const;
793 void WriteLinesOne(
const std::string& filepath)
const;
824 size_t target_file_size = 128* 1024* 1024)
const;
845 size_t target_file_size = 128* 1024* 1024)
const;
865 size_t max_file_size = 128* 1024* 1024)
const;
886 size_t max_file_size = 128* 1024* 1024)
const;
929 template <
typename KeyExtractor,
typename ReduceFunction,
932 const KeyExtractor& key_extractor,
933 const ReduceFunction& reduce_function,
934 const ReduceConfig& reduce_config = ReduceConfig())
const;
967 template <
typename KeyExtractor,
typename ReduceFunction,
968 typename ReduceConfig,
typename KeyHashFunction>
970 const KeyExtractor& key_extractor,
971 const ReduceFunction& reduce_function,
972 const ReduceConfig& reduce_config,
973 const KeyHashFunction& key_hash_function)
const;
1008 template <
typename KeyExtractor,
typename ReduceFunction,
1009 typename ReduceConfig,
1010 typename KeyHashFunction,
typename KeyEqualFunction>
1012 const KeyExtractor& key_extractor,
1013 const ReduceFunction& reduce_function,
1014 const ReduceConfig& reduce_config,
1015 const KeyHashFunction& key_hash_function,
1016 const KeyEqualFunction& key_equal_function)
const;
1055 template <
bool VolatileKeyValue,
1056 typename KeyExtractor,
typename ReduceFunction,
1058 typename KeyHashFunction =
1059 std::hash<typename FunctionTraits<KeyExtractor>::result_type>,
1060 typename KeyEqualFunction =
1061 std::equal_to<typename FunctionTraits<KeyExtractor>::result_type> >
1064 const KeyExtractor& key_extractor,
1065 const ReduceFunction& reduce_function,
1066 const ReduceConfig& reduce_config = ReduceConfig(),
1067 const KeyHashFunction& key_hash_function = KeyHashFunction(),
1068 const KeyEqualFunction& key_equal_function = KeyEqualFunction())
const;
1107 template <
bool DuplicateDetectionValue,
1108 typename KeyExtractor,
typename ReduceFunction,
1110 typename KeyHashFunction =
1111 std::hash<typename FunctionTraits<KeyExtractor>::result_type>,
1112 typename KeyEqualFunction =
1113 std::equal_to<typename FunctionTraits<KeyExtractor>::result_type> >
1116 const KeyExtractor& key_extractor,
1117 const ReduceFunction& reduce_function,
1118 const ReduceConfig& reduce_config = ReduceConfig(),
1119 const KeyHashFunction& key_hash_function = KeyHashFunction(),
1120 const KeyEqualFunction& key_equal_function = KeyEqualFunction())
const;
1157 template <
bool VolatileKeyValue,
1158 bool DuplicateDetectionValue,
1159 typename KeyExtractor,
typename ReduceFunction,
1161 typename KeyHashFunction =
1162 std::hash<typename FunctionTraits<KeyExtractor>::result_type>,
1163 typename KeyEqualFunction =
1164 std::equal_to<typename FunctionTraits<KeyExtractor>::result_type> >
1168 const KeyExtractor& key_extractor,
1169 const ReduceFunction& reduce_function,
1170 const ReduceConfig& reduce_config = ReduceConfig(),
1171 const KeyHashFunction& key_hash_function = KeyHashFunction(),
1172 const KeyEqualFunction& key_equal_function = KeyEqualFunction())
const;
1196 template <
typename ReduceFunction,
1199 const ReduceFunction& reduce_function,
1200 const ReduceConfig& reduce_config = ReduceConfig())
const;
1226 template <
typename ReduceFunction,
typename ReduceConfig,
1227 typename KeyHashFunction>
1229 const ReduceFunction& reduce_function,
1230 const ReduceConfig& reduce_config,
1231 const KeyHashFunction& key_hash_function)
const;
1259 template <
typename ReduceFunction,
typename ReduceConfig,
1260 typename KeyHashFunction,
typename KeyEqualFunction>
1262 const ReduceFunction& reduce_function,
1263 const ReduceConfig& reduce_config,
1264 const KeyHashFunction& key_hash_function,
1265 const KeyEqualFunction& key_equal_function)
const;
1293 template <
bool DuplicateDetectionValue,
1294 typename ReduceFunction,
1296 typename KeyHashFunction,
1297 typename KeyEqualFunction
1301 const ReduceFunction& reduce_function,
1302 const ReduceConfig& reduce_config = ReduceConfig(),
1303 const KeyHashFunction& key_hash_function = KeyHashFunction(),
1304 const KeyEqualFunction& key_equal_function = KeyEqualFunction())
const;
1344 template <
typename KeyExtractor,
typename ReduceFunction,
1347 const KeyExtractor& key_extractor,
1348 const ReduceFunction& reduce_function,
1351 const ReduceConfig& reduce_config = ReduceConfig())
const;
1392 template <
bool VolatileKeyValue,
1393 typename KeyExtractor,
typename ReduceFunction,
1397 const KeyExtractor& key_extractor,
1398 const ReduceFunction& reduce_function,
1401 const ReduceConfig& reduce_config = ReduceConfig())
const;
1442 template <
typename KeyExtractor,
typename ReduceFunction,
1446 const KeyExtractor& key_extractor,
1447 const ReduceFunction& reduce_function,
1450 const ReduceConfig& reduce_config = ReduceConfig())
const;
1484 template <
typename ValueOut,
typename KeyExtractor,
1485 typename GroupByFunction>
1486 auto GroupByKey(
const KeyExtractor& key_extractor,
1487 const GroupByFunction& groupby_function)
const;
1523 template <
typename ValueOut,
typename KeyExtractor,
1524 typename GroupByFunction,
typename HashFunction>
1525 auto GroupByKey(
const KeyExtractor& key_extractor,
1526 const GroupByFunction& groupby_function,
1527 const HashFunction& hash_function)
const;
1563 template <
typename ValueOut,
bool LocationDetectionTagValue,
1564 typename KeyExtractor,
typename GroupByFunction,
1565 typename HashFunction =
1566 std::hash<typename FunctionTraits<KeyExtractor>::result_type>
1569 const KeyExtractor& key_extractor,
1570 const GroupByFunction& groupby_function,
1571 const HashFunction& hash_function = HashFunction())
const;
1613 template <
typename ValueOut,
typename KeyExtractor,
1614 typename GroupByFunction>
1615 auto GroupToIndex(
const KeyExtractor& key_extractor,
1616 const GroupByFunction& groupby_function,
1618 const ValueOut& neutral_element = ValueOut())
const;
1642 template <
typename ZipFunction,
typename SecondDIA>
1643 auto Zip(
const SecondDIA& second_dia,
1644 const ZipFunction& zip_function)
const;
1668 template <
typename ZipFunction,
typename SecondDIA>
1669 auto Zip(
struct CutTag const&,
const SecondDIA& second_dia,
1670 const ZipFunction& zip_function)
const;
1694 template <
typename ZipFunction,
typename SecondDIA>
1695 auto Zip(
struct PadTag const&,
const SecondDIA& second_dia,
1696 const ZipFunction& zip_function)
const;
1722 template <
typename ZipFunction,
typename SecondDIA>
1724 const ZipFunction& zip_function)
const;
1736 template <
typename ZipFunction>
1737 auto ZipWithIndex(
const ZipFunction& zip_function)
const;
1753 template <
typename CompareFunction = std::less<ValueType> >
1754 auto Sort(
const CompareFunction& compare_function = CompareFunction())
const;
1773 template <
typename CompareFunction,
typename SortAlgorithm>
1774 auto Sort(
const CompareFunction& compare_function,
1775 const SortAlgorithm& sort_algorithm)
const;
1791 template <
typename CompareFunction = std::less<ValueType> >
1792 auto SortStable(
const CompareFunction& compare_function = CompareFunction())
const;
1812 template <
typename CompareFunction,
typename SortAlgorithm>
1813 auto SortStable(
const CompareFunction& compare_function,
1814 const SortAlgorithm& sort_algorithm)
const;
1832 template <
typename Comparator = std::less<ValueType>,
typename SecondDIA>
1833 auto Merge(
const SecondDIA& second_dia,
1834 const Comparator& comparator = Comparator())
const;
1849 template <
typename SumFunction = std::plus<ValueType> >
1850 auto PrefixSum(
const SumFunction& sum_function = SumFunction(),
1866 template <
typename SumFunction = std::plus<ValueType> >
1867 auto ExPrefixSum(
const SumFunction& sum_function = SumFunction(),
1883 template <
typename WindowFunction>
1884 auto Window(
size_t window_size,
1885 const WindowFunction& window_function = WindowFunction())
const;
1903 template <
typename WindowFunction,
typename PartialWindowFunction>
1904 auto Window(
size_t window_size,
1905 const WindowFunction& window_function,
1906 const PartialWindowFunction& partial_window_function)
const;
1921 template <
typename WindowFunction>
1922 auto Window(
struct DisjointTag const&,
size_t window_size,
1923 const WindowFunction& window_function)
const;
1938 template <
typename ValueOut,
typename WindowFunction>
1941 const WindowFunction& window_function = WindowFunction())
const;
1959 template <
typename ValueOut,
typename WindowFunction,
1960 typename PartialWindowFunction>
1961 auto FlatWindow(
size_t window_size,
1962 const WindowFunction& window_function,
1963 const PartialWindowFunction& partial_window_function)
const;
1978 template <
typename ValueOut,
typename WindowFunction>
1979 auto FlatWindow(
struct DisjointTag const&,
size_t window_size,
1980 const WindowFunction& window_function)
const;
1992 template <
typename SecondDIA>
1993 auto Concat(
const SecondDIA& second_dia)
const;
2002 auto Rebalance()
const;
2038 const char* label_ =
nullptr;
2095 #endif // !THRILL_API_DIA_HEADER
DIA is the interface between the user and the Thrill framework.
auto Union(const FirstDIA &first_dia, const DIAs &... dias)
Union is a LOp, which creates the union of all items from any number of DIAs as a single DIA...
size_t id() const
Returns dia_id_.
Type[] Array
A template to make writing temporary arrays easy: Array<int>{ 1, 2, 3 }.
const struct LocationDetectionFlag< true > LocationDetectionTag
global const LocationDetectionFlag instance
bool IsValid() const
Return whether the DIA is valid.
typename Stack::Input StackInput
const struct PadTag PadTag
global const PadTag instance
const struct VolatileKeyFlag< true > VolatileKeyTag
global const VolatileKeyFlag instance
const char * label() const
Returns label_.
void AssertValid() const
Assert that the DIA is valid.
DIA(DIANodePtr &&node, const Stack &stack, size_t dia_id, const char *label)
Constructor of a new DIA supporting move semantics of nodes.
Context & context() const
Return context_ of DIANode, e.g. for creating new LOps and DOps.
tag structure for ReduceToIndex()
const struct SkipPreReducePhaseTag SkipPreReducePhaseTag
global const SkipPreReducePhaseTag instance
#define die(msg)
Instead of calling std::terminate(), throw the output message via an exception.
auto FlatMap(const FlatmapFunction &flatmap_function) const
Each item of a DIA is expanded by the flatmap_function : to zero or more items of different type...
The Context of a job is a unique instance per worker which holds references to all underlying parts o...
const DIA & Dispose() const
Dispose of the DIANode's data.
DIA(const DIANodePtr &node, const Stack &stack, size_t dia_id, const char *label)
Constructor of a new DIA with a pointer to a DIANode and a function chain from the DIANode to this DI...
const DIANodePtr & node() const
Returns a pointer to the according DIANode.
Specialized template class for ActionFuture which return void.
A FunctionStack is a chain of functors that can be folded to a single functor (which is usually optimi...
size_t next_dia_id()
deliver next DIA serial id
const struct VolatileKeyFlag< false > NoVolatileKeyTag
global const VolatileKeyFlag instance
const Stack & stack() const
Returns the stored function chain.
Context & ctx() const
Return context_ of DIANode, e.g. for creating new LOps and DOps.
tag structure for ReduceByKey(), and ReduceToIndex()
auto Merge(const Comparator &comparator, const FirstDIA &first_dia, const DIAs &... dias)
Merge is a DOp, which merges any number of sorted DIAs to a single sorted DIA.
auto Zip(const ZipFunction &zip_function, const DIA< FirstDIAType, FirstDIAStack > &first_dia, const DIAs &... dias)
Zips two DIAs of equal size in the style of functional programming by applying zip_function to the i-th e...
auto Filter(const FilterFunction &filter_function) const
Each item of a DIA is tested using filter_function \(A \to \text{bool}\) to determine whether it is copied into the outp...
const DIA & Execute() const
Execute DIA's scope and parents such that this (Action)Node is Executed.
common::FunctionTraits< Function > FunctionTraits
alias for convenience.
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
const DIA & KeepForever() const
Mark the referenced DIANode for keeping forever, which makes children not consume the data when execu...
The return type class for all ActionFutures.
const struct LocationDetectionFlag< false > NoLocationDetectionTag
global const LocationDetectionFlag instance
auto Map(const MapFunction &map_function) const
Map applies map_function \(A \to B\) to each item of a DIA and delivers a new DIA containing the returned values...
const struct DuplicateDetectionFlag< true > DuplicateDetectionTag
global const DuplicateDetectionFlag instance
const struct DuplicateDetectionFlag< false > NoDuplicateDetectionTag
global const DuplicateDetectionFlag instance
tag structure for ReduceByKey()
auto Concat(const FirstDIA &first_dia, const DIAs &... dias)
Concat is a DOp, which concatenates any number of DIAs to a single DIA.
size_t node_refcount() const
Returns the number of references to the according DIANode.
DIA(DIANodePtr &&node)
Constructor of a new DIA with a real backing DIABase.
tag structure for Window() and FlatWindow()
tag structure for GroupByKey(), and InnerJoin()
const struct CutTag CutTag
global const CutTag instance
const DIA & Keep(size_t increase=1) const
Mark the referenced DIANode for keeping, which makes children not consume the data when executing...
const struct NoRebalanceTag NoRebalanceTag
global const NoRebalanceTag instance
const struct DisjointTag DisjointTag
global const DisjointTag instance
static constexpr size_t kNeverConsume
Never fully consume.