Add programatic shutdown option to Java Thrift servers

Summary: Same paradigm as in C++ model. Allow ServerTransport to be interrupted to block an accept loop and cleanly stop serving client requests.

Reviewed By: dreiss

Test Plan: Invoke shutdown() method on a TServer


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@665322 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/java/src/server/TServer.java b/lib/java/src/server/TServer.java
index d5f95f6..2ac322a 100644
--- a/lib/java/src/server/TServer.java
+++ b/lib/java/src/server/TServer.java
@@ -105,4 +105,10 @@
    */
   public abstract void serve();
 
+  /**
+   * Stop the server. This is optional on a per-implementation basis. Not
+   * all servers are required to be cleanly stoppable.
+   */
+  public void stop() {}
+
 }
diff --git a/lib/java/src/server/TSimpleServer.java b/lib/java/src/server/TSimpleServer.java
index 181d8e4..cb127c7 100644
--- a/lib/java/src/server/TSimpleServer.java
+++ b/lib/java/src/server/TSimpleServer.java
@@ -23,6 +23,8 @@
  */
 public class TSimpleServer extends TServer {
 
+  private boolean stopped_ = false;
+
   public TSimpleServer(TProcessor processor,
                        TServerTransport serverTransport) {
     super(new TProcessorFactory(processor), serverTransport);
@@ -71,6 +73,7 @@
  
   
   public void serve() {
+    stopped_ = false;
     try {
       serverTransport_.listen();
     } catch (TTransportException ttx) {
@@ -78,7 +81,7 @@
       return;
     }
 
-    while (true) {
+    while (!stopped_) {
       TTransport client = null;
       TProcessor processor = null;
       TTransport inputTransport = null;
@@ -98,9 +101,13 @@
       } catch (TTransportException ttx) {
         // Client died, just move on
       } catch (TException tx) {
-        tx.printStackTrace();
+        if (!stopped_) {
+          tx.printStackTrace();
+        }
       } catch (Exception x) {
-        x.printStackTrace();
+        if (!stopped_) {
+          x.printStackTrace();
+        }
       }
 
       if (inputTransport != null) {
@@ -113,4 +120,9 @@
 
     }
   }
+
+  public void stop() {
+    stopped_ = true;
+    serverTransport_.interrupt();
+  }
 }
diff --git a/lib/java/src/server/TThreadPoolServer.java b/lib/java/src/server/TThreadPoolServer.java
index 0945fbe..94eb5a6 100644
--- a/lib/java/src/server/TThreadPoolServer.java
+++ b/lib/java/src/server/TThreadPoolServer.java
@@ -34,10 +34,18 @@
   // Executor service for handling client connections
   private ExecutorService executorService_;
 
+  // Flag for stopping the server
+  private volatile boolean stopped_;
+
+  // Server options
+  private Options options_;
+
   // Customizable server options
   public static class Options {
     public int minWorkerThreads = 5;
     public int maxWorkerThreads = Integer.MAX_VALUE;
+    public int stopTimeoutVal = 60;
+    public TimeUnit stopTimeoutUnit = TimeUnit.SECONDS;
   }
 
   public TThreadPoolServer(TProcessor processor,
@@ -120,6 +128,8 @@
                                               60,
                                               TimeUnit.SECONDS,
                                               executorQueue);
+
+    options_ = options;
   }
 
 
@@ -131,17 +141,33 @@
       return;
     }
 
-    while (true) {
+    stopped_ = false;
+    while (!stopped_) {
       int failureCount = 0;
       try {
         TTransport client = serverTransport_.accept();
         WorkerProcess wp = new WorkerProcess(client);
         executorService_.execute(wp);
       } catch (TTransportException ttx) {
-        ++failureCount;
-        ttx.printStackTrace();
+        if (!stopped_) {
+          ++failureCount;
+          ttx.printStackTrace();
+        }
       }     
     }
+
+    executorService_.shutdown();
+    try {
+      executorService_.awaitTermination(options_.stopTimeoutVal,
+                                        options_.stopTimeoutUnit);
+    } catch (InterruptedException ix) {
+      // Ignore and more on
+    }
+  }
+
+  public void stop() {
+    stopped_ = true;
+    serverTransport_.interrupt();
   }
 
   private class WorkerProcess implements Runnable {
diff --git a/lib/java/src/transport/TServerSocket.java b/lib/java/src/transport/TServerSocket.java
index 9badf1a..cc6d900 100644
--- a/lib/java/src/transport/TServerSocket.java
+++ b/lib/java/src/transport/TServerSocket.java
@@ -112,4 +112,10 @@
     }
   }
 
+  public void interrupt() {
+    // The thread-safeness of this is dubious, but Java documentation suggests
+    // that it is safe to do this from a different thread context
+    close();
+  }
+
 }
diff --git a/lib/java/src/transport/TServerTransport.java b/lib/java/src/transport/TServerTransport.java
index 2ffc8df..872ac3c 100644
--- a/lib/java/src/transport/TServerTransport.java
+++ b/lib/java/src/transport/TServerTransport.java
@@ -26,4 +26,14 @@
   public abstract void close();
 
   protected abstract TTransport acceptImpl() throws TTransportException;
+
+  /**
+   * Optional method implementation. This signals to the server transport
+   * that it should break out of any accept() or listen() that it is currently
+   * blocked on. This method, if implemented, MUST be thread safe, as it may
+   * be called from a different thread context than the other TServerTransport
+   * methods.
+   */
+  public void interrupt() {}
+
 }