Thrill  0.1
percentiles.cpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * examples/percentiles/percentiles.cpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2016 Alexander Noe <[email protected]>
7  *
8  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
9  ******************************************************************************/
10 
#include <thrill/api/cache.hpp>
#include <thrill/api/dia.hpp>
#include <thrill/api/group_by_key.hpp>
#include <thrill/api/read_lines.hpp>
#include <thrill/api/size.hpp>
#include <thrill/api/sort.hpp>
#include <thrill/api/sum.hpp>
#include <thrill/common/stats_timer.hpp>
#include <thrill/common/string.hpp>

#include <tlx/cmdline_parser.hpp>
#include <tlx/string/split.hpp>

#include <algorithm>
#include <ctime>
#include <string>
#include <utility>
#include <vector>
28 
29 using namespace thrill; // NOLINT
30 
31 void Percentiles(api::Context& ctx, const std::string& input_path) {
32 
33  const bool use_detection = false;
34 
35  std::vector<std::string> splitted;
36 
37  auto temps =
38  ReadLines(ctx, input_path)
39  .FlatMap<std::pair<time_t, double> >(
40  [&splitted](const std::string& input, auto emit) {
41  tlx::split(&splitted, ',', input);
42  if (splitted[0] != "time") {
43  time_t timet = std::stod(splitted[0]);
44  struct tm* tmstr = std::localtime(&timet);
45  double temp = std::stod(splitted[1]);
46  size_t time = 24 * tmstr->tm_yday + tmstr->tm_hour;
47  emit(std::make_pair(time, temp));
48  }
49  })
50  .Cache().Execute();
51 
52  auto median_fn =
53  [](auto& r, std::size_t) {
54  std::vector<double> all;
55  size_t time = 0;
56  while (r.HasNext()) {
57  auto next = r.Next();
58  all.push_back(next.second);
59  time = next.first;
60  }
61  std::sort(std::begin(all), std::end(all));
62  return std::make_pair(time, all[all.size() / 2 - 1]);
63  };
64 
65  auto time_keyfn =
66  [](std::pair<time_t, double> input) {
67  return input.first;
68  };
69 
70  // group by to compute median
71  ctx.net.Barrier();
73  temps.GroupByKey<std::pair<size_t, double> >(
75  time_keyfn, median_fn).Size();
76 
77  ctx.net.Barrier();
78  timer.Stop();
79 
80  if (ctx.my_rank() == 0) {
81  if (use_detection) {
82  LOG1 << "RESULT " << "benchmark=median " << "detection=ON"
83  << " time=" << timer
84  << " traffic=" << ctx.net_manager().Traffic()
85  << " hosts=" << ctx.num_hosts();
86  }
87  else {
88  LOG1 << "RESULT " << "benchmark=median " << "detection=OFF"
89  << " time=" << timer
90  << " traffic=" << ctx.net_manager().Traffic()
91  << " hosts=" << ctx.num_hosts();
92  }
93  }
94 }
95 
96 int main(int argc, char* argv[]) {
97 
99 
100  std::string input_path;
101  clp.add_param_string("input", input_path,
102  "input file pattern");
103 
104  if (!clp.process(argc, argv)) {
105  return -1;
106  }
107 
108  clp.print_result();
109 
110  return api::Run([&input_path](api::Context& ctx) {
111  return Percentiles(ctx, input_path);
112  });
113 }
114 
115 /******************************************************************************/
net::FlowControlChannel & net
Definition: context.hpp:446
size_t num_hosts() const
Returns the total number of hosts.
Definition: context.hpp:233
#define LOG1
Definition: logger.hpp:28
int Run(const std::function< void(Context &)> &job_startpoint)
Runs the given job startpoint with a Context instance.
Definition: context.cpp:947
void Barrier()
A trivial global barrier.
int main(int argc, char *argv[])
Definition: percentiles.cpp:96
The Context of a job is a unique instance per worker which holds references to all underlying parts of Thrill.
Definition: context.hpp:221
DIA< std::string > ReadLines(Context &ctx, const std::string &filepath)
ReadLines is a DOp, which reads a file from the file system and creates an ordered DIA according to a...
Definition: read_lines.hpp:454
std::vector< std::string > split(char sep, const std::string &str, std::string::size_type limit)
Split the given string at each separator character into distinct substrings.
Definition: split.cpp:20
void print_result(std::ostream &os)
print nicely formatted result of processing
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
Definition: allocator.hpp:220
Command line parser which automatically fills variables and prints nice usage messages.
void add_param_string(const std::string &name, std::string &dest, const std::string &desc)
add string parameter [name] with description and store to dest
net::Traffic Traffic() const
calculate overall traffic for final stats
Definition: group.cpp:67
size_t my_rank() const
Global rank of this worker among all other workers in the system.
Definition: context.hpp:243
void Percentiles(api::Context &ctx, const std::string &input_path)
Definition: percentiles.cpp:31
net::Manager & net_manager()
Definition: context.hpp:334
tag structure for GroupByKey(), and InnerJoin()
Definition: dia.hpp:116
bool process(int argc, const char *const *argv, std::ostream &os)