Thrill  0.1
reservoir_sampling.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/common/reservoir_sampling.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2017 Timo Bingmann <[email protected]>
7  * Copyright (C) 2017 Lorenz Hübschle-Schneider <[email protected]>
8  *
9  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10  ******************************************************************************/
11 
12 #pragma once
13 #ifndef THRILL_COMMON_RESERVOIR_SAMPLING_HEADER
14 #define THRILL_COMMON_RESERVOIR_SAMPLING_HEADER
15 
16 #include <thrill/common/logger.hpp>
17 
18 #include <cassert>
19 #include <cmath>
20 #include <random>
21 #include <vector>
22 
23 namespace thrill {
24 namespace common {
25 
26 /*!
27  * Implementation of reservoir sampling using Vitter's Algorithm R. The
28  * reservoir size is fixed, new items replace old ones such that all items in
29  * the stream are sampled with the same uniform probability.
30  */
31 template <typename Type, typename RNG = std::default_random_engine>
33 {
34 public:
35  //! initialize reservoir sampler
36  ReservoirSampling(size_t size, std::vector<Type>& samples,
37  RNG& rng)
38  : size_(size), samples_(samples), rng_(rng) {
39  samples_.reserve(size_);
40  }
41 
42  //! visit item, maybe add it to the sample.
43  void add(const Type& item) {
44  ++count_;
45  if (count_ <= size_) {
46  // if reservoir is too small then store item
47  samples_.emplace_back(item);
48  }
49  else {
50  // maybe replace an item
51  size_t x = rng_() % count_;
52  if (x < size_)
53  samples_[x] = item;
54  }
55  }
56 
57  //! size of reservoir
58  size_t size() const { return size_; }
59 
60  //! number of items seen
61  size_t count() const { return count_; }
62 
63  //! access to samples
64  const std::vector<Type>& samples() const { return samples_; }
65 
66 private:
67  //! size of reservoir
68  size_t size_;
69  //! number of items seen
70  size_t count_ = 0;
71  //! reservoir
72  std::vector<Type>& samples_;
73  //! source of randomness
74  RNG& rng_;
75 };
76 
77 /*!
78  * Fast exact implementation of reservoir sampling using skip values. Algorithm
79  * L from Kim-Hung Li: Reservoir Sampling Algorithms of Time Complexity
80  * O(n(1+log(N/n))), ACM TOMS 1994. The reservoir size is fixed, new items
81  * replace old ones such that all items in the stream are sampled with the same
82  * uniform probability.
83  */
84 template <typename Type, typename RNG = std::default_random_engine>
86 {
87 public:
88  //! initialize reservoir sampler
89  ReservoirSamplingFast(size_t size, std::vector<Type>& samples,
90  RNG& rng)
91  : size_(size), samples_(samples), rng_(rng) {
92  samples_.reserve(size_);
93  W_ = std::exp(std::log(uniform(rng_)) / size);
94  gap_ = std::floor(std::log(uniform(rng_)) / std::log(1 - W_));
95  }
96 
97  //! visit item, maybe add it to the sample.
98  void add(const Type& item) {
99  ++count_;
100  if (count_ <= 4 * size_) {
101  if (count_ <= size_) {
102  // if reservoir is too small then store item
103  samples_.emplace_back(item);
104  }
105  else {
106  // use Vitter's algorithm for small count_
107  size_t x = uniform(rng_) * count_;
108  if (x < size_)
109  samples_[x] = item;
110 
111  // when count_ reaches 4 * size_ switch to gap algorithm
112  if (count_ == 4 * size_)
113  calc_next_gap();
114  }
115  }
116  else if (gap_ == 0) {
117  // gap elapsed, this item is a sample
118  size_t x = uniform(rng_) * size_;
119  samples_[x] = item;
120 
121  // pick gap size: the next gap_ items are not samples
122  calc_next_gap();
123  }
124  else {
125  --gap_;
126  }
127  }
128 
129  //! size of reservoir
130  size_t size() const { return size_; }
131 
132  //! number of items seen
133  size_t count() const { return count_; }
134 
135  //! access to samples
136  const std::vector<Type>& samples() const { return samples_; }
137 
138 private:
139  //! size of reservoir
140  size_t size_;
141  //! number of items seen
142  size_t count_ = 0;
143  //! number of items to skip until next sample
144  size_t gap_;
145  //! random value for gap calculation, distribution: largest value in a
146  //! sample of Uniform(0, old_W) of size size_, where old_W is 1 initially
147  double W_;
148  //! reservoir
149  std::vector<Type>& samples_;
150  //! source of randomness
151  RNG& rng_;
152  //! uniform [0.0, 1.0) distribution
153  std::uniform_real_distribution<double> uniform;
154 
155  //! draw gap size from geometric distribution with p = size_ / count_
156  void calc_next_gap() {
157  W_ *= std::exp(std::log(uniform(rng_)) / size_);
158  gap_ = std::log(uniform(rng_)) / std::log(1 - W_);
159  }
160 };
161 
162 /*!
163  * Implementation of a fast approximation of adaptive reservoir sampling using
164  * http://erikerlandson.github.io/blog/2015/11/20/very-fast-reservoir-sampling/
165  * The reservoir size grows logarithmically with the number given to the
166  * sampler, new items replace old ones such that all items in the stream are
167  * sampled with the same approximately uniform probability.
168  *
169  * Growing of the reservoir is implemented trivially by selecting any current
170  * item to expand the array. This works well enough if growing steps become
171  * rarer for larger streams.
172  */
173 template <typename Type, typename RNG = std::default_random_engine>
175 {
176  static constexpr bool debug = false;
177 
178 public:
179  //! initialize reservoir sampler
180  ReservoirSamplingGrow(std::vector<Type>& samples,
181  RNG& rng,
182  double desired_imbalance = 0.05)
183  : samples_(samples), rng_(rng), desired_imbalance_(desired_imbalance)
184  { }
185 
186  //! visit item, maybe add it to the sample.
187  void add(const Type& item) {
188  sLOG0 << "ReservoirSamplingGrow::add"
189  << "count_" << count_
190  << "size_" << size_
191  << "gap_" << gap_;
192 
193  ++count_;
194 
195  // check if reservoir should be resize, this is equivalent to the check
196  // if (size_ != calc_sample_size())
197  if (steps_to_resize == 0)
198  {
199  // calculate new reservoir size
200  size_t target_size = calc_sample_size();
201  steps_to_resize = calc_steps_to_next_resize();
202 
203  sLOG << "steps_to_resize" << steps_to_resize
204  << "target_size" << target_size
205  << "size_" << size_ << "count_" << count_
206  << "expanded_by" << target_size - size_;
207  assert(target_size >= size_);
208 
209  // expand reservoir, sample new items from existing and new one
210  while (size_ < target_size) {
211  size_t x = rng_() % (size_ + 1);
212  if (x != size_)
213  samples_.emplace_back(samples_[x]);
214  else
215  samples_.emplace_back(item);
216  ++size_;
217  }
218  }
219  else {
220  --steps_to_resize;
221  }
222 
223  assert(samples_.size() == size_);
224  if (debug && size_ != calc_sample_size())
225  LOG0 << "delta: " << (int)size_ - (int)calc_sample_size()
226  << " count_ " << count_;
227 
228  if (count_ <= 4 * size_) {
229  if (count_ <= size_) {
230  // fill slots initially in order
231  samples_[count_ - 1] = item;
232  }
233  else {
234  // replace items using Vitter's Algorithm R
235  size_t x = rng_() % count_;
236  if (x < size_)
237  samples_[x] = item;
238 
239  // when count_ reaches 4 * size_ switch to gap algorithm
240  if (count_ == 4 * size_)
241  gap_ = calc_next_gap();
242  }
243  }
244  else if (gap_ == 0) {
245  // gap elapsed, this item is a sample
246  size_t x = rng_() % size_;
247  samples_[x] = item;
248 
249  // pick gap size: the next gap_ items are not samples
250  gap_ = calc_next_gap();
251  }
252  else {
253  --gap_;
254  }
255  }
256 
257  //! size of reservoir
258  size_t size() const { return size_; }
259 
260  //! number of items seen
261  size_t count() const { return count_; }
262 
263  //! access to samples
264  const std::vector<Type>& samples() const { return samples_; }
265 
266  //! desired imbalance
267  double desired_imbalance() const { return desired_imbalance_; }
268 
269  //! calculate desired sample size
270  size_t calc_sample_size(size_t count) const {
271  size_t s = static_cast<size_t>(
272  std::log2(count)
273  * (1.0 / (desired_imbalance_ * desired_imbalance_)));
274  return std::max(s, size_t(1));
275  }
276 
277  //! calculate desired sample size
278  size_t calc_sample_size() const {
279  return calc_sample_size(count_);
280  }
281 
282 private:
283  //! size of reservoir
284  size_t size_ = 0;
285  //! number of items seen
286  size_t count_ = 0;
287  //! items to skip until next sample (used in gap algorithm)
288  size_t gap_ = 0;
289  //! items to process prior to checking for reservoir resize
290  size_t steps_to_resize = 0;
291  //! reservoir
292  std::vector<Type>& samples_;
293  //! source of randomness
294  RNG& rng_;
295 
296  //! epsilon imbalance: this reservoir sampling works well for the range 0.5
297  //! to 0.01. Imbalance 0.5 results in 79 samples for 1 million items, 0.1 in
298  //! 1992, 0.05 in 6643, 0.02 in 49828, and 0.01 in 199315.
299  const double desired_imbalance_;
300 
301  //! draw gap size from geometric distribution with p = size_ / count_
302  size_t calc_next_gap() const {
303  if (0) {
304  // this is slower than the simpler approximation below
305  return std::geometric_distribution<size_t>(
306  static_cast<double>(size_) / static_cast<double>(count_))(rng_);
307  }
308  else {
309  // generate a geometric distributed variant with p = size_ / count_
310  double p = static_cast<double>(size_) / static_cast<double>(count_);
311  double u = std::uniform_real_distribution<double>()(rng_);
312  return std::floor(std::log(u) / std::log(1 - p));
313  }
314  }
315 
316  //! calculate number of items/steps to process without checking for sample
317  //! resize
318  size_t calc_steps_to_next_resize() const {
319  return std::floor(
320  count_ * (
321  std::pow(2.0, desired_imbalance_ * desired_imbalance_) - 1.0));
322  }
323 };
324 
325 } // namespace common
326 } // namespace thrill
327 
328 #endif // !THRILL_COMMON_RESERVOIR_SAMPLING_HEADER
329 
330 /******************************************************************************/
size_t calc_sample_size(size_t count) const
calculate desired sample size
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:34
static uint_pair max()
return an uint_pair instance containing the largest value possible
Definition: uint_types.hpp:226
const std::vector< Type > & samples() const
access to samples
void add(const Type &item)
visit item, maybe add it to the sample.
std::uniform_real_distribution< double > uniform
uniform [0.0, 1.0) distribution
#define LOG0
Override default output: never or always output log.
Definition: logger.hpp:27
Type
VFS object type.
Definition: file_io.hpp:52
Implementation of a fast approximation of adaptive reservoir sampling using http://erikerlandson.github.io/blog/2015/11/20/very-fast-reservoir-sampling/ The reservoir size grows logarithmically with the number given to the sampler, new items replace old ones such that all items in the stream are sampled with the same approximately uniform probability.
size_t count() const
number of items seen
ReservoirSamplingGrow(std::vector< Type > &samples, RNG &rng, double desired_imbalance=0.05)
initialize reservoir sampler
void add(const Type &item)
visit item, maybe add it to the sample.
void add(const Type &item)
visit item, maybe add it to the sample.
void calc_next_gap()
draw gap size from geometric distribution with p = size_ / count_
size_t size() const
size of reservoir
Implementation of reservoir sampling using Vitter&#39;s Algorithm R.
size_t count() const
number of items seen
ReservoirSampling(size_t size, std::vector< Type > &samples, RNG &rng)
initialize reservoir sampler
Fast exact implementation of reservoir sampling using skip values.
double desired_imbalance() const
desired imbalance
#define sLOG0
Override default output: never or always output log.
Definition: logger.hpp:37
ReservoirSamplingFast(size_t size, std::vector< Type > &samples, RNG &rng)
initialize reservoir sampler
list x
Definition: gen_data.py:39
size_t calc_sample_size() const
calculate desired sample size
static constexpr bool debug
size_t size() const
size of reservoir
size_t size() const
size of reservoir
RNG & rng_
source of randomness
size_t count() const
number of items seen
size_t gap_
number of items to skip until next sample
const std::vector< Type > & samples() const
access to samples
size_t calc_next_gap() const
draw gap size from geometric distribution with p = size_ / count_
std::vector< Type > & samples_
reservoir
size_t count_
number of items seen
std::vector< Type > & samples_
reservoir
std::vector< Type > & samples_
reservoir
const std::vector< Type > & samples() const
access to samples