Thrill  0.1
bfs.cpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * examples/bfs/bfs.cpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2017 Robert Williger <[email protected]>
7  *
8  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
9  ******************************************************************************/
10 
11 #include "bfs.hpp"
12 
13 #include <thrill/api/cache.hpp>
14 #include <thrill/api/generate.hpp>
18 #include <thrill/api/min.hpp>
19 #include <thrill/api/print.hpp>
22 #include <thrill/api/size.hpp>
23 #include <thrill/api/sort.hpp>
24 #include <thrill/api/sum.hpp>
26 #include <thrill/api/zip.hpp>
28 #include <tlx/cmdline_parser.hpp>
29 
30 #include <algorithm>
31 #include <iterator>
32 #include <sstream>
33 #include <string>
34 #include <vector>
35 
36 using thrill::DIA;
37 using namespace examples::bfs;
38 
39 struct BfsResult {
40  DIA<BfsNode> graph;
41  std::vector<TreeInfo> treeInfos;
42 };
43 
44 // load graph from file
45 DIA<BfsNode> LoadBFSGraph(thrill::Context& ctx, size_t& graphSize,
46  const std::string& path, VertexId startIndex) {
47 
48  // read graph lines from file and add index
49  auto lines =
50  ReadLines(ctx, path)
51  .ZipWithIndex(
52  [](const std::string& node, const size_t& index) {
53  return std::make_pair(node, index);
54  });
55 
56  auto size = lines.SizeFuture();
57 
58  // parse lines into BfsNode structs
59  auto graph = lines.Map(
60  [startIndex](const std::pair<std::string, size_t>& input) {
61  std::istringstream iss(input.first);
62  BfsNode node;
63 
64  node.edges = EdgeList(std::istream_iterator<VertexId>(iss),
65  std::istream_iterator<VertexId>());
66  node.nodeIndex = input.second;
67 
68  if (node.nodeIndex == startIndex)
69  {
70  node.parent = startIndex;
71  node.level = 0;
72  node.treeIndex = 0;
73  }
74 
75  return node;
76  });
77 
78  graphSize = size.get();
79 
80  return graph.Cache();
81 }
82 
83 // returns true if new nodes have been possibly added to the next BFS level
84 bool BFSNextLevel(DIA<BfsNode>& graph, size_t& currentLevel,
85  const size_t currentTreeIndex, const size_t graphSize) {
86 
87  auto neighbors =
88  graph
90  [=](const BfsNode& node, auto emit) {
91  if (node.level == currentLevel && node.treeIndex == currentTreeIndex) {
92  for (auto neighbor : node.edges) {
93  emit(NodeParentPair { neighbor, node.nodeIndex });
94  }
95  }
96  });
97 
98  if (neighbors.Size() == 0)
99  return false;
100 
101  auto reducedNeighbors = neighbors.ReduceToIndex(
102  [](const NodeParentPair& pair) {
103  return pair.node == INVALID ? 0 : pair.node;
104  },
105  [](const NodeParentPair& pair1, const NodeParentPair& pair2) {
106  // pair1.node is INVALID iff it is the default constructed value for
107  // its index
108  return pair1.node == INVALID ? pair2 : pair1;
109  },
110  graphSize,
112 
113  currentLevel++;
114 
115  graph = Zip(
116  [=](BfsNode node, NodeParentPair pair) {
117  if (pair.node != INVALID && node.level == INVALID) {
118  node.level = currentLevel;
119  node.parent = pair.parent;
120  node.treeIndex = currentTreeIndex;
121  }
122  return node;
123  },
124  graph,
125  reducedNeighbors);
126 
127  return true;
128 }
129 
130 // returns true if not all nodes have been reached yet
131 bool PrepareNextTree(DIA<BfsNode>& graph, size_t& startIndex,
132  const size_t currentTreeIndex) {
133 
134  BfsNode validDummy;
135  validDummy.level = 0;
136 
137  // find a node which has not yet been traversed (level == INVALID)
138  auto node = graph.Sum(
139  [](const BfsNode& node1, const BfsNode& node2) {
140  return node1.level == INVALID ? node1 : node2;
141  },
142  validDummy);
143 
144  if (node.level != INVALID)
145  return false; // all nodes have already been traversed
146 
147  startIndex = node.nodeIndex;
148 
149  // initialize start index
150  graph = graph.Map(
151  [=](BfsNode node) {
152  if (node.nodeIndex == startIndex) {
153  node.level = 0;
154  node.parent = node.nodeIndex;
155  node.treeIndex = currentTreeIndex;
156  }
157 
158  return node;
159  }).Collapse();
160 
161  return true;
162 }
163 
164 void outputBFSResult(DIA<BfsNode>& graph, size_t num_trees,
165  std::string output_path) {
166 
167  if (output_path.empty())
168  return;
169 
170  auto grouped =
171  graph
172  .Filter([](const BfsNode& i) { return i.treeIndex != INVALID; })
173  .template GroupToIndex<std::string>(
174  [](const BfsNode& i) { return i.treeIndex; },
175  [](auto& iter, const size_t& key) mutable {
176  std::ostringstream str;
177  str << "BFS tree " << key << ":\n";
178 
179  std::vector<std::pair<size_t, size_t> > nodes;
180 
181  while (iter.HasNext()) {
182  BfsNode node = iter.Next();
183  nodes.emplace_back(node.level, node.nodeIndex);
184  }
185 
186  std::sort(nodes.begin(), nodes.end(),
187  [](const std::pair<size_t, size_t>& l,
188  const std::pair<size_t, size_t>& r) {
189  return l.first < r.first;
190  });
191 
192  size_t lastLevel = 0;
193  str << "0: ";
194  for (const auto& node : nodes)
195  {
196  if (lastLevel != node.first)
197  {
198  lastLevel++;
199  str << '\n' << lastLevel << ": ";
200  }
201  str << node.second << ' ';
202  }
203  str << '\n';
204 
205  return str.str();
206  },
207  num_trees);
208 
209  grouped.WriteLines(output_path);
210 }
211 
212 /*!
213  * runs A BFS on graph starting at startIndex. If full_bfs is true then all
214  * nodes will eventually be reached possibly resulting in a forest instead of a
215  * simple tree
216 */
217 BfsResult BFS(DIA<BfsNode>& graph, size_t graphSize,
218  VertexId startIndex, bool full_bfs = false) {
219 
220  std::vector<TreeInfo> treeInfos;
221  size_t currentTreeIndex = 0;
222 
223  do {
224  size_t currentLevel = 0;
225 
226  while (BFSNextLevel(graph, currentLevel, currentTreeIndex, graphSize))
227  { }
228 
229  treeInfos.emplace_back(TreeInfo { startIndex, currentLevel });
230 
231  currentTreeIndex++;
232  } while (full_bfs && PrepareNextTree(graph, startIndex, currentTreeIndex));
233 
234  return BfsResult({ graph, treeInfos });
235 }
236 
237 BfsResult BFS(thrill::Context& ctx,
238  std::string input_path, std::string output_path,
239  VertexId startIndex, bool full_bfs = false) {
240 
241  size_t graphSize;
242  DIA<BfsNode> graph = LoadBFSGraph(ctx, graphSize, input_path, startIndex);
243 
244  auto result = BFS(graph, graphSize, startIndex, full_bfs);
245  outputBFSResult(result.graph, result.treeInfos.size(), output_path);
246  return result;
247 }
248 
250  thrill::Context& ctx,
251  std::string input_path, std::string output_path, std::string output_path2,
252  VertexId startIndex) {
253 
254  size_t graphSize;
255  DIA<BfsNode> graph = LoadBFSGraph(ctx, graphSize, input_path, startIndex);
256  auto firstBFS = BFS(graph, graphSize, startIndex);
257 
258  outputBFSResult(firstBFS.graph, firstBFS.treeInfos.size(), output_path);
259 
260  // choose node from last level as new start index
261  auto targetLevel = firstBFS.treeInfos.front().levels - 1;
262  startIndex =
263  firstBFS.graph
264  .Filter([=](const BfsNode& node) {
265  return node.level == targetLevel;
266  })
267  .Map([](const BfsNode& node) {
268  return node.nodeIndex;
269  })
270  .Min(INVALID);
271 
272  // create clean graph with new start index
273  DIA<BfsNode> secondGraph =
274  graph
275  .Map([=](const BfsNode& node) {
276  BfsNode emitNode;
277  emitNode.nodeIndex = node.nodeIndex;
278  emitNode.edges = node.edges;
279 
280  if (emitNode.nodeIndex == startIndex)
281  {
282  emitNode.parent = startIndex;
283  emitNode.level = 0;
284  emitNode.treeIndex = 0;
285  }
286  return emitNode;
287  }).Collapse();
288 
289  auto secondBFS = BFS(secondGraph, graphSize, startIndex);
290 
291  auto diameter = secondBFS.treeInfos.front().levels;
292 
293  outputBFSResult(secondBFS.graph, secondBFS.treeInfos.size(), output_path2);
294 
295  if (ctx.my_rank() == 0)
296  LOG1 << "RESULT diameter=" << diameter;
297 
298  return diameter;
299 }
300 
301 int main(int argc, char* argv[]) {
302 
303  tlx::CmdlineParser clp;
304 
305  std::string input_path;
306  clp.add_param_string(
307  "input", input_path,
308  "read graph from this file");
309 
310  std::string output_path;
312  "output", output_path,
313  "output BFS tree to this file");
314 
315  std::string output_path2;
317  "output2", output_path2,
318  "output second BFS tree of diameter sweep to this file");
319 
320  bool full_bfs = false;
321  clp.add_flag('f', "full-bfs", full_bfs,
322  "traverse all nodes even if this produces a disconnected bfs forest");
323 
324  bool diameter = false;
325  clp.add_flag('d', "diameter", diameter,
326  "calculate approximate diameter using two BFS sweeps");
327 
328  if (!clp.process(argc, argv))
329  return -1;
330 
331  clp.print_result();
332 
333  return thrill::Run(
334  [&](thrill::Context& ctx) {
335  if (!diameter)
336  BFS(ctx, input_path, output_path, /* startIndex */ 0, full_bfs);
337  else
338  doubleSweepDiameter(ctx, input_path, output_path, output_path2,
339  /* startIndex */ 0);
340  });
341 }
342 
343 /******************************************************************************/
DIA is the interface between the user and the Thrill framework.
Definition: dia.hpp:141
DIA< BfsNode > LoadBFSGraph(thrill::Context &ctx, size_t &graphSize, const std::string &path, VertexId startIndex)
Definition: bfs.cpp:45
bool PrepareNextTree(DIA< BfsNode > &graph, size_t &startIndex, const size_t currentTreeIndex)
Definition: bfs.cpp:131
VertexId parent
Definition: bfs.hpp:43
#define LOG1
Definition: logger.hpp:28
int Run(const std::function< void(Context &)> &job_startpoint)
Runs the given job startpoint with a Context instance.
Definition: context.cpp:947
void add_opt_param_string(const std::string &name, std::string &dest, const std::string &desc)
add optional string parameter [name] with description and store to dest
VertexId nodeIndex
Definition: bfs.hpp:41
auto FlatMap(const FlatmapFunction &flatmap_function) const
Each item of a DIA is expanded by the flatmap_function to zero or more items of a different type...
Definition: dia.hpp:458
The Context of a job is a unique instance per worker which holds references to all underlying parts o...
Definition: context.hpp:221
int main(int argc, char *argv[])
Definition: bfs.cpp:301
DIA< std::string > ReadLines(Context &ctx, const std::string &filepath)
ReadLines is a DOp, which reads a file from the file system and creates an ordered DIA according to a...
Definition: read_lines.hpp:454
void print_result(std::ostream &os)
print nicely formatted result of processing
bool BFSNextLevel(DIA< BfsNode > &graph, size_t &currentLevel, const size_t currentTreeIndex, const size_t graphSize)
Definition: bfs.cpp:84
auto Zip(const ZipFunction &zip_function, const DIA< FirstDIAType, FirstDIAStack > &first_dia, const DIAs &... dias)
Zips two DIAs of equal size in style of functional programming by applying zip_function to the i-th e...
Definition: zip.hpp:426
auto Filter(const FilterFunction &filter_function) const
Each item of a DIA is tested using filter_function to determine whether it is copied into the output...
Definition: dia.hpp:405
std::vector< VertexId > EdgeList
Definition: bfs.hpp:26
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
Definition: allocator.hpp:220
Command line parser which automatically fills variables and prints nice usage messages.
auto Map(const MapFunction &map_function) const
Map applies map_function to each item of a DIA and delivers a new DIA containing the returned values...
Definition: dia.hpp:358
void add_param_string(const std::string &name, std::string &dest, const std::string &desc)
add string parameter [name] with description and store to dest
size_t my_rank() const
Global rank of this worker among all other workers in the system.
Definition: context.hpp:243
const size_t INVALID
Definition: bfs.hpp:24
EdgeList edges
Definition: bfs.hpp:40
size_t VertexId
Definition: bfs.hpp:25
void outputBFSResult(DIA< BfsNode > &graph, size_t num_trees, std::string output_path)
Definition: bfs.cpp:164
void add_flag(char key, const std::string &longkey, bool &dest, const std::string &desc)
size_t doubleSweepDiameter(thrill::Context &ctx, std::string input_path, std::string output_path, std::string output_path2, VertexId startIndex)
Definition: bfs.cpp:249
BfsResult BFS(DIA< BfsNode > &graph, size_t graphSize, VertexId startIndex, bool full_bfs=false)
Runs a BFS on graph starting at startIndex.
Definition: bfs.cpp:217
bool process(int argc, const char *const *argv, std::ostream &os)
ValueType Sum(const SumFunction &sum_function=SumFunction()) const
Sum is an Action, which computes the sum of all elements globally.
Definition: sum.hpp:23