cpp: TNonblockingServer overload handling and optimizations

- Establish a mechanism for TNonblockingServer to handle overloads by
  limiting the number of connections accepted or in process.

- Provide a framework for further work in handling server overloads.

- Limit memory consumption of connection object pool.

- Drop connections when overloaded.

- Add overload-handling behavior allowing pending tasks to be dropped
  from the front of the task queue.  Running tasks cannot be terminated,
  so the pending tasks at the front of the queue are the oldest in the
  system and thus the most likely to be beyond their freshness date.
  This reduces the chance of spending valuable CPU time processing a
  request for which the client has already timed out.

- Use a single persistent pipe() to communicate task completion instead
  of constructing and monitoring a new socketpair() for every task in
  the system.

git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@920664 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/concurrency/ThreadManager.cpp b/lib/cpp/src/concurrency/ThreadManager.cpp
index abfcf6e..7bba0e6 100644
--- a/lib/cpp/src/concurrency/ThreadManager.cpp
+++ b/lib/cpp/src/concurrency/ThreadManager.cpp
@@ -117,6 +117,8 @@
 
   void remove(shared_ptr<Runnable> task);
 
+  shared_ptr<Runnable> removeNextPending();
+
 private:
   void stopImpl(bool join);
 
@@ -163,6 +165,10 @@
     }
   }
 
+  shared_ptr<Runnable> getRunnable() {
+    return runnable_;
+  }
+
  private:
   shared_ptr<Runnable> runnable_;
   friend class ThreadManager::Worker;
@@ -458,6 +464,22 @@
   }
 }
 
+boost::shared_ptr<Runnable> ThreadManager::Impl::removeNextPending() {
+  Guard g(mutex_);
+  if (state_ != ThreadManager::STARTED) {
+    throw IllegalStateException();
+  }
+
+  if (tasks_.empty()) {
+    return boost::shared_ptr<Runnable>();
+  }
+
+  shared_ptr<ThreadManager::Task> task = tasks_.front();
+  tasks_.pop();
+  
+  return task->getRunnable();
+}
+
 class SimpleThreadManager : public ThreadManager::Impl {
 
  public:
diff --git a/lib/cpp/src/concurrency/ThreadManager.h b/lib/cpp/src/concurrency/ThreadManager.h
index 6e5a178..cbf08c0 100644
--- a/lib/cpp/src/concurrency/ThreadManager.h
+++ b/lib/cpp/src/concurrency/ThreadManager.h
@@ -148,6 +148,13 @@
    */
   virtual void remove(boost::shared_ptr<Runnable> task) = 0;
 
+  /**
+   * Remove the next pending task which would be run.
+   *
+   * @return the task removed.
+   */
+  virtual boost::shared_ptr<Runnable> removeNextPending() = 0;
+
   static boost::shared_ptr<ThreadManager> newThreadManager();
 
   /**
diff --git a/lib/cpp/src/server/TNonblockingServer.cpp b/lib/cpp/src/server/TNonblockingServer.cpp
index 5375387..a9c1c15 100644
--- a/lib/cpp/src/server/TNonblockingServer.cpp
+++ b/lib/cpp/src/server/TNonblockingServer.cpp
@@ -45,11 +45,11 @@
   Task(boost::shared_ptr<TProcessor> processor,
        boost::shared_ptr<TProtocol> input,
        boost::shared_ptr<TProtocol> output,
-       int taskHandle) :
+       TConnection* connection) :
     processor_(processor),
     input_(input),
     output_(output),
-    taskHandle_(taskHandle) {}
+    connection_(connection) {}
 
   void run() {
     try {
@@ -66,21 +66,21 @@
       cerr << "TNonblockingServer uncaught exception." << endl;
     }
 
-    // Signal completion back to the libevent thread via a socketpair
-    int8_t b = 0;
-    if (-1 == send(taskHandle_, &b, sizeof(int8_t), 0)) {
-      GlobalOutput.perror("TNonblockingServer::Task: send ", errno);
+    // Signal completion back to the libevent thread via a pipe
+    if (!connection_->notifyServer()) {
+      throw TException("TNonblockingServer::Task::run: failed write on notify pipe");
     }
-    if (-1 == ::close(taskHandle_)) {
-      GlobalOutput.perror("TNonblockingServer::Task: close, possible resource leak ", errno);
-    }
+  }
+
+  TConnection* getTConnection() {
+    return connection_;
   }
 
  private:
   boost::shared_ptr<TProcessor> processor_;
   boost::shared_ptr<TProtocol> input_;
   boost::shared_ptr<TProtocol> output_;
-  int taskHandle_;
+  TConnection* connection_;
 };
 
 void TConnection::init(int socket, short eventFlags, TNonblockingServer* s) {
@@ -99,8 +99,6 @@
   socketState_ = SOCKET_RECV;
   appState_ = APP_INIT;
 
-  taskHandle_ = -1;
-
   // Set flags, which also registers the event
   setFlags(eventFlags);
 
@@ -256,37 +254,20 @@
     outputTransport_->getWritePtr(4);
     outputTransport_->wroteBytes(4);
 
+    server_->incrementActiveProcessors();
+
     if (server_->isThreadPoolProcessing()) {
       // We are setting up a Task to do this work and we will wait on it
-      int sv[2];
-      if (-1 == socketpair(AF_LOCAL, SOCK_STREAM, 0, sv)) {
-        GlobalOutput.perror("TConnection::socketpair() failed ", errno);
-        // Now we will fall through to the APP_WAIT_TASK block with no response
-      } else {
-        // Create task and dispatch to the thread manager
-        boost::shared_ptr<Runnable> task =
-          boost::shared_ptr<Runnable>(new Task(server_->getProcessor(),
-                                               inputProtocol_,
-                                               outputProtocol_,
-                                               sv[1]));
-        // The application is now waiting on the task to finish
-        appState_ = APP_WAIT_TASK;
 
-        // Create an event to be notified when the task finishes
-        event_set(&taskEvent_,
-                  taskHandle_ = sv[0],
-                  EV_READ,
-                  TConnection::taskHandler,
-                  this);
+      // Create task and dispatch to the thread manager
+      boost::shared_ptr<Runnable> task =
+        boost::shared_ptr<Runnable>(new Task(server_->getProcessor(),
+                                             inputProtocol_,
+                                             outputProtocol_,
+                                             this));
+      // The application is now waiting on the task to finish
+      appState_ = APP_WAIT_TASK;
 
-        // Attach to the base
-        event_base_set(server_->getEventBase(), &taskEvent_);
-
-        // Add the event and start up the server
-        if (-1 == event_add(&taskEvent_, 0)) {
-          GlobalOutput("TNonblockingServer::serve(): coult not event_add");
-          return;
-        }
         try {
           server_->addTask(task);
         } catch (IllegalStateException & ise) {
@@ -295,26 +276,32 @@
+          // The task was never queued: undo incrementActiveProcessors() and
+          // stop here, because close() hands this connection back to the
+          // server (which may delete it), so it must not be touched again.
+          server_->decrementActiveProcessors();
           close();
+          return;
         }
 
-        // Set this connection idle so that libevent doesn't process more
-        // data on it while we're still waiting for the threadmanager to
-        // finish this task
-        setIdle();
-        return;
-      }
+      // Set this connection idle so that libevent doesn't process more
+      // data on it while we're still waiting for the threadmanager to
+      // finish this task
+      setIdle();
+      return;
     } else {
       try {
         // Invoke the processor
         server_->getProcessor()->process(inputProtocol_, outputProtocol_);
       } catch (TTransportException &ttx) {
         GlobalOutput.printf("TTransportException: Server::process() %s", ttx.what());
+        server_->decrementActiveProcessors();
         close();
         return;
       } catch (TException &x) {
         GlobalOutput.printf("TException: Server::process() %s", x.what());
+        server_->decrementActiveProcessors();
         close();
         return;
       } catch (...) {
         GlobalOutput.printf("Server::process() unknown exception");
+        server_->decrementActiveProcessors();
         close();
         return;
       }
@@ -328,6 +311,7 @@
     // into the outputTransport_, so we grab its contents and place them into
     // the writeBuffer_ for actual writing by the libevent thread
 
+    server_->decrementActiveProcessors();
     // Get the result of the operation
     outputTransport_->getBuffer(&writeBuffer_, &writeBufferSize_);
 
@@ -425,6 +409,11 @@
 
     return;
 
+  case APP_CLOSE_CONNECTION:
+    server_->decrementActiveProcessors();
+    close();
+    return;
+
   default:
     GlobalOutput.printf("Unexpected Application State %d", appState_);
     assert(0);
@@ -453,7 +442,7 @@
     return;
   }
 
-  /**
+  /*
    * event_set:
    *
    * Prepares the event structure &event to be used in future calls to
@@ -499,10 +488,10 @@
   }
 
   // Close the socket
-  if (socket_ > 0) {
+  if (socket_ >= 0) {
     ::close(socket_);
   }
-  socket_ = 0;
+  socket_ = -1;
 
   // close any factory produced transports
   factoryInputTransport_->close();
@@ -512,7 +501,7 @@
   server_->returnConnection(this);
 }
 
-void TConnection::checkIdleBufferMemLimit(uint32_t limit) {
+void TConnection::checkIdleBufferMemLimit(size_t limit) {
   if (readBufferSize_ > limit) {
     readBufferSize_ = limit;
     readBuffer_ = (uint8_t*)std::realloc(readBuffer_, readBufferSize_);
@@ -572,7 +561,21 @@
   // one, this helps us to avoid having to go back into the libevent engine so
   // many times
   while ((clientSocket = accept(fd, &addr, &addrLen)) != -1) {
-
+    // If we're overloaded, take action here
+    if (overloadAction_ != T_OVERLOAD_NO_ACTION && serverOverloaded()) {
+      nConnectionsDropped_++;
+      nTotalConnectionsDropped_++;
+      if (overloadAction_ == T_OVERLOAD_CLOSE_ON_ACCEPT) {
+        close(clientSocket);
+        continue;
+      } else if (overloadAction_ == T_OVERLOAD_DRAIN_TASK_QUEUE) {
+        if (!drainPendingTask()) {
+          // Nothing left to discard, so we drop connection instead.
+          close(clientSocket);
+          continue;
+        }
+      }
+    }
     // Explicitly set this socket to NONBLOCK mode
     int flags;
     if ((flags = fcntl(clientSocket, F_GETFL, 0)) < 0 ||
@@ -707,6 +710,13 @@
   serverSocket_ = s;
 }
 
+void TNonblockingServer::createNotificationPipe() {
+  if (pipe(notificationPipeFDs_) != 0) {
+    GlobalOutput.perror("TNonblockingServer::createNotificationPipe ", errno);
+      throw TException("can't create notification pipe");
+  }
+}
+
 /**
  * Register the core libevent events onto the proper base.
  */
@@ -732,6 +742,82 @@
   if (-1 == event_add(&serverEvent_, 0)) {
     throw TException("TNonblockingServer::serve(): coult not event_add");
   }
+  if (threadPoolProcessing_) {
+    // Create an event to be notified when a task finishes
+    event_set(&notificationEvent_,
+              getNotificationRecvFD(),
+              EV_READ | EV_PERSIST,
+              TConnection::taskHandler,
+              this);
+
+    // Attach to the base
+    event_base_set(eventBase_, &notificationEvent_);
+
+    // Add the event and start up the server
+    if (-1 == event_add(&notificationEvent_, 0)) {
+      throw TException("TNonblockingServer::serve(): notification event_add fail");
+    }
+  }
+}
+
+bool TNonblockingServer::serverOverloaded() {
+  // Connections in use: TConnection objects created minus those idling in
+  // the connectionStack_ pool.
+  size_t activeConnections = numTConnections_ - connectionStack_.size();
+  if (numActiveProcessors_ > maxActiveProcessors_ ||
+      activeConnections > maxConnections_) {
+    if (!overloaded_) {
+      GlobalOutput.printf("thrift non-blocking server overload condition");
+      overloaded_ = true;
+    }
+  } else {
+    // Leave the overload state only once both counts have dropped to the
+    // hysteresis fraction of their limits, so we don't flap at the boundary.
+    if (overloaded_ &&
+        (numActiveProcessors_ <= overloadHysteresis_ * maxActiveProcessors_) &&
+        (activeConnections <= overloadHysteresis_ * maxConnections_)) {
+      // Cast explicitly: uint64_t is not unsigned long long on every platform
+      // (e.g. it is unsigned long on LP64), so passing it raw to %llu is UB.
+      GlobalOutput.printf("thrift non-blocking server overload ended; %u dropped (%llu total)",
+                          static_cast<unsigned int>(nConnectionsDropped_),
+                          static_cast<unsigned long long>(nTotalConnectionsDropped_));
+      nConnectionsDropped_ = 0;
+      overloaded_ = false;
+    }
+  }
+
+  return overloaded_;
+}
+
+bool TNonblockingServer::drainPendingTask() {
+  if (threadManager_) {
+    boost::shared_ptr<Runnable> task = threadManager_->removeNextPending();
+    if (task) {
+      // dynamic_cast, not static_cast: a ThreadManager shared with other code
+      // may hand back a Runnable that is not one of our TConnection::Tasks.
+      TConnection::Task* connectionTask =
+        dynamic_cast<TConnection::Task*>(task.get());
+      if (connectionTask == NULL) {
+        // Not ours: requeue it (at the tail) instead of silently dropping it.
+        addTask(task);
+        return false;
+      }
+      TConnection* connection = connectionTask->getTConnection();
+      assert(connection && connection->getServer()
+             && connection->getState() == APP_WAIT_TASK);
+      try {
+        connection->forceClose();
+      } catch (TException& te) {
+        // forceClose() has already set APP_CLOSE_CONNECTION; the pipe write
+        // failed, so run that transition here (handleEvent calls us on the
+        // libevent thread) rather than throwing into libevent's C callback.
+        GlobalOutput.printf("TNonblockingServer::drainPendingTask: %s", te.what());
+        connection->transition();
+      }
+      return true;
+    }
+  }
+  return false;
 }
 
 /**
@@ -742,6 +805,11 @@
   // Init socket
   listenSocket();
 
+  if (threadPoolProcessing_) {
+    // Init task completion notification pipe
+    createNotificationPipe();
+  }
+
   // Initialize libevent core
   registerEvents(static_cast<event_base*>(event_init()));
 
diff --git a/lib/cpp/src/server/TNonblockingServer.h b/lib/cpp/src/server/TNonblockingServer.h
index 8506507..2650dd1 100644
--- a/lib/cpp/src/server/TNonblockingServer.h
+++ b/lib/cpp/src/server/TNonblockingServer.h
@@ -24,6 +24,7 @@
 #include <server/TServer.h>
 #include <transport/TBufferTransports.h>
 #include <concurrency/ThreadManager.h>
+#include <climits>
 #include <stack>
 #include <string>
 #include <errno.h>
@@ -50,48 +51,96 @@
  * operations hardcoded for use with select.
  *
  */
+
+
+/// Overload condition actions.
+enum TOverloadAction {
+  T_OVERLOAD_NO_ACTION,        ///< Don't handle overload */
+  T_OVERLOAD_CLOSE_ON_ACCEPT,  ///< Drop new connections immediately */
+  T_OVERLOAD_DRAIN_TASK_QUEUE  ///< Drop some tasks from head of task queue */
+};
+
 class TNonblockingServer : public TServer {
  private:
-
-  // Listen backlog
+  /// Listen backlog
   static const int LISTEN_BACKLOG = 1024;
 
-  // Default limit on size of idle connection pool
+  /// Default limit on size of idle connection pool
   static const size_t CONNECTION_STACK_LIMIT = 1024;
 
-  // Maximum size of buffer allocated to idle connection
+  /// Maximum size of buffer allocated to idle connection
   static const uint32_t IDLE_BUFFER_MEM_LIMIT = 8192;
 
-  // Server socket file descriptor
+  /// Default limit on total number of connected sockets
+  static const int MAX_CONNECTIONS = INT_MAX;
+
+  /// Default limit on connections in handler/task processing
+  static const int MAX_ACTIVE_PROCESSORS = INT_MAX;
+
+  /// Server socket file descriptor
   int serverSocket_;
 
-  // Port server runs on
+  /// Port server runs on
   int port_;
 
-  // For processing via thread pool, may be NULL
+  /// For processing via thread pool, may be NULL
   boost::shared_ptr<ThreadManager> threadManager_;
 
-  // Is thread pool processing?
+  /// Is thread pool processing?
   bool threadPoolProcessing_;
 
-  // The event base for libevent
+  /// The event base for libevent
   event_base* eventBase_;
 
-  // Event struct, for use with eventBase_
+  /// Event struct, used with eventBase_ for connection events
   struct event serverEvent_;
 
-  // Number of TConnection object we've created
+  /// Event struct, used with eventBase_ for task completion notification
+  struct event notificationEvent_;
+
+  /// Number of TConnection object we've created
   size_t numTConnections_;
 
-  // Limit for how many TConnection objects to cache
+  /// Number of connections processing or waiting to process
+  size_t numActiveProcessors_;
+
+  /// Limit for how many TConnection objects to cache
   size_t connectionStackLimit_;
 
+  /// Limit for number of connections processing or waiting to process
+  size_t maxActiveProcessors_;
+
+  /// Limit for number of open connections
+  size_t maxConnections_;
+
+  /**
+   * Hysteresis for overload state.  This is the fraction of the overload
+   * value that needs to be reached before the overload state is cleared;
+   * must be <= 1.0.
+   */
+  double overloadHysteresis_;
+
+  /// Action to take when we're overloaded.
+  TOverloadAction overloadAction_;
+
   /**
    * Max read buffer size for an idle connection.  When we place an idle
    * TConnection into connectionStack_, we insure that its read buffer is
    * reduced to this size to insure that idle connections don't hog memory.
    */
-  uint32_t idleBufferMemLimit_;
+  size_t idleBufferMemLimit_;
+
+  /// Set if we are currently in an overloaded state.
+  bool overloaded_;
+
+  /// Count of connections dropped since overload started
+  uint32_t nConnectionsDropped_;
+
+  /// Count of connections dropped on overload since server started
+  uint64_t nTotalConnectionsDropped_;
+
+  /// File descriptors for pipe used for task completion notification.
+  int notificationPipeFDs_[2];
 
   /**
    * This is a stack of all the objects that have been created but that
@@ -101,6 +150,14 @@
    */
   std::stack<TConnection*> connectionStack_;
 
+  /**
+   * Called when server socket had something happen.  We accept all waiting
+   * client connections on listen socket fd and assign TConnection objects
+   * to handle those requests.
+   *
+   * @param fd the listen socket.
+   * @param which the event flag that triggered the handler.
+   */
   void handleEvent(int fd, short which);
 
  public:
@@ -112,8 +169,16 @@
     threadPoolProcessing_(false),
     eventBase_(NULL),
     numTConnections_(0),
+    numActiveProcessors_(0),
     connectionStackLimit_(CONNECTION_STACK_LIMIT),
-    idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT) {}
+    maxActiveProcessors_(MAX_ACTIVE_PROCESSORS),
+    maxConnections_(MAX_CONNECTIONS),
+    overloadHysteresis_(0.8),
+    overloadAction_(T_OVERLOAD_NO_ACTION),
+    idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT),
+    overloaded_(false),
+    nConnectionsDropped_(0),
+    nTotalConnectionsDropped_(0) {}
 
   TNonblockingServer(boost::shared_ptr<TProcessor> processor,
                      boost::shared_ptr<TProtocolFactory> protocolFactory,
@@ -125,8 +190,16 @@
     threadManager_(threadManager),
     eventBase_(NULL),
     numTConnections_(0),
+    numActiveProcessors_(0),
     connectionStackLimit_(CONNECTION_STACK_LIMIT),
-    idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT) {
+    maxActiveProcessors_(MAX_ACTIVE_PROCESSORS),
+    maxConnections_(MAX_CONNECTIONS),
+    overloadHysteresis_(0.8),
+    overloadAction_(T_OVERLOAD_NO_ACTION),
+    idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT),
+    overloaded_(false),
+    nConnectionsDropped_(0),
+    nTotalConnectionsDropped_(0) {
     setInputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
     setOutputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
     setInputProtocolFactory(protocolFactory);
@@ -142,13 +215,21 @@
                      int port,
                      boost::shared_ptr<ThreadManager> threadManager = boost::shared_ptr<ThreadManager>()) :
     TServer(processor),
-    serverSocket_(0),
+    serverSocket_(-1),
     port_(port),
     threadManager_(threadManager),
     eventBase_(NULL),
     numTConnections_(0),
+    numActiveProcessors_(0),
     connectionStackLimit_(CONNECTION_STACK_LIMIT),
-    idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT) {
+    maxActiveProcessors_(MAX_ACTIVE_PROCESSORS),
+    maxConnections_(MAX_CONNECTIONS),
+    overloadHysteresis_(0.8),
+    overloadAction_(T_OVERLOAD_NO_ACTION),
+    idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT),
+    overloaded_(false),
+    nConnectionsDropped_(0),
+    nTotalConnectionsDropped_(0)  {
     setInputTransportFactory(inputTransportFactory);
     setOutputTransportFactory(outputTransportFactory);
     setInputProtocolFactory(inputProtocolFactory);
@@ -197,23 +278,151 @@
     return eventBase_;
   }
 
+  /// Increment our count of the number of connected sockets.
   void incrementNumConnections() {
     ++numTConnections_;
   }
 
+  /// Decrement our count of the number of connected sockets.
   void decrementNumConnections() {
     --numTConnections_;
   }
 
-  size_t getNumConnections() {
+  /**
+   * Return the count of sockets currently connected to.
+   *
+   * @return count of connected sockets.
+   */
+  size_t getNumConnections() const {
     return numTConnections_;
   }
 
-  size_t getNumIdleConnections() {
+  /**
+   * Return the count of connection objects allocated but not in use.
+   *
+   * @return count of idle connection objects.
+   */
+  size_t getNumIdleConnections() const {
     return connectionStack_.size();
   }
 
   /**
+   * Return count of number of connections which are currently processing.
+   * This is defined as a connection where all data has been received and
+   * either assigned a task (when threading) or passed to a handler (when
+   * not threading), and where the handler has not yet returned.
+   *
+   * @return # of connections currently processing.
+   */
+  size_t getNumActiveProcessors() const {
+    return numActiveProcessors_;
+  }
+
+  /// Increment the count of connections currently processing.
+  void incrementActiveProcessors() {
+    ++numActiveProcessors_;
+  }
+
+  /// Decrement the count of connections currently processing.
+  void decrementActiveProcessors() {
+    if (numActiveProcessors_ > 0) {
+      --numActiveProcessors_;
+    }
+  }
+
+  /**
+   * Get the maximum # of connections allowed before overload.
+   *
+   * @return current setting.
+   */
+  size_t getMaxConnections() const {
+    return maxConnections_;
+  }
+
+  /**
+   * Set the maximum # of connections allowed before overload.
+   *
+   * @param maxConnections new setting for maximum # of connections.
+   */
+  void setMaxConnections(size_t maxConnections) {
+    maxConnections_ = maxConnections;
+  }
+
+  /**
+   * Get the maximum # of connections waiting in handler/task before overload.
+   *
+   * @return current setting.
+   */
+  size_t getMaxActiveProcessors() const {
+    return maxActiveProcessors_;
+  }
+
+  /**
+   * Set the maximum # of connections waiting in handler/task before overload.
+   *
+   * @param maxActiveProcessors new setting for maximum # of active processes.
+   */
+  void setMaxActiveProcessors(size_t maxActiveProcessors) {
+    maxActiveProcessors_ = maxActiveProcessors;
+  }
+
+  /**
+   * Get fraction of maximum limits before an overload condition is cleared.
+   *
+   * @return hysteresis fraction
+   */
+  double getOverloadHysteresis() const {
+    return overloadHysteresis_;
+  }
+
+  /**
+   * Set fraction of maximum limits before an overload condition is cleared.
+   * A good value would probably be between 0.5 and 0.9.
+   *
+   * @param hysteresisFraction fraction <= 1.0.
+   */
+  void setOverloadHysteresis(double hysteresisFraction) {
+    if (hysteresisFraction <= 1.0 && hysteresisFraction > 0.0) {
+      overloadHysteresis_ = hysteresisFraction;
+    }
+  }
+
+  /**
+   * Get the action the server will take on overload.
+   *
+   * @return a TOverloadAction enum value for the currently set action.
+   */
+  TOverloadAction getOverloadAction() const {
+    return overloadAction_;
+  }
+
+  /**
+   * Set the action the server is to take on overload.
+   *
+   * @param overloadAction a TOverloadAction enum value for the action.
+   */
+  void setOverloadAction(TOverloadAction overloadAction) {
+    overloadAction_ = overloadAction;
+  }
+
+  /**
+   * Determine if the server is currently overloaded.
+   * This function checks the maximums for open connections and connections
+   * currently in processing, and sets an overload condition if they are
+   * exceeded.  The overload will persist until both values are below the
+   * current hysteresis fraction of their maximums.
+   *
+   * @return true if an overload condition exists, false if not.
+   */
+  bool serverOverloaded();
+
+  /** Pop and discard next task on threadpool wait queue.
+   *
+   * @return true if a task was discarded, false if the wait queue was empty.
+   */
+  bool drainPendingTask();
+
+  /**
    * Get the maximum limit of memory allocated to idle TConnection objects.
    *
    * @return # bytes beyond which we will shrink buffers when idle.
@@ -233,44 +442,105 @@
     idleBufferMemLimit_ = limit;
   }
 
+  /**
+   * Return an initialized connection object.  Creates or recovers from
+   * pool a TConnection and initializes it with the provided socket FD
+   * and flags.
+   *
+   * @param socket FD of socket associated with this connection.
+   * @param flags initial lib_event flags for this connection.
+   * @return pointer to initialized TConnection object.
+   */
   TConnection* createConnection(int socket, short flags);
 
+  /**
+   * Returns a connection to pool or deletion.  If the connection pool
+   * (a stack) isn't full, place the connection object on it, otherwise
+   * just delete it.
+   *
+   * @param connection the TConnection being returned.
+   */
   void returnConnection(TConnection* connection);
 
+  /**
+   * C-callable event handler for listener events.  Provides a callback
+   * that libevent can understand which invokes server->handleEvent().
+   *
+   * @param fd the descriptor the event occurred on.
+   * @param which the flags associated with the event.
+   * @param v void* callback arg where we placed TNonblockingServer's "this".
+   */
   static void eventHandler(int fd, short which, void* v) {
     ((TNonblockingServer*)v)->handleEvent(fd, which);
   }
 
+  /// Creates a socket to listen on and binds it to the local port.
   void listenSocket();
 
+  /**
+   * Takes a socket created by listenSocket() and sets various options on it
+   * to prepare for use in the server.
+   *
+   * @param fd descriptor of socket to be initialized.
+   */
   void listenSocket(int fd);
 
+  /// Create the pipe used to notify I/O process of task completion.
+  void createNotificationPipe();
+
+  /**
+   * Get notification pipe send descriptor.
+   *
+   * @return write fd for pipe.
+   */
+  int getNotificationSendFD() const {
+    return notificationPipeFDs_[1];
+  }
+
+  /**
+   * Get notification pipe receive descriptor.
+   *
+   * @return read fd of pipe.
+   */
+  int getNotificationRecvFD() const {
+    return notificationPipeFDs_[0];
+  }
+
+  /**
+   * Register the core libevent events onto the proper base.
+   *
+   * @param base pointer to the event base to be initialized.
+   */
   void registerEvents(event_base* base);
 
+  /**
+   * Main workhorse function, starts up the server listening on a port and
+   * loops over the libevent handler.
+   */
   void serve();
 };
 
-/**
- * Two states for sockets, recv and send mode
- */
+/// Two states for sockets, recv and send mode
 enum TSocketState {
   SOCKET_RECV,
   SOCKET_SEND
 };
 
 /**
- * Four states for the nonblocking servr:
+ * Five states for the nonblocking server:
  *  1) initialize
  *  2) read 4 byte frame size
  *  3) read frame of data
  *  4) send back data (if any)
+ *  5) force immediate connection close
  */
 enum TAppState {
   APP_INIT,
   APP_READ_FRAME_SIZE,
   APP_READ_REQUEST,
   APP_WAIT_TASK,
-  APP_SEND_RESULT
+  APP_SEND_RESULT,
+  APP_CLOSE_CONNECTION
 };
 
 /**
@@ -280,108 +550,120 @@
 class TConnection {
  private:
 
-  class Task;
+  /// Starting size for new connection buffer
+  static const int STARTING_CONNECTION_BUFFER_SIZE = 1024;
 
-  // Server handle
+  /// Server handle
   TNonblockingServer* server_;
 
-  // Socket handle
+  /// Socket handle
   int socket_;
 
-  // Libevent object
+  /// Libevent object
   struct event event_;
 
-  // Libevent flags
+  /// Libevent flags
   short eventFlags_;
 
-  // Socket mode
+  /// Socket mode
   TSocketState socketState_;
 
-  // Application state
+  /// Application state
   TAppState appState_;
 
-  // How much data needed to read
+  /// How much data needed to read
   uint32_t readWant_;
 
-  // Where in the read buffer are we
+  /// Where in the read buffer are we
   uint32_t readBufferPos_;
 
-  // Read buffer
+  /// Read buffer
   uint8_t* readBuffer_;
 
-  // Read buffer size
+  /// Read buffer size
   uint32_t readBufferSize_;
 
-  // Write buffer
+  /// Write buffer
   uint8_t* writeBuffer_;
 
-  // Write buffer size
+  /// Write buffer size
   uint32_t writeBufferSize_;
 
-  // How far through writing are we?
+  /// How far through writing are we?
   uint32_t writeBufferPos_;
 
-  // How many times have we read since our last buffer reset?
+  /// How many times have we read since our last buffer reset?
   uint32_t numReadsSinceReset_;
 
-  // How many times have we written since our last buffer reset?
+  /// How many times have we written since our last buffer reset?
   uint32_t numWritesSinceReset_;
 
-  // Task handle
+  /// Task handle
   int taskHandle_;
 
-  // Task event
+  /// Task event
   struct event taskEvent_;
 
-  // Transport to read from
+  /// Transport to read from
   boost::shared_ptr<TMemoryBuffer> inputTransport_;
 
-  // Transport that processor writes to
+  /// Transport that processor writes to
   boost::shared_ptr<TMemoryBuffer> outputTransport_;
 
-  // extra transport generated by transport factory (e.g. BufferedRouterTransport)
+  /// extra transport generated by transport factory (e.g. BufferedRouterTransport)
   boost::shared_ptr<TTransport> factoryInputTransport_;
   boost::shared_ptr<TTransport> factoryOutputTransport_;
 
-  // Protocol decoder
+  /// Protocol decoder
   boost::shared_ptr<TProtocol> inputProtocol_;
 
-  // Protocol encoder
+  /// Protocol encoder
   boost::shared_ptr<TProtocol> outputProtocol_;
 
-  // Go into read mode
+  /// Go into read mode
   void setRead() {
     setFlags(EV_READ | EV_PERSIST);
   }
 
-  // Go into write mode
+  /// Go into write mode
   void setWrite() {
     setFlags(EV_WRITE | EV_PERSIST);
   }
 
-  // Set socket idle
+  /// Set socket idle
   void setIdle() {
     setFlags(0);
   }
 
-  // Set event flags
+  /**
+   * Set event flags for this connection.
+   *
+   * @param eventFlags flags we pass to libevent for the connection.
+   */
   void setFlags(short eventFlags);
 
-  // Libevent handlers
+  /**
+   * Libevent handler called (via our static wrapper) when the connection
+   * socket had something happen.  Rather than use the flags libevent passed,
+   * we use the connection state to determine whether we need to read or
+   * write the socket.
+   */
   void workSocket();
 
-  // Close this client and reset
+  /// Close this connection and free or reset its resources.
   void close();
 
  public:
 
-  // Constructor
+  class Task;
+
+  /// Constructor; throws TException (by value) if the read buffer can't be allocated.
   TConnection(int socket, short eventFlags, TNonblockingServer *s) {
-    readBuffer_ = (uint8_t*)std::malloc(1024);
+    readBuffer_ = (uint8_t*)std::malloc(STARTING_CONNECTION_BUFFER_SIZE);
     if (readBuffer_ == NULL) {
-      throw new apache::thrift::TException("Out of memory.");
+      throw apache::thrift::TException("Out of memory.");
     }
-    readBufferSize_ = 1024;
+    readBufferSize_ = STARTING_CONNECTION_BUFFER_SIZE;
 
     numReadsSinceReset_ = 0;
     numWritesSinceReset_ = 0;
@@ -405,29 +687,84 @@
    *
    * @param limit we limit buffer size to.
    */
-  void checkIdleBufferMemLimit(uint32_t limit);
+  void checkIdleBufferMemLimit(size_t limit);
 
-  // Initialize
+  /// Initialize
   void init(int socket, short eventFlags, TNonblockingServer *s);
 
-  // Transition into a new state
+  /**
+   * This is called when the application transitions from one state into
+   * another. This means that it has finished writing the data that it needed
+   * to, or finished receiving the data that it needed to.
+   */
   void transition();
 
-  // Handler wrapper
+  /**
+   * C-callable event handler for connection events.  Provides a callback
+   * that libevent can understand, which invokes workSocket() on the TConnection in v.
+   *
+   * @param fd the descriptor the event occurred on.
+   * @param which the flags associated with the event.
+   * @param v void* callback arg where we placed TConnection's "this".
+   */
   static void eventHandler(int fd, short /* which */, void* v) {
     assert(fd == ((TConnection*)v)->socket_);
     ((TConnection*)v)->workSocket();
   }
 
-  // Handler wrapper for task block
-  static void taskHandler(int fd, short /* which */, void* v) {
-    assert(fd == ((TConnection*)v)->taskHandle_);
-    if (-1 == ::close(((TConnection*)v)->taskHandle_)) {
-      GlobalOutput.perror("TConnection::taskHandler close handle failed, resource leak ", errno);
+  /**
+   * C-callable event handler for signaling task completion.  Provides a
+   * callback that libevent can understand that will read a connection
+   * object's address from a pipe and call connection->transition() for
+   * that object.
+   *
+   * @param fd the descriptor the event occured on.
+   */
+  static void taskHandler(int fd, short /* which */, void* /* v */) {
+    TConnection* connection;
+    if (read(fd, (void*)&connection, sizeof(TConnection*))
+        != sizeof(TConnection*)) {
+      GlobalOutput.perror("TConnection::taskHandler read failed, resource leak", errno);
+      return;
     }
-    ((TConnection*)v)->transition();
+
+    connection->transition();
   }
 
+  /**
+   * Notify the server that processing of this request has ended by writing
+   * this connection's address to the server's notification pipe.  Called
+   * when a task completes or when a waiting task is dropped on overload.
+   * A write interrupted by a signal (EINTR) is retried.
+   *
+   * @return true if successful, false if unable to notify (check errno).
+   */
+  bool notifyServer() {
+    TConnection* connection = this;
+    ssize_t written;
+    do {
+      written = write(server_->getNotificationSendFD(), &connection, sizeof(connection));
+    } while (written == -1 && errno == EINTR);
+    return written == static_cast<ssize_t>(sizeof(connection));
+  }
+
+  /// Force connection shutdown for this connection.
+  void forceClose() {
+    appState_ = APP_CLOSE_CONNECTION;
+    if (!notifyServer()) {
+      throw TException("TConnection::forceClose: failed write on notify pipe");
+    }
+  }
+
+  /// return the server this connection was initialized for.
+  TNonblockingServer* getServer() {
+    return server_;
+  }
+
+  /// get state of connection.
+  TAppState getState() {
+    return appState_;
+  }
 };
 
 }}} // apache::thrift::server