13 #ifndef THRILL_DATA_STREAM_DATA_HEADER 14 #define THRILL_DATA_STREAM_DATA_HEADER 49 static constexpr
bool debug =
false;
62 Writers(
size_t my_worker_rank = 0);
86 const StreamId&
id,
size_t local_worker_id,
size_t dia_id);
94 virtual const char * stream_type() = 0;
97 size_t my_host_rank()
const {
return multiplexer_.my_host_rank(); }
99 size_t num_hosts()
const {
return multiplexer_.num_hosts(); }
107 return my_host_rank() * workers_per_host() + local_worker_id_;
113 virtual void Close() = 0;
115 virtual bool closed()
const = 0;
119 virtual Writers GetWriters() = 0;
123 void OnWriterClosed(
size_t peer_worker_rank,
bool sent);
126 void OnAllWritersClosed();
134 rx_net_items_ { 0 }, rx_net_bytes_ { 0 }, rx_net_blocks_ { 0 };
139 tx_net_items_ { 0 }, tx_net_bytes_ { 0 }, tx_net_blocks_ { 0 };
144 rx_int_items_ { 0 }, rx_int_bytes_ { 0 }, rx_int_blocks_ { 0 };
149 tx_int_items_ { 0 }, tx_int_bytes_ { 0 }, tx_int_blocks_ { 0 };
187 size_t writers_closed_ = 0;
190 bool all_writers_closed_ =
false;
204 static constexpr
bool debug =
false;
209 virtual void Close() = 0;
213 virtual void OnWriterClosed(
size_t peer_worker_rank,
bool sent) = 0;
220 template <
typename StreamData>
229 StreamId id,
size_t workers_per_host,
size_t dia_id);
237 bool Release(
size_t local_worker_id);
244 void OnWriterClosed(
size_t peer_worker_rank,
bool sent);
247 size_t my_host_rank()
const {
return multiplexer_.my_host_rank(); }
249 size_t num_hosts()
const {
return multiplexer_.num_hosts(); }
277 #endif // !THRILL_DATA_STREAM_DATA_HEADER StreamSink is an BlockSink that sends data via a network socket to the StreamData object on a differe...
common::StatsTimerStopped tx_timespan_
Timers from first rx / tx package until rx / tx direction is closed.
Base class for common structures for ConcatStream and MixedStream.
A simple semaphore implementation using C++11 synchronization methods.
StreamSetBase * stream_set_base_
pointer to StreamSetBase containing this StreamData
size_t my_worker_rank() const
Returns my_worker_rank_.
Multiplexer & multiplexer_
reference to multiplexer
BlockWriter contains a temporary Block object into which a) any serializable item can be stored or b)...
Multiplexer & multiplexer_
reference to multiplexer
std::vector< StreamDataPtr > streams_
'owns' all streams belonging to one stream id for all local workers.
Multiplexes virtual Connections on Dispatcher.
Simple structure that holds a all stream instances for the workers on the local host for a given stre...
std::atomic< size_t > remaining_closing_blocks_
common::StatsTimerStart tx_lifetime_
Timers from creation of stream until rx / tx direction is closed.
size_t local_worker_id_
local worker id
An extra class derived from std::vector<> for delivery of the BlockWriters of a Stream.
std::vector< size_t > writers_closed_per_host_sent_
number of writers closed per host, message is set when all are closed
size_t my_worker_rank_
rank of this worker
std::vector< size_t > writers_closed_per_host_
number of writers closed per host, message is set when all are closed
size_t num_hosts() const
Number of hosts in system.
size_t workers_per_host() const
Returns workers_per_host.
size_t remaining_
countdown to destruction
size_t num_hosts() const
Number of hosts in system.
tlx::Semaphore sem_closing_blocks_
number of received stream closing Blocks.
static constexpr bool debug
std::vector< T, Allocator< T > > vector
vector with Manager tracking
Base class for StreamSet.
const StreamId & id() const
Return stream id.
StreamId id_
our own stream id.
size_t dia_id_
associated DIANode id.
size_t num_workers() const
Number of workers in system.
size_t my_host_rank() const
Returns my_host_rank.
tlx::Semaphore sem_queue_
std::mutex mutex_
mutex for working on the data structure
Provides reference counting abilities for use with CountingPtr.
size_t workers_per_host() const
Returns workers_per_host.