More test code added...
more bugs found
facebook::thrift::concurrency::ThreadManager::add
Fixed dispatch error that resulted in only one of N worker threads ever getting notified of work
facebook::thrift::concurrency::ThreadManager
Cleaned up addWorker/removeWorker and stop logic so that adding/removing workers doesn't wake up
all blocked workers.
facebook::thrift::concurrency::Thread
facebook::thrift::concurrency::Runnable
Fixed initialization logic so that runnable can return the thread that runs it
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664729 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/concurrency/PosixThreadFactory.cc b/lib/cpp/src/concurrency/PosixThreadFactory.cc
index 3587c03..00b2613 100644
--- a/lib/cpp/src/concurrency/PosixThreadFactory.cc
+++ b/lib/cpp/src/concurrency/PosixThreadFactory.cc
@@ -36,9 +36,6 @@
int _stackSize;
- Runnable* _runnable;
-
-
public:
PthreadThread(int policy, int priority, int stackSize, Runnable* runnable) :
@@ -46,9 +43,10 @@
_state(uninitialized),
_policy(policy),
_priority(priority),
- _stackSize(stackSize),
- _runnable(runnable)
- {}
+ _stackSize(stackSize) {
+
+ this->Thread::runnable(runnable);
+ }
void start() {
@@ -92,7 +90,9 @@
}
}
- Runnable* runnable() const {return _runnable;}
+ Runnable* runnable() const {return Thread::runnable();}
+
+ void runnable(Runnable* value) {Thread::runnable(value);}
};
@@ -107,7 +107,7 @@
thread->_state = starting;
- thread->_runnable->run();
+ thread->runnable()->run();
if(thread->_state != stopping && thread->_state != stopped) {
thread->_state = stopping;
diff --git a/lib/cpp/src/concurrency/Thread.h b/lib/cpp/src/concurrency/Thread.h
index 8237091..2416887 100644
--- a/lib/cpp/src/concurrency/Thread.h
+++ b/lib/cpp/src/concurrency/Thread.h
@@ -17,6 +17,18 @@
virtual ~Runnable() {};
virtual void run() = 0;
+
+ virtual Thread* thread() {return _thread;}
+
+ private:
+
+ /** Sets the thread that is executing this object. This is only meant for use by concrete implementations of Thread. */
+
+ friend class Thread;
+
+ virtual void thread(Thread* value) {_thread = value;}
+
+ Thread* _thread;
};
/** Minimal thread class. Returned by thread factory bound to a Runnable object and ready to start execution. More or less analogous to java.lang.Thread
@@ -43,7 +55,15 @@
/** Gets the runnable object this thread is hosting */
- virtual Runnable* runnable() const = 0;
+ virtual Runnable* runnable() const {return _runnable;}
+
+ protected:
+
+ virtual void runnable(Runnable* value, bool x=false) {_runnable = value; _runnable->thread(this);}
+
+ private:
+
+ Runnable* _runnable;
};
/** Factory to create platform-specific thread object and bind them to Runnable object for execution */
diff --git a/lib/cpp/src/concurrency/ThreadManager.cc b/lib/cpp/src/concurrency/ThreadManager.cc
index 0aa4bef..ca2bbb5 100644
--- a/lib/cpp/src/concurrency/ThreadManager.cc
+++ b/lib/cpp/src/concurrency/ThreadManager.cc
@@ -8,6 +8,7 @@
namespace facebook { namespace thrift { namespace concurrency {
+
/** ThreadManager class
This class manages a pool of threads. It uses a ThreadFactory to create threads. It never actually creates or destroys worker threads, rather
@@ -20,19 +21,18 @@
public:
- Impl() : _stopped(false) {}
+ Impl() : _state(ThreadManager::UNINITIALIZED) {}
+ ~Impl() {stop();}
-
- ~Impl() {
-
- if(!_stopped) {
- stop();
- }
- }
+ void start();
void stop();
+ const ThreadManager::STATE state() const {
+ return _state;
+ };
+
const ThreadFactory* threadFactory() const {
Synchronized s(_monitor);
@@ -82,9 +82,11 @@
size_t _workerCount;
+ size_t _workerMaxCount;
+
size_t _idleCount;
- bool _stopped;
+ ThreadManager::STATE _state;
const ThreadFactory* _threadFactory;
@@ -94,9 +96,13 @@
Monitor _monitor;
+ Monitor _workerMonitor;
+
friend class ThreadManager::Worker;
std::set<Thread*> _workers;
+
+ std::set<Thread*> _deadWorkers;
};
class ThreadManager::Task : public Runnable {
@@ -133,6 +139,7 @@
};
class ThreadManager::Worker: public Runnable {
+
enum STATE {
UNINITIALIZED,
STARTING,
@@ -142,6 +149,7 @@
};
public:
+
Worker(ThreadManager::Impl* manager) :
_manager(manager),
_state(UNINITIALIZED),
@@ -150,59 +158,94 @@
~Worker() {}
+ bool isActive() const { return _manager->_workerCount <= _manager->_workerMaxCount;}
+
/** Worker entry point
As long as worker thread is running, pull tasks off the task queue and execute. */
void run() {
+ bool active = false;
+
+ bool notifyManager = false;
+
+ /** Increment worker semaphore and notify manager if worker count reached desired max
+
+ Note
+ We have to release the monitor and acquire the workerMonitor since that is what the manager
+ blocks on for worker add/remove */
+
{Synchronized s(_manager->_monitor);
- if(_state == STARTING) {
- _state = STARTED;
+ active = _manager->_workerCount < _manager->_workerMaxCount;
+
+ if(active) {
+
+ _manager->_workerCount++;
+
+ notifyManager = _manager->_workerCount == _manager->_workerMaxCount;
}
-
- _manager->_workerCount++;
-
- _manager->_monitor.notifyAll();
}
- do {
+ if(notifyManager) {
+
+ Synchronized s(_manager->_workerMonitor);
+
+ _manager->_workerMonitor.notify();
+
+ notifyManager = false;
+ }
+
+ while(active) {
ThreadManager::Task* task = NULL;
/* While holding manager monitor block for non-empty task queue (Also check that the thread hasn't been requested to stop).
- Once the queue is non-empty, dequeue a task, release monitor, and execute. */
+ Once the queue is non-empty, dequeue a task, release monitor, and execute. If the worker max count has been decremented
+ such that we exceed it, mark ourself inactive, decrement the worker count and notify the manager (technically we're notifying
+ the next blocked thread but eventually the manager will see it. */
{Synchronized s(_manager->_monitor);
- while(_state == STARTED && _manager->_tasks.empty()) {
+ active = isActive();
+
+ while(active && _manager->_tasks.empty()) {
_manager->_idleCount++;
_idle = true;
_manager->_monitor.wait();
+
+ active = isActive();
_idle = false;
_manager->_idleCount--;
}
-
- if(_state == STARTED) {
+
+ if(active) {
if(!_manager->_tasks.empty()) {
task = _manager->_tasks.front();
_manager->_tasks.pop();
-
+
if(task->_state == ThreadManager::Task::WAITING) {
-
+
task->_state = ThreadManager::Task::EXECUTING;
}
}
+ } else {
+
+ _idle = true;
+
+ _manager->_workerCount--;
+
+ notifyManager = _manager->_workerCount == _manager->_workerMaxCount;
}
}
@@ -212,31 +255,29 @@
try {
task->run();
-
+
} catch(...) {
// XXX need to log this
}
delete task;
+
+ task = NULL;
}
}
+ }
+
+ {Synchronized s(_manager->_workerMonitor);
+
+ _manager->_deadWorkers.insert(this->thread());
- } while(_state == STARTED);
+ if(notifyManager) {
- {Synchronized s(_manager->_monitor);
-
- _manager->_workerCount--;
-
- if(_state == STOPPING) {
-
- _state = STOPPED;
-
- _manager->_monitor.notify();
-
+ _manager->_workerMonitor.notify();
}
}
-
+
return;
}
@@ -263,6 +304,13 @@
newThreads.insert(_threadFactory->newThread(worker));
}
+
+ {Synchronized s(_monitor);
+
+ _workerMaxCount+= value;
+
+ _workers.insert(newThreads.begin(), newThreads.end());
+ }
for(std::set<Thread*>::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) {
@@ -273,11 +321,33 @@
(*ix)->start();
}
- {Synchronized s(_monitor);
+ {Synchronized s(_workerMonitor);
- _workers.insert(newThreads.begin(), newThreads.end());
+ while(_workerCount != _workerMaxCount) {
+ _workerMonitor.wait();
+ }
+ }
+}
- while(_workerCount != _workers.size()) {
+void ThreadManager::Impl::start() {
+
+ if(_state == ThreadManager::STOPPED) {
+ return;
+ }
+
+ {Synchronized s(_monitor);
+
+ if(_state == ThreadManager::UNINITIALIZED) {
+
+ if(_threadFactory == NULL) {throw InvalidArgumentException();}
+
+ _state = ThreadManager::STARTED;
+
+ _monitor.notifyAll();
+ }
+
+ while(_state == STARTING) {
+
_monitor.wait();
}
}
@@ -287,78 +357,90 @@
bool doStop = false;
+ if(_state == ThreadManager::STOPPED) {
+ return;
+ }
+
{Synchronized s(_monitor);
- if(!_stopped) {
+ if(!_state != ThreadManager::STOPPING && _state != ThreadManager::STOPPED) {
+
doStop = true;
- _stopped = true;
+
+ _state = ThreadManager::STOPPING;
}
}
if(doStop) {
+
removeWorker(_workerCount);
+
+ _state = ThreadManager::STOPPING;
}
+
+ // Don't block for stopping->stopped transition here, since if stop is being performed in context of a delete, the monitor may be invalid
+
}
void ThreadManager::Impl::removeWorker(size_t value) {
std::set<Thread*> removedThreads;
- {Synchronized s(_monitor);
-
- /* Overly clever loop
-
- First time through, (idleOnly == 1) just look for idle threads. If that didn't find enough, go through again (idleOnly == 0)
- and remove a sufficient number of busy threads. */
+ {Synchronized s(_monitor);
- for(int idleOnly = 1; idleOnly >= 0; idleOnly--) {
+ if(value > _workerMaxCount) {
+
+ throw InvalidArgumentException();
+ }
+
+ _workerMaxCount-= value;
+
+ if(_idleCount < value) {
- for(std::set<Thread*>::iterator workerThread = _workers.begin(); (workerThread != _workers.end()) && (removedThreads.size() < value); workerThread++) {
+ for(size_t ix = 0; ix < _idleCount; ix++) {
- Worker* worker = (Worker*)(*workerThread)->runnable();
-
- if(worker->_idle || !idleOnly) {
-
- if(worker->_state == ThreadManager::Worker::STARTED) {
-
- worker->_state = ThreadManager::Worker::STOPPING;
- }
-
- removedThreads.insert(*workerThread);
-
- _workers.erase(workerThread);
- }
- }
+ _monitor.notify();
}
-
+ } else {
+
_monitor.notifyAll();
}
-
-
- // Join removed threads and free worker
-
- for(std::set<Thread*>::iterator workerThread = removedThreads.begin(); workerThread != removedThreads.end(); workerThread++) {
-
- Worker* worker = (Worker*)(*workerThread)->runnable();
-
- (*workerThread)->join();
-
- delete worker;
- }
}
+
+ {Synchronized s(_workerMonitor);
+
+ while(_workerCount != _workerMaxCount) {
+ _workerMonitor.wait();
+ }
+
+ for(std::set<Thread*>::iterator ix = _deadWorkers.begin(); ix != _deadWorkers.end(); ix++) {
+
+ _workers.erase(*ix);
+
+ delete (*ix)->runnable();
+
+ delete (*ix);
+ }
+
+ _deadWorkers.clear();
+ }
+}
void ThreadManager::Impl::add(Runnable* value) {
Synchronized s(_monitor);
- bool isEmpty = _tasks.empty();
+ if(_state != ThreadManager::STARTED) {
+
+ throw IllegalStateException();
+ }
_tasks.push(new ThreadManager::Task(value));
- /* If queue was empty notify a thread, otherwise all worker threads are running and will get around to this
+ /* If idle thread is available notify it, otherwise all worker threads are running and will get around to this
task in time. */
- if(isEmpty && _idleCount > 0) {
+ if(_idleCount > 0) {
_monitor.notify();
}
@@ -367,6 +449,11 @@
void ThreadManager::Impl::remove(Runnable* task) {
Synchronized s(_monitor);
+
+ if(_state != ThreadManager::STARTED) {
+
+ throw IllegalStateException();
+ }
}
class SimpleThreadManager : public ThreadManager::Impl {
@@ -378,26 +465,10 @@
_firstTime(true) {
}
- void add(Runnable* task) {
+ void start() {
+ ThreadManager::Impl::start();
- bool addWorkers = false;
-
- {Synchronized s(_monitor);
-
- if(_firstTime) {
-
- _firstTime = false;
-
- addWorkers = true;
- }
- }
-
- if(addWorkers) {
-
- addWorker(_workerCount);
- }
-
- Impl::add(task);
+ addWorker(_workerCount);
}
private:
diff --git a/lib/cpp/src/concurrency/ThreadManager.h b/lib/cpp/src/concurrency/ThreadManager.h
index 596471d..aa5a98a 100644
--- a/lib/cpp/src/concurrency/ThreadManager.h
+++ b/lib/cpp/src/concurrency/ThreadManager.h
@@ -32,6 +32,26 @@
virtual ~ThreadManager() {}
+ /** Starts the thread manager. Verifies all attributes have been properly initialized, then allocates necessary resources to begin operation */
+
+ virtual void start() = 0;
+
+ /** Stops the thread manager. Aborts all remaining unprocessed task, shuts down all created worker threads, and realeases all allocated resources.
+ This method blocks for all worker threads to complete, thus it can potentially block forever if a worker thread is running a task that
+ won't terminate. */
+
+ virtual void stop() = 0;
+
+ enum STATE {
+ UNINITIALIZED,
+ STARTING,
+ STARTED,
+ STOPPING,
+ STOPPED
+ };
+
+ virtual const STATE state() const = 0;
+
virtual const ThreadFactory* threadFactory() const = 0;
virtual void threadFactory(const ThreadFactory* value) = 0;
diff --git a/lib/cpp/src/concurrency/test/Tests.cc b/lib/cpp/src/concurrency/test/Tests.cc
index d139f55..2174bf4 100644
--- a/lib/cpp/src/concurrency/test/Tests.cc
+++ b/lib/cpp/src/concurrency/test/Tests.cc
@@ -30,13 +30,13 @@
assert(threadFactoryTests.helloWorldTest());
- size_t count = 10000;
+ size_t count = 1000;
std::cout << "\t\tThreadFactory reap N threads test: N = " << count << std::endl;
assert(threadFactoryTests.reapNThreads(count));
- std::cout << "\t\tThreadFactory synchrous start test" << std::endl;
+ std::cout << "\t\tThreadFactory synchronous start test" << std::endl;
assert(threadFactoryTests.synchStartTest());
}
@@ -56,11 +56,17 @@
std::cout << "ThreadManager tests..." << std::endl;
- std::cout << "\t\tThreadManager test00" << std::endl;
+ size_t workerCount = 10;
+
+ size_t taskCount = 10000;
+
+ long long delay = 10LL;
+
+ std::cout << "\t\tThreadManager load test: worker count: " << workerCount << " task count: " << taskCount << " delay: " << delay << std::endl;
ThreadManagerTests threadManagerTests;
- assert(threadManagerTests.test00());
+ assert(threadManagerTests.loadTest(taskCount, delay, workerCount));
}
}
diff --git a/lib/cpp/src/concurrency/test/ThreadManagerTests.h b/lib/cpp/src/concurrency/test/ThreadManagerTests.h
index aad6332..72d6777 100644
--- a/lib/cpp/src/concurrency/test/ThreadManagerTests.h
+++ b/lib/cpp/src/concurrency/test/ThreadManagerTests.h
@@ -28,27 +28,35 @@
_monitor(monitor),
_count(count),
_timeout(timeout),
- _addTime(Util::currentTime()),
- _success(false),
_done(false) {}
void run() {
- _startTime = Util::currentTime();
-
Monitor sleep;
{Synchronized s(sleep);
- sleep.wait(_timeout);
- }
+ long long time00 = Util::currentTime();
- _endTime = Util::currentTime();
+ sleep.wait(_timeout);
+
+ long long time01 = Util::currentTime();
+
+ double error = ((time01 - time00) - _timeout) / (double)_timeout;
+
+ if(error < 0.0) {
+
+ error*= -1.0;
+ }
+
+ if(error > .10) {
+
+ assert(false);
+ }
+ }
_done = true;
- _success = true;
-
{Synchronized s(_monitor);
// std::cout << "Thread " << _count << " completed " << std::endl;
@@ -65,17 +73,13 @@
Monitor& _monitor;
size_t& _count;
long long _timeout;
- long long _addTime;
- long long _startTime;
- long long _endTime;
- bool _success;
bool _done;
};
/** Dispatch count tasks, each of which blocks for timeout milliseconds then completes.
Verify that all tasks completed and that thread manager cleans up properly on delete. */
- bool test00(size_t count=100, long long timeout=100LL, size_t workerCount=4) {
+ bool loadTest(size_t count=100, long long timeout=100LL, size_t workerCount=4) {
Monitor monitor;
@@ -84,6 +88,8 @@
ThreadManager* threadManager = ThreadManager::newSimpleThreadManager(workerCount);
threadManager->threadFactory(new PosixThreadFactory());
+
+ threadManager->start();
std::set<ThreadManagerTests::Task*> tasks;
@@ -92,6 +98,8 @@
tasks.insert(new ThreadManagerTests::Task(monitor, activeCount, timeout));
}
+ long long time00 = Util::currentTime();
+
for(std::set<ThreadManagerTests::Task*>::iterator ix = tasks.begin(); ix != tasks.end(); ix++) {
threadManager->add(*ix);
@@ -105,19 +113,27 @@
}
}
- bool success;
+ long long time01 = Util::currentTime();
for(std::set<ThreadManagerTests::Task*>::iterator ix = tasks.begin(); ix != tasks.end(); ix++) {
- success = success || (*ix)->_success;
-
delete *ix;
}
+ double expectedTime = ((count + (workerCount - 1)) / workerCount) * timeout;
+
+ double error = ((time01 - time00) - expectedTime) / expectedTime;
+
+ if(error < 0) {
+ error*= -1.0;
+ }
+
+ bool success = error < .10;
+
delete threadManager;
- std::cout << "\t\t\t" << (success ? "Success" : "Failure") << "!" << std::endl;
+ std::cout << "\t\t\t" << (success ? "Success" : "Failure") << "! expected time: " << expectedTime << "ms elapsed time: "<< time01 - time00 << "ms error%: " << error * 100.0 << std::endl;
return true;
}