Thrill  0.1
random_text_writer.cpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * examples/word_count/random_text_writer.cpp
3  *
4  * A C++ clone of org.apache.hadoop.examples.RandomTextWriter. The clone outputs
5  * only text lines containing words. It uses the same words, but a different
6  * underlying random generator.
7  *
8  * Part of Project Thrill - http://project-thrill.org
9  *
10  * Copyright (C) 2016 Timo Bingmann <[email protected]>
11  *
12  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
13  ******************************************************************************/
14 
16 #include <thrill/api/generate.hpp>
18 #include <tlx/cmdline_parser.hpp>
19 
20 #include <fstream>
21 #include <iostream>
22 #include <random>
23 #include <string>
24 #include <vector>
25 
26 using namespace thrill; // NOLINT
27 using namespace examples::word_count; // NOLINT
28 
29 unsigned min_words_key = 5, max_words_key = 10,
31 
// Seed for the std::mt19937 generators; settable with -s/--seed. Parallel()
// adds the worker rank to it.
32 unsigned seed = 123456;
33 
// Target output size in bytes, filled from the "totalbytes" command line
// parameter. No initializer here: the value comes from the command line.
34 uint64_t totalbytes;
// When true, a TAB is written between key and value (--tab-separator).
35 bool tab_separator = false;
36 
38 
39 static int Sequential(std::ostream& os) {
40 
41  std::mt19937 prng(seed);
42 
43  uint64_t written_bytes = 0;
44 
45  while (written_bytes < totalbytes)
46  {
47  unsigned num_words_key =
48  min_words_key + static_cast<unsigned>(prng()) % range_words_key;
49  unsigned num_words_value =
50  min_words_value + static_cast<unsigned>(prng()) % range_words_value;
51 
52  std::string key_words = RandomTextWriterGenerate(num_words_key, prng);
53  std::string value_words = RandomTextWriterGenerate(num_words_value, prng);
54 
55  size_t out_size = key_words.size() + 1 + value_words.size() + 1;
56  if (written_bytes + out_size > totalbytes) break;
57 
58  if (tab_separator)
59  os << key_words << '\t' << value_words << '\n';
60  else
61  os << key_words << value_words << '\n';
62 
63  written_bytes += out_size;
64  }
65 
66  return 0;
67 }
68 
69 static void Parallel(api::Context& ctx, const std::string& output) {
70 
71  size_t num_workers = ctx.num_workers();
72  std::mt19937 prng(seed + ctx.my_rank());
73 
74  // generate sentinel value for each worker
75  Generate(ctx, num_workers)
76  // map sentinel to many lines
77  .FlatMap<std::string>(
78  [&](size_t /* index */, auto emit) {
79 
80  uint64_t written_bytes = 0;
81  while (written_bytes < totalbytes / num_workers)
82  {
83  unsigned num_words_key =
84  min_words_key + static_cast<unsigned>(prng()) % range_words_key;
85  unsigned num_words_value =
86  min_words_value + static_cast<unsigned>(prng()) % range_words_value;
87 
88  std::string key_words = RandomTextWriterGenerate(num_words_key, prng);
89  std::string value_words = RandomTextWriterGenerate(num_words_value, prng);
90 
91  size_t out_size = key_words.size() + 1 + value_words.size() + 1;
92  if (written_bytes + out_size > totalbytes) break;
93 
94  if (tab_separator)
95  emit(key_words + "\t" + value_words);
96  else
97  emit(key_words + value_words);
98 
99  written_bytes += out_size;
100  }
101  })
102  .WriteLines(output);
103 }
104 
105 int main(int argc, char* argv[]) {
106 
108 
109  cp.add_unsigned('k', "min_words_key", "<N>", min_words_key,
110  "minimum words in a key");
111  cp.add_unsigned('K', "max_words_key", "<N>", max_words_key,
112  "maximum words in a key");
113 
114  cp.add_unsigned('v', "min_words_value", "<N>", min_words_value,
115  "minimum words in a value");
116  cp.add_unsigned('V', "max_words_value", "<N>", max_words_value,
117  "maximum words in a value");
118 
119  cp.add_unsigned('s', "seed", "<N>", seed,
120  "random seed (default: 123456)");
121 
122  cp.add_bool(0, "tab-separator", tab_separator,
123  "add TAB as separator of key/value (for compatbility)");
124 
125  cp.add_param_bytes("totalbytes", totalbytes,
126  "total number of bytes to generate (approximately)");
127 
128  bool parallel = false;
129  cp.add_bool(1, "parallel", parallel,
130  "run as Thrill parallel/distributed program");
131 
132  std::string output;
133  cp.add_string('o', "output", "<path>", output, "output path");
134 
135  if (!cp.process(argc, argv)) {
136  return -1;
137  }
138 
139  cp.print_result(std::cerr);
140 
141  // calculate range of words
144 
145  if (!parallel) {
146  if (output.size()) {
147  std::ofstream of(output);
148  return Sequential(of);
149  }
150  else {
151  return Sequential(std::cout);
152  }
153  }
154  else {
155  return api::Run([&](api::Context& ctx) {
156  Parallel(ctx, output);
157  });
158  }
159 }
160 
161 /******************************************************************************/
void add_unsigned(char key, const std::string &longkey, unsigned int &dest, const std::string &desc)
unsigned range_words_key
auto Generate(Context &ctx, size_t size, const GenerateFunction &generate_function)
Generate is a Source-DOp, which creates a DIA of given size using a generator function.
Definition: generate.hpp:87
unsigned max_words_key
size_t num_workers() const
Global number of workers in the system.
Definition: context.hpp:251
int Run(const std::function< void(Context &)> &job_startpoint)
Runs the given job startpoint with a Context instance.
Definition: context.cpp:947
unsigned min_words_value
unsigned min_words_key
unsigned max_words_value
std::string RandomTextWriterGenerate(size_t num_words, RandomGenerator &rng)
The Context of a job is a unique instance per worker which holds references to all underlying parts o...
Definition: context.hpp:221
void add_param_bytes(const std::string &name, uint32_t &dest, const std::string &desc)
bool tab_separator
uint64_t totalbytes
void print_result(std::ostream &os)
print nicely formatted result of processing
static int Sequential(std::ostream &os)
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
Definition: allocator.hpp:220
Command line parser which automatically fills variables and prints nice usage messages.
void add_string(char key, const std::string &longkey, std::string &dest, const std::string &desc)
add string option -key, --longkey and store to dest
size_t my_rank() const
Global rank of this worker among all other workers in the system.
Definition: context.hpp:243
unsigned seed
unsigned range_words_value
void add_bool(char key, const std::string &longkey, bool &dest, const std::string &desc)
int main(int argc, char *argv[])
static void Parallel(api::Context &ctx, const std::string &output)
bool process(int argc, const char *const *argv, std::ostream &os)