23 std::unique_ptr<class Dispatcher> dispatcher,
size_t host_rank)
24 : dispatcher_(
std::move(dispatcher)),
25 host_rank_(host_rank) {
46 Enqueue([
this, cb = std::move(cb)]() {
93 assert(block.valid());
94 Enqueue([=, &c, b = std::move(block)]()
mutable {
95 dispatcher_->AsyncRead(c, seq, size, std::move(b), done_cb);
103 Enqueue([=, &c, b = std::move(buffer)]()
mutable {
104 dispatcher_->AsyncWrite(c, seq, std::move(b), done_cb);
112 assert(block.IsValid());
115 b1 = std::move(buffer), b2 = std::move(block)]()
mutable {
117 dispatcher_->AsyncWrite(c, seq + 1, std::move(b2), done_cb);
123 Connection& c, uint32_t seq,
const void* buffer,
size_t size,
171 LOG <<
"DispatcherThread finished.";
DispatcherThread(std::unique_ptr< class Dispatcher > dispatcher, size_t host_rank)
void Work()
What happens in the dispatcher thread.
void AddWrite(Connection &c, const AsyncCallback &write_cb)
Register a buffered write callback and a default exception callback.
void WakeUpThread()
Wake up select() in the dispatching thread.
std::unique_ptr< class Dispatcher > dispatcher_
enclosed dispatcher.
A pinned / pin-counted pointer to a ByteBlock.
void NameThisThread(const std::string &name)
Defines a name for the current thread, only if no name was set previously.
static by_string to_string(int val)
convert to string
void AddTimer(std::chrono::milliseconds timeout, const TimerCallback &cb)
Register a relative timeout callback.
std::atomic< bool > terminate_
termination flag
void Terminate()
Terminate the dispatcher thread (if not already done).
void AsyncRead(Connection &c, uint32_t seq, size_t size, const AsyncReadCallback &done_cb)
Asynchronously read size bytes and deliver them to the callback.
A Connection represents a link to another peer in a network group.
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
void AsyncWriteCopy(Connection &c, uint32_t seq, const void *buffer, size_t size, const AsyncWriteCallback &done_cb=AsyncWriteCallback())
void Cancel(Connection &c)
Cancel all callbacks on a given connection.
void AsyncWrite(Connection &c, uint32_t seq, Buffer &&buffer, const AsyncWriteCallback &done_cb=AsyncWriteCallback())
A pinned / pin-counted derivative of a Block.
void AddRead(Connection &c, const AsyncCallback &read_cb)
Register a buffered read callback and a default exception callback.
Simple buffer of characters without initialization or growing functionality.
size_t host_rank_
host rank, used for the thread name in logging
std::atomic< bool > busy_
whether to call Interrupt() in WakeUpThread()
void Enqueue(Job &&job)
Enqueue job in queue for dispatching thread to run at its discretion.
common::ConcurrentQueue< Job, mem::GPoolAllocator< Job > > jobqueue_
Queue of jobs to be run by dispatching thread at its discretion.
#define LOG
Default logging method: output if the local debug variable is true.
void SetCpuAffinity(std::thread &thread, size_t cpu_id)
set cpu/core affinity of a thread
void RunInThread(const AsyncDispatcherThreadCallback &cb)
Run generic callback in dispatcher thread to enqueue stuff.
std::thread thread_
thread of dispatcher