Thrift: TDenseProtocol using variable-length integers.

Reviewed By: mcslee

Test Plan:
test/DenseProtoTest.cpp

Still have to test:
- Bounds checking.
- borrow/consume on TBuffered and TFramed.

Revert Plan: ok


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@665252 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/protocol/TDenseProtocol.cpp b/lib/cpp/src/protocol/TDenseProtocol.cpp
index 095dd64..99117d6 100644
--- a/lib/cpp/src/protocol/TDenseProtocol.cpp
+++ b/lib/cpp/src/protocol/TDenseProtocol.cpp
@@ -4,12 +4,17 @@
 // See accompanying file LICENSE or visit the Thrift site at:
 // http://developers.facebook.com/thrift/
 
+#define __STDC_LIMIT_MACROS
+#include <stdint.h>
 #include "TDenseProtocol.h"
 #include "TReflectionLocal.h"
 
 // XXX for debugging (duh)
 #define DEBUG_TDENSEPROTOCOL
 
+// XXX for development.
+#define TDENSE_PROTOCOL_MEASURE_VLI
+
 // The XXX above does not apply to this.
 #ifdef DEBUG_TDENSEPROTOCOL
 #undef NDEBUG
@@ -18,6 +23,12 @@
 
 using std::string;
 
+#ifdef __GNUC__
+#define UNLIKELY(val) (__builtin_expect((val), 0))
+#else
+#define UNLIKELY(val) (val)
+#endif
+
 namespace facebook { namespace thrift { namespace protocol {
 
 // Top TypeSpec.  TypeSpec of the structure being encoded.
@@ -75,6 +86,8 @@
 uint32_t TDenseProtocol::writeMessageBegin(const std::string& name,
                                            const TMessageType messageType,
                                            const int32_t seqid) {
+  throw TApplicationException("TDenseProtocol doesn't work with messages (yet).");
+
   int32_t version = (VERSION_2) | ((int32_t)messageType);
   uint32_t wsize = 0;
   wsize += subWriteI32(version);
@@ -207,44 +220,49 @@
   return TBinaryProtocol::writeByte(byte);
 }
 
-
-
-// XXX Remove this code for collecting statistics.
-static int vli_size(uint64_t towrite) {
-  int count = 0;
-  while (true) {
-    towrite = towrite >> 7;
-    if (towrite == 0) {
-      return count+1;
-    }
-  }
-}
-
 uint32_t TDenseProtocol::writeI16(const int16_t i16) {
-  vli_save_16 += 2 - vli_size(i16);
-  if (i16 < 0) negs++;
-
   checkTType(T_I16);
   stateTransition();
-  return TBinaryProtocol::writeI16(i16);
+
+  uint32_t rv = vliWrite(i16);
+#ifdef TDENSE_PROTOCOL_MEASURE_VLI
+  vli_save_16 += 2 - rv;
+  if (i16 < 0) {
+    negs++;
+  }
+#endif
+
+  return rv;
 }
 
 uint32_t TDenseProtocol::writeI32(const int32_t i32) {
-  vli_save_32 += 4 - vli_size(i32);
-  if (i32 < 0) negs++;
-
   checkTType(T_I32);
   stateTransition();
-  return TBinaryProtocol::writeI32(i32);
+
+  uint32_t rv = vliWrite(i32);
+#ifdef TDENSE_PROTOCOL_MEASURE_VLI
+  vli_save_32 += 4 - rv;
+  if (i32 < 0) {
+    negs++;
+  }
+#endif
+
+  return rv;
 }
 
 uint32_t TDenseProtocol::writeI64(const int64_t i64) {
-  vli_save_64 += 8 - vli_size(i64);
-  if (i64 < 0) negs++;
-
   checkTType(T_I64);
   stateTransition();
-  return TBinaryProtocol::writeI64(i64);
+
+  uint32_t rv = vliWrite(i64);
+#ifdef TDENSE_PROTOCOL_MEASURE_VLI
+  vli_save_64 += 8 - rv;
+  if (i64 < 0) {
+    negs++;
+  }
+#endif
+
+  return rv;
 }
 
 uint32_t TDenseProtocol::writeDouble(const double dub) {
@@ -259,17 +277,17 @@
   return subWriteString(str);
 }
 
-// XXX this can go into .h when we delete instrumentaion.  (See subWritebool)
 inline uint32_t TDenseProtocol::subWriteI32(const int32_t i32) {
-  vli_save_sub += 4 - vli_size(i32);
-  if (i32 < 0) negs++;
-
-  int32_t net = (int32_t)htonl(i32);
-  trans_->write((uint8_t*)&net, 4);
-  return 4;
+  uint32_t rv = vliWrite(i32);
+#ifdef TDENSE_PROTOCOL_MEASURE_VLI
+  vli_save_sub += 4 - rv;
+  if (i32 < 0) {
+    negs++;
+  }
+#endif
+  return rv;
 }
 
-// XXX Delete when subWriteI32 goes into .h
 uint32_t TDenseProtocol::subWriteString(const std::string& str) {
   uint32_t size = str.size();
   uint32_t xfer = subWriteI32((int32_t)size);
@@ -279,6 +297,70 @@
   return xfer + size;
 }
 
+inline uint32_t TDenseProtocol::vliRead(uint64_t& vli) {
+  uint32_t used = 0;
+  uint64_t val = 0;
+  uint8_t buf[10];  // 64 bits / (7 bits/byte) = 10 bytes.
+  bool borrowed = trans_->borrow(buf, sizeof(buf));
+
+  // Fast path.  TODO(dreiss): Make it faster.
+  if (borrowed) {
+    while (true) {
+      uint8_t byte = buf[used];
+      used++;
+      val = (val << 7) | (byte & 0x7f);
+      if (!(byte & 0x80)) {
+        vli = val;
+        trans_->consume(used);
+        return used;
+      }
+      // Have to check for invalid data so we don't crash.
+      if (UNLIKELY(used == sizeof(buf))) {
+        throw TProtocolException(TProtocolException::INVALID_DATA, "Variable-length int over 10 bytes.");
+      }
+    }
+  }
+
+  // Slow path.
+  else {
+    while (true) {
+      uint8_t byte;
+      used += trans_->readAll(&byte, 1);
+      val = (val << 7) | (byte & 0x7f);
+      if (!(byte & 0x80)) {
+        vli = val;
+        return used;
+      }
+      // Might as well check for invalid data on the slow path too.
+      if (UNLIKELY(used >= sizeof(buf))) {
+        throw TProtocolException(TProtocolException::INVALID_DATA, "Variable-length int over 10 bytes.");
+      }
+    }
+  }
+}
+
+inline uint32_t TDenseProtocol::vliWrite(uint64_t vli) {
+  uint8_t buf[10];  // 64 bits / (7 bits/byte) = 10 bytes.
+  int32_t pos = sizeof(buf) - 1;
+
+  // Write the thing from back to front.
+  buf[pos] = vli & 0x7f;
+  vli >>= 7;
+  pos--;
+
+  while (vli > 0) {
+    assert(pos >= 0);
+    buf[pos] = (vli | 0x80);
+    vli >>= 7;
+    pos--;
+  }
+
+  // Back up one step before writing.
+  pos++;
+
+  trans_->write(buf+pos, sizeof(buf) - pos);
+  return sizeof(buf) - pos;
+}
 
 /**
  * Reading functions
@@ -287,6 +369,8 @@
 uint32_t TDenseProtocol::readMessageBegin(std::string& name,
                                           TMessageType& messageType,
                                           int32_t& seqid) {
+  throw TApplicationException("TDenseProtocol doesn't work with messages (yet).");
+
   uint32_t xfer = 0;
   int32_t sz;
   xfer += subReadI32(sz);
@@ -450,19 +534,43 @@
 uint32_t TDenseProtocol::readI16(int16_t& i16) {
   checkTType(T_I16);
   stateTransition();
-  return TBinaryProtocol::readI16(i16);
+  uint64_t u64;
+  uint32_t rv = vliRead(u64);
+  int64_t val = (int64_t)u64;
+  if (UNLIKELY(val > INT16_MAX || val < INT16_MIN)) {
+    throw TProtocolException(TProtocolException::INVALID_DATA,
+                             "i16 out of range.");
+  }
+  i16 = (int16_t)val;
+  return rv;
 }
 
 uint32_t TDenseProtocol::readI32(int32_t& i32) {
   checkTType(T_I32);
   stateTransition();
-  return TBinaryProtocol::readI32(i32);
+  uint64_t u64;
+  uint32_t rv = vliRead(u64);
+  int64_t val = (int64_t)u64;
+  if (UNLIKELY(val > INT32_MAX || val < INT32_MIN)) {
+    throw TProtocolException(TProtocolException::INVALID_DATA,
+                             "i32 out of range.");
+  }
+  i32 = (int32_t)val;
+  return rv;
 }
 
 uint32_t TDenseProtocol::readI64(int64_t& i64) {
   checkTType(T_I64);
   stateTransition();
-  return TBinaryProtocol::readI64(i64);
+  uint64_t u64;
+  uint32_t rv = vliRead(u64);
+  int64_t val = (int64_t)u64;
+  if (UNLIKELY(val > INT64_MAX || val < INT64_MIN)) {
+    throw TProtocolException(TProtocolException::INVALID_DATA,
+                             "i64 out of range.");
+  }
+  i64 = (int64_t)val;
+  return rv;
 }
 
 uint32_t TDenseProtocol::readDouble(double& dub) {
@@ -477,4 +585,23 @@
   return subReadString(str);
 }
 
+uint32_t TDenseProtocol::subReadI32(int32_t& i32) {
+  uint64_t u64;
+  uint32_t rv = vliRead(u64);
+  int64_t val = (int64_t)u64;
+  if (UNLIKELY(val > INT32_MAX || val < INT32_MIN)) {
+    throw TProtocolException(TProtocolException::INVALID_DATA,
+                             "i32 out of range.");
+  }
+  i32 = (int32_t)val;
+  return rv;
+}
+
+uint32_t TDenseProtocol::subReadString(std::string& str) {
+  uint32_t xfer;
+  int32_t size;
+  xfer = subReadI32(size);
+  return xfer + readStringBody(str, size);
+}
+
 }}} // facebook::thrift::protocol
diff --git a/lib/cpp/src/protocol/TDenseProtocol.h b/lib/cpp/src/protocol/TDenseProtocol.h
index ffc6afa..edd5c88 100644
--- a/lib/cpp/src/protocol/TDenseProtocol.h
+++ b/lib/cpp/src/protocol/TDenseProtocol.h
@@ -30,7 +30,7 @@
 class TDenseProtocol : public TBinaryProtocol {
  protected:
   static const int32_t VERSION_MASK = 0xffff0000;
-  // VERSION_2 (0x80010000)  is taken by TBinaryProtocol.
+  // VERSION_1 (0x80010000)  is taken by TBinaryProtocol.
   static const int32_t VERSION_2 = 0x80020000;
 
  public:
@@ -131,24 +131,12 @@
    */
   inline uint32_t subWriteI32(const int32_t i32);
 
+  inline uint32_t subWriteString(const std::string& str);
+
   uint32_t subWriteBool(const bool value) {
     return TBinaryProtocol::writeBool(value);
   }
 
-#if 0
-  // Use this version when subWriteI32 is moved in here.
-  uint32_t subWriteString(const std::string& str) {
-    uint32_t size = str.size();
-    uint32_t xfer = subWriteI32((int32_t)size);
-    if (size > 0) {
-      trans_->write((uint8_t*)str.data(), size);
-    }
-    return xfer + size;
-  }
-#else
-  inline uint32_t subWriteString(const std::string& str);
-#endif
-
 
   /*
    * Reading functions
@@ -204,28 +192,25 @@
   /*
    * Helper reading functions (don't do state transitions).
    */
-  uint32_t subReadI32(int32_t& i32) {
-    return TBinaryProtocol::readI32(i32);
-  }
+  inline uint32_t subReadI32(int32_t& i32);
+
+  inline uint32_t subReadString(std::string& str);
 
   uint32_t subReadBool(bool& value) {
     return TBinaryProtocol::readBool(value);
   }
 
-  uint32_t subReadString(std::string& str) {
-    uint32_t xfer;
-    int32_t size;
-    xfer = subReadI32(size);
-    return xfer + readStringBody(str, size);
-  }
-
-
 
  private:
 
   inline void checkTType(const TType ttype);
   inline void stateTransition();
 
+  // Read and write variable-length integers.
+  // Uses the same technique as the MIDI file format.
+  inline uint32_t vliRead(uint64_t& vli);
+  inline uint32_t vliWrite(uint64_t vli);
+
   TypeSpec* type_spec_;
 
   std::vector<TypeSpec*> ts_stack_;   // TypeSpec stack.