THRIFT-845. java: async client does not respect timeout
This patch adds timeout handling to async method calls through TAsyncClientManager.
Patch: Ning Liang
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@987323 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/java/src/org/apache/thrift/async/TAsyncClient.java b/lib/java/src/org/apache/thrift/async/TAsyncClient.java
index 2e8dea3..0355f80 100644
--- a/lib/java/src/org/apache/thrift/async/TAsyncClient.java
+++ b/lib/java/src/org/apache/thrift/async/TAsyncClient.java
@@ -27,17 +27,35 @@
protected final TAsyncClientManager manager;
private TAsyncMethodCall currentMethod;
private Throwable error;
+ private long timeout;
public TAsyncClient(TProtocolFactory protocolFactory, TAsyncClientManager manager, TNonblockingTransport transport) {
+ this(protocolFactory, manager, transport, 0);
+ }
+
+ public TAsyncClient(TProtocolFactory protocolFactory, TAsyncClientManager manager, TNonblockingTransport transport, long timeout) {
this.protocolFactory = protocolFactory;
this.manager = manager;
this.transport = transport;
+ this.timeout = timeout;
}
public TProtocolFactory getProtocolFactory() {
return protocolFactory;
}
+ public long getTimeout() {
+ return timeout;
+ }
+
+ public boolean hasTimeout() {
+ return timeout > 0;
+ }
+
+ public void setTimeout(long timeout) {
+ this.timeout = timeout;
+ }
+
/**
* Is the client in an error state?
* @return
diff --git a/lib/java/src/org/apache/thrift/async/TAsyncClientManager.java b/lib/java/src/org/apache/thrift/async/TAsyncClientManager.java
index 1d32ace..5464d7e 100644
--- a/lib/java/src/org/apache/thrift/async/TAsyncClientManager.java
+++ b/lib/java/src/org/apache/thrift/async/TAsyncClientManager.java
@@ -19,14 +19,15 @@
package org.apache.thrift.async;
import java.io.IOException;
+import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;
-import java.nio.channels.CancelledKeyException;
-import java.nio.channels.ClosedChannelException;
-import java.nio.channels.ClosedSelectorException;
+import java.util.HashSet;
import java.util.Iterator;
+import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeoutException;
import org.apache.thrift.TException;
import org.slf4j.Logger;
@@ -35,6 +36,7 @@
/**
* Contains selector thread which transitions method call objects
*/
+@SuppressWarnings("unchecked")
public class TAsyncClientManager {
private static final Logger LOGGER = LoggerFactory.getLogger(TAsyncClientManager.class.getName());
@@ -57,8 +59,12 @@
}
private class SelectThread extends Thread {
+ // Selector waits at most SELECT_TIME milliseconds before waking
+ private static final long SELECT_TIME = 200;
+
private final Selector selector;
private volatile boolean running;
+ private final Set<TAsyncMethodCall> timeoutWatchSet = new HashSet<TAsyncMethodCall>();
public SelectThread() throws IOException {
this.selector = SelectorProvider.provider().openSelector();
@@ -79,46 +85,76 @@
public void run() {
while (running) {
try {
- selector.select();
+ selector.select(SELECT_TIME);
} catch (IOException e) {
LOGGER.error("Caught IOException in TAsyncClientManager!", e);
}
- // Handle any ready channels calls
- try {
- Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
- while (keys.hasNext()) {
- SelectionKey key = keys.next();
- keys.remove();
- if (!key.isValid()) {
- // this should only have happened if the method call experienced an
- // error and the key was cancelled. just skip it.
- continue;
- }
- TAsyncMethodCall method = (TAsyncMethodCall)key.attachment();
- method.transition(key);
- }
- } catch (ClosedSelectorException e) {
- LOGGER.error("Caught ClosedSelectorException in TAsyncClientManager!", e);
- }
+ transitionMethods();
+ timeoutIdleMethods();
+ startPendingMethods();
+ }
+ }
- // Start any new calls
- TAsyncMethodCall methodCall;
- while ((methodCall = pendingCalls.poll()) != null) {
- // Catch registration errors. Method will catch transition errors and cleanup.
- try {
- SelectionKey key = methodCall.registerWithSelector(selector);
- methodCall.transition(key);
- } catch (ClosedChannelException e) {
- methodCall.onError(e);
- LOGGER.warn("Caught ClosedChannelException in TAsyncClientManager!", e);
- } catch (CancelledKeyException e) {
- methodCall.onError(e);
- LOGGER.warn("Caught CancelledKeyExce115ption in TAsyncClientManager!", e);
- } catch (Exception e) {
- methodCall.onError(e);
- LOGGER.warn("Caught unexpected exception in TAsyncClientManager!", e);
- }
+ // Transition methods for ready keys
+ private void transitionMethods() {
+ try {
+ Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
+ while (keys.hasNext()) {
+ SelectionKey key = keys.next();
+ keys.remove();
+ if (!key.isValid()) {
+ // this can happen if the method call experienced an error and the key was cancelled
+ // this can also happen if we timeout a method, which results in a channel close
+ // just skip
+ continue;
+ }
+ TAsyncMethodCall methodCall = (TAsyncMethodCall)key.attachment();
+ methodCall.transition(key);
+
+ // If done or error occurred, remove from timeout watch set
+ if (methodCall.isFinished() || methodCall.getClient().hasError()) {
+ timeoutWatchSet.remove(methodCall);
+ }
+ }
+ } catch (ClosedSelectorException e) {
+ LOGGER.error("Caught ClosedSelectorException in TAsyncClientManager!", e);
+ }
+ }
+
+ // Timeout any existing method calls
+ private void timeoutIdleMethods() {
+ Iterator<TAsyncMethodCall> iterator = timeoutWatchSet.iterator();
+ while (iterator.hasNext()) {
+ TAsyncMethodCall methodCall = iterator.next();
+ long clientTimeout = methodCall.getClient().getTimeout();
+ long timeElapsed = System.currentTimeMillis() - methodCall.getLastTransitionTime();
+ if (timeElapsed > clientTimeout) {
+ iterator.remove();
+ methodCall.onError(new TimeoutException("Operation " +
+ methodCall.getClass() + " timed out after " + timeElapsed +
+ " milliseconds."));
+ }
+ }
+ }
+
+ // Start any new calls
+ private void startPendingMethods() {
+ TAsyncMethodCall methodCall;
+ while ((methodCall = pendingCalls.poll()) != null) {
+ // Catch registration errors. method will catch transition errors and cleanup.
+ try {
+ SelectionKey key = methodCall.registerWithSelector(selector);
+ methodCall.transition(key);
+
+ // If timeout specified and first transition went smoothly, add to timeout watch set
+ TAsyncClient client = methodCall.getClient();
+ if (client.hasTimeout() && !client.hasError()) {
+ timeoutWatchSet.add(methodCall);
+ }
+ } catch (Throwable e) {
+ LOGGER.warn("Caught throwable in TAsyncClientManager!", e);
+ methodCall.onError(e);
}
}
}
diff --git a/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java b/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java
index eca321b..5568afb 100644
--- a/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java
+++ b/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java
@@ -55,6 +55,7 @@
protected final TAsyncClient client;
private final AsyncMethodCallback<T> callback;
private final boolean isOneway;
+ private long lastTransitionTime;
private ByteBuffer sizeBuffer;
private final byte[] sizeBufferArray = new byte[4];
@@ -76,6 +77,18 @@
return state;
}
+ protected boolean isFinished() {
+ return state == State.RESPONSE_READ;
+ }
+
+ protected long getLastTransitionTime() {
+ return lastTransitionTime;
+ }
+
+ public TAsyncClient getClient() {
+ return client;
+ }
+
protected abstract void write_args(TProtocol protocol) throws TException;
protected void prepareMethodCall() throws TException {
@@ -135,13 +148,14 @@
throw new IllegalStateException("Method call in state " + state
+ " but selector called transition method. Seems like a bug...");
}
+ lastTransitionTime = System.currentTimeMillis();
} catch (Throwable e) {
key.cancel();
key.attach(null);
onError(e);
}
}
-
+
protected void onError(Throwable e) {
state = State.ERROR;
client.onError(e);
diff --git a/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java b/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java
index 2535067..55b054a 100644
--- a/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java
+++ b/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java
@@ -18,6 +18,9 @@
*/
package org.apache.thrift.async;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import junit.framework.TestCase;
@@ -28,14 +31,12 @@
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TNonblockingSocket;
-import java.util.List;
-import java.util.ArrayList;
-
import thrift.test.CompactProtoTestStruct;
import thrift.test.Srv;
import thrift.test.Srv.Iface;
import thrift.test.Srv.AsyncClient.Janky_call;
import thrift.test.Srv.AsyncClient.onewayMethod_call;
+import thrift.test.Srv.AsyncClient.primitiveMethod_call;
import thrift.test.Srv.AsyncClient.voidMethod_call;
public class TestTAsyncClientManager extends TestCase {
@@ -58,8 +59,15 @@
public void methodWithDefaultArgs(int something) throws TException {
}
+ // Using this method for timeout testing
@Override
public int primitiveMethod() throws TException {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
return 0;
}
@@ -76,31 +84,31 @@
public void onewayMethod() throws TException {
}
}
-
+
public class JankyRunnable implements Runnable {
private TAsyncClientManager acm_;
private int numCalls_;
private int numSuccesses_ = 0;
private Srv.AsyncClient client_;
private TNonblockingSocket clientSocket_;
-
+
public JankyRunnable(TAsyncClientManager acm, int numCalls) throws Exception {
this.acm_ = acm;
this.numCalls_ = numCalls;
this.clientSocket_ = new TNonblockingSocket("localhost", 12345);
this.client_ = new Srv.AsyncClient(new TBinaryProtocol.Factory(), acm_, clientSocket_);
}
-
+
public int getNumSuccesses() {
return numSuccesses_;
}
-
+
public void run() {
for (int i = 0; i < numCalls_; i++) {
- try {
+ try {
// connect an async client
final Object o = new Object();
-
+
final AtomicBoolean jankyReturned = new AtomicBoolean(false);
client_.Janky(1, new AsyncMethodCallback<Srv.AsyncClient.Janky_call>() {
@Override
@@ -112,28 +120,28 @@
o.notifyAll();
}
} catch (TException e) {
- e.printStackTrace();
+ e.printStackTrace();
synchronized(o) {
o.notifyAll();
}
fail("unexpected exception: " + e);
- }
-
+ }
}
-
+
@Override
public void onError(Throwable throwable) {
+ System.out.println(throwable.toString());
synchronized(o) {
o.notifyAll();
}
- fail("unexpected exception: " + throwable);
+ fail("unexpected exception: " + throwable);
}
});
-
+
synchronized(o) {
o.wait(1000);
}
-
+
assertTrue(jankyReturned.get());
this.numSuccesses_++;
} catch (Exception e) {
@@ -143,6 +151,30 @@
}
}
+ public void standardCallTest(Srv.AsyncClient client) throws Exception {
+ final Object o = new Object();
+ final AtomicBoolean jankyReturned = new AtomicBoolean(false);
+ client.Janky(1, new FailureLessCallback<Srv.AsyncClient.Janky_call>() {
+ @Override
+ public void onComplete(Janky_call response) {
+ try {
+ assertEquals(3, response.getResult());
+ jankyReturned.set(true);
+ } catch (TException e) {
+ fail("unexpected exception: " + e);
+ }
+ synchronized(o) {
+ o.notifyAll();
+ }
+ }
+ });
+
+ synchronized(o) {
+ o.wait(100000);
+ }
+ assertTrue(jankyReturned.get());
+ }
+
public void testIt() throws Exception {
// put up a server
final TNonblockingServer s = new TNonblockingServer(new Srv.Processor(new SrvHandler()), new TNonblockingServerSocket(12345));
@@ -164,26 +196,11 @@
final Object o = new Object();
// make a standard method call
- final AtomicBoolean jankyReturned = new AtomicBoolean(false);
- client.Janky(1, new FailureLessCallback<Srv.AsyncClient.Janky_call>() {
- @Override
- public void onComplete(Janky_call response) {
- try {
- assertEquals(3, response.getResult());
- jankyReturned.set(true);
- } catch (TException e) {
- fail("unexpected exception: " + e);
- }
- synchronized(o) {
- o.notifyAll();
- }
- }
- });
+ standardCallTest(client);
- synchronized(o) {
- o.wait(100000);
- }
- assertTrue(jankyReturned.get());
+ // make a standard method call that succeeds within timeout
+ client.setTimeout(5000);
+ standardCallTest(client);
// make a void method call
final AtomicBoolean voidMethodReturned = new AtomicBoolean(false);
@@ -249,7 +266,7 @@
o.wait(1000);
}
assertTrue(voidAfterOnewayReturned.get());
-
+
// make multiple calls with deserialization in the selector thread (repro Eric's issue)
int numThreads = 500;
int numCallsPerThread = 100;
@@ -270,5 +287,34 @@
numSuccesses += runnable.getNumSuccesses();
}
assertEquals(numSuccesses, numThreads * numCallsPerThread);
+
+ // check that timeouts work
+ client.setTimeout(100);
+ client.primitiveMethod(new AsyncMethodCallback<primitiveMethod_call>() {
+
+ @Override
+ public void onError(Throwable throwable) {
+ if (!(throwable instanceof TimeoutException)) {
+ fail("should have received timeout exception");
+ synchronized(o) {
+ o.notifyAll();
+ }
+ }
+ }
+
+ @Override
+ public void onComplete(primitiveMethod_call response) {
+ fail("should not have finished timed out call.");
+ synchronized(o) {
+ o.notifyAll();
+ }
+ }
+
+ });
+ synchronized(o) {
+ o.wait(2000);
+ }
+ assertTrue(client.hasError());
+ assertTrue(client.getError() instanceof TimeoutException);
}
}