THRIFT-123: TZlibTransport for Java
Client: Java
Patch: Dragan Okiljevic, Keith Chew, Randy Abernethy
Adds a Java 1.7 based TZlibTransport to the Java library.
diff --git a/lib/java/src/org/apache/thrift/transport/TZlibTransport.java b/lib/java/src/org/apache/thrift/transport/TZlibTransport.java
new file mode 100644
index 0000000..25c9d01
--- /dev/null
+++ b/lib/java/src/org/apache/thrift/transport/TZlibTransport.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.thrift.transport;
+
+import java.util.zip.Deflater;
+import java.util.zip.Inflater;
+import org.apache.thrift.TByteArrayOutputStream;
+import org.apache.thrift.transport.TMemoryInputTransport;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.thrift.transport.TTransportFactory;
+
+/**
+ * TZlibTransport deflates on write and inflates on read.
+ */
+public class TZlibTransport extends TTransport {
+ //Class constants
+ public static final int INFLATE_BUF_SIZE = 1024;
+ public static final int READ_BUF_SIZE = 1024;
+ public static final int INIT_WRITE_BUF_SIZE = 1024;
+ //Client rw buffers and underlying transport
+ private TByteArrayOutputStream writeBuffer_ = new TByteArrayOutputStream(INIT_WRITE_BUF_SIZE);
+ private TMemoryInputTransport readBuffer_ = new TMemoryInputTransport(new byte[0]);
+ private TTransport transport_ = null;
+ //Zip objects and buffers
+ private byte[] inflateBuf = new byte[INFLATE_BUF_SIZE];
+ private byte[] readBuf = new byte[READ_BUF_SIZE];
+ private Inflater decompresser = new Inflater(false);
+ private Deflater compresser = new Deflater(Deflater.BEST_COMPRESSION, false);
+
+ public static class Factory extends TTransportFactory {
+ public Factory() {
+ }
+
+ @Override
+ public TTransport getTransport(TTransport base) {
+ return new TZlibTransport(base);
+ }
+ }
+
+ /**
+ * Constructs a new TZlibTransport instance.
+ * @param transport the underlying transport to read from and write to
+ */
+ public TZlibTransport(TTransport transport) {
+ transport_ = transport;
+ }
+
+ /**
+ * Constructs a new TZlibTransport instance.
+ * @param transport the underlying transport to read from and write to
+ * @param compressionLevel 0 for no compression, 9 for maximum compression
+ */
+ public TZlibTransport(TTransport transport, int compressionLevel) {
+ transport_ = transport;
+ compresser = new Deflater(compressionLevel, false);
+ }
+
+ @Override
+ public void open() throws TTransportException {
+ transport_.open();
+ }
+
+ @Override
+ public boolean isOpen() {
+ return transport_.isOpen();
+ }
+
+ @Override
+ public void close() {
+ readBuffer_.reset(new byte[0]);
+ writeBuffer_.reset();
+ compresser.reset();
+ decompresser.reset();
+ transport_.close();
+ }
+
+ @Override
+ public int read(byte[] buf, int off, int len) throws TTransportException {
+ int bytesRead = readBuffer_.read(buf, off, len);
+ if (bytesRead > 0) {
+ return bytesRead;
+ }
+
+ while (true) {
+ if (readComp() > 0) {
+ break;
+ }
+ }
+
+ return readBuffer_.read(buf, off, len);
+ }
+
+ private int readComp() throws TTransportException {
+ //If low level read buffer is exhausted, read more bytes from underlying transport
+ if (decompresser.needsInput()) {
+ int bytesRead = transport_.read(readBuf, 0, READ_BUF_SIZE);
+ decompresser.setInput(readBuf, 0, bytesRead);
+ }
+ //Decompress bytes into high level client read buffer
+ try {
+ int InflatedBytes = decompresser.inflate(inflateBuf);
+ if (InflatedBytes <= 0) {
+ return 0;
+ }
+
+ byte[] old = new byte[readBuffer_.getBytesRemainingInBuffer()];
+ readBuffer_.read(old, 0, readBuffer_.getBytesRemainingInBuffer());
+ byte[] all = new byte[old.length + InflatedBytes];
+ System.arraycopy(old, 0, all, 0, old.length);
+ System.arraycopy(inflateBuf, 0, all, old.length, InflatedBytes);
+
+ readBuffer_.reset(all);
+ return all.length;
+ } catch (java.util.zip.DataFormatException ex) {
+ throw new TTransportException(ex);
+ }
+ }
+
+ @Override
+ public byte[] getBuffer() {
+ return readBuffer_.getBuffer();
+ }
+
+ @Override
+ public int getBufferPosition() {
+ return readBuffer_.getBufferPosition();
+ }
+
+ @Override
+ public int getBytesRemainingInBuffer() {
+ return readBuffer_.getBytesRemainingInBuffer();
+ }
+
+ @Override
+ public void consumeBuffer(int len) {
+ readBuffer_.consumeBuffer(len);
+ }
+
+ @Override
+ public void write(byte[] buf, int off, int len) throws TTransportException {
+ writeBuffer_.write(buf, off, len);
+ }
+
+ /**
+ * Compress write buffer and send it to underlying transport.
+ */
+ @Override
+ public void flush() throws TTransportException {
+ byte[] buf = writeBuffer_.get();
+ writeBuffer_.reset();
+ compresser.setInput(buf);
+
+ byte[] compBuf = new byte[buf.length * 2];
+ int compressedDataLength = compresser.deflate(compBuf, 0, compBuf.length, Deflater.SYNC_FLUSH);
+ if (compressedDataLength >= compBuf.length) {
+ throw new TTransportException("Compression error, compressed output exceeds buffer size");
+ }
+ if (compressedDataLength > 0) {
+ transport_.write(compBuf, 0, compressedDataLength);
+ transport_.flush();
+ }
+ }
+}
+