More boosification of thrift driver, server, transport and protocol code
Modified TestServer to use thread-pool manager
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664737 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/Makefile.am b/lib/cpp/Makefile.am
index 1ec11d4..cee5327 100644
--- a/lib/cpp/Makefile.am
+++ b/lib/cpp/Makefile.am
@@ -1,6 +1,5 @@
-#lib_LTLIBRARIES = libthrift.la
-
-lib_LIBRARIES = libthrift.a
+lib_LTLIBRARIES = libthrift.la
+#lib_LIBRARIES = libthrift.a
common_cxxflags = -Isrc $(BOOST_CPPFLAGS)
common_ldflags = $(BOOST_LDFLAGS)
@@ -19,8 +18,8 @@
src/server/TSimpleServer.cc \
src/server/TThreadPoolServer.cc
-libthrift_a_SOURCES = $(libthrift_sources)
-#libthrift_la_SOURCES = $(libthrift_sources)
+#libthrift_a_SOURCES = $(libthrift_sources)
+libthrift_la_SOURCES = $(libthrift_sources)
libthrift_cxxflags = $(common_cxxflags)
libthrift_ldflags = $(common_ldflags)
@@ -28,14 +27,27 @@
libthrift_la_CXXFLAGS = $(libthrift_cxxflags)
libthrift_a_CXXFLAGS = $(libthrift_cxxflags)
-libthrift_inst_headers = src/concurrency/Exception.h \
+include_thriftdir = $(includedir)/thrift
+include_thrift_HEADERS = \
+ src/Thrift.h \
+ src/TProcessor.h
+
+include_concurrencydir = $(includedir)/thrift/concurrency
+include_concurrency_HEADERS = \
+ src/concurrency/Exception.h \
src/concurrency/Monitor.h \
src/concurrency/PosixThreadFactory.h \
src/concurrency/Thread.h \
src/concurrency/ThreadManager.h \
- src/concurrency/TimerManager.h \
+ src/concurrency/TimerManager.h
+
+include_protocoldir = $(includedir)/thrift/protocol
+include_protocol_HEADERS = \
src/protocol/TBinaryProtocol.h \
- src/protocol/TProtocol.h \
+ src/protocol/TProtocol.h
+
+include_transportdir = $(includedir)/thrift/transport
+include_transport_HEADERS = \
src/transport/TBufferedTransport.h \
src/transport/TChunkedTransport.h \
src/transport/TNullTransport.h \
@@ -43,16 +55,16 @@
src/transport/TServerTransport.h \
src/transport/TSocket.h \
src/transport/TTransport.h \
- src/transport/TTransport.h \
- src/transport/TTransportException.h \
+ src/transport/TTransportException.h
+
+include_serverdir = $(includedir)/thrift/server
+include_server_HEADERS = \
+ src/server/TServer.h \
src/server/TSimpleServer.h \
src/server/TThreadPoolServer.h
bin_PROGRAMS = concurrency_test
-include_HEADERS = $(libconcurrency_inst_headers) \
- $(libthrift_inst_headers)
-
concurrency_test_SOURCES = src/concurrency/test/Tests.cc \
src/concurrency/test/ThreadFactoryTests.h \
src/concurrency/test/ThreadManagerTests.h \
diff --git a/lib/cpp/src/TProcessor.h b/lib/cpp/src/TProcessor.h
index 2ed2430..ce27539 100644
--- a/lib/cpp/src/TProcessor.h
+++ b/lib/cpp/src/TProcessor.h
@@ -2,10 +2,13 @@
#define T_PROCESSOR_H
#include <string>
-#include "transport/TTransport.h"
+#include <transport/TTransport.h>
+#include <boost/shared_ptr.hpp>
namespace facebook { namespace thrift {
+using namespace boost;
+
using namespace facebook::thrift::transport;
/**
@@ -19,8 +22,8 @@
class TProcessor {
public:
virtual ~TProcessor() {}
- virtual bool process(TTransport* in, TTransport *out) = 0;
- virtual bool process(TTransport* io) { return process(io, io); }
+ virtual bool process(shared_ptr<TTransport> in, shared_ptr<TTransport> out) = 0;
+ virtual bool process(shared_ptr<TTransport> io) { return process(io, io); }
protected:
TProcessor() {}
};
diff --git a/lib/cpp/src/protocol/TBinaryProtocol.cc b/lib/cpp/src/protocol/TBinaryProtocol.cc
index ed482b8..fef8ab4 100644
--- a/lib/cpp/src/protocol/TBinaryProtocol.cc
+++ b/lib/cpp/src/protocol/TBinaryProtocol.cc
@@ -3,16 +3,28 @@
namespace facebook { namespace thrift { namespace protocol {
-uint32_t TBinaryProtocol::writeStructBegin(TTransport* out,
+uint32_t TBinaryProtocol::writeMessageBegin(shared_ptr<TTransport> out,
+ const TMessageType messageType,
+ const uint32_t seqid) const {
+ return
+ writeByte(out, (uint8_t)messageType) +
+ writeU32(out, seqid);
+}
+
+uint32_t TBinaryProtocol::writeMessageEnd(shared_ptr<TTransport> out) const {
+ return 0;
+}
+
+uint32_t TBinaryProtocol::writeStructBegin(shared_ptr<TTransport> out,
const string& name) const {
return 0;
}
-uint32_t TBinaryProtocol::writeStructEnd(TTransport* out) const {
+uint32_t TBinaryProtocol::writeStructEnd(shared_ptr<TTransport> out) const {
return 0;
}
-uint32_t TBinaryProtocol::writeFieldBegin(TTransport* out,
+uint32_t TBinaryProtocol::writeFieldBegin(shared_ptr<TTransport> out,
const string& name,
const TType fieldType,
const uint16_t fieldId) const {
@@ -21,16 +33,16 @@
writeI32(out, (int32_t)fieldId);
}
-uint32_t TBinaryProtocol::writeFieldEnd(TTransport* out) const {
+uint32_t TBinaryProtocol::writeFieldEnd(shared_ptr<TTransport> out) const {
return 0;
}
-uint32_t TBinaryProtocol::writeFieldStop(TTransport* out) const {
+uint32_t TBinaryProtocol::writeFieldStop(shared_ptr<TTransport> out) const {
return
writeByte(out, (uint8_t)T_STOP);
}
-uint32_t TBinaryProtocol::writeMapBegin(TTransport* out,
+uint32_t TBinaryProtocol::writeMapBegin(shared_ptr<TTransport> out,
const TType keyType,
const TType valType,
const int32_t size) const {
@@ -40,11 +52,11 @@
writeI32(out, (int32_t)size);
}
-uint32_t TBinaryProtocol::writeMapEnd(TTransport* out) const {
+uint32_t TBinaryProtocol::writeMapEnd(shared_ptr<TTransport> out) const {
return 0;
}
-uint32_t TBinaryProtocol::writeListBegin(TTransport* out,
+uint32_t TBinaryProtocol::writeListBegin(shared_ptr<TTransport> out,
const TType elemType,
const int32_t size) const {
return
@@ -52,11 +64,11 @@
writeI32(out, (int32_t)size);
}
-uint32_t TBinaryProtocol::writeListEnd(TTransport* out) const {
+uint32_t TBinaryProtocol::writeListEnd(shared_ptr<TTransport> out) const {
return 0;
}
-uint32_t TBinaryProtocol::writeSetBegin(TTransport* out,
+uint32_t TBinaryProtocol::writeSetBegin(shared_ptr<TTransport> out,
const TType elemType,
const int32_t size) const {
return
@@ -64,45 +76,45 @@
writeI32(out, (int32_t)size);
}
-uint32_t TBinaryProtocol::writeSetEnd(TTransport* out) const {
+uint32_t TBinaryProtocol::writeSetEnd(shared_ptr<TTransport> out) const {
return 0;
}
-uint32_t TBinaryProtocol::writeByte(TTransport* out,
+uint32_t TBinaryProtocol::writeByte(shared_ptr<TTransport> out,
const uint8_t byte) const {
out->write(&byte, 1);
return 1;
}
-uint32_t TBinaryProtocol::writeU32(TTransport* out,
+uint32_t TBinaryProtocol::writeU32(shared_ptr<TTransport> out,
const uint32_t u32) const {
uint32_t net = (uint32_t)htonl(u32);
out->write((uint8_t*)&net, 4);
return 4;
}
-uint32_t TBinaryProtocol::writeI32(TTransport* out,
+uint32_t TBinaryProtocol::writeI32(shared_ptr<TTransport> out,
const int32_t i32) const {
int32_t net = (int32_t)htonl(i32);
out->write((uint8_t*)&net, 4);
return 4;
}
-uint32_t TBinaryProtocol::writeU64(TTransport* out,
+uint32_t TBinaryProtocol::writeU64(shared_ptr<TTransport> out,
const uint64_t u64) const {
uint64_t net = (uint64_t)htonll(u64);
out->write((uint8_t*)&net, 8);
return 8;
}
-uint32_t TBinaryProtocol::writeI64(TTransport* out,
+uint32_t TBinaryProtocol::writeI64(shared_ptr<TTransport> out,
const int64_t i64) const {
int64_t net = (int64_t)htonll(i64);
out->write((uint8_t*)&net, 8);
return 8;
}
-uint32_t TBinaryProtocol::writeString(TTransport* out,
+uint32_t TBinaryProtocol::writeString(shared_ptr<TTransport> out,
const string& str) const {
uint32_t result = writeI32(out, str.size());
out->write((uint8_t*)str.data(), str.size());
@@ -113,17 +125,33 @@
* Reading functions
*/
-uint32_t TBinaryProtocol::readStructBegin(TTransport* in,
+uint32_t TBinaryProtocol::readMessasgeBegin(shared_ptr<TTransport> in,
+ TMessageType& messageType,
+ uint32_t& seqid) const {
+
+ uint32_t result = 0;
+ uint8_t type;
+ result+= readByte(in, type);
+ messageType = (TMessageType)type;
+ result+= readU32(in, seqid);
+ return result;
+}
+
+uint32_t TBinaryProtocol::readMessageEnd(shared_ptr<TTransport> in) const{
+ return 0;
+}
+
+uint32_t TBinaryProtocol::readStructBegin(shared_ptr<TTransport> in,
string& name) const {
name = "";
return 0;
}
-uint32_t TBinaryProtocol::readStructEnd(TTransport* in) const {
+uint32_t TBinaryProtocol::readStructEnd(shared_ptr<TTransport> in) const {
return 0;
}
-uint32_t TBinaryProtocol::readFieldBegin(TTransport* in,
+uint32_t TBinaryProtocol::readFieldBegin(shared_ptr<TTransport> in,
string& name,
TType& fieldType,
uint16_t& fieldId) const {
@@ -141,11 +169,11 @@
return result;
}
-uint32_t TBinaryProtocol::readFieldEnd(TTransport* in) const {
+uint32_t TBinaryProtocol::readFieldEnd(shared_ptr<TTransport> in) const {
return 0;
}
-uint32_t TBinaryProtocol::readMapBegin(TTransport* in,
+uint32_t TBinaryProtocol::readMapBegin(shared_ptr<TTransport> in,
TType& keyType,
TType& valType,
int32_t& size) const {
@@ -159,11 +187,11 @@
return result;
}
-uint32_t TBinaryProtocol::readMapEnd(TTransport* in) const {
+uint32_t TBinaryProtocol::readMapEnd(shared_ptr<TTransport> in) const {
return 0;
}
-uint32_t TBinaryProtocol::readListBegin(TTransport* in,
+uint32_t TBinaryProtocol::readListBegin(shared_ptr<TTransport> in,
TType& elemType,
int32_t& size) const {
uint8_t e;
@@ -174,11 +202,11 @@
return result;
}
-uint32_t TBinaryProtocol::readListEnd(TTransport* in) const {
+uint32_t TBinaryProtocol::readListEnd(shared_ptr<TTransport> in) const {
return 0;
}
-uint32_t TBinaryProtocol::readSetBegin(TTransport* in,
+uint32_t TBinaryProtocol::readSetBegin(shared_ptr<TTransport> in,
TType& elemType,
int32_t& size) const {
uint8_t e;
@@ -189,11 +217,11 @@
return result;
}
-uint32_t TBinaryProtocol::readSetEnd(TTransport* in) const {
+uint32_t TBinaryProtocol::readSetEnd(shared_ptr<TTransport> in) const {
return 0;
}
-uint32_t TBinaryProtocol::readByte(TTransport* in,
+uint32_t TBinaryProtocol::readByte(shared_ptr<TTransport> in,
uint8_t& byte) const {
uint8_t b[1];
in->readAll(b, 1);
@@ -201,7 +229,7 @@
return 1;
}
-uint32_t TBinaryProtocol::readU32(TTransport* in,
+uint32_t TBinaryProtocol::readU32(shared_ptr<TTransport> in,
uint32_t& u32) const {
uint8_t b[4];
in->readAll(b, 4);
@@ -210,7 +238,7 @@
return 4;
}
-uint32_t TBinaryProtocol::readI32(TTransport* in,
+uint32_t TBinaryProtocol::readI32(shared_ptr<TTransport> in,
int32_t& i32) const {
uint8_t b[4];
in->readAll(b, 4);
@@ -219,7 +247,7 @@
return 4;
}
-uint32_t TBinaryProtocol::readU64(TTransport* in,
+uint32_t TBinaryProtocol::readU64(shared_ptr<TTransport> in,
uint64_t& u64) const {
uint8_t b[8];
in->readAll(b, 8);
@@ -228,7 +256,7 @@
return 8;
}
-uint32_t TBinaryProtocol::readI64(TTransport* in,
+uint32_t TBinaryProtocol::readI64(shared_ptr<TTransport> in,
int64_t& i64) const {
uint8_t b[8];
in->readAll(b, 8);
@@ -237,7 +265,7 @@
return 8;
}
-uint32_t TBinaryProtocol::readString(TTransport* in,
+uint32_t TBinaryProtocol::readString(shared_ptr<TTransport> in,
string& str) const {
uint32_t result;
int32_t size;
diff --git a/lib/cpp/src/protocol/TBinaryProtocol.h b/lib/cpp/src/protocol/TBinaryProtocol.h
index 0f0560a..3456abf 100644
--- a/lib/cpp/src/protocol/TBinaryProtocol.h
+++ b/lib/cpp/src/protocol/TBinaryProtocol.h
@@ -1,10 +1,14 @@
#ifndef T_BINARY_PROTOCOL_H
#define T_BINARY_PROTOCOL_H
-#include "protocol/TProtocol.h"
+#include <protocol/TProtocol.h>
+
+#include <boost/shared_ptr.hpp>
namespace facebook { namespace thrift { namespace protocol {
+using namespace boost;
+
/**
* The default binary protocol for thrift. Writes all data in a very basic
* binary format, essentially just spitting out the raw bytes.
@@ -12,7 +16,7 @@
* @author Mark Slee <mcslee@facebook.com>
*/
class TBinaryProtocol : public TProtocol {
- public:
+ public:
TBinaryProtocol() {}
~TBinaryProtocol() {}
@@ -20,108 +24,122 @@
* Writing functions.
*/
- uint32_t writeStructBegin (TTransport* out,
+ virtual uint32_t writeMessageBegin(shared_ptr<TTransport> out,
+ const TMessageType messageType,
+ const uint32_t seqid) const;
+
+ virtual uint32_t writeMessageEnd (shared_ptr<TTransport> out) const;
+
+
+ uint32_t writeStructBegin (shared_ptr<TTransport> out,
const std::string& name) const;
- uint32_t writeStructEnd (TTransport* out) const;
+ uint32_t writeStructEnd (shared_ptr<TTransport> out) const;
- uint32_t writeFieldBegin (TTransport* out,
+ uint32_t writeFieldBegin (shared_ptr<TTransport> out,
const std::string& name,
const TType fieldType,
const uint16_t fieldId) const;
- uint32_t writeFieldEnd (TTransport* out) const;
+ uint32_t writeFieldEnd (shared_ptr<TTransport> out) const;
- uint32_t writeFieldStop (TTransport* out) const;
+ uint32_t writeFieldStop (shared_ptr<TTransport> out) const;
- uint32_t writeMapBegin (TTransport* out,
+ uint32_t writeMapBegin (shared_ptr<TTransport> out,
const TType keyType,
const TType valType,
const int32_t size) const;
- uint32_t writeMapEnd (TTransport* out) const;
+ uint32_t writeMapEnd (shared_ptr<TTransport> out) const;
- uint32_t writeListBegin (TTransport* out,
+ uint32_t writeListBegin (shared_ptr<TTransport> out,
const TType elemType,
const int32_t size) const;
- uint32_t writeListEnd (TTransport* out) const;
+ uint32_t writeListEnd (shared_ptr<TTransport> out) const;
- uint32_t writeSetBegin (TTransport* out,
+ uint32_t writeSetBegin (shared_ptr<TTransport> out,
const TType elemType,
const int32_t size) const;
- uint32_t writeSetEnd (TTransport* out) const;
+ uint32_t writeSetEnd (shared_ptr<TTransport> out) const;
- uint32_t writeByte (TTransport* out,
+ uint32_t writeByte (shared_ptr<TTransport> out,
const uint8_t byte) const;
- uint32_t writeU32 (TTransport* out,
+ uint32_t writeU32 (shared_ptr<TTransport> out,
const uint32_t u32) const;
- uint32_t writeI32 (TTransport* out,
+ uint32_t writeI32 (shared_ptr<TTransport> out,
const int32_t i32) const;
- uint32_t writeU64 (TTransport* out,
+ uint32_t writeU64 (shared_ptr<TTransport> out,
const uint64_t u64) const;
- uint32_t writeI64 (TTransport* out,
+ uint32_t writeI64 (shared_ptr<TTransport> out,
const int64_t i64) const;
- uint32_t writeString (TTransport* out,
+ uint32_t writeString (shared_ptr<TTransport> out,
const std::string& str) const;
/**
* Reading functions
*/
- uint32_t readStructBegin (TTransport* in,
+
+ uint32_t readMessasgeBegin (shared_ptr<TTransport> in,
+ TMessageType& messageType,
+ uint32_t& seqid) const;
+
+ uint32_t readMessageEnd (shared_ptr<TTransport> in) const;
+
+ uint32_t readStructBegin (shared_ptr<TTransport> in,
std::string& name) const;
- uint32_t readStructEnd (TTransport* in) const;
+ uint32_t readStructEnd (shared_ptr<TTransport> in) const;
- uint32_t readFieldBegin (TTransport* in,
+ uint32_t readFieldBegin (shared_ptr<TTransport> in,
std::string& name,
TType& fieldType,
uint16_t& fieldId) const;
- uint32_t readFieldEnd (TTransport* in) const;
+ uint32_t readFieldEnd (shared_ptr<TTransport> in) const;
- uint32_t readMapBegin (TTransport* in,
+ uint32_t readMapBegin (shared_ptr<TTransport> in,
TType& keyType,
TType& valType,
int32_t& size) const;
- uint32_t readMapEnd (TTransport* in) const;
+ uint32_t readMapEnd (shared_ptr<TTransport> in) const;
- uint32_t readListBegin (TTransport* in,
+ uint32_t readListBegin (shared_ptr<TTransport> in,
TType& elemType,
int32_t& size) const;
- uint32_t readListEnd (TTransport* in) const;
+ uint32_t readListEnd (shared_ptr<TTransport> in) const;
- uint32_t readSetBegin (TTransport* in,
+ uint32_t readSetBegin (shared_ptr<TTransport> in,
TType& elemType,
int32_t& size) const;
- uint32_t readSetEnd (TTransport* in) const;
+ uint32_t readSetEnd (shared_ptr<TTransport> in) const;
- uint32_t readByte (TTransport* in,
+ uint32_t readByte (shared_ptr<TTransport> in,
uint8_t& byte) const;
- uint32_t readU32 (TTransport* in,
+ uint32_t readU32 (shared_ptr<TTransport> in,
uint32_t& u32) const;
- uint32_t readI32 (TTransport* in,
+ uint32_t readI32 (shared_ptr<TTransport> in,
int32_t& i32) const;
- uint32_t readU64 (TTransport* in,
+ uint32_t readU64 (shared_ptr<TTransport> in,
uint64_t& u64) const;
- uint32_t readI64 (TTransport* in,
+ uint32_t readI64 (shared_ptr<TTransport> in,
int64_t& i64) const;
- uint32_t readString (TTransport* in,
+ uint32_t readString (shared_ptr<TTransport> in,
std::string& str) const;
};
diff --git a/lib/cpp/src/protocol/TProtocol.h b/lib/cpp/src/protocol/TProtocol.h
index 40beadd..33a6eb7 100644
--- a/lib/cpp/src/protocol/TProtocol.h
+++ b/lib/cpp/src/protocol/TProtocol.h
@@ -1,15 +1,19 @@
#ifndef T_PROTOCOL_H
#define T_PROTOCOL_H
+#include <transport/TTransport.h>
+
+#include <boost/shared_ptr.hpp>
+
#include <netinet/in.h>
#include <sys/types.h>
#include <string>
#include <map>
-#include "transport/TTransport.h"
-
namespace facebook { namespace thrift { namespace protocol {
+using namespace boost;
+
using namespace facebook::thrift::transport;
#define ntohll(x) (((uint64_t)(ntohl((int)((x << 32) >> 32))) << 32) | (uint32_t)ntohl(((int)(x >> 32))))
@@ -41,6 +45,14 @@
};
/**
+ * Enumerated definition of the message types that the Thrift protocol supports.
+ */
+enum TMessageType {
+ T_CALL = 1,
+ T_REPLY = 2
+};
+
+/**
* Abstract class for a thrift protocol driver. These are all the methods that
* a protocol must implement. Essentially, there must be some way of reading
* and writing all the base types, plus a mechanism for writing out structs
@@ -60,114 +72,127 @@
* Writing functions.
*/
- virtual uint32_t writeStructBegin (TTransport* out,
+ virtual uint32_t writeMessageBegin (shared_ptr<TTransport> out,
+ const TMessageType messageType,
+ const uint32_t seqid) const = 0;
+
+ virtual uint32_t writeMessageEnd (shared_ptr<TTransport> out) const = 0;
+
+
+ virtual uint32_t writeStructBegin (shared_ptr<TTransport> out,
const std::string& name) const = 0;
- virtual uint32_t writeStructEnd (TTransport* out) const = 0;
+ virtual uint32_t writeStructEnd (shared_ptr<TTransport> out) const = 0;
- virtual uint32_t writeFieldBegin (TTransport* out,
+ virtual uint32_t writeFieldBegin (shared_ptr<TTransport> out,
const std::string& name,
const TType fieldType,
const uint16_t fieldId) const = 0;
- virtual uint32_t writeFieldEnd (TTransport* out) const = 0;
+ virtual uint32_t writeFieldEnd (shared_ptr<TTransport> out) const = 0;
- virtual uint32_t writeFieldStop (TTransport* out) const = 0;
+ virtual uint32_t writeFieldStop (shared_ptr<TTransport> out) const = 0;
- virtual uint32_t writeMapBegin (TTransport* out,
+ virtual uint32_t writeMapBegin (shared_ptr<TTransport> out,
const TType keyType,
const TType valType,
const int32_t size) const = 0;
- virtual uint32_t writeMapEnd (TTransport* out) const = 0;
+ virtual uint32_t writeMapEnd (shared_ptr<TTransport> out) const = 0;
- virtual uint32_t writeListBegin (TTransport* out,
+ virtual uint32_t writeListBegin (shared_ptr<TTransport> out,
const TType elemType,
const int32_t size) const = 0;
- virtual uint32_t writeListEnd (TTransport* out) const = 0;
+ virtual uint32_t writeListEnd (shared_ptr<TTransport> out) const = 0;
- virtual uint32_t writeSetBegin (TTransport* out,
+ virtual uint32_t writeSetBegin (shared_ptr<TTransport> out,
const TType elemType,
const int32_t size) const = 0;
- virtual uint32_t writeSetEnd (TTransport* out) const = 0;
+ virtual uint32_t writeSetEnd (shared_ptr<TTransport> out) const = 0;
- virtual uint32_t writeByte (TTransport* out,
+ virtual uint32_t writeByte (shared_ptr<TTransport> out,
const uint8_t byte) const = 0;
- virtual uint32_t writeU32 (TTransport* out,
+ virtual uint32_t writeU32 (shared_ptr<TTransport> out,
const uint32_t u32) const = 0;
- virtual uint32_t writeI32 (TTransport* out,
+ virtual uint32_t writeI32 (shared_ptr<TTransport> out,
const int32_t i32) const = 0;
- virtual uint32_t writeU64 (TTransport* out,
+ virtual uint32_t writeU64 (shared_ptr<TTransport> out,
const uint64_t u64) const = 0;
- virtual uint32_t writeI64 (TTransport* out,
+ virtual uint32_t writeI64 (shared_ptr<TTransport> out,
const int64_t i64) const = 0;
- virtual uint32_t writeString (TTransport* out,
+ virtual uint32_t writeString (shared_ptr<TTransport> out,
const std::string& str) const = 0;
/**
* Reading functions
*/
- virtual uint32_t readStructBegin (TTransport* in,
+ virtual uint32_t readMessasgeBegin (shared_ptr<TTransport> in,
+ TMessageType& messageType,
+ uint32_t& seqid) const = 0;
+
+ virtual uint32_t readMessageEnd (shared_ptr<TTransport> in) const = 0;
+
+ virtual uint32_t readStructBegin (shared_ptr<TTransport> in,
std::string& name) const = 0;
- virtual uint32_t readStructEnd (TTransport* in) const = 0;
+ virtual uint32_t readStructEnd (shared_ptr<TTransport> in) const = 0;
- virtual uint32_t readFieldBegin (TTransport* in,
+ virtual uint32_t readFieldBegin (shared_ptr<TTransport> in,
std::string& name,
TType& fieldType,
uint16_t& fieldId) const = 0;
- virtual uint32_t readFieldEnd (TTransport* in) const = 0;
+ virtual uint32_t readFieldEnd (shared_ptr<TTransport> in) const = 0;
- virtual uint32_t readMapBegin (TTransport* in,
+ virtual uint32_t readMapBegin (shared_ptr<TTransport> in,
TType& keyType,
TType& valType,
int32_t& size) const = 0;
- virtual uint32_t readMapEnd (TTransport* in) const = 0;
+ virtual uint32_t readMapEnd (shared_ptr<TTransport> in) const = 0;
- virtual uint32_t readListBegin (TTransport* in,
+ virtual uint32_t readListBegin (shared_ptr<TTransport> in,
TType& elemType,
int32_t& size) const = 0;
- virtual uint32_t readListEnd (TTransport* in) const = 0;
+ virtual uint32_t readListEnd (shared_ptr<TTransport> in) const = 0;
- virtual uint32_t readSetBegin (TTransport* in,
+ virtual uint32_t readSetBegin (shared_ptr<TTransport> in,
TType& elemType,
int32_t& size) const = 0;
- virtual uint32_t readSetEnd (TTransport* in) const = 0;
+ virtual uint32_t readSetEnd (shared_ptr<TTransport> in) const = 0;
- virtual uint32_t readByte (TTransport* in,
+ virtual uint32_t readByte (shared_ptr<TTransport> in,
uint8_t& byte) const = 0;
- virtual uint32_t readU32 (TTransport* in,
+ virtual uint32_t readU32 (shared_ptr<TTransport> in,
uint32_t& u32) const = 0;
- virtual uint32_t readI32 (TTransport* in,
+ virtual uint32_t readI32 (shared_ptr<TTransport> in,
int32_t& i32) const = 0;
- virtual uint32_t readU64 (TTransport* in,
+ virtual uint32_t readU64 (shared_ptr<TTransport> in,
uint64_t& u64) const = 0;
- virtual uint32_t readI64 (TTransport* in,
+ virtual uint32_t readI64 (shared_ptr<TTransport> in,
int64_t& i64) const = 0;
- virtual uint32_t readString (TTransport* in,
+ virtual uint32_t readString (shared_ptr<TTransport> in,
std::string& str) const = 0;
/**
* Method to arbitrarily skip over data.
*/
- uint32_t skip(TTransport* in, TType type) const {
+ uint32_t skip(shared_ptr<TTransport> in, TType type) const {
switch (type) {
case T_BYTE:
{
diff --git a/lib/cpp/src/protocol/protocol.txt b/lib/cpp/src/protocol/protocol.txt
index d6db0ab..d66b7eb 100644
--- a/lib/cpp/src/protocol/protocol.txt
+++ b/lib/cpp/src/protocol/protocol.txt
@@ -25,13 +25,7 @@
package-name : STRING
service-name : STRING
- arguments : ARGS_b arg-list ARGS_e
- arg-list : arg-list arg | NIL
- arg : ARG_b arg-identifier arg-value ARG_e
- arg-identifier : arg-name | arg-id | arg-name arg-id
- arg-name : STRING
- arg-id : UINT32
- arg-value : datum
+ arguments : struct-datum
""" service function reply message body """
@@ -79,16 +73,10 @@
""" collection datum """
- collection-type-specifier : ARRAY | MAP | SET | LIST
+ collection-type-specifier : MAP | SET | LIST
collection-datum : list-datum | set-datum | map-datum
- array-datum : simple-array-datum | complex-array-datum
-
- simple-array-datum : ARRAY_b element-count simple-type-specifier simple-data ARRAY_e
-
- complex-array-datum : ARRAY_b element-count simple-type-specifier simple-data ARRAY_e
-
list-datum : LIST_b element-count element-type-specifier elements LIST_e
element-count : UINT32
diff --git a/lib/cpp/src/server/TServer.h b/lib/cpp/src/server/TServer.h
index f34944b..68728f2 100644
--- a/lib/cpp/src/server/TServer.h
+++ b/lib/cpp/src/server/TServer.h
@@ -1,11 +1,14 @@
#ifndef T_SERVER_H
#define T_SERVER_H
-#include "TProcessor.h"
+#include <TProcessor.h>
+
+#include <boost/shared_ptr.hpp>
namespace facebook { namespace thrift { namespace server {
using namespace facebook::thrift;
+using namespace boost;
class TServerOptions;
@@ -15,25 +18,25 @@
* @author Mark Slee <mcslee@facebook.com>
*/
class TServer {
- public:
+public:
virtual ~TServer() {}
virtual void run() = 0;
-
- protected:
- TServer(TProcessor* processor, TServerOptions* options) :
+
+protected:
+ TServer(shared_ptr<TProcessor> processor, shared_ptr<TServerOptions> options) :
processor_(processor), options_(options) {}
-
- TProcessor* processor_;
- TServerOptions* options_;
+
+ shared_ptr<TProcessor> processor_;
+ shared_ptr<TServerOptions> options_;
};
-
+
/**
* Class to encapsulate all generic server options.
*/
class TServerOptions {
- public:
+public:
// TODO(mcslee): Fill in getters/setters here
- protected:
+protected:
// TODO(mcslee): Fill data members in here
};
diff --git a/lib/cpp/src/server/TSimpleServer.cc b/lib/cpp/src/server/TSimpleServer.cc
index 7199ab9..2ad5145 100644
--- a/lib/cpp/src/server/TSimpleServer.cc
+++ b/lib/cpp/src/server/TSimpleServer.cc
@@ -13,7 +13,8 @@
* @author Mark Slee <mcslee@facebook.com>
*/
void TSimpleServer::run() {
- TTransport* client = NULL;
+
+ shared_ptr<TTransport> client;
try {
// Start the server listening
@@ -29,8 +30,8 @@
client = serverTransport_->accept();
if (client != NULL) {
// Process for as long as we can keep the processor happy!
- TBufferedTransport bufferedClient(client);
- while (processor_->process(&bufferedClient)) {}
+ shared_ptr<TBufferedTransport> bufferedClient(new TBufferedTransport(client));
+ while (processor_->process(bufferedClient)) {}
}
} catch (TTransportException& ttx) {
if (client != NULL) {
@@ -43,13 +44,7 @@
// Ensure no resource leaks
client->close();
-
- // Ensure no memory leaks
- delete client;
-
- // Ensure we don't try to double-free on the next pass
- client = NULL;
- }
+ }
}
// TODO(mcslee): Could this be a timeout case? Or always the real thing?
diff --git a/lib/cpp/src/server/TSimpleServer.h b/lib/cpp/src/server/TSimpleServer.h
index a4a2d98..a8242d4 100644
--- a/lib/cpp/src/server/TSimpleServer.h
+++ b/lib/cpp/src/server/TSimpleServer.h
@@ -16,9 +16,9 @@
*/
class TSimpleServer : public TServer {
public:
- TSimpleServer(TProcessor* processor,
- TServerOptions* options,
- TServerTransport* serverTransport) :
+ TSimpleServer(shared_ptr<TProcessor> processor,
+ shared_ptr<TServerOptions> options,
+ shared_ptr<TServerTransport> serverTransport) :
TServer(processor, options), serverTransport_(serverTransport) {}
~TSimpleServer() {}
@@ -26,7 +26,7 @@
void run();
protected:
- TServerTransport* serverTransport_;
+ shared_ptr<TServerTransport> serverTransport_;
};
}}} // facebook::thrift::server
diff --git a/lib/cpp/src/server/TThreadPoolServer.cc b/lib/cpp/src/server/TThreadPoolServer.cc
index 2d6290c..d53d174 100644
--- a/lib/cpp/src/server/TThreadPoolServer.cc
+++ b/lib/cpp/src/server/TThreadPoolServer.cc
@@ -12,87 +12,78 @@
using namespace facebook::thrift::concurrency;
using namespace facebook::thrift::transport;
-class TThreadPoolServer : public TServer {
-
- class Task: public Runnable {
+class TThreadPoolServer::Task: public Runnable {
- TProcessor* _processor;
- TTransport* _transport;
- TBufferedTransport _bufferedTransport;
+ shared_ptr<TProcessor> _processor;
+ shared_ptr<TTransport> _transport;
+ shared_ptr<TBufferedTransport> _bufferedTransport;
- public:
+public:
- Task(TProcessor* processor,
- TTransport* transport) :
- _processor(processor),
- _transport(transport),
- _bufferedTransport(_transport) {
- }
-
- ~Task() {
- delete _transport;
- }
-
- void run() {
-
- while(true) {
-
- try {
- _processor->process(&_bufferedTransport);
-
- } catch (TTransportException& ttx) {
-
- break;
-
- } catch(...) {
-
- break;
- }
- }
-
- _bufferedTransport.close();
- }
- };
-
- TThreadPoolServer(TProcessor* processor,
- TServerOptions* options,
- TServerTransport* serverTransport,
- ThreadManager* threadManager) :
- TServer(processor, options),
- serverTransport_(serverTransport),
- threadManager_(threadManager) {
+ Task(shared_ptr<TProcessor> processor,
+ shared_ptr<TTransport> transport) :
+ _processor(processor),
+ _transport(transport),
+ _bufferedTransport(new TBufferedTransport(transport)) {
}
+
+ ~Task() {}
- ~TThreadPoolServer() {}
-
- protected:
-
- TServerTransport* serverTransport_;
- ThreadManager* threadManager_;
-
void run() {
-
- try {
- // Start the server listening
- serverTransport_->listen();
- } catch (TTransportException& ttx) {
- cerr << "TThreadPoolServer::run() listen(): " << ttx.getMessage() << endl;
- return;
- }
-
- // Fetch client from server
-
- while (true) {
-
+
+ while(true) {
+
try {
-
- threadManager_->add(shared_ptr<Task>(new TThreadPoolServer::Task(processor_, serverTransport_->accept())));;
-
+ _processor->process(_bufferedTransport);
+
} catch (TTransportException& ttx) {
+
+ break;
+
+ } catch(...) {
+
break;
}
}
+
+ _bufferedTransport->close();
}
};
+
+TThreadPoolServer::TThreadPoolServer(shared_ptr<TProcessor> processor,
+ shared_ptr<TServerOptions> options,
+ shared_ptr<TServerTransport> serverTransport,
+ shared_ptr<ThreadManager> threadManager) :
+ TServer(processor, options),
+ serverTransport_(serverTransport),
+ threadManager_(threadManager) {
+}
+
+TThreadPoolServer::~TThreadPoolServer() {}
+
+void TThreadPoolServer::run() {
+
+ try {
+ // Start the server listening
+ serverTransport_->listen();
+ } catch (TTransportException& ttx) {
+ cerr << "TThreadPoolServer::run() listen(): " << ttx.getMessage() << endl;
+ return;
+ }
+
+ // Fetch client from server
+
+ while (true) {
+
+ try {
+
+ threadManager_->add(shared_ptr<TThreadPoolServer::Task>(new TThreadPoolServer::Task(processor_,
+ shared_ptr<TTransport>(serverTransport_->accept()))));
+
+ } catch (TTransportException& ttx) {
+ break;
+ }
+ }
+}
}}} // facebook::thrift::server
diff --git a/lib/cpp/src/server/TThreadPoolServer.h b/lib/cpp/src/server/TThreadPoolServer.h
index ef63a37..827491d 100644
--- a/lib/cpp/src/server/TThreadPoolServer.h
+++ b/lib/cpp/src/server/TThreadPoolServer.h
@@ -1,10 +1,39 @@
#ifndef T_THREADPOOL_SERVER_H
#define T_THREADPOOL_SERVER_H
-#include "server/TServer.h"
-#include "transport/TServerTransport.h"
+#include <concurrency/ThreadManager.h>
+#include <server/TServer.h>
+#include <transport/TServerTransport.h>
+
+#include <boost/shared_ptr.hpp>
namespace facebook { namespace thrift { namespace server {
+
+using namespace facebook::thrift::concurrency;
+using namespace facebook::thrift::transport;
+using namespace boost;
+
+class TThreadPoolServer : public TServer {
+public:
+
+ class Task;
+
+ TThreadPoolServer(shared_ptr<TProcessor> processor,
+ shared_ptr<TServerOptions> options,
+ shared_ptr<TServerTransport> serverTransport,
+ shared_ptr<ThreadManager> threadManager);
+
+ virtual ~TThreadPoolServer();
+
+ virtual void run();
+
+protected:
+
+ shared_ptr<TServerTransport> serverTransport_;
+ shared_ptr<ThreadManager> threadManager_;
+
+};
+
}}} // facebook::thrift::server
#endif
diff --git a/lib/cpp/src/transport/TBufferedTransport.h b/lib/cpp/src/transport/TBufferedTransport.h
index b8153fe..922754e 100644
--- a/lib/cpp/src/transport/TBufferedTransport.h
+++ b/lib/cpp/src/transport/TBufferedTransport.h
@@ -1,11 +1,15 @@
#ifndef T_BUFFERED_TRANSPORT_H
#define T_BUFFERED_TRANSPORT_H
-#include "transport/TTransport.h"
+#include <transport/TTransport.h>
#include <string>
+#include <boost/shared_ptr.hpp>
+
namespace facebook { namespace thrift { namespace transport {
+using namespace boost;
+
/**
* Buffered transport. For reads it will read more data than is requested
* and will serve future data out of a local buffer. For writes, data is
@@ -15,7 +19,7 @@
*/
class TBufferedTransport : public TTransport {
public:
- TBufferedTransport(TTransport* transport) :
+ TBufferedTransport(shared_ptr<TTransport> transport) :
transport_(transport),
rBufSize_(512), rPos_(0), rLen_(0),
wBufSize_(512), wLen_(0) {
@@ -23,7 +27,7 @@
wBuf_ = new uint8_t[wBufSize_];
}
- TBufferedTransport(TTransport* transport, uint32_t sz) :
+ TBufferedTransport(shared_ptr<TTransport> transport, uint32_t sz) :
transport_(transport),
rBufSize_(sz), rPos_(0), rLen_(0),
wBufSize_(sz), wLen_(0) {
@@ -31,7 +35,7 @@
wBuf_ = new uint8_t[wBufSize_];
}
- TBufferedTransport(TTransport* transport, uint32_t rsz, uint32_t wsz) :
+ TBufferedTransport(shared_ptr<TTransport> transport, uint32_t rsz, uint32_t wsz) :
transport_(transport),
rBufSize_(rsz), rPos_(0), rLen_(0),
wBufSize_(wsz), wLen_(0) {
@@ -67,7 +71,7 @@
void flush();
protected:
- TTransport* transport_;
+ shared_ptr<TTransport> transport_;
uint8_t* rBuf_;
uint32_t rBufSize_;
uint32_t rPos_;
diff --git a/lib/cpp/src/transport/TChunkedTransport.h b/lib/cpp/src/transport/TChunkedTransport.h
index 16f9e0e..0fe8d75 100644
--- a/lib/cpp/src/transport/TChunkedTransport.h
+++ b/lib/cpp/src/transport/TChunkedTransport.h
@@ -1,11 +1,14 @@
#ifndef T_CHUNKED_TRANSPORT_H
#define T_CHUNKED_TRANSPORT_H
-#include "transport/TTransport.h"
+#include <transport/TTransport.h>
#include <string>
+#include <boost/shared_ptr.hpp>
namespace facebook { namespace thrift { namespace transport {
+using namespace boost;
+
/**
* Chunked transport. All writes go into an in-memory buffer until flush is
* called, at which point the transport writes the length of the entire
@@ -16,7 +19,7 @@
*/
class TChunkedTransport : public TTransport {
public:
- TChunkedTransport(TTransport* transport) :
+ TChunkedTransport(shared_ptr<TTransport> transport) :
transport_(transport),
rPos_(0), rLen_(0),
wBufSize_(512), wLen_(0) {
@@ -24,7 +27,7 @@
wBuf_ = new uint8_t[wBufSize_];
}
- TChunkedTransport(TTransport* transport, uint32_t sz) :
+ TChunkedTransport(shared_ptr<TTransport> transport, uint32_t sz) :
transport_(transport),
rPos_(0), rLen_(0),
wBufSize_(sz), wLen_(0) {
@@ -60,7 +63,7 @@
void flush();
protected:
- TTransport* transport_;
+ shared_ptr<TTransport> transport_;
uint8_t* rBuf_;
uint32_t rPos_;
uint32_t rLen_;
diff --git a/lib/cpp/src/transport/TServerSocket.cc b/lib/cpp/src/transport/TServerSocket.cc
index 21230d9..003ddec 100644
--- a/lib/cpp/src/transport/TServerSocket.cc
+++ b/lib/cpp/src/transport/TServerSocket.cc
@@ -4,9 +4,12 @@
#include "transport/TSocket.h"
#include "transport/TServerSocket.h"
+#include <boost/shared_ptr.hpp>
namespace facebook { namespace thrift { namespace transport {
+using namespace boost;
+
TServerSocket::TServerSocket(int port) :
port_(port), serverSocket_(0), acceptBacklog_(1024) {}
@@ -64,7 +67,7 @@
// The socket is now listening!
}
-TTransport* TServerSocket::acceptImpl() {
+shared_ptr<TTransport> TServerSocket::acceptImpl() {
if (serverSocket_ <= 0) {
throw TTransportException(TTX_NOT_OPEN, "TServerSocket not listening");
}
@@ -80,7 +83,7 @@
throw TTransportException(TTX_UNKNOWN, "ERROR:" + errno);
}
- return new TSocket(clientSocket);
+ return shared_ptr<TTransport>(new TSocket(clientSocket));
}
void TServerSocket::close() {
diff --git a/lib/cpp/src/transport/TServerSocket.h b/lib/cpp/src/transport/TServerSocket.h
index c18a8d2..619a366 100644
--- a/lib/cpp/src/transport/TServerSocket.h
+++ b/lib/cpp/src/transport/TServerSocket.h
@@ -1,7 +1,8 @@
#ifndef T_SERVER_SOCKET_H
#define T_SERVER_SOCKET_H
-#include "transport/TServerTransport.h"
+#include <transport/TServerTransport.h>
+#include <boost/shared_ptr.hpp>
namespace facebook { namespace thrift { namespace transport {
@@ -22,7 +23,7 @@
void close();
protected:
- TTransport* acceptImpl();
+ shared_ptr<TTransport> acceptImpl();
private:
diff --git a/lib/cpp/src/transport/TServerTransport.h b/lib/cpp/src/transport/TServerTransport.h
index 9bf74d1..f51e88c 100644
--- a/lib/cpp/src/transport/TServerTransport.h
+++ b/lib/cpp/src/transport/TServerTransport.h
@@ -3,9 +3,12 @@
#include "transport/TTransport.h"
#include "transport/TTransportException.h"
+#include <boost/shared_ptr.hpp>
namespace facebook { namespace thrift { namespace transport {
+using namespace boost;
+
/**
* Server transport framework. A server needs to have some facility for
* creating base transports to read/write from.
@@ -34,8 +37,8 @@
* @return A new TTransport object
* @throws TTransportException if there is an error
*/
- TTransport* accept() {
- TTransport* result = acceptImpl();
+ shared_ptr<TTransport> accept() {
+ shared_ptr<TTransport> result = acceptImpl();
if (result == NULL) {
throw TTransportException("accept() may not return NULL");
}
@@ -56,7 +59,7 @@
* @return A newly allocated TTransport object
* @throw TTransportException If an error occurs
*/
- virtual TTransport* acceptImpl() = 0;
+ virtual shared_ptr<TTransport> acceptImpl() = 0;
};