Change Thrift c++ to new protocol wrapping transport model
Summary: Also cleaned up excessive .h/.cpp files into Utils files
Reviewed By: aditya
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664838 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/Makefile.am b/lib/cpp/Makefile.am
index 134325c..0844048 100644
--- a/lib/cpp/Makefile.am
+++ b/lib/cpp/Makefile.am
@@ -10,13 +10,11 @@
src/concurrency/ThreadManager.cpp \
src/concurrency/TimerManager.cpp \
src/protocol/TBinaryProtocol.cpp \
- src/transport/TBufferedTransport.cpp \
src/transport/TBufferedFileWriter.cpp \
src/transport/TBufferedRouterTransport.cpp \
- src/transport/TFramedTransport.cpp \
- src/transport/TMemoryBuffer.cpp \
src/transport/TSocket.cpp \
src/transport/TServerSocket.cpp \
+ src/transport/TTransportUtils.cpp \
src/server/TSimpleServer.cpp \
src/server/TThreadPoolServer.cpp \
src/server/TNonblockingServer.cpp
@@ -52,19 +50,14 @@
include_transportdir = $(include_thriftdir)/transport
include_transport_HEADERS = \
- src/transport/TBufferedTransport.h \
- src/transport/TFramedTransport.h \
- src/transport/TNullTransport.h \
- src/transport/TMemoryBuffer.h \
+ src/transport/TBufferedFileWriter.h \
+ src/transport/TBufferedRouterTransport.h \
src/transport/TServerSocket.h \
src/transport/TServerTransport.h \
src/transport/TSocket.h \
src/transport/TTransport.h \
src/transport/TTransportException.h \
- src/transport/TTransportFactory.h \
- src/transport/TBufferedTransportFactory.h \
- src/transport/TBufferedFileWriter.h \
- src/transport/TBufferedRouterTransport.h
+ src/transport/TTransportUtils.h
include_serverdir = $(include_thriftdir)/server
include_server_HEADERS = \
diff --git a/lib/cpp/src/TProcessor.h b/lib/cpp/src/TProcessor.h
index f905b1d..8ce9285 100644
--- a/lib/cpp/src/TProcessor.h
+++ b/lib/cpp/src/TProcessor.h
@@ -2,14 +2,14 @@
#define _THRIFT_TPROCESSOR_H_ 1
#include <string>
-#include <transport/TTransport.h>
+#include <protocol/TProtocol.h>
#include <boost/shared_ptr.hpp>
namespace facebook { namespace thrift {
using namespace boost;
-using namespace facebook::thrift::transport;
+using namespace facebook::thrift::protocol;
/**
* A processor is a generic object that acts upon two streams of data, one
@@ -22,8 +22,13 @@
class TProcessor {
public:
virtual ~TProcessor() {}
- virtual bool process(shared_ptr<TTransport> in, shared_ptr<TTransport> out) = 0;
- bool process(shared_ptr<TTransport> io) { return process(io, io); }
+
+ virtual bool process(shared_ptr<TProtocol> in,
+ shared_ptr<TProtocol> out) = 0;
+
+ bool process(shared_ptr<TProtocol> io) {
+ return process(io, io);
+ }
protected:
TProcessor() {}
diff --git a/lib/cpp/src/protocol/TBinaryProtocol.cpp b/lib/cpp/src/protocol/TBinaryProtocol.cpp
index 9fb2ec3..481012c 100644
--- a/lib/cpp/src/protocol/TBinaryProtocol.cpp
+++ b/lib/cpp/src/protocol/TBinaryProtocol.cpp
@@ -4,121 +4,109 @@
namespace facebook { namespace thrift { namespace protocol {
-uint32_t TBinaryProtocol::writeMessageBegin(shared_ptr<TTransport> out,
- const std::string name,
+uint32_t TBinaryProtocol::writeMessageBegin(const std::string name,
const TMessageType messageType,
- const int32_t seqid) const {
+ const int32_t seqid) {
return
- writeString(out, name) +
- writeByte(out, (int8_t)messageType) +
- writeI32(out, seqid);
+ writeString(name) +
+ writeByte((int8_t)messageType) +
+ writeI32(seqid);
}
-uint32_t TBinaryProtocol::writeMessageEnd(shared_ptr<TTransport> out) const {
+uint32_t TBinaryProtocol::writeMessageEnd() {
return 0;
}
-uint32_t TBinaryProtocol::writeStructBegin(shared_ptr<TTransport> out,
- const string& name) const {
+uint32_t TBinaryProtocol::writeStructBegin(const string& name) {
return 0;
}
-uint32_t TBinaryProtocol::writeStructEnd(shared_ptr<TTransport> out) const {
+uint32_t TBinaryProtocol::writeStructEnd() {
return 0;
}
-uint32_t TBinaryProtocol::writeFieldBegin(shared_ptr<TTransport> out,
- const string& name,
+uint32_t TBinaryProtocol::writeFieldBegin(const string& name,
const TType fieldType,
- const int16_t fieldId) const {
+ const int16_t fieldId) {
return
- writeByte(out, (int8_t)fieldType) +
- writeI16(out, fieldId);
+ writeByte((int8_t)fieldType) +
+ writeI16(fieldId);
}
-uint32_t TBinaryProtocol::writeFieldEnd(shared_ptr<TTransport> out) const {
+uint32_t TBinaryProtocol::writeFieldEnd() {
return 0;
}
-uint32_t TBinaryProtocol::writeFieldStop(shared_ptr<TTransport> out) const {
+uint32_t TBinaryProtocol::writeFieldStop() {
return
- writeByte(out, (int8_t)T_STOP);
+ writeByte((int8_t)T_STOP);
}
-uint32_t TBinaryProtocol::writeMapBegin(shared_ptr<TTransport> out,
- const TType keyType,
+uint32_t TBinaryProtocol::writeMapBegin(const TType keyType,
const TType valType,
- const uint32_t size) const {
+ const uint32_t size) {
return
- writeByte(out, (int8_t)keyType) +
- writeByte(out, (int8_t)valType) +
- writeI32(out, (int32_t)size);
+ writeByte((int8_t)keyType) +
+ writeByte((int8_t)valType) +
+ writeI32((int32_t)size);
}
-uint32_t TBinaryProtocol::writeMapEnd(shared_ptr<TTransport> out) const {
+uint32_t TBinaryProtocol::writeMapEnd() {
return 0;
}
-uint32_t TBinaryProtocol::writeListBegin(shared_ptr<TTransport> out,
- const TType elemType,
- const uint32_t size) const {
+uint32_t TBinaryProtocol::writeListBegin(const TType elemType,
+ const uint32_t size) {
return
- writeByte(out, (int8_t) elemType) +
- writeI32(out, (int32_t)size);
+ writeByte((int8_t) elemType) +
+ writeI32((int32_t)size);
}
-uint32_t TBinaryProtocol::writeListEnd(shared_ptr<TTransport> out) const {
+uint32_t TBinaryProtocol::writeListEnd() {
return 0;
}
-uint32_t TBinaryProtocol::writeSetBegin(shared_ptr<TTransport> out,
- const TType elemType,
- const uint32_t size) const {
+uint32_t TBinaryProtocol::writeSetBegin(const TType elemType,
+ const uint32_t size) {
return
- writeByte(out, (int8_t)elemType) +
- writeI32(out, (int32_t)size);
+ writeByte((int8_t)elemType) +
+ writeI32((int32_t)size);
}
-uint32_t TBinaryProtocol::writeSetEnd(shared_ptr<TTransport> out) const {
+uint32_t TBinaryProtocol::writeSetEnd() {
return 0;
}
-uint32_t TBinaryProtocol::writeBool(shared_ptr<TTransport> out,
- const bool value) const {
+uint32_t TBinaryProtocol::writeBool(const bool value) {
uint8_t tmp = value ? 1 : 0;
- out->write(&tmp, 1);
+ outputTransport_->write(&tmp, 1);
return 1;
}
-uint32_t TBinaryProtocol::writeByte(shared_ptr<TTransport> out,
- const int8_t byte) const {
- out->write((uint8_t*)&byte, 1);
+uint32_t TBinaryProtocol::writeByte(const int8_t byte) {
+ outputTransport_->write((uint8_t*)&byte, 1);
return 1;
}
-uint32_t TBinaryProtocol::writeI16(shared_ptr<TTransport> out,
- const int16_t i16) const {
+uint32_t TBinaryProtocol::writeI16(const int16_t i16) {
int16_t net = (int16_t)htons(i16);
- out->write((uint8_t*)&net, 2);
+ outputTransport_->write((uint8_t*)&net, 2);
return 2;
}
-uint32_t TBinaryProtocol::writeI32(shared_ptr<TTransport> out,
- const int32_t i32) const {
+uint32_t TBinaryProtocol::writeI32(const int32_t i32) {
int32_t net = (int32_t)htonl(i32);
- out->write((uint8_t*)&net, 4);
+ outputTransport_->write((uint8_t*)&net, 4);
return 4;
}
-uint32_t TBinaryProtocol::writeI64(shared_ptr<TTransport> out,
- const int64_t i64) const {
+uint32_t TBinaryProtocol::writeI64(const int64_t i64) {
int64_t net = (int64_t)htonll(i64);
- out->write((uint8_t*)&net, 8);
+ outputTransport_->write((uint8_t*)&net, 8);
return 8;
}
-uint32_t TBinaryProtocol::writeDouble(shared_ptr<TTransport> out,
- const double dub) const {
+uint32_t TBinaryProtocol::writeDouble(const double dub) {
uint8_t b[8];
uint8_t* d = (uint8_t*)&dub;
b[0] = d[7];
@@ -129,15 +117,14 @@
b[5] = d[2];
b[6] = d[1];
b[7] = d[0];
- out->write((uint8_t*)b, 8);
+ outputTransport_->write((uint8_t*)b, 8);
return 8;
}
-uint32_t TBinaryProtocol::writeString(shared_ptr<TTransport> out,
- const string& str) const {
- uint32_t result = writeI32(out, str.size());
- out->write((uint8_t*)str.data(), str.size());
+uint32_t TBinaryProtocol::writeString(const string& str) {
+ uint32_t result = writeI32(str.size());
+ outputTransport_->write((uint8_t*)str.data(), str.size());
return result + str.size();
}
@@ -145,159 +132,147 @@
* Reading functions
*/
-uint32_t TBinaryProtocol::readMessageBegin(shared_ptr<TTransport> in,
- std::string& name,
+uint32_t TBinaryProtocol::readMessageBegin(std::string& name,
TMessageType& messageType,
- int32_t& seqid) const {
+ int32_t& seqid) {
uint32_t result = 0;
int8_t type;
- result+= readString(in, name);
- result+= readByte(in, type);
+ result+= readString(name);
+ result+= readByte(type);
messageType = (TMessageType)type;
- result+= readI32(in, seqid);
+ result+= readI32(seqid);
return result;
}
-uint32_t TBinaryProtocol::readMessageEnd(shared_ptr<TTransport> in) const{
+uint32_t TBinaryProtocol::readMessageEnd() {
return 0;
}
-uint32_t TBinaryProtocol::readStructBegin(shared_ptr<TTransport> in,
- string& name) const {
+uint32_t TBinaryProtocol::readStructBegin(string& name) {
name = "";
return 0;
}
-uint32_t TBinaryProtocol::readStructEnd(shared_ptr<TTransport> in) const {
+uint32_t TBinaryProtocol::readStructEnd() {
return 0;
}
-uint32_t TBinaryProtocol::readFieldBegin(shared_ptr<TTransport> in,
- string& name,
+uint32_t TBinaryProtocol::readFieldBegin(string& name,
TType& fieldType,
- int16_t& fieldId) const {
+ int16_t& fieldId) {
uint32_t result = 0;
int8_t type;
- result += readByte(in, type);
+ result += readByte(type);
fieldType = (TType)type;
if (fieldType == T_STOP) {
fieldId = 0;
return result;
}
- result += readI16(in, fieldId);
+ result += readI16(fieldId);
return result;
}
-uint32_t TBinaryProtocol::readFieldEnd(shared_ptr<TTransport> in) const {
+uint32_t TBinaryProtocol::readFieldEnd() {
return 0;
}
-uint32_t TBinaryProtocol::readMapBegin(shared_ptr<TTransport> in,
- TType& keyType,
+uint32_t TBinaryProtocol::readMapBegin(TType& keyType,
TType& valType,
- uint32_t& size) const {
+ uint32_t& size) {
int8_t k, v;
uint32_t result = 0;
int32_t sizei;
- result += readByte(in, k);
+ result += readByte(k);
keyType = (TType)k;
- result += readByte(in, v);
+ result += readByte(v);
valType = (TType)v;
- result += readI32(in, sizei);
+ result += readI32(sizei);
// TODO(mcslee): check for negative size
size = (uint32_t)sizei;
return result;
}
-uint32_t TBinaryProtocol::readMapEnd(shared_ptr<TTransport> in) const {
+uint32_t TBinaryProtocol::readMapEnd() {
return 0;
}
-uint32_t TBinaryProtocol::readListBegin(shared_ptr<TTransport> in,
- TType& elemType,
- uint32_t& size) const {
+uint32_t TBinaryProtocol::readListBegin(TType& elemType,
+ uint32_t& size) {
int8_t e;
uint32_t result = 0;
int32_t sizei;
- result += readByte(in, e);
+ result += readByte(e);
elemType = (TType)e;
- result += readI32(in, sizei);
+ result += readI32(sizei);
// TODO(mcslee): check for negative size
size = (uint32_t)sizei;
return result;
}
-uint32_t TBinaryProtocol::readListEnd(shared_ptr<TTransport> in) const {
+uint32_t TBinaryProtocol::readListEnd() {
return 0;
}
-uint32_t TBinaryProtocol::readSetBegin(shared_ptr<TTransport> in,
- TType& elemType,
- uint32_t& size) const {
+uint32_t TBinaryProtocol::readSetBegin(TType& elemType,
+ uint32_t& size) {
int8_t e;
uint32_t result = 0;
int32_t sizei;
- result += readByte(in, e);
+ result += readByte(e);
elemType = (TType)e;
- result += readI32(in, sizei);
+ result += readI32(sizei);
// TODO(mcslee): check for negative size
size = (uint32_t)sizei;
return result;
}
-uint32_t TBinaryProtocol::readSetEnd(shared_ptr<TTransport> in) const {
+uint32_t TBinaryProtocol::readSetEnd() {
return 0;
}
-uint32_t TBinaryProtocol::readBool(shared_ptr<TTransport> in,
- bool& value) const {
+uint32_t TBinaryProtocol::readBool(bool& value) {
uint8_t b[1];
- in->readAll(b, 1);
+ inputTransport_->readAll(b, 1);
value = *(int8_t*)b != 0;
return 1;
}
-uint32_t TBinaryProtocol::readByte(shared_ptr<TTransport> in,
- int8_t& byte) const {
+uint32_t TBinaryProtocol::readByte(int8_t& byte) {
uint8_t b[1];
- in->readAll(b, 1);
+ inputTransport_->readAll(b, 1);
byte = *(int8_t*)b;
return 1;
}
-uint32_t TBinaryProtocol::readI16(shared_ptr<TTransport> in,
- int16_t& i16) const {
+uint32_t TBinaryProtocol::readI16(int16_t& i16) {
uint8_t b[2];
- in->readAll(b, 2);
+ inputTransport_->readAll(b, 2);
i16 = *(int16_t*)b;
i16 = (int16_t)ntohs(i16);
return 2;
}
-uint32_t TBinaryProtocol::readI32(shared_ptr<TTransport> in,
- int32_t& i32) const {
+uint32_t TBinaryProtocol::readI32(int32_t& i32) {
uint8_t b[4];
- in->readAll(b, 4);
+ inputTransport_->readAll(b, 4);
i32 = *(int32_t*)b;
i32 = (int32_t)ntohl(i32);
return 4;
}
-uint32_t TBinaryProtocol::readI64(shared_ptr<TTransport> in,
- int64_t& i64) const {
+uint32_t TBinaryProtocol::readI64(int64_t& i64) {
uint8_t b[8];
- in->readAll(b, 8);
+ inputTransport_->readAll(b, 8);
i64 = *(int64_t*)b;
i64 = (int64_t)ntohll(i64);
return 8;
}
-uint32_t TBinaryProtocol::readDouble(shared_ptr<TTransport> in,
- double& dub) const {
+uint32_t TBinaryProtocol::readDouble(double& dub) {
uint8_t b[8];
uint8_t d[8];
- in->readAll(b, 8);
+ inputTransport_->readAll(b, 8);
d[0] = b[7];
d[1] = b[6];
d[2] = b[5];
@@ -310,17 +285,16 @@
return 8;
}
-uint32_t TBinaryProtocol::readString(shared_ptr<TTransport> in,
- string& str) const {
+uint32_t TBinaryProtocol::readString(string& str) {
uint32_t result;
int32_t size;
- result = readI32(in, size);
+ result = readI32(size);
// TODO(mcslee): check for negative size
// Use the heap here to prevent stack overflow for v. large strings
uint8_t *b = new uint8_t[size];
- in->readAll(b, size);
+ inputTransport_->readAll(b, size);
str = string((char*)b, size);
delete [] b;
diff --git a/lib/cpp/src/protocol/TBinaryProtocol.h b/lib/cpp/src/protocol/TBinaryProtocol.h
index 7f36a57..de9a836 100644
--- a/lib/cpp/src/protocol/TBinaryProtocol.h
+++ b/lib/cpp/src/protocol/TBinaryProtocol.h
@@ -17,139 +17,130 @@
*/
class TBinaryProtocol : public TProtocol {
public:
- TBinaryProtocol() {}
+ TBinaryProtocol(shared_ptr<TTransport> in, shared_ptr<TTransport> out) :
+ TProtocol(in, out) {}
+
~TBinaryProtocol() {}
/**
* Writing functions.
*/
- virtual uint32_t writeMessageBegin(shared_ptr<TTransport> out,
- const std::string name,
+ virtual uint32_t writeMessageBegin(const std::string name,
const TMessageType messageType,
- const int32_t seqid) const;
+ const int32_t seqid);
- virtual uint32_t writeMessageEnd(shared_ptr<TTransport> out) const;
+ virtual uint32_t writeMessageEnd();
- uint32_t writeStructBegin(shared_ptr<TTransport> out,
- const std::string& name) const;
+ uint32_t writeStructBegin(const std::string& name);
- uint32_t writeStructEnd(shared_ptr<TTransport> out) const;
+ uint32_t writeStructEnd();
- uint32_t writeFieldBegin(shared_ptr<TTransport> out,
- const std::string& name,
- const TType fieldType,
- const int16_t fieldId) const;
+ uint32_t writeFieldBegin(const std::string& name,
+ const TType fieldType,
+ const int16_t fieldId);
- uint32_t writeFieldEnd(shared_ptr<TTransport> out) const;
+ uint32_t writeFieldEnd();
- uint32_t writeFieldStop(shared_ptr<TTransport> out) const;
+ uint32_t writeFieldStop();
- uint32_t writeMapBegin(shared_ptr<TTransport> out,
- const TType keyType,
- const TType valType,
- const uint32_t size) const;
+ uint32_t writeMapBegin(const TType keyType,
+ const TType valType,
+ const uint32_t size);
- uint32_t writeMapEnd(shared_ptr<TTransport> out) const;
+ uint32_t writeMapEnd();
- uint32_t writeListBegin(shared_ptr<TTransport> out,
- const TType elemType,
- const uint32_t size) const;
+ uint32_t writeListBegin(const TType elemType,
+ const uint32_t size);
- uint32_t writeListEnd(shared_ptr<TTransport> out) const;
+ uint32_t writeListEnd();
- uint32_t writeSetBegin(shared_ptr<TTransport> out,
- const TType elemType,
- const uint32_t size) const;
+ uint32_t writeSetBegin(const TType elemType,
+ const uint32_t size);
- uint32_t writeSetEnd(shared_ptr<TTransport> out) const;
+ uint32_t writeSetEnd();
- uint32_t writeBool(shared_ptr<TTransport> out,
- const bool value) const;
+ uint32_t writeBool(const bool value);
- uint32_t writeByte(shared_ptr<TTransport> out,
- const int8_t byte) const;
+ uint32_t writeByte(const int8_t byte);
- uint32_t writeI16(shared_ptr<TTransport> out,
- const int16_t i16) const;
+ uint32_t writeI16(const int16_t i16);
- uint32_t writeI32(shared_ptr<TTransport> out,
- const int32_t i32) const;
+ uint32_t writeI32(const int32_t i32);
- uint32_t writeI64(shared_ptr<TTransport> out,
- const int64_t i64) const;
+ uint32_t writeI64(const int64_t i64);
- uint32_t writeDouble(shared_ptr<TTransport> out,
- const double dub) const;
+ uint32_t writeDouble(const double dub);
- uint32_t writeString(shared_ptr<TTransport> out,
- const std::string& str) const;
+ uint32_t writeString(const std::string& str);
/**
* Reading functions
*/
- uint32_t readMessageBegin(shared_ptr<TTransport> in,
- std::string& name,
+ uint32_t readMessageBegin(std::string& name,
TMessageType& messageType,
- int32_t& seqid) const;
+ int32_t& seqid);
- uint32_t readMessageEnd(shared_ptr<TTransport> in) const;
+ uint32_t readMessageEnd();
- uint32_t readStructBegin(shared_ptr<TTransport> in,
- std::string& name) const;
+ uint32_t readStructBegin(std::string& name);
- uint32_t readStructEnd(shared_ptr<TTransport> in) const;
+ uint32_t readStructEnd();
- uint32_t readFieldBegin(shared_ptr<TTransport> in,
- std::string& name,
- TType& fieldType,
- int16_t& fieldId) const;
+ uint32_t readFieldBegin(std::string& name,
+ TType& fieldType,
+ int16_t& fieldId);
- uint32_t readFieldEnd(shared_ptr<TTransport> in) const;
+ uint32_t readFieldEnd();
- uint32_t readMapBegin(shared_ptr<TTransport> in,
- TType& keyType,
- TType& valType,
- uint32_t& size) const;
+ uint32_t readMapBegin(TType& keyType,
+ TType& valType,
+ uint32_t& size);
- uint32_t readMapEnd(shared_ptr<TTransport> in) const;
+ uint32_t readMapEnd();
- uint32_t readListBegin(shared_ptr<TTransport> in,
- TType& elemType,
- uint32_t& size) const;
+ uint32_t readListBegin(TType& elemType,
+ uint32_t& size);
- uint32_t readListEnd(shared_ptr<TTransport> in) const;
+ uint32_t readListEnd();
- uint32_t readSetBegin(shared_ptr<TTransport> in,
- TType& elemType,
- uint32_t& size) const;
+ uint32_t readSetBegin(TType& elemType,
+ uint32_t& size);
- uint32_t readSetEnd(shared_ptr<TTransport> in) const;
+ uint32_t readSetEnd();
- uint32_t readBool(shared_ptr<TTransport> in,
- bool& value) const;
+ uint32_t readBool(bool& value);
- uint32_t readByte(shared_ptr<TTransport> in,
- int8_t& byte) const;
+ uint32_t readByte(int8_t& byte);
- uint32_t readI16(shared_ptr<TTransport> in,
- int16_t& i16) const;
+ uint32_t readI16(int16_t& i16);
- uint32_t readI32(shared_ptr<TTransport> in,
- int32_t& i32) const;
+ uint32_t readI32(int32_t& i32);
- uint32_t readI64(shared_ptr<TTransport> in,
- int64_t& i64) const;
+ uint32_t readI64(int64_t& i64);
- uint32_t readDouble(shared_ptr<TTransport> in,
- double& dub) const;
+ uint32_t readDouble(double& dub);
- uint32_t readString(shared_ptr<TTransport> in,
- std::string& str) const;
+ uint32_t readString(std::string& str);
+};
+
+/**
+ * Constructs binary protocol handlers
+ */
+class TBinaryProtocolFactory : public TProtocolFactory {
+ public:
+ TBinaryProtocolFactory() {}
+
+ virtual ~TBinaryProtocolFactory() {}
+
+ std::pair<boost::shared_ptr<TProtocol>, boost::shared_ptr<TProtocol> > getIOProtocols(boost::shared_ptr<TTransport> in, boost::shared_ptr<TTransport> out) {
+ boost::shared_ptr<TProtocol> prot(new TBinaryProtocol(in, out));
+ return std::make_pair(prot, prot);
+ }
};
}}} // facebook::thrift::protocol
diff --git a/lib/cpp/src/protocol/TProtocol.h b/lib/cpp/src/protocol/TProtocol.h
index 60d85dc..8077b27 100644
--- a/lib/cpp/src/protocol/TProtocol.h
+++ b/lib/cpp/src/protocol/TProtocol.h
@@ -82,170 +82,144 @@
* Writing functions.
*/
- virtual uint32_t writeMessageBegin(shared_ptr<TTransport> out,
- const std::string name,
+ virtual uint32_t writeMessageBegin(const std::string name,
const TMessageType messageType,
- const int32_t seqid) const = 0;
+ const int32_t seqid) = 0;
- virtual uint32_t writeMessageEnd(shared_ptr<TTransport> out) const = 0;
+ virtual uint32_t writeMessageEnd() = 0;
- virtual uint32_t writeStructBegin(shared_ptr<TTransport> out,
- const std::string& name) const = 0;
+ virtual uint32_t writeStructBegin(const std::string& name) = 0;
- virtual uint32_t writeStructEnd(shared_ptr<TTransport> out) const = 0;
+ virtual uint32_t writeStructEnd() = 0;
- virtual uint32_t writeFieldBegin(shared_ptr<TTransport> out,
- const std::string& name,
+ virtual uint32_t writeFieldBegin(const std::string& name,
const TType fieldType,
- const int16_t fieldId) const = 0;
+ const int16_t fieldId) = 0;
- virtual uint32_t writeFieldEnd(shared_ptr<TTransport> out) const = 0;
+ virtual uint32_t writeFieldEnd() = 0;
- virtual uint32_t writeFieldStop(shared_ptr<TTransport> out) const = 0;
+ virtual uint32_t writeFieldStop() = 0;
- virtual uint32_t writeMapBegin(shared_ptr<TTransport> out,
- const TType keyType,
+ virtual uint32_t writeMapBegin(const TType keyType,
const TType valType,
- const uint32_t size) const = 0;
+ const uint32_t size) = 0;
- virtual uint32_t writeMapEnd(shared_ptr<TTransport> out) const = 0;
+ virtual uint32_t writeMapEnd() = 0;
- virtual uint32_t writeListBegin(shared_ptr<TTransport> out,
- const TType elemType,
- const uint32_t size) const = 0;
+ virtual uint32_t writeListBegin(const TType elemType,
+ const uint32_t size) = 0;
- virtual uint32_t writeListEnd(shared_ptr<TTransport> out) const = 0;
+ virtual uint32_t writeListEnd() = 0;
- virtual uint32_t writeSetBegin(shared_ptr<TTransport> out,
- const TType elemType,
- const uint32_t size) const = 0;
+ virtual uint32_t writeSetBegin(const TType elemType,
+ const uint32_t size) = 0;
- virtual uint32_t writeSetEnd(shared_ptr<TTransport> out) const = 0;
+ virtual uint32_t writeSetEnd() = 0;
- virtual uint32_t writeBool(shared_ptr<TTransport> out,
- const bool value) const = 0;
+ virtual uint32_t writeBool(const bool value) = 0;
- virtual uint32_t writeByte(shared_ptr<TTransport> out,
- const int8_t byte) const = 0;
+ virtual uint32_t writeByte(const int8_t byte) = 0;
- virtual uint32_t writeI16(shared_ptr<TTransport> out,
- const int16_t i16) const = 0;
+ virtual uint32_t writeI16(const int16_t i16) = 0;
- virtual uint32_t writeI32(shared_ptr<TTransport> out,
- const int32_t i32) const = 0;
+ virtual uint32_t writeI32(const int32_t i32) = 0;
- virtual uint32_t writeI64(shared_ptr<TTransport> out,
- const int64_t i64) const = 0;
+ virtual uint32_t writeI64(const int64_t i64) = 0;
- virtual uint32_t writeDouble(shared_ptr<TTransport> out,
- const double dub) const = 0;
+ virtual uint32_t writeDouble(const double dub) = 0;
- virtual uint32_t writeString(shared_ptr<TTransport> out,
- const std::string& str) const = 0;
+ virtual uint32_t writeString(const std::string& str) = 0;
/**
* Reading functions
*/
- virtual uint32_t readMessageBegin(shared_ptr<TTransport> in,
- std::string& name,
+ virtual uint32_t readMessageBegin(std::string& name,
TMessageType& messageType,
- int32_t& seqid) const = 0;
+ int32_t& seqid) = 0;
- virtual uint32_t readMessageEnd(shared_ptr<TTransport> in) const = 0;
+ virtual uint32_t readMessageEnd() = 0;
- virtual uint32_t readStructBegin(shared_ptr<TTransport> in,
- std::string& name) const = 0;
+ virtual uint32_t readStructBegin(std::string& name) = 0;
- virtual uint32_t readStructEnd(shared_ptr<TTransport> in) const = 0;
+ virtual uint32_t readStructEnd() = 0;
- virtual uint32_t readFieldBegin(shared_ptr<TTransport> in,
- std::string& name,
+ virtual uint32_t readFieldBegin(std::string& name,
TType& fieldType,
- int16_t& fieldId) const = 0;
+ int16_t& fieldId) = 0;
- virtual uint32_t readFieldEnd(shared_ptr<TTransport> in) const = 0;
+ virtual uint32_t readFieldEnd() = 0;
- virtual uint32_t readMapBegin(shared_ptr<TTransport> in,
- TType& keyType,
+ virtual uint32_t readMapBegin(TType& keyType,
TType& valType,
- uint32_t& size) const = 0;
+ uint32_t& size) = 0;
- virtual uint32_t readMapEnd(shared_ptr<TTransport> in) const = 0;
+ virtual uint32_t readMapEnd() = 0;
- virtual uint32_t readListBegin(shared_ptr<TTransport> in,
- TType& elemType,
- uint32_t& size) const = 0;
+ virtual uint32_t readListBegin(TType& elemType,
+ uint32_t& size) = 0;
- virtual uint32_t readListEnd(shared_ptr<TTransport> in) const = 0;
+ virtual uint32_t readListEnd() = 0;
- virtual uint32_t readSetBegin(shared_ptr<TTransport> in,
- TType& elemType,
- uint32_t& size) const = 0;
+ virtual uint32_t readSetBegin(TType& elemType,
+ uint32_t& size) = 0;
- virtual uint32_t readSetEnd(shared_ptr<TTransport> in) const = 0;
+ virtual uint32_t readSetEnd() = 0;
- virtual uint32_t readBool(shared_ptr<TTransport> in,
- bool& value) const = 0;
+ virtual uint32_t readBool(bool& value) = 0;
- virtual uint32_t readByte(shared_ptr<TTransport> in,
- int8_t& byte) const = 0;
+ virtual uint32_t readByte(int8_t& byte) = 0;
- virtual uint32_t readI16(shared_ptr<TTransport> in,
- int16_t& i16) const = 0;
+ virtual uint32_t readI16(int16_t& i16) = 0;
- virtual uint32_t readI32(shared_ptr<TTransport> in,
- int32_t& i32) const = 0;
+ virtual uint32_t readI32(int32_t& i32) = 0;
- virtual uint32_t readI64(shared_ptr<TTransport> in,
- int64_t& i64) const = 0;
+ virtual uint32_t readI64(int64_t& i64) = 0;
- virtual uint32_t readDouble(shared_ptr<TTransport> in,
- double& dub) const = 0;
+ virtual uint32_t readDouble(double& dub) = 0;
- virtual uint32_t readString(shared_ptr<TTransport> in,
- std::string& str) const = 0;
+ virtual uint32_t readString(std::string& str) = 0;
/**
* Method to arbitrarily skip over data.
*/
- uint32_t skip(shared_ptr<TTransport> in, TType type) const {
+ uint32_t skip(TType type) {
switch (type) {
case T_BOOL:
{
bool boolv;
- return readBool(in, boolv);
+ return readBool(boolv);
}
case T_BYTE:
{
int8_t bytev;
- return readByte(in, bytev);
+ return readByte(bytev);
}
case T_I16:
{
int16_t i16;
- return readI16(in, i16);
+ return readI16(i16);
}
case T_I32:
{
int32_t i32;
- return readI32(in, i32);
+ return readI32(i32);
}
case T_I64:
{
int64_t i64;
- return readI64(in, i64);
+ return readI64(i64);
}
case T_DOUBLE:
{
double dub;
- return readDouble(in, dub);
+ return readDouble(dub);
}
case T_STRING:
{
std::string str;
- return readString(in, str);
+ return readString(str);
}
case T_STRUCT:
{
@@ -253,16 +227,16 @@
std::string name;
int16_t fid;
TType ftype;
- result += readStructBegin(in, name);
+ result += readStructBegin(name);
while (true) {
- result += readFieldBegin(in, name, ftype, fid);
+ result += readFieldBegin(name, ftype, fid);
if (ftype == T_STOP) {
break;
}
- result += skip(in, ftype);
- result += readFieldEnd(in);
+ result += skip(ftype);
+ result += readFieldEnd();
}
- result += readStructEnd(in);
+ result += readStructEnd();
return result;
}
case T_MAP:
@@ -271,12 +245,12 @@
TType keyType;
TType valType;
uint32_t i, size;
- result += readMapBegin(in, keyType, valType, size);
+ result += readMapBegin(keyType, valType, size);
for (i = 0; i < size; i++) {
- result += skip(in, keyType);
- result += skip(in, valType);
+ result += skip(keyType);
+ result += skip(valType);
}
- result += readMapEnd(in);
+ result += readMapEnd();
return result;
}
case T_SET:
@@ -284,11 +258,11 @@
uint32_t result = 0;
TType elemType;
uint32_t i, size;
- result += readSetBegin(in, elemType, size);
+ result += readSetBegin(elemType, size);
for (i = 0; i < size; i++) {
- result += skip(in, elemType);
+ result += skip(elemType);
}
- result += readSetEnd(in);
+ result += readSetEnd();
return result;
}
case T_LIST:
@@ -296,11 +270,11 @@
uint32_t result = 0;
TType elemType;
uint32_t i, size;
- result += readListBegin(in, elemType, size);
+ result += readListBegin(elemType, size);
for (i = 0; i < size; i++) {
- result += skip(in, elemType);
+ result += skip(elemType);
}
- result += readListEnd(in);
+ result += readListEnd();
return result;
}
default:
@@ -308,10 +282,39 @@
}
}
+ shared_ptr<TTransport> getInputTransport() {
+ return inputTransport_;
+ }
+
+ shared_ptr<TTransport> getOutputTransport() {
+ return outputTransport_;
+ }
+
protected:
+ TProtocol(shared_ptr<TTransport> in, shared_ptr<TTransport> out) :
+ inputTransport_(in),
+ outputTransport_(out) {}
+
+ shared_ptr<TTransport> inputTransport_;
+
+ shared_ptr<TTransport> outputTransport_;
+
+ private:
TProtocol() {}
};
+/**
+ * Constructs input and output protocol objects given transports.
+ */
+class TProtocolFactory {
+ public:
+ TProtocolFactory() {}
+
+ virtual ~TProtocolFactory() {}
+
+ virtual std::pair<boost::shared_ptr<TProtocol>, boost::shared_ptr<TProtocol> > getIOProtocols(boost::shared_ptr<TTransport> in, boost::shared_ptr<TTransport> out) = 0;
+};
+
}}} // facebook::thrift::protocol
#endif // #define _THRIFT_PROTOCOL_TPROTOCOL_H_ 1
diff --git a/lib/cpp/src/server/TNonblockingServer.cpp b/lib/cpp/src/server/TNonblockingServer.cpp
index bc39877..75a209e 100644
--- a/lib/cpp/src/server/TNonblockingServer.cpp
+++ b/lib/cpp/src/server/TNonblockingServer.cpp
@@ -153,7 +153,7 @@
try {
// Invoke the processor
- server_->getProcessor()->process(inputTransport_, outputTransport_);
+ server_->getProcessor()->process(inputProtocol_, outputProtocol_);
} catch (TTransportException &x) {
fprintf(stderr, "Server::process %s\n", x.getMessage().c_str());
close();
diff --git a/lib/cpp/src/server/TNonblockingServer.h b/lib/cpp/src/server/TNonblockingServer.h
index ec024c0..faa572f 100644
--- a/lib/cpp/src/server/TNonblockingServer.h
+++ b/lib/cpp/src/server/TNonblockingServer.h
@@ -1,9 +1,9 @@
#ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_
#define _THRIFT_SERVER_TNONBLOCKINGSERVER_H_ 1
-#include "Thrift.h"
-#include "server/TServer.h"
-#include "transport/TMemoryBuffer.h"
+#include <Thrift.h>
+#include <server/TServer.h>
+#include <transport/TTransportUtils.h>
#include <stack>
#include <event.h>
@@ -50,10 +50,8 @@
void handleEvent(int fd, short which);
public:
- TNonblockingServer(shared_ptr<TProcessor> processor,
- shared_ptr<TServerOptions> options,
- int port) :
- TServer(processor, options),
+ TNonblockingServer(shared_ptr<TProcessor> processor, int port) :
+ TServer(processor),
serverSocket_(0),
port_(port),
frameResponses_(true) {}
@@ -157,6 +155,12 @@
// Transport that processor writes to
shared_ptr<TMemoryBuffer> outputTransport_;
+ // Protocol encoder
+ shared_ptr<TProtocol> outputProtocol_;
+
+ // Protocol decoder
+ shared_ptr<TProtocol> inputProtocol_;
+
// Go into read mode
void setRead() {
setFlags(EV_READ | EV_PERSIST);
@@ -190,6 +194,12 @@
inputTransport_ = shared_ptr<TMemoryBuffer>(new TMemoryBuffer(readBuffer_, readBufferSize_));
outputTransport_ = shared_ptr<TMemoryBuffer>(new TMemoryBuffer());
+ // Create protocol
+ std::pair<shared_ptr<TProtocol>,shared_ptr<TProtocol> > iop;
+ iop = s->getProtocolFactory()->getIOProtocols(inputTransport_, outputTransport_);
+ inputProtocol_ = iop.first;
+ outputProtocol_ = iop.second;
+
init(socket, eventFlags, s);
}
diff --git a/lib/cpp/src/server/TServer.h b/lib/cpp/src/server/TServer.h
index 293aa28..4b20432 100644
--- a/lib/cpp/src/server/TServer.h
+++ b/lib/cpp/src/server/TServer.h
@@ -3,7 +3,7 @@
#include <TProcessor.h>
#include <transport/TServerTransport.h>
-#include <transport/TTransportFactory.h>
+#include <protocol/TBinaryProtocol.h>
#include <concurrency/Thread.h>
#include <boost/shared_ptr.hpp>
@@ -14,8 +14,6 @@
using namespace facebook::thrift::transport;
using namespace boost;
-class TServerOptions;
-
/**
* Thrift server.
*
@@ -24,45 +22,61 @@
class TServer : public concurrency::Runnable {
public:
virtual ~TServer() {}
+
virtual void serve() = 0;
// Allows running the server as a Runnable thread
- virtual void run() { serve(); }
+ virtual void run() {
+ serve();
+ }
shared_ptr<TProcessor> getProcessor() {
return processor_;
}
+ shared_ptr<TProtocolFactory> getProtocolFactory() {
+ return protocolFactory_;
+ }
+
protected:
TServer(shared_ptr<TProcessor> processor,
shared_ptr<TServerTransport> serverTransport,
shared_ptr<TTransportFactory> transportFactory,
- shared_ptr<TServerOptions> options) :
+ shared_ptr<TProtocolFactory> protocolFactory) :
processor_(processor),
serverTransport_(serverTransport),
transportFactory_(transportFactory),
- options_(options) {}
+ protocolFactory_(protocolFactory) {}
TServer(shared_ptr<TProcessor> processor,
- shared_ptr<TServerOptions> options) :
- processor_(processor), options_(options) {}
+ shared_ptr<TServerTransport> serverTransport,
+ shared_ptr<TTransportFactory> transportFactory) :
+ processor_(processor),
+ serverTransport_(serverTransport),
+ transportFactory_(transportFactory) {
+ protocolFactory_ = boost::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory());
+ }
+
+ TServer(shared_ptr<TProcessor> processor,
+ shared_ptr<TServerTransport> serverTransport) :
+ processor_(processor),
+ serverTransport_(serverTransport) {
+ transportFactory_ = boost::shared_ptr<TTransportFactory>(new TTransportFactory());
+ protocolFactory_ = boost::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory());
+ }
+
+ TServer(shared_ptr<TProcessor> processor) :
+ processor_(processor) {
+ transportFactory_ = boost::shared_ptr<TTransportFactory>(new TTransportFactory());
+ protocolFactory_ = boost::shared_ptr<TProtocolFactory>(new TBinaryProtocolFactory());
+ }
shared_ptr<TProcessor> processor_;
shared_ptr<TServerTransport> serverTransport_;
shared_ptr<TTransportFactory> transportFactory_;
- shared_ptr<TServerOptions> options_;
+ shared_ptr<TProtocolFactory> protocolFactory_;
};
-/**
- * Class to encapsulate all generic server options.
- */
-class TServerOptions {
- public:
- // TODO(mcslee): Fill in getters/setters here
- protected:
- // TODO(mcslee): Fill data members in here
-};
-
}}} // facebook::thrift::server
#endif // #ifndef _THRIFT_SERVER_TSERVER_H_
diff --git a/lib/cpp/src/server/TSimpleServer.cpp b/lib/cpp/src/server/TSimpleServer.cpp
index 63b6c6b..3eb035e 100644
--- a/lib/cpp/src/server/TSimpleServer.cpp
+++ b/lib/cpp/src/server/TSimpleServer.cpp
@@ -14,7 +14,8 @@
void TSimpleServer::serve() {
shared_ptr<TTransport> client;
- pair<shared_ptr<TTransport>,shared_ptr<TTransport> > io;
+ pair<shared_ptr<TTransport>,shared_ptr<TTransport> > iot;
+ pair<shared_ptr<TProtocol>,shared_ptr<TProtocol> > iop;
try {
// Start the server listening
@@ -28,14 +29,15 @@
try {
while (true) {
client = serverTransport_->accept();
- io = transportFactory_->getIOTransports(client);
+ iot = transportFactory_->getIOTransports(client);
+ iop = protocolFactory_->getIOProtocols(iot.first, iot.second);
try {
- while (processor_->process(io.first, io.second)) {}
+ while (processor_->process(iop.first, iop.second)) {}
} catch (TTransportException& ttx) {
cerr << "TSimpleServer client died: " << ttx.getMessage() << endl;
}
- io.first->close();
- io.second->close();
+ iot.first->close();
+ iot.second->close();
client->close();
}
} catch (TTransportException& ttx) {
diff --git a/lib/cpp/src/server/TSimpleServer.h b/lib/cpp/src/server/TSimpleServer.h
index a0d22a7..6470519 100644
--- a/lib/cpp/src/server/TSimpleServer.h
+++ b/lib/cpp/src/server/TSimpleServer.h
@@ -19,8 +19,8 @@
TSimpleServer(shared_ptr<TProcessor> processor,
shared_ptr<TServerTransport> serverTransport,
shared_ptr<TTransportFactory> transportFactory,
- shared_ptr<TServerOptions> options) :
- TServer(processor, serverTransport, transportFactory, options) {}
+ shared_ptr<TProtocolFactory> protocolFactory) :
+ TServer(processor, serverTransport, transportFactory, protocolFactory) {}
~TSimpleServer() {}
diff --git a/lib/cpp/src/server/TThreadPoolServer.cpp b/lib/cpp/src/server/TThreadPoolServer.cpp
index 43f7463..7885f0f 100644
--- a/lib/cpp/src/server/TThreadPoolServer.cpp
+++ b/lib/cpp/src/server/TThreadPoolServer.cpp
@@ -16,8 +16,8 @@
public:
Task(shared_ptr<TProcessor> processor,
- shared_ptr<TTransport> input,
- shared_ptr<TTransport> output) :
+ shared_ptr<TProtocol> input,
+ shared_ptr<TProtocol> output) :
processor_(processor),
input_(input),
output_(output) {
@@ -35,23 +35,24 @@
break;
}
}
- input_->close();
- output_->close();
+ input_->getInputTransport()->close();
+ output_->getOutputTransport()->close();
}
private:
shared_ptr<TProcessor> processor_;
- shared_ptr<TTransport> input_;
- shared_ptr<TTransport> output_;
+ shared_ptr<TProtocol> input_;
+ shared_ptr<TProtocol> output_;
};
TThreadPoolServer::TThreadPoolServer(shared_ptr<TProcessor> processor,
shared_ptr<TServerTransport> serverTransport,
shared_ptr<TTransportFactory> transportFactory,
- shared_ptr<ThreadManager> threadManager,
- shared_ptr<TServerOptions> options) :
- TServer(processor, serverTransport, transportFactory, options),
+ shared_ptr<TProtocolFactory> protocolFactory,
+
+ shared_ptr<ThreadManager> threadManager) :
+ TServer(processor, serverTransport, transportFactory, protocolFactory),
threadManager_(threadManager) {
}
@@ -60,7 +61,8 @@
void TThreadPoolServer::serve() {
shared_ptr<TTransport> client;
- pair<shared_ptr<TTransport>,shared_ptr<TTransport> > io;
+ pair<shared_ptr<TTransport>,shared_ptr<TTransport> > iot;
+ pair<shared_ptr<TProtocol>,shared_ptr<TProtocol> > iop;
try {
// Start the server listening
@@ -75,9 +77,11 @@
// Fetch client from server
client = serverTransport_->accept();
// Make IO transports
- io = transportFactory_->getIOTransports(client);
+ iot = transportFactory_->getIOTransports(client);
+ iop = protocolFactory_->getIOProtocols(iot.first, iot.second);
+
// Add to threadmanager pool
- threadManager_->add(shared_ptr<TThreadPoolServer::Task>(new TThreadPoolServer::Task(processor_, io.first, io.second)));
+ threadManager_->add(shared_ptr<TThreadPoolServer::Task>(new TThreadPoolServer::Task(processor_, iop.first, iop.second)));
} catch (TTransportException& ttx) {
break;
}
diff --git a/lib/cpp/src/server/TThreadPoolServer.h b/lib/cpp/src/server/TThreadPoolServer.h
index b8f8b47..5c5899e 100644
--- a/lib/cpp/src/server/TThreadPoolServer.h
+++ b/lib/cpp/src/server/TThreadPoolServer.h
@@ -21,8 +21,8 @@
TThreadPoolServer(shared_ptr<TProcessor> processor,
shared_ptr<TServerTransport> serverTransport,
shared_ptr<TTransportFactory> transportFactory,
- shared_ptr<ThreadManager> threadManager,
- shared_ptr<TServerOptions> options);
+ shared_ptr<TProtocolFactory> protocolFactory,
+ shared_ptr<ThreadManager> threadManager);
virtual ~TThreadPoolServer();
diff --git a/lib/cpp/src/transport/TBufferedRouterTransport.h b/lib/cpp/src/transport/TBufferedRouterTransport.h
index b01faac..b89198e 100644
--- a/lib/cpp/src/transport/TBufferedRouterTransport.h
+++ b/lib/cpp/src/transport/TBufferedRouterTransport.h
@@ -87,6 +87,30 @@
uint32_t wLen_;
};
+
+/**
+ * Wraps a transport into a bufferedRouter instance.
+ *
+ * @author Aditya Agarwal <aditya@facebook.com>
+ */
+class TBufferedRouterTransportFactory : public TTransportFactory {
+ public:
+ TBufferedRouterTransportFactory(boost::shared_ptr<TTransport> rTrans): rTrans_(rTrans) {}
+
+ virtual ~TBufferedRouterTransportFactory() {}
+
+ /**
+ * Wraps the transport into a buffered one.
+ */
+ virtual std::pair<boost::shared_ptr<TTransport>, boost::shared_ptr<TTransport> > getIOTransports(boost::shared_ptr<TTransport> trans) {
+ boost::shared_ptr<TTransport> buffered(new TBufferedRouterTransport(trans, rTrans_));
+ return std::make_pair(buffered, buffered);
+ }
+
+ private:
+ boost::shared_ptr<TTransport> rTrans_;
+};
+
}}} // facebook::thrift::transport
#endif // #ifndef _THRIFT_TRANSPORT_TBUFFEREDROUTERTRANSPORT_H_
diff --git a/lib/cpp/src/transport/TBufferedRouterTransportFactory.h b/lib/cpp/src/transport/TBufferedRouterTransportFactory.h
deleted file mode 100644
index 2b82570..0000000
--- a/lib/cpp/src/transport/TBufferedRouterTransportFactory.h
+++ /dev/null
@@ -1,35 +0,0 @@
-#ifndef _THRIFT_TRANSPORT_TBUFFEREDROUTERTRANSPORTFACTORY_H_
-#define _THRIFT_TRANSPORT_TBUFFEREDROUTERTRANSPORTFACTORY_H_ 1
-
-#include <transport/TTransportFactory.h>
-#include <transport/TBufferedRouterTransport.h>
-#include <boost/shared_ptr.hpp>
-
-namespace facebook { namespace thrift { namespace transport {
-
-/**
- * Wraps a transport into a bufferedRouter instance.
- *
- * @author Aditya Agarwal <aditya@facebook.com>
- */
-class TBufferedRouterTransportFactory : public TTransportFactory {
- public:
- TBufferedRouterTransportFactory(boost::shared_ptr<TTransport> rTrans): rTrans_(rTrans) {}
-
- virtual ~TBufferedRouterTransportFactory() {}
-
- /**
- * Wraps the transport into a buffered one.
- */
- virtual std::pair<boost::shared_ptr<TTransport>, boost::shared_ptr<TTransport> > getIOTransports(boost::shared_ptr<TTransport> trans) {
- boost::shared_ptr<TTransport> buffered(new TBufferedRouterTransport(trans, rTrans_));
- return std::make_pair(buffered, buffered);
- }
-
- private:
- boost::shared_ptr<TTransport> rTrans_;
-};
-
-}}}
-
-#endif // #ifndef _THRIFT_TRANSPORT_TBUFFEREDROUTERTRANSPORTFACTORY_H_
diff --git a/lib/cpp/src/transport/TBufferedTransport.cpp b/lib/cpp/src/transport/TBufferedTransport.cpp
deleted file mode 100644
index 9dfd63a..0000000
--- a/lib/cpp/src/transport/TBufferedTransport.cpp
+++ /dev/null
@@ -1,65 +0,0 @@
-#include "TBufferedTransport.h"
-using std::string;
-
-namespace facebook { namespace thrift { namespace transport {
-
-uint32_t TBufferedTransport::read(uint8_t* buf, uint32_t len) {
- uint32_t need = len;
-
- // We don't have enough data yet
- if (rLen_-rPos_ < need) {
- // Copy out whatever we have
- if (rLen_-rPos_ > 0) {
- memcpy(buf, rBuf_+rPos_, rLen_-rPos_);
- need -= rLen_-rPos_;
- buf += rLen_-rPos_;
- }
- // Get more from underlying transport up to buffer size
- // TODO: should this be a readAll?
- rLen_ = transport_->read(rBuf_, rBufSize_);
- rPos_ = 0;
- }
-
- // Hand over whatever we have
- uint32_t give = need;
- if (rLen_-rPos_ < give) {
- give = rLen_-rPos_;
- }
- memcpy(buf, rBuf_+rPos_, give);
- rPos_ += give;
- need -= give;
- return (len - need);
-}
-
-void TBufferedTransport::write(const uint8_t* buf, uint32_t len) {
- if (len == 0) {
- return;
- }
-
- if (len + wLen_ >= wBufSize_) {
- uint32_t copy = wBufSize_ - wLen_;
- memcpy(wBuf_ + wLen_, buf, copy);
- transport_->write(wBuf_, wBufSize_);
-
- wLen_ = len - copy;
- if (wLen_ > 0) {
- memcpy(wBuf_, buf+copy, wLen_);
- }
- } else {
- memcpy(wBuf_+wLen_, buf, len);
- wLen_ += len;
- }
-}
-
-void TBufferedTransport::flush() {
- // Write out any data waiting in the write buffer
- if (wLen_ > 0) {
- transport_->write(wBuf_, wLen_);
- wLen_ = 0;
- }
-
- // Flush the underlying transport
- transport_->flush();
-}
-
-}}} // facebook::thrift::transport
diff --git a/lib/cpp/src/transport/TBufferedTransport.h b/lib/cpp/src/transport/TBufferedTransport.h
deleted file mode 100644
index 9992777..0000000
--- a/lib/cpp/src/transport/TBufferedTransport.h
+++ /dev/null
@@ -1,83 +0,0 @@
-#ifndef _THRIFT_TRANSPORT_TBUFFEREDTRANSPORT_H_
-#define _THRIFT_TRANSPORT_TBUFFEREDTRANSPORT_H_ 1
-
-#include "TTransport.h"
-#include <string>
-
-#include <boost/shared_ptr.hpp>
-
-namespace facebook { namespace thrift { namespace transport {
-
-using namespace boost;
-
-/**
- * Buffered transport. For reads it will read more data than is requested
- * and will serve future data out of a local buffer. For writes, data is
- * stored to an in memory buffer before being written out.
- *
- * @author Mark Slee <mcslee@facebook.com>
- */
-class TBufferedTransport : public TTransport {
- public:
- TBufferedTransport(shared_ptr<TTransport> transport) :
- transport_(transport),
- rBufSize_(512), rPos_(0), rLen_(0),
- wBufSize_(512), wLen_(0) {
- rBuf_ = new uint8_t[rBufSize_];
- wBuf_ = new uint8_t[wBufSize_];
- }
-
- TBufferedTransport(shared_ptr<TTransport> transport, uint32_t sz) :
- transport_(transport),
- rBufSize_(sz), rPos_(0), rLen_(0),
- wBufSize_(sz), wLen_(0) {
- rBuf_ = new uint8_t[rBufSize_];
- wBuf_ = new uint8_t[wBufSize_];
- }
-
- TBufferedTransport(shared_ptr<TTransport> transport, uint32_t rsz, uint32_t wsz) :
- transport_(transport),
- rBufSize_(rsz), rPos_(0), rLen_(0),
- wBufSize_(wsz), wLen_(0) {
- rBuf_ = new uint8_t[rBufSize_];
- wBuf_ = new uint8_t[wBufSize_];
- }
-
- ~TBufferedTransport() {
- delete [] rBuf_;
- delete [] wBuf_;
- }
-
- bool isOpen() {
- return transport_->isOpen();
- }
-
- void open() {
- transport_->open();
- }
-
- void close() {
- transport_->close();
- }
-
- uint32_t read(uint8_t* buf, uint32_t len);
-
- void write(const uint8_t* buf, uint32_t len);
-
- void flush();
-
- protected:
- shared_ptr<TTransport> transport_;
- uint8_t* rBuf_;
- uint32_t rBufSize_;
- uint32_t rPos_;
- uint32_t rLen_;
-
- uint8_t* wBuf_;
- uint32_t wBufSize_;
- uint32_t wLen_;
-};
-
-}}} // facebook::thrift::transport
-
-#endif // #ifndef _THRIFT_TRANSPORT_TBUFFEREDTRANSPORT_H_
diff --git a/lib/cpp/src/transport/TBufferedTransportFactory.h b/lib/cpp/src/transport/TBufferedTransportFactory.h
deleted file mode 100644
index c6e87b1..0000000
--- a/lib/cpp/src/transport/TBufferedTransportFactory.h
+++ /dev/null
@@ -1,33 +0,0 @@
-#ifndef _THRIFT_TRANSPORT_TBUFFEREDTRANSPORTFACTORY_H_
-#define _THRIFT_TRANSPORT_TBUFFEREDTRANSPORTFACTORY_H_ 1
-
-#include <transport/TTransportFactory.h>
-#include <transport/TBufferedTransport.h>
-#include <boost/shared_ptr.hpp>
-
-namespace facebook { namespace thrift { namespace transport {
-
-/**
- * Wraps a transport into a buffered one.
- *
- * @author Mark Slee <mcslee@facebook.com>
- */
-class TBufferedTransportFactory : public TTransportFactory {
- public:
- TBufferedTransportFactory() {}
-
- virtual ~TBufferedTransportFactory() {}
-
- /**
- * Wraps the transport into a buffered one.
- */
- virtual std::pair<boost::shared_ptr<TTransport>, boost::shared_ptr<TTransport> > getIOTransports(boost::shared_ptr<TTransport> trans) {
- boost::shared_ptr<TTransport> buffered(new TBufferedTransport(trans));
- return std::make_pair(buffered, buffered);
- }
-
-};
-
-}}}
-
-#endif // #ifndef _THRIFT_TRANSPORT_TTRANSPORTFACTORY_H_
diff --git a/lib/cpp/src/transport/TFramedTransport.cpp b/lib/cpp/src/transport/TFramedTransport.cpp
deleted file mode 100644
index a05fd27..0000000
--- a/lib/cpp/src/transport/TFramedTransport.cpp
+++ /dev/null
@@ -1,121 +0,0 @@
-#include <transport/TFramedTransport.h>
-#include <netinet/in.h>
-
-using std::string;
-
-namespace facebook { namespace thrift { namespace transport {
-
-uint32_t TFramedTransport::read(uint8_t* buf, uint32_t len) {
- if (!read_) {
- return transport_->read(buf, len);
- }
-
- uint32_t need = len;
-
- // We don't have enough data yet
- if (rLen_-rPos_ < need) {
- // Copy out whatever we have
- if (rLen_-rPos_ > 0) {
- memcpy(buf, rBuf_+rPos_, rLen_-rPos_);
- need -= rLen_-rPos_;
- buf += rLen_-rPos_;
- }
-
- // Read another chunk
- readFrame();
- }
-
- // Hand over whatever we have
- uint32_t give = need;
- if (rLen_-rPos_ < give) {
- give = rLen_-rPos_;
- }
- memcpy(buf, rBuf_+rPos_, give);
- rPos_ += give;
- need -= give;
- return (len - need);
-}
-
-void TFramedTransport::readFrame() {
- // Get rid of the old frame
- if (rBuf_ != NULL) {
- delete [] rBuf_;
- rBuf_ = NULL;
- }
-
- // Read in the next chunk size
- int32_t sz;
- transport_->readAll((uint8_t*)&sz, 4);
- sz = (int32_t)ntohl(sz);
-
- if (sz < 0) {
- throw new TTransportException("Frame size has negative value");
- }
-
- // Read the frame payload, reset markers
- rBuf_ = new uint8_t[sz];
- transport_->readAll(rBuf_, sz);
- rPos_ = 0;
- rLen_ = sz;
-}
-
-void TFramedTransport::write(const uint8_t* buf, uint32_t len) {
- if (len == 0) {
- return;
- }
-
- // Shortcut out if not write mode
- if (!write_) {
- transport_->write(buf, len);
- return;
- }
-
- // Need to grow the buffer
- if (len + wLen_ >= wBufSize_) {
-
- // Double buffer size until sufficient
- while (wBufSize_ < len + wLen_) {
- wBufSize_ *= 2;
- }
-
- // Allocate new buffer
- uint8_t* wBuf2 = new uint8_t[wBufSize_];
-
- // Copy the old buffer to the new one
- memcpy(wBuf2, wBuf_, wLen_);
-
- // Now point buf to the new one
- delete [] wBuf_;
- wBuf_ = wBuf2;
- }
-
- // Copy data into buffer
- memcpy(wBuf_ + wLen_, buf, len);
- wLen_ += len;
-}
-
-void TFramedTransport::flush() {
- if (!write_) {
- transport_->flush();
- return;
- }
-
- // Write frame size
- int32_t sz = wLen_;
- sz = (int32_t)htonl(sz);
-
- transport_->write((const uint8_t*)&sz, 4);
-
- // Write frame body
- if (sz > 0) {
- transport_->write(wBuf_, wLen_);
- }
-
- // All done
- wLen_ = 0;
-
- // Flush the underlying
- transport_->flush();
-}
-
-}}} // facebook::thrift::transport
diff --git a/lib/cpp/src/transport/TFramedTransport.h b/lib/cpp/src/transport/TFramedTransport.h
deleted file mode 100644
index 37b82b4..0000000
--- a/lib/cpp/src/transport/TFramedTransport.h
+++ /dev/null
@@ -1,101 +0,0 @@
-#ifndef _THRIFT_TRANSPORT_TFRAMEDTRANSPORT_H_
-#define _THRIFT_TRANSPORT_TFRAMEDTRANSPORT_H_ 1
-
-#include "TTransport.h"
-#include <string>
-#include <boost/shared_ptr.hpp>
-
-namespace facebook { namespace thrift { namespace transport {
-
-using namespace boost;
-
-/**
- * Framed transport. All writes go into an in-memory buffer until flush is
- * called, at which point the transport writes the length of the entire
- * binary chunk followed by the data payload. This allows the receiver on the
- * other end to always do fixed-length reads.
- *
- * @author Mark Slee <mcslee@facebook.com>
- */
-class TFramedTransport : public TTransport {
- public:
- TFramedTransport(shared_ptr<TTransport> transport) :
- transport_(transport),
- rPos_(0),
- rLen_(0),
- read_(true),
- wBufSize_(512),
- wLen_(0),
- write_(true) {
- rBuf_ = NULL;
- wBuf_ = new uint8_t[wBufSize_];
- }
-
- TFramedTransport(shared_ptr<TTransport> transport, uint32_t sz) :
- transport_(transport),
- rPos_(0),
- rLen_(0),
- read_(true),
- wBufSize_(sz),
- wLen_(0),
- write_(true) {
- rBuf_ = NULL;
- wBuf_ = new uint8_t[wBufSize_];
- }
-
- ~TFramedTransport() {
- if (rBuf_ != NULL) {
- delete [] rBuf_;
- }
- if (wBuf_ != NULL) {
- delete [] wBuf_;
- }
- }
-
- void setRead(bool read) {
- read_ = read;
- }
-
- void setWrite(bool write) {
- write_ = write;
- }
-
- void open() {
- transport_->open();
- }
-
- bool isOpen() {
- return transport_->isOpen();
- }
-
- void close() {
- transport_->close();
- }
-
- uint32_t read(uint8_t* buf, uint32_t len);
-
- void write(const uint8_t* buf, uint32_t len);
-
- void flush();
-
- protected:
- shared_ptr<TTransport> transport_;
- uint8_t* rBuf_;
- uint32_t rPos_;
- uint32_t rLen_;
- bool read_;
-
- uint8_t* wBuf_;
- uint32_t wBufSize_;
- uint32_t wLen_;
- bool write_;
-
- /**
- * Reads a frame of input from the underlying stream.
- */
- void readFrame();
-};
-
-}}} // facebook::thrift::transport
-
-#endif // #ifndef _THRIFT_TRANSPORT_TFRAMEDTRANSPORT_H_
diff --git a/lib/cpp/src/transport/TMemoryBuffer.cpp b/lib/cpp/src/transport/TMemoryBuffer.cpp
deleted file mode 100644
index 084f297..0000000
--- a/lib/cpp/src/transport/TMemoryBuffer.cpp
+++ /dev/null
@@ -1,45 +0,0 @@
-#include "TMemoryBuffer.h"
-
-namespace facebook { namespace thrift { namespace transport {
-
-uint32_t TMemoryBuffer::read(uint8_t* buf, uint32_t len) {
- // Check avaible data for reading
- uint32_t avail = wPos_ - rPos_;
-
- // Device how much to give
- uint32_t give = len;
- if (avail < len) {
- give = avail;
- }
-
- // Copy into buffer and increment rPos_
- memcpy(buf, buffer_ + rPos_, give);
- rPos_ += give;
-
- return give;
-}
-
-void TMemoryBuffer::write(const uint8_t* buf, uint32_t len) {
- // Check available space
- uint32_t avail = bufferSize_ - wPos_;
-
- // Grow the buffer
- if (len > avail) {
- if (!owner_) {
- throw TTransportException("Insufficient space in external MemoryBuffer");
- }
- while (len > avail) {
- bufferSize_ *= 2;
- buffer_ = (uint8_t*)realloc(buffer_, bufferSize_);
- if (buffer_ == NULL) {
- throw TTransportException("Out of memory.");
- }
- }
- }
-
- // Copy into the buffer and increment wPos_
- memcpy(buffer_ + wPos_, buf, len);
- wPos_ += len;
-}
-
-}}} // facebook::thrift::transport
diff --git a/lib/cpp/src/transport/TMemoryBuffer.h b/lib/cpp/src/transport/TMemoryBuffer.h
deleted file mode 100644
index 9aa3135..0000000
--- a/lib/cpp/src/transport/TMemoryBuffer.h
+++ /dev/null
@@ -1,116 +0,0 @@
-#ifndef _THRIFT_TRANSPORT_TMEMORYBUFFER_H_
-#define _THRIFT_TRANSPORT_TMEMORYBUFFER_H_ 1
-
-#include "TTransport.h"
-#include <string>
-
-namespace facebook { namespace thrift { namespace transport {
-
-/**
- * A memory buffer is a tranpsort that simply reads from and writes to an
- * in memory buffer. Anytime you call write on it, the data is simply placed
- * into a buffer, and anytime you call read, data is read from that buffer.
- *
- * The buffers are allocated using C constructs malloc,realloc, and the size
- * doubles as necessary.
- *
- * @author Mark Slee <mcslee@facebook.com>
- */
-class TMemoryBuffer : public TTransport {
- public:
- TMemoryBuffer() {
- owner_ = true;
- bufferSize_ = 1024;
- buffer_ = (uint8_t*)malloc(bufferSize_);
- if (buffer_ == NULL) {
- throw TTransportException("Out of memory");
- }
- wPos_ = 0;
- rPos_ = 0;
- }
-
- TMemoryBuffer(uint32_t sz) {
- owner_ = true;
- bufferSize_ = sz;
- buffer_ = (uint8_t*)malloc(bufferSize_);
- if (buffer_ == NULL) {
- throw TTransportException("Out of memory");
- }
- wPos_ = 0;
- rPos_ = 0;
- }
-
- TMemoryBuffer(uint8_t* buf, int sz) {
- owner_ = false;
- buffer_ = buf;
- bufferSize_ = sz;
- wPos_ = sz;
- rPos_ = 0;
- }
-
- ~TMemoryBuffer() {
- if (owner_) {
- if (buffer_ != NULL) {
- free(buffer_);
- buffer_ = NULL;
- }
- }
- }
-
- bool isOpen() {
- return true;
- }
-
-
- void open() {}
-
- void close() {}
-
- void getBuffer(uint8_t** bufPtr, uint32_t* sz) {
- *bufPtr = buffer_;
- *sz = wPos_;
- }
-
- void resetBuffer() {
- wPos_ = 0;
- rPos_ = 0;
- }
-
- void resetBuffer(uint8_t* buf, uint32_t sz) {
- if (owner_) {
- if (buffer_ != NULL) {
- free(buffer_);
- }
- }
- owner_ = false;
- buffer_ = buf;
- bufferSize_ = sz;
- wPos_ = sz;
- rPos_ = 0;
- }
-
- uint32_t read(uint8_t* buf, uint32_t len);
-
- void write(const uint8_t* buf, uint32_t len);
-
- private:
- // Data buffer
- uint8_t* buffer_;
-
- // Allocated buffer size
- uint32_t bufferSize_;
-
- // Where the write is at
- uint32_t wPos_;
-
- // Where the reader is at
- uint32_t rPos_;
-
- // Is this object the owner of the buffer?
- bool owner_;
-
-};
-
-}}} // facebook::thrift::transport
-
-#endif // #ifndef _THRIFT_TRANSPORT_TBUFFEREDTRANSPORT_H_
diff --git a/lib/cpp/src/transport/TNullTransport.h b/lib/cpp/src/transport/TNullTransport.h
deleted file mode 100644
index dd2999f..0000000
--- a/lib/cpp/src/transport/TNullTransport.h
+++ /dev/null
@@ -1,28 +0,0 @@
-#ifndef _THRIFT_TRANSPORT_TNULLTRANSPORT_H_
-#define _THRIFT_TRANSPORT_TNULLTRANSPORT_H_ 1
-
-#include "TTransport.h"
-
-namespace facebook { namespace thrift { namespace transport {
-
-/**
- * The null transport is a dummy transport that doesn't actually do anything.
- * It's sort of an analogy to /dev/null, you can never read anything from it
- * and it will let you write anything you want to it, though it won't actually
- * go anywhere.
- *
- * @author Mark Slee <mcslee@facebook.com>
- */
-class TNullTransport : public TTransport {
- public:
- TNullTransport() {}
- ~TNullTransport() {}
-
- bool isOpen() { return true; }
- void open() { }
- void write(const std::string& s) {}
-};
-
-}}} // facebook::thrift::transport
-
-#endif // #ifndef _THRIFT_TRANSPORT_TNULLTRANSPORT_H_
diff --git a/lib/cpp/src/transport/TTransport.h b/lib/cpp/src/transport/TTransport.h
index c5456d5..7b4cbe1 100644
--- a/lib/cpp/src/transport/TTransport.h
+++ b/lib/cpp/src/transport/TTransport.h
@@ -1,7 +1,8 @@
#ifndef _THRIFT_TRANSPORT_TTRANSPORT_H_
#define _THRIFT_TRANSPORT_TTRANSPORT_H_ 1
-#include "Thrift.h"
+#include <Thrift.h>
+#include <boost/shared_ptr.hpp>
#include <transport/TTransportException.h>
#include <string>
@@ -124,6 +125,28 @@
TTransport() {}
};
+/**
+ * Generic factory class to make an input and output transport out of a
+ * source transport. Commonly used inside servers to make input and output
+ * streams out of raw clients.
+ *
+ * @author Mark Slee <mcslee@facebook.com>
+ */
+class TTransportFactory {
+ public:
+ TTransportFactory() {}
+
+ virtual ~TTransportFactory() {}
+
+ /**
+ * Default implementation does nothing, just returns the transport given.
+ */
+ virtual std::pair<boost::shared_ptr<TTransport>, boost::shared_ptr<TTransport> > getIOTransports(boost::shared_ptr<TTransport> trans) {
+ return std::make_pair(trans, trans);
+ }
+
+};
+
}}} // facebook::thrift::transport
#endif // #ifndef _THRIFT_TRANSPORT_TTRANSPORT_H_
diff --git a/lib/cpp/src/transport/TTransportFactory.h b/lib/cpp/src/transport/TTransportFactory.h
deleted file mode 100644
index abd1048..0000000
--- a/lib/cpp/src/transport/TTransportFactory.h
+++ /dev/null
@@ -1,33 +0,0 @@
-#ifndef _THRIFT_TRANSPORT_TTRANSPORTFACTORY_H_
-#define _THRIFT_TRANSPORT_TTRANSPORTFACTORY_H_ 1
-
-#include <transport/TTransport.h>
-#include <boost/shared_ptr.hpp>
-
-namespace facebook { namespace thrift { namespace transport {
-
-/**
- * Generic factory class to make an input and output transport out of a
- * source transport. Commonly used inside servers to make input and output
- * streams out of raw clients.
- *
- * @author Mark Slee <mcslee@facebook.com>
- */
-class TTransportFactory {
- public:
- TTransportFactory() {}
-
- virtual ~TTransportFactory() {}
-
- /**
- * Default implementation does nothing, just returns the transport given.
- */
- virtual std::pair<boost::shared_ptr<TTransport>, boost::shared_ptr<TTransport> > getIOTransports(boost::shared_ptr<TTransport> trans) {
- return std::make_pair(trans, trans);
- }
-
-};
-
-}}}
-
-#endif // #ifndef _THRIFT_TRANSPORT_TTRANSPORTFACTORY_H_
diff --git a/lib/cpp/src/transport/TTransportUtils.cpp b/lib/cpp/src/transport/TTransportUtils.cpp
new file mode 100644
index 0000000..d9f4775
--- /dev/null
+++ b/lib/cpp/src/transport/TTransportUtils.cpp
@@ -0,0 +1,219 @@
+#include <transport/TTransportUtils.h>
+
+using std::string;
+
+namespace facebook { namespace thrift { namespace transport {
+
+uint32_t TBufferedTransport::read(uint8_t* buf, uint32_t len) {
+ uint32_t need = len;
+
+ // We don't have enough data yet
+ if (rLen_-rPos_ < need) {
+ // Copy out whatever we have
+ if (rLen_-rPos_ > 0) {
+ memcpy(buf, rBuf_+rPos_, rLen_-rPos_);
+ need -= rLen_-rPos_;
+ buf += rLen_-rPos_;
+ }
+ // Get more from underlying transport up to buffer size
+ // TODO: should this be a readAll?
+ rLen_ = transport_->read(rBuf_, rBufSize_);
+ rPos_ = 0;
+ }
+
+ // Hand over whatever we have
+ uint32_t give = need;
+ if (rLen_-rPos_ < give) {
+ give = rLen_-rPos_;
+ }
+ memcpy(buf, rBuf_+rPos_, give);
+ rPos_ += give;
+ need -= give;
+ return (len - need);
+}
+
+void TBufferedTransport::write(const uint8_t* buf, uint32_t len) {
+ if (len == 0) {
+ return;
+ }
+
+ if (len + wLen_ >= wBufSize_) {
+ uint32_t copy = wBufSize_ - wLen_;
+ memcpy(wBuf_ + wLen_, buf, copy);
+ transport_->write(wBuf_, wBufSize_);
+
+ wLen_ = len - copy;
+ if (wLen_ > 0) {
+ memcpy(wBuf_, buf+copy, wLen_);
+ }
+ } else {
+ memcpy(wBuf_+wLen_, buf, len);
+ wLen_ += len;
+ }
+}
+
+void TBufferedTransport::flush() {
+ // Write out any data waiting in the write buffer
+ if (wLen_ > 0) {
+ transport_->write(wBuf_, wLen_);
+ wLen_ = 0;
+ }
+
+ // Flush the underlying transport
+ transport_->flush();
+}
+
+uint32_t TFramedTransport::read(uint8_t* buf, uint32_t len) {
+ if (!read_) {
+ return transport_->read(buf, len);
+ }
+
+ uint32_t need = len;
+
+ // We don't have enough data yet
+ if (rLen_-rPos_ < need) {
+ // Copy out whatever we have
+ if (rLen_-rPos_ > 0) {
+ memcpy(buf, rBuf_+rPos_, rLen_-rPos_);
+ need -= rLen_-rPos_;
+ buf += rLen_-rPos_;
+ }
+
+ // Read another chunk
+ readFrame();
+ }
+
+ // Hand over whatever we have
+ uint32_t give = need;
+ if (rLen_-rPos_ < give) {
+ give = rLen_-rPos_;
+ }
+ memcpy(buf, rBuf_+rPos_, give);
+ rPos_ += give;
+ need -= give;
+ return (len - need);
+}
+
+void TFramedTransport::readFrame() {
+ // Get rid of the old frame
+ if (rBuf_ != NULL) {
+ delete [] rBuf_;
+ rBuf_ = NULL;
+ }
+
+ // Read in the next chunk size
+ int32_t sz;
+ transport_->readAll((uint8_t*)&sz, 4);
+ sz = (int32_t)ntohl(sz);
+
+ if (sz < 0) {
+ throw new TTransportException("Frame size has negative value");
+ }
+
+ // Read the frame payload, reset markers
+ rBuf_ = new uint8_t[sz];
+ transport_->readAll(rBuf_, sz);
+ rPos_ = 0;
+ rLen_ = sz;
+}
+
+void TFramedTransport::write(const uint8_t* buf, uint32_t len) {
+ if (len == 0) {
+ return;
+ }
+
+ // Shortcut out if not write mode
+ if (!write_) {
+ transport_->write(buf, len);
+ return;
+ }
+
+ // Need to grow the buffer
+ if (len + wLen_ >= wBufSize_) {
+
+ // Double buffer size until sufficient
+ while (wBufSize_ < len + wLen_) {
+ wBufSize_ *= 2;
+ }
+
+ // Allocate new buffer
+ uint8_t* wBuf2 = new uint8_t[wBufSize_];
+
+ // Copy the old buffer to the new one
+ memcpy(wBuf2, wBuf_, wLen_);
+
+ // Now point buf to the new one
+ delete [] wBuf_;
+ wBuf_ = wBuf2;
+ }
+
+ // Copy data into buffer
+ memcpy(wBuf_ + wLen_, buf, len);
+ wLen_ += len;
+}
+
+void TFramedTransport::flush() {
+ if (!write_) {
+ transport_->flush();
+ return;
+ }
+
+ // Write frame size
+ int32_t sz = wLen_;
+ sz = (int32_t)htonl(sz);
+
+ transport_->write((const uint8_t*)&sz, 4);
+
+ // Write frame body
+ if (sz > 0) {
+ transport_->write(wBuf_, wLen_);
+ }
+
+ // All done
+ wLen_ = 0;
+
+ // Flush the underlying
+ transport_->flush();
+}
+
+uint32_t TMemoryBuffer::read(uint8_t* buf, uint32_t len) {
+ // Check avaible data for reading
+ uint32_t avail = wPos_ - rPos_;
+
+ // Device how much to give
+ uint32_t give = len;
+ if (avail < len) {
+ give = avail;
+ }
+
+ // Copy into buffer and increment rPos_
+ memcpy(buf, buffer_ + rPos_, give);
+ rPos_ += give;
+
+ return give;
+}
+
+void TMemoryBuffer::write(const uint8_t* buf, uint32_t len) {
+ // Check available space
+ uint32_t avail = bufferSize_ - wPos_;
+
+ // Grow the buffer
+ if (len > avail) {
+ if (!owner_) {
+ throw TTransportException("Insufficient space in external MemoryBuffer");
+ }
+ while (len > avail) {
+ bufferSize_ *= 2;
+ buffer_ = (uint8_t*)realloc(buffer_, bufferSize_);
+ if (buffer_ == NULL) {
+ throw TTransportException("Out of memory.");
+ }
+ }
+ }
+
+ // Copy into the buffer and increment wPos_
+ memcpy(buffer_ + wPos_, buf, len);
+ wPos_ += len;
+}
+
+}}} // facebook::thrift::transport
diff --git a/lib/cpp/src/transport/TTransportUtils.h b/lib/cpp/src/transport/TTransportUtils.h
new file mode 100644
index 0000000..8d8d093
--- /dev/null
+++ b/lib/cpp/src/transport/TTransportUtils.h
@@ -0,0 +1,316 @@
+#ifndef _THRIFT_TRANSPORT_TTRANSPORTUTILS_H_
+#define _THRIFT_TRANSPORT_TTRANSPORTUTILS_H_ 1
+
+#include <transport/TTransport.h>
+
+namespace facebook { namespace thrift { namespace transport {
+
+/**
+ * The null transport is a dummy transport that doesn't actually do anything.
+ * It's sort of an analogy to /dev/null, you can never read anything from it
+ * and it will let you write anything you want to it, though it won't actually
+ * go anywhere.
+ *
+ * @author Mark Slee <mcslee@facebook.com>
+ */
+class TNullTransport : public TTransport {
+ public:
+ TNullTransport() {}
+
+ ~TNullTransport() {}
+
+ bool isOpen() {
+ return true;
+ }
+
+ void open() {}
+
+ void write(const std::string& s) {}
+};
+
+
+/**
+ * Buffered transport. For reads it will read more data than is requested
+ * and will serve future data out of a local buffer. For writes, data is
+ * stored to an in memory buffer before being written out.
+ *
+ * @author Mark Slee <mcslee@facebook.com>
+ */
+class TBufferedTransport : public TTransport {
+ public:
+ TBufferedTransport(boost::shared_ptr<TTransport> transport) :
+ transport_(transport),
+ rBufSize_(512), rPos_(0), rLen_(0),
+ wBufSize_(512), wLen_(0) {
+ rBuf_ = new uint8_t[rBufSize_];
+ wBuf_ = new uint8_t[wBufSize_];
+ }
+
+ TBufferedTransport(boost::shared_ptr<TTransport> transport, uint32_t sz) :
+ transport_(transport),
+ rBufSize_(sz), rPos_(0), rLen_(0),
+ wBufSize_(sz), wLen_(0) {
+ rBuf_ = new uint8_t[rBufSize_];
+ wBuf_ = new uint8_t[wBufSize_];
+ }
+
+ TBufferedTransport(boost::shared_ptr<TTransport> transport, uint32_t rsz, uint32_t wsz) :
+ transport_(transport),
+ rBufSize_(rsz), rPos_(0), rLen_(0),
+ wBufSize_(wsz), wLen_(0) {
+ rBuf_ = new uint8_t[rBufSize_];
+ wBuf_ = new uint8_t[wBufSize_];
+ }
+
+ ~TBufferedTransport() {
+ delete [] rBuf_;
+ delete [] wBuf_;
+ }
+
+ bool isOpen() {
+ return transport_->isOpen();
+ }
+
+ void open() {
+ transport_->open();
+ }
+
+ void close() {
+ transport_->close();
+ }
+
+ uint32_t read(uint8_t* buf, uint32_t len);
+
+ void write(const uint8_t* buf, uint32_t len);
+
+ void flush();
+
+ protected:
+ boost::shared_ptr<TTransport> transport_;
+ uint8_t* rBuf_;
+ uint32_t rBufSize_;
+ uint32_t rPos_;
+ uint32_t rLen_;
+
+ uint8_t* wBuf_;
+ uint32_t wBufSize_;
+ uint32_t wLen_;
+};
+
+/**
+ * Wraps a transport into a buffered one.
+ *
+ * @author Mark Slee <mcslee@facebook.com>
+ */
+class TBufferedTransportFactory : public TTransportFactory {
+ public:
+ TBufferedTransportFactory() {}
+
+ virtual ~TBufferedTransportFactory() {}
+
+ /**
+ * Wraps the transport into a buffered one.
+ */
+ virtual std::pair<boost::shared_ptr<TTransport>, boost::shared_ptr<TTransport> > getIOTransports(boost::shared_ptr<TTransport> trans) {
+ boost::shared_ptr<TTransport> buffered(new TBufferedTransport(trans));
+ return std::make_pair(buffered, buffered);
+ }
+
+};
+
+/**
+ * Framed transport. All writes go into an in-memory buffer until flush is
+ * called, at which point the transport writes the length of the entire
+ * binary chunk followed by the data payload. This allows the receiver on the
+ * other end to always do fixed-length reads.
+ *
+ * @author Mark Slee <mcslee@facebook.com>
+ */
+class TFramedTransport : public TTransport {
+ public:
+ TFramedTransport(boost::shared_ptr<TTransport> transport) :
+ transport_(transport),
+ rPos_(0),
+ rLen_(0),
+ read_(true),
+ wBufSize_(512),
+ wLen_(0),
+ write_(true) {
+ rBuf_ = NULL;
+ wBuf_ = new uint8_t[wBufSize_];
+ }
+
+ TFramedTransport(boost::shared_ptr<TTransport> transport, uint32_t sz) :
+ transport_(transport),
+ rPos_(0),
+ rLen_(0),
+ read_(true),
+ wBufSize_(sz),
+ wLen_(0),
+ write_(true) {
+ rBuf_ = NULL;
+ wBuf_ = new uint8_t[wBufSize_];
+ }
+
+ ~TFramedTransport() {
+ if (rBuf_ != NULL) {
+ delete [] rBuf_;
+ }
+ if (wBuf_ != NULL) {
+ delete [] wBuf_;
+ }
+ }
+
+ void setRead(bool read) {
+ read_ = read;
+ }
+
+ void setWrite(bool write) {
+ write_ = write;
+ }
+
+ void open() {
+ transport_->open();
+ }
+
+ bool isOpen() {
+ return transport_->isOpen();
+ }
+
+ void close() {
+ transport_->close();
+ }
+
+ uint32_t read(uint8_t* buf, uint32_t len);
+
+ void write(const uint8_t* buf, uint32_t len);
+
+ void flush();
+
+ protected:
+ boost::shared_ptr<TTransport> transport_;
+ uint8_t* rBuf_;
+ uint32_t rPos_;
+ uint32_t rLen_;
+ bool read_;
+
+ uint8_t* wBuf_;
+ uint32_t wBufSize_;
+ uint32_t wLen_;
+ bool write_;
+
+ /**
+ * Reads a frame of input from the underlying stream.
+ */
+ void readFrame();
+};
+
+/**
+ * A memory buffer is a tranpsort that simply reads from and writes to an
+ * in memory buffer. Anytime you call write on it, the data is simply placed
+ * into a buffer, and anytime you call read, data is read from that buffer.
+ *
+ * The buffers are allocated using C constructs malloc,realloc, and the size
+ * doubles as necessary.
+ *
+ * @author Mark Slee <mcslee@facebook.com>
+ */
+class TMemoryBuffer : public TTransport {
+ public:
+ TMemoryBuffer() {
+ owner_ = true;
+ bufferSize_ = 1024;
+ buffer_ = (uint8_t*)malloc(bufferSize_);
+ if (buffer_ == NULL) {
+ throw TTransportException("Out of memory");
+ }
+ wPos_ = 0;
+ rPos_ = 0;
+ }
+
+ TMemoryBuffer(uint32_t sz) {
+ owner_ = true;
+ bufferSize_ = sz;
+ buffer_ = (uint8_t*)malloc(bufferSize_);
+ if (buffer_ == NULL) {
+ throw TTransportException("Out of memory");
+ }
+ wPos_ = 0;
+ rPos_ = 0;
+ }
+
+ TMemoryBuffer(uint8_t* buf, int sz) {
+ owner_ = false;
+ buffer_ = buf;
+ bufferSize_ = sz;
+ wPos_ = sz;
+ rPos_ = 0;
+ }
+
+ ~TMemoryBuffer() {
+ if (owner_) {
+ if (buffer_ != NULL) {
+ free(buffer_);
+ buffer_ = NULL;
+ }
+ }
+ }
+
+ bool isOpen() {
+ return true;
+ }
+
+
+ void open() {}
+
+ void close() {}
+
+ void getBuffer(uint8_t** bufPtr, uint32_t* sz) {
+ *bufPtr = buffer_;
+ *sz = wPos_;
+ }
+
+ void resetBuffer() {
+ wPos_ = 0;
+ rPos_ = 0;
+ }
+
+ void resetBuffer(uint8_t* buf, uint32_t sz) {
+ if (owner_) {
+ if (buffer_ != NULL) {
+ free(buffer_);
+ }
+ }
+ owner_ = false;
+ buffer_ = buf;
+ bufferSize_ = sz;
+ wPos_ = sz;
+ rPos_ = 0;
+ }
+
+ uint32_t read(uint8_t* buf, uint32_t len);
+
+ void write(const uint8_t* buf, uint32_t len);
+
+ private:
+ // Data buffer
+ uint8_t* buffer_;
+
+ // Allocated buffer size
+ uint32_t bufferSize_;
+
+ // Where the write is at
+ uint32_t wPos_;
+
+ // Where the reader is at
+ uint32_t rPos_;
+
+ // Is this object the owner of the buffer?
+ bool owner_;
+
+};
+
+}}} // facebook::thrift::transport
+
+
+#endif // #ifndef _THRIFT_TRANSPORT_TTRANSPORTUTILS_H_