Thrill  0.1
group_to_index.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/api/group_to_index.hpp
3  *
4  * DIANode for a groupby to index operation. Performs the actual groupby
5  * operation
6  *
7  * Part of Project Thrill - http://project-thrill.org
8  *
9  * Copyright (C) 2015 Huyen Chau Nguyen <[email protected]>
10  * Copyright (C) 2017 Tim Zeitz <[email protected]>
11  *
12  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
13  ******************************************************************************/
14 
15 #pragma once
16 #ifndef THRILL_API_GROUP_TO_INDEX_HEADER
17 #define THRILL_API_GROUP_TO_INDEX_HEADER
18 
19 #include <thrill/api/dia.hpp>
20 #include <thrill/api/dop_node.hpp>
23 #include <thrill/common/logger.hpp>
24 
25 #include <tlx/vector_free.hpp>
26 
27 #include <algorithm>
28 #include <functional>
29 #include <type_traits>
30 #include <typeinfo>
31 #include <utility>
32 #include <vector>
33 
34 namespace thrill {
35 namespace api {
36 
37 /*!
38  * \ingroup api_layer
39  */
40 template <typename ValueType,
41  typename KeyExtractor, typename GroupFunction>
42 class GroupToIndexNode final : public DOpNode<ValueType>
43 {
44  static constexpr bool debug = false;
45 
47  using Super::context_;
48 
49  using Key = typename common::FunctionTraits<KeyExtractor>::result_type;
50  using ValueOut = ValueType;
51  using ValueIn =
52  typename common::FunctionTraits<KeyExtractor>::template arg_plain<0>;
53 
    // Comparator object that this node passes to std::sort and to the
    // multiway merge tree. NOTE(review): the class header (listing line 54),
    // the body of operator() (line 60) and the node_ member declaration
    // (line 64) are missing from this listing; presumably elements are
    // ordered by their extracted key -- confirm against the original header.
55  {
56  public:
    //! Binds the comparator to the node it is created by.
57  explicit ValueComparator(const GroupToIndexNode& node) : node_(node) { }
58 
    //! Ordering predicate on two input elements (body not visible here).
59  bool operator () (const ValueIn& a, const ValueIn& b) const {
61  }
62 
63  private:
65  };
66 
67 public:
68  /*!
69  * Constructor for a GroupToIndexNode. Stores key_extractor,
70  * groupby_function, result_size and neutral_element, and registers PreOp
    * at the parent node so that PreOp is called with the parent's output.
    *
    * \param parent DIA whose items are grouped
    * \param key_extractor maps an input item to its key; PreOp asserts that
    * the key is smaller than result_size
    * \param groupby_function called with an iterator over a group and the
    * group's key; its result is pushed to the children
    * \param result_size upper bound (exclusive) for the keys
    * \param neutral_element pushed for every index that has no group
71  */
72  template <typename ParentDIA>
73  GroupToIndexNode(const ParentDIA& parent,
74  const KeyExtractor& key_extractor,
75  const GroupFunction& groupby_function,
76  size_t result_size,
77  const ValueOut& neutral_element)
78  : Super(parent.ctx(), "GroupToIndex",
79  { parent.id() }, { parent.node() }),
80  key_extractor_(key_extractor),
81  groupby_function_(groupby_function),
82  result_size_(result_size),
    // NOTE(review): the arguments of the key_range_ initializer (listing
    // lines 84-85) are missing from this listing.
83  key_range_(
86  neutral_element_(neutral_element) {
87  // Hook PreOp: wrap it in a lambda so it can be appended to the stack
88  auto pre_op_fn = [this](const ValueIn& input) {
89  PreOp(input);
90  };
91  // close the function stack with our pre op and register it at
92  // parent node for output
93  auto lop_chain = parent.stack().push(pre_op_fn).fold();
94  parent.node()->AddChild(this, lop_chain);
95  }
96 
97  //! Send one element to its designated PE: a recipient is computed and the
    //! element is put into that recipient's emitter.
98  void PreOp(const ValueIn& v) {
99  const Key k = key_extractor_(v);
    // keys have to be smaller than result_size_
100  assert(k < result_size_);
    // NOTE(review): the argument line of CalculatePartition (listing line
    // 102) is missing here; presumably the recipient is derived from the
    // key k -- confirm against the original header.
101  const size_t recipient = common::CalculatePartition(
103  assert(recipient < emitters_.size());
104  emitters_[recipient].Put(v);
105  }
106 
107  void StopPreOp(size_t /* parent_index */) final {
108  // data has been pushed during pre-op -> close emitters
109  emitters_.Close();
110  }
111 
112  void Execute() override {
113  MainOp();
114  }
115 
116  void PushData(bool consume) final {
117  sLOG << "GroupToIndexNode::PushData()";
118 
119  const size_t num_runs = files_.size();
120  if (num_runs == 0) {
121  for (size_t index = key_range_.begin; index < key_range_.end; index++) {
122  this->PushItem(neutral_element_);
123  }
124  }
125  else if (num_runs == 1) {
126  // if there's only one run, store it
127  RunUserFunc(files_[0], consume);
128  }
129  else {
130  // otherwise sort all runs using multiway merge
131  std::vector<data::File::Reader> seq;
132  seq.reserve(num_runs);
133 
134  for (size_t t = 0; t < num_runs; ++t)
135  seq.emplace_back(files_[t].GetReader(consume));
136 
137  auto puller = core::make_multiway_merge_tree<ValueIn>(
138  seq.begin(), seq.end(), ValueComparator(*this));
139 
140  size_t curr_index = key_range_.begin;
141  if (puller.HasNext()) {
142  // create iterator to pass to user_function
143  auto user_iterator = GroupByMultiwayMergeIterator<
144  ValueIn, KeyExtractor, ValueComparator>(
145  puller, key_extractor_);
146 
147  while (user_iterator.HasNextForReal()) {
148  assert(user_iterator.GetNextKey() >= curr_index);
149 
150  if (user_iterator.GetNextKey() != curr_index) {
151  // push neutral element as result to callback functions
152  this->PushItem(neutral_element_);
153  }
154  else {
155  // call user function
156  const ValueOut res = groupby_function_(
157  user_iterator, user_iterator.GetNextKey());
158  // push result to callback functions
159  this->PushItem(res);
160  }
161  ++curr_index;
162  }
163  }
164  while (curr_index < key_range_.end) {
165  // push neutral element as result to callback functions
166  this->PushItem(neutral_element_);
167  ++curr_index;
168  }
169  }
170  }
171 
172  void Dispose() override { }
173 
174 private:
175  KeyExtractor key_extractor_;
176  GroupFunction groupby_function_;
177  const size_t result_size_;
180  size_t totalsize_ = 0;
181 
184  std::vector<data::File> files_;
185 
186  void RunUserFunc(data::File& f, bool consume) {
187  auto r = f.GetReader(consume);
188  size_t curr_index = key_range_.begin;
189  if (r.HasNext()) {
190  // create iterator to pass to user_function
191  auto user_iterator = GroupByIterator<
192  ValueIn, KeyExtractor, ValueComparator>(r, key_extractor_);
193  while (user_iterator.HasNextForReal()) {
194  assert(user_iterator.GetNextKey() >= curr_index);
195 
196  if (user_iterator.GetNextKey() != curr_index) {
197  // push neutral element as result to callback functions
198  this->PushItem(neutral_element_);
199  }
200  else {
201  // push result to callback functions
202  this->PushItem(
203  // call user function
204  groupby_function_(user_iterator,
205  user_iterator.GetNextKey()));
206  }
207  ++curr_index;
208  }
209  }
210  while (curr_index < key_range_.end) {
211  // push neutral element as result to callback functions
212  this->PushItem(neutral_element_);
213  ++curr_index;
214  }
215  }
216 
217  //! Sort the given elements and store them as one run in a new file, which
    //! is appended to files_. The vector itself is not cleared here.
218  void FlushVectorToFile(std::vector<ValueIn>& v) {
219  // sort run and store to file
220  std::sort(v.begin(), v.end(), ValueComparator(*this));
221  totalsize_ += v.size();
222 
223  data::File f = context_.GetFile(this);
    // NOTE(review): the declaration of the writer w (listing line 224) is
    // missing from this listing; presumably it is obtained from f --
    // confirm against the original header.
225  for (const ValueIn& e : v) {
226  w.Put(e);
227  }
228  w.Close();
229 
230  files_.emplace_back(std::move(f));
231  }
232 
233  //! Receive elements from other workers.
234  void MainOp() {
235  LOG << "Running GroupBy MainOp";
236 
237  std::vector<ValueIn> incoming;
238 
239  // get incoming elements
240  auto reader = stream_->GetCatReader(/* consume */ true);
241  while (reader.HasNext()) {
242  // if vector is full save to disk
243  if (mem::memory_exceeded) {
244  FlushVectorToFile(incoming);
245  incoming.clear();
246  }
247  // store incoming element
248  incoming.emplace_back(reader.template Next<ValueIn>());
249  }
250  FlushVectorToFile(incoming);
251  tlx::vector_free(incoming);
252 
253  stream_.reset();
254  }
255 };
256 
// Out-of-class definition of a DIA member function template that creates a
// GroupToIndexNode from this DIA and the given arguments and returns it
// wrapped in a DIA<ValueOut>.
// NOTE(review): the line carrying the function name (listing line 261) and
// the right-hand side of the GroupToIndexNode alias (line 278) are missing
// from this listing; the tooltip text at the end of the page names the
// function GroupToIndex -- confirm against the original header.
257 template <typename ValueType, typename Stack>
258 template <typename ValueOut,
259  typename KeyExtractor,
260  typename GroupFunction>
262  const KeyExtractor& key_extractor,
263  const GroupFunction& groupby_function,
264  const size_t result_size,
265  const ValueOut& neutral_element) const {
266 
267  using DOpResult
268  = ValueOut;
269 
    // the first argument of the key extractor, after decay, has to be the
    // item type of this DIA
270  static_assert(
271  std::is_same<
272  typename std::decay<typename common::FunctionTraits<KeyExtractor>
273  ::template arg<0> >::type,
274  ValueType>::value,
275  "KeyExtractor has the wrong input type");
276 
277  using GroupToIndexNode
279 
280  auto node = tlx::make_counting<GroupToIndexNode>(
281  *this, key_extractor, groupby_function, result_size, neutral_element);
282 
283  return DIA<DOpResult>(node);
284 }
285 
286 } // namespace api
287 } // namespace thrill
288 
289 #endif // !THRILL_API_GROUP_TO_INDEX_HEADER
290 
291 /******************************************************************************/
void PreOp(const ValueIn &v)
Send all elements to their designated PEs.
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:34
DIA is the interface between the user and the Thrill framework.
Definition: dia.hpp:141
void PushItem(const ValueType &item) const
Method for derived classes to Push a single item to all children.
Definition: dia_node.hpp:147
void Dispose() override
Virtual clear method. Triggers actual disposing in sub-classes.
const common::Range key_range_
size_t num_workers() const
Global number of workers in the system.
Definition: context.hpp:251
A File is an ordered sequence of Block objects for storing items.
Definition: file.hpp:56
void reset()
release contained pointer, frees object if this is the last reference.
GroupToIndexNode(const ParentDIA &parent, const KeyExtractor &key_extractor, const GroupFunction &groupby_function, size_t result_size, const ValueOut &neutral_element)
Constructor for a GroupToIndexNode.
void StopPreOp(size_t) final
Virtual method for preparing end of PushData.
represents a 1 dimensional range (interval) [begin,end)
Definition: math.hpp:41
BlockWriter contains a temporary Block object into which a) any serializable item can be stored or b)...
std::vector< data::File > files_
bool memory_exceeded
memory limit exceeded indicator
typename common::FunctionTraits< KeyExtractor >::template arg_plain< 0 > ValueIn
void Close()
custom destructor to close writers in a cyclic fashion
Definition: stream_data.cpp:92
void Execute() override
Virtual execution method. Triggers actual computation in sub-classes.
An extra class derived from std::vector<> for delivery of the BlockWriters of a Stream.
Definition: stream_data.hpp:59
data::CatStreamPtr GetNewCatStream(size_t dia_id)
Definition: context.cpp:1209
static Range CalculateLocalRange(size_t global_size, size_t p, size_t i)
Definition: math.hpp:110
void MainOp()
Receive elements from other workers.
int value
Definition: gen_data.py:41
size_t end
end index
Definition: math.hpp:58
size_t my_rank() const
Global rank of this worker among all other workers in the system.
Definition: context.hpp:243
data::File GetFile(size_t dia_id)
Returns a new File object containing a sequence of local Blocks.
Definition: context.hpp:283
void vector_free(std::vector< Type > &v)
Definition: vector_free.hpp:21
static size_t CalculatePartition(size_t global_size, size_t p, size_t k)
Definition: math.hpp:115
typename common::FunctionTraits< KeyExtractor >::result_type Key
A DOpNode is a typed node representing a distributed operation in Thrill.
Definition: dop_node.hpp:32
Reader GetReader(bool consume, size_t prefetch_size=File::default_prefetch_size_)
Get BlockReader or a consuming BlockReader for beginning of File.
Definition: file.cpp:78
void RunUserFunc(data::File &f, bool consume)
TLX_ATTRIBUTE_ALWAYS_INLINE BlockWriter & Put(const T &x)
Put appends a complete item, or fails with a FullException.
size_t begin
begin index
Definition: math.hpp:56
void Close()
Explicitly close the writer.
void FlushVectorToFile(std::vector< ValueIn > &v)
Sort and store elements in a file.
void PushData(bool consume) final
Virtual method for pushing data. Triggers actual pushing in sub-classes.
ValueComparator(const GroupToIndexNode &node)
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:24
auto GroupToIndex(const KeyExtractor &key_extractor, const GroupByFunction &groupby_function, const size_t size, const ValueOut &neutral_element=ValueOut()) const
GroupBy is a DOp, which groups elements of the DIA by its key.
Writer GetWriter(size_t block_size=default_block_size)
Get BlockWriter.
Definition: file.cpp:63
Context & context_
associated Context
Definition: dia_base.hpp:293
bool operator()(const ValueIn &a, const ValueIn &b) const
static constexpr bool debug
data::CatStream::Writers emitters_