41 std::vector<TreeInfo> treeInfos;
53 return std::make_pair(node, index);
56 auto size = lines.SizeFuture();
59 auto graph = lines.Map(
60 [startIndex](
const std::pair<std::string, size_t>& input) {
61 std::istringstream iss(input.first);
65 std::istream_iterator<VertexId>());
78 graphSize = size.get();
85 const size_t currentTreeIndex,
const size_t graphSize) {
90 [=](
const BfsNode& node,
auto emit) {
91 if (node.level == currentLevel && node.treeIndex == currentTreeIndex) {
92 for (
auto neighbor : node.edges) {
98 if (neighbors.Size() == 0)
101 auto reducedNeighbors = neighbors.ReduceToIndex(
118 node.level = currentLevel;
119 node.parent = pair.parent;
120 node.treeIndex = currentTreeIndex;
132 const size_t currentTreeIndex) {
135 validDummy.
level = 0;
138 auto node = graph.
Sum(
154 node.parent = node.nodeIndex;
155 node.treeIndex = currentTreeIndex;
167 if (output_path.empty())
173 .
template GroupToIndex<std::string>(
175 [](
auto& iter,
const size_t& key)
mutable {
176 std::ostringstream str;
177 str <<
"BFS tree " << key <<
":\n";
179 std::vector<std::pair<size_t, size_t> > nodes;
181 while (iter.HasNext()) {
186 std::sort(nodes.begin(), nodes.end(),
187 [](
const std::pair<size_t, size_t>& l,
188 const std::pair<size_t, size_t>& r) {
189 return l.first < r.first;
192 size_t lastLevel = 0;
194 for (
const auto& node : nodes)
196 if (lastLevel != node.first)
199 str <<
'\n' << lastLevel <<
": ";
201 str << node.second <<
' ';
209 grouped.WriteLines(output_path);
218 VertexId startIndex,
bool full_bfs =
false) {
220 std::vector<TreeInfo> treeInfos;
221 size_t currentTreeIndex = 0;
224 size_t currentLevel = 0;
226 while (
BFSNextLevel(graph, currentLevel, currentTreeIndex, graphSize))
229 treeInfos.emplace_back(
TreeInfo { startIndex, currentLevel });
232 }
while (full_bfs &&
PrepareNextTree(graph, startIndex, currentTreeIndex));
234 return BfsResult({ graph, treeInfos });
239 VertexId startIndex,
bool full_bfs =
false) {
244 auto result =
BFS(graph, graphSize, startIndex, full_bfs);
256 auto firstBFS =
BFS(graph, graphSize, startIndex);
258 outputBFSResult(firstBFS.graph, firstBFS.treeInfos.size(), output_path);
261 auto targetLevel = firstBFS.treeInfos.front().levels - 1;
264 .Filter([=](
const BfsNode& node) {
265 return node.
level == targetLevel;
282 emitNode.parent = startIndex;
284 emitNode.treeIndex = 0;
289 auto secondBFS =
BFS(secondGraph, graphSize, startIndex);
291 auto diameter = secondBFS.treeInfos.front().levels;
293 outputBFSResult(secondBFS.graph, secondBFS.treeInfos.size(), output_path2);
296 LOG1 <<
"RESULT diameter=" << diameter;
301 int main(
int argc,
char* argv[]) {
308 "read graph from this file");
312 "output", output_path,
313 "output BFS tree to this file");
317 "output2", output_path2,
318 "output second BFS tree of diameter sweep to this file");
320 bool full_bfs =
false;
321 clp.
add_flag(
'f',
"full-bfs", full_bfs,
322 "traverse all nodes even if this produces a disconnected bfs forest");
324 bool diameter =
false;
325 clp.
add_flag(
'd',
"diameter", diameter,
326 "calculate approximate diameter using two BFS sweeps");
336 BFS(ctx, input_path, output_path, 0, full_bfs);
DIA is the interface between the user and the Thrill framework.
DIA< BfsNode > LoadBFSGraph(thrill::Context &ctx, size_t &graphSize, const std::string &path, VertexId startIndex)
bool PrepareNextTree(DIA< BfsNode > &graph, size_t &startIndex, const size_t currentTreeIndex)
int Run(const std::function< void(Context &)> &job_startpoint)
Runs the given job startpoint with a Context instance.
void add_opt_param_string(const std::string &name, std::string &dest, const std::string &desc)
add optional string parameter [name] with description and store to dest
auto FlatMap(const FlatmapFunction &flatmap_function) const
Each item of a DIA is expanded by the flatmap_function : $A \to \textrm{array}(B)$ to zero or more items of different type...
The Context of a job is a unique instance per worker which holds references to all underlying parts o...
int main(int argc, char *argv[])
DIA< std::string > ReadLines(Context &ctx, const std::string &filepath)
ReadLines is a DOp, which reads a file from the file system and creates an ordered DIA according to a...
void print_result(std::ostream &os)
print nicely formatted result of processing
bool BFSNextLevel(DIA< BfsNode > &graph, size_t &currentLevel, const size_t currentTreeIndex, const size_t graphSize)
auto Zip(const ZipFunction &zip_function, const DIA< FirstDIAType, FirstDIAStack > &first_dia, const DIAs &... dias)
Zips two DIAs of equal size in style of functional programming by applying zip_function to the i-th e...
auto Filter(const FilterFunction &filter_function) const
Each item of a DIA is tested using filter_function : $A \to \textrm{bool}$ to determine whether it is copied into the outp...
std::vector< VertexId > EdgeList
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
Command line parser which automatically fills variables and prints nice usage messages.
auto Map(const MapFunction &map_function) const
Map applies map_function : $A \to B$ to each item of a DIA and delivers a new DIA containing the returned values...
void add_param_string(const std::string &name, std::string &dest, const std::string &desc)
add string parameter [name] with description and store to dest
size_t my_rank() const
Global rank of this worker among all other workers in the system.
void outputBFSResult(DIA< BfsNode > &graph, size_t num_trees, std::string output_path)
void add_flag(char key, const std::string &longkey, bool &dest, const std::string &desc)
size_t doubleSweepDiameter(thrill::Context &ctx, std::string input_path, std::string output_path, std::string output_path2, VertexId startIndex)
BfsResult BFS(DIA< BfsNode > &graph, size_t graphSize, VertexId startIndex, bool full_bfs=false)
Runs a BFS on the graph starting at startIndex.
bool process(int argc, const char *const *argv, std::ostream &os)
ValueType Sum(const SumFunction &sum_function=SumFunction()) const
Sum is an Action, which computes the sum of all elements globally.