THRIFT-831. java: FramedTransport implementation that reuses its buffers
This patch adds a TFastFramedTransport that is compatible with TFramedTransport, but makes use of a pair of internal, automatically-expanding buffers to avoid unnecessary reallocations. This makes interactions with the transport up to 2.5x faster.
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@985049 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/java/src/org/apache/thrift/transport/AutoExpandingBuffer.java b/lib/java/src/org/apache/thrift/transport/AutoExpandingBuffer.java
new file mode 100644
index 0000000..b02905f
--- /dev/null
+++ b/lib/java/src/org/apache/thrift/transport/AutoExpandingBuffer.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.thrift.transport;
+
+/**
+ * Helper class that wraps a byte[] so that it can expand and be reused. Users
+ * should call resizeIfNecessary to make sure the buffer has suitable capacity,
+ * and then use the array as needed. Note that the internal array will grow at a
+ * rate slightly faster than the requested capacity with the (untested)
+ * objective of avoiding expensive buffer allocations and copies.
+ */
+public class AutoExpandingBuffer {
+ private byte[] array;
+
+ private final double growthCoefficient;
+
+ public AutoExpandingBuffer(int initialCapacity, double growthCoefficient) {
+ if (growthCoefficient < 1.0) {
+ throw new IllegalArgumentException("Growth coefficient must be >= 1.0");
+ }
+ array = new byte[initialCapacity];
+ this.growthCoefficient = growthCoefficient;
+ }
+
+ public void resizeIfNecessary(int size) {
+ if (array.length < size) {
+ byte[] newBuf = new byte[(int)(size * growthCoefficient)];
+ System.arraycopy(array, 0, newBuf, 0, array.length);
+ array = newBuf;
+ }
+ }
+
+ public byte[] array() {
+ return array;
+ }
+}
diff --git a/lib/java/src/org/apache/thrift/transport/AutoExpandingBufferReadTransport.java b/lib/java/src/org/apache/thrift/transport/AutoExpandingBufferReadTransport.java
new file mode 100644
index 0000000..d29d60b
--- /dev/null
+++ b/lib/java/src/org/apache/thrift/transport/AutoExpandingBufferReadTransport.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.thrift.transport;
+
+import org.apache.commons.lang.NotImplementedException;
+
+/**
+ * TTransport for reading from an AutoExpandingBuffer.
+ */
+public class AutoExpandingBufferReadTransport extends TTransport {
+
+ private final AutoExpandingBuffer buf;
+
+ private int pos = 0;
+ private int limit = 0;
+
+ public AutoExpandingBufferReadTransport(int initialCapacity, double overgrowthCoefficient) {
+ this.buf = new AutoExpandingBuffer(initialCapacity, overgrowthCoefficient);
+ }
+
+ public void fill(TTransport inTrans, int length) throws TTransportException {
+ buf.resizeIfNecessary(length);
+ inTrans.readAll(buf.array(), 0, length);
+ pos = 0;
+ limit = length;
+ }
+
+ @Override
+ public void close() {}
+
+ @Override
+ public boolean isOpen() { return true; }
+
+ @Override
+ public void open() throws TTransportException {}
+
+ @Override
+ public final int read(byte[] target, int off, int len) throws TTransportException {
+ int amtToRead = Math.min(len, getBytesRemainingInBuffer());
+ System.arraycopy(buf.array(), pos, target, off, amtToRead);
+ consumeBuffer(amtToRead);
+ return amtToRead;
+ }
+
+ @Override
+ public void write(byte[] buf, int off, int len) throws TTransportException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public final void consumeBuffer(int len) {
+ pos += len;
+ }
+
+ @Override
+ public final byte[] getBuffer() {
+ return buf.array();
+ }
+
+ @Override
+ public final int getBufferPosition() {
+ return pos;
+ }
+
+ @Override
+ public final int getBytesRemainingInBuffer() {
+ return limit - pos;
+ }
+}
+
\ No newline at end of file
diff --git a/lib/java/src/org/apache/thrift/transport/AutoExpandingBufferWriteTransport.java b/lib/java/src/org/apache/thrift/transport/AutoExpandingBufferWriteTransport.java
new file mode 100644
index 0000000..2376cf3
--- /dev/null
+++ b/lib/java/src/org/apache/thrift/transport/AutoExpandingBufferWriteTransport.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.thrift.transport;
+
+import org.apache.commons.lang.NotImplementedException;
+
+/**
+ * TTransport for writing to an AutoExpandingBuffer.
+ */
+public final class AutoExpandingBufferWriteTransport extends TTransport {
+
+ private final AutoExpandingBuffer buf;
+ private int pos;
+
+ public AutoExpandingBufferWriteTransport(int initialCapacity, double growthCoefficient) {
+ this.buf = new AutoExpandingBuffer(initialCapacity, growthCoefficient);
+ this.pos = 0;
+ }
+
+ @Override
+ public void close() {}
+
+ @Override
+ public boolean isOpen() {return true;}
+
+ @Override
+ public void open() throws TTransportException {}
+
+ @Override
+ public int read(byte[] buf, int off, int len) throws TTransportException {
+ throw new NotImplementedException();
+ }
+
+ @Override
+ public void write(byte[] toWrite, int off, int len) throws TTransportException {
+ buf.resizeIfNecessary(pos + len);
+ System.arraycopy(toWrite, off, buf.array(), pos, len);
+ pos += len;
+ }
+
+ public AutoExpandingBuffer getBuf() {
+ return buf;
+ }
+
+ public int getPos() {
+ return pos;
+ }
+
+ public void reset() {
+ pos = 0;
+ }
+}
diff --git a/lib/java/src/org/apache/thrift/transport/TFastFramedTransport.java b/lib/java/src/org/apache/thrift/transport/TFastFramedTransport.java
new file mode 100644
index 0000000..2a1f1da
--- /dev/null
+++ b/lib/java/src/org/apache/thrift/transport/TFastFramedTransport.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.thrift.transport;
+
+/**
+ * This transport is wire compatible with {@link TFramedTransport}, but makes
+ * use of reusable, expanding read and write buffers in order to avoid
+ * allocating new byte[]s all the time. Since the buffers only expand, you
+ * should probably only use this transport if your messages are not too variably
+ * large, unless the persistent memory cost is not an issue.
+ *
+ * This implementation is NOT threadsafe.
+ */
+public class TFastFramedTransport extends TTransport {
+
+ public static class Factory extends TTransportFactory {
+ private final int initialCapacity;
+ private final int maxLength;
+
+ public Factory() {
+ this(DEFAULT_BUF_CAPACITY, DEFAULT_MAX_LENGTH);
+ }
+
+ public Factory(int initialCapacity) {
+ this(initialCapacity, DEFAULT_MAX_LENGTH);
+ }
+
+ public Factory(int initialCapacity, int maxLength) {
+ this.initialCapacity = initialCapacity;
+ this.maxLength = maxLength;
+ }
+
+ @Override
+ public TTransport getTransport(TTransport trans) {
+ return new TFastFramedTransport(trans,
+ initialCapacity,
+ maxLength);
+ }
+ }
+
+ /**
+ * How big should the default read and write buffers be?
+ */
+ public static final int DEFAULT_BUF_CAPACITY = 1024;
+ /**
+ * How big is the largest allowable frame? Defaults to Integer.MAX_VALUE.
+ */
+ public static final int DEFAULT_MAX_LENGTH = Integer.MAX_VALUE;
+
+ private final TTransport underlying;
+ private final AutoExpandingBufferWriteTransport writeBuffer;
+ private final AutoExpandingBufferReadTransport readBuffer;
+ private final byte[] i32buf = new byte[4];
+ private final int maxLength;
+
+ /**
+ * Create a new {@link TFastFramedTransport}. Use the defaults
+ * for initial buffer size and max frame length.
+ * @param underlying Transport that real reads and writes will go through to.
+ */
+ public TFastFramedTransport(TTransport underlying) {
+ this(underlying, DEFAULT_BUF_CAPACITY, DEFAULT_MAX_LENGTH);
+ }
+
+ /**
+ * Create a new {@link TFastFramedTransport}. Use the specified
+ * initial buffer capacity and the default max frame length.
+ * @param underlying Transport that real reads and writes will go through to.
+ * @param initialBufferCapacity The initial size of the read and write buffers.
+ * In practice, it's not critical to set this unless you know in advance that
+ * your messages are going to be very large.
+ */
+ public TFastFramedTransport(TTransport underlying, int initialBufferCapacity) {
+ this(underlying, initialBufferCapacity, DEFAULT_MAX_LENGTH);
+ }
+
+ /**
+ *
+ * @param underlying Transport that real reads and writes will go through to.
+ * @param initialBufferCapacity The initial size of the read and write buffers.
+ * In practice, it's not critical to set this unless you know in advance that
+ * your messages are going to be very large. (You can pass
+ * TFramedTransportWithReusableBuffer.DEFAULT_BUF_CAPACITY if you're only
+ * using this constructor because you want to set the maxLength.)
+ * @param maxLength The max frame size you are willing to read. You can use
+ * this parameter to limit how much memory can be allocated.
+ */
+ public TFastFramedTransport(TTransport underlying, int initialBufferCapacity, int maxLength) {
+ this.underlying = underlying;
+ this.maxLength = maxLength;
+ writeBuffer = new AutoExpandingBufferWriteTransport(initialBufferCapacity, 1.5);
+ readBuffer = new AutoExpandingBufferReadTransport(initialBufferCapacity, 1.5);
+ }
+
+ @Override
+ public void close() {
+ underlying.close();
+ }
+
+ @Override
+ public boolean isOpen() {
+ return underlying.isOpen();
+ }
+
+ @Override
+ public void open() throws TTransportException {
+ underlying.open();
+ }
+
+ @Override
+ public int read(byte[] buf, int off, int len) throws TTransportException {
+ int got = readBuffer.read(buf, off, len);
+ if (got > 0) {
+ return got;
+ }
+
+ // Read another frame of data
+ readFrame();
+
+ return readBuffer.read(buf, off, len);
+ }
+
+ private void readFrame() throws TTransportException {
+ underlying.readAll(i32buf , 0, 4);
+ int size = TFramedTransport.decodeFrameSize(i32buf);
+
+ if (size < 0) {
+ throw new TTransportException("Read a negative frame size (" + size + ")!");
+ }
+
+ if (size > maxLength) {
+ throw new TTransportException("Frame size (" + size + ") larger than max length (" + maxLength + ")!");
+ }
+
+ readBuffer.fill(underlying, size);
+ }
+
+ @Override
+ public void write(byte[] buf, int off, int len) throws TTransportException {
+ writeBuffer.write(buf, off, len);
+ }
+
+ @Override
+ public void consumeBuffer(int len) {
+ readBuffer.consumeBuffer(len);
+ }
+
+ @Override
+ public void flush() throws TTransportException {
+ int length = writeBuffer.getPos();
+ TFramedTransport.encodeFrameSize(length, i32buf);
+ underlying.write(i32buf, 0, 4);
+ underlying.write(writeBuffer.getBuf().array(), 0, length);
+ writeBuffer.reset();
+ underlying.flush();
+ }
+
+ @Override
+ public byte[] getBuffer() {
+ return readBuffer.getBuffer();
+ }
+
+ @Override
+ public int getBufferPosition() {
+ return readBuffer.getBufferPosition();
+ }
+
+ @Override
+ public int getBytesRemainingInBuffer() {
+ return readBuffer.getBytesRemainingInBuffer();
+ }
+}
diff --git a/lib/java/test/org/apache/thrift/transport/TestAutoExpandingBuffer.java b/lib/java/test/org/apache/thrift/transport/TestAutoExpandingBuffer.java
new file mode 100644
index 0000000..337dcf8
--- /dev/null
+++ b/lib/java/test/org/apache/thrift/transport/TestAutoExpandingBuffer.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.thrift.transport;
+
+import junit.framework.TestCase;
+
+public class TestAutoExpandingBuffer extends TestCase {
+ public void testExpands() throws Exception {
+ // has expected initial capacity
+ AutoExpandingBuffer b = new AutoExpandingBuffer(10, 1.5);
+ assertEquals(10, b.array().length);
+
+ // doesn't shrink
+ b.resizeIfNecessary(8);
+ assertEquals(10, b.array().length);
+
+ // grows when more capacity is needed
+ b.resizeIfNecessary(100);
+ assertTrue(b.array().length >= 100);
+ }
+}
diff --git a/lib/java/test/org/apache/thrift/transport/TestAutoExpandingBufferReadTransport.java b/lib/java/test/org/apache/thrift/transport/TestAutoExpandingBufferReadTransport.java
new file mode 100644
index 0000000..2e1f947
--- /dev/null
+++ b/lib/java/test/org/apache/thrift/transport/TestAutoExpandingBufferReadTransport.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.thrift.transport;
+
+import java.nio.ByteBuffer;
+
+import junit.framework.TestCase;
+
+public class TestAutoExpandingBufferReadTransport extends TestCase {
+ private static final byte[] HUNDRED_BYTES = new byte[100];
+
+ static {
+ for (byte i = 0; i < 100; i++) {
+ HUNDRED_BYTES[i] = i;
+ }
+ }
+
+ public void testIt() throws Exception {
+ AutoExpandingBufferReadTransport t = new AutoExpandingBufferReadTransport(150, 1.5);
+
+ TMemoryInputTransport membuf = new TMemoryInputTransport(HUNDRED_BYTES);
+
+ t.fill(membuf, 100);
+ assertEquals(100, t.getBytesRemainingInBuffer());
+ assertEquals(0, t.getBufferPosition());
+
+ byte[] target = new byte[10];
+ assertEquals(10, t.read(target, 0, 10));
+ assertEquals(ByteBuffer.wrap(HUNDRED_BYTES, 0, 10), ByteBuffer.wrap(target));
+
+ assertEquals(90, t.getBytesRemainingInBuffer());
+ assertEquals(10, t.getBufferPosition());
+ }
+}
diff --git a/lib/java/test/org/apache/thrift/transport/TestAutoExpandingBufferWriteTransport.java b/lib/java/test/org/apache/thrift/transport/TestAutoExpandingBufferWriteTransport.java
new file mode 100644
index 0000000..d5f239d
--- /dev/null
+++ b/lib/java/test/org/apache/thrift/transport/TestAutoExpandingBufferWriteTransport.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.thrift.transport;
+
+import java.nio.ByteBuffer;
+
+import junit.framework.TestCase;
+
+public class TestAutoExpandingBufferWriteTransport extends TestCase {
+
+ public void testIt() throws Exception {
+ AutoExpandingBufferWriteTransport t = new AutoExpandingBufferWriteTransport(1, 1.5);
+ assertEquals(1, t.getBuf().array().length);
+ byte[] b1 = new byte[]{1,2,3};
+ t.write(b1);
+ assertEquals(3, t.getPos());
+ assertTrue(t.getBuf().array().length >= 3);
+ assertEquals(ByteBuffer.wrap(b1), ByteBuffer.wrap(t.getBuf().array(), 0, 3));
+
+ t.reset();
+ assertTrue(t.getBuf().array().length >= 3);
+ assertEquals(0, t.getPos());
+ byte[] b2 = new byte[]{4,5};
+ t.write(b2);
+ assertEquals(2, t.getPos());
+ assertEquals(ByteBuffer.wrap(b2), ByteBuffer.wrap(t.getBuf().array(), 0, 2));
+ }
+}
diff --git a/lib/java/test/org/apache/thrift/transport/TestTFastFramedTransport.java b/lib/java/test/org/apache/thrift/transport/TestTFastFramedTransport.java
new file mode 100644
index 0000000..e024049
--- /dev/null
+++ b/lib/java/test/org/apache/thrift/transport/TestTFastFramedTransport.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.thrift.transport;
+
+public class TestTFastFramedTransport extends TestTFramedTransport {
+ @Override
+ protected TTransport getTransport(TTransport underlying) {
+ return new TFastFramedTransport(underlying, 50, 10 * 1024 * 1024);
+ }
+}
diff --git a/lib/java/test/org/apache/thrift/transport/TestTFramedTransport.java b/lib/java/test/org/apache/thrift/transport/TestTFramedTransport.java
index 27dad80..78f58ec 100644
--- a/lib/java/test/org/apache/thrift/transport/TestTFramedTransport.java
+++ b/lib/java/test/org/apache/thrift/transport/TestTFramedTransport.java
@@ -18,6 +18,7 @@
*/
package org.apache.thrift.transport;
+import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
@@ -29,7 +30,11 @@
public class TestTFramedTransport extends TestCase {
- private static byte[] byteSequence(int start, int end) {
+ protected TTransport getTransport(TTransport underlying) {
+ return new TFramedTransport(underlying);
+ }
+
+ public static byte[] byteSequence(int start, int end) {
byte[] result = new byte[end-start+1];
for (int i = 0; i <= (end-start); i++) {
result[i] = (byte)(start+i);
@@ -43,26 +48,37 @@
dos.writeInt(50);
dos.write(byteSequence(0, 49));
+ dos.writeInt(220);
+ dos.write(byteSequence(0, 219));
+
TMemoryBuffer membuf = new TMemoryBuffer(0);
membuf.write(baos.toByteArray());
ReadCountingTransport countTrans = new ReadCountingTransport(membuf);
- TFramedTransport trans = new TFramedTransport(countTrans);
+ TTransport trans = getTransport(countTrans);
byte[] readBuf = new byte[10];
trans.read(readBuf, 0, 10);
assertTrue(Arrays.equals(readBuf, byteSequence(0,9)));
+ assertEquals(2, countTrans.readCount);
trans.read(readBuf, 0, 10);
assertTrue(Arrays.equals(readBuf, byteSequence(10,19)));
-
assertEquals(2, countTrans.readCount);
+
+ assertEquals(30, trans.read(new byte[30], 0, 30));
+ assertEquals(2, countTrans.readCount);
+
+ readBuf = new byte[220];
+ assertEquals(220, trans.read(readBuf, 0, 220));
+ assertTrue(Arrays.equals(readBuf, byteSequence(0, 219)));
+ assertEquals(4, countTrans.readCount);
}
public void testWrite() throws TTransportException, IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
- WriteCountingTransport countingTrans = new WriteCountingTransport(new TIOStreamTransport(baos));
- TTransport trans = new TFramedTransport(countingTrans);
+ WriteCountingTransport countingTrans = new WriteCountingTransport(new TIOStreamTransport(new BufferedOutputStream(baos)));
+ TTransport trans = getTransport(countingTrans);
trans.write(byteSequence(0,100));
assertEquals(0, countingTrans.writeCount);
@@ -73,12 +89,21 @@
trans.flush();
assertEquals(2, countingTrans.writeCount);
+ trans.write(byteSequence(0, 245));
+ trans.flush();
+ assertEquals(4, countingTrans.writeCount);
+
DataInputStream din = new DataInputStream(new ByteArrayInputStream(baos.toByteArray()));
assertEquals(256, din.readInt());
byte[] buf = new byte[256];
din.read(buf, 0, 256);
assertTrue(Arrays.equals(byteSequence(0,255), buf));
+
+ assertEquals(246, din.readInt());
+ buf = new byte[246];
+ din.read(buf, 0, 246);
+ assertTrue(Arrays.equals(byteSequence(0,245), buf));
}
public void testDirectRead() throws IOException, TTransportException {
@@ -86,12 +111,14 @@
DataOutputStream dos = new DataOutputStream(baos);
dos.writeInt(50);
dos.write(byteSequence(0, 49));
+ dos.writeInt(75);
+ dos.write(byteSequence(125, 200));
TMemoryBuffer membuf = new TMemoryBuffer(0);
membuf.write(baos.toByteArray());
ReadCountingTransport countTrans = new ReadCountingTransport(membuf);
- TFramedTransport trans = new TFramedTransport(countTrans);
+ TTransport trans = getTransport(countTrans);
assertEquals(0, trans.getBytesRemainingInBuffer());
@@ -107,5 +134,15 @@
assertEquals(15, trans.getBufferPosition());
assertEquals(2, countTrans.readCount);
+
+ assertEquals(35, trans.read(new byte[35], 0, 35));
+ assertEquals(0, trans.getBytesRemainingInBuffer());
+ assertEquals(50, trans.getBufferPosition());
+
+ trans.read(readBuf, 0, 10);
+ assertEquals(4, countTrans.readCount);
+ assertTrue(Arrays.equals(readBuf, byteSequence(125,134)));
+ assertEquals(65, trans.getBytesRemainingInBuffer());
+ assertEquals(10, trans.getBufferPosition());
}
}
diff --git a/lib/java/test/org/apache/thrift/transport/WriteCountingTransport.java b/lib/java/test/org/apache/thrift/transport/WriteCountingTransport.java
index 39a7836..daad838 100644
--- a/lib/java/test/org/apache/thrift/transport/WriteCountingTransport.java
+++ b/lib/java/test/org/apache/thrift/transport/WriteCountingTransport.java
@@ -48,4 +48,9 @@
writeCount ++;
trans.write(buf, off, len);
}
+
+ @Override
+ public void flush() throws TTransportException {
+ trans.flush();
+ }
}
\ No newline at end of file