Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
merge.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/api/merge.hpp
3  *
4  * DIANode for a merge operation. Performs the actual merge operation
5  *
6  * Part of Project Thrill - http://project-thrill.org
7  *
8  * Copyright (C) 2015-2016 Timo Bingmann <[email protected]>
9  * Copyright (C) 2015 Emanuel Jöbstl <[email protected]>
10  *
11  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
12  ******************************************************************************/
13 
14 #pragma once
15 #ifndef THRILL_API_MERGE_HEADER
16 #define THRILL_API_MERGE_HEADER
17 
#include <thrill/api/dia.hpp>
#include <thrill/api/dop_node.hpp>
#include <thrill/common/functional.hpp>
#include <thrill/common/logger.hpp>
#include <thrill/common/stats_timer.hpp>
#include <thrill/common/string.hpp>
#include <thrill/core/multiway_merge.hpp>
#include <thrill/data/cat_stream.hpp>
#include <thrill/data/file.hpp>

#include <tlx/math/abs_diff.hpp>
#include <tlx/meta/call_foreach_with_index.hpp>
#include <tlx/meta/vexpand.hpp>

#include <algorithm>
#include <array>
#include <functional>
#include <random>
#include <sstream>
#include <string>
#include <vector>
39 
40 namespace thrill {
41 namespace api {
42 
43 /*!
44  * Implementation of Thrill's merge. This merge implementation balances all data
45  * before merging, so each worker has the same amount of data when merge
46  * finishes.
47  *
48  * The algorithm performs a distributed multi-sequence selection by picking
49  * random pivots (from the largest remaining interval) for each DIA. The pivots
50  * are selected via a global AllReduce. There is one pivot per DIA.
51  *
52  * Then the pivots are searched for in the interval [left,left + width) in each
53  * local File's partition, where these are initialized with left = 0 and width =
54  * File.size(). This delivers the local_rank of each pivot. From the local_ranks
55  * the corresponding global_ranks of each pivot is calculated via a AllReduce.
56  *
57  * The global_ranks are then compared to the target_ranks (which are n/p *
58  * rank). If global_ranks is smaller, the interval [left,left + width) is
59  * reduced to [left,idx), where idx is the rank of the pivot in the local
60  * File. If global_ranks is larger, the interval is reduced to [idx,left+width).
61  *
62  * left -> width
63  * V V V V V V
64  * +------------+ +-----------+ +-------------------+ DIA 0
65  * ^
66  * local_ranks, global_ranks = sum over all local_ranks
67  *
68  * \tparam ValueType The type of the first and second input DIA
69  * \tparam Comparator The comparator defining input and output order.
70  * \tparam ParentDIA0 The type of the first input DIA
71  * \tparam ParentDIAs The types of the other input DIAs
72  *
73  * \ingroup api_layer
74  */
75 template <typename ValueType, typename Comparator, size_t kNumInputs>
76 class MergeNode : public DOpNode<ValueType>
77 {
78  static constexpr bool debug = false;
79  static constexpr bool self_verify = debug && common::g_debug_mode;
80 
81  //! Set this variable to true to enable generation and output of merge stats
82  static constexpr bool stats_enabled = false;
83 
85  using Super::context_;
86 
87  static_assert(kNumInputs >= 2, "Merge requires at least two inputs.");
88 
89 public:
90  template <typename ParentDIA0, typename... ParentDIAs>
91  MergeNode(const Comparator& comparator,
92  const ParentDIA0& parent0, const ParentDIAs& ... parents)
93  : Super(parent0.ctx(), "Merge",
94  { parent0.id(), parents.id() ... },
95  { parent0.node(), parents.node() ... }),
96  comparator_(comparator),
97  // this weirdness is due to a MSVC2015 parser bug
99  std::array<bool, kNumInputs>{
100  { ParentDIA0::stack_empty, (ParentDIAs::stack_empty)... }
101  })
102  {
103  // allocate files.
104  for (size_t i = 0; i < kNumInputs; ++i)
105  files_[i] = context_.GetFilePtr(this);
106 
107  for (size_t i = 0; i < kNumInputs; ++i)
108  writers_[i] = files_[i]->GetWriter();
109 
111  RegisterParent(this), parent0, parents...);
112  }
113 
114  //! Register Parent PreOp Hooks, instantiated and called for each Merge
115  //! parent
116  class RegisterParent
117  {
118  public:
119  explicit RegisterParent(MergeNode* merge_node)
120  : merge_node_(merge_node) { }
121 
122  template <typename Index, typename Parent>
123  void operator () (const Index&, Parent& parent) {
124 
125  // construct lambda with only the writer in the closure
126  data::File::Writer* writer = &merge_node_->writers_[Index::index];
127  auto pre_op_fn = [writer](const ValueType& input) -> void {
128  writer->Put(input);
129  };
130 
131  // close the function stacks with our pre ops and register it at
132  // parent nodes for output
133  auto lop_chain = parent.stack().push(pre_op_fn).fold();
134 
135  parent.node()->AddChild(merge_node_, lop_chain, Index::index);
136  }
137 
138  private:
139  MergeNode* merge_node_;
140  };
141 
142  //! Receive a whole data::File of ValueType, but only if our stack is empty.
143  bool OnPreOpFile(const data::File& file, size_t parent_index) final {
144  assert(parent_index < kNumInputs);
145  if (!parent_stack_empty_[parent_index]) return false;
146 
147  // accept file
148  assert(files_[parent_index]->num_items() == 0);
149  *files_[parent_index] = file.Copy();
150  return true;
151  }
152 
    //! End of PreOp for input `id`: close its Writer so the intermediate
    //! File is sealed before Execute() runs.
    void StopPreOp(size_t id) final {
        writers_[id].Close();
    }
156 
    //! Triggered by the framework once all pre-ops finished: runs the
    //! distributed splitter search and Scatter (MainOp).
    void Execute() final {
        MainOp();
    }
160 
161  void PushData(bool consume) final {
162  size_t result_count = 0;
163  static constexpr bool debug = false;
164 
165  stats_.merge_timer_.Start();
166 
167  // get inbound readers from all Channels
168  std::vector<data::CatStream::CatReader> readers;
169  readers.reserve(kNumInputs);
170 
171  for (size_t i = 0; i < kNumInputs; i++)
172  readers.emplace_back(streams_[i]->GetCatReader(consume));
173 
174  auto puller = core::make_multiway_merge_tree<ValueType>(
175  readers.begin(), readers.end(), comparator_);
176 
177  while (puller.HasNext())
178  this->PushItem(puller.Next());
179 
180  stats_.merge_timer_.Stop();
181 
182  sLOG << "Merge: result_count" << result_count;
183 
184  stats_.result_size_ = result_count;
186  }
187 
    //! Nothing to dispose: the intermediate Files are consumed by Scatter
    //! in MainOp() (consume == true).
    void Dispose() final { }
189 
190 private:
191  //! Merge comparator
192  Comparator comparator_;
193 
194  //! Whether the parent stack is empty
195  const std::array<bool, kNumInputs> parent_stack_empty_;
196 
197  //! Files for intermediate storage
198  data::FilePtr files_[kNumInputs];
199 
200  //! Writers to intermediate files
202 
203  //! Array of inbound CatStreams
205 
206  struct Pivot {
207  ValueType value;
208  size_t tie_idx;
209  size_t segment_len;
210  };
211 
212  //! Count of items on all prev workers.
213  size_t prefix_size_;
214 
215  using ArrayNumInputsSizeT = std::array<size_t, kNumInputs>;
216 
217  //! Logging helper to print vectors of vectors of pivots.
218  static std::string VToStr(const std::vector<Pivot>& data) {
219  std::stringstream oss;
220  for (const Pivot& elem : data) {
221  oss << "(" << elem.value
222  << ", itie: " << elem.tie_idx
223  << ", len: " << elem.segment_len << ") ";
224  }
225  return oss.str();
226  }
227 
228  //! Reduce functor that returns the pivot originating from the biggest
229  //! range. That removes some nasty corner cases, like selecting the same
230  //! pivot over and over again from a tiny range.
232  {
233  public:
234  Pivot operator () (const Pivot& a, const Pivot& b) const {
235  return a.segment_len > b.segment_len ? a : b;
236  }
237  };
238 
240 
241  /*!
242  * Stats holds timers for measuring merge performance, that supports
243  * accumulating the output and printing it to the standard out stream.
244  */
245  class Stats
246  {
247  public:
248  //! A Timer accumulating all time spent in File operations.
250  //! A Timer accumulating all time spent while actually merging.
252  //! A Timer accumulating all time spent while re-balancing the data.
254  //! A Timer accumulating all time spent for selecting the global pivot
255  //! elements.
257  //! A Timer accumulating all time spent in global search steps.
259  //! A Timer accumulating all time spent communicating.
261  //! A Timer accumulating all time spent calling the scatter method of
262  //! the data subsystem.
264  //! The count of all elements processed on this host.
265  size_t result_size_ = 0;
266  //! The count of search iterations needed for balancing.
267  size_t iterations_ = 0;
268 
270  const std::string& label, size_t p, size_t value) {
271 
272  LOG1 << "RESULT " << "operation=" << label << " time=" << value
273  << " workers=" << p << " result_size_=" << result_size_;
274  }
275 
276  void Print(Context& ctx) {
277  if (stats_enabled) {
278  size_t p = ctx.num_workers();
279  size_t merge =
280  ctx.net.AllReduce(merge_timer_.Milliseconds()) / p;
281  size_t balance =
282  ctx.net.AllReduce(balancing_timer_.Milliseconds()) / p;
283  size_t pivot_selection =
284  ctx.net.AllReduce(pivot_selection_timer_.Milliseconds()) / p;
285  size_t search_step =
286  ctx.net.AllReduce(search_step_timer_.Milliseconds()) / p;
287  size_t file_op =
288  ctx.net.AllReduce(file_op_timer_.Milliseconds()) / p;
289  size_t comm =
290  ctx.net.AllReduce(comm_timer_.Milliseconds()) / p;
291  size_t scatter =
292  ctx.net.AllReduce(scatter_timer_.Milliseconds()) / p;
293 
295 
296  if (ctx.my_rank() == 0) {
297  PrintToSQLPlotTool("merge", p, merge);
298  PrintToSQLPlotTool("balance", p, balance);
299  PrintToSQLPlotTool("pivot_selection", p, pivot_selection);
300  PrintToSQLPlotTool("search_step", p, search_step);
301  PrintToSQLPlotTool("file_op", p, file_op);
302  PrintToSQLPlotTool("communication", p, comm);
303  PrintToSQLPlotTool("scatter", p, scatter);
304  PrintToSQLPlotTool("iterations", p, iterations_);
305  }
306  }
307  }
308  };
309 
310  //! Instance of merge statistics
312 
313  /*!
314  * Selects random global pivots for all splitter searches based on all
315  * worker's search ranges.
316  *
317  * \param left The left bounds of all search ranges for all files. The
318  * first index identifies the splitter, the second index identifies the
319  * file.
320  *
321  * \param width The width of all search ranges for all files. The first
322  * index identifies the splitter, the second index identifies the file.
323  *
324  * \param out_pivots The output pivots.
325  */
327  const std::vector<ArrayNumInputsSizeT>& left,
328  const std::vector<ArrayNumInputsSizeT>& width,
329  std::vector<Pivot>& out_pivots) {
330 
331  // Select a random pivot for the largest range we have for each
332  // splitter.
333  for (size_t s = 0; s < width.size(); s++) {
334  size_t mp = 0;
335 
336  // Search for the largest range.
337  for (size_t p = 1; p < width[s].size(); p++) {
338  if (width[s][p] > width[s][mp]) {
339  mp = p;
340  }
341  }
342 
343  // We can leave pivot_elem uninitialized. If it is not initialized
344  // below, then an other worker's pivot will be taken for this range,
345  // since our range is zero.
346  ValueType pivot_elem = ValueType();
347  size_t pivot_idx = left[s][mp];
348 
349  if (width[s][mp] > 0) {
350  pivot_idx = left[s][mp] + (context_.rng_() % width[s][mp]);
351  assert(pivot_idx < files_[mp]->num_items());
352  stats_.file_op_timer_.Start();
353  pivot_elem = files_[mp]->template GetItemAt<ValueType>(pivot_idx);
354  stats_.file_op_timer_.Stop();
355  }
356 
357  out_pivots[s] = Pivot {
358  pivot_elem,
359  pivot_idx,
360  width[s][mp]
361  };
362  }
363 
364  LOG << "local pivots: " << VToStr(out_pivots);
365 
366  // Reduce vectors of pivots globally to select the pivots from the
367  // largest ranges.
368  stats_.comm_timer_.Start();
369  out_pivots = context_.net.AllReduce(
370  out_pivots, common::ComponentSum<std::vector<Pivot>, ReducePivots>());
371  stats_.comm_timer_.Stop();
372  }
373 
374  /*!
375  * Calculates the global ranks of the given pivots.
376  * Additionally returns the local ranks so we can use them in the next step.
377  */
379  const std::vector<Pivot>& pivots,
380  std::vector<size_t>& global_ranks,
381  std::vector<ArrayNumInputsSizeT>& out_local_ranks,
382  const std::vector<ArrayNumInputsSizeT>& left,
383  const std::vector<ArrayNumInputsSizeT>& width) {
384 
385  // Simply get the rank of each pivot in each file. Sum the ranks up
386  // locally.
387  for (size_t s = 0; s < pivots.size(); s++) {
388  size_t rank = 0;
389  for (size_t i = 0; i < kNumInputs; i++) {
390  stats_.file_op_timer_.Start();
391 
392  size_t idx = files_[i]->GetIndexOf(
393  pivots[s].value, pivots[s].tie_idx,
394  left[s][i], left[s][i] + width[s][i],
395  comparator_);
396 
397  stats_.file_op_timer_.Stop();
398 
399  rank += idx;
400  out_local_ranks[s][i] = idx;
401  }
402  global_ranks[s] = rank;
403  }
404 
405  stats_.comm_timer_.Start();
406  // Sum up ranks globally.
407  global_ranks = context_.net.AllReduce(
408  global_ranks, common::ComponentSum<std::vector<size_t> >());
409  stats_.comm_timer_.Stop();
410  }
411 
412  /*!
413  * Shrinks the search ranges according to the global ranks of the pivots.
414  *
415  * \param global_ranks The global ranks of all pivots.
416  *
417  * \param local_ranks The local ranks of each pivot in each file.
418  *
419  * \param target_ranks The desired ranks of the splitters we are looking
420  * for.
421  *
422  * \param left The left bounds of all search ranges for all files. The
423  * first index identifies the splitter, the second index identifies the
424  * file. This parameter will be modified.
425  *
426  * \param width The width of all search ranges for all files. The first
427  * index identifies the splitter, the second index identifies the file.
428  * This parameter will be modified.
429  */
431  const std::vector<size_t>& global_ranks,
432  const std::vector<ArrayNumInputsSizeT>& local_ranks,
433  const std::vector<size_t>& target_ranks,
434  std::vector<ArrayNumInputsSizeT>& left,
435  std::vector<ArrayNumInputsSizeT>& width) {
436 
437  for (size_t s = 0; s < width.size(); s++) {
438  for (size_t p = 0; p < width[s].size(); p++) {
439 
440  if (width[s][p] == 0)
441  continue;
442 
443  size_t local_rank = local_ranks[s][p];
444  size_t old_width = width[s][p];
445  assert(left[s][p] <= local_rank);
446 
447  if (global_ranks[s] < target_ranks[s]) {
448  width[s][p] -= local_rank - left[s][p];
449  left[s][p] = local_rank;
450  }
451  else if (global_ranks[s] >= target_ranks[s]) {
452  width[s][p] = local_rank - left[s][p];
453  }
454 
455  if (debug) {
456  die_unless(width[s][p] <= old_width);
457  }
458  }
459  }
460  }
461 
    /*!
     * Receives elements from other workers and re-balance them, so each worker
     * has the same amount after merging.
     *
     * Runs the distributed multi-sequence selection loop described in the
     * class comment, then scatters each local File along the found splitters.
     */
    void MainOp() {
        // *** Setup Environment for merging ***

        // Count of all workers (and count of target partitions)
        size_t p = context_.num_workers();
        LOG << "splitting to " << p << " workers";

        // Count of all local elements.
        size_t local_size = 0;

        for (size_t i = 0; i < kNumInputs; i++) {
            local_size += files_[i]->num_items();
        }

        // test that the data we got is sorted!
        if (self_verify) {
            for (size_t i = 0; i < kNumInputs; i++) {
                auto reader = files_[i]->GetKeepReader();
                if (!reader.HasNext()) continue;

                ValueType prev = reader.template Next<ValueType>();
                while (reader.HasNext()) {
                    ValueType next = reader.template Next<ValueType>();
                    if (comparator_(next, prev)) {
                        die("Merge input was not sorted!");
                    }
                    prev = std::move(next);
                }
            }
        }

        // Count of all global elements.
        stats_.comm_timer_.Start();
        size_t global_size = context_.net.AllReduce(local_size);
        stats_.comm_timer_.Stop();

        LOG << "local size: " << local_size;
        LOG << "global size: " << global_size;

        // Calculate and remember the ranks we search for. In our case, we
        // search for ranks that split the data into equal parts.
        std::vector<size_t> target_ranks(p - 1);

        for (size_t r = 0; r < p - 1; r++) {
            target_ranks[r] = (global_size / p) * (r + 1);
            // Modify all ranks 0..(globalSize % p), in case global_size is not
            // divisible by p.
            if (r < global_size % p)
                target_ranks[r] += 1;
        }

        if (debug) {
            LOG << "target_ranks: " << target_ranks;

            // sanity check: all workers must agree on the target ranks
            stats_.comm_timer_.Start();
            assert(context_.net.Broadcast(target_ranks) == target_ranks);
            stats_.comm_timer_.Stop();
        }

        // buffer for the global ranks of selected pivots
        std::vector<size_t> global_ranks(p - 1);

        // Search range bounds.
        std::vector<ArrayNumInputsSizeT> left(p - 1), width(p - 1);

        // Auxiliary arrays.
        std::vector<Pivot> pivots(p - 1);
        std::vector<ArrayNumInputsSizeT> local_ranks(p - 1);

        // Initialize all lefts with 0 and all widths with size of their
        // respective file.
        for (size_t r = 0; r < p - 1; r++) {
            for (size_t q = 0; q < kNumInputs; q++) {
                width[r][q] = files_[q]->num_items();
            }
        }

        bool finished = false;
        stats_.balancing_timer_.Start();

        // Iterate until we find a pivot which is within the prescribed balance
        // tolerance
        while (!finished) {

            LOG << "iteration: " << stats_.iterations_;
            LOG0 << "left: " << left;
            LOG0 << "width: " << width;

            if (debug) {
                for (size_t q = 0; q < kNumInputs; q++) {
                    std::ostringstream oss;
                    for (size_t i = 0; i < p - 1; ++i) {
                        if (i != 0) oss << " # ";
                        oss << '[' << left[i][q] << ',' << left[i][q] + width[i][q] << ')';
                    }
                    LOG1 << "left/right[" << q << "]: " << oss.str();
                }
            }

            // Find pivots.
            stats_.pivot_selection_timer_.Start();
            SelectPivots(left, width, pivots);
            stats_.pivot_selection_timer_.Stop();

            LOG << "final pivots: " << VToStr(pivots);

            // Get global ranks and shrink ranges.
            stats_.search_step_timer_.Start();
            GetGlobalRanks(pivots, global_ranks, local_ranks, left, width);

            LOG << "global_ranks: " << global_ranks;
            LOG << "local_ranks: " << local_ranks;

            SearchStep(global_ranks, local_ranks, target_ranks, left, width);

            if (debug) {
                for (size_t q = 0; q < kNumInputs; q++) {
                    std::ostringstream oss;
                    for (size_t i = 0; i < p - 1; ++i) {
                        if (i != 0) oss << " # ";
                        oss << '[' << left[i][q] << ',' << left[i][q] + width[i][q] << ')';
                    }
                    LOG1 << "left/right[" << q << "]: " << oss.str();
                }
            }

            // We check for accuracy of kNumInputs + 1
            finished = true;
            for (size_t i = 0; i < p - 1; i++) {
                size_t a = global_ranks[i], b = target_ranks[i];
                if (tlx::abs_diff(a, b) > kNumInputs + 1) {
                    finished = false;
                    break;
                }
            }

            stats_.search_step_timer_.Stop();
            stats_.iterations_++;
        }
        stats_.balancing_timer_.Stop();

        LOG << "Finished after " << stats_.iterations_ << " iterations";

        LOG << "Creating channels";

        // Initialize channels for distributing data.
        for (size_t j = 0; j < kNumInputs; j++)
            streams_[j] = context_.GetNewCatStream(this);

        stats_.scatter_timer_.Start();

        LOG << "Scattering.";

        // For each file, initialize an array of offsets according to the
        // splitters we found. Then call Scatter to distribute the data.

        std::vector<size_t> tx_items(p);
        for (size_t j = 0; j < kNumInputs; j++) {

            std::vector<size_t> offsets(p + 1, 0);

            for (size_t r = 0; r < p - 1; r++)
                offsets[r + 1] = local_ranks[r][j];

            offsets[p] = files_[j]->num_items();

            LOG << "Scatter from file " << j << " to other workers: "
                << offsets;

            // items sent to worker r are the slice [offsets[r], offsets[r+1])
            for (size_t r = 0; r < p; ++r) {
                tx_items[r] += offsets[r + 1] - offsets[r];
            }

            streams_[j]->template Scatter<ValueType>(
                *files_[j], offsets, /* consume */ true);
        }

        LOG << "tx_items: " << tx_items;

        // calculate total items on each worker after Scatter
        tx_items = context_.net.AllReduce(
            tx_items, common::ComponentSum<std::vector<size_t> >());
        if (context_.my_rank() == 0)
            LOG1 << "Merge(): total_items: " << tx_items;

        stats_.scatter_timer_.Stop();
    }
653 };
654 
655 /*!
656  * Merge is a DOp, which merges any number of sorted DIAs to a single sorted
657  * DIA. All input DIAs must be sorted conforming to the given comparator. The
658  * type of the output DIA will be the type of this DIA.
659  *
660  * The merge operation balances all input data, so that each worker will have an
661  * equal number of elements when the merge completes.
662  *
663  * \tparam Comparator Comparator to specify the order of input and output.
664  *
665  * \param comparator Comparator to specify the order of input and output.
666  *
667  * \param first_dia first DIA
668  * \param dias DIAs, which is merged with this DIA.
669  *
670  * \ingroup dia_dops
671  */
672 template <typename Comparator, typename FirstDIA, typename... DIAs>
673 auto Merge(const Comparator& comparator,
674  const FirstDIA& first_dia, const DIAs& ... dias) {
675 
676  tlx::vexpand((first_dia.AssertValid(), 0), (dias.AssertValid(), 0) ...);
677 
678  using ValueType = typename FirstDIA::ValueType;
679 
680  using CompareResult =
681  typename common::FunctionTraits<Comparator>::result_type;
682 
683  using MergeNode = api::MergeNode<
684  ValueType, Comparator, 1 + sizeof ... (DIAs)>;
685 
686  // Assert comparator types.
687  static_assert(
688  std::is_convertible<
689  ValueType,
690  typename common::FunctionTraits<Comparator>::template arg<0>
691  >::value,
692  "Comparator has the wrong input type in argument 0");
693 
694  static_assert(
695  std::is_convertible<
696  ValueType,
697  typename common::FunctionTraits<Comparator>::template arg<1>
698  >::value,
699  "Comparator has the wrong input type in argument 1");
700 
701  // Assert meaningful return type of comperator.
702  static_assert(
703  std::is_convertible<
704  CompareResult,
705  bool
706  >::value,
707  "Comparator must return bool");
708 
709  auto merge_node =
710  tlx::make_counting<MergeNode>(comparator, first_dia, dias...);
711 
712  return DIA<ValueType>(merge_node);
713 }
714 
715 template <typename ValueType, typename Stack>
716 template <typename Comparator, typename SecondDIA>
718  const SecondDIA& second_dia, const Comparator& comparator) const {
719  return api::Merge(comparator, *this, second_dia);
720 }
721 
722 } // namespace api
723 
724 //! imported from api namespace
725 using api::Merge;
726 
727 } // namespace thrill
728 
729 #endif // !THRILL_API_MERGE_HEADER
730 
731 /******************************************************************************/
Comparator comparator_
Merge comparator.
Definition: merge.hpp:192
net::FlowControlChannel & net
Definition: context.hpp:443
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:152
DIA is the interface between the user and the Thrill framework.
Definition: dia.hpp:141
size_t prefix_size_
Count of items on all prev workers.
Definition: merge.hpp:213
void GetGlobalRanks(const std::vector< Pivot > &pivots, std::vector< size_t > &global_ranks, std::vector< ArrayNumInputsSizeT > &out_local_ranks, const std::vector< ArrayNumInputsSizeT > &left, const std::vector< ArrayNumInputsSizeT > &width)
Calculates the global ranks of the given pivots.
Definition: merge.hpp:378
#define die_unless(X)
Definition: die.hpp:64
size_t my_rank() const
Global rank of this worker among all other workers in the system.
Definition: context.hpp:240
Pivot operator()(const Pivot &a, const Pivot &b) const
Definition: merge.hpp:234
static constexpr bool stats_enabled
Set this variable to true to enable generation and output of merge stats.
Definition: merge.hpp:82
MergeNode(const Comparator &comparator, const ParentDIA0 &parent0, const ParentDIAs &...parents)
Definition: merge.hpp:91
void vexpand(Types &&...)
Definition: vexpand.hpp:24
#define LOG1
Definition: logger.hpp:145
void SelectPivots(const std::vector< ArrayNumInputsSizeT > &left, const std::vector< ArrayNumInputsSizeT > &width, std::vector< Pivot > &out_pivots)
Selects random global pivots for all splitter searches based on all worker's search ranges...
Definition: merge.hpp:326
A File is an ordered sequence of Block objects for storing items.
Definition: file.hpp:56
#define LOG0
Override default output: never or always output log.
Definition: logger.hpp:144
StatsTimer balancing_timer_
A Timer accumulating all time spent while re-balancing the data.
Definition: merge.hpp:253
size_t iterations_
The count of search iterations needed for balancing.
Definition: merge.hpp:267
BlockWriter contains a temporary Block object into which a) any serializable item can be stored or b)...
const char * label() const
return label() of DIANode subclass as stored by StatsNode
Definition: dia_base.hpp:218
#define die(msg)
Instead of std::terminate(), throw the output the message via an exception.
Definition: die.hpp:43
void SearchStep(const std::vector< size_t > &global_ranks, const std::vector< ArrayNumInputsSizeT > &local_ranks, const std::vector< size_t > &target_ranks, std::vector< ArrayNumInputsSizeT > &left, std::vector< ArrayNumInputsSizeT > &width)
Shrinks the search ranges according to the global ranks of the pivots.
Definition: merge.hpp:430
std::default_random_engine rng_
a random generator
Definition: context.hpp:432
void Execute() final
Virtual execution method. Triggers actual computation in sub-classes.
Definition: merge.hpp:157
T TLX_ATTRIBUTE_WARN_UNUSED_RESULT AllReduce(const T &value, const BinarySumOp &sum_op=BinarySumOp())
Reduces a value of a serializable type T over all workers given a certain reduce function.
The Context of a job is a unique instance per worker which holds references to all underlying parts o...
Definition: context.hpp:218
Stats stats_
Instance of merge statistics.
Definition: merge.hpp:311
BlockWriter< FileBlockSink > Writer
Definition: file.hpp:59
static constexpr bool self_verify
Definition: merge.hpp:79
const std::array< bool, kNumInputs > parent_stack_empty_
Whether the parent stack is empty.
Definition: merge.hpp:195
const size_t & id() const
return unique id() of DIANode subclass as stored by StatsNode
Definition: dia_base.hpp:213
template for computing the component-wise sum of std::array or std::vector.
Definition: functional.hpp:114
StatsTimer pivot_selection_timer_
Definition: merge.hpp:256
void Print(Context &ctx)
Definition: merge.hpp:276
data::File::Writer writers_[kNumInputs]
Writers to intermediate files.
Definition: merge.hpp:201
data::CatStreamPtr GetNewCatStream(size_t dia_id)
Definition: context.cpp:1120
void PushItem(const ValueType &item) const
Method for derived classes to Push a single item to all children.
Definition: dia_node.hpp:147
Stats holds timers for measuring merge performance, that supports accumulating the output and printin...
Definition: merge.hpp:245
void StopPreOp(size_t id) final
Virtual method for preparing end of PushData.
Definition: merge.hpp:153
void PrintToSQLPlotTool(const std::string &label, size_t p, size_t value)
Definition: merge.hpp:269
int value
Definition: gen_data.py:41
T abs_diff(const T &a, const T &b)
absolute difference, which also works for unsigned types
Definition: abs_diff.hpp:24
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
Definition: allocator.hpp:220
auto Merge(const SecondDIA &second_dia, const Comparator &comparator=Comparator()) const
Merge is a DOp, which merges two sorted DIAs to a single sorted DIA.
Definition: merge.hpp:717
StatsTimer merge_timer_
A Timer accumulating all time spent while actually merging.
Definition: merge.hpp:251
static std::string VToStr(const std::vector< Pivot > &data)
Logging helper to print vectors of vectors of pivots.
Definition: merge.hpp:218
data::FilePtr GetFilePtr(size_t dia_id)
Definition: context.cpp:1111
void MainOp()
Receives elements from other workers and re-balance them, so each worker has the same amount after me...
Definition: merge.hpp:466
std::array< size_t, kNumInputs > ArrayNumInputsSizeT
Definition: merge.hpp:215
static constexpr bool debug
Definition: merge.hpp:78
static constexpr bool g_debug_mode
debug mode is active, if NDEBUG is false.
Definition: config.hpp:28
auto Merge(const Comparator &comparator, const FirstDIA &first_dia, const DIAs &...dias)
Merge is a DOp, which merges any number of sorted DIAs to a single sorted DIA.
Definition: merge.hpp:673
A DOpNode is a typed node representing and distributed operations in Thrill.
Definition: dop_node.hpp:32
StatsTimer file_op_timer_
A Timer accumulating all time spent in File operations.
Definition: merge.hpp:249
StatsTimer comm_timer_
A Timer accumulating all time spent communicating.
Definition: merge.hpp:260
const std::vector< DIABasePtr > & parents() const
Returns the parents of this DIABase.
Definition: dia_base.hpp:253
bool OnPreOpFile(const data::File &file, size_t parent_index) final
Receive a whole data::File of ValueType, but only if our stack is empty.
Definition: merge.hpp:143
void call_foreach_with_index(Functor &&f, Args &&...args)
void Close()
Explicitly close the writer.
data::FilePtr files_[kNumInputs]
Files for intermediate storage.
Definition: merge.hpp:198
size_t num_workers() const
Global number of workers in the system.
Definition: context.hpp:248
void PushData(bool consume) final
Virtual method for pushing data. Triggers actual pushing in sub-classes.
Definition: merge.hpp:161
T TLX_ATTRIBUTE_WARN_UNUSED_RESULT Broadcast(const T &value, size_t origin=0)
Broadcasts a value of a serializable type T from the master (the worker with id 0) to all other worke...
size_t result_size_
The count of all elements processed on this host.
Definition: merge.hpp:265
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:141
void Dispose() final
Virtual clear method. Triggers actual disposing in sub-classes.
Definition: merge.hpp:188
Implementation of Thrill's merge.
Definition: merge.hpp:76
data::CatStreamPtr streams_[kNumInputs]
Array of inbound CatStreams.
Definition: merge.hpp:204
Context & context_
associated Context
Definition: dia_base.hpp:293
StatsTimer search_step_timer_
A Timer accumulating all time spent in global search steps.
Definition: merge.hpp:258