(THRIFT-5) A TNonblockingServers (single-threaded and thread-pool) for Java

This patch adds two Thrift servers for Java that both use non-blocking I/O
to avoid locking up worker threads for idle connections.  The two classes are
- TNonblockingServer, which supports single-threaded serving
- THsHaServer, which performs I/O in one thread and method invocations in
  a configurable thread pool.
To support these servers, TNonblockingServerSocket and TNonblockingSocket
have been added.


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@673550 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/java/src/com/facebook/thrift/server/THsHaServer.java b/lib/java/src/com/facebook/thrift/server/THsHaServer.java
new file mode 100644
index 0000000..a8764ec
--- /dev/null
+++ b/lib/java/src/com/facebook/thrift/server/THsHaServer.java
@@ -0,0 +1,288 @@
+
+package com.facebook.thrift.server;
+
+import com.facebook.thrift.TException;
+import com.facebook.thrift.TProcessor;
+import com.facebook.thrift.TProcessorFactory;
+import com.facebook.thrift.protocol.TProtocol;
+import com.facebook.thrift.protocol.TProtocolFactory;
+import com.facebook.thrift.protocol.TBinaryProtocol;
+import com.facebook.thrift.transport.TNonblockingServerTransport;
+import com.facebook.thrift.transport.TTransport;
+import com.facebook.thrift.transport.TFramedTransport;
+import com.facebook.thrift.transport.TNonblockingTransport;
+import com.facebook.thrift.transport.TTransportException;
+import com.facebook.thrift.transport.TTransportFactory;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import java.io.IOException;
+
+/**
+ * An extension of the TNonblockingServer to a Half-Sync/Half-Async server.
+ * Like TNonblockingServer, it relies on the use of TFramedTransport.
+ */
+public class THsHaServer extends TNonblockingServer {
+
+  // This wraps all the functionality of queueing and thread pool management
+  // for the passing of Invocations from the Selector to workers.
+  private ExecutorService invoker;
+
+  private final Options options;
+
+  /**
+   * Create server with given processor, and server transport. Default server
+   * options, TBinaryProtocol for the protocol, and TFramedTransport.Factory on
+   * both input and output transports. A TProcessorFactory will be created that
+   * always returns the specified processor.
+   */
+  public THsHaServer( TProcessor processor,
+                      TNonblockingServerTransport serverTransport) {
+    this(processor, serverTransport, new Options());
+  }
+
+  /**
+   * Create server with given processor, server transport, and server options
+   * using TBinaryProtocol for the protocol, and TFramedTransport.Factory on
+   * both input and output transports. A TProcessorFactory will be created that
+   * always returns the specified processor.
+   */
+  public THsHaServer( TProcessor processor,
+                      TNonblockingServerTransport serverTransport,
+                      Options options) {
+    this(new TProcessorFactory(processor), serverTransport, options);
+  }
+
+  /**
+   * Create server with specified processor factory and server transport. Uses
+   * default options. TBinaryProtocol is assumed. TFramedTransport.Factory is
+   * used on both input and output transports.
+   */
+  public THsHaServer( TProcessorFactory processorFactory,
+                      TNonblockingServerTransport serverTransport) {
+    this(processorFactory, serverTransport, new Options());
+  }
+
+  /**
+   * Create server with specified processor factory, server transport, and server
+   * options. TBinaryProtocol is assumed. TFramedTransport.Factory is used on
+   * both input and output transports.
+   */
+  public THsHaServer( TProcessorFactory processorFactory,
+                      TNonblockingServerTransport serverTransport,
+                      Options options) {
+    this(processorFactory, serverTransport, new TFramedTransport.Factory(),
+      new TBinaryProtocol.Factory(), options);
+  }
+
+  /**
+   * Server with specified processor, server transport, and in/out protocol
+   * factory. Defaults will be used for in/out transport factory and server
+   * options.
+   */
+  public THsHaServer( TProcessor processor,
+                      TNonblockingServerTransport serverTransport,
+                      TProtocolFactory protocolFactory) {
+    this(processor, serverTransport, protocolFactory, new Options());
+  }
+
+  /**
+   * Server with specified processor, server transport, and in/out protocol
+   * factory. Defaults will be used for in/out transport factory and server
+   * options.
+   */
+  public THsHaServer( TProcessor processor,
+                      TNonblockingServerTransport serverTransport,
+                      TProtocolFactory protocolFactory,
+                      Options options) {
+    this(processor, serverTransport, new TFramedTransport.Factory(),
+      protocolFactory);
+  }
+
+  /**
+   * Create server with specified processor, server transport, in/out
+   * transport factory, in/out protocol factory, and default server options. A
+   * processor factory will be created that always returns the specified
+   * processor.
+   */
+  public THsHaServer( TProcessor processor,
+                      TNonblockingServerTransport serverTransport,
+                      TFramedTransport.Factory transportFactory,
+                      TProtocolFactory protocolFactory) {
+    this(new TProcessorFactory(processor), serverTransport,
+      transportFactory, protocolFactory);
+  }
+
+  /**
+   * Create server with specified processor factory, server transport, in/out
+   * transport factory, in/out protocol factory, and default server options.
+   */
+  public THsHaServer( TProcessorFactory processorFactory,
+                      TNonblockingServerTransport serverTransport,
+                      TFramedTransport.Factory transportFactory,
+                      TProtocolFactory protocolFactory) {
+    this(processorFactory, serverTransport,
+      transportFactory, transportFactory,
+      protocolFactory, protocolFactory, new Options());
+  }
+
+  /**
+   * Create server with specified processor factory, server transport, in/out
+   * transport factory, in/out protocol factory, and server options.
+   */
+  public THsHaServer( TProcessorFactory processorFactory,
+                      TNonblockingServerTransport serverTransport,
+                      TFramedTransport.Factory transportFactory,
+                      TProtocolFactory protocolFactory,
+                      Options options) {
+    this(processorFactory, serverTransport,
+      transportFactory, transportFactory,
+      protocolFactory, protocolFactory,
+      options);
+  }
+
+  /**
+   * Create server with everything specified, except use default server options.
+   */
+  public THsHaServer( TProcessor processor,
+                      TNonblockingServerTransport serverTransport,
+                      TFramedTransport.Factory inputTransportFactory,
+                      TFramedTransport.Factory outputTransportFactory,
+                      TProtocolFactory inputProtocolFactory,
+                      TProtocolFactory outputProtocolFactory) {
+    this(new TProcessorFactory(processor), serverTransport,
+      inputTransportFactory, outputTransportFactory,
+      inputProtocolFactory, outputProtocolFactory);
+  }
+
+  /**
+   * Create server with everything specified, except use default server options.
+   */
+  public THsHaServer( TProcessorFactory processorFactory,
+                      TNonblockingServerTransport serverTransport,
+                      TFramedTransport.Factory inputTransportFactory,
+                      TFramedTransport.Factory outputTransportFactory,
+                      TProtocolFactory inputProtocolFactory,
+                      TProtocolFactory outputProtocolFactory)
+  {
+    this(processorFactory, serverTransport,
+      inputTransportFactory, outputTransportFactory,
+      inputProtocolFactory, outputProtocolFactory, new Options());
+  }
+
+  /**
+   * Create server with every option fully specified.
+   */
+  public THsHaServer( TProcessorFactory processorFactory,
+                      TNonblockingServerTransport serverTransport,
+                      TFramedTransport.Factory inputTransportFactory,
+                      TFramedTransport.Factory outputTransportFactory,
+                      TProtocolFactory inputProtocolFactory,
+                      TProtocolFactory outputProtocolFactory,
+                      Options options)
+  {
+    super(processorFactory, serverTransport,
+      inputTransportFactory, outputTransportFactory,
+      inputProtocolFactory, outputProtocolFactory);
+    this.options = options;
+  }
+
+  /** @inheritdoc */
+  @Override
+  public void serve() {
+    if (!startInvokerPool()) {
+      return;
+    }
+
+    // start listening, or exit
+    if (!startListening()) {
+      return;
+    }
+
+    // start the selector, or exit
+    if (!startSelectorThread()) {
+      return;
+    }
+
+    // this will block while we serve
+    joinSelector();
+
+    gracefullyShutdownInvokerPool();
+
+    // do a little cleanup
+    stopListening();
+
+    // ungracefully shut down the invoker pool?
+  }
+
+  protected boolean startInvokerPool() {
+    // start the invoker pool
+    LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
+    invoker = new ThreadPoolExecutor(options.minWorkerThreads,
+      options.maxWorkerThreads, options.stopTimeoutVal, options.stopTimeoutUnit,
+      queue);
+
+    return true;
+  }
+
+  protected void gracefullyShutdownInvokerPool() {
+    // try to gracefully shut down the executor service
+    invoker.shutdown();
+
+    // Loop until awaitTermination finally does return without a interrupted
+    // exception. If we don't do this, then we'll shut down prematurely. We want
+    // to let the executorService clear it's task queue, closing client sockets
+    // appropriately.
+    long timeoutMS = 10000;
+    long now = System.currentTimeMillis();
+    while (timeoutMS >= 0) {
+      try {
+        invoker.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS);
+        break;
+      } catch (InterruptedException ix) {
+        long newnow = System.currentTimeMillis();
+        timeoutMS -= (newnow - now);
+        now = newnow;
+      }
+    }
+  }
+
+  /**
+   * We override the standard invoke method here to queue the invocation for
+   * invoker service instead of immediately invoking. The thread pool takes care of the rest.
+   */
+  @Override
+  protected void requestInvoke(FrameBuffer frameBuffer) {
+    invoker.execute(new Invocation(frameBuffer));
+  }
+
+  /**
+   * An Invocation represents a method call that is prepared to execute, given
+   * an idle worker thread. It contains the input and output protocols the
+   * thread's processor should use to perform the usual Thrift invocation.
+   */
+  private class Invocation implements Runnable {
+
+    private final FrameBuffer frameBuffer;
+
+    public Invocation(final FrameBuffer frameBuffer) {
+      this.frameBuffer = frameBuffer;
+    }
+
+    public void run() {
+      frameBuffer.invoke();
+    }
+  }
+
+  public static class Options {
+    public int minWorkerThreads = 5;
+    public int maxWorkerThreads = Integer.MAX_VALUE;
+    public int stopTimeoutVal = 60;
+    public TimeUnit stopTimeoutUnit = TimeUnit.SECONDS;
+  }
+}
diff --git a/lib/java/src/com/facebook/thrift/server/TNonblockingServer.java b/lib/java/src/com/facebook/thrift/server/TNonblockingServer.java
new file mode 100644
index 0000000..d09ec67
--- /dev/null
+++ b/lib/java/src/com/facebook/thrift/server/TNonblockingServer.java
@@ -0,0 +1,678 @@
+
+package com.facebook.thrift.server;
+
+import com.facebook.thrift.TException;
+import com.facebook.thrift.TProcessor;
+import com.facebook.thrift.TProcessorFactory;
+import com.facebook.thrift.protocol.TProtocol;
+import com.facebook.thrift.protocol.TProtocolFactory;
+import com.facebook.thrift.protocol.TBinaryProtocol;
+import com.facebook.thrift.transport.TNonblockingServerTransport;
+import com.facebook.thrift.transport.TIOStreamTransport;
+import com.facebook.thrift.transport.TTransport;
+import com.facebook.thrift.transport.TFramedTransport;
+import com.facebook.thrift.transport.TNonblockingTransport;
+import com.facebook.thrift.transport.TTransportException;
+import com.facebook.thrift.transport.TTransportFactory;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.Iterator;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.HashSet;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import java.io.IOException;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.Selector;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.spi.SelectorProvider;
+import java.nio.channels.CancelledKeyException;
+import java.nio.channels.ClosedChannelException;
+
+/**
+ * A nonblocking TServer implementation. This allows for fairness amongst all
+ * connected clients in terms of invocations.
+ *
+ * This server is inherently single-threaded. If you want a limited thread pool
+ * coupled with invocation-fairness, see THsHaServer.
+ *
+ * To use this server, you MUST use a TFramedTransport at the outermost
+ * transport, otherwise this server will be unable to determine when a whole
+ * method call has been read off the wire. Clients must also use TFramedTransport.
+ */
+public class TNonblockingServer extends TServer {
+  private static final Logger LOGGER =
+    Logger.getLogger(TNonblockingServer.class.getName());
+
+  // Flag for stopping the server
+  private volatile boolean stopped_;
+
+  private SelectThread selectThread_;
+
+  /**
+   * Create server with given processor and server transport, using
+   * TBinaryProtocol for the protocol, TFramedTransport.Factory on both input
+   * and output transports. A TProcessorFactory will be created that always
+   * returns the specified processor.
+   */
+  public TNonblockingServer(TProcessor processor,
+                           TNonblockingServerTransport serverTransport) {
+    this(new TProcessorFactory(processor), serverTransport);
+  }
+
+  /**
+   * Create server with specified processor factory and server transport.
+   * TBinaryProtocol is assumed. TFramedTransport.Factory is used on both input
+   * and output transports.
+   */
+  public TNonblockingServer(TProcessorFactory processorFactory,
+                            TNonblockingServerTransport serverTransport) {
+    this(processorFactory, serverTransport,
+         new TFramedTransport.Factory(), new TFramedTransport.Factory(),
+         new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory());
+  }
+
+  public TNonblockingServer(TProcessor processor,
+                            TNonblockingServerTransport serverTransport,
+                            TProtocolFactory protocolFactory) {
+    this(processor, serverTransport,
+         new TFramedTransport.Factory(), new TFramedTransport.Factory(),
+         protocolFactory, protocolFactory);
+  }
+
+  public TNonblockingServer(TProcessor processor,
+                            TNonblockingServerTransport serverTransport,
+                            TFramedTransport.Factory transportFactory,
+                            TProtocolFactory protocolFactory) {
+    this(processor, serverTransport,
+         transportFactory, transportFactory,
+         protocolFactory, protocolFactory);
+  }
+
+  public TNonblockingServer(TProcessorFactory processorFactory,
+                            TNonblockingServerTransport serverTransport,
+                            TFramedTransport.Factory transportFactory,
+                            TProtocolFactory protocolFactory) {
+    this(processorFactory, serverTransport,
+         transportFactory, transportFactory,
+         protocolFactory, protocolFactory);
+  }
+
+  public TNonblockingServer(TProcessor processor,
+                            TNonblockingServerTransport serverTransport,
+                            TFramedTransport.Factory inputTransportFactory,
+                            TFramedTransport.Factory outputTransportFactory,
+                            TProtocolFactory inputProtocolFactory,
+                            TProtocolFactory outputProtocolFactory) {
+    this(new TProcessorFactory(processor), serverTransport,
+         inputTransportFactory, outputTransportFactory,
+         inputProtocolFactory, outputProtocolFactory);
+  }
+
+  public TNonblockingServer(TProcessorFactory processorFactory,
+                            TNonblockingServerTransport serverTransport,
+                            TFramedTransport.Factory inputTransportFactory,
+                            TFramedTransport.Factory outputTransportFactory,
+                            TProtocolFactory inputProtocolFactory,
+                            TProtocolFactory outputProtocolFactory) {
+    super(processorFactory, serverTransport,
+          inputTransportFactory, outputTransportFactory,
+          inputProtocolFactory, outputProtocolFactory);
+  }
+
+  /**
+   * Begin accepting connections and processing invocations.
+   */
+  public void serve() {
+    // start listening, or exit
+    if (!startListening()) {
+      return;
+    }
+
+    // start the selector, or exit
+    if (!startSelectorThread()) {
+      return;
+    }
+
+    // this will block while we serve
+    joinSelector();
+
+    // do a little cleanup
+    stopListening();
+  }
+
+  /**
+   * Have the server transport start accepting connections.
+   *
+   * @return true if we started listening successfully, false if something went
+   * wrong.
+   */
+  protected boolean startListening() {
+    try {
+      serverTransport_.listen();
+      return true;
+    } catch (TTransportException ttx) {
+      LOGGER.log(Level.SEVERE, "Failed to start listening on server socket!", ttx);
+      return false;
+    }
+  }
+
+  /**
+   * Stop listening for conections.
+   */
+  protected void stopListening() {
+    serverTransport_.close();
+  }
+
+  /**
+   * Start the selector thread running to deal with clients.
+   *
+   * @return true if everything went ok, false if we couldn't start for some
+   * reason.
+   */
+  protected boolean startSelectorThread() {
+    // start the selector
+    try {
+      selectThread_ = new SelectThread((TNonblockingServerTransport)serverTransport_);
+      selectThread_.start();
+      return true;
+    } catch (IOException e) {
+      LOGGER.log(Level.SEVERE, "Failed to start selector thread!", e);
+      return false;
+    }
+  }
+
+  /**
+   * Block until the selector exits.
+   */
+  protected void joinSelector() {
+    // wait until the selector thread exits
+    try {
+      selectThread_.join();
+    } catch (InterruptedException e) {
+      // for now, just silently ignore. technically this means we'll have less of
+      // a graceful shutdown as a result.
+    }
+  }
+
+  /**
+   * Stop serving and shut everything down.
+   */
+  public void stop() {
+    stopped_ = true;
+    selectThread_.wakeupSelector();
+  }
+
+  /**
+   * Perform an invocation. This method could behave several different ways
+   * - invoke immediately inline, queue for separate execution, etc.
+   */
+  protected void requestInvoke(FrameBuffer frameBuffer) {
+    frameBuffer.invoke();
+  }
+
+  /**
+   * A FrameBuffer wants to change its selection preferences, but might not be
+   * in the select thread.
+   */
+  protected void requestSelectInterestChange(FrameBuffer frameBuffer) {
+    selectThread_.requestSelectInterestChange(frameBuffer);
+  }
+
+  /**
+   * The thread that will be doing all the selecting, managing new connections
+   * and those that still need to be read.
+   */
+  protected class SelectThread extends Thread {
+
+    private final TNonblockingServerTransport serverTransport;
+    private final Selector selector;
+
+    // List of FrameBuffers that want to change their selection interests.
+    private final Set<FrameBuffer> selectInterestChanges =
+      new HashSet<FrameBuffer>();
+
+    /**
+     * Set up the SelectorThread.
+     */
+    public SelectThread(final TNonblockingServerTransport serverTransport)
+    throws IOException {
+      this.serverTransport = serverTransport;
+      this.selector = SelectorProvider.provider().openSelector();
+      serverTransport.registerSelector(selector);
+    }
+
+    /**
+     * The work loop. Handles both selecting (all IO operations) and managing
+     * the selection preferences of all existing connections.
+     */
+    public void run() {
+      while (!stopped_) {
+        select();
+        processInterestChanges();
+      }
+    }
+
+    /**
+     * If the selector is blocked, wake it up.
+     */
+    public void wakeupSelector() {
+      selector.wakeup();
+    }
+
+    /**
+     * Add FrameBuffer to the list of select interest changes and wake up the
+     * selector if it's blocked. When the select() call exits, it'll give the
+     * FrameBuffer a chance to change its interests.
+     */
+    public void requestSelectInterestChange(FrameBuffer frameBuffer) {
+      synchronized (selectInterestChanges) {
+        selectInterestChanges.add(frameBuffer);
+      }
+      // wakeup the selector, if it's currently blocked.
+      selector.wakeup();
+    }
+
+    /**
+     * Select and process IO events appropriately:
+     * If there are connections to be accepted, accept them.
+     * If there are existing connections with data waiting to be read, read it,
+     * bufferring until a whole frame has been read.
+     * If there are any pending responses, buffer them until their target client
+     * is available, and then send the data.
+     */
+    private void select() {
+      try {
+        // wait for io events.
+        selector.select();
+
+        // process the io events we received
+        Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
+        while (!stopped_ && selectedKeys.hasNext()) {
+          SelectionKey key = selectedKeys.next();
+          selectedKeys.remove();
+
+          // skip if not valid
+          if (!key.isValid()) {
+            cleanupSelectionkey(key);
+            continue;
+          }
+
+          // if the key is marked Accept, then it has to be the server
+          // transport.
+          if (key.isAcceptable()) {
+            handleAccept();
+          } else if (key.isReadable()) {
+            // deal with reads
+            handleRead(key);
+          } else if (key.isWritable()) {
+            // deal with writes
+            handleWrite(key);
+          }
+        }
+      } catch (IOException e) {
+        LOGGER.log(Level.WARNING, "Got an IOException while selecting!", e);
+      }
+    }
+
+    /**
+     * Check to see if there are any FrameBuffers that have switched their
+     * interest type from read to write or vice versa.
+     */
+    private void processInterestChanges() {
+      synchronized (selectInterestChanges) {
+        for (FrameBuffer fb : selectInterestChanges) {
+          fb.changeSelectInterests();
+        }
+        selectInterestChanges.clear();
+      }
+    }
+
+    /**
+     * Accept a new connection.
+     */
+    private void handleAccept() throws IOException {
+      SelectionKey clientKey = null;
+      try {
+        // accept the connection
+        TNonblockingTransport client = (TNonblockingTransport)serverTransport.accept();
+        clientKey = client.registerSelector(selector, SelectionKey.OP_READ);
+
+        // add this key to the map
+        FrameBuffer frameBuffer = new FrameBuffer(client, clientKey);
+        clientKey.attach(frameBuffer);
+      } catch (TTransportException tte) {
+        // something went wrong accepting.
+        cleanupSelectionkey(clientKey);
+        LOGGER.log(Level.WARNING, "Exception trying to accept!", tte);
+        tte.printStackTrace();
+      }
+    }
+
+    /**
+     * Do the work required to read from a readable client. If the frame is
+     * fully read, then invoke the method call.
+     */
+    private void handleRead(SelectionKey key) {
+      FrameBuffer buffer = (FrameBuffer)key.attachment();
+      if (buffer.read()) {
+        // if the buffer's frame read is complete, invoke the method.
+        if (buffer.isFrameFullyRead()) {
+          requestInvoke(buffer);
+        }
+      } else {
+        cleanupSelectionkey(key);
+      }
+    }
+
+    /**
+     * Let a writable client get written, if there's data to be written.
+     */
+    private void handleWrite(SelectionKey key) {
+      FrameBuffer buffer = (FrameBuffer)key.attachment();
+      if (!buffer.write()) {
+        cleanupSelectionkey(key);
+      }
+    }
+
+    /**
+     * Do connection-close cleanup on a given SelectionKey.
+     */
+    private void cleanupSelectionkey(SelectionKey key) {
+      // remove the records from the two maps
+      FrameBuffer buffer = (FrameBuffer)key.attachment();
+      if (buffer != null) {
+        // close the buffer
+        buffer.close();
+      }
+      // cancel the selection key
+      key.cancel();
+    }
+  } // SelectorThread
+
+  /**
+   * Class that implements a sort of state machine around the interaction with
+   * a client and an invoker. It manages reading the frame size and frame data,
+   * getting it handed off as wrapped transports, and then the writing of
+   * reponse data back to the client. In the process it manages flipping the
+   * read and write bits on the selection key for its client.
+   */
+  protected class FrameBuffer {
+    //
+    // Possible states for the FrameBuffer state machine.
+    //
+    // in the midst of reading the frame size off the wire
+    private static final int READING_FRAME_SIZE = 1;
+    // reading the actual frame data now, but not all the way done yet
+    private static final int READING_FRAME = 2;
+    // completely read the frame, so an invocation can now happen
+    private static final int READ_FRAME_COMPLETE = 3;
+    // waiting to get switched to listening for write events
+    private static final int AWAITING_REGISTER_WRITE = 4;
+    // started writing response data, not fully complete yet
+    private static final int WRITING = 6;
+    // another thread wants this framebuffer to go back to reading
+    private static final int AWAITING_REGISTER_READ = 7;
+
+    //
+    // Instance variables
+    //
+
+    // the actual transport hooked up to the client.
+    private final TNonblockingTransport trans_;
+
+    // the SelectionKey that corresponds to our transport
+    private final SelectionKey selectionKey_;
+
+    // where in the process of reading/writing are we?
+    private int state_ = READING_FRAME_SIZE;
+
+    // the ByteBuffer we'll be using to write and read, depending on the state
+    private ByteBuffer buffer_;
+
+    private ByteArrayOutputStream response_;
+
+    public FrameBuffer( final TNonblockingTransport trans,
+                        final SelectionKey selectionKey) {
+      trans_ = trans;
+      selectionKey_ = selectionKey;
+      buffer_ = ByteBuffer.allocate(4);
+    }
+
+    /**
+     * Give this FrameBuffer a chance to read. The selector loop should have
+     * received a read event for this FrameBuffer.
+     *
+     * @return true if the connection should live on, false if it should be
+     * closed
+     */
+    public boolean read() {
+      if (state_ == READING_FRAME_SIZE) {
+        // try to read the frame size completely
+        if (!internalRead()) {
+          return false;
+        }
+
+        // if the frame size has been read completely, then prepare to read the
+        // actual frame.
+        if (buffer_.remaining() == 0) {
+          // pull out the frame size as an integer.
+          int frameSize = buffer_.getInt(0);
+          if (frameSize <= 0) {
+            LOGGER.severe("Read an invalid frame size of " + frameSize
+              + ". Are you using TFramedTransport on the client side?");
+            return false;
+          }
+          // reallocate the readbuffer as a frame-sized buffer
+          buffer_ = ByteBuffer.allocate(frameSize + 4);
+          // put the frame size at the head of the buffer
+          buffer_.putInt(frameSize);
+
+          state_ = READING_FRAME;
+        } else {
+          // this skips the check of READING_FRAME state below, since we can't
+          // possibly go on to that state if there's data left to be read at
+          // this one.
+          return true;
+        }
+      }
+
+      // it is possible to fall through from the READING_FRAME_SIZE section
+      // to READING_FRAME if there's already some frame data available once
+      // READING_FRAME_SIZE is complete.
+
+      if (state_ == READING_FRAME) {
+        if (!internalRead()) {
+          return false;
+        }
+
+        // since we're already in the select loop here for sure, we can just
+        // modify our selection key directly.
+        if (buffer_.remaining() == 0) {
+          // get rid of the read select interests
+          selectionKey_.interestOps(0);
+          state_ = READ_FRAME_COMPLETE;
+        }
+
+        return true;
+      }
+
+      // if we fall through to this point, then the state must be invalid.
+      LOGGER.severe("Read was called but state is invalid (" + state_ + ")");
+      return false;
+    }
+
+    /**
+     * Give this FrameBuffer a chance to write its output to the final client.
+     */
+    public boolean write() {
+      if (state_ == WRITING) {
+        try {
+          if (trans_.write(buffer_) < 0) {
+            return false;
+          }
+        } catch (IOException e) {
+          LOGGER.log(Level.WARNING, "Got an IOException during write!", e);
+          return false;
+        }
+
+        // we're done writing. now we need to switch back to reading.
+        if (buffer_.remaining() == 0) {
+          prepareRead();
+        }
+        return true;
+      }
+
+      LOGGER.severe("Write was called, but state is invalid (" + state_ + ")");
+      return false;
+    }
+
+    /**
+     * Give this FrameBuffer a chance to set its interest to write, once data
+     * has come in.
+     */
+    public void changeSelectInterests() {
+      if (state_ == AWAITING_REGISTER_WRITE) {
+        // set the OP_WRITE interest
+        selectionKey_.interestOps(SelectionKey.OP_WRITE);
+        state_ = WRITING;
+      } else if (state_ == AWAITING_REGISTER_READ) {
+        prepareRead();
+      } else {
+        LOGGER.severe(
+          "changeSelectInterest was called, but state is invalid ("
+          + state_ + ")");
+      }
+    }
+
+    /**
+     * Shut the connection down.
+     */
+    public void close() {
+      trans_.close();
+    }
+
+    /**
+     * Check if this FrameBuffer has a full frame read.
+     */
+    public boolean isFrameFullyRead() {
+      return state_ == READ_FRAME_COMPLETE;
+    }
+
+    /**
+     * After the processor has processed the invocation, whatever thread is
+     * managing invocations should call this method on this FrameBuffer so we
+     * know it's time to start trying to write again. Also, if it turns out
+     * that there actually isn't any data in the response buffer, we'll skip
+     * trying to write and instead go back to reading.
+     */
+    public void responseReady() {
+      // capture the data we want to write as a byte array.
+      byte[] bytes = response_.toByteArray();
+
+      if (bytes.length <= 0) {
+        // go straight to reading again. this was probably an async method
+        state_ = AWAITING_REGISTER_READ;
+      } else {
+        buffer_ = ByteBuffer.wrap(bytes);
+
+        // set state that we're waiting to be switched to write. we do this
+        // asynchronously through requestSelectInterestChange() because there is a
+        // possibility that we're not in the main thread, and thus currently
+        // blocked in select(). (this functionality is in place for the sake of
+        // the HsHa server.)
+        state_ = AWAITING_REGISTER_WRITE;
+      }
+      requestSelectInterestChange();
+    }
+
+    /**
+     * Actually invoke the method signified by this FrameBuffer.
+     */
+    public void invoke() {
+      TTransport inTrans = getInputTransport();
+      TProtocol inProt = inputProtocolFactory_.getProtocol(inTrans);
+      TProtocol outProt = outputProtocolFactory_.getProtocol(getOutputTransport());
+
+      try {
+        processorFactory_.getProcessor(inTrans).process(inProt, outProt);
+        responseReady();
+      } catch (TException te) {
+        LOGGER.log(Level.WARNING, "Exception while invoking!", te);
+      }
+    }
+
+    /**
+     * Wrap the read buffer in a memory-based transport so a processor can read
+     * the data it needs to handle an invocation.
+     */
+    private TTransport getInputTransport() {
+      return inputTransportFactory_.getTransport(new TIOStreamTransport(
+        new ByteArrayInputStream(buffer_.array())));
+    }
+
+    /**
+     * Get the transport that should be used by the invoker for responding.
+     */
+    private TTransport getOutputTransport() {
+      response_ = new ByteArrayOutputStream();
+      return outputTransportFactory_.getTransport(new TIOStreamTransport(response_));
+    }
+
+    /**
+     * Perform a read into buffer.
+     *
+     * @return true if the read succeeded, false if there was an error or the
+     * connection closed.
+     */
+    private boolean internalRead() {
+      try {
+        if (trans_.read(buffer_) < 0) {
+          return false;
+        }
+        return true;
+      } catch (IOException e) {
+        LOGGER.log(Level.WARNING, "Got an IOException in internalRead!", e);
+        return false;
+      }
+    }
+
+    /**
+     * We're done writing, so reset our interest ops and change state accordingly.
+     */
+    private void prepareRead() {
+      // we can set our interest directly without using the queue because
+      // we're in the select thread.
+      selectionKey_.interestOps(SelectionKey.OP_READ);
+      // get ready for another go-around
+      buffer_ = ByteBuffer.allocate(4);
+      state_ = READING_FRAME_SIZE;
+    }
+
+    /**
+     * When this FrameBuffer needs to change it's select interests and execution
+     * might not be in the select thread, then this method will make sure the
+     * interest change gets done when the select thread wakes back up. When the
+     * current thread is the select thread, then it just does the interest change
+     * immediately.
+     */
+    private void requestSelectInterestChange() {
+      if (Thread.currentThread() == selectThread_) {
+        changeSelectInterests();
+      } else {
+        TNonblockingServer.this.requestSelectInterestChange(this);
+      }
+    }
+  } // FrameBuffer
+
+}
diff --git a/lib/java/src/com/facebook/thrift/transport/TFramedTransport.java b/lib/java/src/com/facebook/thrift/transport/TFramedTransport.java
index a9ad4b3..e7dfcef 100644
--- a/lib/java/src/com/facebook/thrift/transport/TFramedTransport.java
+++ b/lib/java/src/com/facebook/thrift/transport/TFramedTransport.java
@@ -43,6 +43,15 @@
    */
   private boolean frameWrite_ = true;
 
+  public static class Factory extends TTransportFactory {
+    public Factory() {
+    }
+
+    public TTransport getTransport(TTransport base) {
+      return new TFramedTransport(base);
+    }
+  }
+
   /**
    * Constructor wraps around another tranpsort
    */
diff --git a/lib/java/src/com/facebook/thrift/transport/TNonblockingServerSocket.java b/lib/java/src/com/facebook/thrift/transport/TNonblockingServerSocket.java
new file mode 100644
index 0000000..44ed0d8
--- /dev/null
+++ b/lib/java/src/com/facebook/thrift/transport/TNonblockingServerSocket.java
@@ -0,0 +1,143 @@
+
+package com.facebook.thrift.transport;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketException;
+
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.ClosedChannelException;
+
+/**
+ * Wrapper around ServerSocketChannel
+ */
+public class TNonblockingServerSocket extends TNonblockingServerTransport {
+
+  /**
+   * This channel is where all the nonblocking magic happens.
+   */
+  private ServerSocketChannel serverSocketChannel = null;
+
+  /**
+   * Underlying serversocket object
+   */
+  private ServerSocket serverSocket_ = null;
+
+  /**
+   * Port to listen on
+   */
+  private int port_ = 0;
+
+  /**
+   * Timeout for client sockets from accept
+   */
+  private int clientTimeout_ = 0;
+
+  /**
+   * Creates a server socket from underlying socket object
+   */
+  // public TNonblockingServerSocket(ServerSocket serverSocket) {
+  //   this(serverSocket, 0);
+  // }
+
+  /**
+   * Creates a server socket from underlying socket object
+   */
+  // public TNonblockingServerSocket(ServerSocket serverSocket, int clientTimeout) {
+  //   serverSocket_ = serverSocket;
+  //   clientTimeout_ = clientTimeout;
+  // }
+
+  /**
+   * Creates just a port listening server socket
+   */
+  public TNonblockingServerSocket(int port) throws TTransportException {
+    this(port, 0);
+  }
+
+  /**
+   * Creates just a port listening server socket
+   */
+  public TNonblockingServerSocket(int port, int clientTimeout) throws TTransportException {
+    port_ = port;
+    clientTimeout_ = clientTimeout;
+    try {
+      serverSocketChannel = ServerSocketChannel.open();
+      serverSocketChannel.configureBlocking(false);
+
+      // Make server socket
+      serverSocket_ = serverSocketChannel.socket();
+      // Prevent 2MSL delay problem on server restarts
+      serverSocket_.setReuseAddress(true);
+      // Bind to listening port
+      serverSocket_.bind(new InetSocketAddress(port_));
+    } catch (IOException ioe) {
+      serverSocket_ = null;
+      throw new TTransportException("Could not create ServerSocket on port " + port + ".");
+    }
+  }
+
+  public void listen() throws TTransportException {
+    // Make sure not to block on accept
+    if (serverSocket_ != null) {
+      try {
+        serverSocket_.setSoTimeout(0);
+      } catch (SocketException sx) {
+        sx.printStackTrace();
+      }
+    }
+  }
+
+  protected TNonblockingSocket acceptImpl() throws TTransportException {
+    if (serverSocket_ == null) {
+      throw new TTransportException(TTransportException.NOT_OPEN, "No underlying server socket.");
+    }
+    try {
+      SocketChannel socketChannel = serverSocketChannel.accept();
+      if (socketChannel == null) {
+        return null;
+      }
+
+      TNonblockingSocket tsocket = new TNonblockingSocket(socketChannel);
+      tsocket.setTimeout(clientTimeout_);
+      return tsocket;
+    } catch (IOException iox) {
+      throw new TTransportException(iox);
+    }
+  }
+
+  public void registerSelector(Selector selector) {
+    try {
+      // Register the server socket channel, indicating an interest in
+      // accepting new connections
+      serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
+    } catch (ClosedChannelException e) {
+      // this shouldn't happen, ideally...
+      // TODO: decide what to do with this.
+    }
+  }
+
+  public void close() {
+    if (serverSocket_ != null) {
+      try {
+        serverSocket_.close();
+      } catch (IOException iox) {
+        System.err.println("WARNING: Could not close server socket: " +
+                           iox.getMessage());
+      }
+      serverSocket_ = null;
+    }
+  }
+
+  public void interrupt() {
+    // The thread-safeness of this is dubious, but Java documentation suggests
+    // that it is safe to do this from a different thread context
+    close();
+  }
+
+}
diff --git a/lib/java/src/com/facebook/thrift/transport/TNonblockingServerTransport.java b/lib/java/src/com/facebook/thrift/transport/TNonblockingServerTransport.java
new file mode 100644
index 0000000..7911851
--- /dev/null
+++ b/lib/java/src/com/facebook/thrift/transport/TNonblockingServerTransport.java
@@ -0,0 +1,12 @@
+
+package com.facebook.thrift.transport;
+
+import java.nio.channels.Selector;
+
+/**
+ * Server transport that can be operated in a nonblocking fashion.
+ */
+public abstract class TNonblockingServerTransport extends TServerTransport {
+
+  public abstract void registerSelector(Selector selector);
+}
diff --git a/lib/java/src/com/facebook/thrift/transport/TNonblockingSocket.java b/lib/java/src/com/facebook/thrift/transport/TNonblockingSocket.java
new file mode 100644
index 0000000..07c03e3
--- /dev/null
+++ b/lib/java/src/com/facebook/thrift/transport/TNonblockingSocket.java
@@ -0,0 +1,259 @@
+
+package com.facebook.thrift.transport;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketException;
+
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+
+/**
+ * Socket implementation of the TTransport interface. To be commented soon!
+ */
+public class TNonblockingSocket extends TNonblockingTransport {
+
+  private SocketChannel socketChannel = null;
+
+  /**
+   * Wrapped Socket object
+   */
+  private Socket socket_ = null;
+
+  /**
+   * Remote host
+   */
+  private String host_  = null;
+
+  /**
+   * Remote port
+   */
+  private int port_ = 0;
+
+  /**
+   * Socket timeout
+   */
+  private int timeout_ = 0;
+
+  /**
+   * Constructor that takes an already created socket.
+   *
+   * @param socketChannel Already created SocketChannel object
+   * @throws TTransportException if there is an error setting up the streams
+   */
+  public TNonblockingSocket(SocketChannel socketChannel) throws TTransportException {
+    try {
+      // make it a nonblocking channel
+      socketChannel.configureBlocking(false);
+    } catch (IOException e) {
+      throw new TTransportException(e);
+    }
+
+    this.socketChannel = socketChannel;
+    this.socket_ = socketChannel.socket();
+    try {
+      socket_.setSoLinger(false, 0);
+      socket_.setTcpNoDelay(true);
+    } catch (SocketException sx) {
+      sx.printStackTrace();
+    }
+
+    // if (isOpen()) {
+    //   try {
+    //     // inputStream_ = new BufferedInputStream(socket_.getInputStream(), 1024);
+    //     // outputStream_ = new BufferedOutputStream(socket_.getOutputStream(), 1024);
+    //   } catch (IOException iox) {
+    //     close();
+    //     throw new TTransportException(TTransportException.NOT_OPEN, iox);
+    //   }
+    // }
+  }
+
+  // This is all for the clientside stuff. Not sure that we'll actually be supporting that yet.
+  // /**
+  //  * Creates a new unconnected socket that will connect to the given host
+  //  * on the given port.
+  //  *
+  //  * @param host Remote host
+  //  * @param port Remote port
+  //  */
+  // public TNonblockingSocket(String host, int port) {
+  //   this(host, port, 0);
+  // }
+  //
+  // /**
+  //  * Creates a new unconnected socket that will connect to the given host
+  //  * on the given port.
+  //  *
+  //  * @param host    Remote host
+  //  * @param port    Remote port
+  //  * @param timeout Socket timeout
+  //  */
+  // public TSocket(String host, int port, int timeout) {
+  //   host_ = host;
+  //   port_ = port;
+  //   timeout_ = timeout;
+  //   initSocket();
+  // }
+
+
+  /**
+   * Register this socket with the specified selector for both read and write
+   * operations.
+   *
+   * @param selector
+   * @return the selection key for this socket.
+   */
+  public SelectionKey registerSelector(Selector selector, int interests) throws IOException {
+    // Register the new SocketChannel with our Selector, indicating
+    // we'd like to be notified when there's data waiting to be read
+    return socketChannel.register(selector, interests);
+  }
+
+  /**
+   * Initializes the socket object
+   */
+  private void initSocket() {
+    socket_ = new Socket();
+    try {
+      socket_.setSoLinger(false, 0);
+      socket_.setTcpNoDelay(true);
+      socket_.setSoTimeout(timeout_);
+    } catch (SocketException sx) {
+      sx.printStackTrace();
+    }
+  }
+
+  /**
+   * Sets the socket timeout
+   *
+   * @param timeout Milliseconds timeout
+   */
+  public void setTimeout(int timeout) {
+    timeout_ = timeout;
+    try {
+      socket_.setSoTimeout(timeout);
+    } catch (SocketException sx) {
+      sx.printStackTrace();
+    }
+  }
+
+  /**
+   * Returns a reference to the underlying socket.
+   */
+  public Socket getSocket() {
+    if (socket_ == null) {
+      initSocket();
+    }
+    return socket_;
+  }
+
+  /**
+   * Checks whether the socket is connected.
+   */
+  public boolean isOpen() {
+    if (socket_ == null) {
+      return false;
+    }
+    return socket_.isConnected();
+  }
+
+  /**
+   * Connects the socket, creating a new socket object if necessary.
+   */
+  public void open() throws TTransportException {
+    throw new RuntimeException("Not implemented yet");
+    // if (isOpen()) {
+    //   throw new TTransportException(TTransportException.ALREADY_OPEN, "Socket already connected.");
+    // }
+    //
+    // if (host_.length() == 0) {
+    //   throw new TTransportException(TTransportException.NOT_OPEN, "Cannot open null host.");
+    // }
+    // if (port_ <= 0) {
+    //   throw new TTransportException(TTransportException.NOT_OPEN, "Cannot open without port.");
+    // }
+    //
+    // if (socket_ == null) {
+    //   initSocket();
+    // }
+    //
+    // try {
+    //   socket_.connect(new InetSocketAddress(host_, port_));
+    //   inputStream_ = new BufferedInputStream(socket_.getInputStream(), 1024);
+    //   outputStream_ = new BufferedOutputStream(socket_.getOutputStream(), 1024);
+    // } catch (IOException iox) {
+    //   close();
+    //   throw new TTransportException(TTransportException.NOT_OPEN, iox);
+    // }
+  }
+
+  /**
+   * Perform a nonblocking read into buffer.
+   */
+  public int read(ByteBuffer buffer) throws IOException {
+    return socketChannel.read(buffer);
+  }
+
+
+  /**
+   * Reads from the underlying input stream if not null.
+   */
+  public int read(byte[] buf, int off, int len) throws TTransportException {
+    if ((socketChannel.validOps() & SelectionKey.OP_READ) != SelectionKey.OP_READ) {
+      throw new TTransportException(TTransportException.NOT_OPEN,
+        "Cannot read from write-only socket channel");
+    }
+    try {
+      return socketChannel.read(ByteBuffer.wrap(buf, off, len));
+    } catch (IOException iox) {
+      throw new TTransportException(TTransportException.UNKNOWN, iox);
+    }
+  }
+
+  /**
+   * Perform a nonblocking write of the data in buffer;
+   */
+  public int write(ByteBuffer buffer) throws IOException {
+    return socketChannel.write(buffer);
+  }
+
+  /**
+   * Writes to the underlying output stream if not null.
+   */
+  public void write(byte[] buf, int off, int len) throws TTransportException {
+    if ((socketChannel.validOps() & SelectionKey.OP_WRITE) != SelectionKey.OP_WRITE) {
+      throw new TTransportException(TTransportException.NOT_OPEN,
+        "Cannot write to write-only socket channel");
+    }
+    try {
+      socketChannel.write(ByteBuffer.wrap(buf, off, len));
+    } catch (IOException iox) {
+      throw new TTransportException(TTransportException.UNKNOWN, iox);
+    }
+  }
+
+  /**
+   * Flushes the underlying output stream if not null.
+   */
+  public void flush() throws TTransportException {
+    // Not supported by SocketChannel.
+  }
+
+  /**
+   * Closes the socket.
+   */
+  public void close() {
+    try {
+      socketChannel.close();
+    } catch (IOException e) {
+      // silently ignore.
+    }
+  }
+
+}
diff --git a/lib/java/src/com/facebook/thrift/transport/TNonblockingTransport.java b/lib/java/src/com/facebook/thrift/transport/TNonblockingTransport.java
new file mode 100644
index 0000000..a702490
--- /dev/null
+++ b/lib/java/src/com/facebook/thrift/transport/TNonblockingTransport.java
@@ -0,0 +1,12 @@
+package com.facebook.thrift.transport;
+
+import java.io.IOException;
+import java.nio.channels.Selector;
+import java.nio.channels.SelectionKey;
+import java.nio.ByteBuffer;
+
+public abstract class TNonblockingTransport extends TTransport {
+  public abstract SelectionKey registerSelector(Selector selector, int interests) throws IOException;
+  public abstract int read(ByteBuffer buffer) throws IOException;
+  public abstract int write(ByteBuffer buffer) throws IOException;
+}
diff --git a/lib/java/src/com/facebook/thrift/transport/TTransport.java b/lib/java/src/com/facebook/thrift/transport/TTransport.java
index 2c10870..3d911bf 100644
--- a/lib/java/src/com/facebook/thrift/transport/TTransport.java
+++ b/lib/java/src/com/facebook/thrift/transport/TTransport.java
@@ -56,7 +56,7 @@
     throws TTransportException;
 
   /**
-   * Guarantees that all of len bytes are
+   * Guarantees that all of len bytes are actually read off the transport.
    *
    * @param buf Array to read into
    * @param off Index to start reading at
@@ -71,7 +71,7 @@
     while (got < len) {
       ret = read(buf, off+got, len-got);
       if (ret <= 0) {
-        throw new TTransportException("Cannot read. Remote side has closed.");
+        throw new TTransportException("Cannot read. Remote side has closed. Tried to read " + len + " bytes, but only got " + got + " bytes.");
       }
       got += ret;
     }
diff --git a/test/java/TestNonblockingServer b/test/java/TestNonblockingServer
new file mode 100644
index 0000000..cb330d4
--- /dev/null
+++ b/test/java/TestNonblockingServer
@@ -0,0 +1,2 @@
+#!/bin/bash -v
+java -server -cp thrifttest.jar:../../lib/java/libthrift.jar com.facebook.thrift.test.TestNonblockingServer $*
diff --git a/test/java/src/TestClient.java b/test/java/src/TestClient.java
index 379761d..bc76e83 100644
--- a/test/java/src/TestClient.java
+++ b/test/java/src/TestClient.java
@@ -38,6 +38,8 @@
       boolean framedInput = true;
       boolean framedOutput = true;
 
+      int socketTimeout = 1000;
+
       try {
         for (int i = 0; i < args.length; ++i) {
           if (args[i].equals("-h")) {
@@ -53,6 +55,8 @@
             url = args[++i];
           } else if (args[i].equals("-n")) {
             numTests = Integer.valueOf(args[++i]);
+          } else if (args[i].equals("-timeout")) {
+            socketTimeout = Integer.valueOf(args[++i]);
           }
         }
       } catch (Exception x) {
@@ -65,7 +69,7 @@
         transport = new THttpClient(url);
       } else {
         TSocket socket = new TSocket(host, port);
-        socket.setTimeout(1000);
+        socket.setTimeout(socketTimeout);
         transport = socket;
         if (framed) {
           transport = new TFramedTransport(transport,
diff --git a/test/java/src/TestNonblockingServer.java b/test/java/src/TestNonblockingServer.java
new file mode 100644
index 0000000..450d67a
--- /dev/null
+++ b/test/java/src/TestNonblockingServer.java
@@ -0,0 +1,70 @@
+package com.facebook.thrift.test;
+
+import com.facebook.thrift.TException;
+import com.facebook.thrift.protocol.TBinaryProtocol;
+import com.facebook.thrift.protocol.TProtocol;
+import com.facebook.thrift.protocol.TProtocolFactory;
+import com.facebook.thrift.server.TServer;
+import com.facebook.thrift.server.TSimpleServer;
+import com.facebook.thrift.server.TNonblockingServer;
+import com.facebook.thrift.server.THsHaServer;
+import com.facebook.thrift.transport.TNonblockingServerSocket;
+import com.facebook.thrift.transport.TNonblockingServerTransport;
+import com.facebook.thrift.transport.TFramedTransport;
+
+// Generated code
+import thrift.test.*;
+
+import java.net.ServerSocket;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.HashSet;
+
+
+public class TestNonblockingServer extends TestServer {
+  public static void main(String [] args) {
+    try {
+      int port = 9090;
+      boolean hsha = false;
+
+      for (int i = 0; i < args.length; i++) {
+        if (args[i].equals("-p")) {
+          port = Integer.valueOf(args[i++]);
+        } else if (args[i].equals("-hsha")) {
+          hsha = true;
+        }
+      }
+
+      // Processor
+      TestHandler testHandler =
+        new TestHandler();
+      ThriftTest.Processor testProcessor =
+        new ThriftTest.Processor(testHandler);
+
+      // Transport
+      TNonblockingServerSocket tServerSocket =
+        new TNonblockingServerSocket(port);
+
+      TServer serverEngine;
+
+      if (hsha) {
+        // HsHa Server
+        serverEngine = new THsHaServer(testProcessor, tServerSocket);
+      } else {
+        // Nonblocking Server
+        serverEngine = new TNonblockingServer(testProcessor, tServerSocket);
+      }
+
+      // Run it
+      System.out.println("Starting the server on port " + port + "...");
+      serverEngine.serve();
+
+    } catch (Exception x) {
+      x.printStackTrace();
+    }
+    System.out.println("done.");
+  }
+}