Thrill  0.1
reduce_to_index.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/api/reduce_to_index.hpp
3  *
4  * DIANode for a reduce operation. Performs the actual reduce operation
5  *
6  * Part of Project Thrill - http://project-thrill.org
7  *
8  * Copyright (C) 2015 Alexander Noe <[email protected]>
9  * Copyright (C) 2015 Sebastian Lamm <[email protected]>
10  * Copyright (C) 2017 Tim Zeitz <[email protected]>
11  *
12  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
13  ******************************************************************************/
14 
15 #pragma once
16 #ifndef THRILL_API_REDUCE_TO_INDEX_HEADER
17 #define THRILL_API_REDUCE_TO_INDEX_HEADER
18 
19 #include <thrill/api/context.hpp>
20 #include <thrill/api/dia.hpp>
21 #include <thrill/api/dop_node.hpp>
23 #include <thrill/common/logger.hpp>
27 
28 #include <functional>
29 #include <thread>
30 #include <type_traits>
31 #include <utility>
32 #include <vector>
33 
34 namespace thrill {
35 namespace api {
36 
38 { };
39 
/*!
 * A DIANode which performs a ReduceToIndex operation. ReduceToIndex groups the
 * elements in a DIA by their key and reduces every key bucket to a single
 * element each. The ReduceToIndexNode stores the key_extractor and the
 * reduce_function UDFs. The chainable LOps ahead of the Reduce operation are
 * stored in the Stack. The ReduceToIndexNode has the type ValueType, which is
 * the result type of the reduce_function. The key type is an unsigned integer
 * and the output DIA will have the element with key K at index K.
 *
 * \tparam ValueType Output type of the Reduce operation
 * \tparam KeyExtractor Type of the key_extractor function.
 * \tparam ReduceFunction Type of the reduce_function
 * \tparam ReduceConfig Configuration type; supplies use_mix_stream_ and
 * use_post_thread_ and is passed on to the pre- and post-phase.
 * \tparam VolatileKey If true, table items are std::pair<Key, ValueType>,
 * otherwise plain ValueType.
 * \tparam SkipPreReducePhase If true, items bypass the local pre-reduce
 * (InsertSkip()/InitializeSkip() are used and FlushAll() is not called).
 *
 * \ingroup api_layer
 */
57 template <typename ValueType,
58  typename KeyExtractor, typename ReduceFunction,
59  typename ReduceConfig, bool VolatileKey, bool SkipPreReducePhase>
60 class ReduceToIndexNode final : public DOpNode<ValueType>
61 {
62  static constexpr bool debug = false;
63 
65  using Super::context_;
66 
67  using Key = typename common::FunctionTraits<KeyExtractor>::result_type;
68 
69  using TableItem =
70  typename std::conditional<
71  VolatileKey, std::pair<Key, ValueType>, ValueType>::type;
72 
74  "Key must be an unsigned integer");
75 
76  static constexpr bool use_mix_stream_ = ReduceConfig::use_mix_stream_;
77  static constexpr bool use_post_thread_ = ReduceConfig::use_post_thread_;
78 
79 private:
80  //! Emitter for PostPhase to push elements to next DIA object.
81  class Emitter
82  {
83  public:
84  explicit Emitter(ReduceToIndexNode* node) : node_(node) { }
85  void operator () (const ValueType& item) const
86  { return node_->PushItem(item); }
87 
88  private:
90  };
91 
92 public:
93  /*!
94  * Constructor for a ReduceToIndexNode. Sets the parent, stack,
95  * key_extractor and reduce_function.
96  */
97  template <typename ParentDIA>
98  ReduceToIndexNode(const ParentDIA& parent,
99  const char* label,
100  const KeyExtractor& key_extractor,
101  const ReduceFunction& reduce_function,
102  size_t result_size,
103  const ValueType& neutral_element,
104  const ReduceConfig& config)
105  : Super(parent.ctx(), label, { parent.id() }, { parent.node() }),
106  mix_stream_(use_mix_stream_ ?
107  parent.ctx().GetNewMixStream(this) : nullptr),
108  cat_stream_(use_mix_stream_ ?
109  nullptr : parent.ctx().GetNewCatStream(this)),
110  emitters_(use_mix_stream_ ?
111  mix_stream_->GetWriters() : cat_stream_->GetWriters()),
112  result_size_(result_size),
113  pre_phase_(
114  context_, Super::dia_id(), context_.num_workers(),
115  key_extractor, reduce_function, emitters_,
116  config, core::ReduceByIndex<Key>(0, result_size)),
117  post_phase_(
118  context_, Super::dia_id(),
119  key_extractor, reduce_function, Emitter(this),
120  config, neutral_element) {
121  // Hook PreOp: Locally hash elements of the current DIA onto buckets and
122  // reduce each bucket to a single value, afterwards send data to another
123  // worker given by the shuffle algorithm.
124  auto pre_op_fn = [this](const ValueType& input) {
125  if (SkipPreReducePhase)
126  pre_phase_.InsertSkip(input);
127  else
128  pre_phase_.Insert(input);
129  };
130 
131  // close the function stack with our pre op and register it at parent
132  // node for output
133  auto lop_chain = parent.stack().push(pre_op_fn).fold();
134  parent.node()->AddChild(this, lop_chain);
135  }
136 
138  // request maximum RAM limit, the value is calculated by StageBuilder,
139  // and set as DIABase::mem_limit_.
140  return DIAMemUse::Max();
141  }
142 
143  void StartPreOp(size_t /* parent_index */) final {
144  if (!use_post_thread_) {
145  // use pre_phase without extra thread
146  if (!SkipPreReducePhase)
147  pre_phase_.Initialize(DIABase::mem_limit_);
148  else
149  pre_phase_.InitializeSkip();
150 
151  // re-parameterize with resulting key range on this worker - this is
152  // only known after Initialize() of the pre_phase_.
153  post_phase_.SetRange(pre_phase_.key_range(context_.my_rank()));
154  }
155  else {
156  if (!SkipPreReducePhase)
157  pre_phase_.Initialize(DIABase::mem_limit_ / 2);
158  else
159  pre_phase_.InitializeSkip();
160 
161  // re-parameterize with resulting key range on this worker - this is
162  // only know after Initialize() of the pre_phase_.
163  post_phase_.SetRange(pre_phase_.key_range(context_.my_rank()));
164  post_phase_.Initialize(DIABase::mem_limit_ / 2);
165 
166  // start additional thread to receive from the channel
167  thread_ = common::CreateThread([this] { ProcessChannel(); });
168  }
169  }
170 
171  void StopPreOp(size_t /* parent_index */) final {
172  LOG << *this << " running StopPreOp";
173  // Flush hash table before the postOp
174  if (!SkipPreReducePhase)
175  pre_phase_.FlushAll();
176  pre_phase_.CloseAll();
177  if (use_post_thread_) {
178  // waiting for the additional thread to finish the reduce
179  thread_.join();
180  // deallocate stream if already processed
181  use_mix_stream_ ? mix_stream_.reset() : cat_stream_.reset();
182  }
183  }
184 
185  void Execute() final { }
186 
188  return DIAMemUse::Max();
189  }
190 
191  void PushData(bool consume) final {
192 
193  if (!use_post_thread_ && !reduced_) {
194  // not final reduced, and no additional thread, perform post reduce
195  post_phase_.Initialize(DIABase::mem_limit_);
196  ProcessChannel();
197 
198  // deallocate stream if already processed
199  use_mix_stream_ ? mix_stream_.reset() : cat_stream_.reset();
200 
201  reduced_ = true;
202  }
203  post_phase_.PushData(consume);
204  }
205 
206  //! process the inbound data in the post reduce phase
207  void ProcessChannel() {
208  if (use_mix_stream_)
209  {
210  auto reader = mix_stream_->GetMixReader(/* consume */ true);
211  sLOG << "reading data from" << mix_stream_->id()
212  << "to push into post table which flushes to" << this->dia_id();
213  while (reader.HasNext()) {
214  post_phase_.Insert(reader.template Next<TableItem>());
215  }
216  }
217  else
218  {
219  auto reader = cat_stream_->GetCatReader(/* consume */ true);
220  sLOG << "reading data from" << cat_stream_->id()
221  << "to push into post table which flushes to" << this->dia_id();
222  while (reader.HasNext()) {
223  post_phase_.Insert(reader.template Next<TableItem>());
224  }
225  }
226  }
227 
228  void Dispose() final {
229  post_phase_.Dispose();
230  }
231 
232 private:
233  // pointers for both Mix and CatStream. only one is used, the other costs
234  // only a null pointer.
237 
239 
240  size_t result_size_;
241 
242  //! handle to additional thread for post phase
243  std::thread thread_;
244 
246  TableItem, Key, ValueType, KeyExtractor, ReduceFunction, VolatileKey,
247  data::Stream::Writer, ReduceConfig, core::ReduceByIndex<Key>
249 
251  TableItem, Key, ValueType, KeyExtractor, ReduceFunction, Emitter,
252  VolatileKey, ReduceConfig> post_phase_;
253 
254  bool reduced_ = false;
255 };
256 
257 template <typename ValueType, typename Stack>
258 template <typename KeyExtractor, typename ReduceFunction, typename ReduceConfig>
260  const KeyExtractor& key_extractor,
261  const ReduceFunction& reduce_function,
262  size_t size,
263  const ValueType& neutral_element,
264  const ReduceConfig& reduce_config) const {
265  // forward to main function
266  return ReduceToIndex(
268  key_extractor, reduce_function, size, neutral_element, reduce_config);
269 }
270 
271 template <typename ValueType, typename Stack>
272 template <bool VolatileKeyValue,
273  typename KeyExtractor, typename ReduceFunction, typename ReduceConfig>
276  const KeyExtractor& key_extractor,
277  const ReduceFunction& reduce_function,
278  size_t size,
279  const ValueType& neutral_element,
280  const ReduceConfig& reduce_config) const {
281  assert(IsValid());
282 
283  using DOpResult
284  = typename common::FunctionTraits<ReduceFunction>::result_type;
285 
286  static_assert(
287  std::is_convertible<
288  ValueType,
289  typename common::FunctionTraits<ReduceFunction>::template arg<0>
290  >::value,
291  "ReduceFunction has the wrong input type");
292 
293  static_assert(
294  std::is_convertible<
295  ValueType,
296  typename common::FunctionTraits<ReduceFunction>::template arg<1>
297  >::value,
298  "ReduceFunction has the wrong input type");
299 
300  static_assert(
301  std::is_same<
302  DOpResult,
303  ValueType>::value,
304  "ReduceFunction has the wrong output type");
305 
306  static_assert(
307  std::is_same<
308  typename std::decay<typename common::FunctionTraits<KeyExtractor>::
309  template arg<0> >::type,
310  ValueType>::value,
311  "KeyExtractor has the wrong input type");
312 
313  static_assert(
314  std::is_same<
315  typename common::FunctionTraits<KeyExtractor>::result_type,
316  size_t>::value,
317  "The key has to be an unsigned long int (aka. size_t).");
318 
320  DOpResult, KeyExtractor, ReduceFunction, ReduceConfig,
321  VolatileKeyValue, /* SkipPreReducePhase */ false>;
322 
323  auto node = tlx::make_counting<ReduceNode>(
324  *this, "ReduceToIndex", key_extractor, reduce_function,
325  size, neutral_element, reduce_config);
326 
327  return DIA<DOpResult>(node);
328 }
329 
330 template <typename ValueType, typename Stack>
331 template <typename KeyExtractor, typename ReduceFunction, typename ReduceConfig>
333  const struct SkipPreReducePhaseTag&,
334  const KeyExtractor& key_extractor,
335  const ReduceFunction& reduce_function,
336  size_t size,
337  const ValueType& neutral_element,
338  const ReduceConfig& reduce_config) const {
339  assert(IsValid());
340 
341  using DOpResult
342  = typename common::FunctionTraits<ReduceFunction>::result_type;
343 
344  static_assert(
345  std::is_convertible<
346  ValueType,
347  typename common::FunctionTraits<ReduceFunction>::template arg<0>
348  >::value,
349  "ReduceFunction has the wrong input type");
350 
351  static_assert(
352  std::is_convertible<
353  ValueType,
354  typename common::FunctionTraits<ReduceFunction>::template arg<1>
355  >::value,
356  "ReduceFunction has the wrong input type");
357 
358  static_assert(
359  std::is_same<
360  DOpResult,
361  ValueType>::value,
362  "ReduceFunction has the wrong output type");
363 
364  static_assert(
365  std::is_same<
366  typename std::decay<typename common::FunctionTraits<KeyExtractor>::
367  template arg<0> >::type,
368  ValueType>::value,
369  "KeyExtractor has the wrong input type");
370 
371  static_assert(
372  std::is_same<
373  typename common::FunctionTraits<KeyExtractor>::result_type,
374  size_t>::value,
375  "The key has to be an unsigned long int (aka. size_t).");
376 
378  DOpResult, KeyExtractor, ReduceFunction, ReduceConfig,
379  /* VolatileKey */ false, /* SkipPreReducePhase */ true>;
380 
381  auto node = tlx::make_counting<ReduceNode>(
382  *this, "ReduceToIndex", key_extractor, reduce_function,
383  size, neutral_element, reduce_config);
384 
385  return DIA<DOpResult>(node);
386 }
387 
388 } // namespace api
389 } // namespace thrill
390 
391 #endif // !THRILL_API_REDUCE_TO_INDEX_HEADER
392 
393 /******************************************************************************/
std::thread CreateThread(Args &&... args)
create a std::thread and repeat creation if it fails
Definition: porting.hpp:44
void StopPreOp(size_t) final
Virtual method for preparing end of PushData.
void StartPreOp(size_t) final
Virtual method for preparing start of PushData.
data::Stream::Writers emitters_
static DIAMemUse Max()
Definition: dia_base.hpp:60
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:34
DIA is the interface between the user and the Thrill framework.
Definition: dia.hpp:141
core::ReduceByIndexPostPhase< TableItem, Key, ValueType, KeyExtractor, ReduceFunction, Emitter, VolatileKey, ReduceConfig > post_phase_
Description of the amount of RAM the internal data structures of a DIANode require.
Definition: dia_base.hpp:51
core::ReducePrePhase< TableItem, Key, ValueType, KeyExtractor, ReduceFunction, VolatileKey, data::Stream::Writer, ReduceConfig, core::ReduceByIndex< Key > > pre_phase_
auto ReduceToIndex(const KeyExtractor &key_extractor, const ReduceFunction &reduce_function, size_t size, const ValueType &neutral_element=ValueType(), const ReduceConfig &reduce_config=ReduceConfig()) const
ReduceToIndex is a DOp, which groups elements of the DIA with the key_extractor returning an unsigned...
typename common::FunctionTraits< KeyExtractor >::result_type Key
static constexpr bool use_post_thread_
tag structure for ReduceToIndex()
Definition: dia.hpp:54
ReduceToIndexNode(const ParentDIA &parent, const char *label, const KeyExtractor &key_extractor, const ReduceFunction &reduce_function, size_t result_size, const ValueType &neutral_element, const ReduceConfig &config)
Constructor for a ReduceToIndexNode.
void Dispose() final
Virtual clear method. Triggers actual disposing in sub-classes.
A DIANode which performs a ReduceToIndex operation.
const struct VolatileKeyFlag< false > NoVolatileKeyTag
global const VolatileKeyFlag instance
Definition: dia.hpp:51
virtual void AddChild(DIABase *node, const Callback &callback=Callback(), size_t parent_index=0)
Enables children to push their "folded" function chains to their parent.
Definition: dia_node.hpp:76
An extra class derived from std::vector<> for delivery of the BlockWriters of a Stream.
Definition: stream_data.hpp:59
void PushData(bool consume) final
Virtual method for pushing data. Triggers actual pushing in sub-classes.
tag structure for ReduceByKey(), and ReduceToIndex()
Definition: dia.hpp:42
A DIANode which performs a Reduce operation.
static constexpr bool use_mix_stream_
int value
Definition: gen_data.py:41
A reduce index function, which determines a bucket depending on the current index range [begin...
Configuration class to define operational parameters of reduce hash tables and reduce phases...
DIAMemUse PreOpMemUse() final
Amount of RAM used by PreOp after StartPreOp()
static constexpr bool debug
void ProcessChannel()
process the inbound data in the post reduce phase
DIAMemUse mem_limit_
Definition: dia_base.hpp:314
StreamData::Writer Writer
Definition: stream.hpp:39
A DOpNode is a typed node representing a distributed operation in Thrill.
Definition: dop_node.hpp:32
DIAMemUse PushDataMemUse() final
Amount of RAM used by PushData()
typename std::conditional< VolatileKey, std::pair< Key, ValueType >, ValueType >::type TableItem
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:24
std::thread thread_
handle to additional thread for post phase
Emitter for PostPhase to push elements to next DIA object.
void Execute() final
Virtual execution method. Triggers actual computation in sub-classes.