Thrill  0.1
reduce_by_key.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/api/reduce_by_key.hpp
3  *
4  * DIANode for a reduce operation. Performs the actual reduce operation
5  *
6  * Part of Project Thrill - http://project-thrill.org
7  *
8  * Copyright (C) 2015 Matthias Stumpp <[email protected]>
9  * Copyright (C) 2015 Alexander Noe <[email protected]>
10  * Copyright (C) 2015 Sebastian Lamm <[email protected]>
11  *
12  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
13  ******************************************************************************/
14 
15 #pragma once
16 #ifndef THRILL_API_REDUCE_BY_KEY_HEADER
17 #define THRILL_API_REDUCE_BY_KEY_HEADER
18 
19 #include <thrill/api/dia.hpp>
20 #include <thrill/api/dop_node.hpp>
22 #include <thrill/common/logger.hpp>
26 #include <tlx/meta/is_std_pair.hpp>
27 
28 #include <functional>
29 #include <thread>
30 #include <type_traits>
31 #include <typeinfo>
32 #include <utility>
33 #include <vector>
34 
35 namespace thrill {
36 namespace api {
37 
39 { };
40 
41 /*!
42  * A DIANode which performs a Reduce operation. Reduce groups the elements in a
43  * DIA by their key and reduces every key bucket to a single element each. The
44  * ReduceNode stores the key_extractor and the reduce_function UDFs. The
45  * chainable LOps ahead of the Reduce operation are stored in the Stack. The
46  * ReduceNode has the type ValueType, which is the result type of the
47  * reduce_function.
48  *
49  * \tparam ValueType Output type of the Reduce operation
50  * \tparam KeyExtractor Type of the key_extractor function.
51  * \tparam ReduceFunction Type of the reduce_function.
52  * \tparam ReduceConfig Type of the reduce table configuration.
53  * \tparam KeyHashFunction, KeyEqualFunction Key hash and key equality types.
54  * \tparam VolatileKey Whether to ship the key extracted during the pre reduce
55  * along with the value (true) or let the post reduce extract it again (false).
56  *
57  * \ingroup api_layer
58  */
59 template <typename ValueType,
60  typename KeyExtractor, typename ReduceFunction,
61  typename ReduceConfig, typename KeyHashFunction,
62  typename KeyEqualFunction, const bool VolatileKey,
63  bool UseDuplicateDetection>
64 class ReduceNode final : public DOpNode<ValueType>
65 {
66 private:
67  static constexpr bool debug = false;
68 
70  using Super::context_;
71 
72  using Key = typename common::FunctionTraits<KeyExtractor>::result_type;
73 
74  using TableItem =
75  typename std::conditional<
76  VolatileKey, std::pair<Key, ValueType>, ValueType>::type;
77 
79 
80  static constexpr bool use_mix_stream_ = ReduceConfig::use_mix_stream_;
81  static constexpr bool use_post_thread_ = ReduceConfig::use_post_thread_;
82 
83  //! Emitter for PostPhase to push elements to next DIA object.
84  class Emitter
85  {
86  public:
87  explicit Emitter(ReduceNode* node) : node_(node) { }
88  void operator () (const ValueType& item) const
89  { return node_->PushItem(item); }
90 
91  private:
93  };
94 
95 public:
96  /*!
97  * Constructor for a ReduceNode. Sets the parent, stack, key_extractor and
98  * reduce_function.
99  */
100  template <typename ParentDIA>
101  ReduceNode(const ParentDIA& parent,
102  const char* label,
103  const KeyExtractor& key_extractor,
104  const ReduceFunction& reduce_function,
105  const ReduceConfig& config,
106  const KeyHashFunction& key_hash_function,
107  const KeyEqualFunction& key_equal_function)
108  : Super(parent.ctx(), label, { parent.id() }, { parent.node() }),
109  mix_stream_(use_mix_stream_ ?
110  parent.ctx().GetNewMixStream(this) : nullptr),
111  cat_stream_(use_mix_stream_ ?
112  nullptr : parent.ctx().GetNewCatStream(this)),
113  emitters_(use_mix_stream_ ?
114  mix_stream_->GetWriters() : cat_stream_->GetWriters()),
115  pre_phase_(
116  context_, Super::dia_id(), parent.ctx().num_workers(),
117  key_extractor, reduce_function, emitters_, config,
118  HashIndexFunction(key_hash_function), key_equal_function,
119  key_hash_function),
120  post_phase_(
121  context_, Super::dia_id(), key_extractor, reduce_function,
122  Emitter(this), config,
123  HashIndexFunction(key_hash_function), key_equal_function) {
124  // Hook PreOp: Locally hash elements of the current DIA onto buckets and
125  // reduce each bucket to a single value, afterwards send data to another
126  // worker given by the shuffle algorithm.
127  auto pre_op_fn = [this](const ValueType& input) {
128  return pre_phase_.Insert(input);
129  };
130  // close the function stack with our pre op and register it at
131  // parent node for output
132  auto lop_chain = parent.stack().push(pre_op_fn).fold();
133  parent.node()->AddChild(this, lop_chain);
134  }
135 
137  // request maximum RAM limit, the value is calculated by StageBuilder,
138  // and set as DIABase::mem_limit_.
139  return DIAMemUse::Max();
140  }
141 
142  void StartPreOp(size_t /* parent_index */) final {
143  LOG << *this << " running StartPreOp";
144  if (!use_post_thread_) {
145  // use pre_phase without extra thread
146  pre_phase_.Initialize(DIABase::mem_limit_);
147  }
148  else {
149  pre_phase_.Initialize(DIABase::mem_limit_ / 2);
150  post_phase_.Initialize(DIABase::mem_limit_ / 2);
151 
152  // start additional thread to receive from the channel
153  thread_ = common::CreateThread([this] { ProcessChannel(); });
154  }
155  }
156 
157  void StopPreOp(size_t /* parent_index */) final {
158  LOG << *this << " running StopPreOp";
159  // Flush hash table before the postOp
160  pre_phase_.FlushAll();
161  pre_phase_.CloseAll();
162  if (use_post_thread_) {
163  // waiting for the additional thread to finish the reduce
164  thread_.join();
165  // deallocate stream if already processed
166  use_mix_stream_ ? mix_stream_.reset() : cat_stream_.reset();
167  }
168  }
169 
170  void Execute() final { }
171 
173  return DIAMemUse::Max();
174  }
175 
176  void PushData(bool consume) final {
177 
178  if (!use_post_thread_ && !reduced_) {
179  // not final reduced, and no additional thread, perform post reduce
180  post_phase_.Initialize(DIABase::mem_limit_);
181  ProcessChannel();
182 
183  // deallocate stream if already processed
184  use_mix_stream_ ? mix_stream_.reset() : cat_stream_.reset();
185 
186  reduced_ = true;
187  }
188  post_phase_.PushData(consume);
189  }
190 
191  //! process the inbound data in the post reduce phase
192  void ProcessChannel() {
193  if (use_mix_stream_)
194  {
195  auto reader = mix_stream_->GetMixReader(/* consume */ true);
196  sLOG << "reading data from" << mix_stream_->id()
197  << "to push into post phase which flushes to" << this->dia_id();
198  while (reader.HasNext()) {
199  post_phase_.Insert(reader.template Next<TableItem>());
200  }
201  }
202  else
203  {
204  auto reader = cat_stream_->GetCatReader(/* consume */ true);
205  sLOG << "reading data from" << cat_stream_->id()
206  << "to push into post phase which flushes to" << this->dia_id();
207  while (reader.HasNext()) {
208  post_phase_.Insert(reader.template Next<TableItem>());
209  }
210  }
211  }
212 
213  void Dispose() final {
214  post_phase_.Dispose();
215  }
216 
217 private:
218  // pointers for both Mix and CatStream. only one is used, the other costs
219  // only a null pointer.
222 
224  //! handle to additional thread for post phase
225  std::thread thread_;
226 
228  TableItem, Key, ValueType, KeyExtractor,
229  ReduceFunction, VolatileKey, data::Stream::Writer, ReduceConfig,
230  HashIndexFunction, KeyEqualFunction, KeyHashFunction,
231  UseDuplicateDetection> pre_phase_;
232 
234  TableItem, Key, ValueType, KeyExtractor, ReduceFunction, Emitter,
235  VolatileKey, ReduceConfig,
236  HashIndexFunction, KeyEqualFunction> post_phase_;
237 
238  bool reduced_ = false;
239 };
240 
241 template <typename ValueType, typename Stack>
242 template <typename KeyExtractor, typename ReduceFunction, typename ReduceConfig>
244  const KeyExtractor& key_extractor,
245  const ReduceFunction& reduce_function,
246  const ReduceConfig& reduce_config) const {
247  // forward to main function
248  using Key = typename common::FunctionTraits<KeyExtractor>::result_type;
249  return ReduceByKey(
251  key_extractor, reduce_function, reduce_config,
252  std::hash<Key>(), std::equal_to<Key>());
253 }
254 
255 template <typename ValueType, typename Stack>
256 template <typename KeyExtractor, typename ReduceFunction,
257  typename ReduceConfig, typename KeyHashFunction>
259  const KeyExtractor& key_extractor,
260  const ReduceFunction& reduce_function,
261  const ReduceConfig& reduce_config,
262  const KeyHashFunction& key_hash_function) const {
263  // forward to main function
264  using Key = typename common::FunctionTraits<KeyExtractor>::result_type;
265  return ReduceByKey(
267  key_extractor, reduce_function, reduce_config,
268  key_hash_function, std::equal_to<Key>());
269 }
270 
271 template <typename ValueType, typename Stack>
272 template <bool VolatileKeyValue,
273  typename KeyExtractor, typename ReduceFunction, typename ReduceConfig,
274  typename KeyHashFunction, typename KeyEqualFunction>
276  const VolatileKeyFlag<VolatileKeyValue>& volatile_key_flag,
277  const KeyExtractor& key_extractor,
278  const ReduceFunction& reduce_function,
279  const ReduceConfig& reduce_config,
280  const KeyHashFunction& key_hash_function,
281  const KeyEqualFunction& key_equal_funtion) const {
282  // forward to main function
283  return ReduceByKey(
284  volatile_key_flag, NoDuplicateDetectionTag,
285  key_extractor, reduce_function, reduce_config,
286  key_hash_function, key_equal_funtion);
287 }
288 
289 template <typename ValueType, typename Stack>
290 template <bool DuplicateDetectionValue,
291  typename KeyExtractor, typename ReduceFunction, typename ReduceConfig,
292  typename KeyHashFunction, typename KeyEqualFunction>
294  const DuplicateDetectionFlag<DuplicateDetectionValue>& duplicate_detection_flag,
295  const KeyExtractor& key_extractor,
296  const ReduceFunction& reduce_function,
297  const ReduceConfig& reduce_config,
298  const KeyHashFunction& key_hash_function,
299  const KeyEqualFunction& key_equal_funtion) const {
300  // forward to main function
301  return ReduceByKey(
302  NoVolatileKeyTag, duplicate_detection_flag,
303  key_extractor, reduce_function, reduce_config,
304  key_hash_function, key_equal_funtion);
305 }
306 
307 template <typename ValueType, typename Stack>
308 template <bool VolatileKeyValue,
309  bool DuplicateDetectionValue,
310  typename KeyExtractor, typename ReduceFunction, typename ReduceConfig,
311  typename KeyHashFunction, typename KeyEqualFunction>
315  const KeyExtractor& key_extractor,
316  const ReduceFunction& reduce_function,
317  const ReduceConfig& reduce_config,
318  const KeyHashFunction& key_hash_function,
319  const KeyEqualFunction& key_equal_funtion) const {
320  assert(IsValid());
321 
322  using DOpResult
323  = typename common::FunctionTraits<ReduceFunction>::result_type;
324 
325  static_assert(
326  std::is_convertible<
327  ValueType,
328  typename common::FunctionTraits<ReduceFunction>::template arg<0>
329  >::value,
330  "ReduceFunction has the wrong input type");
331 
332  static_assert(
333  std::is_convertible<
334  ValueType,
335  typename common::FunctionTraits<ReduceFunction>::template arg<1>
336  >::value,
337  "ReduceFunction has the wrong input type");
338 
339  static_assert(
340  std::is_same<
341  DOpResult,
342  ValueType>::value,
343  "ReduceFunction has the wrong output type");
344 
345  static_assert(
346  std::is_same<
347  typename std::decay<typename common::FunctionTraits<KeyExtractor>::
348  template arg<0> >::type,
349  ValueType>::value,
350  "KeyExtractor has the wrong input type");
351 
352  using ReduceNode = api::ReduceNode<
353  DOpResult, KeyExtractor, ReduceFunction, ReduceConfig,
354  KeyHashFunction, KeyEqualFunction,
355  VolatileKeyValue, DuplicateDetectionValue>;
356 
357  auto node = tlx::make_counting<ReduceNode>(
358  *this, "ReduceByKey",
359  key_extractor, reduce_function, reduce_config,
360  key_hash_function, key_equal_funtion);
361 
362  return DIA<DOpResult>(node);
363 }
364 
365 /******************************************************************************/
366 // ReducePair
367 
368 template <typename ValueType, typename Stack>
369 template <typename ReduceFunction, typename ReduceConfig>
371  const ReduceFunction& reduce_function,
372  const ReduceConfig& reduce_config) const {
373  // forward to main function
374  using Key = typename ValueType::first_type;
375  return ReducePair(reduce_function, reduce_config,
376  std::hash<Key>(), std::equal_to<Key>());
377 }
378 
379 template <typename ValueType, typename Stack>
380 template <typename ReduceFunction, typename ReduceConfig,
381  typename KeyHashFunction>
383  const ReduceFunction& reduce_function,
384  const ReduceConfig& reduce_config,
385  const KeyHashFunction& key_hash_function) const {
386  // forward to main function
387  using Key = typename ValueType::first_type;
388  return ReducePair(reduce_function, reduce_config,
389  key_hash_function, std::equal_to<Key>());
390 }
391 
392 template <typename ValueType, typename Stack>
393 template <typename ReduceFunction, typename ReduceConfig,
394  typename KeyHashFunction, typename KeyEqualFunction>
396  const ReduceFunction& reduce_function,
397  const ReduceConfig& reduce_config,
398  const KeyHashFunction& key_hash_function,
399  const KeyEqualFunction& key_equal_funtion) const {
400  // forward to main function
401  return ReducePair(NoDuplicateDetectionTag,
402  reduce_function, reduce_config,
403  key_hash_function, key_equal_funtion);
404 }
405 
406 template <typename ValueType, typename Stack>
407 template <bool DuplicateDetectionValue,
408  typename ReduceFunction, typename ReduceConfig,
409  typename KeyHashFunction, typename KeyEqualFunction>
412  const ReduceFunction& reduce_function,
413  const ReduceConfig& reduce_config,
414  const KeyHashFunction& key_hash_function,
415  const KeyEqualFunction& key_equal_funtion) const {
416  assert(IsValid());
417 
418  using DOpResult
419  = typename common::FunctionTraits<ReduceFunction>::result_type;
420 
422  "ValueType is not a pair");
423 
424  static_assert(
425  std::is_convertible<
426  typename ValueType::second_type,
427  typename common::FunctionTraits<ReduceFunction>::template arg<0>
428  >::value,
429  "ReduceFunction has the wrong input type");
430 
431  static_assert(
432  std::is_convertible<
433  typename ValueType::second_type,
434  typename common::FunctionTraits<ReduceFunction>::template arg<1>
435  >::value,
436  "ReduceFunction has the wrong input type");
437 
438  static_assert(
439  std::is_same<
440  DOpResult,
441  typename ValueType::second_type>::value,
442  "ReduceFunction has the wrong output type");
443 
444  auto key_extractor = [](const ValueType& value) { return value.first; };
445 
446  auto reduce_pair_function =
447  [reduce_function](const ValueType& a, const ValueType& b) {
448  return ValueType(a.first, reduce_function(a.second, b.second));
449  };
450 
451  using ReduceNode = api::ReduceNode<
452  ValueType,
453  decltype(key_extractor), decltype(reduce_pair_function),
454  ReduceConfig, KeyHashFunction, KeyEqualFunction,
455  /* VolatileKey */ false, DuplicateDetectionValue>;
456 
457  auto node = tlx::make_counting<ReduceNode>(
458  *this, "ReducePair",
459  key_extractor, reduce_pair_function, reduce_config,
460  key_hash_function, key_equal_funtion);
461 
462  return DIA<ValueType>(node);
463 }
464 
465 } // namespace api
466 } // namespace thrill
467 
468 #endif // !THRILL_API_REDUCE_BY_KEY_HEADER
469 
470 /******************************************************************************/
std::thread CreateThread(Args &&... args)
create a std::thread and repeat creation if it fails
Definition: porting.hpp:44
static DIAMemUse Max()
Definition: dia_base.hpp:60
void ProcessChannel()
process the inbound data in the post reduce phase
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:34
DIA is the interface between the user and the Thrill framework.
Definition: dia.hpp:141
Description of the amount of RAM the internal data structures of a DIANode require.
Definition: dia_base.hpp:51
auto ReduceByKey(const KeyExtractor &key_extractor, const ReduceFunction &reduce_function, const ReduceConfig &reduce_config=ReduceConfig()) const
ReduceByKey is a DOp, which groups elements of the DIA with the key_extractor and reduces each key-bu...
auto ReducePair(const ReduceFunction &reduce_function, const ReduceConfig &reduce_config=ReduceConfig()) const
ReducePair is a DOp, which groups key-value-pairs in the input DIA by their key and reduces each key-...
void StartPreOp(size_t) final
Virtual method for preparing start of PushData.
static constexpr bool use_post_thread_
DIAMemUse PreOpMemUse() final
Amount of RAM used by PreOp after StartPreOp()
Emitter for PostPhase to push elements to next DIA object.
void Dispose() final
Virtual clear method. Triggers actual disposing in sub-classes.
DIAMemUse PushDataMemUse() final
Amount of RAM used by PushData()
core::ReduceByHashPostPhase< TableItem, Key, ValueType, KeyExtractor, ReduceFunction, Emitter, VolatileKey, ReduceConfig, HashIndexFunction, KeyEqualFunction > post_phase_
data::CatStreamPtr cat_stream_
A reduce index function which returns a hash index and partition.
const struct VolatileKeyFlag< false > NoVolatileKeyTag
global const VolatileKeyFlag instance
Definition: dia.hpp:51
virtual void AddChild(DIABase *node, const Callback &callback=Callback(), size_t parent_index=0)
Enables children to push their "folded" function chains to their parent.
Definition: dia_node.hpp:76
An extra class derived from std::vector<> for delivery of the BlockWriters of a Stream.
Definition: stream_data.hpp:59
tag structure for ReduceByKey(), and ReduceToIndex()
Definition: dia.hpp:42
A DIANode which performs a Reduce operation.
void PushData(bool consume) final
Virtual method for pushing data. Triggers actual pushing in sub-classes.
static constexpr bool use_mix_stream_
int value
Definition: gen_data.py:41
Configuration class to define operational parameters of reduce hash tables and reduce phases...
data::Stream::Writers emitters_
test if is a std::pair<...>
Definition: is_std_pair.hpp:23
static constexpr bool debug
const struct DuplicateDetectionFlag< false > NoDuplicateDetectionTag
global const DuplicateDetectionFlag instance
Definition: dia.hpp:112
DIAMemUse mem_limit_
Definition: dia_base.hpp:314
StreamData::Writer Writer
Definition: stream.hpp:39
A DOpNode is a typed node representing distributed operations in Thrill.
Definition: dop_node.hpp:32
void Execute() final
Virtual execution method. Triggers actual computation in sub-classes.
ReduceNode(const ParentDIA &parent, const char *label, const KeyExtractor &key_extractor, const ReduceFunction &reduce_function, const ReduceConfig &config, const KeyHashFunction &key_hash_function, const KeyEqualFunction &key_equal_function)
Constructor for a ReduceNode.
typename common::FunctionTraits< KeyExtractor >::result_type Key
tag structure for ReduceByKey()
Definition: dia.hpp:103
std::thread thread_
handle to additional thread for post phase
void StopPreOp(size_t) final
Virtual method for preparing end of PushData.
typename std::conditional< VolatileKey, std::pair< Key, ValueType >, ValueType >::type TableItem
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:24
core::ReducePrePhase< TableItem, Key, ValueType, KeyExtractor, ReduceFunction, VolatileKey, data::Stream::Writer, ReduceConfig, HashIndexFunction, KeyEqualFunction, KeyHashFunction, UseDuplicateDetection > pre_phase_
data::MixStreamPtr mix_stream_