Thrill  0.1
rebalance.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/api/rebalance.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2016 Timo Bingmann <[email protected]>
7  *
8  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
9  ******************************************************************************/
10 
11 #pragma once
12 #ifndef THRILL_API_REBALANCE_HEADER
13 #define THRILL_API_REBALANCE_HEADER
14 
15 #include <thrill/api/dia.hpp>
16 #include <thrill/api/dop_node.hpp>
17 #include <thrill/common/logger.hpp>
18 #include <thrill/data/file.hpp>
19 
20 #include <algorithm>
21 #include <vector>
22 
23 namespace thrill {
24 namespace api {
25 
26 /*!
 * Distributed-operation node that redistributes the items of its parent DIA
 * across all workers. During PreOp every item is stored in a local File;
 * Execute() computes, per worker, an offset into that File and hands the File
 * and the offsets to a CatStream; PushData() reads the stream and pushes the
 * items to the children.
 *
 * NOTE(review): this listing is incomplete. Listing lines 35, 52, 111, 113,
 * 115 and 118 are absent, so the `Super` alias, the head of the log statement
 * in OnPreOpFile() and the declarations of file_, writer_,
 * parent_stack_empty_ and stream_ are not visible here. Confirm against the
 * original header before relying on this text.
 *
27  * \ingroup api_layer
28  */
29 template <typename ValueType>
30 class RebalanceNode final : public DOpNode<ValueType>
31 {
 //! Local debug switch; LOG and sLOG output only if it is true.
32  static constexpr bool debug = false;
33 
34 public:
 // NOTE(review): `Super` is used below but its alias (listing line 35) is
 // missing; presumably it names the base class DOpNode<ValueType> -- confirm.
36  using Super::context_;
37 
 /*!
  * Constructs the node from its parent DIA and hooks it into the parent.
  *
  * \param parent DIA whose items are to be rebalanced; its ctx(), id(),
  * node() and stack() are used.
  */
38  template <typename ParentDIA>
39  explicit RebalanceNode(const ParentDIA& parent)
40  : Super(parent.ctx(), "Rebalance", { parent.id() }, { parent.node() }),
41  parent_stack_empty_(ParentDIA::stack_empty) {
42 
 // PreOp: append every incoming item to the local file through writer_.
43  auto save_fn = [this](const ValueType& input) {
44  writer_.Put(input);
45  };
 // Append save_fn to the parent's function stack, fold it, and register
 // this node as a child of the parent node with the folded chain.
46  auto lop_chain = parent.stack().push(save_fn).fold();
47  parent.node()->AddChild(this, lop_chain);
48  }
49 
 /*!
  * Takes over a complete File from the parent instead of receiving items
  * one by one.
  *
  * \param file File offered by the parent.
  * \return false if the parent's function stack is not empty (the File is
  * rejected); true after a copy of the File has been stored in file_.
  */
50  bool OnPreOpFile(const data::File& file, size_t /* parent_index */) final {
51  if (!parent_stack_empty_) {
 // NOTE(review): the head of this log statement (listing line 52) is
 // missing; presumably LOGC(...) with g_debug_push_file -- confirm.
53  << "Rebalance rejected File from parent "
54  << "due to non-empty function stack.";
55  return false;
56  }
 // The local file is expected to be empty before it is replaced by a copy.
57  assert(file_.num_items() == 0);
58  file_ = file.Copy();
59  return true;
60  }
61 
 //! Ends the PreOp phase by closing the writer.
62  void StopPreOp(size_t /* parent_index */) final {
63  // Close the writer that stored the incoming items in the local file.
64  writer_.Close();
65  }
66 
67  //! Executes the rebalance operation.
68  void Execute() final {
69  LOG << "RebalanceNode::Execute() processing";
70 
 // Number of items held locally.
71  size_t local_size;
72  local_size = file_.num_items();
73  sLOG << "local_size" << local_size;
74 
 // ExPrefixSumTotal takes its argument by reference: per its brief
 // description it calculates the exclusive prefix sum over all workers and
 // delivers the total sum as well. Afterwards local_rank is used as the
 // global index of this worker's first item, global_size as the item total.
75  size_t local_rank = local_size;
76  size_t global_size = context_.net.ExPrefixSumTotal(local_rank);
77  sLOG << "local_rank" << local_rank;
78  sLOG << "global_size" << global_size;
79 
 // Average number of items per worker, kept as a double.
80  const size_t num_workers = context_.num_workers();
81  const double pre_pe =
82  static_cast<double>(global_size) / static_cast<double>(num_workers);
83 
84  // Calculate the offset vector: one entry per worker plus a final entry.
85  std::vector<size_t> offsets(num_workers + 1, 0);
86  for (size_t p = 0; p < num_workers; ++p) {
 // Global index at which worker p's share starts (truncated).
87  size_t limit = static_cast<size_t>(static_cast<double>(p) * pre_pe);
 // The share starts before this worker's first item: offset stays 0.
88  if (limit < local_rank) continue;
89 
 // Convert to a local index, clamped to the local item count.
90  offsets[p] = std::min(limit - local_rank, file_.num_items());
91  }
 // The final entry is the local item count.
92  offsets[num_workers] = file_.num_items();
93  LOG << "offsets = " << offsets;
94 
 // Presumably items [offsets[p], offsets[p + 1]) of file_ are sent to
 // worker p; ScatterConsume is defined elsewhere -- TODO confirm.
95  stream_->template ScatterConsume<ValueType>(file_, offsets);
96  }
97 
 /*!
  * Reads all items from the stream's CatReader and pushes each one to the
  * children via PushItem().
  *
  * \param consume forwarded unchanged to GetCatReader().
  */
98  void PushData(bool consume) final {
99  auto reader = stream_->GetCatReader(consume);
100  while (reader.HasNext()) {
101  this->PushItem(reader.template Next<ValueType>());
102  }
103  }
104 
 //! Clears the local file.
105  void Dispose() final {
106  file_.Clear();
107  }
108 
109 private:
 // NOTE(review): the member declarations belonging to the comments below
 // (listing lines 111, 113, 115 and 118) are missing from this listing. The
 // reference index at the end of the page gives them as `data::File file_`,
 // `data::File::Writer writer_`, `const bool parent_stack_empty_` and
 // `data::CatStreamPtr stream_`; their initializers are not shown.
110  //! Local data file
112  //! Data writer to local file (only active in PreOp).
114  //! Whether the parent stack is empty
116 
117  //! CatStream for exchange
119 };
120 
// NOTE(review): listing lines 122 and 124 are missing from this fragment. The
// reference index at the end of this page places `auto Rebalance() const` at
// rebalance.hpp:122, so this is presumably the body of a Rebalance() member of
// DIA<ValueType, Stack> -- confirm against the original header.
121 template <typename ValueType, typename Stack>
 // Debug-time check via IsValid() before the node is created.
123  assert(IsValid());
 // Return a DIA<ValueType> built from tlx::make_counting<RebalanceNode>(*this).
 // NOTE(review): `RebalanceNode` is used here without template arguments;
 // the missing listing line 124 presumably declares a local alias -- confirm.
125  return DIA<ValueType>(tlx::make_counting<RebalanceNode>(*this));
126 }
127 
128 } // namespace api
129 } // namespace thrill
130 
131 #endif // !THRILL_API_REBALANCE_HEADER
132 
133 /******************************************************************************/
net::FlowControlChannel & net
Definition: context.hpp:446
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:34
DIA is the interface between the user and the Thrill framework.
Definition: dia.hpp:141
void PushItem(const ValueType &item) const
Method for derived classes to Push a single item to all children.
Definition: dia_node.hpp:147
static constexpr bool debug
Definition: rebalance.hpp:32
size_t num_workers() const
Global number of workers in the system.
Definition: context.hpp:251
A File is an ordered sequence of Block objects for storing items.
Definition: file.hpp:56
void Clear()
Free all Blocks in the File and deallocate vectors.
Definition: file.cpp:57
BlockWriter contains a temporary Block object into which a) any serializable item can be stored or b)...
static constexpr bool g_debug_push_file
Definition: config.hpp:44
const bool parent_stack_empty_
Whether the parent stack is empty.
Definition: rebalance.hpp:115
data::File::Writer writer_
Data writer to local file (only active in PreOp).
Definition: rebalance.hpp:113
void StopPreOp(size_t) final
Virtual method for preparing end of PushData.
Definition: rebalance.hpp:62
data::CatStreamPtr GetNewCatStream(size_t dia_id)
Definition: context.cpp:1209
RebalanceNode(const ParentDIA &parent)
Definition: rebalance.hpp:39
void PushData(bool consume) final
Virtual method for pushing data. Triggers actual pushing in sub-classes.
Definition: rebalance.hpp:98
data::File GetFile(size_t dia_id)
Returns a new File object containing a sequence of local Blocks.
Definition: context.hpp:283
T TLX_ATTRIBUTE_WARN_UNUSED_RESULT ExPrefixSumTotal(T &value, const BinarySumOp &sum_op=BinarySumOp(), const T &initial=T())
Calculates the exclusive prefix sum over all workers, and delivers the total sum as well...
void Dispose() final
Virtual clear method. Triggers actual disposing in sub-classes.
Definition: rebalance.hpp:105
bool OnPreOpFile(const data::File &file, size_t) final
Definition: rebalance.hpp:50
void Execute() final
Executes the rebalance operation.
Definition: rebalance.hpp:68
A DOpNode is a typed node representing a distributed operation in Thrill.
Definition: dop_node.hpp:32
static uint_pair min()
Return a uint_pair instance containing the smallest possible value.
Definition: uint_types.hpp:217
auto Rebalance() const
Rebalance is a DOp, which rebalances a single DIA among all workers; in general, this operation is ne...
Definition: rebalance.hpp:122
File Copy() const
Return a copy of the File (explicit copy-constructor)
Definition: file.cpp:42
TLX_ATTRIBUTE_ALWAYS_INLINE BlockWriter & Put(const T &x)
Put appends a complete item, or fails with a FullException.
size_t num_items() const
Return the number of items in the file.
Definition: file.hpp:180
void Close()
Explicitly close the writer.
data::File file_
Local data file.
Definition: rebalance.hpp:111
data::CatStreamPtr stream_
CatStream for exchange.
Definition: rebalance.hpp:118
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:24
Writer GetWriter(size_t block_size=default_block_size)
Get BlockWriter.
Definition: file.cpp:63
Context & context_
associated Context
Definition: dia_base.hpp:293
#define LOGC(cond)
Explicitly specify the condition for logging.
Definition: logger.hpp:21