Thrill  0.1
reduce_functional.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/core/reduce_functional.hpp
3  *
4  * Hash table with support for reduce.
5  *
6  * Part of Project Thrill - http://project-thrill.org
7  *
8  * Copyright (C) 2015 Matthias Stumpp <[email protected]>
9  * Copyright (C) 2015 Alexander Noe <[email protected]>
10  * Copyright (C) 2015 Timo Bingmann <[email protected]>
11  *
12  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
13  ******************************************************************************/
14 
15 #pragma once
16 #ifndef THRILL_CORE_REDUCE_FUNCTIONAL_HEADER
17 #define THRILL_CORE_REDUCE_FUNCTIONAL_HEADER
18 
20 #include <thrill/common/hash.hpp>
21 #include <thrill/common/math.hpp>
22 
23 namespace thrill {
24 namespace core {
25 
26 /*!
27  * A reduce index function which returns a hash index and partition. It is used
28  * by ReduceByKey.
29  */
30 template <typename Key, typename HashFunction = std::hash<Key> >
32 {
33 public:
34  struct Result {
35  //! which partition number the item belongs to.
36  size_t partition_id;
37  //! remaining hash bits for local index
39 
40  //! calculate local index into a partition containing a hash table of
41  //! smaller size
42  size_t local_index(size_t size) const {
43  return remaining_hash % size;
44  }
45  };
46 
47  explicit ReduceByHash(
48  const HashFunction& hash_function = HashFunction())
49  : ReduceByHash(/* salt */ 0, hash_function) { }
50 
51  explicit ReduceByHash(
52  const uint64_t& salt,
53  const HashFunction& hash_function = HashFunction())
54  : salt_(salt), hash_function_(hash_function) { }
55 
57  const uint64_t& salt, const ReduceByHash& other)
58  : salt_(salt), hash_function_(other.hash_function_) { }
59 
61  const Key& k,
62  const size_t& num_partitions,
63  const size_t& /* num_buckets_per_partition */,
64  const size_t& /* num_buckets_per_table */) const {
65 
67 
68  size_t partition_id = hash % num_partitions;
69  size_t remaining_hash = hash / num_partitions;
70 
71  return Result { partition_id, remaining_hash };
72  }
73 
74 private:
75  uint64_t salt_;
76  HashFunction hash_function_;
77 };
78 
79 /*!
80  * A reduce index function, which determines a bucket depending on the current
81  * index range [begin,end). It is used by ReduceToIndex.
82  */
83 template <typename Key>
85 {
86 public:
87  struct Result {
88  //! which partition number the item belongs to.
89  size_t partition_id;
90  //! index of the item among all local partition
91  size_t global_index;
92  //! saved parameter
94 
95  //! calculate local index into a partition containing a hash table of
96  //! smaller size
97  size_t local_index(size_t size) const {
98  return global_index % num_buckets_per_partition
99  * size / num_buckets_per_partition;
100  }
101  };
102 
103  explicit ReduceByIndex(const common::Range& range)
104  : range_(range) { }
105 
106  explicit ReduceByIndex(size_t begin = 0, size_t end = 0)
107  : range_(begin, end) { }
108 
109  const common::Range& range() const { return range_; }
110 
111  void set_range(const common::Range& range) { range_ = range; }
112 
114  const Key& k,
115  const size_t& /* num_partitions */,
116  const size_t& num_buckets_per_partition,
117  const size_t& num_buckets) const {
118 
119  // assert(k >= range_.begin && k < range_.end && "Item out of range.");
120 
121  // round bucket number down
122  size_t global_index = (k - range_.begin) * num_buckets / range_.size();
123 
124  return Result {
125  global_index / num_buckets_per_partition,
126  global_index, num_buckets_per_partition
127  };
128  }
129 
130  //! inverse mapping: takes a bucket index and returns the smallest index
131  //! delivered to the bucket.
132  size_t inverse(size_t bucket, const size_t& num_buckets) {
133  // round inverse key up
134  return range_.begin +
135  (bucket * range_.size() + num_buckets - 1) / num_buckets;
136  }
137 
138  //! deliver inverse range mapping of a partition
140  size_t partition_id,
141  const size_t& num_buckets_per_partition, const size_t& num_buckets) {
142  return common::Range(
143  inverse(partition_id * num_buckets_per_partition, num_buckets),
144  inverse((partition_id + 1) * num_buckets_per_partition, num_buckets));
145  }
146 
147 private:
149 };
150 
151 /******************************************************************************/
152 
//! template specialization switch class to convert a Value either to Value
//! (identity) or to a std::pair<Key, Value> with Key generated from Value using
//! a key extractor for VolatileKey implementations.
template <typename Value, typename TableItem, bool VolatileKey>
class ReduceMakeTableItem;

//! Specialization for non-volatile keys: the table item is the value itself
//! and the key is re-extracted from it whenever it is needed.
template <typename Value, typename TableItem>
class ReduceMakeTableItem<Value, TableItem, /* VolatileKey */ false>
{
public:
    //! wrap a value into a table item: identity conversion.
    template <typename KeyExtractor>
    static TableItem Make(const Value& v, KeyExtractor& /* key_extractor */) {
        return v;
    }

    //! get the key of a table item by applying the key extractor to it.
    template <typename KeyExtractor>
    static auto GetKey(const TableItem& t, KeyExtractor& key_extractor) {
        return key_extractor(t);
    }

    //! combine two table items using the reduce function.
    template <typename ReduceFunction>
    static auto Reduce(const TableItem& a, const TableItem& b,
                       ReduceFunction& reduce_function) {
        return reduce_function(a, b);
    }

    //! pass the table item to the emitter unchanged.
    template <typename Emitter>
    static void Put(const TableItem& p, Emitter& emit) {
        emit(p);
    }
};

//! Specialization for volatile keys: the table item stores the extracted key
//! in .first and the value in .second.
template <typename Value, typename TableItem>
class ReduceMakeTableItem<Value, TableItem, /* VolatileKey */ true>
{
public:
    //! wrap a value into a table item constructed from (extracted key, value).
    template <typename KeyExtractor>
    static TableItem Make(const Value& v, KeyExtractor& key_extractor) {
        return TableItem(key_extractor(v), v);
    }

    //! get the stored key of a table item; the key extractor is not used.
    template <typename KeyExtractor>
    static auto GetKey(const TableItem& t, KeyExtractor& /* key_extractor */) {
        return t.first;
    }

    //! combine the values of two table items, keeping the key of the first.
    template <typename ReduceFunction>
    static auto Reduce(const TableItem& a, const TableItem& b,
                       ReduceFunction& reduce_function) {
        return TableItem(a.first, reduce_function(a.second, b.second));
    }

    //! pass only the value part of the table item to the emitter.
    template <typename Emitter>
    static void Put(const TableItem& p, Emitter& emit) {
        emit(p.second);
    }
};
210 
211 //! Emitter implementation to plug into a reduce hash table for
212 //! collecting/flushing items while reducing. Items flushed in the post-phase
213 //! are passed to the next DIA node for processing.
214 template <
215  typename TableItem, typename Value, typename Emitter, bool VolatileKey>
217 {
218 public:
219  explicit ReducePostPhaseEmitter(const Emitter& emit)
220  : emit_(emit) { }
221 
222  //! output an element into a partition, template specialized for VolatileKey
223  //! and non-VolatileKey types
224  void Emit(const TableItem& p) {
226  }
227 
228  //! output an element into a partition, template specialized for VolatileKey
229  //! and non-VolatileKey types
230  void Emit(const size_t& /* partition_id */, const TableItem& p) {
231  Emit(p);
232  }
233 
234 public:
235  //! Set of emitters, one per partition.
236  Emitter emit_;
237 };
238 
239 } // namespace core
240 } // namespace thrill
241 
242 #endif // !THRILL_CORE_REDUCE_FUNCTIONAL_HEADER
243 
244 /******************************************************************************/
static auto Reduce(const TableItem &a, const TableItem &b, ReduceFunction &reduce_function)
ReduceByHash(const uint64_t &salt, const HashFunction &hash_function=HashFunction())
ReduceByHash(const uint64_t &salt, const ReduceByHash &other)
size_t local_index(size_t size) const
Emitter emit_
Set of emitters, one per partition.
static auto Reduce(const TableItem &a, const TableItem &b, ReduceFunction &reduce_function)
represents a 1 dimensional range (interval) [begin,end)
Definition: math.hpp:41
static auto GetKey(const TableItem &t, KeyExtractor &key_extractor)
static auto GetKey(const TableItem &t, KeyExtractor &)
A reduce index function which returns a hash index and partition.
size_t partition_id
which partition number the item belongs to.
size_t global_index
index of the item among all local partitions
Result operator()(const Key &k, const size_t &num_partitions, const size_t &, const size_t &) const
size_t inverse(size_t bucket, const size_t &num_buckets)
static void Put(const TableItem &p, Emitter &emit)
ReduceByIndex(size_t begin=0, size_t end=0)
static TableItem Make(const Value &v, KeyExtractor &key_extractor)
A reduce index function, which determines a bucket depending on the current index range [begin,end).
size_t partition_id
which partition number the item belongs to.
ReduceByIndex(const common::Range &range)
static uint64_t Hash128to64(const uint64_t upper, const uint64_t lower)
Definition: hash.hpp:64
size_t local_index(size_t size) const
void Emit(const size_t &, const TableItem &p)
ReduceByHash(const HashFunction &hash_function=HashFunction())
size_t num_buckets_per_partition
saved parameter
common::Range inverse_range(size_t partition_id, const size_t &num_buckets_per_partition, const size_t &num_buckets)
deliver inverse range mapping of a partition
void set_range(const common::Range &range)
static TableItem Make(const Value &v, KeyExtractor &)
static JsonLine & Put(JsonLine &line, bool const &value)
size_t remaining_hash
remaining hash bits for local index
HashCrc32< T > hash
Select a hashing method.
Definition: hash.hpp:262
static void Put(const TableItem &p, Emitter &emit)
const common::Range & range() const