Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
location_detection.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/core/location_detection.hpp
3  *
4  * Detection of element locations using a distributed bloom filter
5  *
6  * Part of Project Thrill - http://project-thrill.org
7  *
8  * Copyright (C) 2016 Alexander Noe <[email protected]>
9  * Copyright (C) 2017 Tim Zeitz <[email protected]>
10  *
11  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
12  ******************************************************************************/
13 
14 #pragma once
15 #ifndef THRILL_CORE_LOCATION_DETECTION_HEADER
16 #define THRILL_CORE_LOCATION_DETECTION_HEADER
17 
19 #include <thrill/common/logger.hpp>
30 
32 
33 #include <algorithm>
34 #include <functional>
35 #include <unordered_map>
36 #include <utility>
37 #include <vector>
38 
39 namespace thrill {
40 namespace core {
41 
/*!
 * Emitter for a ReduceTable, which emits all of its data into a vector of
 * hash-counter-pairs.
 *
 * Every emitted item is appended to the referenced vector with its `hash`
 * member reduced modulo the value configured via SetModulo(). SetModulo()
 * must be called with a value greater than one before the first Emit(): the
 * default modulo of 1 acts as a "not configured" sentinel that trips the
 * assertion in Emit() (in builds where assert() is active).
 *
 * \tparam HashCount item type; must be copy-constructible and expose a
 *                   mutable unsigned-integral member named `hash`.
 */
template <typename HashCount>
class ToVectorEmitter
{
public:
    //! Binds the emitter to the output vector. The vector is held by
    //! reference and must outlive this emitter.
    explicit ToVectorEmitter(std::vector<HashCount>& vec)
        : vec_(vec) { }

    //! Sets the modulus applied to the hash of every subsequently emitted
    //! item.
    void SetModulo(size_t modulo) {
        modulo_ = modulo;
    }

    //! Appends \p p to the output vector and reduces the hash of the stored
    //! copy modulo the configured modulus. The partition id is ignored.
    void Emit(const size_t& /* partition_id */, const HashCount& p) {
        assert(modulo_ > 1);
        vec_.emplace_back(p);
        vec_.back().hash %= modulo_;
    }

private:
    //! output vector (not owned)
    std::vector<HashCount>& vec_;
    //! modulus for hash values; 1 means "not yet configured"
    size_t modulo_ = 1;
};
67 
/*!
 * LocationDetection counts HashCount items per hash in a local reduce table.
 * In Flush(), the per-hash counters are Golomb/delta-encoded and sent to the
 * worker owning that hash range. The owner merges all inbound streams and,
 * for every hash whose summed item reports NeedBroadcast(), tells each
 * contributing worker which worker holds the highest count.
 *
 * NOTE(review): this is a Doxygen listing from which hyperlinked source lines
 * were dropped. The missing lines include:
 *  - 69: the class head;
 *  - 75/78/81/84: the right-hand sides of the coder aliases;
 *  - 86: the CounterType alias;
 *  - 88, 91, 108-109: the GolombPairReader head, first ctor line and members;
 *  - 153-154, 157: the ReduceConfig/Emitter aliases and the first
 *    ReduceTableSelect argument;
 *  - 366, 371, 373: the emit_/config_/table_ declarations.
 * Take them from the original header; the tooltips below the listing name
 * some of them.
 */
68 template <typename HashCount>
70 {
71 private:
// enables the LOG/sLOG output used in this class
72  static constexpr bool debug = false;
73 
// Bit-level Golomb coders over the CatStream block writers/readers (per the
// tooltips: core::GolombBitStreamWriter<data::CatStream::Writer> and
// core::GolombBitStreamReader<data::CatStream::Reader>).
74  using GolombBitStreamWriter =
76 
77  using GolombBitStreamReader =
79 
// Delta coders layered on the Golomb bit streams ("Golumb" spelling is the
// original identifier). Every use site passes size_t(-1) as the initial value
// to cancel a +1 bias. TODO confirm the exact bias against the alias
// targets, which are missing from this listing.
80  using GolumbDeltaWriter =
82 
83  using GolumbDeltaReader =
85 
87 
// GolombPairReader: adapts one inbound worker stream into a sequence of
// HashCount items for the multiway merge. Each item is a delta-decoded hash
// followed by the HashCount-specific payload read via ReadBits(). The
// constructor takes both coders by reference, so the referenced objects must
// outlive the reader.
89  {
90  public:
92  GolumbDeltaReader& delta_reader)
93  : bit_reader_(bit_reader), delta_reader_(delta_reader) { }
94 
// true while the underlying bit stream has more data
95  bool HasNext() {
96  return bit_reader_.HasNext();
97  }
98 
// Reads the next (hash, payload) pair. Type is unused in the body;
// presumably it exists to match the Next<T>() interface the merge calls.
// TODO confirm.
99  template <typename Type>
100  HashCount Next() {
101  HashCount hc;
102  hc.hash = delta_reader_.Next<size_t>();
103  hc.ReadBits(bit_reader_);
104  return hc;
105  }
106 
107  private:
110  };
111 
// Key extractor for the reduce table: the key of a HashCount is its hash.
112  struct ExtractHash {
113  size_t operator () (const HashCount& hc) const { return hc.hash; }
114  };
115 
116 private:
/*!
 * Writes the locally collected counters to all workers. Worker i receives the
 * hashes in the i-th sub-range of [0, max_hash) computed by
 * common::CalculateLocalRange(). Consecutive entries with equal hash are
 * summed via operator+= before writing. Each distinct hash is written as a
 * delta-coded value followed by its counter payload (WriteBits()).
 *
 * Precondition: hash_occ is ordered by ascending hash. A single cursor j
 * walks hash_occ across all workers' ranges.
 *
 * \param stream_pointer stream whose writers receive the encoded counters
 * \param hash_occ       hash/counter pairs, ordered by hash
 * \param golomb_param   Golomb coding parameter
 * \param num_workers    number of target workers (one writer each)
 * \param max_hash       exclusive upper bound of the hash space
 */
117  void WriteOccurenceCounts(const data::CatStreamPtr& stream_pointer,
118  const std::vector<HashCount>& hash_occ,
119  size_t golomb_param,
120  size_t num_workers,
121  size_t max_hash) {
122 
123  data::CatStream::Writers writers = stream_pointer->GetWriters();
124 
125  for (size_t i = 0, j = 0; i < num_workers; ++i) {
126  common::Range range_i =
127  common::CalculateLocalRange(max_hash, num_workers, i);
128 
// The coders are re-created per target worker, so each worker's stream
// starts a fresh delta sequence. They go out of scope at the end of each
// iteration; presumably their destructors flush pending bits into
// writers[i] (TODO confirm).
129  GolombBitStreamWriter golomb_writer(writers[i], golomb_param);
130  GolumbDeltaWriter delta_writer(
131  golomb_writer,
132  /* initial */ size_t(-1) /* cancels with +1 bias */);
133 
134  while (j < hash_occ.size() && hash_occ[j].hash < range_i.end)
135  {
136  // accumulate counters hashing to same value
137  HashCount total_hash = hash_occ[j++];
138  while (j < hash_occ.size() &&
139  hash_occ[j].hash == total_hash.hash)
140  {
141  total_hash += hash_occ[j++];
142  }
143 
144  // write the merged hash (delta-coded), followed by its
145  // accumulated counter payload
146  delta_writer.Put(total_hash.hash);
147  total_hash.WriteBits(golomb_writer);
148  }
149  }
150  }
151 
152 public:
155 
// Reduce table keyed by ExtractHash that combines equal keys with
// std::plus<HashCount> and emits through Emitter (ToVectorEmitter<HashCount>
// per the tooltip). VolatileKey is false.
156  using Table = typename ReduceTableSelect<
158  HashCount, typename HashCount::HashType, HashCount,
159  ExtractHash, std::plus<HashCount>, Emitter,
160  /* VolatileKey */ false, ReduceConfig>::type;
161 
// Creates the reduce table, which emits into hash_occ_ through emit_. The
// `1` argument is presumably the partition count; Flush() only inspects
// partition 0. TODO confirm.
162  LocationDetection(Context& ctx, size_t dia_id,
163  const ReduceConfig& config = ReduceConfig())
164  : emit_(hash_occ_),
165  context_(ctx),
166  dia_id_(dia_id),
167  config_(config),
168  table_(ctx, dia_id, ExtractHash(), std::plus<HashCount>(),
169  emit_, 1, config) {
170  sLOG << "creating LocationDetection";
171  }
172 
173  /*!
174  * Initializes the table to the memory limit size.
175  *
176  * \param limit_memory_bytes Memory limit in bytes
177  */
178  void Initialize(size_t limit_memory_bytes) {
179  table_.Initialize(limit_memory_bytes);
180  }
181 
182  /*!
183  * Inserts a HashCount item into the table.
184  */
185  void Insert(const HashCount& item) {
186  table_.Insert(item);
187  }
188 
189  /*!
190  * Flushes the table and detects the most common location for each element.
 *
 * Steps:
 *  1. Count local items (table plus spilled partition 0). AllReduce the
 *     count and set max_hash = golomb_param * (global count).
 *  2. Flush the table and re-emit spilled items into hash_occ_, with all
 *     hashes reduced modulo max_hash, then sort.
 *  3. Send Golomb/delta-coded counters to the worker owning each hash range.
 *  4. Multiway-merge the inbound streams. For each hash, sum the items and
 *     pick the worker with the largest count. Only a strictly larger count
 *     replaces the current maximum, so the earliest item the merge returns
 *     for that hash wins ties.
 *  5. If the summed item reports NeedBroadcast(), send the chosen worker id
 *     to every worker that contributed the hash.
 *  6. Decode the notifications addressed to this worker into
 *     target_processors.
 *
 * \param target_processors output map from hash (already reduced modulo
 *        max_hash) to the id of the chosen worker; existing entries for a
 *        received hash are overwritten.
 * \return max_hash, the modulus applied to all hashes.
191  */
192  size_t Flush(std::unordered_map<size_t, size_t>& target_processors) {
193 
194  size_t num_items = table_.num_items();
195  if (table_.has_spilled_data_on_partition(0)) {
196  num_items += table_.partition_files()[0].num_items();
197  }
198 
199  // golomb code parameters
200  size_t upper_bound_uniques = context_.net.AllReduce(num_items);
// NOTE(review): the Golomb parameter is fixed at 8. The name suggests it
// trades hash-space size against the hash collision rate; confirm.
201  double fpr_parameter = 8;
202  size_t golomb_param = (size_t)fpr_parameter;
203  size_t max_hash = golomb_param * upper_bound_uniques;
204 
205  emit_.SetModulo(max_hash);
206  hash_occ_.reserve(num_items);
207  table_.FlushAll();
208 
209  if (table_.has_spilled_data_on_partition(0)) {
210  data::File::Reader reader =
211  table_.partition_files()[0].GetReader(true);
212 
213  while (reader.HasNext()) {
214  emit_.Emit(0, reader.Next<HashCount>());
215  }
216  }
217 
// WriteOccurenceCounts() requires hash_occ_ ordered by hash
218  std::sort(hash_occ_.begin(), hash_occ_.end());
219 
220  data::CatStreamPtr golomb_data_stream =
221  context_.GetNewCatStream(dia_id_);
222 
223  WriteOccurenceCounts(golomb_data_stream,
224  hash_occ_, golomb_param,
225  context_.num_workers(),
226  max_hash);
227 
// release hash_occ_'s memory; its contents have been written out
228  std::vector<HashCount>().swap(hash_occ_);
229 
230  // get inbound Golomb/delta-encoded hash stream
231 
232  std::vector<data::CatStream::Reader> hash_readers =
233  golomb_data_stream->GetReaders();
234 
235  std::vector<GolombBitStreamReader> golomb_readers;
236  std::vector<GolumbDeltaReader> delta_readers;
237  std::vector<GolombPairReader> pair_readers;
238 
// The reserve() calls prevent the emplace_back()s below from reallocating.
// Reallocation would invalidate the golomb_readers/delta_readers elements
// that the pair readers reference (and, presumably, that the delta readers
// reference too). This assumes GetReaders() yields at most num_workers()
// readers.
239  golomb_readers.reserve(context_.num_workers());
240  delta_readers.reserve(context_.num_workers());
241  pair_readers.reserve(context_.num_workers());
242 
243  for (auto& reader : hash_readers) {
244  golomb_readers.emplace_back(reader, golomb_param);
245  delta_readers.emplace_back(
246  golomb_readers.back(),
247  /* initial */ size_t(-1) /* cancels with +1 bias */);
248  pair_readers.emplace_back(
249  golomb_readers.back(), delta_readers.back());
250  }
251 
// number of bits used to encode a worker id (at least 1)
252  size_t worker_bitsize = std::max(
253  tlx::integer_log2_ceil(context_.num_workers()), (unsigned int)1);
254 
255  // multi-way merge hash streams and detect hosts with most items per key
256 
257  auto puller = make_multiway_merge_tree<HashCount>(
258  pair_readers.begin(), pair_readers.end());
259 
260  // create streams (delta/Golomb encoded) to notify workers of location
261 
262  data::CatStreamPtr location_stream = context_.GetNewCatStream(dia_id_);
263 
264  data::CatStream::Writers location_writers =
265  location_stream->GetWriters();
266 
// reserved for the same reason as above: location_dw elements are
// constructed on top of location_gbsw elements
267  std::vector<GolombBitStreamWriter> location_gbsw;
268  std::vector<GolumbDeltaWriter> location_dw;
269  location_gbsw.reserve(context_.num_workers());
270  location_dw.reserve(context_.num_workers());
271 
272  for (size_t i = 0; i < context_.num_workers(); ++i) {
273  location_gbsw.emplace_back(location_writers[i], golomb_param);
274  location_dw.emplace_back(
275  location_gbsw.back(),
276  /* initial */ size_t(-1) /* cancels with +1 bias */);
277  }
278 
// `next` is the look-ahead item from the merge together with the index of
// the worker stream it came from; `workers` collects the source workers of
// the current hash run.
279  std::pair<HashCount, size_t> next;
280  std::vector<size_t> workers;
281 
282  bool not_finished = puller.HasNext();
283 
284  if (not_finished)
285  next = puller.NextWithSource();
286 
// Each outer iteration consumes one run of equal hashes. This relies on the
// merge returning equal hashes consecutively. When the inner loop exits,
// either `next` already holds the first item of the following run or
// not_finished is false.
287  while (not_finished) {
288  // set up aggregation values from first item with equal hash
289  HashCount sum = next.first;
290  workers.push_back(next.second);
291 
292  size_t max_worker = next.second;
293  CounterType max_count = sum.count;
294 
295  // check if another item is available, and if it has the same hash
296  while ((not_finished = puller.HasNext()) &&
297  (next = puller.NextWithSource(),
298  next.first.hash == sum.hash))
299  {
300  // summarize items (this sums dia_masks and counts)
301  sum += next.first;
302 
303  // check if count is higher
304  if (next.first.count > max_count) {
305  max_count = next.first.count;
306  max_worker = next.second;
307  }
308  // store all workers to notify
309  workers.push_back(next.second);
310  }
311 
312  // for dia_mask == 3 -> notify all participating workers (this is
313  // for InnerJoin, since only they need the items)
314  if (sum.NeedBroadcast()) {
315  for (const size_t& w : workers) {
316  location_dw[w].Put(sum.hash);
317  location_gbsw[w].PutBits(max_worker, worker_bitsize);
318  LOG << "Put: " << sum.hash << " @ " << max_worker
319  << " -> " << w;
320  }
321  }
322 
323  workers.clear();
324  }
325 
// Destroy the delta writers before the bit writers they wrap, then close the
// block writers. Presumably this flushes pending bits into the stream before
// it is closed (TODO confirm against the coder destructors).
326  // close target-worker writers
327  location_dw.clear();
328  location_gbsw.clear();
329  location_writers.Close();
330 
331  // read location notifications and store them in the unordered_map
332 
333  std::vector<data::CatStream::Reader> location_readers =
334  location_stream->GetReaders();
335 
336  target_processors.reserve(num_items);
337 
338  for (data::CatStream::Reader& reader : location_readers)
339  {
340  GolombBitStreamReader golomb_reader(reader, golomb_param);
341  GolumbDeltaReader delta_reader(
342  golomb_reader, /* initial */ size_t(-1) /* cancels at +1 */);
343 
344  // Decode the (delta-coded hash, worker id) pairs sent to this worker.
345  while (delta_reader.HasNext()) {
346  // Golomb code contains deltas, we want the actual values
347  size_t hash = delta_reader.Next<size_t>();
348  size_t worker = golomb_reader.GetBits(worker_bitsize);
349 
350  LOG << "Hash " << hash << " on worker " << worker;
351  target_processors[hash] = worker;
352  }
353  }
354 
355  return max_hash;
356  }
357 
// Releases the reduce table and the emitter's target vector.
358  void Dispose() {
359  table_.Dispose();
360  std::vector<HashCount>().swap(hash_occ_);
361  }
362 
363  //! Target vector for vector emitter
364  std::vector<HashCount> hash_occ_;
365  //! Emitter to vector
// NOTE(review): declaration `Emitter emit_;` (line 366) is missing from this listing.
367  //! Thrill context
368  Context& context_;
369  size_t dia_id_;
370  //! Reduce configuration used
// NOTE(review): declaration `ReduceConfig config_;` (line 371) is missing from this listing.
372  //! Reduce table used to count keys
// NOTE(review): declaration `Table table_;` (line 373) is missing from this listing.
374 };
375 
376 } // namespace core
377 } // namespace thrill
378 
379 #endif // !THRILL_CORE_LOCATION_DETECTION_HEADER
380 
381 /******************************************************************************/
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:34
static uint_pair max()
return an uint_pair instance containing the largest value possible
Definition: uint_types.hpp:226
std::vector< HashCount > & vec_
size_t operator()(const HashCount &hc) const
core::GolombBitStreamReader< data::CatStream::Reader > GolombBitStreamReader
Type selection via ReduceTableImpl enum.
GolombPairReader(GolombBitStreamReader &bit_reader, GolumbDeltaReader &delta_reader)
static constexpr ReduceTableImpl table_impl_
select the hash table in the reduce phase by enum
TLX_ATTRIBUTE_ALWAYS_INLINE T Next()
Next() reads a complete item T.
ssize_t CounterType
void Emit(const size_t &, const HashCount &p)
represents a 1 dimensional range (interval) [begin,end)
Definition: math.hpp:41
size_t GetBits(unsigned bits)
Get bits at the cursor.
Definition: bit_stream.hpp:147
Emitter for a ReduceTable, which emits all of its data into a vector of hash-counter-pairs.
std::vector< HashCount > hash_occ_
Target vector for vector emitter.
void Close()
custom destructor to close writers in a cyclic fashion
Definition: stream_data.cpp:66
LocationDetection(Context &ctx, size_t dia_id, const ReduceConfig &config=ReduceConfig())
ToVectorEmitter(std::vector< HashCount > &vec)
Context & context_
Thrill context.
An extra class derived from std::vector<> for delivery of the BlockWriters of a Stream.
Definition: stream_data.hpp:56
static Range CalculateLocalRange(size_t global_size, size_t p, size_t i)
Definition: math.hpp:110
void Put(const Type &value)
void Insert(const HashCount &item)
Inserts a HashCount item into the table.
void swap(CountingPtr< A, D > &a1, CountingPtr< A, D > &a2) noexcept
void Initialize(size_t limit_memory_bytes)
Initializes the table to the memory limit size.
ToVectorEmitter< HashCount > Emitter
BlockReader takes Block objects from BlockSource and allows reading of a) serializable Items or b) ar...
size_t end
end index
Definition: math.hpp:58
size_t Flush(std::unordered_map< size_t, size_t > &target_processors)
Flushes the table and detects the most common location for each element.
Configuration class to define operational parameters of reduce hash tables and reduce phases...
ReduceConfig config_
Reduce configuration used.
Emitter emit_
Emitter to vector.
typename thrill::api::JoinNode::HashCount::CounterType CounterType
Table table_
Reduce table used to count keys.
unsigned integer_log2_ceil(int i)
calculate the log2 ceiling of an integer type (by repeated bit shifts)
TLX_ATTRIBUTE_ALWAYS_INLINE bool HasNext()
HasNext() returns true if at least one more item is available.
HashCrc32< T > hash
Select a hashing method.
Definition: hash.hpp:262
core::GolombBitStreamWriter< data::CatStream::Writer > GolombBitStreamWriter
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:24
typename ReduceTableSelect< ReduceConfig::table_impl_, thrill::api::JoinNode::HashCount, typename thrill::api::JoinNode::HashCount::HashType, thrill::api::JoinNode::HashCount, ExtractHash, std::plus< thrill::api::JoinNode::HashCount >, Emitter, false, ReduceConfig >::type Table
void WriteOccurenceCounts(const data::CatStreamPtr &stream_pointer, const std::vector< HashCount > &hash_occ, size_t golomb_param, size_t num_workers, size_t max_hash)