16 #ifndef THRILL_API_ZIP_HEADER 17 #define THRILL_API_ZIP_HEADER 75 template <
typename ValueType,
typename ZipFunction,
76 bool Pad,
bool UnequalCheck,
bool NoRebalance,
size_t kNumInputs>
79 static constexpr
bool debug =
false;
87 template <
size_t Index>
89 typename common::FunctionTraits<ZipFunction>::template arg_plain<Index>;
91 typename common::FunctionTraits<ZipFunction>::args_tuple_plain;
97 template <
typename ParentDIA0,
typename... ParentDIAs>
99 const ParentDIA0& parent0,
const ParentDIAs& ...
parents)
100 :
Super(parent0.ctx(),
"Zip",
101 { parent0.id(), parents.id() ... },
102 { parent0.node(),
parents.node() ... }),
107 std::array<bool, kNumInputs>{
108 { ParentDIA0::stack_empty, (ParentDIAs::stack_empty)... }
111 files_.reserve(kNumInputs);
112 for (
size_t i = 0; i < kNumInputs; ++i)
126 assert(parent_index < kNumInputs);
129 <<
"Zip rejected File from parent " 130 <<
"due to non-empty function stack.";
135 assert(
files_[parent_index].num_items() == 0);
136 files_[parent_index] = file.Copy();
141 LOG << *
this <<
" StopPreOp() parent_index=" << parent_index;
150 size_t result_count = 0;
155 std::array<data::File::Reader, kNumInputs> readers;
156 for (
size_t i = 0; i < kNumInputs; ++i)
157 readers[i] =
files_[i].GetReader(consume);
161 while (reader_next.
HasNext()) {
162 auto v = tlx::vmap_for_range<kNumInputs>(reader_next);
169 std::array<data::CatStream::CatReader, kNumInputs> readers;
170 for (
size_t i = 0; i < kNumInputs; ++i)
171 readers[i] =
streams_[i]->GetCatReader(consume);
175 while (reader_next.
HasNext()) {
176 auto v = tlx::vmap_for_range<kNumInputs>(reader_next);
185 "Zip() result_count", result_count);
229 template <
typename Index,
typename Parent>
239 "ZipFunction argument does not match input DIA");
243 auto pre_op_fn = [writer](
const ZipArg& input) ->
void {
249 auto lop_chain = parent.stack().push(pre_op_fn).fold();
250 parent.node()->AddChild(
node_, lop_chain, Index::index);
258 template <
size_t Index>
263 size_t local_begin =
std::min(result_size_, size_prefixsum_[Index]);
265 result_size_, size_prefixsum_[Index] + files_[Index].num_items());
269 static_cast<double>(
result_size_) / static_cast<double>(workers);
271 std::vector<size_t> offsets(workers + 1, 0);
273 for (
size_t i = 0; i <= workers; ++i) {
275 size_t cut =
static_cast<size_t>(std::ceil(i * per_pe));
277 cut < local_begin ? 0 :
std::min(cut, local_end) - local_begin;
280 LOG <<
"per_pe=" << per_pe
281 <<
" offsets[" << Index <<
"] = " << offsets;
288 streams_[Index]->template ScatterConsume<ZipArg>(
289 files_[Index], offsets);
297 result_size_ = files_[0].num_items();
298 for (
size_t i = 1; i < kNumInputs; ++i) {
299 if (result_size_ != files_[i].num_items()) {
300 die(
"Zip() input DIA " << i <<
" partition does not match.");
308 using ArraySizeT = std::array<size_t, kNumInputs>;
311 ArraySizeT local_size;
312 for (
size_t i = 0; i < kNumInputs; ++i) {
313 local_size[i] = files_[i].num_items();
314 sLOG <<
"input" << i <<
"local_size" << local_size[i];
318 "Zip() local_size", local_size[i]);
325 size_prefixsum_ = local_size;
329 size_t max_total_size =
330 *std::max_element(total_size.begin(), total_size.end());
335 : *std::min_element(total_size.begin(), total_size.end());
338 if (!Pad && UnequalCheck && result_size_ != max_total_size) {
339 die(
"Zip(): input DIAs have unequal size: " 343 if (result_size_ == 0)
return;
346 tlx::call_for_range<kNumInputs>(
349 this->DoScatter<decltype(index)::index>();
354 template <
typename Reader>
359 std::array<Reader, kNumInputs>& readers)
360 : zip_node_(zip_node), readers_(readers) { }
365 for (
size_t i = 0; i < kNumInputs; ++i) {
366 if (readers_[i].HasNext())
return true;
371 for (
size_t i = 0; i < kNumInputs; ++i) {
372 if (!readers_[i].HasNext())
return false;
378 template <
typename Index>
384 if (Pad && !readers_[Index::index].HasNext()) {
386 return std::get<Index::index>(zip_node_.padding_);
388 return readers_[Index::index].template Next<ZipArg>();
424 template <
typename ZipFunction,
typename FirstDIAType,
typename FirstDIAStack,
426 auto Zip(
const ZipFunction& zip_function,
428 const DIAs& ... dias) {
435 typename common::FunctionTraits<ZipFunction>::template arg<0>
437 "ZipFunction has the wrong input type in DIA 0");
440 =
typename common::FunctionTraits<ZipFunction>::result_type;
443 typename common::FunctionTraits<ZipFunction>::args_tuple_plain;
446 ZipResult, ZipFunction,
448 1 +
sizeof ... (DIAs)>;
450 auto node = tlx::make_counting<ZipNode>(
479 template <
typename ZipFunction,
typename FirstDIAType,
typename FirstDIAStack,
482 const ZipFunction& zip_function,
484 const DIAs& ... dias) {
491 typename common::FunctionTraits<ZipFunction>::template arg<0>
493 "ZipFunction has the wrong input type in DIA 0");
496 =
typename common::FunctionTraits<ZipFunction>::result_type;
499 typename common::FunctionTraits<ZipFunction>::args_tuple_plain;
502 ZipResult, ZipFunction,
504 1 +
sizeof ... (DIAs)>;
506 auto node = tlx::make_counting<ZipNode>(
535 template <
typename ZipFunction,
typename FirstDIAType,
typename FirstDIAStack,
538 const ZipFunction& zip_function,
540 const DIAs& ... dias) {
547 typename common::FunctionTraits<ZipFunction>::template arg<0>
549 "ZipFunction has the wrong input type in DIA 0");
552 typename common::FunctionTraits<ZipFunction>::result_type;
555 typename common::FunctionTraits<ZipFunction>::args_tuple_plain;
558 ZipResult, ZipFunction,
560 1 +
sizeof ... (DIAs)>;
562 auto node = tlx::make_counting<ZipNode>(
594 template <
typename ZipFunction,
typename FirstDIAType,
typename FirstDIAStack,
598 const ZipFunction& zip_function,
599 const typename common::FunctionTraits<ZipFunction>::args_tuple_plain&
padding,
601 const DIAs& ... dias) {
608 typename common::FunctionTraits<ZipFunction>::template arg<0>
610 "ZipFunction has the wrong input type in DIA 0");
613 typename common::FunctionTraits<ZipFunction>::result_type;
616 ZipResult, ZipFunction,
618 1 +
sizeof ... (DIAs)>;
620 auto node = tlx::make_counting<ZipNode>(
621 zip_function,
padding, first_dia, dias...);
650 template <
typename ZipFunction,
typename FirstDIAType,
typename FirstDIAStack,
654 const ZipFunction& zip_function,
656 const DIAs& ... dias) {
663 typename common::FunctionTraits<ZipFunction>::template arg<0>
665 "ZipFunction has the wrong input type in DIA 0");
668 typename common::FunctionTraits<ZipFunction>::result_type;
671 typename common::FunctionTraits<ZipFunction>::args_tuple_plain;
674 ZipResult, ZipFunction,
676 1 +
sizeof ... (DIAs)>;
678 auto node = tlx::make_counting<ZipNode>(
684 template <
typename ValueType,
typename Stack>
685 template <
typename ZipFunction,
typename SecondDIA>
687 const SecondDIA& second_dia,
const ZipFunction& zip_function)
const {
688 return api::Zip(zip_function, *
this, second_dia);
691 template <
typename ValueType,
typename Stack>
692 template <
typename ZipFunction,
typename SecondDIA>
694 struct CutTag const&,
const SecondDIA& second_dia,
695 const ZipFunction& zip_function)
const {
699 template <
typename ValueType,
typename Stack>
700 template <
typename ZipFunction,
typename SecondDIA>
702 struct PadTag const&,
const SecondDIA& second_dia,
703 const ZipFunction& zip_function)
const {
707 template <
typename ValueType,
typename Stack>
708 template <
typename ZipFunction,
typename SecondDIA>
711 const ZipFunction& zip_function)
const {
722 #endif // !THRILL_API_ZIP_HEADER const ZipArgsTuple padding_
padding for shorter DIAs
RegisterParent(ZipNode *node)
net::FlowControlChannel & net
#define sLOG
Default logging method: output if the local debug variable is true.
DIA is the interface between the user and the Thrill framework.
std::array< Reader, kNumInputs > & readers_
reference to the reader array in PushData().
void PushItem(const ValueType &item) const
Method for derived classes to Push a single item to all children.
static constexpr bool debug
Register Parent PreOp Hooks, instantiated and called for each Zip parent.
void PrintCollectiveMeanStdev(const char *text, const Type &local)
const std::array< bool, kNumInputs > parent_stack_empty_
Whether the parent stack is empty.
Access CatReaders for different parents.
size_t num_workers() const
Global number of workers in the system.
A File is an ordered sequence of Block objects for storing items.
const std::vector< DIABasePtr > & parents() const
Returns the parents of this DIABase.
void AssertValid() const
Assert that the DIA is valid.
void StartPreOp(size_t parent_index) final
Virtual method for preparing start of PushData.
BlockWriter contains a temporary Block object into which a) any serializable item can be stored or b)...
static constexpr bool g_debug_push_file
#define die(msg)
Instead of calling std::terminate(), throw the output message via an exception.
template for computing the component-wise sum of std::array or std::vector.
void StopPreOp(size_t parent_index) final
Virtual method for preparing end of PushData.
typename common::FunctionTraits< ZipFunction >::template arg_plain< Index > ZipArgN
data::CatStreamPtr GetNewCatStream(size_t dia_id)
ReaderNext(ZipNode &zip_node, std::array< Reader, kNumInputs > &readers)
void operator()(const Index &, Parent &parent)
auto Zip(const ZipFunction &zip_function, const DIA< FirstDIAType, FirstDIAStack > &first_dia, const DIAs &... dias)
Zips two DIAs of equal size in style of functional programming by applying zip_function to the i-th e...
static std::string VecToStr(const std::array< T, N > &data)
Logging helper to print arrays as [a1,a2,a3,...].
void PushData(bool consume) final
Virtual method for pushing data. Triggers actual pushing in sub-classes.
std::vector< data::File > files_
Files for intermediate storage.
bool HasNext()
helper for PushData() which checks all inputs
void Execute() final
Virtual execution method. Triggers actual computation in sub-classes.
typename common::FunctionTraits< ZipFunction >::args_tuple_plain ZipArgsTuple
bool OnPreOpFile(const data::File &file, size_t parent_index) final
Receive a whole data::File of ValueType, but only if our stack is empty.
void MainOp()
Receive elements from other workers.
static constexpr size_t padding
data::File GetFile(size_t dia_id)
Returns a new File object containing a sequence of local Blocks.
T TLX_ATTRIBUTE_WARN_UNUSED_RESULT ExPrefixSumTotal(T &value, const BinarySumOp &sum_op=BinarySumOp(), const T &initial=T())
Calculates the exclusive prefix sum over all workers, and delivers the total sum as well...
ZipFunction zip_function_
Zip function.
A DOpNode is a typed node representing a distributed operation in Thrill.
static uint_pair min()
return an uint_pair instance containing the smallest value possible
void DoScatter()
Scatter items from DIA "Index" to other workers if necessary.
auto Zip(const SecondDIA &second_dia, const ZipFunction &zip_function) const
Zips two DIAs of equal size in style of functional programming by applying zip_function to the i-th e...
TLX_ATTRIBUTE_ALWAYS_INLINE BlockWriter & Put(const T &x)
Put appends a complete item, or fails with a FullException.
void Close()
Explicitly close the writer.
static constexpr bool stats_enabled
Set this variable to true to enable generation and output of stats.
ZipNode(const ZipFunction &zip_function, const ZipArgsTuple &padding, const ParentDIA0 &parent0, const ParentDIAs &... parents)
Constructor for a ZipNode.
std::array< size_t, kNumInputs > size_prefixsum_
exclusive prefix sum over the number of items in workers
data::CatStreamPtr streams_[kNumInputs]
Array of inbound CatStreams.
data::File::Writer writers_[kNumInputs]
Writers to intermediate files.
#define LOG
Default logging method: output if the local debug variable is true.
size_t result_size_
shortest size of Zipped inputs
void Dispose() final
Virtual clear method. Triggers actual disposing in sub-classes.
Context & context_
associated Context
#define LOGC(cond)
Explicitly specify the condition for logging.
A DIANode which performs a Zip operation.