Thrift PHP TSocketPool client

Summary: Client that connects to one of an arbitrary pool of servers

Reviewed By: aditya


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664795 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/php/src/transport/TSocket.php b/lib/php/src/transport/TSocket.php
index 0a3b090..74ef01f 100644
--- a/lib/php/src/transport/TSocket.php
+++ b/lib/php/src/transport/TSocket.php
@@ -20,21 +20,49 @@
    * 
    * @var string
    */
-  private $host_ = 'localhost';
+  protected $host_ = 'localhost';
 
   /**
    * Remote port
    *
    * @var int
    */
-  private $port_ = '9090';
+  protected $port_ = '9090';
+
+  /**
+   * Send timeout in milliseconds
+   *
+   * @var int
+   */
+  private $sendTimeout_ = 100;
+
+  /**
+   * Recv timeout in milliseconds
+   *
+   * @var int
+   */
+  private $recvTimeout_ = 750;
+
+  /**
+   * Is send timeout set?
+   *
+   * @var bool
+   */
+  private $sendTimeoutSet_ = FALSE;
 
   /**
    * Persistent socket or plain?
    *
    * @var bool
    */
-  private $persist_ = false;
+  private $persist_ = FALSE;
+
+  /**
+   * Debugging on?
+   *
+   * @var bool
+   */
+  private $debug_ = FALSE;
 
   /**
    * Socket constructor
@@ -43,13 +71,40 @@
    * @param int    $port    Remote port
    * @param bool   $persist Whether to use a persistent socket
    */
-  public function __construct($host='localhost', $port=9090, $persist=false) {
+  public function __construct($host='localhost', $port=9090, $persist=FALSE) {
     $this->host_ = $host;
     $this->port_ = $port;
     $this->persist_ = $persist;
   }
 
   /**
+   * Sets the send timeout.
+   *
+   * @param int $timeout
+   */
+  public function setSendTimeout($timeout) {
+    $this->sendTimeout_ = $timeout;
+  }
+
+  /**
+   * Sets the receive timeout.
+   *
+   * @param int $timeout
+   */
+  public function setRecvTimeout($timeout) {
+    $this->recvTimeout_ = $timeout;
+  }
+
+  /**
+   * Sets debugging output on or off
+   *
+   * @param bool $debug
+   */
+  public function setDebug($debug) {
+    $this->debug_ = $debug;
+  }
+
+  /**
    * Tests whether this is open
    *
    * @return bool true if the socket is open
@@ -63,37 +118,73 @@
    */
   public function open() {
     if ($this->persist_) {
-      $this->handle_ = pfsockopen($this->host_, $this->port_);
+      $this->handle_ = pfsockopen($this->host_,
+                                  $this->port_,
+                                  $errno,
+                                  $errstr,
+                                  $this->sendTimeout_/1000.0);
     } else {
-      $this->handle_ = fsockopen($this->host_, $this->port_);
+      $this->handle_ = fsockopen($this->host_,
+                                 $this->port_,
+                                 $errno,
+                                 $errstr,
+                                 $this->sendTimeout_/1000.0);
     }
-    if ($this->handle_ === FALSE) {      
-      throw new Exception('TSocket: Could not connect to '.
-                          $this->host_.':'.$this->port_);
+
+    // Connect failed?
+    if ($this->handle_ === FALSE) {
+      $error = 'TSocket: Could not connect to '.$this->host_.':'.$this->port_;
+      if ($this->debug_) {
+        error_log($error);
+      }
+      throw new Exception($error);
     }
+    
+    stream_set_timeout($this->handle_, 0, $this->sendTimeout_*1000);
+    $this->sendTimeoutSet_ = TRUE;
   }
 
   /**
-   * Closes the socket
+   * Closes the socket.
    */
   public function close() {
     if (!$this->persist_) {
-      fclose($this->handle_);
+      @fclose($this->handle_);
+      $this->handle_ = null;
     }
   }
   
   /**
    * Uses stream get contents to do the reading
+   *
+   * @param int $len How many bytes
+   * @return string Binary data
    */
   public function readAll($len) {
-    return stream_get_contents($this->handle_, $len);
+    if ($this->sendTimeoutSet_) {
+      stream_set_timeout($this->handle_, 0, $this->recvTimeout_*1000);
+      $this->sendTimeoutSet_ = FALSE;
+    }
+    $buf = @stream_get_contents($this->handle_, $len);
+    if ($buf === FALSE || strlen($buf) !== $len) {
+      throw new Exception('TSocket: Could not read '.$len.' bytes from '.
+                          $this->host_.':'.$this->port_);
+    }
+    return $buf;
   }
 
   /**
    * Read from the socket
+   *
+   * @param int $len How many bytes
+   * @return string Binary data
    */
   public function read($len) {
-    $data = fread($this->handle_, 1);
+    if ($this->sendTimeoutSet_) {
+      stream_set_timeout($this->handle_, 0, $this->recvTimeout_*1000);
+      $this->sendTimeoutSet_ = FALSE;
+    }
+    $data = @fread($this->handle_, 1);
     if ($data === FALSE) {
       throw new Exception('TSocket: Could not read '.$len.' bytes from '.
                           $this->host_.':'.$this->port_);
@@ -103,11 +194,17 @@
 
   /**
    * Write to the socket.
+   *
+   * @param string $buf The data to write
    */
   public function write($buf) {
+    if (!$this->sendTimeoutSet_) {
+      stream_set_timeout($this->handle_, 0, $this->sendTimeout_*1000);
+      $this->sendTimeoutSet_ = TRUE;
+    }
     while (!empty($buf)) {
-      $got = fwrite($this->handle_, $buf);
-      if ($got == false) {
+      $got = @fwrite($this->handle_, $buf);
+      if ($got === 0 || $got === FALSE) {
         throw new Exception('TSocket: Could not write '.strlen($buf).' bytes '.
                             $this->host_.':'.$this->port_);
       }
@@ -119,7 +216,11 @@
    * Flush output to the socket.
    */
   public function flush() {
-    fflush($this->handle_);
+    $ret = fflush($this->handle_);
+    if ($ret === FALSE) {
+      throw new Exception('TSocket: Could not flush: '.
+                          $this->host_.':'.$this->port_);
+    }
   }
 }