Thrill  0.1
distribute.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/api/distribute.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Timo Bingmann <[email protected]>
7  *
8  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
9  ******************************************************************************/
10 
11 #pragma once
12 #ifndef THRILL_API_DISTRIBUTE_HEADER
13 #define THRILL_API_DISTRIBUTE_HEADER
14 
15 #include <thrill/api/dia.hpp>
17 #include <thrill/common/logger.hpp>
18 
19 #include <tlx/vector_free.hpp>
20 
21 #include <vector>
22 
23 namespace thrill {
24 namespace api {
25 
26 //! \ingroup api_layer
27 //! \{
28 
29 /*!
30  * \ingroup api_layer
31  */
32 template <typename ValueType>
33 class DistributeNode final : public SourceNode<ValueType>
34 {
35 public:
37  using Super::context_;
38 
40  const std::vector<ValueType>& in_vector,
41  size_t source_id)
42  : Super(ctx, "Distribute"),
43  in_vector_(in_vector),
44  source_id_(source_id)
45  { }
46 
48  std::vector<ValueType>&& in_vector,
49  size_t source_id)
50  : Super(ctx, "Distribute"),
51  in_vector_(std::move(in_vector)),
52  source_id_(source_id)
53  { }
54 
55  //! Executes the scatter operation: source sends out its data.
56  void Execute() final {
57 
58  data::CatStream::Writers emitters = stream_->GetWriters();
59 
60  if (context_.my_rank() == source_id_)
61  {
62  size_t in_size = in_vector_.size();
63 
64  for (size_t w = 0; w < emitters.size(); ++w) {
65 
66  common::Range local =
67  common::CalculateLocalRange(in_size, emitters.size(), w);
68 
69  for (size_t i = local.begin; i < local.end; ++i) {
70  emitters[w].Put(in_vector_[i]);
71  }
72  }
73  }
74  }
75 
76  void PushData(bool consume) final {
77  data::CatStream::CatReader readers = stream_->GetCatReader(consume);
78 
79  while (readers.HasNext()) {
80  this->PushItem(readers.Next<ValueType>());
81  }
82  }
83 
84  void Dispose() final {
86  stream_.reset();
87  }
88 
89 private:
90  //! Vector pointer to read elements from.
91  std::vector<ValueType> in_vector_;
92  //! source worker id, which sends vector
93  size_t source_id_;
94 
96 };
97 
98 /*!
99  * Distribute is a Source DOp, which scatters the vector data from the
100  * source_id to all workers, partitioning equally, and returning the data in a
101  * DIA.
102  *
103  * \ingroup dia_sources
104  */
105 template <typename ValueType>
107  Context& ctx,
108  const std::vector<ValueType>& in_vector, size_t source_id = 0) {
109 
111 
112  auto node = tlx::make_counting<DistributeNode>(
113  ctx, in_vector, source_id);
114 
115  return DIA<ValueType>(node);
116 }
117 
118 /*!
119  * Distribute is a Source DOp, which scatters the vector data from the
120  * source_id to all workers, partitioning equally, and returning the data in a
121  * DIA.
122  *
123  * \ingroup dia_sources
124  */
125 template <typename ValueType>
127  Context& ctx,
128  std::vector<ValueType>&& in_vector, size_t source_id = 0) {
129 
131 
132  auto node = tlx::make_counting<DistributeNode>(
133  ctx, std::move(in_vector), source_id);
134 
135  return DIA<ValueType>(node);
136 }
137 
138 } // namespace api
139 
140 //! imported from api namespace
141 using api::Distribute;
142 
143 } // namespace thrill
144 
145 #endif // !THRILL_API_DISTRIBUTE_HEADER
146 
147 /******************************************************************************/
DIA is the interface between the user and the Thrill framework.
Definition: dia.hpp:141
void PushItem(const ValueType &item) const
Method for derived classes to Push a single item to all children.
Definition: dia_node.hpp:147
void PushData(bool consume) final
Virtual method for pushing data. Triggers actual pushing in sub-classes.
Definition: distribute.hpp:76
size_t source_id_
source worker id, which sends vector
Definition: distribute.hpp:93
STL namespace.
void reset()
release contained pointer, frees object if this is the last reference.
TLX_ATTRIBUTE_ALWAYS_INLINE T Next()
Next() reads a complete item T.
represents a 1 dimensional range (interval) [begin,end)
Definition: math.hpp:41
The Context of a job is a unique instance per worker which holds references to all underlying parts o...
Definition: context.hpp:221
data::CatStreamPtr stream_
Definition: distribute.hpp:95
DistributeNode(Context &ctx, const std::vector< ValueType > &in_vector, size_t source_id)
Definition: distribute.hpp:39
An extra class derived from std::vector<> for delivery of the BlockWriters of a Stream.
Definition: stream_data.hpp:59
data::CatStreamPtr GetNewCatStream(size_t dia_id)
Definition: context.cpp:1209
static Range CalculateLocalRange(size_t global_size, size_t p, size_t i)
Definition: math.hpp:110
size_t end
end index
Definition: math.hpp:58
void Execute() final
Executes the scatter operation: source sends out its data.
Definition: distribute.hpp:56
DistributeNode(Context &ctx, std::vector< ValueType > &&in_vector, size_t source_id)
Definition: distribute.hpp:47
size_t my_rank() const
Global rank of this worker among all other workers in the system.
Definition: context.hpp:243
void vector_free(std::vector< Type > &v)
Definition: vector_free.hpp:21
auto Distribute(Context &ctx, const std::vector< ValueType > &in_vector, size_t source_id=0)
Distribute is a Source DOp, which scatters the vector data from the source_id to all workers...
Definition: distribute.hpp:106
size_t begin
begin index
Definition: math.hpp:56
TLX_ATTRIBUTE_ALWAYS_INLINE bool HasNext()
HasNext() returns true if at least one more item is available.
std::vector< ValueType > in_vector_
Vector (held by value) to read elements from.
Definition: distribute.hpp:91
void Dispose() final
Virtual clear method. Triggers actual disposing in sub-classes.
Definition: distribute.hpp:84
Context & context_
associated Context
Definition: dia_base.hpp:293