16 #ifndef THRILL_API_GROUP_TO_INDEX_HEADER 17 #define THRILL_API_GROUP_TO_INDEX_HEADER 29 #include <type_traits> 40 template <
typename ValueType,
41 typename KeyExtractor,
typename GroupFunction>
42 class GroupToIndexNode final :
public DOpNode<ValueType>
44 static constexpr
bool debug =
false;
49 using Key =
typename common::FunctionTraits<KeyExtractor>::result_type;
52 typename common::FunctionTraits<KeyExtractor>::template arg_plain<0>;
72 template <
typename ParentDIA>
74 const KeyExtractor& key_extractor,
75 const GroupFunction& groupby_function,
78 :
Super(parent.ctx(),
"GroupToIndex",
79 { parent.id() }, { parent.node() }),
88 auto pre_op_fn = [
this](
const ValueIn& input) {
93 auto lop_chain = parent.stack().push(pre_op_fn).fold();
94 parent.node()->AddChild(
this, lop_chain);
117 sLOG <<
"GroupToIndexNode::PushData()";
119 const size_t num_runs =
files_.size();
125 else if (num_runs == 1) {
131 std::vector<data::File::Reader> seq;
132 seq.reserve(num_runs);
134 for (
size_t t = 0; t < num_runs; ++t)
135 seq.emplace_back(
files_[t].GetReader(consume));
137 auto puller = core::make_multiway_merge_tree<ValueIn>(
141 if (puller.HasNext()) {
147 while (user_iterator.HasNextForReal()) {
148 assert(user_iterator.GetNextKey() >= curr_index);
150 if (user_iterator.GetNextKey() != curr_index) {
157 user_iterator, user_iterator.GetNextKey());
188 size_t curr_index = key_range_.
begin;
193 while (user_iterator.HasNextForReal()) {
194 assert(user_iterator.GetNextKey() >= curr_index);
196 if (user_iterator.GetNextKey() != curr_index) {
205 user_iterator.GetNextKey()));
210 while (curr_index < key_range_.
end) {
221 totalsize_ += v.size();
230 files_.emplace_back(std::move(f));
235 LOG <<
"Running GroupBy MainOp";
237 std::vector<ValueIn> incoming;
240 auto reader =
stream_->GetCatReader(
true);
241 while (reader.HasNext()) {
248 incoming.emplace_back(reader.template Next<ValueIn>());
257 template <
typename ValueType,
typename Stack>
259 typename KeyExtractor,
260 typename GroupFunction>
262 const KeyExtractor& key_extractor,
263 const GroupFunction& groupby_function,
264 const size_t result_size,
265 const ValueOut& neutral_element)
const {
272 typename std::decay<
typename common::FunctionTraits<KeyExtractor>
273 ::
template arg<0> >::type,
275 "KeyExtractor has the wrong input type");
280 auto node = tlx::make_counting<GroupToIndexNode>(
281 *
this, key_extractor, groupby_function, result_size, neutral_element);
289 #endif // !THRILL_API_GROUP_TO_INDEX_HEADER void PreOp(const ValueIn &v)
Send all elements to their designated PEs.
#define sLOG
Default logging method: output if the local debug variable is true.
DIA is the interface between the user and the Thrill framework.
void PushItem(const ValueType &item) const
Method for derived classes to Push a single item to all children.
void Dispose() override
Virtual clear method. Triggers actual disposing in sub-classes.
const common::Range key_range_
const size_t result_size_
size_t num_workers() const
Global number of workers in the system.
A File is an ordered sequence of Block objects for storing items.
KeyExtractor key_extractor_
void reset()
release contained pointer, frees object if this is the last reference.
GroupToIndexNode(const ParentDIA &parent, const KeyExtractor &key_extractor, const GroupFunction &groupby_function, size_t result_size, const ValueOut &neutral_element)
Constructor for a GroupToIndexNode.
void StopPreOp(size_t) final
Virtual method for preparing end of PushData.
Represents a one-dimensional range (half-open interval) [begin, end).
BlockWriter contains a temporary Block object into which a) any serializable item can be stored or b)...
std::vector< data::File > files_
bool memory_exceeded
memory limit exceeded indicator
typename common::FunctionTraits< KeyExtractor >::template arg_plain< 0 > ValueIn
void Close()
Custom destructor to close writers in a cyclic fashion.
void Execute() override
Virtual execution method. Triggers actual computation in sub-classes.
data::CatStreamPtr stream_
const GroupToIndexNode & node_
An extra class derived from std::vector<> for delivery of the BlockWriters of a Stream.
data::CatStreamPtr GetNewCatStream(size_t dia_id)
static Range CalculateLocalRange(size_t global_size, size_t p, size_t i)
void MainOp()
Receive elements from other workers.
GroupFunction groupby_function_
size_t my_rank() const
Global rank of this worker among all other workers in the system.
data::File GetFile(size_t dia_id)
Returns a new File object containing a sequence of local Blocks.
void vector_free(std::vector< Type > &v)
static size_t CalculatePartition(size_t global_size, size_t p, size_t k)
typename common::FunctionTraits< KeyExtractor >::result_type Key
A DOpNode is a typed node representing a distributed operation in Thrill.
Reader GetReader(bool consume, size_t prefetch_size=File::default_prefetch_size_)
Get BlockReader or a consuming BlockReader for beginning of File.
ValueOut neutral_element_
void RunUserFunc(data::File &f, bool consume)
TLX_ATTRIBUTE_ALWAYS_INLINE BlockWriter & Put(const T &x)
Put appends a complete item, or fails with a FullException.
void Close()
Explicitly close the writer.
void FlushVectorToFile(std::vector< ValueIn > &v)
Sort and store elements in a file.
void PushData(bool consume) final
Virtual method for pushing data. Triggers actual pushing in sub-classes.
ValueComparator(const GroupToIndexNode &node)
#define LOG
Default logging method: output if the local debug variable is true.
auto GroupToIndex(const KeyExtractor &key_extractor, const GroupByFunction &groupby_function, const size_t size, const ValueOut &neutral_element=ValueOut()) const
GroupBy is a DOp, which groups elements of the DIA by its key.
Writer GetWriter(size_t block_size=default_block_size)
Get BlockWriter.
Context & context_
associated Context
bool operator()(const ValueIn &a, const ValueIn &b) const
static constexpr bool debug
data::CatStream::Writers emitters_