THRIFT-1167. java: Java nonblocking server with more than one thread for select and handling IO

This patch refactors the nonblocking server hierarchy and adds in a new server that has a threaded selector pool as well as a threaded invoker pool.

Patch: Steve Jiang

git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1158977 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java b/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java
new file mode 100644
index 0000000..2bd74fa
--- /dev/null
+++ b/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java
@@ -0,0 +1,552 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.thrift.server;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.spi.SelectorProvider;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.thrift.TByteArrayOutputStream;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TFramedTransport;
+import org.apache.thrift.transport.TIOStreamTransport;
+import org.apache.thrift.transport.TMemoryInputTransport;
+import org.apache.thrift.transport.TNonblockingServerTransport;
+import org.apache.thrift.transport.TNonblockingTransport;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Provides common methods and classes used by nonblocking TServer
+ * implementations.
+ */
+public abstract class AbstractNonblockingServer extends TServer {
+  protected final Logger LOGGER = LoggerFactory.getLogger(getClass().getName());
+
+  public static abstract class AbstractNonblockingServerArgs<T extends AbstractNonblockingServerArgs<T>> extends AbstractServerArgs<T> {
+    public long maxReadBufferBytes = Long.MAX_VALUE;
+
+    public AbstractNonblockingServerArgs(TNonblockingServerTransport transport) {
+      super(transport);
+      transportFactory(new TFramedTransport.Factory());
+    }
+  }
+
+  /**
+   * The maximum amount of memory we will allocate to client IO buffers at a
+   * time. Without this limit, the server will gladly allocate client buffers
+   * right into an out of memory exception, rather than waiting.
+   */
+  private final long MAX_READ_BUFFER_BYTES;
+
+  /**
+   * How many bytes are currently allocated to read buffers.
+   */
+  private final AtomicLong readBufferBytesAllocated = new AtomicLong(0);
+
+  public AbstractNonblockingServer(AbstractNonblockingServerArgs args) {
+    super(args);
+    MAX_READ_BUFFER_BYTES = args.maxReadBufferBytes;
+  }
+
+  /**
+   * Begin accepting connections and processing invocations.
+   */
+  public void serve() {
+    // start any IO threads
+    if (!startThreads()) {
+      return;
+    }
+
+    // start listening, or exit
+    if (!startListening()) {
+      return;
+    }
+
+    setServing(true);
+
+    // this will block while we serve
+    waitForShutdown();
+
+    setServing(false);
+
+    // do a little cleanup
+    stopListening();
+  }
+
+  /**
+   * Starts any threads required for serving.
+   * 
+   * @return true if everything went ok, false if threads could not be started.
+   */
+  protected abstract boolean startThreads();
+
+  /**
+   * A method that will block until when threads handling the serving have been
+   * shut down.
+   */
+  protected abstract void waitForShutdown();
+
+  /**
+   * Have the server transport start accepting connections.
+   * 
+   * @return true if we started listening successfully, false if something went
+   *         wrong.
+   */
+  protected boolean startListening() {
+    try {
+      serverTransport_.listen();
+      return true;
+    } catch (TTransportException ttx) {
+      LOGGER.error("Failed to start listening on server socket!", ttx);
+      return false;
+    }
+  }
+
+  /**
+   * Stop listening for connections.
+   */
+  protected void stopListening() {
+    serverTransport_.close();
+  }
+
+  /**
+   * Perform an invocation. This method could behave several different ways -
+   * invoke immediately inline, queue for separate execution, etc.
+   * 
+   * @return true if invocation was successfully requested, which is not a
+   *         guarantee that invocation has completed. False if the request
+   *         failed.
+   */
+  protected abstract boolean requestInvoke(FrameBuffer frameBuffer);
+
+  /**
+   * An abstract thread that handles selecting on a set of transports and
+   * {@link FrameBuffer FrameBuffers} associated with selected keys
+   * corresponding to requests.
+   */
+  protected abstract class AbstractSelectThread extends Thread {
+    protected final Selector selector;
+
+    // List of FrameBuffers that want to change their selection interests.
+    protected final Set<FrameBuffer> selectInterestChanges = new HashSet<FrameBuffer>();
+
+    public AbstractSelectThread() throws IOException {
+      this.selector = SelectorProvider.provider().openSelector();
+    }
+
+    /**
+     * If the selector is blocked, wake it up.
+     */
+    public void wakeupSelector() {
+      selector.wakeup();
+    }
+
+    /**
+     * Add FrameBuffer to the list of select interest changes and wake up the
+     * selector if it's blocked. When the select() call exits, it'll give the
+     * FrameBuffer a chance to change its interests.
+     */
+    public void requestSelectInterestChange(FrameBuffer frameBuffer) {
+      synchronized (selectInterestChanges) {
+        selectInterestChanges.add(frameBuffer);
+      }
+      // wakeup the selector, if it's currently blocked.
+      selector.wakeup();
+    }
+
+    /**
+     * Check to see if there are any FrameBuffers that have switched their
+     * interest type from read to write or vice versa.
+     */
+    protected void processInterestChanges() {
+      synchronized (selectInterestChanges) {
+        for (FrameBuffer fb : selectInterestChanges) {
+          fb.changeSelectInterests();
+        }
+        selectInterestChanges.clear();
+      }
+    }
+
+    /**
+     * Do the work required to read from a readable client. If the frame is
+     * fully read, then invoke the method call.
+     */
+    protected void handleRead(SelectionKey key) {
+      FrameBuffer buffer = (FrameBuffer) key.attachment();
+      if (!buffer.read()) {
+        cleanupSelectionKey(key);
+        return;
+      }
+
+      // if the buffer's frame read is complete, invoke the method.
+      if (buffer.isFrameFullyRead()) {
+        if (!requestInvoke(buffer)) {
+          cleanupSelectionKey(key);
+        }
+      }
+    }
+
+    /**
+     * Let a writable client get written, if there's data to be written.
+     */
+    protected void handleWrite(SelectionKey key) {
+      FrameBuffer buffer = (FrameBuffer) key.attachment();
+      if (!buffer.write()) {
+        cleanupSelectionKey(key);
+      }
+    }
+
+    /**
+     * Do connection-close cleanup on a given SelectionKey.
+     */
+    protected void cleanupSelectionKey(SelectionKey key) {
+      // remove the records from the two maps
+      FrameBuffer buffer = (FrameBuffer) key.attachment();
+      if (buffer != null) {
+        // close the buffer
+        buffer.close();
+      }
+      // cancel the selection key
+      key.cancel();
+    }
+  } // SelectThread
+
+  /**
+   * Possible states for the FrameBuffer state machine.
+   */
+  private enum FrameBufferState {
+    // in the midst of reading the frame size off the wire
+    READING_FRAME_SIZE,
+    // reading the actual frame data now, but not all the way done yet
+    READING_FRAME,
+    // completely read the frame, so an invocation can now happen
+    READ_FRAME_COMPLETE,
+    // waiting to get switched to listening for write events
+    AWAITING_REGISTER_WRITE,
+    // started writing response data, not fully complete yet
+    WRITING,
+    // another thread wants this framebuffer to go back to reading
+    AWAITING_REGISTER_READ,
+    // we want our transport and selection key invalidated in the selector
+    // thread
+    AWAITING_CLOSE
+  }
+
+  /**
+   * Class that implements a sort of state machine around the interaction with a
+   * client and an invoker. It manages reading the frame size and frame data,
+   * getting it handed off as wrapped transports, and then the writing of
+   * response data back to the client. In the process it manages flipping the
+   * read and write bits on the selection key for its client.
+   */
+  protected class FrameBuffer {
+    // the actual transport hooked up to the client.
+    private final TNonblockingTransport trans_;
+
+    // the SelectionKey that corresponds to our transport
+    private final SelectionKey selectionKey_;
+
+    // the SelectThread that owns the registration of our transport
+    private final AbstractSelectThread selectThread_;
+
+    // where in the process of reading/writing are we?
+    private FrameBufferState state_ = FrameBufferState.READING_FRAME_SIZE;
+
+    // the ByteBuffer we'll be using to write and read, depending on the state
+    private ByteBuffer buffer_;
+
+    private TByteArrayOutputStream response_;
+
+    public FrameBuffer(final TNonblockingTransport trans,
+        final SelectionKey selectionKey,
+        final AbstractSelectThread selectThread) {
+      trans_ = trans;
+      selectionKey_ = selectionKey;
+      selectThread_ = selectThread;
+      buffer_ = ByteBuffer.allocate(4);
+    }
+
+    /**
+     * Give this FrameBuffer a chance to read. The selector loop should have
+     * received a read event for this FrameBuffer.
+     * 
+     * @return true if the connection should live on, false if it should be
+     *         closed
+     */
+    public boolean read() {
+      if (state_ == FrameBufferState.READING_FRAME_SIZE) {
+        // try to read the frame size completely
+        if (!internalRead()) {
+          return false;
+        }
+
+        // if the frame size has been read completely, then prepare to read the
+        // actual frame.
+        if (buffer_.remaining() == 0) {
+          // pull out the frame size as an integer.
+          int frameSize = buffer_.getInt(0);
+          if (frameSize <= 0) {
+            LOGGER.error("Read an invalid frame size of " + frameSize
+                + ". Are you using TFramedTransport on the client side?");
+            return false;
+          }
+
+          // if this frame will always be too large for this server, log the
+          // error and close the connection.
+          if (frameSize > MAX_READ_BUFFER_BYTES) {
+            LOGGER.error("Read a frame size of " + frameSize
+                + ", which is bigger than the maximum allowable buffer size for ALL connections.");
+            return false;
+          }
+
+          // if this frame will push us over the memory limit, then return.
+          // with luck, more memory will free up the next time around.
+          if (readBufferBytesAllocated.get() + frameSize > MAX_READ_BUFFER_BYTES) {
+            return true;
+          }
+
+          // increment the amount of memory allocated to read buffers
+          readBufferBytesAllocated.addAndGet(frameSize);
+
+          // reallocate the readbuffer as a frame-sized buffer
+          buffer_ = ByteBuffer.allocate(frameSize);
+
+          state_ = FrameBufferState.READING_FRAME;
+        } else {
+          // this skips the check of READING_FRAME state below, since we can't
+          // possibly go on to that state if there's data left to be read at
+          // this one.
+          return true;
+        }
+      }
+
+      // it is possible to fall through from the READING_FRAME_SIZE section
+      // to READING_FRAME if there's already some frame data available once
+      // READING_FRAME_SIZE is complete.
+
+      if (state_ == FrameBufferState.READING_FRAME) {
+        if (!internalRead()) {
+          return false;
+        }
+
+        // since we're already in the select loop here for sure, we can just
+        // modify our selection key directly.
+        if (buffer_.remaining() == 0) {
+          // get rid of the read select interests
+          selectionKey_.interestOps(0);
+          state_ = FrameBufferState.READ_FRAME_COMPLETE;
+        }
+
+        return true;
+      }
+
+      // if we fall through to this point, then the state must be invalid.
+      LOGGER.error("Read was called but state is invalid (" + state_ + ")");
+      return false;
+    }
+
+    /**
+     * Give this FrameBuffer a chance to write its output to the final client.
+     */
+    public boolean write() {
+      if (state_ == FrameBufferState.WRITING) {
+        try {
+          if (trans_.write(buffer_) < 0) {
+            return false;
+          }
+        } catch (IOException e) {
+          LOGGER.warn("Got an IOException during write!", e);
+          return false;
+        }
+
+        // we're done writing. now we need to switch back to reading.
+        if (buffer_.remaining() == 0) {
+          prepareRead();
+        }
+        return true;
+      }
+
+      LOGGER.error("Write was called, but state is invalid (" + state_ + ")");
+      return false;
+    }
+
+    /**
+     * Give this FrameBuffer a chance to set its interest to write, once data
+     * has come in.
+     */
+    public void changeSelectInterests() {
+      if (state_ == FrameBufferState.AWAITING_REGISTER_WRITE) {
+        // set the OP_WRITE interest
+        selectionKey_.interestOps(SelectionKey.OP_WRITE);
+        state_ = FrameBufferState.WRITING;
+      } else if (state_ == FrameBufferState.AWAITING_REGISTER_READ) {
+        prepareRead();
+      } else if (state_ == FrameBufferState.AWAITING_CLOSE) {
+        close();
+        selectionKey_.cancel();
+      } else {
+        LOGGER.error("changeSelectInterest was called, but state is invalid (" + state_ + ")");
+      }
+    }
+
+    /**
+     * Shut the connection down.
+     */
+    public void close() {
+      // if we're being closed due to an error, we might have allocated a
+      // buffer that we need to subtract for our memory accounting.
+      if (state_ == FrameBufferState.READING_FRAME || state_ == FrameBufferState.READ_FRAME_COMPLETE) {
+        readBufferBytesAllocated.addAndGet(-buffer_.array().length);
+      }
+      trans_.close();
+    }
+
+    /**
+     * Check if this FrameBuffer has a full frame read.
+     */
+    public boolean isFrameFullyRead() {
+      return state_ == FrameBufferState.READ_FRAME_COMPLETE;
+    }
+
+    /**
+     * After the processor has processed the invocation, whatever thread is
+     * managing invocations should call this method on this FrameBuffer so we
+     * know it's time to start trying to write again. Also, if it turns out that
+     * there actually isn't any data in the response buffer, we'll skip trying
+     * to write and instead go back to reading.
+     */
+    public void responseReady() {
+      // the read buffer is definitely no longer in use, so we will decrement
+      // our read buffer count. we do this here as well as in close because
+      // we'd like to free this read memory up as quickly as possible for other
+      // clients.
+      readBufferBytesAllocated.addAndGet(-buffer_.array().length);
+
+      if (response_.len() == 0) {
+        // go straight to reading again. this was probably an oneway method
+        state_ = FrameBufferState.AWAITING_REGISTER_READ;
+        buffer_ = null;
+      } else {
+        buffer_ = ByteBuffer.wrap(response_.get(), 0, response_.len());
+
+        // set state that we're waiting to be switched to write. we do this
+        // asynchronously through requestSelectInterestChange() because there is
+        // a possibility that we're not in the main thread, and thus currently
+        // blocked in select(). (this functionality is in place for the sake of
+        // the HsHa server.)
+        state_ = FrameBufferState.AWAITING_REGISTER_WRITE;
+      }
+      requestSelectInterestChange();
+    }
+
+    /**
+     * Actually invoke the method signified by this FrameBuffer.
+     */
+    public void invoke() {
+      TTransport inTrans = getInputTransport();
+      TProtocol inProt = inputProtocolFactory_.getProtocol(inTrans);
+      TProtocol outProt = outputProtocolFactory_.getProtocol(getOutputTransport());
+
+      try {
+        processorFactory_.getProcessor(inTrans).process(inProt, outProt);
+        responseReady();
+        return;
+      } catch (TException te) {
+        LOGGER.warn("Exception while invoking!", te);
+      } catch (Exception e) {
+        LOGGER.error("Unexpected exception while invoking!", e);
+      }
+      // This will only be reached when there is an exception.
+      state_ = FrameBufferState.AWAITING_CLOSE;
+      requestSelectInterestChange();
+    }
+
+    /**
+     * Wrap the read buffer in a memory-based transport so a processor can read
+     * the data it needs to handle an invocation.
+     */
+    private TTransport getInputTransport() {
+      return new TMemoryInputTransport(buffer_.array());
+    }
+
+    /**
+     * Get the transport that should be used by the invoker for responding.
+     */
+    private TTransport getOutputTransport() {
+      response_ = new TByteArrayOutputStream();
+      return outputTransportFactory_.getTransport(new TIOStreamTransport(response_));
+    }
+
+    /**
+     * Perform a read into buffer.
+     * 
+     * @return true if the read succeeded, false if there was an error or the
+     *         connection closed.
+     */
+    private boolean internalRead() {
+      try {
+        if (trans_.read(buffer_) < 0) {
+          return false;
+        }
+        return true;
+      } catch (IOException e) {
+        LOGGER.warn("Got an IOException in internalRead!", e);
+        return false;
+      }
+    }
+
+    /**
+     * We're done writing, so reset our interest ops and change state
+     * accordingly.
+     */
+    private void prepareRead() {
+      // we can set our interest directly without using the queue because
+      // we're in the select thread.
+      selectionKey_.interestOps(SelectionKey.OP_READ);
+      // get ready for another go-around
+      buffer_ = ByteBuffer.allocate(4);
+      state_ = FrameBufferState.READING_FRAME_SIZE;
+    }
+
+    /**
+     * When this FrameBuffer needs to change its select interests and execution
+     * might not be in its select thread, then this method will make sure the
+     * interest change gets done when the select thread wakes back up. When the
+     * current thread is this FrameBuffer's select thread, then it just does the
+     * interest change immediately.
+     */
+    private void requestSelectInterestChange() {
+      if (Thread.currentThread() == this.selectThread_) {
+        changeSelectInterests();
+      } else {
+        this.selectThread_.requestSelectInterestChange(this);
+      }
+    }
+  } // FrameBuffer
+}
diff --git a/lib/java/src/org/apache/thrift/server/Invocation.java b/lib/java/src/org/apache/thrift/server/Invocation.java
new file mode 100644
index 0000000..e8210f4
--- /dev/null
+++ b/lib/java/src/org/apache/thrift/server/Invocation.java
@@ -0,0 +1,20 @@
+package org.apache.thrift.server;
+
+import org.apache.thrift.server.AbstractNonblockingServer.FrameBuffer;
+
+/**
+ * An Invocation represents a method call that is prepared to execute, given
+ * an idle worker thread. It contains the input and output protocols the
+ * thread's processor should use to perform the usual Thrift invocation.
+ */
+class Invocation implements Runnable {
+  private final FrameBuffer frameBuffer;
+
+  public Invocation(final FrameBuffer frameBuffer) {
+    this.frameBuffer = frameBuffer;
+  }
+
+  public void run() {
+    frameBuffer.invoke();
+  }
+}
\ No newline at end of file
diff --git a/lib/java/src/org/apache/thrift/server/THsHaServer.java b/lib/java/src/org/apache/thrift/server/THsHaServer.java
index f3dfd0a..3541154 100644
--- a/lib/java/src/org/apache/thrift/server/THsHaServer.java
+++ b/lib/java/src/org/apache/thrift/server/THsHaServer.java
@@ -27,16 +27,12 @@
 import java.util.concurrent.TimeUnit;
 
 import org.apache.thrift.transport.TNonblockingServerTransport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * An extension of the TNonblockingServer to a Half-Sync/Half-Async server.
  * Like TNonblockingServer, it relies on the use of TFramedTransport.
  */
 public class THsHaServer extends TNonblockingServer {
-  private static final Logger LOGGER =
-    LoggerFactory.getLogger(THsHaServer.class.getName());
 
   public static class Args extends AbstractNonblockingServerArgs<Args> {
     private int workerThreads = 5;
@@ -85,46 +81,30 @@
     }
   }
 
+
   // This wraps all the functionality of queueing and thread pool management
   // for the passing of Invocations from the Selector to workers.
-  private ExecutorService invoker;
+  private final ExecutorService invoker;
+
+  private final Args args;
 
   /**
-   * Create server with every option fully specified, and with an injected
-   * ExecutorService
+   * Create the server with the specified Args configuration
    */
   public THsHaServer(Args args) {
     super(args);
 
     invoker = args.executorService == null ? createInvokerPool(args) : args.executorService;
+    this.args = args;
   }
 
-  /** @inheritDoc */
+  /**
+   * @inheritDoc
+   */
   @Override
-  public void serve() {
-    // start listening, or exit
-    if (!startListening()) {
-      return;
-    }
-
-    // start the selector, or exit
-    if (!startSelectorThread()) {
-      return;
-    }
-
-    setServing(true);
-
-    // this will block while we serve
+  protected void waitForShutdown() {
     joinSelector();
-
     gracefullyShutdownInvokerPool();
-
-    setServing(false);
-
-    // do a little cleanup
-    stopListening();
-
-    // ungracefully shut down the invoker pool?
   }
 
   /**
@@ -136,12 +116,13 @@
     TimeUnit stopTimeoutUnit = options.stopTimeoutUnit;
 
     LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
-    ExecutorService invoker = new ThreadPoolExecutor(workerThreads, workerThreads,
-      stopTimeoutVal, stopTimeoutUnit, queue);
+    ExecutorService invoker = new ThreadPoolExecutor(workerThreads,
+      workerThreads, stopTimeoutVal, stopTimeoutUnit, queue);
 
     return invoker;
   }
 
+
   protected void gracefullyShutdownInvokerPool() {
     // try to gracefully shut down the executor service
     invoker.shutdown();
@@ -150,7 +131,7 @@
     // exception. If we don't do this, then we'll shut down prematurely. We want
     // to let the executorService clear it's task queue, closing client sockets
     // appropriately.
-    long timeoutMS = 10000;
+    long timeoutMS = args.stopTimeoutUnit.toMillis(args.stopTimeoutVal);
     long now = System.currentTimeMillis();
     while (timeoutMS >= 0) {
       try {
@@ -166,7 +147,8 @@
 
   /**
    * We override the standard invoke method here to queue the invocation for
-   * invoker service instead of immediately invoking. The thread pool takes care of the rest.
+   * invoker service instead of immediately invoking. The thread pool takes care
+   * of the rest.
    */
   @Override
   protected boolean requestInvoke(FrameBuffer frameBuffer) {
@@ -181,24 +163,6 @@
   }
 
   protected Runnable getRunnable(FrameBuffer frameBuffer){
-	return new Invocation(frameBuffer);
-  }
-
-  /**
-   * An Invocation represents a method call that is prepared to execute, given
-   * an idle worker thread. It contains the input and output protocols the
-   * thread's processor should use to perform the usual Thrift invocation.
-   */
-  private class Invocation implements Runnable {
-
-    private final FrameBuffer frameBuffer;
-
-    public Invocation(final FrameBuffer frameBuffer) {
-      this.frameBuffer = frameBuffer;
-    }
-
-    public void run() {
-      frameBuffer.invoke();
-    }
+    return new Invocation(frameBuffer);
   }
 }
diff --git a/lib/java/src/org/apache/thrift/server/TNonblockingServer.java b/lib/java/src/org/apache/thrift/server/TNonblockingServer.java
index d44d460..7afd4b3 100644
--- a/lib/java/src/org/apache/thrift/server/TNonblockingServer.java
+++ b/lib/java/src/org/apache/thrift/server/TNonblockingServer.java
@@ -21,27 +21,12 @@
 package org.apache.thrift.server;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.spi.SelectorProvider;
-import java.util.HashSet;
 import java.util.Iterator;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.thrift.TByteArrayOutputStream;
-import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.TFramedTransport;
-import org.apache.thrift.transport.TIOStreamTransport;
-import org.apache.thrift.transport.TMemoryInputTransport;
 import org.apache.thrift.transport.TNonblockingServerTransport;
 import org.apache.thrift.transport.TNonblockingTransport;
-import org.apache.thrift.transport.TTransport;
 import org.apache.thrift.transport.TTransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * A nonblocking TServer implementation. This allows for fairness amongst all
@@ -54,9 +39,7 @@
  * transport, otherwise this server will be unable to determine when a whole
  * method call has been read off the wire. Clients must also use TFramedTransport.
  */
-public class TNonblockingServer extends TServer {
-  private static final Logger LOGGER =
-    LoggerFactory.getLogger(TNonblockingServer.class.getName());
+public class TNonblockingServer extends AbstractNonblockingServer {
 
   public static class Args extends AbstractNonblockingServerArgs<Args> {
     public Args(TNonblockingServerTransport transport) {
@@ -64,97 +47,29 @@
     }
   }
 
-  public static abstract class AbstractNonblockingServerArgs<T extends AbstractNonblockingServerArgs<T>> extends AbstractServerArgs<T> {
-    public long maxReadBufferBytes = Long.MAX_VALUE;
-
-    public AbstractNonblockingServerArgs(TNonblockingServerTransport transport) {
-      super(transport);
-      transportFactory(new TFramedTransport.Factory());
-    }
-  }
-
   // Flag for stopping the server
   private volatile boolean stopped_ = true;
 
-  private SelectThread selectThread_;
-
-  /**
-   * The maximum amount of memory we will allocate to client IO buffers at a
-   * time. Without this limit, the server will gladly allocate client buffers
-   * right into an out of memory exception, rather than waiting.
-   */
-  private final long MAX_READ_BUFFER_BYTES;
-
-  /**
-   * How many bytes are currently allocated to read buffers.
-   */
-  private final AtomicLong readBufferBytesAllocated = new AtomicLong(0);
+  private SelectAcceptThread selectAcceptThread_;
 
   public TNonblockingServer(AbstractNonblockingServerArgs args) {
     super(args);
-    MAX_READ_BUFFER_BYTES = args.maxReadBufferBytes;
   }
 
-  /**
-   * Begin accepting connections and processing invocations.
-   */
-  public void serve() {
-    // start listening, or exit
-    if (!startListening()) {
-      return;
-    }
-
-    // start the selector, or exit
-    if (!startSelectorThread()) {
-      return;
-    }
-
-    setServing(true);
-
-    // this will block while we serve
-    joinSelector();
-
-    setServing(false);
-
-    // do a little cleanup
-    stopListening();
-  }
 
   /**
-   * Have the server transport start accepting connections.
-   *
-   * @return true if we started listening successfully, false if something went
-   * wrong.
-   */
-  protected boolean startListening() {
-    try {
-      serverTransport_.listen();
-      return true;
-    } catch (TTransportException ttx) {
-      LOGGER.error("Failed to start listening on server socket!", ttx);
-      return false;
-    }
-  }
-
-  /**
-   * Stop listening for connections.
-   */
-  protected void stopListening() {
-    serverTransport_.close();
-  }
-
-  /**
-   * Start the selector thread running to deal with clients.
+   * Start the selector thread to deal with accepts and client messages.
    *
    * @return true if everything went ok, false if we couldn't start for some
    * reason.
    */
-  protected boolean startSelectorThread() {
+  @Override
+  protected boolean startThreads() {
     // start the selector
     try {
-      selectThread_ = new SelectThread((TNonblockingServerTransport)serverTransport_);
+      selectAcceptThread_ = new SelectAcceptThread((TNonblockingServerTransport)serverTransport_);
       stopped_ = false;
-      selectThread_.start();
+      selectAcceptThread_.start();
       return true;
     } catch (IOException e) {
       LOGGER.error("Failed to start selector thread!", e);
@@ -162,13 +77,18 @@
     }
   }
 
+  @Override
+  protected void waitForShutdown() {
+    joinSelector();
+  }
+
   /**
-   * Block until the selector exits.
+   * Block until the selector thread exits.
    */
   protected void joinSelector() {
     // wait until the selector thread exits
     try {
-      selectThread_.join();
+      selectAcceptThread_.join();
     } catch (InterruptedException e) {
       // for now, just silently ignore. technically this means we'll have less of
       // a graceful shutdown as a result.
@@ -178,10 +98,11 @@
   /**
    * Stop serving and shut everything down.
    */
+  @Override
   public void stop() {
     stopped_ = true;
-    if (selectThread_ != null) {
-      selectThread_.wakeupSelector();
+    if (selectAcceptThread_ != null) {
+      selectAcceptThread_.wakeupSelector();
     }
   }
 
@@ -189,43 +110,33 @@
    * Perform an invocation. This method could behave several different ways
    * - invoke immediately inline, queue for separate execution, etc.
    */
+  @Override
   protected boolean requestInvoke(FrameBuffer frameBuffer) {
     frameBuffer.invoke();
     return true;
   }
 
-  /**
-   * A FrameBuffer wants to change its selection preferences, but might not be
-   * in the select thread.
-   */
-  protected void requestSelectInterestChange(FrameBuffer frameBuffer) {
-    selectThread_.requestSelectInterestChange(frameBuffer);
-  }
 
   public boolean isStopped() {
-    return selectThread_.isStopped();
+    return selectAcceptThread_.isStopped();
   }
 
   /**
    * The thread that will be doing all the selecting, managing new connections
    * and those that still need to be read.
    */
-  protected class SelectThread extends Thread {
+  protected class SelectAcceptThread extends AbstractSelectThread {
 
+    // The server transport on which new client transports will be accepted
     private final TNonblockingServerTransport serverTransport;
-    private final Selector selector;
-
-    // List of FrameBuffers that want to change their selection interests.
-    private final Set<FrameBuffer> selectInterestChanges =
-      new HashSet<FrameBuffer>();
 
     /**
-     * Set up the SelectorThread.
+     * Set up the thread that will handle the non-blocking accepts, reads, and
+     * writes.
      */
-    public SelectThread(final TNonblockingServerTransport serverTransport)
+    public SelectAcceptThread(final TNonblockingServerTransport serverTransport)
     throws IOException {
       this.serverTransport = serverTransport;
-      this.selector = SelectorProvider.provider().openSelector();
       serverTransport.registerSelector(selector);
     }
 
@@ -251,26 +162,6 @@
     }
 
     /**
-     * If the selector is blocked, wake it up.
-     */
-    public void wakeupSelector() {
-      selector.wakeup();
-    }
-
-    /**
-     * Add FrameBuffer to the list of select interest changes and wake up the
-     * selector if it's blocked. When the select() call exits, it'll give the
-     * FrameBuffer a chance to change its interests.
-     */
-    public void requestSelectInterestChange(FrameBuffer frameBuffer) {
-      synchronized (selectInterestChanges) {
-        selectInterestChanges.add(frameBuffer);
-      }
-      // wakeup the selector, if it's currently blocked.
-      selector.wakeup();
-    }
-
-    /**
      * Select and process IO events appropriately:
      * If there are connections to be accepted, accept them.
      * If there are existing connections with data waiting to be read, read it,
@@ -291,7 +182,7 @@
 
           // skip if not valid
           if (!key.isValid()) {
-            cleanupSelectionkey(key);
+            cleanupSelectionKey(key);
             continue;
           }
 
@@ -315,19 +206,6 @@
     }
 
     /**
-     * Check to see if there are any FrameBuffers that have switched their
-     * interest type from read to write or vice versa.
-     */
-    private void processInterestChanges() {
-      synchronized (selectInterestChanges) {
-        for (FrameBuffer fb : selectInterestChanges) {
-          fb.changeSelectInterests();
-        }
-        selectInterestChanges.clear();
-      }
-    }
-
-    /**
      * Accept a new connection.
      */
     private void handleAccept() throws IOException {
@@ -339,368 +217,16 @@
         clientKey = client.registerSelector(selector, SelectionKey.OP_READ);
 
         // add this key to the map
-        FrameBuffer frameBuffer = new FrameBuffer(client, clientKey);
+        FrameBuffer frameBuffer = new FrameBuffer(client, clientKey,
+          SelectAcceptThread.this);
         clientKey.attach(frameBuffer);
       } catch (TTransportException tte) {
         // something went wrong accepting.
         LOGGER.warn("Exception trying to accept!", tte);
         tte.printStackTrace();
-        if (clientKey != null) cleanupSelectionkey(clientKey);
+        if (clientKey != null) cleanupSelectionKey(clientKey);
         if (client != null) client.close();
       }
     }
-
-    /**
-     * Do the work required to read from a readable client. If the frame is
-     * fully read, then invoke the method call.
-     */
-    private void handleRead(SelectionKey key) {
-      FrameBuffer buffer = (FrameBuffer)key.attachment();
-      if (!buffer.read()) {
-        cleanupSelectionkey(key);
-        return;
-      }
-
-      // if the buffer's frame read is complete, invoke the method.
-      if (buffer.isFrameFullyRead()) {
-        if (!requestInvoke(buffer)) {
-          cleanupSelectionkey(key);
-        }
-      }
-    }
-
-    /**
-     * Let a writable client get written, if there's data to be written.
-     */
-    private void handleWrite(SelectionKey key) {
-      FrameBuffer buffer = (FrameBuffer)key.attachment();
-      if (!buffer.write()) {
-        cleanupSelectionkey(key);
-      }
-    }
-
-    /**
-     * Do connection-close cleanup on a given SelectionKey.
-     */
-    private void cleanupSelectionkey(SelectionKey key) {
-      // remove the records from the two maps
-      FrameBuffer buffer = (FrameBuffer)key.attachment();
-      if (buffer != null) {
-        // close the buffer
-        buffer.close();
-      }
-      // cancel the selection key
-      key.cancel();
-    }
-  } // SelectorThread
-
-  /**
-   * Class that implements a sort of state machine around the interaction with
-   * a client and an invoker. It manages reading the frame size and frame data,
-   * getting it handed off as wrapped transports, and then the writing of
-   * response data back to the client. In the process it manages flipping the
-   * read and write bits on the selection key for its client.
-   */
-  protected class FrameBuffer {
-    //
-    // Possible states for the FrameBuffer state machine.
-    //
-    // in the midst of reading the frame size off the wire
-    private static final int READING_FRAME_SIZE = 1;
-    // reading the actual frame data now, but not all the way done yet
-    private static final int READING_FRAME = 2;
-    // completely read the frame, so an invocation can now happen
-    private static final int READ_FRAME_COMPLETE = 3;
-    // waiting to get switched to listening for write events
-    private static final int AWAITING_REGISTER_WRITE = 4;
-    // started writing response data, not fully complete yet
-    private static final int WRITING = 6;
-    // another thread wants this framebuffer to go back to reading
-    private static final int AWAITING_REGISTER_READ = 7;
-    // we want our transport and selection key invalidated in the selector thread
-    private static final int AWAITING_CLOSE = 8;
-
-    //
-    // Instance variables
-    //
-
-    // the actual transport hooked up to the client.
-    public final TNonblockingTransport trans_;
-
-    // the SelectionKey that corresponds to our transport
-    private final SelectionKey selectionKey_;
-
-    // where in the process of reading/writing are we?
-    private int state_ = READING_FRAME_SIZE;
-
-    // the ByteBuffer we'll be using to write and read, depending on the state
-    private ByteBuffer buffer_;
-
-    private TByteArrayOutputStream response_;
-
-    public FrameBuffer( final TNonblockingTransport trans,
-                        final SelectionKey selectionKey) {
-      trans_ = trans;
-      selectionKey_ = selectionKey;
-      buffer_ = ByteBuffer.allocate(4);
-    }
-
-    /**
-     * Give this FrameBuffer a chance to read. The selector loop should have
-     * received a read event for this FrameBuffer.
-     *
-     * @return true if the connection should live on, false if it should be
-     * closed
-     */
-    public boolean read() {
-      if (state_ == READING_FRAME_SIZE) {
-        // try to read the frame size completely
-        if (!internalRead()) {
-          return false;
-        }
-
-        // if the frame size has been read completely, then prepare to read the
-        // actual frame.
-        if (buffer_.remaining() == 0) {
-          // pull out the frame size as an integer.
-          int frameSize = buffer_.getInt(0);
-          if (frameSize <= 0) {
-            LOGGER.error("Read an invalid frame size of " + frameSize
-              + ". Are you using TFramedTransport on the client side?");
-            return false;
-          }
-
-          // if this frame will always be too large for this server, log the
-          // error and close the connection.
-          if (frameSize > MAX_READ_BUFFER_BYTES) {
-            LOGGER.error("Read a frame size of " + frameSize
-              + ", which is bigger than the maximum allowable buffer size for ALL connections.");
-            return false;
-          }
-
-          // if this frame will push us over the memory limit, then return.
-          // with luck, more memory will free up the next time around.
-          if (readBufferBytesAllocated.get() + frameSize > MAX_READ_BUFFER_BYTES) {
-            return true;
-          }
-
-          // increment the amount of memory allocated to read buffers
-          readBufferBytesAllocated.addAndGet(frameSize);
-
-          // reallocate the readbuffer as a frame-sized buffer
-          buffer_ = ByteBuffer.allocate(frameSize);
-
-          state_ = READING_FRAME;
-        } else {
-          // this skips the check of READING_FRAME state below, since we can't
-          // possibly go on to that state if there's data left to be read at
-          // this one.
-          return true;
-        }
-      }
-
-      // it is possible to fall through from the READING_FRAME_SIZE section
-      // to READING_FRAME if there's already some frame data available once
-      // READING_FRAME_SIZE is complete.
-
-      if (state_ == READING_FRAME) {
-        if (!internalRead()) {
-          return false;
-        }
-
-        // since we're already in the select loop here for sure, we can just
-        // modify our selection key directly.
-        if (buffer_.remaining() == 0) {
-          // get rid of the read select interests
-          selectionKey_.interestOps(0);
-          state_ = READ_FRAME_COMPLETE;
-        }
-
-        return true;
-      }
-
-      // if we fall through to this point, then the state must be invalid.
-      LOGGER.error("Read was called but state is invalid (" + state_ + ")");
-      return false;
-    }
-
-    /**
-     * Give this FrameBuffer a chance to write its output to the final client.
-     */
-    public boolean write() {
-      if (state_ == WRITING) {
-        try {
-          if (trans_.write(buffer_) < 0) {
-            return false;
-          }
-        } catch (IOException e) {
-          LOGGER.warn("Got an IOException during write!", e);
-          return false;
-        }
-
-        // we're done writing. now we need to switch back to reading.
-        if (buffer_.remaining() == 0) {
-          prepareRead();
-        }
-        return true;
-      }
-
-      LOGGER.error("Write was called, but state is invalid (" + state_ + ")");
-      return false;
-    }
-
-    /**
-     * Give this FrameBuffer a chance to set its interest to write, once data
-     * has come in.
-     */
-    public void changeSelectInterests() {
-      if (state_ == AWAITING_REGISTER_WRITE) {
-        // set the OP_WRITE interest
-        selectionKey_.interestOps(SelectionKey.OP_WRITE);
-        state_ = WRITING;
-      } else if (state_ == AWAITING_REGISTER_READ) {
-        prepareRead();
-      } else if (state_ == AWAITING_CLOSE){
-        close();
-        selectionKey_.cancel();
-      } else {
-        LOGGER.error(
-          "changeSelectInterest was called, but state is invalid ("
-          + state_ + ")");
-      }
-    }
-
-    /**
-     * Shut the connection down.
-     */
-    public void close() {
-      // if we're being closed due to an error, we might have allocated a
-      // buffer that we need to subtract for our memory accounting.
-      if (state_ == READING_FRAME || state_ == READ_FRAME_COMPLETE) {
-        readBufferBytesAllocated.addAndGet(-buffer_.array().length);
-      }
-      trans_.close();
-    }
-
-    /**
-     * Check if this FrameBuffer has a full frame read.
-     */
-    public boolean isFrameFullyRead() {
-      return state_ == READ_FRAME_COMPLETE;
-    }
-
-    /**
-     * After the processor has processed the invocation, whatever thread is
-     * managing invocations should call this method on this FrameBuffer so we
-     * know it's time to start trying to write again. Also, if it turns out
-     * that there actually isn't any data in the response buffer, we'll skip
-     * trying to write and instead go back to reading.
-     */
-    public void responseReady() {
-      // the read buffer is definitely no longer in use, so we will decrement
-      // our read buffer count. we do this here as well as in close because
-      // we'd like to free this read memory up as quickly as possible for other
-      // clients.
-      readBufferBytesAllocated.addAndGet(-buffer_.array().length);
-
-      if (response_.len() == 0) {
-        // go straight to reading again. this was probably an oneway method
-        state_ = AWAITING_REGISTER_READ;
-        buffer_ = null;
-      } else {
-        buffer_ = ByteBuffer.wrap(response_.get(), 0, response_.len());
-
-        // set state that we're waiting to be switched to write. we do this
-        // asynchronously through requestSelectInterestChange() because there is a
-        // possibility that we're not in the main thread, and thus currently
-        // blocked in select(). (this functionality is in place for the sake of
-        // the HsHa server.)
-        state_ = AWAITING_REGISTER_WRITE;
-      }
-      requestSelectInterestChange();
-    }
-
-    /**
-     * Actually invoke the method signified by this FrameBuffer.
-     */
-    public void invoke() {
-      TTransport inTrans = getInputTransport();
-      TProtocol inProt = inputProtocolFactory_.getProtocol(inTrans);
-      TProtocol outProt = outputProtocolFactory_.getProtocol(getOutputTransport());
-
-      try {
-        processorFactory_.getProcessor(inTrans).process(inProt, outProt);
-        responseReady();
-        return;
-      } catch (TException te) {
-        LOGGER.warn("Exception while invoking!", te);
-      } catch (Exception e) {
-        LOGGER.error("Unexpected exception while invoking!", e);
-      }
-      // This will only be reached when there is an exception.
-      state_ = AWAITING_CLOSE;
-      requestSelectInterestChange();
-    }
-
-    /**
-     * Wrap the read buffer in a memory-based transport so a processor can read
-     * the data it needs to handle an invocation.
-     */
-    private TTransport getInputTransport() {
-      return new TMemoryInputTransport(buffer_.array());
-    }
-
-    /**
-     * Get the transport that should be used by the invoker for responding.
-     */
-    private TTransport getOutputTransport() {
-      response_ = new TByteArrayOutputStream();
-      return outputTransportFactory_.getTransport(new TIOStreamTransport(response_));
-    }
-
-    /**
-     * Perform a read into buffer.
-     *
-     * @return true if the read succeeded, false if there was an error or the
-     * connection closed.
-     */
-    private boolean internalRead() {
-      try {
-        if (trans_.read(buffer_) < 0) {
-          return false;
-        }
-        return true;
-      } catch (IOException e) {
-        LOGGER.warn("Got an IOException in internalRead!", e);
-        return false;
-      }
-    }
-
-    /**
-     * We're done writing, so reset our interest ops and change state accordingly.
-     */
-    private void prepareRead() {
-      // we can set our interest directly without using the queue because
-      // we're in the select thread.
-      selectionKey_.interestOps(SelectionKey.OP_READ);
-      // get ready for another go-around
-      buffer_ = ByteBuffer.allocate(4);
-      state_ = READING_FRAME_SIZE;
-    }
-
-    /**
-     * When this FrameBuffer needs to change it's select interests and execution
-     * might not be in the select thread, then this method will make sure the
-     * interest change gets done when the select thread wakes back up. When the
-     * current thread is the select thread, then it just does the interest change
-     * immediately.
-     */
-    private void requestSelectInterestChange() {
-      if (Thread.currentThread() == selectThread_) {
-        changeSelectInterests();
-      } else {
-        TNonblockingServer.this.requestSelectInterestChange(this);
-      }
-    }
-  } // FrameBuffer
+  } // SelectAcceptThread
 }
diff --git a/lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java b/lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java
new file mode 100644
index 0000000..4cf5f1b
--- /dev/null
+++ b/lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java
@@ -0,0 +1,646 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.thrift.server;
+
+import java.io.IOException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.spi.SelectorProvider;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.thrift.transport.TNonblockingServerTransport;
+import org.apache.thrift.transport.TNonblockingTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Half-Sync/Half-Async server with a separate pool of threads to handle
+ * non-blocking I/O. Accepts are handled on a single thread, and a configurable
+ * number of nonblocking selector threads manage reading and writing of client
+ * connections. A synchronous worker thread pool handles processing of requests.
+ * 
+ * Performs better than TNonblockingServer/THsHaServer in multi-core
+ * environments when the the bottleneck is CPU on the single selector thread
+ * handling I/O. In addition, because the accept handling is decoupled from
+ * reads/writes and invocation, the server has better ability to handle back-
+ * pressure from new connections (e.g. stop accepting when busy).
+ * 
+ * Like TNonblockingServer, it relies on the use of TFramedTransport.
+ */
+public class TThreadedSelectorServer extends AbstractNonblockingServer {
+  private static final Logger LOGGER = LoggerFactory.getLogger(TThreadedSelectorServer.class.getName());
+
+  public static class Args extends AbstractNonblockingServerArgs<Args> {
+
+    /** The number of threads for selecting on already-accepted connections */
+    public int selectorThreads = 2;
+    /**
+     * The size of the executor service (if none is specified) that will handle
+     * invocations. This may be set to 0, in which case invocations will be
+     * handled directly on the selector threads (as is in TNonblockingServer)
+     */
+    private int workerThreads = 5;
+    /** Time to wait for server to stop gracefully */
+    private int stopTimeoutVal = 60;
+    private TimeUnit stopTimeoutUnit = TimeUnit.SECONDS;
+    /** The ExecutorService for handling dispatched requests */
+    private ExecutorService executorService = null;
+    /**
+     * The size of the blocking queue per selector thread for passing accepted
+     * connections to the selector thread
+     */
+    private int acceptQueueSizePerThread = 4;
+
+    /**
+     * Determines the strategy for handling new accepted connections.
+     */
+    public static enum AcceptPolicy {
+      /**
+       * Require accepted connection registration to be handled by the executor.
+       * If the worker pool is saturated, further accepts will be closed
+       * immediately. Slightly increases latency due to an extra scheduling.
+       */
+      FAIR_ACCEPT,
+      /**
+       * Handle the accepts as fast as possible, disregarding the status of the
+       * executor service.
+       */
+      FAST_ACCEPT
+    }
+
+    private AcceptPolicy acceptPolicy = AcceptPolicy.FAST_ACCEPT;
+
+    public Args(TNonblockingServerTransport transport) {
+      super(transport);
+    }
+
+    public Args selectorThreads(int i) {
+      selectorThreads = i;
+      return this;
+    }
+
+    public int getSelectorThreads() {
+      return selectorThreads;
+    }
+
+    public Args workerThreads(int i) {
+      workerThreads = i;
+      return this;
+    }
+
+    public int getWorkerThreads() {
+      return workerThreads;
+    }
+
+    public int getStopTimeoutVal() {
+      return stopTimeoutVal;
+    }
+
+    public Args stopTimeoutVal(int stopTimeoutVal) {
+      this.stopTimeoutVal = stopTimeoutVal;
+      return this;
+    }
+
+    public TimeUnit getStopTimeoutUnit() {
+      return stopTimeoutUnit;
+    }
+
+    public Args stopTimeoutUnit(TimeUnit stopTimeoutUnit) {
+      this.stopTimeoutUnit = stopTimeoutUnit;
+      return this;
+    }
+
+    public ExecutorService getExecutorService() {
+      return executorService;
+    }
+
+    public Args executorService(ExecutorService executorService) {
+      this.executorService = executorService;
+      return this;
+    }
+
+    public int getAcceptQueueSizePerThread() {
+      return acceptQueueSizePerThread;
+    }
+
+    public Args acceptQueueSizePerThread(int acceptQueueSizePerThread) {
+      this.acceptQueueSizePerThread = acceptQueueSizePerThread;
+      return this;
+    }
+
+    public AcceptPolicy getAcceptPolicy() {
+      return acceptPolicy;
+    }
+
+    public Args acceptPolicy(AcceptPolicy acceptPolicy) {
+      this.acceptPolicy = acceptPolicy;
+      return this;
+    }
+
+    public void validate() {
+      if (selectorThreads <= 0) {
+        throw new IllegalArgumentException("selectorThreads must be positive.");
+      }
+      if (workerThreads < 0) {
+        throw new IllegalArgumentException("workerThreads must be non-negative.");
+      }
+      if (acceptQueueSizePerThread <= 0) {
+        throw new IllegalArgumentException("acceptQueueSizePerThread must be positive.");
+      }
+    }
+  }
+
+  // Flag for stopping the server
+  private volatile boolean stopped_ = true;
+
+  // The thread handling all accepts
+  private AcceptThread acceptThread;
+
+  // Threads handling events on client transports
+  private final Set<SelectorThread> selectorThreads = new HashSet<SelectorThread>();
+
+  // This wraps all the functionality of queueing and thread pool management
+  // for the passing of Invocations from the selector thread(s) to the workers
+  // (if any).
+  private final ExecutorService invoker;
+
+  private final Args args;
+
+  /**
+   * Create the server with the specified Args configuration
+   */
+  public TThreadedSelectorServer(Args args) {
+    super(args);
+    args.validate();
+    invoker = args.executorService == null ? createDefaultExecutor(args) : args.executorService;
+    this.args = args;
+  }
+
+  /**
+   * Start the accept and selector threads running to deal with clients.
+   * 
+   * @return true if everything went ok, false if we couldn't start for some
+   *         reason.
+   */
+  @Override
+  protected boolean startThreads() {
+    try {
+      for (int i = 0; i < args.selectorThreads; ++i) {
+        selectorThreads.add(new SelectorThread(args.acceptQueueSizePerThread));
+      }
+      acceptThread = new AcceptThread((TNonblockingServerTransport) serverTransport_,
+        createSelectorThreadLoadBalancer(selectorThreads));
+      stopped_ = false;
+      for (SelectorThread thread : selectorThreads) {
+        thread.start();
+      }
+      acceptThread.start();
+      return true;
+    } catch (IOException e) {
+      LOGGER.error("Failed to start threads!", e);
+      return false;
+    }
+  }
+
+  /**
+   * Joins the accept and selector threads and shuts down the executor service.
+   */
+  @Override
+  protected void waitForShutdown() {
+    try {
+      joinThreads();
+    } catch (InterruptedException e) {
+      // Non-graceful shutdown occurred
+      LOGGER.error("Interrupted while joining threads!", e);
+    }
+    gracefullyShutdownInvokerPool();
+  }
+
+  protected void joinThreads() throws InterruptedException {
+    // wait until the io threads exit
+    acceptThread.join();
+    for (SelectorThread thread : selectorThreads) {
+      thread.join();
+    }
+  }
+
+  /**
+   * Stop serving and shut everything down.
+   */
+  @Override
+  public void stop() {
+    stopped_ = true;
+
+    // Stop queuing connect attempts asap
+    stopListening();
+
+    if (acceptThread != null) {
+      acceptThread.wakeupSelector();
+    }
+    if (selectorThreads != null) {
+      for (SelectorThread thread : selectorThreads) {
+        if (thread != null)
+          thread.wakeupSelector();
+      }
+    }
+  }
+
+  protected void gracefullyShutdownInvokerPool() {
+    // try to gracefully shut down the executor service
+    invoker.shutdown();
+
+    // Loop until awaitTermination finally does return without a interrupted
+    // exception. If we don't do this, then we'll shut down prematurely. We want
+    // to let the executorService clear it's task queue, closing client sockets
+    // appropriately.
+    long timeoutMS = args.stopTimeoutUnit.toMillis(args.stopTimeoutVal);
+    long now = System.currentTimeMillis();
+    while (timeoutMS >= 0) {
+      try {
+        invoker.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS);
+        break;
+      } catch (InterruptedException ix) {
+        long newnow = System.currentTimeMillis();
+        timeoutMS -= (newnow - now);
+        now = newnow;
+      }
+    }
+  }
+
+  /**
+   * We override the standard invoke method here to queue the invocation for
+   * invoker service instead of immediately invoking. If there is no thread
+   * pool, handle the invocation inline on this thread
+   */
+  @Override
+  protected boolean requestInvoke(FrameBuffer frameBuffer) {
+    Runnable invocation = getRunnable(frameBuffer);
+    if (invoker != null) {
+      try {
+        invoker.execute(invocation);
+        return true;
+      } catch (RejectedExecutionException rx) {
+        LOGGER.warn("ExecutorService rejected execution!", rx);
+        return false;
+      }
+    } else {
+      // Invoke on the caller's thread
+      invocation.run();
+      return true;
+    }
+  }
+
+  protected Runnable getRunnable(FrameBuffer frameBuffer) {
+    return new Invocation(frameBuffer);
+  }
+
+  /**
+   * Helper to create the invoker if one is not specified
+   */
+  protected static ExecutorService createDefaultExecutor(Args options) {
+    return (options.workerThreads > 0) ? Executors.newFixedThreadPool(options.workerThreads) : null;
+  }
+
+  private static BlockingQueue<TNonblockingTransport> createDefaultAcceptQueue(int queueSize) {
+    if (queueSize == 0) {
+      // Unbounded queue
+      return new LinkedBlockingQueue<TNonblockingTransport>();
+    }
+    return new ArrayBlockingQueue<TNonblockingTransport>(queueSize);
+  }
+
+  /**
+   * The thread that selects on the server transport (listen socket) and accepts
+   * new connections to hand off to the IO selector threads
+   */
+  protected class AcceptThread extends Thread {
+
+    // The listen socket to accept on
+    private final TNonblockingServerTransport serverTransport;
+    private final Selector acceptSelector;
+
+    private final SelectorThreadLoadBalancer threadChooser;
+
+    /**
+     * Set up the AcceptThead
+     * 
+     * @throws IOException
+     */
+    public AcceptThread(TNonblockingServerTransport serverTransport,
+        SelectorThreadLoadBalancer threadChooser) throws IOException {
+      this.serverTransport = serverTransport;
+      this.threadChooser = threadChooser;
+      this.acceptSelector = SelectorProvider.provider().openSelector();
+      this.serverTransport.registerSelector(acceptSelector);
+    }
+
+    /**
+     * The work loop. Selects on the server transport and accepts. If there was
+     * a server transport that had blocking accepts, and returned on blocking
+     * client transports, that should be used instead
+     */
+    public void run() {
+      try {
+        while (!stopped_) {
+          select();
+        }
+      } catch (Throwable t) {
+        LOGGER.error("run() exiting due to uncaught error", t);
+      } finally {
+        // This will wake up the selector threads
+        TThreadedSelectorServer.this.stop();
+      }
+    }
+
+    /**
+     * If the selector is blocked, wake it up.
+     */
+    public void wakeupSelector() {
+      acceptSelector.wakeup();
+    }
+
+    /**
+     * Select and process IO events appropriately: If there are connections to
+     * be accepted, accept them.
+     */
+    private void select() {
+      try {
+        // wait for connect events.
+        acceptSelector.select();
+
+        // process the io events we received
+        Iterator<SelectionKey> selectedKeys = acceptSelector.selectedKeys().iterator();
+        while (!stopped_ && selectedKeys.hasNext()) {
+          SelectionKey key = selectedKeys.next();
+          selectedKeys.remove();
+
+          // skip if not valid
+          if (!key.isValid()) {
+            continue;
+          }
+
+          if (key.isAcceptable()) {
+            handleAccept();
+          } else {
+            LOGGER.warn("Unexpected state in select! " + key.interestOps());
+          }
+        }
+      } catch (IOException e) {
+        LOGGER.warn("Got an IOException while selecting!", e);
+      }
+    }
+
+    /**
+     * Accept a new connection.
+     */
+    private void handleAccept() {
+      final TNonblockingTransport client = doAccept();
+      if (client != null) {
+        // Pass this connection to a selector thread
+        final SelectorThread targetThread = threadChooser.nextThread();
+
+        if (args.acceptPolicy == Args.AcceptPolicy.FAST_ACCEPT || invoker == null) {
+          doAddAccept(targetThread, client);
+        } else {
+          // FAIR_ACCEPT
+          try {
+            invoker.submit(new Runnable() {
+              public void run() {
+                doAddAccept(targetThread, client);
+              }
+            });
+          } catch (RejectedExecutionException rx) {
+            LOGGER.warn("ExecutorService rejected accept registration!", rx);
+            // close immediately
+            client.close();
+          }
+        }
+      }
+    }
+
+    private TNonblockingTransport doAccept() {
+      try {
+        return (TNonblockingTransport) serverTransport.accept();
+      } catch (TTransportException tte) {
+        // something went wrong accepting.
+        LOGGER.warn("Exception trying to accept!", tte);
+        return null;
+      }
+    }
+
+    private void doAddAccept(SelectorThread thread, TNonblockingTransport client) {
+      if (!thread.addAcceptedConnection(client)) {
+        client.close();
+      }
+    }
+  } // AcceptThread
+
+  /**
+   * The SelectorThread(s) will be doing all the selecting on accepted active
+   * connections.
+   */
+  protected class SelectorThread extends AbstractSelectThread {
+
+    // Accepted connections added by the accept thread.
+    private final BlockingQueue<TNonblockingTransport> acceptedQueue;
+
+    /**
+     * Set up the SelectorThread with an unbounded queue for incoming accepts.
+     * 
+     * @throws IOException
+     *           if a selector cannot be created
+     */
+    public SelectorThread() throws IOException {
+      this(new LinkedBlockingQueue<TNonblockingTransport>());
+    }
+
+    /**
+     * Set up the SelectorThread with an bounded queue for incoming accepts.
+     * 
+     * @throws IOException
+     *           if a selector cannot be created
+     */
+    public SelectorThread(int maxPendingAccepts) throws IOException {
+      this(createDefaultAcceptQueue(maxPendingAccepts));
+    }
+
+    /**
+     * Set up the SelectorThread with a specified queue for connections.
+     * 
+     * @param acceptedQueue
+     *          The BlockingQueue implementation for holding incoming accepted
+     *          connections.
+     * @throws IOException
+     *           if a selector cannot be created.
+     */
+    public SelectorThread(BlockingQueue<TNonblockingTransport> acceptedQueue) throws IOException {
+      this.acceptedQueue = acceptedQueue;
+    }
+
+    /**
+     * Hands off an accepted connection to be handled by this thread. This
+     * method will block if the queue for new connections is at capacity.
+     * 
+     * @param accepted
+     *          The connection that has been accepted.
+     * @return true if the connection has been successfully added.
+     */
+    public boolean addAcceptedConnection(TNonblockingTransport accepted) {
+      try {
+        acceptedQueue.put(accepted);
+      } catch (InterruptedException e) {
+        LOGGER.warn("Interrupted while adding accepted connection!", e);
+        return false;
+      }
+      selector.wakeup();
+      return true;
+    }
+
+    /**
+     * The work loop. Handles selecting (read/write IO), dispatching, and
+     * managing the selection preferences of all existing connections.
+     */
+    public void run() {
+      try {
+        while (!stopped_) {
+          select();
+          processAcceptedConnections();
+          processInterestChanges();
+        }
+      } catch (Throwable t) {
+        LOGGER.error("run() exiting due to uncaught error", t);
+      } finally {
+        // This will wake up the accept thread and the other selector threads
+        TThreadedSelectorServer.this.stop();
+      }
+    }
+
+    /**
+     * Select and process IO events appropriately: If there are existing
+     * connections with data waiting to be read, read it, buffering until a
+     * whole frame has been read. If there are any pending responses, buffer
+     * them until their target client is available, and then send the data.
+     */
+    private void select() {
+      try {
+        // wait for io events.
+        selector.select();
+
+        // process the io events we received
+        Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
+        while (!stopped_ && selectedKeys.hasNext()) {
+          SelectionKey key = selectedKeys.next();
+          selectedKeys.remove();
+
+          // skip if not valid
+          if (!key.isValid()) {
+            cleanupSelectionKey(key);
+            continue;
+          }
+
+          if (key.isReadable()) {
+            // deal with reads
+            handleRead(key);
+          } else if (key.isWritable()) {
+            // deal with writes
+            handleWrite(key);
+          } else {
+            LOGGER.warn("Unexpected state in select! " + key.interestOps());
+          }
+        }
+      } catch (IOException e) {
+        LOGGER.warn("Got an IOException while selecting!", e);
+      }
+    }
+
+    private void processAcceptedConnections() {
+      // Register accepted connections
+      while (!stopped_) {
+        TNonblockingTransport accepted = acceptedQueue.poll();
+        if (accepted == null) {
+          break;
+        }
+        registerAccepted(accepted);
+      }
+    }
+
+    private void registerAccepted(TNonblockingTransport accepted) {
+      SelectionKey clientKey = null;
+      try {
+        clientKey = accepted.registerSelector(selector, SelectionKey.OP_READ);
+
+        FrameBuffer frameBuffer = new FrameBuffer(accepted, clientKey, SelectorThread.this);
+        clientKey.attach(frameBuffer);
+      } catch (IOException e) {
+        LOGGER.warn("Failed to register accepted connection to selector!", e);
+        if (clientKey != null) {
+          cleanupSelectionKey(clientKey);
+        }
+        accepted.close();
+      }
+    }
+  } // SelectorThread
+
+  /**
+   * Creates a SelectorThreadLoadBalancer to be used by the accept thread for
+   * assigning newly accepted connections across the threads.
+   */
+  protected SelectorThreadLoadBalancer createSelectorThreadLoadBalancer(Collection<? extends SelectorThread> threads) {
+    return new SelectorThreadLoadBalancer(threads);
+  }
+
+  /**
+   * A round robin load balancer for choosing selector threads for new
+   * connections.
+   */
+  protected class SelectorThreadLoadBalancer {
+    private final Collection<? extends SelectorThread> threads;
+    private Iterator<? extends SelectorThread> nextThreadIterator;
+
+    public <T extends SelectorThread> SelectorThreadLoadBalancer(Collection<T> threads) {
+      if (threads.isEmpty()) {
+        throw new IllegalArgumentException("At least one selector thread is required");
+      }
+      this.threads = Collections.unmodifiableList(new ArrayList<T>(threads));
+      nextThreadIterator = this.threads.iterator();
+    }
+
+    public SelectorThread nextThread() {
+      // Choose a selector thread (round robin)
+      if (!nextThreadIterator.hasNext()) {
+        nextThreadIterator = threads.iterator();
+      }
+      return nextThreadIterator.next();
+    }
+  }
+}
diff --git a/lib/java/test/org/apache/thrift/server/TestThreadedSelectorServer.java b/lib/java/test/org/apache/thrift/server/TestThreadedSelectorServer.java
new file mode 100644
index 0000000..ed729a2
--- /dev/null
+++ b/lib/java/test/org/apache/thrift/server/TestThreadedSelectorServer.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.thrift.server;
+
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.server.TThreadedSelectorServer.Args;
+import org.apache.thrift.transport.TNonblockingServerSocket;
+
+public class TestThreadedSelectorServer extends TestNonblockingServer {
+  protected TServer getServer(TProcessor processor, TNonblockingServerSocket socket, TProtocolFactory protoFactory) {
+    return new TThreadedSelectorServer(new Args(socket).processor(processor).protocolFactory(protoFactory));
+  }
+}