Thrill  0.1
parallel_mergesort.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * tlx/sort/parallel_mergesort.hpp
3  *
4  * **EXPERIMENTAL** Parallel multiway mergesort **EXPERIMENTAL**
5  *
6  * Copied and modified from STXXL, see http://stxxl.org, which itself extracted
7  * it from MCSTL http://algo2.iti.uni-karlsruhe.de/singler/mcstl/. Both are
8  * distributed under the Boost Software License, Version 1.0.
9  *
10  * Part of tlx - http://panthema.net/tlx
11  *
12  * Copyright (C) 2007 Johannes Singler <[email protected]>
13  * Copyright (C) 2014-2018 Timo Bingmann <[email protected]>
14  *
15  * All rights reserved. Published under the Boost Software License, Version 1.0
16  ******************************************************************************/
17 
18 #ifndef TLX_SORT_PARALLEL_MERGESORT_HEADER
19 #define TLX_SORT_PARALLEL_MERGESORT_HEADER
20 
21 #include <algorithm>
22 #include <functional>
23 #include <thread>
24 #include <utility>
25 
26 #if defined(_OPENMP)
27 #include <omp.h>
28 #endif
29 
32 #include <tlx/simple_vector.hpp>
34 
35 namespace tlx {
36 
37 //! \addtogroup tlx_sort
38 //! \{
39 
40 namespace parallel_mergesort_detail {
41 
42 //! Subsequence description.
43 template <typename DiffType>
44 struct PMWMSPiece {
45  //! Begin of subsequence.
46  DiffType begin;
47  //! End of subsequence.
48  DiffType end;
49 };
50 
51 /*!
52  * Data accessed by all threads.
53  *
54  * PMWMS = parallel multiway mergesort
55  */
56 template <typename RandomAccessIterator>
58  using ValueType =
59  typename std::iterator_traits<RandomAccessIterator>::value_type;
60  using DiffType =
61  typename std::iterator_traits<RandomAccessIterator>::difference_type;
62 
63  //! Input begin.
64  RandomAccessIterator source;
65  //! Start indices, per thread.
67 
68  /** Storage in which to sort. */
70  /** Samples. */
72  /** Offsets to add to the found positions. */
74  /** PMWMSPieces of data to merge \c [thread][sequence] */
76 
77  explicit PMWMSSortingData(size_t num_threads)
78  : starts(num_threads + 1),
79  temporary(num_threads),
80  offsets(num_threads - 1),
81  pieces(num_threads)
82  { }
83 };
84 
85 /*!
86  * Select samples from a sequence.
87  * \param sd Pointer to sorting data struct. Result will be placed in \c sd->samples.
88  * \param num_samples Number of samples to select.
89  * \param iam my thread number
90  * \param num_threads number of threads in group
91  */
92 template <typename RandomAccessIterator, typename DiffType>
94  DiffType& num_samples,
95  size_t iam,
96  size_t num_threads) {
97 
98  num_samples = parallel_multiway_merge_oversampling * num_threads - 1;
99 
100  simple_vector<DiffType> es(num_samples + 2);
102  sd->starts[iam + 1] - sd->starts[iam],
103  static_cast<size_t>(num_samples + 1), es.begin());
104 
105  for (DiffType i = 0; i < num_samples; i++) {
106  sd->samples[iam * num_samples + i] = sd->source[sd->starts[iam] + es[i + 1]];
107  }
108 }
109 
110 /*!
111  * PMWMS code executed by each thread.
112  * \param sd Pointer to sorting data struct.
113  * \param iam my thread number
114  * \param num_threads number of threads in group
115  * \param barrier thread barrier from main function
116  * \param comp Comparator.
117  * \param mwmsa MultiwayMergeSplittingAlgorithm to use.
118  */
119 template <bool Stable, typename RandomAccessIterator, typename Comparator>
121  size_t iam,
122  size_t num_threads,
123  ThreadBarrierMutex& barrier,
124  Comparator& comp,
126  using ValueType =
127  typename std::iterator_traits<RandomAccessIterator>::value_type;
128  using DiffType =
129  typename std::iterator_traits<RandomAccessIterator>::difference_type;
130 
131  // length of this thread's chunk, before merging
132  DiffType length_local = sd->starts[iam + 1] - sd->starts[iam];
133 
134  using SortingPlacesIterator = ValueType *;
135 
136  // sort in temporary storage, leave space for sentinel
137  sd->temporary[iam] = static_cast<ValueType*>(
138  ::operator new (sizeof(ValueType) * (length_local + 1)));
139 
140  // copy there
141  std::uninitialized_copy(sd->source + sd->starts[iam],
142  sd->source + sd->starts[iam] + length_local,
143  sd->temporary[iam]);
144 
145  // sort locally
146  if (Stable)
147  std::stable_sort(sd->temporary[iam],
148  sd->temporary[iam] + length_local, comp);
149  else
150  std::sort(sd->temporary[iam],
151  sd->temporary[iam] + length_local, comp);
152 
153  // invariant: locally sorted subsequence in sd->temporary[iam],
154  // sd->temporary[iam] + length_local
155 
156  if (mwmsa == MWMSA_SAMPLING)
157  {
158  DiffType num_samples;
159  determine_samples(sd, num_samples, iam, num_threads);
160 
161  barrier.wait(
162  [&]() {
163  std::sort(sd->samples.begin(), sd->samples.end(), comp);
164  });
165 
166  for (size_t s = 0; s < num_threads; s++)
167  {
168  // for each sequence
169  if (num_samples * iam > 0)
170  sd->pieces[iam][s].begin =
171  std::lower_bound(sd->temporary[s],
172  sd->temporary[s] + sd->starts[s + 1] - sd->starts[s],
173  sd->samples[num_samples * iam],
174  comp)
175  - sd->temporary[s];
176  else
177  // absolute beginning
178  sd->pieces[iam][s].begin = 0;
179 
180  if ((num_samples * (iam + 1)) < (num_samples * num_threads))
181  sd->pieces[iam][s].end =
182  std::lower_bound(sd->temporary[s],
183  sd->temporary[s] + sd->starts[s + 1] - sd->starts[s],
184  sd->samples[num_samples * (iam + 1)],
185  comp)
186  - sd->temporary[s];
187  else
188  // absolute end
189  sd->pieces[iam][s].end = sd->starts[s + 1] - sd->starts[s];
190  }
191  }
192  else if (mwmsa == MWMSA_EXACT)
193  {
194  barrier.wait();
195 
196  simple_vector<std::pair<SortingPlacesIterator,
197  SortingPlacesIterator> > seqs(num_threads);
198 
199  for (size_t s = 0; s < num_threads; s++)
200  seqs[s] = std::make_pair(
201  sd->temporary[s],
202  sd->temporary[s] + sd->starts[s + 1] - sd->starts[s]);
203 
204  simple_vector<SortingPlacesIterator> offsets(num_threads);
205 
206  // if not last thread
207  if (iam < num_threads - 1)
208  multisequence_partition(seqs.begin(), seqs.end(),
209  sd->starts[iam + 1], offsets.begin(), comp);
210 
211  for (size_t seq = 0; seq < num_threads; seq++)
212  {
213  // for each sequence
214  if (iam < (num_threads - 1))
215  sd->pieces[iam][seq].end = offsets[seq] - seqs[seq].first;
216  else
217  // absolute end of this sequence
218  sd->pieces[iam][seq].end = sd->starts[seq + 1] - sd->starts[seq];
219  }
220 
221  barrier.wait();
222 
223  for (size_t seq = 0; seq < num_threads; seq++)
224  {
225  // for each sequence
226  if (iam > 0)
227  sd->pieces[iam][seq].begin = sd->pieces[iam - 1][seq].end;
228  else
229  // absolute beginning
230  sd->pieces[iam][seq].begin = 0;
231  }
232  }
233 
234  // offset from target begin, length after merging
235  DiffType offset = 0, length_am = 0;
236  for (size_t s = 0; s < num_threads; s++)
237  {
238  length_am += sd->pieces[iam][s].end - sd->pieces[iam][s].begin;
239  offset += sd->pieces[iam][s].begin;
240  }
241 
242  // merge directly to target
243 
244  simple_vector<std::pair<SortingPlacesIterator,
245  SortingPlacesIterator> > seqs(num_threads);
246 
247  for (size_t s = 0; s < num_threads; s++)
248  {
249  seqs[s] = std::make_pair(
250  sd->temporary[s] + sd->pieces[iam][s].begin,
251  sd->temporary[s] + sd->pieces[iam][s].end);
252  }
253 
254  multiway_merge_base<Stable, /* Sentinels */ false>(
255  seqs.begin(), seqs.end(),
256  sd->source + offset, length_am, comp);
257 
258  barrier.wait();
259 
260  operator delete (sd->temporary[iam]);
261 }
262 
263 } // namespace parallel_mergesort_detail
264 
265 //! \name Parallel Sorting Algorithms
266 //! \{
267 
268 /*!
269  * Parallel multiway mergesort main call.
270  *
271  * \param begin Begin iterator of sequence.
272  * \param end End iterator of sequence.
273  * \param comp Comparator.
274  * \param num_threads Number of threads to use.
275  * \param mwmsa MultiwayMergeSplittingAlgorithm to use.
276  * \tparam Stable Stable sorting.
277  */
278 template <bool Stable,
279  typename RandomAccessIterator, typename Comparator>
281  RandomAccessIterator begin,
282  RandomAccessIterator end,
283  Comparator comp,
284  size_t num_threads = std::thread::hardware_concurrency(),
286 
287  using namespace parallel_mergesort_detail;
288 
289  using DiffType =
290  typename std::iterator_traits<RandomAccessIterator>::difference_type;
291 
292  DiffType n = end - begin;
293 
294  if (n <= 1)
295  return;
296 
297  // at least one element per thread
298  if (num_threads > static_cast<size_t>(n))
299  num_threads = static_cast<size_t>(n);
300 
301  PMWMSSortingData<RandomAccessIterator> sd(num_threads);
302  sd.source = begin;
303 
304  if (mwmsa == MWMSA_SAMPLING) {
305  sd.samples.resize(
306  num_threads * (parallel_multiway_merge_oversampling * num_threads - 1));
307  }
308 
309  for (size_t s = 0; s < num_threads; s++)
310  sd.pieces[s].resize(num_threads);
311 
312  DiffType* starts = sd.starts.data();
313 
314  DiffType chunk_length = n / num_threads, split = n % num_threads, start = 0;
315  for (size_t i = 0; i < num_threads; i++)
316  {
317  starts[i] = start;
318  start += (i < static_cast<size_t>(split))
319  ? (chunk_length + 1) : chunk_length;
320  }
321  starts[num_threads] = start;
322 
323  // now sort in parallel
324 
325  ThreadBarrierMutex barrier(num_threads);
326 
327 #if defined(_OPENMP)
328 #pragma omp parallel num_threads(num_threads)
329  {
330  size_t iam = omp_get_thread_num();
331  parallel_sort_mwms_pu<Stable>(
332  &sd, iam, num_threads, barrier, comp, mwmsa);
333  }
334 #else
335  simple_vector<std::thread> threads(num_threads);
336  for (size_t iam = 0; iam < num_threads; ++iam) {
337  threads[iam] = std::thread(
338  [&, iam]() {
339  parallel_sort_mwms_pu<Stable>(
340  &sd, iam, num_threads, barrier, comp, mwmsa);
341  });
342  }
343  for (size_t i = 0; i < num_threads; i++) {
344  threads[i].join();
345  }
346 #endif // defined(_OPENMP)
347 }
348 
349 /*!
350  * Parallel multiway mergesort.
351  *
352  * \param begin Begin iterator of sequence.
353  * \param end End iterator of sequence.
354  * \param comp Comparator.
355  * \param num_threads Number of threads to use.
356  * \param mwmsa MultiwayMergeSplittingAlgorithm to use.
357  */
358 template <typename RandomAccessIterator,
359  typename Comparator = std::less<
360  typename std::iterator_traits<RandomAccessIterator>::value_type> >
362  RandomAccessIterator begin,
363  RandomAccessIterator end,
364  Comparator comp = Comparator(),
365  size_t num_threads = std::thread::hardware_concurrency(),
367 
368  return parallel_mergesort_base</* Stable */ false>(
369  begin, end, comp, num_threads, mwmsa);
370 }
371 
372 /*!
373  * Stable parallel multiway mergesort.
374  *
375  * \param begin Begin iterator of sequence.
376  * \param end End iterator of sequence.
377  * \param comp Comparator.
378  * \param num_threads Number of threads to use.
379  * \param mwmsa MultiwayMergeSplittingAlgorithm to use.
380  */
381 template <typename RandomAccessIterator,
382  typename Comparator = std::less<
383  typename std::iterator_traits<RandomAccessIterator>::value_type> >
385  RandomAccessIterator begin,
386  RandomAccessIterator end,
387  Comparator comp = Comparator(),
388  size_t num_threads = std::thread::hardware_concurrency(),
390 
391  return parallel_mergesort_base</* Stable */ true>(
392  begin, end, comp, num_threads, mwmsa);
393 }
394 
395 //! \}
396 //! \}
397 
398 } // namespace tlx
399 
400 #endif // !TLX_SORT_PARALLEL_MERGESORT_HEADER
401 
402 /******************************************************************************/
simple_vector< DiffType > starts
Start indices, per thread.
DiffTypeOutputIterator equally_split(DiffType n, size_t p, DiffTypeOutputIterator s)
Split a sequence into parts of almost equal size.
void parallel_mergesort_base(RandomAccessIterator begin, RandomAccessIterator end, Comparator comp, size_t num_threads=std::thread::hardware_concurrency(), MultiwayMergeSplittingAlgorithm mwmsa=MWMSA_DEFAULT)
Parallel multiway mergesort main call.
void stable_parallel_mergesort(RandomAccessIterator begin, RandomAccessIterator end, Comparator comp=Comparator(), size_t num_threads=std::thread::hardware_concurrency(), MultiwayMergeSplittingAlgorithm mwmsa=MWMSA_DEFAULT)
Stable parallel multiway mergesort.
Simpler non-growing vector without initialization.
void parallel_sort_mwms_pu(PMWMSSortingData< RandomAccessIterator > *sd, size_t iam, size_t num_threads, ThreadBarrierMutex &barrier, Comparator &comp, MultiwayMergeSplittingAlgorithm mwmsa)
PMWMS code executed by each thread.
void multisequence_partition(const RanSeqs &begin_seqs, const RanSeqs &end_seqs, const RankType &rank, RankIterator begin_offsets, Comparator comp=Comparator())
Splits several sorted sequences at a certain global rank, resulting in a splitting point for each seq...
std::vector< std::string > split(char sep, const std::string &str, std::string::size_type limit)
Split the given string at each separator character into distinct substrings.
Definition: split.cpp:20
void wait(Lambda lambda=Lambda())
Waits for n threads to arrive.
iterator begin() noexcept
return mutable iterator to first element
MultiwayMergeSplittingAlgorithm
Different splitting strategies for sorting/merging: by sampling, exact.
void parallel_mergesort(RandomAccessIterator begin, RandomAccessIterator end, Comparator comp=Comparator(), size_t num_threads=std::thread::hardware_concurrency(), MultiwayMergeSplittingAlgorithm mwmsa=MWMSA_DEFAULT)
Parallel multiway mergesort.
RandomAccessIterator3 multiway_merge_base(RandomAccessIteratorIterator seqs_begin, RandomAccessIteratorIterator seqs_end, RandomAccessIterator3 target, typename std::iterator_traits< typename std::iterator_traits< RandomAccessIteratorIterator >::value_type::first_type >::difference_type size, Comparator comp=Comparator(), MultiwayMergeAlgorithm mwma=MWMA_ALGORITHM_DEFAULT)
Sequential multi-way merging switch.
Implements a thread barrier using mutex locking and condition variables that can be used to synchroni...
void determine_samples(PMWMSSortingData< RandomAccessIterator > *sd, DiffType &num_samples, size_t iam, size_t num_threads)
Select samples from a sequence.
size_t parallel_multiway_merge_oversampling
default oversampling factor for parallel_multiway_merge
typename std::iterator_traits< RandomAccessIterator >::difference_type DiffType
simple_vector< simple_vector< PMWMSPiece< DiffType > > > pieces
typename std::iterator_traits< RandomAccessIterator >::value_type ValueType