35 double DistanceSquare(
const Point& b)
const {
36 return (x - b.x) * (x - b.x) + (y - b.y) * (y - b.y);
39 return Point { x + b.x, y + b.y };
41 Point operator / (
double s)
const {
42 return Point { x / s, y / s };
48 return os <<
'(' << p.x <<
',' << p.y <<
')';
52 struct ClosestCenter {
58 std::ostream&
operator << (std::ostream& os,
const ClosestCenter& cc) {
59 return os <<
'(' << cc.cluster_id
60 <<
':' << cc.point <<
':' << cc.count <<
')';
74 for (
size_t iter = 0; iter < 10; ++iter)
77 std::vector<Point> local_centers = centers.
AllGather();
83 [local_centers](
const Point& p) {
84 double min_dist = p.DistanceSquare(local_centers[0]);
85 size_t cluster_id = 0;
87 for (
size_t i = 1; i < local_centers.size(); ++i) {
88 double dist = p.DistanceSquare(local_centers[i]);
90 min_dist = dist, cluster_id = i;
92 return ClosestCenter { cluster_id, p, 1 };
97 [](
const ClosestCenter& cc) {
return cc.cluster_id; },
99 [](
const ClosestCenter& a,
const ClosestCenter& b) {
100 return ClosestCenter {
101 a.cluster_id, a.point + b.point, a.count + b.count
104 .Map([](
const ClosestCenter& cc) {
105 return cc.point / cc.count;
124 centers.
Print(
"final centers");
131 std::default_random_engine rng(std::random_device { } ());
132 std::uniform_real_distribution<double> dist(0.0, 1000.0);
139 return Point { dist(rng), dist(rng) };
142 return points.Cache().Execute();
154 std::istringstream iss(input);
157 if (iss.peek() != EOF)
158 die(
"Could not parse point coordinates: " << input);
161 return points.Cache();
166 int main(
int argc,
char* argv[]) {
177 std::cerr <<
"Usage: " << argv[0]
178 <<
" [points] [output]" << std::endl;
DIA is the interface between the user and the Thrill framework.
std::vector< ValueType > AllGather() const
Returns the whole DIA in an std::vector on each worker.
auto Generate(Context &ctx, size_t size, const GenerateFunction &generate_function)
Generate is a Source-DOp, which creates a DIA of given size using a generator function.
std::ostream & operator<<(std::ostream &os, const Point &p)
make ostream-able for Print()
thrill::DIA< Point > LoadPoints(thrill::Context &ctx, const char *path)
[step5 GeneratePoints]
int Run(const std::function< void(Context &)> &job_startpoint)
Runs the given job startpoint with a Context instance.
thrill::common::Vector< D, double > Point
Compile-Time Fixed-Dimensional Points.
#define die(msg)
Instead of calling std::terminate(), throw the message via an exception.
The Context of a job is a unique instance per worker which holds references to all underlying parts of Thrill.
DIA< std::string > ReadLines(Context &ctx, const std::string &filepath)
ReadLines is a DOp, which reads a file from the file system and creates an ordered DIA according to a...
static by_string to_string(int val)
convert to string
auto Sample(size_t sample_size) const
Select up to sample_size items uniformly at random and return a new DIA<T>.
void Print(const std::string &name=std::string()) const
Print is an Action, which collects all data of the DIA on worker 0 and prints it using ostream serialization.
void Process(const thrill::DIA< Point > &points, const char *output)
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
auto Map(const MapFunction &map_function) const
Map applies map_function : $A \to B$ to each item of a DIA and delivers a new DIA containing the returned values of map_function.
int main(int argc, char *argv[])
[step5 LoadPoints]
thrill::DIA< Point > GeneratePoints(thrill::Context &ctx)
[step5 GeneratePoints]
DIA< ValueType > Collapse() const
Create a CollapseNode which is mainly used to collapse the LOp chain into a DIA<T> with an empty stack.