THRIFT-888. java: async client should also have nonblocking connect
This patch adds optional nonblocking connect behavior.
Patch: Eric Jensen
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@995262 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/java/src/org/apache/thrift/async/TAsyncClientManager.java b/lib/java/src/org/apache/thrift/async/TAsyncClientManager.java
index 5464d7e..d88b6ca 100644
--- a/lib/java/src/org/apache/thrift/async/TAsyncClientManager.java
+++ b/lib/java/src/org/apache/thrift/async/TAsyncClientManager.java
@@ -36,7 +36,6 @@
/**
* Contains selector thread which transitions method call objects
*/
-@SuppressWarnings("unchecked")
public class TAsyncClientManager {
private static final Logger LOGGER = LoggerFactory.getLogger(TAsyncClientManager.class.getName());
@@ -60,7 +59,7 @@
private class SelectThread extends Thread {
// Selector waits at most SELECT_TIME milliseconds before waking
- private static final long SELECT_TIME = 200;
+ private static final long SELECT_TIME = 5;
private final Selector selector;
private volatile boolean running;
@@ -85,14 +84,18 @@
public void run() {
while (running) {
try {
- selector.select(SELECT_TIME);
- } catch (IOException e) {
- LOGGER.error("Caught IOException in TAsyncClientManager!", e);
- }
+ try {
+ selector.select(SELECT_TIME);
+ } catch (IOException e) {
+ LOGGER.error("Caught IOException in TAsyncClientManager!", e);
+ }
- transitionMethods();
- timeoutIdleMethods();
- startPendingMethods();
+ transitionMethods();
+ timeoutIdleMethods();
+ startPendingMethods();
+ } catch (Throwable throwable) {
+ LOGGER.error("Ignoring uncaught exception in SelectThread", throwable);
+ }
}
}
@@ -129,10 +132,11 @@
TAsyncMethodCall methodCall = iterator.next();
long clientTimeout = methodCall.getClient().getTimeout();
long timeElapsed = System.currentTimeMillis() - methodCall.getLastTransitionTime();
+
if (timeElapsed > clientTimeout) {
iterator.remove();
- methodCall.onError(new TimeoutException("Operation " +
- methodCall.getClass() + " timed out after " + timeElapsed +
+ methodCall.onError(new TimeoutException("Operation " +
+ methodCall.getClass() + " timed out after " + timeElapsed +
" milliseconds."));
}
}
@@ -144,8 +148,7 @@
while ((methodCall = pendingCalls.poll()) != null) {
// Catch registration errors. method will catch transition errors and cleanup.
try {
- SelectionKey key = methodCall.registerWithSelector(selector);
- methodCall.transition(key);
+ methodCall.start(selector);
// If timeout specified and first transition went smoothly, add to timeout watch set
TAsyncClient client = methodCall.getClient();
diff --git a/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java b/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java
index 5568afb..e75f4ab 100644
--- a/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java
+++ b/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java
@@ -39,7 +39,11 @@
* @param <T>
*/
public abstract class TAsyncMethodCall<T extends TAsyncMethodCall> {
+
+ private static final int INITIAL_MEMORY_BUFFER_SIZE = 128;
+
public static enum State {
+ CONNECTING,
WRITING_REQUEST_SIZE,
WRITING_REQUEST_BODY,
READING_RESPONSE_SIZE,
@@ -48,20 +52,22 @@
ERROR;
}
- private static final int INITIAL_MEMORY_BUFFER_SIZE = 128;
+ /**
+ * Next step in the call, initialized by start()
+ */
+ private State state = null;
protected final TNonblockingTransport transport;
private final TProtocolFactory protocolFactory;
protected final TAsyncClient client;
private final AsyncMethodCallback<T> callback;
private final boolean isOneway;
+
private long lastTransitionTime;
private ByteBuffer sizeBuffer;
private final byte[] sizeBufferArray = new byte[4];
-
private ByteBuffer frameBuffer;
- private State state;
protected TAsyncMethodCall(TAsyncClient client, TProtocolFactory protocolFactory, TNonblockingTransport transport, AsyncMethodCallback<T> callback, boolean isOneway) {
this.transport = transport;
@@ -69,8 +75,7 @@
this.protocolFactory = protocolFactory;
this.client = client;
this.isOneway = isOneway;
-
- this.state = State.WRITING_REQUEST_SIZE;
+ this.lastTransitionTime = System.currentTimeMillis();
}
protected State getState() {
@@ -91,6 +96,10 @@
protected abstract void write_args(TProtocol protocol) throws TException;
+ /**
+ * Initialize buffers.
+ * @throws TException if buffer initialization fails
+ */
protected void prepareMethodCall() throws TException {
TMemoryBuffer memoryBuffer = new TMemoryBuffer(INITIAL_MEMORY_BUFFER_SIZE);
TProtocol protocol = protocolFactory.getProtocol(memoryBuffer);
@@ -103,10 +112,32 @@
sizeBuffer = ByteBuffer.wrap(sizeBufferArray);
}
- SelectionKey registerWithSelector(Selector sel) throws IOException {
- SelectionKey key = transport.registerSelector(sel, SelectionKey.OP_WRITE);
+ /**
+ * Register with selector and start first state, which could be either connecting or writing.
+ * @throws IOException if register or starting fails
+ */
+ void start(Selector sel) throws IOException {
+ SelectionKey key;
+ if (transport.isOpen()) {
+ state = State.WRITING_REQUEST_SIZE;
+ key = transport.registerSelector(sel, SelectionKey.OP_WRITE);
+ } else {
+ state = State.CONNECTING;
+ key = transport.registerSelector(sel, SelectionKey.OP_CONNECT);
+
+ // non-blocking connect can complete immediately,
+ // in which case we should not expect the OP_CONNECT
+ if (transport.startConnect()) {
+ registerForFirstWrite(key);
+ }
+ }
+
key.attach(this);
- return key;
+ }
+
+ void registerForFirstWrite(SelectionKey key) throws IOException {
+ state = State.WRITING_REQUEST_SIZE;
+ key.interestOps(SelectionKey.OP_WRITE);
}
protected ByteBuffer getFrameBuffer() {
@@ -131,6 +162,9 @@
// Transition function
try {
switch (state) {
+ case CONNECTING:
+ doConnecting(key);
+ break;
case WRITING_REQUEST_SIZE:
doWritingRequestSize();
break;
@@ -143,9 +177,8 @@
case READING_RESPONSE_BODY:
doReadingResponseBody(key);
break;
- case RESPONSE_READ:
- case ERROR:
- throw new IllegalStateException("Method call in state " + state
+ default: // RESPONSE_READ, ERROR, or bug
+ throw new IllegalStateException("Method call in state " + state
+ " but selector called transition method. Seems like a bug...");
}
lastTransitionTime = System.currentTimeMillis();
@@ -157,9 +190,9 @@
}
protected void onError(Throwable e) {
- state = State.ERROR;
client.onError(e);
callback.onError(e);
+ state = State.ERROR;
}
private void doReadingResponseBody(SelectionKey key) throws IOException {
@@ -213,4 +246,11 @@
state = State.WRITING_REQUEST_BODY;
}
}
+
+ private void doConnecting(SelectionKey key) throws IOException {
+ if (!key.isConnectable() || !transport.finishConnect()) {
+ throw new IOException("not connectable or finishConnect returned false after we got an OP_CONNECT");
+ }
+ registerForFirstWrite(key);
+ }
}
diff --git a/lib/java/src/org/apache/thrift/server/TNonblockingServer.java b/lib/java/src/org/apache/thrift/server/TNonblockingServer.java
index 31a6e24..8b98031 100644
--- a/lib/java/src/org/apache/thrift/server/TNonblockingServer.java
+++ b/lib/java/src/org/apache/thrift/server/TNonblockingServer.java
@@ -62,7 +62,7 @@
LoggerFactory.getLogger(TNonblockingServer.class.getName());
// Flag for stopping the server
- private volatile boolean stopped_;
+ private volatile boolean stopped_ = true;
private SelectThread selectThread_;
@@ -218,6 +218,7 @@
// start the selector
try {
selectThread_ = new SelectThread((TNonblockingServerTransport)serverTransport_);
+ stopped_ = false;
selectThread_.start();
return true;
} catch (IOException e) {
@@ -265,6 +266,10 @@
selectThread_.requestSelectInterestChange(frameBuffer);
}
+ public boolean isStopped() {
+ return selectThread_.isStopped();
+ }
+
/**
* The thread that will be doing all the selecting, managing new connections
* and those that still need to be read.
@@ -288,14 +293,24 @@
serverTransport.registerSelector(selector);
}
+ public boolean isStopped() {
+ return stopped_;
+ }
+
/**
* The work loop. Handles both selecting (all IO operations) and managing
* the selection preferences of all existing connections.
*/
public void run() {
- while (!stopped_) {
- select();
- processInterestChanges();
+ try {
+ while (!stopped_) {
+ select();
+ processInterestChanges();
+ }
+ } catch (Throwable t) {
+ LOGGER.error("run() exiting due to uncaught error", t);
+ } finally {
+ stopped_ = true;
}
}
diff --git a/lib/java/src/org/apache/thrift/transport/TNonblockingSocket.java b/lib/java/src/org/apache/thrift/transport/TNonblockingSocket.java
index 313ef85..c3787a2 100644
--- a/lib/java/src/org/apache/thrift/transport/TNonblockingSocket.java
+++ b/lib/java/src/org/apache/thrift/transport/TNonblockingSocket.java
@@ -23,102 +23,92 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
+import java.net.SocketAddress;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
+import org.apache.thrift.async.TAsyncMethodCall;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
- * Socket implementation of the TTransport interface. To be commented soon!
+ * Transport for use with async client.
*/
public class TNonblockingSocket extends TNonblockingTransport {
- private SocketChannel socketChannel = null;
+ private static final Logger LOGGER = LoggerFactory.getLogger(TNonblockingSocket.class.getName());
/**
- * Wrapped Socket object
+ * Host and port if passed in, used for lazy non-blocking connect.
*/
- private Socket socket_ = null;
+ private SocketAddress socketAddress_ = null;
+
+ private final SocketChannel socketChannel_;
+
+ private final Socket socket_;
+
+ public TNonblockingSocket(String host, int port) throws IOException {
+ this(host, port, 0);
+ }
/**
- * Socket timeout
- */
- private int timeout_ = 0;
-
- /**
- * Create a new nonblocking socket transport connected to host:port.
+ * Create a new nonblocking socket transport that will be connected to host:port.
* @param host
* @param port
* @throws TTransportException
* @throws IOException
*/
- public TNonblockingSocket(String host, int port) throws TTransportException, IOException {
- this(SocketChannel.open(new InetSocketAddress(host, port)));
+ public TNonblockingSocket(String host, int port, int timeout) throws IOException {
+ this(SocketChannel.open(), timeout);
+ socketAddress_ = new InetSocketAddress(host, port);
}
/**
* Constructor that takes an already created socket.
*
* @param socketChannel Already created SocketChannel object
- * @throws TTransportException if there is an error setting up the streams
+ * @throws IOException if there is an error setting up the streams
*/
- public TNonblockingSocket(SocketChannel socketChannel) throws TTransportException {
- try {
- // make it a nonblocking channel
- socketChannel.configureBlocking(false);
- } catch (IOException e) {
- throw new TTransportException(e);
- }
+ public TNonblockingSocket(SocketChannel socketChannel) throws IOException {
+ this(socketChannel, 0);
+ if (!socketChannel.isConnected()) throw new IOException("Socket must already be connected");
+ }
- this.socketChannel = socketChannel;
- this.socket_ = socketChannel.socket();
- try {
- socket_.setSoLinger(false, 0);
- socket_.setTcpNoDelay(true);
- } catch (SocketException sx) {
- sx.printStackTrace();
- }
+ private TNonblockingSocket(SocketChannel socketChannel, int timeout) throws IOException {
+ socketChannel_ = socketChannel;
+ socket_ = socketChannel.socket();
+
+ // make it a nonblocking channel
+ socketChannel.configureBlocking(false);
+ socket_.setSoLinger(false, 0);
+ socket_.setTcpNoDelay(true);
+ setTimeout(timeout);
}
/**
- * Register this socket with the specified selector for both read and write
- * operations.
+ * Register the new SocketChannel with our Selector, indicating
+ * we'd like to be notified when it's ready for I/O.
*
* @param selector
* @return the selection key for this socket.
*/
public SelectionKey registerSelector(Selector selector, int interests) throws IOException {
- // Register the new SocketChannel with our Selector, indicating
- // we'd like to be notified when there's data waiting to be read
- return socketChannel.register(selector, interests);
+ return socketChannel_.register(selector, interests);
}
/**
- * Initializes the socket object
- */
- private void initSocket() {
- socket_ = new Socket();
- try {
- socket_.setSoLinger(false, 0);
- socket_.setTcpNoDelay(true);
- socket_.setSoTimeout(timeout_);
- } catch (SocketException sx) {
- sx.printStackTrace();
- }
- }
-
- /**
- * Sets the socket timeout
+ * Sets the socket timeout, although this implementation never uses blocking operations so it is unused.
*
* @param timeout Milliseconds timeout
*/
public void setTimeout(int timeout) {
- timeout_ = timeout;
try {
socket_.setSoTimeout(timeout);
} catch (SocketException sx) {
- sx.printStackTrace();
+ LOGGER.warn("Could not set socket timeout.", sx);
}
}
@@ -126,9 +116,6 @@
* Returns a reference to the underlying socket.
*/
public Socket getSocket() {
- if (socket_ == null) {
- initSocket();
- }
return socket_;
}
@@ -136,24 +123,21 @@
* Checks whether the socket is connected.
*/
public boolean isOpen() {
- if (socket_ == null) {
- return false;
- }
return socket_.isConnected();
}
/**
- * Connects the socket, creating a new socket object if necessary.
+ * Do not call, the implementation provides its own lazy non-blocking connect.
*/
public void open() throws TTransportException {
- throw new RuntimeException("Not implemented yet");
+ throw new RuntimeException("open() is not implemented for TNonblockingSocket");
}
/**
* Perform a nonblocking read into buffer.
*/
public int read(ByteBuffer buffer) throws IOException {
- return socketChannel.read(buffer);
+ return socketChannel_.read(buffer);
}
@@ -161,12 +145,12 @@
* Reads from the underlying input stream if not null.
*/
public int read(byte[] buf, int off, int len) throws TTransportException {
- if ((socketChannel.validOps() & SelectionKey.OP_READ) != SelectionKey.OP_READ) {
+ if ((socketChannel_.validOps() & SelectionKey.OP_READ) != SelectionKey.OP_READ) {
throw new TTransportException(TTransportException.NOT_OPEN,
"Cannot read from write-only socket channel");
}
try {
- return socketChannel.read(ByteBuffer.wrap(buf, off, len));
+ return socketChannel_.read(ByteBuffer.wrap(buf, off, len));
} catch (IOException iox) {
throw new TTransportException(TTransportException.UNKNOWN, iox);
}
@@ -176,26 +160,26 @@
* Perform a nonblocking write of the data in buffer;
*/
public int write(ByteBuffer buffer) throws IOException {
- return socketChannel.write(buffer);
+ return socketChannel_.write(buffer);
}
/**
* Writes to the underlying output stream if not null.
*/
public void write(byte[] buf, int off, int len) throws TTransportException {
- if ((socketChannel.validOps() & SelectionKey.OP_WRITE) != SelectionKey.OP_WRITE) {
+ if ((socketChannel_.validOps() & SelectionKey.OP_WRITE) != SelectionKey.OP_WRITE) {
throw new TTransportException(TTransportException.NOT_OPEN,
"Cannot write to write-only socket channel");
}
try {
- socketChannel.write(ByteBuffer.wrap(buf, off, len));
+ socketChannel_.write(ByteBuffer.wrap(buf, off, len));
} catch (IOException iox) {
throw new TTransportException(TTransportException.UNKNOWN, iox);
}
}
/**
- * Flushes the underlying output stream if not null.
+ * Noop.
*/
public void flush() throws TTransportException {
// Not supported by SocketChannel.
@@ -206,10 +190,20 @@
*/
public void close() {
try {
- socketChannel.close();
- } catch (IOException e) {
- // silently ignore.
+ socketChannel_.close();
+ } catch (IOException iox) {
+ LOGGER.warn("Could not close socket.", iox);
}
}
+ /** {@inheritDoc} */
+ public boolean startConnect() throws IOException {
+ return socketChannel_.connect(socketAddress_);
+ }
+
+ /** {@inheritDoc} */
+ public boolean finishConnect() throws IOException {
+ return socketChannel_.finishConnect();
+ }
+
}
diff --git a/lib/java/src/org/apache/thrift/transport/TNonblockingTransport.java b/lib/java/src/org/apache/thrift/transport/TNonblockingTransport.java
index 517eacb..faf501f 100644
--- a/lib/java/src/org/apache/thrift/transport/TNonblockingTransport.java
+++ b/lib/java/src/org/apache/thrift/transport/TNonblockingTransport.java
@@ -24,8 +24,25 @@
import java.nio.channels.SelectionKey;
import java.nio.ByteBuffer;
+import org.apache.thrift.async.TAsyncMethodCall;
+
public abstract class TNonblockingTransport extends TTransport {
+
+ /**
+ * Non-blocking connection initialization.
+ * @see java.nio.channels.SocketChannel#connect(SocketAddress remote)
+ */
+ public abstract boolean startConnect() throws IOException;
+
+ /**
+ * Non-blocking connection completion.
+ * @see java.nio.channels.SocketChannel#finishConnect()
+ */
+ public abstract boolean finishConnect() throws IOException;
+
public abstract SelectionKey registerSelector(Selector selector, int interests) throws IOException;
+
public abstract int read(ByteBuffer buffer) throws IOException;
+
public abstract int write(ByteBuffer buffer) throws IOException;
}
diff --git a/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java b/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java
index 2e26aad..5e1084c 100644
--- a/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java
+++ b/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java
@@ -18,8 +18,12 @@
*/
package org.apache.thrift.async;
+import java.io.PrintWriter;
+import java.io.StringWriter;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -27,10 +31,10 @@
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.server.TNonblockingServer;
+import org.apache.thrift.server.ServerTestBase;
+import org.apache.thrift.server.THsHaServer;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TNonblockingSocket;
-import org.apache.thrift.server.ServerTestBase;
import thrift.test.CompactProtoTestStruct;
import thrift.test.Srv;
@@ -41,11 +45,16 @@
import thrift.test.Srv.AsyncClient.voidMethod_call;
public class TestTAsyncClientManager extends TestCase {
+ private static void fail(Throwable throwable) {
+ StringWriter sink = new StringWriter();
+ throwable.printStackTrace(new PrintWriter(sink, true));
+ fail("unexpected error " + sink.toString());
+ }
+
private static abstract class FailureLessCallback<T extends TAsyncMethodCall> implements AsyncMethodCallback<T> {
@Override
public void onError(Throwable throwable) {
- throwable.printStackTrace();
- fail("unexpected error " + throwable);
+ fail(throwable);
}
}
@@ -66,7 +75,6 @@
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
- // TODO Auto-generated catch block
e.printStackTrace();
}
return 0;
@@ -98,6 +106,7 @@
this.numCalls_ = numCalls;
this.clientSocket_ = new TNonblockingSocket(ServerTestBase.HOST, ServerTestBase.PORT);
this.client_ = new Srv.AsyncClient(new TBinaryProtocol.Factory(), acm_, clientSocket_);
+ this.client_.setTimeout(20000);
}
public int getNumSuccesses() {
@@ -105,55 +114,51 @@
}
public void run() {
- for (int i = 0; i < numCalls_; i++) {
+ for (int i = 0; i < numCalls_ && !client_.hasError(); i++) {
+ final int iteration = i;
try {
// connect an async client
- final Object o = new Object();
-
+ final CountDownLatch latch = new CountDownLatch(1);
final AtomicBoolean jankyReturned = new AtomicBoolean(false);
client_.Janky(1, new AsyncMethodCallback<Srv.AsyncClient.Janky_call>() {
+
@Override
public void onComplete(Janky_call response) {
try {
assertEquals(3, response.getResult());
jankyReturned.set(true);
- synchronized(o) {
- o.notifyAll();
- }
+ latch.countDown();
} catch (TException e) {
- e.printStackTrace();
- synchronized(o) {
- o.notifyAll();
- }
- fail("unexpected exception: " + e);
+ latch.countDown();
+ fail(e);
}
}
@Override
public void onError(Throwable throwable) {
- System.out.println(throwable.toString());
- synchronized(o) {
- o.notifyAll();
+ try {
+ StringWriter sink = new StringWriter();
+ throwable.printStackTrace(new PrintWriter(sink, true));
+ fail("unexpected onError on iteration " + iteration + ": " + sink.toString());
+ } finally {
+ latch.countDown();
}
- fail("unexpected exception: " + throwable);
}
});
- synchronized(o) {
- o.wait(1000);
- }
-
- assertTrue(jankyReturned.get());
+ boolean calledBack = latch.await(30, TimeUnit.SECONDS);
+ assertTrue("wasn't called back in time on iteration " + iteration, calledBack);
+ assertTrue("onComplete not called on iteration " + iteration, jankyReturned.get());
this.numSuccesses_++;
} catch (Exception e) {
- fail("Unexpected " + e);
+ fail(e);
}
}
}
}
public void standardCallTest(Srv.AsyncClient client) throws Exception {
- final Object o = new Object();
+ final CountDownLatch latch = new CountDownLatch(1);
final AtomicBoolean jankyReturned = new AtomicBoolean(false);
client.Janky(1, new FailureLessCallback<Srv.AsyncClient.Janky_call>() {
@Override
@@ -162,23 +167,20 @@
assertEquals(3, response.getResult());
jankyReturned.set(true);
} catch (TException e) {
- fail("unexpected exception: " + e);
- }
- synchronized(o) {
- o.notifyAll();
+ fail(e);
+ } finally {
+ latch.countDown();
}
}
});
- synchronized(o) {
- o.wait(100000);
- }
+ latch.await(100, TimeUnit.SECONDS);
assertTrue(jankyReturned.get());
}
public void testIt() throws Exception {
// put up a server
- final TNonblockingServer s = new TNonblockingServer(new Srv.Processor(new SrvHandler()),
+ final THsHaServer s = new THsHaServer(new Srv.Processor(new SrvHandler()),
new TNonblockingServerSocket(ServerTestBase.PORT));
new Thread(new Runnable() {
@Override
@@ -196,16 +198,17 @@
ServerTestBase.HOST, ServerTestBase.PORT);
Srv.AsyncClient client = new Srv.AsyncClient(new TBinaryProtocol.Factory(), acm, clientSock);
- final Object o = new Object();
-
// make a standard method call
standardCallTest(client);
// make a standard method call that succeeds within timeout
+ assertFalse(s.isStopped());
client.setTimeout(5000);
standardCallTest(client);
// make a void method call
+ assertFalse(s.isStopped());
+ final CountDownLatch voidLatch = new CountDownLatch(1);
final AtomicBoolean voidMethodReturned = new AtomicBoolean(false);
client.voidMethod(new FailureLessCallback<Srv.AsyncClient.voidMethod_call>() {
@Override
@@ -214,20 +217,18 @@
response.getResult();
voidMethodReturned.set(true);
} catch (TException e) {
- fail("unexpected exception " + e);
- }
- synchronized (o) {
- o.notifyAll();
+ fail(e);
+ } finally {
+ voidLatch.countDown();
}
}
});
-
- synchronized(o) {
- o.wait(1000);
- }
+ voidLatch.await(1, TimeUnit.SECONDS);
assertTrue(voidMethodReturned.get());
-
+
// make a oneway method call
+ assertFalse(s.isStopped());
+ final CountDownLatch onewayLatch = new CountDownLatch(1);
final AtomicBoolean onewayReturned = new AtomicBoolean(false);
client.onewayMethod(new FailureLessCallback<onewayMethod_call>() {
@Override
@@ -236,20 +237,18 @@
response.getResult();
onewayReturned.set(true);
} catch (TException e) {
- fail("unexpected exception " + e);
- }
- synchronized(o) {
- o.notifyAll();
+ fail(e);
+ } finally {
+ onewayLatch.countDown();
}
}
});
- synchronized(o) {
- o.wait(1000);
- }
-
+ onewayLatch.await(1, TimeUnit.SECONDS);
assertTrue(onewayReturned.get());
// make another standard method call
+ assertFalse(s.isStopped());
+ final CountDownLatch voidAfterOnewayLatch = new CountDownLatch(1);
final AtomicBoolean voidAfterOnewayReturned = new AtomicBoolean(false);
client.voidMethod(new FailureLessCallback<voidMethod_call>() {
@Override
@@ -258,20 +257,18 @@
response.getResult();
voidAfterOnewayReturned.set(true);
} catch (TException e) {
- fail("unexpected exception " + e);
- }
- synchronized(o) {
- o.notifyAll();
+ fail(e);
+ } finally {
+ voidAfterOnewayLatch.countDown();
}
}
});
- synchronized(o) {
- o.wait(1000);
- }
+ voidAfterOnewayLatch.await(1, TimeUnit.SECONDS);
assertTrue(voidAfterOnewayReturned.get());
// make multiple calls with deserialization in the selector thread (repro Eric's issue)
- int numThreads = 200;
+ assertFalse(s.isStopped());
+ int numThreads = 50;
int numCallsPerThread = 100;
List<JankyRunnable> runnables = new ArrayList<JankyRunnable>();
List<Thread> threads = new ArrayList<Thread>();
@@ -289,34 +286,38 @@
for (JankyRunnable runnable : runnables) {
numSuccesses += runnable.getNumSuccesses();
}
- assertEquals(numSuccesses, numThreads * numCallsPerThread);
+ assertEquals(numThreads * numCallsPerThread, numSuccesses);
// check that timeouts work
+ assertFalse(s.isStopped());
+ final CountDownLatch timeoutLatch = new CountDownLatch(1);
client.setTimeout(100);
client.primitiveMethod(new AsyncMethodCallback<primitiveMethod_call>() {
@Override
public void onError(Throwable throwable) {
- if (!(throwable instanceof TimeoutException)) {
- fail("should have received timeout exception");
- synchronized(o) {
- o.notifyAll();
+ try {
+ if (!(throwable instanceof TimeoutException)) {
+ StringWriter sink = new StringWriter();
+ throwable.printStackTrace(new PrintWriter(sink, true));
+ fail("expected TimeoutException but got " + sink.toString());
}
+ } finally {
+ timeoutLatch.countDown();
}
}
@Override
public void onComplete(primitiveMethod_call response) {
- fail("should not have finished timed out call.");
- synchronized(o) {
- o.notifyAll();
+ try {
+ fail("should not have finished timed out call.");
+ } finally {
+ timeoutLatch.countDown();
}
}
});
- synchronized(o) {
- o.wait(2000);
- }
+ timeoutLatch.await(2, TimeUnit.SECONDS);
assertTrue(client.hasError());
assertTrue(client.getError() instanceof TimeoutException);
}