Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
reduce_to_index.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/api/reduce_to_index.hpp
3  *
4  * DIANode for a ReduceToIndex operation: reduces all elements sharing an unsigned integer key K to a single element placed at index K of the result.
5  *
6  * Part of Project Thrill - http://project-thrill.org
7  *
8  * Copyright (C) 2015 Alexander Noe <[email protected]>
9  * Copyright (C) 2015 Sebastian Lamm <[email protected]>
10  * Copyright (C) 2017 Tim Zeitz <[email protected]>
11  *
12  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
13  ******************************************************************************/
14 
15 #pragma once
16 #ifndef THRILL_API_REDUCE_TO_INDEX_HEADER
17 #define THRILL_API_REDUCE_TO_INDEX_HEADER
18 
19 #include <thrill/api/context.hpp>
20 #include <thrill/api/dia.hpp>
21 #include <thrill/api/dop_node.hpp>
23 #include <thrill/common/logger.hpp>
27 
28 #include <functional>
29 #include <thread>
30 #include <type_traits>
31 #include <utility>
32 #include <vector>
33 
34 namespace thrill {
35 namespace api {
36 
38 { };
39 
40 /*!
41  * A DIANode which performs a ReduceToIndex operation. ReduceToIndex groups the
42  * elements in a DIA by their key and reduces every key bucket to a single
43  * element each. The ReduceToIndexNode stores the key_extractor and the
44  * reduce_function UDFs. The chainable LOps ahead of the Reduce operation are
45  * stored in the Stack. The ReduceToIndexNode has the type ValueType, which is
46  * the result type of the reduce_function. The key type is an unsigned integer
47  * and the output DIA will have element with key K at index K.
48  *
49  * \tparam ParentType Input type of the Reduce operation
50  * \tparam ValueType Output type of the Reduce operation
51  * \tparam ParentStack Function stack, which contains the chained lambdas between the last and this DIANode.
52  * \tparam KeyExtractor Type of the key_extractor function.
53  * \tparam ReduceFunction Type of the reduce_function
54  *
55  * \ingroup api_layer
56  */
57 template <typename ValueType,
58  typename KeyExtractor, typename ReduceFunction,
59  typename ReduceConfig, bool VolatileKey, bool SkipPreReducePhase>
60 class ReduceToIndexNode final : public DOpNode<ValueType>
61 {
62  static constexpr bool debug = false;
63 
65  using Super::context_;
66 
67  using Key = typename common::FunctionTraits<KeyExtractor>::result_type;
68 
69  using TableItem =
70  typename std::conditional<
71  VolatileKey, std::pair<Key, ValueType>, ValueType>::type;
72 
74  "Key must be an unsigned integer");
75 
76  static constexpr bool use_mix_stream_ = ReduceConfig::use_mix_stream_;
77  static constexpr bool use_post_thread_ = ReduceConfig::use_post_thread_;
78 
79 private:
80  //! Emitter for PostPhase to push elements to next DIA object.
81  class Emitter
82  {
83  public:
84  explicit Emitter(ReduceToIndexNode* node) : node_(node) { }
85  void operator () (const ValueType& item) const
86  { return node_->PushItem(item); }
87 
88  private:
90  };
91 
92 public:
93  /*!
94  * Constructor for a ReduceToIndexNode. Sets the parent, stack,
95  * key_extractor and reduce_function.
96  */
97  template <typename ParentDIA>
98  ReduceToIndexNode(const ParentDIA& parent,
99  const char* label,
100  const KeyExtractor& key_extractor,
101  const ReduceFunction& reduce_function,
102  size_t result_size,
103  const ValueType& neutral_element,
104  const ReduceConfig& config)
105  : Super(parent.ctx(), label, { parent.id() }, { parent.node() }),
106  mix_stream_(use_mix_stream_ ?
107  parent.ctx().GetNewMixStream(this) : nullptr),
109  nullptr : parent.ctx().GetNewCatStream(this)),
111  mix_stream_->GetWriters() : cat_stream_->GetWriters()),
112  result_size_(result_size),
113  pre_phase_(
115  key_extractor, reduce_function, emitters_,
116  config, core::ReduceByIndex<Key>(0, result_size)),
117  post_phase_(
118  context_, Super::id(),
119  key_extractor, reduce_function, Emitter(this),
120  config, neutral_element)
121  {
122  // Hook PreOp: Locally hash elements of the current DIA onto buckets and
123  // reduce each bucket to a single value, afterwards send data to another
124  // worker given by the shuffle algorithm.
125  auto pre_op_fn = [this](const ValueType& input) {
126  if (SkipPreReducePhase)
127  pre_phase_.InsertSkip(input);
128  else
129  pre_phase_.Insert(input);
130  };
131 
132  // close the function stack with our pre op and register it at parent
133  // node for output
134  auto lop_chain = parent.stack().push(pre_op_fn).fold();
135  parent.node()->AddChild(this, lop_chain);
136  }
137 
138  DIAMemUse PreOpMemUse() final {
139  // request maximum RAM limit, the value is calculated by StageBuilder,
140  // and set as DIABase::mem_limit_.
141  return DIAMemUse::Max();
142  }
143 
144  void StartPreOp(size_t /* id */) final {
145  if (!use_post_thread_) {
146  // use pre_phase without extra thread
147  if (!SkipPreReducePhase)
148  pre_phase_.Initialize(DIABase::mem_limit_);
149  else
150  pre_phase_.InitializeSkip();
151 
152  // re-parameterize with resulting key range on this worker - this is
153  // only known after Initialize() of the pre_phase_.
154  post_phase_.SetRange(pre_phase_.key_range(context_.my_rank()));
155  }
156  else {
157  if (!SkipPreReducePhase)
158  pre_phase_.Initialize(DIABase::mem_limit_ / 2);
159  else
160  pre_phase_.InitializeSkip();
161 
162  // re-parameterize with resulting key range on this worker - this is
163  // only know after Initialize() of the pre_phase_.
164  post_phase_.SetRange(pre_phase_.key_range(context_.my_rank()));
165  post_phase_.Initialize(DIABase::mem_limit_ / 2);
166 
167  // start additional thread to receive from the channel
168  thread_ = common::CreateThread([this] { ProcessChannel(); });
169  }
170  }
171 
172  void StopPreOp(size_t /* id */) final {
173  LOG << *this << " running StopPreOp";
174  // Flush hash table before the postOp
175  if (!SkipPreReducePhase)
176  pre_phase_.FlushAll();
177  pre_phase_.CloseAll();
178  if (use_post_thread_) {
179  // waiting for the additional thread to finish the reduce
180  thread_.join();
181  // deallocate stream if already processed
182  use_mix_stream_ ? mix_stream_.reset() : cat_stream_.reset();
183  }
184  }
185 
186  void Execute() final { }
187 
188  DIAMemUse PushDataMemUse() final {
189  return DIAMemUse::Max();
190  }
191 
192  void PushData(bool consume) final {
193 
194  if (!use_post_thread_ && !reduced_) {
195  // not final reduced, and no additional thread, perform post reduce
196  post_phase_.Initialize(DIABase::mem_limit_);
197  ProcessChannel();
198 
199  // deallocate stream if already processed
200  use_mix_stream_ ? mix_stream_.reset() : cat_stream_.reset();
201 
202  reduced_ = true;
203  }
204  post_phase_.PushData(consume);
205  }
206 
207  //! process the inbound data in the post reduce phase
208  void ProcessChannel() {
209  if (use_mix_stream_)
210  {
211  auto reader = mix_stream_->GetMixReader(/* consume */ true);
212  sLOG << "reading data from" << mix_stream_->id()
213  << "to push into post table which flushes to" << this->id();
214  while (reader.HasNext()) {
215  post_phase_.Insert(reader.template Next<TableItem>());
216  }
217  }
218  else
219  {
220  auto reader = cat_stream_->GetCatReader(/* consume */ true);
221  sLOG << "reading data from" << cat_stream_->id()
222  << "to push into post table which flushes to" << this->id();
223  while (reader.HasNext()) {
224  post_phase_.Insert(reader.template Next<TableItem>());
225  }
226  }
227  }
228 
229  void Dispose() final {
230  post_phase_.Dispose();
231  }
232 
233 private:
234  // pointers for both Mix and CatStream. only one is used, the other costs
235  // only a null pointer.
236  data::MixStreamPtr mix_stream_;
238 
240 
241  size_t result_size_;
242 
243  //! handle to additional thread for post phase
244  std::thread thread_;
245 
247  TableItem, Key, ValueType, KeyExtractor, ReduceFunction, VolatileKey,
250 
252  TableItem, Key, ValueType, KeyExtractor, ReduceFunction, Emitter,
253  VolatileKey, ReduceConfig> post_phase_;
254 
255  bool reduced_ = false;
256 };
257 
258 template <typename ValueType, typename Stack>
259 template <typename KeyExtractor, typename ReduceFunction, typename ReduceConfig>
261  const KeyExtractor& key_extractor,
262  const ReduceFunction& reduce_function,
263  size_t size,
264  const ValueType& neutral_element,
265  const ReduceConfig& reduce_config) const {
266  // forward to main function
267  return ReduceToIndex(
269  key_extractor, reduce_function, size, neutral_element, reduce_config);
270 }
271 
272 template <typename ValueType, typename Stack>
273 template <bool VolatileKeyValue,
274  typename KeyExtractor, typename ReduceFunction, typename ReduceConfig>
277  const KeyExtractor& key_extractor,
278  const ReduceFunction& reduce_function,
279  size_t size,
280  const ValueType& neutral_element,
281  const ReduceConfig& reduce_config) const {
282  assert(IsValid());
283 
284  using DOpResult
285  = typename common::FunctionTraits<ReduceFunction>::result_type;
286 
287  static_assert(
288  std::is_convertible<
289  ValueType,
290  typename common::FunctionTraits<ReduceFunction>::template arg<0>
291  >::value,
292  "ReduceFunction has the wrong input type");
293 
294  static_assert(
295  std::is_convertible<
296  ValueType,
297  typename common::FunctionTraits<ReduceFunction>::template arg<1>
298  >::value,
299  "ReduceFunction has the wrong input type");
300 
301  static_assert(
302  std::is_same<
303  DOpResult,
304  ValueType>::value,
305  "ReduceFunction has the wrong output type");
306 
307  static_assert(
308  std::is_same<
309  typename std::decay<typename common::FunctionTraits<KeyExtractor>::
310  template arg<0> >::type,
311  ValueType>::value,
312  "KeyExtractor has the wrong input type");
313 
314  static_assert(
315  std::is_same<
316  typename common::FunctionTraits<KeyExtractor>::result_type,
317  size_t>::value,
318  "The key has to be an unsigned long int (aka. size_t).");
319 
321  DOpResult, KeyExtractor, ReduceFunction, ReduceConfig,
322  VolatileKeyValue, /* SkipPreReducePhase */ false>;
323 
324  auto node = tlx::make_counting<ReduceNode>(
325  *this, "ReduceToIndex", key_extractor, reduce_function,
326  size, neutral_element, reduce_config);
327 
328  return DIA<DOpResult>(node);
329 }
330 
331 template <typename ValueType, typename Stack>
332 template <typename KeyExtractor, typename ReduceFunction, typename ReduceConfig>
334  const struct SkipPreReducePhaseTag&,
335  const KeyExtractor& key_extractor,
336  const ReduceFunction& reduce_function,
337  size_t size,
338  const ValueType& neutral_element,
339  const ReduceConfig& reduce_config) const {
340  assert(IsValid());
341 
342  using DOpResult
343  = typename common::FunctionTraits<ReduceFunction>::result_type;
344 
345  static_assert(
346  std::is_convertible<
347  ValueType,
348  typename common::FunctionTraits<ReduceFunction>::template arg<0>
349  >::value,
350  "ReduceFunction has the wrong input type");
351 
352  static_assert(
353  std::is_convertible<
354  ValueType,
355  typename common::FunctionTraits<ReduceFunction>::template arg<1>
356  >::value,
357  "ReduceFunction has the wrong input type");
358 
359  static_assert(
360  std::is_same<
361  DOpResult,
362  ValueType>::value,
363  "ReduceFunction has the wrong output type");
364 
365  static_assert(
366  std::is_same<
367  typename std::decay<typename common::FunctionTraits<KeyExtractor>::
368  template arg<0> >::type,
369  ValueType>::value,
370  "KeyExtractor has the wrong input type");
371 
372  static_assert(
373  std::is_same<
374  typename common::FunctionTraits<KeyExtractor>::result_type,
375  size_t>::value,
376  "The key has to be an unsigned long int (aka. size_t).");
377 
379  DOpResult, KeyExtractor, ReduceFunction, ReduceConfig,
380  /* VolatileKey */ false, /* SkipPreReducePhase */ true>;
381 
382  auto node = tlx::make_counting<ReduceNode>(
383  *this, "ReduceToIndex", key_extractor, reduce_function,
384  size, neutral_element, reduce_config);
385 
386  return DIA<DOpResult>(node);
387 }
388 
389 } // namespace api
390 } // namespace thrill
391 
392 #endif // !THRILL_API_REDUCE_TO_INDEX_HEADER
393 
394 /******************************************************************************/
static constexpr bool use_post_thread_
data::Stream::Writers emitters_
static DIAMemUse Max()
Definition: dia_base.hpp:60
virtual void Dispose()
Virtual clear method. Triggers actual disposing in sub-classes.
Definition: dia_base.hpp:188
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:34
DIA is the interface between the user and the Thrill framework.
Definition: dia.hpp:141
core::ReduceByIndexPostPhase< TableItem, Key, ValueType, KeyExtractor, ReduceFunction, Emitter, VolatileKey, ReduceConfig > post_phase_
ValueType_ ValueType
Definition: dia.hpp:152
size_t my_rank() const
Global rank of this worker among all other workers in the system.
Definition: context.hpp:240
core::ReducePrePhase< TableItem, Key, ValueType, KeyExtractor, ReduceFunction, VolatileKey, data::Stream::Writer, ReduceConfig, core::ReduceByIndex< Key > > pre_phase_
typename common::FunctionTraits< KeyExtractor >::result_type Key
void reset()
release contained pointer, frees object if this is the last reference.
tag structure for ReduceToIndex()
Definition: dia.hpp:54
const char * label() const
return label() of DIANode subclass as stored by StatsNode
Definition: dia_base.hpp:218
ReduceToIndexNode(const ParentDIA &parent, const char *label, const KeyExtractor &key_extractor, const ReduceFunction &reduce_function, size_t result_size, const ValueType &neutral_element, const ReduceConfig &config)
Constructor for a ReduceToIndexNode.
virtual void PushData(bool consume)=0
Virtual method for pushing data. Triggers actual pushing in sub-classes.
static constexpr bool use_mix_stream_
const size_t & id() const
return unique id() of DIANode subclass as stored by StatsNode
Definition: dia_base.hpp:213
A DIANode which performs a ReduceToIndex operation.
virtual DIAMemUse PreOpMemUse()
Amount of RAM used by PreOp after StartPreOp()
Definition: dia_base.hpp:160
const struct VolatileKeyFlag< false > NoVolatileKeyTag
global const VolatileKeyFlag instance
Definition: dia.hpp:51
An extra class derived from std::vector<> for delivery of the BlockWriters of a Stream.
Definition: stream_data.hpp:56
void PushItem(const ValueType &item) const
Method for derived classes to Push a single item to all children.
Definition: dia_node.hpp:147
auto ReduceToIndex(const KeyExtractor &key_extractor, const ReduceFunction &reduce_function, size_t size, const ValueType &neutral_element=ValueType(), const ReduceConfig &reduce_config=ReduceConfig()) const
ReduceToIndex is a DOp, which groups elements of the DIA with the key_extractor returning an unsigned...
tag structure for ReduceByKey(), and ReduceToIndex()
Definition: dia.hpp:42
virtual void StopPreOp(size_t)
Virtual method for preparing end of PushData.
Definition: dia_base.hpp:173
A DIANode which performs a Reduce operation.
void operator()(const ValueType &item) const
virtual void Execute()=0
Virtual execution method. Triggers actual computation in sub-classes.
int value
Definition: gen_data.py:41
A reduce index function, which determines a bucket depending on the current index range [begin...
Configuration class to define operational parameters of reduce hash tables and reduce phases...
std::thread CreateThread(Args &&...args)
create a std::thread and repeat creation if it fails
Definition: porting.hpp:42
virtual void StartPreOp(size_t)
Virtual method for preparing start of PushData.
Definition: dia_base.hpp:163
DIAMemUse mem_limit_
Definition: dia_base.hpp:314
StreamData::Writer Writer
Definition: stream.hpp:39
A DOpNode is a typed node representing and distributed operations in Thrill.
Definition: dop_node.hpp:32
typename std::conditional< VolatileKey, std::pair< Key, ValueType >, ValueType >::type TableItem
tlx::CountingPtr< MixStream > MixStreamPtr
Definition: mix_stream.hpp:156
size_t num_workers() const
Global number of workers in the system.
Definition: context.hpp:248
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:24
tlx::CountingPtr< CatStream > CatStreamPtr
Definition: cat_stream.hpp:191
std::thread thread_
handle to additional thread for post phase
Context & context_
associated Context
Definition: dia_base.hpp:293
Emitter for PostPhase to push elements to next DIA object.
virtual DIAMemUse PushDataMemUse()
Amount of RAM used by PushData()
Definition: dia_base.hpp:182