Thrift now works in PHP, hot stuff

Summary: End to end communication working in Thrift with PHP

Problem: It's a bit slower than pillar still. Need to find out why.

Reviewed By: aditya

Test Plan: Unit tests are in the test directory. Get lucas on the PHP case...




git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664720 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/Makefile b/lib/cpp/Makefile
index 15664fa..8403d7c 100644
--- a/lib/cpp/Makefile
+++ b/lib/cpp/Makefile
@@ -17,6 +17,7 @@
 # Source files
 SRCS  = src/protocol/TBinaryProtocol.cc \
 	src/transport/TBufferedTransport.cc \
+	src/transport/TChunkedTransport.cc \
 	src/transport/TSocket.cc \
 	src/transport/TServerSocket.cc \
 	src/server/TSimpleServer.cc
diff --git a/lib/cpp/src/protocol/TBinaryProtocol.cc b/lib/cpp/src/protocol/TBinaryProtocol.cc
index 666a6a4..6ac028a 100644
--- a/lib/cpp/src/protocol/TBinaryProtocol.cc
+++ b/lib/cpp/src/protocol/TBinaryProtocol.cc
@@ -240,8 +240,12 @@
   uint32_t result;
   int32_t size;
   result = readI32(in, size);
-  uint8_t b[size];
+
+  // Use the heap here to prevent stack overflow for v. large strings
+  uint8_t *b = new uint8_t[size];
   in->readAll(b, size);
   str = string((char*)b, size);
+  delete [] b;
+
   return result + (uint32_t)size;
 }
diff --git a/lib/cpp/src/transport/TBufferedTransport.cc b/lib/cpp/src/transport/TBufferedTransport.cc
index 3fccc58..d7ce56a 100644
--- a/lib/cpp/src/transport/TBufferedTransport.cc
+++ b/lib/cpp/src/transport/TBufferedTransport.cc
@@ -7,7 +7,7 @@
   // We don't have enough data yet
   if (rLen_-rPos_ < need) {
     // Copy out whatever we have
-    if (rLen_ > 0) {
+    if (rLen_-rPos_ > 0) {
       memcpy(buf, rBuf_+rPos_, rLen_-rPos_);
       need -= rLen_-rPos_;
       buf += rLen_-rPos_;
diff --git a/lib/cpp/src/transport/TBufferedTransport.h b/lib/cpp/src/transport/TBufferedTransport.h
index 991b50c..43c31f1 100644
--- a/lib/cpp/src/transport/TBufferedTransport.h
+++ b/lib/cpp/src/transport/TBufferedTransport.h
@@ -53,6 +53,10 @@
   void close() {
     transport_->close();
   }
+
+  uint32_t readAll(uint8_t* buf, uint32_t len) {
+    return transport_->readAll(buf, len);
+  }
   
   uint32_t read(uint8_t* buf, uint32_t len);
 
diff --git a/lib/cpp/src/transport/TChunkedTransport.cc b/lib/cpp/src/transport/TChunkedTransport.cc
new file mode 100644
index 0000000..f35d747
--- /dev/null
+++ b/lib/cpp/src/transport/TChunkedTransport.cc
@@ -0,0 +1,97 @@
+#include "TChunkedTransport.h"
+using std::string;
+
+uint32_t TChunkedTransport::read(uint8_t* buf, uint32_t len) {
+  uint32_t need = len;
+
+  // We don't have enough data yet
+  if (rLen_-rPos_ < need) {
+    // Copy out whatever we have
+    if (rLen_-rPos_ > 0) {
+      memcpy(buf, rBuf_+rPos_, rLen_-rPos_);
+      need -= rLen_-rPos_;
+      buf += rLen_-rPos_;
+    }
+
+    // Read another chunk
+    readChunk();
+  }
+  
+  // Hand over whatever we have
+  uint32_t give = need;
+  if (rLen_-rPos_ < give) {
+    give = rLen_-rPos_;
+  }
+  memcpy(buf, rBuf_+rPos_, give);
+  rPos_ += give;
+  need -= give;
+  return (len - need);
+}
+
+void TChunkedTransport::readChunk() {
+  // Get rid of the old chunk
+  if (rBuf_ != NULL) {
+    delete [] rBuf_;
+    rBuf_ = NULL;
+  }
+
+  // Read in the next chunk size
+  int32_t sz;
+  transport_->readAll((uint8_t*)&sz, 4);
+
+  if (sz < 0) {
+    throw new TTransportException("Next chunk has negative size");
+  }
+
+  // Read the chunk payload, reset markers
+  rBuf_ = new uint8_t[sz];
+  transport_->readAll(rBuf_, sz);
+  rPos_ = 0;
+  rLen_ = sz;
+}
+
+void TChunkedTransport::write(const uint8_t* buf, uint32_t len) {
+  if (len == 0) {
+    return;
+  }
+
+  // Need to grow the buffer
+  if (len + wLen_ >= wBufSize_) {
+
+    // Double buffer size until sufficient
+    while (wBufSize_ < len + wLen_) {
+      wBufSize_ *= 2;
+    }
+
+    // Allocate new buffer
+    uint8_t* wBuf2 = new uint8_t[wBufSize_];
+
+    // Copy the old buffer to the new one
+    memcpy(wBuf2, wBuf_, wLen_);
+   
+    // Now point buf to the new one
+    delete [] wBuf_;
+    wBuf_ = wBuf2;
+  }
+
+  // Copy data into buffer
+  memcpy(wBuf_ + wLen_, buf, len);
+  wLen_ += len;
+}
+
+void TChunkedTransport::flush()  {
+  // Write chunk size
+  int32_t sz = wLen_;
+  transport_->write((const uint8_t*)&sz, 4);
+  
+  // Write chunk body
+  if (sz > 0) {
+    transport_->write(wBuf_, wLen_);
+  }
+
+  // All done
+  wLen_ = 0;
+
+  // Flush the underlying
+  transport_->flush();
+}
diff --git a/lib/cpp/src/transport/TChunkedTransport.h b/lib/cpp/src/transport/TChunkedTransport.h
new file mode 100644
index 0000000..07bdbb5
--- /dev/null
+++ b/lib/cpp/src/transport/TChunkedTransport.h
@@ -0,0 +1,76 @@
+#ifndef T_CHUNKED_TRANSPORT_H
+#define T_CHUNKED_TRANSPORT_H
+
+#include "transport/TTransport.h"
+#include <string>
+
+/**
+ * Chunked transport. All writes go into an in-memory buffer until flush is
+ * called, at which point the transport writes the length of the entire
+ * binary chunk followed by the data payload. This allows the receiver on the
+ * other end to always do fixed-length reads.
+ *
+ * @author Mark Slee <mcslee@facebook.com>
+ */
+class TChunkedTransport : public TTransport {
+ public:
+  TChunkedTransport(TTransport* transport) :
+    transport_(transport),
+    rPos_(0), rLen_(0),
+    wBufSize_(512), wLen_(0) {
+    rBuf_ = NULL;
+    wBuf_ = new uint8_t[wBufSize_];
+  }
+
+  TChunkedTransport(TTransport* transport, uint32_t sz) :
+    transport_(transport),
+    rPos_(0), rLen_(0),
+    wBufSize_(sz), wLen_(0) {
+    rBuf_ = NULL;
+    wBuf_ = new uint8_t[wBufSize_];
+  }
+
+  ~TChunkedTransport() {
+    if (rBuf_ != NULL) {
+      delete [] rBuf_;
+    }
+    if (wBuf_ != NULL) {
+      delete [] wBuf_;
+    }
+  }
+
+  bool isOpen() {
+    return transport_->isOpen();
+  }
+  
+  void open() {
+    transport_->open();
+  }
+
+  void close() {
+    transport_->close();
+  }
+  
+  uint32_t read(uint8_t* buf, uint32_t len);
+
+  void write(const uint8_t* buf, uint32_t len);
+
+  void flush();
+
+ protected:
+  TTransport* transport_;
+  uint8_t* rBuf_;
+  uint32_t rPos_;
+  uint32_t rLen_;
+
+  uint8_t* wBuf_;
+  uint32_t wBufSize_;
+  uint32_t wLen_;
+
+  /**
+   * Reads a chunk of input from the underlying stream.
+   */
+  void readChunk();
+};
+
+#endif