Thrill  0.1
reduce_old_probing_hash_table.hpp
1 /*******************************************************************************
2  * thrill/core/reduce_old_probing_hash_table.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Matthias Stumpp <[email protected]>
7  * Copyright (C) 2016 Timo Bingmann <[email protected]>
8  *
9  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10  ******************************************************************************/
11 
12 #pragma once
13 #ifndef THRILL_CORE_REDUCE_OLD_PROBING_HASH_TABLE_HEADER
14 #define THRILL_CORE_REDUCE_OLD_PROBING_HASH_TABLE_HEADER
15 
18 
19 #include <functional>
20 #include <limits>
21 #include <utility>
22 #include <vector>
23 
24 namespace thrill {
25 namespace core {
26 
27 /*!
28  * A data structure which takes an arbitrary value and extracts a key using a
29  * key extractor function from that value. A key may also be provided initially
30  * as part of a key/value pair, in which case no key needs to be extracted.
31  *
32  * Afterwards, the key is hashed and the hash is used to assign that key/value
33  * pair to some slot.
34  *
35  * In case a slot already has a key/value pair and the key of that value and the
36  * key of the value to be inserted are the same, the values are reduced
37  * according to some reduce function. No key/value is added to the data
38  * structure.
39  *
40  * If the keys are different, the next slot (moving to the right) is considered.
41  * If the slot is occupied, the same procedure happens again (known as linear
42  * probing).
43  *
44  * Finally, the key/value pair to be inserted may either:
45  *
46  * 1.) Be reduced with some other key/value pair, sharing the same key.
47  * 2.) Be inserted at a free slot.
48  * 3.) Trigger a resize of the data structure in case there are no more free
49  * slots in the data structure.
50  *
51  * The following illustration shows the general structure of the data
52  * structure. The set of slots is divided into 1..n partitions. Each key is
53  * hashed into exactly one partition.
54  *
55  *
56  *  Partition 0 Partition 1 Partition 2 Partition 3 Partition 4
57  *  P00 P01 P02 P10 P11 P12 P20 P21 P22 P30 P31 P32 P40 P41 P42
58  * +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
59  * ||  |   |   ||  |   |   ||  |   |   ||  |   |   ||  |   |  ||
60  * +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
61  *             <-   LI   ->
62  *             LI..Local Index
63  * <-         GI         ->
64  * GI..Global Index
65  *     PI 0        PI 1        PI 2        PI 3        PI 4
66  * PI..Partition ID
67  *
68  */
69 template <typename TableItem, typename Key, typename Value,
70  typename KeyExtractor, typename ReduceFunction, typename Emitter,
71  const bool VolatileKey,
72  typename ReduceConfig_,
73  typename IndexFunction,
74  typename KeyEqualFunction = std::equal_to<Key> >
75 class ReduceOldProbingHashTable
76  : public ReduceTable<TableItem, Key, Value,
77  KeyExtractor, ReduceFunction, Emitter,
78  VolatileKey, ReduceConfig_,
79  IndexFunction, KeyEqualFunction>
80 {
81  using Super = ReduceTable<TableItem, Key, Value,
82  KeyExtractor, ReduceFunction, Emitter,
83  VolatileKey, ReduceConfig_, IndexFunction,
84  KeyEqualFunction>;
85  using Super::debug;
86  static constexpr bool debug_items = false;
87 
88 public:
89  using ReduceConfig = ReduceConfig_;
90  using TableItemIterator = typename std::vector<TableItem>::iterator;
91 
92  ReduceOldProbingHashTable(
93  Context& ctx, size_t dia_id,
94  const KeyExtractor& key_extractor,
95  const ReduceFunction& reduce_function,
96  Emitter& emitter,
97  size_t num_partitions,
98  const ReduceConfig& config = ReduceConfig(),
99  bool immediate_flush = false,
100  const IndexFunction& index_function = IndexFunction(),
101  const KeyEqualFunction& key_equal_function = KeyEqualFunction())
102  : Super(ctx, dia_id,
103  key_extractor, reduce_function, emitter,
104  num_partitions, config, immediate_flush,
105  index_function, key_equal_function) {
106 
107  assert(num_partitions > 0);
108  }
109 
110  //! Construct the hash table itself and fill it with sentinels. There is one
111  //! extra cell beyond the end for reducing the sentinel itself.
113 
115 
116  // calculate num_buckets_per_partition_ from the memory limit and the
117  // number of partitions required
118 
119  assert(limit_memory_bytes_ >= 0 &&
120  "limit_memory_bytes must be greater than or equal to 0. "
121  "A byte size of zero results in exactly one item per partition");
122 
123  num_buckets_per_partition_ = std::max<size_t>(
124  1,
125  (size_t)(static_cast<double>(limit_memory_bytes_)
126  / static_cast<double>(sizeof(TableItem))
127  / static_cast<double>(num_partitions_)));
128 
130 
131  assert(num_buckets_per_partition_ > 0);
132  assert(num_buckets_ > 0);
133 
134  // calculate limit on the number of items in a partition before these
135  // are spilled to disk or flushed to network.
136 
137  double limit_fill_rate = config_.limit_partition_fill_rate();
138 
139  assert(limit_fill_rate >= 0.0 && limit_fill_rate <= 1.0
140  && "limit_partition_fill_rate must be between 0.0 and 1.0. "
141  "with a fill rate of 0.0, items are immediately flushed.");
142 
143  limit_items_per_partition_ = (size_t)(
144  static_cast<double>(num_buckets_per_partition_) * limit_fill_rate);
145 
146  assert(limit_items_per_partition_ >= 0);
147 
148  // actually allocate the table
149 
150  items_.resize(num_buckets_ + 1);
151  }
152 
153  /*!
154  * Inserts a value into the table, potentially reducing it in case both the
155  * key of the value already in the table and the key of the value to be
156  * inserted are the same.
157  *
158  * An insert may trigger a partial flush of the partition with the most
159  * items if the maximal number of items in the table (max_num_items_table)
160  * is reached.
161  *
162  * Alternatively, it may trigger a resize of the table in case the maximal
163  * fill ratio per partition is reached.
164  *
165  * \param kv Value to be inserted into the table.
166  *
167  * \return true if a new key was inserted into the table
168  */
169  bool Insert(const TableItem& kv) {
170 
173 
174  typename IndexFunction::Result h = calculate_index(kv);
175 
176  assert(h.partition_id < num_partitions_);
177 
178  if (key_equal_function_(key(kv), Key())) {
179  // handle pairs with sentinel key specially by reducing into last
180  // element of items.
183  // first occurrence of sentinel key
184  sentinel = kv;
185  sentinel_partition_ = h.partition_id;
186  }
187  else {
188  sentinel = reduce(sentinel, kv);
189  }
190  ++items_per_partition_[h.partition_id];
191  ++num_items_;
192 
193  while (items_per_partition_[h.partition_id] > limit_items_per_partition_)
194  SpillPartition(h.partition_id);
195 
196  return false;
197  }
198 
199  size_t local_index = h.local_index(num_buckets_per_partition_);
200 
201  TableItemIterator pbegin =
202  items_.begin() + h.partition_id * num_buckets_per_partition_;
204 
205  TableItemIterator begin_iter = pbegin + local_index;
206  TableItemIterator iter = begin_iter;
207 
208  while (!key_equal_function_(key(*iter), Key()))
209  {
210  if (key_equal_function_(key(*iter), key(kv)))
211  {
212  *iter = reduce(*iter, kv);
213  return false;
214  }
215 
216  ++iter;
217 
218  // wrap around if beyond the current partition
219  if (iter == pend)
220  iter = pbegin;
221 
222  // spill the partition if all slots are occupied
223  if (iter == begin_iter) {
224 
225  SpillPartition(h.partition_id);
226 
227  *iter = kv;
228 
229  // increase counter for partition
230  ++items_per_partition_[h.partition_id];
231  ++num_items_;
232 
233  return true;
234  }
235  }
236 
237  // insert new pair
238  *iter = kv;
239 
240  // increase counter for partition
241  ++items_per_partition_[h.partition_id];
242  ++num_items_;
243 
244  while (items_per_partition_[h.partition_id] > limit_items_per_partition_)
245  SpillPartition(h.partition_id);
246 
247  return true;
248  }
249 
250  //! Deallocate memory
251  void Dispose() {
252  std::vector<TableItem>().swap(items_);
253  Super::Dispose();
254  }
255 
256  //! \name Spilling Mechanisms to External Memory Files
257  //! \{
258 
259  //! Spill all items of a partition into an external memory File.
260  void SpillPartition(size_t partition_id) {
261 
262  if (immediate_flush_) {
263  return FlushPartition(
264  partition_id, /* consume */ true, /* grow */ true);
265  }
266 
267  LOG << "Spilling " << items_per_partition_[partition_id]
268  << " items of partition with id: " << partition_id;
269 
270  if (items_per_partition_[partition_id] == 0)
271  return;
272 
273  data::File::Writer writer = partition_files_[partition_id].GetWriter();
274 
275  if (sentinel_partition_ == partition_id) {
276  writer.Put(items_[num_buckets_]);
279  }
280 
281  TableItemIterator iter =
282  items_.begin() + partition_id * num_buckets_per_partition_;
283  TableItemIterator end =
284  items_.begin() + (partition_id + 1) * num_buckets_per_partition_;
285 
286  for ( ; iter != end; ++iter)
287  {
288  if (!key_equal_function_(key(*iter), Key()))
289  {
290  writer.Put(*iter);
291  *iter = TableItem();
292  }
293  }
294 
295  // reset partition specific counter
296  num_items_ -= items_per_partition_[partition_id];
297  items_per_partition_[partition_id] = 0;
298  assert(num_items_ == this->num_items_calc());
299 
300  LOG << "Spilled items of partition with id: " << partition_id;
301  }
302 
303  //! Spill all items of an arbitrary partition into an external memory File.
304  void SpillAnyPartition() {
305  // maybe make a policy later -tb
306  return SpillLargestPartition();
307  }
308 
309  //! Spill all items of the largest partition into an external memory File.
310  void SpillLargestPartition() {
311  // get partition with max size
312  size_t size_max = 0, index = 0;
313 
314  for (size_t i = 0; i < num_partitions_; ++i)
315  {
316  if (items_per_partition_[i] > size_max)
317  {
318  size_max = items_per_partition_[i];
319  index = i;
320  }
321  }
322 
323  if (size_max == 0) {
324  return;
325  }
326 
327  return SpillPartition(index);
328  }
329 
330  //! \}
331 
332  //! \name Flushing Mechanisms to Next Stage or Phase
333  //! \{
334 
335  template <typename Emit>
336  void FlushPartitionEmit(
337  size_t partition_id, bool consume, bool /* grow */, Emit emit) {
338 
339  LOG << "Flushing " << items_per_partition_[partition_id]
340  << " items of partition: " << partition_id;
341 
342  if (sentinel_partition_ == partition_id) {
343  emit(partition_id, items_[num_buckets_]);
344  if (consume) {
347  }
348  }
349 
350  TableItemIterator iter =
351  items_.begin() + partition_id * num_buckets_per_partition_;
352  TableItemIterator end =
353  items_.begin() + (partition_id + 1) * num_buckets_per_partition_;
354 
355  for ( ; iter != end; ++iter)
356  {
357  if (!key_equal_function_(key(*iter), Key())) {
358  emit(partition_id, *iter);
359 
360  if (consume)
361  *iter = TableItem();
362  }
363  }
364 
365  if (consume) {
366  // reset partition specific counter
367  num_items_ -= items_per_partition_[partition_id];
368  items_per_partition_[partition_id] = 0;
369  assert(num_items_ == this->num_items_calc());
370  }
371 
372  LOG << "Done flushing items of partition: " << partition_id;
373  }
374 
375  void FlushPartition(size_t partition_id, bool consume, bool grow) {
376  FlushPartitionEmit(
377  partition_id, consume, grow,
378  [this](const size_t& partition_id, const TableItem& p) {
379  this->emitter_.Emit(partition_id, p);
380  });
381  }
382 
383  void FlushAll() {
384  for (size_t i = 0; i < num_partitions_; ++i) {
385  FlushPartition(i, /* consume */ true, /* grow */ false);
386  }
387  }
388 
389  //! \}
390 
391 public:
393 
394 private:
395  using Super::config_;
400  using Super::key;
403  using Super::num_buckets_;
405  using Super::num_items_;
408  using Super::reduce;
409 
410  //! Storing the actual hash table.
411  std::vector<TableItem> items_;
412 
413  //! sentinel for invalid partition or no sentinel.
414  static constexpr size_t invalid_partition_ = size_t(-1);
415 
416  //! Stores the partition id of the sentinel key. Implicitly, this also stores
417  //! whether the sentinel key was found and reduced into
418  //! items_[num_buckets_].
420 };
421 
422 template <typename TableItem, typename Key, typename Value,
423  typename KeyExtractor, typename ReduceFunction,
424  typename Emitter, const bool VolatileKey,
425  typename ReduceConfig, typename IndexFunction,
426  typename KeyEqualFunction>
429  TableItem, Key, Value, KeyExtractor, ReduceFunction,
430  Emitter, VolatileKey, ReduceConfig, IndexFunction, KeyEqualFunction>
431 {
432 public:
434  TableItem, Key, Value, KeyExtractor, ReduceFunction,
435  Emitter, VolatileKey, ReduceConfig,
436  IndexFunction, KeyEqualFunction>;
437 };
438 
439 } // namespace core
440 } // namespace thrill
441 
442 #endif // !THRILL_CORE_REDUCE_OLD_PROBING_HASH_TABLE_HEADER
443 
444 /******************************************************************************/
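
The probing scheme described in the class comment can be illustrated in isolation. The following is a minimal sketch, not Thrill's implementation: `TinyProbingTable`, `InsertResult`, and `Find` are hypothetical names, keys are hashed with `std::hash`, and a full partition is only reported instead of being spilled. As in the listing, a default-constructed `Key()` marks an empty slot, so the sketch assumes that key is never inserted.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical outcome type; the listing's Insert() returns a bool instead.
enum class InsertResult { Inserted, Reduced, Full };

// Simplified stand-in for partition-local linear probing with reduction.
// Assumption: Key() is the sentinel for an empty slot and is never inserted.
template <typename Key, typename Value, typename Reduce>
class TinyProbingTable {
public:
    using Item = std::pair<Key, Value>;

    TinyProbingTable(std::size_t num_partitions,
                     std::size_t buckets_per_partition, Reduce reduce)
        : num_partitions_(num_partitions),
          buckets_per_partition_(buckets_per_partition),
          reduce_(reduce),
          items_(num_partitions * buckets_per_partition) {
        assert(num_partitions > 0 && buckets_per_partition > 0);
    }

    InsertResult Insert(const Item& kv) {
        assert(!(kv.first == Key()) && "sentinel key is not supported here");
        const std::size_t h = std::hash<Key>()(kv.first);
        const std::size_t pbegin =
            (h % num_partitions_) * buckets_per_partition_;
        const std::size_t pend = pbegin + buckets_per_partition_;
        const std::size_t start =
            pbegin + (h / num_partitions_) % buckets_per_partition_;

        std::size_t i = start;
        while (!(items_[i].first == Key())) {
            if (items_[i].first == kv.first) {
                // same key: combine the values, no new slot is used
                items_[i].second = reduce_(items_[i].second, kv.second);
                return InsertResult::Reduced;
            }
            if (++i == pend) i = pbegin;  // wrap around inside the partition
            if (i == start) return InsertResult::Full;  // listing spills here
        }
        items_[i] = kv;  // free slot found
        return InsertResult::Inserted;
    }

    // Returns a pointer to the stored value, or nullptr if the key is absent.
    const Value* Find(const Key& key) const {
        const std::size_t h = std::hash<Key>()(key);
        const std::size_t pbegin =
            (h % num_partitions_) * buckets_per_partition_;
        const std::size_t pend = pbegin + buckets_per_partition_;
        const std::size_t start =
            pbegin + (h / num_partitions_) % buckets_per_partition_;

        std::size_t i = start;
        do {
            if (items_[i].first == Key()) return nullptr;
            if (items_[i].first == key) return &items_[i].second;
            if (++i == pend) i = pbegin;
        } while (i != start);
        return nullptr;
    }

private:
    std::size_t num_partitions_;
    std::size_t buckets_per_partition_;
    Reduce reduce_;
    std::vector<Item> items_;
};
```

With `std::plus<int>` as the reduce function, inserting `{"a", 1}` and then `{"a", 2}` leaves a single slot holding 3. A probe never leaves its partition, which is what allows one partition to be spilled or flushed without touching the others.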
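
The sizing arithmetic in the table setup (bucket count from the memory limit, spill threshold from the fill rate) can be checked with small numbers. The sketch below is a hedged restatement of that arithmetic: `TableSizing` and `ComputeSizing` are hypothetical names, and the total bucket count is assumed to be the per-partition count multiplied by the number of partitions, since that line does not appear in the listing.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>

// Hypothetical result type for the sizing computation.
struct TableSizing {
    std::size_t num_buckets_per_partition;
    std::size_t num_buckets;
    std::size_t limit_items_per_partition;
};

// Mirrors the arithmetic in the listing; item_size plays the role of
// sizeof(TableItem).
inline TableSizing ComputeSizing(std::size_t limit_memory_bytes,
                                 std::size_t item_size,
                                 std::size_t num_partitions,
                                 double limit_fill_rate) {
    assert(item_size > 0 && num_partitions > 0);
    assert(limit_fill_rate >= 0.0 && limit_fill_rate <= 1.0);

    TableSizing s;
    // at least one bucket per partition, even with a memory limit of zero
    s.num_buckets_per_partition = std::max<std::size_t>(
        1, static_cast<std::size_t>(
               static_cast<double>(limit_memory_bytes)
               / static_cast<double>(item_size)
               / static_cast<double>(num_partitions)));
    // Assumption: total slots = slots per partition * number of partitions.
    s.num_buckets = s.num_buckets_per_partition * num_partitions;
    // a partition holding more items than this is spilled or flushed
    s.limit_items_per_partition = static_cast<std::size_t>(
        static_cast<double>(s.num_buckets_per_partition) * limit_fill_rate);
    return s;
}
```

For example, 1024 bytes, 16-byte items, 4 partitions, and a fill rate of 0.5 give 16 buckets per partition, 64 buckets in total, and a limit of 8 items per partition. A memory limit of zero gives one bucket per partition and a limit of 0, so every insert spills or flushes immediately.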