Thrift now works in PHP, hot stuff
Summary: End to end communication working in Thrift with PHP
Problem: It is still a bit slower than pillar; the cause needs to be found.
Reviewed By: aditya
Test Plan: Unit tests are in the test directory. Get lucas on the PHP case...
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664720 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/Makefile b/lib/cpp/Makefile
index 15664fa..8403d7c 100644
--- a/lib/cpp/Makefile
+++ b/lib/cpp/Makefile
@@ -17,6 +17,7 @@
# Source files
SRCS = src/protocol/TBinaryProtocol.cc \
src/transport/TBufferedTransport.cc \
+ src/transport/TChunkedTransport.cc \
src/transport/TSocket.cc \
src/transport/TServerSocket.cc \
src/server/TSimpleServer.cc
diff --git a/lib/cpp/src/protocol/TBinaryProtocol.cc b/lib/cpp/src/protocol/TBinaryProtocol.cc
index 666a6a4..6ac028a 100644
--- a/lib/cpp/src/protocol/TBinaryProtocol.cc
+++ b/lib/cpp/src/protocol/TBinaryProtocol.cc
@@ -240,8 +240,12 @@
uint32_t result;
int32_t size;
result = readI32(in, size);
- uint8_t b[size];
+
+ // Use the heap here to prevent stack overflow for v. large strings
+ uint8_t *b = new uint8_t[size];
in->readAll(b, size);
str = string((char*)b, size);
+ delete [] b;
+
return result + (uint32_t)size;
}
diff --git a/lib/cpp/src/transport/TBufferedTransport.cc b/lib/cpp/src/transport/TBufferedTransport.cc
index 3fccc58..d7ce56a 100644
--- a/lib/cpp/src/transport/TBufferedTransport.cc
+++ b/lib/cpp/src/transport/TBufferedTransport.cc
@@ -7,7 +7,7 @@
// We don't have enough data yet
if (rLen_-rPos_ < need) {
// Copy out whatever we have
- if (rLen_ > 0) {
+ if (rLen_-rPos_ > 0) {
memcpy(buf, rBuf_+rPos_, rLen_-rPos_);
need -= rLen_-rPos_;
buf += rLen_-rPos_;
diff --git a/lib/cpp/src/transport/TBufferedTransport.h b/lib/cpp/src/transport/TBufferedTransport.h
index 991b50c..43c31f1 100644
--- a/lib/cpp/src/transport/TBufferedTransport.h
+++ b/lib/cpp/src/transport/TBufferedTransport.h
@@ -53,6 +53,10 @@
void close() {
transport_->close();
}
+ // NOTE(review): goes straight to the wrapped transport, skipping rBuf_; confirm no buffered bytes can be pending
+ uint32_t readAll(uint8_t* buf, uint32_t len) {
+ return transport_->readAll(buf, len);
+ }
uint32_t read(uint8_t* buf, uint32_t len);
diff --git a/lib/cpp/src/transport/TChunkedTransport.cc b/lib/cpp/src/transport/TChunkedTransport.cc
new file mode 100644
index 0000000..f35d747
--- /dev/null
+++ b/lib/cpp/src/transport/TChunkedTransport.cc
@@ -0,0 +1,97 @@
+#include "TChunkedTransport.h"
+using std::string;
+
+uint32_t TChunkedTransport::read(uint8_t* buf, uint32_t len) {
+  // Serves up to len bytes from the current chunk; when the chunk cannot
+  // cover the request, its tail is copied out first and exactly one further
+  // chunk is fetched. Returns the number of bytes placed in buf, which may
+  // be fewer than len.
+  uint32_t copied = 0;
+  uint32_t buffered = rLen_ - rPos_;
+
+  if (buffered < len) {
+    // Drain the tail of the current chunk, then replace it with the next one
+    if (buffered > 0) {
+      memcpy(buf, rBuf_ + rPos_, buffered);
+      copied = buffered;
+    }
+    readChunk();
+    buffered = rLen_ - rPos_;
+  }
+
+  // Serve the rest of the request, capped by what the chunk holds
+  uint32_t wanted = len - copied;
+  uint32_t give = (buffered < wanted) ? buffered : wanted;
+  memcpy(buf + copied, rBuf_ + rPos_, give);
+  rPos_ += give;
+  copied += give;
+  return copied;
+}
+
+void TChunkedTransport::readChunk() {
+  // Replaces the read buffer with the next chunk from the underlying
+  // transport: a 4-byte size (copied raw, no byte-order conversion here)
+  // followed by that many payload bytes. The old chunk is released and the
+  // markers zeroed up front, so a failure below leaves an empty buffer
+  // rather than stale offsets paired with a NULL rBuf_.
+  delete [] rBuf_;
+  rBuf_ = NULL;
+  rPos_ = 0;
+  rLen_ = 0;
+
+  int32_t sz;
+  transport_->readAll((uint8_t*)&sz, 4);
+  if (sz < 0) {
+    throw new TTransportException("Next chunk has negative size");
+  }
+  // Publish the new length only once the whole payload has arrived
+  rBuf_ = new uint8_t[sz];
+  transport_->readAll(rBuf_, sz);
+  rLen_ = sz;
+}
+
+void TChunkedTransport::write(const uint8_t* buf, uint32_t len) {
+  // Appends len bytes to the pending chunk; nothing is handed to the
+  // underlying transport here. The buffer doubles until the data fits.
+  if (len == 0) {
+    return;
+  }
+
+  uint32_t needed = wLen_ + len;
+  if (needed > wBufSize_) {
+    // Start doubling from at least 1: a zero capacity passed to the
+    // constructor would stay zero and the loop would never end
+    uint32_t grown = (wBufSize_ > 0) ? wBufSize_ : 1;
+    while (grown < needed) {
+      grown *= 2;
+    }
+
+    // Record the new capacity only after the allocation has succeeded, so
+    // wBufSize_ never overstates what wBuf_ can actually hold
+    uint8_t* bigger = new uint8_t[grown];
+    memcpy(bigger, wBuf_, wLen_);
+    delete [] wBuf_;
+    wBuf_ = bigger;
+    wBufSize_ = grown;
+  }
+
+  memcpy(wBuf_ + wLen_, buf, len);
+  wLen_ += len;
+}
+
+void TChunkedTransport::flush() {
+  // Emits everything buffered by write() as a single chunk: a 4-byte size
+  // header (the raw bytes of an int32_t) and then the payload, after which
+  // the wrapped transport is flushed too.
+  int32_t chunkSize = wLen_;
+  const uint8_t* header = (const uint8_t*)&chunkSize;
+  transport_->write(header, 4);
+
+  // An empty chunk consists of the header alone
+  if (chunkSize > 0) {
+    transport_->write(wBuf_, wLen_);
+  }
+  // The buffer itself is kept for reuse; only its fill level is reset
+  wLen_ = 0;
+  transport_->flush();
+}
diff --git a/lib/cpp/src/transport/TChunkedTransport.h b/lib/cpp/src/transport/TChunkedTransport.h
new file mode 100644
index 0000000..07bdbb5
--- /dev/null
+++ b/lib/cpp/src/transport/TChunkedTransport.h
@@ -0,0 +1,76 @@
+#ifndef T_CHUNKED_TRANSPORT_H
+#define T_CHUNKED_TRANSPORT_H
+
+#include "transport/TTransport.h"
+#include <string>
+
+/**
+ * Chunked transport. All writes go into an in-memory buffer until flush is
+ * called, at which point the transport writes the length of the entire
+ * binary chunk followed by the data payload. This allows the receiver on the
+ * other end to always do fixed-length reads.
+ *
+ * @author Mark Slee <mcslee@facebook.com>
+ */
+class TChunkedTransport : public TTransport {
+ public:
+  /**
+   * Wraps transport, which is never deleted by this class. sz is the
+   * initial size in bytes of the write buffer (512 when omitted).
+   */
+  TChunkedTransport(TTransport* transport, uint32_t sz = 512) :
+    transport_(transport),
+    rBuf_(NULL), rPos_(0), rLen_(0),
+    wBuf_(new uint8_t[sz]), wBufSize_(sz), wLen_(0) {}
+
+  ~TChunkedTransport() {
+    // delete [] accepts NULL, so rBuf_ needs no guard
+    delete [] rBuf_;
+    delete [] wBuf_;
+  }
+
+  // isOpen/open/close forward directly to the wrapped transport
+  bool isOpen() {
+    return transport_->isOpen();
+  }
+
+  void open() {
+    transport_->open();
+  }
+
+  void close() {
+    transport_->close();
+  }
+
+  /** Copies up to len bytes of chunk data into buf; returns the count. */
+  uint32_t read(uint8_t* buf, uint32_t len);
+
+  /** Buffers len bytes from buf in memory until flush() is called. */
+  void write(const uint8_t* buf, uint32_t len);
+
+  /** Sends the buffered bytes as one length-prefixed chunk. */
+  void flush();
+
+ protected:
+  TTransport* transport_;  // wrapped transport
+  uint8_t* rBuf_;          // current incoming chunk; NULL before the first
+  uint32_t rPos_;          // offset of the next unread byte in rBuf_
+  uint32_t rLen_;          // number of bytes held in rBuf_
+
+  uint8_t* wBuf_;          // bytes written since the last flush
+  uint32_t wBufSize_;      // allocated size of wBuf_
+  uint32_t wLen_;          // number of pending bytes in wBuf_
+
+  /**
+   * Reads a chunk of input from the underlying stream.
+   */
+  void readChunk();
+
+ private:
+  // rBuf_ and wBuf_ are owned raw pointers: a member-wise copy would free
+  // them twice, so copying is declared here and intentionally left undefined
+  TChunkedTransport(const TChunkedTransport&);
+  TChunkedTransport& operator=(const TChunkedTransport&);
+};
+
+#endif