Thrill  0.1
location_detection.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/core/location_detection.hpp
3  *
4  * Detection of element locations using a distributed bloom filter
5  *
6  * Part of Project Thrill - http://project-thrill.org
7  *
8  * Copyright (C) 2016 Alexander Noe <[email protected]>
9  * Copyright (C) 2017 Tim Zeitz <[email protected]>
10  *
11  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
12  ******************************************************************************/
13 
14 #pragma once
15 #ifndef THRILL_CORE_LOCATION_DETECTION_HEADER
16 #define THRILL_CORE_LOCATION_DETECTION_HEADER
17 
19 #include <thrill/common/logger.hpp>
30 
32 #include <tlx/vector_free.hpp>
33 
34 #include <algorithm>
35 #include <functional>
36 #include <unordered_map>
37 #include <utility>
38 #include <vector>
39 
40 namespace thrill {
41 namespace core {
42 
43 /*!
44  * Emitter for a ReduceTable, which emits all of its data into a vector of
45  * hash-counter-pairs.
46  */
template <typename HashCount>
class ToVectorEmitter
{
public:
    //! Binds the emitter to the vector which receives every emitted item. Only
    //! a reference is stored, so the vector must outlive the emitter.
    explicit ToVectorEmitter(std::vector<HashCount>& vec)
        : vec_(vec) { }

    //! Sets the modulus applied to the hash of each item emitted afterwards.
    void SetModulo(size_t modulo) {
        modulo_ = modulo;
    }

    //! Appends a copy of p to the target vector and reduces the copy's hash
    //! modulo the value set via SetModulo(). The partition id is ignored.
    void Emit(const size_t& /* partition_id */, const HashCount& p) {
        // the default modulus of 1 fails here: SetModulo() has to be called
        // with a value larger than 1 before the first Emit().
        assert(modulo_ > 1);
        vec_.emplace_back(p);
        vec_.back().hash %= modulo_;
    }

private:
    //! Target vector receiving the emitted items (held by reference)
    std::vector<HashCount>& vec_;
    //! Modulus applied to the hash of emitted items
    size_t modulo_ = 1;
};
68 
69 template <typename HashCount>
71 {
72 private:
73  static constexpr bool debug = false;
74 
75  using GolombBitStreamWriter =
77 
78  using GolombBitStreamReader =
80 
81  using GolumbDeltaWriter =
83 
84  using GolumbDeltaReader =
86 
88 
90  {
91  public:
92  GolombPairReader(GolombBitStreamReader& bit_reader,
93  GolumbDeltaReader& delta_reader)
94  : bit_reader_(bit_reader), delta_reader_(delta_reader) { }
95 
96  bool HasNext() {
97  return bit_reader_.HasNext();
98  }
99 
100  template <typename Type>
101  HashCount Next() {
102  HashCount hc;
103  hc.hash = delta_reader_.Next<size_t>();
104  hc.ReadBits(bit_reader_);
105  return hc;
106  }
107 
108  private:
109  GolombBitStreamReader& bit_reader_;
111  };
112 
113  struct ExtractHash {
114  size_t operator () (const HashCount& hc) const { return hc.hash; }
115  };
116 
117 private:
118  void WriteOccurenceCounts(const data::CatStreamPtr& stream_pointer,
119  const std::vector<HashCount>& hash_occ,
120  size_t golomb_param,
121  size_t num_workers,
122  size_t max_hash) {
123 
124  data::CatStream::Writers writers = stream_pointer->GetWriters();
125 
126  for (size_t i = 0, j = 0; i < num_workers; ++i) {
127  common::Range range_i =
128  common::CalculateLocalRange(max_hash, num_workers, i);
129 
130  GolombBitStreamWriter golomb_writer(writers[i], golomb_param);
131  GolumbDeltaWriter delta_writer(
132  golomb_writer,
133  /* initial */ size_t(-1) /* cancels with +1 bias */);
134 
135  while (j < hash_occ.size() && hash_occ[j].hash < range_i.end)
136  {
137  // accumulate counters hashing to same value
138  HashCount total_hash = hash_occ[j++];
139  while (j < hash_occ.size() &&
140  hash_occ[j].hash == total_hash.hash)
141  {
142  total_hash += hash_occ[j++];
143  }
144 
145  // write counter of all values hashing to hash_occ[j].hash
146  // in next bitsize bits
147  delta_writer.Put(total_hash.hash);
148  total_hash.WriteBits(golomb_writer);
149  }
150  }
151  }
152 
153 public:
156 
157  using Table = typename ReduceTableSelect<
158  ReduceConfig::table_impl_,
159  HashCount, typename HashCount::HashType, HashCount,
160  ExtractHash, std::plus<HashCount>, Emitter,
161  /* VolatileKey */ false, ReduceConfig>::type;
162 
163  LocationDetection(Context& ctx, size_t dia_id,
164  const ReduceConfig& config = ReduceConfig())
165  : emit_(hash_occ_),
166  context_(ctx),
167  dia_id_(dia_id),
168  config_(config),
169  table_(ctx, dia_id, ExtractHash(), std::plus<HashCount>(),
170  emit_, 1, config) {
171  sLOG << "creating LocationDetection";
172  }
173 
174  /*!
175  * Initializes the table to the memory limit size.
176  *
177  * \param limit_memory_bytes Memory limit in bytes
178  */
179  void Initialize(size_t limit_memory_bytes) {
180  table_.Initialize(limit_memory_bytes);
181  }
182 
183  /*!
184  * Inserts a HashCount item into the table.
185  */
186  void Insert(const HashCount& item) {
187  table_.Insert(item);
188  }
189 
190  /*!
191  * Flushes the table and detects the most common location for each element.
192  */
193  size_t Flush(std::unordered_map<size_t, size_t>& target_processors) {
194 
195  size_t num_items = table_.num_items();
196  if (table_.has_spilled_data_on_partition(0)) {
197  num_items += table_.partition_files()[0].num_items();
198  }
199 
200  // golomb code parameters
201  size_t upper_bound_uniques = context_.net.AllReduce(num_items);
202  double fpr_parameter = 8;
203  size_t golomb_param = (size_t)fpr_parameter;
204  size_t max_hash = golomb_param * upper_bound_uniques;
205 
206  emit_.SetModulo(max_hash);
207  hash_occ_.reserve(num_items);
208  table_.FlushAll();
209 
210  if (table_.has_spilled_data_on_partition(0)) {
211  data::File::Reader reader =
212  table_.partition_files()[0].GetReader(true);
213 
214  while (reader.HasNext()) {
215  emit_.Emit(0, reader.Next<HashCount>());
216  }
217  }
218 
219  std::sort(hash_occ_.begin(), hash_occ_.end());
220 
221  data::CatStreamPtr golomb_data_stream =
222  context_.GetNewCatStream(dia_id_);
223 
224  WriteOccurenceCounts(golomb_data_stream,
225  hash_occ_, golomb_param,
226  context_.num_workers(),
227  max_hash);
228 
229  tlx::vector_free(hash_occ_);
230 
231  // get inbound Golomb/delta-encoded hash stream
232 
233  std::vector<data::CatStream::Reader> hash_readers =
234  golomb_data_stream->GetReaders();
235 
236  std::vector<GolombBitStreamReader> golomb_readers;
237  std::vector<GolumbDeltaReader> delta_readers;
238  std::vector<GolombPairReader> pair_readers;
239 
240  golomb_readers.reserve(context_.num_workers());
241  delta_readers.reserve(context_.num_workers());
242  pair_readers.reserve(context_.num_workers());
243 
244  for (auto& reader : hash_readers) {
245  golomb_readers.emplace_back(reader, golomb_param);
246  delta_readers.emplace_back(
247  golomb_readers.back(),
248  /* initial */ size_t(-1) /* cancels with +1 bias */);
249  pair_readers.emplace_back(
250  golomb_readers.back(), delta_readers.back());
251  }
252 
253  size_t worker_bitsize = std::max(
254  tlx::integer_log2_ceil(context_.num_workers()), (unsigned int)1);
255 
256  // multi-way merge hash streams and detect hosts with most items per key
257 
258  auto puller = make_multiway_merge_tree<HashCount>(
259  pair_readers.begin(), pair_readers.end());
260 
261  // create streams (delta/Golomb encoded) to notify workers of location
262 
263  data::CatStreamPtr location_stream = context_.GetNewCatStream(dia_id_);
264 
265  data::CatStream::Writers location_writers =
266  location_stream->GetWriters();
267 
268  std::vector<GolombBitStreamWriter> location_gbsw;
269  std::vector<GolumbDeltaWriter> location_dw;
270  location_gbsw.reserve(context_.num_workers());
271  location_dw.reserve(context_.num_workers());
272 
273  for (size_t i = 0; i < context_.num_workers(); ++i) {
274  location_gbsw.emplace_back(location_writers[i], golomb_param);
275  location_dw.emplace_back(
276  location_gbsw.back(),
277  /* initial */ size_t(-1) /* cancels with +1 bias */);
278  }
279 
280  std::pair<HashCount, size_t> next;
281  std::vector<size_t> workers;
282 
283  bool not_finished = puller.HasNext();
284 
285  if (not_finished)
286  next = puller.NextWithSource();
287 
288  while (not_finished) {
289  // set up aggregation values from first item with equal hash
290  HashCount sum = next.first;
291  workers.push_back(next.second);
292 
293  size_t max_worker = next.second;
294  CounterType max_count = sum.count;
295 
296  // check if another item is available, and if it has the same hash
297  while ((not_finished = puller.HasNext()) &&
298  (next = puller.NextWithSource(),
299  next.first.hash == sum.hash))
300  {
301  // summarize items (this sums dia_masks and counts)
302  sum += next.first;
303 
304  // check if count is higher
305  if (next.first.count > max_count) {
306  max_count = next.first.count;
307  max_worker = next.second;
308  }
309  // store all workers to notify
310  workers.push_back(next.second);
311  }
312 
313  // for dia_mask == 3 -> notify all participating workers (this is
314  // for InnerJoin, since only they need the items)
315  if (sum.NeedBroadcast()) {
316  for (const size_t& w : workers) {
317  location_dw[w].Put(sum.hash);
318  location_gbsw[w].PutBits(max_worker, worker_bitsize);
319  LOG << "Put: " << sum.hash << " @ " << max_worker
320  << " -> " << w;
321  }
322  }
323 
324  workers.clear();
325  }
326 
327  // close target-worker writers
328  location_dw.clear();
329  location_gbsw.clear();
330  location_writers.Close();
331 
332  // read location notifications and store them in the unordered_map
333 
334  std::vector<data::CatStream::Reader> location_readers =
335  location_stream->GetReaders();
336 
337  target_processors.reserve(num_items);
338 
339  for (data::CatStream::Reader& reader : location_readers)
340  {
341  GolombBitStreamReader golomb_reader(reader, golomb_param);
342  GolumbDeltaReader delta_reader(
343  golomb_reader, /* initial */ size_t(-1) /* cancels at +1 */);
344 
345  // Builds golomb encoded bitset from data received by the stream.
346  while (delta_reader.HasNext()) {
347  // Golomb code contains deltas, we want the actual values
348  size_t hash = delta_reader.Next<size_t>();
349  size_t worker = golomb_reader.GetBits(worker_bitsize);
350 
351  LOG << "Hash " << hash << " on worker " << worker;
352  target_processors[hash] = worker;
353  }
354  }
355 
356  return max_hash;
357  }
358 
359  void Dispose() {
360  table_.Dispose();
361  tlx::vector_free(hash_occ_);
362  }
363 
364  //! Target vector for vector emitter
365  std::vector<HashCount> hash_occ_;
366  //! Emitter to vector
367  Emitter emit_;
368  //! Thrill context
370  size_t dia_id_;
371  //! Reduce configuration used
373  //! Reduce table used to count keys
375 };
376 
377 } // namespace core
378 } // namespace thrill
379 
380 #endif // !THRILL_CORE_LOCATION_DETECTION_HEADER
381 
382 /******************************************************************************/
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:34
static uint_pair max()
return an uint_pair instance containing the largest value possible
Definition: uint_types.hpp:226
std::vector< HashCount > & vec_
Type selection via ReduceTableImpl enum.
GolombPairReader(GolombBitStreamReader &bit_reader, GolumbDeltaReader &delta_reader)
STL namespace.
TLX_ATTRIBUTE_ALWAYS_INLINE T Next()
Next() reads a complete item T.
ssize_t CounterType
void Emit(const size_t &, const HashCount &p)
represents a 1 dimensional range (interval) [begin,end)
Definition: math.hpp:41
size_t GetBits(unsigned bits)
Get bits at the cursor.
Definition: bit_stream.hpp:147
Emitter for a ReduceTable, which emits all of its data into a vector of hash-counter-pairs.
The Context of a job is a unique instance per worker which holds references to all underlying parts o...
Definition: context.hpp:221
std::vector< HashCount > hash_occ_
Target vector for vector emitter.
void Close()
custom destructor to close writers in a cyclic fashion
Definition: stream_data.cpp:92
LocationDetection(Context &ctx, size_t dia_id, const ReduceConfig &config=ReduceConfig())
ToVectorEmitter(std::vector< HashCount > &vec)
Context & context_
Thrill context.
An extra class derived from std::vector<> for delivery of the BlockWriters of a Stream.
Definition: stream_data.hpp:59
static Range CalculateLocalRange(size_t global_size, size_t p, size_t i)
Definition: math.hpp:110
void Put(const Type &value)
void Insert(const HashCount &item)
Inserts a HashCount item into the table.
void Initialize(size_t limit_memory_bytes)
Initializes the table to the memory limit size.
BlockReader takes Block objects from BlockSource and allows reading of a) serializable Items or b) ar...
size_t end
end index
Definition: math.hpp:58
size_t Flush(std::unordered_map< size_t, size_t > &target_processors)
Flushes the table and detects the most common location for each element.
Configuration class to define operational parameters of reduce hash tables and reduce phases...
static constexpr bool debug
ReduceConfig config_
Reduce configuration used.
void vector_free(std::vector< Type > &v)
Definition: vector_free.hpp:21
Emitter emit_
Emitter to vector.
static unsigned integer_log2_ceil(int i)
calculate the log2 ceiling of an integer type
typename thrill::api::JoinNode::HashCount ::CounterType CounterType
Table table_
Reduce table used to count keys.
TLX_ATTRIBUTE_ALWAYS_INLINE bool HasNext()
HasNext() returns true if at least one more item is available.
HashCrc32< T > hash
Select a hashing method.
Definition: hash.hpp:262
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:24
typename ReduceTableSelect< ReduceConfig::table_impl_, thrill::api::JoinNode::HashCount, typename thrill::api::JoinNode::HashCount ::HashType, thrill::api::JoinNode::HashCount, ExtractHash, std::plus< thrill::api::JoinNode::HashCount >, Emitter, false, ReduceConfig >::type Table
void WriteOccurenceCounts(const data::CatStreamPtr &stream_pointer, const std::vector< HashCount > &hash_occ, size_t golomb_param, size_t num_workers, size_t max_hash)