Thrill  0.1
k-means_step5.cpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * examples/tutorial/k-means_step5.cpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2016 Timo Bingmann <[email protected]>
7  *
8  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
9  ******************************************************************************/
10 
11 //! \example examples/tutorial/k-means_step5.cpp
12 //!
13 //! This example is part of the k-means tutorial. See \ref kmeans_tutorial_step5
14 
16 #include <thrill/api/cache.hpp>
17 #include <thrill/api/generate.hpp>
18 #include <thrill/api/print.hpp>
21 #include <thrill/api/sample.hpp>
23 
24 #include <ostream>
25 #include <random>
26 #include <sstream>
27 #include <string>
28 #include <vector>
29 
//! Point in the plane whose two coordinates are stored as doubles.
//! Kept as a plain aggregate so it can be brace-initialized as Point { x, y }.
struct Point {
    //! first coordinate
    double x;
    //! second coordinate
    double y;

    //! Squared Euclidean distance between this point and b.
    double DistanceSquare(const Point& b) const {
        const double dx = x - b.x;
        const double dy = y - b.y;
        return dx * dx + dy * dy;
    }
    //! Coordinate-wise sum of this point and b.
    Point operator + (const Point& b) const {
        Point sum = *this;
        sum.x += b.x;
        sum.y += b.y;
        return sum;
    }
    //! This point with both coordinates divided by the scalar s.
    Point operator / (double s) const {
        Point quotient = *this;
        quotient.x /= s;
        quotient.y /= s;
        return quotient;
    }
};
45 
46 //! make ostream-able for Print()
47 std::ostream& operator << (std::ostream& os, const Point& p) {
48  return os << '(' << p.x << ',' << p.y << ')';
49 }
50 
51 //! Assignment of a point to a cluster.
52 struct ClosestCenter {
53  size_t cluster_id;
54  Point point;
55  size_t count;
56 };
57 //! make ostream-able for Print()
58 std::ostream& operator << (std::ostream& os, const ClosestCenter& cc) {
59  return os << '(' << cc.cluster_id
60  << ':' << cc.point << ':' << cc.count << ')';
61 }
62 
63 //! [step5 new Process signature]
64 //! our main processing method
65 void Process(const thrill::DIA<Point>& points, const char* output) {
66 //! [step5 new Process signature]
67 
68  // print out the points
69  // points.Print("points");
70 
71  // pick some initial random cluster centers
72  thrill::DIA<Point> centers = points.Sample(/* num_clusters */ 10);
73 
74  for (size_t iter = 0; iter < /* iterations */ 10; ++iter)
75  {
76  // collect centers in a local vector on each worker
77  std::vector<Point> local_centers = centers.AllGather();
78 
79  auto new_centers =
80  points
81  // calculate the closest center for each point
82  .Map(
83  [local_centers](const Point& p) {
84  double min_dist = p.DistanceSquare(local_centers[0]);
85  size_t cluster_id = 0;
86 
87  for (size_t i = 1; i < local_centers.size(); ++i) {
88  double dist = p.DistanceSquare(local_centers[i]);
89  if (dist < min_dist)
90  min_dist = dist, cluster_id = i;
91  }
92  return ClosestCenter { cluster_id, p, /* count */ 1 };
93  })
94  // new centers as the mean of all points associated with it
95  .ReduceByKey(
96  // key extractor: the cluster id
97  [](const ClosestCenter& cc) { return cc.cluster_id; },
98  // reduction: add points and the counter
99  [](const ClosestCenter& a, const ClosestCenter& b) {
100  return ClosestCenter {
101  a.cluster_id, a.point + b.point, a.count + b.count
102  };
103  })
104  .Map([](const ClosestCenter& cc) {
105  return cc.point / cc.count;
106  });
107 
108  // new_centers.Print("new_centers");
109 
110  // Collapse() is needed to fold lambda chain to DIA<Points>
111  centers = new_centers.Collapse();
112  }
113 
114  //! [step5 WriteLines output]
115  if (output) {
116  // write output as "x y" lines
117  centers
118  .Map([](const Point& p) {
119  return std::to_string(p.x) + " " + std::to_string(p.y);
120  })
121  .WriteLines(output);
122  }
123  else {
124  centers.Print("final centers");
125  }
126  //! [step5 WriteLines output]
127 }
128 
129 //! [step5 GeneratePoints]
131  std::default_random_engine rng(std::random_device { } ());
132  std::uniform_real_distribution<double> dist(0.0, 1000.0);
133 
134  // generate 100 random points using uniform distribution
135  auto points =
136  Generate(
137  ctx, /* size */ 100,
138  [&](const size_t&) {
139  return Point { dist(rng), dist(rng) };
140  });
141  // Execute() is require due to lazy evaluation
142  return points.Cache().Execute();
143 }
144 //! [step5 GeneratePoints]
145 
146 //! [step5 LoadPoints]
148  // load points from text file
149  auto points =
150  ReadLines(ctx, path)
151  .Map(
152  [](const std::string& input) {
153  // parse "<x> <y>" lines
154  std::istringstream iss(input);
155  Point p;
156  iss >> p.x >> p.y;
157  if (iss.peek() != EOF)
158  die("Could not parse point coordinates: " << input);
159  return p;
160  });
161  return points.Cache();
162 }
163 //! [step5 LoadPoints]
164 
165 //! [step5 Run launcher]
166 int main(int argc, char* argv[]) {
167  // launch Thrill program: the lambda function will be run on each worker.
168  return thrill::Run(
169  [&](thrill::Context& ctx) {
170  if (argc == 1)
171  Process(GeneratePoints(ctx), nullptr);
172  else if (argc == 2)
173  Process(LoadPoints(ctx, argv[1]), nullptr);
174  else if (argc == 3)
175  Process(LoadPoints(ctx, argv[1]), argv[2]);
176  else
177  std::cerr << "Usage: " << argv[0]
178  << " [points] [output]" << std::endl;
179  });
180 }
181 //! [step5 Run launcher]
182 
183 /******************************************************************************/
DIA is the interface between the user and the Thrill framework.
Definition: dia.hpp:141
std::vector< ValueType > AllGather() const
Returns the whole DIA in an std::vector on each worker.
Definition: all_gather.hpp:114
auto Generate(Context &ctx, size_t size, const GenerateFunction &generate_function)
Generate is a Source-DOp, which creates a DIA of given size using a generator function.
Definition: generate.hpp:87
std::ostream & operator<<(std::ostream &os, const Point &p)
make ostream-able for Print()
thrill::DIA< Point > LoadPoints(thrill::Context &ctx, const char *path)
[step5 GeneratePoints]
int Run(const std::function< void(Context &)> &job_startpoint)
Runs the given job startpoint with a Context instance.
Definition: context.cpp:947
thrill::common::Vector< D, double > Point
Compile-Time Fixed-Dimensional Points.
Definition: k-means.hpp:39
#define die(msg)
Instead of calling std::terminate(), throw the message via an exception.
Definition: die.hpp:22
The Context of a job is a unique instance per worker which holds references to all underlying parts o...
Definition: context.hpp:221
DIA< std::string > ReadLines(Context &ctx, const std::string &filepath)
ReadLines is a DOp, which reads a file from the file system and creates an ordered DIA according to a...
Definition: read_lines.hpp:454
static by_string to_string(int val)
convert to string
auto Sample(size_t sample_size) const
Select up to sample_size items uniformly at random and return a new DIA<T>.
Definition: sample.hpp:247
void Print(const std::string &name=std::string()) const
Print is an Action, which collects all data of the DIA at the worker 0 and prints using ostream seria...
Definition: print.hpp:50
void Process(const thrill::DIA< Point > &points, const char *output)
list x
Definition: gen_data.py:39
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
Definition: allocator.hpp:220
auto Map(const MapFunction &map_function) const
Map applies map_function to each item of a DIA and delivers a new DIA containing the returned values...
Definition: dia.hpp:358
int main(int argc, char *argv[])
[step5 LoadPoints]
thrill::DIA< Point > GeneratePoints(thrill::Context &ctx)
[step5 GeneratePoints]
DIA< ValueType > Collapse() const
Create a CollapseNode which is mainly used to collapse the LOp chain into a DIA<T> with an empty stac...
Definition: collapse.hpp:159