THRIFT-917. java: THsHaServer should not accept an ExecutorService without catching RejectedExecutionException

This patch catches RejectedExecutionException from requestInvoke and closes the client connection when that occurs.

Patch: Ed Ceaser

git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@1001820 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/java/src/org/apache/thrift/server/THsHaServer.java b/lib/java/src/org/apache/thrift/server/THsHaServer.java
index b6c475e..9bf20c2 100644
--- a/lib/java/src/org/apache/thrift/server/THsHaServer.java
+++ b/lib/java/src/org/apache/thrift/server/THsHaServer.java
@@ -22,6 +22,7 @@
 
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
@@ -31,12 +32,16 @@
 import org.apache.thrift.protocol.TProtocolFactory;
 import org.apache.thrift.transport.TFramedTransport;
 import org.apache.thrift.transport.TNonblockingServerTransport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * An extension of the TNonblockingServer to a Half-Sync/Half-Async server.
  * Like TNonblockingServer, it relies on the use of TFramedTransport.
  */
 public class THsHaServer extends TNonblockingServer {
+  private static final Logger LOGGER =
+    LoggerFactory.getLogger(THsHaServer.class.getName());
 
   // This wraps all the functionality of queueing and thread pool management
   // for the passing of Invocations from the Selector to workers.
@@ -285,8 +290,14 @@
    * invoker service instead of immediately invoking. The thread pool takes care of the rest.
    */
   @Override
-  protected void requestInvoke(FrameBuffer frameBuffer) {
-    invoker.execute(new Invocation(frameBuffer));
+  protected boolean requestInvoke(FrameBuffer frameBuffer) {
+    try {
+      invoker.execute(new Invocation(frameBuffer));
+      return true;
+    } catch (RejectedExecutionException rx) {
+      LOGGER.warn("ExecutorService rejected execution!", rx);
+      return false;
+    }
   }
 
   /**
diff --git a/lib/java/src/org/apache/thrift/server/TNonblockingServer.java b/lib/java/src/org/apache/thrift/server/TNonblockingServer.java
index 8b98031..7ee27a8 100644
--- a/lib/java/src/org/apache/thrift/server/TNonblockingServer.java
+++ b/lib/java/src/org/apache/thrift/server/TNonblockingServer.java
@@ -254,8 +254,9 @@
    * Perform an invocation. This method could behave several different ways
    * - invoke immediately inline, queue for separate execution, etc.
    */
-  protected void requestInvoke(FrameBuffer frameBuffer) {
+  protected boolean requestInvoke(FrameBuffer frameBuffer) {
     frameBuffer.invoke();
+    return true;
   }
 
   /**
@@ -420,13 +421,16 @@
      */
     private void handleRead(SelectionKey key) {
       FrameBuffer buffer = (FrameBuffer)key.attachment();
-      if (buffer.read()) {
-        // if the buffer's frame read is complete, invoke the method.
-        if (buffer.isFrameFullyRead()) {
-          requestInvoke(buffer);
-        }
-      } else {
+      if (!buffer.read()) {
         cleanupSelectionkey(key);
+        return;
+      }
+
+      // if the buffer's frame read is complete, invoke the method.
+      if (buffer.isFrameFullyRead()) {
+        if (!requestInvoke(buffer)) {
+          cleanupSelectionkey(key);
+        }
       }
     }