Thrift PHP TSocketPool client

Summary: Client that connects to one of an arbitrary pool of servers

Reviewed By: aditya


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664795 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/java/src/server/TSimpleServer.java b/lib/java/src/server/TSimpleServer.java
index 76a5762..05156c3 100644
--- a/lib/java/src/server/TSimpleServer.java
+++ b/lib/java/src/server/TSimpleServer.java
@@ -35,8 +35,12 @@
           io = transportFactory_.getIOTransports(client);
           while (processor_.process(io[0], io[1]));
         }
+      } catch (TTransportException ttx) {
+        // Client died, just move on
       } catch (TException tx) {
         tx.printStackTrace();
+      } catch (Exception x) {
+        x.printStackTrace();
       }
 
       if (io != null) {
diff --git a/lib/java/src/server/TThreadPoolServer.java b/lib/java/src/server/TThreadPoolServer.java
index 2f5be8d..7201cd3 100644
--- a/lib/java/src/server/TThreadPoolServer.java
+++ b/lib/java/src/server/TThreadPoolServer.java
@@ -101,8 +101,12 @@
       try {
         io = transportFactory_.getIOTransports(client_);
         while (processor_.process(io[0], io[1])) {}
+      } catch (TTransportException ttx) {
+        // Assume the client died and continue silently
       } catch (TException tx) {
         tx.printStackTrace();
+      } catch (Exception x) {
+        x.printStackTrace();
       }
 
       if (io != null) {
diff --git a/lib/php/Makefile.am b/lib/php/Makefile.am
index 23208d9..c7cd106 100644
--- a/lib/php/Makefile.am
+++ b/lib/php/Makefile.am
@@ -7,7 +7,8 @@
 transport_SCRIPTS = src/transport/TTransport.php \
                     src/transport/TBufferedTransport.php \
                     src/transport/TChunkedTransport.php \
-                    src/transport/TSocket.php
+                    src/transport/TSocket.php \
+                    src/transport/TSocketPool.php
 
 thriftdir = $(prefix)/php/thrift
 
diff --git a/lib/php/src/protocol/TBinaryProtocol.php b/lib/php/src/protocol/TBinaryProtocol.php
index 2b1384f..4123f5f 100644
--- a/lib/php/src/protocol/TBinaryProtocol.php
+++ b/lib/php/src/protocol/TBinaryProtocol.php
@@ -145,7 +145,9 @@
   public function writeString($out, $value) {
     $len = strlen($value);
     $result = $this->writeI32($out, $len);
-    $out->write($value, $len);
+    if ($len > 0) { // empty strings need only the length prefix
+      $out->write($value, $len);
+    }
     return $result + $len;
   }
 
@@ -317,7 +319,11 @@
 
   public function readString($in, &$value) {
     $result = $this->readI32($in, $len);
-    $value = $in->readAll($len);
+    if ($len > 0) { // never hand a zero or negative length to readAll()
+      $value = $in->readAll($len);
+    } else {
+      $value = '';
+    }
     return $result + $len;
   }
 }
diff --git a/lib/php/src/transport/TBufferedTransport.php b/lib/php/src/transport/TBufferedTransport.php
index dad96ff..3c66135 100644
--- a/lib/php/src/transport/TBufferedTransport.php
+++ b/lib/php/src/transport/TBufferedTransport.php
@@ -105,4 +105,3 @@
 }
 
 ?>
-
diff --git a/lib/php/src/transport/TSocket.php b/lib/php/src/transport/TSocket.php
index 0a3b090..74ef01f 100644
--- a/lib/php/src/transport/TSocket.php
+++ b/lib/php/src/transport/TSocket.php
@@ -20,21 +20,49 @@
    * 
    * @var string
    */
-  private $host_ = 'localhost';
+  protected $host_ = 'localhost';
 
   /**
    * Remote port
    *
    * @var int
    */
-  private $port_ = '9090';
+  protected $port_ = '9090';
+
+  /**
+   * Send timeout in milliseconds, also used as the connect timeout
+   *
+   * @var int
+   */
+  private $sendTimeout_ = 100;
+
+  /**
+   * Recv timeout in milliseconds
+   *
+   * @var int
+   */
+  private $recvTimeout_ = 750;
+
+  /**
+   * TRUE while the stream has the send timeout applied, FALSE for recv
+   *
+   * @var bool
+   */
+  private $sendTimeoutSet_ = FALSE;
 
   /**
    * Persistent socket or plain?
    *
    * @var bool
    */
-  private $persist_ = false;
+  private $persist_ = FALSE;
+
+  /**
+   * Debugging on? Protected so subclasses (e.g. TSocketPool) can read it.
+   *
+   * @var bool
+   */
+  protected $debug_ = FALSE;
 
   /**
    * Socket constructor
@@ -43,13 +71,40 @@
    * @param int    $port    Remote port
    * @param bool   $persist Whether to use a persistent socket
    */
-  public function __construct($host='localhost', $port=9090, $persist=false) {
+  public function __construct($host='localhost', $port=9090, $persist=FALSE) {
     $this->host_ = $host;
     $this->port_ = $port;
     $this->persist_ = $persist;
   }
 
   /**
+   * Sets the send timeout in milliseconds (open() also uses it to connect).
+   *
+   * @param int $timeout Timeout in milliseconds
+   */
+  public function setSendTimeout($timeout) {
+    $this->sendTimeout_ = $timeout;
+  }
+
+  /**
+   * Sets the receive timeout in milliseconds, applied by readAll()/read().
+   *
+   * @param int $timeout Timeout in milliseconds
+   */
+  public function setRecvTimeout($timeout) {
+    $this->recvTimeout_ = $timeout;
+  }
+
+  /**
+   * Turns error_log() reporting of connection failures on or off.
+   *
+   * @param bool $debug
+   */
+  public function setDebug($debug) {
+    $this->debug_ = $debug;
+  }
+
+  /**
    * Tests whether this is open
    *
    * @return bool true if the socket is open
@@ -63,37 +118,73 @@
    */
   public function open() {
     if ($this->persist_) {
-      $this->handle_ = pfsockopen($this->host_, $this->port_);
+      $this->handle_ = pfsockopen($this->host_,
+                                  $this->port_,
+                                  $errno,
+                                  $errstr,
+                                  $this->sendTimeout_/1000.0);
     } else {
-      $this->handle_ = fsockopen($this->host_, $this->port_);
+      $this->handle_ = fsockopen($this->host_,
+                                 $this->port_,
+                                 $errno,
+                                 $errstr,
+                                 $this->sendTimeout_/1000.0);
     }
-    if ($this->handle_ === FALSE) {      
-      throw new Exception('TSocket: Could not connect to '.
-                          $this->host_.':'.$this->port_);
+
+    // Connect failed? Report the reason fsockopen() gave via errno/errstr
+    if ($this->handle_ === FALSE) {
+      $error = 'TSocket: Could not connect to '.$this->host_.':'.$this->port_.' ('.$errstr.' ['.$errno.'])';
+      if ($this->debug_) {
+        error_log($error);
+      }
+      throw new Exception($error);
     }
+    // Start in send mode; readAll() and read() switch to the recv timeout
+    stream_set_timeout($this->handle_, 0, $this->sendTimeout_*1000);
+    $this->sendTimeoutSet_ = TRUE;
   }
 
   /**
-   * Closes the socket
+   * Closes the socket. A persistent socket is left open.
    */
   public function close() {
     if (!$this->persist_) {
-      fclose($this->handle_);
+      if (is_resource($this->handle_)) { fclose($this->handle_); }
+      $this->handle_ = null;
     }
   }
   
   /**
    * Uses stream get contents to do the reading
+   *
+   * @param int $len Exact number of bytes to read
+   * @return string Exactly $len bytes; throws an Exception on a short read
    */
   public function readAll($len) {
-    return stream_get_contents($this->handle_, $len);
+    if ($this->sendTimeoutSet_) { // switch the stream from send to recv timeout
+      stream_set_timeout($this->handle_, 0, $this->recvTimeout_*1000);
+      $this->sendTimeoutSet_ = FALSE;
+    }
+    $buf = @stream_get_contents($this->handle_, $len);
+    if ($buf === FALSE || strlen($buf) !== $len) { // failed or short read
+      throw new Exception('TSocket: Could not read '.$len.' bytes from '.
+                          $this->host_.':'.$this->port_);
+    }
+    return $buf;
   }
 
   /**
    * Read from the socket
+   *
+   * @param int $len How many bytes
+   * @return string Binary data
    */
   public function read($len) {
-    $data = fread($this->handle_, 1);
+    if ($this->sendTimeoutSet_) {
+      stream_set_timeout($this->handle_, 0, $this->recvTimeout_*1000);
+      $this->sendTimeoutSet_ = FALSE;
+    }
+    $data = @fread($this->handle_, 1);
     if ($data === FALSE) {
       throw new Exception('TSocket: Could not read '.$len.' bytes from '.
                           $this->host_.':'.$this->port_);
@@ -103,11 +194,17 @@
 
   /**
    * Write to the socket.
+   *
+   * @param string $buf The data to write
    */
   public function write($buf) {
+    if (!$this->sendTimeoutSet_) {
+      stream_set_timeout($this->handle_, 0, $this->sendTimeout_*1000);
+      $this->sendTimeoutSet_ = TRUE;
+    }
     while (!empty($buf)) {
-      $got = fwrite($this->handle_, $buf);
-      if ($got == false) {
+      $got = @fwrite($this->handle_, $buf);
+      if ($got === 0 || $got === FALSE) {
         throw new Exception('TSocket: Could not write '.strlen($buf).' bytes '.
                             $this->host_.':'.$this->port_);
       }
@@ -119,7 +216,11 @@
    * Flush output to the socket.
    */
   public function flush() {
-    fflush($this->handle_);
+    $flushed = fflush($this->handle_);
+    if ($flushed === FALSE) {
+      $where = $this->host_.':'.$this->port_;
+      throw new Exception('TSocket: Could not flush: '.$where);
+    }
   }
 }
 
diff --git a/lib/php/src/transport/TSocketPool.php b/lib/php/src/transport/TSocketPool.php
new file mode 100644
index 0000000..58f237a
--- /dev/null
+++ b/lib/php/src/transport/TSocketPool.php
@@ -0,0 +1,265 @@
+<?php
+
+/** Inherits from Socket */
+require_once $GLOBALS['THRIFT_ROOT'].'/transport/TSocket.php';
+
+/**
+ * This library uses the APC cache to mark hosts as down in a web
+ * environment. If you are running from the CLI or on a system without APC
+ * installed, these no-op stand-ins take over: every fetch behaves like a
+ * cache miss and every store is silently dropped.
+ */
+if (!function_exists('apc_fetch')) {
+  function apc_fetch($key) { return FALSE; }
+  function apc_store($key, $var, $ttl=0) { return FALSE; }
+}
+
+/**
+ * Sockets implementation of the TTransport interface that allows connection
+ * to a pool of servers.
+ *
+ * @package thrift.transport
+ * @author Mark Slee <mcslee@facebook.com>
+ */
+class TSocketPool extends TSocket {
+
+  /**
+   * Remote hostnames
+   *
+   * @var array
+   */
+  private $hosts_ = array('localhost');
+
+  /**
+   * Remote ports, index-aligned with $hosts_
+   *
+   * @var array
+   */
+  private $ports_ = array('9090');
+
+  /**
+   * How many connection attempts open() makes on each host
+   *
+   * @var int
+   */
+  private $numRetries_ = 1;
+
+  /**
+   * Retry interval in seconds, how long to not try a host if it has been
+   * marked as down.
+   *
+   * @var int
+   */
+  private $retryInterval_ = 60;
+
+  /**
+   * Max consecutive failures before marking a host down.
+   *
+   * @var int
+   */
+  private $maxConsecutiveFailures_ = 1;
+
+  /**
+   * Start at a random host (TRUE) or always at the first one (FALSE)?
+   *
+   * @var bool
+   */
+  private $randomize_ = TRUE;
+
+  /**
+   * Always try last host, even if marked down?
+   *
+   * @var bool
+   */
+  private $alwaysTryLast_ = TRUE;
+
+  /**
+   * Socket pool constructor
+   *
+   * @param array  $hosts   List of remote hostnames
+   * @param mixed  $ports   Array of remote ports, or a single common port
+   * @param bool   $persist Whether to use a persistent socket
+   */
+  public function __construct($hosts=array('localhost'),
+                              $ports=array(9090),
+                              $persist=FALSE) {
+    parent::__construct(null, 0, $persist);
+    $this->hosts_ = $hosts;
+
+    // Ports may be an array or a single port shared by every host
+    if (is_array($ports)) {
+      $this->ports_ = $ports;
+    } else {
+      $this->ports_ = array_pad(array(), count($hosts), $ports);
+    }
+  }
+
+  /**
+   * Sets how many connection attempts open() makes on each host.
+   *
+   * @param int $numRetries
+   */
+  public function setNumRetries($numRetries) {
+    $this->numRetries_ = $numRetries;
+  }
+
+  /**
+   * Sets how long to skip a host after it has been marked down.
+   *
+   * @param int $retryInterval Interval in seconds
+   */
+  public function setRetryInterval($retryInterval) {
+    $this->retryInterval_ = $retryInterval;
+  }
+
+  /**
+   * Sets how many consecutive failed opens mark a host as down.
+   *
+   * @param int $maxConsecutiveFailures
+   */
+  public function setMaxConsecutiveFailures($maxConsecutiveFailures) {
+    $this->maxConsecutiveFailures_ = $maxConsecutiveFailures;
+  }
+
+  /**
+   * Turns randomization in connect order on or off.
+   *
+   * @param bool $randomize
+   */
+  public function setRandomize($randomize) {
+    $this->randomize_ = $randomize;
+  }
+
+  /**
+   * Whether to always try the last server.
+   *
+   * @param bool $alwaysTryLast
+   */
+  public function setAlwaysTryLast($alwaysTryLast) {
+    $this->alwaysTryLast_ = $alwaysTryLast;
+  }
+
+  /**
+   * Connects the socket by iterating through all the servers in the pool
+   * and trying to find one that works; throws an Exception if none does.
+   */
+  public function open() {
+    $numServers = count($this->hosts_);
+    if ($numServers === 0) {
+      throw new Exception('TSocketPool: No hosts in pool');
+    }
+
+    // Start at a random server if randomizing, otherwise at the first one
+    if ($this->randomize_) {
+      $startingPoint = mt_rand(0, $numServers-1);
+    } else {
+      $startingPoint = 0;
+    }
+    $i = $startingPoint;
+
+    do {
+      $host = $this->hosts_[$i];
+      $port = $this->ports_[$i];
+
+      // Check APC cache for a record of this server being down
+      $failtimeKey = 'thrift_failtime:'.$host.':'.$port.'~';
+
+      // Cache miss? Assume it's OK
+      $lastFailtime = apc_fetch($failtimeKey);
+      if ($lastFailtime === FALSE) {
+        $lastFailtime = 0;
+      }
+
+      $retryIntervalPassed = FALSE;
+
+      // Cache hit...make sure the retry interval has elapsed
+      if ($lastFailtime > 0) {
+        $elapsed = time() - $lastFailtime;
+        if ($elapsed > $this->retryInterval_) {
+          $retryIntervalPassed = TRUE;
+          if ($this->debug_) {
+            error_log('TSocketPool: retryInterval '.
+                      '('.$this->retryInterval_.') '.
+                      'has passed for host '.$host.':'.$port);
+          }
+        }
+      }
+
+      // Only connect if not in the middle of a fail interval, OR if this
+      // is the LAST server we are trying, just hammer away on it
+      $isLastServer = FALSE;
+      if ($this->alwaysTryLast_) {
+        $isLastServer = ((($i + 1) % $numServers) == $startingPoint);
+      }
+
+      if (($lastFailtime === 0) ||
+          ($isLastServer) ||
+          ($lastFailtime > 0 && $retryIntervalPassed)) {
+
+        // Set underlying TSocket params to this one
+        $this->host_ = $host;
+        $this->port_ = $port;
+
+        for ($attempt = 0; $attempt < $this->numRetries_; $attempt++) {
+          try {
+            parent::open();
+
+            // Only clear the failure counts if required to do so
+            if ($lastFailtime > 0) {
+              apc_store($failtimeKey, 0);
+            }
+            // Successful connection, return now
+            return;
+
+          } catch (Exception $x) {
+            // Connection failed
+          }
+        }
+
+        // Mark failure of this host in the cache
+        $consecfailsKey = 'thrift_consecfails:'.$host.':'.$port.'~';
+
+        // Ignore cache misses
+        $consecfails = apc_fetch($consecfailsKey);
+        if ($consecfails === FALSE) {
+          $consecfails = 0;
+        }
+
+        // Increment by one
+        $consecfails++;
+
+        // Log and cache this failure
+        if ($consecfails >= $this->maxConsecutiveFailures_) {
+          if ($this->debug_) {
+            error_log('TSocketPool: marking '.$host.':'.$port.
+                      ' as down for '.$this->retryInterval_.' seconds '.
+                      'after '.$consecfails.' failed connect attempts.');
+          }
+          // Store the failure time
+          apc_store($failtimeKey, time());
+
+          // Clear the count of consecutive failures
+          apc_store($consecfailsKey, 0);
+        } else {
+          apc_store($consecfailsKey, $consecfails);
+        }
+      }
+      $i = ($i + 1) % $numServers;
+
+    } while ($i != $startingPoint);
+
+    // Every host either failed to connect or was skipped as marked down
+    $hostlist = array();
+    for ($j = 0; $j < $numServers; ++$j) {
+      $hostlist []= $this->hosts_[$j].':'.$this->ports_[$j];
+    }
+    $error = 'TSocketPool: All hosts in pool are down. ';
+    $error .= '('.implode(',', $hostlist).')';
+    if ($this->debug_) {
+      error_log($error);
+    }
+    throw new Exception($error);
+  }
+}
+
+?>