Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
reduce_to_index.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/api/reduce_to_index.hpp
3  *
4  * DIANode for a ReduceToIndex operation: reduces elements by an unsigned integer key and places the result for key K at index K of the output DIA.
5  *
6  * Part of Project Thrill - http://project-thrill.org
7  *
8  * Copyright (C) 2015 Alexander Noe <[email protected]>
9  * Copyright (C) 2015 Sebastian Lamm <[email protected]>
10  *
11  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
12  ******************************************************************************/
13 
14 #pragma once
15 #ifndef THRILL_API_REDUCE_TO_INDEX_HEADER
16 #define THRILL_API_REDUCE_TO_INDEX_HEADER
17 
18 #include <thrill/api/context.hpp>
19 #include <thrill/api/dia.hpp>
20 #include <thrill/api/dop_node.hpp>
22 #include <thrill/common/logger.hpp>
26 
27 #include <functional>
28 #include <thread>
29 #include <type_traits>
30 #include <utility>
31 #include <vector>
32 
33 namespace thrill {
34 namespace api {
35 
37 { };
38 
39 /*!
40  * A DIANode which performs a ReduceToIndex operation. ReduceToIndex groups the
41  * elements in a DIA by their key and reduces every key bucket to a single
42  * element each. The key_extractor and reduce_function UDFs are passed to both
43  * the pre phase and the post phase. The chainable LOps ahead of this node are
44  * folded together with its pre-op and registered at the parent node. The node
45  * has the type ValueType, which is the result type of the reduce_function. The
46  * key type is an unsigned integer; the output DIA holds the element with key K at index K.
47  *
48  * \tparam ValueType Type of the input elements and of the output DIA
49  * \tparam KeyExtractor Type of the key_extractor function; its result type is the key (index) type
50  * \tparam ReduceFunction Type of the reduce_function
51  * \tparam ReduceConfig Reduce configuration; must provide use_mix_stream_ and use_post_thread_
52  * \tparam VolatileKey If true, table items are std::pair<Key, ValueType>, otherwise plain ValueType
53  * \tparam SkipPreReducePhase If true, the pre phase uses InitializeSkip()/InsertSkip() and is not flushed
54  * \ingroup api_layer
55  */
56 template <typename ValueType,
57  typename KeyExtractor, typename ReduceFunction,
58  typename ReduceConfig, bool VolatileKey, bool SkipPreReducePhase>
59 class ReduceToIndexNode final : public DOpNode<ValueType>
60 {
    // NOTE(review): this listing omits several source lines of this class
    // (63, 72, 88, 107, 109, 113, 236, 238, 245, 247, 248, 250). Among them are,
    // presumably, the Super alias, the head of the Key static_assert, node_,
    // the cat_stream_/emitters_ members and initializers, and the
    // ReducePrePhase type of pre_phase_. Restore them from the original header.
61  static constexpr bool debug = false;
62 
64  using Super::context_;
65 
66  using Key = typename common::FunctionTraits<KeyExtractor>::result_type;
67 
    // With a volatile key the key is carried alongside the value, because it
    // cannot be re-extracted from the reduced value.
68  using TableItem =
69  typename std::conditional<
70  VolatileKey, std::pair<Key, ValueType>, ValueType>::type;
71 
73  "Key must be an unsigned integer");
74 
    // Compile-time switches taken from ReduceConfig: MixStream vs. CatStream
    // for the exchange, and whether the post phase runs on an extra thread.
75  static constexpr bool use_mix_stream_ = ReduceConfig::use_mix_stream_;
76  static constexpr bool use_post_thread_ = ReduceConfig::use_post_thread_;
77 
78 private:
79  //! Emitter for PostPhase to push elements to next DIA object.
80  class Emitter
81  {
82  public:
83  explicit Emitter(ReduceToIndexNode* node) : node_(node) { }
84  void operator () (const ValueType& item) const
85  { return node_->PushItem(item); }
86 
87  private:
89  };
90 
91 public:
92  /*!
93  * Constructor for a ReduceToIndexNode. Creates the mix or cat stream (chosen
94  * by ReduceConfig::use_mix_stream_), sets up the pre and post phases with the
 * UDFs and registers the folded LOp chain plus pre-op at the parent node.
 *
 * \param result_size Size of the output key range; passed to the pre phase
 * as core::ReduceByIndex<Key>(0, result_size).
 * \param neutral_element Passed to the post phase; presumably used for
 * indices that receive no element -- confirm against ReduceByIndexPostPhase.
95  */
96  template <typename ParentDIA>
97  ReduceToIndexNode(const ParentDIA& parent,
98  const char* label,
99  const KeyExtractor& key_extractor,
100  const ReduceFunction& reduce_function,
101  size_t result_size,
102  const ValueType& neutral_element,
103  const ReduceConfig& config)
104  : Super(parent.ctx(), label, { parent.id() }, { parent.node() }),
105  mix_stream_(use_mix_stream_ ?
106  parent.ctx().GetNewMixStream(this) : nullptr),
108  nullptr : parent.ctx().GetNewCatStream(this)),
110  mix_stream_->GetWriters() : cat_stream_->GetWriters()),
111  result_size_(result_size),
112  pre_phase_(
114  key_extractor, reduce_function, emitters_,
115  config, core::ReduceByIndex<Key>(0, result_size)),
116  post_phase_(
117  context_, Super::id(),
118  key_extractor, reduce_function, Emitter(this),
119  config, neutral_element)
120  {
121  // Hook PreOp: Locally insert elements of the current DIA into buckets
122  // chosen via core::ReduceByIndex and reduce each bucket to a single value
123  // (InsertSkip() if SkipPreReducePhase), then send data to other workers.
124  auto pre_op_fn = [this](const ValueType& input) {
125  if (SkipPreReducePhase)
126  pre_phase_.InsertSkip(input);
127  else
128  pre_phase_.Insert(input);
129  };
130 
131  // close the function stack with our pre op and register it at parent
132  // node for output
133  auto lop_chain = parent.stack().push(pre_op_fn).fold();
134  parent.node()->AddChild(this, lop_chain);
135  }
136 
137  DIAMemUse PreOpMemUse() final {
138  // request maximum RAM limit, the value is calculated by StageBuilder,
139  // and set as DIABase::mem_limit_.
140  return DIAMemUse::Max();
141  }
142 
    //! Initializes the pre phase and sets the post phase's key range to this
    //! worker's range. Without a post thread, Initialize() of the pre phase
    //! gets all of mem_limit_ and the post phase is initialized later in
    //! PushData(). With a post thread, pre and post phase each get
    //! mem_limit_ / 2, and ProcessChannel() is started on thread_ (joined in
    //! StopPreOp()), so it runs while pre-op items are still being inserted.
143  void StartPreOp(size_t /* id */) final {
144  if (!use_post_thread_) {
145  // use pre_phase without extra thread
146  if (!SkipPreReducePhase)
147  pre_phase_.Initialize(DIABase::mem_limit_);
148  else
149  pre_phase_.InitializeSkip();
150 
151  // re-parameterize with resulting key range on this worker - this is
152  // only known after Initialize() of the pre_phase_.
153  post_phase_.SetRange(pre_phase_.key_range(context_.my_rank()));
154  }
155  else {
156  if (!SkipPreReducePhase)
157  pre_phase_.Initialize(DIABase::mem_limit_ / 2);
158  else
159  pre_phase_.InitializeSkip();
160 
161  // re-parameterize with resulting key range on this worker - this is
162  // only known after Initialize() of the pre_phase_.
163  post_phase_.SetRange(pre_phase_.key_range(context_.my_rank()));
164  post_phase_.Initialize(DIABase::mem_limit_ / 2);
165 
166  // start additional thread to receive from the channel
167  thread_ = common::CreateThread([this] { ProcessChannel(); });
168  }
169  }
170 
    //! Flushes (unless SkipPreReducePhase) and closes the pre phase. With a
    //! post thread, also waits for ProcessChannel() and releases the stream.
171  void StopPreOp(size_t /* id */) final {
172  LOG << *this << " running StopPreOp";
173  // Flush hash table before the postOp
174  if (!SkipPreReducePhase)
175  pre_phase_.FlushAll();
176  pre_phase_.CloseAll();
177  if (use_post_thread_) {
178  // waiting for the additional thread to finish the reduce
179  thread_.join();
180  // deallocate stream if already processed
    // (resets whichever of mix_stream_ / cat_stream_ this node uses)
181  use_mix_stream_ ? mix_stream_.reset() : cat_stream_.reset();
182  }
183  }
184 
185  void Execute() final { }
186 
187  DIAMemUse PushDataMemUse() final {
188  return DIAMemUse::Max();
189  }
190 
    //! Emits the reduced items to the children. Without a post thread, the
    //! first call initializes the post phase, drains the stream via
    //! ProcessChannel() and releases it; reduced_ ensures this happens only
    //! once, so later calls only re-emit through post_phase_.PushData().
191  void PushData(bool consume) final {
192 
193  if (!use_post_thread_ && !reduced_) {
194  // not final reduced, and no additional thread, perform post reduce
195  post_phase_.Initialize(DIABase::mem_limit_);
196  ProcessChannel();
197 
198  // deallocate stream if already processed
    // (resets whichever of mix_stream_ / cat_stream_ this node uses)
199  use_mix_stream_ ? mix_stream_.reset() : cat_stream_.reset();
200 
201  reduced_ = true;
202  }
203  post_phase_.PushData(consume);
204  }
205 
206  //! process the inbound data in the post reduce phase: drains the mix or
    //! cat stream (reader created with consume = true) and inserts every
    //! TableItem into post_phase_. Runs on thread_ if use_post_thread_,
    //! otherwise it is called from PushData().
207  void ProcessChannel() {
208  if (use_mix_stream_)
209  {
210  auto reader = mix_stream_->GetMixReader(/* consume */ true);
211  sLOG << "reading data from" << mix_stream_->id()
212  << "to push into post table which flushes to" << this->id();
213  while (reader.HasNext()) {
214  post_phase_.Insert(reader.template Next<TableItem>());
215  }
216  }
217  else
218  {
219  auto reader = cat_stream_->GetCatReader(/* consume */ true);
220  sLOG << "reading data from" << cat_stream_->id()
221  << "to push into post table which flushes to" << this->id();
222  while (reader.HasNext()) {
223  post_phase_.Insert(reader.template Next<TableItem>());
224  }
225  }
226  }
227 
    //! Disposes the post phase (the only member disposed here).
228  void Dispose() final {
229  post_phase_.Dispose();
230  }
231 
232 private:
233  // pointers for both Mix and CatStream. only one is used, the other costs
234  // only a null pointer.
235  data::MixStreamPtr mix_stream_;
237 
239 
    // NOTE(review): result_size_ is stored but not read anywhere in the code
    // visible in this listing -- check whether it is still needed.
240  size_t result_size_;
241 
242  //! handle to additional thread for post phase
243  std::thread thread_;
244 
246  TableItem, Key, ValueType, KeyExtractor, ReduceFunction, VolatileKey,
249 
251  TableItem, Key, ValueType, KeyExtractor, ReduceFunction, Emitter,
252  VolatileKey, ReduceConfig> post_phase_;
253 
    //! set once the non-threaded post reduction in PushData() has run
254  bool reduced_ = false;
255 };
256 
//! ReduceToIndex() overload without an explicit VolatileKey flag. It forwards
//! key_extractor, reduce_function, size, neutral_element and reduce_config
//! unchanged to the main ReduceToIndex() overload.
//! NOTE(review): source lines 259 (the function head) and 267 (the extra
//! argument forwarded first) are missing from this listing. The forwarded
//! argument is presumably NoVolatileKeyTag -- confirm against the original.
257 template <typename ValueType, typename Stack>
258 template <typename KeyExtractor, typename ReduceFunction, typename ReduceConfig>
260  const KeyExtractor& key_extractor,
261  const ReduceFunction& reduce_function,
262  size_t size,
263  const ValueType& neutral_element,
264  const ReduceConfig& reduce_config) const {
265  // forward to main function
266  return ReduceToIndex(
268  key_extractor, reduce_function, size, neutral_element, reduce_config);
269 }
270 
//! Main ReduceToIndex() implementation, parameterized by the template value
//! VolatileKeyValue. It checks the UDF signatures at compile time, creates a
//! ReduceToIndexNode with VolatileKey = VolatileKeyValue and
//! SkipPreReducePhase = false, and returns it wrapped in a DIA<DOpResult>.
//!
//! Compile-time requirements (the static_asserts below):
//!  - ValueType is convertible to both arguments of reduce_function,
//!  - reduce_function returns exactly ValueType,
//!  - key_extractor takes ValueType (after decay) and returns exactly size_t.
//!
//! NOTE(review): source lines 274-275 (function head / flag parameter) and 319
//! (head of the ReduceNode alias) are missing from this listing.
271 template <typename ValueType, typename Stack>
272 template <bool VolatileKeyValue,
273  typename KeyExtractor, typename ReduceFunction, typename ReduceConfig>
276  const KeyExtractor& key_extractor,
277  const ReduceFunction& reduce_function,
278  size_t size,
279  const ValueType& neutral_element,
280  const ReduceConfig& reduce_config) const {
281  assert(IsValid());
282 
283  using DOpResult
284  = typename common::FunctionTraits<ReduceFunction>::result_type;
285 
286  static_assert(
287  std::is_convertible<
288  ValueType,
289  typename common::FunctionTraits<ReduceFunction>::template arg<0>
290  >::value,
291  "ReduceFunction has the wrong input type");
292 
293  static_assert(
294  std::is_convertible<
295  ValueType,
296  typename common::FunctionTraits<ReduceFunction>::template arg<1>
297  >::value,
298  "ReduceFunction has the wrong input type");
299 
300  static_assert(
301  std::is_same<
302  DOpResult,
303  ValueType>::value,
304  "ReduceFunction has the wrong output type");
305 
306  static_assert(
307  std::is_same<
308  typename std::decay<typename common::FunctionTraits<KeyExtractor>::
309  template arg<0> >::type,
310  ValueType>::value,
311  "KeyExtractor has the wrong input type");
312 
    // NOTE(review): requiring exactly size_t is stricter than the node's own
    // "Key must be an unsigned integer" check.
313  static_assert(
314  std::is_same<
315  typename common::FunctionTraits<KeyExtractor>::result_type,
316  size_t>::value,
317  "The key has to be an unsigned long int (aka. size_t).");
318 
320  DOpResult, KeyExtractor, ReduceFunction, ReduceConfig,
321  VolatileKeyValue, /* SkipPreReducePhase */ false>;
322 
323  auto node = tlx::make_counting<ReduceNode>(
324  *this, "ReduceToIndex", key_extractor, reduce_function,
325  size, neutral_element, reduce_config);
326 
327  return DIA<DOpResult>(node);
328 }
329 
//! ReduceToIndex() variant selected by passing SkipPreReducePhaseTag. It
//! checks the same UDF signature requirements as the VolatileKeyValue overload
//! and creates a ReduceToIndexNode with VolatileKey = false and
//! SkipPreReducePhase = true. In that mode the node's pre phase uses
//! InitializeSkip()/InsertSkip() and is not flushed with FlushAll().
//!
//! NOTE(review): source lines 332 (function head) and 377 (head of the
//! ReduceNode alias) are missing from this listing. The static_asserts below
//! duplicate those of the other overload and could be shared in a helper.
330 template <typename ValueType, typename Stack>
331 template <typename KeyExtractor, typename ReduceFunction, typename ReduceConfig>
333  const struct SkipPreReducePhaseTag&,
334  const KeyExtractor& key_extractor,
335  const ReduceFunction& reduce_function,
336  size_t size,
337  const ValueType& neutral_element,
338  const ReduceConfig& reduce_config) const {
339  assert(IsValid());
340 
341  using DOpResult
342  = typename common::FunctionTraits<ReduceFunction>::result_type;
343 
344  static_assert(
345  std::is_convertible<
346  ValueType,
347  typename common::FunctionTraits<ReduceFunction>::template arg<0>
348  >::value,
349  "ReduceFunction has the wrong input type");
350 
351  static_assert(
352  std::is_convertible<
353  ValueType,
354  typename common::FunctionTraits<ReduceFunction>::template arg<1>
355  >::value,
356  "ReduceFunction has the wrong input type");
357 
358  static_assert(
359  std::is_same<
360  DOpResult,
361  ValueType>::value,
362  "ReduceFunction has the wrong output type");
363 
364  static_assert(
365  std::is_same<
366  typename std::decay<typename common::FunctionTraits<KeyExtractor>::
367  template arg<0> >::type,
368  ValueType>::value,
369  "KeyExtractor has the wrong input type");
370 
371  static_assert(
372  std::is_same<
373  typename common::FunctionTraits<KeyExtractor>::result_type,
374  size_t>::value,
375  "The key has to be an unsigned long int (aka. size_t).");
376 
378  DOpResult, KeyExtractor, ReduceFunction, ReduceConfig,
379  /* VolatileKey */ false, /* SkipPreReducePhase */ true>;
380 
381  auto node = tlx::make_counting<ReduceNode>(
382  *this, "ReduceToIndex", key_extractor, reduce_function,
383  size, neutral_element, reduce_config);
384 
385  return DIA<DOpResult>(node);
386 }
387 
388 } // namespace api
389 } // namespace thrill
390 
391 #endif // !THRILL_API_REDUCE_TO_INDEX_HEADER
392 
393 /******************************************************************************/
static constexpr bool use_post_thread_
data::Stream::Writers emitters_
static DIAMemUse Max()
Definition: dia_base.hpp:60
virtual void Dispose()
Virtual clear method. Triggers actual disposing in sub-classes.
Definition: dia_base.hpp:188
DIA is the interface between the user and the Thrill framework.
Definition: dia.hpp:141
core::ReduceByIndexPostPhase< TableItem, Key, ValueType, KeyExtractor, ReduceFunction, Emitter, VolatileKey, ReduceConfig > post_phase_
ValueType_ ValueType
Definition: dia.hpp:152
size_t my_rank() const
Global rank of this worker among all other workers in the system.
Definition: context.hpp:240
core::ReducePrePhase< TableItem, Key, ValueType, KeyExtractor, ReduceFunction, VolatileKey, data::Stream::Writer, ReduceConfig, core::ReduceByIndex< Key > > pre_phase_
typename common::FunctionTraits< KeyExtractor >::result_type Key
void reset()
Releases the contained pointer; frees the object if this was the last reference.
tag structure for ReduceToIndex()
Definition: dia.hpp:54
const char * label() const
return label() of DIANode subclass as stored by StatsNode
Definition: dia_base.hpp:218
ReduceToIndexNode(const ParentDIA &parent, const char *label, const KeyExtractor &key_extractor, const ReduceFunction &reduce_function, size_t result_size, const ValueType &neutral_element, const ReduceConfig &config)
Constructor for a ReduceToIndexNode.
virtual void PushData(bool consume)=0
Virtual method for pushing data. Triggers actual pushing in sub-classes.
static constexpr bool use_mix_stream_
const size_t & id() const
return unique id() of DIANode subclass as stored by StatsNode
Definition: dia_base.hpp:213
A DIANode which performs a ReduceToIndex operation.
virtual DIAMemUse PreOpMemUse()
Amount of RAM used by PreOp after StartPreOp()
Definition: dia_base.hpp:160
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:184
const struct VolatileKeyFlag< false > NoVolatileKeyTag
global const VolatileKeyFlag instance
Definition: dia.hpp:51
An extra class derived from std::vector<> for delivery of the BlockWriters of a Stream.
Definition: stream_data.hpp:56
void PushItem(const ValueType &item) const
Method for derived classes to push a single item to all children.
Definition: dia_node.hpp:147
auto ReduceToIndex(const KeyExtractor &key_extractor, const ReduceFunction &reduce_function, size_t size, const ValueType &neutral_element=ValueType(), const ReduceConfig &reduce_config=ReduceConfig()) const
ReduceToIndex is a DOp, which groups elements of the DIA with the key_extractor returning an unsigned...
tag structure for ReduceByKey(), and ReduceToIndex()
Definition: dia.hpp:42
virtual void StopPreOp(size_t)
Virtual method for preparing end of PushData.
Definition: dia_base.hpp:173
A DIANode which performs a Reduce operation.
void operator()(const ValueType &item) const
virtual void Execute()=0
Virtual execution method. Triggers actual computation in sub-classes.
int value
Definition: gen_data.py:41
A reduce index function, which determines a bucket depending on the current index range [begin...
Configuration class to define operational parameters of reduce hash tables and reduce phases...
std::thread CreateThread(Args &&...args)
Creates a std::thread, retrying the creation if it fails.
Definition: porting.hpp:42
virtual void StartPreOp(size_t)
Virtual method for preparing start of PushData.
Definition: dia_base.hpp:163
DIAMemUse mem_limit_
Definition: dia_base.hpp:314
StreamData::Writer Writer
Definition: stream.hpp:40
A DOpNode is a typed node representing a distributed operation in Thrill.
Definition: dop_node.hpp:32
typename std::conditional< VolatileKey, std::pair< Key, ValueType >, ValueType >::type TableItem
tlx::CountingPtr< MixStream > MixStreamPtr
Definition: mix_stream.hpp:156
size_t num_workers() const
Global number of workers in the system.
Definition: context.hpp:248
tlx::CountingPtr< CatStream > CatStreamPtr
Definition: cat_stream.hpp:191
std::thread thread_
handle to additional thread for post phase
Context & context_
associated Context
Definition: dia_base.hpp:293
Emitter for PostPhase to push elements to next DIA object.
virtual DIAMemUse PushDataMemUse()
Amount of RAM used by PushData()
Definition: dia_base.hpp:182
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:172