Thrift multithreaded Java server
Summary: Ported the Pillar multithreaded Java server to Thrift
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664791 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/java/src/server/TServer.java b/lib/java/src/server/TServer.java
index 38ef81f..702ba69 100644
--- a/lib/java/src/server/TServer.java
+++ b/lib/java/src/server/TServer.java
@@ -24,6 +24,13 @@
protected Options options_;
/**
+ * Default options constructor
+ */
+ protected TServer(TProcessor processor) {
+ this(processor, new Options());
+ }
+
+ /**
* Default constructor, all servers take a processor and some options.
*/
protected TServer(TProcessor processor, Options options) {
diff --git a/lib/java/src/server/TSimpleServer.java b/lib/java/src/server/TSimpleServer.java
index 352a6de..94b739e 100644
--- a/lib/java/src/server/TSimpleServer.java
+++ b/lib/java/src/server/TSimpleServer.java
@@ -16,6 +16,12 @@
private TServerTransport serverTransport_;
public TSimpleServer(TProcessor processor,
+ TServerTransport serverTransport) {
+ this(processor, new TServer.Options(), serverTransport);
+ }
+
+
+ public TSimpleServer(TProcessor processor,
TServer.Options options,
TServerTransport serverTransport) {
super(processor, options);
diff --git a/lib/java/src/server/TThreadPoolServer.java b/lib/java/src/server/TThreadPoolServer.java
new file mode 100644
index 0000000..d19275e
--- /dev/null
+++ b/lib/java/src/server/TThreadPoolServer.java
@@ -0,0 +1,106 @@
+package com.facebook.thrift.server;
+
+import com.facebook.thrift.TException;
+import com.facebook.thrift.TProcessor;
+import com.facebook.thrift.transport.TServerTransport;
+import com.facebook.thrift.transport.TTransport;
+import com.facebook.thrift.transport.TTransportException;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * Server which uses Java's built in ThreadPool management to spawn off
+ * a worker pool that
+ *
+ * @author Mark Slee <mcslee@facebook.com>
+ */
+public class TThreadPoolServer extends TServer {
+
+ // Server transport
+ private TServerTransport serverTransport_;
+
+ // Executor service for handling client connections
+ private ExecutorService executorService_;
+
+ // Customizable server options
+ public static class Options extends TServer.Options {
+ public int port = 9190;
+ public int minWorkerThreads = 5;
+ public int maxWorkerThreads = Integer.MAX_VALUE;
+ }
+
+ public TThreadPoolServer(TProcessor processor,
+ TServerTransport serverTransport) {
+ this(processor, new Options(), serverTransport);
+ }
+
+ public TThreadPoolServer(TProcessor processor,
+ Options options,
+ TServerTransport serverTransport) {
+ super(processor, options);
+ serverTransport_ = serverTransport;
+ executorService_ = null;
+
+ LinkedBlockingQueue<Runnable> executorQueue =
+ new LinkedBlockingQueue<Runnable>();
+
+ executorService_ = new ThreadPoolExecutor(options.minWorkerThreads,
+ options.maxWorkerThreads,
+ 60,
+ TimeUnit.SECONDS,
+ executorQueue);
+ }
+
+
+ public void run() {
+ try {
+ serverTransport_.listen();
+ } catch (TTransportException ttx) {
+ ttx.printStackTrace();
+ return;
+ }
+
+ while (true) {
+ try {
+ TTransport client = serverTransport_.accept();
+ WorkerProcess wp = new WorkerProcess(client);
+ executorService_.execute(wp);
+ } catch (TTransportException ttx) {
+ ttx.printStackTrace();
+ }
+ }
+ }
+
+ private class WorkerProcess implements Runnable {
+
+ /**
+ * Client that this services.
+ */
+ private TTransport client_;
+
+ /**
+ * Default constructor.
+ *
+ * @param client Transport to process
+ */
+ private WorkerProcess(TTransport client) {
+ client_ = client;
+ }
+
+ /**
+ * Loops on processing a client forever
+ */
+ public void run() {
+ try {
+ while (processor_.process(client_, client_)) {}
+ } catch (TException tx) {
+ tx.printStackTrace();
+ }
+ client_.close();
+ }
+ }
+}
diff --git a/lib/java/src/transport/TServerSocket.java b/lib/java/src/transport/TServerSocket.java
index a885fa1..8a8421d 100644
--- a/lib/java/src/transport/TServerSocket.java
+++ b/lib/java/src/transport/TServerSocket.java
@@ -1,6 +1,7 @@
package com.facebook.thrift.transport;
import java.io.IOException;
+import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
@@ -11,12 +12,26 @@
*/
public class TServerSocket extends TServerTransport {
- private ServerSocket serverSocket_;
-
+ private ServerSocket serverSocket_ = null;
+ private int port_ = 0;
+
public TServerSocket(ServerSocket serverSocket) {
serverSocket_ = serverSocket;
}
+ public TServerSocket(int port) throws TTransportException {
+ port_ = port;
+ try {
+ serverSocket_ = new ServerSocket();
+ serverSocket_.setReuseAddress(true);
+ serverSocket_.setSoTimeout(0);
+ serverSocket_.bind(new InetSocketAddress(port_));
+ } catch (IOException ioe) {
+ serverSocket_ = null;
+ throw new TTransportException("Could not create ServerSocket on port " + port + ".");
+ }
+ }
+
public void listen() throws TTransportException {}
protected TSocket acceptImpl() throws TTransportException {
diff --git a/test/java/src/TestServer.java b/test/java/src/TestServer.java
index 8e3e4ed..b527777 100644
--- a/test/java/src/TestServer.java
+++ b/test/java/src/TestServer.java
@@ -5,6 +5,7 @@
import com.facebook.thrift.protocol.TProtocol;
import com.facebook.thrift.server.TServer;
import com.facebook.thrift.server.TSimpleServer;
+import com.facebook.thrift.server.TThreadPoolServer;
import com.facebook.thrift.transport.TServerSocket;
import com.facebook.thrift.transport.TServerTransport;
@@ -243,23 +244,22 @@
ThriftTest.Server testServer =
new ThriftTest.Server(testHandler, binaryProtocol);
- // Options
- TServer.Options serverOptions =
- new TServer.Options();
-
// Transport
- ServerSocket serverSocket =
- new ServerSocket(port);
TServerSocket tServerSocket =
- new TServerSocket(serverSocket);
+ new TServerSocket(port);
- // Server
- TSimpleServer simpleServer =
- new TSimpleServer(testServer, serverOptions, tServerSocket);
+ TServer serverEngine;
+
+ // Simple Server
+ // serverEngine = new TSimpleServer(testServer, tServerSocket);
+
+ // ThreadPool Server
+ serverEngine = new TThreadPoolServer(testServer, tServerSocket);
// Run it
System.out.println("Starting the server on port " + port + "...");
- simpleServer.run();
+ serverEngine.run();
+
} catch (Exception x) {
x.printStackTrace();
}