THRIFT-5494 fix cpu full caused by infinite select() when frameSize < maxReadBufferBytes but readBufferBytesAllocated.get() + frameSize always greater than MAX_READ_BUFFER_BYTES
Client: Java
Patch: wangfan <wangfan8@xiaomi.com>

This closes #2533
diff --git a/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java b/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java
index f91e825..beef954 100644
--- a/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java
+++ b/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java
@@ -352,9 +352,10 @@
 
           // if this frame will always be too large for this server, log the
           // error and close the connection.
-          if (frameSize > MAX_READ_BUFFER_BYTES) {
+          if (frameSize > trans_.getMaxFrameSize()) {
             LOGGER.error("Read a frame size of " + frameSize
-                + ", which is bigger than the maximum allowable buffer size for ALL connections.");
+                + ", which is bigger than the maximum allowable frame size "
+                + trans_.getMaxFrameSize() + " for ALL connections.");
             return false;
           }
 
diff --git a/lib/java/src/org/apache/thrift/transport/TEndpointTransport.java b/lib/java/src/org/apache/thrift/transport/TEndpointTransport.java
index f32efae..f33b8b7 100644
--- a/lib/java/src/org/apache/thrift/transport/TEndpointTransport.java
+++ b/lib/java/src/org/apache/thrift/transport/TEndpointTransport.java
@@ -26,15 +26,20 @@
 
     protected long getMaxMessageSize() { return getConfiguration().getMaxMessageSize(); }
 
+    public int getMaxFrameSize() { return getConfiguration().getMaxFrameSize(); }
+
+    public void setMaxFrameSize(int maxFrameSize) { getConfiguration().setMaxFrameSize(maxFrameSize); }
+
     protected long knownMessageSize;
     protected long remainingMessageSize;
 
     private TConfiguration _configuration;
+
     public TConfiguration getConfiguration() {
         return _configuration;
     }
 
-    public TEndpointTransport( TConfiguration config) throws TTransportException {
+    public TEndpointTransport(TConfiguration config) throws TTransportException {
         _configuration = Objects.isNull(config) ? new TConfiguration() : config;
 
         resetConsumedMessageSize(-1);
diff --git a/lib/java/src/org/apache/thrift/transport/TNonblockingServerSocket.java b/lib/java/src/org/apache/thrift/transport/TNonblockingServerSocket.java
index 1631892..535fd6f 100644
--- a/lib/java/src/org/apache/thrift/transport/TNonblockingServerSocket.java
+++ b/lib/java/src/org/apache/thrift/transport/TNonblockingServerSocket.java
@@ -30,6 +30,8 @@
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
 
+import org.apache.thrift.TConfiguration;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -54,6 +56,11 @@
    */
   private int clientTimeout_ = 0;
 
+  /**
+   * Limit for client sockets request size
+   */
+  private int maxFrameSize_ = 0;
+
   public static class NonblockingAbstractServerSocketArgs extends
       AbstractServerTransportArgs<NonblockingAbstractServerSocketArgs> {}
 
@@ -68,7 +75,11 @@
    * Creates just a port listening server socket
    */
   public TNonblockingServerSocket(int port, int clientTimeout) throws TTransportException {
-    this(new NonblockingAbstractServerSocketArgs().port(port).clientTimeout(clientTimeout));
+    this(port, clientTimeout, TConfiguration.DEFAULT_MAX_FRAME_SIZE);
+  }
+
+  public TNonblockingServerSocket(int port, int clientTimeout, int maxFrameSize) throws TTransportException {
+    this(new NonblockingAbstractServerSocketArgs().port(port).clientTimeout(clientTimeout).maxFrameSize(maxFrameSize));
   }
 
   public TNonblockingServerSocket(InetSocketAddress bindAddr) throws TTransportException {
@@ -76,11 +87,16 @@
   }
 
   public TNonblockingServerSocket(InetSocketAddress bindAddr, int clientTimeout) throws TTransportException {
-    this(new NonblockingAbstractServerSocketArgs().bindAddr(bindAddr).clientTimeout(clientTimeout));
+    this(bindAddr, clientTimeout, TConfiguration.DEFAULT_MAX_FRAME_SIZE);
+  }
+
+  public TNonblockingServerSocket(InetSocketAddress bindAddr, int clientTimeout, int maxFrameSize) throws TTransportException {
+    this(new NonblockingAbstractServerSocketArgs().bindAddr(bindAddr).clientTimeout(clientTimeout).maxFrameSize(maxFrameSize));
   }
 
   public TNonblockingServerSocket(NonblockingAbstractServerSocketArgs args) throws TTransportException {
     clientTimeout_ = args.clientTimeout;
+    maxFrameSize_ = args.maxFrameSize;
     try {
       serverSocketChannel = ServerSocketChannel.open();
       serverSocketChannel.configureBlocking(false);
@@ -121,6 +137,7 @@
 
       TNonblockingSocket tsocket = new TNonblockingSocket(socketChannel);
       tsocket.setTimeout(clientTimeout_);
+      tsocket.setMaxFrameSize(maxFrameSize_);
       return tsocket;
     } catch (IOException iox) {
       throw new TTransportException(iox);
diff --git a/lib/java/src/org/apache/thrift/transport/TServerTransport.java b/lib/java/src/org/apache/thrift/transport/TServerTransport.java
index 55ef0c4..3a7b49a 100644
--- a/lib/java/src/org/apache/thrift/transport/TServerTransport.java
+++ b/lib/java/src/org/apache/thrift/transport/TServerTransport.java
@@ -22,6 +22,8 @@
 import java.io.Closeable;
 import java.net.InetSocketAddress;
 
+import org.apache.thrift.TConfiguration;
+
 /**
  * Server transport. Object which provides client transports.
  *
@@ -32,6 +34,7 @@
     int backlog = 0; // A value of 0 means the default value will be used (currently set at 50)
     int clientTimeout = 0;
     InetSocketAddress bindAddr;
+    int maxFrameSize = TConfiguration.DEFAULT_MAX_FRAME_SIZE;
 
     public T backlog(int backlog) {
       this.backlog = backlog;
@@ -52,6 +55,11 @@
       this.bindAddr = bindAddr;
       return (T) this;
     }
+
+    public T maxFrameSize(int maxFrameSize) {
+      this.maxFrameSize = maxFrameSize;
+      return (T) this;
+    }
   }
 
   public abstract void listen() throws TTransportException;