Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
distribute.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/api/distribute.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Timo Bingmann <[email protected]>
7  *
8  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
9  ******************************************************************************/
10 
11 #pragma once
12 #ifndef THRILL_API_DISTRIBUTE_HEADER
13 #define THRILL_API_DISTRIBUTE_HEADER
14 
15 #include <thrill/api/dia.hpp>
17 #include <thrill/common/logger.hpp>
18 
19 #include <vector>
20 
21 namespace thrill {
22 namespace api {
23 
24 //! \ingroup api_layer
25 //! \{
26 
27 /*!
28  * \ingroup api_layer
29  */
30 template <typename ValueType>
31 class DistributeNode final : public SourceNode<ValueType>
32 {
33 public:
35  using Super::context_;
36 
38  const std::vector<ValueType>& in_vector,
39  size_t source_id)
40  : Super(ctx, "Distribute"),
41  in_vector_(in_vector),
42  source_id_(source_id)
43  { }
44 
46  std::vector<ValueType>&& in_vector,
47  size_t source_id)
48  : Super(ctx, "Distribute"),
49  in_vector_(std::move(in_vector)),
50  source_id_(source_id)
51  { }
52 
53  //! Executes the scatter operation: source sends out its data. Only the worker whose rank equals source_id_ writes; it puts slice w of in_vector_ into writer w.
54  void Execute() final {
55 
56  data::CatStream::Writers emitters = stream_->GetWriters(); // vector-like collection of writers, indexed by w below
57 
58  if (context_.my_rank() == source_id_) // all other workers skip the sending loop entirely
59  {
60  size_t in_size = in_vector_.size();
61 
62  for (size_t w = 0; w < emitters.size(); ++w) {
63 
64  common::Range local = // half-open index range [begin,end) of in_vector_ assigned to writer w
65  common::CalculateLocalRange(in_size, emitters.size(), w);
66 
67  for (size_t i = local.begin; i < local.end; ++i) {
68  emitters[w].Put(in_vector_[i]);
69  }
70  }
71  }
72  }
73 
74  void PushData(bool consume) final { // forwards every item readable from stream_ to the children via PushItem()
75  data::CatStream::CatReader readers = stream_->GetCatReader(consume); // consume flag is handed to the reader unchanged
76 
77  while (readers.HasNext()) {
78  this->PushItem(readers.Next<ValueType>()); // items are read back as ValueType, the type Execute() put in
79  }
80  }
81 
82  void Dispose() final { // drops the input vector and this node's reference to the stream
83  std::vector<ValueType>().swap(in_vector_); // swap with an empty temporary so the old buffer is released, not merely cleared
84  stream_.reset(); // releases the pointer; the stream object is freed if this was the last reference
85  }
86 
87 private:
88  //! Vector of elements to distribute; held by value (copied or moved in by the constructors).
89  std::vector<ValueType> in_vector_;
90  //! Rank of the source worker, i.e. the worker that sends out the vector.
91  size_t source_id_;
92 
94 };
95 
96 /*!
97  * Distribute is a Source DOp, which scatters the vector data from the
98  * source_id to all workers, partitioning equally, and returning the data in a
99  * DIA.
100  *
101  * \ingroup dia_sources
102  */
103 template <typename ValueType>
105  Context& ctx,
106  const std::vector<ValueType>& in_vector, size_t source_id = 0) {
107 
109 
110  auto node = tlx::make_counting<DistributeNode>(
111  ctx, in_vector, source_id);
112 
113  return DIA<ValueType>(node);
114 }
115 
116 /*!
117  * Distribute is a Source DOp, which scatters the vector data from the
118  * source_id to all workers, partitioning equally, and returning the data in a
119  * DIA.
120  *
121  * \ingroup dia_sources
122  */
123 template <typename ValueType>
125  Context& ctx,
126  std::vector<ValueType>&& in_vector, size_t source_id = 0) {
127 
129 
130  auto node = tlx::make_counting<DistributeNode>(
131  ctx, std::move(in_vector), source_id);
132 
133  return DIA<ValueType>(node);
134 }
135 
136 } // namespace api
137 
138 //! imported from api namespace
139 using api::Distribute;
140 
141 } // namespace thrill
142 
143 #endif // !THRILL_API_DISTRIBUTE_HEADER
144 
145 /******************************************************************************/
DIA is the interface between the user and the Thrill framework.
Definition: dia.hpp:141
void PushData(bool consume) final
Virtual method for pushing data. Triggers actual pushing in sub-classes.
Definition: distribute.hpp:74
size_t my_rank() const
Global rank of this worker among all other workers in the system.
Definition: context.hpp:240
size_t source_id_
source worker id, which sends vector
Definition: distribute.hpp:91
void reset()
release contained pointer, frees object if this is the last reference.
TLX_ATTRIBUTE_ALWAYS_INLINE T Next()
Next() reads a complete item T.
represents a 1 dimensional range (interval) [begin,end)
Definition: math.hpp:40
The Context of a job is a unique instance per worker which holds references to all underlying parts o...
Definition: context.hpp:218
data::CatStreamPtr stream_
Definition: distribute.hpp:93
DistributeNode(Context &ctx, const std::vector< ValueType > &in_vector, size_t source_id)
Definition: distribute.hpp:37
An extra class derived from std::vector<> for delivery of the BlockWriters of a Stream.
Definition: stream_data.hpp:56
data::CatStreamPtr GetNewCatStream(size_t dia_id)
Definition: context.cpp:1144
void PushItem(const ValueType &item) const
Method for derived classes to Push a single item to all children.
Definition: dia_node.hpp:147
static Range CalculateLocalRange(size_t global_size, size_t p, size_t i)
Definition: math.hpp:109
void swap(CountingPtr< A, D > &a1, CountingPtr< A, D > &a2) noexcept
size_t end
end index
Definition: math.hpp:57
void Execute() final
Executes the scatter operation: source sends out its data.
Definition: distribute.hpp:54
DistributeNode(Context &ctx, std::vector< ValueType > &&in_vector, size_t source_id)
Definition: distribute.hpp:45
auto Distribute(Context &ctx, const std::vector< ValueType > &in_vector, size_t source_id=0)
Distribute is a Source DOp, which scatters the vector data from the source_id to all workers...
Definition: distribute.hpp:104
size_t begin
begin index
Definition: math.hpp:55
TLX_ATTRIBUTE_ALWAYS_INLINE bool HasNext()
HasNext() returns true if at least one more item is available.
std::vector< ValueType > in_vector_
Vector of elements to distribute; held by value (copied or moved in by the constructors).
Definition: distribute.hpp:89
void Dispose() final
Virtual clear method. Triggers actual disposing in sub-classes.
Definition: distribute.hpp:82
Context & context_
associated Context
Definition: dia_base.hpp:293