Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
inner_join.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/api/inner_join.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2016 Alexander Noe <[email protected]>
7  * Copyright (C) 2017 Tim Zeitz <[email protected]>
8  *
9  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10  ******************************************************************************/
11 
12 #pragma once
13 #ifndef THRILL_API_INNER_JOIN_HEADER
14 #define THRILL_API_INNER_JOIN_HEADER
15 
16 #include <thrill/api/dia.hpp>
17 #include <thrill/api/dop_node.hpp>
20 #include <thrill/common/logger.hpp>
24 #include <thrill/data/file.hpp>
25 
26 #include <algorithm>
27 #include <deque>
28 #include <functional>
29 #include <utility>
30 #include <vector>
31 
32 namespace thrill {
33 namespace api {
34 
35 /*!
36  * Performs an inner join between two DIAs. The key from each DIA element is
37  * hereby extracted with a key extractor function. All pairs of elements with
38  * equal keys from both DIAs are then joined with the join function.
39  *
40  * \tparam KeyExtractor1 Type of the key_extractor1 function. This is a
41  * function from FirstDIA::ValueType to the key type.
42  *
43  * \tparam KeyExtractor2 Type of the key_extractor2 function. This is a
44  * function from SecondDIA::ValueType to the key type.
45  *
46  * \tparam JoinFunction Type of the join_function. This is a function
47  * from FirstDIA::ValueType and SecondDIA::ValueType to the type of the output DIA.
48  *
49  * \param SecondDIA Other DIA joined with this DIA.
50  *
51  * \param key_extractor1 Key extractor for first DIA
52  *
53  * \param key_extractor2 Key extractor for second DIA
54  *
55  * \param join_function Join function applied to all equal key pairs
56  *
57  * \ingroup dia_dops
58  */
59 template <typename ValueType, typename FirstDIA, typename SecondDIA,
60  typename KeyExtractor1, typename KeyExtractor2,
61  typename JoinFunction, typename HashFunction,
62  bool UseLocationDetection>
63 class JoinNode final : public DOpNode<ValueType>
64 {
65 private:
66  static constexpr bool debug = false;
67 
69  using Super::context_;
70 
71  using InputTypeFirst = typename FirstDIA::ValueType;
72  using InputTypeSecond = typename SecondDIA::ValueType;
73 
74  //! Key type of join. must be equal to the other key extractor
75  using Key = typename common::FunctionTraits<KeyExtractor1>::result_type;
76 
77  //! hash counter used by LocationDetection
78  class HashCount
79  {
80  public:
81  using HashType = size_t;
82  using CounterType = uint8_t;
83  using DIAIdxType = uint8_t;
84 
85  size_t hash;
88 
89  static constexpr size_t counter_bits_ = 8 * sizeof(CounterType);
90 
91  HashCount operator + (const HashCount& b) const {
92  assert(hash == b.hash);
93  return HashCount {
94  hash,
96  static_cast<DIAIdxType>(dia_mask | b.dia_mask)
97  };
98  }
99 
101  assert(hash == b.hash);
103  dia_mask |= b.dia_mask;
104  return *this;
105  }
106 
107  bool operator < (const HashCount& b) const { return hash < b.hash; }
108 
109  //! method to check whether this hash count should be broadcast to all
110  //! interested workers -- for InnerJoin this checks whether dia_mask == 3,
111  //! i.e. the hash was in both DIAs on some worker.
112  bool NeedBroadcast() const {
113  return dia_mask == 3;
114  }
115 
116  //! Read count and dia_mask from BitReader
117  template <typename BitReader>
118  void ReadBits(BitReader& reader) {
119  count = reader.GetBits(counter_bits_);
120  dia_mask = reader.GetBits(2);
121  }
122 
123  //! Write count and dia_mask to BitWriter
124  template <typename BitWriter>
125  void WriteBits(BitWriter& writer) const {
126  writer.PutBits(count, counter_bits_);
127  writer.PutBits(dia_mask, 2);
128  }
129  };
130 
131 public:
132  /*!
133  * Constructor for a JoinNode.
134  */
135  JoinNode(const FirstDIA& parent1, const SecondDIA& parent2,
136  const KeyExtractor1& key_extractor1,
137  const KeyExtractor2& key_extractor2,
138  const JoinFunction& join_function,
139  const HashFunction& hash_function)
140  : Super(parent1.ctx(), "Join",
141  { parent1.id(), parent2.id() },
142  { parent1.node(), parent2.node() }),
143  key_extractor1_(key_extractor1),
144  key_extractor2_(key_extractor2),
145  join_function_(join_function),
146  hash_function_(hash_function) {
147  auto pre_op_fn1 = [this](const InputTypeFirst& input) {
148  PreOp1(input);
149  };
150 
151  auto pre_op_fn2 = [this](const InputTypeSecond& input) {
152  PreOp2(input);
153  };
154 
155  auto lop_chain1 = parent1.stack().push(pre_op_fn1).fold();
156  auto lop_chain2 = parent2.stack().push(pre_op_fn2).fold();
157  parent1.node()->AddChild(this, lop_chain1, 0);
158  parent2.node()->AddChild(this, lop_chain2, 1);
159  }
160 
161  void Execute() final {
162 
163  if (UseLocationDetection) {
164  std::unordered_map<size_t, size_t> target_processors;
165  size_t max_hash = location_detection_.Flush(target_processors);
166  location_detection_.Dispose();
167 
168  auto file1reader = pre_file1_.GetConsumeReader();
169  while (file1reader.HasNext()) {
170  InputTypeFirst in1 = file1reader.template Next<InputTypeFirst>();
171  auto target_processor =
172  target_processors.find(
173  hash_function_(key_extractor1_(in1)) % max_hash);
174  if (target_processor != target_processors.end()) {
175  hash_writers1_[target_processor->second].Put(in1);
176  }
177  }
178 
179  auto file2reader = pre_file2_.GetConsumeReader();
180  while (file2reader.HasNext()) {
181  InputTypeSecond in2 = file2reader.template Next<InputTypeSecond>();
182  auto target_processor =
183  target_processors.find(
184  hash_function_(key_extractor2_(in2)) % max_hash);
185  if (target_processor != target_processors.end()) {
186  hash_writers2_[target_processor->second].Put(in2);
187  }
188  }
189  }
190 
191  hash_writers1_.Close();
192  hash_writers2_.Close();
193 
194  MainOp();
195  }
196 
197  template <typename ElementType, typename CompareFunction>
198  auto MakePuller(std::deque<data::File>& files,
199  std::vector<data::File::Reader>& seq,
200  CompareFunction compare_function, bool consume) {
201 
202  size_t merge_degree, prefetch;
203  std::tie(merge_degree, prefetch) =
205  // construct output merger of remaining Files
206  seq.reserve(files.size());
207  for (size_t t = 0; t < files.size(); ++t)
208  seq.emplace_back(files[t].GetReader(consume, /* prefetch */ 0));
209  StartPrefetch(seq, prefetch);
210 
211  return core::make_buffered_multiway_merge_tree<ElementType>(
212  seq.begin(), seq.end(), compare_function);
213  }
214 
215  void PushData(bool consume) final {
216 
217  auto compare_function_1 =
218  [this](const InputTypeFirst& in1, const InputTypeFirst& in2) {
219  return key_extractor1_(in1) < key_extractor1_(in2);
220  };
221 
222  auto compare_function_2 =
223  [this](const InputTypeSecond& in1, const InputTypeSecond& in2) {
224  return key_extractor2_(in1) < key_extractor2_(in2);
225  };
226 
227  // no possible join results when at least one data set is empty
228  if (!files1_.size() || !files2_.size())
229  return;
230 
231  //! Merge files when there are too many for the merge tree.
232  MergeFiles<InputTypeFirst>(files1_, compare_function_1);
233  MergeFiles<InputTypeSecond>(files2_, compare_function_2);
234 
235  std::vector<data::File::Reader> seq1;
236  std::vector<data::File::Reader> seq2;
237 
238  // construct output merger of remaining Files
239  auto puller1 = MakePuller<InputTypeFirst>(
240  files1_, seq1, compare_function_1, consume);
241  auto puller2 = MakePuller<InputTypeSecond>(
242  files2_, seq2, compare_function_2, consume);
243 
244  bool puller1_done = false;
245  if (!puller1.HasNext())
246  puller1_done = true;
247 
248  bool puller2_done = false;
249  if (!puller2.HasNext())
250  puller2_done = true;
251 
252  //! cache for elements with equal keys, cartesian product of both caches
253  //! are joined with the join_function
254  std::vector<InputTypeFirst> equal_keys1;
255  std::vector<InputTypeSecond> equal_keys2;
256 
257  while (!puller1_done && !puller2_done) {
258  //! find elements with equal key
259  if (key_extractor1_(puller1.Top()) <
260  key_extractor2_(puller2.Top())) {
261  if (!puller1.Update()) {
262  puller1_done = true;
263  break;
264  }
265  }
266  else if (key_extractor2_(puller2.Top()) <
267  key_extractor1_(puller1.Top())) {
268  if (!puller2.Update()) {
269  puller2_done = true;
270  break;
271  }
272  }
273  else {
274  bool external1 = false;
275  bool external2 = false;
276  equal_keys1.clear();
277  equal_keys2.clear();
278  std::tie(puller1_done, external1) =
279  AddEqualKeysToVec(equal_keys1, puller1,
281 
282  std::tie(puller2_done, external2) =
283  AddEqualKeysToVec(equal_keys2, puller2,
285 
286  JoinAllElements(equal_keys1, external1, equal_keys2, external2);
287  }
288  }
289  }
290 
291  void Dispose() final {
292  files1_.clear();
293  files2_.clear();
294  }
295 
296 private:
297  //! files for sorted datasets
298  std::deque<data::File> files1_;
299  std::deque<data::File> files2_;
300 
301  //! user-defined functions
302  KeyExtractor1 key_extractor1_;
303  KeyExtractor2 key_extractor2_;
304  JoinFunction join_function_;
305  HashFunction hash_function_;
306 
307  //! data streams for inter-worker communication of DIA elements
312 
313  //! location detection and associated files
318 
321 
322  void PreOp1(const InputTypeFirst& input) {
323  size_t hash = hash_function_(key_extractor1_(input));
324  if (UseLocationDetection) {
325  pre_writer1_.Put(input);
326  location_detection_.Insert(HashCount { hash, 1, /* dia_mask */ 1 });
327  }
328  else {
329  hash_writers1_[hash % context_.num_workers()].Put(input);
330  }
331  }
332 
333  void PreOp2(const InputTypeSecond& input) {
334  size_t hash = hash_function_(key_extractor2_(input));
335  if (UseLocationDetection) {
336  pre_writer2_.Put(input);
337  location_detection_.Insert(HashCount { hash, 1, /* dia_mask */ 2 });
338  }
339  else {
340  hash_writers2_[hash % context_.num_workers()].Put(input);
341  }
342  }
343 
344  //! Receive elements from other workers, create pre-sorted files
345  void MainOp() {
346  data::MixStream::MixReader reader1_ =
347  hash_stream1_->GetMixReader(/* consume */ true);
348 
349  size_t capacity = DIABase::mem_limit_ / sizeof(InputTypeFirst) / 2;
350 
351  ReceiveItems<InputTypeFirst>(capacity, reader1_, files1_, key_extractor1_);
352 
353  data::MixStream::MixReader reader2_ =
354  hash_stream2_->GetMixReader(/* consume */ true);
355 
356  capacity = DIABase::mem_limit_ / sizeof(InputTypeSecond) / 2;
357 
358  ReceiveItems<InputTypeSecond>(capacity, reader2_, files2_, key_extractor2_);
359  }
360 
361  template <typename ItemType>
362  size_t JoinCapacity() {
363  return DIABase::mem_limit_ / sizeof(ItemType) / 4;
364  }
365 
366  /*!
367  * Adds all elements from merge tree to a vector, afterwards sets the first_element
368  * pointer to the first element with a different key.
369  *
370  * \param vec target vector
371  *
372  * \param puller Input merge tree
373  *
374  * \param key_extractor Key extractor function
375  *
376  * \param file_ptr Pointer to a data::File
377  *
378  * \return Pair of bools, first bool indicates whether the merge tree is
379  * emptied, second bool indicates whether external memory was needed.
380  */
381  template <typename ItemType, typename KeyExtractor, typename MergeTree>
382  std::pair<bool, bool> AddEqualKeysToVec(
383  std::vector<ItemType>& vec, MergeTree& puller,
384  const KeyExtractor& key_extractor, data::FilePtr& file_ptr) {
385 
386  vec.push_back(puller.Top());
387  Key key = key_extractor(puller.Top());
388 
389  size_t capacity = JoinCapacity<ItemType>();
390 
391  if (!puller.Update())
392  return std::make_pair(true, false);
393 
394  while (key_extractor(puller.Top()) == key) {
395 
396  if (!mem::memory_exceeded && vec.size() < capacity) {
397  vec.push_back(puller.Top());
398  }
399  else {
400  file_ptr = context_.GetFilePtr(this);
401  data::File::Writer writer = file_ptr->GetWriter();
402  for (const ItemType& item : vec) {
403  writer.Put(item);
404  }
405  writer.Put(puller.Top());
406  //! vec is very large when this happens
407  //! swap with empty vector to free the memory
408  tlx::vector_free(vec);
409 
410  return AddEqualKeysToFile(puller, key_extractor, writer, key);
411  }
412 
413  if (!puller.Update())
414  return std::make_pair(true, false);
415  }
416 
417  return std::make_pair(false, false);
418  }
419 
420  /*!
421  * Adds all elements from merge tree to a data::File, potentially to external memory,
422  * afterwards sets the first_element pointer to the first element with a different key.
423  *
424  * \param puller Input merge tree
425  *
426  * \param key_extractor Key extractor function
427  *
428  * \param writer File writer
429  *
430  * \param key target key
431  *
432  * \return Pair of bools, first bool indicates whether the merge tree is
433  * emptied, second bool indicates whether external memory was needed (always true, when
434  * this method was called).
435  */
436  template <typename KeyExtractor, typename MergeTree>
437  std::pair<bool, bool> AddEqualKeysToFile(
438  MergeTree& puller, const KeyExtractor& key_extractor,
439  data::File::Writer& writer, const Key& key) {
440  if (!puller.Update()) {
441  return std::make_pair(true, true);
442  }
443 
444  while (key_extractor(puller.Top()) == key) {
445  writer.Put(puller.Top());
446  if (!puller.Update())
447  return std::make_pair(true, true);
448  }
449 
450  return std::make_pair(false, true);
451  }
452 
454  return DIAMemUse::Max();
455  }
456 
457  void StartPreOp(size_t parent_index) final {
458  LOG << *this << " running StartPreOp parent_index=" << parent_index;
459  if (!location_detection_initialized_ && UseLocationDetection) {
462  }
463 
464  auto ids = this->parent_ids();
465 
466  if (parent_index == 0) {
468  }
469  if (parent_index == 1) {
471  }
472  }
473 
474  void StopPreOp(size_t parent_index) final {
475  LOG << *this << " running StopPreOp parent_index=" << parent_index;
476 
477  if (parent_index == 0) {
479  }
480  if (parent_index == 1) {
482  }
483  }
484 
486  return DIAMemUse::Max();
487  }
488 
490  return DIAMemUse::Max();
491  }
492 
493  /*!
494  * Recieve all elements from a stream and write them to files sorted by key.
495  */
496  template <typename ItemType, typename KeyExtractor>
498  size_t capacity, data::MixStream::MixReader& reader,
499  std::deque<data::File>& files, const KeyExtractor& key_extractor) {
500 
501  std::vector<ItemType> vec;
502  vec.reserve(capacity);
503 
504  while (reader.HasNext()) {
505  if (vec.size() < capacity) {
506  vec.push_back(reader.template Next<ItemType>());
507  }
508  else {
509  SortAndWriteToFile(vec, files, key_extractor);
510  }
511  }
512 
513  if (vec.size())
514  SortAndWriteToFile(vec, files, key_extractor);
515  }
516 
517  /*!
518  * Merge files when there are too many for the merge tree to handle
519  */
520  template <typename ItemType, typename CompareFunction>
521  void MergeFiles(std::deque<data::File>& files,
522  CompareFunction compare_function) {
523 
524  size_t merge_degree, prefetch;
525 
526  // merge batches of files if necessary
527  while (std::tie(merge_degree, prefetch) =
529  files.size() > merge_degree)
530  {
531  sLOG1 << "Partial multi-way-merge of"
532  << merge_degree << "files with prefetch" << prefetch;
533 
534  // create merger for first merge_degree_ Files
535  std::vector<data::File::ConsumeReader> seq;
536  seq.reserve(merge_degree);
537 
538  for (size_t t = 0; t < merge_degree; ++t)
539  seq.emplace_back(files[t].GetConsumeReader(/* prefetch */ 0));
540 
541  StartPrefetch(seq, prefetch);
542 
543  auto puller = core::make_multiway_merge_tree<ItemType>(
544  seq.begin(), seq.end(), compare_function);
545 
546  // create new File for merged items
547  files.emplace_back(context_.GetFile(this));
548  auto writer = files.back().GetWriter();
549 
550  while (puller.HasNext()) {
551  writer.Put(puller.Next());
552  }
553  writer.Close();
554 
555  // this clear is important to release references to the files.
556  seq.clear();
557 
558  // remove merged files
559  files.erase(files.begin(), files.begin() + merge_degree);
560  }
561  }
562 
565 
566  /*!
567  * Joins all elements in cartesian product of both vectors. Uses files when
568  * one of the data sets is too large to fit in memory. (indicated by
569  * 'external' bools)
570  */
572  const std::vector<InputTypeFirst>& vec1, bool external1,
573  const std::vector<InputTypeSecond>& vec2, bool external2) {
574 
575  if (!external1 && !external2) {
576  for (const InputTypeFirst& join1 : vec1) {
577  for (const InputTypeSecond& join2 : vec2) {
578  assert(key_extractor1_(join1) == key_extractor2_(join2));
579  this->PushItem(join_function_(join1, join2));
580  }
581  }
582  }
583  else if (external1 && !external2) {
584  LOG1 << "Thrill: Warning: Too many equal keys for main memory "
585  << "in first DIA";
586 
587  data::File::ConsumeReader reader = join_file1_->GetConsumeReader();
588 
589  while (reader.HasNext()) {
590  InputTypeFirst join1 = reader.template Next<InputTypeFirst>();
591  for (auto const& join2 : vec2) {
592  assert(key_extractor1_(join1) == key_extractor2_(join2));
593  this->PushItem(join_function_(join1, join2));
594  }
595  }
596  }
597  else if (!external1 && external2) {
598  LOG1 << "Thrill: Warning: Too many equal keys for main memory "
599  << "in second DIA";
600 
601  data::File::ConsumeReader reader = join_file2_->GetConsumeReader();
602 
603  while (reader.HasNext()) {
604  InputTypeSecond join2 = reader.template Next<InputTypeSecond>();
605  for (const InputTypeFirst& join1 : vec1) {
606  assert(key_extractor1_(join1) == key_extractor2_(join2));
607  this->PushItem(join_function_(join1, join2));
608  }
609  }
610  }
611  else if (external1 && external2) {
612  LOG1 << "Thrill: Warning: Too many equal keys for main memory "
613  << "in both DIAs. This is very slow.";
614 
615  size_t capacity = JoinCapacity<InputTypeFirst>();
616 
617  std::vector<InputTypeFirst> temp_vec;
618  temp_vec.reserve(capacity);
619 
620  //! file 2 needs to be read multiple times
621  data::File::ConsumeReader reader1 = join_file1_->GetConsumeReader();
622 
623  while (reader1.HasNext()) {
624 
625  for (size_t i = 0; i < capacity && reader1.HasNext() &&
626  !mem::memory_exceeded; ++i) {
627  temp_vec.push_back(reader1.template Next<InputTypeFirst>());
628  }
629 
630  data::File::Reader reader2 = join_file2_->GetReader(/* consume */ false);
631 
632  while (reader2.HasNext()) {
633  InputTypeSecond join2 = reader2.template Next<InputTypeSecond>();
634  for (const InputTypeFirst& join1 : temp_vec) {
635  assert(key_extractor1_(join1) == key_extractor2_(join2));
636  this->PushItem(join_function_(join1, join2));
637  }
638  }
639  temp_vec.clear();
640  }
641 
642  //! non-consuming reader, need to clear now
643  join_file2_->Clear();
644  }
645  }
646 
647  /*!
648  * Sorts all elements in a vector and writes them to a file.
649  */
650  template <typename ItemType, typename KeyExtractor>
652  std::vector<ItemType>& vec, std::deque<data::File>& files,
653  const KeyExtractor& key_extractor) {
654 
655  // advise block pool to write out data if necessary
656  context_.block_pool().AdviseFree(vec.size() * sizeof(ValueType));
657 
658  std::sort(vec.begin(), vec.end(),
659  [&key_extractor](const ItemType& i1, const ItemType& i2) {
660  return key_extractor(i1) < key_extractor(i2);
661  });
662 
663  files.emplace_back(context_.GetFile(this));
664  auto writer = files.back().GetWriter();
665  for (const ItemType& elem : vec) {
666  writer.Put(elem);
667  }
668  writer.Close();
669 
670  vec.clear();
671  }
672 };
673 
674 /*!
675  * Performs an inner join between the two given DIAs, first_dia and
676  * second_dia. The key from each DIA element is hereby extracted with a key
677  * extractor function. All pairs of elements with equal keys from both DIAs are
678  * then joined with the join function.
679  *
680  * \tparam KeyExtractor1 Type of the key_extractor1 function. This is a function
681  * from FirstDIA::ValueType to the key type.
682  *
683  * \tparam KeyExtractor2 Type of the key_extractor2 function. This is a function
684  * from SecondDIA::ValueType to the key type.
685  *
686  * \tparam JoinFunction Type of the join_function. This is a function from
687  * FirstDIA::ValueType and SecondDIA::ValueType to the type of the output DIA.
688  *
689  * \param first_dia First DIA to join.
690  *
691  * \param second_dia Second DIA to join.
692  *
693  * \param key_extractor1 Key extractor for first DIA
694  *
695  * \param key_extractor2 Key extractor for second DIA
696  *
697  * \param join_function Join function applied to all equal key pairs
698  *
699  * \param hash_function If necessary a hash function for Key
700  *
701  * \ingroup dia_dops
702  */
703 template <
704  bool LocationDetectionValue,
705  typename FirstDIA,
706  typename SecondDIA,
707  typename KeyExtractor1,
708  typename KeyExtractor2,
709  typename JoinFunction,
710  typename HashFunction =
711  std::hash<typename common::FunctionTraits<KeyExtractor1>::result_type> >
714  const FirstDIA& first_dia, const SecondDIA& second_dia,
715  const KeyExtractor1& key_extractor1, const KeyExtractor2& key_extractor2,
716  const JoinFunction& join_function,
717  const HashFunction& hash_function = HashFunction()) {
718 
719  assert(first_dia.IsValid());
720  assert(second_dia.IsValid());
721 
722  static_assert(
723  std::is_convertible<
724  typename FirstDIA::ValueType,
725  typename common::FunctionTraits<KeyExtractor1>::template arg<0>
726  >::value,
727  "Key Extractor 1 has the wrong input type");
728 
729  static_assert(
730  std::is_convertible<
731  typename SecondDIA::ValueType,
732  typename common::FunctionTraits<KeyExtractor2>::template arg<0>
733  >::value,
734  "Key Extractor 2 has the wrong input type");
735 
736  static_assert(
737  std::is_convertible<
738  typename common::FunctionTraits<KeyExtractor1>::result_type,
739  typename common::FunctionTraits<KeyExtractor2>::result_type
740  >::value,
741  "Keys have different types");
742 
743  static_assert(
744  std::is_convertible<
745  typename FirstDIA::ValueType,
746  typename common::FunctionTraits<JoinFunction>::template arg<0>
747  >::value,
748  "Join Function has wrong input type in argument 0");
749 
750  static_assert(
751  std::is_convertible<
752  typename SecondDIA::ValueType,
753  typename common::FunctionTraits<JoinFunction>::template arg<1>
754  >::value,
755  "Join Function has wrong input type in argument 1");
756 
757  using JoinResult
758  = typename common::FunctionTraits<JoinFunction>::result_type;
759 
760  using JoinNode = api::JoinNode<
761  JoinResult, FirstDIA, SecondDIA, KeyExtractor1, KeyExtractor2,
762  JoinFunction, HashFunction, LocationDetectionValue>;
763 
764  auto node = tlx::make_counting<JoinNode>(
765  first_dia, second_dia, key_extractor1, key_extractor2, join_function,
766  hash_function);
767 
768  return DIA<JoinResult>(node);
769 }
770 
771 /*!
772  * Performs an inner join between the two given DIAs, first_dia and
773  * second_dia. The key from each DIA element is hereby extracted with a key
774  * extractor function. All pairs of elements with equal keys from both DIAs are
775  * then joined with the join function.
776  *
777  * \tparam KeyExtractor1 Type of the key_extractor1 function. This is a function
778  * from FirstDIA::ValueType to the key type.
779  *
780  * \tparam KeyExtractor2 Type of the key_extractor2 function. This is a function
781  * from SecondDIA::ValueType to the key type.
782  *
783  * \tparam JoinFunction Type of the join_function. This is a function from
784  * FirstDIA::ValueType and SecondDIA::ValueType to the type of the output DIA.
785  *
786  * \param first_dia First DIA to join.
787  *
788  * \param second_dia Second DIA to join.
789  *
790  * \param key_extractor1 Key extractor for first DIA
791  *
792  * \param key_extractor2 Key extractor for second DIA
793  *
794  * \param join_function Join function applied to all equal key pairs
795  *
796  * \param hash_function If necessary a hash function for Key
797  *
798  * \ingroup dia_dops
799  */
800 template <
801  typename FirstDIA,
802  typename SecondDIA,
803  typename KeyExtractor1,
804  typename KeyExtractor2,
805  typename JoinFunction,
806  typename HashFunction =
807  std::hash<typename common::FunctionTraits<KeyExtractor1>::result_type> >
809  const FirstDIA& first_dia, const SecondDIA& second_dia,
810  const KeyExtractor1& key_extractor1, const KeyExtractor2& key_extractor2,
811  const JoinFunction& join_function,
812  const HashFunction& hash_function = HashFunction()) {
813  // forward to method _with_ location detection ON
814  return InnerJoin(
816  first_dia, second_dia, key_extractor1, key_extractor2,
817  join_function, hash_function);
818 }
819 
820 //! \}
821 
822 } // namespace api
823 
824 //! imported from api namespace
825 using api::InnerJoin;
826 
827 } // namespace thrill
828 
829 #endif // !THRILL_API_INNER_JOIN_HEADER
830 
831 /******************************************************************************/
std::pair< bool, bool > AddEqualKeysToFile(MergeTree &puller, const KeyExtractor &key_extractor, data::File::Writer &writer, const Key &key)
Adds all elements from merge tree to a data::File, potentially to external memory, afterwards sets the first_element pointer to the first element with a different key.
Definition: inner_join.hpp:437
void StartPrefetch(std::vector< Reader > &readers, size_t prefetch_size)
Take a vector of Readers and prefetch equally from them.
Definition: file.hpp:585
bool HasNext()
HasNext() returns true if at least one more item is available.
DIAMemUse PushDataMemUse() final
Amount of RAM used by PushData()
Definition: inner_join.hpp:489
DIAMemUse ExecuteMemUse() final
Amount of RAM used by Execute()
Definition: inner_join.hpp:485
data::MixStream::Writers hash_writers1_
Definition: inner_join.hpp:309
static DIAMemUse Max()
Definition: dia_base.hpp:60
data::File pre_file1_
location detection and associated files
Definition: inner_join.hpp:314
DIA is the interface between the user and the Thrill framework.
Definition: dia.hpp:141
bool operator<(const HashCount &b) const
Definition: inner_join.hpp:107
Description of the amount of RAM the internal data structures of a DIANode require.
Definition: dia_base.hpp:51
void ReadBits(BitReader &reader)
Read count and dia_mask from BitReader.
Definition: inner_join.hpp:118
JoinNode(const FirstDIA &parent1, const SecondDIA &parent2, const KeyExtractor1 &key_extractor1, const KeyExtractor2 &key_extractor2, const JoinFunction &join_function, const HashFunction &hash_function)
Constructor for a JoinNode.
Definition: inner_join.hpp:135
data::File::Writer pre_writer1_
Definition: inner_join.hpp:315
data::FilePtr join_file2_
Definition: inner_join.hpp:564
void AdviseFree(size_t size)
const struct LocationDetectionFlag< true > LocationDetectionTag
global const LocationDetectionFlag instance
Definition: dia.hpp:122
JoinFunction join_function_
Definition: inner_join.hpp:304
data::File::Writer pre_writer2_
Definition: inner_join.hpp:317
std::pair< size_t, size_t > MaxMergeDegreePrefetch(size_t num_files)
Definition: block_pool.cpp:703
#define LOG1
Definition: logger.hpp:28
auto InnerJoin(const LocationDetectionFlag< LocationDetectionValue > &, const FirstDIA &first_dia, const SecondDIA &second_dia, const KeyExtractor1 &key_extractor1, const KeyExtractor2 &key_extractor2, const JoinFunction &join_function, const HashFunction &hash_function=HashFunction())
Performs an inner join between this DIA and the DIA given in the first parameter. ...
Definition: inner_join.hpp:712
A File is an ordered sequence of Block objects for storing items.
Definition: file.hpp:56
static constexpr bool debug
Definition: inner_join.hpp:66
BlockWriter contains a temporary Block object into which a) any serializable item can be stored or b)...
bool memory_exceeded
memory limit exceeded indicator
core::LocationDetection< HashCount > location_detection_
Definition: inner_join.hpp:319
#define sLOG1
Definition: logger.hpp:38
void ReceiveItems(size_t capacity, data::MixStream::MixReader &reader, std::deque< data::File > &files, const KeyExtractor &key_extractor)
Receive all elements from a stream and write them to files sorted by key.
Definition: inner_join.hpp:497
void PreOp2(const InputTypeSecond &input)
Definition: inner_join.hpp:333
void Dispose() final
Virtual clear method. Triggers actual disposing in sub-classes.
Definition: inner_join.hpp:291
void PushData(bool consume) final
Virtual method for pushing data. Triggers actual pushing in sub-classes.
Definition: inner_join.hpp:215
KeyExtractor2 key_extractor2_
Definition: inner_join.hpp:303
typename FirstDIA::ValueType InputTypeFirst
Definition: inner_join.hpp:71
void JoinAllElements(const std::vector< InputTypeFirst > &vec1, bool external1, const std::vector< InputTypeSecond > &vec2, bool external2)
Joins all elements in cartesian product of both vectors.
Definition: inner_join.hpp:571
void WriteBits(BitWriter &writer) const
Write count and dia_mask to BitWriter.
Definition: inner_join.hpp:125
void PushItem(const ValueType &item) const
Method for derived classes to Push a single item to all children.
Definition: dia_node.hpp:147
Reader to retrieve items in unordered sequence from a MixBlockQueue.
ConsumeReader GetConsumeReader(size_t prefetch_size=File::default_prefetch_size_)
Get consuming BlockReader for beginning of File.
Definition: file.cpp:73
DIAMemUse PreOpMemUse() final
Amount of RAM used by PreOp after StartPreOp()
Definition: inner_join.hpp:453
CompareFunction auto MakePuller(std::deque< data::File > &files, std::vector< data::File::Reader > &seq, CompareFunction compare_function, bool consume)
Definition: inner_join.hpp:198
void MainOp()
Receive elements from other workers, create pre-sorted files.
Definition: inner_join.hpp:345
data::MixStreamPtr hash_stream2_
Definition: inner_join.hpp:310
virtual void Execute()=0
Virtual execution method. Triggers actual computation in sub-classes.
int value
Definition: gen_data.py:41
BlockReader takes Block objects from BlockSource and allows reading of a) serializable Items or b) ar...
StreamData::Writers Writers
Definition: stream.hpp:40
KeyExtractor1 key_extractor1_
user-defined functions
Definition: inner_join.hpp:302
std::vector< size_t > parent_ids() const
Returns the parents of this DIABase.
Definition: dia_base.hpp:258
std::deque< data::File > files2_
Definition: inner_join.hpp:299
tlx::counting_ptr< file > file_ptr
A reference counting pointer for file.
Definition: file.hpp:265
data::FilePtr GetFilePtr(size_t dia_id)
Definition: context.cpp:1142
void StartPreOp(size_t parent_index) final
Virtual method for preparing start of PushData.
Definition: inner_join.hpp:457
static IntegerType AddTruncToType(const IntegerType &a, const IntegerType &b)
Definition: math.hpp:31
data::File GetFile(size_t dia_id)
Returns a new File object containing a sequence of local Blocks.
Definition: context.hpp:283
data::MixStreamPtr GetNewMixStream(size_t dia_id)
Definition: context.cpp:1159
HashCount operator+(const HashCount &b) const
Definition: inner_join.hpp:91
DIAMemUse mem_limit_
Definition: dia_base.hpp:314
void vector_free(std::vector< Type > &v)
Definition: vector_free.hpp:21
A DOpNode is a typed node representing distributed operations in Thrill.
Definition: dop_node.hpp:32
bool location_detection_initialized_
Definition: inner_join.hpp:320
typename SecondDIA::ValueType InputTypeSecond
Definition: inner_join.hpp:72
hash counter used by LocationDetection
Definition: inner_join.hpp:78
HashCount & operator+=(const HashCount &b)
Definition: inner_join.hpp:100
data::MixStreamPtr hash_stream1_
data streams for inter-worker communication of DIA elements
Definition: inner_join.hpp:308
TLX_ATTRIBUTE_ALWAYS_INLINE BlockWriter & Put(const T &x)
Put appends a complete item, or fails with a FullException.
std::pair< bool, bool > AddEqualKeysToVec(std::vector< ItemType > &vec, MergeTree &puller, const KeyExtractor &key_extractor, data::FilePtr &file_ptr)
Adds all elements from merge tree to a vector, afterwards sets the first_element pointer to the first...
Definition: inner_join.hpp:382
typename common::FunctionTraits< KeyExtractor1 >::result_type Key
Key type of join. must be equal to the other key extractor.
Definition: inner_join.hpp:75
data::FilePtr join_file1_
Definition: inner_join.hpp:563
void StopPreOp(size_t parent_index) final
Virtual method for preparing end of PushData.
Definition: inner_join.hpp:474
void Close()
Explicitly close the writer.
static constexpr size_t counter_bits_
Definition: inner_join.hpp:89
TLX_ATTRIBUTE_ALWAYS_INLINE bool HasNext()
HasNext() returns true if at least one more item is available.
size_t num_workers() const
Global number of workers in the system.
Definition: context.hpp:251
tag structure for GroupByKey(), and InnerJoin()
Definition: dia.hpp:116
void SortAndWriteToFile(std::vector< ItemType > &vec, std::deque< data::File > &files, const KeyExtractor &key_extractor)
Sorts all elements in a vector and writes them to a file.
Definition: inner_join.hpp:651
data::MixStream::Writers hash_writers2_
Definition: inner_join.hpp:311
std::deque< data::File > files1_
files for sorted datasets
Definition: inner_join.hpp:298
void PreOp1(const InputTypeFirst &input)
Definition: inner_join.hpp:322
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:24
HashFunction hash_function_
Definition: inner_join.hpp:305
Writer GetWriter(size_t block_size=default_block_size)
Get BlockWriter.
Definition: file.cpp:63
Context & context_
associated Context
Definition: dia_base.hpp:293
data::BlockPool & block_pool()
the block manager keeps all data blocks moving through the system.
Definition: context.hpp:324
const size_t & dia_id() const
return unique id of DIANode subclass as stored by StatsNode
Definition: dia_base.hpp:213
void MergeFiles(std::deque< data::File > &files, CompareFunction compare_function)
Merge files when there are too many for the merge tree to handle.
Definition: inner_join.hpp:521
Performs an inner join between two DIAs.
Definition: inner_join.hpp:63