Thrill  0.1
page_rank_run.cpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * examples/page_rank/page_rank_run.cpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2016 Timo Bingmann <[email protected]>
7  * Copyright (C) 2016 Alexander Noe <[email protected]>
8  *
9  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10  ******************************************************************************/
11 
14 
15 #include <thrill/api/cache.hpp>
18 #include <thrill/api/max.hpp>
20 #include <thrill/api/sum.hpp>
23 #include <thrill/common/logger.hpp>
25 #include <tlx/cmdline_parser.hpp>
26 
27 #include <algorithm>
28 #include <string>
29 #include <utility>
30 #include <vector>
31 
32 using namespace thrill; // NOLINT
33 using namespace examples::page_rank; // NOLINT
34 
35 struct PageRankLineParser {
36  PagePageLink operator () (const std::string& input) const {
37  // parse "source\ttarget\n" lines
38  char* endptr;
39  unsigned long src = std::strtoul(input.c_str(), &endptr, 10);
40  die_unless(endptr && *endptr == '\t' &&
41  "Could not parse src tgt line");
42  unsigned long tgt = std::strtoul(endptr + 1, &endptr, 10);
43  die_unless(endptr && *endptr == 0 &&
44  "Could not parse src tgt line");
45  return PagePageLink { src, tgt };
46  }
47 };
48 
// RunPageRankEdgePerLine(): reads an edge list with one "source\ttarget"
// pair per line, groups the targets of each source page into OutgoingLinks
// indexed by page id, runs PageRank() for `iterations` rounds and optionally
// writes "pageid: rank" lines to output_path. Statistics are logged on
// worker 0.
50  api::Context& ctx,
51  const std::vector<std::string>& input_path, const std::string& output_path,
52  size_t iterations) {
53  ctx.enable_consume();
54 
56 
57  // read input file and create links in this format:
58  //
59  // url linked_url
60  // url linked_url
61  // url linked_url
62  // ...
63  auto input =
64  ReadLines(ctx, input_path)
65  .Map(PageRankLineParser());
66 
// number of pages = largest page id seen as either source or target, plus one
67  size_t num_pages =
68  input.Keep()
69  .Map([](const PagePageLink& ppl) { return std::max(ppl.src, ppl.tgt); })
70  .Max() + 1;
71 
// Keep() so that `input` stays available for the GroupToIndex() below
72  auto number_edges_future = input.Keep().SizeFuture();
73 
74  // aggregate all outgoing links of a page in this format: by index
75  // ([linked_url, linked_url, ...])
76 
77  // group outgoing links from input file
78 
// Grouped by source page id into num_pages slots; pages without outgoing
// edges presumably receive a default (empty) OutgoingLinks -- TODO confirm
// against GroupToIndex semantics.
79  auto links = input.template GroupToIndex<OutgoingLinks>(
80  [](const PagePageLink& p) { return p.src; },
// the captured vector is reused as a buffer across groups (cleared on each
// call) and returned by copy
81  [all = std::vector<PageId> ()](auto& r, const PageId&) mutable {
82  all.clear();
83  while (r.HasNext()) {
84  all.push_back(r.Next().tgt);
85  }
86  return all;
87  },
88  num_pages).Cache();
89 
90  // perform actual page rank calculation iterations
91 
92  auto ranks = PageRank(links, num_pages, iterations);
93 
94  // construct output as "pageid: rank"
95 
96  if (output_path.size()) {
// the zip function receives (element, index): the rank and its position,
// which serves as the page id
97  ranks.ZipWithIndex(
98  // generate index numbers: 0...num_pages-1
99  [](const Rank& r, const PageId& p) {
100  return tlx::ssprintf("%zu: %g", p, r);
101  })
102  .WriteLines(output_path);
103  }
104  else {
// no output requested: presumably Execute() only forces the computation
105  ranks.Execute();
106  }
107 
108  timer.Stop();
109 
110  // SizeFuture must be read by all workers.
111  size_t number_edges = number_edges_future();
112 
113  if (ctx.my_rank() == 0) {
114  LOG1 << "FINISHED PAGERANK COMPUTATION";
115  LOG1 << "#pages: " << num_pages;
116  LOG1 << "#edges: " << number_edges;
117  LOG1 << "#iterations: " << iterations;
118  LOG1 << "time: " << timer << "s";
119  }
120 }
121 
123  api::Context& ctx,
124  const std::vector<std::string>& input_path, const std::string& output_path,
125  size_t iterations) {
126  ctx.enable_consume();
127 
129 
130  const bool UseLocationDetection = true;
131 
132  // read input file and create links in this format:
133  //
134  // url linked_url
135  // url linked_url
136  // url linked_url
137  // ...
138  auto input =
139  ReadLines(ctx, input_path)
140  .Map(PageRankLineParser());
141 
142  size_t num_pages =
143  input.Keep()
144  .Map([](const PagePageLink& ppl) { return std::max(ppl.src, ppl.tgt); })
145  .Max() + 1;
146  // aggregate all outgoing links of a page in this format: by index
147  // ([linked_url, linked_url, ...])
148 
149  // group outgoing links from input file
150 
151  auto links = input.GroupByKey<LinkedPage>(
152  [](const PagePageLink& p) { return p.src; },
153  [all = std::vector<PageId> ()](auto& r, const PageId& pid) mutable {
154  all.clear();
155  while (r.HasNext()) {
156  all.push_back(r.Next().tgt);
157  }
158  return std::make_pair(pid, all);
159  }).Cache().KeepForever();
160 
161  // perform actual page rank calculation iterations
162 
163  auto ranks = PageRankJoin<UseLocationDetection>(
164  links, num_pages, iterations);
165 
166  // construct output as "pageid: rank"
167 
168  if (output_path.size()) {
169  ranks.Map([](const RankedPage& rp) {
170  return tlx::ssprintf("%zu: %g", rp.first, rp.second);
171  }).WriteLines(output_path);
172  }
173  else {
174  ranks.Execute();
175  }
176 
177  timer.Stop();
178 
179  if (ctx.my_rank() == 0) {
180  if (UseLocationDetection) {
181  LOG1 << "RESULT benchmark=pagerank_gen detection=ON"
182  << " pages=" << num_pages
183  << " iterations=" << iterations
184  << " time=" << timer
185  << " traffic= " << ctx.net_manager().Traffic()
186  << " hosts=" << ctx.num_hosts();
187  }
188  else {
189  LOG1 << "RESULT benchmark=pagerank_gen detection=OFF"
190  << " pages=" << num_pages
191  << " iterations=" << iterations
192  << " time=" << timer
193  << " traffic=" << ctx.net_manager().Traffic()
194  << " hosts=" << ctx.num_hosts();
195  }
196  }
197 }
198 
200  api::Context& ctx,
201  const std::string& input_path, const ZipfGraphGen& base_graph_gen,
202  const std::string& output_path, size_t iterations) {
203  ctx.enable_consume();
204 
206 
207  size_t num_pages;
208  if (!common::from_str<size_t>(input_path, num_pages))
209  die("For generated graph data, set input_path to the number of pages.");
210 
211  auto links = Generate(
212  ctx, num_pages,
213  [graph_gen = ZipfGraphGen(base_graph_gen, num_pages),
214  rng = std::default_random_engine(std::random_device { } ())](
215  size_t /* index */) mutable {
216  return graph_gen.GenerateOutgoing(rng);
217  })
218  .Cache();
219 
220  auto number_edges =
221  links.Keep().Map([](const OutgoingLinks& ol) { return ol.size(); }).Sum();
222 
223  // perform actual page rank calculation iterations
224 
225  auto ranks = PageRank(links, num_pages, iterations);
226 
227  // construct output as "pageid: rank"
228 
229  if (output_path.size()) {
230  ranks.ZipWithIndex(
231  // generate index numbers: 0...num_pages-1
232  [](const PageId& p, const Rank& r) {
233  return std::to_string(p) + ": " + std::to_string(r);
234  })
235  .WriteLines(output_path);
236  }
237  else {
238  ranks.Execute();
239  }
240 
241  timer.Stop();
242 
243  if (ctx.my_rank() == 0) {
244  LOG1 << "RESULT"
245  << " benchmark=pagerank_gen"
246  << " pages=" << num_pages
247  << " edges=" << number_edges
248  << " iterations=" << iterations
249  << " time=" << timer
250  << " hosts=" << ctx.num_hosts();
251  }
252 }
253 
255  api::Context& ctx,
256  const std::string& input_path, const ZipfGraphGen& base_graph_gen,
257  const std::string& output_path, size_t iterations) {
258  ctx.enable_consume();
259 
261  const bool UseLocationDetection = true;
262 
263  size_t num_pages;
264  if (!common::from_str<size_t>(input_path, num_pages))
265  die("For generated graph data, set input_path to the number of pages.");
266 
267  auto links = Generate(
268  ctx, num_pages,
269  [graph_gen = ZipfGraphGen(base_graph_gen, num_pages),
270  rng = std::default_random_engine(std::random_device { } ())](
271  size_t index) mutable {
272  return std::make_pair(index, graph_gen.GenerateOutgoing(rng));
273  }).Cache().KeepForever();
274 
275  // perform actual page rank calculation iterations
276 
277  auto ranks = PageRankJoin<UseLocationDetection>(
278  links, num_pages, iterations);
279 
280  // construct output as "pageid: rank"
281 
282  if (output_path.size()) {
283  ranks.Map([](const RankedPage& rp) {
284  return tlx::ssprintf("%zu: %g", rp.first, rp.second);
285  }).WriteLines(output_path);
286  }
287  else {
288  ranks.Execute();
289  }
290 
291  timer.Stop();
292 
293  if (ctx.my_rank() == 0) {
294  if (UseLocationDetection) {
295  LOG1 << "RESULT benchmark=pagerank_gen detection=ON"
296  << " pages=" << num_pages
297  << " time=" << timer
298  << " traffic= " << ctx.net_manager().Traffic()
299  << " hosts=" << ctx.num_hosts();
300  }
301  else {
302  LOG1 << "RESULT benchmark=pagerank_gen detection=OFF"
303  << " pages=" << num_pages
304  << " time=" << timer
305  << " traffic=" << ctx.net_manager().Traffic()
306  << " hosts=" << ctx.num_hosts();
307  }
308  }
309 }
310 
// Entry point: parses the command line, then runs one of four PageRank
// variants inside api::Run(), selected by -g/--generate and -j/--join.
311 int main(int argc, char* argv[]) {
312 
313  tlx::CmdlineParser clp;
314 
315  bool generate = false;
316  clp.add_bool('g', "generate", generate,
317  "generate graph data, set input = #pages");
318  bool use_join = false;
319  clp.add_bool('j', "join", use_join,
320  "use Join() instead of *ByIndex()");
321 
322  // Graph Generator
// NOTE(review): the meaning of the constructor argument 1 is not visible
// here; the generated runners rebuild the generator as
// ZipfGraphGen(gg, num_pages), so it is presumably a placeholder -- confirm.
// Only the tunable fields below are taken from the command line.
323  ZipfGraphGen gg(1);
324 
325  clp.add_double(0, "size_mean", gg.size_mean,
326  "generated: mean of number of outgoing links, "
327  "default: " + std::to_string(gg.size_mean));
328 
329  clp.add_double(0, "size_var", gg.size_var,
330  "generated: variance of number of outgoing links, "
331  "default: " + std::to_string(gg.size_var));
332 
333  clp.add_double(0, "link_scale", gg.link_zipf_scale,
334  "generated: Zipf scale parameter for outgoing links, "
335  "default: " + std::to_string(gg.link_zipf_scale));
336 
337  clp.add_double(0, "link_exponent", gg.link_zipf_exponent,
338  "generated: Zipf exponent parameter for outgoing links, "
339  "default: " + std::to_string(gg.link_zipf_exponent));
340 
341  std::string output_path;
342  clp.add_string('o', "output", output_path,
343  "output file pattern");
344 
345  size_t iter = 10;
346  clp.add_size_t('n', "iterations", iter, "PageRank iterations, default: 10");
347 
348  std::vector<std::string> input_path;
349  clp.add_param_stringlist("input", input_path,
350  "input file pattern(s)");
351 
// a failed parse exits with -1
352  if (!clp.process(argc, argv)) {
353  return -1;
354  }
355 
356  clp.print_result();
357 
// generated mode interprets input_path[0] as the page count, so exactly one
// positional argument is allowed there
358  die_unless(!generate || input_path.size() == 1);
359 
// Dispatch on (generate, use_join). NOTE(review): the call lines of the two
// join branches (listing lines 369 and 372) are elided here; by their
// argument lists they call RunPageRankJoinGenerated() and
// RunJoinPageRankEdgePerLine() respectively.
360  return api::Run(
361  [&](api::Context& ctx) {
362  if (generate && !use_join)
363  return RunPageRankGenerated(
364  ctx, input_path[0], gg, output_path, iter);
365  else if (!generate && !use_join)
366  return RunPageRankEdgePerLine(
367  ctx, input_path, output_path, iter);
368  else if (generate && use_join)
370  ctx, input_path[0], gg, output_path, iter);
371  else if (!generate && use_join)
373  ctx, input_path, output_path, iter);
374  });
375 }
376 
377 /******************************************************************************/
static void RunPageRankGenerated(api::Context &ctx, const std::string &input_path, const ZipfGraphGen &base_graph_gen, const std::string &output_path, size_t iterations)
static uint_pair max()
Returns a uint_pair instance containing the largest possible value.
Definition: uint_types.hpp:226
auto Generate(Context &ctx, size_t size, const GenerateFunction &generate_function)
Generate is a Source-DOp, which creates a DIA of given size using a generator function.
Definition: generate.hpp:87
#define die_unless(X)
Definition: die.hpp:27
static void RunPageRankJoinGenerated(api::Context &ctx, const std::string &input_path, const ZipfGraphGen &base_graph_gen, const std::string &output_path, size_t iterations)
std::size_t PageId
Definition: page_rank.hpp:42
size_t num_hosts() const
Returns the total number of hosts.
Definition: context.hpp:233
#define LOG1
Definition: logger.hpp:28
int Run(const std::function< void(Context &)> &job_startpoint)
Runs the given job startpoint with a Context instance.
Definition: context.cpp:947
std::pair< PageId, Rank > RankedPage
Definition: page_rank.hpp:68
void add_size_t(char key, const std::string &longkey, size_t &dest, const std::string &desc)
add size_t option -key, --longkey with description and store to dest
#define die(msg)
Instead of calling std::terminate(), output the message by throwing an exception.
Definition: die.hpp:22
The Context of a job is a unique instance per worker which holds references to all underlying parts o...
Definition: context.hpp:221
void enable_consume(bool consume=true)
Sets consume-mode flag such that DIA contents may be consumed during PushData().
Definition: context.hpp:388
DIA< std::string > ReadLines(Context &ctx, const std::string &filepath)
ReadLines is a DOp, which reads a file from the file system and creates an ordered DIA according to a...
Definition: read_lines.hpp:454
static by_string to_string(int val)
convert to string
static void RunJoinPageRankEdgePerLine(api::Context &ctx, const std::vector< std::string > &input_path, const std::string &output_path, size_t iterations)
std::vector< PageId > OutgoingLinks
Definition: page_rank.hpp:65
double size_mean
Gaussian mean and variance of content length.
void print_result(std::ostream &os)
print nicely formatted result of processing
auto PageRank(const DIA< OutgoingLinks, InStack > &links, size_t num_pages, size_t iterations)
Definition: page_rank.hpp:71
std::string ssprintf(const char *fmt,...)
Helper to return the result of a sprintf() call as a std::string.
Definition: ssprintf.cpp:18
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
Definition: allocator.hpp:220
Command line parser which automatically fills variables and prints nice usage messages.
static void RunPageRankEdgePerLine(api::Context &ctx, const std::vector< std::string > &input_path, const std::string &output_path, size_t iterations)
std::pair< PageId, OutgoingLinks > LinkedPage
Definition: page_rank.hpp:67
void add_string(char key, const std::string &longkey, std::string &dest, const std::string &desc)
add string option -key, --longkey and store to dest
net::Traffic Traffic() const
calculate overall traffic for final stats
Definition: group.cpp:67
size_t my_rank() const
Global rank of this worker among all other workers in the system.
Definition: context.hpp:243
void add_double(char key, const std::string &longkey, double &dest, const std::string &desc)
add double option -key, --longkey with description and store to dest
void add_param_stringlist(const std::string &name, std::vector< std::string > &dest, const std::string &desc)
void add_bool(char key, const std::string &longkey, bool &dest, const std::string &desc)
net::Manager & net_manager()
Definition: context.hpp:334
int main(int argc, char *argv[])
bool process(int argc, const char *const *argv, std::ostream &os)