Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
sort.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/api/sort.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Alexander Noe <[email protected]>
7  * Copyright (C) 2015 Michael Axtmann <[email protected]>
8  * Copyright (C) 2015-2016 Timo Bingmann <[email protected]>
9  *
10  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
11  ******************************************************************************/
12 
13 #pragma once
14 #ifndef THRILL_API_SORT_HEADER
15 #define THRILL_API_SORT_HEADER
16 
17 #include <thrill/api/context.hpp>
18 #include <thrill/api/dia.hpp>
19 #include <thrill/api/dop_node.hpp>
20 #include <thrill/common/logger.hpp>
21 #include <thrill/common/math.hpp>
23 #include <thrill/common/qsort.hpp>
26 #include <thrill/data/file.hpp>
27 #include <thrill/net/group.hpp>
29 
30 #include <algorithm>
31 #include <cstdlib>
32 #include <deque>
33 #include <functional>
34 #include <numeric>
35 #include <random>
36 #include <type_traits>
37 #include <utility>
38 #include <vector>
39 
40 namespace thrill {
41 namespace api {
42 
43 /*!
44  * A DIANode which performs a Sort operation. Sort sorts a DIA according to a
45  * given compare function
46  *
47  * \tparam ValueType Type of DIA elements
48  *
49  * \tparam CompareFunction Type of the compare function
50  *
51  * \tparam SortAlgorithm Type of the local sort function
52  *
53  * \tparam Stable Whether or not to use stable sorting mechanisms
54  *
55  * \ingroup api_layer
56  */
57 template <
58  typename ValueType,
59  typename CompareFunction,
60  typename SortAlgorithm,
61  bool Stable = false>
62 class SortNode final : public DOpNode<ValueType>
63 {
64  static constexpr bool debug = false;
65 
66  //! Set this variable to true to enable generation and output of stats
67  static constexpr bool stats_enabled = false;
68 
70  using Super::context_;
71 
72  //! Timer or FakeTimer
74  //! RAII class for running the timer
76 
77  using SampleIndexPair = std::pair<ValueType, size_t>;
78 
79  //! Stream type for item transmission depends on Stable flag
80  using TranmissionStreamType = typename std::conditional<
81  Stable,
84 
85  //! Multiway merge tree creation depends on Stable flag
87  template <typename ReaderIterator,
88  typename Comparator = std::less<ValueType> >
90  ReaderIterator seqs_begin, ReaderIterator seqs_end,
91  const Comparator& comp = Comparator()) {
92 
93  return core::make_multiway_merge_tree<ValueType>(
94  seqs_begin, seqs_end, comp);
95  }
96  };
97 
99  template <typename ReaderIterator,
100  typename Comparator = std::less<ValueType> >
102  ReaderIterator seqs_begin, ReaderIterator seqs_end,
103  const Comparator& comp = Comparator()) {
104 
105  return core::make_stable_multiway_merge_tree<ValueType>(
106  seqs_begin, seqs_end, comp);
107  }
108  };
109 
110  using MakeMultiwayMergeTreeDelegate = typename std::conditional<
111  Stable,
113 
114  static const bool use_background_thread_ = false;
115 
116 public:
117  /*!
118  * Constructor for a sort node.
119  */
120  template <typename ParentDIA>
121  SortNode(const ParentDIA& parent,
122  const CompareFunction& compare_function,
123  const SortAlgorithm& sort_algorithm = SortAlgorithm())
124  : Super(parent.ctx(), "Sort", { parent.id() }, { parent.node() }),
125  compare_function_(compare_function),
126  sort_algorithm_(sort_algorithm),
127  parent_stack_empty_(ParentDIA::stack_empty) {
128  // Hook PreOp(s)
129  auto pre_op_fn = [this](const ValueType& input) {
130  PreOp(input);
131  };
132 
133  auto lop_chain = parent.stack().push(pre_op_fn).fold();
134  parent.node()->AddChild(this, lop_chain);
135  }
136 
137  void StartPreOp(size_t /* parent_index */) final {
138  timer_preop_.Start();
140  }
141 
142  void PreOp(const ValueType& input) {
143  unsorted_writer_.Put(input);
145  local_items_++;
146  }
147 
148  //! Receive a whole data::File of ValueType, but only if our stack is empty.
149  bool OnPreOpFile(const data::File& file, size_t /* parent_index */) final {
150  if (!parent_stack_empty_) {
152  << "Sort rejected File from parent "
153  << "due to non-empty function stack.";
154  return false;
155  }
156 
157  // accept file
158  unsorted_file_ = file.Copy();
160 
161  size_t pick_items = std::min(local_items_, wanted_sample_size());
162 
163  sLOG << "Pick" << pick_items << "samples by random access"
164  << " from File containing " << local_items_ << " items.";
165  for (size_t i = 0; i < pick_items; ++i) {
166  size_t index = context_.rng_() % local_items_;
167  sLOG << "got index[" << i << "] = " << index;
168  samples_.emplace_back(
169  unsorted_file_.GetItemAt<ValueType>(index), index);
170  }
171 
172  return true;
173  }
174 
175  void StopPreOp(size_t /* parent_index */) final {
177 
178  LOG0 << "wanted_sample_size()=" << wanted_sample_size()
179  << " samples.size()= " << samples_.size();
180 
181  timer_preop_.Stop();
182  if (stats_enabled) {
184  "Sort() timer_preop_", timer_preop_.SecondsDouble());
186  "Sort() preop local_items_", local_items_);
187  }
188  }
189 
190  DIAMemUse ExecuteMemUse() final {
191  return DIAMemUse::Max();
192  }
193 
194  //! Executes the sort operation.
195  void Execute() final {
196  MainOp();
197  if (stats_enabled) {
199  "Sort() timer_execute", timer_execute_.SecondsDouble());
200  }
201  }
202 
203  DIAMemUse PushDataMemUse() final {
204  if (files_.size() <= 1) {
205  // direct push, no merge necessary
206  return 0;
207  }
208  else {
209  // need to perform multiway merging
210  return DIAMemUse::Max();
211  }
212  }
213 
214  void PushData(bool consume) final {
215  Timer timer_pushdata;
216  timer_pushdata.Start();
217 
218  size_t local_size = 0;
219  if (files_.size() == 0) {
220  // nothing to push
221  }
222  else if (files_.size() == 1) {
223  local_size = files_[0].num_items();
224  this->PushFile(files_[0], consume);
225  }
226  else {
227  MakeMultiwayMergeTreeDelegate MakeMultiwayMergeTree;
228  size_t merge_degree, prefetch;
229 
230  // merge batches of files if necessary
231  while (std::tie(merge_degree, prefetch) =
233  files_.size() > merge_degree)
234  {
235  sLOG1 << "Partial multi-way-merge of"
236  << merge_degree << "files with prefetch" << prefetch;
237 
238  // create merger for first merge_degree_ Files
239  std::vector<data::File::ConsumeReader> seq;
240  seq.reserve(merge_degree);
241 
242  for (size_t t = 0; t < merge_degree; ++t) {
243  seq.emplace_back(
244  files_[t].GetConsumeReader(/* prefetch */ 0));
245  }
246 
247  StartPrefetch(seq, prefetch);
248 
249  auto puller = MakeMultiwayMergeTree(
250  seq.begin(), seq.end(), compare_function_);
251 
252  // create new File for merged items
253  files_.emplace_back(context_.GetFile(this));
254  auto writer = files_.back().GetWriter();
255 
256  while (puller.HasNext()) {
257  writer.Put(puller.Next());
258  }
259  writer.Close();
260 
261  // this clear is important to release references to the files.
262  seq.clear();
263 
264  // remove merged files
265  files_.erase(files_.begin(), files_.begin() + merge_degree);
266  }
267 
268  sLOGC(context_.my_rank() == 0)
269  << "Start multi-way-merge of" << files_.size() << "files"
270  << "with prefetch" << prefetch;
271 
272  // construct output merger of remaining Files
273  std::vector<data::File::Reader> seq;
274  seq.reserve(files_.size());
275 
276  for (size_t t = 0; t < files_.size(); ++t) {
277  seq.emplace_back(
278  files_[t].GetReader(consume, /* prefetch */ 0));
279  }
280 
281  StartPrefetch(seq, prefetch);
282 
283  auto puller = MakeMultiwayMergeTree(
284  seq.begin(), seq.end(), compare_function_);
285 
286  while (puller.HasNext()) {
287  this->PushItem(puller.Next());
288  local_size++;
289  }
290  }
291 
292  timer_pushdata.Stop();
293 
294  if (stats_enabled) {
296  "Sort() timer_pushdata", timer_pushdata.SecondsDouble());
297 
298  context_.PrintCollectiveMeanStdev("Sort() local_size", local_size);
299  }
300  }
301 
302  void Dispose() final {
303  files_.clear();
304  }
305 
306 private:
307  //! The comparison function which is applied to two elements.
308  CompareFunction compare_function_;
309 
310  //! Sort function class
311  SortAlgorithm sort_algorithm_;
312 
313  //! Whether the parent stack is empty
315 
316  //! \name PreOp Phase
317  //! \{
318 
319  //! All local unsorted items before communication
321  //! Writer for unsorted_file_
323  //! Number of items on this worker
324  size_t local_items_ = 0;
325 
326  //! epsilon
327  static constexpr double desired_imbalance_ = 0.1;
328 
329  //! Sample vector: pairs of (sample,local index)
330  std::vector<SampleIndexPair> samples_;
331  //! Reservoir sampler
334  };
335  //! calculate currently desired number of samples
336  size_t wanted_sample_size() const {
337  return res_sampler_.calc_sample_size(local_items_);
338  }
339 
340  //! \}
341 
342  //! \name MainOp and PushData
343  //! \{
344 
345  //! Local data files
346  std::deque<data::File> files_;
347  //! Total number of local elements after communication
348  size_t local_out_size_ = 0;
349 
350  //! \}
351 
352  //! \name Statistics
353  //! \{
354 
355  //! time spent in PreOp (including preceding Node's computation)
357 
358  //! time spent in Execute
360 
361  //! time spent in sort()
363 
364  //! \}
365 
367  std::vector<SampleIndexPair>& splitters, size_t sample_size,
368  data::MixStreamPtr& sample_stream,
369  data::MixStream::Writers& sample_writers) {
370 
371  // Get samples from other workers
372  size_t num_total_workers = context_.num_workers();
373 
374  std::vector<SampleIndexPair> samples;
375  samples.reserve(sample_size * num_total_workers);
376 
377  auto reader = sample_stream->GetMixReader(/* consume */ true);
378 
379  while (reader.HasNext()) {
380  samples.push_back(reader.template Next<SampleIndexPair>());
381  }
382  if (samples.size() == 0) return;
383 
384  LOG << "FindAndSendSplitters() samples.size()=" << samples.size();
385 
386  // Find splitters
387  std::sort(samples.begin(), samples.end(),
388  [this](
389  const SampleIndexPair& a, const SampleIndexPair& b) {
390  return LessSampleIndex(a, b);
391  });
392 
393  double splitting_size = static_cast<double>(samples.size())
394  / static_cast<double>(num_total_workers);
395 
396  // Send splitters to other workers
397  for (size_t i = 1; i < num_total_workers; ++i) {
398  splitters.push_back(
399  samples[static_cast<size_t>(i * splitting_size)]);
400  for (size_t j = 1; j < num_total_workers; j++) {
401  sample_writers[j].Put(splitters.back());
402  }
403  }
404 
405  for (size_t j = 1; j < num_total_workers; ++j)
406  sample_writers[j].Close();
407  }
408 
410  {
411  public:
412  ValueType* tree_;
414  size_t index_ = 0;
415  size_t ssplitter_;
416 
417  /*!
418  * Target: tree. Size of 'number of splitter'
419  * Source: sorted splitters. Size of 'number of splitter'
420  * Number of splitter
421  */
422  TreeBuilder(ValueType* splitter_tree,
423  const SampleIndexPair* samples,
424  size_t ssplitter)
425  : tree_(splitter_tree),
426  samples_(samples),
427  ssplitter_(ssplitter) {
428  if (ssplitter != 0)
429  recurse(samples, samples + ssplitter, 1);
430  }
431 
432  void recurse(const SampleIndexPair* lo, const SampleIndexPair* hi,
433  unsigned int treeidx) {
434  // pick middle element as splitter
435  const SampleIndexPair* mid = lo + (ssize_t)(hi - lo) / 2;
436  assert(mid < samples_ + ssplitter_);
437  tree_[treeidx] = mid->first;
438 
439  if (2 * treeidx < ssplitter_)
440  {
441  const SampleIndexPair* midlo = mid, * midhi = mid + 1;
442  recurse(lo, midlo, 2 * treeidx + 0);
443  recurse(midhi, hi, 2 * treeidx + 1);
444  }
445  }
446  };
447 
449  return compare_function_(a.first, b.first) || (
450  !compare_function_(b.first, a.first) && a.second < b.second);
451  }
452 
454  return !compare_function_(a.first, b.first) && a.second >= b.second;
455  }
456 
//! Round n down to a multiple of k, where k must be a power of two: the
//! low bits selected by (k - 1) are cleared from n.
template <typename Integral>
static inline size_t RoundDown(Integral n, Integral k) {
    const Integral low_bits = k - 1;
    return n & ~low_bits;
}
462 
464  // Tree of splitters, sizeof |splitter|
465  const ValueType* const tree,
466  // Number of buckets: k = 2^{log_k}
467  size_t k,
468  size_t log_k,
469  // Number of actual workers to send to
470  size_t actual_k,
471  const SampleIndexPair* const sorted_splitters,
472  size_t prefix_items,
473  TranmissionStreamPtr& data_stream) {
474 
475  data::File::ConsumeReader unsorted_reader =
477 
478  auto data_writers = data_stream->GetWriters();
479 
480  // enlarge emitters array to next power of two to have direct access,
481  // because we fill the splitter set up with sentinels == last splitter,
482  // hence all items land in the last bucket.
483  assert(data_writers.size() == actual_k);
484  assert(actual_k <= k);
485 
486  data_writers.reserve(k);
487  while (data_writers.size() < k)
488  data_writers.emplace_back(typename TranmissionStreamType::Writer());
489 
490  std::swap(data_writers[actual_k - 1], data_writers[k - 1]);
491 
492  // classify all items (take two at once) and immediately transmit them.
493 
494  const size_t stepsize = 2;
495 
496  size_t i = prefix_items;
497  for ( ; i < prefix_items + RoundDown(local_items_, stepsize); i += stepsize)
498  {
499  // take two items
500  size_t j0 = 1;
501  ValueType el0 = unsorted_reader.Next<ValueType>();
502 
503  size_t j1 = 1;
504  ValueType el1 = unsorted_reader.Next<ValueType>();
505 
506  // run items down the tree
507  for (size_t l = 0; l < log_k; l++)
508  {
509  j0 = 2 * j0 + (compare_function_(el0, tree[j0]) ? 0 : 1);
510  j1 = 2 * j1 + (compare_function_(el1, tree[j1]) ? 0 : 1);
511  }
512 
513  size_t b0 = j0 - k;
514  size_t b1 = j1 - k;
515 
516  while (b0 && EqualSampleGreaterIndex(
517  sorted_splitters[b0 - 1], SampleIndexPair(el0, i + 0))) {
518  b0--;
519 
520  // LOG0 << "el0 equal match b0 " << b0
521  // << " prefix_items " << prefix_items
522  // << " lhs.first = " << sorted_splitters[b0 - 1].first
523  // << " lhs.second = " << sorted_splitters[b0 - 1].second
524  // << " rhs.first = " << el0
525  // << " rhs.second = " << i;
526  }
527 
528  while (b1 && EqualSampleGreaterIndex(
529  sorted_splitters[b1 - 1], SampleIndexPair(el1, i + 1))) {
530  b1--;
531  }
532 
533  assert(data_writers[b0].IsValid());
534  assert(data_writers[b1].IsValid());
535 
536  data_writers[b0].Put(el0);
537  data_writers[b1].Put(el1);
538  }
539 
540  // last iteration of loop if we have an odd number of items.
541  for ( ; i < prefix_items + local_items_; i++)
542  {
543  size_t j0 = 1;
544  ValueType el0 = unsorted_reader.Next<ValueType>();
545 
546  // run item down the tree
547  for (size_t l = 0; l < log_k; l++)
548  {
549  j0 = 2 * j0 + (compare_function_(el0, tree[j0]) ? 0 : 1);
550  }
551 
552  size_t b0 = j0 - k;
553 
554  while (b0 && EqualSampleGreaterIndex(
555  sorted_splitters[b0 - 1], SampleIndexPair(el0, i))) {
556  b0--;
557  }
558 
559  assert(data_writers[b0].IsValid());
560  data_writers[b0].Put(el0);
561  }
562 
563  // implicitly close writers and flush data
564  }
565 
//! Main phase of the sort: gathers the local samples on worker 0, obtains
//! the splitters, builds the splitter tree, transmits all local items
//! through a data stream and receives this worker's share via
//! ReceiveItems(). Logs a "done" event with the resulting balance.
566  void MainOp() {
567  RunTimer timer(timer_execute_);
568 
 // prefix_items starts as the local item count; ExPrefixSumTotal is
 // documented as computing the exclusive prefix sum over all workers and
 // delivering the total sum as well. It takes its argument by reference.
569  size_t prefix_items = local_items_;
570  size_t total_items = context_.net.ExPrefixSumTotal(prefix_items);
571 
572  size_t num_total_workers = context_.num_workers();
573 
574  sLOG << "worker" << context_.my_rank()
575  << "local_items_" << local_items_
576  << "prefix_items" << prefix_items
577  << "total_items" << total_items
578  << "local sample_.size()" << samples_.size();
579 
 // nothing to sort anywhere: only log the "done" event and leave.
580  if (total_items == 0) {
582  << "class" << "SortNode"
583  << "event" << "done"
584  << "workers" << num_total_workers
585  << "local_out_size" << local_out_size_
586  << "balance" << 0
587  << "sample_size" << samples_.size();
588  return;
589  }
590 
591  // stream to send samples to process 0 and receive them back
592  data::MixStreamPtr sample_stream = context_.GetNewMixStream(this);
593 
594  // Send all samples to worker 0.
595  data::MixStream::Writers sample_writers = sample_stream->GetWriters();
596 
597  for (const SampleIndexPair& sample : samples_) {
598  // send samples but add the local prefix to index ranks
599  sample_writers[0].Put(
600  SampleIndexPair(sample.first, prefix_items + sample.second));
601  }
602  sample_writers[0].Close();
 // swap with an empty temporary to drop the local samples.
 // NOTE(review): samples_ is empty from here on, so the samples_.size()
 // passed to FindAndSendSplitters() below and the "sample_size" field of
 // the final log event are always 0 -- confirm whether the size should be
 // captured before this swap.
603  std::vector<SampleIndexPair>().swap(samples_);
604 
605  // Get the ceiling of log(num_total_workers), as SSSS needs 2^n buckets.
606  size_t ceil_log = tlx::integer_log2_ceil(num_total_workers);
607  size_t workers_algo = size_t(1) << ceil_log;
608  size_t splitter_count_algo = workers_algo - 1;
609 
610  std::vector<SampleIndexPair> splitters;
611  splitters.reserve(workers_algo);
612 
 // worker 0 selects the splitters and sends them out; every other worker
 // closes its remaining sample writers and reads the splitters back.
613  if (context_.my_rank() == 0) {
614  FindAndSendSplitters(splitters, samples_.size(),
615  sample_stream, sample_writers);
616  }
617  else {
618  // Close unused emitters
619  for (size_t j = 1; j < num_total_workers; j++) {
620  sample_writers[j].Close();
621  }
623  sample_stream->GetMixReader(/* consume */ true);
624  while (reader.HasNext()) {
625  splitters.push_back(reader.template Next<SampleIndexPair>());
626  }
627  }
628  sample_writers.clear();
629  sample_stream.reset();
630 
631  // code from SS2NPartition, slightly altered
632 
633  std::vector<ValueType> splitter_tree(workers_algo + 1);
634 
635  // add sentinel splitters if fewer nodes than splitters.
 // NOTE(review): splitters.back() is undefined on an empty vector; this
 // assumes at least one splitter was produced or received -- confirm.
636  for (size_t i = num_total_workers; i < workers_algo; i++) {
637  splitters.push_back(splitters.back());
638  }
639 
 // the TreeBuilder constructor fills splitter_tree from the splitters.
640  TreeBuilder(splitter_tree.data(),
641  splitters.data(),
642  splitter_count_algo);
643 
644  auto data_stream = context_.template GetNewStream<TranmissionStreamType>(this->dia_id());
645 
646  std::thread thread;
648  // launch receiver thread.
649  thread = common::CreateThread(
650  [this, &data_stream]() {
652  return ReceiveItems(data_stream);
653  });
654  }
655 
657  splitter_tree.data(), // Tree. sizeof |splitter|
658  workers_algo, // Number of buckets
659  ceil_log,
660  num_total_workers,
661  splitters.data(),
662  prefix_items,
663  data_stream);
664 
 // swap with an empty temporary to release the splitter tree storage.
665  std::vector<ValueType>().swap(splitter_tree);
666 
668  thread.join();
669  else
670  ReceiveItems(data_stream);
671 
672  data_stream.reset();
673 
 // balance = local_out_size_ * workers / total_items; values above 1 are
 // inverted below, so the logged value never exceeds 1.
674  double balance = 0;
675  if (local_out_size_ > 0) {
676  balance = static_cast<double>(local_out_size_)
677  * static_cast<double>(num_total_workers)
678  / static_cast<double>(total_items);
679  }
680 
681  if (balance > 1) {
682  balance = 1 / balance;
683  }
684 
686  << "class" << "SortNode"
687  << "event" << "done"
688  << "workers" << num_total_workers
689  << "local_out_size" << local_out_size_
690  << "balance" << balance
691  << "sample_size" << samples_.size();
692  }
693 
//! Reads all items from data_stream with a consuming reader, buffers them in
//! an in-memory vector and hands that vector to SortAndWriteToFile() each
//! time it is full or mem::memory_exceeded is set, plus once more at the
//! end for any remaining items.
694  void ReceiveItems(TranmissionStreamPtr& data_stream) {
695 
696  auto reader = data_stream->GetReader(/* consume */ true);
697 
698  LOG0 << "Writing files";
699 
700  // M/2 such that the other half is used to prepare the next bulk
701  size_t capacity = DIABase::mem_limit_ / sizeof(ValueType) / 2;
702  std::vector<ValueType> vec;
703  vec.reserve(capacity);
704 
705  while (reader.HasNext()) {
706  if (!mem::memory_exceeded && vec.size() < capacity) {
707  vec.push_back(reader.template Next<ValueType>());
708  }
709  else {
 // flush the buffer; no item is consumed in this branch, the next
 // iteration retries with the vector cleared by SortAndWriteToFile().
 // NOTE(review): if capacity is 0, or memory_exceeded stays set while
 // vec is empty, this branch repeats without consuming an item and
 // writes an empty file each time -- confirm this cannot happen.
710  SortAndWriteToFile(vec);
711  }
712  }
713 
 // write out whatever is left after the stream is exhausted.
714  if (vec.size())
715  SortAndWriteToFile(vec);
716 
717  if (stats_enabled) {
719  "Sort() timer_sort_", timer_sort_.SecondsDouble());
720  }
721  }
722 
723  void SortAndWriteToFile(std::vector<ValueType>& vec) {
724 
725  LOG << "SortAndWriteToFile() " << vec.size()
726  << " items into file #" << files_.size();
727 
728  size_t vec_size = vec.size();
729  local_out_size_ += vec.size();
730 
731  // advise block pool to write out data if necessary
732  // context_.block_pool().AdviseFree(vec.size() * sizeof(ValueType));
733 
734  timer_sort_.Start();
735  sort_algorithm_(vec.begin(), vec.end(), compare_function_);
736  // common::qsort_two_pivots_yaroslavskiy(vec.begin(), vec.end(), compare_function_);
737  // common::qsort_three_pivots(vec.begin(), vec.end(), compare_function_);
738  timer_sort_.Stop();
739 
740  LOG0 << "SortAndWriteToFile() sort took " << timer_sort_;
741 
742  Timer write_time;
743  write_time.Start();
744 
745  files_.emplace_back(context_.GetFile(this));
746  auto writer = files_.back().GetWriter();
747  for (const ValueType& elem : vec) {
748  writer.Put(elem);
749  }
750  writer.Close();
751 
752  write_time.Stop();
753 
754  LOG0 << "SortAndWriteToFile() finished writing files";
755 
756  vec.clear();
757 
758  LOG0 << "SortAndWriteToFile() vector cleared";
759 
761  << "class" << "SortNode"
762  << "event" << "write_file"
763  << "file_num" << (files_.size() - 1)
764  << "items" << vec_size
765  << "timer_sort_" << timer_sort_
766  << "write_time" << write_time;
767  }
768 };
769 
771 {
772 public:
773  template <typename Iterator, typename CompareFunction>
774  void operator () (Iterator begin, Iterator end, CompareFunction cmp) const {
775  return std::sort(begin, end, cmp);
776  }
777 };
778 
779 template <typename ValueType, typename Stack>
780 template <typename CompareFunction>
781 auto DIA<ValueType, Stack>::Sort(const CompareFunction& compare_function) const {
782  assert(IsValid());
783 
784  using SortNode = api::SortNode<
785  ValueType, CompareFunction, DefaultSortAlgorithm>;
786 
787  static_assert(
788  std::is_convertible<
789  ValueType,
791  "CompareFunction has the wrong input type");
792 
793  static_assert(
794  std::is_convertible<
795  ValueType,
797  "CompareFunction has the wrong input type");
798 
799  static_assert(
800  std::is_convertible<
802  bool>::value,
803  "CompareFunction has the wrong output type (should be bool)");
804 
805  auto node = tlx::make_counting<SortNode>(*this, compare_function);
806 
807  return DIA<ValueType>(node);
808 }
809 
810 template <typename ValueType, typename Stack>
811 template <typename CompareFunction, typename SortAlgorithm>
812 auto DIA<ValueType, Stack>::Sort(const CompareFunction& compare_function,
813  const SortAlgorithm& sort_algorithm) const {
814  assert(IsValid());
815 
816  using SortNode = api::SortNode<
817  ValueType, CompareFunction, SortAlgorithm>;
818 
819  static_assert(
820  std::is_convertible<
821  ValueType,
823  "CompareFunction has the wrong input type");
824 
825  static_assert(
826  std::is_convertible<
827  ValueType,
829  "CompareFunction has the wrong input type");
830 
831  static_assert(
832  std::is_convertible<
834  bool>::value,
835  "CompareFunction has the wrong output type (should be bool)");
836 
837  auto node = tlx::make_counting<SortNode>(
838  *this, compare_function, sort_algorithm);
839 
840  return DIA<ValueType>(node);
841 }
842 
844 {
845 public:
846  template <typename Iterator, typename CompareFunction>
847  void operator () (Iterator begin, Iterator end, CompareFunction cmp) const {
848  return std::stable_sort(begin, end, cmp);
849  }
850 };
851 
852 template <typename ValueType, typename Stack>
853 template <typename CompareFunction>
855  const CompareFunction& compare_function) const {
856 
857  assert(IsValid());
858 
859  using SortStableNode = api::SortNode<
860  ValueType, CompareFunction, DefaultStableSortAlgorithm, /*Stable*/ true>;
861 
862  static_assert(
863  std::is_convertible<
864  ValueType,
866  "CompareFunction has the wrong input type");
867 
868  static_assert(
869  std::is_convertible<
870  ValueType,
872  "CompareFunction has the wrong input type");
873 
874  static_assert(
875  std::is_convertible<
877  bool>::value,
878  "CompareFunction has the wrong output type (should be bool)");
879 
880  auto node = tlx::make_counting<SortStableNode>(*this, compare_function);
881 
882  return DIA<ValueType>(node);
883 }
884 
885 template <typename ValueType, typename Stack>
886 template <typename CompareFunction, typename SortAlgorithm>
888  const CompareFunction& compare_function,
889  const SortAlgorithm& sort_algorithm) const {
890 
891  assert(IsValid());
892 
893  using SortStableNode = api::SortNode<
894  ValueType, CompareFunction, SortAlgorithm, /* Stable */ true>;
895 
896  static_assert(
897  std::is_convertible<
898  ValueType,
900  "CompareFunction has the wrong input type");
901 
902  static_assert(
903  std::is_convertible<
904  ValueType,
906  "CompareFunction has the wrong input type");
907 
908  static_assert(
909  std::is_convertible<
911  bool>::value,
912  "CompareFunction has the wrong output type (should be bool)");
913 
914  auto node = tlx::make_counting<SortStableNode>(
915  *this, compare_function, sort_algorithm);
916 
917  return DIA<ValueType>(node);
918 }
919 
920 } // namespace api
921 } // namespace thrill
922 
923 #endif // !THRILL_API_SORT_HEADER
924 
925 /******************************************************************************/
void StartPrefetch(std::vector< Reader > &readers, size_t prefetch_size)
Take a vector of Readers and prefetch equally from them.
Definition: file.hpp:570
bool HasNext()
HasNext() returns true if at least one more item is available.
SortAlgorithm sort_algorithm_
Sort function class.
Definition: sort.hpp:124
A DIANode which performs a Sort operation.
Definition: sort.hpp:62
void PushFile(data::File &file, bool consume) const
Definition: dia_node.hpp:156
static DIAMemUse Max()
Definition: dia_base.hpp:60
net::FlowControlChannel & net
Definition: context.hpp:446
virtual void Dispose()
Virtual clear method. Triggers actual disposing in sub-classes.
Definition: dia_base.hpp:188
#define sLOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:34
size_t num_items() const
Return the number of items in the file.
Definition: file.hpp:180
DIA is the interface between the user and the Thrill framework.
Definition: dia.hpp:141
ValueType_ ValueType
Definition: dia.hpp:152
virtual DIAMemUse ExecuteMemUse()
Amount of RAM used by Execute()
Definition: dia_base.hpp:176
Timer timer_sort_
time spent in sort()
Definition: sort.hpp:362
std::pair< ValueType, size_t > SampleIndexPair
Definition: sort.hpp:77
void PrintCollectiveMeanStdev(const char *text, const Type &local)
Definition: context.hpp:352
size_t my_rank() const
Global rank of this worker among all other workers in the system.
Definition: context.hpp:243
Multiway merge tree creation depends on Stable flag.
Definition: sort.hpp:86
std::pair< size_t, size_t > MaxMergeDegreePrefetch(size_t num_files)
Definition: block_pool.cpp:703
bool EqualSampleGreaterIndex(const SampleIndexPair &a, const SampleIndexPair &b)
Definition: sort.hpp:453
A File is an ordered sequence of Block objects for storing items.
Definition: file.hpp:56
#define LOG0
Override default output: never or always output log.
Definition: logger.hpp:27
#define sLOGC(cond)
Explicitly specify the condition for logging.
Definition: logger.hpp:31
void operator()(Iterator begin, Iterator end, CompareFunction cmp) const
Definition: sort.hpp:774
auto operator()(ReaderIterator seqs_begin, ReaderIterator seqs_end, const Comparator &comp=Comparator())
Definition: sort.hpp:89
RAII class for running the timer until destruction.
void reset()
release contained pointer, frees object if this is the last reference.
TLX_ATTRIBUTE_ALWAYS_INLINE T Next()
Next() reads a complete item T.
Ownership handle onto a MixStream.
Definition: mix_stream.hpp:119
void SortAndWriteToFile(std::vector< ValueType > &vec)
Definition: sort.hpp:723
BlockWriter contains a temporary Block object into which a) any serializable item can be stored or b)...
void ReceiveItems(TranmissionStreamPtr &data_stream)
Definition: sort.hpp:694
static constexpr bool stats_enabled
Set this variable to true to enable generation and output of stats.
Definition: sort.hpp:67
static constexpr bool g_debug_push_file
Definition: config.hpp:44
TreeBuilder(ValueType *splitter_tree, const SampleIndexPair *samples, size_t ssplitter)
Target: tree.
Definition: sort.hpp:422
bool memory_exceeded
memory limit exceeded indicator
std::default_random_engine rng_
a random generator
Definition: context.hpp:435
#define sLOG1
Definition: logger.hpp:38
virtual void PushData(bool consume)=0
Virtual method for pushing data. Triggers actual pushing in sub-classes.
void add(const Type &item)
visit item, maybe add it to the sample.
static const bool use_background_thread_
Definition: sort.hpp:114
size_t wanted_sample_size() const
calculate currently desired number of samples
Definition: sort.hpp:336
auto SortStable(const CompareFunction &compare_function=CompareFunction()) const
SortStable is a DOp, which sorts a given DIA stably according to the given compare_function.
Definition: sort.hpp:854
void Close()
custom destructor to close writers in a cyclic fashion
Definition: stream_data.cpp:66
void FindAndSendSplitters(std::vector< SampleIndexPair > &splitters, size_t sample_size, data::MixStreamPtr &sample_stream, data::MixStream::Writers &sample_writers)
Definition: sort.hpp:366
auto Sort(const CompareFunction &compare_function=CompareFunction()) const
Sort is a DOp, which sorts a given DIA according to the given compare_function.
Definition: sort.hpp:781
data::File::Writer unsorted_writer_
Writer for unsorted_file_.
Definition: sort.hpp:322
size_t calc_sample_size(size_t count) const
calculate desired sample size
auto operator()(ReaderIterator seqs_begin, ReaderIterator seqs_end, const Comparator &comp=Comparator())
Definition: sort.hpp:101
An extra class derived from std::vector<> for delivery of the BlockWriters of a Stream.
Definition: stream_data.hpp:56
void PushItem(const ValueType &item) const
Method for derived classes to Push a single item to all children.
Definition: dia_node.hpp:147
Reader to retrieve items in unordered sequence from a MixBlockQueue.
ConsumeReader GetConsumeReader(size_t prefetch_size=File::default_prefetch_size_)
Get consuming BlockReader for beginning of File.
Definition: file.cpp:73
File Copy() const
Return a copy of the File (explicit copy-constructor)
Definition: file.cpp:42
virtual void StopPreOp(size_t)
Virtual method for preparing end of PushData.
Definition: dia_base.hpp:173
ItemType GetItemAt(size_t index) const
Definition: file.hpp:513
void swap(CountingPtr< A, D > &a1, CountingPtr< A, D > &a2) noexcept
data::File unsorted_file_
All local unsorted items before communication.
Definition: sort.hpp:320
virtual void Execute()=0
Virtual execution method. Triggers actual computation in sub-classes.
int value
Definition: gen_data.py:41
const bool parent_stack_empty_
Whether the parent stack is empty.
Definition: sort.hpp:314
BlockReader takes Block objects from BlockSource and allows reading of a) serializable Items or b) ar...
common::FunctionTraits< Function > FunctionTraits
alias for convenience.
Definition: dia.hpp:147
Ownership handle onto a CatStreamData.
Definition: cat_stream.hpp:148
static size_t RoundDown(Integral n, Integral k)
round n down by k where k is a power of two.
Definition: sort.hpp:459
std::thread CreateThread(Args &&...args)
create a std::thread and repeat creation if it fails
Definition: porting.hpp:42
size_t local_out_size_
Total number of local elements after communication.
Definition: sort.hpp:348
High-performance smart pointer used as a wrapping reference counting pointer.
virtual void StartPreOp(size_t)
Virtual method for preparing start of PushData.
Definition: dia_base.hpp:163
data::File GetFile(size_t dia_id)
Returns a new File object containing a sequence of local Blocks.
Definition: context.hpp:283
data::MixStreamPtr GetNewMixStream(size_t dia_id)
Definition: context.cpp:1155
T TLX_ATTRIBUTE_WARN_UNUSED_RESULT ExPrefixSumTotal(T &value, const BinarySumOp &sum_op=BinarySumOp(), const T &initial=T())
Calculates the exclusive prefix sum over all workers, and delivers the total sum as well...
DIAMemUse mem_limit_
Definition: dia_base.hpp:314
A DOpNode is a typed node representing a distributed operation in Thrill.
Definition: dop_node.hpp:32
static uint_pair min()
return an uint_pair instance containing the smallest value possible
Definition: uint_types.hpp:217
static unsigned integer_log2_ceil(int i)
calculate the log2 ceiling of an integer type
common::JsonLogger logger_
Definition: dia_base.hpp:329
size_t local_worker_id() const
Definition: context.hpp:263
typename std::conditional< Stable, data::CatStream, data::MixStream >::type TranmissionStreamType
Stream type for item transmission depends on Stable flag.
Definition: sort.hpp:82
TLX_ATTRIBUTE_ALWAYS_INLINE BlockWriter & Put(const T &x)
Put appends a complete item, or fails with a FullException.
const SampleIndexPair * samples_
Definition: sort.hpp:413
void operator()(Iterator begin, Iterator end, CompareFunction cmp) const
Definition: sort.hpp:847
void TransmitItems(const ValueType *const tree, size_t k, size_t log_k, size_t actual_k, const SampleIndexPair *const sorted_splitters, size_t prefix_items, TranmissionStreamPtr &data_stream)
Definition: sort.hpp:463
void recurse(const SampleIndexPair *lo, const SampleIndexPair *hi, unsigned int treeidx)
Definition: sort.hpp:432
virtual bool OnPreOpFile(const data::File &, size_t)
Definition: dia_base.hpp:168
static constexpr bool debug
Definition: sort.hpp:64
void Close()
Explicitly close the writer.
size_t num_workers() const
Global number of workers in the system.
Definition: context.hpp:251
common::StatsTimerBaseStopped< stats_enabled > Timer
Timer or FakeTimer.
Definition: sort.hpp:73
size_t local_items_
Number of items on this worker.
Definition: sort.hpp:324
common::ReservoirSamplingGrow< SampleIndexPair > res_sampler_
Reservoir sampler.
Definition: sort.hpp:332
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:24
Timer timer_execute_
time spent in Execute
Definition: sort.hpp:359
void SetCpuAffinity(std::thread &thread, size_t cpu_id)
set cpu/core affinity of a thread
Definition: porting.cpp:110
typename std::conditional< Stable, MakeStableMultiwayMergeTree, MakeDefaultMultiwayMergeTree >::type MakeMultiwayMergeTreeDelegate
Definition: sort.hpp:112
std::deque< data::File > files_
Local data files.
Definition: sort.hpp:346
Writer GetWriter(size_t block_size=default_block_size)
Get BlockWriter.
Definition: file.cpp:63
Context & context_
associated Context
Definition: dia_base.hpp:293
Timer timer_preop_
time spent in PreOp (including preceding Node's computation)
Definition: sort.hpp:356
#define LOGC(cond)
Explicitly specify the condition for logging.
Definition: logger.hpp:21
std::vector< SampleIndexPair > samples_
Sample vector: pairs of (sample,local index)
Definition: sort.hpp:330
virtual DIAMemUse PushDataMemUse()
Amount of RAM used by PushData()
Definition: dia_base.hpp:182
bool LessSampleIndex(const SampleIndexPair &a, const SampleIndexPair &b)
Definition: sort.hpp:448
static constexpr double desired_imbalance_
epsilon
Definition: sort.hpp:327
SortNode(const ParentDIA &parent, const CompareFunction &compare_function, const SortAlgorithm &sort_algorithm=SortAlgorithm())
Constructor for a sort node.
Definition: sort.hpp:121
data::BlockPool & block_pool()
the block manager keeps all data blocks moving through the system.
Definition: context.hpp:324
const size_t & dia_id() const
return unique id of DIANode subclass as stored by StatsNode
Definition: dia_base.hpp:213