Thrill  0.1
JoinNode< ValueType, FirstDIA, SecondDIA, KeyExtractor1, KeyExtractor2, JoinFunction, HashFunction, UseLocationDetection > Class Template Reference final

Detailed Description

template<typename ValueType, typename FirstDIA, typename SecondDIA, typename KeyExtractor1, typename KeyExtractor2, typename JoinFunction, typename HashFunction, bool UseLocationDetection>
class thrill::api::JoinNode< ValueType, FirstDIA, SecondDIA, KeyExtractor1, KeyExtractor2, JoinFunction, HashFunction, UseLocationDetection >

Performs an inner join between two DIAs.

The key of each DIA element is extracted with a key extractor function. All pairs of elements from the two DIAs whose keys are equal are then combined with the join function.
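
The semantics described above can be illustrated on plain in-memory vectors. The following is only a sketch under stated assumptions: `InnerJoinSketch` is a hypothetical helper, not part of Thrill, and it uses an in-memory hash index for brevity, whereas the member list of this class suggests that JoinNode itself works on key-sorted files and a merge tree.

```cpp
#include <type_traits>
#include <unordered_map>
#include <utility>
#include <vector>

// Hypothetical helper (not Thrill API): inner join of two in-memory vectors.
// Every pair (a, b) with key_extractor1(a) == key_extractor2(b) is passed to
// join_function, and the results are collected in a vector.
template <typename A, typename B, typename KeyExtractor1,
          typename KeyExtractor2, typename JoinFunction>
auto InnerJoinSketch(const std::vector<A>& first, const std::vector<B>& second,
                     KeyExtractor1 key_extractor1, KeyExtractor2 key_extractor2,
                     JoinFunction join_function) {
    using Key = std::decay_t<decltype(key_extractor1(std::declval<const A&>()))>;
    using Out = std::decay_t<decltype(join_function(
        std::declval<const A&>(), std::declval<const B&>()))>;

    // Index the second input by key (assumption: Key is hashable).
    std::unordered_multimap<Key, const B*> index;
    for (const B& b : second) index.emplace(key_extractor2(b), &b);

    // Probe the index with every element of the first input.
    std::vector<Out> result;
    for (const A& a : first) {
        auto range = index.equal_range(key_extractor1(a));
        for (auto it = range.first; it != range.second; ++it)
            result.push_back(join_function(a, *it->second));
    }
    return result;
}
```

Elements whose key appears in only one input produce no output, which is what distinguishes an inner join from an outer join. The order of the results is unspecified in this sketch.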

Template Parameters
KeyExtractor1  Type of the key_extractor1 function. This is a function from ValueType to the key type.
KeyExtractor2  Type of the key_extractor2 function. This is a function from SecondDIA::ValueType to the key type.
JoinFunction  Type of the join_function. This is a function from ValueType and SecondDIA::ValueType to the type of the output DIA.
Parameters
SecondDIA  Other DIA joined with this DIA.
key_extractor1  Key extractor for the first DIA.
key_extractor2  Key extractor for the second DIA.
join_function  Join function applied to all pairs with equal keys.

Definition at line 61 of file inner_join.hpp.


#include <inner_join.hpp>

Classes

class  HashCount
 Hash counter used by LocationDetection.
 

Public Member Functions

 JoinNode (const FirstDIA &parent1, const SecondDIA &parent2, const KeyExtractor1 &key_extractor1, const KeyExtractor2 &key_extractor2, const JoinFunction &join_function, const HashFunction &hash_function)
 Constructor for a JoinNode.

void Dispose () final
 Virtual clear method. Triggers actual disposing in sub-classes.

void Execute () final
 Virtual execution method. Triggers actual computation in sub-classes.

template<typename ElementType , typename CompareFunction >
auto MakePuller (std::deque< data::File > &files, std::vector< data::File::Reader > &seq, CompareFunction compare_function, bool consume)

void PushData (bool consume) final
 Virtual method for pushing data. Triggers actual pushing in sub-classes.
 
- Public Member Functions inherited from DOpNode< ValueType >
 DOpNode (Context &ctx, const char *label, const std::initializer_list< size_t > &parent_ids, const std::initializer_list< DIABasePtr > &parents)
 Constructor for a DOpNode, which sets references to the parent nodes.

 DOpNode (Context &ctx, const char *label, std::vector< size_t > &&parent_ids, std::vector< DIABasePtr > &&parents)
 Constructor for a DOpNode, which sets references to the parent nodes.

- Public Member Functions inherited from DIANode< ValueType >
 DIANode (Context &ctx, const char *label, const std::initializer_list< size_t > &parent_ids, const std::initializer_list< DIABasePtr > &parents)
 Constructor for a DIANode, which sets references to the parent nodes.

 DIANode (Context &ctx, const char *label, std::vector< size_t > &&parent_ids, std::vector< DIABasePtr > &&parents)
 Constructor for a DIANode, which sets references to the parent nodes.

virtual void AddChild (DIABase *node, const Callback &callback=Callback(), size_t parent_index=0)
 Enables children to push their "folded" function chains to their parent.

std::vector< DIABase * > children () const override
 Returns the children of this DIABase.

void PushFile (data::File &file, bool consume) const

void PushItem (const ValueType &item) const
 Method for derived classes to push a single item to all children.

void RemoveAllChildren () override

void RemoveChild (DIABase *node) override

void RunPushData () override
 
- Public Member Functions inherited from DIABase
 DIABase (Context &ctx, const char *label, const std::initializer_list< size_t > &parent_ids, const std::initializer_list< DIABasePtr > &parents)
 The constructor for a DIABase.

 DIABase (Context &ctx, const char *label, std::vector< size_t > &&parent_ids, std::vector< DIABasePtr > &&parents)
 The constructor for a DIABase.

 DIABase (const DIABase &)=delete
 non-copyable: delete copy-constructor

 DIABase (DIABase &&)=default
 move-constructor: default

virtual ~DIABase ()
 Virtual destructor for a DIABase.

virtual size_t consume_counter () const
 Returns consume_counter_.

Context & context ()
 Returns the api::Context of this DIABase.

virtual void DecConsumeCounter (size_t counter)

const size_t & dia_id () const
 Returns the unique id of the DIANode subclass as stored by StatsNode.

virtual bool ForwardDataOnly () const

virtual void IncConsumeCounter (size_t counter)

const char * label () const
 Returns the label() of the DIANode subclass as stored by StatsNode.

mem::Manager & mem_manager ()
 Returns the Context's memory manager.

DIABase & operator= (const DIABase &)=delete
 non-copyable: delete assignment operator

DIABase & operator= (DIABase &&)=default
 move-assignment operator: default

std::vector< size_t > parent_ids () const
 Returns the ids of the parents of this DIABase.

const std::vector< DIABasePtr > & parents () const
 Returns the parents of this DIABase.

void RemoveParent (DIABase *p)
 Remove a parent.

virtual bool RequireParentPushData (size_t) const

void RunScope ()

void set_mem_limit (const DIAMemUse &mem_limit)

void set_state (const DIAState &state)

virtual void SetConsumeCounter (size_t counter)

DIAState state () const

virtual bool OnPreOpFile (const data::File &, size_t)
 
- Public Member Functions inherited from ReferenceCounter
 ReferenceCounter () noexcept
 New objects have zero reference count.

 ReferenceCounter (const ReferenceCounter &) noexcept
 Copying still creates a new object with zero reference count.

 ~ReferenceCounter ()

bool dec_reference () const noexcept
 Call whenever resetting (i.e. ...

void inc_reference () const noexcept
 Call whenever setting a pointer to the object.

ReferenceCounter & operator= (const ReferenceCounter &) noexcept
 Assignment operator, leaves pointers unchanged.

size_t reference_count () const noexcept
 Returns the number of references to this object (for debugging).

bool unique () const noexcept
 Tests if the ReferenceCounter is referenced by only one CountingPtr.
 

Private Types

using InputTypeFirst = typename FirstDIA::ValueType
 
using InputTypeSecond = typename SecondDIA::ValueType
 
using Key = typename common::FunctionTraits< KeyExtractor1 >::result_type
 Key type of the join. Must be equal to the result type of the other key extractor.
 
using Super = DOpNode< ValueType >
 

Private Member Functions

template<typename KeyExtractor , typename MergeTree >
std::pair< bool, bool > AddEqualKeysToFile (MergeTree &puller, const KeyExtractor &key_extractor, data::File::Writer &writer, const Key &key)
 Adds all elements with the target key from the merge tree to a data::File, potentially in external memory; afterwards sets the first_element pointer to the first element with a different key.

template<typename ItemType , typename KeyExtractor , typename MergeTree >
std::pair< bool, bool > AddEqualKeysToVec (std::vector< ItemType > &vec, MergeTree &puller, const KeyExtractor &key_extractor, data::FilePtr &file_ptr)
 Adds all elements with equal keys from the merge tree to a vector; afterwards sets the first_element pointer to the first element with a different key.

DIAMemUse ExecuteMemUse () final
 Amount of RAM used by Execute().

void JoinAllElements (const std::vector< InputTypeFirst > &vec1, bool external1, const std::vector< InputTypeSecond > &vec2, bool external2)
 Joins all elements in the Cartesian product of both vectors.

template<typename ItemType >
size_t JoinCapacity ()

void MainOp ()
 Receives elements from other workers and creates pre-sorted files.

template<typename ItemType , typename CompareFunction >
void MergeFiles (std::deque< data::File > &files, CompareFunction compare_function)
 Merges files when there are too many for the merge tree to handle.

void PreOp1 (const InputTypeFirst &input)

void PreOp2 (const InputTypeSecond &input)

DIAMemUse PreOpMemUse () final
 Amount of RAM used by PreOp after StartPreOp().

DIAMemUse PushDataMemUse () final
 Amount of RAM used by PushData().

template<typename ItemType , typename KeyExtractor >
void ReceiveItems (size_t capacity, data::MixStream::MixReader &reader, std::deque< data::File > &files, const KeyExtractor &key_extractor)
 Receives all elements from a stream and writes them to files sorted by key.

template<typename ItemType , typename KeyExtractor >
void SortAndWriteToFile (std::vector< ItemType > &vec, std::deque< data::File > &files, const KeyExtractor &key_extractor)
 Sorts all elements in a vector and writes them to a file.

void StartPreOp (size_t parent_index) final
 Virtual method for preparing start of PushData.

void StopPreOp (size_t parent_index) final
 Virtual method for preparing end of PushData.
 

Private Attributes

std::deque< data::File > files1_
 files for sorted datasets

std::deque< data::File > files2_

HashFunction hash_function_

data::MixStreamPtr hash_stream1_ { context_.GetNewMixStream(this) }
 data streams for inter-worker communication of DIA elements

data::MixStreamPtr hash_stream2_ { context_.GetNewMixStream(this) }

data::MixStream::Writers hash_writers1_ { hash_stream1_->GetWriters() }

data::MixStream::Writers hash_writers2_ { hash_stream2_->GetWriters() }

data::FilePtr join_file1_

data::FilePtr join_file2_

JoinFunction join_function_

KeyExtractor1 key_extractor1_
 user-defined functions

KeyExtractor2 key_extractor2_

core::LocationDetection< HashCount > location_detection_ { context_, Super::dia_id() }

bool location_detection_initialized_ = false

data::File pre_file1_ { context_.GetFile(this) }
 location detection and associated files

data::File pre_file2_ { context_.GetFile(this) }

data::File::Writer pre_writer1_

data::File::Writer pre_writer2_
 

Static Private Attributes

static constexpr bool debug = false
 

Additional Inherited Members

- Public Types inherited from DOpNode< ValueType >
using Super = DIANode< ValueType >

- Public Types inherited from DIANode< ValueType >
using Callback = tlx::delegate< void(const ValueType &)>

- Public Types inherited from DIABase
using DIABasePtr = tlx::CountingPtr< DIABase >

- Public Attributes inherited from DIABase
common::JsonLogger logger_

- Static Public Attributes inherited from DIABase
static constexpr size_t kNeverConsume = static_cast<size_t>(-1)
 Never full consume.

- Protected Attributes inherited from DIANode< ValueType >
std::vector< Child > children_
 Callback functions from the child nodes.

- Protected Attributes inherited from DIABase
Context & context_
 associated Context

const size_t dia_id_
 DIA serial id.

const char *const label_
 DOp node static label.

DIAState state_ = DIAState::NEW
 State of the DIANode. State is NEW on creation.

std::vector< DIABasePtr > parents_
 Parents of this DIABase.

DIAMemUse mem_limit_ = 0

size_t consume_counter_ = 1
 

Member Typedef Documentation

◆ InputTypeFirst

using InputTypeFirst = typename FirstDIA::ValueType
private

Definition at line 69 of file inner_join.hpp.

◆ InputTypeSecond

using InputTypeSecond = typename SecondDIA::ValueType
private

Definition at line 70 of file inner_join.hpp.

◆ Key

using Key = typename common::FunctionTraits<KeyExtractor1>::result_type
private

Key type of the join. Must be equal to the result type of the other key extractor.

Definition at line 73 of file inner_join.hpp.

◆ Super

using Super = DOpNode<ValueType>
private

Definition at line 66 of file inner_join.hpp.

Constructor & Destructor Documentation

◆ JoinNode()

Member Function Documentation

◆ AddEqualKeysToFile()

std::pair<bool, bool> AddEqualKeysToFile ( MergeTree &  puller,
const KeyExtractor &  key_extractor,
data::File::Writer &  writer,
const Key &  key
)
inline private

Adds all elements with the target key from the merge tree to a data::File, potentially in external memory; afterwards sets the first_element pointer to the first element with a different key.

Parameters
puller  Input merge tree
key_extractor  Key extractor function
writer  File writer
key  Target key
Returns
Pair of bools: the first indicates whether the merge tree was emptied, the second whether external memory was needed (always true when this method is called).

Definition at line 435 of file inner_join.hpp.

References BlockWriter< BlockSink >::Put().

Referenced by JoinNode< ValueType, FirstDIA, SecondDIA, KeyExtractor1, KeyExtractor2, JoinFunction, HashFunction, UseLocationDetection >::AddEqualKeysToVec().

◆ AddEqualKeysToVec()

std::pair<bool, bool> AddEqualKeysToVec ( std::vector< ItemType > &  vec,
MergeTree &  puller,
const KeyExtractor &  key_extractor,
data::FilePtr &  file_ptr
)
inline private

Adds all elements with equal keys from the merge tree to a vector; afterwards sets the first_element pointer to the first element with a different key.

Parameters
vec  Target vector
puller  Input merge tree
key_extractor  Key extractor function
file_ptr  Pointer to a data::File
Returns
Pair of bools: the first indicates whether the merge tree was emptied, the second whether external memory was needed.

vec is very large when this happens; it is swapped with an empty vector to free the memory.

Definition at line 380 of file inner_join.hpp.

References JoinNode< ValueType, FirstDIA, SecondDIA, KeyExtractor1, KeyExtractor2, JoinFunction, HashFunction, UseLocationDetection >::AddEqualKeysToFile(), DIABase::context_, Context::GetFilePtr(), thrill::mem::memory_exceeded, BlockWriter< BlockSink >::Put(), and tlx::vector_free().

Referenced by JoinNode< ValueType, FirstDIA, SecondDIA, KeyExtractor1, KeyExtractor2, JoinFunction, HashFunction, UseLocationDetection >::PushData().
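
The gather-then-spill behavior documented for AddEqualKeysToVec() and AddEqualKeysToFile() might look roughly like the sketch below. It is written under several assumptions: `SortedCursor` and `GatherEqualKeys` are hypothetical names, a cursor over a key-sorted vector stands in for the merge tree, a second vector stands in for the data::File, and a fixed item count stands in for the memory limit. It is not the code in inner_join.hpp.

```cpp
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical stand-in for a merge tree: a cursor over a key-sorted vector.
template <typename Item>
struct SortedCursor {
    explicit SortedCursor(const std::vector<Item>& v) : items(&v) {}
    bool HasNext() const { return pos < items->size(); }
    const Item& Peek() const { return (*items)[pos]; }
    void Advance() { ++pos; }

    const std::vector<Item>* items;
    std::size_t pos = 0;
};

// Collects all upcoming items that share the key of the current item.
// Items go to `vec` until it holds `capacity` items; at that point everything
// gathered so far is moved to `spill`, the memory of `vec` is released, and
// the rest of the key group is appended to `spill`.
// Returns {cursor exhausted, spill was used}.
template <typename Item, typename KeyExtractor>
std::pair<bool, bool> GatherEqualKeys(SortedCursor<Item>& cursor,
                                      const KeyExtractor& key_extractor,
                                      std::size_t capacity,
                                      std::vector<Item>& vec,
                                      std::vector<Item>& spill) {
    vec.clear();
    spill.clear();
    if (!cursor.HasNext()) return {true, false};

    const auto key = key_extractor(cursor.Peek());
    bool external = false;
    while (cursor.HasNext() && key_extractor(cursor.Peek()) == key) {
        if (!external && vec.size() == capacity) {
            // Budget exhausted: move the gathered items to the spill area
            // and swap with an empty vector to actually free the memory.
            spill.assign(vec.begin(), vec.end());
            std::vector<Item>().swap(vec);
            external = true;
        }
        if (external)
            spill.push_back(cursor.Peek());
        else
            vec.push_back(cursor.Peek());
        cursor.Advance();
    }
    return {!cursor.HasNext(), external};
}
```

After the call, the cursor points at the first item with a different key, so the next call gathers the next key group.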

◆ Dispose()

◆ Execute()

◆ ExecuteMemUse()

DIAMemUse ExecuteMemUse ( )
inline final private virtual

Amount of RAM used by Execute()

Reimplemented from DIABase.

Definition at line 483 of file inner_join.hpp.

References DIAMemUse::Max().

◆ JoinAllElements()

◆ JoinCapacity()

size_t JoinCapacity ( )
inline private

Definition at line 360 of file inner_join.hpp.

References DIABase::mem_limit_.

◆ MainOp()

◆ MakePuller()

auto MakePuller ( std::deque< data::File > &  files,
std::vector< data::File::Reader > &  seq,
CompareFunction  compare_function,
bool  consume 
)
inline

◆ MergeFiles()

void MergeFiles ( std::deque< data::File > &  files,
CompareFunction  compare_function 
)
inline private

Merge files when there are too many for the merge tree to handle.

Definition at line 519 of file inner_join.hpp.

References Context::block_pool(), DIABase::context_, Context::GetFile(), BlockPool::MaxMergeDegreePrefetch(), sLOG1, and thrill::data::StartPrefetch().
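
One common way to handle more sorted files than a merge step can take at once is to merge them in groups until the count fits. The sketch below shows that idea only; it assumes in-memory vectors in place of data::File, a hypothetical `ReduceRuns` helper, and a caller-supplied `max_degree` in place of the value that BlockPool::MaxMergeDegreePrefetch() would provide. The actual grouping strategy of MergeFiles() may differ.

```cpp
#include <algorithm>
#include <cstddef>
#include <deque>
#include <iterator>
#include <utility>
#include <vector>

// Hypothetical helper: repeatedly merges the first `max_degree` sorted runs
// into one run until at most `max_degree` runs remain.
template <typename Item, typename Compare>
void ReduceRuns(std::deque<std::vector<Item>>& runs, std::size_t max_degree,
                Compare compare) {
    if (max_degree < 2) return;  // merging fewer than two runs makes no progress

    while (runs.size() > max_degree) {
        std::vector<Item> merged;
        for (std::size_t i = 0; i < max_degree; ++i) {
            // Merge the next run into the accumulated result.
            std::vector<Item> tmp;
            std::merge(merged.begin(), merged.end(),
                       runs.front().begin(), runs.front().end(),
                       std::back_inserter(tmp), compare);
            merged.swap(tmp);
            runs.pop_front();
        }
        // The merged run is queued behind the remaining runs.
        runs.push_back(std::move(merged));
    }
}
```

Each pass removes `max_degree` runs and adds one, so the loop terminates whenever `max_degree` is at least 2.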

◆ PreOp1()

◆ PreOp2()

◆ PreOpMemUse()

DIAMemUse PreOpMemUse ( )
inline final private virtual

Amount of RAM used by PreOp after StartPreOp()

Reimplemented from DIABase.

Definition at line 451 of file inner_join.hpp.

References DIAMemUse::Max().

◆ PushData()

void PushData ( bool  consume)
inline final virtual

◆ PushDataMemUse()

DIAMemUse PushDataMemUse ( )
inline final private virtual

Amount of RAM used by PushData()

Reimplemented from DIABase.

Definition at line 487 of file inner_join.hpp.

References DIAMemUse::Max().

◆ ReceiveItems()

void ReceiveItems ( size_t  capacity,
data::MixStream::MixReader &  reader,
std::deque< data::File > &  files,
const KeyExtractor &  key_extractor
)
inline private

◆ SortAndWriteToFile()

void SortAndWriteToFile ( std::vector< ItemType > &  vec,
std::deque< data::File > &  files,
const KeyExtractor &  key_extractor 
)
inline private
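
Going by the brief descriptions of ReceiveItems() and SortAndWriteToFile(), incoming items are buffered up to a capacity, sorted by key, and written out as one sorted file per full buffer. A minimal sketch of that run-generation step follows, assuming in-memory vectors in place of the stream reader and data::File; `SortIntoRun` and `ReceiveIntoRuns` are hypothetical names.

```cpp
#include <algorithm>
#include <cstddef>
#include <deque>
#include <utility>
#include <vector>

// Hypothetical helper: sorts the buffer by key and appends it as a new run.
// The buffer is left empty afterwards.
template <typename Item, typename KeyExtractor>
void SortIntoRun(std::vector<Item>& vec, std::deque<std::vector<Item>>& runs,
                 const KeyExtractor& key_extractor) {
    std::sort(vec.begin(), vec.end(), [&](const Item& a, const Item& b) {
        return key_extractor(a) < key_extractor(b);
    });
    runs.push_back(std::move(vec));
    vec.clear();  // bring the moved-from vector into a known empty state
}

// Hypothetical helper: consumes `incoming` and emits one sorted run per
// `capacity` items, plus a final shorter run for any leftover items.
template <typename Item, typename KeyExtractor>
void ReceiveIntoRuns(const std::vector<Item>& incoming, std::size_t capacity,
                     std::deque<std::vector<Item>>& runs,
                     const KeyExtractor& key_extractor) {
    std::vector<Item> buffer;
    buffer.reserve(capacity);
    for (const Item& item : incoming) {
        buffer.push_back(item);
        if (buffer.size() >= capacity) SortIntoRun(buffer, runs, key_extractor);
    }
    if (!buffer.empty()) SortIntoRun(buffer, runs, key_extractor);
}
```

The resulting runs are individually sorted but not globally sorted, which is why a later merge step over all runs is needed.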

◆ StartPreOp()

◆ StopPreOp()

void StopPreOp ( size_t  )
inline final private virtual

Virtual method for preparing end of PushData.

Reimplemented from DIABase.

Definition at line 472 of file inner_join.hpp.

References BlockWriter< BlockSink >::Close(), and LOG.

Member Data Documentation

◆ debug

constexpr bool debug = false
static private

Definition at line 64 of file inner_join.hpp.

◆ files1_

◆ files2_

◆ hash_function_

◆ hash_stream1_

data::MixStreamPtr hash_stream1_ { context_.GetNewMixStream(this) }
private

data streams for inter-worker communication of DIA elements

Definition at line 306 of file inner_join.hpp.

Referenced by JoinNode< ValueType, FirstDIA, SecondDIA, KeyExtractor1, KeyExtractor2, JoinFunction, HashFunction, UseLocationDetection >::MainOp().

◆ hash_stream2_

◆ hash_writers1_

◆ hash_writers2_

◆ join_file1_

◆ join_file2_

◆ join_function_

◆ key_extractor1_

◆ key_extractor2_

◆ location_detection_

◆ location_detection_initialized_

bool location_detection_initialized_ = false
private

Definition at line 318 of file inner_join.hpp.

◆ pre_file1_

◆ pre_file2_

◆ pre_writer1_

data::File::Writer pre_writer1_
private

Definition at line 313 of file inner_join.hpp.

◆ pre_writer2_

data::File::Writer pre_writer2_
private

Definition at line 315 of file inner_join.hpp.


The documentation for this class was generated from the following file: