Thrill  0.1
Multiplexer Class Reference

Detailed Description

Multiplexes virtual Connections on Dispatcher.

A worker has a TCP connection to each other worker to exchange large amounts of data. Since multiple exchanges can occur at the same time on this single connection, we use multiplexing. The slices are called Blocks and are indicated by a MultiplexerHeader. Multiple Blocks form a Stream on a single TCP connection. The multiplexer multiplexes all streams on all sockets.

All sockets are polled for headers. As soon as a header arrives, it is either attached to an existing stream or a new stream instance is created.
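
The attach-or-create step described above can be illustrated with a minimal sketch. It is not the Thrill implementation: SketchHeader, SketchStream, and SketchDemux are hypothetical names, the header layout is an assumption, and blocks are plain strings rather than pinned byte blocks read from a socket.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <vector>

// Hypothetical header: names the stream a block belongs to (assumed layout).
struct SketchHeader {
    size_t stream_id;  // logical stream sharing the connection
    size_t size;       // number of payload bytes in this block
};

// Hypothetical stream: keeps block payloads in arrival order.
struct SketchStream {
    std::vector<std::string> blocks;
};

class SketchDemux {
public:
    // Attach the block to its stream; operator[] creates the stream on
    // first use, which mirrors "existing stream or new stream instance".
    void OnBlock(const SketchHeader& header, const std::string& payload) {
        streams_[header.stream_id].blocks.push_back(
            payload.substr(0, header.size));
    }

    size_t num_streams() const { return streams_.size(); }

    const SketchStream& stream(size_t id) const { return streams_.at(id); }

private:
    std::map<size_t, SketchStream> streams_;
};
```

Feeding blocks for stream ids 7, 9, and 7 again would leave two streams in the sketch, with the two blocks of stream 7 kept in arrival order.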

Definition at line 67 of file multiplexer.hpp.

#include <multiplexer.hpp>

Public Member Functions

 Multiplexer (mem::Manager &mem_manager, BlockPool &block_pool, net::DispatcherThread &dispatcher, net::Group &group, size_t workers_per_host)
 
 Multiplexer (const Multiplexer &)=delete
 non-copyable: delete copy-constructor
 
 ~Multiplexer ()
 Closes all client connections.
 
BlockPool & block_pool ()
 Get the used BlockPool.
 
void Close ()
 Closes all client connections.
 
net::DispatcherThread & dispatcher ()
 get network dispatcher
 
net::Group & group ()
 get network group connection
 
common::JsonLogger & logger ()
 Get the JsonLogger from the BlockPool.
 
size_t my_host_rank () const
 my rank among the hosts.
 
size_t num_hosts () const
 total number of hosts.
 
size_t num_workers () const
 total number of workers.
 
Multiplexer & operator= (const Multiplexer &)=delete
 non-copyable: delete assignment operator
 
size_t workers_per_host () const
 number of workers per host
 
CatStreamData
size_t AllocateCatStreamId (size_t local_worker_id)
 Allocate the next stream.
 
CatStreamDataPtr GetOrCreateCatStreamData (size_t id, size_t local_worker_id, size_t dia_id)
 Get stream with given id, if it does not exist, create it.
 
CatStreamPtr GetNewCatStream (size_t local_worker_id, size_t dia_id)
 Request next stream.
 
MixStream
size_t AllocateMixStreamId (size_t local_worker_id)
 Allocate the next stream.
 
MixStreamDataPtr GetOrCreateMixStreamData (size_t id, size_t local_worker_id, size_t dia_id)
 Get stream with given id, if it does not exist, create it.
 
MixStreamPtr GetNewMixStream (size_t local_worker_id, size_t dia_id)
 Request next stream.
 

Private Types

using Connection = net::Connection
 

Private Member Functions

void AsyncReadMultiplexerHeader (size_t peer, Connection &s)
 
CatStreamDataPtr CatLoopback (size_t stream_id, size_t to_worker_id)
 
CatStreamDataPtr IntGetOrCreateCatStreamData (size_t id, size_t local_worker_id, size_t dia_id)
 
MixStreamDataPtr IntGetOrCreateMixStreamData (size_t id, size_t local_worker_id, size_t dia_id)
 
void IntReleaseCatStream (size_t id, size_t local_worker_id)
 release pointer onto a CatStreamData object
 
void IntReleaseMixStream (size_t id, size_t local_worker_id)
 release pointer onto a MixStream object
 
MixStreamDataPtr MixLoopback (size_t stream_id, size_t to_worker_id)
 
void OnCatStreamBlock (size_t peer, Connection &s, const StreamMultiplexerHeader &header, const CatStreamDataPtr &stream, PinnedByteBlockPtr &&bytes)
 Receives and dispatches a Block to a CatStreamData.
 
void OnMixStreamBlock (size_t peer, Connection &s, const StreamMultiplexerHeader &header, const MixStreamDataPtr &stream, PinnedByteBlockPtr &&bytes)
 Receives and dispatches a Block to a MixStream.
 
void OnMultiplexerHeader (size_t peer, uint32_t seq, Connection &s, net::Buffer &&buffer)
 

Private Attributes

std::atomic< size_t > active_streams_ { 0 }
 number of active Cat/MixStreams
 
BlockPool & block_pool_
 reference to host-global BlockPool.
 
bool closed_ = false
 closed
 
std::unique_ptr< Data > d_
 pimpl data structure
 
net::DispatcherThread & dispatcher_
 
net::Group & group_
 Holds NetConnections for outgoing Streams.
 
std::atomic< size_t > max_active_streams_ { 0 }
 maximum number of active Cat/MixStreams
 
mem::Manager & mem_manager_
 reference to host-global memory manager
 
std::mutex mutex_
 protects critical sections
 
size_t num_parallel_async_
 number of parallel recv requests
 
size_t send_size_limit_
 Calculated send queue size limit for StreamData semaphores.
 
size_t workers_per_host_
 Number of workers per host.
 

Static Private Attributes

static constexpr bool debug = false
 

Member Typedef Documentation

◆ Connection

using Connection = net::Connection
private

Definition at line 214 of file multiplexer.hpp.

Constructor & Destructor Documentation

◆ Multiplexer() [1/2]

◆ Multiplexer() [2/2]

Multiplexer ( const Multiplexer & )
delete

non-copyable: delete copy-constructor

◆ ~Multiplexer()

Closes all client connections.

Definition at line 177 of file multiplexer.cpp.

References Group::Close(), Multiplexer::Close(), Multiplexer::closed_, and Multiplexer::group_.

Member Function Documentation

◆ AllocateCatStreamId()

size_t AllocateCatStreamId ( size_t  local_worker_id)

Allocate the next stream.

Definition at line 184 of file multiplexer.cpp.

References Multiplexer::d_, and Multiplexer::mutex_.

Referenced by Multiplexer::group().

◆ AllocateMixStreamId()

size_t AllocateMixStreamId ( size_t  local_worker_id)

Allocate the next stream.

Definition at line 216 of file multiplexer.cpp.

References Multiplexer::d_, and Multiplexer::mutex_.

Referenced by Multiplexer::group().

◆ AsyncReadMultiplexerHeader()

◆ block_pool()

BlockPool& block_pool ( )
inline

Get the used BlockPool.

Definition at line 108 of file multiplexer.hpp.

References Multiplexer::block_pool_, and Multiplexer::logger().

◆ CatLoopback()

CatStreamDataPtr CatLoopback ( size_t  stream_id,
size_t  to_worker_id 
)
private

Pointer to queue that is used for communication between two workers on the same host.

Definition at line 499 of file multiplexer.cpp.

References Multiplexer::d_, and Multiplexer::mutex_.

Referenced by CatStreamData::GetWriters().

◆ Close()

void Close ( )

Closes all client connections.

Definition at line 162 of file multiplexer.cpp.

References Multiplexer::closed_, Multiplexer::d_, die_unless, LOG1, and Multiplexer::mutex_.

Referenced by Multiplexer::~Multiplexer().
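
The cross-references show that the destructor and Close() share the closed_ flag. One way such a pairing can work is sketched below. This is an assumption, not the Thrill code: SketchCloser and its close counter are hypothetical, and the sketch treats a second Close() as a no-op, whereas the real function references die_unless and may treat a repeated call differently.

```cpp
#include <cassert>
#include <mutex>

// Hypothetical resource owner: closes exactly once, either explicitly
// or from the destructor.
class SketchCloser {
public:
    explicit SketchCloser(int& close_count) : close_count_(close_count) {}

    // Destructor falls back to Close() if the owner never called it.
    ~SketchCloser() {
        if (!closed_) Close();
    }

    void Close() {
        std::unique_lock<std::mutex> lock(mutex_);
        if (closed_) return;  // assumption: repeated Close() is ignored
        closed_ = true;
        ++close_count_;       // stands in for releasing the connections
    }

private:
    std::mutex mutex_;
    bool closed_ = false;
    int& close_count_;
};
```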

◆ dispatcher()

net::DispatcherThread& dispatcher ( )
inline

get network dispatcher

Definition at line 114 of file multiplexer.hpp.

References Multiplexer::dispatcher_.

Referenced by StreamSet< StreamData >::OnWriterClosed().

◆ GetNewCatStream()

CatStreamPtr GetNewCatStream ( size_t  local_worker_id,
size_t  dia_id 
)

Request next stream.

Definition at line 195 of file multiplexer.cpp.

References Multiplexer::d_, Multiplexer::IntGetOrCreateCatStreamData(), and Multiplexer::mutex_.

Referenced by Context::GetNewCatStream(), and Multiplexer::group().

◆ GetNewMixStream()

MixStreamPtr GetNewMixStream ( size_t  local_worker_id,
size_t  dia_id 
)

Request next stream.

Definition at line 227 of file multiplexer.cpp.

References Multiplexer::d_, Multiplexer::IntGetOrCreateMixStreamData(), and Multiplexer::mutex_.

Referenced by Context::GetNewMixStream(), and Multiplexer::group().

◆ GetOrCreateCatStreamData()

CatStreamDataPtr GetOrCreateCatStreamData ( size_t  id,
size_t  local_worker_id,
size_t  dia_id 
)

Get stream with given id, if it does not exist, create it.

Definition at line 189 of file multiplexer.cpp.

References Multiplexer::IntGetOrCreateCatStreamData(), and Multiplexer::mutex_.

Referenced by Multiplexer::group(), and Multiplexer::OnMultiplexerHeader().
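
The references suggest that the public function takes mutex_ and then delegates to the Int variant. A minimal sketch of that lock-then-delegate pattern follows, under stated assumptions: SketchRegistry and SketchStreamData are hypothetical, the map keyed only by id is a simplification, and the local_worker_id parameter is left out.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <memory>
#include <mutex>

// Hypothetical stand-in for the per-stream data object.
struct SketchStreamData {
    size_t id;
    size_t dia_id;
};
using SketchStreamDataPtr = std::shared_ptr<SketchStreamData>;

class SketchRegistry {
public:
    // Public entry point: acquires the lock, then delegates.
    SketchStreamDataPtr GetOrCreate(size_t id, size_t dia_id) {
        std::unique_lock<std::mutex> lock(mutex_);
        return IntGetOrCreate(id, dia_id);
    }

    size_t size() {
        std::unique_lock<std::mutex> lock(mutex_);
        return map_.size();
    }

private:
    // Internal variant: the caller must already hold mutex_.
    SketchStreamDataPtr IntGetOrCreate(size_t id, size_t dia_id) {
        auto it = map_.find(id);
        if (it != map_.end()) return it->second;
        auto ptr = std::make_shared<SketchStreamData>(
            SketchStreamData{id, dia_id});
        map_.emplace(id, ptr);
        return ptr;
    }

    std::mutex mutex_;
    std::map<size_t, SketchStreamDataPtr> map_;
};
```

Splitting the function this way lets other members that already hold the lock reuse the internal variant without locking twice.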

◆ GetOrCreateMixStreamData()

MixStreamDataPtr GetOrCreateMixStreamData ( size_t  id,
size_t  local_worker_id,
size_t  dia_id 
)

Get stream with given id, if it does not exist, create it.

Definition at line 221 of file multiplexer.cpp.

References Multiplexer::IntGetOrCreateMixStreamData(), and Multiplexer::mutex_.

Referenced by Multiplexer::group(), and Multiplexer::OnMultiplexerHeader().

◆ group()

◆ IntGetOrCreateCatStreamData()

CatStreamDataPtr IntGetOrCreateCatStreamData ( size_t  id,
size_t  local_worker_id,
size_t  dia_id 
)
private

◆ IntGetOrCreateMixStreamData()

MixStreamDataPtr IntGetOrCreateMixStreamData ( size_t  id,
size_t  local_worker_id,
size_t  dia_id 
)
private

◆ IntReleaseCatStream()

void IntReleaseCatStream ( size_t  id,
size_t  local_worker_id 
)
private

release pointer onto a CatStreamData object

Definition at line 248 of file multiplexer.cpp.

References Multiplexer::d_, LOG, and sLOG.

Referenced by CatStreamData::Close().

◆ IntReleaseMixStream()

void IntReleaseMixStream ( size_t  id,
size_t  local_worker_id 
)
private

release pointer onto a MixStream object

Definition at line 262 of file multiplexer.cpp.

References Multiplexer::d_, LOG, and sLOG.

Referenced by MixStreamData::Close().

◆ logger()

common::JsonLogger & logger ( )

◆ MixLoopback()

MixStreamDataPtr MixLoopback ( size_t  stream_id,
size_t  to_worker_id 
)
private

Definition at line 506 of file multiplexer.cpp.

References Multiplexer::d_, and Multiplexer::mutex_.

Referenced by MixStreamData::GetWriters().

◆ my_host_rank()

size_t my_host_rank ( ) const
inline

my rank among the hosts.

Definition at line 93 of file multiplexer.hpp.

References Multiplexer::group_, and Group::my_host_rank().

Referenced by Multiplexer::OnMultiplexerHeader().

◆ num_hosts()

size_t num_hosts ( ) const
inline

total number of hosts.

Definition at line 88 of file multiplexer.hpp.

References Multiplexer::group_, and Group::num_hosts().

Referenced by Multiplexer::num_workers().

◆ num_workers()

size_t num_workers ( ) const
inline

total number of workers.

Definition at line 98 of file multiplexer.hpp.

References Multiplexer::num_hosts(), and Multiplexer::workers_per_host_.

Referenced by MixStreamData::GetWriters(), and CatStreamData::GetWriters().
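
Since this function references only num_hosts() and workers_per_host_, the total is presumably their product. The sketch below shows that arithmetic under this assumption; SketchTopology is a hypothetical type, not part of the library.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical topology description: every host runs the same number
// of workers (assumption).
struct SketchTopology {
    size_t num_hosts;
    size_t workers_per_host;

    // Total workers across all hosts.
    size_t num_workers() const { return num_hosts * workers_per_host; }
};
```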

◆ OnCatStreamBlock()

◆ OnMixStreamBlock()

◆ OnMultiplexerHeader()

◆ operator=()

Multiplexer& operator= ( const Multiplexer & )
delete

non-copyable: delete assignment operator
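
Deleting both the copy constructor and the copy assignment operator is the usual C++11 way to make a class non-copyable. A minimal sketch with a hypothetical class name:

```cpp
#include <cassert>
#include <type_traits>

// Hypothetical class that owns a unique resource and must not be copied.
class SketchNonCopyable {
public:
    SketchNonCopyable() = default;

    // non-copyable: delete copy-constructor
    SketchNonCopyable(const SketchNonCopyable&) = delete;
    // non-copyable: delete assignment operator
    SketchNonCopyable& operator = (const SketchNonCopyable&) = delete;
};
```

Any attempt to copy such an object is rejected at compile time, which the standard type traits can confirm.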

◆ workers_per_host()

size_t workers_per_host ( ) const
inline

Member Data Documentation

◆ active_streams_

std::atomic<size_t> active_streams_ { 0 }
private

number of active Cat/MixStreams

Definition at line 179 of file multiplexer.hpp.

Referenced by MixStreamData::Close(), CatStreamData::Close(), MixStreamData::GetWriters(), and CatStreamData::GetWriters().

◆ block_pool_

◆ closed_

bool closed_ = false
private

closed

Definition at line 170 of file multiplexer.hpp.

Referenced by Multiplexer::Close(), and Multiplexer::~Multiplexer().

◆ d_

◆ debug

constexpr bool debug = false
static private

Definition at line 69 of file multiplexer.hpp.

◆ dispatcher_

net::DispatcherThread& dispatcher_
private

Dispatcher used for all communication by data::Multiplexer; the thread never leaves the data components.

Definition at line 158 of file multiplexer.hpp.

Referenced by Multiplexer::AsyncReadMultiplexerHeader(), Multiplexer::dispatcher(), and Multiplexer::OnMultiplexerHeader().

◆ group_

◆ max_active_streams_

std::atomic<size_t> max_active_streams_ { 0 }
private

maximum number of active Cat/MixStreams

Definition at line 182 of file multiplexer.hpp.

Referenced by MixStreamData::GetWriters(), and CatStreamData::GetWriters().
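
A pair of atomic counters like active_streams_ and max_active_streams_ can track a high-water mark without a lock. The sketch below shows one common way to do this. It is an assumption about the technique, not the Thrill code: SketchStreamCounter, OnOpen(), and OnClose() are hypothetical names.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>

class SketchStreamCounter {
public:
    // Count a newly opened stream and raise the maximum if needed.
    void OnOpen() {
        size_t now = ++active_;
        size_t prev = max_active_.load();
        // Retry until the stored maximum is at least `now`; on failure
        // compare_exchange_weak reloads `prev` with the current value.
        while (prev < now &&
               !max_active_.compare_exchange_weak(prev, now)) { }
    }

    // Count a closed stream; the maximum is never lowered.
    void OnClose() { --active_; }

    size_t active() const { return active_.load(); }
    size_t max_active() const { return max_active_.load(); }

private:
    std::atomic<size_t> active_ { 0 };
    std::atomic<size_t> max_active_ { 0 };
};
```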

◆ mem_manager_

mem::Manager& mem_manager_
private

reference to host-global memory manager

Definition at line 151 of file multiplexer.hpp.

◆ mutex_

◆ num_parallel_async_

size_t num_parallel_async_
private

number of parallel recv requests

Definition at line 173 of file multiplexer.hpp.

Referenced by Multiplexer::AsyncReadMultiplexerHeader(), and Multiplexer::Multiplexer().

◆ send_size_limit_

size_t send_size_limit_
private

Calculated send queue size limit for StreamData semaphores.

Definition at line 176 of file multiplexer.hpp.

Referenced by Multiplexer::IntGetOrCreateCatStreamData(), Multiplexer::IntGetOrCreateMixStreamData(), and Multiplexer::Multiplexer().

◆ workers_per_host_

size_t workers_per_host_
private

The documentation for this class was generated from the following files: