Thrill  0.1
reduce_old_probing_hash_table.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/core/reduce_old_probing_hash_table.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Matthias Stumpp <[email protected]>
7  * Copyright (C) 2016 Timo Bingmann <[email protected]>
8  *
9  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10  ******************************************************************************/
11 
12 #pragma once
13 #ifndef THRILL_CORE_REDUCE_OLD_PROBING_HASH_TABLE_HEADER
14 #define THRILL_CORE_REDUCE_OLD_PROBING_HASH_TABLE_HEADER
15 
18 
19 #include <tlx/vector_free.hpp>
20 
21 #include <functional>
22 #include <limits>
23 #include <utility>
24 #include <vector>
25 
26 namespace thrill {
27 namespace core {
28 
29 /*!
30  * A data structure which takes an arbitrary value and extracts a key using a
31  * key extractor function from that value. A key may also be provided initially
32  * as part of a key/value pair, not requiring to extract a key.
33  *
34  * Afterwards, the key is hashed and the hash is used to assign that key/value
35  * pair to some slot.
36  *
37  * In case a slot already has a key/value pair and the key of that value and the
38  * key of the value to be inserted are the same, the values are reduced
39  * according to some reduce function. No key/value is added to the data
40  * structure.
41  *
42  * If the keys are different, the next slot (moving to the right) is considered.
43  * If the slot is occupied, the same procedure happens again (known as linear
44  * probing).
45  *
46  * Finally, the key/value pair to be inserted may either:
47  *
48  * 1.) Be reduced with some other key/value pair, sharing the same key.
49  * 2.) Be inserted at a free slot.
50  * 3.) Trigger a spill (or, with immediate flushing, a flush) of its partition
51  * in case there are no more free slots in that partition.
52  *
53  * The following illustration shows the general structure of the data
54  * structure. The set of slots is divided into 1..n partitions. Each key is
55  * hashed into exactly one partition.
56  *
57  *
58  *  Partition 0 Partition 1 Partition 2 Partition 3 Partition 4
59  *  P00 P01 P02 P10 P11 P12 P20 P21 P22 P30 P31 P32 P40 P41 P42
60  * +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
61  * ||  |   |   ||  |   |   ||  |   |   ||  |   |   ||  |   |  ||
62  * +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
63  *              <-   LI  ->
64  *                   LI..Local Index
65  * <-        GI         ->
66  *             GI..Global Index
67  *     PI 0        PI 1        PI 2        PI 3        PI 4
68  *             PI..Partition ID
69  *
70  */
71 template <typename TableItem, typename Key, typename Value,
72  typename KeyExtractor, typename ReduceFunction, typename Emitter,
73  const bool VolatileKey,
74  typename ReduceConfig_,
75  typename IndexFunction,
76  typename KeyEqualFunction = std::equal_to<Key> >
78  : public ReduceTable<TableItem, Key, Value,
79  KeyExtractor, ReduceFunction, Emitter,
80  VolatileKey, ReduceConfig_,
81  IndexFunction, KeyEqualFunction>
82 {
83  using Super = ReduceTable<TableItem, Key, Value,
84  KeyExtractor, ReduceFunction, Emitter,
85  VolatileKey, ReduceConfig_, IndexFunction,
86  KeyEqualFunction>;
87  using Super::debug;
88  static constexpr bool debug_items = false;
89 
90 public:
91  using ReduceConfig = ReduceConfig_;
92  using TableItemIterator = typename std::vector<TableItem>::iterator;
93 
95  Context& ctx, size_t dia_id,
96  const KeyExtractor& key_extractor,
97  const ReduceFunction& reduce_function,
98  Emitter& emitter,
99  size_t num_partitions,
100  const ReduceConfig& config = ReduceConfig(),
101  bool immediate_flush = false,
102  const IndexFunction& index_function = IndexFunction(),
103  const KeyEqualFunction& key_equal_function = KeyEqualFunction())
104  : Super(ctx, dia_id,
105  key_extractor, reduce_function, emitter,
106  num_partitions, config, immediate_flush,
108 
109  assert(num_partitions > 0);
110  }
111 
112  //! Construct the hash table itself. fill it with sentinels. have one extra
113  //! cell beyond the end for reducing the sentinel itself.
115 
117 
118  // calculate num_buckets_per_partition_ from the memory limit and the
119  // number of partitions required
120 
121  assert(limit_memory_bytes_ >= 0 &&
122  "limit_memory_bytes must be greater than or equal to 0. "
123  "A byte size of zero results in exactly one item per partition");
124 
125  num_buckets_per_partition_ = std::max<size_t>(
126  1,
127  (size_t)(static_cast<double>(limit_memory_bytes_)
128  / static_cast<double>(sizeof(TableItem))
129  / static_cast<double>(num_partitions_)));
130 
132 
133  assert(num_buckets_per_partition_ > 0);
134  assert(num_buckets_ > 0);
135 
136  // calculate limit on the number of items in a partition before these
137  // are spilled to disk or flushed to network.
138 
139  double limit_fill_rate = config_.limit_partition_fill_rate();
140 
141  assert(limit_fill_rate >= 0.0 && limit_fill_rate <= 1.0
142  && "limit_partition_fill_rate must be between 0.0 and 1.0. "
143  "with a fill rate of 0.0, items are immediately flushed.");
144 
145  limit_items_per_partition_ = (size_t)(
146  static_cast<double>(num_buckets_per_partition_) * limit_fill_rate);
147 
148  assert(limit_items_per_partition_ >= 0);
149 
150  // actually allocate the table
151 
152  items_.resize(num_buckets_ + 1);
153  }
154 
155  /*!
156  * Inserts a value into the table, potentially reducing it in case both the
157  * key of the value already in the table and the key of the value to be
158  * inserted are the same.
159  *
160  * An insert spills the item's partition (see SpillPartition) when probing
161  * wraps around the whole partition without finding a free slot, or when the
162  * partition's item count exceeds limit_items_per_partition_.
163  *
164  * Items with the sentinel key Key() are not probed: the first one is stored
165  * in one extra slot past the end of the table, later ones are reduced into it.
166  * \param kv Value to be inserted into the table.
167  *
168  * \return true if a new key was inserted to the table; false if kv was
169  * reduced into an existing item or carries the sentinel key Key().
170  */
171  bool Insert(const TableItem& kv) {
172 
175 
176  typename IndexFunction::Result h = calculate_index(kv);
177 
178  assert(h.partition_id < num_partitions_);
179 
180  if (key_equal_function_(key(kv), Key())) {
181  // handle pairs with sentinel key specially by reducing into last
182  // element of items.
183  TableItem& sentinel = items_[num_buckets_];
185  // first occurrence of sentinel key
186  sentinel = kv;
187  sentinel_partition_ = h.partition_id;
188  }
189  else {
190  sentinel = reduce(sentinel, kv);
191  }
192  ++items_per_partition_[h.partition_id];
193  ++num_items_;
194 
195  while (items_per_partition_[h.partition_id] > limit_items_per_partition_)
196  SpillPartition(h.partition_id);
197 
198  return false;
199  }
200 
201  size_t local_index = h.local_index(num_buckets_per_partition_);
202 
203  TableItemIterator pbegin =
204  items_.begin() + h.partition_id * num_buckets_per_partition_;
206 
207  TableItemIterator begin_iter = pbegin + local_index;
208  TableItemIterator iter = begin_iter;
209 
210  while (!key_equal_function_(key(*iter), Key()))
211  {
212  if (key_equal_function_(key(*iter), key(kv)))
213  {
214  *iter = reduce(*iter, kv);
215  return false;
216  }
217 
218  ++iter;
219 
220  // wrap around if beyond the current partition
221  if (iter == pend)
222  iter = pbegin;
223 
224  // flush partition, if all slots are reserved
225  if (iter == begin_iter) {
226 
227  SpillPartition(h.partition_id);
228 
229  *iter = kv;
230 
231  // increase counter for partition
232  ++items_per_partition_[h.partition_id];
233  ++num_items_;
234 
235  return true;
236  }
237  }
238 
239  // insert new pair
240  *iter = kv;
241 
242  // increase counter for partition
243  ++items_per_partition_[h.partition_id];
244  ++num_items_;
245 
246  while (items_per_partition_[h.partition_id] > limit_items_per_partition_)
247  SpillPartition(h.partition_id);
248 
249  return true;
250  }
251 
252  //! Deallocate memory
253  void Dispose() {
255  Super::Dispose();
256  }
257 
258  //! \name Spilling Mechanisms to External Memory Files
259  //! \{
260 
261  //! Spill all items of a partition into an external memory File.
262  void SpillPartition(size_t partition_id) {
263 
264  if (immediate_flush_) {
265  return FlushPartition(
266  partition_id, /* consume */ true, /* grow */ true);
267  }
268 
269  LOG << "Spilling " << items_per_partition_[partition_id]
270  << " items of partition with id: " << partition_id;
271 
272  if (items_per_partition_[partition_id] == 0)
273  return;
274 
275  data::File::Writer writer = partition_files_[partition_id].GetWriter();
276 
277  if (sentinel_partition_ == partition_id) {
278  writer.Put(items_[num_buckets_]);
281  }
282 
283  TableItemIterator iter =
284  items_.begin() + partition_id * num_buckets_per_partition_;
285  TableItemIterator end =
286  items_.begin() + (partition_id + 1) * num_buckets_per_partition_;
287 
288  for ( ; iter != end; ++iter)
289  {
290  if (!key_equal_function_(key(*iter), Key()))
291  {
292  writer.Put(*iter);
293  *iter = TableItem();
294  }
295  }
296 
297  // reset partition specific counter
298  num_items_ -= items_per_partition_[partition_id];
299  items_per_partition_[partition_id] = 0;
300  assert(num_items_ == this->num_items_calc());
301 
302  LOG << "Spilled items of partition with id: " << partition_id;
303  }
304 
305  //! Spill all items of an arbitrary partition into an external memory File.
307  // maybe make a policy later -tb
308  return SpillLargestPartition();
309  }
310 
311  //! Spill all items of the largest partition into an external memory File.
313  // get partition with max size
314  size_t size_max = 0, index = 0;
315 
316  for (size_t i = 0; i < num_partitions_; ++i)
317  {
318  if (items_per_partition_[i] > size_max)
319  {
320  size_max = items_per_partition_[i];
321  index = i;
322  }
323  }
324 
325  if (size_max == 0) {
326  return;
327  }
328 
329  return SpillPartition(index);
330  }
331 
332  //! \}
333 
334  //! \name Flushing Mechanisms to Next Stage or Phase
335  //! \{
336 
337  template <typename Emit>
339  size_t partition_id, bool consume, bool /* grow */, Emit emit) {
340 
341  LOG << "Flushing " << items_per_partition_[partition_id]
342  << " items of partition: " << partition_id;
343 
344  if (sentinel_partition_ == partition_id) {
345  emit(partition_id, items_[num_buckets_]);
346  if (consume) {
349  }
350  }
351 
352  TableItemIterator iter =
353  items_.begin() + partition_id * num_buckets_per_partition_;
354  TableItemIterator end =
355  items_.begin() + (partition_id + 1) * num_buckets_per_partition_;
356 
357  for ( ; iter != end; ++iter)
358  {
359  if (!key_equal_function_(key(*iter), Key())) {
360  emit(partition_id, *iter);
361 
362  if (consume)
363  *iter = TableItem();
364  }
365  }
366 
367  if (consume) {
368  // reset partition specific counter
369  num_items_ -= items_per_partition_[partition_id];
370  items_per_partition_[partition_id] = 0;
371  assert(num_items_ == this->num_items_calc());
372  }
373 
374  LOG << "Done flushed items of partition: " << partition_id;
375  }
376 
377  void FlushPartition(size_t partition_id, bool consume, bool grow) {
379  partition_id, consume, grow,
380  [this](const size_t& partition_id, const TableItem& p) {
381  this->emitter_.Emit(partition_id, p);
382  });
383  }
384 
385  void FlushAll() {
386  for (size_t i = 0; i < num_partitions_; ++i) {
387  FlushPartition(i, /* consume */ true, /* grow */ false);
388  }
389  }
390 
391  //! \}
392 
393 public:
395 
396 private:
397  using Super::config_;
402  using Super::key;
405  using Super::num_buckets_;
407  using Super::num_items_;
410  using Super::reduce;
411 
412  //! Storing the actual hash table.
413  std::vector<TableItem> items_;
414 
415  //! sentinel for invalid partition or no sentinel.
416  static constexpr size_t invalid_partition_ = size_t(-1);
417 
418  //! store the partition id of the sentinel key. implicitly this also stored
419  //! whether the sentinel key was found and reduced into
420  //! items_[num_buckets_].
422 };
423 
424 template <typename TableItem, typename Key, typename Value,
425  typename KeyExtractor, typename ReduceFunction,
426  typename Emitter, const bool VolatileKey,
427  typename ReduceConfig, typename IndexFunction,
428  typename KeyEqualFunction>
431  TableItem, Key, Value, KeyExtractor, ReduceFunction,
432  Emitter, VolatileKey, ReduceConfig, IndexFunction, KeyEqualFunction>
433 {
434 public:
436  TableItem, Key, Value, KeyExtractor, ReduceFunction,
437  Emitter, VolatileKey, ReduceConfig,
438  IndexFunction, KeyEqualFunction>;
439 };
440 
441 } // namespace core
442 } // namespace thrill
443 
444 #endif // !THRILL_CORE_REDUCE_OLD_PROBING_HASH_TABLE_HEADER
445 
446 /******************************************************************************/
A data structure which takes an arbitrary value and extracts a key using a key extractor function from that value.
ReduceOldProbingHashTable(Context &ctx, size_t dia_id, const KeyExtractor &key_extractor, const ReduceFunction &reduce_function, Emitter &emitter, size_t num_partitions, const ReduceConfig &config=ReduceConfig(), bool immediate_flush=false, const IndexFunction &index_function=IndexFunction(), const KeyEqualFunction &key_equal_function=KeyEqualFunction())
Type selection via ReduceTableImpl enum.
BlockWriter contains a temporary Block object into which a) any serializable item can be stored or b)...
#define TLX_UNLIKELY(c)
Definition: likely.hpp:24
bool memory_exceeded
memory limit exceeded indicator
The Context of a job is a unique instance per worker which holds references to all underlying parts o...
Definition: context.hpp:221
typename std::vector< TableItem >::iterator TableItemIterator
void SpillLargestPartition()
Spill all items of the largest partition into an external memory File.
static constexpr size_t sentinel
a sentinel value prefixed to each allocation
void SpillAnyPartition()
Spill all items of an arbitrary partition into an external memory File.
ReduceTableImpl
Enum class to select a hash table implementation.
static constexpr size_t invalid_partition_
sentinel for invalid partition or no sentinel.
void vector_free(std::vector< Type > &v)
Definition: vector_free.hpp:21
std::vector< TableItem > items_
Storing the actual hash table.
TLX_ATTRIBUTE_ALWAYS_INLINE BlockWriter & Put(const T &x)
Put appends a complete item, or fails with a FullException.
Common super-class for bucket and linear-probing hash/reduce tables.
void FlushPartition(size_t partition_id, bool consume, bool grow)
bool Insert(const TableItem &kv)
Inserts a value into the table, potentially reducing it in case both the key of the value already in the table and the key of the value to be inserted are the same.
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:24
void SpillPartition(size_t partition_id)
Spill all items of a partition into an external memory File.
void FlushPartitionEmit(size_t partition_id, bool consume, bool, Emit emit)