More test code added...
     more bugs found

facebook::thrift::concurrency::ThreadManager::add
	Fixed dispatch error that resulted in only one of N worker threads ever getting notified of work

facebook::thrift::concurrency::ThreadManager
	Cleaned up addWorker/removeWorker and stop logic so that adding/removing workers doesn't wake up 
	all blocked workers.

facebook::thrift::concurrency::Thread
facebook::thrift::concurrency::Runnable
	Fixed initialization logic so that runnable can return the thread that runs it


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664729 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/concurrency/PosixThreadFactory.cc b/lib/cpp/src/concurrency/PosixThreadFactory.cc
index 3587c03..00b2613 100644
--- a/lib/cpp/src/concurrency/PosixThreadFactory.cc
+++ b/lib/cpp/src/concurrency/PosixThreadFactory.cc
@@ -36,9 +36,6 @@
 
   int _stackSize;
 
-  Runnable* _runnable;
-
-
 public:
   
   PthreadThread(int policy, int priority, int stackSize, Runnable* runnable) : 
@@ -46,9 +43,10 @@
     _state(uninitialized), 
     _policy(policy),
     _priority(priority),
-    _stackSize(stackSize),
-    _runnable(runnable)
-  {}
+    _stackSize(stackSize) { 
+
+    this->Thread::runnable(runnable);
+  }
 
   void start() {
 
@@ -92,7 +90,9 @@
     }
   }
 
-  Runnable* runnable() const {return _runnable;}
+  Runnable* runnable() const {return Thread::runnable();}
+
+  void runnable(Runnable* value) {Thread::runnable(value);}
 
 };
 
@@ -107,7 +107,7 @@
 
   thread->_state = starting;
 
-  thread->_runnable->run();
+  thread->runnable()->run();
 
   if(thread->_state != stopping && thread->_state != stopped) {
     thread->_state = stopping;
diff --git a/lib/cpp/src/concurrency/Thread.h b/lib/cpp/src/concurrency/Thread.h
index 8237091..2416887 100644
--- a/lib/cpp/src/concurrency/Thread.h
+++ b/lib/cpp/src/concurrency/Thread.h
@@ -17,6 +17,18 @@
   virtual ~Runnable() {};
 
   virtual void run() = 0;
+
+  virtual Thread* thread() {return _thread;}
+
+ private:
+
+  /** Sets the thread that is executing this object.  This is only meant for use by concrete implementations of Thread. */
+
+  friend class Thread;
+
+  virtual void thread(Thread* value) {_thread = value;}
+
+  Thread* _thread;
 };
 
 /** Minimal thread class.  Returned by thread factory bound to a Runnable object and ready to start execution.  More or less analogous to java.lang.Thread
@@ -43,7 +55,15 @@
 
   /** Gets the runnable object this thread is hosting */
   
-  virtual Runnable* runnable() const = 0;
+  virtual Runnable* runnable() const {return _runnable;}
+
+ protected:
+
+  virtual void runnable(Runnable* value, bool x=false) {_runnable = value; _runnable->thread(this);}
+
+ private:
+  
+  Runnable* _runnable;
 };
 
 /** Factory to create platform-specific thread object and bind them to Runnable object for execution */
diff --git a/lib/cpp/src/concurrency/ThreadManager.cc b/lib/cpp/src/concurrency/ThreadManager.cc
index 0aa4bef..ca2bbb5 100644
--- a/lib/cpp/src/concurrency/ThreadManager.cc
+++ b/lib/cpp/src/concurrency/ThreadManager.cc
@@ -8,6 +8,7 @@
 
 namespace facebook { namespace thrift { namespace concurrency { 
 
+
 /** ThreadManager class
     
     This class manages a pool of threads.  It uses a ThreadFactory to create threads.  It never actually creates or destroys worker threads, rather
@@ -20,19 +21,18 @@
 
  public:
 
-  Impl() : _stopped(false) {}
+  Impl() : _state(ThreadManager::UNINITIALIZED) {}
 
+  ~Impl() {stop();}
 
-
-  ~Impl() {
-
-    if(!_stopped) {
-      stop();
-    }
-  }
+  void start();
 
   void stop();
 
+  const ThreadManager::STATE state() const {
+    return _state;
+  };
+
   const ThreadFactory* threadFactory() const {
 
     Synchronized s(_monitor); 
@@ -82,9 +82,11 @@
 
   size_t _workerCount;
 
+  size_t _workerMaxCount;
+
   size_t _idleCount;
 
-  bool _stopped;
+  ThreadManager::STATE _state;
 
   const ThreadFactory* _threadFactory;
 
@@ -94,9 +96,13 @@
 
   Monitor _monitor;
 
+  Monitor _workerMonitor;
+
   friend class ThreadManager::Worker;
 
   std::set<Thread*> _workers;
+  
+  std::set<Thread*> _deadWorkers;
 };
 
 class ThreadManager::Task : public Runnable {
@@ -133,6 +139,7 @@
 };
 
 class ThreadManager::Worker: public Runnable {
+
   enum STATE {
     UNINITIALIZED,
     STARTING,
@@ -142,6 +149,7 @@
   };
 
  public:
+
   Worker(ThreadManager::Impl* manager) : 
     _manager(manager),
     _state(UNINITIALIZED),
@@ -150,59 +158,94 @@
 
   ~Worker() {}
 
+  bool isActive() const { return _manager->_workerCount <= _manager->_workerMaxCount;}
+
   /** Worker entry point
 
       As long as worker thread is running, pull tasks off the task queue and execute. */
 
   void run() {
 
+    bool active = false;
+    
+    bool notifyManager = false;
+
+    /** Increment worker semaphore and notify manager if worker count reached desired max
+	
+	Note
+	We have to release the monitor and acquire the workerMonitor since that is what the manager
+	blocks on for worker add/remove */
+
     {Synchronized s(_manager->_monitor);
 
-      if(_state == STARTING) {
-	_state = STARTED;
+      active = _manager->_workerCount < _manager->_workerMaxCount;
+
+      if(active) {
+
+	_manager->_workerCount++;
+
+	notifyManager = _manager->_workerCount == _manager->_workerMaxCount;
       }
-
-      _manager->_workerCount++;
-
-      _manager->_monitor.notifyAll();
     }
 
-    do {
+    if(notifyManager) {
+
+      Synchronized s(_manager->_workerMonitor);
+
+      _manager->_workerMonitor.notify();
+
+      notifyManager = false;
+    }
+
+    while(active) {
 
       ThreadManager::Task* task = NULL;
 
       /* While holding manager monitor block for non-empty task queue (Also check that the thread hasn't been requested to stop).
 
-	 Once the queue is non-empty, dequeue a task, release monitor, and execute. */
+	 Once the queue is non-empty, dequeue a task, release monitor, and execute. If the worker max count has been decremented
+	 such that we exceed it, mark ourself inactive, decrement the worker count and notify the manager (technically we're notifying
+	 the next blocked thread but eventually the manager will see it. */
 
       {Synchronized s(_manager->_monitor);
 
-	while(_state == STARTED && _manager->_tasks.empty()) {
+	active = isActive();
+
+	while(active && _manager->_tasks.empty()) {
 	  
 	  _manager->_idleCount++;
 
 	  _idle = true;
 	  
 	  _manager->_monitor.wait();
+	  
+	  active = isActive();
 
 	  _idle = false;
 	  
 	  _manager->_idleCount--;
 	}
-	
-	if(_state == STARTED) {
+
+	if(active) {
 
 	  if(!_manager->_tasks.empty()) {
 	  
 	    task = _manager->_tasks.front();
 
 	    _manager->_tasks.pop();
-
+	    
 	    if(task->_state == ThreadManager::Task::WAITING) {
-
+	    
 	      task->_state = ThreadManager::Task::EXECUTING;
 	    }
 	  }
+	} else {
+
+	  _idle = true;
+	  
+	  _manager->_workerCount--;
+	  
+	  notifyManager = _manager->_workerCount == _manager->_workerMaxCount;
 	}
       }
 
@@ -212,31 +255,29 @@
 	  try {
 	    
 	    task->run();
-
+	    
 	  } catch(...) {
 	    
 	    // XXX need to log this
 	  }
 	  
 	  delete task;
+	  
+	  task = NULL;
 	}
       }
+    }
+
+    {Synchronized s(_manager->_workerMonitor);    
+
+      _manager->_deadWorkers.insert(this->thread());
       
-    } while(_state == STARTED);
+      if(notifyManager) {
 
-    {Synchronized s(_manager->_monitor);
-
-      _manager->_workerCount--;
-
-      if(_state == STOPPING) {
-
-	_state = STOPPED; 
-
-	_manager->_monitor.notify();
-
+	_manager->_workerMonitor.notify();
       }
     }
-    
+
     return;
   }
 
@@ -263,6 +304,13 @@
 
     newThreads.insert(_threadFactory->newThread(worker));
   }
+
+  {Synchronized s(_monitor);
+
+    _workerMaxCount+= value;
+
+    _workers.insert(newThreads.begin(), newThreads.end());
+  }
   
   for(std::set<Thread*>::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) {
 
@@ -273,11 +321,33 @@
     (*ix)->start();
   }
 
-  {Synchronized s(_monitor); 
+  {Synchronized s(_workerMonitor); 
       
-    _workers.insert(newThreads.begin(), newThreads.end());
+    while(_workerCount != _workerMaxCount) {
+      _workerMonitor.wait();
+    }
+  }
+}
 
-    while(_workerCount != _workers.size()) {
+void ThreadManager::Impl::start() {
+
+  if(_state == ThreadManager::STOPPED) {
+    return;
+  }
+
+  {Synchronized s(_monitor); 
+
+    if(_state == ThreadManager::UNINITIALIZED) {
+
+      if(_threadFactory == NULL) {throw InvalidArgumentException();}
+
+      _state = ThreadManager::STARTED;
+
+      _monitor.notifyAll();
+    }
+
+    while(_state == STARTING) {
+
       _monitor.wait();
     }
   }
@@ -287,78 +357,90 @@
 
   bool doStop = false;
 
+  if(_state == ThreadManager::STOPPED) {
+    return;
+  }
+
   {Synchronized s(_monitor); 
 
-    if(!_stopped) {
+    if(!_state != ThreadManager::STOPPING && _state != ThreadManager::STOPPED) {
+
       doStop = true;
-      _stopped = true;
+
+      _state = ThreadManager::STOPPING;
     }
   }
 
   if(doStop) {
+
     removeWorker(_workerCount);
+
+    _state = ThreadManager::STOPPING;
   }
+
+  // Don't block for stopping->stopped transition here, since if stop is being performed in context of a delete, the monitor may be invalid
+     
 }
   
 void ThreadManager::Impl::removeWorker(size_t value) {
 
   std::set<Thread*> removedThreads;
 
-    {Synchronized s(_monitor); 
-  
-      /* Overly clever loop
-	 
-	 First time through, (idleOnly == 1) just look for idle threads.  If that didn't find enough, go through again (idleOnly == 0)
-	 and remove a sufficient number of busy threads. */
+  {Synchronized s(_monitor); 
 
-      for(int idleOnly = 1; idleOnly >= 0; idleOnly--) {
+    if(value > _workerMaxCount) {
+
+      throw InvalidArgumentException();
+    }
+
+    _workerMaxCount-= value;
+
+    if(_idleCount < value) {
       
-	for(std::set<Thread*>::iterator workerThread = _workers.begin(); (workerThread != _workers.end()) && (removedThreads.size() < value); workerThread++) {
+      for(size_t ix = 0; ix < _idleCount; ix++) {
 
-	  Worker* worker = (Worker*)(*workerThread)->runnable();
-
-	  if(worker->_idle || !idleOnly) {
-
-	    if(worker->_state == ThreadManager::Worker::STARTED) {
-
-	      worker->_state = ThreadManager::Worker::STOPPING;
-	    }
-
-	    removedThreads.insert(*workerThread);
-	
-	    _workers.erase(workerThread);
-	  }
-	}
+	_monitor.notify();
       }
-      
+    } else {
+
       _monitor.notifyAll();
     }
-
-    
-    // Join removed threads and free worker 
-
-    for(std::set<Thread*>::iterator workerThread = removedThreads.begin(); workerThread != removedThreads.end(); workerThread++) {
-
-      Worker* worker = (Worker*)(*workerThread)->runnable();
-
-      (*workerThread)->join();
-      
-      delete worker;
-    }
   }
+
+  {Synchronized s(_workerMonitor); 
+
+    while(_workerCount != _workerMaxCount) {
+      _workerMonitor.wait();
+    }
+
+    for(std::set<Thread*>::iterator ix = _deadWorkers.begin(); ix != _deadWorkers.end(); ix++) {
+
+	_workers.erase(*ix);
+	
+	delete (*ix)->runnable();
+
+	delete (*ix);
+    }
+
+    _deadWorkers.clear();
+  }
+}
   
 void ThreadManager::Impl::add(Runnable* value) {
 
     Synchronized s(_monitor); 
 
-    bool isEmpty = _tasks.empty();
+    if(_state != ThreadManager::STARTED) {
+
+      throw IllegalStateException();
+    }
 
     _tasks.push(new ThreadManager::Task(value));
 
-    /* If queue was empty notify a thread, otherwise all worker threads are running and will get around to this
+    /* If idle thread is available notify it, otherwise all worker threads are running and will get around to this
        task in time. */
 
-    if(isEmpty && _idleCount > 0) {
+    if(_idleCount > 0) {
 
       _monitor.notify();
     }
@@ -367,6 +449,11 @@
 void ThreadManager::Impl::remove(Runnable* task) {
 
     Synchronized s(_monitor); 
+
+    if(_state != ThreadManager::STARTED) {
+
+      throw IllegalStateException();
+    }
 }
 
 class SimpleThreadManager : public ThreadManager::Impl {
@@ -378,26 +465,10 @@
     _firstTime(true) {
   }
 
-  void add(Runnable* task) {
+  void start() {
+    ThreadManager::Impl::start();
 
-    bool addWorkers = false;
-
-    {Synchronized s(_monitor);
-
-      if(_firstTime) {
-
-	_firstTime = false;
-
-	addWorkers = true;
-      }
-    }
-
-    if(addWorkers) {
-
-      addWorker(_workerCount);
-    }
-
-    Impl::add(task);
+    addWorker(_workerCount);
   }
 
 private:
diff --git a/lib/cpp/src/concurrency/ThreadManager.h b/lib/cpp/src/concurrency/ThreadManager.h
index 596471d..aa5a98a 100644
--- a/lib/cpp/src/concurrency/ThreadManager.h
+++ b/lib/cpp/src/concurrency/ThreadManager.h
@@ -32,6 +32,26 @@
 
   virtual ~ThreadManager() {}
 
+  /** Starts the thread manager.  Verifies all attributes have been properly initialized, then allocates necessary resources to begin operation */
+  
+  virtual void start() = 0;
+
+  /** Stops the thread manager.  Aborts all remaining unprocessed task, shuts down all created worker threads, and realeases all allocated resources.
+      This method blocks for all worker threads to complete, thus it can potentially block forever if a worker thread is running a task that 
+      won't terminate. */
+
+  virtual void stop() = 0;
+
+  enum STATE {
+    UNINITIALIZED,
+    STARTING,
+    STARTED,
+    STOPPING,
+    STOPPED
+  };
+  
+  virtual const STATE state() const = 0;
+
   virtual const ThreadFactory* threadFactory() const = 0;
 
   virtual void threadFactory(const ThreadFactory* value) = 0;
diff --git a/lib/cpp/src/concurrency/test/Tests.cc b/lib/cpp/src/concurrency/test/Tests.cc
index d139f55..2174bf4 100644
--- a/lib/cpp/src/concurrency/test/Tests.cc
+++ b/lib/cpp/src/concurrency/test/Tests.cc
@@ -30,13 +30,13 @@
 
     assert(threadFactoryTests.helloWorldTest());
 
-    size_t count =  10000;
+    size_t count =  1000;
 
     std::cout << "\t\tThreadFactory reap N threads test: N = " << count << std::endl;
 
     assert(threadFactoryTests.reapNThreads(count));
 
-    std::cout << "\t\tThreadFactory synchrous start test" << std::endl;
+    std::cout << "\t\tThreadFactory synchronous start test" << std::endl;
 
     assert(threadFactoryTests.synchStartTest());
   }
@@ -56,11 +56,17 @@
 
     std::cout << "ThreadManager tests..." << std::endl;
 
-    std::cout << "\t\tThreadManager test00" << std::endl;
+    size_t workerCount = 10;
+
+    size_t taskCount = 10000;
+
+    long long delay = 10LL;
+
+    std::cout << "\t\tThreadManager load test: worker count: " << workerCount << " task count: " << taskCount << " delay: " << delay << std::endl;
 
     ThreadManagerTests threadManagerTests;
 
-    assert(threadManagerTests.test00());
+    assert(threadManagerTests.loadTest(taskCount, delay, workerCount));
   }
 }
 
diff --git a/lib/cpp/src/concurrency/test/ThreadManagerTests.h b/lib/cpp/src/concurrency/test/ThreadManagerTests.h
index aad6332..72d6777 100644
--- a/lib/cpp/src/concurrency/test/ThreadManagerTests.h
+++ b/lib/cpp/src/concurrency/test/ThreadManagerTests.h
@@ -28,27 +28,35 @@
       _monitor(monitor),
       _count(count),
       _timeout(timeout),
-      _addTime(Util::currentTime()),
-      _success(false),
       _done(false) {}
 
     void run() {
 
-      _startTime = Util::currentTime();
-      
       Monitor sleep;
 
       {Synchronized s(sleep);
 
-	sleep.wait(_timeout);
-      }
+	long long time00 = Util::currentTime();
 
-      _endTime = Util::currentTime();
+	sleep.wait(_timeout);
+
+	long long time01 = Util::currentTime();
+
+	double error = ((time01 - time00) - _timeout) / (double)_timeout;
+	
+	if(error < 0.0) {
+	  
+	  error*= -1.0;
+	}
+
+	if(error > .10) {
+	  
+	  assert(false);
+	}
+      }
 
       _done = true;
       
-      _success = true;
-
       {Synchronized s(_monitor);
 
 	// std::cout << "Thread " << _count << " completed " << std::endl;
@@ -65,17 +73,13 @@
     Monitor& _monitor;
     size_t& _count;
     long long _timeout;
-    long long _addTime;
-    long long _startTime;
-    long long _endTime;
-    bool _success;
     bool _done;
   };
 
   /** Dispatch count tasks, each of which blocks for timeout milliseconds then completes.
       Verify that all tasks completed and that thread manager cleans up properly on delete. */
 
-  bool test00(size_t count=100, long long timeout=100LL, size_t workerCount=4) {
+  bool loadTest(size_t count=100, long long timeout=100LL, size_t workerCount=4) {
 
     Monitor monitor;
 
@@ -84,6 +88,8 @@
     ThreadManager* threadManager = ThreadManager::newSimpleThreadManager(workerCount);
       
     threadManager->threadFactory(new PosixThreadFactory());
+
+    threadManager->start();
       
     std::set<ThreadManagerTests::Task*> tasks;
 
@@ -92,6 +98,8 @@
       tasks.insert(new ThreadManagerTests::Task(monitor, activeCount, timeout));
     }
 
+    long long time00 = Util::currentTime();
+
     for(std::set<ThreadManagerTests::Task*>::iterator ix = tasks.begin(); ix != tasks.end(); ix++) {
 
 	threadManager->add(*ix);
@@ -105,19 +113,27 @@
       }
     }
 
-    bool success;
+    long long time01 = Util::currentTime();
 
     for(std::set<ThreadManagerTests::Task*>::iterator ix = tasks.begin(); ix != tasks.end(); ix++) {
 
-      success = success || (*ix)->_success;
-
       delete *ix;
       
     }
 
+    double expectedTime = ((count + (workerCount - 1)) / workerCount) * timeout;
+
+    double error = ((time01 - time00) - expectedTime) / expectedTime;
+
+    if(error < 0) {
+      error*= -1.0;
+    }
+
+    bool success = error < .10;
+
     delete threadManager;
 
-    std::cout << "\t\t\t" << (success ? "Success" : "Failure") << "!" << std::endl;
+    std::cout << "\t\t\t" << (success ? "Success" : "Failure") << "! expected time: " << expectedTime << "ms elapsed time: "<< time01 - time00 << "ms error%: " << error * 100.0 << std::endl;
 
     return true;
   }