Thrill  0.1
terasort.cpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * examples/terasort/terasort.cpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2016 Timo Bingmann <[email protected]>
7  *
8  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
9  ******************************************************************************/
10 
11 #include <thrill/api/generate.hpp>
13 #include <thrill/api/size.hpp>
14 #include <thrill/api/sort.hpp>
16 #include <thrill/common/logger.hpp>
17 #include <thrill/common/string.hpp>
18 #include <tlx/cmdline_parser.hpp>
19 
20 #include <tlx/string/hexdump.hpp>
22 
23 #include <algorithm>
24 #include <random>
25 #include <string>
26 #include <utility>
27 #include <vector>
28 
29 using namespace thrill; // NOLINT
30 
31 struct Record {
32  uint8_t key[10];
33  uint8_t value[90];
34 
35  bool operator < (const Record& b) const {
36  return std::lexicographical_compare(key, key + 10, b.key, b.key + 10);
37  }
38  friend std::ostream& operator << (std::ostream& os, const Record& c) {
39  return os << tlx::hexdump(c.key, 10);
40  }
42 
// Records are read and written as raw binary (ReadBinary<Record> / WriteBinary
// in main), so the in-memory layout must be exactly 10 key bytes + 90 value
// bytes with no padding.
static_assert(sizeof(Record) == 100, "struct Record packing incorrect.");
44 
45 struct RecordSigned {
46  char key[10];
47  char value[90];
48 
49  // this sorted by _signed_ characters, which is the same as what some
50  // Java/Scala TeraSorts do.
51  bool operator < (const RecordSigned& b) const {
52  return std::lexicographical_compare(key, key + 10, b.key, b.key + 10);
53  }
54  friend std::ostream& operator << (std::ostream& os, const RecordSigned& c) {
55  return os << tlx::hexdump(c.key, 10);
56  }
58 
59 /*!
60  * Generate a Record in a similar way as the "binary" version of Hadoop's
61  * GenSort does. The underlying random generator is different.
62  */
63 class GenerateRecord
64 {
65 public:
66  Record operator () (size_t index) {
67  Record r;
68 
69  // generate random key record
70  for (size_t i = 0; i < 10; ++i)
71  r.key[i] = static_cast<unsigned char>(rng_());
72 
73  uint8_t* v = r.value;
74 
75  // add 2 bytes "break"
76  *v++ = 0x00;
77  *v++ = 0x11;
78 
79  // fill values with hexadecimal representation of the record number
80  static constexpr uint8_t hexdigits[16] = {
81  '0', '1', '2', '3', '4', '5', '6', '7',
82  '8', '9', 'A', 'B', 'C', 'D', 'E', 'F'
83  };
84  uint64_t rec = index;
85  for (size_t i = 0; i != 2 * sizeof(rec); ++i)
86  *v++ = hexdigits[(rec >> (4 * i)) & 0x0F];
87  for (size_t i = 0; i != 16; ++i)
88  *v++ = '0';
89 
90  // add 4 bytes "break"
91  *v++ = 0x88;
92  *v++ = 0x99;
93  *v++ = 0xAA;
94  *v++ = 0xBB;
95 
96  // add 48 byte filler based on index
97  for (size_t i = 0; i < 12; ++i) {
98  uint8_t f = hexdigits[((20 + rec) >> (4 * i)) & 0x0F];
99  *v++ = f;
100  *v++ = f;
101  *v++ = f;
102  *v++ = f;
103  }
104 
105  // add 4 bytes "break"
106  *v++ = 0xCC;
107  *v++ = 0xDD;
108  *v++ = 0xEE;
109  *v++ = 0xFF;
110 
111  assert(v == r.value + 90);
112 
113  return r;
114  }
115 
116 private:
117  std::default_random_engine rng_ { std::random_device { } () };
118 };
119 
120 int main(int argc, char* argv[]) {
121 
122  tlx::CmdlineParser clp;
123 
124  bool use_signed_char = false;
125  clp.add_bool('s', "signed_char", use_signed_char,
126  "compare with signed chars to compare with broken Java "
127  "implementations, default: false");
128 
129  bool generate = false;
130  clp.add_bool('g', "generate", generate,
131  "generate binary record on-the-fly for testing."
132  " size: first input pattern, default: false");
133 
134  bool generate_only = false;
135  clp.add_bool('G', "generate-only", generate_only,
136  "write unsorted generated binary records to output.");
137 
138  std::string output;
139  clp.add_string('o', "output", output,
140  "output file pattern");
141 
142  std::vector<std::string> input;
143  clp.add_param_stringlist("input", input,
144  "input file pattern(s)");
145 
146  if (!clp.process(argc, argv)) {
147  return -1;
148  }
149 
150  clp.print_result();
151 
152  return api::Run(
153  [&](api::Context& ctx) {
154  ctx.enable_consume();
155 
157 
158  if (generate_only) {
159  die_unequal(input.size(), 1u);
160  // parse first argument like "100mib" size
161  uint64_t size;
162  die_unless(tlx::parse_si_iec_units(input[0].c_str(), &size));
163  die_unless(!use_signed_char);
164 
165  Generate(ctx, size / sizeof(Record), GenerateRecord())
166  .WriteBinary(output);
167  }
168  else if (generate) {
169  die_unequal(input.size(), 1u);
170  // parse first argument like "100mib" size
171  uint64_t size;
172  die_unless(tlx::parse_si_iec_units(input[0].c_str(), &size));
173  die_unless(!use_signed_char);
174 
175  auto r =
176  Generate(ctx, size / sizeof(Record), GenerateRecord())
177  .Sort();
178 
179  if (output.size())
180  r.WriteBinary(output);
181  else
182  r.Size();
183  }
184  else {
185  if (use_signed_char) {
186  auto r = ReadBinary<RecordSigned>(ctx, input).Sort();
187 
188  if (output.size())
189  r.WriteBinary(output);
190  else
191  r.Size();
192  }
193  else {
194  auto r = ReadBinary<Record>(ctx, input).Sort();
195 
196  if (output.size())
197  r.WriteBinary(output);
198  else
199  r.Size();
200  }
201  }
202 
203  ctx.net.Barrier();
204  if (ctx.my_rank() == 0) {
205  auto traffic = ctx.net_manager().Traffic();
206  LOG1 << "RESULT"
207  << " benchmark=terasort"
208  << " time=" << timer
209  << " traffic=" << traffic.total()
210  << " hosts=" << ctx.num_hosts();
211  }
212  });
213 }
214 
215 /******************************************************************************/
net::FlowControlChannel & net
Definition: context.hpp:446
auto Generate(Context &ctx, size_t size, const GenerateFunction &generate_function)
Generate is a Source-DOp, which creates a DIA of given size using a generator function.
Definition: generate.hpp:87
#define die_unless(X)
Definition: die.hpp:27
size_t num_hosts() const
Returns the total number of hosts.
Definition: context.hpp:233
#define LOG1
Definition: logger.hpp:28
int Run(const std::function< void(Context &)> &job_startpoint)
Runs the given job startpoint with a Context instance.
Definition: context.cpp:947
void Barrier()
A trivial global barrier.
The Context of a job is a unique instance per worker which holds references to all underlying parts o...
Definition: context.hpp:221
void enable_consume(bool consume=true)
Sets consume-mode flag such that DIA contents may be consumed during PushData().
Definition: context.hpp:388
int main(int argc, char *argv[])
Definition: terasort.cpp:120
bool parse_si_iec_units(const char *str, uint64_t *out_size, char default_unit)
Parse a string like "343KB" or "44 GiB" into the corresponding size in bytes.
void print_result(std::ostream &os)
print nicely formatted result of processing
#define die_unequal(X, Y)
Definition: die.hpp:50
int value
Definition: gen_data.py:41
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
Definition: allocator.hpp:220
struct Record TLX_ATTRIBUTE_PACKED
Command line parser which automatically fills variables and prints nice usage messages.
void add_string(char key, const std::string &longkey, std::string &dest, const std::string &desc)
add string option -key, --longkey and store to dest
net::Traffic Traffic() const
calculate overall traffic for final stats
Definition: group.cpp:67
size_t my_rank() const
Global rank of this worker among all other workers in the system.
Definition: context.hpp:243
void add_param_stringlist(const std::string &name, std::vector< std::string > &dest, const std::string &desc)
void add_bool(char key, const std::string &longkey, bool &dest, const std::string &desc)
net::Manager & net_manager()
Definition: context.hpp:334
std::string hexdump(const void *const data, size_t size)
Dump a (binary) string as a sequence of uppercase hexadecimal pairs.
Definition: hexdump.cpp:21
bool operator<(const uint_pair &b) const
less-than comparison operator
Definition: uint_types.hpp:187
bool process(int argc, const char *const *argv, std::ostream &os)
std::ostream & operator<<(std::ostream &os, const DIABase &d)
make ostream-able.
Definition: dia_base.cpp:449