13 #ifndef THRILL_API_INNER_JOIN_HEADER 14 #define THRILL_API_INNER_JOIN_HEADER 57 template <
typename ValueType,
typename FirstDIA,
typename SecondDIA,
58 typename KeyExtractor1,
typename KeyExtractor2,
59 typename JoinFunction,
typename HashFunction,
60 bool UseLocationDetection>
64 static constexpr
bool debug =
false;
73 using Key =
typename common::FunctionTraits<KeyExtractor1>::result_type;
90 assert(hash == b.
hash);
99 assert(hash == b.
hash);
111 return dia_mask == 3;
115 template <
typename BitReader>
117 count = reader.GetBits(counter_bits_);
118 dia_mask = reader.GetBits(2);
122 template <
typename BitWriter>
124 writer.PutBits(count, counter_bits_);
125 writer.PutBits(dia_mask, 2);
133 JoinNode(
const FirstDIA& parent1,
const SecondDIA& parent2,
134 const KeyExtractor1& key_extractor1,
135 const KeyExtractor2& key_extractor2,
136 const JoinFunction& join_function,
137 const HashFunction& hash_function)
138 :
Super(parent1.ctx(),
"Join",
139 { parent1.id(), parent2.id() },
140 { parent1.node(), parent2.node() }),
153 auto lop_chain1 = parent1.stack().push(pre_op_fn1).fold();
154 auto lop_chain2 = parent2.stack().push(pre_op_fn2).fold();
155 parent1.node()->AddChild(
this, lop_chain1, 0);
156 parent2.node()->AddChild(
this, lop_chain2, 1);
161 if (UseLocationDetection) {
162 std::unordered_map<size_t, size_t> target_processors;
167 while (file1reader.HasNext()) {
169 auto target_processor =
170 target_processors.find(
172 if (target_processor != target_processors.end()) {
178 while (file2reader.HasNext()) {
180 auto target_processor =
181 target_processors.find(
183 if (target_processor != target_processors.end()) {
195 template <
typename ElementType,
typename CompareFunction>
197 std::vector<data::File::Reader>& seq,
198 CompareFunction compare_function,
bool consume) {
200 size_t merge_degree, prefetch;
201 std::tie(merge_degree, prefetch) =
204 seq.reserve(files.size());
205 for (
size_t t = 0; t < files.size(); ++t)
206 seq.emplace_back(files[t].GetReader(consume, 0));
209 return core::make_buffered_multiway_merge_tree<ElementType>(
210 seq.begin(), seq.end(), compare_function);
215 auto compare_function_1 =
220 auto compare_function_2 =
230 MergeFiles<InputTypeFirst>(
files1_, compare_function_1);
231 MergeFiles<InputTypeSecond>(
files2_, compare_function_2);
233 std::vector<data::File::Reader> seq1;
234 std::vector<data::File::Reader> seq2;
237 auto puller1 = MakePuller<InputTypeFirst>(
238 files1_, seq1, compare_function_1, consume);
239 auto puller2 = MakePuller<InputTypeSecond>(
240 files2_, seq2, compare_function_2, consume);
242 bool puller1_done =
false;
243 if (!puller1.HasNext())
246 bool puller2_done =
false;
247 if (!puller2.HasNext())
252 std::vector<InputTypeFirst> equal_keys1;
253 std::vector<InputTypeSecond> equal_keys2;
255 while (!puller1_done && !puller2_done) {
259 if (!puller1.Update()) {
266 if (!puller2.Update()) {
272 bool external1 =
false;
273 bool external2 =
false;
276 std::tie(puller1_done, external1) =
280 std::tie(puller2_done, external2) =
322 if (UseLocationDetection) {
323 pre_writer1_.
Put(input);
333 if (UseLocationDetection) {
334 pre_writer2_.
Put(input);
359 template <
typename ItemType>
379 template <
typename ItemType,
typename KeyExtractor,
typename MergeTree>
381 std::vector<ItemType>& vec, MergeTree& puller,
384 vec.push_back(puller.Top());
385 Key key = key_extractor(puller.Top());
387 size_t capacity = JoinCapacity<ItemType>();
389 if (!puller.Update())
390 return std::make_pair(
true,
false);
392 while (key_extractor(puller.Top()) == key) {
395 vec.push_back(puller.Top());
400 for (
const ItemType& item : vec) {
403 writer.
Put(puller.Top());
411 if (!puller.Update())
412 return std::make_pair(
true,
false);
415 return std::make_pair(
false,
false);
434 template <
typename KeyExtractor,
typename MergeTree>
436 MergeTree& puller,
const KeyExtractor& key_extractor,
438 if (!puller.Update()) {
439 return std::make_pair(
true,
true);
442 while (key_extractor(puller.Top()) == key) {
443 writer.
Put(puller.Top());
444 if (!puller.Update())
445 return std::make_pair(
true,
true);
448 return std::make_pair(
false,
true);
456 LOG << *
this <<
" running StartPreOp parent_index=" << parent_index;
457 if (!location_detection_initialized_ && UseLocationDetection) {
459 location_detection_initialized_ =
true;
464 if (parent_index == 0) {
467 if (parent_index == 1) {
473 LOG << *
this <<
" running StopPreOp parent_index=" << parent_index;
475 if (parent_index == 0) {
476 pre_writer1_.
Close();
478 if (parent_index == 1) {
479 pre_writer2_.
Close();
494 template <
typename ItemType,
typename KeyExtractor>
497 std::deque<data::File>& files,
const KeyExtractor& key_extractor) {
499 std::vector<ItemType> vec;
500 vec.reserve(capacity);
503 if (vec.size() < capacity) {
504 vec.push_back(reader.template Next<ItemType>());
518 template <
typename ItemType,
typename CompareFunction>
520 CompareFunction compare_function) {
522 size_t merge_degree, prefetch;
525 while (std::tie(merge_degree, prefetch) =
527 files.size() > merge_degree)
529 sLOG1 <<
"Partial multi-way-merge of" 530 << merge_degree <<
"files with prefetch" << prefetch;
533 std::vector<data::File::ConsumeReader> seq;
534 seq.reserve(merge_degree);
536 for (
size_t t = 0; t < merge_degree; ++t)
537 seq.emplace_back(files[t].GetConsumeReader( 0));
541 auto puller = core::make_multiway_merge_tree<ItemType>(
542 seq.begin(), seq.end(), compare_function);
546 auto writer = files.back().GetWriter();
548 while (puller.HasNext()) {
549 writer.Put(puller.Next());
557 files.erase(files.begin(), files.begin() + merge_degree);
570 const std::vector<InputTypeFirst>& vec1,
bool external1,
571 const std::vector<InputTypeSecond>& vec2,
bool external2) {
573 if (!external1 && !external2) {
581 else if (external1 && !external2) {
582 LOG1 <<
"Thrill: Warning: Too many equal keys for main memory " 589 for (
auto const& join2 : vec2) {
595 else if (!external1 && external2) {
596 LOG1 <<
"Thrill: Warning: Too many equal keys for main memory " 609 else if (external1 && external2) {
610 LOG1 <<
"Thrill: Warning: Too many equal keys for main memory " 611 <<
"in both DIAs. This is very slow.";
613 size_t capacity = JoinCapacity<InputTypeFirst>();
615 std::vector<InputTypeFirst> temp_vec;
616 temp_vec.reserve(capacity);
623 for (
size_t i = 0; i < capacity && reader1.
HasNext() &&
625 temp_vec.push_back(reader1.template Next<InputTypeFirst>());
641 join_file2_->Clear();
648 template <
typename ItemType,
typename KeyExtractor>
650 std::vector<ItemType>& vec, std::deque<data::File>& files,
651 const KeyExtractor& key_extractor) {
656 std::sort(vec.begin(), vec.end(),
657 [&key_extractor](
const ItemType& i1,
const ItemType& i2) {
658 return key_extractor(i1) < key_extractor(i2);
662 auto writer = files.back().GetWriter();
663 for (
const ItemType& elem : vec) {
702 bool LocationDetectionValue,
705 typename KeyExtractor1,
706 typename KeyExtractor2,
707 typename JoinFunction,
708 typename HashFunction =
709 std::hash<typename common::FunctionTraits<KeyExtractor1>::result_type> >
712 const FirstDIA& first_dia,
const SecondDIA& second_dia,
713 const KeyExtractor1& key_extractor1,
const KeyExtractor2& key_extractor2,
714 const JoinFunction& join_function,
715 const HashFunction& hash_function = HashFunction()) {
717 assert(first_dia.IsValid());
718 assert(second_dia.IsValid());
722 typename FirstDIA::ValueType,
723 typename common::FunctionTraits<KeyExtractor1>::template arg<0>
725 "Key Extractor 1 has the wrong input type");
729 typename SecondDIA::ValueType,
730 typename common::FunctionTraits<KeyExtractor2>::template arg<0>
732 "Key Extractor 2 has the wrong input type");
736 typename common::FunctionTraits<KeyExtractor1>::result_type,
737 typename common::FunctionTraits<KeyExtractor2>::result_type
739 "Keys have different types");
743 typename FirstDIA::ValueType,
744 typename common::FunctionTraits<JoinFunction>::template arg<0>
746 "Join Function has wrong input type in argument 0");
750 typename SecondDIA::ValueType,
751 typename common::FunctionTraits<JoinFunction>::template arg<1>
753 "Join Function has wrong input type in argument 1");
756 =
typename common::FunctionTraits<JoinFunction>::result_type;
759 JoinResult, FirstDIA, SecondDIA, KeyExtractor1, KeyExtractor2,
760 JoinFunction, HashFunction, LocationDetectionValue>;
762 auto node = tlx::make_counting<JoinNode>(
763 first_dia, second_dia, key_extractor1, key_extractor2, join_function,
801 typename KeyExtractor1,
802 typename KeyExtractor2,
803 typename JoinFunction,
804 typename HashFunction =
805 std::hash<typename common::FunctionTraits<KeyExtractor1>::result_type> >
807 const FirstDIA& first_dia,
const SecondDIA& second_dia,
808 const KeyExtractor1& key_extractor1,
const KeyExtractor2& key_extractor2,
809 const JoinFunction& join_function,
810 const HashFunction& hash_function = HashFunction()) {
814 first_dia, second_dia, key_extractor1, key_extractor2,
815 join_function, hash_function);
825 #endif // !THRILL_API_INNER_JOIN_HEADER std::pair< bool, bool > AddEqualKeysToFile(MergeTree &puller, const KeyExtractor &key_extractor, data::File::Writer &writer, const Key &key)
Adds all elements from the merge tree to a data::File, potentially in external memory; afterwards sets the first_element pointer to the first element with a different key.
void StartPrefetch(std::vector< Reader > &readers, size_t prefetch_size)
Take a vector of Readers and prefetch equally from them.
bool HasNext()
HasNext() returns true if at least one more item is available.
DIAMemUse PushDataMemUse() final
Amount of RAM used by PushData()
DIAMemUse ExecuteMemUse() final
Amount of RAM used by Execute()
data::MixStream::Writers hash_writers1_
data::File pre_file1_
location detection and associated files
DIA is the interface between the user and the Thrill framework.
std::vector< size_t > parent_ids() const
Returns the parents of this DIABase.
void PushItem(const ValueType &item) const
Method for derived classes to Push a single item to all children.
Description of the amount of RAM the internal data structures of a DIANode require.
void ReadBits(BitReader &reader)
Read count and dia_mask from BitReader.
JoinNode(const FirstDIA &parent1, const SecondDIA &parent2, const KeyExtractor1 &key_extractor1, const KeyExtractor2 &key_extractor2, const JoinFunction &join_function, const HashFunction &hash_function)
Constructor for a JoinNode.
data::File::Writer pre_writer1_
data::FilePtr join_file2_
void AdviseFree(size_t size)
const struct LocationDetectionFlag< true > LocationDetectionTag
global const LocationDetectionFlag instance
JoinFunction join_function_
data::File::Writer pre_writer2_
HashCount operator+(const HashCount &b) const
void Execute() final
Virtual execution method. Triggers actual computation in sub-classes.
std::pair< size_t, size_t > MaxMergeDegreePrefetch(size_t num_files)
size_t num_workers() const
Global number of workers in the system.
A File is an ordered sequence of Block objects for storing items.
static constexpr bool debug
BlockWriter contains a temporary Block object into which a) any serializable item can be stored or b)...
bool memory_exceeded
memory limit exceeded indicator
auto InnerJoin(const LocationDetectionFlag< LocationDetectionValue > &, const FirstDIA &first_dia, const SecondDIA &second_dia, const KeyExtractor1 &key_extractor1, const KeyExtractor2 &key_extractor2, const JoinFunction &join_function, const HashFunction &hash_function=HashFunction())
Performs an inner join between this DIA and the DIA given in the first parameter. ...
core::LocationDetection< HashCount > location_detection_
void ReceiveItems(size_t capacity, data::MixStream::MixReader &reader, std::deque< data::File > &files, const KeyExtractor &key_extractor)
Receive all elements from a stream and write them to files sorted by key.
void PreOp2(const InputTypeSecond &input)
void Dispose() final
Virtual clear method. Triggers actual disposing in sub-classes.
void PushData(bool consume) final
Virtual method for pushing data. Triggers actual pushing in sub-classes.
KeyExtractor2 key_extractor2_
typename FirstDIA::ValueType InputTypeFirst
void JoinAllElements(const std::vector< InputTypeFirst > &vec1, bool external1, const std::vector< InputTypeSecond > &vec2, bool external2)
Joins all elements in cartesian product of both vectors.
Reader to retrieve items in unordered sequence from a MixBlockQueue.
ConsumeReader GetConsumeReader(size_t prefetch_size=File::default_prefetch_size_)
Get consuming BlockReader for beginning of File.
DIAMemUse PreOpMemUse() final
Amount of RAM used by PreOp after StartPreOp()
bool NeedBroadcast() const
void MainOp()
Receive elements from other workers, create pre-sorted files.
data::MixStreamPtr hash_stream2_
BlockReader takes Block objects from BlockSource and allows reading of a) serializable Items or b) ar...
StreamData::Writers Writers
KeyExtractor1 key_extractor1_
user-defined functions
std::deque< data::File > files2_
tlx::counting_ptr< file > file_ptr
A reference counting pointer for file.
auto MakePuller(std::deque< data::File > &files, std::vector< data::File::Reader > &seq, CompareFunction compare_function, bool consume)
data::FilePtr GetFilePtr(size_t dia_id)
void StartPreOp(size_t parent_index) final
Virtual method for preparing start of PushData.
static IntegerType AddTruncToType(const IntegerType &a, const IntegerType &b)
data::File GetFile(size_t dia_id)
Returns a new File object containing a sequence of local Blocks.
data::MixStreamPtr GetNewMixStream(size_t dia_id)
const size_t & dia_id() const
return unique id of DIANode subclass as stored by StatsNode
void vector_free(std::vector< Type > &v)
A DOpNode is a typed node representing a distributed operation in Thrill.
bool location_detection_initialized_
typename SecondDIA::ValueType InputTypeSecond
hash counter used by LocationDetection
bool operator<(const HashCount &b) const
HashCount & operator+=(const HashCount &b)
data::MixStreamPtr hash_stream1_
data streams for inter-worker communication of DIA elements
void WriteBits(BitWriter &writer) const
Write count and dia_mask to BitWriter.
TLX_ATTRIBUTE_ALWAYS_INLINE BlockWriter & Put(const T &x)
Put appends a complete item, or fails with a FullException.
std::pair< bool, bool > AddEqualKeysToVec(std::vector< ItemType > &vec, MergeTree &puller, const KeyExtractor &key_extractor, data::FilePtr &file_ptr)
Adds all elements from merge tree to a vector, afterwards sets the first_element pointer to the first...
typename common::FunctionTraits< KeyExtractor1 >::result_type Key
Key type of the join; must be equal to the key type of the other key extractor.
data::FilePtr join_file1_
void StopPreOp(size_t parent_index) final
Virtual method for preparing end of PushData.
void Close()
Explicitly close the writer.
static constexpr size_t counter_bits_
TLX_ATTRIBUTE_ALWAYS_INLINE bool HasNext()
HasNext() returns true if at least one more item is available.
tag structure for GroupByKey() and InnerJoin()
void SortAndWriteToFile(std::vector< ItemType > &vec, std::deque< data::File > &files, const KeyExtractor &key_extractor)
Sorts all elements in a vector and writes them to a file.
data::MixStream::Writers hash_writers2_
std::deque< data::File > files1_
files for sorted datasets
void PreOp1(const InputTypeFirst &input)
#define LOG
Default logging method: output if the local debug variable is true.
HashFunction hash_function_
Writer GetWriter(size_t block_size=default_block_size)
Get BlockWriter.
Context & context_
associated Context
data::BlockPool & block_pool()
the block manager keeps all data blocks moving through the system.
void MergeFiles(std::deque< data::File > &files, CompareFunction compare_function)
Merge files when there are too many for the merge tree to handle.
Performs an inner join between two DIAs.