Thrill  0.1
reduce_bucket_hash_table.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/core/reduce_bucket_hash_table.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Matthias Stumpp <[email protected]>
7  * Copyright (C) 2016 Timo Bingmann <[email protected]>
8  *
9  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10  ******************************************************************************/
11 
12 #pragma once
13 #ifndef THRILL_CORE_REDUCE_BUCKET_HASH_TABLE_HEADER
14 #define THRILL_CORE_REDUCE_BUCKET_HASH_TABLE_HEADER
15 
#include <thrill/core/reduce_functional.hpp>
#include <thrill/core/reduce_table.hpp>

#include <tlx/vector_free.hpp>

#include <algorithm>
#include <cassert>
#include <cmath>
#include <functional>
#include <limits>
#include <stack>
#include <utility>
#include <vector>
27 
28 namespace thrill {
29 namespace core {
30 
31 /*!
32  * A data structure which takes an arbitrary value and extracts a key using a
33  * key extractor function from that value. A key may also be provided initially
34  * as part of a key/value pair, not requiring to extract a key.
35  *
36  * Afterwards, the key is hashed and the hash is used to assign that key/value
37  * pair to some bucket. A bucket can have one or more slots to store
38  * items. There are max_num_items_per_table_per_bucket slots in each bucket.
39  *
40  * In case a slot already has a key/value pair and the key of that value and the
41  * key of the value to be inserted are them same, the values are reduced
42  * according to some reduce function. No key/value is added to the current
43  * bucket.
44  *
45  * If the keys are different, the next slot (moving down) is considered. If the
46  * slot is occupied, the same procedure happens again. This prociedure may be
47  * considered as linear probing within the scope of a bucket.
48  *
49  * Finally, the key/value pair to be inserted may either:
50  *
51  * 1.) Be reduced with some other key/value pair, sharing the same key.
52  * 2.) Inserted at a free slot in the bucket.
53  * 3.) Trigger a resize of the data structure in case there are no more free
54  * slots in the bucket.
55  *
56  * The following illustrations shows the general structure of the data
57  * structure. There are several buckets containing one or more slots. Each slot
58  * may store a item. In order to optimize I/O, slots are organized in bucket
59  * blocks. Bucket blocks are connected by pointers. Key/value pairs are directly
60  * stored in a bucket block, no pointers are required here.
61  *
62  *
63  * Partition 0 Partition 1 Partition 2 Partition 3 Partition 4
64  * B00 B01 B02 B10 B11 B12 B20 B21 B22 B30 B31 B32 B40 B41 B42
65  * +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
66  * || | | || | | || | | || | | || | | ||
67  * +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
68  * | | | | | | | | | | | | | | |
69  * V V V V V V V V V V V V V V >
70  * +---+ +---+
71  * | | | |
72  * +---+ +---+ ...
73  * | | | |
74  * +---+ +---+
75  * | |
76  * V V
77  * +---+ +---+
78  * | | | |
79  * +---+ +---+ ...
80  * | | | |
81  * +---+ +---+
82  *
83  */
84 template <typename TableItem, typename Key, typename Value,
85  typename KeyExtractor, typename ReduceFunction, typename Emitter,
86  const bool VolatileKey,
87  typename ReduceConfig, typename IndexFunction,
88  typename KeyEqualFunction = std::equal_to<Key> >
90  : public ReduceTable<TableItem, Key, Value,
91  KeyExtractor, ReduceFunction, Emitter,
92  VolatileKey, ReduceConfig, IndexFunction,
93  KeyEqualFunction>
94 {
95  using Super = ReduceTable<TableItem, Key, Value,
96  KeyExtractor, ReduceFunction, Emitter,
97  VolatileKey, ReduceConfig, IndexFunction,
98  KeyEqualFunction>;
99 
100  using Super::debug;
101  static constexpr bool debug_items = false;
102 
103  //! target number of bytes in a BucketBlock.
104  static constexpr size_t bucket_block_size
105  = ReduceConfig::bucket_block_size_;
106 
107 public:
108  //! calculate number of items such that each BucketBlock has about 1 MiB of
109  //! size, or at least 8 items.
110  static constexpr size_t block_size_ =
111  common::max<size_t>(1, bucket_block_size / sizeof(TableItem));
112 
113  //! Block holding reduce key/value pairs.
114  struct BucketBlock {
115  //! number of _used_/constructed items in this block. next is unused if
116  //! size != block_size.
117  size_t size;
118 
119  //! link of linked list to next block
121 
122  //! memory area of items
124 
125  //! helper to destroy all allocated items
126  void destroy_items() {
127  for (TableItem* i = items; i != items + size; ++i) {
128  i->~TableItem();
129  }
130  }
131  };
132 
133  using BucketBlockIterator = typename std::vector<BucketBlock*>::iterator;
134 
135 public:
137  Context& ctx, size_t dia_id,
138  const KeyExtractor& key_extractor,
139  const ReduceFunction& reduce_function,
140  Emitter& emitter,
141  size_t num_partitions,
142  const ReduceConfig& config = ReduceConfig(),
143  bool immediate_flush = false,
144  const IndexFunction& index_function = IndexFunction(),
145  const KeyEqualFunction& key_equal_function = KeyEqualFunction())
146  : Super(ctx, dia_id,
147  key_extractor, reduce_function, emitter,
148  num_partitions, config, immediate_flush,
150 
151  assert(num_partitions > 0);
152  }
153 
154  //! Construct the hash table itself. fill it with sentinels
156 
158 
159  // calculate maximum number of blocks allowed in a partition due to the
160  // memory limit.
161 
162  assert(limit_memory_bytes_ >= 0 &&
163  "limit_memory_bytes must be greater than or equal to 0. "
164  "a byte size of zero results in exactly one item per partition");
165 
166  max_blocks_per_partition_ = std::max<size_t>(
167  1,
168  (size_t)(static_cast<double>(limit_memory_bytes_)
169  / static_cast<double>(num_partitions_)
170  / static_cast<double>(sizeof(BucketBlock))));
171 
172  assert(max_blocks_per_partition_ > 0);
173 
174  // calculate limit on the number of _items_ in a partition before these
175  // are spilled to disk or flushed to network.
176 
177  double limit_fill_rate = config_.limit_partition_fill_rate();
178 
179  assert(limit_fill_rate >= 0.0 && limit_fill_rate <= 1.0
180  && "limit_partition_fill_rate must be between 0.0 and 1.0. "
181  "with a fill rate of 0.0, items are immediately flushed.");
182 
184 
185  limit_items_per_partition_ = (size_t)(
186  static_cast<double>(max_items_per_partition_) * limit_fill_rate);
187 
188  assert(max_items_per_partition_ > 0);
189  assert(limit_items_per_partition_ >= 0);
190 
191  // calculate number of slots in a partition of the bucket table, i.e.,
192  // the number of bucket pointers per partition
193 
194  double bucket_rate = config_.bucket_rate();
195 
196  assert(bucket_rate >= 0.0 &&
197  "bucket_rate must be greater than or equal 0. "
198  "a bucket rate of 0.0 causes exactly 1 bucket per partition.");
199 
200  num_buckets_per_partition_ = std::max<size_t>(
201  1,
202  (size_t)(static_cast<double>(max_blocks_per_partition_)
203  * bucket_rate));
204 
205  assert(num_buckets_per_partition_ > 0);
206 
207  // reduce max number of blocks per partition to cope for the memory
208  // needed for pointers
209 
210  max_blocks_per_partition_ -= std::max<size_t>(
211  0,
212  (size_t)(std::ceil(
213  static_cast<double>(
215  / static_cast<double>(sizeof(BucketBlock)))));
216 
218 
219  // finally, calculate number of buckets and allocate the table
220 
222  limit_blocks_ = max_blocks_per_partition_ * num_partitions_;
223 
224  assert(num_buckets_ > 0);
225  assert(limit_blocks_ > 0);
226 
227  sLOG << "num_partitions_" << num_partitions_
228  << "num_buckets_per_partition_" << num_buckets_per_partition_
229  << "num_buckets_" << num_buckets_;
230 
231  buckets_.resize(num_buckets_, nullptr);
232  }
233 
234  //! non-copyable: delete copy-constructor
236  //! non-copyable: delete assignment operator
238 
240  Dispose();
241  }
242 
243  /*!
244  * Inserts a value into the table, potentially reducing it in case both the
245  * key of the value already in the table and the key of the value to be
246  * inserted are the same.
247  *
248  * An insert may trigger a partial flush of the partition with the most
249  * items if the maximal number of items in the table
250  * (max_items_per_table_table) is reached.
251  *
252  * Alternatively, it may trigger a resize of table in case maximal number of
253  * items per bucket is reached.
254  *
255  * \param kv Value to be inserted into the table.
256  *
257  * \return true if a new key was inserted to the table
258  */
259  bool Insert(const TableItem& kv) {
260 
263 
264  typename IndexFunction::Result h = calculate_index(kv);
265 
266  size_t local_index = h.local_index(num_buckets_per_partition_);
267 
268  assert(h.partition_id < num_partitions_);
269  assert(local_index < num_buckets_per_partition_);
270 
271  size_t global_index =
272  h.partition_id * num_buckets_per_partition_ + local_index;
273  BucketBlock* current = buckets_[global_index];
274 
275  while (current != nullptr)
276  {
277  // iterate over valid items in a block
278  for (TableItem* bi = current->items;
279  bi != current->items + current->size; ++bi)
280  {
281  // if item and key equals, then reduce.
282  if (key_equal_function_(key(kv), key(*bi)))
283  {
284  *bi = reduce(*bi, kv);
285  return false;
286  }
287  }
288  current = current->next;
289  }
290 
291  // have an item that needs to be added.
292 
293  current = buckets_[global_index];
294 
295  if (current == nullptr || current->size == block_size_)
296  {
297  // new block needed.
298 
299  // flush largest partition if max number of blocks reached
300  while (num_blocks_ > limit_blocks_)
302 
303  // allocate a new block of uninitialized items, prepend to bucket
304  current = block_pool_.GetBlock();
305  current->next = buckets_[global_index];
306  buckets_[global_index] = current;
307 
308  // Total number of blocks
309  ++num_blocks_;
310  }
311 
312  // in-place construct/insert new item in current bucket block
313  new (current->items + current->size++)TableItem(kv);
314 
316  << "h.partition_id" << h.partition_id;
317 
318  // Increase partition item count
319  ++items_per_partition_[h.partition_id];
320  ++num_items_;
321 
323  << "items_per_partition_[" << h.partition_id << "]"
324  << items_per_partition_[h.partition_id];
325 
326  // flush current partition if max partition fill rate reached
327  while (items_per_partition_[h.partition_id] > limit_items_per_partition_)
328  SpillPartition(h.partition_id);
329 
330  return true;
331  }
332 
333  //! Deallocate memory
334  void Dispose() {
335  // destroy all block chains
336  for (BucketBlock* b_block : buckets_)
337  {
338  BucketBlock* current = b_block;
339  while (current != nullptr)
340  {
341  // destroy block and advance to next
342  BucketBlock* next = current->next;
343  current->destroy_items();
344  operator delete (current);
345  current = next;
346  }
347  }
348 
349  // destroy vector and block pool
350  tlx::vector_free(buckets_);
352 
353  Super::Dispose();
354  }
355 
356  //! \name Spilling Mechanisms to External Memory Files
357  //! \{
358 
359  //! Spill all items of an arbitrary partition into an external memory File.
361  // maybe make a policy later -tb
362  return SpillLargestPartition();
363  }
364 
365  //! Spill all items of a partition into an external memory File.
366  void SpillPartition(size_t partition_id) {
367 
368  if (immediate_flush_) {
369  return FlushPartition(
370  partition_id, /* consume */ true, /* grow */ true);
371  }
372 
373  sLOG << "Spilling" << items_per_partition_[partition_id]
374  << "items of partition" << partition_id
375  << "buckets: [" << partition_id * num_buckets_per_partition_
376  << "," << (partition_id + 1) * num_buckets_per_partition_ << ")";
377 
378  if (items_per_partition_[partition_id] == 0)
379  return;
380 
381  data::File::Writer writer = partition_files_[partition_id].GetWriter();
382 
383  BucketBlockIterator iter =
384  buckets_.begin() + partition_id * num_buckets_per_partition_;
385  BucketBlockIterator end =
386  buckets_.begin() + (partition_id + 1) * num_buckets_per_partition_;
387 
388  for ( ; iter != end; ++iter)
389  {
390  BucketBlock* current = *iter;
391 
392  while (current != nullptr)
393  {
394  for (TableItem* bi = current->items;
395  bi != current->items + current->size; ++bi)
396  {
397  writer.Put(*bi);
398  }
399 
400  // destroy block and advance to next
401  BucketBlock* next = current->next;
402  block_pool_.Deallocate(current);
403  --num_blocks_;
404  current = next;
405  }
406 
407  *iter = nullptr;
408  }
409 
410  // reset partition specific counter
411  num_items_ -= items_per_partition_[partition_id];
412  items_per_partition_[partition_id] = 0;
413  assert(num_items_ == this->num_items_calc());
414 
415  sLOG << "Spilled items of partition" << partition_id;
416  }
417 
418  //! Spill all items of the largest partition into an external memory File.
420  // get partition with max size
421  size_t size_max = 0, index = 0;
422 
423  for (size_t i = 0; i < num_partitions_; ++i)
424  {
425  if (items_per_partition_[i] > size_max)
426  {
427  size_max = items_per_partition_[i];
428  index = i;
429  }
430  }
431 
432  if (size_max == 0) {
433  return;
434  }
435 
436  return SpillPartition(index);
437  }
438 
439  //! Spill all items of the smallest non-empty partition into an external
440  //! memory File.
442  // get partition with min size
443  size_t size_min = std::numeric_limits<size_t>::max(), index = 0;
444 
445  for (size_t i = 0; i < num_partitions_; ++i)
446  {
447  if (items_per_partition_[i] < size_min
448  && items_per_partition_[i] != 0)
449  {
450  size_min = items_per_partition_[i];
451  index = i;
452  }
453  }
454 
455  if (size_min == 0 || size_min == std::numeric_limits<size_t>::max()) {
456  return;
457  }
458 
459  return SpillPartition(index);
460  }
461 
462  //! \}
463 
464  //! \name Flushing Mechanisms to Next Stage or Phase
465  //! \{
466 
467  template <typename Emit>
469  size_t partition_id, bool consume, bool /* grow */, Emit emit) {
470 
471  LOG << "Flushing " << items_per_partition_[partition_id]
472  << " items of partition: " << partition_id;
473 
474  if (items_per_partition_[partition_id] == 0) return;
475 
476  BucketBlockIterator iter =
477  buckets_.begin() + partition_id * num_buckets_per_partition_;
478  BucketBlockIterator end =
479  buckets_.begin() + (partition_id + 1) * num_buckets_per_partition_;
480 
481  for ( ; iter != end; ++iter)
482  {
483  BucketBlock* current = *iter;
484 
485  while (current != nullptr)
486  {
487  for (TableItem* bi = current->items;
488  bi != current->items + current->size; ++bi)
489  {
490  emit(partition_id, *bi);
491  }
492 
493  if (consume) {
494  // destroy block and advance to next
495  BucketBlock* next = current->next;
496  block_pool_.Deallocate(current);
497  --num_blocks_;
498  current = next;
499  }
500  else {
501  // advance to next
502  current = current->next;
503  }
504  }
505 
506  if (consume)
507  *iter = nullptr;
508  }
509 
510  if (consume) {
511  // reset partition specific counter
512  num_items_ -= items_per_partition_[partition_id];
513  items_per_partition_[partition_id] = 0;
514  assert(num_items_ == this->num_items_calc());
515  }
516 
517  LOG << "Done flushing items of partition: " << partition_id;
518  }
519 
520  void FlushPartition(size_t partition_id, bool consume, bool grow) {
522  partition_id, consume, grow,
523  [this](const size_t& partition_id, const TableItem& p) {
524  this->emitter_.Emit(partition_id, p);
525  });
526  }
527 
528  void FlushAll() {
529  for (size_t i = 0; i < num_partitions_; ++i) {
530  FlushPartition(i, /* consume */ true, /* grow */ false);
531  }
532  }
533 
534  //! \}
535 
536  //! \name Accessors
537  //! \{
538 
539  /*!
540  * Returns the number of block in the table.
541  *
542  * \return Number of blocks in the table.
543  */
544  size_t num_blocks() const {
545  return num_blocks_;
546  }
547 
548  //! \}
549 
550 protected:
551  //! BucketBlockPool to stack allocated BucketBlocks
553  {
554  public:
555  BucketBlockPool() = default;
556 
557  //! non-copyable: delete copy-constructor
558  BucketBlockPool(const BucketBlockPool&) = delete;
559  //! non-copyable: delete assignment operator
560  BucketBlockPool& operator = (const BucketBlockPool&) = delete;
561  //! move-constructor: default
562  BucketBlockPool(BucketBlockPool&&) = default;
563  //! move-assignment operator: default
565 
567  Destroy();
568  }
569 
570  // allocate a chunk of memory as big as Type needs:
572  BucketBlock* place;
573  if (!free.empty()) {
574  place = static_cast<BucketBlock*>(free.top());
575  free.pop();
576  }
577  else {
578  place = static_cast<BucketBlock*>(operator new (sizeof(BucketBlock)));
579  place->size = 0;
580  place->next = nullptr;
581  }
582 
583  return place;
584  }
585 
586  // mark some memory as available (no longer used):
588  o->size = 0;
589  o->next = nullptr;
590  free.push(static_cast<BucketBlock*>(o));
591  }
592 
593  void Destroy() {
594  while (!free.empty()) {
595  free.top()->destroy_items();
596  operator delete (free.top());
597  free.pop();
598  }
599  }
600 
601  private:
602  // stack to hold pointers to free chunks:
603  std::stack<BucketBlock*> free;
604  };
605 
606 public:
608 
609 private:
610  using Super::config_;
614  using Super::key;
618  using Super::num_buckets_;
620  using Super::num_items_;
623  using Super::reduce;
624 
625  //! Storing the items.
626  std::vector<BucketBlock*> buckets_;
627 
628  //! Bucket block pool.
630 
631  //! \name Fixed Operational Parameters
632  //! \{
633 
634  //! Number of blocks in the table before some items are spilled.
636 
637  //! Maximal number of items per partition.
639 
640  //! Maximal number of blocks per partition.
642 
643  //! \}
644 
645  //! \name Current Statistical Parameters
646  //! \{
647 
648  //! Total number of blocks in the table.
649  size_t num_blocks_ = 0;
650 
651  //! \}
652 };
653 
654 template <typename TableItem, typename Key, typename Value,
655  typename KeyExtractor, typename ReduceFunction,
656  typename Emitter, const bool VolatileKey,
657  typename ReduceConfig, typename IndexFunction,
658  typename KeyEqualFunction>
661  TableItem, Key, Value, KeyExtractor, ReduceFunction,
662  Emitter, VolatileKey, ReduceConfig, IndexFunction, KeyEqualFunction>
663 {
664 public:
665  using type = ReduceBucketHashTable<
666  TableItem, Key, Value, KeyExtractor, ReduceFunction,
667  Emitter, VolatileKey, ReduceConfig,
668  IndexFunction, KeyEqualFunction>;
669 };
670 
671 } // namespace core
672 } // namespace thrill
673 
674 #endif // !THRILL_CORE_REDUCE_BUCKET_HASH_TABLE_HEADER
675 
676 /******************************************************************************/
A data structure which takes an arbitrary value and extracts a key using a key extractor function fro...
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:34
BucketBlockPool to stack allocated BucketBlocks.
static uint_pair max()
return an uint_pair instance containing the largest value possible
Definition: uint_types.hpp:226
typename std::vector< BucketBlock * >::iterator BucketBlockIterator
void SpillAnyPartition()
Spill all items of an arbitrary partition into an external memory File.
Type selection via ReduceTableImpl enum.
ReduceBucketHashTable(Context &ctx, size_t dia_id, const KeyExtractor &key_extractor, const ReduceFunction &reduce_function, Emitter &emitter, size_t num_partitions, const ReduceConfig &config=ReduceConfig(), bool immediate_flush=false, const IndexFunction &index_function=IndexFunction(), const KeyEqualFunction &key_equal_function=KeyEqualFunction())
bool Insert(const TableItem &kv)
Inserts a value into the table, potentially reducing it in case both the key of the value already in ...
void SpillLargestPartition()
Spill all items of the largest partition into an external memory File.
BlockWriter contains a temporary Block object into which a) any serializable item can be stored or b)...
void Initialize(size_t limit_memory_bytes)
Construct the hash table itself. fill it with sentinels.
#define TLX_UNLIKELY(c)
Definition: likely.hpp:24
bool memory_exceeded
memory limit exceeded indicator
The Context of a job is a unique instance per worker which holds references to all underlying parts o...
Definition: context.hpp:221
BucketBlock * next
link of linked list to next block
ReduceTableImpl
Enum class to select a hash table implementation.
static constexpr size_t bucket_block_size
target number of bytes in a BucketBlock.
ReduceBucketHashTable & operator=(const ReduceBucketHashTable &)=delete
non-copyable: delete assignment operator
void SpillPartition(size_t partition_id)
Spill all items of a partition into an external memory File.
std::vector< BucketBlock * > buckets_
Storing the items.
void destroy_items()
helper to destroy all allocated items
void vector_free(std::vector< Type > &v)
Definition: vector_free.hpp:21
void FlushPartitionEmit(size_t partition_id, bool consume, bool, Emit emit)
TLX_ATTRIBUTE_ALWAYS_INLINE BlockWriter & Put(const T &x)
Put appends a complete item, or fails with a FullException.
size_t max_blocks_per_partition_
Maximal number of blocks per partition.
BucketBlockPool block_pool_
Bucket block pool.
Common super-class for bucket and linear-probing hash/reduce tables.
size_t num_blocks() const
Returns the number of block in the table.
void free(void *ptr) NOEXCEPT
exported free symbol that overrides loading from libc
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:24
void FlushPartition(size_t partition_id, bool consume, bool grow)
TableItem items[block_size_]
memory area of items
size_t num_blocks_
Total number of blocks in the table.
#define LOGC(cond)
Explicitly specify the condition for logging.
Definition: logger.hpp:21
size_t limit_blocks_
Number of blocks in the table before some items are spilled.
size_t max_items_per_partition_
Maximal number of items per partition.