Thrill  0.1
parallel_multiway_merge.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * tlx/algorithm/parallel_multiway_merge.hpp
3  *
4  * Parallel multiway merge.
5  *
6  * Copied and modified from STXXL, see http://stxxl.org, which itself extracted
7  * it from MCSTL http://algo2.iti.uni-karlsruhe.de/singler/mcstl/. Both are
8  * distributed under the Boost Software License, Version 1.0.
9  *
10  * Part of tlx - http://panthema.net/tlx
11  *
12  * Copyright (C) 2007 Johannes Singler <[email protected]>
13  * Copyright (C) 2014-2018 Timo Bingmann <[email protected]>
14  *
15  * All rights reserved. Published under the Boost Software License, Version 1.0
16  ******************************************************************************/
17 
18 #ifndef TLX_ALGORITHM_PARALLEL_MULTIWAY_MERGE_HEADER
19 #define TLX_ALGORITHM_PARALLEL_MULTIWAY_MERGE_HEADER
20 
21 #include <algorithm>
22 #include <functional>
23 #include <thread>
24 #include <vector>
25 
26 #if defined(_OPENMP)
27 #include <omp.h>
28 #endif
29 
32 #include <tlx/simple_vector.hpp>
33 
34 namespace tlx {
35 
36 //! \addtogroup tlx_algorithm
37 //! \{
38 
39 //! default oversampling factor for parallel_multiway_merge
41 
42 /*!
43  * Parallel multi-way merge routine.
44  *
45  * Implemented either using OpenMP or with std::threads, depending on if
46  * compiled with -fopenmp or not. The OpenMP version uses the implicit thread
47  * pool, which is faster when using this method often.
48  *
49  * \param seqs_begin Begin iterator of iterator pair input sequence.
50  * \param seqs_end End iterator of iterator pair input sequence.
51  * \param target Begin iterator out output sequence.
52  * \param size Maximum size to merge.
53  * \param comp Comparator.
54  * \param mwma MultiwayMergeAlgorithm set to use.
55  * \param mwmsa MultiwayMergeSplittingAlgorithm to use.
56  * \param num_threads Number of threads to use (defaults to all cores)
57  * \tparam Stable Stable merging incurs a performance penalty.
58  * \return End iterator of output sequence.
59  */
60 template <
61  bool Stable,
62  typename RandomAccessIteratorIterator,
63  typename RandomAccessIterator3,
64  typename Comparator = std::less<
65  typename std::iterator_traits<
66  typename std::iterator_traits<RandomAccessIteratorIterator>
67  ::value_type::first_type>::value_type> >
68 RandomAccessIterator3 parallel_multiway_merge_base(
69  RandomAccessIteratorIterator seqs_begin,
70  RandomAccessIteratorIterator seqs_end,
71  RandomAccessIterator3 target,
72  const typename std::iterator_traits<
73  typename std::iterator_traits<
74  RandomAccessIteratorIterator>::value_type::first_type>::
75  difference_type size,
76  Comparator comp = Comparator(),
79  size_t num_threads = std::thread::hardware_concurrency()) {
80 
81  using RandomAccessIteratorPair =
82  typename std::iterator_traits<RandomAccessIteratorIterator>
83  ::value_type;
84  using RandomAccessIterator =
85  typename RandomAccessIteratorPair::first_type;
86  using DiffType = typename std::iterator_traits<RandomAccessIterator>
87  ::difference_type;
88 
89  // leave only non-empty sequences
90  std::vector<RandomAccessIteratorPair> seqs_ne;
91  seqs_ne.reserve(static_cast<size_t>(seqs_end - seqs_begin));
92  DiffType total_size = 0;
93 
94  for (RandomAccessIteratorIterator ii = seqs_begin; ii != seqs_end; ++ii)
95  {
96  if (ii->first != ii->second) {
97  total_size += ii->second - ii->first;
98  seqs_ne.push_back(*ii);
99  }
100  }
101 
102  size_t num_seqs = seqs_ne.size();
103 
104  if (total_size == 0 || num_seqs == 0)
105  return target;
106 
107  if (static_cast<DiffType>(num_threads) > total_size)
108  num_threads = total_size;
109 
110  // thread t will have to merge chunks[iam][0..k - 1]
111 
113 
114  for (size_t s = 0; s < num_threads; ++s)
115  chunks[s].resize(num_seqs);
116 
117  if (mwmsa == MWMSA_SAMPLING)
118  {
119  multiway_merge_sampling_splitting<Stable>(
120  seqs_ne.begin(), seqs_ne.end(),
121  static_cast<DiffType>(size), total_size, comp,
122  chunks.data(), num_threads,
124  }
125  else // (mwmsa == MWMSA_EXACT)
126  {
127  multiway_merge_exact_splitting<Stable>(
128  seqs_ne.begin(), seqs_ne.end(),
129  static_cast<DiffType>(size), total_size, comp,
130  chunks.data(), num_threads);
131  }
132 
133 #if defined(_OPENMP)
134 #pragma omp parallel num_threads(num_threads)
135  {
136  size_t iam = omp_get_thread_num();
137 
138  DiffType target_position = 0, local_size = 0;
139 
140  for (size_t s = 0; s < num_seqs; ++s)
141  {
142  target_position += chunks[iam][s].first - seqs_ne[s].first;
143  local_size += chunks[iam][s].second - chunks[iam][s].first;
144  }
145 
146  multiway_merge_base<Stable, false>(
147  chunks[iam].begin(), chunks[iam].end(),
148  target + target_position,
149  std::min(local_size, static_cast<DiffType>(size) - target_position),
150  comp, mwma);
151  }
152 #else
153  std::vector<std::thread> threads(num_threads);
154 
155  for (size_t iam = 0; iam < num_threads; ++iam) {
156  threads[iam] = std::thread(
157  [&, iam]() {
158  DiffType target_position = 0, local_size = 0;
159 
160  for (size_t s = 0; s < num_seqs; ++s)
161  {
162  target_position += chunks[iam][s].first - seqs_ne[s].first;
163  local_size += chunks[iam][s].second - chunks[iam][s].first;
164  }
165 
166  multiway_merge_base<Stable, false>(
167  chunks[iam].begin(), chunks[iam].end(),
168  target + target_position,
169  std::min(local_size, static_cast<DiffType>(size) - target_position),
170  comp, mwma);
171  });
172  }
173 
174  for (size_t i = 0; i < num_threads; ++i)
175  threads[i].join();
176 #endif
177 
178  // update ends of sequences
179  size_t count_seqs = 0;
180  for (RandomAccessIteratorIterator ii = seqs_begin; ii != seqs_end; ++ii)
181  {
182  if (ii->first != ii->second)
183  ii->first = chunks[num_threads - 1][count_seqs++].second;
184  }
185 
186  return target + size;
187 }
188 
/******************************************************************************/
// parallel_multiway_merge() Frontends

//! setting to force all parallel_multiway_merge() calls to run sequentially
extern bool parallel_multiway_merge_force_sequential;

//! setting to force parallel_multiway_merge() calls to run with parallel code
extern bool parallel_multiway_merge_force_parallel;

//! minimal number of sequences for switching to parallel merging
extern size_t parallel_multiway_merge_minimal_k;

//! minimal number of items for switching to parallel merging
extern size_t parallel_multiway_merge_minimal_n;
204 /*!
205  * Parallel multi-way merge routine.
206  *
207  * Implemented either using OpenMP or with std::threads, depending on if
208  * compiled with -fopenmp or not. The OpenMP version uses the implicit thread
209  * pool, which is faster when using this method often.
210  *
211  * \param seqs_begin Begin iterator of iterator pair input sequence.
212  * \param seqs_end End iterator of iterator pair input sequence.
213  * \param target Begin iterator out output sequence.
214  * \param size Maximum size to merge.
215  * \param comp Comparator.
216  * \param mwma MultiwayMergeAlgorithm set to use.
217  * \param mwmsa MultiwayMergeSplittingAlgorithm to use.
218  * \param num_threads Number of threads to use (defaults to all cores)
219  * \tparam Stable Stable merging incurs a performance penalty.
220  * \return End iterator of output sequence.
221  */
222 template <
223  typename RandomAccessIteratorIterator,
224  typename RandomAccessIterator3,
225  typename Comparator = std::less<
226  typename std::iterator_traits<
227  typename std::iterator_traits<RandomAccessIteratorIterator>
228  ::value_type::first_type>::value_type> >
229 RandomAccessIterator3 parallel_multiway_merge(
230  RandomAccessIteratorIterator seqs_begin,
231  RandomAccessIteratorIterator seqs_end,
232  RandomAccessIterator3 target,
233  const typename std::iterator_traits<
234  typename std::iterator_traits<
235  RandomAccessIteratorIterator>::value_type::first_type>::
236  difference_type size,
237  Comparator comp = Comparator(),
240  size_t num_threads = std::thread::hardware_concurrency()) {
241 
242  if (seqs_begin == seqs_end)
243  return target;
244 
245  if (!parallel_multiway_merge_force_sequential &&
246  (parallel_multiway_merge_force_parallel ||
247  (num_threads > 1 &&
248  (static_cast<size_t>(seqs_end - seqs_begin)
249  >= parallel_multiway_merge_minimal_k) &&
250  static_cast<size_t>(size) >= parallel_multiway_merge_minimal_n))) {
251  return parallel_multiway_merge_base</* Stable */ false>(
252  seqs_begin, seqs_end, target, size, comp,
253  mwma, mwmsa, num_threads);
254  }
255  else {
256  return multiway_merge_base</* Stable */ false, /* Sentinels */ false>(
257  seqs_begin, seqs_end, target, size, comp, mwma);
258  }
259 }
260 
261 /*!
262  * Stable parallel multi-way merge routine.
263  *
264  * Implemented either using OpenMP or with std::threads, depending on if
265  * compiled with -fopenmp or not. The OpenMP version uses the implicit thread
266  * pool, which is faster when using this method often.
267  *
268  * \param seqs_begin Begin iterator of iterator pair input sequence.
269  * \param seqs_end End iterator of iterator pair input sequence.
270  * \param target Begin iterator out output sequence.
271  * \param size Maximum size to merge.
272  * \param comp Comparator.
273  * \param mwma MultiwayMergeAlgorithm set to use.
274  * \param mwmsa MultiwayMergeSplittingAlgorithm to use.
275  * \param num_threads Number of threads to use (defaults to all cores)
276  * \tparam Stable Stable merging incurs a performance penalty.
277  * \return End iterator of output sequence.
278  */
279 template <
280  typename RandomAccessIteratorIterator,
281  typename RandomAccessIterator3,
282  typename Comparator = std::less<
283  typename std::iterator_traits<
284  typename std::iterator_traits<RandomAccessIteratorIterator>
285  ::value_type::first_type>::value_type> >
286 RandomAccessIterator3 stable_parallel_multiway_merge(
287  RandomAccessIteratorIterator seqs_begin,
288  RandomAccessIteratorIterator seqs_end,
289  RandomAccessIterator3 target,
290  const typename std::iterator_traits<
291  typename std::iterator_traits<
292  RandomAccessIteratorIterator>::value_type::first_type>::
293  difference_type size,
294  Comparator comp = Comparator(),
297  size_t num_threads = std::thread::hardware_concurrency()) {
298 
299  if (seqs_begin == seqs_end)
300  return target;
301 
302  if (!parallel_multiway_merge_force_sequential &&
303  (parallel_multiway_merge_force_parallel ||
304  (num_threads > 1 &&
305  (static_cast<size_t>(seqs_end - seqs_begin)
306  >= parallel_multiway_merge_minimal_k) &&
307  static_cast<size_t>(size) >= parallel_multiway_merge_minimal_n))) {
308  return parallel_multiway_merge_base</* Stable */ true>(
309  seqs_begin, seqs_end, target, size, comp,
310  mwma, mwmsa, num_threads);
311  }
312  else {
313  return multiway_merge_base</* Stable */ true, /* Sentinels */ false>(
314  seqs_begin, seqs_end, target, size, comp, mwma);
315  }
316 }
317 
318 /*!
319  * Parallel multi-way merge routine with sentinels.
320  *
321  * Implemented either using OpenMP or with std::threads, depending on if
322  * compiled with -fopenmp or not. The OpenMP version uses the implicit thread
323  * pool, which is faster when using this method often.
324  *
325  * \param seqs_begin Begin iterator of iterator pair input sequence.
326  * \param seqs_end End iterator of iterator pair input sequence.
327  * \param target Begin iterator out output sequence.
328  * \param size Maximum size to merge.
329  * \param comp Comparator.
330  * \param mwma MultiwayMergeAlgorithm set to use.
331  * \param mwmsa MultiwayMergeSplittingAlgorithm to use.
332  * \param num_threads Number of threads to use (defaults to all cores)
333  * \tparam Stable Stable merging incurs a performance penalty.
334  * \return End iterator of output sequence.
335  */
336 template <
337  typename RandomAccessIteratorIterator,
338  typename RandomAccessIterator3,
339  typename Comparator = std::less<
340  typename std::iterator_traits<
341  typename std::iterator_traits<RandomAccessIteratorIterator>
342  ::value_type::first_type>::value_type> >
343 RandomAccessIterator3 parallel_multiway_merge_sentinels(
344  RandomAccessIteratorIterator seqs_begin,
345  RandomAccessIteratorIterator seqs_end,
346  RandomAccessIterator3 target,
347  const typename std::iterator_traits<
348  typename std::iterator_traits<
349  RandomAccessIteratorIterator>::value_type::first_type>::
350  difference_type size,
351  Comparator comp = Comparator(),
354  size_t num_threads = std::thread::hardware_concurrency()) {
355 
356  if (seqs_begin == seqs_end)
357  return target;
358 
359  if (!parallel_multiway_merge_force_sequential &&
360  (parallel_multiway_merge_force_parallel ||
361  (num_threads > 1 &&
362  (static_cast<size_t>(seqs_end - seqs_begin)
363  >= parallel_multiway_merge_minimal_k) &&
364  static_cast<size_t>(size) >= parallel_multiway_merge_minimal_n))) {
365  return parallel_multiway_merge_base</* Stable */ false>(
366  seqs_begin, seqs_end, target, size, comp,
367  mwma, mwmsa, num_threads);
368  }
369  else {
370  return multiway_merge_base</* Stable */ false, /* Sentinels */ true>(
371  seqs_begin, seqs_end, target, size, comp, mwma);
372  }
373 }
374 
375 /*!
376  * Stable parallel multi-way merge routine with sentinels.
377  *
378  * Implemented either using OpenMP or with std::threads, depending on if
379  * compiled with -fopenmp or not. The OpenMP version uses the implicit thread
380  * pool, which is faster when using this method often.
381  *
382  * \param seqs_begin Begin iterator of iterator pair input sequence.
383  * \param seqs_end End iterator of iterator pair input sequence.
384  * \param target Begin iterator out output sequence.
385  * \param size Maximum size to merge.
386  * \param comp Comparator.
387  * \param mwma MultiwayMergeAlgorithm set to use.
388  * \param mwmsa MultiwayMergeSplittingAlgorithm to use.
389  * \param num_threads Number of threads to use (defaults to all cores)
390  * \tparam Stable Stable merging incurs a performance penalty.
391  * \return End iterator of output sequence.
392  */
393 template <
394  typename RandomAccessIteratorIterator,
395  typename RandomAccessIterator3,
396  typename Comparator = std::less<
397  typename std::iterator_traits<
398  typename std::iterator_traits<RandomAccessIteratorIterator>
399  ::value_type::first_type>::value_type> >
401  RandomAccessIteratorIterator seqs_begin,
402  RandomAccessIteratorIterator seqs_end,
403  RandomAccessIterator3 target,
404  const typename std::iterator_traits<
405  typename std::iterator_traits<
406  RandomAccessIteratorIterator>::value_type::first_type>::
407  difference_type size,
408  Comparator comp = Comparator(),
411  size_t num_threads = std::thread::hardware_concurrency()) {
412 
413  if (seqs_begin == seqs_end)
414  return target;
415 
416  if (!parallel_multiway_merge_force_sequential &&
417  (parallel_multiway_merge_force_parallel ||
418  (num_threads > 1 &&
419  (static_cast<size_t>(seqs_end - seqs_begin)
420  >= parallel_multiway_merge_minimal_k) &&
421  static_cast<size_t>(size) >= parallel_multiway_merge_minimal_n))) {
422  return parallel_multiway_merge_base</* Stable */ true>(
423  seqs_begin, seqs_end, target, size, comp,
424  mwma, mwmsa, num_threads);
425  }
426  else {
427  return multiway_merge_base</* Stable */ true, /* Sentinels */ true>(
428  seqs_begin, seqs_end, target, size, comp, mwma);
429  }
430 }
431 
432 //! \}
433 
434 } // namespace tlx
435 
436 #endif // !TLX_ALGORITHM_PARALLEL_MULTIWAY_MERGE_HEADER
437 
438 /******************************************************************************/
iterator end() noexcept
return mutable iterator beyond last element
Simpler non-growing vector without initialization.
MultiwayMergeAlgorithm
Different merging algorithms: bubblesort-alike, loser-tree variants, enum sentinel.
RandomAccessIterator3 parallel_multiway_merge(RandomAccessIteratorIterator seqs_begin, RandomAccessIteratorIterator seqs_end, RandomAccessIterator3 target, const typename std::iterator_traits< typename std::iterator_traits< RandomAccessIteratorIterator >::value_type::first_type >::difference_type size, Comparator comp=Comparator(), MultiwayMergeAlgorithm mwma=MWMA_ALGORITHM_DEFAULT, MultiwayMergeSplittingAlgorithm mwmsa=MWMSA_DEFAULT, size_t num_threads=std::thread::hardware_concurrency())
Parallel multi-way merge routine.
iterator data() noexcept
return iterator to beginning of vector
iterator begin() noexcept
return mutable iterator to first element
MultiwayMergeSplittingAlgorithm
Different splitting strategies for sorting/merging: by sampling, exact.
bool parallel_multiway_merge_force_sequential
setting to force all parallel_multiway_merge() calls to run sequentially
bool parallel_multiway_merge_force_parallel
setting to force parallel_multiway_merge() calls to run with parallel code
RandomAccessIterator3 multiway_merge_base(RandomAccessIteratorIterator seqs_begin, RandomAccessIteratorIterator seqs_end, RandomAccessIterator3 target, typename std::iterator_traits< typename std::iterator_traits< RandomAccessIteratorIterator >::value_type::first_type >::difference_type size, Comparator comp=Comparator(), MultiwayMergeAlgorithm mwma=MWMA_ALGORITHM_DEFAULT)
Sequential multi-way merging switch.
static uint_pair min()
return a uint_pair instance containing the smallest value possible
Definition: uint_types.hpp:217
RandomAccessIterator3 parallel_multiway_merge_sentinels(RandomAccessIteratorIterator seqs_begin, RandomAccessIteratorIterator seqs_end, RandomAccessIterator3 target, const typename std::iterator_traits< typename std::iterator_traits< RandomAccessIteratorIterator >::value_type::first_type >::difference_type size, Comparator comp=Comparator(), MultiwayMergeAlgorithm mwma=MWMA_ALGORITHM_DEFAULT, MultiwayMergeSplittingAlgorithm mwmsa=MWMSA_DEFAULT, size_t num_threads=std::thread::hardware_concurrency())
Parallel multi-way merge routine with sentinels.
size_t parallel_multiway_merge_minimal_n
minimal number of items for switching to parallel merging
std::string join(char glue, const std::vector< std::string > &parts)
Join a vector of strings by some glue character between each pair from the sequence.
Definition: join.cpp:16
size_t parallel_multiway_merge_oversampling
default oversampling factor for parallel_multiway_merge
RandomAccessIterator3 stable_parallel_multiway_merge(RandomAccessIteratorIterator seqs_begin, RandomAccessIteratorIterator seqs_end, RandomAccessIterator3 target, const typename std::iterator_traits< typename std::iterator_traits< RandomAccessIteratorIterator >::value_type::first_type >::difference_type size, Comparator comp=Comparator(), MultiwayMergeAlgorithm mwma=MWMA_ALGORITHM_DEFAULT, MultiwayMergeSplittingAlgorithm mwmsa=MWMSA_DEFAULT, size_t num_threads=std::thread::hardware_concurrency())
Stable parallel multi-way merge routine.
RandomAccessIterator3 parallel_multiway_merge_base(RandomAccessIteratorIterator seqs_begin, RandomAccessIteratorIterator seqs_end, RandomAccessIterator3 target, const typename std::iterator_traits< typename std::iterator_traits< RandomAccessIteratorIterator >::value_type::first_type >::difference_type size, Comparator comp=Comparator(), MultiwayMergeAlgorithm mwma=MWMA_ALGORITHM_DEFAULT, MultiwayMergeSplittingAlgorithm mwmsa=MWMSA_DEFAULT, size_t num_threads=std::thread::hardware_concurrency())
Parallel multi-way merge routine.
RandomAccessIterator3 stable_parallel_multiway_merge_sentinels(RandomAccessIteratorIterator seqs_begin, RandomAccessIteratorIterator seqs_end, RandomAccessIterator3 target, const typename std::iterator_traits< typename std::iterator_traits< RandomAccessIteratorIterator >::value_type::first_type >::difference_type size, Comparator comp=Comparator(), MultiwayMergeAlgorithm mwma=MWMA_ALGORITHM_DEFAULT, MultiwayMergeSplittingAlgorithm mwmsa=MWMSA_DEFAULT, size_t num_threads=std::thread::hardware_concurrency())
Stable parallel multi-way merge routine with sentinels.
size_t parallel_multiway_merge_minimal_k
minimal number of sequences for switching to parallel merging