Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
group_to_index.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/api/group_to_index.hpp
3  *
4  * DIANode for a groupby-to-index operation. Performs the actual groupby
5  * operation
6  *
7  * Part of Project Thrill - http://project-thrill.org
8  *
9  * Copyright (C) 2015 Huyen Chau Nguyen <[email protected]>
10  *
11  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
12  ******************************************************************************/
13 
14 #pragma once
15 #ifndef THRILL_API_GROUP_TO_INDEX_HEADER
16 #define THRILL_API_GROUP_TO_INDEX_HEADER
17 
18 #include <thrill/api/dia.hpp>
19 #include <thrill/api/dop_node.hpp>
22 #include <thrill/common/logger.hpp>
23 
24 #include <algorithm>
25 #include <functional>
26 #include <type_traits>
27 #include <typeinfo>
28 #include <utility>
29 #include <vector>
30 
31 namespace thrill {
32 namespace api {
33 
34 /*!
35  * \ingroup api_layer
36  */
37 template <typename ValueType,
38  typename KeyExtractor, typename GroupFunction>
39 class GroupToIndexNode final : public DOpNode<ValueType>
40 {
41  static constexpr bool debug = false;
42 
44  using Super::context_;
45 
46  using Key = typename common::FunctionTraits<KeyExtractor>::result_type;
47  using ValueOut = ValueType;
48  using ValueIn =
49  typename common::FunctionTraits<KeyExtractor>::template arg_plain<0>;
50 
52  {
53  public:
54  explicit ValueComparator(const GroupToIndexNode& node) : node_(node) { }
55 
56  bool operator () (const ValueIn& a, const ValueIn& b) const {
57  return node_.key_extractor_(a) < node_.key_extractor_(b);
58  }
59 
60  private:
62  };
63 
64 public:
65  /*!
66  * Constructor for a GroupToIndexNode. Sets the DataManager, parent, stack,
67  * key_extractor and reduce_function.
68  */
69  template <typename ParentDIA>
70  GroupToIndexNode(const ParentDIA& parent,
71  const KeyExtractor& key_extractor,
72  const GroupFunction& groupby_function,
73  size_t result_size,
74  const ValueOut& neutral_element)
75  : Super(parent.ctx(), "GroupToIndex",
76  { parent.id() }, { parent.node() }),
77  key_extractor_(key_extractor),
78  groupby_function_(groupby_function),
79  result_size_(result_size),
80  key_range_(
83  neutral_element_(neutral_element)
84  {
85  // Hook PreOp
86  auto pre_op_fn = [this](const ValueIn& input) {
87  PreOp(input);
88  };
89  // close the function stack with our pre op and register it at
90  // parent node for output
91  auto lop_chain = parent.stack().push(pre_op_fn).fold();
92  parent.node()->AddChild(this, lop_chain);
93  }
94 
95  //! Send all elements to their designated PEs
96  void PreOp(const ValueIn& v) {
97  const Key k = key_extractor_(v);
98  assert(k < result_size_);
99  const size_t recipient = common::CalculatePartition(
101  assert(recipient < emitters_.size());
102  emitters_[recipient].Put(v);
103  }
104 
105  void StopPreOp(size_t /* id */) final {
106  // data has been pushed during pre-op -> close emitters
107  emitters_.Close();
108  }
109 
110  void Execute() override {
111  MainOp();
112  }
113 
114  void PushData(bool consume) final {
115  sLOG << "GroupToIndexNode::PushData()";
116 
117  const size_t num_runs = files_.size();
118  if (num_runs == 0) {
119  for (size_t index = key_range_.begin; index < key_range_.end; index++) {
120  this->PushItem(neutral_element_);
121  }
122  }
123  else if (num_runs == 1) {
124  // if there's only one run, store it
125  RunUserFunc(files_[0], consume);
126  }
127  else {
128  // otherwise sort all runs using multiway merge
129  std::vector<data::File::Reader> seq;
130  seq.reserve(num_runs);
131 
132  for (size_t t = 0; t < num_runs; ++t)
133  seq.emplace_back(files_[t].GetReader(consume));
134 
135  auto puller = core::make_multiway_merge_tree<ValueIn>(
136  seq.begin(), seq.end(), ValueComparator(*this));
137 
138  size_t curr_index = key_range_.begin;
139  if (puller.HasNext()) {
140  // create iterator to pass to user_function
141  auto user_iterator = GroupByMultiwayMergeIterator<
142  ValueIn, KeyExtractor, ValueComparator>(
143  puller, key_extractor_);
144 
145  while (user_iterator.HasNextForReal()) {
146  assert(user_iterator.GetNextKey() >= curr_index);
147 
148  if (user_iterator.GetNextKey() != curr_index) {
149  // push neutral element as result to callback functions
150  this->PushItem(neutral_element_);
151  }
152  else {
153  // call user function
154  const ValueOut res = groupby_function_(
155  user_iterator, user_iterator.GetNextKey());
156  // push result to callback functions
157  this->PushItem(res);
158  }
159  ++curr_index;
160  }
161  }
162  while (curr_index < key_range_.end) {
163  // push neutral element as result to callback functions
164  this->PushItem(neutral_element_);
165  ++curr_index;
166  }
167  }
168  }
169 
170  void Dispose() override { }
171 
172 private:
173  KeyExtractor key_extractor_;
174  GroupFunction groupby_function_;
175  const size_t result_size_;
178  size_t totalsize_ = 0;
179 
182  std::vector<data::File> files_;
183 
184  void RunUserFunc(data::File& f, bool consume) {
185  auto r = f.GetReader(consume);
186  size_t curr_index = key_range_.begin;
187  if (r.HasNext()) {
188  // create iterator to pass to user_function
189  auto user_iterator = GroupByIterator<
190  ValueIn, KeyExtractor, ValueComparator>(r, key_extractor_);
191  while (user_iterator.HasNextForReal()) {
192  assert(user_iterator.GetNextKey() >= curr_index);
193 
194  if (user_iterator.GetNextKey() != curr_index) {
195  // push neutral element as result to callback functions
196  this->PushItem(neutral_element_);
197  }
198  else {
199  // push result to callback functions
200  this->PushItem(
201  // call user function
202  groupby_function_(user_iterator,
203  user_iterator.GetNextKey()));
204  }
205  ++curr_index;
206  }
207  }
208  while (curr_index < key_range_.end) {
209  // push neutral element as result to callback functions
210  this->PushItem(neutral_element_);
211  ++curr_index;
212  }
213  }
214 
215  //! Sort and store elements in a file
216  void FlushVectorToFile(std::vector<ValueIn>& v) {
217  // sort run and sort to file
218  std::sort(v.begin(), v.end(), ValueComparator(*this));
219  totalsize_ += v.size();
220 
221  data::File f = context_.GetFile(this);
223  for (const ValueIn& e : v) {
224  w.Put(e);
225  }
226  w.Close();
227 
228  files_.emplace_back(std::move(f));
229  }
230 
231  //! Receive elements from other workers.
232  void MainOp() {
233  LOG << "Running GroupBy MainOp";
234 
235  std::vector<ValueIn> incoming;
236 
237  // get incoming elements
238  auto reader = stream_->GetCatReader(/* consume */ true);
239  while (reader.HasNext()) {
240  // if vector is full save to disk
241  if (mem::memory_exceeded) {
242  FlushVectorToFile(incoming);
243  incoming.clear();
244  }
245  // store incoming element
246  incoming.emplace_back(reader.template Next<ValueIn>());
247  }
248  FlushVectorToFile(incoming);
249  std::vector<ValueIn>().swap(incoming);
250 
251  stream_.reset();
252  }
253 };
254 
255 template <typename ValueType, typename Stack>
256 template <typename ValueOut,
257  typename KeyExtractor,
258  typename GroupFunction>
260  const KeyExtractor& key_extractor,
261  const GroupFunction& groupby_function,
262  const size_t result_size,
263  const ValueOut& neutral_element) const {
264 
265  using DOpResult
266  = ValueOut;
267 
268  static_assert(
269  std::is_same<
270  typename std::decay<typename common::FunctionTraits<KeyExtractor>
271  ::template arg<0> >::type,
272  ValueType>::value,
273  "KeyExtractor has the wrong input type");
274 
275  using GroupToIndexNode
277 
278  auto node = tlx::make_counting<GroupToIndexNode>(
279  *this, key_extractor, groupby_function, result_size, neutral_element);
280 
281  return DIA<DOpResult>(node);
282 }
283 
284 } // namespace api
285 } // namespace thrill
286 
287 #endif // !THRILL_API_GROUP_TO_INDEX_HEADER
288 
289 /******************************************************************************/
virtual void Dispose()
Virtual clear method. Triggers actual disposing in sub-classes.
Definition: dia_base.hpp:188
DIA is the interface between the user and the Thrill framework.
Definition: dia.hpp:141
ValueType_ ValueType
Definition: dia.hpp:152
const common::Range key_range_
size_t my_rank() const
Global rank of this worker among all other workers in the system.
Definition: context.hpp:240
A File is an ordered sequence of Block objects for storing items.
Definition: file.hpp:56
void reset()
release contained pointer, frees object if this is the last reference.
GroupToIndexNode(const ParentDIA &parent, const KeyExtractor &key_extractor, const GroupFunction &groupby_function, size_t result_size, const ValueOut &neutral_element)
Constructor for a GroupToIndexNode.
represents a 1 dimensional range (interval) [begin,end)
Definition: math.hpp:40
BlockWriter contains a temporary Block object into which a) any serializable item can be stored or b)...
std::vector< data::File > files_
bool memory_exceeded
memory limit exceeded indicator
virtual void PushData(bool consume)=0
Virtual method for pushing data. Triggers actual pushing in sub-classes.
typename common::FunctionTraits< KeyExtractor >::template arg_plain< 0 > ValueIn
void Close()
custom destructor to close writers in a cyclic fashion
Definition: stream.cpp:60
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:184
An extra class derived from std::vector<> for delivery of the BlockWriters of a Stream.
Definition: stream_data.hpp:56
data::CatStreamPtr GetNewCatStream(size_t dia_id)
Definition: context.cpp:1144
void PushItem(const ValueType &item) const
Method for derived classes to Push a single item to all children.
Definition: dia_node.hpp:147
static Range CalculateLocalRange(size_t global_size, size_t p, size_t i)
Definition: math.hpp:109
virtual void StopPreOp(size_t)
Virtual method for preparing end of PushData.
Definition: dia_base.hpp:173
void swap(CountingPtr< A, D > &a1, CountingPtr< A, D > &a2) noexcept
void MainOp()
Receive elements from other workers.
virtual void Execute()=0
Virtual execution method. Triggers actual computation in sub-classes.
int value
Definition: gen_data.py:41
size_t end
end index
Definition: math.hpp:57
data::File GetFile(size_t dia_id)
Returns a new File object containing a sequence of local Blocks.
Definition: context.hpp:280
auto GroupToIndex(const KeyExtractor &key_extractor, const GroupByFunction &groupby_function, const size_t size, const ValueOut &neutral_element=ValueOut()) const
GroupToIndex is a DOp, which groups elements of the DIA by their index key.
static size_t CalculatePartition(size_t global_size, size_t p, size_t k)
Definition: math.hpp:114
Reader GetReader(bool consume, size_t num_prefetch=File::default_prefetch)
Get BlockReader or a consuming BlockReader for beginning of File.
Definition: file.cpp:78
typename common::FunctionTraits< KeyExtractor >::result_type Key
A DOpNode is a typed node representing a distributed operation in Thrill.
Definition: dop_node.hpp:32
void RunUserFunc(data::File &f, bool consume)
TLX_ATTRIBUTE_ALWAYS_INLINE BlockWriter & Put(const T &x)
Put appends a complete item, or fails with a FullException.
size_t begin
begin index
Definition: math.hpp:55
void Close()
Explicitly close the writer.
size_t num_workers() const
Global number of workers in the system.
Definition: context.hpp:248
bool operator()(const ValueIn &a, const ValueIn &b) const
void FlushVectorToFile(std::vector< ValueIn > &v)
Sort and store elements in a file.
ValueComparator(const GroupToIndexNode &node)
Writer GetWriter(size_t block_size=default_block_size)
Get BlockWriter.
Definition: file.cpp:63
Context & context_
associated Context
Definition: dia_base.hpp:293
static constexpr bool debug
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:172
data::CatStream::Writers emitters_