THRIFT-5494 fix cpu full caused by infinite select() when frameSize < maxReadBufferBytes but readBufferBytesAllocated.get() + frameSize always greater than MAX_READ_BUFFER_BYTES
Client: Java
Patch: wangfan <wangfan8@xiaomi.com>
This closes #2533
diff --git a/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java b/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java
index f91e825..beef954 100644
--- a/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java
+++ b/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java
@@ -352,9 +352,10 @@
// if this frame will always be too large for this server, log the
// error and close the connection.
- if (frameSize > MAX_READ_BUFFER_BYTES) {
+ if (frameSize > trans_.getMaxFrameSize()) {
LOGGER.error("Read a frame size of " + frameSize
- + ", which is bigger than the maximum allowable buffer size for ALL connections.");
+ + ", which is bigger than the maximum allowable frame size "
+ + trans_.getMaxFrameSize() + " for ALL connections.");
return false;
}
diff --git a/lib/java/src/org/apache/thrift/transport/TEndpointTransport.java b/lib/java/src/org/apache/thrift/transport/TEndpointTransport.java
index f32efae..f33b8b7 100644
--- a/lib/java/src/org/apache/thrift/transport/TEndpointTransport.java
+++ b/lib/java/src/org/apache/thrift/transport/TEndpointTransport.java
@@ -26,15 +26,20 @@
protected long getMaxMessageSize() { return getConfiguration().getMaxMessageSize(); }
+ public int getMaxFrameSize() { return getConfiguration().getMaxFrameSize(); }
+
+ public void setMaxFrameSize(int maxFrameSize) { getConfiguration().setMaxFrameSize(maxFrameSize); }
+
protected long knownMessageSize;
protected long remainingMessageSize;
private TConfiguration _configuration;
+
public TConfiguration getConfiguration() {
return _configuration;
}
- public TEndpointTransport( TConfiguration config) throws TTransportException {
+ public TEndpointTransport(TConfiguration config) throws TTransportException {
_configuration = Objects.isNull(config) ? new TConfiguration() : config;
resetConsumedMessageSize(-1);
diff --git a/lib/java/src/org/apache/thrift/transport/TNonblockingServerSocket.java b/lib/java/src/org/apache/thrift/transport/TNonblockingServerSocket.java
index 1631892..535fd6f 100644
--- a/lib/java/src/org/apache/thrift/transport/TNonblockingServerSocket.java
+++ b/lib/java/src/org/apache/thrift/transport/TNonblockingServerSocket.java
@@ -30,6 +30,8 @@
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
+import org.apache.thrift.TConfiguration;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,6 +56,11 @@
*/
private int clientTimeout_ = 0;
+ /**
+ * Limit for client sockets request size
+ */
+ private int maxFrameSize_ = 0;
+
public static class NonblockingAbstractServerSocketArgs extends
AbstractServerTransportArgs<NonblockingAbstractServerSocketArgs> {}
@@ -68,7 +75,11 @@
* Creates just a port listening server socket
*/
public TNonblockingServerSocket(int port, int clientTimeout) throws TTransportException {
- this(new NonblockingAbstractServerSocketArgs().port(port).clientTimeout(clientTimeout));
+ this(port, clientTimeout, TConfiguration.DEFAULT_MAX_FRAME_SIZE);
+ }
+
+ public TNonblockingServerSocket(int port, int clientTimeout, int maxFrameSize) throws TTransportException {
+ this(new NonblockingAbstractServerSocketArgs().port(port).clientTimeout(clientTimeout).maxFrameSize(maxFrameSize));
}
public TNonblockingServerSocket(InetSocketAddress bindAddr) throws TTransportException {
@@ -76,11 +87,16 @@
}
public TNonblockingServerSocket(InetSocketAddress bindAddr, int clientTimeout) throws TTransportException {
- this(new NonblockingAbstractServerSocketArgs().bindAddr(bindAddr).clientTimeout(clientTimeout));
+ this(bindAddr, clientTimeout, TConfiguration.DEFAULT_MAX_FRAME_SIZE);
+ }
+
+ public TNonblockingServerSocket(InetSocketAddress bindAddr, int clientTimeout, int maxFrameSize) throws TTransportException {
+ this(new NonblockingAbstractServerSocketArgs().bindAddr(bindAddr).clientTimeout(clientTimeout).maxFrameSize(maxFrameSize));
}
public TNonblockingServerSocket(NonblockingAbstractServerSocketArgs args) throws TTransportException {
clientTimeout_ = args.clientTimeout;
+ maxFrameSize_ = args.maxFrameSize;
try {
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
@@ -121,6 +137,7 @@
TNonblockingSocket tsocket = new TNonblockingSocket(socketChannel);
tsocket.setTimeout(clientTimeout_);
+ tsocket.setMaxFrameSize(maxFrameSize_);
return tsocket;
} catch (IOException iox) {
throw new TTransportException(iox);
diff --git a/lib/java/src/org/apache/thrift/transport/TServerTransport.java b/lib/java/src/org/apache/thrift/transport/TServerTransport.java
index 55ef0c4..3a7b49a 100644
--- a/lib/java/src/org/apache/thrift/transport/TServerTransport.java
+++ b/lib/java/src/org/apache/thrift/transport/TServerTransport.java
@@ -22,6 +22,8 @@
import java.io.Closeable;
import java.net.InetSocketAddress;
+import org.apache.thrift.TConfiguration;
+
/**
* Server transport. Object which provides client transports.
*
@@ -32,6 +34,7 @@
int backlog = 0; // A value of 0 means the default value will be used (currently set at 50)
int clientTimeout = 0;
InetSocketAddress bindAddr;
+ int maxFrameSize = TConfiguration.DEFAULT_MAX_FRAME_SIZE;
public T backlog(int backlog) {
this.backlog = backlog;
@@ -52,6 +55,11 @@
this.bindAddr = bindAddr;
return (T) this;
}
+
+ public T maxFrameSize(int maxFrameSize) {
+ this.maxFrameSize = maxFrameSize;
+ return (T) this;
+ }
}
public abstract void listen() throws TTransportException;