Thrift now works in PHP, hot stuff

Summary: End to end communication working in Thrift with PHP

Problem: It's a bit slower than pillar still. Need to find out why.

Reviewed By: aditya

Test Plan: Unit tests are in the test directory. Get lucas on the PHP case...




git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664720 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/Makefile b/lib/cpp/Makefile
index 15664fa..8403d7c 100644
--- a/lib/cpp/Makefile
+++ b/lib/cpp/Makefile
@@ -17,6 +17,7 @@
 # Source files
 SRCS  = src/protocol/TBinaryProtocol.cc \
 	src/transport/TBufferedTransport.cc \
+	src/transport/TChunkedTransport.cc \
 	src/transport/TSocket.cc \
 	src/transport/TServerSocket.cc \
 	src/server/TSimpleServer.cc
diff --git a/lib/cpp/src/protocol/TBinaryProtocol.cc b/lib/cpp/src/protocol/TBinaryProtocol.cc
index 666a6a4..6ac028a 100644
--- a/lib/cpp/src/protocol/TBinaryProtocol.cc
+++ b/lib/cpp/src/protocol/TBinaryProtocol.cc
@@ -240,8 +240,12 @@
   uint32_t result;
   int32_t size;
   result = readI32(in, size);
-  uint8_t b[size];
+
+  // Use the heap here to prevent stack overflow for v. large strings
+  uint8_t *b = new uint8_t[size];
   in->readAll(b, size);
   str = string((char*)b, size);
+  delete [] b;
+
   return result + (uint32_t)size;
 }
diff --git a/lib/cpp/src/transport/TBufferedTransport.cc b/lib/cpp/src/transport/TBufferedTransport.cc
index 3fccc58..d7ce56a 100644
--- a/lib/cpp/src/transport/TBufferedTransport.cc
+++ b/lib/cpp/src/transport/TBufferedTransport.cc
@@ -7,7 +7,7 @@
   // We don't have enough data yet
   if (rLen_-rPos_ < need) {
     // Copy out whatever we have
-    if (rLen_ > 0) {
+    if (rLen_-rPos_ > 0) {
       memcpy(buf, rBuf_+rPos_, rLen_-rPos_);
       need -= rLen_-rPos_;
       buf += rLen_-rPos_;
diff --git a/lib/cpp/src/transport/TBufferedTransport.h b/lib/cpp/src/transport/TBufferedTransport.h
index 991b50c..43c31f1 100644
--- a/lib/cpp/src/transport/TBufferedTransport.h
+++ b/lib/cpp/src/transport/TBufferedTransport.h
@@ -53,6 +53,10 @@
   void close() {
     transport_->close();
   }
+
+  uint32_t readAll(uint8_t* buf, uint32_t len) {
+    return transport_->readAll(buf, len);
+  }
   
   uint32_t read(uint8_t* buf, uint32_t len);
 
diff --git a/lib/cpp/src/transport/TChunkedTransport.cc b/lib/cpp/src/transport/TChunkedTransport.cc
new file mode 100644
index 0000000..f35d747
--- /dev/null
+++ b/lib/cpp/src/transport/TChunkedTransport.cc
@@ -0,0 +1,97 @@
+#include "TChunkedTransport.h"
+using std::string;
+
+uint32_t TChunkedTransport::read(uint8_t* buf, uint32_t len) {
+  uint32_t need = len;
+
+  // We don't have enough data yet
+  if (rLen_-rPos_ < need) {
+    // Copy out whatever we have
+    if (rLen_-rPos_ > 0) {
+      memcpy(buf, rBuf_+rPos_, rLen_-rPos_);
+      need -= rLen_-rPos_;
+      buf += rLen_-rPos_;
+    }
+
+    // Read another chunk
+    readChunk();
+  }
+  
+  // Hand over whatever we have
+  uint32_t give = need;
+  if (rLen_-rPos_ < give) {
+    give = rLen_-rPos_;
+  }
+  memcpy(buf, rBuf_+rPos_, give);
+  rPos_ += give;
+  need -= give;
+  return (len - need);
+}
+
+void TChunkedTransport::readChunk() {
+  // Get rid of the old chunk
+  if (rBuf_ != NULL) {
+    delete [] rBuf_;
+    rBuf_ = NULL;
+  }
+
+  // Read in the next chunk size
+  int32_t sz;
+  transport_->readAll((uint8_t*)&sz, 4);
+
+  if (sz < 0) {
+    throw new TTransportException("Next chunk has negative size");
+  }
+
+  // Read the chunk payload, reset markers
+  rBuf_ = new uint8_t[sz];
+  transport_->readAll(rBuf_, sz);
+  rPos_ = 0;
+  rLen_ = sz;
+}
+
+void TChunkedTransport::write(const uint8_t* buf, uint32_t len) {
+  if (len == 0) {
+    return;
+  }
+
+  // Need to grow the buffer
+  if (len + wLen_ >= wBufSize_) {
+
+    // Double buffer size until sufficient
+    while (wBufSize_ < len + wLen_) {
+      wBufSize_ *= 2;
+    }
+
+    // Allocate new buffer
+    uint8_t* wBuf2 = new uint8_t[wBufSize_];
+
+    // Copy the old buffer to the new one
+    memcpy(wBuf2, wBuf_, wLen_);
+   
+    // Now point buf to the new one
+    delete [] wBuf_;
+    wBuf_ = wBuf2;
+  }
+
+  // Copy data into buffer
+  memcpy(wBuf_ + wLen_, buf, len);
+  wLen_ += len;
+}
+
+void TChunkedTransport::flush()  {
+  // Write chunk size
+  int32_t sz = wLen_;
+  transport_->write((const uint8_t*)&sz, 4);
+  
+  // Write chunk body
+  if (sz > 0) {
+    transport_->write(wBuf_, wLen_);
+  }
+
+  // All done
+  wLen_ = 0;
+
+  // Flush the underlying
+  transport_->flush();
+}
diff --git a/lib/cpp/src/transport/TChunkedTransport.h b/lib/cpp/src/transport/TChunkedTransport.h
new file mode 100644
index 0000000..07bdbb5
--- /dev/null
+++ b/lib/cpp/src/transport/TChunkedTransport.h
@@ -0,0 +1,76 @@
+#ifndef T_CHUNKED_TRANSPORT_H
+#define T_CHUNKED_TRANSPORT_H
+
+#include "transport/TTransport.h"
+#include <string>
+
+/**
+ * Chunked transport. All writes go into an in-memory buffer until flush is
+ * called, at which point the transport writes the length of the entire
+ * binary chunk followed by the data payload. This allows the receiver on the
+ * other end to always do fixed-length reads.
+ *
+ * @author Mark Slee <mcslee@facebook.com>
+ */
+class TChunkedTransport : public TTransport {
+ public:
+  TChunkedTransport(TTransport* transport) :
+    transport_(transport),
+    rPos_(0), rLen_(0),
+    wBufSize_(512), wLen_(0) {
+    rBuf_ = NULL;
+    wBuf_ = new uint8_t[wBufSize_];
+  }
+
+  TChunkedTransport(TTransport* transport, uint32_t sz) :
+    transport_(transport),
+    rPos_(0), rLen_(0),
+    wBufSize_(sz), wLen_(0) {
+    rBuf_ = NULL;
+    wBuf_ = new uint8_t[wBufSize_];
+  }
+
+  ~TChunkedTransport() {
+    if (rBuf_ != NULL) {
+      delete [] rBuf_;
+    }
+    if (wBuf_ != NULL) {
+      delete [] wBuf_;
+    }
+  }
+
+  bool isOpen() {
+    return transport_->isOpen();
+  }
+  
+  void open() {
+    transport_->open();
+  }
+
+  void close() {
+    transport_->close();
+  }
+  
+  uint32_t read(uint8_t* buf, uint32_t len);
+
+  void write(const uint8_t* buf, uint32_t len);
+
+  void flush();
+
+ protected:
+  TTransport* transport_;
+  uint8_t* rBuf_;
+  uint32_t rPos_;
+  uint32_t rLen_;
+
+  uint8_t* wBuf_;
+  uint32_t wBufSize_;
+  uint32_t wLen_;
+
+  /**
+   * Reads a chunk of input from the underlying stream.
+   */
+  void readChunk();
+};
+
+#endif
diff --git a/lib/php/src/Thrift.php b/lib/php/src/Thrift.php
new file mode 100644
index 0000000..8a25611
--- /dev/null
+++ b/lib/php/src/Thrift.php
@@ -0,0 +1,9 @@
+<?php
+
+if (!defined('THRIFT_ROOT')) {
+  define('THRIFT_ROOT', dirname(__FILE__));
+}
+
+include_once THRIFT_ROOT.'/protocol/TProtocol.php';
+
+?>
diff --git a/lib/php/src/protocol/TBinaryProtocol.php b/lib/php/src/protocol/TBinaryProtocol.php
new file mode 100644
index 0000000..867a8aa
--- /dev/null
+++ b/lib/php/src/protocol/TBinaryProtocol.php
@@ -0,0 +1,195 @@
+<?php
+
+/** For transport operations */
+require_once THRIFT_ROOT.'/transport/TTransport.php';
+
+/**
+ * Binary implementation of the Thrift protocol.
+ *
+ * @package thrift.protocol
+ * @author Mark Slee <mcslee@facebook.com>
+ */
+class TBinaryProtocol extends TProtocol {
+
+  public function writeStructBegin($out, $name) {
+    return 0;
+  }
+
+  public function writeStructEnd($out) {
+    return 0;
+  }
+
+  public function writeFieldBegin($out, $fieldName, $fieldType, $fieldId) {
+    return
+      $this->writeByte($out, $fieldType) +
+      $this->writeI32($out, $fieldId);
+  }
+
+  public function writeFieldEnd($out) {
+    return 0;
+  } 
+
+  public function writeFieldStop($out) {
+    return
+      $this->writeByte($out, TType::STOP);
+  }
+
+  public function writeMapBegin($out, $keyType, $valType, $size) {
+    return
+      $this->writeByte($out, $keyType) +
+      $this->writeByte($out, $valType) +
+      $this->writeI32($out, $size);
+  }
+
+  public function writeMapEnd($out) {
+    return 0;
+  }
+
+  public function writeListBegin($out, $elemType, $size) {
+    return
+      $this->writeByte($out, $elemType) +
+      $this->writeI32($out, $size);
+  }
+
+  public function writeListEnd($out) {
+    return 0;
+  }
+
+  public function writeSetBegin($out, $elemType, $size) {
+    return
+      $this->writeByte($out, $elemType) +
+      $this->writeI32($out, $size);
+  }
+
+  public function writeSetEnd($out) {
+    return 0;
+  }
+
+  public function writeByte($out, $byte) {
+    $data = pack('c', $byte);
+    $out->write($data, 1);
+    return 1;
+  }
+
+  public function writeI32($out, $i32) {
+    $data = pack('l', $i32);
+  //if (!defined('BIG_ENDIAN')) {
+      $data = strrev($data);
+  //}
+    $out->write($data, 4);
+    return 4;
+  }
+
+  public function writeI64($out, $i64) {
+    $hi = $i64 >> 32;
+    $lo = $i64 & 0xFFFFFFFF;
+    if (!defined('BIG_ENDIAN')) {
+      $data = pack('N2', $hi, $lo);
+    } else {
+      $data = pack('N2', $lo, $hi);
+    }
+    $out->write($data, 8);
+    return 8;
+  }
+
+  public function writeString($out, $str) {
+    $len = strlen($str);
+    $result = $this->writeI32($out, $len);
+    $out->write($str, $len);
+    return $result + $len;
+  }
+
+  public function readStructBegin($in, &$name) {
+    $name = '';
+    return 0;
+  }
+
+  public function readStructEnd($in) {
+    return 0;
+  }
+
+  public function readFieldBegin($in, &$name, &$fieldType, &$fieldId) {
+    $result = $this->readByte($in, $fieldType);
+    if ($fieldType == TType::STOP) {
+      $fieldId = 0;
+      return $result;
+    }
+    $result += $this->readI32($in, $fieldId);
+    return $result;
+  }
+
+  public function readFieldEnd($in) {
+    return 0;
+  }
+
+  public function readMapBegin($in, &$keyType, &$valType, &$size) {
+    $result = $this->readByte($in, $keyType);
+    $result += $this->readByte($in, $valType);
+    $result += $this->readI32($in, $size);
+    return $result;
+  }
+
+  public function readMapEnd($in) {
+    return 0;
+  }
+
+  public function readListBegin($in, &$elemType, &$size) {
+    $result = $this->readByte($in, $elemType);
+    $result += $this->readI32($in, $size);
+    return $result;
+  }
+
+  public function readListEnd($in) {
+    return 0;
+  }
+
+  public function readSetBegin($in, &$elemType, &$size) {
+    $result = $this->readByte($in, $elemType);
+    $result += $this->readI32($in, $size);
+    return $result;
+  }
+
+  public function readSetEnd($in) {
+    return 0;
+  }
+
+  public function readByte($in, &$byte) {
+    $data = $in->readAll(1);
+    $arr = unpack('c', $data);
+    $byte = $arr[1];
+    return 1;
+  }
+
+  public function readI32($in, &$i32) {
+    $data = $in->readAll(4);
+    if (!defined('BIG_ENDIAN')) {
+      $data = strrev($data);
+    }
+    $arr = unpack('l', $data);
+    $i32 = $arr[1];
+    return 4;
+  }
+
+  public function readI64($in, &$i64) {
+    $data = $in->readAll(8);
+    $arr = unpack('N2', $data);
+
+    // Check for a negative
+    if ($arr[1] & 0x80000000) {
+      $arr[1] = $arr[1] ^ 0xFFFFFFFF;
+      $arr[2] = $arr[2] ^ 0xFFFFFFFF;
+      $i64 = 0 - $arr[1]*4294967296 - $arr[2] - 1;
+    } else {
+      $i64 = $arr[1]*4294967296 + $arr[2];
+    }
+    return 8;
+  }
+
+  public function readString($in, &$str) {
+    $result = $this->readI32($in, $len);
+    $str = $in->readAll($len);
+    return $result + $len;
+  }
+}
+
+?>
\ No newline at end of file
diff --git a/lib/php/src/protocol/TProtocol.php b/lib/php/src/protocol/TProtocol.php
new file mode 100644
index 0000000..5a69bbd
--- /dev/null
+++ b/lib/php/src/protocol/TProtocol.php
@@ -0,0 +1,157 @@
+<?php
+
+/** Types */
+require_once THRIFT_ROOT.'/protocol/TType.php';
+
+/**
+ * Protocol module.
+ *
+ * @package thrift.protocol
+ * @author Mark Slee <mcslee@facebook.com>
+ */
+abstract class TProtocol {
+
+  /**
+   * Writes a struct header.
+   *
+   * @param TTransport $out  Output transport
+   * @param string     $name Struct name
+   * @throws TException on write error
+   * @return int How many bytes written
+   */
+  public abstract function writeStructBegin($out, $name);
+
+
+  /**
+   * Close a struct.
+   *
+   * @param TTransport $out Output transport
+   * @throws TException on write error
+   * @return int How many bytes written
+   */
+  public abstract function writeStructEnd($out);
+
+  /*
+   * Starts a field.
+   *
+   * @param TTransport $out  Output transport
+   * @param string     $name Field name
+   * @param int        $type Field type
+   * @param int        $fid  Field id
+   * @throws TException on write error
+   * @return int How many bytes written
+   */
+  public abstract function writeFieldBegin($out, $fieldName, $fieldType, $fieldId);
+
+  public abstract function writeFieldEnd($out);
+
+  public abstract function writeFieldStop($out);
+
+  public abstract function writeMapBegin($out, $keyType, $valType, $size);
+
+  public abstract function writeMapEnd($out);
+  
+  public abstract function writeListBegin($out, $elemType, $size);
+  
+  public abstract function writeListEnd($out);
+
+  public abstract function writeSetBegin($out, $elemType, $size);
+
+  public abstract function writeSetEnd($out);
+  
+  public abstract function writeByte($out, $byte);
+  
+  public abstract function writeI32($out, $i32);
+
+  public abstract function writeI64($out, $i64);
+
+  public abstract function writeString($out, $str);
+
+
+  public abstract function readStructBegin($in, &$name);
+  
+  public abstract function readStructEnd($in);
+
+  public abstract function readFieldBegin($in, &$name, &$fieldType, &$fieldId);
+
+  public abstract function readFieldEnd($in);
+
+  public abstract function readMapBegin($in, &$keyType, &$valType, &$size);
+
+  public abstract function readMapEnd($in);
+
+  public abstract function readListBegin($in, &$elemType, &$size);
+  
+  public abstract function readListEnd($in);
+
+  public abstract function readSetBegin($in, &$elemType, &$size);
+  
+  public abstract function readSetEnd($in);
+
+  public abstract function readByte($in, &$byte);
+  
+  public abstract function readI32($in, &$i32);
+
+  public abstract function readI64($in, &$i64);
+
+  public abstract function readString($in, &$str);
+
+  public function skip($in, $type) {
+    switch ($type) {
+    case TType::BYTE:
+      return $this->readByte($in, $byte);
+    case TType::I32:
+      return $this->readI32($in, $i32);
+    case TType::I64:
+      return $this->readI64($in, $i64);
+    case TType::STRING:
+      return $this->readString($in, $str);
+    case TType::STRUCT:
+      {
+        $result = $this->readStructBegin($in, $name);
+        while (true) {
+          $result += $this->readFieldBegin($in, $name, $ftype, $fid);
+          if ($ftype == TType::STOP) {
+            break;
+          }
+          $result += $this->skip($in, $ftype);
+          $result += $this->readFieldEnd($in);
+        }
+        $result += $this->readStructEnd($in);
+        return $result;
+      }
+    case TType::MAP:
+      {
+        $result = $this->readMapBegin($in, $keyType, $valType, $size);
+        for ($i = 0; $i < $size; $i++) {
+          $result += $this->skip($in, $keyType);
+          $result += $this->skip($in, $valType);
+        }
+        $result += $this->readMapEnd($in);
+        return $result;
+      }
+    case TType::SET:
+      {
+        $result = $this->readSetBegin($in, $elemType, $size);
+        for ($i = 0; $i < $size; $i++) {
+          $result += $this->skip($in, $elemType);
+        }
+        $result += $this->readSetEnd($in);
+        return $result;
+      }
+    case TType::LST:
+      {
+        $result = $this->readListBegin($in, $elemType, $size);
+        for ($i = 0; $i < $size; $i++) {
+          $result += $this->skip($in, $elemType);
+        }
+        $result += $this->readListEnd($in);
+        return $result;
+      }
+    default:
+      return 0;
+    }
+  }
+}
+
+?>
diff --git a/lib/php/src/protocol/TType.php b/lib/php/src/protocol/TType.php
new file mode 100644
index 0000000..957efe6
--- /dev/null
+++ b/lib/php/src/protocol/TType.php
@@ -0,0 +1,19 @@
+<?php
+
+/**
+ * Constants for Thrift data types.
+ *
+ * @package thrift.protocol
+ * @author Mark Slee <mcslee@facebook.com>
+ */
+class TType {
+  const STOP = 1;
+  const BYTE = 2;
+  const I32 = 6;
+  const I64 = 8;
+  const STRING = 9;
+  const STRUCT = 10;
+  const MAP = 11;
+  const SET = 12;
+  const LST = 13; // cannot use LIST keyword in PHP!
+}
diff --git a/lib/php/src/transport/TBufferedTransport.php b/lib/php/src/transport/TBufferedTransport.php
new file mode 100644
index 0000000..dad96ff
--- /dev/null
+++ b/lib/php/src/transport/TBufferedTransport.php
@@ -0,0 +1,108 @@
+<?php
+
+/**
+ * Buffered transport. Stores data to an internal buffer that it doesn't
+ * actually write out until flush is called. For reading, we do a greedy
+ * read and then serve data out of the internal buffer.
+ *
+ * @package thrift.transport
+ * @author Mark Slee <mcslee@facebook.com>
+ */
+class TBufferedTransport extends TTransport {
+
+  /**
+   * Constructor. Creates a buffered transport around an underlying transport
+   */
+  public function __construct($transport=null, $rBufSize=512, $wBufSize=512) {
+    $this->transport_ = $transport;
+    $this->rBufSize_ = $rBufSize;
+    $this->wBufSize_ = $wBufSize;
+  }
+
+  /**
+   * The underlying transport
+   *
+   * @var TTransport
+   */
+  protected $transport_ = null;
+
+  /**
+   * The receive buffer size
+   *
+   * @var int
+   */
+  protected $rBufSize_ = 512;
+
+  /**
+   * The write buffer size
+   *
+   * @var int
+   */
+  protected $wBufSize_ = 512;
+
+  /**
+   * The write buffer.
+   *
+   * @var string
+   */
+  protected $wBuf_ = '';
+
+  /**
+   * The read buffer.
+   *
+   * @var string
+   */
+  protected $rBuf_ = '';
+
+  public function isOpen() {
+    return $this->transport_->isOpen();
+  }
+
+  public function open() {
+    $this->transport_->open();
+  }
+
+  public function close() {
+    $this->transport_->close();
+  }
+
+  public function readAll($len) {
+    return $this->transport_->readAll($len);
+  }
+  
+  public function read($len) {
+    // Methinks PHP is already buffering these for us
+    return $this->transport_->read($len);
+
+    if (strlen($this->rBuf_) >= $len) {
+      $ret = substr($this->rBuf_, 0, $len);
+      $this->rBuf_ = substr($this->rBuf_, $len);
+      return $ret;
+    }
+
+    $this->rBuf_ .= $this->transport_->read($this->rBufSize_);
+    $give = min(strlen($this->rBuf_), $len);
+    $ret = substr($this->rBuf_, 0, $give);
+    $this->rBuf_ = substr($this->rBuf_, $give);
+    return $ret;
+  }
+
+  public function write($buf) {
+    $this->wBuf_ .= $buf;
+    if (strlen($this->wBuf_) >= $this->wBufSize_) {
+      $this->transport_->write($this->wBuf_);
+      $this->wBuf_ = '';
+    }
+  }
+
+  public function flush() {
+    if (!empty($this->wBuf_)) {
+      $this->transport_->write($this->wBuf_);
+      $this->wBuf_ = '';
+    }
+  }
+
+}
+
+?>
+
diff --git a/lib/php/src/transport/TChunkedTransport.php b/lib/php/src/transport/TChunkedTransport.php
new file mode 100644
index 0000000..04fba1f
--- /dev/null
+++ b/lib/php/src/transport/TChunkedTransport.php
@@ -0,0 +1,106 @@
+<?php
+
+/**
+ * Chunked transport. Writes and reads data in chunks that are stamped with
+ * their length.
+ *
+ * @package thrift.transport
+ * @author Mark Slee <mcslee@facebook.com>
+ */
+class TChunkedTransport extends TTransport {
+
+  /**
+   * Underlying transport object.
+   *
+   * @var TTransport
+   */
+  private $transport_;
+
+  /**
+   * Buffer for read data.
+   *
+   * @var string
+   */
+  private $rBuf_;
+
+  /**
+   * Buffer for queued output data
+   *
+   * @var string
+   */
+  private $wBuf_;
+
+  /**
+   * Constructor.
+   *
+   * @param TTransport $transport Underlying transport
+   */
+  public __construct($transport=null) {
+    $this->transport_ = $transport;
+  }
+
+  /**
+   * Reads from the buffer. When more data is required reads another entire
+   * chunk and serves future reads out of that.
+   *
+   * @param int $len How much data
+   */
+  public function read($len) {
+    $out = '';
+    $need = $len;
+    $have = strlen($this->rBuf_);
+    if ($need > $have) {
+      $out = $this->rBuf_;
+      $need -= $have;
+      $this->readChunk();
+    }
+
+    $give = $need;
+    if (strlen($this->rBuf_) < $give) {
+      $out .= $this->rBuf_;
+      $this->rBuf_ = '';
+    } else {
+      $out .= substr($this->rBuf_, 0, $give);
+      $this->rBuf_ = substr($this->rBuf_, $give);
+    }
+
+    return $out;
+  }
+
+  /**
+   * Reads a chunk of data into the internal read buffer.
+   */
+  private function readChunk() {
+    $buf = $this->transport_->readAll(4);
+    $val = unpack('N', $buf);
+    $sz = $val[1];
+
+    $this->rBuf_ = $this->transport_->readAll($sz);
+  }
+
+  /**
+   * Writes some data to the pending output buffer.
+   *
+   * @param string $buf The data
+   * @param int    $len Limit of bytes to write
+   */
+  public function write($buf, $len=null) {
+    if ($len !== null && $len < strlen($buf)) {
+      $buf = substr($buf, 0, $len);
+    }
+    $this->wBuf_ .= $buf;
+  }
+
+  /**
+   * Writes the output buffer to the stream in the format of a 4-byte length
+   * followed by the actual data.
+   */
+  public function flush() {
+    $out = pack('N', strlen($this->wBuf_));
+    $out .= $this->wBuf_;
+    $this->transport_->write($out);
+    $this->transport_->flush();
+    $this->wBuf_ = '';
+  }
+
+}
\ No newline at end of file
diff --git a/lib/php/src/transport/TSocket.php b/lib/php/src/transport/TSocket.php
new file mode 100644
index 0000000..0a3b090
--- /dev/null
+++ b/lib/php/src/transport/TSocket.php
@@ -0,0 +1,126 @@
+<?php
+
+/**
+ * Sockets implementation of the TTransport interface.
+ *
+ * @package thrift.transport
+ * @author Mark Slee <mcslee@facebook.com>
+ */
+class TSocket extends TTransport {
+
+  /**
+   * Handle to PHP socket
+   *
+   * @var resource
+   */
+  private $handle_ = null;
+
+  /**
+   * Remote hostname
+   * 
+   * @var string
+   */
+  private $host_ = 'localhost';
+
+  /**
+   * Remote port
+   *
+   * @var int
+   */
+  private $port_ = '9090';
+
+  /**
+   * Persistent socket or plain?
+   *
+   * @var bool
+   */
+  private $persist_ = false;
+
+  /**
+   * Socket constructor
+   *
+   * @param string $host    Remote hostname
+   * @param int    $port    Remote port
+   * @param bool   $persist Whether to use a persistent socket
+   */
+  public function __construct($host='localhost', $port=9090, $persist=false) {
+    $this->host_ = $host;
+    $this->port_ = $port;
+    $this->persist_ = $persist;
+  }
+
+  /**
+   * Tests whether this is open
+   *
+   * @return bool true if the socket is open
+   */
+  public function isOpen() {
+    return is_resource($this->handle_);
+  }
+
+  /**
+   * Connects the socket.
+   */
+  public function open() {
+    if ($this->persist_) {
+      $this->handle_ = pfsockopen($this->host_, $this->port_);
+    } else {
+      $this->handle_ = fsockopen($this->host_, $this->port_);
+    }
+    if ($this->handle_ === FALSE) {      
+      throw new Exception('TSocket: Could not connect to '.
+                          $this->host_.':'.$this->port_);
+    }
+  }
+
+  /**
+   * Closes the socket
+   */
+  public function close() {
+    if (!$this->persist_) {
+      fclose($this->handle_);
+    }
+  }
+  
+  /**
+   * Uses stream get contents to do the reading
+   */
+  public function readAll($len) {
+    return stream_get_contents($this->handle_, $len);
+  }
+
+  /**
+   * Read from the socket
+   */
+  public function read($len) {
+    $data = fread($this->handle_, 1);
+    if ($data === FALSE) {
+      throw new Exception('TSocket: Could not read '.$len.' bytes from '.
+                          $this->host_.':'.$this->port_);
+    }
+    return $data;
+  }
+
+  /**
+   * Write to the socket.
+   */
+  public function write($buf) {
+    while (!empty($buf)) {
+      $got = fwrite($this->handle_, $buf);
+      if ($got == false) {
+        throw new Exception('TSocket: Could not write '.strlen($buf).' bytes '.
+                            $this->host_.':'.$this->port_);
+      }
+      $buf = substr($buf, $got);
+    }
+  }
+
+  /**
+   * Flush output to the socket.
+   */
+  public function flush() {
+    fflush($this->handle_);
+  }
+}
+
+?>
diff --git a/lib/php/src/transport/TTransport.php b/lib/php/src/transport/TTransport.php
new file mode 100644
index 0000000..8d57d2a
--- /dev/null
+++ b/lib/php/src/transport/TTransport.php
@@ -0,0 +1,72 @@
+<?php
+
+/**
+ * Base interface for a transport agent.
+ *
+ * @package thrift.transport
+ * @author Mark Slee <mcslee@facebook.com>
+ */
+abstract class TTransport {
+
+  /**
+   * Whether this transport is open.
+   *
+   * @return boolean true if open
+   */
+  public abstract function isOpen();
+
+  /**
+   * Open the transport for reading/writing
+   *
+   * @throws TTransportException if cannot open
+   */
+  public abstract function open();
+
+  /**
+   * Close the transport.
+   */
+  public abstract function close();
+
+  /**
+   * Read some data into the array.
+   *
+   * @param int    $len How much to read
+   * @return string The data that has been read
+   * @throws TTransportException if cannot read any more data
+   */
+  public abstract function read($len);
+
+  /**
+   * Guarantees that the full amount of data is read.
+   *
+   * @return string The data, of exact length
+   * @throws TTransportException if cannot read data
+   */
+  public function readAll($len) {
+    // return $this->read($len);
+
+    $data = '';
+    $got = 0;
+    while (($got = strlen($data)) < $len) {
+      $data .= $this->read($len - $got);
+    }
+    return $data;
+  }
+
+  /**
+   * Writes the given data out.
+   *
+   * @param string $buf  The data to write
+   * @throws TTransportException if writing fails
+   */
+  public abstract function write($buf);
+
+  /**
+   * Flushes any pending data out of a buffer
+   *
+   * @throws TTransportException if a writing error occurs
+   */
+  public function flush() {}
+}
+
+?>