Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
reduce_by_key.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/api/reduce_by_key.hpp
3  *
4  * DIANode for a reduce operation. Performs the actual reduce operation.
5  *
6  * Part of Project Thrill - http://project-thrill.org
7  *
8  * Copyright (C) 2015 Matthias Stumpp <[email protected]>
9  * Copyright (C) 2015 Alexander Noe <[email protected]>
10  * Copyright (C) 2015 Sebastian Lamm <[email protected]>
11  *
12  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
13  ******************************************************************************/
14 
15 #pragma once
16 #ifndef THRILL_API_REDUCE_BY_KEY_HEADER
17 #define THRILL_API_REDUCE_BY_KEY_HEADER
18 
19 #include <thrill/api/dia.hpp>
20 #include <thrill/api/dop_node.hpp>
22 #include <thrill/common/logger.hpp>
26 #include <tlx/meta/is_std_pair.hpp>
27 
28 #include <functional>
29 #include <thread>
30 #include <type_traits>
31 #include <typeinfo>
32 #include <utility>
33 #include <vector>
34 
35 namespace thrill {
36 namespace api {
37 
39 { };
40 
41 /*!
42  * A DIANode which performs a Reduce operation. Reduce groups the elements in a
43  * DIA by their key and reduces every key bucket to a single element each. The
44  * ReduceNode stores the key_extractor and the reduce_function UDFs. The
45  * chainable LOps ahead of the Reduce operation are stored in the Stack. The
46  * ReduceNode has the type ValueType, which is the result type of the
47  * reduce_function.
48  *
49  * \tparam ValueType Output type of the Reduce operation
50  * \tparam ReduceConfig Configuration of the reduce hash tables and reduce phases.
51  * \tparam KeyHashFunction, KeyEqualFunction Hash and equality functions for keys.
52  * \tparam KeyExtractor Type of the key_extractor function.
53  * \tparam ReduceFunction Type of the reduce_function.
54  * \tparam VolatileKey Whether the key extracted during the pre reduce is kept and sent
55  * along with each item as std::pair<Key, ValueType> (true), or extracted again by the post reduce (false).
56  *
57  * \ingroup api_layer
58  */
59 template <typename ValueType,
60  typename KeyExtractor, typename ReduceFunction,
61  typename ReduceConfig, typename KeyHashFunction,
62  typename KeyEqualFunction, const bool VolatileKey,
63  bool UseDuplicateDetection>
64 class ReduceNode final : public DOpNode<ValueType>
65 {
66 private:
67  static constexpr bool debug = false;
68 
70  using Super::context_;
71 
72  using Key = typename common::FunctionTraits<KeyExtractor>::result_type;
73 
74  using TableItem =
75  typename std::conditional<
76  VolatileKey, std::pair<Key, ValueType>, ValueType>::type;
77 
79 
80  static constexpr bool use_mix_stream_ = ReduceConfig::use_mix_stream_;
81  static constexpr bool use_post_thread_ = ReduceConfig::use_post_thread_;
82 
83  //! Emitter for PostPhase to push elements to next DIA object.
84  class Emitter
85  {
86  public:
87  explicit Emitter(ReduceNode* node) : node_(node) { }
88  void operator () (const ValueType& item) const
89  { return node_->PushItem(item); }
90 
91  private:
93  };
94 
95 public:
96  /*!
97  * Constructor for a ReduceNode. Sets the parent, stack, key_extractor and
98  * reduce_function.
99  */
100  template <typename ParentDIA>
101  ReduceNode(const ParentDIA& parent,
102  const char* label,
103  const KeyExtractor& key_extractor,
104  const ReduceFunction& reduce_function,
105  const ReduceConfig& config,
106  const KeyHashFunction& key_hash_function,
107  const KeyEqualFunction& key_equal_function)
108  : Super(parent.ctx(), label, { parent.id() }, { parent.node() }),
109  mix_stream_(use_mix_stream_ ?
110  parent.ctx().GetNewMixStream(this) : nullptr),
112  nullptr : parent.ctx().GetNewCatStream(this)),
114  mix_stream_->GetWriters() : cat_stream_->GetWriters()),
115  pre_phase_(
116  context_, Super::id(), parent.ctx().num_workers(),
117  key_extractor, reduce_function, emitters_, config,
118  HashIndexFunction(key_hash_function), key_equal_function,
119  key_hash_function),
120  post_phase_(
121  context_, Super::id(), key_extractor, reduce_function,
122  Emitter(this), config,
123  HashIndexFunction(key_hash_function), key_equal_function)
124  {
125  // Hook PreOp: Locally hash elements of the current DIA onto buckets and
126  // reduce each bucket to a single value, afterwards send data to another
127  // worker given by the shuffle algorithm.
128  auto pre_op_fn = [this](const ValueType& input) {
129  return pre_phase_.Insert(input);
130  };
131  // close the function stack with our pre op and register it at
132  // parent node for output
133  auto lop_chain = parent.stack().push(pre_op_fn).fold();
134  parent.node()->AddChild(this, lop_chain);
135  }
136 
137  DIAMemUse PreOpMemUse() final {
138  // request maximum RAM limit, the value is calculated by StageBuilder,
139  // and set as DIABase::mem_limit_.
140  return DIAMemUse::Max();
141  }
142 
143  void StartPreOp(size_t /* id */) final {
144  LOG << *this << " running StartPreOp";
145  if (!use_post_thread_) {
146  // use pre_phase without extra thread
147  pre_phase_.Initialize(DIABase::mem_limit_);
148  }
149  else {
150  pre_phase_.Initialize(DIABase::mem_limit_ / 2);
151  post_phase_.Initialize(DIABase::mem_limit_ / 2);
152 
153  // start additional thread to receive from the channel
154  thread_ = common::CreateThread([this] { ProcessChannel(); });
155  }
156  }
157 
158  void StopPreOp(size_t /* id */) final {
159  LOG << *this << " running StopPreOp";
160  // Flush hash table before the postOp
161  pre_phase_.FlushAll();
162  pre_phase_.CloseAll();
163  if (use_post_thread_) {
164  // waiting for the additional thread to finish the reduce
165  thread_.join();
166  // deallocate stream if already processed
167  use_mix_stream_ ? mix_stream_.reset() : cat_stream_.reset();
168  }
169  }
170 
171  void Execute() final { }
172 
173  DIAMemUse PushDataMemUse() final {
174  return DIAMemUse::Max();
175  }
176 
177  void PushData(bool consume) final {
178 
179  if (!use_post_thread_ && !reduced_) {
180  // not final reduced, and no additional thread, perform post reduce
181  post_phase_.Initialize(DIABase::mem_limit_);
182  ProcessChannel();
183 
184  // deallocate stream if already processed
185  use_mix_stream_ ? mix_stream_.reset() : cat_stream_.reset();
186 
187  reduced_ = true;
188  }
189  post_phase_.PushData(consume);
190  }
191 
192  //! process the inbound data in the post reduce phase
193  void ProcessChannel() {
194  if (use_mix_stream_)
195  {
196  auto reader = mix_stream_->GetMixReader(/* consume */ true);
197  sLOG << "reading data from" << mix_stream_->id()
198  << "to push into post phase which flushes to" << this->id();
199  while (reader.HasNext()) {
200  post_phase_.Insert(reader.template Next<TableItem>());
201  }
202  }
203  else
204  {
205  auto reader = cat_stream_->GetCatReader(/* consume */ true);
206  sLOG << "reading data from" << cat_stream_->id()
207  << "to push into post phase which flushes to" << this->id();
208  while (reader.HasNext()) {
209  post_phase_.Insert(reader.template Next<TableItem>());
210  }
211  }
212  }
213 
214  void Dispose() final {
215  post_phase_.Dispose();
216  }
217 
218 private:
219  // pointers for both Mix and CatStream. only one is used, the other costs
220  // only a null pointer.
221  data::MixStreamPtr mix_stream_;
223 
225  //! handle to additional thread for post phase
226  std::thread thread_;
227 
229  TableItem, Key, ValueType, KeyExtractor,
230  ReduceFunction, VolatileKey, data::Stream::Writer, ReduceConfig,
231  HashIndexFunction, KeyEqualFunction, KeyHashFunction,
232  UseDuplicateDetection> pre_phase_;
233 
235  TableItem, Key, ValueType, KeyExtractor, ReduceFunction, Emitter,
236  VolatileKey, ReduceConfig,
237  HashIndexFunction, KeyEqualFunction> post_phase_;
238 
239  bool reduced_ = false;
240 };
241 
242 template <typename ValueType, typename Stack>
243 template <typename KeyExtractor, typename ReduceFunction, typename ReduceConfig>
245  const KeyExtractor& key_extractor,
246  const ReduceFunction& reduce_function,
247  const ReduceConfig& reduce_config) const {
248  // forward to main function
249  using Key = typename common::FunctionTraits<KeyExtractor>::result_type;
250  return ReduceByKey(
252  key_extractor, reduce_function, reduce_config,
253  std::hash<Key>(), std::equal_to<Key>());
254 }
255 
256 template <typename ValueType, typename Stack>
257 template <typename KeyExtractor, typename ReduceFunction,
258  typename ReduceConfig, typename KeyHashFunction>
260  const KeyExtractor& key_extractor,
261  const ReduceFunction& reduce_function,
262  const ReduceConfig& reduce_config,
263  const KeyHashFunction& key_hash_function) const {
264  // forward to main function
265  using Key = typename common::FunctionTraits<KeyExtractor>::result_type;
266  return ReduceByKey(
268  key_extractor, reduce_function, reduce_config,
269  key_hash_function, std::equal_to<Key>());
270 }
271 
272 template <typename ValueType, typename Stack>
273 template <bool VolatileKeyValue,
274  typename KeyExtractor, typename ReduceFunction, typename ReduceConfig,
275  typename KeyHashFunction, typename KeyEqualFunction>
277  const VolatileKeyFlag<VolatileKeyValue>& volatile_key_flag,
278  const KeyExtractor& key_extractor,
279  const ReduceFunction& reduce_function,
280  const ReduceConfig& reduce_config,
281  const KeyHashFunction& key_hash_function,
282  const KeyEqualFunction& key_equal_funtion) const {
283  // forward to main function
284  return ReduceByKey(
285  volatile_key_flag, NoDuplicateDetectionTag,
286  key_extractor, reduce_function, reduce_config,
287  key_hash_function, key_equal_funtion);
288 }
289 
290 template <typename ValueType, typename Stack>
291 template <bool DuplicateDetectionValue,
292  typename KeyExtractor, typename ReduceFunction, typename ReduceConfig,
293  typename KeyHashFunction, typename KeyEqualFunction>
295  const DuplicateDetectionFlag<DuplicateDetectionValue>& duplicate_detection_flag,
296  const KeyExtractor& key_extractor,
297  const ReduceFunction& reduce_function,
298  const ReduceConfig& reduce_config,
299  const KeyHashFunction& key_hash_function,
300  const KeyEqualFunction& key_equal_funtion) const {
301  // forward to main function
302  return ReduceByKey(
303  NoVolatileKeyTag, duplicate_detection_flag,
304  key_extractor, reduce_function, reduce_config,
305  key_hash_function, key_equal_funtion);
306 }
307 
308 template <typename ValueType, typename Stack>
309 template <bool VolatileKeyValue,
310  bool DuplicateDetectionValue,
311  typename KeyExtractor, typename ReduceFunction, typename ReduceConfig,
312  typename KeyHashFunction, typename KeyEqualFunction>
316  const KeyExtractor& key_extractor,
317  const ReduceFunction& reduce_function,
318  const ReduceConfig& reduce_config,
319  const KeyHashFunction& key_hash_function,
320  const KeyEqualFunction& key_equal_funtion) const {
321  assert(IsValid());
322 
323  using DOpResult
324  = typename common::FunctionTraits<ReduceFunction>::result_type;
325 
326  static_assert(
327  std::is_convertible<
328  ValueType,
329  typename common::FunctionTraits<ReduceFunction>::template arg<0>
330  >::value,
331  "ReduceFunction has the wrong input type");
332 
333  static_assert(
334  std::is_convertible<
335  ValueType,
336  typename common::FunctionTraits<ReduceFunction>::template arg<1>
337  >::value,
338  "ReduceFunction has the wrong input type");
339 
340  static_assert(
341  std::is_same<
342  DOpResult,
343  ValueType>::value,
344  "ReduceFunction has the wrong output type");
345 
346  static_assert(
347  std::is_same<
348  typename std::decay<typename common::FunctionTraits<KeyExtractor>::
349  template arg<0> >::type,
350  ValueType>::value,
351  "KeyExtractor has the wrong input type");
352 
353  using ReduceNode = api::ReduceNode<
354  DOpResult, KeyExtractor, ReduceFunction, ReduceConfig,
355  KeyHashFunction, KeyEqualFunction,
356  VolatileKeyValue, DuplicateDetectionValue>;
357 
358  auto node = tlx::make_counting<ReduceNode>(
359  *this, "ReduceByKey",
360  key_extractor, reduce_function, reduce_config,
361  key_hash_function, key_equal_funtion);
362 
363  return DIA<DOpResult>(node);
364 }
365 
366 /******************************************************************************/
367 // ReducePair
368 
369 template <typename ValueType, typename Stack>
370 template <typename ReduceFunction, typename ReduceConfig>
372  const ReduceFunction& reduce_function,
373  const ReduceConfig& reduce_config) const {
374  // forward to main function
375  using Key = typename ValueType::first_type;
376  return ReducePair(reduce_function, reduce_config,
377  std::hash<Key>(), std::equal_to<Key>());
378 }
379 
380 template <typename ValueType, typename Stack>
381 template <typename ReduceFunction, typename ReduceConfig,
382  typename KeyHashFunction>
384  const ReduceFunction& reduce_function,
385  const ReduceConfig& reduce_config,
386  const KeyHashFunction& key_hash_function) const {
387  // forward to main function
388  using Key = typename ValueType::first_type;
389  return ReducePair(reduce_function, reduce_config,
390  key_hash_function, std::equal_to<Key>());
391 }
392 
393 template <typename ValueType, typename Stack>
394 template <typename ReduceFunction, typename ReduceConfig,
395  typename KeyHashFunction, typename KeyEqualFunction>
397  const ReduceFunction& reduce_function,
398  const ReduceConfig& reduce_config,
399  const KeyHashFunction& key_hash_function,
400  const KeyEqualFunction& key_equal_funtion) const {
401  // forward to main function
402  return ReducePair(NoDuplicateDetectionTag,
403  reduce_function, reduce_config,
404  key_hash_function, key_equal_funtion);
405 }
406 
407 template <typename ValueType, typename Stack>
408 template <bool DuplicateDetectionValue,
409  typename ReduceFunction, typename ReduceConfig,
410  typename KeyHashFunction, typename KeyEqualFunction>
413  const ReduceFunction& reduce_function,
414  const ReduceConfig& reduce_config,
415  const KeyHashFunction& key_hash_function,
416  const KeyEqualFunction& key_equal_funtion) const {
417  assert(IsValid());
418 
419  using DOpResult
420  = typename common::FunctionTraits<ReduceFunction>::result_type;
421 
423  "ValueType is not a pair");
424 
425  static_assert(
426  std::is_convertible<
427  typename ValueType::second_type,
428  typename common::FunctionTraits<ReduceFunction>::template arg<0>
429  >::value,
430  "ReduceFunction has the wrong input type");
431 
432  static_assert(
433  std::is_convertible<
434  typename ValueType::second_type,
435  typename common::FunctionTraits<ReduceFunction>::template arg<1>
436  >::value,
437  "ReduceFunction has the wrong input type");
438 
439  static_assert(
440  std::is_same<
441  DOpResult,
442  typename ValueType::second_type>::value,
443  "ReduceFunction has the wrong output type");
444 
445  auto key_extractor = [](const ValueType& value) { return value.first; };
446 
447  auto reduce_pair_function =
448  [reduce_function](const ValueType& a, const ValueType& b) {
449  return ValueType(a.first, reduce_function(a.second, b.second));
450  };
451 
452  using ReduceNode = api::ReduceNode<
453  ValueType,
454  decltype(key_extractor), decltype(reduce_pair_function),
455  ReduceConfig, KeyHashFunction, KeyEqualFunction,
456  /* VolatileKey */ false, DuplicateDetectionValue>;
457 
458  auto node = tlx::make_counting<ReduceNode>(
459  *this, "ReducePair",
460  key_extractor, reduce_pair_function, reduce_config,
461  key_hash_function, key_equal_funtion);
462 
463  return DIA<ValueType>(node);
464 }
465 
466 } // namespace api
467 } // namespace thrill
468 
469 #endif // !THRILL_API_REDUCE_BY_KEY_HEADER
470 
471 /******************************************************************************/
static DIAMemUse Max()
Definition: dia_base.hpp:60
virtual void Dispose()
Virtual clear method. Triggers actual disposing in sub-classes.
Definition: dia_base.hpp:188
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:34
DIA is the interface between the user and the Thrill framework.
Definition: dia.hpp:141
ValueType_ ValueType
Definition: dia.hpp:152
static constexpr bool use_post_thread_
void reset()
release contained pointer, frees object if this is the last reference.
const char * label() const
return label() of DIANode subclass as stored by StatsNode
Definition: dia_base.hpp:218
Emitter for PostPhase to push elements to next DIA object.
virtual void PushData(bool consume)=0
Virtual method for pushing data. Triggers actual pushing in sub-classes.
core::ReduceByHash< Key, KeyHashFunction > HashIndexFunction
const size_t & id() const
return unique id() of DIANode subclass as stored by StatsNode
Definition: dia_base.hpp:213
core::ReduceByHashPostPhase< TableItem, Key, ValueType, KeyExtractor, ReduceFunction, Emitter, VolatileKey, ReduceConfig, HashIndexFunction, KeyEqualFunction > post_phase_
data::CatStreamPtr cat_stream_
virtual DIAMemUse PreOpMemUse()
Amount of RAM used by PreOp after StartPreOp()
Definition: dia_base.hpp:160
A reduce index function which returns a hash index and partition.
const struct VolatileKeyFlag< false > NoVolatileKeyTag
global const VolatileKeyFlag instance
Definition: dia.hpp:51
An extra class derived from std::vector<> for delivery of the BlockWriters of a Stream.
Definition: stream_data.hpp:56
void PushItem(const ValueType &item) const
Method for derived classes to Push a single item to all children.
Definition: dia_node.hpp:147
static constexpr bool use_mix_stream_
tag structure for ReduceByKey(), and ReduceToIndex()
Definition: dia.hpp:42
virtual void StopPreOp(size_t)
Virtual method for preparing end of PushData.
Definition: dia_base.hpp:173
A DIANode which performs a Reduce operation.
auto ReducePair(const ReduceFunction &reduce_function, const ReduceConfig &reduce_config=ReduceConfig()) const
ReducePair is a DOp, which groups key-value-pairs in the input DIA by their key and reduces each key-...
virtual void Execute()=0
Virtual execution method. Triggers actual computation in sub-classes.
int value
Definition: gen_data.py:41
Configuration class to define operational parameters of reduce hash tables and reduce phases...
data::Stream::Writers emitters_
test if is a std::pair<...>
Definition: is_std_pair.hpp:23
std::thread CreateThread(Args &&...args)
create a std::thread and repeat creation if it fails
Definition: porting.hpp:42
void operator()(const ValueType &item) const
virtual void StartPreOp(size_t)
Virtual method for preparing start of PushData.
Definition: dia_base.hpp:163
auto ReduceByKey(const KeyExtractor &key_extractor, const ReduceFunction &reduce_function, const ReduceConfig &reduce_config=ReduceConfig()) const
ReduceByKey is a DOp, which groups elements of the DIA with the key_extractor and reduces each key-bu...
const struct DuplicateDetectionFlag< false > NoDuplicateDetectionTag
global const DuplicateDetectionFlag instance
Definition: dia.hpp:112
DIAMemUse mem_limit_
Definition: dia_base.hpp:314
StreamData::Writer Writer
Definition: stream.hpp:39
A DOpNode is a typed node representing distributed operations in Thrill.
Definition: dop_node.hpp:32
ReduceNode(const ParentDIA &parent, const char *label, const KeyExtractor &key_extractor, const ReduceFunction &reduce_function, const ReduceConfig &config, const KeyHashFunction &key_hash_function, const KeyEqualFunction &key_equal_function)
Constructor for a ReduceNode.
typename common::FunctionTraits< KeyExtractor >::result_type Key
tag structure for ReduceByKey()
Definition: dia.hpp:103
tlx::CountingPtr< MixStream > MixStreamPtr
Definition: mix_stream.hpp:156
std::thread thread_
handle to additional thread for post phase
typename std::conditional< VolatileKey, std::pair< Key, ValueType >, ValueType >::type TableItem
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:24
static constexpr bool debug
core::ReducePrePhase< TableItem, Key, ValueType, KeyExtractor, ReduceFunction, VolatileKey, data::Stream::Writer, ReduceConfig, HashIndexFunction, KeyEqualFunction, KeyHashFunction, UseDuplicateDetection > pre_phase_
tlx::CountingPtr< CatStream > CatStreamPtr
Definition: cat_stream.hpp:191
Context & context_
associated Context
Definition: dia_base.hpp:293
virtual DIAMemUse PushDataMemUse()
Amount of RAM used by PushData()
Definition: dia_base.hpp:182