Modified facebook::thrift::concurrency::Monitor.wait:
        Throw TimedOutException on wait timeout so caller can distinguish between timeout and event.

Modified facebook::thrift::concurrency::PthreadThread.start:
        Throw SystemrResourceException on any pthread_* function call failure rather than asserting 0.

Added facebook::thrift::concurrency::Thread.id() and  facebook::thrift::concurrency::ThreadFactory.currentThreadId():
        Return thread-id of thread and current thread respectively.  Needed for reentrancy tests in ThreadManager

Added facebook::thrift::concurrency::ThreadManager.pendingTaskCountMaxN
Modified facebook::thrift::concurrency::ThreadManager.add():
        Now support a maximum pending task count and block if the current pending task count is max.
        If timeout is specified for add, TimedOutException is thrown if pending task count doesn't decrease
        in the timeout interval.  If add() is called by a ThreadManager worker thread and the task cannot
        be added, a TooManyPendingTasksException is thrown rather than blocking, since deadlocks can ensue
        if worker threads block waiting for works threads to complete tasks.

Reviewed By: mcslee, aditya

Revert Plan: revertible

Test Plan: concurrency/test/ThreadManagerTests.h
           run concurrency-test thread-manager


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@665120 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/concurrency/ThreadManager.cpp b/lib/cpp/src/concurrency/ThreadManager.cpp
index 1631541..7d78edb 100644
--- a/lib/cpp/src/concurrency/ThreadManager.cpp
+++ b/lib/cpp/src/concurrency/ThreadManager.cpp
@@ -18,14 +18,13 @@
 #include <iostream>
 #endif //defined(DEBUG)
 
-namespace facebook { namespace thrift { namespace concurrency { 
+namespace facebook { namespace thrift { namespace concurrency {
 
 using namespace boost;
 
-
 /**
  * ThreadManager class
- * 
+ *
  * This class manages a pool of threads. It uses a ThreadFactory to create
  * threads.  It never actually creates or destroys worker threads, rather
  * it maintains statistics on number of idle threads, number of active threads,
@@ -37,10 +36,11 @@
 class ThreadManager::Impl : public ThreadManager  {
 
  public:
-  Impl() : 
+  Impl() :
     workerCount_(0),
     workerMaxCount_(0),
     idleCount_(0),
+    pendingTaskCountMax_(0),
     state_(ThreadManager::UNINITIALIZED) {}
 
   ~Impl() { stop(); }
@@ -56,39 +56,51 @@
   }
 
   shared_ptr<ThreadFactory> threadFactory() const {
-    Synchronized s(monitor_); 
+    Synchronized s(monitor_);
     return threadFactory_;
   }
-      
-  void threadFactory(shared_ptr<ThreadFactory> value) {  
+
+  void threadFactory(shared_ptr<ThreadFactory> value) {
     Synchronized s(monitor_);
     threadFactory_ = value;
   }
 
   void addWorker(size_t value);
-  
+
   void removeWorker(size_t value);
-  
+
   size_t idleWorkerCount() const {
     return idleCount_;
   }
 
   size_t workerCount() const {
-    Synchronized s(monitor_); 
+    Synchronized s(monitor_);
     return workerCount_;
   }
-  
+
   size_t pendingTaskCount() const {
-    Synchronized s(monitor_); 
+    Synchronized s(monitor_);
     return tasks_.size();
   }
 
   size_t totalTaskCount() const {
-    Synchronized s(monitor_);   
+    Synchronized s(monitor_);
     return tasks_.size() + workerCount_ - idleCount_;
   }
-  
-  void add(shared_ptr<Runnable> value);
+
+  size_t pendingTaskCountMax() const {
+    Synchronized s(monitor_);
+    return pendingTaskCountMax_;
+  }
+
+  void pendingTaskCountMax(const size_t value) {
+    Synchronized s(monitor_);
+    pendingTaskCountMax_ = value;
+  }
+
+  bool canSleep();
+
+  void add(shared_ptr<Runnable> value, long long timeout);
 
   void remove(shared_ptr<Runnable> task);
 
@@ -98,6 +110,8 @@
   size_t workerCount_;
   size_t workerMaxCount_;
   size_t idleCount_;
+  size_t pendingTaskCountMax_;
+
   ThreadManager::STATE state_;
   shared_ptr<ThreadFactory> threadFactory_;
 
@@ -110,6 +124,7 @@
   friend class ThreadManager::Worker;
   std::set<shared_ptr<Thread> > workers_;
   std::set<shared_ptr<Thread> > deadWorkers_;
+  std::map<const Thread::id_t, shared_ptr<Thread> > idMap_;
 };
 
 class ThreadManager::Task : public Runnable {
@@ -151,7 +166,7 @@
   };
 
  public:
-  Worker(ThreadManager::Impl* manager) : 
+  Worker(ThreadManager::Impl* manager) :
     manager_(manager),
     state_(UNINITIALIZED),
     idle_(false) {}
@@ -173,7 +188,7 @@
    * execute.
    */
   void run() {
-    bool active = false;  
+    bool active = false;
     bool notifyManager = false;
 
     /**
@@ -230,6 +245,13 @@
             if (task->state_ == ThreadManager::Task::WAITING) {
               task->state_ = ThreadManager::Task::EXECUTING;
 	    }
+
+            /* If we have a pending task max and we just dropped below it, wakeup any
+               thread that might be blocked on add. */
+            if(manager_->pendingTaskCountMax_ != 0 &&
+               manager_->tasks_.size() == manager_->pendingTaskCountMax_ - 1) {
+              manager_->workerMonitor_.notify();
+            }
 	  }
 	} else {
 	  idle_ = true;
@@ -237,7 +259,7 @@
           notifyManager = (manager_->workerCount_ == manager_->workerMaxCount_);
 	}
       }
-      
+
       if (task != NULL) {
 	if (task->state_ == ThreadManager::Task::EXECUTING) {
 	  try {
@@ -248,18 +270,18 @@
 	}
       }
     }
-    
+
     {
-      Synchronized s(manager_->workerMonitor_);    
+      Synchronized s(manager_->workerMonitor_);
       manager_->deadWorkers_.insert(this->thread());
       if (notifyManager) {
         manager_->workerMonitor_.notify();
       }
     }
-    
+
     return;
   }
-  
+
   private:
     ThreadManager::Impl* manager_;
     friend class ThreadManager::Impl;
@@ -271,7 +293,7 @@
   void ThreadManager::Impl::addWorker(size_t value) {
   std::set<shared_ptr<Thread> > newThreads;
   for (size_t ix = 0; ix < value; ix++) {
-    class ThreadManager::Worker;     
+    class ThreadManager::Worker;
     shared_ptr<ThreadManager::Worker> worker = shared_ptr<ThreadManager::Worker>(new ThreadManager::Worker(this));
     newThreads.insert(threadFactory_->newThread(worker));
   }
@@ -281,15 +303,16 @@
     workerMaxCount_ += value;
     workers_.insert(newThreads.begin(), newThreads.end());
   }
-  
+
   for (std::set<shared_ptr<Thread> >::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) {
     shared_ptr<ThreadManager::Worker> worker = dynamic_pointer_cast<ThreadManager::Worker, Runnable>((*ix)->runnable());
     worker->state_ = ThreadManager::Worker::STARTING;
     (*ix)->start();
+    idMap_.insert(std::pair<const Thread::id_t, shared_ptr<Thread> >((*ix)->id(), *ix));
   }
 
   {
-    Synchronized s(workerMonitor_); 
+    Synchronized s(workerMonitor_);
     while (workerCount_ != workerMaxCount_) {
       workerMonitor_.wait();
     }
@@ -303,7 +326,7 @@
   }
 
   {
-    Synchronized s(monitor_); 
+    Synchronized s(monitor_);
     if (state_ == ThreadManager::UNINITIALIZED) {
       if (threadFactory_ == NULL) {
         throw InvalidArgumentException();
@@ -325,7 +348,7 @@
   }
 
   {
-    Synchronized s(monitor_); 
+    Synchronized s(monitor_);
     if (state_ != ThreadManager::STOPPING &&
         state_ != ThreadManager::JOINING &&
         state_ != ThreadManager::STOPPED) {
@@ -338,12 +361,12 @@
     removeWorker(workerCount_);
   }
 
-  // XXX 
+  // XXX
   // should be able to block here for transition to STOPPED since we're no
   // using shared_ptrs
 
   {
-    Synchronized s(monitor_); 
+    Synchronized s(monitor_);
     state_ = ThreadManager::STOPPED;
   }
 
@@ -352,7 +375,7 @@
 void ThreadManager::Impl::removeWorker(size_t value) {
   std::set<shared_ptr<Thread> > removedThreads;
   {
-    Synchronized s(monitor_); 
+    Synchronized s(monitor_);
     if (value > workerMaxCount_) {
       throw InvalidArgumentException();
     }
@@ -369,7 +392,7 @@
   }
 
   {
-    Synchronized s(workerMonitor_); 
+    Synchronized s(workerMonitor_);
 
     while (workerCount_ != workerMaxCount_) {
       workerMonitor_.wait();
@@ -377,19 +400,37 @@
 
     for (std::set<shared_ptr<Thread> >::iterator ix = deadWorkers_.begin(); ix != deadWorkers_.end(); ix++) {
       workers_.erase(*ix);
+      idMap_.erase((*ix)->id());
     }
-    
+
     deadWorkers_.clear();
   }
 }
-  
-void ThreadManager::Impl::add(shared_ptr<Runnable> value) {
-    Synchronized s(monitor_); 
+
+  bool ThreadManager::Impl::canSleep() {
+    const Thread::id_t id = threadFactory_->currentThreadId();
+    return idMap_.find(id) == idMap_.end();
+  }
+
+  void ThreadManager::Impl::add(shared_ptr<Runnable> value, long long timeout) {
+    Synchronized s(monitor_);
 
     if (state_ != ThreadManager::STARTED) {
       throw IllegalStateException();
     }
 
+    if(pendingTaskCountMax_ > 0 && (tasks_.size() >= pendingTaskCountMax_)) {
+
+      if(canSleep()) {
+
+        while(pendingTaskCountMax_ > 0 && tasks_.size() >= pendingTaskCountMax_) {
+          monitor_.wait(timeout);
+        }
+      } else {
+        throw TooManyPendingTasksException();
+      }
+    }
+
     tasks_.push(shared_ptr<ThreadManager::Task>(new ThreadManager::Task(value)));
 
     // If idle thread is available notify it, otherwise all worker threads are
@@ -400,7 +441,7 @@
   }
 
 void ThreadManager::Impl::remove(shared_ptr<Runnable> task) {
-  Synchronized s(monitor_); 
+  Synchronized s(monitor_);
   if (state_ != ThreadManager::STARTED) {
     throw IllegalStateException();
   }
@@ -409,18 +450,21 @@
 class SimpleThreadManager : public ThreadManager::Impl {
 
 public:
-  SimpleThreadManager(size_t workerCount=4) : 
+  SimpleThreadManager(size_t workerCount=4, size_t pendingTaskCountMax=0) :
     workerCount_(workerCount),
+    pendingTaskCountMax_(pendingTaskCountMax),
     firstTime_(true) {
   }
 
   void start() {
+    ThreadManager::Impl::pendingTaskCountMax(pendingTaskCountMax_);
     ThreadManager::Impl::start();
     addWorker(workerCount_);
   }
 
 private:
   const size_t workerCount_;
+  const size_t pendingTaskCountMax_;
   bool firstTime_;
   Monitor monitor_;
 };
@@ -430,8 +474,9 @@
   return shared_ptr<ThreadManager>(new ThreadManager::Impl());
 }
 
-shared_ptr<ThreadManager> ThreadManager::newSimpleThreadManager(size_t count) {
-  return shared_ptr<ThreadManager>(new SimpleThreadManager(count));
+shared_ptr<ThreadManager> ThreadManager::newSimpleThreadManager(size_t count, size_t pendingTaskCountMax) {
+  return shared_ptr<ThreadManager>(new SimpleThreadManager(count, pendingTaskCountMax));
 }
 
 }}} // facebook::thrift::concurrency
+