Added thread factory test - problems in thread
Fixed stupid typo in TimerManager::start
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664723 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/concurrency/Monitor.cc b/lib/cpp/src/concurrency/Monitor.cc
index a2e4276..f6144ba 100644
--- a/lib/cpp/src/concurrency/Monitor.cc
+++ b/lib/cpp/src/concurrency/Monitor.cc
@@ -1,8 +1,12 @@
#include "Monitor.h"
+#include "Exception.h"
#include "Util.h"
#include <assert.h>
#include <errno.h>
+
+#include <iostream>
+
#include <pthread.h>
@@ -57,19 +61,19 @@
if(timeout == 0LL) {
- pthread_cond_wait(&_pthread_cond, &_pthread_mutex);
+ if(pthread_cond_wait(&_pthread_cond, &_pthread_mutex) != 0) { assert(!"pthread_cond_wait failed"); } // the wait itself stays outside assert() so it is still executed when NDEBUG is defined
} else {
struct timespec abstime;
- Util::toAbsoluteTimespec(abstime, timeout);
-
int result = pthread_cond_timedwait(&_pthread_cond, &_pthread_mutex, &abstime);
if(result == ETIMEDOUT) {
- // XXX If result is timeout need to throw timeout exception
+ // XXX Add assert once currentTime is fixed to have ms resolution or better
+
+ // assert(Util::currentTime() >= (now + timeout));
}
}
}
@@ -101,7 +105,7 @@
Monitor::Monitor() : _impl(new Monitor::Impl()) {}
- Monitor::~Monitor() { delete _impl;}
+Monitor::~Monitor() { delete _impl;}
void Monitor::lock() const {_impl->lock();}
diff --git a/lib/cpp/src/concurrency/PosixThreadFactory.cc b/lib/cpp/src/concurrency/PosixThreadFactory.cc
index e7f84cd..bac122d 100644
--- a/lib/cpp/src/concurrency/PosixThreadFactory.cc
+++ b/lib/cpp/src/concurrency/PosixThreadFactory.cc
@@ -22,6 +22,8 @@
static const int MB = 1024 * 1024;
+ static void* threadMain(void* arg);
+
private:
pthread_t _pthread;
@@ -36,26 +38,6 @@
Runnable* _runnable;
- static void* threadMain(void* arg) {
-
- // XXX need a lock here when testing thread state
-
- PthreadThread* thread = (PthreadThread*)arg;
-
- if(thread->_state != starting) {
- return (void*)0;
- }
-
- thread->_state = starting;
-
- thread->_runnable->run();
-
- if(thread->_state != stopping && thread->_state != stopped) {
- thread->_state = stopping;
- }
-
- return (void*)0;
- }
public:
@@ -95,9 +77,9 @@
// Set thread priority
- assert(pthread_attr_setschedparam(&thread_attr, &sched_param) == 0);
+ // assert(pthread_attr_setschedparam(&thread_attr, &sched_param) == 0);
- assert(pthread_create(&_pthread, &thread_attr, PthreadThread::threadMain, (void*)this) == 0);
+ if(pthread_create(&_pthread, &thread_attr, threadMain, (void*)this) != 0) { assert(!"pthread_create failed"); } // creation stays outside assert() so the thread is still started when NDEBUG is defined
}
void join() {
@@ -114,6 +96,26 @@
};
+/** pthread entry point: arg is the PthreadThread whose runnable should be executed. */
+void* PthreadThread::threadMain(void* arg) {
+  // XXX the state tests below are unsynchronized and still need a lock
+  PthreadThread* const self = static_cast<PthreadThread*>(arg);
+  // Only a thread that is currently in the 'starting' state runs its task
+  const bool mayRun = (self->_state == starting);
+  if(!mayRun) {
+    return 0;
+  }
+
+  // NOTE(review): this re-stores the value just tested; presumably a 'started' state was intended - confirm
+  self->_state = starting;
+  self->_runnable->run();
+
+  // Mark the thread as stopping unless it has already been moved to stopping/stopped
+  const bool alreadyStopping = (self->_state == stopping || self->_state == stopped);
+  if(!alreadyStopping) { self->_state = stopping; }
+  return 0;
+}
+
/** POSIX Thread factory implementation */
class PosixThreadFactory::Impl {
diff --git a/lib/cpp/src/concurrency/ThreadManager.cc b/lib/cpp/src/concurrency/ThreadManager.cc
index b5d02e6..d13ce7b 100644
--- a/lib/cpp/src/concurrency/ThreadManager.cc
+++ b/lib/cpp/src/concurrency/ThreadManager.cc
@@ -176,7 +176,7 @@
void run() {
- {Synchronized(_manager->_monitor);
+ {Synchronized s(_manager->_monitor);
if(_state == STARTING) {
_state = STARTED;
@@ -191,7 +191,7 @@
Once the queue is non-empty, dequeue a task, release monitor, and execute. */
- {Synchronized(_manager->_monitor);
+ {Synchronized s(_manager->_monitor);
while(_state == STARTED && _manager->_tasks.empty()) {
@@ -221,7 +221,7 @@
} while(_state == STARTED);
- {Synchronized(_manager->_monitor);
+ {Synchronized s(_manager->_monitor);
if(_state == STOPPING) {
diff --git a/lib/cpp/src/concurrency/TimerManager.cc b/lib/cpp/src/concurrency/TimerManager.cc
index bd68264..015ffba 100644
--- a/lib/cpp/src/concurrency/TimerManager.cc
+++ b/lib/cpp/src/concurrency/TimerManager.cc
@@ -1,8 +1,9 @@
#include "TimerManager.h"
+#include "Exception.h"
#include "Util.h"
#include <assert.h>
-
+#include <iostream>
#include <set>
namespace facebook { namespace thrift { namespace concurrency {
@@ -43,24 +44,19 @@
Runnable* _runnable;
+ class TimerManager::Dispatcher;
+
+ friend class TimerManager::Dispatcher;
+
STATE _state;
};
class TimerManager::Dispatcher: public Runnable {
- enum STATE {
- UNINITIALIZED,
- STARTING,
- STARTED,
- STOPPING,
- STOPPED
- };
-
public:
Dispatcher(TimerManager* manager) :
- _manager(manager),
- _state(UNINITIALIZED)
- {}
+ _manager(manager) {
+}
~Dispatcher() {}
@@ -70,10 +66,13 @@
void run() {
- {Synchronized(_manager->_monitor);
+ {Synchronized s(_manager->_monitor);
- if(_state == STARTING) {
- _state = STARTED;
+ if(_manager->_state == TimerManager::STARTING) {
+
+ _manager->_state = TimerManager::STARTED;
+
+ _manager->_monitor.notifyAll();
}
}
@@ -81,26 +80,38 @@
std::set<TimerManager::Task*> expiredTasks;
- {Synchronized(_manager->_monitor);
-
- long long now = Util::currentTime();
+ {Synchronized s(_manager->_monitor);
+
+ /* Update next timeout if necessary */
task_iterator expiredTaskEnd;
-
- while(_state == STARTED &&
- (expiredTaskEnd = _manager->_taskMap.upper_bound(now)) == _manager->_taskMap.end()) {
-
- _manager->_monitor.wait(_manager->_nextTimeout - now);
+
+  while(_manager->_state == TimerManager::STARTED &&
+        (expiredTaskEnd = _manager->_taskMap.upper_bound(Util::currentTime())) == _manager->_taskMap.begin()) {
+    // No task has expired yet. A timeout of 0 makes the monitor wait until notified (empty map case).
+    long long timeout = 0LL;
+    if(!_manager->_taskMap.empty()) {
+      // Map keys are compared against the current time above, so the delay is (earliest key - now), not the reverse
+      timeout = _manager->_taskMap.begin()->first - Util::currentTime();
+      if(timeout <= 0LL) {continue;} // became due since the loop test: re-test instead of waiting forever on 0
+    }
+
+    _manager->_monitor.wait(timeout);
+  }
- if(_state == STARTED) {
+ if(_manager->_state == TimerManager::STARTED) {
for(task_iterator ix = _manager->_taskMap.begin(); ix != expiredTaskEnd; ix++) {
TimerManager::Task* task = ix->second;
expiredTasks.insert(task);
+
+ if(task->_state == TimerManager::Task::WAITING) {
+
+ task->_state = TimerManager::Task::EXECUTING;
+ }
_manager->_taskCount--;
}
@@ -116,13 +127,13 @@
delete *ix;
}
- } while(_state == STARTED);
+ } while(_manager->_state == TimerManager::STARTED);
- {Synchronized(_manager->_monitor);
+ {Synchronized s(_manager->_monitor);
- if(_state == STOPPING) {
+ if(_manager->_state == TimerManager::STOPPING) {
- _state = STOPPED;
+ _manager->_state = TimerManager::STOPPED;
_manager->_monitor.notify();
@@ -137,13 +148,73 @@
TimerManager* _manager;
friend class TimerManager;
-
- STATE _state;
};
-TimerManager::TimerManager() {}
+/** Creates an idle manager: no tasks, no thread factory and no dispatcher thread until start() is called. */
+TimerManager::TimerManager() :
+  _threadFactory(NULL), _taskCount(0), _state(TimerManager::UNINITIALIZED), // start() tests _threadFactory, add()/taskCount() read _taskCount
+  _dispatcher(new Dispatcher(this)), _dispatcherThread(NULL) {}
-TimerManager::~TimerManager() {}
+
+TimerManager::~TimerManager() { // Destroys the manager
+ delete _dispatcher; // frees the Dispatcher allocated in the constructor. NOTE(review): the Thread stored in _dispatcherThread by start() is neither joined nor deleted here - confirm who owns it
+}
+
+void TimerManager::start() {
+ // Launches the dispatcher thread on the first call, then blocks until _state is no longer STARTING.
+ bool doStart = false;
+
+ {Synchronized s(_monitor);
+ // A thread factory must have been supplied before start() is called
+ if(_threadFactory == NULL) {throw InvalidArgumentException();}
+
+ if(_state == TimerManager::UNINITIALIZED) {
+ // Only the call that observes UNINITIALIZED moves to STARTING and goes on to create the thread
+ _state = TimerManager::STARTING;
+
+ doStart = true;
+ }
+ }
+
+ if(doStart) {
+
+ _dispatcherThread = _threadFactory->newThread(_dispatcher);
+
+ _dispatcherThread->start();
+ }
+
+ {Synchronized s(_monitor);
+ // Dispatcher::run() changes STARTING to STARTED and calls notifyAll() on this monitor
+ while(_state == TimerManager::STARTING) {
+
+ _monitor.wait();
+ }
+
+ assert(_state != TimerManager::STARTING);
+ }
+}
+
+void TimerManager::stop() {
+ // Requests shutdown and blocks until _state is STOPPED.
+ {Synchronized s(_monitor);
+
+ if(_state == TimerManager::UNINITIALIZED) {
+ // start() has not moved the manager out of UNINITIALIZED, so there is no dispatcher to wait for
+ _state = TimerManager::STOPPED;
+
+ } else if(_state != STOPPING && _state != STOPPED) {
+ // Ask the dispatcher to exit and wake it in case it is waiting on the monitor
+ _state = STOPPING;
+
+ _monitor.notifyAll();
+ }
+ // Dispatcher::run() changes STOPPING to STOPPED and notifies once its loop has ended
+ while(_state != STOPPED) {
+
+ _monitor.wait();
+ }
+ }
+}
const ThreadFactory* TimerManager::threadFactory() const {
@@ -159,6 +230,11 @@
_threadFactory = value;
}
+size_t TimerManager::taskCount() const {
+ // Returns the counter that add() increments; the read is not wrapped in a Synchronized block
+ return _taskCount;
+}
+
void TimerManager::add(Runnable* task, long long timeout) {
long long now = Util::currentTime();
@@ -167,6 +243,10 @@
{Synchronized s(_monitor);
+ if(_state != TimerManager::STARTED) {
+ throw IllegalStateException();
+ }
+
_taskCount++;
_taskMap.insert(std::pair<long long, Task*>(timeout, new Task(task)));
@@ -174,15 +254,10 @@
/* If the task map was empty, or if we have an expiration that is earlier than any previously seen,
kick the dispatcher so it can update its timeout */
- if(_taskCount == 1 || timeout < _nextTimeout) {
+ if(_taskCount == 1 || timeout < _taskMap.begin()->first) {
_monitor.notify();
}
-
- if(timeout < _nextTimeout) {
-
- _nextTimeout = timeout;
- }
}
}
@@ -192,20 +267,26 @@
Util::toMilliseconds(expiration, value);
- /* XXX
- Need to convert this to an explicit exception */
-
long long now = Util::currentTime();
- assert(expiration < now);
+ if(expiration < now) {
+ throw InvalidArgumentException();
+ }
add(task, expiration - now);
}
void TimerManager::remove(Runnable* task) {
+ {Synchronized s(_monitor); // Only the state check exists so far: nothing here looks up or removes `task`
+ if(_state != TimerManager::STARTED) {
+ throw IllegalStateException(); // the manager is not in the STARTED state
+ }
+ }
}
+const TimerManager::STATE TimerManager::state() const { return _state;} // Returns the current _state by value; the read is not wrapped in a Synchronized block
+
}}} // facebook::thrift::concurrency
diff --git a/lib/cpp/src/concurrency/TimerManager.h b/lib/cpp/src/concurrency/TimerManager.h
index 002460a..2681a5a 100644
--- a/lib/cpp/src/concurrency/TimerManager.h
+++ b/lib/cpp/src/concurrency/TimerManager.h
@@ -1,6 +1,7 @@
#if !defined(_concurrency_TimerManager_h_)
#define _concurrency_TimerManager_h_ 1
+#include "Exception.h"
#include "Monitor.h"
#include "Thread.h"
@@ -9,7 +10,7 @@
#include <time.h>
namespace facebook { namespace thrift { namespace concurrency {
-
+
/** Timer Manager
This class dispatches timer tasks when they fall due.
@@ -23,31 +24,56 @@
TimerManager();
- virtual ~TimerManager() = 0;
+ virtual ~TimerManager();
- virtual const ThreadFactory* threadFactory() const = 0;
+ virtual const ThreadFactory* threadFactory() const;
- virtual void threadFactory(const ThreadFactory* value) = 0;
+ virtual void threadFactory(const ThreadFactory* value);
- virtual size_t taskCount() const = 0;
+ /** Starts the timer manager service
+
+ @throws InvalidArgumentException Missing thread factory attribute */
+
+ virtual void start();
+
+ /** Stops the timer manager service */
+
+ virtual void stop();
+
+ virtual size_t taskCount() const ;
/** Adds a task to be executed at some time in the future by a worker thread.
@param task The task to execute
@param timeout Time in milliseconds to delay before executing task */
- virtual void add(Runnable* task, long long timeout) = 0;
+ virtual void add(Runnable* task, long long timeout);
/** Adds a task to be executed at some time in the future by a worker thread.
@param task The task to execute
@param timeout Absolute time in the future to execute task. */
- virtual void add(Runnable* task, const struct timespec& timeout) = 0;
+ virtual void add(Runnable* task, const struct timespec& timeout);
- /** Removes a pending task */
+ /** Removes a pending task
- virtual void remove(Runnable* task) = 0;
+ @throws NoSuchTaskException Specified task doesn't exist. It was either processed already or this call was made for a task that
+ was never added to this timer
+
+ @throws UncancellableTaskException Specified task is already being executed or has completed execution. */
+
+ virtual void remove(Runnable* task);
+
+ enum STATE { // States a TimerManager moves through
+ UNINITIALIZED = 1000, // set by the constructor; start() launches the dispatcher only from this state
+ STARTING = 1001, // set by start() until the dispatcher thread reports in
+ STARTED = 1002, // set by the dispatcher thread; add() and remove() require this state
+ STOPPING = 1003, // set by stop() to make the dispatcher loop exit
+ STOPPED = 1004 // set by the dispatcher when it exits, or by stop() when still UNINITIALIZED
+ };
+
+ virtual const STATE state() const;
private:
@@ -61,15 +87,18 @@
size_t _taskCount;
- long long _nextTimeout;
-
Monitor _monitor;
+ STATE _state;
+
class Dispatcher;
friend class Dispatcher;
Dispatcher* _dispatcher;
+
+ Thread* _dispatcherThread;
+
};
}}} // facebook::thrift::concurrency
diff --git a/lib/cpp/src/concurrency/test/Tests.cc b/lib/cpp/src/concurrency/test/Tests.cc
new file mode 100644
index 0000000..36f29ff
--- /dev/null
+++ b/lib/cpp/src/concurrency/test/Tests.cc
@@ -0,0 +1,52 @@
+#include <iostream>
+#include <string>
+
+#include "ThreadFactoryTests.h"
+#include "TimerManagerTests.h"
+
+/** Test driver: argv[1] selects "thread-factory", "timer-manager" or "all" (the default). */
+int main(int argc, char** argv) {
+
+  // Without an argument every suite is run
+  const std::string arg = (argc < 2) ? std::string("all") : std::string(argv[1]);
+
+  const bool runAll = (arg == "all");
+
+  // Tests are invoked outside assert() so they still execute when NDEBUG is defined
+  // (assert() expands to nothing then); the first test that returns false ends the
+  // run with exit status 1.
+
+  if(runAll || arg == "thread-factory") {
+
+    ThreadFactoryTests threadFactoryTests;
+
+    std::cout << "ThreadFactory tests..." << std::endl;
+
+    std::cout << "\tThreadFactory hello-world test" << std::endl;
+
+    if(!threadFactoryTests.helloWorldTest()) { return 1; }
+
+    std::cout << "\t\tThreadFactory reap N threads test: N = 100" << std::endl;
+
+    if(!threadFactoryTests.reapNThreads(100)) { return 1; }
+
+    std::cout << "\t\tThreadFactory synchrous start test" << std::endl;
+
+    if(!threadFactoryTests.synchStartTest()) { return 1; }
+  }
+
+  if(runAll || arg == "timer-manager") {
+
+    std::cout << "TimerManager tests..." << std::endl;
+
+    std::cout << "\t\tTimerManager test00" << std::endl;
+
+    TimerManagerTests timerManagerTests;
+
+    if(!timerManagerTests.test00()) { return 1; }
+  }
+
+  // Every requested test returned true
+  return 0;
+}
+
diff --git a/lib/cpp/src/concurrency/test/ThreadFactoryTests.h b/lib/cpp/src/concurrency/test/ThreadFactoryTests.h
new file mode 100644
index 0000000..0d93564
--- /dev/null
+++ b/lib/cpp/src/concurrency/test/ThreadFactoryTests.h
@@ -0,0 +1,228 @@
+#include <Thread.h>
+#include <PosixThreadFactory.h>
+#include <Monitor.h>
+
+#include <assert.h>
+#include <iostream>
+#include <set>
+
+namespace facebook { namespace thrift { namespace concurrency { namespace test {
+
+using namespace facebook::thrift::concurrency;
+
+/** ThreadFactoryTests class
+
+ @author marc
+ @version $Id:$ */
+
+class ThreadFactoryTests {
+  // Tests for threads built by PosixThreadFactory: run-and-join, mass start-up, and a start/stop handshake.
+  class Task: public Runnable {
+    // Runnable that only prints a greeting
+  public:
+
+    Task() {}
+
+    void run() {
+      std::cout << "\t\t\tHello World" << std::endl;
+    }
+  };
+
+public:
+
+  /** Hello world test: runs one Task on a new thread, joins it, frees both and returns true. */
+
+  bool helloWorldTest() {
+
+    PosixThreadFactory threadFactory = PosixThreadFactory();
+
+    Task* task = new ThreadFactoryTests::Task();
+
+    Thread* thread = threadFactory.newThread(task);
+
+    thread->start();
+
+    thread->join();
+
+    delete thread;
+
+    delete task;
+
+    std::cout << "\t\t\tSuccess!" << std::endl;
+
+    return true;
+  }
+
+  /** Task for reapNThreads: decrements the shared counter and notifies the monitor when it hits zero. */
+
+  class ReapNTask: public Runnable {
+
+  public:
+
+    ReapNTask(Monitor& monitor, int& activeCount) :
+      _monitor(monitor),
+      _count(activeCount) {
+    }
+
+    void run() {
+
+      {Synchronized s(_monitor);
+
+        _count--;
+
+        //std::cout << "\t\t\tthread count: " << _count << std::endl;
+
+        if(_count == 0) {
+          _monitor.notify();
+        }
+      }
+    }
+
+    Monitor& _monitor;
+
+    int& _count;
+  };
+  /** Reap N threads: starts count threads, waits until all have run, then joins and frees them; returns true. */
+  bool reapNThreads(int count=100) {
+
+    Monitor monitor; // monitor and counter live on the stack: every thread is joined before this function returns
+
+    int activeCount = count;
+
+    PosixThreadFactory threadFactory = PosixThreadFactory();
+
+    std::set<Thread*> threads;
+
+    for(int ix = 0; ix < count; ix++) {
+      threads.insert(threadFactory.newThread(new ReapNTask(monitor, activeCount)));
+    }
+
+    for(std::set<Thread*>::const_iterator thread = threads.begin(); thread != threads.end(); thread++) {
+
+      (*thread)->start();
+    }
+
+    // Wait, re-checking at least every 1000 ms, until every task has decremented the counter
+    {Synchronized s(monitor);
+
+      while(activeCount > 0) {
+        monitor.wait(1000);
+      }
+    }
+
+    for(std::set<Thread*>::const_iterator thread = threads.begin(); thread != threads.end(); thread++) {
+      // Join first: a thread may still be finishing after its task has signalled the monitor
+      (*thread)->join();
+      delete (*thread)->runnable();
+      delete *thread;
+    }
+
+    std::cout << "\t\t\tSuccess!" << std::endl;
+
+    return true;
+  }
+  /** Task for synchStartTest: reports STARTED, waits while STARTED, then answers STOPPING with STOPPED. */
+  class SynchStartTask: public Runnable {
+
+  public:
+
+    enum STATE {
+      UNINITIALIZED = 1000,
+      STARTING = 1001,
+      STARTED = 1002,
+      STOPPING = 1003,
+      STOPPED = 1004
+    };
+
+    SynchStartTask(Monitor& monitor,
+                   volatile STATE& state) :
+      _monitor(monitor),
+      _state(state) {
+    }
+
+    void run() {
+
+      {Synchronized s(_monitor);
+
+        if(_state == SynchStartTask::STARTING) {
+          _state = SynchStartTask::STARTED;
+          _monitor.notify();
+        }
+      }
+
+      {Synchronized s(_monitor);
+
+        while(_state == SynchStartTask::STARTED) {
+          _monitor.wait();
+        }
+
+        if(_state == SynchStartTask::STOPPING) {
+
+          _state = SynchStartTask::STOPPED;
+
+          _monitor.notifyAll();
+        }
+      }
+    }
+
+  private:
+    Monitor& _monitor;
+    volatile STATE& _state;
+  };
+  /** Synchronous start test: starts a task, waits for it to leave STARTING, then drives it to STOPPED; returns true. */
+  bool synchStartTest() {
+
+    Monitor monitor;
+
+    SynchStartTask::STATE state = SynchStartTask::UNINITIALIZED;
+
+    SynchStartTask* task = new SynchStartTask(monitor, state);
+
+    PosixThreadFactory threadFactory = PosixThreadFactory();
+
+    Thread* thread = threadFactory.newThread(task);
+
+    if(state == SynchStartTask::UNINITIALIZED) {
+
+      state = SynchStartTask::STARTING;
+
+      thread->start();
+    }
+
+    {Synchronized s(monitor);
+
+      while(state == SynchStartTask::STARTING) {
+        monitor.wait();
+      }
+    }
+
+    assert(state != SynchStartTask::STARTING);
+
+    {Synchronized s(monitor);
+
+      monitor.wait(100);
+
+      if(state == SynchStartTask::STARTED) {
+        state = SynchStartTask::STOPPING;
+        monitor.notify();
+      }
+      while(state == SynchStartTask::STOPPING) {
+        monitor.wait();
+      }
+    }
+
+    assert(state == SynchStartTask::STOPPED);
+    // The task has finished its handshake; join the thread before releasing it and the task
+    thread->join();
+    delete thread;
+    delete task;
+    return true;
+  }
+
+};
+
+
+}}}} // facebook::thrift::concurrency
+
+using namespace facebook::thrift::concurrency::test;
+
diff --git a/lib/cpp/src/concurrency/test/TimerManagerTests.cc b/lib/cpp/src/concurrency/test/TimerManagerTests.cc
deleted file mode 100644
index abd0e95..0000000
--- a/lib/cpp/src/concurrency/test/TimerManagerTests.cc
+++ /dev/null
@@ -1,33 +0,0 @@
-#include <ThreadManager.h>
-#include <PosixThreadFactory.h>
-
-#include <assert.h>
-
-namespace facebook { namespace thrift { namespace concurrency { namespace test {
-
-/** ThreadManagerTests class */
-
-
-class ThreadManagerTests {
-
- void init() {
-
- ThreadManager* threadManager = ThreadManager::newThreadManager();
-
- threadManager->poolPolicy(new BasicPoolPolicy());
-
- threadManager->threadFactory(new PosixThreadFactory());
-
- threadManager->poolPolicy(new BasicPoolPolicy());
- }
-};
-
-
-}}}} // facebook::thrift::concurrency
-
-int main(int argc, char** argv) {
-
- return 0;
-
-}
-
diff --git a/lib/cpp/src/concurrency/test/TimerManagerTests.h b/lib/cpp/src/concurrency/test/TimerManagerTests.h
new file mode 100644
index 0000000..24f7964
--- /dev/null
+++ b/lib/cpp/src/concurrency/test/TimerManagerTests.h
@@ -0,0 +1,81 @@
+#include <TimerManager.h>
+#include <PosixThreadFactory.h>
+#include <Monitor.h>
+
+#include <assert.h>
+#include <iostream>
+
+namespace facebook { namespace thrift { namespace concurrency { namespace test {
+
+using namespace facebook::thrift::concurrency;
+
+/** TimerManagerTests class
+
+ @author marc
+ @version $Id:$ */
+
+class TimerManagerTests {
+  // Exercises TimerManager: schedules one task and waits for it to run.
+  class Task: public Runnable {
+
+  public:
+
+    Task(Monitor& monitor) :
+      _monitor(monitor),
+      _done(false) {}
+
+    void run() {
+
+      std::cout << "\t\t\tHello World" << std::endl;
+
+      {Synchronized s(_monitor);
+        // Set the flag inside the monitor block so the waiter in test00 re-checks it consistently
+        _done = true;
+        _monitor.notifyAll();
+      }
+    }
+
+    Monitor& _monitor;
+    bool _done;
+  };
+
+public:
+  /** Starts a TimerManager, adds one task with a 1000 ms timeout and waits until it has run; returns true. */
+  bool test00() {
+    // NOTE(review): the manager and the factory allocated below are never stopped or freed - confirm this is intended
+    TimerManager* timerManager = new TimerManager();
+
+    timerManager->threadFactory(new PosixThreadFactory());
+
+    timerManager->start();
+
+    assert(timerManager->state() == TimerManager::STARTED);
+
+    TimerManagerTests::Task* task = new TimerManagerTests::Task(_monitor);
+
+    {Synchronized s(_monitor);
+      timerManager->add(task, 1000LL);
+      // Wait on the flag rather than on a single wakeup: a condition wait may return without a notify
+      while(!task->_done) {
+        _monitor.wait();
+      }
+    }
+    assert(task->_done);
+
+    delete task;
+
+    std::cout << "\t\t\tSuccess!" << std::endl;
+
+    return true;
+  }
+
+  friend class TestTask;
+
+  Monitor _monitor;
+};
+
+
+}}}} // facebook::thrift::concurrency
+
+using namespace facebook::thrift::concurrency::test;
+