THRIFT-2423 Facebook's THeader protocol and transport for cpp
Client: C++ Library, Compiler
Patch: Dave Watson rebased by Nobuaki Sukegawa
This closes #357 and closes #677
diff --git a/doc/specs/HeaderFormat.md b/doc/specs/HeaderFormat.md
new file mode 100644
index 0000000..42ec7ae
--- /dev/null
+++ b/doc/specs/HeaderFormat.md
@@ -0,0 +1,82 @@
+<link href="http://kevinburke.bitbucket.org/markdowncss/markdown.css" rel="stylesheet"></link>
+
+Header format for the THeader.h
+===============================
+
+ 0 1 2 3 4 5 6 7 8 9 a b c d e f 0 1 2 3 4 5 6 7 8 9 a b c d e f
+ +----------------------------------------------------------------+
+ | 0| LENGTH |
+ +----------------------------------------------------------------+
+ | 0| HEADER MAGIC | FLAGS |
+ +----------------------------------------------------------------+
+ | SEQUENCE NUMBER |
+ +----------------------------------------------------------------+
+ | 0| Header Size(/32) | ...
+ +---------------------------------
+
+ Header is of variable size:
+ (and starts at offset 14)
+
+ +----------------------------------------------------------------+
+ | PROTOCOL ID (varint) | NUM TRANSFORMS (varint) |
+ +----------------------------------------------------------------+
+ | TRANSFORM 0 ID (varint) | TRANSFORM 0 DATA ...
+ +----------------------------------------------------------------+
+ | ... ... |
+ +----------------------------------------------------------------+
+ | INFO 0 ID (varint) | INFO 0 DATA ...
+ +----------------------------------------------------------------+
+ | ... ... |
+ +----------------------------------------------------------------+
+ | |
+ | PAYLOAD |
+ | |
+ +----------------------------------------------------------------+
+
+The `LENGTH` field is 32 bits, and counts the remaining bytes in the
+packet, NOT including the length field. The header size field is 16
+bits, and defines the size of the header remaining NOT including the
+`HEADER MAGIC`, `FLAGS`, `SEQUENCE NUMBER` and header size fields. The
+Header size field is in bytes/4.
+
+The transform ID's are varints. The data for each transform is
+defined by the transform ID in the code - no size is given in the
+header. If a transform ID is specified from a client and the server
+doesn't know about the transform ID, an error MUST be returned as we
+don't know how to transform the data.
+
+Conversely, data in the info headers is ignorable. This should only
+be things like timestamps, debuging tracing, etc. Using the header
+size you should be able to skip this data and read the payload safely
+if you don't know the info ID.
+
+Info's should be oldest supported to newest supported order, so that
+if we read an info ID we don't support, none of the remaining info
+ID's will be supported either, and we can safely skip to the payload.
+
+Info ID's and transform ID's should share the same ID space.
+
+### PADDING:
+
+Header will be padded out to next 4-byte boundary with `0x00`.
+
+Max frame size is `0x3FFFFFFF`, which is slightly less than `HTTP_MAGIC`.
+This allows us to distingush between different (older) transports.
+
+### Transform IDs:
+
+ ZLIB_TRANSFORM 0x01 - No data for this. Use zlib to (de)compress the
+ data.
+
+ HMAC_TRANSFORM 0x02 - Variable amount of mac data. One byte to specify
+ size. Mac data is appended at the end of the packet.
+ SNAPPY_TRANSFORM 0x03 - No data for this. Use snappy to (de)compress the
+ data.
+
+
+###Info IDs:
+
+ INFO_KEYVALUE 0x01 - varint32 number of headers.
+ - key/value pairs of varstrings (varint16 length plus
+ no-trailing-null string).
+
diff --git a/lib/c_glib/test/CMakeLists.txt b/lib/c_glib/test/CMakeLists.txt
index 31e6c6b..1b32c46 100644
--- a/lib/c_glib/test/CMakeLists.txt
+++ b/lib/c_glib/test/CMakeLists.txt
@@ -116,7 +116,7 @@
include_directories("${CMAKE_CURRENT_BINARY_DIR}/gen-cpp" "${CMAKE_CURRENT_BINARY_DIR}/gen-c_glib")
add_executable(testthrifttestclient testthrifttestclient.cpp)
- target_link_libraries(testthrifttestclient testgenc testgenc_cpp)
+ target_link_libraries(testthrifttestclient testgenc testgenc_cpp ${ZLIB_LIBRARIES})
add_test(NAME testthrifttestclient COMMAND testthrifttestclient)
endif(BUILD_CPP)
diff --git a/lib/cpp/CMakeLists.txt b/lib/cpp/CMakeLists.txt
index 4c7caeb..a0b9743 100755
--- a/lib/cpp/CMakeLists.txt
+++ b/lib/cpp/CMakeLists.txt
@@ -46,6 +46,7 @@
src/thrift/protocol/TJSONProtocol.cpp
src/thrift/protocol/TMultiplexedProtocol.cpp
src/thrift/protocol/TProtocol.cpp
+ src/thrift/protocol/THeaderProtocol.cpp
src/thrift/transport/TTransportException.cpp
src/thrift/transport/TFDTransport.cpp
src/thrift/transport/TSimpleFileTransport.cpp
@@ -57,6 +58,7 @@
src/thrift/transport/TServerSocket.cpp
src/thrift/transport/TTransportUtils.cpp
src/thrift/transport/TBufferTransports.cpp
+ src/thrift/transport/THeaderTransport.cpp
src/thrift/server/TConnectedClient.cpp
src/thrift/server/TServerFramework.cpp
src/thrift/server/TSimpleServer.cpp
@@ -146,6 +148,8 @@
# Thrift zlib server
set( thriftcppz_SOURCES
src/thrift/transport/TZlibTransport.cpp
+ src/thrift/protocol/THeaderProtocol.cpp
+ src/thrift/transport/THeaderTransport.cpp
)
# Thrift Qt4 server
diff --git a/lib/cpp/Makefile.am b/lib/cpp/Makefile.am
index 4742ee0..80e8917 100755
--- a/lib/cpp/Makefile.am
+++ b/lib/cpp/Makefile.am
@@ -114,7 +114,10 @@
src/thrift/async/TEvhttpServer.cpp \
src/thrift/async/TEvhttpClientChannel.cpp
-libthriftz_la_SOURCES = src/thrift/transport/TZlibTransport.cpp
+libthriftz_la_SOURCES = src/thrift/transport/TZlibTransport.cpp \
+ src/thrift/transport/THeaderTransport.cpp \
+ src/thrift/protocol/THeaderProtocol.cpp
+
libthriftqt_la_MOC = src/thrift/qt/moc_TQTcpServer.cpp
nodist_libthriftqt_la_SOURCES = $(libthriftqt_la_MOC)
@@ -183,11 +186,13 @@
src/thrift/protocol/TCompactProtocol.h \
src/thrift/protocol/TCompactProtocol.tcc \
src/thrift/protocol/TDebugProtocol.h \
+ src/thrift/protocol/THeaderProtocol.h \
src/thrift/protocol/TBase64Utils.h \
src/thrift/protocol/TJSONProtocol.h \
src/thrift/protocol/TMultiplexedProtocol.h \
src/thrift/protocol/TProtocolDecorator.h \
src/thrift/protocol/TProtocolTap.h \
+ src/thrift/protocol/TProtocolTypes.h \
src/thrift/protocol/TProtocolException.h \
src/thrift/protocol/TVirtualProtocol.h \
src/thrift/protocol/TProtocol.h
@@ -197,6 +202,7 @@
src/thrift/transport/PlatformSocket.h \
src/thrift/transport/TFDTransport.h \
src/thrift/transport/TFileTransport.h \
+ src/thrift/transport/THeaderTransport.h \
src/thrift/transport/TSimpleFileTransport.h \
src/thrift/transport/TServerSocket.h \
src/thrift/transport/TSSLServerSocket.h \
diff --git a/lib/cpp/src/thrift/protocol/TBinaryProtocol.h b/lib/cpp/src/thrift/protocol/TBinaryProtocol.h
index e0650cf..92491b0 100644
--- a/lib/cpp/src/thrift/protocol/TBinaryProtocol.h
+++ b/lib/cpp/src/thrift/protocol/TBinaryProtocol.h
@@ -36,12 +36,11 @@
*/
template <class Transport_, class ByteOrder_ = TNetworkBigEndian>
class TBinaryProtocolT : public TVirtualProtocol<TBinaryProtocolT<Transport_, ByteOrder_> > {
-protected:
+public:
static const int32_t VERSION_MASK = ((int32_t)0xffff0000);
static const int32_t VERSION_1 = ((int32_t)0x80010000);
// VERSION_2 (0x80020000) was taken by TDenseProtocol (which has since been removed)
-public:
TBinaryProtocolT(boost::shared_ptr<Transport_> trans)
: TVirtualProtocol<TBinaryProtocolT<Transport_, ByteOrder_> >(trans),
trans_(trans.get()),
diff --git a/lib/cpp/src/thrift/protocol/THeaderProtocol.cpp b/lib/cpp/src/thrift/protocol/THeaderProtocol.cpp
new file mode 100644
index 0000000..76732b0
--- /dev/null
+++ b/lib/cpp/src/thrift/protocol/THeaderProtocol.cpp
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef THRIFT_PROTOCOL_THEADERPROTOCOL_CPP_
+#define THRIFT_PROTOCOL_THEADERPROTOCOL_CPP_ 1
+
+#include <thrift/protocol/THeaderProtocol.h>
+#include <thrift/protocol/TCompactProtocol.h>
+#include <thrift/TApplicationException.h>
+
+#include <limits>
+#include <boost/static_assert.hpp>
+
+namespace apache {
+namespace thrift {
+namespace protocol {
+
+void THeaderProtocol::resetProtocol() {
+ if (proto_ && protoId_ == trans_->getProtocolId()) {
+ return;
+ }
+
+ protoId_ = trans_->getProtocolId();
+
+ switch (protoId_) {
+ case T_BINARY_PROTOCOL:
+ proto_ = boost::make_shared<TBinaryProtocolT<THeaderTransport> >(trans_);
+ break;
+
+ case T_COMPACT_PROTOCOL:
+ proto_ = boost::make_shared<TCompactProtocolT<THeaderTransport> >(trans_);
+ break;
+
+ default:
+ throw TApplicationException(TApplicationException::INVALID_PROTOCOL,
+ "Unknown protocol requested");
+ }
+}
+
+uint32_t THeaderProtocol::writeMessageBegin(const std::string& name,
+ const TMessageType messageType,
+ const int32_t seqId) {
+ resetProtocol(); // Reset in case we changed protocols
+ trans_->setSequenceNumber(seqId);
+ return proto_->writeMessageBegin(name, messageType, seqId);
+}
+
+uint32_t THeaderProtocol::writeMessageEnd() {
+ return proto_->writeMessageEnd();
+}
+
+uint32_t THeaderProtocol::writeStructBegin(const char* name) {
+ return proto_->writeStructBegin(name);
+}
+
+uint32_t THeaderProtocol::writeStructEnd() {
+ return proto_->writeStructEnd();
+}
+
+uint32_t THeaderProtocol::writeFieldBegin(const char* name,
+ const TType fieldType,
+ const int16_t fieldId) {
+ return proto_->writeFieldBegin(name, fieldType, fieldId);
+}
+
+uint32_t THeaderProtocol::writeFieldEnd() {
+ return proto_->writeFieldEnd();
+}
+
+uint32_t THeaderProtocol::writeFieldStop() {
+ return proto_->writeFieldStop();
+}
+
+uint32_t THeaderProtocol::writeMapBegin(const TType keyType,
+ const TType valType,
+ const uint32_t size) {
+ return proto_->writeMapBegin(keyType, valType, size);
+}
+
+uint32_t THeaderProtocol::writeMapEnd() {
+ return proto_->writeMapEnd();
+}
+
+uint32_t THeaderProtocol::writeListBegin(const TType elemType, const uint32_t size) {
+ return proto_->writeListBegin(elemType, size);
+}
+
+uint32_t THeaderProtocol::writeListEnd() {
+ return proto_->writeListEnd();
+}
+
+uint32_t THeaderProtocol::writeSetBegin(const TType elemType, const uint32_t size) {
+ return proto_->writeSetBegin(elemType, size);
+}
+
+uint32_t THeaderProtocol::writeSetEnd() {
+ return proto_->writeSetEnd();
+}
+
+uint32_t THeaderProtocol::writeBool(const bool value) {
+ return proto_->writeBool(value);
+}
+
+uint32_t THeaderProtocol::writeByte(const int8_t byte) {
+ return proto_->writeByte(byte);
+}
+
+uint32_t THeaderProtocol::writeI16(const int16_t i16) {
+ return proto_->writeI16(i16);
+}
+
+uint32_t THeaderProtocol::writeI32(const int32_t i32) {
+ return proto_->writeI32(i32);
+}
+
+uint32_t THeaderProtocol::writeI64(const int64_t i64) {
+ return proto_->writeI64(i64);
+}
+
+uint32_t THeaderProtocol::writeDouble(const double dub) {
+ return proto_->writeDouble(dub);
+}
+
+uint32_t THeaderProtocol::writeString(const std::string& str) {
+ return proto_->writeString(str);
+}
+
+uint32_t THeaderProtocol::writeBinary(const std::string& str) {
+ return proto_->writeBinary(str);
+}
+
+/**
+ * Reading functions
+ */
+
+uint32_t THeaderProtocol::readMessageBegin(std::string& name,
+ TMessageType& messageType,
+ int32_t& seqId) {
+ // Read the next frame, and change protocols if needed
+ try {
+ trans_->resetProtocol();
+ resetProtocol();
+ } catch (const TApplicationException& ex) {
+ writeMessageBegin("", T_EXCEPTION, 0);
+ ex.write((TProtocol*)this);
+ writeMessageEnd();
+ trans_->flush();
+
+ // The framing is still good, but we don't know about this protocol.
+ // In the future, this could be made a client-side only error if
+ // connection pooling is used.
+ throw ex;
+ }
+ return proto_->readMessageBegin(name, messageType, seqId);
+}
+
+uint32_t THeaderProtocol::readMessageEnd() {
+ return proto_->readMessageEnd();
+}
+
+uint32_t THeaderProtocol::readStructBegin(std::string& name) {
+ return proto_->readStructBegin(name);
+}
+
+uint32_t THeaderProtocol::readStructEnd() {
+ return proto_->readStructEnd();
+}
+
+uint32_t THeaderProtocol::readFieldBegin(std::string& name, TType& fieldType, int16_t& fieldId) {
+ return proto_->readFieldBegin(name, fieldType, fieldId);
+}
+
+uint32_t THeaderProtocol::readFieldEnd() {
+ return proto_->readFieldEnd();
+}
+
+uint32_t THeaderProtocol::readMapBegin(TType& keyType, TType& valType, uint32_t& size) {
+ return proto_->readMapBegin(keyType, valType, size);
+}
+
+uint32_t THeaderProtocol::readMapEnd() {
+ return proto_->readMapEnd();
+}
+
+uint32_t THeaderProtocol::readListBegin(TType& elemType, uint32_t& size) {
+ return proto_->readListBegin(elemType, size);
+}
+
+uint32_t THeaderProtocol::readListEnd() {
+ return proto_->readListEnd();
+}
+
+uint32_t THeaderProtocol::readSetBegin(TType& elemType, uint32_t& size) {
+ return proto_->readSetBegin(elemType, size);
+}
+
+uint32_t THeaderProtocol::readSetEnd() {
+ return proto_->readSetEnd();
+}
+
+uint32_t THeaderProtocol::readBool(bool& value) {
+ return proto_->readBool(value);
+}
+
+uint32_t THeaderProtocol::readByte(int8_t& byte) {
+ return proto_->readByte(byte);
+}
+
+uint32_t THeaderProtocol::readI16(int16_t& i16) {
+ return proto_->readI16(i16);
+}
+
+uint32_t THeaderProtocol::readI32(int32_t& i32) {
+ return proto_->readI32(i32);
+}
+
+uint32_t THeaderProtocol::readI64(int64_t& i64) {
+ return proto_->readI64(i64);
+}
+
+uint32_t THeaderProtocol::readDouble(double& dub) {
+ return proto_->readDouble(dub);
+}
+
+uint32_t THeaderProtocol::readString(std::string& str) {
+ return proto_->readString(str);
+}
+
+uint32_t THeaderProtocol::readBinary(std::string& binary) {
+ return proto_->readBinary(binary);
+}
+}
+}
+} // apache::thrift::protocol
+
+#endif // #ifndef THRIFT_PROTOCOL_THEADERPROTOCOL_CPP_
diff --git a/lib/cpp/src/thrift/protocol/THeaderProtocol.h b/lib/cpp/src/thrift/protocol/THeaderProtocol.h
new file mode 100644
index 0000000..3998f48
--- /dev/null
+++ b/lib/cpp/src/thrift/protocol/THeaderProtocol.h
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef THRIFT_PROTOCOL_THEADERPROTOCOL_H_
+#define THRIFT_PROTOCOL_THEADERPROTOCOL_H_ 1
+
+#include <thrift/protocol/TProtocol.h>
+#include <thrift/protocol/TProtocolTypes.h>
+#include <thrift/protocol/TVirtualProtocol.h>
+#include <thrift/transport/THeaderTransport.h>
+
+#include <bitset>
+
+#include <boost/shared_ptr.hpp>
+#include <boost/make_shared.hpp>
+using apache::thrift::transport::THeaderTransport;
+
+namespace apache {
+namespace thrift {
+namespace protocol {
+
+/**
+ * The header protocol for thrift. Reads unframed, framed, header format,
+ * and http
+ *
+ */
+class THeaderProtocol : public TVirtualProtocol<THeaderProtocol> {
+protected:
+public:
+ void resetProtocol();
+
+ explicit THeaderProtocol(const boost::shared_ptr<TTransport>& trans,
+ uint16_t protoId = T_COMPACT_PROTOCOL)
+ : TVirtualProtocol<THeaderProtocol>(boost::shared_ptr<TTransport>(new THeaderTransport(trans))),
+ trans_(boost::dynamic_pointer_cast<THeaderTransport>(this->getTransport())),
+ protoId_(protoId) {
+ trans_->setProtocolId(protoId);
+ resetProtocol();
+ }
+
+ THeaderProtocol(const boost::shared_ptr<TTransport>& inTrans,
+ const boost::shared_ptr<TTransport>& outTrans,
+ uint16_t protoId = T_COMPACT_PROTOCOL)
+ : TVirtualProtocol<THeaderProtocol>(
+ boost::shared_ptr<TTransport>(new THeaderTransport(inTrans, outTrans))),
+ trans_(boost::dynamic_pointer_cast<THeaderTransport>(this->getTransport())),
+ protoId_(protoId) {
+ trans_->setProtocolId(protoId);
+ resetProtocol();
+ }
+
+ ~THeaderProtocol() {}
+
+ /**
+ * Functions to work with headers by calling into THeaderTransport
+ */
+ void setProtocolId(uint16_t protoId) {
+ trans_->setProtocolId(protoId);
+ resetProtocol();
+ }
+
+ typedef THeaderTransport::StringToStringMap StringToStringMap;
+
+ // these work with write headers
+ void setHeader(const std::string& key, const std::string& value) {
+ trans_->setHeader(key, value);
+ }
+
+ void clearHeaders() { trans_->clearHeaders(); }
+
+ StringToStringMap& getWriteHeaders() { return trans_->getWriteHeaders(); }
+
+ // these work with read headers
+ const StringToStringMap& getHeaders() const { return trans_->getHeaders(); }
+
+ /**
+ * Writing functions.
+ */
+
+ /*ol*/ uint32_t writeMessageBegin(const std::string& name,
+ const TMessageType messageType,
+ const int32_t seqId);
+
+ /*ol*/ uint32_t writeMessageEnd();
+
+ uint32_t writeStructBegin(const char* name);
+
+ uint32_t writeStructEnd();
+
+ uint32_t writeFieldBegin(const char* name, const TType fieldType, const int16_t fieldId);
+
+ uint32_t writeFieldEnd();
+
+ uint32_t writeFieldStop();
+
+ uint32_t writeMapBegin(const TType keyType, const TType valType, const uint32_t size);
+
+ uint32_t writeMapEnd();
+
+ uint32_t writeListBegin(const TType elemType, const uint32_t size);
+
+ uint32_t writeListEnd();
+
+ uint32_t writeSetBegin(const TType elemType, const uint32_t size);
+
+ uint32_t writeSetEnd();
+
+ uint32_t writeBool(const bool value);
+
+ uint32_t writeByte(const int8_t byte);
+
+ uint32_t writeI16(const int16_t i16);
+
+ uint32_t writeI32(const int32_t i32);
+
+ uint32_t writeI64(const int64_t i64);
+
+ uint32_t writeDouble(const double dub);
+
+ uint32_t writeString(const std::string& str);
+
+ uint32_t writeBinary(const std::string& str);
+
+ /**
+ * Reading functions
+ */
+
+ /*ol*/ uint32_t readMessageBegin(std::string& name, TMessageType& messageType, int32_t& seqId);
+
+ /*ol*/ uint32_t readMessageEnd();
+
+ uint32_t readStructBegin(std::string& name);
+
+ uint32_t readStructEnd();
+
+ uint32_t readFieldBegin(std::string& name, TType& fieldType, int16_t& fieldId);
+
+ uint32_t readFieldEnd();
+
+ uint32_t readMapBegin(TType& keyType, TType& valType, uint32_t& size);
+
+ uint32_t readMapEnd();
+
+ uint32_t readListBegin(TType& elemType, uint32_t& size);
+
+ uint32_t readListEnd();
+
+ uint32_t readSetBegin(TType& elemType, uint32_t& size);
+
+ uint32_t readSetEnd();
+
+ uint32_t readBool(bool& value);
+ // Provide the default readBool() implementation for std::vector<bool>
+ using TVirtualProtocol<THeaderProtocol>::readBool;
+
+ uint32_t readByte(int8_t& byte);
+
+ uint32_t readI16(int16_t& i16);
+
+ uint32_t readI32(int32_t& i32);
+
+ uint32_t readI64(int64_t& i64);
+
+ uint32_t readDouble(double& dub);
+
+ uint32_t readString(std::string& str);
+
+ uint32_t readBinary(std::string& binary);
+
+protected:
+ boost::shared_ptr<THeaderTransport> trans_;
+
+ boost::shared_ptr<TProtocol> proto_;
+ uint32_t protoId_;
+};
+
+class THeaderProtocolFactory : public TProtocolFactory {
+public:
+ virtual boost::shared_ptr<TProtocol> getProtocol(boost::shared_ptr<transport::TTransport> trans) {
+ THeaderProtocol* headerProtocol
+ = new THeaderProtocol(trans, boost::shared_ptr<transport::TTransport>(), T_BINARY_PROTOCOL);
+ return boost::shared_ptr<TProtocol>(headerProtocol);
+ }
+
+ virtual boost::shared_ptr<TProtocol> getProtocol(
+ boost::shared_ptr<transport::TTransport> inTrans,
+ boost::shared_ptr<transport::TTransport> outTrans) {
+ THeaderProtocol* headerProtocol = new THeaderProtocol(inTrans, outTrans, T_BINARY_PROTOCOL);
+ return boost::shared_ptr<TProtocol>(headerProtocol);
+ }
+};
+}
+}
+} // apache::thrift::protocol
+
+#endif // #ifndef THRIFT_PROTOCOL_THEADERPROTOCOL_H_
diff --git a/lib/cpp/src/thrift/protocol/TProtocol.h b/lib/cpp/src/thrift/protocol/TProtocol.h
index 0db2216..1b46faf 100644
--- a/lib/cpp/src/thrift/protocol/TProtocol.h
+++ b/lib/cpp/src/thrift/protocol/TProtocol.h
@@ -595,6 +595,11 @@
virtual ~TProtocolFactory();
virtual boost::shared_ptr<TProtocol> getProtocol(boost::shared_ptr<TTransport> trans) = 0;
+ virtual boost::shared_ptr<TProtocol> getProtocol(boost::shared_ptr<TTransport> inTrans,
+ boost::shared_ptr<TTransport> outTrans) {
+ (void)outTrans;
+ return getProtocol(inTrans);
+ }
};
/**
diff --git a/lib/cpp/src/thrift/protocol/TProtocolTypes.h b/lib/cpp/src/thrift/protocol/TProtocolTypes.h
new file mode 100644
index 0000000..ca22b54
--- /dev/null
+++ b/lib/cpp/src/thrift/protocol/TProtocolTypes.h
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef THRIFT_PROTOCOL_TPROTOCOLTYPES_H_
+#define THRIFT_PROTOCOL_TPROTOCOLTYPES_H_ 1
+
+namespace apache { namespace thrift { namespace protocol {
+
+enum PROTOCOL_TYPES {
+ T_BINARY_PROTOCOL = 0,
+ T_JSON_PROTOCOL = 1,
+ T_COMPACT_PROTOCOL = 2,
+};
+
+}}} // apache::thrift::protocol
+
+#endif // #define _THRIFT_PROTOCOL_TPROTOCOLTYPES_H_ 1
+
diff --git a/lib/cpp/src/thrift/server/TNonblockingServer.cpp b/lib/cpp/src/thrift/server/TNonblockingServer.cpp
index d1e9ede..ede34c4 100644
--- a/lib/cpp/src/thrift/server/TNonblockingServer.cpp
+++ b/lib/cpp/src/thrift/server/TNonblockingServer.cpp
@@ -392,8 +392,14 @@
factoryOutputTransport_ = server_->getOutputTransportFactory()->getTransport(outputTransport_);
// Create protocol
- inputProtocol_ = server_->getInputProtocolFactory()->getProtocol(factoryInputTransport_);
- outputProtocol_ = server_->getOutputProtocolFactory()->getProtocol(factoryOutputTransport_);
+ if (server_->getHeaderTransport()) {
+ inputProtocol_ = server_->getInputProtocolFactory()->getProtocol(factoryInputTransport_,
+ factoryOutputTransport_);
+ outputProtocol_ = inputProtocol_;
+ } else {
+ inputProtocol_ = server_->getInputProtocolFactory()->getProtocol(factoryInputTransport_);
+ outputProtocol_ = server_->getOutputProtocolFactory()->getProtocol(factoryOutputTransport_);
+ }
// Set up for any server event handler
serverEventHandler_ = server_->getEventHandler();
@@ -535,6 +541,14 @@
}
}
+bool TNonblockingServer::getHeaderTransport() {
+ // Currently if there is no output protocol factory,
+ // we assume header transport (without having to create
+ // a new transport and check)
+ return getOutputProtocolFactory() == NULL;
+}
+
+
/**
* This is called when the application transitions from one state into
* another. This means that it has finished writing the data that it needed
@@ -551,12 +565,20 @@
case APP_READ_REQUEST:
// We are done reading the request, package the read buffer into transport
// and get back some data from the dispatch function
- inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
- outputTransport_->resetBuffer();
- // Prepend four bytes of blank space to the buffer so we can
- // write the frame size there later.
- outputTransport_->getWritePtr(4);
- outputTransport_->wroteBytes(4);
+ if (server_->getHeaderTransport()) {
+ inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
+ outputTransport_->resetBuffer();
+ } else {
+ // We saved room for the framing size in case header transport needed it,
+ // but just skip it for the non-header case
+ inputTransport_->resetBuffer(readBuffer_ + 4, readBufferPos_ - 4);
+ outputTransport_->resetBuffer();
+
+ // Prepend four bytes of blank space to the buffer so we can
+ // write the frame size there later.
+ outputTransport_->getWritePtr(4);
+ outputTransport_->wroteBytes(4);
+ }
server_->incrementActiveProcessors();
@@ -691,6 +713,8 @@
return;
case APP_READ_FRAME_SIZE:
+ readWant_ += 4;
+
// We just read the request length
// Double the buffer size until it is big enough
if (readWant_ > readBufferSize_) {
@@ -711,7 +735,8 @@
readBufferSize_ = newSize;
}
- readBufferPos_ = 0;
+ readBufferPos_ = 4;
+ *((uint32_t*)readBuffer_) = htonl(readWant_ - 4);
// Move into read request state
socketState_ = SOCKET_RECV;
diff --git a/lib/cpp/src/thrift/server/TNonblockingServer.h b/lib/cpp/src/thrift/server/TNonblockingServer.h
index 4fb83f1..82d40e9 100644
--- a/lib/cpp/src/thrift/server/TNonblockingServer.h
+++ b/lib/cpp/src/thrift/server/TNonblockingServer.h
@@ -712,6 +712,11 @@
*/
event_base* getUserEventBase() const { return userEventBase_; }
+ /** Some transports, like THeaderTransport, require passing through
+ * the framing size instead of stripping it.
+ */
+ bool getHeaderTransport();
+
private:
/**
* Callback function that the threadmanager calls when a task reaches
diff --git a/lib/cpp/src/thrift/server/TServerFramework.cpp b/lib/cpp/src/thrift/server/TServerFramework.cpp
index e843921..43d11c5 100644
--- a/lib/cpp/src/thrift/server/TServerFramework.cpp
+++ b/lib/cpp/src/thrift/server/TServerFramework.cpp
@@ -147,8 +147,13 @@
inputTransport = inputTransportFactory_->getTransport(client);
outputTransport = outputTransportFactory_->getTransport(client);
- inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
- outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
+ if (!outputProtocolFactory_) {
+ inputProtocol = inputProtocolFactory_->getProtocol(inputTransport, outputTransport);
+ outputProtocol = inputProtocol;
+ } else {
+ inputProtocol = inputProtocolFactory_->getProtocol(inputTransport);
+ outputProtocol = outputProtocolFactory_->getProtocol(outputTransport);
+ }
newlyConnectedClient(shared_ptr<TConnectedClient>(
new TConnectedClient(getProcessor(inputProtocol, outputProtocol, client),
diff --git a/lib/cpp/src/thrift/transport/TBufferTransports.h b/lib/cpp/src/thrift/transport/TBufferTransports.h
index 98e8a9e..013c6e0 100644
--- a/lib/cpp/src/thrift/transport/TBufferTransports.h
+++ b/lib/cpp/src/thrift/transport/TBufferTransports.h
@@ -309,6 +309,16 @@
static const int DEFAULT_MAX_FRAME_SIZE = 256 * 1024 * 1024;
/// Use default buffer sizes.
+ TFramedTransport()
+ : transport_(),
+ rBufSize_(0),
+ wBufSize_(DEFAULT_BUFFER_SIZE),
+ rBuf_(),
+ wBuf_(new uint8_t[wBufSize_]),
+ bufReclaimThresh_((std::numeric_limits<uint32_t>::max)()) {
+ initPointers();
+ }
+
TFramedTransport(boost::shared_ptr<TTransport> transport)
: transport_(transport),
rBufSize_(0),
diff --git a/lib/cpp/src/thrift/transport/THeaderTransport.cpp b/lib/cpp/src/thrift/transport/THeaderTransport.cpp
new file mode 100644
index 0000000..26841b9
--- /dev/null
+++ b/lib/cpp/src/thrift/transport/THeaderTransport.cpp
@@ -0,0 +1,597 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <thrift/transport/THeaderTransport.h>
+#include <thrift/TApplicationException.h>
+#include <thrift/protocol/TProtocolTypes.h>
+
+#include <algorithm>
+#include <bitset>
+#include <cassert>
+#include <string>
+#include <zlib.h>
+
+using std::map;
+using boost::shared_ptr;
+using std::string;
+using std::vector;
+
+namespace apache {
+namespace thrift {
+namespace transport {
+
+using namespace apache::thrift::protocol;
+using apache::thrift::protocol::TBinaryProtocol;
+
+uint32_t THeaderTransport::readAll(uint8_t* buf, uint32_t len) {
+ // We want to call TBufferBase's version here, because
+ // TFramedTransport would try and call its own readFrame function
+ return TBufferBase::readAll(buf, len);
+}
+
+uint32_t THeaderTransport::readSlow(uint8_t* buf, uint32_t len) {
+
+ if (clientType == THRIFT_UNFRAMED_DEPRECATED) {
+ return transport_->read(buf, len);
+ }
+
+ return TFramedTransport::readSlow(buf, len);
+}
+
+uint16_t THeaderTransport::getProtocolId() const {
+ if (clientType == THRIFT_HEADER_CLIENT_TYPE) {
+ return protoId;
+ } else {
+ return T_BINARY_PROTOCOL; // Assume other transports use TBinary
+ }
+}
+
+void THeaderTransport::ensureReadBuffer(uint32_t sz) {
+ if (sz > rBufSize_) {
+ rBuf_.reset(new uint8_t[sz]);
+ rBufSize_ = sz;
+ }
+}
+
+bool THeaderTransport::readFrame(uint32_t minFrameSize) {
+ // szN is network byte order of sz
+ uint32_t szN;
+ uint32_t sz;
+
+ // Read the size of the next frame.
+ // We can't use readAll(&sz, sizeof(sz)), since that always throws an
+ // exception on EOF. We want to throw an exception only if EOF occurs after
+ // partial size data.
+ uint32_t sizeBytesRead = 0;
+ while (sizeBytesRead < sizeof(szN)) {
+ uint8_t* szp = reinterpret_cast<uint8_t*>(&szN) + sizeBytesRead;
+ uint32_t bytesRead = transport_->read(szp, sizeof(szN) - sizeBytesRead);
+ if (bytesRead == 0) {
+ if (sizeBytesRead == 0) {
+ // EOF before any data was read.
+ return false;
+ } else {
+ // EOF after a partial frame header. Raise an exception.
+ throw TTransportException(TTransportException::END_OF_FILE,
+ "No more data to read after "
+ "partial frame header.");
+ }
+ }
+ sizeBytesRead += bytesRead;
+ }
+
+ sz = ntohl(szN);
+
+ ensureReadBuffer(minFrameSize + 4);
+
+ if ((sz & TBinaryProtocol::VERSION_MASK) == (uint32_t)TBinaryProtocol::VERSION_1) {
+ // unframed
+ clientType = THRIFT_UNFRAMED_DEPRECATED;
+ memcpy(rBuf_.get(), &szN, sizeof(szN));
+ if (minFrameSize > 4) {
+ transport_->readAll(rBuf_.get() + 4, minFrameSize - 4);
+ setReadBuffer(rBuf_.get(), minFrameSize);
+ } else {
+ setReadBuffer(rBuf_.get(), 4);
+ }
+ } else {
+ // Could be header format or framed. Check next uint32
+ uint32_t magic_n;
+ uint32_t magic;
+
+ if (sz > MAX_FRAME_SIZE) {
+ throw TTransportException(TTransportException::CORRUPTED_DATA,
+ "Header transport frame is too large");
+ }
+
+ ensureReadBuffer(sz);
+
+ // We can use readAll here, because it would be an invalid frame otherwise
+ transport_->readAll(reinterpret_cast<uint8_t*>(&magic_n), sizeof(magic_n));
+ memcpy(rBuf_.get(), &magic_n, sizeof(magic_n));
+ magic = ntohl(magic_n);
+
+ if ((magic & TBinaryProtocol::VERSION_MASK) == (uint32_t)TBinaryProtocol::VERSION_1) {
+ // framed
+ clientType = THRIFT_FRAMED_DEPRECATED;
+ transport_->readAll(rBuf_.get() + 4, sz - 4);
+ setReadBuffer(rBuf_.get(), sz);
+ } else if (HEADER_MAGIC == (magic & HEADER_MASK)) {
+ if (sz < 10) {
+ throw TTransportException(TTransportException::CORRUPTED_DATA,
+ "Header transport frame is too small");
+ }
+
+ transport_->readAll(rBuf_.get() + 4, sz - 4);
+
+ // header format
+ clientType = THRIFT_HEADER_CLIENT_TYPE;
+ // flags
+ flags = magic & FLAGS_MASK;
+ // seqId
+ uint32_t seqId_n;
+ memcpy(&seqId_n, rBuf_.get() + 4, sizeof(seqId_n));
+ seqId = ntohl(seqId_n);
+ // header size
+ uint16_t headerSize_n;
+ memcpy(&headerSize_n, rBuf_.get() + 8, sizeof(headerSize_n));
+ uint16_t headerSize = ntohs(headerSize_n);
+ setReadBuffer(rBuf_.get(), sz);
+ readHeaderFormat(headerSize, sz);
+ } else {
+ clientType = THRIFT_UNKNOWN_CLIENT_TYPE;
+ throw TTransportException(TTransportException::BAD_ARGS,
+ "Could not detect client transport type");
+ }
+ }
+
+ return true;
+}
+
+/**
+ * Reads a string from ptr, taking care not to reach headerBoundary
+ * Advances ptr on success
+ *
+ * @param str output string
+ * @throws CORRUPTED_DATA if size of string exceeds boundary
+ */
+void THeaderTransport::readString(uint8_t*& ptr,
+ /* out */ string& str,
+ uint8_t const* headerBoundary) {
+ int32_t strLen;
+
+ uint32_t bytes = readVarint32(ptr, &strLen, headerBoundary);
+ if (strLen > headerBoundary - ptr) {
+ throw TTransportException(TTransportException::CORRUPTED_DATA,
+ "Info header length exceeds header size");
+ }
+ ptr += bytes;
+ str.assign(reinterpret_cast<const char*>(ptr), strLen);
+ ptr += strLen;
+}
+
+void THeaderTransport::readHeaderFormat(uint16_t headerSize, uint32_t sz) {
+ readTrans_.clear(); // Clear out any previous transforms.
+ readHeaders_.clear(); // Clear out any previous headers.
+
+ // skip over already processed magic(4), seqId(4), headerSize(2)
+ uint8_t* ptr = reinterpret_cast<uint8_t*>(rBuf_.get() + 10);
+
+ // Catch integer overflow, check for reasonable header size
+ assert(headerSize < 16384);
+ headerSize *= 4;
+ const uint8_t* const headerBoundary = ptr + headerSize;
+ if (headerSize > sz) {
+ throw TTransportException(TTransportException::CORRUPTED_DATA,
+ "Header size is larger than frame");
+ }
+ uint8_t* data = ptr + headerSize;
+ ptr += readVarint16(ptr, &protoId, headerBoundary);
+ int16_t numTransforms;
+ ptr += readVarint16(ptr, &numTransforms, headerBoundary);
+
+ // For now all transforms consist of only the ID, not data.
+ for (int i = 0; i < numTransforms; i++) {
+ int32_t transId;
+ ptr += readVarint32(ptr, &transId, headerBoundary);
+
+ readTrans_.push_back(transId);
+ }
+
+ // Info headers
+ while (ptr < headerBoundary) {
+ int32_t infoId;
+ ptr += readVarint32(ptr, &infoId, headerBoundary);
+
+ if (infoId == 0) {
+ // header padding
+ break;
+ }
+ if (infoId >= infoIdType::END) {
+ // cannot handle infoId
+ break;
+ }
+ switch (infoId) {
+ case infoIdType::KEYVALUE:
+ // Process key-value headers
+ uint32_t numKVHeaders;
+ ptr += readVarint32(ptr, (int32_t*)&numKVHeaders, headerBoundary);
+ // continue until we reach (padded) end of packet
+ while (numKVHeaders-- && ptr < headerBoundary) {
+ // format: key; value
+ // both: length (varint32); value (string)
+ string key, value;
+ readString(ptr, key, headerBoundary);
+ // value
+ readString(ptr, value, headerBoundary);
+ // save to headers
+ readHeaders_[key] = value;
+ }
+ break;
+ }
+ }
+
+ // Untransform the data section. rBuf will contain result.
+ untransform(data, sz - (data - rBuf_.get())); // ignore header in size calc
+}
+
+void THeaderTransport::untransform(uint8_t* ptr, uint32_t sz) {
+ // Update the transform buffer size if needed
+ resizeTransformBuffer();
+
+ for (vector<uint16_t>::const_iterator it = readTrans_.begin(); it != readTrans_.end(); ++it) {
+ const uint16_t transId = *it;
+
+ if (transId == ZLIB_TRANSFORM) {
+ z_stream stream;
+ int err;
+
+ stream.next_in = ptr;
+ stream.avail_in = sz;
+
+ // Setting these to 0 means use the default free/alloc functions
+ stream.zalloc = (alloc_func)0;
+ stream.zfree = (free_func)0;
+ stream.opaque = (voidpf)0;
+ err = inflateInit(&stream);
+ if (err != Z_OK) {
+ throw TApplicationException(TApplicationException::MISSING_RESULT,
+ "Error while zlib deflateInit");
+ }
+ stream.next_out = tBuf_.get();
+ stream.avail_out = tBufSize_;
+ err = inflate(&stream, Z_FINISH);
+ if (err != Z_STREAM_END || stream.avail_out == 0) {
+ throw TApplicationException(TApplicationException::MISSING_RESULT,
+ "Error while zlib deflate");
+ }
+ sz = stream.total_out;
+
+ err = inflateEnd(&stream);
+ if (err != Z_OK) {
+ throw TApplicationException(TApplicationException::MISSING_RESULT,
+ "Error while zlib deflateEnd");
+ }
+
+ memcpy(ptr, tBuf_.get(), sz);
+ } else {
+ throw TApplicationException(TApplicationException::MISSING_RESULT, "Unknown transform");
+ }
+ }
+
+ setReadBuffer(ptr, sz);
+}
+
+/**
+ * We may have updated the wBuf size, update the tBuf size to match.
+ * Should be called in transform.
+ *
+ * The buffer should be slightly larger than write buffer size due to
+ * compression transforms (that may slightly grow on small frame sizes)
+ */
+void THeaderTransport::resizeTransformBuffer(uint32_t additionalSize) {
+ if (tBufSize_ < wBufSize_ + DEFAULT_BUFFER_SIZE) {
+ uint32_t new_size = wBufSize_ + DEFAULT_BUFFER_SIZE + additionalSize;
+ uint8_t* new_buf = new uint8_t[new_size];
+ tBuf_.reset(new_buf);
+ tBufSize_ = new_size;
+ }
+}
+
+void THeaderTransport::transform(uint8_t* ptr, uint32_t sz) {
+ // Update the transform buffer size if needed
+ resizeTransformBuffer();
+
+ for (vector<uint16_t>::const_iterator it = writeTrans_.begin(); it != writeTrans_.end(); ++it) {
+ const uint16_t transId = *it;
+
+ if (transId == ZLIB_TRANSFORM) {
+ z_stream stream;
+ int err;
+
+ stream.next_in = ptr;
+ stream.avail_in = sz;
+
+ stream.zalloc = (alloc_func)0;
+ stream.zfree = (free_func)0;
+ stream.opaque = (voidpf)0;
+ err = deflateInit(&stream, Z_DEFAULT_COMPRESSION);
+ if (err != Z_OK) {
+ throw TTransportException(TTransportException::CORRUPTED_DATA,
+ "Error while zlib deflateInit");
+ }
+ uint32_t tbuf_size = 0;
+ while (err == Z_OK) {
+ resizeTransformBuffer(tbuf_size);
+
+ stream.next_out = tBuf_.get();
+ stream.avail_out = tBufSize_;
+ err = deflate(&stream, Z_FINISH);
+ tbuf_size += DEFAULT_BUFFER_SIZE;
+ }
+ sz = stream.total_out;
+
+ err = deflateEnd(&stream);
+ if (err != Z_OK) {
+ throw TTransportException(TTransportException::CORRUPTED_DATA,
+ "Error while zlib deflateEnd");
+ }
+
+ memcpy(ptr, tBuf_.get(), sz);
+ } else {
+ throw TTransportException(TTransportException::CORRUPTED_DATA, "Unknown transform");
+ }
+ }
+
+ wBase_ = wBuf_.get() + sz;
+}
+
+void THeaderTransport::resetProtocol() {
+ // Set to anything except HTTP type so we don't flush again
+ clientType = THRIFT_HEADER_CLIENT_TYPE;
+
+ // Read the header and decide which protocol to go with
+ readFrame(0);
+}
+
+uint32_t THeaderTransport::getWriteBytes() {
+ return wBase_ - wBuf_.get();
+}
+
+/**
+ * Writes a string to a byte buffer, as size (varint32) + string (non-null
+ * terminated)
+ * Automatically advances ptr to after the written portion
+ */
+void THeaderTransport::writeString(uint8_t*& ptr, const string& str) {
+ uint32_t strLen = str.length();
+ ptr += writeVarint32(strLen, ptr);
+ memcpy(ptr, str.c_str(), strLen); // no need to write \0
+ ptr += strLen;
+}
+
+void THeaderTransport::setHeader(const string& key, const string& value) {
+ writeHeaders_[key] = value;
+}
+
+size_t THeaderTransport::getMaxWriteHeadersSize() const {
+ size_t maxWriteHeadersSize = 0;
+ THeaderTransport::StringToStringMap::const_iterator it;
+ for (it = writeHeaders_.begin(); it != writeHeaders_.end(); ++it) {
+ // add sizes of key and value to maxWriteHeadersSize
+ // 2 varints32 + the strings themselves
+ maxWriteHeadersSize += 5 + 5 + (it->first).length() + (it->second).length();
+ }
+ return maxWriteHeadersSize;
+}
+
+void THeaderTransport::clearHeaders() {
+ writeHeaders_.clear();
+}
+
+void THeaderTransport::flush() {
+ // Write out any data waiting in the write buffer.
+ uint32_t haveBytes = getWriteBytes();
+
+ if (clientType == THRIFT_HEADER_CLIENT_TYPE) {
+ transform(wBuf_.get(), haveBytes);
+ haveBytes = getWriteBytes(); // transform may have changed the size
+ }
+
+ // Note that we reset wBase_ prior to the underlying write
+ // to ensure we're in a sane state (i.e. internal buffer cleaned)
+ // if the underlying write throws up an exception
+ wBase_ = wBuf_.get();
+
+ if (haveBytes > MAX_FRAME_SIZE) {
+ throw TTransportException(TTransportException::CORRUPTED_DATA,
+ "Attempting to send frame that is too large");
+ }
+
+ if (clientType == THRIFT_HEADER_CLIENT_TYPE) {
+ // header size will need to be updated at the end because of varints.
+ // Make it big enough here for max varint size, plus 4 for padding.
+ int headerSize = (2 + getNumTransforms()) * THRIFT_MAX_VARINT32_BYTES + 4;
+ // add approximate size of info headers
+ headerSize += getMaxWriteHeadersSize();
+
+ // Pkt size
+ uint32_t maxSzHbo = headerSize + haveBytes // thrift header + payload
+ + 10; // common header section
+ uint8_t* pkt = tBuf_.get();
+ uint8_t* headerStart;
+ uint8_t* headerSizePtr;
+ uint8_t* pktStart = pkt;
+
+ if (maxSzHbo > tBufSize_) {
+ throw TTransportException(TTransportException::CORRUPTED_DATA,
+ "Attempting to header frame that is too large");
+ }
+
+ uint32_t szHbo;
+ uint32_t szNbo;
+ uint16_t headerSizeN;
+
+ // Fixup szHbo later
+ pkt += sizeof(szNbo);
+ uint16_t headerN = htons(HEADER_MAGIC >> 16);
+ memcpy(pkt, &headerN, sizeof(headerN));
+ pkt += sizeof(headerN);
+ uint16_t flagsN = htons(flags);
+ memcpy(pkt, &flagsN, sizeof(flagsN));
+ pkt += sizeof(flagsN);
+ uint32_t seqIdN = htonl(seqId);
+ memcpy(pkt, &seqIdN, sizeof(seqIdN));
+ pkt += sizeof(seqIdN);
+ headerSizePtr = pkt;
+ // Fixup headerSizeN later
+ pkt += sizeof(headerSizeN);
+ headerStart = pkt;
+
+ pkt += writeVarint32(protoId, pkt);
+ pkt += writeVarint32(getNumTransforms(), pkt);
+
+ // For now, each transform is only the ID, no following data.
+ for (vector<uint16_t>::const_iterator it = writeTrans_.begin(); it != writeTrans_.end(); ++it) {
+ pkt += writeVarint32(*it, pkt);
+ }
+
+ // write info headers
+
+ // for now only write kv-headers
+ uint16_t headerCount = writeHeaders_.size();
+ if (headerCount > 0) {
+ pkt += writeVarint32(infoIdType::KEYVALUE, pkt);
+ // Write key-value headers count
+ pkt += writeVarint32(headerCount, pkt);
+ // Write info headers
+ map<string, string>::const_iterator it;
+ for (it = writeHeaders_.begin(); it != writeHeaders_.end(); ++it) {
+ writeString(pkt, it->first); // key
+ writeString(pkt, it->second); // value
+ }
+ writeHeaders_.clear();
+ }
+
+ // Fixups after varint size calculations
+ headerSize = (pkt - headerStart);
+ uint8_t padding = 4 - (headerSize % 4);
+ headerSize += padding;
+
+ // Pad out pkt with 0x00
+ for (int i = 0; i < padding; i++) {
+ *(pkt++) = 0x00;
+ }
+
+ // Pkt size
+ szHbo = headerSize + haveBytes // thrift header + payload
+ + (headerStart - pktStart - 4); // common header section
+ headerSizeN = htons(headerSize / 4);
+ memcpy(headerSizePtr, &headerSizeN, sizeof(headerSizeN));
+
+ // Set framing size.
+ szNbo = htonl(szHbo);
+ memcpy(pktStart, &szNbo, sizeof(szNbo));
+
+ outTransport_->write(pktStart, szHbo - haveBytes + 4);
+ outTransport_->write(wBuf_.get(), haveBytes);
+ } else if (clientType == THRIFT_FRAMED_DEPRECATED) {
+ uint32_t szHbo = (uint32_t)haveBytes;
+ uint32_t szNbo = htonl(szHbo);
+
+ outTransport_->write(reinterpret_cast<uint8_t*>(&szNbo), 4);
+ outTransport_->write(wBuf_.get(), haveBytes);
+ } else if (clientType == THRIFT_UNFRAMED_DEPRECATED) {
+ outTransport_->write(wBuf_.get(), haveBytes);
+ } else {
+ throw TTransportException(TTransportException::BAD_ARGS, "Unknown client type");
+ }
+
+ // Flush the underlying transport.
+ outTransport_->flush();
+}
+
+/**
+ * Read an i16 from the wire as a varint. The MSB of each byte is set
+ * if there is another byte to follow. This can read up to 3 bytes.
+ */
+uint32_t THeaderTransport::readVarint16(uint8_t const* ptr, int16_t* i16, uint8_t const* boundary) {
+ int32_t val;
+ uint32_t rsize = readVarint32(ptr, &val, boundary);
+ *i16 = (int16_t)val;
+ return rsize;
+}
+
+/**
+ * Read an i32 from the wire as a varint. The MSB of each byte is set
+ * if there is another byte to follow. This can read up to 5 bytes.
+ */
+uint32_t THeaderTransport::readVarint32(uint8_t const* ptr, int32_t* i32, uint8_t const* boundary) {
+
+ uint32_t rsize = 0;
+ uint32_t val = 0;
+ int shift = 0;
+
+ while (true) {
+ if (ptr == boundary) {
+ throw TApplicationException(TApplicationException::INVALID_MESSAGE_TYPE,
+ "Trying to read past header boundary");
+ }
+ uint8_t byte = *(ptr++);
+ rsize++;
+ val |= (uint64_t)(byte & 0x7f) << shift;
+ shift += 7;
+ if (!(byte & 0x80)) {
+ *i32 = val;
+ return rsize;
+ }
+ }
+}
+
+/**
+ * Write an i32 as a varint. Results in 1-5 bytes on the wire.
+ */
+uint32_t THeaderTransport::writeVarint32(int32_t n, uint8_t* pkt) {
+ uint8_t buf[5];
+ uint32_t wsize = 0;
+
+ while (true) {
+ if ((n & ~0x7F) == 0) {
+ buf[wsize++] = (int8_t)n;
+ break;
+ } else {
+ buf[wsize++] = (int8_t)((n & 0x7F) | 0x80);
+ n >>= 7;
+ }
+ }
+
+ // Caller will advance pkt.
+ for (uint32_t i = 0; i < wsize; i++) {
+ pkt[i] = buf[i];
+ }
+
+ return wsize;
+}
+
+uint32_t THeaderTransport::writeVarint16(int16_t n, uint8_t* pkt) {
+ return writeVarint32(n, pkt);
+}
+}
+}
+} // apache::thrift::transport
diff --git a/lib/cpp/src/thrift/transport/THeaderTransport.h b/lib/cpp/src/thrift/transport/THeaderTransport.h
new file mode 100644
index 0000000..0cef56d
--- /dev/null
+++ b/lib/cpp/src/thrift/transport/THeaderTransport.h
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef THRIFT_TRANSPORT_THEADERTRANSPORT_H_
+#define THRIFT_TRANSPORT_THEADERTRANSPORT_H_ 1
+
+#include <thrift/protocol/TBinaryProtocol.h>
+#include <thrift/protocol/TProtocolTypes.h>
+#include <thrift/transport/TBufferTransports.h>
+#include <thrift/transport/TTransport.h>
+#include <thrift/transport/TVirtualTransport.h>
+
+#include <bitset>
+#include <boost/scoped_array.hpp>
+#include <pwd.h>
+#include <unistd.h>
+
+// Don't include the unknown client.
+#define CLIENT_TYPES_LEN 3
+
+enum CLIENT_TYPE {
+ THRIFT_HEADER_CLIENT_TYPE = 0,
+ THRIFT_FRAMED_DEPRECATED = 1,
+ THRIFT_UNFRAMED_DEPRECATED = 2,
+ THRIFT_UNKNOWN_CLIENT_TYPE = 4,
+};
+
+namespace apache {
+namespace thrift {
+namespace transport {
+
+using apache::thrift::protocol::T_COMPACT_PROTOCOL;
+
+/**
+ * Header transport. All writes go into an in-memory buffer until flush is
+ * called, at which point the transport writes the length of the entire
+ * binary chunk followed by the data payload. This allows the receiver on the
+ * other end to always do fixed-length reads.
+ *
+ * Subclass TFramedTransport because most of the read/write methods are similar
+ * and need similar buffers. Major changes are readFrame & flush.
+ *
+ * Header Transport *must* be the same transport for both input and
+ * output when used on the server side - client responses should be
+ * the same protocol as those in the request.
+ */
+class THeaderTransport : public TVirtualTransport<THeaderTransport, TFramedTransport> {
+public:
+ static const int DEFAULT_BUFFER_SIZE = 512u;
+ static const int THRIFT_MAX_VARINT32_BYTES = 5;
+
+ /// Use default buffer sizes.
+ explicit THeaderTransport(const boost::shared_ptr<TTransport>& transport)
+ : transport_(transport),
+ outTransport_(transport),
+ protoId(T_COMPACT_PROTOCOL),
+ clientType(THRIFT_HEADER_CLIENT_TYPE),
+ seqId(0),
+ flags(0),
+ tBufSize_(0),
+ tBuf_(NULL) {
+ initBuffers();
+ }
+
+ THeaderTransport(const boost::shared_ptr<TTransport> inTransport,
+ const boost::shared_ptr<TTransport> outTransport)
+ : transport_(inTransport),
+ outTransport_(outTransport),
+ protoId(T_COMPACT_PROTOCOL),
+ clientType(THRIFT_HEADER_CLIENT_TYPE),
+ seqId(0),
+ flags(0),
+ tBufSize_(0),
+ tBuf_(NULL) {
+ initBuffers();
+ }
+
+ void open() { transport_->open(); }
+
+ bool isOpen() { return transport_->isOpen(); }
+
+ bool peek() { return (this->rBase_ < this->rBound_) || transport_->peek(); }
+
+ void close() {
+ flush();
+ transport_->close();
+ }
+
+ virtual uint32_t readSlow(uint8_t* buf, uint32_t len);
+ virtual uint32_t readAll(uint8_t* buf, uint32_t len);
+ virtual void flush();
+
+ void resizeTransformBuffer(uint32_t additionalSize = 0);
+
+ boost::shared_ptr<TTransport> getUnderlyingTransport() { return transport_; }
+
+ /*
+ * TVirtualTransport provides a default implementation of readAll().
+ * We want to use the TBufferBase version instead.
+ */
+ using TBufferBase::readAll;
+
+ uint16_t getProtocolId() const;
+ void setProtocolId(uint16_t protoId) { this->protoId = protoId; }
+
+ void resetProtocol();
+
+ /**
+ * We know we got a packet in header format here, try to parse the header
+ *
+ * @param headerSize size of the header portion
+ * @param sz Size of the whole message, including header
+ */
+ void readHeaderFormat(uint16_t headerSize, uint32_t sz);
+
+ /**
+ * Untransform the data based on the recieved header flags
+ * On conclusion of function, setReadBuffer is called with the
+ * untransformed data.
+ *
+ * @param ptr ptr to data
+ * @param size of data
+ */
+ void untransform(uint8_t* ptr, uint32_t sz);
+
+ /**
+ * Transform the data based on our write transform flags
+ * At conclusion of function the write buffer is set to the
+ * transformed data.
+ *
+ * @param ptr Ptr to data to transform
+ * @param sz Size of data buffer
+ */
+ void transform(uint8_t* ptr, uint32_t sz);
+
+ uint16_t getNumTransforms() const {
+ int trans = writeTrans_.size();
+ return trans;
+ }
+
+ void setTransform(uint16_t transId) { writeTrans_.push_back(transId); }
+
+ // Info headers
+
+ typedef std::map<std::string, std::string> StringToStringMap;
+
+ // these work with write headers
+ void setHeader(const std::string& key, const std::string& value);
+
+ void clearHeaders();
+
+ StringToStringMap& getWriteHeaders() { return writeHeaders_; }
+
+ // these work with read headers
+ const StringToStringMap& getHeaders() const { return readHeaders_; }
+
+ // accessors for seqId
+ int32_t getSequenceNumber() const { return seqId; }
+ void setSequenceNumber(int32_t seqId) { this->seqId = seqId; }
+
+ enum TRANSFORMS {
+ ZLIB_TRANSFORM = 0x01,
+ };
+
+protected:
+ std::bitset<CLIENT_TYPES_LEN> supported_clients;
+
+ void initSupportedClients(std::bitset<CLIENT_TYPES_LEN> const*);
+
+ /**
+ * Reads a frame of input from the underlying stream.
+ *
+ * Returns true if a frame was read successfully, or false on EOF.
+ * (Raises a TTransportException if EOF occurs after a partial frame.)
+ */
+ bool readFrame(uint32_t minFrameSize);
+
+ void ensureReadBuffer(uint32_t sz);
+ uint32_t getWriteBytes();
+
+ void initBuffers() {
+ setReadBuffer(NULL, 0);
+ setWriteBuffer(wBuf_.get(), wBufSize_);
+ }
+
+ boost::shared_ptr<TTransport> transport_;
+ boost::shared_ptr<TTransport> outTransport_;
+
+ // 0 and 16th bits must be 0 to differentiate from framed & unframed
+ static const uint32_t HEADER_MAGIC = 0x0FFF0000;
+ static const uint32_t HEADER_MASK = 0xFFFF0000;
+ static const uint32_t FLAGS_MASK = 0x0000FFFF;
+
+ static const uint32_t MAX_FRAME_SIZE = 0x3FFFFFFF;
+
+ int16_t protoId;
+ uint16_t clientType;
+ uint32_t seqId;
+ uint16_t flags;
+
+ std::vector<uint16_t> readTrans_;
+ std::vector<uint16_t> writeTrans_;
+
+ // Map to use for headers
+ StringToStringMap readHeaders_;
+ StringToStringMap writeHeaders_;
+
+ /**
+ * Returns the maximum number of bytes that write k/v headers can take
+ */
+ size_t getMaxWriteHeadersSize() const;
+
+ struct infoIdType {
+ enum idType {
+ // start at 1 to avoid confusing header padding for an infoId
+ KEYVALUE = 1,
+ END // signal the end of infoIds we can handle
+ };
+ };
+
+ // Buffers to use for transform processing
+ uint32_t tBufSize_;
+ boost::scoped_array<uint8_t> tBuf_;
+
+ void readString(uint8_t*& ptr, /* out */ std::string& str, uint8_t const* headerBoundary);
+
+ void writeString(uint8_t*& ptr, const std::string& str);
+
+ // Varint utils
+ /**
+ * Read an i16 from the wire as a varint. The MSB of each byte is set
+ * if there is another byte to follow. This can read up to 3 bytes.
+ */
+ uint32_t readVarint16(uint8_t const* ptr, int16_t* i16, uint8_t const* boundary);
+
+ /**
+ * Read an i32 from the wire as a varint. The MSB of each byte is set
+ * if there is another byte to follow. This can read up to 5 bytes.
+ */
+ uint32_t readVarint32(uint8_t const* ptr, int32_t* i32, uint8_t const* boundary);
+
+ /**
+ * Write an i32 as a varint. Results in 1-5 bytes on the wire.
+ */
+ uint32_t writeVarint32(int32_t n, uint8_t* pkt);
+
+ /**
+ * Write an i16 as a varint. Results in 1-3 bytes on the wire.
+ */
+ uint32_t writeVarint16(int16_t n, uint8_t* pkt);
+};
+
+/**
+ * Wraps a transport into a header one.
+ *
+ */
+class THeaderTransportFactory : public TTransportFactory {
+public:
+ THeaderTransportFactory() {}
+
+ virtual ~THeaderTransportFactory() {}
+
+ /**
+ * Wraps the transport into a header one.
+ */
+ virtual boost::shared_ptr<TTransport> getTransport(boost::shared_ptr<TTransport> trans) {
+ return boost::shared_ptr<TTransport>(new THeaderTransport(trans));
+ }
+};
+}
+}
+} // apache::thrift::transport
+
+#endif // #ifndef THRIFT_TRANSPORT_THEADERTRANSPORT_H_
diff --git a/lib/cpp/test/CMakeLists.txt b/lib/cpp/test/CMakeLists.txt
index 5de9fc4..033b4d2 100644
--- a/lib/cpp/test/CMakeLists.txt
+++ b/lib/cpp/test/CMakeLists.txt
@@ -21,6 +21,7 @@
set(Boost_USE_STATIC_LIBS ON) # Force the use of static boost test framework
find_package(Boost 1.53.0 REQUIRED COMPONENTS chrono filesystem system thread unit_test_framework)
include_directories(SYSTEM "${Boost_INCLUDE_DIRS}")
+include_directories(SYSTEM "${ZLIB_INCLUDE_DIRS}")
#Make sure gen-cpp files can be included
include_directories("${CMAKE_CURRENT_BINARY_DIR}")
@@ -62,6 +63,7 @@
target_link_libraries(Benchmark testgencpp)
LINK_AGAINST_THRIFT_LIBRARY(Benchmark thrift)
add_test(NAME Benchmark COMMAND Benchmark)
+target_link_libraries(Benchmark testgencpp ${ZLIB_LIBRARIES})
set(UnitTest_SOURCES
UnitTestMain.cpp
@@ -79,7 +81,8 @@
endif()
add_executable(UnitTests ${UnitTest_SOURCES})
-target_link_libraries(UnitTests testgencpp ${Boost_LIBRARIES})
+target_link_libraries(UnitTests testgencpp ${Boost_LIBRARIES}
+ ${ZLIB_LIBRARIES})
LINK_AGAINST_THRIFT_LIBRARY(UnitTests thrift)
add_test(NAME UnitTests COMMAND UnitTests)
if ( MSVC )
@@ -99,6 +102,7 @@
target_link_libraries(TInterruptTest
testgencpp
${Boost_LIBRARIES}
+ ${ZLIB_LIBRARIES}
)
LINK_AGAINST_THRIFT_LIBRARY(TInterruptTest thrift)
if (NOT MSVC AND NOT ${CMAKE_SYSTEM_NAME} MATCHES "Darwin")
@@ -110,6 +114,7 @@
target_link_libraries(TServerIntegrationTest
testgencpp_cob
${Boost_LIBRARIES}
+ ${ZLIB_LIBRARIES}
)
LINK_AGAINST_THRIFT_LIBRARY(TServerIntegrationTest thrift)
if (NOT MSVC AND NOT ${CMAKE_SYSTEM_NAME} MATCHES "Darwin")
@@ -144,6 +149,7 @@
target_link_libraries(EnumTest
testgencpp
${Boost_LIBRARIES}
+ ${ZLIB_LIBRARIES}
)
LINK_AGAINST_THRIFT_LIBRARY(EnumTest thrift)
add_test(NAME EnumTest COMMAND EnumTest)
@@ -153,6 +159,7 @@
target_link_libraries(TFileTransportTest
testgencpp
${Boost_LIBRARIES}
+ ${ZLIB_LIBRARIES}
)
LINK_AGAINST_THRIFT_LIBRARY(TFileTransportTest thrift)
add_test(NAME TFileTransportTest COMMAND TFileTransportTest)
@@ -161,6 +168,7 @@
add_executable(TFDTransportTest TFDTransportTest.cpp)
target_link_libraries(TFDTransportTest
${Boost_LIBRARIES}
+ ${ZLIB_LIBRARIES}
)
LINK_AGAINST_THRIFT_LIBRARY(TFDTransportTest thrift)
add_test(NAME TFDTransportTest COMMAND TFDTransportTest)
@@ -168,6 +176,7 @@
add_executable(TPipedTransportTest TPipedTransportTest.cpp)
target_link_libraries(TPipedTransportTest
${Boost_LIBRARIES}
+ ${ZLIB_LIBRARIES}
)
LINK_AGAINST_THRIFT_LIBRARY(TPipedTransportTest thrift)
add_test(NAME TPipedTransportTest COMMAND TPipedTransportTest)
@@ -182,6 +191,7 @@
target_link_libraries(AllProtocolsTest
testgencpp
${Boost_LIBRARIES}
+ ${ZLIB_LIBRARIES}
)
LINK_AGAINST_THRIFT_LIBRARY(AllProtocolsTest thrift)
add_test(NAME AllProtocolsTest COMMAND AllProtocolsTest)
@@ -192,6 +202,7 @@
target_link_libraries(DebugProtoTest
testgencpp
${Boost_LIBRARIES}
+ ${ZLIB_LIBRARIES}
)
LINK_AGAINST_THRIFT_LIBRARY(DebugProtoTest thrift)
add_test(NAME DebugProtoTest COMMAND DebugProtoTest)
@@ -201,6 +212,7 @@
target_link_libraries(JSONProtoTest
testgencpp
${Boost_LIBRARIES}
+ ${ZLIB_LIBRARIES}
)
LINK_AGAINST_THRIFT_LIBRARY(JSONProtoTest thrift)
add_test(NAME JSONProtoTest COMMAND JSONProtoTest)
@@ -209,6 +221,7 @@
target_link_libraries(OptionalRequiredTest
testgencpp
${Boost_LIBRARIES}
+ ${ZLIB_LIBRARIES}
)
LINK_AGAINST_THRIFT_LIBRARY(OptionalRequiredTest thrift)
add_test(NAME OptionalRequiredTest COMMAND OptionalRequiredTest)
@@ -217,6 +230,7 @@
target_link_libraries(RecursiveTest
testgencpp
${Boost_LIBRARIES}
+ ${ZLIB_LIBRARIES}
)
LINK_AGAINST_THRIFT_LIBRARY(RecursiveTest thrift)
add_test(NAME RecursiveTest COMMAND RecursiveTest)
@@ -225,6 +239,7 @@
target_link_libraries(SpecializationTest
testgencpp
${Boost_LIBRARIES}
+ ${ZLIB_LIBRARIES}
)
LINK_AGAINST_THRIFT_LIBRARY(SpecializationTest thrift)
add_test(NAME SpecializationTest COMMAND SpecializationTest)
@@ -238,6 +253,7 @@
add_executable(concurrency_test ${concurrency_test_SOURCES})
LINK_AGAINST_THRIFT_LIBRARY(concurrency_test thrift)
add_test(NAME concurrency_test COMMAND concurrency_test)
+target_link_libraries(concurrency_test ${ZLIB_LIBRARIES})
set(link_test_SOURCES
link/LinkTest.cpp
@@ -249,6 +265,7 @@
add_executable(link_test ${link_test_SOURCES})
target_link_libraries(link_test testgencpp_cob)
LINK_AGAINST_THRIFT_LIBRARY(link_test thrift)
+target_link_libraries(link_test testgencpp ${ZLIB_LIBRARIES})
add_test(NAME link_test COMMAND link_test)
if(WITH_LIBEVENT)
@@ -264,6 +281,7 @@
target_link_libraries(processor_test
testgencpp_cob
${Boost_LIBRARIES}
+ ${ZLIB_LIBRARIES}
)
LINK_AGAINST_THRIFT_LIBRARY(processor_test thrift)
LINK_AGAINST_THRIFT_LIBRARY(processor_test thriftnb)
@@ -276,6 +294,7 @@
testgencpp_cob
${LIBEVENT_LIBRARIES}
${Boost_LIBRARIES}
+ ${ZLIB_LIBRARIES}
)
LINK_AGAINST_THRIFT_LIBRARY(TNonblockingServerTest thrift)
LINK_AGAINST_THRIFT_LIBRARY(TNonblockingServerTest thriftnb)
@@ -287,6 +306,7 @@
target_link_libraries(OpenSSLManualInitTest
${OPENSSL_LIBRARIES}
${Boost_LIBRARIES}
+ ${ZLIB_LIBRARIES}
)
LINK_AGAINST_THRIFT_LIBRARY(OpenSSLManualInitTest thrift)
add_test(NAME OpenSSLManualInitTest COMMAND OpenSSLManualInitTest)
@@ -295,6 +315,7 @@
target_link_libraries(SecurityTest
testgencpp
${Boost_LIBRARIES}
+ ${ZLIB_LIBRARIES}
)
LINK_AGAINST_THRIFT_LIBRARY(SecurityTest thrift)
if (NOT MSVC AND NOT ${CMAKE_SYSTEM_NAME} MATCHES "Darwin")
diff --git a/test/cpp/CMakeLists.txt b/test/cpp/CMakeLists.txt
index 2d75f2e..67d9510 100755
--- a/test/cpp/CMakeLists.txt
+++ b/test/cpp/CMakeLists.txt
@@ -54,23 +54,24 @@
LINK_AGAINST_THRIFT_LIBRARY(crossstressgencpp thrift)
add_executable(TestServer src/TestServer.cpp)
-target_link_libraries(TestServer crosstestgencpp ${Boost_LIBRARIES} ${LIBEVENT_LIB})
+target_link_libraries(TestServer crosstestgencpp ${Boost_LIBRARIES} ${LIBEVENT_LIB} ${ZLIB_LIBRARIES})
LINK_AGAINST_THRIFT_LIBRARY(TestServer thrift)
LINK_AGAINST_THRIFT_LIBRARY(TestServer thriftnb)
+LINK_AGAINST_THRIFT_LIBRARY(TestServer thriftnb)
add_executable(TestClient src/TestClient.cpp)
-target_link_libraries(TestClient crosstestgencpp ${Boost_LIBRARIES} ${LIBEVENT_LIB})
+target_link_libraries(TestClient crosstestgencpp ${Boost_LIBRARIES} ${LIBEVENT_LIB} ${ZLIB_LIBRARIES})
LINK_AGAINST_THRIFT_LIBRARY(TestClient thrift)
LINK_AGAINST_THRIFT_LIBRARY(TestClient thriftnb)
add_executable(StressTest src/StressTest.cpp)
-target_link_libraries(StressTest crossstressgencpp ${Boost_LIBRARIES} ${LIBEVENT_LIB})
+target_link_libraries(StressTest crossstressgencpp ${Boost_LIBRARIES} ${LIBEVENT_LIB} ${ZLIB_LIBRARIES})
LINK_AGAINST_THRIFT_LIBRARY(StressTest thrift)
LINK_AGAINST_THRIFT_LIBRARY(StressTest thriftnb)
add_test(NAME StressTest COMMAND StressTest)
add_executable(StressTestNonBlocking src/StressTestNonBlocking.cpp)
-target_link_libraries(StressTestNonBlocking crossstressgencpp ${Boost_LIBRARIES} ${LIBEVENT_LIB})
+target_link_libraries(StressTestNonBlocking crossstressgencpp ${Boost_LIBRARIES} ${LIBEVENT_LIB} ${ZLIB_LIBRARIES})
LINK_AGAINST_THRIFT_LIBRARY(StressTestNonBlocking thrift)
LINK_AGAINST_THRIFT_LIBRARY(StressTestNonBlocking thriftnb)
LINK_AGAINST_THRIFT_LIBRARY(StressTestNonBlocking thriftz)
diff --git a/test/cpp/Makefile.am b/test/cpp/Makefile.am
index bc026b2..c609a71 100755
--- a/test/cpp/Makefile.am
+++ b/test/cpp/Makefile.am
@@ -69,7 +69,7 @@
$(top_builddir)/lib/cpp/libthrift.la \
$(top_builddir)/lib/cpp/libthriftz.la \
$(top_builddir)/lib/cpp/libthriftnb.la \
- -levent -lboost_program_options -lboost_system -lboost_filesystem
+ -levent -lboost_program_options -lboost_system -lboost_filesystem $(ZLIB_LIBS)
TestClient_SOURCES = \
src/TestClient.cpp
@@ -79,7 +79,7 @@
$(top_builddir)/lib/cpp/libthrift.la \
$(top_builddir)/lib/cpp/libthriftz.la \
$(top_builddir)/lib/cpp/libthriftnb.la \
- -levent -lboost_program_options -lboost_system -lboost_filesystem
+ -levent -lboost_program_options -lboost_system -lboost_filesystem $(ZLIB_LIBS)
StressTest_SOURCES = \
src/StressTest.cpp
diff --git a/test/cpp/src/TestClient.cpp b/test/cpp/src/TestClient.cpp
index 2736ee8..47539dc 100644
--- a/test/cpp/src/TestClient.cpp
+++ b/test/cpp/src/TestClient.cpp
@@ -23,6 +23,7 @@
#include <iostream>
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/protocol/TCompactProtocol.h>
+#include <thrift/protocol/THeaderProtocol.h>
#include <thrift/protocol/TJSONProtocol.h>
#include <thrift/transport/THttpClient.h>
#include <thrift/transport/TTransportUtils.h>
@@ -170,7 +171,7 @@
"Transport: buffered, framed, http, evhttp")(
"protocol",
boost::program_options::value<string>(&protocol_type)->default_value(protocol_type),
- "Protocol: binary, compact, json")("ssl", "Encrypted Transport using SSL")(
+ "Protocol: binary, header, compact, json")("ssl", "Encrypted Transport using SSL")(
"testloops,n",
boost::program_options::value<int>(&numTests)->default_value(numTests),
"Number of Tests")("noinsane", "Do not run insanity test");
@@ -188,6 +189,7 @@
if (!protocol_type.empty()) {
if (protocol_type == "binary") {
} else if (protocol_type == "compact") {
+ } else if (protocol_type == "header") {
} else if (protocol_type == "json") {
} else {
throw invalid_argument("Unknown protocol type " + protocol_type);
@@ -266,6 +268,9 @@
} else if (protocol_type.compare("compact") == 0) {
boost::shared_ptr<TProtocol> compactProtocol(new TCompactProtocol(transport));
protocol = compactProtocol;
+ } else if (protocol_type == "header") {
+ boost::shared_ptr<TProtocol> headerProtocol(new THeaderProtocol(transport));
+ protocol = headerProtocol;
} else {
boost::shared_ptr<TBinaryProtocol> binaryProtocol(new TBinaryProtocol(transport));
protocol = binaryProtocol;
diff --git a/test/cpp/src/TestServer.cpp b/test/cpp/src/TestServer.cpp
index 51169af..12c4b97 100644
--- a/test/cpp/src/TestServer.cpp
+++ b/test/cpp/src/TestServer.cpp
@@ -24,6 +24,7 @@
#include <thrift/concurrency/PlatformThreadFactory.h>
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/protocol/TCompactProtocol.h>
+#include <thrift/protocol/THeaderProtocol.h>
#include <thrift/protocol/TJSONProtocol.h>
#include <thrift/server/TSimpleServer.h>
#include <thrift/server/TThreadedServer.h>
@@ -567,7 +568,7 @@
"transport: buffered, framed, http")(
"protocol",
boost::program_options::value<string>(&protocol_type)->default_value(protocol_type),
- "protocol: binary, compact, json")("ssl", "Encrypted Transport using SSL")(
+ "protocol: binary, compact, header, json")("ssl", "Encrypted Transport using SSL")(
"processor-events",
"processor-events")("workers,n",
boost::program_options::value<size_t>(&workers)->default_value(workers),
@@ -597,6 +598,7 @@
if (protocol_type == "binary") {
} else if (protocol_type == "compact") {
} else if (protocol_type == "json") {
+ } else if (protocol_type == "header") {
} else {
throw invalid_argument("Unknown protocol type " + protocol_type);
}
@@ -633,6 +635,9 @@
} else if (protocol_type == "compact") {
boost::shared_ptr<TProtocolFactory> compactProtocolFactory(new TCompactProtocolFactory());
protocolFactory = compactProtocolFactory;
+ } else if (protocol_type == "header") {
+ boost::shared_ptr<TProtocolFactory> headerProtocolFactory(new THeaderProtocolFactory());
+ protocolFactory = headerProtocolFactory;
} else {
boost::shared_ptr<TProtocolFactory> binaryProtocolFactory(
new TBinaryProtocolFactoryT<TBufferBase>());
@@ -739,11 +744,16 @@
TEvhttpServer nonblockingServer(testBufferProcessor, port);
nonblockingServer.serve();
} else {
- server.reset(new TNonblockingServer(testProcessor, port));
+ server.reset(new TNonblockingServer(testProcessor, protocolFactory, port));
}
}
if (server.get() != NULL) {
+ if (protocol_type == "header") {
+ // Tell the server to use the same protocol for input / output
+ // if using header
+ server->setOutputProtocolFactory(boost::shared_ptr<TProtocolFactory>());
+ }
apache::thrift::concurrency::PlatformThreadFactory factory;
factory.setDetached(false);
boost::shared_ptr<apache::thrift::concurrency::Runnable> serverThreadRunner(server);
diff --git a/test/known_failures_Linux.json b/test/known_failures_Linux.json
index a20209b..4c78c52 100644
--- a/test/known_failures_Linux.json
+++ b/test/known_failures_Linux.json
@@ -11,6 +11,8 @@
"c_glib-rb_binary_framed-ip",
"cpp-cpp_binary_http-ip",
"cpp-cpp_compact_http-domain",
+ "cpp-cpp_header_http-domain",
+ "cpp-cpp_header_http-ip",
"cpp-cpp_json_http-ip",
"cpp-hs_json_buffered-ip",
"cpp-hs_json_framed-ip",
diff --git a/test/tests.json b/test/tests.json
index 44040f2..aeff933 100644
--- a/test/tests.json
+++ b/test/tests.json
@@ -228,7 +228,8 @@
"protocols": [
"compact",
"binary",
- "json"
+ "json",
+ "header"
],
"workdir": "cpp"
},
diff --git a/tutorial/cpp/CMakeLists.txt b/tutorial/cpp/CMakeLists.txt
index 2b0c143..42d92ff 100644
--- a/tutorial/cpp/CMakeLists.txt
+++ b/tutorial/cpp/CMakeLists.txt
@@ -45,7 +45,9 @@
add_executable(TutorialServer CppServer.cpp)
target_link_libraries(TutorialServer tutorialgencpp)
LINK_AGAINST_THRIFT_LIBRARY(TutorialServer thrift)
+target_link_libraries(TutorialServer ${ZLIB_LIBRARIES})
add_executable(TutorialClient CppClient.cpp)
target_link_libraries(TutorialClient tutorialgencpp)
LINK_AGAINST_THRIFT_LIBRARY(TutorialClient thrift)
+target_link_libraries(TutorialClient ${ZLIB_LIBRARIES})