blob: 0c3f77348ba51afc4b728ab9e5725868d92384e0 [file] [log] [blame]
#include <config.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <netdb.h>
#include <unistd.h>
#include <errno.h>
#include <fcntl.h>
#include <stdio.h>
#include <string.h>
#include <sys/select.h>

#include "concurrency/Monitor.h"
#include "TSocket.h"
#include "TTransportException.h"
Mark Sleee8540632006-05-30 09:24:40 +000015
Marc Slemko6f038a72006-08-03 18:58:09 +000016namespace facebook { namespace thrift { namespace transport {
17
Mark Sleee8540632006-05-30 09:24:40 +000018using namespace std;
Mark Slee29050782006-09-29 00:12:30 +000019using namespace facebook::thrift::concurrency;
Mark Sleee8540632006-05-30 09:24:40 +000020
// Global counter of socket system calls. In this file it is incremented once
// per recv() in TSocket::read() and once per send() in TSocket::write().
// NOTE(review): it is a plain uint32_t bumped without any visible locking, so
// the value is presumably only approximate when several threads do I/O.
uint32_t g_socket_syscalls = 0;

/**
 * TSocket implementation.
 *
 * @author Mark Slee <mcslee@facebook.com>
 */

// Monitor used as a lock around the netdb lookup: TSocket::open() holds it
// (via Synchronized) for the duration of its gethostbyname() call and while
// it copies the address out of the returned host entry.
static Monitor s_netdb_monitor;

// Upper bound on how many times TSocket::read() retries a recv() that failed
// with EAGAIN or EINTR before reporting the error.
// TODO(mcslee): Make this an option to the socket class
#define MAX_RECV_RETRIES 20
Mark Slee29050782006-09-29 00:12:30 +000035
36TSocket::TSocket(string host, int port) :
37 host_(host),
38 port_(port),
39 socket_(0),
40 connTimeout_(0),
41 sendTimeout_(0),
42 recvTimeout_(0),
43 lingerOn_(1),
44 lingerVal_(0),
45 noDelay_(1) {
Mark Sleeb9ff32a2006-11-16 01:00:24 +000046 recvTimeval_.tv_sec = (int)(recvTimeout_/1000);
47 recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000);
Mark Sleee8540632006-05-30 09:24:40 +000048}
49
Aditya Agarwalebc99e02007-01-15 23:14:58 +000050TSocket::TSocket() :
51 host_(""),
52 port_(0),
53 socket_(0),
54 connTimeout_(0),
55 sendTimeout_(0),
56 recvTimeout_(0),
57 lingerOn_(1),
58 lingerVal_(0),
59 noDelay_(1) {
60 recvTimeval_.tv_sec = (int)(recvTimeout_/1000);
61 recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000);
62}
63
Mark Slee29050782006-09-29 00:12:30 +000064TSocket::TSocket(int socket) :
65 host_(""),
66 port_(0),
67 socket_(socket),
68 connTimeout_(0),
69 sendTimeout_(0),
70 recvTimeout_(0),
71 lingerOn_(1),
72 lingerVal_(0),
73 noDelay_(1) {
Mark Sleeb9ff32a2006-11-16 01:00:24 +000074 recvTimeval_.tv_sec = (int)(recvTimeout_/1000);
75 recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000);
Mark Slee29050782006-09-29 00:12:30 +000076}
77
Mark Sleee8540632006-05-30 09:24:40 +000078TSocket::~TSocket() {
79 close();
80}
81
Mark Slee8d7e1f62006-06-07 06:48:56 +000082bool TSocket::isOpen() {
83 return (socket_ > 0);
84}
85
Mark Sleeb9ff32a2006-11-16 01:00:24 +000086bool TSocket::peek() {
87 if (!isOpen()) {
88 return false;
89 }
90 uint8_t buf;
91 int r = recv(socket_, &buf, 1, MSG_PEEK);
92 if (r == -1) {
93 perror("TSocket::peek()");
94 close();
95 throw TTransportException(TTX_UNKNOWN, "recv() ERROR:" + errno);
96 }
97 return (r > 0);
98}
99
Mark Slee8d7e1f62006-06-07 06:48:56 +0000100void TSocket::open() {
Mark Sleee8540632006-05-30 09:24:40 +0000101 // Create socket
102 socket_ = socket(AF_INET, SOCK_STREAM, 0);
103 if (socket_ == -1) {
Mark Slee8d7e1f62006-06-07 06:48:56 +0000104 perror("TSocket::open() socket");
105 close();
106 throw TTransportException(TTX_NOT_OPEN, "socket() ERROR:" + errno);
Mark Sleee8540632006-05-30 09:24:40 +0000107 }
Mark Slee29050782006-09-29 00:12:30 +0000108
109 // Send timeout
110 if (sendTimeout_ > 0) {
111 setSendTimeout(sendTimeout_);
112 }
113
114 // Recv timeout
115 if (recvTimeout_ > 0) {
116 setRecvTimeout(recvTimeout_);
117 }
118
119 // Linger
120 setLinger(lingerOn_, lingerVal_);
121
122 // No delay
123 setNoDelay(noDelay_);
124
Mark Slee8d7e1f62006-06-07 06:48:56 +0000125 // Lookup the hostname
Mark Sleee8540632006-05-30 09:24:40 +0000126 struct sockaddr_in addr;
127 addr.sin_family = AF_INET;
128 addr.sin_port = htons(port_);
129
Mark Sleee8540632006-05-30 09:24:40 +0000130 {
Mark Slee29050782006-09-29 00:12:30 +0000131 // Scope lock on host entry lookup
132 Synchronized s(s_netdb_monitor);
Mark Sleee8540632006-05-30 09:24:40 +0000133 struct hostent *host_entry = gethostbyname(host_.c_str());
134
135 if (host_entry == NULL) {
Mark Slee29050782006-09-29 00:12:30 +0000136 perror("TSocket: dns error: failed call to gethostbyname.");
Mark Sleee8540632006-05-30 09:24:40 +0000137 close();
Mark Slee8d7e1f62006-06-07 06:48:56 +0000138 throw TTransportException(TTX_NOT_OPEN, "gethostbyname() failed");
Mark Sleee8540632006-05-30 09:24:40 +0000139 }
140
141 addr.sin_port = htons(port_);
142 memcpy(&addr.sin_addr.s_addr,
143 host_entry->h_addr_list[0],
144 host_entry->h_length);
145 }
Mark Slee29050782006-09-29 00:12:30 +0000146
147 // Set the socket to be non blocking for connect if a timeout exists
148 int flags = fcntl(socket_, F_GETFL, 0);
149 if (connTimeout_ > 0) {
150 fcntl(socket_, F_SETFL, flags | O_NONBLOCK);
151 } else {
152 fcntl(socket_, F_SETFL, flags | ~O_NONBLOCK);
153 }
154
155 // Conn timeout
156 struct timeval c = {(int)(connTimeout_/1000),
157 (int)((connTimeout_%1000)*1000)};
Mark Sleee8540632006-05-30 09:24:40 +0000158
159 // Connect the socket
160 int ret = connect(socket_, (struct sockaddr *)&addr, sizeof(addr));
161
Mark Slee29050782006-09-29 00:12:30 +0000162 if (ret == 0) {
163 goto done;
164 }
165
166 if (errno != EINPROGRESS) {
Mark Sleee8540632006-05-30 09:24:40 +0000167 close();
Mark Slee29050782006-09-29 00:12:30 +0000168 char buff[1024];
169 sprintf(buff, "TSocket::open() connect %s %d", host_.c_str(), port_);
170 perror(buff);
Mark Slee8d7e1f62006-06-07 06:48:56 +0000171 throw TTransportException(TTX_NOT_OPEN, "open() ERROR: " + errno);
Mark Sleee8540632006-05-30 09:24:40 +0000172 }
173
Mark Slee29050782006-09-29 00:12:30 +0000174 fd_set fds;
175 FD_ZERO(&fds);
176 FD_SET(socket_, &fds);
177 ret = select(socket_+1, NULL, &fds, NULL, &c);
178
179 if (ret > 0) {
180 // Ensure connected
181 int val;
182 socklen_t lon;
183 lon = sizeof(int);
184 int ret2 = getsockopt(socket_, SOL_SOCKET, SO_ERROR, (void *)&val, &lon);
185 if (ret2 == -1) {
186 close();
187 perror("TSocket::open() getsockopt SO_ERROR");
188 throw TTransportException(TTX_NOT_OPEN, "open() ERROR: " + errno);
189 }
190 if (val == 0) {
191 goto done;
192 }
193 close();
194 perror("TSocket::open() SO_ERROR was set");
195 throw TTransportException(TTX_NOT_OPEN, "open() ERROR: " + errno);
196 } else if (ret == 0) {
197 close();
198 perror("TSocket::open() timeed out");
199 throw TTransportException(TTX_NOT_OPEN, "open() ERROR: " + errno);
200 } else {
201 close();
202 perror("TSocket::open() select error");
203 throw TTransportException(TTX_NOT_OPEN, "open() ERROR: " + errno);
204 }
205
206 done:
207 // Set socket back to normal mode (blocking)
208 fcntl(socket_, F_SETFL, flags);
Mark Sleee8540632006-05-30 09:24:40 +0000209}
210
211void TSocket::close() {
212 if (socket_ > 0) {
213 shutdown(socket_, SHUT_RDWR);
214 ::close(socket_);
215 }
216 socket_ = 0;
217}
218
Mark Slee8d7e1f62006-06-07 06:48:56 +0000219uint32_t TSocket::read(uint8_t* buf, uint32_t len) {
220 if (socket_ <= 0) {
221 throw TTransportException(TTX_NOT_OPEN, "Called read on non-open socket");
222 }
Mark Sleee8540632006-05-30 09:24:40 +0000223
Mark Sleee8540632006-05-30 09:24:40 +0000224 uint32_t retries = 0;
Mark Slee8d7e1f62006-06-07 06:48:56 +0000225
226 try_again:
227 // Read from the socket
228 int got = recv(socket_, buf, len, 0);
229 ++g_socket_syscalls;
230
231 // Check for error on read
Mark Sleec4257802007-01-24 23:14:30 +0000232 if (got < 0) {
Mark Slee8d7e1f62006-06-07 06:48:56 +0000233 // If temporarily out of resources, sleep a bit and try again
234 if (errno == EAGAIN && retries++ < MAX_RECV_RETRIES) {
235 usleep(50);
236 goto try_again;
Mark Sleee8540632006-05-30 09:24:40 +0000237 }
238
Mark Slee8d7e1f62006-06-07 06:48:56 +0000239 // If interrupted, try again
240 if (errno == EINTR && retries++ < MAX_RECV_RETRIES) {
241 goto try_again;
Mark Sleee8540632006-05-30 09:24:40 +0000242 }
243
Mark Sleec4257802007-01-24 23:14:30 +0000244 // Now it's not a try again case, but a real probblez
245 perror("TSocket::read()");
246
Mark Slee8d7e1f62006-06-07 06:48:56 +0000247 // If we disconnect with no linger time
248 if (errno == ECONNRESET) {
249 throw TTransportException(TTX_NOT_OPEN, "ECONNRESET");
250 }
251
252 // This ish isn't open
253 if (errno == ENOTCONN) {
254 throw TTransportException(TTX_NOT_OPEN, "ENOTCONN");
255 }
256
257 // Timed out!
258 if (errno == ETIMEDOUT) {
259 throw TTransportException(TTX_TIMED_OUT, "ETIMEDOUT");
260 }
261
262 // Some other error, whatevz
263 throw TTransportException(TTX_UNKNOWN, "ERROR:" + errno);
264 }
265
266 // The remote host has closed the socket
267 if (got == 0) {
268 close();
269 return 0;
Mark Sleee8540632006-05-30 09:24:40 +0000270 }
271
272 // Pack data into string
Mark Slee8d7e1f62006-06-07 06:48:56 +0000273 return got;
Mark Sleee8540632006-05-30 09:24:40 +0000274}
275
Mark Slee8d7e1f62006-06-07 06:48:56 +0000276void TSocket::write(const uint8_t* buf, uint32_t len) {
277 if (socket_ <= 0) {
278 throw TTransportException(TTX_NOT_OPEN, "Called write on non-open socket");
279 }
280
Mark Sleee8540632006-05-30 09:24:40 +0000281 uint32_t sent = 0;
282
Mark Slee8d7e1f62006-06-07 06:48:56 +0000283 while (sent < len) {
Marc Slemko9d4a3e22006-07-21 19:53:48 +0000284
285 int flags = 0;
Mark Slee29050782006-09-29 00:12:30 +0000286 #ifdef MSG_NOSIGNAL
Mark Slee8d7e1f62006-06-07 06:48:56 +0000287 // Note the use of MSG_NOSIGNAL to suppress SIGPIPE errors, instead we
288 // check for the EPIPE return condition and close the socket in that case
Marc Slemko9d4a3e22006-07-21 19:53:48 +0000289 flags |= MSG_NOSIGNAL;
Mark Slee29050782006-09-29 00:12:30 +0000290 #endif // ifdef MSG_NOSIGNAL
Marc Slemko9d4a3e22006-07-21 19:53:48 +0000291
292 int b = send(socket_, buf + sent, len - sent, flags);
Mark Slee8d7e1f62006-06-07 06:48:56 +0000293 ++g_socket_syscalls;
294
Mark Sleee8540632006-05-30 09:24:40 +0000295 // Fail on a send error
296 if (b < 0) {
Mark Slee8d7e1f62006-06-07 06:48:56 +0000297 if (errno == EPIPE) {
298 close();
299 throw TTransportException(TTX_NOT_OPEN, "EPIPE");
300 }
301
302 if (errno == ECONNRESET) {
303 close();
304 throw TTransportException(TTX_NOT_OPEN, "ECONNRESET");
305 }
306
307 if (errno == ENOTCONN) {
308 close();
309 throw TTransportException(TTX_NOT_OPEN, "ENOTCONN");
310 }
311
312 perror("TSocket::write() send < 0");
313 throw TTransportException(TTX_UNKNOWN, "ERROR:" + errno);
Mark Sleee8540632006-05-30 09:24:40 +0000314 }
315
316 // Fail on blocked send
317 if (b == 0) {
Mark Slee8d7e1f62006-06-07 06:48:56 +0000318 throw TTransportException(TTX_NOT_OPEN, "Socket send returned 0.");
Mark Sleee8540632006-05-30 09:24:40 +0000319 }
Mark Sleee8540632006-05-30 09:24:40 +0000320 sent += b;
321 }
322}
323
Aditya Agarwalebc99e02007-01-15 23:14:58 +0000324void TSocket::setHost(string host) {
325 host_ = host;
326}
327
328void TSocket::setPort(int port) {
329 port_ = port;
330}
331
Mark Slee8d7e1f62006-06-07 06:48:56 +0000332void TSocket::setLinger(bool on, int linger) {
Mark Slee29050782006-09-29 00:12:30 +0000333 lingerOn_ = on;
334 lingerVal_ = linger;
Mark Slee8d7e1f62006-06-07 06:48:56 +0000335 if (socket_ <= 0) {
336 return;
337 }
338
Mark Slee29050782006-09-29 00:12:30 +0000339 struct linger l = {(lingerOn_ ? 1 : 0), lingerVal_};
340 int ret = setsockopt(socket_, SOL_SOCKET, SO_LINGER, &l, sizeof(l));
341 if (ret == -1) {
Mark Sleee8540632006-05-30 09:24:40 +0000342 perror("TSocket::setLinger()");
Mark Sleee8540632006-05-30 09:24:40 +0000343 }
Mark Sleee8540632006-05-30 09:24:40 +0000344}
345
Mark Slee8d7e1f62006-06-07 06:48:56 +0000346void TSocket::setNoDelay(bool noDelay) {
Mark Slee29050782006-09-29 00:12:30 +0000347 noDelay_ = noDelay;
Mark Slee8d7e1f62006-06-07 06:48:56 +0000348 if (socket_ <= 0) {
349 return;
350 }
351
Mark Sleee8540632006-05-30 09:24:40 +0000352 // Set socket to NODELAY
Mark Slee29050782006-09-29 00:12:30 +0000353 int v = noDelay_ ? 1 : 0;
354 int ret = setsockopt(socket_, IPPROTO_TCP, TCP_NODELAY, &v, sizeof(v));
355 if (ret == -1) {
Mark Sleee8540632006-05-30 09:24:40 +0000356 perror("TSocket::setNoDelay()");
Mark Sleee8540632006-05-30 09:24:40 +0000357 }
Mark Sleee8540632006-05-30 09:24:40 +0000358}
Mark Slee29050782006-09-29 00:12:30 +0000359
360void TSocket::setConnTimeout(int ms) {
361 connTimeout_ = ms;
362}
363
364void TSocket::setRecvTimeout(int ms) {
365 recvTimeout_ = ms;
Mark Sleeb9ff32a2006-11-16 01:00:24 +0000366 recvTimeval_.tv_sec = (int)(recvTimeout_/1000);
367 recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000);
Mark Slee29050782006-09-29 00:12:30 +0000368 if (socket_ <= 0) {
369 return;
370 }
371
Mark Sleeb9ff32a2006-11-16 01:00:24 +0000372 // Copy because select may modify
373 struct timeval r = recvTimeval_;
Mark Slee29050782006-09-29 00:12:30 +0000374 int ret = setsockopt(socket_, SOL_SOCKET, SO_RCVTIMEO, &r, sizeof(r));
375 if (ret == -1) {
376 perror("TSocket::setRecvTimeout()");
377 }
378}
379
380void TSocket::setSendTimeout(int ms) {
381 sendTimeout_ = ms;
382 if (socket_ <= 0) {
383 return;
384 }
385
386 struct timeval s = {(int)(sendTimeout_/1000),
387 (int)((sendTimeout_%1000)*1000)};
388 int ret = setsockopt(socket_, SOL_SOCKET, SO_SNDTIMEO, &s, sizeof(s));
389 if (ret == -1) {
390 perror("TSocket::setSendTimeout()");
391 }
392}
393
Marc Slemko6f038a72006-08-03 18:58:09 +0000394}}} // facebook::thrift::transport