Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
inner_join.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/api/inner_join.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2016 Alexander Noe <[email protected]>
7  *
8  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
9  ******************************************************************************/
10 
11 #pragma once
12 #ifndef THRILL_API_INNER_JOIN_HEADER
13 #define THRILL_API_INNER_JOIN_HEADER
14 
15 #include <thrill/api/dia.hpp>
16 #include <thrill/api/dop_node.hpp>
19 #include <thrill/common/logger.hpp>
23 #include <thrill/data/file.hpp>
24 
25 #include <algorithm>
26 #include <deque>
27 #include <functional>
28 #include <utility>
29 #include <vector>
30 
31 namespace thrill {
32 namespace api {
33 
34 /*!
35  * Performs an inner join between two DIAs. The key from each DIA element is
36  * hereby extracted with a key extractor function. All pairs of elements with
37  * equal keys from both DIAs are then joined with the join function.
38  *
39  * \tparam KeyExtractor1 Type of the key_extractor1 function. This is a
40  * function from FirstDIA::ValueType to the key type.
41  *
42  * \tparam KeyExtractor2 Type of the key_extractor2 function. This is a
43  * function from SecondDIA::ValueType to the key type.
44  *
45  * \tparam JoinFunction Type of the join_function. This is a function
46  * from FirstDIA::ValueType and SecondDIA::ValueType to the type of the output DIA.
47  *
48  * \tparam SecondDIA Type of the second DIA, which is joined with the first DIA.
49  *
50  * \param key_extractor1 Key extractor for first DIA
51  *
52  * \param key_extractor2 Key extractor for second DIA
53  *
54  * \param join_function Join function applied to all equal key pairs
55  *
56  * \ingroup dia_dops
57  */
58 template <typename ValueType, typename FirstDIA, typename SecondDIA,
59  typename KeyExtractor1, typename KeyExtractor2,
60  typename JoinFunction, typename HashFunction,
61  bool UseLocationDetection>
62 class JoinNode final : public DOpNode<ValueType>
63 {
64 private:
65  static constexpr bool debug = false;
66 
68  using Super::context_;
69 
70  using InputTypeFirst = typename FirstDIA::ValueType;
71  using InputTypeSecond = typename SecondDIA::ValueType;
72 
73  //! Key type of join. must be equal to the other key extractor
74  using Key = typename common::FunctionTraits<KeyExtractor1>::result_type;
75 
76  //! hash counter used by LocationDetection
77  class HashCount
78  {
79  public:
80  using HashType = size_t;
81  using CounterType = uint8_t;
82  using DIAIdxType = uint8_t;
83 
84  size_t hash;
87 
88  static constexpr size_t counter_bits_ = 8 * sizeof(CounterType);
89 
90  HashCount operator + (const HashCount& b) const {
91  assert(hash == b.hash);
92  return HashCount {
93  hash,
95  static_cast<DIAIdxType>(dia_mask | b.dia_mask)
96  };
97  }
98 
100  assert(hash == b.hash);
102  dia_mask |= b.dia_mask;
103  return *this;
104  }
105 
106  bool operator < (const HashCount& b) const { return hash < b.hash; }
107 
108  //! method to check if this hash count should be broadcasted to all
109  //! workers interested -- for InnerJoin this check if the dia_mask == 3
110  //! -> hash was in both DIAs on some worker.
111  bool NeedBroadcast() const {
112  return dia_mask == 3;
113  }
114 
115  //! Read count and dia_mask from BitReader
116  template <typename BitReader>
117  void ReadBits(BitReader& reader) {
118  count = reader.GetBits(counter_bits_);
119  dia_mask = reader.GetBits(2);
120  }
121 
122  //! Write count and dia_mask to BitWriter
123  template <typename BitWriter>
124  void WriteBits(BitWriter& writer) const {
125  writer.PutBits(count, counter_bits_);
126  writer.PutBits(dia_mask, 2);
127  }
128  };
129 
130 public:
131  /*!
132  * Constructor for a JoinNode.
133  */
134  JoinNode(const FirstDIA& parent1, const SecondDIA& parent2,
135  const KeyExtractor1& key_extractor1,
136  const KeyExtractor2& key_extractor2,
137  const JoinFunction& join_function,
138  const HashFunction& hash_function)
139  : Super(parent1.ctx(), "Join",
140  { parent1.id(), parent2.id() },
141  { parent1.node(), parent2.node() }),
142  key_extractor1_(key_extractor1),
143  key_extractor2_(key_extractor2),
144  join_function_(join_function),
145  hash_function_(hash_function)
146  {
147  auto pre_op_fn1 = [this](const InputTypeFirst& input) {
148  PreOp1(input);
149  };
150 
151  auto pre_op_fn2 = [this](const InputTypeSecond& input) {
152  PreOp2(input);
153  };
154 
155  auto lop_chain1 = parent1.stack().push(pre_op_fn1).fold();
156  auto lop_chain2 = parent2.stack().push(pre_op_fn2).fold();
157  parent1.node()->AddChild(this, lop_chain1, 0);
158  parent2.node()->AddChild(this, lop_chain2, 1);
159  }
160 
161  void Execute() final {
162 
163  if (UseLocationDetection) {
164  std::unordered_map<size_t, size_t> target_processors;
165  size_t max_hash = location_detection_.Flush(target_processors);
166  location_detection_.Dispose();
167 
168  auto file1reader = pre_file1_.GetConsumeReader();
169  while (file1reader.HasNext()) {
170  InputTypeFirst in1 = file1reader.template Next<InputTypeFirst>();
171  auto target_processor =
172  target_processors.find(
173  hash_function_(key_extractor1_(in1)) % max_hash);
174  if (target_processor != target_processors.end()) {
175  hash_writers1_[target_processor->second].Put(in1);
176  }
177  }
178 
179  auto file2reader = pre_file2_.GetConsumeReader();
180  while (file2reader.HasNext()) {
181  InputTypeSecond in2 = file2reader.template Next<InputTypeSecond>();
182  auto target_processor =
183  target_processors.find(
184  hash_function_(key_extractor2_(in2)) % max_hash);
185  if (target_processor != target_processors.end()) {
186  hash_writers2_[target_processor->second].Put(in2);
187  }
188  }
189  }
190 
191  hash_writers1_.clear();
192  hash_writers2_.clear();
193 
194  MainOp();
195  }
196 
197  template <typename ElementType, typename CompareFunction>
198  auto MakePuller(std::deque<data::File>& files,
199  std::vector<data::File::Reader>& seq,
200  CompareFunction compare_function, bool consume) {
201 
202  size_t merge_degree, prefetch;
203  std::tie(merge_degree, prefetch) = MaxMergeDegreePrefetch(files);
204  // construct output merger of remaining Files
205  seq.reserve(files.size());
206  for (size_t t = 0; t < files.size(); ++t)
207  seq.emplace_back(files[t].GetReader(consume, 0));
208  StartPrefetch(seq, prefetch);
209 
210  return core::make_buffered_multiway_merge_tree<ElementType>(
211  seq.begin(), seq.end(), compare_function);
212  }
213 
214  void PushData(bool consume) final {
215 
216  auto compare_function_1 =
217  [this](const InputTypeFirst& in1, const InputTypeFirst& in2) {
218  return key_extractor1_(in1) < key_extractor1_(in2);
219  };
220 
221  auto compare_function_2 =
222  [this](const InputTypeSecond& in1, const InputTypeSecond& in2) {
223  return key_extractor2_(in1) < key_extractor2_(in2);
224  };
225 
226  // no possible join results when at least one data set is empty
227  if (!files1_.size() || !files2_.size())
228  return;
229 
230  //! Merge files when there are too many for the merge tree.
231  MergeFiles<InputTypeFirst>(files1_, compare_function_1);
232  MergeFiles<InputTypeSecond>(files2_, compare_function_2);
233 
234  std::vector<data::File::Reader> seq1;
235  std::vector<data::File::Reader> seq2;
236 
237  // construct output merger of remaining Files
238  auto puller1 = MakePuller<InputTypeFirst>(
239  files1_, seq1, compare_function_1, consume);
240  auto puller2 = MakePuller<InputTypeSecond>(
241  files2_, seq2, compare_function_2, consume);
242 
243  bool puller1_done = false;
244  if (!puller1.HasNext())
245  puller1_done = true;
246 
247  bool puller2_done = false;
248  if (!puller2.HasNext())
249  puller2_done = true;
250 
251  //! cache for elements with equal keys, cartesian product of both caches
252  //! are joined with the join_function
253  std::vector<InputTypeFirst> equal_keys1;
254  std::vector<InputTypeSecond> equal_keys2;
255 
256  while (!puller1_done && !puller2_done) {
257  //! find elements with equal key
258  if (key_extractor1_(puller1.Top()) <
259  key_extractor2_(puller2.Top())) {
260  if (!puller1.Update()) {
261  puller1_done = true;
262  break;
263  }
264  }
265  else if (key_extractor2_(puller2.Top()) <
266  key_extractor1_(puller1.Top())) {
267  if (!puller2.Update()) {
268  puller2_done = true;
269  break;
270  }
271  }
272  else {
273  bool external1 = false;
274  bool external2 = false;
275  equal_keys1.clear();
276  equal_keys2.clear();
277  std::tie(puller1_done, external1) =
278  AddEqualKeysToVec(equal_keys1, puller1,
280 
281  std::tie(puller2_done, external2) =
282  AddEqualKeysToVec(equal_keys2, puller2,
284 
285  JoinAllElements(equal_keys1, external1, equal_keys2, external2);
286  }
287  }
288  }
289 
290  void Dispose() final {
291  files1_.clear();
292  files2_.clear();
293  }
294 
295 private:
296  //! files for sorted datasets
297  std::deque<data::File> files1_;
298  std::deque<data::File> files2_;
299 
300  //! user-defined functions
301  KeyExtractor1 key_extractor1_;
302  KeyExtractor2 key_extractor2_;
303  JoinFunction join_function_;
304  HashFunction hash_function_;
305 
306  //! data streams for inter-worker communication of DIA elements
308  std::vector<data::Stream::Writer> hash_writers1_ { hash_stream1_->GetWriters() };
310  std::vector<data::Stream::Writer> hash_writers2_ { hash_stream2_->GetWriters() };
311 
312  //! location detection and associated files
317 
320 
321  void PreOp1(const InputTypeFirst& input) {
322  size_t hash = hash_function_(key_extractor1_(input));
323  if (UseLocationDetection) {
324  pre_writer1_.Put(input);
325  location_detection_.Insert(HashCount { hash, 1, /* dia_mask */ 1 });
326  }
327  else {
328  hash_writers1_[hash % context_.num_workers()].Put(input);
329  }
330  }
331 
332  void PreOp2(const InputTypeSecond& input) {
333  size_t hash = hash_function_(key_extractor2_(input));
334  if (UseLocationDetection) {
335  pre_writer2_.Put(input);
336  location_detection_.Insert(HashCount { hash, 1, /* dia_mask */ 2 });
337  }
338  else {
339  hash_writers2_[hash % context_.num_workers()].Put(input);
340  }
341  }
342 
343  //! Receive elements from other workers, create pre-sorted files
344  void MainOp() {
345  data::MixStream::MixReader reader1_ =
346  hash_stream1_->GetMixReader(/* consume */ true);
347 
348  size_t capacity = DIABase::mem_limit_ / sizeof(InputTypeFirst) / 2;
349 
350  ReceiveItems<InputTypeFirst>(capacity, reader1_, files1_, key_extractor1_);
351 
352  data::MixStream::MixReader reader2_ =
353  hash_stream2_->GetMixReader(/* consume */ true);
354 
355  capacity = DIABase::mem_limit_ / sizeof(InputTypeSecond) / 2;
356 
357  ReceiveItems<InputTypeSecond>(capacity, reader2_, files2_, key_extractor2_);
358  }
359 
360  template <typename ItemType>
361  size_t JoinCapacity() {
362  return DIABase::mem_limit_ / sizeof(ItemType) / 4;
363  }
364 
365  /*!
366  * Adds all elements from merge tree to a vector, afterwards sets the first_element
367  * pointer to the first element with a different key.
368  *
369  * \param vec target vector
370  *
371  * \param puller Input merge tree
372  *
373  * \param key_extractor Key extractor function
374  *
375  * \param file_ptr Pointer to a data::File
376  *
377  * \return Pair of bools, first bool indicates whether the merge tree is
378  * emptied, second bool indicates whether external memory was needed.
379  */
380  template <typename ItemType, typename KeyExtractor, typename MergeTree>
381  std::pair<bool, bool> AddEqualKeysToVec(
382  std::vector<ItemType>& vec, MergeTree& puller,
383  const KeyExtractor& key_extractor, data::FilePtr& file_ptr) {
384 
385  vec.push_back(puller.Top());
386  Key key = key_extractor(puller.Top());
387 
388  size_t capacity = JoinCapacity<ItemType>();
389 
390  if (!puller.Update())
391  return std::make_pair(true, false);
392 
393  while (key_extractor(puller.Top()) == key) {
394 
395  if (!mem::memory_exceeded && vec.size() < capacity) {
396  vec.push_back(puller.Top());
397  }
398  else {
399  file_ptr = context_.GetFilePtr(this);
400  data::File::Writer writer = file_ptr->GetWriter();
401  for (const ItemType& item : vec) {
402  writer.Put(item);
403  }
404  writer.Put(puller.Top());
405  //! vec is very large when this happens
406  //! swap with empty vector to free the memory
407  std::vector<ItemType>().swap(vec);
408 
409  return AddEqualKeysToFile(puller, key_extractor, writer, key);
410  }
411 
412  if (!puller.Update())
413  return std::make_pair(true, false);
414  }
415 
416  return std::make_pair(false, false);
417  }
418 
419  /*!
420  * Adds all elements from merge tree to a data::File, potentially to external memory,
421  * afterwards sets the first_element pointer to the first element with a different key.
422  *
423  * \param puller Input merge tree
424  *
425  * \param key_extractor Key extractor function
426  *
427  * \param writer File writer
428  *
429  * \param key target key
430  *
431  * \return Pair of bools, first bool indicates whether the merge tree is
432  * emptied, second bool indicates whether external memory was needed (always true, when
433  * this method was called).
434  */
435  template <typename KeyExtractor, typename MergeTree>
436  std::pair<bool, bool> AddEqualKeysToFile(
437  MergeTree& puller, const KeyExtractor& key_extractor,
438  data::File::Writer& writer, const Key& key) {
439  if (!puller.Update()) {
440  return std::make_pair(true, true);
441  }
442 
443  while (key_extractor(puller.Top()) == key) {
444  writer.Put(puller.Top());
445  if (!puller.Update())
446  return std::make_pair(true, true);
447  }
448 
449  return std::make_pair(false, true);
450  }
451 
453  return DIAMemUse::Max();
454  }
455 
456  void StartPreOp(size_t id) final {
457  LOG << *this << " running StartPreOp parent_idx=" << id;
458  if (!location_detection_initialized_ && UseLocationDetection) {
461  }
462 
463  auto ids = this->parent_ids();
464 
465  if (id == 0) {
467  }
468  if (id == 1) {
470  }
471  }
472 
473  void StopPreOp(size_t id) final {
474  LOG << *this << " running StopPreOp parent_idx=" << id;
475 
476  if (id == 0) {
478  }
479  if (id == 1) {
481  }
482  }
483 
485  return DIAMemUse::Max();
486  }
487 
489  return DIAMemUse::Max();
490  }
491 
492  /*!
493  * Recieve all elements from a stream and write them to files sorted by key.
494  */
495  template <typename ItemType, typename KeyExtractor>
497  size_t capacity, data::MixStream::MixReader& reader,
498  std::deque<data::File>& files, const KeyExtractor& key_extractor) {
499 
500  std::vector<ItemType> vec;
501  vec.reserve(capacity);
502 
503  while (reader.HasNext()) {
504  if (vec.size() < capacity) {
505  vec.push_back(reader.template Next<ItemType>());
506  }
507  else {
508  SortAndWriteToFile(vec, files, key_extractor);
509  }
510  }
511 
512  if (vec.size())
513  SortAndWriteToFile(vec, files, key_extractor);
514  }
515 
516  //! calculate maximum merging degree from available memory and the number of
517  //! files. additionally calculate the prefetch size of each File.
518  std::pair<size_t, size_t> MaxMergeDegreePrefetch(std::deque<data::File>& files) {
519  // 1/4 of avail_blocks in api::Sort, as Join has two mergers and two vectors of
520  // data to join
521  size_t avail_blocks = DIABase::mem_limit_ / data::default_block_size / 4;
522  if (files.size() >= avail_blocks) {
523  // more files than blocks available -> partial merge of avail_blocks
524  // Files with prefetch = 0, which is one read Block per File.
525  return std::make_pair(avail_blocks, 0u);
526  }
527  else {
528  // less files than available Blocks -> split blocks equally among
529  // Files.
530  return std::make_pair(
531  files.size(),
532  std::min<size_t>(16u, (avail_blocks / files.size()) - 1));
533  }
534  }
535 
536  /*!
537  * Merge files when there are too many for the merge tree to handle
538  */
539  template <typename ItemType, typename CompareFunction>
540  void MergeFiles(std::deque<data::File>& files,
541  CompareFunction compare_function) {
542 
543  size_t merge_degree, prefetch;
544 
545  // merge batches of files if necessary
546  while (files.size() > MaxMergeDegreePrefetch(files).first)
547  {
548  std::tie(merge_degree, prefetch) = MaxMergeDegreePrefetch(files);
549 
550  sLOG1 << "Partial multi-way-merge of"
551  << merge_degree << "files with prefetch" << prefetch;
552 
553  // create merger for first merge_degree_ Files
554  std::vector<data::File::ConsumeReader> seq;
555  seq.reserve(merge_degree);
556 
557  for (size_t t = 0; t < merge_degree; ++t)
558  seq.emplace_back(files[t].GetConsumeReader(0));
559 
560  StartPrefetch(seq, prefetch);
561 
562  auto puller = core::make_multiway_merge_tree<ItemType>(
563  seq.begin(), seq.end(), compare_function);
564 
565  // create new File for merged items
566  files.emplace_back(context_.GetFile(this));
567  auto writer = files.back().GetWriter();
568 
569  while (puller.HasNext()) {
570  writer.Put(puller.Next());
571  }
572  writer.Close();
573 
574  // this clear is important to release references to the files.
575  seq.clear();
576 
577  // remove merged files
578  files.erase(files.begin(), files.begin() + merge_degree);
579  }
580  }
581 
584 
585  /*!
586  * Joins all elements in cartesian product of both vectors. Uses files when
587  * one of the data sets is too large to fit in memory. (indicated by
588  * 'external' bools)
589  */
591  const std::vector<InputTypeFirst>& vec1, bool external1,
592  const std::vector<InputTypeSecond>& vec2, bool external2) {
593 
594  if (!external1 && !external2) {
595  for (const InputTypeFirst& join1 : vec1) {
596  for (const InputTypeSecond& join2 : vec2) {
597  assert(key_extractor1_(join1) == key_extractor2_(join2));
598  this->PushItem(join_function_(join1, join2));
599  }
600  }
601  }
602  else if (external1 && !external2) {
603  LOG1 << "Thrill: Warning: Too many equal keys for main memory "
604  << "in first DIA";
605 
606  data::File::ConsumeReader reader = join_file1_->GetConsumeReader();
607 
608  while (reader.HasNext()) {
609  InputTypeFirst join1 = reader.template Next<InputTypeFirst>();
610  for (auto const& join2 : vec2) {
611  assert(key_extractor1_(join1) == key_extractor2_(join2));
612  this->PushItem(join_function_(join1, join2));
613  }
614  }
615  }
616  else if (!external1 && external2) {
617  LOG1 << "Thrill: Warning: Too many equal keys for main memory "
618  << "in second DIA";
619 
620  data::File::ConsumeReader reader = join_file2_->GetConsumeReader();
621 
622  while (reader.HasNext()) {
623  InputTypeSecond join2 = reader.template Next<InputTypeSecond>();
624  for (const InputTypeFirst& join1 : vec1) {
625  assert(key_extractor1_(join1) == key_extractor2_(join2));
626  this->PushItem(join_function_(join1, join2));
627  }
628  }
629  }
630  else if (external1 && external2) {
631  LOG1 << "Thrill: Warning: Too many equal keys for main memory "
632  << "in both DIAs. This is very slow.";
633 
634  size_t capacity = JoinCapacity<InputTypeFirst>();
635 
636  std::vector<InputTypeFirst> temp_vec;
637  temp_vec.reserve(capacity);
638 
639  //! file 2 needs to be read multiple times
640  data::File::ConsumeReader reader1 = join_file1_->GetConsumeReader();
641 
642  while (reader1.HasNext()) {
643 
644  for (size_t i = 0; i < capacity && reader1.HasNext() &&
645  !mem::memory_exceeded; ++i) {
646  temp_vec.push_back(reader1.template Next<InputTypeFirst>());
647  }
648 
649  data::File::Reader reader2 = join_file2_->GetReader(/* consume */ false);
650 
651  while (reader2.HasNext()) {
652  InputTypeSecond join2 = reader2.template Next<InputTypeSecond>();
653  for (const InputTypeFirst& join1 : temp_vec) {
654  assert(key_extractor1_(join1) == key_extractor2_(join2));
655  this->PushItem(join_function_(join1, join2));
656  }
657  }
658  temp_vec.clear();
659  }
660 
661  //! non-consuming reader, need to clear now
662  join_file2_->Clear();
663  }
664  }
665 
666  /*!
667  * Sorts all elements in a vector and writes them to a file.
668  */
669  template <typename ItemType, typename KeyExtractor>
671  std::vector<ItemType>& vec, std::deque<data::File>& files,
672  const KeyExtractor& key_extractor) {
673 
674  // advise block pool to write out data if necessary
675  context_.block_pool().AdviseFree(vec.size() * sizeof(ValueType));
676 
677  std::sort(vec.begin(), vec.end(),
678  [&key_extractor](const ItemType& i1, const ItemType& i2) {
679  return key_extractor(i1) < key_extractor(i2);
680  });
681 
682  files.emplace_back(context_.GetFile(this));
683  auto writer = files.back().GetWriter();
684  for (const ItemType& elem : vec) {
685  writer.Put(elem);
686  }
687  writer.Close();
688 
689  vec.clear();
690  }
691 };
692 
693 /*!
694  * Performs an inner join between the DIAs first_dia and second_dia. The key
695  * from each DIA element is hereby extracted with a key extractor function.
696  * All pairs of elements with equal keys from both DIAs are
697  * then joined with the join function.
698  *
699  * \tparam KeyExtractor1 Type of the key_extractor1 function. This is a function
700  * from FirstDIA::ValueType to the key type.
701  *
702  * \tparam KeyExtractor2 Type of the key_extractor2 function. This is a function
703  * from SecondDIA::ValueType to the key type.
704  *
705  * \tparam JoinFunction Type of the join_function. This is a function from
706  * FirstDIA::ValueType and SecondDIA::ValueType to the type of the output DIA.
707  *
708  * \param first_dia First DIA to join.
709  *
710  * \param second_dia Second DIA to join.
711  *
712  * \param key_extractor1 Key extractor for first DIA
713  *
714  * \param key_extractor2 Key extractor for second DIA
715  *
716  * \param join_function Join function applied to all equal key pairs
717  *
718  * \param hash_function Hash function for the key type (defaults to std::hash)
719  *
720  * \ingroup dia_dops
721  */
722 template <
723  bool LocationDetectionValue,
724  typename FirstDIA,
725  typename SecondDIA,
726  typename KeyExtractor1,
727  typename KeyExtractor2,
728  typename JoinFunction,
729  typename HashFunction =
730  std::hash<typename common::FunctionTraits<KeyExtractor1>::result_type> >
733  const FirstDIA& first_dia, const SecondDIA& second_dia,
734  const KeyExtractor1& key_extractor1, const KeyExtractor2& key_extractor2,
735  const JoinFunction& join_function,
736  const HashFunction& hash_function = HashFunction()) {
737 
738  assert(first_dia.IsValid());
739  assert(second_dia.IsValid());
740 
741  static_assert(
742  std::is_convertible<
743  typename FirstDIA::ValueType,
744  typename common::FunctionTraits<KeyExtractor1>::template arg<0>
745  >::value,
746  "Key Extractor 1 has the wrong input type");
747 
748  static_assert(
749  std::is_convertible<
750  typename SecondDIA::ValueType,
751  typename common::FunctionTraits<KeyExtractor2>::template arg<0>
752  >::value,
753  "Key Extractor 2 has the wrong input type");
754 
755  static_assert(
756  std::is_convertible<
757  typename common::FunctionTraits<KeyExtractor1>::result_type,
758  typename common::FunctionTraits<KeyExtractor2>::result_type
759  >::value,
760  "Keys have different types");
761 
762  static_assert(
763  std::is_convertible<
764  typename FirstDIA::ValueType,
765  typename common::FunctionTraits<JoinFunction>::template arg<0>
766  >::value,
767  "Join Function has wrong input type in argument 0");
768 
769  static_assert(
770  std::is_convertible<
771  typename SecondDIA::ValueType,
772  typename common::FunctionTraits<JoinFunction>::template arg<1>
773  >::value,
774  "Join Function has wrong input type in argument 1");
775 
776  using JoinResult
777  = typename common::FunctionTraits<JoinFunction>::result_type;
778 
779  using JoinNode = api::JoinNode<
780  JoinResult, FirstDIA, SecondDIA, KeyExtractor1, KeyExtractor2,
781  JoinFunction, HashFunction, LocationDetectionValue>;
782 
783  auto node = tlx::make_counting<JoinNode>(
784  first_dia, second_dia, key_extractor1, key_extractor2, join_function,
785  hash_function);
786 
787  return DIA<JoinResult>(node);
788 }
789 
790 /*!
791  * Performs an inner join between the DIAs first_dia and second_dia (with
792  * location detection enabled). The key from each DIA element is extracted with
793  * a key extractor function. All pairs of elements with equal keys from both DIAs are
794  * then joined with the join function.
795  *
796  * \tparam KeyExtractor1 Type of the key_extractor1 function. This is a function
797  * from FirstDIA::ValueType to the key type.
798  *
799  * \tparam KeyExtractor2 Type of the key_extractor2 function. This is a function
800  * from SecondDIA::ValueType to the key type.
801  *
802  * \tparam JoinFunction Type of the join_function. This is a function from
803  * FirstDIA::ValueType and SecondDIA::ValueType to the type of the output DIA.
804  *
805  * \param first_dia First DIA to join.
806  *
807  * \param second_dia Second DIA to join.
808  *
809  * \param key_extractor1 Key extractor for first DIA
810  *
811  * \param key_extractor2 Key extractor for second DIA
812  *
813  * \param join_function Join function applied to all equal key pairs
814  *
815  * \param hash_function Hash function for the key type (defaults to std::hash)
816  *
817  * \ingroup dia_dops
818  */
819 template <
820  typename FirstDIA,
821  typename SecondDIA,
822  typename KeyExtractor1,
823  typename KeyExtractor2,
824  typename JoinFunction,
825  typename HashFunction =
826  std::hash<typename common::FunctionTraits<KeyExtractor1>::result_type> >
828  const FirstDIA& first_dia, const SecondDIA& second_dia,
829  const KeyExtractor1& key_extractor1, const KeyExtractor2& key_extractor2,
830  const JoinFunction& join_function,
831  const HashFunction& hash_function = HashFunction()) {
832  // forward to method _with_ location detection ON
833  return InnerJoin(
835  first_dia, second_dia, key_extractor1, key_extractor2,
836  join_function, hash_function);
837 }
838 
839 //! \}
840 
841 } // namespace api
842 
843 //! imported from api namespace
844 using api::InnerJoin;
845 
846 } // namespace thrill
847 
848 #endif // !THRILL_API_INNER_JOIN_HEADER
849 
850 /******************************************************************************/
std::pair< bool, bool > AddEqualKeysToFile(MergeTree &puller, const KeyExtractor &key_extractor, data::File::Writer &writer, const Key &key)
Adds all elements from merge tree to a data::File, potentially to external memory, afterwards sets the first_element pointer to the first element with a different key.
Definition: inner_join.hpp:436
bool HasNext()
HasNext() returns true if at least one more item is available.
DIAMemUse PushDataMemUse() final
Amount of RAM used by PushData()
Definition: inner_join.hpp:488
DIAMemUse ExecuteMemUse() final
Amount of RAM used by Execute()
Definition: inner_join.hpp:484
static DIAMemUse Max()
Definition: dia_base.hpp:60
data::File pre_file1_
location detection and associated files
Definition: inner_join.hpp:313
DIA is the interface between the user and the Thrill framework.
Definition: dia.hpp:133
bool operator<(const HashCount &b) const
Definition: inner_join.hpp:106
Description of the amount of RAM the internal data structures of a DIANode require.
Definition: dia_base.hpp:51
void ReadBits(BitReader &reader)
Read count and dia_mask from BitReader.
Definition: inner_join.hpp:117
JoinNode(const FirstDIA &parent1, const SecondDIA &parent2, const KeyExtractor1 &key_extractor1, const KeyExtractor2 &key_extractor2, const JoinFunction &join_function, const HashFunction &hash_function)
Constructor for a JoinNode.
Definition: inner_join.hpp:134
data::File::Writer pre_writer1_
Definition: inner_join.hpp:314
data::FilePtr join_file2_
Definition: inner_join.hpp:583
#define sLOG1
Definition: logger.hpp:183
std::vector< data::Stream::Writer > hash_writers2_
Definition: inner_join.hpp:310
void AdviseFree(size_t size)
const struct LocationDetectionFlag< true > LocationDetectionTag
global const LocationDetectionFlag instance
Definition: dia.hpp:114
JoinFunction join_function_
Definition: inner_join.hpp:303
data::File::Writer pre_writer2_
Definition: inner_join.hpp:316
#define LOG1
Definition: logger.hpp:171
auto InnerJoin(const LocationDetectionFlag< LocationDetectionValue > &, const FirstDIA &first_dia, const SecondDIA &second_dia, const KeyExtractor1 &key_extractor1, const KeyExtractor2 &key_extractor2, const JoinFunction &join_function, const HashFunction &hash_function=HashFunction())
Performs an inner join between this DIA and the DIA given in the first parameter. ...
Definition: inner_join.hpp:731
size_t default_block_size
default size of blocks in File, Channel, BlockQueue, etc.
Definition: byte_block.cpp:24
A File is an ordered sequence of Block objects for storing items.
Definition: file.hpp:55
static constexpr bool debug
Definition: inner_join.hpp:65
BlockWriter contains a temporary Block object into which a) any serializable item can be stored or b)...
bool memory_exceeded
memory limit exceeded indicator
core::LocationDetection< HashCount > location_detection_
Definition: inner_join.hpp:318
ConsumeReader GetConsumeReader(size_t num_prefetch=File::default_prefetch)
Get consuming BlockReader for beginning of File.
Definition: file.cpp:71
void ReceiveItems(size_t capacity, data::MixStream::MixReader &reader, std::deque< data::File > &files, const KeyExtractor &key_extractor)
Receive all elements from a stream and write them to files sorted by key.
Definition: inner_join.hpp:496
void StartPrefetch(std::vector< Reader > &readers, size_t prefetch)
Take a vector of Readers and prefetch equally from them.
Definition: file.hpp:508
A DIANode is a typed node representing and operation in Thrill.
Definition: dia_node.hpp:37
void PreOp2(const InputTypeSecond &input)
Definition: inner_join.hpp:332
void Dispose() final
Virtual clear method. Triggers actual disposing in sub-classes.
Definition: inner_join.hpp:290
void PushData(bool consume) final
Virtual method for pushing data. Triggers actual pushing in sub-classes.
Definition: inner_join.hpp:214
const size_t & id() const
return unique id() of DIANode subclass as stored by StatsNode
Definition: dia_base.hpp:213
KeyExtractor2 key_extractor2_
Definition: inner_join.hpp:302
typename FirstDIA::ValueType InputTypeFirst
Definition: inner_join.hpp:70
void JoinAllElements(const std::vector< InputTypeFirst > &vec1, bool external1, const std::vector< InputTypeSecond > &vec2, bool external2)
Joins all elements in cartesian product of both vectors.
Definition: inner_join.hpp:590
void WriteBits(BitWriter &writer) const
Write count and dia_mask to BitWriter.
Definition: inner_join.hpp:124
void swap(CountingPtr< A > &a1, CountingPtr< A > &a2) noexcept
void PushItem(const ValueType &item) const
Method for derived classes to Push a single item to all children.
Definition: dia_node.hpp:147
Reader to retrieve items in unordered sequence from a MixBlockQueue.
std::vector< data::Stream::Writer > hash_writers1_
Definition: inner_join.hpp:308
DIAMemUse PreOpMemUse() final
Amount of RAM used by PreOp after StartPreOp()
Definition: inner_join.hpp:452
CompareFunction auto MakePuller(std::deque< data::File > &files, std::vector< data::File::Reader > &seq, CompareFunction compare_function, bool consume)
Definition: inner_join.hpp:198
void MainOp()
Receive elements from other workers, create pre-sorted files.
Definition: inner_join.hpp:344
data::MixStreamPtr hash_stream2_
Definition: inner_join.hpp:309
virtual void Execute()=0
Virtual execution method. Triggers actual computation in sub-classes.
int value
Definition: gen_data.py:41
BlockReader takes Block objects from BlockSource and allows reading of a) serializable Items or b) ar...
void StopPreOp(size_t id) final
Virtual method for preparing end of PushData.
Definition: inner_join.hpp:473
KeyExtractor1 key_extractor1_
user-defined functions
Definition: inner_join.hpp:301
std::vector< size_t > parent_ids() const
Returns the parents of this DIABase.
Definition: dia_base.hpp:258
std::deque< data::File > files2_
Definition: inner_join.hpp:298
data::FilePtr GetFilePtr(size_t dia_id)
Definition: context.cpp:1027
static IntegerType AddTruncToType(const IntegerType &a, const IntegerType &b)
Definition: math.hpp:30
data::File GetFile(size_t dia_id)
Returns a new File object containing a sequence of local Blocks.
Definition: context.hpp:285
data::MixStreamPtr GetNewMixStream(size_t dia_id)
Definition: context.cpp:1044
HashCount operator+(const HashCount &b) const
Definition: inner_join.hpp:90
DIAMemUse mem_limit_
Definition: dia_base.hpp:314
A DOpNode is a typed node representing and distributed operations in Thrill.
Definition: dop_node.hpp:32
bool location_detection_initialized_
Definition: inner_join.hpp:319
typename SecondDIA::ValueType InputTypeSecond
Definition: inner_join.hpp:71
hash counter used by LocationDetection
Definition: inner_join.hpp:77
void StartPreOp(size_t id) final
Virtual method for preparing start of PushData.
Definition: inner_join.hpp:456
HashCount & operator+=(const HashCount &b)
Definition: inner_join.hpp:99
data::MixStreamPtr hash_stream1_
data streams for inter-worker communication of DIA elements
Definition: inner_join.hpp:307
TLX_ATTRIBUTE_ALWAYS_INLINE BlockWriter & Put(const T &x)
Put appends a complete item, or fails with a FullException.
std::pair< bool, bool > AddEqualKeysToVec(std::vector< ItemType > &vec, MergeTree &puller, const KeyExtractor &key_extractor, data::FilePtr &file_ptr)
Adds all elements from merge tree to a vector, afterwards sets the first_element pointer to the first...
Definition: inner_join.hpp:381
typename common::FunctionTraits< KeyExtractor1 >::result_type Key
Key type of join. must be equal to the other key extractor.
Definition: inner_join.hpp:74
data::FilePtr join_file1_
Definition: inner_join.hpp:582
void Close()
Explicitly close the writer.
static constexpr size_t counter_bits_
Definition: inner_join.hpp:88
TLX_ATTRIBUTE_ALWAYS_INLINE bool HasNext()
HasNext() returns true if at least one more item is available.
size_t num_workers() const
Global number of workers in the system.
Definition: context.hpp:253
tag structure for GroupByKey(), and InnerJoin()
Definition: dia.hpp:108
void SortAndWriteToFile(std::vector< ItemType > &vec, std::deque< data::File > &files, const KeyExtractor &key_extractor)
Sorts all elements in a vector and writes them to a file.
Definition: inner_join.hpp:670
std::deque< data::File > files1_
files for sorted datasets
Definition: inner_join.hpp:297
void PreOp1(const InputTypeFirst &input)
Definition: inner_join.hpp:321
std::pair< size_t, size_t > MaxMergeDegreePrefetch(std::deque< data::File > &files)
Definition: inner_join.hpp:518
HashFunction hash_function_
Definition: inner_join.hpp:304
Writer GetWriter(size_t block_size=default_block_size)
Get BlockWriter.
Definition: file.cpp:58
Context & context_
associated Context
Definition: dia_base.hpp:293
data::BlockPool & block_pool()
the block manager keeps all data blocks moving through the system.
Definition: context.hpp:326
void MergeFiles(std::deque< data::File > &files, CompareFunction compare_function)
Merge files when there are too many for the merge tree to handle.
Definition: inner_join.hpp:540
Performs an inner join between two DIAs.
Definition: inner_join.hpp:62
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:167