THRIFT-638. php: BufferedTransport + C extensions block until recv timeout is reached on last fread call

This patch refactors TSocket to make use of stream_select() for timeout detection. 

Patch: Nicholas Telford

git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1076917 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/php/src/transport/TSocket.php b/lib/php/src/transport/TSocket.php
index 8297631..f713016 100644
--- a/lib/php/src/transport/TSocket.php
+++ b/lib/php/src/transport/TSocket.php
@@ -50,25 +50,40 @@
   protected $port_ = '9090';
 
   /**
-   * Send timeout in milliseconds
+   * Send timeout in seconds.
+   *
+   * Combined with sendTimeoutUsec this is used for send timeouts.
    *
    * @var int
    */
-  private $sendTimeout_ = 100;
+  private $sendTimeoutSec_ = 0;
 
   /**
-   * Recv timeout in milliseconds
+   * Send timeout in microseconds.
+   *
+   * Combined with sendTimeoutSec this is used for send timeouts.
    *
    * @var int
    */
-  private $recvTimeout_ = 750;
+  private $sendTimeoutUsec_ = 100000;
 
   /**
-   * Is send timeout set?
+   * Recv timeout in seconds
    *
-   * @var bool
+   * Combined with recvTimeoutUsec this is used for recv timeouts.
+   *
+   * @var int
    */
-  private $sendTimeoutSet_ = FALSE;
+  private $recvTimeoutSec_ = 0;
+
+  /**
+   * Recv timeout in microseconds
+   *
+   * Combined with recvTimeoutSec this is used for recv timeouts.
+   *
+   * @var int
+   */
+  private $recvTimeoutUsec_ = 750000;
 
   /**
    * Persistent socket or plain?
@@ -123,7 +138,9 @@
    * @param int $timeout  Timeout in milliseconds.
    */
   public function setSendTimeout($timeout) {
-    $this->sendTimeout_ = $timeout;
+    $this->sendTimeoutSec_ = floor($timeout / 1000);
+    $this->sendTimeoutUsec_ =
+            ($timeout - ($this->sendTimeoutSec_ * 1000)) * 1000;
   }
 
   /**
@@ -132,7 +149,9 @@
    * @param int $timeout  Timeout in milliseconds.
    */
   public function setRecvTimeout($timeout) {
-    $this->recvTimeout_ = $timeout;
+    $this->recvTimeoutSec_ = floor($timeout / 1000);
+    $this->recvTimeoutUsec_ =
+            ($timeout - ($this->recvTimeoutSec_ * 1000)) * 1000;
   }
 
   /**
@@ -192,13 +211,13 @@
                                    $this->port_,
                                    $errno,
                                    $errstr,
-                                   $this->sendTimeout_/1000.0);
+                                   $this->sendTimeoutSec_ + ($this->sendTimeoutUsec_ / 1000000));
     } else {
       $this->handle_ = @fsockopen($this->host_,
                                   $this->port_,
                                   $errno,
                                   $errstr,
-                                  $this->sendTimeout_/1000.0);
+                                  $this->sendTimeoutSec_ + ($this->sendTimeoutUsec_ / 1000000));
     }
 
     // Connect failed?
@@ -209,9 +228,6 @@
       }
       throw new TException($error);
     }
-
-    stream_set_timeout($this->handle_, 0, $this->sendTimeout_*1000);
-    $this->sendTimeoutSet_ = TRUE;
   }
 
   /**
@@ -225,66 +241,30 @@
   }
 
   /**
-   * Uses stream get contents to do the reading
+   * Read from the socket at most $len bytes.
    *
-   * @param int $len How many bytes
-   * @return string Binary data
-   */
-  public function readAll($len) {
-    if ($this->sendTimeoutSet_) {
-      stream_set_timeout($this->handle_, 0, $this->recvTimeout_*1000);
-      $this->sendTimeoutSet_ = FALSE;
-    }
-    // This call does not obey stream_set_timeout values!
-    // $buf = @stream_get_contents($this->handle_, $len);
-
-    $pre = null;
-    while (TRUE) {
-      $buf = @fread($this->handle_, $len);
-      if ($buf === FALSE) {
-        $md = stream_get_meta_data($this->handle_);
-        if (true === $md['timed_out'] && false === $md['blocked']) {
-          throw new TTransportException('TSocket: timed out reading '.$len.' bytes from '.
-                               $this->host_.':'.$this->port_);
-        } else {
-          throw new TTransportException('TSocket: Could not read '.$len.' bytes from '.
-                               $this->host_.':'.$this->port_);
-        }
-      }
-      else if (($sz = strlen($buf)) < $len) {
-        if((strlen($buf) == 0) && feof($this->handle_)){
-          throw new TTransportException('TSocket read 0 bytes');
-        };
-
-        $md = stream_get_meta_data($this->handle_);
-        if (true === $md['timed_out'] && false === $md['blocked']) {
-          throw new TTransportException('TSocket: timed out reading '.$len.' bytes from '.
-                               $this->host_.':'.$this->port_);
-        } else {
-          $pre .= $buf;
-          $len -= $sz;
-        }
-      } else {
-        return $pre.$buf;
-      }
-    }
-  }
-
-  /**
-   * Read from the socket
+   * This method will not wait for all the requested data, it will return as
+   * soon as any data is received.
    *
-   * @param int $len How many bytes
+   * @param int $len Maximum number of bytes to read.
    * @return string Binary data
    */
   public function read($len) {
-    if ($this->sendTimeoutSet_) {
-      stream_set_timeout($this->handle_, 0, $this->recvTimeout_*1000);
-      $this->sendTimeoutSet_ = FALSE;
-    }
-    $data = @fread($this->handle_, $len);
-    if ($data === FALSE) {
-      $md = stream_get_meta_data($this->handle_);
-      if (true === $md['timed_out'] && false === $md['blocked']) {
+    $null = null;
+    $read = array($this->handle_);
+    $readable = @stream_select($read, $null, $null, $this->recvTimeoutSec_, $this->recvTimeoutUsec_);
+
+    if ($readable > 0) {
+      $data = @stream_socket_recvfrom($this->handle_, $len);
+      if ($data === false) {
+          throw new TTransportException('TSocket: Could not read '.$len.' bytes from '.
+                               $this->host_.':'.$this->port_);
+      } elseif($data == '' && feof($this->handle_)) {
+          throw new TTransportException('TSocket read 0 bytes');
+        }
+
+      return $data;
+    } else if ($readable === 0) {
         throw new TTransportException('TSocket: timed out reading '.$len.' bytes from '.
                              $this->host_.':'.$this->port_);
       } else {
@@ -292,12 +272,6 @@
                              $this->host_.':'.$this->port_);
       }
     }
-    elseif((strlen($data) == 0) && feof($this->handle_))
-    {
-      throw new TTransportException('TSocket read 0 bytes');
-    };
-    return $data;
-  }
 
   /**
    * Write to the socket.
@@ -305,15 +279,23 @@
    * @param string $buf The data to write
    */
   public function write($buf) {
-    if (!$this->sendTimeoutSet_) {
-      stream_set_timeout($this->handle_, 0, $this->sendTimeout_*1000);
-      $this->sendTimeoutSet_ = TRUE;
-    }
+    $null = null;
+    $write = array($this->handle_);
+
+    // keep writing until all the data has been written
     while (strlen($buf) > 0) {
-      $got = @fwrite($this->handle_, $buf);
-      if ($got === 0 || $got === FALSE) {
-        $md = stream_get_meta_data($this->handle_);
-        if ($md['timed_out']) {
+      // wait for stream to become available for writing
+      $writable = @stream_select($null, $write, $null, $this->sendTimeoutSec_, $this->sendTimeoutUsec_);
+      if ($writable > 0) {
+        // write buffer to stream
+        $written = @stream_socket_sendto($this->handle_, $buf);
+        if ($written === -1 || $written === false) {
+          throw new TTransportException('TSocket: Could not write '.strlen($buf).' bytes '.
+                                   $this->host_.':'.$this->port_);
+        }
+        // determine how much of the buffer is left to write
+        $buf = substr($buf, $written);
+      } else if ($writable === 0) {
           throw new TTransportException('TSocket: timed out writing '.strlen($buf).' bytes from '.
                                $this->host_.':'.$this->port_);
         } else {
@@ -321,18 +303,18 @@
                                  $this->host_.':'.$this->port_);
         }
       }
-      $buf = substr($buf, $got);
     }
-  }
 
   /**
    * Flush output to the socket.
+   *
+   * Since read(), readAll() and write() operate on the sockets directly,
+   * this is a no-op
+   *
+   * If you wish to have flushable buffering behaviour, wrap this TSocket
+   * in a TBufferedTransport.
    */
   public function flush() {
-    $ret = fflush($this->handle_);
-    if ($ret === FALSE) {
-      throw new TException('TSocket: Could not flush: '.
-                           $this->host_.':'.$this->port_);
+    // no-op
     }
   }
-}