13 #ifndef THRILL_API_SAMPLE_HEADER 14 #define THRILL_API_SAMPLE_HEADER 49 template <
typename ValueType>
52 static constexpr
bool debug =
false;
58 template <
typename ParentDIA>
60 :
Super(parent.ctx(),
"Sample", { parent.id() }, { parent.node() }),
65 auto presample_fn = [
this](
const ValueType& input) {
68 auto lop_chain = parent.stack().push(presample_fn).fold();
69 parent.node()->AddChild(
this, lop_chain);
76 <<
"elements of which" <<
samples_.size()
77 <<
"were presampled, global sample size =" <<
sample_size_;
82 sLOG <<
"SampleNode::Execute (alone) => all" 96 sLOG <<
"SampleNode::Execute (underfull)" 104 seed = std::random_device { } ();
120 sLOG <<
"SampleNode::Execute" 124 <<
"communication time:" <<
comm_timer_.Microseconds() / 1000.0;
139 <<
samples_.size() <<
"pre-samples";
140 std::vector<ValueType> subsample;
146 <<
"ERROR: SAMPLE SIZE IS WRONG";
151 for (
const ValueType& v :
samples_) {
157 sLOG <<
"SampleNode::PushData finished; total local time excl PushData:" 159 <<
"ms, communication:" <<
comm_timer_.Microseconds() / 1000.0
172 return seed ^ (v + 0x9e3779b9 + (seed << 6) + (seed >> 2));
177 size_t range_begin,
size_t range_end,
178 size_t sample_size,
size_t seed) {
180 if (range_begin >= range_end)
return 0;
181 if (my_begin >= my_end)
return 0;
182 if (sample_size == 0)
return 0;
185 if (my_begin <= range_begin && range_end <= my_end) {
186 LOG <<
"my range [" << my_begin <<
", " << my_end
187 <<
") is contained in the currently considered range [" 188 << range_begin <<
", " << range_end <<
") and thus gets all " 189 << sample_size <<
" samples";
194 if ((range_begin <= my_begin && my_begin < range_end) ||
195 (range_begin < my_end && my_end <= range_end)) {
203 const size_t left_size = (range_end - range_begin) / 2,
204 right_size = (range_end - range_begin) - left_size;
205 const size_t left_samples =
hyp_(left_size, right_size, sample_size);
207 LOG <<
"my range [" << my_begin <<
", " << my_end
208 <<
") overlaps the currently considered range [" 209 << range_begin <<
", " << range_end <<
"), splitting: " 210 <<
"left range [" << range_begin <<
", " << range_begin + left_size
211 <<
") gets " << left_samples <<
" samples, right range [" 212 << range_begin + left_size <<
", " << range_end <<
") the remaining " 213 << sample_size - left_samples <<
" for a total of " << sample_size
218 my_begin, my_end, range_begin, range_begin + left_size,
221 my_begin, my_end, range_begin + left_size, range_end,
222 sample_size - left_samples, seed);
223 return left_result + right_result;
237 std::mt19937_64
rng_ { std::random_device { } () };
246 template <
typename ValueType,
typename Stack>
252 auto node = tlx::make_counting<SampleNode>(
261 #endif // !THRILL_API_SAMPLE_HEADER std::mt19937_64 rng_
Random generator for reservoir sampler.
common::StatsTimerStopped local_timer_
Timers for local work and communication.
net::FlowControlChannel & net
#define sLOG
Default logging method: output if the local debug variable is true.
DIA is the interface between the user and the Thrill framework.
void PushItem(const ValueType &item) const
Method for derived classes to Push a single item to all children.
void seed(size_t seed_val)
static constexpr bool debug
void add(const Type &item)
visit item, maybe add it to the sample.
SampleNode(const ParentDIA &parent, size_t sample_size)
size_t num_workers() const
Global number of workers in the system.
#define sLOGC(cond)
Explicitly specify the condition for logging.
size_t local_size_
local input size, number of samples to draw globally, and locally
void Execute() final
Virtual execution method. Triggers actual computation in sub-classes.
size_t count() const
number of items seen
size_t calc_local_samples(size_t my_begin, size_t my_end, size_t range_begin, size_t range_end, size_t sample_size, size_t seed)
auto Sample(size_t sample_size) const
Select up to sample_size items uniformly at random and return a new DIA<T>.
void PushData(bool consume) final
Virtual method for pushing data. Triggers actual pushing in sub-classes.
common::ReservoirSamplingFast< ValueType, decltype(rng_)> sampler_
Reservoir sampler for pre-op.
common::hypergeometric hyp_
Hypergeometric distribution to calculate local sample sizes.
size_t my_rank() const
Global rank of this worker among all other workers in the system.
T TLX_ATTRIBUTE_WARN_UNUSED_RESULT ExPrefixSumTotal(T &value, const BinarySumOp &sum_op=BinarySumOp(), const T &initial=T())
Calculates the exclusive prefix sum over all workers, and delivers the total sum as well...
void vector_free(std::vector< Type > &v)
const bool parent_stack_empty_
Whether the parent stack is empty.
A DOpNode is a typed node representing distributed operations in Thrill.
static uint_pair min()
Returns a uint_pair instance containing the smallest possible value.
A DIANode which performs sampling without replacement.
void Dispose() final
Virtual clear method. Triggers actual disposing in sub-classes.
size_t hash_combine(size_t seed, size_t v)
std::vector< ValueType > samples_
local samples
common::StatsTimerStopped comm_timer_
T TLX_ATTRIBUTE_WARN_UNUSED_RESULT Broadcast(const T &value, size_t origin=0)
Broadcasts a value of a serializable type T from the master (the worker with id 0) to all other workers.
#define LOG
Default logging method: output if the local debug variable is true.
Context & context_
associated Context
#define LOGC(cond)
Explicitly specify the condition for logging.