Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
reduce_table.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/core/reduce_table.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Matthias Stumpp <[email protected]>
7  * Copyright (C) 2016 Timo Bingmann <[email protected]>
8  *
9  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10  ******************************************************************************/
11 
12 #pragma once
13 #ifndef THRILL_CORE_REDUCE_TABLE_HEADER
14 #define THRILL_CORE_REDUCE_TABLE_HEADER
15 
16 #include <thrill/api/context.hpp>
18 
19 #include <algorithm>
20 #include <functional>
21 #include <limits>
22 #include <utility>
23 #include <vector>
24 
25 namespace thrill {
26 namespace core {
27 
28 //! Enum class to select a hash table implementation.
29 enum class ReduceTableImpl {
31 };
32 
33 /*!
34  * Configuration class to define operational parameters of reduce hash tables
35  * and reduce phases. Most members can be defined static constexpr or be mutable
36  * variables. Not all members need to be used by all implementations.
37  */
39 {
40 public:
41  //! limit on the fill rate of a reduce table partition prior to triggering a
42  //! flush.
44 
45  //! only for BucketHashTable: ratio of number of buckets in a partition
46  //! relative to the maximum possible number.
47  double bucket_rate_ = 0.6;
48 
49  //! select the hash table in the reduce phase by enum
51 
52  //! only for growing ProbingHashTable: items initially in a partition.
53  static constexpr size_t initial_items_per_partition_ = 512;
54 
55  //! only for BucketHashTable: size of a block in the bucket chain in bytes
56  //! (must be a static constexpr)
57  static constexpr size_t bucket_block_size_ = 512;
58 
59  //! use MixStream instead of CatStream in ReduceNodes: this makes the order
60  //! of items delivered in the ReduceFunction arbitrary.
61  static constexpr bool use_mix_stream_ = true;
62 
63  //! use an additional thread in ReduceNode and ReduceToIndexNode to process
64  //! the pre and post phases simultaneously.
65  static constexpr bool use_post_thread_ = true;
66 
67  //! \name Accessors
68  //! \{
69 
70  //! Returns limit_partition_fill_rate_
72  { return limit_partition_fill_rate_; }
73 
74  //! Returns bucket_rate_
75  double bucket_rate() const { return bucket_rate_; }
76 
77  //! \}
78 };
79 
80 /*!
81  * DefaultReduceConfig with implementation type selection
82  */
83 template <ReduceTableImpl table_impl>
85 {
86 public:
87  //! select the hash table in the reduce phase by enum
88  static constexpr ReduceTableImpl table_impl_ = table_impl;
89 };
90 
91 /*!
92  * Common super-class for bucket and linear-probing hash/reduce tables. It
93  * contains partitioning parameters, statistics, and the output files.
94  */
95 template <typename ValueType, typename Key, typename Value,
96  typename KeyExtractor, typename ReduceFunction, typename Emitter,
97  const bool VolatileKey,
98  typename ReduceConfig_, typename IndexFunction,
99  typename KeyEqualFunction = std::equal_to<Key> >
101 {
102 public:
103  static constexpr bool debug = false;
104 
105  using ReduceConfig = ReduceConfig_;
106  using TableItem =
107  typename std::conditional<
108  VolatileKey, std::pair<Key, Value>, Value>::type;
110 
112  Context& ctx, size_t dia_id,
113  const KeyExtractor& key_extractor,
114  const ReduceFunction& reduce_function,
115  Emitter& emitter,
116  size_t num_partitions,
117  const ReduceConfig& config,
118  bool immediate_flush,
119  const IndexFunction& index_function,
120  const KeyEqualFunction& key_equal_function)
121  : ctx_(ctx), dia_id_(dia_id),
122  key_extractor_(key_extractor),
123  reduce_function_(reduce_function),
124  emitter_(emitter),
125  index_function_(index_function),
126  key_equal_function_(key_equal_function),
127  num_partitions_(num_partitions),
128  config_(config),
129  immediate_flush_(immediate_flush),
131 
132  assert(num_partitions > 0);
133 
134  // allocate Files for each partition to spill into. TODO(tb): switch to
135  // FilePtr ondemand
136 
137  if (!immediate_flush_) {
138  for (size_t i = 0; i < num_partitions_; i++) {
139  partition_files_.push_back(ctx.GetFile(dia_id_));
140  }
141  }
142  }
143 
144  //! non-copyable: delete copy-constructor
145  ReduceTable(const ReduceTable&) = delete;
146  //! non-copyable: delete assignment operator
147  ReduceTable& operator = (const ReduceTable&) = delete;
148 
149  //! Deallocate memory: swaps partition_files_ and items_per_partition_ with
     //! empty temporary vectors, so both members end up empty and their previous
     //! contents are destroyed together with the temporaries.
150  void Dispose() {
151  std::vector<data::File>().swap(partition_files_);
152  std::vector<size_t>().swap(items_per_partition_);
153  }
154 
155  //! Initialize table for SkipPreReducePhase
156  void InitializeSkip() {
157  // num_partitions_ == number of workers
160 
161  assert(num_buckets_per_partition_ > 0);
162  assert(num_buckets_ > 0);
163  }
164 
165  //! \name Accessors
166  //! \{
167 
168  //! Returns the Context reference (ctx_) this table was constructed with.
     //! The accessor is const, but the returned reference is non-const.
169  Context& ctx() const { return ctx_; }
170 
171  //! Returns dia_id_, the DIA id handed to the constructor.
172  size_t dia_id() const { return dia_id_; }
173 
174  //! Returns a read-only reference to key_extractor_, the table's own copy of
     //! the key extractor passed to the constructor.
175  const KeyExtractor& key_extractor() const { return key_extractor_; }
176 
177  //! Returns a read-only reference to reduce_function_, the table's own copy
     //! of the reduce function passed to the constructor.
178  const ReduceFunction& reduce_function() const { return reduce_function_; }
179 
180  //! Returns emitter_ as a read-only reference. emitter_ is itself a reference
     //! to the Emitter passed to the constructor, not a copy.
181  const Emitter& emitter() const { return emitter_; }
182 
183  //! Returns index_function_ (read-only overload, selected on const tables).
184  const IndexFunction& index_function() const { return index_function_; }
185 
186  //! Returns index_function_ (mutable overload, selected on non-const tables);
     //! key_range() calls inverse_range() through this overload.
187  IndexFunction& index_function() { return index_function_; }
188 
189  //! Returns a read-only reference to key_equal_function_, the table's own
     //! copy of the key comparator passed to the constructor.
190  const KeyEqualFunction& key_equal_function() const
191  { return key_equal_function_; }
192 
193  //! Returns the vector of partition files by mutable reference. The
     //! constructor only adds files (one per partition) when immediate_flush is
     //! false, and Dispose() empties the vector, so it may be empty.
194  std::vector<data::File>& partition_files() { return partition_files_; }
195 
196  //! Returns the number of partitions (num_partitions_, a const member set by
     //! the constructor). Declared const like the other read-only accessors so
     //! that it can also be called on a const table.
197  size_t num_partitions() const { return num_partitions_; }
198 
199  //! Returns num_buckets_, the table size counted in slots / buckets / entries.
200  size_t num_buckets() const { return num_buckets_; }
201 
202  //! Returns num_buckets_per_partition_
204  { return num_buckets_per_partition_; }
205 
206  //! Returns limit_memory_bytes_, the size of the table in bytes.
207  size_t limit_memory_bytes() const { return limit_memory_bytes_; }
208 
209  //! Returns limit_items_per_partition_
211  { return limit_items_per_partition_; }
212 
213  //! Returns the item count recorded for partition \p id in
     //! items_per_partition_. The range check on \p id is an assert only, so it
     //! is not performed when assertions are compiled out.
214  size_t items_per_partition(size_t id) const {
215  assert(id < items_per_partition_.size());
216  return items_per_partition_[id];
217  }
218 
219  //! Returns the total number of items in the table as stored in the
     //! num_items_ counter; nothing is recomputed here (num_items_calc() sums
     //! the per-partition counts instead).
220  size_t num_items() const {
221  return num_items_;
222  }
223 
224  //! Recomputes the total number of items in the table by summing all
     //! entries of items_per_partition_ (num_items() returns the counter).
225  size_t num_items_calc() const {
226  size_t sum = 0;
227  for (size_t p = 0; p != items_per_partition_.size(); ++p)
228  sum += items_per_partition_[p];
229  return sum;
230  }
233 
234  //! calculate key range for the given output partition
235  common::Range key_range(size_t partition_id) {
236  return index_function().inverse_range(
238  }
239 
240  //! Returns whether any partition has spilled data into external memory,
     //! i.e. whether at least one file in partition_files_ holds items.
241  bool has_spilled_data() const {
242  return std::any_of(
243  partition_files_.begin(), partition_files_.end(),
244  [](const data::File& f) { return f.num_items() != 0; });
245  }
247 
248  bool has_spilled_data_on_partition(size_t partition_id) const {
     //! Returns whether the file of partition \p partition_id holds items.
     //! partition_files_ is empty when the table was constructed with
     //! immediate_flush (and after Dispose()); in that case, or for any other
     //! out-of-range id, report "nothing spilled" rather than indexing past
     //! the end of the vector. Const so it is usable on a const table, like
     //! has_spilled_data().
249  return partition_id < partition_files_.size() && partition_files_[partition_id].num_items() != 0;
250  }
251 
252  //! \}
253 
254  //! \name Switches for VolatileKey
255  //! \{
256 
257  Key key(const TableItem& t) const {
     // Obtains the key of table item t by delegating to
     // MakeTableItem::GetKey, passing key_extractor_ along. MakeTableItem is
     // declared outside the code visible here.
258  return MakeTableItem::GetKey(t, key_extractor_);
259  }
260 
261  TableItem reduce(const TableItem& a, const TableItem& b) const {
     // Combines table items a and b into one by delegating to
     // MakeTableItem::Reduce, passing reduce_function_ along. MakeTableItem
     // is declared outside the code visible here.
262  return MakeTableItem::Reduce(a, b, reduce_function_);
263  }
264 
265  typename IndexFunction::Result calculate_index(const TableItem& kv) const {
266  return index_function_(
268  }
269 
270  //! \}
271 
272 protected:
273  //! Context
274  Context& ctx_;
275 
276  //! Associated DIA id
277  size_t dia_id_;
278 
279  //! Key extractor function for extracting a key from a value.
280  KeyExtractor key_extractor_;
281 
282  //! Reduce function for reducing two values.
283  ReduceFunction reduce_function_;
284 
285  //! Emitter object to receive items outputted to next phase.
286  Emitter& emitter_;
287 
288  //! Index Calculation functions: Hash or ByIndex.
289  IndexFunction index_function_;
290 
291  //! Comparator function for keys.
292  KeyEqualFunction key_equal_function_;
293 
294  //! Store the files for partitions.
295  std::vector<data::File> partition_files_;
296 
297  //! \name Fixed Operational Parameters
298  //! \{
299 
300  //! Number of partitions
301  const size_t num_partitions_;
302 
303  //! config of reduce table
305 
306  //! Size of the table, which is the number of slots / buckets / entries
307  //! available for items or chains of items.
308  size_t num_buckets_;
309 
310  //! Partition size, the number of buckets per partition.
312 
313  //! Size of the table in bytes
315 
316  //! Number of items in a partition before the partition is spilled.
318 
319  //! Whether to spill overfull partitions to disk or to immediately flush to
320  //! next phase.
322 
323  //! \}
324 
325  //! \name Current Statistical Parameters
326  //! \{
327 
328  //! Current number of items.
329  size_t num_items_ = 0;
330 
331  //! Current number of items per partition.
332  std::vector<size_t> items_per_partition_;
333 
334  //! \}
335 };
336 
337 //! Type selection via ReduceTableImpl enum
338 template <ReduceTableImpl ImplSelect,
339  typename ValueType, typename Key, typename Value,
340  typename KeyExtractor, typename ReduceFunction,
341  typename Emitter,
342  const bool VolatileKey = false,
343  typename ReduceConfig = DefaultReduceConfig,
344  typename IndexFunction = ReduceByHash<Key>,
345  typename KeyEqualFunction = std::equal_to<Key> >
347 
348 } // namespace core
349 } // namespace thrill
350 
351 #endif // !THRILL_CORE_REDUCE_TABLE_HEADER
352 
353 /******************************************************************************/
ReduceConfig config_
config of reduce table
ReduceTable(Context &ctx, size_t dia_id, const KeyExtractor &key_extractor, const ReduceFunction &reduce_function, Emitter &emitter, size_t num_partitions, const ReduceConfig &config, bool immediate_flush, const IndexFunction &index_function, const KeyEqualFunction &key_equal_function)
IndexFunction::Result calculate_index(const TableItem &kv) const
size_t dia_id_
Associated DIA id.
TableItem reduce(const TableItem &a, const TableItem &b) const
bool has_spilled_data() const
returns whether any partition has spilled data into external memory.
size_t num_items_calc() const
Returns the total number of items in the table, recomputed by summing the per-partition counts.
void Dispose()
Deallocate memory.
size_t limit_memory_bytes() const
Returns limit_memory_bytes_.
Emitter & emitter_
Emitter object to receive items outputted to next phase.
const KeyEqualFunction & key_equal_function() const
Returns key_equal_function_.
void InitializeSkip()
Initialize table for SkipPreReducePhase.
IndexFunction index_function_
Index Calculation functions: Hash or ByIndex.
size_t num_partitions()
Returns the number of partitions.
const KeyExtractor & key_extractor() const
Returns the key_extractor.
Type selection via ReduceTableImpl enum.
A File is an ordered sequence of Block objects for storing items.
Definition: file.hpp:56
const size_t num_partitions_
Number of partitions.
static constexpr bool use_post_thread_
static constexpr ReduceTableImpl table_impl_
select the hash table in the reduce phase by enum
represents a 1 dimensional range (interval) [begin,end)
Definition: math.hpp:41
std::vector< size_t > items_per_partition_
Current number of items per partition.
const IndexFunction & index_function() const
Returns index_function_.
ReduceTable & operator=(const ReduceTable &)=delete
non-copyable: delete assignment operator
size_t dia_id() const
Returns dia_id_.
common::Range key_range(size_t partition_id)
calculate key range for the given output partition
DefaultReduceConfig with implementation type selection.
IndexFunction & index_function()
Returns index_function_ (mutable)
A reduce index function which returns a hash index and partition.
size_t num_buckets_per_partition() const
Returns num_buckets_per_partition_.
KeyEqualFunction key_equal_function_
Comparator function for keys.
const ReduceFunction & reduce_function() const
Returns the reduce_function.
std::vector< data::File > & partition_files()
Returns the vector of partition files.
size_t limit_items_per_partition() const
Returns limit_items_per_partition_.
double bucket_rate() const
Returns bucket_rate_.
void swap(CountingPtr< A, D > &a1, CountingPtr< A, D > &a2) noexcept
ReduceTableImpl
Enum class to select a hash table implementation.
static constexpr bool use_mix_stream_
std::vector< data::File > partition_files_
Store the files for partitions.
Context & ctx() const
Returns the context.
static constexpr bool debug
Configuration class to define operational parameters of reduce hash tables and reduce phases...
ReduceFunction reduce_function_
Reduce function for reducing two values.
size_t num_buckets_per_partition_
Partition size, the number of buckets per partition.
size_t limit_memory_bytes_
Size of the table in bytes.
Context & ctx_
Context.
size_t num_items() const
Returns the total num of items in the table.
size_t limit_items_per_partition_
Number of items in a partition before the partition is spilled.
double limit_partition_fill_rate() const
Returns limit_partition_fill_rate_.
static constexpr size_t initial_items_per_partition_
only for growing ProbingHashTable: items initially in a partition.
size_t num_buckets() const
Returns num_buckets_.
bool has_spilled_data_on_partition(size_t partition_id)
static constexpr ReduceTableImpl table_impl_
select the hash table in the reduce phase by enum
static constexpr size_t bucket_block_size_
size_t items_per_partition(size_t id) const
Returns items_per_partition_.
size_t num_items_
Current number of items.
Common super-class for bucket and linear-probing hash/reduce tables.
Key key(const TableItem &t) const
const Emitter & emitter() const
Returns emitter_.
KeyExtractor key_extractor_
Key extractor function for extracting a key from a value.