THRIFT-888. java: async client should also have nonblocking connect

This patch adds optional nonblocking connect behavior.

Patch: Eric Jensen

git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@995262 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/java/src/org/apache/thrift/async/TAsyncClientManager.java b/lib/java/src/org/apache/thrift/async/TAsyncClientManager.java
index 5464d7e..d88b6ca 100644
--- a/lib/java/src/org/apache/thrift/async/TAsyncClientManager.java
+++ b/lib/java/src/org/apache/thrift/async/TAsyncClientManager.java
@@ -36,7 +36,6 @@
 /**
  * Contains selector thread which transitions method call objects
  */
-@SuppressWarnings("unchecked")
 public class TAsyncClientManager {
   private static final Logger LOGGER = LoggerFactory.getLogger(TAsyncClientManager.class.getName());
 
@@ -60,7 +59,7 @@
 
   private class SelectThread extends Thread {
     // Selector waits at most SELECT_TIME milliseconds before waking
-    private static final long SELECT_TIME = 200;
+    private static final long SELECT_TIME = 5;
 
     private final Selector selector;
     private volatile boolean running;
@@ -85,14 +84,18 @@
     public void run() {
       while (running) {
         try {
-          selector.select(SELECT_TIME);
-        } catch (IOException e) {
-          LOGGER.error("Caught IOException in TAsyncClientManager!", e);
-        }
+          try {
+            selector.select(SELECT_TIME);
+          } catch (IOException e) {
+            LOGGER.error("Caught IOException in TAsyncClientManager!", e);
+          }
 
-        transitionMethods();
-        timeoutIdleMethods();
-        startPendingMethods();
+          transitionMethods();
+          timeoutIdleMethods();
+          startPendingMethods();
+        } catch (Throwable throwable) {
+          LOGGER.error("Ignoring uncaught exception in SelectThread", throwable);
+        }
       }
     }
 
@@ -129,10 +132,11 @@
         TAsyncMethodCall methodCall = iterator.next();
         long clientTimeout = methodCall.getClient().getTimeout();
         long timeElapsed = System.currentTimeMillis() - methodCall.getLastTransitionTime();
+
         if (timeElapsed > clientTimeout) {
           iterator.remove();
-          methodCall.onError(new TimeoutException("Operation " + 
-              methodCall.getClass() + " timed out after " + timeElapsed + 
+          methodCall.onError(new TimeoutException("Operation " +
+              methodCall.getClass() + " timed out after " + timeElapsed +
               " milliseconds."));
         }
       }
@@ -144,8 +148,7 @@
       while ((methodCall = pendingCalls.poll()) != null) {
         // Catch registration errors. method will catch transition errors and cleanup.
         try {
-          SelectionKey key = methodCall.registerWithSelector(selector);
-          methodCall.transition(key);
+          methodCall.start(selector);
 
           // If timeout specified and first transition went smoothly, add to timeout watch set
           TAsyncClient client = methodCall.getClient();
diff --git a/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java b/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java
index 5568afb..e75f4ab 100644
--- a/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java
+++ b/lib/java/src/org/apache/thrift/async/TAsyncMethodCall.java
@@ -39,7 +39,11 @@
  * @param <T>
  */
 public abstract class TAsyncMethodCall<T extends TAsyncMethodCall> {
+
+  private static final int INITIAL_MEMORY_BUFFER_SIZE = 128;
+
   public static enum State {
+    CONNECTING,
     WRITING_REQUEST_SIZE,
     WRITING_REQUEST_BODY,
     READING_RESPONSE_SIZE,
@@ -48,20 +52,22 @@
     ERROR;
   }
 
-  private static final int INITIAL_MEMORY_BUFFER_SIZE = 128;
+  /**
+   * Next step in the call, initialized by start()
+   */
+  private State state = null;
 
   protected final TNonblockingTransport transport;
   private final TProtocolFactory protocolFactory;
   protected final TAsyncClient client;
   private final AsyncMethodCallback<T> callback;
   private final boolean isOneway;
+
   private long lastTransitionTime;
 
   private ByteBuffer sizeBuffer;
   private final byte[] sizeBufferArray = new byte[4];
-
   private ByteBuffer frameBuffer;
-  private State state;
 
   protected TAsyncMethodCall(TAsyncClient client, TProtocolFactory protocolFactory, TNonblockingTransport transport, AsyncMethodCallback<T> callback, boolean isOneway) {
     this.transport = transport;
@@ -69,8 +75,7 @@
     this.protocolFactory = protocolFactory;
     this.client = client;
     this.isOneway = isOneway;
-
-    this.state = State.WRITING_REQUEST_SIZE;
+    this.lastTransitionTime = System.currentTimeMillis();
   }
 
   protected State getState() {
@@ -91,6 +96,10 @@
 
   protected abstract void write_args(TProtocol protocol) throws TException;
 
+  /**
+   * Initialize buffers.
+   * @throws TException if buffer initialization fails
+   */
   protected void prepareMethodCall() throws TException {
     TMemoryBuffer memoryBuffer = new TMemoryBuffer(INITIAL_MEMORY_BUFFER_SIZE);
     TProtocol protocol = protocolFactory.getProtocol(memoryBuffer);
@@ -103,10 +112,32 @@
     sizeBuffer = ByteBuffer.wrap(sizeBufferArray);
   }
 
-  SelectionKey registerWithSelector(Selector sel) throws IOException {
-    SelectionKey key = transport.registerSelector(sel, SelectionKey.OP_WRITE);
+  /**
+   * Register with selector and start first state, which could be either connecting or writing.
+   * @throws IOException if register or starting fails
+   */
+  void start(Selector sel) throws IOException {
+    SelectionKey key;
+    if (transport.isOpen()) {
+      state = State.WRITING_REQUEST_SIZE;
+      key = transport.registerSelector(sel, SelectionKey.OP_WRITE);
+    } else {
+      state = State.CONNECTING;
+      key = transport.registerSelector(sel, SelectionKey.OP_CONNECT);
+
+      // non-blocking connect can complete immediately,
+      // in which case we should not expect the OP_CONNECT
+      if (transport.startConnect()) {
+        registerForFirstWrite(key);
+      }
+    }
+
     key.attach(this);
-    return key;
+  }
+
+  void registerForFirstWrite(SelectionKey key) throws IOException {
+    state = State.WRITING_REQUEST_SIZE;
+    key.interestOps(SelectionKey.OP_WRITE);
   }
 
   protected ByteBuffer getFrameBuffer() {
@@ -131,6 +162,9 @@
     // Transition function
     try {
       switch (state) {
+        case CONNECTING:
+          doConnecting(key);
+          break;
         case WRITING_REQUEST_SIZE:
           doWritingRequestSize();
           break;
@@ -143,9 +177,8 @@
         case READING_RESPONSE_BODY:
           doReadingResponseBody(key);
           break;
-        case RESPONSE_READ:
-        case ERROR:
-          throw new IllegalStateException("Method call in state " + state 
+        default: // RESPONSE_READ, ERROR, or bug
+          throw new IllegalStateException("Method call in state " + state
               + " but selector called transition method. Seems like a bug...");
       }
       lastTransitionTime = System.currentTimeMillis();
@@ -157,9 +190,9 @@
   }
 
   protected void onError(Throwable e) {
-    state = State.ERROR;
     client.onError(e);
     callback.onError(e);
+    state = State.ERROR;
   }
 
   private void doReadingResponseBody(SelectionKey key) throws IOException {
@@ -213,4 +246,11 @@
       state = State.WRITING_REQUEST_BODY;
     }
   }
+
+  private void doConnecting(SelectionKey key) throws IOException {
+    if (!key.isConnectable() || !transport.finishConnect()) {
+      throw new IOException("not connectable or finishConnect returned false after we got an OP_CONNECT");
+    }
+    registerForFirstWrite(key);
+  }
 }
diff --git a/lib/java/src/org/apache/thrift/server/TNonblockingServer.java b/lib/java/src/org/apache/thrift/server/TNonblockingServer.java
index 31a6e24..8b98031 100644
--- a/lib/java/src/org/apache/thrift/server/TNonblockingServer.java
+++ b/lib/java/src/org/apache/thrift/server/TNonblockingServer.java
@@ -62,7 +62,7 @@
     LoggerFactory.getLogger(TNonblockingServer.class.getName());
 
   // Flag for stopping the server
-  private volatile boolean stopped_;
+  private volatile boolean stopped_ = true;
 
   private SelectThread selectThread_;
 
@@ -218,6 +218,7 @@
     // start the selector
     try {
       selectThread_ = new SelectThread((TNonblockingServerTransport)serverTransport_);
+      stopped_ = false;
       selectThread_.start();
       return true;
     } catch (IOException e) {
@@ -265,6 +266,10 @@
     selectThread_.requestSelectInterestChange(frameBuffer);
   }
 
+  public boolean isStopped() {
+    return selectThread_.isStopped();
+  }
+
   /**
    * The thread that will be doing all the selecting, managing new connections
    * and those that still need to be read.
@@ -288,14 +293,24 @@
       serverTransport.registerSelector(selector);
     }
 
+    public boolean isStopped() {
+      return stopped_;
+    }
+
     /**
      * The work loop. Handles both selecting (all IO operations) and managing
      * the selection preferences of all existing connections.
      */
     public void run() {
-      while (!stopped_) {
-        select();
-        processInterestChanges();
+      try {
+        while (!stopped_) {
+          select();
+          processInterestChanges();
+        }
+      } catch (Throwable t) {
+        LOGGER.error("run() exiting due to uncaught error", t);
+      } finally {
+        stopped_ = true;
       }
     }
 
diff --git a/lib/java/src/org/apache/thrift/transport/TNonblockingSocket.java b/lib/java/src/org/apache/thrift/transport/TNonblockingSocket.java
index 313ef85..c3787a2 100644
--- a/lib/java/src/org/apache/thrift/transport/TNonblockingSocket.java
+++ b/lib/java/src/org/apache/thrift/transport/TNonblockingSocket.java
@@ -23,102 +23,92 @@
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
+import java.net.SocketAddress;
 import java.net.SocketException;
 import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.SocketChannel;
 
+import org.apache.thrift.async.TAsyncMethodCall;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
- * Socket implementation of the TTransport interface. To be commented soon!
+ * Transport for use with async client.
  */
 public class TNonblockingSocket extends TNonblockingTransport {
 
-  private SocketChannel socketChannel = null;
+  private static final Logger LOGGER = LoggerFactory.getLogger(TNonblockingSocket.class.getName());
 
   /**
-   * Wrapped Socket object
+   * Host and port if passed in, used for lazy non-blocking connect.
    */
-  private Socket socket_ = null;
+  private SocketAddress socketAddress_ = null;
+
+  private final SocketChannel socketChannel_;
+
+  private final Socket socket_;
+
+  public TNonblockingSocket(String host, int port) throws IOException {
+    this(host, port, 0);
+  }
 
   /**
-   * Socket timeout
-   */
-  private int timeout_ = 0;
-
-  /**
-   * Create a new nonblocking socket transport connected to host:port.
+   * Create a new nonblocking socket transport that will be connected to host:port.
    * @param host
    * @param port
    * @throws TTransportException
    * @throws IOException
    */
-  public TNonblockingSocket(String host, int port) throws TTransportException, IOException {
-    this(SocketChannel.open(new InetSocketAddress(host, port)));
+  public TNonblockingSocket(String host, int port, int timeout) throws IOException {
+    this(SocketChannel.open(), timeout);
+    socketAddress_ = new InetSocketAddress(host, port);
   }
 
   /**
    * Constructor that takes an already created socket.
    *
    * @param socketChannel Already created SocketChannel object
-   * @throws TTransportException if there is an error setting up the streams
+   * @throws IOException if there is an error setting up the streams
    */
-  public TNonblockingSocket(SocketChannel socketChannel) throws TTransportException {
-    try {
-      // make it a nonblocking channel
-      socketChannel.configureBlocking(false);
-    } catch (IOException e) {
-      throw new TTransportException(e);
-    }
+  public TNonblockingSocket(SocketChannel socketChannel) throws IOException {
+    this(socketChannel, 0);
+    if (!socketChannel.isConnected()) throw new IOException("Socket must already be connected");
+  }
 
-    this.socketChannel = socketChannel;
-    this.socket_ = socketChannel.socket();
-    try {
-      socket_.setSoLinger(false, 0);
-      socket_.setTcpNoDelay(true);
-    } catch (SocketException sx) {
-      sx.printStackTrace();
-    }
+  private TNonblockingSocket(SocketChannel socketChannel, int timeout) throws IOException {
+    socketChannel_ = socketChannel;
+    socket_ = socketChannel.socket();
+
+    // make it a nonblocking channel
+    socketChannel.configureBlocking(false);
+    socket_.setSoLinger(false, 0);
+    socket_.setTcpNoDelay(true);
+    setTimeout(timeout);
   }
 
   /**
-   * Register this socket with the specified selector for both read and write
-   * operations.
+   * Register the new SocketChannel with our Selector, indicating
+   * we'd like to be notified when it's ready for I/O.
    *
    * @param selector
    * @return the selection key for this socket.
    */
   public SelectionKey registerSelector(Selector selector, int interests) throws IOException {
-    // Register the new SocketChannel with our Selector, indicating
-    // we'd like to be notified when there's data waiting to be read
-    return socketChannel.register(selector, interests);
+    return socketChannel_.register(selector, interests);
   }
 
   /**
-   * Initializes the socket object
-   */
-  private void initSocket() {
-    socket_ = new Socket();
-    try {
-      socket_.setSoLinger(false, 0);
-      socket_.setTcpNoDelay(true);
-      socket_.setSoTimeout(timeout_);
-    } catch (SocketException sx) {
-      sx.printStackTrace();
-    }
-  }
-
-  /**
-   * Sets the socket timeout
+   * Sets the socket timeout, although this implementation never uses blocking operations so it is unused.
    *
    * @param timeout Milliseconds timeout
    */
   public void setTimeout(int timeout) {
-    timeout_ = timeout;
     try {
       socket_.setSoTimeout(timeout);
     } catch (SocketException sx) {
-      sx.printStackTrace();
+      LOGGER.warn("Could not set socket timeout.", sx);
     }
   }
 
@@ -126,9 +116,6 @@
    * Returns a reference to the underlying socket.
    */
   public Socket getSocket() {
-    if (socket_ == null) {
-      initSocket();
-    }
     return socket_;
   }
 
@@ -136,24 +123,21 @@
    * Checks whether the socket is connected.
    */
   public boolean isOpen() {
-    if (socket_ == null) {
-      return false;
-    }
     return socket_.isConnected();
   }
 
   /**
-   * Connects the socket, creating a new socket object if necessary.
+   * Do not call, the implementation provides its own lazy non-blocking connect.
    */
   public void open() throws TTransportException {
-    throw new RuntimeException("Not implemented yet");
+    throw new RuntimeException("open() is not implemented for TNonblockingSocket");
   }
 
   /**
    * Perform a nonblocking read into buffer.
    */
   public int read(ByteBuffer buffer) throws IOException {
-    return socketChannel.read(buffer);
+    return socketChannel_.read(buffer);
   }
 
 
@@ -161,12 +145,12 @@
    * Reads from the underlying input stream if not null.
    */
   public int read(byte[] buf, int off, int len) throws TTransportException {
-    if ((socketChannel.validOps() & SelectionKey.OP_READ) != SelectionKey.OP_READ) {
+    if ((socketChannel_.validOps() & SelectionKey.OP_READ) != SelectionKey.OP_READ) {
       throw new TTransportException(TTransportException.NOT_OPEN,
         "Cannot read from write-only socket channel");
     }
     try {
-      return socketChannel.read(ByteBuffer.wrap(buf, off, len));
+      return socketChannel_.read(ByteBuffer.wrap(buf, off, len));
     } catch (IOException iox) {
       throw new TTransportException(TTransportException.UNKNOWN, iox);
     }
@@ -176,26 +160,26 @@
    * Perform a nonblocking write of the data in buffer;
    */
   public int write(ByteBuffer buffer) throws IOException {
-    return socketChannel.write(buffer);
+    return socketChannel_.write(buffer);
   }
 
   /**
    * Writes to the underlying output stream if not null.
    */
   public void write(byte[] buf, int off, int len) throws TTransportException {
-    if ((socketChannel.validOps() & SelectionKey.OP_WRITE) != SelectionKey.OP_WRITE) {
+    if ((socketChannel_.validOps() & SelectionKey.OP_WRITE) != SelectionKey.OP_WRITE) {
       throw new TTransportException(TTransportException.NOT_OPEN,
         "Cannot write to write-only socket channel");
     }
     try {
-      socketChannel.write(ByteBuffer.wrap(buf, off, len));
+      socketChannel_.write(ByteBuffer.wrap(buf, off, len));
     } catch (IOException iox) {
       throw new TTransportException(TTransportException.UNKNOWN, iox);
     }
   }
 
   /**
-   * Flushes the underlying output stream if not null.
+   * Noop.
    */
   public void flush() throws TTransportException {
     // Not supported by SocketChannel.
@@ -206,10 +190,20 @@
    */
   public void close() {
     try {
-      socketChannel.close();
-    } catch (IOException e) {
-      // silently ignore.
+      socketChannel_.close();
+    } catch (IOException iox) {
+      LOGGER.warn("Could not close socket.", iox);
     }
   }
 
+  /** {@inheritDoc} */
+  public boolean startConnect() throws IOException {
+    return socketChannel_.connect(socketAddress_);
+  }
+
+  /** {@inheritDoc} */
+  public boolean finishConnect() throws IOException {
+    return socketChannel_.finishConnect();
+  }
+
 }
diff --git a/lib/java/src/org/apache/thrift/transport/TNonblockingTransport.java b/lib/java/src/org/apache/thrift/transport/TNonblockingTransport.java
index 517eacb..faf501f 100644
--- a/lib/java/src/org/apache/thrift/transport/TNonblockingTransport.java
+++ b/lib/java/src/org/apache/thrift/transport/TNonblockingTransport.java
@@ -24,8 +24,25 @@
 import java.nio.channels.SelectionKey;
 import java.nio.ByteBuffer;
 
+import org.apache.thrift.async.TAsyncMethodCall;
+
 public abstract class TNonblockingTransport extends TTransport {
+
+  /**
+   * Non-blocking connection initialization.
+   * @see java.nio.channels.SocketChannel#connect(SocketAddress remote)
+   */
+  public abstract boolean startConnect() throws IOException;
+
+  /**
+   * Non-blocking connection completion.
+   * @see java.nio.channels.SocketChannel#finishConnect()
+   */
+  public abstract boolean finishConnect() throws IOException;
+
   public abstract SelectionKey registerSelector(Selector selector, int interests) throws IOException;
+
   public abstract int read(ByteBuffer buffer) throws IOException;
+
   public abstract int write(ByteBuffer buffer) throws IOException;
 }
diff --git a/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java b/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java
index 2e26aad..5e1084c 100644
--- a/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java
+++ b/lib/java/test/org/apache/thrift/async/TestTAsyncClientManager.java
@@ -18,8 +18,12 @@
  */
 package org.apache.thrift.async;
 
+import java.io.PrintWriter;
+import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -27,10 +31,10 @@
 
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.server.TNonblockingServer;
+import org.apache.thrift.server.ServerTestBase;
+import org.apache.thrift.server.THsHaServer;
 import org.apache.thrift.transport.TNonblockingServerSocket;
 import org.apache.thrift.transport.TNonblockingSocket;
-import org.apache.thrift.server.ServerTestBase;
 
 import thrift.test.CompactProtoTestStruct;
 import thrift.test.Srv;
@@ -41,11 +45,16 @@
 import thrift.test.Srv.AsyncClient.voidMethod_call;
 
 public class TestTAsyncClientManager extends TestCase {
+  private static void fail(Throwable throwable) {
+    StringWriter sink = new StringWriter();
+    throwable.printStackTrace(new PrintWriter(sink, true));
+    fail("unexpected error " + sink.toString());
+  }
+
   private static abstract class FailureLessCallback<T extends TAsyncMethodCall> implements AsyncMethodCallback<T> {
     @Override
     public void onError(Throwable throwable) {
-      throwable.printStackTrace();
-      fail("unexpected error " + throwable);
+      fail(throwable);
     }
   }
 
@@ -66,7 +75,6 @@
       try {
         Thread.sleep(1000);
       } catch (InterruptedException e) {
-        // TODO Auto-generated catch block
         e.printStackTrace();
       }
       return 0;
@@ -98,6 +106,7 @@
       this.numCalls_ = numCalls;
       this.clientSocket_ = new TNonblockingSocket(ServerTestBase.HOST, ServerTestBase.PORT);
       this.client_ = new Srv.AsyncClient(new TBinaryProtocol.Factory(), acm_, clientSocket_);
+      this.client_.setTimeout(20000);
     }
 
     public int getNumSuccesses() {
@@ -105,55 +114,51 @@
     }
 
     public void run() {
-      for (int i = 0; i < numCalls_; i++) {
+      for (int i = 0; i < numCalls_ && !client_.hasError(); i++) {
+        final int iteration = i;
         try {
           // connect an async client
-          final Object o = new Object();
-
+          final CountDownLatch latch = new CountDownLatch(1);
           final AtomicBoolean jankyReturned = new AtomicBoolean(false);
           client_.Janky(1, new AsyncMethodCallback<Srv.AsyncClient.Janky_call>() {
+
             @Override
             public void onComplete(Janky_call response) {
               try {
                 assertEquals(3, response.getResult());
                 jankyReturned.set(true);
-                synchronized(o) {
-                  o.notifyAll();
-                }
+                latch.countDown();
               } catch (TException e) {
-                e.printStackTrace();
-                synchronized(o) {
-                  o.notifyAll();
-                }
-                fail("unexpected exception: " + e);
+                latch.countDown();
+                fail(e);
               }
             }
 
             @Override
             public void onError(Throwable throwable) {
-              System.out.println(throwable.toString());
-              synchronized(o) {
-                o.notifyAll();
+              try {
+                StringWriter sink = new StringWriter();
+                throwable.printStackTrace(new PrintWriter(sink, true));
+                fail("unexpected onError on iteration " + iteration + ": " + sink.toString());
+              } finally {
+                latch.countDown();
               }
-              fail("unexpected exception: " + throwable);
             }
           });
 
-          synchronized(o) {
-            o.wait(1000);
-          }
-
-          assertTrue(jankyReturned.get());
+          boolean calledBack = latch.await(30, TimeUnit.SECONDS);
+          assertTrue("wasn't called back in time on iteration " + iteration, calledBack);
+          assertTrue("onComplete not called on iteration " + iteration, jankyReturned.get());
           this.numSuccesses_++;
         } catch (Exception e) {
-          fail("Unexpected " + e);
+          fail(e);
         }
       }
     }
   }
 
   public void standardCallTest(Srv.AsyncClient client) throws Exception {
-    final Object o = new Object();
+    final CountDownLatch latch = new CountDownLatch(1);
     final AtomicBoolean jankyReturned = new AtomicBoolean(false);
     client.Janky(1, new FailureLessCallback<Srv.AsyncClient.Janky_call>() {
       @Override
@@ -162,23 +167,20 @@
           assertEquals(3, response.getResult());
           jankyReturned.set(true);
         } catch (TException e) {
-          fail("unexpected exception: " + e);
-        }
-        synchronized(o) {
-          o.notifyAll();
+          fail(e);
+        } finally {
+          latch.countDown();
         }
       }
     });
 
-    synchronized(o) {
-      o.wait(100000);
-    }
+    latch.await(100, TimeUnit.SECONDS);
     assertTrue(jankyReturned.get());
   }
 
   public void testIt() throws Exception {
     // put up a server
-    final TNonblockingServer s = new TNonblockingServer(new Srv.Processor(new SrvHandler()),
+    final THsHaServer s = new THsHaServer(new Srv.Processor(new SrvHandler()),
       new TNonblockingServerSocket(ServerTestBase.PORT));
     new Thread(new Runnable() {
       @Override
@@ -196,16 +198,17 @@
       ServerTestBase.HOST, ServerTestBase.PORT);
     Srv.AsyncClient client = new Srv.AsyncClient(new TBinaryProtocol.Factory(), acm, clientSock);
 
-    final Object o = new Object();
-
     // make a standard method call
     standardCallTest(client);
 
     // make a standard method call that succeeds within timeout
+    assertFalse(s.isStopped());
     client.setTimeout(5000);
     standardCallTest(client);
 
     // make a void method call
+    assertFalse(s.isStopped());
+    final CountDownLatch voidLatch = new CountDownLatch(1);
     final AtomicBoolean voidMethodReturned = new AtomicBoolean(false);
     client.voidMethod(new FailureLessCallback<Srv.AsyncClient.voidMethod_call>() {
       @Override
@@ -214,20 +217,18 @@
           response.getResult();
           voidMethodReturned.set(true);
         } catch (TException e) {
-          fail("unexpected exception " + e);
-        }
-        synchronized (o) {
-          o.notifyAll();
+          fail(e);
+        } finally {
+          voidLatch.countDown();
         }
       }
     });
-
-    synchronized(o) {
-      o.wait(1000);
-    }
+    voidLatch.await(1, TimeUnit.SECONDS);
     assertTrue(voidMethodReturned.get());
- 
+
     // make a oneway method call
+    assertFalse(s.isStopped());
+    final CountDownLatch onewayLatch = new CountDownLatch(1);
     final AtomicBoolean onewayReturned = new AtomicBoolean(false);
     client.onewayMethod(new FailureLessCallback<onewayMethod_call>() {
       @Override
@@ -236,20 +237,18 @@
           response.getResult();
           onewayReturned.set(true);
         } catch (TException e) {
-          fail("unexpected exception " + e);
-        }
-        synchronized(o) {
-          o.notifyAll();
+          fail(e);
+        } finally {
+          onewayLatch.countDown();
         }
       }
     });
-    synchronized(o) {
-      o.wait(1000);
-    }
-
+    onewayLatch.await(1, TimeUnit.SECONDS);
     assertTrue(onewayReturned.get());
 
     // make another standard method call
+    assertFalse(s.isStopped());
+    final CountDownLatch voidAfterOnewayLatch = new CountDownLatch(1);
     final AtomicBoolean voidAfterOnewayReturned = new AtomicBoolean(false);
     client.voidMethod(new FailureLessCallback<voidMethod_call>() {
       @Override
@@ -258,20 +257,18 @@
           response.getResult();
           voidAfterOnewayReturned.set(true);
         } catch (TException e) {
-          fail("unexpected exception " + e);
-        }
-        synchronized(o) {
-          o.notifyAll();
+          fail(e);
+        } finally {
+          voidAfterOnewayLatch.countDown();
         }
       }
     });
-    synchronized(o) {
-      o.wait(1000);
-    }
+    voidAfterOnewayLatch.await(1, TimeUnit.SECONDS);
     assertTrue(voidAfterOnewayReturned.get());
 
     // make multiple calls with deserialization in the selector thread (repro Eric's issue)
-    int numThreads = 200;
+    assertFalse(s.isStopped());
+    int numThreads = 50;
     int numCallsPerThread = 100;
     List<JankyRunnable> runnables = new ArrayList<JankyRunnable>();
     List<Thread> threads = new ArrayList<Thread>();
@@ -289,34 +286,38 @@
     for (JankyRunnable runnable : runnables) {
       numSuccesses += runnable.getNumSuccesses();
     }
-    assertEquals(numSuccesses, numThreads * numCallsPerThread);
+    assertEquals(numThreads * numCallsPerThread, numSuccesses);
 
     // check that timeouts work
+    assertFalse(s.isStopped());
+    final CountDownLatch timeoutLatch = new CountDownLatch(1);
     client.setTimeout(100);
     client.primitiveMethod(new AsyncMethodCallback<primitiveMethod_call>() {
 
       @Override
       public void onError(Throwable throwable) {
-        if (!(throwable instanceof TimeoutException)) {
-          fail("should have received timeout exception");
-          synchronized(o) {
-            o.notifyAll();
+        try {
+          if (!(throwable instanceof TimeoutException)) {
+            StringWriter sink = new StringWriter();
+            throwable.printStackTrace(new PrintWriter(sink, true));
+            fail("expected TimeoutException but got " + sink.toString());
           }
+        } finally {
+          timeoutLatch.countDown();
         }
       }
 
       @Override
       public void onComplete(primitiveMethod_call response) {
-        fail("should not have finished timed out call.");
-        synchronized(o) {
-          o.notifyAll();
+        try {
+          fail("should not have finished timed out call.");
+        } finally {
+          timeoutLatch.countDown();
         }
       }
 
     });
-    synchronized(o) {
-      o.wait(2000);
-    }
+    timeoutLatch.await(2, TimeUnit.SECONDS);
     assertTrue(client.hasError());
     assertTrue(client.getError() instanceof TimeoutException);
   }