Thrill  0.1
parallel_sample_sort.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * tlx/sort/strings/parallel_sample_sort.hpp
3  *
4  * Parallel Super Scalar String Sample Sort (pS5)
5  *
6  * See also Timo Bingmann, Andreas Eberle, and Peter Sanders. "Engineering
7  * parallel string sorting." Algorithmica 77.1 (2017): 235-286.
8  *
9  * Part of tlx - http://panthema.net/tlx
10  *
11  * Copyright (C) 2013-2019 Timo Bingmann <[email protected]>
12  *
13  * All rights reserved. Published under the Boost Software License, Version 1.0
14  ******************************************************************************/
15 
16 #ifndef TLX_SORT_STRINGS_PARALLEL_SAMPLE_SORT_HEADER
17 #define TLX_SORT_STRINGS_PARALLEL_SAMPLE_SORT_HEADER
18 
19 #include <algorithm>
20 #include <atomic>
21 #include <cmath>
22 #include <cstdlib>
23 #include <cstring>
24 #include <random>
25 #include <vector>
26 
30 
31 #include <tlx/logger/core.hpp>
32 #include <tlx/math/clz.hpp>
33 #include <tlx/math/ctz.hpp>
34 #include <tlx/meta/enable_if.hpp>
35 #include <tlx/multi_timer.hpp>
36 #include <tlx/simple_vector.hpp>
37 #include <tlx/thread_pool.hpp>
38 #include <tlx/unused.hpp>
39 
40 namespace tlx {
41 namespace sort_strings_detail {
42 
43 class PS5SortStep;
44 
45 /******************************************************************************/
46 //! Parallel Super Scalar String Sample Sort Parameter Struct
47 
49 {
50 public:
51  static const bool debug_steps = false;
52  static const bool debug_jobs = false;
53 
54  static const bool debug_bucket_size = false;
55  static const bool debug_recursion = false;
56  static const bool debug_lcp = false;
57 
58  static const bool debug_result = false;
59 
60  //! enable/disable various sorting levels
61  static const bool enable_parallel_sample_sort = true;
62  static const bool enable_sequential_sample_sort = true;
63  static const bool enable_sequential_mkqs = true;
64 
65  //! terminate sort after first parallel sample sort step
66  static const bool use_only_first_sortstep = false;
67 
68  //! enable work freeing
69  static const bool enable_work_sharing = true;
70 
71  //! whether the base sequential_threshold() on the remaining unsorted string
72  //! set or on the whole string set.
73  static const bool enable_rest_size = false;
74 
75  //! key type for sample sort: 32-bit or 64-bit
76  typedef size_t key_type;
77 
78  //! depth of classification tree used in sample sorts
79  static const unsigned TreeBits = 10;
80 
81  //! classification tree variant for sample sorts
83 
84  //! threshold to run sequential small sorts
85  static const size_t smallsort_threshold = 1024 * 1024;
86  //! threshold to switch to insertion sort
87  static const size_t inssort_threshold = 32;
88 };
89 
90 /******************************************************************************/
91 //! Parallel Super Scalar String Sample Sort Context
92 
93 template <typename Parameters>
94 class PS5Context : public Parameters
95 {
96 public:
97  //! total size of input
98  size_t total_size;
99 
100  //! number of remaining strings to sort
101  std::atomic<size_t> rest_size;
102 
103  //! counters
104  std::atomic<size_t> para_ss_steps, sequ_ss_steps, base_sort_steps;
105 
106  //! timers for individual sorting steps
108 
109  //! number of threads overall
110  size_t num_threads;
111 
112  //! thread pool
114 
115  //! context constructor
116  PS5Context(size_t _thread_num)
117  : para_ss_steps(0), sequ_ss_steps(0), base_sort_steps(0),
118  num_threads(_thread_num),
119  threads_(_thread_num)
120  { }
121 
122  //! enqueue a new job in the thread pool
123  template <typename StringPtr>
124  void enqueue(PS5SortStep* sstep, const StringPtr& strptr, size_t depth);
125 
126  //! return sequential sorting threshold
128  size_t threshold = this->smallsort_threshold;
129  if (this->enable_rest_size) {
130  return std::max(threshold, rest_size / num_threads);
131  }
132  else {
133  return std::max(threshold, total_size / num_threads);
134  }
135  }
136 
137  //! decrement number of unordered strings
138  void donesize(size_t n) {
139  if (this->enable_rest_size)
140  rest_size -= n;
141  }
142 };
143 
144 /******************************************************************************/
145 //! LCP calculation of Splitter Strings
146 
147 template <typename KeyType>
148 static inline unsigned char
149 lcpKeyType(const KeyType& a, const KeyType& b) {
150  // XOR both values and count the number of zero bytes
151  return clz(a ^ b) / 8;
152 }
153 
154 template <typename KeyType>
155 static inline unsigned char
156 lcpKeyDepth(const KeyType& a) {
157  // count number of non-zero bytes
158  return sizeof(KeyType) - (ctz(a) / 8);
159 }
160 
161 //! return the d-th character in the (swapped) key
162 template <typename KeyType>
163 static inline unsigned char
164 getCharAtDepth(const KeyType& a, unsigned char d) {
165  return static_cast<unsigned char>(a >> (8 * (sizeof(KeyType) - 1 - d)));
166 }
167 
168 /******************************************************************************/
169 //! PS5SortStep Top-Level Class to Keep Track of Substeps
170 
172 {
173 private:
174  //! Number of substeps still running
175  std::atomic<size_t> substep_working_;
176 
177  //! Pure virtual function called by substep when all substeps are done.
178  virtual void substep_all_done() = 0;
179 
180 protected:
181  PS5SortStep() : substep_working_(0) { }
182 
183  virtual ~PS5SortStep() {
184  assert(substep_working_ == 0);
185  }
186 
187  //! Register new substep
188  void substep_add() {
189  ++substep_working_;
190  }
191 
192 public:
193  //! Notify superstep that the currently substep is done.
195  assert(substep_working_ > 0);
196  if (--substep_working_ == 0)
197  substep_all_done();
198  }
199 };
200 
201 /******************************************************************************/
202 //! LCP Calculation for Finished Sample Sort Steps
203 
204 template <size_t bktnum, typename Context, typename Classify,
205  typename StringPtr, typename BktSizeType>
206 void ps5_sample_sort_lcp(const Context& ctx, const Classify& classifier,
207  const StringPtr& strptr, size_t depth,
208  const BktSizeType* bkt) {
209  assert(!strptr.flipped());
210 
211  const typename StringPtr::StringSet& strset = strptr.active();
212  typedef typename Context::key_type key_type;
213 
214  size_t b = 0; // current bucket number
215  key_type prevkey = 0; // previous key
216 
217  // the following while loops only check b < bktnum when b is odd,
218  // because bktnum is always odd. We need a goto to jump into the loop,
219  // as b == 0 start even.
220  goto even_first;
221 
222  // find first non-empty bucket
223  while (b < bktnum)
224  {
225  // odd bucket: = bkt
226  if (bkt[b] != bkt[b + 1]) {
227  prevkey = classifier.get_splitter(b / 2);
228  assert(prevkey == get_key_at<key_type>(strset, bkt[b + 1] - 1, depth));
229  break;
230  }
231  ++b;
232 even_first:
233  // even bucket: <, << or > bkt
234  if (bkt[b] != bkt[b + 1]) {
235  prevkey = get_key_at<key_type>(strset, bkt[b + 1] - 1, depth);
236  break;
237  }
238  ++b;
239  }
240  ++b;
241 
242  // goto depends on whether the first non-empty bucket was odd or
243  // even. the while loop below encodes this in the program counter.
244  if (b < bktnum && b % 2 == 0) goto even_bucket;
245 
246  // find next non-empty bucket
247  while (b < bktnum)
248  {
249  // odd bucket: = bkt
250  if (bkt[b] != bkt[b + 1]) {
251  key_type thiskey = classifier.get_splitter(b / 2);
252  assert(thiskey == get_key_at<key_type>(strset, bkt[b], depth));
253 
254  int rlcp = lcpKeyType(prevkey, thiskey);
255  strptr.set_lcp(bkt[b], depth + rlcp);
256  // strptr.set_cache(bkt[b], getCharAtDepth(thiskey, rlcp));
257 
258  TLX_LOGC(ctx.debug_lcp)
259  << "LCP at odd-bucket " << b
260  << " [" << bkt[b] << "," << bkt[b + 1] << ")"
261  << " is " << depth + rlcp;
262 
263  prevkey = thiskey;
264  assert(prevkey == get_key_at<key_type>(strset, bkt[b + 1] - 1, depth));
265  }
266  ++b;
267 even_bucket:
268  // even bucket: <, << or > bkt
269  if (bkt[b] != bkt[b + 1]) {
270  key_type thiskey = get_key_at<key_type>(strset, bkt[b], depth);
271 
272  int rlcp = lcpKeyType(prevkey, thiskey);
273  strptr.set_lcp(bkt[b], depth + rlcp);
274  // strptr.set_cache(bkt[b], getCharAtDepth(thiskey, rlcp));
275 
276  TLX_LOGC(ctx.debug_lcp)
277  << "LCP at even-bucket " << b
278  << " [" << bkt[b] << "," << bkt[b + 1] << ")"
279  << " is " << depth + rlcp;
280 
281  prevkey = get_key_at<key_type>(strset, bkt[b + 1] - 1, depth);
282  }
283  ++b;
284  }
285 }
286 
287 /******************************************************************************/
288 //! SampleSort: Non-Recursive In-Place Sequential Sample Sort for Small Sorts
289 
290 template <typename Context, typename StringPtr, typename BktSizeType>
292 {
293 public:
294  Context& ctx_;
295 
296  //! parent sort step
298 
299  StringPtr strptr_;
300  size_t depth_;
302 
303  typedef typename Context::key_type key_type;
304  typedef typename StringPtr::StringSet StringSet;
305  typedef BktSizeType bktsize_type;
306 
307  PS5SmallsortJob(Context& ctx, PS5SortStep* pstep,
308  const StringPtr& strptr, size_t depth)
309  : ctx_(ctx), pstep_(pstep), strptr_(strptr), depth_(depth) {
310  TLX_LOGC(ctx_.debug_steps)
311  << "enqueue depth=" << depth_
312  << " size=" << strptr_.size() << " flip=" << strptr_.flipped();
313  }
314 
316  mtimer_.stop();
317  ctx_.mtimer.add(mtimer_);
318  }
319 
321  size_t bktcache_size_ = 0;
322 
323  void run() {
324  mtimer_.start("sequ_ss");
325 
326  size_t n = strptr_.size();
327 
328  TLX_LOGC(ctx_.debug_jobs)
329  << "Process PS5SmallsortJob " << this << " of size " << n;
330 
331  // create anonymous wrapper job
332  this->substep_add();
333 
334  if (ctx_.enable_sequential_sample_sort && n >= ctx_.smallsort_threshold)
335  {
336  bktcache_.resize(n * sizeof(uint16_t));
337  sort_sample_sort(strptr_, depth_);
338  }
339  else
340  {
341  mtimer_.start("mkqs");
342  sort_mkqs_cache(strptr_, depth_);
343  }
344 
345  // finish wrapper job, handler delete's this
346  this->substep_notify_done();
347  }
348 
349  /*------------------------------------------------------------------------*/
350  //! Stack of Recursive Sample Sort Steps
351 
353  {
354  public:
355  StringPtr strptr_;
356  size_t idx_;
357  size_t depth_;
358 
359  using StringSet = typename StringPtr::StringSet;
360  using bktsize_type = BktSizeType;
361 
362  typename Context::Classify classifier;
363 
364  static const size_t num_splitters = Context::Classify::num_splitters;
365  static const size_t bktnum = 2 * num_splitters + 1;
366 
367  unsigned char splitter_lcp[num_splitters + 1];
368  bktsize_type bkt[bktnum + 1];
369 
370  SeqSampleSortStep(Context& ctx, const StringPtr& strptr, size_t depth,
371  uint16_t* bktcache)
372  : strptr_(strptr), idx_(0), depth_(depth) {
373  size_t n = strptr_.size();
374 
375  // step 1: select splitters with oversampling
376 
377  const size_t oversample_factor = 2;
378  const size_t sample_size = oversample_factor * num_splitters;
379 
380  simple_vector<key_type> samples(sample_size);
381 
382  const StringSet& strset = strptr_.active();
383 
384  std::minstd_rand rng(reinterpret_cast<uintptr_t>(samples.data()));
385 
386  for (size_t i = 0; i < sample_size; ++i)
387  samples[i] = get_key_at<key_type>(strset, rng() % n, depth_);
388 
389  std::sort(samples.begin(), samples.end());
390 
391  classifier.build(samples.data(), sample_size, splitter_lcp);
392 
393  // step 2: classify all strings
394 
395  classifier.classify(
396  strset, strset.begin(), strset.end(), bktcache, depth_);
397 
398  // step 2.5: count bucket sizes
399 
400  bktsize_type bktsize[bktnum];
401  memset(bktsize, 0, bktnum * sizeof(bktsize_type));
402 
403  for (size_t si = 0; si < n; ++si)
404  ++bktsize[bktcache[si]];
405 
406  // step 3: inclusive prefix sum
407 
408  bkt[0] = bktsize[0];
409  for (unsigned int i = 1; i < bktnum; ++i) {
410  bkt[i] = bkt[i - 1] + bktsize[i];
411  }
412  assert(bkt[bktnum - 1] == n);
413  bkt[bktnum] = n;
414 
415  // step 4: premute out-of-place
416 
417  const StringSet& strB = strptr_.active();
418  // get alternative shadow pointer array
419  const StringSet& sorted = strptr_.shadow();
420  typename StringSet::Iterator sbegin = sorted.begin();
421 
422  for (typename StringSet::Iterator str = strB.begin();
423  str != strB.end(); ++str, ++bktcache)
424  *(sbegin + --bkt[*bktcache]) = std::move(*str);
425 
426  // bkt is afterwards the exclusive prefix sum of bktsize
427 
428  // statistics
429 
430  ++ctx.sequ_ss_steps;
431  }
432 
433  void calculate_lcp(Context& ctx) {
434  TLX_LOGC(ctx.debug_lcp) << "Calculate LCP after sample sort step";
435  if (strptr_.with_lcp) {
436  ps5_sample_sort_lcp<bktnum>(ctx, classifier, strptr_, depth_, bkt);
437  }
438  }
439  };
440 
441  size_t ss_front_ = 0;
442  std::vector<SeqSampleSortStep> ss_stack_;
443 
444  void sort_sample_sort(const StringPtr& strptr, size_t depth) {
445  typedef SeqSampleSortStep Step;
446 
447  assert(ss_front_ == 0);
448  assert(ss_stack_.size() == 0);
449 
450  uint16_t* bktcache = reinterpret_cast<uint16_t*>(bktcache_.data());
451 
452  // sort first level
453  ss_stack_.emplace_back(ctx_, strptr, depth, bktcache);
454 
455  // step 5: "recursion"
456 
457  while (ss_stack_.size() > ss_front_)
458  {
459  Step& s = ss_stack_.back();
460  size_t i = s.idx_++; // process the bucket s.idx_
461 
462  if (i < Step::bktnum)
463  {
464  size_t bktsize = s.bkt[i + 1] - s.bkt[i];
465 
466  StringPtr sp = s.strptr_.flip(s.bkt[i], bktsize);
467 
468  // i is even -> bkt[i] is less-than bucket
469  if (i % 2 == 0)
470  {
471  if (bktsize == 0) {
472  // empty bucket
473  }
474  else if (bktsize < ctx_.smallsort_threshold)
475  {
476  assert(i / 2 <= Step::num_splitters);
477  if (i == Step::bktnum - 1)
478  TLX_LOGC(ctx_.debug_recursion)
479  << "Recurse[" << s.depth_ << "]: > bkt "
480  << i << " size " << bktsize << " no lcp";
481  else
482  TLX_LOGC(ctx_.debug_recursion)
483  << "Recurse[" << s.depth_ << "]: < bkt "
484  << i << " size " << bktsize << " lcp "
485  << int(s.splitter_lcp[i / 2] & 0x7F);
486 
487  ScopedMultiTimerSwitch sts_inssort(mtimer_, "mkqs");
488  sort_mkqs_cache(
489  sp, s.depth_ + (s.splitter_lcp[i / 2] & 0x7F));
490  }
491  else
492  {
493  if (i == Step::bktnum - 1)
494  TLX_LOGC(ctx_.debug_recursion)
495  << "Recurse[" << s.depth_ << "]: > bkt "
496  << i << " size " << bktsize << " no lcp";
497  else
498  TLX_LOGC(ctx_.debug_recursion)
499  << "Recurse[" << s.depth_ << "]: < bkt "
500  << i << " size " << bktsize << " lcp "
501  << int(s.splitter_lcp[i / 2] & 0x7F);
502 
503  ss_stack_.emplace_back(
504  ctx_, sp, s.depth_ + (s.splitter_lcp[i / 2] & 0x7F), bktcache);
505  }
506  }
507  // i is odd -> bkt[i] is equal bucket
508  else
509  {
510  if (bktsize == 0) {
511  // empty bucket
512  }
513  else if (s.splitter_lcp[i / 2] & 0x80) {
514  // equal-bucket has nullptr-terminated key, done.
515  TLX_LOGC(ctx_.debug_recursion)
516  << "Recurse[" << s.depth_ << "]: = bkt "
517  << i << " size " << bktsize << " is done!";
518  StringPtr spb = sp.copy_back();
519 
520  if (sp.with_lcp) {
521  spb.fill_lcp(
522  s.depth_ + lcpKeyDepth(s.classifier.get_splitter(i / 2)));
523  }
524  ctx_.donesize(bktsize);
525  }
526  else if (bktsize < ctx_.smallsort_threshold)
527  {
528  TLX_LOGC(ctx_.debug_recursion)
529  << "Recurse[" << s.depth_ << "]: = bkt "
530  << i << " size " << bktsize << " lcp keydepth!";
531 
532  ScopedMultiTimerSwitch sts_inssort(mtimer_, "mkqs");
533  sort_mkqs_cache(sp, s.depth_ + sizeof(key_type));
534  }
535  else
536  {
537  TLX_LOGC(ctx_.debug_recursion)
538  << "Recurse[" << s.depth_ << "]: = bkt "
539  << i << " size " << bktsize << " lcp keydepth!";
540 
541  ss_stack_.emplace_back(
542  ctx_, sp, s.depth_ + sizeof(key_type), bktcache);
543  }
544  }
545  }
546  else
547  {
548  // finished sort
549  assert(ss_stack_.size() > ss_front_);
550 
551  // after full sort: calculate LCPs at this level
552  ss_stack_.back().calculate_lcp(ctx_);
553 
554  ss_stack_.pop_back();
555  }
556 
557  if (ctx_.enable_work_sharing && ctx_.threads_.has_idle()) {
558  sample_sort_free_work();
559  }
560  }
561  }
562 
564  assert(ss_stack_.size() >= ss_front_);
565 
566  if (ss_stack_.size() == ss_front_) {
567  // ss_stack_ is empty, check other stack
568  return mkqs_free_work();
569  }
570 
571  // convert top level of stack into independent jobs
572  TLX_LOGC(ctx_.debug_jobs)
573  << "Freeing top level of PS5SmallsortJob's sample_sort stack";
574 
575  typedef SeqSampleSortStep Step;
576  Step& s = ss_stack_[ss_front_];
577 
578  while (s.idx_ < Step::bktnum)
579  {
580  size_t i = s.idx_++; // process the bucket s.idx_
581 
582  size_t bktsize = s.bkt[i + 1] - s.bkt[i];
583 
584  StringPtr sp = s.strptr_.flip(s.bkt[i], bktsize);
585 
586  // i is even -> bkt[i] is less-than bucket
587  if (i % 2 == 0)
588  {
589  if (bktsize == 0) {
590  // empty bucket
591  }
592  else
593  {
594  if (i == Step::bktnum - 1)
595  TLX_LOGC(ctx_.debug_recursion)
596  << "Recurse[" << s.depth_ << "]: > bkt "
597  << i << " size " << bktsize << " no lcp";
598  else
599  TLX_LOGC(ctx_.debug_recursion)
600  << "Recurse[" << s.depth_ << "]: < bkt "
601  << i << " size " << bktsize << " lcp "
602  << int(s.splitter_lcp[i / 2] & 0x7F);
603 
604  this->substep_add();
605  ctx_.enqueue(this, sp,
606  s.depth_ + (s.splitter_lcp[i / 2] & 0x7F));
607  }
608  }
609  // i is odd -> bkt[i] is equal bucket
610  else
611  {
612  if (bktsize == 0) {
613  // empty bucket
614  }
615  else if (s.splitter_lcp[i / 2] & 0x80) {
616  // equal-bucket has nullptr-terminated key, done.
617  TLX_LOGC(ctx_.debug_recursion)
618  << "Recurse[" << s.depth_ << "]: = bkt "
619  << i << " size " << bktsize << " is done!";
620  StringPtr spb = sp.copy_back();
621 
622  if (sp.with_lcp) {
623  spb.fill_lcp(s.depth_ + lcpKeyDepth(
624  s.classifier.get_splitter(i / 2)));
625  }
626  ctx_.donesize(bktsize);
627  }
628  else
629  {
630  TLX_LOGC(ctx_.debug_recursion)
631  << "Recurse[" << s.depth_ << "]: = bkt "
632  << i << " size " << bktsize << " lcp keydepth!";
633 
634  this->substep_add();
635  ctx_.enqueue(this, sp, s.depth_ + sizeof(key_type));
636  }
637  }
638  }
639 
640  // shorten the current stack
641  ++ss_front_;
642  }
643 
644  /*------------------------------------------------------------------------*/
645  //! Stack of Recursive MKQS Steps
646 
647  static inline int cmp(const key_type& a, const key_type& b) {
648  return (a > b) ? 1 : (a < b) ? -1 : 0;
649  }
650 
651  template <typename Type>
652  static inline size_t
653  med3(Type* A, size_t i, size_t j, size_t k) {
654  if (A[i] == A[j]) return i;
655  if (A[k] == A[i] || A[k] == A[j]) return k;
656  if (A[i] < A[j]) {
657  if (A[j] < A[k]) return j;
658  if (A[i] < A[k]) return k;
659  return i;
660  }
661  else {
662  if (A[j] > A[k]) return j;
663  if (A[i] < A[k]) return i;
664  return k;
665  }
666  }
667 
668  //! Insertion sort the strings only based on the cached characters.
669  static inline void
670  insertion_sort_cache_block(const StringPtr& strptr, key_type* cache) {
671  const StringSet& strings = strptr.active();
672  size_t n = strptr.size();
673  size_t pi, pj;
674  for (pi = 1; --n > 0; ++pi) {
675  typename StringSet::String tmps = std::move(strings.at(pi));
676  key_type tmpc = cache[pi];
677  for (pj = pi; pj > 0; --pj) {
678  if (cache[pj - 1] <= tmpc)
679  break;
680  strings.at(pj) = std::move(strings.at(pj - 1));
681  cache[pj] = cache[pj - 1];
682  }
683  strings.at(pj) = std::move(tmps);
684  cache[pj] = tmpc;
685  }
686  }
687 
688  //! Insertion sort, but use cached characters if possible.
689  template <bool CacheDirty>
690  static inline void
691  insertion_sort_cache(const StringPtr& _strptr, key_type* cache, size_t depth) {
692  StringPtr strptr = _strptr.copy_back();
693 
694  if (strptr.size() <= 1) return;
695  if (CacheDirty)
696  return insertion_sort(strptr, depth, /* memory */ 0);
697 
698  insertion_sort_cache_block(strptr, cache);
699 
700  size_t start = 0, bktsize = 1;
701  for (size_t i = 0; i < strptr.size() - 1; ++i) {
702  // group areas with equal cache values
703  if (cache[i] == cache[i + 1]) {
704  ++bktsize;
705  continue;
706  }
707  // calculate LCP between group areas
708  if (start != 0 && strptr.with_lcp) {
709  int rlcp = lcpKeyType(cache[start - 1], cache[start]);
710  strptr.set_lcp(start, depth + rlcp);
711  // strptr.set_cache(start, getCharAtDepth(cache[start], rlcp));
712  }
713  // sort group areas deeper if needed
714  if (bktsize > 1) {
715  if (cache[start] & 0xFF) {
716  // need deeper sort
718  strptr.sub(start, bktsize), depth + sizeof(key_type),
719  /* memory */ 0);
720  }
721  else {
722  // cache contains nullptr-termination
723  strptr.sub(start, bktsize).fill_lcp(depth + lcpKeyDepth(cache[start]));
724  }
725  }
726  bktsize = 1;
727  start = i + 1;
728  }
729  // tail of loop for last item
730  if (start != 0 && strptr.with_lcp) {
731  int rlcp = lcpKeyType(cache[start - 1], cache[start]);
732  strptr.set_lcp(start, depth + rlcp);
733  // strptr.set_cache(start, getCharAtDepth(cache[start], rlcp));
734  }
735  if (bktsize > 1) {
736  if (cache[start] & 0xFF) {
737  // need deeper sort
739  strptr.sub(start, bktsize), depth + sizeof(key_type),
740  /* memory */ 0);
741  }
742  else {
743  // cache contains nullptr-termination
744  strptr.sub(start, bktsize).fill_lcp(depth + lcpKeyDepth(cache[start]));
745  }
746  }
747  }
748 
749  class MKQSStep
750  {
751  public:
752  StringPtr strptr_;
753  key_type* cache_;
754  size_t num_lt_, num_eq_, num_gt_, depth_;
755  size_t idx_;
756  unsigned char eq_recurse_;
757  // typename StringPtr::StringSet::Char dchar_eq_, dchar_gt_;
758  uint8_t lcp_lt_, lcp_eq_, lcp_gt_;
759 
760  MKQSStep(Context& ctx, const StringPtr& strptr,
761  key_type* cache, size_t depth, bool CacheDirty)
762  : strptr_(strptr), cache_(cache), depth_(depth), idx_(0) {
763  size_t n = strptr_.size();
764 
765  const StringSet& strset = strptr_.active();
766 
767  if (CacheDirty) {
768  typename StringSet::Iterator it = strset.begin();
769  for (size_t i = 0; i < n; ++i, ++it) {
770  cache_[i] = get_key<key_type>(strset, *it, depth);
771  }
772  }
773  // select median of 9
774  size_t p = med3(
775  cache_,
776  med3(cache_, 0, n / 8, n / 4),
777  med3(cache_, n / 2 - n / 8, n / 2, n / 2 + n / 8),
778  med3(cache_, n - 1 - n / 4, n - 1 - n / 8, n - 3));
779  // swap pivot to first position
780  std::swap(strset.at(0), strset.at(p));
781  std::swap(cache_[0], cache_[p]);
782  // save the pivot value
783  key_type pivot = cache_[0];
784  // for immediate LCP calculation
785  key_type max_lt = 0, min_gt = std::numeric_limits<key_type>::max();
786 
787  // indexes into array:
788  // 0 [pivot] 1 [===] leq [<<<] llt [???] rgt [>>>] req [===] n-1
789  size_t leq = 1, llt = 1, rgt = n - 1, req = n - 1;
790  while (true)
791  {
792  while (llt <= rgt)
793  {
794  int r = cmp(cache[llt], pivot);
795  if (r > 0) {
796  min_gt = std::min(min_gt, cache[llt]);
797  break;
798  }
799  else if (r == 0) {
800  std::swap(strset.at(leq), strset.at(llt));
801  std::swap(cache[leq], cache[llt]);
802  leq++;
803  }
804  else {
805  max_lt = std::max(max_lt, cache[llt]);
806  }
807  ++llt;
808  }
809  while (llt <= rgt)
810  {
811  int r = cmp(cache[rgt], pivot);
812  if (r < 0) {
813  max_lt = std::max(max_lt, cache[rgt]);
814  break;
815  }
816  else if (r == 0) {
817  std::swap(strset.at(req), strset.at(rgt));
818  std::swap(cache[req], cache[rgt]);
819  req--;
820  }
821  else {
822  min_gt = std::min(min_gt, cache[rgt]);
823  }
824  --rgt;
825  }
826  if (llt > rgt)
827  break;
828  std::swap(strset.at(llt), strset.at(rgt));
829  std::swap(cache[llt], cache[rgt]);
830  ++llt;
831  --rgt;
832  }
833  // calculate size of areas = < and >, save into struct
834  size_t num_leq = leq, num_req = n - 1 - req;
835  num_eq_ = num_leq + num_req;
836  num_lt_ = llt - leq;
837  num_gt_ = req - rgt;
838  assert(num_eq_ > 0);
839  assert(num_lt_ + num_eq_ + num_gt_ == n);
840 
841  // swap equal values from left to center
842  const size_t size1 = std::min(num_leq, num_lt_);
843  std::swap_ranges(strset.begin(), strset.begin() + size1,
844  strset.begin() + llt - size1);
845  std::swap_ranges(cache, cache + size1, cache + llt - size1);
846 
847  // swap equal values from right to center
848  const size_t size2 = std::min(num_req, num_gt_);
849  std::swap_ranges(strset.begin() + llt, strset.begin() + llt + size2,
850  strset.begin() + n - size2);
851  std::swap_ranges(cache + llt, cache + llt + size2,
852  cache + n - size2);
853 
854  // No recursive sorting if pivot has a zero byte
855  eq_recurse_ = (pivot & 0xFF);
856 
857  // save LCP values for writing into LCP array after sorting further
858  if (strptr_.with_lcp && num_lt_ > 0) {
859  assert(max_lt == *std::max_element(
860  cache_ + 0, cache + num_lt_));
861 
862  lcp_lt_ = lcpKeyType(max_lt, pivot);
863  // dchar_eq_ = getCharAtDepth(pivot, lcp_lt_);
864  TLX_LOGC(ctx.debug_lcp) << "LCP lt with pivot: " << depth_ + lcp_lt_;
865  }
866 
867  // calculate equal area lcp: +1 for the equal zero termination byte
868  lcp_eq_ = lcpKeyDepth(pivot);
869 
870  if (strptr_.with_lcp && num_gt_ > 0) {
871  assert(min_gt == *std::min_element(
872  cache_ + num_lt_ + num_eq_, cache_ + n));
873 
874  lcp_gt_ = lcpKeyType(pivot, min_gt);
875  // dchar_gt_ = getCharAtDepth(min_gt, lcp_gt_);
876  TLX_LOGC(ctx.debug_lcp) << "LCP pivot with gt: " << depth_ + lcp_gt_;
877  }
878 
879  ++ctx.base_sort_steps;
880  }
881 
882  void calculate_lcp() {
883  if (strptr_.with_lcp && num_lt_ > 0) {
884  strptr_.set_lcp(num_lt_, depth_ + lcp_lt_);
885  // strptr_.set_cache(num_lt_, dchar_eq_);
886  }
887 
888  if (strptr_.with_lcp && num_gt_ > 0) {
889  strptr_.set_lcp(num_lt_ + num_eq_, depth_ + lcp_gt_);
890  // strptr_.set_cache(num_lt_ + num_eq_, dchar_gt_);
891  }
892  }
893  };
894 
895  size_t ms_front_ = 0;
896  std::vector<MKQSStep> ms_stack_;
897 
898  void sort_mkqs_cache(const StringPtr& strptr, size_t depth) {
899  assert(strcmp(mtimer_.running(), "mkqs") == 0);
900 
901  if (!ctx_.enable_sequential_mkqs ||
902  strptr.size() < ctx_.inssort_threshold) {
903  TLX_LOGC(ctx_.debug_jobs)
904  << "insertion_sort() size "
905  << strptr.size() << " depth " << depth;
906 
907  ScopedMultiTimerSwitch sts_inssort(mtimer_, "inssort");
908  insertion_sort(strptr.copy_back(), depth, /* memory */ 0);
909  ctx_.donesize(strptr.size());
910  return;
911  }
912 
913  TLX_LOGC(ctx_.debug_jobs)
914  << "sort_mkqs_cache() size " << strptr.size() << " depth " << depth;
915 
916  if (bktcache_.size() < strptr.size() * sizeof(key_type)) {
917  bktcache_.destroy();
918  bktcache_.resize(strptr.size() * sizeof(key_type));
919  }
920 
921  // reuse bktcache as keycache
922  key_type* cache = reinterpret_cast<key_type*>(bktcache_.data());
923 
924  assert(ms_front_ == 0);
925  assert(ms_stack_.size() == 0);
926 
927  // std::deque is much slower than std::vector, so we use an artificial
928  // pop_front variable.
929  ms_stack_.emplace_back(ctx_, strptr, cache, depth, true);
930 
931  while (ms_stack_.size() > ms_front_)
932  {
933  MKQSStep& ms = ms_stack_.back();
934  ++ms.idx_; // increment here, because stack may change
935 
936  // process the lt-subsequence
937  if (ms.idx_ == 1) {
938  if (ms.num_lt_ == 0) {
939  // empty subsequence
940  }
941  else if (ms.num_lt_ < ctx_.inssort_threshold) {
942  ScopedMultiTimerSwitch sts_inssort(mtimer_, "inssort");
943  insertion_sort_cache<false>(ms.strptr_.sub(0, ms.num_lt_),
944  ms.cache_, ms.depth_);
945  ctx_.donesize(ms.num_lt_);
946  }
947  else {
948  ms_stack_.emplace_back(
949  ctx_,
950  ms.strptr_.sub(0, ms.num_lt_),
951  ms.cache_, ms.depth_, false);
952  }
953  }
954  // process the eq-subsequence
955  else if (ms.idx_ == 2) {
956  StringPtr sp = ms.strptr_.sub(ms.num_lt_, ms.num_eq_);
957 
958  assert(ms.num_eq_ > 0);
959 
960  if (!ms.eq_recurse_) {
961  StringPtr spb = sp.copy_back();
962  spb.fill_lcp(ms.depth_ + ms.lcp_eq_);
963  ctx_.donesize(spb.size());
964  }
965  else if (ms.num_eq_ < ctx_.inssort_threshold) {
966  ScopedMultiTimerSwitch sts_inssort(mtimer_, "inssort");
967  insertion_sort_cache<true>(sp, ms.cache_ + ms.num_lt_,
968  ms.depth_ + sizeof(key_type));
969  ctx_.donesize(ms.num_eq_);
970  }
971  else {
972  ms_stack_.emplace_back(
973  ctx_, sp,
974  ms.cache_ + ms.num_lt_,
975  ms.depth_ + sizeof(key_type), true);
976  }
977  }
978  // process the gt-subsequence
979  else if (ms.idx_ == 3) {
980  StringPtr sp = ms.strptr_.sub(
981  ms.num_lt_ + ms.num_eq_, ms.num_gt_);
982 
983  if (ms.num_gt_ == 0) {
984  // empty subsequence
985  }
986  else if (ms.num_gt_ < ctx_.inssort_threshold) {
987  ScopedMultiTimerSwitch sts_inssort(mtimer_, "inssort");
988  insertion_sort_cache<false>(
989  sp, ms.cache_ + ms.num_lt_ + ms.num_eq_, ms.depth_);
990  ctx_.donesize(ms.num_gt_);
991  }
992  else {
993  ms_stack_.emplace_back(
994  ctx_, sp,
995  ms.cache_ + ms.num_lt_ + ms.num_eq_,
996  ms.depth_, false);
997  }
998  }
999  // calculate lcps
1000  else {
1001  // finished sort
1002  assert(ms_stack_.size() > ms_front_);
1003 
1004  // calculate LCP after the three parts are sorted
1005  ms_stack_.back().calculate_lcp();
1006 
1007  ms_stack_.pop_back();
1008  }
1009 
1010  if (ctx_.enable_work_sharing && ctx_.threads_.has_idle()) {
1011  sample_sort_free_work();
1012  }
1013  }
1014  }
1015 
1017  assert(ms_stack_.size() >= ms_front_);
1018 
1019  for (unsigned int fl = 0; fl < 8; ++fl)
1020  {
1021  if (ms_stack_.size() == ms_front_) {
1022  return;
1023  }
1024 
1025  TLX_LOGC(ctx_.debug_jobs)
1026  << "Freeing top level of PS5SmallsortJob's mkqs stack - size "
1027  << ms_stack_.size();
1028 
1029  // convert top level of stack into independent jobs
1030 
1031  MKQSStep& ms = ms_stack_[ms_front_];
1032 
1033  if (ms.idx_ == 0 && ms.num_lt_ != 0)
1034  {
1035  this->substep_add();
1036  ctx_.enqueue(this, ms.strptr_.sub(0, ms.num_lt_), ms.depth_);
1037  }
1038  if (ms.idx_ <= 1) // st.num_eq > 0 always
1039  {
1040  assert(ms.num_eq_ > 0);
1041 
1042  StringPtr sp = ms.strptr_.sub(ms.num_lt_, ms.num_eq_);
1043 
1044  if (ms.eq_recurse_) {
1045  this->substep_add();
1046  ctx_.enqueue(this, sp, ms.depth_ + sizeof(key_type));
1047  }
1048  else {
1049  StringPtr spb = sp.copy_back();
1050  spb.fill_lcp(ms.depth_ + ms.lcp_eq_);
1051  ctx_.donesize(ms.num_eq_);
1052  }
1053  }
1054  if (ms.idx_ <= 2 && ms.num_gt_ != 0)
1055  {
1056  this->substep_add();
1057  ctx_.enqueue(
1058  this, ms.strptr_.sub(ms.num_lt_ + ms.num_eq_, ms.num_gt_),
1059  ms.depth_);
1060  }
1061 
1062  // shorten the current stack
1063  ++ms_front_;
1064  }
1065  }
1066 
1067  /*------------------------------------------------------------------------*/
1068  // Called When PS5SmallsortJob is Finished
1069 
1070  void substep_all_done() final {
1071  TLX_LOGC(ctx_.debug_recursion)
1072  << "SmallSort[" << depth_ << "] "
1073  << "all substeps done -> LCP calculation";
1074 
1075  while (ms_front_ > 0) {
1076  TLX_LOGC(ctx_.debug_lcp)
1077  << "SmallSort[" << depth_ << "] ms_front_: " << ms_front_;
1078  ms_stack_[--ms_front_].calculate_lcp();
1079  }
1080 
1081  while (ss_front_ > 0) {
1082  TLX_LOGC(ctx_.debug_lcp)
1083  << "SmallSort[" << depth_ << "] ss_front_: " << ss_front_;
1084  ss_stack_[--ss_front_].calculate_lcp(ctx_);
1085  }
1086 
1087  if (pstep_) pstep_->substep_notify_done();
1088  delete this;
1089  }
1090 };
1091 
1092 /******************************************************************************/
1093 //! PS5BigSortStep Out-of-Place Parallel Sample Sort with Separate Jobs
1094 
1095 template <typename Context, typename StringPtr>
1097 {
1098 public:
1100  typedef typename StringSet::Iterator StrIterator;
1101  typedef typename Context::key_type key_type;
1102 
1103  //! context
1104  Context& ctx_;
1105  //! parent sort step for notification
1107 
1108  //! string pointers, size, and current sorting depth
1109  StringPtr strptr_;
1110  size_t depth_;
1111 
1112  //! number of parts into which the strings were split
1113  size_t parts_;
1114  //! size of all parts except the last
1115  size_t psize_;
1116  //! number of threads still working
1117  std::atomic<size_t> pwork_;
1118 
1119  //! classifier instance and variables (contains splitter tree
1120  typename Context::Classify classifier_;
1121 
1122  static const size_t treebits_ = Context::Classify::treebits;
1123  static const size_t num_splitters_ = Context::Classify::num_splitters;
1124  static const size_t bktnum_ = 2 * num_splitters_ + 1;
1125 
1126  //! LCPs of splitters, needed for recursive calls
1127  unsigned char splitter_lcp_[num_splitters_ + 1];
1128 
1129  //! individual bucket array of threads, keep bkt[0] for DistributeJob
1131  //! bucket ids cache, created by classifier and later counted
1133 
1134  /*------------------------------------------------------------------------*/
1135  // Constructor
1136 
1137  PS5BigSortStep(Context& ctx, PS5SortStep* pstep,
1138  const StringPtr& strptr, size_t depth)
1139  : ctx_(ctx), pstep_(pstep), strptr_(strptr), depth_(depth) {
1140  // calculate number of parts
1141  parts_ = strptr_.size() / ctx.sequential_threshold() * 2;
1142  if (parts_ == 0) parts_ = 1;
1143 
1144  bkt_.resize(parts_);
1145  bktcache_.resize(parts_);
1146 
1147  psize_ = (strptr.size() + parts_ - 1) / parts_;
1148 
1149  TLX_LOGC(ctx_.debug_steps)
1150  << "enqueue depth=" << depth_
1151  << " size=" << strptr_.size()
1152  << " parts=" << parts_
1153  << " psize=" << psize_
1154  << " flip=" << strptr_.flipped();
1155 
1156  ctx.threads_.enqueue([this]() { sample(); });
1157  ++ctx.para_ss_steps;
1158  }
1159 
1160  virtual ~PS5BigSortStep() { }
1161 
1162  /*------------------------------------------------------------------------*/
1163  // Sample Step
1164 
1165  void sample() {
1166  ScopedMultiTimer smt(ctx_.mtimer, "para_ss");
1167  TLX_LOGC(ctx_.debug_jobs) << "Process SampleJob @ " << this;
1168 
1169  const size_t oversample_factor = 2;
1170  size_t sample_size = oversample_factor * num_splitters_;
1171 
1172  const StringSet& strset = strptr_.active();
1173  size_t n = strset.size();
1174 
1175  simple_vector<key_type> samples(sample_size);
1176 
1177  std::minstd_rand rng(reinterpret_cast<uintptr_t>(samples.data()));
1178 
1179  for (size_t i = 0; i < sample_size; ++i)
1180  samples[i] = get_key_at<key_type>(strset, rng() % n, depth_);
1181 
1182  std::sort(samples.begin(), samples.end());
1183 
1184  classifier_.build(samples.data(), sample_size, splitter_lcp_);
1185 
1186  // create new jobs
1187  pwork_ = parts_;
1188  for (unsigned int p = 0; p < parts_; ++p) {
1189  ctx_.threads_.enqueue([this, p]() { count(p); });
1190  }
1191  }
1192 
1193  /*------------------------------------------------------------------------*/
1194  // Counting Step
1195 
1196  void count(unsigned int p) {
1197  ScopedMultiTimer smt(ctx_.mtimer, "para_ss");
1198  TLX_LOGC(ctx_.debug_jobs) << "Process CountJob " << p << " @ " << this;
1199 
1200  const StringSet& strset = strptr_.active();
1201 
1202  StrIterator strB = strset.begin() + p * psize_;
1203  StrIterator strE = strset.begin() + std::min((p + 1) * psize_, strptr_.size());
1204  if (strE < strB) strE = strB;
1205 
1206  bktcache_[p].resize(strE - strB);
1207  uint16_t* bktcache = bktcache_[p].data();
1208  classifier_.classify(strset, strB, strE, bktcache, depth_);
1209 
1210  bkt_[p].resize(bktnum_ + (p == 0 ? 1 : 0));
1211  size_t* bkt = bkt_[p].data();
1212  memset(bkt, 0, bktnum_ * sizeof(size_t));
1213 
1214  for (uint16_t* bc = bktcache; bc != bktcache + (strE - strB); ++bc)
1215  ++bkt[*bc];
1216 
1217  if (--pwork_ == 0)
1218  count_finished();
1219  }
1220 
1222  ScopedMultiTimer smt(ctx_.mtimer, "para_ss");
1223  TLX_LOGC(ctx_.debug_jobs) << "Finishing CountJob " << this << " with prefixsum";
1224 
1225  // abort sorting if we're measuring only the top level
1226  if (ctx_.use_only_first_sortstep)
1227  return;
1228 
1229  // inclusive prefix sum over bkt
1230  size_t sum = 0;
1231  for (unsigned int i = 0; i < bktnum_; ++i) {
1232  for (unsigned int p = 0; p < parts_; ++p) {
1233  bkt_[p][i] = (sum += bkt_[p][i]);
1234  }
1235  }
1236  assert(sum == strptr_.size());
1237 
1238  // create new jobs
1239  pwork_ = parts_;
1240  for (unsigned int p = 0; p < parts_; ++p) {
1241  ctx_.threads_.enqueue([this, p]() { distribute(p); });
1242  }
1243  }
1244 
1245  /*------------------------------------------------------------------------*/
1246  // Distribute Step
1247 
1248  void distribute(unsigned int p) {
1249  ScopedMultiTimer smt(ctx_.mtimer, "para_ss");
1250  TLX_LOGC(ctx_.debug_jobs) << "Process DistributeJob " << p << " @ " << this;
1251 
1252  const StringSet& strset = strptr_.active();
1253 
1254  StrIterator strB = strset.begin() + p * psize_;
1255  StrIterator strE = strset.begin() + std::min((p + 1) * psize_, strptr_.size());
1256  if (strE < strB) strE = strB;
1257 
1258  // get alternative shadow pointer array
1259  const StringSet& sorted = strptr_.shadow();
1260  typename StringSet::Iterator sbegin = sorted.begin();
1261 
1262  uint16_t* bktcache = bktcache_[p].data();
1263  size_t* bkt = bkt_[p].data();
1264 
1265  for (StrIterator str = strB; str != strE; ++str, ++bktcache)
1266  *(sbegin + --bkt[*bktcache]) = std::move(*str);
1267 
1268  if (p != 0) // p = 0 is needed for recursion into bkts
1269  bkt_[p].destroy();
1270 
1271  bktcache_[p].destroy();
1272 
1273  if (--pwork_ == 0)
1274  distribute_finished();
1275  }
1276 
1278  TLX_LOGC(ctx_.debug_jobs)
1279  << "Finishing DistributeJob " << this << " with enqueuing subjobs";
1280 
1281  size_t* bkt = bkt_[0].data();
1282  assert(bkt);
1283 
1284  // first processor's bkt pointers are boundaries between bkts, just add sentinel:
1285  assert(bkt[0] == 0);
1286  bkt[bktnum_] = strptr_.size();
1287 
1288  // keep anonymous subjob handle while creating subjobs
1289  this->substep_add();
1290 
1291  size_t i = 0;
1292  while (i < bktnum_ - 1)
1293  {
1294  // i is even -> bkt[i] is less-than bucket
1295  size_t bktsize = bkt[i + 1] - bkt[i];
1296  if (bktsize == 0) {
1297  // empty bucket
1298  }
1299  else if (bktsize == 1) { // just one string pointer, copyback
1300  strptr_.flip(bkt[i], 1).copy_back();
1301  ctx_.donesize(1);
1302  }
1303  else
1304  {
1305  TLX_LOGC(ctx_.debug_recursion)
1306  << "Recurse[" << depth_ << "]: < bkt " << bkt[i]
1307  << " size " << bktsize << " lcp "
1308  << int(splitter_lcp_[i / 2] & 0x7F);
1309  this->substep_add();
1310  ctx_.enqueue(this, strptr_.flip(bkt[i], bktsize),
1311  depth_ + (splitter_lcp_[i / 2] & 0x7F));
1312  }
1313  ++i;
1314  // i is odd -> bkt[i] is equal bucket
1315  bktsize = bkt[i + 1] - bkt[i];
1316  if (bktsize == 0) {
1317  // empty bucket
1318  }
1319  else if (bktsize == 1) { // just one string pointer, copyback
1320  strptr_.flip(bkt[i], 1).copy_back();
1321  ctx_.donesize(1);
1322  }
1323  else
1324  {
1325  if (splitter_lcp_[i / 2] & 0x80) {
1326  // equal-bucket has nullptr-terminated key, done.
1327  TLX_LOGC(ctx_.debug_recursion)
1328  << "Recurse[" << depth_ << "]: = bkt " << bkt[i]
1329  << " size " << bktsize << " is done!";
1330  StringPtr sp = strptr_.flip(bkt[i], bktsize).copy_back();
1331  sp.fill_lcp(
1332  depth_ + lcpKeyDepth(classifier_.get_splitter(i / 2)));
1333  ctx_.donesize(bktsize);
1334  }
1335  else {
1336  TLX_LOGC(ctx_.debug_recursion)
1337  << "Recurse[" << depth_ << "]: = bkt " << bkt[i]
1338  << " size " << bktsize << " lcp keydepth!";
1339  this->substep_add();
1340  ctx_.enqueue(this, strptr_.flip(bkt[i], bktsize),
1341  depth_ + sizeof(key_type));
1342  }
1343  }
1344  ++i;
1345  }
1346 
1347  size_t bktsize = bkt[i + 1] - bkt[i];
1348 
1349  if (bktsize == 0) {
1350  // empty bucket
1351  }
1352  else if (bktsize == 1) { // just one string pointer, copyback
1353  strptr_.flip(bkt[i], 1).copy_back();
1354  ctx_.donesize(1);
1355  }
1356  else {
1357  TLX_LOGC(ctx_.debug_recursion)
1358  << "Recurse[" << depth_ << "]: > bkt " << bkt[i]
1359  << " size " << bktsize << " no lcp";
1360  this->substep_add();
1361  ctx_.enqueue(this, strptr_.flip(bkt[i], bktsize), depth_);
1362  }
1363 
1364  this->substep_notify_done(); // release anonymous subjob handle
1365 
1366  if (!strptr_.with_lcp)
1367  bkt_[0].destroy();
1368  }
1369 
1370  /*------------------------------------------------------------------------*/
1371  // After Recursive Sorting
1372 
1373  void substep_all_done() final {
1374  ScopedMultiTimer smt(ctx_.mtimer, "para_ss");
1375  if (strptr_.with_lcp) {
1376  TLX_LOGC(ctx_.debug_steps)
1377  << "pSampleSortStep[" << depth_ << "]: all substeps done.";
1378 
1379  ps5_sample_sort_lcp<bktnum_>(
1380  ctx_, classifier_, strptr_, depth_, bkt_[0].data());
1381  bkt_[0].destroy();
1382  }
1383 
1384  if (pstep_) pstep_->substep_notify_done();
1385  delete this;
1386  }
1387 };
1388 
1389 /******************************************************************************/
1390 // PS5Context::enqueue()
1391 
1392 template <typename Parameters>
1393 template <typename StringPtr>
1395  PS5SortStep* pstep, const StringPtr& strptr, size_t depth) {
1396  if (this->enable_parallel_sample_sort &&
1397  (strptr.size() > sequential_threshold() ||
1398  this->use_only_first_sortstep)) {
1399  new PS5BigSortStep<PS5Context, StringPtr>(*this, pstep, strptr, depth);
1400  }
1401  else {
1402  if (strptr.size() < (1LLU << 32)) {
1404  *this, pstep, strptr, depth);
1405  threads_.enqueue([j]() { j->run(); });
1406  }
1407  else {
1409  *this, pstep, strptr, depth);
1410  threads_.enqueue([j]() { j->run(); });
1411  }
1412  }
1413 }
1414 
1415 /******************************************************************************/
1416 // Externally Callable Sorting Methods
1417 
1418 //! Main Parallel Sample Sort Function. See below for more convenient wrappers.
1419 template <typename PS5Parameters, typename StringPtr>
1420 void parallel_sample_sort_base(const StringPtr& strptr, size_t depth) {
1421 
1422  using Context = PS5Context<PS5Parameters>;
1423  Context ctx(std::thread::hardware_concurrency());
1424  ctx.total_size = strptr.size();
1425  ctx.rest_size = strptr.size();
1426  ctx.num_threads = ctx.threads_.size();
1427 
1428  MultiTimer timer;
1429  timer.start("sort");
1430 
1431  ctx.enqueue(/* pstep */ nullptr, strptr, depth);
1432  ctx.threads_.loop_until_empty();
1433 
1434  timer.stop();
1435 
1436  assert(!ctx.enable_rest_size || ctx.rest_size == 0);
1437 
1438  using BigSortStep = PS5BigSortStep<Context, StringPtr>;
1439 
1440  TLX_LOGC(ctx.debug_result)
1441  << "RESULT"
1442  << " sizeof(key_type)=" << sizeof(typename PS5Parameters::key_type)
1443  << " splitter_treebits=" << size_t(BigSortStep::treebits_)
1444  << " num_splitters=" << size_t(BigSortStep::num_splitters_)
1445  << " num_threads=" << ctx.num_threads
1446  << " enable_work_sharing=" << size_t(ctx.enable_work_sharing)
1447  << " use_restsize=" << size_t(ctx.enable_rest_size)
1448  << " tm_para_ss=" << ctx.mtimer.get("para_ss")
1449  << " tm_seq_ss=" << ctx.mtimer.get("sequ_ss")
1450  << " tm_mkqs=" << ctx.mtimer.get("mkqs")
1451  << " tm_inssort=" << ctx.mtimer.get("inssort")
1452  << " tm_total=" << ctx.mtimer.total()
1453  << " tm_idle="
1454  << (ctx.num_threads * timer.total()) - ctx.mtimer.total()
1455  << " steps_para_sample_sort=" << ctx.para_ss_steps
1456  << " steps_seq_sample_sort=" << ctx.sequ_ss_steps
1457  << " steps_base_sort=" << ctx.base_sort_steps;
1458 }
1459 
1460 //! Parallel Sample Sort Function for a generic StringSet, this allocates the
1461 //! shadow array for flipping.
1462 template <typename PS5Parameters, typename StringPtr>
1465  const StringPtr& strptr, size_t depth, size_t memory = 0) {
1466  tlx::unused(memory);
1467 
1468  typedef typename StringPtr::StringSet StringSet;
1469  const StringSet& strset = strptr.active();
1470 
1472  typedef typename StringSet::Container Container;
1473 
1474  // allocate shadow pointer array
1475  Container shadow = strset.allocate(strset.size());
1476  StringShadowPtr new_strptr(strset, StringSet(shadow));
1477 
1478  parallel_sample_sort_base<PS5Parameters>(new_strptr, depth);
1479 
1480  StringSet::deallocate(shadow);
1481 }
1482 
1483 //! Parallel Sample Sort Function for a generic StringSet with LCPs, this
1484 //! allocates the shadow array for flipping.
1485 template <typename PS5Parameters, typename StringPtr>
1488  const StringPtr& strptr, size_t depth, size_t memory = 0) {
1489  tlx::unused(memory);
1490 
1491  typedef typename StringPtr::StringSet StringSet;
1492  typedef typename StringPtr::LcpType LcpType;
1493  const StringSet& strset = strptr.active();
1494 
1496  typedef typename StringSet::Container Container;
1497 
1498  // allocate shadow pointer array
1499  Container shadow = strset.allocate(strset.size());
1500  StringShadowLcpPtr new_strptr(strset, StringSet(shadow), strptr.lcp());
1501 
1502  parallel_sample_sort_base<PS5Parameters>(new_strptr, depth);
1503 
1504  StringSet::deallocate(shadow);
1505 }
1506 
1507 //! Parallel Sample Sort Function with default parameter size for a generic
1508 //! StringSet.
1509 template <typename StringPtr>
1511  const StringPtr& strptr, size_t depth, size_t memory) {
1512  return parallel_sample_sort_params<PS5ParametersDefault>(
1513  strptr, depth, memory);
1514 }
1515 
1516 } // namespace sort_strings_detail
1517 } // namespace tlx
1518 
1519 #endif // !TLX_SORT_STRINGS_PARALLEL_SAMPLE_SORT_HEADER
1520 
1521 /******************************************************************************/
size_t size() const
return valid length
Definition: string_ptr.hpp:66
iterator end() noexcept
return mutable iterator beyond last element
PS5BigSortStep(Context &ctx, PS5SortStep *pstep, const StringPtr &strptr, size_t depth)
std::atomic< size_t > substep_working_
Number of substeps still running.
size_t sequential_threshold()
return sequential sorting threshold
PS5SmallsortJob(Context &ctx, PS5SortStep *pstep, const StringPtr &strptr, size_t depth)
static uint_pair max()
return an uint_pair instance containing the largest value possible
Definition: uint_types.hpp:226
unsigned ctz(Integral x)
size_t psize_
size of all parts except the last
unsigned clz(Integral x)
void resize(size_type new_size)
resize the array to contain exactly new_size items
void parallel_sample_sort(const StringPtr &strptr, size_t depth, size_t memory)
const char * running() const
return name of currently running timer.
MultiTimer mtimer
timers for individual sorting steps
void enqueue(PS5SortStep *sstep, const StringPtr &strptr, size_t depth)
enqueue a new job in the thread pool
static const bool enable_parallel_sample_sort
enable/disable various sorting levels
static const unsigned TreeBits
depth of classification tree used in sample sorts
PS5SortStep * pstep_
parent sort step for notification
void sort_mkqs_cache(const StringPtr &strptr, size_t depth)
static unsigned char getCharAtDepth(const KeyType &a, unsigned char d)
return the d-th character in the (swapped) key
Type
VFS object type.
Definition: file_io.hpp:52
Parallel Super Scalar String Sample Sort Context.
Simpler non-growing vector without initialization.
Parallel Super Scalar String Sample Sort Parameter Struct.
static size_t med3(Type *A, size_t i, size_t j, size_t k)
void ps5_sample_sort_lcp(const Context &ctx, const Classify &classifier, const StringPtr &strptr, size_t depth, const BktSizeType *bkt)
LCP Calculation for Finished Sample Sort Steps.
void set_lcp(size_t, const LcpType &) const
set the i-th lcp to v and check its value
Definition: string_ptr.hpp:79
void substep_all_done() final
Pure virtual function called by substep when all substeps are done.
MultiTimer can be used to measure time usage of different phases in a program or algorithm.
Definition: multi_timer.hpp:36
void substep_add()
Register new substep.
static const size_t smallsort_threshold
threshold to run sequential small sorts
static const bool enable_work_sharing
enable work freeing
iterator data() noexcept
return iterator to beginning of vector
PS5BigSortStep Out-of-Place Parallel Sample Sort with Separate Jobs.
Context::Classify classifier_
classifier instance and variables (contains splitter tree
void unused(Types &&...)
Definition: unused.hpp:20
SSClassifyTreeCalcUnrollInterleave< key_type, TreeBits > Classify
classification tree variant for sample sorts
static void insertion_sort_cache_block(const StringPtr &strptr, key_type *cache)
Insertion sort the strings only based on the cached characters.
StringPtr sub(size_t offset, size_t sub_size) const
Advance (both) pointers by given offset, return sub-array.
Definition: string_ptr.hpp:69
static const bool use_only_first_sortstep
terminate sort after first parallel sample sort step
void substep_notify_done()
Notify superstep that the currently substep is done.
std::atomic< size_t > rest_size
number of remaining strings to sort
static enable_if<!StringPtr::with_lcp, void >::type insertion_sort(const StringPtr &strptr, size_t depth, size_t)
iterator begin() noexcept
return mutable iterator to first element
static const bool with_lcp
if we want to save the LCPs
Definition: string_ptr.hpp:75
PS5Context(size_t _thread_num)
context constructor
void swap(CountingPtr< A, D > &a1, CountingPtr< A, D > &a2) noexcept
void substep_all_done() final
Pure virtual function called by substep when all substeps are done.
static int cmp(const key_type &a, const key_type &b)
Stack of Recursive MKQS Steps.
void stop()
stop the currently running timer.
Definition: multi_timer.cpp:84
const StringSet & active() const
return currently active array
Definition: string_ptr.hpp:63
size_t key_type
key type for sample sort: 32-bit or 64-bit
static unsigned char lcpKeyDepth(const KeyType &a)
double total() const
return total duration of all timers.
size_type size() const noexcept
return number of items in vector
StringPtr strptr_
string pointers, size, and current sorting depth
SFINAE enable_if – copy of std::enable_if<> with less extra cruft.
Definition: enable_if.hpp:21
static double threshold()
Definition: hyperloglog.cpp:36
std::atomic< size_t > pwork_
number of threads still working
static const size_t inssort_threshold
threshold to switch to insertion sort
simple_vector< simple_vector< size_t > > bkt_
individual bucket array of threads, keep bkt[0] for DistributeJob
#define TLX_LOGC(cond)
Explicitly specify the condition for logging.
Definition: core.hpp:137
size_t num_threads
number of threads overall
simple_vector< simple_vector< uint16_t > > bktcache_
bucket ids cache, created by classifier and later counted
void start(const char *timer)
start new timer phase, stop the currently running one.
Definition: multi_timer.cpp:66
SampleSort: Non-Recursive In-Place Sequential Sample Sort for Small Sorts.
std::vector< SeqSampleSortStep > ss_stack_
void fill_lcp(const LcpType &) const
fill entire LCP array with v, excluding the first lcp[0] position!
Definition: string_ptr.hpp:83
MKQSStep(Context &ctx, const StringPtr &strptr, key_type *cache, size_t depth, bool CacheDirty)
static uint_pair min()
return an uint_pair instance containing the smallest value possible
Definition: uint_types.hpp:217
size_t parts_
number of parts into which the strings were split
void sort_sample_sort(const StringPtr &strptr, size_t depth)
void destroy()
deallocate contained array
static unsigned char lcpKeyType(const KeyType &a, const KeyType &b)
LCP calculation of Splitter Strings.
enable_if<!StringPtr::with_lcp, void >::type parallel_sample_sort_params(const StringPtr &strptr, size_t depth, size_t memory=0)
ThreadPool starts a fixed number p of std::threads which process Jobs that are enqueued into a concur...
Definition: thread_pool.hpp:64
SeqSampleSortStep(Context &ctx, const StringPtr &strptr, size_t depth, uint16_t *bktcache)
void donesize(size_t n)
decrement number of unordered strings
PS5SortStep Top-Level Class to Keep Track of Substeps.
Objectified string array pointer array.
Definition: string_ptr.hpp:47
static void insertion_sort_cache(const StringPtr &_strptr, key_type *cache, size_t depth)
Insertion sort, but use cached characters if possible.
void parallel_sample_sort_base(const StringPtr &strptr, size_t depth)
Main Parallel Sample Sort Function. See below for more convenient wrappers.