THRIFT-1167. java: Java nonblocking server with more than one thread for select and handling IO
This patch refactors the nonblocking server hierarchy and adds in a new server that has a threaded selector pool as well as a threaded invoker pool.
Patch: Steve Jiang
git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1158977 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java b/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java
new file mode 100644
index 0000000..2bd74fa
--- /dev/null
+++ b/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java
@@ -0,0 +1,552 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.thrift.server;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.spi.SelectorProvider;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.thrift.TByteArrayOutputStream;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TProtocol;
+import org.apache.thrift.transport.TFramedTransport;
+import org.apache.thrift.transport.TIOStreamTransport;
+import org.apache.thrift.transport.TMemoryInputTransport;
+import org.apache.thrift.transport.TNonblockingServerTransport;
+import org.apache.thrift.transport.TNonblockingTransport;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Provides common methods and classes used by nonblocking TServer
+ * implementations.
+ */
+public abstract class AbstractNonblockingServer extends TServer {
+ protected final Logger LOGGER = LoggerFactory.getLogger(getClass().getName());
+
+ public static abstract class AbstractNonblockingServerArgs<T extends AbstractNonblockingServerArgs<T>> extends AbstractServerArgs<T> {
+ public long maxReadBufferBytes = Long.MAX_VALUE;
+
+ public AbstractNonblockingServerArgs(TNonblockingServerTransport transport) {
+ super(transport);
+ transportFactory(new TFramedTransport.Factory());
+ }
+ }
+
+ /**
+ * The maximum amount of memory we will allocate to client IO buffers at a
+ * time. Without this limit, the server will gladly allocate client buffers
+ * right into an out of memory exception, rather than waiting.
+ */
+ private final long MAX_READ_BUFFER_BYTES;
+
+ /**
+ * How many bytes are currently allocated to read buffers.
+ */
+ private final AtomicLong readBufferBytesAllocated = new AtomicLong(0);
+
+ public AbstractNonblockingServer(AbstractNonblockingServerArgs args) {
+ super(args);
+ MAX_READ_BUFFER_BYTES = args.maxReadBufferBytes;
+ }
+
+ /**
+ * Begin accepting connections and processing invocations.
+ */
+ public void serve() {
+ // start any IO threads
+ if (!startThreads()) {
+ return;
+ }
+
+ // start listening, or exit
+ if (!startListening()) {
+ return;
+ }
+
+ setServing(true);
+
+ // this will block while we serve
+ waitForShutdown();
+
+ setServing(false);
+
+ // do a little cleanup
+ stopListening();
+ }
+
+ /**
+ * Starts any threads required for serving.
+ *
+ * @return true if everything went ok, false if threads could not be started.
+ */
+ protected abstract boolean startThreads();
+
+ /**
+ * A method that will block until when threads handling the serving have been
+ * shut down.
+ */
+ protected abstract void waitForShutdown();
+
+ /**
+ * Have the server transport start accepting connections.
+ *
+ * @return true if we started listening successfully, false if something went
+ * wrong.
+ */
+ protected boolean startListening() {
+ try {
+ serverTransport_.listen();
+ return true;
+ } catch (TTransportException ttx) {
+ LOGGER.error("Failed to start listening on server socket!", ttx);
+ return false;
+ }
+ }
+
+ /**
+ * Stop listening for connections.
+ */
+ protected void stopListening() {
+ serverTransport_.close();
+ }
+
+ /**
+ * Perform an invocation. This method could behave several different ways -
+ * invoke immediately inline, queue for separate execution, etc.
+ *
+ * @return true if invocation was successfully requested, which is not a
+ * guarantee that invocation has completed. False if the request
+ * failed.
+ */
+ protected abstract boolean requestInvoke(FrameBuffer frameBuffer);
+
+ /**
+ * An abstract thread that handles selecting on a set of transports and
+ * {@link FrameBuffer FrameBuffers} associated with selected keys
+ * corresponding to requests.
+ */
+ protected abstract class AbstractSelectThread extends Thread {
+ protected final Selector selector;
+
+ // List of FrameBuffers that want to change their selection interests.
+ protected final Set<FrameBuffer> selectInterestChanges = new HashSet<FrameBuffer>();
+
+ public AbstractSelectThread() throws IOException {
+ this.selector = SelectorProvider.provider().openSelector();
+ }
+
+ /**
+ * If the selector is blocked, wake it up.
+ */
+ public void wakeupSelector() {
+ selector.wakeup();
+ }
+
+ /**
+ * Add FrameBuffer to the list of select interest changes and wake up the
+ * selector if it's blocked. When the select() call exits, it'll give the
+ * FrameBuffer a chance to change its interests.
+ */
+ public void requestSelectInterestChange(FrameBuffer frameBuffer) {
+ synchronized (selectInterestChanges) {
+ selectInterestChanges.add(frameBuffer);
+ }
+ // wakeup the selector, if it's currently blocked.
+ selector.wakeup();
+ }
+
+ /**
+ * Check to see if there are any FrameBuffers that have switched their
+ * interest type from read to write or vice versa.
+ */
+ protected void processInterestChanges() {
+ synchronized (selectInterestChanges) {
+ for (FrameBuffer fb : selectInterestChanges) {
+ fb.changeSelectInterests();
+ }
+ selectInterestChanges.clear();
+ }
+ }
+
+ /**
+ * Do the work required to read from a readable client. If the frame is
+ * fully read, then invoke the method call.
+ */
+ protected void handleRead(SelectionKey key) {
+ FrameBuffer buffer = (FrameBuffer) key.attachment();
+ if (!buffer.read()) {
+ cleanupSelectionKey(key);
+ return;
+ }
+
+ // if the buffer's frame read is complete, invoke the method.
+ if (buffer.isFrameFullyRead()) {
+ if (!requestInvoke(buffer)) {
+ cleanupSelectionKey(key);
+ }
+ }
+ }
+
+ /**
+ * Let a writable client get written, if there's data to be written.
+ */
+ protected void handleWrite(SelectionKey key) {
+ FrameBuffer buffer = (FrameBuffer) key.attachment();
+ if (!buffer.write()) {
+ cleanupSelectionKey(key);
+ }
+ }
+
+ /**
+ * Do connection-close cleanup on a given SelectionKey.
+ */
+ protected void cleanupSelectionKey(SelectionKey key) {
+ // remove the records from the two maps
+ FrameBuffer buffer = (FrameBuffer) key.attachment();
+ if (buffer != null) {
+ // close the buffer
+ buffer.close();
+ }
+ // cancel the selection key
+ key.cancel();
+ }
+ } // SelectThread
+
+ /**
+ * Possible states for the FrameBuffer state machine.
+ */
+ private enum FrameBufferState {
+ // in the midst of reading the frame size off the wire
+ READING_FRAME_SIZE,
+ // reading the actual frame data now, but not all the way done yet
+ READING_FRAME,
+ // completely read the frame, so an invocation can now happen
+ READ_FRAME_COMPLETE,
+ // waiting to get switched to listening for write events
+ AWAITING_REGISTER_WRITE,
+ // started writing response data, not fully complete yet
+ WRITING,
+ // another thread wants this framebuffer to go back to reading
+ AWAITING_REGISTER_READ,
+ // we want our transport and selection key invalidated in the selector
+ // thread
+ AWAITING_CLOSE
+ }
+
+ /**
+ * Class that implements a sort of state machine around the interaction with a
+ * client and an invoker. It manages reading the frame size and frame data,
+ * getting it handed off as wrapped transports, and then the writing of
+ * response data back to the client. In the process it manages flipping the
+ * read and write bits on the selection key for its client.
+ */
+ protected class FrameBuffer {
+ // the actual transport hooked up to the client.
+ private final TNonblockingTransport trans_;
+
+ // the SelectionKey that corresponds to our transport
+ private final SelectionKey selectionKey_;
+
+ // the SelectThread that owns the registration of our transport
+ private final AbstractSelectThread selectThread_;
+
+ // where in the process of reading/writing are we?
+ private FrameBufferState state_ = FrameBufferState.READING_FRAME_SIZE;
+
+ // the ByteBuffer we'll be using to write and read, depending on the state
+ private ByteBuffer buffer_;
+
+ private TByteArrayOutputStream response_;
+
+ public FrameBuffer(final TNonblockingTransport trans,
+ final SelectionKey selectionKey,
+ final AbstractSelectThread selectThread) {
+ trans_ = trans;
+ selectionKey_ = selectionKey;
+ selectThread_ = selectThread;
+ buffer_ = ByteBuffer.allocate(4);
+ }
+
+ /**
+ * Give this FrameBuffer a chance to read. The selector loop should have
+ * received a read event for this FrameBuffer.
+ *
+ * @return true if the connection should live on, false if it should be
+ * closed
+ */
+ public boolean read() {
+ if (state_ == FrameBufferState.READING_FRAME_SIZE) {
+ // try to read the frame size completely
+ if (!internalRead()) {
+ return false;
+ }
+
+ // if the frame size has been read completely, then prepare to read the
+ // actual frame.
+ if (buffer_.remaining() == 0) {
+ // pull out the frame size as an integer.
+ int frameSize = buffer_.getInt(0);
+ if (frameSize <= 0) {
+ LOGGER.error("Read an invalid frame size of " + frameSize
+ + ". Are you using TFramedTransport on the client side?");
+ return false;
+ }
+
+ // if this frame will always be too large for this server, log the
+ // error and close the connection.
+ if (frameSize > MAX_READ_BUFFER_BYTES) {
+ LOGGER.error("Read a frame size of " + frameSize
+ + ", which is bigger than the maximum allowable buffer size for ALL connections.");
+ return false;
+ }
+
+ // if this frame will push us over the memory limit, then return.
+ // with luck, more memory will free up the next time around.
+ if (readBufferBytesAllocated.get() + frameSize > MAX_READ_BUFFER_BYTES) {
+ return true;
+ }
+
+ // increment the amount of memory allocated to read buffers
+ readBufferBytesAllocated.addAndGet(frameSize);
+
+ // reallocate the readbuffer as a frame-sized buffer
+ buffer_ = ByteBuffer.allocate(frameSize);
+
+ state_ = FrameBufferState.READING_FRAME;
+ } else {
+ // this skips the check of READING_FRAME state below, since we can't
+ // possibly go on to that state if there's data left to be read at
+ // this one.
+ return true;
+ }
+ }
+
+ // it is possible to fall through from the READING_FRAME_SIZE section
+ // to READING_FRAME if there's already some frame data available once
+ // READING_FRAME_SIZE is complete.
+
+ if (state_ == FrameBufferState.READING_FRAME) {
+ if (!internalRead()) {
+ return false;
+ }
+
+ // since we're already in the select loop here for sure, we can just
+ // modify our selection key directly.
+ if (buffer_.remaining() == 0) {
+ // get rid of the read select interests
+ selectionKey_.interestOps(0);
+ state_ = FrameBufferState.READ_FRAME_COMPLETE;
+ }
+
+ return true;
+ }
+
+ // if we fall through to this point, then the state must be invalid.
+ LOGGER.error("Read was called but state is invalid (" + state_ + ")");
+ return false;
+ }
+
+ /**
+ * Give this FrameBuffer a chance to write its output to the final client.
+ */
+ public boolean write() {
+ if (state_ == FrameBufferState.WRITING) {
+ try {
+ if (trans_.write(buffer_) < 0) {
+ return false;
+ }
+ } catch (IOException e) {
+ LOGGER.warn("Got an IOException during write!", e);
+ return false;
+ }
+
+ // we're done writing. now we need to switch back to reading.
+ if (buffer_.remaining() == 0) {
+ prepareRead();
+ }
+ return true;
+ }
+
+ LOGGER.error("Write was called, but state is invalid (" + state_ + ")");
+ return false;
+ }
+
+ /**
+ * Give this FrameBuffer a chance to set its interest to write, once data
+ * has come in.
+ */
+ public void changeSelectInterests() {
+ if (state_ == FrameBufferState.AWAITING_REGISTER_WRITE) {
+ // set the OP_WRITE interest
+ selectionKey_.interestOps(SelectionKey.OP_WRITE);
+ state_ = FrameBufferState.WRITING;
+ } else if (state_ == FrameBufferState.AWAITING_REGISTER_READ) {
+ prepareRead();
+ } else if (state_ == FrameBufferState.AWAITING_CLOSE) {
+ close();
+ selectionKey_.cancel();
+ } else {
+ LOGGER.error("changeSelectInterest was called, but state is invalid (" + state_ + ")");
+ }
+ }
+
+ /**
+ * Shut the connection down.
+ */
+ public void close() {
+ // if we're being closed due to an error, we might have allocated a
+ // buffer that we need to subtract for our memory accounting.
+ if (state_ == FrameBufferState.READING_FRAME || state_ == FrameBufferState.READ_FRAME_COMPLETE) {
+ readBufferBytesAllocated.addAndGet(-buffer_.array().length);
+ }
+ trans_.close();
+ }
+
+ /**
+ * Check if this FrameBuffer has a full frame read.
+ */
+ public boolean isFrameFullyRead() {
+ return state_ == FrameBufferState.READ_FRAME_COMPLETE;
+ }
+
+ /**
+ * After the processor has processed the invocation, whatever thread is
+ * managing invocations should call this method on this FrameBuffer so we
+ * know it's time to start trying to write again. Also, if it turns out that
+ * there actually isn't any data in the response buffer, we'll skip trying
+ * to write and instead go back to reading.
+ */
+ public void responseReady() {
+ // the read buffer is definitely no longer in use, so we will decrement
+ // our read buffer count. we do this here as well as in close because
+ // we'd like to free this read memory up as quickly as possible for other
+ // clients.
+ readBufferBytesAllocated.addAndGet(-buffer_.array().length);
+
+ if (response_.len() == 0) {
+ // go straight to reading again. this was probably an oneway method
+ state_ = FrameBufferState.AWAITING_REGISTER_READ;
+ buffer_ = null;
+ } else {
+ buffer_ = ByteBuffer.wrap(response_.get(), 0, response_.len());
+
+ // set state that we're waiting to be switched to write. we do this
+ // asynchronously through requestSelectInterestChange() because there is
+ // a possibility that we're not in the main thread, and thus currently
+ // blocked in select(). (this functionality is in place for the sake of
+ // the HsHa server.)
+ state_ = FrameBufferState.AWAITING_REGISTER_WRITE;
+ }
+ requestSelectInterestChange();
+ }
+
+ /**
+ * Actually invoke the method signified by this FrameBuffer.
+ */
+ public void invoke() {
+ TTransport inTrans = getInputTransport();
+ TProtocol inProt = inputProtocolFactory_.getProtocol(inTrans);
+ TProtocol outProt = outputProtocolFactory_.getProtocol(getOutputTransport());
+
+ try {
+ processorFactory_.getProcessor(inTrans).process(inProt, outProt);
+ responseReady();
+ return;
+ } catch (TException te) {
+ LOGGER.warn("Exception while invoking!", te);
+ } catch (Exception e) {
+ LOGGER.error("Unexpected exception while invoking!", e);
+ }
+ // This will only be reached when there is an exception.
+ state_ = FrameBufferState.AWAITING_CLOSE;
+ requestSelectInterestChange();
+ }
+
+ /**
+ * Wrap the read buffer in a memory-based transport so a processor can read
+ * the data it needs to handle an invocation.
+ */
+ private TTransport getInputTransport() {
+ return new TMemoryInputTransport(buffer_.array());
+ }
+
+ /**
+ * Get the transport that should be used by the invoker for responding.
+ */
+ private TTransport getOutputTransport() {
+ response_ = new TByteArrayOutputStream();
+ return outputTransportFactory_.getTransport(new TIOStreamTransport(response_));
+ }
+
+ /**
+ * Perform a read into buffer.
+ *
+ * @return true if the read succeeded, false if there was an error or the
+ * connection closed.
+ */
+ private boolean internalRead() {
+ try {
+ if (trans_.read(buffer_) < 0) {
+ return false;
+ }
+ return true;
+ } catch (IOException e) {
+ LOGGER.warn("Got an IOException in internalRead!", e);
+ return false;
+ }
+ }
+
+ /**
+ * We're done writing, so reset our interest ops and change state
+ * accordingly.
+ */
+ private void prepareRead() {
+ // we can set our interest directly without using the queue because
+ // we're in the select thread.
+ selectionKey_.interestOps(SelectionKey.OP_READ);
+ // get ready for another go-around
+ buffer_ = ByteBuffer.allocate(4);
+ state_ = FrameBufferState.READING_FRAME_SIZE;
+ }
+
+ /**
+ * When this FrameBuffer needs to change its select interests and execution
+ * might not be in its select thread, then this method will make sure the
+ * interest change gets done when the select thread wakes back up. When the
+ * current thread is this FrameBuffer's select thread, then it just does the
+ * interest change immediately.
+ */
+ private void requestSelectInterestChange() {
+ if (Thread.currentThread() == this.selectThread_) {
+ changeSelectInterests();
+ } else {
+ this.selectThread_.requestSelectInterestChange(this);
+ }
+ }
+ } // FrameBuffer
+}
diff --git a/lib/java/src/org/apache/thrift/server/Invocation.java b/lib/java/src/org/apache/thrift/server/Invocation.java
new file mode 100644
index 0000000..e8210f4
--- /dev/null
+++ b/lib/java/src/org/apache/thrift/server/Invocation.java
@@ -0,0 +1,20 @@
+package org.apache.thrift.server;
+
+import org.apache.thrift.server.AbstractNonblockingServer.FrameBuffer;
+
+/**
+ * An Invocation represents a method call that is prepared to execute, given
+ * an idle worker thread. It contains the input and output protocols the
+ * thread's processor should use to perform the usual Thrift invocation.
+ */
+class Invocation implements Runnable {
+ private final FrameBuffer frameBuffer;
+
+ public Invocation(final FrameBuffer frameBuffer) {
+ this.frameBuffer = frameBuffer;
+ }
+
+ public void run() {
+ frameBuffer.invoke();
+ }
+}
\ No newline at end of file
diff --git a/lib/java/src/org/apache/thrift/server/THsHaServer.java b/lib/java/src/org/apache/thrift/server/THsHaServer.java
index f3dfd0a..3541154 100644
--- a/lib/java/src/org/apache/thrift/server/THsHaServer.java
+++ b/lib/java/src/org/apache/thrift/server/THsHaServer.java
@@ -27,16 +27,12 @@
import java.util.concurrent.TimeUnit;
import org.apache.thrift.transport.TNonblockingServerTransport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* An extension of the TNonblockingServer to a Half-Sync/Half-Async server.
* Like TNonblockingServer, it relies on the use of TFramedTransport.
*/
public class THsHaServer extends TNonblockingServer {
- private static final Logger LOGGER =
- LoggerFactory.getLogger(THsHaServer.class.getName());
public static class Args extends AbstractNonblockingServerArgs<Args> {
private int workerThreads = 5;
@@ -85,46 +81,30 @@
}
}
+
// This wraps all the functionality of queueing and thread pool management
// for the passing of Invocations from the Selector to workers.
- private ExecutorService invoker;
+ private final ExecutorService invoker;
+
+ private final Args args;
/**
- * Create server with every option fully specified, and with an injected
- * ExecutorService
+ * Create the server with the specified Args configuration
*/
public THsHaServer(Args args) {
super(args);
invoker = args.executorService == null ? createInvokerPool(args) : args.executorService;
+ this.args = args;
}
- /** @inheritDoc */
+ /**
+ * @inheritDoc
+ */
@Override
- public void serve() {
- // start listening, or exit
- if (!startListening()) {
- return;
- }
-
- // start the selector, or exit
- if (!startSelectorThread()) {
- return;
- }
-
- setServing(true);
-
- // this will block while we serve
+ protected void waitForShutdown() {
joinSelector();
-
gracefullyShutdownInvokerPool();
-
- setServing(false);
-
- // do a little cleanup
- stopListening();
-
- // ungracefully shut down the invoker pool?
}
/**
@@ -136,12 +116,13 @@
TimeUnit stopTimeoutUnit = options.stopTimeoutUnit;
LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
- ExecutorService invoker = new ThreadPoolExecutor(workerThreads, workerThreads,
- stopTimeoutVal, stopTimeoutUnit, queue);
+ ExecutorService invoker = new ThreadPoolExecutor(workerThreads,
+ workerThreads, stopTimeoutVal, stopTimeoutUnit, queue);
return invoker;
}
+
protected void gracefullyShutdownInvokerPool() {
// try to gracefully shut down the executor service
invoker.shutdown();
@@ -150,7 +131,7 @@
// exception. If we don't do this, then we'll shut down prematurely. We want
// to let the executorService clear it's task queue, closing client sockets
// appropriately.
- long timeoutMS = 10000;
+ long timeoutMS = args.stopTimeoutUnit.toMillis(args.stopTimeoutVal);
long now = System.currentTimeMillis();
while (timeoutMS >= 0) {
try {
@@ -166,7 +147,8 @@
/**
* We override the standard invoke method here to queue the invocation for
- * invoker service instead of immediately invoking. The thread pool takes care of the rest.
+ * invoker service instead of immediately invoking. The thread pool takes care
+ * of the rest.
*/
@Override
protected boolean requestInvoke(FrameBuffer frameBuffer) {
@@ -181,24 +163,6 @@
}
protected Runnable getRunnable(FrameBuffer frameBuffer){
- return new Invocation(frameBuffer);
- }
-
- /**
- * An Invocation represents a method call that is prepared to execute, given
- * an idle worker thread. It contains the input and output protocols the
- * thread's processor should use to perform the usual Thrift invocation.
- */
- private class Invocation implements Runnable {
-
- private final FrameBuffer frameBuffer;
-
- public Invocation(final FrameBuffer frameBuffer) {
- this.frameBuffer = frameBuffer;
- }
-
- public void run() {
- frameBuffer.invoke();
- }
+ return new Invocation(frameBuffer);
}
}
diff --git a/lib/java/src/org/apache/thrift/server/TNonblockingServer.java b/lib/java/src/org/apache/thrift/server/TNonblockingServer.java
index d44d460..7afd4b3 100644
--- a/lib/java/src/org/apache/thrift/server/TNonblockingServer.java
+++ b/lib/java/src/org/apache/thrift/server/TNonblockingServer.java
@@ -21,27 +21,12 @@
package org.apache.thrift.server;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.spi.SelectorProvider;
-import java.util.HashSet;
import java.util.Iterator;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
-import org.apache.thrift.TByteArrayOutputStream;
-import org.apache.thrift.TException;
-import org.apache.thrift.protocol.TProtocol;
-import org.apache.thrift.transport.TFramedTransport;
-import org.apache.thrift.transport.TIOStreamTransport;
-import org.apache.thrift.transport.TMemoryInputTransport;
import org.apache.thrift.transport.TNonblockingServerTransport;
import org.apache.thrift.transport.TNonblockingTransport;
-import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* A nonblocking TServer implementation. This allows for fairness amongst all
@@ -54,9 +39,7 @@
* transport, otherwise this server will be unable to determine when a whole
* method call has been read off the wire. Clients must also use TFramedTransport.
*/
-public class TNonblockingServer extends TServer {
- private static final Logger LOGGER =
- LoggerFactory.getLogger(TNonblockingServer.class.getName());
+public class TNonblockingServer extends AbstractNonblockingServer {
public static class Args extends AbstractNonblockingServerArgs<Args> {
public Args(TNonblockingServerTransport transport) {
@@ -64,97 +47,29 @@
}
}
- public static abstract class AbstractNonblockingServerArgs<T extends AbstractNonblockingServerArgs<T>> extends AbstractServerArgs<T> {
- public long maxReadBufferBytes = Long.MAX_VALUE;
-
- public AbstractNonblockingServerArgs(TNonblockingServerTransport transport) {
- super(transport);
- transportFactory(new TFramedTransport.Factory());
- }
- }
-
// Flag for stopping the server
private volatile boolean stopped_ = true;
- private SelectThread selectThread_;
-
- /**
- * The maximum amount of memory we will allocate to client IO buffers at a
- * time. Without this limit, the server will gladly allocate client buffers
- * right into an out of memory exception, rather than waiting.
- */
- private final long MAX_READ_BUFFER_BYTES;
-
- /**
- * How many bytes are currently allocated to read buffers.
- */
- private final AtomicLong readBufferBytesAllocated = new AtomicLong(0);
+ private SelectAcceptThread selectAcceptThread_;
public TNonblockingServer(AbstractNonblockingServerArgs args) {
super(args);
- MAX_READ_BUFFER_BYTES = args.maxReadBufferBytes;
}
- /**
- * Begin accepting connections and processing invocations.
- */
- public void serve() {
- // start listening, or exit
- if (!startListening()) {
- return;
- }
-
- // start the selector, or exit
- if (!startSelectorThread()) {
- return;
- }
-
- setServing(true);
-
- // this will block while we serve
- joinSelector();
-
- setServing(false);
-
- // do a little cleanup
- stopListening();
- }
/**
- * Have the server transport start accepting connections.
- *
- * @return true if we started listening successfully, false if something went
- * wrong.
- */
- protected boolean startListening() {
- try {
- serverTransport_.listen();
- return true;
- } catch (TTransportException ttx) {
- LOGGER.error("Failed to start listening on server socket!", ttx);
- return false;
- }
- }
-
- /**
- * Stop listening for connections.
- */
- protected void stopListening() {
- serverTransport_.close();
- }
-
- /**
- * Start the selector thread running to deal with clients.
+ * Start the selector thread to deal with accepts and client messages.
*
* @return true if everything went ok, false if we couldn't start for some
* reason.
*/
- protected boolean startSelectorThread() {
+ @Override
+ protected boolean startThreads() {
// start the selector
try {
- selectThread_ = new SelectThread((TNonblockingServerTransport)serverTransport_);
+ selectAcceptThread_ = new SelectAcceptThread((TNonblockingServerTransport)serverTransport_);
stopped_ = false;
- selectThread_.start();
+ selectAcceptThread_.start();
return true;
} catch (IOException e) {
LOGGER.error("Failed to start selector thread!", e);
@@ -162,13 +77,18 @@
}
}
+ @Override
+ protected void waitForShutdown() {
+ joinSelector();
+ }
+
/**
- * Block until the selector exits.
+ * Block until the selector thread exits.
*/
protected void joinSelector() {
// wait until the selector thread exits
try {
- selectThread_.join();
+ selectAcceptThread_.join();
} catch (InterruptedException e) {
// for now, just silently ignore. technically this means we'll have less of
// a graceful shutdown as a result.
@@ -178,10 +98,11 @@
/**
* Stop serving and shut everything down.
*/
+ @Override
public void stop() {
stopped_ = true;
- if (selectThread_ != null) {
- selectThread_.wakeupSelector();
+ if (selectAcceptThread_ != null) {
+ selectAcceptThread_.wakeupSelector();
}
}
@@ -189,43 +110,33 @@
* Perform an invocation. This method could behave several different ways
* - invoke immediately inline, queue for separate execution, etc.
*/
+ @Override
protected boolean requestInvoke(FrameBuffer frameBuffer) {
frameBuffer.invoke();
return true;
}
- /**
- * A FrameBuffer wants to change its selection preferences, but might not be
- * in the select thread.
- */
- protected void requestSelectInterestChange(FrameBuffer frameBuffer) {
- selectThread_.requestSelectInterestChange(frameBuffer);
- }
public boolean isStopped() {
- return selectThread_.isStopped();
+ return selectAcceptThread_.isStopped();
}
/**
* The thread that will be doing all the selecting, managing new connections
* and those that still need to be read.
*/
- protected class SelectThread extends Thread {
+ protected class SelectAcceptThread extends AbstractSelectThread {
+ // The server transport on which new client transports will be accepted
private final TNonblockingServerTransport serverTransport;
- private final Selector selector;
-
- // List of FrameBuffers that want to change their selection interests.
- private final Set<FrameBuffer> selectInterestChanges =
- new HashSet<FrameBuffer>();
/**
- * Set up the SelectorThread.
+ * Set up the thread that will handle the non-blocking accepts, reads, and
+ * writes.
*/
- public SelectThread(final TNonblockingServerTransport serverTransport)
+ public SelectAcceptThread(final TNonblockingServerTransport serverTransport)
throws IOException {
this.serverTransport = serverTransport;
- this.selector = SelectorProvider.provider().openSelector();
serverTransport.registerSelector(selector);
}
@@ -251,26 +162,6 @@
}
/**
- * If the selector is blocked, wake it up.
- */
- public void wakeupSelector() {
- selector.wakeup();
- }
-
- /**
- * Add FrameBuffer to the list of select interest changes and wake up the
- * selector if it's blocked. When the select() call exits, it'll give the
- * FrameBuffer a chance to change its interests.
- */
- public void requestSelectInterestChange(FrameBuffer frameBuffer) {
- synchronized (selectInterestChanges) {
- selectInterestChanges.add(frameBuffer);
- }
- // wakeup the selector, if it's currently blocked.
- selector.wakeup();
- }
-
- /**
* Select and process IO events appropriately:
* If there are connections to be accepted, accept them.
* If there are existing connections with data waiting to be read, read it,
@@ -291,7 +182,7 @@
// skip if not valid
if (!key.isValid()) {
- cleanupSelectionkey(key);
+ cleanupSelectionKey(key);
continue;
}
@@ -315,19 +206,6 @@
}
/**
- * Check to see if there are any FrameBuffers that have switched their
- * interest type from read to write or vice versa.
- */
- private void processInterestChanges() {
- synchronized (selectInterestChanges) {
- for (FrameBuffer fb : selectInterestChanges) {
- fb.changeSelectInterests();
- }
- selectInterestChanges.clear();
- }
- }
-
- /**
* Accept a new connection.
*/
private void handleAccept() throws IOException {
@@ -339,368 +217,16 @@
clientKey = client.registerSelector(selector, SelectionKey.OP_READ);
// add this key to the map
- FrameBuffer frameBuffer = new FrameBuffer(client, clientKey);
+ FrameBuffer frameBuffer = new FrameBuffer(client, clientKey,
+ SelectAcceptThread.this);
clientKey.attach(frameBuffer);
} catch (TTransportException tte) {
// something went wrong accepting.
LOGGER.warn("Exception trying to accept!", tte);
tte.printStackTrace();
- if (clientKey != null) cleanupSelectionkey(clientKey);
+ if (clientKey != null) cleanupSelectionKey(clientKey);
if (client != null) client.close();
}
}
-
- /**
- * Do the work required to read from a readable client. If the frame is
- * fully read, then invoke the method call.
- */
- private void handleRead(SelectionKey key) {
- FrameBuffer buffer = (FrameBuffer)key.attachment();
- if (!buffer.read()) {
- cleanupSelectionkey(key);
- return;
- }
-
- // if the buffer's frame read is complete, invoke the method.
- if (buffer.isFrameFullyRead()) {
- if (!requestInvoke(buffer)) {
- cleanupSelectionkey(key);
- }
- }
- }
-
- /**
- * Let a writable client get written, if there's data to be written.
- */
- private void handleWrite(SelectionKey key) {
- FrameBuffer buffer = (FrameBuffer)key.attachment();
- if (!buffer.write()) {
- cleanupSelectionkey(key);
- }
- }
-
- /**
- * Do connection-close cleanup on a given SelectionKey.
- */
- private void cleanupSelectionkey(SelectionKey key) {
- // remove the records from the two maps
- FrameBuffer buffer = (FrameBuffer)key.attachment();
- if (buffer != null) {
- // close the buffer
- buffer.close();
- }
- // cancel the selection key
- key.cancel();
- }
- } // SelectorThread
-
- /**
- * Class that implements a sort of state machine around the interaction with
- * a client and an invoker. It manages reading the frame size and frame data,
- * getting it handed off as wrapped transports, and then the writing of
- * response data back to the client. In the process it manages flipping the
- * read and write bits on the selection key for its client.
- */
- protected class FrameBuffer {
- //
- // Possible states for the FrameBuffer state machine.
- //
- // in the midst of reading the frame size off the wire
- private static final int READING_FRAME_SIZE = 1;
- // reading the actual frame data now, but not all the way done yet
- private static final int READING_FRAME = 2;
- // completely read the frame, so an invocation can now happen
- private static final int READ_FRAME_COMPLETE = 3;
- // waiting to get switched to listening for write events
- private static final int AWAITING_REGISTER_WRITE = 4;
- // started writing response data, not fully complete yet
- private static final int WRITING = 6;
- // another thread wants this framebuffer to go back to reading
- private static final int AWAITING_REGISTER_READ = 7;
- // we want our transport and selection key invalidated in the selector thread
- private static final int AWAITING_CLOSE = 8;
-
- //
- // Instance variables
- //
-
- // the actual transport hooked up to the client.
- public final TNonblockingTransport trans_;
-
- // the SelectionKey that corresponds to our transport
- private final SelectionKey selectionKey_;
-
- // where in the process of reading/writing are we?
- private int state_ = READING_FRAME_SIZE;
-
- // the ByteBuffer we'll be using to write and read, depending on the state
- private ByteBuffer buffer_;
-
- private TByteArrayOutputStream response_;
-
- public FrameBuffer( final TNonblockingTransport trans,
- final SelectionKey selectionKey) {
- trans_ = trans;
- selectionKey_ = selectionKey;
- buffer_ = ByteBuffer.allocate(4);
- }
-
- /**
- * Give this FrameBuffer a chance to read. The selector loop should have
- * received a read event for this FrameBuffer.
- *
- * @return true if the connection should live on, false if it should be
- * closed
- */
- public boolean read() {
- if (state_ == READING_FRAME_SIZE) {
- // try to read the frame size completely
- if (!internalRead()) {
- return false;
- }
-
- // if the frame size has been read completely, then prepare to read the
- // actual frame.
- if (buffer_.remaining() == 0) {
- // pull out the frame size as an integer.
- int frameSize = buffer_.getInt(0);
- if (frameSize <= 0) {
- LOGGER.error("Read an invalid frame size of " + frameSize
- + ". Are you using TFramedTransport on the client side?");
- return false;
- }
-
- // if this frame will always be too large for this server, log the
- // error and close the connection.
- if (frameSize > MAX_READ_BUFFER_BYTES) {
- LOGGER.error("Read a frame size of " + frameSize
- + ", which is bigger than the maximum allowable buffer size for ALL connections.");
- return false;
- }
-
- // if this frame will push us over the memory limit, then return.
- // with luck, more memory will free up the next time around.
- if (readBufferBytesAllocated.get() + frameSize > MAX_READ_BUFFER_BYTES) {
- return true;
- }
-
- // increment the amount of memory allocated to read buffers
- readBufferBytesAllocated.addAndGet(frameSize);
-
- // reallocate the readbuffer as a frame-sized buffer
- buffer_ = ByteBuffer.allocate(frameSize);
-
- state_ = READING_FRAME;
- } else {
- // this skips the check of READING_FRAME state below, since we can't
- // possibly go on to that state if there's data left to be read at
- // this one.
- return true;
- }
- }
-
- // it is possible to fall through from the READING_FRAME_SIZE section
- // to READING_FRAME if there's already some frame data available once
- // READING_FRAME_SIZE is complete.
-
- if (state_ == READING_FRAME) {
- if (!internalRead()) {
- return false;
- }
-
- // since we're already in the select loop here for sure, we can just
- // modify our selection key directly.
- if (buffer_.remaining() == 0) {
- // get rid of the read select interests
- selectionKey_.interestOps(0);
- state_ = READ_FRAME_COMPLETE;
- }
-
- return true;
- }
-
- // if we fall through to this point, then the state must be invalid.
- LOGGER.error("Read was called but state is invalid (" + state_ + ")");
- return false;
- }
-
- /**
- * Give this FrameBuffer a chance to write its output to the final client.
- */
- public boolean write() {
- if (state_ == WRITING) {
- try {
- if (trans_.write(buffer_) < 0) {
- return false;
- }
- } catch (IOException e) {
- LOGGER.warn("Got an IOException during write!", e);
- return false;
- }
-
- // we're done writing. now we need to switch back to reading.
- if (buffer_.remaining() == 0) {
- prepareRead();
- }
- return true;
- }
-
- LOGGER.error("Write was called, but state is invalid (" + state_ + ")");
- return false;
- }
-
- /**
- * Give this FrameBuffer a chance to set its interest to write, once data
- * has come in.
- */
- public void changeSelectInterests() {
- if (state_ == AWAITING_REGISTER_WRITE) {
- // set the OP_WRITE interest
- selectionKey_.interestOps(SelectionKey.OP_WRITE);
- state_ = WRITING;
- } else if (state_ == AWAITING_REGISTER_READ) {
- prepareRead();
- } else if (state_ == AWAITING_CLOSE){
- close();
- selectionKey_.cancel();
- } else {
- LOGGER.error(
- "changeSelectInterest was called, but state is invalid ("
- + state_ + ")");
- }
- }
-
- /**
- * Shut the connection down.
- */
- public void close() {
- // if we're being closed due to an error, we might have allocated a
- // buffer that we need to subtract for our memory accounting.
- if (state_ == READING_FRAME || state_ == READ_FRAME_COMPLETE) {
- readBufferBytesAllocated.addAndGet(-buffer_.array().length);
- }
- trans_.close();
- }
-
- /**
- * Check if this FrameBuffer has a full frame read.
- */
- public boolean isFrameFullyRead() {
- return state_ == READ_FRAME_COMPLETE;
- }
-
- /**
- * After the processor has processed the invocation, whatever thread is
- * managing invocations should call this method on this FrameBuffer so we
- * know it's time to start trying to write again. Also, if it turns out
- * that there actually isn't any data in the response buffer, we'll skip
- * trying to write and instead go back to reading.
- */
- public void responseReady() {
- // the read buffer is definitely no longer in use, so we will decrement
- // our read buffer count. we do this here as well as in close because
- // we'd like to free this read memory up as quickly as possible for other
- // clients.
- readBufferBytesAllocated.addAndGet(-buffer_.array().length);
-
- if (response_.len() == 0) {
- // go straight to reading again. this was probably an oneway method
- state_ = AWAITING_REGISTER_READ;
- buffer_ = null;
- } else {
- buffer_ = ByteBuffer.wrap(response_.get(), 0, response_.len());
-
- // set state that we're waiting to be switched to write. we do this
- // asynchronously through requestSelectInterestChange() because there is a
- // possibility that we're not in the main thread, and thus currently
- // blocked in select(). (this functionality is in place for the sake of
- // the HsHa server.)
- state_ = AWAITING_REGISTER_WRITE;
- }
- requestSelectInterestChange();
- }
-
- /**
- * Actually invoke the method signified by this FrameBuffer.
- */
- public void invoke() {
- TTransport inTrans = getInputTransport();
- TProtocol inProt = inputProtocolFactory_.getProtocol(inTrans);
- TProtocol outProt = outputProtocolFactory_.getProtocol(getOutputTransport());
-
- try {
- processorFactory_.getProcessor(inTrans).process(inProt, outProt);
- responseReady();
- return;
- } catch (TException te) {
- LOGGER.warn("Exception while invoking!", te);
- } catch (Exception e) {
- LOGGER.error("Unexpected exception while invoking!", e);
- }
- // This will only be reached when there is an exception.
- state_ = AWAITING_CLOSE;
- requestSelectInterestChange();
- }
-
- /**
- * Wrap the read buffer in a memory-based transport so a processor can read
- * the data it needs to handle an invocation.
- */
- private TTransport getInputTransport() {
- return new TMemoryInputTransport(buffer_.array());
- }
-
- /**
- * Get the transport that should be used by the invoker for responding.
- */
- private TTransport getOutputTransport() {
- response_ = new TByteArrayOutputStream();
- return outputTransportFactory_.getTransport(new TIOStreamTransport(response_));
- }
-
- /**
- * Perform a read into buffer.
- *
- * @return true if the read succeeded, false if there was an error or the
- * connection closed.
- */
- private boolean internalRead() {
- try {
- if (trans_.read(buffer_) < 0) {
- return false;
- }
- return true;
- } catch (IOException e) {
- LOGGER.warn("Got an IOException in internalRead!", e);
- return false;
- }
- }
-
- /**
- * We're done writing, so reset our interest ops and change state accordingly.
- */
- private void prepareRead() {
- // we can set our interest directly without using the queue because
- // we're in the select thread.
- selectionKey_.interestOps(SelectionKey.OP_READ);
- // get ready for another go-around
- buffer_ = ByteBuffer.allocate(4);
- state_ = READING_FRAME_SIZE;
- }
-
- /**
- * When this FrameBuffer needs to change it's select interests and execution
- * might not be in the select thread, then this method will make sure the
- * interest change gets done when the select thread wakes back up. When the
- * current thread is the select thread, then it just does the interest change
- * immediately.
- */
- private void requestSelectInterestChange() {
- if (Thread.currentThread() == selectThread_) {
- changeSelectInterests();
- } else {
- TNonblockingServer.this.requestSelectInterestChange(this);
- }
- }
- } // FrameBuffer
+ } // SelectAcceptThread
}
diff --git a/lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java b/lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java
new file mode 100644
index 0000000..4cf5f1b
--- /dev/null
+++ b/lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java
@@ -0,0 +1,646 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.thrift.server;
+
+import java.io.IOException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.spi.SelectorProvider;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.thrift.transport.TNonblockingServerTransport;
+import org.apache.thrift.transport.TNonblockingTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Half-Sync/Half-Async server with a separate pool of threads to handle
+ * non-blocking I/O. Accepts are handled on a single thread, and a configurable
+ * number of nonblocking selector threads manage reading and writing of client
+ * connections. A synchronous worker thread pool handles processing of requests.
+ *
+ * Performs better than TNonblockingServer/THsHaServer in multi-core
+ * environments when the the bottleneck is CPU on the single selector thread
+ * handling I/O. In addition, because the accept handling is decoupled from
+ * reads/writes and invocation, the server has better ability to handle back-
+ * pressure from new connections (e.g. stop accepting when busy).
+ *
+ * Like TNonblockingServer, it relies on the use of TFramedTransport.
+ */
+public class TThreadedSelectorServer extends AbstractNonblockingServer {
+ private static final Logger LOGGER = LoggerFactory.getLogger(TThreadedSelectorServer.class.getName());
+
+ public static class Args extends AbstractNonblockingServerArgs<Args> {
+
+ /** The number of threads for selecting on already-accepted connections */
+ public int selectorThreads = 2;
+ /**
+ * The size of the executor service (if none is specified) that will handle
+ * invocations. This may be set to 0, in which case invocations will be
+ * handled directly on the selector threads (as is in TNonblockingServer)
+ */
+ private int workerThreads = 5;
+ /** Time to wait for server to stop gracefully */
+ private int stopTimeoutVal = 60;
+ private TimeUnit stopTimeoutUnit = TimeUnit.SECONDS;
+ /** The ExecutorService for handling dispatched requests */
+ private ExecutorService executorService = null;
+ /**
+ * The size of the blocking queue per selector thread for passing accepted
+ * connections to the selector thread
+ */
+ private int acceptQueueSizePerThread = 4;
+
+ /**
+ * Determines the strategy for handling new accepted connections.
+ */
+ public static enum AcceptPolicy {
+ /**
+ * Require accepted connection registration to be handled by the executor.
+ * If the worker pool is saturated, further accepts will be closed
+ * immediately. Slightly increases latency due to an extra scheduling.
+ */
+ FAIR_ACCEPT,
+ /**
+ * Handle the accepts as fast as possible, disregarding the status of the
+ * executor service.
+ */
+ FAST_ACCEPT
+ }
+
+ private AcceptPolicy acceptPolicy = AcceptPolicy.FAST_ACCEPT;
+
+ public Args(TNonblockingServerTransport transport) {
+ super(transport);
+ }
+
+ public Args selectorThreads(int i) {
+ selectorThreads = i;
+ return this;
+ }
+
+ public int getSelectorThreads() {
+ return selectorThreads;
+ }
+
+ public Args workerThreads(int i) {
+ workerThreads = i;
+ return this;
+ }
+
+ public int getWorkerThreads() {
+ return workerThreads;
+ }
+
+ public int getStopTimeoutVal() {
+ return stopTimeoutVal;
+ }
+
+ public Args stopTimeoutVal(int stopTimeoutVal) {
+ this.stopTimeoutVal = stopTimeoutVal;
+ return this;
+ }
+
+ public TimeUnit getStopTimeoutUnit() {
+ return stopTimeoutUnit;
+ }
+
+ public Args stopTimeoutUnit(TimeUnit stopTimeoutUnit) {
+ this.stopTimeoutUnit = stopTimeoutUnit;
+ return this;
+ }
+
+ public ExecutorService getExecutorService() {
+ return executorService;
+ }
+
+ public Args executorService(ExecutorService executorService) {
+ this.executorService = executorService;
+ return this;
+ }
+
+ public int getAcceptQueueSizePerThread() {
+ return acceptQueueSizePerThread;
+ }
+
+ public Args acceptQueueSizePerThread(int acceptQueueSizePerThread) {
+ this.acceptQueueSizePerThread = acceptQueueSizePerThread;
+ return this;
+ }
+
+ public AcceptPolicy getAcceptPolicy() {
+ return acceptPolicy;
+ }
+
+ public Args acceptPolicy(AcceptPolicy acceptPolicy) {
+ this.acceptPolicy = acceptPolicy;
+ return this;
+ }
+
+ public void validate() {
+ if (selectorThreads <= 0) {
+ throw new IllegalArgumentException("selectorThreads must be positive.");
+ }
+ if (workerThreads < 0) {
+ throw new IllegalArgumentException("workerThreads must be non-negative.");
+ }
+ if (acceptQueueSizePerThread <= 0) {
+ throw new IllegalArgumentException("acceptQueueSizePerThread must be positive.");
+ }
+ }
+ }
+
+ // Flag for stopping the server
+ private volatile boolean stopped_ = true;
+
+ // The thread handling all accepts
+ private AcceptThread acceptThread;
+
+ // Threads handling events on client transports
+ private final Set<SelectorThread> selectorThreads = new HashSet<SelectorThread>();
+
+ // This wraps all the functionality of queueing and thread pool management
+ // for the passing of Invocations from the selector thread(s) to the workers
+ // (if any).
+ private final ExecutorService invoker;
+
+ private final Args args;
+
+ /**
+ * Create the server with the specified Args configuration
+ */
+ public TThreadedSelectorServer(Args args) {
+ super(args);
+ args.validate();
+ invoker = args.executorService == null ? createDefaultExecutor(args) : args.executorService;
+ this.args = args;
+ }
+
+ /**
+ * Start the accept and selector threads running to deal with clients.
+ *
+ * @return true if everything went ok, false if we couldn't start for some
+ * reason.
+ */
+ @Override
+ protected boolean startThreads() {
+ try {
+ for (int i = 0; i < args.selectorThreads; ++i) {
+ selectorThreads.add(new SelectorThread(args.acceptQueueSizePerThread));
+ }
+ acceptThread = new AcceptThread((TNonblockingServerTransport) serverTransport_,
+ createSelectorThreadLoadBalancer(selectorThreads));
+ stopped_ = false;
+ for (SelectorThread thread : selectorThreads) {
+ thread.start();
+ }
+ acceptThread.start();
+ return true;
+ } catch (IOException e) {
+ LOGGER.error("Failed to start threads!", e);
+ return false;
+ }
+ }
+
+ /**
+ * Joins the accept and selector threads and shuts down the executor service.
+ */
+ @Override
+ protected void waitForShutdown() {
+ try {
+ joinThreads();
+ } catch (InterruptedException e) {
+ // Non-graceful shutdown occurred
+ LOGGER.error("Interrupted while joining threads!", e);
+ }
+ gracefullyShutdownInvokerPool();
+ }
+
+ protected void joinThreads() throws InterruptedException {
+ // wait until the io threads exit
+ acceptThread.join();
+ for (SelectorThread thread : selectorThreads) {
+ thread.join();
+ }
+ }
+
+ /**
+ * Stop serving and shut everything down.
+ */
+ @Override
+ public void stop() {
+ stopped_ = true;
+
+ // Stop queuing connect attempts asap
+ stopListening();
+
+ if (acceptThread != null) {
+ acceptThread.wakeupSelector();
+ }
+ if (selectorThreads != null) {
+ for (SelectorThread thread : selectorThreads) {
+ if (thread != null)
+ thread.wakeupSelector();
+ }
+ }
+ }
+
+ protected void gracefullyShutdownInvokerPool() {
+ // try to gracefully shut down the executor service
+ invoker.shutdown();
+
+ // Loop until awaitTermination finally does return without a interrupted
+ // exception. If we don't do this, then we'll shut down prematurely. We want
+ // to let the executorService clear it's task queue, closing client sockets
+ // appropriately.
+ long timeoutMS = args.stopTimeoutUnit.toMillis(args.stopTimeoutVal);
+ long now = System.currentTimeMillis();
+ while (timeoutMS >= 0) {
+ try {
+ invoker.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS);
+ break;
+ } catch (InterruptedException ix) {
+ long newnow = System.currentTimeMillis();
+ timeoutMS -= (newnow - now);
+ now = newnow;
+ }
+ }
+ }
+
+ /**
+ * We override the standard invoke method here to queue the invocation for
+ * invoker service instead of immediately invoking. If there is no thread
+ * pool, handle the invocation inline on this thread
+ */
+ @Override
+ protected boolean requestInvoke(FrameBuffer frameBuffer) {
+ Runnable invocation = getRunnable(frameBuffer);
+ if (invoker != null) {
+ try {
+ invoker.execute(invocation);
+ return true;
+ } catch (RejectedExecutionException rx) {
+ LOGGER.warn("ExecutorService rejected execution!", rx);
+ return false;
+ }
+ } else {
+ // Invoke on the caller's thread
+ invocation.run();
+ return true;
+ }
+ }
+
+ protected Runnable getRunnable(FrameBuffer frameBuffer) {
+ return new Invocation(frameBuffer);
+ }
+
+ /**
+ * Helper to create the invoker if one is not specified
+ */
+ protected static ExecutorService createDefaultExecutor(Args options) {
+ return (options.workerThreads > 0) ? Executors.newFixedThreadPool(options.workerThreads) : null;
+ }
+
+ private static BlockingQueue<TNonblockingTransport> createDefaultAcceptQueue(int queueSize) {
+ if (queueSize == 0) {
+ // Unbounded queue
+ return new LinkedBlockingQueue<TNonblockingTransport>();
+ }
+ return new ArrayBlockingQueue<TNonblockingTransport>(queueSize);
+ }
+
+ /**
+ * The thread that selects on the server transport (listen socket) and accepts
+ * new connections to hand off to the IO selector threads
+ */
+ protected class AcceptThread extends Thread {
+
+ // The listen socket to accept on
+ private final TNonblockingServerTransport serverTransport;
+ private final Selector acceptSelector;
+
+ private final SelectorThreadLoadBalancer threadChooser;
+
+ /**
+ * Set up the AcceptThead
+ *
+ * @throws IOException
+ */
+ public AcceptThread(TNonblockingServerTransport serverTransport,
+ SelectorThreadLoadBalancer threadChooser) throws IOException {
+ this.serverTransport = serverTransport;
+ this.threadChooser = threadChooser;
+ this.acceptSelector = SelectorProvider.provider().openSelector();
+ this.serverTransport.registerSelector(acceptSelector);
+ }
+
+ /**
+ * The work loop. Selects on the server transport and accepts. If there was
+ * a server transport that had blocking accepts, and returned on blocking
+ * client transports, that should be used instead
+ */
+ public void run() {
+ try {
+ while (!stopped_) {
+ select();
+ }
+ } catch (Throwable t) {
+ LOGGER.error("run() exiting due to uncaught error", t);
+ } finally {
+ // This will wake up the selector threads
+ TThreadedSelectorServer.this.stop();
+ }
+ }
+
+ /**
+ * If the selector is blocked, wake it up.
+ */
+ public void wakeupSelector() {
+ acceptSelector.wakeup();
+ }
+
+ /**
+ * Select and process IO events appropriately: If there are connections to
+ * be accepted, accept them.
+ */
+ private void select() {
+ try {
+ // wait for connect events.
+ acceptSelector.select();
+
+ // process the io events we received
+ Iterator<SelectionKey> selectedKeys = acceptSelector.selectedKeys().iterator();
+ while (!stopped_ && selectedKeys.hasNext()) {
+ SelectionKey key = selectedKeys.next();
+ selectedKeys.remove();
+
+ // skip if not valid
+ if (!key.isValid()) {
+ continue;
+ }
+
+ if (key.isAcceptable()) {
+ handleAccept();
+ } else {
+ LOGGER.warn("Unexpected state in select! " + key.interestOps());
+ }
+ }
+ } catch (IOException e) {
+ LOGGER.warn("Got an IOException while selecting!", e);
+ }
+ }
+
+ /**
+ * Accept a new connection.
+ */
+ private void handleAccept() {
+ final TNonblockingTransport client = doAccept();
+ if (client != null) {
+ // Pass this connection to a selector thread
+ final SelectorThread targetThread = threadChooser.nextThread();
+
+ if (args.acceptPolicy == Args.AcceptPolicy.FAST_ACCEPT || invoker == null) {
+ doAddAccept(targetThread, client);
+ } else {
+ // FAIR_ACCEPT
+ try {
+ invoker.submit(new Runnable() {
+ public void run() {
+ doAddAccept(targetThread, client);
+ }
+ });
+ } catch (RejectedExecutionException rx) {
+ LOGGER.warn("ExecutorService rejected accept registration!", rx);
+ // close immediately
+ client.close();
+ }
+ }
+ }
+ }
+
+ private TNonblockingTransport doAccept() {
+ try {
+ return (TNonblockingTransport) serverTransport.accept();
+ } catch (TTransportException tte) {
+ // something went wrong accepting.
+ LOGGER.warn("Exception trying to accept!", tte);
+ return null;
+ }
+ }
+
+ private void doAddAccept(SelectorThread thread, TNonblockingTransport client) {
+ if (!thread.addAcceptedConnection(client)) {
+ client.close();
+ }
+ }
+ } // AcceptThread
+
+ /**
+ * The SelectorThread(s) will be doing all the selecting on accepted active
+ * connections.
+ */
+ protected class SelectorThread extends AbstractSelectThread {
+
+ // Accepted connections added by the accept thread.
+ private final BlockingQueue<TNonblockingTransport> acceptedQueue;
+
+ /**
+ * Set up the SelectorThread with an unbounded queue for incoming accepts.
+ *
+ * @throws IOException
+ * if a selector cannot be created
+ */
+ public SelectorThread() throws IOException {
+ this(new LinkedBlockingQueue<TNonblockingTransport>());
+ }
+
+ /**
+ * Set up the SelectorThread with an bounded queue for incoming accepts.
+ *
+ * @throws IOException
+ * if a selector cannot be created
+ */
+ public SelectorThread(int maxPendingAccepts) throws IOException {
+ this(createDefaultAcceptQueue(maxPendingAccepts));
+ }
+
+ /**
+ * Set up the SelectorThread with a specified queue for connections.
+ *
+ * @param acceptedQueue
+ * The BlockingQueue implementation for holding incoming accepted
+ * connections.
+ * @throws IOException
+ * if a selector cannot be created.
+ */
+ public SelectorThread(BlockingQueue<TNonblockingTransport> acceptedQueue) throws IOException {
+ this.acceptedQueue = acceptedQueue;
+ }
+
+ /**
+ * Hands off an accepted connection to be handled by this thread. This
+ * method will block if the queue for new connections is at capacity.
+ *
+ * @param accepted
+ * The connection that has been accepted.
+ * @return true if the connection has been successfully added.
+ */
+ public boolean addAcceptedConnection(TNonblockingTransport accepted) {
+ try {
+ acceptedQueue.put(accepted);
+ } catch (InterruptedException e) {
+ LOGGER.warn("Interrupted while adding accepted connection!", e);
+ return false;
+ }
+ selector.wakeup();
+ return true;
+ }
+
+ /**
+ * The work loop. Handles selecting (read/write IO), dispatching, and
+ * managing the selection preferences of all existing connections.
+ */
+ public void run() {
+ try {
+ while (!stopped_) {
+ select();
+ processAcceptedConnections();
+ processInterestChanges();
+ }
+ } catch (Throwable t) {
+ LOGGER.error("run() exiting due to uncaught error", t);
+ } finally {
+ // This will wake up the accept thread and the other selector threads
+ TThreadedSelectorServer.this.stop();
+ }
+ }
+
+ /**
+ * Select and process IO events appropriately: If there are existing
+ * connections with data waiting to be read, read it, buffering until a
+ * whole frame has been read. If there are any pending responses, buffer
+ * them until their target client is available, and then send the data.
+ */
+ private void select() {
+ try {
+ // wait for io events.
+ selector.select();
+
+ // process the io events we received
+ Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
+ while (!stopped_ && selectedKeys.hasNext()) {
+ SelectionKey key = selectedKeys.next();
+ selectedKeys.remove();
+
+ // skip if not valid
+ if (!key.isValid()) {
+ cleanupSelectionKey(key);
+ continue;
+ }
+
+ if (key.isReadable()) {
+ // deal with reads
+ handleRead(key);
+ } else if (key.isWritable()) {
+ // deal with writes
+ handleWrite(key);
+ } else {
+ LOGGER.warn("Unexpected state in select! " + key.interestOps());
+ }
+ }
+ } catch (IOException e) {
+ LOGGER.warn("Got an IOException while selecting!", e);
+ }
+ }
+
+ private void processAcceptedConnections() {
+ // Register accepted connections
+ while (!stopped_) {
+ TNonblockingTransport accepted = acceptedQueue.poll();
+ if (accepted == null) {
+ break;
+ }
+ registerAccepted(accepted);
+ }
+ }
+
+ private void registerAccepted(TNonblockingTransport accepted) {
+ SelectionKey clientKey = null;
+ try {
+ clientKey = accepted.registerSelector(selector, SelectionKey.OP_READ);
+
+ FrameBuffer frameBuffer = new FrameBuffer(accepted, clientKey, SelectorThread.this);
+ clientKey.attach(frameBuffer);
+ } catch (IOException e) {
+ LOGGER.warn("Failed to register accepted connection to selector!", e);
+ if (clientKey != null) {
+ cleanupSelectionKey(clientKey);
+ }
+ accepted.close();
+ }
+ }
+ } // SelectorThread
+
+ /**
+ * Creates a SelectorThreadLoadBalancer to be used by the accept thread for
+ * assigning newly accepted connections across the threads.
+ */
+ protected SelectorThreadLoadBalancer createSelectorThreadLoadBalancer(Collection<? extends SelectorThread> threads) {
+ return new SelectorThreadLoadBalancer(threads);
+ }
+
+ /**
+ * A round robin load balancer for choosing selector threads for new
+ * connections.
+ */
+ protected class SelectorThreadLoadBalancer {
+ private final Collection<? extends SelectorThread> threads;
+ private Iterator<? extends SelectorThread> nextThreadIterator;
+
+ public <T extends SelectorThread> SelectorThreadLoadBalancer(Collection<T> threads) {
+ if (threads.isEmpty()) {
+ throw new IllegalArgumentException("At least one selector thread is required");
+ }
+ this.threads = Collections.unmodifiableList(new ArrayList<T>(threads));
+ nextThreadIterator = this.threads.iterator();
+ }
+
+ public SelectorThread nextThread() {
+ // Choose a selector thread (round robin)
+ if (!nextThreadIterator.hasNext()) {
+ nextThreadIterator = threads.iterator();
+ }
+ return nextThreadIterator.next();
+ }
+ }
+}
diff --git a/lib/java/test/org/apache/thrift/server/TestThreadedSelectorServer.java b/lib/java/test/org/apache/thrift/server/TestThreadedSelectorServer.java
new file mode 100644
index 0000000..ed729a2
--- /dev/null
+++ b/lib/java/test/org/apache/thrift/server/TestThreadedSelectorServer.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.thrift.server;
+
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.server.TThreadedSelectorServer.Args;
+import org.apache.thrift.transport.TNonblockingServerSocket;
+
+public class TestThreadedSelectorServer extends TestNonblockingServer {
+ protected TServer getServer(TProcessor processor, TNonblockingServerSocket socket, TProtocolFactory protoFactory) {
+ return new TThreadedSelectorServer(new Args(socket).processor(processor).protocolFactory(protoFactory));
+ }
+}