13 #ifndef THRILL_DATA_MULTIPLEXER_HEADER 14 #define THRILL_DATA_MULTIPLEXER_HEADER 32 template <
typename Stream>
49 class MixBlockQueueSink;
69 static constexpr
bool debug =
false;
127 size_t id,
size_t local_worker_id,
size_t dia_id);
142 size_t id,
size_t local_worker_id,
size_t dia_id);
200 std::unique_ptr<Data>
d_;
203 size_t id,
size_t local_worker_id,
size_t dia_id);
205 size_t id,
size_t local_worker_id,
size_t dia_id);
241 #endif // !THRILL_DATA_MULTIPLEXER_HEADER StreamSink is an BlockSink that sends data via a network socket to the StreamData object on a differe...
CatStreamPtr GetNewCatStream(size_t local_worker_id, size_t dia_id)
Request next stream.
common::JsonLogger & logger()
Get the JsonLogger from the BlockPool.
size_t AllocateMixStreamId(size_t local_worker_id)
Allocate the next stream.
std::mutex mutex_
protects critical sections
BlockPool & block_pool()
Get the used BlockPool.
A Stream is a virtual set of connections to all other worker instances, hence a "Stream" bundles them...
void IntReleaseCatStream(size_t id, size_t local_worker_id)
release pointer onto a CatStreamData object
CatStreamDataPtr GetOrCreateCatStreamData(size_t id, size_t local_worker_id, size_t dia_id)
Get stream with given id, if it does not exist, create it.
MixStreamDataPtr MixLoopback(size_t stream_id, size_t to_worker_id)
A pinned / pin-counted pointer to a ByteBlock.
MixStreamDataPtr IntGetOrCreateMixStreamData(size_t id, size_t local_worker_id, size_t dia_id)
Ownership handle onto a MixStream.
~Multiplexer()
Closes all client connections.
mem::Manager & mem_manager_
reference to host-global memory manager
Multiplexes virtual Connections on Dispatcher.
Simple structure that holds a all stream instances for the workers on the local host for a given stre...
std::atomic< size_t > max_active_streams_
maximu number of active Cat/MixStreams
size_t num_hosts() const
total number of hosts.
void OnCatStreamBlock(size_t peer, Connection &s, const StreamMultiplexerHeader &header, const CatStreamDataPtr &stream, PinnedByteBlockPtr &&bytes)
Receives and dispatches a Block to a CatStreamData.
A Stream is a virtual set of connections to all other worker instances, hence a "Stream" bundles them...
void IntReleaseMixStream(size_t id, size_t local_worker_id)
release pointer onto a MixStream object
size_t workers_per_host_
Number of workers per host.
size_t my_host_rank() const
my rank among the hosts.
void OnMixStreamBlock(size_t peer, Connection &s, const StreamMultiplexerHeader &header, const MixStreamDataPtr &stream, PinnedByteBlockPtr &&bytes)
Receives and dispatches a Block to a MixStream.
size_t num_parallel_async_
number of parallel recv requests
Pool to allocate, keep, swap out/in, and free all ByteBlocks on the host.
CatStreamDataPtr IntGetOrCreateCatStreamData(size_t id, size_t local_worker_id, size_t dia_id)
Multiplexer(mem::Manager &mem_manager, BlockPool &block_pool, net::DispatcherThread &dispatcher, net::Group &group, size_t workers_per_host)
A Connection represents a link to another peer in a network group.
net::Group & group_
Holds NetConnections for outgoing Streams.
Ownership handle onto a CatStreamData.
size_t workers_per_host() const
number of workers per host
net::DispatcherThread & dispatcher()
get network dispatcher
size_t send_size_limit_
Calculated send queue size limit for StreamData semaphores.
MixStreamPtr GetNewMixStream(size_t local_worker_id, size_t dia_id)
Request next stream.
net::Group & group()
get network group connection
CatStreamDataPtr CatLoopback(size_t stream_id, size_t to_worker_id)
size_t AllocateCatStreamId(size_t local_worker_id)
Allocate the next stream.
static const size_t bytes
number of bytes in uint_pair
std::unique_ptr< Data > d_
pimpl data structure
static constexpr bool debug
A network Group is a collection of enumerated communication links, which provides point-to-point comm...
net::DispatcherThread & dispatcher_
Object shared by allocators and other classes to track memory allocations.
Simple buffer of characters without initialization or growing functionality.
size_t num_workers() const
total number of workers.
void OnMultiplexerHeader(size_t peer, uint32_t seq, Connection &s, net::Buffer &&buffer)
JsonLogger is a receiver of JSON output objects for logging.
void AsyncReadMultiplexerHeader(size_t peer, Connection &s)
Multiplexer & operator=(const Multiplexer &)=delete
non-copyable: delete assignment operator
BlockPool & block_pool_
reference to host-global BlockPool.
void Close()
Closes all client connections.
size_t my_host_rank() const
Return our rank among hosts in this group.
std::atomic< size_t > active_streams_
number of active Cat/MixStreams
virtual size_t num_hosts() const =0
Return number of connections in this group (= number computing hosts)
DispatcherThread contains a net::Dispatcher object and an associated thread that runs in the dispatch...
A BlockQueue is a thread-safe queue used to hand-over Block objects between threads.
MixStreamDataPtr GetOrCreateMixStreamData(size_t id, size_t local_worker_id, size_t dia_id)
Get stream with given id, if it does not exist, create it.