Thrill  0.1
socket.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/net/tcp/socket.hpp
3  *
4  * Lightweight wrapper around BSD socket API.
5  *
6  * Part of Project Thrill - http://project-thrill.org
7  *
8  * Copyright (C) 2015 Timo Bingmann <[email protected]>
9  *
10  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
11  ******************************************************************************/
12 
13 #pragma once
14 #ifndef THRILL_NET_TCP_SOCKET_HEADER
15 #define THRILL_NET_TCP_SOCKET_HEADER
16 
17 #include <thrill/common/logger.hpp>
18 #include <thrill/common/string.hpp>
21 
22 #include <tlx/string/hexdump.hpp>
23 
24 #include <fcntl.h>
25 #include <netinet/in.h>
26 #include <sys/socket.h>
27 #include <unistd.h>
28 
29 #include <cassert>
30 #include <cerrno>
31 #include <cstring>
32 #include <string>
33 #include <utility>
34 
35 namespace thrill {
36 namespace net {
37 namespace tcp {
38 
39 //! \addtogroup net_tcp TCP Socket API
40 //! \ingroup net
41 //! \{
42 
43 /*!
44  * Socket is a light-weight wrapper around the BSD socket API. Functions all
45  * have plain return values and do not throw exceptions.
46  *
47  * Not all functions in this class follow the normal naming conventions, because
48  * they are wrappers around the equally named functions of the socket API.
49  */
50 class Socket
51 {
52  static constexpr bool debug = false;
53  static constexpr bool debug_data = false;
54 
55 public:
56  //! \name Creation
57  //! \{
58 
59  //! construct new Socket object from existing file descriptor.
60  explicit Socket(int fd, bool loopback_socket = false)
61  : fd_(fd) {
62  SetNoDelay(true);
63  if (!loopback_socket) {
64  // enable large send and receive kernel buffers
65  SetSndBuf(4 * 1024 * 1024);
66  SetRcvBuf(4 * 1024 * 1024);
67  }
68  }
69 
70  //! default constructor: invalid socket.
71  Socket() : fd_(-1) { }
72 
73  //! non-copyable: delete copy-constructor
74  Socket(const Socket&) = delete;
75  //! non-copyable: delete assignment operator
76  Socket& operator = (const Socket&) = delete;
77  //! move-constructor: move file descriptor
78  Socket(Socket&& s) noexcept : fd_(s.fd_) { s.fd_ = -1; }
79  //! move-assignment operator: move file descriptor
81  if (this == &s) return *this;
82  if (fd_ >= 0) close();
83  fd_ = s.fd_;
84  s.fd_ = -1;
85  return *this;
86  }
87 
88  ~Socket() {
89  if (fd_ >= 0) close();
90  }
91 
92  //! Create a new stream socket.
93  static Socket Create() {
94 #ifdef SOCK_CLOEXEC
95  int fd = ::socket(PF_INET, SOCK_STREAM | SOCK_CLOEXEC, 0);
96 #else
97  int fd = ::socket(PF_INET, SOCK_STREAM, 0);
98 #endif
99  if (fd < 0) {
100  LOG << "Socket::Create()"
101  << " fd=" << fd
102  << " error=" << strerror(errno);
103  }
104 
105 #ifndef SOCK_CLOEXEC
106  if (fcntl(fd, F_SETFD, FD_CLOEXEC) != 0) {
108  "Error setting FD_CLOEXEC on network socket");
109  }
110 #endif
111 
112  return Socket(fd);
113  }
114 
115  //! Create a pair of connected stream sockets. Use this for internal local
116  //! test connection pairs.
117  static std::pair<Socket, Socket> CreatePair() {
118  int fds[2];
119 #ifdef SOCK_CLOEXEC
120  int r = ::socketpair(PF_UNIX, SOCK_STREAM | SOCK_CLOEXEC, 0, fds);
121 #else
122  int r = ::socketpair(PF_UNIX, SOCK_STREAM, 0, fds);
123 #endif
124  if (r != 0) {
125  LOG1 << "Socket::CreatePair()"
126  << " error=" << strerror(errno);
127  abort();
128  }
129 
130 #ifndef SOCK_CLOEXEC
131  if (fcntl(fds[0], F_SETFD, FD_CLOEXEC) != 0) {
133  "Error setting FD_CLOEXEC on network socket");
134  }
135  if (fcntl(fds[1], F_SETFD, FD_CLOEXEC) != 0) {
137  "Error setting FD_CLOEXEC on network socket");
138  }
139 #endif
140  return std::make_pair(Socket(fds[0], /* loopback_socket */ true),
141  Socket(fds[1], /* loopback_socket */ true));
142  }
143 
144  //! \}
145 
146  //! \name Status
147  //! \{
148 
149  //! Check whether the contained file descriptor is valid.
150  bool IsValid() const
151  { return fd_ >= 0; }
152 
153  //! Return the associated file descriptor
154  int fd() const { return fd_; }
155 
156  //! Query socket for its current error state.
157  int GetError() const {
158  int socket_error = -1;
159  socklen_t len = sizeof(socket_error);
160  getsockopt(SOL_SOCKET, SO_ERROR, &socket_error, &len);
161  return socket_error;
162  }
163 
164  //! Turn socket into non-blocking state.
165  static bool SetNonBlocking(int fd, bool non_blocking) {
166 
167  int old_opts = fcntl(fd, F_GETFL);
168 
169  int new_opts = non_blocking
170  ? (old_opts | O_NONBLOCK) : (old_opts & ~O_NONBLOCK);
171 
172  if (fcntl(fd, F_SETFL, new_opts) != 0)
173  {
174  LOG << "Socket::SetNonBlocking()"
175  << " fd=" << fd
176  << " non_blocking=" << non_blocking
177  << " error=" << strerror(errno);
178  return false;
179  }
180 
181  return true;
182  }
183 
184  //! Turn socket into non-blocking state.
185  bool SetNonBlocking(bool non_blocking) {
186  assert(IsValid());
187 
188  if (non_blocking == non_blocking_) return true;
189 
190  if (SetNonBlocking(fd_, non_blocking)) {
191  non_blocking_ = non_blocking;
192  return true;
193  }
194  return false;
195  }
196 
197  //! Return the current local socket address.
199  assert(IsValid());
200 
201  struct sockaddr_in6 sa;
202  socklen_t salen = sizeof(sa);
203 
204  if (getsockname(
205  fd_, reinterpret_cast<struct sockaddr*>(&sa), &salen) != 0)
206  {
207  LOG << "Socket::GetLocalAddress()"
208  << " fd_=" << fd_
209  << " error=" << strerror(errno);
210  return SocketAddress();
211  }
212 
213  return SocketAddress(reinterpret_cast<struct sockaddr*>(&sa), salen);
214  }
215 
216  //! Return the current peer socket address.
218  assert(IsValid());
219 
220  struct sockaddr_in6 sa;
221  socklen_t salen = sizeof(sa);
222 
223  if (getpeername(
224  fd_, reinterpret_cast<struct sockaddr*>(&sa), &salen) != 0)
225  {
226  LOG << "Socket::GetPeerAddress()"
227  << " fd_=" << fd_
228  << " error=" << strerror(errno);
229  return SocketAddress();
230  }
231 
232  return SocketAddress(reinterpret_cast<struct sockaddr*>(&sa), salen);
233  }
234 
235  //! \}
236 
237  //! \name Close
238  //! \{
239 
240  //! Close socket.
241  bool close() {
242  assert(IsValid());
243 
244  if (::close(fd_) != 0)
245  {
246  LOG << "Socket::close()"
247  << " fd_=" << fd_
248  << " error=" << strerror(errno);
249  return false;
250  }
251 
252  LOG << "Socket::close()"
253  << " fd_=" << fd_
254  << " closed";
255 
256  fd_ = -1;
257 
258  return true;
259  }
260 
261  //! \}
262 
263  //! \name Connect, Bind and Accept Functions
264  //! \{
265 
266  //! Bind socket to given SocketAddress for listening or connecting.
267  bool bind(const SocketAddress& sa) {
268  assert(IsValid());
269  assert(sa.IsValid());
270 
271  int r = ::bind(fd_, sa.sockaddr(), sa.socklen());
272 
273  if (r != 0) {
274  LOG << "Socket::bind()"
275  << " fd_=" << fd_
276  << " sa=" << sa
277  << " return=" << r
278  << " error=" << strerror(errno);
279  }
280 
281  return (r == 0);
282  }
283 
284  //! Initial socket connection to address
285  int connect(const SocketAddress& sa) {
286  assert(IsValid());
287  assert(sa.IsValid());
288 
289  int r = ::connect(fd_, sa.sockaddr(), sa.socklen());
290 
291  if (r == 0)
292  return r;
293 
294  LOG << "Socket::connect()"
295  << " fd_=" << fd_
296  << " sa=" << sa
297  << " return=" << r
298  << " error=" << strerror(errno);
299 
300  return r;
301  }
302 
303  //! Turn socket into listener state to accept incoming connections.
304  bool listen(int backlog = 0) {
305  assert(IsValid());
306 
307  if (backlog == 0) backlog = SOMAXCONN;
308 
309  int r = ::listen(fd_, backlog);
310 
311  if (r == 0) {
312  LOG << "Socket::listen()"
313  << " fd_=" << fd_;
314  }
315  else {
316  LOG << "Socket::listen()"
317  << " fd_=" << fd_
318  << " error=" << strerror(errno);
319  }
320  return (r == 0);
321  }
322 
323  //! Wait on socket until a new connection comes in.
324  Socket accept() const {
325  assert(IsValid());
326 
327  struct sockaddr_in6 sa;
328  socklen_t salen = sizeof(sa);
329 
330  int newfd = ::accept(fd_,
331  reinterpret_cast<struct sockaddr*>(&sa), &salen);
332  if (newfd < 0) {
333  LOG << "Socket::accept()"
334  << " fd_=" << fd_
335  << " error=" << strerror(errno);
336  return Socket();
337  }
338 
339  LOG << "Socket::accept()"
340  << " fd_=" << fd_
341  << " newfd=" << newfd;
342 
343  return Socket(newfd);
344  }
345 
346  //! \}
347 
348  //! \name Send and Recv Functions
349  //! \{
350 
351  //! Send (data,size) to socket (BSD socket API function wrapper), for
352  //! blocking sockets one should probably use send() instead of this
353  //! lower-layer functions.
354  ssize_t send_one(const void* data, size_t size, int flags = 0) {
355  assert(IsValid());
356 
357  if (debug) {
358  LOG << "Socket::send_one()"
359  << " fd_=" << fd_
360  << " size=" << size
361  << " data=" << MaybeHexdump(data, size)
362  << " flags=" << flags;
363  }
364 
365  ssize_t r = ::send(fd_, data, size, flags);
366 
367  LOG << "done Socket::send_one()"
368  << " fd_=" << fd_
369  << " return=" << r;
370 
371  return r;
372  }
373 
374  //! Send (data,size) to socket, retry sends if short-sends occur.
375  ssize_t send(const void* data, size_t size, int flags = 0) {
376  assert(IsValid());
377 
378  if (debug) {
379  LOG << "Socket::send()"
380  << " fd_=" << fd_
381  << " size=" << size
382  << " data=" << MaybeHexdump(data, size)
383  << " flags=" << flags;
384  }
385 
386  const char* cdata = static_cast<const char*>(data);
387  size_t wb = 0; // written bytes
388 
389  while (wb < size)
390  {
391  ssize_t r = ::send(fd_, cdata + wb, size - wb, flags);
392 
393  if (r <= 0) {
394  // an error occured, check errno.
395  if (errno == EAGAIN) continue;
396 
397  LOG << "done Socket::send()"
398  << " fd_=" << fd_
399  << " return=" << r
400  << " errno=" << strerror(errno);
401 
402  return r;
403  }
404 
405  wb += r;
406  }
407 
408  LOG << "done Socket::send()"
409  << " fd_=" << fd_
410  << " return=" << wb;
411 
412  return wb;
413  }
414 
415  //! Send (data,size) to destination
416  ssize_t sendto(const void* data, size_t size, int flags,
417  const SocketAddress& dest) {
418  assert(IsValid());
419 
420  if (debug) {
421  LOG << "Socket::sendto()"
422  << " fd_=" << fd_
423  << " size=" << size
424  << " data=" << MaybeHexdump(data, size)
425  << " flags=" << flags
426  << " dest=" << dest;
427  }
428 
429  ssize_t r = ::sendto(fd_, data, size, flags,
430  dest.sockaddr(), dest.socklen());
431 
432  LOG << "done Socket::sendto()"
433  << " fd_=" << fd_
434  << " return=" << r;
435 
436  return r;
437  }
438 
439  //! Recv (out_data,max_size) from socket (BSD socket API function wrapper)
440  ssize_t recv_one(void* out_data, size_t max_size, int flags = 0) {
441  assert(IsValid());
442 
443  // this is a work-around, since on errno is spontaneously == EINVAL,
444  // with no relationship to recv() -tb 2015-08-28
445  errno = 0;
446 
447  LOG << "Socket::recv_one()"
448  << " fd_=" << fd_
449  << " max_size=" << max_size
450  << " flags=" << flags
451  << " errno=" << errno;
452 
453  ssize_t r = ::recv(fd_, out_data, max_size, flags);
454 
455  if (debug) {
456  LOG << "done Socket::recv_one()"
457  << " fd_=" << fd_
458  << " return=" << r
459  << " errno=" << errno
460  << " data=" << (r >= 0 ? MaybeHexdump(out_data, r) : "<error>");
461  }
462 
463  return r;
464  }
465 
466  //! Receive (data,size) from socket, retry recvs if short-reads occur.
467  ssize_t recv(void* out_data, size_t size, int flags = 0) {
468  assert(IsValid());
469 
470  LOG << "Socket::recv()"
471  << " fd_=" << fd_
472  << " size=" << size
473  << " flags=" << flags;
474 
475  char* cdata = static_cast<char*>(out_data);
476  size_t rb = 0; // read bytes
477 
478  while (rb < size)
479  {
480  ssize_t r = ::recv(fd_, cdata + rb, size - rb, flags);
481 
482  if (r <= 0) {
483  // an error occured, check errno.
484  if (errno == EAGAIN) continue;
485 
486  LOG << "done Socket::recv()"
487  << " fd_=" << fd_
488  << " size=" << size
489  << " return=" << r
490  << " errno=" << strerror(errno);
491 
492  return r;
493  }
494 
495  rb += r;
496  }
497 
498  if (debug) {
499  LOG << "done Socket::recv()"
500  << " fd_=" << fd_
501  << " return=" << rb
502  << " data=" << MaybeHexdump(out_data, rb);
503  }
504 
505  return rb;
506  }
507 
508  //! Recv (out_data,max_size) and source address from socket (BSD socket API
509  //! function wrapper)
510  ssize_t recvfrom(void* out_data, size_t max_size, int flags = 0,
511  SocketAddress* out_source = nullptr) {
512  assert(IsValid());
513 
514  LOG << "Socket::recvfrom()"
515  << " fd_=" << fd_
516  << " max_size=" << max_size
517  << " flags=" << flags
518  << " out_socklen=" << (out_source ? out_source->socklen() : 0);
519 
520  socklen_t out_socklen = out_source ? out_source->socklen() : 0;
521 
522  ssize_t r = ::recvfrom(fd_, out_data, max_size, flags,
523  out_source ? out_source->sockaddr() : nullptr,
524  &out_socklen);
525 
526  if (debug) {
527  LOG << "done Socket::recvfrom()"
528  << " fd_=" << fd_
529  << " return=" << r
530  << " data="
531  << (r >= 0 ? MaybeHexdump(out_data, r) : "<error>")
532  << " out_source="
533  << (out_source ? out_source->ToStringHostPort() : "<null>");
534  }
535 
536  return r;
537  }
538 
539  //! \}
540 
541  //! \name Socket Options and Accelerations
542  //! \{
543 
544  //! Perform raw getsockopt() operation on socket.
545  int getsockopt(int level, int optname,
546  void* optval, socklen_t* optlen) const {
547  assert(IsValid());
548 
549  int r = ::getsockopt(fd_, level, optname, optval, optlen);
550 
551  if (r != 0)
552  LOG << "Socket::getsockopt()"
553  << " fd_=" << fd_
554  << " level=" << level
555  << " optname=" << optname
556  << " optval=" << optval
557  << " optlen=" << optlen
558  << " error=" << strerror(errno);
559 
560  return r;
561  }
562 
563  //! Perform raw setsockopt() operation on socket.
564  int setsockopt(int level, int optname,
565  const void* optval, socklen_t optlen) {
566  assert(IsValid());
567 
568  int r = ::setsockopt(fd_, level, optname, optval, optlen);
569 
570  if (r != 0)
571  LOG << "Socket::setsockopt()"
572  << " fd_=" << fd_
573  << " level=" << level
574  << " optname=" << optname
575  << " optval=" << optval
576  << " optlen=" << optlen
577  << " error=" << strerror(errno);
578 
579  return r;
580  }
581 
//! Turn the sending of keep-alive messages on connection-oriented sockets on
//! or off.
void SetKeepAlive(bool activate = true);

//! Turn SO_REUSEADDR on or off; when enabled the socket can be bound more
//! quickly to previously used ports.
void SetReuseAddr(bool activate = true);

//! When activated, the Nagle algorithm is disabled: segments are always sent
//! as soon as possible, even if there is only a small amount of data.
void SetNoDelay(bool activate = true);

//! Set the SO_SNDBUF socket option to \p size.
void SetSndBuf(size_t size);

//! Set the SO_RCVBUF socket option to \p size.
void SetRcvBuf(size_t size);
598 
599  //! \}
600 
601 private:
602  //! the file descriptor of the socket.
603  int fd_;
604 
605  //! flag whether the socket is set to non-blocking
606  bool non_blocking_ = false;
607 
608  //! return hexdump or just [data] if not debugging
609  static std::string MaybeHexdump(const void* data, size_t size) {
610  if (debug_data)
611  return tlx::hexdump(data, size);
612  else
613  return "[data]";
614  }
615 };
616 
617 //! \}
618 
619 } // namespace tcp
620 } // namespace net
621 } // namespace thrill
622 
623 #endif // !THRILL_NET_TCP_SOCKET_HEADER
624 
625 /******************************************************************************/
ssize_t recvfrom(void *out_data, size_t max_size, int flags=0, SocketAddress *out_source=nullptr)
Definition: socket.hpp:510
void SetRcvBuf(size_t size)
Set SO_RCVBUF socket option.
Definition: socket.cpp:106
int getsockopt(int level, int optname, void *optval, socklen_t *optlen) const
Perform raw getsockopt() operation on socket.
Definition: socket.hpp:545
bool close()
Close socket.
Definition: socket.hpp:241
bool listen(int backlog=0)
Turn socket into listener state to accept incoming connections.
Definition: socket.hpp:304
static std::string MaybeHexdump(const void *data, size_t size)
return hexdump or just [data] if not debugging
Definition: socket.hpp:609
#define LOG1
Definition: logger.hpp:28
int fd_
the file descriptor of the socket.
Definition: socket.hpp:603
SocketAddress is a super class used to unify the two different IPv4 and IPv6 socket address represent...
An Exception which is thrown on system errors and contains errno information.
static std::pair< Socket, Socket > CreatePair()
Definition: socket.hpp:117
Socket()
default constructor: invalid socket.
Definition: socket.hpp:71
void SetKeepAlive(bool activate=true)
Enable sending of keep-alive messages on connection-oriented sockets.
Definition: socket.cpp:23
int connect(const SocketAddress &sa)
Initial socket connection to address.
Definition: socket.hpp:285
ssize_t recv_one(void *out_data, size_t max_size, int flags=0)
Recv (out_data,max_size) from socket (BSD socket API function wrapper)
Definition: socket.hpp:440
bool SetNonBlocking(bool non_blocking)
Turn socket into non-blocking state.
Definition: socket.hpp:185
ssize_t send_one(const void *data, size_t size, int flags=0)
Definition: socket.hpp:354
bool bind(const SocketAddress &sa)
Bind socket to given SocketAddress for listening or connecting.
Definition: socket.hpp:267
SocketAddress GetLocalAddress() const
Return the current local socket address.
Definition: socket.hpp:198
bool non_blocking_
flag whether the socket is set to non-blocking
Definition: socket.hpp:606
ssize_t sendto(const void *data, size_t size, int flags, const SocketAddress &dest)
Send (data,size) to destination.
Definition: socket.hpp:416
Socket accept() const
Wait on socket until a new connection comes in.
Definition: socket.hpp:324
Socket is a light-weight wrapper around the BSD socket API.
Definition: socket.hpp:50
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
Definition: allocator.hpp:220
socklen_t socklen() const
Return total length of enclosed sockaddr structure.
void SetReuseAddr(bool activate=true)
Definition: socket.cpp:37
Socket & operator=(const Socket &)=delete
non-copyable: delete assignment operator
ssize_t recv(void *out_data, size_t size, int flags=0)
Receive (data,size) from socket, retry recvs if short-reads occur.
Definition: socket.hpp:467
static Socket Create()
Create a new stream socket.
Definition: socket.hpp:93
Socket(int fd, bool loopback_socket=false)
construct new Socket object from existing file descriptor.
Definition: socket.hpp:60
int GetError() const
Query socket for its current error state.
Definition: socket.hpp:157
static bool SetNonBlocking(int fd, bool non_blocking)
Turn socket into non-blocking state.
Definition: socket.hpp:165
struct sockaddr * sockaddr()
Return pointer to enclosed address as a generic sockattr struct.
std::string hexdump(const void *const data, size_t size)
Dump a (binary) string as a sequence of uppercase hexadecimal pairs.
Definition: hexdump.cpp:21
Socket(Socket &&s) noexcept
move-constructor: move file descriptor
Definition: socket.hpp:78
static constexpr bool debug
Definition: socket.hpp:52
int fd() const
Return the associated file descriptor.
Definition: socket.hpp:154
void SetSndBuf(size_t size)
Set SO_SNDBUF socket option.
Definition: socket.cpp:81
void SetNoDelay(bool activate=true)
Definition: socket.cpp:60
SocketAddress GetPeerAddress() const
Return the current peer socket address.
Definition: socket.hpp:217
#define LOG
Default logging method: output if the local debug variable is true.
Definition: logger.hpp:24
ssize_t send(const void *data, size_t size, int flags=0)
Send (data,size) to socket, retry sends if short-sends occur.
Definition: socket.hpp:375
int setsockopt(int level, int optname, const void *optval, socklen_t optlen)
Perform raw setsockopt() operation on socket.
Definition: socket.hpp:564
bool IsValid() const
Check whether the contained file descriptor is valid.
Definition: socket.hpp:150
static constexpr bool debug_data
Definition: socket.hpp:53