Thrift: TDenseProtocol using variable-length integers.
Reviewed By: mcslee
Test Plan:
test/DenseProtoTest.cpp
Still have to test:
- Bounds checking.
- borrow/consume on TBuffered and TFramed.
Revert Plan: ok
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@665252 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/protocol/TDenseProtocol.cpp b/lib/cpp/src/protocol/TDenseProtocol.cpp
index 095dd64..99117d6 100644
--- a/lib/cpp/src/protocol/TDenseProtocol.cpp
+++ b/lib/cpp/src/protocol/TDenseProtocol.cpp
@@ -4,12 +4,17 @@
// See accompanying file LICENSE or visit the Thrift site at:
// http://developers.facebook.com/thrift/
+#define __STDC_LIMIT_MACROS
+#include <stdint.h>
#include "TDenseProtocol.h"
#include "TReflectionLocal.h"
// XXX for debugging (duh)
#define DEBUG_TDENSEPROTOCOL
+// XXX for development.
+#define TDENSE_PROTOCOL_MEASURE_VLI
+
// The XXX above does not apply to this.
#ifdef DEBUG_TDENSEPROTOCOL
#undef NDEBUG
@@ -18,6 +23,12 @@
using std::string;
+#ifdef __GNUC__
+#define UNLIKELY(val) (__builtin_expect((val), 0))
+#else
+#define UNLIKELY(val) (val)
+#endif
+
namespace facebook { namespace thrift { namespace protocol {
// Top TypeSpec. TypeSpec of the structure being encoded.
@@ -75,6 +86,8 @@
uint32_t TDenseProtocol::writeMessageBegin(const std::string& name,
const TMessageType messageType,
const int32_t seqid) {
+ throw TApplicationException("TDenseProtocol doesn't work with messages (yet).");
+
int32_t version = (VERSION_2) | ((int32_t)messageType);
uint32_t wsize = 0;
wsize += subWriteI32(version);
@@ -207,44 +220,49 @@
return TBinaryProtocol::writeByte(byte);
}
-
-
-// XXX Remove this code for collecting statistics.
-static int vli_size(uint64_t towrite) {
- int count = 0;
- while (true) {
- towrite = towrite >> 7;
- if (towrite == 0) {
- return count+1;
- }
- }
-}
-
uint32_t TDenseProtocol::writeI16(const int16_t i16) {
- vli_save_16 += 2 - vli_size(i16);
- if (i16 < 0) negs++;
-
checkTType(T_I16);
stateTransition();
- return TBinaryProtocol::writeI16(i16);
+
+ uint32_t rv = vliWrite(i16);
+#ifdef TDENSE_PROTOCOL_MEASURE_VLI
+ vli_save_16 += 2 - rv;
+ if (i16 < 0) {
+ negs++;
+ }
+#endif
+
+ return rv;
}
uint32_t TDenseProtocol::writeI32(const int32_t i32) {
- vli_save_32 += 4 - vli_size(i32);
- if (i32 < 0) negs++;
-
checkTType(T_I32);
stateTransition();
- return TBinaryProtocol::writeI32(i32);
+
+ uint32_t rv = vliWrite(i32);
+#ifdef TDENSE_PROTOCOL_MEASURE_VLI
+ vli_save_32 += 4 - rv;
+ if (i32 < 0) {
+ negs++;
+ }
+#endif
+
+ return rv;
}
uint32_t TDenseProtocol::writeI64(const int64_t i64) {
- vli_save_64 += 8 - vli_size(i64);
- if (i64 < 0) negs++;
-
checkTType(T_I64);
stateTransition();
- return TBinaryProtocol::writeI64(i64);
+
+ uint32_t rv = vliWrite(i64);
+#ifdef TDENSE_PROTOCOL_MEASURE_VLI
+ vli_save_64 += 8 - rv;
+ if (i64 < 0) {
+ negs++;
+ }
+#endif
+
+ return rv;
}
uint32_t TDenseProtocol::writeDouble(const double dub) {
@@ -259,17 +277,17 @@
return subWriteString(str);
}
-// XXX this can go into .h when we delete instrumentaion. (See subWritebool)
inline uint32_t TDenseProtocol::subWriteI32(const int32_t i32) {
- vli_save_sub += 4 - vli_size(i32);
- if (i32 < 0) negs++;
-
- int32_t net = (int32_t)htonl(i32);
- trans_->write((uint8_t*)&net, 4);
- return 4;
+ uint32_t rv = vliWrite(i32);
+#ifdef TDENSE_PROTOCOL_MEASURE_VLI
+ vli_save_sub += 4 - rv;
+ if (i32 < 0) {
+ negs++;
+ }
+#endif
+ return rv;
}
-// XXX Delete when subWriteI32 goes into .h
uint32_t TDenseProtocol::subWriteString(const std::string& str) {
uint32_t size = str.size();
uint32_t xfer = subWriteI32((int32_t)size);
@@ -279,6 +297,70 @@
return xfer + size;
}
+inline uint32_t TDenseProtocol::vliRead(uint64_t& vli) {
+ uint32_t used = 0;
+ uint64_t val = 0;
+ uint8_t buf[10]; // 64 bits / (7 bits/byte) = 10 bytes.
+ bool borrowed = trans_->borrow(buf, sizeof(buf));
+
+ // Fast path. TODO(dreiss): Make it faster.
+ if (borrowed) {
+ while (true) {
+ uint8_t byte = buf[used];
+ used++;
+ val = (val << 7) | (byte & 0x7f);
+ if (!(byte & 0x80)) {
+ vli = val;
+ trans_->consume(used);
+ return used;
+ }
+ // Have to check for invalid data so we don't crash.
+ if (UNLIKELY(used == sizeof(buf))) {
+ throw TProtocolException(TProtocolException::INVALID_DATA, "Variable-length int over 10 bytes.");
+ }
+ }
+ }
+
+ // Slow path.
+ else {
+ while (true) {
+ uint8_t byte;
+ used += trans_->readAll(&byte, 1);
+ val = (val << 7) | (byte & 0x7f);
+ if (!(byte & 0x80)) {
+ vli = val;
+ return used;
+ }
+ // Might as well check for invalid data on the slow path too.
+ if (UNLIKELY(used >= sizeof(buf))) {
+ throw TProtocolException(TProtocolException::INVALID_DATA, "Variable-length int over 10 bytes.");
+ }
+ }
+ }
+}
+
+inline uint32_t TDenseProtocol::vliWrite(uint64_t vli) {
+ uint8_t buf[10]; // 64 bits / (7 bits/byte) = 10 bytes.
+ int32_t pos = sizeof(buf) - 1;
+
+ // Write the thing from back to front.
+ buf[pos] = vli & 0x7f;
+ vli >>= 7;
+ pos--;
+
+ while (vli > 0) {
+ assert(pos >= 0);
+ buf[pos] = (vli | 0x80);
+ vli >>= 7;
+ pos--;
+ }
+
+ // Back up one step before writing.
+ pos++;
+
+ trans_->write(buf+pos, sizeof(buf) - pos);
+ return sizeof(buf) - pos;
+}
/**
* Reading functions
@@ -287,6 +369,8 @@
uint32_t TDenseProtocol::readMessageBegin(std::string& name,
TMessageType& messageType,
int32_t& seqid) {
+ throw TApplicationException("TDenseProtocol doesn't work with messages (yet).");
+
uint32_t xfer = 0;
int32_t sz;
xfer += subReadI32(sz);
@@ -450,19 +534,43 @@
uint32_t TDenseProtocol::readI16(int16_t& i16) {
checkTType(T_I16);
stateTransition();
- return TBinaryProtocol::readI16(i16);
+ uint64_t u64;
+ uint32_t rv = vliRead(u64);
+ int64_t val = (int64_t)u64;
+ if (UNLIKELY(val > INT16_MAX || val < INT16_MIN)) {
+ throw TProtocolException(TProtocolException::INVALID_DATA,
+ "i16 out of range.");
+ }
+ i16 = (int16_t)val;
+ return rv;
}
uint32_t TDenseProtocol::readI32(int32_t& i32) {
checkTType(T_I32);
stateTransition();
- return TBinaryProtocol::readI32(i32);
+ uint64_t u64;
+ uint32_t rv = vliRead(u64);
+ int64_t val = (int64_t)u64;
+ if (UNLIKELY(val > INT32_MAX || val < INT32_MIN)) {
+ throw TProtocolException(TProtocolException::INVALID_DATA,
+ "i32 out of range.");
+ }
+ i32 = (int32_t)val;
+ return rv;
}
uint32_t TDenseProtocol::readI64(int64_t& i64) {
checkTType(T_I64);
stateTransition();
- return TBinaryProtocol::readI64(i64);
+ uint64_t u64;
+ uint32_t rv = vliRead(u64);
+ int64_t val = (int64_t)u64;
+ if (UNLIKELY(val > INT64_MAX || val < INT64_MIN)) {
+ throw TProtocolException(TProtocolException::INVALID_DATA,
+ "i64 out of range.");
+ }
+ i64 = (int64_t)val;
+ return rv;
}
uint32_t TDenseProtocol::readDouble(double& dub) {
@@ -477,4 +585,23 @@
return subReadString(str);
}
+uint32_t TDenseProtocol::subReadI32(int32_t& i32) {
+ uint64_t u64;
+ uint32_t rv = vliRead(u64);
+ int64_t val = (int64_t)u64;
+ if (UNLIKELY(val > INT32_MAX || val < INT32_MIN)) {
+ throw TProtocolException(TProtocolException::INVALID_DATA,
+ "i32 out of range.");
+ }
+ i32 = (int32_t)val;
+ return rv;
+}
+
+uint32_t TDenseProtocol::subReadString(std::string& str) {
+ uint32_t xfer;
+ int32_t size;
+ xfer = subReadI32(size);
+ return xfer + readStringBody(str, size);
+}
+
}}} // facebook::thrift::protocol
diff --git a/lib/cpp/src/protocol/TDenseProtocol.h b/lib/cpp/src/protocol/TDenseProtocol.h
index ffc6afa..edd5c88 100644
--- a/lib/cpp/src/protocol/TDenseProtocol.h
+++ b/lib/cpp/src/protocol/TDenseProtocol.h
@@ -30,7 +30,7 @@
class TDenseProtocol : public TBinaryProtocol {
protected:
static const int32_t VERSION_MASK = 0xffff0000;
- // VERSION_2 (0x80010000) is taken by TBinaryProtocol.
+ // VERSION_1 (0x80010000) is taken by TBinaryProtocol.
static const int32_t VERSION_2 = 0x80020000;
public:
@@ -131,24 +131,12 @@
*/
inline uint32_t subWriteI32(const int32_t i32);
+ inline uint32_t subWriteString(const std::string& str);
+
uint32_t subWriteBool(const bool value) {
return TBinaryProtocol::writeBool(value);
}
-#if 0
- // Use this version when subWriteI32 is moved in here.
- uint32_t subWriteString(const std::string& str) {
- uint32_t size = str.size();
- uint32_t xfer = subWriteI32((int32_t)size);
- if (size > 0) {
- trans_->write((uint8_t*)str.data(), size);
- }
- return xfer + size;
- }
-#else
- inline uint32_t subWriteString(const std::string& str);
-#endif
-
/*
* Reading functions
@@ -204,28 +192,25 @@
/*
* Helper reading functions (don't do state transitions).
*/
- uint32_t subReadI32(int32_t& i32) {
- return TBinaryProtocol::readI32(i32);
- }
+ inline uint32_t subReadI32(int32_t& i32);
+
+ inline uint32_t subReadString(std::string& str);
uint32_t subReadBool(bool& value) {
return TBinaryProtocol::readBool(value);
}
- uint32_t subReadString(std::string& str) {
- uint32_t xfer;
- int32_t size;
- xfer = subReadI32(size);
- return xfer + readStringBody(str, size);
- }
-
-
private:
inline void checkTType(const TType ttype);
inline void stateTransition();
+ // Read and write variable-length integers.
+ // Uses the same technique as the MIDI file format.
+ inline uint32_t vliRead(uint64_t& vli);
+ inline uint32_t vliWrite(uint64_t vli);
+
TypeSpec* type_spec_;
std::vector<TypeSpec*> ts_stack_; // TypeSpec stack.