Thrift now works in PHP, hot stuff
Summary: End to end communication working in Thrift with PHP
Problem: It's a bit slower than pillar still. Need to find out why.
Reviewed By: aditya
Test Plan: Unit tests are in the test directory. Get lucas on the PHP case...
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664720 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/php/src/Thrift.php b/lib/php/src/Thrift.php
new file mode 100644
index 0000000..8a25611
--- /dev/null
+++ b/lib/php/src/Thrift.php
@@ -0,0 +1,9 @@
+<?php
+
+if (!defined('THRIFT_ROOT')) {
+ define('THRIFT_ROOT', dirname(__FILE__));
+}
+
+include_once THRIFT_ROOT.'/protocol/TProtocol.php';
+
+?>
diff --git a/lib/php/src/protocol/TBinaryProtocol.php b/lib/php/src/protocol/TBinaryProtocol.php
new file mode 100644
index 0000000..867a8aa
--- /dev/null
+++ b/lib/php/src/protocol/TBinaryProtocol.php
@@ -0,0 +1,195 @@
+<?php
+
+/** For transport operations */
+require_once THRIFT_ROOT.'/transport/TTransport.php';
+
+/**
+ * Binary implementation of the Thrift protocol.
+ *
+ * @package thrift.protocol
+ * @author Mark Slee <mcslee@facebook.com>
+ */
+class TBinaryProtocol extends TProtocol {
+
+ public function writeStructBegin($out, $name) {
+ return 0;
+ }
+
+ public function writeStructEnd($out) {
+ return 0;
+ }
+
+ public function writeFieldBegin($out, $fieldName, $fieldType, $fieldId) {
+ return
+ $this->writeByte($out, $fieldType) +
+ $this->writeI32($out, $fieldId);
+ }
+
+ public function writeFieldEnd($out) {
+ return 0;
+ }
+
+ public function writeFieldStop($out) {
+ return
+ $this->writeByte($out, TType::STOP);
+ }
+
+ public function writeMapBegin($out, $keyType, $valType, $size) {
+ return
+ $this->writeByte($out, $keyType) +
+ $this->writeByte($out, $valType) +
+ $this->writeI32($out, $size);
+ }
+
+ public function writeMapEnd($out) {
+ return 0;
+ }
+
+ public function writeListBegin($out, $elemType, $size) {
+ return
+ $this->writeByte($out, $elemType) +
+ $this->writeI32($out, $size);
+ }
+
+ public function writeListEnd($out) {
+ return 0;
+ }
+
+ public function writeSetBegin($out, $elemType, $size) {
+ return
+ $this->writeByte($out, $elemType) +
+ $this->writeI32($out, $size);
+ }
+
+ public function writeSetEnd($out) {
+ return 0;
+ }
+
+ public function writeByte($out, $byte) {
+ $data = pack('c', $byte);
+ $out->write($data, 1);
+ return 1;
+ }
+
+ public function writeI32($out, $i32) {
+ $data = pack('l', $i32);
+ //if (!defined('BIG_ENDIAN')) {
+ $data = strrev($data);
+ //}
+ $out->write($data, 4);
+ return 4;
+ }
+
+ public function writeI64($out, $i64) {
+ $hi = $i64 >> 32;
+ $lo = $i64 & 0xFFFFFFFF;
+ if (!defined('BIG_ENDIAN')) {
+ $data = pack('N2', $hi, $lo);
+ } else {
+ $data = pack('N2', $lo, $hi);
+ }
+ $out->write($data, 8);
+ return 8;
+ }
+
+ public function writeString($out, $str) {
+ $len = strlen($str);
+ $result = $this->writeI32($out, $len);
+ $out->write($str, $len);
+ return $result + $len;
+ }
+
+ public function readStructBegin($in, &$name) {
+ $name = '';
+ return 0;
+ }
+
+ public function readStructEnd($in) {
+ return 0;
+ }
+
+ public function readFieldBegin($in, &$name, &$fieldType, &$fieldId) {
+ $result = $this->readByte($in, $fieldType);
+ if ($fieldType == TType::STOP) {
+ $fieldId = 0;
+ return $result;
+ }
+ $result += $this->readI32($in, $fieldId);
+ return $result;
+ }
+
+ public function readFieldEnd($in) {
+ return 0;
+ }
+
+ public function readMapBegin($in, &$keyType, &$valType, &$size) {
+ $result = $this->readByte($in, $keyType);
+ $result += $this->readByte($in, $valType);
+ $result += $this->readI32($in, $size);
+ return $result;
+ }
+
+ public function readMapEnd($in) {
+ return 0;
+ }
+
+ public function readListBegin($in, &$elemType, &$size) {
+ $result = $this->readByte($in, $elemType);
+ $result += $this->readI32($in, $size);
+ return $result;
+ }
+
+ public function readListEnd($in) {
+ return 0;
+ }
+
+ public function readSetBegin($in, &$elemType, &$size) {
+ $result = $this->readByte($in, $elemType);
+ $result += $this->readI32($in, $size);
+ return $result;
+ }
+
+ public function readSetEnd($in) {
+ return 0;
+ }
+
+ public function readByte($in, &$byte) {
+ $data = $in->readAll(1);
+ $arr = unpack('c', $data);
+ $byte = $arr[1];
+ return 1;
+ }
+
+ public function readI32($in, &$i32) {
+ $data = $in->readAll(4);
+ if (!defined('BIG_ENDIAN')) {
+ $data = strrev($data);
+ }
+ $arr = unpack('l', $data);
+ $i32 = $arr[1];
+ return 4;
+ }
+
+ public function readI64($in, &$i64) {
+ $data = $in->readAll(8);
+ $arr = unpack('N2', $data);
+
+ // Check for a negative
+ if ($arr[1] & 0x80000000) {
+ $arr[1] = $arr[1] ^ 0xFFFFFFFF;
+ $arr[2] = $arr[2] ^ 0xFFFFFFFF;
+ $i64 = 0 - $arr[1]*4294967296 - $arr[2] - 1;
+ } else {
+ $i64 = $arr[1]*4294967296 + $arr[2];
+ }
+ return 8;
+ }
+
+ public function readString($in, &$str) {
+ $result = $this->readI32($in, $len);
+ $str = $in->readAll($len);
+ return $result + $len;
+ }
+}
+
+?>
\ No newline at end of file
diff --git a/lib/php/src/protocol/TProtocol.php b/lib/php/src/protocol/TProtocol.php
new file mode 100644
index 0000000..5a69bbd
--- /dev/null
+++ b/lib/php/src/protocol/TProtocol.php
@@ -0,0 +1,157 @@
+<?php
+
+/** Types */
+require_once THRIFT_ROOT.'/protocol/TType.php';
+
+/**
+ * Protocol module.
+ *
+ * @package thrift.protocol
+ * @author Mark Slee <mcslee@facebook.com>
+ */
+abstract class TProtocol {
+
+ /**
+ * Writes a struct header.
+ *
+ * @param TTransport $out Output transport
+ * @param string $name Struct name
+ * @throws TException on write error
+ * @return int How many bytes written
+ */
+ public abstract function writeStructBegin($out, $name);
+
+
+ /**
+ * Close a struct.
+ *
+ * @param TTransport $out Output transport
+ * @throws TException on write error
+ * @return int How many bytes written
+ */
+ public abstract function writeStructEnd($out);
+
+ /*
+ * Starts a field.
+ *
+ * @param TTransport $out Output transport
+ * @param string $name Field name
+ * @param int $type Field type
+ * @param int $fid Field id
+ * @throws TException on write error
+ * @return int How many bytes written
+ */
+ public abstract function writeFieldBegin($out, $fieldName, $fieldType, $fieldId);
+
+ public abstract function writeFieldEnd($out);
+
+ public abstract function writeFieldStop($out);
+
+ public abstract function writeMapBegin($out, $keyType, $valType, $size);
+
+ public abstract function writeMapEnd($out);
+
+ public abstract function writeListBegin($out, $elemType, $size);
+
+ public abstract function writeListEnd($out);
+
+ public abstract function writeSetBegin($out, $elemType, $size);
+
+ public abstract function writeSetEnd($out);
+
+ public abstract function writeByte($out, $byte);
+
+ public abstract function writeI32($out, $i32);
+
+ public abstract function writeI64($out, $i64);
+
+ public abstract function writeString($out, $str);
+
+
+ public abstract function readStructBegin($in, &$name);
+
+ public abstract function readStructEnd($in);
+
+ public abstract function readFieldBegin($in, &$name, &$fieldType, &$fieldId);
+
+ public abstract function readFieldEnd($in);
+
+ public abstract function readMapBegin($in, &$keyType, &$valType, &$size);
+
+ public abstract function readMapEnd($in);
+
+ public abstract function readListBegin($in, &$elemType, &$size);
+
+ public abstract function readListEnd($in);
+
+ public abstract function readSetBegin($in, &$elemType, &$size);
+
+ public abstract function readSetEnd($in);
+
+ public abstract function readByte($in, &$byte);
+
+ public abstract function readI32($in, &$i32);
+
+ public abstract function readI64($in, &$i64);
+
+ public abstract function readString($in, &$str);
+
+ public function skip($in, $type) {
+ switch ($type) {
+ case TType::BYTE:
+ return $this->readByte($in, $byte);
+ case TType::I32:
+ return $this->readI32($in, $i32);
+ case TType::I64:
+ return $this->readI64($in, $i64);
+ case TType::STRING:
+ return $this->readString($in, $str);
+ case TType::STRUCT:
+ {
+ $result = $this->readStructBegin($in, $name);
+ while (true) {
+ $result += $this->readFieldBegin($in, $name, $ftype, $fid);
+ if ($ftype == TType::STOP) {
+ break;
+ }
+ $result += $this->skip($in, $ftype);
+ $result += $this->readFieldEnd($in);
+ }
+ $result += $this->readStructEnd($in);
+ return $result;
+ }
+ case TType::MAP:
+ {
+ $result = $this->readMapBegin($in, $keyType, $valType, $size);
+ for ($i = 0; $i < $size; $i++) {
+ $result += $this->skip($in, $keyType);
+ $result += $this->skip($in, $valType);
+ }
+ $result += $this->readMapEnd($in);
+ return $result;
+ }
+ case TType::SET:
+ {
+ $result = $this->readSetBegin($in, $elemType, $size);
+ for ($i = 0; $i < $size; $i++) {
+ $result += $this->skip($in, $elemType);
+ }
+ $result += $this->readSetEnd($in);
+ return $result;
+ }
+ case TType::LST:
+ {
+ $result = $this->readListBegin($in, $elemType, $size);
+ for ($i = 0; $i < $size; $i++) {
+ $result += $this->skip($in, $elemType);
+ }
+ $result += $this->readListEnd($in);
+ return $result;
+ }
+ default:
+ return 0;
+ }
+ }
+}
+
+?>
diff --git a/lib/php/src/protocol/TType.php b/lib/php/src/protocol/TType.php
new file mode 100644
index 0000000..957efe6
--- /dev/null
+++ b/lib/php/src/protocol/TType.php
@@ -0,0 +1,19 @@
+<?php
+
+/**
+ * Constants for Thrift data types.
+ *
+ * @package thrift.protocol
+ * @author Mark Slee <mcslee@facebook.com>
+ */
+class TType {
+ const STOP = 1;
+ const BYTE = 2;
+ const I32 = 6;
+ const I64 = 8;
+ const STRING = 9;
+ const STRUCT = 10;
+ const MAP = 11;
+ const SET = 12;
+ const LST = 13; // cannot use LIST keyword in PHP!
+}
diff --git a/lib/php/src/transport/TBufferedTransport.php b/lib/php/src/transport/TBufferedTransport.php
new file mode 100644
index 0000000..dad96ff
--- /dev/null
+++ b/lib/php/src/transport/TBufferedTransport.php
@@ -0,0 +1,108 @@
+<?php
+
+/**
+ * Buffered transport. Stores data to an internal buffer that it doesn't
+ * actually write out until flush is called. For reading, we do a greedy
+ * read and then serve data out of the internal buffer.
+ *
+ * @package thrift.transport
+ * @author Mark Slee <mcslee@facebook.com>
+ */
+class TBufferedTransport extends TTransport {
+
+ /**
+ * Constructor. Creates a buffered transport around an underlying transport
+ */
+ public function __construct($transport=null, $rBufSize=512, $wBufSize=512) {
+ $this->transport_ = $transport;
+ $this->rBufSize_ = $rBufSize;
+ $this->wBufSize_ = $wBufSize;
+ }
+
+ /**
+ * The underlying transport
+ *
+ * @var TTransport
+ */
+ protected $transport_ = null;
+
+ /**
+ * The receive buffer size
+ *
+ * @var int
+ */
+ protected $rBufSize_ = 512;
+
+ /**
+ * The write buffer size
+ *
+ * @var int
+ */
+ protected $wBufSize_ = 512;
+
+ /**
+ * The write buffer.
+ *
+ * @var string
+ */
+ protected $wBuf_ = '';
+
+ /**
+ * The read buffer.
+ *
+ * @var string
+ */
+ protected $rBuf_ = '';
+
+ public function isOpen() {
+ return $this->transport_->isOpen();
+ }
+
+ public function open() {
+ $this->transport_->open();
+ }
+
+ public function close() {
+ $this->transport_->close();
+ }
+
+ public function readAll($len) {
+ return $this->transport_->readAll($len);
+ }
+
+ public function read($len) {
+ // Methinks PHP is already buffering these for us
+ return $this->transport_->read($len);
+
+ if (strlen($this->rBuf_) >= $len) {
+ $ret = substr($this->rBuf_, 0, $len);
+ $this->rBuf_ = substr($this->rBuf_, $len);
+ return $ret;
+ }
+
+ $this->rBuf_ .= $this->transport_->read($this->rBufSize_);
+ $give = min(strlen($this->rBuf_), $len);
+ $ret = substr($this->rBuf_, 0, $give);
+ $this->rBuf_ = substr($this->rBuf_, $give);
+ return $ret;
+ }
+
+ public function write($buf) {
+ $this->wBuf_ .= $buf;
+ if (strlen($this->wBuf_) >= $this->wBufSize_) {
+ $this->transport_->write($this->wBuf_);
+ $this->wBuf_ = '';
+ }
+ }
+
+ public function flush() {
+ if (!empty($this->wBuf_)) {
+ $this->transport_->write($this->wBuf_);
+ $this->wBuf_ = '';
+ }
+ }
+
+}
+
+?>
+
diff --git a/lib/php/src/transport/TChunkedTransport.php b/lib/php/src/transport/TChunkedTransport.php
new file mode 100644
index 0000000..04fba1f
--- /dev/null
+++ b/lib/php/src/transport/TChunkedTransport.php
@@ -0,0 +1,106 @@
+<?php
+
+/**
+ * Chunked transport. Writes and reads data in chunks that are stamped with
+ * their length.
+ *
+ * @package thrift.transport
+ * @author Mark Slee <mcslee@facebook.com>
+ */
+class TChunkedTransport extends TTransport {
+
+ /**
+ * Underlying transport object.
+ *
+ * @var TTransport
+ */
+ private $transport_;
+
+ /**
+ * Buffer for read data.
+ *
+ * @var string
+ */
+ private $rBuf_;
+
+ /**
+ * Buffer for queued output data
+ *
+ * @var string
+ */
+ private $wBuf_;
+
+ /**
+ * Constructor.
+ *
+ * @param TTransport $transport Underlying transport
+ */
+ public __construct($transport=null) {
+ $this->transport_ = $transport;
+ }
+
+ /**
+ * Reads from the buffer. When more data is required reads another entire
+ * chunk and serves future reads out of that.
+ *
+ * @param int $len How much data
+ */
+ public function read($len) {
+ $out = '';
+ $need = $len;
+ $have = strlen($this->rBuf_);
+ if ($need > $have) {
+ $out = $this->rBuf_;
+ $need -= $have;
+ $this->readChunk();
+ }
+
+ $give = $need;
+ if (strlen($this->rBuf_) < $give) {
+ $out .= $this->rBuf_;
+ $this->rBuf_ = '';
+ } else {
+ $out .= substr($this->rBuf_, 0, $give);
+ $this->rBuf_ = substr($this->rBuf_, $give);
+ }
+
+ return $out;
+ }
+
+ /**
+ * Reads a chunk of data into the internal read buffer.
+ */
+ private function readChunk() {
+ $buf = $this->transport_->readAll(4);
+ $val = unpack('N', $buf);
+ $sz = $val[1];
+
+ $this->rBuf_ = $this->transport_->readAll($sz);
+ }
+
+ /**
+ * Writes some data to the pending output buffer.
+ *
+ * @param string $buf The data
+ * @param int $len Limit of bytes to write
+ */
+ public function write($buf, $len=null) {
+ if ($len !== null && $len < strlen($buf)) {
+ $buf = substr($buf, 0, $len);
+ }
+ $this->wBuf_ .= $buf;
+ }
+
+ /**
+ * Writes the output buffer to the stream in the format of a 4-byte length
+ * followed by the actual data.
+ */
+ public function flush() {
+ $out = pack('N', strlen($this->wBuf_));
+ $out .= $this->wBuf_;
+ $this->transport_->write($out);
+ $this->transport_->flush();
+ $this->wBuf_ = '';
+ }
+
+}
\ No newline at end of file
diff --git a/lib/php/src/transport/TSocket.php b/lib/php/src/transport/TSocket.php
new file mode 100644
index 0000000..0a3b090
--- /dev/null
+++ b/lib/php/src/transport/TSocket.php
@@ -0,0 +1,126 @@
+<?php
+
+/**
+ * Sockets implementation of the TTransport interface.
+ *
+ * @package thrift.transport
+ * @author Mark Slee <mcslee@facebook.com>
+ */
+class TSocket extends TTransport {
+
+ /**
+ * Handle to PHP socket
+ *
+ * @var resource
+ */
+ private $handle_ = null;
+
+ /**
+ * Remote hostname
+ *
+ * @var string
+ */
+ private $host_ = 'localhost';
+
+ /**
+ * Remote port
+ *
+ * @var int
+ */
+ private $port_ = '9090';
+
+ /**
+ * Persistent socket or plain?
+ *
+ * @var bool
+ */
+ private $persist_ = false;
+
+ /**
+ * Socket constructor
+ *
+ * @param string $host Remote hostname
+ * @param int $port Remote port
+ * @param bool $persist Whether to use a persistent socket
+ */
+ public function __construct($host='localhost', $port=9090, $persist=false) {
+ $this->host_ = $host;
+ $this->port_ = $port;
+ $this->persist_ = $persist;
+ }
+
+ /**
+ * Tests whether this is open
+ *
+ * @return bool true if the socket is open
+ */
+ public function isOpen() {
+ return is_resource($this->handle_);
+ }
+
+ /**
+ * Connects the socket.
+ */
+ public function open() {
+ if ($this->persist_) {
+ $this->handle_ = pfsockopen($this->host_, $this->port_);
+ } else {
+ $this->handle_ = fsockopen($this->host_, $this->port_);
+ }
+ if ($this->handle_ === FALSE) {
+ throw new Exception('TSocket: Could not connect to '.
+ $this->host_.':'.$this->port_);
+ }
+ }
+
+ /**
+ * Closes the socket
+ */
+ public function close() {
+ if (!$this->persist_) {
+ fclose($this->handle_);
+ }
+ }
+
+ /**
+ * Uses stream get contents to do the reading
+ */
+ public function readAll($len) {
+ return stream_get_contents($this->handle_, $len);
+ }
+
+ /**
+ * Read from the socket
+ */
+ public function read($len) {
+ $data = fread($this->handle_, 1);
+ if ($data === FALSE) {
+ throw new Exception('TSocket: Could not read '.$len.' bytes from '.
+ $this->host_.':'.$this->port_);
+ }
+ return $data;
+ }
+
+ /**
+ * Write to the socket.
+ */
+ public function write($buf) {
+ while (!empty($buf)) {
+ $got = fwrite($this->handle_, $buf);
+ if ($got == false) {
+ throw new Exception('TSocket: Could not write '.strlen($buf).' bytes '.
+ $this->host_.':'.$this->port_);
+ }
+ $buf = substr($buf, $got);
+ }
+ }
+
+ /**
+ * Flush output to the socket.
+ */
+ public function flush() {
+ fflush($this->handle_);
+ }
+}
+
+?>
diff --git a/lib/php/src/transport/TTransport.php b/lib/php/src/transport/TTransport.php
new file mode 100644
index 0000000..8d57d2a
--- /dev/null
+++ b/lib/php/src/transport/TTransport.php
@@ -0,0 +1,72 @@
+<?php
+
+/**
+ * Base interface for a transport agent.
+ *
+ * @package thrift.transport
+ * @author Mark Slee <mcslee@facebook.com>
+ */
+abstract class TTransport {
+
+ /**
+ * Whether this transport is open.
+ *
+ * @return boolean true if open
+ */
+ public abstract function isOpen();
+
+ /**
+ * Open the transport for reading/writing
+ *
+ * @throws TTransportException if cannot open
+ */
+ public abstract function open();
+
+ /**
+ * Close the transport.
+ */
+ public abstract function close();
+
+ /**
+ * Read some data into the array.
+ *
+ * @param int $len How much to read
+ * @return string The data that has been read
+ * @throws TTransportException if cannot read any more data
+ */
+ public abstract function read($len);
+
+ /**
+ * Guarantees that the full amount of data is read.
+ *
+ * @return string The data, of exact length
+ * @throws TTransportException if cannot read data
+ */
+ public function readAll($len) {
+ // return $this->read($len);
+
+ $data = '';
+ $got = 0;
+ while (($got = strlen($data)) < $len) {
+ $data .= $this->read($len - $got);
+ }
+ return $data;
+ }
+
+ /**
+ * Writes the given data out.
+ *
+ * @param string $buf The data to write
+ * @throws TTransportException if writing fails
+ */
+ public abstract function write($buf);
+
+ /**
+ * Flushes any pending data out of a buffer
+ *
+ * @throws TTransportException if a writing error occurs
+ */
+ public function flush() {}
+}
+
+?>