THRIFT-1420. java: Nonblocking and HsHa server should make sure to close all their socket connections when the selector exits
This patch makes the selector threads close out all of their open sockets before completely exiting. In testing, this appears to alleviate issues with hanging clients.
Patch: Thomas Kielbus
git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1197370 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/java/src/org/apache/thrift/server/TNonblockingServer.java b/lib/java/src/org/apache/thrift/server/TNonblockingServer.java
index 7afd4b3..dccae52 100644
--- a/lib/java/src/org/apache/thrift/server/TNonblockingServer.java
+++ b/lib/java/src/org/apache/thrift/server/TNonblockingServer.java
@@ -20,14 +20,14 @@
package org.apache.thrift.server;
-import java.io.IOException;
-import java.nio.channels.SelectionKey;
-import java.util.Iterator;
-
import org.apache.thrift.transport.TNonblockingServerTransport;
import org.apache.thrift.transport.TNonblockingTransport;
import org.apache.thrift.transport.TTransportException;
+import java.io.IOException;
+import java.nio.channels.SelectionKey;
+import java.util.Iterator;
+
/**
* A nonblocking TServer implementation. This allows for fairness amongst all
* connected clients in terms of invocations.
@@ -154,6 +154,9 @@
select();
processInterestChanges();
}
+ for (SelectionKey selectionKey : selector.keys()) {
+ cleanupSelectionKey(selectionKey);
+ }
} catch (Throwable t) {
LOGGER.error("run() exiting due to uncaught error", t);
} finally {
diff --git a/lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java b/lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java
index 4cf5f1b..04179e6 100644
--- a/lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java
+++ b/lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java
@@ -537,6 +537,9 @@
processAcceptedConnections();
processInterestChanges();
}
+ for (SelectionKey selectionKey : selector.keys()) {
+ cleanupSelectionKey(selectionKey);
+ }
} catch (Throwable t) {
LOGGER.error("run() exiting due to uncaught error", t);
} finally {
diff --git a/lib/java/test/org/apache/thrift/server/TestNonblockingServer.java b/lib/java/test/org/apache/thrift/server/TestNonblockingServer.java
index 52b62c3..597074e 100644
--- a/lib/java/test/org/apache/thrift/server/TestNonblockingServer.java
+++ b/lib/java/test/org/apache/thrift/server/TestNonblockingServer.java
@@ -20,16 +20,22 @@
import org.apache.thrift.TProcessor;
+import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.server.TNonblockingServer.Args;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TNonblockingServerSocket;
+import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+import thrift.test.ThriftTest;
public class TestNonblockingServer extends ServerTestBase {
private Thread serverThread;
private TServer server;
+ private static final int NUM_QUERIES = 10000;
protected TServer getServer(TProcessor processor, TNonblockingServerSocket socket, TProtocolFactory protoFactory) {
return new TNonblockingServer(new Args(socket).processor(processor).protocolFactory(protoFactory));
@@ -71,4 +77,42 @@
public TTransport getClientTransport(TTransport underlyingTransport) throws Exception {
return new TFramedTransport(underlyingTransport);
}
+
+
+ public void testCleanupAllSelectionKeys() throws Exception {
+ for (TProtocolFactory protoFactory : getProtocols()) {
+ TestHandler handler = new TestHandler();
+ ThriftTest.Processor processor = new ThriftTest.Processor(handler);
+
+ startServer(processor, protoFactory);
+
+ TSocket socket = new TSocket(HOST, PORT);
+ socket.setTimeout(SOCKET_TIMEOUT);
+ TTransport transport = getClientTransport(socket);
+
+ TProtocol protocol = protoFactory.getProtocol(transport);
+ ThriftTest.Client testClient = new ThriftTest.Client(protocol);
+
+ open(transport);
+
+ for (int i = 0; i < NUM_QUERIES; ++i) {
+ testClient.testI32(1);
+ }
+ server.stop();
+ for (int i = 0; i < NUM_QUERIES; ++i) {
+ try {
+ testClient.testI32(1);
+ } catch(TTransportException e) {
+ System.err.println(e);
+ e.printStackTrace();
+ if (e.getCause() instanceof java.net.SocketTimeoutException) {
+ fail("timed out when it should have thrown another kind of error!");
+ }
+ }
+ }
+
+ transport.close();
+ stopServer();
+ }
+ }
}