Thrill  0.1
reduce_by_index_post_phase.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/core/reduce_by_index_post_phase.hpp
3  *
4  * Dense index table (one slot per key in the managed range) with support for reduce.
5  *
6  * Part of Project Thrill - http://project-thrill.org
7  *
8  * Copyright (C) 2016 Timo Bingmann <[email protected]>
9  * Copyright (C) 2017 Tim Zeitz <[email protected]>
10  *
11  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
12  ******************************************************************************/
13 
14 #pragma once
15 #ifndef THRILL_CORE_REDUCE_BY_INDEX_POST_PHASE_HEADER
16 #define THRILL_CORE_REDUCE_BY_INDEX_POST_PHASE_HEADER
17 
18 #include <thrill/api/context.hpp>
19 #include <thrill/common/logger.hpp>
23 #include <thrill/data/file.hpp>
24 
25 #include <tlx/vector_free.hpp>
26 
27 #include <algorithm>
28 #include <cassert>
29 #include <cmath>
30 #include <functional>
31 #include <string>
32 #include <utility>
33 #include <vector>
34 
35 namespace thrill {
36 namespace core {
37 
38 template <typename TableItem, typename Key, typename Value,
39  typename KeyExtractor, typename ReduceFunction, typename Emitter,
40  const bool VolatileKey,
41  typename ReduceConfig_ = DefaultReduceConfig>
43 {
44  static constexpr bool debug = false;
45 
46 public:
47  using ReduceConfig = ReduceConfig_;
50  TableItem, Value, Emitter, VolatileKey>;
51 
52  /*!
53  * A data structure which takes an arbitrary value and extracts an index
54  * using a key extractor function from that value. Afterwards, values with
55  * the same index are merged together using the reduce function.
56  */
58  Context& ctx,
59  size_t dia_id,
60  const KeyExtractor& key_extractor,
61  const ReduceFunction& reduce_function,
62  const Emitter& emitter,
63  const ReduceConfig& config = ReduceConfig(),
64  const Value& neutral_element = Value())
65  : ctx_(ctx), dia_id_(dia_id),
66  key_extractor_(key_extractor), reduce_function_(reduce_function),
67  config_(config), emitter_(emitter),
68  neutral_element_(neutral_element) { }
69 
70  //! non-copyable: delete copy-constructor
72  //! non-copyable: delete assignment operator
74 
75  //! Sets the range of indexes to be handled by this index table
76  void SetRange(const common::Range& range) {
77  range_ = range;
78  full_range_ = range;
79  }
80 
81  void Initialize(size_t limit_memory_bytes) {
82  assert(range_.IsValid() || range_.IsEmpty());
83  limit_memory_bytes_ = limit_memory_bytes;
84 
85  TableItem neutral =
86  MakeTableItem::Make(neutral_element_, key_extractor_);
87  neutral_element_key_ = key(neutral);
88 
89  if (range_.size() * sizeof(TableItem) < limit_memory_bytes) {
90  num_subranges_ = 0;
91 
92  // all good, we can store the whole index range
93  items_.resize(range_.size(), neutral);
94 
95  LOG << "ReduceByIndexPostPhase()"
96  << " limit_memory_bytes_=" << limit_memory_bytes_
97  << " num_subranges_=" << 0
98  << " range_=" << range_;
99  }
100  else {
101  // we have to outsource some subranges
103  1 + (range_.size() * sizeof(TableItem) / limit_memory_bytes);
104  // we keep the first subrange in memory and only the other ones go
105  // into a file
107 
108  items_.resize(range_.size(), neutral);
109 
110  LOG << "ReduceByIndexPostPhase()"
111  << " limit_memory_bytes_=" << limit_memory_bytes_
112  << " num_subranges_=" << num_subranges_
113  << " full_range_=" << full_range_
114  << " range_=" << range_
115  << " range_.size()=" << range_.size();
116 
117  subranges_.reserve(num_subranges_ - 1);
118  subrange_files_.reserve(num_subranges_ - 1);
119  subrange_writers_.reserve(num_subranges_ - 1);
120 
121  for (size_t partition = 1; partition < num_subranges_; partition++) {
122  auto file = ctx_.GetFilePtr(dia_id_);
123  auto writer = file->GetWriter();
124  common::Range subrange =
125  full_range_.Partition(partition, num_subranges_);
126 
127  LOG << "ReduceByIndexPostPhase()"
128  << " partition=" << partition
129  << " subrange=" << subrange;
130 
131  subranges_.emplace_back(subrange);
132  subrange_files_.emplace_back(std::move(file));
133  subrange_writers_.emplace_back(std::move(writer));
134  }
135  }
136  }
137 
138  bool Insert(const TableItem& kv) {
139  size_t item_key = key(kv);
140  assert(item_key >= full_range_.begin && item_key < full_range_.end);
141 
142  LOG << "Insert() item_key=" << item_key
143  << " full_range_=" << full_range_
144  << " range_" << range_;
145 
146  if (item_key < range_.end) {
147  // items is in the main range
148  size_t offset = item_key - full_range_.begin;
149 
150  if (item_key != neutral_element_key_) {
151  // normal index
152  if (key(items_[offset]) == item_key) {
153  items_[offset] = reduce(items_[offset], kv);
154  return false;
155  }
156  else {
157  items_[offset] = kv;
158  return true;
159  }
160  }
161  else {
162  // special handling for element with neutral index
164  items_[offset] = reduce(items_[offset], kv);
165  return false;
166  }
167  else {
168  items_[offset] = kv;
170  return true;
171  }
172  }
173  }
174  else {
175  // items has to be stored in an overflow File
176  size_t r = full_range_.FindPartition(item_key, num_subranges_) - 1;
177 
178  const common::Range& subrange = subranges_.at(r);
179  data::File::Writer& writer = subrange_writers_.at(r);
180 
181  LOG << "Insert() item_key=" << item_key
182  << " r=" << r
183  << " subrange=" << subrange;
184 
185  assert(item_key >= subrange.begin && item_key < subrange.end);
186  writer.Put(kv);
187  return false;
188  }
189  }
190 
191  void PushData(bool consume = false, data::File::Writer* pwriter = nullptr) {
192  assert(!pwriter || consume);
193 
194  if (cache_) {
195  // previous PushData() has stored data in cache_
196  data::File::Reader reader = cache_->GetReader(consume);
197  while (reader.HasNext())
198  emitter_.Emit(reader.Next<TableItem>());
199  return;
200  }
201 
202  if (!consume) {
203  if (subranges_.empty()) {
204  Flush();
205  }
206  else {
208  data::File::Writer writer = cache_->GetWriter();
209  PushData(true, &writer);
210  cache_ = cache;
211  writer.Close();
212  }
213  return;
214  }
215 
216  // close File writers
217  for (auto& w : subrange_writers_) {
218  w.Close();
219  }
220 
221  if (pwriter) {
222  FlushAndConsume<true>(pwriter);
223  }
224  else {
225  FlushAndConsume();
226  }
227 
228  for (size_t i = 0; i < subranges_.size(); ++i) {
229  ReduceByIndexPostPhase<TableItem, Key, Value, KeyExtractor,
230  ReduceFunction, Emitter, VolatileKey,
231  ReduceConfig>
234  subtable.SetRange(subranges_[i]);
235  subtable.Initialize(limit_memory_bytes_);
236 
237  {
238  // insert items
239  auto reader = subrange_files_[i]->GetConsumeReader();
240  while (reader.HasNext()) {
241  subtable.Insert(reader.template Next<TableItem>());
242  }
243  }
244 
245  subtable.PushData(consume || pwriter, pwriter);
246 
247  // delete File
248  subrange_files_[i].reset();
249  }
250  }
251 
252  void Dispose() {
254 
258  }
259 
260 private:
261  void Flush() {
262  for (auto it = items_.begin(); it != items_.end(); ++it) {
263  emitter_.Emit(*it);
264  }
265  }
266 
267  template <bool DoCache = false>
268  void FlushAndConsume(data::File::Writer* writer = nullptr) {
269  for (auto it = items_.begin(); it != items_.end(); ++it) {
270  emitter_.Emit(*it);
271  if (DoCache) { writer->Put(*it); }
272  }
274  // free array
276  }
277 
278  Key key(const TableItem& t) {
279  return MakeTableItem::GetKey(t, key_extractor_);
280  }
281 
282  TableItem reduce(const TableItem& a, const TableItem& b) {
283  return MakeTableItem::Reduce(a, b, reduce_function_);
284  }
285 
286  //! Context
288 
289  //! Associated DIA id
290  size_t dia_id_;
291 
292  //! Key extractor function for extracting a key from a value.
293  KeyExtractor key_extractor_;
294 
295  //! Reduce function for reducing two values.
296  ReduceFunction reduce_function_;
297 
298  //! Stored reduce config to initialize the subtable.
300 
301  //! Emitters used to parameterize hash table for output to next DIA node.
303 
304  //! neutral element to fill holes in output
306 
307  //! Size of the table in bytes
309 
310  //! The index where the neutral element would go if actually inserted
312 
313  //! Is there an actual element at the index of the neutral element?
315 
316  //! Range of indexes actually managed in this instance -
317  //! not including subranges
319 
320  //! Full range of indexes actually managed in this instance -
321  //! including subranges
323 
324  //! Store for items in range of this worker.
325  //! Indexed by (item key - full_range_.begin), i.e. stored in key order.
326  std::vector<TableItem> items_;
327 
328  //! number of subranges
330 
331  //! Subranges
332  std::vector<common::Range> subranges_;
333 
334  //! Subranges external Files
335  std::vector<data::FilePtr> subrange_files_;
336 
337  //! Subranges external File Writers
338  std::vector<data::File::Writer> subrange_writers_;
339 
340  //! File for storing data in-case we need multiple re-reduce levels.
342 };
343 
344 } // namespace core
345 } // namespace thrill
346 
347 #endif // !THRILL_CORE_REDUCE_BY_INDEX_POST_PHASE_HEADER
348 
349 /******************************************************************************/
size_t size() const
size of range
Definition: math.hpp:63
TableItem reduce(const TableItem &a, const TableItem &b)
size_t limit_memory_bytes_
Size of the table in bytes.
void SetRange(const common::Range &range)
Sets the range of indexes to be handled by this index table.
std::vector< data::FilePtr > subrange_files_
Subranges external Files.
void PushData(bool consume=false, data::File::Writer *pwriter=nullptr)
Emitter emit_
Set of emitters, one per partition.
Value neutral_element_
neutral element to fill holes in output
void Initialize(size_t limit_memory_bytes)
TLX_ATTRIBUTE_ALWAYS_INLINE T Next()
Next() reads a complete item T.
represents a 1 dimensional range (interval) [begin,end)
Definition: math.hpp:41
Range Partition(size_t i, size_t parts) const
Definition: math.hpp:85
bool IsValid() const
valid non-empty range (begin < end)
Definition: math.hpp:68
BlockWriter contains a temporary Block object into which a) any serializable item can be stored or b)...
PhaseEmitter emitter_
Emitters used to parameterize hash table for output to next DIA node.
ReduceByIndexPostPhase(Context &ctx, size_t dia_id, const KeyExtractor &key_extractor, const ReduceFunction &reduce_function, const Emitter &emitter, const ReduceConfig &config=ReduceConfig(), const Value &neutral_element=Value())
A data structure which takes an arbitrary value and extracts an index using a key extractor function ...
The Context of a job is a unique instance per worker which holds references to all underlying parts o...
Definition: context.hpp:221
KeyExtractor key_extractor_
Key extractor function for extracting a key from a value.
size_t neutral_element_key_
The index where the neutral element would go if actually inserted.
ReduceConfig config_
Stored reduce config to initialize the subtable.
BlockReader takes Block objects from BlockSource and allows reading of a) serializable Items or b) ar...
size_t end
end index
Definition: math.hpp:58
std::vector< common::Range > subranges_
Subranges.
data::FilePtr GetFilePtr(size_t dia_id)
Definition: context.cpp:1200
void vector_free(std::vector< Type > &v)
Definition: vector_free.hpp:21
ReduceFunction reduce_function_
Reduce function for reducing two values.
std::vector< data::File::Writer > subrange_writers_
Subranges external File Writers.
TLX_ATTRIBUTE_ALWAYS_INLINE BlockWriter & Put(const T &x)
Put appends a complete item, or fails with a FullException.
size_t FindPartition(size_t index, size_t parts) const
Definition: math.hpp:98
bool neutral_element_index_occupied_
Is there an actual element at the index of the neutral element?
size_t begin
begin index
Definition: math.hpp:56
void Close()
Explicitly close the writer.
TLX_ATTRIBUTE_ALWAYS_INLINE bool HasNext()
HasNext() returns true if at least one more item is available.
data::FilePtr cache_
File for storing data in-case we need multiple re-reduce levels.
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:24
bool IsEmpty() const
range is empty (begin == end)
Definition: math.hpp:66
ReduceByIndexPostPhase & operator=(const ReduceByIndexPostPhase &)=delete
non-copyable: delete assignment operator
void FlushAndConsume(data::File::Writer *writer=nullptr)