Fail and retry logic for TSocketPool

Summary: Replicate the PHP logic: if opening a connection to a server fails enough times, mark that server as down for some amount of time.

Reviewed By: aditya

Test Plan: Compile thrift. Any good test ideas?

Revert: OK

DiffCamp Revision: 8381


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@665534 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/transport/TSocketPool.cpp b/lib/cpp/src/transport/TSocketPool.cpp
index 235d060..af7303b 100644
--- a/lib/cpp/src/transport/TSocketPool.cpp
+++ b/lib/cpp/src/transport/TSocketPool.cpp
@@ -14,6 +14,26 @@
 using namespace std;
 
 /**
+ * TSocketPoolServer implementation: the default constructor yields an empty host, port 0 and no recorded failures
+ *
+ * @author Akhil Wable <akhil@facebook.com>
+ */
+TSocketPoolServer::TSocketPoolServer()
+  : host_(""),
+    port_(0),
+    lastFailTime_(0),
+    consecutiveFailures_(0) {}
+
+/**
+ * Constructor for a TSocketPool server entry at the given host and port, with no failures recorded
+ */
+TSocketPoolServer::TSocketPoolServer(const std::string &host, int port)
+  : host_(host),
+    port_(port),
+    lastFailTime_(0),
+    consecutiveFailures_(0) {}
+
+/**
  * TSocketPool implementation.
  *
  * @author Jason Sobel <jsobel@facebook.com>
@@ -38,13 +58,15 @@
 }
 
 TSocketPool::TSocketPool(const vector<pair<string, int> > servers) : TSocket(),
-  servers_(servers),
   numRetries_(1),
   retryInterval_(60),
   maxConsecutiveFailures_(1),
   randomize_(true),
   alwaysTryLast_(true)
 {
+  for (unsigned i = 0; i < servers.size(); ++i) { // pairs are (host, port); addServer() stores each as a TSocketPoolServer
+    addServer(servers[i].first, servers[i].second);
+  }
 }
 
 TSocketPool::TSocketPool(const string& host, int port) : TSocket(),
@@ -62,7 +84,7 @@
 }
 
 void TSocketPool::addServer(const string& host, int port) {
-  servers_.push_back(pair<string, int>(host, port));
+  servers_.push_back(TSocketPoolServer(host, port)); // new entry starts with lastFailTime_ and consecutiveFailures_ at 0
 }
 
 void TSocketPool::setNumRetries(int numRetries) {
@@ -92,20 +114,45 @@
     std::random_shuffle(servers_.begin(), servers_.end());
   }
 
-  for (unsigned int i = 0; i < servers_.size(); ++i) {
-    host_ = servers_[i].first;
-    port_ = servers_[i].second;
+  unsigned int numServers = servers_.size();
+  for (unsigned int i = 0; i < numServers; ++i) {
 
-    for (int j = 0; j < numRetries_; ++j) {
-      try {
-        TSocket::open();
+    TSocketPoolServer &server = servers_[i];
+    bool retryIntervalPassed = (server.lastFailTime_ == 0);
+    bool isLastServer = alwaysTryLast_ ? (i == (numServers - 1)) : false;
 
-        // success
-        return;
-      } catch (TException e) {
-        // connection failed
+    if (server.lastFailTime_ > 0) {
+      // The server was marked as down, so check if enough time has elapsed to retry
+      int elapsedTime = time(NULL) - server.lastFailTime_;
+      if (elapsedTime > retryInterval_) {
+        retryIntervalPassed = true;
       }
     }
+
+    if (retryIntervalPassed || isLastServer) {
+      for (int j = 0; j < numRetries_; ++j) {
+        try {
+          TSocket::open();
+
+          // reset lastFailTime_ if required
+          if (server.lastFailTime_) {
+            server.lastFailTime_ = 0;
+          }
+
+          // success
+          return;
+        } catch (TException e) {
+          // connection failed
+        }
+      }
+    }
+
+    ++server.consecutiveFailures_;
+    if (server.consecutiveFailures_ > maxConsecutiveFailures_) {
+      // Mark server as down
+      server.consecutiveFailures_ = 0;
+      server.lastFailTime_ = time(NULL);
+    }
   }
 
   GlobalOutput("TSocketPool::open: all connections failed");
diff --git a/lib/cpp/src/transport/TSocketPool.h b/lib/cpp/src/transport/TSocketPool.h
index 847f67e..bed4cca 100644
--- a/lib/cpp/src/transport/TSocketPool.h
+++ b/lib/cpp/src/transport/TSocketPool.h
@@ -12,6 +12,37 @@
 
 namespace facebook { namespace thrift { namespace transport {
 
+ /**
+  * Holds one server's address and connection-failure bookkeeping for TSocketPool
+  *
+  * @author Akhil Wable <akhil@facebook.com>
+  */
+class TSocketPoolServer {
+
+  public:
+  /**
+   * Default constructor: empty host, port 0, no recorded failures
+   */
+  TSocketPoolServer();
+
+  /**
+   * Constructs an entry for the given host and port with no recorded failures
+   */
+  TSocketPoolServer(const std::string &host, int port);
+
+  // Host name
+  std::string host_;
+
+  // Port to connect on
+  int port_;
+
+  // time() value recorded when the server was marked down; 0 while it is not marked down
+  int lastFailTime_;
+
+  // Incremented by TSocketPool::open() each time it moves past this server without connecting; reset to 0 when the server is marked down
+  int consecutiveFailures_;
+};
+
 /**
  * TCP Socket implementation of the TTransport interface.
  *
@@ -87,7 +118,7 @@
  protected:
 
    /** List of servers to connect to */
-   std::vector<std::pair<std::string, int> > servers_;
+   std::vector<TSocketPoolServer> servers_;
 
    /** How many times to retry each host in connect */
    int numRetries_;