13 #ifndef THRILL_CORE_REDUCE_OLD_PROBING_HASH_TABLE_HEADER 14 #define THRILL_CORE_REDUCE_OLD_PROBING_HASH_TABLE_HEADER 71 template <
typename TableItem,
typename Key,
typename Value,
72 typename KeyExtractor,
typename ReduceFunction,
typename Emitter,
73 const bool VolatileKey,
74 typename ReduceConfig_,
75 typename IndexFunction,
76 typename KeyEqualFunction = std::equal_to<Key> >
79 KeyExtractor, ReduceFunction, Emitter,
80 VolatileKey, ReduceConfig_,
81 IndexFunction, KeyEqualFunction>
84 KeyExtractor, ReduceFunction, Emitter,
85 VolatileKey, ReduceConfig_, IndexFunction,
101 bool immediate_flush =
false,
105 key_extractor, reduce_function, emitter,
106 num_partitions, config, immediate_flush,
109 assert(num_partitions > 0);
122 "limit_memory_bytes must be greater than or equal to 0. " 123 "A byte size of zero results in exactly one item per partition");
134 assert(num_buckets_ > 0);
139 double limit_fill_rate =
config_.limit_partition_fill_rate();
141 assert(limit_fill_rate >= 0.0 && limit_fill_rate <= 1.0
142 &&
"limit_partition_fill_rate must be between 0.0 and 1.0. " 143 "with a fill rate of 0.0, items are immediately flushed.");
152 items_.resize(num_buckets_ + 1);
190 sentinel =
reduce(sentinel, kv);
214 *iter =
reduce(*iter, kv);
225 if (iter == begin_iter) {
266 partition_id,
true,
true);
270 <<
" items of partition with id: " << partition_id;
286 items_.begin() + (partition_id + 1) * num_buckets_per_partition_;
288 for ( ; iter != end; ++iter)
302 LOG <<
"Spilled items of partition with id: " << partition_id;
314 size_t size_max = 0, index = 0;
337 template <
typename Emit>
339 size_t partition_id,
bool consume,
bool , Emit emit) {
342 <<
" items of partition: " << partition_id;
355 items_.begin() + (partition_id + 1) * num_buckets_per_partition_;
357 for ( ; iter != end; ++iter)
360 emit(partition_id, *iter);
374 LOG <<
"Done flushed items of partition: " << partition_id;
379 partition_id, consume, grow,
380 [
this](
const size_t& partition_id,
const TableItem& p) {
381 this->
emitter_.Emit(partition_id, p);
424 template <
typename TableItem,
typename Key,
typename Value,
425 typename KeyExtractor,
typename ReduceFunction,
426 typename Emitter,
const bool VolatileKey,
428 typename KeyEqualFunction>
431 TableItem, Key, Value, KeyExtractor, ReduceFunction,
432 Emitter, VolatileKey, ReduceConfig, IndexFunction, KeyEqualFunction>
436 TableItem, Key, Value, KeyExtractor, ReduceFunction,
438 IndexFunction, KeyEqualFunction>;
444 #endif // !THRILL_CORE_REDUCE_OLD_PROBING_HASH_TABLE_HEADER ReduceConfig config_
config of reduce table
A data structure which takes an arbitrary value and extracts a key using a key extractor function from that value.
static constexpr bool debug_items
void Dispose()
Deallocate memory.
ReduceOldProbingHashTable(Context &ctx, size_t dia_id, const KeyExtractor &key_extractor, const ReduceFunction &reduce_function, Emitter &emitter, size_t num_partitions, const ReduceConfig &config=ReduceConfig(), bool immediate_flush=false, const IndexFunction &index_function=IndexFunction(), const KeyEqualFunction &key_equal_function=KeyEqualFunction())
Emitter & emitter_
Emitter object to receive items outputted to next phase.
IndexFunction index_function_
Index Calculation functions: Hash or ByIndex.
size_t num_partitions()
Returns the number of partitions.
Type selection via ReduceTableImpl enum.
const size_t num_partitions_
Number of partitions.
IndexFunction::Result calculate_index(const TableItem &kv) const
const KeyEqualFunction & key_equal_function() const
Returns key_equal_function_.
std::vector< size_t > items_per_partition_
Current number of items per partition.
size_t dia_id() const
Returns dia_id_.
BlockWriter contains a temporary Block object into which a) any serializable item can be stored or b)...
size_t limit_memory_bytes() const
Returns limit_memory_bytes_.
bool memory_exceeded
memory limit exceeded indicator
const IndexFunction & index_function() const
Returns index_function_.
The Context of a job is a unique instance per worker which holds references to all underlying parts of Thrill.
void Dispose()
Deallocate memory.
typename std::vector< TableItem >::iterator TableItemIterator
ReduceConfig_ ReduceConfig
void SpillLargestPartition()
Spill all items of the largest partition into an external memory File.
const Emitter & emitter() const
Returns emitter_.
static constexpr size_t sentinel
a sentinel value prefixed to each allocation
void SpillAnyPartition()
Spill all items of an arbitrary partition into an external memory File.
KeyEqualFunction key_equal_function_
Comparator function for keys.
const KeyExtractor & key_extractor() const
Returns the key_extractor.
ReduceTableImpl
Enum class to select a hash table implementation.
void Initialize(size_t limit_memory_bytes)
std::vector< data::File > partition_files_
Store the files for partitions.
static constexpr bool debug
size_t num_buckets_per_partition_
Partition size, the number of buckets per partition.
Context & ctx() const
Returns the context.
size_t limit_memory_bytes_
Size of the table in bytes.
size_t limit_items_per_partition_
Number of items in a partition before the partition is spilled.
static constexpr size_t invalid_partition_
sentinel for invalid partition or no sentinel.
void vector_free(std::vector< Type > &v)
typename std::conditional< VolatileKey, std::pair< Key, Value >, Value >::type TableItem
std::vector< TableItem > items_
Storing the actual hash table.
TableItem reduce(const TableItem &a, const TableItem &b) const
TLX_ATTRIBUTE_ALWAYS_INLINE BlockWriter & Put(const T &x)
Put appends a complete item, or fails with a FullException.
size_t num_items_
Current number of items.
Key key(const TableItem &t) const
size_t sentinel_partition_
Common super-class for bucket and linear-probing hash/reduce tables.
void FlushPartition(size_t partition_id, bool consume, bool grow)
size_t num_items_calc() const
Returns the total num of items in the table.
const ReduceFunction & reduce_function() const
Returns the reduce_function.
bool Insert(const TableItem &kv)
Inserts a value into the table, potentially reducing it in case both the key of the value already in the table and the key of the value to be inserted are the same.
#define LOG
Default logging method: output if the local debug variable is true.
void SpillPartition(size_t partition_id)
Spill all items of a partition into an external memory File.
void FlushPartitionEmit(size_t partition_id, bool consume, bool, Emit emit)