Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
location_detection.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/core/location_detection.hpp
3  *
4  * Detection of element locations using a distributed bloom filter
5  *
6  * Part of Project Thrill - http://project-thrill.org
7  *
8  * Copyright (C) 2016 Alexander Noe <[email protected]>
9  * Copyright (C) 2017 Tim Zeitz <[email protected]>
10  *
11  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
12  ******************************************************************************/
13 
14 #pragma once
15 #ifndef THRILL_CORE_LOCATION_DETECTION_HEADER
16 #define THRILL_CORE_LOCATION_DETECTION_HEADER
17 
19 #include <thrill/common/logger.hpp>
30 
32 #include <tlx/vector_free.hpp>
33 
34 #include <algorithm>
35 #include <functional>
36 #include <unordered_map>
37 #include <utility>
38 #include <vector>
39 
40 namespace thrill {
41 namespace core {
42 
43 /*!
44  * Emitter for a ReduceTable which appends every emitted hash-counter pair to
45  * a vector supplied at construction, reducing each pair's hash modulo a settable value.
46  */
47 template <typename HashCount>
49 {
50 public:
51  explicit ToVectorEmitter(std::vector<HashCount>& vec) // keeps a reference to vec, not a copy
52  : vec_(vec) { }
53 
54  void SetModulo(size_t modulo) { // sets the modulus that Emit() applies to each hash
55  modulo_ = modulo;
56  }
57 
58  void Emit(const size_t& /* partition_id */, const HashCount& p) {
59  assert(modulo_ > 1); // modulo_ defaults to 1, so this only holds after SetModulo() with a value > 1
60  vec_.emplace_back(p); // append a copy of the pair ...
61  vec_.back().hash %= modulo_; // ... and reduce the stored copy's hash modulo modulo_
62  }
63 
64 private:
65  std::vector<HashCount>& vec_; //!< target vector that receives all emitted pairs
66  size_t modulo_ = 1; //!< modulus applied to the hash of each emitted pair
67 };
68 
69 template <typename HashCount>
71 {
72 private:
73  static constexpr bool debug = false;
74 
75  using GolombBitStreamWriter =
77 
78  using GolombBitStreamReader =
80 
81  using GolumbDeltaWriter =
83 
84  using GolumbDeltaReader =
86 
88 
90  {
91  public:
93  GolumbDeltaReader& delta_reader)
94  : bit_reader_(bit_reader), delta_reader_(delta_reader) { }
95 
96  bool HasNext() { // another pair can be read as long as the bit reader reports more data
97  return bit_reader_.HasNext();
98  }
99 
100  template <typename Type> // Type is not used in the body; the result is always a HashCount
101  HashCount Next() {
102  HashCount hc;
103  hc.hash = delta_reader_.Next<size_t>(); // the hash is read through the delta reader ...
104  hc.ReadBits(bit_reader_); // ... the remaining fields are read by HashCount itself from the bit reader
105  return hc;
106  }
107 
108  private:
111  };
112 
113  struct ExtractHash { // key extractor functor: the key of a HashCount is its hash member
114  size_t operator () (const HashCount& hc) const { return hc.hash; }
115  };
116 
117 private:
118  void WriteOccurenceCounts(const data::CatStreamPtr& stream_pointer,
119  const std::vector<HashCount>& hash_occ,
120  size_t golomb_param,
121  size_t num_workers,
122  size_t max_hash) {
     // Distributes the entries of hash_occ over the stream's writers: writer i
     // receives every not-yet-written entry whose hash is below the end of
     // CalculateLocalRange(max_hash, num_workers, i). Entries with equal hash
     // are merged with += before being written.
     // NOTE(review): j is never reset, so this assumes hash_occ is ordered by
     // ascending hash (Flush() sorts it first) -- confirm HashCount's operator<.
123 
124  data::CatStream::Writers writers = stream_pointer->GetWriters();
125 
126  for (size_t i = 0, j = 0; i < num_workers; ++i) { // i: target worker, j: read position in hash_occ
127  common::Range range_i =
128  common::CalculateLocalRange(max_hash, num_workers, i);
129 
130  GolombBitStreamWriter golomb_writer(writers[i], golomb_param);
131  GolumbDeltaWriter delta_writer(
132  golomb_writer,
133  /* initial */ size_t(-1) /* cancels with +1 bias */);
134 
135  while (j < hash_occ.size() && hash_occ[j].hash < range_i.end)
136  {
137  // accumulate all consecutive entries that share the same hash value
138  HashCount total_hash = hash_occ[j++];
139  while (j < hash_occ.size() &&
140  hash_occ[j].hash == total_hash.hash)
141  {
142  total_hash += hash_occ[j++];
143  }
144 
145  // write the accumulated entry: its hash through the delta writer,
146  // then its remaining fields via HashCount::WriteBits
147  delta_writer.Put(total_hash.hash);
148  total_hash.WriteBits(golomb_writer);
149  }
150  }
151  }
152 
153 public:
156 
157  using Table = typename ReduceTableSelect<
159  HashCount, typename HashCount::HashType, HashCount,
160  ExtractHash, std::plus<HashCount>, Emitter,
161  /* VolatileKey */ false, ReduceConfig>::type;
162 
163  LocationDetection(Context& ctx, size_t dia_id,
164  const ReduceConfig& config = ReduceConfig())
165  : emit_(hash_occ_), // everything the table emits is appended to hash_occ_
166  context_(ctx),
167  dia_id_(dia_id),
168  config_(config),
169  table_(ctx, dia_id, ExtractHash(), std::plus<HashCount>(), // key = hash, equal keys are combined with operator+
170  emit_, 1, config) {
     // NOTE(review): the literal 1 is presumably the partition count (Flush()
     // only ever touches partition 0) -- confirm against the Table constructor.
171  sLOG << "creating LocationDetection";
172  }
173 
174  /*!
175  * Initializes the reduce table by forwarding the memory limit to it.
176  *
177  * \param limit_memory_bytes Memory limit in bytes
178  */
179  void Initialize(size_t limit_memory_bytes) {
180  table_.Initialize(limit_memory_bytes);
181  }
182 
183  /*!
184  * Inserts a HashCount item into the reduce table (keyed by its hash, see ExtractHash).
185  */
186  void Insert(const HashCount& item) {
187  table_.Insert(item);
188  }
189 
190  /*!
191  * Flushes the table and detects the most common location for each element.
     *
     * Steps: (1) flush the local table into hash_occ_ with hashes reduced modulo
     * max_hash, (2) send the sorted entries to the workers via
     * WriteOccurenceCounts, (3) merge the incoming streams and determine, per
     * hash, the worker that contributed the highest count, (4) notify the
     * contributing workers and store the received notifications.
     *
     * \param target_processors receives one entry per notified hash, mapping the
     * (modulo-reduced) hash to the selected worker
     * \return max_hash, the modulus that was applied to all hashes
192  */
193  size_t Flush(std::unordered_map<size_t, size_t>& target_processors) {
194 
195  size_t num_items = table_.num_items();
196  if (table_.has_spilled_data_on_partition(0)) {
197  num_items += table_.partition_files()[0].num_items();
198  }
199 
200  // golomb code parameters: max_hash = golomb_param * (sum of num_items over all workers)
201  size_t upper_bound_uniques = context_.net.AllReduce(num_items);
202  double fpr_parameter = 8; // NOTE(review): name suggests a false-positive-rate parameter -- confirm
203  size_t golomb_param = (size_t)fpr_parameter;
204  size_t max_hash = golomb_param * upper_bound_uniques;
205 
206  emit_.SetModulo(max_hash);
207  hash_occ_.reserve(num_items);
208  table_.FlushAll();
209 
210  if (table_.has_spilled_data_on_partition(0)) { // spilled items are read back and emitted manually
211  data::File::Reader reader =
212  table_.partition_files()[0].GetReader(true);
213 
214  while (reader.HasNext()) {
215  emit_.Emit(0, reader.Next<HashCount>());
216  }
217  }
218 
219  std::sort(hash_occ_.begin(), hash_occ_.end()); // WriteOccurenceCounts walks hash_occ_ in one forward pass
220 
221  data::CatStreamPtr golomb_data_stream =
222  context_.GetNewCatStream(dia_id_);
223 
224  WriteOccurenceCounts(golomb_data_stream,
225  hash_occ_, golomb_param,
226  context_.num_workers(),
227  max_hash);
228 
     // NOTE(review): this listing jumps from source line 228 to 230; one line
     // of the original file is not shown here.
230 
231  // get inbound Golomb/delta-encoded hash stream
232 
233  std::vector<data::CatStream::Reader> hash_readers =
234  golomb_data_stream->GetReaders();
235 
236  std::vector<GolombBitStreamReader> golomb_readers;
237  std::vector<GolumbDeltaReader> delta_readers;
238  std::vector<GolombPairReader> pair_readers;
239 
     // reserve up front: the readers below hold references to earlier
     // elements (golomb_readers.back(), delta_readers.back())
240  golomb_readers.reserve(context_.num_workers());
241  delta_readers.reserve(context_.num_workers());
242  pair_readers.reserve(context_.num_workers());
243 
244  for (auto& reader : hash_readers) {
245  golomb_readers.emplace_back(reader, golomb_param);
246  delta_readers.emplace_back(
247  golomb_readers.back(),
248  /* initial */ size_t(-1) /* cancels with +1 bias */);
249  pair_readers.emplace_back(
250  golomb_readers.back(), delta_readers.back());
251  }
252 
     // number of bits used to transmit a worker index, at least 1
253  size_t worker_bitsize = std::max(
254  tlx::integer_log2_ceil(context_.num_workers()), (unsigned int)1);
255 
256  // multi-way merge hash streams and detect hosts with most items per key
257 
258  auto puller = make_multiway_merge_tree<HashCount>(
259  pair_readers.begin(), pair_readers.end());
260 
261  // create streams (delta/Golomb encoded) to notify workers of location
262 
263  data::CatStreamPtr location_stream = context_.GetNewCatStream(dia_id_);
264 
265  data::CatStream::Writers location_writers =
266  location_stream->GetWriters();
267 
268  std::vector<GolombBitStreamWriter> location_gbsw;
269  std::vector<GolumbDeltaWriter> location_dw;
270  location_gbsw.reserve(context_.num_workers());
271  location_dw.reserve(context_.num_workers());
272 
273  for (size_t i = 0; i < context_.num_workers(); ++i) {
274  location_gbsw.emplace_back(location_writers[i], golomb_param);
275  location_dw.emplace_back(
276  location_gbsw.back(),
277  /* initial */ size_t(-1) /* cancels with +1 bias */);
278  }
279 
280  std::pair<HashCount, size_t> next; // (item, index of the source stream it came from)
281  std::vector<size_t> workers; // sources that contributed to the current hash
282 
283  bool not_finished = puller.HasNext();
284 
285  if (not_finished)
286  next = puller.NextWithSource();
287 
288  while (not_finished) {
289  // set up aggregation values from first item with equal hash
290  HashCount sum = next.first;
291  workers.push_back(next.second);
292 
293  size_t max_worker = next.second;
294  CounterType max_count = sum.count;
295 
296  // check if another item is available, and if it has the same hash
     // (when the hashes differ, next already holds the first item of the
     // following group for the next outer iteration)
297  while ((not_finished = puller.HasNext()) &&
298  (next = puller.NextWithSource(),
299  next.first.hash == sum.hash))
300  {
301  // summarize items (this sums dia_masks and counts)
302  sum += next.first;
303 
304  // check if count is higher (strict '>': on ties the earlier item keeps the lead)
305  if (next.first.count > max_count) {
306  max_count = next.first.count;
307  max_worker = next.second;
308  }
309  // store all workers to notify
310  workers.push_back(next.second);
311  }
312 
313  // for dia_mask == 3 -> notify all participating workers (this is
314  // for InnerJoin, since only they need the items)
315  if (sum.NeedBroadcast()) {
316  for (const size_t& w : workers) {
317  location_dw[w].Put(sum.hash);
318  location_gbsw[w].PutBits(max_worker, worker_bitsize);
319  LOG << "Put: " << sum.hash << " @ " << max_worker
320  << " -> " << w;
321  }
322  }
323 
324  workers.clear();
325  }
326 
327  // close target-worker writers
328  location_dw.clear();
329  location_gbsw.clear();
330  location_writers.Close();
331 
332  // read location notifications and store them in the unordered_map
333 
334  std::vector<data::CatStream::Reader> location_readers =
335  location_stream->GetReaders();
336 
337  target_processors.reserve(num_items);
338 
339  for (data::CatStream::Reader& reader : location_readers)
340  {
341  GolombBitStreamReader golomb_reader(reader, golomb_param);
342  GolumbDeltaReader delta_reader(
343  golomb_reader, /* initial */ size_t(-1) /* cancels at +1 */);
344 
345  // Decode the (hash, worker) notifications received on this stream.
346  while (delta_reader.HasNext()) {
347  // Golomb code contains deltas, we want the actual values
348  size_t hash = delta_reader.Next<size_t>();
349  size_t worker = golomb_reader.GetBits(worker_bitsize);
350 
351  LOG << "Hash " << hash << " on worker " << worker;
352  target_processors[hash] = worker;
353  }
354  }
355 
356  return max_hash;
357  }
358 
359  void Dispose() {
360  table_.Dispose(); // forwards to the reduce table's Dispose()
     // NOTE(review): this listing jumps from source line 360 to 362; one line
     // of the original file is not shown here.
362  }
363 
364  //! Target vector for vector emitter
365  std::vector<HashCount> hash_occ_;
366  //! Emitter to vector
368  //! Thrill context
369  Context& context_;
370  size_t dia_id_;
371  //! Reduce configuration used
373  //! Reduce table used to count keys
375 };
376 
377 } // namespace core
378 } // namespace thrill
379 
380 #endif // !THRILL_CORE_LOCATION_DETECTION_HEADER
381 
382 /******************************************************************************/
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:34
static uint_pair max()
return an uint_pair instance containing the largest value possible
Definition: uint_types.hpp:226
std::vector< HashCount > & vec_
size_t operator()(const HashCount &hc) const
core::GolombBitStreamReader< data::CatStream::Reader > GolombBitStreamReader
Type selection via ReduceTableImpl enum.
GolombPairReader(GolombBitStreamReader &bit_reader, GolumbDeltaReader &delta_reader)
static constexpr ReduceTableImpl table_impl_
select the hash table in the reduce phase by enum
TLX_ATTRIBUTE_ALWAYS_INLINE T Next()
Next() reads a complete item T.
ssize_t CounterType
void Emit(const size_t &, const HashCount &p)
represents a 1 dimensional range (interval) [begin,end)
Definition: math.hpp:41
size_t GetBits(unsigned bits)
Get bits at the cursor.
Definition: bit_stream.hpp:147
Emitter for a ReduceTable, which emits all of its data into a vector of hash-counter-pairs.
std::vector< HashCount > hash_occ_
Target vector for vector emitter.
void Close()
custom destructor to close writers in a cyclic fashion
Definition: stream_data.cpp:66
LocationDetection(Context &ctx, size_t dia_id, const ReduceConfig &config=ReduceConfig())
ToVectorEmitter(std::vector< HashCount > &vec)
Context & context_
Thrill context.
An extra class derived from std::vector<> for delivery of the BlockWriters of a Stream.
Definition: stream_data.hpp:56
static Range CalculateLocalRange(size_t global_size, size_t p, size_t i)
Definition: math.hpp:110
void Put(const Type &value)
void Insert(const HashCount &item)
Inserts a HashCount item into the table.
void Initialize(size_t limit_memory_bytes)
Initializes the table to the memory limit size.
ToVectorEmitter< HashCount > Emitter
BlockReader takes Block objects from BlockSource and allows reading of a) serializable Items or b) ar...
size_t end
end index
Definition: math.hpp:58
size_t Flush(std::unordered_map< size_t, size_t > &target_processors)
Flushes the table and detects the most common location for each element.
Configuration class to define operational parameters of reduce hash tables and reduce phases...
ReduceConfig config_
Reduce configuration used.
void vector_free(std::vector< Type > &v)
Definition: vector_free.hpp:21
Emitter emit_
Emitter to vector.
static unsigned integer_log2_ceil(int i)
calculate the log2 ceiling of an integer type
typename thrill::api::JoinNode::HashCount::CounterType CounterType
Table table_
Reduce table used to count keys.
TLX_ATTRIBUTE_ALWAYS_INLINE bool HasNext()
HasNext() returns true if at least one more item is available.
HashCrc32< T > hash
Select a hashing method.
Definition: hash.hpp:262
core::GolombBitStreamWriter< data::CatStream::Writer > GolombBitStreamWriter
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:24
typename ReduceTableSelect< ReduceConfig::table_impl_, thrill::api::JoinNode::HashCount, typename thrill::api::JoinNode::HashCount::HashType, thrill::api::JoinNode::HashCount, ExtractHash, std::plus< thrill::api::JoinNode::HashCount >, Emitter, false, ReduceConfig >::type Table
void WriteOccurenceCounts(const data::CatStreamPtr &stream_pointer, const std::vector< HashCount > &hash_occ, size_t golomb_param, size_t num_workers, size_t max_hash)