THRIFT-4276: Add SSL support to the C++ Nonblocking Server
Client: C++ Lib
Patch: Divya Thaluru

GitHub Pull Request:

    This closes #1251
diff --git a/lib/cpp/src/thrift/server/TNonblockingServer.cpp b/lib/cpp/src/thrift/server/TNonblockingServer.cpp
index 97c4cd9..d5af12a 100644
--- a/lib/cpp/src/thrift/server/TNonblockingServer.cpp
+++ b/lib/cpp/src/thrift/server/TNonblockingServer.cpp
@@ -209,10 +209,8 @@
   class Task;
 
   /// Constructor
-  TConnection(THRIFT_SOCKET socket,
-              TNonblockingIOThread* ioThread,
-              const sockaddr* addr,
-              socklen_t addrLen) {
+  TConnection(boost::shared_ptr<TSocket> socket,
+              TNonblockingIOThread* ioThread) {
     readBuffer_ = NULL;
     readBufferSize_ = 0;
 
@@ -224,8 +222,10 @@
     inputTransport_.reset(new TMemoryBuffer(readBuffer_, readBufferSize_));
     outputTransport_.reset(
         new TMemoryBuffer(static_cast<uint32_t>(server_->getWriteBufferDefaultSize())));
-    tSocket_.reset(new TSocket());
-    init(socket, ioThread, addr, addrLen);
+
+    tSocket_ =  socket;
+
+    init(ioThread);
   }
 
   ~TConnection() { std::free(readBuffer_); }
@@ -242,10 +242,10 @@
   void checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit);
 
   /// Initialize
-  void init(THRIFT_SOCKET socket,
-            TNonblockingIOThread* ioThread,
-            const sockaddr* addr,
-            socklen_t addrLen);
+  void init(TNonblockingIOThread* ioThread);
+
+  /// Set the socket used by this connection
+  void setSocket(boost::shared_ptr<TSocket> socket);
 
   /**
    * This is called when the application transitions from one state into
@@ -367,13 +367,7 @@
   void* connectionContext_;
 };
 
-void TNonblockingServer::TConnection::init(THRIFT_SOCKET socket,
-                                           TNonblockingIOThread* ioThread,
-                                           const sockaddr* addr,
-                                           socklen_t addrLen) {
-  tSocket_->setSocketFD(socket);
-  tSocket_->setCachedAddress(addr, addrLen);
-
+void TNonblockingServer::TConnection::init(TNonblockingIOThread* ioThread) {
   ioThread_ = ioThread;
   server_ = ioThread->getServer();
   appState_ = APP_INIT;
@@ -416,6 +410,10 @@
   processor_ = server_->getProcessor(inputProtocol_, outputProtocol_, tSocket_);
 }
 
+void TNonblockingServer::TConnection::setSocket(boost::shared_ptr<TSocket> socket) {
+  tSocket_ = socket;
+}
+
 void TNonblockingServer::TConnection::workSocket() {
   int got = 0, left = 0, sent = 0;
   uint32_t fetch = 0;
@@ -441,10 +439,14 @@
       }
       readBufferPos_ += fetch;
     } catch (TTransportException& te) {
-      GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
-      close();
+      // With a nonblocking SSL socket, some operations must be retried.
+      // For now this is detected by parsing the exception message; a better solution needs to be investigated.
+      if(!strstr(te.what(), "retry")) {
+        GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
+        close();
 
-      return;
+        return;
+      }
     }
 
     if (readBufferPos_ < sizeof(framing.size)) {
@@ -481,8 +483,12 @@
       fetch = readWant_ - readBufferPos_;
       got = tSocket_->read(readBuffer_ + readBufferPos_, fetch);
     } catch (TTransportException& te) {
-      GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
-      close();
+      // With a nonblocking SSL socket, some operations must be retried.
+      // For now this is detected by parsing the exception message; a better solution needs to be investigated.
+      if(!strstr(te.what(), "retry")) {
+        GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
+        close();
+      }
 
       return;
     }
@@ -748,7 +754,7 @@
     appState_ = APP_READ_REQUEST;
 
     // Work the socket right away
-    // workSocket();
+    workSocket();
 
     return;
 
@@ -883,9 +889,7 @@
  * Creates a new connection either by reusing an object off the stack or
  * by allocating a new one entirely
  */
-TNonblockingServer::TConnection* TNonblockingServer::createConnection(THRIFT_SOCKET socket,
-                                                                      const sockaddr* addr,
-                                                                      socklen_t addrLen) {
+TNonblockingServer::TConnection* TNonblockingServer::createConnection(boost::shared_ptr<TSocket> socket) {
   // Check the stack
   Guard g(connMutex_);
 
@@ -899,12 +903,13 @@
   // Check the connection stack to see if we can re-use
   TConnection* result = NULL;
   if (connectionStack_.empty()) {
-    result = new TConnection(socket, ioThread, addr, addrLen);
+    result = new TConnection(socket, ioThread);
     ++numTConnections_;
   } else {
     result = connectionStack_.top();
     connectionStack_.pop();
-    result->init(socket, ioThread, addr, addrLen);
+    result->setSocket(socket);
+    result->init(ioThread);
   }
   activeConnections_.push_back(result);
   return result;
@@ -939,53 +944,35 @@
   // Make sure that libevent didn't mess up the socket handles
   assert(fd == serverSocket_);
 
-  // Server socket accepted a new connection
-  socklen_t addrLen;
-  sockaddr_storage addrStorage;
-  sockaddr* addrp = (sockaddr*)&addrStorage;
-  addrLen = sizeof(addrStorage);
-
   // Going to accept a new client socket
-  THRIFT_SOCKET clientSocket;
+  boost::shared_ptr<TSocket> clientSocket;
 
-  // Accept as many new clients as possible, even though libevent signaled only
-  // one, this helps us to avoid having to go back into the libevent engine so
-  // many times
-  while ((clientSocket = ::accept(fd, addrp, &addrLen)) != -1) {
+  clientSocket = serverTransport_->accept();
+  if (clientSocket) {
     // If we're overloaded, take action here
     if (overloadAction_ != T_OVERLOAD_NO_ACTION && serverOverloaded()) {
       Guard g(connMutex_);
       nConnectionsDropped_++;
       nTotalConnectionsDropped_++;
       if (overloadAction_ == T_OVERLOAD_CLOSE_ON_ACCEPT) {
-        ::THRIFT_CLOSESOCKET(clientSocket);
+        clientSocket->close();
         return;
       } else if (overloadAction_ == T_OVERLOAD_DRAIN_TASK_QUEUE) {
         if (!drainPendingTask()) {
           // Nothing left to discard, so we drop connection instead.
-          ::THRIFT_CLOSESOCKET(clientSocket);
+          clientSocket->close();
           return;
         }
       }
     }
 
-    // Explicitly set this socket to NONBLOCK mode
-    int flags;
-    if ((flags = THRIFT_FCNTL(clientSocket, THRIFT_F_GETFL, 0)) < 0
-        || THRIFT_FCNTL(clientSocket, THRIFT_F_SETFL, flags | THRIFT_O_NONBLOCK) < 0) {
-      GlobalOutput.perror("thriftServerEventHandler: set THRIFT_O_NONBLOCK (THRIFT_FCNTL) ",
-                          THRIFT_GET_SOCKET_ERROR);
-      ::THRIFT_CLOSESOCKET(clientSocket);
-      return;
-    }
-
     // Create a new TConnection for this client socket.
-    TConnection* clientConnection = createConnection(clientSocket, addrp, addrLen);
+    TConnection* clientConnection = createConnection(clientSocket);
 
     // Fail fast if we could not create a TConnection object
     if (clientConnection == NULL) {
       GlobalOutput.printf("thriftServerEventHandler: failed TConnection factory");
-      ::THRIFT_CLOSESOCKET(clientSocket);
+      clientSocket->close();
       return;
     }
 
@@ -1009,15 +996,6 @@
         clientConnection->close();
       }
     }
-
-    // addrLen is written by the accept() call, so needs to be set before the next call.
-    addrLen = sizeof(addrStorage);
-  }
-
-  // Done looping accept, now we have to make sure the error is due to
-  // blocking. Any other error is a problem
-  if (THRIFT_GET_SOCKET_ERROR != THRIFT_EAGAIN && THRIFT_GET_SOCKET_ERROR != THRIFT_EWOULDBLOCK) {
-    GlobalOutput.perror("thriftServerEventHandler: accept() ", THRIFT_GET_SOCKET_ERROR);
   }
 }
 
@@ -1025,130 +1003,10 @@
  * Creates a socket to listen on and binds it to the local port.
  */
 void TNonblockingServer::createAndListenOnSocket() {
-#ifdef _WIN32
-  TWinsockSingleton::create();
-#endif // _WIN32
-
-  THRIFT_SOCKET s;
-
-  struct addrinfo hints, *res, *res0;
-  int error;
-
-  char port[sizeof("65536") + 1];
-  memset(&hints, 0, sizeof(hints));
-  hints.ai_family = PF_UNSPEC;
-  hints.ai_socktype = SOCK_STREAM;
-  hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
-  sprintf(port, "%d", port_);
-
-  // Wildcard address
-  error = getaddrinfo(NULL, port, &hints, &res0);
-  if (error) {
-    throw TException("TNonblockingServer::serve() getaddrinfo "
-                     + string(THRIFT_GAI_STRERROR(error)));
-  }
-
-  // Pick the ipv6 address first since ipv4 addresses can be mapped
-  // into ipv6 space.
-  for (res = res0; res; res = res->ai_next) {
-    if (res->ai_family == AF_INET6 || res->ai_next == NULL)
-      break;
-  }
-
-  // Create the server socket
-  s = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
-  if (s == -1) {
-    freeaddrinfo(res0);
-    throw TException("TNonblockingServer::serve() socket() -1");
-  }
-
-#ifdef IPV6_V6ONLY
-  if (res->ai_family == AF_INET6) {
-    int zero = 0;
-    if (-1 == setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, const_cast_sockopt(&zero), sizeof(zero))) {
-      GlobalOutput.perror("TServerSocket::listen() IPV6_V6ONLY", THRIFT_GET_SOCKET_ERROR);
-    }
-  }
-#endif // #ifdef IPV6_V6ONLY
-
-  int one = 1;
-
-  // Set THRIFT_NO_SOCKET_CACHING to avoid 2MSL delay on server restart
-  setsockopt(s, SOL_SOCKET, THRIFT_NO_SOCKET_CACHING, const_cast_sockopt(&one), sizeof(one));
-
-  if (::bind(s, res->ai_addr, static_cast<int>(res->ai_addrlen)) == -1) {
-    ::THRIFT_CLOSESOCKET(s);
-    freeaddrinfo(res0);
-    throw TTransportException(TTransportException::NOT_OPEN,
-                              "TNonblockingServer::serve() bind",
-                              THRIFT_GET_SOCKET_ERROR);
-  }
-
-  // Done with the addr info
-  freeaddrinfo(res0);
-
-  // Set up this file descriptor for listening
-  listenSocket(s);
+  serverTransport_->listen();
+  serverSocket_ = serverTransport_->getSocketFD();
 }
 
-/**
- * Takes a socket created by listenSocket() and sets various options on it
- * to prepare for use in the server.
- */
-void TNonblockingServer::listenSocket(THRIFT_SOCKET s) {
-  // Set socket to nonblocking mode
-  int flags;
-  if ((flags = THRIFT_FCNTL(s, THRIFT_F_GETFL, 0)) < 0
-      || THRIFT_FCNTL(s, THRIFT_F_SETFL, flags | THRIFT_O_NONBLOCK) < 0) {
-    ::THRIFT_CLOSESOCKET(s);
-    throw TException("TNonblockingServer::serve() THRIFT_O_NONBLOCK");
-  }
-
-  int one = 1;
-  struct linger ling = {0, 0};
-
-  // Keepalive to ensure full result flushing
-  setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, const_cast_sockopt(&one), sizeof(one));
-
-  // Turn linger off to avoid hung sockets
-  setsockopt(s, SOL_SOCKET, SO_LINGER, const_cast_sockopt(&ling), sizeof(ling));
-
-// Set TCP nodelay if available, MAC OS X Hack
-// See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
-#ifndef TCP_NOPUSH
-  setsockopt(s, IPPROTO_TCP, TCP_NODELAY, const_cast_sockopt(&one), sizeof(one));
-#endif
-
-#ifdef TCP_LOW_MIN_RTO
-  if (TSocket::getUseLowMinRto()) {
-    setsockopt(s, IPPROTO_TCP, TCP_LOW_MIN_RTO, const_cast_sockopt(&one), sizeof(one));
-  }
-#endif
-
-  if (listen(s, LISTEN_BACKLOG) == -1) {
-    ::THRIFT_CLOSESOCKET(s);
-    throw TTransportException(TTransportException::NOT_OPEN, "TNonblockingServer::serve() listen");
-  }
-
-  // Cool, this socket is good to go, set it as the serverSocket_
-  serverSocket_ = s;
-
-  if (!port_) {
-    struct sockaddr_storage addr;
-    socklen_t size = sizeof(addr);
-    if (!getsockname(serverSocket_, reinterpret_cast<sockaddr*>(&addr), &size)) {
-      if (addr.ss_family == AF_INET6) {
-        const struct sockaddr_in6* sin = reinterpret_cast<const struct sockaddr_in6*>(&addr);
-        listenPort_ = ntohs(sin->sin6_port);
-      } else {
-        const struct sockaddr_in* sin = reinterpret_cast<const struct sockaddr_in*>(&addr);
-        listenPort_ = ntohs(sin->sin_port);
-      }
-    } else {
-      GlobalOutput.perror("TNonblocking: failed to get listen port: ", THRIFT_GET_SOCKET_ERROR);
-    }
-  }
-}
 
 void TNonblockingServer::setThreadManager(boost::shared_ptr<ThreadManager> threadManager) {
   threadManager_ = threadManager;
@@ -1205,10 +1063,7 @@
   connection->forceClose();
 }
 
-void TNonblockingServer::stop() {
-  if (!port_) {
-    listenPort_ = 0;
-  }
+void TNonblockingServer::stop() { 
   // Breaks the event loop in all threads so that they end ASAP.
   for (uint32_t i = 0; i < ioThreads_.size(); ++i) {
     ioThreads_[i]->stop();
@@ -1249,8 +1104,7 @@
   assert(ioThreads_.size() == numIOThreads_);
   assert(ioThreads_.size() > 0);
 
-  GlobalOutput.printf("TNonblockingServer: Serving on port %d, %d io threads.",
-                      listenPort_,
+  GlobalOutput.printf("TNonblockingServer: Serving with %d io threads.",
                       ioThreads_.size());
 
   // Launch all the secondary IO threads in separate threads