35 struct PageRankLineParser {
39 unsigned long src = std::strtoul(input.c_str(), &endptr, 10);
41 "Could not parse src tgt line");
42 unsigned long tgt = std::strtoul(endptr + 1, &endptr, 10);
44 "Could not parse src tgt line");
51 const std::vector<std::string>& input_path,
const std::string& output_path,
65 .Map(PageRankLineParser());
72 auto number_edges_future = input.Keep().SizeFuture();
79 auto links = input.template GroupToIndex<OutgoingLinks>(
81 [all = std::vector<PageId> ()](
auto& r,
const PageId&)
mutable {
84 all.push_back(r.Next().tgt);
92 auto ranks =
PageRank(links, num_pages, iterations);
96 if (output_path.size()) {
102 .WriteLines(output_path);
111 size_t number_edges = number_edges_future();
114 LOG1 <<
"FINISHED PAGERANK COMPUTATION";
115 LOG1 <<
"#pages: " << num_pages;
116 LOG1 <<
"#edges: " << number_edges;
117 LOG1 <<
"#iterations: " << iterations;
118 LOG1 <<
"time: " << timer <<
"s";
124 const std::vector<std::string>& input_path,
const std::string& output_path,
130 const bool UseLocationDetection =
true;
140 .Map(PageRankLineParser());
153 [all = std::vector<PageId> ()](
auto& r,
const PageId& pid)
mutable {
155 while (r.HasNext()) {
156 all.push_back(r.Next().tgt);
158 return std::make_pair(pid, all);
159 }).Cache().KeepForever();
163 auto ranks = PageRankJoin<UseLocationDetection>(
164 links, num_pages, iterations);
168 if (output_path.size()) {
171 }).WriteLines(output_path);
180 if (UseLocationDetection) {
181 LOG1 <<
"RESULT benchmark=pagerank_gen detection=ON" 182 <<
" pages=" << num_pages
183 <<
" iterations=" << iterations
189 LOG1 <<
"RESULT benchmark=pagerank_gen detection=OFF" 190 <<
" pages=" << num_pages
191 <<
" iterations=" << iterations
202 const std::string& output_path,
size_t iterations) {
208 if (!common::from_str<size_t>(input_path, num_pages))
209 die(
"For generated graph data, set input_path to the number of pages.");
214 rng = std::default_random_engine(std::random_device { } ())](
216 return graph_gen.GenerateOutgoing(rng);
221 links.Keep().Map([](
const OutgoingLinks& ol) {
return ol.size(); }).Sum();
225 auto ranks =
PageRank(links, num_pages, iterations);
229 if (output_path.size()) {
235 .WriteLines(output_path);
245 <<
" benchmark=pagerank_gen" 246 <<
" pages=" << num_pages
247 <<
" edges=" << number_edges
248 <<
" iterations=" << iterations
257 const std::string& output_path,
size_t iterations) {
261 const bool UseLocationDetection =
true;
264 if (!common::from_str<size_t>(input_path, num_pages))
265 die(
"For generated graph data, set input_path to the number of pages.");
270 rng = std::default_random_engine(std::random_device { } ())](
271 size_t index)
mutable {
272 return std::make_pair(index, graph_gen.GenerateOutgoing(rng));
273 }).Cache().KeepForever();
277 auto ranks = PageRankJoin<UseLocationDetection>(
278 links, num_pages, iterations);
282 if (output_path.size()) {
285 }).WriteLines(output_path);
294 if (UseLocationDetection) {
295 LOG1 <<
"RESULT benchmark=pagerank_gen detection=ON" 296 <<
" pages=" << num_pages
302 LOG1 <<
"RESULT benchmark=pagerank_gen detection=OFF" 303 <<
" pages=" << num_pages
311 int main(
int argc,
char* argv[]) {
315 bool generate =
false;
316 clp.
add_bool(
'g',
"generate", generate,
317 "generate graph data, set input = #pages");
318 bool use_join =
false;
320 "use Join() instead of *ByIndex()");
326 "generated: mean of number of outgoing links, " 330 "generated: variance of number of outgoing links, " 334 "generated: Zipf scale parameter for outgoing links, " 338 "generated: Zipf exponent parameter for outgoing links, " 343 "output file pattern");
346 clp.
add_size_t(
'n',
"iterations", iter,
"PageRank iterations, default: 10");
348 std::vector<std::string> input_path;
350 "input file pattern(s)");
352 if (!clp.
process(argc, argv)) {
358 die_unless(!generate || input_path.size() == 1);
362 if (generate && !use_join)
364 ctx, input_path[0], gg, output_path, iter);
365 else if (!generate && !use_join)
367 ctx, input_path, output_path, iter);
368 else if (generate && use_join)
370 ctx, input_path[0], gg, output_path, iter);
371 else if (!generate && use_join)
373 ctx, input_path, output_path, iter);
static void RunPageRankGenerated(api::Context &ctx, const std::string &input_path, const ZipfGraphGen &base_graph_gen, const std::string &output_path, size_t iterations)
static uint_pair max()
Returns a uint_pair instance containing the largest possible value.
auto Generate(Context &ctx, size_t size, const GenerateFunction &generate_function)
Generate is a Source-DOp, which creates a DIA of given size using a generator function.
static void RunPageRankJoinGenerated(api::Context &ctx, const std::string &input_path, const ZipfGraphGen &base_graph_gen, const std::string &output_path, size_t iterations)
size_t num_hosts() const
Returns the total number of hosts.
int Run(const std::function< void(Context &)> &job_startpoint)
Runs the given job startpoint with a Context instance.
std::pair< PageId, Rank > RankedPage
void add_size_t(char key, const std::string &longkey, size_t &dest, const std::string &desc)
add size_t option -key, --longkey with description and store to dest
#define die(msg)
Instead of calling std::terminate(), throw the output message via an exception.
The Context of a job is a unique instance per worker which holds references to all underlying parts o...
void enable_consume(bool consume=true)
Sets consume-mode flag such that DIA contents may be consumed during PushData().
DIA< std::string > ReadLines(Context &ctx, const std::string &filepath)
ReadLines is a DOp, which reads a file from the file system and creates an ordered DIA according to a...
static by_string to_string(int val)
convert to string
static void RunJoinPageRankEdgePerLine(api::Context &ctx, const std::vector< std::string > &input_path, const std::string &output_path, size_t iterations)
std::vector< PageId > OutgoingLinks
double size_mean
Gaussian mean and variance of content length.
void print_result(std::ostream &os)
print nicely formatted result of processing
auto PageRank(const DIA< OutgoingLinks, InStack > &links, size_t num_pages, size_t iterations)
std::string ssprintf(const char *fmt,...)
Helper that returns the result of a sprintf() call as a std::string.
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
double link_zipf_exponent
Command line parser which automatically fills variables and prints nice usage messages.
static void RunPageRankEdgePerLine(api::Context &ctx, const std::vector< std::string > &input_path, const std::string &output_path, size_t iterations)
std::pair< PageId, OutgoingLinks > LinkedPage
void add_string(char key, const std::string &longkey, std::string &dest, const std::string &desc)
add string option -key, --longkey and store to dest
net::Traffic Traffic() const
calculate overall traffic for final stats
size_t my_rank() const
Global rank of this worker among all other workers in the system.
A pair (page source, page target)
void add_double(char key, const std::string &longkey, double &dest, const std::string &desc)
add double option -key, --longkey with description and store to dest
void add_param_stringlist(const std::string &name, std::vector< std::string > &dest, const std::string &desc)
void add_bool(char key, const std::string &longkey, bool &dest, const std::string &desc)
net::Manager & net_manager()
int main(int argc, char *argv[])
bool process(int argc, const char *const *argv, std::ostream &os)