Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
reduce_by_key.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/api/reduce_by_key.hpp
3  *
4  * DIANode for a reduce operation. Performs the actual reduce operation
5  *
6  * Part of Project Thrill - http://project-thrill.org
7  *
8  * Copyright (C) 2015 Matthias Stumpp <[email protected]>
9  * Copyright (C) 2015 Alexander Noe <[email protected]>
10  * Copyright (C) 2015 Sebastian Lamm <[email protected]>
11  *
12  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
13  ******************************************************************************/
14 
15 #pragma once
16 #ifndef THRILL_API_REDUCE_BY_KEY_HEADER
17 #define THRILL_API_REDUCE_BY_KEY_HEADER
18 
19 #include <thrill/api/dia.hpp>
20 #include <thrill/api/dop_node.hpp>
22 #include <thrill/common/logger.hpp>
26 #include <tlx/meta/is_std_pair.hpp>
27 
28 #include <functional>
29 #include <thread>
30 #include <type_traits>
31 #include <typeinfo>
32 #include <utility>
33 #include <vector>
34 
35 namespace thrill {
36 namespace api {
37 
39 { };
40 
41 /*!
42  * A DIANode which performs a Reduce operation. Reduce groups the elements in a
43  * DIA by their key and reduces every key bucket to a single element each. The
44  * ReduceNode stores the key_extractor and the reduce_function UDFs. The
45  * chainable LOps ahead of the Reduce operation are stored in the Stack. The
46  * ReduceNode has the type ValueType, which is the result type of the
47  * reduce_function.
48  *
49  * \tparam ValueType Output type of the Reduce operation
50  * \tparam ReduceConfig Configuration type of the reduce tables and phases, e.g.
51  * whether a mix stream or an additional post phase thread is used.
52  * \tparam KeyExtractor Type of the key_extractor function.
53  * \tparam ReduceFunction Type of the reduce_function.
54  * \tparam VolatileKey Whether to store the key extracted during the pre reduce
55  * alongside the value and reuse it (true), or let the post reduce extract the key again (false).
56  *
57  * \ingroup api_layer
58  */
59 template <typename ValueType,
60  typename KeyExtractor, typename ReduceFunction,
61  typename ReduceConfig, typename KeyHashFunction,
62  typename KeyEqualFunction, const bool VolatileKey,
63  bool UseDuplicateDetection>
64 class ReduceNode final : public DOpNode<ValueType>
65 {
66 private:
67  static constexpr bool debug = false;
68 
70  using Super::context_;
71 
72  using Key = typename common::FunctionTraits<KeyExtractor>::result_type;
73 
74  using TableItem =
75  typename std::conditional<
76  VolatileKey, std::pair<Key, ValueType>, ValueType>::type;
77 
79 
80  static constexpr bool use_mix_stream_ = ReduceConfig::use_mix_stream_;
81  static constexpr bool use_post_thread_ = ReduceConfig::use_post_thread_;
82 
83  //! Emitter for PostPhase to push elements to next DIA object.
84  class Emitter
85  {
86  public:
87  explicit Emitter(ReduceNode* node) : node_(node) { }
88  void operator () (const ValueType& item) const
89  { return node_->PushItem(item); }
90 
91  private:
93  };
94 
95 public:
96  /*!
97  * Constructor for a ReduceNode. Sets the parent, stack, key_extractor and
98  * reduce_function.
99  */
100  template <typename ParentDIA>
101  ReduceNode(const ParentDIA& parent,
102  const char* label,
103  const KeyExtractor& key_extractor,
104  const ReduceFunction& reduce_function,
105  const ReduceConfig& config,
106  const KeyHashFunction& key_hash_function,
107  const KeyEqualFunction& key_equal_function)
108  : Super(parent.ctx(), label, { parent.id() }, { parent.node() }),
109  mix_stream_(use_mix_stream_ ?
110  parent.ctx().GetNewMixStream(this) : nullptr),
112  nullptr : parent.ctx().GetNewCatStream(this)),
114  mix_stream_->GetWriters() : cat_stream_->GetWriters()),
115  pre_phase_(
116  context_, Super::id(), parent.ctx().num_workers(),
117  key_extractor, reduce_function, emitters_, config,
118  HashIndexFunction(key_hash_function), key_equal_function,
119  key_hash_function),
120  post_phase_(
121  context_, Super::id(), key_extractor, reduce_function,
122  Emitter(this), config,
123  HashIndexFunction(key_hash_function), key_equal_function)
124  {
125  // Hook PreOp: Locally hash elements of the current DIA onto buckets and
126  // reduce each bucket to a single value, afterwards send data to another
127  // worker given by the shuffle algorithm.
128  auto pre_op_fn = [this](const ValueType& input) {
129  return pre_phase_.Insert(input);
130  };
131  // close the function stack with our pre op and register it at
132  // parent node for output
133  auto lop_chain = parent.stack().push(pre_op_fn).fold();
134  parent.node()->AddChild(this, lop_chain);
135  }
136 
137  DIAMemUse PreOpMemUse() final {
138  // request maximum RAM limit, the value is calculated by StageBuilder,
139  // and set as DIABase::mem_limit_.
140  return DIAMemUse::Max();
141  }
142 
143  void StartPreOp(size_t /* id */) final {
144  LOG << *this << " running StartPreOp";
145  if (!use_post_thread_) {
146  // use pre_phase without extra thread
147  pre_phase_.Initialize(DIABase::mem_limit_);
148  }
149  else {
150  pre_phase_.Initialize(DIABase::mem_limit_ / 2);
151  post_phase_.Initialize(DIABase::mem_limit_ / 2);
152 
153  // start additional thread to receive from the channel
154  thread_ = common::CreateThread([this] { ProcessChannel(); });
155  }
156  }
157 
158  void StopPreOp(size_t /* id */) final {
159  LOG << *this << " running StopPreOp";
160  // Flush hash table before the postOp
161  pre_phase_.FlushAll();
162  pre_phase_.CloseAll();
163  // waiting for the additional thread to finish the reduce
164  if (use_post_thread_) thread_.join();
165  use_mix_stream_ ? mix_stream_.reset() : cat_stream_.reset();
166  }
167 
168  void Execute() final { }
169 
170  DIAMemUse PushDataMemUse() final {
171  return DIAMemUse::Max();
172  }
173 
174  void PushData(bool consume) final {
175 
176  if (!use_post_thread_ && !reduced_) {
177  // not final reduced, and no additional thread, perform post reduce
178  post_phase_.Initialize(DIABase::mem_limit_);
179  ProcessChannel();
180 
181  reduced_ = true;
182  }
183  post_phase_.PushData(consume);
184  }
185 
186  //! process the inbound data in the post reduce phase
187  void ProcessChannel() {
188  if (use_mix_stream_)
189  {
190  auto reader = mix_stream_->GetMixReader(/* consume */ true);
191  sLOG << "reading data from" << mix_stream_->id()
192  << "to push into post phase which flushes to" << this->id();
193  while (reader.HasNext()) {
194  post_phase_.Insert(reader.template Next<TableItem>());
195  }
196  }
197  else
198  {
199  auto reader = cat_stream_->GetCatReader(/* consume */ true);
200  sLOG << "reading data from" << cat_stream_->id()
201  << "to push into post phase which flushes to" << this->id();
202  while (reader.HasNext()) {
203  post_phase_.Insert(reader.template Next<TableItem>());
204  }
205  }
206  }
207 
208  void Dispose() final {
209  post_phase_.Dispose();
210  }
211 
212 private:
213  // pointers for both Mix and CatStream. only one is used, the other costs
214  // only a null pointer.
215  data::MixStreamPtr mix_stream_;
217 
218  std::vector<data::Stream::Writer> emitters_;
219  //! handle to additional thread for post phase
220  std::thread thread_;
221 
223  TableItem, Key, ValueType, KeyExtractor,
224  ReduceFunction, VolatileKey, data::Stream::Writer, ReduceConfig,
225  HashIndexFunction, KeyEqualFunction, KeyHashFunction,
226  UseDuplicateDetection> pre_phase_;
227 
229  TableItem, Key, ValueType, KeyExtractor, ReduceFunction, Emitter,
230  VolatileKey, ReduceConfig,
231  HashIndexFunction, KeyEqualFunction> post_phase_;
232 
233  bool reduced_ = false;
234 };
235 
236 template <typename ValueType, typename Stack>
237 template <typename KeyExtractor, typename ReduceFunction, typename ReduceConfig>
239  const KeyExtractor& key_extractor,
240  const ReduceFunction& reduce_function,
241  const ReduceConfig& reduce_config) const {
242  // forward to main function
243  using Key = typename common::FunctionTraits<KeyExtractor>::result_type;
244  return ReduceByKey(
246  key_extractor, reduce_function, reduce_config,
247  std::hash<Key>(), std::equal_to<Key>());
248 }
249 
250 template <typename ValueType, typename Stack>
251 template <typename KeyExtractor, typename ReduceFunction,
252  typename ReduceConfig, typename KeyHashFunction>
254  const KeyExtractor& key_extractor,
255  const ReduceFunction& reduce_function,
256  const ReduceConfig& reduce_config,
257  const KeyHashFunction& key_hash_function) const {
258  // forward to main function
259  using Key = typename common::FunctionTraits<KeyExtractor>::result_type;
260  return ReduceByKey(
262  key_extractor, reduce_function, reduce_config,
263  key_hash_function, std::equal_to<Key>());
264 }
265 
266 template <typename ValueType, typename Stack>
267 template <bool VolatileKeyValue,
268  typename KeyExtractor, typename ReduceFunction, typename ReduceConfig,
269  typename KeyHashFunction, typename KeyEqualFunction>
271  const VolatileKeyFlag<VolatileKeyValue>& volatile_key_flag,
272  const KeyExtractor& key_extractor,
273  const ReduceFunction& reduce_function,
274  const ReduceConfig& reduce_config,
275  const KeyHashFunction& key_hash_function,
276  const KeyEqualFunction& key_equal_funtion) const {
277  // forward to main function
278  return ReduceByKey(
279  volatile_key_flag, NoDuplicateDetectionTag,
280  key_extractor, reduce_function, reduce_config,
281  key_hash_function, key_equal_funtion);
282 }
283 
284 template <typename ValueType, typename Stack>
285 template <bool DuplicateDetectionValue,
286  typename KeyExtractor, typename ReduceFunction, typename ReduceConfig,
287  typename KeyHashFunction, typename KeyEqualFunction>
289  const DuplicateDetectionFlag<DuplicateDetectionValue>& duplicate_detection_flag,
290  const KeyExtractor& key_extractor,
291  const ReduceFunction& reduce_function,
292  const ReduceConfig& reduce_config,
293  const KeyHashFunction& key_hash_function,
294  const KeyEqualFunction& key_equal_funtion) const {
295  // forward to main function
296  return ReduceByKey(
297  NoVolatileKeyTag, duplicate_detection_flag,
298  key_extractor, reduce_function, reduce_config,
299  key_hash_function, key_equal_funtion);
300 }
301 
302 template <typename ValueType, typename Stack>
303 template <bool VolatileKeyValue,
304  bool DuplicateDetectionValue,
305  typename KeyExtractor, typename ReduceFunction, typename ReduceConfig,
306  typename KeyHashFunction, typename KeyEqualFunction>
310  const KeyExtractor& key_extractor,
311  const ReduceFunction& reduce_function,
312  const ReduceConfig& reduce_config,
313  const KeyHashFunction& key_hash_function,
314  const KeyEqualFunction& key_equal_funtion) const {
315  assert(IsValid());
316 
317  using DOpResult
318  = typename common::FunctionTraits<ReduceFunction>::result_type;
319 
320  static_assert(
321  std::is_convertible<
322  ValueType,
323  typename common::FunctionTraits<ReduceFunction>::template arg<0>
324  >::value,
325  "ReduceFunction has the wrong input type");
326 
327  static_assert(
328  std::is_convertible<
329  ValueType,
330  typename common::FunctionTraits<ReduceFunction>::template arg<1>
331  >::value,
332  "ReduceFunction has the wrong input type");
333 
334  static_assert(
335  std::is_same<
336  DOpResult,
337  ValueType>::value,
338  "ReduceFunction has the wrong output type");
339 
340  static_assert(
341  std::is_same<
342  typename std::decay<typename common::FunctionTraits<KeyExtractor>::
343  template arg<0> >::type,
344  ValueType>::value,
345  "KeyExtractor has the wrong input type");
346 
347  using ReduceNode = api::ReduceNode<
348  DOpResult, KeyExtractor, ReduceFunction, ReduceConfig,
349  KeyHashFunction, KeyEqualFunction,
350  VolatileKeyValue, DuplicateDetectionValue>;
351 
352  auto node = tlx::make_counting<ReduceNode>(
353  *this, "ReduceByKey",
354  key_extractor, reduce_function, reduce_config,
355  key_hash_function, key_equal_funtion);
356 
357  return DIA<DOpResult>(node);
358 }
359 
360 /******************************************************************************/
361 // ReducePair
362 
363 template <typename ValueType, typename Stack>
364 template <typename ReduceFunction, typename ReduceConfig>
366  const ReduceFunction& reduce_function,
367  const ReduceConfig& reduce_config) const {
368  // forward to main function
369  using Key = typename ValueType::first_type;
370  return ReducePair(reduce_function, reduce_config,
371  std::hash<Key>(), std::equal_to<Key>());
372 }
373 
374 template <typename ValueType, typename Stack>
375 template <typename ReduceFunction, typename ReduceConfig,
376  typename KeyHashFunction>
378  const ReduceFunction& reduce_function,
379  const ReduceConfig& reduce_config,
380  const KeyHashFunction& key_hash_function) const {
381  // forward to main function
382  using Key = typename ValueType::first_type;
383  return ReducePair(reduce_function, reduce_config,
384  key_hash_function, std::equal_to<Key>());
385 }
386 
387 template <typename ValueType, typename Stack>
388 template <typename ReduceFunction, typename ReduceConfig,
389  typename KeyHashFunction, typename KeyEqualFunction>
391  const ReduceFunction& reduce_function,
392  const ReduceConfig& reduce_config,
393  const KeyHashFunction& key_hash_function,
394  const KeyEqualFunction& key_equal_funtion) const {
395  // forward to main function
396  return ReducePair(NoDuplicateDetectionTag,
397  reduce_function, reduce_config,
398  key_hash_function, key_equal_funtion);
399 }
400 
401 template <typename ValueType, typename Stack>
402 template <bool DuplicateDetectionValue,
403  typename ReduceFunction, typename ReduceConfig,
404  typename KeyHashFunction, typename KeyEqualFunction>
407  const ReduceFunction& reduce_function,
408  const ReduceConfig& reduce_config,
409  const KeyHashFunction& key_hash_function,
410  const KeyEqualFunction& key_equal_funtion) const {
411  assert(IsValid());
412 
413  using DOpResult
414  = typename common::FunctionTraits<ReduceFunction>::result_type;
415 
417  "ValueType is not a pair");
418 
419  static_assert(
420  std::is_convertible<
421  typename ValueType::second_type,
422  typename common::FunctionTraits<ReduceFunction>::template arg<0>
423  >::value,
424  "ReduceFunction has the wrong input type");
425 
426  static_assert(
427  std::is_convertible<
428  typename ValueType::second_type,
429  typename common::FunctionTraits<ReduceFunction>::template arg<1>
430  >::value,
431  "ReduceFunction has the wrong input type");
432 
433  static_assert(
434  std::is_same<
435  DOpResult,
436  typename ValueType::second_type>::value,
437  "ReduceFunction has the wrong output type");
438 
439  auto key_extractor = [](const ValueType& value) { return value.first; };
440 
441  auto reduce_pair_function =
442  [reduce_function](const ValueType& a, const ValueType& b) {
443  return ValueType(a.first, reduce_function(a.second, b.second));
444  };
445 
446  using ReduceNode = api::ReduceNode<
447  ValueType,
448  decltype(key_extractor), decltype(reduce_pair_function),
449  ReduceConfig, KeyHashFunction, KeyEqualFunction,
450  /* VolatileKey */ false, DuplicateDetectionValue>;
451 
452  auto node = tlx::make_counting<ReduceNode>(
453  *this, "ReducePair",
454  key_extractor, reduce_pair_function, reduce_config,
455  key_hash_function, key_equal_funtion);
456 
457  return DIA<ValueType>(node);
458 }
459 
460 } // namespace api
461 } // namespace thrill
462 
463 #endif // !THRILL_API_REDUCE_BY_KEY_HEADER
464 
465 /******************************************************************************/
std::vector< data::Stream::Writer > emitters_
static DIAMemUse Max()
Definition: dia_base.hpp:60
virtual void Dispose()
Virtual clear method. Triggers actual disposing in sub-classes.
Definition: dia_base.hpp:188
DIA is the interface between the user and the Thrill framework.
Definition: dia.hpp:133
ValueType_ ValueType
Definition: dia.hpp:144
static constexpr bool use_post_thread_
void reset()
release contained pointer, frees object if this is the last reference.
const char * label() const
return label() of DIANode subclass as stored by StatsNode
Definition: dia_base.hpp:218
Emitter for PostPhase to push elements to next DIA object.
virtual void PushData(bool consume)=0
Virtual method for pushing data. Triggers actual pushing in sub-classes.
core::ReduceByHash< Key, KeyHashFunction > HashIndexFunction
const size_t & id() const
return unique id() of DIANode subclass as stored by StatsNode
Definition: dia_base.hpp:213
core::ReduceByHashPostPhase< TableItem, Key, ValueType, KeyExtractor, ReduceFunction, Emitter, VolatileKey, ReduceConfig, HashIndexFunction, KeyEqualFunction > post_phase_
data::CatStreamPtr cat_stream_
virtual DIAMemUse PreOpMemUse()
Amount of RAM used by PreOp after StartPreOp()
Definition: dia_base.hpp:160
A reduce index function which returns a hash index and partition.
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:184
const struct VolatileKeyFlag< false > NoVolatileKeyTag
global const VolatileKeyFlag instance
Definition: dia.hpp:51
void PushItem(const ValueType &item) const
Method for derived classes to Push a single item to all children.
Definition: dia_node.hpp:147
static constexpr bool use_mix_stream_
tag structure for ReduceByKey() and ReduceToIndex()
Definition: dia.hpp:42
virtual void StopPreOp(size_t)
Virtual method for preparing end of PushData.
Definition: dia_base.hpp:173
A DIANode which performs a Reduce operation.
auto ReducePair(const ReduceFunction &reduce_function, const ReduceConfig &reduce_config=ReduceConfig()) const
ReducePair is a DOp, which groups key-value-pairs in the input DIA by their key and reduces each key-...
virtual void Execute()=0
Virtual execution method. Triggers actual computation in sub-classes.
int value
Definition: gen_data.py:41
Configuration class to define operational parameters of reduce hash tables and reduce phases...
test if is a std::pair<...>
Definition: is_std_pair.hpp:23
std::thread CreateThread(Args &&...args)
create a std::thread and repeat creation if it fails
Definition: porting.hpp:42
void operator()(const ValueType &item) const
virtual void StartPreOp(size_t)
Virtual method for preparing start of PushData.
Definition: dia_base.hpp:163
auto ReduceByKey(const KeyExtractor &key_extractor, const ReduceFunction &reduce_function, const ReduceConfig &reduce_config=ReduceConfig()) const
ReduceByKey is a DOp, which groups elements of the DIA with the key_extractor and reduces each key-bu...
const struct DuplicateDetectionFlag< false > NoDuplicateDetectionTag
global const DuplicateDetectionFlag instance
Definition: dia.hpp:104
DIAMemUse mem_limit_
Definition: dia_base.hpp:314
StreamData::Writer Writer
Definition: stream.hpp:40
A DOpNode is a typed node representing distributed operations in Thrill.
Definition: dop_node.hpp:32
ReduceNode(const ParentDIA &parent, const char *label, const KeyExtractor &key_extractor, const ReduceFunction &reduce_function, const ReduceConfig &config, const KeyHashFunction &key_hash_function, const KeyEqualFunction &key_equal_function)
Constructor for a ReduceNode.
typename common::FunctionTraits< KeyExtractor >::result_type Key
tag structure for ReduceByKey()
Definition: dia.hpp:95
tlx::CountingPtr< MixStream > MixStreamPtr
Definition: mix_stream.hpp:152
std::thread thread_
handle to additional thread for post phase
typename std::conditional< VolatileKey, std::pair< Key, ValueType >, ValueType >::type TableItem
static constexpr bool debug
core::ReducePrePhase< TableItem, Key, ValueType, KeyExtractor, ReduceFunction, VolatileKey, data::Stream::Writer, ReduceConfig, HashIndexFunction, KeyEqualFunction, KeyHashFunction, UseDuplicateDetection > pre_phase_
tlx::CountingPtr< CatStream > CatStreamPtr
Definition: cat_stream.hpp:188
Context & context_
associated Context
Definition: dia_base.hpp:293
virtual DIAMemUse PushDataMemUse()
Amount of RAM used by PushData()
Definition: dia_base.hpp:182
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:172