Thrill  0.1
MergeNode< ValueType, Comparator, kNumInputs > Class Template Reference

Detailed Description

template<typename ValueType, typename Comparator, size_t kNumInputs>
class thrill::api::MergeNode< ValueType, Comparator, kNumInputs >

Implementation of Thrill's merge.

This merge implementation balances all data before merging, so each worker holds the same amount of data when the merge finishes.
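"The same amount of data" can be made concrete with a minimal sketch. It assumes n items in total over p workers, and that worker i receives the global ranks from floor(i·n/p) up to floor((i+1)·n/p). The helper names `OutputRange` and `SplitRanks` are hypothetical and not part of Thrill. The exact rounding that MergeNode uses is also an assumption.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

// Hypothetical helper (not Thrill API). Assumes worker i owns the global
// ranks [floor(i * n / p), floor((i + 1) * n / p)) after the merge.
std::pair<std::size_t, std::size_t> OutputRange(std::size_t n, std::size_t p,
                                                std::size_t i) {
    return {i * n / p, (i + 1) * n / p};
}

// The p - 1 boundaries between consecutive workers. These are the target
// ranks that the splitter search has to hit.
std::vector<std::size_t> SplitRanks(std::size_t n, std::size_t p) {
    std::vector<std::size_t> ranks;
    for (std::size_t r = 1; r < p; ++r)
        ranks.push_back(r * n / p);  // first global rank owned by worker r
    return ranks;
}
```

For n = 10 and p = 3, this yields split ranks 3 and 6. The three workers therefore end up with 3, 3 and 4 items.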

The algorithm performs a distributed multi-sequence selection by picking random pivots. Each pivot is taken from the largest remaining search interval among all input DIAs and workers. The pivots are selected via a global AllReduce. There is one pivot per splitter search.
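One pivot round for a single splitter can be pictured as follows. Each worker proposes a random element from its largest remaining interval, and a reduction keeps the proposal that came from the largest interval, so that all workers agree on one pivot. This is a single-process sketch with several assumptions:

- `Pivot`, `LocalPivot`, `ReducePivot` and `GlobalPivot` are hypothetical names.
- `std::accumulate` stands in for the AllReduce.
- The tie rule (keep the left operand) is an assumption.

```cpp
#include <cassert>
#include <cstddef>
#include <numeric>
#include <random>
#include <vector>

// Hypothetical pivot record: the candidate value plus the length of the
// interval it was drawn from. segment_len == 0 means "no candidate".
struct Pivot {
    int value;
    std::size_t segment_len;
};

// Local step on one worker: draw a random element from the largest remaining
// interval [left[q], left[q] + width[q]) among its sorted inputs.
// Precondition: inputs, left and width are non-empty and have equal size.
Pivot LocalPivot(const std::vector<std::vector<int>>& inputs,
                 const std::vector<std::size_t>& left,
                 const std::vector<std::size_t>& width,
                 std::mt19937& rng) {
    std::size_t best = 0;
    for (std::size_t q = 1; q < inputs.size(); ++q)
        if (width[q] > width[best]) best = q;
    if (width[best] == 0) return Pivot{0, 0};  // nothing left on this worker
    std::uniform_int_distribution<std::size_t> dist(0, width[best] - 1);
    return Pivot{inputs[best][left[best] + dist(rng)], width[best]};
}

// Reduction operator: keep the pivot drawn from the larger interval.
// On equal lengths the left operand wins (assumed tie rule).
Pivot ReducePivot(const Pivot& a, const Pivot& b) {
    return b.segment_len > a.segment_len ? b : a;
}

// Simulated AllReduce over all workers' candidates (must be non-empty).
Pivot GlobalPivot(const std::vector<Pivot>& candidates) {
    return std::accumulate(candidates.begin() + 1, candidates.end(),
                           candidates.front(), ReducePivot);
}
```

Preferring the largest interval shrinks the biggest remaining range first, which tends to reduce the total search space fastest.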

Then each pivot is searched for in the interval [left,left + width) of each local File, where left = 0 and width = File.size() initially. This yields the local_rank of each pivot in each File. From the local_ranks, the corresponding global_rank of each pivot is calculated via an AllReduce.
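The rank computation can be sketched as a binary search inside the current interval, followed by a sum over all local ranks. This sketch makes the following assumptions:

- Every worker's sorted files are visible in one process.
- The nested loop replaces the AllReduce.
- `LocalRank` and `GlobalRank` are hypothetical names, not Thrill functions.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Local rank of `pivot` in one sorted local file, searching only the current
// interval [left, left + width). The result is an absolute index into `file`:
// the first position whose element is not less than the pivot.
std::size_t LocalRank(const std::vector<int>& file, int pivot,
                      std::size_t left, std::size_t width) {
    auto begin = file.begin() + static_cast<std::ptrdiff_t>(left);
    auto end = begin + static_cast<std::ptrdiff_t>(width);
    auto it = std::lower_bound(begin, end, pivot);
    return static_cast<std::size_t>(it - file.begin());
}

// Global rank: sum of the local ranks over all workers and all their input
// files (simulated; in the distributed setting this is a single AllReduce).
std::size_t GlobalRank(const std::vector<std::vector<std::vector<int>>>& workers,
                       int pivot,
                       const std::vector<std::vector<std::size_t>>& left,
                       const std::vector<std::vector<std::size_t>>& width) {
    std::size_t sum = 0;
    for (std::size_t w = 0; w < workers.size(); ++w)
        for (std::size_t q = 0; q < workers[w].size(); ++q)
            sum += LocalRank(workers[w][q], pivot, left[w][q], width[w][q]);
    return sum;
}
```

With two workers holding {1, 4, 7}, {2, 5} and {3, 6, 9}, {8}, the pivot 5 has global rank 4. Exactly four items (1, 2, 3, 4) are smaller than it.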

The global_ranks are then compared to the target_ranks (which are n/p * rank). If a pivot's global_rank is smaller than its target_rank, the splitter lies to the right of the pivot. In that case, the interval [left,left + width) is reduced to [idx,left + width), where idx is the rank of the pivot in the local File. Otherwise (the global_rank is at least the target_rank), the interval is reduced to [left,idx).

    left -> width
    V            V      V           V          V                  V
    +------------+      +-----------+          +------------------+  DIA 0
       ^
       local_ranks, global_ranks = sum over all local_ranks
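The interval update can be sketched for one splitter on one worker. The rule keeps the side of the pivot on which the splitter must lie. When the pivot's global rank is below the target, only positions at or after the pivot remain candidates. Otherwise, only positions before it remain. `ShrinkStep` is a hypothetical name. Thrill's SearchStep processes all splitters at once (first index: splitter, second index: file).

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// One shrink step for a single splitter over one worker's input files.
//   left[q], width[q]: current search interval [left, left + width) of file q
//   local_rank[q]:     pivot position in file q, left[q] <= local_rank[q] <= left[q] + width[q]
//   global_rank:       sum of all local ranks over all workers
//   target_rank:       the rank this splitter must reach
void ShrinkStep(std::size_t global_rank, std::size_t target_rank,
                const std::vector<std::size_t>& local_rank,
                std::vector<std::size_t>& left,
                std::vector<std::size_t>& width) {
    for (std::size_t q = 0; q < left.size(); ++q) {
        if (width[q] == 0) continue;  // interval already empty
        if (global_rank < target_rank) {
            // Pivot too small: keep [local_rank, left + width).
            width[q] -= local_rank[q] - left[q];
            left[q] = local_rank[q];
        } else {
            // Pivot large enough: keep [left, local_rank).
            width[q] = local_rank[q] - left[q];
        }
    }
}
```

Both branches only ever shrink an interval, so repeated rounds converge on the split positions.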

Template Parameters
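ValueType    The type of the items in all input DIAs.
Comparator   The comparator defining input and output order.
kNumInputs   The number of input DIAs.
ParentDIA0   The type of the first input DIA (constructor template parameter).
ParentDIAs   The types of the other input DIAs (constructor template parameter pack).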

Definition at line 76 of file merge.hpp.

#include <merge.hpp>

Classes

struct  Pivot
 
class  ReducePivots
 
class  RegisterParent
 
class  Stats
 Stats holds timers for measuring merge performance. It supports accumulating the output and printing it to the standard output stream.
 

Public Member Functions

template<typename ParentDIA0 , typename... ParentDIAs>
 MergeNode (const Comparator &comparator, const ParentDIA0 &parent0, const ParentDIAs &... parents)
 
void Dispose () final
 Virtual clear method. Triggers actual disposing in sub-classes.
 
void Execute () final
 Virtual execution method. Triggers actual computation in sub-classes.
 
bool OnPreOpFile (const data::File &file, size_t parent_index) final
 Receive a whole data::File of ValueType, but only if our stack is empty.
 
void PushData (bool consume) final
 Virtual method for pushing data. Triggers actual pushing in sub-classes.
 
void StopPreOp (size_t parent_index) final
 Virtual method for preparing end of PushData.
 
- Public Member Functions inherited from DOpNode< ValueType >
 DOpNode (Context &ctx, const char *label, const std::initializer_list< size_t > &parent_ids, const std::initializer_list< DIABasePtr > &parents)
 Constructor for a DOpNode, which sets references to the parent nodes.
 
 DOpNode (Context &ctx, const char *label, std::vector< size_t > &&parent_ids, std::vector< DIABasePtr > &&parents)
 Constructor for a DOpNode, which sets references to the parent nodes.
 
- Public Member Functions inherited from DIANode< ValueType >
 DIANode (Context &ctx, const char *label, const std::initializer_list< size_t > &parent_ids, const std::initializer_list< DIABasePtr > &parents)
 Constructor for a DIANode, which sets references to the parent nodes.
 
 DIANode (Context &ctx, const char *label, std::vector< size_t > &&parent_ids, std::vector< DIABasePtr > &&parents)
 Constructor for a DIANode, which sets references to the parent nodes.
 
virtual void AddChild (DIABase *node, const Callback &callback=Callback(), size_t parent_index=0)
 Enables children to push their "folded" function chains to their parent.
 
std::vector< DIABase * > children () const override
 Returns the children of this DIABase.
 
void PushFile (data::File &file, bool consume) const
 
void PushItem (const ValueType &item) const
 Method for derived classes to push a single item to all children.
 
void RemoveAllChildren () override
 
void RemoveChild (DIABase *node) override
 
void RunPushData () override
 
- Public Member Functions inherited from DIABase
 DIABase (Context &ctx, const char *label, const std::initializer_list< size_t > &parent_ids, const std::initializer_list< DIABasePtr > &parents)
 The constructor for a DIABase.
 
 DIABase (Context &ctx, const char *label, std::vector< size_t > &&parent_ids, std::vector< DIABasePtr > &&parents)
 The constructor for a DIABase.
 
 DIABase (const DIABase &)=delete
 non-copyable: delete copy-constructor
 
 DIABase (DIABase &&)=default
 move-constructor: default
 
virtual ~DIABase ()
 Virtual destructor for a DIABase.
 
virtual size_t consume_counter () const
 Returns consume_counter_.
 
Context & context ()
 Returns the api::Context of this DIABase.
 
virtual void DecConsumeCounter (size_t counter)
 
const size_t & dia_id () const
 Returns the unique id of the DIANode subclass as stored by StatsNode.
 
virtual bool ForwardDataOnly () const
 
virtual void IncConsumeCounter (size_t counter)
 
const char * label () const
 Returns the label() of the DIANode subclass as stored by StatsNode.
 
mem::Manager & mem_manager ()
 Return the Context's memory manager.
 
DIABase & operator= (const DIABase &)=delete
 non-copyable: delete assignment operator
 
DIABase & operator= (DIABase &&)=default
 move-assignment operator: default
 
std::vector< size_t > parent_ids () const
 Returns the ids of the parents of this DIABase.
 
const std::vector< DIABasePtr > & parents () const
 Returns the parents of this DIABase.
 
void RemoveParent (DIABase *p)
 Remove a parent.
 
virtual bool RequireParentPushData (size_t) const
 
void RunScope ()
 
void set_mem_limit (const DIAMemUse &mem_limit)
 
void set_state (const DIAState &state)
 
virtual void SetConsumeCounter (size_t counter)
 
DIAState state () const
 
virtual DIAMemUse PreOpMemUse ()
 Amount of RAM used by PreOp after StartPreOp()
 
virtual void StartPreOp (size_t)
 Virtual method for preparing start of PushData.
 
virtual DIAMemUse ExecuteMemUse ()
 Amount of RAM used by Execute()
 
virtual DIAMemUse PushDataMemUse ()
 Amount of RAM used by PushData()
 
- Public Member Functions inherited from ReferenceCounter
 ReferenceCounter () noexcept
 New objects have zero reference count.
 
 ReferenceCounter (const ReferenceCounter &) noexcept
 Copying still creates a new object with zero reference count.
 
 ~ReferenceCounter ()
 
bool dec_reference () const noexcept
 Call whenever resetting (i.e. overwriting) a pointer to the object.
 
void inc_reference () const noexcept
 Call whenever setting a pointer to the object.
 
ReferenceCounter & operator= (const ReferenceCounter &) noexcept
 Assignment operator, leaves pointers unchanged.
 
size_t reference_count () const noexcept
 Return the number of references to this object (for debugging).
 
bool unique () const noexcept
 Test if the ReferenceCounter is referenced by only one CountingPtr.
 

Private Types

using ArrayNumInputsSizeT = std::array< size_t, kNumInputs >
 
using StatsTimer = common::StatsTimerBaseStopped< stats_enabled >
 
using Super = DOpNode< ValueType >
 

Private Member Functions

void GetGlobalRanks (const std::vector< Pivot > &pivots, std::vector< size_t > &global_ranks, std::vector< ArrayNumInputsSizeT > &out_local_ranks, const std::vector< ArrayNumInputsSizeT > &left, const std::vector< ArrayNumInputsSizeT > &width)
 Calculates the global ranks of the given pivots.
 
void MainOp ()
 Receives elements from other workers and rebalances them, so each worker has the same amount after merging.
 
void SearchStep (const std::vector< size_t > &global_ranks, const std::vector< ArrayNumInputsSizeT > &local_ranks, const std::vector< size_t > &target_ranks, std::vector< ArrayNumInputsSizeT > &left, std::vector< ArrayNumInputsSizeT > &width)
 Shrinks the search ranges according to the global ranks of the pivots.
 
void SelectPivots (const std::vector< ArrayNumInputsSizeT > &left, const std::vector< ArrayNumInputsSizeT > &width, std::vector< Pivot > &out_pivots)
 Selects random global pivots for all splitter searches based on all workers' search ranges.
 

Static Private Member Functions

static std::string VToStr (const std::vector< Pivot > &data)
 Logging helper to print a vector of pivots.
 

Private Attributes

Comparator comparator_
 Merge comparator.
 
data::FilePtr files_ [kNumInputs]
 Files for intermediate storage.
 
const std::array< bool, kNumInputs > parent_stack_empty_
 Whether the parent stack is empty.
 
size_t prefix_size_
 Count of items on all previous workers.
 
Stats stats_
 Instance of merge statistics.
 
data::CatStreamPtr streams_ [kNumInputs]
 Array of inbound CatStreams.
 
data::File::Writer writers_ [kNumInputs]
 Writers to intermediate files.
 

Static Private Attributes

static constexpr bool debug = false
 
static constexpr bool self_verify = debug && common::g_debug_mode
 
static constexpr bool stats_enabled = false
 Set this variable to true to enable generation and output of merge stats.
 

Additional Inherited Members

- Public Types inherited from DOpNode< ValueType >
using Super = DIANode< ValueType >
 
- Public Types inherited from DIANode< ValueType >
using Callback = tlx::delegate< void(const ValueType &)>
 
- Public Types inherited from DIABase
using DIABasePtr = tlx::CountingPtr< DIABase >
 
- Public Attributes inherited from DIABase
common::JsonLogger logger_
 
- Static Public Attributes inherited from DIABase
static constexpr size_t kNeverConsume = static_cast<size_t>(-1)
 Never fully consume.
 
- Protected Attributes inherited from DIANode< ValueType >
std::vector< Child > children_
 Callback functions from the child nodes.
 
- Protected Attributes inherited from DIABase
Context & context_
 Associated Context.
 
const size_t dia_id_
 DIA serial id.
 
const char *const label_
 DOp node static label.
 
DIAState state_ = DIAState::NEW
 State of the DIANode. State is NEW on creation.
 
std::vector< DIABasePtr > parents_
 Parents of this DIABase.
 
DIAMemUse mem_limit_ = 0
 
size_t consume_counter_ = 1
 

Member Typedef Documentation

◆ ArrayNumInputsSizeT

using ArrayNumInputsSizeT = std::array<size_t, kNumInputs>
private

Definition at line 214 of file merge.hpp.

◆ StatsTimer

using StatsTimer = common::StatsTimerBaseStopped<stats_enabled>
private

Definition at line 238 of file merge.hpp.

◆ Super

using Super = DOpNode<ValueType>
private

Definition at line 84 of file merge.hpp.

Constructor & Destructor Documentation

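◆ MergeNode()

template<typename ParentDIA0 , typename... ParentDIAs>
MergeNode ( const Comparator & comparator,
const ParentDIA0 & parent0,
const ParentDIAs &... parents 
)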

Member Function Documentation

◆ Dispose()

void Dispose ( )
inline final virtual

Virtual clear method. Triggers actual disposing in sub-classes.

Reimplemented from DIABase.

Definition at line 187 of file merge.hpp.

◆ Execute()

void Execute ( )
inline final virtual

Virtual execution method. Triggers actual computation in sub-classes.

Implements DIABase.

Definition at line 156 of file merge.hpp.

References MergeNode< ValueType, Comparator, kNumInputs >::MainOp().

◆ GetGlobalRanks()

void GetGlobalRanks ( const std::vector< Pivot > &  pivots,
std::vector< size_t > &  global_ranks,
std::vector< ArrayNumInputsSizeT > &  out_local_ranks,
const std::vector< ArrayNumInputsSizeT > &  left,
const std::vector< ArrayNumInputsSizeT > &  width 
)
inline private

Calculates the global ranks of the given pivots.

Additionally returns the local ranks so we can use them in the next step.

Definition at line 377 of file merge.hpp.

References FlowControlChannel::AllReduce(), MergeNode< ValueType, Comparator, kNumInputs >::Stats::comm_timer_, DIABase::context_, MergeNode< ValueType, Comparator, kNumInputs >::Stats::file_op_timer_, Context::net, and gen_data::value.

Referenced by MergeNode< ValueType, Comparator, kNumInputs >::MainOp().

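◆ MainOp()

void MainOp ( )
private

Receives elements from other workers and rebalances them, so each worker has the same amount of data after merging.

Referenced by MergeNode< ValueType, Comparator, kNumInputs >::Execute().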

◆ OnPreOpFile()

bool OnPreOpFile ( const data::File & file,
size_t  parent_index 
)
inline final virtual

Receive a whole data::File of ValueType, but only if our stack is empty.

Reimplemented from DIABase.

Definition at line 142 of file merge.hpp.

References MergeNode< ValueType, Comparator, kNumInputs >::files_, and MergeNode< ValueType, Comparator, kNumInputs >::parent_stack_empty_.

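◆ PushData()

void PushData ( bool consume )
final virtual

Virtual method for pushing data. Triggers actual pushing in sub-classes.

References MergeNode< ValueType, Comparator, kNumInputs >::stats_, and MergeNode< ValueType, Comparator, kNumInputs >::streams_.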

◆ SearchStep()

void SearchStep ( const std::vector< size_t > &  global_ranks,
const std::vector< ArrayNumInputsSizeT > &  local_ranks,
const std::vector< size_t > &  target_ranks,
std::vector< ArrayNumInputsSizeT > &  left,
std::vector< ArrayNumInputsSizeT > &  width 
)
inline private

Shrinks the search ranges according to the global ranks of the pivots.

Parameters
global_ranks   The global ranks of all pivots.
local_ranks    The local ranks of each pivot in each file.
target_ranks   The desired ranks of the splitters we are looking for.
left           The left bounds of all search ranges for all files. The first index identifies the splitter, the second index identifies the file. This parameter will be modified.
width          The widths of all search ranges for all files. The first index identifies the splitter, the second index identifies the file. This parameter will be modified.

Definition at line 429 of file merge.hpp.

References die_unless.

Referenced by MergeNode< ValueType, Comparator, kNumInputs >::MainOp().

◆ SelectPivots()

void SelectPivots ( const std::vector< ArrayNumInputsSizeT > &  left,
const std::vector< ArrayNumInputsSizeT > &  width,
std::vector< Pivot > &  out_pivots 
)
inline private

Selects random global pivots for all splitter searches based on all workers' search ranges.

Parameters
left         The left bounds of all search ranges for all files. The first index identifies the splitter, the second index identifies the file.
width        The widths of all search ranges for all files. The first index identifies the splitter, the second index identifies the file.
out_pivots   The output pivots.

Definition at line 325 of file merge.hpp.

References FlowControlChannel::AllReduce(), MergeNode< ValueType, Comparator, kNumInputs >::Stats::comm_timer_, DIABase::context_, MergeNode< ValueType, Comparator, kNumInputs >::Stats::file_op_timer_, LOG, Context::net, Context::rng_, and MergeNode< ValueType, Comparator, kNumInputs >::VToStr().

Referenced by MergeNode< ValueType, Comparator, kNumInputs >::MainOp().

◆ StopPreOp()

void StopPreOp ( size_t  )
inline final virtual

Virtual method for preparing end of PushData.

Reimplemented from DIABase.

Definition at line 152 of file merge.hpp.

References BlockWriter< BlockSink >::Close(), and MergeNode< ValueType, Comparator, kNumInputs >::writers_.

◆ VToStr()

static std::string VToStr ( const std::vector< Pivot > &  data)
inline static private

Logging helper to print a vector of pivots.

Definition at line 217 of file merge.hpp.

Referenced by MergeNode< ValueType, Comparator, kNumInputs >::MainOp(), and MergeNode< ValueType, Comparator, kNumInputs >::SelectPivots().

Member Data Documentation

◆ comparator_

Comparator comparator_
private

Merge comparator.

◆ debug

constexpr bool debug = false
static private

Definition at line 78 of file merge.hpp.

◆ files_

data::FilePtr files_[kNumInputs]
private

Files for intermediate storage.

◆ parent_stack_empty_

const std::array<bool, kNumInputs> parent_stack_empty_
private

Whether the parent stack is empty.

Definition at line 194 of file merge.hpp.

Referenced by MergeNode< ValueType, Comparator, kNumInputs >::MergeNode(), and MergeNode< ValueType, Comparator, kNumInputs >::OnPreOpFile().

◆ prefix_size_

size_t prefix_size_
private

Count of items on all previous workers.

Definition at line 212 of file merge.hpp.
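Because prefix_size_ counts the items on all previous workers, it is an exclusive prefix sum over the per-worker item counts. The sketch below computes that sum for all workers in a single process. The helper name `PrefixSizes` is hypothetical. This entry does not say how MergeNode actually obtains the value; presumably it uses a collective operation across workers.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical helper: exclusive prefix sum of per-worker item counts.
// Entry i is the number of items held by workers 0 .. i-1.
std::vector<std::size_t> PrefixSizes(const std::vector<std::size_t>& counts) {
    std::vector<std::size_t> prefix(counts.size());
    std::size_t running = 0;
    for (std::size_t i = 0; i < counts.size(); ++i) {
        prefix[i] = running;  // items on all previous workers
        running += counts[i];
    }
    return prefix;
}
```

A worker's local item j then has global rank prefix + j. For counts {4, 0, 3, 5}, the prefixes are {0, 4, 4, 7}.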

◆ self_verify

constexpr bool self_verify = debug && common::g_debug_mode
static private

Definition at line 79 of file merge.hpp.

◆ stats_

Stats stats_
private

Instance of merge statistics.

Definition at line 310 of file merge.hpp.

Referenced by MergeNode< ValueType, Comparator, kNumInputs >::PushData().

◆ stats_enabled

constexpr bool stats_enabled = false
static private

Set this variable to true to enable generation and output of merge stats.

Definition at line 82 of file merge.hpp.
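The `stats_enabled` flag, together with `StatsTimer = common::StatsTimerBaseStopped<stats_enabled>`, suggests a common compile-time switch. When the flag is false, the timer type does nothing and carries no state. The sketch below illustrates that pattern with a hypothetical `SketchTimer`. It is not the implementation of Thrill's `common::StatsTimerBaseStopped`.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>
#include <type_traits>

// Hypothetical timer whose behavior is selected at compile time.
template <bool Enabled>
class SketchTimer;

// Enabled: accumulates wall-clock time across Start()/Stop() pairs.
template <>
class SketchTimer<true> {
public:
    void Start() { start_ = std::chrono::steady_clock::now(); }
    void Stop() {
        total_ += std::chrono::steady_clock::now() - start_;
        ++count_;
    }
    std::int64_t Microseconds() const {
        return std::chrono::duration_cast<std::chrono::microseconds>(total_).count();
    }
    std::int64_t Count() const { return count_; }

private:
    std::chrono::steady_clock::time_point start_{};
    std::chrono::steady_clock::duration total_ =
        std::chrono::steady_clock::duration::zero();
    std::int64_t count_ = 0;
};

// Disabled: an empty class whose calls the compiler can drop entirely.
template <>
class SketchTimer<false> {
public:
    void Start() {}
    void Stop() {}
    std::int64_t Microseconds() const { return 0; }
    std::int64_t Count() const { return 0; }
};

static constexpr bool kStatsEnabled = false;  // flip to true to measure
using MergeTimer = SketchTimer<kStatsEnabled>;
```

Because the choice is a template argument, code that calls `Start()` and `Stop()` stays unchanged. Only the flag decides whether any timing work is compiled in.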

◆ streams_

data::CatStreamPtr streams_[kNumInputs]
private

Array of inbound CatStreams.

Definition at line 203 of file merge.hpp.

Referenced by MergeNode< ValueType, Comparator, kNumInputs >::PushData().

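◆ writers_

data::File::Writer writers_[kNumInputs]
private

Writers to intermediate files.

Referenced by MergeNode< ValueType, Comparator, kNumInputs >::StopPreOp().
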

The documentation for this class was generated from the following file: merge.hpp