Update Thrift CPP libraries to work with new generated source, change underlying buffers to use uint8_t* instead of std::string

Summary: Major overhaul to the CPP libraries.

Reviewed By: aditya

Test Plan: Again, keep an eye out for the unit tests commit

Notes: Initial perf tests show that Thrift is not only more robust than Pillar, but its implementation is actually around 10-20% faster. We can do about 10 RPC function calls with small data payloads in under 2ms. THAT IS FAST. THAT IS THRIFTY.




git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664714 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/transport/TSocket.cc b/lib/cpp/transport/TSocket.cc
index 1dfe431..3471755 100644
--- a/lib/cpp/transport/TSocket.cc
+++ b/lib/cpp/transport/TSocket.cc
@@ -7,9 +7,18 @@
 #include <errno.h>
 
 #include "transport/TSocket.h"
+#include "transport/TTransportException.h"
 
 using namespace std;
 
+uint32_t g_socket_syscalls = 0;
+
+/**
+ * TSocket implementation.
+ *
+ * @author Mark Slee <mcslee@facebook.com>
+ */
+
 // Mutex to protect syscalls to netdb
 pthread_mutex_t g_netdb_mutex = PTHREAD_MUTEX_INITIALIZER;
 
@@ -27,15 +36,20 @@
   close();
 }
 
-bool TSocket::open() {
+bool TSocket::isOpen() {
+  return (socket_ > 0); 
+}
+
+void TSocket::open() {
   // Create socket
   socket_ = socket(AF_INET, SOCK_STREAM, 0);
   if (socket_ == -1) {
-    socket_ = 0;
-    return false;
+    perror("TSocket::open() socket");
+    close();
+    throw TTransportException(TTX_NOT_OPEN, "socket() ERROR:" + errno);
   }
   
-  // Lookup the host
+  // Lookup the hostname
   struct sockaddr_in addr;
   addr.sin_family = AF_INET;
   addr.sin_port = htons(port_);
@@ -54,7 +68,7 @@
     if (host_entry == NULL) {
       // perror("dns error: failed call to gethostbyname.\n");
       close();
-      return false;
+      throw TTransportException(TTX_NOT_OPEN, "gethostbyname() failed");
     }
     
     addr.sin_port = htons(port_);
@@ -70,10 +84,10 @@
   if (ret < 0) {
     perror("TSocket::open() connect");
     close();
-    return false;
+    throw TTransportException(TTX_NOT_OPEN, "open() ERROR: " + errno);
   }
 
-  return true;
+  // Connection was successful
 }
 
 void TSocket::close() {
@@ -84,97 +98,127 @@
   socket_ = 0;
 }
 
-int TSocket::read(string& s, uint32_t len) {
-  char buff[len];
-  s = "";
+uint32_t TSocket::read(uint8_t* buf, uint32_t len) {
+  if (socket_ <= 0) {
+    throw TTransportException(TTX_NOT_OPEN, "Called read on non-open socket");
+  }
 
-  uint32_t have = 0;
   uint32_t retries = 0;
-
-  while (have < len) {
-  try_again:
-    // Read from the socket
-    int got = recv(socket_, buff+have, len-have, 0);
-
-    // Check for error on read
-    if (got < 0) {
-      perror("TSocket::read()");
-
-      // If temporarily out of resources, sleep a bit and try again
-      if (errno == EAGAIN && retries++ < MAX_RECV_RETRIES) {
-        usleep(50);
-        goto try_again;
-      }
-
-      // If interrupted, try again
-      if (errno == EINTR && retries++ < MAX_RECV_RETRIES) {
-        goto try_again;
-      }
-
-      // If we disconnect with no linger time
-      if (errno == ECONNRESET) {
-        return 0;
-      }
-
-      return 0;
+  
+ try_again:
+  // Read from the socket
+  int got = recv(socket_, buf, len, 0);
+  ++g_socket_syscalls;
+  
+  // Check for error on read
+  if (got < 0) {
+    perror("TSocket::read()");
+    
+    // If temporarily out of resources, sleep a bit and try again
+    if (errno == EAGAIN && retries++ < MAX_RECV_RETRIES) {
+      usleep(50);
+      goto try_again;
     }
     
-    // Check for empty read
-    if (got == 0) {
-      return 0;
+    // If interrupted, try again
+    if (errno == EINTR && retries++ < MAX_RECV_RETRIES) {
+      goto try_again;
     }
     
-    // Update the count
-    have += (uint32_t) got;
+    // If we disconnect with no linger time
+    if (errno == ECONNRESET) {
+      throw TTransportException(TTX_NOT_OPEN, "ECONNRESET");
+    }
+    
+    // This ish isn't open
+    if (errno == ENOTCONN) {
+      throw TTransportException(TTX_NOT_OPEN, "ENOTCONN");
+    }
+    
+    // Timed out!
+    if (errno == ETIMEDOUT) {
+      throw TTransportException(TTX_TIMED_OUT, "ETIMEDOUT");
+    }
+    
+    // Some other error, whatevz
+    throw TTransportException(TTX_UNKNOWN, "ERROR:" + errno);
+  }
+  
+  // The remote host has closed the socket
+  if (got == 0) {
+    close();
+    return 0;
   }
   
   // Pack data into string
-  s = string(buff, have);
-  return have;
+  return got;
 }
 
-void TSocket::write(const string& s) {
+void TSocket::write(const uint8_t* buf, uint32_t len) {
+  if (socket_ <= 0) {
+    throw TTransportException(TTX_NOT_OPEN, "Called write on non-open socket");
+  }
+
   uint32_t sent = 0;
     
-  while (sent < s.size()) {
-    int b = send(socket_, s.data() + sent, s.size() - sent, 0);
-    
+  while (sent < len) {
+    // Note the use of MSG_NOSIGNAL to suppress SIGPIPE errors, instead we
+    // check for the EPIPE return condition and close the socket in that case
+    int b = send(socket_, buf + sent, len - sent, MSG_NOSIGNAL);
+    ++g_socket_syscalls;
+
     // Fail on a send error
     if (b < 0) {
-      // TODO(mcslee): Make the function return how many bytes it wrote or
-      // throw an exception
-      // throw_perror("send");
-      return;
+      if (errno == EPIPE) {
+        close();
+        throw TTransportException(TTX_NOT_OPEN, "EPIPE");
+      }
+
+      if (errno == ECONNRESET) {
+        close();
+        throw TTransportException(TTX_NOT_OPEN, "ECONNRESET");
+      }
+
+      if (errno == ENOTCONN) {
+        close();
+        throw TTransportException(TTX_NOT_OPEN, "ENOTCONN");
+      }
+
+      perror("TSocket::write() send < 0");
+      throw TTransportException(TTX_UNKNOWN, "ERROR:" + errno);
     }
     
     // Fail on blocked send
     if (b == 0) {
-      // TODO(mcslee): Make the function return how many bytes it wrote or
-      // throw string("couldn't send data.\n");
-      return;
+      throw TTransportException(TTX_NOT_OPEN, "Socket send returned 0.");
     }
-
     sent += b;
   }
 }
 
-bool TSocket::setLinger(bool on, int linger) {
+void TSocket::setLinger(bool on, int linger) {
+  // TODO(mcslee): Store these options so they can be set pre-connect
+  if (socket_ <= 0) {
+    return;
+  }
+
   struct linger ling = {(on ? 1 : 0), linger};
   if (-1 == setsockopt(socket_, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling))) {
     close();
     perror("TSocket::setLinger()");
-    return false;
   }
-  return true; 
 }
 
-bool TSocket::setNoDelay(bool noDelay) {
+void TSocket::setNoDelay(bool noDelay) {
+  // TODO(mcslee): Store these options so they can be set pre-connect
+  if (socket_ <= 0) {
+    return;
+  }
+
   // Set socket to NODELAY
   int val = (noDelay ? 1 : 0);
   if (-1 == setsockopt(socket_, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val))) {
     close();
     perror("TSocket::setNoDelay()");
-    return false;
   }
-  return true;
 }