THRIFT-2046:The worktask can be timed out in TThreadPoolServer (Java) when the max# thrift thread is reached
Client: java
Patch: Brock Noland
Introduces a task timeout for unexecuted tasks
diff --git a/lib/java/src/org/apache/thrift/server/TThreadPoolServer.java b/lib/java/src/org/apache/thrift/server/TThreadPoolServer.java
index 488d17f..adac27e 100755
--- a/lib/java/src/org/apache/thrift/server/TThreadPoolServer.java
+++ b/lib/java/src/org/apache/thrift/server/TThreadPoolServer.java
@@ -19,6 +19,7 @@
package org.apache.thrift.server;
+import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
@@ -50,6 +51,10 @@
public ExecutorService executorService;
public int stopTimeoutVal = 60;
public TimeUnit stopTimeoutUnit = TimeUnit.SECONDS;
+ public int requestTimeout = 20;
+ public TimeUnit requestTimeoutUnit = TimeUnit.SECONDS;
+ public int beBackoffSlotLength = 100;
+ public TimeUnit beBackoffSlotLengthUnit = TimeUnit.MILLISECONDS;
public Args(TServerTransport transport) {
super(transport);
@@ -65,6 +70,27 @@
return this;
}
+ public Args requestTimeout(int n) {
+ requestTimeout = n;
+ return this;
+ }
+
+ public Args requestTimeoutUnit(TimeUnit tu) {
+ requestTimeoutUnit = tu;
+ return this;
+ }
+ //Binary exponential backoff slot length
+ public Args beBackoffSlotLength(int n) {
+ beBackoffSlotLength = n;
+ return this;
+ }
+
+ //Binary exponential backoff slot time unit
+ public Args beBackoffSlotLengthUnit(TimeUnit tu) {
+ beBackoffSlotLengthUnit = tu;
+ return this;
+ }
+
public Args executorService(ExecutorService executorService) {
this.executorService = executorService;
return this;
@@ -82,11 +108,22 @@
private final long stopTimeoutVal;
+ private final TimeUnit requestTimeoutUnit;
+
+ private final long requestTimeout;
+
+ private final long beBackoffSlotInMillis;
+
+ private Random random = new Random(System.currentTimeMillis());
+
public TThreadPoolServer(Args args) {
super(args);
stopTimeoutUnit = args.stopTimeoutUnit;
stopTimeoutVal = args.stopTimeoutVal;
+ requestTimeoutUnit = args.requestTimeoutUnit;
+ requestTimeout = args.requestTimeout;
+ beBackoffSlotInMillis = args.beBackoffSlotLengthUnit.toMillis(args.beBackoffSlotLength);
executorService_ = args.executorService != null ?
args.executorService : createDefaultExecutorService(args);
@@ -119,34 +156,46 @@
stopped_ = false;
setServing(true);
int failureCount = 0;
- while (!stopped_) {
+ while (!stopped_) {
try {
TTransport client = serverTransport_.accept();
WorkerProcess wp = new WorkerProcess(client);
- int rejections = 0;
+
+ int retryCount = 0;
+ long remainTimeInMillis = requestTimeoutUnit.toMillis(requestTimeout);
while(true) {
try {
executorService_.execute(wp);
break;
- } catch(RejectedExecutionException ex) {
- LOGGER.warn("ExecutorService rejected client " + (++rejections) +
- " times(s)", ex);
- try {
- TimeUnit.SECONDS.sleep(1);
- } catch (InterruptedException e) {
- LOGGER.warn("Interrupted while waiting to place client on" +
- " executor queue.");
- Thread.currentThread().interrupt();
- break;
- }
- } catch(OutOfMemoryError ex) {
- LOGGER.warn("ExecutorService throws OutOfMemoryError "+ ex.getMessage() + (++rejections) +
- " times(s)", ex);
- try {
- TimeUnit.SECONDS.sleep(1);
- } catch (InterruptedException e) {
- LOGGER.warn("Interrupted while waiting to place client on executor queue.");
- Thread.currentThread().interrupt();
+ } catch(Throwable t) {
+ if (t instanceof RejectedExecutionException) {
+ retryCount++;
+ try {
+ if (remainTimeInMillis > 0) {
+ //do a truncated 20 binary exponential backoff sleep
+ long sleepTimeInMillis = ((long) (random.nextDouble() *
+ (1L << Math.min(retryCount, 20)))) * beBackoffSlotInMillis;
+ sleepTimeInMillis = Math.min(sleepTimeInMillis, remainTimeInMillis);
+ TimeUnit.MILLISECONDS.sleep(sleepTimeInMillis);
+ remainTimeInMillis = remainTimeInMillis - sleepTimeInMillis;
+ } else {
+ client.close();
+ wp = null;
+ LOGGER.warn("Task has been rejected by ExecutorService " + retryCount
+ + " times till timedout, reason: " + t);
+ break;
+ }
+ } catch (InterruptedException e) {
+ LOGGER.warn("Interrupted while waiting to place client on executor queue.");
+ Thread.currentThread().interrupt();
+ break;
+ }
+ } else if (t instanceof Error) {
+ LOGGER.error("ExecutorService threw error: " + t, t);
+ throw (Error)t;
+ } else {
+ //for other possible runtime errors from ExecutorService, should also not kill serve
+ LOGGER.warn("ExecutorService threw error: " + t, t);
break;
}
}