Thrill  0.1
reduce_table.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/core/reduce_table.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Matthias Stumpp <[email protected]>
7  * Copyright (C) 2016 Timo Bingmann <[email protected]>
8  *
9  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10  ******************************************************************************/
11 
12 #pragma once
13 #ifndef THRILL_CORE_REDUCE_TABLE_HEADER
14 #define THRILL_CORE_REDUCE_TABLE_HEADER
15 
16 #include <thrill/api/context.hpp>
18 
19 #include <tlx/vector_free.hpp>
20 
21 #include <algorithm>
22 #include <functional>
23 #include <limits>
24 #include <utility>
25 #include <vector>
26 
27 namespace thrill {
28 namespace core {
29 
30 //! Enum class to select a hash table implementation.
31 enum class ReduceTableImpl {
33 };
34 
35 /*!
36  * Configuration class to define operational parameters of reduce hash tables
37  * and reduce phases. Most members can be defined static constexpr or be mutable
38  * variables. Not all members need to be used by all implementations.
39  */
41 {
42 public:
43  //! limit on the fill rate of a reduce table partition prior to triggering a
44  //! flush.
45  double limit_partition_fill_rate_ = 0.5;
46 
47  //! only for BucketHashTable: ratio of number of buckets in a partition
48  //! relative to the maximum possible number.
49  double bucket_rate_ = 0.6;
50 
51  //! select the hash table in the reduce phase by enum
52  static constexpr ReduceTableImpl table_impl_ = ReduceTableImpl::PROBING;
53 
54  //! only for growing ProbingHashTable: items initially in a partition.
55  static constexpr size_t initial_items_per_partition_ = 512;
56 
57  //! only for BucketHashTable: size of a block in the bucket chain in bytes
58  //! (must be a static constexpr)
59  static constexpr size_t bucket_block_size_ = 512;
60 
61  //! use MixStream instead of CatStream in ReduceNodes: this makes the order
62  //! of items delivered in the ReduceFunction arbitrary.
63  static constexpr bool use_mix_stream_ = true;
64 
65  //! use an additional thread in ReduceNode and ReduceToIndexNode to process
66  //! the pre and post phases simultaneously.
67  static constexpr bool use_post_thread_ = true;
68 
69  //! \name Accessors
70  //! \{
71 
72  //! Returns limit_partition_fill_rate_
74  { return limit_partition_fill_rate_; }
75 
76  //! Returns bucket_rate_
77  double bucket_rate() const { return bucket_rate_; }
78 
79  //! \}
80 };
81 
82 /*!
83  * DefaultReduceConfig with implementation type selection
84  */
85 template <ReduceTableImpl table_impl>
87 {
88 public:
89  //! select the hash table in the reduce phase by enum
90  static constexpr ReduceTableImpl table_impl_ = table_impl;
91 };
92 
93 /*!
94  * Common super-class for bucket and linear-probing hash/reduce tables. It
95  * contains partitioning parameters, statistics, and the output files.
96  */
97 template <typename ValueType, typename Key, typename Value,
98  typename KeyExtractor, typename ReduceFunction, typename Emitter,
99  const bool VolatileKey,
100  typename ReduceConfig_, typename IndexFunction,
101  typename KeyEqualFunction = std::equal_to<Key> >
103 {
104 public:
105  static constexpr bool debug = false;
106 
107  using ReduceConfig = ReduceConfig_;
108  using TableItem =
109  typename std::conditional<
110  VolatileKey, std::pair<Key, Value>, Value>::type;
112 
114  Context& ctx, size_t dia_id,
115  const KeyExtractor& key_extractor,
116  const ReduceFunction& reduce_function,
117  Emitter& emitter,
118  size_t num_partitions,
119  const ReduceConfig& config,
120  bool immediate_flush,
121  const IndexFunction& index_function,
122  const KeyEqualFunction& key_equal_function)
123  : ctx_(ctx), dia_id_(dia_id),
124  key_extractor_(key_extractor),
125  reduce_function_(reduce_function),
126  emitter_(emitter),
127  index_function_(index_function),
128  key_equal_function_(key_equal_function),
129  num_partitions_(num_partitions),
130  config_(config),
131  immediate_flush_(immediate_flush),
132  items_per_partition_(num_partitions_, 0) {
133 
134  assert(num_partitions > 0);
135 
136  // allocate Files for each partition to spill into. TODO(tb): switch to
137  // FilePtr ondemand
138 
139  if (!immediate_flush_) {
140  for (size_t i = 0; i < num_partitions_; i++) {
141  partition_files_.push_back(ctx.GetFile(dia_id_));
142  }
143  }
144  }
145 
146  //! non-copyable: delete copy-constructor
147  ReduceTable(const ReduceTable&) = delete;
148  //! non-copyable: delete assignment operator
149  ReduceTable& operator = (const ReduceTable&) = delete;
150 
151  //! Deallocate memory
152  void Dispose() {
153  tlx::vector_free(partition_files_);
154  tlx::vector_free(items_per_partition_);
155  }
156 
157  //! Initialize table for SkipPreReducePhase
158  void InitializeSkip() {
159  // num_partitions_ == number of workers
160  num_buckets_per_partition_ = 1;
161  num_buckets_ = num_buckets_per_partition_ * num_partitions_;
162 
163  assert(num_buckets_per_partition_ > 0);
164  assert(num_buckets_ > 0);
165  }
166 
167  //! \name Accessors
168  //! \{
169 
170  //! Returns the context
171  Context& ctx() const { return ctx_; }
172 
173  //! Returns dia_id_
174  size_t dia_id() const { return dia_id_; }
175 
176  //! Returns the key_extractor
177  const KeyExtractor& key_extractor() const { return key_extractor_; }
178 
179  //! Returns the reduce_function
180  const ReduceFunction& reduce_function() const { return reduce_function_; }
181 
182  //! Returns emitter_
183  const Emitter& emitter() const { return emitter_; }
184 
185  //! Returns index_function_
186  const IndexFunction& index_function() const { return index_function_; }
187 
188  //! Returns index_function_ (mutable)
189  IndexFunction& index_function() { return index_function_; }
190 
191  //! Returns key_equal_function_
192  const KeyEqualFunction& key_equal_function() const
193  { return key_equal_function_; }
194 
195  //! Returns the vector of partition files.
196  std::vector<data::File>& partition_files() { return partition_files_; }
197 
198  //! Returns the number of partitions
199  size_t num_partitions() { return num_partitions_; }
200 
201  //! Returns num_buckets_
202  size_t num_buckets() const { return num_buckets_; }
203 
204  //! Returns num_buckets_per_partition_
206  { return num_buckets_per_partition_; }
207 
208  //! Returns limit_memory_bytes_
209  size_t limit_memory_bytes() const { return limit_memory_bytes_; }
210 
211  //! Returns limit_items_per_partition_
213  { return limit_items_per_partition_; }
214 
215  //! Returns items_per_partition_
216  size_t items_per_partition(size_t id) const {
217  assert(id < items_per_partition_.size());
218  return items_per_partition_[id];
219  }
220 
221  //! Returns the total num of items in the table.
222  size_t num_items() const {
223  return num_items_;
224  }
225 
226  //! Returns the total num of items in the table.
227  size_t num_items_calc() const {
228  size_t total_num_items = 0;
229  for (size_t num_items : items_per_partition_) {
230  total_num_items += num_items;
231  }
232 
233  return total_num_items;
234  }
235 
236  //! calculate key range for the given output partition
237  common::Range key_range(size_t partition_id) {
238  return index_function().inverse_range(
239  partition_id, num_buckets_per_partition_, num_buckets_);
240  }
241 
242  //! returns whether and partition has spilled data into external memory.
243  bool has_spilled_data() const {
244  for (const data::File& file : partition_files_) {
245  if (file.num_items()) return true;
246  }
247  return false;
248  }
249 
250  bool has_spilled_data_on_partition(size_t partition_id) {
251  return partition_files_[partition_id].num_items() != 0;
252  }
253 
254  //! \}
255 
256  //! \name Switches for VolatileKey
257  //! \{
258 
259  Key key(const TableItem& t) const {
260  return MakeTableItem::GetKey(t, key_extractor_);
261  }
262 
263  TableItem reduce(const TableItem& a, const TableItem& b) const {
264  return MakeTableItem::Reduce(a, b, reduce_function_);
265  }
266 
267  typename IndexFunction::Result calculate_index(const TableItem& kv) const {
268  return index_function_(
269  key(kv), num_partitions_, num_buckets_per_partition_, num_buckets_);
270  }
271 
272  //! \}
273 
274 protected:
275  //! Context
277 
278  //! Associated DIA id
279  size_t dia_id_;
280 
281  //! Key extractor function for extracting a key from a value.
282  KeyExtractor key_extractor_;
283 
284  //! Reduce function for reducing two values.
285  ReduceFunction reduce_function_;
286 
287  //! Emitter object to receive items outputted to next phase.
288  Emitter& emitter_;
289 
290  //! Index Calculation functions: Hash or ByIndex.
291  IndexFunction index_function_;
292 
293  //! Comparator function for keys.
294  KeyEqualFunction key_equal_function_;
295 
296  //! Store the files for partitions.
297  std::vector<data::File> partition_files_;
298 
299  //! \name Fixed Operational Parameters
300  //! \{
301 
302  //! Number of partitions
303  const size_t num_partitions_;
304 
305  //! config of reduce table
307 
308  //! Size of the table, which is the number of slots / buckets / entries
309  //! available for items or chains of items.
310  size_t num_buckets_;
311 
312  //! Partition size, the number of buckets per partition.
314 
315  //! Size of the table in bytes
316  size_t limit_memory_bytes_ = 0;
317 
318  //! Number of items in a partition before the partition is spilled.
320 
321  //! Whether to spill overfull partitions to disk or to immediately flush to
322  //! next phase.
324 
325  //! \}
326 
327  //! \name Current Statistical Parameters
328  //! \{
329 
330  //! Current number of items.
331  size_t num_items_ = 0;
332 
333  //! Current number of items per partition.
334  std::vector<size_t> items_per_partition_;
335 
336  //! \}
337 };
338 
339 //! Type selection via ReduceTableImpl enum
340 template <ReduceTableImpl ImplSelect,
341  typename ValueType, typename Key, typename Value,
342  typename KeyExtractor, typename ReduceFunction,
343  typename Emitter,
344  const bool VolatileKey = false,
346  typename IndexFunction = ReduceByHash<Key>,
347  typename KeyEqualFunction = std::equal_to<Key> >
349 
350 } // namespace core
351 } // namespace thrill
352 
353 #endif // !THRILL_CORE_REDUCE_TABLE_HEADER
354 
355 /******************************************************************************/
ReduceConfig config_
config of reduce table
ReduceTable(Context &ctx, size_t dia_id, const KeyExtractor &key_extractor, const ReduceFunction &reduce_function, Emitter &emitter, size_t num_partitions, const ReduceConfig &config, bool immediate_flush, const IndexFunction &index_function, const KeyEqualFunction &key_equal_function)
size_t dia_id_
Associated DIA id.
double bucket_rate() const
Returns bucket_rate_.
bool has_spilled_data() const
returns whether any partition has spilled data into external memory.
void Dispose()
Deallocate memory.
Emitter & emitter_
Emitter object to receive items outputted to next phase.
void InitializeSkip()
Initialize table for SkipPreReducePhase.
IndexFunction index_function_
Index Calculation functions: Hash or ByIndex.
size_t num_partitions()
Returns the number of partitions.
Type selection via ReduceTableImpl enum.
A File is an ordered sequence of Block objects for storing items.
Definition: file.hpp:56
const size_t num_partitions_
Number of partitions.
IndexFunction::Result calculate_index(const TableItem &kv) const
const KeyEqualFunction & key_equal_function() const
Returns key_equal_function_.
represents a 1 dimensional range (interval) [begin,end)
Definition: math.hpp:41
std::vector< size_t > items_per_partition_
Current number of items per partition.
size_t dia_id() const
Returns dia_id_.
size_t limit_memory_bytes() const
Returns limit_memory_bytes_.
const IndexFunction & index_function() const
Returns index_function_.
The Context of a job is a unique instance per worker which holds references to all underlying parts o...
Definition: context.hpp:221
const Emitter & emitter() const
Returns emitter_.
common::Range key_range(size_t partition_id)
calculate key range for the given output partition
DefaultReduceConfig with implementation type selection.
IndexFunction & index_function()
Returns index_function_ (mutable)
A reduce index function which returns a hash index and partition.
double limit_partition_fill_rate() const
Returns limit_partition_fill_rate_.
KeyEqualFunction key_equal_function_
Comparator function for keys.
std::vector< data::File > & partition_files()
Returns the vector of partition files.
const KeyExtractor & key_extractor() const
Returns the key_extractor.
ReduceTableImpl
Enum class to select a hash table implementation.
std::vector< data::File > partition_files_
Store the files for partitions.
Configuration class to define operational parameters of reduce hash tables and reduce phases...
ReduceFunction reduce_function_
Reduce function for reducing two values.
size_t limit_items_per_partition() const
Returns limit_items_per_partition_.
size_t num_buckets_per_partition_
Partition size, the number of buckets per partition.
Context & ctx() const
Returns the context.
static constexpr bool debug
Context & ctx_
Context.
size_t limit_items_per_partition_
Number of items in a partition before the partition is spilled.
data::File GetFile(size_t dia_id)
Returns a new File object containing a sequence of local Blocks.
Definition: context.hpp:283
size_t num_buckets() const
Returns num_buckets_.
void vector_free(std::vector< Type > &v)
Definition: vector_free.hpp:21
bool has_spilled_data_on_partition(size_t partition_id)
size_t num_items() const
Returns the total num of items in the table.
TableItem reduce(const TableItem &a, const TableItem &b) const
Key key(const TableItem &t) const
Common super-class for bucket and linear-probing hash/reduce tables.
size_t num_items_calc() const
Returns the total num of items in the table.
const ReduceFunction & reduce_function() const
Returns the reduce_function.
size_t num_buckets_per_partition() const
Returns num_buckets_per_partition_.
KeyExtractor key_extractor_
Key extractor function for extracting a key from a value.
size_t items_per_partition(size_t id) const
Returns items_per_partition_.