12 #ifndef THRILL_API_REBALANCE_HEADER 13 #define THRILL_API_REBALANCE_HEADER 29 template <
typename ValueType>
32 static constexpr
bool debug =
false;
38 template <
typename ParentDIA>
40 :
Super(parent.ctx(),
"Rebalance", { parent.id() }, { parent.node() }),
43 auto save_fn = [
this](
const ValueType& input) {
46 auto lop_chain = parent.stack().push(save_fn).fold();
47 parent.node()->AddChild(
this, lop_chain);
53 <<
"Rebalance rejected File from parent " 54 <<
"due to non-empty function stack.";
69 LOG <<
"RebalanceNode::Execute() processing";
73 sLOG <<
"local_size" << local_size;
75 size_t local_rank = local_size;
77 sLOG <<
"local_rank" << local_rank;
78 sLOG <<
"global_size" << global_size;
82 static_cast<double>(global_size) / static_cast<double>(num_workers);
85 std::vector<size_t> offsets(num_workers + 1, 0);
86 for (
size_t p = 0; p < num_workers; ++p) {
87 size_t limit =
static_cast<size_t>(
static_cast<double>(p) * pre_pe);
88 if (limit < local_rank)
continue;
93 LOG <<
"offsets = " << offsets;
95 stream_->template ScatterConsume<ValueType>(
file_, offsets);
99 auto reader =
stream_->GetCatReader(consume);
100 while (reader.HasNext()) {
101 this->
PushItem(reader.template Next<ValueType>());
121 template <
typename ValueType,
typename Stack>
131 #endif // !THRILL_API_REBALANCE_HEADER net::FlowControlChannel & net
#define sLOG
Default logging method: output if the local debug variable is true.
DIA is the interface between the user and the Thrill framework.
void PushItem(const ValueType &item) const
Method for derived classes to Push a single item to all children.
static constexpr bool debug
size_t num_workers() const
Global number of workers in the system.
A File is an ordered sequence of Block objects for storing items.
void Clear()
Free all Blocks in the File and deallocate vectors.
BlockWriter contains a temporary Block object into which a) any serializable item can be stored or b)...
static constexpr bool g_debug_push_file
const bool parent_stack_empty_
Whether the parent stack is empty.
data::File::Writer writer_
Data writer to local file (only active in PreOp).
void StopPreOp(size_t) final
Virtual method for preparing end of PushData.
data::CatStreamPtr GetNewCatStream(size_t dia_id)
RebalanceNode(const ParentDIA &parent)
void PushData(bool consume) final
Virtual method for pushing data. Triggers actual pushing in sub-classes.
data::File GetFile(size_t dia_id)
Returns a new File object containing a sequence of local Blocks.
T TLX_ATTRIBUTE_WARN_UNUSED_RESULT ExPrefixSumTotal(T &value, const BinarySumOp &sum_op=BinarySumOp(), const T &initial=T())
Calculates the exclusive prefix sum over all workers, and delivers the total sum as well...
void Dispose() final
Virtual clear method. Triggers actual disposing in sub-classes.
bool OnPreOpFile(const data::File &file, size_t) final
void Execute() final
Executes the rebalance operation.
A DOpNode is a typed node representing and distributed operations in Thrill.
static uint_pair min()
return an uint_pair instance containing the smallest value possible
auto Rebalance() const
Rebalance is a DOp, which rebalances a single DIA among all workers; in general, this operation is ne...
File Copy() const
Return a copy of the File (explicit copy-constructor)
TLX_ATTRIBUTE_ALWAYS_INLINE BlockWriter & Put(const T &x)
Put appends a complete item, or fails with a FullException.
size_t num_items() const
Return the number of items in the file.
void Close()
Explicitly close the writer.
data::File file_
Local data file.
data::CatStreamPtr stream_
CatStream for exchange.
#define LOG
Default logging method: output if the local debug variable is true.
Writer GetWriter(size_t block_size=default_block_size)
Get BlockWriter.
Context & context_
associated Context
#define LOGC(cond)
Explicitly specify the condition for logging.