More bullet proofing of timer manager
facebook::thrift::concurrency::TimerManager::stop
Added proper cleanup of unprocessed tasks and shutdown of dispatcher thread to stop
facebook::thrift::concurrency::TimerManager::~TimerManager
Call stop if manager wasn't explicitly stopped
facebook::thrift::concurrency::test.TimerManagerTest
Calculate error margin for timeout expiration and verify it's within bounds
Verify manager stops properly when it goes out of scope
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664724 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/concurrency/Monitor.cc b/lib/cpp/src/concurrency/Monitor.cc
index f6144ba..b1d7b72 100644
--- a/lib/cpp/src/concurrency/Monitor.cc
+++ b/lib/cpp/src/concurrency/Monitor.cc
@@ -59,6 +59,8 @@
// XXX Need to assert that caller owns mutex
+ assert(timeout >= 0LL);
+
if(timeout == 0LL) {
assert(pthread_cond_wait(&_pthread_cond, &_pthread_mutex) == 0);
@@ -67,13 +69,15 @@
struct timespec abstime;
+ long long now = Util::currentTime();
+
+ Util::toTimespec(abstime, now + timeout);
+
int result = pthread_cond_timedwait(&_pthread_cond, &_pthread_mutex, &abstime);
if(result == ETIMEDOUT) {
- // XXX Add assert once currentTime is fixed to have ms resolution or better
-
- // assert(Util::currentTime() >= (now + timeout));
+ assert(Util::currentTime() >= (now + timeout));
}
}
}
diff --git a/lib/cpp/src/concurrency/TimerManager.cc b/lib/cpp/src/concurrency/TimerManager.cc
index 015ffba..93b0dc5 100644
--- a/lib/cpp/src/concurrency/TimerManager.cc
+++ b/lib/cpp/src/concurrency/TimerManager.cc
@@ -82,22 +82,25 @@
{Synchronized s(_manager->_monitor);
- /* Update next timeout if necessary */
-
task_iterator expiredTaskEnd;
+ long long now = Util::currentTime();
+
while(_manager->_state == TimerManager::STARTED &&
- (expiredTaskEnd = _manager->_taskMap.upper_bound(Util::currentTime())) == _manager->_taskMap.begin()) {
+ (expiredTaskEnd = _manager->_taskMap.upper_bound(now)) == _manager->_taskMap.begin()) {
long long timeout = 0LL;
if(!_manager->_taskMap.empty()) {
- timeout = Util::currentTime() - _manager->_taskMap.begin()->first;
+ timeout = _manager->_taskMap.begin()->first - now;
}
+
+ assert((timeout != 0 && _manager->_taskCount > 0) || (timeout == 0 && _manager->_taskCount == 0));
_manager->_monitor.wait(timeout);
-
+
+ now = Util::currentTime();
}
if(_manager->_state == TimerManager::STARTED) {
@@ -151,13 +154,29 @@
};
TimerManager::TimerManager() :
+ _taskCount(0),
_state(TimerManager::UNINITIALIZED),
_dispatcher(new Dispatcher(this)) {
}
TimerManager::~TimerManager() {
- delete _dispatcher;
+
+ /* If we haven't been explicitly stopped, do so now. We don't need to grab the monitor here, since
+ stop already takes care of reentrancy. */
+
+ if(_state != STOPPED) {
+
+ try {
+
+ stop();
+
+ } catch(...) {
+
+ // uhoh
+
+ }
+ }
}
void TimerManager::start() {
@@ -196,6 +215,8 @@
void TimerManager::stop() {
+ bool doStop = false;
+
{Synchronized s(_monitor);
if(_state == TimerManager::UNINITIALIZED) {
@@ -204,6 +225,8 @@
} else if(_state != STOPPING && _state != STOPPED) {
+ doStop = true;
+
_state = STOPPING;
_monitor.notifyAll();
@@ -214,6 +237,21 @@
_monitor.wait();
}
}
+
+ if(doStop) {
+
+ // Clean up any outstanding tasks
+
+ for(task_iterator ix = _taskMap.begin(); ix != _taskMap.end(); ix++) {
+
+ delete ix->second;
+
+ _taskMap.erase(ix);
+ }
+
+ delete _dispatcher;
+ }
+
}
const ThreadFactory* TimerManager::threadFactory() const {
diff --git a/lib/cpp/src/concurrency/Util.h b/lib/cpp/src/concurrency/Util.h
index 3ab8929..d04435d 100644
--- a/lib/cpp/src/concurrency/Util.h
+++ b/lib/cpp/src/concurrency/Util.h
@@ -2,7 +2,7 @@
#define _concurrency_Util_h_ 1
#include <assert.h>
-#include <time.h>
+#include <sys/time.h>
namespace facebook { namespace thrift { namespace concurrency {
@@ -19,45 +19,34 @@
public:
- /** Converts relative timeout specified as a duration in milliseconds to a struct timespec structure
- specifying current time plus timeout
+ /** Converts timespec to milliseconds
- @param struct timespec& current time plus timeout result
- @param timeout time to delay in milliseconds */
+ @param struct timespec& result
+ @param time or duration in milliseconds */
- static const void toAbsoluteTimespec(struct timespec& result, long long value) {
+ static void toTimespec(struct timespec& result, long long value) {
- // XXX Darwin doesn't seem to have any readily useable hi-res clock.
+ result.tv_sec = value / 1000; // ms to s
- time_t seconds;
-
- assert(time(&seconds) != (time_t)-1);
-
- seconds+= (value / 1000);
-
- long nanoseconds = (value % 1000) * 1000000;
-
- result.tv_sec = seconds + (nanoseconds / 1000000000);
-
- result.tv_nsec = nanoseconds % 1000000000;
+ result.tv_nsec = (value % 1000) * 1000000; // ms to ns
}
- /** Converts absolute timespec to milliseconds from epoch */
+ /** Converts timespec to milliseconds */
static const void toMilliseconds(long long& result, const struct timespec& value) {
- result = value.tv_sec * 1000 + value.tv_nsec * 1000000;
+ result = value.tv_sec * 1000 + value.tv_nsec / 1000000;
}
/** Get current time as milliseconds from epoch */
static const long long currentTime() {
- time_t now;
+ struct timeval now;
- time(&now);
+ assert(gettimeofday(&now, NULL) == 0);
- return (long long)now * 1000;
+ return ((long long)now.tv_sec) * 1000LL + now.tv_usec / 1000;
}
};
diff --git a/lib/cpp/src/concurrency/test/TimerManagerTests.h b/lib/cpp/src/concurrency/test/TimerManagerTests.h
index 24f7964..f34f8b0 100644
--- a/lib/cpp/src/concurrency/test/TimerManagerTests.h
+++ b/lib/cpp/src/concurrency/test/TimerManagerTests.h
@@ -1,6 +1,7 @@
#include <TimerManager.h>
#include <PosixThreadFactory.h>
#include <Monitor.h>
+#include <Util.h>
#include <assert.h>
#include <iostream>
@@ -20,12 +21,30 @@
public:
- Task(Monitor& monitor) :
+ Task(Monitor& monitor, long long timeout) :
+ _timeout(timeout),
+ _startTime(Util::currentTime()),
_monitor(monitor),
+ _success(false),
_done(false) {}
void run() {
+ _endTime = Util::currentTime();
+
+ // Figure out error percentage
+
+ long long delta = _endTime - _startTime;
+
+
+ delta = delta > _timeout ? delta - _timeout : _timeout - delta;
+
+ float error = delta / _timeout;
+
+ if(error < .10) {
+ _success = true;
+ }
+
std::cout << "\t\t\tHello World" << std::endl;
_done = true;
@@ -34,37 +53,60 @@
_monitor.notifyAll();
}
}
-
+
+
+ long long _timeout;
+ long long _startTime;
+ long long _endTime;
Monitor& _monitor;
+ bool _success;
bool _done;
};
public:
- bool test00() {
+ /** This test creates two tasks and waits for the first to expire within 10% of the expected expiration time. It then verifies that
+ the timer manager properly clean up itself and the remaining orphaned timeout task when the manager goes out of scope and its
+ destructor is called. */
- TimerManager* timerManager = new TimerManager();
+ bool test00(long long timeout=1000LL) {
- timerManager->threadFactory(new PosixThreadFactory());
+ TimerManagerTests::Task* orphanTask = new TimerManagerTests::Task(_monitor, 10 * timeout);
- timerManager->start();
+ {
- assert(timerManager->state() == TimerManager::STARTED);
+ TimerManager timerManager;
+
+ timerManager.threadFactory(new PosixThreadFactory());
+
+ timerManager.start();
+
+ assert(timerManager.state() == TimerManager::STARTED);
- TimerManagerTests::Task* task = new TimerManagerTests::Task(_monitor);
+ TimerManagerTests::Task* task = new TimerManagerTests::Task(_monitor, timeout);
- {Synchronized s(_monitor);
+ {Synchronized s(_monitor);
- timerManager->add(task, 1000LL);
+ timerManager.add(orphanTask, 10 * timeout);
- _monitor.wait();
+ timerManager.add(task, timeout);
+
+ _monitor.wait();
+ }
+
+ assert(task->_done);
+
+
+ std::cout << "\t\t\t" << (task->_success ? "Success" : "Failure") << "!" << std::endl;
+
+ delete task;
}
- assert(task->_done);
+ // timerManager.stop(); This is where it happens via destructor
- delete task;
+ assert(!orphanTask->_done);
- std::cout << "\t\t\tSuccess!" << std::endl;
+ delete orphanTask;
return true;
}