Thrill  0.1
dia_base.cpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/api/dia_base.cpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Sebastian Lamm <[email protected]>
7  * Copyright (C) 2016 Timo Bingmann <[email protected]>
8  * Copyright (C) 2017 Tim Zeitz <[email protected]>
9  *
10  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
11  ******************************************************************************/
12 
#include <thrill/api/dia_base.hpp>

#include <thrill/common/logger.hpp>
#include <thrill/common/stats_timer.hpp>
#include <thrill/mem/allocator.hpp>
#include <thrill/mem/malloc_tracker.hpp>

#include <algorithm>
#include <cassert>
#include <chrono>
#include <cstdlib>
#include <deque>
#include <functional>
#include <iomanip>
#include <ostream>
#include <set>
#include <sstream>
#include <string>
#include <utility>
#include <vector>
28 
29 namespace thrill {
30 namespace api {
31 
32 /******************************************************************************/
33 // DIABase StageBuilder
34 
35 class Stage
36 {
37 public:
38  static constexpr bool debug = false;
39 
40  explicit Stage(const DIABasePtr& node)
41  : node_(node), context_(node->context()),
42  verbose_(context_.mem_config().verbose_)
43  { }
44 
45  //! iterate over all target nodes into which this Stage pushes
46  template <typename Lambda>
47  void Targets(const Lambda& lambda) const {
48  std::vector<DIABase*> children = node_->children();
49  std::reverse(children.begin(), children.end());
50 
51  while (!children.empty()) {
52  DIABase* child = children.back();
53  children.pop_back();
54 
55  if (child->ForwardDataOnly()) {
56  // push children of Collapse onto stack
57  std::vector<DIABase*> sub = child->children();
58  children.insert(children.end(), sub.begin(), sub.end());
59  lambda(child);
60  }
61  else {
62  lambda(child);
63  }
64  }
65  }
66 
67  //! compute a string to show all target nodes into which this Stage pushes.
68  std::string TargetsString() const {
69  std::ostringstream oss;
70  std::vector<DIABase*> children = node_->children();
71  std::reverse(children.begin(), children.end());
72  bool first = true;
73 
74  oss << '[';
75  while (!children.empty())
76  {
77  DIABase* child = children.back();
78  children.pop_back();
79 
80  if (child == nullptr) {
81  oss << ']';
82  }
83  else if (child->ForwardDataOnly()) {
84  // push children of Collapse onto stack
85  std::vector<DIABase*> sub = child->children();
86  children.push_back(nullptr);
87  children.insert(children.end(), sub.begin(), sub.end());
88  if (first)
89  first = false;
90  else
91  oss << ' ';
92 
93  oss << *child << ' ' << '[';
94  first = true;
95  }
96  else {
97  if (first)
98  first = false;
99  else
100  oss << ' ';
101 
102  oss << *child;
103  }
104  }
105  oss << ']';
106  return oss.str();
107  }
108 
109  std::vector<size_t> TargetIds() const {
110  std::vector<size_t> ids;
111  Targets([&ids](DIABase* child) { ids.emplace_back(child->dia_id()); });
112  return ids;
113  }
114 
115  std::vector<DIABase*> TargetPtrs() const {
116  std::vector<DIABase*> children;
117  Targets([&children](DIABase* child) { children.emplace_back(child); });
118  return children;
119  }
120 
121  void Execute() {
122  sLOG << "START (EXECUTE) stage" << *node_ << "targets" << TargetsString();
123 
124  if (context_.my_rank() == 0) {
125  sLOGC(verbose_)
126  << "Execute() stage" << *node_;
127  }
128 
129  std::vector<size_t> target_ids = TargetIds();
130 
131  logger_ << "class" << "StageBuilder" << "event" << "execute-start"
132  << "targets" << target_ids;
133 
134  DIAMemUse mem_use = node_->ExecuteMemUse();
135  if (mem_use.is_max())
136  mem_use = context_.mem_limit();
137  node_->set_mem_limit(mem_use);
138 
139  // old: acquire memory from BlockPool -tb
140  // data::BlockPoolMemoryHolder mem_holder(context_.block_pool(), mem_use);
141 
143  try {
144  node_->Execute();
145  }
146  catch (std::exception& e) {
147  LOG1 << "StageBuilder: caught exception from Execute()"
148  << " of stage " << *node_ << " - what(): " << e.what();
149  throw;
150  }
151  node_->set_state(DIAState::EXECUTED);
152  timer.Stop();
153 
154  sLOG << "FINISH (EXECUTE) stage" << *node_ << "targets" << TargetsString()
155  << "took" << timer << "ms";
156 
157  logger_ << "class" << "StageBuilder" << "event" << "execute-done"
158  << "targets" << target_ids << "elapsed" << timer;
159 
160  LOG << "DIA bytes: " << node_->context().block_pool().total_bytes();
161  }
162 
163  void PushData() {
164  sLOG << "START (PUSHDATA) stage" << *node_ << "targets" << TargetsString();
165 
166  if (context_.my_rank() == 0) {
167  sLOGC(verbose_)
168  << "PushData() stage" << *node_
169  << "with targets" << TargetsString();
170  }
171 
172  if (context_.consume() && node_->consume_counter() == 0) {
173  sLOG1 << "StageBuilder: attempt to PushData from"
174  << "stage" << *node_ << "to" << TargetsString()
175  << "failed, it was already consumed. Add .Keep()";
176  abort();
177  }
178 
179  std::vector<size_t> target_ids = TargetIds();
180 
181  logger_ << "class" << "StageBuilder" << "event" << "pushdata-start"
182  << "targets" << target_ids;
183 
184  // collect memory requests of source node and all targeted children
185 
186  std::vector<DIABase*> targets = TargetPtrs();
187 
188  const size_t mem_limit = context_.mem_limit();
189  std::vector<DIABase*> max_mem_nodes;
190  size_t const_mem = 0;
191 
192  {
193  // process node which will PushData() to targets
194  DIAMemUse m = node_->PushDataMemUse();
195  if (m.is_max()) {
196  max_mem_nodes.emplace_back(node_.get());
197  }
198  else {
199  const_mem += m.limit();
200  node_->set_mem_limit(m.limit());
201  }
202  }
203  {
204  // process nodes which will receive data
205  for (DIABase* target : TargetPtrs()) {
206  DIAMemUse m = target->PreOpMemUse();
207  if (m.is_max()) {
208  max_mem_nodes.emplace_back(target);
209  }
210  else {
211  const_mem += m.limit();
212  target->set_mem_limit(m.limit());
213  }
214  }
215  }
216 
217  if (const_mem > mem_limit) {
218  LOG1 << "StageBuilder: constant memory usage of DIANodes in Stage: "
219  << const_mem
220  << ", already exceeds Context's mem_limit: " << mem_limit;
221  abort();
222  }
223 
224  // distribute remaining memory to nodes requesting maximum RAM amount
225 
226  if (!max_mem_nodes.empty()) {
227  size_t remaining_mem = mem_limit - const_mem;
228  remaining_mem /= max_mem_nodes.size();
229 
230  if (context_.my_rank() == 0) {
231  LOG << "StageBuilder: distribute remaining worker memory "
232  << remaining_mem << " to "
233  << max_mem_nodes.size() << " DIANodes";
234  }
235 
236  for (DIABase* target : max_mem_nodes) {
237  target->set_mem_limit(remaining_mem);
238  }
239 
240  // update const_mem: later allocate the mem limit of this worker
241  const_mem = mem_limit;
242  }
243 
244  // execute push data: hold memory for DIANodes, and remove filled
245  // children afterwards
246 
247  // old: acquire memory from BlockPool
248  // data::BlockPoolMemoryHolder mem_holder(context_.block_pool(), const_mem);
249 
251  try {
252  node_->RunPushData();
253  }
254  catch (std::exception& e) {
255  LOG1 << "StageBuilder: caught exception from PushData()"
256  << " of stage " << *node_ << " targets " << TargetsString()
257  << " - what(): " << e.what();
258  throw;
259  }
260  node_->RemoveAllChildren();
261  timer.Stop();
262 
263  sLOG << "FINISH (PUSHDATA) stage" << *node_ << "targets" << TargetsString()
264  << "took" << timer << "ms";
265 
266  logger_ << "class" << "StageBuilder" << "event" << "pushdata-done"
267  << "targets" << target_ids << "elapsed" << timer;
268 
269  LOG << "DIA bytes: " << node_->context().block_pool().total_bytes();
270  }
271 
272  //! order for std::set in FindStages() - this must be deterministic such
273  //! that DIAs on different workers are executed in the same order.
274  bool operator < (const Stage& s) const {
275  return node_->dia_id() < s.node_->dia_id();
276  }
277 
278  //! shared pointer to node
279  DIABasePtr node_;
280 
281  //! reference to Context of node
282  Context& context_;
283 
284  //! reference to node's Logger.
285  common::JsonLogger& logger_ { node_->logger_ };
286 
287  //! StageBuilder verbosity flag from MemoryConfig
288  bool verbose_;
289 
290  //! temporary marker for toposort to detect cycles
291  mutable bool cycle_mark_ = false;
292 
293  //! toposort seen marker
294  mutable bool topo_seen_ = false;
295 };
296 
297 template <typename T>
298 using mm_set = std::set<T, std::less<T>, mem::Allocator<T> >;
299 
300 //! Do a BFS on parents to find all DIANodes (Stages) needed to Execute or
301 //! PushData to calculate this action node.
302 static void FindStages(
303  Context& ctx, const DIABasePtr& action, mm_set<Stage>* stages) {
304  static constexpr bool debug = Stage::debug;
305 
306  if (ctx.my_rank() == 0)
307  LOG << "Finding Stages:";
308 
309  mem::deque<DIABasePtr> bfs_stack(
310  mem::Allocator<DIABasePtr>(action->mem_manager()));
311 
312  bfs_stack.push_back(action);
313  stages->insert(Stage(action));
314 
315  while (!bfs_stack.empty()) {
316  DIABasePtr curr = bfs_stack.front();
317  bfs_stack.pop_front();
318 
319  const std::vector<DIABasePtr>& parents = curr->parents();
320 
321  for (size_t i = 0; i < parents.size(); ++i) {
322  const DIABasePtr& p = parents[i];
323 
324  // if parent was already seen, done.
325  if (stages->count(Stage(p)) != 0) continue;
326 
327  if (!curr->ForwardDataOnly()) {
328  if (ctx.my_rank() == 0)
329  LOG << " Stage: " << *p;
330  stages->emplace(p);
331  // If parent was not executed push it to the BFS queue and
332  // continue upwards. if state is EXECUTED, then we only need to
333  // PushData(), which is already indicated by stages.insert().
334  if (p->state() == DIAState::NEW)
335  bfs_stack.push_back(p);
336  }
337  else {
338  // If parent cannot hold data continue upward.
339  if (curr->RequireParentPushData(i)) {
340  if (ctx.my_rank() == 0)
341  LOG << " Stage: " << *p;
342  stages->emplace(p);
343  bfs_stack.push_back(p);
344  }
345  }
346  }
347  }
348 }
349 
350 static void TopoSortVisit(
351  const Stage& s, mm_set<Stage>* stages, mem::vector<Stage>* result) {
352  // check markers
353  die_unless(!s.cycle_mark_ && "Cycle in toposort of Stages? Impossible.");
354  if (s.topo_seen_) return;
355 
356  s.cycle_mark_ = true;
357  // iterate over all children of s which are in the to-be-calculate stages
358  for (DIABase* child : s.node_->children()) {
359  auto it = stages->find(Stage(DIABasePtr(child)));
360 
361  // child not in stage set
362  if (it == stages->end()) continue;
363 
364  // depth-first search
365  TopoSortVisit(*it, stages, result);
366  }
367 
368  s.topo_seen_ = true;
369  s.cycle_mark_ = false;
370  result->push_back(s);
371 }
372 
373 static void TopoSortStages(mm_set<Stage>* stages, mem::vector<Stage>* result) {
374  // iterate over all stages and visit nodes in DFS search
375  for (const Stage& s : *stages) {
376  if (s.topo_seen_) continue;
377  TopoSortVisit(s, stages, result);
378  }
379 }
380 
382  static constexpr bool debug = Stage::debug;
383 
384  LOG << "DIABase::Execute() this=" << *this;
385 
386  if (state_ == DIAState::EXECUTED) {
387  LOG << "DIA node " << *this << " was already executed.";
388  return;
389  }
390 
391  if (ForwardDataOnly()) {
392  // CollapseNodes cannot be executed: execute their parent(s)
393  for (const DIABasePtr& p : parents_)
394  p->RunScope();
395  return;
396  }
397 
398  mm_set<Stage> stages {
399  mem::Allocator<Stage>(mem_manager())
400  };
401  FindStages(context_, DIABasePtr(this), &stages);
402 
403  mem::vector<Stage> toporder {
404  mem::Allocator<Stage>(mem_manager())
405  };
406  TopoSortStages(&stages, &toporder);
407 
408  if (context_.my_rank() == 0) {
409  LOG << "Topological order";
410  for (auto top = toporder.rbegin(); top != toporder.rend(); ++top) {
411  LOG << " " << *top->node_;
412  }
413  }
414 
415  assert(toporder.front().node_.get() == this);
416 
417  while (!toporder.empty())
418  {
419  Stage& s = toporder.back();
420 
421  if (s.node_->ForwardDataOnly()) {
422  toporder.pop_back();
423  continue;
424  }
425 
426  if (debug)
428 
429  if (s.node_->state() == DIAState::NEW) {
430  s.Execute();
431  if (s.node_.get() != this)
432  s.PushData();
433  }
434  else if (s.node_->state() == DIAState::EXECUTED) {
435  if (s.node_.get() != this)
436  s.PushData();
437  }
438 
439  // remove from result stack, this may destroy the last CountingPtr
440  // reference to a node.
441  toporder.pop_back();
442  }
443 }
444 
445 /******************************************************************************/
446 // DIABase
447 
448 //! make ostream-able.
449 std::ostream& operator << (std::ostream& os, const DIABase& d) {
450  return os << d.label() << '.' << d.dia_id();
451 }
452 
453 } // namespace api
454 } // namespace thrill
455 
456 /******************************************************************************/
The DIABase has not been computed yet.
std::set< T, std::less< T >, mem::Allocator< T > > mm_set
Definition: dia_base.cpp:298
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:34
#define die_unless(X)
Definition: die.hpp:27
#define LOG1
Definition: logger.hpp:28
#define sLOGC(cond)
Explicitly specify the condition for logging.
Definition: logger.hpp:31
const char * label() const
return label() of DIANode subclass as stored by StatsNode
Definition: dia_base.hpp:218
#define sLOG1
Definition: logger.hpp:38
The Context of a job is a unique instance per worker which holds references to all underlying parts o...
Definition: context.hpp:221
void malloc_tracker_print_status()
user function which prints current and peak allocation to stderr
The DIABase is the untyped super class of DIANode.
Definition: dia_base.hpp:87
static void FindStages(Context &ctx, const DIABasePtr &action, mm_set< Stage > *stages)
Definition: dia_base.cpp:302
StatsTimerBaseStarted< true > StatsTimerStart
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
Definition: allocator.hpp:220
static void TopoSortStages(mm_set< Stage > *stages, mem::vector< Stage > *result)
Definition: dia_base.cpp:373
static constexpr bool debug
std::vector< T, Allocator< T > > vector
vector with Manager tracking
Definition: allocator.hpp:228
static void TopoSortVisit(const Stage &s, mm_set< Stage > *stages, mem::vector< Stage > *result)
Definition: dia_base.cpp:350
size_t my_rank() const
Global rank of this worker among all other workers in the system.
Definition: context.hpp:243
const size_t & dia_id() const
return unique id of DIANode subclass as stored by StatsNode
Definition: dia_base.hpp:213
std::deque< T, Allocator< T > > deque
deque with Manager tracking
Definition: allocator.hpp:232
tlx::CountingPtr< DIABase > DIABasePtr
Definition: dia_base.hpp:334
bool operator<(const uint_pair &b) const
less-than comparison operator
Definition: uint_types.hpp:187
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:24
std::ostream & operator<<(std::ostream &os, const DIABase &d)
make ostream-able.
Definition: dia_base.cpp:449