16 #ifndef THRILL_API_REDUCE_TO_INDEX_HEADER 17 #define THRILL_API_REDUCE_TO_INDEX_HEADER 30 #include <type_traits> 57 template <
typename ValueType,
58 typename KeyExtractor,
typename ReduceFunction,
59 typename ReduceConfig,
bool VolatileKey,
bool SkipPreReducePhase>
62 static constexpr
bool debug =
false;
65 using Super::context_;
67 using Key =
typename common::FunctionTraits<KeyExtractor>::result_type;
70 typename std::conditional<
71 VolatileKey, std::pair<Key, ValueType>, ValueType>::type;
74 "Key must be an unsigned integer");
// Call operator: hands `item` to the node this emitter points at by calling
// node_->PushItem(item). The call expression is returned directly from this
// void function, so PushItem() presumably returns void -- TODO confirm
// against its declaration, which is not visible here.
85 void operator () (
const ValueType& item)
const 86 {
return node_->PushItem(item); }
97 template <
typename ParentDIA>
100 const KeyExtractor& key_extractor,
101 const ReduceFunction& reduce_function,
103 const ValueType& neutral_element,
104 const ReduceConfig& config)
105 :
Super(parent.ctx(), label, { parent.id() }, { parent.node() }),
106 mix_stream_(use_mix_stream_ ?
107 parent.ctx().GetNewMixStream(
this) :
nullptr),
108 cat_stream_(use_mix_stream_ ?
109 nullptr : parent.ctx().GetNewCatStream(
this)),
110 emitters_(use_mix_stream_ ?
111 mix_stream_->GetWriters() : cat_stream_->GetWriters()),
112 result_size_(result_size),
114 context_, Super::dia_id(), context_.num_workers(),
115 key_extractor, reduce_function, emitters_,
118 context_, Super::dia_id(),
119 key_extractor, reduce_function,
Emitter(
this),
120 config, neutral_element) {
124 auto pre_op_fn = [
this](
const ValueType& input) {
125 if (SkipPreReducePhase)
126 pre_phase_.InsertSkip(input);
128 pre_phase_.Insert(input);
133 auto lop_chain = parent.stack().push(pre_op_fn).fold();
134 parent.node()->
AddChild(
this, lop_chain);
144 if (!use_post_thread_) {
146 if (!SkipPreReducePhase)
149 pre_phase_.InitializeSkip();
153 post_phase_.SetRange(pre_phase_.key_range(context_.my_rank()));
156 if (!SkipPreReducePhase)
159 pre_phase_.InitializeSkip();
163 post_phase_.SetRange(pre_phase_.key_range(context_.my_rank()));
172 LOG << *
this <<
" running StopPreOp";
174 if (!SkipPreReducePhase)
175 pre_phase_.FlushAll();
176 pre_phase_.CloseAll();
177 if (use_post_thread_) {
181 use_mix_stream_ ? mix_stream_.reset() : cat_stream_.reset();
193 if (!use_post_thread_ && !reduced_) {
199 use_mix_stream_ ? mix_stream_.reset() : cat_stream_.reset();
203 post_phase_.PushData(consume);
210 auto reader = mix_stream_->GetMixReader(
true);
211 sLOG <<
"reading data from" << mix_stream_->id()
212 <<
"to push into post table which flushes to" << this->dia_id();
213 while (reader.HasNext()) {
214 post_phase_.Insert(reader.template Next<TableItem>());
219 auto reader = cat_stream_->GetCatReader(
true);
220 sLOG <<
"reading data from" << cat_stream_->id()
221 <<
"to push into post table which flushes to" << this->dia_id();
222 while (reader.HasNext()) {
223 post_phase_.Insert(reader.template Next<TableItem>());
229 post_phase_.Dispose();
246 TableItem,
Key, ValueType, KeyExtractor, ReduceFunction, VolatileKey,
251 TableItem, Key, ValueType, KeyExtractor, ReduceFunction,
Emitter,
254 bool reduced_ =
false;
257 template <
typename ValueType,
typename Stack>
258 template <
typename KeyExtractor,
typename ReduceFunction,
typename ReduceConfig>
260 const KeyExtractor& key_extractor,
261 const ReduceFunction& reduce_function,
264 const ReduceConfig& reduce_config)
const {
266 return ReduceToIndex(
268 key_extractor, reduce_function, size, neutral_element, reduce_config);
271 template <
typename ValueType,
typename Stack>
272 template <
bool VolatileKeyValue,
273 typename KeyExtractor,
typename ReduceFunction,
typename ReduceConfig>
276 const KeyExtractor& key_extractor,
277 const ReduceFunction& reduce_function,
280 const ReduceConfig& reduce_config)
const {
284 =
typename common::FunctionTraits<ReduceFunction>::result_type;
289 typename common::FunctionTraits<ReduceFunction>::template arg<0>
291 "ReduceFunction has the wrong input type");
296 typename common::FunctionTraits<ReduceFunction>::template arg<1>
298 "ReduceFunction has the wrong input type");
304 "ReduceFunction has the wrong output type");
308 typename std::decay<
typename common::FunctionTraits<KeyExtractor>::
309 template arg<0> >::type,
311 "KeyExtractor has the wrong input type");
315 typename common::FunctionTraits<KeyExtractor>::result_type,
317 "The key has to be an unsigned long int (aka. size_t).");
320 DOpResult, KeyExtractor, ReduceFunction, ReduceConfig,
321 VolatileKeyValue,
false>;
323 auto node = tlx::make_counting<ReduceNode>(
324 *
this,
"ReduceToIndex", key_extractor, reduce_function,
325 size, neutral_element, reduce_config);
330 template <
typename ValueType,
typename Stack>
331 template <
typename KeyExtractor,
typename ReduceFunction,
typename ReduceConfig>
334 const KeyExtractor& key_extractor,
335 const ReduceFunction& reduce_function,
338 const ReduceConfig& reduce_config)
const {
342 =
typename common::FunctionTraits<ReduceFunction>::result_type;
347 typename common::FunctionTraits<ReduceFunction>::template arg<0>
349 "ReduceFunction has the wrong input type");
354 typename common::FunctionTraits<ReduceFunction>::template arg<1>
356 "ReduceFunction has the wrong input type");
362 "ReduceFunction has the wrong output type");
366 typename std::decay<
typename common::FunctionTraits<KeyExtractor>::
367 template arg<0> >::type,
369 "KeyExtractor has the wrong input type");
373 typename common::FunctionTraits<KeyExtractor>::result_type,
375 "The key has to be an unsigned long int (aka. size_t).");
378 DOpResult, KeyExtractor, ReduceFunction, ReduceConfig,
381 auto node = tlx::make_counting<ReduceNode>(
382 *
this,
"ReduceToIndex", key_extractor, reduce_function,
383 size, neutral_element, reduce_config);
391 #endif // !THRILL_API_REDUCE_TO_INDEX_HEADER std::thread CreateThread(Args &&... args)
create a std::thread and repeat creation if it fails
void StopPreOp(size_t) final
Virtual method for preparing end of PushData.
void StartPreOp(size_t) final
Virtual method for preparing start of PushData.
data::Stream::Writers emitters_
#define sLOG
Default logging method: output if the local debug variable is true.
DIA is the interface between the user and the Thrill framework.
core::ReduceByIndexPostPhase< TableItem, Key, ValueType, KeyExtractor, ReduceFunction, Emitter, VolatileKey, ReduceConfig > post_phase_
Description of the amount of RAM the internal data structures of a DIANode require.
core::ReducePrePhase< TableItem, Key, ValueType, KeyExtractor, ReduceFunction, VolatileKey, data::Stream::Writer, ReduceConfig, core::ReduceByIndex< Key > > pre_phase_
auto ReduceToIndex(const KeyExtractor &key_extractor, const ReduceFunction &reduce_function, size_t size, const ValueType &neutral_element=ValueType(), const ReduceConfig &reduce_config=ReduceConfig()) const
ReduceToIndex is a DOp, which groups elements of the DIA with the key_extractor returning an unsigned...
typename common::FunctionTraits< KeyExtractor >::result_type Key
static constexpr bool use_post_thread_
tag structure for ReduceToIndex()
ReduceToIndexNode(const ParentDIA &parent, const char *label, const KeyExtractor &key_extractor, const ReduceFunction &reduce_function, size_t result_size, const ValueType &neutral_element, const ReduceConfig &config)
Constructor for a ReduceToIndexNode.
data::MixStreamPtr mix_stream_
Emitter(ReduceToIndexNode *node)
void Dispose() final
Virtual clear method. Triggers actual disposing in sub-classes.
A DIANode which performs a ReduceToIndex operation.
const struct VolatileKeyFlag< false > NoVolatileKeyTag
global const VolatileKeyFlag instance
virtual void AddChild(DIABase *node, const Callback &callback=Callback(), size_t parent_index=0)
Enables children to push their "folded" function chains to their parent.
An extra class derived from std::vector<> for delivery of the BlockWriters of a Stream.
void PushData(bool consume) final
Virtual method for pushing data. Triggers actual pushing in sub-classes.
tag structure for ReduceByKey() and ReduceToIndex()
A DIANode which performs a Reduce operation.
static constexpr bool use_mix_stream_
A reduce index function, which determines a bucket depending on the current index range [begin...
Configuration class to define operational parameters of reduce hash tables and reduce phases...
DIAMemUse PreOpMemUse() final
Amount of RAM used by PreOp after StartPreOp()
static constexpr bool debug
void ProcessChannel()
process the inbound data in the post reduce phase
data::CatStreamPtr cat_stream_
StreamData::Writer Writer
A DOpNode is a typed node representing distributed operations in Thrill.
DIAMemUse PushDataMemUse() final
Amount of RAM used by PushData()
ReduceToIndexNode * node_
typename std::conditional< VolatileKey, std::pair< Key, ValueType >, ValueType >::type TableItem
#define LOG
Default logging method: output if the local debug variable is true.
std::thread thread_
handle to additional thread for post phase
Emitter for PostPhase to push elements to next DIA object.
void Execute() final
Virtual execution method. Triggers actual computation in sub-classes.