(THRIFT-5) A TNonblockingServers (single-threaded and thread-pool) for Java
This patch adds two Thrift servers for Java that both use non-blocking I/O
to avoid locking up worker threads for idle connections. The two classes are
- TNonblockingServer, which supports single-threaded serving
- THsHaServer, which performs I/O in one thread and method invocations in
a configurable thread pool.
To support these servers, TNonblockingServerSocket and TNonblockingSocket
have been added.
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@673550 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/java/src/com/facebook/thrift/server/THsHaServer.java b/lib/java/src/com/facebook/thrift/server/THsHaServer.java
new file mode 100644
index 0000000..a8764ec
--- /dev/null
+++ b/lib/java/src/com/facebook/thrift/server/THsHaServer.java
@@ -0,0 +1,288 @@
+
+package com.facebook.thrift.server;
+
+import com.facebook.thrift.TException;
+import com.facebook.thrift.TProcessor;
+import com.facebook.thrift.TProcessorFactory;
+import com.facebook.thrift.protocol.TProtocol;
+import com.facebook.thrift.protocol.TProtocolFactory;
+import com.facebook.thrift.protocol.TBinaryProtocol;
+import com.facebook.thrift.transport.TNonblockingServerTransport;
+import com.facebook.thrift.transport.TTransport;
+import com.facebook.thrift.transport.TFramedTransport;
+import com.facebook.thrift.transport.TNonblockingTransport;
+import com.facebook.thrift.transport.TTransportException;
+import com.facebook.thrift.transport.TTransportFactory;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import java.io.IOException;
+
+/**
+ * An extension of the TNonblockingServer to a Half-Sync/Half-Async server.
+ * Like TNonblockingServer, it relies on the use of TFramedTransport.
+ */
+public class THsHaServer extends TNonblockingServer {
+
+ // This wraps all the functionality of queueing and thread pool management
+ // for the passing of Invocations from the Selector to workers.
+ private ExecutorService invoker;
+
+ private final Options options;
+
+ /**
+ * Create server with given processor, and server transport. Default server
+ * options, TBinaryProtocol for the protocol, and TFramedTransport.Factory on
+ * both input and output transports. A TProcessorFactory will be created that
+ * always returns the specified processor.
+ */
+ public THsHaServer( TProcessor processor,
+ TNonblockingServerTransport serverTransport) {
+ this(processor, serverTransport, new Options());
+ }
+
+ /**
+ * Create server with given processor, server transport, and server options
+ * using TBinaryProtocol for the protocol, and TFramedTransport.Factory on
+ * both input and output transports. A TProcessorFactory will be created that
+ * always returns the specified processor.
+ */
+ public THsHaServer( TProcessor processor,
+ TNonblockingServerTransport serverTransport,
+ Options options) {
+ this(new TProcessorFactory(processor), serverTransport, options);
+ }
+
+ /**
+ * Create server with specified processor factory and server transport. Uses
+ * default options. TBinaryProtocol is assumed. TFramedTransport.Factory is
+ * used on both input and output transports.
+ */
+ public THsHaServer( TProcessorFactory processorFactory,
+ TNonblockingServerTransport serverTransport) {
+ this(processorFactory, serverTransport, new Options());
+ }
+
+ /**
+ * Create server with specified processor factory, server transport, and server
+ * options. TBinaryProtocol is assumed. TFramedTransport.Factory is used on
+ * both input and output transports.
+ */
+ public THsHaServer( TProcessorFactory processorFactory,
+ TNonblockingServerTransport serverTransport,
+ Options options) {
+ this(processorFactory, serverTransport, new TFramedTransport.Factory(),
+ new TBinaryProtocol.Factory(), options);
+ }
+
+ /**
+ * Server with specified processor, server transport, and in/out protocol
+ * factory. Defaults will be used for in/out transport factory and server
+ * options.
+ */
+ public THsHaServer( TProcessor processor,
+ TNonblockingServerTransport serverTransport,
+ TProtocolFactory protocolFactory) {
+ this(processor, serverTransport, protocolFactory, new Options());
+ }
+
+ /**
+ * Server with specified processor, server transport, and in/out protocol
+ * factory. Defaults will be used for in/out transport factory and server
+ * options.
+ */
+ public THsHaServer( TProcessor processor,
+ TNonblockingServerTransport serverTransport,
+ TProtocolFactory protocolFactory,
+ Options options) {
+ this(processor, serverTransport, new TFramedTransport.Factory(),
+ protocolFactory);
+ }
+
+ /**
+ * Create server with specified processor, server transport, in/out
+ * transport factory, in/out protocol factory, and default server options. A
+ * processor factory will be created that always returns the specified
+ * processor.
+ */
+ public THsHaServer( TProcessor processor,
+ TNonblockingServerTransport serverTransport,
+ TFramedTransport.Factory transportFactory,
+ TProtocolFactory protocolFactory) {
+ this(new TProcessorFactory(processor), serverTransport,
+ transportFactory, protocolFactory);
+ }
+
+ /**
+ * Create server with specified processor factory, server transport, in/out
+ * transport factory, in/out protocol factory, and default server options.
+ */
+ public THsHaServer( TProcessorFactory processorFactory,
+ TNonblockingServerTransport serverTransport,
+ TFramedTransport.Factory transportFactory,
+ TProtocolFactory protocolFactory) {
+ this(processorFactory, serverTransport,
+ transportFactory, transportFactory,
+ protocolFactory, protocolFactory, new Options());
+ }
+
+ /**
+ * Create server with specified processor factory, server transport, in/out
+ * transport factory, in/out protocol factory, and server options.
+ */
+ public THsHaServer( TProcessorFactory processorFactory,
+ TNonblockingServerTransport serverTransport,
+ TFramedTransport.Factory transportFactory,
+ TProtocolFactory protocolFactory,
+ Options options) {
+ this(processorFactory, serverTransport,
+ transportFactory, transportFactory,
+ protocolFactory, protocolFactory,
+ options);
+ }
+
+ /**
+ * Create server with everything specified, except use default server options.
+ */
+ public THsHaServer( TProcessor processor,
+ TNonblockingServerTransport serverTransport,
+ TFramedTransport.Factory inputTransportFactory,
+ TFramedTransport.Factory outputTransportFactory,
+ TProtocolFactory inputProtocolFactory,
+ TProtocolFactory outputProtocolFactory) {
+ this(new TProcessorFactory(processor), serverTransport,
+ inputTransportFactory, outputTransportFactory,
+ inputProtocolFactory, outputProtocolFactory);
+ }
+
+ /**
+ * Create server with everything specified, except use default server options.
+ */
+ public THsHaServer( TProcessorFactory processorFactory,
+ TNonblockingServerTransport serverTransport,
+ TFramedTransport.Factory inputTransportFactory,
+ TFramedTransport.Factory outputTransportFactory,
+ TProtocolFactory inputProtocolFactory,
+ TProtocolFactory outputProtocolFactory)
+ {
+ this(processorFactory, serverTransport,
+ inputTransportFactory, outputTransportFactory,
+ inputProtocolFactory, outputProtocolFactory, new Options());
+ }
+
+ /**
+ * Create server with every option fully specified.
+ */
+ public THsHaServer( TProcessorFactory processorFactory,
+ TNonblockingServerTransport serverTransport,
+ TFramedTransport.Factory inputTransportFactory,
+ TFramedTransport.Factory outputTransportFactory,
+ TProtocolFactory inputProtocolFactory,
+ TProtocolFactory outputProtocolFactory,
+ Options options)
+ {
+ super(processorFactory, serverTransport,
+ inputTransportFactory, outputTransportFactory,
+ inputProtocolFactory, outputProtocolFactory);
+ this.options = options;
+ }
+
+ /** @inheritdoc */
+ @Override
+ public void serve() {
+ if (!startInvokerPool()) {
+ return;
+ }
+
+ // start listening, or exit
+ if (!startListening()) {
+ return;
+ }
+
+ // start the selector, or exit
+ if (!startSelectorThread()) {
+ return;
+ }
+
+ // this will block while we serve
+ joinSelector();
+
+ gracefullyShutdownInvokerPool();
+
+ // do a little cleanup
+ stopListening();
+
+ // ungracefully shut down the invoker pool?
+ }
+
+ protected boolean startInvokerPool() {
+ // start the invoker pool
+ LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
+ invoker = new ThreadPoolExecutor(options.minWorkerThreads,
+ options.maxWorkerThreads, options.stopTimeoutVal, options.stopTimeoutUnit,
+ queue);
+
+ return true;
+ }
+
+ protected void gracefullyShutdownInvokerPool() {
+ // try to gracefully shut down the executor service
+ invoker.shutdown();
+
+ // Loop until awaitTermination finally does return without a interrupted
+ // exception. If we don't do this, then we'll shut down prematurely. We want
+ // to let the executorService clear it's task queue, closing client sockets
+ // appropriately.
+ long timeoutMS = 10000;
+ long now = System.currentTimeMillis();
+ while (timeoutMS >= 0) {
+ try {
+ invoker.awaitTermination(timeoutMS, TimeUnit.MILLISECONDS);
+ break;
+ } catch (InterruptedException ix) {
+ long newnow = System.currentTimeMillis();
+ timeoutMS -= (newnow - now);
+ now = newnow;
+ }
+ }
+ }
+
+ /**
+ * We override the standard invoke method here to queue the invocation for
+ * invoker service instead of immediately invoking. The thread pool takes care of the rest.
+ */
+ @Override
+ protected void requestInvoke(FrameBuffer frameBuffer) {
+ invoker.execute(new Invocation(frameBuffer));
+ }
+
+ /**
+ * An Invocation represents a method call that is prepared to execute, given
+ * an idle worker thread. It contains the input and output protocols the
+ * thread's processor should use to perform the usual Thrift invocation.
+ */
+ private class Invocation implements Runnable {
+
+ private final FrameBuffer frameBuffer;
+
+ public Invocation(final FrameBuffer frameBuffer) {
+ this.frameBuffer = frameBuffer;
+ }
+
+ public void run() {
+ frameBuffer.invoke();
+ }
+ }
+
+ public static class Options {
+ public int minWorkerThreads = 5;
+ public int maxWorkerThreads = Integer.MAX_VALUE;
+ public int stopTimeoutVal = 60;
+ public TimeUnit stopTimeoutUnit = TimeUnit.SECONDS;
+ }
+}
diff --git a/lib/java/src/com/facebook/thrift/server/TNonblockingServer.java b/lib/java/src/com/facebook/thrift/server/TNonblockingServer.java
new file mode 100644
index 0000000..d09ec67
--- /dev/null
+++ b/lib/java/src/com/facebook/thrift/server/TNonblockingServer.java
@@ -0,0 +1,678 @@
+
+package com.facebook.thrift.server;
+
+import com.facebook.thrift.TException;
+import com.facebook.thrift.TProcessor;
+import com.facebook.thrift.TProcessorFactory;
+import com.facebook.thrift.protocol.TProtocol;
+import com.facebook.thrift.protocol.TProtocolFactory;
+import com.facebook.thrift.protocol.TBinaryProtocol;
+import com.facebook.thrift.transport.TNonblockingServerTransport;
+import com.facebook.thrift.transport.TIOStreamTransport;
+import com.facebook.thrift.transport.TTransport;
+import com.facebook.thrift.transport.TFramedTransport;
+import com.facebook.thrift.transport.TNonblockingTransport;
+import com.facebook.thrift.transport.TTransportException;
+import com.facebook.thrift.transport.TTransportFactory;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.Iterator;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.HashSet;
+
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import java.io.IOException;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.Selector;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.spi.SelectorProvider;
+import java.nio.channels.CancelledKeyException;
+import java.nio.channels.ClosedChannelException;
+
+/**
+ * A nonblocking TServer implementation. This allows for fairness amongst all
+ * connected clients in terms of invocations.
+ *
+ * This server is inherently single-threaded. If you want a limited thread pool
+ * coupled with invocation-fairness, see THsHaServer.
+ *
+ * To use this server, you MUST use a TFramedTransport at the outermost
+ * transport, otherwise this server will be unable to determine when a whole
+ * method call has been read off the wire. Clients must also use TFramedTransport.
+ */
+public class TNonblockingServer extends TServer {
+ private static final Logger LOGGER =
+ Logger.getLogger(TNonblockingServer.class.getName());
+
+ // Flag for stopping the server
+ private volatile boolean stopped_;
+
+ private SelectThread selectThread_;
+
+ /**
+ * Create server with given processor and server transport, using
+ * TBinaryProtocol for the protocol, TFramedTransport.Factory on both input
+ * and output transports. A TProcessorFactory will be created that always
+ * returns the specified processor.
+ */
+ public TNonblockingServer(TProcessor processor,
+ TNonblockingServerTransport serverTransport) {
+ this(new TProcessorFactory(processor), serverTransport);
+ }
+
+ /**
+ * Create server with specified processor factory and server transport.
+ * TBinaryProtocol is assumed. TFramedTransport.Factory is used on both input
+ * and output transports.
+ */
+ public TNonblockingServer(TProcessorFactory processorFactory,
+ TNonblockingServerTransport serverTransport) {
+ this(processorFactory, serverTransport,
+ new TFramedTransport.Factory(), new TFramedTransport.Factory(),
+ new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory());
+ }
+
+ public TNonblockingServer(TProcessor processor,
+ TNonblockingServerTransport serverTransport,
+ TProtocolFactory protocolFactory) {
+ this(processor, serverTransport,
+ new TFramedTransport.Factory(), new TFramedTransport.Factory(),
+ protocolFactory, protocolFactory);
+ }
+
+ public TNonblockingServer(TProcessor processor,
+ TNonblockingServerTransport serverTransport,
+ TFramedTransport.Factory transportFactory,
+ TProtocolFactory protocolFactory) {
+ this(processor, serverTransport,
+ transportFactory, transportFactory,
+ protocolFactory, protocolFactory);
+ }
+
+ public TNonblockingServer(TProcessorFactory processorFactory,
+ TNonblockingServerTransport serverTransport,
+ TFramedTransport.Factory transportFactory,
+ TProtocolFactory protocolFactory) {
+ this(processorFactory, serverTransport,
+ transportFactory, transportFactory,
+ protocolFactory, protocolFactory);
+ }
+
+ public TNonblockingServer(TProcessor processor,
+ TNonblockingServerTransport serverTransport,
+ TFramedTransport.Factory inputTransportFactory,
+ TFramedTransport.Factory outputTransportFactory,
+ TProtocolFactory inputProtocolFactory,
+ TProtocolFactory outputProtocolFactory) {
+ this(new TProcessorFactory(processor), serverTransport,
+ inputTransportFactory, outputTransportFactory,
+ inputProtocolFactory, outputProtocolFactory);
+ }
+
+ public TNonblockingServer(TProcessorFactory processorFactory,
+ TNonblockingServerTransport serverTransport,
+ TFramedTransport.Factory inputTransportFactory,
+ TFramedTransport.Factory outputTransportFactory,
+ TProtocolFactory inputProtocolFactory,
+ TProtocolFactory outputProtocolFactory) {
+ super(processorFactory, serverTransport,
+ inputTransportFactory, outputTransportFactory,
+ inputProtocolFactory, outputProtocolFactory);
+ }
+
+ /**
+ * Begin accepting connections and processing invocations.
+ */
+ public void serve() {
+ // start listening, or exit
+ if (!startListening()) {
+ return;
+ }
+
+ // start the selector, or exit
+ if (!startSelectorThread()) {
+ return;
+ }
+
+ // this will block while we serve
+ joinSelector();
+
+ // do a little cleanup
+ stopListening();
+ }
+
+ /**
+ * Have the server transport start accepting connections.
+ *
+ * @return true if we started listening successfully, false if something went
+ * wrong.
+ */
+ protected boolean startListening() {
+ try {
+ serverTransport_.listen();
+ return true;
+ } catch (TTransportException ttx) {
+ LOGGER.log(Level.SEVERE, "Failed to start listening on server socket!", ttx);
+ return false;
+ }
+ }
+
+ /**
+ * Stop listening for conections.
+ */
+ protected void stopListening() {
+ serverTransport_.close();
+ }
+
+ /**
+ * Start the selector thread running to deal with clients.
+ *
+ * @return true if everything went ok, false if we couldn't start for some
+ * reason.
+ */
+ protected boolean startSelectorThread() {
+ // start the selector
+ try {
+ selectThread_ = new SelectThread((TNonblockingServerTransport)serverTransport_);
+ selectThread_.start();
+ return true;
+ } catch (IOException e) {
+ LOGGER.log(Level.SEVERE, "Failed to start selector thread!", e);
+ return false;
+ }
+ }
+
+ /**
+ * Block until the selector exits.
+ */
+ protected void joinSelector() {
+ // wait until the selector thread exits
+ try {
+ selectThread_.join();
+ } catch (InterruptedException e) {
+ // for now, just silently ignore. technically this means we'll have less of
+ // a graceful shutdown as a result.
+ }
+ }
+
+ /**
+ * Stop serving and shut everything down.
+ */
+ public void stop() {
+ stopped_ = true;
+ selectThread_.wakeupSelector();
+ }
+
+ /**
+ * Perform an invocation. This method could behave several different ways
+ * - invoke immediately inline, queue for separate execution, etc.
+ */
+ protected void requestInvoke(FrameBuffer frameBuffer) {
+ frameBuffer.invoke();
+ }
+
+ /**
+ * A FrameBuffer wants to change its selection preferences, but might not be
+ * in the select thread.
+ */
+ protected void requestSelectInterestChange(FrameBuffer frameBuffer) {
+ selectThread_.requestSelectInterestChange(frameBuffer);
+ }
+
+ /**
+ * The thread that will be doing all the selecting, managing new connections
+ * and those that still need to be read.
+ */
+ protected class SelectThread extends Thread {
+
+ private final TNonblockingServerTransport serverTransport;
+ private final Selector selector;
+
+ // List of FrameBuffers that want to change their selection interests.
+ private final Set<FrameBuffer> selectInterestChanges =
+ new HashSet<FrameBuffer>();
+
+ /**
+ * Set up the SelectorThread.
+ */
+ public SelectThread(final TNonblockingServerTransport serverTransport)
+ throws IOException {
+ this.serverTransport = serverTransport;
+ this.selector = SelectorProvider.provider().openSelector();
+ serverTransport.registerSelector(selector);
+ }
+
+ /**
+ * The work loop. Handles both selecting (all IO operations) and managing
+ * the selection preferences of all existing connections.
+ */
+ public void run() {
+ while (!stopped_) {
+ select();
+ processInterestChanges();
+ }
+ }
+
+ /**
+ * If the selector is blocked, wake it up.
+ */
+ public void wakeupSelector() {
+ selector.wakeup();
+ }
+
+ /**
+ * Add FrameBuffer to the list of select interest changes and wake up the
+ * selector if it's blocked. When the select() call exits, it'll give the
+ * FrameBuffer a chance to change its interests.
+ */
+ public void requestSelectInterestChange(FrameBuffer frameBuffer) {
+ synchronized (selectInterestChanges) {
+ selectInterestChanges.add(frameBuffer);
+ }
+ // wakeup the selector, if it's currently blocked.
+ selector.wakeup();
+ }
+
+ /**
+ * Select and process IO events appropriately:
+ * If there are connections to be accepted, accept them.
+ * If there are existing connections with data waiting to be read, read it,
+ * bufferring until a whole frame has been read.
+ * If there are any pending responses, buffer them until their target client
+ * is available, and then send the data.
+ */
+ private void select() {
+ try {
+ // wait for io events.
+ selector.select();
+
+ // process the io events we received
+ Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
+ while (!stopped_ && selectedKeys.hasNext()) {
+ SelectionKey key = selectedKeys.next();
+ selectedKeys.remove();
+
+ // skip if not valid
+ if (!key.isValid()) {
+ cleanupSelectionkey(key);
+ continue;
+ }
+
+ // if the key is marked Accept, then it has to be the server
+ // transport.
+ if (key.isAcceptable()) {
+ handleAccept();
+ } else if (key.isReadable()) {
+ // deal with reads
+ handleRead(key);
+ } else if (key.isWritable()) {
+ // deal with writes
+ handleWrite(key);
+ }
+ }
+ } catch (IOException e) {
+ LOGGER.log(Level.WARNING, "Got an IOException while selecting!", e);
+ }
+ }
+
+ /**
+ * Check to see if there are any FrameBuffers that have switched their
+ * interest type from read to write or vice versa.
+ */
+ private void processInterestChanges() {
+ synchronized (selectInterestChanges) {
+ for (FrameBuffer fb : selectInterestChanges) {
+ fb.changeSelectInterests();
+ }
+ selectInterestChanges.clear();
+ }
+ }
+
+ /**
+ * Accept a new connection.
+ */
+ private void handleAccept() throws IOException {
+ SelectionKey clientKey = null;
+ try {
+ // accept the connection
+ TNonblockingTransport client = (TNonblockingTransport)serverTransport.accept();
+ clientKey = client.registerSelector(selector, SelectionKey.OP_READ);
+
+ // add this key to the map
+ FrameBuffer frameBuffer = new FrameBuffer(client, clientKey);
+ clientKey.attach(frameBuffer);
+ } catch (TTransportException tte) {
+ // something went wrong accepting.
+ cleanupSelectionkey(clientKey);
+ LOGGER.log(Level.WARNING, "Exception trying to accept!", tte);
+ tte.printStackTrace();
+ }
+ }
+
+ /**
+ * Do the work required to read from a readable client. If the frame is
+ * fully read, then invoke the method call.
+ */
+ private void handleRead(SelectionKey key) {
+ FrameBuffer buffer = (FrameBuffer)key.attachment();
+ if (buffer.read()) {
+ // if the buffer's frame read is complete, invoke the method.
+ if (buffer.isFrameFullyRead()) {
+ requestInvoke(buffer);
+ }
+ } else {
+ cleanupSelectionkey(key);
+ }
+ }
+
+ /**
+ * Let a writable client get written, if there's data to be written.
+ */
+ private void handleWrite(SelectionKey key) {
+ FrameBuffer buffer = (FrameBuffer)key.attachment();
+ if (!buffer.write()) {
+ cleanupSelectionkey(key);
+ }
+ }
+
+ /**
+ * Do connection-close cleanup on a given SelectionKey.
+ */
+ private void cleanupSelectionkey(SelectionKey key) {
+ // remove the records from the two maps
+ FrameBuffer buffer = (FrameBuffer)key.attachment();
+ if (buffer != null) {
+ // close the buffer
+ buffer.close();
+ }
+ // cancel the selection key
+ key.cancel();
+ }
+ } // SelectorThread
+
+ /**
+ * Class that implements a sort of state machine around the interaction with
+ * a client and an invoker. It manages reading the frame size and frame data,
+ * getting it handed off as wrapped transports, and then the writing of
+ * reponse data back to the client. In the process it manages flipping the
+ * read and write bits on the selection key for its client.
+ */
+ protected class FrameBuffer {
+ //
+ // Possible states for the FrameBuffer state machine.
+ //
+ // in the midst of reading the frame size off the wire
+ private static final int READING_FRAME_SIZE = 1;
+ // reading the actual frame data now, but not all the way done yet
+ private static final int READING_FRAME = 2;
+ // completely read the frame, so an invocation can now happen
+ private static final int READ_FRAME_COMPLETE = 3;
+ // waiting to get switched to listening for write events
+ private static final int AWAITING_REGISTER_WRITE = 4;
+ // started writing response data, not fully complete yet
+ private static final int WRITING = 6;
+ // another thread wants this framebuffer to go back to reading
+ private static final int AWAITING_REGISTER_READ = 7;
+
+ //
+ // Instance variables
+ //
+
+ // the actual transport hooked up to the client.
+ private final TNonblockingTransport trans_;
+
+ // the SelectionKey that corresponds to our transport
+ private final SelectionKey selectionKey_;
+
+ // where in the process of reading/writing are we?
+ private int state_ = READING_FRAME_SIZE;
+
+ // the ByteBuffer we'll be using to write and read, depending on the state
+ private ByteBuffer buffer_;
+
+ private ByteArrayOutputStream response_;
+
+ public FrameBuffer( final TNonblockingTransport trans,
+ final SelectionKey selectionKey) {
+ trans_ = trans;
+ selectionKey_ = selectionKey;
+ buffer_ = ByteBuffer.allocate(4);
+ }
+
+ /**
+ * Give this FrameBuffer a chance to read. The selector loop should have
+ * received a read event for this FrameBuffer.
+ *
+ * @return true if the connection should live on, false if it should be
+ * closed
+ */
+ public boolean read() {
+ if (state_ == READING_FRAME_SIZE) {
+ // try to read the frame size completely
+ if (!internalRead()) {
+ return false;
+ }
+
+ // if the frame size has been read completely, then prepare to read the
+ // actual frame.
+ if (buffer_.remaining() == 0) {
+ // pull out the frame size as an integer.
+ int frameSize = buffer_.getInt(0);
+ if (frameSize <= 0) {
+ LOGGER.severe("Read an invalid frame size of " + frameSize
+ + ". Are you using TFramedTransport on the client side?");
+ return false;
+ }
+ // reallocate the readbuffer as a frame-sized buffer
+ buffer_ = ByteBuffer.allocate(frameSize + 4);
+ // put the frame size at the head of the buffer
+ buffer_.putInt(frameSize);
+
+ state_ = READING_FRAME;
+ } else {
+ // this skips the check of READING_FRAME state below, since we can't
+ // possibly go on to that state if there's data left to be read at
+ // this one.
+ return true;
+ }
+ }
+
+ // it is possible to fall through from the READING_FRAME_SIZE section
+ // to READING_FRAME if there's already some frame data available once
+ // READING_FRAME_SIZE is complete.
+
+ if (state_ == READING_FRAME) {
+ if (!internalRead()) {
+ return false;
+ }
+
+ // since we're already in the select loop here for sure, we can just
+ // modify our selection key directly.
+ if (buffer_.remaining() == 0) {
+ // get rid of the read select interests
+ selectionKey_.interestOps(0);
+ state_ = READ_FRAME_COMPLETE;
+ }
+
+ return true;
+ }
+
+ // if we fall through to this point, then the state must be invalid.
+ LOGGER.severe("Read was called but state is invalid (" + state_ + ")");
+ return false;
+ }
+
+ /**
+ * Give this FrameBuffer a chance to write its output to the final client.
+ */
+ public boolean write() {
+ if (state_ == WRITING) {
+ try {
+ if (trans_.write(buffer_) < 0) {
+ return false;
+ }
+ } catch (IOException e) {
+ LOGGER.log(Level.WARNING, "Got an IOException during write!", e);
+ return false;
+ }
+
+ // we're done writing. now we need to switch back to reading.
+ if (buffer_.remaining() == 0) {
+ prepareRead();
+ }
+ return true;
+ }
+
+ LOGGER.severe("Write was called, but state is invalid (" + state_ + ")");
+ return false;
+ }
+
+ /**
+ * Give this FrameBuffer a chance to set its interest to write, once data
+ * has come in.
+ */
+ public void changeSelectInterests() {
+ if (state_ == AWAITING_REGISTER_WRITE) {
+ // set the OP_WRITE interest
+ selectionKey_.interestOps(SelectionKey.OP_WRITE);
+ state_ = WRITING;
+ } else if (state_ == AWAITING_REGISTER_READ) {
+ prepareRead();
+ } else {
+ LOGGER.severe(
+ "changeSelectInterest was called, but state is invalid ("
+ + state_ + ")");
+ }
+ }
+
+ /**
+ * Shut the connection down.
+ */
+ public void close() {
+ trans_.close();
+ }
+
+ /**
+ * Check if this FrameBuffer has a full frame read.
+ */
+ public boolean isFrameFullyRead() {
+ return state_ == READ_FRAME_COMPLETE;
+ }
+
+ /**
+ * After the processor has processed the invocation, whatever thread is
+ * managing invocations should call this method on this FrameBuffer so we
+ * know it's time to start trying to write again. Also, if it turns out
+ * that there actually isn't any data in the response buffer, we'll skip
+ * trying to write and instead go back to reading.
+ */
+ public void responseReady() {
+ // capture the data we want to write as a byte array.
+ byte[] bytes = response_.toByteArray();
+
+ if (bytes.length <= 0) {
+ // go straight to reading again. this was probably an async method
+ state_ = AWAITING_REGISTER_READ;
+ } else {
+ buffer_ = ByteBuffer.wrap(bytes);
+
+ // set state that we're waiting to be switched to write. we do this
+ // asynchronously through requestSelectInterestChange() because there is a
+ // possibility that we're not in the main thread, and thus currently
+ // blocked in select(). (this functionality is in place for the sake of
+ // the HsHa server.)
+ state_ = AWAITING_REGISTER_WRITE;
+ }
+ requestSelectInterestChange();
+ }
+
+ /**
+ * Actually invoke the method signified by this FrameBuffer.
+ */
+ public void invoke() {
+ TTransport inTrans = getInputTransport();
+ TProtocol inProt = inputProtocolFactory_.getProtocol(inTrans);
+ TProtocol outProt = outputProtocolFactory_.getProtocol(getOutputTransport());
+
+ try {
+ processorFactory_.getProcessor(inTrans).process(inProt, outProt);
+ responseReady();
+ } catch (TException te) {
+ LOGGER.log(Level.WARNING, "Exception while invoking!", te);
+ }
+ }
+
+ /**
+ * Wrap the read buffer in a memory-based transport so a processor can read
+ * the data it needs to handle an invocation.
+ */
+ private TTransport getInputTransport() {
+ return inputTransportFactory_.getTransport(new TIOStreamTransport(
+ new ByteArrayInputStream(buffer_.array())));
+ }
+
+ /**
+ * Get the transport that should be used by the invoker for responding.
+ */
+ private TTransport getOutputTransport() {
+ response_ = new ByteArrayOutputStream();
+ return outputTransportFactory_.getTransport(new TIOStreamTransport(response_));
+ }
+
+ /**
+ * Perform a read into buffer.
+ *
+ * @return true if the read succeeded, false if there was an error or the
+ * connection closed.
+ */
+ private boolean internalRead() {
+ try {
+ if (trans_.read(buffer_) < 0) {
+ return false;
+ }
+ return true;
+ } catch (IOException e) {
+ LOGGER.log(Level.WARNING, "Got an IOException in internalRead!", e);
+ return false;
+ }
+ }
+
+ /**
+ * We're done writing, so reset our interest ops and change state accordingly.
+ */
+ private void prepareRead() {
+ // we can set our interest directly without using the queue because
+ // we're in the select thread.
+ selectionKey_.interestOps(SelectionKey.OP_READ);
+ // get ready for another go-around
+ buffer_ = ByteBuffer.allocate(4);
+ state_ = READING_FRAME_SIZE;
+ }
+
+ /**
+ * When this FrameBuffer needs to change it's select interests and execution
+ * might not be in the select thread, then this method will make sure the
+ * interest change gets done when the select thread wakes back up. When the
+ * current thread is the select thread, then it just does the interest change
+ * immediately.
+ */
+ private void requestSelectInterestChange() {
+ if (Thread.currentThread() == selectThread_) {
+ changeSelectInterests();
+ } else {
+ TNonblockingServer.this.requestSelectInterestChange(this);
+ }
+ }
+ } // FrameBuffer
+
+}
diff --git a/lib/java/src/com/facebook/thrift/transport/TFramedTransport.java b/lib/java/src/com/facebook/thrift/transport/TFramedTransport.java
index a9ad4b3..e7dfcef 100644
--- a/lib/java/src/com/facebook/thrift/transport/TFramedTransport.java
+++ b/lib/java/src/com/facebook/thrift/transport/TFramedTransport.java
@@ -43,6 +43,15 @@
*/
private boolean frameWrite_ = true;
+ public static class Factory extends TTransportFactory {
+ public Factory() {
+ }
+
+ public TTransport getTransport(TTransport base) {
+ return new TFramedTransport(base);
+ }
+ }
+
/**
* Constructor wraps around another tranpsort
*/
diff --git a/lib/java/src/com/facebook/thrift/transport/TNonblockingServerSocket.java b/lib/java/src/com/facebook/thrift/transport/TNonblockingServerSocket.java
new file mode 100644
index 0000000..44ed0d8
--- /dev/null
+++ b/lib/java/src/com/facebook/thrift/transport/TNonblockingServerSocket.java
@@ -0,0 +1,143 @@
+
+package com.facebook.thrift.transport;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketException;
+
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.ClosedChannelException;
+
+/**
+ * Wrapper around ServerSocketChannel
+ */
+public class TNonblockingServerSocket extends TNonblockingServerTransport {
+
+ /**
+ * This channel is where all the nonblocking magic happens.
+ */
+ private ServerSocketChannel serverSocketChannel = null;
+
+ /**
+ * Underlying serversocket object
+ */
+ private ServerSocket serverSocket_ = null;
+
+ /**
+ * Port to listen on
+ */
+ private int port_ = 0;
+
+ /**
+ * Timeout for client sockets from accept
+ */
+ private int clientTimeout_ = 0;
+
+ /**
+ * Creates a server socket from underlying socket object
+ */
+ // public TNonblockingServerSocket(ServerSocket serverSocket) {
+ // this(serverSocket, 0);
+ // }
+
+ /**
+ * Creates a server socket from underlying socket object
+ */
+ // public TNonblockingServerSocket(ServerSocket serverSocket, int clientTimeout) {
+ // serverSocket_ = serverSocket;
+ // clientTimeout_ = clientTimeout;
+ // }
+
+ /**
+ * Creates just a port listening server socket
+ */
+ public TNonblockingServerSocket(int port) throws TTransportException {
+ this(port, 0);
+ }
+
+ /**
+ * Creates just a port listening server socket
+ */
+ public TNonblockingServerSocket(int port, int clientTimeout) throws TTransportException {
+ port_ = port;
+ clientTimeout_ = clientTimeout;
+ try {
+ serverSocketChannel = ServerSocketChannel.open();
+ serverSocketChannel.configureBlocking(false);
+
+ // Make server socket
+ serverSocket_ = serverSocketChannel.socket();
+ // Prevent 2MSL delay problem on server restarts
+ serverSocket_.setReuseAddress(true);
+ // Bind to listening port
+ serverSocket_.bind(new InetSocketAddress(port_));
+ } catch (IOException ioe) {
+ serverSocket_ = null;
+ throw new TTransportException("Could not create ServerSocket on port " + port + ".");
+ }
+ }
+
+ public void listen() throws TTransportException {
+ // Make sure not to block on accept
+ if (serverSocket_ != null) {
+ try {
+ serverSocket_.setSoTimeout(0);
+ } catch (SocketException sx) {
+ sx.printStackTrace();
+ }
+ }
+ }
+
+ protected TNonblockingSocket acceptImpl() throws TTransportException {
+ if (serverSocket_ == null) {
+ throw new TTransportException(TTransportException.NOT_OPEN, "No underlying server socket.");
+ }
+ try {
+ SocketChannel socketChannel = serverSocketChannel.accept();
+ if (socketChannel == null) {
+ return null;
+ }
+
+ TNonblockingSocket tsocket = new TNonblockingSocket(socketChannel);
+ tsocket.setTimeout(clientTimeout_);
+ return tsocket;
+ } catch (IOException iox) {
+ throw new TTransportException(iox);
+ }
+ }
+
+ public void registerSelector(Selector selector) {
+ try {
+ // Register the server socket channel, indicating an interest in
+ // accepting new connections
+ serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
+ } catch (ClosedChannelException e) {
+ // this shouldn't happen, ideally...
+ // TODO: decide what to do with this.
+ }
+ }
+
+ public void close() {
+ if (serverSocket_ != null) {
+ try {
+ serverSocket_.close();
+ } catch (IOException iox) {
+ System.err.println("WARNING: Could not close server socket: " +
+ iox.getMessage());
+ }
+ serverSocket_ = null;
+ }
+ }
+
+ public void interrupt() {
+ // The thread-safeness of this is dubious, but Java documentation suggests
+ // that it is safe to do this from a different thread context
+ close();
+ }
+
+}
diff --git a/lib/java/src/com/facebook/thrift/transport/TNonblockingServerTransport.java b/lib/java/src/com/facebook/thrift/transport/TNonblockingServerTransport.java
new file mode 100644
index 0000000..7911851
--- /dev/null
+++ b/lib/java/src/com/facebook/thrift/transport/TNonblockingServerTransport.java
@@ -0,0 +1,12 @@
+
+package com.facebook.thrift.transport;
+
+import java.nio.channels.Selector;
+
+/**
+ * Server transport that can be operated in a nonblocking fashion.
+ */
+public abstract class TNonblockingServerTransport extends TServerTransport {
+
+ public abstract void registerSelector(Selector selector);
+}
diff --git a/lib/java/src/com/facebook/thrift/transport/TNonblockingSocket.java b/lib/java/src/com/facebook/thrift/transport/TNonblockingSocket.java
new file mode 100644
index 0000000..07c03e3
--- /dev/null
+++ b/lib/java/src/com/facebook/thrift/transport/TNonblockingSocket.java
@@ -0,0 +1,259 @@
+
+package com.facebook.thrift.transport;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketException;
+
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+
+/**
+ * Socket implementation of the TTransport interface. To be commented soon!
+ */
+public class TNonblockingSocket extends TNonblockingTransport {
+
+ private SocketChannel socketChannel = null;
+
+ /**
+ * Wrapped Socket object
+ */
+ private Socket socket_ = null;
+
+ /**
+ * Remote host
+ */
+ private String host_ = null;
+
+ /**
+ * Remote port
+ */
+ private int port_ = 0;
+
+ /**
+ * Socket timeout
+ */
+ private int timeout_ = 0;
+
+ /**
+ * Constructor that takes an already created socket.
+ *
+ * @param socketChannel Already created SocketChannel object
+ * @throws TTransportException if there is an error setting up the streams
+ */
+ public TNonblockingSocket(SocketChannel socketChannel) throws TTransportException {
+ try {
+ // make it a nonblocking channel
+ socketChannel.configureBlocking(false);
+ } catch (IOException e) {
+ throw new TTransportException(e);
+ }
+
+ this.socketChannel = socketChannel;
+ this.socket_ = socketChannel.socket();
+ try {
+ socket_.setSoLinger(false, 0);
+ socket_.setTcpNoDelay(true);
+ } catch (SocketException sx) {
+ sx.printStackTrace();
+ }
+
+ // if (isOpen()) {
+ // try {
+ // // inputStream_ = new BufferedInputStream(socket_.getInputStream(), 1024);
+ // // outputStream_ = new BufferedOutputStream(socket_.getOutputStream(), 1024);
+ // } catch (IOException iox) {
+ // close();
+ // throw new TTransportException(TTransportException.NOT_OPEN, iox);
+ // }
+ // }
+ }
+
+ // This is all for the clientside stuff. Not sure that we'll actually be supporting that yet.
+ // /**
+ // * Creates a new unconnected socket that will connect to the given host
+ // * on the given port.
+ // *
+ // * @param host Remote host
+ // * @param port Remote port
+ // */
+ // public TNonblockingSocket(String host, int port) {
+ // this(host, port, 0);
+ // }
+ //
+ // /**
+ // * Creates a new unconnected socket that will connect to the given host
+ // * on the given port.
+ // *
+ // * @param host Remote host
+ // * @param port Remote port
+ // * @param timeout Socket timeout
+ // */
+ // public TSocket(String host, int port, int timeout) {
+ // host_ = host;
+ // port_ = port;
+ // timeout_ = timeout;
+ // initSocket();
+ // }
+
+
+ /**
+ * Register this socket with the specified selector for both read and write
+ * operations.
+ *
+ * @param selector
+ * @return the selection key for this socket.
+ */
+ public SelectionKey registerSelector(Selector selector, int interests) throws IOException {
+ // Register the new SocketChannel with our Selector, indicating
+ // we'd like to be notified when there's data waiting to be read
+ return socketChannel.register(selector, interests);
+ }
+
+ /**
+ * Initializes the socket object
+ */
+ private void initSocket() {
+ socket_ = new Socket();
+ try {
+ socket_.setSoLinger(false, 0);
+ socket_.setTcpNoDelay(true);
+ socket_.setSoTimeout(timeout_);
+ } catch (SocketException sx) {
+ sx.printStackTrace();
+ }
+ }
+
+ /**
+ * Sets the socket timeout
+ *
+ * @param timeout Milliseconds timeout
+ */
+ public void setTimeout(int timeout) {
+ timeout_ = timeout;
+ try {
+ socket_.setSoTimeout(timeout);
+ } catch (SocketException sx) {
+ sx.printStackTrace();
+ }
+ }
+
+ /**
+ * Returns a reference to the underlying socket.
+ */
+ public Socket getSocket() {
+ if (socket_ == null) {
+ initSocket();
+ }
+ return socket_;
+ }
+
+ /**
+ * Checks whether the socket is connected.
+ */
+ public boolean isOpen() {
+ if (socket_ == null) {
+ return false;
+ }
+ return socket_.isConnected();
+ }
+
+ /**
+ * Connects the socket, creating a new socket object if necessary.
+ */
+ public void open() throws TTransportException {
+ throw new RuntimeException("Not implemented yet");
+ // if (isOpen()) {
+ // throw new TTransportException(TTransportException.ALREADY_OPEN, "Socket already connected.");
+ // }
+ //
+ // if (host_.length() == 0) {
+ // throw new TTransportException(TTransportException.NOT_OPEN, "Cannot open null host.");
+ // }
+ // if (port_ <= 0) {
+ // throw new TTransportException(TTransportException.NOT_OPEN, "Cannot open without port.");
+ // }
+ //
+ // if (socket_ == null) {
+ // initSocket();
+ // }
+ //
+ // try {
+ // socket_.connect(new InetSocketAddress(host_, port_));
+ // inputStream_ = new BufferedInputStream(socket_.getInputStream(), 1024);
+ // outputStream_ = new BufferedOutputStream(socket_.getOutputStream(), 1024);
+ // } catch (IOException iox) {
+ // close();
+ // throw new TTransportException(TTransportException.NOT_OPEN, iox);
+ // }
+ }
+
+ /**
+ * Perform a nonblocking read into buffer.
+ */
+ public int read(ByteBuffer buffer) throws IOException {
+ return socketChannel.read(buffer);
+ }
+
+
+ /**
+ * Reads from the underlying input stream if not null.
+ */
+ public int read(byte[] buf, int off, int len) throws TTransportException {
+ if ((socketChannel.validOps() & SelectionKey.OP_READ) != SelectionKey.OP_READ) {
+ throw new TTransportException(TTransportException.NOT_OPEN,
+ "Cannot read from write-only socket channel");
+ }
+ try {
+ return socketChannel.read(ByteBuffer.wrap(buf, off, len));
+ } catch (IOException iox) {
+ throw new TTransportException(TTransportException.UNKNOWN, iox);
+ }
+ }
+
+ /**
+ * Perform a nonblocking write of the data in buffer;
+ */
+ public int write(ByteBuffer buffer) throws IOException {
+ return socketChannel.write(buffer);
+ }
+
+ /**
+ * Writes to the underlying output stream if not null.
+ */
+ public void write(byte[] buf, int off, int len) throws TTransportException {
+ if ((socketChannel.validOps() & SelectionKey.OP_WRITE) != SelectionKey.OP_WRITE) {
+ throw new TTransportException(TTransportException.NOT_OPEN,
+ "Cannot write to write-only socket channel");
+ }
+ try {
+ socketChannel.write(ByteBuffer.wrap(buf, off, len));
+ } catch (IOException iox) {
+ throw new TTransportException(TTransportException.UNKNOWN, iox);
+ }
+ }
+
+ /**
+ * Flushes the underlying output stream if not null.
+ */
+ public void flush() throws TTransportException {
+ // Not supported by SocketChannel.
+ }
+
+ /**
+ * Closes the socket.
+ */
+ public void close() {
+ try {
+ socketChannel.close();
+ } catch (IOException e) {
+ // silently ignore.
+ }
+ }
+
+}
diff --git a/lib/java/src/com/facebook/thrift/transport/TNonblockingTransport.java b/lib/java/src/com/facebook/thrift/transport/TNonblockingTransport.java
new file mode 100644
index 0000000..a702490
--- /dev/null
+++ b/lib/java/src/com/facebook/thrift/transport/TNonblockingTransport.java
@@ -0,0 +1,12 @@
+package com.facebook.thrift.transport;
+
+import java.io.IOException;
+import java.nio.channels.Selector;
+import java.nio.channels.SelectionKey;
+import java.nio.ByteBuffer;
+
+public abstract class TNonblockingTransport extends TTransport {
+ public abstract SelectionKey registerSelector(Selector selector, int interests) throws IOException;
+ public abstract int read(ByteBuffer buffer) throws IOException;
+ public abstract int write(ByteBuffer buffer) throws IOException;
+}
diff --git a/lib/java/src/com/facebook/thrift/transport/TTransport.java b/lib/java/src/com/facebook/thrift/transport/TTransport.java
index 2c10870..3d911bf 100644
--- a/lib/java/src/com/facebook/thrift/transport/TTransport.java
+++ b/lib/java/src/com/facebook/thrift/transport/TTransport.java
@@ -56,7 +56,7 @@
throws TTransportException;
/**
- * Guarantees that all of len bytes are
+ * Guarantees that all of len bytes are actually read off the transport.
*
* @param buf Array to read into
* @param off Index to start reading at
@@ -71,7 +71,7 @@
while (got < len) {
ret = read(buf, off+got, len-got);
if (ret <= 0) {
- throw new TTransportException("Cannot read. Remote side has closed.");
+ throw new TTransportException("Cannot read. Remote side has closed. Tried to read " + len + " bytes, but only got " + got + " bytes.");
}
got += ret;
}
diff --git a/test/java/TestNonblockingServer b/test/java/TestNonblockingServer
new file mode 100644
index 0000000..cb330d4
--- /dev/null
+++ b/test/java/TestNonblockingServer
@@ -0,0 +1,2 @@
+#!/bin/bash -v
+java -server -cp thrifttest.jar:../../lib/java/libthrift.jar com.facebook.thrift.test.TestNonblockingServer $*
diff --git a/test/java/src/TestClient.java b/test/java/src/TestClient.java
index 379761d..bc76e83 100644
--- a/test/java/src/TestClient.java
+++ b/test/java/src/TestClient.java
@@ -38,6 +38,8 @@
boolean framedInput = true;
boolean framedOutput = true;
+ int socketTimeout = 1000;
+
try {
for (int i = 0; i < args.length; ++i) {
if (args[i].equals("-h")) {
@@ -53,6 +55,8 @@
url = args[++i];
} else if (args[i].equals("-n")) {
numTests = Integer.valueOf(args[++i]);
+ } else if (args[i].equals("-timeout")) {
+ socketTimeout = Integer.valueOf(args[++i]);
}
}
} catch (Exception x) {
@@ -65,7 +69,7 @@
transport = new THttpClient(url);
} else {
TSocket socket = new TSocket(host, port);
- socket.setTimeout(1000);
+ socket.setTimeout(socketTimeout);
transport = socket;
if (framed) {
transport = new TFramedTransport(transport,
diff --git a/test/java/src/TestNonblockingServer.java b/test/java/src/TestNonblockingServer.java
new file mode 100644
index 0000000..450d67a
--- /dev/null
+++ b/test/java/src/TestNonblockingServer.java
@@ -0,0 +1,70 @@
+package com.facebook.thrift.test;
+
+import com.facebook.thrift.TException;
+import com.facebook.thrift.protocol.TBinaryProtocol;
+import com.facebook.thrift.protocol.TProtocol;
+import com.facebook.thrift.protocol.TProtocolFactory;
+import com.facebook.thrift.server.TServer;
+import com.facebook.thrift.server.TSimpleServer;
+import com.facebook.thrift.server.TNonblockingServer;
+import com.facebook.thrift.server.THsHaServer;
+import com.facebook.thrift.transport.TNonblockingServerSocket;
+import com.facebook.thrift.transport.TNonblockingServerTransport;
+import com.facebook.thrift.transport.TFramedTransport;
+
+// Generated code
+import thrift.test.*;
+
+import java.net.ServerSocket;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.HashSet;
+
+
+public class TestNonblockingServer extends TestServer {
+ public static void main(String [] args) {
+ try {
+ int port = 9090;
+ boolean hsha = false;
+
+ for (int i = 0; i < args.length; i++) {
+ if (args[i].equals("-p")) {
+ port = Integer.valueOf(args[i++]);
+ } else if (args[i].equals("-hsha")) {
+ hsha = true;
+ }
+ }
+
+ // Processor
+ TestHandler testHandler =
+ new TestHandler();
+ ThriftTest.Processor testProcessor =
+ new ThriftTest.Processor(testHandler);
+
+ // Transport
+ TNonblockingServerSocket tServerSocket =
+ new TNonblockingServerSocket(port);
+
+ TServer serverEngine;
+
+ if (hsha) {
+ // HsHa Server
+ serverEngine = new THsHaServer(testProcessor, tServerSocket);
+ } else {
+ // Nonblocking Server
+ serverEngine = new TNonblockingServer(testProcessor, tServerSocket);
+ }
+
+ // Run it
+ System.out.println("Starting the server on port " + port + "...");
+ serverEngine.serve();
+
+ } catch (Exception x) {
+ x.printStackTrace();
+ }
+ System.out.println("done.");
+ }
+}