Thrill  0.1
reduce_pre_phase.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/core/reduce_pre_phase.hpp
3  *
4  * Hash table with support for reduce and partitions.
5  *
6  * Part of Project Thrill - http://project-thrill.org
7  *
8  * Copyright (C) 2015 Matthias Stumpp <[email protected]>
9  * Copyright (C) 2015 Alexander Noe <[email protected]>
10  * Copyright (C) 2015 Timo Bingmann <[email protected]>
11  *
12  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
13  ******************************************************************************/
14 
15 #pragma once
16 #ifndef THRILL_CORE_REDUCE_PRE_PHASE_HEADER
17 #define THRILL_CORE_REDUCE_PRE_PHASE_HEADER
18 
20 #include <thrill/common/logger.hpp>
21 #include <thrill/common/math.hpp>
29 #include <thrill/data/file.hpp>
30 
31 #include <algorithm>
32 #include <cassert>
33 #include <cmath>
34 #include <functional>
35 #include <string>
36 #include <utility>
37 #include <vector>
38 
39 namespace thrill {
40 namespace core {
41 
42 //! Emitter implementation to plug into a reduce hash table for
43 //! collecting/flushing items while reducing. Items flushed in the pre-phase are
44 //! transmitted via a network Channel.
45 template <typename TableItem, bool VolatileKey, typename BlockWriter>
47 {
48  static constexpr bool debug = false;
49 
50 public:
51  explicit ReducePrePhaseEmitter(std::vector<BlockWriter>& writer)
52  : writer_(writer),
53  stats_(writer.size(), 0) { }
54 
55  //! output an element into a partition, template specialized for robust and
56  //! non-robust keys
57  void Emit(const size_t& partition_id, const TableItem& p) {
58  assert(partition_id < writer_.size());
59  stats_[partition_id]++;
60  writer_[partition_id].Put(p);
61  }
62 
63  void Flush(size_t partition_id) {
64  assert(partition_id < writer_.size());
65  writer_[partition_id].Flush();
66  }
67 
68  void CloseAll() {
69  sLOG << "emit stats:";
70  size_t i = 0;
71  for (BlockWriter& e : writer_) {
72  e.Close();
73  sLOG << "emitter" << i << "pushed" << stats_[i++];
74  }
75  }
76 
77 public:
78  //! Set of emitters, one per partition.
79  std::vector<BlockWriter>& writer_;
80 
81  //! Emitter stats.
82  std::vector<size_t> stats_;
83 };
84 
85 template <typename TableItem, typename Key, typename Value,
86  typename KeyExtractor, typename ReduceFunction,
87  const bool VolatileKey,
88  typename BlockWriter,
89  typename ReduceConfig_ = DefaultReduceConfig,
90  typename IndexFunction = ReduceByHash<Key>,
91  typename KeyEqualFunction = std::equal_to<Key>,
92  typename HashFunction = std::hash<Key>,
93  bool UseDuplicateDetection = false>
95 
//! ReducePrePhase without duplicate detection (UseDuplicateDetection ==
//! false). Values are inserted into a local reduce table of type Table; items
//! the table flushes go through emit_ to one BlockWriter per partition.
//! NOTE(review): this listing omits source lines 116-117 (presumably the
//! Emitter and MakeTableItem aliases used below) and line 217 (presumably the
//! `Table table_;` member declaration); confirm against the original header.
96 template <typename TableItem, typename Key, typename Value,
97  typename KeyExtractor, typename ReduceFunction,
98  const bool VolatileKey, typename BlockWriter,
99  typename ReduceConfig_,
100  typename IndexFunction,
101  typename KeyEqualFunction,
102  typename HashFunction>
103 class ReducePrePhase<TableItem, Key, Value,
104  KeyExtractor, ReduceFunction,
105  VolatileKey, BlockWriter,
106  ReduceConfig_,
107  IndexFunction,
108  KeyEqualFunction,
109  HashFunction,
110  false>
111 {
112  static constexpr bool debug = false;
113 
114 public:
115  using ReduceConfig = ReduceConfig_;
118 
    //! Reduce table implementation, selected by ReduceConfig::table_impl_.
119  using Table = typename ReduceTableSelect<
120  ReduceConfig::table_impl_,
121  TableItem, Key, Value,
122  KeyExtractor, ReduceFunction, Emitter,
123  VolatileKey, ReduceConfig, IndexFunction, KeyEqualFunction>::type;
124 
125  /*!
126  * A data structure which takes an arbitrary value and extracts a key using
127  * a key extractor function from that value. Afterwards, the value is hashed
128  * based on the key into some slot.
     *
     * The constructor wraps the writers in emit_ and builds table_ with emit_
     * as its emitter. It passes !duplicates to the table constructor
     * (NOTE(review): presumably an "immediate flush" flag; confirm against
     * the table's constructor). hash_function is unused here; only the
     * duplicate-detection specialization uses it. emit.size() must equal
     * num_partitions (asserted).
129  */
130  ReducePrePhase(Context& ctx, size_t dia_id,
131  size_t num_partitions,
132  KeyExtractor key_extractor,
133  ReduceFunction reduce_function,
134  std::vector<BlockWriter>& emit,
135  const ReduceConfig& config = ReduceConfig(),
136  const IndexFunction& index_function = IndexFunction(),
137  const KeyEqualFunction& key_equal_function = KeyEqualFunction(),
138  const HashFunction hash_function = HashFunction(),
139  bool duplicates = false)
140  : emit_(emit),
141  key_extractor_(key_extractor),
142  table_(ctx, dia_id,
143  key_extractor, reduce_function, emit_,
144  num_partitions, config, !duplicates,
145  index_function, key_equal_function) {
146 
147  tlx::unused(hash_function);
148 
149  sLOG << "creating ReducePrePhase with" << emit.size() << "output emitters";
150 
151  assert(num_partitions == emit.size());
152  }
153 
154  //! non-copyable: delete copy-constructor
155  ReducePrePhase(const ReducePrePhase&) = delete;
156  //! non-copyable: delete assignment operator
157  ReducePrePhase& operator = (const ReducePrePhase&) = delete;
158 
    //! Forwards limit_memory_bytes to table_.Initialize().
159  void Initialize(size_t limit_memory_bytes) {
160  table_.Initialize(limit_memory_bytes);
161  }
162 
    //! Forwards to table_.InitializeSkip().
    //! NOTE(review): presumably paired with InsertSkip(), which bypasses the
    //! table; confirm.
163  void InitializeSkip() {
164  table_.InitializeSkip();
165  }
166 
    //! Builds a TableItem from v and inserts it into table_; returns the
    //! result of table_.Insert().
167  bool Insert(const Value& v) {
168  // for VolatileKey this makes std::pair and extracts the key
169  return table_.Insert(MakeTableItem::Make(v, table_.key_extractor()));
170  }
171 
    //! Bypasses local reduction: computes v's partition via
    //! table_.calculate_index() and emits the item directly to that partition.
172  void InsertSkip(const Value& v) {
173  TableItem t = MakeTableItem::Make(v, table_.key_extractor());
174  typename IndexFunction::Result h = table_.calculate_index(t);
175  emit_.Emit(h.partition_id, t);
176  }
177 
178  //! Flush all partitions
179  void FlushAll() {
180  for (size_t id = 0; id < table_.num_partitions(); ++id) {
181  FlushPartition(id, /* consume */ true, /* grow */ false);
182  }
183  }
184 
185  //! Flushes a partition
186  void FlushPartition(size_t partition_id, bool consume, bool grow) {
187  table_.FlushPartition(partition_id, consume, grow);
188  // data is flushed immediately, there is no spilled data
189  }
190 
191  //! Closes all emitters, then disposes the table
192  void CloseAll() {
193  emit_.CloseAll();
194  table_.Dispose();
195  }
196 
197  //! \name Accessors
198  //! \{
199 
200  //! Returns the total num of items in the table.
201  size_t num_items() const { return table_.num_items(); }
202 
203  //! calculate key range for the given output partition
204  common::Range key_range(size_t partition_id)
205  { return table_.key_range(partition_id); }
206 
207  //! \}
208 
209 protected:
210  //! Emitters used to parameterize hash table for output to network.
211  Emitter emit_;
212 
213  //! extractor function which maps a value to its key
    //! (also used directly by the duplicate-detection specialization)
214  KeyExtractor key_extractor_;
215 
216  //! the first-level hash table implementation
218 };
219 
220 template <typename TableItem, typename Key, typename Value,
221  typename KeyExtractor, typename ReduceFunction,
222  const bool VolatileKey, typename BlockWriter,
223  typename ReduceConfig,
224  typename IndexFunction,
225  typename EqualToFunction,
226  typename HashFunction>
227 class ReducePrePhase<TableItem, Key, Value,
228  KeyExtractor,
229  ReduceFunction,
230  VolatileKey,
231  BlockWriter,
232  ReduceConfig,
233  IndexFunction,
234  EqualToFunction,
235  HashFunction,
236  true>
237  : public ReducePrePhase<TableItem, Key, Value,
238  KeyExtractor,
239  ReduceFunction,
240  VolatileKey,
241  BlockWriter,
242  ReduceConfig,
243  IndexFunction,
244  EqualToFunction,
245  HashFunction,
246  false>
247 {
248 
249 public:
250  using Super = ReducePrePhase<TableItem, Key, Value, KeyExtractor,
251  ReduceFunction, VolatileKey, BlockWriter,
252  ReduceConfig,
253  IndexFunction, EqualToFunction, HashFunction,
254  false>;
255  using KeyValuePair = std::pair<Key, Value>;
256 
257  ReducePrePhase(Context& ctx, size_t dia_id,
258  size_t num_partitions,
259  KeyExtractor key_extractor,
260  ReduceFunction reduce_function,
261  std::vector<BlockWriter>& emit,
262  const ReduceConfig& config = ReduceConfig(),
263  const IndexFunction& index_function = IndexFunction(),
264  const EqualToFunction& equal_to_function = EqualToFunction(),
265  const HashFunction hash_function = HashFunction())
266  : Super(ctx, dia_id, num_partitions, key_extractor, reduce_function,
267  emit, config, index_function, equal_to_function, hash_function,
268  /*duplicates*/ true),
269  hash_function_(hash_function) { }
270 
271  void Insert(const Value& v) {
272  if (Super::table_.Insert(
273  Super::MakeTableItem::Make(v, Super::table_.key_extractor()))) {
274  hashes_.push_back(hash_function_(Super::key_extractor_(v)));
275  }
276  }
277 
278  //! Flush all partitions
279  void FlushAll() {
280  DuplicateDetection dup_detect;
281  max_hash_ = dup_detect.FindNonDuplicates(non_duplicates_,
282  hashes_,
283  Super::table_.ctx(),
284  Super::table_.dia_id());
285 
286  for (size_t id = 0; id < Super::table_.num_partitions(); ++id) {
287  FlushPartition(id, /* consume */ true, /* grow */ false);
288  }
289  }
290 
291  void FlushPartition(size_t partition_id, bool consume, bool grow) {
292  Super::table_.FlushPartitionEmit(
293  partition_id, consume, grow,
294  [this](const size_t& partition_id, const TableItem& ti) {
295  Key key = Super::MakeTableItem::GetKey(
296  ti, Super::table_.key_extractor());
297  if (!non_duplicates_[hash_function_(key) % max_hash_]) {
298 
299  duplicated_elements_++;
300  Super::emit_.Emit(partition_id, ti);
301  }
302  else {
303  non_duplicate_elements_++;
304  Super::emit_.Emit(Super::table_.ctx().my_rank(), ti);
305  }
306  });
307 
308  if (Super::table_.has_spilled_data_on_partition(partition_id)) {
309  data::File::Reader reader =
310  Super::table_.partition_files()[partition_id].GetReader(true);
311  while (reader.HasNext()) {
312  TableItem ti = reader.Next<TableItem>();
313  Key key = Super::MakeTableItem::GetKey(
314  ti, Super::table_.key_extractor());
315  if (!non_duplicates_[hash_function_(key) % max_hash_]) {
316 
317  duplicated_elements_++;
318  Super::emit_.Emit(partition_id, ti);
319  }
320  else {
321  non_duplicate_elements_++;
322  Super::emit_.Emit(Super::table_.ctx().my_rank(), ti);
323  }
324  }
325  }
326 
327  // flush elements pushed into emitter
328  Super::emit_.Flush(partition_id);
329  Super::emit_.Flush(Super::table_.ctx().my_rank());
330  }
331 
332  //! \name Duplicate Detection
333  //! \{
334 
335  HashFunction hash_function_;
336  //! Hashes of all keys.
337  std::vector<size_t> hashes_;
338  //! All elements occuring on more than one worker. (Elements not appearing here
339  //! can be reduced locally)
340  std::vector<bool> non_duplicates_;
341  //! Modulo for all hashes in duplicate detection to reduce hash space.
342  size_t max_hash_;
343 
344  size_t duplicated_elements_ = 0;
345  size_t non_duplicate_elements_ = 0;
346 
347  //! \}
348 };
349 
350 } // namespace core
351 } // namespace thrill
352 
353 #endif // !THRILL_CORE_REDUCE_PRE_PHASE_HEADER
354 
355 /******************************************************************************/
std::vector< size_t > stats_
Emitter stats.
ReducePrePhase(Context &ctx, size_t dia_id, size_t num_partitions, KeyExtractor key_extractor, ReduceFunction reduce_function, std::vector< BlockWriter > &emit, const ReduceConfig &config=ReduceConfig(), const IndexFunction &index_function=IndexFunction(), const KeyEqualFunction &key_equal_function=KeyEqualFunction(), const HashFunction hash_function=HashFunction(), bool duplicates=false)
A data structure which takes an arbitrary value and extracts a key using a key extractor function fro...
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:34
typename ReduceTableSelect< ReduceConfig::table_impl_, TableItem, Key, Value, KeyExtractor, ReduceFunction, Emitter, VolatileKey, ReduceConfig, IndexFunction, KeyEqualFunction >::type Table
Type selection via ReduceTableImpl enum.
TLX_ATTRIBUTE_ALWAYS_INLINE T Next()
Next() reads a complete item T.
represents a 1 dimensional range (interval) [begin,end)
Definition: math.hpp:41
size_t FindNonDuplicates(std::vector< bool > &non_duplicates, std::vector< size_t > &hashes, Context &context, size_t dia_id)
Identifies all hashes which occur on only a single worker.
void Emit(const size_t &partition_id, const TableItem &p)
The Context of a job is a unique instance per worker which holds references to all underlying parts o...
Definition: context.hpp:221
std::vector< BlockWriter > & writer_
Set of emitters, one per partition.
void unused(Types &&...)
Definition: unused.hpp:20
A reduce index function which returns a hash index and partition.
BlockReader takes Block objects from BlockSource and allows reading of a) serializable Items or b) ar...
Configuration class to define operational parameters of reduce hash tables and reduce phases...
TLX_ATTRIBUTE_ALWAYS_INLINE bool HasNext()
HasNext() returns true if at least one more item is available.
ReducePrePhase(Context &ctx, size_t dia_id, size_t num_partitions, KeyExtractor key_extractor, ReduceFunction reduce_function, std::vector< BlockWriter > &emit, const ReduceConfig &config=ReduceConfig(), const IndexFunction &index_function=IndexFunction(), const EqualToFunction &equal_to_function=EqualToFunction(), const HashFunction hash_function=HashFunction())
ReducePrePhaseEmitter(std::vector< BlockWriter > &writer)
Duplicate detection to identify all elements occurring only on one worker.