Rev 2 of Thrift, the Pillar successor

Summary: End-to-end communications and serialization in C++ is working

Reviewed By: aditya

Test Plan: See the new top-level test/ folder. It vaguely resembles a unit test, though it could be more automated.

Revert Plan: Revertible

Notes: Still a LOT of optimization work to be done on the generated C++ code, which should be using dynamic memory in a number of places. Next major task is writing the PHP/Java/Python generators.




git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664712 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/transport/TServerSocket.cc b/lib/cpp/transport/TServerSocket.cc
new file mode 100644
index 0000000..178de81
--- /dev/null
+++ b/lib/cpp/transport/TServerSocket.cc
@@ -0,0 +1,90 @@
+#include <sys/socket.h>
+#include <netinet/in.h>
+
+#include "transport/TSocket.h"
+#include "transport/TServerSocket.h"
+
+TServerSocket::TServerSocket(int port) :
+  port_(port), serverSocket_(0), acceptBacklog_(1024) {}
+
+TServerSocket::~TServerSocket() {
+  close();
+}
+
+bool TServerSocket::listen() {
+  serverSocket_ = socket(AF_INET, SOCK_STREAM, 0);
+  if (serverSocket_ == -1) {
+    close();
+    return false;
+  }
+
+  // Set reusaddress to prevent 2MSL delay on accept
+  int one = 1;
+  if (-1 == setsockopt(serverSocket_, SOL_SOCKET, SO_REUSEADDR,
+                       &one, sizeof(one))) {
+    perror("TServerSocket::listen() SO_REUSEADDR");
+    close();
+    return false;
+  }
+
+  // Turn linger off, don't want to block on calls to close
+  struct linger ling = {0, 0};
+  if (-1 == setsockopt(serverSocket_, SOL_SOCKET, SO_LINGER,
+                       &ling, sizeof(ling))) {
+    perror("TServerSocket::listen() SO_LINGER");
+    close();
+    return false;
+  }
+
+  // Bind to a port
+  struct sockaddr_in addr;
+  memset(&addr, 0, sizeof(addr));
+  addr.sin_family = AF_INET;
+  addr.sin_port = htons(port_);
+  addr.sin_addr.s_addr = INADDR_ANY;
+  if (-1 == bind(serverSocket_, (struct sockaddr *)&addr, sizeof(addr))) {
+    char errbuf[1024];
+    sprintf(errbuf, "TServerSocket::listen() BIND %d", port_);
+    perror(errbuf);
+    close();
+    return false;
+  }
+
+  // Call listen
+  if (-1 == ::listen(serverSocket_, acceptBacklog_)) {
+    perror("TServerSocket::listen() LISTEN");
+    close();
+    return false;
+  }
+
+  // The socket is now listening!
+  return true;
+}
+
+TTransport* TServerSocket::accept() {
+  if (serverSocket_ <= 0) {
+    // TODO(mcslee): Log error with common logging tool
+    return NULL;
+  }
+
+  struct sockaddr_in clientAddress;
+  int size = sizeof(clientAddress);
+  int clientSocket = ::accept(serverSocket_,
+                              (struct sockaddr *) &clientAddress,
+                              (socklen_t *) &size);
+    
+  if (clientSocket <= 0) {
+    perror("TServerSocket::accept()");
+    return NULL;
+  }
+
+  return new TSocket(clientSocket);
+}
+
+void TServerSocket::close() {
+  if (serverSocket_ > 0) {
+    shutdown(serverSocket_, SHUT_RDWR);
+    ::close(serverSocket_);
+  }
+  serverSocket_ = 0;
+}
diff --git a/lib/cpp/transport/TServerSocket.h b/lib/cpp/transport/TServerSocket.h
new file mode 100644
index 0000000..8ded4e2
--- /dev/null
+++ b/lib/cpp/transport/TServerSocket.h
@@ -0,0 +1,29 @@
+#ifndef T_SERVER_SOCKET_H
+#define T_SERVER_SOCKET_H
+
+#include "transport/TServerTransport.h"
+
+class TSocket;
+
+/**
+ * Server socket implementation of TServerTransport. Wrapper around a unix
+ * socket listen and accept calls.
+ *
+ * @author Mark Slee <mcslee@facebook.com>
+ */
+class TServerSocket : public TServerTransport {
+ public:
+  TServerSocket(int port);
+  ~TServerSocket();
+
+  bool listen();
+  TTransport* accept();
+  void close();
+
+ private:
+  int port_;
+  int serverSocket_;
+  int acceptBacklog_;
+};
+
+#endif
diff --git a/lib/cpp/transport/TServerTransport.h b/lib/cpp/transport/TServerTransport.h
new file mode 100644
index 0000000..4d063fc
--- /dev/null
+++ b/lib/cpp/transport/TServerTransport.h
@@ -0,0 +1,24 @@
+#ifndef T_SERVER_TRANSPORT_H
+#define T_SERVER_TRANSPORT_H
+
+#include "TTransport.h"
+
+/**
+ * Server transport framework. A server needs to have some facility for
+ * creating base transports to read/write from.
+ *
+ * @author Mark Slee <mcslee@facebook.com>
+ */
+class TServerTransport {
+ public:
+  virtual ~TServerTransport() {}
+
+  virtual bool listen() = 0;
+  virtual TTransport* accept() = 0;
+  virtual void close() = 0;
+
+ protected:
+  TServerTransport() {}
+};
+
+#endif
diff --git a/lib/cpp/transport/TSocket.cc b/lib/cpp/transport/TSocket.cc
new file mode 100644
index 0000000..1dfe431
--- /dev/null
+++ b/lib/cpp/transport/TSocket.cc
@@ -0,0 +1,180 @@
+#include <sys/socket.h>
+#include <arpa/inet.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <netdb.h>
+#include <unistd.h>
+#include <errno.h>
+
+#include "transport/TSocket.h"
+
+using namespace std;
+
+// Mutex to protect syscalls to netdb
+pthread_mutex_t g_netdb_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+// TODO(mcslee): Make this an option to the socket class
+#define MAX_RECV_RETRIES 20
+
+TSocket::TSocket(string host, int port) :
+  host_(host), port_(port), socket_(0) {}
+
+TSocket::TSocket(int socket) {
+  socket_ = socket;
+}
+
+TSocket::~TSocket() {
+  close();
+}
+
+bool TSocket::open() {
+  // Create socket
+  socket_ = socket(AF_INET, SOCK_STREAM, 0);
+  if (socket_ == -1) {
+    socket_ = 0;
+    return false;
+  }
+  
+  // Lookup the host
+  struct sockaddr_in addr;
+  addr.sin_family = AF_INET;
+  addr.sin_port = htons(port_);
+
+  /*
+  if (inet_pton(AF_INET, host_.c_str(), &addr.sin_addr) < 0) {
+    perror("TSocket::open() inet_pton");
+  }
+  */
+
+  {
+    // TODO(mcslee): Fix scope-locking here to protect hostname lookups
+    // scopelock sl(&netdb_mutex);
+    struct hostent *host_entry = gethostbyname(host_.c_str());
+    
+    if (host_entry == NULL) {
+      // perror("dns error: failed call to gethostbyname.\n");
+      close();
+      return false;
+    }
+    
+    addr.sin_port = htons(port_);
+    memcpy(&addr.sin_addr.s_addr,
+           host_entry->h_addr_list[0],
+           host_entry->h_length);
+  }
+   
+  // Connect the socket
+  int ret = connect(socket_, (struct sockaddr *)&addr, sizeof(addr));
+  
+  // Connect failed
+  if (ret < 0) {
+    perror("TSocket::open() connect");
+    close();
+    return false;
+  }
+
+  return true;
+}
+
+void TSocket::close() {
+  if (socket_ > 0) {
+    shutdown(socket_, SHUT_RDWR);
+    ::close(socket_);
+  }
+  socket_ = 0;
+}
+
+int TSocket::read(string& s, uint32_t len) {
+  char buff[len];
+  s = "";
+
+  uint32_t have = 0;
+  uint32_t retries = 0;
+
+  while (have < len) {
+  try_again:
+    // Read from the socket
+    int got = recv(socket_, buff+have, len-have, 0);
+
+    // Check for error on read
+    if (got < 0) {
+      perror("TSocket::read()");
+
+      // If temporarily out of resources, sleep a bit and try again
+      if (errno == EAGAIN && retries++ < MAX_RECV_RETRIES) {
+        usleep(50);
+        goto try_again;
+      }
+
+      // If interrupted, try again
+      if (errno == EINTR && retries++ < MAX_RECV_RETRIES) {
+        goto try_again;
+      }
+
+      // If we disconnect with no linger time
+      if (errno == ECONNRESET) {
+        return 0;
+      }
+
+      return 0;
+    }
+    
+    // Check for empty read
+    if (got == 0) {
+      return 0;
+    }
+    
+    // Update the count
+    have += (uint32_t) got;
+  }
+  
+  // Pack data into string
+  s = string(buff, have);
+  return have;
+}
+
+void TSocket::write(const string& s) {
+  uint32_t sent = 0;
+    
+  while (sent < s.size()) {
+    int b = send(socket_, s.data() + sent, s.size() - sent, 0);
+    
+    // Fail on a send error
+    if (b < 0) {
+      // TODO(mcslee): Make the function return how many bytes it wrote or
+      // throw an exception
+      // throw_perror("send");
+      return;
+    }
+    
+    // Fail on blocked send
+    if (b == 0) {
+      // TODO(mcslee): Make the function return how many bytes it wrote or
+      // throw string("couldn't send data.\n");
+      return;
+    }
+
+    sent += b;
+  }
+}
+
+bool TSocket::setLinger(bool on, int linger) {
+  struct linger ling = {(on ? 1 : 0), linger};
+  if (-1 == setsockopt(socket_, SOL_SOCKET, SO_LINGER, &ling, sizeof(ling))) {
+    close();
+    perror("TSocket::setLinger()");
+    return false;
+  }
+  return true; 
+}
+
+bool TSocket::setNoDelay(bool noDelay) {
+  // Set socket to NODELAY
+  int val = (noDelay ? 1 : 0);
+  if (-1 == setsockopt(socket_, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val))) {
+    close();
+    perror("TSocket::setNoDelay()");
+    return false;
+  }
+  return true;
+}
diff --git a/lib/cpp/transport/TSocket.h b/lib/cpp/transport/TSocket.h
new file mode 100644
index 0000000..1da74c6
--- /dev/null
+++ b/lib/cpp/transport/TSocket.h
@@ -0,0 +1,39 @@
+#ifndef T_SOCKET_H
+#define T_SOCKET_H
+
+#include <string>
+
+#include "transport/TTransport.h"
+#include "transport/TServerSocket.h"
+
+class TSocketOptions;
+
+/**
+ * TCP Socket implementation of the TTransport interface.
+ *
+ * @author Mark Slee <mcslee@facebook.com>
+ */
+class TSocket : public TTransport {
+  friend TTransport* TServerSocket::accept();
+
+ public:
+  TSocket(std::string host, int port);
+  ~TSocket();
+
+  bool open();
+  void close();
+  int  read (std::string &s, uint32_t size);
+  void write(const std::string& s);
+
+  bool setLinger(bool on, int linger);
+  bool setNoDelay(bool noDelay);
+
+ private:
+  TSocket(int socket);
+  TSocketOptions *options_;
+  std::string host_;
+  int port_;
+  int socket_;
+};
+
+#endif
diff --git a/lib/cpp/transport/TTransport.h b/lib/cpp/transport/TTransport.h
new file mode 100644
index 0000000..a1f43d4
--- /dev/null
+++ b/lib/cpp/transport/TTransport.h
@@ -0,0 +1,25 @@
+#ifndef T_TRANSPORT_H
+#define T_TRANSPORT_H
+
+#include <string>
+
+/**
+ * Generic interface for a method of transporting data.
+ *
+ * @author Mark Slee <mcslee@facebook.com>
+ */
+class TTransport {
+ public:
+  virtual ~TTransport() {};
+
+  virtual bool open() = 0;
+  virtual void close() = 0;
+
+  virtual int  read (std::string& s, uint32_t size) = 0;
+  virtual void write(const std::string& s) = 0;
+
+ protected:
+  TTransport() {};
+};
+
+#endif