Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
gather.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/api/gather.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Timo Bingmann <[email protected]>
7  *
8  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
9  ******************************************************************************/
10 
11 #pragma once
12 #ifndef THRILL_API_GATHER_HEADER
13 #define THRILL_API_GATHER_HEADER
14 
16 #include <thrill/api/dia.hpp>
17 
18 #include <iostream>
19 #include <vector>
20 
21 namespace thrill {
22 namespace api {
23 
24 /*!
 * Action node that sends every item produced by its parent to the stream
 * writer of one target worker and, in Execute(), appends everything read back
 * from the stream to a caller-supplied std::vector<ValueType>.
 *
 * All writers except the target's are closed without receiving data, so
 * presumably only the target worker ends up with items in its vector --
 * confirm against the stream implementation.
 *
 * NOTE(review): this listing skips source lines 31, 87 and 88; the `Super`
 * alias and the `stream_` / `emitters_` members used below are presumably
 * declared on those lines -- confirm against the real header.
 *
25  * \ingroup api_layer
26  */
27 template <typename ValueType>
28 class GatherNode final : public ActionResultNode<std::vector<ValueType> >
29 {
30 public:
32  using Super::context_;
33 
    //! Constructs the node and registers its pre-op with the parent node.
    //!
    //! \param parent     DIA whose items are gathered
    //! \param label      node label, passed on to Super
    //! \param target_id  index of the worker whose writer receives the items;
    //!                   asserted to be smaller than context_.num_workers()
    //! \param out_vector vector that Execute() appends to; only the raw
    //!                   pointer is stored, so the caller must keep the
    //!                   vector alive while the node runs
34  template <typename ParentDIA>
35  GatherNode(const ParentDIA& parent, const char* label,
36  size_t target_id,
37  std::vector<ValueType>* out_vector)
38  : Super(parent.ctx(), label,
39  { parent.id() }, { parent.node() }),
40  target_id_(target_id),
41  out_vector_(out_vector) {
42  assert(target_id_ < context_.num_workers());
43 
    // pre-op: put each incoming item into the writer of the target worker
44  auto pre_op_fn = [this](const ValueType& input) {
45  emitters_[target_id_].Put(input);
46  };
47 
48  // close the function stack with our pre op and register it at parent
49  // node for output
50  auto lop_chain = parent.stack().push(pre_op_fn).fold();
51  parent.node()->AddChild(this, lop_chain);
52  }
53 
    //! Fetches the writers of stream_ and closes every writer except the one
    //! at index target_id_, which stays open for the pre-op.
54  void StartPreOp(size_t /* parent_index */) final {
55  emitters_ = stream_->GetWriters();
56 
57  // close all but the target
58  for (size_t i = 0; i < emitters_.size(); i++) {
59  if (i == target_id_) continue;
60  emitters_[i].Close();
61  }
62  }
63 
    //! Closes the remaining writer, i.e. the one for the target worker.
64  void StopPreOp(size_t /* parent_index */) final {
65  emitters_[target_id_].Close();
66  }
67 
    //! Reads every item available from stream_ (reader opened with the
    //! consume flag set) and appends it to *out_vector_. Existing contents of
    //! the vector are kept; items are only pushed to the back.
68  void Execute() final {
69  auto reader = stream_->GetCatReader(true /* consume */);
70 
71  while (reader.HasNext()) {
72  out_vector_->push_back(reader.template Next<ValueType>());
73  }
74  }
75 
    //! Returns a reference to the vector that Execute() appends to.
76  const std::vector<ValueType>& result() const final {
77  return *out_vector_;
78  }
79 
80 private:
81  //! Id of the worker whose writer receives the items; the writers of all
82  //! other workers are closed without data.
83  size_t target_id_;
84  //! Vector pointer to write elements to.
85  std::vector<ValueType>* out_vector_;
86 
89 };
90 
// Gather action returning the collected items by value.
//
// Creates a GatherNode that writes into a local vector, runs it via
// RunScope(), and returns that vector. target_id selects the worker whose
// stream writer receives the items; on other workers the returned vector is
// presumably empty -- confirm against the stream implementation.
//
// NOTE(review): `GatherNode` is used without template arguments; an alias for
// it is presumably declared on source line 96, which this listing skips.
91 template <typename ValueType, typename Stack>
92 std::vector<ValueType>
93 DIA<ValueType, Stack>::Gather(size_t target_id) const {
94  assert(IsValid());
95 
97 
    // the node only stores a pointer to `output`, so `output` must outlive
    // the RunScope() call below
98  std::vector<ValueType> output;
99 
100  auto node = tlx::make_counting<GatherNode>(
101  *this, "Gather", target_id, &output);
102 
103  node->RunScope();
104 
105  return output;
106 }
107 
// NOTE(review): the declarator line (source line 109) is missing from this
// listing. Judging by the "Gather" label and the parameters, this is
// presumably the DIA<ValueType, Stack>::Gather overload that writes into a
// caller-supplied vector -- confirm against the real header.
//
// Creates a GatherNode that appends to *out_vector and runs it via
// RunScope(). Nothing in this function clears the vector first.
108 template <typename ValueType, typename Stack>
110  size_t target_id, std::vector<ValueType>* out_vector) const {
111  assert(IsValid());
112 
114 
115  auto node = tlx::make_counting<GatherNode>(
116  *this, "Gather", target_id, out_vector);
117 
118  node->RunScope();
119 }
120 
121 } // namespace api
122 } // namespace thrill
123 
124 #endif // !THRILL_API_GATHER_HEADER
125 
126 /******************************************************************************/
data::CatStream::Writers emitters_
Definition: gather.hpp:88
virtual const std::vector< ValueType > & result() const =0
virtual method to return result via an ActionFuture
std::vector< ValueType > Gather(size_t target_id=0) const
Gather is an Action, which collects all data of the DIA into a vector at the given worker...
Definition: gather.hpp:93
const char * label() const
return label() of DIANode subclass as stored by StatsNode
Definition: dia_base.hpp:218
data::CatStreamPtr GetNewCatStream(size_t dia_id)
Definition: context.cpp:1147
virtual void StopPreOp(size_t)
Virtual method for preparing end of PushData.
Definition: dia_base.hpp:173
virtual void Execute()=0
Virtual execution method. Triggers actual computation in sub-classes.
StreamData::Writers Writers
Definition: stream.hpp:40
GatherNode(const ParentDIA &parent, const char *label, size_t target_id, std::vector< ValueType > *out_vector)
Definition: gather.hpp:35
data::CatStreamPtr stream_
Definition: gather.hpp:87
virtual void StartPreOp(size_t)
Virtual method for preparing start of PushData.
Definition: dia_base.hpp:163
std::vector< ValueType > * out_vector_
Vector pointer to write elements to.
Definition: gather.hpp:39
size_t num_workers() const
Global number of workers in the system.
Definition: context.hpp:251
Context & context_
associated Context
Definition: dia_base.hpp:293