More test code added...
     more bugs found

facebook::thrift::concurrency::ThreadManager::add
	Fixed dispatch error that resulted in only one of N worker threads ever getting notified of work

facebook::thrift::concurrency::ThreadManager
	Cleaned up addWorker/removeWorker and stop logic so that adding/removing workers doesn't wake up 
	all blocked workers.

facebook::thrift::concurrency::Thread
facebook::thrift::concurrency::Runnable
	Fixed initialization logic so that runnable can return the thread that runs it


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664729 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/concurrency/ThreadManager.cc b/lib/cpp/src/concurrency/ThreadManager.cc
index 0aa4bef..ca2bbb5 100644
--- a/lib/cpp/src/concurrency/ThreadManager.cc
+++ b/lib/cpp/src/concurrency/ThreadManager.cc
@@ -8,6 +8,7 @@
 
 namespace facebook { namespace thrift { namespace concurrency { 
 
+
 /** ThreadManager class
     
     This class manages a pool of threads.  It uses a ThreadFactory to create threads.  It never actually creates or destroys worker threads, rather
@@ -20,19 +21,18 @@
 
  public:
 
-  Impl() : _stopped(false) {}
+  Impl() : _state(ThreadManager::UNINITIALIZED) {}
 
+  ~Impl() {stop();}
 
-
-  ~Impl() {
-
-    if(!_stopped) {
-      stop();
-    }
-  }
+  void start();
 
   void stop();
 
+  const ThreadManager::STATE state() const {
+    return _state;
+  };
+
   const ThreadFactory* threadFactory() const {
 
     Synchronized s(_monitor); 
@@ -82,9 +82,11 @@
 
   size_t _workerCount;
 
+  size_t _workerMaxCount;
+
   size_t _idleCount;
 
-  bool _stopped;
+  ThreadManager::STATE _state;
 
   const ThreadFactory* _threadFactory;
 
@@ -94,9 +96,13 @@
 
   Monitor _monitor;
 
+  Monitor _workerMonitor;
+
   friend class ThreadManager::Worker;
 
   std::set<Thread*> _workers;
+  
+  std::set<Thread*> _deadWorkers;
 };
 
 class ThreadManager::Task : public Runnable {
@@ -133,6 +139,7 @@
 };
 
 class ThreadManager::Worker: public Runnable {
+
   enum STATE {
     UNINITIALIZED,
     STARTING,
@@ -142,6 +149,7 @@
   };
 
  public:
+
   Worker(ThreadManager::Impl* manager) : 
     _manager(manager),
     _state(UNINITIALIZED),
@@ -150,59 +158,94 @@
 
   ~Worker() {}
 
+  bool isActive() const { return _manager->_workerCount <= _manager->_workerMaxCount;}
+
   /** Worker entry point
 
       As long as worker thread is running, pull tasks off the task queue and execute. */
 
   void run() {
 
+    bool active = false;
+    
+    bool notifyManager = false;
+
+    /** Increment worker semaphore and notify manager if worker count reached desired max
+	
+	Note
+	We have to release the monitor and acquire the workerMonitor since that is what the manager
+	blocks on for worker add/remove */
+
     {Synchronized s(_manager->_monitor);
 
-      if(_state == STARTING) {
-	_state = STARTED;
+      active = _manager->_workerCount < _manager->_workerMaxCount;
+
+      if(active) {
+
+	_manager->_workerCount++;
+
+	notifyManager = _manager->_workerCount == _manager->_workerMaxCount;
       }
-
-      _manager->_workerCount++;
-
-      _manager->_monitor.notifyAll();
     }
 
-    do {
+    if(notifyManager) {
+
+      Synchronized s(_manager->_workerMonitor);
+
+      _manager->_workerMonitor.notify();
+
+      notifyManager = false;
+    }
+
+    while(active) {
 
       ThreadManager::Task* task = NULL;
 
       /* While holding manager monitor block for non-empty task queue (Also check that the thread hasn't been requested to stop).
 
-	 Once the queue is non-empty, dequeue a task, release monitor, and execute. */
+	 Once the queue is non-empty, dequeue a task, release monitor, and execute. If the worker max count has been decremented
+	 such that we exceed it, mark ourself inactive, decrement the worker count and notify the manager (technically we're notifying
+	 the next blocked thread but eventually the manager will see it. */
 
       {Synchronized s(_manager->_monitor);
 
-	while(_state == STARTED && _manager->_tasks.empty()) {
+	active = isActive();
+
+	while(active && _manager->_tasks.empty()) {
 	  
 	  _manager->_idleCount++;
 
 	  _idle = true;
 	  
 	  _manager->_monitor.wait();
+	  
+	  active = isActive();
 
 	  _idle = false;
 	  
 	  _manager->_idleCount--;
 	}
-	
-	if(_state == STARTED) {
+
+	if(active) {
 
 	  if(!_manager->_tasks.empty()) {
 	  
 	    task = _manager->_tasks.front();
 
 	    _manager->_tasks.pop();
-
+	    
 	    if(task->_state == ThreadManager::Task::WAITING) {
-
+	    
 	      task->_state = ThreadManager::Task::EXECUTING;
 	    }
 	  }
+	} else {
+
+	  _idle = true;
+	  
+	  _manager->_workerCount--;
+	  
+	  notifyManager = _manager->_workerCount == _manager->_workerMaxCount;
 	}
       }
 
@@ -212,31 +255,29 @@
 	  try {
 	    
 	    task->run();
-
+	    
 	  } catch(...) {
 	    
 	    // XXX need to log this
 	  }
 	  
 	  delete task;
+	  
+	  task = NULL;
 	}
       }
+    }
+
+    {Synchronized s(_manager->_workerMonitor);    
+
+      _manager->_deadWorkers.insert(this->thread());
       
-    } while(_state == STARTED);
+      if(notifyManager) {
 
-    {Synchronized s(_manager->_monitor);
-
-      _manager->_workerCount--;
-
-      if(_state == STOPPING) {
-
-	_state = STOPPED; 
-
-	_manager->_monitor.notify();
-
+	_manager->_workerMonitor.notify();
       }
     }
-    
+
     return;
   }
 
@@ -263,6 +304,13 @@
 
     newThreads.insert(_threadFactory->newThread(worker));
   }
+
+  {Synchronized s(_monitor);
+
+    _workerMaxCount+= value;
+
+    _workers.insert(newThreads.begin(), newThreads.end());
+  }
   
   for(std::set<Thread*>::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) {
 
@@ -273,11 +321,33 @@
     (*ix)->start();
   }
 
-  {Synchronized s(_monitor); 
+  {Synchronized s(_workerMonitor); 
       
-    _workers.insert(newThreads.begin(), newThreads.end());
+    while(_workerCount != _workerMaxCount) {
+      _workerMonitor.wait();
+    }
+  }
+}
 
-    while(_workerCount != _workers.size()) {
+void ThreadManager::Impl::start() {
+
+  if(_state == ThreadManager::STOPPED) {
+    return;
+  }
+
+  {Synchronized s(_monitor); 
+
+    if(_state == ThreadManager::UNINITIALIZED) {
+
+      if(_threadFactory == NULL) {throw InvalidArgumentException();}
+
+      _state = ThreadManager::STARTED;
+
+      _monitor.notifyAll();
+    }
+
+    while(_state == STARTING) {
+
       _monitor.wait();
     }
   }
@@ -287,78 +357,90 @@
 
   bool doStop = false;
 
+  if(_state == ThreadManager::STOPPED) {
+    return;
+  }
+
   {Synchronized s(_monitor); 
 
-    if(!_stopped) {
+    if(!_state != ThreadManager::STOPPING && _state != ThreadManager::STOPPED) {
+
       doStop = true;
-      _stopped = true;
+
+      _state = ThreadManager::STOPPING;
     }
   }
 
   if(doStop) {
+
     removeWorker(_workerCount);
+
+    _state = ThreadManager::STOPPING;
   }
+
+  // Don't block for stopping->stopped transition here, since if stop is being performed in context of a delete, the monitor may be invalid
+     
 }
   
 void ThreadManager::Impl::removeWorker(size_t value) {
 
   std::set<Thread*> removedThreads;
 
-    {Synchronized s(_monitor); 
-  
-      /* Overly clever loop
-	 
-	 First time through, (idleOnly == 1) just look for idle threads.  If that didn't find enough, go through again (idleOnly == 0)
-	 and remove a sufficient number of busy threads. */
+  {Synchronized s(_monitor); 
 
-      for(int idleOnly = 1; idleOnly >= 0; idleOnly--) {
+    if(value > _workerMaxCount) {
+
+      throw InvalidArgumentException();
+    }
+
+    _workerMaxCount-= value;
+
+    if(_idleCount < value) {
       
-	for(std::set<Thread*>::iterator workerThread = _workers.begin(); (workerThread != _workers.end()) && (removedThreads.size() < value); workerThread++) {
+      for(size_t ix = 0; ix < _idleCount; ix++) {
 
-	  Worker* worker = (Worker*)(*workerThread)->runnable();
-
-	  if(worker->_idle || !idleOnly) {
-
-	    if(worker->_state == ThreadManager::Worker::STARTED) {
-
-	      worker->_state = ThreadManager::Worker::STOPPING;
-	    }
-
-	    removedThreads.insert(*workerThread);
-	
-	    _workers.erase(workerThread);
-	  }
-	}
+	_monitor.notify();
       }
-      
+    } else {
+
       _monitor.notifyAll();
     }
-
-    
-    // Join removed threads and free worker 
-
-    for(std::set<Thread*>::iterator workerThread = removedThreads.begin(); workerThread != removedThreads.end(); workerThread++) {
-
-      Worker* worker = (Worker*)(*workerThread)->runnable();
-
-      (*workerThread)->join();
-      
-      delete worker;
-    }
   }
+
+  {Synchronized s(_workerMonitor); 
+
+    while(_workerCount != _workerMaxCount) {
+      _workerMonitor.wait();
+    }
+
+    for(std::set<Thread*>::iterator ix = _deadWorkers.begin(); ix != _deadWorkers.end(); ix++) {
+
+	_workers.erase(*ix);
+	
+	delete (*ix)->runnable();
+
+	delete (*ix);
+    }
+
+    _deadWorkers.clear();
+  }
+}
   
 void ThreadManager::Impl::add(Runnable* value) {
 
     Synchronized s(_monitor); 
 
-    bool isEmpty = _tasks.empty();
+    if(_state != ThreadManager::STARTED) {
+
+      throw IllegalStateException();
+    }
 
     _tasks.push(new ThreadManager::Task(value));
 
-    /* If queue was empty notify a thread, otherwise all worker threads are running and will get around to this
+    /* If idle thread is available notify it, otherwise all worker threads are running and will get around to this
        task in time. */
 
-    if(isEmpty && _idleCount > 0) {
+    if(_idleCount > 0) {
 
       _monitor.notify();
     }
@@ -367,6 +449,11 @@
 void ThreadManager::Impl::remove(Runnable* task) {
 
     Synchronized s(_monitor); 
+
+    if(_state != ThreadManager::STARTED) {
+
+      throw IllegalStateException();
+    }
 }
 
 class SimpleThreadManager : public ThreadManager::Impl {
@@ -378,26 +465,10 @@
     _firstTime(true) {
   }
 
-  void add(Runnable* task) {
+  void start() {
+    ThreadManager::Impl::start();
 
-    bool addWorkers = false;
-
-    {Synchronized s(_monitor);
-
-      if(_firstTime) {
-
-	_firstTime = false;
-
-	addWorkers = true;
-      }
-    }
-
-    if(addWorkers) {
-
-      addWorker(_workerCount);
-    }
-
-    Impl::add(task);
+    addWorker(_workerCount);
   }
 
 private: