Add join to the ThreadManager
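Summary: ThreadManager now has a join() method: like stop(), but it blocks until all the workers have finished their work, after which the manager transitions to STOPPED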
Reviewed By: marc, xp-style
Test Plan: Use with new ThriftServer shutdown mechanisms
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@665037 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/concurrency/ThreadManager.cpp b/lib/cpp/src/concurrency/ThreadManager.cpp
index 2e5472d..1631541 100644
--- a/lib/cpp/src/concurrency/ThreadManager.cpp
+++ b/lib/cpp/src/concurrency/ThreadManager.cpp
@@ -47,7 +47,9 @@
void start();
- void stop();
+ void stop() { stopImpl(false); }
+
+ void join() { stopImpl(true); }
const ThreadManager::STATE state() const {
return state_;
@@ -91,6 +93,8 @@
void remove(shared_ptr<Runnable> task);
private:
+ void stopImpl(bool join);
+
size_t workerCount_;
size_t workerMaxCount_;
size_t idleCount_;
@@ -154,10 +158,14 @@
~Worker() {}
+ private:
bool isActive() const {
- return manager_->workerCount_ <= manager_->workerMaxCount_;
+ return
+ (manager_->workerCount_ <= manager_->workerMaxCount_) ||
+ (manager_->state_ == JOINING && !manager_->tasks_.empty());
}
+ public:
/**
* Worker entry point
*
@@ -205,12 +213,13 @@
{
Synchronized s(manager_->monitor_);
active = isActive();
+
while (active && manager_->tasks_.empty()) {
manager_->idleCount_++;
- idle_ = true;
+ idle_ = true;
manager_->monitor_.wait();
active = isActive();
- idle_ = false;
+ idle_ = false;
manager_->idleCount_--;
}
@@ -223,9 +232,9 @@
}
}
} else {
- idle_ = true;
+ idle_ = true;
manager_->workerCount_--;
- notifyManager = manager_->workerCount_ == manager_->workerMaxCount_;
+ notifyManager = (manager_->workerCount_ == manager_->workerMaxCount_);
}
}
@@ -309,7 +318,7 @@
}
}
-void ThreadManager::Impl::stop() {
+void ThreadManager::Impl::stopImpl(bool join) {
bool doStop = false;
if (state_ == ThreadManager::STOPPED) {
return;
@@ -317,22 +326,29 @@
{
Synchronized s(monitor_);
- if (!state_ != ThreadManager::STOPPING && state_ != ThreadManager::STOPPED) {
+ if (state_ != ThreadManager::STOPPING &&
+ state_ != ThreadManager::JOINING &&
+ state_ != ThreadManager::STOPPED) {
doStop = true;
- state_ = ThreadManager::STOPPING;
+ state_ = join ? ThreadManager::JOINING : ThreadManager::STOPPING;
}
}
if (doStop) {
removeWorker(workerCount_);
- state_ = ThreadManager::STOPPING;
}
// XXX
// should be able to block here for transition to STOPPED since we're no
// using shared_ptrs
+
+ {
+ Synchronized s(monitor_);
+ state_ = ThreadManager::STOPPED;
+ }
+
}
-
+
void ThreadManager::Impl::removeWorker(size_t value) {
std::set<shared_ptr<Thread> > removedThreads;
{
diff --git a/lib/cpp/src/concurrency/ThreadManager.h b/lib/cpp/src/concurrency/ThreadManager.h
index 52bc75c..f7c4b3c 100644
--- a/lib/cpp/src/concurrency/ThreadManager.h
+++ b/lib/cpp/src/concurrency/ThreadManager.h
@@ -63,10 +63,16 @@
*/
virtual void stop() = 0;
+ /**
+ * Joins the thread manager. This is the same as stop, except that it will
+ * block until all the workers have finished their work. At that point
+ * the ThreadManager will transition into the STOPPED state.
+ */
enum STATE {
UNINITIALIZED,
STARTING,
STARTED,
+ JOINING,
STOPPING,
STOPPED
};