sort.hpp
1 /*******************************************************************************
2  * thrill/api/sort.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Alexander Noe <[email protected]>
7  * Copyright (C) 2015 Michael Axtmann <[email protected]>
8  * Copyright (C) 2015-2016 Timo Bingmann <[email protected]>
9  *
10  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
11  ******************************************************************************/
12 
13 #pragma once
14 #ifndef THRILL_API_SORT_HEADER
15 #define THRILL_API_SORT_HEADER
16 
17 #include <thrill/api/context.hpp>
18 #include <thrill/api/dia.hpp>
19 #include <thrill/api/dop_node.hpp>
20 #include <thrill/common/logger.hpp>
21 #include <thrill/common/math.hpp>
22 #include <thrill/common/porting.hpp>
23 #include <thrill/common/qsort.hpp>
24 #include <thrill/common/reservoir_sampling.hpp>
25 #include <thrill/core/multiway_merge.hpp>
26 #include <thrill/data/file.hpp>
27 #include <thrill/net/group.hpp>
28 #include <tlx/math/integer_log2.hpp>
29 
30 #include <algorithm>
31 #include <cstdlib>
32 #include <deque>
33 #include <functional>
34 #include <numeric>
35 #include <random>
36 #include <utility>
37 #include <vector>
38 
39 namespace thrill {
40 namespace api {
41 
42 /*!
43  * A DIANode which performs a Sort operation. Sort sorts a DIA according to a
44  * given compare function.
45  *
46  * \tparam ValueType Type of DIA elements
47  *
48  * \tparam SortAlgorithm Type of the local sort algorithm applied to each
49  * worker's partition (DefaultSortAlgorithm wraps std::sort).
50  *
51  * \tparam CompareFunction Type of the compare function
52  *
53  * \ingroup api_layer
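 *
 * Illustrative usage only (assumes an existing DIA<int> named `input`):
 * \code
 * auto sorted = input.Sort([](const int& a, const int& b) { return a < b; });
 * \endcode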
54  */
55 template <typename ValueType, typename CompareFunction, typename SortAlgorithm>
56 class SortNode final : public DOpNode<ValueType>
57 {
58  static constexpr bool debug = false;
59 
60  //! Set this variable to true to enable generation and output of stats
61  static constexpr bool stats_enabled = false;
62 
63  using Super = DOpNode<ValueType>;
64  using Super::context_;
65 
66  //! Timer or FakeTimer
67  using Timer = common::StatsTimerBaseStopped<stats_enabled>;
68  //! RAII class for running the timer
69  using RunTimer = common::RunTimer<Timer>;
70 
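 //! A sample value paired with its item index (local at sampling time, later
 //! offset to a global index), so that runs of equal keys can still be split
 //! deterministically between workers.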
71  using SampleIndexPair = std::pair<ValueType, size_t>;
72 
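 //! Whether ReceiveItems() runs in a separate background thread while this
 //! worker classifies and transmits its own items in MainOp().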
73  static const bool use_background_thread_ = false;
74 
75 public:
76  /*!
77  * Constructor for a sort node.
78  */
79  template <typename ParentDIA>
80  SortNode(const ParentDIA& parent,
81  const CompareFunction& compare_function,
82  const SortAlgorithm& sort_algorithm = SortAlgorithm())
83  : Super(parent.ctx(), "Sort", { parent.id() }, { parent.node() }),
84  compare_function_(compare_function),
85  sort_algorithm_(sort_algorithm),
86  parent_stack_empty_(ParentDIA::stack_empty)
87  {
88  // Hook PreOp(s)
89  auto pre_op_fn = [this](const ValueType& input) {
90  PreOp(input);
91  };
92 
93  auto lop_chain = parent.stack().push(pre_op_fn).fold();
94  parent.node()->AddChild(this, lop_chain);
95  }
96 
97  void StartPreOp(size_t /* id */) final {
98  timer_preop_.Start();
 99  unsorted_writer_ = unsorted_file_.GetWriter();
 100  }
101 
102  void PreOp(const ValueType& input) {
103  unsorted_writer_.Put(input);
 104  res_sampler_.add(SampleIndexPair(input, local_items_));
 105  local_items_++;
106  }
107 
108  //! Receive a whole data::File of ValueType, but only if our stack is empty.
109  bool OnPreOpFile(const data::File& file, size_t /* parent_index */) final {
110  if (!parent_stack_empty_) {
 111  LOGC(common::g_debug_push_file)
 112  << "Sort rejected File from parent "
113  << "due to non-empty function stack.";
114  return false;
115  }
116 
117  // accept file
118  unsorted_file_ = file.Copy();
 119  local_items_ = unsorted_file_.num_items();
 120 
121  size_t pick_items = std::min(local_items_, wanted_sample_size());
122 
123  sLOG << "Pick" << pick_items << "samples by random access"
124  << " from File containing " << local_items_ << " items.";
125  for (size_t i = 0; i < pick_items; ++i) {
126  size_t index = context_.rng_() % local_items_;
127  sLOG << "got index[" << i << "] = " << index;
128  samples_.emplace_back(
129  unsorted_file_.GetItemAt<ValueType>(index), index);
130  }
131 
132  return true;
133  }
134 
135  void StopPreOp(size_t /* id */) final {
 136  unsorted_writer_.Close();
 137 
138  LOG0 << "wanted_sample_size()=" << wanted_sample_size()
139  << " samples.size()= " << samples_.size();
140 
141  timer_preop_.Stop();
142  if (stats_enabled) {
144  "Sort() timer_preop_", timer_preop_.SecondsDouble());
146  "Sort() preop local_items_", local_items_);
147  }
148  }
149 
150  DIAMemUse ExecuteMemUse() final {
151  return DIAMemUse::Max();
152  }
153 
 154  //! Executes the sort operation.
155  void Execute() final {
156  MainOp();
157  if (stats_enabled) {
159  "Sort() timer_execute", timer_execute_.SecondsDouble());
160  }
161  }
162 
163  DIAMemUse PushDataMemUse() final {
164  if (files_.size() <= 1) {
165  // direct push, no merge necessary
166  return 0;
167  }
168  else {
169  // need to perform multiway merging
170  return DIAMemUse::Max();
171  }
172  }
173 
174  //! calculate maximum merging degree from available memory and the number of
175  //! files. additionally calculate the prefetch size of each File.
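 //! (Example: with room for 64 blocks and 8 Files this yields
 //! (8, min(16, 64/8 - 1)) = (8, 7) prefetch blocks per File.)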
176  std::pair<size_t, size_t> MaxMergeDegreePrefetch() {
177  size_t avail_blocks = DIABase::mem_limit_ / data::default_block_size;
178  if (files_.size() >= avail_blocks) {
179  // more files than blocks available -> partial merge of avail_blocks
180  // Files with prefetch = 0, which is one read Block per File.
181  return std::make_pair(avail_blocks, 0u);
182  }
183  else {
 184  // fewer files than available Blocks -> split blocks equally among
 185  // Files.
186  return std::make_pair(
187  files_.size(),
188  std::min<size_t>(16u, (avail_blocks / files_.size()) - 1));
189  }
190  }
191 
192  void PushData(bool consume) final {
193  Timer timer_pushdata;
194  timer_pushdata.Start();
195 
196  size_t local_size = 0;
197  if (files_.size() == 0) {
198  // nothing to push
199  }
200  else if (files_.size() == 1) {
201  local_size = files_[0].num_items();
202  this->PushFile(files_[0], consume);
203  }
204  else {
205  size_t merge_degree, prefetch;
206 
207  // merge batches of files if necessary
208  while (files_.size() > MaxMergeDegreePrefetch().first)
209  {
210  std::tie(merge_degree, prefetch) = MaxMergeDegreePrefetch();
211 
212  sLOG1 << "Partial multi-way-merge of"
213  << merge_degree << "files with prefetch" << prefetch;
214 
215  // create merger for first merge_degree_ Files
216  std::vector<data::File::ConsumeReader> seq;
217  seq.reserve(merge_degree);
218 
219  for (size_t t = 0; t < merge_degree; ++t)
220  seq.emplace_back(files_[t].GetConsumeReader(0));
221 
222  StartPrefetch(seq, prefetch);
223 
224  auto puller = core::make_multiway_merge_tree<ValueType>(
225  seq.begin(), seq.end(), compare_function_);
226 
227  // create new File for merged items
228  files_.emplace_back(context_.GetFile(this));
229  auto writer = files_.back().GetWriter();
230 
231  while (puller.HasNext()) {
232  writer.Put(puller.Next());
233  }
234  writer.Close();
235 
236  // this clear is important to release references to the files.
237  seq.clear();
238 
239  // remove merged files
240  files_.erase(files_.begin(), files_.begin() + merge_degree);
241  }
242 
243  std::tie(merge_degree, prefetch) = MaxMergeDegreePrefetch();
244 
245  sLOG1 << "Start multi-way-merge of" << files_.size() << "files"
246  << "with prefetch" << prefetch;
247 
248  // construct output merger of remaining Files
249  std::vector<data::File::Reader> seq;
250  seq.reserve(files_.size());
251 
252  for (size_t t = 0; t < files_.size(); ++t)
253  seq.emplace_back(files_[t].GetReader(consume, 0));
254 
255  StartPrefetch(seq, prefetch);
256 
257  auto puller = core::make_multiway_merge_tree<ValueType>(
258  seq.begin(), seq.end(), compare_function_);
259 
260  while (puller.HasNext()) {
261  this->PushItem(puller.Next());
262  local_size++;
263  }
264  }
265 
266  timer_pushdata.Stop();
267 
268  if (stats_enabled) {
270  "Sort() timer_pushdata", timer_pushdata.SecondsDouble());
271 
272  context_.PrintCollectiveMeanStdev("Sort() local_size", local_size);
273  }
274  }
275 
276  void Dispose() final {
277  files_.clear();
278  }
279 
280 private:
281  //! The comparison function which is applied to two elements.
282  CompareFunction compare_function_;
283 
284  //! Sort function class
285  SortAlgorithm sort_algorithm_;
286 
287  //! Whether the parent stack is empty
 288  const bool parent_stack_empty_;
 289 
290  //! \name PreOp Phase
291  //! \{
292 
293  //! All local unsorted items before communication
 294  data::File unsorted_file_ { context_.GetFile(this) };
 295  //! Writer for unsorted_file_
 296  data::File::Writer unsorted_writer_;
297  //! Number of items on this worker
298  size_t local_items_ = 0;
299 
 300  //! epsilon: desired imbalance factor used to calculate the sample size
301  static constexpr double desired_imbalance_ = 0.1;
302 
303  //! Sample vector: pairs of (sample,local index)
304  std::vector<SampleIndexPair> samples_;
305  //! Reservoir sampler
 306  common::ReservoirSamplingGrow<SampleIndexPair> res_sampler_ {
 307  samples_, context_.rng_, desired_imbalance_
 308  };
309  //! calculate currently desired number of samples
310  size_t wanted_sample_size() const {
311  return res_sampler_.calc_sample_size(local_items_);
312  }
313 
314  //! \}
315 
316  //! \name MainOp and PushData
317  //! \{
318 
319  //! Local data files
320  std::deque<data::File> files_;
321  //! Total number of local elements after communication
322  size_t local_out_size_ = 0;
323 
324  //! \}
325 
326  //! \name Statistics
327  //! \{
328 
329  //! time spent in PreOp (including preceding Node's computation)
 330  Timer timer_preop_;
 331 
332  //! time spent in Execute
 333  Timer timer_execute_;
 334 
335  //! time spent in sort()
 336  Timer timer_sort_;
 337 
338  //! \}
339 
 340  void FindAndSendSplitters(
 341  std::vector<SampleIndexPair>& splitters, size_t sample_size,
342  data::MixStreamPtr& sample_stream,
343  data::MixStream::Writers& sample_writers) {
344 
345  // Get samples from other workers
346  size_t num_total_workers = context_.num_workers();
347 
348  std::vector<SampleIndexPair> samples;
349  samples.reserve(sample_size * num_total_workers);
350 
351  auto reader = sample_stream->GetMixReader(/* consume */ true);
352 
353  while (reader.HasNext()) {
354  samples.push_back(reader.template Next<SampleIndexPair>());
355  }
356  if (samples.size() == 0) return;
357 
358  LOG << "FindAndSendSplitters() samples.size()=" << samples.size();
359 
360  // Find splitters
361  std::sort(samples.begin(), samples.end(),
362  [this](
363  const SampleIndexPair& a, const SampleIndexPair& b) {
364  return LessSampleIndex(a, b);
365  });
366 
367  double splitting_size = static_cast<double>(samples.size())
368  / static_cast<double>(num_total_workers);
369 
370  // Send splitters to other workers
371  for (size_t i = 1; i < num_total_workers; ++i) {
372  splitters.push_back(
373  samples[static_cast<size_t>(i * splitting_size)]);
374  for (size_t j = 1; j < num_total_workers; j++) {
375  sample_writers[j].Put(splitters.back());
376  }
377  }
378 
379  for (size_t j = 1; j < num_total_workers; ++j)
380  sample_writers[j].Close();
381  }
382 
 383  class TreeBuilder
 384  {
385  public:
386  ValueType* tree_;
 387  const SampleIndexPair* samples_;
 388  size_t index_ = 0;
389  size_t ssplitter_;
390 
 391  /*!
 392  * \param splitter_tree target array for the splitter tree, size 'number of splitters'
 393  * \param samples source of sorted splitters, size 'number of splitters'
 394  * \param ssplitter number of splitters
 395  */
396  TreeBuilder(ValueType* splitter_tree,
397  const SampleIndexPair* samples,
398  size_t ssplitter)
399  : tree_(splitter_tree),
400  samples_(samples),
401  ssplitter_(ssplitter) {
402  if (ssplitter != 0)
403  recurse(samples, samples + ssplitter, 1);
404  }
405 
406  void recurse(const SampleIndexPair* lo, const SampleIndexPair* hi,
407  unsigned int treeidx) {
408  // pick middle element as splitter
409  const SampleIndexPair* mid = lo + (ssize_t)(hi - lo) / 2;
410  assert(mid < samples_ + ssplitter_);
411  tree_[treeidx] = mid->first;
412 
413  if (2 * treeidx < ssplitter_)
414  {
415  const SampleIndexPair* midlo = mid, * midhi = mid + 1;
416  recurse(lo, midlo, 2 * treeidx + 0);
417  recurse(midhi, hi, 2 * treeidx + 1);
418  }
419  }
420  };
421 
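 //! Order samples lexicographically by (key, global index): a strict weak
 //! ordering that breaks ties between equal keys by input position.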
 422  bool LessSampleIndex(const SampleIndexPair& a, const SampleIndexPair& b) {
 423  return compare_function_(a.first, b.first) || (
424  !compare_function_(b.first, a.first) && a.second < b.second);
425  }
426 
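 //! True if a's key does not compare less than b's key and a's index is not
 //! smaller than b's; used while classifying items so that duplicates of a
 //! splitter key are distributed by global index.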
 427  bool EqualSampleGreaterIndex(const SampleIndexPair& a, const SampleIndexPair& b) {
 428  return !compare_function_(a.first, b.first) && a.second >= b.second;
429  }
430 
 431  //! round n down to the next multiple of k, where k is a power of two.
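 //! (e.g. RoundDown(13, 4) == 12; (k - 1) acts as a bit mask because k is a power of two.)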
432  template <typename Integral>
433  static inline size_t RoundDown(Integral n, Integral k) {
434  return (n & ~(k - 1));
435  }
436 
 437  void TransmitItems(
 438  // Tree of splitters, sizeof |splitter|
439  const ValueType* const tree,
440  // Number of buckets: k = 2^{log_k}
441  size_t k,
442  size_t log_k,
443  // Number of actual workers to send to
444  size_t actual_k,
445  const SampleIndexPair* const sorted_splitters,
446  size_t prefix_items,
447  data::MixStreamPtr& data_stream) {
448 
449  data::File::ConsumeReader unsorted_reader =
 450  unsorted_file_.GetConsumeReader();
 451 
452  data::MixStream::Writers data_writers = data_stream->GetWriters();
453 
454  // enlarge emitters array to next power of two to have direct access,
455  // because we fill the splitter set up with sentinels == last splitter,
456  // hence all items land in the last bucket.
457  assert(data_writers.size() == actual_k);
458  assert(actual_k <= k);
459 
460  data_writers.reserve(k);
461  while (data_writers.size() < k)
462  data_writers.emplace_back(data::MixStream::Writer());
463 
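 // move the last real writer to the last slot, so that items falling into the
 // sentinel buckets are sent to the last actual worker.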
464  std::swap(data_writers[actual_k - 1], data_writers[k - 1]);
465 
466  // classify all items (take two at once) and immediately transmit them.
467 
468  const size_t stepsize = 2;
469 
470  size_t i = prefix_items;
471  for ( ; i < prefix_items + RoundDown(local_items_, stepsize); i += stepsize)
472  {
473  // take two items
474  size_t j0 = 1;
475  ValueType el0 = unsorted_reader.Next<ValueType>();
476 
477  size_t j1 = 1;
478  ValueType el1 = unsorted_reader.Next<ValueType>();
479 
480  // run items down the tree
481  for (size_t l = 0; l < log_k; l++)
482  {
483  j0 = 2 * j0 + (compare_function_(el0, tree[j0]) ? 0 : 1);
484  j1 = 2 * j1 + (compare_function_(el1, tree[j1]) ? 0 : 1);
485  }
486 
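 // after log_k steps j is a leaf position in [k, 2k); subtracting k yields the bucket.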
487  size_t b0 = j0 - k;
488  size_t b1 = j1 - k;
489 
490  while (b0 && EqualSampleGreaterIndex(
491  sorted_splitters[b0 - 1], SampleIndexPair(el0, i + 0))) {
492  b0--;
493 
494  // LOG0 << "el0 equal match b0 " << b0
495  // << " prefix_items " << prefix_items
496  // << " lhs.first = " << sorted_splitters[b0 - 1].first
497  // << " lhs.second = " << sorted_splitters[b0 - 1].second
498  // << " rhs.first = " << el0
499  // << " rhs.second = " << i;
500  }
501 
502  while (b1 && EqualSampleGreaterIndex(
503  sorted_splitters[b1 - 1], SampleIndexPair(el1, i + 1))) {
504  b1--;
505  }
506 
507  assert(data_writers[b0].IsValid());
508  assert(data_writers[b1].IsValid());
509 
510  data_writers[b0].Put(el0);
511  data_writers[b1].Put(el1);
512  }
513 
514  // last iteration of loop if we have an odd number of items.
515  for ( ; i < prefix_items + local_items_; i++)
516  {
517  size_t j0 = 1;
518  ValueType el0 = unsorted_reader.Next<ValueType>();
519 
520  // run item down the tree
521  for (size_t l = 0; l < log_k; l++)
522  {
523  j0 = 2 * j0 + (compare_function_(el0, tree[j0]) ? 0 : 1);
524  }
525 
526  size_t b0 = j0 - k;
527 
528  while (b0 && EqualSampleGreaterIndex(
529  sorted_splitters[b0 - 1], SampleIndexPair(el0, i))) {
530  b0--;
531  }
532 
533  assert(data_writers[b0].IsValid());
534  data_writers[b0].Put(el0);
535  }
536 
537  // implicitly close writers and flush data
538  }
539 
540  void MainOp() {
541  RunTimer timer(timer_execute_);
542 
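 // exclusive prefix sum of local item counts: prefix_items becomes the global
 // index of this worker's first item, total_items the global item count.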
543  size_t prefix_items = local_items_;
544  size_t total_items = context_.net.ExPrefixSumTotal(prefix_items);
545 
546  size_t num_total_workers = context_.num_workers();
547 
548  sLOG << "worker" << context_.my_rank()
549  << "local_items_" << local_items_
550  << "prefix_items" << prefix_items
551  << "total_items" << total_items
552  << "local sample_.size()" << samples_.size();
553 
554  if (total_items == 0) {
 555  Super::logger_
 556  << "class" << "SortNode"
557  << "event" << "done"
558  << "workers" << num_total_workers
559  << "local_out_size" << local_out_size_
560  << "balance" << 0
561  << "sample_size" << samples_.size();
562  return;
563  }
564 
565  // stream to send samples to process 0 and receive them back
566  data::MixStreamPtr sample_stream = context_.GetNewMixStream(this);
567 
568  // Send all samples to worker 0.
569  data::MixStream::Writers sample_writers = sample_stream->GetWriters();
570 
571  for (const SampleIndexPair& sample : samples_) {
572  // send samples but add the local prefix to index ranks
573  sample_writers[0].Put(
574  SampleIndexPair(sample.first, prefix_items + sample.second));
575  }
576  sample_writers[0].Close();
577  std::vector<SampleIndexPair>().swap(samples_);
578 
579  // Get the ceiling of log(num_total_workers), as SSSS needs 2^n buckets.
580  size_t ceil_log = tlx::integer_log2_ceil(num_total_workers);
581  size_t workers_algo = size_t(1) << ceil_log;
582  size_t splitter_count_algo = workers_algo - 1;
583 
584  std::vector<SampleIndexPair> splitters;
585  splitters.reserve(workers_algo);
586 
587  if (context_.my_rank() == 0) {
588  FindAndSendSplitters(splitters, samples_.size(),
589  sample_stream, sample_writers);
590  }
591  else {
592  // Close unused emitters
593  for (size_t j = 1; j < num_total_workers; j++) {
594  sample_writers[j].Close();
595  }
 596  auto reader =
 597  sample_stream->GetMixReader(/* consume */ true);
598  while (reader.HasNext()) {
599  splitters.push_back(reader.template Next<SampleIndexPair>());
600  }
601  }
602  sample_writers.clear();
603  sample_stream.reset();
604 
605  // code from SS2NPartition, slightly altered
606 
607  std::vector<ValueType> splitter_tree(workers_algo + 1);
608 
 609  // add sentinel splitters if there are fewer workers than power-of-two buckets.
610  for (size_t i = num_total_workers; i < workers_algo; i++) {
611  splitters.push_back(splitters.back());
612  }
613 
614  TreeBuilder(splitter_tree.data(),
615  splitters.data(),
616  splitter_count_algo);
617 
618  data::MixStreamPtr data_stream = context_.GetNewMixStream(this);
619 
620  std::thread thread;
 621  if (use_background_thread_) {
 622  // launch receiver thread.
623  thread = common::CreateThread(
624  [this, &data_stream]() {
 625  common::SetCpuAffinity(context_.local_worker_id());
 626  return ReceiveItems(data_stream);
627  });
628  }
629 
 630  TransmitItems(
 631  splitter_tree.data(), // Tree. sizeof |splitter|
632  workers_algo, // Number of buckets
633  ceil_log,
634  num_total_workers,
635  splitters.data(),
636  prefix_items,
637  data_stream);
638 
639  std::vector<ValueType>().swap(splitter_tree);
640 
 641  if (use_background_thread_)
 642  thread.join();
643  else
644  ReceiveItems(data_stream);
645 
646  data_stream.reset();
647 
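 // balance: this worker's share of the output relative to the ideal share
 // total_items / num_workers; inverted if above 1, so 1.0 means perfectly balanced.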
648  double balance = 0;
649  if (local_out_size_ > 0) {
650  balance = static_cast<double>(local_out_size_)
651  * static_cast<double>(num_total_workers)
652  / static_cast<double>(total_items);
653  }
654 
655  if (balance > 1) {
656  balance = 1 / balance;
657  }
658 
 659  Super::logger_
 660  << "class" << "SortNode"
661  << "event" << "done"
662  << "workers" << num_total_workers
663  << "local_out_size" << local_out_size_
664  << "balance" << balance
665  << "sample_size" << samples_.size();
666  }
667 
668  void ReceiveItems(data::MixStreamPtr& data_stream) {
669 
670  auto reader = data_stream->GetMixReader(/* consume */ true);
671 
672  LOG0 << "Writing files";
673 
674  // M/2 such that the other half is used to prepare the next bulk
675  size_t capacity = DIABase::mem_limit_ / sizeof(ValueType) / 2;
676  std::vector<ValueType> vec;
677  vec.reserve(capacity);
678 
679  while (reader.HasNext()) {
680  if (!mem::memory_exceeded && vec.size() < capacity) {
681  vec.push_back(reader.template Next<ValueType>());
682  }
683  else {
684  SortAndWriteToFile(vec);
685  }
686  }
687 
688  if (vec.size())
689  SortAndWriteToFile(vec);
690 
691  if (stats_enabled) {
693  "Sort() timer_sort_", timer_sort_.SecondsDouble());
694  }
695  }
696 
697  void SortAndWriteToFile(std::vector<ValueType>& vec) {
698 
699  LOG << "SortAndWriteToFile() " << vec.size()
700  << " items into file #" << files_.size();
701 
702  size_t vec_size = vec.size();
703  local_out_size_ += vec.size();
704 
705  // advise block pool to write out data if necessary
706  // context_.block_pool().AdviseFree(vec.size() * sizeof(ValueType));
707 
708  timer_sort_.Start();
709  sort_algorithm_(vec.begin(), vec.end(), compare_function_);
710  // common::qsort_two_pivots_yaroslavskiy(vec.begin(), vec.end(), compare_function_);
711  // common::qsort_three_pivots(vec.begin(), vec.end(), compare_function_);
712  timer_sort_.Stop();
713 
714  LOG0 << "SortAndWriteToFile() sort took " << timer_sort_;
715 
716  Timer write_time;
717  write_time.Start();
718 
719  files_.emplace_back(context_.GetFile(this));
720  auto writer = files_.back().GetWriter();
721  for (const ValueType& elem : vec) {
722  writer.Put(elem);
723  }
724  writer.Close();
725 
726  write_time.Stop();
727 
728  LOG0 << "SortAndWriteToFile() finished writing files";
729 
730  vec.clear();
731 
732  LOG0 << "SortAndWriteToFile() vector cleared";
733 
 734  Super::logger_
 735  << "class" << "SortNode"
736  << "event" << "write_file"
737  << "file_num" << (files_.size() - 1)
738  << "items" << vec_size
739  << "timer_sort_" << timer_sort_
740  << "write_time" << write_time;
741  }
742 };
743 
744 class DefaultSortAlgorithm
745 {
746 public:
747  template <typename Iterator, typename CompareFunction>
748  void operator () (Iterator begin, Iterator end, CompareFunction cmp) const {
749  return std::sort(begin, end, cmp);
750  }
751 };
752 
753 template <typename ValueType, typename Stack>
754 template <typename CompareFunction>
755 auto DIA<ValueType, Stack>::Sort(const CompareFunction& compare_function) const {
756  assert(IsValid());
757 
758  using SortNode = api::SortNode<
759  ValueType, CompareFunction, DefaultSortAlgorithm>;
760 
761  static_assert(
762  std::is_convertible<
763  ValueType,
765  "CompareFunction has the wrong input type");
766 
767  static_assert(
768  std::is_convertible<
769  ValueType,
771  "CompareFunction has the wrong input type");
772 
773  static_assert(
774  std::is_convertible<
 775  typename FunctionTraits<CompareFunction>::result_type,
 776  bool>::value,
777  "CompareFunction has the wrong output type (should be bool)");
778 
779  auto node = tlx::make_counting<SortNode>(*this, compare_function);
780 
781  return DIA<ValueType>(node);
782 }
783 
784 template <typename ValueType, typename Stack>
785 template <typename CompareFunction, typename SortAlgorithm>
786 auto DIA<ValueType, Stack>::Sort(const CompareFunction& compare_function,
787  const SortAlgorithm& sort_algorithm) const {
788  assert(IsValid());
789 
790  using SortNode = api::SortNode<
791  ValueType, CompareFunction, SortAlgorithm>;
792 
793  static_assert(
794  std::is_convertible<
795  ValueType,
797  "CompareFunction has the wrong input type");
798 
799  static_assert(
800  std::is_convertible<
801  ValueType,
803  "CompareFunction has the wrong input type");
804 
805  static_assert(
806  std::is_convertible<
 807  typename FunctionTraits<CompareFunction>::result_type,
 808  bool>::value,
809  "CompareFunction has the wrong output type (should be bool)");
810 
811  auto node = tlx::make_counting<SortNode>(
812  *this, compare_function, sort_algorithm);
813 
814  return DIA<ValueType>(node);
815 }
816 
817 } // namespace api
818 } // namespace thrill
819 
820 #endif // !THRILL_API_SORT_HEADER
821 
822 /******************************************************************************/