Bring up of thread manager

facebook::thrift::concurrency::test.ThreadManagerTest::test00
	Launch N tasks that block for time T, verify they all complete and that the thread manager cleans up properly
	when it goes out of scope
	


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664725 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/concurrency/PosixThreadFactory.cc b/lib/cpp/src/concurrency/PosixThreadFactory.cc
index bac122d..3587c03 100644
--- a/lib/cpp/src/concurrency/PosixThreadFactory.cc
+++ b/lib/cpp/src/concurrency/PosixThreadFactory.cc
@@ -92,7 +92,7 @@
     }
   }
 
-  const Runnable* runnable() const {return _runnable;}
+  Runnable* runnable() const {return _runnable;}
 
 };
 
diff --git a/lib/cpp/src/concurrency/Thread.h b/lib/cpp/src/concurrency/Thread.h
index 3fc094d..8237091 100644
--- a/lib/cpp/src/concurrency/Thread.h
+++ b/lib/cpp/src/concurrency/Thread.h
@@ -43,7 +43,7 @@
 
   /** Gets the runnable object this thread is hosting */
   
-  virtual const Runnable* runnable() const = 0;
+  virtual Runnable* runnable() const = 0;
 };
 
 /** Factory to create platform-specific thread object and bind them to Runnable object for execution */
diff --git a/lib/cpp/src/concurrency/ThreadManager.cc b/lib/cpp/src/concurrency/ThreadManager.cc
index d13ce7b..f8a5c22 100644
--- a/lib/cpp/src/concurrency/ThreadManager.cc
+++ b/lib/cpp/src/concurrency/ThreadManager.cc
@@ -1,4 +1,5 @@
 #include "ThreadManager.h"
+#include "Exception.h"
 #include "Monitor.h"
 
 #include <assert.h>
@@ -10,12 +11,7 @@
 /** ThreadManager class
     
     This class manages a pool of threads.  It uses a ThreadFactory to create threads.  It never actually creates or destroys worker threads, rather
-    it maintains statistics on number of idle threads, number of active threads, task backlog, and average wait and service times and informs the
-    PoolPolicy object bound to instances of this manager of interesting transitions.  It is then up the PoolPolicy object to decide if the thread pool
-    size needs to be adjusted and call this object addThread and removeThread methods to make changes.
-
-    This design allows different policy implementations to used this code to handle basic worker thread management and worker task execution and focus on
-    policy issues.  The simplest policy, StaticPolicy, does nothing other than create a fixed number of threads.
+    it maintains statistics on number of idle threads, number of active threads, task backlog, and average wait and service times.
 
     @author marc
     @version $Id:$ */
@@ -24,34 +20,18 @@
 
  public:
 
-  Impl(size_t highWatermark, size_t lowWatermark) : 
-    _hiwat(highWatermark), 
-    _lowat(lowWatermark) {
+  Impl() : _stopped(false) {}
+
+
+
+  ~Impl() {
+
+    if(!_stopped) {
+      stop();
+    }
   }
 
-  ~Impl() {}
-
-  size_t highWatermark() const {return _hiwat;}
-
-  void highWatermark(size_t value) {_hiwat = value;}
-
-  size_t lowWatermark() const {return _lowat;}
-
-  void lowWatermark(size_t value) {_lowat = value;}
-
-  const PoolPolicy* poolPolicy() const {
-
-    Synchronized s(_monitor); 
-  
-    return _poolPolicy;
-  }
-  
-  void poolPolicy(const PoolPolicy*  value) {
-
-    Synchronized s(_monitor); 
-
-    _poolPolicy = value;
-  }
+  void stop();
 
   const ThreadFactory* threadFactory() const {
 
@@ -67,9 +47,9 @@
     _threadFactory = value;
   }
 
-  void addThread(size_t value);
+  void addWorker(size_t value);
   
-  void removeThread(size_t value);
+  void removeWorker(size_t value);
   
   size_t idleWorkerCount() const {return _idleCount;}
 
@@ -77,7 +57,7 @@
 
     Synchronized s(_monitor); 
 
-    return _workers.size();
+    return _workerCount;
   }
   
   size_t pendingTaskCount() const {
@@ -91,7 +71,7 @@
 
     Synchronized s(_monitor); 
     
-    return _tasks.size() + _workers.size() - _idleCount;
+    return _tasks.size() + _workerCount - _idleCount;
   }
   
   void add(Runnable* value);
@@ -100,13 +80,11 @@
 
 private:
 
-  size_t _hiwat;
-
-  size_t _lowat;
+  size_t _workerCount;
 
   size_t _idleCount;
 
-  const PoolPolicy* _poolPolicy;;
+  bool _stopped;
 
   const ThreadFactory* _threadFactory;
 
@@ -148,6 +126,8 @@
  private:
 
   Runnable* _runnable;
+  
+  friend class ThreadManager::Worker;
 
   STATE _state;
 };
@@ -181,6 +161,10 @@
       if(_state == STARTING) {
 	_state = STARTED;
       }
+
+      _manager->_workerCount++;
+
+      _manager->_monitor.notify();
     }
 
     do {
@@ -207,22 +191,43 @@
 	}
 	
 	if(_state == STARTED) {
+
+	  if(!_manager->_tasks.empty()) {
 	  
-	  task = _manager->_tasks.front();
+	    task = _manager->_tasks.front();
+
+	    _manager->_tasks.pop();
+
+	    if(task->_state == ThreadManager::Task::WAITING) {
+
+	      task->_state = ThreadManager::Task::EXECUTING;
+	    }
+	  }
 	}
       }
 
       if(task != NULL) {
 
-	task->run();
+	if(task->_state == ThreadManager::Task::EXECUTING) {
+	  try {
+	    
+	    task->run();
 
-	delete task;
+	  } catch(...) {
+	    
+	    // XXX need to log this
+	  }
+	  
+	  delete task;
+	}
       }
-
+      
     } while(_state == STARTED);
 
     {Synchronized s(_manager->_monitor);
 
+      _manager->_workerCount--;
+
       if(_state == STOPPING) {
 
 	_state = STOPPED; 
@@ -246,35 +251,56 @@
   bool _idle;
 };
 
-void ThreadManager::Impl::addThread(size_t value) {
+void ThreadManager::Impl::addWorker(size_t value) {
     
-    std::set<Thread*> newThreads;
+  std::set<Thread*> newThreads;
   
-    for(size_t ix = 0; ix < value; ix++) {
+  for(size_t ix = 0; ix < value; ix++) {
 
-      class ThreadManager::Worker;
+    class ThreadManager::Worker;
       
-      ThreadManager::Worker* worker = new ThreadManager::Worker(this);
+    ThreadManager::Worker* worker = new ThreadManager::Worker(this);
 
-      newThreads.insert(_threadFactory->newThread(worker));
-    }
-  
-    for(std::set<Thread*>::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) {
-
-      (*ix)->start();
-    }
-    for(std::set<Thread*>::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) {
-
-      (*ix)->start();
-    }
-    
-    {Synchronized s(_monitor); 
-      
-      _workers.insert(newThreads.begin(), newThreads.end());
-    }
+    newThreads.insert(_threadFactory->newThread(worker));
   }
   
-void ThreadManager::Impl::removeThread(size_t value) {
+  for(std::set<Thread*>::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) {
+
+    ThreadManager::Worker* worker = (ThreadManager::Worker*)(*ix)->runnable();
+
+    worker->_state = ThreadManager::Worker::STARTING;
+
+    (*ix)->start();
+  }
+
+  {Synchronized s(_monitor); 
+      
+    _workers.insert(newThreads.begin(), newThreads.end());
+
+    while(_workerCount != _workers.size()) {
+      _monitor.wait();
+    }
+  }
+}
+
+void ThreadManager::Impl::stop() {
+
+  bool doStop = false;
+
+  {Synchronized s(_monitor); 
+
+    if(!_stopped) {
+      doStop = true;
+      _stopped = true;
+    }
+  }
+
+  if(doStop) {
+    removeWorker(_workerCount);
+  }
+}
+  
+void ThreadManager::Impl::removeWorker(size_t value) {
 
   std::set<Thread*> removedThreads;
 
@@ -285,14 +311,19 @@
 	 First time through, (idleOnly == 1) just look for idle threads.  If that didn't find enough, go through again (idleOnly == 0)
 	 and remove a sufficient number of busy threads. */
 
-      for(int idleOnly = 1; idleOnly <= 0; idleOnly--) {
+      for(int idleOnly = 1; idleOnly >= 0; idleOnly--) {
       
 	for(std::set<Thread*>::iterator workerThread = _workers.begin(); (workerThread != _workers.end()) && (removedThreads.size() < value); workerThread++) {
 
 	  Worker* worker = (Worker*)(*workerThread)->runnable();
 
 	  if(worker->_idle || !idleOnly) {
-	
+
+	    if(worker->_state == ThreadManager::Worker::STARTED) {
+
+	      worker->_state = ThreadManager::Worker::STOPPING;
+	    }
+
 	    removedThreads.insert(*workerThread);
 	
 	    _workers.erase(workerThread);
@@ -320,15 +351,17 @@
 
     Synchronized s(_monitor); 
 
+    bool isEmpty = _tasks.empty();
+
     _tasks.push(new ThreadManager::Task(value));
 
-    /* If queue is empty notify a thread, otherwise all worker threads are running and will get around to this
+    /* If queue was empty notify a thread, otherwise all worker threads are running and will get around to this
        task in time. */
 
-    if(_tasks.size() == 1) {
+    if(isEmpty) {
 
-      assert(_idleCount == _workers.size());
-
+      assert(_idleCount == _workerCount);
+      
       _monitor.notify();
     }
   }
@@ -336,39 +369,54 @@
 void ThreadManager::Impl::remove(Runnable* task) {
 
     Synchronized s(_monitor); 
-  }
-
-ThreadManager* ThreadManager::newThreadManager(size_t lowWatermark, size_t highWatermark) {
-  return new ThreadManager::Impl(lowWatermark, highWatermark);
 }
 
-/**  Basic Pool Policy Implementation */
+class SimpleThreadManager : public ThreadManager::Impl {
 
-class BasicPoolPolicy::Impl : public PoolPolicy {
+public:
 
- public:
+  SimpleThreadManager(size_t workerCount=4) : 
+    _workerCount(workerCount),
+    _firstTime(true) {
+  }
 
-  Impl() {}
+  void add(Runnable* task) {
 
-  ~Impl() {}
+    bool addWorkers = false;
 
-  void onEmpty(ThreadManager* source) const  {}
+    {Synchronized s(_monitor);
 
-  void onLowWatermark(ThreadManager* source) const  {}
+      if(_firstTime) {
 
-  void onHighWatermark(ThreadManager* source) const {}
+	_firstTime = false;
+
+	addWorkers = true;
+      }
+    }
+
+    if(addWorkers) {
+
+      addWorker(_workerCount);
+    }
+
+    Impl::add(task);
+  }
+
+private:
+
+  const size_t _workerCount;
+  bool _firstTime;
+  Monitor _monitor;
 };
 
-BasicPoolPolicy::BasicPoolPolicy() : _impl(new BasicPoolPolicy::Impl())  {}
 
-BasicPoolPolicy::~BasicPoolPolicy() { delete _impl;}
+ThreadManager* ThreadManager::newThreadManager() {
+   return new ThreadManager::Impl();
+}
 
-void BasicPoolPolicy::onEmpty(ThreadManager* source) const {_impl->onEmpty(source);}
-
-void BasicPoolPolicy::onLowWatermark(ThreadManager* source) const {_impl->onLowWatermark(source);}
-
-void BasicPoolPolicy::onHighWatermark(ThreadManager* source) const {_impl->onHighWatermark(source);}
-
+ThreadManager* ThreadManager::newSimpleThreadManager() {
+   return new SimpleThreadManager();
+}
 
 }}} // facebook::thrift::concurrency
 
diff --git a/lib/cpp/src/concurrency/ThreadManager.h b/lib/cpp/src/concurrency/ThreadManager.h
index 352afb9..3b87c21 100644
--- a/lib/cpp/src/concurrency/ThreadManager.h
+++ b/lib/cpp/src/concurrency/ThreadManager.h
@@ -14,53 +14,12 @@
 
 class ThreadManager;
 
-/** PoolPolicy class 
-
-    Tracks performance of ThreadManager object and makes desired changes in thread pool count if any. */
-
-class PoolPolicy {
-
- public:
-
-  PoolPolicy() {}
-
-  virtual ~PoolPolicy() {}
-
-  virtual void onEmpty(ThreadManager* source) const = 0;
-
-  virtual void onLowWatermark(ThreadManager* source) const = 0;
-
-  virtual void onHighWatermark(ThreadManager* source) const = 0;
-
-};
-
-class BasicPoolPolicy : public PoolPolicy {
-
- public:
-
-  BasicPoolPolicy();
-
-  virtual ~BasicPoolPolicy();
-
-  virtual void onEmpty(ThreadManager* source) const;
-
-  virtual void onLowWatermark(ThreadManager* source) const;
-
-  virtual void onHighWatermark(ThreadManager* source) const;
-
- private:
-
-  class Impl;
-
-  Impl* _impl;
-};
-
 /** ThreadManager class
     
     This class manages a pool of threads.  It uses a ThreadFactory to create threads.  It never actually creates or destroys worker threads, rather
     it maintains statistics on number of idle threads, number of active threads, task backlog, and average wait and service times and informs the
     PoolPolicy object bound to instances of this manager of interesting transitions.  It is then up the PoolPolicy object to decide if the thread pool
-    size needs to be adjusted and call this object addThread and removeThread methods to make changes.
+    size needs to be adjusted and call this object addWorker and removeWorker methods to make changes.
 
     This design allows different policy implementations to used this code to handle basic worker thread management and worker task execution and focus on
     policy issues.  The simplest policy, StaticPolicy, does nothing other than create a fixed number of threads. */
@@ -69,29 +28,17 @@
 
  public:
 
-  ThreadManager(size_t highWatermark=4, size_t lowWatermark=2) {};
+  ThreadManager() {}
 
-  virtual ~ThreadManager() {};
-
-  virtual const PoolPolicy* poolPolicy() const = 0;
-
-  virtual void poolPolicy(const PoolPolicy* value) = 0;
+  virtual ~ThreadManager() {}
 
   virtual const ThreadFactory* threadFactory() const = 0;
 
   virtual void threadFactory(const ThreadFactory* value) = 0;
 
-  virtual size_t highWatermark() const = 0;
+  virtual void addWorker(size_t value=1) = 0;
 
-  virtual void highWatermark(size_t value) = 0;
-
-  virtual size_t lowWatermark() const = 0;
-
-  virtual void lowWatermark(size_t value) = 0;
-
-  virtual void addThread(size_t value=1) = 0;
-
-  virtual void removeThread(size_t value=1) = 0;
+  virtual void removeWorker(size_t value=1) = 0;
 
   /** Gets the current number of idle worker threads */
 
@@ -117,7 +64,9 @@
 
   virtual void remove(Runnable* task) = 0;
 
-  static ThreadManager* newThreadManager(size_t lowWatermark=2, size_t highWatermark=4);
+  static ThreadManager* newThreadManager();
+
+  static ThreadManager* newSimpleThreadManager();
 
   class Task;
   
diff --git a/lib/cpp/src/concurrency/TimerManager.cc b/lib/cpp/src/concurrency/TimerManager.cc
index 93b0dc5..7952122 100644
--- a/lib/cpp/src/concurrency/TimerManager.cc
+++ b/lib/cpp/src/concurrency/TimerManager.cc
@@ -251,7 +251,6 @@
 
     delete _dispatcher;
   }
-
 }
 
 const ThreadFactory* TimerManager::threadFactory() const {
diff --git a/lib/cpp/src/concurrency/TimerManager.h b/lib/cpp/src/concurrency/TimerManager.h
index 2681a5a..2b92f23 100644
--- a/lib/cpp/src/concurrency/TimerManager.h
+++ b/lib/cpp/src/concurrency/TimerManager.h
@@ -66,11 +66,11 @@
   virtual void remove(Runnable* task);
 
   enum STATE {
-    UNINITIALIZED = 1000,
-    STARTING = 1001,
-    STARTED = 1002,
-    STOPPING = 1003,
-    STOPPED = 1004
+    UNINITIALIZED,
+    STARTING,
+    STARTED,
+    STOPPING,
+    STOPPED
   };
   
   virtual const STATE state() const;
diff --git a/lib/cpp/src/concurrency/test/Tests.cc b/lib/cpp/src/concurrency/test/Tests.cc
index 36f29ff..02e1724 100644
--- a/lib/cpp/src/concurrency/test/Tests.cc
+++ b/lib/cpp/src/concurrency/test/Tests.cc
@@ -3,6 +3,7 @@
 
 #include "ThreadFactoryTests.h"
 #include "TimerManagerTests.h"
+#include "ThreadManagerTests.h"
 
 int main(int argc, char** argv) {
 
@@ -48,5 +49,16 @@
 
     assert(timerManagerTests.test00());
   }
+
+  if(runAll || arg.compare("thread-manager") == 0) {
+
+    std::cout << "ThreadManager tests..." << std::endl;
+
+    std::cout << "\t\tThreadManager test00" << std::endl;
+
+    ThreadManagerTests threadManagerTests;
+
+    assert(threadManagerTests.test00());
+  }
 }
 
diff --git a/lib/cpp/src/concurrency/test/TimerManagerTests.h b/lib/cpp/src/concurrency/test/TimerManagerTests.h
index f34f8b0..3c7fc0b 100644
--- a/lib/cpp/src/concurrency/test/TimerManagerTests.h
+++ b/lib/cpp/src/concurrency/test/TimerManagerTests.h
@@ -119,5 +119,3 @@
 
 }}}} // facebook::thrift::concurrency
 
-using namespace facebook::thrift::concurrency::test;
-