Persistent conns in TSocketPool

Summary: Added support for persistent conns in TSocketPool
         Also, added some util functions in TNonblockingServer

Reviewed By: mcslee

Test Plan: Ran a test search cluster with these changes - open was only called once (I put fprintfs in open and close), after which the socket was reused

Revert: OK

TracCamp Project: Thrift

DiffCamp Revision: 11425


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@665668 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/transport/TSocketPool.cpp b/lib/cpp/src/transport/TSocketPool.cpp
index e22a4b8..d5a6501 100644
--- a/lib/cpp/src/transport/TSocketPool.cpp
+++ b/lib/cpp/src/transport/TSocketPool.cpp
@@ -23,6 +23,7 @@
 TSocketPoolServer::TSocketPoolServer()
   : host_(""),
     port_(0),
+    socket_(-1),
     lastFailTime_(0),
     consecutiveFailures_(0) {}
 
@@ -32,6 +33,7 @@
 TSocketPoolServer::TSocketPoolServer(const string &host, int port)
   : host_(host),
     port_(port),
+    socket_(-1),
     lastFailTime_(0),
     consecutiveFailures_(0) {}
 
@@ -100,7 +102,12 @@
 }
 
 TSocketPool::~TSocketPool() {
-  close();
+  vector< shared_ptr<TSocketPoolServer> >::const_iterator iter = servers_.begin();
+  vector< shared_ptr<TSocketPoolServer> >::const_iterator iterEnd = servers_.end();
+  for (; iter != iterEnd; ++iter) {
+    setCurrentServer(*iter);
+    TSocketPool::close();
+  }
 }
 
 void TSocketPool::addServer(const string& host, int port) {
@@ -136,6 +143,13 @@
   alwaysTryLast_ = alwaysTryLast;
 }
 
+void TSocketPool::setCurrentServer(const shared_ptr<TSocketPoolServer> &server) {
+  currentServer_ = server;
+  host_ = server->host_;
+  port_ = server->port_;
+  socket_ = server->socket_;
+}
+
 /* TODO: without apc we ignore a lot of functionality from the php version */
 void TSocketPool::open() {
   if (randomize_) {
@@ -145,16 +159,21 @@
   unsigned int numServers = servers_.size();
   for (unsigned int i = 0; i < numServers; ++i) {
 
-    TSocketPoolServer &server = *(servers_[i]);
-    bool retryIntervalPassed = (server.lastFailTime_ == 0);
+    shared_ptr<TSocketPoolServer> &server = servers_[i];
+    bool retryIntervalPassed = (server->lastFailTime_ == 0);
     bool isLastServer = alwaysTryLast_ ? (i == (numServers - 1)) : false;
 
-    host_ = server.host_;
-    port_ = server.port_;
+    // Impersonate the server socket
+    setCurrentServer(server);
 
-    if (server.lastFailTime_ > 0) {
+    if (isOpen()) {
+      // already open means we're done
+      return;
+    }
+
+    if (server->lastFailTime_ > 0) {
       // The server was marked as down, so check if enough time has elapsed to retry
-      int elapsedTime = time(NULL) - server.lastFailTime_;
+      int elapsedTime = time(NULL) - server->lastFailTime_;
       if (elapsedTime > retryInterval_) {
         retryIntervalPassed = true;
       }
@@ -165,23 +184,28 @@
         try {
           TSocket::open();
 
+          // Copy over the opened socket so that we can keep it persistent
+          server->socket_ = socket_;
+
           // reset lastFailTime_ is required
-          if (server.lastFailTime_) {
-            server.lastFailTime_ = 0;
+          if (server->lastFailTime_) {
+            server->lastFailTime_ = 0;
           }
 
           // success
           return;
         } catch (TException e) {
+          string errStr = "TSocketPool::open failed "+getSocketInfo()+": "+e.what();
+          GlobalOutput(errStr.c_str());
           // connection failed
         }
       }
 
-      ++server.consecutiveFailures_;
-      if (server.consecutiveFailures_ > maxConsecutiveFailures_) {
+      ++server->consecutiveFailures_;
+      if (server->consecutiveFailures_ > maxConsecutiveFailures_) {
         // Mark server as down
-        server.consecutiveFailures_ = 0;
-        server.lastFailTime_ = time(NULL);
+        server->consecutiveFailures_ = 0;
+        server->lastFailTime_ = time(NULL);
       }
     }
   }
@@ -190,4 +214,11 @@
   throw TTransportException(TTransportException::NOT_OPEN);
 }
 
+void TSocketPool::close() {
+  if (isOpen()) {
+    TSocket::close();
+    currentServer_->socket_ = -1;
+  }
+}
+
 }}} // facebook::thrift::transport