Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
parallel_mergesort.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * tlx/sort/parallel_mergesort.hpp
3  *
4  * **EXPERIMENTAL** Parallel multiway mergesort **EXPERIMENTAL**
5  *
6  * Copied and modified from STXXL, see http://stxxl.org, which itself extracted
7  * it from MCSTL http://algo2.iti.uni-karlsruhe.de/singler/mcstl/. Both are
8  * distributed under the Boost Software License, Version 1.0.
9  *
10  * Part of tlx - http://panthema.net/tlx
11  *
12  * Copyright (C) 2007 Johannes Singler <[email protected]>
13  * Copyright (C) 2014-2018 Timo Bingmann <[email protected]>
14  *
15  * All rights reserved. Published under the Boost Software License, Version 1.0
16  ******************************************************************************/
17 
18 #ifndef TLX_SORT_PARALLEL_MERGESORT_HEADER
19 #define TLX_SORT_PARALLEL_MERGESORT_HEADER
20 
21 #if defined(_OPENMP)
22 
#include <algorithm>
#include <functional>
#include <iterator>
#include <memory>
#include <thread>
#include <utility>
#include <vector>

#include <tlx/simple_vector.hpp>

#include <omp.h>
33 
34 namespace tlx {
35 
36 //! \addtogroup tlx_sort
37 //! \{
38 
39 namespace parallel_mergesort_detail {
40 
/*!
 * Subsequence description: the half-open index range [begin, end) of one
 * thread's locally sorted run that a given thread has to merge.
 */
template <typename DiffType>
struct PMWMSPiece {
    //! Index of the first element, and one past the last element, of the
    //! subsequence.
    DiffType begin, end;
};
49 
50 /*!
51  * Data accessed by all threads.
52  *
53  * PMWMS = parallel multiway mergesort
54  */
55 template <typename RandomAccessIterator>
56 struct PMWMSSortingData {
57  using ValueType =
58  typename std::iterator_traits<RandomAccessIterator>::value_type;
59  using DiffType =
60  typename std::iterator_traits<RandomAccessIterator>::difference_type;
61 
62  //! Input begin.
63  RandomAccessIterator source;
64  //! Start indices, per thread.
65  simple_vector<DiffType> starts;
66 
67  /** Storage in which to sort. */
68  simple_vector<ValueType*> temporary;
69  /** Samples. */
70  simple_vector<ValueType> samples;
71  /** Offsets to add to the found positions. */
72  simple_vector<DiffType> offsets;
73  /** PMWMSPieces of data to merge \c [thread][sequence] */
74  simple_vector<std::vector<PMWMSPiece<DiffType> > > pieces;
75 
76  explicit PMWMSSortingData(size_t num_threads)
77  : starts(num_threads + 1),
78  temporary(num_threads),
79  offsets(num_threads - 1),
80  pieces(num_threads)
81  { }
82 };
83 
84 /*!
85  * Select samples from a sequence.
 *
 * Splits this thread's chunk [starts[iam], starts[iam + 1]) of the input into
 * (presumably equally sized) num_samples + 1 parts. It then copies the
 * elements at split positions es[1] .. es[num_samples] into this thread's
 * slice sd->samples[iam * num_samples, (iam + 1) * num_samples).
 *
 * The samples are read from sd->source (the original input range), not from
 * the locally sorted copy in sd->temporary.
 *
 * NOTE(review): the line that begins the call consuming the two argument
 * lines after the declaration of es (listing line 100) is missing from this
 * listing. It is presumably the equally_split(n, p, out) helper named in this
 * page's index; restore it from upstream before compiling.
 *
86  * \param sd Pointer to sorting data struct. Result will be placed in \c sd->samples.
87  * \param num_samples [out] Set to the number of samples taken per thread,
 *        i.e. parallel_multiway_merge_oversampling * num_threads - 1.
88  * \param iam my thread number
89  * \param num_threads number of threads in group
90  */
91 template <typename RandomAccessIterator, typename DiffType>
92 void determine_samples(PMWMSSortingData<RandomAccessIterator>* sd,
93  DiffType& num_samples,
94  size_t iam,
95  size_t num_threads) {
96 
// oversampling: every thread contributes this many samples
97  num_samples = parallel_multiway_merge_oversampling * num_threads - 1;
98 
// num_samples + 2 split positions within the local chunk
99  std::vector<DiffType> es(num_samples + 2);
101  sd->starts[iam + 1] - sd->starts[iam],
102  static_cast<size_t>(num_samples + 1), es.begin());
103 
// take the elements at the inner positions es[1..num_samples]; es[0] and the
// last entry are not used
104  for (DiffType i = 0; i < num_samples; i++)
105  sd->samples[iam * num_samples + i] = sd->source[sd->starts[iam] + es[i + 1]];
106 }
107 
108 /*!
109  * PMWMS code executed by each thread.
110  * \param sd Pointer to sorting data struct.
111  * \param iam my thread number
112  * \param num_threads number of threads in group
113  * \param comp Comparator.
114  * \param mwmsa MultiwayMergeSplittingAlgorithm to use.
115  */
116 template <bool Stable, typename RandomAccessIterator, typename Comparator>
117 void parallel_sort_mwms_pu(PMWMSSortingData<RandomAccessIterator>* sd,
118  size_t iam,
119  size_t num_threads,
120  Comparator& comp,
122  using ValueType =
123  typename std::iterator_traits<RandomAccessIterator>::value_type;
124  using DiffType =
125  typename std::iterator_traits<RandomAccessIterator>::difference_type;
126 
127  // length of this thread's chunk, before merging
128  DiffType length_local = sd->starts[iam + 1] - sd->starts[iam];
129 
130  using SortingPlacesIterator = ValueType *;
131  // sort in temporary storage, leave space for sentinel
132  sd->temporary[iam] =
133  static_cast<ValueType*>(::operator new (sizeof(ValueType) * (length_local + 1)));
134  // copy there
135  std::uninitialized_copy(sd->source + sd->starts[iam],
136  sd->source + sd->starts[iam] + length_local,
137  sd->temporary[iam]);
138 
139  // sort locally
140  if (Stable)
141  std::stable_sort(sd->temporary[iam],
142  sd->temporary[iam] + length_local, comp);
143  else
144  std::sort(sd->temporary[iam],
145  sd->temporary[iam] + length_local, comp);
146 
147  // invariant: locally sorted subsequence in sd->temporary[iam],
148  // sd->temporary[iam] + length_local
149 
150  if (mwmsa == MWMSA_SAMPLING)
151  {
152  DiffType num_samples;
153  determine_samples(sd, num_samples, iam, num_threads);
154 
155 #pragma omp barrier
156 
157 #pragma omp single
158  std::sort(sd->samples.begin(), sd->samples.end(), comp);
159 
160 #pragma omp barrier
161 
162  for (size_t s = 0; s < num_threads; s++)
163  {
164  // for each sequence
165  if (num_samples * iam > 0)
166  sd->pieces[iam][s].begin =
167  std::lower_bound(sd->temporary[s],
168  sd->temporary[s] + sd->starts[s + 1] - sd->starts[s],
169  sd->samples[num_samples * iam],
170  comp)
171  - sd->temporary[s];
172  else
173  // absolute beginning
174  sd->pieces[iam][s].begin = 0;
175 
176  if ((num_samples * (iam + 1)) < (num_samples * num_threads))
177  sd->pieces[iam][s].end =
178  std::lower_bound(sd->temporary[s],
179  sd->temporary[s] + sd->starts[s + 1] - sd->starts[s],
180  sd->samples[num_samples * (iam + 1)],
181  comp)
182  - sd->temporary[s];
183  else
184  // absolute end
185  sd->pieces[iam][s].end = sd->starts[s + 1] - sd->starts[s];
186  }
187  }
188  else if (mwmsa == MWMSA_EXACT)
189  {
190 #pragma omp barrier
191 
192  simple_vector<std::pair<SortingPlacesIterator,
193  SortingPlacesIterator> > seqs(num_threads);
194 
195  for (size_t s = 0; s < num_threads; s++)
196  seqs[s] = std::make_pair(
197  sd->temporary[s],
198  sd->temporary[s] + sd->starts[s + 1] - sd->starts[s]);
199 
200  std::vector<SortingPlacesIterator> offsets(num_threads);
201 
202  // if not last thread
203  if (iam < num_threads - 1)
204  multisequence_partition(seqs.begin(), seqs.end(),
205  sd->starts[iam + 1], offsets.begin(), comp);
206 
207  for (size_t seq = 0; seq < num_threads; seq++)
208  {
209  // for each sequence
210  if (iam < (num_threads - 1))
211  sd->pieces[iam][seq].end = offsets[seq] - seqs[seq].first;
212  else
213  // absolute end of this sequence
214  sd->pieces[iam][seq].end = sd->starts[seq + 1] - sd->starts[seq];
215  }
216 
217 #pragma omp barrier
218 
219  for (size_t seq = 0; seq < num_threads; seq++)
220  {
221  // for each sequence
222  if (iam > 0)
223  sd->pieces[iam][seq].begin = sd->pieces[iam - 1][seq].end;
224  else
225  // absolute beginning
226  sd->pieces[iam][seq].begin = 0;
227  }
228  }
229 
230  // offset from target begin, length after merging
231  DiffType offset = 0, length_am = 0;
232  for (size_t s = 0; s < num_threads; s++)
233  {
234  length_am += sd->pieces[iam][s].end - sd->pieces[iam][s].begin;
235  offset += sd->pieces[iam][s].begin;
236  }
237 
238  // merge directly to target
239 
240  std::vector<std::pair<SortingPlacesIterator,
241  SortingPlacesIterator> > seqs(num_threads);
242 
243  for (size_t s = 0; s < num_threads; s++)
244  {
245  seqs[s] = std::make_pair(
246  sd->temporary[s] + sd->pieces[iam][s].begin,
247  sd->temporary[s] + sd->pieces[iam][s].end);
248  }
249 
250  multiway_merge_base<Stable, /* Sentinels */ false>(
251  seqs.begin(), seqs.end(),
252  sd->source + offset, length_am, comp);
253 
254 #pragma omp barrier
255 
256  delete sd->temporary[iam];
257 }
258 
259 } // namespace parallel_mergesort_detail
260 
261 //! \name Parallel Sorting Algorithms
262 //! \{
263 
264 /*!
265  * Parallel multiway mergesort main call.
266  *
267  * \param begin Begin iterator of sequence.
268  * \param end End iterator of sequence.
269  * \param comp Comparator.
270  * \param num_threads Number of threads to use.
271  * \param mwmsa MultiwayMergeSplittingAlgorithm to use.
272  * \tparam Stable Stable sorting.
273  */
274 template <bool Stable,
275  typename RandomAccessIterator, typename Comparator>
276 void parallel_mergesort_base(
277  RandomAccessIterator begin,
278  RandomAccessIterator end,
279  Comparator comp,
280  size_t num_threads = std::thread::hardware_concurrency(),
282 
283  using namespace parallel_mergesort_detail;
284 
285  using DiffType =
286  typename std::iterator_traits<RandomAccessIterator>::difference_type;
287 
288  DiffType n = end - begin;
289 
290  if (n <= 1)
291  return;
292 
293  // at least one element per thread
294  if (num_threads > static_cast<size_t>(n))
295  num_threads = static_cast<size_t>(n);
296 
297  PMWMSSortingData<RandomAccessIterator> sd(num_threads);
298  sd.source = begin;
299 
300  if (mwmsa == MWMSA_SAMPLING)
301  sd.samples.resize(
302  num_threads * (parallel_multiway_merge_oversampling * num_threads - 1));
303 
304  for (size_t s = 0; s < num_threads; s++)
305  sd.pieces[s].resize(num_threads);
306 
307  DiffType* starts = sd.starts.data();
308 
309  DiffType chunk_length = n / num_threads, split = n % num_threads, start = 0;
310  for (size_t i = 0; i < num_threads; i++)
311  {
312  starts[i] = start;
313  start += (i < static_cast<size_t>(split))
314  ? (chunk_length + 1) : chunk_length;
315  }
316  starts[num_threads] = start;
317 
318  // now sort in parallel
319 #pragma omp parallel num_threads(num_threads)
320  {
321  size_t iam = omp_get_thread_num();
322  parallel_sort_mwms_pu<Stable>(&sd, iam, num_threads, comp, mwmsa);
323  }
324 }
325 
326 /*!
327  * Parallel multiway mergesort.
328  *
 * Sorts [begin, end) with \p comp using OpenMP threads. Not stable: each
 * thread sorts its chunk with std::sort, so equal elements may be reordered.
 * Only declared when compiling with OpenMP (_OPENMP defined).
 *
 * NOTE(review): the line declaring the final parameter (mwmsa, listing
 * line 343) is missing from this listing; restore it from upstream.
 *
329  * \param begin Begin iterator of sequence.
330  * \param end End iterator of sequence.
331  * \param comp Comparator.
332  * \param num_threads Number of threads to use.
 *        Defaults to std::thread::hardware_concurrency().
333  * \param mwmsa MultiwayMergeSplittingAlgorithm to use.
334  */
335 template <typename RandomAccessIterator,
336  typename Comparator = std::less<
337  typename std::iterator_traits<RandomAccessIterator>::value_type> >
338 void parallel_mergesort(
339  RandomAccessIterator begin,
340  RandomAccessIterator end,
341  Comparator comp = Comparator(),
342  size_t num_threads = std::thread::hardware_concurrency(),
344 
// forward to the shared implementation with stability disabled
345  return parallel_mergesort_base</* Stable */ false>(
346  begin, end, comp, num_threads, mwmsa);
347 }
348 
349 /*!
350  * Stable parallel multiway mergesort.
351  *
 * Sorts [begin, end) with \p comp using OpenMP threads. Local chunks are
 * sorted with std::stable_sort and merged stably. Only declared when
 * compiling with OpenMP (_OPENMP defined).
 *
 * NOTE(review): the line declaring the final parameter (mwmsa, listing
 * line 366) is missing from this listing; restore it from upstream.
 *
352  * \param begin Begin iterator of sequence.
353  * \param end End iterator of sequence.
354  * \param comp Comparator.
355  * \param num_threads Number of threads to use.
 *        Defaults to std::thread::hardware_concurrency().
356  * \param mwmsa MultiwayMergeSplittingAlgorithm to use.
357  */
358 template <typename RandomAccessIterator,
359  typename Comparator = std::less<
360  typename std::iterator_traits<RandomAccessIterator>::value_type> >
361 void stable_parallel_mergesort(
362  RandomAccessIterator begin,
363  RandomAccessIterator end,
364  Comparator comp = Comparator(),
365  size_t num_threads = std::thread::hardware_concurrency(),
367 
// forward to the shared implementation with stability enabled
368  return parallel_mergesort_base</* Stable */ true>(
369  begin, end, comp, num_threads, mwmsa);
370 }
371 
372 //! \}
373 //! \}
374 
375 } // namespace tlx
376 
377 #endif // defined(_OPENMP)
378 
379 #endif // !TLX_SORT_PARALLEL_MERGESORT_HEADER
380 
381 /******************************************************************************/
SimpleVector< T > simple_vector
make template alias due to similarity with std::vector
DiffTypeOutputIterator equally_split(DiffType n, size_t p, DiffTypeOutputIterator s)
Split a sequence into parts of almost equal size.
void multisequence_partition(const RanSeqs &begin_seqs, const RanSeqs &end_seqs, const RankType &rank, RankIterator begin_offsets, Comparator comp=Comparator())
Splits several sorted sequences at a certain global rank, resulting in a splitting point for each seq...
std::vector< std::string > split(char sep, const std::string &str, std::string::size_type limit)
Split the given string at each separator character into distinct substrings.
Definition: split.cpp:20
MultiwayMergeSplittingAlgorithm
Different splitting strategies for sorting/merging: by sampling, exact.
std::vector< T, Allocator< T > > vector
vector with Manager tracking
Definition: allocator.hpp:228
RandomAccessIterator3 multiway_merge_base(RandomAccessIteratorIterator seqs_begin, RandomAccessIteratorIterator seqs_end, RandomAccessIterator3 target, typename std::iterator_traits< typename std::iterator_traits< RandomAccessIteratorIterator >::value_type::first_type >::difference_type size, Comparator comp=Comparator(), MultiwayMergeAlgorithm mwma=MWMA_ALGORITHM_DEFAULT)
Sequential multi-way merging switch.
size_t parallel_multiway_merge_oversampling
default oversampling factor for parallel_multiway_merge