Thrift TTransportFactory model for servers
Summary: Servers need to wrap the transports they accept (for example, in buffered transports) in a user-definable way, so this change uses a factory pattern: the user supplies the server with a TTransportFactory object that defines this wrapping behavior.
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664792 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/java/src/server/TServer.java b/lib/java/src/server/TServer.java
index 702ba69..dd83098 100644
--- a/lib/java/src/server/TServer.java
+++ b/lib/java/src/server/TServer.java
@@ -1,6 +1,9 @@
package com.facebook.thrift.server;
import com.facebook.thrift.TProcessor;
+import com.facebook.thrift.transport.TServerTransport;
+import com.facebook.thrift.transport.TTransportFactory;
+import com.facebook.thrift.transport.TBaseTransportFactory;
/**
* Generic interface for a Thrift server.
@@ -17,24 +20,64 @@
public Options() {}
}
- /** Core processor */
+ /**
+ * Core processor
+ */
protected TProcessor processor_;
- /** Server options */
+ /**
+ * Server options
+ */
protected Options options_;
/**
- * Default options constructor
+ * Server transport
*/
- protected TServer(TProcessor processor) {
- this(processor, new Options());
- }
+ protected TServerTransport serverTransport_;
/**
- * Default constructor, all servers take a processor and some options.
+ * Transport Factory
*/
- protected TServer(TProcessor processor, Options options) {
+ protected TTransportFactory transportFactory_;
+
+ /**
+ * Default constructors.
+ */
+
+ protected TServer(TProcessor processor,
+ TServerTransport serverTransport) {
+ this(processor,
+ serverTransport,
+ new TBaseTransportFactory(),
+ new Options());
+ }
+
+ protected TServer(TProcessor processor,
+ TServerTransport serverTransport,
+ TTransportFactory transportFactory) {
+ this(processor,
+ serverTransport,
+ transportFactory,
+ new Options());
+ }
+
+
+ protected TServer(TProcessor processor,
+ TServerTransport serverTransport,
+ Options options) {
+ this(processor,
+ serverTransport,
+ new TBaseTransportFactory(),
+ options);
+ }
+
+ protected TServer(TProcessor processor,
+ TServerTransport serverTransport,
+ TTransportFactory transportFactory,
+ Options options) {
processor_ = processor;
+ serverTransport_ = serverTransport;
+ transportFactory_ = transportFactory;
options_ = options;
}
diff --git a/lib/java/src/server/TSimpleServer.java b/lib/java/src/server/TSimpleServer.java
index 94b739e..76a5762 100644
--- a/lib/java/src/server/TSimpleServer.java
+++ b/lib/java/src/server/TSimpleServer.java
@@ -13,19 +13,9 @@
*/
public class TSimpleServer extends TServer {
- private TServerTransport serverTransport_;
-
public TSimpleServer(TProcessor processor,
TServerTransport serverTransport) {
- this(processor, new TServer.Options(), serverTransport);
- }
-
-
- public TSimpleServer(TProcessor processor,
- TServer.Options options,
- TServerTransport serverTransport) {
- super(processor, options);
- serverTransport_ = serverTransport;
+ super(processor, serverTransport);
}
public void run() {
@@ -38,18 +28,24 @@
while (true) {
TTransport client = null;
+ TTransport[] io = null;
try {
client = serverTransport_.accept();
if (client != null) {
- while (processor_.process(client, client));
+ io = transportFactory_.getIOTransports(client);
+ while (processor_.process(io[0], io[1]));
}
} catch (TException tx) {
tx.printStackTrace();
}
- if (client != null) {
- client.close();
- client = null;
+ if (io != null) {
+ if (io[0] != null) {
+ io[0].close();
+ }
+ if (io[1] != null) {
+ io[1].close();
+ }
}
}
}
diff --git a/lib/java/src/server/TThreadPoolServer.java b/lib/java/src/server/TThreadPoolServer.java
index d19275e..2f5be8d 100644
--- a/lib/java/src/server/TThreadPoolServer.java
+++ b/lib/java/src/server/TThreadPoolServer.java
@@ -5,6 +5,8 @@
import com.facebook.thrift.transport.TServerTransport;
import com.facebook.thrift.transport.TTransport;
import com.facebook.thrift.transport.TTransportException;
+import com.facebook.thrift.transport.TTransportFactory;
+import com.facebook.thrift.transport.TBaseTransportFactory;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
@@ -20,28 +22,28 @@
*/
public class TThreadPoolServer extends TServer {
- // Server transport
- private TServerTransport serverTransport_;
-
// Executor service for handling client connections
private ExecutorService executorService_;
// Customizable server options
public static class Options extends TServer.Options {
- public int port = 9190;
public int minWorkerThreads = 5;
public int maxWorkerThreads = Integer.MAX_VALUE;
}
public TThreadPoolServer(TProcessor processor,
TServerTransport serverTransport) {
- this(processor, new Options(), serverTransport);
+ this(processor,
+ serverTransport,
+ new TBaseTransportFactory(),
+ new Options());
}
-
+
public TThreadPoolServer(TProcessor processor,
- Options options,
- TServerTransport serverTransport) {
- super(processor, options);
+ TServerTransport serverTransport,
+ TTransportFactory transportFactory,
+ Options options) {
+ super(processor, serverTransport, transportFactory, options);
serverTransport_ = serverTransport;
executorService_ = null;
@@ -95,12 +97,22 @@
* Loops on processing a client forever
*/
public void run() {
+ TTransport[] io = null;
try {
- while (processor_.process(client_, client_)) {}
+ io = transportFactory_.getIOTransports(client_);
+ while (processor_.process(io[0], io[1])) {}
} catch (TException tx) {
tx.printStackTrace();
}
- client_.close();
+
+ if (io != null) {
+ if (io[0] != null) {
+ io[0].close();
+ }
+ if (io[1] != null) {
+ io[1].close();
+ }
+ }
}
}
}