Thrill  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
word_count_run.cpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * examples/word_count/word_count_run.cpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Alexander Noe <[email protected]>
7  * Copyright (C) 2016 Timo Bingmann <[email protected]>
8  *
9  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10  ******************************************************************************/
11 
14 
#include <thrill/api/cache.hpp>
#include <thrill/api/generate.hpp>
#include <thrill/common/logger.hpp>
#include <thrill/common/stats_timer.hpp>
#include <thrill/common/string.hpp>
#include <tlx/cmdline_parser.hpp>

#include <algorithm>
#include <random>
#include <string>
#include <utility>
#include <vector>
29 
30 using namespace thrill; // NOLINT
31 using namespace examples::word_count; // NOLINT
32 
33 /******************************************************************************/
34 // Run methods
35 
36 static void RunWordCount(
37  api::Context& ctx,
38  const std::vector<std::string>& input_filelist, const std::string& output) {
39  ctx.enable_consume();
40 
42 
43  auto lines = ReadLines(ctx, input_filelist);
44 
45  auto word_pairs = WordCount(lines);
46 
47  if (output.size()) {
48  word_pairs
49  .Map([](const WordCountPair& wc) {
50  return wc.first + ": " + std::to_string(wc.second);
51  })
52  .WriteLines(output);
53  }
54  else {
55  word_pairs.Execute();
56  ctx.net.Barrier();
57  if (ctx.my_rank() == 0) {
58  LOG1 << "RESULT"
59  << " benchmark=wordcount"
60  << " files=" << input_filelist.size()
61  << " time=" << timer
62  << " traffic=" << ctx.net_manager().Traffic()
63  << " hosts=" << ctx.num_hosts();
64  }
65  }
66 }
67 
68 static void RunHashWordCount(
69  api::Context& ctx,
70  const std::vector<std::string>& input_filelist, const std::string& output) {
71  ctx.enable_consume();
72 
74 
75  auto lines = ReadLines(ctx, input_filelist);
76 
77  auto word_pairs = HashWordCountExample(lines);
78 
79  if (output.size()) {
80  word_pairs
81  .Map([](const WordCountPair& wc) {
82  return wc.first + ": " + std::to_string(wc.second);
83  })
84  .WriteLines(output);
85  }
86  else {
87  word_pairs.Execute();
88  ctx.net.Barrier();
89  if (ctx.my_rank() == 0) {
90  LOG1 << "RESULT"
91  << " benchmark=wordcount_hash"
92  << " files=" << input_filelist.size()
93  << " time=" << timer
94  << " traffic= " << ctx.net_manager().Traffic()
95  << " hosts=" << ctx.num_hosts();
96  }
97  }
98 }
99 
100 /******************************************************************************/
101 // Run methods with generated input, duplicate some code since it makes the
102 // example easier to understand.
103 
105  api::Context& ctx, size_t num_words, const std::string& output) {
106  ctx.enable_consume();
107 
108  std::default_random_engine rng(std::random_device { } ());
109 
110  auto lines = Generate(
111  ctx, num_words / 10,
112  [&](size_t /* index */) {
113  return RandomTextWriterGenerate(10, rng);
114  });
115 
116  auto word_pairs = WordCount(lines);
117 
118  if (output.size()) {
119  word_pairs
120  .Map([](const WordCountPair& wc) {
121  return wc.first + ": " + std::to_string(wc.second);
122  })
123  .WriteLines(output);
124  }
125  else {
126  word_pairs.Execute();
127  }
128 }
129 
131  api::Context& ctx, size_t num_words, const std::string& output) {
132  ctx.enable_consume();
133 
134  std::default_random_engine rng(std::random_device { } ());
135 
136  auto lines = Generate(
137  ctx, num_words / 10,
138  [&](size_t /* index */) {
139  return RandomTextWriterGenerate(10, rng);
140  });
141 
142  auto word_pairs = HashWordCountExample(lines);
143 
144  if (output.size()) {
145  word_pairs
146  .Map([](const WordCountPair& wc) {
147  return wc.first + ": " + std::to_string(wc.second);
148  })
149  .WriteLines(output);
150  }
151  else {
152  word_pairs.Execute();
153  }
154 }
155 
156 /******************************************************************************/
157 
158 int main(int argc, char* argv[]) {
159 
160  tlx::CmdlineParser clp;
161 
162  std::string output;
163  clp.add_string('o', "output", output,
164  "output file pattern");
165 
166  std::vector<std::string> input;
167  clp.add_param_stringlist("input", input,
168  "input file pattern(s)");
169 
170  bool generate = false;
171  clp.add_bool('g', "generate", generate,
172  "generate random words, first file pattern "
173  "specifies approximately how many.");
174 
175  bool hash_words = false;
176  clp.add_bool('H', "hash_words", hash_words,
177  "explicitly calculate hash values for words "
178  "to accelerate reduction.");
179 
180  if (!clp.process(argc, argv)) {
181  return -1;
182  }
183 
184  clp.print_result();
185 
186  return api::Run(
187  [&](api::Context& ctx) {
188  if (generate) {
189  size_t num_words;
190  if (!common::from_str<size_t>(input[0], num_words))
191  die("For generated word data, set input to the number of words.");
192 
193  if (hash_words)
194  RunHashWordCountGenerated(ctx, num_words, output);
195  else
196  RunWordCountGenerated(ctx, num_words, output);
197  }
198  else {
199  if (hash_words)
200  RunHashWordCount(ctx, input, output);
201  else
202  RunWordCount(ctx, input, output);
203  }
204  });
205 }
206 
207 /******************************************************************************/
net::FlowControlChannel & net
Definition: context.hpp:443
static void RunHashWordCount(api::Context &ctx, const std::vector< std::string > &input_filelist, const std::string &output)
size_t num_hosts() const
Returns the total number of hosts.
Definition: context.hpp:230
auto Generate(Context &ctx, size_t size, const GenerateFunction &generate_function)
Generate is a Source-DOp, which creates a DIA of given size using a generator function.
Definition: generate.hpp:85
static void RunWordCount(api::Context &ctx, const std::vector< std::string > &input_filelist, const std::string &output)
void WordCount(thrill::Context &ctx, std::string input, std::string output)
size_t my_rank() const
Global rank of this worker among all other workers in the system.
Definition: context.hpp:240
#define LOG1
Definition: logger.hpp:176
std::pair< std::string, size_t > WordCountPair
Definition: word_count.hpp:30
int Run(const std::function< void(Context &)> &job_startpoint)
Runs the given job startpoint with a Context instance.
Definition: context.cpp:887
void Barrier()
A trivial global barrier.
int main(int argc, char *argv[])
auto HashWordCountExample(const DIA< std::string, InputStack > &input)
Definition: word_count.hpp:74
std::string RandomTextWriterGenerate(size_t num_words, RandomGenerator &rng)
#define die(msg)
Instead of calling abort(), throw the output message via an exception.
Definition: die.hpp:42
void add_string(char key, const std::string &longkey, const std::string &keytype, std::string &dest, const std::string &desc)
add string option -key, --longkey [keytype] and store to dest
The Context of a job is a unique instance per worker which holds references to all underlying parts o...
Definition: context.hpp:218
void enable_consume(bool consume=true)
Sets consume-mode flag such that DIA contents may be consumed during PushData().
Definition: context.hpp:385
DIA< std::string > ReadLines(Context &ctx, const std::string &filepath)
ReadLines is a DOp, which reads a file from the file system and creates an ordered DIA according to a...
Definition: read_lines.hpp:452
static by_string to_string(int val)
convert to string
void print_result(std::ostream &os)
print nicely formatted result of processing
static void RunWordCountGenerated(api::Context &ctx, size_t num_words, const std::string &output)
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
Definition: allocator.hpp:220
Command line parser which automatically fills variables and prints nice usage messages.
net::Traffic Traffic() const
calculate overall traffic for final stats
Definition: group.cpp:67
static void RunHashWordCountGenerated(api::Context &ctx, size_t num_words, const std::string &output)
void add_param_stringlist(const std::string &name, std::vector< std::string > &dest, const std::string &desc)
net::Manager & net_manager()
Definition: context.hpp:331
void add_bool(char key, const std::string &longkey, const std::string &keytype, bool &dest, const std::string &desc)
bool process(int argc, const char *const *argv, std::ostream &os)