Thrill  0.1
tpch_run.cpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * examples/tpch/tpch_run.cpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2016 Alexander Noe <[email protected]>
7  *
8  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
9  ******************************************************************************/
10 
13 
14 #include <thrill/api/cache.hpp>
15 #include <thrill/api/dia.hpp>
16 #include <thrill/api/generate.hpp>
19 #include <thrill/api/size.hpp>
20 #include <thrill/common/logger.hpp>
22 #include <thrill/common/string.hpp>
23 #include <tlx/cmdline_parser.hpp>
24 
25 #include <tlx/string/split.hpp>
26 
27 #include <algorithm>
28 #include <cmath>
29 #include <ctime>
30 #include <functional>
31 #include <string>
32 #include <utility>
33 #include <vector>
34 
35 using namespace thrill; // NOLINT
36 
37 struct LineItem {
38  size_t orderkey;
39  size_t partkey;
40  size_t suppkey;
41  size_t linenumber;
42  size_t quantity;
43  double extendedprice;
44  double discount;
45  double tax;
46  char returnflag;
47  char linestatus;
48  time_t ship;
49  time_t commit;
50  time_t receipt;
51  char shipinstruct[25];
52  char shipmode[10];
53  char comment[44];
55 
56 struct Order {
57  size_t orderkey;
58  size_t custkey;
59  char orderstatus;
60  double totalprice;
61  time_t ordertime;
62  char orderpriority[16];
63  char clerk[16];
64  bool priority;
65  char comment[79];
66 
67  friend std::ostream& operator << (std::ostream& os, const Order& o) {
68  os << '(' << o.orderpriority << "|" << o.clerk << "|" << o.comment;
69  return os << ')';
70  }
72 
73 struct JoinedElement {
74  size_t orderkey;
75  size_t partkey;
76  size_t suppkey;
77  size_t linenumber;
78  size_t quantity;
79  double extendedprice;
80  double discount;
81  double tax;
82  char returnflag;
83  char linestatus;
84  time_t ship;
85  time_t commit;
86  time_t receipt;
87  char shipinstruct[25];
88  char shipmode[10];
89  char lineitem_comment[44];
90  size_t custkey;
91  char orderstatus;
92  double totalprice;
93  time_t ordertime;
94  char orderpriority[16];
95  char clerk[16];
96  bool priority;
97  char order_comment[79];
99 
100 JoinedElement ConstructJoinedElement(const struct LineItem& li, const struct Order& o) {
101  JoinedElement je;
102  je.orderkey = li.orderkey;
103  je.partkey = li.partkey;
104  je.suppkey = li.suppkey;
105  je.linenumber = li.linenumber;
106  je.quantity = li.quantity;
107  je.extendedprice = li.extendedprice;
108  je.discount = li.discount;
109  je.tax = li.tax;
110  je.returnflag = li.returnflag;
111  je.linestatus = li.linestatus;
112  je.ship = li.ship;
113  je.commit = li.commit;
114  je.receipt = li.receipt;
115  snprintf(je.shipinstruct, sizeof(je.shipinstruct),
116  "%s", li.shipinstruct);
117  snprintf(je.shipmode, sizeof(je.shipmode),
118  "%s", li.shipmode);
119  snprintf(je.lineitem_comment, sizeof(je.lineitem_comment),
120  "%s", li.comment);
121  je.custkey = o.custkey;
122  je.orderstatus = o.orderstatus;
123  je.totalprice = o.totalprice;
124  je.ordertime = o.ordertime;
125  snprintf(je.orderpriority, sizeof(je.orderpriority),
126  "%s", o.orderpriority);
127  snprintf(je.clerk, sizeof(je.clerk),
128  "%s", o.clerk);
129  je.priority = o.priority;
130  snprintf(je.order_comment, sizeof(je.order_comment),
131  "%s", o.comment);
132  return je;
133 }
134 
135 // adapted from:
136 // https://gmbabar.wordpress.com/2010/12/01/mktime-slow-use-custom-function/
137 // removed time of day as we dont need that
138 time_t time_to_epoch(const std::string& str) {
139  char* end;
140 
141  struct tm ltm;
142  ltm.tm_year = std::strtoul(str.substr(0, 4).c_str(),
143  &end, 10) - 1900;
144  ltm.tm_mon = std::strtoul(str.substr(5, 2).c_str(),
145  &end, 10);
146  ltm.tm_mday = std::strtoul(str.substr(8).c_str(),
147  &end, 10);
148 
149  const int mon_days[] =
150  { 31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31 };
151  long tyears, tdays, leaps;
152  int i;
153 
154  tyears = ltm.tm_year - 70; // tm->tm_year is from 1900.
155  leaps = (tyears + 2) / 4; // no of next two lines until year 2100.
156  // i = (ltm->tm_year – 100) / 100;
157  // leaps -= ( (i/4)*3 + i%4 );
158  tdays = 0;
159  for (i = 0; i < ltm.tm_mon; i++) tdays += mon_days[i];
160 
161  tdays += ltm.tm_mday - 1; // days of month passed.
162  tdays = tdays + (tyears * 365) + leaps;
163 
164  return (tdays * 86400);
165 }
166 
167 static size_t JoinTPCH4(
168  api::Context& ctx,
169  const std::vector<std::string>& input_path) {
170  ctx.enable_consume();
171  std::vector<std::string> splitted;
172  splitted.resize(17);
173  std::string s_lineitems = input_path[0] + std::string("lineitem");
174 
175  auto lineitems = ReadLines(ctx, s_lineitems).FlatMap<struct LineItem>(
176  [&splitted](const std::string& input, auto emit) {
177 
178  LineItem li;
179  char* end;
180  tlx::split(&splitted, '|', input);
181 
182  li.commit = time_to_epoch(splitted[11]);
183  li.receipt = time_to_epoch(splitted[12]);
184 
185  if (li.commit < li.receipt) {
186 
187  li.orderkey = std::strtoul(splitted[0].c_str(), &end, 10);
188  li.partkey = std::strtoul(splitted[1].c_str(), &end, 10);
189  li.suppkey = std::strtoul(splitted[2].c_str(), &end, 10);
190  li.linenumber = std::strtoul(splitted[3].c_str(), &end, 10);
191  li.quantity = std::strtoul(splitted[4].c_str(), &end, 10);
192  li.extendedprice = std::strtod(splitted[5].c_str(), &end);
193  li.discount = std::strtod(splitted[6].c_str(), &end);
194  li.tax = std::strtod(splitted[7].c_str(), &end);
195  li.returnflag = splitted[8][0];
196  li.linestatus = splitted[9][0];
197 
198  li.ship = time_to_epoch(splitted[10]);
199 
200  snprintf(li.shipinstruct, sizeof(li.shipinstruct),
201  "%s", splitted[13].data());
202  snprintf(li.shipmode, sizeof(li.shipmode),
203  "%s", splitted[14].data());
204  snprintf(li.comment, sizeof(li.comment),
205  "%s", splitted[15].data());
206 
207  emit(li);
208  }
209  }).Cache().Keep().Execute();
210 
211  size_t num_items = lineitems.Size();
212 
213  time_t starttime = time_to_epoch("1993-07-01");
214  time_t stoptime = time_to_epoch("1993-10-01");
215 
216  std::string s_orders = input_path[0] + std::string("orders");
217  auto orders = ReadLines(ctx, s_orders).FlatMap<struct Order>(
218  [&splitted, &starttime, &stoptime](const std::string& input, auto emit) {
219  Order o;
220 
221  char* end;
222  tlx::split(&splitted, '|', input);
223 
224  o.ordertime = time_to_epoch(splitted[4]);
225  if (o.ordertime >= starttime && o.ordertime < stoptime) {
226  o.orderkey = std::strtoul(splitted[0].c_str(), &end, 10);
227  o.custkey = std::strtoul(splitted[1].c_str(), &end, 10);
228  o.orderstatus = splitted[2][0];
229  o.totalprice = std::strtod(splitted[3].c_str(), &end);
230  snprintf(o.orderpriority, sizeof(o.orderpriority),
231  "%s", splitted[5].data());
232  snprintf(o.clerk, sizeof(o.clerk),
233  "%s", splitted[6].data());
234  o.priority = (splitted[7][0] != '0');
235  snprintf(o.comment, sizeof(o.comment),
236  "%s", splitted[8].data());
237 
238  emit(o);
239  }
240  }).Cache().Keep().Execute();
241 
242  ctx.net.Barrier();
243 
245 
246  const bool use_detection = false;
247  auto joined =
248  InnerJoin(
250  lineitems, orders,
251  [](const LineItem& li) { return li.orderkey; },
252  [](const Order& o) { return o.orderkey; },
253  [](const LineItem& li, const Order& o) {
254  return ConstructJoinedElement(li, o);
255  }).Size();
256 
257  ctx.net.Barrier();
258 
259  if (ctx.my_rank() == 0) {
260  if (use_detection) {
261  LOG1 << "RESULT " << "benchmark=tpch " << "detection=ON"
262  << " items=" << num_items
263  << " time=" << timer
264  << " traffic=" << ctx.net_manager().Traffic()
265  << " hosts=" << ctx.num_hosts();
266  }
267  else {
268  LOG1 << "RESULT " << "benchmark=tpch " << "detection=OFF"
269  << " items=" << num_items
270  << " time=" << timer
271  << " traffic=" << ctx.net_manager().Traffic()
272  << " hosts=" << ctx.num_hosts();
273  }
274  }
275 
276  return joined;
277 }
278 
279 int main(int argc, char* argv[]) {
280 
281  tlx::CmdlineParser clp;
282 
283  std::vector<std::string> input_path;
284  clp.add_param_stringlist("input", input_path,
285  "input file pattern");
286 
287  if (!clp.process(argc, argv)) {
288  return -1;
289  }
290 
291  die_unless(input_path.size() == 1);
292 
293  clp.print_result();
294 
295  return api::Run(
296  [&](api::Context& ctx) {
297  ctx.enable_consume();
298 
299  LOG1 << JoinTPCH4(ctx, input_path);
300 
301  return 42;
302  });
303 }
304 
305 /******************************************************************************/
net::FlowControlChannel & net
Definition: context.hpp:446
#define die_unless(X)
Definition: die.hpp:27
size_t num_hosts() const
Returns the total number of hosts.
Definition: context.hpp:233
#define LOG1
Definition: logger.hpp:28
int Run(const std::function< void(Context &)> &job_startpoint)
Runs the given job startpoint with a Context instance.
Definition: context.cpp:947
void Barrier()
A trivial global barrier.
auto InnerJoin(const LocationDetectionFlag< LocationDetectionValue > &, const FirstDIA &first_dia, const SecondDIA &second_dia, const KeyExtractor1 &key_extractor1, const KeyExtractor2 &key_extractor2, const JoinFunction &join_function, const HashFunction &hash_function=HashFunction())
Performs an inner join between this DIA and the DIA given in the first parameter. ...
Definition: inner_join.hpp:710
The Context of a job is a unique instance per worker which holds references to all underlying parts o...
Definition: context.hpp:221
JoinedElement ConstructJoinedElement(const struct LineItem &li, const struct Order &o)
Definition: tpch_run.cpp:100
void enable_consume(bool consume=true)
Sets consume-mode flag such that DIA contents may be consumed during PushData().
Definition: context.hpp:388
DIA< std::string > ReadLines(Context &ctx, const std::string &filepath)
ReadLines is a DOp, which reads a file from the file system and creates an ordered DIA according to a...
Definition: read_lines.hpp:454
std::vector< std::string > split(char sep, const std::string &str, std::string::size_type limit)
Split the given string at each separator character into distinct substrings.
Definition: split.cpp:20
static size_t JoinTPCH4(api::Context &ctx, const std::vector< std::string > &input_path)
Definition: tpch_run.cpp:167
void print_result(std::ostream &os)
print nicely formatted result of processing
string comment
Definition: gen_data.py:32
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
Definition: allocator.hpp:220
Command line parser which automatically fills variables and prints nice usage messages.
net::Traffic Traffic() const
calculate overall traffic for final stats
Definition: group.cpp:67
size_t my_rank() const
Global rank of this worker among all other workers in the system.
Definition: context.hpp:243
void add_param_stringlist(const std::string &name, std::vector< std::string > &dest, const std::string &desc)
net::Manager & net_manager()
Definition: context.hpp:334
struct LineItem TLX_ATTRIBUTE_PACKED
tag structure for GroupByKey(), and InnerJoin()
Definition: dia.hpp:116
time_t time_to_epoch(const std::string &str)
Definition: tpch_run.cpp:138
int main(int argc, char *argv[])
Definition: tpch_run.cpp:279
bool process(int argc, const char *const *argv, std::ostream &os)
std::ostream & operator<<(std::ostream &os, const DIABase &d)
make ostream-able.
Definition: dia_base.cpp:449