16 #ifndef THRILL_API_ZIP_WINDOW_HEADER 17 #define THRILL_API_ZIP_WINDOW_HEADER 53 #ifndef THRILL_DOXYGEN_IGNORE 58 struct ZipWindowTraits :
public ZipWindowTraits<decltype(&T::operator ())> { };
64 template <
typename ClassType,
typename ReturnType,
typename... Args>
65 struct ZipWindowTraits<ReturnType (ClassType::*)(Args...) const> {
68 static constexpr
size_t arity =
sizeof ... (Args);
75 typename std::remove_cv<
76 typename std::remove_reference<Args>::type
77 >::type::value_type...>;
81 typename std::remove_cv<
82 typename std::remove_reference<
83 typename std::remove_cv<
84 typename std::remove_reference<Args>::type
85 >::type::value_type>::type>::type...>;
90 using value_type =
typename std::tuple_element<i, value_type_tuple>::type;
96 typename std::remove_cv<
97 typename std::remove_reference<value_type<i> >::type>::type;
101 typename std::remove_cv<
102 typename std::remove_reference<Args>::type>::type...>;
112 template <
typename ClassType,
typename ReturnType,
typename... Args>
113 struct ZipWindowTraits<ReturnType (ClassType::*)(Args...)>
114 :
public ZipWindowTraits<ReturnType (ClassType::*)(Args...) const> {
119 template <
typename ReturnType,
typename... Args>
120 struct ZipWindowTraits<ReturnType (*)(Args...)> {
123 static constexpr
size_t arity =
sizeof ... (Args);
130 typename std::remove_cv<
131 typename std::remove_reference<Args>::type
132 >::type::value_type...>;
136 typename std::remove_cv<
137 typename std::remove_reference<
138 typename std::remove_cv<
139 typename std::remove_reference<Args>::type
140 >::type::value_type>::type>::type...>;
145 using value_type =
typename std::tuple_element<i, value_type_tuple>::type;
151 typename std::remove_cv<
152 typename std::remove_reference<value_type<i> >::type>::type;
156 typename std::remove_cv<
157 typename std::remove_reference<Args>::type>::type...>;
168 template <
typename ZipWindowNode,
bool UseArray>
173 template <
typename ValueType,
typename ZipFunction_,
174 bool Pad_,
bool UnequalCheck,
bool UseStdArray,
size_t kNumInputs_>
177 static constexpr
bool debug =
false;
180 static constexpr
bool stats_enabled =
false;
183 using Super::context_;
187 static constexpr
bool Pad = Pad_;
188 static constexpr
size_t kNumInputs = kNumInputs_;
190 template <
size_t Index>
192 typename ZipWindowTraits<ZipFunction>::template value_type_plain<Index>;
194 typename ZipWindowTraits<ZipFunction>::value_type_tuple_plain;
200 template <
typename ParentDIA0,
typename... ParentDIAs>
203 const ParentDIA0& parent0,
const ParentDIAs& ... parents)
204 :
Super(parent0.ctx(),
"ZipWindow",
205 { parent0.id(), parents.id() ... },
206 { parent0.node(), parents.node() ... }),
207 window_size_(window_size),
208 zip_function_(zip_function),
212 std::array<bool, kNumInputs>{
213 { ParentDIA0::stack_empty, (ParentDIAs::stack_empty)... }
216 files_.reserve(kNumInputs);
217 for (
size_t i = 0; i < kNumInputs; ++i)
218 files_.emplace_back(context_.GetFile(
this));
226 writers_[parent_index] = files_[parent_index].GetWriter();
231 assert(parent_index < kNumInputs);
232 if (!parent_stack_empty_[parent_index]) {
233 LOGC(context_.my_rank() == 0)
234 <<
"ZipWindow rejected File from parent " 235 <<
"due to non-empty function stack.";
240 LOGC(context_.my_rank() == 0)
241 <<
"ZipWindow accepted File from parent " << parent_index;
242 assert(files_[parent_index].num_items() == 0);
243 files_[parent_index] = file.Copy();
248 LOG << *
this <<
" StopPreOp() parent_index=" << parent_index;
249 writers_[parent_index].Close();
257 size_t result_count = 0;
259 if (result_window_count_ != 0) {
261 std::array<data::CatStream::CatReader, kNumInputs> readers;
262 for (
size_t i = 0; i < kNumInputs; ++i)
263 readers[i] = streams_[i]->GetCatReader(consume);
268 while (reader_next.HasNext()) {
269 auto v = tlx::vmap_for_range<kNumInputs>(reader_next);
276 context_.PrintCollectiveMeanStdev(
277 "ZipWindow() result_count", result_count);
283 for (
size_t i = 0; i < kNumInputs; ++i)
326 template <
typename Index,
typename Parent>
327 void operator () (
const Index&, Parent& parent) {
337 "ZipFunction argument does not match input DIA");
341 auto pre_op_fn = [writer](
const ZipArg& input) ->
void {
347 auto lop_chain = parent.stack().push(pre_op_fn).fold();
348 parent.node()->AddChild(node_, lop_chain, Index::index);
356 template <
size_t Index>
358 const size_t workers = context_.num_workers();
360 size_t result_size = result_window_count_ * window_size_[Index];
363 size_t local_begin =
std::min(result_size, size_prefixsum_[Index]);
365 result_size, size_prefixsum_[Index] + files_[Index].num_items());
369 static_cast<double>(result_window_count_) / static_cast<double>(workers);
371 std::vector<size_t> offsets(workers + 1, 0);
373 for (
size_t i = 0; i <= workers; ++i) {
376 static_cast<size_t>(std::ceil(i * per_pe)) * window_size_[Index];
378 cut < local_begin ? 0 :
std::min(cut, local_end) - local_begin;
381 LOG <<
"per_pe=" << per_pe
382 <<
" offsets[" << Index <<
"] = " << offsets;
385 streams_[Index] = context_.GetNewCatStream(
this);
389 streams_[Index]->template ScatterConsume<ZipArg>(
390 files_[Index], offsets);
397 using ArraySizeT = std::array<size_t, kNumInputs>;
400 ArraySizeT local_size;
401 for (
size_t i = 0; i < kNumInputs; ++i) {
402 local_size[i] = files_[i].num_items();
403 sLOG <<
"input" << i <<
"local_size" << local_size[i];
406 context_.PrintCollectiveMeanStdev(
407 "ZipWindow() local_size", local_size[i]);
414 size_prefixsum_ = local_size;
415 ArraySizeT total_size = context_.net.ExPrefixSumTotal(
419 ArraySizeT total_window_count;
420 for (
size_t i = 0; i < kNumInputs; ++i) {
421 total_window_count[i] =
422 (total_size[i] + window_size_[i] - 1) / window_size_[i];
425 size_t max_total_window_count = *std::max_element(
426 total_window_count.begin(), total_window_count.end());
429 result_window_count_ =
430 Pad ? max_total_window_count : *std::min_element(
431 total_window_count.begin(), total_window_count.end());
433 sLOG <<
"ZipWindow() total_size" << total_size
434 <<
"total_window_count" << total_window_count
435 <<
"max_total_window_count" << max_total_window_count
436 <<
"result_window_count_" << result_window_count_;
439 if (!Pad && UnequalCheck && result_window_count_ != max_total_window_count) {
440 die(
"ZipWindow(): input DIAs have unequal size: " 444 if (result_window_count_ == 0)
return;
447 tlx::call_for_range<kNumInputs>(
450 this->DoScatter<decltype(index)::index>();
455 template <
typename ZipWindowNode,
bool UseArray>
460 template <
typename ZipWindowNode>
469 template <
size_t Index>
471 typename ZipWindowTraits<ZipFunction>
472 ::template value_type_plain<Index>;
474 template <
size_t Index>
476 typename ZipWindowTraits<ZipFunction>
477 ::template vector_plain<Index>;
480 std::array<Reader, kNumInputs>& readers)
481 : zip_node_(zip_node), readers_(readers) { }
486 for (
size_t i = 0; i < kNumInputs; ++i) {
487 if (readers_[i].HasNext())
return true;
492 for (
size_t i = 0; i < kNumInputs; ++i) {
493 if (!readers_[i].HasNext())
return false;
499 template <
typename Index>
505 std::vector<ZipArg>& vec = std::get<Index::index>(vectors_);
508 for (
size_t i = 0; i < zip_node_.window_size_[Index::index]; ++i) {
509 if (Pad && !readers_[Index::index].HasNext()) {
512 std::get<Index::index>(zip_node_.padding_));
516 readers_[Index::index].
template Next<ZipArg>());
530 typename ZipWindowTraits<ZipFunction>::vector_tuple_plain
vectors_;
534 template <
typename ZipWindowNode>
543 template <
size_t Index>
545 typename ZipWindowTraits<ZipFunction>
546 ::template value_type_plain<Index>;
548 template <
size_t Index>
550 typename ZipWindowTraits<ZipFunction>
551 ::template vector_plain<Index>;
554 std::array<Reader, kNumInputs>& readers)
555 : zip_node_(zip_node), readers_(readers) { }
560 for (
size_t i = 0; i < kNumInputs; ++i) {
561 if (readers_[i].HasNext())
return true;
566 for (
size_t i = 0; i < kNumInputs; ++i) {
567 if (!readers_[i].HasNext())
return false;
573 template <
typename Index>
580 ZipVector& vec = std::get<Index::index>(vectors_);
582 for (
size_t i = 0; i < zip_node_.window_size_[Index::index]; ++i) {
583 if (Pad && !readers_[Index::index].HasNext()) {
585 vec[i] = std::get<Index::index>(zip_node_.padding_);
588 vec[i] = readers_[Index::index].template Next<ZipArg>();
602 typename ZipWindowTraits<ZipFunction>::vector_tuple_plain
vectors_;
619 template <
typename ZipFunction,
typename FirstDIAType,
typename FirstDIAStack,
621 auto ZipWindow(
const std::array<
size_t, 1 +
sizeof ... (DIAs)>& window_size,
622 const ZipFunction& zip_function,
624 const DIAs& ... dias) {
630 std::vector<FirstDIAType>,
631 typename ZipWindowTraits<ZipFunction>::template vector_plain<0>
633 "ZipFunction has the wrong input type in DIA 0");
636 =
typename ZipWindowTraits<ZipFunction>::result_type;
639 typename ZipWindowTraits<ZipFunction>::value_type_tuple_plain;
642 ZipResult, ZipFunction,
645 1 +
sizeof ... (DIAs)>;
647 auto node = tlx::make_counting<ZipWindowNode>(
648 window_size, zip_function, ZipArgsTuple(), first_dia, dias...);
665 template <
typename ZipFunction,
typename FirstDIAType,
typename FirstDIAStack,
668 const std::array<
size_t, 1 +
sizeof ... (DIAs)>& window_size,
669 const ZipFunction& zip_function,
671 const DIAs& ... dias) {
677 std::vector<FirstDIAType>,
678 typename ZipWindowTraits<ZipFunction>::template vector_plain<0>
680 "ZipFunction has the wrong input type in DIA 0");
683 =
typename ZipWindowTraits<ZipFunction>::result_type;
686 typename ZipWindowTraits<ZipFunction>::value_type_tuple_plain;
689 ZipResult, ZipFunction,
692 1 +
sizeof ... (DIAs)>;
694 auto node = tlx::make_counting<ZipWindowNode>(
695 window_size, zip_function, ZipArgsTuple(), first_dia, dias...);
712 template <
typename ZipFunction,
typename FirstDIAType,
typename FirstDIAStack,
716 const std::array<
size_t, 1 +
sizeof ... (DIAs)>& window_size,
717 const ZipFunction& zip_function,
718 const typename ZipWindowTraits<ZipFunction>::value_type_tuple_plain&
padding,
720 const DIAs& ... dias) {
726 std::vector<FirstDIAType>,
727 typename ZipWindowTraits<ZipFunction>::template vector_plain<0>
729 "ZipFunction has the wrong input type in DIA 0");
732 typename common::FunctionTraits<ZipFunction>::result_type;
735 ZipResult, ZipFunction,
738 1 +
sizeof ... (DIAs)>;
740 auto node = tlx::make_counting<ZipWindowNode>(
741 window_size, zip_function,
padding, first_dia, dias...);
758 template <
typename ZipFunction,
typename FirstDIAType,
typename FirstDIAStack,
761 const std::array<
size_t, 1 +
sizeof ... (DIAs)>& window_size,
762 const ZipFunction& zip_function,
764 const DIAs& ... dias) {
767 typename ZipWindowTraits<ZipFunction>::value_type_tuple_plain;
770 ZipArgsTuple(), first_dia, dias...);
787 template <
typename ZipFunction,
typename FirstDIAType,
typename FirstDIAStack,
792 const std::array<
size_t, 1 +
sizeof ... (DIAs)>& window_size,
793 const ZipFunction& zip_function,
794 const typename ZipWindowTraits<ZipFunction>::value_type_tuple_plain&
padding,
796 const DIAs& ... dias) {
808 typename common::FunctionTraits<ZipFunction>::result_type;
811 ZipResult, ZipFunction,
814 1 +
sizeof ... (DIAs)>;
816 auto node = tlx::make_counting<ZipWindowNode>(
817 window_size, zip_function,
padding, first_dia, dias...);
834 template <
typename ZipFunction,
typename FirstDIAType,
typename FirstDIAStack,
839 const std::array<
size_t, 1 +
sizeof ... (DIAs)>& window_size,
840 const ZipFunction& zip_function,
842 const DIAs& ... dias) {
845 typename ZipWindowTraits<ZipFunction>::value_type_tuple_plain;
848 ZipArgsTuple(), first_dia, dias...);
863 #endif // !THRILL_API_ZIP_WINDOW_HEADER typename ZipWindowNode::ZipFunction ZipFunction
void StartPreOp(size_t parent_index) final
Virtual method for preparing start of PushData.
tag structure for ZipWindow()
void PushData(bool consume) final
Virtual method for pushing data. Triggers actual pushing in sub-classes.
#define sLOG
Default logging method: output if the local debug variable is true.
DIA is the interface between the user and the Thrill framework.
bool OnPreOpFile(const data::File &file, size_t parent_index) final
Receive a whole data::File of ValueType, but only if our stack is empty.
void StopPreOp(size_t parent_index) final
Virtual method for preparing end of PushData.
ZipWindowNode(const std::array< size_t, kNumInputs > &window_size, const ZipFunction &zip_function, const ZipArgsTuple &padding, const ParentDIA0 &parent0, const ParentDIAs &... parents)
Constructor for a ZipWindowNode.
CatStreamData::CatReader CatReader
A File is an ordered sequence of Block objects for storing items.
std::array< Reader, kNumInputs > & readers_
reference to the reader array in PushData().
static constexpr size_t kNumInputs
typename ZipWindowTraits< ZipFunction >::value_type_tuple_plain ZipArgsTuple
typename ZipWindowTraits< ZipFunction > ::template vector_plain< Index > ZipVectorN
void AssertValid() const
Assert that the DIA is valid.
ZipWindowTraits< ZipFunction >::vector_tuple_plain vectors_
tuple of std::vector<>s
auto ZipWindow(const std::array< size_t, 1+sizeof ...(DIAs)> &window_size, const ZipFunction &zip_function, const DIA< FirstDIAType, FirstDIAStack > &first_dia, const DIAs &... dias)
Zips two DIAs of equal size in style of functional programming by applying zip_function to the i-th f...
BlockWriter contains a temporary Block object into which a) any serializable item can be stored or b)...
#define die(msg)
Instead of calling std::terminate(), throw the message as an exception.
const ZipArgsTuple padding_
padding for shorter DIAs
const std::array< bool, kNumInputs > parent_stack_empty_
Whether the parent stack is empty.
typename std::tuple_element< i, vector_tuple_plain >::type vector_plain
template for computing the component-wise sum of std::array or std::vector.
void DoScatter()
Scatter items from DIA "Index" to other workers if necessary.
ZipFunction zip_function_
Zip function.
typename ZipWindowTraits< ZipFunction > ::template value_type_plain< Index > ZipArgN
size_t result_window_count_
number of result windows: the smallest window count over all zipped inputs, or the largest one when Pad is set
Register Parent PreOp Hooks, instantiated and called for each Zip parent.
const std::array< size_t, kNumInputs > window_size_
Size k of the windows.
static std::string VecToStr(const std::array< T, N > &data)
Logging helper to print arrays as [a1,a2,a3,...].
bool HasNext()
helper for PushData() which checks all inputs
ZipWindowReader(ZipWindowNode &zip_node, std::array< Reader, kNumInputs > &readers)
std::array< Reader, kNumInputs > & readers_
reference to the reader array in PushData().
RegisterParent(ZipWindowNode *node)
static constexpr bool debug
typename ZipWindowNode::ZipFunction ZipFunction
std::tuple< typename std::remove_cv< typename std::remove_reference< Args >::type >::type... > vector_tuple_plain
the tuple of std::vector<>s: with remove_cv and remove_reference applied
void Execute() final
Virtual execution method. Triggers actual computation in sub-classes.
static constexpr size_t padding
typename ZipWindowTraits< ZipFunction > ::template vector_plain< Index > ZipVectorN
static constexpr bool Pad
ZipWindowNode & zip_node_
std::tuple< typename std::remove_cv< typename std::remove_reference< Args >::type >::type::value_type... > value_type_tuple
the tuple of value_type inside the vectors
A DOpNode is a typed node representing a distributed operation in Thrill.
ZipWindowReader(ZipWindowNode &zip_node, std::array< Reader, kNumInputs > &readers)
static uint_pair min()
return an uint_pair instance containing the smallest value possible
std::tuple< typename std::remove_cv< typename std::remove_reference< typename std::remove_cv< typename std::remove_reference< Args >::type >::type::value_type >::type >::type... > value_type_tuple_plain
the tuple of value_types: with remove_cv and remove_reference applied.
std::tuple< typename std::remove_cv< typename std::remove_reference< Args >::type >::type::value_type... > value_type_tuple
the tuple of value_type inside the vectors
typename std::remove_cv< typename std::remove_reference< value_type< i > >::type >::type value_type_plain
TLX_ATTRIBUTE_ALWAYS_INLINE BlockWriter & Put(const T &x)
Put appends a complete item, or fails with a FullException.
typename std::tuple_element< i, vector_tuple_plain >::type vector_plain
std::vector< data::File > files_
Files for intermediate storage.
typename ZipWindowTraits< ZipFunction >::template value_type_plain< Index > ZipArgN
void MainOp()
Receive elements from other workers.
typename ZipWindowTraits< ZipFunction > ::template value_type_plain< Index > ZipArgN
std::array< size_t, kNumInputs > size_prefixsum_
exclusive prefix sum over the number of items in workers
std::tuple< typename std::remove_cv< typename std::remove_reference< typename std::remove_cv< typename std::remove_reference< Args >::type >::type::value_type >::type >::type... > value_type_tuple_plain
the tuple of value_types: with remove_cv and remove_reference applied.
ZipWindowTraits< ZipFunction >::vector_tuple_plain vectors_
tuple of std::vector<>s
bool HasNext()
helper for PushData() which checks all inputs
typename std::remove_cv< typename std::remove_reference< value_type< i > >::type >::type value_type_plain
typename std::tuple_element< i, value_type_tuple >::type value_type
#define LOG
Default logging method: output if the local debug variable is true.
std::tuple< typename std::remove_cv< typename std::remove_reference< Args >::type >::type... > vector_tuple_plain
the tuple of std::vector<>s: with remove_cv and remove_reference applied
const struct ArrayTag ArrayTag
global const ArrayTag instance
#define LOGC(cond)
Explicitly specify the condition for logging.
ZipWindowNode & zip_node_
void Dispose() final
Virtual clear method. Triggers actual disposing in sub-classes.
typename std::tuple_element< i, value_type_tuple >::type value_type