#ifndef THRILL_API_SORT_HEADER
#define THRILL_API_SORT_HEADER

#include <type_traits>

//! A DIANode which performs a Sort operation.
template <typename ValueType, typename CompareFunction,
          typename SortAlgorithm, bool Stable = false>
class SortNode final : public DOpNode<ValueType>
{
    static constexpr bool debug = false;
    //! Multiway merge tree creation depends on Stable flag.
    struct MakeDefaultMultiwayMergeTree {
        template <typename ReaderIterator,
                  typename Comparator = std::less<ValueType> >
        auto operator () (ReaderIterator seqs_begin, ReaderIterator seqs_end,
                          const Comparator& comp = Comparator()) {
            return core::make_multiway_merge_tree<ValueType>(
                seqs_begin, seqs_end, comp);
        }
    };

    struct MakeStableMultiwayMergeTree {
        template <typename ReaderIterator,
                  typename Comparator = std::less<ValueType> >
        auto operator () (ReaderIterator seqs_begin, ReaderIterator seqs_end,
                          const Comparator& comp = Comparator()) {
            return core::make_stable_multiway_merge_tree<ValueType>(
                seqs_begin, seqs_end, comp);
        }
    };
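    // The two functors above are selected at compile time via the Stable
    // template parameter; a minimal sketch of that selection, following the
    // MakeMultiwayMergeTree alias documented for this class (layout here is
    // illustrative, not the verbatim declaration):
    //
    //     using MakeMultiwayMergeTree = typename std::conditional<
    //         Stable, MakeStableMultiwayMergeTree,
    //         MakeDefaultMultiwayMergeTree>::type;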
    //! Constructor for a sort node.
    template <typename ParentDIA>
    SortNode(const ParentDIA& parent,
             const CompareFunction& compare_function,
             const SortAlgorithm& sort_algorithm = SortAlgorithm())
        : Super(parent.ctx(), "Sort", { parent.id() }, { parent.node() }),
          compare_function_(compare_function),
          sort_algorithm_(sort_algorithm)
    {
        // hook our PreOp into the parent's function stack
        auto pre_op_fn = [this](const ValueType& input) {
                             PreOp(input);
                         };
        auto lop_chain = parent.stack().push(pre_op_fn).fold();
        parent.node()->AddChild(this, lop_chain);
    }
    void PreOp(const ValueType& input) {
        // ... (body elided)
    }

    //! Receive a whole data::File of ValueType, but only if our stack is empty.
    bool OnPreOpFile(const data::File& file, size_t /* parent_index */) final {
        if (!parent_stack_empty_) {
            LOGC(common::g_debug_push_file)
                << "Sort rejected File from parent "
                << "due to non-empty function stack.";
            return false;
        }
        sLOG << "Pick" << pick_items << "samples by random access"
             << " from File containing " << local_items_ << " items.";
        for (size_t i = 0; i < pick_items; ++i) {
            // ... (draw a random index into the unsorted File)
            sLOG << "got index[" << i << "] = " << index;
            // ...
        }
        // ...
        sLOG << /* ... */ " samples.size()= " << samples_.size();
    void PushData(bool consume) final {
        Timer timer_pushdata;
        timer_pushdata.Start();

        size_t local_size = 0;
        if (files_.size() == 0) {
            // nothing to push
        }
        else if (files_.size() == 1) {
            local_size = files_[0].num_items();
            this->PushFile(files_[0], consume);
        }
        else {
            size_t merge_degree, prefetch;

            // merge batches of files until at most merge_degree Files remain
            while (std::tie(merge_degree, prefetch) =
                       context_.block_pool().MaxMergeDegreePrefetch(files_.size()),
                   files_.size() > merge_degree)
            {
                PartialMultiwayMerge(merge_degree, prefetch);
            }

            sLOG1 << "Start multi-way-merge of" << files_.size() << "files"
                  << "with prefetch" << prefetch;

            // construct the multi-way merger over all remaining Files
            std::vector<data::File::Reader> seq;
            seq.reserve(files_.size());

            for (size_t t = 0; t < files_.size(); ++t) {
                seq.emplace_back(
                    files_[t].GetReader(consume, /* prefetch */ 0));
            }

            auto puller = MakeMultiwayMergeTree()(
                seq.begin(), seq.end(), compare_function_);

            while (puller.HasNext()) {
                this->PushItem(puller.Next());
            }
        }

        timer_pushdata.Stop();

        context_.PrintCollectiveMeanStdev(
            "Sort() timer_pushdata", timer_pushdata.SecondsDouble());
    }
    void FindAndSendSplitters(
        std::vector<SampleIndexPair>& splitters, size_t sample_size,
        data::MixStreamPtr& sample_stream,
        data::MixStream::Writers& sample_writers) {

        // receive samples from all workers
        size_t num_total_workers = context_.num_workers();

        std::vector<SampleIndexPair> samples;
        samples.reserve(sample_size * num_total_workers);

        auto reader = sample_stream->GetMixReader(/* consume */ true);

        while (reader.HasNext()) {
            samples.push_back(reader.template Next<SampleIndexPair>());
        }
        if (samples.size() == 0) return;

        LOG << "FindAndSendSplitters() samples.size()=" << samples.size();

        // sort the samples by (sample value, global index)
        std::sort(samples.begin(), samples.end(),
                  [this](const SampleIndexPair& a, const SampleIndexPair& b) {
                      return LessSampleIndex(a, b);
                  });

        double splitting_size = static_cast<double>(samples.size())
                                / static_cast<double>(num_total_workers);

        // pick equidistant splitters and send them to all other workers
        for (size_t i = 1; i < num_total_workers; ++i) {
            splitters.push_back(
                samples[static_cast<size_t>(i * splitting_size)]);
            for (size_t j = 1; j < num_total_workers; j++) {
                sample_writers[j].Put(splitters.back());
            }
        }

        for (size_t j = 1; j < num_total_workers; ++j)
            sample_writers[j].Close();
    }
        TreeBuilder(ValueType* splitter_tree, const SampleIndexPair* samples,
                    size_t ssplitter)
            : tree_(splitter_tree), samples_(samples), ssplitter_(ssplitter) {
            recurse(samples, samples + ssplitter, 1);
        }

        void recurse(const SampleIndexPair* lo, const SampleIndexPair* hi,
                     unsigned int treeidx) {
            const SampleIndexPair* mid = lo + (hi - lo) / 2;
            assert(mid < samples_ + ssplitter_);
            tree_[treeidx] = mid->first;

            if (2 * treeidx < ssplitter_) {
                const SampleIndexPair* midlo = mid, * midhi = mid + 1;
                recurse(lo, midlo, 2 * treeidx + 0);
                recurse(midhi, hi, 2 * treeidx + 1);
            }
        }
    //! round n down by k where k is a power of two.
    template <typename Integral>
    static inline size_t RoundDown(Integral n, Integral k) {
        return (n & ~(k - 1));
    }
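    // Quick worked example of the bit trick (illustrative values): with the
    // two-at-a-time step size used in TransmitItems() below,
    // RoundDown(13, 2) == (13 & ~1) == 12, so 13 local items give six paired
    // classification steps and one leftover item handled separately.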
    //! Classify all local items with the splitter tree and transmit each item
    //! to its target worker.
    void TransmitItems(
        const ValueType* const tree,
        size_t k, size_t log_k, size_t actual_k,
        const SampleIndexPair* const sorted_splitters,
        size_t prefix_items,
        TranmissionStreamPtr& data_stream) {

        auto unsorted_reader = unsorted_file_.GetConsumeReader();

        auto data_writers = data_stream->GetWriters();

        assert(data_writers.size() == actual_k);
        assert(actual_k <= k);

        // enlarge the writer array to the next power of two so bucket indices
        // can be used directly; the surplus writers stay invalid.
        data_writers.reserve(k);
        while (data_writers.size() < k)
            data_writers.emplace_back(typename TranmissionStreamType::Writer());

        std::swap(data_writers[actual_k - 1], data_writers[k - 1]);

        // classify items two at a time and transmit them immediately
        const size_t stepsize = 2;

        size_t i = prefix_items;
        for ( ; i < prefix_items + RoundDown(local_items_, stepsize); i += stepsize)
        {
            ValueType el0 = unsorted_reader.Next<ValueType>();
            ValueType el1 = unsorted_reader.Next<ValueType>();

            // run both items down the splitter tree
            for (size_t l = 0; l < log_k; l++) {
                // ... one tree level for el0 and el1 ...
            }
            // ... map the reached leaves to bucket indices b0, b1 ...

            assert(data_writers[b0].IsValid());
            assert(data_writers[b1].IsValid());

            data_writers[b0].Put(el0);
            data_writers[b1].Put(el1);
        }

        // possibly one remaining item if local_items_ is odd
        while (i < prefix_items + local_items_) {
            ValueType el0 = unsorted_reader.Next<ValueType>();

            for (size_t l = 0; l < log_k; l++) {
                // ... one tree level for el0 ...
            }
            // ... map the reached leaf to bucket index b0 ...

            assert(data_writers[b0].IsValid());
            data_writers[b0].Put(el0);
            ++i;
        }
    }
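    // The elided tree descent above is the implicit-search-tree classification
    // step known from super scalar sample sort. A minimal sketch of one item's
    // descent, assuming the 1-based heap layout produced by TreeBuilder
    // (children of node j are 2*j and 2*j+1); illustrative, not the verbatim
    // loop body:
    //
    //     size_t j0 = 1;                              // root of splitter tree
    //     for (size_t l = 0; l < log_k; l++)
    //         j0 = 2 * j0 + (compare_function_(el0, tree[j0]) ? 0 : 1);
    //     size_t b0 = j0 - k;                         // leaf -> bucket index
    //
    // Items equal to a splitter are additionally disambiguated by their global
    // index via EqualSampleGreaterIndex(), so ties are split deterministically
    // across workers.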
        // main operation: compute global splitters from the collected samples
        sLOG << "local_items_" << local_items_
             << "prefix_items" << prefix_items
             << "total_items" << total_items
             << "local sample_.size()" << samples_.size();

        if (total_items == 0) {
            logger_
                << "class" << "SortNode"
                << "workers" << num_total_workers
                << "local_out_size" << local_out_size_
                << "sample_size" << samples_.size();
            return;
        }
        // send all local samples to worker 0
        for (const SampleIndexPair& sample : samples_) {
            sample_writers[0].Put(sample);
        }
        sample_writers[0].Close();

        // round the number of workers up to the next power of two
        size_t ceil_log = tlx::integer_log2_ceil(num_total_workers);
        size_t workers_algo = size_t(1) << ceil_log;
        size_t splitter_count_algo = workers_algo - 1;

        std::vector<SampleIndexPair> splitters;
        splitters.reserve(workers_algo);

        if (context_.my_rank() == 0) {
            FindAndSendSplitters(splitters, sample_size,
                                 sample_stream, sample_writers);
        }
        else {
            // close unused writers and receive the splitters from worker 0
            for (size_t j = 1; j < num_total_workers; j++) {
                sample_writers[j].Close();
            }
            auto reader = sample_stream->GetMixReader(/* consume */ true);
            while (reader.HasNext()) {
                splitters.push_back(reader.template Next<SampleIndexPair>());
            }
        }
        sample_writers.clear();
        sample_stream.reset();

        // build the splitter search tree
        std::vector<ValueType> splitter_tree(workers_algo + 1);

        // duplicate the last splitter as sentinel for the padded worker slots
        for (size_t i = num_total_workers; i < workers_algo; i++) {
            splitters.push_back(splitters.back());
        }

        TreeBuilder(splitter_tree.data(),
                    splitters.data(),
                    splitter_count_algo);
        auto data_stream = context_.template GetNewStream<TranmissionStreamType>(
            this->dia_id());

        std::thread thread;
        if (use_background_thread_) {
            // transmit the items in a background thread while receiving
            thread = common::CreateThread(
                [this, &data_stream]() {
                    TransmitItems(
                        splitter_tree.data(),
                        // ... remaining arguments elided ...
                        data_stream);
                });
        }
        // ... (else: transmit synchronously; then receive items) ...
        if (use_background_thread_)
            thread.join();

        double balance = 0;
        if (local_out_size_ > 0) {
            balance = static_cast<double>(local_out_size_)
                      * static_cast<double>(num_total_workers)
                      / static_cast<double>(total_items);
        }
        if (balance > 1)
            balance = 1 / balance;
        logger_
            << "class" << "SortNode"
            << "workers" << num_total_workers
            << "local_out_size" << local_out_size_
            << "balance" << balance
            << "sample_size" << samples_.size();
    }

    //! Receive the redistributed items and sort them bulk-wise into Files.
    void ReceiveItems(TranmissionStreamPtr& data_stream) {

        auto reader = data_stream->GetReader(/* consume */ true);

        LOG0 << "Writing files";

        // capacity of the sort buffer (derivation from the memory limit elided)
        size_t capacity_half = capacity / 2;
        std::vector<ValueType> vec;
        vec.reserve(capacity);

        while (reader.HasNext()) {
            if (vec.size() < capacity_half ||
                (vec.size() < capacity && !memory_exceeded)) {
                vec.push_back(reader.template Next<ValueType>());
            }
            else {
                SortAndWriteToFile(vec);
            }
        }

        if (vec.size())
            SortAndWriteToFile(vec);

        context_.PrintCollectiveMeanStdev(
            "Sort() timer_sort_", timer_sort_.SecondsDouble());
    }
    void SortAndWriteToFile(std::vector<ValueType>& vec) {

        LOG << "SortAndWriteToFile() " << vec.size()
            << " items into file #" << files_.size();

        size_t vec_size = vec.size();
        local_out_size_ += vec.size();

        // sort the bulk in memory
        timer_sort_.Start();
        sort_algorithm_(vec.begin(), vec.end(), compare_function_);
        timer_sort_.Stop();

        // write the sorted bulk into a new File
        Timer write_time;
        write_time.Start();

        files_.emplace_back(context_.GetFile(this->dia_id()));
        auto writer = files_.back().GetWriter();
        for (const ValueType& elem : vec) {
            writer.Put(elem);
        }
        writer.Close();

        write_time.Stop();

        LOG0 << "SortAndWriteToFile() finished writing files";

        vec.clear();
        LOG0 << "SortAndWriteToFile() vector cleared";

        logger_
            << "class" << "SortNode"
            << "event" << "write_file"
            << "file_num" << (files_.size() - 1)
            << "items" << vec_size
            << "timer_sort_" << timer_sort_
            << "write_time" << write_time;
    }
    void PartialMultiwayMerge(size_t merge_degree, size_t prefetch) {
        sLOG1 << "Partial multi-way-merge of" << files_.size()
              << "files with degree" << merge_degree
              << "and prefetch" << prefetch;

        std::vector<data::File> new_files;

        // merge batches of merge_degree Files into new Files
        size_t fi;
        for (fi = 0; fi + merge_degree < files_.size(); fi += merge_degree) {
            std::vector<data::File::ConsumeReader> seq;
            seq.reserve(merge_degree);

            for (size_t t = 0; t < merge_degree; ++t) {
                seq.emplace_back(
                    files_[fi + t].GetConsumeReader(/* prefetch */ 0));
            }

            auto puller = MakeMultiwayMergeTree()(
                seq.begin(), seq.end(), compare_function_);

            new_files.emplace_back(context_.GetFile(this->dia_id()));
            auto writer = new_files.back().GetWriter();

            while (puller.HasNext()) {
                writer.Put(puller.Next());
            }
            writer.Close();
        }

        // move the remaining (fewer than merge_degree) Files over unmerged
        for ( ; fi < files_.size(); ++fi) {
            new_files.emplace_back(std::move(files_[fi]));
        }

        std::swap(files_, new_files);
    }
    //! operator () of the default SortAlgorithm functor: delegates to std::sort.
    template <typename Iterator, typename CompareFunction>
    void operator () (Iterator begin, Iterator end, CompareFunction cmp) const {
        return std::sort(begin, end, cmp);
    }
template <typename ValueType, typename Stack>
template <typename CompareFunction>
auto DIA<ValueType, Stack>::Sort(const CompareFunction& compare_function) const {
    // SortNode here is an alias (elided) for api::SortNode instantiated with
    // the default sort algorithm.

    // static_asserts (elided) verify that CompareFunction is callable as
    // bool (ValueType, ValueType):
    //   "CompareFunction has the wrong input type"
    //   "CompareFunction has the wrong input type"
    //   "CompareFunction has the wrong output type (should be bool)"

    auto node = tlx::make_counting<SortNode>(*this, compare_function);

    return DIA<ValueType>(node);
}
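// Usage sketch (illustrative, not part of this header): sorting a DIA of
// size_t with the default std::less comparator; `ints` is assumed to be an
// already constructed DIA<size_t> inside a Thrill program.
//
//     auto sorted = ints.Sort();                      // std::less<size_t>
//     std::vector<size_t> result = sorted.AllGather(); // globally sorted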
template <typename ValueType, typename Stack>
template <typename CompareFunction, typename SortAlgorithm>
auto DIA<ValueType, Stack>::Sort(const CompareFunction& compare_function,
                                 const SortAlgorithm& sort_algorithm) const {
    using SortNode = api::SortNode<
        ValueType, CompareFunction, SortAlgorithm>;

    // static_asserts (elided) verify that CompareFunction is callable as
    // bool (ValueType, ValueType):
    //   "CompareFunction has the wrong input type"
    //   "CompareFunction has the wrong input type"
    //   "CompareFunction has the wrong output type (should be bool)"

    auto node = tlx::make_counting<SortNode>(
        *this, compare_function, sort_algorithm);

    return DIA<ValueType>(node);
}
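// Usage sketch (illustrative): plugging a custom SortAlgorithm functor into
// the overload above. The functor only has to be callable as
// (Iterator begin, Iterator end, CompareFunction cmp); HeapSortAlgorithm and
// `dia` below are made-up names for the example, and <algorithm> is assumed
// to be included.
//
//     struct HeapSortAlgorithm {
//         template <typename Iterator, typename Compare>
//         void operator () (Iterator begin, Iterator end, Compare cmp) const {
//             std::make_heap(begin, end, cmp);
//             std::sort_heap(begin, end, cmp);
//         }
//     };
//
//     auto sorted = dia.Sort(std::less<size_t>(), HeapSortAlgorithm());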
    //! operator () of the default stable SortAlgorithm functor: delegates to
    //! std::stable_sort.
    template <typename Iterator, typename CompareFunction>
    void operator () (Iterator begin, Iterator end, CompareFunction cmp) const {
        return std::stable_sort(begin, end, cmp);
    }
template <typename ValueType, typename Stack>
template <typename CompareFunction>
auto DIA<ValueType, Stack>::SortStable(
    const CompareFunction& compare_function) const {
    // SortStableNode here is an alias (elided) for api::SortNode instantiated
    // with the default stable sort algorithm and Stable = true.

    // static_asserts (elided) verify that CompareFunction is callable as
    // bool (ValueType, ValueType):
    //   "CompareFunction has the wrong input type"
    //   "CompareFunction has the wrong input type"
    //   "CompareFunction has the wrong output type (should be bool)"

    auto node = tlx::make_counting<SortStableNode>(*this, compare_function);

    return DIA<ValueType>(node);
}
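// Usage sketch (illustrative): SortStable keeps the relative input order of
// elements that compare equal, which matters when sorting by one field of a
// struct. Entry, by_key and `dia` are made-up names for the example; `dia`
// is assumed to be an existing DIA<Entry>.
//
//     struct Entry { size_t key; size_t payload; };
//     auto by_key = [](const Entry& a, const Entry& b) {
//                       return a.key < b.key;
//                   };
//     auto sorted = dia.SortStable(by_key);   // equal keys keep input order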
template <typename ValueType, typename Stack>
template <typename CompareFunction, typename SortAlgorithm>
auto DIA<ValueType, Stack>::SortStable(
    const CompareFunction& compare_function,
    const SortAlgorithm& sort_algorithm) const {

    using SortStableNode = api::SortNode<
        ValueType, CompareFunction, SortAlgorithm, /* Stable */ true>;

    // static_asserts (elided) verify that CompareFunction is callable as
    // bool (ValueType, ValueType):
    //   "CompareFunction has the wrong input type"
    //   "CompareFunction has the wrong input type"
    //   "CompareFunction has the wrong output type (should be bool)"

    auto node = tlx::make_counting<SortStableNode>(
        *this, compare_function, sort_algorithm);

    return DIA<ValueType>(node);
}
#endif // !THRILL_API_SORT_HEADER