blob: de3bea7be37666ad53cb5721dac2564beeec356d [file] [log] [blame]
Marc Slemkoe03da182006-07-21 21:32:36 +00001#include <config.h>
Mark Sleee8540632006-05-30 09:24:40 +00002#include <sys/socket.h>
3#include <arpa/inet.h>
4#include <netinet/in.h>
5#include <netinet/tcp.h>
6#include <netdb.h>
7#include <unistd.h>
8#include <errno.h>
Mark Slee29050782006-09-29 00:12:30 +00009#include <fcntl.h>
10#include <sys/select.h>
Mark Sleee8540632006-05-30 09:24:40 +000011
Mark Slee29050782006-09-29 00:12:30 +000012#include "concurrency/Monitor.h"
Marc Slemkod42a2c22006-08-10 03:30:18 +000013#include "TSocket.h"
14#include "TTransportException.h"
Mark Sleee8540632006-05-30 09:24:40 +000015
Marc Slemko6f038a72006-08-03 18:58:09 +000016namespace facebook { namespace thrift { namespace transport {
17
Mark Sleee8540632006-05-30 09:24:40 +000018using namespace std;
Mark Slee29050782006-09-29 00:12:30 +000019using namespace facebook::thrift::concurrency;
Mark Sleee8540632006-05-30 09:24:40 +000020
Mark Slee29050782006-09-29 00:12:30 +000021// Global var to track total socket sys calls
Mark Slee8d7e1f62006-06-07 06:48:56 +000022uint32_t g_socket_syscalls = 0;
23
24/**
25 * TSocket implementation.
26 *
27 * @author Mark Slee <mcslee@facebook.com>
28 */
29
Mark Sleee8540632006-05-30 09:24:40 +000030// Mutex to protect syscalls to netdb
Mark Slee29050782006-09-29 00:12:30 +000031static Monitor s_netdb_monitor;
Mark Sleee8540632006-05-30 09:24:40 +000032
33// TODO(mcslee): Make this an option to the socket class
34#define MAX_RECV_RETRIES 20
Mark Slee29050782006-09-29 00:12:30 +000035
36TSocket::TSocket(string host, int port) :
37 host_(host),
38 port_(port),
39 socket_(0),
40 connTimeout_(0),
41 sendTimeout_(0),
42 recvTimeout_(0),
43 lingerOn_(1),
44 lingerVal_(0),
45 noDelay_(1) {
Mark Sleee8540632006-05-30 09:24:40 +000046}
47
Mark Slee29050782006-09-29 00:12:30 +000048TSocket::TSocket(int socket) :
49 host_(""),
50 port_(0),
51 socket_(socket),
52 connTimeout_(0),
53 sendTimeout_(0),
54 recvTimeout_(0),
55 lingerOn_(1),
56 lingerVal_(0),
57 noDelay_(1) {
58}
59
Mark Sleee8540632006-05-30 09:24:40 +000060TSocket::~TSocket() {
61 close();
62}
63
Mark Slee8d7e1f62006-06-07 06:48:56 +000064bool TSocket::isOpen() {
65 return (socket_ > 0);
66}
67
68void TSocket::open() {
Mark Sleee8540632006-05-30 09:24:40 +000069 // Create socket
70 socket_ = socket(AF_INET, SOCK_STREAM, 0);
71 if (socket_ == -1) {
Mark Slee8d7e1f62006-06-07 06:48:56 +000072 perror("TSocket::open() socket");
73 close();
74 throw TTransportException(TTX_NOT_OPEN, "socket() ERROR:" + errno);
Mark Sleee8540632006-05-30 09:24:40 +000075 }
Mark Slee29050782006-09-29 00:12:30 +000076
77 // Send timeout
78 if (sendTimeout_ > 0) {
79 setSendTimeout(sendTimeout_);
80 }
81
82 // Recv timeout
83 if (recvTimeout_ > 0) {
84 setRecvTimeout(recvTimeout_);
85 }
86
87 // Linger
88 setLinger(lingerOn_, lingerVal_);
89
90 // No delay
91 setNoDelay(noDelay_);
92
Mark Slee8d7e1f62006-06-07 06:48:56 +000093 // Lookup the hostname
Mark Sleee8540632006-05-30 09:24:40 +000094 struct sockaddr_in addr;
95 addr.sin_family = AF_INET;
96 addr.sin_port = htons(port_);
97
Mark Sleee8540632006-05-30 09:24:40 +000098 {
Mark Slee29050782006-09-29 00:12:30 +000099 // Scope lock on host entry lookup
100 Synchronized s(s_netdb_monitor);
Mark Sleee8540632006-05-30 09:24:40 +0000101 struct hostent *host_entry = gethostbyname(host_.c_str());
102
103 if (host_entry == NULL) {
Mark Slee29050782006-09-29 00:12:30 +0000104 perror("TSocket: dns error: failed call to gethostbyname.");
Mark Sleee8540632006-05-30 09:24:40 +0000105 close();
Mark Slee8d7e1f62006-06-07 06:48:56 +0000106 throw TTransportException(TTX_NOT_OPEN, "gethostbyname() failed");
Mark Sleee8540632006-05-30 09:24:40 +0000107 }
108
109 addr.sin_port = htons(port_);
110 memcpy(&addr.sin_addr.s_addr,
111 host_entry->h_addr_list[0],
112 host_entry->h_length);
113 }
Mark Slee29050782006-09-29 00:12:30 +0000114
115 // Set the socket to be non blocking for connect if a timeout exists
116 int flags = fcntl(socket_, F_GETFL, 0);
117 if (connTimeout_ > 0) {
118 fcntl(socket_, F_SETFL, flags | O_NONBLOCK);
119 } else {
120 fcntl(socket_, F_SETFL, flags | ~O_NONBLOCK);
121 }
122
123 // Conn timeout
124 struct timeval c = {(int)(connTimeout_/1000),
125 (int)((connTimeout_%1000)*1000)};
Mark Sleee8540632006-05-30 09:24:40 +0000126
127 // Connect the socket
128 int ret = connect(socket_, (struct sockaddr *)&addr, sizeof(addr));
129
Mark Slee29050782006-09-29 00:12:30 +0000130 if (ret == 0) {
131 goto done;
132 }
133
134 if (errno != EINPROGRESS) {
Mark Sleee8540632006-05-30 09:24:40 +0000135 close();
Mark Slee29050782006-09-29 00:12:30 +0000136 char buff[1024];
137 sprintf(buff, "TSocket::open() connect %s %d", host_.c_str(), port_);
138 perror(buff);
Mark Slee8d7e1f62006-06-07 06:48:56 +0000139 throw TTransportException(TTX_NOT_OPEN, "open() ERROR: " + errno);
Mark Sleee8540632006-05-30 09:24:40 +0000140 }
141
Mark Slee29050782006-09-29 00:12:30 +0000142 fd_set fds;
143 FD_ZERO(&fds);
144 FD_SET(socket_, &fds);
145 ret = select(socket_+1, NULL, &fds, NULL, &c);
146
147 if (ret > 0) {
148 // Ensure connected
149 int val;
150 socklen_t lon;
151 lon = sizeof(int);
152 int ret2 = getsockopt(socket_, SOL_SOCKET, SO_ERROR, (void *)&val, &lon);
153 if (ret2 == -1) {
154 close();
155 perror("TSocket::open() getsockopt SO_ERROR");
156 throw TTransportException(TTX_NOT_OPEN, "open() ERROR: " + errno);
157 }
158 if (val == 0) {
159 goto done;
160 }
161 close();
162 perror("TSocket::open() SO_ERROR was set");
163 throw TTransportException(TTX_NOT_OPEN, "open() ERROR: " + errno);
164 } else if (ret == 0) {
165 close();
166 perror("TSocket::open() timeed out");
167 throw TTransportException(TTX_NOT_OPEN, "open() ERROR: " + errno);
168 } else {
169 close();
170 perror("TSocket::open() select error");
171 throw TTransportException(TTX_NOT_OPEN, "open() ERROR: " + errno);
172 }
173
174 done:
175 // Set socket back to normal mode (blocking)
176 fcntl(socket_, F_SETFL, flags);
Mark Sleee8540632006-05-30 09:24:40 +0000177}
178
179void TSocket::close() {
180 if (socket_ > 0) {
181 shutdown(socket_, SHUT_RDWR);
182 ::close(socket_);
183 }
184 socket_ = 0;
185}
186
Mark Slee8d7e1f62006-06-07 06:48:56 +0000187uint32_t TSocket::read(uint8_t* buf, uint32_t len) {
188 if (socket_ <= 0) {
189 throw TTransportException(TTX_NOT_OPEN, "Called read on non-open socket");
190 }
Mark Sleee8540632006-05-30 09:24:40 +0000191
Mark Sleee8540632006-05-30 09:24:40 +0000192 uint32_t retries = 0;
Mark Slee8d7e1f62006-06-07 06:48:56 +0000193
194 try_again:
195 // Read from the socket
196 int got = recv(socket_, buf, len, 0);
197 ++g_socket_syscalls;
198
199 // Check for error on read
200 if (got < 0) {
201 perror("TSocket::read()");
202
203 // If temporarily out of resources, sleep a bit and try again
204 if (errno == EAGAIN && retries++ < MAX_RECV_RETRIES) {
205 usleep(50);
206 goto try_again;
Mark Sleee8540632006-05-30 09:24:40 +0000207 }
208
Mark Slee8d7e1f62006-06-07 06:48:56 +0000209 // If interrupted, try again
210 if (errno == EINTR && retries++ < MAX_RECV_RETRIES) {
211 goto try_again;
Mark Sleee8540632006-05-30 09:24:40 +0000212 }
213
Mark Slee8d7e1f62006-06-07 06:48:56 +0000214 // If we disconnect with no linger time
215 if (errno == ECONNRESET) {
216 throw TTransportException(TTX_NOT_OPEN, "ECONNRESET");
217 }
218
219 // This ish isn't open
220 if (errno == ENOTCONN) {
221 throw TTransportException(TTX_NOT_OPEN, "ENOTCONN");
222 }
223
224 // Timed out!
225 if (errno == ETIMEDOUT) {
226 throw TTransportException(TTX_TIMED_OUT, "ETIMEDOUT");
227 }
228
229 // Some other error, whatevz
230 throw TTransportException(TTX_UNKNOWN, "ERROR:" + errno);
231 }
232
233 // The remote host has closed the socket
234 if (got == 0) {
235 close();
236 return 0;
Mark Sleee8540632006-05-30 09:24:40 +0000237 }
238
239 // Pack data into string
Mark Slee8d7e1f62006-06-07 06:48:56 +0000240 return got;
Mark Sleee8540632006-05-30 09:24:40 +0000241}
242
Mark Slee8d7e1f62006-06-07 06:48:56 +0000243void TSocket::write(const uint8_t* buf, uint32_t len) {
244 if (socket_ <= 0) {
245 throw TTransportException(TTX_NOT_OPEN, "Called write on non-open socket");
246 }
247
Mark Sleee8540632006-05-30 09:24:40 +0000248 uint32_t sent = 0;
249
Mark Slee8d7e1f62006-06-07 06:48:56 +0000250 while (sent < len) {
Marc Slemko9d4a3e22006-07-21 19:53:48 +0000251
252 int flags = 0;
Mark Slee29050782006-09-29 00:12:30 +0000253 #ifdef MSG_NOSIGNAL
Mark Slee8d7e1f62006-06-07 06:48:56 +0000254 // Note the use of MSG_NOSIGNAL to suppress SIGPIPE errors, instead we
255 // check for the EPIPE return condition and close the socket in that case
Marc Slemko9d4a3e22006-07-21 19:53:48 +0000256 flags |= MSG_NOSIGNAL;
Mark Slee29050782006-09-29 00:12:30 +0000257 #endif // ifdef MSG_NOSIGNAL
Marc Slemko9d4a3e22006-07-21 19:53:48 +0000258
259 int b = send(socket_, buf + sent, len - sent, flags);
Mark Slee8d7e1f62006-06-07 06:48:56 +0000260 ++g_socket_syscalls;
261
Mark Sleee8540632006-05-30 09:24:40 +0000262 // Fail on a send error
263 if (b < 0) {
Mark Slee8d7e1f62006-06-07 06:48:56 +0000264 if (errno == EPIPE) {
265 close();
266 throw TTransportException(TTX_NOT_OPEN, "EPIPE");
267 }
268
269 if (errno == ECONNRESET) {
270 close();
271 throw TTransportException(TTX_NOT_OPEN, "ECONNRESET");
272 }
273
274 if (errno == ENOTCONN) {
275 close();
276 throw TTransportException(TTX_NOT_OPEN, "ENOTCONN");
277 }
278
279 perror("TSocket::write() send < 0");
280 throw TTransportException(TTX_UNKNOWN, "ERROR:" + errno);
Mark Sleee8540632006-05-30 09:24:40 +0000281 }
282
283 // Fail on blocked send
284 if (b == 0) {
Mark Slee8d7e1f62006-06-07 06:48:56 +0000285 throw TTransportException(TTX_NOT_OPEN, "Socket send returned 0.");
Mark Sleee8540632006-05-30 09:24:40 +0000286 }
Mark Sleee8540632006-05-30 09:24:40 +0000287 sent += b;
288 }
289}
290
Mark Slee8d7e1f62006-06-07 06:48:56 +0000291void TSocket::setLinger(bool on, int linger) {
Mark Slee29050782006-09-29 00:12:30 +0000292 lingerOn_ = on;
293 lingerVal_ = linger;
Mark Slee8d7e1f62006-06-07 06:48:56 +0000294 if (socket_ <= 0) {
295 return;
296 }
297
Mark Slee29050782006-09-29 00:12:30 +0000298 struct linger l = {(lingerOn_ ? 1 : 0), lingerVal_};
299 int ret = setsockopt(socket_, SOL_SOCKET, SO_LINGER, &l, sizeof(l));
300 if (ret == -1) {
Mark Sleee8540632006-05-30 09:24:40 +0000301 perror("TSocket::setLinger()");
Mark Sleee8540632006-05-30 09:24:40 +0000302 }
Mark Sleee8540632006-05-30 09:24:40 +0000303}
304
Mark Slee8d7e1f62006-06-07 06:48:56 +0000305void TSocket::setNoDelay(bool noDelay) {
Mark Slee29050782006-09-29 00:12:30 +0000306 noDelay_ = noDelay;
Mark Slee8d7e1f62006-06-07 06:48:56 +0000307 if (socket_ <= 0) {
308 return;
309 }
310
Mark Sleee8540632006-05-30 09:24:40 +0000311 // Set socket to NODELAY
Mark Slee29050782006-09-29 00:12:30 +0000312 int v = noDelay_ ? 1 : 0;
313 int ret = setsockopt(socket_, IPPROTO_TCP, TCP_NODELAY, &v, sizeof(v));
314 if (ret == -1) {
Mark Sleee8540632006-05-30 09:24:40 +0000315 perror("TSocket::setNoDelay()");
Mark Sleee8540632006-05-30 09:24:40 +0000316 }
Mark Sleee8540632006-05-30 09:24:40 +0000317}
Mark Slee29050782006-09-29 00:12:30 +0000318
319void TSocket::setConnTimeout(int ms) {
320 connTimeout_ = ms;
321}
322
323void TSocket::setRecvTimeout(int ms) {
324 recvTimeout_ = ms;
325 if (socket_ <= 0) {
326 return;
327 }
328
329 struct timeval r = {(int)(recvTimeout_/1000),
330 (int)((recvTimeout_%1000)*1000)};
331 int ret = setsockopt(socket_, SOL_SOCKET, SO_RCVTIMEO, &r, sizeof(r));
332 if (ret == -1) {
333 perror("TSocket::setRecvTimeout()");
334 }
335}
336
337void TSocket::setSendTimeout(int ms) {
338 sendTimeout_ = ms;
339 if (socket_ <= 0) {
340 return;
341 }
342
343 struct timeval s = {(int)(sendTimeout_/1000),
344 (int)((sendTimeout_%1000)*1000)};
345 int ret = setsockopt(socket_, SOL_SOCKET, SO_SNDTIMEO, &s, sizeof(s));
346 if (ret == -1) {
347 perror("TSocket::setSendTimeout()");
348 }
349}
350
Marc Slemko6f038a72006-08-03 18:58:09 +0000351}}} // facebook::thrift::transport