51 char shipinstruct[25];
62 char orderpriority[16];
67 friend std::ostream&
operator << (std::ostream& os,
const Order& o) {
68 os <<
'(' << o.orderpriority <<
"|" << o.clerk <<
"|" << o.comment;
73 struct JoinedElement {
87 char shipinstruct[25];
89 char lineitem_comment[44];
94 char orderpriority[16];
97 char order_comment[79];
102 je.orderkey = li.orderkey;
103 je.partkey = li.partkey;
104 je.suppkey = li.suppkey;
105 je.linenumber = li.linenumber;
106 je.quantity = li.quantity;
107 je.extendedprice = li.extendedprice;
108 je.discount = li.discount;
110 je.returnflag = li.returnflag;
111 je.linestatus = li.linestatus;
113 je.commit = li.commit;
114 je.receipt = li.receipt;
115 snprintf(je.shipinstruct,
sizeof(je.shipinstruct),
116 "%s", li.shipinstruct);
117 snprintf(je.shipmode,
sizeof(je.shipmode),
119 snprintf(je.lineitem_comment,
sizeof(je.lineitem_comment),
121 je.custkey = o.custkey;
122 je.orderstatus = o.orderstatus;
123 je.totalprice = o.totalprice;
124 je.ordertime = o.ordertime;
125 snprintf(je.orderpriority,
sizeof(je.orderpriority),
126 "%s", o.orderpriority);
127 snprintf(je.clerk,
sizeof(je.clerk),
129 je.priority = o.priority;
130 snprintf(je.order_comment,
sizeof(je.order_comment),
142 ltm.tm_year = std::strtoul(str.substr(0, 4).c_str(),
144 ltm.tm_mon = std::strtoul(str.substr(5, 2).c_str(),
146 ltm.tm_mday = std::strtoul(str.substr(8).c_str(),
149 const int mon_days[] =
150 { 31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31 };
151 long tyears, tdays, leaps;
154 tyears = ltm.tm_year - 70;
155 leaps = (tyears + 2) / 4;
159 for (i = 0; i < ltm.tm_mon; i++) tdays += mon_days[i];
161 tdays += ltm.tm_mday - 1;
162 tdays = tdays + (tyears * 365) + leaps;
164 return (tdays * 86400);
169 const std::vector<std::string>& input_path) {
171 std::vector<std::string> splitted;
175 auto lineitems =
ReadLines(ctx, s_lineitems).FlatMap<
struct LineItem>(
176 [&splitted](
const std::string& input,
auto emit) {
185 if (li.commit < li.receipt) {
187 li.orderkey = std::strtoul(splitted[0].c_str(), &end, 10);
188 li.partkey = std::strtoul(splitted[1].c_str(), &end, 10);
189 li.suppkey = std::strtoul(splitted[2].c_str(), &end, 10);
190 li.linenumber = std::strtoul(splitted[3].c_str(), &end, 10);
191 li.quantity = std::strtoul(splitted[4].c_str(), &end, 10);
192 li.extendedprice = std::strtod(splitted[5].c_str(), &end);
193 li.discount = std::strtod(splitted[6].c_str(), &end);
194 li.tax = std::strtod(splitted[7].c_str(), &end);
195 li.returnflag = splitted[8][0];
196 li.linestatus = splitted[9][0];
200 snprintf(li.shipinstruct,
sizeof(li.shipinstruct),
201 "%s", splitted[13].data());
202 snprintf(li.shipmode,
sizeof(li.shipmode),
203 "%s", splitted[14].data());
204 snprintf(li.comment,
sizeof(li.comment),
205 "%s", splitted[15].data());
209 }).Cache().Keep().Execute();
211 size_t num_items = lineitems.Size();
216 std::string s_orders = input_path[0] +
std::string(
"orders");
217 auto orders =
ReadLines(ctx, s_orders).FlatMap<
struct Order>(
218 [&splitted, &starttime, &stoptime](
const std::string& input,
auto emit) {
225 if (o.ordertime >= starttime && o.ordertime < stoptime) {
226 o.orderkey = std::strtoul(splitted[0].c_str(), &end, 10);
227 o.custkey = std::strtoul(splitted[1].c_str(), &end, 10);
228 o.orderstatus = splitted[2][0];
229 o.totalprice = std::strtod(splitted[3].c_str(), &end);
230 snprintf(o.orderpriority,
sizeof(o.orderpriority),
231 "%s", splitted[5].data());
232 snprintf(o.clerk,
sizeof(o.clerk),
233 "%s", splitted[6].data());
234 o.priority = (splitted[7][0] !=
'0');
235 snprintf(o.comment,
sizeof(o.comment),
236 "%s", splitted[8].data());
240 }).Cache().Keep().Execute();
246 const bool use_detection =
false;
251 [](
const LineItem& li) {
return li.orderkey; },
252 [](
const Order& o) {
return o.orderkey; },
253 [](
const LineItem& li,
const Order& o) {
261 LOG1 <<
"RESULT " <<
"benchmark=tpch " <<
"detection=ON" 262 <<
" items=" << num_items
268 LOG1 <<
"RESULT " <<
"benchmark=tpch " <<
"detection=OFF" 269 <<
" items=" << num_items
279 int main(
int argc,
char* argv[]) {
283 std::vector<std::string> input_path;
285 "input file pattern");
287 if (!clp.
process(argc, argv)) {
net::FlowControlChannel & net
size_t num_hosts() const
Returns the total number of hosts.
int Run(const std::function< void(Context &)> &job_startpoint)
Runs the given job startpoint with a Context instance.
void Barrier()
A trivial global barrier.
auto InnerJoin(const LocationDetectionFlag< LocationDetectionValue > &, const FirstDIA &first_dia, const SecondDIA &second_dia, const KeyExtractor1 &key_extractor1, const KeyExtractor2 &key_extractor2, const JoinFunction &join_function, const HashFunction &hash_function=HashFunction())
Performs an inner join between the two DIAs first_dia and second_dia: elements whose keys (as extracted by key_extractor1 and key_extractor2) match are combined by join_function.
The Context of a job is a unique instance per worker which holds references to all underlying parts of the framework.
JoinedElement ConstructJoinedElement(const struct LineItem &li, const struct Order &o)
void enable_consume(bool consume=true)
Sets consume-mode flag such that DIA contents may be consumed during PushData().
DIA< std::string > ReadLines(Context &ctx, const std::string &filepath)
ReadLines is a DOp which reads a file from the file system and creates an ordered DIA of std::string items from its contents.
std::vector< std::string > split(char sep, const std::string &str, std::string::size_type limit)
Split the given string at each separator character into distinct substrings.
static size_t JoinTPCH4(api::Context &ctx, const std::vector< std::string > &input_path)
void print_result(std::ostream &os)
Prints a nicely formatted result of the processing to the given output stream.
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
Command line parser which automatically fills variables and prints nice usage messages.
net::Traffic Traffic() const
calculate overall traffic for final stats
size_t my_rank() const
Global rank of this worker among all other workers in the system.
void add_param_stringlist(const std::string &name, std::vector< std::string > &dest, const std::string &desc)
net::Manager & net_manager()
struct LineItem TLX_ATTRIBUTE_PACKED
Tag structure for GroupByKey() and InnerJoin().
time_t time_to_epoch(const std::string &str)
int main(int argc, char *argv[])
bool process(int argc, const char *const *argv, std::ostream &os)
std::ostream & operator<<(std::ostream &os, const DIABase &d)
make ostream-able.