Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
concat.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/api/concat.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2016 Timo Bingmann <[email protected]>
7  *
8  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
9  ******************************************************************************/
10 
11 #pragma once
12 #ifndef THRILL_API_CONCAT_HEADER
13 #define THRILL_API_CONCAT_HEADER
14 
#include <thrill/api/dia.hpp>
#include <thrill/api/dop_node.hpp>
#include <thrill/common/functional.hpp>
#include <thrill/common/logger.hpp>
#include <thrill/common/string.hpp>
#include <thrill/data/file.hpp>
#include <tlx/meta/call_foreach_with_index.hpp>
#include <tlx/meta/vexpand.hpp>

#include <algorithm>
#include <initializer_list>
#include <type_traits>
#include <vector>
27 
28 namespace thrill {
29 namespace api {
30 
31 /*!
32  * \ingroup api_layer
33  */
34 template <typename ValueType>
35 class ConcatNode final : public DOpNode<ValueType>
36 {
37 public:
38  static constexpr bool debug = false;
39 
41  using Super::context_;
42 
43  //! Constructor for variant with variadic parent parameter pack, which each
44  //! parent may have a different FunctionStack.
45  template <typename ParentDIA0, typename... ParentDIAs>
46  explicit ConcatNode(const ParentDIA0& parent0,
47  const ParentDIAs& ... parents)
48  : Super(parent0.ctx(), "Concat",
49  { parent0.id(), parents.id() ... },
50  { parent0.node(), parents.node() ... }),
51  num_inputs_(1 + sizeof ... (ParentDIAs)),
52  // parenthesis are due to a MSVC2015 parser bug
53  parent_stack_empty_({ ParentDIA0::stack_empty, (ParentDIAs::stack_empty)... })
54  {
55  PrintWarning();
56 
57  files_.reserve(num_inputs_);
58  writers_.reserve(num_inputs_);
59 
60  // allocate files.
61  for (size_t i = 0; i < num_inputs_; ++i)
62  files_.emplace_back(context_.GetFile(this));
63 
64  for (size_t i = 0; i < num_inputs_; ++i)
65  writers_.emplace_back(files_[i].GetWriter());
66 
68  RegisterParent(this), parent0, parents...);
69  }
70 
71  //! Constructor for variant with a std::vector of parents all with the same
72  //! (usually empty) FunctionStack.
73  template <typename ParentDIA>
74  explicit ConcatNode(const std::vector<ParentDIA>& parents)
75  : Super(parents.front().ctx(), "Concat",
76  common::MapVector(
77  parents, [](const ParentDIA& d) { return d.id(); }),
79  parents, [](const ParentDIA& d) {
80  return DIABasePtr(d.node().get());
81  })),
82  num_inputs_(parents.size()),
83  parent_stack_empty_({ ParentDIA::stack_empty })
84  {
85  PrintWarning();
86 
87  files_.reserve(num_inputs_);
88  writers_.reserve(num_inputs_);
89 
90  // allocate files.
91  for (size_t i = 0; i < num_inputs_; ++i)
92  files_.emplace_back(context_.GetFile(this));
93 
94  for (size_t i = 0; i < num_inputs_; ++i)
95  writers_.emplace_back(files_[i].GetWriter());
96 
97  for (size_t i = 0; i < num_inputs_; ++i)
98  {
99  // construct lambda with only the writer in the closure
100  data::File::Writer* writer = &writers_[i];
101  auto pre_op_fn = [writer](const ValueType& input) -> void {
102  writer->Put(input);
103  };
104 
105  // close the function stacks with our pre ops and register it at
106  // parent nodes for output
107  auto lop_chain = parents[i].stack().push(pre_op_fn).fold();
108  parents[i].node()->AddChild(this, lop_chain, i);
109  }
110  }
111 
112  //! Constructor for variant with a std::vector of parents all with the same
113  //! (usually empty) FunctionStack.
114  template <typename ParentDIA>
115  explicit ConcatNode(const std::initializer_list<ParentDIA>& parents)
116  : ConcatNode(std::vector<ParentDIA>(parents)) { }
117 
118  void PrintWarning() {
119  static bool warned_once = false;
120  if (warned_once) return;
121  warned_once = true;
122 
123  LOG1 << "Warning: Concat() is a _very_ expensive data shuffle operation"
124  << " which can usually be avoided.";
125  }
126 
127  //! Register Parent PreOp Hooks, instantiated and called for each Concat
128  //! parent
129  class RegisterParent
130  {
131  public:
132  explicit RegisterParent(ConcatNode* concat_node)
133  : concat_node_(concat_node) { }
134 
135  template <typename Index, typename Parent>
136  void operator () (const Index&, Parent& parent) {
137 
138  // construct lambda with only the writer in the closure
139  data::File::Writer* writer = &concat_node_->writers_[Index::index];
140  auto pre_op_fn = [writer](const ValueType& input) -> void {
141  writer->Put(input);
142  };
143 
144  // close the function stacks with our pre ops and register it at
145  // parent nodes for output
146  auto lop_chain = parent.stack().push(pre_op_fn).fold();
147 
148  parent.node()->AddChild(concat_node_, lop_chain, Index::index);
149  }
150 
151  private:
152  ConcatNode* concat_node_;
153  };
154 
155  //! Receive a whole data::File of ValueType, but only if our stack is empty.
156  bool OnPreOpFile(const data::File& file, size_t parent_index) final {
157  assert(parent_index < num_inputs_);
158 
159  if (num_inputs_ == parent_stack_empty_.size()) {
160  // ConcatNode was constructed from different parents
161  if (!parent_stack_empty_[parent_index]) {
163  << "Concat rejected File from parent "
164  << "due to non-empty function stack.";
165  return false;
166  }
167  }
168  else {
169  // ConcatNode was constructor with a vector of equal parents
170  if (!parent_stack_empty_[0]) {
172  << "Concat rejected File from parent "
173  << "due to non-empty function stack.";
174  return false;
175  }
176  }
177 
178  // accept file
179  assert(files_[parent_index].num_items() == 0);
180  files_[parent_index] = file.Copy();
181  return true;
182  }
183 
184  void StopPreOp(size_t id) final {
185  writers_[id].Close();
186  }
187 
188  //! Executes the concat operation.
189  void Execute() final {
190  LOG << "ConcatNode::Execute() processing";
191 
192  using VectorSizeT = std::vector<size_t>;
193 
194  VectorSizeT local_sizes(num_inputs_);
195  for (size_t i = 0; i < num_inputs_; ++i) {
196  local_sizes[i] = files_[i].num_items();
197  }
198  sLOG << "local_sizes" << local_sizes;
199 
200  VectorSizeT global_sizes = context_.net.AllReduce(
201  local_sizes, common::ComponentSum<VectorSizeT>());
202 
203  sLOG << "global_sizes" << global_sizes;
204 
205  // exclusive prefixsum of whole dia sizes
206  size_t total_items = 0;
207  for (size_t i = 0; i < num_inputs_; ++i) {
208 
209  size_t next_total_items = total_items + global_sizes[i];
210 
211  // on rank 0: add sum to local_sizes
212  if (context_.my_rank() == 0)
213  local_sizes[i] = total_items;
214 
215  total_items = next_total_items;
216  }
217 
218  sLOG << "local_sizes" << local_sizes;
219  sLOG << "total_items" << total_items;
220 
221  VectorSizeT local_ranks = context_.net.PrefixSum(
222  local_sizes, common::ComponentSum<VectorSizeT>(),
223  VectorSizeT(num_inputs_));
224 
225  sLOG << "local_ranks" << local_ranks;
226 
227  streams_.reserve(num_inputs_);
228  for (size_t i = 0; i < num_inputs_; ++i)
229  streams_.emplace_back(context_.GetNewCatStream(this));
230 
231  /*
232  * Data Exchange in Concat (Example):
233  *
234  * / worker0 \ / worker1 \ / worker2 \ / worker3 \
235  * |--256--| |--256--| |--256--| |--256--| DIA0
236  * |----512----| |----512----| |----512----| |----512----| DIA1
237  *
238  * In the steps above, we calculate the global rank of each DIA piece.
239  *
240  * Result:
241  * (global per PE split points)
242  * v v v v
243  * | | | |
244  * |256||256||256||256||--512--||--512--||--512--||--512--|
245  * |------------------||----------------------------------|
246  * (stream0) (stream1)
247  *
248  * With the global ranks of each DIA piece, one can calculate where it
249  * should go. We have to use k CatStreams for the data exchange, since
250  * the last PE must be able to transmit its piece to the first PE before
251  * all those in DIA1. The offset calculation below determines the global
252  * per_pe split point relative to the current DIA piece's global rank:
253  * if the pre_pe split is below the global rank, nothing needs to be
254  * send. Otherwise, one can send only that PE part to the PE. -tb
255  */
256  const size_t num_workers = context_.num_workers();
257  const double pre_pe =
258  static_cast<double>(total_items) / static_cast<double>(num_workers);
259 
260  for (size_t in = 0; in < num_inputs_; ++in) {
261 
262  // calculate offset vector
263  std::vector<size_t> offsets(num_workers + 1, 0);
264  for (size_t p = 0; p < num_workers; ++p) {
265  size_t limit =
266  static_cast<size_t>(static_cast<double>(p) * pre_pe);
267  if (limit < local_ranks[in]) continue;
268 
269  offsets[p] = std::min(limit - local_ranks[in],
270  files_[in].num_items());
271  }
272  offsets[num_workers] = files_[in].num_items();
273 
274  LOG << "offsets[" << in << "] = " << offsets;
275 
276  streams_[in]->template Scatter<ValueType>(
277  files_[in], offsets, /* consume */ true);
278  }
279  }
280 
281  void PushData(bool consume) final {
282 
283  size_t total = 0;
284  // concatenate all CatStreams
285  for (size_t in = 0; in < num_inputs_; ++in) {
287  streams_[in]->GetCatReader(consume);
288 
289  while (reader.HasNext()) {
290  this->PushItem(reader.Next<ValueType>());
291  ++total;
292  }
293  }
294  LOG << "total = " << total;
295  }
296 
297  void Dispose() final {
298  files_.clear();
299  writers_.clear();
300  streams_.clear();
301  }
302 
303 private:
304  //! number of input DIAs
305  const size_t num_inputs_;
306 
307  //! Whether the parent stack is empty
308  const std::vector<bool> parent_stack_empty_;
309 
310  //! Files for intermediate storage
311  std::vector<data::File> files_;
312  //! Writers to intermediate files
313  std::vector<data::File::Writer> writers_;
314 
315  //! Array of CatStreams for exchange
316  std::vector<data::CatStreamPtr> streams_;
317 };
318 
319 /*!
320  * Concat is a DOp, which concatenates any number of DIAs to a single DIA. All
321  * input DIAs must contain the same type, which is also the output DIA's type.
322  *
323  * The concat operation balances all input data, so that each worker will have
324  * an equal number of elements when the concat completes.
325  *
326  * \param first_dia first DIA
327  * \param dias DIAs, which are concatd with the first DIA.
328  *
329  * \ingroup dia_dops
330  */
331 template <typename FirstDIA, typename... DIAs>
332 auto Concat(const FirstDIA& first_dia, const DIAs& ... dias) {
333 
334  tlx::vexpand((first_dia.AssertValid(), 0), (dias.AssertValid(), 0) ...);
335 
336  using ValueType = typename FirstDIA::ValueType;
337 
339 
340  return DIA<ValueType>(tlx::make_counting<ConcatNode>(first_dia, dias...));
341 }
342 
343 /*!
344  * Concat is a DOp, which concatenates any number of DIAs to a single DIA. All
345  * input DIAs must contain the same type, which is also the output DIA's type.
346  *
347  * The concat operation balances all input data, so that each worker will have
348  * an equal number of elements when the concat completes.
349  *
350  * \param dias DIAs, which is concatenated.
351  *
352  * \ingroup dia_dops
353  */
354 template <typename ValueType>
355 auto Concat(const std::initializer_list<DIA<ValueType> >& dias) {
356 
357  for (const DIA<ValueType>& d : dias)
358  d.AssertValid();
359 
361 
362  return DIA<ValueType>(tlx::make_counting<ConcatNode>(dias));
363 }
364 
365 /*!
366  * Concat is a DOp, which concatenates any number of DIAs to a single DIA. All
367  * input DIAs must contain the same type, which is also the output DIA's type.
368  *
369  * The concat operation balances all input data, so that each worker will have
370  * an equal number of elements when the concat completes.
371  *
372  * \param dias DIAs, which is concatenated.
373  *
374  * \ingroup dia_dops
375  */
376 template <typename ValueType>
377 auto Concat(const std::vector<DIA<ValueType> >& dias) {
378 
379  for (const DIA<ValueType>& d : dias)
380  d.AssertValid();
381 
383 
384  return DIA<ValueType>(tlx::make_counting<ConcatNode>(dias));
385 }
386 
387 template <typename ValueType, typename Stack>
388 template <typename SecondDIA>
390  const SecondDIA& second_dia) const {
391  return api::Concat(*this, second_dia);
392 }
393 
394 } // namespace api
395 
396 //! imported from api namespace
397 using api::Concat;
398 
399 } // namespace thrill
400 
401 #endif // !THRILL_API_CONCAT_HEADER
402 
403 /******************************************************************************/
void StopPreOp(size_t id) final
Virtual method for preparing end of PushData.
Definition: concat.hpp:184
net::FlowControlChannel & net
Definition: context.hpp:443
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:152
DIA is the interface between the user and the Thrill framework.
Definition: dia.hpp:141
size_t my_rank() const
Global rank of this worker among all other workers in the system.
Definition: context.hpp:240
void vexpand(Types &&...)
Definition: vexpand.hpp:24
#define LOG1
Definition: logger.hpp:145
A File is an ordered sequence of Block objects for storing items.
Definition: file.hpp:56
auto Concat(const SecondDIA &second_dia) const
Concat is a DOp, which concatenates any number of DIAs to a single DIA.
Definition: concat.hpp:389
TLX_ATTRIBUTE_ALWAYS_INLINE T Next()
Next() reads a complete item T.
std::vector< data::CatStreamPtr > streams_
Array of CatStreams for exchange.
Definition: concat.hpp:316
static constexpr bool g_debug_push_file
Definition: config.hpp:44
T TLX_ATTRIBUTE_WARN_UNUSED_RESULT AllReduce(const T &value, const BinarySumOp &sum_op=BinarySumOp())
Reduces a value of a serializable type T over all workers given a certain reduce function.
auto MapVector(const std::vector< Type > &input, const Functor &f) -> std::vector< typename std::result_of< Functor(Type)>::type >
Definition: functional.hpp:98
bool OnPreOpFile(const data::File &file, size_t parent_index) final
Receive a whole data::File of ValueType, but only if our stack is empty.
Definition: concat.hpp:156
BlockWriter< FileBlockSink > Writer
Definition: file.hpp:59
const size_t & id() const
return unique id() of DIANode subclass as stored by StatsNode
Definition: dia_base.hpp:213
template for computing the component-wise sum of std::array or std::vector.
Definition: functional.hpp:114
data::CatStreamPtr GetNewCatStream(size_t dia_id)
Definition: context.cpp:1120
void PushItem(const ValueType &item) const
Method for derived classes to Push a single item to all children.
Definition: dia_node.hpp:147
ConcatNode(const ParentDIA0 &parent0, const ParentDIAs &...parents)
Definition: concat.hpp:46
std::vector< data::File > files_
Files for intermediate storage.
Definition: concat.hpp:311
static constexpr bool debug
Definition: concat.hpp:38
void PushData(bool consume) final
Virtual method for pushing data. Triggers actual pushing in sub-classes.
Definition: concat.hpp:281
std::vector< data::File::Writer > writers_
Writers to intermediate files.
Definition: concat.hpp:313
std::vector< T, Allocator< T > > vector
vector with Manager tracking
Definition: allocator.hpp:228
data::File GetFile(size_t dia_id)
Returns a new File object containing a sequence of local Blocks.
Definition: context.hpp:280
A DOpNode is a typed node representing a distributed operation in Thrill.
Definition: dop_node.hpp:32
static uint_pair min()
return an uint_pair instance containing the smallest value possible
Definition: uint_types.hpp:217
void Dispose() final
Virtual clear method. Triggers actual disposing in sub-classes.
Definition: concat.hpp:297
tlx::CountingPtr< DIABase > DIABasePtr
Definition: dia_base.hpp:90
void Execute() final
Executes the concat operation.
Definition: concat.hpp:189
DOpNode< ValueType > Super
Definition: concat.hpp:40
const std::vector< DIABasePtr > & parents() const
Returns the parents of this DIABase.
Definition: dia_base.hpp:253
void call_foreach_with_index(Functor &&f, Args &&...args)
TLX_ATTRIBUTE_ALWAYS_INLINE bool HasNext()
HasNext() returns true if at least one more item is available.
size_t num_workers() const
Global number of workers in the system.
Definition: context.hpp:248
auto Concat(const FirstDIA &first_dia, const DIAs &...dias)
Concat is a DOp, which concatenates any number of DIAs to a single DIA.
Definition: concat.hpp:332
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:141
Context & context_
associated Context
Definition: dia_base.hpp:293
#define LOGC(cond)
Explicitly specify the condition for logging.
Definition: logger.hpp:137
T TLX_ATTRIBUTE_WARN_UNUSED_RESULT PrefixSum(const T &value, const BinarySumOp &sum_op=BinarySumOp(), const T &initial=T())
Calculates the inclusive prefix sum over all workers, given a certain sum operation.
const std::vector< bool > parent_stack_empty_
Whether the parent stack is empty.
Definition: concat.hpp:308
const size_t num_inputs_
number of input DIAs
Definition: concat.hpp:305