Add programatic shutdown option to Java Thrift servers
Summary: Same paradigm as in C++ model. Allow ServerTransport to be interrupted to block an accept loop and cleanly stop serving client requests.
Reviewed By: dreiss
Test Plan: Invoke shutdown() method on a TServer
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@665322 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/java/src/server/TServer.java b/lib/java/src/server/TServer.java
index d5f95f6..2ac322a 100644
--- a/lib/java/src/server/TServer.java
+++ b/lib/java/src/server/TServer.java
@@ -105,4 +105,10 @@
*/
public abstract void serve();
+ /**
+ * Stop the server. This is optional on a per-implementation basis. Not
+ * all servers are required to be cleanly stoppable.
+ */
+ public void stop() {}
+
}
diff --git a/lib/java/src/server/TSimpleServer.java b/lib/java/src/server/TSimpleServer.java
index 181d8e4..cb127c7 100644
--- a/lib/java/src/server/TSimpleServer.java
+++ b/lib/java/src/server/TSimpleServer.java
@@ -23,6 +23,8 @@
*/
public class TSimpleServer extends TServer {
+ private boolean stopped_ = false;
+
public TSimpleServer(TProcessor processor,
TServerTransport serverTransport) {
super(new TProcessorFactory(processor), serverTransport);
@@ -71,6 +73,7 @@
public void serve() {
+ stopped_ = false;
try {
serverTransport_.listen();
} catch (TTransportException ttx) {
@@ -78,7 +81,7 @@
return;
}
- while (true) {
+ while (!stopped_) {
TTransport client = null;
TProcessor processor = null;
TTransport inputTransport = null;
@@ -98,9 +101,13 @@
} catch (TTransportException ttx) {
// Client died, just move on
} catch (TException tx) {
- tx.printStackTrace();
+ if (!stopped_) {
+ tx.printStackTrace();
+ }
} catch (Exception x) {
- x.printStackTrace();
+ if (!stopped_) {
+ x.printStackTrace();
+ }
}
if (inputTransport != null) {
@@ -113,4 +120,9 @@
}
}
+
+ public void stop() {
+ stopped_ = true;
+ serverTransport_.interrupt();
+ }
}
diff --git a/lib/java/src/server/TThreadPoolServer.java b/lib/java/src/server/TThreadPoolServer.java
index 0945fbe..94eb5a6 100644
--- a/lib/java/src/server/TThreadPoolServer.java
+++ b/lib/java/src/server/TThreadPoolServer.java
@@ -34,10 +34,18 @@
// Executor service for handling client connections
private ExecutorService executorService_;
+ // Flag for stopping the server
+ private volatile boolean stopped_;
+
+ // Server options
+ private Options options_;
+
// Customizable server options
public static class Options {
public int minWorkerThreads = 5;
public int maxWorkerThreads = Integer.MAX_VALUE;
+ public int stopTimeoutVal = 60;
+ public TimeUnit stopTimeoutUnit = TimeUnit.SECONDS;
}
public TThreadPoolServer(TProcessor processor,
@@ -120,6 +128,8 @@
60,
TimeUnit.SECONDS,
executorQueue);
+
+ options_ = options;
}
@@ -131,17 +141,33 @@
return;
}
- while (true) {
+ stopped_ = false;
+ while (!stopped_) {
int failureCount = 0;
try {
TTransport client = serverTransport_.accept();
WorkerProcess wp = new WorkerProcess(client);
executorService_.execute(wp);
} catch (TTransportException ttx) {
- ++failureCount;
- ttx.printStackTrace();
+ if (!stopped_) {
+ ++failureCount;
+ ttx.printStackTrace();
+ }
}
}
+
+ executorService_.shutdown();
+ try {
+ executorService_.awaitTermination(options_.stopTimeoutVal,
+ options_.stopTimeoutUnit);
+ } catch (InterruptedException ix) {
+ // Ignore and more on
+ }
+ }
+
+ public void stop() {
+ stopped_ = true;
+ serverTransport_.interrupt();
}
private class WorkerProcess implements Runnable {
diff --git a/lib/java/src/transport/TServerSocket.java b/lib/java/src/transport/TServerSocket.java
index 9badf1a..cc6d900 100644
--- a/lib/java/src/transport/TServerSocket.java
+++ b/lib/java/src/transport/TServerSocket.java
@@ -112,4 +112,10 @@
}
}
+ public void interrupt() {
+ // The thread-safeness of this is dubious, but Java documentation suggests
+ // that it is safe to do this from a different thread context
+ close();
+ }
+
}
diff --git a/lib/java/src/transport/TServerTransport.java b/lib/java/src/transport/TServerTransport.java
index 2ffc8df..872ac3c 100644
--- a/lib/java/src/transport/TServerTransport.java
+++ b/lib/java/src/transport/TServerTransport.java
@@ -26,4 +26,14 @@
public abstract void close();
protected abstract TTransport acceptImpl() throws TTransportException;
+
+ /**
+ * Optional method implementation. This signals to the server transport
+ * that it should break out of any accept() or listen() that it is currently
+ * blocked on. This method, if implemented, MUST be thread safe, as it may
+ * be called from a different thread context than the other TServerTransport
+ * methods.
+ */
+ public void interrupt() {}
+
}