Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
inner_join.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/api/inner_join.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2016 Alexander Noe <[email protected]>
7  * Copyright (C) 2017 Tim Zeitz <[email protected]>
8  *
9  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10  ******************************************************************************/
11 
12 #pragma once
13 #ifndef THRILL_API_INNER_JOIN_HEADER
14 #define THRILL_API_INNER_JOIN_HEADER
15 
16 #include <thrill/api/dia.hpp>
17 #include <thrill/api/dop_node.hpp>
20 #include <thrill/common/logger.hpp>
24 #include <thrill/data/file.hpp>
25 
26 #include <algorithm>
27 #include <deque>
28 #include <functional>
29 #include <utility>
30 #include <vector>
31 
32 namespace thrill {
33 namespace api {
34 
35 /*!
36  * Performs an inner join between two DIAs. The key from each DIA element is
37  * hereby extracted with a key extractor function. All pairs of elements with
38  * equal keys from both DIAs are then joined with the join function.
39  *
40  * \tparam KeyExtractor1 Type of the key_extractor1 function. This is a
41  * function from FirstDIA::ValueType to the key type.
42  *
43  * \tparam KeyExtractor2 Type of the key_extractor2 function. This is a
44  * function from SecondDIA::ValueType to the key type.
45  *
46  * \tparam JoinFunction Type of the join_function. This is a function
47  * from FirstDIA::ValueType and SecondDIA::ValueType to the type of the output DIA.
48  *
49  * \tparam SecondDIA Type of the second DIA, which is joined with the first one.
50  *
51  * \param key_extractor1 Key extractor for first DIA
52  *
53  * \param key_extractor2 Key extractor for second DIA
54  *
55  * \param join_function Join function applied to all equal key pairs
56  *
57  * \ingroup dia_dops
58  */
59 template <typename ValueType, typename FirstDIA, typename SecondDIA,
60  typename KeyExtractor1, typename KeyExtractor2,
61  typename JoinFunction, typename HashFunction,
62  bool UseLocationDetection>
63 class JoinNode final : public DOpNode<ValueType>
64 {
65 private:
66  static constexpr bool debug = false;
67 
// NOTE(review): source line 68 is missing from this copy; presumably it is the
// `Super` alias (for DOpNode<ValueType>) that the next line and the
// constructor's `Super(...)` initializer refer to.
69  using Super::context_;
70 
// element types of the first and the second input DIA
71  using InputTypeFirst = typename FirstDIA::ValueType;
72  using InputTypeSecond = typename SecondDIA::ValueType;
73 
74  //! Key type of the join, taken from KeyExtractor1's result type; InnerJoin() static_asserts that KeyExtractor1's result type is convertible to KeyExtractor2's.
75  using Key = typename common::FunctionTraits<KeyExtractor1>::result_type;
76 
77  //! hash counter used by LocationDetection: one record per key hash, with a count and a mask of the DIAs the hash occurred in
78  class HashCount
79  {
80  public:
81  using HashType = size_t;
82  using CounterType = uint8_t;
83  using DIAIdxType = uint8_t;
84 
85  size_t hash;
// NOTE(review): source lines 86-87 are missing from this copy; they presumably
// declare the `count` (CounterType) and `dia_mask` (DIAIdxType) members that
// operator+, ReadBits() and WriteBits() below use.
88 
// number of bits used to serialize `count` (all bits of CounterType)
89  static constexpr size_t counter_bits_ = 8 * sizeof(CounterType);
90 
// Combines two records for the same hash; the DIA masks are OR-ed together.
91  HashCount operator + (const HashCount& b) const {
92  assert(hash == b.hash);
93  return HashCount {
94  hash,
// NOTE(review): source line 95 (the combined count, presumably computed with
// common::AddTruncToType(count, b.count)) is missing from this copy.
96  static_cast<DIAIdxType>(dia_mask | b.dia_mask)
97  };
98  }
99 
// NOTE(review): source line 100, the signature
// `HashCount& operator += (const HashCount& b) {` according to the generated
// index, is missing from this copy, as is line 102 (the count update).
101  assert(hash == b.hash);
103  dia_mask |= b.dia_mask;
104  return *this;
105  }
106 
// records are ordered by hash only
107  bool operator < (const HashCount& b) const { return hash < b.hash; }
108 
109  //! method to check if this hash count should be broadcast to all
110  //! interested workers -- for InnerJoin this checks whether dia_mask == 3
111  //! (bit 1: first DIA, bit 2: second DIA), i.e. the hash occurred in both DIAs.
112  bool NeedBroadcast() const {
113  return dia_mask == 3;
114  }
115 
116  //! Read count (counter_bits_ bits) and dia_mask (2 bits) from BitReader
117  template <typename BitReader>
118  void ReadBits(BitReader& reader) {
119  count = reader.GetBits(counter_bits_);
120  dia_mask = reader.GetBits(2);
121  }
122 
123  //! Write count (counter_bits_ bits) and dia_mask (2 bits) to BitWriter
124  template <typename BitWriter>
125  void WriteBits(BitWriter& writer) const {
126  writer.PutBits(count, counter_bits_);
127  writer.PutBits(dia_mask, 2);
128  }
129  };
130 
131 public:
132  /*!
133  * Constructor for a JoinNode.
134  */
135  JoinNode(const FirstDIA& parent1, const SecondDIA& parent2,
136  const KeyExtractor1& key_extractor1,
137  const KeyExtractor2& key_extractor2,
138  const JoinFunction& join_function,
139  const HashFunction& hash_function)
140  : Super(parent1.ctx(), "Join",
141  { parent1.id(), parent2.id() },
142  { parent1.node(), parent2.node() }),
143  key_extractor1_(key_extractor1),
144  key_extractor2_(key_extractor2),
145  join_function_(join_function),
146  hash_function_(hash_function)
147  {
148  auto pre_op_fn1 = [this](const InputTypeFirst& input) {
149  PreOp1(input);
150  };
151 
152  auto pre_op_fn2 = [this](const InputTypeSecond& input) {
153  PreOp2(input);
154  };
155 
156  auto lop_chain1 = parent1.stack().push(pre_op_fn1).fold();
157  auto lop_chain2 = parent2.stack().push(pre_op_fn2).fold();
158  parent1.node()->AddChild(this, lop_chain1, 0);
159  parent2.node()->AddChild(this, lop_chain2, 1);
160  }
161 
162  void Execute() final {
163 
164  if (UseLocationDetection) {
165  std::unordered_map<size_t, size_t> target_processors;
166  size_t max_hash = location_detection_.Flush(target_processors);
167  location_detection_.Dispose();
168 
169  auto file1reader = pre_file1_.GetConsumeReader();
170  while (file1reader.HasNext()) {
171  InputTypeFirst in1 = file1reader.template Next<InputTypeFirst>();
172  auto target_processor =
173  target_processors.find(
174  hash_function_(key_extractor1_(in1)) % max_hash);
175  if (target_processor != target_processors.end()) {
176  hash_writers1_[target_processor->second].Put(in1);
177  }
178  }
179 
180  auto file2reader = pre_file2_.GetConsumeReader();
181  while (file2reader.HasNext()) {
182  InputTypeSecond in2 = file2reader.template Next<InputTypeSecond>();
183  auto target_processor =
184  target_processors.find(
185  hash_function_(key_extractor2_(in2)) % max_hash);
186  if (target_processor != target_processors.end()) {
187  hash_writers2_[target_processor->second].Put(in2);
188  }
189  }
190  }
191 
192  hash_writers1_.Close();
193  hash_writers2_.Close();
194 
195  MainOp();
196  }
197 
//! Opens one Reader per File in \p files (appending them to \p seq), starts
//! prefetching on them and returns a buffered multiway merge tree over
//! \p seq ordered by \p compare_function. \p consume is forwarded to
//! File::GetReader().
//! NOTE(review): the tree is built from iterators into \p seq, so \p seq
//! presumably has to outlive the returned tree -- confirm.
198  template <typename ElementType, typename CompareFunction>
199  auto MakePuller(std::deque<data::File>& files,
200  std::vector<data::File::Reader>& seq,
201  CompareFunction compare_function, bool consume) {
202 
203  size_t merge_degree, prefetch;
204  std::tie(merge_degree, prefetch) =
// NOTE(review): source line 205, the right-hand side of this assignment, is
// missing from this copy; presumably a MaxMergeDegreePrefetch(files.size())
// call on the block pool (see MergeFiles). Only `prefetch` is used below.
206  // construct output merger of remaining Files
207  seq.reserve(files.size());
208  for (size_t t = 0; t < files.size(); ++t)
209  seq.emplace_back(files[t].GetReader(consume, /* prefetch */ 0));
210  StartPrefetch(seq, prefetch);
211 
212  return core::make_buffered_multiway_merge_tree<ElementType>(
213  seq.begin(), seq.end(), compare_function);
214  }
215 
//! Emits the join results. The sorted runs of each input are merged
//! (MergeFiles, then MakePuller) into one key-ordered stream per input. The
//! two streams are walked in lockstep: the stream with the smaller key
//! advances; when neither key is less than the other, the whole run of that
//! key is collected from both sides and every pair is joined.
//! \param consume forwarded to MakePuller() and from there to File::GetReader()
216  void PushData(bool consume) final {
217 
218  auto compare_function_1 =
219  [this](const InputTypeFirst& in1, const InputTypeFirst& in2) {
220  return key_extractor1_(in1) < key_extractor1_(in2);
221  };
222 
223  auto compare_function_2 =
224  [this](const InputTypeSecond& in1, const InputTypeSecond& in2) {
225  return key_extractor2_(in1) < key_extractor2_(in2);
226  };
227 
228  // no possible join results when at least one data set is empty
229  if (!files1_.size() || !files2_.size())
230  return;
231 
232  //! Merge files when there are too many for the merge tree.
233  MergeFiles<InputTypeFirst>(files1_, compare_function_1);
234  MergeFiles<InputTypeSecond>(files2_, compare_function_2);
235 
// the Readers live here so that they stay valid while the pullers are used
236  std::vector<data::File::Reader> seq1;
237  std::vector<data::File::Reader> seq2;
238 
239  // construct output merger of remaining Files
240  auto puller1 = MakePuller<InputTypeFirst>(
241  files1_, seq1, compare_function_1, consume);
242  auto puller2 = MakePuller<InputTypeSecond>(
243  files2_, seq2, compare_function_2, consume);
244 
245  bool puller1_done = false;
246  if (!puller1.HasNext())
247  puller1_done = true;
248 
249  bool puller2_done = false;
250  if (!puller2.HasNext())
251  puller2_done = true;
252 
253  //! cache for elements with equal keys, cartesian product of both caches
254  //! are joined with the join_function
255  std::vector<InputTypeFirst> equal_keys1;
256  std::vector<InputTypeSecond> equal_keys2;
257 
258  while (!puller1_done && !puller2_done) {
259  //! find elements with equal key
260  if (key_extractor1_(puller1.Top()) <
261  key_extractor2_(puller2.Top())) {
262  if (!puller1.Update()) {
263  puller1_done = true;
264  break;
265  }
266  }
267  else if (key_extractor2_(puller2.Top()) <
268  key_extractor1_(puller1.Top())) {
269  if (!puller2.Update()) {
270  puller2_done = true;
271  break;
272  }
273  }
274  else {
// equal keys: collect the run of this key from both inputs. The second call
// runs even if the first one exhausted puller1, so that the second group
// is complete before joining.
275  bool external1 = false;
276  bool external2 = false;
277  equal_keys1.clear();
278  equal_keys2.clear();
279  std::tie(puller1_done, external1) =
280  AddEqualKeysToVec(equal_keys1, puller1,
// NOTE(review): source line 281, the remaining arguments (presumably
// key_extractor1_ and join_file1_), is missing from this copy.
282 
283  std::tie(puller2_done, external2) =
284  AddEqualKeysToVec(equal_keys2, puller2,
// NOTE(review): source line 285, the remaining arguments (presumably
// key_extractor2_ and join_file2_), is missing from this copy.
286 
287  JoinAllElements(equal_keys1, external1, equal_keys2, external2);
288  }
289  }
290  }
291 
292  void Dispose() final {
293  files1_.clear();
294  files2_.clear();
295  }
296 
297 private:
298  //! files for sorted datasets (key-sorted runs of the first / second input)
299  std::deque<data::File> files1_;
300  std::deque<data::File> files2_;
301 
302  //! user-defined functions
303  KeyExtractor1 key_extractor1_;
304  KeyExtractor2 key_extractor2_;
305  JoinFunction join_function_;
306  HashFunction hash_function_;
307 
308  //! data streams for inter-worker communication of DIA elements
// NOTE(review): source lines 309-312 are missing from this copy; according to
// the generated index they declare hash_stream1_ (data::MixStreamPtr),
// hash_writers1_ (data::MixStream::Writers), hash_stream2_ and hash_writers2_.
313 
314  //! location detection and associated files
// NOTE(review): source lines 315-318 and 320-321 are missing from this copy;
// according to the generated index they declare pre_file1_ (data::File),
// pre_writer1_ / pre_writer2_ (data::File::Writer), location_detection_
// (core::LocationDetection<HashCount>) and location_detection_initialized_
// (bool). pre_file2_, used in Execute(), is presumably declared on line 317.
319 
322 
323  void PreOp1(const InputTypeFirst& input) {
324  size_t hash = hash_function_(key_extractor1_(input));
325  if (UseLocationDetection) {
326  pre_writer1_.Put(input);
327  location_detection_.Insert(HashCount { hash, 1, /* dia_mask */ 1 });
328  }
329  else {
330  hash_writers1_[hash % context_.num_workers()].Put(input);
331  }
332  }
333 
334  void PreOp2(const InputTypeSecond& input) {
335  size_t hash = hash_function_(key_extractor2_(input));
336  if (UseLocationDetection) {
337  pre_writer2_.Put(input);
338  location_detection_.Insert(HashCount { hash, 1, /* dia_mask */ 2 });
339  }
340  else {
341  hash_writers2_[hash % context_.num_workers()].Put(input);
342  }
343  }
344 
345  //! Receive elements from other workers, create pre-sorted files
346  void MainOp() {
347  data::MixStream::MixReader reader1_ =
348  hash_stream1_->GetMixReader(/* consume */ true);
349 
350  size_t capacity = DIABase::mem_limit_ / sizeof(InputTypeFirst) / 2;
351 
352  ReceiveItems<InputTypeFirst>(capacity, reader1_, files1_, key_extractor1_);
353 
354  data::MixStream::MixReader reader2_ =
355  hash_stream2_->GetMixReader(/* consume */ true);
356 
357  capacity = DIABase::mem_limit_ / sizeof(InputTypeSecond) / 2;
358 
359  ReceiveItems<InputTypeSecond>(capacity, reader2_, files2_, key_extractor2_);
360  }
361 
362  template <typename ItemType>
363  size_t JoinCapacity() {
364  return DIABase::mem_limit_ / sizeof(ItemType) / 4;
365  }
366 
367  /*!
368  * Adds all elements from merge tree to a vector, afterwards sets the first_element
369  * pointer to the first element with a different key.
370  *
371  * \param vec target vector
372  *
373  * \param puller Input merge tree
374  *
375  * \param key_extractor Key extractor function
376  *
377  * \param file_ptr Pointer to a data::File
378  *
379  * \return Pair of bools, first bool indicates whether the merge tree is
380  * emptied, second bool indicates whether external memory was needed.
381  */
382  template <typename ItemType, typename KeyExtractor, typename MergeTree>
383  std::pair<bool, bool> AddEqualKeysToVec(
384  std::vector<ItemType>& vec, MergeTree& puller,
385  const KeyExtractor& key_extractor, data::FilePtr& file_ptr) {
386 
387  vec.push_back(puller.Top());
388  Key key = key_extractor(puller.Top());
389 
390  size_t capacity = JoinCapacity<ItemType>();
391 
392  if (!puller.Update())
393  return std::make_pair(true, false);
394 
395  while (key_extractor(puller.Top()) == key) {
396 
397  if (!mem::memory_exceeded && vec.size() < capacity) {
398  vec.push_back(puller.Top());
399  }
400  else {
401  file_ptr = context_.GetFilePtr(this);
402  data::File::Writer writer = file_ptr->GetWriter();
403  for (const ItemType& item : vec) {
404  writer.Put(item);
405  }
406  writer.Put(puller.Top());
407  //! vec is very large when this happens
408  //! swap with empty vector to free the memory
409  std::vector<ItemType>().swap(vec);
410 
411  return AddEqualKeysToFile(puller, key_extractor, writer, key);
412  }
413 
414  if (!puller.Update())
415  return std::make_pair(true, false);
416  }
417 
418  return std::make_pair(false, false);
419  }
420 
421  /*!
422  * Adds all elements from merge tree to a data::File, potentially to external memory,
423  * afterwards sets the first_element pointer to the first element with a different key.
424  *
425  * \param puller Input merge tree
426  *
427  * \param key_extractor Key extractor function
428  *
429  * \param writer File writer
430  *
431  * \param key target key
432  *
433  * \return Pair of bools, first bool indicates whether the merge tree is
434  * emptied, second bool indicates whether external memory was needed (always true, when
435  * this method was called).
436  */
437  template <typename KeyExtractor, typename MergeTree>
438  std::pair<bool, bool> AddEqualKeysToFile(
439  MergeTree& puller, const KeyExtractor& key_extractor,
440  data::File::Writer& writer, const Key& key) {
441  if (!puller.Update()) {
442  return std::make_pair(true, true);
443  }
444 
445  while (key_extractor(puller.Top()) == key) {
446  writer.Put(puller.Top());
447  if (!puller.Update())
448  return std::make_pair(true, true);
449  }
450 
451  return std::make_pair(false, true);
452  }
453 
// NOTE(review): source line 454, the signature `DIAMemUse PreOpMemUse() final`
// according to the generated index, is missing from this copy.
// Memory requested for the PreOp phase: DIAMemUse::Max().
455  return DIAMemUse::Max();
456  }
457 
// Called when parent \p id starts its PreOp phase (0: first input, 1: second
// input, matching the AddChild() indices used in the constructor).
458  void StartPreOp(size_t id) final {
459  LOG << *this << " running StartPreOp parent_idx=" << id;
460  if (!location_detection_initialized_ && UseLocationDetection) {
// NOTE(review): source lines 461-462 are missing from this copy; presumably
// they initialize location_detection_ and set location_detection_initialized_.
463  }
464 
465  auto ids = this->parent_ids();
466 
467  if (id == 0) {
// NOTE(review): source line 468 is missing; presumably it prepares the
// first input's writers (hash_writers1_ or pre_writer1_) -- confirm.
469  }
470  if (id == 1) {
// NOTE(review): source line 471 is missing; presumably the counterpart for
// the second input (hash_writers2_ or pre_writer2_) -- confirm.
472  }
473  }
474 
// Called when parent \p id (0: first input, 1: second input) ends its PreOp
// phase.
475  void StopPreOp(size_t id) final {
476  LOG << *this << " running StopPreOp parent_idx=" << id;
477 
478  if (id == 0) {
// NOTE(review): source line 479 is missing from this copy; presumably it
// closes the first input's PreOp writer -- confirm.
480  }
481  if (id == 1) {
// NOTE(review): source line 482 is missing from this copy; presumably the
// counterpart for the second input -- confirm.
483  }
484  }
485 
// NOTE(review): source line 486, the signature `DIAMemUse ExecuteMemUse() final`
// according to the generated index, is missing from this copy.
// Memory requested for Execute(): DIAMemUse::Max().
487  return DIAMemUse::Max();
488  }
489 
// NOTE(review): source line 490, the signature `DIAMemUse PushDataMemUse() final`
// according to the generated index, is missing from this copy.
// Memory requested for PushData(): DIAMemUse::Max().
491  return DIAMemUse::Max();
492  }
493 
494  /*!
495  * Recieve all elements from a stream and write them to files sorted by key.
496  */
497  template <typename ItemType, typename KeyExtractor>
499  size_t capacity, data::MixStream::MixReader& reader,
500  std::deque<data::File>& files, const KeyExtractor& key_extractor) {
501 
502  std::vector<ItemType> vec;
503  vec.reserve(capacity);
504 
505  while (reader.HasNext()) {
506  if (vec.size() < capacity) {
507  vec.push_back(reader.template Next<ItemType>());
508  }
509  else {
510  SortAndWriteToFile(vec, files, key_extractor);
511  }
512  }
513 
514  if (vec.size())
515  SortAndWriteToFile(vec, files, key_extractor);
516  }
517 
518  /*!
519  * Merges batches of files until no more than merge_degree remain, so that a single merge tree can handle them.
519  * Each round merges the first merge_degree Files into one new File appended at the back and erases the merged Files.
520  */
521  template <typename ItemType, typename CompareFunction>
522  void MergeFiles(std::deque<data::File>& files,
523  CompareFunction compare_function) {
524 
525  size_t merge_degree, prefetch;
526 
527  // merge batches of files if necessary
528  while (std::tie(merge_degree, prefetch) =
// NOTE(review): source line 529 is missing from this copy; presumably a
// MaxMergeDegreePrefetch(files.size()) call on the block pool followed by
// the comma operator, so merge_degree/prefetch are recomputed every round.
530  files.size() > merge_degree)
531  {
532  sLOG1 << "Partial multi-way-merge of"
533  << merge_degree << "files with prefetch" << prefetch;
534 
535  // create merger for first merge_degree Files
536  std::vector<data::File::ConsumeReader> seq;
537  seq.reserve(merge_degree);
538 
539  for (size_t t = 0; t < merge_degree; ++t)
540  seq.emplace_back(files[t].GetConsumeReader(/* prefetch */ 0));
541 
542  StartPrefetch(seq, prefetch);
543 
544  auto puller = core::make_multiway_merge_tree<ItemType>(
545  seq.begin(), seq.end(), compare_function);
546 
547  // create new File for merged items
548  files.emplace_back(context_.GetFile(this));
549  auto writer = files.back().GetWriter();
550 
551  while (puller.HasNext()) {
552  writer.Put(puller.Next());
553  }
554  writer.Close();
555 
556  // this clear is important to release references to the files.
557  seq.clear();
558 
559  // remove merged files
560  files.erase(files.begin(), files.begin() + merge_degree);
561  }
562  }
563 
566 
567  /*!
568  * Joins all elements in cartesian product of both vectors. Uses files when
569  * one of the data sets is too large to fit in memory. (indicated by
570  * 'external' bools)
571  */
573  const std::vector<InputTypeFirst>& vec1, bool external1,
574  const std::vector<InputTypeSecond>& vec2, bool external2) {
575 
576  if (!external1 && !external2) {
577  for (const InputTypeFirst& join1 : vec1) {
578  for (const InputTypeSecond& join2 : vec2) {
579  assert(key_extractor1_(join1) == key_extractor2_(join2));
580  this->PushItem(join_function_(join1, join2));
581  }
582  }
583  }
584  else if (external1 && !external2) {
585  LOG1 << "Thrill: Warning: Too many equal keys for main memory "
586  << "in first DIA";
587 
588  data::File::ConsumeReader reader = join_file1_->GetConsumeReader();
589 
590  while (reader.HasNext()) {
591  InputTypeFirst join1 = reader.template Next<InputTypeFirst>();
592  for (auto const& join2 : vec2) {
593  assert(key_extractor1_(join1) == key_extractor2_(join2));
594  this->PushItem(join_function_(join1, join2));
595  }
596  }
597  }
598  else if (!external1 && external2) {
599  LOG1 << "Thrill: Warning: Too many equal keys for main memory "
600  << "in second DIA";
601 
602  data::File::ConsumeReader reader = join_file2_->GetConsumeReader();
603 
604  while (reader.HasNext()) {
605  InputTypeSecond join2 = reader.template Next<InputTypeSecond>();
606  for (const InputTypeFirst& join1 : vec1) {
607  assert(key_extractor1_(join1) == key_extractor2_(join2));
608  this->PushItem(join_function_(join1, join2));
609  }
610  }
611  }
612  else if (external1 && external2) {
613  LOG1 << "Thrill: Warning: Too many equal keys for main memory "
614  << "in both DIAs. This is very slow.";
615 
616  size_t capacity = JoinCapacity<InputTypeFirst>();
617 
618  std::vector<InputTypeFirst> temp_vec;
619  temp_vec.reserve(capacity);
620 
621  //! file 2 needs to be read multiple times
622  data::File::ConsumeReader reader1 = join_file1_->GetConsumeReader();
623 
624  while (reader1.HasNext()) {
625 
626  for (size_t i = 0; i < capacity && reader1.HasNext() &&
627  !mem::memory_exceeded; ++i) {
628  temp_vec.push_back(reader1.template Next<InputTypeFirst>());
629  }
630 
631  data::File::Reader reader2 = join_file2_->GetReader(/* consume */ false);
632 
633  while (reader2.HasNext()) {
634  InputTypeSecond join2 = reader2.template Next<InputTypeSecond>();
635  for (const InputTypeFirst& join1 : temp_vec) {
636  assert(key_extractor1_(join1) == key_extractor2_(join2));
637  this->PushItem(join_function_(join1, join2));
638  }
639  }
640  temp_vec.clear();
641  }
642 
643  //! non-consuming reader, need to clear now
644  join_file2_->Clear();
645  }
646  }
647 
648  /*!
649  * Sorts all elements in a vector and writes them to a file.
650  */
651  template <typename ItemType, typename KeyExtractor>
653  std::vector<ItemType>& vec, std::deque<data::File>& files,
654  const KeyExtractor& key_extractor) {
655 
656  // advise block pool to write out data if necessary
657  context_.block_pool().AdviseFree(vec.size() * sizeof(ValueType));
658 
659  std::sort(vec.begin(), vec.end(),
660  [&key_extractor](const ItemType& i1, const ItemType& i2) {
661  return key_extractor(i1) < key_extractor(i2);
662  });
663 
664  files.emplace_back(context_.GetFile(this));
665  auto writer = files.back().GetWriter();
666  for (const ItemType& elem : vec) {
667  writer.Put(elem);
668  }
669  writer.Close();
670 
671  vec.clear();
672  }
673 };
674 
675 /*!
676  * Performs an inner join between this DIA and the DIA given in the first
677  * parameter. The key from each DIA element is hereby extracted with a key
678  * extractor function. All pairs of elements with equal keys from both DIAs are
679  * then joined with the join function.
680  *
681  * \tparam KeyExtractor1 Type of the key_extractor1 function. This is a function
682  * from FirstDIA::ValueType to the key type.
683  *
684  * \tparam KeyExtractor2 Type of the key_extractor2 function. This is a function
685  * from SecondDIA::ValueType to the key type.
686  *
687  * \tparam JoinFunction Type of the join_function. This is a function from
688  * ValueType and SecondDIA::ValueType to the type of the output DIA.
689  *
690  * \param first_dia First DIA to join.
691  *
692  * \param second_dia Second DIA to join.
693  *
694  * \param key_extractor1 Key extractor for this DIA
695  *
696  * \param key_extractor2 Key extractor for second DIA
697  *
698  * \param join_function Join function applied to all equal key pairs
699  *
700  * \param hash_function If necessary a hash funtion for Key
701  *
702  * \ingroup dia_dops
703  */
704 template <
705  bool LocationDetectionValue,
706  typename FirstDIA,
707  typename SecondDIA,
708  typename KeyExtractor1,
709  typename KeyExtractor2,
710  typename JoinFunction,
711  typename HashFunction =
712  std::hash<typename common::FunctionTraits<KeyExtractor1>::result_type> >
715  const FirstDIA& first_dia, const SecondDIA& second_dia,
716  const KeyExtractor1& key_extractor1, const KeyExtractor2& key_extractor2,
717  const JoinFunction& join_function,
718  const HashFunction& hash_function = HashFunction()) {
719 
720  assert(first_dia.IsValid());
721  assert(second_dia.IsValid());
722 
723  static_assert(
724  std::is_convertible<
725  typename FirstDIA::ValueType,
726  typename common::FunctionTraits<KeyExtractor1>::template arg<0>
727  >::value,
728  "Key Extractor 1 has the wrong input type");
729 
730  static_assert(
731  std::is_convertible<
732  typename SecondDIA::ValueType,
733  typename common::FunctionTraits<KeyExtractor2>::template arg<0>
734  >::value,
735  "Key Extractor 2 has the wrong input type");
736 
737  static_assert(
738  std::is_convertible<
739  typename common::FunctionTraits<KeyExtractor1>::result_type,
740  typename common::FunctionTraits<KeyExtractor2>::result_type
741  >::value,
742  "Keys have different types");
743 
744  static_assert(
745  std::is_convertible<
746  typename FirstDIA::ValueType,
747  typename common::FunctionTraits<JoinFunction>::template arg<0>
748  >::value,
749  "Join Function has wrong input type in argument 0");
750 
751  static_assert(
752  std::is_convertible<
753  typename SecondDIA::ValueType,
754  typename common::FunctionTraits<JoinFunction>::template arg<1>
755  >::value,
756  "Join Function has wrong input type in argument 1");
757 
758  using JoinResult
759  = typename common::FunctionTraits<JoinFunction>::result_type;
760 
761  using JoinNode = api::JoinNode<
762  JoinResult, FirstDIA, SecondDIA, KeyExtractor1, KeyExtractor2,
763  JoinFunction, HashFunction, LocationDetectionValue>;
764 
765  auto node = tlx::make_counting<JoinNode>(
766  first_dia, second_dia, key_extractor1, key_extractor2, join_function,
767  hash_function);
768 
769  return DIA<JoinResult>(node);
770 }
771 
772 /*!
773  * Performs an inner join between two DIAs, with location detection enabled.
774  * The key of every DIA element is extracted with a key extractor
775  * function. All pairs of elements with equal keys from both DIAs are
776  * then joined with the join function.
777  *
778  * \tparam KeyExtractor1 Type of the key_extractor1 function. This is a function
779  * from FirstDIA::ValueType to the key type.
780  *
781  * \tparam KeyExtractor2 Type of the key_extractor2 function. This is a function
782  * from SecondDIA::ValueType to the key type.
783  *
784  * \tparam JoinFunction Type of the join_function. This is a function from
785  * FirstDIA::ValueType and SecondDIA::ValueType to the type of the output DIA.
786  *
787  * \param first_dia First DIA to join.
788  *
789  * \param second_dia Second DIA to join.
790  *
791  * \param key_extractor1 Key extractor for the first DIA
792  *
793  * \param key_extractor2 Key extractor for the second DIA
794  *
795  * \param join_function Join function applied to all equal key pairs
796  *
797  * \param hash_function Hash function for the key type, used to assign keys to workers
798  *
799  * \ingroup dia_dops
800  */
801 template <
802  typename FirstDIA,
803  typename SecondDIA,
804  typename KeyExtractor1,
805  typename KeyExtractor2,
806  typename JoinFunction,
807  typename HashFunction =
808  std::hash<typename common::FunctionTraits<KeyExtractor1>::result_type> >
// NOTE(review): source line 809 (`auto InnerJoin(`) is missing from this copy.
810  const FirstDIA& first_dia, const SecondDIA& second_dia,
811  const KeyExtractor1& key_extractor1, const KeyExtractor2& key_extractor2,
812  const JoinFunction& join_function,
813  const HashFunction& hash_function = HashFunction()) {
814  // forward to method _with_ location detection ON
815  return InnerJoin(
// NOTE(review): source line 816 is missing from this copy; presumably the tag
// argument `LocationDetectionTag,` (a LocationDetectionFlag<true>).
817  first_dia, second_dia, key_extractor1, key_extractor2,
818  join_function, hash_function);
819 }
820 
821 //! \}
822 
823 } // namespace api
824 
825 //! imported from api namespace
826 using api::InnerJoin;
827 
828 } // namespace thrill
829 
830 #endif // !THRILL_API_INNER_JOIN_HEADER
831 
832 /******************************************************************************/
std::pair< bool, bool > AddEqualKeysToFile(MergeTree &puller, const KeyExtractor &key_extractor, data::File::Writer &writer, const Key &key)
Adds all elements with the current key from the merge tree to a data::File, potentially in external memory; afterwards the top of the merge tree is the first element with a different key.
Definition: inner_join.hpp:438
void StartPrefetch(std::vector< Reader > &readers, size_t prefetch_size)
Take a vector of Readers and prefetch equally from them.
Definition: file.hpp:570
bool HasNext()
HasNext() returns true if at least one more item is available.
DIAMemUse PushDataMemUse() final
Amount of RAM used by PushData()
Definition: inner_join.hpp:490
DIAMemUse ExecuteMemUse() final
Amount of RAM used by Execute()
Definition: inner_join.hpp:486
data::MixStream::Writers hash_writers1_
Definition: inner_join.hpp:310
static DIAMemUse Max()
Definition: dia_base.hpp:60
data::File pre_file1_
location detection and associated files
Definition: inner_join.hpp:315
DIA is the interface between the user and the Thrill framework.
Definition: dia.hpp:141
bool operator<(const HashCount &b) const
Definition: inner_join.hpp:107
Description of the amount of RAM the internal data structures of a DIANode require.
Definition: dia_base.hpp:51
void ReadBits(BitReader &reader)
Read count and dia_mask from BitReader.
Definition: inner_join.hpp:118
JoinNode(const FirstDIA &parent1, const SecondDIA &parent2, const KeyExtractor1 &key_extractor1, const KeyExtractor2 &key_extractor2, const JoinFunction &join_function, const HashFunction &hash_function)
Constructor for a JoinNode.
Definition: inner_join.hpp:135
data::File::Writer pre_writer1_
Definition: inner_join.hpp:316
data::FilePtr join_file2_
Definition: inner_join.hpp:565
void AdviseFree(size_t size)
const struct LocationDetectionFlag< true > LocationDetectionTag
global const LocationDetectionFlag instance
Definition: dia.hpp:122
JoinFunction join_function_
Definition: inner_join.hpp:305
data::File::Writer pre_writer2_
Definition: inner_join.hpp:318
std::pair< size_t, size_t > MaxMergeDegreePrefetch(size_t num_files)
Definition: block_pool.cpp:703
#define LOG1
Definition: logger.hpp:28
auto InnerJoin(const LocationDetectionFlag< LocationDetectionValue > &, const FirstDIA &first_dia, const SecondDIA &second_dia, const KeyExtractor1 &key_extractor1, const KeyExtractor2 &key_extractor2, const JoinFunction &join_function, const HashFunction &hash_function=HashFunction())
Performs an inner join between this DIA and the DIA given in the first parameter. ...
Definition: inner_join.hpp:713
A File is an ordered sequence of Block objects for storing items.
Definition: file.hpp:56
static constexpr bool debug
Definition: inner_join.hpp:66
BlockWriter contains a temporary Block object into which a) any serializable item can be stored or b)...
bool memory_exceeded
memory limit exceeded indicator
core::LocationDetection< HashCount > location_detection_
Definition: inner_join.hpp:320
#define sLOG1
Definition: logger.hpp:38
void ReceiveItems(size_t capacity, data::MixStream::MixReader &reader, std::deque< data::File > &files, const KeyExtractor &key_extractor)
Receive all elements from a stream and write them to files sorted by key.
Definition: inner_join.hpp:498
void PreOp2(const InputTypeSecond &input)
Definition: inner_join.hpp:334
void Dispose() final
Virtual clear method. Triggers actual disposing in sub-classes.
Definition: inner_join.hpp:292
void PushData(bool consume) final
Virtual method for pushing data. Triggers actual pushing in sub-classes.
Definition: inner_join.hpp:216
const size_t & id() const
return unique id() of DIANode subclass as stored by StatsNode
Definition: dia_base.hpp:213
KeyExtractor2 key_extractor2_
Definition: inner_join.hpp:304
typename FirstDIA::ValueType InputTypeFirst
Definition: inner_join.hpp:71
void JoinAllElements(const std::vector< InputTypeFirst > &vec1, bool external1, const std::vector< InputTypeSecond > &vec2, bool external2)
Joins all elements in cartesian product of both vectors.
Definition: inner_join.hpp:572
void WriteBits(BitWriter &writer) const
Write count and dia_mask to BitWriter.
Definition: inner_join.hpp:125
void PushItem(const ValueType &item) const
Method for derived classes to Push a single item to all children.
Definition: dia_node.hpp:147
Reader to retrieve items in unordered sequence from a MixBlockQueue.
ConsumeReader GetConsumeReader(size_t prefetch_size=File::default_prefetch_size_)
Get consuming BlockReader for beginning of File.
Definition: file.cpp:73
DIAMemUse PreOpMemUse() final
Amount of RAM used by PreOp after StartPreOp()
Definition: inner_join.hpp:454
CompareFunction auto MakePuller(std::deque< data::File > &files, std::vector< data::File::Reader > &seq, CompareFunction compare_function, bool consume)
Definition: inner_join.hpp:199
void swap(CountingPtr< A, D > &a1, CountingPtr< A, D > &a2) noexcept
void MainOp()
Receive elements from other workers, create pre-sorted files.
Definition: inner_join.hpp:346
data::MixStreamPtr hash_stream2_
Definition: inner_join.hpp:311
virtual void Execute()=0
Virtual execution method. Triggers actual computation in sub-classes.
int value
Definition: gen_data.py:41
BlockReader takes Block objects from BlockSource and allows reading of a) serializable Items or b) ar...
StreamData::Writers Writers
Definition: stream.hpp:40
void StopPreOp(size_t id) final
Virtual method for preparing end of PushData.
Definition: inner_join.hpp:475
KeyExtractor1 key_extractor1_
user-defined functions
Definition: inner_join.hpp:303
std::vector< size_t > parent_ids() const
Returns the parents of this DIABase.
Definition: dia_base.hpp:258
std::deque< data::File > files2_
Definition: inner_join.hpp:300
tlx::counting_ptr< file > file_ptr
A reference counting pointer for file.
Definition: file.hpp:265
data::FilePtr GetFilePtr(size_t dia_id)
Definition: context.cpp:1111
static IntegerType AddTruncToType(const IntegerType &a, const IntegerType &b)
Definition: math.hpp:31
data::File GetFile(size_t dia_id)
Returns a new File object containing a sequence of local Blocks.
Definition: context.hpp:280
data::MixStreamPtr GetNewMixStream(size_t dia_id)
Definition: context.cpp:1128
HashCount operator+(const HashCount &b) const
Definition: inner_join.hpp:91
DIAMemUse mem_limit_
Definition: dia_base.hpp:314
A DOpNode is a typed node representing distributed operations in Thrill.
Definition: dop_node.hpp:32
bool location_detection_initialized_
Definition: inner_join.hpp:321
typename SecondDIA::ValueType InputTypeSecond
Definition: inner_join.hpp:72
hash counter used by LocationDetection
Definition: inner_join.hpp:78
void StartPreOp(size_t id) final
Virtual method for preparing start of PushData.
Definition: inner_join.hpp:458
HashCount & operator+=(const HashCount &b)
Definition: inner_join.hpp:100
data::MixStreamPtr hash_stream1_
data streams for inter-worker communication of DIA elements
Definition: inner_join.hpp:309
TLX_ATTRIBUTE_ALWAYS_INLINE BlockWriter & Put(const T &x)
Put appends a complete item, or fails with a FullException.
std::pair< bool, bool > AddEqualKeysToVec(std::vector< ItemType > &vec, MergeTree &puller, const KeyExtractor &key_extractor, data::FilePtr &file_ptr)
Adds all elements with the current key from the merge tree to a vector; afterwards the top of the merge tree is the first...
Definition: inner_join.hpp:383
typename common::FunctionTraits< KeyExtractor1 >::result_type Key
Key type of join. must be equal to the other key extractor.
Definition: inner_join.hpp:75
data::FilePtr join_file1_
Definition: inner_join.hpp:564
void Close()
Explicitly close the writer.
static constexpr size_t counter_bits_
Definition: inner_join.hpp:89
TLX_ATTRIBUTE_ALWAYS_INLINE bool HasNext()
HasNext() returns true if at least one more item is available.
size_t num_workers() const
Global number of workers in the system.
Definition: context.hpp:248
tag structure for GroupByKey() and InnerJoin()
Definition: dia.hpp:116
void SortAndWriteToFile(std::vector< ItemType > &vec, std::deque< data::File > &files, const KeyExtractor &key_extractor)
Sorts all elements in a vector and writes them to a file.
Definition: inner_join.hpp:652
data::MixStream::Writers hash_writers2_
Definition: inner_join.hpp:312
std::deque< data::File > files1_
files for sorted datasets
Definition: inner_join.hpp:299
void PreOp1(const InputTypeFirst &input)
Definition: inner_join.hpp:323
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:24
HashFunction hash_function_
Definition: inner_join.hpp:306
Writer GetWriter(size_t block_size=default_block_size)
Get BlockWriter.
Definition: file.cpp:63
Context & context_
associated Context
Definition: dia_base.hpp:293
data::BlockPool & block_pool()
the block manager keeps all data blocks moving through the system.
Definition: context.hpp:321
void MergeFiles(std::deque< data::File > &files, CompareFunction compare_function)
Merge files when there are too many for the merge tree to handle.
Definition: inner_join.hpp:522
Performs an inner join between two DIAs.
Definition: inner_join.hpp:63