13 #ifndef THRILL_DATA_STREAM_HEADER 14 #define THRILL_DATA_STREAM_HEADER 76 template <
typename ItemType>
77 void Scatter(
File& source,
const std::vector<size_t>& offsets,
78 bool consume =
false) {
80 return ScatterConsume<ItemType>(source, offsets);
82 return ScatterKeep<ItemType>(source, offsets);
88 template <
typename ItemType>
93 size_t num_workers = writers.size();
94 assert(offsets.size() == num_workers + 1);
100 std::vector<std::vector<Block> > block_store;
108 size_t limit = offsets[0];
109 if (current != limit) {
110 reader.template GetItemBatch<ItemType>(limit - current);
115 for (
size_t worker = 0; worker < num_workers; ++worker) {
117 size_t limit = offsets[worker + 1];
118 assert(current <= limit);
119 if (worker < my_rank) {
120 if (current != limit) {
121 block_store.emplace_back(
122 reader.template GetItemBatch<ItemType>(limit - current));
125 block_store.emplace_back(std::vector<Block>());
129 if (current != limit) {
130 writers[worker].AppendBlocks(
131 reader.template GetItemBatch<ItemType>(limit - current));
137 assert(block_store.size() == my_rank);
139 for (
size_t worker = 0; worker < my_rank; ++worker) {
140 if (!block_store[worker].empty())
141 writers[worker].AppendBlocks(std::move(block_store[worker]));
148 template <
typename ItemType>
156 size_t limit = offsets[0];
158 for ( ; current < limit; ++current) {
161 reader.template Next<ItemType>();
164 if (current != limit) {
165 reader.template GetItemBatch<ItemType>(limit - current);
173 size_t num_workers = writers.size();
174 assert(offsets.size() == num_workers + 1);
176 for (
size_t worker = 0; worker < num_workers; ++worker) {
178 size_t limit = offsets[worker + 1];
179 assert(current <= limit);
181 for ( ; current < limit; ++current) {
184 writers[worker](reader.template Next<ItemType>());
187 if (current != limit) {
188 writers[worker].AppendBlocks(
189 reader.template GetItemBatch<ItemType>(limit - current));
193 writers[worker].
Close();
268 #endif // !THRILL_DATA_STREAM_HEADER size_t tx_int_bytes() const
return number of bytes transmitted via internal loopback queues
size_t rx_int_items() const
return number of items received via internal loopback queues
Base class for common structures for ConcatStream and MixedStream.
void Scatter(File &source, const std::vector< size_t > &offsets, bool consume=false)
Scatters a File to many workers: elements from [offset[0],offset[1]) are sent to the first worker...
size_t tx_net_bytes() const
return number of bytes transmitted via network excluding internal tx
KeepReader GetKeepReader(size_t prefetch_size=File::default_prefetch_size_) const
Get BlockReader for beginning of File.
size_t tx_bytes() const
return number of bytes transmitted
size_t tx_net_items() const
return number of items transmitted via network excluding internal tx
A File is an ordered sequence of Block objects for storing items.
size_t rx_net_bytes() const
return number of bytes received via network excluding internal tx
size_t my_worker_rank() const
Returns my_worker_rank_.
BlockWriter contains a temporary Block object into which a) any serializable item can be stored or b)...
BlockWriter< StreamSink > Writer
virtual const StreamId & id() const =0
Return stream id.
void Close()
custom destructor to close writers in a cyclic fashion
size_t tx_int_blocks() const
return number of blocks transmitted via internal loopback queues
void ScatterKeep(File &source, const std::vector< size_t > &offsets)
Keep version of Scatter(); see documentation there.
size_t tx_net_blocks() const
return number of blocks transmitted via network excluding internal tx
An extra class derived from std::vector<> for delivery of the BlockWriters of a Stream.
ConsumeReader GetConsumeReader(size_t prefetch_size=File::default_prefetch_size_)
Get consuming BlockReader for beginning of File.
size_t rx_net_items() const
return number of items received via network excluding internal tx
void Close()
shuts down the stream, waits for all closing blocks
virtual StreamData & data()=0
Return stream data reference.
size_t rx_items() const
return number of items received
BlockReader takes Block objects from BlockSource and allows reading of a) serializable Items or b) ar...
size_t rx_bytes() const
return number of bytes received
size_t tx_blocks() const
return number of blocks transmitted
void ScatterConsume(File &source, const std::vector< size_t > &offsets)
Consuming version of Scatter(); see documentation there.
Stream - base class for CatStream and MixStream.
virtual Writers GetWriters()=0
size_t rx_int_bytes() const
return number of bytes received via internal loopback queues
size_t rx_net_blocks() const
return number of blocks received via network excluding internal tx
size_t tx_int_items() const
return number of items transmitted via internal loopback queues
size_t tx_items() const
return number of items transmitted
TLX_ATTRIBUTE_ALWAYS_INLINE bool HasNext()
HasNext() returns true if at least one more item is available.
size_t rx_int_blocks() const
return number of blocks received via internal loopback queues
size_t rx_blocks() const
return number of blocks received
Provides reference counting abilities for use with CountingPtr.