THRIFT-3854 add a way in java to clear TFramedTransport read buffers
Client: Java
Patch: Chris Lockfort <clockfort@palantir.com>
This closes #1164
This closes #1081
diff --git a/lib/java/src/org/apache/thrift/transport/TFastFramedTransport.java b/lib/java/src/org/apache/thrift/transport/TFastFramedTransport.java
index 0398ca7..d265600 100644
--- a/lib/java/src/org/apache/thrift/transport/TFastFramedTransport.java
+++ b/lib/java/src/org/apache/thrift/transport/TFastFramedTransport.java
@@ -65,7 +65,8 @@
private final TTransport underlying;
private final AutoExpandingBufferWriteTransport writeBuffer;
- private final AutoExpandingBufferReadTransport readBuffer;
+ private AutoExpandingBufferReadTransport readBuffer;
+ private final int initialBufferCapacity;
private final byte[] i32buf = new byte[4];
private final int maxLength;
@@ -104,6 +105,7 @@
public TFastFramedTransport(TTransport underlying, int initialBufferCapacity, int maxLength) {
this.underlying = underlying;
this.maxLength = maxLength;
+ this.initialBufferCapacity = initialBufferCapacity;
writeBuffer = new AutoExpandingBufferWriteTransport(initialBufferCapacity, 1.5);
readBuffer = new AutoExpandingBufferReadTransport(initialBufferCapacity, 1.5);
}
@@ -164,6 +166,10 @@
readBuffer.consumeBuffer(len);
}
+ public void clear() {
+ readBuffer = new AutoExpandingBufferReadTransport(initialBufferCapacity, 1.5);
+ }
+
@Override
public void flush() throws TTransportException {
int length = writeBuffer.getPos();
diff --git a/lib/java/src/org/apache/thrift/transport/TFramedTransport.java b/lib/java/src/org/apache/thrift/transport/TFramedTransport.java
index f7d220c..fa531ef 100644
--- a/lib/java/src/org/apache/thrift/transport/TFramedTransport.java
+++ b/lib/java/src/org/apache/thrift/transport/TFramedTransport.java
@@ -45,7 +45,8 @@
/**
* Buffer for input
*/
- private TMemoryInputTransport readBuffer_ = new TMemoryInputTransport(new byte[0]);
+ private final TMemoryInputTransport readBuffer_ =
+ new TMemoryInputTransport(new byte[0]);
public static class Factory extends TTransportFactory {
private int maxLength_;
@@ -90,11 +91,9 @@
}
public int read(byte[] buf, int off, int len) throws TTransportException {
- if (readBuffer_ != null) {
- int got = readBuffer_.read(buf, off, len);
- if (got > 0) {
- return got;
- }
+ int got = readBuffer_.read(buf, off, len);
+ if (got > 0) {
+ return got;
}
// Read another frame of data
@@ -123,6 +122,10 @@
readBuffer_.consumeBuffer(len);
}
+ public void clear() {
+ readBuffer_.clear();
+ }
+
private final byte[] i32buf = new byte[4];
private void readFrame() throws TTransportException {
diff --git a/lib/java/test/org/apache/thrift/transport/TestTFastFramedTransport.java b/lib/java/test/org/apache/thrift/transport/TestTFastFramedTransport.java
index 11fbdf4..06ee206 100644
--- a/lib/java/test/org/apache/thrift/transport/TestTFastFramedTransport.java
+++ b/lib/java/test/org/apache/thrift/transport/TestTFastFramedTransport.java
@@ -19,13 +19,15 @@
package org.apache.thrift.transport;
public class TestTFastFramedTransport extends TestTFramedTransport {
+ protected final static int INITIAL_CAPACITY = 50;
+
@Override
protected TTransport getTransport(TTransport underlying) {
- return new TFastFramedTransport(underlying, 50, 10 * 1024 * 1024);
+ return new TFastFramedTransport(underlying, INITIAL_CAPACITY, 10 * 1024 * 1024);
}
@Override
protected TTransport getTransport(TTransport underlying, int maxLength) {
- return new TFastFramedTransport(underlying, 50, maxLength);
+ return new TFastFramedTransport(underlying, INITIAL_CAPACITY, maxLength);
}
}
diff --git a/lib/java/test/org/apache/thrift/transport/TestTFramedTransport.java b/lib/java/test/org/apache/thrift/transport/TestTFramedTransport.java
index 6cebd3c..7e889d6 100644
--- a/lib/java/test/org/apache/thrift/transport/TestTFramedTransport.java
+++ b/lib/java/test/org/apache/thrift/transport/TestTFramedTransport.java
@@ -183,4 +183,32 @@
assertEquals(65, trans.getBytesRemainingInBuffer());
assertEquals(10, trans.getBufferPosition());
}
+
+ public void testClear() throws IOException, TTransportException {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(baos);
+ dos.writeInt(220);
+ dos.write(byteSequence(0, 219));
+
+ TMemoryBuffer membuf = new TMemoryBuffer(0);
+ membuf.write(baos.toByteArray());
+
+ ReadCountingTransport countTrans = new ReadCountingTransport(membuf);
+ TTransport trans = getTransport(countTrans);
+
+ byte[] readBuf = new byte[220];
+ trans.read(readBuf, 0, 220);
+ assertTrue(Arrays.equals(readBuf, byteSequence(0,219)));
+
+ assertTrue(trans instanceof TFramedTransport || trans instanceof TFastFramedTransport);
+ if (trans instanceof TFramedTransport) {
+ assertTrue(trans.getBuffer() != null && trans.getBuffer().length > 0);
+ ((TFramedTransport) trans).clear();
+ assertTrue(trans.getBuffer() == null);
+ } else if (trans instanceof TFastFramedTransport) {
+ assertTrue(trans.getBuffer().length > TestTFastFramedTransport.INITIAL_CAPACITY);
+ ((TFastFramedTransport) trans).clear();
+ assertTrue(trans.getBuffer().length == TestTFastFramedTransport.INITIAL_CAPACITY);
+ }
+ }
}