16 #ifndef THRILL_CORE_REDUCE_PRE_PHASE_HEADER 17 #define THRILL_CORE_REDUCE_PRE_PHASE_HEADER 45 template <
typename TableItem,
bool VolatileKey,
typename BlockWriter>
48 static constexpr
bool debug =
false;
53 stats_(writer.size(), 0) { }
57 void Emit(
const size_t& partition_id,
const TableItem& p) {
58 assert(partition_id <
writer_.size());
63 void Flush(
size_t partition_id) {
64 assert(partition_id <
writer_.size());
69 sLOG <<
"emit stats:";
71 for (BlockWriter& e :
writer_) {
73 sLOG <<
"emitter" << i <<
"pushed" <<
stats_[i++];
85 template <
typename TableItem,
typename Key,
typename Value,
86 typename KeyExtractor,
typename ReduceFunction,
87 const bool VolatileKey,
91 typename KeyEqualFunction = std::equal_to<Key>,
92 typename HashFunction = std::hash<Key>,
93 bool UseDuplicateDetection =
false>
96 template <
typename TableItem,
typename Key,
typename Value,
97 typename KeyExtractor,
typename ReduceFunction,
98 const bool VolatileKey,
typename BlockWriter,
99 typename ReduceConfig_,
100 typename IndexFunction,
101 typename KeyEqualFunction,
102 typename HashFunction>
104 KeyExtractor, ReduceFunction,
105 VolatileKey, BlockWriter,
112 static constexpr
bool debug =
false;
120 ReduceConfig::table_impl_,
121 TableItem, Key, Value,
122 KeyExtractor, ReduceFunction,
Emitter,
131 size_t num_partitions,
132 KeyExtractor key_extractor,
133 ReduceFunction reduce_function,
134 std::vector<BlockWriter>& emit,
135 const ReduceConfig& config = ReduceConfig(),
136 const IndexFunction& index_function = IndexFunction(),
137 const KeyEqualFunction& key_equal_function = KeyEqualFunction(),
138 const HashFunction hash_function = HashFunction(),
139 bool duplicates =
false)
141 key_extractor_(key_extractor),
143 key_extractor, reduce_function, emit_,
144 num_partitions, config, !duplicates,
145 index_function, key_equal_function) {
149 sLOG <<
"creating ReducePrePhase with" << emit.size() <<
"output emitters";
151 assert(num_partitions == emit.size());
160 table_.Initialize(limit_memory_bytes);
164 table_.InitializeSkip();
169 return table_.Insert(MakeTableItem::Make(v, table_.key_extractor()));
173 TableItem t = MakeTableItem::Make(v, table_.key_extractor());
174 typename IndexFunction::Result h = table_.calculate_index(t);
175 emit_.Emit(h.partition_id, t);
180 for (
size_t id = 0;
id < table_.num_partitions(); ++id) {
181 FlushPartition(
id,
true,
false);
187 table_.FlushPartition(partition_id, consume, grow);
205 {
return table_.key_range(partition_id); }
220 template <
typename TableItem,
typename Key,
typename Value,
221 typename KeyExtractor,
typename ReduceFunction,
222 const bool VolatileKey,
typename BlockWriter,
223 typename ReduceConfig,
224 typename IndexFunction,
225 typename EqualToFunction,
226 typename HashFunction>
251 ReduceFunction, VolatileKey, BlockWriter,
253 IndexFunction, EqualToFunction, HashFunction,
258 size_t num_partitions,
259 KeyExtractor key_extractor,
260 ReduceFunction reduce_function,
261 std::vector<BlockWriter>& emit,
262 const ReduceConfig& config = ReduceConfig(),
263 const IndexFunction& index_function = IndexFunction(),
264 const EqualToFunction& equal_to_function = EqualToFunction(),
265 const HashFunction hash_function = HashFunction())
266 :
Super(ctx, dia_id, num_partitions, key_extractor, reduce_function,
267 emit, config, index_function, equal_to_function, hash_function,
269 hash_function_(hash_function) { }
272 if (Super::table_.Insert(
273 Super::MakeTableItem::Make(v, Super::table_.key_extractor()))) {
274 hashes_.push_back(hash_function_(Super::key_extractor_(v)));
284 Super::table_.dia_id());
286 for (
size_t id = 0;
id < Super::table_.num_partitions(); ++id) {
287 FlushPartition(
id,
true,
false);
292 Super::table_.FlushPartitionEmit(
293 partition_id, consume, grow,
294 [
this](
const size_t& partition_id,
const TableItem& ti) {
295 Key key = Super::MakeTableItem::GetKey(
296 ti, Super::table_.key_extractor());
297 if (!non_duplicates_[hash_function_(key) % max_hash_]) {
299 duplicated_elements_++;
300 Super::emit_.Emit(partition_id, ti);
303 non_duplicate_elements_++;
304 Super::emit_.Emit(Super::table_.ctx().my_rank(), ti);
308 if (Super::table_.has_spilled_data_on_partition(partition_id)) {
310 Super::table_.partition_files()[partition_id].GetReader(
true);
312 TableItem ti = reader.
Next<TableItem>();
313 Key key = Super::MakeTableItem::GetKey(
314 ti, Super::table_.key_extractor());
315 if (!non_duplicates_[hash_function_(key) % max_hash_]) {
317 duplicated_elements_++;
318 Super::emit_.Emit(partition_id, ti);
321 non_duplicate_elements_++;
322 Super::emit_.Emit(Super::table_.ctx().my_rank(), ti);
328 Super::emit_.Flush(partition_id);
329 Super::emit_.Flush(Super::table_.ctx().my_rank());
344 size_t duplicated_elements_ = 0;
345 size_t non_duplicate_elements_ = 0;
353 #endif // !THRILL_CORE_REDUCE_PRE_PHASE_HEADER
std::vector< size_t > stats_
Emitter stats.
#define sLOG
Default logging method: output if the local debug variable is true.
void Flush(size_t partition_id)
Type selection via ReduceTableImpl enum.
TLX_ATTRIBUTE_ALWAYS_INLINE T Next()
Next() reads a complete item T.
represents a 1 dimensional range (interval) [begin,end)
size_t FindNonDuplicates(std::vector< bool > &non_duplicates, std::vector< size_t > &hashes, Context &context, size_t dia_id)
Identifies all hashes which occur on only a single worker.
void Emit(const size_t &partition_id, const TableItem &p)
The Context of a job is a unique instance per worker which holds references to all underlying parts o...
std::vector< BlockWriter > & writer_
Set of emitters, one per partition.
A reduce index function which returns a hash index and partition.
BlockReader takes Block objects from BlockSource and allows reading of a) serializable Items or b) ar...
Configuration class to define operational parameters of reduce hash tables and reduce phases...
static constexpr bool debug
TLX_ATTRIBUTE_ALWAYS_INLINE bool HasNext()
HasNext() returns true if at least one more item is available.
ReducePrePhaseEmitter(std::vector< BlockWriter > &writer)
Duplicate detection to identify all elements occurring only on one worker.