Thrill  0.1
connection.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/net/connection.hpp
3  *
4  * Contains net::Connection, a richer set of network point-to-point primitives.
5  *
6  * Part of Project Thrill - http://project-thrill.org
7  *
8  * Copyright (C) 2015 Timo Bingmann <[email protected]>
9  * Copyright (C) 2015 Emanuel Jöbstl <[email protected]>
10  *
11  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
12  ******************************************************************************/
13 
14 #pragma once
15 #ifndef THRILL_NET_CONNECTION_HEADER
16 #define THRILL_NET_CONNECTION_HEADER
17 
18 #include <thrill/common/config.hpp>
19 #include <thrill/common/logger.hpp>
24 #include <thrill/net/exception.hpp>
26 
27 #include <array>
28 #include <cassert>
29 #include <cerrno>
30 #include <cstdio>
31 #include <cstring>
32 #include <stdexcept>
33 #include <string>
34 
35 namespace thrill {
36 namespace net {
37 
38 //! \addtogroup net_layer
39 //! \{
40 
41 /*!
42  * A Connection represents a link to another peer in a network group. The link
43  * need not be an actual stateful TCP connection, but may be reliable and
44  * stateless.
45  *
46  * The Connection class is abstract, and subclasses must exist for every network
47  * implementation.
48  */
50 {
51 public:
52  //! flag which enables transmission of verification bytes for debugging,
53  //! this increases network volume.
54  static constexpr bool self_verify_ = common::g_self_verify;
55 
56  //! typeid().hash_code() is only guaranteed to be equal for the same program
57  //! run, hence, we can only use it on loopback networks.
58  bool is_loopback_ = false;
59 
//! Optional modifier flags accepted by send and receive operations.
enum Flags : size_t {
    //! no modifier requested.
    NoFlags = 0x0,
    //! more data is about to follow, hence, sending of a packet may be
    //! delayed. currently only applies to TCP.
    MsgMore = 0x1
};
67 
68  //! operator to combine flags
69  friend inline Flags operator | (const Flags& a, const Flags& b) {
70  return static_cast<Flags>(
71  static_cast<size_t>(a) | static_cast<size_t>(b));
72  }
73 
74  //! \name Base Status Functions
75  //! \{
76 
77  //! check whether the connection is (still) valid.
78  virtual bool IsValid() const = 0;
79 
80  //! return a string representation of this connection, for user output.
81  virtual std::string ToString() const = 0;
82 
83  //! virtual method to output to a std::ostream
84  virtual std::ostream& OutputOstream(std::ostream& os) const = 0;
85 
86  //! \}
87 
88  //! \name Send Functions
89  //! \{
90 
91  //! Synchronous blocking send of the (data,size) packet. if sending fails, a
92  //! net::Exception is thrown.
93  virtual void SyncSend(const void* data, size_t size,
94  Flags flags = NoFlags) = 0;
95 
96  //! Non-blocking send of a (data,size) message. returns number of bytes
97  //! possible to send. check errno for errors.
98  virtual ssize_t SendOne(const void* data, size_t size,
99  Flags flags = NoFlags) = 0;
100 
101  //! Send any serializable POD item T. if sending fails, a net::Exception is
102  //! thrown.
103  template <typename T>
104  typename std::enable_if<std::is_pod<T>::value, void>::type
105  Send(const T& value) {
106  if (self_verify_ && is_loopback_) {
107  // for communication verification, send hash_code.
108  size_t hash_code = typeid(T).hash_code();
109  SyncSend(&hash_code, sizeof(hash_code));
110  }
111  // send PODs directly from memory.
112  SyncSend(&value, sizeof(value));
113  }
114 
115  //! Send any serializable non-POD fixed-length item T. if sending fails, a
116  //! net::Exception is thrown.
117  template <typename T>
118  typename std::enable_if<
121  Send(const T& value) {
122  if (self_verify_ && is_loopback_) {
123  // for communication verification, send hash_code.
124  size_t hash_code = typeid(T).hash_code();
125  SyncSend(&hash_code, sizeof(hash_code));
126  }
127  // fixed_size items can be sent without size header
128  static constexpr size_t fixed_size
130  if (fixed_size < 2 * 1024 * 1024) {
131  // allocate buffer on stack (no allocation)
132  using FixedBuilder = FixedBufferBuilder<fixed_size>;
133  FixedBuilder fb;
135  assert(fb.size() == fixed_size);
136  SyncSend(fb.data(), fb.size());
137  }
138  else {
139  // too big, use heap allocation
140  BufferBuilder bb;
142  SyncSend(bb.data(), bb.size());
143  }
144  }
145 
146  //! Send any serializable non-POD variable-length item T. if sending fails,
147  //! a net::Exception is thrown.
148  template <typename T>
149  typename std::enable_if<
151  !data::Serialization<BufferBuilder, T>::is_fixed_size, void>::type
152  Send(const T& value) {
153  if (self_verify_ && is_loopback_) {
154  // for communication verification, send hash_code.
155  size_t hash_code = typeid(T).hash_code();
156  SyncSend(&hash_code, sizeof(hash_code));
157  }
158  // variable length items must be prefixed with size header
159  BufferBuilder bb;
161  size_t size = bb.size();
162  SyncSend(&size, sizeof(size), MsgMore);
163  SyncSend(bb.data(), bb.size());
164  }
165 
166  //! \}
167 
168  //! \name Receive Functions
169  //! \{
170 
171  //! Synchronous blocking receive a message of given size. The size must
172  //! match the SyncSend size for some network layers may only support
173  //! matching messages (read: RDMA!, but also true for the mock net). Throws
174  //! a net::Exception on errors.
175  virtual void SyncRecv(void* out_data, size_t size) = 0;
176 
177  //! Non-blocking receive of at most size data. returns number of bytes
178  //! actually received. check errno for errors.
179  virtual ssize_t RecvOne(void* out_data, size_t size) = 0;
180 
181  //! Receive any serializable POD item T.
182  template <typename T>
183  typename std::enable_if<std::is_pod<T>::value, void>::type
184  Receive(T* out_value) {
185  if (self_verify_ && is_loopback_) {
186  // for communication verification, receive hash_code.
187  size_t hash_code;
188  SyncRecv(&hash_code, sizeof(hash_code));
189  if (hash_code != typeid(T).hash_code()) {
190  throw std::runtime_error(
191  "Connection::Receive() attempted to receive item "
192  "with different typeid!");
193  }
194  }
195  // receive PODs directly into memory.
196  SyncRecv(out_value, sizeof(*out_value));
197  }
198 
199  //! Receive any serializable non-POD fixed-length item T.
200  template <typename T>
201  typename std::enable_if<
203  data::Serialization<BufferBuilder, T>::is_fixed_size, void>::type
204  Receive(T* out_value) {
205  if (self_verify_ && is_loopback_) {
206  // for communication verification, receive hash_code.
207  size_t hash_code;
208  SyncRecv(&hash_code, sizeof(hash_code));
209  if (hash_code != typeid(T).hash_code()) {
210  throw std::runtime_error(
211  "Connection::Receive() attempted to receive item "
212  "with different typeid!");
213  }
214  }
215  // fixed_size items can be received without size header
216  static constexpr size_t fixed_size
218  if (fixed_size < 2 * 1024 * 1024) {
219  // allocate buffer on stack (no allocation)
220  std::array<uint8_t, fixed_size> b;
221  SyncRecv(b.data(), b.size());
222  BufferReader br(b.data(), b.size());
224  }
225  else {
226  // too big, use heap allocation
228  SyncRecv(b.data(), b.size());
229  BufferReader br(b);
231  }
232  }
233 
234  //! Receive any serializable non-POD variable-length item T.
235  template <typename T>
236  typename std::enable_if<
238  !data::Serialization<BufferBuilder, T>::is_fixed_size, void>::type
239  Receive(T* out_value) {
240  if (self_verify_ && is_loopback_) {
241  // for communication verification, receive hash_code.
242  size_t hash_code;
243  SyncRecv(&hash_code, sizeof(hash_code));
244  if (hash_code != typeid(T).hash_code()) {
245  throw std::runtime_error(
246  "Connection::Receive() attempted to receive item "
247  "with different typeid!");
248  }
249  }
250  // variable length items are prefixed with size header
251  size_t size;
252  SyncRecv(&size, sizeof(size));
253  // receives message
254  Buffer b(size);
255  SyncRecv(b.data(), size);
256  BufferReader br(b);
258  }
259 
260  //! \}
261 
262  //! \name Paired SendReceive Methods
263  //! \{
264 
265  //! Synchronous blocking send and receive of messages of the given sizes.
266  //! The sizes must match the SyncSendRecv sizes, for some network layers
267  //! may only support matching messages (read: RDMA!, but also true for the
268  //! mock net). Throws a net::Exception on errors.
269  virtual void SyncSendRecv(const void* send_data, size_t send_size,
270  void* recv_data, size_t recv_size) = 0;
271  virtual void SyncRecvSend(const void* send_data, size_t send_size,
272  void* recv_data, size_t recv_size) = 0;
273 
274  //! SendReceive any serializable POD item T.
275  template <typename T>
276  typename std::enable_if<std::is_pod<T>::value, void>::type
277  SendReceive(const T* value, T* out_value, size_t n = 1) {
278  if (self_verify_ && is_loopback_) {
279  // for communication verification, send/receive hash_code.
280  size_t send_hash_code = typeid(T).hash_code(), recv_hash_code;
281  SyncSendRecv(&send_hash_code, sizeof(send_hash_code),
282  &recv_hash_code, sizeof(recv_hash_code));
283  if (recv_hash_code != typeid(T).hash_code()) {
284  throw std::runtime_error(
285  "Connection::SendReceive() attempted to receive item "
286  "with different typeid!");
287  }
288  }
289 
290  // receive PODs directly into memory.
291  SyncSendRecv(value, n * sizeof(T), out_value, n * sizeof(T));
292  }
293 
294  template <typename T>
295  typename std::enable_if<std::is_pod<T>::value, void>::type
296  ReceiveSend(const T& value, T* out_value) {
297  if (self_verify_ && is_loopback_) {
298  // for communication verification, send/receive hash_code.
299  size_t send_hash_code = typeid(T).hash_code(), recv_hash_code;
300  SyncRecvSend(&send_hash_code, sizeof(send_hash_code),
301  &recv_hash_code, sizeof(recv_hash_code));
302  if (recv_hash_code != typeid(T).hash_code()) {
303  throw std::runtime_error(
304  "Connection::SendReceive() attempted to receive item "
305  "with different typeid!");
306  }
307  }
308  // receive PODs directly into memory.
309  SyncRecvSend(&value, sizeof(value), out_value, sizeof(*out_value));
310  }
311 
312  //! SendReceive any serializable non-POD fixed-length item T.
313  template <typename T>
314  typename std::enable_if<
316  data::Serialization<BufferBuilder, T>::is_fixed_size, void>::type
317  SendReceive(const T* value, T* out_value, size_t n = 1) {
318  if (self_verify_ && is_loopback_) {
319  // for communication verification, send/receive hash_code.
320  size_t send_hash_code = typeid(T).hash_code(), recv_hash_code;
321  SyncSendRecv(&send_hash_code, sizeof(send_hash_code),
322  &recv_hash_code, sizeof(recv_hash_code));
323  if (recv_hash_code != typeid(T).hash_code()) {
324  throw std::runtime_error(
325  "Connection::SendReceive() attempted to receive item "
326  "with different typeid!");
327  }
328  }
329 
330  // fixed_size items can be sent/recv without size header
332  for (size_t i = 0; i < n; ++i) {
334  }
335  Buffer recvb(n * data::Serialization<BufferBuilder, T>::fixed_size);
336  SyncSendRecv(sendb.data(), sendb.size(),
337  recvb.data(), recvb.size());
338  BufferReader br(recvb);
339  for (size_t i = 0; i < n; ++i) {
341  }
342  }
343 
344  template <typename T>
345  typename std::enable_if<
347  data::Serialization<BufferBuilder, T>::is_fixed_size, void>::type
348  ReceiveSend(const T& value, T* out_value) {
349  if (self_verify_ && is_loopback_) {
350  // for communication verification, send/receive hash_code.
351  size_t send_hash_code = typeid(T).hash_code(), recv_hash_code;
352  SyncRecvSend(&send_hash_code, sizeof(send_hash_code),
353  &recv_hash_code, sizeof(recv_hash_code));
354  if (recv_hash_code != typeid(T).hash_code()) {
355  throw std::runtime_error(
356  "Connection::SendReceive() attempted to receive item "
357  "with different typeid!");
358  }
359  }
360 
361  // fixed_size items can be sent/recv without size header
362  static constexpr size_t fixed_size
364  if (fixed_size < 2 * 1024 * 1024) {
365  // allocate buffer on stack (no allocation)
366  using FixedBuilder = FixedBufferBuilder<fixed_size>;
367  FixedBuilder sendb;
369  assert(sendb.size() == fixed_size);
370  std::array<uint8_t, fixed_size> recvb;
371  SyncRecvSend(sendb.data(), sendb.size(),
372  recvb.data(), recvb.size());
373  BufferReader br(recvb.data(), recvb.size());
375  }
376  else {
377  // too big, use heap allocation
378  BufferBuilder sendb;
381  SyncRecvSend(sendb.data(), sendb.size(),
382  recvb.data(), recvb.size());
383  BufferReader br(recvb);
385  }
386  }
387 
388  //! SendReceive any serializable non-POD variable-length item T.
389  template <typename T>
390  typename std::enable_if<
392  !data::Serialization<BufferBuilder, T>::is_fixed_size, void>::type
393  SendReceive(const T* value, T* out_value, size_t n = 1) {
394  if (self_verify_ && is_loopback_) {
395  // for communication verification, send/receive hash_code.
396  size_t send_hash_code = typeid(T).hash_code(), recv_hash_code;
397  SyncSendRecv(&send_hash_code, sizeof(send_hash_code),
398  &recv_hash_code, sizeof(recv_hash_code));
399  if (recv_hash_code != typeid(T).hash_code()) {
400  throw std::runtime_error(
401  "Connection::SendReceive() attempted to receive item "
402  "with different typeid!");
403  }
404  }
405  // variable length items must be prefixed with size header
406  BufferBuilder sendb;
407  for (size_t i = 0; i < n; ++i) {
409  }
410  size_t send_size = sendb.size(), recv_size;
411  SyncSendRecv(&send_size, sizeof(send_size),
412  &recv_size, sizeof(recv_size));
413  // receives message
414  Buffer recvb(recv_size);
415  SyncSendRecv(sendb.data(), sendb.size(),
416  recvb.data(), recv_size);
417  BufferReader br(recvb);
418  for (size_t i = 0; i < n; ++i) {
420  }
421  }
422 
423  template <typename T>
424  typename std::enable_if<
426  !data::Serialization<BufferBuilder, T>::is_fixed_size, void>::type
427  ReceiveSend(const T& value, T* out_value) {
428  if (self_verify_ && is_loopback_) {
429  // for communication verification, send/receive hash_code.
430  size_t send_hash_code = typeid(T).hash_code(), recv_hash_code;
431  SyncRecvSend(&send_hash_code, sizeof(send_hash_code),
432  &recv_hash_code, sizeof(recv_hash_code));
433  if (recv_hash_code != typeid(T).hash_code()) {
434  throw std::runtime_error(
435  "Connection::SendReceive() attempted to receive item "
436  "with different typeid!");
437  }
438  }
439  // variable length items must be prefixed with size header
440  BufferBuilder sendb;
442  size_t send_size = sendb.size(), recv_size;
443  SyncRecvSend(&send_size, sizeof(send_size),
444  &recv_size, sizeof(recv_size));
445  // receives message
446  Buffer recvb(recv_size);
447  SyncRecvSend(sendb.data(), sendb.size(),
448  recvb.data(), recv_size);
449  BufferReader br(recvb);
451  }
452 
453  //! \}
454 
455  //! \name SendN Functions
456  //! \{
457 
458  //! Send an array of serializable POD items T. if sending fails, a net::Exception is
459  //! thrown.
460  template <typename T>
461  typename std::enable_if<std::is_pod<T>::value, void>::type
462  SendN(const T* value, size_t n) {
463  if (self_verify_ && is_loopback_) {
464  // for communication verification, send hash_code.
465  size_t hash_code = typeid(T).hash_code();
466  SyncSend(&hash_code, sizeof(hash_code));
467  }
468  // send PODs directly from memory.
469  SyncSend(value, n * sizeof(T));
470  }
471 
472  //! Send an array of serializable non-POD fixed-length items T. if sending fails, a
473  //! net::Exception is thrown.
474  template <typename T>
475  typename std::enable_if<
477  data::Serialization<BufferBuilder, T>::is_fixed_size, void>::type
478  SendN(const T* value, size_t n) {
479  if (self_verify_ && is_loopback_) {
480  // for communication verification, send hash_code.
481  size_t hash_code = typeid(T).hash_code();
482  SyncSend(&hash_code, sizeof(hash_code));
483  }
484  // fixed_size items can be sent without size header
485  static constexpr size_t fixed_size
487  BufferBuilder bb(n * fixed_size);
488  for (size_t i = 0; i < n; ++i) {
490  }
491  SyncSend(bb.data(), bb.size());
492  }
493 
494  //! Send an array of serializable non-POD variable-length items T. if sending fails,
495  //! a net::Exception is thrown.
496  template <typename T>
497  typename std::enable_if<
499  !data::Serialization<BufferBuilder, T>::is_fixed_size, void>::type
500  SendN(const T* value, size_t n) {
501  if (self_verify_ && is_loopback_) {
502  // for communication verification, send hash_code.
503  size_t hash_code = typeid(T).hash_code();
504  SyncSend(&hash_code, sizeof(hash_code));
505  }
506  // variable length items must be prefixed with size header
507  BufferBuilder bb;
508  for (size_t i = 0; i < n; ++i) {
510  }
511  size_t size = bb.size();
512  SyncSend(&size, sizeof(size), MsgMore);
513  SyncSend(bb.data(), bb.size());
514  }
515 
516  //! \}
517 
518  //! \name ReceiveN Functions
519  //! \{
520 
521  //! Receive an array of serializable POD items T.
522  template <typename T>
523  typename std::enable_if<std::is_pod<T>::value, void>::type
524  ReceiveN(T* out_value, size_t n) {
525  if (self_verify_ && is_loopback_) {
526  // for communication verification, receive hash_code.
527  size_t hash_code;
528  SyncRecv(&hash_code, sizeof(hash_code));
529  if (hash_code != typeid(T).hash_code()) {
530  throw std::runtime_error(
531  "Connection::ReceiveN() attempted to receive item "
532  "with different typeid!");
533  }
534  }
535  // receive PODs directly into memory.
536  SyncRecv(out_value, n * sizeof(T));
537  }
538 
539  //! Receive an array of serializable non-POD fixed-length items T.
540  template <typename T>
541  typename std::enable_if<
543  data::Serialization<BufferBuilder, T>::is_fixed_size, void>::type
544  ReceiveN(T* out_value, size_t n) {
545  if (self_verify_ && is_loopback_) {
546  // for communication verification, receive hash_code.
547  size_t hash_code;
548  SyncRecv(&hash_code, sizeof(hash_code));
549  if (hash_code != typeid(T).hash_code()) {
550  throw std::runtime_error(
551  "Connection::ReceiveN() attempted to receive item "
552  "with different typeid!");
553  }
554  }
555  // fixed_size items can be received without size header
557  SyncRecv(b.data(), b.size());
558  BufferReader br(b);
559  for (size_t i = 0; i < n; ++i) {
561  }
562  }
563 
564  //! Receive an array of serializable non-POD variable-length items T.
565  template <typename T>
566  typename std::enable_if<
568  !data::Serialization<BufferBuilder, T>::is_fixed_size, void>::type
569  ReceiveN(T* out_value, size_t n) {
570  if (self_verify_ && is_loopback_) {
571  // for communication verification, receive hash_code.
572  size_t hash_code;
573  SyncRecv(&hash_code, sizeof(hash_code));
574  if (hash_code != typeid(T).hash_code()) {
575  throw std::runtime_error(
576  "Connection::ReceiveN() attempted to receive item "
577  "with different typeid!");
578  }
579  }
580  // variable length items are prefixed with size header
581  size_t size;
582  SyncRecv(&size, sizeof(size));
583  // receives message
584  Buffer b(size);
585  SyncRecv(b.data(), size);
586  BufferReader br(b);
587  for (size_t i = 0; i < n; ++i) {
589  }
590  }
591 
592  //! \}
593 
594  //! \name Sequence Numbers
595  //! \{
596 
597  //! send sequence
598  std::atomic<uint32_t> tx_seq_ { 0 };
599 
600  //! receive sequence
601  std::atomic<uint32_t> rx_seq_ { 0 };
602 
603  //! \}
604 
605  //! \name Statistics
606  //! \{
607 
608  //! sent bytes
609  std::atomic<size_t> tx_bytes_ { 0 };
610 
611  //! received bytes
612  std::atomic<size_t> rx_bytes_ = { 0 };
613 
614  //! previous read of sent bytes
615  size_t prev_tx_bytes_ = 0;
616 
617  //! previous read of received bytes
618  size_t prev_rx_bytes_ = 0;
619 
620  //! active send requests
621  std::atomic<size_t> tx_active_ { 0 };
622 
623  //! active recv requests
624  std::atomic<size_t> rx_active_ = { 0 };
625 
626  //! \}
627 
628  //! make ostreamable
629  friend std::ostream& operator << (std::ostream& os, const Connection& c) {
630  return c.OutputOstream(os);
631  }
632 };
633 
634 //! \}
635 
636 } // namespace net
637 } // namespace thrill
638 
639 #endif // !THRILL_NET_CONNECTION_HEADER
640 
641 /******************************************************************************/
size_t size() const
Return the currently used length in bytes.
virtual bool IsValid() const =0
check whether the connection is (still) valid.
std::enable_if< std::is_pod< T >::value, void >::type ReceiveSend(const T &value, T *out_value)
Definition: connection.hpp:296
double T
std::atomic< size_t > tx_bytes_
sent bytes
Definition: connection.hpp:609
size_t prev_tx_bytes_
previous read of sent bytes
Definition: connection.hpp:615
virtual std::string ToString() const =0
return a string representation of this connection, for user output.
std::enable_if< !std::is_pod< T >::value &&!data::Serialization< BufferBuilder, T >::is_fixed_size, void >::type ReceiveN(T *out_value, size_t n)
Receive an array of serializable non-POD variable-length items T.
Definition: connection.hpp:569
virtual std::ostream & OutputOstream(std::ostream &os) const =0
virtual method to output to a std::ostream
std::enable_if< !std::is_pod< T >::value &&!data::Serialization< BufferBuilder, T >::is_fixed_size, void >::type SendN(const T *value, size_t n)
Definition: connection.hpp:500
std::enable_if< std::is_pod< T >::value, void >::type Send(const T &value)
Definition: connection.hpp:105
static constexpr bool g_self_verify
Definition: config.hpp:32
BufferReader represents a BufferRef with an additional cursor with which the memory can be read incre...
std::enable_if< !std::is_pod< T >::value &&data::Serialization< BufferBuilder, T >::is_fixed_size, void >::type Send(const T &value)
Definition: connection.hpp:121
const Byte * data() const
Return a pointer to the currently kept memory area.
iterator data() noexcept
return iterator to beginning of Buffer
Definition: buffer.hpp:134
std::enable_if< !std::is_pod< T >::value &&data::Serialization< BufferBuilder, T >::is_fixed_size, void >::type ReceiveSend(const T &value, T *out_value)
Definition: connection.hpp:348
virtual void SyncSendRecv(const void *send_data, size_t send_size, void *recv_data, size_t recv_size)=0
virtual ssize_t RecvOne(void *out_data, size_t size)=0
std::enable_if< !std::is_pod< T >::value &&data::Serialization< BufferBuilder, T >::is_fixed_size, void >::type SendN(const T *value, size_t n)
Definition: connection.hpp:478
Represents a FIXED length area of memory, which can be modified by appending integral data types via ...
virtual void SyncSend(const void *data, size_t size, Flags flags=NoFlags)=0
int value
Definition: gen_data.py:41
std::enable_if< std::is_pod< T >::value, void >::type SendN(const T *value, size_t n)
Definition: connection.hpp:462
A Connection represents a link to another peer in a network group.
Definition: connection.hpp:49
std::enable_if< !std::is_pod< T >::value &&data::Serialization< BufferBuilder, T >::is_fixed_size, void >::type SendReceive(const T *value, T *out_value, size_t n=1)
SendReceive any serializable non-POD fixed-length item T.
Definition: connection.hpp:317
std::atomic< uint32_t > rx_seq_
receive sequence
Definition: connection.hpp:601
std::enable_if< !std::is_pod< T >::value &&!data::Serialization< BufferBuilder, T >::is_fixed_size, void >::type ReceiveSend(const T &value, T *out_value)
Definition: connection.hpp:427
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
Definition: allocator.hpp:220
std::enable_if< !std::is_pod< T >::value &&!data::Serialization< BufferBuilder, T >::is_fixed_size, void >::type Receive(T *out_value)
Receive any serializable non-POD variable-length item T.
Definition: connection.hpp:239
std::enable_if< !std::is_pod< T >::value &&data::Serialization< BufferBuilder, T >::is_fixed_size, void >::type ReceiveN(T *out_value, size_t n)
Receive an array of serializable non-POD fixed-length items T.
Definition: connection.hpp:544
std::enable_if< std::is_pod< T >::value, void >::type Receive(T *out_value)
Receive any serializable POD item T.
Definition: connection.hpp:184
std::enable_if< !std::is_pod< T >::value &&data::Serialization< BufferBuilder, T >::is_fixed_size, void >::type Receive(T *out_value)
Receive any serializable non-POD fixed-length item T.
Definition: connection.hpp:204
friend Flags operator|(const Flags &a, const Flags &b)
operator to combine flags
Definition: connection.hpp:69
friend std::ostream & operator<<(std::ostream &os, const Connection &c)
make ostreamable
Definition: connection.hpp:629
BufferBuilder represents a dynamically growable area of memory, which can be modified by appending in...
virtual void SyncRecvSend(const void *send_data, size_t send_size, void *recv_data, size_t recv_size)=0
std::enable_if< !std::is_pod< T >::value &&!data::Serialization< BufferBuilder, T >::is_fixed_size, void >::type SendReceive(const T *value, T *out_value, size_t n=1)
SendReceive any serializable non-POD variable-length item T.
Definition: connection.hpp:393
size_t prev_rx_bytes_
previous read of received bytes
Definition: connection.hpp:618
std::enable_if< !std::is_pod< T >::value &&!data::Serialization< BufferBuilder, T >::is_fixed_size, void >::type Send(const T &value)
Definition: connection.hpp:152
std::atomic< uint32_t > tx_seq_
send sequence
Definition: connection.hpp:598
Simple buffer of characters without initialization or growing functionality.
Definition: buffer.hpp:40
static constexpr bool self_verify_
Definition: connection.hpp:54
std::enable_if< std::is_pod< T >::value, void >::type SendReceive(const T *value, T *out_value, size_t n=1)
SendReceive any serializable POD item T.
Definition: connection.hpp:277
virtual void SyncRecv(void *out_data, size_t size)=0
std::atomic< size_t > tx_active_
active send requests
Definition: connection.hpp:621
Flags
Additional flags for sending or receiving.
Definition: connection.hpp:61
std::enable_if< std::is_pod< T >::value, void >::type ReceiveN(T *out_value, size_t n)
Receive an array of serializable POD items T.
Definition: connection.hpp:524
virtual ssize_t SendOne(const void *data, size_t size, Flags flags=NoFlags)=0
size_type size() const noexcept
return number of items in Buffer
Definition: buffer.hpp:141
std::atomic< size_t > rx_active_
active recv requests
Definition: connection.hpp:624
std::atomic< size_t > rx_bytes_
received bytes
Definition: connection.hpp:612