Thrill 0.1
reduce_bucket_hash_table.hpp
1 /*******************************************************************************
2  * thrill/core/reduce_bucket_hash_table.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Matthias Stumpp <[email protected]>
7  * Copyright (C) 2016 Timo Bingmann <[email protected]>
8  *
9  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10  ******************************************************************************/
11 
12 #pragma once
13 #ifndef THRILL_CORE_REDUCE_BUCKET_HASH_TABLE_HEADER
14 #define THRILL_CORE_REDUCE_BUCKET_HASH_TABLE_HEADER
15 
16 #include <thrill/core/reduce_functional.hpp>
17 #include <thrill/core/reduce_table.hpp>
18 
19 #include <algorithm>
20 #include <functional>
21 #include <limits>
22 #include <stack>
23 #include <utility>
24 #include <vector>
25 
26 namespace thrill {
27 namespace core {
28 
29 /*!
30  * A data structure which takes an arbitrary value and extracts a key using a
31  * key extractor function from that value. A key may also be provided initially
32  * as part of a key/value pair, not requiring to extract a key.
33  *
34  * Afterwards, the key is hashed and the hash is used to assign that key/value
35  * pair to some bucket. A bucket can have one or more slots to store
36  * items. There are max_num_items_per_table_per_bucket slots in each bucket.
37  *
38  * In case a slot already has a key/value pair and the key of that value and the
39  * key of the value to be inserted are the same, the values are reduced
40  * according to some reduce function. No key/value is added to the current
41  * bucket.
42  *
43  * If the keys are different, the next slot (moving down) is considered. If the
44  * slot is occupied, the same procedure happens again. This procedure may be
45  * considered as linear probing within the scope of a bucket.
46  *
47  * Finally, the key/value pair to be inserted may either:
48  *
49  * 1.) Be reduced with some other key/value pair, sharing the same key.
50  * 2.) Be inserted at a free slot in the bucket.
51  * 3.) Trigger a resize of the data structure in case there are no more free
52  * slots in the bucket.
53  *
54  * The following illustration shows the general structure of the data
55  * structure. There are several buckets containing one or more slots. Each slot
56  * may store an item. In order to optimize I/O, slots are organized in bucket
57  * blocks. Bucket blocks are connected by pointers. Key/value pairs are directly
58  * stored in a bucket block, no pointers are required here.
59  *
60  *
61  *     Partition 0 Partition 1 Partition 2 Partition 3 Partition 4
62  * B00 B01 B02 B10 B11 B12 B20 B21 B22 B30 B31 B32 B40 B41 B42
63  * +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
64  * ||  |   |   ||  |   |   ||  |   |   ||  |   |   ||  |   |   ||
65  * +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
66  *   |   |   |   |   |   |   |   |   |   |   |   |   |   |   |
67  *   V   V   V   V   V   V   V   V   V   V   V   V   V   V   >
68  * +---+           +---+
69  * |   |           |   |
70  * +---+           +---+         ...
71  * |   |           |   |
72  * +---+           +---+
73  *   |               |
74  *   V               V
75  * +---+           +---+
76  * |   |           |   |
77  * +---+           +---+         ...
78  * |   |           |   |
79  * +---+           +---+
80  *
81  */
82 template <typename TableItem, typename Key, typename Value,
83  typename KeyExtractor, typename ReduceFunction, typename Emitter,
84  const bool VolatileKey,
85  typename ReduceConfig, typename IndexFunction,
86  typename KeyEqualFunction = std::equal_to<Key> >
87 class ReduceBucketHashTable
88  : public ReduceTable<TableItem, Key, Value,
89  KeyExtractor, ReduceFunction, Emitter,
90  VolatileKey, ReduceConfig, IndexFunction,
91  KeyEqualFunction>
92 {
93  using Super = ReduceTable<TableItem, Key, Value,
94  KeyExtractor, ReduceFunction, Emitter,
95  VolatileKey, ReduceConfig, IndexFunction,
96  KeyEqualFunction>;
97 
98  using Super::debug;
99  static constexpr bool debug_items = false;
100 
101  //! target number of bytes in a BucketBlock.
102  static constexpr size_t bucket_block_size
103  = ReduceConfig::bucket_block_size_;
104 
105 public:
106  //! calculate number of items such that each BucketBlock has about 1 MiB of
107  //! size, or at least 8 items.
108  static constexpr size_t block_size_ =
109  common::max<size_t>(1, bucket_block_size / sizeof(TableItem));
110 
111  //! Block holding reduce key/value pairs.
112  struct BucketBlock {
113  //! number of _used_/constructed items in this block. next is unused if
114  //! size != block_size.
115  size_t size;
116 
117  //! link of linked list to next block
118  BucketBlock* next;
119 
120  //! memory area of items
121  TableItem items[block_size_];
122 
123  //! helper to destroy all allocated items
124  void destroy_items() {
125  for (TableItem* i = items; i != items + size; ++i) {
126  i->~TableItem();
127  }
128  }
129  };
130 
131  using BucketBlockIterator = typename std::vector<BucketBlock*>::iterator;
132 
133 public:
134  ReduceBucketHashTable(
135  Context& ctx, size_t dia_id,
136  const KeyExtractor& key_extractor,
137  const ReduceFunction& reduce_function,
138  Emitter& emitter,
139  size_t num_partitions,
140  const ReduceConfig& config = ReduceConfig(),
141  bool immediate_flush = false,
142  const IndexFunction& index_function = IndexFunction(),
143  const KeyEqualFunction& key_equal_function = KeyEqualFunction())
144  : Super(ctx, dia_id,
145  key_extractor, reduce_function, emitter,
146  num_partitions, config, immediate_flush,
147  index_function, key_equal_function) {
148 
149  assert(num_partitions > 0);
150  }
151 
152  //! Construct the hash table itself. Fill it with sentinels.
153  void Initialize(size_t limit_memory_bytes) {
154 
155  limit_memory_bytes_ = limit_memory_bytes;
156 
157  // calculate maximum number of blocks allowed in a partition due to the
158  // memory limit.
159 
160  assert(limit_memory_bytes_ >= 0 &&
161  "limit_memory_bytes must be greater than or equal to 0. "
162  "a byte size of zero results in exactly one item per partition");
163 
164  max_blocks_per_partition_ = std::max<size_t>(
165  1,
166  (size_t)(static_cast<double>(limit_memory_bytes_)
167  / static_cast<double>(num_partitions_)
168  / static_cast<double>(sizeof(BucketBlock))));
169 
170  assert(max_blocks_per_partition_ > 0);
171 
172  // calculate limit on the number of _items_ in a partition before these
173  // are spilled to disk or flushed to network.
174 
175  double limit_fill_rate = config_.limit_partition_fill_rate();
176 
177  assert(limit_fill_rate >= 0.0 && limit_fill_rate <= 1.0
178  && "limit_partition_fill_rate must be between 0.0 and 1.0. "
179  "with a fill rate of 0.0, items are immediately flushed.");
180 
181  max_items_per_partition_ = max_blocks_per_partition_ * block_size_;
182 
183  limit_items_per_partition_ = (size_t)(
184  static_cast<double>(max_items_per_partition_) * limit_fill_rate);
185 
186  assert(max_items_per_partition_ > 0);
187  assert(limit_items_per_partition_ >= 0);
188 
189  // calculate number of slots in a partition of the bucket table, i.e.,
190  // the number of bucket pointers per partition
191 
192  double bucket_rate = config_.bucket_rate();
193 
194  assert(bucket_rate >= 0.0 &&
195  "bucket_rate must be greater than or equal 0. "
196  "a bucket rate of 0.0 causes exactly 1 bucket per partition.");
197 
198  num_buckets_per_partition_ = std::max<size_t>(
199  1,
200  (size_t)(static_cast<double>(max_blocks_per_partition_)
201  * bucket_rate));
202 
203  assert(num_buckets_per_partition_ > 0);
204 
205  // reduce max number of blocks per partition to cope for the memory
206  // needed for pointers
207 
208  max_blocks_per_partition_ -= std::max<size_t>(
209  0,
210  (size_t)(std::ceil(
211  static_cast<double>(
212  num_buckets_per_partition_ * sizeof(BucketBlock*))
213  / static_cast<double>(sizeof(BucketBlock)))));
214 
215  assert(max_blocks_per_partition_ > 0);
216 
217  // finally, calculate number of buckets and allocate the table
218 
219  num_buckets_ = num_buckets_per_partition_ * num_partitions_;
220  limit_blocks_ = max_blocks_per_partition_ * num_partitions_;
221 
222  assert(num_buckets_ > 0);
223  assert(limit_blocks_ > 0);
224 
225  sLOG << "num_partitions_" << num_partitions_
226  << "num_buckets_per_partition_" << num_buckets_per_partition_
227  << "num_buckets_" << num_buckets_;
228 
229  buckets_.resize(num_buckets_, nullptr);
230  }
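The sizing arithmetic above can be condensed into a small standalone sketch. The names `ComputeSizing` and `Sizing` and the constants in the usage example are hypothetical, not Thrill's API; the real inputs come from `ReduceConfig`:

```cpp
#include <algorithm>
#include <cmath>
#include <cstddef>

struct Sizing {
    size_t max_blocks_per_partition;
    size_t num_buckets_per_partition;
    size_t num_buckets;
    size_t limit_blocks;
};

Sizing ComputeSizing(size_t limit_memory_bytes, size_t num_partitions,
                     size_t sizeof_block, double bucket_rate) {
    Sizing s;
    // memory limit divided evenly over partitions, in units of blocks
    s.max_blocks_per_partition = std::max<size_t>(
        1, (size_t)(static_cast<double>(limit_memory_bytes)
                    / static_cast<double>(num_partitions)
                    / static_cast<double>(sizeof_block)));
    // bucket pointers per partition, scaled by bucket_rate
    s.num_buckets_per_partition = std::max<size_t>(
        1, (size_t)(static_cast<double>(s.max_blocks_per_partition)
                    * bucket_rate));
    // shrink the block budget to pay for the bucket pointer array
    s.max_blocks_per_partition -= (size_t)std::ceil(
        static_cast<double>(s.num_buckets_per_partition * sizeof(void*))
        / static_cast<double>(sizeof_block));
    s.num_buckets = s.num_buckets_per_partition * num_partitions;
    s.limit_blocks = s.max_blocks_per_partition * num_partitions;
    return s;
}
```

For example, a 1 MiB limit, 4 partitions, 1 KiB blocks and a bucket rate of 0.5 give 256 blocks per partition, 128 buckets per partition, and (assuming 8-byte pointers) a pointer-array cost of 1 block, leaving 255 blocks per partition.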
231 
232  //! non-copyable: delete copy-constructor
233  ReduceBucketHashTable(const ReduceBucketHashTable&) = delete;
234  //! non-copyable: delete assignment operator
235  ReduceBucketHashTable& operator = (const ReduceBucketHashTable&) = delete;
236 
237  ~ReduceBucketHashTable() {
238  Dispose();
239  }
240 
241  /*!
242  * Inserts a value into the table, potentially reducing it in case both the
243  * key of the value already in the table and the key of the value to be
244  * inserted are the same.
245  *
246  * An insert may trigger a partial flush of the partition with the most
247  * items if the maximal number of items in the table
248  * (max_items_per_table) is reached.
249  *
250  * Alternatively, it may trigger a resize of the table in case the maximal
251  * number of items per bucket is reached.
252  *
253  * \param kv Value to be inserted into the table.
254  *
255  * \return true if a new key was inserted to the table
256  */
257  bool Insert(const TableItem& kv) {
258 
259  while (TLX_UNLIKELY(mem::memory_exceeded && num_items() != 0))
260  SpillAnyPartition();
261 
262  typename IndexFunction::Result h = calculate_index(kv);
263 
264  size_t local_index = h.local_index(num_buckets_per_partition_);
265 
266  assert(h.partition_id < num_partitions_);
267  assert(local_index < num_buckets_per_partition_);
268 
269  size_t global_index =
270  h.partition_id * num_buckets_per_partition_ + local_index;
271  BucketBlock* current = buckets_[global_index];
272 
273  while (current != nullptr)
274  {
275  // iterate over valid items in a block
276  for (TableItem* bi = current->items;
277  bi != current->items + current->size; ++bi)
278  {
279  // if item and key equals, then reduce.
280  if (key_equal_function_(key(kv), key(*bi)))
281  {
282  *bi = reduce(*bi, kv);
283  return false;
284  }
285  }
286  current = current->next;
287  }
288 
289  // have an item that needs to be added.
290 
291  current = buckets_[global_index];
292 
293  if (current == nullptr || current->size == block_size_)
294  {
295  // new block needed.
296 
297  // flush largest partition if max number of blocks reached
298  while (num_blocks_ > limit_blocks_)
299  SpillAnyPartition();
300 
301  // allocate a new block of uninitialized items, prepend to bucket
302  current = block_pool_.GetBlock();
303  current->next = buckets_[global_index];
304  buckets_[global_index] = current;
305 
306  // Total number of blocks
307  ++num_blocks_;
308  }
309 
310  // in-place construct/insert new item in current bucket block
311  new (current->items + current->size++)TableItem(kv);
312 
313  LOGC(debug_items)
314  << "h.partition_id" << h.partition_id;
315 
316  // Increase partition item count
317  ++items_per_partition_[h.partition_id];
318  ++num_items_;
319 
320  LOGC(debug_items)
321  << "items_per_partition_[" << h.partition_id << "]"
322  << items_per_partition_[h.partition_id];
323 
324  // flush current partition if max partition fill rate reached
325  while (items_per_partition_[h.partition_id] > limit_items_per_partition_)
326  SpillPartition(h.partition_id);
327 
328  return true;
329  }
330 
331  //! Deallocate memory
332  void Dispose() {
333  // destroy all block chains
334  for (BucketBlock* b_block : buckets_)
335  {
336  BucketBlock* current = b_block;
337  while (current != nullptr)
338  {
339  // destroy block and advance to next
340  BucketBlock* next = current->next;
341  current->destroy_items();
342  operator delete (current);
343  current = next;
344  }
345  }
346 
347  // destroy vector and block pool
348  std::vector<BucketBlock*>().swap(buckets_);
349  block_pool_.Destroy();
350 
351  Super::Dispose();
352  }
353 
354  //! \name Spilling Mechanisms to External Memory Files
355  //! \{
356 
357  //! Spill all items of an arbitrary partition into an external memory File.
358  void SpillAnyPartition() {
359  // maybe make a policy later -tb
360  return SpillLargestPartition();
361  }
362 
363  //! Spill all items of a partition into an external memory File.
364  void SpillPartition(size_t partition_id) {
365 
366  if (immediate_flush_) {
367  return FlushPartition(
368  partition_id, /* consume */ true, /* grow */ true);
369  }
370 
371  sLOG << "Spilling" << items_per_partition_[partition_id]
372  << "items of partition" << partition_id
373  << "buckets: [" << partition_id * num_buckets_per_partition_
374  << "," << (partition_id + 1) * num_buckets_per_partition_ << ")";
375 
376  if (items_per_partition_[partition_id] == 0)
377  return;
378 
379  data::File::Writer writer = partition_files_[partition_id].GetWriter();
380 
381  BucketBlockIterator iter =
382  buckets_.begin() + partition_id * num_buckets_per_partition_;
383  BucketBlockIterator end =
384  buckets_.begin() + (partition_id + 1) * num_buckets_per_partition_;
385 
386  for ( ; iter != end; ++iter)
387  {
388  BucketBlock* current = *iter;
389 
390  while (current != nullptr)
391  {
392  for (TableItem* bi = current->items;
393  bi != current->items + current->size; ++bi)
394  {
395  writer.Put(*bi);
396  }
397 
398  // destroy block and advance to next
399  BucketBlock* next = current->next;
400  block_pool_.Deallocate(current);
401  --num_blocks_;
402  current = next;
403  }
404 
405  *iter = nullptr;
406  }
407 
408  // reset partition specific counter
409  num_items_ -= items_per_partition_[partition_id];
410  items_per_partition_[partition_id] = 0;
411  assert(num_items_ == this->num_items_calc());
412 
413  sLOG << "Spilled items of partition" << partition_id;
414  }
415 
416  //! Spill all items of the largest partition into an external memory File.
417  void SpillLargestPartition() {
418  // get partition with max size
419  size_t size_max = 0, index = 0;
420 
421  for (size_t i = 0; i < num_partitions_; ++i)
422  {
423  if (items_per_partition_[i] > size_max)
424  {
425  size_max = items_per_partition_[i];
426  index = i;
427  }
428  }
429 
430  if (size_max == 0) {
431  return;
432  }
433 
434  return SpillPartition(index);
435  }
436 
437  //! Spill all items of the smallest non-empty partition into an external
438  //! memory File.
439  void SpillSmallestPartition() {
440  // get partition with min size
441  size_t size_min = std::numeric_limits<size_t>::max(), index = 0;
442 
443  for (size_t i = 0; i < num_partitions_; ++i)
444  {
445  if (items_per_partition_[i] < size_min
446  && items_per_partition_[i] != 0)
447  {
448  size_min = items_per_partition_[i];
449  index = i;
450  }
451  }
452 
453  if (size_min == 0 || size_min == std::numeric_limits<size_t>::max()) {
454  return;
455  }
456 
457  return SpillPartition(index);
458  }
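The two selection loops above are plain argmax/argmin scans over the per-partition item counters. A minimal standalone sketch (free functions with hypothetical names, not Thrill's member functions):

```cpp
#include <cstddef>
#include <limits>
#include <vector>

// index of the partition with the most items (0 if all are empty)
size_t LargestPartition(const std::vector<size_t>& items_per_partition) {
    size_t size_max = 0, index = 0;
    for (size_t i = 0; i < items_per_partition.size(); ++i) {
        if (items_per_partition[i] > size_max) {
            size_max = items_per_partition[i];
            index = i;
        }
    }
    return index;
}

// index of the smallest non-empty partition (0 if all are empty)
size_t SmallestNonEmptyPartition(const std::vector<size_t>& items) {
    size_t size_min = std::numeric_limits<size_t>::max(), index = 0;
    for (size_t i = 0; i < items.size(); ++i) {
        if (items[i] != 0 && items[i] < size_min) {
            size_min = items[i];
            index = i;
        }
    }
    return index;
}
```

Spilling the largest partition frees the most memory per spill; spilling the smallest non-empty one minimizes the amount of data written to external memory.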
459 
460  //! \}
461 
462  //! \name Flushing Mechanisms to Next Stage or Phase
463  //! \{
464 
465  template <typename Emit>
466  void FlushPartitionEmit(
467  size_t partition_id, bool consume, bool /* grow */, Emit emit) {
468 
469  LOG << "Flushing " << items_per_partition_[partition_id]
470  << " items of partition: " << partition_id;
471 
472  if (items_per_partition_[partition_id] == 0) return;
473 
474  BucketBlockIterator iter =
475  buckets_.begin() + partition_id * num_buckets_per_partition_;
476  BucketBlockIterator end =
477  buckets_.begin() + (partition_id + 1) * num_buckets_per_partition_;
478 
479  for ( ; iter != end; ++iter)
480  {
481  BucketBlock* current = *iter;
482 
483  while (current != nullptr)
484  {
485  for (TableItem* bi = current->items;
486  bi != current->items + current->size; ++bi)
487  {
488  emit(partition_id, *bi);
489  }
490 
491  if (consume) {
492  // destroy block and advance to next
493  BucketBlock* next = current->next;
494  block_pool_.Deallocate(current);
495  --num_blocks_;
496  current = next;
497  }
498  else {
499  // advance to next
500  current = current->next;
501  }
502  }
503 
504  if (consume)
505  *iter = nullptr;
506  }
507 
508  if (consume) {
509  // reset partition specific counter
510  num_items_ -= items_per_partition_[partition_id];
511  items_per_partition_[partition_id] = 0;
512  assert(num_items_ == this->num_items_calc());
513  }
514 
515  LOG << "Done flushing items of partition: " << partition_id;
516  }
517 
518  void FlushPartition(size_t partition_id, bool consume, bool grow) {
519  FlushPartitionEmit(
520  partition_id, consume, grow,
521  [this](const size_t& partition_id, const TableItem& p) {
522  this->emitter_.Emit(partition_id, p);
523  });
524  }
525 
526  void FlushAll() {
527  for (size_t i = 0; i < num_partitions_; ++i) {
528  FlushPartition(i, /* consume */ true, /* grow */ false);
529  }
530  }
531 
532  //! \}
533 
534  //! \name Accessors
535  //! \{
536 
537  /*!
538  * Returns the number of blocks in the table.
539  *
540  * \return Number of blocks in the table.
541  */
542  size_t num_blocks() const {
543  return num_blocks_;
544  }
545 
546  //! \}
547 
548 protected:
549  //! BucketBlockPool to stack allocated BucketBlocks
550  class BucketBlockPool
551  {
552  public:
553  BucketBlockPool() = default;
554 
555  //! non-copyable: delete copy-constructor
556  BucketBlockPool(const BucketBlockPool&) = delete;
557  //! non-copyable: delete assignment operator
558  BucketBlockPool& operator = (const BucketBlockPool&) = delete;
559  //! move-constructor: default
560  BucketBlockPool(BucketBlockPool&&) = default;
561  //! move-assignment operator: default
562  BucketBlockPool& operator = (BucketBlockPool&&) = default;
563 
564  ~BucketBlockPool() {
565  Destroy();
566  }
567 
568  // allocate a chunk of memory as big as Type needs:
569  BucketBlock * GetBlock() {
570  BucketBlock* place;
571  if (!free.empty()) {
572  place = static_cast<BucketBlock*>(free.top());
573  free.pop();
574  }
575  else {
576  place = static_cast<BucketBlock*>(operator new (sizeof(BucketBlock)));
577  place->size = 0;
578  place->next = nullptr;
579  }
580 
581  return place;
582  }
583 
584  // mark some memory as available (no longer used):
585  void Deallocate(BucketBlock* o) {
586  o->size = 0;
587  o->next = nullptr;
588  free.push(static_cast<BucketBlock*>(o));
589  }
590 
591  void Destroy() {
592  while (!free.empty()) {
593  free.top()->destroy_items();
594  operator delete (free.top());
595  free.pop();
596  }
597  }
598 
599  private:
600  // stack to hold pointers to free chunks:
601  std::stack<BucketBlock*> free;
602  };
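The free-list idea behind BucketBlockPool generalizes to any fixed-size node type: deallocated nodes are parked on a stack and handed out again before new memory is requested. A minimal sketch under hypothetical names (`NodePool`, `Node`), not Thrill's class:

```cpp
#include <cstddef>
#include <stack>

struct Node { int payload; };

class NodePool {
public:
    Node* Get() {
        if (!free_.empty()) {
            Node* n = free_.top();   // reuse a parked node, no allocation
            free_.pop();
            return n;
        }
        ++allocations;               // count real allocations for the demo
        return new Node();
    }

    void Put(Node* n) { free_.push(n); }

    ~NodePool() {
        while (!free_.empty()) { delete free_.top(); free_.pop(); }
    }

    size_t allocations = 0;

private:
    std::stack<Node*> free_;        // pointers to recyclable nodes
};
```

The payoff is that a spill/refill cycle, which deallocates and reallocates many blocks of identical size, touches the system allocator only on the first pass.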
603 
604 public:
605  using Super::calculate_index;
606 
607 private:
608  using Super::config_;
609  using Super::emitter_;
610  using Super::immediate_flush_;
611  using Super::items_per_partition_;
612  using Super::key;
613  using Super::key_equal_function_;
614  using Super::limit_items_per_partition_;
615  using Super::limit_memory_bytes_;
616  using Super::num_buckets_;
617  using Super::num_buckets_per_partition_;
618  using Super::num_items_;
619  using Super::num_partitions_;
620  using Super::partition_files_;
621  using Super::reduce;
622 
623  //! Storing the items.
624  std::vector<BucketBlock*> buckets_;
625 
626  //! Bucket block pool.
627  BucketBlockPool block_pool_;
628 
629  //! \name Fixed Operational Parameters
630  //! \{
631 
632  //! Number of blocks in the table before some items are spilled.
633  size_t limit_blocks_;
634 
635  //! Maximal number of items per partition.
636  size_t max_items_per_partition_;
637 
638  //! Maximal number of blocks per partition.
639  size_t max_blocks_per_partition_;
640 
641  //! \}
642 
643  //! \name Current Statistical Parameters
644  //! \{
645 
646  //! Total number of blocks in the table.
647  size_t num_blocks_ = 0;
648 
649  //! \}
650 };
651 
652 template <typename TableItem, typename Key, typename Value,
653  typename KeyExtractor, typename ReduceFunction,
654  typename Emitter, const bool VolatileKey,
655  typename ReduceConfig, typename IndexFunction,
656  typename KeyEqualFunction>
657 class ReduceTableSelect<
658  ReduceTableImpl::BUCKET,
659  TableItem, Key, Value, KeyExtractor, ReduceFunction,
660  Emitter, VolatileKey, ReduceConfig, IndexFunction, KeyEqualFunction>
661 {
662 public:
663  using type = ReduceBucketHashTable<
664  TableItem, Key, Value, KeyExtractor, ReduceFunction,
665  Emitter, VolatileKey, ReduceConfig,
666  IndexFunction, KeyEqualFunction>;
667 };
668 
669 } // namespace core
670 } // namespace thrill
671 
672 #endif // !THRILL_CORE_REDUCE_BUCKET_HASH_TABLE_HEADER
673 
674 /******************************************************************************/