Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
gather.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/api/gather.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Timo Bingmann <[email protected]>
7  *
8  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
9  ******************************************************************************/
10 
11 #pragma once
12 #ifndef THRILL_API_GATHER_HEADER
13 #define THRILL_API_GATHER_HEADER
14 
16 #include <thrill/api/dia.hpp>
17 
18 #include <iostream>
19 #include <vector>
20 
21 namespace thrill {
22 namespace api {
23 
24 /*!
25  * \ingroup api_layer
26  */
27 template <typename ValueType>
28 class GatherNode final : public ActionResultNode<std::vector<ValueType> >
29 {
30 public:
32  using Super::context_;
33 
34  template <typename ParentDIA>
35  GatherNode(const ParentDIA& parent, const char* label,
36  size_t target_id,
37  std::vector<ValueType>* out_vector)
38  : Super(parent.ctx(), label,
39  { parent.id() }, { parent.node() }),
40  target_id_(target_id),
41  out_vector_(out_vector)
42  {
43  assert(target_id_ < context_.num_workers());
44 
45  auto pre_op_fn = [this](const ValueType& input) {
46  emitters_[target_id_].Put(input);
47  };
48 
49  // close the function stack with our pre op and register it at parent
50  // node for output
51  auto lop_chain = parent.stack().push(pre_op_fn).fold();
52  parent.node()->AddChild(this, lop_chain);
53  }
54 
55  void StartPreOp(size_t /* id */) final {
56  emitters_ = stream_->GetWriters();
57 
58  // close all but the target
59  for (size_t i = 0; i < emitters_.size(); i++) {
60  if (i == target_id_) continue;
61  emitters_[i].Close();
62  }
63  }
64 
65  void StopPreOp(size_t /* id */) final {
66  emitters_[target_id_].Close();
67  }
68 
69  void Execute() final {
70  auto reader = stream_->GetCatReader(true /* consume */);
71 
72  while (reader.HasNext()) {
73  out_vector_->push_back(reader.template Next<ValueType>());
74  }
75  }
76 
77  const std::vector<ValueType>& result() const final {
78  return *out_vector_;
79  }
80 
81 private:
82  //! target worker id, which collects vector, all other workers do not get
83  //! the data.
84  size_t target_id_;
85  //! Vector pointer to write elements to.
86  std::vector<ValueType>* out_vector_;
87 
90 };
91 
92 template <typename ValueType, typename Stack>
93 std::vector<ValueType>
94 DIA<ValueType, Stack>::Gather(size_t target_id) const {
95  assert(IsValid());
96 
98 
99  std::vector<ValueType> output;
100 
101  auto node = tlx::make_counting<GatherNode>(
102  *this, "Gather", target_id, &output);
103 
104  node->RunScope();
105 
106  return output;
107 }
108 
109 template <typename ValueType, typename Stack>
111  size_t target_id, std::vector<ValueType>* out_vector) const {
112  assert(IsValid());
113 
115 
116  auto node = tlx::make_counting<GatherNode>(
117  *this, "Gather", target_id, out_vector);
118 
119  node->RunScope();
120 }
121 
122 } // namespace api
123 } // namespace thrill
124 
125 #endif // !THRILL_API_GATHER_HEADER
126 
127 /******************************************************************************/
data::CatStream::Writers emitters_
Definition: gather.hpp:89
virtual const std::vector< ValueType > & result() const =0
virtual method to return result via an ActionFuture
std::vector< ValueType > Gather(size_t target_id=0) const
Gather is an Action, which collects all data of the DIA into a vector at the given worker...
Definition: gather.hpp:94
const char * label() const
return label() of DIANode subclass as stored by StatsNode
Definition: dia_base.hpp:218
data::CatStreamPtr GetNewCatStream(size_t dia_id)
Definition: context.cpp:1120
virtual void StopPreOp(size_t)
Virtual method for preparing end of PushData.
Definition: dia_base.hpp:173
virtual void Execute()=0
Virtual execution method. Triggers actual computation in sub-classes.
StreamData::Writers Writers
Definition: stream.hpp:40
GatherNode(const ParentDIA &parent, const char *label, size_t target_id, std::vector< ValueType > *out_vector)
Definition: gather.hpp:35
data::CatStreamPtr stream_
Definition: gather.hpp:88
virtual void StartPreOp(size_t)
Virtual method for preparing start of PushData.
Definition: dia_base.hpp:163
std::vector< ValueType > * out_vector_
Vector pointer to write elements to.
Definition: gather.hpp:39
size_t num_workers() const
Global number of workers in the system.
Definition: context.hpp:248
Context & context_
associated Context
Definition: dia_base.hpp:293