12 #ifndef THRILL_API_CONCAT_HEADER 13 #define THRILL_API_CONCAT_HEADER 25 #include <initializer_list> 34 template <
typename ValueType>
38 static constexpr
bool debug =
false;
45 template <
typename ParentDIA0,
typename... ParentDIAs>
48 :
Super(parent0.ctx(),
"Concat",
49 { parent0.id(), parents.id() ... },
50 { parent0.node(),
parents.node() ... }),
72 template <
typename ParentDIA>
74 :
Super(parents.front().ctx(),
"Concat",
76 parents, [](const ParentDIA& d) {
return d.id(); }),
78 parents, [](
const ParentDIA& d) {
100 auto pre_op_fn = [writer](
const ValueType& input) ->
void {
106 auto lop_chain =
parents[i].stack().push(pre_op_fn).fold();
107 parents[i].node()->AddChild(
this, lop_chain, i);
113 template <
typename ParentDIA>
118 static bool warned_once =
false;
119 if (warned_once)
return;
122 LOG1 <<
"Warning: Concat() is a _very_ expensive data shuffle operation" 123 <<
" which can usually be avoided.";
134 template <
typename Index,
typename Parent>
139 auto pre_op_fn = [writer](
const ValueType& input) ->
void {
145 auto lop_chain = parent.stack().push(pre_op_fn).fold();
147 parent.node()->AddChild(
concat_node_, lop_chain, Index::index);
162 <<
"Concat rejected File from parent " 163 <<
"due to non-empty function stack.";
171 <<
"Concat rejected File from parent " 172 <<
"due to non-empty function stack.";
178 assert(
files_[parent_index].num_items() == 0);
179 files_[parent_index] = file.Copy();
189 LOG <<
"ConcatNode::Execute() processing";
191 using VectorSizeT = std::vector<size_t>;
195 local_sizes[i] =
files_[i].num_items();
197 sLOG <<
"local_sizes" << local_sizes;
202 sLOG <<
"global_sizes" << global_sizes;
205 size_t total_items = 0;
208 size_t next_total_items = total_items + global_sizes[i];
212 local_sizes[i] = total_items;
214 total_items = next_total_items;
217 sLOG <<
"local_sizes" << local_sizes;
218 sLOG <<
"total_items" << total_items;
222 VectorSizeT(num_inputs_));
224 sLOG <<
"local_ranks" << local_ranks;
256 const double pre_pe =
257 static_cast<double>(total_items) / static_cast<double>(num_workers);
262 std::vector<size_t> offsets(num_workers + 1, 0);
263 for (
size_t p = 0; p < num_workers; ++p) {
265 static_cast<size_t>(
static_cast<double>(p) * pre_pe);
266 if (limit < local_ranks[in])
continue;
268 offsets[p] =
std::min(limit - local_ranks[in],
271 offsets[num_workers] =
files_[in].num_items();
273 LOG <<
"offsets[" << in <<
"] = " << offsets;
275 streams_[in]->template ScatterConsume<ValueType>(
286 streams_[in]->GetCatReader(consume);
293 LOG <<
"total = " << total;
330 template <
typename FirstDIA,
typename... DIAs>
331 auto Concat(
const FirstDIA& first_dia,
const DIAs& ... dias) {
333 tlx::vexpand((first_dia.AssertValid(), 0), (dias.AssertValid(), 0) ...);
335 using ValueType =
typename FirstDIA::ValueType;
339 return DIA<ValueType>(tlx::make_counting<ConcatNode>(first_dia, dias...));
353 template <
typename ValueType>
375 template <
typename ValueType>
386 template <
typename ValueType,
typename Stack>
387 template <
typename SecondDIA>
389 const SecondDIA& second_dia)
const {
400 #endif // !THRILL_API_CONCAT_HEADER
net::FlowControlChannel & net
#define sLOG
Default logging method: output if the local debug variable is true.
DIA is the interface between the user and the Thrill framework.
void PushItem(const ValueType &item) const
Method for derived classes to Push a single item to all children.
void StopPreOp(size_t parent_index) final
Virtual method for preparing end of PushData.
size_t num_workers() const
Global number of workers in the system.
A File is an ordered sequence of Block objects for storing items.
const std::vector< DIABasePtr > & parents() const
Returns the parents of this DIABase.
TLX_ATTRIBUTE_ALWAYS_INLINE T Next()
Next() reads a complete item T.
std::vector< data::CatStreamPtr > streams_
Array of CatStreams for exchange.
ConcatNode(const ParentDIA0 &parent0, const ParentDIAs &... parents)
BlockWriter contains a temporary Block object into which a) any serializable item can be stored or b)...
static constexpr bool g_debug_push_file
T TLX_ATTRIBUTE_WARN_UNUSED_RESULT AllReduce(const T &value, const BinarySumOp &sum_op=BinarySumOp())
Reduces a value of a serializable type T over all workers given a certain reduce function.
auto MapVector(const std::vector< Type > &input, const Functor &f) -> std::vector< typename std::result_of< Functor(Type)>::type >
ConcatNode * concat_node_
bool OnPreOpFile(const data::File &file, size_t parent_index) final
Receive a whole data::File of ValueType, but only if our stack is empty.
Template for computing the component-wise sum of std::array or std::vector.
data::CatStreamPtr GetNewCatStream(size_t dia_id)
std::vector< data::File > files_
Files for intermediate storage.
static constexpr bool debug
void PushData(bool consume) final
Virtual method for pushing data. Triggers actual pushing in sub-classes.
std::vector< data::File::Writer > writers_
Writers to intermediate files.
std::vector< T, Allocator< T > > vector
vector with Manager tracking
void operator()(const Index &, Parent &parent)
size_t my_rank() const
Global rank of this worker among all other workers in the system.
data::File GetFile(size_t dia_id)
Returns a new File object containing a sequence of local Blocks.
ConcatNode(const std::vector< ParentDIA > &parents)
A DOpNode is a typed node representing a distributed operation in Thrill.
static uint_pair min()
Returns a uint_pair instance containing the smallest possible value.
void Dispose() final
Virtual clear method. Triggers actual disposing in sub-classes.
tlx::CountingPtr< DIABase > DIABasePtr
auto Concat(const FirstDIA &first_dia, const DIAs &... dias)
Concat is a DOp, which concatenates any number of DIAs to a single DIA.
void Execute() final
Executes the concat operation.
TLX_ATTRIBUTE_ALWAYS_INLINE BlockWriter & Put(const T &x)
Put appends a complete item, or fails with a FullException.
ConcatNode(const std::initializer_list< ParentDIA > &parents)
TLX_ATTRIBUTE_ALWAYS_INLINE bool HasNext()
HasNext() returns true if at least one more item is available.
auto Concat(const SecondDIA &second_dia) const
Concat is a DOp, which concatenates any number of DIAs to a single DIA.
#define LOG
Default logging method: output if the local debug variable is true.
RegisterParent(ConcatNode *concat_node)
Context & context_
associated Context
#define LOGC(cond)
Explicitly specify the condition for logging.
T TLX_ATTRIBUTE_WARN_UNUSED_RESULT PrefixSum(const T &value, const BinarySumOp &sum_op=BinarySumOp(), const T &initial=T())
Calculates the inclusive prefix sum over all workers, given a certain sum operation.
const std::vector< bool > parent_stack_empty_
Whether the parent stack is empty.
const size_t num_inputs_
number of input DIAs