THRIFT-4252: Close sockets when shut down server

In TThreadPoolServer, threads are blocking in io with open sockets,
as long as clients don't close the connection, server threads are
never stopped even after a shutdown is called on server (because
they are blocked waiting for io).
To be able to stop all server threads properly, server should
proactively close sockets once a shutdown is initiated.
diff --git a/lib/java/src/org/apache/thrift/server/TThreadPoolServer.java b/lib/java/src/org/apache/thrift/server/TThreadPoolServer.java
index 87e8733..e1b6e76 100644
--- a/lib/java/src/org/apache/thrift/server/TThreadPoolServer.java
+++ b/lib/java/src/org/apache/thrift/server/TThreadPoolServer.java
@@ -19,9 +19,8 @@
 
 package org.apache.thrift.server;
 
-import java.util.Arrays;
-import java.util.List;
 import java.util.Random;
+import java.util.WeakHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.SynchronousQueue;
@@ -39,8 +38,7 @@
 
 /**
  * Server which uses Java's built in ThreadPool management to spawn off
- * a worker pool that
- *
+ * a worker pool that deals with client connections in blocking way.
  */
 public class TThreadPoolServer extends TServer {
   private static final Logger LOGGER = LoggerFactory.getLogger(TThreadPoolServer.class.getName());
@@ -109,6 +107,7 @@
 
   // Executor service for handling client connections
   private ExecutorService executorService_;
+  private WeakHashMap<WorkerProcess, Boolean> activeWorkers = new WeakHashMap<>();
 
   private final TimeUnit stopTimeoutUnit;
 
@@ -148,7 +147,7 @@
   protected ExecutorService getExecutorService() {
     return executorService_;
   }
-  
+
   protected boolean preServe() {
   	try {
       serverTransport_.listen();
@@ -163,7 +162,6 @@
     }
     stopped_ = false;
     setServing(true);
-    
     return true;
   }
 
@@ -173,13 +171,14 @@
   	}
 
   	execute();
-  	waitForShutdown();
-    
+  	if (!waitForShutdown()) {
+  	  LOGGER.error("Shutdown is not done after " + stopTimeoutVal + stopTimeoutUnit);
+    }
+
     setServing(false);
   }
-  
+
   protected void execute() {
-    int failureCount = 0;
     while (!stopped_) {
       try {
         TTransport client = serverTransport_.accept();
@@ -190,6 +189,7 @@
         while(true) {
           try {
             executorService_.execute(wp);
+            activeWorkers.put(wp, Boolean.TRUE);
             break;
           } catch(Throwable t) {
             if (t instanceof RejectedExecutionException) {
@@ -226,16 +226,13 @@
         }
       } catch (TTransportException ttx) {
         if (!stopped_) {
-          ++failureCount;
           LOGGER.warn("Transport error occurred during acceptance of message.", ttx);
         }
       }
     }
   }
-  
-  protected void waitForShutdown() {
-  	executorService_.shutdown();
 
+  protected boolean waitForShutdown() {
     // Loop until awaitTermination finally does return without a interrupted
     // exception. If we don't do this, then we'll shut down prematurely. We want
     // to let the executorService clear it's task queue, closing client sockets
@@ -245,18 +242,23 @@
     while (timeoutMS >= 0) {
       try {
         executorService_.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS);
-        break;
+        return true;
       } catch (InterruptedException ix) {
         long newnow = System.currentTimeMillis();
         timeoutMS -= (newnow - now);
         now = newnow;
       }
     }
+    return false;
   }
 
   public void stop() {
     stopped_ = true;
     serverTransport_.interrupt();
+    executorService_.shutdown();
+    for (WorkerProcess wp : activeWorkers.keySet()) {
+      wp.stop();
+    }
   }
 
   private class WorkerProcess implements Runnable {
@@ -355,5 +357,9 @@
       }
       return false;
     }
+
+    private void stop() {
+      client_.close();
+    }
   }
 }
diff --git a/lib/java/test/org/apache/thrift/server/TestThreadPoolServer.java b/lib/java/test/org/apache/thrift/server/TestThreadPoolServer.java
new file mode 100644
index 0000000..e81d801
--- /dev/null
+++ b/lib/java/test/org/apache/thrift/server/TestThreadPoolServer.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.thrift.server;
+
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TServerTransport;
+import org.apache.thrift.transport.TSocket;
+import org.junit.Assert;
+import org.junit.Test;
+import thrift.test.ThriftTest;
+
+import java.util.concurrent.ThreadPoolExecutor;
+
+public class TestThreadPoolServer {
+
+  /**
+   * Test server is shut down properly even with some open clients.
+   */
+  @Test
+  public void testStopServerWithOpenClient() throws Exception {
+    TServerSocket serverSocket = new TServerSocket(0);
+    TThreadPoolServer server = buildServer(serverSocket);
+    Thread serverThread = new Thread(() -> server.serve());
+    serverThread.start();
+    try (TSocket client = new TSocket("localhost", serverSocket.getServerSocket().getLocalPort())) {
+      client.open();
+      Thread.sleep(1000);
+      // There is a thread listening to the client
+      Assert.assertEquals(1, ((ThreadPoolExecutor) server.getExecutorService()).getActiveCount());
+      server.stop();
+      server.waitForShutdown();
+      // After server is stopped, the executor thread pool should be shut down
+      Assert.assertTrue("Server thread pool should be terminated.", server.getExecutorService().isTerminated());
+      Assert.assertTrue("Client is still open.", client.isOpen());
+    }
+  }
+
+  private TThreadPoolServer buildServer(TServerTransport serverSocket) {
+    TThreadPoolServer.Args args = new TThreadPoolServer.Args(serverSocket)
+        .protocolFactory(new TBinaryProtocol.Factory())
+        .processor(new ThriftTest.Processor<>(new ServerTestBase.TestHandler()));
+    return new TThreadPoolServer(args);
+  }
+}