Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
sort.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/api/sort.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Alexander Noe <[email protected]>
7  * Copyright (C) 2015 Michael Axtmann <[email protected]>
8  * Copyright (C) 2015-2016 Timo Bingmann <[email protected]>
9  *
10  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
11  ******************************************************************************/
12 
13 #pragma once
14 #ifndef THRILL_API_SORT_HEADER
15 #define THRILL_API_SORT_HEADER
16 
17 #include <thrill/api/context.hpp>
18 #include <thrill/api/dia.hpp>
19 #include <thrill/api/dop_node.hpp>
20 #include <thrill/common/logger.hpp>
21 #include <thrill/common/math.hpp>
23 #include <thrill/common/qsort.hpp>
26 #include <thrill/data/file.hpp>
27 #include <thrill/net/group.hpp>
29 
30 #include <algorithm>
31 #include <cstdlib>
32 #include <deque>
33 #include <functional>
34 #include <numeric>
35 #include <random>
36 #include <type_traits>
37 #include <utility>
38 #include <vector>
39 
40 namespace thrill {
41 namespace api {
42 
43 /*!
44  * A DIANode which performs a Sort operation. Sort sorts a DIA according to a
45  * given compare function.
46  *
47  * \tparam ValueType Type of DIA elements
48  *
49  * \tparam CompareFunction Type of the compare function
50  *
51  * \tparam SortAlgorithm Type of the local sort function
52  *
53  * \tparam Stable Whether or not to use stable sorting mechanisms
54  *
55  * \ingroup api_layer
56  */
57 template <
58  typename ValueType,
59  typename CompareFunction,
60  typename SortAlgorithm,
61  bool Stable = false>
62 class SortNode final : public DOpNode<ValueType>
63 {
64  static constexpr bool debug = false;
65 
66  //! Set this variable to true to enable generation and output of stats
67  static constexpr bool stats_enabled = false;
68 
70  using Super::context_;
71 
72  //! Timer or FakeTimer
74  //! RAII class for running the timer
76 
77  using SampleIndexPair = std::pair<ValueType, size_t>;
78 
79  //! Stream type for item transmission depends on Stable flag
80  using TranmissionStreamType = typename std::conditional<
81  Stable,
84 
85  //! Multiway merge tree creation depends on Stable flag
87  template <typename ReaderIterator,
88  typename Comparator = std::less<ValueType> >
90  ReaderIterator seqs_begin, ReaderIterator seqs_end,
91  const Comparator& comp = Comparator()) {
92 
93  return core::make_multiway_merge_tree<ValueType>(
94  seqs_begin, seqs_end, comp);
95  }
96  };
97 
99  template <typename ReaderIterator,
100  typename Comparator = std::less<ValueType> >
102  ReaderIterator seqs_begin, ReaderIterator seqs_end,
103  const Comparator& comp = Comparator()) {
104 
105  return core::make_stable_multiway_merge_tree<ValueType>(
106  seqs_begin, seqs_end, comp);
107  }
108  };
109 
110  using MakeMultiwayMergeTreeDelegate = typename std::conditional<
111  Stable,
113 
114  static const bool use_background_thread_ = false;
115 
116 public:
117  /*!
118  * Constructor for a sort node. Stores the compare function and the local sort algorithm, remembers whether the parent's function stack is empty, and hooks PreOp() into the parent's function chain.
119  */
120  template <typename ParentDIA>
121  SortNode(const ParentDIA& parent,
122  const CompareFunction& compare_function,
123  const SortAlgorithm& sort_algorithm = SortAlgorithm())
124  : Super(parent.ctx(), "Sort", { parent.id() }, { parent.node() }),
125  compare_function_(compare_function),
126  sort_algorithm_(sort_algorithm),
127  parent_stack_empty_(ParentDIA::stack_empty)
128  {
129  // Hook PreOp: the lambda forwards each incoming item to PreOp().
130  auto pre_op_fn = [this](const ValueType& input) {
131  PreOp(input);
132  };
133 
134  auto lop_chain = parent.stack().push(pre_op_fn).fold();
135  parent.node()->AddChild(this, lop_chain);
136  }
137 
138  void StartPreOp(size_t /* id */) final {
139  timer_preop_.Start();
141  }
142 
143  void PreOp(const ValueType& input) {
144  unsorted_writer_.Put(input);
146  local_items_++;
147  }
148 
149  //! Receive a whole data::File of ValueType, but only if our stack is empty.
150  bool OnPreOpFile(const data::File& file, size_t /* parent_index */) final {
151  if (!parent_stack_empty_) {
153  << "Sort rejected File from parent "
154  << "due to non-empty function stack.";
155  return false;
156  }
157 
158  // accept file
159  unsorted_file_ = file.Copy();
161 
162  size_t pick_items = std::min(local_items_, wanted_sample_size());
163 
164  sLOG << "Pick" << pick_items << "samples by random access"
165  << " from File containing " << local_items_ << " items.";
166  for (size_t i = 0; i < pick_items; ++i) {
167  size_t index = context_.rng_() % local_items_;
168  sLOG << "got index[" << i << "] = " << index;
169  samples_.emplace_back(
170  unsorted_file_.GetItemAt<ValueType>(index), index);
171  }
172 
173  return true;
174  }
175 
176  void StopPreOp(size_t /* id */) final {
178 
179  LOG0 << "wanted_sample_size()=" << wanted_sample_size()
180  << " samples.size()= " << samples_.size();
181 
182  timer_preop_.Stop();
183  if (stats_enabled) {
185  "Sort() timer_preop_", timer_preop_.SecondsDouble());
187  "Sort() preop local_items_", local_items_);
188  }
189  }
190 
191  DIAMemUse ExecuteMemUse() final { // amount of RAM requested for Execute()
192  return DIAMemUse::Max(); // always ask for the maximum
193  }
194 
195  //! Executes the sort operation.
196  void Execute() final {
197  MainOp();
198  if (stats_enabled) {
200  "Sort() timer_execute", timer_execute_.SecondsDouble());
201  }
202  }
203 
204  DIAMemUse PushDataMemUse() final { // amount of RAM requested for PushData(), depends on the number of local Files
205  if (files_.size() <= 1) {
206  // zero or one File: direct push, no merge necessary
207  return 0;
208  }
209  else {
210  // several Files: need to perform multiway merging, so request the maximum
211  return DIAMemUse::Max();
212  }
213  }
214 
215  void PushData(bool consume) final {
216  Timer timer_pushdata;
217  timer_pushdata.Start();
218 
219  size_t local_size = 0;
220  if (files_.size() == 0) {
221  // nothing to push
222  }
223  else if (files_.size() == 1) {
224  local_size = files_[0].num_items();
225  this->PushFile(files_[0], consume);
226  }
227  else {
228  MakeMultiwayMergeTreeDelegate MakeMultiwayMergeTree;
229  size_t merge_degree, prefetch;
230 
231  // merge batches of files if necessary
232  while (std::tie(merge_degree, prefetch) =
234  files_.size() > merge_degree)
235  {
236  sLOG1 << "Partial multi-way-merge of"
237  << merge_degree << "files with prefetch" << prefetch;
238 
239  // create merger for the first merge_degree Files
240  std::vector<data::File::ConsumeReader> seq;
241  seq.reserve(merge_degree);
242 
243  for (size_t t = 0; t < merge_degree; ++t) {
244  seq.emplace_back(
245  files_[t].GetConsumeReader(/* prefetch */ 0));
246  }
247 
248  StartPrefetch(seq, prefetch);
249 
250  auto puller = MakeMultiwayMergeTree(
251  seq.begin(), seq.end(), compare_function_);
252 
253  // create new File for merged items
254  files_.emplace_back(context_.GetFile(this));
255  auto writer = files_.back().GetWriter();
256 
257  while (puller.HasNext()) {
258  writer.Put(puller.Next());
259  }
260  writer.Close();
261 
262  // this clear is important to release references to the files.
263  seq.clear();
264 
265  // remove merged files
266  files_.erase(files_.begin(), files_.begin() + merge_degree);
267  }
268 
269  sLOGC(context_.my_rank() == 0)
270  << "Start multi-way-merge of" << files_.size() << "files"
271  << "with prefetch" << prefetch;
272 
273  // construct output merger of remaining Files
274  std::vector<data::File::Reader> seq;
275  seq.reserve(files_.size());
276 
277  for (size_t t = 0; t < files_.size(); ++t) {
278  seq.emplace_back(
279  files_[t].GetReader(consume, /* prefetch */ 0));
280  }
281 
282  StartPrefetch(seq, prefetch);
283 
284  auto puller = MakeMultiwayMergeTree(
285  seq.begin(), seq.end(), compare_function_);
286 
287  while (puller.HasNext()) {
288  this->PushItem(puller.Next());
289  local_size++;
290  }
291  }
292 
293  timer_pushdata.Stop();
294 
295  if (stats_enabled) {
297  "Sort() timer_pushdata", timer_pushdata.SecondsDouble());
298 
299  context_.PrintCollectiveMeanStdev("Sort() local_size", local_size);
300  }
301  }
302 
303  void Dispose() final { // drops this node's locally stored data
304  files_.clear(); // removes all Files from files_
305  }
306 
307 private:
308  //! The comparison function which is applied to two elements.
309  CompareFunction compare_function_;
310 
311  //! Sort function class
312  SortAlgorithm sort_algorithm_;
313 
314  //! Whether the parent stack is empty
316 
317  //! \name PreOp Phase
318  //! \{
319 
320  //! All local unsorted items before communication
322  //! Writer for unsorted_file_
324  //! Number of items on this worker
325  size_t local_items_ = 0;
326 
327  //! epsilon
328  static constexpr double desired_imbalance_ = 0.1;
329 
330  //! Sample vector: pairs of (sample,local index)
331  std::vector<SampleIndexPair> samples_;
332  //! Reservoir sampler
335  };
336  //! Calculate the currently desired number of samples: the value res_sampler_ computes for the current local_items_ count.
337  size_t wanted_sample_size() const {
338  return res_sampler_.calc_sample_size(local_items_);
339  }
340 
341  //! \}
342 
343  //! \name MainOp and PushData
344  //! \{
345 
346  //! Local data files
347  std::deque<data::File> files_;
348  //! Total number of local elements after communication
349  size_t local_out_size_ = 0;
350 
351  //! \}
352 
353  //! \name Statistics
354  //! \{
355 
356  //! time spent in PreOp (including preceding Node's computation)
358 
359  //! time spent in Execute
361 
362  //! time spent in sort()
364 
365  //! \}
366 
368  std::vector<SampleIndexPair>& splitters, size_t sample_size,
369  data::MixStreamPtr& sample_stream,
370  data::MixStream::Writers& sample_writers) {
371 
372  // Get samples from other workers
373  size_t num_total_workers = context_.num_workers();
374 
375  std::vector<SampleIndexPair> samples;
376  samples.reserve(sample_size * num_total_workers);
377 
378  auto reader = sample_stream->GetMixReader(/* consume */ true);
379 
380  while (reader.HasNext()) {
381  samples.push_back(reader.template Next<SampleIndexPair>());
382  }
383  if (samples.size() == 0) return;
384 
385  LOG << "FindAndSendSplitters() samples.size()=" << samples.size();
386 
387  // Find splitters
388  std::sort(samples.begin(), samples.end(),
389  [this](
390  const SampleIndexPair& a, const SampleIndexPair& b) {
391  return LessSampleIndex(a, b);
392  });
393 
394  double splitting_size = static_cast<double>(samples.size())
395  / static_cast<double>(num_total_workers);
396 
397  // Send splitters to other workers
398  for (size_t i = 1; i < num_total_workers; ++i) {
399  splitters.push_back(
400  samples[static_cast<size_t>(i * splitting_size)]);
401  for (size_t j = 1; j < num_total_workers; j++) {
402  sample_writers[j].Put(splitters.back());
403  }
404  }
405 
406  for (size_t j = 1; j < num_total_workers; ++j)
407  sample_writers[j].Close();
408  }
409 
411  {
412  public:
413  ValueType* tree_;
415  size_t index_ = 0;
416  size_t ssplitter_;
417 
418  /*!
419  * Target: tree. Size of 'number of splitters'; entries are written from index 1 on, index 0 is never written
420  * Source: sorted splitters. Size of 'number of splitters'
421  * Number of splitters
422  */
423  TreeBuilder(ValueType* splitter_tree,
424  const SampleIndexPair* samples,
425  size_t ssplitter)
426  : tree_(splitter_tree),
427  samples_(samples),
428  ssplitter_(ssplitter) {
429  if (ssplitter != 0)
430  recurse(samples, samples + ssplitter, 1);
431  }
432 
433  void recurse(const SampleIndexPair* lo, const SampleIndexPair* hi,
434  unsigned int treeidx) {
435  // pick middle element as splitter
436  const SampleIndexPair* mid = lo + (ssize_t)(hi - lo) / 2;
437  assert(mid < samples_ + ssplitter_);
438  tree_[treeidx] = mid->first;
439 
440  if (2 * treeidx < ssplitter_)
441  {
442  const SampleIndexPair* midlo = mid, * midhi = mid + 1;
443  recurse(lo, midlo, 2 * treeidx + 0);
444  recurse(midhi, hi, 2 * treeidx + 1);
445  }
446  }
447  };
448 
450  return compare_function_(a.first, b.first) || (
451  !compare_function_(b.first, a.first) && a.second < b.second);
452  }
453 
455  return !compare_function_(a.first, b.first) && a.second >= b.second;
456  }
457 
458  //! Round n down to a multiple of k, where k must be a power of two (clears the low bits of n using the mask ~(k - 1)).
459  template <typename Integral>
460  static inline size_t RoundDown(Integral n, Integral k) {
461  return (n & ~(k - 1));
462  }
463 
465  // Tree of splitters, sizeof |splitter|
466  const ValueType* const tree,
467  // Number of buckets: k = 2^{log_k}
468  size_t k,
469  size_t log_k,
470  // Number of actual workers to send to
471  size_t actual_k,
472  const SampleIndexPair* const sorted_splitters,
473  size_t prefix_items,
474  TranmissionStreamPtr& data_stream) {
475 
476  data::File::ConsumeReader unsorted_reader =
478 
479  auto data_writers = data_stream->GetWriters();
480 
481  // enlarge emitters array to next power of two to have direct access,
482  // because we fill the splitter set up with sentinels == last splitter,
483  // hence all items land in the last bucket.
484  assert(data_writers.size() == actual_k);
485  assert(actual_k <= k);
486 
487  data_writers.reserve(k);
488  while (data_writers.size() < k)
489  data_writers.emplace_back(typename TranmissionStreamType::Writer());
490 
491  std::swap(data_writers[actual_k - 1], data_writers[k - 1]);
492 
493  // classify all items (take two at once) and immediately transmit them.
494 
495  const size_t stepsize = 2;
496 
497  size_t i = prefix_items;
498  for ( ; i < prefix_items + RoundDown(local_items_, stepsize); i += stepsize)
499  {
500  // take two items
501  size_t j0 = 1;
502  ValueType el0 = unsorted_reader.Next<ValueType>();
503 
504  size_t j1 = 1;
505  ValueType el1 = unsorted_reader.Next<ValueType>();
506 
507  // run items down the tree
508  for (size_t l = 0; l < log_k; l++)
509  {
510  j0 = 2 * j0 + (compare_function_(el0, tree[j0]) ? 0 : 1);
511  j1 = 2 * j1 + (compare_function_(el1, tree[j1]) ? 0 : 1);
512  }
513 
514  size_t b0 = j0 - k;
515  size_t b1 = j1 - k;
516 
517  while (b0 && EqualSampleGreaterIndex(
518  sorted_splitters[b0 - 1], SampleIndexPair(el0, i + 0))) {
519  b0--;
520 
521  // LOG0 << "el0 equal match b0 " << b0
522  // << " prefix_items " << prefix_items
523  // << " lhs.first = " << sorted_splitters[b0 - 1].first
524  // << " lhs.second = " << sorted_splitters[b0 - 1].second
525  // << " rhs.first = " << el0
526  // << " rhs.second = " << i;
527  }
528 
529  while (b1 && EqualSampleGreaterIndex(
530  sorted_splitters[b1 - 1], SampleIndexPair(el1, i + 1))) {
531  b1--;
532  }
533 
534  assert(data_writers[b0].IsValid());
535  assert(data_writers[b1].IsValid());
536 
537  data_writers[b0].Put(el0);
538  data_writers[b1].Put(el1);
539  }
540 
541  // last iteration of loop if we have an odd number of items.
542  for ( ; i < prefix_items + local_items_; i++)
543  {
544  size_t j0 = 1;
545  ValueType el0 = unsorted_reader.Next<ValueType>();
546 
547  // run item down the tree
548  for (size_t l = 0; l < log_k; l++)
549  {
550  j0 = 2 * j0 + (compare_function_(el0, tree[j0]) ? 0 : 1);
551  }
552 
553  size_t b0 = j0 - k;
554 
555  while (b0 && EqualSampleGreaterIndex(
556  sorted_splitters[b0 - 1], SampleIndexPair(el0, i))) {
557  b0--;
558  }
559 
560  assert(data_writers[b0].IsValid());
561  data_writers[b0].Put(el0);
562  }
563 
564  // implicitly close writers and flush data
565  }
566 
567  void MainOp() {
568  RunTimer timer(timer_execute_);
569 
570  size_t prefix_items = local_items_;
571  size_t total_items = context_.net.ExPrefixSumTotal(prefix_items);
572 
573  size_t num_total_workers = context_.num_workers();
574 
575  sLOG << "worker" << context_.my_rank()
576  << "local_items_" << local_items_
577  << "prefix_items" << prefix_items
578  << "total_items" << total_items
579  << "local sample_.size()" << samples_.size();
580 
581  if (total_items == 0) {
583  << "class" << "SortNode"
584  << "event" << "done"
585  << "workers" << num_total_workers
586  << "local_out_size" << local_out_size_
587  << "balance" << 0
588  << "sample_size" << samples_.size();
589  return;
590  }
591 
592  // stream to send samples to worker 0 and to receive the chosen splitters back
593  data::MixStreamPtr sample_stream = context_.GetNewMixStream(this);
594 
595  // Send all samples to worker 0.
596  data::MixStream::Writers sample_writers = sample_stream->GetWriters();
597 
598  for (const SampleIndexPair& sample : samples_) {
599  // send samples but add the local prefix to index ranks
600  sample_writers[0].Put(
601  SampleIndexPair(sample.first, prefix_items + sample.second));
602  }
603  sample_writers[0].Close();
604  std::vector<SampleIndexPair>().swap(samples_);
605 
606  // Get the ceiling of log2(num_total_workers), as SSSS needs 2^n buckets.
607  size_t ceil_log = tlx::integer_log2_ceil(num_total_workers);
608  size_t workers_algo = size_t(1) << ceil_log;
609  size_t splitter_count_algo = workers_algo - 1;
610 
611  std::vector<SampleIndexPair> splitters;
612  splitters.reserve(workers_algo);
613 
614  if (context_.my_rank() == 0) {
615  FindAndSendSplitters(splitters, samples_.size(),
616  sample_stream, sample_writers);
617  }
618  else {
619  // Close unused emitters
620  for (size_t j = 1; j < num_total_workers; j++) {
621  sample_writers[j].Close();
622  }
624  sample_stream->GetMixReader(/* consume */ true);
625  while (reader.HasNext()) {
626  splitters.push_back(reader.template Next<SampleIndexPair>());
627  }
628  }
629  sample_writers.clear();
630  sample_stream.reset();
631 
632  // code from SS2NPartition, slightly altered
633 
634  std::vector<ValueType> splitter_tree(workers_algo + 1);
635 
636  // add sentinel splitters if fewer nodes than splitters.
637  for (size_t i = num_total_workers; i < workers_algo; i++) {
638  splitters.push_back(splitters.back());
639  }
640 
641  TreeBuilder(splitter_tree.data(),
642  splitters.data(),
643  splitter_count_algo);
644 
645  auto data_stream = context_.template GetNewStream<TranmissionStreamType>(this->id());
646 
647  std::thread thread;
649  // launch receiver thread.
650  thread = common::CreateThread(
651  [this, &data_stream]() {
653  return ReceiveItems(data_stream);
654  });
655  }
656 
658  splitter_tree.data(), // Tree. sizeof |splitter|
659  workers_algo, // Number of buckets
660  ceil_log,
661  num_total_workers,
662  splitters.data(),
663  prefix_items,
664  data_stream);
665 
666  std::vector<ValueType>().swap(splitter_tree);
667 
669  thread.join();
670  else
671  ReceiveItems(data_stream);
672 
673  data_stream.reset();
674 
675  double balance = 0;
676  if (local_out_size_ > 0) {
677  balance = static_cast<double>(local_out_size_)
678  * static_cast<double>(num_total_workers)
679  / static_cast<double>(total_items);
680  }
681 
682  if (balance > 1) {
683  balance = 1 / balance;
684  }
685 
687  << "class" << "SortNode"
688  << "event" << "done"
689  << "workers" << num_total_workers
690  << "local_out_size" << local_out_size_
691  << "balance" << balance
692  << "sample_size" << samples_.size();
693  }
694 
695  void ReceiveItems(TranmissionStreamPtr& data_stream) {
696 
697  auto reader = data_stream->GetReader(/* consume */ true);
698 
699  LOG0 << "Writing files";
700 
701  // M/2 such that the other half is used to prepare the next bulk
702  size_t capacity = DIABase::mem_limit_ / sizeof(ValueType) / 2;
703  std::vector<ValueType> vec;
704  vec.reserve(capacity);
705 
706  while (reader.HasNext()) {
707  if (!mem::memory_exceeded && vec.size() < capacity) {
708  vec.push_back(reader.template Next<ValueType>());
709  }
710  else {
711  SortAndWriteToFile(vec);
712  }
713  }
714 
715  if (vec.size())
716  SortAndWriteToFile(vec);
717 
718  if (stats_enabled) {
720  "Sort() timer_sort_", timer_sort_.SecondsDouble());
721  }
722  }
723 
724  void SortAndWriteToFile(std::vector<ValueType>& vec) {
725 
726  LOG << "SortAndWriteToFile() " << vec.size()
727  << " items into file #" << files_.size();
728 
729  size_t vec_size = vec.size();
730  local_out_size_ += vec.size();
731 
732  // advise block pool to write out data if necessary
733  // context_.block_pool().AdviseFree(vec.size() * sizeof(ValueType));
734 
735  timer_sort_.Start();
736  sort_algorithm_(vec.begin(), vec.end(), compare_function_);
737  // common::qsort_two_pivots_yaroslavskiy(vec.begin(), vec.end(), compare_function_);
738  // common::qsort_three_pivots(vec.begin(), vec.end(), compare_function_);
739  timer_sort_.Stop();
740 
741  LOG0 << "SortAndWriteToFile() sort took " << timer_sort_;
742 
743  Timer write_time;
744  write_time.Start();
745 
746  files_.emplace_back(context_.GetFile(this));
747  auto writer = files_.back().GetWriter();
748  for (const ValueType& elem : vec) {
749  writer.Put(elem);
750  }
751  writer.Close();
752 
753  write_time.Stop();
754 
755  LOG0 << "SortAndWriteToFile() finished writing files";
756 
757  vec.clear();
758 
759  LOG0 << "SortAndWriteToFile() vector cleared";
760 
762  << "class" << "SortNode"
763  << "event" << "write_file"
764  << "file_num" << (files_.size() - 1)
765  << "items" << vec_size
766  << "timer_sort_" << timer_sort_
767  << "write_time" << write_time;
768  }
769 };
770 
772 {
773 public:
774  template <typename Iterator, typename CompareFunction>
775  void operator () (Iterator begin, Iterator end, CompareFunction cmp) const { // sorts the range [begin, end) using cmp
776  return std::sort(begin, end, cmp); // std::sort returns void, so this returns nothing
777  }
778 };
779 
780 template <typename ValueType, typename Stack>
781 template <typename CompareFunction>
782 auto DIA<ValueType, Stack>::Sort(const CompareFunction& compare_function) const {
783  assert(IsValid());
784 
785  using SortNode = api::SortNode<
786  ValueType, CompareFunction, DefaultSortAlgorithm>;
787 
788  static_assert(
789  std::is_convertible<
790  ValueType,
792  "CompareFunction has the wrong input type");
793 
794  static_assert(
795  std::is_convertible<
796  ValueType,
798  "CompareFunction has the wrong input type");
799 
800  static_assert(
801  std::is_convertible<
803  bool>::value,
804  "CompareFunction has the wrong output type (should be bool)");
805 
806  auto node = tlx::make_counting<SortNode>(*this, compare_function);
807 
808  return DIA<ValueType>(node);
809 }
810 
811 template <typename ValueType, typename Stack>
812 template <typename CompareFunction, typename SortAlgorithm>
813 auto DIA<ValueType, Stack>::Sort(const CompareFunction& compare_function,
814  const SortAlgorithm& sort_algorithm) const {
815  assert(IsValid());
816 
817  using SortNode = api::SortNode<
818  ValueType, CompareFunction, SortAlgorithm>;
819 
820  static_assert(
821  std::is_convertible<
822  ValueType,
824  "CompareFunction has the wrong input type");
825 
826  static_assert(
827  std::is_convertible<
828  ValueType,
830  "CompareFunction has the wrong input type");
831 
832  static_assert(
833  std::is_convertible<
835  bool>::value,
836  "CompareFunction has the wrong output type (should be bool)");
837 
838  auto node = tlx::make_counting<SortNode>(
839  *this, compare_function, sort_algorithm);
840 
841  return DIA<ValueType>(node);
842 }
843 
845 {
846 public:
847  template <typename Iterator, typename CompareFunction>
848  void operator () (Iterator begin, Iterator end, CompareFunction cmp) const { // stably sorts the range [begin, end) using cmp
849  return std::stable_sort(begin, end, cmp); // std::stable_sort returns void, so this returns nothing
850  }
851 };
852 
853 template <typename ValueType, typename Stack>
854 template <typename CompareFunction>
856  const CompareFunction& compare_function) const {
857 
858  assert(IsValid());
859 
860  using SortStableNode = api::SortNode<
861  ValueType, CompareFunction, DefaultStableSortAlgorithm, /*Stable*/ true>;
862 
863  static_assert(
864  std::is_convertible<
865  ValueType,
867  "CompareFunction has the wrong input type");
868 
869  static_assert(
870  std::is_convertible<
871  ValueType,
873  "CompareFunction has the wrong input type");
874 
875  static_assert(
876  std::is_convertible<
878  bool>::value,
879  "CompareFunction has the wrong output type (should be bool)");
880 
881  auto node = tlx::make_counting<SortStableNode>(*this, compare_function);
882 
883  return DIA<ValueType>(node);
884 }
885 
886 template <typename ValueType, typename Stack>
887 template <typename CompareFunction, typename SortAlgorithm>
889  const CompareFunction& compare_function,
890  const SortAlgorithm& sort_algorithm) const {
891 
892  assert(IsValid());
893 
894  using SortStableNode = api::SortNode<
895  ValueType, CompareFunction, SortAlgorithm, /* Stable */ true>;
896 
897  static_assert(
898  std::is_convertible<
899  ValueType,
901  "CompareFunction has the wrong input type");
902 
903  static_assert(
904  std::is_convertible<
905  ValueType,
907  "CompareFunction has the wrong input type");
908 
909  static_assert(
910  std::is_convertible<
912  bool>::value,
913  "CompareFunction has the wrong output type (should be bool)");
914 
915  auto node = tlx::make_counting<SortStableNode>(
916  *this, compare_function, sort_algorithm);
917 
918  return DIA<ValueType>(node);
919 }
920 
921 } // namespace api
922 } // namespace thrill
923 
924 #endif // !THRILL_API_SORT_HEADER
925 
926 /******************************************************************************/
void StartPrefetch(std::vector< Reader > &readers, size_t prefetch_size)
Take a vector of Readers and prefetch equally from them.
Definition: file.hpp:570
bool HasNext()
HasNext() returns true if at least one more item is available.
SortAlgorithm sort_algorithm_
Sort function class.
Definition: sort.hpp:124
A DIANode which performs a Sort operation.
Definition: sort.hpp:62
void PushFile(data::File &file, bool consume) const
Definition: dia_node.hpp:156
static DIAMemUse Max()
Definition: dia_base.hpp:60
net::FlowControlChannel & net
Definition: context.hpp:443
virtual void Dispose()
Virtual clear method. Triggers actual disposing in sub-classes.
Definition: dia_base.hpp:188
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:34
size_t num_items() const
Return the number of items in the file.
Definition: file.hpp:180
DIA is the interface between the user and the Thrill framework.
Definition: dia.hpp:141
ValueType_ ValueType
Definition: dia.hpp:152
virtual DIAMemUse ExecuteMemUse()
Amount of RAM used by Execute()
Definition: dia_base.hpp:176
Timer timer_sort_
time spent in sort()
Definition: sort.hpp:363
std::pair< ValueType, size_t > SampleIndexPair
Definition: sort.hpp:77
void PrintCollectiveMeanStdev(const char *text, const Type &local)
Definition: context.hpp:349
size_t my_rank() const
Global rank of this worker among all other workers in the system.
Definition: context.hpp:240
Multiway merge tree creation depends on Stable flag.
Definition: sort.hpp:86
std::pair< size_t, size_t > MaxMergeDegreePrefetch(size_t num_files)
Definition: block_pool.cpp:703
bool EqualSampleGreaterIndex(const SampleIndexPair &a, const SampleIndexPair &b)
Definition: sort.hpp:454
A File is an ordered sequence of Block objects for storing items.
Definition: file.hpp:56
#define LOG0
Override default output: never or always output log.
Definition: logger.hpp:27
#define sLOGC(cond)
Explicitly specify the condition for logging.
Definition: logger.hpp:31
void operator()(Iterator begin, Iterator end, CompareFunction cmp) const
Definition: sort.hpp:775
auto operator()(ReaderIterator seqs_begin, ReaderIterator seqs_end, const Comparator &comp=Comparator())
Definition: sort.hpp:89
RAII class for running the timer until destruction.
void reset()
release contained pointer, frees object if this is the last reference.
TLX_ATTRIBUTE_ALWAYS_INLINE T Next()
Next() reads a complete item T.
Ownership handle onto a MixStream.
Definition: mix_stream.hpp:119
void SortAndWriteToFile(std::vector< ValueType > &vec)
Definition: sort.hpp:724
BlockWriter contains a temporary Block object into which a) any serializable item can be stored or b)...
void ReceiveItems(TranmissionStreamPtr &data_stream)
Definition: sort.hpp:695
static constexpr bool stats_enabled
Set this variable to true to enable generation and output of stats.
Definition: sort.hpp:67
static constexpr bool g_debug_push_file
Definition: config.hpp:44
TreeBuilder(ValueType *splitter_tree, const SampleIndexPair *samples, size_t ssplitter)
Target: tree.
Definition: sort.hpp:423
bool memory_exceeded
memory limit exceeded indicator
std::default_random_engine rng_
a random generator
Definition: context.hpp:432
#define sLOG1
Definition: logger.hpp:38
virtual void PushData(bool consume)=0
Virtual method for pushing data. Triggers actual pushing in sub-classes.
void add(const Type &item)
visit item, maybe add it to the sample.
static const bool use_background_thread_
Definition: sort.hpp:114
size_t wanted_sample_size() const
calculate currently desired number of samples
Definition: sort.hpp:337
auto SortStable(const CompareFunction &compare_function=CompareFunction()) const
SortStable is a DOp, which sorts a given DIA stably according to the given compare_function.
Definition: sort.hpp:855
void Close()
custom destructor to close writers in a cyclic fashion
Definition: stream_data.cpp:66
void FindAndSendSplitters(std::vector< SampleIndexPair > &splitters, size_t sample_size, data::MixStreamPtr &sample_stream, data::MixStream::Writers &sample_writers)
Definition: sort.hpp:367
const size_t & id() const
return unique id() of DIANode subclass as stored by StatsNode
Definition: dia_base.hpp:213
auto Sort(const CompareFunction &compare_function=CompareFunction()) const
Sort is a DOp, which sorts a given DIA according to the given compare_function.
Definition: sort.hpp:782
data::File::Writer unsorted_writer_
Writer for unsorted_file_.
Definition: sort.hpp:323
size_t calc_sample_size(size_t count) const
calculate desired sample size
auto operator()(ReaderIterator seqs_begin, ReaderIterator seqs_end, const Comparator &comp=Comparator())
Definition: sort.hpp:101
An extra class derived from std::vector<> for delivery of the BlockWriters of a Stream.
Definition: stream_data.hpp:56
void PushItem(const ValueType &item) const
Method for derived classes to Push a single item to all children.
Definition: dia_node.hpp:147
Reader to retrieve items in unordered sequence from a MixBlockQueue.
ConsumeReader GetConsumeReader(size_t prefetch_size=File::default_prefetch_size_)
Get consuming BlockReader for beginning of File.
Definition: file.cpp:73
File Copy() const
Return a copy of the File (explicit copy-constructor)
Definition: file.cpp:42
virtual void StopPreOp(size_t)
Virtual method for preparing end of PushData.
Definition: dia_base.hpp:173
ItemType GetItemAt(size_t index) const
Definition: file.hpp:513
void swap(CountingPtr< A, D > &a1, CountingPtr< A, D > &a2) noexcept
data::File unsorted_file_
All local unsorted items before communication.
Definition: sort.hpp:321
virtual void Execute()=0
Virtual execution method. Triggers actual computation in sub-classes.
int value
Definition: gen_data.py:41
const bool parent_stack_empty_
Whether the parent stack is empty.
Definition: sort.hpp:315
BlockReader takes Block objects from BlockSource and allows reading of a) serializable Items or b) ar...
common::FunctionTraits< Function > FunctionTraits
alias for convenience.
Definition: dia.hpp:147
Ownership handle onto a CatStreamData.
Definition: cat_stream.hpp:148
static size_t RoundDown(Integral n, Integral k)
round n down by k where k is a power of two.
Definition: sort.hpp:460
std::thread CreateThread(Args &&...args)
create a std::thread and repeat creation if it fails
Definition: porting.hpp:42
size_t local_out_size_
Total number of local elements after communication.
Definition: sort.hpp:349
High-performance smart pointer used as a wrapping reference counting pointer.
virtual void StartPreOp(size_t)
Virtual method for preparing start of PushData.
Definition: dia_base.hpp:163
data::File GetFile(size_t dia_id)
Returns a new File object containing a sequence of local Blocks.
Definition: context.hpp:280
data::MixStreamPtr GetNewMixStream(size_t dia_id)
Definition: context.cpp:1128
T TLX_ATTRIBUTE_WARN_UNUSED_RESULT ExPrefixSumTotal(T &value, const BinarySumOp &sum_op=BinarySumOp(), const T &initial=T())
Calculates the exclusive prefix sum over all workers, and delivers the total sum as well...
DIAMemUse mem_limit_
Definition: dia_base.hpp:314
A DOpNode is a typed node representing a distributed operation in Thrill.
Definition: dop_node.hpp:32
static uint_pair min()
return an uint_pair instance containing the smallest value possible
Definition: uint_types.hpp:217
common::JsonLogger logger_
Definition: dia_base.hpp:329
size_t local_worker_id() const
Definition: context.hpp:260
typename std::conditional< Stable, data::CatStream, data::MixStream >::type TranmissionStreamType
Stream type for item transmission depends on Stable flag.
Definition: sort.hpp:82
TLX_ATTRIBUTE_ALWAYS_INLINE BlockWriter & Put(const T &x)
Put appends a complete item, or fails with a FullException.
const SampleIndexPair * samples_
Definition: sort.hpp:414
void operator()(Iterator begin, Iterator end, CompareFunction cmp) const
Definition: sort.hpp:848
void TransmitItems(const ValueType *const tree, size_t k, size_t log_k, size_t actual_k, const SampleIndexPair *const sorted_splitters, size_t prefix_items, TranmissionStreamPtr &data_stream)
Definition: sort.hpp:464
void recurse(const SampleIndexPair *lo, const SampleIndexPair *hi, unsigned int treeidx)
Definition: sort.hpp:433
virtual bool OnPreOpFile(const data::File &, size_t)
Definition: dia_base.hpp:168
static constexpr bool debug
Definition: sort.hpp:64
void Close()
Explicitly close the writer.
unsigned integer_log2_ceil(int i)
calculate the log2 ceiling of an integer type (by repeated bit shifts)
size_t num_workers() const
Global number of workers in the system.
Definition: context.hpp:248
common::StatsTimerBaseStopped< stats_enabled > Timer
Timer or FakeTimer.
Definition: sort.hpp:73
size_t local_items_
Number of items on this worker.
Definition: sort.hpp:325
common::ReservoirSamplingGrow< SampleIndexPair > res_sampler_
Reservoir sampler.
Definition: sort.hpp:333
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:24
Timer timer_execute_
time spent in Execute
Definition: sort.hpp:360
void SetCpuAffinity(std::thread &thread, size_t cpu_id)
set cpu/core affinity of a thread
Definition: porting.cpp:110
typename std::conditional< Stable, MakeStableMultiwayMergeTree, MakeDefaultMultiwayMergeTree >::type MakeMultiwayMergeTreeDelegate
Definition: sort.hpp:112
std::deque< data::File > files_
Local data files.
Definition: sort.hpp:347
Writer GetWriter(size_t block_size=default_block_size)
Get BlockWriter.
Definition: file.cpp:63
Context & context_
associated Context
Definition: dia_base.hpp:293
Timer timer_preop_
time spent in PreOp (including preceding Node's computation)
Definition: sort.hpp:357
#define LOGC(cond)
Explicitly specify the condition for logging.
Definition: logger.hpp:21
std::vector< SampleIndexPair > samples_
Sample vector: pairs of (sample,local index)
Definition: sort.hpp:331
virtual DIAMemUse PushDataMemUse()
Amount of RAM used by PushData()
Definition: dia_base.hpp:182
bool LessSampleIndex(const SampleIndexPair &a, const SampleIndexPair &b)
Definition: sort.hpp:449
static constexpr double desired_imbalance_
epsilon
Definition: sort.hpp:328
SortNode(const ParentDIA &parent, const CompareFunction &compare_function, const SortAlgorithm &sort_algorithm=SortAlgorithm())
Constructor for a sort node.
Definition: sort.hpp:121
data::BlockPool & block_pool()
the block manager keeps all data blocks moving through the system.
Definition: context.hpp:321