Thrill
0.1
In step 3 we managed to calculate new cluster centers using a reduction. The natural next step is to iterate this process a fixed number of times.
This is straightforward in Thrill, since it lets you use ordinary C++ loops. We can simply use a for-loop that runs 10 times.
Besides the for-loop, we pulled all operations into a single long chain composed with the dot operator. Otherwise the code is identical to the previous version, except that the .Print() calls are no longer needed.
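The most interesting part of the iteration loop is the required .Collapse() operation: without it, the code does not compile. Why? Local operations such as .Map() do not create a new DIA node. Instead, their lambda functions are appended to a chain that is hidden inside the auto-deduced type of new_centers. A DIA carrying such a chain has a different type from the plain DIA variable that the loop assigns to, so the assignment fails. The .Collapse() operation folds this chain of lambda functions so that the result has the plain DIA type again. In this case, the division .Map() operation has to be folded.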
This complication only arises in loops, when a DIA variable is reassigned with a new DIA computed in the iteration. All other DIA variables should be declared using auto.
See tutorial/k-means_step4.cpp for the complete example code.
The output of our program so far is something like the following:
[... as before ...]
[host 0 worker 0 000005] PushData() stage Cache.2 with targets [Sample.4]
[host 0 worker 0 000006] Execute() stage Sample.4
[host 0 worker 0 000007] PushData() stage Sample.4 with targets [AllGather.5]
[host 0 worker 0 000008] Execute() stage AllGather.5
[host 0 worker 0 000009] PushData() stage Cache.2 with targets [ReduceByKey.7]
[host 0 worker 0 000010] Execute() stage ReduceByKey.7
[host 0 worker 0 000011] PushData() stage ReduceByKey.7 with targets [Print.9]
[host 0 worker 0 000012] Execute() stage Print.9
new_centers --- Begin DIA.Print() --- size=10
new_centers[0]: (764.343,852.686)
new_centers[1]: (681.82,965.352)
new_centers[2]: (310.153,959.519)
new_centers[3]: (325.327,371.88)
new_centers[4]: (405.167,127.242)
new_centers[5]: (918.4,601.902)
new_centers[6]: (832.401,331.061)
new_centers[7]: (234.231,721.164)
new_centers[8]: (776.408,79.0238)
new_centers[9]: (708.703,677.69)
new_centers --- End DIA.Print() --- size=10
[host 0 worker 0 000013] PushData() stage ReduceByKey.7 with targets [Collapse.10 [AllGather.11]]
[host 0 worker 0 000014] Execute() stage AllGather.11
[host 0 worker 0 000015] PushData() stage Cache.2 with targets [ReduceByKey.13]
[host 0 worker 0 000016] Execute() stage ReduceByKey.13
[host 0 worker 0 000017] PushData() stage ReduceByKey.13 with targets [Print.15]
[host 0 worker 0 000018] Execute() stage Print.15
new_centers --- Begin DIA.Print() --- size=10
new_centers[0]: (842.671,842.851)
new_centers[1]: (630.191,901.574)
new_centers[2]: (160.131,666.648)
new_centers[3]: (919.674,618.683)
new_centers[4]: (368.526,133.928)
new_centers[5]: (623.79,686.699)
new_centers[6]: (319.875,929.566)
new_centers[7]: (715.076,102.113)
new_centers[8]: (296.944,368.415)
new_centers[9]: (841.718,346.038)
[... repeats 8 more times ...]
final centers --- Begin DIA.Print() --- size=10
final centers[0]: (855.61,815.513)
final centers[1]: (581.785,843.037)
final centers[2]: (596.504,123.161)
final centers[3]: (455.245,501.34)
final centers[4]: (196.218,125.501)
final centers[5]: (828.393,291.954)
final centers[6]: (121.893,677.166)
final centers[7]: (223.355,341.488)
final centers[8]: (874.505,578.813)
final centers[9]: (289.547,941.065)
final centers --- End DIA.Print() --- size=10
[host 0 worker 0 000069] PushData() stage ReduceByKey.61 with targets [Collapse.64 [AllGather.66]]
[host 0 worker 0 000070] Execute() stage AllGather.66