Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
concat.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/api/concat.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2016 Timo Bingmann <[email protected]>
7  *
8  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
9  ******************************************************************************/
10 
11 #pragma once
12 #ifndef THRILL_API_CONCAT_HEADER
13 #define THRILL_API_CONCAT_HEADER
14 
#include <thrill/api/dia.hpp>
#include <thrill/api/dop_node.hpp>
#include <thrill/common/functional.hpp>
#include <thrill/common/logger.hpp>
#include <thrill/common/string.hpp>
#include <thrill/data/file.hpp>
#include <tlx/meta/call_foreach_with_index.hpp>
#include <tlx/meta/vexpand.hpp>

#include <algorithm>
#include <atomic>
#include <cassert>
#include <initializer_list>
#include <vector>
27 
28 namespace thrill {
29 namespace api {
30 
31 /*!
32  * \ingroup api_layer
33  */
34 template <typename ValueType>
35 class ConcatNode final : public DOpNode<ValueType>
36 {
37 public:
38  static constexpr bool debug = false;
39 
41  using Super::context_;
42 
43  //! Constructor for variant with variadic parent parameter pack, which each
44  //! parent may have a different FunctionStack.
45  template <typename ParentDIA0, typename... ParentDIAs>
46  explicit ConcatNode(const ParentDIA0& parent0,
47  const ParentDIAs& ... parents)
48  : Super(parent0.ctx(), "Concat",
49  { parent0.id(), parents.id() ... },
50  { parent0.node(), parents.node() ... }),
51  num_inputs_(1 + sizeof ... (ParentDIAs)),
52  // parenthesis are due to a MSVC2015 parser bug
53  parent_stack_empty_({ ParentDIA0::stack_empty, (ParentDIAs::stack_empty)... }) {
54  PrintWarning();
55 
56  files_.reserve(num_inputs_);
57  writers_.reserve(num_inputs_);
58 
59  // allocate files.
60  for (size_t i = 0; i < num_inputs_; ++i)
61  files_.emplace_back(context_.GetFile(this));
62 
63  for (size_t i = 0; i < num_inputs_; ++i)
64  writers_.emplace_back(files_[i].GetWriter());
65 
67  RegisterParent(this), parent0, parents...);
68  }
69 
70  //! Constructor for variant with a std::vector of parents all with the same
71  //! (usually empty) FunctionStack.
72  template <typename ParentDIA>
73  explicit ConcatNode(const std::vector<ParentDIA>& parents)
74  : Super(parents.front().ctx(), "Concat",
75  common::MapVector(
76  parents, [](const ParentDIA& d) { return d.id(); }),
78  parents, [](const ParentDIA& d) {
79  return DIABasePtr(d.node().get());
80  })),
81  num_inputs_(parents.size()),
82  parent_stack_empty_({ ParentDIA::stack_empty })
83  {
84  PrintWarning();
85 
86  files_.reserve(num_inputs_);
87  writers_.reserve(num_inputs_);
88 
89  // allocate files.
90  for (size_t i = 0; i < num_inputs_; ++i)
91  files_.emplace_back(context_.GetFile(this));
92 
93  for (size_t i = 0; i < num_inputs_; ++i)
94  writers_.emplace_back(files_[i].GetWriter());
95 
96  for (size_t i = 0; i < num_inputs_; ++i)
97  {
98  // construct lambda with only the writer in the closure
99  data::File::Writer* writer = &writers_[i];
100  auto pre_op_fn = [writer](const ValueType& input) -> void {
101  writer->Put(input);
102  };
103 
104  // close the function stacks with our pre ops and register it at
105  // parent nodes for output
106  auto lop_chain = parents[i].stack().push(pre_op_fn).fold();
107  parents[i].node()->AddChild(this, lop_chain, i);
108  }
109  }
110 
111  //! Constructor for variant with a std::vector of parents all with the same
112  //! (usually empty) FunctionStack.
113  template <typename ParentDIA>
114  explicit ConcatNode(const std::initializer_list<ParentDIA>& parents)
115  : ConcatNode(std::vector<ParentDIA>(parents)) { }
116 
117  void PrintWarning() {
118  static bool warned_once = false;
119  if (warned_once) return;
120  warned_once = true;
121 
122  LOG1 << "Warning: Concat() is a _very_ expensive data shuffle operation"
123  << " which can usually be avoided.";
124  }
125 
126  //! Register Parent PreOp Hooks, instantiated and called for each Concat
127  //! parent
128  class RegisterParent
129  {
130  public:
131  explicit RegisterParent(ConcatNode* concat_node)
132  : concat_node_(concat_node) { }
133 
134  template <typename Index, typename Parent>
135  void operator () (const Index&, Parent& parent) {
136 
137  // construct lambda with only the writer in the closure
138  data::File::Writer* writer = &concat_node_->writers_[Index::index];
139  auto pre_op_fn = [writer](const ValueType& input) -> void {
140  writer->Put(input);
141  };
142 
143  // close the function stacks with our pre ops and register it at
144  // parent nodes for output
145  auto lop_chain = parent.stack().push(pre_op_fn).fold();
146 
147  parent.node()->AddChild(concat_node_, lop_chain, Index::index);
148  }
149 
150  private:
151  ConcatNode* concat_node_;
152  };
153 
154  //! Receive a whole data::File of ValueType, but only if our stack is empty.
155  bool OnPreOpFile(const data::File& file, size_t parent_index) final {
156  assert(parent_index < num_inputs_);
157 
158  if (num_inputs_ == parent_stack_empty_.size()) {
159  // ConcatNode was constructed from different parents
160  if (!parent_stack_empty_[parent_index]) {
162  << "Concat rejected File from parent "
163  << "due to non-empty function stack.";
164  return false;
165  }
166  }
167  else {
168  // ConcatNode was constructor with a vector of equal parents
169  if (!parent_stack_empty_[0]) {
171  << "Concat rejected File from parent "
172  << "due to non-empty function stack.";
173  return false;
174  }
175  }
176 
177  // accept file
178  assert(files_[parent_index].num_items() == 0);
179  files_[parent_index] = file.Copy();
180  return true;
181  }
182 
183  void StopPreOp(size_t parent_index) final {
184  writers_[parent_index].Close();
185  }
186 
187  //! Executes the concat operation.
188  void Execute() final {
189  LOG << "ConcatNode::Execute() processing";
190 
191  using VectorSizeT = std::vector<size_t>;
192 
193  VectorSizeT local_sizes(num_inputs_);
194  for (size_t i = 0; i < num_inputs_; ++i) {
195  local_sizes[i] = files_[i].num_items();
196  }
197  sLOG << "local_sizes" << local_sizes;
198 
199  VectorSizeT global_sizes = context_.net.AllReduce(
200  local_sizes, common::ComponentSum<VectorSizeT>());
201 
202  sLOG << "global_sizes" << global_sizes;
203 
204  // exclusive prefixsum of whole dia sizes
205  size_t total_items = 0;
206  for (size_t i = 0; i < num_inputs_; ++i) {
207 
208  size_t next_total_items = total_items + global_sizes[i];
209 
210  // on rank 0: add sum to local_sizes
211  if (context_.my_rank() == 0)
212  local_sizes[i] = total_items;
213 
214  total_items = next_total_items;
215  }
216 
217  sLOG << "local_sizes" << local_sizes;
218  sLOG << "total_items" << total_items;
219 
220  VectorSizeT local_ranks = context_.net.PrefixSum(
221  local_sizes, common::ComponentSum<VectorSizeT>(),
222  VectorSizeT(num_inputs_));
223 
224  sLOG << "local_ranks" << local_ranks;
225 
226  streams_.reserve(num_inputs_);
227  for (size_t i = 0; i < num_inputs_; ++i)
228  streams_.emplace_back(context_.GetNewCatStream(this));
229 
230  /*
231  * Data Exchange in Concat (Example):
232  *
233  * / worker0 \ / worker1 \ / worker2 \ / worker3 \
234  * |--256--| |--256--| |--256--| |--256--| DIA0
235  * |----512----| |----512----| |----512----| |----512----| DIA1
236  *
237  * In the steps above, we calculate the global rank of each DIA piece.
238  *
239  * Result:
240  * (global per PE split points)
241  * v v v v
242  * | | | |
243  * |256||256||256||256||--512--||--512--||--512--||--512--|
244  * |------------------||----------------------------------|
245  * (stream0) (stream1)
246  *
247  * With the global ranks of each DIA piece, one can calculate where it
248  * should go. We have to use k CatStreams for the data exchange, since
249  * the last PE must be able to transmit its piece to the first PE before
250  * all those in DIA1. The offset calculation below determines the global
251  * per_pe split point relative to the current DIA piece's global rank:
252  * if the pre_pe split is below the global rank, nothing needs to be
253  * send. Otherwise, one can send only that PE part to the PE. -tb
254  */
255  const size_t num_workers = context_.num_workers();
256  const double pre_pe =
257  static_cast<double>(total_items) / static_cast<double>(num_workers);
258 
259  for (size_t in = 0; in < num_inputs_; ++in) {
260 
261  // calculate offset vector
262  std::vector<size_t> offsets(num_workers + 1, 0);
263  for (size_t p = 0; p < num_workers; ++p) {
264  size_t limit =
265  static_cast<size_t>(static_cast<double>(p) * pre_pe);
266  if (limit < local_ranks[in]) continue;
267 
268  offsets[p] = std::min(limit - local_ranks[in],
269  files_[in].num_items());
270  }
271  offsets[num_workers] = files_[in].num_items();
272 
273  LOG << "offsets[" << in << "] = " << offsets;
274 
275  streams_[in]->template Scatter<ValueType>(
276  files_[in], offsets, /* consume */ true);
277  }
278  }
279 
280  void PushData(bool consume) final {
281 
282  size_t total = 0;
283  // concatenate all CatStreams
284  for (size_t in = 0; in < num_inputs_; ++in) {
286  streams_[in]->GetCatReader(consume);
287 
288  while (reader.HasNext()) {
289  this->PushItem(reader.Next<ValueType>());
290  ++total;
291  }
292  }
293  LOG << "total = " << total;
294  }
295 
296  void Dispose() final {
297  files_.clear();
298  writers_.clear();
299  streams_.clear();
300  }
301 
302 private:
303  //! number of input DIAs
304  const size_t num_inputs_;
305 
306  //! Whether the parent stack is empty
307  const std::vector<bool> parent_stack_empty_;
308 
309  //! Files for intermediate storage
310  std::vector<data::File> files_;
311  //! Writers to intermediate files
312  std::vector<data::File::Writer> writers_;
313 
314  //! Array of CatStreams for exchange
315  std::vector<data::CatStreamPtr> streams_;
316 };
317 
318 /*!
319  * Concat is a DOp, which concatenates any number of DIAs to a single DIA. All
320  * input DIAs must contain the same type, which is also the output DIA's type.
321  *
322  * The concat operation balances all input data, so that each worker will have
323  * an equal number of elements when the concat completes.
324  *
325  * \param first_dia first DIA
326  * \param dias DIAs, which are concatd with the first DIA.
327  *
328  * \ingroup dia_dops
329  */
330 template <typename FirstDIA, typename... DIAs>
331 auto Concat(const FirstDIA& first_dia, const DIAs& ... dias) {
332 
333  tlx::vexpand((first_dia.AssertValid(), 0), (dias.AssertValid(), 0) ...);
334 
335  using ValueType = typename FirstDIA::ValueType;
336 
338 
339  return DIA<ValueType>(tlx::make_counting<ConcatNode>(first_dia, dias...));
340 }
341 
342 /*!
343  * Concat is a DOp, which concatenates any number of DIAs to a single DIA. All
344  * input DIAs must contain the same type, which is also the output DIA's type.
345  *
346  * The concat operation balances all input data, so that each worker will have
347  * an equal number of elements when the concat completes.
348  *
349  * \param dias DIAs, which is concatenated.
350  *
351  * \ingroup dia_dops
352  */
353 template <typename ValueType>
354 auto Concat(const std::initializer_list<DIA<ValueType> >& dias) {
355 
356  for (const DIA<ValueType>& d : dias)
357  d.AssertValid();
358 
360 
361  return DIA<ValueType>(tlx::make_counting<ConcatNode>(dias));
362 }
363 
364 /*!
365  * Concat is a DOp, which concatenates any number of DIAs to a single DIA. All
366  * input DIAs must contain the same type, which is also the output DIA's type.
367  *
368  * The concat operation balances all input data, so that each worker will have
369  * an equal number of elements when the concat completes.
370  *
371  * \param dias DIAs, which is concatenated.
372  *
373  * \ingroup dia_dops
374  */
375 template <typename ValueType>
376 auto Concat(const std::vector<DIA<ValueType> >& dias) {
377 
378  for (const DIA<ValueType>& d : dias)
379  d.AssertValid();
380 
382 
383  return DIA<ValueType>(tlx::make_counting<ConcatNode>(dias));
384 }
385 
386 template <typename ValueType, typename Stack>
387 template <typename SecondDIA>
389  const SecondDIA& second_dia) const {
390  return api::Concat(*this, second_dia);
391 }
392 
393 } // namespace api
394 
395 //! imported from api namespace
396 using api::Concat;
397 
398 } // namespace thrill
399 
400 #endif // !THRILL_API_CONCAT_HEADER
401 
402 /******************************************************************************/
net::FlowControlChannel & net
Definition: context.hpp:446
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:34
DIA is the interface between the user and the Thrill framework.
Definition: dia.hpp:141
void StopPreOp(size_t parent_index) final
Virtual method for preparing end of PushData.
Definition: concat.hpp:183
size_t my_rank() const
Global rank of this worker among all other workers in the system.
Definition: context.hpp:243
void vexpand(Types &&...)
Definition: vexpand.hpp:24
#define LOG1
Definition: logger.hpp:28
A File is an ordered sequence of Block objects for storing items.
Definition: file.hpp:56
auto Concat(const SecondDIA &second_dia) const
Concat is a DOp, which concatenates any number of DIAs to a single DIA.
Definition: concat.hpp:388
TLX_ATTRIBUTE_ALWAYS_INLINE T Next()
Next() reads a complete item T.
std::vector< data::CatStreamPtr > streams_
Array of CatStreams for exchange.
Definition: concat.hpp:315
static constexpr bool g_debug_push_file
Definition: config.hpp:44
T TLX_ATTRIBUTE_WARN_UNUSED_RESULT AllReduce(const T &value, const BinarySumOp &sum_op=BinarySumOp())
Reduces a value of a serializable type T over all workers given a certain reduce function.
auto MapVector(const std::vector< Type > &input, const Functor &f) -> std::vector< typename std::result_of< Functor(Type)>::type >
Definition: functional.hpp:98
bool OnPreOpFile(const data::File &file, size_t parent_index) final
Receive a whole data::File of ValueType, but only if our stack is empty.
Definition: concat.hpp:155
BlockWriter< FileBlockSink > Writer
Definition: file.hpp:59
common::ComponentSum: template for computing the component-wise sum of std::array or std::vector.
Definition: functional.hpp:114
data::CatStreamPtr GetNewCatStream(size_t dia_id)
Definition: context.cpp:1147
void PushItem(const ValueType &item) const
Method for derived classes to Push a single item to all children.
Definition: dia_node.hpp:147
ConcatNode(const ParentDIA0 &parent0, const ParentDIAs &...parents)
Definition: concat.hpp:46
std::vector< data::File > files_
Files for intermediate storage.
Definition: concat.hpp:310
static constexpr bool debug
Definition: concat.hpp:38
void PushData(bool consume) final
Virtual method for pushing data. Triggers actual pushing in sub-classes.
Definition: concat.hpp:280
std::vector< data::File::Writer > writers_
Writers to intermediate files.
Definition: concat.hpp:312
std::vector< T, Allocator< T > > vector
vector with Manager tracking
Definition: allocator.hpp:228
data::File GetFile(size_t dia_id)
Returns a new File object containing a sequence of local Blocks.
Definition: context.hpp:283
A DOpNode is a typed node representing distributed operations in Thrill.
Definition: dop_node.hpp:32
static uint_pair min()
return a uint_pair instance containing the smallest value possible
Definition: uint_types.hpp:217
void Dispose() final
Virtual clear method. Triggers actual disposing in sub-classes.
Definition: concat.hpp:296
tlx::CountingPtr< DIABase > DIABasePtr
Definition: dia_base.hpp:90
void Execute() final
Executes the concat operation.
Definition: concat.hpp:188
DOpNode< ValueType > Super
Definition: concat.hpp:40
const std::vector< DIABasePtr > & parents() const
Returns the parents of this DIABase.
Definition: dia_base.hpp:253
void call_foreach_with_index(Functor &&f, Args &&...args)
TLX_ATTRIBUTE_ALWAYS_INLINE bool HasNext()
HasNext() returns true if at least one more item is available.
size_t num_workers() const
Global number of workers in the system.
Definition: context.hpp:251
auto Concat(const FirstDIA &first_dia, const DIAs &...dias)
Concat is a DOp, which concatenates any number of DIAs to a single DIA.
Definition: concat.hpp:331
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:24
Context & context_
associated Context
Definition: dia_base.hpp:293
#define LOGC(cond)
Explicitly specify the condition for logging.
Definition: logger.hpp:21
T TLX_ATTRIBUTE_WARN_UNUSED_RESULT PrefixSum(const T &value, const BinarySumOp &sum_op=BinarySumOp(), const T &initial=T())
Calculates the inclusive prefix sum over all workers, given a certain sum operation.
const std::vector< bool > parent_stack_empty_
Whether the parent stack is empty.
Definition: concat.hpp:307
const size_t num_inputs_
number of input DIAs
Definition: concat.hpp:304