THRIFT-4714: optimize java TFramedTransport to call write once per flush
diff --git a/lib/java/src/org/apache/thrift/transport/AutoExpandingBufferWriteTransport.java b/lib/java/src/org/apache/thrift/transport/AutoExpandingBufferWriteTransport.java
index ad2ec55..ec7e7d4 100644
--- a/lib/java/src/org/apache/thrift/transport/AutoExpandingBufferWriteTransport.java
+++ b/lib/java/src/org/apache/thrift/transport/AutoExpandingBufferWriteTransport.java
@@ -25,10 +25,29 @@
private final AutoExpandingBuffer buf;
private int pos;
+ private int res;
- public AutoExpandingBufferWriteTransport(int initialCapacity) {
+ /**
+ * Constructor.
+ * @param initialCapacity the initial capacity of the buffer
+ * @param frontReserve space, if any, to reserve at the beginning such
+ * that the first write is after this reserve.
+ * This allows framed transport to reserve space
+ * for the frame buffer length.
+ * @throws IllegalArgumentException if initialCapacity is less than one
+ * @throws IllegalArgumentException if frontReserve is less than zero
+ * @throws IllegalArgumentException if frontReserve is greater than initialCapacity
+ */
+ public AutoExpandingBufferWriteTransport(int initialCapacity, int frontReserve) {
+ if (initialCapacity < 1) {
+ throw new IllegalArgumentException("initialCapacity");
+ }
+ if (frontReserve < 0 || initialCapacity < frontReserve) {
+ throw new IllegalArgumentException("frontReserve");
+ }
this.buf = new AutoExpandingBuffer(initialCapacity);
- this.pos = 0;
+ this.pos = frontReserve;
+ this.res = frontReserve;
}
@Override
@@ -56,11 +75,14 @@
return buf;
}
- public int getPos() {
+ /**
+ * @return length of the buffer, including any front reserve
+ */
+ public int getLength() {
return pos;
}
public void reset() {
- pos = 0;
+ pos = res;
}
}
diff --git a/lib/java/src/org/apache/thrift/transport/TFastFramedTransport.java b/lib/java/src/org/apache/thrift/transport/TFastFramedTransport.java
index 891d798..a1fd249 100644
--- a/lib/java/src/org/apache/thrift/transport/TFastFramedTransport.java
+++ b/lib/java/src/org/apache/thrift/transport/TFastFramedTransport.java
@@ -106,8 +106,8 @@
this.underlying = underlying;
this.maxLength = maxLength;
this.initialBufferCapacity = initialBufferCapacity;
- writeBuffer = new AutoExpandingBufferWriteTransport(initialBufferCapacity);
readBuffer = new AutoExpandingBufferReadTransport(initialBufferCapacity);
+ writeBuffer = new AutoExpandingBufferWriteTransport(initialBufferCapacity, 4);
}
@Override
@@ -166,16 +166,19 @@
readBuffer.consumeBuffer(len);
}
+ /**
+ * Only clears the read buffer!
+ */
public void clear() {
readBuffer = new AutoExpandingBufferReadTransport(initialBufferCapacity);
}
@Override
public void flush() throws TTransportException {
- int length = writeBuffer.getPos();
- TFramedTransport.encodeFrameSize(length, i32buf);
- underlying.write(i32buf, 0, 4);
- underlying.write(writeBuffer.getBuf().array(), 0, length);
+ int payloadLength = writeBuffer.getLength() - 4;
+ byte[] data = writeBuffer.getBuf().array();
+ TFramedTransport.encodeFrameSize(payloadLength, data);
+ underlying.write(data, 0, payloadLength + 4);
writeBuffer.reset();
underlying.flush();
}
diff --git a/lib/java/src/org/apache/thrift/transport/TFramedTransport.java b/lib/java/src/org/apache/thrift/transport/TFramedTransport.java
index fa531ef..a006c3a 100644
--- a/lib/java/src/org/apache/thrift/transport/TFramedTransport.java
+++ b/lib/java/src/org/apache/thrift/transport/TFramedTransport.java
@@ -66,16 +66,25 @@
}
/**
+ * Something to fill in the first four bytes of the buffer
+ * to make room for the frame size. This allows the
+ * implementation to write once instead of twice.
+ */
+ private static final byte[] sizeFiller_ = new byte[] { 0x00, 0x00, 0x00, 0x00 };
+
+ /**
* Constructor wraps around another transport
*/
public TFramedTransport(TTransport transport, int maxLength) {
transport_ = transport;
maxLength_ = maxLength;
+ writeBuffer_.write(sizeFiller_, 0, 4);
}
public TFramedTransport(TTransport transport) {
transport_ = transport;
maxLength_ = TFramedTransport.DEFAULT_MAX_LENGTH;
+ writeBuffer_.write(sizeFiller_, 0, 4);
}
public void open() throws TTransportException {
@@ -155,12 +164,12 @@
@Override
public void flush() throws TTransportException {
byte[] buf = writeBuffer_.get();
- int len = writeBuffer_.len();
+ int len = writeBuffer_.len() - 4; // account for the prepended frame size
writeBuffer_.reset();
+ writeBuffer_.write(sizeFiller_, 0, 4); // make room for the next frame's size data
- encodeFrameSize(len, i32buf);
- transport_.write(i32buf, 0, 4);
- transport_.write(buf, 0, len);
+ encodeFrameSize(len, buf); // this is the frame length without the filler
+ transport_.write(buf, 0, len + 4); // we have to write the frame size and frame data
transport_.flush();
}
diff --git a/lib/java/test/org/apache/thrift/transport/TestAutoExpandingBufferWriteTransport.java b/lib/java/test/org/apache/thrift/transport/TestAutoExpandingBufferWriteTransport.java
index 6b04feb..86b5b0d 100644
--- a/lib/java/test/org/apache/thrift/transport/TestAutoExpandingBufferWriteTransport.java
+++ b/lib/java/test/org/apache/thrift/transport/TestAutoExpandingBufferWriteTransport.java
@@ -19,26 +19,51 @@
package org.apache.thrift.transport;
import java.nio.ByteBuffer;
+import org.junit.Test;
+import static org.junit.Assert.*;
-import junit.framework.TestCase;
+public class TestAutoExpandingBufferWriteTransport {
-public class TestAutoExpandingBufferWriteTransport extends TestCase {
-
+ @Test
public void testIt() throws Exception {
- AutoExpandingBufferWriteTransport t = new AutoExpandingBufferWriteTransport(1);
+ AutoExpandingBufferWriteTransport t = new AutoExpandingBufferWriteTransport(1, 0);
+ assertEquals(0, t.getLength());
assertEquals(1, t.getBuf().array().length);
byte[] b1 = new byte[]{1,2,3};
t.write(b1);
- assertEquals(3, t.getPos());
+ assertEquals(3, t.getLength());
assertTrue(t.getBuf().array().length >= 3);
assertEquals(ByteBuffer.wrap(b1), ByteBuffer.wrap(t.getBuf().array(), 0, 3));
t.reset();
+ assertEquals(0, t.getLength());
assertTrue(t.getBuf().array().length >= 3);
- assertEquals(0, t.getPos());
byte[] b2 = new byte[]{4,5};
t.write(b2);
- assertEquals(2, t.getPos());
+ assertEquals(2, t.getLength());
assertEquals(ByteBuffer.wrap(b2), ByteBuffer.wrap(t.getBuf().array(), 0, 2));
+
+ AutoExpandingBufferWriteTransport uut = new AutoExpandingBufferWriteTransport(8, 4);
+ assertEquals(4, uut.getLength());
+ assertEquals(8, uut.getBuf().array().length);
+ uut.write(b1);
+ assertEquals(7, uut.getLength());
+ assertEquals(8, uut.getBuf().array().length);
+ assertEquals(ByteBuffer.wrap(b1), ByteBuffer.wrap(uut.getBuf().array(), 4, 3));
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testBadInitialSize() throws IllegalArgumentException {
+ new AutoExpandingBufferWriteTransport(0, 0);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testBadFrontReserveSize() throws IllegalArgumentException {
+ new AutoExpandingBufferWriteTransport(4, -1);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testTooSmallFrontReserveSize() throws IllegalArgumentException {
+ new AutoExpandingBufferWriteTransport(4, 5);
}
}
diff --git a/lib/java/test/org/apache/thrift/transport/TestTFramedTransport.java b/lib/java/test/org/apache/thrift/transport/TestTFramedTransport.java
index 7e889d6..e30d74b 100644
--- a/lib/java/test/org/apache/thrift/transport/TestTFramedTransport.java
+++ b/lib/java/test/org/apache/thrift/transport/TestTFramedTransport.java
@@ -125,11 +125,11 @@
assertEquals(0, countingTrans.writeCount);
trans.flush();
- assertEquals(2, countingTrans.writeCount);
+ assertEquals(1, countingTrans.writeCount);
trans.write(byteSequence(0, 245));
trans.flush();
- assertEquals(4, countingTrans.writeCount);
+ assertEquals(2, countingTrans.writeCount);
DataInputStream din = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
assertEquals(256, din.readInt());