Thrill
0.1
Multiplexes virtual Connections on Dispatcher.
Each worker has a TCP connection to every other worker for exchanging large amounts of data. Since multiple exchanges can occur at the same time on this single connection, the data is multiplexed: it is sliced into Blocks, and each Block is indicated by a MultiplexerHeader. Multiple Blocks form a Stream on a single TCP connection. The multiplexer multiplexes all streams on all sockets.
All sockets are polled for headers. As soon as a header arrives, it is either attached to an existing stream or a new stream instance is created.
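The framing described above can be illustrated with a minimal sketch. It is not Thrill's implementation: DemoHeader, SendBlock and Demultiplex are hypothetical names, the header carries only a stream id and a payload size, and a std::vector<char> stands in for the TCP connection.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <map>
#include <string>
#include <vector>

// Hypothetical, simplified header; the real MultiplexerHeader has more fields.
struct DemoHeader {
    uint32_t stream_id;   // which virtual stream the Block belongs to
    uint32_t size;        // number of payload bytes following the header
};

// Append one framed Block (header + payload) to the shared byte stream.
void SendBlock(std::vector<char>& wire, uint32_t stream_id,
               const std::string& payload) {
    DemoHeader h{ stream_id, static_cast<uint32_t>(payload.size()) };
    const char* p = reinterpret_cast<const char*>(&h);
    wire.insert(wire.end(), p, p + sizeof(h));
    wire.insert(wire.end(), payload.begin(), payload.end());
}

// Read one header after another and route each payload to its stream.
// A stream entry is created the first time its id is seen.
std::map<uint32_t, std::string> Demultiplex(const std::vector<char>& wire) {
    std::map<uint32_t, std::string> streams;
    size_t pos = 0;
    while (pos < wire.size()) {
        assert(wire.size() - pos >= sizeof(DemoHeader));
        DemoHeader h;
        std::memcpy(&h, wire.data() + pos, sizeof(h));
        pos += sizeof(h);
        assert(wire.size() - pos >= h.size);
        streams[h.stream_id].append(wire.data() + pos, h.size);
        pos += h.size;
    }
    return streams;
}
```

Interleaving Blocks of streams 1 and 2 on the same wire and demultiplexing them again yields the concatenated payload of each stream, in the order in which its Blocks were sent.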
Definition at line 67 of file multiplexer.hpp.
#include <multiplexer.hpp>
Public Member Functions
Multiplexer (mem::Manager &mem_manager, BlockPool &block_pool, net::DispatcherThread &dispatcher, net::Group &group, size_t workers_per_host)
Multiplexer (const Multiplexer &)=delete
non-copyable: delete copy-constructor
~Multiplexer ()
Closes all client connections.
BlockPool & | block_pool ()
Get the used BlockPool.
void | Close ()
Closes all client connections.
net::DispatcherThread & | dispatcher ()
get network dispatcher
net::Group & | group ()
get network group connection
common::JsonLogger & | logger ()
Get the JsonLogger from the BlockPool.
size_t | my_host_rank () const
my rank among the hosts.
size_t | num_hosts () const
total number of hosts.
size_t | num_workers () const
total number of workers.
Multiplexer & | operator= (const Multiplexer &)=delete
non-copyable: delete assignment operator
size_t | workers_per_host () const
number of workers per host
CatStreamData
size_t | AllocateCatStreamId (size_t local_worker_id)
Allocate the next stream.
CatStreamDataPtr | GetOrCreateCatStreamData (size_t id, size_t local_worker_id, size_t dia_id)
Get stream with given id, if it does not exist, create it.
CatStreamPtr | GetNewCatStream (size_t local_worker_id, size_t dia_id)
Request next stream.
MixStream
size_t | AllocateMixStreamId (size_t local_worker_id)
Allocate the next stream.
MixStreamDataPtr | GetOrCreateMixStreamData (size_t id, size_t local_worker_id, size_t dia_id)
Get stream with given id, if it does not exist, create it.
MixStreamPtr | GetNewMixStream (size_t local_worker_id, size_t dia_id)
Request next stream.
Private Types
using | Connection = net::Connection
Private Member Functions
void | AsyncReadMultiplexerHeader (size_t peer, Connection &s)
CatStreamDataPtr | CatLoopback (size_t stream_id, size_t to_worker_id)
CatStreamDataPtr | IntGetOrCreateCatStreamData (size_t id, size_t local_worker_id, size_t dia_id)
MixStreamDataPtr | IntGetOrCreateMixStreamData (size_t id, size_t local_worker_id, size_t dia_id)
void | IntReleaseCatStream (size_t id, size_t local_worker_id)
release pointer onto a CatStreamData object
void | IntReleaseMixStream (size_t id, size_t local_worker_id)
release pointer onto a MixStream object
MixStreamDataPtr | MixLoopback (size_t stream_id, size_t to_worker_id)
void | OnCatStreamBlock (size_t peer, Connection &s, const StreamMultiplexerHeader &header, const CatStreamDataPtr &stream, PinnedByteBlockPtr &&bytes)
Receives and dispatches a Block to a CatStreamData.
void | OnMixStreamBlock (size_t peer, Connection &s, const StreamMultiplexerHeader &header, const MixStreamDataPtr &stream, PinnedByteBlockPtr &&bytes)
Receives and dispatches a Block to a MixStream.
void | OnMultiplexerHeader (size_t peer, uint32_t seq, Connection &s, net::Buffer &&buffer)
Private Attributes
std::atomic< size_t > | active_streams_ { 0 }
number of active Cat/MixStreams
BlockPool & | block_pool_
reference to host-global BlockPool.
bool | closed_ = false
closed
std::unique_ptr< Data > | d_
pimpl data structure
net::DispatcherThread & | dispatcher_
net::Group & | group_
Holds NetConnections for outgoing Streams.
std::atomic< size_t > | max_active_streams_ { 0 }
maximum number of active Cat/MixStreams
mem::Manager & | mem_manager_
reference to host-global memory manager
std::mutex | mutex_
protects critical sections
size_t | num_parallel_async_
number of parallel recv requests
size_t | send_size_limit_
Calculated send queue size limit for StreamData semaphores.
size_t | workers_per_host_
Number of workers per host.
Static Private Attributes
static constexpr bool | debug = false
using Connection = net::Connection [private]
Definition at line 214 of file multiplexer.hpp.
Multiplexer (mem::Manager &mem_manager, BlockPool &block_pool, net::DispatcherThread &dispatcher, net::Group &group, size_t workers_per_host)
Definition at line 128 of file multiplexer.cpp.
References Multiplexer::AsyncReadMultiplexerHeader(), Group::connection(), thrill::data::default_block_size, Multiplexer::group_, BlockPool::hard_ram_limit(), max(), Group::my_host_rank(), Group::num_hosts(), Group::num_parallel_async(), Multiplexer::num_parallel_async_, and Multiplexer::send_size_limit_.
Multiplexer (const Multiplexer &) [delete]
non-copyable: delete copy-constructor
~Multiplexer ()
Closes all client connections.
Definition at line 177 of file multiplexer.cpp.
References Group::Close(), Multiplexer::Close(), Multiplexer::closed_, and Multiplexer::group_.
size_t AllocateCatStreamId (size_t local_worker_id)
Allocate the next stream.
Definition at line 184 of file multiplexer.cpp.
References Multiplexer::d_, and Multiplexer::mutex_.
Referenced by Multiplexer::group().
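A minimal sketch of what allocating the next stream id per local worker could look like. DemoIdAllocator is a hypothetical name, and the one-counter-per-worker layout is an assumption; the documentation above only states that the call uses d_ and mutex_.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <vector>

// Hypothetical allocator: one id counter per local worker, guarded by a mutex.
class DemoIdAllocator {
public:
    explicit DemoIdAllocator(size_t workers_per_host)
        : next_id_(workers_per_host, 0) { }

    // Return the next unused id for the given local worker.
    size_t Allocate(size_t local_worker_id) {
        std::lock_guard<std::mutex> lock(mutex_);
        assert(local_worker_id < next_id_.size());
        return next_id_[local_worker_id]++;
    }

private:
    std::mutex mutex_;              // protects next_id_
    std::vector<size_t> next_id_;   // next free id, indexed by local worker
};
```

Under this assumption, workers that perform the same sequence of allocations obtain the same ids, so equal ids on different workers can denote the same logical stream without extra communication.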
size_t AllocateMixStreamId (size_t local_worker_id)
Allocate the next stream.
Definition at line 216 of file multiplexer.cpp.
References Multiplexer::d_, and Multiplexer::mutex_.
Referenced by Multiplexer::group().
void AsyncReadMultiplexerHeader (size_t peer, Connection &s) [private]
expects the next MultiplexerHeader from a socket and passes to OnMultiplexerHeader
Definition at line 282 of file multiplexer.cpp.
References DispatcherThread::AsyncRead(), Multiplexer::d_, Multiplexer::dispatcher_, Multiplexer::num_parallel_async_, Multiplexer::OnMultiplexerHeader(), Connection::rx_seq_, and MultiplexerHeader::total_size.
Referenced by Multiplexer::Multiplexer(), Multiplexer::OnCatStreamBlock(), Multiplexer::OnMixStreamBlock(), and Multiplexer::OnMultiplexerHeader().
BlockPool & block_pool () [inline]
Get the used BlockPool.
Definition at line 108 of file multiplexer.hpp.
References Multiplexer::block_pool_, and Multiplexer::logger().
CatStreamDataPtr CatLoopback (size_t stream_id, size_t to_worker_id) [private]
Pointer to queue that is used for communication between two workers on the same host.
Definition at line 499 of file multiplexer.cpp.
References Multiplexer::d_, and Multiplexer::mutex_.
Referenced by CatStreamData::GetWriters().
void Close ()
Closes all client connections.
Definition at line 162 of file multiplexer.cpp.
References Multiplexer::closed_, Multiplexer::d_, die_unless, LOG1, and Multiplexer::mutex_.
Referenced by Multiplexer::~Multiplexer().
net::DispatcherThread & dispatcher () [inline]
get network dispatcher
Definition at line 114 of file multiplexer.hpp.
References Multiplexer::dispatcher_.
Referenced by StreamSet< StreamData >::OnWriterClosed().
CatStreamPtr GetNewCatStream (size_t local_worker_id, size_t dia_id)
Request next stream.
Definition at line 195 of file multiplexer.cpp.
References Multiplexer::d_, Multiplexer::IntGetOrCreateCatStreamData(), and Multiplexer::mutex_.
Referenced by Context::GetNewCatStream(), and Multiplexer::group().
MixStreamPtr GetNewMixStream (size_t local_worker_id, size_t dia_id)
Request next stream.
Definition at line 227 of file multiplexer.cpp.
References Multiplexer::d_, Multiplexer::IntGetOrCreateMixStreamData(), and Multiplexer::mutex_.
Referenced by Context::GetNewMixStream(), and Multiplexer::group().
CatStreamDataPtr GetOrCreateCatStreamData (size_t id, size_t local_worker_id, size_t dia_id)
Get stream with given id, if it does not exist, create it.
Definition at line 189 of file multiplexer.cpp.
References Multiplexer::IntGetOrCreateCatStreamData(), and Multiplexer::mutex_.
Referenced by Multiplexer::group(), and Multiplexer::OnMultiplexerHeader().
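The get-or-create behaviour can be sketched as a mutex-protected map lookup. This is an illustration only: DemoStreamData and DemoStreamTable are hypothetical stand-ins, and the real container behind d_ is not shown in this documentation.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <memory>
#include <mutex>

// Hypothetical stand-in for a stream data object.
struct DemoStreamData {
    size_t id;
    size_t dia_id;
};

class DemoStreamTable {
public:
    // Return the stream registered under `id`, creating it on first use.
    std::shared_ptr<DemoStreamData> GetOrCreate(size_t id, size_t dia_id) {
        std::lock_guard<std::mutex> lock(mutex_);
        auto it = streams_.find(id);
        if (it != streams_.end()) {
            assert(it->second != nullptr);
            return it->second;
        }
        auto stream = std::make_shared<DemoStreamData>(DemoStreamData{ id, dia_id });
        streams_.emplace(id, stream);
        return stream;
    }

    // Number of streams currently registered.
    size_t size() {
        std::lock_guard<std::mutex> lock(mutex_);
        return streams_.size();
    }

private:
    std::mutex mutex_;   // protects streams_
    std::map<size_t, std::shared_ptr<DemoStreamData>> streams_;
};
```

Since the function is referenced by OnMultiplexerHeader() as well, a header arriving from the network may be the first to mention a stream id. With get-or-create, the network path and the local caller end up sharing the same object regardless of which one comes first.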
MixStreamDataPtr GetOrCreateMixStreamData (size_t id, size_t local_worker_id, size_t dia_id)
Get stream with given id, if it does not exist, create it.
Definition at line 221 of file multiplexer.cpp.
References Multiplexer::IntGetOrCreateMixStreamData(), and Multiplexer::mutex_.
Referenced by Multiplexer::group(), and Multiplexer::OnMultiplexerHeader().
net::Group & group () [inline]
get network group connection
Definition at line 117 of file multiplexer.hpp.
References Multiplexer::AllocateCatStreamId(), Multiplexer::AllocateMixStreamId(), Multiplexer::GetNewCatStream(), Multiplexer::GetNewMixStream(), Multiplexer::GetOrCreateCatStreamData(), Multiplexer::GetOrCreateMixStreamData(), and Multiplexer::group_.
Referenced by StreamSet< StreamData >::OnWriterClosed().
CatStreamDataPtr IntGetOrCreateCatStreamData (size_t id, size_t local_worker_id, size_t dia_id) [private]
Definition at line 203 of file multiplexer.cpp.
References Multiplexer::d_, Multiplexer::send_size_limit_, and Multiplexer::workers_per_host_.
Referenced by Multiplexer::GetNewCatStream(), and Multiplexer::GetOrCreateCatStreamData().
MixStreamDataPtr IntGetOrCreateMixStreamData (size_t id, size_t local_worker_id, size_t dia_id) [private]
Definition at line 235 of file multiplexer.cpp.
References Multiplexer::d_, Multiplexer::send_size_limit_, and Multiplexer::workers_per_host_.
Referenced by Multiplexer::GetNewMixStream(), and Multiplexer::GetOrCreateMixStreamData().
void IntReleaseCatStream (size_t id, size_t local_worker_id) [private]
release pointer onto a CatStreamData object
Definition at line 248 of file multiplexer.cpp.
References Multiplexer::d_, LOG, and sLOG.
Referenced by CatStreamData::Close().
void IntReleaseMixStream (size_t id, size_t local_worker_id) [private]
release pointer onto a MixStream object
Definition at line 262 of file multiplexer.cpp.
References Multiplexer::d_, LOG, and sLOG.
Referenced by MixStreamData::Close().
common::JsonLogger & logger ()
Get the JsonLogger from the BlockPool.
Definition at line 276 of file multiplexer.cpp.
References Multiplexer::block_pool_, and BlockPool::logger().
Referenced by Multiplexer::block_pool(), CatStreamData::CatStreamData(), and StreamData::OnAllWritersClosed().
MixStreamDataPtr MixLoopback (size_t stream_id, size_t to_worker_id) [private]
Definition at line 506 of file multiplexer.cpp.
References Multiplexer::d_, and Multiplexer::mutex_.
Referenced by MixStreamData::GetWriters().
size_t my_host_rank () const [inline]
my rank among the hosts.
Definition at line 93 of file multiplexer.hpp.
References Multiplexer::group_, and Group::my_host_rank().
Referenced by Multiplexer::OnMultiplexerHeader().
size_t num_hosts () const [inline]
total number of hosts.
Definition at line 88 of file multiplexer.hpp.
References Multiplexer::group_, and Group::num_hosts().
Referenced by Multiplexer::num_workers().
size_t num_workers () const [inline]
total number of workers.
Definition at line 98 of file multiplexer.hpp.
References Multiplexer::num_hosts(), and Multiplexer::workers_per_host_.
Referenced by MixStreamData::GetWriters(), and CatStreamData::GetWriters().
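The references above suggest that the total number of workers is the number of hosts multiplied by workers_per_host_. A small sketch of that arithmetic follows; DemoTopology is a hypothetical name, and the global numbering shown in global_worker() (workers of host 0 first, then host 1, and so on) is an assumption added for illustration.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical helper mirroring the host/worker arithmetic.
struct DemoTopology {
    size_t num_hosts;
    size_t workers_per_host;

    // Total number of workers across all hosts.
    size_t num_workers() const { return num_hosts * workers_per_host; }

    // Assumed global numbering: host-major, local worker id as offset.
    size_t global_worker(size_t host_rank, size_t local_worker_id) const {
        assert(host_rank < num_hosts);
        assert(local_worker_id < workers_per_host);
        return host_rank * workers_per_host + local_worker_id;
    }
};
```

For example, with 3 hosts and 4 workers per host there are 12 workers, and local worker 1 on host 2 would have global index 9 under the assumed numbering.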
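void OnCatStreamBlock (size_t peer, Connection &s, const StreamMultiplexerHeader &header, const CatStreamDataPtr &stream, PinnedByteBlockPtr &&bytes) [private]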
Receives and dispatches a Block to a CatStreamData.
Definition at line 451 of file multiplexer.cpp.
References Multiplexer::AsyncReadMultiplexerHeader(), bytes, Multiplexer::d_, die_unless, MultiplexerHeader::first_item, MultiplexerHeader::is_last_block, MultiplexerHeader::num_items, StreamMultiplexerHeader::sender_worker, StreamMultiplexerHeader::seq, MultiplexerHeader::size, sLOG, StreamMultiplexerHeader::stream_id, and MultiplexerHeader::typecode_verify.
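void OnMixStreamBlock (size_t peer, Connection &s, const StreamMultiplexerHeader &header, const MixStreamDataPtr &stream, PinnedByteBlockPtr &&bytes) [private]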
Receives and dispatches a Block to a MixStream.
Definition at line 475 of file multiplexer.cpp.
References Multiplexer::AsyncReadMultiplexerHeader(), bytes, Multiplexer::d_, die_unless, MultiplexerHeader::first_item, MultiplexerHeader::is_last_block, MultiplexerHeader::num_items, StreamMultiplexerHeader::sender_worker, StreamMultiplexerHeader::seq, MultiplexerHeader::size, sLOG, StreamMultiplexerHeader::stream_id, and MultiplexerHeader::typecode_verify.
void OnMultiplexerHeader (size_t peer, uint32_t seq, Connection &s, net::Buffer &&buffer) [private]
parses MultiplexerHeader and decides whether to receive Block or close Stream
Definition at line 296 of file multiplexer.cpp.
References BlockPool::AllocateByteBlock(), DispatcherThread::AsyncRead(), Multiplexer::AsyncReadMultiplexerHeader(), Multiplexer::block_pool_, bytes, thrill::data::CatStreamBlock, Multiplexer::d_, die, die_unless, Multiplexer::dispatcher_, MultiplexerHeader::first_item, Multiplexer::GetOrCreateCatStreamData(), Multiplexer::GetOrCreateMixStreamData(), StreamMultiplexerHeader::IsAllWorkers(), StreamMultiplexerHeader::IsEnd(), LOG, MultiplexerHeader::magic, thrill::data::MixStreamBlock, Multiplexer::my_host_rank(), MultiplexerHeader::num_items, StreamMultiplexerHeader::Parse(), StreamMultiplexerHeader::receiver_local_worker, tlx::round_up_to_power_of_two(), StreamMultiplexerHeader::sender_worker, StreamMultiplexerHeader::seq, MultiplexerHeader::size, sLOG, StreamMultiplexerHeader::stream_id, THRILL_DEFAULT_ALIGN, MultiplexerHeader::typecode_verify, and Multiplexer::workers_per_host().
Referenced by Multiplexer::AsyncReadMultiplexerHeader().
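The decision described above (receive a Block or close the Stream, separately for Cat and Mix streams) can be sketched as a pure function. All Demo* names are hypothetical. The rule that a header with size 0 marks the end of a stream is an assumption of this sketch, not something this page states.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical stand-ins; the enumerator names mirror the identifiers
// CatStreamBlock and MixStreamBlock referenced above.
enum class DemoMagic : uint8_t { CatStreamBlock, MixStreamBlock };

struct DemoStreamHeader {
    DemoMagic magic;     // which kind of stream the header belongs to
    size_t stream_id;
    size_t size;         // payload bytes that follow the header

    // Assumed end-of-stream rule: an empty payload closes the stream.
    bool IsEnd() const { return size == 0; }
};

enum class DemoAction {
    ReceiveCatBlock, ReceiveMixBlock, CloseCatStream, CloseMixStream
};

// Decide what to do after a header has been parsed.
DemoAction Decide(const DemoStreamHeader& h) {
    switch (h.magic) {
    case DemoMagic::CatStreamBlock:
        return h.IsEnd() ? DemoAction::CloseCatStream
                         : DemoAction::ReceiveCatBlock;
    case DemoMagic::MixStreamBlock:
        return h.IsEnd() ? DemoAction::CloseMixStream
                         : DemoAction::ReceiveMixBlock;
    }
    assert(false && "unknown magic value");
    return DemoAction::CloseCatStream;
}
```

In the real class, the receive case goes on to allocate a byte block and issue an asynchronous read (see the references to BlockPool::AllocateByteBlock() and DispatcherThread::AsyncRead() above); the sketch stops at the decision itself.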
Multiplexer & operator= (const Multiplexer &) [delete]
non-copyable: delete assignment operator
size_t workers_per_host () const [inline]
number of workers per host
Definition at line 103 of file multiplexer.hpp.
References Multiplexer::workers_per_host_.
Referenced by MixStreamData::GetWriters(), CatStreamData::GetWriters(), StreamData::OnAllWritersClosed(), and Multiplexer::OnMultiplexerHeader().
std::atomic< size_t > active_streams_ { 0 } [private]
number of active Cat/MixStreams
Definition at line 179 of file multiplexer.hpp.
Referenced by MixStreamData::Close(), CatStreamData::Close(), MixStreamData::GetWriters(), and CatStreamData::GetWriters().
BlockPool & block_pool_ [private]
reference to host-global BlockPool.
Definition at line 154 of file multiplexer.hpp.
Referenced by Multiplexer::block_pool(), CatStreamData::CatStreamData(), MixStreamData::GetWriters(), CatStreamData::GetWriters(), Multiplexer::logger(), and Multiplexer::OnMultiplexerHeader().
bool closed_ = false [private]
closed
Definition at line 170 of file multiplexer.hpp.
Referenced by Multiplexer::Close(), and Multiplexer::~Multiplexer().
std::unique_ptr< Data > d_ [private]
pimpl data structure
Definition at line 197 of file multiplexer.hpp.
Referenced by Multiplexer::AllocateCatStreamId(), Multiplexer::AllocateMixStreamId(), Multiplexer::AsyncReadMultiplexerHeader(), Multiplexer::CatLoopback(), Multiplexer::Close(), Multiplexer::GetNewCatStream(), Multiplexer::GetNewMixStream(), Multiplexer::IntGetOrCreateCatStreamData(), Multiplexer::IntGetOrCreateMixStreamData(), Multiplexer::IntReleaseCatStream(), Multiplexer::IntReleaseMixStream(), Multiplexer::MixLoopback(), Multiplexer::OnCatStreamBlock(), Multiplexer::OnMixStreamBlock(), and Multiplexer::OnMultiplexerHeader().
static constexpr bool debug = false [static, private]
Definition at line 69 of file multiplexer.hpp.
net::DispatcherThread & dispatcher_ [private]
Dispatcher used for all communication by data::Multiplexer; the thread never leaves the data components!
Definition at line 158 of file multiplexer.hpp.
Referenced by Multiplexer::AsyncReadMultiplexerHeader(), Multiplexer::dispatcher(), and Multiplexer::OnMultiplexerHeader().
net::Group & group_ [private]
Holds NetConnections for outgoing Streams.
Definition at line 161 of file multiplexer.hpp.
Referenced by MixStreamData::GetWriters(), CatStreamData::GetWriters(), Multiplexer::group(), Multiplexer::Multiplexer(), Multiplexer::my_host_rank(), Multiplexer::num_hosts(), and Multiplexer::~Multiplexer().
std::atomic< size_t > max_active_streams_ { 0 } [private]
maximum number of active Cat/MixStreams
Definition at line 182 of file multiplexer.hpp.
Referenced by MixStreamData::GetWriters(), and CatStreamData::GetWriters().
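One way to maintain a pair of counters like active_streams_ and max_active_streams_ is sketched below. DemoStreamCounter is a hypothetical name, and the compare-exchange loop is a technique chosen for this illustration; how Thrill actually updates the maximum is not shown on this page.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>

// Hypothetical counter pair: current and peak number of open streams.
struct DemoStreamCounter {
    std::atomic<size_t> active{ 0 };
    std::atomic<size_t> max_active{ 0 };

    // Called when a stream becomes active.
    void OnOpen() {
        size_t now = ++active;
        size_t seen = max_active.load();
        // Raise max_active to `now` unless another thread already stored
        // a larger value; on failure `seen` is reloaded automatically.
        while (seen < now && !max_active.compare_exchange_weak(seen, now)) { }
    }

    // Called when a stream is closed.
    void OnClose() {
        assert(active.load() > 0);
        --active;
    }
};
```

The peak value never decreases, so after a run it indicates how many streams were open simultaneously at most, which is the kind of figure that is useful for logging.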
mem::Manager & mem_manager_ [private]
reference to host-global memory manager
Definition at line 151 of file multiplexer.hpp.
std::mutex mutex_ [private]
protects critical sections
Definition at line 167 of file multiplexer.hpp.
Referenced by Multiplexer::AllocateCatStreamId(), Multiplexer::AllocateMixStreamId(), Multiplexer::CatLoopback(), Multiplexer::Close(), MixStreamData::Close(), CatStreamData::Close(), Multiplexer::GetNewCatStream(), Multiplexer::GetNewMixStream(), Multiplexer::GetOrCreateCatStreamData(), Multiplexer::GetOrCreateMixStreamData(), MixStreamData::GetWriters(), CatStreamData::GetWriters(), and Multiplexer::MixLoopback().
size_t num_parallel_async_ [private]
number of parallel recv requests
Definition at line 173 of file multiplexer.hpp.
Referenced by Multiplexer::AsyncReadMultiplexerHeader(), and Multiplexer::Multiplexer().
size_t send_size_limit_ [private]
Calculated send queue size limit for StreamData semaphores.
Definition at line 176 of file multiplexer.hpp.
Referenced by Multiplexer::IntGetOrCreateCatStreamData(), Multiplexer::IntGetOrCreateMixStreamData(), and Multiplexer::Multiplexer().
size_t workers_per_host_ [private]
Number of workers per host.
Definition at line 164 of file multiplexer.hpp.
Referenced by Multiplexer::IntGetOrCreateCatStreamData(), Multiplexer::IntGetOrCreateMixStreamData(), Multiplexer::num_workers(), and Multiplexer::workers_per_host().