Thrill  0.1
Step 1: Generate Random Points

Welcome to the first step in the Thrill k-means tutorial. This tutorial will show how to implement the k-means clustering algorithm (Lloyd's algorithm) in Thrill.

The algorithm works as follows: Given a set of d-dimensional points, select k initial cluster center points at random. Then attempt to improve the centers by iteratively calculating new centers. This is done by classifying all points and associating them with their nearest center, and then taking the mean of all points associated to one cluster as the new center. This will be repeated a constant number of iterations.

We will implement this algorithm in Thrill, and only work with two-dimensional points for simplicity. Furthermore, we will hard-code many constants to make the code easier to understand.

In this step 1, let us start with generating random 2-dimensional points and outputting them for debugging.

We first need a Point class to represent the points. We may add some calculation functions to it later on.

//! A 2-dimensional point with double precision
struct Point {
//! point coordinates
double x, y;
};

For outputting the Point class, we need to add an operator << for std::ostream, which is the standard way for enabling a struct to be written to a C++ stream. A Point will be nicely formatted as "(x,y)".

//! make ostream-able for Print()
std::ostream& operator << (std::ostream& os, const Point& p) {
return os << '(' << p.x << ',' << p.y << ')';
}

Thrill programs run collectively on p worker threads across h hosts. The whole distributed communication context is created by calling the thrill::Run() method. This method inspects how to communicate with its peers, creates sockets, etc etc, and ultimately runs the given function in parallel with l local worker threads.

int main() {
// launch Thrill program: the lambda function will be run on each worker.
return thrill::Run(
[&](thrill::Context& ctx) { Process(ctx); });
}

The caller of the Run() lambda function delivers a thrill::Context object, which is needed to create DIAs and also provides some useful synchronous MPI-like communication collectives via its net reference. Since our main program will be a bit larger, we decide put it into a Process() function and call this function from Run().

void Process(thrill::Context& ctx) {
std::default_random_engine rng(std::random_device { } ());
std::uniform_real_distribution<double> dist(0.0, 1000.0);
// generate 100 random points using uniform distribution
DIA<Point> points =
ctx, /* size */ 100,
[&](const size_t& /* index */) {
return Point { dist(rng), dist(rng) };
})
.Cache();
// print out the points
points.Print("points");
}

In Process() we do the first actual Thrill computation: generating random points. For this we need a random generator and a random distribution from C++11's STL.

And then we use Thrill's Generate() DIA operation to create a DIA with 100 items. Each item of the DIA is generated using the provided lambda. For our random point, we are not interested in the generated index, and instead return a Point with uniformly random coordinates.

Notice that the random generator and distribution are automatically captured by the lambda via [&]. It is generally hazardous to capture local variable in Map() lambda functions, since they are processed distributed in parallel. But with random generators this is somewhat okay, if one caches the result using Cache(). This is a common pitfall when using randomly generated DIAs: the result must be cached, otherwise it will be regenerated on-the-fly ... returning new random numbers.

We save the generated DIA in the variable points and use Thrill's Print() operation to output it for debugging. The Print() method uses the operator << which we defined earlier.

Notice that we do not have to deal with serialization of struct Point. This is because Point is a "POD" (plain old datatype), which means it contains only primitive types and has only the default constructor. Thrill automatically supports all POD structs (actually: trivially copyable ones). If you add e.g. a std::string, things get more complicated. Read TODO on serializing complex structs.

See the complete example code examples/tutorial/k-means_step1.cpp

The output of our program so far is something like the following:

Thrill: using 7.718 GiB RAM total, BlockPool=2.573 GiB, workers=1.286 GiB, floating=2.573 GiB.
Thrill: running locally with 2 test hosts and 2 workers per host in a local tcp network.
Thrill: using 7.718 GiB RAM total, BlockPool=2.573 GiB, workers=1.286 GiB, floating=2.573 GiB.
Thrill: no THRILL_LOG was found, so no json log is written.
Thrill: no config file ~/.thrill found, using default disk configuration.
Thrill: disk '/tmp/thrill.tmp' is allocated, space: 1000 MiB, I/O implementation: syscall queue=0 devid=0 unlink_on_open
[host 0 worker 0 000000] Execute()  stage Generate.1
[host 0 worker 0 000001] PushData() stage Generate.1 with targets [Cache.2]
[host 0 worker 0 000002] Execute()  stage Cache.2
[host 0 worker 0 000003] PushData() stage Cache.2 with targets [Print.3]
[host 0 worker 0 000004] Execute()  stage Print.3
points --- Begin DIA.Print() --- size=100
points[0]: (176.178,141.586)
points[1]: (24.0951,894.39)
points[2]: (215.228,483.075)
points[3]: (868.574,350.557)
[... more points ...]
points[97]: (91.8578,569.384)
points[98]: (943.67,74.128)
points[99]: (761.59,315.613)
points --- End DIA.Print() --- size=100
Thrill: ran 0.001916s with max 8.008 MiB in DIA Blocks, 1.461 KiB network traffic, 0.000 B disk I/O, and 0.000 B max disk use.
malloc_tracker ### exiting, total: 9922202, peak: 9772884, current: 74256 / 0, allocs: 670, unfreed: 8