16 #ifndef THRILL_API_REDUCE_BY_KEY_HEADER 17 #define THRILL_API_REDUCE_BY_KEY_HEADER 30 #include <type_traits> 59 template <
typename ValueType,
60 typename KeyExtractor,
typename ReduceFunction,
61 typename ReduceConfig,
typename KeyHashFunction,
62 typename KeyEqualFunction,
const bool VolatileKey,
63 bool UseDuplicateDetection>
67 static constexpr
bool debug =
false;
70 using Super::context_;
72 using Key =
typename common::FunctionTraits<KeyExtractor>::result_type;
75 typename std::conditional<
76 VolatileKey, std::pair<Key, ValueType>, ValueType>::type;
88 void operator () (
const ValueType& item)
const 89 {
return node_->PushItem(item); }
100 template <
typename ParentDIA>
103 const KeyExtractor& key_extractor,
104 const ReduceFunction& reduce_function,
105 const ReduceConfig& config,
106 const KeyHashFunction& key_hash_function,
107 const KeyEqualFunction& key_equal_function)
108 :
Super(parent.ctx(), label, { parent.id() }, { parent.node() }),
109 mix_stream_(use_mix_stream_ ?
110 parent.ctx().GetNewMixStream(
this) :
nullptr),
111 cat_stream_(use_mix_stream_ ?
112 nullptr : parent.ctx().GetNewCatStream(
this)),
113 emitters_(use_mix_stream_ ?
114 mix_stream_->GetWriters() : cat_stream_->GetWriters()),
116 context_, Super::dia_id(), parent.ctx().num_workers(),
117 key_extractor, reduce_function, emitters_, config,
121 context_, Super::dia_id(), key_extractor, reduce_function,
127 auto pre_op_fn = [
this](
const ValueType& input) {
128 return pre_phase_.Insert(input);
132 auto lop_chain = parent.stack().push(pre_op_fn).fold();
133 parent.node()->
AddChild(
this, lop_chain);
143 LOG << *
this <<
" running StartPreOp";
144 if (!use_post_thread_) {
158 LOG << *
this <<
" running StopPreOp";
160 pre_phase_.FlushAll();
161 pre_phase_.CloseAll();
162 if (use_post_thread_) {
166 use_mix_stream_ ? mix_stream_.reset() : cat_stream_.reset();
178 if (!use_post_thread_ && !reduced_) {
184 use_mix_stream_ ? mix_stream_.reset() : cat_stream_.reset();
188 post_phase_.PushData(consume);
195 auto reader = mix_stream_->GetMixReader(
true);
196 sLOG <<
"reading data from" << mix_stream_->id()
197 <<
"to push into post phase which flushes to" << this->dia_id();
198 while (reader.HasNext()) {
199 post_phase_.Insert(reader.template Next<TableItem>());
204 auto reader = cat_stream_->GetCatReader(
true);
205 sLOG <<
"reading data from" << cat_stream_->id()
206 <<
"to push into post phase which flushes to" << this->dia_id();
207 while (reader.HasNext()) {
208 post_phase_.Insert(reader.template Next<TableItem>());
214 post_phase_.Dispose();
234 TableItem, Key, ValueType, KeyExtractor, ReduceFunction,
Emitter,
235 VolatileKey, ReduceConfig,
238 bool reduced_ =
false;
241 template <
typename ValueType,
typename Stack>
242 template <
typename KeyExtractor,
typename ReduceFunction,
typename ReduceConfig>
244 const KeyExtractor& key_extractor,
245 const ReduceFunction& reduce_function,
246 const ReduceConfig& reduce_config)
const {
248 using Key =
typename common::FunctionTraits<KeyExtractor>::result_type;
251 key_extractor, reduce_function, reduce_config,
252 std::hash<Key>(), std::equal_to<Key>());
255 template <
typename ValueType,
typename Stack>
256 template <
typename KeyExtractor,
typename ReduceFunction,
257 typename ReduceConfig,
typename KeyHashFunction>
259 const KeyExtractor& key_extractor,
260 const ReduceFunction& reduce_function,
261 const ReduceConfig& reduce_config,
262 const KeyHashFunction& key_hash_function)
const {
264 using Key =
typename common::FunctionTraits<KeyExtractor>::result_type;
267 key_extractor, reduce_function, reduce_config,
268 key_hash_function, std::equal_to<Key>());
271 template <
typename ValueType,
typename Stack>
272 template <
bool VolatileKeyValue,
273 typename KeyExtractor,
typename ReduceFunction,
typename ReduceConfig,
274 typename KeyHashFunction,
typename KeyEqualFunction>
277 const KeyExtractor& key_extractor,
278 const ReduceFunction& reduce_function,
279 const ReduceConfig& reduce_config,
280 const KeyHashFunction& key_hash_function,
281 const KeyEqualFunction& key_equal_funtion)
const {
285 key_extractor, reduce_function, reduce_config,
286 key_hash_function, key_equal_funtion);
289 template <
typename ValueType,
typename Stack>
290 template <
bool DuplicateDetectionValue,
291 typename KeyExtractor,
typename ReduceFunction,
typename ReduceConfig,
292 typename KeyHashFunction,
typename KeyEqualFunction>
295 const KeyExtractor& key_extractor,
296 const ReduceFunction& reduce_function,
297 const ReduceConfig& reduce_config,
298 const KeyHashFunction& key_hash_function,
299 const KeyEqualFunction& key_equal_funtion)
const {
303 key_extractor, reduce_function, reduce_config,
304 key_hash_function, key_equal_funtion);
307 template <
typename ValueType,
typename Stack>
308 template <
bool VolatileKeyValue,
309 bool DuplicateDetectionValue,
310 typename KeyExtractor,
typename ReduceFunction,
typename ReduceConfig,
311 typename KeyHashFunction,
typename KeyEqualFunction>
315 const KeyExtractor& key_extractor,
316 const ReduceFunction& reduce_function,
317 const ReduceConfig& reduce_config,
318 const KeyHashFunction& key_hash_function,
319 const KeyEqualFunction& key_equal_funtion)
const {
323 =
typename common::FunctionTraits<ReduceFunction>::result_type;
328 typename common::FunctionTraits<ReduceFunction>::template arg<0>
330 "ReduceFunction has the wrong input type");
335 typename common::FunctionTraits<ReduceFunction>::template arg<1>
337 "ReduceFunction has the wrong input type");
343 "ReduceFunction has the wrong output type");
347 typename std::decay<
typename common::FunctionTraits<KeyExtractor>::
348 template arg<0> >::type,
350 "KeyExtractor has the wrong input type");
353 DOpResult, KeyExtractor, ReduceFunction, ReduceConfig,
354 KeyHashFunction, KeyEqualFunction,
355 VolatileKeyValue, DuplicateDetectionValue>;
357 auto node = tlx::make_counting<ReduceNode>(
358 *
this,
"ReduceByKey",
359 key_extractor, reduce_function, reduce_config,
360 key_hash_function, key_equal_funtion);
368 template <
typename ValueType,
typename Stack>
369 template <
typename ReduceFunction,
typename ReduceConfig>
371 const ReduceFunction& reduce_function,
372 const ReduceConfig& reduce_config)
const {
374 using Key =
typename ValueType::first_type;
375 return ReducePair(reduce_function, reduce_config,
376 std::hash<Key>(), std::equal_to<Key>());
379 template <
typename ValueType,
typename Stack>
380 template <
typename ReduceFunction,
typename ReduceConfig,
381 typename KeyHashFunction>
383 const ReduceFunction& reduce_function,
384 const ReduceConfig& reduce_config,
385 const KeyHashFunction& key_hash_function)
const {
387 using Key =
typename ValueType::first_type;
388 return ReducePair(reduce_function, reduce_config,
389 key_hash_function, std::equal_to<Key>());
392 template <
typename ValueType,
typename Stack>
393 template <
typename ReduceFunction,
typename ReduceConfig,
394 typename KeyHashFunction,
typename KeyEqualFunction>
396 const ReduceFunction& reduce_function,
397 const ReduceConfig& reduce_config,
398 const KeyHashFunction& key_hash_function,
399 const KeyEqualFunction& key_equal_funtion)
const {
402 reduce_function, reduce_config,
403 key_hash_function, key_equal_funtion);
406 template <
typename ValueType,
typename Stack>
407 template <
bool DuplicateDetectionValue,
408 typename ReduceFunction,
typename ReduceConfig,
409 typename KeyHashFunction,
typename KeyEqualFunction>
412 const ReduceFunction& reduce_function,
413 const ReduceConfig& reduce_config,
414 const KeyHashFunction& key_hash_function,
415 const KeyEqualFunction& key_equal_funtion)
const {
419 =
typename common::FunctionTraits<ReduceFunction>::result_type;
422 "ValueType is not a pair");
426 typename ValueType::second_type,
427 typename common::FunctionTraits<ReduceFunction>::template arg<0>
429 "ReduceFunction has the wrong input type");
433 typename ValueType::second_type,
434 typename common::FunctionTraits<ReduceFunction>::template arg<1>
436 "ReduceFunction has the wrong input type");
441 typename ValueType::second_type>::
value,
442 "ReduceFunction has the wrong output type");
446 auto reduce_pair_function =
448 return ValueType(a.first, reduce_function(a.second, b.second));
453 decltype(key_extractor), decltype(reduce_pair_function),
454 ReduceConfig, KeyHashFunction, KeyEqualFunction,
455 false, DuplicateDetectionValue>;
457 auto node = tlx::make_counting<ReduceNode>(
459 key_extractor, reduce_pair_function, reduce_config,
460 key_hash_function, key_equal_funtion);
468 #endif // !THRILL_API_REDUCE_BY_KEY_HEADER std::thread CreateThread(Args &&... args)
create a std::thread and repeat creation if it fails
void ProcessChannel()
process the inbound data in the post reduce phase
#define sLOG
Default logging method: output if the local debug variable is true.
DIA is the interface between the user and the Thrill framework.
Description of the amount of RAM the internal data structures of a DIANode require.
auto ReduceByKey(const KeyExtractor &key_extractor, const ReduceFunction &reduce_function, const ReduceConfig &reduce_config=ReduceConfig()) const
ReduceByKey is a DOp, which groups elements of the DIA with the key_extractor and reduces each key-bu...
auto ReducePair(const ReduceFunction &reduce_function, const ReduceConfig &reduce_config=ReduceConfig()) const
ReducePair is a DOp, which groups key-value-pairs in the input DIA by their key and reduces each key-...
void StartPreOp(size_t) final
Virtual method for preparing start of PushData.
static constexpr bool use_post_thread_
DIAMemUse PreOpMemUse() final
Amount of RAM used by PreOp after StartPreOp()
Emitter for PostPhase to push elements to next DIA object.
void Dispose() final
Virtual clear method. Triggers actual disposing in sub-classes.
DIAMemUse PushDataMemUse() final
Amount of RAM used by PushData()
core::ReduceByHashPostPhase< TableItem, Key, ValueType, KeyExtractor, ReduceFunction, Emitter, VolatileKey, ReduceConfig, HashIndexFunction, KeyEqualFunction > post_phase_
data::CatStreamPtr cat_stream_
A reduce index function which returns a hash index and partition.
const struct VolatileKeyFlag< false > NoVolatileKeyTag
global const VolatileKeyFlag instance
virtual void AddChild(DIABase *node, const Callback &callback=Callback(), size_t parent_index=0)
Enables children to push their "folded" function chains to their parent.
An extra class derived from std::vector<> for delivery of the BlockWriters of a Stream.
tag structure for ReduceByKey() and ReduceToIndex()
A DIANode which performs a Reduce operation.
void PushData(bool consume) final
Virtual method for pushing data. Triggers actual pushing in sub-classes.
static constexpr bool use_mix_stream_
Configuration class to define operational parameters of reduce hash tables and reduce phases...
data::Stream::Writers emitters_
test whether a type is a std::pair<...>
static constexpr bool debug
Emitter(ReduceNode *node)
const struct DuplicateDetectionFlag< false > NoDuplicateDetectionTag
global const DuplicateDetectionFlag instance
StreamData::Writer Writer
A DOpNode is a typed node representing distributed operations in Thrill.
void Execute() final
Virtual execution method. Triggers actual computation in sub-classes.
ReduceNode(const ParentDIA &parent, const char *label, const KeyExtractor &key_extractor, const ReduceFunction &reduce_function, const ReduceConfig &config, const KeyHashFunction &key_hash_function, const KeyEqualFunction &key_equal_function)
Constructor for a ReduceNode.
typename common::FunctionTraits< KeyExtractor >::result_type Key
tag structure for ReduceByKey()
std::thread thread_
handle to additional thread for post phase
void StopPreOp(size_t) final
Virtual method for preparing end of PushData.
typename std::conditional< VolatileKey, std::pair< Key, ValueType >, ValueType >::type TableItem
#define LOG
Default logging method: output if the local debug variable is true.
core::ReducePrePhase< TableItem, Key, ValueType, KeyExtractor, ReduceFunction, VolatileKey, data::Stream::Writer, ReduceConfig, HashIndexFunction, KeyEqualFunction, KeyHashFunction, UseDuplicateDetection > pre_phase_
data::MixStreamPtr mix_stream_