Thrift: Re-committing zlib.
Summary:
Same as the last (reverted) zlib patch,
but this time with way more awesome support for building
with no zlib headers installed.
Reviewed By: mcslee
Test Plan:
- Did lots of really pathological stuff in my VMware.
- On devrs004:
./bootstrap.sh && ./configure && make && make install DESTDIR=/tmp/tzinst && echo "Yay"
Revert Plan: ok
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@665269 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/Makefile.am b/lib/cpp/Makefile.am
index b80bcc3..7b1f1b0 100644
--- a/lib/cpp/Makefile.am
+++ b/lib/cpp/Makefile.am
@@ -4,11 +4,14 @@
if AMX_HAVE_LIBEVENT
lib_LTLIBRARIES += libthriftnb.la
endif
+if AMX_HAVE_ZLIB
+lib_LTLIBRARIES += libthriftz.la
+endif
common_cxxflags = -Wall -Isrc $(BOOST_CPPFLAGS)
common_ldflags = -Wall $(BOOST_LDFLAGS)
-# Define the source file for the module
+# Define the source files for the module
libthrift_la_SOURCES = src/Thrift.cpp \
src/reflection_limited_types.cpp \
@@ -32,11 +35,21 @@
src/server/TThreadedServer.cpp \
src/processor/PeekProcessor.cpp
-
libthriftnb_la_SOURCES = src/server/TNonblockingServer.cpp
+libthriftz_la_SOURCES = src/transport/TZlibTransport.cpp
+
+
+# Flags for the various libraries
+
libthrift_la_CXXFLAGS = $(common_cxxflags)
-libthriftnb_la_CXXFLAGS = $(common_cxxflags) $(LIBEVENT_CPPFLAGS)
+
+libthriftnb_la_CXXFLAGS = $(common_cxxflags)
+libthriftnb_la_CPPFLAGS = $(LIBEVENT_CPPFLAGS)
+
+libthriftz_la_CXXFLAGS = $(common_cxxflags)
+libthriftz_la_CPPFLAGS = $(ZLIB_CPPFLAGS)
+
include_thriftdir = $(includedir)/thrift
include_thrift_HEADERS = \
@@ -76,7 +89,8 @@
src/transport/TSocketPool.h \
src/transport/TTransport.h \
src/transport/TTransportException.h \
- src/transport/TTransportUtils.h
+ src/transport/TTransportUtils.h \
+ src/transport/TZlibTransport.h
include_serverdir = $(include_thriftdir)/server
include_server_HEADERS = \
diff --git a/lib/cpp/aclocal/ax_lib_event.m4 b/lib/cpp/aclocal/ax_lib_event.m4
index 14f115e..9266df4 100644
--- a/lib/cpp/aclocal/ax_lib_event.m4
+++ b/lib/cpp/aclocal/ax_lib_event.m4
@@ -1,6 +1,6 @@
dnl @synopsis AX_LIB_EVENT([MINIMUM-VERSION])
dnl
-dnl Test for the libz library of a particular version (or newer).
+dnl Test for the libevent library of a particular version (or newer).
dnl
dnl If no path to the installed libevent is given, the macro will first try
dnl using no -I or -L flags, then searches under /usr, /usr/local, /opt,
diff --git a/lib/cpp/aclocal/ax_lib_zlib.m4 b/lib/cpp/aclocal/ax_lib_zlib.m4
new file mode 100644
index 0000000..40e96df
--- /dev/null
+++ b/lib/cpp/aclocal/ax_lib_zlib.m4
@@ -0,0 +1,169 @@
+dnl @synopsis AX_LIB_ZLIB([MINIMUM-VERSION])
+dnl
+dnl Test for the libz library of a particular version (or newer).
+dnl
+dnl If no path to the installed zlib is given, the macro will first try
+dnl using no -I or -L flags, then searches under /usr, /usr/local, /opt,
+dnl and /opt/zlib.
+dnl If these all fail, it will try the $ZLIB_ROOT environment variable.
+dnl
+dnl This macro calls:
+dnl AC_SUBST(ZLIB_CPPFLAGS)
+dnl AC_SUBST(ZLIB_LDFLAGS)
+dnl AC_SUBST(ZLIB_LIBS)
+dnl
+dnl And (if zlib is found):
+dnl AC_DEFINE(HAVE_ZLIB)
+dnl
+dnl It also leaves the shell variables "success" and "ax_have_zlib"
+dnl set to "yes" or "no".
+dnl
+dnl NOTE: This macro does not currently work for cross-compiling,
+dnl but it can be easily modified to allow it. (grep "cross").
+dnl
+dnl @category InstalledPackages
+dnl @category C
+dnl @author David Reiss <dreiss@facebook.com>
+dnl @version 2007-09-12
+dnl @license AllPermissive
+
+dnl Input: ax_zlib_path (prefix to probe; empty means no extra -I/-L flags), WANT_ZLIB_VERSION (digits placed after 0x and compared with ZLIB_VERNUM)
+dnl Output: success=yes/no; ZLIB_CPPFLAGS, ZLIB_LDFLAGS and ZLIB_LIBS are left set for the probed prefix
+AC_DEFUN([AX_LIB_ZLIB_DO_CHECK],
+ [
+ # Save the caller flags so they can be put back after the probe.
+ CPPFLAGS_SAVED="$CPPFLAGS"
+ LDFLAGS_SAVED="$LDFLAGS"
+ LIBS_SAVED="$LIBS"
+ LD_LIBRARY_PATH_SAVED="$LD_LIBRARY_PATH"
+
+ # Point the ZLIB_* flags at the requested prefix, or clear them when none is given.
+ if test -n "$ax_zlib_path" ; then
+ ZLIB_CPPFLAGS="-I$ax_zlib_path/include"
+ ZLIB_LDFLAGS="-L$ax_zlib_path/lib"
+ LD_LIBRARY_PATH="$ax_zlib_path/lib:$LD_LIBRARY_PATH"
+ else
+ ZLIB_CPPFLAGS=""
+ ZLIB_LDFLAGS=""
+ fi
+
+ # zlib is always linked with -lz.
+ ZLIB_LIBS="-lz"
+
+ # Add the candidate flags to the variables used by the compile/link/run test below.
+ CPPFLAGS="$CPPFLAGS $ZLIB_CPPFLAGS"
+ LDFLAGS="$LDFLAGS $ZLIB_LDFLAGS"
+ LIBS="$LIBS $ZLIB_LIBS"
+ export CPPFLAGS
+ export LDFLAGS
+ export LIBS
+ export LD_LIBRARY_PATH
+
+ success=no
+
+ # Compile, link, and run the program. This checks:
+ # - zlib.h is available for including.
+ # - zlibVersion() is available for linking.
+ # - ZLIB_VERNUM is greater than or equal to the desired version.
+ # - ZLIB_VERSION (defined in zlib.h) matches zlibVersion()
+ # (defined in the library).
+ AC_LANG_PUSH([C])
+ dnl This can be changed to AC_LINK_IFELSE if you are cross-compiling.
+ AC_RUN_IFELSE([AC_LANG_PROGRAM([[
+ #include <zlib.h>
+ #if ZLIB_VERNUM >= 0x$WANT_ZLIB_VERSION
+ #else
+ # error zlib is too old
+ #endif
+ ]], [[
+ const char* lib_version = zlibVersion();
+ const char* hdr_version = ZLIB_VERSION;
+ for (;;) {
+ if (*lib_version != *hdr_version) {
+ /* If this happens, your zlib header doesn't match your zlib */
+ /* library. That is really bad. */
+ return 1;
+ }
+ if (*lib_version == '\0') {
+ break;
+ }
+ lib_version++;
+ hdr_version++;
+ }
+ return 0;
+ ]])], [
+ success=yes
+ ])
+ AC_LANG_POP([C])
+
+ # Put back the flags saved at the top; only success and the ZLIB_* variables carry the result.
+ CPPFLAGS="$CPPFLAGS_SAVED"
+ LDFLAGS="$LDFLAGS_SAVED"
+ LIBS="$LIBS_SAVED"
+ LD_LIBRARY_PATH="$LD_LIBRARY_PATH_SAVED"
+ ])
+
+
+AC_DEFUN([AX_LIB_ZLIB],
+         [
+          dnl Detect zlib, honoring --with-zlib, --without-zlib and --with-zlib=DIR.
+          dnl Allow search path to be overridden on the command line.
+          AC_ARG_WITH([zlib],
+                      AS_HELP_STRING([--with-zlib@<:@=DIR@:>@], [use zlib (default is yes) - it is possible to specify an alternate root directory for zlib]),
+                      [
+                       if test "x$withval" = "xno"; then
+                         want_zlib="no"
+                       elif test "x$withval" = "xyes"; then
+                         want_zlib="yes"
+                         ax_zlib_path=""
+                       else
+                         want_zlib="yes"
+                         ax_zlib_path="$withval"
+                       fi
+                      ],
+                      [want_zlib="yes" ; ax_zlib_path="" ])
+          # Start from "not found" so a disabled zlib never reports a stale result from an earlier check.
+          success="no" ; ax_have_zlib="no"
+          if test "$want_zlib" = "yes"; then
+            # Turn MAJOR.MINOR.PATCH into the digits of ZLIB_VERNUM, e.g. 1.2.3 gives 1230 for 0x1230.
+            zlib_version_req=ifelse([$1], ,1.2.3,$1)
+            zlib_version_req_major=`expr $zlib_version_req : '\([[0-9]]*\)'`
+            zlib_version_req_minor=`expr $zlib_version_req : '[[0-9]]*\.\([[0-9]]*\)'`
+            zlib_version_req_patch=`expr $zlib_version_req : '[[0-9]]*\.[[0-9]]*\.\([[0-9]]*\)'`
+            if test -z "$zlib_version_req_patch" ; then
+              zlib_version_req_patch="0"
+            fi
+            WANT_ZLIB_VERSION=`expr $zlib_version_req_major \* 1000 \+ $zlib_version_req_minor \* 100 \+ $zlib_version_req_patch \* 10`
+
+            AC_MSG_CHECKING(for zlib >= $zlib_version_req)
+
+            # Probe the explicit directory if one was given, otherwise walk the default prefixes.
+            if test -n "$ax_zlib_path"; then
+              AX_LIB_ZLIB_DO_CHECK
+            else
+              for ax_zlib_path in "" /usr /usr/local /opt /opt/zlib "$ZLIB_ROOT" ; do
+                AX_LIB_ZLIB_DO_CHECK
+                if test "$success" = "yes"; then
+                  break;
+                fi
+              done
+            fi
+
+            if test "$success" != "yes" ; then
+              AC_MSG_RESULT(no)
+              ZLIB_CPPFLAGS=""
+              ZLIB_LDFLAGS=""
+              ZLIB_LIBS=""
+            else
+              AC_MSG_RESULT(yes)
+              AC_DEFINE(HAVE_ZLIB,,[define if zlib is available])
+            fi
+
+            ax_have_zlib="$success"
+
+            AC_SUBST(ZLIB_CPPFLAGS)
+            AC_SUBST(ZLIB_LDFLAGS)
+            AC_SUBST(ZLIB_LIBS)
+          fi
+
+         ])
diff --git a/lib/cpp/configure.ac b/lib/cpp/configure.ac
index 5e33617..d9a66da 100644
--- a/lib/cpp/configure.ac
+++ b/lib/cpp/configure.ac
@@ -71,6 +71,9 @@
AX_LIB_EVENT([1.0])
AM_CONDITIONAL([AMX_HAVE_LIBEVENT], [test "$success" = "yes"])
+AX_LIB_ZLIB([1.2.3])
+AM_CONDITIONAL([AMX_HAVE_ZLIB], [test "$success" = "yes"])
+
AC_CHECK_LIB(pthread, pthread_create)
AC_CHECK_LIB(rt, sched_get_priority_min)
diff --git a/lib/cpp/src/Thrift.h b/lib/cpp/src/Thrift.h
index 0b13bae..ad8d46c 100644
--- a/lib/cpp/src/Thrift.h
+++ b/lib/cpp/src/Thrift.h
@@ -26,8 +26,8 @@
namespace facebook { namespace thrift {
-class TOutput{
-public:
+class TOutput {
+ public:
TOutput() : f_(&perrorTimeWrapper) {}
inline void setOutputFunction(void (*function)(const char *)){
@@ -47,7 +47,7 @@
fprintf(stderr, "%s ", dbgtime);
perror(msg);
}
-private:
+ private:
void (*f_)(const char *);
};
@@ -58,7 +58,7 @@
}
class TException : public std::exception {
-public:
+ public:
TException() {}
TException(const std::string& message) :
@@ -74,13 +74,13 @@
}
}
-protected:
+ protected:
std::string message_;
};
class TApplicationException : public TException {
-public:
+ public:
/**
* Error codes for the various types of exceptions.
@@ -134,7 +134,7 @@
uint32_t read(protocol::TProtocol* iprot);
uint32_t write(protocol::TProtocol* oprot) const;
-protected:
+ protected:
/**
* Error code
*/
diff --git a/lib/cpp/src/transport/TTransportException.h b/lib/cpp/src/transport/TTransportException.h
index bb8b8a9..9b75826 100644
--- a/lib/cpp/src/transport/TTransportException.h
+++ b/lib/cpp/src/transport/TTransportException.h
@@ -34,7 +34,9 @@
TIMED_OUT = 3,
END_OF_FILE = 4,
INTERRUPTED = 5,
- BAD_ARGS = 6
+ BAD_ARGS = 6,
+ CORRUPTED_DATA = 7,
+ INTERNAL_ERROR = 8,
};
TTransportException() :
diff --git a/lib/cpp/src/transport/TZlibTransport.cpp b/lib/cpp/src/transport/TZlibTransport.cpp
new file mode 100644
index 0000000..59e3a36
--- /dev/null
+++ b/lib/cpp/src/transport/TZlibTransport.cpp
@@ -0,0 +1,285 @@
+// Copyright (c) 2006- Facebook
+// Distributed under the Thrift Software License
+//
+// See accompanying file LICENSE or visit the Thrift site at:
+// http://developers.facebook.com/thrift/
+
+#include <cassert>
+#include <algorithm>
+#include <transport/TZlibTransport.h>
+#include <zlib.h>
+
+using std::string;
+
+namespace facebook { namespace thrift { namespace transport {
+
+// Allocates and initializes both zlib streams. Don't call this outside of the constructor.
+void TZlibTransport::initZlib() {
+  int rv;
+  bool r_init = false;
+  try {
+    // Value-initialize (zero-fill) so ->msg is NULL even if zlib init bails out early.
+    rstream_ = new z_stream();
+    wstream_ = new z_stream();
+
+    rstream_->zalloc = Z_NULL;
+    wstream_->zalloc = Z_NULL;
+    rstream_->zfree  = Z_NULL;
+    wstream_->zfree  = Z_NULL;
+    rstream_->opaque = Z_NULL;
+    wstream_->opaque = Z_NULL;
+
+    rstream_->next_in   = crbuf_;
+    wstream_->next_in   = uwbuf_;
+    rstream_->next_out  = urbuf_;
+    wstream_->next_out  = cwbuf_;
+    rstream_->avail_in  = 0;
+    wstream_->avail_in  = 0;
+    rstream_->avail_out = urbuf_size_;
+    wstream_->avail_out = cwbuf_size_;
+
+    rv = inflateInit(rstream_);
+    checkZlibRv(rv, rstream_->msg);
+
+    // Have to set this flag so we know whether to de-initialize.
+    r_init = true;
+
+    rv = deflateInit(wstream_, Z_DEFAULT_COMPRESSION);
+    checkZlibRv(rv, wstream_->msg);
+  } catch (...) {
+    if (r_init) {
+      rv = inflateEnd(rstream_);
+      checkZlibRvNothrow(rv, rstream_->msg);
+    }
+    // wstream_ is never initialized when we get here, so only its memory is freed.
+    delete rstream_;
+    delete wstream_;
+    throw;
+  }
+}
+
+inline void TZlibTransport::checkZlibRv(int status, const char* message) {
+  // Any status other than Z_OK is turned into a TZlibTransportException.
+  if (status == Z_OK) return;
+  throw TZlibTransportException(status, message);
+}
+
+inline void TZlibTransport::checkZlibRvNothrow(int status, const char* message) {
+  // Non-throwing variant: a failure is reported through GlobalOutput instead.
+  if (status == Z_OK) return;
+  string output = "TZlibTransport: zlib failure in destructor: ";
+  output += TZlibTransportException::errorMessage(status, message);
+  GlobalOutput(output.c_str());
+}
+
+TZlibTransport::~TZlibTransport() {
+  // Shut down both zlib streams first; failures are logged rather than thrown.
+  int inflate_rv = inflateEnd(rstream_);
+  checkZlibRvNothrow(inflate_rv, rstream_->msg);
+  int deflate_rv = deflateEnd(wstream_);
+  checkZlibRvNothrow(deflate_rv, wstream_->msg);
+
+  delete rstream_;
+  delete wstream_;
+  delete[] cwbuf_;
+  delete[] uwbuf_;
+  delete[] crbuf_;
+  delete[] urbuf_;
+}
+
+bool TZlibTransport::isOpen() {
+  return readAvail() > 0 ? true : transport_->isOpen();  // buffered data counts as open
+}
+
+// READING STRATEGY
+//
+// We have two buffers for reading: one containing the compressed data (crbuf_)
+// and one containing the uncompressed data (urbuf_). When read is called,
+// we repeat the following steps until we have satisfied the request:
+// - Copy data from urbuf_ into the caller's buffer.
+// - If we had enough, return.
+// - If crbuf_ has no unread compressed data, refill it from the underlying transport.
+// - Inflate data from crbuf_ into urbuf_.
+//
+// In standalone objects, we set input_ended_ to true when inflate returns
+// Z_STREAM_END. This allows us to make sure that a checksum was verified.
+
+inline int TZlibTransport::readAvail() {
+ return urbuf_size_ - rstream_->avail_out - urpos_;  // bytes inflated into urbuf_ minus bytes already handed out
+}
+
+uint32_t TZlibTransport::read(uint8_t* buf, uint32_t len) {  // Copies up to len uncompressed bytes into buf; returns the number delivered.
+ int need = len;  // bytes still owed to the caller
+
+ // TODO(dreiss): Skip urbuf on big reads.
+
+ while (true) {
+ // Hand over as much buffered uncompressed data as is both available and
+ // wanted, then advance the caller's pointer and our read position.
+ int give = std::min(readAvail(), need);
+ memcpy(buf, urbuf_ + urpos_, give);
+ need -= give;
+ buf += give;
+ urpos_ += give;
+
+ // If they were satisfied, we are done.
+ if (need == 0) {
+ return len;
+ }
+
+ // If we get to this point, we need to get some more data.
+
+ // If zlib has reported the end of a stream, we can't really do any more.
+ if (input_ended_) {
+ return len - need;  // short read: the compressed stream has already ended
+ }
+
+ // The uncompressed read buffer is empty, so reset the stream fields.
+ rstream_->next_out = urbuf_;
+ rstream_->avail_out = urbuf_size_;
+ urpos_ = 0;
+
+ // If we don't have any more compressed data available,
+ // read some from the underlying transport.
+ if (rstream_->avail_in == 0) {
+ uint32_t got = transport_->read(crbuf_, crbuf_size_);
+ if (got == 0) {
+ return len - need;  // short read: the underlying transport returned nothing
+ }
+ rstream_->next_in = crbuf_;
+ rstream_->avail_in = got;
+ }
+
+ // We have some compressed data now. Uncompress it.
+ int zlib_rv = inflate(rstream_, Z_SYNC_FLUSH);
+
+ if (zlib_rv == Z_STREAM_END) {
+ if (standalone_) {
+ input_ended_ = true;  // remembered so later calls stop asking for more input
+ }
+ } else {
+ checkZlibRv(zlib_rv, rstream_->msg);  // throws on any status other than Z_OK
+ }
+
+ // Okay. The read buffer should have whatever we can give it now.
+ // Loop back to the start and try to give some more.
+ }
+}
+
+
+// WRITING STRATEGY
+//
+// We buffer up small writes before sending them to zlib, so our logic is:
+// - Is the write big?
+// - Send the buffer to zlib.
+// - Send this data to zlib.
+// - Is the write small?
+// - Is there insufficient space in the buffer for it?
+// - Send the buffer to zlib.
+// - Copy the data to the buffer.
+//
+// We have two buffers for writing also: the uncompressed buffer (mentioned
+// above) and the compressed buffer. When sending data to zlib we loop over
+// the following until the source (uncompressed buffer or big write) is empty:
+// - Is there no more space in the compressed buffer?
+// - Write the compressed buffer to the underlying transport.
+// - Deflate from the source into the compressed buffer.
+
+void TZlibTransport::write(const uint8_t* buf, uint32_t len) {
+  // Small writes are batched in uwbuf_ because each deflate() call carries
+  // enough overhead that feeding it tiny pieces is likely slower.
+  const bool big = (int)len > MIN_DIRECT_DEFLATE_SIZE;
+  if (big || (len > 0 && uwbuf_size_ - uwpos_ < (int)len)) {
+    // Either way the pending buffered bytes must reach zlib first.
+    flushToZlib(uwbuf_, uwpos_);
+    uwpos_ = 0;
+  }
+  if (big) {
+    flushToZlib(buf, len);
+  } else if (len > 0) {
+    memcpy(uwbuf_ + uwpos_, buf, len);
+    uwpos_ += len;
+  }
+}
+
+void TZlibTransport::flush() {  // Finishes the deflate stream and pushes everything to the underlying transport.
+ flushToZlib(uwbuf_, uwpos_, true);  // finish=true makes flushToZlib call deflate with Z_FINISH
+ assert((int)wstream_->avail_out != cwbuf_size_);  // expects at least some compressed output to be pending
+ transport_->write(cwbuf_, cwbuf_size_ - wstream_->avail_out);  // send only the filled part of cwbuf_
+ transport_->flush();  // NOTE(review): uwpos_ and the cwbuf_ cursor are not reset here -- confirm flush() is meant to be called only once
+}
+
+void TZlibTransport::flushToZlib(const uint8_t* buf, int len, bool finish) {  // Deflates len bytes from buf into cwbuf_, spilling to the transport when cwbuf_ fills.
+ int flush = (finish ? Z_FINISH : Z_NO_FLUSH);
+
+ wstream_->next_in = const_cast<uint8_t*>(buf);  // zlib's next_in is non-const, hence the cast
+ wstream_->avail_in = len;
+
+ while (wstream_->avail_in > 0 || finish) {  // when finishing, keep going until deflate reports Z_STREAM_END
+ // If our output buffer is full, flush to the underlying transport.
+ if (wstream_->avail_out == 0) {
+ transport_->write(cwbuf_, cwbuf_size_);
+ wstream_->next_out = cwbuf_;
+ wstream_->avail_out = cwbuf_size_;
+ }
+
+ int zlib_rv = deflate(wstream_, flush);
+
+ if (finish && zlib_rv == Z_STREAM_END) {
+ assert(wstream_->avail_in == 0);
+ break;
+ }
+
+ checkZlibRv(zlib_rv, wstream_->msg);  // throws on any status other than Z_OK
+ }
+}
+
+bool TZlibTransport::borrow(uint8_t* buf, uint32_t len) {
+  // No buffer shuffling here: hand the bytes over only when the
+  // uncompressed read buffer already holds enough of them; otherwise
+  // report failure so the protocol falls back to its slow path.
+  if (readAvail() < (int)len) {
+    return false;
+  }
+  memcpy(buf, urbuf_ + urpos_, len);
+  return true;
+}
+
+void TZlibTransport::consume(uint32_t len) {
+  // Only bytes that are currently buffered may be skipped.
+  if (readAvail() < (int)len) {
+    throw TTransportException(TTransportException::BAD_ARGS,
+                              "consume did not follow a borrow.");
+  }
+  urpos_ += len;
+}
+
+void TZlibTransport::verifyChecksum() {
+  if (!standalone_) {
+    throw TTransportException(
+      TTransportException::BAD_ARGS,
+      "TZLibTransport can only verify checksums for standalone objects.");
+  }
+
+  // Once inflate has reported the end of the stream there is nothing left
+  // to check: a bad checksum would already have surfaced as an inflate error.
+  if (input_ended_) {
+    return;
+  }
+
+  // The caller should be done reading by now, but zlib may not have been
+  // fed the trailing checksum bytes yet.  Asking for one more byte pushes
+  // zlib to the end of the stream.  Getting a byte back (which we could not
+  // easily "unread") or still not seeing the end is unrecoverable anyway,
+  // so both cases are reported as corruption.
+  uint8_t extra[1];
+  uint32_t got = this->read(extra, sizeof(extra));
+  if (got || !input_ended_) {
+    throw TTransportException(
+      TTransportException::CORRUPTED_DATA,
+      "Zlib stream not complete.");
+  }
+}
+
+
+}}} // facebook::thrift::transport
diff --git a/lib/cpp/src/transport/TZlibTransport.h b/lib/cpp/src/transport/TZlibTransport.h
new file mode 100644
index 0000000..cc2522d
--- /dev/null
+++ b/lib/cpp/src/transport/TZlibTransport.h
@@ -0,0 +1,207 @@
+// Copyright (c) 2006- Facebook
+// Distributed under the Thrift Software License
+//
+// See accompanying file LICENSE or visit the Thrift site at:
+// http://developers.facebook.com/thrift/
+
+#ifndef _THRIFT_TRANSPORT_TZLIBTRANSPORT_H_
+#define _THRIFT_TRANSPORT_TZLIBTRANSPORT_H_ 1
+
+#include <boost/lexical_cast.hpp>
+#include <transport/TTransport.h>
+
+struct z_stream_s;
+
+namespace facebook { namespace thrift { namespace transport {
+
+// Transport exception that carries the zlib status code and message it was built from.
+class TZlibTransportException : public TTransportException {
+ public:
+  TZlibTransportException(int status, const char* msg) :
+    TTransportException(TTransportException::INTERNAL_ERROR,
+                        errorMessage(status, msg)),
+    zlib_status_(status),
+    zlib_msg_(msg == NULL ? "(null)" : msg) {}
+  virtual ~TZlibTransportException() throw() {}
+  // Accessors are const so they work on exceptions caught by const reference.
+  int getZlibStatus() const { return zlib_status_; }
+  std::string getZlibMessage() const { return zlib_msg_; }
+  // Builds "zlib error: <msg> (status = <n>)"; a NULL msg becomes "(no message)".
+  static std::string errorMessage(int status, const char* msg) {
+    std::string rv = "zlib error: ";
+    if (msg) {
+      rv += msg;
+    } else {
+      rv += "(no message)";
+    }
+    rv += " (status = ";
+    rv += boost::lexical_cast<std::string>(status);
+    rv += ")";
+    return rv;
+  }
+  // zlib status code and message captured at construction.
+  int zlib_status_;
+  std::string zlib_msg_;
+};
+
+/**
+ * This transport uses zlib's compressed format on the "far" side.
+ *
+ * There are two kinds of TZlibTransport objects:
+ * - Standalone objects are used to encode self-contained chunks of data
+ * (like structures). They include checksums.
+ * - Non-standalone transports are used for RPC. They are not implemented yet.
+ *
+ * TODO(dreiss): Don't do an extra copy of the compressed data if
+ * the underlying transport is TBuffered or TMemory.
+ *
+ * @author David Reiss <dreiss@facebook.com>
+ */
+class TZlibTransport : public TTransport {
+ public:
+
+  /**
+   * @param transport    The transport to read compressed data from
+   *                     and write compressed data to.
+   * @param use_for_rpc  True if this object will be used for RPC,
+   *                     false if this is a standalone object.
+   * @param urbuf_size   Uncompressed buffer size for reading.
+   * @param crbuf_size   Compressed buffer size for reading.
+   * @param uwbuf_size   Uncompressed buffer size for writing.
+   * @param cwbuf_size   Compressed buffer size for writing.
+   * @throws TTransportException (BAD_ARGS) if use_for_rpc is true or uwbuf_size is below MIN_DIRECT_DEFLATE_SIZE.
+   * TODO(dreiss): Write a constructor that isn't a pain.
+   */
+  TZlibTransport(boost::shared_ptr<TTransport> transport,
+                 bool use_for_rpc,
+                 int urbuf_size = DEFAULT_URBUF_SIZE,
+                 int crbuf_size = DEFAULT_CRBUF_SIZE,
+                 int uwbuf_size = DEFAULT_UWBUF_SIZE,
+                 int cwbuf_size = DEFAULT_CWBUF_SIZE) :
+    transport_(transport),
+    standalone_(!use_for_rpc),
+    urpos_(0),
+    uwpos_(0),
+    input_ended_(false),
+    output_flushed_(false),
+    urbuf_size_(urbuf_size),
+    crbuf_size_(crbuf_size),
+    uwbuf_size_(uwbuf_size),
+    cwbuf_size_(cwbuf_size),
+    urbuf_(NULL),
+    crbuf_(NULL),
+    uwbuf_(NULL),
+    cwbuf_(NULL),
+    rstream_(NULL),
+    wstream_(NULL)
+  {
+    // Validate the arguments before anything is allocated.
+    if (!standalone_) {
+      throw TTransportException(
+          TTransportException::BAD_ARGS,
+          "TZLibTransport has not been tested for RPC.");
+    }
+
+    if (uwbuf_size_ < MIN_DIRECT_DEFLATE_SIZE) {
+      // Have to copy this into a local because of a linking issue.
+      int minimum = MIN_DIRECT_DEFLATE_SIZE;
+      throw TTransportException(
+          TTransportException::BAD_ARGS,
+          "TZLibTransport: uncompressed write buffer must be at least "
+          + boost::lexical_cast<std::string>(minimum) + ".");
+    }
+
+    try {
+      urbuf_ = new uint8_t[urbuf_size];
+      crbuf_ = new uint8_t[crbuf_size];
+      uwbuf_ = new uint8_t[uwbuf_size];
+      cwbuf_ = new uint8_t[cwbuf_size];
+
+      // Don't call this outside of the constructor.
+      initZlib();
+
+    } catch (...) {
+      // Buffers not yet allocated are still NULL, so deleting them is harmless.
+      delete[] urbuf_;
+      delete[] crbuf_;
+      delete[] uwbuf_;
+      delete[] cwbuf_;
+      throw;
+    }
+  }
+
+ // Don't call this outside of the constructor.
+ void initZlib();
+
+ ~TZlibTransport();
+
+ bool isOpen();
+
+ void open() {
+ transport_->open();
+ }
+
+ void close() {
+ transport_->close();
+ }
+
+ uint32_t read(uint8_t* buf, uint32_t len);
+
+ void write(const uint8_t* buf, uint32_t len);
+
+ void flush();
+
+ bool borrow(uint8_t* buf, uint32_t len);
+
+ void consume(uint32_t len);
+
+ void verifyChecksum();
+
+ /**
+ * TODO(someone_smart): Choose smart defaults.
+ */
+ static const int DEFAULT_URBUF_SIZE = 128;
+ static const int DEFAULT_CRBUF_SIZE = 1024;
+ static const int DEFAULT_UWBUF_SIZE = 128;
+ static const int DEFAULT_CWBUF_SIZE = 1024;
+
+ protected:
+
+ inline void checkZlibRv(int status, const char* msg);
+ inline void checkZlibRvNothrow(int status, const char* msg);
+ inline int readAvail();
+ void flushToZlib(const uint8_t* buf, int len, bool finish = false);
+
+ // Writes of up to this many bytes are buffered up.
+ // Strictly larger writes are dumped straight to zlib.
+ static const int MIN_DIRECT_DEFLATE_SIZE = 32;
+
+ boost::shared_ptr<TTransport> transport_;
+ bool standalone_;
+
+ int urpos_;
+ int uwpos_;
+
+ /// True iff zlib has reached the end of a stream.
+ /// This is only ever true in standalone protocol objects.
+ bool input_ended_;
+ /// True iff we have flushed the output stream.
+ /// This is only ever true in standalone protocol objects.
+ bool output_flushed_;
+
+ int urbuf_size_;
+ int crbuf_size_;
+ int uwbuf_size_;
+ int cwbuf_size_;
+
+ uint8_t* urbuf_;
+ uint8_t* crbuf_;
+ uint8_t* uwbuf_;
+ uint8_t* cwbuf_;
+
+ struct z_stream_s* rstream_;
+ struct z_stream_s* wstream_;
+};
+
+}}} // facebook::thrift::transport
+
+#endif // #ifndef _THRIFT_TRANSPORT_TZLIBTRANSPORT_H_