15 #ifndef THRILL_NET_CONNECTION_HEADER 16 #define THRILL_NET_CONNECTION_HEADER 70 return static_cast<Flags>(
71 static_cast<size_t>(a) | static_cast<size_t>(b));
78 virtual bool IsValid()
const = 0;
84 virtual std::ostream&
OutputOstream(std::ostream& os)
const = 0;
93 virtual void SyncSend(
const void* data,
size_t size,
98 virtual ssize_t
SendOne(
const void* data,
size_t size,
103 template <
typename T>
106 if (self_verify_ && is_loopback_) {
108 size_t hash_code =
typeid(
T).hash_code();
109 SyncSend(&hash_code,
sizeof(hash_code));
117 template <
typename T>
118 typename std::enable_if<
122 if (self_verify_ && is_loopback_) {
124 size_t hash_code =
typeid(
T).hash_code();
125 SyncSend(&hash_code,
sizeof(hash_code));
128 static constexpr
size_t fixed_size
130 if (fixed_size < 2 * 1024 * 1024) {
135 assert(fb.size() == fixed_size);
148 template <
typename T>
149 typename std::enable_if<
151 !data::Serialization<BufferBuilder, T>::is_fixed_size,
void>::type
153 if (self_verify_ && is_loopback_) {
155 size_t hash_code =
typeid(
T).hash_code();
156 SyncSend(&hash_code,
sizeof(hash_code));
161 size_t size = bb.
size();
175 virtual void SyncRecv(
void* out_data,
size_t size) = 0;
179 virtual ssize_t
RecvOne(
void* out_data,
size_t size) = 0;
182 template <
typename T>
185 if (self_verify_ && is_loopback_) {
188 SyncRecv(&hash_code,
sizeof(hash_code));
189 if (hash_code !=
typeid(
T).hash_code()) {
190 throw std::runtime_error(
191 "Connection::Receive() attempted to receive item " 192 "with different typeid!");
196 SyncRecv(out_value,
sizeof(*out_value));
200 template <
typename T>
201 typename std::enable_if<
203 data::Serialization<BufferBuilder, T>::is_fixed_size,
void>::type
205 if (self_verify_ && is_loopback_) {
208 SyncRecv(&hash_code,
sizeof(hash_code));
209 if (hash_code !=
typeid(
T).hash_code()) {
210 throw std::runtime_error(
211 "Connection::Receive() attempted to receive item " 212 "with different typeid!");
216 static constexpr
size_t fixed_size
218 if (fixed_size < 2 * 1024 * 1024) {
220 std::array<uint8_t, fixed_size> b;
235 template <
typename T>
236 typename std::enable_if<
238 !data::Serialization<BufferBuilder, T>::is_fixed_size,
void>::type
240 if (self_verify_ && is_loopback_) {
243 SyncRecv(&hash_code,
sizeof(hash_code));
244 if (hash_code !=
typeid(
T).hash_code()) {
245 throw std::runtime_error(
246 "Connection::Receive() attempted to receive item " 247 "with different typeid!");
269 virtual void SyncSendRecv(
const void* send_data,
size_t send_size,
270 void* recv_data,
size_t recv_size) = 0;
271 virtual void SyncRecvSend(
const void* send_data,
size_t send_size,
272 void* recv_data,
size_t recv_size) = 0;
275 template <
typename T>
278 if (self_verify_ && is_loopback_) {
280 size_t send_hash_code =
typeid(
T).hash_code(), recv_hash_code;
282 &recv_hash_code,
sizeof(recv_hash_code));
283 if (recv_hash_code !=
typeid(
T).hash_code()) {
284 throw std::runtime_error(
285 "Connection::SendReceive() attempted to receive item " 286 "with different typeid!");
294 template <
typename T>
297 if (self_verify_ && is_loopback_) {
299 size_t send_hash_code =
typeid(
T).hash_code(), recv_hash_code;
301 &recv_hash_code,
sizeof(recv_hash_code));
302 if (recv_hash_code !=
typeid(
T).hash_code()) {
303 throw std::runtime_error(
304 "Connection::SendReceive() attempted to receive item " 305 "with different typeid!");
309 SyncRecvSend(&value,
sizeof(value), out_value,
sizeof(*out_value));
313 template <
typename T>
314 typename std::enable_if<
316 data::Serialization<BufferBuilder, T>::is_fixed_size,
void>::type
318 if (self_verify_ && is_loopback_) {
320 size_t send_hash_code =
typeid(
T).hash_code(), recv_hash_code;
322 &recv_hash_code,
sizeof(recv_hash_code));
323 if (recv_hash_code !=
typeid(
T).hash_code()) {
324 throw std::runtime_error(
325 "Connection::SendReceive() attempted to receive item " 326 "with different typeid!");
332 for (
size_t i = 0; i < n; ++i) {
335 Buffer recvb(n * data::Serialization<BufferBuilder, T>::fixed_size);
339 for (
size_t i = 0; i < n; ++i) {
344 template <
typename T>
345 typename std::enable_if<
347 data::Serialization<BufferBuilder, T>::is_fixed_size,
void>::type
349 if (self_verify_ && is_loopback_) {
351 size_t send_hash_code =
typeid(
T).hash_code(), recv_hash_code;
353 &recv_hash_code,
sizeof(recv_hash_code));
354 if (recv_hash_code !=
typeid(
T).hash_code()) {
355 throw std::runtime_error(
356 "Connection::SendReceive() attempted to receive item " 357 "with different typeid!");
362 static constexpr
size_t fixed_size
364 if (fixed_size < 2 * 1024 * 1024) {
369 assert(sendb.size() == fixed_size);
370 std::array<uint8_t, fixed_size> recvb;
372 recvb.data(), recvb.size());
389 template <
typename T>
390 typename std::enable_if<
392 !data::Serialization<BufferBuilder, T>::is_fixed_size,
void>::type
394 if (self_verify_ && is_loopback_) {
396 size_t send_hash_code =
typeid(
T).hash_code(), recv_hash_code;
398 &recv_hash_code,
sizeof(recv_hash_code));
399 if (recv_hash_code !=
typeid(
T).hash_code()) {
400 throw std::runtime_error(
401 "Connection::SendReceive() attempted to receive item " 402 "with different typeid!");
407 for (
size_t i = 0; i < n; ++i) {
410 size_t send_size = sendb.
size(), recv_size;
412 &recv_size,
sizeof(recv_size));
416 recvb.
data(), recv_size);
418 for (
size_t i = 0; i < n; ++i) {
423 template <
typename T>
424 typename std::enable_if<
426 !data::Serialization<BufferBuilder, T>::is_fixed_size,
void>::type
428 if (self_verify_ && is_loopback_) {
430 size_t send_hash_code =
typeid(
T).hash_code(), recv_hash_code;
432 &recv_hash_code,
sizeof(recv_hash_code));
433 if (recv_hash_code !=
typeid(
T).hash_code()) {
434 throw std::runtime_error(
435 "Connection::SendReceive() attempted to receive item " 436 "with different typeid!");
442 size_t send_size = sendb.
size(), recv_size;
444 &recv_size,
sizeof(recv_size));
448 recvb.
data(), recv_size);
460 template <
typename T>
463 if (self_verify_ && is_loopback_) {
465 size_t hash_code =
typeid(
T).hash_code();
466 SyncSend(&hash_code,
sizeof(hash_code));
474 template <
typename T>
475 typename std::enable_if<
477 data::Serialization<BufferBuilder, T>::is_fixed_size,
void>::type
479 if (self_verify_ && is_loopback_) {
481 size_t hash_code =
typeid(
T).hash_code();
482 SyncSend(&hash_code,
sizeof(hash_code));
485 static constexpr
size_t fixed_size
488 for (
size_t i = 0; i < n; ++i) {
496 template <
typename T>
497 typename std::enable_if<
499 !data::Serialization<BufferBuilder, T>::is_fixed_size,
void>::type
501 if (self_verify_ && is_loopback_) {
503 size_t hash_code =
typeid(
T).hash_code();
504 SyncSend(&hash_code,
sizeof(hash_code));
508 for (
size_t i = 0; i < n; ++i) {
511 size_t size = bb.
size();
522 template <
typename T>
525 if (self_verify_ && is_loopback_) {
528 SyncRecv(&hash_code,
sizeof(hash_code));
529 if (hash_code !=
typeid(
T).hash_code()) {
530 throw std::runtime_error(
531 "Connection::ReceiveN() attempted to receive item " 532 "with different typeid!");
540 template <
typename T>
541 typename std::enable_if<
543 data::Serialization<BufferBuilder, T>::is_fixed_size,
void>::type
545 if (self_verify_ && is_loopback_) {
548 SyncRecv(&hash_code,
sizeof(hash_code));
549 if (hash_code !=
typeid(
T).hash_code()) {
550 throw std::runtime_error(
551 "Connection::ReceiveN() attempted to receive item " 552 "with different typeid!");
559 for (
size_t i = 0; i < n; ++i) {
565 template <
typename T>
566 typename std::enable_if<
568 !data::Serialization<BufferBuilder, T>::is_fixed_size,
void>::type
570 if (self_verify_ && is_loopback_) {
573 SyncRecv(&hash_code,
sizeof(hash_code));
574 if (hash_code !=
typeid(
T).hash_code()) {
575 throw std::runtime_error(
576 "Connection::ReceiveN() attempted to receive item " 577 "with different typeid!");
587 for (
size_t i = 0; i < n; ++i) {
639 #endif // !THRILL_NET_CONNECTION_HEADER size_t size() const
Return the currently used length in bytes.
virtual bool IsValid() const =0
check whether the connection is (still) valid.
std::enable_if< std::is_pod< T >::value, void >::type ReceiveSend(const T &value, T *out_value)
std::atomic< size_t > tx_bytes_
sent bytes
size_t prev_tx_bytes_
previous read of sent bytes
virtual std::string ToString() const =0
return a string representation of this connection, for user output.
std::enable_if< !std::is_pod< T >::value &&!data::Serialization< BufferBuilder, T >::is_fixed_size, void >::type ReceiveN(T *out_value, size_t n)
Receive an array of serializable non-POD fixed-length items T.
virtual std::ostream & OutputOstream(std::ostream &os) const =0
virtual method to output to a std::ostream
std::enable_if< !std::is_pod< T >::value &&!data::Serialization< BufferBuilder, T >::is_fixed_size, void >::type SendN(const T *value, size_t n)
std::enable_if< std::is_pod< T >::value, void >::type Send(const T &value)
static constexpr bool g_self_verify
BufferReader represents a BufferRef with an additional cursor with which the memory can be read incre...
std::enable_if< !std::is_pod< T >::value &&data::Serialization< BufferBuilder, T >::is_fixed_size, void >::type Send(const T &value)
const Byte * data() const
Return a pointer to the currently kept memory area.
iterator data() noexcept
return iterator to beginning of Buffer
std::enable_if< !std::is_pod< T >::value &&data::Serialization< BufferBuilder, T >::is_fixed_size, void >::type ReceiveSend(const T &value, T *out_value)
virtual void SyncSendRecv(const void *send_data, size_t send_size, void *recv_data, size_t recv_size)=0
virtual ssize_t RecvOne(void *out_data, size_t size)=0
std::enable_if< !std::is_pod< T >::value &&data::Serialization< BufferBuilder, T >::is_fixed_size, void >::type SendN(const T *value, size_t n)
Represents a FIXED length area of memory, which can be modified by appending integral data types via ...
virtual void SyncSend(const void *data, size_t size, Flags flags=NoFlags)=0
std::enable_if< std::is_pod< T >::value, void >::type SendN(const T *value, size_t n)
A Connection represents a link to another peer in a network group.
std::enable_if< !std::is_pod< T >::value &&data::Serialization< BufferBuilder, T >::is_fixed_size, void >::type SendReceive(const T *value, T *out_value, size_t n=1)
SendReceive any serializable non-POD fixed-length item T.
std::atomic< uint32_t > rx_seq_
receive sequence
std::enable_if< !std::is_pod< T >::value &&!data::Serialization< BufferBuilder, T >::is_fixed_size, void >::type ReceiveSend(const T &value, T *out_value)
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
std::enable_if< !std::is_pod< T >::value &&!data::Serialization< BufferBuilder, T >::is_fixed_size, void >::type Receive(T *out_value)
Receive any serializable non-POD fixed-length item T.
std::enable_if< !std::is_pod< T >::value &&data::Serialization< BufferBuilder, T >::is_fixed_size, void >::type ReceiveN(T *out_value, size_t n)
Receive an array of serializable non-POD fixed-length items T.
std::enable_if< std::is_pod< T >::value, void >::type Receive(T *out_value)
Receive any serializable POD item T.
std::enable_if< !std::is_pod< T >::value &&data::Serialization< BufferBuilder, T >::is_fixed_size, void >::type Receive(T *out_value)
Receive any serializable non-POD fixed-length item T.
friend Flags operator|(const Flags &a, const Flags &b)
operator to combine flags
friend std::ostream & operator<<(std::ostream &os, const Connection &c)
}
BufferBuilder represents a dynamically growable area of memory, which can be modified by appending in...
virtual void SyncRecvSend(const void *send_data, size_t send_size, void *recv_data, size_t recv_size)=0
std::enable_if< !std::is_pod< T >::value &&!data::Serialization< BufferBuilder, T >::is_fixed_size, void >::type SendReceive(const T *value, T *out_value, size_t n=1)
SendReceive any serializable non-POD fixed-length item T.
size_t prev_rx_bytes_
previous read of received bytes
std::enable_if< !std::is_pod< T >::value &&!data::Serialization< BufferBuilder, T >::is_fixed_size, void >::type Send(const T &value)
std::atomic< uint32_t > tx_seq_
send sequence
Simple buffer of characters without initialization or growing functionality.
static constexpr bool self_verify_
std::enable_if< std::is_pod< T >::value, void >::type SendReceive(const T *value, T *out_value, size_t n=1)
SendReceive any serializable POD item T.
virtual void SyncRecv(void *out_data, size_t size)=0
std::atomic< size_t > tx_active_
active send requests
Flags
Additional flags for sending or receiving.
std::enable_if< std::is_pod< T >::value, void >::type ReceiveN(T *out_value, size_t n)
Receive an array of serializable POD items T.
virtual ssize_t SendOne(const void *data, size_t size, Flags flags=NoFlags)=0
size_type size() const noexcept
return number of items in Buffer
std::atomic< size_t > rx_active_
active recv requests
std::atomic< size_t > rx_bytes_
received bytes