16 #ifndef THRILL_CORE_REDUCE_FUNCTIONAL_HEADER 17 #define THRILL_CORE_REDUCE_FUNCTIONAL_HEADER 30 template <
typename Key,
typename HashFunction = std::hash<Key> >
43 return remaining_hash % size;
48 const HashFunction& hash_function = HashFunction())
53 const HashFunction& hash_function = HashFunction())
62 const size_t& num_partitions,
64 const size_t& )
const {
83 template <
typename Key>
98 return global_index % num_buckets_per_partition
99 * size / num_buckets_per_partition;
107 : range_(begin, end) { }
116 const size_t& num_buckets_per_partition,
117 const size_t& num_buckets)
const {
122 size_t global_index = (k - range_.begin) * num_buckets / range_.size();
125 global_index / num_buckets_per_partition,
126 global_index, num_buckets_per_partition
132 size_t inverse(
size_t bucket,
const size_t& num_buckets) {
134 return range_.begin +
135 (bucket * range_.size() + num_buckets - 1) / num_buckets;
141 const size_t& num_buckets_per_partition,
const size_t& num_buckets) {
143 inverse(partition_id * num_buckets_per_partition, num_buckets),
144 inverse((partition_id + 1) * num_buckets_per_partition, num_buckets));
156 template <
typename Value,
typename TableItem,
bool VolatileKey>
159 template <
typename Value,
typename TableItem>
163 template <
typename KeyExtractor>
164 static TableItem
Make(
const Value& v, KeyExtractor& ) {
168 template <
typename KeyExtractor>
169 static auto GetKey(
const TableItem& t, KeyExtractor& key_extractor) {
170 return key_extractor(t);
173 template <
typename ReduceFunction>
174 static auto Reduce(
const TableItem& a,
const TableItem& b,
175 ReduceFunction& reduce_function) {
176 return reduce_function(a, b);
179 template <
typename Emitter>
180 static void Put(
const TableItem& p, Emitter& emit) {
185 template <
typename Value,
typename TableItem>
189 template <
typename KeyExtractor>
190 static TableItem
Make(
const Value& v, KeyExtractor& key_extractor) {
191 return TableItem(key_extractor(v), v);
194 template <
typename KeyExtractor>
195 static auto GetKey(
const TableItem& t, KeyExtractor& ) {
199 template <
typename ReduceFunction>
200 static auto Reduce(
const TableItem& a,
const TableItem& b,
201 ReduceFunction& reduce_function) {
202 return TableItem(a.first, reduce_function(a.second, b.second));
205 template <
typename Emitter>
206 static void Put(
const TableItem& p, Emitter& emit) {
215 typename TableItem,
typename Value,
typename Emitter,
bool VolatileKey>
224 void Emit(
const TableItem& p) {
230 void Emit(
const size_t& ,
const TableItem& p) {
242 #endif // !THRILL_CORE_REDUCE_FUNCTIONAL_HEADER static auto Reduce(const TableItem &a, const TableItem &b, ReduceFunction &reduce_function)
ReduceByHash(const uint64_t &salt, const HashFunction &hash_function=HashFunction())
ReduceByHash(const uint64_t &salt, const ReduceByHash &other)
size_t local_index(size_t size) const
Emitter emit_
Set of emitters, one per partition.
static auto Reduce(const TableItem &a, const TableItem &b, ReduceFunction &reduce_function)
represents a 1 dimensional range (interval) [begin,end)
ReducePostPhaseEmitter(const Emitter &emit)
static auto GetKey(const TableItem &t, KeyExtractor &key_extractor)
static auto GetKey(const TableItem &t, KeyExtractor &)
void Emit(const TableItem &p)
A reduce index function which returns a hash index and partition.
size_t partition_id
which partition number the item belongs to.
HashFunction hash_function_
size_t global_index
index of the item among all local partition
Result operator()(const Key &k, const size_t &num_partitions, const size_t &, const size_t &) const
size_t inverse(size_t bucket, const size_t &num_buckets)
static void Put(const TableItem &p, Emitter &emit)
ReduceByIndex(size_t begin=0, size_t end=0)
static TableItem Make(const Value &v, KeyExtractor &key_extractor)
A reduce index function, which determines a bucket depending on the current index range [begin...
size_t partition_id
which partition number the item belongs to.
ReduceByIndex(const common::Range &range)
static uint64_t Hash128to64(const uint64_t upper, const uint64_t lower)
size_t local_index(size_t size) const
void Emit(const size_t &, const TableItem &p)
ReduceByHash(const HashFunction &hash_function=HashFunction())
size_t num_buckets_per_partition
saved parameter
common::Range inverse_range(size_t partition_id, const size_t &num_buckets_per_partition, const size_t &num_buckets)
deliver inverse range mapping of a partition
void set_range(const common::Range &range)
static TableItem Make(const Value &v, KeyExtractor &)
static JsonLine & Put(JsonLine &line, bool const &value)
size_t remaining_hash
remaining hash bits for local index
HashCrc32< T > hash
Select a hashing method.
static void Put(const TableItem &p, Emitter &emit)
const common::Range & range() const