THRIFT-719. java: Update Nonblocking and HsHa server to avoid an extra buffer copy
This patch causes Nonblocking and HsHa servers to explicitly enforce use of TFramedTransport and make sure that the actual invoker is deserializing from a TMemoryInputTransport. This should provide a substantial boost in performance.
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@927695 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/java/src/org/apache/thrift/server/THsHaServer.java b/lib/java/src/org/apache/thrift/server/THsHaServer.java
index 47600f5..933ab17 100644
--- a/lib/java/src/org/apache/thrift/server/THsHaServer.java
+++ b/lib/java/src/org/apache/thrift/server/THsHaServer.java
@@ -112,8 +112,7 @@
TNonblockingServerTransport serverTransport,
TProtocolFactory protocolFactory,
Options options) {
- this(new TProcessorFactory(processor), serverTransport,
- new TFramedTransport.Factory(),
+ this(new TProcessorFactory(processor), serverTransport,
new TFramedTransport.Factory(),
protocolFactory, protocolFactory,
options);
@@ -142,7 +141,7 @@
TFramedTransport.Factory transportFactory,
TProtocolFactory protocolFactory) {
this(processorFactory, serverTransport,
- transportFactory, transportFactory,
+ transportFactory,
protocolFactory, protocolFactory, new Options());
}
@@ -156,7 +155,7 @@
TProtocolFactory protocolFactory,
Options options) {
this(processorFactory, serverTransport,
- transportFactory, transportFactory,
+ transportFactory,
protocolFactory, protocolFactory,
options);
}
@@ -166,12 +165,11 @@
*/
public THsHaServer( TProcessor processor,
TNonblockingServerTransport serverTransport,
- TFramedTransport.Factory inputTransportFactory,
TFramedTransport.Factory outputTransportFactory,
TProtocolFactory inputProtocolFactory,
TProtocolFactory outputProtocolFactory) {
this(new TProcessorFactory(processor), serverTransport,
- inputTransportFactory, outputTransportFactory,
+ outputTransportFactory,
inputProtocolFactory, outputProtocolFactory);
}
@@ -180,13 +178,12 @@
*/
public THsHaServer( TProcessorFactory processorFactory,
TNonblockingServerTransport serverTransport,
- TFramedTransport.Factory inputTransportFactory,
TFramedTransport.Factory outputTransportFactory,
TProtocolFactory inputProtocolFactory,
TProtocolFactory outputProtocolFactory)
{
this(processorFactory, serverTransport,
- inputTransportFactory, outputTransportFactory,
+ outputTransportFactory,
inputProtocolFactory, outputProtocolFactory, new Options());
}
@@ -195,14 +192,13 @@
*/
public THsHaServer( TProcessorFactory processorFactory,
TNonblockingServerTransport serverTransport,
- TFramedTransport.Factory inputTransportFactory,
TFramedTransport.Factory outputTransportFactory,
TProtocolFactory inputProtocolFactory,
TProtocolFactory outputProtocolFactory,
Options options)
{
super(processorFactory, serverTransport,
- inputTransportFactory, outputTransportFactory,
+ outputTransportFactory,
inputProtocolFactory, outputProtocolFactory,
options);
diff --git a/lib/java/src/org/apache/thrift/server/TNonblockingServer.java b/lib/java/src/org/apache/thrift/server/TNonblockingServer.java
index 02fed33..31a6e24 100644
--- a/lib/java/src/org/apache/thrift/server/TNonblockingServer.java
+++ b/lib/java/src/org/apache/thrift/server/TNonblockingServer.java
@@ -20,7 +20,6 @@
package org.apache.thrift.server;
-import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
@@ -30,9 +29,6 @@
import java.util.Iterator;
import java.util.Set;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import org.apache.thrift.TByteArrayOutputStream;
import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
@@ -42,10 +38,13 @@
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TIOStreamTransport;
+import org.apache.thrift.transport.TMemoryInputTransport;
import org.apache.thrift.transport.TNonblockingServerTransport;
import org.apache.thrift.transport.TNonblockingTransport;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A nonblocking TServer implementation. This allows for fairness amongst all
@@ -100,7 +99,7 @@
public TNonblockingServer(TProcessorFactory processorFactory,
TNonblockingServerTransport serverTransport) {
this(processorFactory, serverTransport,
- new TFramedTransport.Factory(), new TFramedTransport.Factory(),
+ new TFramedTransport.Factory(),
new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory());
}
@@ -108,7 +107,7 @@
TNonblockingServerTransport serverTransport,
TProtocolFactory protocolFactory) {
this(processor, serverTransport,
- new TFramedTransport.Factory(), new TFramedTransport.Factory(),
+ new TFramedTransport.Factory(),
protocolFactory, protocolFactory);
}
@@ -117,7 +116,7 @@
TFramedTransport.Factory transportFactory,
TProtocolFactory protocolFactory) {
this(processor, serverTransport,
- transportFactory, transportFactory,
+ transportFactory,
protocolFactory, protocolFactory);
}
@@ -126,42 +125,39 @@
TFramedTransport.Factory transportFactory,
TProtocolFactory protocolFactory) {
this(processorFactory, serverTransport,
- transportFactory, transportFactory,
+ transportFactory,
protocolFactory, protocolFactory);
}
public TNonblockingServer(TProcessor processor,
TNonblockingServerTransport serverTransport,
- TFramedTransport.Factory inputTransportFactory,
TFramedTransport.Factory outputTransportFactory,
TProtocolFactory inputProtocolFactory,
TProtocolFactory outputProtocolFactory) {
this(new TProcessorFactory(processor), serverTransport,
- inputTransportFactory, outputTransportFactory,
+ outputTransportFactory,
inputProtocolFactory, outputProtocolFactory);
}
public TNonblockingServer(TProcessorFactory processorFactory,
TNonblockingServerTransport serverTransport,
- TFramedTransport.Factory inputTransportFactory,
TFramedTransport.Factory outputTransportFactory,
TProtocolFactory inputProtocolFactory,
TProtocolFactory outputProtocolFactory) {
this(processorFactory, serverTransport,
- inputTransportFactory, outputTransportFactory,
+ outputTransportFactory,
inputProtocolFactory, outputProtocolFactory,
new Options());
}
public TNonblockingServer(TProcessorFactory processorFactory,
TNonblockingServerTransport serverTransport,
- TFramedTransport.Factory inputTransportFactory,
TFramedTransport.Factory outputTransportFactory,
TProtocolFactory inputProtocolFactory,
TProtocolFactory outputProtocolFactory,
Options options) {
super(processorFactory, serverTransport,
- inputTransportFactory, outputTransportFactory,
+ null, outputTransportFactory,
inputProtocolFactory, outputProtocolFactory);
options_ = options;
options_.validate();
@@ -522,7 +518,7 @@
// if this frame will always be too large for this server, log the
// error and close the connection.
- if (frameSize + 4 > MAX_READ_BUFFER_BYTES) {
+ if (frameSize > MAX_READ_BUFFER_BYTES) {
LOGGER.error("Read a frame size of " + frameSize
+ ", which is bigger than the maximum allowable buffer size for ALL connections.");
return false;
@@ -530,17 +526,15 @@
// if this frame will push us over the memory limit, then return.
// with luck, more memory will free up the next time around.
- if (readBufferBytesAllocated + frameSize + 4 > MAX_READ_BUFFER_BYTES) {
+ if (readBufferBytesAllocated + frameSize > MAX_READ_BUFFER_BYTES) {
return true;
}
// incremement the amount of memory allocated to read buffers
- readBufferBytesAllocated += frameSize + 4;
+ readBufferBytesAllocated += frameSize;
// reallocate the readbuffer as a frame-sized buffer
- buffer_ = ByteBuffer.allocate(frameSize + 4);
- // put the frame size at the head of the buffer
- buffer_.putInt(frameSize);
+ buffer_ = ByteBuffer.allocate(frameSize);
state_ = READING_FRAME;
} else {
@@ -699,8 +693,7 @@
* the data it needs to handle an invocation.
*/
private TTransport getInputTransport() {
- return inputTransportFactory_.getTransport(new TIOStreamTransport(
- new ByteArrayInputStream(buffer_.array())));
+ return new TMemoryInputTransport(buffer_.array());
}
/**