15 #ifndef THRILL_CORE_REDUCE_BY_INDEX_POST_PHASE_HEADER 16 #define THRILL_CORE_REDUCE_BY_INDEX_POST_PHASE_HEADER 38 template <
typename TableItem,
typename Key,
typename Value,
39 typename KeyExtractor,
typename ReduceFunction,
typename Emitter,
40 const bool VolatileKey,
41 typename ReduceConfig_ = DefaultReduceConfig>
44 static constexpr
bool debug =
false;
50 TableItem, Value, Emitter, VolatileKey>;
60 const KeyExtractor& key_extractor,
61 const ReduceFunction& reduce_function,
62 const Emitter& emitter,
64 const Value& neutral_element = Value())
89 if (
range_.
size() *
sizeof(TableItem) < limit_memory_bytes) {
95 LOG <<
"ReduceByIndexPostPhase()" 96 <<
" limit_memory_bytes_=" << limit_memory_bytes_
97 <<
" num_subranges_=" << 0
103 1 + (
range_.
size() *
sizeof(TableItem) / limit_memory_bytes);
110 LOG <<
"ReduceByIndexPostPhase()" 111 <<
" limit_memory_bytes_=" << limit_memory_bytes_
121 for (
size_t partition = 1; partition <
num_subranges_; partition++) {
123 auto writer = file->GetWriter();
127 LOG <<
"ReduceByIndexPostPhase()" 128 <<
" partition=" << partition
129 <<
" subrange=" << subrange;
139 size_t item_key =
key(kv);
142 LOG <<
"Insert() item_key=" << item_key
146 if (item_key < range_.end) {
181 LOG <<
"Insert() item_key=" << item_key
183 <<
" subrange=" << subrange;
185 assert(item_key >= subrange.begin && item_key < subrange.end);
192 assert(!pwriter || consume);
222 FlushAndConsume<true>(pwriter);
228 for (
size_t i = 0; i <
subranges_.size(); ++i) {
230 ReduceFunction, Emitter, VolatileKey,
240 while (reader.HasNext()) {
241 subtable.Insert(reader.template Next<TableItem>());
245 subtable.PushData(consume || pwriter, pwriter);
262 for (
auto it =
items_.begin(); it !=
items_.end(); ++it) {
267 template <
bool DoCache = false>
269 for (
auto it =
items_.begin(); it !=
items_.end(); ++it) {
271 if (DoCache) { writer->Put(*it); }
278 Key
key(
const TableItem& t) {
282 TableItem
reduce(
const TableItem& a,
const TableItem& b) {
347 #endif // !THRILL_CORE_REDUCE_BY_INDEX_POST_PHASE_HEADER size_t dia_id_
Associated DIA id.
size_t size() const
size of range
TableItem reduce(const TableItem &a, const TableItem &b)
size_t limit_memory_bytes_
Size of the table in bytes.
void SetRange(const common::Range &range)
Sets the range of indexes to be handled by this index table.
Key key(const TableItem &t)
std::vector< data::FilePtr > subrange_files_
Subranges external Files.
void PushData(bool consume=false, data::File::Writer *pwriter=nullptr)
Emitter emit_
Set of emitters, one per partition.
Value neutral_element_
neutral element to fill holes in output
void Initialize(size_t limit_memory_bytes)
size_t num_subranges_
number of subranges
TLX_ATTRIBUTE_ALWAYS_INLINE T Next()
Next() reads a complete item T.
represents a 1 dimensional range (interval) [begin,end)
Range Partition(size_t i, size_t parts) const
bool IsValid() const
valid non-empty range (begin < end)
BlockWriter contains a temporary Block object into which a) any serializable item can be stored or b)...
PhaseEmitter emitter_
Emitters used to parameterize hash table for output to next DIA node.
ReduceByIndexPostPhase(Context &ctx, size_t dia_id, const KeyExtractor &key_extractor, const ReduceFunction &reduce_function, const Emitter &emitter, const ReduceConfig &config=ReduceConfig(), const Value &neutral_element=Value())
A data structure which takes an arbitrary value and extracts an index using a key extractor function ...
The Context of a job is a unique instance per worker which holds references to all underlying parts o...
static constexpr bool debug
KeyExtractor key_extractor_
Key extractor function for extracting a key from a value.
size_t neutral_element_key_
The index where the neutral element would go if actually inserted.
ReduceConfig config_
Stored reduce config to initialize the subtable.
void Emit(const TableItem &p)
common::Range full_range_
BlockReader takes Block objects from BlockSource and allows reading of a) serializable Items or b) ar...
bool Insert(const TableItem &kv)
std::vector< common::Range > subranges_
Subranges.
std::vector< TableItem > items_
data::FilePtr GetFilePtr(size_t dia_id)
void vector_free(std::vector< Type > &v)
ReduceFunction reduce_function_
Reduce function for reducing two values.
std::vector< data::File::Writer > subrange_writers_
Subranges external File Writers.
TLX_ATTRIBUTE_ALWAYS_INLINE BlockWriter & Put(const T &x)
Put appends a complete item, or fails with a FullException.
size_t FindPartition(size_t index, size_t parts) const
bool neutral_element_index_occupied_
Is there an actual element at the index of the neutral element?
void Close()
Explicitly close the writer.
TLX_ATTRIBUTE_ALWAYS_INLINE bool HasNext()
HasNext() returns true if at least one more item is available.
data::FilePtr cache_
File for storing data in case we need multiple re-reduce levels.
#define LOG
Default logging method: output if the local debug variable is true.
bool IsEmpty() const
range is empty (begin == end)
ReduceByIndexPostPhase & operator=(const ReduceByIndexPostPhase &)=delete
non-copyable: delete assignment operator
void FlushAndConsume(data::File::Writer *writer=nullptr)
ReduceConfig_ ReduceConfig