THRIFT-831. java: FramedTransport implementation that reuses its buffers

This patch adds a TFastFramedTransport that is compatible with TFramedTransport, but makes use of a pair of internal, automatically-expanding buffers to avoid unnecessary reallocations. This makes interactions with the transport up to 2.5x faster.

git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@985049 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/java/src/org/apache/thrift/transport/AutoExpandingBuffer.java b/lib/java/src/org/apache/thrift/transport/AutoExpandingBuffer.java
new file mode 100644
index 0000000..b02905f
--- /dev/null
+++ b/lib/java/src/org/apache/thrift/transport/AutoExpandingBuffer.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.thrift.transport;
+
+/**
+ * Helper class that wraps a byte[] so that it can expand and be reused. Users
+ * should call resizeIfNecessary to make sure the buffer has suitable capacity,
+ * and then use the array as needed. Note that the internal array will grow at a
+ * rate slightly faster than the requested capacity with the (untested)
+ * objective of avoiding expensive buffer allocations and copies.
+ */
+public class AutoExpandingBuffer {
+  private byte[] array;
+
+  private final double growthCoefficient;
+
+  public AutoExpandingBuffer(int initialCapacity, double growthCoefficient) {
+    if (growthCoefficient < 1.0) {
+      throw new IllegalArgumentException("Growth coefficient must be >= 1.0");
+    }
+    array = new byte[initialCapacity];
+    this.growthCoefficient = growthCoefficient;
+  }
+
+  public void resizeIfNecessary(int size) {
+    if (array.length < size) {
+      byte[] newBuf = new byte[(int)(size * growthCoefficient)];
+      System.arraycopy(array, 0, newBuf, 0, array.length);
+      array = newBuf;
+    }
+  }
+
+  public byte[] array() {
+    return array;
+  }
+}
diff --git a/lib/java/src/org/apache/thrift/transport/AutoExpandingBufferReadTransport.java b/lib/java/src/org/apache/thrift/transport/AutoExpandingBufferReadTransport.java
new file mode 100644
index 0000000..d29d60b
--- /dev/null
+++ b/lib/java/src/org/apache/thrift/transport/AutoExpandingBufferReadTransport.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.thrift.transport;
+
+import org.apache.commons.lang.NotImplementedException;
+
+/**
+ * TTransport for reading from an AutoExpandingBuffer.
+ */
+public class AutoExpandingBufferReadTransport extends TTransport {
+
+  private final AutoExpandingBuffer buf;
+
+  private int pos = 0;
+  private int limit = 0;
+
+  public AutoExpandingBufferReadTransport(int initialCapacity, double overgrowthCoefficient) {
+    this.buf = new AutoExpandingBuffer(initialCapacity, overgrowthCoefficient);
+  }
+
+  public void fill(TTransport inTrans, int length) throws TTransportException {
+    buf.resizeIfNecessary(length);
+    inTrans.readAll(buf.array(), 0, length);
+    pos = 0;
+    limit = length;
+  }
+
+  @Override
+  public void close() {}
+
+  @Override
+  public boolean isOpen() { return true; }
+
+  @Override
+  public void open() throws TTransportException {}
+
+  @Override
+  public final int read(byte[] target, int off, int len) throws TTransportException {
+    int amtToRead = Math.min(len, getBytesRemainingInBuffer());
+    System.arraycopy(buf.array(), pos, target, off, amtToRead);
+    consumeBuffer(amtToRead);
+    return amtToRead;
+  }
+
+  @Override
+  public void write(byte[] buf, int off, int len) throws TTransportException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public final void consumeBuffer(int len) {
+    pos += len;
+  }
+
+  @Override
+  public final byte[] getBuffer() {
+    return buf.array();
+  }
+
+  @Override
+  public final int getBufferPosition() {
+    return pos;
+  }
+
+  @Override
+  public final int getBytesRemainingInBuffer() {
+    return limit - pos;
+  }
+}
+  
\ No newline at end of file
diff --git a/lib/java/src/org/apache/thrift/transport/AutoExpandingBufferWriteTransport.java b/lib/java/src/org/apache/thrift/transport/AutoExpandingBufferWriteTransport.java
new file mode 100644
index 0000000..2376cf3
--- /dev/null
+++ b/lib/java/src/org/apache/thrift/transport/AutoExpandingBufferWriteTransport.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.thrift.transport;
+
+import org.apache.commons.lang.NotImplementedException;
+
+/**
+ * TTransport for writing to an AutoExpandingBuffer.
+ */
+public final class AutoExpandingBufferWriteTransport extends TTransport {
+
+  private final AutoExpandingBuffer buf;
+  private int pos;
+
+  public AutoExpandingBufferWriteTransport(int initialCapacity, double growthCoefficient) {
+    this.buf = new AutoExpandingBuffer(initialCapacity, growthCoefficient);
+    this.pos = 0;
+  }
+
+  @Override
+  public void close() {}
+
+  @Override
+  public boolean isOpen() {return true;}
+
+  @Override
+  public void open() throws TTransportException {}
+
+  @Override
+  public int read(byte[] buf, int off, int len) throws TTransportException {
+    throw new NotImplementedException();
+  }
+
+  @Override
+  public void write(byte[] toWrite, int off, int len) throws TTransportException {
+    buf.resizeIfNecessary(pos + len);
+    System.arraycopy(toWrite, off, buf.array(), pos, len);
+    pos += len;
+  }
+
+  public AutoExpandingBuffer getBuf() {
+    return buf;
+  }
+
+  public int getPos() {
+    return pos;
+  }
+
+  public void reset() {
+    pos = 0;
+  }
+}
diff --git a/lib/java/src/org/apache/thrift/transport/TFastFramedTransport.java b/lib/java/src/org/apache/thrift/transport/TFastFramedTransport.java
new file mode 100644
index 0000000..2a1f1da
--- /dev/null
+++ b/lib/java/src/org/apache/thrift/transport/TFastFramedTransport.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.thrift.transport;
+
+/**
+ * This transport is wire compatible with {@link TFramedTransport}, but makes 
+ * use of reusable, expanding read and write buffers in order to avoid
+ * allocating new byte[]s all the time. Since the buffers only expand, you
+ * should probably only use this transport if your messages are not too variably
+ * large, unless the persistent memory cost is not an issue.
+ * 
+ * This implementation is NOT threadsafe.
+ */
+public class TFastFramedTransport extends TTransport {
+
+  public static class Factory extends TTransportFactory {
+    private final int initialCapacity;
+    private final int maxLength;
+
+    public Factory() {
+      this(DEFAULT_BUF_CAPACITY, DEFAULT_MAX_LENGTH);
+    }
+
+    public Factory(int initialCapacity) {
+      this(initialCapacity, DEFAULT_MAX_LENGTH);
+    }
+
+    public Factory(int initialCapacity, int maxLength) {
+      this.initialCapacity = initialCapacity;
+      this.maxLength = maxLength;
+    }
+
+    @Override
+    public TTransport getTransport(TTransport trans) {
+      return new TFastFramedTransport(trans,
+          initialCapacity,
+          maxLength);
+    }
+  }
+
+  /**
+   * How big should the default read and write buffers be?
+   */
+  public static final int DEFAULT_BUF_CAPACITY = 1024;
+  /**
+   * How big is the largest allowable frame? Defaults to Integer.MAX_VALUE.
+   */
+  public static final int DEFAULT_MAX_LENGTH = Integer.MAX_VALUE;
+
+  private final TTransport underlying;
+  private final AutoExpandingBufferWriteTransport writeBuffer;
+  private final AutoExpandingBufferReadTransport readBuffer;
+  private final byte[] i32buf = new byte[4];
+  private final int maxLength;
+
+  /**
+   * Create a new {@link TFastFramedTransport}. Use the defaults
+   * for initial buffer size and max frame length.
+   * @param underlying Transport that real reads and writes will go through to.
+   */
+  public TFastFramedTransport(TTransport underlying) {
+    this(underlying, DEFAULT_BUF_CAPACITY, DEFAULT_MAX_LENGTH);
+  }
+
+  /**
+   * Create a new {@link TFastFramedTransport}. Use the specified
+   * initial buffer capacity and the default max frame length.
+   * @param underlying Transport that real reads and writes will go through to.
+   * @param initialBufferCapacity The initial size of the read and write buffers.
+   * In practice, it's not critical to set this unless you know in advance that
+   * your messages are going to be very large.
+   */
+  public TFastFramedTransport(TTransport underlying, int initialBufferCapacity) {
+    this(underlying, initialBufferCapacity, DEFAULT_MAX_LENGTH);
+  }
+
+  /**
+   * 
+   * @param underlying Transport that real reads and writes will go through to.
+   * @param initialBufferCapacity The initial size of the read and write buffers.
+   * In practice, it's not critical to set this unless you know in advance that
+   * your messages are going to be very large. (You can pass
+   * TFramedTransportWithReusableBuffer.DEFAULT_BUF_CAPACITY if you're only
+   * using this constructor because you want to set the maxLength.)
+   * @param maxLength The max frame size you are willing to read. You can use
+   * this parameter to limit how much memory can be allocated.
+   */
+  public TFastFramedTransport(TTransport underlying, int initialBufferCapacity, int maxLength) {
+    this.underlying = underlying;
+    this.maxLength = maxLength;
+    writeBuffer = new AutoExpandingBufferWriteTransport(initialBufferCapacity, 1.5);
+    readBuffer = new AutoExpandingBufferReadTransport(initialBufferCapacity, 1.5);
+  }
+
+  @Override
+  public void close() {
+    underlying.close();
+  }
+
+  @Override
+  public boolean isOpen() {
+    return underlying.isOpen();
+  }
+
+  @Override
+  public void open() throws TTransportException {
+    underlying.open();
+  }
+
+  @Override
+  public int read(byte[] buf, int off, int len) throws TTransportException {
+    int got = readBuffer.read(buf, off, len);
+    if (got > 0) {
+      return got;
+    }
+
+    // Read another frame of data
+    readFrame();
+
+    return readBuffer.read(buf, off, len);
+  }
+
+  private void readFrame() throws TTransportException {
+    underlying.readAll(i32buf , 0, 4);
+    int size = TFramedTransport.decodeFrameSize(i32buf);
+
+    if (size < 0) {
+      throw new TTransportException("Read a negative frame size (" + size + ")!");
+    }
+
+    if (size > maxLength) {
+      throw new TTransportException("Frame size (" + size + ") larger than max length (" + maxLength + ")!");
+    }
+
+    readBuffer.fill(underlying, size);
+  }
+
+  @Override
+  public void write(byte[] buf, int off, int len) throws TTransportException {
+    writeBuffer.write(buf, off, len);
+  }
+
+  @Override
+  public void consumeBuffer(int len) {
+    readBuffer.consumeBuffer(len);
+  }
+
+  @Override
+  public void flush() throws TTransportException {
+    int length = writeBuffer.getPos();
+    TFramedTransport.encodeFrameSize(length, i32buf);
+    underlying.write(i32buf, 0, 4);
+    underlying.write(writeBuffer.getBuf().array(), 0, length);
+    writeBuffer.reset();
+    underlying.flush();
+  }
+
+  @Override
+  public byte[] getBuffer() {
+    return readBuffer.getBuffer();
+  }
+
+  @Override
+  public int getBufferPosition() {
+    return readBuffer.getBufferPosition();
+  }
+
+  @Override
+  public int getBytesRemainingInBuffer() {
+    return readBuffer.getBytesRemainingInBuffer();
+  }
+}
diff --git a/lib/java/test/org/apache/thrift/transport/TestAutoExpandingBuffer.java b/lib/java/test/org/apache/thrift/transport/TestAutoExpandingBuffer.java
new file mode 100644
index 0000000..337dcf8
--- /dev/null
+++ b/lib/java/test/org/apache/thrift/transport/TestAutoExpandingBuffer.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.thrift.transport;
+
+import junit.framework.TestCase;
+
+public class TestAutoExpandingBuffer extends TestCase {
+  public void testExpands() throws Exception {
+    // has expected initial capacity
+    AutoExpandingBuffer b = new AutoExpandingBuffer(10, 1.5);
+    assertEquals(10, b.array().length);
+
+    // doesn't shrink
+    b.resizeIfNecessary(8);
+    assertEquals(10, b.array().length);
+
+    // grows when more capacity is needed
+    b.resizeIfNecessary(100);
+    assertTrue(b.array().length >= 100);
+  }
+}
diff --git a/lib/java/test/org/apache/thrift/transport/TestAutoExpandingBufferReadTransport.java b/lib/java/test/org/apache/thrift/transport/TestAutoExpandingBufferReadTransport.java
new file mode 100644
index 0000000..2e1f947
--- /dev/null
+++ b/lib/java/test/org/apache/thrift/transport/TestAutoExpandingBufferReadTransport.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.thrift.transport;
+
+import java.nio.ByteBuffer;
+
+import junit.framework.TestCase;
+
+public class TestAutoExpandingBufferReadTransport extends TestCase {
+  private static final byte[] HUNDRED_BYTES = new byte[100];
+
+  static {
+    for (byte i = 0; i < 100; i++) {
+      HUNDRED_BYTES[i] = i;
+    }
+  }
+
+  public void testIt() throws Exception {
+    AutoExpandingBufferReadTransport t = new AutoExpandingBufferReadTransport(150, 1.5);
+
+    TMemoryInputTransport membuf = new TMemoryInputTransport(HUNDRED_BYTES);
+
+    t.fill(membuf, 100);
+    assertEquals(100, t.getBytesRemainingInBuffer());
+    assertEquals(0, t.getBufferPosition());
+
+    byte[] target = new byte[10];
+    assertEquals(10, t.read(target, 0, 10));
+    assertEquals(ByteBuffer.wrap(HUNDRED_BYTES, 0, 10), ByteBuffer.wrap(target));
+
+    assertEquals(90, t.getBytesRemainingInBuffer());
+    assertEquals(10, t.getBufferPosition());
+  }
+}
diff --git a/lib/java/test/org/apache/thrift/transport/TestAutoExpandingBufferWriteTransport.java b/lib/java/test/org/apache/thrift/transport/TestAutoExpandingBufferWriteTransport.java
new file mode 100644
index 0000000..d5f239d
--- /dev/null
+++ b/lib/java/test/org/apache/thrift/transport/TestAutoExpandingBufferWriteTransport.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.thrift.transport;
+
+import java.nio.ByteBuffer;
+
+import junit.framework.TestCase;
+
+public class TestAutoExpandingBufferWriteTransport extends TestCase {
+
+  public void testIt() throws Exception {
+    AutoExpandingBufferWriteTransport t = new AutoExpandingBufferWriteTransport(1, 1.5);
+    assertEquals(1, t.getBuf().array().length);
+    byte[] b1 = new byte[]{1,2,3};
+    t.write(b1);
+    assertEquals(3, t.getPos());
+    assertTrue(t.getBuf().array().length >= 3);
+    assertEquals(ByteBuffer.wrap(b1), ByteBuffer.wrap(t.getBuf().array(), 0, 3));
+
+    t.reset();
+    assertTrue(t.getBuf().array().length >= 3);
+    assertEquals(0, t.getPos());
+    byte[] b2 = new byte[]{4,5};
+    t.write(b2);
+    assertEquals(2, t.getPos());
+    assertEquals(ByteBuffer.wrap(b2), ByteBuffer.wrap(t.getBuf().array(), 0, 2));
+  }
+}
diff --git a/lib/java/test/org/apache/thrift/transport/TestTFastFramedTransport.java b/lib/java/test/org/apache/thrift/transport/TestTFastFramedTransport.java
new file mode 100644
index 0000000..e024049
--- /dev/null
+++ b/lib/java/test/org/apache/thrift/transport/TestTFastFramedTransport.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.thrift.transport;
+
+public class TestTFastFramedTransport extends TestTFramedTransport {
+  @Override
+  protected TTransport getTransport(TTransport underlying) {
+    return new TFastFramedTransport(underlying, 50, 10 * 1024 * 1024);
+  }
+}
diff --git a/lib/java/test/org/apache/thrift/transport/TestTFramedTransport.java b/lib/java/test/org/apache/thrift/transport/TestTFramedTransport.java
index 27dad80..78f58ec 100644
--- a/lib/java/test/org/apache/thrift/transport/TestTFramedTransport.java
+++ b/lib/java/test/org/apache/thrift/transport/TestTFramedTransport.java
@@ -18,6 +18,7 @@
  */
 package org.apache.thrift.transport;
 
+import java.io.BufferedOutputStream;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
@@ -29,7 +30,11 @@
 
 public class TestTFramedTransport extends TestCase {
 
-  private static byte[] byteSequence(int start, int end) {
+  protected TTransport getTransport(TTransport underlying) {
+    return new TFramedTransport(underlying);
+  }
+
+  public static byte[] byteSequence(int start, int end) {
     byte[] result = new byte[end-start+1];
     for (int i = 0; i <= (end-start); i++) {
       result[i] = (byte)(start+i);
@@ -43,26 +48,37 @@
     dos.writeInt(50);
     dos.write(byteSequence(0, 49));
 
+    dos.writeInt(220);
+    dos.write(byteSequence(0, 219));
+
     TMemoryBuffer membuf = new TMemoryBuffer(0);
     membuf.write(baos.toByteArray());
 
     ReadCountingTransport countTrans = new ReadCountingTransport(membuf);
-    TFramedTransport trans = new TFramedTransport(countTrans);
+    TTransport trans = getTransport(countTrans);
 
     byte[] readBuf = new byte[10];
     trans.read(readBuf, 0, 10);
     assertTrue(Arrays.equals(readBuf, byteSequence(0,9)));
+    assertEquals(2, countTrans.readCount);
 
     trans.read(readBuf, 0, 10);
     assertTrue(Arrays.equals(readBuf, byteSequence(10,19)));
-
     assertEquals(2, countTrans.readCount);
+
+    assertEquals(30, trans.read(new byte[30], 0, 30));
+    assertEquals(2, countTrans.readCount);
+
+    readBuf = new byte[220];
+    assertEquals(220, trans.read(readBuf, 0, 220));
+    assertTrue(Arrays.equals(readBuf, byteSequence(0, 219)));
+    assertEquals(4, countTrans.readCount);
   }
 
   public void testWrite() throws TTransportException, IOException {
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    WriteCountingTransport countingTrans = new WriteCountingTransport(new TIOStreamTransport(baos));
-    TTransport trans = new TFramedTransport(countingTrans);
+    WriteCountingTransport countingTrans = new WriteCountingTransport(new TIOStreamTransport(new BufferedOutputStream(baos)));
+    TTransport trans = getTransport(countingTrans);
 
     trans.write(byteSequence(0,100));
     assertEquals(0, countingTrans.writeCount);
@@ -73,12 +89,21 @@
     trans.flush();
     assertEquals(2, countingTrans.writeCount);
 
+    trans.write(byteSequence(0, 245));
+    trans.flush();
+    assertEquals(4, countingTrans.writeCount);
+
     DataInputStream din = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
     assertEquals(256, din.readInt());
 
     byte[] buf = new byte[256];
     din.read(buf, 0, 256);
     assertTrue(Arrays.equals(byteSequence(0,255), buf));
+
+    assertEquals(246, din.readInt());
+    buf = new byte[246];
+    din.read(buf, 0, 246);
+    assertTrue(Arrays.equals(byteSequence(0,245), buf));
   }
 
   public void testDirectRead() throws IOException, TTransportException {
@@ -86,12 +111,14 @@
     DataOutputStream dos = new DataOutputStream(baos);
     dos.writeInt(50);
     dos.write(byteSequence(0, 49));
+    dos.writeInt(75);
+    dos.write(byteSequence(125, 200));
 
     TMemoryBuffer membuf = new TMemoryBuffer(0);
     membuf.write(baos.toByteArray());
 
     ReadCountingTransport countTrans = new ReadCountingTransport(membuf);
-    TFramedTransport trans = new TFramedTransport(countTrans);
+    TTransport trans = getTransport(countTrans);
 
     assertEquals(0, trans.getBytesRemainingInBuffer());
 
@@ -107,5 +134,15 @@
     assertEquals(15, trans.getBufferPosition());
 
     assertEquals(2, countTrans.readCount);
+
+    assertEquals(35, trans.read(new byte[35], 0, 35));
+    assertEquals(0, trans.getBytesRemainingInBuffer());
+    assertEquals(50, trans.getBufferPosition());
+
+    trans.read(readBuf, 0, 10);
+    assertEquals(4, countTrans.readCount);
+    assertTrue(Arrays.equals(readBuf, byteSequence(125,134)));
+    assertEquals(65, trans.getBytesRemainingInBuffer());
+    assertEquals(10, trans.getBufferPosition());
   }
 }
diff --git a/lib/java/test/org/apache/thrift/transport/WriteCountingTransport.java b/lib/java/test/org/apache/thrift/transport/WriteCountingTransport.java
index 39a7836..daad838 100644
--- a/lib/java/test/org/apache/thrift/transport/WriteCountingTransport.java
+++ b/lib/java/test/org/apache/thrift/transport/WriteCountingTransport.java
@@ -48,4 +48,9 @@
     writeCount ++;
     trans.write(buf, off, len);
   }
+
+  @Override
+  public void flush() throws TTransportException {
+    trans.flush();
+  }
 }
\ No newline at end of file