Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
all_gather.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/api/all_gather.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Alexander Noe <[email protected]>
7  * Copyright (C) 2015 Sebastian Lamm <[email protected]>
8  *
9  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10  ******************************************************************************/
11 
12 #pragma once
13 #ifndef THRILL_API_ALL_GATHER_HEADER
14 #define THRILL_API_ALL_GATHER_HEADER
15 
17 #include <thrill/api/dia.hpp>
18 
19 #include <vector>
20 
21 namespace thrill {
22 namespace api {
23 
24 /*!
25  * \ingroup api_layer
26  */
27 template <typename ValueType>
28 class AllGatherNode final : public ActionResultNode<std::vector<ValueType> >
29 {
30  static constexpr bool debug = false;
31 
32 public:
34  using Super::context_;
35 
36  template <typename ParentDIA>
37  AllGatherNode(const ParentDIA& parent,
38  std::vector<ValueType>* out_vector, bool ownership)
39  : Super(parent.ctx(), "AllGather",
40  { parent.id() }, { parent.node() }),
41  parent_stack_empty_(ParentDIA::stack_empty),
42  out_vector_(out_vector), ownership_(ownership) {
43  auto pre_op_function = [this](const ValueType& input) {
44  PreOp(input);
45  };
46 
47  // close the function stack with our pre op and register it at parent
48  // node for output
49  auto lop_chain = parent.stack().push(pre_op_function).fold();
50  parent.node()->AddChild(this, lop_chain);
51  }
52 
53  ~AllGatherNode() {
54  if (ownership_ && out_vector_)
55  delete out_vector_;
56  }
57 
58  void StartPreOp(size_t /* parent_index */) final {
59  emitters_ = stream_->GetWriters();
60  }
61 
62  void PreOp(const ValueType& element) {
63  for (size_t i = 0; i < emitters_.size(); i++) {
64  emitters_[i].Put(element);
65  }
66  }
67 
68  bool OnPreOpFile(const data::File& file, size_t /* parent_index */) final {
69  if (!parent_stack_empty_) {
71  << "AllGather rejected File from parent "
72  << "due to non-empty function stack.";
73  return false;
74  }
75  for (size_t i = 0; i < emitters_.size(); i++) {
76  emitters_[i].AppendBlocks(file.blocks());
77  }
78  return true;
79  }
80 
81  void StopPreOp(size_t /* parent_index */) final {
82  // data has been pushed during pre-op -> close emitters
83  emitters_.Close();
84  }
85 
86  //! Closes the output file
87  void Execute() final {
88  auto reader = stream_->GetCatReader(/* consume */ true);
89  while (reader.HasNext()) {
90  out_vector_->push_back(reader.template Next<ValueType>());
91  }
92  stream_.reset();
93  }
94 
95  const std::vector<ValueType>& result() const final {
96  return *out_vector_;
97  }
98 
99 private:
100  //! Whether the parent stack is empty
101  const bool parent_stack_empty_;
102 
103  //! Vector pointer to write elements to.
104  std::vector<ValueType>* out_vector_;
105 
106  //! take ownership of vector
108 
111 };
112 
113 template <typename ValueType, typename Stack>
114 std::vector<ValueType> DIA<ValueType, Stack>::AllGather() const {
115  assert(IsValid());
116 
118 
119  std::vector<ValueType> output;
120 
121  auto node = tlx::make_counting<AllGatherNode>(
122  *this, &output, /* ownership */ false);
123 
124  node->RunScope();
125 
126  return output;
127 }
128 
129 template <typename ValueType, typename Stack>
130 void DIA<ValueType, Stack>::AllGather(std::vector<ValueType>* out_vector) const {
131  assert(IsValid());
132 
134 
135  auto node = tlx::make_counting<AllGatherNode>(
136  *this, out_vector, /* ownership */ false);
137 
138  node->RunScope();
139 }
140 
141 template <typename ValueType, typename Stack>
144  assert(IsValid());
145 
147 
148  std::vector<ValueType>* output = new std::vector<ValueType>();
149 
150  auto node = tlx::make_counting<AllGatherNode>(
151  *this, output, /* ownership */ true);
152 
153  return Future<std::vector<ValueType> >(node);
154 }
155 
156 } // namespace api
157 } // namespace thrill
158 
159 #endif // !THRILL_API_ALL_GATHER_HEADER
160 
161 /******************************************************************************/
std::vector< ValueType > AllGather() const
Returns the whole DIA in an std::vector on each worker.
Definition: all_gather.hpp:114
data::CatStreamPtr stream_
Definition: all_gather.hpp:109
Future< std::vector< ValueType > > AllGatherFuture() const
Returns the whole DIA in an std::vector on each worker.
Definition: all_gather.hpp:143
virtual const std::vector< ValueType > & result() const =0
virtual method to return result via an ActionFuture
void reset()
release contained pointer, frees object if this is the last reference.
static constexpr bool g_debug_push_file
Definition: config.hpp:44
AllGatherNode(const ParentDIA &parent, std::vector< ValueType > *out_vector, bool ownership)
Definition: all_gather.hpp:37
bool ownership_
take ownership of vector
Definition: all_gather.hpp:107
void Close()
custom destructor to close writers is a cyclic fashion
Definition: stream_data.cpp:66
An extra class derived from std::vector<> for delivery of the BlockWriters of a Stream.
Definition: stream_data.hpp:56
data::CatStreamPtr GetNewCatStream(size_t dia_id)
Definition: context.cpp:1151
static constexpr bool debug
Definition: all_gather.hpp:30
virtual void StopPreOp(size_t)
Virtual method for preparing end of PushData.
Definition: dia_base.hpp:173
data::CatStream::Writers emitters_
Definition: all_gather.hpp:110
virtual void Execute()=0
Virtual execution method. Triggers actual computation in sub-classes.
The return type class for all ActionFutures.
Definition: action_node.hpp:83
virtual void StartPreOp(size_t)
Virtual method for preparing start of PushData.
Definition: dia_base.hpp:163
virtual bool OnPreOpFile(const data::File &, size_t)
Definition: dia_base.hpp:168
std::vector< ValueType > * out_vector_
Vector pointer to write elements to.
Definition: all_gather.hpp:40
Context & context_
associated Context
Definition: dia_base.hpp:293
#define LOGC(cond)
Explicitly specify the condition for logging.
Definition: logger.hpp:21