Thrill  0.1
k-means_step6.cpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * examples/tutorial/k-means_step6.cpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2016 Timo Bingmann <[email protected]>
7  *
8  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
9  ******************************************************************************/
10 
11 //! \example examples/tutorial/k-means_step6.cpp
12 //!
13 //! This example is part of the k-means tutorial. See \ref kmeans_tutorial_step6
14 
16 #include <thrill/api/cache.hpp>
17 #include <thrill/api/generate.hpp>
18 #include <thrill/api/print.hpp>
21 #include <thrill/api/sample.hpp>
23 
24 // Boost Spirit Qi is a header-only library
25 #include <boost/spirit/include/qi.hpp>
26 
27 #include <ostream>
28 #include <random>
29 #include <sstream>
30 #include <string>
31 #include <vector>
32 
//! Point in the plane, stored as two double-precision coordinates.
struct Point {
    //! the two coordinates of the point
    double x, y;

    //! Squared Euclidean distance between this point and \p other.
    double DistanceSquare(const Point& other) const {
        const double dx = x - other.x;
        const double dy = y - other.y;
        return dx * dx + dy * dy;
    }
    //! Component-wise sum of this point and \p other.
    Point operator + (const Point& other) const {
        Point sum = *this;
        sum.x += other.x;
        sum.y += other.y;
        return sum;
    }
    //! This point with both coordinates divided by \p divisor.
    Point operator / (double divisor) const {
        Point scaled = *this;
        scaled.x /= divisor;
        scaled.y /= divisor;
        return scaled;
    }
};
48 
49 //! make ostream-able for Print()
50 std::ostream& operator << (std::ostream& os, const Point& p) {
51  return os << '(' << p.x << ',' << p.y << ')';
52 }
53 
54 //! Assignment of a point to a cluster.
55 struct ClosestCenter {
56  size_t cluster_id;
57  Point point;
58  size_t count;
59 };
60 //! make ostream-able for Print()
61 std::ostream& operator << (std::ostream& os, const ClosestCenter& cc) {
62  return os << '(' << cc.cluster_id
63  << ':' << cc.point << ':' << cc.count << ')';
64 }
65 
66 //! our main processing method
67 void Process(const thrill::DIA<Point>& points, const char* output) {
68 
69  // print out the points
70  // points.Print("points");
71 
72  // pick some initial random cluster centers
73  thrill::DIA<Point> centers = points.Sample(/* num_clusters */ 10);
74 
75  for (size_t iter = 0; iter < /* iterations */ 10; ++iter)
76  {
77  // collect centers in a local vector on each worker
78  std::vector<Point> local_centers = centers.AllGather();
79 
80  auto new_centers =
81  points
82  // calculate the closest center for each point
83  .Map(
84  [local_centers](const Point& p) {
85  double min_dist = p.DistanceSquare(local_centers[0]);
86  size_t cluster_id = 0;
87 
88  for (size_t i = 1; i < local_centers.size(); ++i) {
89  double dist = p.DistanceSquare(local_centers[i]);
90  if (dist < min_dist)
91  min_dist = dist, cluster_id = i;
92  }
93  return ClosestCenter { cluster_id, p, /* count */ 1 };
94  })
95  // new centers as the mean of all points associated with it
96  .ReduceByKey(
97  // key extractor: the cluster id
98  [](const ClosestCenter& cc) { return cc.cluster_id; },
99  // reduction: add points and the counter
100  [](const ClosestCenter& a, const ClosestCenter& b) {
101  return ClosestCenter {
102  a.cluster_id, a.point + b.point, a.count + b.count
103  };
104  })
105  .Map([](const ClosestCenter& cc) {
106  return cc.point / cc.count;
107  });
108 
109  // new_centers.Print("new_centers");
110 
111  // Collapse() is needed to fold lambda chain to DIA<Points>
112  centers = new_centers.Collapse();
113  }
114 
115  if (output) {
116  // write output as "x y" lines
117  centers
118  .Map([](const Point& p) {
119  return std::to_string(p.x) + " " + std::to_string(p.y);
120  })
121  .WriteLines(output);
122  }
123  else {
124  centers.Print("final centers");
125  }
126 }
127 
129  std::default_random_engine rng(std::random_device { } ());
130  std::uniform_real_distribution<double> dist(0.0, 1000.0);
131 
132  // generate 100 random points using uniform distribution
133  auto points =
134  Generate(
135  ctx, /* size */ 100,
136  [&](const size_t&) {
137  return Point { dist(rng), dist(rng) };
138  });
139  // Execute() is require due to lazy evaluation
140  return points.Cache().Execute();
141 }
142 
143 //! [step6 LoadPoints]
145 
146  // shorthand namespace
147  namespace qi = boost::spirit::qi;
148 
149  // load points from text file
150  auto points =
151  ReadLines(ctx, path)
152  .Map(
153  [](const std::string& input) {
154  // parse "<x> <y>" lines
155  Point p;
156  std::string::const_iterator begin = input.begin(), end = input.end();
157 
158  qi::phrase_parse(
159  begin, end, // iterators
160  qi::double_ >> qi::double_, // parser grammar: two doubles
161  qi::ascii::space, // skip grammar: spaces
162  p.x, p.y); // put directly into the Point
163 
164  if (begin != end) // check that fully parsed
165  die("Could not parse point coordinates: " << input);
166  return p;
167  });
168  return points.Cache();
169 }
170 //! [step6 LoadPoints]
171 
172 int main(int argc, char* argv[]) {
173  // launch Thrill program: the lambda function will be run on each worker.
174  return thrill::Run(
175  [&](thrill::Context& ctx) {
176  if (argc == 1)
177  Process(GeneratePoints(ctx), nullptr);
178  else if (argc == 2)
179  Process(LoadPoints(ctx, argv[1]), nullptr);
180  else if (argc == 3)
181  Process(LoadPoints(ctx, argv[1]), argv[2]);
182  else
183  std::cerr << "Usage: " << argv[0]
184  << " [points] [output]" << std::endl;
185  });
186 }
187 
188 /******************************************************************************/
std::ostream & operator<<(std::ostream &os, const Point &p)
make ostream-able for Print()
DIA is the interface between the user and the Thrill framework.
Definition: dia.hpp:141
thrill::DIA< Point > GeneratePoints(thrill::Context &ctx)
std::vector< ValueType > AllGather() const
Returns the whole DIA in an std::vector on each worker.
Definition: all_gather.hpp:114
auto Generate(Context &ctx, size_t size, const GenerateFunction &generate_function)
Generate is a Source-DOp, which creates a DIA of given size using a generator function.
Definition: generate.hpp:87
int Run(const std::function< void(Context &)> &job_startpoint)
Runs the given job startpoint with a Context instance.
Definition: context.cpp:947
thrill::common::Vector< D, double > Point
Compile-Time Fixed-Dimensional Points.
Definition: k-means.hpp:39
#define die(msg)
Instead of calling std::terminate(), throw the output message via an exception.
Definition: die.hpp:22
void Process(const thrill::DIA< Point > &points, const char *output)
our main processing method
The Context of a job is a unique instance per worker which holds references to all underlying parts o...
Definition: context.hpp:221
thrill::DIA< Point > LoadPoints(thrill::Context &ctx, const char *path)
[step6 LoadPoints]
DIA< std::string > ReadLines(Context &ctx, const std::string &filepath)
ReadLines is a DOp, which reads a file from the file system and creates an ordered DIA according to a...
Definition: read_lines.hpp:454
static by_string to_string(int val)
convert to string
auto Sample(size_t sample_size) const
Select up to sample_size items uniformly at random and return a new DIA<T>.
Definition: sample.hpp:247
void Print(const std::string &name=std::string()) const
Print is an Action, which collects all data of the DIA at the worker 0 and prints using ostream seria...
Definition: print.hpp:50
int main(int argc, char *argv[])
[step6 LoadPoints]
list x
Definition: gen_data.py:39
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
Definition: allocator.hpp:220
auto Map(const MapFunction &map_function) const
Map applies map_function to each item of a DIA and delivers a new DIA containing the returned values...
Definition: dia.hpp:358
DIA< ValueType > Collapse() const
Create a CollapseNode which is mainly used to collapse the LOp chain into a DIA<T> with an empty stac...
Definition: collapse.hpp:159