diff --git a/lib/cpp/src/concurrency/ThreadManager.cpp b/lib/cpp/src/concurrency/ThreadManager.cpp
index 52473c7..a02ad74 100644
--- a/lib/cpp/src/concurrency/ThreadManager.cpp
+++ b/lib/cpp/src/concurrency/ThreadManager.cpp
@@ -20,6 +20,7 @@
 #include "ThreadManager.h"
 #include "Exception.h"
 #include "Monitor.h"
+#include "Util.h"
 
 #include <boost/shared_ptr.hpp>
 
@@ -54,6 +55,7 @@
     workerMaxCount_(0),
     idleCount_(0),
     pendingTaskCountMax_(0),
+    expiredCount_(0),
     state_(ThreadManager::UNINITIALIZED),
     monitor_(&mutex_),
     maxMonitor_(&mutex_) {}
@@ -108,6 +110,13 @@
     return pendingTaskCountMax_;
   }
 
+  size_t expiredTaskCount() {
+    Synchronized s(monitor_);
+    size_t result = expiredCount_;
+    expiredCount_ = 0;
+    return result;
+  }
+
   void pendingTaskCountMax(const size_t value) {
     Synchronized s(monitor_);
     pendingTaskCountMax_ = value;
@@ -115,12 +124,16 @@
 
   bool canSleep();
 
-  void add(shared_ptr<Runnable> value, int64_t timeout);
+  void add(shared_ptr<Runnable> value, int64_t timeout, int64_t expiration);
 
   void remove(shared_ptr<Runnable> task);
 
   shared_ptr<Runnable> removeNextPending();
 
+  void removeExpiredTasks();
+
+  void setExpireCallback(ExpireCallback expireCallback);
+
 private:
   void stopImpl(bool join);
 
@@ -128,6 +141,8 @@
   size_t workerMaxCount_;
   size_t idleCount_;
   size_t pendingTaskCountMax_;
+  size_t expiredCount_;
+  ExpireCallback expireCallback_;
 
   ThreadManager::STATE state_;
   shared_ptr<ThreadFactory> threadFactory_;
@@ -156,9 +171,10 @@
     COMPLETE
   };
 
-  Task(shared_ptr<Runnable> runnable) :
+  Task(shared_ptr<Runnable> runnable, int64_t expiration=0LL)  :
     runnable_(runnable),
-    state_(WAITING) {}
+    state_(WAITING),
+    expireTime_(expiration != 0LL ? Util::currentTime() + expiration : 0LL) {}
 
   ~Task() {}
 
@@ -173,10 +189,15 @@
     return runnable_;
   }
 
+  int64_t getExpireTime() const {
+    return expireTime_;
+  }
+
  private:
   shared_ptr<Runnable> runnable_;
   friend class ThreadManager::Worker;
   STATE state_;
+  int64_t expireTime_;
 };
 
 class ThreadManager::Worker: public Runnable {
@@ -262,6 +283,8 @@
         }
 
         if (active) {
+          manager_->removeExpiredTasks();
+
           if (!manager_->tasks_.empty()) {
             task = manager_->tasks_.front();
             manager_->tasks_.pop();
@@ -435,13 +458,16 @@
     return idMap_.find(id) == idMap_.end();
   }
 
-  void ThreadManager::Impl::add(shared_ptr<Runnable> value, int64_t timeout) {
+  void ThreadManager::Impl::add(shared_ptr<Runnable> value,
+                                int64_t timeout,
+                                int64_t expiration) {
     Guard g(mutex_);
 
     if (state_ != ThreadManager::STARTED) {
       throw IllegalStateException();
     }
 
+    removeExpiredTasks();
     if (pendingTaskCountMax_ > 0 && (tasks_.size() >= pendingTaskCountMax_)) {
       if (canSleep() && timeout >= 0) {
         while (pendingTaskCountMax_ > 0 && tasks_.size() >= pendingTaskCountMax_) {
@@ -453,7 +479,7 @@
       }
     }
 
-    tasks_.push(shared_ptr<ThreadManager::Task>(new ThreadManager::Task(value)));
+    tasks_.push(shared_ptr<ThreadManager::Task>(new ThreadManager::Task(value, expiration)));
 
     // If idle thread is available notify it, otherwise all worker threads are
     // running and will get around to this task in time.
@@ -485,6 +511,34 @@
   return task->getRunnable();
 }
 
+void ThreadManager::Impl::removeExpiredTasks() {
+  int64_t now = 0LL; // we won't ask for the time untile we need it
+
+  // note that this loop breaks at the first non-expiring task
+  while (!tasks_.empty()) {
+    shared_ptr<ThreadManager::Task> task = tasks_.front();
+    if (task->getExpireTime() == 0LL) {
+      break;
+    }
+    if (now == 0LL) {
+      now = Util::currentTime();
+    }
+    if (task->getExpireTime() > now) {
+      break;
+    }
+    if (expireCallback_) {
+      expireCallback_(task->getRunnable());
+    }
+    tasks_.pop();
+    expiredCount_++;
+  }
+}
+
+
+void ThreadManager::Impl::setExpireCallback(ExpireCallback expireCallback) {
+  expireCallback_ = expireCallback;
+}
+
 class SimpleThreadManager : public ThreadManager::Impl {
 
  public:
diff --git a/lib/cpp/src/concurrency/ThreadManager.h b/lib/cpp/src/concurrency/ThreadManager.h
index cbf08c0..95c4906 100644
--- a/lib/cpp/src/concurrency/ThreadManager.h
+++ b/lib/cpp/src/concurrency/ThreadManager.h
@@ -21,6 +21,7 @@
 #define _THRIFT_CONCURRENCY_THREADMANAGER_H_ 1
 
 #include <boost/shared_ptr.hpp>
+#include <tr1/functional>
 #include <sys/types.h>
 #include "Thread.h"
 
@@ -56,6 +57,9 @@
   ThreadManager() {}
 
  public:
+  class Task;
+  typedef std::tr1::function<void(boost::shared_ptr<Runnable>)> ExpireCallback;
+
   virtual ~ThreadManager() {}
 
   /**
@@ -125,6 +129,11 @@
   virtual size_t pendingTaskCountMax() const = 0;
 
   /**
+   * Gets the number of tasks which have been expired without being run.
+   */
+  virtual size_t expiredTaskCount() = 0;
+
+  /**
    * Adds a task to be executed at some time in the future by a worker thread.
    *
    * This method will block if pendingTaskCountMax() in not zero and pendingTaskCount()
@@ -138,10 +147,14 @@
    * is specified. Specific cases:
    * timeout = 0  : Wait forever to queue task.
    * timeout = -1 : Return immediately if pending task count exceeds specified max
+   * @param expiration when nonzero, the number of milliseconds the task is valid
+   * to be run; if exceeded, the task will be dropped off the queue and not run.
    *
    * @throws TooManyPendingTasksException Pending task count exceeds max pending task count
    */
-  virtual void add(boost::shared_ptr<Runnable>task, int64_t timeout=0LL) = 0;
+  virtual void add(boost::shared_ptr<Runnable>task,
+                   int64_t timeout=0LL,
+                   int64_t expiration=0LL) = 0;
 
   /**
    * Removes a pending task
@@ -155,6 +168,19 @@
    */
   virtual boost::shared_ptr<Runnable> removeNextPending() = 0;
 
+  /**
+   * Remove tasks from front of task queue that have expired.
+   */
+  virtual void removeExpiredTasks() = 0;
+
+  /**
+   * Set a callback to be called when a task is expired and not run.
+   *
+   * @param expireCallback a function called with the shared_ptr<Runnable> for
+   * the expired task.
+   */
+  virtual void setExpireCallback(ExpireCallback expireCallback) = 0;
+
   static boost::shared_ptr<ThreadManager> newThreadManager();
 
   /**
