Thrill  0.1
page_rank.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * examples/page_rank/page_rank.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2016 Timo Bingmann <[email protected]>
7  * Copyright (C) 2016 Alexander Noe <[email protected]>
8  *
9  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10  ******************************************************************************/
11 
12 #pragma once
13 #ifndef THRILL_EXAMPLES_PAGE_RANK_PAGE_RANK_HEADER
14 #define THRILL_EXAMPLES_PAGE_RANK_PAGE_RANK_HEADER
15 
16 #include <thrill/api/collapse.hpp>
17 #include <thrill/api/generate.hpp>
19 #include <thrill/api/print.hpp>
22 #include <thrill/api/size.hpp>
23 #include <thrill/api/zip.hpp>
24 #include <thrill/common/logger.hpp>
25 
27 
28 #include <algorithm>
29 #include <string>
30 #include <utility>
31 #include <vector>
32 
33 namespace examples {
34 namespace page_rank {
35 
36 using namespace thrill; // NOLINT
37 
//! Compile-time switch: when true, both PageRank variants print the
//! intermediate "outs_rank" DIA in every iteration.
static constexpr bool debug = false;

//! Damping factor of the rank update:
//! new_rank = dampening * (sum of contributions) + (1 - dampening) / num_pages
static constexpr double dampening = 0.85;

//! Identifier of a page.
using PageId = std::size_t;
//! Rank value of a page.
using Rank = double;
44 
45 //! A pair (page source, page target)
46 struct PagePageLink {
47  PageId src, tgt;
48 
49  friend std::ostream& operator << (std::ostream& os, const PagePageLink& a) {
50  return os << '(' << a.src << '>' << a.tgt << ')';
51  }
53 
54 //! A pair (page, rank)
55 struct PageRankPair {
58 
59  friend std::ostream& operator << (std::ostream& os, const PageRankPair& a) {
60  return os << '(' << a.page << '|' << a.rank << ')';
61  }
63 
64 using PageRankStdPair = std::pair<PageId, Rank>;
65 using OutgoingLinks = std::vector<PageId>;
66 using OutgoingLinksRank = std::pair<std::vector<PageId>, Rank>;
67 using LinkedPage = std::pair<PageId, OutgoingLinks>;
68 using RankedPage = std::pair<PageId, Rank>;
69 
70 template <typename InStack>
72  size_t num_pages, size_t iterations) {
73 
74  api::Context& ctx = links.context();
75  double num_pages_d = static_cast<double>(num_pages);
76 
77  // initialize all ranks to 1.0 / n: (url, rank)
78 
79  DIA<Rank> ranks =
80  Generate(
81  ctx, num_pages,
82  [num_pages_d](size_t) { return Rank(1.0) / num_pages_d; })
83  .Collapse();
84 
85  // do iterations
86  for (size_t iter = 0; iter < iterations; ++iter) {
87 
88  // for all outgoing link, get their rank contribution from all
89  // links by doing:
90  //
91  // 1) group all outgoing links with rank of its parent page: (Zip)
92  // ([linked_url, linked_url, ...], rank_parent)
93  //
94  // 2) compute rank contribution for each linked_url: (FlatMap)
95  // (linked_url, rank / outgoing.size)
96 
97  auto outs_rank = links.Zip(
98  ranks,
99  [](const OutgoingLinks& ol, const Rank& r) {
100  return OutgoingLinksRank(ol, r);
101  });
102 
103  if (debug) {
104  outs_rank
105  .Map([](const OutgoingLinksRank& ol) {
106  return tlx::join(',', ol.first)
107  + " <- " + std::to_string(ol.second);
108  })
109  .Print("outs_rank");
110  }
111 
112  auto contribs = outs_rank.template FlatMap<PageRankPair>(
113  [](const OutgoingLinksRank& p, auto emit) {
114  if (p.first.size() == 0)
115  return;
116 
117  Rank rank_contrib = p.second / static_cast<double>(p.first.size());
118  for (const PageId& tgt : p.first)
119  emit(PageRankPair { tgt, rank_contrib });
120  });
121 
122  // reduce all rank contributions by adding all rank contributions and
123  // compute the new rank: (url, rank)
124 
125  ranks =
126  contribs
127  .ReduceToIndex(
128  [](const PageRankPair& p) { return p.page; },
129  [](const PageRankPair& p1, const PageRankPair& p2) {
130  return PageRankPair { p1.page, p1.rank + p2.rank };
131  }, num_pages)
132  .Map([num_pages_d](const PageRankPair& p) {
133  return dampening * p.rank + (1 - dampening) / num_pages_d;
134  })
135  .Collapse();
136  }
137 
138  return ranks;
139 }
140 
141 template <const bool UseLocationDetection = false, typename InStack>
142 auto PageRankJoin(const DIA<LinkedPage, InStack>& links, size_t num_pages,
143  size_t iterations) {
144 
145  api::Context& ctx = links.context();
146  double num_pages_d = static_cast<double>(num_pages);
147 
148  // initialize all ranks to 1.0 / n: (url, rank)
149 
150  DIA<RankedPage> ranks =
151  Generate(
152  ctx, num_pages,
153  [num_pages_d](size_t idx) {
154  return std::make_pair(idx, Rank(1.0) / num_pages_d);
155  })
156  .Collapse();
157 
158  // do iterations
159  for (size_t iter = 0; iter < iterations; ++iter) {
160 
161  // for all outgoing link, get their rank contribution from all
162  // links by doing:
163  //
164  // 1) group all outgoing links with rank of its parent page: (Zip)
165  // ([linked_url, linked_url, ...], rank_parent)
166  //
167  // 2) compute rank contribution for each linked_url: (FlatMap)
168  // (linked_url, rank / outgoing.size)
169 
170  auto outs_rank = InnerJoin(
172  links, ranks,
173  [](const LinkedPage& lp) { return lp.first; },
174  [](const RankedPage& rp) { return rp.first; },
175  [](const LinkedPage& lp, const RankedPage& rp) {
176  return std::make_pair(lp.second, rp.second);
177  });
178 
179  if (debug) {
180  outs_rank
181  .Map([](const OutgoingLinksRank& ol) {
182  return tlx::join(',', ol.first)
183  + " <- " + std::to_string(ol.second);
184  })
185  .Print("outs_rank");
186  }
187 
188  auto contribs = outs_rank.template FlatMap<PageRankStdPair>(
189  [](const OutgoingLinksRank& p, auto emit) {
190  if (p.first.size() > 0) {
191  Rank rank_contrib = p.second / static_cast<double>(p.first.size());
192  for (const PageId& tgt : p.first)
193  emit(std::make_pair(tgt, rank_contrib));
194  }
195  });
196 
197  // reduce all rank contributions by adding all rank contributions and
198  // compute the new rank: (url, rank)
199 
200  ranks =
201  contribs
202  .ReducePair(
203  [](const Rank& p1, const Rank& p2) {
204  return p1 + p2;
205  })
206  .Map([num_pages_d](const PageRankStdPair& p) {
207  return std::make_pair(
208  p.first,
209  dampening * p.second + (1 - dampening) / num_pages_d);
210  }).Collapse();
211 
212  ranks.Execute();
213  }
214 
215  return ranks;
216 }
217 
218 } // namespace page_rank
219 } // namespace examples
220 
221 #endif // !THRILL_EXAMPLES_PAGE_RANK_PAGE_RANK_HEADER
222 
223 /******************************************************************************/
DIA is the interface between the user and the Thrill framework.
Definition: dia.hpp:141
auto Generate(Context &ctx, size_t size, const GenerateFunction &generate_function)
Generate is a Source-DOp, which creates a DIA of given size using a generator function.
Definition: generate.hpp:87
auto ReducePair(const ReduceFunction &reduce_function, const ReduceConfig &reduce_config=ReduceConfig()) const
ReducePair is a DOp, which groups key-value-pairs in the input DIA by their key and reduces each key-...
std::size_t PageId
Definition: page_rank.hpp:42
auto ReduceToIndex(const KeyExtractor &key_extractor, const ReduceFunction &reduce_function, size_t size, const ValueType &neutral_element=ValueType(), const ReduceConfig &reduce_config=ReduceConfig()) const
ReduceToIndex is a DOp, which groups elements of the DIA with the key_extractor returning an unsigned...
Definition: bfs.hpp:21
Context & context() const
Return context_ of DIANode, e.g. for creating new LOps and DOps.
Definition: dia.hpp:273
std::pair< PageId, Rank > RankedPage
Definition: page_rank.hpp:68
auto InnerJoin(const LocationDetectionFlag< LocationDetectionValue > &, const FirstDIA &first_dia, const SecondDIA &second_dia, const KeyExtractor1 &key_extractor1, const KeyExtractor2 &key_extractor2, const JoinFunction &join_function, const HashFunction &hash_function=HashFunction())
Performs an inner join between this DIA and the DIA given in the first parameter. ...
Definition: inner_join.hpp:710
The Context of a job is a unique instance per worker which holds references to all underlying parts of Thrill.
Definition: context.hpp:221
static by_string to_string(int val)
convert to string
struct examples::page_rank::PagePageLink TLX_ATTRIBUTE_PACKED
std::vector< PageId > OutgoingLinks
Definition: page_rank.hpp:65
static constexpr double dampening
Definition: page_rank.hpp:40
static constexpr bool debug
Definition: page_rank.hpp:38
std::pair< PageId, Rank > PageRankStdPair
Definition: page_rank.hpp:64
auto PageRank(const DIA< OutgoingLinks, InStack > &links, size_t num_pages, size_t iterations)
Definition: page_rank.hpp:71
std::pair< PageId, OutgoingLinks > LinkedPage
Definition: page_rank.hpp:67
auto Zip(const SecondDIA &second_dia, const ZipFunction &zip_function) const
Zips two DIAs of equal size in style of functional programming by applying zip_function to the i-th elements of both DIAs.
Definition: zip.hpp:686
auto PageRankJoin(const DIA< LinkedPage, InStack > &links, size_t num_pages, size_t iterations)
Definition: page_rank.hpp:142
std::string join(char glue, const std::vector< std::string > &parts)
Join a vector of strings by some glue character between each pair from the sequence.
Definition: join.cpp:16
tag structure for GroupByKey(), and InnerJoin()
Definition: dia.hpp:116
std::pair< std::vector< PageId >, Rank > OutgoingLinksRank
Definition: page_rank.hpp:66
std::ostream & operator<<(std::ostream &os, const DIABase &d)
make ostream-able.
Definition: dia_base.cpp:449