Better socket timeout and options support for Thrift C++

Summary: Also compile without degugging symbols for the linked library

Reviewed By: aditya


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664810 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/README b/README
index 2700993..b508f2b 100644
--- a/README
+++ b/README
@@ -57,6 +57,10 @@
 
 	./configure --with-boost=/usr/local
 
+Note that by default the thrift C++ library is built with no debugging symbols included. If you would like debugging symbols during development work, run:
+
+        ./configure --CFLAGS='-g -O2'
+
 Run ./configure --help to see other configuration options
 
 Make thrift 
diff --git a/lib/cpp/configure.ac b/lib/cpp/configure.ac
index d571ba0..2d30c59 100644
--- a/lib/cpp/configure.ac
+++ b/lib/cpp/configure.ac
@@ -80,4 +80,8 @@
 
 AC_PROG_LIBTOOL
 
+CFLAGS="-O2"
+
+CXXFLAGS="-O2"
+
 AC_OUTPUT(Makefile)
diff --git a/lib/cpp/src/concurrency/PosixThreadFactory.cc b/lib/cpp/src/concurrency/PosixThreadFactory.cc
index 5df87ec..130976c 100644
--- a/lib/cpp/src/concurrency/PosixThreadFactory.cc
+++ b/lib/cpp/src/concurrency/PosixThreadFactory.cc
@@ -161,7 +161,7 @@
     int quanta = (HIGHEST - LOWEST) + 1;
     float stepsperquanta = (max_priority - min_priority) / quanta;
 
-    if(priority <= HIGHEST) {
+    if (priority <= HIGHEST) {
       return (int)(min_priority + stepsperquanta * priority);
     } else {
       // should never get here for priority increments.
diff --git a/lib/cpp/src/transport/TServerSocket.cc b/lib/cpp/src/transport/TServerSocket.cc
index 75bd504..8a14ea5 100644
--- a/lib/cpp/src/transport/TServerSocket.cc
+++ b/lib/cpp/src/transport/TServerSocket.cc
@@ -1,5 +1,6 @@
 #include <sys/socket.h>
 #include <netinet/in.h>
+#include <netinet/tcp.h>
 #include <errno.h>
 
 #include "TSocket.h"
@@ -11,12 +12,31 @@
 using namespace boost;
 
 TServerSocket::TServerSocket(int port) :
-  port_(port), serverSocket_(0), acceptBacklog_(1024) {}
+  port_(port),
+  serverSocket_(0),
+  acceptBacklog_(1024),
+  sendTimeout_(0),
+  recvTimeout_(0) {}
+
+TServerSocket::TServerSocket(int port, int sendTimeout, int recvTimeout) :
+  port_(port),
+  serverSocket_(0),
+  acceptBacklog_(1024),
+  sendTimeout_(sendTimeout),
+  recvTimeout_(recvTimeout) {}
 
 TServerSocket::~TServerSocket() {
   close();
 }
 
+void TServerSocket::setSendTimeout(int sendTimeout) {
+  sendTimeout_ = sendTimeout;
+}
+
+void TServerSocket::setRecvTimeout(int recvTimeout) {
+  recvTimeout_ = recvTimeout;
+}
+
 void TServerSocket::listen() {
   serverSocket_ = socket(AF_INET, SOCK_STREAM, 0);
   if (serverSocket_ == -1) {
@@ -34,6 +54,16 @@
     throw TTransportException(TTX_NOT_OPEN, "Could not set SO_REUSEADDR");
   }
 
+  // Defer accept
+  #ifdef TCP_DEFER_ACCEPT
+  if (-1 == setsockopt(serverSocket_, SOL_SOCKET, TCP_DEFER_ACCEPT,
+                       &one, sizeof(one))) {
+    perror("TServerSocket::listen() TCP_DEFER_ACCEPT");
+    close();
+    throw TTransportException(TTX_NOT_OPEN, "Could not set TCP_DEFER_ACCEPT");
+  }
+  #endif // #ifdef TCP_DEFER_ACCEPT
+
   // Turn linger off, don't want to block on calls to close
   struct linger ling = {0, 0};
   if (-1 == setsockopt(serverSocket_, SOL_SOCKET, SO_LINGER,
@@ -43,6 +73,14 @@
     throw TTransportException(TTX_NOT_OPEN, "Could not set SO_LINGER");
   }
 
+  // TCP Nodelay, speed over bandwidth
+  if (-1 == setsockopt(serverSocket_, IPPROTO_TCP, TCP_NODELAY,
+                       &one, sizeof(one))) {
+    close();
+    perror("setsockopt TCP_NODELAY");
+    throw TTransportException(TTX_NOT_OPEN, "Could not set TCP_NODELAY");
+  }
+
   // Bind to a port
   struct sockaddr_in addr;
   memset(&addr, 0, sizeof(addr));
@@ -83,7 +121,14 @@
     throw TTransportException(TTX_UNKNOWN, "ERROR:" + errno);
   }
 
-  return shared_ptr<TTransport>(new TSocket(clientSocket));
+  shared_ptr<TSocket> client(new TSocket(clientSocket));
+  if (sendTimeout_ > 0) {
+    client->setSendTimeout(sendTimeout_);
+  }
+  if (recvTimeout_ > 0) {
+    client->setRecvTimeout(recvTimeout_);
+  }                          
+  return client;
 }
 
 void TServerSocket::close() {
diff --git a/lib/cpp/src/transport/TServerSocket.h b/lib/cpp/src/transport/TServerSocket.h
index 29cfdfe..496d540 100644
--- a/lib/cpp/src/transport/TServerSocket.h
+++ b/lib/cpp/src/transport/TServerSocket.h
@@ -17,8 +17,13 @@
 class TServerSocket : public TServerTransport {
  public:
   TServerSocket(int port);
+  TServerSocket(int port, int sendTimeout, int recvTimeout);
+
   ~TServerSocket();
 
+  void setSendTimeout(int sendTimeout);
+  void setRecvTimeout(int recvTimeout);
+
   void listen();
   void close();
 
@@ -26,10 +31,11 @@
   shared_ptr<TTransport> acceptImpl();
 
  private:
-
   int port_;
   int serverSocket_;
   int acceptBacklog_;
+  int sendTimeout_;
+  int recvTimeout_;
 };
 
 }}} // facebook::thrift::transport
diff --git a/lib/cpp/src/transport/TSocket.cc b/lib/cpp/src/transport/TSocket.cc
index eef76ef..de3bea7 100644
--- a/lib/cpp/src/transport/TSocket.cc
+++ b/lib/cpp/src/transport/TSocket.cc
@@ -6,14 +6,19 @@
 #include <netdb.h>
 #include <unistd.h>
 #include <errno.h>
+#include <fcntl.h>
+#include <sys/select.h>
 
+#include "concurrency/Monitor.h"
 #include "TSocket.h"
 #include "TTransportException.h"
 
 namespace facebook { namespace thrift { namespace transport { 
 
 using namespace std;
+using namespace facebook::thrift::concurrency;
 
+// Global var to track total socket sys calls
 uint32_t g_socket_syscalls = 0;
 
 /**
@@ -23,18 +28,35 @@
  */
 
 // Mutex to protect syscalls to netdb
-pthread_mutex_t g_netdb_mutex = PTHREAD_MUTEX_INITIALIZER;
+static Monitor s_netdb_monitor;
 
 // TODO(mcslee): Make this an option to the socket class
 #define MAX_RECV_RETRIES 20
-
-TSocket::TSocket(string host, int port) :
-  host_(host), port_(port), socket_(0) {}
-
-TSocket::TSocket(int socket) {
-  socket_ = socket;
+  
+TSocket::TSocket(string host, int port) : 
+  host_(host),
+  port_(port),
+  socket_(0),
+  connTimeout_(0),
+  sendTimeout_(0),
+  recvTimeout_(0),
+  lingerOn_(1),
+  lingerVal_(0),
+  noDelay_(1) {
 }
 
+TSocket::TSocket(int socket) :
+  host_(""),
+  port_(0),
+  socket_(socket),
+  connTimeout_(0),
+  sendTimeout_(0),
+  recvTimeout_(0),
+  lingerOn_(1),
+  lingerVal_(0),
+  noDelay_(1) {
+}
+  
 TSocket::~TSocket() {
   close();
 }
@@ -51,25 +73,35 @@
     close();
     throw TTransportException(TTX_NOT_OPEN, "socket() ERROR:" + errno);
   }
-  
+
+  // Send timeout
+  if (sendTimeout_ > 0) {
+    setSendTimeout(sendTimeout_);
+  }
+
+  // Recv timeout
+  if (recvTimeout_ > 0) {
+    setRecvTimeout(recvTimeout_);
+  }
+
+  // Linger
+  setLinger(lingerOn_, lingerVal_);
+
+  // No delay
+  setNoDelay(noDelay_);
+
   // Lookup the hostname
   struct sockaddr_in addr;
   addr.sin_family = AF_INET;
   addr.sin_port = htons(port_);
 
-  /*
-  if (inet_pton(AF_INET, host_.c_str(), &addr.sin_addr) < 0) {
-    perror("TSocket::open() inet_pton");
-  }
-  */
-
   {
-    // TODO(mcslee): Fix scope-locking here to protect hostname lookups
-    // scopelock sl(&netdb_mutex);
+    // Scope lock on host entry lookup
+    Synchronized s(s_netdb_monitor);
     struct hostent *host_entry = gethostbyname(host_.c_str());
     
     if (host_entry == NULL) {
-      // perror("dns error: failed call to gethostbyname.\n");
+      perror("TSocket: dns error: failed call to gethostbyname.");
       close();
       throw TTransportException(TTX_NOT_OPEN, "gethostbyname() failed");
     }
@@ -79,18 +111,69 @@
            host_entry->h_addr_list[0],
            host_entry->h_length);
   }
+
+  // Set the socket to be non blocking for connect if a timeout exists
+  int flags = fcntl(socket_, F_GETFL, 0); 
+  if (connTimeout_ > 0) {
+    fcntl(socket_, F_SETFL, flags | O_NONBLOCK);
+  } else {
+    fcntl(socket_, F_SETFL, flags | ~O_NONBLOCK);
+  }
+
+  // Conn timeout
+  struct timeval c = {(int)(connTimeout_/1000),
+                      (int)((connTimeout_%1000)*1000)};
    
   // Connect the socket
   int ret = connect(socket_, (struct sockaddr *)&addr, sizeof(addr));
   
-  // Connect failed
-  if (ret < 0) {
-    perror("TSocket::open() connect");
+  if (ret == 0) {
+    goto done;
+  }
+
+  if (errno != EINPROGRESS) {
     close();
+    char buff[1024];
+    sprintf(buff, "TSocket::open() connect %s %d", host_.c_str(), port_);
+    perror(buff);
     throw TTransportException(TTX_NOT_OPEN, "open() ERROR: " + errno);
   }
 
-  // Connection was successful
+  fd_set fds;
+  FD_ZERO(&fds);
+  FD_SET(socket_, &fds);
+  ret = select(socket_+1, NULL, &fds, NULL, &c);
+
+  if (ret > 0) {
+    // Ensure connected
+    int val;
+    socklen_t lon;
+    lon = sizeof(int);
+    int ret2 = getsockopt(socket_, SOL_SOCKET, SO_ERROR, (void *)&val, &lon);
+    if (ret2 == -1) {
+      close();
+      perror("TSocket::open() getsockopt SO_ERROR");
+      throw TTransportException(TTX_NOT_OPEN, "open() ERROR: " + errno);
+    }
+    if (val == 0) {
+      goto done;
+    }
+    close();
+    perror("TSocket::open() SO_ERROR was set");
+    throw TTransportException(TTX_NOT_OPEN, "open() ERROR: " + errno);
+  } else if (ret == 0) {
+    close();
+    perror("TSocket::open() timeed out");
+    throw TTransportException(TTX_NOT_OPEN, "open() ERROR: " + errno);   
+  } else {
+    close();
+    perror("TSocket::open() select error");
+    throw TTransportException(TTX_NOT_OPEN, "open() ERROR: " + errno);
+  }
+
+ done:
+  // Set socket back to normal mode (blocking)
+  fcntl(socket_, F_SETFL, flags);
 }
 
 void TSocket::close() {
@@ -167,12 +250,11 @@
   while (sent < len) {
 
     int flags = 0;
-
-    #if defined(MSG_NOSIGNAL)
+    #ifdef MSG_NOSIGNAL
     // Note the use of MSG_NOSIGNAL to suppress SIGPIPE errors, instead we
     // check for the EPIPE return condition and close the socket in that case
     flags |= MSG_NOSIGNAL;
-    #endif // defined(MSG_NOSIGNAL)
+    #endif // ifdef MSG_NOSIGNAL
 
     int b = send(socket_, buf + sent, len - sent, flags);
     ++g_socket_syscalls;
@@ -207,29 +289,63 @@
 }
 
 void TSocket::setLinger(bool on, int linger) {
-  // TODO(mcslee): Store these options so they can be set pre-connect
+  lingerOn_ = on;
+  lingerVal_ = linger;
   if (socket_ <= 0) {
     return;
   }
 
-  struct linger ling = {(on ? 1 : 0), linger};
-  if (-1 == setsockopt(socket_, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling))) {
-    close();
+  struct linger l = {(lingerOn_ ? 1 : 0), lingerVal_};
+  int ret = setsockopt(socket_, SOL_SOCKET, SO_LINGER, &l, sizeof(l));
+  if (ret == -1) {
     perror("TSocket::setLinger()");
   }
 }
 
 void TSocket::setNoDelay(bool noDelay) {
-  // TODO(mcslee): Store these options so they can be set pre-connect
+  noDelay_ = noDelay;
   if (socket_ <= 0) {
     return;
   }
 
   // Set socket to NODELAY
-  int val = (noDelay ? 1 : 0);
-  if (-1 == setsockopt(socket_, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val))) {
-    close();
+  int v = noDelay_ ? 1 : 0;
+  int ret = setsockopt(socket_, IPPROTO_TCP, TCP_NODELAY, &v, sizeof(v));
+  if (ret == -1) {
     perror("TSocket::setNoDelay()");
   }
 }
+
+void TSocket::setConnTimeout(int ms) {
+  connTimeout_ = ms;
+}
+
+void TSocket::setRecvTimeout(int ms) {
+  recvTimeout_ = ms;
+  if (socket_ <= 0) {
+    return;
+  }
+
+  struct timeval r = {(int)(recvTimeout_/1000),
+                      (int)((recvTimeout_%1000)*1000)};
+  int ret = setsockopt(socket_, SOL_SOCKET, SO_RCVTIMEO, &r, sizeof(r));
+  if (ret == -1) {
+    perror("TSocket::setRecvTimeout()");
+  }
+}
+
+void TSocket::setSendTimeout(int ms) {
+  sendTimeout_ = ms;
+  if (socket_ <= 0) {
+    return;
+  }
+   
+  struct timeval s = {(int)(sendTimeout_/1000),
+                      (int)((sendTimeout_%1000)*1000)};
+  int ret = setsockopt(socket_, SOL_SOCKET, SO_SNDTIMEO, &s, sizeof(s));
+  if (ret == -1) {
+    perror("TSocket::setSendTimeout()");
+  }
+}
+
 }}} // facebook::thrift::transport
diff --git a/lib/cpp/src/transport/TSocket.h b/lib/cpp/src/transport/TSocket.h
index f1a065f..b946b6a 100644
--- a/lib/cpp/src/transport/TSocket.h
+++ b/lib/cpp/src/transport/TSocket.h
@@ -82,6 +82,22 @@
    */
   void setNoDelay(bool noDelay);
 
+  /**
+   * Set the connect timeout
+   */
+  void setConnTimeout(int ms);
+
+  /**
+   * Set the receive timeout
+   */
+  void setRecvTimeout(int ms);
+
+  /**
+   * Set the send timeout
+   */
+  void setSendTimeout(int ms);
+
+
  private:
   /**
    * Constructor to create socket from raw UNIX handle. Never called directly
@@ -97,6 +113,24 @@
 
   /** Underlying UNIX socket handle */
   int socket_;
+
+  /** Connect timeout in ms */
+  int connTimeout_;
+
+  /** Send timeout in ms */
+  int sendTimeout_;
+
+  /** Recv timeout in ms */
+  int recvTimeout_;
+
+  /** Linger on */
+  bool lingerOn_;
+  
+  /** Linger val */
+  int lingerVal_;
+
+  /** Nodelay */
+  bool noDelay_;
 };
 
 }}} // facebook::thrift::transport