Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
sort.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/api/sort.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Alexander Noe <[email protected]>
7  * Copyright (C) 2015 Michael Axtmann <[email protected]>
8  * Copyright (C) 2015-2016 Timo Bingmann <[email protected]>
9  *
10  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
11  ******************************************************************************/
12 
13 #pragma once
14 #ifndef THRILL_API_SORT_HEADER
15 #define THRILL_API_SORT_HEADER
16 
17 #include <thrill/api/context.hpp>
18 #include <thrill/api/dia.hpp>
19 #include <thrill/api/dop_node.hpp>
20 #include <thrill/common/logger.hpp>
21 #include <thrill/common/math.hpp>
23 #include <thrill/common/qsort.hpp>
26 #include <thrill/data/file.hpp>
27 #include <thrill/net/group.hpp>
28 
30 #include <tlx/vector_free.hpp>
31 
32 #include <algorithm>
33 #include <cstdlib>
34 #include <deque>
35 #include <functional>
36 #include <numeric>
37 #include <random>
38 #include <type_traits>
39 #include <utility>
40 #include <vector>
41 
42 namespace thrill {
43 namespace api {
44 
45 /*!
46  * A DIANode which performs a Sort operation. Sort sorts a DIA according to a
47  * given compare function
48  *
49  * \tparam ValueType Type of DIA elements
50  *
51  * \tparam CompareFunction Type of the compare function
52  *
53  * \tparam SortAlgorithm Type of the local sort function
54  *
55  * \tparam Stable Whether or not to use stable sorting mechanisms
56  *
57  * \ingroup api_layer
58  */
59 template <
60  typename ValueType,
61  typename CompareFunction,
62  typename SortAlgorithm,
63  bool Stable = false>
64 class SortNode final : public DOpNode<ValueType>
65 {
66  static constexpr bool debug = false;
67 
68  //! Set this variable to true to enable generation and output of stats
69  static constexpr bool stats_enabled = false;
70 
72  using Super::context_;
73 
74  //! Timer or FakeTimer
76  //! RIAA class for running the timer
78 
79  using SampleIndexPair = std::pair<ValueType, size_t>;
80 
81  //! Stream type for item transmission depends on Stable flag
82  using TranmissionStreamType = typename std::conditional<
83  Stable,
86 
87  //! Multiway merge tree creation depends on Stable flag
89  template <typename ReaderIterator,
90  typename Comparator = std::less<ValueType> >
92  ReaderIterator seqs_begin, ReaderIterator seqs_end,
93  const Comparator& comp = Comparator()) {
94 
95  return core::make_multiway_merge_tree<ValueType>(
96  seqs_begin, seqs_end, comp);
97  }
98  };
99 
101  template <typename ReaderIterator,
102  typename Comparator = std::less<ValueType> >
104  ReaderIterator seqs_begin, ReaderIterator seqs_end,
105  const Comparator& comp = Comparator()) {
106 
107  return core::make_stable_multiway_merge_tree<ValueType>(
108  seqs_begin, seqs_end, comp);
109  }
110  };
111 
112  using MakeMultiwayMergeTree = typename std::conditional<
113  Stable,
115 
116  static const bool use_background_thread_ = false;
117 
118 public:
119  /*!
120  * Constructor for a sort node.
121  */
122  template <typename ParentDIA>
123  SortNode(const ParentDIA& parent,
124  const CompareFunction& compare_function,
125  const SortAlgorithm& sort_algorithm = SortAlgorithm())
126  : Super(parent.ctx(), "Sort", { parent.id() }, { parent.node() }),
127  compare_function_(compare_function),
128  sort_algorithm_(sort_algorithm),
129  parent_stack_empty_(ParentDIA::stack_empty) {
130  // Hook PreOp(s)
131  auto pre_op_fn = [this](const ValueType& input) {
132  PreOp(input);
133  };
134 
135  auto lop_chain = parent.stack().push(pre_op_fn).fold();
136  parent.node()->AddChild(this, lop_chain);
137  }
138 
139  void StartPreOp(size_t /* parent_index */) final {
140  timer_preop_.Start();
142  }
143 
144  void PreOp(const ValueType& input) {
145  unsorted_writer_.Put(input);
147  local_items_++;
148  }
149 
150  //! Receive a whole data::File of ValueType, but only if our stack is empty.
151  bool OnPreOpFile(const data::File& file, size_t /* parent_index */) final {
152  if (!parent_stack_empty_) {
154  << "Sort rejected File from parent "
155  << "due to non-empty function stack.";
156  return false;
157  }
158 
159  // accept file
160  unsorted_file_ = file.Copy();
162 
163  size_t pick_items = std::min(local_items_, wanted_sample_size());
164 
165  sLOG << "Pick" << pick_items << "samples by random access"
166  << " from File containing " << local_items_ << " items.";
167  for (size_t i = 0; i < pick_items; ++i) {
168  size_t index = context_.rng_() % local_items_;
169  sLOG << "got index[" << i << "] = " << index;
170  samples_.emplace_back(
171  unsorted_file_.GetItemAt<ValueType>(index), index);
172  }
173 
174  return true;
175  }
176 
177  void StopPreOp(size_t /* parent_index */) final {
179 
180  LOG0 << "wanted_sample_size()=" << wanted_sample_size()
181  << " samples.size()= " << samples_.size();
182 
183  timer_preop_.Stop();
184  if (stats_enabled) {
186  "Sort() timer_preop_", timer_preop_.SecondsDouble());
188  "Sort() preop local_items_", local_items_);
189  }
190  }
191 
192  DIAMemUse ExecuteMemUse() final {
193  return DIAMemUse::Max();
194  }
195 
196  //! Executes the sum operation.
197  void Execute() final {
198  MainOp();
199  if (stats_enabled) {
201  "Sort() timer_execute", timer_execute_.SecondsDouble());
202  }
203  }
204 
205  DIAMemUse PushDataMemUse() final {
206  if (files_.size() <= 1) {
207  // direct push, no merge necessary
208  return 0;
209  }
210  else {
211  // need to perform multiway merging
212  return DIAMemUse::Max();
213  }
214  }
215 
216  void PushData(bool consume) final {
217  Timer timer_pushdata;
218  timer_pushdata.Start();
219 
220  size_t local_size = 0;
221  if (files_.size() == 0) {
222  // nothing to push
223  }
224  else if (files_.size() == 1) {
225  local_size = files_[0].num_items();
226  this->PushFile(files_[0], consume);
227  }
228  else {
229  size_t merge_degree, prefetch;
230 
231  // merge batches of files if necessary
232  while (std::tie(merge_degree, prefetch) =
234  files_.size() > merge_degree)
235  {
236  PartialMultiwayMerge(merge_degree, prefetch);
237  }
238 
239  sLOGC(context_.my_rank() == 0)
240  << "Start multi-way-merge of" << files_.size() << "files"
241  << "with prefetch" << prefetch;
242 
243  // construct output merger of remaining Files
244  std::vector<data::File::Reader> seq;
245  seq.reserve(files_.size());
246 
247  for (size_t t = 0; t < files_.size(); ++t) {
248  seq.emplace_back(
249  files_[t].GetReader(consume, /* prefetch */ 0));
250  }
251 
252  StartPrefetch(seq, prefetch);
253 
254  auto puller = MakeMultiwayMergeTree()(
255  seq.begin(), seq.end(), compare_function_);
256 
257  while (puller.HasNext()) {
258  this->PushItem(puller.Next());
259  local_size++;
260  }
261  }
262 
263  timer_pushdata.Stop();
264 
265  if (stats_enabled) {
267  "Sort() timer_pushdata", timer_pushdata.SecondsDouble());
268 
269  context_.PrintCollectiveMeanStdev("Sort() local_size", local_size);
270  }
271  }
272 
273  void Dispose() final {
274  files_.clear();
275  }
276 
277 private:
278  //! The comparison function which is applied to two elements.
279  CompareFunction compare_function_;
280 
281  //! Sort function class
282  SortAlgorithm sort_algorithm_;
283 
284  //! Whether the parent stack is empty
286 
287  //! \name PreOp Phase
288  //! \{
289 
290  //! All local unsorted items before communication
292  //! Writer for unsorted_file_
294  //! Number of items on this worker
295  size_t local_items_ = 0;
296 
297  //! epsilon
298  static constexpr double desired_imbalance_ = 0.1;
299 
300  //! Sample vector: pairs of (sample,local index)
301  std::vector<SampleIndexPair> samples_;
302  //! Reservoir sampler
305  };
306  //! calculate currently desired number of samples
307  size_t wanted_sample_size() const {
308  return res_sampler_.calc_sample_size(local_items_);
309  }
310 
311  //! \}
312 
313  //! \name MainOp and PushData
314  //! \{
315 
316  //! Local data files
317  std::vector<data::File> files_;
318  //! Total number of local elements after communication
319  size_t local_out_size_ = 0;
320 
321  //! \}
322 
323  //! \name Statistics
324  //! \{
325 
326  //! time spent in PreOp (including preceding Node's computation)
328 
329  //! time spent in Execute
331 
332  //! time spent in sort()
334 
335  //! \}
336 
338  std::vector<SampleIndexPair>& splitters, size_t sample_size,
339  data::MixStreamPtr& sample_stream,
340  data::MixStream::Writers& sample_writers) {
341 
342  // Get samples from other workers
343  size_t num_total_workers = context_.num_workers();
344 
345  std::vector<SampleIndexPair> samples;
346  samples.reserve(sample_size * num_total_workers);
347 
348  auto reader = sample_stream->GetMixReader(/* consume */ true);
349 
350  while (reader.HasNext()) {
351  samples.push_back(reader.template Next<SampleIndexPair>());
352  }
353  if (samples.size() == 0) return;
354 
355  LOG << "FindAndSendSplitters() samples.size()=" << samples.size();
356 
357  // Find splitters
358  std::sort(samples.begin(), samples.end(),
359  [this](
360  const SampleIndexPair& a, const SampleIndexPair& b) {
361  return LessSampleIndex(a, b);
362  });
363 
364  double splitting_size = static_cast<double>(samples.size())
365  / static_cast<double>(num_total_workers);
366 
367  // Send splitters to other workers
368  for (size_t i = 1; i < num_total_workers; ++i) {
369  splitters.push_back(
370  samples[static_cast<size_t>(i * splitting_size)]);
371  for (size_t j = 1; j < num_total_workers; j++) {
372  sample_writers[j].Put(splitters.back());
373  }
374  }
375 
376  for (size_t j = 1; j < num_total_workers; ++j)
377  sample_writers[j].Close();
378  }
379 
381  {
382  public:
383  ValueType* tree_;
385  size_t index_ = 0;
386  size_t ssplitter_;
387 
388  /*!
389  * Target: tree. Size of 'number of splitter'
390  * Source: sorted splitters. Size of 'number of splitter'
391  * Number of splitter
392  */
393  TreeBuilder(ValueType* splitter_tree,
394  const SampleIndexPair* samples,
395  size_t ssplitter)
396  : tree_(splitter_tree),
397  samples_(samples),
398  ssplitter_(ssplitter) {
399  if (ssplitter != 0)
400  recurse(samples, samples + ssplitter, 1);
401  }
402 
403  void recurse(const SampleIndexPair* lo, const SampleIndexPair* hi,
404  unsigned int treeidx) {
405  // pick middle element as splitter
406  const SampleIndexPair* mid = lo + (ssize_t)(hi - lo) / 2;
407  assert(mid < samples_ + ssplitter_);
408  tree_[treeidx] = mid->first;
409 
410  if (2 * treeidx < ssplitter_)
411  {
412  const SampleIndexPair* midlo = mid, * midhi = mid + 1;
413  recurse(lo, midlo, 2 * treeidx + 0);
414  recurse(midhi, hi, 2 * treeidx + 1);
415  }
416  }
417  };
418 
420  return compare_function_(a.first, b.first) || (
421  !compare_function_(b.first, a.first) && a.second < b.second);
422  }
423 
425  return !compare_function_(a.first, b.first) && a.second >= b.second;
426  }
427 
428  //! round n down by k where k is a power of two.
429  template <typename Integral>
430  static inline size_t RoundDown(Integral n, Integral k) {
431  return (n & ~(k - 1));
432  }
433 
435  // Tree of splitters, sizeof |splitter|
436  const ValueType* const tree,
437  // Number of buckets: k = 2^{log_k}
438  size_t k,
439  size_t log_k,
440  // Number of actual workers to send to
441  size_t actual_k,
442  const SampleIndexPair* const sorted_splitters,
443  size_t prefix_items,
444  TranmissionStreamPtr& data_stream) {
445 
446  data::File::ConsumeReader unsorted_reader =
448 
449  auto data_writers = data_stream->GetWriters();
450 
451  // enlarge emitters array to next power of two to have direct access,
452  // because we fill the splitter set up with sentinels == last splitter,
453  // hence all items land in the last bucket.
454  assert(data_writers.size() == actual_k);
455  assert(actual_k <= k);
456 
457  data_writers.reserve(k);
458  while (data_writers.size() < k)
459  data_writers.emplace_back(typename TranmissionStreamType::Writer());
460 
461  std::swap(data_writers[actual_k - 1], data_writers[k - 1]);
462 
463  // classify all items (take two at once) and immediately transmit them.
464 
465  const size_t stepsize = 2;
466 
467  size_t i = prefix_items;
468  for ( ; i < prefix_items + RoundDown(local_items_, stepsize); i += stepsize)
469  {
470  // take two items
471  size_t j0 = 1;
472  ValueType el0 = unsorted_reader.Next<ValueType>();
473 
474  size_t j1 = 1;
475  ValueType el1 = unsorted_reader.Next<ValueType>();
476 
477  // run items down the tree
478  for (size_t l = 0; l < log_k; l++)
479  {
480  j0 = 2 * j0 + (compare_function_(el0, tree[j0]) ? 0 : 1);
481  j1 = 2 * j1 + (compare_function_(el1, tree[j1]) ? 0 : 1);
482  }
483 
484  size_t b0 = j0 - k;
485  size_t b1 = j1 - k;
486 
487  while (b0 && EqualSampleGreaterIndex(
488  sorted_splitters[b0 - 1], SampleIndexPair(el0, i + 0))) {
489  b0--;
490 
491  // LOG0 << "el0 equal match b0 " << b0
492  // << " prefix_items " << prefix_items
493  // << " lhs.first = " << sorted_splitters[b0 - 1].first
494  // << " lhs.second = " << sorted_splitters[b0 - 1].second
495  // << " rhs.first = " << el0
496  // << " rhs.second = " << i;
497  }
498 
499  while (b1 && EqualSampleGreaterIndex(
500  sorted_splitters[b1 - 1], SampleIndexPair(el1, i + 1))) {
501  b1--;
502  }
503 
504  assert(data_writers[b0].IsValid());
505  assert(data_writers[b1].IsValid());
506 
507  data_writers[b0].Put(el0);
508  data_writers[b1].Put(el1);
509  }
510 
511  // last iteration of loop if we have an odd number of items.
512  for ( ; i < prefix_items + local_items_; i++)
513  {
514  size_t j0 = 1;
515  ValueType el0 = unsorted_reader.Next<ValueType>();
516 
517  // run item down the tree
518  for (size_t l = 0; l < log_k; l++)
519  {
520  j0 = 2 * j0 + (compare_function_(el0, tree[j0]) ? 0 : 1);
521  }
522 
523  size_t b0 = j0 - k;
524 
525  while (b0 && EqualSampleGreaterIndex(
526  sorted_splitters[b0 - 1], SampleIndexPair(el0, i))) {
527  b0--;
528  }
529 
530  assert(data_writers[b0].IsValid());
531  data_writers[b0].Put(el0);
532  }
533 
534  // implicitly close writers and flush data
535  }
536 
537  void MainOp() {
538  RunTimer timer(timer_execute_);
539 
540  size_t prefix_items = local_items_;
541  size_t total_items = context_.net.ExPrefixSumTotal(prefix_items);
542 
543  size_t num_total_workers = context_.num_workers();
544 
545  sLOG << "worker" << context_.my_rank()
546  << "local_items_" << local_items_
547  << "prefix_items" << prefix_items
548  << "total_items" << total_items
549  << "local sample_.size()" << samples_.size();
550 
551  if (total_items == 0) {
553  << "class" << "SortNode"
554  << "event" << "done"
555  << "workers" << num_total_workers
556  << "local_out_size" << local_out_size_
557  << "balance" << 0
558  << "sample_size" << samples_.size();
559  return;
560  }
561 
562  // stream to send samples to process 0 and receive them back
563  data::MixStreamPtr sample_stream = context_.GetNewMixStream(this);
564 
565  // Send all samples to worker 0.
566  data::MixStream::Writers sample_writers = sample_stream->GetWriters();
567 
568  for (const SampleIndexPair& sample : samples_) {
569  // send samples but add the local prefix to index ranks
570  sample_writers[0].Put(
571  SampleIndexPair(sample.first, prefix_items + sample.second));
572  }
573  sample_writers[0].Close();
574  tlx::vector_free(samples_);
575 
576  // Get the ceiling of log(num_total_workers), as SSSS needs 2^n buckets.
577  size_t ceil_log = tlx::integer_log2_ceil(num_total_workers);
578  size_t workers_algo = size_t(1) << ceil_log;
579  size_t splitter_count_algo = workers_algo - 1;
580 
581  std::vector<SampleIndexPair> splitters;
582  splitters.reserve(workers_algo);
583 
584  if (context_.my_rank() == 0) {
585  FindAndSendSplitters(splitters, samples_.size(),
586  sample_stream, sample_writers);
587  }
588  else {
589  // Close unused emitters
590  for (size_t j = 1; j < num_total_workers; j++) {
591  sample_writers[j].Close();
592  }
594  sample_stream->GetMixReader(/* consume */ true);
595  while (reader.HasNext()) {
596  splitters.push_back(reader.template Next<SampleIndexPair>());
597  }
598  }
599  sample_writers.clear();
600  sample_stream.reset();
601 
602  // code from SS2NPartition, slightly altered
603 
604  std::vector<ValueType> splitter_tree(workers_algo + 1);
605 
606  // add sentinel splitters if fewer nodes than splitters.
607  for (size_t i = num_total_workers; i < workers_algo; i++) {
608  splitters.push_back(splitters.back());
609  }
610 
611  TreeBuilder(splitter_tree.data(),
612  splitters.data(),
613  splitter_count_algo);
614 
615  auto data_stream = context_.template GetNewStream<TranmissionStreamType>(this->dia_id());
616 
617  std::thread thread;
619  // launch receiver thread.
620  thread = common::CreateThread(
621  [this, &data_stream]() {
623  return ReceiveItems(data_stream);
624  });
625  }
626 
628  splitter_tree.data(), // Tree. sizeof |splitter|
629  workers_algo, // Number of buckets
630  ceil_log,
631  num_total_workers,
632  splitters.data(),
633  prefix_items,
634  data_stream);
635 
636  tlx::vector_free(splitter_tree);
637 
639  thread.join();
640  else
641  ReceiveItems(data_stream);
642 
643  data_stream.reset();
644 
645  double balance = 0;
646  if (local_out_size_ > 0) {
647  balance = static_cast<double>(local_out_size_)
648  * static_cast<double>(num_total_workers)
649  / static_cast<double>(total_items);
650  }
651 
652  if (balance > 1) {
653  balance = 1 / balance;
654  }
655 
657  << "class" << "SortNode"
658  << "event" << "done"
659  << "workers" << num_total_workers
660  << "local_out_size" << local_out_size_
661  << "balance" << balance
662  << "sample_size" << samples_.size();
663  }
664 
665  void ReceiveItems(TranmissionStreamPtr& data_stream) {
666 
667  auto reader = data_stream->GetReader(/* consume */ true);
668 
669  LOG0 << "Writing files";
670 
671  // M/2 such that the other half is used to prepare the next bulk
672  size_t capacity = DIABase::mem_limit_ / sizeof(ValueType) / 2;
673  size_t capacity_half = capacity / 2;
674  std::vector<ValueType> vec;
675  vec.reserve(capacity);
676 
677  while (reader.HasNext()) {
678  if (vec.size() < capacity_half ||
679  (vec.size() < capacity && !mem::memory_exceeded)) {
680  vec.push_back(reader.template Next<ValueType>());
681  }
682  else {
683  SortAndWriteToFile(vec);
684  }
685  }
686 
687  if (vec.size())
688  SortAndWriteToFile(vec);
689 
690  if (stats_enabled) {
692  "Sort() timer_sort_", timer_sort_.SecondsDouble());
693  }
694  }
695 
696  void SortAndWriteToFile(std::vector<ValueType>& vec) {
697 
698  LOG << "SortAndWriteToFile() " << vec.size()
699  << " items into file #" << files_.size();
700 
701  die_unless(vec.size() > 0);
702 
703  size_t vec_size = vec.size();
704  local_out_size_ += vec.size();
705 
706  // advise block pool to write out data if necessary
707  // context_.block_pool().AdviseFree(vec.size() * sizeof(ValueType));
708 
709  timer_sort_.Start();
710  sort_algorithm_(vec.begin(), vec.end(), compare_function_);
711  // common::qsort_two_pivots_yaroslavskiy(vec.begin(), vec.end(), compare_function_);
712  // common::qsort_three_pivots(vec.begin(), vec.end(), compare_function_);
713  timer_sort_.Stop();
714 
715  LOG0 << "SortAndWriteToFile() sort took " << timer_sort_;
716 
717  Timer write_time;
718  write_time.Start();
719 
720  files_.emplace_back(context_.GetFile(this));
721  auto writer = files_.back().GetWriter();
722  for (const ValueType& elem : vec) {
723  writer.Put(elem);
724  }
725  writer.Close();
726 
727  write_time.Stop();
728 
729  LOG0 << "SortAndWriteToFile() finished writing files";
730 
731  vec.clear();
732 
733  LOG0 << "SortAndWriteToFile() vector cleared";
734 
736  << "class" << "SortNode"
737  << "event" << "write_file"
738  << "file_num" << (files_.size() - 1)
739  << "items" << vec_size
740  << "timer_sort_" << timer_sort_
741  << "write_time" << write_time;
742  }
743 
744  void PartialMultiwayMerge(size_t merge_degree, size_t prefetch) {
745  sLOG1 << "Partial multi-way-merge of" << files_.size()
746  << "files with degree" << merge_degree
747  << "and prefetch" << prefetch;
748 
749  std::vector<data::File> new_files;
750 
751  // merge batches of merge_degree Files into new_files
752  size_t fi;
753  for (fi = 0; fi + merge_degree < files_.size(); fi += merge_degree) {
754  // create merger for first merge_degree Files
755  std::vector<data::File::ConsumeReader> seq;
756  seq.reserve(merge_degree);
757 
758  for (size_t t = 0; t < merge_degree; ++t) {
759  seq.emplace_back(
760  files_[fi + t].GetConsumeReader(/* prefetch */ 0));
761  }
762 
763  StartPrefetch(seq, prefetch);
764 
765  auto puller = MakeMultiwayMergeTree()(
766  seq.begin(), seq.end(), compare_function_);
767 
768  // create new File for merged items
769  new_files.emplace_back(context_.GetFile(this));
770  auto writer = new_files.back().GetWriter();
771 
772  while (puller.HasNext()) {
773  writer.Put(puller.Next());
774  }
775  writer.Close();
776 
777  // merged files are cleared by the ConsumeReader
778  }
779 
780  // copy remaining files into new_files
781  for ( ; fi < files_.size(); ++fi) {
782  new_files.emplace_back(std::move(files_[fi]));
783  }
784 
785  std::swap(files_, new_files);
786  }
787 };
788 
790 {
791 public:
792  template <typename Iterator, typename CompareFunction>
793  void operator () (Iterator begin, Iterator end, CompareFunction cmp) const {
794  return std::sort(begin, end, cmp);
795  }
796 };
797 
798 template <typename ValueType, typename Stack>
799 template <typename CompareFunction>
800 auto DIA<ValueType, Stack>::Sort(const CompareFunction& compare_function) const {
801  assert(IsValid());
802 
803  using SortNode = api::SortNode<
804  ValueType, CompareFunction, DefaultSortAlgorithm>;
805 
806  static_assert(
807  std::is_convertible<
808  ValueType,
810  "CompareFunction has the wrong input type");
811 
812  static_assert(
813  std::is_convertible<
814  ValueType,
816  "CompareFunction has the wrong input type");
817 
818  static_assert(
819  std::is_convertible<
821  bool>::value,
822  "CompareFunction has the wrong output type (should be bool)");
823 
824  auto node = tlx::make_counting<SortNode>(*this, compare_function);
825 
826  return DIA<ValueType>(node);
827 }
828 
829 template <typename ValueType, typename Stack>
830 template <typename CompareFunction, typename SortAlgorithm>
831 auto DIA<ValueType, Stack>::Sort(const CompareFunction& compare_function,
832  const SortAlgorithm& sort_algorithm) const {
833  assert(IsValid());
834 
835  using SortNode = api::SortNode<
836  ValueType, CompareFunction, SortAlgorithm>;
837 
838  static_assert(
839  std::is_convertible<
840  ValueType,
842  "CompareFunction has the wrong input type");
843 
844  static_assert(
845  std::is_convertible<
846  ValueType,
848  "CompareFunction has the wrong input type");
849 
850  static_assert(
851  std::is_convertible<
853  bool>::value,
854  "CompareFunction has the wrong output type (should be bool)");
855 
856  auto node = tlx::make_counting<SortNode>(
857  *this, compare_function, sort_algorithm);
858 
859  return DIA<ValueType>(node);
860 }
861 
863 {
864 public:
865  template <typename Iterator, typename CompareFunction>
866  void operator () (Iterator begin, Iterator end, CompareFunction cmp) const {
867  return std::stable_sort(begin, end, cmp);
868  }
869 };
870 
871 template <typename ValueType, typename Stack>
872 template <typename CompareFunction>
874  const CompareFunction& compare_function) const {
875 
876  assert(IsValid());
877 
878  using SortStableNode = api::SortNode<
879  ValueType, CompareFunction, DefaultStableSortAlgorithm, /*Stable*/ true>;
880 
881  static_assert(
882  std::is_convertible<
883  ValueType,
885  "CompareFunction has the wrong input type");
886 
887  static_assert(
888  std::is_convertible<
889  ValueType,
891  "CompareFunction has the wrong input type");
892 
893  static_assert(
894  std::is_convertible<
896  bool>::value,
897  "CompareFunction has the wrong output type (should be bool)");
898 
899  auto node = tlx::make_counting<SortStableNode>(*this, compare_function);
900 
901  return DIA<ValueType>(node);
902 }
903 
904 template <typename ValueType, typename Stack>
905 template <typename CompareFunction, typename SortAlgorithm>
907  const CompareFunction& compare_function,
908  const SortAlgorithm& sort_algorithm) const {
909 
910  assert(IsValid());
911 
912  using SortStableNode = api::SortNode<
913  ValueType, CompareFunction, SortAlgorithm, /* Stable */ true>;
914 
915  static_assert(
916  std::is_convertible<
917  ValueType,
919  "CompareFunction has the wrong input type");
920 
921  static_assert(
922  std::is_convertible<
923  ValueType,
925  "CompareFunction has the wrong input type");
926 
927  static_assert(
928  std::is_convertible<
930  bool>::value,
931  "CompareFunction has the wrong output type (should be bool)");
932 
933  auto node = tlx::make_counting<SortStableNode>(
934  *this, compare_function, sort_algorithm);
935 
936  return DIA<ValueType>(node);
937 }
938 
939 } // namespace api
940 } // namespace thrill
941 
942 #endif // !THRILL_API_SORT_HEADER
943 
944 /******************************************************************************/
void StartPrefetch(std::vector< Reader > &readers, size_t prefetch_size)
Take a vector of Readers and prefetch equally from them.
Definition: file.hpp:585
bool HasNext()
HasNext() returns true if at least one more item is available.
SortAlgorithm sort_algorithm_
Sort function class.
Definition: sort.hpp:126
A DIANode which performs a Sort operation.
Definition: sort.hpp:64
void PushFile(data::File &file, bool consume) const
Definition: dia_node.hpp:156
static DIAMemUse Max()
Definition: dia_base.hpp:60
net::FlowControlChannel & net
Definition: context.hpp:446
virtual void Dispose()
Virtual clear method. Triggers actual disposing in sub-classes.
Definition: dia_base.hpp:188
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:34
size_t num_items() const
Return the number of items in the file.
Definition: file.hpp:180
DIA is the interface between the user and the Thrill framework.
Definition: dia.hpp:141
ValueType_ ValueType
Definition: dia.hpp:152
virtual DIAMemUse ExecuteMemUse()
Amount of RAM used by Execute()
Definition: dia_base.hpp:176
Timer timer_sort_
time spent in sort()
Definition: sort.hpp:333
#define die_unless(X)
Definition: die.hpp:27
std::pair< ValueType, size_t > SampleIndexPair
Definition: sort.hpp:79
void PrintCollectiveMeanStdev(const char *text, const Type &local)
Definition: context.hpp:352
size_t my_rank() const
Global rank of this worker among all other workers in the system.
Definition: context.hpp:243
Multiway merge tree creation depends on Stable flag.
Definition: sort.hpp:88
std::pair< size_t, size_t > MaxMergeDegreePrefetch(size_t num_files)
Definition: block_pool.cpp:703
bool EqualSampleGreaterIndex(const SampleIndexPair &a, const SampleIndexPair &b)
Definition: sort.hpp:424
A File is an ordered sequence of Block objects for storing items.
Definition: file.hpp:56
#define LOG0
Override default output: never or always output log.
Definition: logger.hpp:27
#define sLOGC(cond)
Explicitly specify the condition for logging.
Definition: logger.hpp:31
void operator()(Iterator begin, Iterator end, CompareFunction cmp) const
Definition: sort.hpp:793
auto operator()(ReaderIterator seqs_begin, ReaderIterator seqs_end, const Comparator &comp=Comparator())
Definition: sort.hpp:91
RIAA class for running the timer until destruction.
void reset()
release contained pointer, frees object if this is the last reference.
TLX_ATTRIBUTE_ALWAYS_INLINE T Next()
Next() reads a complete item T.
Ownership handle onto a MixStream.
Definition: mix_stream.hpp:119
void SortAndWriteToFile(std::vector< ValueType > &vec)
Definition: sort.hpp:696
BlockWriter contains a temporary Block object into which a) any serializable item can be stored or b)...
void ReceiveItems(TranmissionStreamPtr &data_stream)
Definition: sort.hpp:665
static constexpr bool stats_enabled
Set this variable to true to enable generation and output of stats.
Definition: sort.hpp:69
static constexpr bool g_debug_push_file
Definition: config.hpp:44
TreeBuilder(ValueType *splitter_tree, const SampleIndexPair *samples, size_t ssplitter)
Target: tree.
Definition: sort.hpp:393
bool memory_exceeded
memory limit exceeded indicator
std::default_random_engine rng_
a random generator
Definition: context.hpp:435
#define sLOG1
Definition: logger.hpp:38
virtual void PushData(bool consume)=0
Virtual method for pushing data. Triggers actual pushing in sub-classes.
void add(const Type &item)
visit item, maybe add it to the sample.
static const bool use_background_thread_
Definition: sort.hpp:116
size_t wanted_sample_size() const
calculate currently desired number of samples
Definition: sort.hpp:307
auto SortStable(const CompareFunction &compare_function=CompareFunction()) const
SortStable is a DOp, which sorts a given DIA stably according to the given compare_function.
Definition: sort.hpp:873
void Close()
custom destructor to close writers is a cyclic fashion
Definition: stream_data.cpp:66
void FindAndSendSplitters(std::vector< SampleIndexPair > &splitters, size_t sample_size, data::MixStreamPtr &sample_stream, data::MixStream::Writers &sample_writers)
Definition: sort.hpp:337
auto Sort(const CompareFunction &compare_function=CompareFunction()) const
Sort is a DOp, which sorts a given DIA according to the given compare_function.
Definition: sort.hpp:800
data::File::Writer unsorted_writer_
Writer for unsorted_file_.
Definition: sort.hpp:293
size_t calc_sample_size(size_t count) const
calculate desired sample size
auto operator()(ReaderIterator seqs_begin, ReaderIterator seqs_end, const Comparator &comp=Comparator())
Definition: sort.hpp:103
An extra class derived from std::vector<> for delivery of the BlockWriters of a Stream.
Definition: stream_data.hpp:56
void PushItem(const ValueType &item) const
Method for derived classes to Push a single item to all children.
Definition: dia_node.hpp:147
Reader to retrieve items in unordered sequence from a MixBlockQueue.
ConsumeReader GetConsumeReader(size_t prefetch_size=File::default_prefetch_size_)
Get consuming BlockReader for beginning of File.
Definition: file.cpp:73
File Copy() const
Return a copy of the File (explicit copy-constructor)
Definition: file.cpp:42
virtual void StopPreOp(size_t)
Virtual method for preparing end of PushData.
Definition: dia_base.hpp:173
ItemType GetItemAt(size_t index) const
Definition: file.hpp:528
void swap(CountingPtr< A, D > &a1, CountingPtr< A, D > &a2) noexcept
data::File unsorted_file_
All local unsorted items before communication.
Definition: sort.hpp:291
virtual void Execute()=0
Virtual execution method. Triggers actual computation in sub-classes.
int value
Definition: gen_data.py:41
const bool parent_stack_empty_
Whether the parent stack is empty.
Definition: sort.hpp:285
BlockReader takes Block objects from BlockSource and allows reading of a) serializable Items or b) ar...
void PartialMultiwayMerge(size_t merge_degree, size_t prefetch)
Definition: sort.hpp:744
common::FunctionTraits< Function > FunctionTraits
alias for convenience.
Definition: dia.hpp:147
Ownership handle onto a CatStreamData.
Definition: cat_stream.hpp:148
static size_t RoundDown(Integral n, Integral k)
round n down by k where k is a power of two.
Definition: sort.hpp:430
std::thread CreateThread(Args &&...args)
create a std::thread and repeat creation if it fails
Definition: porting.hpp:44
typename std::conditional< Stable, MakeStableMultiwayMergeTree, MakeDefaultMultiwayMergeTree >::type MakeMultiwayMergeTree
Definition: sort.hpp:114
size_t local_out_size_
Total number of local elements after communication.
Definition: sort.hpp:319
High-performance smart pointer used as a wrapping reference counting pointer.
virtual void StartPreOp(size_t)
Virtual method for preparing start of PushData.
Definition: dia_base.hpp:163
data::File GetFile(size_t dia_id)
Returns a new File object containing a sequence of local Blocks.
Definition: context.hpp:283
data::MixStreamPtr GetNewMixStream(size_t dia_id)
Definition: context.cpp:1159
T TLX_ATTRIBUTE_WARN_UNUSED_RESULT ExPrefixSumTotal(T &value, const BinarySumOp &sum_op=BinarySumOp(), const T &initial=T())
Calculates the exclusive prefix sum over all workers, and delivers the total sum as well...
DIAMemUse mem_limit_
Definition: dia_base.hpp:314
void vector_free(std::vector< Type > &v)
Definition: vector_free.hpp:21
A DOpNode is a typed node representing and distributed operations in Thrill.
Definition: dop_node.hpp:32
static uint_pair min()
return an uint_pair instance containing the smallest value possible
Definition: uint_types.hpp:217
static unsigned integer_log2_ceil(int i)
calculate the log2 floor of an integer type
common::JsonLogger logger_
Definition: dia_base.hpp:329
size_t local_worker_id() const
Definition: context.hpp:263
typename std::conditional< Stable, data::CatStream, data::MixStream >::type TranmissionStreamType
Stream type for item transmission depends on Stable flag.
Definition: sort.hpp:84
TLX_ATTRIBUTE_ALWAYS_INLINE BlockWriter & Put(const T &x)
Put appends a complete item, or fails with a FullException.
const SampleIndexPair * samples_
Definition: sort.hpp:384
void operator()(Iterator begin, Iterator end, CompareFunction cmp) const
Definition: sort.hpp:866
void TransmitItems(const ValueType *const tree, size_t k, size_t log_k, size_t actual_k, const SampleIndexPair *const sorted_splitters, size_t prefix_items, TranmissionStreamPtr &data_stream)
Definition: sort.hpp:434
void recurse(const SampleIndexPair *lo, const SampleIndexPair *hi, unsigned int treeidx)
Definition: sort.hpp:403
virtual bool OnPreOpFile(const data::File &, size_t)
Definition: dia_base.hpp:168
static constexpr bool debug
Definition: sort.hpp:66
void Close()
Explicitly close the writer.
std::vector< data::File > files_
Local data files.
Definition: sort.hpp:317
size_t num_workers() const
Global number of workers in the system.
Definition: context.hpp:251
common::StatsTimerBaseStopped< stats_enabled > Timer
Timer or FakeTimer.
Definition: sort.hpp:75
size_t local_items_
Number of items on this worker.
Definition: sort.hpp:295
common::ReservoirSamplingGrow< SampleIndexPair > res_sampler_
Reservoir sampler.
Definition: sort.hpp:303
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:24
Timer timer_execute_
time spent in Execute
Definition: sort.hpp:330
void SetCpuAffinity(std::thread &thread, size_t cpu_id)
set cpu/core affinity of a thread
Definition: porting.cpp:111
Writer GetWriter(size_t block_size=default_block_size)
Get BlockWriter.
Definition: file.cpp:63
Context & context_
associated Context
Definition: dia_base.hpp:293
Timer timer_preop_
time spent in PreOp (including preceding Node's computation)
Definition: sort.hpp:327
#define LOGC(cond)
Explicitly specify the condition for logging.
Definition: logger.hpp:21
std::vector< SampleIndexPair > samples_
Sample vector: pairs of (sample,local index)
Definition: sort.hpp:301
virtual DIAMemUse PushDataMemUse()
Amount of RAM used by PushData()
Definition: dia_base.hpp:182
bool LessSampleIndex(const SampleIndexPair &a, const SampleIndexPair &b)
Definition: sort.hpp:419
static constexpr double desired_imbalance_
epsilon
Definition: sort.hpp:298
SortNode(const ParentDIA &parent, const CompareFunction &compare_function, const SortAlgorithm &sort_algorithm=SortAlgorithm())
Constructor for a sort node.
Definition: sort.hpp:123
data::BlockPool & block_pool()
the block manager keeps all data blocks moving through the system.
Definition: context.hpp:324
const size_t & dia_id() const
return unique id of DIANode subclass as stored by StatsNode
Definition: dia_base.hpp:213