Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
group_to_index.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/api/group_to_index.hpp
3  *
4  * DIANode for a groupby to index operation. Performs the actual groupby
5  * operation
6  *
7  * Part of Project Thrill - http://project-thrill.org
8  *
9  * Copyright (C) 2015 Huyen Chau Nguyen <[email protected]>
10  * Copyright (C) 2017 Tim Zeitz <[email protected]>
11  *
12  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
13  ******************************************************************************/
14 
15 #pragma once
16 #ifndef THRILL_API_GROUP_TO_INDEX_HEADER
17 #define THRILL_API_GROUP_TO_INDEX_HEADER
18 
19 #include <thrill/api/dia.hpp>
20 #include <thrill/api/dop_node.hpp>
23 #include <thrill/common/logger.hpp>
24 
25 #include <algorithm>
26 #include <functional>
27 #include <type_traits>
28 #include <typeinfo>
29 #include <utility>
30 #include <vector>
31 
32 namespace thrill {
33 namespace api {
34 
35 /*!
36  * \ingroup api_layer
37  */
38 template <typename ValueType,
39  typename KeyExtractor, typename GroupFunction>
40 class GroupToIndexNode final : public DOpNode<ValueType>
41 {
42  static constexpr bool debug = false;
43 
45  using Super::context_;
46 
47  using Key = typename common::FunctionTraits<KeyExtractor>::result_type;
48  using ValueOut = ValueType;
49  using ValueIn =
50  typename common::FunctionTraits<KeyExtractor>::template arg_plain<0>;
51 
53  {
54  public:
55  explicit ValueComparator(const GroupToIndexNode& node) : node_(node) { }
56 
57  bool operator () (const ValueIn& a, const ValueIn& b) const {
58  return node_.key_extractor_(a) < node_.key_extractor_(b);
59  }
60 
61  private:
63  };
64 
65 public:
66  /*!
67  * Constructor for a GroupToIndexNode. Sets the DataManager, parent, stack,
68  * key_extractor and reduce_function.
69  */
70  template <typename ParentDIA>
71  GroupToIndexNode(const ParentDIA& parent,
72  const KeyExtractor& key_extractor,
73  const GroupFunction& groupby_function,
74  size_t result_size,
75  const ValueOut& neutral_element)
76  : Super(parent.ctx(), "GroupToIndex",
77  { parent.id() }, { parent.node() }),
78  key_extractor_(key_extractor),
79  groupby_function_(groupby_function),
80  result_size_(result_size),
81  key_range_(
84  neutral_element_(neutral_element)
85  {
86  // Hook PreOp
87  auto pre_op_fn = [this](const ValueIn& input) {
88  PreOp(input);
89  };
90  // close the function stack with our pre op and register it at
91  // parent node for output
92  auto lop_chain = parent.stack().push(pre_op_fn).fold();
93  parent.node()->AddChild(this, lop_chain);
94  }
95 
96  //! Send all elements to their designated PEs
97  void PreOp(const ValueIn& v) {
98  const Key k = key_extractor_(v);
99  assert(k < result_size_);
100  const size_t recipient = common::CalculatePartition(
102  assert(recipient < emitters_.size());
103  emitters_[recipient].Put(v);
104  }
105 
106  void StopPreOp(size_t /* id */) final {
107  // data has been pushed during pre-op -> close emitters
108  emitters_.Close();
109  }
110 
111  void Execute() override {
112  MainOp();
113  }
114 
115  void PushData(bool consume) final {
116  sLOG << "GroupToIndexNode::PushData()";
117 
118  const size_t num_runs = files_.size();
119  if (num_runs == 0) {
120  for (size_t index = key_range_.begin; index < key_range_.end; index++) {
121  this->PushItem(neutral_element_);
122  }
123  }
124  else if (num_runs == 1) {
125  // if there's only one run, store it
126  RunUserFunc(files_[0], consume);
127  }
128  else {
129  // otherwise sort all runs using multiway merge
130  std::vector<data::File::Reader> seq;
131  seq.reserve(num_runs);
132 
133  for (size_t t = 0; t < num_runs; ++t)
134  seq.emplace_back(files_[t].GetReader(consume));
135 
136  auto puller = core::make_multiway_merge_tree<ValueIn>(
137  seq.begin(), seq.end(), ValueComparator(*this));
138 
139  size_t curr_index = key_range_.begin;
140  if (puller.HasNext()) {
141  // create iterator to pass to user_function
142  auto user_iterator = GroupByMultiwayMergeIterator<
143  ValueIn, KeyExtractor, ValueComparator>(
144  puller, key_extractor_);
145 
146  while (user_iterator.HasNextForReal()) {
147  assert(user_iterator.GetNextKey() >= curr_index);
148 
149  if (user_iterator.GetNextKey() != curr_index) {
150  // push neutral element as result to callback functions
151  this->PushItem(neutral_element_);
152  }
153  else {
154  // call user function
155  const ValueOut res = groupby_function_(
156  user_iterator, user_iterator.GetNextKey());
157  // push result to callback functions
158  this->PushItem(res);
159  }
160  ++curr_index;
161  }
162  }
163  while (curr_index < key_range_.end) {
164  // push neutral element as result to callback functions
165  this->PushItem(neutral_element_);
166  ++curr_index;
167  }
168  }
169  }
170 
171  void Dispose() override { }
172 
173 private:
174  KeyExtractor key_extractor_;
175  GroupFunction groupby_function_;
176  const size_t result_size_;
179  size_t totalsize_ = 0;
180 
183  std::vector<data::File> files_;
184 
185  void RunUserFunc(data::File& f, bool consume) {
186  auto r = f.GetReader(consume);
187  size_t curr_index = key_range_.begin;
188  if (r.HasNext()) {
189  // create iterator to pass to user_function
190  auto user_iterator = GroupByIterator<
191  ValueIn, KeyExtractor, ValueComparator>(r, key_extractor_);
192  while (user_iterator.HasNextForReal()) {
193  assert(user_iterator.GetNextKey() >= curr_index);
194 
195  if (user_iterator.GetNextKey() != curr_index) {
196  // push neutral element as result to callback functions
197  this->PushItem(neutral_element_);
198  }
199  else {
200  // push result to callback functions
201  this->PushItem(
202  // call user function
203  groupby_function_(user_iterator,
204  user_iterator.GetNextKey()));
205  }
206  ++curr_index;
207  }
208  }
209  while (curr_index < key_range_.end) {
210  // push neutral element as result to callback functions
211  this->PushItem(neutral_element_);
212  ++curr_index;
213  }
214  }
215 
216  //! Sort and store elements in a file
217  void FlushVectorToFile(std::vector<ValueIn>& v) {
218  // sort run and sort to file
219  std::sort(v.begin(), v.end(), ValueComparator(*this));
220  totalsize_ += v.size();
221 
222  data::File f = context_.GetFile(this);
224  for (const ValueIn& e : v) {
225  w.Put(e);
226  }
227  w.Close();
228 
229  files_.emplace_back(std::move(f));
230  }
231 
232  //! Receive elements from other workers.
233  void MainOp() {
234  LOG << "Running GroupBy MainOp";
235 
236  std::vector<ValueIn> incoming;
237 
238  // get incoming elements
239  auto reader = stream_->GetCatReader(/* consume */ true);
240  while (reader.HasNext()) {
241  // if vector is full save to disk
242  if (mem::memory_exceeded) {
243  FlushVectorToFile(incoming);
244  incoming.clear();
245  }
246  // store incoming element
247  incoming.emplace_back(reader.template Next<ValueIn>());
248  }
249  FlushVectorToFile(incoming);
250  std::vector<ValueIn>().swap(incoming);
251 
252  stream_.reset();
253  }
254 };
255 
256 template <typename ValueType, typename Stack>
257 template <typename ValueOut,
258  typename KeyExtractor,
259  typename GroupFunction>
261  const KeyExtractor& key_extractor,
262  const GroupFunction& groupby_function,
263  const size_t result_size,
264  const ValueOut& neutral_element) const {
265 
266  using DOpResult
267  = ValueOut;
268 
269  static_assert(
270  std::is_same<
271  typename std::decay<typename common::FunctionTraits<KeyExtractor>
272  ::template arg<0> >::type,
273  ValueType>::value,
274  "KeyExtractor has the wrong input type");
275 
276  using GroupToIndexNode
278 
279  auto node = tlx::make_counting<GroupToIndexNode>(
280  *this, key_extractor, groupby_function, result_size, neutral_element);
281 
282  return DIA<DOpResult>(node);
283 }
284 
285 } // namespace api
286 } // namespace thrill
287 
288 #endif // !THRILL_API_GROUP_TO_INDEX_HEADER
289 
290 /******************************************************************************/
virtual void Dispose()
Virtual clear method. Triggers actual disposing in sub-classes.
Definition: dia_base.hpp:188
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:34
DIA is the interface between the user and the Thrill framework.
Definition: dia.hpp:141
ValueType_ ValueType
Definition: dia.hpp:152
const common::Range key_range_
size_t my_rank() const
Global rank of this worker among all other workers in the system.
Definition: context.hpp:240
A File is an ordered sequence of Block objects for storing items.
Definition: file.hpp:56
void reset()
release contained pointer, frees object if this is the last reference.
GroupToIndexNode(const ParentDIA &parent, const KeyExtractor &key_extractor, const GroupFunction &groupby_function, size_t result_size, const ValueOut &neutral_element)
Constructor for a GroupToIndexNode.
represents a 1 dimensional range (interval) [begin,end)
Definition: math.hpp:41
BlockWriter contains a temporary Block object into which a) any serializable item can be stored or b)...
std::vector< data::File > files_
bool memory_exceeded
memory limit exceeded indicator
virtual void PushData(bool consume)=0
Virtual method for pushing data. Triggers actual pushing in sub-classes.
typename common::FunctionTraits< KeyExtractor >::template arg_plain< 0 > ValueIn
void Close()
custom destructor to close writers in a cyclic fashion
Definition: stream_data.cpp:66
An extra class derived from std::vector<> for delivery of the BlockWriters of a Stream.
Definition: stream_data.hpp:56
data::CatStreamPtr GetNewCatStream(size_t dia_id)
Definition: context.cpp:1120
void PushItem(const ValueType &item) const
Method for derived classes to Push a single item to all children.
Definition: dia_node.hpp:147
static Range CalculateLocalRange(size_t global_size, size_t p, size_t i)
Definition: math.hpp:110
virtual void StopPreOp(size_t)
Virtual method for preparing end of PushData.
Definition: dia_base.hpp:173
void swap(CountingPtr< A, D > &a1, CountingPtr< A, D > &a2) noexcept
void MainOp()
Receive elements from other workers.
virtual void Execute()=0
Virtual execution method. Triggers actual computation in sub-classes.
int value
Definition: gen_data.py:41
size_t end
end index
Definition: math.hpp:58
data::File GetFile(size_t dia_id)
Returns a new File object containing a sequence of local Blocks.
Definition: context.hpp:280
auto GroupToIndex(const KeyExtractor &key_extractor, const GroupByFunction &groupby_function, const size_t size, const ValueOut &neutral_element=ValueOut()) const
GroupBy is a DOp, which groups elements of the DIA by their key.
static size_t CalculatePartition(size_t global_size, size_t p, size_t k)
Definition: math.hpp:115
typename common::FunctionTraits< KeyExtractor >::result_type Key
A DOpNode is a typed node representing distributed operations in Thrill.
Definition: dop_node.hpp:32
Reader GetReader(bool consume, size_t prefetch_size=File::default_prefetch_size_)
Get BlockReader or a consuming BlockReader for beginning of File.
Definition: file.cpp:78
void RunUserFunc(data::File &f, bool consume)
TLX_ATTRIBUTE_ALWAYS_INLINE BlockWriter & Put(const T &x)
Put appends a complete item, or fails with a FullException.
size_t begin
begin index
Definition: math.hpp:56
void Close()
Explicitly close the writer.
size_t num_workers() const
Global number of workers in the system.
Definition: context.hpp:248
bool operator()(const ValueIn &a, const ValueIn &b) const
void FlushVectorToFile(std::vector< ValueIn > &v)
Sort and store elements in a file.
ValueComparator(const GroupToIndexNode &node)
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:24
Writer GetWriter(size_t block_size=default_block_size)
Get BlockWriter.
Definition: file.cpp:63
Context & context_
associated Context
Definition: dia_base.hpp:293
static constexpr bool debug
data::CatStream::Writers emitters_