THRIFT-1868:Make the TPC backlog configurable in the Java servers
Client: java
Patch: Jean-Daniel Cryans
Makes TServerSocket backlog configurable.
diff --git a/lib/java/src/org/apache/thrift/transport/TNonblockingServerSocket.java b/lib/java/src/org/apache/thrift/transport/TNonblockingServerSocket.java
index 25b487e..112d939 100644
--- a/lib/java/src/org/apache/thrift/transport/TNonblockingServerSocket.java
+++ b/lib/java/src/org/apache/thrift/transport/TNonblockingServerSocket.java
@@ -54,6 +54,9 @@
*/
private int clientTimeout_ = 0;
+ public static class NonblockingAbstractServerSocketArgs extends
+ AbstractServerTransportArgs<NonblockingAbstractServerSocketArgs> {}
+
/**
* Creates just a port listening server socket
*/
@@ -65,7 +68,7 @@
* Creates just a port listening server socket
*/
public TNonblockingServerSocket(int port, int clientTimeout) throws TTransportException {
- this(new InetSocketAddress(port), clientTimeout);
+ this(new NonblockingAbstractServerSocketArgs().port(port).clientTimeout(clientTimeout));
}
public TNonblockingServerSocket(InetSocketAddress bindAddr) throws TTransportException {
@@ -73,7 +76,11 @@
}
public TNonblockingServerSocket(InetSocketAddress bindAddr, int clientTimeout) throws TTransportException {
- clientTimeout_ = clientTimeout;
+ this(new NonblockingAbstractServerSocketArgs().bindAddr(bindAddr).clientTimeout(clientTimeout));
+ }
+
+ public TNonblockingServerSocket(NonblockingAbstractServerSocketArgs args) throws TTransportException {
+ clientTimeout_ = args.clientTimeout;
try {
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
@@ -83,10 +90,10 @@
// Prevent 2MSL delay problem on server restarts
serverSocket_.setReuseAddress(true);
// Bind to listening port
- serverSocket_.bind(bindAddr);
+ serverSocket_.bind(args.bindAddr, args.backlog);
} catch (IOException ioe) {
serverSocket_ = null;
- throw new TTransportException("Could not create ServerSocket on address " + bindAddr.toString() + ".");
+ throw new TTransportException("Could not create ServerSocket on address " + args.bindAddr.toString() + ".");
}
}
diff --git a/lib/java/src/org/apache/thrift/transport/TSSLTransportFactory.java b/lib/java/src/org/apache/thrift/transport/TSSLTransportFactory.java
index 044e06a..ca27729 100755
--- a/lib/java/src/org/apache/thrift/transport/TSSLTransportFactory.java
+++ b/lib/java/src/org/apache/thrift/transport/TSSLTransportFactory.java
@@ -33,7 +33,7 @@
import javax.net.ssl.TrustManagerFactory;
/**
- * A Factory for providing and setting up Client and Server SSL wrapped
+ * A Factory for providing and setting up Client and Server SSL wrapped
* TSocket and TServerSocket
*/
public class TSSLTransportFactory {
@@ -42,24 +42,24 @@
* Get a SSL wrapped TServerSocket bound to the specified port. In this
* configuration the default settings are used. Default settings are retrieved
* from System properties that are set.
- *
+ *
* Example system properties:
* -Djavax.net.ssl.trustStore=<truststore location>
* -Djavax.net.ssl.trustStorePassword=password
* -Djavax.net.ssl.keyStore=<keystore location>
* -Djavax.net.ssl.keyStorePassword=password
- *
+ *
* @param port
* @return A SSL wrapped TServerSocket
* @throws TTransportException
*/
public static TServerSocket getServerSocket(int port) throws TTransportException {
- return getServerSocket(port, 0);
+ return getServerSocket(port, 0);
}
/**
* Get a default SSL wrapped TServerSocket bound to the specified port
- *
+ *
* @param port
* @param clientTimeout
* @return A SSL wrapped TServerSocket
@@ -71,7 +71,7 @@
/**
* Get a default SSL wrapped TServerSocket bound to the specified port and interface
- *
+ *
* @param port
* @param clientTimeout
* @param ifAddress
@@ -80,14 +80,14 @@
*/
public static TServerSocket getServerSocket(int port, int clientTimeout, boolean clientAuth, InetAddress ifAddress) throws TTransportException {
SSLServerSocketFactory factory = (SSLServerSocketFactory) SSLServerSocketFactory.getDefault();
- return createServer(factory, port, clientTimeout, clientAuth, ifAddress, null);
+ return createServer(factory, port, clientTimeout, clientAuth, ifAddress, null);
}
/**
- * Get a configured SSL wrapped TServerSocket bound to the specified port and interface.
- * Here the TSSLTransportParameters are used to set the values for the algorithms, keystore,
+ * Get a configured SSL wrapped TServerSocket bound to the specified port and interface.
+ * Here the TSSLTransportParameters are used to set the values for the algorithms, keystore,
* truststore and other settings
- *
+ *
* @param port
* @param clientTimeout
* @param ifAddress
@@ -113,7 +113,8 @@
if (params != null && params.cipherSuites != null) {
serverSocket.setEnabledCipherSuites(params.cipherSuites);
}
- return new TServerSocket(serverSocket, timeout);
+ return new TServerSocket(new TServerSocket.ServerSocketTransportArgs().
+ serverSocket(serverSocket).clientTimeout(timeout));
} catch (Exception e) {
throw new TTransportException("Could not bind to port " + port, e);
}
@@ -121,9 +122,9 @@
/**
* Get a default SSL wrapped TSocket connected to the specified host and port. All
- * the client methods return a bound connection. So there is no need to call open() on the
+ * the client methods return a bound connection. So there is no need to call open() on the
* TTransport.
- *
+ *
* @param host
* @param port
* @param timeout
@@ -137,7 +138,7 @@
/**
* Get a default SSL wrapped TSocket connected to the specified host and port.
- *
+ *
* @param host
* @param port
* @return A SSL wrapped TSocket
@@ -148,9 +149,9 @@
}
/**
- * Get a custom configured SSL wrapped TSocket. The SSL settings are obtained from the
+ * Get a custom configured SSL wrapped TSocket. The SSL settings are obtained from the
* passed in TSSLTransportParameters.
- *
+ *
* @param host
* @param port
* @param timeout
@@ -250,7 +251,7 @@
/**
* Create parameters specifying the protocol and cipher suites
- *
+ *
* @param protocol The specific protocol (TLS/SSL) can be specified with versions
* @param cipherSuites
*/
@@ -261,7 +262,7 @@
/**
* Create parameters specifying the protocol, cipher suites and if client authentication
* is required
- *
+ *
* @param protocol The specific protocol (TLS/SSL) can be specified with versions
* @param cipherSuites
* @param clientAuth
@@ -276,7 +277,7 @@
/**
* Set the keystore, password, certificate type and the store type
- *
+ *
* @param keyStore Location of the Keystore on disk
* @param keyPass Keystore password
* @param keyManagerType The default is X509
@@ -296,7 +297,7 @@
/**
* Set the keystore and password
- *
+ *
* @param keyStore Location of the Keystore on disk
* @param keyPass Keystore password
*/
@@ -306,7 +307,7 @@
/**
* Set the truststore, password, certificate type and the store type
- *
+ *
* @param trustStore Location of the Truststore on disk
* @param trustPass Truststore password
* @param trustManagerType The default is X509
@@ -326,7 +327,7 @@
/**
* Set the truststore and password
- *
+ *
* @param trustStore Location of the Truststore on disk
* @param trustPass Truststore password
*/
@@ -336,7 +337,7 @@
/**
* Set if client authentication is required
- *
+ *
* @param clientAuth
*/
public void requireClientAuth(boolean clientAuth) {
diff --git a/lib/java/src/org/apache/thrift/transport/TServerSocket.java b/lib/java/src/org/apache/thrift/transport/TServerSocket.java
index 147074a..8345d44 100644
--- a/lib/java/src/org/apache/thrift/transport/TServerSocket.java
+++ b/lib/java/src/org/apache/thrift/transport/TServerSocket.java
@@ -46,19 +46,27 @@
*/
private int clientTimeout_ = 0;
+ public static class ServerSocketTransportArgs extends AbstractServerTransportArgs<ServerSocketTransportArgs> {
+ ServerSocket serverSocket;
+
+ public ServerSocketTransportArgs serverSocket(ServerSocket serverSocket) {
+ this.serverSocket = serverSocket;
+ return this;
+ }
+ }
+
/**
* Creates a server socket from underlying socket object
*/
- public TServerSocket(ServerSocket serverSocket) {
+ public TServerSocket(ServerSocket serverSocket) throws TTransportException {
this(serverSocket, 0);
}
/**
* Creates a server socket from underlying socket object
*/
- public TServerSocket(ServerSocket serverSocket, int clientTimeout) {
- serverSocket_ = serverSocket;
- clientTimeout_ = clientTimeout;
+ public TServerSocket(ServerSocket serverSocket, int clientTimeout) throws TTransportException {
+ this(new ServerSocketTransportArgs().serverSocket(serverSocket).clientTimeout(clientTimeout));
}
/**
@@ -80,17 +88,25 @@
}
public TServerSocket(InetSocketAddress bindAddr, int clientTimeout) throws TTransportException {
- clientTimeout_ = clientTimeout;
+ this(new ServerSocketTransportArgs().bindAddr(bindAddr).clientTimeout(clientTimeout));
+ }
+
+ public TServerSocket(ServerSocketTransportArgs args) throws TTransportException {
+ clientTimeout_ = args.clientTimeout;
+ if (args.serverSocket != null) {
+ this.serverSocket_ = args.serverSocket;
+ return;
+ }
try {
// Make server socket
serverSocket_ = new ServerSocket();
// Prevent 2MSL delay problem on server restarts
serverSocket_.setReuseAddress(true);
// Bind to listening port
- serverSocket_.bind(bindAddr);
+ serverSocket_.bind(args.bindAddr, args.backlog);
} catch (IOException ioe) {
serverSocket_ = null;
- throw new TTransportException("Could not create ServerSocket on address " + bindAddr.toString() + ".");
+ throw new TTransportException("Could not create ServerSocket on address " + args.bindAddr.toString() + ".");
}
}
diff --git a/lib/java/src/org/apache/thrift/transport/TServerTransport.java b/lib/java/src/org/apache/thrift/transport/TServerTransport.java
index e03ec4c..424e4fa 100644
--- a/lib/java/src/org/apache/thrift/transport/TServerTransport.java
+++ b/lib/java/src/org/apache/thrift/transport/TServerTransport.java
@@ -20,6 +20,7 @@
package org.apache.thrift.transport;
import java.io.Closeable;
+import java.net.InetSocketAddress;
/**
* Server transport. Object which provides client transports.
@@ -27,6 +28,32 @@
*/
public abstract class TServerTransport implements Closeable {
+ public static abstract class AbstractServerTransportArgs<T extends AbstractServerTransportArgs<T>> {
+ int backlog = 0; // A value of 0 means the default value will be used (currently set at 50)
+ int clientTimeout = 0;
+ InetSocketAddress bindAddr;
+
+ public T backlog(int backlog) {
+ this.backlog = backlog;
+ return (T) this;
+ }
+
+ public T clientTimeout(int clientTimeout) {
+ this.clientTimeout = clientTimeout;
+ return (T) this;
+ }
+
+ public T port(int port) {
+ this.bindAddr = new InetSocketAddress(port);
+ return (T) this;
+ }
+
+ public T bindAddr(InetSocketAddress bindAddr) {
+ this.bindAddr = bindAddr;
+ return (T) this;
+ }
+ }
+
public abstract void listen() throws TTransportException;
public final TTransport accept() throws TTransportException {
diff --git a/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java b/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java
index d88b8a5..12d0eaf 100644
--- a/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java
+++ b/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java
@@ -53,7 +53,9 @@
private TAsyncClientManager clientManager_;
public void setUp() throws Exception {
- server_ = new THsHaServer(new Args(new TNonblockingServerSocket(ServerTestBase.PORT)).processor(new Srv.Processor(new SrvHandler())));
+ server_ = new THsHaServer(new Args(new TNonblockingServerSocket(
+ new TNonblockingServerSocket.NonblockingAbstractServerSocketArgs().port(ServerTestBase.PORT))).
+ processor(new Srv.Processor(new SrvHandler())));
serverThread_ = new Thread(new Runnable() {
public void run() {
server_.serve();
@@ -79,8 +81,8 @@
Srv.AsyncClient client = getClient();
client.setTimeout(5000);
basicCall(client);
- }
-
+ }
+
public void testTimeoutCall() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
Srv.AsyncClient client = getClient();
@@ -98,7 +100,7 @@
latch.countDown();
}
}
-
+
@Override
public void onComplete(primitiveMethod_call response) {
try {
@@ -111,8 +113,8 @@
latch.await(2, TimeUnit.SECONDS);
assertTrue(client.hasError());
assertTrue(client.getError() instanceof TimeoutException);
- }
-
+ }
+
public void testVoidCall() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicBoolean returned = new AtomicBoolean(false);
@@ -132,8 +134,8 @@
});
latch.await(1, TimeUnit.SECONDS);
assertTrue(returned.get());
- }
-
+ }
+
public void testOnewayCall() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicBoolean returned = new AtomicBoolean(false);
@@ -153,8 +155,8 @@
});
latch.await(1, TimeUnit.SECONDS);
assertTrue(returned.get());
- }
-
+ }
+
public void testParallelCalls() throws Exception {
// make multiple calls with deserialization in the selector thread (repro Eric's issue)
int numThreads = 50;
@@ -176,13 +178,13 @@
numSuccesses += runnable.getNumSuccesses();
}
assertEquals(numThreads * numCallsPerThread, numSuccesses);
- }
-
+ }
+
private Srv.AsyncClient getClient() throws IOException {
TNonblockingSocket clientSocket = new TNonblockingSocket(ServerTestBase.HOST, ServerTestBase.PORT);
return new Srv.AsyncClient(new TBinaryProtocol.Factory(), clientManager_, clientSocket);
}
-
+
private void basicCall(Srv.AsyncClient client) throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
final AtomicBoolean returned = new AtomicBoolean(false);
@@ -198,7 +200,7 @@
latch.countDown();
}
}
-
+
@Override
public void onError(Exception exception) {
try {
@@ -213,7 +215,7 @@
latch.await(100, TimeUnit.SECONDS);
assertTrue(returned.get());
}
-
+
public class SrvHandler implements Iface {
// Use this method for a standard call testing
@Override
@@ -232,7 +234,7 @@
}
return 0;
}
-
+
@Override
public void methodWithDefaultArgs(int something) throws TException { }
@@ -249,20 +251,20 @@
public void onewayMethod() throws TException {
}
}
-
+
private static abstract class FailureLessCallback<T extends TAsyncMethodCall> implements AsyncMethodCallback<T> {
@Override
public void onError(Exception exception) {
fail(exception);
}
}
-
+
private static void fail(Exception exception) {
StringWriter sink = new StringWriter();
exception.printStackTrace(new PrintWriter(sink, true));
fail("unexpected error " + sink.toString());
}
-
+
private class JankyRunnable implements Runnable {
private int numCalls_;
private int numSuccesses_ = 0;
@@ -286,7 +288,7 @@
final CountDownLatch latch = new CountDownLatch(1);
final AtomicBoolean returned = new AtomicBoolean(false);
client_.Janky(1, new AsyncMethodCallback<Srv.AsyncClient.Janky_call>() {
-
+
@Override
public void onComplete(Janky_call response) {
try {
diff --git a/lib/java/test/org/apache/thrift/server/TestNonblockingServer.java b/lib/java/test/org/apache/thrift/server/TestNonblockingServer.java
index 7837695..3df3bd8 100644
--- a/lib/java/test/org/apache/thrift/server/TestNonblockingServer.java
+++ b/lib/java/test/org/apache/thrift/server/TestNonblockingServer.java
@@ -53,7 +53,7 @@
try {
// Transport
TNonblockingServerSocket tServerSocket =
- new TNonblockingServerSocket(PORT);
+ new TNonblockingServerSocket(new TNonblockingServerSocket.NonblockingAbstractServerSocketArgs().port(PORT));
server = getServer(processor, tServerSocket, protoFactory, factory);
diff --git a/lib/java/test/org/apache/thrift/test/TestNonblockingServer.java b/lib/java/test/org/apache/thrift/test/TestNonblockingServer.java
index 18343b0..41c4b65 100644
--- a/lib/java/test/org/apache/thrift/test/TestNonblockingServer.java
+++ b/lib/java/test/org/apache/thrift/test/TestNonblockingServer.java
@@ -52,7 +52,7 @@
// Transport
TNonblockingServerSocket tServerSocket =
- new TNonblockingServerSocket(port);
+ new TNonblockingServerSocket(new TNonblockingServerSocket.NonblockingAbstractServerSocketArgs().port(port));
TServer serverEngine;
diff --git a/lib/java/test/org/apache/thrift/test/TestServer.java b/lib/java/test/org/apache/thrift/test/TestServer.java
index 125a773..ee0866e 100644
--- a/lib/java/test/org/apache/thrift/test/TestServer.java
+++ b/lib/java/test/org/apache/thrift/test/TestServer.java
@@ -124,12 +124,12 @@
protocol_type = args[i].split("=")[1];
protocol_type.trim();
} else if (args[i].startsWith("--transport")) {
- transport_type = args[i].split("=")[1];
+ transport_type = args[i].split("=")[1];
transport_type.trim();
} else if (args[i].equals("--ssl")) {
ssl = true;
} else if (args[i].equals("--help")) {
- System.out.println("Allowed options:");
+ System.out.println("Allowed options:");
System.out.println(" --help\t\t\tProduce help message");
System.out.println(" --port=arg (=" + port + ")\tPort number to connect");
System.out.println(" --transport=arg (=" + transport_type + ")\n\t\t\t\tTransport: buffered, framed, fastframed");
@@ -143,7 +143,7 @@
System.err.println("Can not parse arguments! See --help");
System.exit(1);
}
-
+
try {
if (server_type.equals("simple")) {
} else if (server_type.equals("thread-pool")) {
@@ -156,13 +156,13 @@
throw new Exception("SSL is not supported over nonblocking servers!");
}
} else {
- throw new Exception("Unknown server type! " + server_type);
+ throw new Exception("Unknown server type! " + server_type);
}
if (protocol_type.equals("binary")) {
} else if (protocol_type.equals("json")) {
} else if (protocol_type.equals("compact")) {
} else {
- throw new Exception("Unknown protocol type! " + protocol_type);
+ throw new Exception("Unknown protocol type! " + protocol_type);
}
if (transport_type.equals("buffered")) {
} else if (transport_type.equals("framed")) {
@@ -171,7 +171,7 @@
throw new Exception("Unknown transport type! " + transport_type);
}
} catch (Exception e) {
- System.err.println("Error: " + e.getMessage());
+ System.err.println("Error: " + e.getMessage());
System.exit(1);
}
@@ -204,15 +204,15 @@
TServer serverEngine = null;
- if (server_type.equals("nonblocking") ||
+ if (server_type.equals("nonblocking") ||
server_type.equals("threaded-selector")) {
// Nonblocking servers
TNonblockingServerSocket tNonblockingServerSocket =
- new TNonblockingServerSocket(port);
-
+ new TNonblockingServerSocket(new TNonblockingServerSocket.NonblockingAbstractServerSocketArgs().port(port));
+
if (server_type.equals("nonblocking")) {
// Nonblocking Server
- TNonblockingServer.Args tNonblockingServerArgs
+ TNonblockingServer.Args tNonblockingServerArgs
= new TNonblockingServer.Args(tNonblockingServerSocket);
tNonblockingServerArgs.processor(testProcessor);
tNonblockingServerArgs.protocolFactory(tProtocolFactory);
@@ -221,12 +221,12 @@
serverEngine = new TNonblockingServer(tNonblockingServerArgs);
} else { // server_type.equals("threaded-selector")
// ThreadedSelector Server
- TThreadedSelectorServer.Args tThreadedSelectorServerArgs
+ TThreadedSelectorServer.Args tThreadedSelectorServerArgs
= new TThreadedSelectorServer.Args(tNonblockingServerSocket);
tThreadedSelectorServerArgs.processor(testProcessor);
tThreadedSelectorServerArgs.protocolFactory(tProtocolFactory);
tThreadedSelectorServerArgs.transportFactory(tTransportFactory);
-
+
serverEngine = new TThreadedSelectorServer(tThreadedSelectorServerArgs);
}
} else {
@@ -237,7 +237,7 @@
if (ssl) {
tServerSocket = TSSLTransportFactory.getServerSocket(port, 0);
} else {
- tServerSocket = new TServerSocket(port);
+ tServerSocket = new TServerSocket(new TServerSocket.ServerSocketTransportArgs().port(port));
}
if (server_type.equals("simple")) {
@@ -250,7 +250,7 @@
serverEngine = new TSimpleServer(tServerArgs);
} else { // server_type.equals("threadpool")
// ThreadPool Server
- TThreadPoolServer.Args tThreadPoolServerArgs
+ TThreadPoolServer.Args tThreadPoolServerArgs
= new TThreadPoolServer.Args(tServerSocket);
tThreadPoolServerArgs.processor(testProcessor);
tThreadPoolServerArgs.protocolFactory(tProtocolFactory);
diff --git a/lib/java/test/org/apache/thrift/transport/TestTSaslTransports.java b/lib/java/test/org/apache/thrift/transport/TestTSaslTransports.java
index 41d08f6..80e53b9 100644
--- a/lib/java/test/org/apache/thrift/transport/TestTSaslTransports.java
+++ b/lib/java/test/org/apache/thrift/transport/TestTSaslTransports.java
@@ -73,7 +73,7 @@
+ "score and seven years ago our fathers brought forth on this "
+ "continent a new nation, conceived in liberty, and dedicated to the "
+ "proposition that all men are created equal.";
-
+
private static final String testMessage2 = "I have a dream that one day "
+ "this nation will rise up and live out the true meaning of its creed: "
+ "'We hold these truths to be self-evident, that all men are created equal.'";
@@ -123,7 +123,9 @@
}
private void internalRun() throws Exception {
- TServerSocket serverSocket = new TServerSocket(ServerTestBase.PORT);
+ TServerSocket serverSocket = new TServerSocket(
+ new TServerSocket.ServerSocketTransportArgs().
+ port(ServerTestBase.PORT));
try {
acceptAndWrite(serverSocket);
} finally {
@@ -280,7 +282,7 @@
public void run() {
try {
// Transport
- TServerSocket socket = new TServerSocket(PORT);
+ TServerSocket socket = new TServerSocket(new TServerSocket.ServerSocketTransportArgs().port(PORT));
TTransportFactory factory = new TSaslServerTransport.Factory(
WRAPPED_MECHANISM, SERVICE, HOST, WRAPPED_PROPS,