Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
all_gather.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/api/all_gather.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Alexander Noe <[email protected]>
7  * Copyright (C) 2015 Sebastian Lamm <[email protected]>
8  *
9  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10  ******************************************************************************/
11 
12 #pragma once
13 #ifndef THRILL_API_ALL_GATHER_HEADER
14 #define THRILL_API_ALL_GATHER_HEADER
15 
17 #include <thrill/api/dia.hpp>
18 
19 #include <vector>
20 
21 namespace thrill {
22 namespace api {
23 
24 /*!
 * Action node that collects the elements of its parent DIA into a
 * std::vector. Every element received through PreOp() is put into each
 * writer obtained from stream_; Execute() then reads the stream back and
 * appends every item to the vector behind out_vector_.
 *
 * NOTE(review): this listing skips original lines 33, 71, 108 and 110-111.
 * The symbol index following the listing attributes ownership_ to line 108,
 * stream_ to line 110 and emitters_ to line 111; line 33 presumably holds the
 * Super alias. Confirm against the original header.
 *
25  * \ingroup api_layer
26  */
27 template <typename ValueType>
28 class AllGatherNode final : public ActionResultNode<std::vector<ValueType> >
29 {
30  static constexpr bool debug = false;
31 
32 public:
34  using Super::context_;
35 
 /*!
  * Constructs the node and attaches it to the parent.
  *
  * \param parent      DIA whose elements are gathered; its ctx(), id(),
  *                    node() and stack() are used below.
  * \param out_vector  vector that Execute() appends the gathered items to.
  * \param ownership   if true, the destructor deletes out_vector.
  */
36  template <typename ParentDIA>
37  AllGatherNode(const ParentDIA& parent,
38  std::vector<ValueType>* out_vector, bool ownership)
39  : Super(parent.ctx(), "AllGather",
40  { parent.id() }, { parent.node() }),
41  parent_stack_empty_(ParentDIA::stack_empty),
42  out_vector_(out_vector), ownership_(ownership)
43  {
 // forward every element arriving from the parent to PreOp()
44  auto pre_op_function = [this](const ValueType& input) {
45  PreOp(input);
46  };
47 
48  // close the function stack with our pre op and register it at parent
49  // node for output
50  auto lop_chain = parent.stack().push(pre_op_function).fold();
51  parent.node()->AddChild(this, lop_chain);
52  }
53 
 //! Deletes the output vector, but only when this node was given ownership
 //! of it and the pointer is non-null.
54  ~AllGatherNode() {
55  if (ownership_ && out_vector_)
56  delete out_vector_;
57  }
58 
 //! Obtains the writers of stream_ and stores them in emitters_.
59  void StartPreOp(size_t /* id */) final {
60  emitters_ = stream_->GetWriters();
61  }
62 
 //! Puts one element into every writer held in emitters_.
63  void PreOp(const ValueType& element) {
64  for (size_t i = 0; i < emitters_.size(); i++) {
65  emitters_[i].Put(element);
66  }
67  }
68 
 //! Accepts a whole File from the parent instead of single elements: the
 //! file's blocks are appended to every writer and true is returned. If the
 //! parent's function stack is not empty the file is rejected with false.
69  bool OnPreOpFile(const data::File& file, size_t /* parent_index */) final {
70  if (!parent_stack_empty_) {
 // NOTE(review): listing line 71, which starts this log statement, is
 // missing here; presumably a LOGC(...) call (LOGC and g_debug_push_file
 // appear in the symbol index) -- confirm against the original header.
72  << "AllGather rejected File from parent "
73  << "due to non-empty function stack.";
74  return false;
75  }
76  for (size_t i = 0; i < emitters_.size(); i++) {
77  emitters_[i].AppendBlocks(file.blocks());
78  }
79  return true;
80  }
81 
 //! Called at the end of the pre-op phase; closes all writers.
82  void StopPreOp(size_t /* id */) final {
83  // data has been pushed during pre-op -> close emitters
84  emitters_.Close();
85  }
86 
87  //! Reads every item from stream_ through a consuming cat reader, appends
 //! each one to *out_vector_, and finally releases this node's reference to
 //! the stream.
88  void Execute() final {
89  auto reader = stream_->GetCatReader(/* consume */ true);
90  while (reader.HasNext()) {
91  out_vector_->push_back(reader.template Next<ValueType>());
92  }
93  stream_.reset();
94  }
95 
 //! Returns a const reference to the vector the items were gathered into.
96  const std::vector<ValueType>& result() const final {
97  return *out_vector_;
98  }
99 
100 private:
101  //! Whether the parent stack is empty
102  const bool parent_stack_empty_;
103 
104  //! Vector pointer to write elements to.
105  std::vector<ValueType>* out_vector_;
106 
107  //! take ownership of vector
 // NOTE(review): the declaration this comment belongs to (bool ownership_,
 // listing line 108) and the stream_ / emitters_ members (lines 110-111) are
 // missing from this listing.
109 
112 };
113 
//! Gathers the whole DIA into a local std::vector and returns it by value.
//! The AllGatherNode writes into the local vector and is created with
//! ownership = false, so the node never deletes it.
//! NOTE(review): listing line 118 is missing; AllGatherNode is used below
//! without template arguments, so that line presumably declares a local alias
//! for api::AllGatherNode<ValueType> -- confirm against the original header.
114 template <typename ValueType, typename Stack>
115 std::vector<ValueType> DIA<ValueType, Stack>::AllGather() const {
116  assert(IsValid());
117 
119 
120  std::vector<ValueType> output;
121 
122  auto node = tlx::make_counting<AllGatherNode>(
123  *this, &output, /* ownership */ false);
124 
 // presumably runs the node (and what it depends on) to completion, so that
 // output is filled before it is returned -- RunScope() is defined elsewhere
125  node->RunScope();
126 
127  return output;
128 }
129 
//! Gathers the whole DIA into the caller-supplied vector.
//! \param out_vector  destination vector; AllGatherNode::Execute() appends to
//!                    it with push_back. The node is created with
//!                    ownership = false, so the caller keeps ownership.
//! NOTE(review): listing line 134 is missing; presumably the local
//! AllGatherNode alias -- confirm against the original header.
130 template <typename ValueType, typename Stack>
131 void DIA<ValueType, Stack>::AllGather(std::vector<ValueType>* out_vector) const {
132  assert(IsValid());
133 
135 
136  auto node = tlx::make_counting<AllGatherNode>(
137  *this, out_vector, /* ownership */ false);
138 
 // presumably triggers execution of the node; RunScope() is defined elsewhere
139  node->RunScope();
140 }
141 
//! Creates an AllGatherNode that writes into a heap-allocated vector and
//! returns a Future wrapping that node. The node is created with
//! ownership = true, so the AllGatherNode destructor deletes the vector.
//! NOTE(review): the function signature (listing lines 143-144) and line 147
//! are missing from this listing; the symbol index gives the signature as
//! "Future< std::vector< ValueType > > AllGatherFuture() const".
142 template <typename ValueType, typename Stack>
145  assert(IsValid());
146 
148 
 // ownership of this allocation passes to the node created below
149  std::vector<ValueType>* output = new std::vector<ValueType>();
150 
151  auto node = tlx::make_counting<AllGatherNode>(
152  *this, output, /* ownership */ true);
153 
154  return Future<std::vector<ValueType> >(node);
155 }
156 
157 } // namespace api
158 } // namespace thrill
159 
160 #endif // !THRILL_API_ALL_GATHER_HEADER
161 
162 /******************************************************************************/
std::vector< ValueType > AllGather() const
Returns the whole DIA in an std::vector on each worker.
Definition: all_gather.hpp:115
data::CatStreamPtr stream_
Definition: all_gather.hpp:110
Future< std::vector< ValueType > > AllGatherFuture() const
Returns a Future for the whole DIA gathered into an std::vector on each worker.
Definition: all_gather.hpp:144
virtual const std::vector< ValueType > & result() const =0
virtual method to return result via an ActionFuture
void reset()
release contained pointer, frees object if this is the last reference.
static constexpr bool g_debug_push_file
Definition: config.hpp:44
AllGatherNode(const ParentDIA &parent, std::vector< ValueType > *out_vector, bool ownership)
Definition: all_gather.hpp:37
bool ownership_
take ownership of vector
Definition: all_gather.hpp:108
void Close()
custom destructor to close writers in a cyclic fashion
Definition: stream_data.cpp:66
An extra class derived from std::vector<> for delivery of the BlockWriters of a Stream.
Definition: stream_data.hpp:56
data::CatStreamPtr GetNewCatStream(size_t dia_id)
Definition: context.cpp:1120
static constexpr bool debug
Definition: all_gather.hpp:30
virtual void StopPreOp(size_t)
Virtual method for preparing end of PushData.
Definition: dia_base.hpp:173
data::CatStream::Writers emitters_
Definition: all_gather.hpp:111
virtual void Execute()=0
Virtual execution method. Triggers actual computation in sub-classes.
The return type class for all ActionFutures.
Definition: action_node.hpp:83
virtual void StartPreOp(size_t)
Virtual method for preparing start of PushData.
Definition: dia_base.hpp:163
virtual bool OnPreOpFile(const data::File &, size_t)
Definition: dia_base.hpp:168
std::vector< ValueType > * out_vector_
Vector pointer to write elements to.
Definition: all_gather.hpp:40
Context & context_
associated Context
Definition: dia_base.hpp:293
#define LOGC(cond)
Explicitly specify the condition for logging.
Definition: logger.hpp:21