Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
size.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/api/size.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Matthias Stumpp <[email protected]>
7  * Copyright (C) 2015 Sebastian Lamm <[email protected]>
8  *
9  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10  ******************************************************************************/
11 
12 #pragma once
13 #ifndef THRILL_API_SIZE_HEADER
14 #define THRILL_API_SIZE_HEADER
15 
17 #include <thrill/api/cache.hpp>
18 #include <thrill/api/dia.hpp>
19 #include <thrill/net/group.hpp>
20 
21 namespace thrill {
22 namespace api {
23 
24 /*!
25  * \ingroup api_layer
26  */
27 template <typename ValueType>
28 class SizeNode final : public ActionResultNode<size_t>
29 {
30  static constexpr bool debug = false;
31 
33  using Super::context_;
34 
35 public:
36  template <typename ParentDIA>
37  explicit SizeNode(const ParentDIA& parent)
38  : Super(parent.ctx(), "Size", { parent.id() }, { parent.node() }),
39  parent_stack_empty_(ParentDIA::stack_empty) {
40 
41  if (parent_stack_empty_ &&
42  dynamic_cast<CacheNode<ValueType>*>(parent.node().get()) != nullptr) {
43  // Add as child, but do not receive items via PreOp Hook.
44  LOG << "SizeNode: skipping callback, accessing CacheNode directly";
45  parent.node()->AddChild(this);
46  }
47  else {
48  // Hook PreOp(s)
49  auto pre_op_fn = [this](const ValueType&) { ++local_size_; };
50 
51  auto lop_chain = parent.stack().push(pre_op_fn).fold();
52  parent.node()->AddChild(this, lop_chain);
53  }
54  }
55 
56  //! Receive a whole data::File of ValueType, but only if our stack is empty.
57  bool OnPreOpFile(const data::File& file, size_t /* parent_index */) final {
58  if (!parent_stack_empty_) {
60  << "Size rejected File from parent "
61  << "due to non-empty function stack.";
62  return false;
63  }
64  local_size_ = file.num_items();
65  return true;
66  }
67 
68  //! Executes the size operation.
69  void Execute() final {
70 
71  // check if parent is CacheNode -> read number of items directly
72  CacheNode<ValueType>* parent_cache =
73  dynamic_cast<CacheNode<ValueType>*>(parents_[0].get());
74 
75  if (parent_stack_empty_ && parent_cache != nullptr)
76  local_size_ = parent_cache->NumItems();
77 
78  // get the number of elements that are stored on this worker
79  LOG << "MainOp processing, sum: " << local_size_;
80 
81  // process the reduce, default argument is SumOp.
82  global_size_ = context_.net.AllReduce(local_size_);
83  }
84 
85  //! Returns result of global size.
86  const size_t& result() const final {
87  return global_size_;
88  }
89 
90 private:
91  //! Whether the parent stack is empty
92  const bool parent_stack_empty_;
93  // Local size to be used.
94  size_t local_size_ = 0;
95  // Global size resulting from all reduce.
96  size_t global_size_ = 0;
97 };
98 
99 template <typename ValueType, typename Stack>
101  assert(IsValid());
102 
104  auto node = tlx::make_counting<SizeNode>(*this);
105  node->RunScope();
106  return node->result();
107 }
108 
109 template <typename ValueType, typename Stack>
111  assert(IsValid());
112 
114  auto node = tlx::make_counting<SizeNode>(*this);
115  return Future<size_t>(node);
116 }
117 
118 } // namespace api
119 } // namespace thrill
120 
121 #endif // !THRILL_API_SIZE_HEADER
122 
123 /******************************************************************************/
net::FlowControlChannel & net
Definition: context.hpp:443
virtual const size_t & result() const =0
virtual method to return result via an ActionFuture
size_t global_size_
Definition: size.hpp:96
static constexpr bool g_debug_push_file
Definition: config.hpp:44
T TLX_ATTRIBUTE_WARN_UNUSED_RESULT AllReduce(const T &value, const BinarySumOp &sum_op=BinarySumOp())
Reduces a value of a serializable type T over all workers given a certain reduce function.
size_t local_size_
Definition: size.hpp:94
SizeNode(const ParentDIA &parent)
Definition: size.hpp:37
std::vector< DIABasePtr > parents_
Parents of this DIABase.
Definition: dia_base.hpp:310
Future< size_t > SizeFuture() const
Lazily computes the total size of all elements across all workers.
Definition: size.hpp:110
#define LOGC(cond)
Explicitly specify the condition for logging.
Definition: logger.hpp:167
virtual void Execute()=0
Virtual execution method. Triggers actual computation in sub-classes.
The return type class for all ActionFutures.
Definition: action_node.hpp:83
virtual bool OnPreOpFile(const data::File &, size_t)
Definition: dia_base.hpp:168
static constexpr bool debug
Definition: size.hpp:30
size_t Size() const
Computes the total size of all elements across all workers.
Definition: size.hpp:100
Context & context_
associated Context
Definition: dia_base.hpp:293
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:172