Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
dia_base.cpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/api/dia_base.cpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Sebastian Lamm <[email protected]>
7  * Copyright (C) 2016 Timo Bingmann <[email protected]>
8  *
9  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10  ******************************************************************************/
11 
#include <thrill/api/dia_base.hpp>

#include <thrill/common/logger.hpp>
#include <thrill/mem/allocator.hpp>

#include <algorithm>
#include <chrono>
#include <deque>
#include <functional>
#include <iomanip>
#include <set>
#include <sstream>
#include <string>
#include <utility>
#include <vector>
27 
28 namespace thrill {
29 namespace api {
30 
31 /******************************************************************************/
32 // DIABase StageBuilder
33 
34 class Stage
35 {
36 public:
37  static constexpr bool debug = false;
38 
39  explicit Stage(const DIABasePtr& node)
40  : node_(node), context_(node->context()),
41  verbose_(context_.mem_config().verbose_)
42  { }
43 
44  //! iterate over all target nodes into which this Stage pushes
45  template <typename Lambda>
46  void Targets(const Lambda& lambda) const {
47  std::vector<DIABase*> children = node_->children();
48  std::reverse(children.begin(), children.end());
49 
50  while (!children.empty()) {
51  DIABase* child = children.back();
52  children.pop_back();
53 
54  if (child->ForwardDataOnly()) {
55  // push children of Collapse onto stack
56  std::vector<DIABase*> sub = child->children();
57  children.insert(children.end(), sub.begin(), sub.end());
58  lambda(child);
59  }
60  else {
61  lambda(child);
62  }
63  }
64  }
65 
66  //! compute a string to show all target nodes into which this Stage pushes.
67  std::string TargetsString() const {
68  std::ostringstream oss;
69  std::vector<DIABase*> children = node_->children();
70  std::reverse(children.begin(), children.end());
71  bool first = true;
72 
73  oss << '[';
74  while (!children.empty())
75  {
76  DIABase* child = children.back();
77  children.pop_back();
78 
79  if (child == nullptr) {
80  oss << ']';
81  }
82  else if (child->ForwardDataOnly()) {
83  // push children of Collapse onto stack
84  std::vector<DIABase*> sub = child->children();
85  children.push_back(nullptr);
86  children.insert(children.end(), sub.begin(), sub.end());
87  if (first)
88  first = false;
89  else
90  oss << ' ';
91 
92  oss << *child << ' ' << '[';
93  first = true;
94  }
95  else {
96  if (first)
97  first = false;
98  else
99  oss << ' ';
100 
101  oss << *child;
102  }
103  }
104  oss << ']';
105  return oss.str();
106  }
107 
108  std::vector<size_t> TargetIds() const {
109  std::vector<size_t> ids;
110  Targets([&ids](DIABase* child) { ids.emplace_back(child->id()); });
111  return ids;
112  }
113 
114  std::vector<DIABase*> TargetPtrs() const {
115  std::vector<DIABase*> children;
116  Targets([&children](DIABase* child) { children.emplace_back(child); });
117  return children;
118  }
119 
120  void Execute() {
121  sLOG << "START (EXECUTE) stage" << *node_ << "targets" << TargetsString();
122 
123  if (context_.my_rank() == 0) {
124  sLOGC(verbose_)
125  << "Execute() stage" << *node_;
126  }
127 
128  std::vector<size_t> target_ids = TargetIds();
129 
130  logger_ << "class" << "StageBuilder" << "event" << "execute-start"
131  << "targets" << target_ids;
132 
133  DIAMemUse mem_use = node_->ExecuteMemUse();
134  if (mem_use.is_max())
135  mem_use = context_.mem_limit();
136  node_->set_mem_limit(mem_use);
137 
138  // old: acquire memory from BlockPool -tb
139  // data::BlockPoolMemoryHolder mem_holder(context_.block_pool(), mem_use);
140 
142  try {
143  node_->Execute();
144  }
145  catch (std::exception& e) {
146  LOG1 << "StageBuilder: caught exception from Execute()"
147  << " of stage " << *node_ << " - what(): " << e.what();
148  throw;
149  }
150  node_->set_state(DIAState::EXECUTED);
151  timer.Stop();
152 
153  sLOG << "FINISH (EXECUTE) stage" << *node_ << "targets" << TargetsString()
154  << "took" << timer << "ms";
155 
156  logger_ << "class" << "StageBuilder" << "event" << "execute-done"
157  << "targets" << target_ids << "elapsed" << timer;
158 
159  LOG << "DIA bytes: " << node_->context().block_pool().total_bytes();
160  }
161 
162  void PushData() {
163  sLOG << "START (PUSHDATA) stage" << *node_ << "targets" << TargetsString();
164 
165  if (context_.my_rank() == 0) {
166  sLOGC(verbose_)
167  << "PushData() stage" << *node_
168  << "with targets" << TargetsString();
169  }
170 
171  if (context_.consume() && node_->consume_counter() == 0) {
172  sLOG1 << "StageBuilder: attempt to PushData from"
173  << "stage" << *node_ << "to" << TargetsString()
174  << "failed, it was already consumed. Add .Keep()";
175  abort();
176  }
177 
178  std::vector<size_t> target_ids = TargetIds();
179 
180  logger_ << "class" << "StageBuilder" << "event" << "pushdata-start"
181  << "targets" << target_ids;
182 
183  // collect memory requests of source node and all targeted children
184 
185  std::vector<DIABase*> targets = TargetPtrs();
186 
187  const size_t mem_limit = context_.mem_limit();
188  std::vector<DIABase*> max_mem_nodes;
189  size_t const_mem = 0;
190 
191  {
192  // process node which will PushData() to targets
193  DIAMemUse m = node_->PushDataMemUse();
194  if (m.is_max()) {
195  max_mem_nodes.emplace_back(node_.get());
196  }
197  else {
198  const_mem += m.limit();
199  node_->set_mem_limit(m.limit());
200  }
201  }
202  {
203  // process nodes which will receive data
204  for (DIABase* target : TargetPtrs()) {
205  DIAMemUse m = target->PreOpMemUse();
206  if (m.is_max()) {
207  max_mem_nodes.emplace_back(target);
208  }
209  else {
210  const_mem += m.limit();
211  target->set_mem_limit(m.limit());
212  }
213  }
214  }
215 
216  if (const_mem > mem_limit) {
217  LOG1 << "StageBuilder: constant memory usage of DIANodes in Stage: "
218  << const_mem
219  << ", already exceeds Context's mem_limit: " << mem_limit;
220  abort();
221  }
222 
223  // distribute remaining memory to nodes requesting maximum RAM amount
224 
225  if (!max_mem_nodes.empty()) {
226  size_t remaining_mem = mem_limit - const_mem;
227  remaining_mem /= max_mem_nodes.size();
228 
229  if (context_.my_rank() == 0) {
230  LOG << "StageBuilder: distribute remaining worker memory "
231  << remaining_mem << " to "
232  << max_mem_nodes.size() << " DIANodes";
233  }
234 
235  for (DIABase* target : max_mem_nodes) {
236  target->set_mem_limit(remaining_mem);
237  }
238 
239  // update const_mem: later allocate the mem limit of this worker
240  const_mem = mem_limit;
241  }
242 
243  // execute push data: hold memory for DIANodes, and remove filled
244  // children afterwards
245 
246  // old: acquire memory from BlockPool
247  // data::BlockPoolMemoryHolder mem_holder(context_.block_pool(), const_mem);
248 
250  try {
251  node_->RunPushData();
252  }
253  catch (std::exception& e) {
254  LOG1 << "StageBuilder: caught exception from PushData()"
255  << " of stage " << *node_ << " targets " << TargetsString()
256  << " - what(): " << e.what();
257  throw;
258  }
259  node_->RemoveAllChildren();
260  timer.Stop();
261 
262  sLOG << "FINISH (PUSHDATA) stage" << *node_ << "targets" << TargetsString()
263  << "took" << timer << "ms";
264 
265  logger_ << "class" << "StageBuilder" << "event" << "pushdata-done"
266  << "targets" << target_ids << "elapsed" << timer;
267 
268  LOG << "DIA bytes: " << node_->context().block_pool().total_bytes();
269  }
270 
271  //! order for std::set in FindStages() - this must be deterministic such
272  //! that DIAs on different workers are executed in the same order.
273  bool operator < (const Stage& s) const {
274  return node_->id() < s.node_->id();
275  }
276 
277  //! shared pointer to node
278  DIABasePtr node_;
279 
280  //! reference to Context of node
281  Context& context_;
282 
283  //! reference to node's Logger.
284  common::JsonLogger& logger_ { node_->logger_ };
285 
286  //! StageBuilder verbosity flag from MemoryConfig
287  bool verbose_;
288 
289  //! temporary marker for toposort to detect cycles
290  mutable bool cycle_mark_ = false;
291 
292  //! toposort seen marker
293  mutable bool topo_seen_ = false;
294 };
295 
296 template <typename T>
297 using mm_set = std::set<T, std::less<T>, mem::Allocator<T> >;
298 
299 //! Do a BFS on parents to find all DIANodes (Stages) needed to Execute or
300 //! PushData to calculate this action node.
301 static void FindStages(
302  Context& ctx, const DIABasePtr& action, mm_set<Stage>* stages) {
303  static constexpr bool debug = Stage::debug;
304 
305  if (ctx.my_rank() == 0)
306  LOG << "Finding Stages:";
307 
308  mem::deque<DIABasePtr> bfs_stack(
310 
311  bfs_stack.push_back(action);
312  stages->insert(Stage(action));
313 
314  while (!bfs_stack.empty()) {
315  DIABasePtr curr = bfs_stack.front();
316  bfs_stack.pop_front();
317 
318  const std::vector<DIABasePtr>& parents = curr->parents();
319 
320  for (size_t i = 0; i < parents.size(); ++i) {
321  const DIABasePtr& p = parents[i];
322 
323  // if parent was already seen, done.
324  if (stages->count(Stage(p)) != 0) continue;
325 
326  if (!curr->ForwardDataOnly()) {
327  if (ctx.my_rank() == 0)
328  LOG << " Stage: " << *p;
329  stages->emplace(p);
330  // If parent was not executed push it to the BFS queue and
331  // continue upwards. if state is EXECUTED, then we only need to
332  // PushData(), which is already indicated by stages.insert().
333  if (p->state() == DIAState::NEW)
334  bfs_stack.push_back(p);
335  }
336  else {
337  // If parent cannot hold data continue upward.
338  if (curr->RequireParentPushData(i)) {
339  if (ctx.my_rank() == 0)
340  LOG << " Stage: " << *p;
341  stages->emplace(p);
342  bfs_stack.push_back(p);
343  }
344  }
345  }
346  }
347 }
348 
349 static void TopoSortVisit(
350  const Stage& s, mm_set<Stage>* stages, mem::vector<Stage>* result) {
351  // check markers
352  die_unless(!s.cycle_mark_ && "Cycle in toposort of Stages? Impossible.");
353  if (s.topo_seen_) return;
354 
355  s.cycle_mark_ = true;
356  // iterate over all children of s which are in the to-be-calculate stages
357  for (DIABase* child : s.node_->children()) {
358  auto it = stages->find(Stage(DIABasePtr(child)));
359 
360  // child not in stage set
361  if (it == stages->end()) continue;
362 
363  // depth-first search
364  TopoSortVisit(*it, stages, result);
365  }
366 
367  s.topo_seen_ = true;
368  s.cycle_mark_ = false;
369  result->push_back(s);
370 }
371 
372 static void TopoSortStages(mm_set<Stage>* stages, mem::vector<Stage>* result) {
373  // iterate over all stages and visit nodes in DFS search
374  for (const Stage& s : *stages) {
375  if (s.topo_seen_) continue;
376  TopoSortVisit(s, stages, result);
377  }
378 }
379 
381  static constexpr bool debug = Stage::debug;
382 
383  LOG << "DIABase::Execute() this=" << *this;
384 
385  if (state_ == DIAState::EXECUTED) {
386  LOG << "DIA node " << *this << " was already executed.";
387  return;
388  }
389 
390  if (ForwardDataOnly()) {
391  // CollapseNodes cannot be executed: execute their parent(s)
392  for (const DIABasePtr& p : parents_)
393  p->RunScope();
394  return;
395  }
396 
397  mm_set<Stage> stages {
399  };
400  FindStages(context_, DIABasePtr(this), &stages);
401 
402  mem::vector<Stage> toporder {
404  };
405  TopoSortStages(&stages, &toporder);
406 
407  if (context_.my_rank() == 0) {
408  LOG << "Topological order";
409  for (auto top = toporder.rbegin(); top != toporder.rend(); ++top) {
410  LOG << " " << *top->node_;
411  }
412  }
413 
414  assert(toporder.front().node_.get() == this);
415 
416  while (!toporder.empty())
417  {
418  Stage& s = toporder.back();
419 
420  if (s.node_->ForwardDataOnly()) {
421  toporder.pop_back();
422  continue;
423  }
424 
425  if (debug)
427 
428  if (s.node_->state() == DIAState::NEW) {
429  s.Execute();
430  if (s.node_.get() != this)
431  s.PushData();
432  }
433  else if (s.node_->state() == DIAState::EXECUTED) {
434  if (s.node_.get() != this)
435  s.PushData();
436  }
437 
438  // remove from result stack, this may destroy the last CountingPtr
439  // reference to a node.
440  toporder.pop_back();
441  }
442 }
443 
444 /******************************************************************************/
445 // DIABase
446 
447 //! make ostream-able.
448 std::ostream& operator << (std::ostream& os, const DIABase& d) {
449  return os << d.label() << '.' << d.id();
450 }
451 
452 } // namespace api
453 } // namespace thrill
454 
455 /******************************************************************************/
The DIABase has not been computed yet.
std::set< T, std::less< T >, mem::Allocator< T > > mm_set
Definition: dia_base.cpp:297
DIAState state_
State of the DIANode. State is NEW on creation.
Definition: dia_base.hpp:307
#define die_unless(X)
Definition: die.hpp:52
#define sLOG1
Definition: logger.hpp:183
size_t my_rank() const
Global rank of this worker among all other workers in the system.
Definition: context.hpp:245
#define LOG1
Definition: logger.hpp:171
const char * label() const
return label() of DIANode subclass as stored by StatsNode
Definition: dia_base.hpp:218
The Context of a job is a unique instance per worker which holds references to all underlying parts o...
Definition: context.hpp:210
void malloc_tracker_print_status()
user function which prints current and peak allocation to stderr
The DIABase is the untyped super class of DIANode.
Definition: dia_base.hpp:87
const size_t & id() const
return unique id() of DIANode subclass as stored by StatsNode
Definition: dia_base.hpp:213
static void FindStages(Context &ctx, const DIABasePtr &action, mm_set< Stage > *stages)
Definition: dia_base.cpp:301
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:179
mem::Manager & mem_manager()
Return the Context's memory manager.
Definition: dia_base.hpp:278
std::vector< DIABasePtr > parents_
Parents of this DIABase.
Definition: dia_base.hpp:310
StatsTimerBaseStarted< true > StatsTimerStart
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
Definition: allocator.hpp:220
static void TopoSortStages(mm_set< Stage > *stages, mem::vector< Stage > *result)
Definition: dia_base.cpp:372
static constexpr bool debug
std::vector< T, Allocator< T > > vector
vector with Manager tracking
Definition: allocator.hpp:228
static void TopoSortVisit(const Stage &s, mm_set< Stage > *stages, mem::vector< Stage > *result)
Definition: dia_base.cpp:349
#define sLOGC(cond)
Explicitly specify the condition for logging.
Definition: logger.hpp:174
std::deque< T, Allocator< T > > deque
deque with Manager tracking
Definition: allocator.hpp:232
tlx::CountingPtr< DIABase > DIABasePtr
Definition: dia_base.hpp:90
virtual bool ForwardDataOnly() const
Definition: dia_base.hpp:148
const std::vector< DIABasePtr > & parents() const
Returns the parents of this DIABase.
Definition: dia_base.hpp:253
virtual bool RequireParentPushData(size_t) const
Definition: dia_base.hpp:153
tlx::CountingPtr< DIABase > DIABasePtr
Definition: dia_base.hpp:334
friend std::ostream & operator<<(std::ostream &os, const DIABase &d)
make ostream-able.
Definition: dia_base.cpp:448
DIAState state() const
Definition: dia_base.hpp:282
Context & context_
associated Context
Definition: dia_base.hpp:293
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:167