Thrill
0.1
Implementation of Thrill's merge.
This merge implementation balances all data before merging, so each worker has the same amount of data when the merge finishes.
The algorithm performs a distributed multi-sequence selection by picking random pivots (from the largest remaining interval) for each DIA. The pivots are selected via a global AllReduce. There is one pivot per DIA.
Then each pivot is searched for in the interval [left, left + width) of each local File's partition, where initially left = 0 and width = File.size(). This yields the local_rank of each pivot. From the local_ranks, the corresponding global_rank of each pivot is calculated via an AllReduce.
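The global_ranks are then compared to the target_ranks (the target rank of splitter r is n/p * r, where n is the total number of items and p the number of workers). If a pivot's global_rank is smaller than its target_rank, the splitter lies to the right of the pivot, so the interval [left, left + width) is reduced to [idx, left + width), where idx is the rank of the pivot in the local File. Otherwise, the splitter lies to the left of the pivot and the interval is reduced to [left, idx).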
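Figure (ASCII diagram, lost in extraction): for DIA 0, each local File is searched only within its range starting at left and spanning width items; the position of the pivot inside that range is its local_rank, and global_rank is the sum over all local_ranks.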
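The selection rule can be written compactly as below. This is a sketch under stated assumptions: n/p is read as integer division, splitters are numbered r = 1, ..., p - 1, k stands for kNumInputs, and the case g_r = t_r is assumed to shrink the range to the left. The symbols t, g and ℓ (target rank, global rank, local rank of pivot r in file i on worker w) are introduced here for illustration; left and width denote that file's current left[r][i] and width[r][i].

```latex
t_r = r \cdot \left\lfloor n / p \right\rfloor, \qquad r = 1, \dots, p - 1

g_r = \sum_{w=0}^{p-1} \sum_{i=0}^{k-1} \ell^{(r)}_{w,i}

[\mathit{left},\ \mathit{left} + \mathit{width}) \leftarrow
\begin{cases}
  [\ell^{(r)}_{w,i},\ \mathit{left} + \mathit{width}) & \text{if } g_r < t_r, \\
  [\mathit{left},\ \ell^{(r)}_{w,i}) & \text{if } g_r \ge t_r.
\end{cases}
```

For example, with n = 10 items on p = 2 workers there is a single splitter with t_1 = 5; a pivot with g_1 = 3 moves every left bound up to that file's local rank of the pivot.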
#include <merge.hpp>
Classes
struct Pivot
class ReducePivots
class RegisterParent
class Stats
  Stats holds timers for measuring merge performance; it supports accumulating the output and printing it to the standard output stream.
Public Member Functions
template<typename ParentDIA0 , typename... ParentDIAs>
MergeNode (const Comparator &comparator, const ParentDIA0 &parent0, const ParentDIAs &... parents)
void Dispose () final
  Virtual clear method. Triggers actual disposing in sub-classes.
void Execute () final
  Virtual execution method. Triggers actual computation in sub-classes.
bool OnPreOpFile (const data::File &file, size_t parent_index) final
  Receive a whole data::File of ValueType, but only if our stack is empty.
void PushData (bool consume) final
  Virtual method for pushing data. Triggers actual pushing in sub-classes.
void StopPreOp (size_t parent_index) final
  Virtual method for preparing end of PushData.
Public Member Functions inherited from DOpNode< ValueType >
DOpNode (Context &ctx, const char *label, const std::initializer_list< size_t > &parent_ids, const std::initializer_list< DIABasePtr > &parents)
  Constructor for a DOpNode, which sets references to the parent nodes.
DOpNode (Context &ctx, const char *label, std::vector< size_t > &&parent_ids, std::vector< DIABasePtr > &&parents)
  Constructor for a DOpNode, which sets references to the parent nodes.
Public Member Functions inherited from DIANode< ValueType >
DIANode (Context &ctx, const char *label, const std::initializer_list< size_t > &parent_ids, const std::initializer_list< DIABasePtr > &parents)
  Constructor for a DIANode, which sets references to the parent nodes.
DIANode (Context &ctx, const char *label, std::vector< size_t > &&parent_ids, std::vector< DIABasePtr > &&parents)
  Constructor for a DIANode, which sets references to the parent nodes.
virtual void AddChild (DIABase *node, const Callback &callback=Callback(), size_t parent_index=0)
  Enables children to push their "folded" function chains to their parent.
std::vector< DIABase * > children () const override
  Returns the children of this DIABase.
void PushFile (data::File &file, bool consume) const
void PushItem (const ValueType &item) const
  Method for derived classes to push a single item to all children.
void RemoveAllChildren () override
void RemoveChild (DIABase *node) override
void RunPushData () override
Public Member Functions inherited from DIABase
DIABase (Context &ctx, const char *label, const std::initializer_list< size_t > &parent_ids, const std::initializer_list< DIABasePtr > &parents)
  The constructor for a DIABase.
DIABase (Context &ctx, const char *label, std::vector< size_t > &&parent_ids, std::vector< DIABasePtr > &&parents)
  The constructor for a DIABase.
DIABase (const DIABase &)=delete
  non-copyable: delete copy-constructor
DIABase (DIABase &&)=default
  move-constructor: default
virtual ~DIABase ()
  Virtual destructor for a DIABase.
virtual size_t consume_counter () const
  Returns consume_counter_.
Context & context ()
  Returns the api::Context of this DIABase.
virtual void DecConsumeCounter (size_t counter)
const size_t & dia_id () const
  Returns the unique id of the DIANode subclass as stored by StatsNode.
virtual bool ForwardDataOnly () const
virtual void IncConsumeCounter (size_t counter)
const char * label () const
  Returns the label() of the DIANode subclass as stored by StatsNode.
mem::Manager & mem_manager ()
  Returns the Context's memory manager.
DIABase & operator= (const DIABase &)=delete
  non-copyable: delete assignment operator
DIABase & operator= (DIABase &&)=default
  move-assignment operator: default
std::vector< size_t > parent_ids () const
  Returns the ids of the parents of this DIABase.
const std::vector< DIABasePtr > & parents () const
  Returns the parents of this DIABase.
void RemoveParent (DIABase *p)
  Removes a parent.
virtual bool RequireParentPushData (size_t) const
void RunScope ()
void set_mem_limit (const DIAMemUse &mem_limit)
void set_state (const DIAState &state)
virtual void SetConsumeCounter (size_t counter)
DIAState state () const
virtual DIAMemUse PreOpMemUse ()
  Amount of RAM used by PreOp after StartPreOp().
virtual void StartPreOp (size_t)
  Virtual method for preparing start of PushData.
virtual DIAMemUse ExecuteMemUse ()
  Amount of RAM used by Execute().
virtual DIAMemUse PushDataMemUse ()
  Amount of RAM used by PushData().
Public Member Functions inherited from ReferenceCounter
ReferenceCounter () noexcept
  New objects have zero reference count.
ReferenceCounter (const ReferenceCounter &) noexcept
  Copying still creates a new object with zero reference count.
~ReferenceCounter ()
bool dec_reference () const noexcept
  Call whenever resetting (i.e.
void inc_reference () const noexcept
  Call whenever setting a pointer to the object.
ReferenceCounter & operator= (const ReferenceCounter &) noexcept
  Assignment operator, leaves pointers unchanged.
size_t reference_count () const noexcept
  Returns the number of references to this object (for debugging).
bool unique () const noexcept
  Tests if the ReferenceCounter is referenced by only one CountingPtr.
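The ReferenceCounter members describe intrusive reference counting: the count lives inside the object, and a smart pointer increments it when it starts pointing at the object and decrements it when it lets go. The following is a minimal sketch of that idea, not tlx's implementation; RefCounted and CountingPtrSketch are hypothetical names, and the meaning of dec_reference()'s bool (true once the count drops to zero, so the caller should delete) is an assumption, not something stated on this page.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <utility>

// Hypothetical intrusive counter: the reference count is stored in the object.
class RefCounted {
public:
    RefCounted() noexcept = default;
    // Copying yields a fresh object that nobody references yet.
    RefCounted(const RefCounted&) noexcept : count_(0) {}
    // Assignment copies payload only; existing pointers and the count stay as they are.
    RefCounted& operator=(const RefCounted&) noexcept { return *this; }
    ~RefCounted() { assert(count_ == 0); }

    void inc_reference() const noexcept { ++count_; }
    // Assumed semantics: returns true when the last reference was dropped.
    bool dec_reference() const noexcept {
        assert(count_ > 0);
        return --count_ == 0;
    }
    std::size_t reference_count() const noexcept { return count_; }
    bool unique() const noexcept { return count_ == 1; }

private:
    mutable std::atomic<std::size_t> count_{0};
};

// Hypothetical smart pointer driving the counter (copy-and-swap assignment).
template <typename T>
class CountingPtrSketch {
public:
    CountingPtrSketch() = default;
    explicit CountingPtrSketch(T* p) : ptr_(p) { if (ptr_) ptr_->inc_reference(); }
    CountingPtrSketch(const CountingPtrSketch& o) : ptr_(o.ptr_) {
        if (ptr_) ptr_->inc_reference();
    }
    CountingPtrSketch& operator=(CountingPtrSketch o) {
        std::swap(ptr_, o.ptr_);  // o releases our old object when it is destroyed
        return *this;
    }
    ~CountingPtrSketch() {
        if (ptr_ && ptr_->dec_reference()) delete ptr_;
    }
    T* get() const { return ptr_; }
    T* operator->() const { return ptr_; }

private:
    T* ptr_ = nullptr;
};
```

Because the count is embedded in the object, a raw pointer can be turned back into a counted pointer without a separate control block, which is the usual reason to prefer intrusive counting over std::shared_ptr.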
Private Types
using ArrayNumInputsSizeT = std::array< size_t, kNumInputs >
using StatsTimer = common::StatsTimerBaseStopped< stats_enabled >
using Super = DOpNode< ValueType >
Private Member Functions
void GetGlobalRanks (const std::vector< Pivot > &pivots, std::vector< size_t > &global_ranks, std::vector< ArrayNumInputsSizeT > &out_local_ranks, const std::vector< ArrayNumInputsSizeT > &left, const std::vector< ArrayNumInputsSizeT > &width)
  Calculates the global ranks of the given pivots.
void MainOp ()
  Receives elements from other workers and rebalances them, so each worker has the same amount after merging.
void SearchStep (const std::vector< size_t > &global_ranks, const std::vector< ArrayNumInputsSizeT > &local_ranks, const std::vector< size_t > &target_ranks, std::vector< ArrayNumInputsSizeT > &left, std::vector< ArrayNumInputsSizeT > &width)
  Shrinks the search ranges according to the global ranks of the pivots.
void SelectPivots (const std::vector< ArrayNumInputsSizeT > &left, const std::vector< ArrayNumInputsSizeT > &width, std::vector< Pivot > &out_pivots)
  Selects random global pivots for all splitter searches based on all workers' search ranges.
Static Private Member Functions
static std::string VToStr (const std::vector< Pivot > &data)
  Logging helper to print vectors of vectors of pivots.
Private Attributes
Comparator comparator_
  Merge comparator.
data::FilePtr files_ [kNumInputs]
  Files for intermediate storage.
const std::array< bool, kNumInputs > parent_stack_empty_
  Whether the parent stack is empty.
size_t prefix_size_
  Count of items on all previous workers.
Stats stats_
  Instance of merge statistics.
data::CatStreamPtr streams_ [kNumInputs]
  Array of inbound CatStreams.
data::File::Writer writers_ [kNumInputs]
  Writers to intermediate files.
Static Private Attributes
static constexpr bool debug = false
static constexpr bool self_verify = debug && common::g_debug_mode
static constexpr bool stats_enabled = false
  Set this variable to true to enable generation and output of merge stats.
Additional Inherited Members
Public Types inherited from DOpNode< ValueType >
using Super = DIANode< ValueType >
Public Types inherited from DIANode< ValueType >
using Callback = tlx::delegate< void(const ValueType &)>
Public Types inherited from DIABase
using DIABasePtr = tlx::CountingPtr< DIABase >
Public Attributes inherited from DIABase
common::JsonLogger logger_
Static Public Attributes inherited from DIABase
static constexpr size_t kNeverConsume = static_cast<size_t>(-1)
  Never fully consume.
Protected Attributes inherited from DIANode< ValueType >
std::vector< Child > children_
  Callback functions from the child nodes.
Protected Attributes inherited from DIABase
Context & context_
  Associated Context.
const size_t dia_id_
  DIA serial id.
const char *const label_
  DOp node static label.
DIAState state_ = DIAState::NEW
  State of the DIANode. State is NEW on creation.
std::vector< DIABasePtr > parents_
  Parents of this DIABase.
DIAMemUse mem_limit_ = 0
size_t consume_counter_ = 1
MergeNode() [inline]
Definition at line 91 of file merge.hpp.
References tlx::call_foreach_with_index(), MergeNode< ValueType, Comparator, kNumInputs >::comparator_, DIABase::context_, MergeNode< ValueType, Comparator, kNumInputs >::files_, Context::GetFilePtr(), MergeNode< ValueType, Comparator, kNumInputs >::parent_stack_empty_, DIABase::parents(), and MergeNode< ValueType, Comparator, kNumInputs >::writers_.
Dispose() [inline, final, virtual]
Execute() [inline, final, virtual]
Virtual execution method. Triggers actual computation in sub-classes.
Implements DIABase.
Definition at line 156 of file merge.hpp.
References MergeNode< ValueType, Comparator, kNumInputs >::MainOp().
GetGlobalRanks() [inline, private]
Calculates the global ranks of the given pivots.
Additionally returns the local ranks so we can use them in the next step.
Definition at line 377 of file merge.hpp.
References FlowControlChannel::AllReduce(), MergeNode< ValueType, Comparator, kNumInputs >::Stats::comm_timer_, DIABase::context_, MergeNode< ValueType, Comparator, kNumInputs >::Stats::file_op_timer_, Context::net, and gen_data::value.
Referenced by MergeNode< ValueType, Comparator, kNumInputs >::MainOp().
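A single-process sketch of the quantities GetGlobalRanks produces. Assumptions: kNumInputs is fixed at 2, each worker's Files are modelled as sorted std::vector<int>, the AllReduce is replaced by a plain loop over simulated workers, and duplicate keys get no special tie handling (the real Pivot type may carry extra tie-breaking data). LocalRanks, GlobalRanks and WorkerFiles are hypothetical names.

```cpp
#include <algorithm>
#include <array>
#include <cassert>
#include <cstddef>
#include <vector>

constexpr std::size_t kNumInputs = 2;  // assumption: two input DIAs
using Ranks = std::array<std::size_t, kNumInputs>;
// Hypothetical stand-in for one worker's local Files: one sorted vector per input.
using WorkerFiles = std::array<std::vector<int>, kNumInputs>;

// Local rank of each pivot in each file: the position of the first item that is
// not smaller than the pivot, searched only inside [left, left + width).
std::vector<Ranks> LocalRanks(const WorkerFiles& files,
                              const std::vector<int>& pivots,
                              const std::vector<Ranks>& left,
                              const std::vector<Ranks>& width) {
    std::vector<Ranks> local(pivots.size());
    for (std::size_t s = 0; s < pivots.size(); ++s) {
        for (std::size_t i = 0; i < kNumInputs; ++i) {
            assert(left[s][i] + width[s][i] <= files[i].size());
            auto first = files[i].begin() + static_cast<std::ptrdiff_t>(left[s][i]);
            auto last = first + static_cast<std::ptrdiff_t>(width[s][i]);
            local[s][i] = static_cast<std::size_t>(
                std::lower_bound(first, last, pivots[s]) - files[i].begin());
        }
    }
    return local;
}

// Global rank of each pivot: the sum of its local ranks over all files and all
// workers. This loop stands in for the AllReduce. The sum counts the items
// smaller than the pivot only if, on every worker, the items before left are
// smaller than the pivot and the items from left + width on are not.
std::vector<std::size_t> GlobalRanks(const std::vector<std::vector<Ranks>>& per_worker) {
    assert(!per_worker.empty());
    std::vector<std::size_t> global(per_worker.front().size(), 0);
    for (const auto& local : per_worker)
        for (std::size_t s = 0; s < local.size(); ++s)
            for (std::size_t i = 0; i < kNumInputs; ++i)
                global[s] += local[s][i];
    return global;
}
```

With two workers holding {1, 4, 7}, {2, 8} and {3, 5}, {6, 9}, the pivot 5 has local ranks (2, 1) and (1, 0), so its global rank is 4: exactly the items 1, 2, 3 and 4 precede it.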
MainOp() [inline, private]
Receives elements from other workers and rebalances them, so each worker has the same amount after merging.
Definition at line 465 of file merge.hpp.
References tlx::abs_diff(), FlowControlChannel::AllReduce(), MergeNode< ValueType, Comparator, kNumInputs >::Stats::balancing_timer_, FlowControlChannel::Broadcast(), MergeNode< ValueType, Comparator, kNumInputs >::Stats::comm_timer_, MergeNode< ValueType, Comparator, kNumInputs >::comparator_, DIABase::context_, die, MergeNode< ValueType, Comparator, kNumInputs >::GetGlobalRanks(), Context::GetNewCatStream(), MergeNode< ValueType, Comparator, kNumInputs >::Stats::iterations_, LOG, LOG0, LOG1, Context::my_rank(), Context::net, Context::num_workers(), MergeNode< ValueType, Comparator, kNumInputs >::Stats::pivot_selection_timer_, MergeNode< ValueType, Comparator, kNumInputs >::Stats::scatter_timer_, MergeNode< ValueType, Comparator, kNumInputs >::Stats::search_step_timer_, MergeNode< ValueType, Comparator, kNumInputs >::SearchStep(), MergeNode< ValueType, Comparator, kNumInputs >::SelectPivots(), and MergeNode< ValueType, Comparator, kNumInputs >::VToStr().
Referenced by MergeNode< ValueType, Comparator, kNumInputs >::Execute().
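Once every search range has width zero, left holds the final split positions, and each local File can be cut into one slice per target worker. The sketch below shows that last bookkeeping step under assumptions: kNumInputs is 2, there are p - 1 splitters, and the slices are expressed as p + 1 offsets of the kind a scatter-style exchange typically takes. Converged and ScatterOffsets are hypothetical helpers, not the node's actual code.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <vector>

constexpr std::size_t kNumInputs = 2;  // assumption: two input DIAs
using Ranks = std::array<std::size_t, kNumInputs>;

// The splitter search is finished once every range of every splitter is empty.
bool Converged(const std::vector<Ranks>& width) {
    for (const Ranks& w : width)
        for (std::size_t x : w)
            if (x != 0) return false;
    return true;
}

// Turn the converged split positions of file i into p + 1 offsets:
// items [offsets[j], offsets[j + 1]) of the local file are sent to worker j.
// With p - 1 splitters, split.size() + 1 == p.
std::vector<std::size_t> ScatterOffsets(const std::vector<Ranks>& split,
                                        std::size_t i, std::size_t file_size) {
    std::vector<std::size_t> offsets;
    offsets.reserve(split.size() + 2);
    offsets.push_back(0);
    for (const Ranks& s : split) {
        // Splitters are ordered, so their positions must not decrease.
        assert(s[i] >= offsets.back() && s[i] <= file_size);
        offsets.push_back(s[i]);
    }
    offsets.push_back(file_size);
    return offsets;
}
```

For p = 3 and split positions (2, 1) and (5, 3), a file 0 of 7 items is cut at {0, 2, 5, 7} and a file 1 of 3 items at {0, 1, 3, 3}, so worker 2 receives nothing from file 1.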
OnPreOpFile() [inline, final, virtual]
Receive a whole data::File of ValueType, but only if our stack is empty.
Reimplemented from DIABase.
Definition at line 142 of file merge.hpp.
References MergeNode< ValueType, Comparator, kNumInputs >::files_, and MergeNode< ValueType, Comparator, kNumInputs >::parent_stack_empty_.
PushData() [inline, final, virtual]
Virtual method for pushing data. Triggers actual pushing in sub-classes.
Implements DIABase.
Definition at line 160 of file merge.hpp.
References MergeNode< ValueType, Comparator, kNumInputs >::comparator_, DIABase::context_, MergeNode< ValueType, Comparator, kNumInputs >::Stats::merge_timer_, MergeNode< ValueType, Comparator, kNumInputs >::Stats::Print(), DIANode< ValueType >::PushItem(), MergeNode< ValueType, Comparator, kNumInputs >::Stats::result_size_, sLOG, MergeNode< ValueType, Comparator, kNumInputs >::stats_, and MergeNode< ValueType, Comparator, kNumInputs >::streams_.
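After rebalancing, each worker holds several sorted inbound streams that must be merged into one sorted output using the comparator. A minimal sketch of such a k-way merge with a binary heap follows; it models streams as std::vector and assumes a heap-based merge, while the node itself may use a different merge structure. MultiwayMerge is a hypothetical name.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <queue>
#include <utility>
#include <vector>

// Merge several sorted sequences into one sorted sequence with a binary heap.
template <typename T, typename Compare = std::less<T>>
std::vector<T> MultiwayMerge(const std::vector<std::vector<T>>& seqs,
                             Compare comp = Compare()) {
    using Cursor = std::pair<std::size_t, std::size_t>;  // (sequence, position)
    // std::priority_queue keeps the "largest" element on top, so the comparison
    // is inverted to surface the smallest current head according to comp.
    auto heap_cmp = [&](const Cursor& a, const Cursor& b) {
        return comp(seqs[b.first][b.second], seqs[a.first][a.second]);
    };
    std::priority_queue<Cursor, std::vector<Cursor>, decltype(heap_cmp)> heap(heap_cmp);

    std::size_t total = 0;
    for (std::size_t s = 0; s < seqs.size(); ++s) {
        total += seqs[s].size();
        if (!seqs[s].empty()) heap.emplace(s, 0);
    }

    std::vector<T> out;
    out.reserve(total);
    while (!heap.empty()) {
        Cursor c = heap.top();
        heap.pop();
        out.push_back(seqs[c.first][c.second]);  // emit the smallest head
        if (c.second + 1 < seqs[c.first].size())
            heap.emplace(c.first, c.second + 1);  // advance that sequence
    }
    assert(out.size() == total);
    return out;
}
```

Each emitted item costs O(log k) heap work for k streams; the order among items that compare equal is not specified by this sketch.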
SearchStep() [inline, private]
Shrinks the search ranges according to the global ranks of the pivots.
Parameters:
  global_ranks: The global ranks of all pivots.
  local_ranks: The local ranks of each pivot in each file.
  target_ranks: The desired ranks of the splitters we are looking for.
  left: The left bounds of all search ranges for all files. The first index identifies the splitter, the second index identifies the file. This parameter will be modified.
  width: The width of all search ranges for all files. The first index identifies the splitter, the second index identifies the file. This parameter will be modified.
Definition at line 429 of file merge.hpp.
References die_unless.
Referenced by MergeNode< ValueType, Comparator, kNumInputs >::MainOp().
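A sketch of the shrinking rule, assuming that a pivot whose global rank is below the target moves each left bound up to the pivot's local rank, and that otherwise (including equality) each range is cut at the local rank. kNumInputs = 2 and the Ranks alias are assumptions; ensuring progress when a local rank coincides with left (for example via extra tie-breaking information in the pivot) is deliberately left out.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <vector>

constexpr std::size_t kNumInputs = 2;  // assumption: two input DIAs
using Ranks = std::array<std::size_t, kNumInputs>;

void SearchStep(const std::vector<std::size_t>& global_ranks,
                const std::vector<Ranks>& local_ranks,
                const std::vector<std::size_t>& target_ranks,
                std::vector<Ranks>& left, std::vector<Ranks>& width) {
    for (std::size_t s = 0; s < width.size(); ++s) {
        for (std::size_t i = 0; i < kNumInputs; ++i) {
            if (width[s][i] == 0) continue;  // this range is already settled
            const std::size_t idx = local_ranks[s][i];
            assert(left[s][i] <= idx && idx <= left[s][i] + width[s][i]);
            if (global_ranks[s] < target_ranks[s]) {
                // Too few items precede the pivot: keep [idx, left + width).
                width[s][i] -= idx - left[s][i];
                left[s][i] = idx;
            } else {
                // Enough items precede the pivot: keep [left, idx).
                width[s][i] = idx - left[s][i];
            }
        }
    }
}
```

Updating width before overwriting left keeps the right end left + width unchanged in the first branch, so only the side that cannot contain the splitter is discarded.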
SelectPivots() [inline, private]
Selects random global pivots for all splitter searches based on all workers' search ranges.
Parameters:
  left: The left bounds of all search ranges for all files. The first index identifies the splitter, the second index identifies the file.
  width: The width of all search ranges for all files. The first index identifies the splitter, the second index identifies the file.
  out_pivots: The output pivots.
Definition at line 325 of file merge.hpp.
References FlowControlChannel::AllReduce(), MergeNode< ValueType, Comparator, kNumInputs >::Stats::comm_timer_, DIABase::context_, MergeNode< ValueType, Comparator, kNumInputs >::Stats::file_op_timer_, LOG, Context::net, Context::rng_, and MergeNode< ValueType, Comparator, kNumInputs >::VToStr().
Referenced by MergeNode< ValueType, Comparator, kNumInputs >::MainOp().
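The pivot for a splitter is drawn from the largest remaining interval and agreed on through a global AllReduce. One way to realize this, sketched below for a single splitter, is for each worker to propose a random item from its widest local range and for the reduction to keep the proposal from the widest range. Candidate, KeepLargerSegment, ProposeCandidate and ReduceCandidates are hypothetical names; the reduction only loosely mirrors the role suggested by ReducePivots, and the tie-break by smaller value is an assumption added so the result does not depend on the reduction order.

```cpp
#include <cassert>
#include <cstddef>
#include <numeric>
#include <random>
#include <vector>

// Hypothetical stand-in for Pivot: a key plus the size of the range it came from.
struct Candidate {
    int value = 0;
    std::size_t segment_len = 0;
};

// Keep the candidate from the larger range; on equal ranges keep the smaller
// value. This is a total order, so the fold is associative and commutative.
struct KeepLargerSegment {
    Candidate operator()(const Candidate& a, const Candidate& b) const {
        if (a.segment_len != b.segment_len)
            return a.segment_len > b.segment_len ? a : b;
        return a.value <= b.value ? a : b;
    }
};

// One worker's proposal: a uniformly random item from its widest local range
// [left[i], left[i] + width[i]) over all input files i.
Candidate ProposeCandidate(const std::vector<std::vector<int>>& files,
                           const std::vector<std::size_t>& left,
                           const std::vector<std::size_t>& width,
                           std::mt19937& rng) {
    assert(!files.empty() && files.size() == left.size() && left.size() == width.size());
    std::size_t best = 0;
    for (std::size_t i = 1; i < files.size(); ++i)
        if (width[i] > width[best]) best = i;
    if (width[best] == 0) return Candidate{};  // nothing left to search here
    std::uniform_int_distribution<std::size_t> pick(0, width[best] - 1);
    return Candidate{files[best][left[best] + pick(rng)], width[best]};
}

// Stand-in for the global AllReduce: fold all workers' proposals.
Candidate ReduceCandidates(const std::vector<Candidate>& proposals) {
    assert(!proposals.empty());
    return std::accumulate(proposals.begin() + 1, proposals.end(),
                           proposals.front(), KeepLargerSegment{});
}
```

Drawing from the largest interval tends to shrink the biggest remaining uncertainty first, which is the motivation the overview gives for this choice.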
StopPreOp() [inline, final, virtual]
Virtual method for preparing end of PushData.
Reimplemented from DIABase.
Definition at line 152 of file merge.hpp.
References BlockWriter< BlockSink >::Close(), and MergeNode< ValueType, Comparator, kNumInputs >::writers_.
VToStr() [inline, static, private]
Logging helper to print vectors of vectors of pivots.
Definition at line 217 of file merge.hpp.
Referenced by MergeNode< ValueType, Comparator, kNumInputs >::MainOp(), and MergeNode< ValueType, Comparator, kNumInputs >::SelectPivots().
comparator_ [private]
Merge comparator.
Definition at line 191 of file merge.hpp.
Referenced by MergeNode< ValueType, Comparator, kNumInputs >::MainOp(), MergeNode< ValueType, Comparator, kNumInputs >::MergeNode(), and MergeNode< ValueType, Comparator, kNumInputs >::PushData().
files_ [private]
Files for intermediate storage.
Definition at line 197 of file merge.hpp.
Referenced by MergeNode< ValueType, Comparator, kNumInputs >::MergeNode(), and MergeNode< ValueType, Comparator, kNumInputs >::OnPreOpFile().
parent_stack_empty_ [private]
Whether the parent stack is empty.
Definition at line 194 of file merge.hpp.
Referenced by MergeNode< ValueType, Comparator, kNumInputs >::MergeNode(), and MergeNode< ValueType, Comparator, kNumInputs >::OnPreOpFile().
stats_ [private]
Instance of merge statistics.
Definition at line 310 of file merge.hpp.
Referenced by MergeNode< ValueType, Comparator, kNumInputs >::PushData().
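Stats bundles timers and counters that are only worth paying for when stats_enabled is true, and StatsTimer is an alias of common::StatsTimerBaseStopped< stats_enabled >. The sketch below illustrates that compile-time switch with a hypothetical accumulating stopwatch that starts stopped and collapses to empty inline functions when disabled. AccumulatingTimer and MergeStats are hypothetical; the member names comm_timer_, merge_timer_, iterations_, result_size_ and Print are borrowed from this page, but their types and output format here are assumptions.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <iostream>

// Accumulating stopwatch that starts in the stopped state.
template <bool Enabled>
class AccumulatingTimer {
public:
    void Start() { assert(!running_); running_ = true; begin_ = Clock::now(); }
    void Stop() { assert(running_); running_ = false; total_ += Clock::now() - begin_; }
    bool running() const { return running_; }
    double Seconds() const { return std::chrono::duration<double>(total_).count(); }

private:
    using Clock = std::chrono::steady_clock;
    bool running_ = false;
    Clock::time_point begin_{};
    Clock::duration total_{};  // sum of all Start()/Stop() intervals
};

// Disabled variant: every member is an empty inline function, so timing
// calls cost nothing when stats are switched off.
template <>
class AccumulatingTimer<false> {
public:
    void Start() {}
    void Stop() {}
    bool running() const { return false; }
    double Seconds() const { return 0.0; }
};

// Hypothetical stats holder with a few of the members referenced on this page.
template <bool Enabled>
struct MergeStats {
    AccumulatingTimer<Enabled> comm_timer_, merge_timer_;
    std::size_t iterations_ = 0;
    std::size_t result_size_ = 0;

    void Print(std::ostream& os) const {
        if (!Enabled) return;
        os << "merge: iterations=" << iterations_
           << " result_size=" << result_size_
           << " comm_s=" << comm_timer_.Seconds()
           << " merge_s=" << merge_timer_.Seconds() << '\n';
    }
};
```

Selecting the implementation through a bool template parameter keeps the call sites identical in both configurations, so no #ifdef is needed around each Start() and Stop().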
streams_ [private]
Array of inbound CatStreams.
Definition at line 203 of file merge.hpp.
Referenced by MergeNode< ValueType, Comparator, kNumInputs >::PushData().
writers_ [private]
Writers to intermediate files.
Definition at line 200 of file merge.hpp.
Referenced by MergeNode< ValueType, Comparator, kNumInputs >::MergeNode(), MergeNode< ValueType, Comparator, kNumInputs >::RegisterParent::operator()(), and MergeNode< ValueType, Comparator, kNumInputs >::StopPreOp().