18 #include <condition_variable> 33 class Connection::Data
41 std::condition_variable cv_;
44 std::set<Dispatcher*> watcher_;
47 using DataQueue = std::deque<net::Buffer>;
54 d_ = std::make_unique<Data>();
61 std::unique_lock<std::mutex> lock(
d_->mutex_);
62 d_->inbound_.emplace_back(std::move(msg));
73 return os <<
"[mock::Connection" 91 std::unique_lock<std::mutex> lock(
d_->mutex_);
92 while (
d_->inbound_.empty())
95 d_->inbound_.pop_front();
107 char* out_cdata =
reinterpret_cast<char*
>(out_data);
108 std::copy(msg.
begin(), msg.
end(), out_cdata);
117 void* recv_data,
size_t recv_size) {
123 void* recv_data,
size_t recv_size) {
132 : net::
Group(my_rank) {
133 peers_.resize(group_size);
137 for (
size_t i = 0; i < group_size; ++i)
150 assert(peer <
peers_.size());
158 return std::make_unique<Dispatcher>();
161 std::vector<std::unique_ptr<Group> >
164 std::vector<std::unique_ptr<Group> > groups(num_hosts);
167 for (
size_t i = 0; i < groups.size(); ++i) {
168 groups[i] = std::make_unique<Group>(i,
num_hosts);
172 for (
size_t i = 0; i < groups.size(); ++i) {
173 for (
size_t j = 0; j < groups.size(); ++j) {
174 groups[i]->peers_[j] = groups[j].get();
189 assert(tgt <
peers_.size());
201 class Dispatcher::Data
210 using Map = std::map<Connection*, Watch>;
216 class Dispatcher::Watch
222 std::deque<Callback, mem::GPoolAllocator<Callback> > read_cb, write_cb;
237 assert(dynamic_cast<Connection*>(&_c));
240 std::unique_lock<std::mutex> d_lock(
d_->mutex_);
242 w.read_cb.emplace_back(read_cb);
244 std::unique_lock<std::mutex> c_lock(c.
d_->mutex_);
245 c.
d_->watcher_.insert(
this);
248 if (c.
d_->inbound_.size())
254 assert(dynamic_cast<Connection*>(&_c));
257 std::unique_lock<std::mutex> d_lock(
d_->mutex_);
259 w.write_cb.emplace_back(write_cb);
261 std::unique_lock<std::mutex> c_lock(c.
d_->mutex_);
262 c.
d_->watcher_.insert(
this);
274 d_->notify_.emplace(c);
282 Data::Map::iterator it =
d_->map_.find(c);
283 if (it ==
d_->map_.end())
284 it =
d_->map_.emplace(c, Watch()).first;
291 if (!
d_->notify_.pop_for(c, timeout)) {
292 sLOG <<
"DispatchOne timeout";
297 sLOG <<
"DispatchOne interrupt";
301 sLOG <<
"DispatchOne run";
303 std::unique_lock<std::mutex> d_lock(
d_->mutex_);
305 Data::Map::iterator it =
d_->map_.find(c);
306 if (it ==
d_->map_.end()) {
307 sLOG <<
"DispatchOne expired connection?";
311 Watch& w = it->second;
314 std::unique_lock<std::mutex> c_lock(c->
d_->mutex_);
317 if (w.read_cb.size() && c->
d_->inbound_.size()) {
319 while (c->
d_->inbound_.size() && w.read_cb.size()) {
325 ret = w.read_cb.front()();
327 catch (std::exception& e) {
328 LOG1 <<
"Dispatcher: exception " <<
typeid(e).name()
329 <<
"in read callback.";
330 LOG1 <<
" what(): " << e.what();
338 w.read_cb.pop_front();
341 if (w.read_cb.size() == 0 && w.write_cb.size() == 0) {
343 c->
d_->watcher_.erase(
this);
350 if (w.write_cb.size()) {
352 while (w.write_cb.size()) {
358 ret = w.write_cb.front()();
360 catch (std::exception& e) {
361 LOG1 <<
"Dispatcher: exception " <<
typeid(e).name()
362 <<
"in write callback.";
363 LOG1 <<
" what(): " << e.what();
371 w.write_cb.pop_front();
374 if (w.read_cb.size() == 0 && w.write_cb.size() == 0) {
376 c->
d_->watcher_.erase(
this);
void Cancel(net::Connection &) final
Cancel all callbacks on a given connection.
void DispatchOne(const std::chrono::milliseconds &timeout) final
#define sLOG
Default logging method: output if the local debug variable is true.
size_t peer_
Outgoing peer id of this Connection.
The central object of a mock network: the Group containing links to the other mock Groups forming the network.
std::atomic< size_t > tx_bytes_
sent bytes
std::unique_ptr< net::Dispatcher > ConstructDispatcher() const final
ssize_t RecvOne(void *out_data, size_t size) final
size_t my_rank_
our rank in the network group
Connection * conns_
vector of virtual connection objects to remote peers
~Group()
virtual destructor
std::string ToString() const final
return a string representation of this connection, for user output.
void SyncRecv(void *out_data, size_t size) final
A virtual connection through the mock network: each Group has p Connections to its peers...
ssize_t SendOne(const void *data, size_t size, Flags flags=NoFlags) final
iterator begin() noexcept
return mutable iterator to first element
A virtual Dispatcher which waits for messages to arrive in the mock network.
std::unique_ptr< Data > d_
pimpl data struct with complex components
static by_string to_string(int val)
convert to string
static std::string MaybeHexdump(const void *data, size_t size)
return hexdump or just [data] if not debugging
iterator end() noexcept
return mutable iterator beyond last element
static constexpr bool debug_data
static constexpr bool debug
void SyncSendRecv(const void *send_data, size_t send_size, void *recv_data, size_t recv_size) final
#define die_unequal(X, Y)
void AddRead(net::Connection &_c, const Callback &read_cb) final
Register a buffered read callback and a default exception callback.
std::unique_ptr< Data > d_
pimpl data struct with complex components
A Connection represents a link to another peer in a network group.
unique_ptr< T > make_unique(Manager &manager, Args &&... args)
make_unique with Manager tracking
This is a queue, similar to std::queue and tbb::concurrent_bounded_queue, except that it uses mutexes...
AsyncCallback Callback
type for file descriptor readiness callbacks
std::ostream & OutputOstream(std::ostream &os) const final
virtual method to output to a std::ostream
static std::vector< std::unique_ptr< Group > > ConstructLoopbackMesh(size_t num_hosts)
Construct a mock network with num_hosts peers and deliver Group contexts for each of them...
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
Watch & GetWatch(Connection *c)
lookup method
void Initialize(Group *group, size_t peer)
construct from mock::Group
size_t num_hosts() const final
Return the number of connections in this group (= number of computing hosts).
void SyncSend(const void *data, size_t size, Flags=NoFlags) final
void InboundMsg(net::Buffer &&msg)
Method which is called by other peers to enqueue a message.
std::vector< Group * > peers_
vector of peers for delivery of messages.
void Send(size_t tgt, net::Buffer &&msg)
Send a buffer to peer tgt. Blocking, ... sort of.
Simple buffer of characters without initialization or growing functionality.
void AddWrite(net::Connection &_c, const Callback &write_cb) final
Register a buffered write callback and a default exception callback.
std::string hexdump(const void *const data, size_t size)
Dump a (binary) string as a sequence of uppercase hexadecimal pairs.
net::Buffer RecvNext()
Somewhat internal function to extract the next packet from the queue.
static void Initialize()
run MPI_Init() if not already done (can be called multiple times).
Flags
Additional flags for sending or receiving.
net::Connection & connection(size_t peer) final
Return Connection to client id.
void SyncRecvSend(const void *send_data, size_t send_size, void *recv_data, size_t recv_size) final
Group(size_t my_rank, size_t group_size)
Initialize a Group for the given rank and group size.
Group * group_
Reference to our group.
void Notify(Connection *c)
~Dispatcher()
virtual destructor
size_type size() const noexcept
return number of items in Buffer
void Interrupt() final
Interrupt current dispatch.
std::atomic< size_t > rx_bytes_
received bytes