Add join to the ThreadManager

Summary: Now you can join against all the threads in a ThreadManager

Reviewed By: marc, xp-style

Test Plan: Use with new ThriftServer shutdown mechanisms


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@665037 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/concurrency/ThreadManager.cpp b/lib/cpp/src/concurrency/ThreadManager.cpp
index 2e5472d..1631541 100644
--- a/lib/cpp/src/concurrency/ThreadManager.cpp
+++ b/lib/cpp/src/concurrency/ThreadManager.cpp
@@ -47,7 +47,9 @@
 
   void start();
 
-  void stop();
+  void stop() { stopImpl(false); }
+
+  void join() { stopImpl(true); }
 
   const ThreadManager::STATE state() const {
     return state_;
@@ -91,6 +93,8 @@
   void remove(shared_ptr<Runnable> task);
 
 private:
+  void stopImpl(bool join);
+
   size_t workerCount_;
   size_t workerMaxCount_;
   size_t idleCount_;
@@ -154,10 +158,14 @@
 
   ~Worker() {}
 
+ private:
   bool isActive() const {
-    return manager_->workerCount_ <= manager_->workerMaxCount_;
+    return
+      (manager_->workerCount_ <= manager_->workerMaxCount_) ||
+      (manager_->state_ == JOINING && !manager_->tasks_.empty());
   }
 
+ public:
   /**
    * Worker entry point
    *
@@ -205,12 +213,13 @@
       {
         Synchronized s(manager_->monitor_);
 	active = isActive();
+
 	while (active && manager_->tasks_.empty()) {
           manager_->idleCount_++;
-	  idle_ = true;
+          idle_ = true;
           manager_->monitor_.wait();
           active = isActive();
-	  idle_ = false;
+          idle_ = false;
           manager_->idleCount_--;
 	}
 
@@ -223,9 +232,9 @@
 	    }
 	  }
 	} else {
-	  idle_ = true;  
+	  idle_ = true;
 	  manager_->workerCount_--;
-          notifyManager = manager_->workerCount_ == manager_->workerMaxCount_;
+          notifyManager = (manager_->workerCount_ == manager_->workerMaxCount_);
 	}
       }
       
@@ -309,7 +318,7 @@
   }
 }
 
-void ThreadManager::Impl::stop() {
+void ThreadManager::Impl::stopImpl(bool join) {
   bool doStop = false;
   if (state_ == ThreadManager::STOPPED) {
     return;
@@ -317,22 +326,29 @@
 
   {
     Synchronized s(monitor_); 
-    if (!state_ != ThreadManager::STOPPING && state_ != ThreadManager::STOPPED) {
+    if (state_ != ThreadManager::STOPPING &&
+        state_ != ThreadManager::JOINING &&
+        state_ != ThreadManager::STOPPED) {
       doStop = true;
-      state_ = ThreadManager::STOPPING;
+      state_ = join ? ThreadManager::JOINING : ThreadManager::STOPPING;
     }
   }
 
   if (doStop) {
     removeWorker(workerCount_);
-    state_ = ThreadManager::STOPPING;
   }
 
   // XXX 
   // should be able to block here for transition to STOPPED since we're no
   // using shared_ptrs
+
+  {
+    Synchronized s(monitor_); 
+    state_ = ThreadManager::STOPPED;
+  }
+
 }
-  
+
 void ThreadManager::Impl::removeWorker(size_t value) {
   std::set<shared_ptr<Thread> > removedThreads;
   {
diff --git a/lib/cpp/src/concurrency/ThreadManager.h b/lib/cpp/src/concurrency/ThreadManager.h
index 52bc75c..f7c4b3c 100644
--- a/lib/cpp/src/concurrency/ThreadManager.h
+++ b/lib/cpp/src/concurrency/ThreadManager.h
@@ -63,10 +63,16 @@
    */
   virtual void stop() = 0;
 
+  /**
+   * Joins the thread manager. This is the same as stop, except that it will
+   * block until all the workers have finished their work. At that point
+   * the ThreadManager will transition into the STOPPED state.
+   */
   enum STATE {
     UNINITIALIZED,
     STARTING,
     STARTED,
+    JOINING,
     STOPPING,
     STOPPED
   };