Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
reduce_by_key.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/api/reduce_by_key.hpp
3  *
4  * DIANode for a reduce operation. Performs the actual reduce operation
5  *
6  * Part of Project Thrill - http://project-thrill.org
7  *
8  * Copyright (C) 2015 Matthias Stumpp <[email protected]>
9  * Copyright (C) 2015 Alexander Noe <[email protected]>
10  * Copyright (C) 2015 Sebastian Lamm <[email protected]>
11  *
12  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
13  ******************************************************************************/
14 
15 #pragma once
16 #ifndef THRILL_API_REDUCE_BY_KEY_HEADER
17 #define THRILL_API_REDUCE_BY_KEY_HEADER
18 
19 #include <thrill/api/dia.hpp>
20 #include <thrill/api/dop_node.hpp>
22 #include <thrill/common/logger.hpp>
26 #include <tlx/meta/is_std_pair.hpp>
27 
28 #include <functional>
29 #include <thread>
30 #include <type_traits>
31 #include <typeinfo>
32 #include <utility>
33 #include <vector>
34 
35 namespace thrill {
36 namespace api {
37 
39 { };
40 
41 /*!
42  * A DIANode which performs a Reduce operation. Reduce groups the elements in a
43  * DIA by their key and reduces every key bucket to a single element each. The
44  * ReduceNode stores the key_extractor and the reduce_function UDFs. The
45  * chainable LOps ahead of the Reduce operation (held in the parent DIA's
46  * Stack) are folded together with the pre-op. The ReduceNode has the type
47  * ValueType, which is the result type of the reduce_function.
48  *
49  * \tparam ValueType Output type of the Reduce operation
50  * \tparam KeyExtractor Type of the key_extractor function.
51  * \tparam ReduceFunction Type of the reduce_function.
52  * \tparam ReduceConfig, KeyHashFunction, KeyEqualFunction Configuration,
53  * key hash and key equality types handed to the reduce phases.
54  * \tparam VolatileKey Whether to carry the key extracted during the pre reduce
55  * along with each value (true) or let the post reduce extract it again (false).
56  * \tparam UseDuplicateDetection Duplicate-detection switch passed to the pre phase.
57  * \ingroup api_layer
58  */
// NOTE(review): this extraction skips listing lines 69, 78, 92, 111, 113,
// 221, 223, 227 and 233. Going by the cross-reference entries on this page,
// they presumably held the Super alias, the HashIndexFunction alias
// (core::ReduceByHash<Key, KeyHashFunction>), Emitter::node_, the cat_stream_
// and emitters_ initializers/declarations, and the heads of the
// core::ReducePrePhase<...> / core::ReduceByHashPostPhase<...> member types.
59 template <typename ValueType,
60  typename KeyExtractor, typename ReduceFunction,
61  typename ReduceConfig, typename KeyHashFunction,
62  typename KeyEqualFunction, const bool VolatileKey,
63  bool UseDuplicateDetection>
64 class ReduceNode final : public DOpNode<ValueType>
65 {
66 private:
67  static constexpr bool debug = false;
68 
70  using Super::context_;
71 
// Key type: the result type of the KeyExtractor.
72  using Key = typename common::FunctionTraits<KeyExtractor>::result_type;
73 
// Element type held by the reduce phases and read back from the stream in
// ProcessChannel(): std::pair<Key, ValueType> when VolatileKey is true (the
// key extracted in the pre phase travels with the value), plain ValueType
// otherwise (the key has to be extracted again).
74  using TableItem =
75  typename std::conditional<
76  VolatileKey, std::pair<Key, ValueType>, ValueType>::type;
77 
79 
// Compile-time switches copied from ReduceConfig: use_mix_stream_ selects a
// MixStream instead of a CatStream for the shuffle; use_post_thread_ runs
// ProcessChannel() on thread_ while the pre phase is still running.
80  static constexpr bool use_mix_stream_ = ReduceConfig::use_mix_stream_;
81  static constexpr bool use_post_thread_ = ReduceConfig::use_post_thread_;
82 
83  //! Emitter for PostPhase to push elements to next DIA object.
// Each call forwards the item to the owning node's PushItem().
84  class Emitter
85  {
86  public:
87  explicit Emitter(ReduceNode* node) : node_(node) { }
88  void operator () (const ValueType& item) const
89  { return node_->PushItem(item); }
90 
91  private:
93  };
94 
95 public:
96  /*!
97  * Constructor for a ReduceNode. Creates the shuffle stream, the pre phase and
98  * the post phase, and registers this node as a child of the parent DIA.
 * Only one of mix_stream_ / cat_stream_ is created (chosen by use_mix_stream_);
 * the pre phase writes into emitters_ (that stream's writers) and the post
 * phase pushes its results onward through Emitter(this).
99  */
100  template <typename ParentDIA>
101  ReduceNode(const ParentDIA& parent,
102  const char* label,
103  const KeyExtractor& key_extractor,
104  const ReduceFunction& reduce_function,
105  const ReduceConfig& config,
106  const KeyHashFunction& key_hash_function,
107  const KeyEqualFunction& key_equal_function)
108  : Super(parent.ctx(), label, { parent.id() }, { parent.node() }),
109  mix_stream_(use_mix_stream_ ?
110  parent.ctx().GetNewMixStream(this) : nullptr),
112  nullptr : parent.ctx().GetNewCatStream(this)),
114  mix_stream_->GetWriters() : cat_stream_->GetWriters()),
115  pre_phase_(
116  context_, Super::dia_id(), parent.ctx().num_workers(),
117  key_extractor, reduce_function, emitters_, config,
118  HashIndexFunction(key_hash_function), key_equal_function,
119  key_hash_function),
120  post_phase_(
121  context_, Super::dia_id(), key_extractor, reduce_function,
122  Emitter(this), config,
123  HashIndexFunction(key_hash_function), key_equal_function) {
124  // Hook PreOp: Locally hash elements of the current DIA onto buckets and
125  // reduce each bucket to a single value, afterwards send data to another
126  // worker given by the shuffle algorithm.
127  auto pre_op_fn = [this](const ValueType& input) {
128  return pre_phase_.Insert(input);
129  };
130  // close the function stack with our pre op and register it at
131  // parent node for output
132  auto lop_chain = parent.stack().push(pre_op_fn).fold();
133  parent.node()->AddChild(this, lop_chain);
134  }
135 
136  DIAMemUse PreOpMemUse() final {
137  // request maximum RAM limit, the value is calculated by StageBuilder,
138  // and set as DIABase::mem_limit_.
139  return DIAMemUse::Max();
140  }
141 
// Without a post thread the pre phase gets the whole memory limit and the
// post phase is initialized later, in PushData(). With a post thread the
// limit is split evenly between both phases and ProcessChannel() starts
// consuming the stream on thread_ right away.
142  void StartPreOp(size_t /* parent_index */) final {
143  LOG << *this << " running StartPreOp";
144  if (!use_post_thread_) {
145  // use pre_phase without extra thread
146  pre_phase_.Initialize(DIABase::mem_limit_);
147  }
148  else {
149  pre_phase_.Initialize(DIABase::mem_limit_ / 2);
150  post_phase_.Initialize(DIABase::mem_limit_ / 2);
151 
152  // start additional thread to receive from the channel
153  thread_ = common::CreateThread([this] { ProcessChannel(); });
154  }
155  }
156 
157  void StopPreOp(size_t /* parent_index */) final {
158  LOG << *this << " running StopPreOp";
159  // Flush hash table before the postOp
// NOTE(review): CloseAll() presumably closes the stream writers, which lets
// the reader loop in ProcessChannel() terminate so the join below can
// return -- confirm in ReducePrePhase.
160  pre_phase_.FlushAll();
161  pre_phase_.CloseAll();
162  if (use_post_thread_) {
163  // waiting for the additional thread to finish the reduce
164  thread_.join();
165  // deallocate stream if already processed
// NOTE(review): conditional expression used only for its side effect; an
// if/else would state that more directly.
166  use_mix_stream_ ? mix_stream_.reset() : cat_stream_.reset();
167  }
168  }
169 
170  void Execute() final { }
171 
172  DIAMemUse PushDataMemUse() final {
173  return DIAMemUse::Max();
174  }
175 
// In the no-post-thread mode, the post reduce (draining the stream into
// post_phase_) runs at most once, guarded by reduced_. Every call then hands
// off to post_phase_.PushData(consume).
176  void PushData(bool consume) final {
177 
178  if (!use_post_thread_ && !reduced_) {
179  // not final reduced, and no additional thread, perform post reduce
180  post_phase_.Initialize(DIABase::mem_limit_);
181  ProcessChannel();
182 
183  // deallocate stream if already processed
184  use_mix_stream_ ? mix_stream_.reset() : cat_stream_.reset();
185 
186  reduced_ = true;
187  }
188  post_phase_.PushData(consume);
189  }
190 
191  //! process the inbound data in the post reduce phase
// Both branches read every TableItem from the stream with consume = true
// and insert it into post_phase_; they differ only in the stream and
// reader type. Runs on thread_ when use_post_thread_ is set, otherwise
// from PushData().
192  void ProcessChannel() {
193  if (use_mix_stream_)
194  {
195  auto reader = mix_stream_->GetMixReader(/* consume */ true);
196  sLOG << "reading data from" << mix_stream_->id()
197  << "to push into post phase which flushes to" << this->dia_id();
198  while (reader.HasNext()) {
199  post_phase_.Insert(reader.template Next<TableItem>());
200  }
201  }
202  else
203  {
204  auto reader = cat_stream_->GetCatReader(/* consume */ true);
205  sLOG << "reading data from" << cat_stream_->id()
206  << "to push into post phase which flushes to" << this->dia_id();
207  while (reader.HasNext()) {
208  post_phase_.Insert(reader.template Next<TableItem>());
209  }
210  }
211  }
212 
213  void Dispose() final {
214  post_phase_.Dispose();
215  }
216 
217 private:
218  // pointers for both Mix and CatStream. only one is used, the other costs
219  // only a null pointer.
220  data::MixStreamPtr mix_stream_;
222 
224  //! handle to additional thread for post phase
225  std::thread thread_;
226 
// Pre phase: reduces locally and writes TableItems through
// data::Stream::Writer (the emitters_ writers).
228  TableItem, Key, ValueType, KeyExtractor,
229  ReduceFunction, VolatileKey, data::Stream::Writer, ReduceConfig,
230  HashIndexFunction, KeyEqualFunction, KeyHashFunction,
231  UseDuplicateDetection> pre_phase_;
232 
// Post phase: reduces the received TableItems and emits results via Emitter.
234  TableItem, Key, ValueType, KeyExtractor, ReduceFunction, Emitter,
235  VolatileKey, ReduceConfig,
236  HashIndexFunction, KeyEqualFunction> post_phase_;
237 
// Set by PushData() once the post reduce has run (no-post-thread mode only).
238  bool reduced_ = false;
239 };
240 
// ReduceByKey overload without custom hash/equality functors: forwards to the
// fuller ReduceByKey() with std::hash<Key> and std::equal_to<Key>, where Key
// is the KeyExtractor's result type.
// NOTE(review): listing lines 243 (the function head) and 250 (the leading
// argument(s) of the forwarded call) are missing from this extraction.
241 template <typename ValueType, typename Stack>
242 template <typename KeyExtractor, typename ReduceFunction, typename ReduceConfig>
244  const KeyExtractor& key_extractor,
245  const ReduceFunction& reduce_function,
246  const ReduceConfig& reduce_config) const {
247  // forward to main function
248  using Key = typename common::FunctionTraits<KeyExtractor>::result_type;
249  return ReduceByKey(
251  key_extractor, reduce_function, reduce_config,
252  std::hash<Key>(), std::equal_to<Key>());
253 }
254 
// ReduceByKey overload with a custom key hash function: forwards to the fuller
// ReduceByKey() with std::equal_to<Key> as key equality.
// NOTE(review): listing lines 258 (the function head) and 266 (the leading
// argument(s) of the forwarded call) are missing from this extraction.
255 template <typename ValueType, typename Stack>
256 template <typename KeyExtractor, typename ReduceFunction,
257  typename ReduceConfig, typename KeyHashFunction>
259  const KeyExtractor& key_extractor,
260  const ReduceFunction& reduce_function,
261  const ReduceConfig& reduce_config,
262  const KeyHashFunction& key_hash_function) const {
263  // forward to main function
264  using Key = typename common::FunctionTraits<KeyExtractor>::result_type;
265  return ReduceByKey(
267  key_extractor, reduce_function, reduce_config,
268  key_hash_function, std::equal_to<Key>());
269 }
270 
// ReduceByKey overload taking an explicit VolatileKeyFlag: forwards to the
// main ReduceByKey() with duplicate detection disabled (NoDuplicateDetectionTag).
// NOTE(review): listing line 275 (the function head) is missing from this
// extraction; "funtion" in key_equal_funtion is a typo for "function".
271 template <typename ValueType, typename Stack>
272 template <bool VolatileKeyValue,
273  typename KeyExtractor, typename ReduceFunction, typename ReduceConfig,
274  typename KeyHashFunction, typename KeyEqualFunction>
276  const VolatileKeyFlag<VolatileKeyValue>& volatile_key_flag,
277  const KeyExtractor& key_extractor,
278  const ReduceFunction& reduce_function,
279  const ReduceConfig& reduce_config,
280  const KeyHashFunction& key_hash_function,
281  const KeyEqualFunction& key_equal_funtion) const {
282  // forward to main function
283  return ReduceByKey(
284  volatile_key_flag, NoDuplicateDetectionTag,
285  key_extractor, reduce_function, reduce_config,
286  key_hash_function, key_equal_funtion);
287 }
288 
// ReduceByKey overload taking an explicit DuplicateDetectionFlag: forwards to
// the main ReduceByKey() with a non-volatile key (NoVolatileKeyTag).
// NOTE(review): listing line 293 (the function head) is missing from this
// extraction; "funtion" in key_equal_funtion is a typo for "function".
289 template <typename ValueType, typename Stack>
290 template <bool DuplicateDetectionValue,
291  typename KeyExtractor, typename ReduceFunction, typename ReduceConfig,
292  typename KeyHashFunction, typename KeyEqualFunction>
294  const DuplicateDetectionFlag<DuplicateDetectionValue>& duplicate_detection_flag,
295  const KeyExtractor& key_extractor,
296  const ReduceFunction& reduce_function,
297  const ReduceConfig& reduce_config,
298  const KeyHashFunction& key_hash_function,
299  const KeyEqualFunction& key_equal_funtion) const {
300  // forward to main function
301  return ReduceByKey(
302  NoVolatileKeyTag, duplicate_detection_flag,
303  key_extractor, reduce_function, reduce_config,
304  key_hash_function, key_equal_funtion);
305 }
306 
// Main ReduceByKey implementation. Statically checks the UDF signatures,
// creates a ReduceNode parameterized with the VolatileKey and duplicate
// detection choices, and returns a DIA<DOpResult> wrapping that node.
// NOTE(review): listing lines 312-314 (the function head and, presumably, the
// VolatileKeyFlag / DuplicateDetectionFlag parameters) are missing from this
// extraction; "funtion" in key_equal_funtion is a typo for "function".
307 template <typename ValueType, typename Stack>
308 template <bool VolatileKeyValue,
309  bool DuplicateDetectionValue,
310  typename KeyExtractor, typename ReduceFunction, typename ReduceConfig,
311  typename KeyHashFunction, typename KeyEqualFunction>
315  const KeyExtractor& key_extractor,
316  const ReduceFunction& reduce_function,
317  const ReduceConfig& reduce_config,
318  const KeyHashFunction& key_hash_function,
319  const KeyEqualFunction& key_equal_funtion) const {
320  assert(IsValid());
321 
// The node's output type is the reduce function's result type.
322  using DOpResult
323  = typename common::FunctionTraits<ReduceFunction>::result_type;
324 
// Both reduce arguments must accept a ValueType ...
325  static_assert(
326  std::is_convertible<
327  ValueType,
328  typename common::FunctionTraits<ReduceFunction>::template arg<0>
329  >::value,
330  "ReduceFunction has the wrong input type");
331 
332  static_assert(
333  std::is_convertible<
334  ValueType,
335  typename common::FunctionTraits<ReduceFunction>::template arg<1>
336  >::value,
337  "ReduceFunction has the wrong input type");
338 
// ... and the result must be exactly ValueType, so reduced values can be
// reduced again.
339  static_assert(
340  std::is_same<
341  DOpResult,
342  ValueType>::value,
343  "ReduceFunction has the wrong output type");
344 
// The key extractor's parameter, after removing cv/ref, must be ValueType.
345  static_assert(
346  std::is_same<
347  typename std::decay<typename common::FunctionTraits<KeyExtractor>::
348  template arg<0> >::type,
349  ValueType>::value,
350  "KeyExtractor has the wrong input type");
351 
352  using ReduceNode = api::ReduceNode<
353  DOpResult, KeyExtractor, ReduceFunction, ReduceConfig,
354  KeyHashFunction, KeyEqualFunction,
355  VolatileKeyValue, DuplicateDetectionValue>;
356 
357  auto node = tlx::make_counting<ReduceNode>(
358  *this, "ReduceByKey",
359  key_extractor, reduce_function, reduce_config,
360  key_hash_function, key_equal_funtion);
361 
362  return DIA<DOpResult>(node);
363 }
364 
365 /******************************************************************************/
366 // ReducePair
367 
// ReducePair overload without custom hash/equality functors: uses
// std::hash<Key> and std::equal_to<Key> with Key = ValueType::first_type.
// NOTE(review): listing line 370 (the function head) is missing from this
// extraction.
368 template <typename ValueType, typename Stack>
369 template <typename ReduceFunction, typename ReduceConfig>
371  const ReduceFunction& reduce_function,
372  const ReduceConfig& reduce_config) const {
373  // forward to main function
374  using Key = typename ValueType::first_type;
375  return ReducePair(reduce_function, reduce_config,
376  std::hash<Key>(), std::equal_to<Key>());
377 }
378 
// ReducePair overload with a custom key hash function: uses
// std::equal_to<Key> as key equality, Key = ValueType::first_type.
// NOTE(review): listing line 382 (the function head) is missing from this
// extraction.
379 template <typename ValueType, typename Stack>
380 template <typename ReduceFunction, typename ReduceConfig,
381  typename KeyHashFunction>
383  const ReduceFunction& reduce_function,
384  const ReduceConfig& reduce_config,
385  const KeyHashFunction& key_hash_function) const {
386  // forward to main function
387  using Key = typename ValueType::first_type;
388  return ReducePair(reduce_function, reduce_config,
389  key_hash_function, std::equal_to<Key>());
390 }
391 
// ReducePair overload with custom hash and equality functors: forwards to the
// main ReducePair() with duplicate detection disabled (NoDuplicateDetectionTag).
// NOTE(review): listing line 395 (the function head) is missing from this
// extraction; "funtion" in key_equal_funtion is a typo for "function".
392 template <typename ValueType, typename Stack>
393 template <typename ReduceFunction, typename ReduceConfig,
394  typename KeyHashFunction, typename KeyEqualFunction>
396  const ReduceFunction& reduce_function,
397  const ReduceConfig& reduce_config,
398  const KeyHashFunction& key_hash_function,
399  const KeyEqualFunction& key_equal_funtion) const {
400  // forward to main function
401  return ReducePair(NoDuplicateDetectionTag,
402  reduce_function, reduce_config,
403  key_hash_function, key_equal_funtion);
404 }
405 
// Main ReducePair implementation. ValueType is a pair (key, value); the user's
// reduce_function combines only the .second parts. It is wrapped into a
// pair-level reduction and run through a ReduceNode with VolatileKey = false.
// NOTE(review): listing lines 410-411 (the function head and, presumably, the
// DuplicateDetectionFlag parameter) and 421 (presumably the opening of the
// "ValueType is not a pair" static_assert, cf. tlx::is_std_pair) are missing
// from this extraction; "funtion" in key_equal_funtion is a typo.
406 template <typename ValueType, typename Stack>
407 template <bool DuplicateDetectionValue,
408  typename ReduceFunction, typename ReduceConfig,
409  typename KeyHashFunction, typename KeyEqualFunction>
412  const ReduceFunction& reduce_function,
413  const ReduceConfig& reduce_config,
414  const KeyHashFunction& key_hash_function,
415  const KeyEqualFunction& key_equal_funtion) const {
416  assert(IsValid());
417 
// Only used by the static_asserts below; the node's output type is ValueType.
418  using DOpResult
419  = typename common::FunctionTraits<ReduceFunction>::result_type;
420 
422  "ValueType is not a pair");
423 
424  static_assert(
425  std::is_convertible<
426  typename ValueType::second_type,
427  typename common::FunctionTraits<ReduceFunction>::template arg<0>
428  >::value,
429  "ReduceFunction has the wrong input type");
430 
431  static_assert(
432  std::is_convertible<
433  typename ValueType::second_type,
434  typename common::FunctionTraits<ReduceFunction>::template arg<1>
435  >::value,
436  "ReduceFunction has the wrong input type");
437 
438  static_assert(
439  std::is_same<
440  DOpResult,
441  typename ValueType::second_type>::value,
442  "ReduceFunction has the wrong output type");
443 
// The key is the pair's first member (returned by value, i.e. copied).
444  auto key_extractor = [](const ValueType& value) { return value.first; };
445 
// Pair-level reduction: keeps a.first as the key (a and b are presumably in
// the same key bucket) and reduces the two .second values.
446  auto reduce_pair_function =
447  [reduce_function](const ValueType& a, const ValueType& b) {
448  return ValueType(a.first, reduce_function(a.second, b.second));
449  };
450 
451  using ReduceNode = api::ReduceNode<
452  ValueType,
453  decltype(key_extractor), decltype(reduce_pair_function),
454  ReduceConfig, KeyHashFunction, KeyEqualFunction,
455  /* VolatileKey */ false, DuplicateDetectionValue>;
456 
457  auto node = tlx::make_counting<ReduceNode>(
458  *this, "ReducePair",
459  key_extractor, reduce_pair_function, reduce_config,
460  key_hash_function, key_equal_funtion);
461 
462  return DIA<ValueType>(node);
463 }
464 
465 } // namespace api
466 } // namespace thrill
467 
468 #endif // !THRILL_API_REDUCE_BY_KEY_HEADER
469 
470 /******************************************************************************/
static DIAMemUse Max()
Definition: dia_base.hpp:60
virtual void Dispose()
Virtual clear method. Triggers actual disposing in sub-classes.
Definition: dia_base.hpp:188
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:34
DIA is the interface between the user and the Thrill framework.
Definition: dia.hpp:141
ValueType_ ValueType
Definition: dia.hpp:152
static constexpr bool use_post_thread_
void reset()
release contained pointer, frees object if this is the last reference.
const char * label() const
return label() of DIANode subclass as stored by StatsNode
Definition: dia_base.hpp:218
Emitter for PostPhase to push elements to next DIA object.
virtual void PushData(bool consume)=0
Virtual method for pushing data. Triggers actual pushing in sub-classes.
core::ReduceByHash< Key, KeyHashFunction > HashIndexFunction
core::ReduceByHashPostPhase< TableItem, Key, ValueType, KeyExtractor, ReduceFunction, Emitter, VolatileKey, ReduceConfig, HashIndexFunction, KeyEqualFunction > post_phase_
data::CatStreamPtr cat_stream_
virtual DIAMemUse PreOpMemUse()
Amount of RAM used by PreOp after StartPreOp()
Definition: dia_base.hpp:160
A reduce index function which returns a hash index and partition.
const struct VolatileKeyFlag< false > NoVolatileKeyTag
global const VolatileKeyFlag instance
Definition: dia.hpp:51
An extra class derived from std::vector<> for delivery of the BlockWriters of a Stream.
Definition: stream_data.hpp:56
void PushItem(const ValueType &item) const
Method for derived classes to Push a single item to all children.
Definition: dia_node.hpp:147
static constexpr bool use_mix_stream_
tag structure for ReduceByKey(), and ReduceToIndex()
Definition: dia.hpp:42
virtual void StopPreOp(size_t)
Virtual method for preparing end of PushData.
Definition: dia_base.hpp:173
A DIANode which performs a Reduce operation.
auto ReducePair(const ReduceFunction &reduce_function, const ReduceConfig &reduce_config=ReduceConfig()) const
ReducePair is a DOp, which groups key-value-pairs in the input DIA by their key and reduces each key-...
virtual void Execute()=0
Virtual execution method. Triggers actual computation in sub-classes.
int value
Definition: gen_data.py:41
Configuration class to define operational parameters of reduce hash tables and reduce phases...
data::Stream::Writers emitters_
test if is a std::pair<...>
Definition: is_std_pair.hpp:23
std::thread CreateThread(Args &&...args)
create a std::thread and repeat creation if it fails
Definition: porting.hpp:44
void operator()(const ValueType &item) const
virtual void StartPreOp(size_t)
Virtual method for preparing start of PushData.
Definition: dia_base.hpp:163
auto ReduceByKey(const KeyExtractor &key_extractor, const ReduceFunction &reduce_function, const ReduceConfig &reduce_config=ReduceConfig()) const
ReduceByKey is a DOp, which groups elements of the DIA with the key_extractor and reduces each key-bu...
const struct DuplicateDetectionFlag< false > NoDuplicateDetectionTag
global const DuplicateDetectionFlag instance
Definition: dia.hpp:112
DIAMemUse mem_limit_
Definition: dia_base.hpp:314
StreamData::Writer Writer
Definition: stream.hpp:39
A DOpNode is a typed node representing and distributed operations in Thrill.
Definition: dop_node.hpp:32
ReduceNode(const ParentDIA &parent, const char *label, const KeyExtractor &key_extractor, const ReduceFunction &reduce_function, const ReduceConfig &config, const KeyHashFunction &key_hash_function, const KeyEqualFunction &key_equal_function)
Constructor for a ReduceNode.
typename common::FunctionTraits< KeyExtractor >::result_type Key
tag structure for ReduceByKey()
Definition: dia.hpp:103
tlx::CountingPtr< MixStream > MixStreamPtr
Definition: mix_stream.hpp:156
std::thread thread_
handle to additional thread for post phase
typename std::conditional< VolatileKey, std::pair< Key, ValueType >, ValueType >::type TableItem
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:24
static constexpr bool debug
core::ReducePrePhase< TableItem, Key, ValueType, KeyExtractor, ReduceFunction, VolatileKey, data::Stream::Writer, ReduceConfig, HashIndexFunction, KeyEqualFunction, KeyHashFunction, UseDuplicateDetection > pre_phase_
tlx::CountingPtr< CatStream > CatStreamPtr
Definition: cat_stream.hpp:191
Context & context_
associated Context
Definition: dia_base.hpp:293
virtual DIAMemUse PushDataMemUse()
Amount of RAM used by PushData()
Definition: dia_base.hpp:182
const size_t & dia_id() const
return unique id of DIANode subclass as stored by StatsNode
Definition: dia_base.hpp:213