Thrill  0.1
connection.hpp
Go to the documentation of this file.
1 /*******************************************************************************
2  * thrill/net/tcp/connection.hpp
3  *
4  * Contains net::Connection, a richer set of network point-to-point primitives.
5  *
6  * Part of Project Thrill - http://project-thrill.org
7  *
8  * Copyright (C) 2015 Timo Bingmann <[email protected]>
9  * Copyright (C) 2015 Emanuel Jöbstl <[email protected]>
10  *
11  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
12  ******************************************************************************/
13 
14 #pragma once
15 #ifndef THRILL_NET_TCP_CONNECTION_HEADER
16 #define THRILL_NET_TCP_CONNECTION_HEADER
17 
18 #include <thrill/common/config.hpp>
21 
22 #include <cassert>
23 #include <cerrno>
24 #include <cstdio>
25 #include <cstring>
26 #include <string>
27 
28 namespace thrill {
29 namespace net {
30 namespace tcp {
31 
32 //! \addtogroup net_tcp TCP Socket API
33 //! \{
34 
35 enum ConnectionState : unsigned {
38 };
39 
40 // Because Mac OSX does not know MSG_MORE.
41 #ifndef MSG_MORE
42 #define MSG_MORE 0
43 #endif
44 
45 /*!
46  * Connection is a rich point-to-point socket connection to another client
47  * (worker, master, or whatever). Messages are fixed-length integral items or
48  * opaque byte strings with a length.
49  *
50  * If any function fails to send or receive, then a NetException is thrown
51  * instead of explicit error handling. If ever an error occurs, we probably have
52  * to rebuild the whole network explicitly.
53  */
54 class Connection final : public net::Connection
55 {
56  static constexpr bool debug = false;
57 
58 public:
59  //! default construction, contains invalid socket
60  Connection() = default;
61 
62  //! Construct Connection from a Socket
63  explicit Connection(Socket&& s)
64  : socket_(std::move(s))
65  { }
66 
67  //! Construct Connection from a Socket, with immediate
68  //! initialization. (Currently used by tests).
69  Connection(Socket&& s, size_t group_id, size_t peer_id)
70  : socket_(std::move(s)),
71  group_id_(group_id), peer_id_(peer_id)
72  { }
73 
74  //! move-constructor
76  : socket_(std::move(other.socket_)),
77  state_(other.state_),
78  group_id_(other.group_id_),
79  peer_id_(other.peer_id_) {
80  other.state_ = ConnectionState::Invalid;
81  }
82 
83  //! move assignment-operator
85  if (IsValid()) {
86  sLOG1 << "Assignment-destruction of valid Connection" << this;
87  Close();
88  }
89  socket_ = std::move(other.socket_);
90  state_ = other.state_;
91  group_id_ = other.group_id_;
92  peer_id_ = other.peer_id_;
93 
94  other.state_ = ConnectionState::Invalid;
95  return *this;
96  }
97 
98  //! Gets the state of this connection.
100  { return state_; }
101 
102  //! Gets the id of the net group this connection is associated with.
103  size_t group_id() const
104  { return group_id_; }
105 
106  //! Gets the id of the worker this connection is connected to.
107  size_t peer_id() const
108  { return peer_id_; }
109 
110  //! Sets the state of this connection.
112  { state_ = state; }
113 
114  //! Sets the group id of this connection.
115  void set_group_id(size_t groupId)
116  { group_id_ = groupId; }
117 
118  //! Sets the id of the worker this connection is connected to.
119  void set_peer_id(size_t peerId)
120  { peer_id_ = peerId; }
121 
122  //! Check whether the contained file descriptor is valid.
123  bool IsValid() const final
124  { return socket_.IsValid(); }
125 
126  std::string ToString() const final
127  { return std::to_string(GetSocket().fd()); }
128 
129  //! Return the raw socket object for more low-level network programming.
131  { return socket_; }
132 
133  //! Return the raw socket object for more low-level network programming.
134  const Socket& GetSocket() const
135  { return socket_; }
136 
137  //! Return the associated socket error
138  int GetError() const
139  { return socket_.GetError(); }
140 
141  //! Set socket to non-blocking
142  void SetNonBlocking(bool non_blocking) {
143  if (!socket_.SetNonBlocking(non_blocking))
144  throw Exception("Error setting socket non-blocking flag", errno);
145  }
146 
147  //! Return the socket peer address
149  { return socket_.GetPeerAddress().ToStringHostPort(); }
150 
151  //! Checks wether two connections have the same underlying socket or not.
152  bool operator == (const Connection& c) const noexcept
153  { return GetSocket().fd() == c.GetSocket().fd(); }
154 
155  //! Destruction of Connection should be explicitly done by a NetGroup or
156  //! other network class.
158  if (IsValid())
159  Close();
160  }
161 
162  void SyncSend(const void* data, size_t size, Flags flags) final {
163  SetNonBlocking(false);
164  int f = 0;
165  if (flags & MsgMore) f |= MSG_MORE;
166  if (socket_.send(data, size, f) != static_cast<ssize_t>(size))
167  throw Exception("Error during SyncSend", errno);
168  tx_bytes_ += size;
169  }
170 
171  ssize_t SendOne(const void* data, size_t size, Flags flags) final {
172 #if __APPLE__
173  // MacOSX has no MSG_DONTWAIT
174  SetNonBlocking(true);
175 #endif
176  int f = MSG_DONTWAIT;
177  if (flags & MsgMore) f |= MSG_MORE;
178  ssize_t wb = socket_.send_one(data, size, f);
179  if (wb > 0) tx_bytes_ += wb;
180  return wb;
181  }
182 
183  void SyncRecv(void* out_data, size_t size) final {
184  SetNonBlocking(false);
185  if (socket_.recv(out_data, size) != static_cast<ssize_t>(size))
186  throw Exception("Error during SyncRecv", errno);
187  rx_bytes_ += size;
188  }
189 
190  ssize_t RecvOne(void* out_data, size_t size) final {
191 #if __APPLE__
192  // MacOSX has no MSG_DONTWAIT
193  SetNonBlocking(true);
194 #endif
195  ssize_t rb = socket_.recv_one(out_data, size, MSG_DONTWAIT);
196  if (rb > 0) rx_bytes_ += rb;
197  return rb;
198  }
199 
200  void SyncSendRecv(const void* send_data, size_t send_size,
201  void* recv_data, size_t recv_size) final {
202  SyncSend(send_data, send_size, NoFlags);
203  SyncRecv(recv_data, recv_size);
204  }
205 
206  void SyncRecvSend(const void* send_data, size_t send_size,
207  void* recv_data, size_t recv_size) final {
208  SyncRecv(recv_data, recv_size);
209  SyncSend(send_data, send_size, NoFlags);
210  }
211 
212  //! Close this Connection
213  void Close() {
214  socket_.close();
215  }
216 
217  //! make ostreamable
218  std::ostream& OutputOstream(std::ostream& os) const final {
219  os << "[tcp::Connection"
220  << " fd=" << GetSocket().fd();
221 
222  if (IsValid())
223  os << " peer=" << GetPeerAddress();
224 
225  return os << "]";
226  }
227 
228 private:
229  //! Underlying socket or connection handle.
231 
232  //! The connection state of this connection in the Thrill network state
233  //! machine.
235 
236  //! The id of the group this connection is associated with.
237  size_t group_id_ = size_t(-1);
238 
239  //! The id of the worker this connection is connected to.
240  size_t peer_id_ = size_t(-1);
241 };
242 
243 // \}
244 
245 } // namespace tcp
246 } // namespace net
247 } // namespace thrill
248 
249 #endif // !THRILL_NET_TCP_CONNECTION_HEADER
250 
251 /******************************************************************************/
size_t peer_id_
The id of the worker this connection is connected to.
Definition: connection.hpp:240
ConnectionState state() const
Gets the state of this connection.
Definition: connection.hpp:99
Socket & GetSocket()
Return the raw socket object for more low-level network programming.
Definition: connection.hpp:130
void SyncRecvSend(const void *send_data, size_t send_size, void *recv_data, size_t recv_size) final
Definition: connection.hpp:206
void SyncSend(const void *data, size_t size, Flags flags) final
Definition: connection.hpp:162
size_t peer_id() const
Gets the id of the worker this connection is connected to.
Definition: connection.hpp:107
ssize_t SendOne(const void *data, size_t size, Flags flags) final
Definition: connection.hpp:171
bool close()
Close socket.
Definition: socket.hpp:241
std::string ToString() const final
return a string representation of this connection, for user output.
Definition: connection.hpp:126
std::atomic< size_t > tx_bytes_
sent bytes
Definition: connection.hpp:609
Connection(Connection &&other)
move-constructor
Definition: connection.hpp:75
void set_peer_id(size_t peerId)
Sets the id of the worker this connection is connected to.
Definition: connection.hpp:119
const Socket & GetSocket() const
Return the raw socket object for more low-level network programming.
Definition: connection.hpp:134
STL namespace.
void SyncSendRecv(const void *send_data, size_t send_size, void *recv_data, size_t recv_size) final
Definition: connection.hpp:200
Connection(Socket &&s)
Construct Connection from a Socket.
Definition: connection.hpp:63
void Close()
Close this Connection.
Definition: connection.hpp:213
#define MSG_MORE
Definition: connection.hpp:42
ssize_t recv_one(void *out_data, size_t max_size, int flags=0)
Recv (out_data,max_size) from socket (BSD socket API function wrapper)
Definition: socket.hpp:440
int GetError() const
Return the associated socket error.
Definition: connection.hpp:138
#define sLOG1
Definition: logger.hpp:38
ssize_t send_one(const void *data, size_t size, int flags=0)
Definition: socket.hpp:354
Connection is a rich point-to-point socket connection to another client (worker, master, or whatever).
Definition: connection.hpp:54
static by_string to_string(int val)
convert to string
size_t group_id_
The id of the group this connection is associated with.
Definition: connection.hpp:237
A Exception is thrown by Connection on all errors instead of returning error codes.
Definition: exception.hpp:30
void SyncRecv(void *out_data, size_t size) final
Definition: connection.hpp:183
std::string GetPeerAddress() const
Return the socket peer address.
Definition: connection.hpp:148
Socket is a light-weight wrapper around the BSD socket API.
Definition: socket.hpp:50
std::ostream & OutputOstream(std::ostream &os) const final
make ostreamable
Definition: connection.hpp:218
A Connection represents a link to another peer in a network group.
Definition: connection.hpp:49
std::basic_string< char, std::char_traits< char >, Allocator< char > > string
string with Manager tracking
Definition: allocator.hpp:220
static constexpr bool debug
Definition: connection.hpp:56
Connection()=default
default construction, contains invalid socket
ssize_t recv(void *out_data, size_t size, int flags=0)
Receive (data,size) from socket, retry recvs if short-reads occur.
Definition: socket.hpp:467
void set_group_id(size_t groupId)
Sets the group id of this connection.
Definition: connection.hpp:115
int GetError() const
Query socket for its current error state.
Definition: socket.hpp:157
static bool SetNonBlocking(int fd, bool non_blocking)
Turn socket into non-blocking state.
Definition: socket.hpp:165
bool IsValid() const final
Check whether the contained file descriptor is valid.
Definition: connection.hpp:123
Socket socket_
Underlying socket or connection handle.
Definition: connection.hpp:230
void SetNonBlocking(bool non_blocking)
Set socket to non-blocking.
Definition: connection.hpp:142
int fd() const
Return the associated file descriptor.
Definition: socket.hpp:154
std::string ToStringHostPort() const
Return the enclosed socket address as a string with the port number.
Connection(Socket &&s, size_t group_id, size_t peer_id)
Definition: connection.hpp:69
Flags
Additional flags for sending or receiving.
Definition: connection.hpp:61
ssize_t RecvOne(void *out_data, size_t size) final
Definition: connection.hpp:190
SocketAddress GetPeerAddress() const
Return the current peer socket address.
Definition: socket.hpp:217
ssize_t send(const void *data, size_t size, int flags=0)
Send (data,size) to socket, retry sends if short-sends occur.
Definition: socket.hpp:375
Connection & operator=(Connection &&other)
move assignment-operator
Definition: connection.hpp:84
size_t group_id() const
Gets the id of the net group this connection is associated with.
Definition: connection.hpp:103
bool operator==(const Connection &c) const noexcept
Checks wether two connections have the same underlying socket or not.
Definition: connection.hpp:152
bool IsValid() const
Check whether the contained file descriptor is valid.
Definition: socket.hpp:150
std::atomic< size_t > rx_bytes_
received bytes
Definition: connection.hpp:612
void set_state(ConnectionState state)
Sets the state of this connection.
Definition: connection.hpp:111