13 #ifndef THRILL_CORE_REDUCE_TABLE_HEADER 14 #define THRILL_CORE_REDUCE_TABLE_HEADER 45 double limit_partition_fill_rate_ = 0.5;
49 double bucket_rate_ = 0.6;
55 static constexpr
size_t initial_items_per_partition_ = 512;
59 static constexpr
size_t bucket_block_size_ = 512;
63 static constexpr
bool use_mix_stream_ =
true;
67 static constexpr
bool use_post_thread_ =
true;
74 {
return limit_partition_fill_rate_; }
85 template <ReduceTableImpl table_impl>
97 template <
typename ValueType,
typename Key,
typename Value,
98 typename KeyExtractor,
typename ReduceFunction,
typename Emitter,
99 const bool VolatileKey,
100 typename ReduceConfig_,
typename IndexFunction,
101 typename KeyEqualFunction = std::equal_to<Key> >
105 static constexpr
bool debug =
false;
109 typename std::conditional<
110 VolatileKey, std::pair<Key, Value>, Value>::type;
115 const KeyExtractor& key_extractor,
116 const ReduceFunction& reduce_function,
118 size_t num_partitions,
120 bool immediate_flush,
121 const IndexFunction& index_function,
122 const KeyEqualFunction& key_equal_function)
123 : ctx_(ctx), dia_id_(dia_id),
124 key_extractor_(key_extractor),
125 reduce_function_(reduce_function),
127 index_function_(index_function),
128 key_equal_function_(key_equal_function),
129 num_partitions_(num_partitions),
131 immediate_flush_(immediate_flush),
132 items_per_partition_(num_partitions_, 0) {
134 assert(num_partitions > 0);
139 if (!immediate_flush_) {
140 for (
size_t i = 0; i < num_partitions_; i++) {
141 partition_files_.push_back(ctx.
GetFile(dia_id_));
160 num_buckets_per_partition_ = 1;
161 num_buckets_ = num_buckets_per_partition_ * num_partitions_;
163 assert(num_buckets_per_partition_ > 0);
164 assert(num_buckets_ > 0);
174 size_t dia_id()
const {
return dia_id_; }
183 const Emitter&
emitter()
const {
return emitter_; }
193 {
return key_equal_function_; }
206 {
return num_buckets_per_partition_; }
213 {
return limit_items_per_partition_; }
217 assert(
id < items_per_partition_.size());
218 return items_per_partition_[id];
228 size_t total_num_items = 0;
229 for (
size_t num_items : items_per_partition_) {
230 total_num_items += num_items;
233 return total_num_items;
238 return index_function().inverse_range(
239 partition_id, num_buckets_per_partition_, num_buckets_);
244 for (
const data::File& file : partition_files_) {
245 if (file.num_items())
return true;
251 return partition_files_[partition_id].num_items() != 0;
260 return MakeTableItem::GetKey(t, key_extractor_);
264 return MakeTableItem::Reduce(a, b, reduce_function_);
268 return index_function_(
269 key(kv), num_partitions_, num_buckets_per_partition_, num_buckets_);
316 size_t limit_memory_bytes_ = 0;
331 size_t num_items_ = 0;
341 typename ValueType,
typename Key,
typename Value,
342 typename KeyExtractor,
typename ReduceFunction,
344 const bool VolatileKey =
false,
347 typename KeyEqualFunction = std::equal_to<Key> >
353 #endif // !THRILL_CORE_REDUCE_TABLE_HEADER ReduceConfig config_
Config of the reduce table.
ReduceTable(Context &ctx, size_t dia_id, const KeyExtractor &key_extractor, const ReduceFunction &reduce_function, Emitter &emitter, size_t num_partitions, const ReduceConfig &config, bool immediate_flush, const IndexFunction &index_function, const KeyEqualFunction &key_equal_function)
size_t dia_id_
Associated DIA id.
double bucket_rate() const
Returns bucket_rate_.
bool has_spilled_data() const
Returns whether any partition has spilled data into external memory.
void Dispose()
Deallocate memory.
Emitter & emitter_
Emitter object to receive items outputted to next phase.
void InitializeSkip()
Initialize table for SkipPreReducePhase.
IndexFunction index_function_
Index Calculation functions: Hash or ByIndex.
size_t num_partitions()
Returns the number of partitions.
Type selection via ReduceTableImpl enum.
A File is an ordered sequence of Block objects for storing items.
const size_t num_partitions_
Number of partitions.
IndexFunction::Result calculate_index(const TableItem &kv) const
const KeyEqualFunction & key_equal_function() const
Returns key_equal_function_.
Represents a one-dimensional range (interval) [begin, end).
std::vector< size_t > items_per_partition_
Current number of items per partition.
size_t dia_id() const
Returns dia_id_.
size_t limit_memory_bytes() const
Returns limit_memory_bytes_.
const IndexFunction & index_function() const
Returns index_function_.
The Context of a job is a unique instance per worker which holds references to all underlying parts o...
ReduceConfig_ ReduceConfig
const Emitter & emitter() const
Returns emitter_.
common::Range key_range(size_t partition_id)
Calculates the key range for the given output partition.
DefaultReduceConfig with implementation type selection.
IndexFunction & index_function()
Returns index_function_ (mutable)
A reduce index function which returns a hash index and partition.
double limit_partition_fill_rate() const
Returns limit_partition_fill_rate_.
KeyEqualFunction key_equal_function_
Comparator function for keys.
std::vector< data::File > & partition_files()
Returns the vector of partition files.
const KeyExtractor & key_extractor() const
Returns the key_extractor.
ReduceTableImpl
Enum class to select a hash table implementation.
std::vector< data::File > partition_files_
Store the files for partitions.
Configuration class to define operational parameters of reduce hash tables and reduce phases...
ReduceFunction reduce_function_
Reduce function for reducing two values.
size_t limit_items_per_partition() const
Returns limit_items_per_partition_.
size_t num_buckets_per_partition_
Partition size, the number of buckets per partition.
Context & ctx() const
Returns the context.
static constexpr bool debug
size_t limit_items_per_partition_
Number of items in a partition before the partition is spilled.
data::File GetFile(size_t dia_id)
Returns a new File object containing a sequence of local Blocks.
size_t num_buckets() const
Returns num_buckets_.
void vector_free(std::vector< Type > &v)
typename std::conditional< VolatileKey, std::pair< Key, Value >, Value >::type TableItem
bool has_spilled_data_on_partition(size_t partition_id)
size_t num_items() const
Returns the total number of items in the table.
TableItem reduce(const TableItem &a, const TableItem &b) const
Key key(const TableItem &t) const
Common super-class for bucket and linear-probing hash/reduce tables.
size_t num_items_calc() const
Returns the total number of items in the table.
const ReduceFunction & reduce_function() const
Returns the reduce_function.
size_t num_buckets_per_partition() const
Returns num_buckets_per_partition_.
KeyExtractor key_extractor_
Key extractor function for extracting a key from a value.
size_t items_per_partition(size_t id) const
Returns the current number of items in partition id (an entry of items_per_partition_).