THRIFT-862. java: Async client issues / improvements
This patch improves quite a large number of things about the async client code.
Patch: Ning Liang
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@1005221 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/java/src/org/apache/thrift/async/AsyncMethodCallback.java b/lib/java/src/org/apache/thrift/async/AsyncMethodCallback.java
index b8cd9ed..00004b7 100644
--- a/lib/java/src/org/apache/thrift/async/AsyncMethodCallback.java
+++ b/lib/java/src/org/apache/thrift/async/AsyncMethodCallback.java
@@ -18,6 +18,7 @@
*/
package org.apache.thrift.async;
+
public interface AsyncMethodCallback<T> {
/**
* This method will be called when the remote side has completed invoking
@@ -32,7 +33,7 @@
* This method will be called when there is an unexpected clientside
* exception. This does not include application-defined exceptions that
* appear in the IDL, but rather things like IOExceptions.
- * @param throwable
+ * @param exception
*/
- public void onError(Throwable throwable);
+ public void onError(Exception exception);
}
diff --git a/lib/java/src/org/apache/thrift/async/TAsyncClient.java b/lib/java/src/org/apache/thrift/async/TAsyncClient.java
index 5c05aaa..468bc6e 100644
--- a/lib/java/src/org/apache/thrift/async/TAsyncClient.java
+++ b/lib/java/src/org/apache/thrift/async/TAsyncClient.java
@@ -26,7 +26,7 @@
protected final TNonblockingTransport transport;
protected final TAsyncClientManager manager;
protected TAsyncMethodCall currentMethod;
- private Throwable error;
+ private Exception error;
private long timeout;
public TAsyncClient(TProtocolFactory protocolFactory, TAsyncClientManager manager, TNonblockingTransport transport) {
@@ -44,7 +44,7 @@
return protocolFactory;
}
- public long getTimeout() {
+ public long getTimeout() {
return timeout;
}
@@ -68,7 +68,7 @@
* Get the client's error - returns null if no error
* @return
*/
- public Throwable getError() {
+ public Exception getError() {
return error;
}
@@ -94,9 +94,9 @@
/**
* Called by delegate method on error
*/
- protected void onError(Throwable throwable) {
+ protected void onError(Exception exception) {
transport.close();
currentMethod = null;
- error = throwable;
+ error = exception;
}
}
diff --git a/lib/java/src/org/apache/thrift/async/TAsyncClientManager.java b/lib/java/src/org/apache/thrift/async/TAsyncClientManager.java
index d88b6ca..35fd353 100644
--- a/lib/java/src/org/apache/thrift/async/TAsyncClientManager.java
+++ b/lib/java/src/org/apache/thrift/async/TAsyncClientManager.java
@@ -23,12 +23,13 @@
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;
-import java.util.HashSet;
+import java.util.Comparator;
import java.util.Iterator;
-import java.util.Set;
+import java.util.TreeSet;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeoutException;
+import org.apache.commons.lang.ObjectUtils;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,7 +39,7 @@
*/
public class TAsyncClientManager {
private static final Logger LOGGER = LoggerFactory.getLogger(TAsyncClientManager.class.getName());
-
+
private final SelectThread selectThread;
private final ConcurrentLinkedQueue<TAsyncMethodCall> pendingCalls = new ConcurrentLinkedQueue<TAsyncMethodCall>();
@@ -48,6 +49,9 @@
}
public void call(TAsyncMethodCall method) throws TException {
+ if (!isRunning()) {
+ throw new TException("SelectThread is not running");
+ }
method.prepareMethodCall();
pendingCalls.add(method);
selectThread.getSelector().wakeup();
@@ -56,18 +60,21 @@
public void stop() {
selectThread.finish();
}
-
+
+ public boolean isRunning() {
+ return selectThread.isAlive();
+ }
+
private class SelectThread extends Thread {
- // Selector waits at most SELECT_TIME milliseconds before waking
- private static final long SELECT_TIME = 5;
-
private final Selector selector;
private volatile boolean running;
- private final Set<TAsyncMethodCall> timeoutWatchSet = new HashSet<TAsyncMethodCall>();
+ private final TreeSet<TAsyncMethodCall> timeoutWatchSet = new TreeSet<TAsyncMethodCall>(new TAsyncMethodCallTimeoutComparator());
public SelectThread() throws IOException {
this.selector = SelectorProvider.provider().openSelector();
this.running = true;
+ this.setName("TAsyncClientManager#SelectorThread " + this.getId());
+
// We don't want to hold up the JVM when shutting down
setDaemon(true);
}
@@ -85,16 +92,29 @@
while (running) {
try {
try {
- selector.select(SELECT_TIME);
+ if (timeoutWatchSet.size() == 0) {
+ // No timeouts, so select indefinitely
+ selector.select();
+ } else {
+ // We have a timeout pending, so calculate the time until then and select appropriately
+ long nextTimeout = timeoutWatchSet.first().getTimeoutTimestamp();
+ long selectTime = nextTimeout - System.currentTimeMillis();
+ if (selectTime > 0) {
+ // Next timeout is in the future, select and wake up then
+ selector.select(selectTime);
+ } else {
+ // Next timeout is now or in past, select immediately so we can time out
+ selector.selectNow();
+ }
+ }
} catch (IOException e) {
LOGGER.error("Caught IOException in TAsyncClientManager!", e);
}
-
transitionMethods();
- timeoutIdleMethods();
+ timeoutMethods();
startPendingMethods();
- } catch (Throwable throwable) {
- LOGGER.error("Ignoring uncaught exception in SelectThread", throwable);
+ } catch (Exception exception) {
+ LOGGER.error("Ignoring uncaught exception in SelectThread", exception);
}
}
}
@@ -126,18 +146,16 @@
}
// Timeout any existing method calls
- private void timeoutIdleMethods() {
+ private void timeoutMethods() {
Iterator<TAsyncMethodCall> iterator = timeoutWatchSet.iterator();
+ long currentTime = System.currentTimeMillis();
while (iterator.hasNext()) {
TAsyncMethodCall methodCall = iterator.next();
- long clientTimeout = methodCall.getClient().getTimeout();
- long timeElapsed = System.currentTimeMillis() - methodCall.getLastTransitionTime();
-
- if (timeElapsed > clientTimeout) {
+ if (currentTime >= methodCall.getTimeoutTimestamp()) {
iterator.remove();
- methodCall.onError(new TimeoutException("Operation " +
- methodCall.getClass() + " timed out after " + timeElapsed +
- " milliseconds."));
+ methodCall.onError(new TimeoutException("Operation " + methodCall.getClass() + " timed out after " + (currentTime - methodCall.getStartTime()) + " ms."));
+ } else {
+ break;
}
}
}
@@ -149,17 +167,30 @@
// Catch registration errors. method will catch transition errors and cleanup.
try {
methodCall.start(selector);
-
+
// If timeout specified and first transition went smoothly, add to timeout watch set
TAsyncClient client = methodCall.getClient();
if (client.hasTimeout() && !client.hasError()) {
timeoutWatchSet.add(methodCall);
}
- } catch (Throwable e) {
- LOGGER.warn("Caught throwable in TAsyncClientManager!", e);
- methodCall.onError(e);
+ } catch (Exception exception) {
+ LOGGER.warn("Caught exception in TAsyncClientManager!", exception);
+ methodCall.onError(exception);
}
}
}
}
+
+ // Comparator used in TreeSet
+ private static class TAsyncMethodCallTimeoutComparator implements Comparator<TAsyncMethodCall> {
+ @Override
+ public int compare(TAsyncMethodCall left, TAsyncMethodCall right) {
+ if (left.getTimeoutTimestamp() == right.getTimeoutTimestamp()) {
+ return (int)(left.getSequenceId() - right.getSequenceId());
+ } else {
+ return (int)(left.getTimeoutTimestamp() - right.getTimeoutTimestamp());
+ }
+ }
+ }
+
}
diff --git a/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java b/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java
index e75f4ab..fcd50ea 100644
--- a/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java
+++ b/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java
@@ -22,6 +22,7 @@
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TProtocol;
@@ -38,9 +39,10 @@
* - public T getResult() throws <Exception_1>, <Exception_2>, ...
* @param <T>
*/
-public abstract class TAsyncMethodCall<T extends TAsyncMethodCall> {
+public abstract class TAsyncMethodCall<T> {
private static final int INITIAL_MEMORY_BUFFER_SIZE = 128;
+ private static AtomicLong sequenceIdCounter = new AtomicLong(0);
public static enum State {
CONNECTING,
@@ -62,20 +64,21 @@
protected final TAsyncClient client;
private final AsyncMethodCallback<T> callback;
private final boolean isOneway;
-
- private long lastTransitionTime;
-
+ private long sequenceId;
+
private ByteBuffer sizeBuffer;
private final byte[] sizeBufferArray = new byte[4];
private ByteBuffer frameBuffer;
+ private long startTime = System.currentTimeMillis();
+
protected TAsyncMethodCall(TAsyncClient client, TProtocolFactory protocolFactory, TNonblockingTransport transport, AsyncMethodCallback<T> callback, boolean isOneway) {
this.transport = transport;
this.callback = callback;
this.protocolFactory = protocolFactory;
this.client = client;
this.isOneway = isOneway;
- this.lastTransitionTime = System.currentTimeMillis();
+ this.sequenceId = TAsyncMethodCall.sequenceIdCounter.getAndIncrement();
}
protected State getState() {
@@ -86,13 +89,25 @@
return state == State.RESPONSE_READ;
}
- protected long getLastTransitionTime() {
- return lastTransitionTime;
+ protected long getStartTime() {
+ return startTime;
+ }
+
+ protected long getSequenceId() {
+ return sequenceId;
}
public TAsyncClient getClient() {
return client;
}
+
+ public boolean hasTimeout() {
+ return client.hasTimeout();
+ }
+
+ public long getTimeoutTimestamp() {
+ return client.getTimeout() + startTime;
+ }
protected abstract void write_args(TProtocol protocol) throws TException;
@@ -181,15 +196,14 @@
throw new IllegalStateException("Method call in state " + state
+ " but selector called transition method. Seems like a bug...");
}
- lastTransitionTime = System.currentTimeMillis();
- } catch (Throwable e) {
+ } catch (Exception e) {
key.cancel();
key.attach(null);
onError(e);
}
}
- protected void onError(Throwable e) {
+ protected void onError(Exception e) {
client.onError(e);
callback.onError(e);
state = State.ERROR;
diff --git a/lib/java/src/org/apache/thrift/transport/TNonblockingServerSocket.java b/lib/java/src/org/apache/thrift/transport/TNonblockingServerSocket.java
index e26b9cf..aa1e1e5 100644
--- a/lib/java/src/org/apache/thrift/transport/TNonblockingServerSocket.java
+++ b/lib/java/src/org/apache/thrift/transport/TNonblockingServerSocket.java
@@ -30,10 +30,15 @@
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
+import org.apache.thrift.async.TAsyncClientManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* Wrapper around ServerSocketChannel
*/
public class TNonblockingServerSocket extends TNonblockingServerTransport {
+ private static final Logger LOGGER = LoggerFactory.getLogger(TNonblockingServerTransport.class.getName());
/**
* This channel is where all the nonblocking magic happens.
@@ -152,8 +157,7 @@
try {
serverSocket_.close();
} catch (IOException iox) {
- System.err.println("WARNING: Could not close server socket: " +
- iox.getMessage());
+ LOGGER.warn("WARNING: Could not close server socket: " + iox.getMessage());
}
serverSocket_ = null;
}
diff --git a/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java b/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java
index 187c8f8..72a57bc 100644
--- a/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java
+++ b/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java
@@ -18,6 +18,7 @@
*/
package org.apache.thrift.async;
+import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
@@ -45,31 +46,182 @@
import thrift.test.Srv.AsyncClient.voidMethod_call;
public class TestTAsyncClientManager extends TestCase {
- private static void fail(Throwable throwable) {
- StringWriter sink = new StringWriter();
- throwable.printStackTrace(new PrintWriter(sink, true));
- fail("unexpected error " + sink.toString());
+
+ private THsHaServer server_;
+ private Thread serverThread_;
+ private TAsyncClientManager clientManager_;
+
+ public void setUp() throws Exception {
+ server_ = new THsHaServer(new Srv.Processor(new SrvHandler()), new TNonblockingServerSocket(ServerTestBase.PORT));
+ serverThread_ = new Thread(new Runnable() {
+ public void run() {
+ server_.serve();
+ }
+ });
+ serverThread_.start();
+ clientManager_ = new TAsyncClientManager();
+ Thread.sleep(500);
}
-
- private static abstract class FailureLessCallback<T extends TAsyncMethodCall> implements AsyncMethodCallback<T> {
- @Override
- public void onError(Throwable throwable) {
- fail(throwable);
+
+ public void tearDown() throws Exception {
+ server_.stop();
+ clientManager_.stop();
+ serverThread_.join();
+ }
+
+ public void testBasicCall() throws Exception {
+ Srv.AsyncClient client = getClient();
+ basicCall(client);
+ }
+
+ public void testBasicCallWithTimeout() throws Exception {
+ Srv.AsyncClient client = getClient();
+ client.setTimeout(5000);
+ basicCall(client);
+ }
+
+ public void testTimeoutCall() throws Exception {
+ final CountDownLatch latch = new CountDownLatch(1);
+ Srv.AsyncClient client = getClient();
+ client.setTimeout(100);
+ client.primitiveMethod(new AsyncMethodCallback<primitiveMethod_call>() {
+ @Override
+ public void onError(Exception exception) {
+ try {
+ if (!(exception instanceof TimeoutException)) {
+ StringWriter sink = new StringWriter();
+ exception.printStackTrace(new PrintWriter(sink, true));
+ fail("expected TimeoutException but got " + sink.toString());
+ }
+ } finally {
+ latch.countDown();
+ }
+ }
+
+ @Override
+ public void onComplete(primitiveMethod_call response) {
+ try {
+ fail("Should not have finished timed out call.");
+ } finally {
+ latch.countDown();
+ }
+ }
+ });
+ latch.await(2, TimeUnit.SECONDS);
+ assertTrue(client.hasError());
+ assertTrue(client.getError() instanceof TimeoutException);
+ }
+
+ public void testVoidCall() throws Exception {
+ final CountDownLatch latch = new CountDownLatch(1);
+ final AtomicBoolean returned = new AtomicBoolean(false);
+ Srv.AsyncClient client = getClient();
+ client.voidMethod(new FailureLessCallback<Srv.AsyncClient.voidMethod_call>() {
+ @Override
+ public void onComplete(voidMethod_call response) {
+ try {
+ response.getResult();
+ returned.set(true);
+ } catch (TException e) {
+ fail(e);
+ } finally {
+ latch.countDown();
+ }
+ }
+ });
+ latch.await(1, TimeUnit.SECONDS);
+ assertTrue(returned.get());
+ }
+
+ public void testOnewayCall() throws Exception {
+ final CountDownLatch latch = new CountDownLatch(1);
+ final AtomicBoolean returned = new AtomicBoolean(false);
+ Srv.AsyncClient client = getClient();
+ client.onewayMethod(new FailureLessCallback<onewayMethod_call>() {
+ @Override
+ public void onComplete(onewayMethod_call response) {
+ try {
+ response.getResult();
+ returned.set(true);
+ } catch (TException e) {
+ fail(e);
+ } finally {
+ latch.countDown();
+ }
+ }
+ });
+ latch.await(1, TimeUnit.SECONDS);
+ assertTrue(returned.get());
+ }
+
+ public void testParallelCalls() throws Exception {
+ // make multiple calls with deserialization in the selector thread (repro Eric's issue)
+ int numThreads = 50;
+ int numCallsPerThread = 100;
+ List<JankyRunnable> runnables = new ArrayList<JankyRunnable>();
+ List<Thread> threads = new ArrayList<Thread>();
+ for (int i = 0; i < numThreads; i++) {
+ JankyRunnable runnable = new JankyRunnable(numCallsPerThread);
+ Thread thread = new Thread(runnable);
+ thread.start();
+ threads.add(thread);
+ runnables.add(runnable);
}
+ for (Thread thread : threads) {
+ thread.join();
+ }
+ int numSuccesses = 0;
+ for (JankyRunnable runnable : runnables) {
+ numSuccesses += runnable.getNumSuccesses();
+ }
+ assertEquals(numThreads * numCallsPerThread, numSuccesses);
+ }
+
+ private Srv.AsyncClient getClient() throws IOException {
+ TNonblockingSocket clientSocket = new TNonblockingSocket(ServerTestBase.HOST, ServerTestBase.PORT);
+ return new Srv.AsyncClient(new TBinaryProtocol.Factory(), clientManager_, clientSocket);
}
-
+
+ private void basicCall(Srv.AsyncClient client) throws Exception {
+ final CountDownLatch latch = new CountDownLatch(1);
+ final AtomicBoolean returned = new AtomicBoolean(false);
+ client.Janky(1, new FailureLessCallback<Srv.AsyncClient.Janky_call>() {
+ @Override
+ public void onComplete(Janky_call response) {
+ try {
+ assertEquals(3, response.getResult());
+ returned.set(true);
+ } catch (TException e) {
+ fail(e);
+ } finally {
+ latch.countDown();
+ }
+ }
+
+ @Override
+ public void onError(Exception exception) {
+ try {
+ StringWriter sink = new StringWriter();
+ exception.printStackTrace(new PrintWriter(sink, true));
+ fail("unexpected onError with exception " + sink.toString());
+ } finally {
+ latch.countDown();
+ }
+ }
+ });
+ latch.await(100, TimeUnit.SECONDS);
+ assertTrue(returned.get());
+ }
+
public class SrvHandler implements Iface {
+ // Use this method for a standard call testing
@Override
public int Janky(int arg) throws TException {
assertEquals(1, arg);
return 3;
}
- @Override
- public void methodWithDefaultArgs(int something) throws TException {
- }
-
- // Using this method for timeout testing
+ // Using this method for timeout testing - sleeps for 1 second before returning
@Override
public int primitiveMethod() throws TException {
try {
@@ -79,6 +231,9 @@
}
return 0;
}
+
+ @Override
+ public void methodWithDefaultArgs(int something) throws TException { }
@Override
public CompactProtoTestStruct structMethod() throws TException {
@@ -93,20 +248,29 @@
public void onewayMethod() throws TException {
}
}
-
- public class JankyRunnable implements Runnable {
- private TAsyncClientManager acm_;
+
+ private static abstract class FailureLessCallback<T extends TAsyncMethodCall> implements AsyncMethodCallback<T> {
+ @Override
+ public void onError(Exception exception) {
+ fail(exception);
+ }
+ }
+
+ private static void fail(Exception exception) {
+ StringWriter sink = new StringWriter();
+ exception.printStackTrace(new PrintWriter(sink, true));
+ fail("unexpected error " + sink.toString());
+ }
+
+ private class JankyRunnable implements Runnable {
private int numCalls_;
private int numSuccesses_ = 0;
private Srv.AsyncClient client_;
- private TNonblockingSocket clientSocket_;
- public JankyRunnable(TAsyncClientManager acm, int numCalls) throws Exception {
- this.acm_ = acm;
- this.numCalls_ = numCalls;
- this.clientSocket_ = new TNonblockingSocket(ServerTestBase.HOST, ServerTestBase.PORT);
- this.client_ = new Srv.AsyncClient(new TBinaryProtocol.Factory(), acm_, clientSocket_);
- this.client_.setTimeout(20000);
+ public JankyRunnable(int numCalls) throws Exception {
+ numCalls_ = numCalls;
+ client_ = getClient();
+ client_.setTimeout(20000);
}
public int getNumSuccesses() {
@@ -119,14 +283,14 @@
try {
// connect an async client
final CountDownLatch latch = new CountDownLatch(1);
- final AtomicBoolean jankyReturned = new AtomicBoolean(false);
+ final AtomicBoolean returned = new AtomicBoolean(false);
client_.Janky(1, new AsyncMethodCallback<Srv.AsyncClient.Janky_call>() {
-
+
@Override
public void onComplete(Janky_call response) {
try {
assertEquals(3, response.getResult());
- jankyReturned.set(true);
+ returned.set(true);
latch.countDown();
} catch (TException e) {
latch.countDown();
@@ -135,10 +299,10 @@
}
@Override
- public void onError(Throwable throwable) {
+ public void onError(Exception exception) {
try {
StringWriter sink = new StringWriter();
- throwable.printStackTrace(new PrintWriter(sink, true));
+ exception.printStackTrace(new PrintWriter(sink, true));
fail("unexpected onError on iteration " + iteration + ": " + sink.toString());
} finally {
latch.countDown();
@@ -148,7 +312,7 @@
boolean calledBack = latch.await(30, TimeUnit.SECONDS);
assertTrue("wasn't called back in time on iteration " + iteration, calledBack);
- assertTrue("onComplete not called on iteration " + iteration, jankyReturned.get());
+ assertTrue("onComplete not called on iteration " + iteration, returned.get());
this.numSuccesses_++;
} catch (Exception e) {
fail(e);
@@ -156,173 +320,4 @@
}
}
}
-
- public void standardCallTest(Srv.AsyncClient client) throws Exception {
- final CountDownLatch latch = new CountDownLatch(1);
- final AtomicBoolean jankyReturned = new AtomicBoolean(false);
- client.Janky(1, new FailureLessCallback<Srv.AsyncClient.Janky_call>() {
- @Override
- public void onComplete(Janky_call response) {
- try {
- assertEquals(3, response.getResult());
- jankyReturned.set(true);
- } catch (TException e) {
- fail(e);
- } finally {
- latch.countDown();
- }
- }
- });
-
- latch.await(100, TimeUnit.SECONDS);
- assertTrue(jankyReturned.get());
- }
-
- public void testIt() throws Exception {
- // put up a server
- final THsHaServer s = new THsHaServer(new Srv.Processor(new SrvHandler()),
- new TNonblockingServerSocket(ServerTestBase.PORT));
- new Thread(new Runnable() {
- @Override
- public void run() {
- s.serve();
- }
- }).start();
- Thread.sleep(1000);
-
- // set up async client manager
- TAsyncClientManager acm = new TAsyncClientManager();
-
- // connect an async client
- TNonblockingSocket clientSock = new TNonblockingSocket(
- ServerTestBase.HOST, ServerTestBase.PORT);
- Srv.AsyncClient client = new Srv.AsyncClient(new TBinaryProtocol.Factory(), acm, clientSock);
-
- // make a standard method call
- standardCallTest(client);
-
- // make a standard method call that succeeds within timeout
- assertFalse(s.isStopped());
- client.setTimeout(5000);
- standardCallTest(client);
-
- // make a void method call
- assertFalse(s.isStopped());
- final CountDownLatch voidLatch = new CountDownLatch(1);
- final AtomicBoolean voidMethodReturned = new AtomicBoolean(false);
- client.voidMethod(new FailureLessCallback<Srv.AsyncClient.voidMethod_call>() {
- @Override
- public void onComplete(voidMethod_call response) {
- try {
- response.getResult();
- voidMethodReturned.set(true);
- } catch (TException e) {
- fail(e);
- } finally {
- voidLatch.countDown();
- }
- }
- });
- voidLatch.await(1, TimeUnit.SECONDS);
- assertTrue(voidMethodReturned.get());
-
- // make a oneway method call
- assertFalse(s.isStopped());
- final CountDownLatch onewayLatch = new CountDownLatch(1);
- final AtomicBoolean onewayReturned = new AtomicBoolean(false);
- client.onewayMethod(new FailureLessCallback<onewayMethod_call>() {
- @Override
- public void onComplete(onewayMethod_call response) {
- try {
- response.getResult();
- onewayReturned.set(true);
- } catch (TException e) {
- fail(e);
- } finally {
- onewayLatch.countDown();
- }
- }
- });
- onewayLatch.await(1, TimeUnit.SECONDS);
- assertTrue(onewayReturned.get());
-
- // make another standard method call
- assertFalse(s.isStopped());
- final CountDownLatch voidAfterOnewayLatch = new CountDownLatch(1);
- final AtomicBoolean voidAfterOnewayReturned = new AtomicBoolean(false);
- client.voidMethod(new FailureLessCallback<voidMethod_call>() {
- @Override
- public void onComplete(voidMethod_call response) {
- try {
- response.getResult();
- voidAfterOnewayReturned.set(true);
- } catch (TException e) {
- fail(e);
- } finally {
- voidAfterOnewayLatch.countDown();
- }
- }
- });
- voidAfterOnewayLatch.await(1, TimeUnit.SECONDS);
- assertTrue(voidAfterOnewayReturned.get());
-
- // make multiple calls with deserialization in the selector thread (repro Eric's issue)
- assertFalse(s.isStopped());
- int numThreads = 50;
- int numCallsPerThread = 100;
- List<JankyRunnable> runnables = new ArrayList<JankyRunnable>();
- List<Thread> threads = new ArrayList<Thread>();
- for (int i = 0; i < numThreads; i++) {
- JankyRunnable runnable = new JankyRunnable(acm, numCallsPerThread);
- Thread thread = new Thread(runnable);
- thread.start();
- threads.add(thread);
- runnables.add(runnable);
- }
- for (Thread thread : threads) {
- thread.join();
- }
- int numSuccesses = 0;
- for (JankyRunnable runnable : runnables) {
- numSuccesses += runnable.getNumSuccesses();
- }
- assertEquals(numThreads * numCallsPerThread, numSuccesses);
-
- // check that timeouts work
- assertFalse(s.isStopped());
- assertTrue(clientSock.isOpen());
- final CountDownLatch timeoutLatch = new CountDownLatch(1);
- client.setTimeout(100);
- client.primitiveMethod(new AsyncMethodCallback<primitiveMethod_call>() {
-
- @Override
- public void onError(Throwable throwable) {
- try {
- if (!(throwable instanceof TimeoutException)) {
- StringWriter sink = new StringWriter();
- throwable.printStackTrace(new PrintWriter(sink, true));
- fail("expected TimeoutException but got " + sink.toString());
- }
- } finally {
- timeoutLatch.countDown();
- }
- }
-
- @Override
- public void onComplete(primitiveMethod_call response) {
- try {
- fail("should not have finished timed out call.");
- } finally {
- timeoutLatch.countDown();
- }
- }
-
- });
- timeoutLatch.await(2, TimeUnit.SECONDS);
- assertTrue(client.hasError());
- assertTrue(client.getError() instanceof TimeoutException);
-
- // error closes socket and make sure isOpen reflects that
- assertFalse(clientSock.isOpen());
- }
-}
+}
\ No newline at end of file