Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
reduce_by_index_post_phase.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/core/reduce_by_index_post_phase.hpp
3  *
4  * Index table (items addressed directly by their index) with support for reduce.
5  *
6  * Part of Project Thrill - http://project-thrill.org
7  *
8  * Copyright (C) 2016 Timo Bingmann <[email protected]>
9  * Copyright (C) 2017 Tim Zeitz <[email protected]>
10  *
11  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
12  ******************************************************************************/
13 
14 #pragma once
15 #ifndef THRILL_CORE_REDUCE_BY_INDEX_POST_PHASE_HEADER
16 #define THRILL_CORE_REDUCE_BY_INDEX_POST_PHASE_HEADER
17 
18 #include <thrill/api/context.hpp>
19 #include <thrill/common/logger.hpp>
23 #include <thrill/data/file.hpp>
24 
25 #include <algorithm>
26 #include <cassert>
27 #include <cmath>
28 #include <functional>
29 #include <string>
30 #include <utility>
31 #include <vector>
32 
33 namespace thrill {
34 namespace core {
35 
 //! Post-phase table of a reduce-by-index operation: each item is placed into
 //! a dense array slot addressed by (index - range begin), and items sharing
 //! an index are combined with the reduce function. Slots that receive no
 //! item hold a neutral element. If the index range does not fit into the
 //! memory limit, only the first subrange is kept in memory; the remaining
 //! subranges are spilled to Files and reduced later by sub-tables.
 //!
 //! NOTE(review): VolatileKey is only forwarded to MakeTableItem /
 //! PhaseEmitter here; its exact meaning is defined there — confirm.
36 template <typename TableItem, typename Key, typename Value,
37  typename KeyExtractor, typename ReduceFunction, typename Emitter,
38  const bool VolatileKey,
39  typename ReduceConfig_ = DefaultReduceConfig>
41 {
42  static constexpr bool debug = false; // enables LOG output of this class
43 
44 public:
45  using ReduceConfig = ReduceConfig_;
48  TableItem, Value, Emitter, VolatileKey>;
49 
50  /*!
51  * A data structure which takes an arbitrary value and extracts an index
52  * using a key extractor function from that value. Afterwards, values with
53  * the same index are merged together
54  */
56  Context& ctx,
57  size_t dia_id,
58  const KeyExtractor& key_extractor,
59  const ReduceFunction& reduce_function,
60  const Emitter& emitter,
61  const ReduceConfig& config = ReduceConfig(),
62  const Value& neutral_element = Value())
63  : ctx_(ctx), dia_id_(dia_id),
64  key_extractor_(key_extractor), reduce_function_(reduce_function),
65  config_(config), emitter_(emitter),
66  neutral_element_(neutral_element) { }
67 
68  //! non-copyable: delete copy-constructor
70  //! non-copyable: delete assignment operator
72 
73  //! Sets the range of indexes to be handled by this index table
74  void SetRange(const common::Range& range) {
75  range_ = range;
76  full_range_ = range;
77  }
78 
 //! Allocates the in-memory item array for range_, pre-filled with the
 //! neutral item. If the whole range fits into limit_memory_bytes it is kept
 //! in memory entirely (num_subranges_ = 0). Otherwise the full range is split
 //! into num_subranges_ parts: the first part stays in memory and every other
 //! part gets its own spill File and Writer.
 //! \param limit_memory_bytes memory budget for the item array; also reused
 //!        for subtables in PushData().
 //! NOTE(review): if limit_memory_bytes == 0, the else-branch divides by zero
 //! (even for an empty range, since 0 < 0 is false) — confirm callers never
 //! pass 0.
79  void Initialize(size_t limit_memory_bytes) {
80  assert(range_.IsValid() || range_.IsEmpty());
81  limit_memory_bytes_ = limit_memory_bytes;
82 
 // the neutral item fills every empty slot; remember the index it maps to
 // so Insert() can tell a real item at that index from the filler
83  TableItem neutral =
84  MakeTableItem::Make(neutral_element_, key_extractor_);
85  neutral_element_key_ = key(neutral);
86 
87  if (range_.size() * sizeof(TableItem) < limit_memory_bytes) {
88  num_subranges_ = 0;
89 
90  // all good, we can store the whole index range
91  items_.resize(range_.size(), neutral);
92 
93  LOG << "ReduceByIndexPostPhase()"
94  << " limit_memory_bytes_=" << limit_memory_bytes_
95  << " num_subranges_=" << 0
96  << " range_=" << range_;
97  }
98  else {
99  // we have to outsource some subranges
101  1 + (range_.size() * sizeof(TableItem) / limit_memory_bytes);
102  // we keep the first subrange in memory and only the other ones go
103  // into a file
105 
106  items_.resize(range_.size(), neutral);
107 
108  LOG << "ReduceByIndexPostPhase()"
109  << " limit_memory_bytes_=" << limit_memory_bytes_
110  << " num_subranges_=" << num_subranges_
111  << " full_range_=" << full_range_
112  << " range_=" << range_
113  << " range_.size()=" << range_.size();
114 
115  subranges_.reserve(num_subranges_ - 1);
116  subrange_files_.reserve(num_subranges_ - 1);
117  subrange_writers_.reserve(num_subranges_ - 1);
118 
 // one spill File + Writer for each partition 1 .. num_subranges_-1
119  for (size_t partition = 1; partition < num_subranges_; partition++) {
120  auto file = ctx_.GetFilePtr(dia_id_);
121  auto writer = file->GetWriter();
122  common::Range subrange =
123  full_range_.Partition(partition, num_subranges_);
124 
125  LOG << "ReduceByIndexPostPhase()"
126  << " partition=" << partition
127  << " subrange=" << subrange;
128 
129  subranges_.emplace_back(subrange);
130  subrange_files_.emplace_back(std::move(file));
131  subrange_writers_.emplace_back(std::move(writer));
132  }
133  }
134  }
135 
 //! Inserts one item into the table.
 //! Items whose index lies in the in-memory range_ are stored in, or reduced
 //! into, items_. All other items are appended to the spill File of the
 //! subrange containing their index and are reduced later in PushData().
 //! \return true if the item filled a previously empty slot; false if it was
 //! reduced into an existing item or written to a spill File.
136  bool Insert(const TableItem& kv) {
137  size_t item_key = key(kv);
138  assert(item_key >= full_range_.begin && item_key < full_range_.end);
139 
140  LOG << "Insert() item_key=" << item_key
141  << " full_range_=" << full_range_
142  << " range_" << range_;
143 
144  if (item_key < range_.end) {
145  // item is in the main (in-memory) range; presumably range_.begin == full_range_.begin (range_ is the first subrange)
146  size_t offset = item_key - full_range_.begin;
147 
148  if (item_key != neutral_element_key_) {
149  // normal index: empty slots hold the neutral item, whose key differs from item_key, so a matching key means the slot is occupied
150  if (key(items_[offset]) == item_key) {
151  items_[offset] = reduce(items_[offset], kv);
152  return false;
153  }
154  else {
155  items_[offset] = kv;
156  return true;
157  }
158  }
159  else {
160  // special handling for element with neutral index: the neutral filler has this key too, so occupancy is tracked separately (neutral_element_index_occupied_)
162  items_[offset] = reduce(items_[offset], kv);
163  return false;
164  }
165  else {
166  items_[offset] = kv;
168  return true;
169  }
170  }
171  }
172  else {
173  // item has to be stored in an overflow File; subranges_ holds partitions 1..num_subranges_-1, hence the -1
174  size_t r = full_range_.FindPartition(item_key, num_subranges_) - 1;
175 
176  const common::Range& subrange = subranges_.at(r);
177  data::File::Writer& writer = subrange_writers_.at(r);
178 
179  LOG << "Insert() item_key=" << item_key
180  << " r=" << r
181  << " subrange=" << subrange;
182 
183  assert(item_key >= subrange.begin && item_key < subrange.end);
184  writer.Put(kv);
185  return false;
186  }
187  }
188 
189  void PushData(bool consume = false, data::File::Writer* pwriter = nullptr) {
190  assert(!pwriter || consume);
191 
192  if (cache_) {
193  // previous PushData() has stored data in cache_
194  data::File::Reader reader = cache_->GetReader(consume);
195  while (reader.HasNext())
196  emitter_.Emit(reader.Next<TableItem>());
197  return;
198  }
199 
200  if (!consume) {
201  if (subranges_.empty()) {
202  Flush();
203  }
204  else {
205  data::FilePtr cache = ctx_.GetFilePtr(dia_id_);
206  data::File::Writer writer = cache_->GetWriter();
207  PushData(true, &writer);
208  cache_ = cache;
209  writer.Close();
210  }
211  return;
212  }
213 
214  // close File writers
215  for (auto& w : subrange_writers_) {
216  w.Close();
217  }
218 
219  if (pwriter) {
220  FlushAndConsume<true>(pwriter);
221  }
222  else {
223  FlushAndConsume();
224  }
225 
226  for (size_t i = 0; i < subranges_.size(); ++i) {
227  ReduceByIndexPostPhase<TableItem, Key, Value, KeyExtractor,
228  ReduceFunction, Emitter, VolatileKey,
229  ReduceConfig>
232  subtable.SetRange(subranges_[i]);
233  subtable.Initialize(limit_memory_bytes_);
234 
235  {
236  // insert items
237  auto reader = subrange_files_[i]->GetConsumeReader();
238  while (reader.HasNext()) {
239  subtable.Insert(reader.template Next<TableItem>());
240  }
241  }
242 
243  subtable.PushData(consume || pwriter, pwriter);
244 
245  // delete File
246  subrange_files_[i].reset();
247  }
248  }
249 
250  void Dispose() {
251  std::vector<TableItem>().swap(items_);
252 
253  std::vector<common::Range>().swap(subranges_);
254  std::vector<data::FilePtr>().swap(subrange_files_);
255  std::vector<data::File::Writer>().swap(subrange_writers_);
256  }
257 
258 private:
259  void Flush() {
260  for (auto it = items_.begin(); it != items_.end(); ++it) {
261  emitter_.Emit(*it);
262  }
263  }
264 
 //! Emits every slot of items_ in index order and then frees the array.
 //! \tparam DoCache if true, each emitted item is also written to *writer.
 //! \param writer must be non-null when DoCache is true (it is dereferenced
 //!        without a check).
265  template <bool DoCache = false>
266  void FlushAndConsume(data::File::Writer* writer = nullptr) {
267  for (auto it = items_.begin(); it != items_.end(); ++it) {
268  emitter_.Emit(*it);
269  if (DoCache) { writer->Put(*it); }
270  }
272  // free array (swapping with an empty vector also releases the capacity)
273  std::vector<TableItem>().swap(items_);
274  }
275 
276  Key key(const TableItem& t) {
277  return MakeTableItem::GetKey(t, key_extractor_);
278  }
279 
280  TableItem reduce(const TableItem& a, const TableItem& b) {
281  return MakeTableItem::Reduce(a, b, reduce_function_);
282  }
283 
284  //! Context, used to allocate the spill and cache Files
285  Context& ctx_;
286 
287  //! Associated DIA id, passed to GetFilePtr() for every File created here
288  size_t dia_id_;
289 
290  //! Key extractor function for extracting a key from a value.
291  KeyExtractor key_extractor_;
292 
293  //! Reduce function for reducing two values.
294  ReduceFunction reduce_function_;
295 
296  //! Stored reduce config to initialize the subtable.
298 
299  //! Emitter receiving this table's output items for the next DIA node.
301 
302  //! neutral element to fill holes in output
304 
305  //! Memory limit in bytes for the in-memory item array (also used for subtables)
307 
308  //! The index where the neutral element would go if actually inserted
310 
311  //! Is there an actual element at the index of the neutral element?
313 
314  //! Range of indexes actually managed in this instance -
315  //! not including subranges
317 
318  //! Full range of indexes actually managed in this instance -
319  //! including subranges
321 
322  //! Store for items in range of this worker.
323  //! Stored in index order: items_[i] holds the item with index full_range_.begin + i.
324  std::vector<TableItem> items_;
325 
326  //! number of subranges
328 
329  //! Subranges
330  std::vector<common::Range> subranges_;
331 
332  //! Subranges external Files
333  std::vector<data::FilePtr> subrange_files_;
334 
335  //! Subranges external File Writers
336  std::vector<data::File::Writer> subrange_writers_;
337 
338  //! File caching the output of a non-consuming PushData() when subranges exist, so it can be emitted again.
340 };
341 
342 } // namespace core
343 } // namespace thrill
344 
345 #endif // !THRILL_CORE_REDUCE_BY_INDEX_POST_PHASE_HEADER
346 
347 /******************************************************************************/
size_t FindPartition(size_t index, size_t parts) const
Definition: math.hpp:98
bool IsEmpty() const
range is empty (begin == end)
Definition: math.hpp:66
TableItem reduce(const TableItem &a, const TableItem &b)
size_t limit_memory_bytes_
Memory limit in bytes for the in-memory part of the table.
void SetRange(const common::Range &range)
Sets the range of indexes to be handled by this index table.
bool IsValid() const
valid non-empty range (begin < end)
Definition: math.hpp:68
std::vector< data::FilePtr > subrange_files_
Subranges external Files.
void PushData(bool consume=false, data::File::Writer *pwriter=nullptr)
Emitter emit_
Set of emitters, one per partition.
Value neutral_element_
neutral element to fill holes in output
void Initialize(size_t limit_memory_bytes)
TLX_ATTRIBUTE_ALWAYS_INLINE T Next()
Next() reads a complete item T.
represents a 1 dimensional range (interval) [begin,end)
Definition: math.hpp:41
BlockWriter contains a temporary Block object into which a) any serializable item can be stored or b)...
PhaseEmitter emitter_
Emitters used to parameterize hash table for output to next DIA node.
ReduceByIndexPostPhase(Context &ctx, size_t dia_id, const KeyExtractor &key_extractor, const ReduceFunction &reduce_function, const Emitter &emitter, const ReduceConfig &config=ReduceConfig(), const Value &neutral_element=Value())
A data structure which takes an arbitrary value and extracts an index using a key extractor function ...
KeyExtractor key_extractor_
Key extractor function for extracting a key from a value.
size_t neutral_element_key_
The index where the neutral element would go if actually inserted.
ReduceConfig config_
Stored reduce config to initialize the subtable.
void swap(CountingPtr< A, D > &a1, CountingPtr< A, D > &a2) noexcept
BlockReader takes Block objects from BlockSource and allows reading of a) serializable Items or b) ar...
size_t end
end index
Definition: math.hpp:58
std::vector< common::Range > subranges_
Subranges.
ReduceFunction reduce_function_
Reduce function for reducing two values.
std::vector< data::File::Writer > subrange_writers_
Subranges external File Writers.
TLX_ATTRIBUTE_ALWAYS_INLINE BlockWriter & Put(const T &x)
Put appends a complete item, or fails with a FullException.
Range Partition(size_t i, size_t parts) const
Definition: math.hpp:85
bool neutral_element_index_occupied_
Is there an actual element at the index of the neutral element?
size_t begin
begin index
Definition: math.hpp:56
void Close()
Explicitly close the writer.
TLX_ATTRIBUTE_ALWAYS_INLINE bool HasNext()
HasNext() returns true if at least one more item is available.
data::FilePtr cache_
File for storing data in case we need multiple re-reduce levels.
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:24
ReduceByIndexPostPhase & operator=(const ReduceByIndexPostPhase &)=delete
non-copyable: delete assignment operator
void FlushAndConsume(data::File::Writer *writer=nullptr)
size_t size() const
size of range
Definition: math.hpp:63