15 #ifndef THRILL_CORE_DUPLICATE_DETECTION_HEADER 16 #define THRILL_CORE_DUPLICATE_DETECTION_HEADER 48 static constexpr
bool debug =
false;
75 const std::vector<size_t>& hashes,
82 size_t prev_hash = size_t(-1);
84 for (
size_t i = 0, j = 0; i < num_workers; ++i) {
88 GolombBitStreamWriter golomb_writer(writers[i], golomb_param);
94 ; j < hashes.size() && hashes[j] < range_i.
end; ++j) {
96 if (hashes[j] == prev_hash)
98 delta_writer.
Put(hashes[j]);
99 prev_hash = hashes[j];
113 std::vector<bool>& non_duplicates,
114 size_t golomb_param) {
116 std::vector<data::CatStream::Reader> readers =
117 stream_pointer->GetReaders();
121 GolombBitStreamReader golomb_reader(reader, golomb_param);
123 golomb_reader,
size_t(-1) );
126 while (delta_reader.
HasNext()) {
128 size_t hash = delta_reader.
Next<
size_t>();
129 assert(hash < non_duplicates.size());
130 non_duplicates[
hash] =
true;
151 std::vector<size_t>& hashes,
157 size_t upper_bound_uniques = context.
net.
AllReduce(hashes.size());
162 double fpr_parameter = 8;
163 size_t golomb_param = (size_t)fpr_parameter;
164 size_t max_hash = upper_bound_uniques * fpr_parameter;
166 for (
size_t i = 0; i < hashes.size(); ++i) {
167 hashes[i] = hashes[i] % max_hash;
170 std::sort(hashes.begin(), hashes.end());
175 hashes, golomb_param,
181 std::vector<data::CatStream::Reader> readers =
182 golomb_data_stream->GetReaders();
184 std::vector<GolombBitStreamReader> g_readers;
185 std::vector<GolumbDeltaReader> delta_readers;
189 for (
auto& reader : readers) {
190 g_readers.emplace_back(reader, golomb_param);
191 delta_readers.emplace_back(
198 auto puller = make_multiway_merge_tree<size_t>(
199 delta_readers.begin(), delta_readers.end());
206 duplicates_stream->GetWriters();
208 std::vector<GolombBitStreamWriter> duplicates_gbsw;
209 std::vector<GolumbDeltaWriter> duplicates_dw;
213 for (
size_t i = 0; i < context.
num_workers(); ++i) {
214 duplicates_gbsw.emplace_back(duplicates_writers[i], golomb_param);
215 duplicates_dw.emplace_back(
216 duplicates_gbsw.back(),
223 if (puller.HasNext())
225 std::pair<size_t, size_t> this_item;
226 std::pair<size_t, size_t> next_item = puller.NextWithSource();
228 while (puller.HasNext())
230 this_item = next_item;
231 next_item = puller.NextWithSource();
233 if (this_item.first != next_item.first) {
235 sLOG <<
"!" << this_item.first <<
"->" << this_item.second;
236 duplicates_dw[this_item.second].Put(this_item.first);
240 sLOG <<
"=" << this_item.first <<
"-" << this_item.second;
243 while (puller.HasNext() &&
244 (next_item = puller.NextWithSource(),
245 next_item.first == this_item.first))
247 sLOG <<
"." << next_item.first <<
"-" << next_item.second;
252 if (this_item.first != next_item.first) {
254 sLOG <<
"!" << next_item.first <<
"->" << next_item.second;
255 duplicates_dw[next_item.second].Put(next_item.first);
260 duplicates_dw.clear();
261 duplicates_gbsw.clear();
262 duplicates_writers.
Close();
265 assert(non_duplicates.size() == 0);
266 non_duplicates.resize(max_hash);
268 duplicates_stream, non_duplicates, golomb_param);
277 #endif // !THRILL_CORE_DUPLICATE_DETECTION_HEADER net::FlowControlChannel & net
#define sLOG
Default logging method: output if the local debug variable is true.
size_t num_workers() const
Global number of workers in the system.
represents a 1 dimensional range (interval) [begin,end)
size_t FindNonDuplicates(std::vector< bool > &non_duplicates, std::vector< size_t > &hashes, Context &context, size_t dia_id)
Identifies all hashes which occur on only a single worker.
T TLX_ATTRIBUTE_WARN_UNUSED_RESULT AllReduce(const T &value, const BinarySumOp &sum_op=BinarySumOp())
Reduces a value of a serializable type T over all workers given a certain reduce function.
The Context of a job is a unique instance per worker which holds references to all underlying parts o...
void Close()
custom destructor to close writers in a cyclic fashion
An extra class derived from std::vector<> for delivery of the BlockWriters of a Stream.
data::CatStreamPtr GetNewCatStream(size_t dia_id)
static Range CalculateLocalRange(size_t global_size, size_t p, size_t i)
void Put(const Type &value)
static constexpr bool debug
BlockReader takes Block objects from BlockSource and allows reading of a) serializable Items or b) ar...
void WriteEncodedHashes(const data::CatStreamPtr &stream_pointer, const std::vector< size_t > &hashes, size_t golomb_param, size_t num_workers, size_t max_hash)
Sends all hashes in the range [max_hash / num_workers * p, max_hash / num_workers * (p + 1)) to worke...
core::GolombBitStreamReader< data::CatStream::Reader > GolombBitStreamReader
core::GolombBitStreamWriter< data::CatStream::Writer > GolombBitStreamWriter
void ReadEncodedHashesToVector(const data::CatStreamPtr &stream_pointer, std::vector< bool > &non_duplicates, size_t golomb_param)
Reads a Golomb-encoded bitset from a data stream and returns its contents in the form of a vector of has...
HashCrc32< T > hash
Select a hashing method.
Duplicate detection to identify all elements occurring only on one worker.