More test code added...
more bugs found
facebook::thrift::concurrency::ThreadManager::add
Fixed dispatch error that resulted in only one of N worker threads ever getting notified of work
facebook::thrift::concurrency::ThreadManager
Cleaned up addWorker/removeWorker and stop logic so that adding/removing workers doesn't wake up
all blocked workers.
facebook::thrift::concurrency::Thread
facebook::thrift::concurrency::Runnable
Fixed initialization logic so that runnable can return the thread that runs it
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664729 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/concurrency/ThreadManager.cc b/lib/cpp/src/concurrency/ThreadManager.cc
index 0aa4bef..ca2bbb5 100644
--- a/lib/cpp/src/concurrency/ThreadManager.cc
+++ b/lib/cpp/src/concurrency/ThreadManager.cc
@@ -8,6 +8,7 @@
namespace facebook { namespace thrift { namespace concurrency {
+
/** ThreadManager class
This class manages a pool of threads. It uses a ThreadFactory to create threads. It never actually creates or destroys worker threads, rather
@@ -20,19 +21,18 @@
public:
- Impl() : _stopped(false) {}
+ Impl() : _state(ThreadManager::UNINITIALIZED) {}
+ ~Impl() {stop();}
-
- ~Impl() {
-
- if(!_stopped) {
- stop();
- }
- }
+ void start();
void stop();
+ const ThreadManager::STATE state() const {
+ return _state;
+ };
+
const ThreadFactory* threadFactory() const {
Synchronized s(_monitor);
@@ -82,9 +82,11 @@
size_t _workerCount;
+ size_t _workerMaxCount;
+
size_t _idleCount;
- bool _stopped;
+ ThreadManager::STATE _state;
const ThreadFactory* _threadFactory;
@@ -94,9 +96,13 @@
Monitor _monitor;
+ Monitor _workerMonitor;
+
friend class ThreadManager::Worker;
std::set<Thread*> _workers;
+
+ std::set<Thread*> _deadWorkers;
};
class ThreadManager::Task : public Runnable {
@@ -133,6 +139,7 @@
};
class ThreadManager::Worker: public Runnable {
+
enum STATE {
UNINITIALIZED,
STARTING,
@@ -142,6 +149,7 @@
};
public:
+
Worker(ThreadManager::Impl* manager) :
_manager(manager),
_state(UNINITIALIZED),
@@ -150,59 +158,94 @@
~Worker() {}
+ bool isActive() const { return _manager->_workerCount <= _manager->_workerMaxCount;}
+
/** Worker entry point
As long as worker thread is running, pull tasks off the task queue and execute. */
void run() {
+ bool active = false;
+
+ bool notifyManager = false;
+
+ /** Increment worker semaphore and notify manager if worker count reached desired max
+
+ Note
+ We have to release the monitor and acquire the workerMonitor since that is what the manager
+ blocks on for worker add/remove */
+
{Synchronized s(_manager->_monitor);
- if(_state == STARTING) {
- _state = STARTED;
+ active = _manager->_workerCount < _manager->_workerMaxCount;
+
+ if(active) {
+
+ _manager->_workerCount++;
+
+ notifyManager = _manager->_workerCount == _manager->_workerMaxCount;
}
-
- _manager->_workerCount++;
-
- _manager->_monitor.notifyAll();
}
- do {
+ if(notifyManager) {
+
+ Synchronized s(_manager->_workerMonitor);
+
+ _manager->_workerMonitor.notify();
+
+ notifyManager = false;
+ }
+
+ while(active) {
ThreadManager::Task* task = NULL;
/* While holding manager monitor block for non-empty task queue (Also check that the thread hasn't been requested to stop).
- Once the queue is non-empty, dequeue a task, release monitor, and execute. */
+ Once the queue is non-empty, dequeue a task, release monitor, and execute. If the worker max count has been decremented
+ so that the worker count now exceeds it, mark ourselves inactive, decrement the worker count, and notify the manager (technically we're notifying
+ the next blocked thread, but eventually the manager will see it). */
{Synchronized s(_manager->_monitor);
- while(_state == STARTED && _manager->_tasks.empty()) {
+ active = isActive();
+
+ while(active && _manager->_tasks.empty()) {
_manager->_idleCount++;
_idle = true;
_manager->_monitor.wait();
+
+ active = isActive();
_idle = false;
_manager->_idleCount--;
}
-
- if(_state == STARTED) {
+
+ if(active) {
if(!_manager->_tasks.empty()) {
task = _manager->_tasks.front();
_manager->_tasks.pop();
-
+
if(task->_state == ThreadManager::Task::WAITING) {
-
+
task->_state = ThreadManager::Task::EXECUTING;
}
}
+ } else {
+
+ _idle = true;
+
+ _manager->_workerCount--;
+
+ notifyManager = _manager->_workerCount == _manager->_workerMaxCount;
}
}
@@ -212,31 +255,29 @@
try {
task->run();
-
+
} catch(...) {
// XXX need to log this
}
delete task;
+
+ task = NULL;
}
}
+ }
+
+ {Synchronized s(_manager->_workerMonitor);
+
+ _manager->_deadWorkers.insert(this->thread());
- } while(_state == STARTED);
+ if(notifyManager) {
- {Synchronized s(_manager->_monitor);
-
- _manager->_workerCount--;
-
- if(_state == STOPPING) {
-
- _state = STOPPED;
-
- _manager->_monitor.notify();
-
+ _manager->_workerMonitor.notify();
}
}
-
+
return;
}
@@ -263,6 +304,13 @@
newThreads.insert(_threadFactory->newThread(worker));
}
+
+ {Synchronized s(_monitor);
+
+ _workerMaxCount+= value;
+
+ _workers.insert(newThreads.begin(), newThreads.end());
+ }
for(std::set<Thread*>::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) {
@@ -273,11 +321,33 @@
(*ix)->start();
}
- {Synchronized s(_monitor);
+ {Synchronized s(_workerMonitor);
- _workers.insert(newThreads.begin(), newThreads.end());
+ while(_workerCount != _workerMaxCount) {
+ _workerMonitor.wait();
+ }
+ }
+}
- while(_workerCount != _workers.size()) {
+void ThreadManager::Impl::start() {
+
+ if(_state == ThreadManager::STOPPED) {
+ return;
+ }
+
+ {Synchronized s(_monitor);
+
+ if(_state == ThreadManager::UNINITIALIZED) {
+
+ if(_threadFactory == NULL) {throw InvalidArgumentException();}
+
+ _state = ThreadManager::STARTED;
+
+ _monitor.notifyAll();
+ }
+
+ while(_state == STARTING) {
+
_monitor.wait();
}
}
@@ -287,78 +357,90 @@
bool doStop = false;
+ if(_state == ThreadManager::STOPPED) {
+ return;
+ }
+
{Synchronized s(_monitor);
- if(!_stopped) {
+ if(!_state != ThreadManager::STOPPING && _state != ThreadManager::STOPPED) {
+
doStop = true;
- _stopped = true;
+
+ _state = ThreadManager::STOPPING;
}
}
if(doStop) {
+
removeWorker(_workerCount);
+
+ _state = ThreadManager::STOPPING;
}
+
+ // Don't block for stopping->stopped transition here, since if stop is being performed in context of a delete, the monitor may be invalid
+
}
void ThreadManager::Impl::removeWorker(size_t value) {
std::set<Thread*> removedThreads;
- {Synchronized s(_monitor);
-
- /* Overly clever loop
-
- First time through, (idleOnly == 1) just look for idle threads. If that didn't find enough, go through again (idleOnly == 0)
- and remove a sufficient number of busy threads. */
+ {Synchronized s(_monitor);
- for(int idleOnly = 1; idleOnly >= 0; idleOnly--) {
+ if(value > _workerMaxCount) {
+
+ throw InvalidArgumentException();
+ }
+
+ _workerMaxCount-= value;
+
+ if(_idleCount < value) {
- for(std::set<Thread*>::iterator workerThread = _workers.begin(); (workerThread != _workers.end()) && (removedThreads.size() < value); workerThread++) {
+ for(size_t ix = 0; ix < _idleCount; ix++) {
- Worker* worker = (Worker*)(*workerThread)->runnable();
-
- if(worker->_idle || !idleOnly) {
-
- if(worker->_state == ThreadManager::Worker::STARTED) {
-
- worker->_state = ThreadManager::Worker::STOPPING;
- }
-
- removedThreads.insert(*workerThread);
-
- _workers.erase(workerThread);
- }
- }
+ _monitor.notify();
}
-
+ } else {
+
_monitor.notifyAll();
}
-
-
- // Join removed threads and free worker
-
- for(std::set<Thread*>::iterator workerThread = removedThreads.begin(); workerThread != removedThreads.end(); workerThread++) {
-
- Worker* worker = (Worker*)(*workerThread)->runnable();
-
- (*workerThread)->join();
-
- delete worker;
- }
}
+
+ {Synchronized s(_workerMonitor);
+
+ while(_workerCount != _workerMaxCount) {
+ _workerMonitor.wait();
+ }
+
+ for(std::set<Thread*>::iterator ix = _deadWorkers.begin(); ix != _deadWorkers.end(); ix++) {
+
+ _workers.erase(*ix);
+
+ delete (*ix)->runnable();
+
+ delete (*ix);
+ }
+
+ _deadWorkers.clear();
+ }
+}
void ThreadManager::Impl::add(Runnable* value) {
Synchronized s(_monitor);
- bool isEmpty = _tasks.empty();
+ if(_state != ThreadManager::STARTED) {
+
+ throw IllegalStateException();
+ }
_tasks.push(new ThreadManager::Task(value));
- /* If queue was empty notify a thread, otherwise all worker threads are running and will get around to this
+ /* If an idle thread is available, notify it; otherwise all worker threads are running and will get around to this
task in time. */
- if(isEmpty && _idleCount > 0) {
+ if(_idleCount > 0) {
_monitor.notify();
}
@@ -367,6 +449,11 @@
void ThreadManager::Impl::remove(Runnable* task) {
Synchronized s(_monitor);
+
+ if(_state != ThreadManager::STARTED) {
+
+ throw IllegalStateException();
+ }
}
class SimpleThreadManager : public ThreadManager::Impl {
@@ -378,26 +465,10 @@
_firstTime(true) {
}
- void add(Runnable* task) {
+ void start() {
+ ThreadManager::Impl::start();
- bool addWorkers = false;
-
- {Synchronized s(_monitor);
-
- if(_firstTime) {
-
- _firstTime = false;
-
- addWorkers = true;
- }
- }
-
- if(addWorkers) {
-
- addWorker(_workerCount);
- }
-
- Impl::add(task);
+ addWorker(_workerCount);
}
private: