Thrift now a TLP - INFRA-3116

git-svn-id: https://svn.apache.org/repos/asf/thrift/branches/0.1.x@1028168 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/php/src/transport/TSocketPool.php b/lib/php/src/transport/TSocketPool.php
new file mode 100644
index 0000000..7f1157c
--- /dev/null
+++ b/lib/php/src/transport/TSocketPool.php
@@ -0,0 +1,296 @@
+<?php
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ * @package thrift.transport
+ */
+
+
+/** Inherits from Socket */
+include_once $GLOBALS['THRIFT_ROOT'].'/transport/TSocket.php';
+
+/**
+ * This library makes use of APC cache to make hosts as down in a web
+ * environment. If you are running from the CLI or on a system without APC
+ * installed, then these null functions will step in and act like cache
+ * misses.
+ */
+if (!function_exists('apc_fetch')) {
+  function apc_fetch($key) { return FALSE; }
+  function apc_store($key, $var, $ttl=0) { return FALSE; }
+}
+
+/**
+ * Sockets implementation of the TTransport interface that allows connection
+ * to a pool of servers.
+ *
+ * @package thrift.transport
+ */
+class TSocketPool extends TSocket {
+
+  /**
+   * Remote servers. Array of associative arrays with 'host' and 'port' keys
+   */
+  private $servers_ = array();
+
+  /**
+   * How many times to retry each host in connect
+   *
+   * @var int
+   */
+  private $numRetries_ = 1;
+
+  /**
+   * Retry interval in seconds, how long to not try a host if it has been
+   * marked as down.
+   *
+   * @var int
+   */
+  private $retryInterval_ = 60;
+
+  /**
+   * Max consecutive failures before marking a host down.
+   *
+   * @var int
+   */
+  private $maxConsecutiveFailures_ = 1;
+
+  /**
+   * Try hosts in order? or Randomized?
+   *
+   * @var bool
+   */
+  private $randomize_ = TRUE;
+
+  /**
+   * Always try last host, even if marked down?
+   *
+   * @var bool
+   */
+  private $alwaysTryLast_ = TRUE;
+
+  /**
+   * Socket pool constructor
+   *
+   * @param array  $hosts        List of remote hostnames
+   * @param mixed  $ports        Array of remote ports, or a single common port
+   * @param bool   $persist      Whether to use a persistent socket
+   * @param mixed  $debugHandler Function for error logging
+   */
+  public function __construct($hosts=array('localhost'),
+                              $ports=array(9090),
+                              $persist=FALSE,
+                              $debugHandler=null) {
+    parent::__construct(null, 0, $persist, $debugHandler);
+
+    if (!is_array($ports)) {
+      $port = $ports;
+      $ports = array();
+      foreach ($hosts as $key => $val) {
+        $ports[$key] = $port;
+      }
+    }
+
+    foreach ($hosts as $key => $host) {
+      $this->servers_ []= array('host' => $host,
+                                'port' => $ports[$key]);
+    }
+  }
+
+  /**
+   * Add a server to the pool
+   *
+   * This function does not prevent you from adding a duplicate server entry.
+   *
+   * @param string $host hostname or IP
+   * @param int $port port
+   */
+  public function addServer($host, $port) {
+    $this->servers_[] = array('host' => $host, 'port' => $port);
+  }
+
+  /**
+   * Sets how many time to keep retrying a host in the connect function.
+   *
+   * @param int $numRetries
+   */
+  public function setNumRetries($numRetries) {
+    $this->numRetries_ = $numRetries;
+  }
+
+  /**
+   * Sets how long to wait until retrying a host if it was marked down
+   *
+   * @param int $numRetries
+   */
+  public function setRetryInterval($retryInterval) {
+    $this->retryInterval_ = $retryInterval;
+  }
+
+  /**
+   * Sets how many time to keep retrying a host before marking it as down.
+   *
+   * @param int $numRetries
+   */
+  public function setMaxConsecutiveFailures($maxConsecutiveFailures) {
+    $this->maxConsecutiveFailures_ = $maxConsecutiveFailures;
+  }
+
+  /**
+   * Turns randomization in connect order on or off.
+   *
+   * @param bool $randomize
+   */
+  public function setRandomize($randomize) {
+    $this->randomize_ = $randomize;
+  }
+
+  /**
+   * Whether to always try the last server.
+   *
+   * @param bool $alwaysTryLast
+   */
+  public function setAlwaysTryLast($alwaysTryLast) {
+    $this->alwaysTryLast_ = $alwaysTryLast;
+  }
+
+
+  /**
+   * Connects the socket by iterating through all the servers in the pool
+   * and trying to find one that works.
+   */
+  public function open() {
+    // Check if we want order randomization
+    if ($this->randomize_) {
+      shuffle($this->servers_);
+    }
+
+    // Count servers to identify the "last" one
+    $numServers = count($this->servers_);
+
+    for ($i = 0; $i < $numServers; ++$i) {
+
+      // This extracts the $host and $port variables
+      extract($this->servers_[$i]);
+
+      // Check APC cache for a record of this server being down
+      $failtimeKey = 'thrift_failtime:'.$host.':'.$port.'~';
+
+      // Cache miss? Assume it's OK
+      $lastFailtime = apc_fetch($failtimeKey);
+      if ($lastFailtime === FALSE) {
+        $lastFailtime = 0;
+      }
+
+      $retryIntervalPassed = FALSE;
+
+      // Cache hit...make sure enough the retry interval has elapsed
+      if ($lastFailtime > 0) {
+        $elapsed = time() - $lastFailtime;
+        if ($elapsed > $this->retryInterval_) {
+          $retryIntervalPassed = TRUE;
+          if ($this->debug_) {
+            call_user_func($this->debugHandler_,
+                           'TSocketPool: retryInterval '.
+                           '('.$this->retryInterval_.') '.
+                           'has passed for host '.$host.':'.$port);
+          }
+        }
+      }
+
+      // Only connect if not in the middle of a fail interval, OR if this
+      // is the LAST server we are trying, just hammer away on it
+      $isLastServer = FALSE;
+      if ($this->alwaysTryLast_) {
+        $isLastServer = ($i == ($numServers - 1));
+      }
+
+      if (($lastFailtime === 0) ||
+          ($isLastServer) ||
+          ($lastFailtime > 0 && $retryIntervalPassed)) {
+
+        // Set underlying TSocket params to this one
+        $this->host_ = $host;
+        $this->port_ = $port;
+
+        // Try up to numRetries_ connections per server
+        for ($attempt = 0; $attempt < $this->numRetries_; $attempt++) {
+          try {
+            // Use the underlying TSocket open function
+            parent::open();
+
+            // Only clear the failure counts if required to do so
+            if ($lastFailtime > 0) {
+              apc_store($failtimeKey, 0);
+            }
+
+            // Successful connection, return now
+            return;
+
+          } catch (TException $tx) {
+            // Connection failed
+          }
+        }
+
+        // Mark failure of this host in the cache
+        $consecfailsKey = 'thrift_consecfails:'.$host.':'.$port.'~';
+
+        // Ignore cache misses
+        $consecfails = apc_fetch($consecfailsKey);
+        if ($consecfails === FALSE) {
+          $consecfails = 0;
+        }
+
+        // Increment by one
+        $consecfails++;
+
+        // Log and cache this failure
+        if ($consecfails >= $this->maxConsecutiveFailures_) {
+          if ($this->debug_) {
+            call_user_func($this->debugHandler_,
+                           'TSocketPool: marking '.$host.':'.$port.
+                           ' as down for '.$this->retryInterval_.' secs '.
+                           'after '.$consecfails.' failed attempts.');
+          }
+          // Store the failure time
+          apc_store($failtimeKey, time());
+
+          // Clear the count of consecutive failures
+          apc_store($consecfailsKey, 0);
+        } else {
+          apc_store($consecfailsKey, $consecfails);
+        }
+      }
+    }
+
+    // Holy shit we failed them all. The system is totally ill!
+    $error = 'TSocketPool: All hosts in pool are down. ';
+    $hosts = array();
+    foreach ($this->servers_ as $server) {
+      $hosts []= $server['host'].':'.$server['port'];
+    }
+    $hostlist = implode(',', $hosts);
+    $error .= '('.$hostlist.')';
+    if ($this->debug_) {
+      call_user_func($this->debugHandler_, $error);
+    }
+    throw new TException($error);
+  }
+}
+
+?>