15 #ifndef THRILL_API_GROUP_BY_KEY_HEADER 16 #define THRILL_API_GROUP_BY_KEY_HEADER 32 #include <type_traits> 34 #include <unordered_map> 44 template <
typename ValueType,
45 typename KeyExtractor,
typename GroupFunction,
typename HashFunction,
46 bool UseLocationDetection>
47 class GroupByNode final :
public DOpNode<ValueType>
50 static constexpr
bool debug =
false;
55 using Key =
typename common::FunctionTraits<KeyExtractor>::result_type;
58 typename common::FunctionTraits<KeyExtractor>::template arg_plain<0>;
81 static constexpr
size_t counter_bits_ = 8 *
sizeof(
CounterType);
84 assert(hash == b.
hash);
89 assert(hash == b.
hash);
103 template <
typename BitReader>
105 count = reader.GetBits(counter_bits_);
109 template <
typename BitWriter>
111 writer.PutBits(count, counter_bits_);
120 template <
typename ParentDIA>
122 const KeyExtractor& key_extractor,
123 const GroupFunction& groupby_function,
124 const HashFunction& hash_function = HashFunction())
125 :
Super(parent.ctx(),
"GroupByKey", { parent.id() }, { parent.node() }),
132 auto pre_op_fn = [=](
const ValueIn& input) {
137 auto lop_chain = parent.stack().push(pre_op_fn).fold();
138 parent.node()->AddChild(
this, lop_chain);
144 if (UseLocationDetection)
151 if (UseLocationDetection) {
156 const size_t recipient = hash %
emitters_.size();
185 if (UseLocationDetection) {
186 std::unordered_map<size_t, size_t> target_processors;
189 while (file_reader.HasNext()) {
190 ValueIn in = file_reader.template Next<ValueIn>();
194 auto target_processor = target_processors.find(hr);
195 emitters_[target_processor->second].Put(in);
207 const size_t num_runs =
files_.size();
211 else if (num_runs == 1) {
217 size_t merge_degree, prefetch;
220 while (std::tie(merge_degree, prefetch) =
222 files_.size() > merge_degree)
224 sLOG1 <<
"Partial multi-way-merge of" 225 << merge_degree <<
"files with prefetch" << prefetch;
228 std::vector<data::File::ConsumeReader> seq;
229 seq.reserve(merge_degree);
231 for (
size_t t = 0; t < merge_degree; ++t) {
233 files_[t].GetConsumeReader( 0));
238 auto puller = core::make_multiway_merge_tree<ValueIn>(
243 auto writer =
files_.back().GetWriter();
245 while (puller.HasNext()) {
246 writer.Put(puller.Next());
257 std::vector<data::File::Reader> seq;
258 seq.reserve(num_runs);
260 for (
size_t t = 0; t < num_runs; ++t) {
262 files_[t].GetReader(consume, 0));
267 LOG <<
"start multiwaymerge for real";
268 auto puller = core::make_multiway_merge_tree<ValueIn>(
271 LOG <<
"run user func";
272 if (puller.HasNext()) {
278 while (user_iterator.HasNextForReal()) {
281 user_iterator, user_iterator.GetNextKey());
289 <<
" name=multiwaymerge" 291 <<
" multiwaymerge=" << (num_runs > 1);
318 LOG <<
"get iterator";
321 LOG <<
"start running user func";
322 while (user_iterator.HasNextForReal()) {
325 user_iterator.GetNextKey());
329 LOG <<
"finished user func";
337 totalsize_ += v.size();
349 LOG <<
"running group by main op";
351 std::vector<ValueIn> incoming;
355 auto reader =
stream_->GetCatReader(
true);
356 while (reader.HasNext()) {
363 incoming.emplace_back(reader.template Next<ValueIn>());
367 LOG <<
"finished receiving elems";
375 <<
" number_files=" << files_.size();
381 template <
typename ValueType,
typename Stack>
382 template <
typename ValueOut,
bool LocationDetectionValue,
383 typename KeyExtractor,
typename GroupFunction,
typename HashFunction>
386 const KeyExtractor& key_extractor,
387 const GroupFunction& groupby_function,
388 const HashFunction& hash_function)
const {
392 typename std::decay<
typename common::FunctionTraits<KeyExtractor>
393 ::
template arg<0> >::type,
395 "KeyExtractor has the wrong input type");
398 ValueOut, KeyExtractor, GroupFunction, HashFunction,
399 LocationDetectionValue>;
401 auto node = tlx::make_counting<GroupByNode>(
402 *
this, key_extractor, groupby_function, hash_function);
407 template <
typename ValueType,
typename Stack>
408 template <
typename ValueOut,
typename KeyExtractor,
409 typename GroupFunction,
typename HashFunction>
411 const KeyExtractor& key_extractor,
412 const GroupFunction& groupby_function,
413 const HashFunction& hash_function)
const {
415 return GroupByKey<ValueOut>(
419 template <
typename ValueType,
typename Stack>
420 template <
typename ValueOut,
typename KeyExtractor,
typename GroupFunction>
422 const KeyExtractor& key_extractor,
423 const GroupFunction& groupby_function)
const {
425 return GroupByKey<ValueOut>(
427 std::hash<typename FunctionTraits<KeyExtractor>::result_type>());
433 #endif // !THRILL_API_GROUP_BY_KEY_HEADER
void StartPrefetch(std::vector< Reader > &readers, size_t prefetch_size)
Take a vector of Readers and prefetch equally from them.
DIA is the interface between the user and the Thrill framework.
DIAMemUse PushDataMemUse() final
Amount of RAM used by PushData()
core::LocationDetection< HashCount > location_detection_
void WriteBits(BitWriter &writer) const
Write count and dia_mask to BitWriter.
void PushItem(const ValueType &item) const
Method for derived classes to Push a single item to all children.
Description of the amount of RAM the internal data structures of a DIANode require.
void StartPreOp(size_t) final
Virtual method for preparing start of PushData.
KeyExtractor key_extractor_
DIAMemUse ExecuteMemUse() final
Amount of RAM used by Execute()
const GroupByNode & node_
GroupFunction groupby_function_
std::pair< size_t, size_t > MaxMergeDegreePrefetch(size_t num_files)
A File is an ordered sequence of Block objects for storing items.
void reset()
release contained pointer, frees object if this is the last reference.
BlockWriter contains a temporary Block object into which a) any serializable item can be stored or b)...
typename common::FunctionTraits< KeyExtractor >::result_type Key
bool memory_exceeded
memory limit exceeded indicator
HashFunction hash_function_
void ReadBits(BitReader &reader)
Read count from BitReader.
data::File pre_file_
location detection and associated files
auto GroupByKey(const KeyExtractor &key_extractor, const GroupByFunction &groupby_function) const
GroupByKey is a DOp, which groups elements of the DIA by its key.
void Close()
custom destructor to close writers in a cyclic fashion
void Execute() override
Virtual execution method. Triggers actual computation in sub-classes.
static constexpr bool debug
uint_pair & operator+=(const uint_pair &b)
addition operator (uses 64-bit arithmetic)
An extra class derived from std::vector<> for delivery of the BlockWriters of a Stream.
data::CatStreamPtr stream_
data::CatStreamPtr GetNewCatStream(size_t dia_id)
ConsumeReader GetConsumeReader(size_t prefetch_size=File::default_prefetch_size_)
Get consuming BlockReader for beginning of File.
typename common::FunctionTraits< KeyExtractor >::template arg_plain< 0 > ValueIn
void Dispose() override
Virtual clear method. Triggers actual disposing in sub-classes.
data::File::Writer pre_writer_
bool operator()(const ValueIn &a, const ValueIn &b) const
data::CatStream::Writers emitters_
const struct LocationDetectionFlag< false > NoLocationDetectionTag
global const LocationDetectionFlag instance
bool NeedBroadcast() const
static IntegerType AddTruncToType(const IntegerType &a, const IntegerType &b)
data::File GetFile(size_t dia_id)
Returns a new File object containing a sequence of local Blocks.
const size_t & dia_id() const
return unique id of DIANode subclass as stored by StatsNode
void RunUserFunc(data::File &f, bool consume)
void vector_free(std::vector< Type > &v)
void FlushVectorToFile(std::vector< ValueIn > &v)
Sort and store elements in a file.
A DOpNode is a typed node representing a distributed operation in Thrill.
void MainOp()
Receive elements from other workers.
Reader GetReader(bool consume, size_t prefetch_size=File::default_prefetch_size_)
Get BlockReader or a consuming BlockReader for beginning of File.
DIAMemUse PreOpMemUse() final
Amount of RAM used by PreOp after StartPreOp()
void PreOp(const ValueIn &v)
Send all elements to their designated PEs.
std::deque< data::File > files_
TLX_ATTRIBUTE_ALWAYS_INLINE BlockWriter & Put(const T &x)
Put appends a complete item, or fails with a FullException.
void PushData(bool consume) final
Virtual method for pushing data. Triggers actual pushing in sub-classes.
GroupByNode(const ParentDIA &parent, const KeyExtractor &key_extractor, const GroupFunction &groupby_function, const HashFunction &hash_function=HashFunction())
Constructor for a GroupByNode.
void Close()
Explicitly close the writer.
tag structure for GroupByKey(), and InnerJoin()
HashCrc32< T > hash
Select a hashing method.
bool operator<(const uint_pair &b) const
less-than comparison operator
#define LOG
Default logging method: output if the local debug variable is true.
ValueComparator(const GroupByNode &node)
Writer GetWriter(size_t block_size=default_block_size)
Get BlockWriter.
void StopPreOp(size_t) final
Virtual method for preparing end of PushData.
Context & context_
associated Context
data::BlockPool & block_pool()
the block manager keeps all data blocks moving through the system.