Better socket timeout and options support for Thrift C++
Summary: Also compile without degugging symbols for the linked library
Reviewed By: aditya
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664810 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/configure.ac b/lib/cpp/configure.ac
index d571ba0..2d30c59 100644
--- a/lib/cpp/configure.ac
+++ b/lib/cpp/configure.ac
@@ -80,4 +80,8 @@
AC_PROG_LIBTOOL
+CFLAGS="-O2"
+
+CXXFLAGS="-O2"
+
AC_OUTPUT(Makefile)
diff --git a/lib/cpp/src/concurrency/PosixThreadFactory.cc b/lib/cpp/src/concurrency/PosixThreadFactory.cc
index 5df87ec..130976c 100644
--- a/lib/cpp/src/concurrency/PosixThreadFactory.cc
+++ b/lib/cpp/src/concurrency/PosixThreadFactory.cc
@@ -161,7 +161,7 @@
int quanta = (HIGHEST - LOWEST) + 1;
float stepsperquanta = (max_priority - min_priority) / quanta;
- if(priority <= HIGHEST) {
+ if (priority <= HIGHEST) {
return (int)(min_priority + stepsperquanta * priority);
} else {
// should never get here for priority increments.
diff --git a/lib/cpp/src/transport/TServerSocket.cc b/lib/cpp/src/transport/TServerSocket.cc
index 75bd504..8a14ea5 100644
--- a/lib/cpp/src/transport/TServerSocket.cc
+++ b/lib/cpp/src/transport/TServerSocket.cc
@@ -1,5 +1,6 @@
#include <sys/socket.h>
#include <netinet/in.h>
+#include <netinet/tcp.h>
#include <errno.h>
#include "TSocket.h"
@@ -11,12 +12,31 @@
using namespace boost;
TServerSocket::TServerSocket(int port) :
- port_(port), serverSocket_(0), acceptBacklog_(1024) {}
+ port_(port),
+ serverSocket_(0),
+ acceptBacklog_(1024),
+ sendTimeout_(0),
+ recvTimeout_(0) {}
+
+TServerSocket::TServerSocket(int port, int sendTimeout, int recvTimeout) :
+ port_(port),
+ serverSocket_(0),
+ acceptBacklog_(1024),
+ sendTimeout_(sendTimeout),
+ recvTimeout_(recvTimeout) {}
TServerSocket::~TServerSocket() {
close();
}
+void TServerSocket::setSendTimeout(int sendTimeout) {
+ sendTimeout_ = sendTimeout;
+}
+
+void TServerSocket::setRecvTimeout(int recvTimeout) {
+ recvTimeout_ = recvTimeout;
+}
+
void TServerSocket::listen() {
serverSocket_ = socket(AF_INET, SOCK_STREAM, 0);
if (serverSocket_ == -1) {
@@ -34,6 +54,16 @@
throw TTransportException(TTX_NOT_OPEN, "Could not set SO_REUSEADDR");
}
+ // Defer accept
+ #ifdef TCP_DEFER_ACCEPT
+ if (-1 == setsockopt(serverSocket_, SOL_SOCKET, TCP_DEFER_ACCEPT,
+ &one, sizeof(one))) {
+ perror("TServerSocket::listen() TCP_DEFER_ACCEPT");
+ close();
+ throw TTransportException(TTX_NOT_OPEN, "Could not set TCP_DEFER_ACCEPT");
+ }
+ #endif // #ifdef TCP_DEFER_ACCEPT
+
// Turn linger off, don't want to block on calls to close
struct linger ling = {0, 0};
if (-1 == setsockopt(serverSocket_, SOL_SOCKET, SO_LINGER,
@@ -43,6 +73,14 @@
throw TTransportException(TTX_NOT_OPEN, "Could not set SO_LINGER");
}
+ // TCP Nodelay, speed over bandwidth
+ if (-1 == setsockopt(serverSocket_, IPPROTO_TCP, TCP_NODELAY,
+ &one, sizeof(one))) {
+ close();
+ perror("setsockopt TCP_NODELAY");
+ throw TTransportException(TTX_NOT_OPEN, "Could not set TCP_NODELAY");
+ }
+
// Bind to a port
struct sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
@@ -83,7 +121,14 @@
throw TTransportException(TTX_UNKNOWN, "ERROR:" + errno);
}
- return shared_ptr<TTransport>(new TSocket(clientSocket));
+ shared_ptr<TSocket> client(new TSocket(clientSocket));
+ if (sendTimeout_ > 0) {
+ client->setSendTimeout(sendTimeout_);
+ }
+ if (recvTimeout_ > 0) {
+ client->setRecvTimeout(recvTimeout_);
+ }
+ return client;
}
void TServerSocket::close() {
diff --git a/lib/cpp/src/transport/TServerSocket.h b/lib/cpp/src/transport/TServerSocket.h
index 29cfdfe..496d540 100644
--- a/lib/cpp/src/transport/TServerSocket.h
+++ b/lib/cpp/src/transport/TServerSocket.h
@@ -17,8 +17,13 @@
class TServerSocket : public TServerTransport {
public:
TServerSocket(int port);
+ TServerSocket(int port, int sendTimeout, int recvTimeout);
+
~TServerSocket();
+ void setSendTimeout(int sendTimeout);
+ void setRecvTimeout(int recvTimeout);
+
void listen();
void close();
@@ -26,10 +31,11 @@
shared_ptr<TTransport> acceptImpl();
private:
-
int port_;
int serverSocket_;
int acceptBacklog_;
+ int sendTimeout_;
+ int recvTimeout_;
};
}}} // facebook::thrift::transport
diff --git a/lib/cpp/src/transport/TSocket.cc b/lib/cpp/src/transport/TSocket.cc
index eef76ef..de3bea7 100644
--- a/lib/cpp/src/transport/TSocket.cc
+++ b/lib/cpp/src/transport/TSocket.cc
@@ -6,14 +6,19 @@
#include <netdb.h>
#include <unistd.h>
#include <errno.h>
+#include <fcntl.h>
+#include <sys/select.h>
+#include "concurrency/Monitor.h"
#include "TSocket.h"
#include "TTransportException.h"
namespace facebook { namespace thrift { namespace transport {
using namespace std;
+using namespace facebook::thrift::concurrency;
+// Global var to track total socket sys calls
uint32_t g_socket_syscalls = 0;
/**
@@ -23,18 +28,35 @@
*/
// Mutex to protect syscalls to netdb
-pthread_mutex_t g_netdb_mutex = PTHREAD_MUTEX_INITIALIZER;
+static Monitor s_netdb_monitor;
// TODO(mcslee): Make this an option to the socket class
#define MAX_RECV_RETRIES 20
-
-TSocket::TSocket(string host, int port) :
- host_(host), port_(port), socket_(0) {}
-
-TSocket::TSocket(int socket) {
- socket_ = socket;
+
+TSocket::TSocket(string host, int port) :
+ host_(host),
+ port_(port),
+ socket_(0),
+ connTimeout_(0),
+ sendTimeout_(0),
+ recvTimeout_(0),
+ lingerOn_(1),
+ lingerVal_(0),
+ noDelay_(1) {
}
+TSocket::TSocket(int socket) :
+ host_(""),
+ port_(0),
+ socket_(socket),
+ connTimeout_(0),
+ sendTimeout_(0),
+ recvTimeout_(0),
+ lingerOn_(1),
+ lingerVal_(0),
+ noDelay_(1) {
+}
+
TSocket::~TSocket() {
close();
}
@@ -51,25 +73,35 @@
close();
throw TTransportException(TTX_NOT_OPEN, "socket() ERROR:" + errno);
}
-
+
+ // Send timeout
+ if (sendTimeout_ > 0) {
+ setSendTimeout(sendTimeout_);
+ }
+
+ // Recv timeout
+ if (recvTimeout_ > 0) {
+ setRecvTimeout(recvTimeout_);
+ }
+
+ // Linger
+ setLinger(lingerOn_, lingerVal_);
+
+ // No delay
+ setNoDelay(noDelay_);
+
// Lookup the hostname
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(port_);
- /*
- if (inet_pton(AF_INET, host_.c_str(), &addr.sin_addr) < 0) {
- perror("TSocket::open() inet_pton");
- }
- */
-
{
- // TODO(mcslee): Fix scope-locking here to protect hostname lookups
- // scopelock sl(&netdb_mutex);
+ // Scope lock on host entry lookup
+ Synchronized s(s_netdb_monitor);
struct hostent *host_entry = gethostbyname(host_.c_str());
if (host_entry == NULL) {
- // perror("dns error: failed call to gethostbyname.\n");
+ perror("TSocket: dns error: failed call to gethostbyname.");
close();
throw TTransportException(TTX_NOT_OPEN, "gethostbyname() failed");
}
@@ -79,18 +111,69 @@
host_entry->h_addr_list[0],
host_entry->h_length);
}
+
+ // Set the socket to be non blocking for connect if a timeout exists
+ int flags = fcntl(socket_, F_GETFL, 0);
+ if (connTimeout_ > 0) {
+ fcntl(socket_, F_SETFL, flags | O_NONBLOCK);
+ } else {
+ fcntl(socket_, F_SETFL, flags | ~O_NONBLOCK);
+ }
+
+ // Conn timeout
+ struct timeval c = {(int)(connTimeout_/1000),
+ (int)((connTimeout_%1000)*1000)};
// Connect the socket
int ret = connect(socket_, (struct sockaddr *)&addr, sizeof(addr));
- // Connect failed
- if (ret < 0) {
- perror("TSocket::open() connect");
+ if (ret == 0) {
+ goto done;
+ }
+
+ if (errno != EINPROGRESS) {
close();
+ char buff[1024];
+ sprintf(buff, "TSocket::open() connect %s %d", host_.c_str(), port_);
+ perror(buff);
throw TTransportException(TTX_NOT_OPEN, "open() ERROR: " + errno);
}
- // Connection was successful
+ fd_set fds;
+ FD_ZERO(&fds);
+ FD_SET(socket_, &fds);
+ ret = select(socket_+1, NULL, &fds, NULL, &c);
+
+ if (ret > 0) {
+ // Ensure connected
+ int val;
+ socklen_t lon;
+ lon = sizeof(int);
+ int ret2 = getsockopt(socket_, SOL_SOCKET, SO_ERROR, (void *)&val, &lon);
+ if (ret2 == -1) {
+ close();
+ perror("TSocket::open() getsockopt SO_ERROR");
+ throw TTransportException(TTX_NOT_OPEN, "open() ERROR: " + errno);
+ }
+ if (val == 0) {
+ goto done;
+ }
+ close();
+ perror("TSocket::open() SO_ERROR was set");
+ throw TTransportException(TTX_NOT_OPEN, "open() ERROR: " + errno);
+ } else if (ret == 0) {
+ close();
+ perror("TSocket::open() timeed out");
+ throw TTransportException(TTX_NOT_OPEN, "open() ERROR: " + errno);
+ } else {
+ close();
+ perror("TSocket::open() select error");
+ throw TTransportException(TTX_NOT_OPEN, "open() ERROR: " + errno);
+ }
+
+ done:
+ // Set socket back to normal mode (blocking)
+ fcntl(socket_, F_SETFL, flags);
}
void TSocket::close() {
@@ -167,12 +250,11 @@
while (sent < len) {
int flags = 0;
-
- #if defined(MSG_NOSIGNAL)
+ #ifdef MSG_NOSIGNAL
// Note the use of MSG_NOSIGNAL to suppress SIGPIPE errors, instead we
// check for the EPIPE return condition and close the socket in that case
flags |= MSG_NOSIGNAL;
- #endif // defined(MSG_NOSIGNAL)
+ #endif // ifdef MSG_NOSIGNAL
int b = send(socket_, buf + sent, len - sent, flags);
++g_socket_syscalls;
@@ -207,29 +289,63 @@
}
void TSocket::setLinger(bool on, int linger) {
- // TODO(mcslee): Store these options so they can be set pre-connect
+ lingerOn_ = on;
+ lingerVal_ = linger;
if (socket_ <= 0) {
return;
}
- struct linger ling = {(on ? 1 : 0), linger};
- if (-1 == setsockopt(socket_, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling))) {
- close();
+ struct linger l = {(lingerOn_ ? 1 : 0), lingerVal_};
+ int ret = setsockopt(socket_, SOL_SOCKET, SO_LINGER, &l, sizeof(l));
+ if (ret == -1) {
perror("TSocket::setLinger()");
}
}
void TSocket::setNoDelay(bool noDelay) {
- // TODO(mcslee): Store these options so they can be set pre-connect
+ noDelay_ = noDelay;
if (socket_ <= 0) {
return;
}
// Set socket to NODELAY
- int val = (noDelay ? 1 : 0);
- if (-1 == setsockopt(socket_, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val))) {
- close();
+ int v = noDelay_ ? 1 : 0;
+ int ret = setsockopt(socket_, IPPROTO_TCP, TCP_NODELAY, &v, sizeof(v));
+ if (ret == -1) {
perror("TSocket::setNoDelay()");
}
}
+
+void TSocket::setConnTimeout(int ms) {
+ connTimeout_ = ms;
+}
+
+void TSocket::setRecvTimeout(int ms) {
+ recvTimeout_ = ms;
+ if (socket_ <= 0) {
+ return;
+ }
+
+ struct timeval r = {(int)(recvTimeout_/1000),
+ (int)((recvTimeout_%1000)*1000)};
+ int ret = setsockopt(socket_, SOL_SOCKET, SO_RCVTIMEO, &r, sizeof(r));
+ if (ret == -1) {
+ perror("TSocket::setRecvTimeout()");
+ }
+}
+
+void TSocket::setSendTimeout(int ms) {
+ sendTimeout_ = ms;
+ if (socket_ <= 0) {
+ return;
+ }
+
+ struct timeval s = {(int)(sendTimeout_/1000),
+ (int)((sendTimeout_%1000)*1000)};
+ int ret = setsockopt(socket_, SOL_SOCKET, SO_SNDTIMEO, &s, sizeof(s));
+ if (ret == -1) {
+ perror("TSocket::setSendTimeout()");
+ }
+}
+
}}} // facebook::thrift::transport
diff --git a/lib/cpp/src/transport/TSocket.h b/lib/cpp/src/transport/TSocket.h
index f1a065f..b946b6a 100644
--- a/lib/cpp/src/transport/TSocket.h
+++ b/lib/cpp/src/transport/TSocket.h
@@ -82,6 +82,22 @@
*/
void setNoDelay(bool noDelay);
+ /**
+ * Set the connect timeout
+ */
+ void setConnTimeout(int ms);
+
+ /**
+ * Set the receive timeout
+ */
+ void setRecvTimeout(int ms);
+
+ /**
+ * Set the send timeout
+ */
+ void setSendTimeout(int ms);
+
+
private:
/**
* Constructor to create socket from raw UNIX handle. Never called directly
@@ -97,6 +113,24 @@
/** Underlying UNIX socket handle */
int socket_;
+
+ /** Connect timeout in ms */
+ int connTimeout_;
+
+ /** Send timeout in ms */
+ int sendTimeout_;
+
+ /** Recv timeout in ms */
+ int recvTimeout_;
+
+ /** Linger on */
+ bool lingerOn_;
+
+ /** Linger val */
+ int lingerVal_;
+
+ /** Nodelay */
+ bool noDelay_;
};
}}} // facebook::thrift::transport