Thrift multithreaded Java server
Summary: Ported the Pillar multithreaded Java server to Thrift
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664791 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/java/src/server/TThreadPoolServer.java b/lib/java/src/server/TThreadPoolServer.java
new file mode 100644
index 0000000..d19275e
--- /dev/null
+++ b/lib/java/src/server/TThreadPoolServer.java
@@ -0,0 +1,106 @@
+package com.facebook.thrift.server;
+
+import com.facebook.thrift.TException;
+import com.facebook.thrift.TProcessor;
+import com.facebook.thrift.transport.TServerTransport;
+import com.facebook.thrift.transport.TTransport;
+import com.facebook.thrift.transport.TTransportException;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * Server which uses Java's built in ThreadPool management to spawn off
+ * a worker pool that
+ *
+ * @author Mark Slee <mcslee@facebook.com>
+ */
+public class TThreadPoolServer extends TServer {
+
+ // Server transport
+ private TServerTransport serverTransport_;
+
+ // Executor service for handling client connections
+ private ExecutorService executorService_;
+
+ // Customizable server options
+ public static class Options extends TServer.Options {
+ public int port = 9190;
+ public int minWorkerThreads = 5;
+ public int maxWorkerThreads = Integer.MAX_VALUE;
+ }
+
+ public TThreadPoolServer(TProcessor processor,
+ TServerTransport serverTransport) {
+ this(processor, new Options(), serverTransport);
+ }
+
+ public TThreadPoolServer(TProcessor processor,
+ Options options,
+ TServerTransport serverTransport) {
+ super(processor, options);
+ serverTransport_ = serverTransport;
+ executorService_ = null;
+
+ LinkedBlockingQueue<Runnable> executorQueue =
+ new LinkedBlockingQueue<Runnable>();
+
+ executorService_ = new ThreadPoolExecutor(options.minWorkerThreads,
+ options.maxWorkerThreads,
+ 60,
+ TimeUnit.SECONDS,
+ executorQueue);
+ }
+
+
+ public void run() {
+ try {
+ serverTransport_.listen();
+ } catch (TTransportException ttx) {
+ ttx.printStackTrace();
+ return;
+ }
+
+ while (true) {
+ try {
+ TTransport client = serverTransport_.accept();
+ WorkerProcess wp = new WorkerProcess(client);
+ executorService_.execute(wp);
+ } catch (TTransportException ttx) {
+ ttx.printStackTrace();
+ }
+ }
+ }
+
+ private class WorkerProcess implements Runnable {
+
+ /**
+ * Client that this services.
+ */
+ private TTransport client_;
+
+ /**
+ * Default constructor.
+ *
+ * @param client Transport to process
+ */
+ private WorkerProcess(TTransport client) {
+ client_ = client;
+ }
+
+ /**
+ * Loops on processing a client forever
+ */
+ public void run() {
+ try {
+ while (processor_.process(client_, client_)) {}
+ } catch (TException tx) {
+ tx.printStackTrace();
+ }
+ client_.close();
+ }
+ }
+}