THRIFT-2046:The worktask can be timed out in TThreadPoolServer (Java) when the max# thrift thread is reached
Client: java
Patch: Brock Noland

Introduces a task timeout for unexecuted tasks
diff --git a/lib/java/src/org/apache/thrift/server/TThreadPoolServer.java b/lib/java/src/org/apache/thrift/server/TThreadPoolServer.java
index 488d17f..adac27e 100755
--- a/lib/java/src/org/apache/thrift/server/TThreadPoolServer.java
+++ b/lib/java/src/org/apache/thrift/server/TThreadPoolServer.java
@@ -19,6 +19,7 @@
 
 package org.apache.thrift.server;
 
+import java.util.Random;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.SynchronousQueue;
@@ -50,6 +51,10 @@
     public ExecutorService executorService;
     public int stopTimeoutVal = 60;
     public TimeUnit stopTimeoutUnit = TimeUnit.SECONDS;
+    public int requestTimeout = 20;
+    public TimeUnit requestTimeoutUnit = TimeUnit.SECONDS;
+    public int beBackoffSlotLength = 100;
+    public TimeUnit beBackoffSlotLengthUnit = TimeUnit.MILLISECONDS;
 
     public Args(TServerTransport transport) {
       super(transport);
@@ -65,6 +70,27 @@
       return this;
     }
 
+    public Args requestTimeout(int n) {
+      requestTimeout = n;
+      return this;
+    }
+
+    public Args requestTimeoutUnit(TimeUnit tu) {
+      requestTimeoutUnit = tu;
+      return this;
+    }
+    //Binary exponential backoff slot length
+    public Args beBackoffSlotLength(int n) {
+      beBackoffSlotLength = n;
+      return this;
+    }
+
+    //Binary exponential backoff slot time unit
+    public Args beBackoffSlotLengthUnit(TimeUnit tu) {
+      beBackoffSlotLengthUnit = tu;
+      return this;
+    }
+
     public Args executorService(ExecutorService executorService) {
       this.executorService = executorService;
       return this;
@@ -82,11 +108,22 @@
 
   private final long stopTimeoutVal;
 
+  private final TimeUnit requestTimeoutUnit;
+
+  private final long requestTimeout;
+
+  private final long beBackoffSlotInMillis;
+
+  private Random random = new Random(System.currentTimeMillis());
+
   public TThreadPoolServer(Args args) {
     super(args);
 
     stopTimeoutUnit = args.stopTimeoutUnit;
     stopTimeoutVal = args.stopTimeoutVal;
+    requestTimeoutUnit = args.requestTimeoutUnit;
+    requestTimeout = args.requestTimeout;
+    beBackoffSlotInMillis = args.beBackoffSlotLengthUnit.toMillis(args.beBackoffSlotLength);
 
     executorService_ = args.executorService != null ?
         args.executorService : createDefaultExecutorService(args);
@@ -119,34 +156,46 @@
     stopped_ = false;
     setServing(true);
     int failureCount = 0;
-    while (!stopped_) {      
+    while (!stopped_) {
       try {
         TTransport client = serverTransport_.accept();
         WorkerProcess wp = new WorkerProcess(client);
-        int rejections = 0;
+
+        int retryCount = 0;
+        long remainTimeInMillis = requestTimeoutUnit.toMillis(requestTimeout);
         while(true) {
           try {
             executorService_.execute(wp);
             break;
-          } catch(RejectedExecutionException ex) {
-            LOGGER.warn("ExecutorService rejected client " + (++rejections) +
-                " times(s)", ex);
-            try {
-              TimeUnit.SECONDS.sleep(1);
-            } catch (InterruptedException e) {
-              LOGGER.warn("Interrupted while waiting to place client on" +
-              		" executor queue.");
-              Thread.currentThread().interrupt();
-              break;
-            }
-          } catch(OutOfMemoryError ex) {
-            LOGGER.warn("ExecutorService throws OutOfMemoryError "+ ex.getMessage() + (++rejections) +
-                " times(s)", ex);
-            try {
-              TimeUnit.SECONDS.sleep(1);
-            } catch (InterruptedException e) {
-              LOGGER.warn("Interrupted while waiting to place client on executor queue.");
-              Thread.currentThread().interrupt();
+          } catch(Throwable t) {
+            if (t instanceof RejectedExecutionException) {
+              retryCount++;
+              try {
+                if (remainTimeInMillis > 0) {
+                  //do a truncated 20 binary exponential backoff sleep
+                  long sleepTimeInMillis = ((long) (random.nextDouble() *
+                      (1L << Math.min(retryCount, 20)))) * beBackoffSlotInMillis;
+                  sleepTimeInMillis = Math.min(sleepTimeInMillis, remainTimeInMillis);
+                  TimeUnit.MILLISECONDS.sleep(sleepTimeInMillis);
+                  remainTimeInMillis = remainTimeInMillis - sleepTimeInMillis;
+                } else {
+                  client.close();
+                  wp = null;
+                  LOGGER.warn("Task has been rejected by ExecutorService " + retryCount
+                      + " times till timedout, reason: " + t);
+                  break;
+                }
+              } catch (InterruptedException e) {
+                LOGGER.warn("Interrupted while waiting to place client on executor queue.");
+                Thread.currentThread().interrupt();
+                break;
+              }
+            } else if (t instanceof Error) {
+              LOGGER.error("ExecutorService threw error: " + t, t);
+              throw (Error)t;
+            } else {
+              //for other possible runtime errors from ExecutorService, should also not kill serve
+              LOGGER.warn("ExecutorService threw error: " + t, t);
               break;
             }
           }