cpp: TNonblockingServer: Allow unrun tasks to expire after a time limit

Enhance ThreadManager to allow an expiration time interval to be assigned
to tasks, and expire those tasks after that time limit has passed.
Enhance TNonblockingServer to utilize this capability so it can be used
for overload resilience.

Note: expired entries are only removed from the queue head, so the
mechanism in ThreadManager may not do what you expect if you have
heterogeneous expiration times. That's not an issue with
TNonblockingServer (which will give all tasks the same limit) and might
not be an issue in other cases where most tasks have the same limit and
the rest execute quickly. A full-up timeout queue would be more complex and have
greater overhead than that used here. It's unnecessary for the task at
hand so I didn't go that route...

The TNonblockingServer interface is simple: a setTaskExpireTime() accepts
a 64-bit millisecond argument. 0 means infinite. A getTaskExpireTime()
accessor completes the interface.

The ThreadManager interface involves an added argument to add() for the
expiration interval and a setExpireCallback() function for setting a
callback that is called for expired tasks (for this project this is
necessary to shut down the associated connection).

git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@920673 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/concurrency/ThreadManager.cpp b/lib/cpp/src/concurrency/ThreadManager.cpp
index 52473c7..a02ad74 100644
--- a/lib/cpp/src/concurrency/ThreadManager.cpp
+++ b/lib/cpp/src/concurrency/ThreadManager.cpp
@@ -20,6 +20,7 @@
 #include "ThreadManager.h"
 #include "Exception.h"
 #include "Monitor.h"
+#include "Util.h"
 
 #include <boost/shared_ptr.hpp>
 
@@ -54,6 +55,7 @@
     workerMaxCount_(0),
     idleCount_(0),
     pendingTaskCountMax_(0),
+    expiredCount_(0),
     state_(ThreadManager::UNINITIALIZED),
     monitor_(&mutex_),
     maxMonitor_(&mutex_) {}
@@ -108,6 +110,13 @@
     return pendingTaskCountMax_;
   }
 
+  size_t expiredTaskCount() {
+    Synchronized s(monitor_);
+    size_t result = expiredCount_;
+    expiredCount_ = 0;
+    return result;
+  }
+
   void pendingTaskCountMax(const size_t value) {
     Synchronized s(monitor_);
     pendingTaskCountMax_ = value;
@@ -115,12 +124,16 @@
 
   bool canSleep();
 
-  void add(shared_ptr<Runnable> value, int64_t timeout);
+  void add(shared_ptr<Runnable> value, int64_t timeout, int64_t expiration);
 
   void remove(shared_ptr<Runnable> task);
 
   shared_ptr<Runnable> removeNextPending();
 
+  void removeExpiredTasks();
+
+  void setExpireCallback(ExpireCallback expireCallback);
+
 private:
   void stopImpl(bool join);
 
@@ -128,6 +141,8 @@
   size_t workerMaxCount_;
   size_t idleCount_;
   size_t pendingTaskCountMax_;
+  size_t expiredCount_;
+  ExpireCallback expireCallback_;
 
   ThreadManager::STATE state_;
   shared_ptr<ThreadFactory> threadFactory_;
@@ -156,9 +171,10 @@
     COMPLETE
   };
 
-  Task(shared_ptr<Runnable> runnable) :
+  Task(shared_ptr<Runnable> runnable, int64_t expiration=0LL)  :
     runnable_(runnable),
-    state_(WAITING) {}
+    state_(WAITING),
+    expireTime_(expiration != 0LL ? Util::currentTime() + expiration : 0LL) {}
 
   ~Task() {}
 
@@ -173,10 +189,15 @@
     return runnable_;
   }
 
+  int64_t getExpireTime() const {
+    return expireTime_;
+  }
+
  private:
   shared_ptr<Runnable> runnable_;
   friend class ThreadManager::Worker;
   STATE state_;
+  int64_t expireTime_;
 };
 
 class ThreadManager::Worker: public Runnable {
@@ -262,6 +283,8 @@
         }
 
         if (active) {
+          manager_->removeExpiredTasks();
+
           if (!manager_->tasks_.empty()) {
             task = manager_->tasks_.front();
             manager_->tasks_.pop();
@@ -435,13 +458,16 @@
     return idMap_.find(id) == idMap_.end();
   }
 
-  void ThreadManager::Impl::add(shared_ptr<Runnable> value, int64_t timeout) {
+  void ThreadManager::Impl::add(shared_ptr<Runnable> value,
+                                int64_t timeout,
+                                int64_t expiration) {
     Guard g(mutex_);
 
     if (state_ != ThreadManager::STARTED) {
       throw IllegalStateException();
     }
 
+    removeExpiredTasks();
     if (pendingTaskCountMax_ > 0 && (tasks_.size() >= pendingTaskCountMax_)) {
       if (canSleep() && timeout >= 0) {
         while (pendingTaskCountMax_ > 0 && tasks_.size() >= pendingTaskCountMax_) {
@@ -453,7 +479,7 @@
       }
     }
 
-    tasks_.push(shared_ptr<ThreadManager::Task>(new ThreadManager::Task(value)));
+    tasks_.push(shared_ptr<ThreadManager::Task>(new ThreadManager::Task(value, expiration)));
 
     // If idle thread is available notify it, otherwise all worker threads are
     // running and will get around to this task in time.
@@ -485,6 +511,34 @@
   return task->getRunnable();
 }
 
+void ThreadManager::Impl::removeExpiredTasks() {
+  int64_t now = 0LL; // we won't ask for the time until we need it
+
+  // note that this loop breaks at the first non-expiring task
+  while (!tasks_.empty()) {
+    shared_ptr<ThreadManager::Task> task = tasks_.front();
+    if (task->getExpireTime() == 0LL) {
+      break;
+    }
+    if (now == 0LL) {
+      now = Util::currentTime();
+    }
+    if (task->getExpireTime() > now) {
+      break;
+    }
+    if (expireCallback_) {
+      expireCallback_(task->getRunnable());
+    }
+    tasks_.pop();
+    expiredCount_++;
+  }
+}
+
+
+void ThreadManager::Impl::setExpireCallback(ExpireCallback expireCallback) {
+  expireCallback_ = expireCallback;
+}
+
 class SimpleThreadManager : public ThreadManager::Impl {
 
  public:
diff --git a/lib/cpp/src/concurrency/ThreadManager.h b/lib/cpp/src/concurrency/ThreadManager.h
index cbf08c0..95c4906 100644
--- a/lib/cpp/src/concurrency/ThreadManager.h
+++ b/lib/cpp/src/concurrency/ThreadManager.h
@@ -21,6 +21,7 @@
 #define _THRIFT_CONCURRENCY_THREADMANAGER_H_ 1
 
 #include <boost/shared_ptr.hpp>
+#include <tr1/functional>
 #include <sys/types.h>
 #include "Thread.h"
 
@@ -56,6 +57,9 @@
   ThreadManager() {}
 
  public:
+  class Task;
+  typedef std::tr1::function<void(boost::shared_ptr<Runnable>)> ExpireCallback;
+
   virtual ~ThreadManager() {}
 
   /**
@@ -125,6 +129,11 @@
   virtual size_t pendingTaskCountMax() const = 0;
 
   /**
+   * Gets the number of tasks which have been expired without being run.
+   */
+  virtual size_t expiredTaskCount() = 0;
+
+  /**
    * Adds a task to be executed at some time in the future by a worker thread.
    *
    * This method will block if pendingTaskCountMax() in not zero and pendingTaskCount()
@@ -138,10 +147,14 @@
    * is specified. Specific cases:
    * timeout = 0  : Wait forever to queue task.
    * timeout = -1 : Return immediately if pending task count exceeds specified max
+   * @param expiration when nonzero, the number of milliseconds the task is valid
+   * to be run; if exceeded, the task will be dropped off the queue and not run.
    *
    * @throws TooManyPendingTasksException Pending task count exceeds max pending task count
    */
-  virtual void add(boost::shared_ptr<Runnable>task, int64_t timeout=0LL) = 0;
+  virtual void add(boost::shared_ptr<Runnable>task,
+                   int64_t timeout=0LL,
+                   int64_t expiration=0LL) = 0;
 
   /**
    * Removes a pending task
@@ -155,6 +168,19 @@
    */
   virtual boost::shared_ptr<Runnable> removeNextPending() = 0;
 
+  /**
+   * Remove tasks from front of task queue that have expired.
+   */
+  virtual void removeExpiredTasks() = 0;
+
+  /**
+   * Set a callback to be called when a task is expired and not run.
+   *
+   * @param expireCallback a function called with the shared_ptr<Runnable> for
+   * the expired task.
+   */
+  virtual void setExpireCallback(ExpireCallback expireCallback) = 0;
+
   static boost::shared_ptr<ThreadManager> newThreadManager();
 
   /**