14 #ifndef THRILL_CORE_REDUCE_PROBING_HASH_TABLE_HEADER 15 #define THRILL_CORE_REDUCE_PROBING_HASH_TABLE_HEADER 71 template <
typename TableItem,
typename Key,
typename Value,
72 typename KeyExtractor,
typename ReduceFunction,
typename Emitter,
73 const bool VolatileKey,
74 typename ReduceConfig_,
75 typename IndexFunction,
76 typename KeyEqualFunction = std::equal_to<Key> >
79 KeyExtractor, ReduceFunction, Emitter,
80 VolatileKey, ReduceConfig_,
81 IndexFunction, KeyEqualFunction>
84 KeyExtractor, ReduceFunction, Emitter,
85 VolatileKey, ReduceConfig_, IndexFunction,
100 bool immediate_flush =
false,
104 key_extractor, reduce_function, emitter,
105 num_partitions, config, immediate_flush,
107 { assert(num_partitions > 0); }
120 "limit_memory_bytes must be greater than or equal to 0. " 121 "A byte size of zero results in exactly one item per partition");
132 assert(num_buckets_ > 0);
142 double limit_fill_rate =
config_.limit_partition_fill_rate();
144 assert(limit_fill_rate >= 0.0 && limit_fill_rate <= 1.0
145 &&
"limit_partition_fill_rate must be between 0.0 and 1.0. " 146 "with a fill rate of 0.0, items are immediately flushed.");
158 items_ =
static_cast<TableItem*
>(
159 operator new ((num_buckets_ + 1) *
sizeof(TableItem)));
165 for ( ; iter != pend; ++iter)
205 sentinel =
reduce(sentinel, kv);
226 TableItem* begin_iter = pbegin + local_index;
227 TableItem* iter = begin_iter;
233 *iter =
reduce(*iter, kv);
260 LOG <<
"Grow due to " 263 <<
" among " << partition_size_[h.partition_id];
280 for ( ; iter != pend; ++iter)
313 TableItem* iter = pbegin;
314 TableItem* pend = pbegin + old_size;
316 bool passed_first_half =
false;
317 bool found_hole =
false;
318 while (!passed_first_half || !found_hole) {
319 Key item_key =
key(*iter);
324 TableItem item = std::move(*iter);
330 found_hole = passed_first_half && is_empty;
331 passed_first_half = passed_first_half || iter == pend;
349 sLOG <<
"Growing partition" << partition_id
351 <<
"limit_items" << new_size *
config_.limit_partition_fill_rate();
358 TableItem* pend = pbegin + new_size;
360 for ( ; iter != pend; ++iter)
363 partition_size_[partition_id] = new_size;
365 = new_size *
config_.limit_partition_fill_rate();
380 <<
" items of partition with id: " << partition_id;
396 for ( ; iter != pend; ++iter) {
408 LOG <<
"Spilled items of partition with id: " << partition_id;
420 size_t size_max = 0, index = 0;
443 template <
typename Emit>
445 size_t partition_id,
bool consume,
bool grow, Emit emit) {
448 <<
" items of partition: " << partition_id;
461 for ( ; iter != pend; ++iter)
464 emit(partition_id, *iter);
478 LOG <<
"Done flushed items of partition: " << partition_id;
486 partition_id, consume, grow,
487 [
this](
const size_t& partition_id,
const TableItem& p) {
488 this->
emitter_.Emit(partition_id, p);
537 template <
typename TableItem,
typename Key,
typename Value,
538 typename KeyExtractor,
typename ReduceFunction,
539 typename Emitter,
const bool VolatileKey,
541 typename KeyEqualFunction>
544 TableItem, Key, Value, KeyExtractor, ReduceFunction,
545 Emitter, VolatileKey, ReduceConfig, IndexFunction, KeyEqualFunction>
549 TableItem, Key, Value, KeyExtractor, ReduceFunction,
551 IndexFunction, KeyEqualFunction>;
557 #endif // !THRILL_CORE_REDUCE_PROBING_HASH_TABLE_HEADER ReduceConfig config_
config of reduce table
~ReduceProbingHashTable()
#define sLOG
Default logging method: output if the local debug variable is true.
void Dispose()
Deallocate memory.
ReduceProbingHashTable(Context &ctx, size_t dia_id, const KeyExtractor &key_extractor, const ReduceFunction &reduce_function, Emitter &emitter, size_t num_partitions, const ReduceConfig &config=ReduceConfig(), bool immediate_flush=false, const IndexFunction &index_function=IndexFunction(), const KeyEqualFunction &key_equal_function=KeyEqualFunction())
Emitter & emitter_
Emitter object to receive items outputted to next phase.
size_t sentinel_partition_
IndexFunction index_function_
Index Calculation functions: Hash or ByIndex.
size_t num_partitions()
Returns the number of partitions.
std::vector< size_t > limit_items_per_partition_
Type selection via ReduceTableImpl enum.
const size_t num_partitions_
Number of partitions.
IndexFunction::Result calculate_index(const TableItem &kv) const
void SpillAnyPartition()
Spill all items of an arbitrary partition into an external memory File.
const KeyEqualFunction & key_equal_function() const
Returns key_equal_function_.
void FlushPartitionEmit(size_t partition_id, bool consume, bool grow, Emit emit)
ReduceConfig_ ReduceConfig
std::vector< size_t > items_per_partition_
Current number of items per partition.
size_t dia_id() const
Returns dia_id_.
A data structure which takes an arbitrary value and extracts a key using a key extractor function fro...
BlockWriter contains a temporary Block object into which a) any serializable item can be stored or b)...
size_t limit_memory_bytes() const
Returns limit_memory_bytes_.
bool memory_exceeded
memory limit exceeded indicator
const IndexFunction & index_function() const
Returns index_function_.
static constexpr size_t invalid_partition_
sentinel for invalid partition or no sentinel.
The Context of a job is a unique instance per worker which holds references to all underlying parts o...
const Emitter & emitter() const
Returns emitter_.
static constexpr size_t sentinel
a sentinel value prefixed to each allocation
static constexpr bool debug_items
void SpillPartition(size_t partition_id)
Spill all items of a partition into an external memory File.
KeyEqualFunction key_equal_function_
Comparator function for keys.
const KeyExtractor & key_extractor() const
Returns the key_extractor.
ReduceTableImpl
Enum class to select a hash table implementation.
bool Insert(const TableItem &kv)
Inserts a value into the table, potentially reducing it in case both the key of the value already in ...
std::vector< data::File > partition_files_
Store the files for partitions.
void GrowAndRehash(size_t partition_id)
static constexpr bool debug
size_t num_buckets_per_partition_
Partition size, the number of buckets per partition.
Context & ctx() const
Returns the context.
size_t limit_memory_bytes_
Size of the table in bytes.
TableItem * items_
Storing the actual hash table.
std::vector< size_t > partition_size_
Current sizes of the partitions because the valid allocated areas grow.
typename std::conditional< VolatileKey, std::pair< Key, Value >, Value >::type TableItem
static uint_pair min()
return an uint_pair instance containing the smallest value possible
void Initialize(size_t limit_memory_bytes)
TableItem reduce(const TableItem &a, const TableItem &b) const
TLX_ATTRIBUTE_ALWAYS_INLINE BlockWriter & Put(const T &x)
Put appends a complete item, or fails with a FullException.
void SpillLargestPartition()
Spill all items of the largest partition into an external memory File.
size_t num_items_
Current number of items.
void GrowPartition(size_t partition_id)
Grow a partition after a spill or flush (if possible)
Key key(const TableItem &t) const
void Dispose()
Deallocate items and memory.
Common super-class for bucket and linear-probing hash/reduce tables.
size_t num_items_calc() const
Returns the total num of items in the table.
const ReduceFunction & reduce_function() const
Returns the reduce_function.
#define LOG
Default logging method: output if the local debug variable is true.
void FlushPartition(size_t partition_id, bool consume, bool grow)