Thrill  0.1
reduce_probing_hash_table.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/core/reduce_probing_hash_table.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Matthias Stumpp <[email protected]>
7  * Copyright (C) 2016 Timo Bingmann <[email protected]>
8  * Copyright (C) 2017 Tim Zeitz <[email protected]>
9  *
10  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
11  ******************************************************************************/
12 
13 #pragma once
14 #ifndef THRILL_CORE_REDUCE_PROBING_HASH_TABLE_HEADER
15 #define THRILL_CORE_REDUCE_PROBING_HASH_TABLE_HEADER
16 
19 
20 #include <algorithm>
21 #include <functional>
22 #include <limits>
23 #include <utility>
24 #include <vector>
25 
26 namespace thrill {
27 namespace core {
28 
29 /*!
30  * A data structure which takes an arbitrary value and extracts a key using a
31  * key extractor function from that value. A key may also be provided initially
32  * as part of a key/value pair, not requiring to extract a key.
33  *
34  * Afterwards, the key is hashed and the hash is used to assign that key/value
35  * pair to some slot.
36  *
37  * In case a slot already has a key/value pair and the key of that value and the
38  * key of the value to be inserted are the same, the values are reduced
39  * according to some reduce function. No key/value is added to the data
40  * structure.
41  *
42  * If the keys are different, the next slot (moving to the right) is considered.
43  * If the slot is occupied, the same procedure happens again (known as linear
44  * probing.)
45  *
46  * Finally, the key/value pair to be inserted may either:
47  *
48  * 1.) Be reduced with some other key/value pair, sharing the same key.
49  * 2.) Inserted at a free slot.
50  * 3.) Trigger a resize of the data structure in case there are no more free
51  * slots in the data structure.
52  *
53  * The following illustration shows the general structure of the data
54  * structure. The set of slots is divided into 1..n partitions. Each key is
55  * hashed into exactly one partition.
56  *
57  *
58  * Partition 0 Partition 1 Partition 2 Partition 3 Partition 4
59  * P00 P01 P02 P10 P11 P12 P20 P21 P22 P30 P31 P32 P40 P41 P42
60  * +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
61  * || | | || | | || | | || | | || | | ||
62  * +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
63  * <- LI ->
64  * LI..Local Index
65  * <- GI ->
66  * GI..Global Index
67  * PI 0 PI 1 PI 2 PI 3 PI 4
68  * PI..Partition ID
69  *
70  */
71 template <typename TableItem, typename Key, typename Value,
72  typename KeyExtractor, typename ReduceFunction, typename Emitter,
73  const bool VolatileKey,
74  typename ReduceConfig_,
75  typename IndexFunction,
76  typename KeyEqualFunction = std::equal_to<Key> >
78  : public ReduceTable<TableItem, Key, Value,
79  KeyExtractor, ReduceFunction, Emitter,
80  VolatileKey, ReduceConfig_,
81  IndexFunction, KeyEqualFunction>
82 {
83  using Super = ReduceTable<TableItem, Key, Value,
84  KeyExtractor, ReduceFunction, Emitter,
85  VolatileKey, ReduceConfig_, IndexFunction,
86  KeyEqualFunction>;
87  using Super::debug;
88  static constexpr bool debug_items = false;
89 
90 public:
91  using ReduceConfig = ReduceConfig_;
92 
94  Context& ctx, size_t dia_id,
95  const KeyExtractor& key_extractor,
96  const ReduceFunction& reduce_function,
97  Emitter& emitter,
98  size_t num_partitions,
99  const ReduceConfig& config = ReduceConfig(),
100  bool immediate_flush = false,
101  const IndexFunction& index_function = IndexFunction(),
102  const KeyEqualFunction& key_equal_function = KeyEqualFunction())
103  : Super(ctx, dia_id,
104  key_extractor, reduce_function, emitter,
105  num_partitions, config, immediate_flush,
107  { assert(num_partitions > 0); }
108 
109  //! Construct the hash table itself. fill it with sentinels. have one extra
110  //! cell beyond the end for reducing the sentinel itself.
112  assert(!items_);
113 
115 
116  // calculate num_buckets_per_partition_ from the memory limit and the
117  // number of partitions required, initialize partition_size_ array.
118 
119  assert(limit_memory_bytes_ >= 0 &&
120  "limit_memory_bytes must be greater than or equal to 0. "
121  "A byte size of zero results in exactly one item per partition");
122 
123  num_buckets_per_partition_ = std::max<size_t>(
124  1,
125  (size_t)(static_cast<double>(limit_memory_bytes_)
126  / static_cast<double>(sizeof(TableItem))
127  / static_cast<double>(num_partitions_)));
128 
130 
131  assert(num_buckets_per_partition_ > 0);
132  assert(num_buckets_ > 0);
133 
134  partition_size_.resize(
135  num_partitions_,
136  std::min(size_t(config_.initial_items_per_partition_),
138 
139  // calculate limit on the number of items in a partition before these
140  // are spilled to disk or flushed to network.
141 
142  double limit_fill_rate = config_.limit_partition_fill_rate();
143 
144  assert(limit_fill_rate >= 0.0 && limit_fill_rate <= 1.0
145  && "limit_partition_fill_rate must be between 0.0 and 1.0. "
146  "with a fill rate of 0.0, items are immediately flushed.");
147 
149  num_partitions_,
150  static_cast<size_t>(
151  static_cast<double>(partition_size_[0]) * limit_fill_rate));
152 
153  assert(limit_items_per_partition_[0] >= 0);
154 
155  // actually allocate the table and initialize the valid ranges, the + 1
156  // is for the sentinel's slot.
157 
158  items_ = static_cast<TableItem*>(
159  operator new ((num_buckets_ + 1) * sizeof(TableItem)));
160 
161  for (size_t id = 0; id < num_partitions_; ++id) {
162  TableItem* iter = items_ + id * num_buckets_per_partition_;
163  TableItem* pend = iter + partition_size_[id];
164 
165  for ( ; iter != pend; ++iter)
166  new (iter)TableItem();
167  }
168  }
169 
171  if (items_) Dispose();
172  }
173 
174  /*!
175  * Inserts a value into the table, potentially reducing it in case both the
176  * key of the value already in the table and the key of the value to be
177  * inserted are the same.
178  *
179  * An insert may trigger a partial flush of the partition with the most
180  * items if the maximal number of items in the table (max_num_items_table)
181  * is reached.
182  *
183  * Alternatively, it may trigger a resize of the table in case the maximal
184  * fill ratio per partition is reached.
185  *
186  * \param kv Value to be inserted into the table.
187  *
188  * \return true if a new key was inserted to the table
189  */
190  bool Insert(const TableItem& kv) {
191 
192  typename IndexFunction::Result h = calculate_index(kv);
193  assert(h.partition_id < num_partitions_);
194 
195  if (TLX_UNLIKELY(key_equal_function_(key(kv), Key()))) {
196  // handle pairs with sentinel key specially by reducing into last
197  // element of items.
198  TableItem& sentinel = items_[num_buckets_];
200  // first occurrence of sentinel key
201  new (&sentinel)TableItem(kv);
202  sentinel_partition_ = h.partition_id;
203  }
204  else {
205  sentinel = reduce(sentinel, kv);
206  return false;
207  }
208  ++items_per_partition_[h.partition_id];
209  ++num_items_;
210 
211  while (TLX_UNLIKELY(
212  items_per_partition_[h.partition_id] >
213  limit_items_per_partition_[h.partition_id])) {
214  GrowAndRehash(h.partition_id);
215  }
216 
217  return true;
218  }
219 
220  // calculate local index depending on the current subtable's size
221  size_t local_index = h.local_index(partition_size_[h.partition_id]);
222 
223  TableItem* pbegin = items_ + h.partition_id * num_buckets_per_partition_;
224  TableItem* pend = pbegin + partition_size_[h.partition_id];
225 
226  TableItem* begin_iter = pbegin + local_index;
227  TableItem* iter = begin_iter;
228 
229  while (!key_equal_function_(key(*iter), Key()))
230  {
231  if (key_equal_function_(key(*iter), key(kv)))
232  {
233  *iter = reduce(*iter, kv);
234  return false;
235  }
236 
237  ++iter;
238 
239  // wrap around if beyond the current partition
240  if (TLX_UNLIKELY(iter == pend))
241  iter = pbegin;
242 
243  // flush partition and retry, if all slots are reserved
244  if (TLX_UNLIKELY(iter == begin_iter)) {
245  GrowAndRehash(h.partition_id);
246  return Insert(kv);
247  }
248  }
249 
250  // insert new pair
251  *iter = kv;
252 
253  // increase counter for partition
254  ++items_per_partition_[h.partition_id];
255  ++num_items_;
256 
257  while (TLX_UNLIKELY(
258  items_per_partition_[h.partition_id] >=
259  limit_items_per_partition_[h.partition_id])) {
260  LOG << "Grow due to "
261  << items_per_partition_[h.partition_id] << " >= "
262  << limit_items_per_partition_[h.partition_id]
263  << " among " << partition_size_[h.partition_id];
264  GrowAndRehash(h.partition_id);
265  }
266 
267  return true;
268  }
269 
270  //! Deallocate items and memory
271  void Dispose() {
272  if (!items_) return;
273 
274  // dispose the items by destructor
275 
276  for (size_t id = 0; id < num_partitions_; ++id) {
277  TableItem* iter = items_ + id * num_buckets_per_partition_;
278  TableItem* pend = iter + partition_size_[id];
279 
280  for ( ; iter != pend; ++iter)
281  iter->~TableItem();
282  }
283 
285  items_[num_buckets_].~TableItem();
286 
287  operator delete (items_);
288  items_ = nullptr;
289 
290  Super::Dispose();
291  }
292 
293  void GrowAndRehash(size_t partition_id) {
294 
295  size_t old_size = partition_size_[partition_id];
296  GrowPartition(partition_id);
297  if (partition_size_[partition_id] == old_size) {
298  SpillPartition(partition_id);
299  return;
300  }
301 
302  if (partition_size_[partition_id] % old_size != 0) {
303  // in place rehashing won't work properly so we spill rather than
304  // potentially blasting memory limits by using an extra vector for
305  // temporary item storage
306  SpillPartition(partition_id);
307  return;
308  }
309 
310  // initialize pointers to old range - the second half is still empty
311  TableItem* pbegin =
312  items_ + partition_id * num_buckets_per_partition_;
313  TableItem* iter = pbegin;
314  TableItem* pend = pbegin + old_size;
315 
316  bool passed_first_half = false;
317  bool found_hole = false;
318  while (!passed_first_half || !found_hole) {
319  Key item_key = key(*iter);
320  bool is_empty = key_equal_function_(item_key, Key());
321  if (!is_empty) {
322  --items_per_partition_[partition_id];
323  --num_items_;
324  TableItem item = std::move(*iter);
325  new (iter)TableItem();
326  Insert(item);
327  }
328 
329  iter++;
330  found_hole = passed_first_half && is_empty;
331  passed_first_half = passed_first_half || iter == pend;
332  }
333  }
334 
335  //! Grow a partition after a spill or flush (if possible)
336  void GrowPartition(size_t partition_id) {
337 
339  SpillPartition(partition_id);
340  return;
341  }
342 
343  if (partition_size_[partition_id] == num_buckets_per_partition_)
344  return;
345 
346  size_t new_size = std::min(
347  num_buckets_per_partition_, 2 * partition_size_[partition_id]);
348 
349  sLOG << "Growing partition" << partition_id
350  << "from" << partition_size_[partition_id] << "to" << new_size
351  << "limit_items" << new_size * config_.limit_partition_fill_rate();
352 
353  // initialize new items
354 
355  TableItem* pbegin =
356  items_ + partition_id * num_buckets_per_partition_;
357  TableItem* iter = pbegin + partition_size_[partition_id];
358  TableItem* pend = pbegin + new_size;
359 
360  for ( ; iter != pend; ++iter)
361  new (iter)TableItem();
362 
363  partition_size_[partition_id] = new_size;
364  limit_items_per_partition_[partition_id]
365  = new_size * config_.limit_partition_fill_rate();
366  }
367 
368  //! \name Spilling Mechanisms to External Memory Files
369  //! \{
370 
371  //! Spill all items of a partition into an external memory File.
372  void SpillPartition(size_t partition_id) {
373 
374  if (immediate_flush_) {
375  return FlushPartition(
376  partition_id, /* consume */ true, /* grow */ !mem::memory_exceeded);
377  }
378 
379  LOG << "Spilling " << items_per_partition_[partition_id]
380  << " items of partition with id: " << partition_id;
381 
382  if (items_per_partition_[partition_id] == 0)
383  return;
384 
385  data::File::Writer writer = partition_files_[partition_id].GetWriter();
386 
387  if (sentinel_partition_ == partition_id) {
388  writer.Put(items_[num_buckets_]);
389  items_[num_buckets_].~TableItem();
391  }
392 
393  TableItem* iter = items_ + partition_id * num_buckets_per_partition_;
394  TableItem* pend = iter + partition_size_[partition_id];
395 
396  for ( ; iter != pend; ++iter) {
397  if (!key_equal_function_(key(*iter), Key())) {
398  writer.Put(*iter);
399  *iter = TableItem();
400  }
401  }
402 
403  // reset partition specific counter
404  num_items_ -= items_per_partition_[partition_id];
405  items_per_partition_[partition_id] = 0;
406  assert(num_items_ == this->num_items_calc());
407 
408  LOG << "Spilled items of partition with id: " << partition_id;
409  }
410 
411  //! Spill all items of an arbitrary partition into an external memory File.
413  // maybe make a policy later -tb
414  return SpillLargestPartition();
415  }
416 
417  //! Spill all items of the largest partition into an external memory File.
419  // get partition with max size
420  size_t size_max = 0, index = 0;
421 
422  for (size_t i = 0; i < num_partitions_; ++i)
423  {
424  if (items_per_partition_[i] > size_max)
425  {
426  size_max = items_per_partition_[i];
427  index = i;
428  }
429  }
430 
431  if (size_max == 0) {
432  return;
433  }
434 
435  return SpillPartition(index);
436  }
437 
438  //! \}
439 
440  //! \name Flushing Mechanisms to Next Stage or Phase
441  //! \{
442 
443  template <typename Emit>
445  size_t partition_id, bool consume, bool grow, Emit emit) {
446 
447  LOG << "Flushing " << items_per_partition_[partition_id]
448  << " items of partition: " << partition_id;
449 
450  if (sentinel_partition_ == partition_id) {
451  emit(partition_id, items_[num_buckets_]);
452  if (consume) {
453  items_[num_buckets_].~TableItem();
455  }
456  }
457 
458  TableItem* iter = items_ + partition_id * num_buckets_per_partition_;
459  TableItem* pend = iter + partition_size_[partition_id];
460 
461  for ( ; iter != pend; ++iter)
462  {
463  if (!key_equal_function_(key(*iter), Key())) {
464  emit(partition_id, *iter);
465 
466  if (consume)
467  *iter = TableItem();
468  }
469  }
470 
471  if (consume) {
472  // reset partition specific counter
473  num_items_ -= items_per_partition_[partition_id];
474  items_per_partition_[partition_id] = 0;
475  assert(num_items_ == this->num_items_calc());
476  }
477 
478  LOG << "Done flushed items of partition: " << partition_id;
479 
480  if (grow)
481  GrowPartition(partition_id);
482  }
483 
484  void FlushPartition(size_t partition_id, bool consume, bool grow) {
486  partition_id, consume, grow,
487  [this](const size_t& partition_id, const TableItem& p) {
488  this->emitter_.Emit(partition_id, p);
489  });
490  }
491 
492  void FlushAll() {
493  for (size_t i = 0; i < num_partitions_; ++i) {
494  FlushPartition(i, /* consume */ true, /* grow */ false);
495  }
496  }
497 
498  //! \}
499 
500 public:
502 
503 private:
504  using Super::config_;
508  using Super::key;
511  using Super::num_buckets_;
513  using Super::num_items_;
516  using Super::reduce;
517 
518  //! Storing the actual hash table.
519  TableItem* items_ = nullptr;
520 
521  //! Current sizes of the partitions because the valid allocated areas grow
522  std::vector<size_t> partition_size_;
523 
524  //! Current limits on the number of items in a partitions, different for
525  //! different partitions, because the valid allocated areas grow.
526  std::vector<size_t> limit_items_per_partition_;
527 
528  //! sentinel for invalid partition or no sentinel.
529  static constexpr size_t invalid_partition_ = size_t(-1);
530 
531  //! store the partition id of the sentinel key. implicitly this also stored
532  //! whether the sentinel key was found and reduced into
533  //! items_[num_buckets_].
535 };
536 
537 template <typename TableItem, typename Key, typename Value,
538  typename KeyExtractor, typename ReduceFunction,
539  typename Emitter, const bool VolatileKey,
540  typename ReduceConfig, typename IndexFunction,
541  typename KeyEqualFunction>
544  TableItem, Key, Value, KeyExtractor, ReduceFunction,
545  Emitter, VolatileKey, ReduceConfig, IndexFunction, KeyEqualFunction>
546 {
547 public:
549  TableItem, Key, Value, KeyExtractor, ReduceFunction,
550  Emitter, VolatileKey, ReduceConfig,
551  IndexFunction, KeyEqualFunction>;
552 };
553 
554 } // namespace core
555 } // namespace thrill
556 
557 #endif // !THRILL_CORE_REDUCE_PROBING_HASH_TABLE_HEADER
558 
559 /******************************************************************************/
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:34
ReduceProbingHashTable(Context &ctx, size_t dia_id, const KeyExtractor &key_extractor, const ReduceFunction &reduce_function, Emitter &emitter, size_t num_partitions, const ReduceConfig &config=ReduceConfig(), bool immediate_flush=false, const IndexFunction &index_function=IndexFunction(), const KeyEqualFunction &key_equal_function=KeyEqualFunction())
Type selection via ReduceTableImpl enum.
void SpillAnyPartition()
Spill all items of an arbitrary partition into an external memory File.
void FlushPartitionEmit(size_t partition_id, bool consume, bool grow, Emit emit)
A data structure which takes an arbitrary value and extracts a key using a key extractor function fro...
BlockWriter contains a temporary Block object into which a) any serializable item can be stored or b)...
#define TLX_UNLIKELY(c)
Definition: likely.hpp:24
bool memory_exceeded
memory limit exceeded indicator
static constexpr size_t invalid_partition_
sentinel for invalid partition or no sentinel.
The Context of a job is a unique instance per worker which holds references to all underlying parts o...
Definition: context.hpp:221
static constexpr size_t sentinel
a sentinel value prefixed to each allocation
void SpillPartition(size_t partition_id)
Spill all items of a partition into an external memory File.
ReduceTableImpl
Enum class to select a hash table implementation.
bool Insert(const TableItem &kv)
Inserts a value into the table, potentially reducing it in case both the key of the value already in ...
TableItem * items_
Storing the actual hash table.
std::vector< size_t > partition_size_
Current sizes of the partitions because the valid allocated areas grow.
static uint_pair min()
return an uint_pair instance containing the smallest value possible
Definition: uint_types.hpp:217
void Initialize(size_t limit_memory_bytes)
TLX_ATTRIBUTE_ALWAYS_INLINE BlockWriter & Put(const T &x)
Put appends a complete item, or fails with a FullException.
void SpillLargestPartition()
Spill all items of the largest partition into an external memory File.
void GrowPartition(size_t partition_id)
Grow a partition after a spill or flush (if possible)
void Dispose()
Deallocate items and memory.
Common super-class for bucket and linear-probing hash/reduce tables.
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:24
void FlushPartition(size_t partition_id, bool consume, bool grow)