diff --git a/lib/cpp/src/concurrency/ThreadManager.cpp b/lib/cpp/src/concurrency/ThreadManager.cpp
index 52473c7..a02ad74 100644
--- a/lib/cpp/src/concurrency/ThreadManager.cpp
+++ b/lib/cpp/src/concurrency/ThreadManager.cpp
@@ -20,6 +20,7 @@
 #include "ThreadManager.h"
 #include "Exception.h"
 #include "Monitor.h"
+#include "Util.h"
 
 #include <boost/shared_ptr.hpp>
 
@@ -54,6 +55,7 @@
     workerMaxCount_(0),
     idleCount_(0),
     pendingTaskCountMax_(0),
+    expiredCount_(0),
     state_(ThreadManager::UNINITIALIZED),
     monitor_(&mutex_),
     maxMonitor_(&mutex_) {}
@@ -108,6 +110,13 @@
     return pendingTaskCountMax_;
   }
 
+  size_t expiredTaskCount() {
+    Synchronized s(monitor_);
+    size_t result = expiredCount_;
+    expiredCount_ = 0;
+    return result;
+  }
+
   void pendingTaskCountMax(const size_t value) {
     Synchronized s(monitor_);
     pendingTaskCountMax_ = value;
@@ -115,12 +124,16 @@
 
   bool canSleep();
 
-  void add(shared_ptr<Runnable> value, int64_t timeout);
+  void add(shared_ptr<Runnable> value, int64_t timeout, int64_t expiration);
 
   void remove(shared_ptr<Runnable> task);
 
   shared_ptr<Runnable> removeNextPending();
 
+  void removeExpiredTasks();
+
+  void setExpireCallback(ExpireCallback expireCallback);
+
 private:
   void stopImpl(bool join);
 
@@ -128,6 +141,8 @@
   size_t workerMaxCount_;
   size_t idleCount_;
   size_t pendingTaskCountMax_;
+  size_t expiredCount_;
+  ExpireCallback expireCallback_;
 
   ThreadManager::STATE state_;
   shared_ptr<ThreadFactory> threadFactory_;
@@ -156,9 +171,10 @@
     COMPLETE
   };
 
-  Task(shared_ptr<Runnable> runnable) :
+  Task(shared_ptr<Runnable> runnable, int64_t expiration=0LL)  :
     runnable_(runnable),
-    state_(WAITING) {}
+    state_(WAITING),
+    expireTime_(expiration != 0LL ? Util::currentTime() + expiration : 0LL) {}
 
   ~Task() {}
 
@@ -173,10 +189,15 @@
     return runnable_;
   }
 
+  int64_t getExpireTime() const {
+    return expireTime_;
+  }
+
  private:
   shared_ptr<Runnable> runnable_;
   friend class ThreadManager::Worker;
   STATE state_;
+  int64_t expireTime_;
 };
 
 class ThreadManager::Worker: public Runnable {
@@ -262,6 +283,8 @@
         }
 
         if (active) {
+          manager_->removeExpiredTasks();
+
           if (!manager_->tasks_.empty()) {
             task = manager_->tasks_.front();
             manager_->tasks_.pop();
@@ -435,13 +458,16 @@
     return idMap_.find(id) == idMap_.end();
   }
 
-  void ThreadManager::Impl::add(shared_ptr<Runnable> value, int64_t timeout) {
+  void ThreadManager::Impl::add(shared_ptr<Runnable> value,
+                                int64_t timeout,
+                                int64_t expiration) {
     Guard g(mutex_);
 
     if (state_ != ThreadManager::STARTED) {
       throw IllegalStateException();
     }
 
+    removeExpiredTasks();
     if (pendingTaskCountMax_ > 0 && (tasks_.size() >= pendingTaskCountMax_)) {
       if (canSleep() && timeout >= 0) {
         while (pendingTaskCountMax_ > 0 && tasks_.size() >= pendingTaskCountMax_) {
@@ -453,7 +479,7 @@
       }
     }
 
-    tasks_.push(shared_ptr<ThreadManager::Task>(new ThreadManager::Task(value)));
+    tasks_.push(shared_ptr<ThreadManager::Task>(new ThreadManager::Task(value, expiration)));
 
     // If idle thread is available notify it, otherwise all worker threads are
     // running and will get around to this task in time.
@@ -485,6 +511,34 @@
   return task->getRunnable();
 }
 
+void ThreadManager::Impl::removeExpiredTasks() {
+  int64_t now = 0LL; // we won't ask for the time untile we need it
+
+  // note that this loop breaks at the first non-expiring task
+  while (!tasks_.empty()) {
+    shared_ptr<ThreadManager::Task> task = tasks_.front();
+    if (task->getExpireTime() == 0LL) {
+      break;
+    }
+    if (now == 0LL) {
+      now = Util::currentTime();
+    }
+    if (task->getExpireTime() > now) {
+      break;
+    }
+    if (expireCallback_) {
+      expireCallback_(task->getRunnable());
+    }
+    tasks_.pop();
+    expiredCount_++;
+  }
+}
+
+
+void ThreadManager::Impl::setExpireCallback(ExpireCallback expireCallback) {
+  expireCallback_ = expireCallback;
+}
+
 class SimpleThreadManager : public ThreadManager::Impl {
 
  public:
diff --git a/lib/cpp/src/concurrency/ThreadManager.h b/lib/cpp/src/concurrency/ThreadManager.h
index cbf08c0..95c4906 100644
--- a/lib/cpp/src/concurrency/ThreadManager.h
+++ b/lib/cpp/src/concurrency/ThreadManager.h
@@ -21,6 +21,7 @@
 #define _THRIFT_CONCURRENCY_THREADMANAGER_H_ 1
 
 #include <boost/shared_ptr.hpp>
+#include <tr1/functional>
 #include <sys/types.h>
 #include "Thread.h"
 
@@ -56,6 +57,9 @@
   ThreadManager() {}
 
  public:
+  class Task;
+  typedef std::tr1::function<void(boost::shared_ptr<Runnable>)> ExpireCallback;
+
   virtual ~ThreadManager() {}
 
   /**
@@ -125,6 +129,11 @@
   virtual size_t pendingTaskCountMax() const = 0;
 
   /**
+   * Gets the number of tasks which have been expired without being run.
+   */
+  virtual size_t expiredTaskCount() = 0;
+
+  /**
    * Adds a task to be executed at some time in the future by a worker thread.
    *
    * This method will block if pendingTaskCountMax() in not zero and pendingTaskCount()
@@ -138,10 +147,14 @@
    * is specified. Specific cases:
    * timeout = 0  : Wait forever to queue task.
    * timeout = -1 : Return immediately if pending task count exceeds specified max
+   * @param expiration when nonzero, the number of milliseconds the task is valid
+   * to be run; if exceeded, the task will be dropped off the queue and not run.
    *
    * @throws TooManyPendingTasksException Pending task count exceeds max pending task count
    */
-  virtual void add(boost::shared_ptr<Runnable>task, int64_t timeout=0LL) = 0;
+  virtual void add(boost::shared_ptr<Runnable>task,
+                   int64_t timeout=0LL,
+                   int64_t expiration=0LL) = 0;
 
   /**
    * Removes a pending task
@@ -155,6 +168,19 @@
    */
   virtual boost::shared_ptr<Runnable> removeNextPending() = 0;
 
+  /**
+   * Remove tasks from front of task queue that have expired.
+   */
+  virtual void removeExpiredTasks() = 0;
+
+  /**
+   * Set a callback to be called when a task is expired and not run.
+   *
+   * @param expireCallback a function called with the shared_ptr<Runnable> for
+   * the expired task.
+   */
+  virtual void setExpireCallback(ExpireCallback expireCallback) = 0;
+
   static boost::shared_ptr<ThreadManager> newThreadManager();
 
   /**
diff --git a/lib/cpp/src/server/TNonblockingServer.cpp b/lib/cpp/src/server/TNonblockingServer.cpp
index eb071a9..649994d 100644
--- a/lib/cpp/src/server/TNonblockingServer.cpp
+++ b/lib/cpp/src/server/TNonblockingServer.cpp
@@ -770,6 +770,16 @@
   }
 }
 
+void TNonblockingServer::setThreadManager(boost::shared_ptr<ThreadManager> threadManager) {
+  threadManager_ = threadManager;
+  if (threadManager != NULL) {
+    threadManager->setExpireCallback(std::tr1::bind(&TNonblockingServer::expireClose, this, std::tr1::placeholders::_1));
+    threadPoolProcessing_ = true;
+  } else {
+    threadPoolProcessing_ = false;
+  }
+}
+
 bool  TNonblockingServer::serverOverloaded() {
   size_t activeConnections = numTConnections_ - connectionStack_.size();
   if (numActiveProcessors_ > maxActiveProcessors_ ||
@@ -807,6 +817,14 @@
   return false;
 }
 
+void TNonblockingServer::expireClose(boost::shared_ptr<Runnable> task) {
+  TConnection* connection =
+    static_cast<TConnection::Task*>(task.get())->getTConnection();
+  assert(connection && connection->getServer()
+	 && connection->getState() == APP_WAIT_TASK);
+  connection->forceClose();
+}
+
 /**
  * Main workhorse function, starts up the server listening on a port and
  * loops over the libevent handler.
diff --git a/lib/cpp/src/server/TNonblockingServer.h b/lib/cpp/src/server/TNonblockingServer.h
index cf5aa18..41a2bf5 100644
--- a/lib/cpp/src/server/TNonblockingServer.h
+++ b/lib/cpp/src/server/TNonblockingServer.h
@@ -113,6 +113,9 @@
   /// Limit for number of open connections
   size_t maxConnections_;
 
+  /// Time in milliseconds before an unperformed task expires (0 == infinite).
+  int64_t taskExpireTime_;
+
   /**
    * Hysteresis for overload state.  This is the fraction of the overload
    * value that needs to be reached before the overload state is cleared;
@@ -173,6 +176,7 @@
     connectionStackLimit_(CONNECTION_STACK_LIMIT),
     maxActiveProcessors_(MAX_ACTIVE_PROCESSORS),
     maxConnections_(MAX_CONNECTIONS),
+    taskExpireTime_(0),
     overloadHysteresis_(0.8),
     overloadAction_(T_OVERLOAD_NO_ACTION),
     idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT),
@@ -194,6 +198,7 @@
     connectionStackLimit_(CONNECTION_STACK_LIMIT),
     maxActiveProcessors_(MAX_ACTIVE_PROCESSORS),
     maxConnections_(MAX_CONNECTIONS),
+    taskExpireTime_(0),
     overloadHysteresis_(0.8),
     overloadAction_(T_OVERLOAD_NO_ACTION),
     idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT),
@@ -224,12 +229,13 @@
     connectionStackLimit_(CONNECTION_STACK_LIMIT),
     maxActiveProcessors_(MAX_ACTIVE_PROCESSORS),
     maxConnections_(MAX_CONNECTIONS),
+    taskExpireTime_(0),
     overloadHysteresis_(0.8),
     overloadAction_(T_OVERLOAD_NO_ACTION),
     idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT),
     overloaded_(false),
     nConnectionsDropped_(0),
-    nTotalConnectionsDropped_(0)  {
+    nTotalConnectionsDropped_(0) {
     setInputTransportFactory(inputTransportFactory);
     setOutputTransportFactory(outputTransportFactory);
     setInputProtocolFactory(inputProtocolFactory);
@@ -239,10 +245,7 @@
 
   ~TNonblockingServer() {}
 
-  void setThreadManager(boost::shared_ptr<ThreadManager> threadManager) {
-    threadManager_ = threadManager;
-    threadPoolProcessing_ = (threadManager != NULL);
-  }
+  void setThreadManager(boost::shared_ptr<ThreadManager> threadManager);
 
   boost::shared_ptr<ThreadManager> getThreadManager() {
     return threadManager_;
@@ -271,7 +274,7 @@
   }
 
   void addTask(boost::shared_ptr<Runnable> task) {
-    threadManager_->add(task);
+    threadManager_->add(task, 0LL, taskExpireTime_);
   }
 
   event_base* getEventBase() const {
@@ -406,6 +409,24 @@
   }
 
   /**
+   * Get the time in milliseconds after which a task expires (0 == infinite).
+   *
+   * @return a 64-bit time in milliseconds.
+   */
+  int64_t getTaskExpireTime() const {
+    return taskExpireTime_;
+  }
+
+  /**
+   * Set the time in milliseconds after which a task expires (0 == infinite).
+   *
+   * @param taskExpireTime a 64-bit time in milliseconds.
+   */
+  void setTaskExpireTime(int64_t taskExpireTime) {
+    taskExpireTime_ = taskExpireTime;
+  }
+
+  /**
    * Determine if the server is currently overloaded.
    * This function checks the maximums for open connections and connections
    * currently in processing, and sets an overload condition if they are
@@ -463,6 +484,14 @@
   void returnConnection(TConnection* connection);
 
   /**
+   * Callback function that the threadmanager calls when a task reaches
+   * its expiration time.  It is needed to clean up the expired connection.
+   *
+   * @param task the runnable associated with the expired task.
+   */
+  void expireClose(boost::shared_ptr<Runnable> task);
+
+  /**
    * C-callable event handler for listener events.  Provides a callback
    * that libevent can understand which invokes server->handleEvent().
    *
