13 #ifndef THRILL_CORE_REDUCE_BUCKET_HASH_TABLE_HEADER 14 #define THRILL_CORE_REDUCE_BUCKET_HASH_TABLE_HEADER 84 template <
typename TableItem,
typename Key,
typename Value,
85 typename KeyExtractor,
typename ReduceFunction,
typename Emitter,
86 const bool VolatileKey,
87 typename ReduceConfig,
typename IndexFunction,
88 typename KeyEqualFunction = std::equal_to<Key> >
91 KeyExtractor, ReduceFunction, Emitter,
92 VolatileKey, ReduceConfig, IndexFunction,
96 KeyExtractor, ReduceFunction, Emitter,
105 = ReduceConfig::bucket_block_size_;
143 bool immediate_flush =
false,
147 key_extractor, reduce_function, emitter,
148 num_partitions, config, immediate_flush,
151 assert(num_partitions > 0);
163 "limit_memory_bytes must be greater than or equal to 0. " 164 "a byte size of zero results in exactly one item per partition");
177 double limit_fill_rate =
config_.limit_partition_fill_rate();
179 assert(limit_fill_rate >= 0.0 && limit_fill_rate <= 1.0
180 &&
"limit_partition_fill_rate must be between 0.0 and 1.0. " 181 "with a fill rate of 0.0, items are immediately flushed.");
194 double bucket_rate =
config_.bucket_rate();
196 assert(bucket_rate >= 0.0 &&
197 "bucket_rate must be greater than or equal 0. " 198 "a bucket rate of 0.0 causes exactly 1 bucket per partition.");
224 assert(num_buckets_ > 0);
227 sLOG <<
"num_partitions_" << num_partitions_
231 buckets_.resize(num_buckets_,
nullptr);
271 size_t global_index =
275 while (current !=
nullptr)
279 bi != current->
items + current->
size; ++bi)
288 current = current->
next;
316 <<
"h.partition_id" << h.partition_id;
323 <<
"items_per_partition_[" << h.partition_id <<
"]" 339 while (current !=
nullptr)
344 operator delete (current);
370 partition_id,
true,
true);
374 <<
"items of partition" << partition_id
386 buckets_.begin() + (partition_id + 1) * num_buckets_per_partition_;
388 for ( ; iter != end; ++iter)
392 while (current !=
nullptr)
395 bi != current->
items + current->
size; ++bi)
415 sLOG <<
"Spilled items of partition" << partition_id;
421 size_t size_max = 0, index = 0;
467 template <
typename Emit>
469 size_t partition_id,
bool consume,
bool , Emit emit) {
472 <<
" items of partition: " << partition_id;
479 buckets_.begin() + (partition_id + 1) * num_buckets_per_partition_;
481 for ( ; iter != end; ++iter)
485 while (current !=
nullptr)
488 bi != current->
items + current->
size; ++bi)
490 emit(partition_id, *bi);
502 current = current->
next;
517 LOG <<
"Done flushing items of partition: " << partition_id;
522 partition_id, consume, grow,
523 [
this](
const size_t& partition_id,
const TableItem& p) {
524 this->
emitter_.Emit(partition_id, p);
580 place->
next =
nullptr;
590 free.push(static_cast<BucketBlock*>(o));
594 while (!
free.empty()) {
595 free.top()->destroy_items();
596 operator delete (
free.top());
654 template <
typename TableItem,
typename Key,
typename Value,
655 typename KeyExtractor,
typename ReduceFunction,
656 typename Emitter,
const bool VolatileKey,
658 typename KeyEqualFunction>
661 TableItem, Key, Value, KeyExtractor, ReduceFunction,
662 Emitter, VolatileKey, ReduceConfig, IndexFunction, KeyEqualFunction>
666 TableItem, Key, Value, KeyExtractor, ReduceFunction,
668 IndexFunction, KeyEqualFunction>;
674 #endif // !THRILL_CORE_REDUCE_BUCKET_HASH_TABLE_HEADER ReduceConfig config_
config of reduce table
A data structure which takes an arbitrary value and extracts a key using a key extractor function fro...
#define sLOG
Default logging method: output if the local debug variable is true.
BucketBlockPool keeps allocated BucketBlocks on a stack.
static uint_pair max()
Returns a uint_pair instance containing the largest possible value.
void Dispose()
Deallocate memory.
static constexpr size_t block_size_
Emitter & emitter_
Emitter object that receives the items output to the next phase.
typename std::vector< BucketBlock * >::iterator BucketBlockIterator
IndexFunction index_function_
Index Calculation functions: Hash or ByIndex.
void SpillAnyPartition()
Spill all items of an arbitrary partition into an external memory File.
size_t num_partitions()
Returns the number of partitions.
Type selection via ReduceTableImpl enum.
ReduceBucketHashTable(Context &ctx, size_t dia_id, const KeyExtractor &key_extractor, const ReduceFunction &reduce_function, Emitter &emitter, size_t num_partitions, const ReduceConfig &config=ReduceConfig(), bool immediate_flush=false, const IndexFunction &index_function=IndexFunction(), const KeyEqualFunction &key_equal_function=KeyEqualFunction())
const size_t num_partitions_
Number of partitions.
IndexFunction::Result calculate_index(const TableItem &kv) const
const KeyEqualFunction & key_equal_function() const
Returns key_equal_function_.
bool Insert(const TableItem &kv)
Inserts a value into the table, potentially reducing it in case both the key of the value already in ...
std::vector< size_t > items_per_partition_
Current number of items per partition.
void SpillLargestPartition()
Spill all items of the largest partition into an external memory File.
size_t dia_id() const
Returns dia_id_.
BlockWriter contains a temporary Block object into which a) any serializable item can be stored or b)...
size_t limit_memory_bytes() const
Returns limit_memory_bytes_.
void Initialize(size_t limit_memory_bytes)
Construct the hash table itself and fill it with sentinels.
bool memory_exceeded
memory limit exceeded indicator
const IndexFunction & index_function() const
Returns index_function_.
The Context of a job is a unique instance per worker which holds references to all underlying parts o...
ReduceConfig ReduceConfig
const Emitter & emitter() const
Returns emitter_.
BucketBlock * next
link of linked list to next block
std::stack< BucketBlock * > free
void Deallocate(BucketBlock *o)
KeyEqualFunction key_equal_function_
Comparator function for keys.
const KeyExtractor & key_extractor() const
Returns the key_extractor.
ReduceTableImpl
Enum class to select a hash table implementation.
static constexpr size_t bucket_block_size
target number of bytes in a BucketBlock.
Block holding reduce key/value pairs.
std::vector< data::File > partition_files_
Store the files for partitions.
ReduceBucketHashTable & operator=(const ReduceBucketHashTable &)=delete
non-copyable: delete assignment operator
void SpillPartition(size_t partition_id)
Spill all items of a partition into an external memory File.
std::vector< BucketBlock * > buckets_
Storing the items.
static constexpr bool debug
size_t num_buckets_per_partition_
Partition size, the number of buckets per partition.
Context & ctx() const
Returns the context.
size_t limit_memory_bytes_
Size of the table in bytes.
size_t limit_items_per_partition_
Number of items in a partition before the partition is spilled.
void destroy_items()
helper to destroy all allocated items
void vector_free(std::vector< Type > &v)
typename std::conditional< VolatileKey, std::pair< Key, Value >, Value >::type TableItem
void FlushPartitionEmit(size_t partition_id, bool consume, bool, Emit emit)
TableItem reduce(const TableItem &a, const TableItem &b) const
TLX_ATTRIBUTE_ALWAYS_INLINE BlockWriter & Put(const T &x)
Put appends a complete item, or fails with a FullException.
size_t max_blocks_per_partition_
Maximal number of blocks per partition.
BucketBlockPool block_pool_
Bucket block pool.
size_t num_items_
Current number of items.
Key key(const TableItem &t) const
Common super-class for bucket and linear-probing hash/reduce tables.
size_t num_blocks() const
Returns the number of blocks in the table.
void free(void *ptr) NOEXCEPT
exported free symbol that overrides loading from libc
size_t num_items_calc() const
Returns the total number of items in the table.
const ReduceFunction & reduce_function() const
Returns the reduce_function.
#define LOG
Default logging method: output if the local debug variable is true.
void FlushPartition(size_t partition_id, bool consume, bool grow)
TableItem items[block_size_]
memory area of items
size_t num_blocks_
Total number of blocks in the table.
static constexpr bool debug_items
#define LOGC(cond)
Explicitly specify the condition for logging.
size_t limit_blocks_
Number of blocks in the table before some items are spilled.
void Dispose()
Deallocate memory.
size_t max_items_per_partition_
Maximal number of items per partition.
void SpillSmallestPartition()