THRIFT-3854 add a way in java to clear TFramedTransport read buffers
Client: Java
Patch: Chris Lockfort <clockfort@palantir.com>

This closes #1164
This closes #1081
diff --git a/lib/java/src/org/apache/thrift/transport/TFastFramedTransport.java b/lib/java/src/org/apache/thrift/transport/TFastFramedTransport.java
index 0398ca7..d265600 100644
--- a/lib/java/src/org/apache/thrift/transport/TFastFramedTransport.java
+++ b/lib/java/src/org/apache/thrift/transport/TFastFramedTransport.java
@@ -65,7 +65,8 @@
 
   private final TTransport underlying;
   private final AutoExpandingBufferWriteTransport writeBuffer;
-  private final AutoExpandingBufferReadTransport readBuffer;
+  private AutoExpandingBufferReadTransport readBuffer;
+  private final int initialBufferCapacity;
   private final byte[] i32buf = new byte[4];
   private final int maxLength;
 
@@ -104,6 +105,7 @@
   public TFastFramedTransport(TTransport underlying, int initialBufferCapacity, int maxLength) {
     this.underlying = underlying;
     this.maxLength = maxLength;
+    this.initialBufferCapacity = initialBufferCapacity;
     writeBuffer = new AutoExpandingBufferWriteTransport(initialBufferCapacity, 1.5);
     readBuffer = new AutoExpandingBufferReadTransport(initialBufferCapacity, 1.5);
   }
@@ -164,6 +166,10 @@
     readBuffer.consumeBuffer(len);
   }
 
+  public void clear() {
+    readBuffer = new AutoExpandingBufferReadTransport(initialBufferCapacity, 1.5);
+  }
+
   @Override
   public void flush() throws TTransportException {
     int length = writeBuffer.getPos();
diff --git a/lib/java/src/org/apache/thrift/transport/TFramedTransport.java b/lib/java/src/org/apache/thrift/transport/TFramedTransport.java
index f7d220c..fa531ef 100644
--- a/lib/java/src/org/apache/thrift/transport/TFramedTransport.java
+++ b/lib/java/src/org/apache/thrift/transport/TFramedTransport.java
@@ -45,7 +45,8 @@
   /**
    * Buffer for input
    */
-  private TMemoryInputTransport readBuffer_ = new TMemoryInputTransport(new byte[0]);
+  private final TMemoryInputTransport readBuffer_ =
+    new TMemoryInputTransport(new byte[0]);
 
   public static class Factory extends TTransportFactory {
     private int maxLength_;
@@ -90,11 +91,9 @@
   }
 
   public int read(byte[] buf, int off, int len) throws TTransportException {
-    if (readBuffer_ != null) {
-      int got = readBuffer_.read(buf, off, len);
-      if (got > 0) {
-        return got;
-      }
+    int got = readBuffer_.read(buf, off, len);
+    if (got > 0) {
+      return got;
     }
 
     // Read another frame of data
@@ -123,6 +122,10 @@
     readBuffer_.consumeBuffer(len);
   }
 
+  public void clear() {
+    readBuffer_.clear();
+  }
+
   private final byte[] i32buf = new byte[4];
 
   private void readFrame() throws TTransportException {
diff --git a/lib/java/test/org/apache/thrift/transport/TestTFastFramedTransport.java b/lib/java/test/org/apache/thrift/transport/TestTFastFramedTransport.java
index 11fbdf4..06ee206 100644
--- a/lib/java/test/org/apache/thrift/transport/TestTFastFramedTransport.java
+++ b/lib/java/test/org/apache/thrift/transport/TestTFastFramedTransport.java
@@ -19,13 +19,15 @@
 package org.apache.thrift.transport;
 
 public class TestTFastFramedTransport extends TestTFramedTransport {
+  protected final static int INITIAL_CAPACITY = 50;
+
   @Override
   protected TTransport getTransport(TTransport underlying) {
-    return new TFastFramedTransport(underlying, 50, 10 * 1024 * 1024);
+    return new TFastFramedTransport(underlying, INITIAL_CAPACITY, 10 * 1024 * 1024);
   }
 
   @Override
   protected TTransport getTransport(TTransport underlying, int maxLength) {
-    return new TFastFramedTransport(underlying, 50, maxLength);
+    return new TFastFramedTransport(underlying, INITIAL_CAPACITY, maxLength);
   }
 }
diff --git a/lib/java/test/org/apache/thrift/transport/TestTFramedTransport.java b/lib/java/test/org/apache/thrift/transport/TestTFramedTransport.java
index 6cebd3c..7e889d6 100644
--- a/lib/java/test/org/apache/thrift/transport/TestTFramedTransport.java
+++ b/lib/java/test/org/apache/thrift/transport/TestTFramedTransport.java
@@ -183,4 +183,32 @@
     assertEquals(65, trans.getBytesRemainingInBuffer());
     assertEquals(10, trans.getBufferPosition());
   }
+
+  public void testClear() throws IOException, TTransportException {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    DataOutputStream dos = new DataOutputStream(baos);
+    dos.writeInt(220);
+    dos.write(byteSequence(0, 219));
+
+    TMemoryBuffer membuf = new TMemoryBuffer(0);
+    membuf.write(baos.toByteArray());
+
+    ReadCountingTransport countTrans = new ReadCountingTransport(membuf);
+    TTransport trans = getTransport(countTrans);
+
+    byte[] readBuf = new byte[220];
+    trans.read(readBuf, 0, 220);
+    assertTrue(Arrays.equals(readBuf, byteSequence(0,219)));
+
+    assertTrue(trans instanceof TFramedTransport || trans instanceof TFastFramedTransport);
+    if (trans instanceof TFramedTransport) {
+      assertTrue(trans.getBuffer() != null && trans.getBuffer().length > 0);
+      ((TFramedTransport) trans).clear();
+      assertTrue(trans.getBuffer() == null);
+    } else if (trans instanceof TFastFramedTransport) {
+      assertTrue(trans.getBuffer().length > TestTFastFramedTransport.INITIAL_CAPACITY);
+      ((TFastFramedTransport) trans).clear();
+      assertTrue(trans.getBuffer().length == TestTFastFramedTransport.INITIAL_CAPACITY);
+    }
+  }
 }