THRIFT-745. java: Make it easier to instantiate servers
This patch replaces the multitude of constructors with builder-esque Args objects for each server and single constructor.
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@1026482 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/java/src/org/apache/thrift/server/THsHaServer.java b/lib/java/src/org/apache/thrift/server/THsHaServer.java
index 207d8e8..f3dfd0a 100644
--- a/lib/java/src/org/apache/thrift/server/THsHaServer.java
+++ b/lib/java/src/org/apache/thrift/server/THsHaServer.java
@@ -26,11 +26,6 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import org.apache.thrift.TProcessor;
-import org.apache.thrift.TProcessorFactory;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TProtocolFactory;
-import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TNonblockingServerTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,185 +38,65 @@
private static final Logger LOGGER =
LoggerFactory.getLogger(THsHaServer.class.getName());
+ public static class Args extends AbstractNonblockingServerArgs<Args> {
+ private int workerThreads = 5;
+ private int stopTimeoutVal = 60;
+ private TimeUnit stopTimeoutUnit = TimeUnit.SECONDS;
+ private ExecutorService executorService = null;
+
+ public Args(TNonblockingServerTransport transport) {
+ super(transport);
+ }
+
+ public Args workerThreads(int i) {
+ workerThreads = i;
+ return this;
+ }
+
+ public int getWorkerThreads() {
+ return workerThreads;
+ }
+
+ public int getStopTimeoutVal() {
+ return stopTimeoutVal;
+ }
+
+ public Args stopTimeoutVal(int stopTimeoutVal) {
+ this.stopTimeoutVal = stopTimeoutVal;
+ return this;
+ }
+
+ public TimeUnit getStopTimeoutUnit() {
+ return stopTimeoutUnit;
+ }
+
+ public Args stopTimeoutUnit(TimeUnit stopTimeoutUnit) {
+ this.stopTimeoutUnit = stopTimeoutUnit;
+ return this;
+ }
+
+ public ExecutorService getExecutorService() {
+ return executorService;
+ }
+
+ public Args executorService(ExecutorService executorService) {
+ this.executorService = executorService;
+ return this;
+ }
+ }
+
// This wraps all the functionality of queueing and thread pool management
// for the passing of Invocations from the Selector to workers.
private ExecutorService invoker;
/**
- * Create server with given processor, and server transport. Default server
- * options, TBinaryProtocol for the protocol, and TFramedTransport.Factory on
- * both input and output transports. A TProcessorFactory will be created that
- * always returns the specified processor.
- */
- public THsHaServer( TProcessor processor,
- TNonblockingServerTransport serverTransport) {
- this(processor, serverTransport, new Options());
- }
-
- /**
- * Create server with given processor, server transport, and server options
- * using TBinaryProtocol for the protocol, and TFramedTransport.Factory on
- * both input and output transports. A TProcessorFactory will be created that
- * always returns the specified processor.
- */
- public THsHaServer( TProcessor processor,
- TNonblockingServerTransport serverTransport,
- Options options) {
- this(new TProcessorFactory(processor), serverTransport, options);
- }
-
- /**
- * Create server with specified processor factory and server transport. Uses
- * default options. TBinaryProtocol is assumed. TFramedTransport.Factory is
- * used on both input and output transports.
- */
- public THsHaServer( TProcessorFactory processorFactory,
- TNonblockingServerTransport serverTransport) {
- this(processorFactory, serverTransport, new Options());
- }
-
- /**
- * Create server with specified processor factory, server transport, and server
- * options. TBinaryProtocol is assumed. TFramedTransport.Factory is used on
- * both input and output transports.
- */
- public THsHaServer( TProcessorFactory processorFactory,
- TNonblockingServerTransport serverTransport,
- Options options) {
- this(processorFactory, serverTransport, new TFramedTransport.Factory(),
- new TBinaryProtocol.Factory(), options);
- }
-
- /**
- * Server with specified processor, server transport, and in/out protocol
- * factory. Defaults will be used for in/out transport factory and server
- * options.
- */
- public THsHaServer( TProcessor processor,
- TNonblockingServerTransport serverTransport,
- TProtocolFactory protocolFactory) {
- this(processor, serverTransport, protocolFactory, new Options());
- }
-
- /**
- * Server with specified processor, server transport, and in/out protocol
- * factory. Defaults will be used for in/out transport factory and server
- * options.
- */
- public THsHaServer( TProcessor processor,
- TNonblockingServerTransport serverTransport,
- TProtocolFactory protocolFactory,
- Options options) {
- this(new TProcessorFactory(processor), serverTransport,
- new TFramedTransport.Factory(),
- protocolFactory, protocolFactory,
- options);
- }
-
- /**
- * Create server with specified processor, server transport, in/out
- * transport factory, in/out protocol factory, and default server options. A
- * processor factory will be created that always returns the specified
- * processor.
- */
- public THsHaServer( TProcessor processor,
- TNonblockingServerTransport serverTransport,
- TFramedTransport.Factory transportFactory,
- TProtocolFactory protocolFactory) {
- this(new TProcessorFactory(processor), serverTransport,
- transportFactory, protocolFactory);
- }
-
- /**
- * Create server with specified processor factory, server transport, in/out
- * transport factory, in/out protocol factory, and default server options.
- */
- public THsHaServer( TProcessorFactory processorFactory,
- TNonblockingServerTransport serverTransport,
- TFramedTransport.Factory transportFactory,
- TProtocolFactory protocolFactory) {
- this(processorFactory, serverTransport,
- transportFactory,
- protocolFactory, protocolFactory, new Options());
- }
-
- /**
- * Create server with specified processor factory, server transport, in/out
- * transport factory, in/out protocol factory, and server options.
- */
- public THsHaServer( TProcessorFactory processorFactory,
- TNonblockingServerTransport serverTransport,
- TFramedTransport.Factory transportFactory,
- TProtocolFactory protocolFactory,
- Options options) {
- this(processorFactory, serverTransport,
- transportFactory,
- protocolFactory, protocolFactory,
- options);
- }
-
- /**
- * Create server with everything specified, except use default server options.
- */
- public THsHaServer( TProcessor processor,
- TNonblockingServerTransport serverTransport,
- TFramedTransport.Factory outputTransportFactory,
- TProtocolFactory inputProtocolFactory,
- TProtocolFactory outputProtocolFactory) {
- this(new TProcessorFactory(processor), serverTransport,
- outputTransportFactory,
- inputProtocolFactory, outputProtocolFactory);
- }
-
- /**
- * Create server with everything specified, except use default server options.
- */
- public THsHaServer( TProcessorFactory processorFactory,
- TNonblockingServerTransport serverTransport,
- TFramedTransport.Factory outputTransportFactory,
- TProtocolFactory inputProtocolFactory,
- TProtocolFactory outputProtocolFactory)
- {
- this(processorFactory, serverTransport,
- outputTransportFactory,
- inputProtocolFactory, outputProtocolFactory, new Options());
- }
-
- /**
- * Create server with every option fully specified, with an internally managed
- * ExecutorService
- */
- public THsHaServer( TProcessorFactory processorFactory,
- TNonblockingServerTransport serverTransport,
- TFramedTransport.Factory outputTransportFactory,
- TProtocolFactory inputProtocolFactory,
- TProtocolFactory outputProtocolFactory,
- Options options)
- {
- this(processorFactory, serverTransport,
- outputTransportFactory,
- inputProtocolFactory, outputProtocolFactory,
- createInvokerPool(options),
- options);
- }
-
- /**
* Create server with every option fully specified, and with an injected
* ExecutorService
*/
- public THsHaServer( TProcessorFactory processorFactory,
- TNonblockingServerTransport serverTransport,
- TFramedTransport.Factory outputTransportFactory,
- TProtocolFactory inputProtocolFactory,
- TProtocolFactory outputProtocolFactory,
- ExecutorService executor,
- TNonblockingServer.Options options) {
- super(processorFactory, serverTransport,
- outputTransportFactory,
- inputProtocolFactory, outputProtocolFactory,
- options);
+ public THsHaServer(Args args) {
+ super(args);
- invoker = executor;
+ invoker = args.executorService == null ? createInvokerPool(args) : args.executorService;
}
/** @inheritDoc */
@@ -255,7 +130,7 @@
/**
* Helper to create an invoker pool
*/
- protected static ExecutorService createInvokerPool(Options options) {
+ protected static ExecutorService createInvokerPool(Args options) {
int workerThreads = options.workerThreads;
int stopTimeoutVal = options.stopTimeoutVal;
TimeUnit stopTimeoutUnit = options.stopTimeoutUnit;
@@ -326,10 +201,4 @@
frameBuffer.invoke();
}
}
-
- public static class Options extends TNonblockingServer.Options {
- public int workerThreads = 5;
- public int stopTimeoutVal = 60;
- public TimeUnit stopTimeoutUnit = TimeUnit.SECONDS;
- }
}
diff --git a/lib/java/src/org/apache/thrift/server/TNonblockingServer.java b/lib/java/src/org/apache/thrift/server/TNonblockingServer.java
index 0d60f71..3587bf7 100644
--- a/lib/java/src/org/apache/thrift/server/TNonblockingServer.java
+++ b/lib/java/src/org/apache/thrift/server/TNonblockingServer.java
@@ -31,11 +31,7 @@
import org.apache.thrift.TByteArrayOutputStream;
import org.apache.thrift.TException;
-import org.apache.thrift.TProcessor;
-import org.apache.thrift.TProcessorFactory;
-import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TIOStreamTransport;
import org.apache.thrift.transport.TMemoryInputTransport;
@@ -61,6 +57,21 @@
private static final Logger LOGGER =
LoggerFactory.getLogger(TNonblockingServer.class.getName());
+ public static class Args extends AbstractNonblockingServerArgs<Args> {
+ public Args(TNonblockingServerTransport transport) {
+ super(transport);
+ }
+ }
+
+ public static abstract class AbstractNonblockingServerArgs<T extends AbstractNonblockingServerArgs<T>> extends AbstractServerArgs<T> {
+ public long maxReadBufferBytes = Long.MAX_VALUE;
+
+ public AbstractNonblockingServerArgs(TNonblockingServerTransport transport) {
+ super(transport);
+ transportFactory(new TFramedTransport.Factory());
+ }
+ }
+
// Flag for stopping the server
private volatile boolean stopped_ = true;
@@ -73,95 +84,14 @@
*/
private final long MAX_READ_BUFFER_BYTES;
- protected final Options options_;
-
/**
* How many bytes are currently allocated to read buffers.
*/
private long readBufferBytesAllocated = 0;
- /**
- * Create server with given processor and server transport, using
- * TBinaryProtocol for the protocol, TFramedTransport.Factory on both input
- * and output transports. A TProcessorFactory will be created that always
- * returns the specified processor.
- */
- public TNonblockingServer(TProcessor processor,
- TNonblockingServerTransport serverTransport) {
- this(new TProcessorFactory(processor), serverTransport);
- }
-
- /**
- * Create server with specified processor factory and server transport.
- * TBinaryProtocol is assumed. TFramedTransport.Factory is used on both input
- * and output transports.
- */
- public TNonblockingServer(TProcessorFactory processorFactory,
- TNonblockingServerTransport serverTransport) {
- this(processorFactory, serverTransport,
- new TFramedTransport.Factory(),
- new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory());
- }
-
- public TNonblockingServer(TProcessor processor,
- TNonblockingServerTransport serverTransport,
- TProtocolFactory protocolFactory) {
- this(processor, serverTransport,
- new TFramedTransport.Factory(),
- protocolFactory, protocolFactory);
- }
-
- public TNonblockingServer(TProcessor processor,
- TNonblockingServerTransport serverTransport,
- TFramedTransport.Factory transportFactory,
- TProtocolFactory protocolFactory) {
- this(processor, serverTransport,
- transportFactory,
- protocolFactory, protocolFactory);
- }
-
- public TNonblockingServer(TProcessorFactory processorFactory,
- TNonblockingServerTransport serverTransport,
- TFramedTransport.Factory transportFactory,
- TProtocolFactory protocolFactory) {
- this(processorFactory, serverTransport,
- transportFactory,
- protocolFactory, protocolFactory);
- }
-
- public TNonblockingServer(TProcessor processor,
- TNonblockingServerTransport serverTransport,
- TFramedTransport.Factory outputTransportFactory,
- TProtocolFactory inputProtocolFactory,
- TProtocolFactory outputProtocolFactory) {
- this(new TProcessorFactory(processor), serverTransport,
- outputTransportFactory,
- inputProtocolFactory, outputProtocolFactory);
- }
-
- public TNonblockingServer(TProcessorFactory processorFactory,
- TNonblockingServerTransport serverTransport,
- TFramedTransport.Factory outputTransportFactory,
- TProtocolFactory inputProtocolFactory,
- TProtocolFactory outputProtocolFactory) {
- this(processorFactory, serverTransport,
- outputTransportFactory,
- inputProtocolFactory, outputProtocolFactory,
- new Options());
- }
-
- public TNonblockingServer(TProcessorFactory processorFactory,
- TNonblockingServerTransport serverTransport,
- TFramedTransport.Factory outputTransportFactory,
- TProtocolFactory inputProtocolFactory,
- TProtocolFactory outputProtocolFactory,
- Options options) {
- super(processorFactory, serverTransport,
- null, outputTransportFactory,
- inputProtocolFactory, outputProtocolFactory);
- options_ = options;
- options_.validate();
- MAX_READ_BUFFER_BYTES = options.maxReadBufferBytes;
+ public TNonblockingServer(AbstractNonblockingServerArgs args) {
+ super(args);
+ MAX_READ_BUFFER_BYTES = args.maxReadBufferBytes;
}
/**
@@ -772,17 +702,4 @@
}
}
} // FrameBuffer
-
-
- public static class Options {
- public long maxReadBufferBytes = Long.MAX_VALUE;
-
- public Options() {}
-
- public void validate() {
- if (maxReadBufferBytes <= 1024) {
- throw new IllegalArgumentException("You must allocate at least 1KB to the read buffer.");
- }
- }
- }
}
diff --git a/lib/java/src/org/apache/thrift/server/TServer.java b/lib/java/src/org/apache/thrift/server/TServer.java
index 34093be..0af66d3 100644
--- a/lib/java/src/org/apache/thrift/server/TServer.java
+++ b/lib/java/src/org/apache/thrift/server/TServer.java
@@ -19,6 +19,7 @@
package org.apache.thrift.server;
+import org.apache.thrift.TProcessor;
import org.apache.thrift.TProcessorFactory;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocolFactory;
@@ -31,6 +32,67 @@
*/
public abstract class TServer {
+ public static class Args extends AbstractServerArgs<Args> {
+ public Args(TServerTransport transport) {
+ super(transport);
+ }
+ }
+
+ public static abstract class AbstractServerArgs<T extends AbstractServerArgs<T>> {
+ final TServerTransport serverTransport;
+ TProcessorFactory processorFactory;
+ TTransportFactory inputTransportFactory = new TTransportFactory();
+ TTransportFactory outputTransportFactory = new TTransportFactory();
+ TProtocolFactory inputProtocolFactory = new TBinaryProtocol.Factory();
+ TProtocolFactory outputProtocolFactory = new TBinaryProtocol.Factory();
+
+ public AbstractServerArgs(TServerTransport transport) {
+ serverTransport = transport;
+ }
+
+ public T processorFactory(TProcessorFactory factory) {
+ this.processorFactory = factory;
+ return (T) this;
+ }
+
+ public T processor(TProcessor processor) {
+ this.processorFactory = new TProcessorFactory(processor);
+ return (T) this;
+ }
+
+ public T transportFactory(TTransportFactory factory) {
+ this.inputTransportFactory = factory;
+ this.outputTransportFactory = factory;
+ return (T) this;
+ }
+
+ public T inputTransportFactory(TTransportFactory factory) {
+ this.inputTransportFactory = factory;
+ return (T) this;
+ }
+
+ public T outputTransportFactory(TTransportFactory factory) {
+ this.outputTransportFactory = factory;
+ return (T) this;
+ }
+
+ public T protocolFactory(TProtocolFactory factory) {
+ this.inputProtocolFactory = factory;
+ this.outputProtocolFactory = factory;
+ return (T) this;
+ }
+
+ public T inputProtocolFactory(TProtocolFactory factory) {
+ this.inputProtocolFactory = factory;
+ return (T) this;
+ }
+
+ public T outputProtocolFactory(TProtocolFactory factory) {
+ this.outputProtocolFactory = factory;
+ return (T) this;
+ }
+ }
+
/**
* Core processor
*/
@@ -63,55 +125,13 @@
private boolean isServing;
- /**
- * Default constructors.
- */
-
- protected TServer(TProcessorFactory processorFactory,
- TServerTransport serverTransport) {
- this(processorFactory,
- serverTransport,
- new TTransportFactory(),
- new TTransportFactory(),
- new TBinaryProtocol.Factory(),
- new TBinaryProtocol.Factory());
- }
-
- protected TServer(TProcessorFactory processorFactory,
- TServerTransport serverTransport,
- TTransportFactory transportFactory) {
- this(processorFactory,
- serverTransport,
- transportFactory,
- transportFactory,
- new TBinaryProtocol.Factory(),
- new TBinaryProtocol.Factory());
- }
-
- protected TServer(TProcessorFactory processorFactory,
- TServerTransport serverTransport,
- TTransportFactory transportFactory,
- TProtocolFactory protocolFactory) {
- this(processorFactory,
- serverTransport,
- transportFactory,
- transportFactory,
- protocolFactory,
- protocolFactory);
- }
-
- protected TServer(TProcessorFactory processorFactory,
- TServerTransport serverTransport,
- TTransportFactory inputTransportFactory,
- TTransportFactory outputTransportFactory,
- TProtocolFactory inputProtocolFactory,
- TProtocolFactory outputProtocolFactory) {
- processorFactory_ = processorFactory;
- serverTransport_ = serverTransport;
- inputTransportFactory_ = inputTransportFactory;
- outputTransportFactory_ = outputTransportFactory;
- inputProtocolFactory_ = inputProtocolFactory;
- outputProtocolFactory_ = outputProtocolFactory;
+ protected TServer(AbstractServerArgs args) {
+ processorFactory_ = args.processorFactory;
+ serverTransport_ = args.serverTransport;
+ inputTransportFactory_ = args.inputTransportFactory;
+ outputTransportFactory_ = args.outputTransportFactory;
+ inputProtocolFactory_ = args.inputProtocolFactory;
+ outputProtocolFactory_ = args.outputProtocolFactory;
}
/**
diff --git a/lib/java/src/org/apache/thrift/server/TSimpleServer.java b/lib/java/src/org/apache/thrift/server/TSimpleServer.java
index 97ba0ad..ef1b10a 100644
--- a/lib/java/src/org/apache/thrift/server/TSimpleServer.java
+++ b/lib/java/src/org/apache/thrift/server/TSimpleServer.java
@@ -21,13 +21,9 @@
import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
-import org.apache.thrift.TProcessorFactory;
import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.protocol.TProtocolFactory;
-import org.apache.thrift.transport.TServerTransport;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
-import org.apache.thrift.transport.TTransportFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,53 +37,10 @@
private boolean stopped_ = false;
- public TSimpleServer(TProcessor processor,
- TServerTransport serverTransport) {
- super(new TProcessorFactory(processor), serverTransport);
+ public TSimpleServer(AbstractServerArgs args) {
+ super(args);
}
- public TSimpleServer(TProcessor processor,
- TServerTransport serverTransport,
- TTransportFactory transportFactory,
- TProtocolFactory protocolFactory) {
- super(new TProcessorFactory(processor), serverTransport, transportFactory, protocolFactory);
- }
-
- public TSimpleServer(TProcessor processor,
- TServerTransport serverTransport,
- TTransportFactory inputTransportFactory,
- TTransportFactory outputTransportFactory,
- TProtocolFactory inputProtocolFactory,
- TProtocolFactory outputProtocolFactory) {
- super(new TProcessorFactory(processor), serverTransport,
- inputTransportFactory, outputTransportFactory,
- inputProtocolFactory, outputProtocolFactory);
- }
-
- public TSimpleServer(TProcessorFactory processorFactory,
- TServerTransport serverTransport) {
- super(processorFactory, serverTransport);
- }
-
- public TSimpleServer(TProcessorFactory processorFactory,
- TServerTransport serverTransport,
- TTransportFactory transportFactory,
- TProtocolFactory protocolFactory) {
- super(processorFactory, serverTransport, transportFactory, protocolFactory);
- }
-
- public TSimpleServer(TProcessorFactory processorFactory,
- TServerTransport serverTransport,
- TTransportFactory inputTransportFactory,
- TTransportFactory outputTransportFactory,
- TProtocolFactory inputProtocolFactory,
- TProtocolFactory outputProtocolFactory) {
- super(processorFactory, serverTransport,
- inputTransportFactory, outputTransportFactory,
- inputProtocolFactory, outputProtocolFactory);
- }
-
-
public void serve() {
stopped_ = false;
try {
diff --git a/lib/java/src/org/apache/thrift/server/TThreadPoolServer.java b/lib/java/src/org/apache/thrift/server/TThreadPoolServer.java
index 6af2208..85537bf 100644
--- a/lib/java/src/org/apache/thrift/server/TThreadPoolServer.java
+++ b/lib/java/src/org/apache/thrift/server/TThreadPoolServer.java
@@ -20,21 +20,16 @@
package org.apache.thrift.server;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
-import org.apache.thrift.TProcessorFactory;
-import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TServerTransport;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
-import org.apache.thrift.transport.TTransportFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,126 +40,53 @@
*
*/
public class TThreadPoolServer extends TServer {
-
private static final Logger LOGGER = LoggerFactory.getLogger(TThreadPoolServer.class.getName());
+ public static class Args extends AbstractServerArgs<Args> {
+ public int minWorkerThreads = 5;
+ public int maxWorkerThreads = Integer.MAX_VALUE;
+ public int stopTimeoutVal = 60;
+ public TimeUnit stopTimeoutUnit = TimeUnit.SECONDS;
+
+ public Args(TServerTransport transport) {
+ super(transport);
+ }
+
+ public Args minWorkerThreads(int n) {
+ minWorkerThreads = n;
+ return this;
+ }
+
+ public Args maxWorkerThreads(int n) {
+ maxWorkerThreads = n;
+ return this;
+ }
+ }
+
// Executor service for handling client connections
private ExecutorService executorService_;
// Flag for stopping the server
private volatile boolean stopped_;
- // Server options
- private Options options_;
+ private final TimeUnit stopTimeoutUnit;
- // Customizable server options
- public static class Options {
- public int minWorkerThreads = 5;
- public int maxWorkerThreads = Integer.MAX_VALUE;
- public int stopTimeoutVal = 60;
- public TimeUnit stopTimeoutUnit = TimeUnit.SECONDS;
- }
+ private final long stopTimeoutVal;
- public TThreadPoolServer(TProcessor processor,
- TServerTransport serverTransport) {
- this(processor, serverTransport,
- new TTransportFactory(), new TTransportFactory(),
- new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory());
- }
-
- public TThreadPoolServer(TProcessorFactory processorFactory,
- TServerTransport serverTransport) {
- this(processorFactory, serverTransport,
- new TTransportFactory(), new TTransportFactory(),
- new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory());
- }
-
- public TThreadPoolServer(TProcessor processor,
- TServerTransport serverTransport,
- TProtocolFactory protocolFactory) {
- this(processor, serverTransport,
- new TTransportFactory(), new TTransportFactory(),
- protocolFactory, protocolFactory);
- }
-
- public TThreadPoolServer(TProcessor processor,
- TServerTransport serverTransport,
- TTransportFactory transportFactory,
- TProtocolFactory protocolFactory) {
- this(processor, serverTransport,
- transportFactory, transportFactory,
- protocolFactory, protocolFactory);
- }
-
- public TThreadPoolServer(TProcessorFactory processorFactory,
- TServerTransport serverTransport,
- TTransportFactory transportFactory,
- TProtocolFactory protocolFactory) {
- this(processorFactory, serverTransport,
- transportFactory, transportFactory,
- protocolFactory, protocolFactory);
- }
-
- public TThreadPoolServer(TProcessor processor,
- TServerTransport serverTransport,
- TTransportFactory inputTransportFactory,
- TTransportFactory outputTransportFactory,
- TProtocolFactory inputProtocolFactory,
- TProtocolFactory outputProtocolFactory) {
- this(new TProcessorFactory(processor), serverTransport,
- inputTransportFactory, outputTransportFactory,
- inputProtocolFactory, outputProtocolFactory);
- }
-
- public TThreadPoolServer(TProcessorFactory processorFactory,
- TServerTransport serverTransport,
- TTransportFactory inputTransportFactory,
- TTransportFactory outputTransportFactory,
- TProtocolFactory inputProtocolFactory,
- TProtocolFactory outputProtocolFactory) {
- super(processorFactory, serverTransport,
- inputTransportFactory, outputTransportFactory,
- inputProtocolFactory, outputProtocolFactory);
- options_ = new Options();
- executorService_ = Executors.newCachedThreadPool();
- }
-
- public TThreadPoolServer(TProcessor processor,
- TServerTransport serverTransport,
- TTransportFactory inputTransportFactory,
- TTransportFactory outputTransportFactory,
- TProtocolFactory inputProtocolFactory,
- TProtocolFactory outputProtocolFactory,
- Options options) {
- this(new TProcessorFactory(processor), serverTransport,
- inputTransportFactory, outputTransportFactory,
- inputProtocolFactory, outputProtocolFactory,
- options);
- }
-
- public TThreadPoolServer(TProcessorFactory processorFactory,
- TServerTransport serverTransport,
- TTransportFactory inputTransportFactory,
- TTransportFactory outputTransportFactory,
- TProtocolFactory inputProtocolFactory,
- TProtocolFactory outputProtocolFactory,
- Options options) {
- super(processorFactory, serverTransport,
- inputTransportFactory, outputTransportFactory,
- inputProtocolFactory, outputProtocolFactory);
-
- executorService_ = null;
+ public TThreadPoolServer(Args args) {
+ super(args);
SynchronousQueue<Runnable> executorQueue =
new SynchronousQueue<Runnable>();
- executorService_ = new ThreadPoolExecutor(options.minWorkerThreads,
- options.maxWorkerThreads,
+ stopTimeoutUnit = args.stopTimeoutUnit;
+ stopTimeoutVal = args.stopTimeoutVal;
+
+ executorService_ = new ThreadPoolExecutor(args.minWorkerThreads,
+ args.maxWorkerThreads,
60,
TimeUnit.SECONDS,
executorQueue);
-
- options_ = options;
}
@@ -198,7 +120,7 @@
// exception. If we don't do this, then we'll shut down prematurely. We want
// to let the executorService clear it's task queue, closing client sockets
// appropriately.
- long timeoutMS = options_.stopTimeoutUnit.toMillis(options_.stopTimeoutVal);
+ long timeoutMS = stopTimeoutUnit.toMillis(stopTimeoutVal);
long now = System.currentTimeMillis();
while (timeoutMS >= 0) {
try {
diff --git a/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java b/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java
index 72a57bc..d88b8a5 100644
--- a/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java
+++ b/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java
@@ -34,6 +34,7 @@
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.ServerTestBase;
import org.apache.thrift.server.THsHaServer;
+import org.apache.thrift.server.THsHaServer.Args;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TNonblockingSocket;
@@ -46,13 +47,13 @@
import thrift.test.Srv.AsyncClient.voidMethod_call;
public class TestTAsyncClientManager extends TestCase {
-
+
private THsHaServer server_;
private Thread serverThread_;
private TAsyncClientManager clientManager_;
-
+
public void setUp() throws Exception {
- server_ = new THsHaServer(new Srv.Processor(new SrvHandler()), new TNonblockingServerSocket(ServerTestBase.PORT));
+ server_ = new THsHaServer(new Args(new TNonblockingServerSocket(ServerTestBase.PORT)).processor(new Srv.Processor(new SrvHandler())));
serverThread_ = new Thread(new Runnable() {
public void run() {
server_.serve();
@@ -62,18 +63,18 @@
clientManager_ = new TAsyncClientManager();
Thread.sleep(500);
}
-
+
public void tearDown() throws Exception {
server_.stop();
clientManager_.stop();
serverThread_.join();
}
-
+
public void testBasicCall() throws Exception {
Srv.AsyncClient client = getClient();
basicCall(client);
}
-
+
public void testBasicCallWithTimeout() throws Exception {
Srv.AsyncClient client = getClient();
client.setTimeout(5000);
diff --git a/lib/java/test/org/apache/thrift/server/TestHsHaServer.java b/lib/java/test/org/apache/thrift/server/TestHsHaServer.java
index f80560b..6638a33 100644
--- a/lib/java/test/org/apache/thrift/server/TestHsHaServer.java
+++ b/lib/java/test/org/apache/thrift/server/TestHsHaServer.java
@@ -20,10 +20,11 @@
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.server.THsHaServer.Args;
import org.apache.thrift.transport.TNonblockingServerSocket;
public class TestHsHaServer extends TestNonblockingServer {
protected TServer getServer(TProcessor processor, TNonblockingServerSocket socket, TProtocolFactory protoFactory) {
- return new THsHaServer(processor, socket, protoFactory);
+ return new THsHaServer(new Args(socket).processor(processor).protocolFactory(protoFactory));
}
}
diff --git a/lib/java/test/org/apache/thrift/server/TestNonblockingServer.java b/lib/java/test/org/apache/thrift/server/TestNonblockingServer.java
index e202435..52b62c3 100644
--- a/lib/java/test/org/apache/thrift/server/TestNonblockingServer.java
+++ b/lib/java/test/org/apache/thrift/server/TestNonblockingServer.java
@@ -21,6 +21,7 @@
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.server.TNonblockingServer.Args;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TTransport;
@@ -31,7 +32,7 @@
private TServer server;
protected TServer getServer(TProcessor processor, TNonblockingServerSocket socket, TProtocolFactory protoFactory) {
- return new TNonblockingServer(processor, socket, protoFactory);
+ return new TNonblockingServer(new Args(socket).processor(processor).protocolFactory(protoFactory));
}
@Override
diff --git a/lib/java/test/org/apache/thrift/transport/TestTSSLTransportFactory.java b/lib/java/test/org/apache/thrift/transport/TestTSSLTransportFactory.java
index 6066f00..4bba451 100644
--- a/lib/java/test/org/apache/thrift/transport/TestTSSLTransportFactory.java
+++ b/lib/java/test/org/apache/thrift/transport/TestTSSLTransportFactory.java
@@ -28,6 +28,7 @@
import org.apache.thrift.server.ServerTestBase;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TSimpleServer;
+import org.apache.thrift.server.TServer.Args;
public class TestTSSLTransportFactory extends ServerTestBase {
private Thread serverThread;
@@ -52,7 +53,7 @@
public void run() {
try {
TServerTransport serverTransport = TSSLTransportFactory.getServerSocket(PORT);
- server = new TSimpleServer(processor, serverTransport);
+ server = new TSimpleServer(new Args(serverTransport).processor(processor));
server.serve();
} catch (TTransportException e) {
e.printStackTrace();
diff --git a/lib/java/test/org/apache/thrift/transport/TestTSaslTransports.java b/lib/java/test/org/apache/thrift/transport/TestTSaslTransports.java
index 10da6d1..dfd087f 100644
--- a/lib/java/test/org/apache/thrift/transport/TestTSaslTransports.java
+++ b/lib/java/test/org/apache/thrift/transport/TestTSaslTransports.java
@@ -44,6 +44,7 @@
import org.apache.thrift.server.ServerTestBase;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TSimpleServer;
+import org.apache.thrift.server.TServer.Args;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -284,7 +285,7 @@
TTransportFactory factory = new TSaslServerTransport.Factory(
WRAPPED_MECHANISM, SERVICE, HOST, WRAPPED_PROPS,
new TestSaslCallbackHandler(PASSWORD));
- server = new TSimpleServer(processor, socket, factory, protoFactory);
+ server = new TSimpleServer(new Args(socket).processor(processor).transportFactory(factory).protocolFactory(protoFactory));
// Run it
LOGGER.debug("Starting the server on port {}", PORT);