Persistent conns in TSocketPool

Summary: Added support for persistent conns in TSocketPool
         Also, added some util functions in TNonblockingServer

Reviewed By: mcslee

Test Plan: Ran a test search cluster with these changes - open was only called once (I put fprintfs in open and close), after which the socket was reused

Revert: OK

TracCamp Project: Thrift

DiffCamp Revision: 11425


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@665668 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/server/TNonblockingServer.h b/lib/cpp/src/server/TNonblockingServer.h
index 7c6bc7f..0c3c6d5 100644
--- a/lib/cpp/src/server/TNonblockingServer.h
+++ b/lib/cpp/src/server/TNonblockingServer.h
@@ -64,6 +64,9 @@
   // Event struct, for use with eventBase_
   struct event serverEvent_;
 
+  // Number of TConnection object we've created
+  size_t numTConnections_;
+
   /**
    * This is a stack of all the objects that have been created but that
    * are NOT currently in use. When we close a connection, we place it on this
@@ -82,7 +85,8 @@
     port_(port),
     frameResponses_(true),
     threadPoolProcessing_(false),
-    eventBase_(NULL) {}
+    eventBase_(NULL),
+    numTConnections_(0) {}
 
   TNonblockingServer(boost::shared_ptr<TProcessor> processor,
                      boost::shared_ptr<TProtocolFactory> protocolFactory,
@@ -93,7 +97,8 @@
     port_(port),
     frameResponses_(true),
     threadManager_(threadManager),
-    eventBase_(NULL) {
+    eventBase_(NULL),
+    numTConnections_(0) {
     setInputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
     setOutputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
     setInputProtocolFactory(protocolFactory);
@@ -113,7 +118,8 @@
     port_(port),
     frameResponses_(true),
     threadManager_(threadManager),
-    eventBase_(NULL) {
+    eventBase_(NULL),
+    numTConnections_(0) {
     setInputTransportFactory(inputTransportFactory);
     setOutputTransportFactory(outputTransportFactory);
     setInputProtocolFactory(inputProtocolFactory);
@@ -128,6 +134,10 @@
     threadPoolProcessing_ = (threadManager != NULL);
   }
 
+  boost::shared_ptr<ThreadManager> getThreadManager() {
+    return threadManager_;
+  }
+
   bool isThreadPoolProcessing() const {
     return threadPoolProcessing_;
   }
@@ -148,6 +158,18 @@
     return eventBase_;
   }
 
+  void incrementNumConnections(size_t incr=1) {
+    numTConnections_ += incr;
+  }
+
+  size_t getNumConnections() {
+    return numTConnections_;
+  }
+
+  size_t getNumIdleConnections() {
+    return connectionStack_.size();
+  }
+
   TConnection* createConnection(int socket, short flags);
 
   void returnConnection(TConnection* connection);
@@ -163,7 +185,6 @@
   void registerEvents(event_base* base);
 
   void serve();
-
 };
 
 /**
@@ -304,6 +325,11 @@
     outputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer());
 
     init(socket, eventFlags, s);
+    server_->incrementNumConnections();
+  }
+
+  ~TConnection() {
+    server_->incrementNumConnections(-1);
   }
 
   // Initialize
diff --git a/lib/cpp/src/transport/TSocketPool.cpp b/lib/cpp/src/transport/TSocketPool.cpp
index e22a4b8..d5a6501 100644
--- a/lib/cpp/src/transport/TSocketPool.cpp
+++ b/lib/cpp/src/transport/TSocketPool.cpp
@@ -23,6 +23,7 @@
 TSocketPoolServer::TSocketPoolServer()
   : host_(""),
     port_(0),
+    socket_(-1),
     lastFailTime_(0),
     consecutiveFailures_(0) {}
 
@@ -32,6 +33,7 @@
 TSocketPoolServer::TSocketPoolServer(const string &host, int port)
   : host_(host),
     port_(port),
+    socket_(-1),
     lastFailTime_(0),
     consecutiveFailures_(0) {}
 
@@ -100,7 +102,12 @@
 }
 
 TSocketPool::~TSocketPool() {
-  close();
+  vector< shared_ptr<TSocketPoolServer> >::const_iterator iter = servers_.begin();
+  vector< shared_ptr<TSocketPoolServer> >::const_iterator iterEnd = servers_.end();
+  for (; iter != iterEnd; ++iter) {
+    setCurrentServer(*iter);
+    TSocketPool::close();
+  }
 }
 
 void TSocketPool::addServer(const string& host, int port) {
@@ -136,6 +143,13 @@
   alwaysTryLast_ = alwaysTryLast;
 }
 
+void TSocketPool::setCurrentServer(const shared_ptr<TSocketPoolServer> &server) {
+  currentServer_ = server;
+  host_ = server->host_;
+  port_ = server->port_;
+  socket_ = server->socket_;
+}
+
 /* TODO: without apc we ignore a lot of functionality from the php version */
 void TSocketPool::open() {
   if (randomize_) {
@@ -145,16 +159,21 @@
   unsigned int numServers = servers_.size();
   for (unsigned int i = 0; i < numServers; ++i) {
 
-    TSocketPoolServer &server = *(servers_[i]);
-    bool retryIntervalPassed = (server.lastFailTime_ == 0);
+    shared_ptr<TSocketPoolServer> &server = servers_[i];
+    bool retryIntervalPassed = (server->lastFailTime_ == 0);
     bool isLastServer = alwaysTryLast_ ? (i == (numServers - 1)) : false;
 
-    host_ = server.host_;
-    port_ = server.port_;
+    // Impersonate the server socket
+    setCurrentServer(server);
 
-    if (server.lastFailTime_ > 0) {
+    if (isOpen()) {
+      // already open means we're done
+      return;
+    }
+
+    if (server->lastFailTime_ > 0) {
       // The server was marked as down, so check if enough time has elapsed to retry
-      int elapsedTime = time(NULL) - server.lastFailTime_;
+      int elapsedTime = time(NULL) - server->lastFailTime_;
       if (elapsedTime > retryInterval_) {
         retryIntervalPassed = true;
       }
@@ -165,23 +184,28 @@
         try {
           TSocket::open();
 
+          // Copy over the opened socket so that we can keep it persistent
+          server->socket_ = socket_;
+
           // reset lastFailTime_ is required
-          if (server.lastFailTime_) {
-            server.lastFailTime_ = 0;
+          if (server->lastFailTime_) {
+            server->lastFailTime_ = 0;
           }
 
           // success
           return;
         } catch (TException e) {
+          string errStr = "TSocketPool::open failed "+getSocketInfo()+": "+e.what();
+          GlobalOutput(errStr.c_str());
           // connection failed
         }
       }
 
-      ++server.consecutiveFailures_;
-      if (server.consecutiveFailures_ > maxConsecutiveFailures_) {
+      ++server->consecutiveFailures_;
+      if (server->consecutiveFailures_ > maxConsecutiveFailures_) {
         // Mark server as down
-        server.consecutiveFailures_ = 0;
-        server.lastFailTime_ = time(NULL);
+        server->consecutiveFailures_ = 0;
+        server->lastFailTime_ = time(NULL);
       }
     }
   }
@@ -190,4 +214,11 @@
   throw TTransportException(TTransportException::NOT_OPEN);
 }
 
+void TSocketPool::close() {
+  if (isOpen()) {
+    TSocket::close();
+    currentServer_->socket_ = -1;
+  }
+}
+
 }}} // facebook::thrift::transport
diff --git a/lib/cpp/src/transport/TSocketPool.h b/lib/cpp/src/transport/TSocketPool.h
index 6c3b0f7..a455aca 100644
--- a/lib/cpp/src/transport/TSocketPool.h
+++ b/lib/cpp/src/transport/TSocketPool.h
@@ -36,6 +36,9 @@
   // Port to connect on
   int port_;
 
+  // Socket for the server
+  int socket_;
+
   // Last time connecting to this server failed
   int lastFailTime_;
 
@@ -138,11 +141,21 @@
     */
    void open();
 
+   /*
+    * Closes the UNIX socket
+    */
+   void close();
+
  protected:
 
+  void setCurrentServer(const boost::shared_ptr<TSocketPoolServer> &server);
+
    /** List of servers to connect to */
   std::vector< boost::shared_ptr<TSocketPoolServer> > servers_;
 
+  /** Current server */
+  boost::shared_ptr<TSocketPoolServer> currentServer_;
+
    /** How many times to retry each host in connect */
    int numRetries_;