Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
reduce_by_key.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/api/reduce_by_key.hpp
3  *
4  * DIANode for a reduce operation. Performs the actual reduce operation
5  *
6  * Part of Project Thrill - http://project-thrill.org
7  *
8  * Copyright (C) 2015 Matthias Stumpp <[email protected]>
9  * Copyright (C) 2015 Alexander Noe <[email protected]>
10  * Copyright (C) 2015 Sebastian Lamm <[email protected]>
11  *
12  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
13  ******************************************************************************/
14 
15 #pragma once
16 #ifndef THRILL_API_REDUCE_BY_KEY_HEADER
17 #define THRILL_API_REDUCE_BY_KEY_HEADER
18 
19 #include <thrill/api/dia.hpp>
20 #include <thrill/api/dop_node.hpp>
22 #include <thrill/common/logger.hpp>
26 #include <tlx/meta/is_std_pair.hpp>
27 
28 #include <functional>
29 #include <thread>
30 #include <type_traits>
31 #include <typeinfo>
32 #include <utility>
33 #include <vector>
34 
35 namespace thrill {
36 namespace api {
37 
39 { };
40 
41 /*!
42  * A DIANode which performs a Reduce operation. Reduce groups the elements in a
43  * DIA by their key and reduces every key bucket to a single element each. The
44  * ReduceNode stores the key_extractor and the reduce_function UDFs. The
45  * chainable LOps ahead of the Reduce operation are stored in the Stack. The
46  * ReduceNode has the type ValueType, which is the result type of the
47  * reduce_function.
48  *
49  * \tparam ValueType Output type of the Reduce operation
50  * \tparam Stack Function stack, which contains the chained lambdas between the
51  * last and this DIANode.
52  * \tparam KeyExtractor Type of the key_extractor function.
53  * \tparam ReduceFunction Type of the reduce_function.
54  * \tparam VolatileKey Whether to reuse the key once extracted during pre reduce
55  * (false) or let the post reduce extract the key again (true).
56  *
57  * \ingroup api_layer
58  */
59 template <typename ValueType,
60  typename KeyExtractor, typename ReduceFunction,
61  typename ReduceConfig, typename KeyHashFunction,
62  typename KeyEqualFunction, const bool VolatileKey,
63  bool UseDuplicateDetection>
64 class ReduceNode final : public DOpNode<ValueType>
65 {
66 private:
67  static constexpr bool debug = false;
68 
70  using Super::context_;
71 
72  using Key = typename common::FunctionTraits<KeyExtractor>::result_type;
73 
74  using TableItem =
75  typename std::conditional<
76  VolatileKey, std::pair<Key, ValueType>, ValueType>::type;
77 
79 
80  static constexpr bool use_mix_stream_ = ReduceConfig::use_mix_stream_;
81  static constexpr bool use_post_thread_ = ReduceConfig::use_post_thread_;
82 
83  //! Emitter for PostPhase to push elements to next DIA object.
84  class Emitter
85  {
86  public:
87  explicit Emitter(ReduceNode* node) : node_(node) { }
88  void operator () (const ValueType& item) const
89  { return node_->PushItem(item); }
90 
91  private:
93  };
94 
95 public:
96  /*!
97  * Constructor for a ReduceNode. Sets the parent, stack, key_extractor and
98  * reduce_function.
99  */
100  template <typename ParentDIA>
101  ReduceNode(const ParentDIA& parent,
102  const char* label,
103  const KeyExtractor& key_extractor,
104  const ReduceFunction& reduce_function,
105  const ReduceConfig& config,
106  const KeyHashFunction& key_hash_function,
107  const KeyEqualFunction& key_equal_function)
108  : Super(parent.ctx(), label, { parent.id() }, { parent.node() }),
109  mix_stream_(use_mix_stream_ ?
110  parent.ctx().GetNewMixStream(this) : nullptr),
112  nullptr : parent.ctx().GetNewCatStream(this)),
114  mix_stream_->GetWriters() : cat_stream_->GetWriters()),
115  pre_phase_(
116  context_, Super::id(), parent.ctx().num_workers(),
117  key_extractor, reduce_function, emitters_, config,
118  HashIndexFunction(key_hash_function), key_equal_function,
119  key_hash_function),
120  post_phase_(
121  context_, Super::id(), key_extractor, reduce_function,
122  Emitter(this), config,
123  HashIndexFunction(key_hash_function), key_equal_function)
124  {
125  // Hook PreOp: Locally hash elements of the current DIA onto buckets and
126  // reduce each bucket to a single value, afterwards send data to another
127  // worker given by the shuffle algorithm.
128  auto pre_op_fn = [this](const ValueType& input) {
129  return pre_phase_.Insert(input);
130  };
131  // close the function stack with our pre op and register it at
132  // parent node for output
133  auto lop_chain = parent.stack().push(pre_op_fn).fold();
134  parent.node()->AddChild(this, lop_chain);
135  }
136 
137  DIAMemUse PreOpMemUse() final {
138  // request maximum RAM limit, the value is calculated by StageBuilder,
139  // and set as DIABase::mem_limit_.
140  return DIAMemUse::Max();
141  }
142 
143  void StartPreOp(size_t /* id */) final {
144  LOG << *this << " running StartPreOp";
145  if (!use_post_thread_) {
146  // use pre_phase without extra thread
147  pre_phase_.Initialize(DIABase::mem_limit_);
148  }
149  else {
150  pre_phase_.Initialize(DIABase::mem_limit_ / 2);
151  post_phase_.Initialize(DIABase::mem_limit_ / 2);
152 
153  // start additional thread to receive from the channel
154  thread_ = common::CreateThread([this] { ProcessChannel(); });
155  }
156  }
157 
158  void StopPreOp(size_t /* id */) final {
159  LOG << *this << " running StopPreOp";
160  // Flush hash table before the postOp
161  pre_phase_.FlushAll();
162  pre_phase_.CloseAll();
163  // waiting for the additional thread to finish the reduce
164  if (use_post_thread_) thread_.join();
165  use_mix_stream_ ? mix_stream_->Close() : cat_stream_->Close();
166  }
167 
168  void Execute() final { }
169 
170  DIAMemUse PushDataMemUse() final {
171  return DIAMemUse::Max();
172  }
173 
174  void PushData(bool consume) final {
175 
176  if (!use_post_thread_ && !reduced_) {
177  // not yet reduced and no additional thread: perform the post reduce now
178  post_phase_.Initialize(DIABase::mem_limit_);
179  ProcessChannel();
180 
181  reduced_ = true;
182  }
183  post_phase_.PushData(consume);
184  }
185 
186  //! process the inbound data in the post reduce phase
187  void ProcessChannel() {
188  if (use_mix_stream_)
189  {
190  auto reader = mix_stream_->GetMixReader(/* consume */ true);
191  sLOG << "reading data from" << mix_stream_->id()
192  << "to push into post phase which flushes to" << this->id();
193  while (reader.HasNext()) {
194  post_phase_.Insert(reader.template Next<TableItem>());
195  }
196  }
197  else
198  {
199  auto reader = cat_stream_->GetCatReader(/* consume */ true);
200  sLOG << "reading data from" << cat_stream_->id()
201  << "to push into post phase which flushes to" << this->id();
202  while (reader.HasNext()) {
203  post_phase_.Insert(reader.template Next<TableItem>());
204  }
205  }
206  }
207 
208  void Dispose() final {
209  post_phase_.Dispose();
210  }
211 
212 private:
213  // pointers for both Mix and CatStream. only one is used, the other costs
214  // only a null pointer.
215  data::MixStreamPtr mix_stream_;
217 
218  std::vector<data::Stream::Writer> emitters_;
219  //! handle to additional thread for post phase
220  std::thread thread_;
221 
222  core::ReducePrePhase<TableItem, Key, ValueType, KeyExtractor,
223  ReduceFunction, VolatileKey, ReduceConfig,
224  HashIndexFunction, KeyEqualFunction, KeyHashFunction,
225  UseDuplicateDetection> pre_phase_;
226 
228  TableItem, Key, ValueType, KeyExtractor, ReduceFunction, Emitter,
229  VolatileKey, ReduceConfig,
230  HashIndexFunction, KeyEqualFunction> post_phase_;
231 
232  bool reduced_ = false;
233 };
234 
235 template <typename ValueType, typename Stack>
236 template <typename KeyExtractor, typename ReduceFunction, typename ReduceConfig>
238  const KeyExtractor& key_extractor,
239  const ReduceFunction& reduce_function,
240  const ReduceConfig& reduce_config) const {
241  // forward to main function
242  using Key = typename common::FunctionTraits<KeyExtractor>::result_type;
243  return ReduceByKey(
245  key_extractor, reduce_function, reduce_config,
246  std::hash<Key>(), std::equal_to<Key>());
247 }
248 
249 template <typename ValueType, typename Stack>
250 template <typename KeyExtractor, typename ReduceFunction,
251  typename ReduceConfig, typename KeyHashFunction>
253  const KeyExtractor& key_extractor,
254  const ReduceFunction& reduce_function,
255  const ReduceConfig& reduce_config,
256  const KeyHashFunction& key_hash_function) const {
257  // forward to main function
258  using Key = typename common::FunctionTraits<KeyExtractor>::result_type;
259  return ReduceByKey(
261  key_extractor, reduce_function, reduce_config,
262  key_hash_function, std::equal_to<Key>());
263 }
264 
265 template <typename ValueType, typename Stack>
266 template <bool VolatileKeyValue,
267  typename KeyExtractor, typename ReduceFunction, typename ReduceConfig,
268  typename KeyHashFunction, typename KeyEqualFunction>
270  const VolatileKeyFlag<VolatileKeyValue>& volatile_key_flag,
271  const KeyExtractor& key_extractor,
272  const ReduceFunction& reduce_function,
273  const ReduceConfig& reduce_config,
274  const KeyHashFunction& key_hash_function,
275  const KeyEqualFunction& key_equal_funtion) const {
276  // forward to main function
277  return ReduceByKey(
278  volatile_key_flag, NoDuplicateDetectionTag,
279  key_extractor, reduce_function, reduce_config,
280  key_hash_function, key_equal_funtion);
281 }
282 
283 template <typename ValueType, typename Stack>
284 template <bool DuplicateDetectionValue,
285  typename KeyExtractor, typename ReduceFunction, typename ReduceConfig,
286  typename KeyHashFunction, typename KeyEqualFunction>
288  const DuplicateDetectionFlag<DuplicateDetectionValue>& duplicate_detection_flag,
289  const KeyExtractor& key_extractor,
290  const ReduceFunction& reduce_function,
291  const ReduceConfig& reduce_config,
292  const KeyHashFunction& key_hash_function,
293  const KeyEqualFunction& key_equal_funtion) const {
294  // forward to main function
295  return ReduceByKey(
296  NoVolatileKeyTag, duplicate_detection_flag,
297  key_extractor, reduce_function, reduce_config,
298  key_hash_function, key_equal_funtion);
299 }
300 
301 template <typename ValueType, typename Stack>
302 template <bool VolatileKeyValue,
303  bool DuplicateDetectionValue,
304  typename KeyExtractor, typename ReduceFunction, typename ReduceConfig,
305  typename KeyHashFunction, typename KeyEqualFunction>
309  const KeyExtractor& key_extractor,
310  const ReduceFunction& reduce_function,
311  const ReduceConfig& reduce_config,
312  const KeyHashFunction& key_hash_function,
313  const KeyEqualFunction& key_equal_funtion) const {
314  assert(IsValid());
315 
316  using DOpResult
317  = typename common::FunctionTraits<ReduceFunction>::result_type;
318 
319  static_assert(
320  std::is_convertible<
321  ValueType,
322  typename common::FunctionTraits<ReduceFunction>::template arg<0>
323  >::value,
324  "ReduceFunction has the wrong input type");
325 
326  static_assert(
327  std::is_convertible<
328  ValueType,
329  typename common::FunctionTraits<ReduceFunction>::template arg<1>
330  >::value,
331  "ReduceFunction has the wrong input type");
332 
333  static_assert(
334  std::is_same<
335  DOpResult,
336  ValueType>::value,
337  "ReduceFunction has the wrong output type");
338 
339  static_assert(
340  std::is_same<
341  typename std::decay<typename common::FunctionTraits<KeyExtractor>::
342  template arg<0> >::type,
343  ValueType>::value,
344  "KeyExtractor has the wrong input type");
345 
346  using ReduceNode = api::ReduceNode<
347  DOpResult, KeyExtractor, ReduceFunction, ReduceConfig,
348  KeyHashFunction, KeyEqualFunction,
349  VolatileKeyValue, DuplicateDetectionValue>;
350 
351  auto node = tlx::make_counting<ReduceNode>(
352  *this, "ReduceByKey",
353  key_extractor, reduce_function, reduce_config,
354  key_hash_function, key_equal_funtion);
355 
356  return DIA<DOpResult>(node);
357 }
358 
359 /******************************************************************************/
360 // ReducePair
361 
362 template <typename ValueType, typename Stack>
363 template <typename ReduceFunction, typename ReduceConfig>
365  const ReduceFunction& reduce_function,
366  const ReduceConfig& reduce_config) const {
367  // forward to main function
368  using Key = typename ValueType::first_type;
369  return ReducePair(reduce_function, reduce_config,
370  std::hash<Key>(), std::equal_to<Key>());
371 }
372 
373 template <typename ValueType, typename Stack>
374 template <typename ReduceFunction, typename ReduceConfig,
375  typename KeyHashFunction>
377  const ReduceFunction& reduce_function,
378  const ReduceConfig& reduce_config,
379  const KeyHashFunction& key_hash_function) const {
380  // forward to main function
381  using Key = typename ValueType::first_type;
382  return ReducePair(reduce_function, reduce_config,
383  key_hash_function, std::equal_to<Key>());
384 }
385 
386 template <typename ValueType, typename Stack>
387 template <typename ReduceFunction, typename ReduceConfig,
388  typename KeyHashFunction, typename KeyEqualFunction>
390  const ReduceFunction& reduce_function,
391  const ReduceConfig& reduce_config,
392  const KeyHashFunction& key_hash_function,
393  const KeyEqualFunction& key_equal_funtion) const {
394  // forward to main function
395  return ReducePair(NoDuplicateDetectionTag,
396  reduce_function, reduce_config,
397  key_hash_function, key_equal_funtion);
398 }
399 
400 template <typename ValueType, typename Stack>
401 template <bool DuplicateDetectionValue,
402  typename ReduceFunction, typename ReduceConfig,
403  typename KeyHashFunction, typename KeyEqualFunction>
406  const ReduceFunction& reduce_function,
407  const ReduceConfig& reduce_config,
408  const KeyHashFunction& key_hash_function,
409  const KeyEqualFunction& key_equal_funtion) const {
410  assert(IsValid());
411 
412  using DOpResult
413  = typename common::FunctionTraits<ReduceFunction>::result_type;
414 
416  "ValueType is not a pair");
417 
418  static_assert(
419  std::is_convertible<
420  typename ValueType::second_type,
421  typename common::FunctionTraits<ReduceFunction>::template arg<0>
422  >::value,
423  "ReduceFunction has the wrong input type");
424 
425  static_assert(
426  std::is_convertible<
427  typename ValueType::second_type,
428  typename common::FunctionTraits<ReduceFunction>::template arg<1>
429  >::value,
430  "ReduceFunction has the wrong input type");
431 
432  static_assert(
433  std::is_same<
434  DOpResult,
435  typename ValueType::second_type>::value,
436  "ReduceFunction has the wrong output type");
437 
438  auto key_extractor = [](const ValueType& value) { return value.first; };
439 
440  auto reduce_pair_function =
441  [reduce_function](const ValueType& a, const ValueType& b) {
442  return ValueType(a.first, reduce_function(a.second, b.second));
443  };
444 
445  using ReduceNode = api::ReduceNode<
446  ValueType,
447  decltype(key_extractor), decltype(reduce_pair_function),
448  ReduceConfig, KeyHashFunction, KeyEqualFunction,
449  /* VolatileKey */ false, DuplicateDetectionValue>;
450 
451  auto node = tlx::make_counting<ReduceNode>(
452  *this, "ReducePair",
453  key_extractor, reduce_pair_function, reduce_config,
454  key_hash_function, key_equal_funtion);
455 
456  return DIA<ValueType>(node);
457 }
458 
459 } // namespace api
460 } // namespace thrill
461 
462 #endif // !THRILL_API_REDUCE_BY_KEY_HEADER
463 
464 /******************************************************************************/
std::vector< data::Stream::Writer > emitters_
static DIAMemUse Max()
Definition: dia_base.hpp:60
virtual void Dispose()
Virtual clear method. Triggers actual disposing in sub-classes.
Definition: dia_base.hpp:188
DIA is the interface between the user and the Thrill framework.
Definition: dia.hpp:133
ValueType_ ValueType
Definition: dia.hpp:144
core::ReducePrePhase< TableItem, Key, ValueType, KeyExtractor, ReduceFunction, VolatileKey, ReduceConfig, HashIndexFunction, KeyEqualFunction, KeyHashFunction, UseDuplicateDetection > pre_phase_
static constexpr bool use_post_thread_
const char * label() const
return label() of DIANode subclass as stored by StatsNode
Definition: dia_base.hpp:218
Emitter for PostPhase to push elements to next DIA object.
virtual void PushData(bool consume)=0
Virtual method for pushing data. Triggers actual pushing in sub-classes.
core::ReduceByHash< Key, KeyHashFunction > HashIndexFunction
A DIANode is a typed node representing an operation in Thrill.
Definition: dia_node.hpp:37
const size_t & id() const
return unique id() of DIANode subclass as stored by StatsNode
Definition: dia_base.hpp:213
core::ReduceByHashPostPhase< TableItem, Key, ValueType, KeyExtractor, ReduceFunction, Emitter, VolatileKey, ReduceConfig, HashIndexFunction, KeyEqualFunction > post_phase_
data::CatStreamPtr cat_stream_
virtual DIAMemUse PreOpMemUse()
Amount of RAM used by PreOp after StartPreOp()
Definition: dia_base.hpp:160
A reduce index function which returns a hash index and partition.
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:179
const struct VolatileKeyFlag< false > NoVolatileKeyTag
global const VolatileKeyFlag instance
Definition: dia.hpp:51
void PushItem(const ValueType &item) const
Method for derived classes to Push a single item to all children.
Definition: dia_node.hpp:147
static constexpr bool use_mix_stream_
tag structure for ReduceByKey(), and ReduceToIndex()
Definition: dia.hpp:42
virtual void StopPreOp(size_t)
Virtual method for preparing end of PushData.
Definition: dia_base.hpp:173
A DIANode which performs a Reduce operation.
auto ReducePair(const ReduceFunction &reduce_function, const ReduceConfig &reduce_config=ReduceConfig()) const
ReducePair is a DOp, which groups key-value-pairs in the input DIA by their key and reduces each key-...
virtual void Execute()=0
Virtual execution method. Triggers actual computation in sub-classes.
int value
Definition: gen_data.py:41
Configuration class to define operational parameters of reduce hash tables and reduce phases...
test if is a std::pair<...>
Definition: is_std_pair.hpp:23
std::thread CreateThread(Args &&...args)
create a std::thread and repeat creation if it fails
Definition: porting.hpp:42
void operator()(const ValueType &item) const
virtual void StartPreOp(size_t)
Virtual method for preparing start of PushData.
Definition: dia_base.hpp:163
auto ReduceByKey(const KeyExtractor &key_extractor, const ReduceFunction &reduce_function, const ReduceConfig &reduce_config=ReduceConfig()) const
ReduceByKey is a DOp, which groups elements of the DIA with the key_extractor and reduces each key-bu...
const struct DuplicateDetectionFlag< false > NoDuplicateDetectionTag
global const DuplicateDetectionFlag instance
Definition: dia.hpp:104
DIAMemUse mem_limit_
Definition: dia_base.hpp:314
A DOpNode is a typed node representing a distributed operation in Thrill.
Definition: dop_node.hpp:32
ReduceNode(const ParentDIA &parent, const char *label, const KeyExtractor &key_extractor, const ReduceFunction &reduce_function, const ReduceConfig &config, const KeyHashFunction &key_hash_function, const KeyEqualFunction &key_equal_function)
Constructor for a ReduceNode.
typename common::FunctionTraits< KeyExtractor >::result_type Key
tag structure for ReduceByKey()
Definition: dia.hpp:95
tlx::CountingPtr< MixStream > MixStreamPtr
Definition: mix_stream.hpp:114
std::thread thread_
handle to additional thread for post phase
typename std::conditional< VolatileKey, std::pair< Key, ValueType >, ValueType >::type TableItem
static constexpr bool debug
tlx::CountingPtr< CatStream > CatStreamPtr
Definition: cat_stream.hpp:138
Context & context_
associated Context
Definition: dia_base.hpp:293
virtual DIAMemUse PushDataMemUse()
Amount of RAM used by PushData()
Definition: dia_base.hpp:182
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:167