THRIFT-638. php: BufferedTransport + C extensions block until recv timeout is reached on last fread call
This patch refactors TSocket to make use of stream_select() for timeout detection.
Patch: Nicholas Telford
git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1076917 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/php/src/transport/TSocket.php b/lib/php/src/transport/TSocket.php
index 8297631..f713016 100644
--- a/lib/php/src/transport/TSocket.php
+++ b/lib/php/src/transport/TSocket.php
@@ -50,25 +50,40 @@
protected $port_ = '9090';
/**
- * Send timeout in milliseconds
+ * Send timeout in seconds.
+ *
+ * Combined with sendTimeoutUsec this is used for send timeouts.
*
* @var int
*/
- private $sendTimeout_ = 100;
+ private $sendTimeoutSec_ = 0;
/**
- * Recv timeout in milliseconds
+ * Send timeout in microseconds.
+ *
+ * Combined with sendTimeoutSec this is used for send timeouts.
*
* @var int
*/
- private $recvTimeout_ = 750;
+ private $sendTimeoutUsec_ = 100000;
/**
- * Is send timeout set?
+ * Recv timeout in seconds
*
- * @var bool
+ * Combined with recvTimeoutUsec this is used for recv timeouts.
+ *
+ * @var int
*/
- private $sendTimeoutSet_ = FALSE;
+ private $recvTimeoutSec_ = 0;
+
+ /**
+ * Recv timeout in microseconds
+ *
+ * Combined with recvTimeoutSec this is used for recv timeouts.
+ *
+ * @var int
+ */
+ private $recvTimeoutUsec_ = 750000;
/**
* Persistent socket or plain?
@@ -123,7 +138,9 @@
* @param int $timeout Timeout in milliseconds.
*/
public function setSendTimeout($timeout) {
- $this->sendTimeout_ = $timeout;
+ $this->sendTimeoutSec_ = floor($timeout / 1000);
+ $this->sendTimeoutUsec_ =
+ ($timeout - ($this->sendTimeoutSec_ * 1000)) * 1000;
}
/**
@@ -132,7 +149,9 @@
* @param int $timeout Timeout in milliseconds.
*/
public function setRecvTimeout($timeout) {
- $this->recvTimeout_ = $timeout;
+ $this->recvTimeoutSec_ = floor($timeout / 1000);
+ $this->recvTimeoutUsec_ =
+ ($timeout - ($this->recvTimeoutSec_ * 1000)) * 1000;
}
/**
@@ -192,13 +211,13 @@
$this->port_,
$errno,
$errstr,
- $this->sendTimeout_/1000.0);
+ $this->sendTimeoutSec_ + ($this->sendTimeoutUsec_ / 1000000));
} else {
$this->handle_ = @fsockopen($this->host_,
$this->port_,
$errno,
$errstr,
- $this->sendTimeout_/1000.0);
+ $this->sendTimeoutSec_ + ($this->sendTimeoutUsec_ / 1000000));
}
// Connect failed?
@@ -209,9 +228,6 @@
}
throw new TException($error);
}
-
- stream_set_timeout($this->handle_, 0, $this->sendTimeout_*1000);
- $this->sendTimeoutSet_ = TRUE;
}
/**
@@ -225,66 +241,30 @@
}
/**
- * Uses stream get contents to do the reading
+ * Read from the socket at most $len bytes.
*
- * @param int $len How many bytes
- * @return string Binary data
- */
- public function readAll($len) {
- if ($this->sendTimeoutSet_) {
- stream_set_timeout($this->handle_, 0, $this->recvTimeout_*1000);
- $this->sendTimeoutSet_ = FALSE;
- }
- // This call does not obey stream_set_timeout values!
- // $buf = @stream_get_contents($this->handle_, $len);
-
- $pre = null;
- while (TRUE) {
- $buf = @fread($this->handle_, $len);
- if ($buf === FALSE) {
- $md = stream_get_meta_data($this->handle_);
- if (true === $md['timed_out'] && false === $md['blocked']) {
- throw new TTransportException('TSocket: timed out reading '.$len.' bytes from '.
- $this->host_.':'.$this->port_);
- } else {
- throw new TTransportException('TSocket: Could not read '.$len.' bytes from '.
- $this->host_.':'.$this->port_);
- }
- }
- else if (($sz = strlen($buf)) < $len) {
- if((strlen($buf) == 0) && feof($this->handle_)){
- throw new TTransportException('TSocket read 0 bytes');
- };
-
- $md = stream_get_meta_data($this->handle_);
- if (true === $md['timed_out'] && false === $md['blocked']) {
- throw new TTransportException('TSocket: timed out reading '.$len.' bytes from '.
- $this->host_.':'.$this->port_);
- } else {
- $pre .= $buf;
- $len -= $sz;
- }
- } else {
- return $pre.$buf;
- }
- }
- }
-
- /**
- * Read from the socket
+ * This method will not wait for all the requested data, it will return as
+ * soon as any data is received.
*
- * @param int $len How many bytes
+ * @param int $len Maximum number of bytes to read.
* @return string Binary data
*/
public function read($len) {
- if ($this->sendTimeoutSet_) {
- stream_set_timeout($this->handle_, 0, $this->recvTimeout_*1000);
- $this->sendTimeoutSet_ = FALSE;
- }
- $data = @fread($this->handle_, $len);
- if ($data === FALSE) {
- $md = stream_get_meta_data($this->handle_);
- if (true === $md['timed_out'] && false === $md['blocked']) {
+ $null = null;
+ $read = array($this->handle_);
+ $readable = @stream_select($read, $null, $null, $this->recvTimeoutSec_, $this->recvTimeoutUsec_);
+
+ if ($readable > 0) {
+ $data = @stream_socket_recvfrom($this->handle_, $len);
+ if ($data === false) {
+ throw new TTransportException('TSocket: Could not read '.$len.' bytes from '.
+ $this->host_.':'.$this->port_);
+ } elseif($data == '' && feof($this->handle_)) {
+ throw new TTransportException('TSocket read 0 bytes');
+ }
+
+ return $data;
+ } else if ($readable === 0) {
throw new TTransportException('TSocket: timed out reading '.$len.' bytes from '.
$this->host_.':'.$this->port_);
} else {
@@ -292,12 +272,6 @@
$this->host_.':'.$this->port_);
}
}
- elseif((strlen($data) == 0) && feof($this->handle_))
- {
- throw new TTransportException('TSocket read 0 bytes');
- };
- return $data;
- }
/**
* Write to the socket.
@@ -305,15 +279,23 @@
* @param string $buf The data to write
*/
public function write($buf) {
- if (!$this->sendTimeoutSet_) {
- stream_set_timeout($this->handle_, 0, $this->sendTimeout_*1000);
- $this->sendTimeoutSet_ = TRUE;
- }
+ $null = null;
+ $write = array($this->handle_);
+
+ // keep writing until all the data has been written
while (strlen($buf) > 0) {
- $got = @fwrite($this->handle_, $buf);
- if ($got === 0 || $got === FALSE) {
- $md = stream_get_meta_data($this->handle_);
- if ($md['timed_out']) {
+ // wait for stream to become available for writing
+ $writable = @stream_select($null, $write, $null, $this->sendTimeoutSec_, $this->sendTimeoutUsec_);
+ if ($writable > 0) {
+ // write buffer to stream
+ $written = @stream_socket_sendto($this->handle_, $buf);
+ if ($written === -1 || $written === false) {
+ throw new TTransportException('TSocket: Could not write '.strlen($buf).' bytes '.
+ $this->host_.':'.$this->port_);
+ }
+ // determine how much of the buffer is left to write
+ $buf = substr($buf, $written);
+ } else if ($writable === 0) {
throw new TTransportException('TSocket: timed out writing '.strlen($buf).' bytes from '.
$this->host_.':'.$this->port_);
} else {
@@ -321,18 +303,18 @@
$this->host_.':'.$this->port_);
}
}
- $buf = substr($buf, $got);
}
- }
/**
* Flush output to the socket.
+ *
+ * Since read(), readAll() and write() operate on the sockets directly,
+ * this is a no-op
+ *
+ * If you wish to have flushable buffering behaviour, wrap this TSocket
+ * in a TBufferedTransport.
*/
public function flush() {
- $ret = fflush($this->handle_);
- if ($ret === FALSE) {
- throw new TException('TSocket: Could not flush: '.
- $this->host_.':'.$this->port_);
+ // no-op
}
}
-}