#ifndef THRILL_CORE_REDUCE_BY_HASH_POST_PHASE_HEADER
#define THRILL_CORE_REDUCE_BY_HASH_POST_PHASE_HEADER

template <typename TableItem, typename Key, typename Value,
          typename KeyExtractor, typename ReduceFunction, typename Emitter,
          const bool VolatileKey,
          typename ReduceConfig_ = DefaultReduceConfig,
          typename IndexFunction = ReduceByHash<Key>,
          typename KeyEqualFunction = std::equal_to<Key> >
class ReduceByHashPostPhase
{
    static constexpr bool debug = false;

public:
    using ReduceConfig = ReduceConfig_;
    using PhaseEmitter = ReducePostPhaseEmitter<
              TableItem, Value, Emitter, VolatileKey>;
    //! Table type selection via the ReduceTableImpl enum in ReduceConfig.
    using Table = typename ReduceTableSelect<
              ReduceConfig::table_impl_,
              TableItem, Key, Value,
              KeyExtractor, ReduceFunction, PhaseEmitter,
              VolatileKey, ReduceConfig,
              IndexFunction, KeyEqualFunction>::type;
    ReduceByHashPostPhase(
        Context& ctx, size_t dia_id,
        const KeyExtractor& key_extractor,
        const ReduceFunction& reduce_function,
        const Emitter& emit,
        const ReduceConfig& config = ReduceConfig(),
        const IndexFunction& index_function = IndexFunction(),
        const KeyEqualFunction& key_equal_function = KeyEqualFunction())
        : config_(config), emitter_(emit),
          table_(ctx, dia_id, key_extractor, reduce_function, emitter_,
                 config, index_function, key_equal_function) { }

    void Initialize(size_t limit_memory_bytes) {
        table_.Initialize(limit_memory_bytes);
    }
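    // Illustrative sketch only (not part of this header): for a word-count
    // style reduction with TableItem = std::pair<std::string, size_t>, the
    // KeyExtractor and ReduceFunction arguments could look roughly like the
    // hypothetical lambdas below.
    //
    //   auto key_ex = [](const std::pair<std::string, size_t>& p) {
    //       return p.first;                    // the key is the word itself
    //   };
    //   auto red_fn = [](const std::pair<std::string, size_t>& a,
    //                    const std::pair<std::string, size_t>& b) {
    //       return std::make_pair(a.first, a.second + b.second); // sum counts
    //   };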
    //! Flushes all items in the whole table. If DoCache is true, the emitted
    //! items are additionally written to the given File writer.
    template <bool DoCache>
    void Flush(bool consume, data::File::Writer* writer = nullptr) {
        LOG << "Flushing items";

        // list of files containing only partially reduced items
        std::vector<data::File> remaining_files;

        {
            std::vector<data::File>& files = table_.partition_files();

            for (size_t id = 0; id < files.size(); ++id)
            {
                data::File& file = files[id];

                if (file.num_items() > 0) {
                    // items were spilled: spill the remainder of the
                    // partition and keep the file for re-reducing later
                    table_.SpillPartition(id);

                    LOG << "partition " << id << " contains "
                        << file.num_items() << " partially reduced items";

                    remaining_files.emplace_back(std::move(file));
                }
                else {
                    LOG << "partition " << id << " contains "
                        << table_.items_per_partition(id)
                        << " fully reduced items";

                    table_.FlushPartitionEmit(
                        id, consume,
                        [this, writer](
                            const size_t& partition_id, const TableItem& p) {
                            if (DoCache) writer->Put(p);
                            emitter_.Emit(partition_id, p);
                        });
                }
            }
        }

        if (remaining_files.size() == 0) {
            LOG << "Flushed items directly.";
            return;
        }

        assert(consume && "Items were spilled hence Flushing must consume");
        // Partially reduced files remain: create new hash tables and
        // re-reduce the spilled items iteratively until none remain.

        size_t iteration = 1;

        while (remaining_files.size())
        {
            sLOG << "ReducePostPhase: re-reducing items from"
                 << remaining_files.size() << "spilled files"
                 << "iteration" << iteration;
            sLOG << "-- Try to increase the amount of RAM to avoid this.";

            std::vector<data::File> next_remaining_files;

            // subtable whose index function is re-seeded with the iteration
            Table subtable(
                table_.ctx(), table_.dia_id(),
                table_.key_extractor(), table_.reduce_function(), emitter_,
                config_,
                IndexFunction(iteration, table_.index_function()),
                table_.key_equal_function());

            subtable.Initialize(table_.limit_memory_bytes());

            size_t num_subfile = 0;

            for (data::File& file : remaining_files)
            {
                sLOG << "re-reducing subfile" << num_subfile++
                     << "containing" << file.num_items() << "items";

                data::File::ConsumeReader reader = file.GetConsumeReader();
                while (reader.HasNext())
                    subtable.Insert(reader.Next<TableItem>());
            }

            // flush fully reduced partitions; keep spilled ones for the
            // next iteration
            std::vector<data::File>& subfiles = subtable.partition_files();

            for (size_t id = 0; id < subfiles.size(); ++id)
            {
                data::File& subfile = subfiles[id];

                if (subfile.num_items() > 0) {
                    subtable.SpillPartition(id);

                    sLOG << "partition" << id << "contains"
                         << subfile.num_items() << "partially reduced items";

                    next_remaining_files.emplace_back(std::move(subfile));
                }
                else {
                    sLOG << "partition" << id << "contains"
                         << subtable.items_per_partition(id)
                         << "fully reduced items";

                    subtable.FlushPartitionEmit(
                        id, /* consume */ true,
                        [this, writer](
                            const size_t& partition_id, const TableItem& p) {
                            if (DoCache) writer->Put(p);
                            emitter_.Emit(partition_id, p);
                        });
                }
            }

            remaining_files = std::move(next_remaining_files);
            ++iteration;
        }

        LOG << "Flushed items";
    }
    //! Push data into the emitter.
    void PushData(bool consume = false) {
        if (!table_.has_spilled_data()) {
            // no items were spilled to disk: emit everything from RAM
            Flush</* DoCache */ false>(consume);
        }
        else {
            // items were spilled: cache the fully reduced output in a File
            cache_ = table_.ctx().GetFilePtr(table_.dia_id());
            data::File::Writer writer = cache_->GetWriter();
            Flush</* DoCache */ true>(true, &writer);
        }
    }
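    // Illustrative driver sketch (assumptions for the example only: the type
    // names Item, Key, Value, Emitter, the variables `ctx`, `emit`, `items`,
    // and the functors `key_ex`/`red_fn` from above are not defined here):
    //
    //   ReduceByHashPostPhase<Item, Key, Value,
    //                         decltype(key_ex), decltype(red_fn),
    //                         Emitter, /* VolatileKey */ false>
    //       phase(ctx, /* dia_id */ 0, key_ex, red_fn, emit);
    //   phase.Initialize(/* limit_memory_bytes */ 64 * 1024 * 1024);
    //   for (const auto& item : items)
    //       phase.Insert(item);                // reduce into the hash table
    //   phase.PushData(/* consume */ true);    // emit fully reduced results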
private:
    //! Stored reduce config to initialize the subtable.
    ReduceConfig config_;

    //! Emitters used to parameterize hash table for output to next DIA node.
    PhaseEmitter emitter_;

    //! the first-level hash table implementation
    Table table_;

    //! File for storing data in case we need multiple re-reduce levels.
    data::FilePtr cache_;
};

#endif // !THRILL_CORE_REDUCE_BY_HASH_POST_PHASE_HEADER