Thrill  0.1
duplicate_detection.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/core/duplicate_detection.hpp
3  *
4  * Duplicate detection
5  *
6  * Part of Project Thrill - http://project-thrill.org
7  *
8  * Copyright (C) 2016 Alexander Noe <[email protected]>
9  * Copyright (C) 2017 Timo Bingmann <[email protected]>
10  *
11  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
12  ******************************************************************************/
13 
14 #pragma once
15 #ifndef THRILL_CORE_DUPLICATE_DETECTION_HEADER
16 #define THRILL_CORE_DUPLICATE_DETECTION_HEADER
17 
#include <thrill/api/context.hpp>
#include <thrill/common/logger.hpp>
#include <thrill/core/delta_stream.hpp>
#include <thrill/core/golomb_bit_stream.hpp>
#include <thrill/core/multiway_merge.hpp>

#include <algorithm>
#include <memory>
#include <utility>
#include <vector>
28 
29 namespace thrill {
30 namespace core {
31 
/*!
 * Duplicate detection to identify all elements occurring only on one worker.
 * This information can be used to locally reduce uniquely-occurring elements.
 * Therefore this saves communication volume in operations such as api::Reduce()
 * or api::Join().
 *
 * Internally, this duplicate detection uses a Golomb encoded distributed single
 * shot Bloom filter to find duplicates and non-duplicates with as low
 * communication volume as possible. Due to the Bloom filter's inherent
 * properties, this has false duplicates but no false non-duplicates.
 *
 * Should only be used when a large number of uniquely-occurring elements are
 * expected.
 */
47 {
48  static constexpr bool debug = false;
49 
50 private:
51  using GolombBitStreamWriter =
53 
54  using GolombBitStreamReader =
56 
57  using GolumbDeltaWriter =
59 
60  using GolumbDeltaReader =
62 
63  /*!
64  * Sends all hashes in the range
65  * [max_hash / num_workers * p, max_hash / num_workers * (p + 1)) to worker
66  * p. These hashes are encoded with a Golomb encoder in core.
67  *
68  * \param stream_pointer Pointer to data stream
69  * \param hashes Sorted vector of all hashes modulo max_hash
70  * \param golomb_param Golomb parameter
71  * \param num_workers Number of workers in this Thrill process
72  * \param max_hash Modulo for all hashes
73  */
74  void WriteEncodedHashes(const data::CatStreamPtr& stream_pointer,
75  const std::vector<size_t>& hashes,
76  size_t golomb_param,
77  size_t num_workers,
78  size_t max_hash) {
79 
80  data::CatStream::Writers writers = stream_pointer->GetWriters();
81 
82  size_t prev_hash = size_t(-1);
83 
84  for (size_t i = 0, j = 0; i < num_workers; ++i) {
85  common::Range range_i =
86  common::CalculateLocalRange(max_hash, num_workers, i);
87 
88  GolombBitStreamWriter golomb_writer(writers[i], golomb_param);
89  GolumbDeltaWriter delta_writer(
90  golomb_writer,
91  /* initial */ size_t(-1) /* cancels with +1 bias */);
92 
93  for ( /* j is already set from previous workers */
94  ; j < hashes.size() && hashes[j] < range_i.end; ++j) {
95  // Send hash deltas to make the encoded bitset smaller.
96  if (hashes[j] == prev_hash)
97  continue;
98  delta_writer.Put(hashes[j]);
99  prev_hash = hashes[j];
100  }
101  }
102  }
103 
104  /*!
105  * Reads a golomb encoded bitset from a data stream and returns it's
106  * contents in form of a vector of hashes.
107  *
108  * \param stream_pointer Pointer to data stream
109  * \param non_duplicates Target vector for hashes, should be empty beforehand
110  * \param golomb_param Golomb parameter
111  */
112  void ReadEncodedHashesToVector(const data::CatStreamPtr& stream_pointer,
113  std::vector<bool>& non_duplicates,
114  size_t golomb_param) {
115 
116  std::vector<data::CatStream::Reader> readers =
117  stream_pointer->GetReaders();
118 
119  for (data::CatStream::Reader& reader : readers)
120  {
121  GolombBitStreamReader golomb_reader(reader, golomb_param);
122  GolumbDeltaReader delta_reader(
123  golomb_reader, /* initial */ size_t(-1) /* cancels at +1 */);
124 
125  // Builds golomb encoded bitset from data received by the stream.
126  while (delta_reader.HasNext()) {
127  // Golomb code contains deltas, we want the actual values
128  size_t hash = delta_reader.Next<size_t>();
129  assert(hash < non_duplicates.size());
130  non_duplicates[hash] = true;
131  }
132  }
133  }
134 
135 public:
136  /*!
137  * Identifies all hashes which occur on only a single worker.
138  * Returns all local uniques in form of a vector of hashes.
139  *
140  * \param non_duplicates Empty vector, which contains all non-duplicate
141  * hashes after this method
142  * \param hashes Hashes for all elements on this worker.
143  * \param context Thrill context, used for collective communication
144  * \param dia_id Id of the operation, which calls this method. Used
145  * to uniquely identify the data streams used.
146  *
147  * \return Modulo used on all hashes. (Use this modulo on all hashes to
148  * identify possible non-duplicates)
149  */
150  size_t FindNonDuplicates(std::vector<bool>& non_duplicates,
151  std::vector<size_t>& hashes,
152  Context& context,
153  size_t dia_id) {
154 
155  // This bound could often be lowered when we have many duplicates.
156  // This would however require a large amount of added communication.
157  size_t upper_bound_uniques = context.net.AllReduce(hashes.size());
158 
159  // Golomb Parameters taken from original paper (Sanders, Schlag, Müller)
160 
161  // Parameter for false positive rate (FPR: 1/fpr_parameter)
162  double fpr_parameter = 8;
163  size_t golomb_param = (size_t)fpr_parameter; // (size_t)(std::log(2) * fpr_parameter);
164  size_t max_hash = upper_bound_uniques * fpr_parameter;
165 
166  for (size_t i = 0; i < hashes.size(); ++i) {
167  hashes[i] = hashes[i] % max_hash;
168  }
169 
170  std::sort(hashes.begin(), hashes.end());
171 
172  data::CatStreamPtr golomb_data_stream = context.GetNewCatStream(dia_id);
173 
174  WriteEncodedHashes(golomb_data_stream,
175  hashes, golomb_param,
176  context.num_workers(),
177  max_hash);
178 
179  // get inbound Golomb/delta-encoded hash stream
180 
181  std::vector<data::CatStream::Reader> readers =
182  golomb_data_stream->GetReaders();
183 
184  std::vector<GolombBitStreamReader> g_readers;
185  std::vector<GolumbDeltaReader> delta_readers;
186  g_readers.reserve(context.num_workers());
187  delta_readers.reserve(context.num_workers());
188 
189  for (auto& reader : readers) {
190  g_readers.emplace_back(reader, golomb_param);
191  delta_readers.emplace_back(
192  g_readers.back(),
193  /* initial */ size_t(-1) /* cancels with +1 bias */);
194  }
195 
196  // multi-way merge hash streams and detect duplicates/notify uniques
197 
198  auto puller = make_multiway_merge_tree<size_t>(
199  delta_readers.begin(), delta_readers.end());
200 
201  // create streams (delta/Golomb encoded) to notify workers of duplicates
202 
203  data::CatStreamPtr duplicates_stream = context.GetNewCatStream(dia_id);
204 
205  data::CatStream::Writers duplicates_writers =
206  duplicates_stream->GetWriters();
207 
208  std::vector<GolombBitStreamWriter> duplicates_gbsw;
209  std::vector<GolumbDeltaWriter> duplicates_dw;
210  duplicates_gbsw.reserve(context.num_workers());
211  duplicates_dw.reserve(context.num_workers());
212 
213  for (size_t i = 0; i < context.num_workers(); ++i) {
214  duplicates_gbsw.emplace_back(duplicates_writers[i], golomb_param);
215  duplicates_dw.emplace_back(
216  duplicates_gbsw.back(),
217  /* initial */ size_t(-1) /* cancels with +1 bias */);
218  }
219 
220  // find all keys only occuring on a single worker and insert to
221  // according bitset
222 
223  if (puller.HasNext())
224  {
225  std::pair<size_t, size_t> this_item;
226  std::pair<size_t, size_t> next_item = puller.NextWithSource();
227 
228  while (puller.HasNext())
229  {
230  this_item = next_item;
231  next_item = puller.NextWithSource();
232 
233  if (this_item.first != next_item.first) {
234  // this_item is a unique
235  sLOG << "!" << this_item.first << "->" << this_item.second;
236  duplicates_dw[this_item.second].Put(this_item.first);
237  }
238  else {
239  // this_item is a duplicate with next_item
240  sLOG << "=" << this_item.first << "-" << this_item.second;
241 
242  // read more items into next_item until the key mismatches
243  while (puller.HasNext() &&
244  (next_item = puller.NextWithSource(),
245  next_item.first == this_item.first))
246  {
247  sLOG << "." << next_item.first << "-" << next_item.second;
248  }
249  }
250  }
251 
252  if (this_item.first != next_item.first) {
253  // last item (next_item) is a unique
254  sLOG << "!" << next_item.first << "->" << next_item.second;
255  duplicates_dw[next_item.second].Put(next_item.first);
256  }
257  }
258 
259  // close duplicate delta writers
260  duplicates_dw.clear();
261  duplicates_gbsw.clear();
262  duplicates_writers.Close();
263 
264  // read inbound duplicate hash bits into non_duplicates hash table
265  assert(non_duplicates.size() == 0);
266  non_duplicates.resize(max_hash);
268  duplicates_stream, non_duplicates, golomb_param);
269 
270  return max_hash;
271  }
272 };
273 
274 } // namespace core
275 } // namespace thrill
276 
277 #endif // !THRILL_CORE_DUPLICATE_DETECTION_HEADER
278 
279 /******************************************************************************/
net::FlowControlChannel & net
Definition: context.hpp:446
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:34
size_t num_workers() const
Global number of workers in the system.
Definition: context.hpp:251
represents a 1 dimensional range (interval) [begin,end)
Definition: math.hpp:41
size_t FindNonDuplicates(std::vector< bool > &non_duplicates, std::vector< size_t > &hashes, Context &context, size_t dia_id)
Identifies all hashes which occur on only a single worker.
T TLX_ATTRIBUTE_WARN_UNUSED_RESULT AllReduce(const T &value, const BinarySumOp &sum_op=BinarySumOp())
Reduces a value of a serializable type T over all workers given a certain reduce function.
The Context of a job is a unique instance per worker which holds references to all underlying parts o...
Definition: context.hpp:221
void Close()
custom destructor to close writers is a cyclic fashion
Definition: stream_data.cpp:92
An extra class derived from std::vector<> for delivery of the BlockWriters of a Stream.
Definition: stream_data.hpp:59
data::CatStreamPtr GetNewCatStream(size_t dia_id)
Definition: context.cpp:1209
static Range CalculateLocalRange(size_t global_size, size_t p, size_t i)
Definition: math.hpp:110
void Put(const Type &value)
BlockReader takes Block objects from BlockSource and allows reading of a) serializable Items or b) ar...
size_t end
end index
Definition: math.hpp:58
void WriteEncodedHashes(const data::CatStreamPtr &stream_pointer, const std::vector< size_t > &hashes, size_t golomb_param, size_t num_workers, size_t max_hash)
Sends all hashes in the range [max_hash / num_workers * p, max_hash / num_workers * (p + 1)) to worke...
core::GolombBitStreamReader< data::CatStream::Reader > GolombBitStreamReader
core::GolombBitStreamWriter< data::CatStream::Writer > GolombBitStreamWriter
void ReadEncodedHashesToVector(const data::CatStreamPtr &stream_pointer, std::vector< bool > &non_duplicates, size_t golomb_param)
Reads a golomb encoded bitset from a data stream and returns it&#39;s contents in form of a vector of has...
HashCrc32< T > hash
Select a hashing method.
Definition: hash.hpp:262
Duplicate detection to identify all elements occuring only on one worker.