Thrill  0.1
Step 4: Iteration!

In step 3 we managed to calculate new cluster centers using a reduction. The natural next step is to iterate this process a fixed number of times.

This is straightforward in Thrill, since it allows you to use C++ loops as usual. We can just use a for-loop that runs 10 times:

// pick some initial random cluster centers
thrill::DIA<Point> centers = points.Sample(/* num_clusters */ 10);
for (size_t iter = 0; iter < /* iterations */ 10; ++iter)
{
    // collect centers in a local vector on each worker
    std::vector<Point> local_centers = centers.AllGather();
    auto new_centers =
        points
        // calculate the closest center for each point
        .Map(
            [local_centers](const Point& p) {
                double min_dist = p.DistanceSquare(local_centers[0]);
                size_t cluster_id = 0;
                for (size_t i = 1; i < local_centers.size(); ++i) {
                    double dist = p.DistanceSquare(local_centers[i]);
                    if (dist < min_dist)
                        min_dist = dist, cluster_id = i;
                }
                return ClosestCenter { cluster_id, p, /* count */ 1 };
            })
        // new centers as the mean of all points associated with it
        .ReduceByKey(
            // key extractor: the cluster id
            [](const ClosestCenter& cc) { return cc.cluster_id; },
            // reduction: add points and the counter
            [](const ClosestCenter& a, const ClosestCenter& b) {
                return ClosestCenter {
                    a.cluster_id, a.point + b.point, a.count + b.count
                };
            })
        // divide the summed point by the counter to get the mean
        .Map([](const ClosestCenter& cc) {
                 return cc.point / cc.count;
             });
    new_centers.Print("new_centers");
    // Collapse() is needed to fold lambda chain to DIA<Point>
    centers = new_centers.Collapse();
}
centers.Print("final centers");
return centers.AllGather();

Besides adding the for-loop, we pulled all operations into one long chain composed with the dot operator (.). The computation is identical to the previous version, except that the intermediate .Print() calls, which we no longer need, are gone.
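To make the data flow of the chain concrete, here is a minimal sequential sketch of the same iteration in plain C++, without Thrill. It is only an illustration under stated assumptions: the Point struct is a hypothetical 2-d stand-in for the tutorial's Point type, a std::map plays the role of ReduceByKey, and the helper names FindClosest, KMeansStep, and KMeans do not exist in Thrill or in the tutorial code.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <vector>

// Hypothetical 2-d stand-in for the tutorial's Point type.
struct Point {
    double x, y;
    double DistanceSquare(const Point& b) const {
        return (x - b.x) * (x - b.x) + (y - b.y) * (y - b.y);
    }
    Point operator + (const Point& b) const { return Point { x + b.x, y + b.y }; }
    Point operator / (double s) const { return Point { x / s, y / s }; }
};

// Same fields as in the tutorial: cluster id, (summed) point, counter.
struct ClosestCenter {
    std::size_t cluster_id;
    Point       point;
    std::size_t count;
};

// Body of the first Map(): index of the center closest to p.
std::size_t FindClosest(const Point& p, const std::vector<Point>& centers) {
    assert(!centers.empty());
    double min_dist = p.DistanceSquare(centers[0]);
    std::size_t cluster_id = 0;
    for (std::size_t i = 1; i < centers.size(); ++i) {
        double dist = p.DistanceSquare(centers[i]);
        if (dist < min_dist) {
            min_dist = dist;
            cluster_id = i;
        }
    }
    return cluster_id;
}

// One iteration: assign each point, sum per cluster id, divide by the count.
std::vector<Point> KMeansStep(const std::vector<Point>& points,
                              const std::vector<Point>& centers) {
    // the map stands in for ReduceByKey: one accumulator per cluster id
    std::map<std::size_t, ClosestCenter> sums;
    for (const Point& p : points) {
        std::size_t id = FindClosest(p, centers);
        auto it = sums.find(id);
        if (it == sums.end()) {
            sums.emplace(id, ClosestCenter { id, p, 1 });
        }
        else {
            it->second.point = it->second.point + p;
            it->second.count += 1;
        }
    }
    // the final Map(): mean = summed point / counter
    std::vector<Point> new_centers;
    for (const auto& kv : sums) {
        new_centers.push_back(
            kv.second.point / static_cast<double>(kv.second.count));
    }
    return new_centers;
}

// Fixed number of iterations, mirroring the for-loop above.
std::vector<Point> KMeans(const std::vector<Point>& points,
                          std::vector<Point> centers, std::size_t iterations) {
    for (std::size_t iter = 0; iter < iterations; ++iter)
        centers = KMeansStep(points, centers);
    return centers;
}
```

In this sketch a center that attracts no points produces no key and therefore drops out of the result, so the number of centers can shrink between iterations.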

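The most interesting part of the iteration loop is the required .Collapse() operation. Without this call the code will not compile. Why? The auto type of new_centers is not a plain DIA<Point>: it also carries the chain of lambda functions that have not been executed yet, so it cannot be assigned to centers directly. The .Collapse() operation folds this hidden chain and yields a plain DIA<Point>. In this case, it is the division .Map() operation that has to be folded.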
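The type problem can be reproduced in a few lines of plain C++ (C++14 or later). The following is only an analogy and not how Thrill implements DIAs: Chained, Collapsed, Identity, and HalveRepeatedly are hypothetical names invented for this sketch, and type erasure through std::function is a swapped-in technique chosen because it shows the same compile-time effect.

```cpp
#include <cassert>
#include <functional>
#include <type_traits>

// Hypothetical miniature of a lazily chained pipeline (NOT Thrill's classes).
// The pending function is part of the static type, much like the lambda
// chain hidden inside the auto type of new_centers.
template <typename Fn>
struct Chained {
    Fn fn;

    // Appending a step returns a NEW type wrapping the composed lambda.
    template <typename Next>
    auto Map(Next next) const {
        Fn prev = fn;
        auto composed = [prev, next](double x) { return next(prev(x)); };
        return Chained<decltype(composed)>{ composed };
    }

    // Analogue of Collapse(): hide the lambda type behind std::function,
    // so the result always has the same type regardless of the chain.
    Chained<std::function<double(double)>> Collapse() const {
        return Chained<std::function<double(double)>>{ fn };
    }

    double operator () (double x) const { return fn(x); }
};

// The "plain" type that a loop variable can hold across iterations.
using Collapsed = Chained<std::function<double(double)>>;

inline Collapsed Identity() {
    return Collapsed{ [](double x) { return x; } };
}

// A loop that reassigns the same variable in every iteration.
inline double HalveRepeatedly(double start, int iterations) {
    Collapsed current = Identity();
    for (int i = 0; i < iterations; ++i) {
        auto next = current.Map([](double x) { return x / 2; });
        // next has a different type than current, so "current = next;"
        // would not compile.
        static_assert(!std::is_same<decltype(next), Collapsed>::value,
                      "Map() changes the static type");
        current = next.Collapse();
    }
    return current(start);
}
```

Calling HalveRepeatedly(8.0, 3) applies the halving step three times. The point of the sketch is the static_assert: each Map() produces a distinct type, and only the collapsed form can be assigned back to the loop variable.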

This complication occurs only in loops, when a DIA variable is reassigned with a new DIA inside the iteration. All other DIA variables should be declared using auto.

See the complete example code tutorial/k-means_step4.cpp

The output of our program so far is something like the following:

[... as before ...]
[host 0 worker 0 000005] PushData() stage Cache.2 with targets [Sample.4]
[host 0 worker 0 000006] Execute()  stage Sample.4
[host 0 worker 0 000007] PushData() stage Sample.4 with targets [AllGather.5]
[host 0 worker 0 000008] Execute()  stage AllGather.5
[host 0 worker 0 000009] PushData() stage Cache.2 with targets [ReduceByKey.7]
[host 0 worker 0 000010] Execute()  stage ReduceByKey.7
[host 0 worker 0 000011] PushData() stage ReduceByKey.7 with targets [Print.9]
[host 0 worker 0 000012] Execute()  stage Print.9
new_centers --- Begin DIA.Print() --- size=10
new_centers[0]: (764.343,852.686)
new_centers[1]: (681.82,965.352)
new_centers[2]: (310.153,959.519)
new_centers[3]: (325.327,371.88)
new_centers[4]: (405.167,127.242)
new_centers[5]: (918.4,601.902)
new_centers[6]: (832.401,331.061)
new_centers[7]: (234.231,721.164)
new_centers[8]: (776.408,79.0238)
new_centers[9]: (708.703,677.69)
new_centers --- End DIA.Print() --- size=10
[host 0 worker 0 000013] PushData() stage ReduceByKey.7 with targets [Collapse.10 [AllGather.11]]
[host 0 worker 0 000014] Execute()  stage AllGather.11
[host 0 worker 0 000015] PushData() stage Cache.2 with targets [ReduceByKey.13]
[host 0 worker 0 000016] Execute()  stage ReduceByKey.13
[host 0 worker 0 000017] PushData() stage ReduceByKey.13 with targets [Print.15]
[host 0 worker 0 000018] Execute()  stage Print.15
new_centers --- Begin DIA.Print() --- size=10
new_centers[0]: (842.671,842.851)
new_centers[1]: (630.191,901.574)
new_centers[2]: (160.131,666.648)
new_centers[3]: (919.674,618.683)
new_centers[4]: (368.526,133.928)
new_centers[5]: (623.79,686.699)
new_centers[6]: (319.875,929.566)
new_centers[7]: (715.076,102.113)
new_centers[8]: (296.944,368.415)
new_centers[9]: (841.718,346.038)
[... repeats 8 more times ...]
final centers --- Begin DIA.Print() --- size=10
final centers[0]: (855.61,815.513)
final centers[1]: (581.785,843.037)
final centers[2]: (596.504,123.161)
final centers[3]: (455.245,501.34)
final centers[4]: (196.218,125.501)
final centers[5]: (828.393,291.954)
final centers[6]: (121.893,677.166)
final centers[7]: (223.355,341.488)
final centers[8]: (874.505,578.813)
final centers[9]: (289.547,941.065)
final centers --- End DIA.Print() --- size=10
[host 0 worker 0 000069] PushData() stage ReduceByKey.61 with targets [Collapse.64 [AllGather.66]]
[host 0 worker 0 000070] Execute()  stage AllGather.66