THRIFT-1552 Include paths for c/c++ should be prefixed with 'thrift/'

To ensure there are no include path collisions, the C and C++ header
include paths should use 'thrift' as the root directory. This avoids
having to place /usr/include/thrift in the compiler's header search
path, which might otherwise result in the compiler accidentally picking
up headers that it shouldn't.

e.g. #include <foo/bar.h> should be #include <thrift/foo/bar.h>

Change-Id: I48f2b0f549bda0fc81e85506ac857adc800b98a1

git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1325674 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/thrift/server/TNonblockingServer.cpp b/lib/cpp/src/thrift/server/TNonblockingServer.cpp
new file mode 100644
index 0000000..3e95508
--- /dev/null
+++ b/lib/cpp/src/thrift/server/TNonblockingServer.cpp
@@ -0,0 +1,1535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#define __STDC_FORMAT_MACROS
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include "TNonblockingServer.h"
+#include <thrift/concurrency/Exception.h>
+#include <thrift/transport/TSocket.h>
+#include <thrift/concurrency/PlatformThreadFactory.h>
+
+#include <iostream>
+
+#ifdef HAVE_SYS_SOCKET_H
+#include <sys/socket.h>
+#endif
+
+#ifdef HAVE_NETINET_IN_H
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#endif
+
+#ifdef HAVE_ARPA_INET_H
+#include <arpa/inet.h>
+#endif
+
+#ifdef HAVE_NETDB_H
+#include <netdb.h>
+#endif
+
+#ifdef HAVE_FCNTL_H
+#include <fcntl.h>
+#endif
+
+#include <errno.h>
+#include <assert.h>
+
+#ifdef HAVE_SCHED_H
+#include <sched.h>
+#endif
+
+#ifndef AF_LOCAL
+#define AF_LOCAL AF_UNIX
+#endif
+
+#ifdef _MSC_VER
+#define PRIu32 "I32u"
+#endif
+
+namespace apache { namespace thrift { namespace server {
+
+using namespace apache::thrift::protocol;
+using namespace apache::thrift::transport;
+using namespace apache::thrift::concurrency;
+using namespace std;
+using apache::thrift::transport::TSocket;
+using apache::thrift::transport::TTransportException;
+using boost::shared_ptr;
+
+/// Three states for sockets: recv frame size, recv data, and send mode
+enum TSocketState {
+  SOCKET_RECV_FRAMING,
+  SOCKET_RECV,
+  SOCKET_SEND
+};
+
+/**
+ * Six states for the nonblocking server:
+ *  1) initialize
+ *  2) read 4 byte frame size
+ *  3) read frame of data
+ *  4) wait for a dispatched task to complete (thread pool mode)
+ *  5) send back data (if any)
+ *  6) force immediate connection close
+ */
+enum TAppState {
+  APP_INIT,
+  APP_READ_FRAME_SIZE,
+  APP_READ_REQUEST,
+  APP_WAIT_TASK,
+  APP_SEND_RESULT,
+  APP_CLOSE_CONNECTION
+};
+
+/**
+ * Represents a connection that is handled via libevent. This connection
+ * essentially encapsulates a socket that has some associated libevent state.
+ */
+class TNonblockingServer::TConnection {
+ private:
+  /// Server IO Thread handling this connection
+  TNonblockingIOThread* ioThread_;
+
+  /// Server handle
+  TNonblockingServer* server_;
+
+  /// TProcessor
+  boost::shared_ptr<TProcessor> processor_;
+
+  /// Object wrapping network socket
+  boost::shared_ptr<TSocket> tSocket_;
+
+  /// Libevent object
+  struct event event_;
+
+  /// Libevent flags
+  short eventFlags_;
+
+  /// Socket mode
+  TSocketState socketState_;
+
+  /// Application state
+  TAppState appState_;
+
+  /// How much data still needs to be read
+  uint32_t readWant_;
+
+  /// Where in the read buffer are we
+  uint32_t readBufferPos_;
+
+  /// Read buffer
+  uint8_t* readBuffer_;
+
+  /// Read buffer size
+  uint32_t readBufferSize_;
+
+  /// Write buffer
+  uint8_t* writeBuffer_;
+
+  /// Write buffer size
+  uint32_t writeBufferSize_;
+
+  /// How far through writing are we?
+  uint32_t writeBufferPos_;
+
+  /// Largest size of write buffer seen since buffer was constructed
+  size_t largestWriteBufferSize_;
+
+  /// Count of the number of calls for use with getResizeBufferEveryN().
+  int32_t callsForResize_;
+
+  /// Task handle
+  int taskHandle_;
+
+  /// Task event
+  struct event taskEvent_;
+
+  /// Transport to read from
+  boost::shared_ptr<TMemoryBuffer> inputTransport_;
+
+  /// Transport that processor writes to
+  boost::shared_ptr<TMemoryBuffer> outputTransport_;
+
+  /// extra transport generated by transport factory (e.g. BufferedRouterTransport)
+  boost::shared_ptr<TTransport> factoryInputTransport_;
+  boost::shared_ptr<TTransport> factoryOutputTransport_;
+
+  /// Protocol decoder
+  boost::shared_ptr<TProtocol> inputProtocol_;
+
+  /// Protocol encoder
+  boost::shared_ptr<TProtocol> outputProtocol_;
+
+  /// Server event handler, if any
+  boost::shared_ptr<TServerEventHandler> serverEventHandler_;
+
+  /// Thrift call context, if any
+  void *connectionContext_;
+
+  /// Go into read mode
+  void setRead() {
+    setFlags(EV_READ | EV_PERSIST);
+  }
+
+  /// Go into write mode
+  void setWrite() {
+    setFlags(EV_WRITE | EV_PERSIST);
+  }
+
+  /// Set socket idle
+  void setIdle() {
+    setFlags(0);
+  }
+
+  /**
+   * Set event flags for this connection.
+   *
+   * @param eventFlags flags we pass to libevent for the connection.
+   */
+  void setFlags(short eventFlags);
+
+  /**
+   * Libevent handler called (via our static wrapper) when the connection
+   * socket had something happen.  Rather than use the flags libevent passed,
+   * we use the connection state to determine whether we need to read or
+   * write the socket.
+   */
+  void workSocket();
+
+  /// Close this connection and free or reset its resources.
+  void close();
+
+ public:
+
+  class Task;
+
+  /// Constructor
+  TConnection(int socket, TNonblockingIOThread* ioThread,
+              const sockaddr* addr, socklen_t addrLen) {
+    readBuffer_ = NULL;
+    readBufferSize_ = 0;
+
+    ioThread_ = ioThread;
+    server_ = ioThread->getServer();
+
+    // Allocate input and output transports; these only need to be allocated
+    // once per TConnection (they don't need to be reallocated on each init() call)
+    inputTransport_.reset(new TMemoryBuffer(readBuffer_, readBufferSize_));
+    outputTransport_.reset(new TMemoryBuffer(
+                                    server_->getWriteBufferDefaultSize()));
+    tSocket_.reset(new TSocket());
+    init(socket, ioThread, addr, addrLen);
+  }
+
+  ~TConnection() {
+    std::free(readBuffer_);
+  }
+
+  /**
+   * Check buffers against any size limits and shrink them if exceeded.
+   *
+   * @param readLimit we reduce read buffer size to this (if nonzero).
+   * @param writeLimit if nonzero and write buffer is larger, replace it.
+   */
+  void checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit);
+
+  /// Initialize
+  void init(int socket, TNonblockingIOThread* ioThread,
+            const sockaddr* addr, socklen_t addrLen);
+
+  /**
+   * This is called when the application transitions from one state into
+   * another. This means that it has finished writing the data that it needed
+   * to, or finished receiving the data that it needed to.
+   */
+  void transition();
+
+  /**
+   * C-callable event handler for connection events.  Provides a callback
+   * that libevent can understand which invokes connection_->workSocket().
+   *
+   * @param fd the descriptor the event occurred on.
+   * @param which the flags associated with the event.
+   * @param v void* callback arg where we placed TConnection's "this".
+   */
+  static void eventHandler(evutil_socket_t fd, short /* which */, void* v) {
+    assert(fd == ((TConnection*)v)->getTSocket()->getSocketFD());
+    ((TConnection*)v)->workSocket();
+  }
+
+  /**
+   * Notification to server that processing has ended on this request.
+   * Can be called either when processing is completed or when a waiting
+   * task has been preemptively terminated (on overload).
+   *
+   * Don't call this from the IO thread itself.
+   *
+   * @return true if successful, false if unable to notify (check errno).
+   */
+  bool notifyIOThread() {
+    return ioThread_->notify(this);
+  }
+
+  /**
+   * Returns the number of this connection's currently assigned IO
+   * thread.
+   */
+  int getIOThreadNumber() const {
+    return ioThread_->getThreadNumber();
+  }
+
+  /// Force connection shutdown for this connection.
+  void forceClose() {
+    appState_ = APP_CLOSE_CONNECTION;
+    if (!notifyIOThread()) {
+      throw TException("TConnection::forceClose: failed write on notify pipe");
+    }
+  }
+
+  /// return the server this connection was initialized for.
+  TNonblockingServer* getServer() const {
+    return server_;
+  }
+
+  /// get state of connection.
+  TAppState getState() const {
+    return appState_;
+  }
+
+  /// return the TSocket transport wrapping this network connection
+  boost::shared_ptr<TSocket> getTSocket() const {
+    return tSocket_;
+  }
+
+  /// return the server event handler if any
+  boost::shared_ptr<TServerEventHandler> getServerEventHandler() {
+    return serverEventHandler_;
+  }
+
+  /// return the Thrift connection context if any
+  void* getConnectionContext() {
+    return connectionContext_;
+  }
+
+};
+
+class TNonblockingServer::TConnection::Task: public Runnable {
+ public:
+  Task(boost::shared_ptr<TProcessor> processor,
+       boost::shared_ptr<TProtocol> input,
+       boost::shared_ptr<TProtocol> output,
+       TConnection* connection) :
+    processor_(processor),
+    input_(input),
+    output_(output),
+    connection_(connection),
+    serverEventHandler_(connection_->getServerEventHandler()),
+    connectionContext_(connection_->getConnectionContext()) {}
+
+  void run() {
+    try {
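+      // Keep handling requests on this connection for as long as process()
+      // succeeds and the input transport still has buffered data to peek at.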
+      for (;;) {
+        if (serverEventHandler_ != NULL) {
+          serverEventHandler_->processContext(connectionContext_, connection_->getTSocket());
+        }
+        if (!processor_->process(input_, output_, connectionContext_) ||
+            !input_->getTransport()->peek()) {
+          break;
+        }
+      }
+    } catch (const TTransportException& ttx) {
+      GlobalOutput.printf("TNonblockingServer: client died: %s", ttx.what());
+    } catch (const bad_alloc&) {
+      GlobalOutput("TNonblockingServer: caught bad_alloc exception.");
+      exit(-1);
+    } catch (const std::exception& x) {
+      GlobalOutput.printf("TNonblockingServer: process() exception: %s: %s",
+                          typeid(x).name(), x.what());
+    } catch (...) {
+      GlobalOutput.printf(
+        "TNonblockingServer: unknown exception while processing.");
+    }
+
+    // Signal completion back to the libevent thread via a pipe
+    if (!connection_->notifyIOThread()) {
+      throw TException("TNonblockingServer::Task::run: failed write on notify pipe");
+    }
+  }
+
+  TConnection* getTConnection() {
+    return connection_;
+  }
+
+ private:
+  boost::shared_ptr<TProcessor> processor_;
+  boost::shared_ptr<TProtocol> input_;
+  boost::shared_ptr<TProtocol> output_;
+  TConnection* connection_;
+  boost::shared_ptr<TServerEventHandler> serverEventHandler_;
+  void* connectionContext_;
+};
+
+void TNonblockingServer::TConnection::init(int socket,
+                                           TNonblockingIOThread* ioThread,
+                                           const sockaddr* addr,
+                                           socklen_t addrLen) {
+  tSocket_->setSocketFD(socket);
+  tSocket_->setCachedAddress(addr, addrLen);
+
+  ioThread_ = ioThread;
+  server_ = ioThread->getServer();
+  appState_ = APP_INIT;
+  eventFlags_ = 0;
+
+  readBufferPos_ = 0;
+  readWant_ = 0;
+
+  writeBuffer_ = NULL;
+  writeBufferSize_ = 0;
+  writeBufferPos_ = 0;
+  largestWriteBufferSize_ = 0;
+
+  socketState_ = SOCKET_RECV_FRAMING;
+  callsForResize_ = 0;
+
+  // get input/transports
+  factoryInputTransport_ = server_->getInputTransportFactory()->getTransport(
+                             inputTransport_);
+  factoryOutputTransport_ = server_->getOutputTransportFactory()->getTransport(
+                             outputTransport_);
+
+  // Create protocol
+  inputProtocol_ = server_->getInputProtocolFactory()->getProtocol(
+                     factoryInputTransport_);
+  outputProtocol_ = server_->getOutputProtocolFactory()->getProtocol(
+                     factoryOutputTransport_);
+
+  // Set up for any server event handler
+  serverEventHandler_ = server_->getEventHandler();
+  if (serverEventHandler_ != NULL) {
+    connectionContext_ = serverEventHandler_->createContext(inputProtocol_,
+                                                            outputProtocol_);
+  } else {
+    connectionContext_ = NULL;
+  }
+
+  // Get the processor
+  processor_ = server_->getProcessor(inputProtocol_, outputProtocol_, tSocket_);
+}
+
+void TNonblockingServer::TConnection::workSocket() {
+  int got = 0, left = 0, sent = 0;
+  uint32_t fetch = 0;
+
+  switch (socketState_) {
+  case SOCKET_RECV_FRAMING:
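+    // The 4-byte frame size may arrive across multiple reads; the union lets
+    // us fill it in byte by byte and then view it as a 32-bit integer.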
+    union {
+      uint8_t buf[sizeof(uint32_t)];
+      uint32_t size;
+    } framing;
+
+    // if we've already received some bytes we kept them here
+    framing.size = readWant_;
+    // determine size of this frame
+    try {
+      // Read from the socket
+      fetch = tSocket_->read(&framing.buf[readBufferPos_],
+                             uint32_t(sizeof(framing.size) - readBufferPos_));
+      if (fetch == 0) {
+        // Whenever we get here it means a remote disconnect
+        close();
+        return;
+      }
+      readBufferPos_ += fetch;
+    } catch (TTransportException& te) {
+      GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
+      close();
+
+      return;
+    }
+
+    if (readBufferPos_ < sizeof(framing.size)) {
+      // more needed before frame size is known -- save what we have so far
+      readWant_ = framing.size;
+      return;
+    }
+
+    readWant_ = ntohl(framing.size);
+    if (readWant_ > server_->getMaxFrameSize()) {
+      // Don't allow giant frame sizes.  This prevents bad clients from
+      // causing us to try and allocate a giant buffer.
+      GlobalOutput.printf("TNonblockingServer: frame size too large "
+                          "(%"PRIu32" > %zu) from client %s. remote side not "
+                          "using TFramedTransport?",
+                          readWant_, server_->getMaxFrameSize(),
+                          tSocket_->getSocketInfo().c_str());
+      close();
+      return;
+    }
+    // size known; now get the rest of the frame
+    transition();
+    return;
+
+  case SOCKET_RECV:
+    // It is an error to be in this state if we already have all the data
+    assert(readBufferPos_ < readWant_);
+
+    try {
+      // Read from the socket
+      fetch = readWant_ - readBufferPos_;
+      got = tSocket_->read(readBuffer_ + readBufferPos_, fetch);
+    }
+    catch (TTransportException& te) {
+      GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
+      close();
+
+      return;
+    }
+
+    if (got > 0) {
+      // Move along in the buffer
+      readBufferPos_ += got;
+
+      // Check that we did not overdo it
+      assert(readBufferPos_ <= readWant_);
+
+      // We are done reading, move onto the next state
+      if (readBufferPos_ == readWant_) {
+        transition();
+      }
+      return;
+    }
+
+    // Whenever we get down here it means a remote disconnect
+    close();
+
+    return;
+
+  case SOCKET_SEND:
+    // Should never have position past size
+    assert(writeBufferPos_ <= writeBufferSize_);
+
+    // If there is no data to send, then let us move on
+    if (writeBufferPos_ == writeBufferSize_) {
+      GlobalOutput("WARNING: Send state with no data to send\n");
+      transition();
+      return;
+    }
+
+    try {
+      left = writeBufferSize_ - writeBufferPos_;
+      sent = tSocket_->write_partial(writeBuffer_ + writeBufferPos_, left);
+    }
+    catch (TTransportException& te) {
+      GlobalOutput.printf("TConnection::workSocket(): %s ", te.what());
+      close();
+      return;
+    }
+
+    writeBufferPos_ += sent;
+
+    // Did we overdo it?
+    assert(writeBufferPos_ <= writeBufferSize_);
+
+    // We are done!
+    if (writeBufferPos_ == writeBufferSize_) {
+      transition();
+    }
+
+    return;
+
+  default:
+    GlobalOutput.printf("Unexpected Socket State %d", socketState_);
+    assert(0);
+  }
+}
+
+/**
+ * This is called when the application transitions from one state into
+ * another. This means that it has finished writing the data that it needed
+ * to, or finished receiving the data that it needed to.
+ */
+void TNonblockingServer::TConnection::transition() {
+  // ensure this connection is active right now
+  assert(ioThread_);
+  assert(server_);
+
+  // Switch upon the state that we are currently in and move to a new state
+  switch (appState_) {
+
+  case APP_READ_REQUEST:
+    // We are done reading the request, package the read buffer into transport
+    // and get back some data from the dispatch function
+    inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
+    outputTransport_->resetBuffer();
+    // Prepend four bytes of blank space to the buffer so we can
+    // write the frame size there later.
+    outputTransport_->getWritePtr(4);
+    outputTransport_->wroteBytes(4);
+
+    server_->incrementActiveProcessors();
+
+    if (server_->isThreadPoolProcessing()) {
+      // We are setting up a Task to do this work and we will wait on it
+
+      // Create task and dispatch to the thread manager
+      boost::shared_ptr<Runnable> task =
+        boost::shared_ptr<Runnable>(new Task(processor_,
+                                             inputProtocol_,
+                                             outputProtocol_,
+                                             this));
+      // The application is now waiting on the task to finish
+      appState_ = APP_WAIT_TASK;
+
+      try {
+        server_->addTask(task);
+      } catch (IllegalStateException& ise) {
+        // The ThreadManager is not ready to handle any more tasks (it's probably shutting down).
+        GlobalOutput.printf("IllegalStateException: Server::process() %s", ise.what());
+        close();
+      }
+
+      // Set this connection idle so that libevent doesn't process more
+      // data on it while we're still waiting for the threadmanager to
+      // finish this task
+      setIdle();
+      return;
+    } else {
+      try {
+        // Invoke the processor
+        processor_->process(inputProtocol_, outputProtocol_,
+                            connectionContext_);
+      } catch (const TTransportException &ttx) {
+        GlobalOutput.printf("TNonblockingServer transport error in "
+                            "process(): %s", ttx.what());
+        server_->decrementActiveProcessors();
+        close();
+        return;
+      } catch (const std::exception &x) {
+        GlobalOutput.printf("Server::process() uncaught exception: %s: %s",
+                            typeid(x).name(), x.what());
+        server_->decrementActiveProcessors();
+        close();
+        return;
+      } catch (...) {
+        GlobalOutput.printf("Server::process() unknown exception");
+        server_->decrementActiveProcessors();
+        close();
+        return;
+      }
+    }
+
+    // Intentionally fall through here, the call to process has written into
+    // the writeBuffer_
+
+  case APP_WAIT_TASK:
+    // We have now finished processing a task and the result has been written
+    // into the outputTransport_, so we grab its contents and place them into
+    // the writeBuffer_ for actual writing by the libevent thread
+
+    server_->decrementActiveProcessors();
+    // Get the result of the operation
+    outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
+
+    // If the function call generated return data, then move into the send
+    // state and get going
+    // 4 bytes were reserved for frame size
+    if (writeBufferSize_ > 4) {
+
+      // Move into write state
+      writeBufferPos_ = 0;
+      socketState_ = SOCKET_SEND;
+
+      // Put the frame size into the write buffer
+      int32_t frameSize = (int32_t)htonl(writeBufferSize_ - 4);
+      memcpy(writeBuffer_, &frameSize, 4);
+
+      // Socket into write mode
+      appState_ = APP_SEND_RESULT;
+      setWrite();
+
+      // Try to work the socket immediately
+      // workSocket();
+
+      return;
+    }
+
+    // In this case, the request was oneway and we should fall through
+    // right back into the read frame header state
+    goto LABEL_APP_INIT;
+
+  case APP_SEND_RESULT:
+    // it's now safe to perform buffer size housekeeping.
+    if (writeBufferSize_ > largestWriteBufferSize_) {
+      largestWriteBufferSize_ = writeBufferSize_;
+    }
+    if (server_->getResizeBufferEveryN() > 0
+        && ++callsForResize_ >= server_->getResizeBufferEveryN()) {
+      checkIdleBufferMemLimit(server_->getIdleReadBufferLimit(),
+                              server_->getIdleWriteBufferLimit());
+      callsForResize_ = 0;
+    }
+
+    // N.B.: We also intentionally fall through here into the INIT state!
+
+  LABEL_APP_INIT:
+  case APP_INIT:
+
+    // Clear write buffer variables
+    writeBuffer_ = NULL;
+    writeBufferPos_ = 0;
+    writeBufferSize_ = 0;
+
+    // Into read4 state we go
+    socketState_ = SOCKET_RECV_FRAMING;
+    appState_ = APP_READ_FRAME_SIZE;
+
+    readBufferPos_ = 0;
+
+    // Register read event
+    setRead();
+
+    // Try to work the socket right away
+    // workSocket();
+
+    return;
+
+  case APP_READ_FRAME_SIZE:
+    // We just read the request length
+    // Double the buffer size until it is big enough
+    if (readWant_ > readBufferSize_) {
+      if (readBufferSize_ == 0) {
+        readBufferSize_ = 1;
+      }
+      uint32_t newSize = readBufferSize_;
+      while (readWant_ > newSize) {
+        newSize *= 2;
+      }
+
+      uint8_t* newBuffer = (uint8_t*)std::realloc(readBuffer_, newSize);
+      if (newBuffer == NULL) {
+        // nothing else to be done...
+        throw std::bad_alloc();
+      }
+      readBuffer_ = newBuffer;
+      readBufferSize_ = newSize;
+    }
+
+    readBufferPos_ = 0;
+
+    // Move into read request state
+    socketState_ = SOCKET_RECV;
+    appState_ = APP_READ_REQUEST;
+
+    // Work the socket right away
+    // workSocket();
+
+    return;
+
+  case APP_CLOSE_CONNECTION:
+    server_->decrementActiveProcessors();
+    close();
+    return;
+
+  default:
+    GlobalOutput.printf("Unexpected Application State %d", appState_);
+    assert(0);
+  }
+}
+
+void TNonblockingServer::TConnection::setFlags(short eventFlags) {
+  // Catch the do nothing case
+  if (eventFlags_ == eventFlags) {
+    return;
+  }
+
+  // Delete a previously existing event
+  if (eventFlags_ != 0) {
+    if (event_del(&event_) == -1) {
+      GlobalOutput("TConnection::setFlags event_del");
+      return;
+    }
+  }
+
+  // Update in memory structure
+  eventFlags_ = eventFlags;
+
+  // Do not call event_set if there are no flags
+  if (!eventFlags_) {
+    return;
+  }
+
+  /*
+   * event_set:
+   *
+   * Prepares the event structure &event to be used in future calls to
+   * event_add() and event_del().  The event will be prepared to call the
+   * eventHandler using the 'sock' file descriptor to monitor events.
+   *
+   * The events can be either EV_READ, EV_WRITE, or both, indicating
+   * that an application can read or write from the file respectively without
+   * blocking.
+   *
+   * The eventHandler will be called with the file descriptor that triggered
+   * the event and the type of event which will be one of: EV_TIMEOUT,
+   * EV_SIGNAL, EV_READ, EV_WRITE.
+   *
+   * The additional flag EV_PERSIST makes an event_add() persistent until
+   * event_del() has been called.
+   *
+   * Once initialized, the &event struct can be used repeatedly with
+   * event_add() and event_del() and does not need to be reinitialized unless
+   * the eventHandler and/or the argument to it are to be changed.  However,
+   * when an ev structure has been added to libevent using event_add() the
+   * structure must persist until the event occurs (assuming EV_PERSIST
+   * is not set) or is removed using event_del().  You may not reuse the same
+   * ev structure for multiple monitored descriptors; each descriptor needs
+   * its own ev.
+   */
+  event_set(&event_, tSocket_->getSocketFD(), eventFlags_,
+            TConnection::eventHandler, this);
+  event_base_set(ioThread_->getEventBase(), &event_);
+
+  // Add the event
+  if (event_add(&event_, 0) == -1) {
+    GlobalOutput("TConnection::setFlags(): could not event_add");
+  }
+}
+
+/**
+ * Closes a connection
+ */
+void TNonblockingServer::TConnection::close() {
+  // Delete the registered libevent
+  if (event_del(&event_) == -1) {
+    GlobalOutput.perror("TConnection::close() event_del", errno);
+  }
+
+  if (serverEventHandler_ != NULL) {
+    serverEventHandler_->deleteContext(connectionContext_, inputProtocol_, outputProtocol_);
+  }
+  ioThread_ = NULL;
+
+  // Close the socket
+  tSocket_->close();
+
+  // close any factory produced transports
+  factoryInputTransport_->close();
+  factoryOutputTransport_->close();
+
+  // Give this object back to the server that owns it
+  server_->returnConnection(this);
+}
+
+void TNonblockingServer::TConnection::checkIdleBufferMemLimit(
+    size_t readLimit,
+    size_t writeLimit) {
+  if (readLimit > 0 && readBufferSize_ > readLimit) {
+    free(readBuffer_);
+    readBuffer_ = NULL;
+    readBufferSize_ = 0;
+  }
+
+  if (writeLimit > 0 && largestWriteBufferSize_ > writeLimit) {
+    // just start over
+    outputTransport_->resetBuffer(server_->getWriteBufferDefaultSize());
+    largestWriteBufferSize_ = 0;
+  }
+}
+
+TNonblockingServer::~TNonblockingServer() {
+  // TODO: We currently leak any active TConnection objects.
+  // Since we're shutting down and destroying the event_base, the TConnection
+  // objects will never receive any additional callbacks.  (And even if they
+  // did, it would be bad, since they keep a pointer around to the server,
+  // which is being destroyed.)
+
+  // Clean up unused TConnection objects in connectionStack_
+  while (!connectionStack_.empty()) {
+    TConnection* connection = connectionStack_.top();
+    connectionStack_.pop();
+    delete connection;
+  }
+}
+
+/**
+ * Creates a new connection either by reusing an object off the stack or
+ * by allocating a new one entirely
+ */
+TNonblockingServer::TConnection* TNonblockingServer::createConnection(
+    int socket, const sockaddr* addr, socklen_t addrLen) {
+  // Check the stack
+  Guard g(connMutex_);
+
+  // pick an IO thread to handle this connection -- currently round robin
+  assert(nextIOThread_ < ioThreads_.size());
+  int selectedThreadIdx = nextIOThread_;
+  nextIOThread_ = (nextIOThread_ + 1) % ioThreads_.size();
+
+  TNonblockingIOThread* ioThread = ioThreads_[selectedThreadIdx].get();
+
+  // Check the connection stack to see if we can re-use
+  TConnection* result = NULL;
+  if (connectionStack_.empty()) {
+    result = new TConnection(socket, ioThread, addr, addrLen);
+    ++numTConnections_;
+  } else {
+    result = connectionStack_.top();
+    connectionStack_.pop();
+    result->init(socket, ioThread, addr, addrLen);
+  }
+  return result;
+}
+
+/**
+ * Returns a connection to the stack
+ */
+void TNonblockingServer::returnConnection(TConnection* connection) {
+  Guard g(connMutex_);
+
+  if (connectionStackLimit_ &&
+      (connectionStack_.size() >= connectionStackLimit_)) {
+    delete connection;
+    --numTConnections_;
+  } else {
+    connection->checkIdleBufferMemLimit(idleReadBufferLimit_, idleWriteBufferLimit_);
+    connectionStack_.push(connection);
+  }
+}
+
+/**
+ * Server socket had something happen.  We accept all waiting client
+ * connections on fd and assign TConnection objects to handle those requests.
+ */
+void TNonblockingServer::handleEvent(int fd, short which) {
+  (void) which;
+  // Make sure that libevent didn't mess up the socket handles
+  assert(fd == serverSocket_);
+
+  // Server socket accepted a new connection
+  socklen_t addrLen;
+  sockaddr_storage addrStorage;
+  sockaddr* addrp = (sockaddr*)&addrStorage;
+  addrLen = sizeof(addrStorage);
+
+  // Going to accept a new client socket
+  int clientSocket;
+
+  // Accept as many new clients as possible, even though libevent signaled only
+  // one. This helps us avoid having to go back into the libevent engine so
+  // many times.
+  while ((clientSocket = ::accept(fd, addrp, &addrLen)) != -1) {
+    // If we're overloaded, take action here
+    if (overloadAction_ != T_OVERLOAD_NO_ACTION && serverOverloaded()) {
+      Guard g(connMutex_);
+      nConnectionsDropped_++;
+      nTotalConnectionsDropped_++;
+      if (overloadAction_ == T_OVERLOAD_CLOSE_ON_ACCEPT) {
+        ::close(clientSocket);
+        return;
+      } else if (overloadAction_ == T_OVERLOAD_DRAIN_TASK_QUEUE) {
+        if (!drainPendingTask()) {
+          // Nothing left to discard, so we drop connection instead.
+          ::close(clientSocket);
+          return;
+        }
+      }
+    }
+
+    // Explicitly set this socket to NONBLOCK mode
+    int flags;
+    if ((flags = fcntl(clientSocket, F_GETFL, 0)) < 0 ||
+        fcntl(clientSocket, F_SETFL, flags | O_NONBLOCK) < 0) {
+      GlobalOutput.perror("thriftServerEventHandler: set O_NONBLOCK (fcntl) ", errno);
+      ::close(clientSocket);
+      return;
+    }
+
+    // Create a new TConnection for this client socket.
+    TConnection* clientConnection =
+      createConnection(clientSocket, addrp, addrLen);
+
+    // Fail fast if we could not create a TConnection object
+    if (clientConnection == NULL) {
+      GlobalOutput.printf("thriftServerEventHandler: failed TConnection factory");
+      ::close(clientSocket);
+      return;
+    }
+
+    /*
+     * Either notify the ioThread that is assigned this connection to
+     * start processing, or if it is us, we'll just ask this
+     * connection to do its initial state change here.
+     *
+     * (We need to avoid writing to our own notification pipe, to
+     * avoid possible deadlocks if the pipe is full.)
+     *
+     * The IO thread #0 is the only one that handles these listen
+     * events, so unless the connection has been assigned to thread #0
+     * we know it's not on our thread.
+     */
+    if (clientConnection->getIOThreadNumber() == 0) {
+      clientConnection->transition();
+    } else {
+      clientConnection->notifyIOThread();
+    }
+
+    // addrLen is written by the accept() call, so needs to be set before the next call.
+    addrLen = sizeof(addrStorage);
+  }
+
+
+  // Done looping over accept(); now we have to make sure the error is due to
+  // blocking. Any other error is a problem.
+  if (errno != EAGAIN && errno != EWOULDBLOCK) {
+    GlobalOutput.perror("thriftServerEventHandler: accept() ", errno);
+  }
+}
+
+/**
+ * Creates a socket to listen on and binds it to the local port.
+ */
+void TNonblockingServer::createAndListenOnSocket() {
+  int s;
+
+  struct addrinfo hints, *res, *res0;
+  int error;
+
+  char port[sizeof("65536") + 1];
+  memset(&hints, 0, sizeof(hints));
+  hints.ai_family = PF_UNSPEC;
+  hints.ai_socktype = SOCK_STREAM;
+  hints.ai_flags = AI_PASSIVE | AI_ADDRCONFIG;
+  sprintf(port, "%d", port_);
+
+  // Wildcard address
+  error = getaddrinfo(NULL, port, &hints, &res0);
+  if (error) {
+    throw TException("TNonblockingServer::serve() getaddrinfo " +
+                     string(gai_strerror(error)));
+  }
+
+  // Pick the ipv6 address first since ipv4 addresses can be mapped
+  // into ipv6 space.
+  for (res = res0; res; res = res->ai_next) {
+    if (res->ai_family == AF_INET6 || res->ai_next == NULL)
+      break;
+  }
+
+  // Create the server socket
+  s = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
+  if (s == -1) {
+    freeaddrinfo(res0);
+    throw TException("TNonblockingServer::serve() socket() -1");
+  }
+
+  #ifdef IPV6_V6ONLY
+  if (res->ai_family == AF_INET6) {
+    int zero = 0;
+    if (-1 == setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, const_cast_sockopt(&zero), sizeof(zero))) {
+      GlobalOutput("TServerSocket::listen() IPV6_V6ONLY");
+    }
+  }
+  #endif // #ifdef IPV6_V6ONLY
+
+
+  int one = 1;
+
+  // Set reuseaddr to avoid 2MSL delay on server restart
+  setsockopt(s, SOL_SOCKET, SO_REUSEADDR, const_cast_sockopt(&one), sizeof(one));
+
+  if (::bind(s, res->ai_addr, res->ai_addrlen) == -1) {
+    ::close(s);
+    freeaddrinfo(res0);
+    throw TTransportException(TTransportException::NOT_OPEN,
+                              "TNonblockingServer::serve() bind",
+                              errno);
+  }
+
+  // Done with the addr info
+  freeaddrinfo(res0);
+
+  // Set up this file descriptor for listening
+  listenSocket(s);
+}
+
+/**
+ * Takes a socket created by createAndListenOnSocket() and sets various
+ * options on it to prepare it for use in the server.
+ */
+void TNonblockingServer::listenSocket(int s) {
+  // Set socket to nonblocking mode
+  int flags;
+  if ((flags = fcntl(s, F_GETFL, 0)) < 0 ||
+      fcntl(s, F_SETFL, flags | O_NONBLOCK) < 0) {
+    ::close(s);
+    throw TException("TNonblockingServer::serve() O_NONBLOCK");
+  }
+
+  int one = 1;
+  struct linger ling = {0, 0};
+
+  // Keepalive to ensure full result flushing
+  setsockopt(s, SOL_SOCKET, SO_KEEPALIVE, const_cast_sockopt(&one), sizeof(one));
+
+  // Turn linger off to avoid hung sockets
+  setsockopt(s, SOL_SOCKET, SO_LINGER, const_cast_sockopt(&ling), sizeof(ling));
+
+  // Set TCP nodelay if available, MAC OS X Hack
+  // See http://lists.danga.com/pipermail/memcached/2005-March/001240.html
+  #ifndef TCP_NOPUSH
+  setsockopt(s, IPPROTO_TCP, TCP_NODELAY, const_cast_sockopt(&one), sizeof(one));
+  #endif
+
+  #ifdef TCP_LOW_MIN_RTO
+  if (TSocket::getUseLowMinRto()) {
+    setsockopt(s, IPPROTO_TCP, TCP_LOW_MIN_RTO, const_cast_sockopt(&one), sizeof(one));
+  }
+  #endif
+
+  if (listen(s, LISTEN_BACKLOG) == -1) {
+    ::close(s);
+    throw TException("TNonblockingServer::serve() listen");
+  }
+
+  // Cool, this socket is good to go, set it as the serverSocket_
+  serverSocket_ = s;
+}
+
+void TNonblockingServer::setThreadManager(boost::shared_ptr<ThreadManager> threadManager) {
+  threadManager_ = threadManager;
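+  // Register expireClose() as the expire callback so that connections whose
+  // queued tasks time out in the ThreadManager get force-closed.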
+  if (threadManager != NULL) {
+    threadManager->setExpireCallback(std::tr1::bind(&TNonblockingServer::expireClose, this, std::tr1::placeholders::_1));
+    threadPoolProcessing_ = true;
+  } else {
+    threadPoolProcessing_ = false;
+  }
+}
+
+bool TNonblockingServer::serverOverloaded() {
+  size_t activeConnections = numTConnections_ - connectionStack_.size();
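+  // Hysteresis: once overloaded we stay overloaded until load drops below
+  // overloadHysteresis_ (a fraction of the limits), so the flag doesn't flap.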
+  if (numActiveProcessors_ > maxActiveProcessors_ ||
+      activeConnections > maxConnections_) {
+    if (!overloaded_) {
+      GlobalOutput.printf("TNonblockingServer: overload condition begun.");
+      overloaded_ = true;
+    }
+  } else {
+    if (overloaded_ &&
+        (numActiveProcessors_ <= overloadHysteresis_ * maxActiveProcessors_) &&
+        (activeConnections <= overloadHysteresis_ * maxConnections_)) {
+      GlobalOutput.printf("TNonblockingServer: overload ended; "
+                          "%u dropped (%llu total)",
+                          nConnectionsDropped_, nTotalConnectionsDropped_);
+      nConnectionsDropped_ = 0;
+      overloaded_ = false;
+    }
+  }
+
+  return overloaded_;
+}
+
+bool TNonblockingServer::drainPendingTask() {
+  if (threadManager_) {
+    boost::shared_ptr<Runnable> task = threadManager_->removeNextPending();
+    if (task) {
+      TConnection* connection =
+        static_cast<TConnection::Task*>(task.get())->getTConnection();
+      assert(connection && connection->getServer()
+             && connection->getState() == APP_WAIT_TASK);
+      connection->forceClose();
+      return true;
+    }
+  }
+  return false;
+}
+
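+/// Called by the ThreadManager when a queued task expires: force-close the
+/// connection whose request was dropped so it is not left in APP_WAIT_TASK.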
+void TNonblockingServer::expireClose(boost::shared_ptr<Runnable> task) {
+  TConnection* connection =
+    static_cast<TConnection::Task*>(task.get())->getTConnection();
+  assert(connection && connection->getServer() &&
+         connection->getState() == APP_WAIT_TASK);
+  connection->forceClose();
+}
+
+void TNonblockingServer::stop() {
+  // Breaks the event loop in all threads so that they end ASAP.
+  for (uint32_t i = 0; i < ioThreads_.size(); ++i) {
+    ioThreads_[i]->stop();
+  }
+}
+
+/**
+ * Main workhorse function, starts up the server listening on a port and
+ * loops over the libevent handler.
+ */
+void TNonblockingServer::serve() {
+  // init listen socket
+  createAndListenOnSocket();
+
+  // set up the IO threads
+  assert(ioThreads_.empty());
+  if (!numIOThreads_) {
+    numIOThreads_ = DEFAULT_IO_THREADS;
+  }
+
+  for (uint32_t id = 0; id < numIOThreads_; ++id) {
+    // the first IO thread also does the listening on server socket
+    int listenFd = (id == 0 ? serverSocket_ : -1);
+
+    shared_ptr<TNonblockingIOThread> thread(
+      new TNonblockingIOThread(this, id, listenFd, useHighPriorityIOThreads_));
+    ioThreads_.push_back(thread);
+  }
+
+  // Notify handler of the preServe event
+  if (eventHandler_ != NULL) {
+    eventHandler_->preServe();
+  }
+
+  // Start all of our helper IO threads. Note that the threads run forever,
+  // only terminating if stop() is called.
+  assert(ioThreads_.size() == numIOThreads_);
+  assert(ioThreads_.size() > 0);
+
+  GlobalOutput.printf("TNonblockingServer: Serving on port %d, %d io threads.",
+               port_, ioThreads_.size());
+
+  // Launch all the secondary IO threads in separate threads
+  if (ioThreads_.size() > 1) {
+    ioThreadFactory_.reset(new PlatformThreadFactory(
+#ifndef USE_BOOST_THREAD
+      PlatformThreadFactory::OTHER,  // scheduler
+      PlatformThreadFactory::NORMAL, // priority
+      1,                          // stack size (MB)
+#endif
+      false                       // detached
+    ));
+
+    assert(ioThreadFactory_.get());
+
+    // intentionally starting at thread 1, not 0
+    for (uint32_t i = 1; i < ioThreads_.size(); ++i) {
+      shared_ptr<Thread> thread = ioThreadFactory_->newThread(ioThreads_[i]);
+      ioThreads_[i]->setThread(thread);
+      thread->start();
+    }
+  }
+
+  // Run the primary (listener) IO thread loop in our main thread; this will
+  // only return when the server is shutting down.
+  ioThreads_[0]->run();
+
+  // Ensure all threads are finished before exiting serve()
+  for (uint32_t i = 0; i < ioThreads_.size(); ++i) {
+    ioThreads_[i]->join();
+    GlobalOutput.printf("TNonblocking: join done for IO thread #%d", i);
+  }
+}
+
+TNonblockingIOThread::TNonblockingIOThread(TNonblockingServer* server,
+                                           int number,
+                                           int listenSocket,
+                                           bool useHighPriority)
+      : server_(server)
+      , number_(number)
+      , listenSocket_(listenSocket)
+      , useHighPriority_(useHighPriority)
+      , eventBase_(NULL) {
+  notificationPipeFDs_[0] = -1;
+  notificationPipeFDs_[1] = -1;
+}
+
+TNonblockingIOThread::~TNonblockingIOThread() {
+  // make sure our associated thread is fully finished
+  join();
+
+  if (eventBase_) {
+    event_base_free(eventBase_);
+  }
+
+  if (listenSocket_ >= 0) {
+    if (0 != ::close(listenSocket_)) {
+      GlobalOutput.perror("TNonblockingIOThread listenSocket_ close(): ",
+                          errno);
+    }
+    listenSocket_ = TNonblockingServer::INVALID_SOCKET_VALUE;
+  }
+
+  for (int i = 0; i < 2; ++i) {
+    if (notificationPipeFDs_[i] >= 0) {
+      if (0 != ::close(notificationPipeFDs_[i])) {
+        GlobalOutput.perror("TNonblockingIOThread notificationPipe close(): ",
+                            errno);
+      }
+      notificationPipeFDs_[i] = TNonblockingServer::INVALID_SOCKET_VALUE;
+    }
+  }
+}
+
+void TNonblockingIOThread::createNotificationPipe() {
+  if (evutil_socketpair(AF_LOCAL, SOCK_STREAM, 0, notificationPipeFDs_) == -1) {
+    GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", EVUTIL_SOCKET_ERROR());
+    throw TException("can't create notification pipe");
+  }
+  if (evutil_make_socket_nonblocking(notificationPipeFDs_[0]) < 0 ||
+      evutil_make_socket_nonblocking(notificationPipeFDs_[1]) < 0) {
+    ::close(notificationPipeFDs_[0]);
+    ::close(notificationPipeFDs_[1]);
+    throw TException("TNonblockingServer::createNotificationPipe() O_NONBLOCK");
+  }
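+  // Mark both ends of the pipe close-on-exec; libevent older than 2.0 lacks
+  // evutil_make_socket_closeonexec(), so fall back to fcntl() there.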
+  for (int i = 0; i < 2; ++i) {
+#if LIBEVENT_VERSION_NUMBER < 0x02000000
+    int flags;
+    if ((flags = fcntl(notificationPipeFDs_[i], F_GETFD, 0)) < 0 ||
+        fcntl(notificationPipeFDs_[i], F_SETFD, flags | FD_CLOEXEC) < 0) {
+#else
+    if (evutil_make_socket_closeonexec(notificationPipeFDs_[i]) < 0) {
+#endif
+      ::close(notificationPipeFDs_[0]);
+      ::close(notificationPipeFDs_[1]);
+      throw TException("TNonblockingServer::createNotificationPipe() "
+        "FD_CLOEXEC");
+    }
+  }
+}
+
+/**
+ * Register the core libevent events onto the proper base.
+ */
+void TNonblockingIOThread::registerEvents() {
+  if (listenSocket_ >= 0) {
+    // Register the server event
+    event_set(&serverEvent_,
+              listenSocket_,
+              EV_READ | EV_PERSIST,
+              TNonblockingIOThread::listenHandler,
+              server_);
+    event_base_set(eventBase_, &serverEvent_);
+
+    // Add the event and start up the server
+    if (-1 == event_add(&serverEvent_, 0)) {
+      throw TException("TNonblockingServer::serve(): "
+                       "event_add() failed on server listen event");
+    }
+    GlobalOutput.printf("TNonblocking: IO thread #%d registered for listen.",
+                        number_);
+  }
+
+  createNotificationPipe();
+
+  // Create an event to be notified when a task finishes
+  event_set(&notificationEvent_,
+            getNotificationRecvFD(),
+            EV_READ | EV_PERSIST,
+            TNonblockingIOThread::notifyHandler,
+            this);
+
+  // Attach to the base
+  event_base_set(eventBase_, &notificationEvent_);
+
+  // Add the event and start up the server
+  if (-1 == event_add(&notificationEvent_, 0)) {
+    throw TException("TNonblockingServer::serve(): "
+                     "event_add() failed on task-done notification event");
+  }
+  GlobalOutput.printf("TNonblocking: IO thread #%d registered for notify.",
+                      number_);
+}
+
+bool TNonblockingIOThread::notify(TNonblockingServer::TConnection* conn) {
+  int fd = getNotificationSendFD();
+  if (fd < 0) {
+    return false;
+  }
+
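+  // Write the raw TConnection pointer over the notification socketpair; the
+  // owning IO thread's notifyHandler() reads it and drives the state machine.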
+  const int kSize = sizeof(conn);
+  if (send(fd, const_cast_sockopt(&conn), kSize, 0) != kSize) {
+    return false;
+  }
+
+  return true;
+}
+
+/* static */
+void TNonblockingIOThread::notifyHandler(evutil_socket_t fd, short which, void* v) {
+  TNonblockingIOThread* ioThread = (TNonblockingIOThread*) v;
+  assert(ioThread);
+  (void)which;
+
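+  // Drain all pending notifications: each message is a pointer-sized
+  // TConnection* written by notify(); a NULL pointer tells this thread to stop.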
+  while (true) {
+    TNonblockingServer::TConnection* connection = 0;
+    const int kSize = sizeof(connection);
+    int nBytes = recv(fd, cast_sockopt(&connection), kSize, 0);
+    if (nBytes == kSize) {
+      if (connection == NULL) {
+        // this is the command to stop our thread, exit the handler!
+        return;
+      }
+      connection->transition();
+    } else if (nBytes > 0) {
+      // throw away these bytes and hope that next time we get a solid read
+      GlobalOutput.printf("notifyHandler: Bad read of %d bytes, wanted %d",
+                          nBytes, kSize);
+      ioThread->breakLoop(true);
+      return;
+    } else if (nBytes == 0) {
+      GlobalOutput.printf("notifyHandler: Notify socket closed!");
+      // exit the loop
+      break;
+    } else { // nBytes < 0
+      if (errno != EWOULDBLOCK && errno != EAGAIN) {
+          GlobalOutput.perror(
+            "TNonblocking: notifyHandler read() failed: ", errno);
+          ioThread->breakLoop(true);
+          return;
+      }
+      // exit the loop
+      break;
+    }
+  }
+}
+
+void TNonblockingIOThread::breakLoop(bool error) {
+  if (error) {
+    GlobalOutput.printf(
+      "TNonblockingServer: IO thread #%d exiting with error.", number_);
+    // TODO: figure out something better to do here, but for now kill the
+    // whole process.
+    GlobalOutput.printf("TNonblockingServer: aborting process.");
+    ::abort();
+  }
+
+  // sets a flag so that the loop exits on the next event
+  event_base_loopbreak(eventBase_);
+
+  // event_base_loopbreak() only causes the loop to exit the next time
+  // it wakes up.  We need to force it to wake up, in case there are
+  // no real events it needs to process.
+  //
+  // If we're running in the same thread, we can't use the notify(0)
+  // mechanism to stop the thread, but happily if we're running in the
+  // same thread, this means the thread can't be blocking in the event
+  // loop either.
+  if (!Thread::is_current(threadId_)) {
+    notify(NULL);
+  }
+}
+
+void TNonblockingIOThread::setCurrentThreadHighPriority(bool value) {
+#ifdef HAVE_SCHED_H
+  // Start out with a standard, low-priority setup for the sched params.
+  struct sched_param sp;
+  bzero((void*) &sp, sizeof(sp));
+  int policy = SCHED_OTHER;
+
+  // If desired, set up high-priority sched params structure.
+  if (value) {
+    // FIFO scheduler, ranked above default SCHED_OTHER queue
+    policy = SCHED_FIFO;
+    // The priority only compares us to other SCHED_FIFO threads, so we
+    // just pick a random priority halfway between min & max.
+    const int priority = (sched_get_priority_max(policy) +
+                          sched_get_priority_min(policy)) / 2;
+
+    sp.sched_priority = priority;
+  }
+
+  // Actually set the sched params for the current thread.
+  if (0 == pthread_setschedparam(pthread_self(), policy, &sp)) {
+    GlobalOutput.printf(
+      "TNonblocking: IO Thread #%d using high-priority scheduler!", number_);
+  } else {
+    GlobalOutput.perror("TNonblocking: pthread_setschedparam(): ", errno);
+  }
+#endif
+}
+
+void TNonblockingIOThread::run() {
+  threadId_ = Thread::get_current();
+
+  assert(eventBase_ == 0);
+  eventBase_ = event_base_new();
+
+  // Print some libevent stats
+  if (number_ == 0) {
+    GlobalOutput.printf("TNonblockingServer: using libevent %s method %s",
+            event_get_version(),
+            event_base_get_method(eventBase_));
+  }
+
+
+  registerEvents();
+
+  GlobalOutput.printf("TNonblockingServer: IO thread #%d entering loop...",
+                      number_);
+
+  if (useHighPriority_) {
+    setCurrentThreadHighPriority(true);
+  }
+
+  // Run libevent engine, never returns, invokes calls to eventHandler
+  event_base_loop(eventBase_, 0);
+
+  if (useHighPriority_) {
+    setCurrentThreadHighPriority(false);
+  }
+
+  // cleans up our registered events
+  cleanupEvents();
+
+  GlobalOutput.printf("TNonblockingServer: IO thread #%d run() done!",
+    number_);
+}
+
+void TNonblockingIOThread::cleanupEvents() {
+  // stop the listen socket, if any
+  if (listenSocket_ >= 0) {
+    if (event_del(&serverEvent_) == -1) {
+      GlobalOutput.perror("TNonblockingIOThread::stop() event_del: ", errno);
+    }
+  }
+
+  event_del(&notificationEvent_);
+}
+
+
+void TNonblockingIOThread::stop() {
+  // This should cause the thread to fall out of its event loop ASAP.
+  breakLoop(false);
+}
+
+void TNonblockingIOThread::join() {
+  // If this was a thread created by a factory (not the thread that called
+  // serve()), we join() it to make sure we shut down fully.
+  if (thread_) {
+    try {
+      // Note that it is safe to both join() ourselves twice, as well as join
+      // the current thread as the pthread implementation checks for deadlock.
+      thread_->join();
+    } catch(...) {
+      // swallow everything
+    }
+  }
+}
+
+}}} // apache::thrift::server