THRIFT-917. java: THsHaServer should not accept an ExecutorService without catching RejectedExecutionException
This patch catches RejectedExecutionException from requestInvoke and closes the client connection when that occurs.
Patch: Ed Ceaser
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@1001820 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/java/src/org/apache/thrift/server/THsHaServer.java b/lib/java/src/org/apache/thrift/server/THsHaServer.java
index b6c475e..9bf20c2 100644
--- a/lib/java/src/org/apache/thrift/server/THsHaServer.java
+++ b/lib/java/src/org/apache/thrift/server/THsHaServer.java
@@ -22,6 +22,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -31,12 +32,16 @@
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TNonblockingServerTransport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* An extension of the TNonblockingServer to a Half-Sync/Half-Async server.
* Like TNonblockingServer, it relies on the use of TFramedTransport.
*/
public class THsHaServer extends TNonblockingServer {
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(THsHaServer.class.getName());
// This wraps all the functionality of queueing and thread pool management
// for the passing of Invocations from the Selector to workers.
@@ -285,8 +290,14 @@
* invoker service instead of immediately invoking. The thread pool takes care of the rest.
*/
@Override
- protected void requestInvoke(FrameBuffer frameBuffer) {
- invoker.execute(new Invocation(frameBuffer));
+ protected boolean requestInvoke(FrameBuffer frameBuffer) {
+ try {
+ invoker.execute(new Invocation(frameBuffer));
+ return true;
+ } catch (RejectedExecutionException rx) {
+ LOGGER.warn("ExecutorService rejected execution!", rx);
+ return false;
+ }
}
/**
diff --git a/lib/java/src/org/apache/thrift/server/TNonblockingServer.java b/lib/java/src/org/apache/thrift/server/TNonblockingServer.java
index 8b98031..7ee27a8 100644
--- a/lib/java/src/org/apache/thrift/server/TNonblockingServer.java
+++ b/lib/java/src/org/apache/thrift/server/TNonblockingServer.java
@@ -254,8 +254,9 @@
* Perform an invocation. This method could behave several different ways
* - invoke immediately inline, queue for separate execution, etc.
*/
- protected void requestInvoke(FrameBuffer frameBuffer) {
+ protected boolean requestInvoke(FrameBuffer frameBuffer) {
frameBuffer.invoke();
+ return true;
}
/**
@@ -420,13 +421,16 @@
*/
private void handleRead(SelectionKey key) {
FrameBuffer buffer = (FrameBuffer)key.attachment();
- if (buffer.read()) {
- // if the buffer's frame read is complete, invoke the method.
- if (buffer.isFrameFullyRead()) {
- requestInvoke(buffer);
- }
- } else {
+ if (!buffer.read()) {
cleanupSelectionkey(key);
+ return;
+ }
+
+ // if the buffer's frame read is complete, invoke the method.
+ if (buffer.isFrameFullyRead()) {
+ if (!requestInvoke(buffer)) {
+ cleanupSelectionkey(key);
+ }
}
}