Thrift-1442: TNonblockingServer: Refactor to allow multiple IO Threads
Client: cpp
Patch: Dave Watson

Adds multiple IO threads to TNonblockingServer


git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1210737 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/concurrency/PosixThreadFactory.cpp b/lib/cpp/src/concurrency/PosixThreadFactory.cpp
index 70204f1..6924aa6 100644
--- a/lib/cpp/src/concurrency/PosixThreadFactory.cpp
+++ b/lib/cpp/src/concurrency/PosixThreadFactory.cpp
@@ -156,7 +156,13 @@
          cause the process to run out of thread resources.
          We're beyond the point of throwing an exception.  Not clear how
          best to handle this. */
-      detached_ = pthread_join(pthread_, &ignore) == 0;
+      int res = pthread_join(pthread_, &ignore);
+      detached_ = (res == 0);
+      if (res != 0) {
+        GlobalOutput.printf("PthreadThread::join(): fail with code %d", res);
+      }
+    } else {
+      GlobalOutput.printf("PthreadThread::join(): detached thread");
     }
   }
 
diff --git a/lib/cpp/src/server/TNonblockingServer.cpp b/lib/cpp/src/server/TNonblockingServer.cpp
index c331eda..ba029a9 100644
--- a/lib/cpp/src/server/TNonblockingServer.cpp
+++ b/lib/cpp/src/server/TNonblockingServer.cpp
@@ -24,6 +24,7 @@
 #include "TNonblockingServer.h"
 #include <concurrency/Exception.h>
 #include <transport/TSocket.h>
+#include <concurrency/PosixThreadFactory.h>
 
 #include <iostream>
 
@@ -50,6 +51,7 @@
 
 #include <errno.h>
 #include <assert.h>
+#include <sched.h>
 
 #ifndef AF_LOCAL
 #define AF_LOCAL AF_UNIX
@@ -63,6 +65,7 @@
 using namespace std;
 using apache::thrift::transport::TSocket;
 using apache::thrift::transport::TTransportException;
+using boost::shared_ptr;
 
 /// Three states for sockets: recv frame size, recv data, and send mode
 enum TSocketState {
@@ -94,6 +97,8 @@
  */
 class TNonblockingServer::TConnection {
  private:
+  /// Server IO Thread handling this connection
+  TNonblockingIOThread* ioThread_;
 
   /// Server handle
   TNonblockingServer* server_;
@@ -209,25 +214,25 @@
   class Task;
 
   /// Constructor
-  TConnection(int socket, short eventFlags, TNonblockingServer *s,
+  TConnection(int socket, TNonblockingIOThread* ioThread,
               const sockaddr* addr, socklen_t addrLen) {
     readBuffer_ = NULL;
     readBufferSize_ = 0;
 
-    // Allocate input and output transports
-    // these only need to be allocated once per TConnection (they don't need to be
-    // reallocated on init() call)
-    inputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer(readBuffer_, readBufferSize_));
-    outputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer(s->getWriteBufferDefaultSize()));
-    tSocket_.reset(new TSocket());
+    ioThread_ = ioThread;
+    server_ = ioThread->getServer();
 
-    init(socket, eventFlags, s, addr, addrLen);
-    server_->incrementNumConnections();
+    // Allocate input and output transports; these only need to be allocated
+    // once per TConnection (they don't need to be reallocated on init() call)
+    inputTransport_.reset(new TMemoryBuffer(readBuffer_, readBufferSize_));
+    outputTransport_.reset(new TMemoryBuffer(
+                                    server_->getWriteBufferDefaultSize()));
+    tSocket_.reset(new TSocket());
+    init(socket, ioThread, addr, addrLen);
   }
 
   ~TConnection() {
     std::free(readBuffer_);
-    server_->decrementNumConnections();
   }
 
  /**
@@ -239,7 +244,7 @@
   void checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit);
 
   /// Initialize
-  void init(int socket, short eventFlags, TNonblockingServer *s,
+  void init(int socket, TNonblockingIOThread* ioThread,
             const sockaddr* addr, socklen_t addrLen);
 
   /**
@@ -263,60 +268,41 @@
   }
 
   /**
-   * C-callable event handler for signaling task completion.  Provides a
-   * callback that libevent can understand that will read a connection
-   * object's address from a pipe and call connection->transition() for
-   * that object.
-   *
-   * @param fd the descriptor the event occurred on.
-   */
-  static void taskHandler(evutil_socket_t fd, short /* which */, void* /* v */) {
-    TConnection* connection;
-    ssize_t nBytes;
-    while ((nBytes = recv(fd, cast_sockopt(&connection), sizeof(TConnection*), 0))
-        == sizeof(TConnection*)) {
-      connection->transition();
-    }
-    if (nBytes > 0) {
-      throw TException("TConnection::taskHandler unexpected partial read");
-    }
-    if (errno != EWOULDBLOCK && errno != EAGAIN) {
-      GlobalOutput.perror("TConnection::taskHandler read failed, resource leak", errno);
-    }
-  }
-
-  /**
    * Notification to server that processing has ended on this request.
    * Can be called either when processing is completed or when a waiting
    * task has been preemptively terminated (on overload).
    *
+   * Don't call this from the IO thread itself.
+   *
    * @return true if successful, false if unable to notify (check errno).
    */
-  bool notifyServer() {
-    TConnection* connection = this;
-    if (send(server_->getNotificationSendFD(), const_cast_sockopt(&connection),
-             sizeof(TConnection*), 0) != sizeof(TConnection*)) {
-      return false;
-    }
+  bool notifyIOThread() {
+    return ioThread_->notify(this);
+  }
 
-    return true;
+  /*
+   * Returns the number of this connection's currently assigned IO
+   * thread.
+   */
+  int getIOThreadNumber() const {
+    return ioThread_->getThreadNumber();
   }
 
   /// Force connection shutdown for this connection.
   void forceClose() {
     appState_ = APP_CLOSE_CONNECTION;
-    if (!notifyServer()) {
+    if (!notifyIOThread()) {
       throw TException("TConnection::forceClose: failed write on notify pipe");
     }
   }
 
   /// return the server this connection was initialized for.
-  TNonblockingServer* getServer() {
+  TNonblockingServer* getServer() const {
     return server_;
   }
 
   /// get state of connection.
-  TAppState getState() {
+  TAppState getState() const {
     return appState_;
   }
 
@@ -362,19 +348,20 @@
         }
       }
     } catch (const TTransportException& ttx) {
-      GlobalOutput.printf("TNonblockingServer client died: %s", ttx.what());
+      GlobalOutput.printf("TNonblockingServer: client died: %s", ttx.what());
     } catch (const bad_alloc&) {
-      GlobalOutput("TNonblockingServer caught bad_alloc exception.");
+      GlobalOutput("TNonblockingServer: caught bad_alloc exception.");
       exit(-1);
     } catch (const std::exception& x) {
-      GlobalOutput.printf("TNonblockingServer process() exception: %s: %s",
+      GlobalOutput.printf("TNonblockingServer: process() exception: %s: %s",
                           typeid(x).name(), x.what());
     } catch (...) {
-      GlobalOutput("TNonblockingServer uncaught exception.");
+      GlobalOutput.printf(
+        "TNonblockingServer: unknown exception while processing.");
     }
 
     // Signal completion back to the libevent thread via a pipe
-    if (!connection_->notifyServer()) {
+    if (!connection_->notifyIOThread()) {
       throw TException("TNonblockingServer::Task::run: failed write on notify pipe");
     }
   }
@@ -392,14 +379,15 @@
   void* connectionContext_;
 };
 
-void TNonblockingServer::TConnection::init(int socket, short eventFlags,
-                                           TNonblockingServer* s,
+void TNonblockingServer::TConnection::init(int socket,
+                                           TNonblockingIOThread* ioThread,
                                            const sockaddr* addr,
                                            socklen_t addrLen) {
   tSocket_->setSocketFD(socket);
   tSocket_->setCachedAddress(addr, addrLen);
 
-  server_ = s;
+  ioThread_ = ioThread;
+  server_ = ioThread->getServer();
   appState_ = APP_INIT;
   eventFlags_ = 0;
 
@@ -412,30 +400,31 @@
   largestWriteBufferSize_ = 0;
 
   socketState_ = SOCKET_RECV_FRAMING;
-  appState_ = APP_INIT;
   callsForResize_ = 0;
 
-  // Set flags, which also registers the event
-  setFlags(eventFlags);
-
   // get input/transports
-  factoryInputTransport_ = s->getInputTransportFactory()->getTransport(inputTransport_);
-  factoryOutputTransport_ = s->getOutputTransportFactory()->getTransport(outputTransport_);
+  factoryInputTransport_ = server_->getInputTransportFactory()->getTransport(
+                             inputTransport_);
+  factoryOutputTransport_ = server_->getOutputTransportFactory()->getTransport(
+                             outputTransport_);
 
   // Create protocol
-  inputProtocol_ = s->getInputProtocolFactory()->getProtocol(factoryInputTransport_);
-  outputProtocol_ = s->getOutputProtocolFactory()->getProtocol(factoryOutputTransport_);
+  inputProtocol_ = server_->getInputProtocolFactory()->getProtocol(
+                     factoryInputTransport_);
+  outputProtocol_ = server_->getOutputProtocolFactory()->getProtocol(
+                     factoryOutputTransport_);
 
   // Set up for any server event handler
   serverEventHandler_ = server_->getEventHandler();
   if (serverEventHandler_ != NULL) {
-    connectionContext_ = serverEventHandler_->createContext(inputProtocol_, outputProtocol_);
+    connectionContext_ = serverEventHandler_->createContext(inputProtocol_,
+                                                            outputProtocol_);
   } else {
     connectionContext_ = NULL;
   }
 
   // Get the processor
-  processor_ = s->getProcessor(inputProtocol_, outputProtocol_, tSocket_);
+  processor_ = server_->getProcessor(inputProtocol_, outputProtocol_, tSocket_);
 }
 
 void TNonblockingServer::TConnection::workSocket() {
@@ -500,7 +489,7 @@
 
       return;
     }
-        
+
     if (got > 0) {
       // Move along in the buffer
       readBufferPos_ += got;
@@ -565,6 +554,9 @@
  * to, or finished receiving the data that it needed to.
  */
 void TNonblockingServer::TConnection::transition() {
+  // ensure this connection is active right now
+  assert(ioThread_);
+  assert(server_);
 
   // Switch upon the state that we are currently in and move to a new state
   switch (appState_) {
@@ -800,7 +792,7 @@
    */
   event_set(&event_, tSocket_->getSocketFD(), eventFlags_,
             TConnection::eventHandler, this);
-  event_base_set(server_->getEventBase(), &event_);
+  event_base_set(ioThread_->getEventBase(), &event_);
 
   // Add the event
   if (event_add(&event_, 0) == -1) {
@@ -820,6 +812,7 @@
   if (serverEventHandler_ != NULL) {
     serverEventHandler_->deleteContext(connectionContext_, inputProtocol_, outputProtocol_);
   }
+  ioThread_ = NULL;
 
   // Close the socket
   tSocket_->close();
@@ -861,14 +854,6 @@
     connectionStack_.pop();
     delete connection;
   }
-
-  if (eventBase_ && ownEventBase_) {
-    event_base_free(eventBase_);
-  }
-
-  if (serverSocket_ >= 0) {
-    close(serverSocket_);
-  }
 }
 
 /**
@@ -876,27 +861,41 @@
  * by allocating a new one entirely
  */
 TNonblockingServer::TConnection* TNonblockingServer::createConnection(
-    int socket, short flags,
-    const sockaddr* addr,
-    socklen_t addrLen) {
+    int socket, const sockaddr* addr, socklen_t addrLen) {
   // Check the stack
+  Guard g(connMutex_);
+
+  // pick an IO thread to handle this connection -- currently round robin
+  assert(nextIOThread_ >= 0);
+  assert(nextIOThread_ < ioThreads_.size());
+  int selectedThreadIdx = nextIOThread_;
+  nextIOThread_ = (nextIOThread_ + 1) % ioThreads_.size();
+
+  TNonblockingIOThread* ioThread = ioThreads_[selectedThreadIdx].get();
+
+  // Check the connection stack to see if we can re-use
+  TConnection* result = NULL;
   if (connectionStack_.empty()) {
-    return new TConnection(socket, flags, this, addr, addrLen);
+    result = new TConnection(socket, ioThread, addr, addrLen);
+    ++numTConnections_;
   } else {
-    TConnection* result = connectionStack_.top();
+    result = connectionStack_.top();
     connectionStack_.pop();
-    result->init(socket, flags, this, addr, addrLen);
-    return result;
+    result->init(socket, ioThread, addr, addrLen);
   }
+  return result;
 }
 
 /**
  * Returns a connection to the stack
  */
 void TNonblockingServer::returnConnection(TConnection* connection) {
+  Guard g(connMutex_);
+
   if (connectionStackLimit_ &&
       (connectionStack_.size() >= connectionStackLimit_)) {
     delete connection;
+    --numTConnections_;
   } else {
     connection->checkIdleBufferMemLimit(idleReadBufferLimit_, idleWriteBufferLimit_);
     connectionStack_.push(connection);
@@ -927,6 +926,7 @@
   while ((clientSocket = ::accept(fd, addrp, &addrLen)) != -1) {
     // If we're overloaded, take action here
     if (overloadAction_ != T_OVERLOAD_NO_ACTION && serverOverloaded()) {
+      Guard g(connMutex_);
       nConnectionsDropped_++;
       nTotalConnectionsDropped_++;
       if (overloadAction_ == T_OVERLOAD_CLOSE_ON_ACCEPT) {
@@ -940,6 +940,7 @@
         }
       }
     }
+
     // Explicitly set this socket to NONBLOCK mode
     int flags;
     if ((flags = fcntl(clientSocket, F_GETFL, 0)) < 0 ||
@@ -951,7 +952,7 @@
 
     // Create a new TConnection for this client socket.
     TConnection* clientConnection =
-      createConnection(clientSocket, EV_READ | EV_PERSIST, addrp, addrLen);
+      createConnection(clientSocket, addrp, addrLen);
 
     // Fail fast if we could not create a TConnection object
     if (clientConnection == NULL) {
@@ -960,13 +961,29 @@
       return;
     }
 
-    // Put this client connection into the proper state
-    clientConnection->transition();
+    /*
+     * Either notify the ioThread that is assigned this connection to
+     * start processing, or if it is us, we'll just ask this
+     * connection to do its initial state change here.
+     *
+     * (We need to avoid writing to our own notification pipe, to
+     * avoid possible deadlocks if the pipe is full.)
+     *
+     * The IO thread #0 is the only one that handles these listen
+     * events, so unless the connection has been assigned to thread #0
+     * we know it's not on our thread.
+     */
+    if (clientConnection->getIOThreadNumber() == 0) {
+      clientConnection->transition();
+    } else {
+      clientConnection->notifyIOThread();
+    }
 
     // addrLen is written by the accept() call, so needs to be set before the next call.
     addrLen = sizeof(addrStorage);
   }
 
+
   // Done looping accept, now we have to make sure the error is due to
   // blocking. Any other error is a problem
   if (errno != EAGAIN && errno != EWOULDBLOCK) {
@@ -977,8 +994,9 @@
 /**
  * Creates a socket to listen on and binds it to the local port.
  */
-void TNonblockingServer::listenSocket() {
+void TNonblockingServer::createAndListenOnSocket() {
   int s;
+
   struct addrinfo hints, *res, *res0;
   int error;
 
@@ -1082,63 +1100,6 @@
   serverSocket_ = s;
 }
 
-void TNonblockingServer::createNotificationPipe() {
-  if(evutil_socketpair(AF_LOCAL, SOCK_STREAM, 0, notificationPipeFDs_) == -1) {
-    GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", EVUTIL_SOCKET_ERROR());
-    throw TException("can't create notification pipe");
-  }
-  if(evutil_make_socket_nonblocking(notificationPipeFDs_[0])<0 ||
-     evutil_make_socket_nonblocking(notificationPipeFDs_[1])<0) {
-    close(notificationPipeFDs_[0]);
-    close(notificationPipeFDs_[1]);
-    throw TException("TNonblockingServer::createNotificationPipe() O_NONBLOCK");
-  }
-}
-
-/**
- * Register the core libevent events onto the proper base.
- */
-void TNonblockingServer::registerEvents(event_base* base, bool ownEventBase) {
-  assert(serverSocket_ != -1);
-  assert(!eventBase_);
-  eventBase_ = base;
-  ownEventBase_ = ownEventBase;
-
-  // Print some libevent stats
-  GlobalOutput.printf("libevent %s method %s",
-          event_get_version(),
-          event_base_get_method(eventBase_));
-
-  // Register the server event
-  event_set(&serverEvent_,
-            serverSocket_,
-            EV_READ | EV_PERSIST,
-            TNonblockingServer::eventHandler,
-            this);
-  event_base_set(eventBase_, &serverEvent_);
-
-  // Add the event and start up the server
-  if (-1 == event_add(&serverEvent_, 0)) {
-    throw TException("TNonblockingServer::serve(): coult not event_add");
-  }
-  if (threadPoolProcessing_) {
-    // Create an event to be notified when a task finishes
-    event_set(&notificationEvent_,
-              getNotificationRecvFD(),
-              EV_READ | EV_PERSIST,
-              TConnection::taskHandler,
-              this);
-
-    // Attach to the base
-    event_base_set(eventBase_, &notificationEvent_);
-
-    // Add the event and start up the server
-    if (-1 == event_add(&notificationEvent_, 0)) {
-      throw TException("TNonblockingServer::serve(): notification event_add fail");
-    }
-  }
-}
-
 void TNonblockingServer::setThreadManager(boost::shared_ptr<ThreadManager> threadManager) {
   threadManager_ = threadManager;
   if (threadManager != NULL) {
@@ -1154,14 +1115,15 @@
   if (numActiveProcessors_ > maxActiveProcessors_ ||
       activeConnections > maxConnections_) {
     if (!overloaded_) {
-      GlobalOutput.printf("thrift non-blocking server overload condition");
+       GlobalOutput.printf("TNonblockingServer: overload condition begun.");
       overloaded_ = true;
     }
   } else {
     if (overloaded_ &&
         (numActiveProcessors_ <= overloadHysteresis_ * maxActiveProcessors_) &&
         (activeConnections <= overloadHysteresis_ * maxConnections_)) {
-      GlobalOutput.printf("thrift non-blocking server overload ended; %u dropped (%llu total)",
+      GlobalOutput.printf("TNonblockingServer: overload ended; "
+                          "%u dropped (%llu total)",
                           nConnectionsDropped_, nTotalConnectionsDropped_);
       nConnectionsDropped_ = 0;
       overloaded_ = false;
@@ -1189,73 +1151,361 @@
 void TNonblockingServer::expireClose(boost::shared_ptr<Runnable> task) {
   TConnection* connection =
     static_cast<TConnection::Task*>(task.get())->getTConnection();
-  assert(connection && connection->getServer()
-	 && connection->getState() == APP_WAIT_TASK);
+  assert(connection && connection->getServer() &&
+         connection->getState() == APP_WAIT_TASK);
   connection->forceClose();
 }
 
+void TNonblockingServer::stop() {
+  // Breaks the event loop in all threads so that they end ASAP.
+  for (int i = 0; i < ioThreads_.size(); ++i) {
+    ioThreads_[i]->stop();
+  }
+}
+
 /**
  * Main workhorse function, starts up the server listening on a port and
  * loops over the libevent handler.
  */
 void TNonblockingServer::serve() {
-  // Init socket
-  listenSocket();
+  // init listen socket
+  createAndListenOnSocket();
 
-  if (threadPoolProcessing_) {
-    // Init task completion notification pipe
-    createNotificationPipe();
+  // set up the IO threads
+  assert(ioThreads_.empty());
+  if (!numIOThreads_) {
+    numIOThreads_ = DEFAULT_IO_THREADS;
   }
 
-  // Initialize libevent core
-  registerEvents(static_cast<event_base*>(event_base_new()), true);
+  for (int id = 0; id < numIOThreads_; ++id) {
+    // the first IO thread also does the listening on server socket
+    int listenFd = (id == 0 ? serverSocket_ : -1);
 
-  // Run the preServe event
+    shared_ptr<TNonblockingIOThread> thread(
+      new TNonblockingIOThread(this, id, listenFd, useHighPriorityIOThreads_));
+    ioThreads_.push_back(thread);
+  }
+
+  // Notify handler of the preServe event
   if (eventHandler_ != NULL) {
     eventHandler_->preServe();
   }
 
-  // Run libevent engine, invokes calls to eventHandler
-  // Only returns if stop() is called.
-  event_base_loop(eventBase_, 0);
+  // Start all of our helper IO threads. Note that the threads run forever,
+  // only terminating if stop() is called.
+  assert(ioThreads_.size() == numIOThreads_);
+  assert(ioThreads_.size() > 0);
+
+  GlobalOutput.printf("TNonblockingServer: Serving on port %d, %d io threads.",
+               port_, ioThreads_.size());
+
+  // Launch all the secondary IO threads in separate threads
+  if (ioThreads_.size() > 1) {
+    ioThreadFactory_.reset(new PosixThreadFactory(
+      PosixThreadFactory::OTHER,  // scheduler
+      PosixThreadFactory::NORMAL, // priority
+      1,                          // stack size (MB)
+      false                       // detached
+    ));
+
+    assert(ioThreadFactory_.get());
+
+    // intentionally starting at thread 1, not 0
+    for (int i = 1; i < ioThreads_.size(); ++i) {
+      shared_ptr<Thread> thread = ioThreadFactory_->newThread(ioThreads_[i]);
+      ioThreads_[i]->setThread(thread);
+      thread->start();
+    }
+  }
+
+  // Run the primary (listener) IO thread loop in our main thread; this will
+  // only return when the server is shutting down.
+  ioThreads_[0]->run();
+
+  // Ensure all threads are finished before exiting serve()
+  for (int i = 0; i < ioThreads_.size(); ++i) {
+    ioThreads_[i]->join();
+    GlobalOutput.printf("TNonblocking: join done for IO thread #%d", i);
+  }
 }
 
-void TNonblockingServer::stop() {
-  if (!eventBase_) {
-    return;
+TNonblockingIOThread::TNonblockingIOThread(TNonblockingServer* server,
+                                           int number,
+                                           int listenSocket,
+                                           bool useHighPriority)
+      : server_(server)
+      , number_(number)
+      , listenSocket_(listenSocket)
+      , useHighPriority_(useHighPriority)
+      , eventBase_(NULL) {
+  notificationPipeFDs_[0] = -1;
+  notificationPipeFDs_[1] = -1;
+}
+
+TNonblockingIOThread::~TNonblockingIOThread() {
+  // make sure our associated thread is fully finished
+  join();
+
+  if (eventBase_) {
+    event_base_free(eventBase_);
   }
 
-  // Call event_base_loopbreak() to tell libevent to exit the loop
-  //
-  // (The libevent documentation doesn't explicitly state that this function is
-  // safe to call from another thread.  However, all it does is set a variable,
-  // in the event_base, so it should be fine.)
+  if (listenSocket_ >= 0) {
+    if (0 != close(listenSocket_)) {
+      GlobalOutput.perror("TNonblockingIOThread listenSocket_ close(): ",
+                          errno);
+    }
+    listenSocket_ = TNonblockingServer::INVALID_SOCKET;
+  }
+
+  for (int i = 0; i < 2; ++i) {
+    if (notificationPipeFDs_[i] >= 0) {
+      if (0 != ::close(notificationPipeFDs_[i])) {
+        GlobalOutput.perror("TNonblockingIOThread notificationPipe close(): ",
+                            errno);
+      }
+      notificationPipeFDs_[i] = TNonblockingServer::INVALID_SOCKET;
+    }
+  }
+}
+
+void TNonblockingIOThread::createNotificationPipe() {
+  if (pipe(notificationPipeFDs_) != 0) {
+    GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", errno);
+    throw TException("can't create notification pipe");
+  }
+  int flags;
+  if ((flags = fcntl(notificationPipeFDs_[0], F_GETFL, 0)) < 0 ||
+      fcntl(notificationPipeFDs_[0], F_SETFL, flags | O_NONBLOCK) < 0) {
+    close(notificationPipeFDs_[0]);
+    close(notificationPipeFDs_[1]);
+    throw TException("TNonblockingServer::createNotificationPipe() O_NONBLOCK");
+  }
+  for (int i = 0; i < 2; ++i) {
+    if ((flags = fcntl(notificationPipeFDs_[i], F_GETFD, 0)) < 0 ||
+        fcntl(notificationPipeFDs_[i], F_SETFD, flags | FD_CLOEXEC) < 0) {
+      close(notificationPipeFDs_[0]);
+      close(notificationPipeFDs_[1]);
+      throw TException("TNonblockingServer::createNotificationPipe() "
+        "FD_CLOEXEC");
+    }
+  }
+}
+
+/**
+ * Register the core libevent events onto the proper base.
+ */
+void TNonblockingIOThread::registerEvents() {
+  if (listenSocket_ >= 0) {
+    // Register the server event
+    event_set(&serverEvent_,
+              listenSocket_,
+              EV_READ | EV_PERSIST,
+              TNonblockingIOThread::listenHandler,
+              server_);
+    event_base_set(eventBase_, &serverEvent_);
+
+    // Add the event and start up the server
+    if (-1 == event_add(&serverEvent_, 0)) {
+      throw TException("TNonblockingServer::serve(): "
+                       "event_add() failed on server listen event");
+    }
+    GlobalOutput.printf("TNonblocking: IO thread #%d registered for listen.",
+                        number_);
+  }
+
+  createNotificationPipe();
+
+  // Create an event to be notified when a task finishes
+  event_set(&notificationEvent_,
+            getNotificationRecvFD(),
+            EV_READ | EV_PERSIST,
+            TNonblockingIOThread::notifyHandler,
+            this);
+
+  // Attach to the base
+  event_base_set(eventBase_, &notificationEvent_);
+
+  // Add the event and start up the server
+  if (-1 == event_add(&notificationEvent_, 0)) {
+    throw TException("TNonblockingServer::serve(): "
+                     "event_add() failed on task-done notification event");
+  }
+  GlobalOutput.printf("TNonblocking: IO thread #%d registered for notify.",
+                      number_);
+}
+
+bool TNonblockingIOThread::notify(TNonblockingServer::TConnection* conn) {
+  int fd = getNotificationSendFD();
+  if (fd < 0) {
+    return false;
+  }
+
+  const int kSize = sizeof(conn);
+  if (write(fd, &conn, kSize) != kSize) {
+    return false;
+  }
+
+  return true;
+}
+
+/* static */
+void TNonblockingIOThread::notifyHandler(int fd, short which, void* v) {
+  TNonblockingIOThread* ioThread = (TNonblockingIOThread*) v;
+  assert(ioThread);
+
+  while (true) {
+    TNonblockingServer::TConnection* connection = 0;
+    const int kSize = sizeof(connection);
+    int nBytes = read(fd, &connection, kSize);
+    if (nBytes == kSize) {
+      if (connection == NULL) {
+        // this is the command to stop our thread, exit the handler!
+        return;
+      }
+      connection->transition();
+    } else if (nBytes > 0) {
+      // a partial pointer read cannot be recovered from; treat it as fatal
+      GlobalOutput.printf("notifyHandler: Bad read of %d bytes, wanted %d",
+                          nBytes, kSize);
+      ioThread->breakLoop(true);
+      return;
+    } else if (nBytes == 0) {
+      GlobalOutput.printf("notifyHandler: Notify socket closed!");
+      // exit the loop
+      break;
+    } else { // nBytes < 0
+      if (errno != EWOULDBLOCK && errno != EAGAIN) {
+          GlobalOutput.perror(
+            "TNonblocking: notifyHandler read() failed: ", errno);
+          ioThread->breakLoop(true);
+          return;
+      }
+      // exit the loop
+      break;
+    }
+  }
+}
+
+void TNonblockingIOThread::breakLoop(bool error) {
+  if (error) {
+    GlobalOutput.printf(
+      "TNonblockingServer: IO thread #%d exiting with error.", number_);
+    // TODO: figure out something better to do here, but for now kill the
+    // whole process.
+    GlobalOutput.printf("TNonblockingServer: aborting process.");
+    ::abort();
+  }
+
+  // sets a flag so that the loop exits on the next event
   event_base_loopbreak(eventBase_);
 
-  // event_base_loopbreak() only causes the loop to exit the next time it wakes
-  // up.  We need to force it to wake up, in case there are no real events
-  // it needs to process.
+  // event_base_loopbreak() only causes the loop to exit the next time
+  // it wakes up.  We need to force it to wake up, in case there are
+  // no real events it needs to process.
   //
-  // Attempt to connect to the server socket.  If anything fails,
-  // we'll just have to wait until libevent wakes up on its own.
-  //
-  // First create a socket
-  int fd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
-  if (fd < 0) {
-    return;
+  // If we're running in the same thread, we can't use the notify(0)
+  // mechanism to stop the thread, but happily if we're running in the
+  // same thread, this means the thread can't be blocking in the event
+  // loop either.
+  if (!pthread_equal(pthread_self(), threadId_)) {
+    notify(NULL);
+  }
+}
+
+void TNonblockingIOThread::setCurrentThreadHighPriority(bool value) {
+  // Start out with a standard, low-priority setup for the sched params.
+  struct sched_param sp;
+  bzero((void*) &sp, sizeof(sp));
+  int policy = SCHED_OTHER;
+
+  // If desired, set up high-priority sched params structure.
+  if (value) {
+    // FIFO scheduler, ranked above default SCHED_OTHER queue
+    policy = SCHED_FIFO;
+    // The priority only compares us to other SCHED_FIFO threads, so we
+    // just pick an arbitrary priority: the midpoint between min & max.
+    const int priority = (sched_get_priority_max(policy) +
+                          sched_get_priority_min(policy)) / 2;
+
+    sp.sched_priority = priority;
   }
 
-  // Set up the address
-  struct sockaddr_in addr;
-  addr.sin_family = AF_INET;
-  addr.sin_addr.s_addr = htonl(0x7f000001); // 127.0.0.1
-  addr.sin_port = htons(port_);
+  // Actually set the sched params for the current thread.
+  if (0 == pthread_setschedparam(pthread_self(), policy, &sp)) {
+    GlobalOutput.printf(
+      "TNonblocking: IO Thread #%d using high-priority scheduler!", number_);
+  } else {
+    GlobalOutput.perror("TNonblocking: pthread_setschedparam(): ", errno);
+  }
+}
 
-  // Finally do the connect().
-  // We don't care about the return value;
-  // we're just going to close the socket either way.
-  connect(fd, reinterpret_cast<struct sockaddr*>(&addr), sizeof(addr));
-  close(fd);
+void TNonblockingIOThread::run() {
+  threadId_ = pthread_self();
+
+  assert(eventBase_ == 0);
+  eventBase_ = event_base_new();
+
+  // Print some libevent stats
+  if (number_ == 0) {
+    GlobalOutput.printf("TNonblockingServer: using libevent %s method %s",
+            event_get_version(),
+            event_base_get_method(eventBase_));
+  }
+
+
+  registerEvents();
+
+  GlobalOutput.printf("TNonblockingServer: IO thread #%d entering loop...",
+                      number_);
+
+  if (useHighPriority_) {
+    setCurrentThreadHighPriority(true);
+  }
+
+  // Run libevent engine, invokes calls to eventHandler; returns on loopbreak
+  event_base_loop(eventBase_, 0);
+
+  if (useHighPriority_) {
+    setCurrentThreadHighPriority(false);
+  }
+
+  // cleans up our registered events
+  cleanupEvents();
+
+  GlobalOutput.printf("TNonblockingServer: IO thread #%d run() done!",
+    number_);
+}
+
+void TNonblockingIOThread::cleanupEvents() {
+  // stop the listen socket, if any
+  if (listenSocket_ >= 0) {
+    if (event_del(&serverEvent_) == -1) {
+      GlobalOutput.perror("TNonblockingIOThread::stop() event_del: ", errno);
+    }
+  }
+
+  event_del(&notificationEvent_);
+}
+
+
+void TNonblockingIOThread::stop() {
+  // This should cause the thread to fall out of its event loop ASAP.
+  breakLoop(false);
+}
+
+void TNonblockingIOThread::join() {
+  // If this was a thread created by a factory (not the thread that called
+  // serve()), we join() it to make sure we shut down fully.
+  if (thread_) {
+    try {
+      // Note that it is safe both to join() the same thread twice and to join
+      // the current thread, as the pthread implementation checks for deadlock.
+      thread_->join();
+    } catch(...) {
+      // swallow everything
+    }
+  }
 }
 
 }}} // apache::thrift::server
diff --git a/lib/cpp/src/server/TNonblockingServer.h b/lib/cpp/src/server/TNonblockingServer.h
index da36045..84e384c 100644
--- a/lib/cpp/src/server/TNonblockingServer.h
+++ b/lib/cpp/src/server/TNonblockingServer.h
@@ -26,7 +26,11 @@
 #include <transport/TSocket.h>
 #include <concurrency/ThreadManager.h>
 #include <climits>
+#include <concurrency/Thread.h>
+#include <concurrency/PosixThreadFactory.h>
+#include <concurrency/Mutex.h>
 #include <stack>
+#include <vector>
 #include <string>
 #include <errno.h>
 #include <cstdlib>
@@ -35,6 +39,8 @@
 #endif
 #include <event.h>
 
+
+
 namespace apache { namespace thrift { namespace server {
 
 using apache::thrift::transport::TMemoryBuffer;
@@ -42,6 +48,11 @@
 using apache::thrift::protocol::TProtocol;
 using apache::thrift::concurrency::Runnable;
 using apache::thrift::concurrency::ThreadManager;
+using apache::thrift::concurrency::PosixThreadFactory;
+using apache::thrift::concurrency::ThreadFactory;
+using apache::thrift::concurrency::Thread;
+using apache::thrift::concurrency::Mutex;
+using apache::thrift::concurrency::Guard;
 
 #ifdef LIBEVENT_VERSION_NUMBER
 #define LIBEVENT_VERSION_MAJOR (LIBEVENT_VERSION_NUMBER >> 24)
@@ -78,9 +89,10 @@
 }
 
 /**
- * This is a non-blocking server in C++ for high performance that operates a
- * single IO thread. It assumes that all incoming requests are framed with a
- * 4 byte length indicator and writes out responses using the same framing.
+ * This is a non-blocking server in C++ for high performance that
+ * operates a set of IO threads (by default only one). It assumes that
+ * all incoming requests are framed with a 4 byte length indicator and
+ * writes out responses using the same framing.
  *
  * It does not use the TServerTransport framework, but rather has socket
  * operations hardcoded for use with select.
@@ -95,10 +107,14 @@
   T_OVERLOAD_DRAIN_TASK_QUEUE  ///< Drop some tasks from head of task queue */
 };
 
+class TNonblockingIOThread;
+
 class TNonblockingServer : public TServer {
  private:
   class TConnection;
 
+  friend class TNonblockingIOThread;
+ private:
   /// Listen backlog
   static const int LISTEN_BACKLOG = 1024;
 
@@ -123,6 +139,18 @@
   /// # of calls before resizing oversized buffers (0 = check only on close)
   static const int RESIZE_BUFFER_EVERY_N = 512;
 
+  /// # of IO threads to use by default
+  static const int DEFAULT_IO_THREADS = 1;
+
+  /// File descriptor of an invalid socket
+  static const int INVALID_SOCKET = -1;
+
+  /// # of IO threads this server will use
+  size_t numIOThreads_;
+
+  /// Whether to set high scheduling priority for IO threads
+  bool useHighPriorityIOThreads_;
+
   /// Server socket file descriptor
   int serverSocket_;
 
@@ -135,15 +163,17 @@
   /// Is thread pool processing?
   bool threadPoolProcessing_;
 
-  /// The event base for libevent
-  event_base* eventBase_;
-  bool ownEventBase_;
+  /// Factory to create the IO threads
+  boost::shared_ptr<PosixThreadFactory> ioThreadFactory_;
 
-  /// Event struct, used with eventBase_ for connection events
-  struct event serverEvent_;
+  /// Vector of IOThread objects that will handle our IO
+  std::vector<boost::shared_ptr<TNonblockingIOThread> > ioThreads_;
 
-  /// Event struct, used with eventBase_ for task completion notification
-  struct event notificationEvent_;
+  /// Index of next IO Thread to be used (for round-robin)
+  int nextIOThread_;
+
+  /// Synchronizes access to connection stack and similar data
+  Mutex connMutex_;
 
   /// Number of TConnection object we've created
   size_t numTConnections_;
@@ -211,9 +241,6 @@
   /// Count of connections dropped on overload since server started
   uint64_t nTotalConnectionsDropped_;
 
-  /// File descriptors for pipe used for task completion notification.
-  evutil_socket_t notificationPipeFDs_[2];
-
   /**
    * This is a stack of all the objects that have been created but that
    * are NOT currently in use. When we close a connection, we place it on this
@@ -234,10 +261,11 @@
 
   void init(int port) {
     serverSocket_ = -1;
+    numIOThreads_ = DEFAULT_IO_THREADS;
+    nextIOThread_ = 0;
+    useHighPriorityIOThreads_ = false;
     port_ = port;
     threadPoolProcessing_ = false;
-    eventBase_ = NULL;
-    ownEventBase_ = false;
     numTConnections_ = 0;
     numActiveProcessors_ = 0;
     connectionStackLimit_ = CONNECTION_STACK_LIMIT;
@@ -360,6 +388,31 @@
   }
 
   /**
+   * Sets the number of IO threads used by this server. Can only be used before
+   * the call to serve() and has no effect afterwards.  We always use a
+   * PosixThreadFactory for the IO worker threads, because they must be
+   * joinable for clean shutdown.
+   */
+  void setNumIOThreads(size_t numThreads) {
+    numIOThreads_ = numThreads;
+  }
+
+  /** Return whether the IO threads will get high scheduling priority */
+  bool useHighPriorityIOThreads() const {
+    return useHighPriorityIOThreads_;
+  }
+
+  /** Set whether the IO threads will get high scheduling priority. */
+  void setUseHighPriorityIOThreads(bool val) {
+    useHighPriorityIOThreads_ = val;
+  }
+
+  /** Return the number of IO threads used by this server. */
+  size_t getNumIOThreads() const {
+    return numIOThreads_;
+  }
+
+  /**
    * Get the maximum number of unused TConnection we will hold in reserve.
    *
    * @return the current limit on TConnection pool size.
@@ -385,20 +438,6 @@
     threadManager_->add(task, 0LL, taskExpireTime_);
   }
 
-  event_base* getEventBase() const {
-    return eventBase_;
-  }
-
-  /// Increment our count of the number of connected sockets.
-  void incrementNumConnections() {
-    ++numTConnections_;
-  }
-
-  /// Decrement our count of the number of connected sockets.
-  void decrementNumConnections() {
-    --numTConnections_;
-  }
-
   /**
    * Return the count of sockets currently connected to.
    *
@@ -431,11 +470,13 @@
 
   /// Increment the count of connections currently processing.
   void incrementActiveProcessors() {
+    Guard g(connMutex_);
     ++numActiveProcessors_;
   }
 
   /// Decrement the count of connections currently processing.
   void decrementActiveProcessors() {
+    Guard g(connMutex_);
     if (numActiveProcessors_ > 0) {
       --numActiveProcessors_;
     }
@@ -615,7 +656,7 @@
     idleReadBufferLimit_ = limit;
   }
 
-  
+
 
   /**
    * Get the maximum size of write buffer allocated to idle TConnection objects.
@@ -659,21 +700,48 @@
     resizeBufferEveryN_ = count;
   }
 
+  /**
+   * Main workhorse function, starts up the server listening on a port and
+   * loops over the libevent handler.
+   */
+  void serve();
 
+  /**
+   * Causes the server to terminate gracefully (can be called from any thread).
+   */
+  void stop();
 
+ private:
+  /**
+   * Callback function that the threadmanager calls when a task reaches
+   * its expiration time.  It is needed to clean up the expired connection.
+   *
+   * @param task the runnable associated with the expired task.
+   */
+  void expireClose(boost::shared_ptr<Runnable> task);
+
+  /// Creates a socket to listen on and binds it to the local port.
+  void createAndListenOnSocket();
+
+  /**
+   * Takes a socket created by createAndListenOnSocket() and sets various
+   * options on it to prepare for use in the server.
+   *
+   * @param fd descriptor of socket to be initialized.
+   */
+  void listenSocket(int fd);
   /**
    * Return an initialized connection object.  Creates or recovers from
    * pool a TConnection and initializes it with the provided socket FD
    * and flags.
    *
    * @param socket FD of socket associated with this connection.
-   * @param flags initial lib_event flags for this connection.
    * @param addr the sockaddr of the client
    * @param addrLen the length of addr
    * @return pointer to initialized TConnection object.
    */
-  TConnection* createConnection(int socket, short flags,
-                                const sockaddr* addr, socklen_t addrLen);
+  TConnection* createConnection(int socket, const sockaddr* addr,
+                                            socklen_t addrLen);
 
   /**
    * Returns a connection to pool or deletion.  If the connection pool
@@ -683,14 +751,67 @@
    * @param connection the TConection being returned.
    */
   void returnConnection(TConnection* connection);
+};
 
+class TNonblockingIOThread : public Runnable {
+ public:
+  // Creates an IO thread and sets up the event base.  The listenSocket should
+  // be a valid FD on which listen() has already been called.  If the
+  // listenSocket is < 0, accepting will not be done.
+  TNonblockingIOThread(TNonblockingServer* server,
+                       int number,
+                       int listenSocket,
+                       bool useHighPriority);
+
+  ~TNonblockingIOThread();
+
+  // Returns the event-base for this thread.
+  event_base* getEventBase() const { return eventBase_; }
+
+  // Returns the server for this thread.
+  TNonblockingServer* getServer() const { return server_; }
+
+  // Returns the number of this IO thread.
+  int getThreadNumber() const { return number_; }
+
+  // Returns the thread id associated with this object.  This should
+  // only be called after the thread has been started.
+  pthread_t getThreadId() const { return threadId_; }
+
+  // Returns the send-fd for task complete notifications.
+  int getNotificationSendFD() const { return notificationPipeFDs_[1]; }
+
+  // Returns the read-fd for task complete notifications.
+  int getNotificationRecvFD() const { return notificationPipeFDs_[0]; }
+
+  // Returns the actual thread object associated with this IO thread.
+  boost::shared_ptr<Thread> getThread() const { return thread_; }
+
+  // Sets the actual thread object associated with this IO thread.
+  void setThread(const boost::shared_ptr<Thread>& t) { thread_ = t; }
+
+  // Used by TConnection objects to indicate processing has finished.
+  bool notify(TNonblockingServer::TConnection* conn);
+
+  // Enters the event loop and does not return until a call to stop().
+  virtual void run();
+
+  // Exits the event loop as soon as possible.
+  void stop();
+
+  // Ensures that the event-loop thread is fully finished and shut down.
+  void join();
+
+ private:
   /**
-   * Callback function that the threadmanager calls when a task reaches
-   * its expiration time.  It is needed to clean up the expired connection.
+   * C-callable event handler for signaling task completion.  Provides a
+   * callback that libevent can understand that will read a connection
+   * object's address from a pipe and call connection->transition() for
+   * that object.
    *
-   * @param task the runnable associated with the expired task.
+   * @param fd the descriptor the event occurred on.
    */
-  void expireClose(boost::shared_ptr<Runnable> task);
+  static void notifyHandler(int fd, short which, void* v);
 
   /**
    * C-callable event handler for listener events.  Provides a callback
@@ -700,63 +821,57 @@
    * @param which the flags associated with the event.
    * @param v void* callback arg where we placed TNonblockingServer's "this".
    */
-  static void eventHandler(evutil_socket_t fd, short which, void* v) {
+  static void listenHandler(evutil_socket_t fd, short which, void* v) {
     ((TNonblockingServer*)v)->handleEvent(fd, which);
   }
 
-  /// Creates a socket to listen on and binds it to the local port.
-  void listenSocket();
+  /// Exits the loop ASAP in case of shutdown or error.
+  void breakLoop(bool error);
 
-  /**
-   * Takes a socket created by listenSocket() and sets various options on it
-   * to prepare for use in the server.
-   *
-   * @param fd descriptor of socket to be initialized/
-   */
-  void listenSocket(int fd);
+  /// Registers the events for the notification & listen sockets
+  void registerEvents();
 
   /// Create the pipe used to notify I/O process of task completion.
   void createNotificationPipe();
 
-  /**
-   * Get notification pipe send descriptor.
-   *
-   * @return write fd for pipe.
-   */
-  evutil_socket_t getNotificationSendFD() const {
-    return notificationPipeFDs_[1];
-  }
+  /// Unregisters our events for notification and listen sockets.
+  void cleanupEvents();
 
-  /**
-   * Get notification pipe receive descriptor.
-   *
-   * @return read fd of pipe.
-   */
-  evutil_socket_t getNotificationRecvFD() const {
-    return notificationPipeFDs_[0];
-  }
+  /// Sets (or clears) high priority scheduling status for the current thread.
+  void setCurrentThreadHighPriority(bool value);
 
-  /**
-   * Register the core libevent events onto the proper base.
-   *
-   * @param base pointer to the event base to be initialized.
-   * @param ownEventBase if true, this server is responsible for
-   * freeing the event base memory.
-   */
-  void registerEvents(event_base* base, bool ownEventBase = true);
+ private:
+  /// associated server
+  TNonblockingServer* server_;
 
-  /**
-   * Main workhorse function, starts up the server listening on a port and
-   * loops over the libevent handler.
-   */
-  void serve();
+  /// thread number (for debugging).
+  const int number_;
 
-  /**
-   * May be called from a separate thread to cause serve() to return.
-   */
-  void stop();
+  /// The actual physical thread id.
+  pthread_t threadId_;
+
+  /// If listenSocket_ >= 0, adds an event on the event_base to accept conns
+  int listenSocket_;
+
+  /// Sets a high scheduling priority when running
+  bool useHighPriority_;
+
+  /// pointer to eventbase to be used for looping
+  event_base* eventBase_;
+
+  /// Used with eventBase_ for connection events (only in listener thread)
+  struct event serverEvent_;
+
+  /// Used with eventBase_ for task completion notification
+  struct event notificationEvent_;
+
+  /// File descriptors for pipe used for task completion notification.
+  int notificationPipeFDs_[2];
+
+  /// Actual IO Thread
+  boost::shared_ptr<Thread> thread_;
 };
 
 }}} // apache::thrift::server
 
-#endif // #ifndef _THRIFT_SERVER_TSIMPLESERVER_H_
+#endif // #ifndef _THRIFT_SERVER_TNONBLOCKINGSERVER_H_