THRIFT-900. cpp: Unix domain socket

This patch adds a new Unix Socket transport.

Patch: Roger Meier

git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@1002179 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/transport/TServerSocket.cpp b/lib/cpp/src/transport/TServerSocket.cpp
index 2f14fd5..836f6ba 100644
--- a/lib/cpp/src/transport/TServerSocket.cpp
+++ b/lib/cpp/src/transport/TServerSocket.cpp
@@ -20,6 +20,7 @@
 #include <cstring>
 #include <sys/types.h>
 #include <sys/socket.h>
+#include <sys/un.h>
 #include <sys/poll.h>
 #include <sys/types.h>
 #include <netinet/in.h>
@@ -68,6 +69,20 @@
   intSock1_(-1),
   intSock2_(-1) {}
 
+TServerSocket::TServerSocket(string path) :
+  port_(0),
+  path_(path),
+  serverSocket_(-1),
+  acceptBacklog_(1024),
+  sendTimeout_(0),
+  recvTimeout_(0),
+  retryLimit_(0),
+  retryDelay_(0),
+  tcpSendBuffer_(0),
+  tcpRecvBuffer_(0),
+  intSock1_(-1),
+  intSock2_(-1) {}
+
 TServerSocket::~TServerSocket() {
   close();
 }
@@ -131,7 +146,12 @@
       break;
   }
 
-  serverSocket_ = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
+  if (! path_.empty()) {
+    serverSocket_ = socket(PF_UNIX, SOCK_STREAM, IPPROTO_IP);
+  } else {
+    serverSocket_ = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
+  }
+
   if (serverSocket_ == -1) {
     int errno_copy = errno;
     GlobalOutput.perror("TServerSocket::listen() socket() ", errno_copy);
@@ -201,13 +221,16 @@
     throw TTransportException(TTransportException::NOT_OPEN, "Could not set SO_LINGER", errno_copy);
   }
 
-  // TCP Nodelay, speed over bandwidth
-  if (-1 == setsockopt(serverSocket_, IPPROTO_TCP, TCP_NODELAY,
-                       &one, sizeof(one))) {
-    int errno_copy = errno;
-    GlobalOutput.perror("TServerSocket::listen() setsockopt() TCP_NODELAY ", errno_copy);
-    close();
-    throw TTransportException(TTransportException::NOT_OPEN, "Could not set TCP_NODELAY", errno_copy);
+  // Unix Sockets do not need that
+  if (path_.empty()) {
+    // TCP Nodelay, speed over bandwidth
+    if (-1 == setsockopt(serverSocket_, IPPROTO_TCP, TCP_NODELAY,
+                         &one, sizeof(one))) {
+      int errno_copy = errno;
+      GlobalOutput.perror("TServerSocket::listen() setsockopt() TCP_NODELAY ", errno_copy);
+      close();
+      throw TTransportException(TTransportException::NOT_OPEN, "Could not set TCP_NODELAY", errno_copy);
+    }
   }
 
   // Set NONBLOCK on the accept socket
@@ -228,21 +251,49 @@
   // we may want to try to bind more than once, since SO_REUSEADDR doesn't
   // always seem to work. The client can configure the retry variables.
   int retries = 0;
-  do {
-    if (0 == bind(serverSocket_, res->ai_addr, res->ai_addrlen)) {
-      break;
+
+  if (! path_.empty()) {
+    // Unix Domain Socket
+    struct sockaddr_un address;
+    socklen_t len;
+
+    if (path_.length() > sizeof(address.sun_path)) {
+      int errno_copy = errno;
+      GlobalOutput.perror("TSocket::listen() Unix Domain socket path too long", errno_copy);
+      throw TTransportException(TTransportException::NOT_OPEN, " Unix Domain socket path too long");
     }
 
-    // use short circuit evaluation here to only sleep if we need to
-  } while ((retries++ < retryLimit_) && (sleep(retryDelay_) == 0));
+    address.sun_family = AF_UNIX;
+    sprintf(address.sun_path, path_.c_str());
+    len = sizeof(address);
 
-  // free addrinfo
-  freeaddrinfo(res0);
+    do {
+      if (0 == bind(serverSocket_, (struct sockaddr *) &address, len)) {
+        break;
+      }
+      // use short circuit evaluation here to only sleep if we need to
+    } while ((retries++ < retryLimit_) && (sleep(retryDelay_) == 0));
+  } else {
+    do {
+      if (0 == bind(serverSocket_, res->ai_addr, res->ai_addrlen)) {
+        break;
+      }
+      // use short circuit evaluation here to only sleep if we need to
+    } while ((retries++ < retryLimit_) && (sleep(retryDelay_) == 0));
+
+    // free addrinfo
+    freeaddrinfo(res0);
+  }
 
   // throw an error if we failed to bind properly
   if (retries > retryLimit_) {
     char errbuf[1024];
-    sprintf(errbuf, "TServerSocket::listen() BIND %d", port_);
+    if (! path_.empty()) {
+      sprintf(errbuf, "TServerSocket::listen() PATH %s", path_.c_str());
+    }
+    else {
+      sprintf(errbuf, "TServerSocket::listen() BIND %d", port_);
+    }
     GlobalOutput(errbuf);
     close();
     throw TTransportException(TTransportException::NOT_OPEN, "Could not bind");
diff --git a/lib/cpp/src/transport/TServerSocket.h b/lib/cpp/src/transport/TServerSocket.h
index a6be017..8cd521f 100644
--- a/lib/cpp/src/transport/TServerSocket.h
+++ b/lib/cpp/src/transport/TServerSocket.h
@@ -36,6 +36,7 @@
  public:
   TServerSocket(int port);
   TServerSocket(int port, int sendTimeout, int recvTimeout);
+  TServerSocket(std::string path);
 
   ~TServerSocket();
 
@@ -58,6 +59,7 @@
 
  private:
   int port_;
+  std::string path_;
   int serverSocket_;
   int acceptBacklog_;
   int sendTimeout_;
diff --git a/lib/cpp/src/transport/TSocket.cpp b/lib/cpp/src/transport/TSocket.cpp
index 5da33bb..951ddcf 100644
--- a/lib/cpp/src/transport/TSocket.cpp
+++ b/lib/cpp/src/transport/TSocket.cpp
@@ -21,6 +21,7 @@
 #include <cstring>
 #include <sstream>
 #include <sys/socket.h>
+#include <sys/un.h>
 #include <sys/poll.h>
 #include <sys/types.h>
 #include <arpa/inet.h>
@@ -50,6 +51,23 @@
 TSocket::TSocket(string host, int port) :
   host_(host),
   port_(port),
+  path_(""),
+  socket_(-1),
+  connTimeout_(0),
+  sendTimeout_(0),
+  recvTimeout_(0),
+  lingerOn_(1),
+  lingerVal_(0),
+  noDelay_(1),
+  maxRecvRetries_(5) {
+  recvTimeval_.tv_sec = (int)(recvTimeout_/1000);
+  recvTimeval_.tv_usec = (int)((recvTimeout_%1000)*1000);
+}
+
+TSocket::TSocket(string path) :
+  host_(""),
+  port_(0),
+  path_(path),
   socket_(-1),
   connTimeout_(0),
   sendTimeout_(0),
@@ -65,6 +83,7 @@
 TSocket::TSocket() :
   host_(""),
   port_(0),
+  path_(""),
   socket_(-1),
   connTimeout_(0),
   sendTimeout_(0),
@@ -80,6 +99,7 @@
 TSocket::TSocket(int socket) :
   host_(""),
   port_(0),
+  path_(""),
   socket_(socket),
   connTimeout_(0),
   sendTimeout_(0),
@@ -130,7 +150,12 @@
     return;
   }
 
-  socket_ = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
+  if (! path_.empty()) {
+    socket_ = socket(PF_UNIX, SOCK_STREAM, IPPROTO_IP);
+  } else {
+    socket_ = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
+  }
+
   if (socket_ == -1) {
     int errno_copy = errno;
     GlobalOutput.perror("TSocket::open() socket() " + getSocketInfo(), errno_copy);
@@ -179,7 +204,24 @@
   }
 
   // Connect the socket
-  int ret = connect(socket_, res->ai_addr, res->ai_addrlen);
+  int ret;
+  if (! path_.empty()) {
+    struct sockaddr_un address;
+    socklen_t len;
+
+    if (path_.length() > sizeof(address.sun_path)) {
+      int errno_copy = errno;
+      GlobalOutput.perror("TSocket::open() Unix Domain socket path too long", errno_copy);
+      throw TTransportException(TTransportException::NOT_OPEN, " Unix Domain socket path too long");
+    }
+
+    address.sun_family = AF_UNIX;
+    sprintf(address.sun_path, path_.c_str());
+    len = sizeof(address);
+    ret = connect(socket_, (struct sockaddr *) &address, len);
+  } else {
+    ret = connect(socket_, res->ai_addr, res->ai_addrlen);
+  }
 
   // success case
   if (ret == 0) {
@@ -237,6 +279,24 @@
   if (isOpen()) {
     return;
   }
+  if (! path_.empty()) {
+    unix_open();
+  } else {
+    local_open();
+  }
+}
+
+void TSocket::unix_open(){
+  if (! path_.empty()) {
+    // Unix Domain SOcket does not need addrinfo struct, so we pass NULL
+    openConnection(NULL);
+  }
+}
+
+void TSocket::local_open(){
+  if (isOpen()) {
+    return;
+  }
 
   // Validate port number
   if (port_ < 0 || port_ > 0xFFFF) {
diff --git a/lib/cpp/src/transport/TSocket.h b/lib/cpp/src/transport/TSocket.h
index 0184362..f69a9a1 100644
--- a/lib/cpp/src/transport/TSocket.h
+++ b/lib/cpp/src/transport/TSocket.h
@@ -60,6 +60,14 @@
   TSocket(std::string host, int port);
 
   /**
+   * Constructs a new Unix domain socket.
+   * Note that this does NOT actually connect the socket.
+   *
+   * @param path The Unix domain socket e.g. "/tmp/ThriftTest.binary.thrift"
+   */
+  TSocket(std::string path);
+
+  /**
    * Destroyes the socket object, closing it if necessary.
    */
   virtual ~TSocket();
@@ -217,6 +225,9 @@
   /** Port number to connect on */
   int port_;
 
+  /** UNIX domain socket path */
+  std::string path_;
+
   /** Underlying UNIX socket handle */
   int socket_;
 
@@ -246,6 +257,10 @@
 
   /** Whether to use low minimum TCP retransmission timeout */
   static bool useLowMinRto_;
+
+ private:
+  void unix_open();
+  void local_open();
 };
 
 }}} // apache::thrift::transport