Thrift multithreaded Java server

Summary: Ported the Pillar multithreaded Java server to Thrift


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664791 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/java/src/server/TServer.java b/lib/java/src/server/TServer.java
index 38ef81f..702ba69 100644
--- a/lib/java/src/server/TServer.java
+++ b/lib/java/src/server/TServer.java
@@ -24,6 +24,13 @@
   protected Options options_;
 
   /**
+   * Default options constructor
+   */
+  protected TServer(TProcessor processor) {
+    this(processor, new Options());
+  }
+
+  /**
    * Default constructor, all servers take a processor and some options.
    */
   protected TServer(TProcessor processor, Options options) {
diff --git a/lib/java/src/server/TSimpleServer.java b/lib/java/src/server/TSimpleServer.java
index 352a6de..94b739e 100644
--- a/lib/java/src/server/TSimpleServer.java
+++ b/lib/java/src/server/TSimpleServer.java
@@ -16,6 +16,12 @@
   private TServerTransport serverTransport_;
 
   public TSimpleServer(TProcessor processor,
+                       TServerTransport serverTransport) {
+    this(processor, new TServer.Options(), serverTransport);
+  }
+
+
+  public TSimpleServer(TProcessor processor,
                        TServer.Options options,
                        TServerTransport serverTransport) {
     super(processor, options);
diff --git a/lib/java/src/server/TThreadPoolServer.java b/lib/java/src/server/TThreadPoolServer.java
new file mode 100644
index 0000000..d19275e
--- /dev/null
+++ b/lib/java/src/server/TThreadPoolServer.java
@@ -0,0 +1,106 @@
+package com.facebook.thrift.server;
+
+import com.facebook.thrift.TException;
+import com.facebook.thrift.TProcessor;
+import com.facebook.thrift.transport.TServerTransport;
+import com.facebook.thrift.transport.TTransport;
+import com.facebook.thrift.transport.TTransportException;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * Server which uses Java's built in ThreadPool management to spawn off
+ * a worker pool that 
+ *
+ * @author Mark Slee <mcslee@facebook.com>
+ */
+public class TThreadPoolServer extends TServer {
+
+  // Server transport
+  private TServerTransport serverTransport_;
+
+  // Executor service for handling client connections
+  private ExecutorService executorService_;
+
+  // Customizable server options
+  public static class Options extends TServer.Options {
+    public int port = 9190;
+    public int minWorkerThreads = 5;
+    public int maxWorkerThreads = Integer.MAX_VALUE;
+  }
+
+  public TThreadPoolServer(TProcessor processor,
+                           TServerTransport serverTransport) {
+    this(processor, new Options(), serverTransport);
+  }
+
+  public TThreadPoolServer(TProcessor processor,
+                           Options options,
+                           TServerTransport serverTransport) {
+    super(processor, options);
+    serverTransport_ = serverTransport;
+    executorService_ = null;
+
+    LinkedBlockingQueue<Runnable> executorQueue =
+      new LinkedBlockingQueue<Runnable>();
+
+    executorService_ = new ThreadPoolExecutor(options.minWorkerThreads,
+                                              options.maxWorkerThreads,
+                                              60,
+                                              TimeUnit.SECONDS,
+                                              executorQueue);
+  }
+
+
+  public void run() {
+    try {
+      serverTransport_.listen();
+    } catch (TTransportException ttx) {
+      ttx.printStackTrace();
+      return;
+    }
+
+    while (true) {
+      try {
+        TTransport client = serverTransport_.accept();
+        WorkerProcess wp = new WorkerProcess(client);
+        executorService_.execute(wp);
+      } catch (TTransportException ttx) {
+        ttx.printStackTrace();
+      }
+    }
+  }
+
+  private class WorkerProcess implements Runnable {
+
+    /**
+     * Client that this services.
+     */
+    private TTransport client_;
+
+    /**
+     * Default constructor.
+     *
+     * @param client Transport to process
+     */
+    private WorkerProcess(TTransport client) {
+      client_ = client;
+    }
+
+    /**
+     * Loops on processing a client forever
+     */
+    public void run() {
+      try {
+        while (processor_.process(client_, client_)) {}
+      } catch (TException tx) {
+        tx.printStackTrace();
+      }
+      client_.close();
+    }
+  }
+}
diff --git a/lib/java/src/transport/TServerSocket.java b/lib/java/src/transport/TServerSocket.java
index a885fa1..8a8421d 100644
--- a/lib/java/src/transport/TServerSocket.java
+++ b/lib/java/src/transport/TServerSocket.java
@@ -1,6 +1,7 @@
 package com.facebook.thrift.transport;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.Socket;
 
@@ -11,12 +12,26 @@
  */
 public class TServerSocket extends TServerTransport {
   
-  private ServerSocket serverSocket_;
-  
+  private ServerSocket serverSocket_ = null;
+  private int port_ = 0;
+
   public TServerSocket(ServerSocket serverSocket) {
     serverSocket_ = serverSocket;
   }
 
+  public TServerSocket(int port) throws TTransportException {
+    port_ = port;
+    try {
+      serverSocket_ = new ServerSocket();
+      serverSocket_.setReuseAddress(true);
+      serverSocket_.setSoTimeout(0);
+      serverSocket_.bind(new InetSocketAddress(port_));
+    } catch (IOException ioe) {
+      serverSocket_ = null;
+      throw new TTransportException("Could not create ServerSocket on port " + port + ".");
+    }
+  }
+
   public void listen() throws TTransportException {}
   
   protected TSocket acceptImpl() throws TTransportException {
diff --git a/test/java/src/TestServer.java b/test/java/src/TestServer.java
index 8e3e4ed..b527777 100644
--- a/test/java/src/TestServer.java
+++ b/test/java/src/TestServer.java
@@ -5,6 +5,7 @@
 import com.facebook.thrift.protocol.TProtocol;
 import com.facebook.thrift.server.TServer;
 import com.facebook.thrift.server.TSimpleServer;
+import com.facebook.thrift.server.TThreadPoolServer;
 import com.facebook.thrift.transport.TServerSocket;
 import com.facebook.thrift.transport.TServerTransport;
 
@@ -243,23 +244,22 @@
       ThriftTest.Server testServer =
         new ThriftTest.Server(testHandler, binaryProtocol);
 
-      // Options
-      TServer.Options serverOptions =
-        new TServer.Options();
-
       // Transport
-      ServerSocket serverSocket =
-        new ServerSocket(port);
       TServerSocket tServerSocket =
-        new TServerSocket(serverSocket);
+        new TServerSocket(port);
 
-      // Server
-      TSimpleServer simpleServer =
-        new TSimpleServer(testServer, serverOptions, tServerSocket);
+      TServer serverEngine;
+
+      // Simple Server
+      // serverEngine = new TSimpleServer(testServer, tServerSocket);
+
+      // ThreadPool Server
+      serverEngine =  new TThreadPoolServer(testServer, tServerSocket);
 
       // Run it
       System.out.println("Starting the server on port " + port + "...");
-      simpleServer.run();
+      serverEngine.run();
+
     } catch (Exception x) {
       x.printStackTrace();
     }