15 #ifndef THRILL_CORE_LOCATION_DETECTION_HEADER 16 #define THRILL_CORE_LOCATION_DETECTION_HEADER 36 #include <unordered_map> 47 template <
typename HashCount>
58 void Emit(
const size_t& ,
const HashCount& p) {
65 std::vector<HashCount>&
vec_;
69 template <
typename HashCount>
73 static constexpr
bool debug =
false;
94 : bit_reader_(bit_reader), delta_reader_(delta_reader) { }
97 return bit_reader_.HasNext();
100 template <
typename Type>
103 hc.hash = delta_reader_.Next<
size_t>();
104 hc.ReadBits(bit_reader_);
114 size_t operator () (
const HashCount& hc)
const {
return hc.hash; }
119 const std::vector<HashCount>& hash_occ,
126 for (
size_t i = 0, j = 0; i < num_workers; ++i) {
130 GolombBitStreamWriter golomb_writer(writers[i], golomb_param);
135 while (j < hash_occ.size() && hash_occ[j].hash < range_i.
end)
138 HashCount total_hash = hash_occ[j++];
139 while (j < hash_occ.size() &&
140 hash_occ[j].hash == total_hash.hash)
142 total_hash += hash_occ[j++];
147 delta_writer.
Put(total_hash.hash);
148 total_hash.WriteBits(golomb_writer);
158 ReduceConfig::table_impl_,
159 HashCount,
typename HashCount::HashType, HashCount,
160 ExtractHash, std::plus<HashCount>,
Emitter,
169 table_(ctx, dia_id, ExtractHash(),
std::plus<HashCount>(),
171 sLOG <<
"creating LocationDetection";
180 table_.Initialize(limit_memory_bytes);
193 size_t Flush(std::unordered_map<size_t, size_t>& target_processors) {
195 size_t num_items = table_.num_items();
196 if (table_.has_spilled_data_on_partition(0)) {
197 num_items += table_.partition_files()[0].num_items();
201 size_t upper_bound_uniques = context_.net.AllReduce(num_items);
202 double fpr_parameter = 8;
203 size_t golomb_param = (size_t)fpr_parameter;
204 size_t max_hash = golomb_param * upper_bound_uniques;
206 emit_.SetModulo(max_hash);
207 hash_occ_.reserve(num_items);
210 if (table_.has_spilled_data_on_partition(0)) {
212 table_.partition_files()[0].GetReader(
true);
215 emit_.Emit(0, reader.
Next<HashCount>());
219 std::sort(hash_occ_.begin(), hash_occ_.end());
222 context_.GetNewCatStream(dia_id_);
224 WriteOccurenceCounts(golomb_data_stream,
225 hash_occ_, golomb_param,
226 context_.num_workers(),
233 std::vector<data::CatStream::Reader> hash_readers =
234 golomb_data_stream->GetReaders();
236 std::vector<GolombBitStreamReader> golomb_readers;
237 std::vector<GolumbDeltaReader> delta_readers;
238 std::vector<GolombPairReader> pair_readers;
240 golomb_readers.reserve(context_.num_workers());
241 delta_readers.reserve(context_.num_workers());
242 pair_readers.reserve(context_.num_workers());
244 for (
auto& reader : hash_readers) {
245 golomb_readers.emplace_back(reader, golomb_param);
246 delta_readers.emplace_back(
247 golomb_readers.back(),
249 pair_readers.emplace_back(
250 golomb_readers.back(), delta_readers.back());
258 auto puller = make_multiway_merge_tree<HashCount>(
259 pair_readers.begin(), pair_readers.end());
266 location_stream->GetWriters();
268 std::vector<GolombBitStreamWriter> location_gbsw;
269 std::vector<GolumbDeltaWriter> location_dw;
270 location_gbsw.reserve(context_.num_workers());
271 location_dw.reserve(context_.num_workers());
273 for (
size_t i = 0; i < context_.num_workers(); ++i) {
274 location_gbsw.emplace_back(location_writers[i], golomb_param);
275 location_dw.emplace_back(
276 location_gbsw.back(),
280 std::pair<HashCount, size_t> next;
281 std::vector<size_t> workers;
283 bool not_finished = puller.HasNext();
286 next = puller.NextWithSource();
288 while (not_finished) {
290 HashCount sum = next.first;
291 workers.push_back(next.second);
293 size_t max_worker = next.second;
297 while ((not_finished = puller.HasNext()) &&
298 (next = puller.NextWithSource(),
299 next.first.hash == sum.hash))
305 if (next.first.count > max_count) {
306 max_count = next.first.count;
307 max_worker = next.second;
310 workers.push_back(next.second);
315 if (sum.NeedBroadcast()) {
316 for (
const size_t& w : workers) {
317 location_dw[w].Put(sum.hash);
318 location_gbsw[w].PutBits(max_worker, worker_bitsize);
319 LOG <<
"Put: " << sum.hash <<
" @ " << max_worker
329 location_gbsw.clear();
330 location_writers.
Close();
334 std::vector<data::CatStream::Reader> location_readers =
335 location_stream->GetReaders();
337 target_processors.reserve(num_items);
341 GolombBitStreamReader golomb_reader(reader, golomb_param);
343 golomb_reader,
size_t(-1) );
346 while (delta_reader.
HasNext()) {
348 size_t hash = delta_reader.
Next<
size_t>();
349 size_t worker = golomb_reader.
GetBits(worker_bitsize);
351 LOG <<
"Hash " << hash <<
" on worker " << worker;
352 target_processors[
hash] = worker;
380 #endif // !THRILL_CORE_LOCATION_DETECTION_HEADER #define sLOG
Default logging method: output if the local debug variable is true.
static uint_pair max()
return a uint_pair instance containing the largest value possible
std::vector< HashCount > & vec_
Type selection via ReduceTableImpl enum.
GolombPairReader(GolombBitStreamReader &bit_reader, GolumbDeltaReader &delta_reader)
GolumbDeltaReader & delta_reader_
TLX_ATTRIBUTE_ALWAYS_INLINE T Next()
Next() reads a complete item T.
void Emit(const size_t &, const HashCount &p)
represents a one-dimensional range (interval) [begin,end)
size_t GetBits(unsigned bits)
Get bits at the cursor.
Emitter for a ReduceTable, which emits all of its data into a vector of hash-counter-pairs.
The Context of a job is a unique instance per worker which holds references to all underlying parts o...
std::vector< HashCount > hash_occ_
Target vector for vector emitter.
void Close()
custom destructor to close writers in a cyclic fashion
LocationDetection(Context &ctx, size_t dia_id, const ReduceConfig &config=ReduceConfig())
ToVectorEmitter(std::vector< HashCount > &vec)
Context & context_
Thrill context.
An extra class derived from std::vector<> for delivery of the BlockWriters of a Stream.
static Range CalculateLocalRange(size_t global_size, size_t p, size_t i)
void Put(const Type &value)
void Insert(const HashCount &item)
Inserts a HashCount item into the table.
void Initialize(size_t limit_memory_bytes)
Initializes the table to the memory limit size.
BlockReader takes Block objects from BlockSource and allows reading of a) serializable Items or b) ar...
size_t Flush(std::unordered_map< size_t, size_t > &target_processors)
Flushes the table and detects the most common location for each element.
Configuration class to define operational parameters of reduce hash tables and reduce phases...
static constexpr bool debug
ReduceConfig config_
Reduce configuration used.
void SetModulo(size_t modulo)
void vector_free(std::vector< Type > &v)
Emitter emit_
Emitter to vector.
static unsigned integer_log2_ceil(int i)
calculate the log2 ceiling of an integer type
typename thrill::api::JoinNode::HashCount ::CounterType CounterType
Table table_
Reduce table used to count keys.
TLX_ATTRIBUTE_ALWAYS_INLINE bool HasNext()
HasNext() returns true if at least one more item is available.
HashCrc32< T > hash
Select a hashing method.
#define LOG
Default logging method: output if the local debug variable is true.
typename ReduceTableSelect< ReduceConfig::table_impl_, thrill::api::JoinNode::HashCount, typename thrill::api::JoinNode::HashCount ::HashType, thrill::api::JoinNode::HashCount, ExtractHash, std::plus< thrill::api::JoinNode::HashCount >, Emitter, false, ReduceConfig >::type Table
void WriteOccurenceCounts(const data::CatStreamPtr &stream_pointer, const std::vector< HashCount > &hash_occ, size_t golomb_param, size_t num_workers, size_t max_hash)
GolombBitStreamReader & bit_reader_