23 #include <unordered_map> 38 template <
typename Object>
//! Construct a repository holding one id counter per local worker; the
//! counter vector next_id_ is sized to num_workers_per_node and every
//! entry starts at 0.
46 explicit Repository(
size_t num_workers_per_node)
47 : next_id_(num_workers_per_node, 0) { }
52 size_t AllocateId(
size_t local_worker_id) {
53 assert(local_worker_id < next_id_.size());
54 return ++next_id_[local_worker_id];
60 template <
typename Subclass = Object,
typename... Types>
62 GetOrCreate(Id object_id, Types&& ... construction) {
63 auto it = map_.find(object_id);
65 if (it != map_.end()) {
66 die_unless(dynamic_cast<Subclass*>(it->second.get()));
68 dynamic_cast<Subclass*
>(it->second.get()));
73 std::forward<Types>(construction) ...);
75 map_.insert(std::make_pair(object_id, ObjectPtr(
value)));
79 template <
typename Sub
class = Object>
81 auto it = map_.find(object_id);
83 if (it != map_.end()) {
84 die_unless(dynamic_cast<Subclass*>(it->second.get()));
86 dynamic_cast<Subclass*
>(it->second.get()));
93 void EraseOrDie(Id object_id) {
94 auto it = map_.find(object_id);
95 if (it != map_.end()) {
//! Return a mutable reference to the internal id -> object map (map_), so
//! callers can inspect or modify the stored entries directly.
103 std::unordered_map<Id, ObjectPtr>& map() {
return map_; }
107 std::vector<size_t> next_id_;
110 std::unordered_map<Id, ObjectPtr> map_;
116 struct Multiplexer::Data {
118 Repository<StreamSetBase> stream_sets_;
121 std::vector<std::atomic<size_t> > ongoing_requests_;
//! Construct the pimpl state: the stream set repository is created for
//! workers_per_host local workers, and ongoing_requests_ gets one atomic
//! counter per host (num_hosts entries).
123 explicit Data(
size_t num_hosts,
size_t workers_per_host)
124 : stream_sets_(workers_per_host),
125 ongoing_requests_(num_hosts) { }
130 size_t workers_per_host)
131 : mem_manager_(mem_manager),
132 block_pool_(block_pool),
133 dispatcher_(dispatcher),
135 workers_per_host_(workers_per_host),
136 d_(
std::
make_unique<Data>(group_.num_hosts(), workers_per_host)) {
163 std::unique_lock<std::mutex> lock(
mutex_);
165 if (!
d_->stream_sets_.map().empty()) {
166 LOG1 <<
"Multiplexer::Close()" 167 <<
" remaining_streams=" <<
d_->stream_sets_.map().size();
172 d_->stream_sets_.map().clear();
185 std::unique_lock<std::mutex> lock(
mutex_);
186 return d_->stream_sets_.AllocateId(local_worker_id);
190 size_t id,
size_t local_worker_id,
size_t dia_id) {
191 std::unique_lock<std::mutex> lock(
mutex_);
196 std::unique_lock<std::mutex> lock(
mutex_);
197 return tlx::make_counting<CatStream>(
199 d_->stream_sets_.AllocateId(local_worker_id),
200 local_worker_id, dia_id));
204 size_t id,
size_t local_worker_id,
size_t dia_id) {
211 if (ptr && ptr->dia_id_ == 0)
212 ptr->set_dia_id(dia_id);
217 std::unique_lock<std::mutex> lock(
mutex_);
218 return d_->stream_sets_.AllocateId(local_worker_id);
222 size_t id,
size_t local_worker_id,
size_t dia_id) {
223 std::unique_lock<std::mutex> lock(
mutex_);
228 std::unique_lock<std::mutex> lock(
mutex_);
229 return tlx::make_counting<MixStream>(
231 d_->stream_sets_.AllocateId(local_worker_id),
232 local_worker_id, dia_id));
236 size_t id,
size_t local_worker_id,
size_t dia_id) {
243 if (ptr && ptr->dia_id_ == 0)
244 ptr->set_dia_id(dia_id);
253 sLOG <<
"Multiplexer::IntReleaseCatStream() release" 254 <<
"stream" <<
id <<
"local_worker_id" << local_worker_id;
256 if (set->Release(local_worker_id)) {
257 LOG <<
"Multiplexer::IntReleaseCatStream() destroy stream " << id;
258 d_->stream_sets_.EraseOrDie(
id);
267 sLOG <<
"Multiplexer::IntReleaseMixStream() release" 268 <<
"stream" <<
id <<
"local_worker_id" << local_worker_id;
270 if (set->Release(local_worker_id)) {
271 LOG <<
"Multiplexer::IntReleaseMixStream() destroy stream " << id;
272 d_->stream_sets_.EraseOrDie(
id);
285 uint32_t seq = 42 + (s.
rx_seq_.fetch_add(2) & 0xFFFF);
292 d_->ongoing_requests_[peer]++;
300 d_->ongoing_requests_[peer]--;
303 if (!buffer.IsValid())
return;
308 LOG <<
"OnMultiplexerHeader() header" 309 <<
" magic=" << unsigned(header.
magic)
310 <<
" size=" << header.
size 321 size_t alloc_size = header.
size;
328 sLOG <<
"end of all stream on" << s <<
"CatStream" <<
id 338 for (
size_t sender_worker = 0;
340 if (!stream->is_queue_closed(
342 stream->OnStreamBlock(
349 else if (header.
IsEnd()) {
350 sLOG <<
"end of stream on" << s <<
"in CatStream" <<
id 352 <<
"seq" << header.
seq;
355 id, local_worker, 0);
356 stream->rx_net_bytes_ += buffer.size();
358 stream->OnStreamBlock(
363 id, local_worker, 0);
364 stream->rx_net_bytes_ += buffer.size();
366 sLOG <<
"stream header from" << s <<
"on CatStream" <<
id 368 <<
"for local_worker" << local_worker
369 <<
"seq" << header.
seq 370 <<
"size" << header.
size;
373 alloc_size, local_worker);
374 sLOG <<
"new PinnedByteBlockPtr bytes=" << *
bytes;
376 d_->ongoing_requests_[peer]++;
379 s, seq + 1, header.
size, std::move(bytes),
380 [
this, peer, header, stream](
382 OnCatStreamBlock(peer, s, header, stream, std::move(bytes));
389 sLOG <<
"end of all stream on" << s <<
"MixStream" <<
id 399 for (
size_t sender_worker = 0;
401 if (!stream->is_queue_closed(
403 stream->OnStreamBlock(
410 else if (header.
IsEnd()) {
411 sLOG <<
"end of stream on" << s <<
"in MixStream" <<
id 415 id, local_worker, 0);
416 stream->rx_net_bytes_ += buffer.size();
422 id, local_worker, 0);
423 stream->rx_net_bytes_ += buffer.size();
425 sLOG <<
"stream header from" << s <<
"on MixStream" <<
id 427 <<
"for local_worker" << local_worker
428 <<
"seq" << header.
seq 429 <<
"size" << header.
size;
432 alloc_size, local_worker);
434 d_->ongoing_requests_[peer]++;
437 s, seq + 1, header.
size, std::move(bytes),
438 [
this, peer, header, stream](
440 OnMixStreamBlock(peer, s, header, stream, std::move(bytes));
445 die(
"Invalid magic byte in MultiplexerHeader");
456 d_->ongoing_requests_[peer]--;
458 sLOG <<
"Multiplexer::OnCatStreamBlock()" 459 <<
"got block" << *
bytes <<
"seq" << header.
seq <<
"on" << s
463 stream->OnStreamBlock(
480 d_->ongoing_requests_[peer]--;
482 sLOG <<
"Multiplexer::OnMixStreamBlock()" 483 <<
"got block" << *
bytes <<
"seq" << header.
seq <<
"on" << s
487 stream->OnStreamBlock(
500 size_t stream_id,
size_t to_worker_id) {
501 std::unique_lock<std::mutex> lock(
mutex_);
503 ->Peer(to_worker_id);
507 size_t stream_id,
size_t to_worker_id) {
508 std::unique_lock<std::mutex> lock(
mutex_);
510 ->Peer(to_worker_id);
CatStreamPtr GetNewCatStream(size_t local_worker_id, size_t dia_id)
Request next stream.
Block combines a reference to a read-only ByteBlock and book-keeping information. ...
common::JsonLogger & logger()
Get the JsonLogger from the BlockPool.
#define sLOG
Default logging method: output if the local debug variable is true.
static uint_pair max()
return a uint_pair instance containing the largest possible value
virtual size_t num_parallel_async() const
size_t AllocateMixStreamId(size_t local_worker_id)
Allocate the next stream.
std::mutex mutex_
protects critical sections
size_t hard_ram_limit() noexcept
Hard limit on amount of memory used for ByteBlock.
void IntReleaseCatStream(size_t id, size_t local_worker_id)
release pointer onto a CatStreamData object
size_t default_block_size
default size of blocks in File, Channel, BlockQueue, etc.
CatStreamDataPtr GetOrCreateCatStreamData(size_t id, size_t local_worker_id, size_t dia_id)
Get stream with given id, if it does not exist, create it.
MixStreamDataPtr MixLoopback(size_t stream_id, size_t to_worker_id)
A pinned / pin-counted pointer to a ByteBlock.
MixStreamDataPtr IntGetOrCreateMixStreamData(size_t id, size_t local_worker_id, size_t dia_id)
~Multiplexer()
Closes all client connections.
#define die(msg)
Instead of std::terminate(), throw the output message via an exception.
virtual Connection & connection(size_t id)=0
Return Connection to client id.
Simple structure that holds all stream instances for the workers on the local host for a given stre...
#define THRILL_DEFAULT_ALIGN
void OnCatStreamBlock(size_t peer, Connection &s, const StreamMultiplexerHeader &header, const CatStreamDataPtr &stream, PinnedByteBlockPtr &&bytes)
Receives and dispatches a Block to a CatStreamData.
BufferReader represents a BufferRef with an additional cursor with which the memory can be read incre...
static by_string to_string(int val)
convert to string
PinnedByteBlockPtr AllocateByteBlock(size_t size, size_t local_worker_id)
void IntReleaseMixStream(size_t id, size_t local_worker_id)
release pointer onto a MixStream object
virtual void Close()=0
Close.
size_t workers_per_host_
Number of workers per host.
size_t my_host_rank() const
my rank among the hosts.
void OnMixStreamBlock(size_t peer, Connection &s, const StreamMultiplexerHeader &header, const MixStreamDataPtr &stream, PinnedByteBlockPtr &&bytes)
Receives and dispatches a Block to a MixStream.
size_t num_parallel_async_
number of parallel recv requests
Pool to allocate, keep, swap out/in, and free all ByteBlocks on the host.
void AsyncRead(Connection &c, uint32_t seq, size_t size, const AsyncReadCallback &done_cb)
asynchronously read n bytes and deliver them to the callback
CatStreamDataPtr IntGetOrCreateCatStreamData(size_t id, size_t local_worker_id, size_t dia_id)
Multiplexer(mem::Manager &mem_manager, BlockPool &block_pool, net::DispatcherThread &dispatcher, net::Group &group, size_t workers_per_host)
A Connection represents a link to another peer in a network group.
unique_ptr< T > make_unique(Manager &manager, Args &&... args)
make_unique with Manager tracking
std::atomic< uint32_t > rx_seq_
receive sequence
net::Group & group_
Holds NetConnections for outgoing Streams.
size_t workers_per_host() const
number of workers per host
size_t send_size_limit_
Calculated send queue size limit for StreamData semaphores.
MixStreamPtr GetNewMixStream(size_t local_worker_id, size_t dia_id)
Request next stream.
common::JsonLogger & logger()
Returns logger_.
CatStreamDataPtr CatLoopback(size_t stream_id, size_t to_worker_id)
High-performance smart pointer used as a wrapping reference counting pointer.
size_t AllocateCatStreamId(size_t local_worker_id)
Allocate the next stream.
static const size_t bytes
number of bytes in uint_pair
std::unique_ptr< Data > d_
pimpl data structure
A network Group is a collection of enumerated communication links, which provides point-to-point comm...
net::DispatcherThread & dispatcher_
Object shared by allocators and other classes to track memory allocations.
Simple buffer of characters without initialization or growing functionality.
void OnMultiplexerHeader(size_t peer, uint32_t seq, Connection &s, net::Buffer &&buffer)
static int round_up_to_power_of_two(int i)
does what it says: round up to next power of two
JsonLogger is a receiver of JSON output objects for logging.
void AsyncReadMultiplexerHeader(size_t peer, Connection &s)
BlockPool & block_pool_
reference to host-global BlockPool.
void Close()
Closes all client connections.
#define LOG
Default logging method: output if the local debug variable is true.
size_t my_host_rank() const
Return our rank among hosts in this group.
virtual size_t num_hosts() const =0
Return number of connections in this group (= number computing hosts)
DispatcherThread contains a net::Dispatcher object and an associated thread that runs in the dispatch...
MixStreamDataPtr GetOrCreateMixStreamData(size_t id, size_t local_worker_id, size_t dia_id)
Get stream with given id, if it does not exist, create it.