THRIFT-926. cpp: remove "standalone" distinction in TZlibTransport
Now that TZlibTransport::flush() behaves the same way as other
transports, there is no need to distinguish between RPC and standalone
behavior for TZlibTransport.
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@1005152 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/transport/TZlibTransport.cpp b/lib/cpp/src/transport/TZlibTransport.cpp
index 8953742..ae28bb6 100644
--- a/lib/cpp/src/transport/TZlibTransport.cpp
+++ b/lib/cpp/src/transport/TZlibTransport.cpp
@@ -161,26 +161,10 @@
rstream_->avail_out = urbuf_size_;
urpos_ = 0;
- // If we don't have any more compressed data available,
- // read some from the underlying transport.
- if (rstream_->avail_in == 0) {
- uint32_t got = transport_->read(crbuf_, crbuf_size_);
- if (got == 0) {
- return len - need;
- }
- rstream_->next_in = crbuf_;
- rstream_->avail_in = got;
- }
-
- // We have some compressed data now. Uncompress it.
- int zlib_rv = inflate(rstream_, Z_SYNC_FLUSH);
-
- if (zlib_rv == Z_STREAM_END) {
- if (standalone_) {
- input_ended_ = true;
- }
- } else {
- checkZlibRv(zlib_rv, rstream_->msg);
+ // Call inflate() to uncompress some more data
+ if (!readFromZlib()) {
+ // no data available from underlying transport
+ return len - need;
}
// Okay. The read buffer should have whatever we can give it now.
@@ -188,6 +172,32 @@
}
}
+bool TZlibTransport::readFromZlib() {
+ assert(!input_ended_);
+
+ // If we don't have any more compressed data available,
+ // read some from the underlying transport.
+ if (rstream_->avail_in == 0) {
+ uint32_t got = transport_->read(crbuf_, crbuf_size_);
+ if (got == 0) {
+ return false;
+ }
+ rstream_->next_in = crbuf_;
+ rstream_->avail_in = got;
+ }
+
+ // We have some compressed data now. Uncompress it.
+ int zlib_rv = inflate(rstream_, Z_SYNC_FLUSH);
+
+ if (zlib_rv == Z_STREAM_END) {
+ input_ended_ = true;
+ } else {
+ checkZlibRv(zlib_rv, rstream_->msg);
+ }
+
+ return true;
+}
+
// WRITING STRATEGY
//
@@ -315,30 +325,60 @@
}
void TZlibTransport::verifyChecksum() {
- if (!standalone_) {
+ // If zlib has already reported the end of the stream,
+ // it has verified the checksum.
+ if (input_ended_) {
+ return;
+ }
+
+ // This should only be called when reading is complete.
+ // If the caller still has unread data, throw an exception.
+ if (readAvail() > 0) {
throw TTransportException(
- TTransportException::BAD_ARGS,
- "TZLibTransport can only verify checksums for standalone objects.");
+ TTransportException::CORRUPTED_DATA,
+ "verifyChecksum() called before end of zlib stream");
}
- if (!input_ended_) {
- // This should only be called when reading is complete,
- // but it's possible that the whole checksum has not been fed to zlib yet.
- // We try to read an extra byte here to force zlib to finish the stream.
- // It might not always be easy to "unread" this byte,
- // but we throw an exception if we get it, which is not really
- // a recoverable error, so it doesn't matter.
- uint8_t buf[1];
- uint32_t got = this->read(buf, sizeof(buf));
- if (got || !input_ended_) {
- throw TTransportException(
- TTransportException::CORRUPTED_DATA,
- "Zlib stream not complete.");
- }
+ // Reset the rstream fields, in case avail_out is 0.
+ // (Since readAvail() is 0, we know there is no unread data in urbuf_)
+ rstream_->next_out = urbuf_;
+ rstream_->avail_out = urbuf_size_;
+ urpos_ = 0;
+
+ // Call inflate()
+ // This will throw an exception if the checksum is bad.
+ bool performed_inflate = readFromZlib();
+ if (!performed_inflate) {
+ // We needed to read from the underlying transport, and the read() call
+ // returned 0.
+ //
+ // Not all TTransport implementations behave the same way here, so we'll
+ // end up with different behavior depending on the underlying transport.
+ //
+ // For some transports (e.g., TFDTransport), read() blocks if no more data
+ // is available. They only return 0 if EOF has been reached, or if the
+ // remote endpoint has closed the connection. For those transports,
+ // verifyChecksum() will block until the checksum becomes available.
+ //
+ // Other transport types (e.g., TMemoryBuffer) always return 0 immediately
+ // if no more data is available. For those transport types, verifyChecksum
+ // will raise the following exception if the checksum is not available from
+ // the underlying transport yet.
+ throw TTransportException(TTransportException::CORRUPTED_DATA,
+ "checksum not available yet in "
+ "verifyChecksum()");
}
- // If the checksum had been bad, we would have gotten an error while
- // inflating.
+ // If input_ended_ is true now, the checksum has been verified
+ if (input_ended_) {
+ return;
+ }
+
+ // The caller invoked us before the actual end of the data stream
+ assert(rstream_->avail_out < urbuf_size_);
+ throw TTransportException(TTransportException::CORRUPTED_DATA,
+ "verifyChecksum() called before end of "
+ "zlib stream");
}
diff --git a/lib/cpp/src/transport/TZlibTransport.h b/lib/cpp/src/transport/TZlibTransport.h
index 0f9815e..45afe79 100644
--- a/lib/cpp/src/transport/TZlibTransport.h
+++ b/lib/cpp/src/transport/TZlibTransport.h
@@ -76,8 +76,6 @@
/**
* @param transport The transport to read compressed data from
* and write compressed data to.
- * @param use_for_rpc True if this object will be used for RPC,
- * false if this is a standalone object.
* @param urbuf_size Uncompressed buffer size for reading.
* @param crbuf_size Compressed buffer size for reading.
* @param uwbuf_size Uncompressed buffer size for writing.
@@ -86,13 +84,11 @@
* TODO(dreiss): Write a constructor that isn't a pain.
*/
TZlibTransport(boost::shared_ptr<TTransport> transport,
- bool use_for_rpc,
int urbuf_size = DEFAULT_URBUF_SIZE,
int crbuf_size = DEFAULT_CRBUF_SIZE,
int uwbuf_size = DEFAULT_UWBUF_SIZE,
int cwbuf_size = DEFAULT_CWBUF_SIZE) :
transport_(transport),
- standalone_(!use_for_rpc),
urpos_(0),
uwpos_(0),
input_ended_(false),
@@ -108,13 +104,6 @@
rstream_(NULL),
wstream_(NULL)
{
-
- if (!standalone_) {
- throw TTransportException(
- TTransportException::BAD_ARGS,
- "TZLibTransport has not been tested for RPC.");
- }
-
if (uwbuf_size_ < MIN_DIRECT_DEFLATE_SIZE) {
// Have to copy this into a local because of a linking issue.
int minimum = MIN_DIRECT_DEFLATE_SIZE;
@@ -206,22 +195,47 @@
inline int readAvail();
void flushToTransport(int flush);
void flushToZlib(const uint8_t* buf, int len, int flush);
+ bool readFromZlib();
+ private:
+ // Deprecated constructor signature.
+ //
+ // This used to be the constructor signature. If you are getting a compile
+ // error because you are trying to use this constructor, you need to update
+ // your code as follows:
+ // - Remove the use_for_rpc argument in the constructor.
+ // There is no longer any distinction between RPC and standalone zlib
+ // transports. (Previously, only standalone was allowed, anyway.)
+ // - Replace TZlibTransport::flush() calls with TZlibTransport::finish()
+ // in your code. Previously, flush() used to finish the zlib stream.
+ // Now flush() only flushes out pending data, so more writes can be
+ // performed after a flush(). The finish() method can be used to finalize
+ // the zlib stream.
+ //
+ // If we don't declare this constructor, old code written as
+ // TZlibTransport(trans, false) still compiles but behaves incorrectly.
+ // The second bool argument is converted to an integer and used as the
+ // urbuf_size.
+ TZlibTransport(boost::shared_ptr<TTransport> transport,
+ bool use_for_rpc,
+ int urbuf_size = DEFAULT_URBUF_SIZE,
+ int crbuf_size = DEFAULT_CRBUF_SIZE,
+ int uwbuf_size = DEFAULT_UWBUF_SIZE,
+ int cwbuf_size = DEFAULT_CWBUF_SIZE);
+
+ protected:
// Writes smaller than this are buffered up.
// Larger (or equal) writes are dumped straight to zlib.
static const int MIN_DIRECT_DEFLATE_SIZE = 32;
boost::shared_ptr<TTransport> transport_;
- bool standalone_;
int urpos_;
int uwpos_;
- /// True iff zlib has reached the end of a stream.
- /// This is only ever true in standalone protcol objects.
+ /// True iff zlib has reached the end of the input stream.
bool input_ended_;
/// True iff we have finished the output stream.
- /// This is only ever true in standalone protcol objects.
bool output_finished_;
int urbuf_size_;
diff --git a/lib/cpp/test/TransportTest.cpp b/lib/cpp/test/TransportTest.cpp
index 7f95e38..a932643 100644
--- a/lib/cpp/test/TransportTest.cpp
+++ b/lib/cpp/test/TransportTest.cpp
@@ -183,8 +183,8 @@
public:
CoupledZlibTransports() :
buf(new TMemoryBuffer) {
- in = new TZlibTransport(buf, false);
- out = new TZlibTransport(buf, false);
+ in = new TZlibTransport(buf);
+ out = new TZlibTransport(buf);
}
~CoupledZlibTransports() {
diff --git a/lib/cpp/test/ZlibTest.cpp b/lib/cpp/test/ZlibTest.cpp
index e2403d7..1e9f187 100644
--- a/lib/cpp/test/ZlibTest.cpp
+++ b/lib/cpp/test/ZlibTest.cpp
@@ -143,7 +143,7 @@
void test_write_then_read(const uint8_t* buf, uint32_t buf_len) {
shared_ptr<TMemoryBuffer> membuf(new TMemoryBuffer());
- shared_ptr<TZlibTransport> zlib_trans(new TZlibTransport(membuf, false));
+ shared_ptr<TZlibTransport> zlib_trans(new TZlibTransport(membuf));
zlib_trans->write(buf, buf_len);
zlib_trans->finish();
@@ -162,12 +162,12 @@
// the stream was not complete. I'm about to go fix that.
// It worked. Awesome.
shared_ptr<TMemoryBuffer> membuf(new TMemoryBuffer());
- shared_ptr<TZlibTransport> zlib_trans(new TZlibTransport(membuf, false));
+ shared_ptr<TZlibTransport> zlib_trans(new TZlibTransport(membuf));
zlib_trans->write(buf, buf_len);
zlib_trans->finish();
string tmp_buf;
membuf->appendBufferToString(tmp_buf);
- zlib_trans.reset(new TZlibTransport(membuf, false,
+ zlib_trans.reset(new TZlibTransport(membuf,
TZlibTransport::DEFAULT_URBUF_SIZE,
tmp_buf.length()-1));
@@ -182,7 +182,7 @@
// Make sure we still get that "not complete" error if
// it really isn't complete.
shared_ptr<TMemoryBuffer> membuf(new TMemoryBuffer());
- shared_ptr<TZlibTransport> zlib_trans(new TZlibTransport(membuf, false));
+ shared_ptr<TZlibTransport> zlib_trans(new TZlibTransport(membuf));
zlib_trans->write(buf, buf_len);
zlib_trans->finish();
string tmp_buf;
@@ -209,7 +209,7 @@
const shared_ptr<SizeGenerator>& read_gen) {
// Try it with a mix of read/write sizes.
shared_ptr<TMemoryBuffer> membuf(new TMemoryBuffer());
- shared_ptr<TZlibTransport> zlib_trans(new TZlibTransport(membuf, false));
+ shared_ptr<TZlibTransport> zlib_trans(new TZlibTransport(membuf));
unsigned int tot;
tot = 0;
@@ -244,7 +244,7 @@
void test_invalid_checksum(const uint8_t* buf, uint32_t buf_len) {
// Verify checksum checking.
shared_ptr<TMemoryBuffer> membuf(new TMemoryBuffer());
- shared_ptr<TZlibTransport> zlib_trans(new TZlibTransport(membuf, false));
+ shared_ptr<TZlibTransport> zlib_trans(new TZlibTransport(membuf));
zlib_trans->write(buf, buf_len);
zlib_trans->finish();
string tmp_buf;
@@ -282,7 +282,7 @@
void test_write_after_flush(const uint8_t* buf, uint32_t buf_len) {
// write some data
shared_ptr<TMemoryBuffer> membuf(new TMemoryBuffer());
- shared_ptr<TZlibTransport> zlib_trans(new TZlibTransport(membuf, false));
+ shared_ptr<TZlibTransport> zlib_trans(new TZlibTransport(membuf));
zlib_trans->write(buf, buf_len);
// call finish()
@@ -321,7 +321,7 @@
{
// Create a TZlibTransport object, and immediately destroy it
// when it goes out of scope.
- TZlibTransport w_zlib_trans(membuf, false);
+ TZlibTransport w_zlib_trans(membuf);
}
BOOST_CHECK_EQUAL(membuf->available_read(), 0);