blob: f48df4ebee12d7ea7de6e7dbc3c647f4cded0dc9 [file] [log] [blame]
#include "TimerManager.h"
#include "Exception.h"
#include "Util.h"
#include <assert.h>
#include <iostream>
#include <set>
namespace facebook { namespace thrift { namespace concurrency {
// Iterator over a multimap from a long long key to the Task scheduled under that key.
// TimerManager::add() inserts with key = Util::currentTime() + timeout; the
// dispatcher walks _taskMap with this iterator type (presumably _taskMap is
// declared with this multimap type in TimerManager.h -- confirm).
typedef std::multimap<long long, shared_ptr<TimerManager::Task> >::iterator task_iterator;
// A pair of task_iterators. NOTE(review): not referenced anywhere else in the
// visible part of this file.
typedef std::pair<task_iterator, task_iterator> task_range;
/**
* TimerManager class
*
* @author marc
* @version $Id:$
*/
/**
 * A scheduled unit of work.
 *
 * Wraps the Runnable handed to TimerManager::add() together with a small
 * state machine. The wrapped Runnable is only invoked while the task is in
 * the EXECUTING state; the dispatcher moves a task from WAITING to EXECUTING
 * just before calling run().
 */
class TimerManager::Task : public Runnable {
 public:
  /** Lifecycle of a task. */
  enum STATE {
    WAITING,
    EXECUTING,
    CANCELLED,
    COMPLETE
  };

  /**
   * @param runnable the work to perform when the task fires; the task starts
   *                 out in the WAITING state.
   */
  Task(shared_ptr<Runnable> runnable) :
    _runnable(runnable),
    _state(WAITING) {}

  ~Task() {
    //debug
    std::cerr << "TimerManager::Task.dtor[" << this << "]" << std::endl;
  }

  /**
   * Runs the wrapped Runnable and marks the task COMPLETE, but only if the
   * task is currently EXECUTING. In any other state this is a no-op.
   */
  void run() {
    if (_state == EXECUTING) {
      _runnable->run();
      _state = COMPLETE;
    }
  }

 private:
  shared_ptr<Runnable> _runnable;

  // The dispatcher needs to read and update _state directly. A friend
  // declaration is all that is required for that; a stand-alone qualified
  // declaration of the nested class is not valid C++ and is not needed.
  friend class TimerManager::Dispatcher;

  STATE _state;
};
// Runnable that drives a TimerManager: TimerManager::start() hands it to a
// thread created by the thread factory. It keeps a raw back-pointer to the
// manager and works directly on the manager's monitor, state, task map and
// task count.
class TimerManager::Dispatcher: public Runnable {
public:
// manager: the TimerManager whose tasks this dispatcher executes. Stored as a
// raw pointer; TimerManager::stop() resets it to NULL once dispatching has stopped.
Dispatcher(TimerManager* manager) :
_manager(manager) {}
~Dispatcher() {
// debug
std::cerr << "Dispatcher::dtor[" << this << "]" << std::endl;
}
/**
* Dispatcher entry point
*
* As long as dispatcher thread is running, pull tasks off the task _taskMap
* and execute.
*
* Sequence:
*  1. Under the monitor, move the manager from STARTING to STARTED and wake
*     all waiters (TimerManager::start() waits for this transition).
*  2. Loop while the manager is STARTED: under the monitor, wait until the
*     earliest task is due, move all due tasks into a local set, then release
*     the monitor and run them.
*  3. Under the monitor, move the manager from STOPPING to STOPPED and notify
*     (TimerManager::stop() waits for this transition).
*/
void run() {
{
// Step 1: report that the dispatcher thread is up.
Synchronized s(_manager->_monitor);
if (_manager->_state == TimerManager::STARTING) {
_manager->_state = TimerManager::STARTED;
_manager->_monitor.notifyAll();
}
}
do {
// Tasks collected under the lock and executed after it is released.
// NOTE(review): a std::set iterates in shared_ptr comparison order, so tasks
// that expire in the same pass are presumably not run in expiration order -- confirm
// whether that matters to callers.
std::set<shared_ptr<TimerManager::Task> > expiredTasks;
{
Synchronized s(_manager->_monitor);
// Only assigned inside the wait-loop condition below, and only when the
// manager is STARTED; it is read again only behind the same STARTED check.
task_iterator expiredTaskEnd;
long long now = Util::currentTime();
// upper_bound(now) == begin() means no entry has a key <= now, i.e. nothing
// is due yet (or the map is empty), so keep waiting.
while (_manager->_state == TimerManager::STARTED &&
(expiredTaskEnd = _manager->_taskMap.upper_bound(now)) == _manager->_taskMap.begin()) {
// timeout stays 0 when there are no tasks at all; otherwise it is the time
// remaining until the earliest key.
// NOTE(review): presumably Monitor::wait(0) blocks until notified -- confirm.
long long timeout = 0LL;
if (!_manager->_taskMap.empty()) {
timeout = _manager->_taskMap.begin()->first - now;
}
// Sanity check that _taskCount agrees with the map being empty / non-empty.
assert((timeout != 0 && _manager->_taskCount > 0) || (timeout == 0 && _manager->_taskCount == 0));
_manager->_monitor.wait(timeout);
now = Util::currentTime();
}
if (_manager->_state == TimerManager::STARTED) {
// Move every due task [begin, expiredTaskEnd) out of the map. Tasks still
// WAITING are marked EXECUTING so that Task::run() will actually run them;
// tasks in any other state are collected too, but Task::run() ignores them.
for (task_iterator ix = _manager->_taskMap.begin(); ix != expiredTaskEnd; ix++) {
shared_ptr<TimerManager::Task> task = ix->second;
expiredTasks.insert(task);
if (task->_state == TimerManager::Task::WAITING) {
task->_state = TimerManager::Task::EXECUTING;
}
_manager->_taskCount--;
}
_manager->_taskMap.erase(_manager->_taskMap.begin(), expiredTaskEnd);
}
}
// The monitor is no longer held here: the collected tasks run outside the lock.
for (std::set<shared_ptr<Task> >::iterator ix = expiredTasks.begin(); ix != expiredTasks.end(); ix++) {
(*ix)->run();
}
// NOTE(review): _state is read here without holding the monitor.
} while (_manager->_state == TimerManager::STARTED);
{
// Step 3: acknowledge a stop request so TimerManager::stop() can return.
Synchronized s(_manager->_monitor);
if (_manager->_state == TimerManager::STOPPING) {
_manager->_state = TimerManager::STOPPED;
_manager->_monitor.notify();
}
}
return;
}
private:
// Back-pointer to the owning manager (not owned by the dispatcher).
TimerManager* _manager;
// TimerManager::stop() writes _manager directly.
friend class TimerManager;
};
/**
 * Creates a timer manager with no tasks, in the UNINITIALIZED state, and
 * builds the dispatcher that will later run on the dispatcher thread. No
 * thread is created here; that happens in start().
 */
TimerManager::TimerManager()
  : _taskCount(0),
    _state(UNINITIALIZED),
    _dispatcher(shared_ptr<Dispatcher>(new Dispatcher(this)))
{}
/**
 * Destructor. Stops the manager if the caller has not already done so.
 *
 * Any exception raised by stop() is logged and swallowed: letting it escape
 * a destructor would terminate the process (destructors are implicitly
 * noexcept since C++11, and even before that an exception thrown during
 * stack unwinding calls std::terminate).
 */
TimerManager::~TimerManager() {
  // If we haven't been explicitly stopped, do so now. We don't need to grab
  // the monitor here, since stop already takes care of reentrancy.
  std::cerr << "TimerManager::dtor[" << this << "]" << std::endl;
  if (_state != STOPPED) {
    try {
      stop();
    } catch(...) {
      // Nothing sensible can be done at this point; report and carry on with
      // destruction rather than rethrowing out of the destructor.
      std::cerr << "TimerManager::dtor[" << this << "] uhoh " << std::endl;
    }
  }
}
/**
 * Starts the dispatcher thread and blocks until the manager has left the
 * STARTING state.
 *
 * Only a call made while the manager is UNINITIALIZED actually creates and
 * starts the thread; any other call just waits for an in-progress start to
 * finish.
 *
 * @throws InvalidArgumentException if no thread factory has been set.
 */
void TimerManager::start() {
  bool launchDispatcher = false;

  // Phase 1: claim the start-up under the monitor.
  {
    Synchronized lock(_monitor);
    if (_threadFactory == NULL) {
      throw InvalidArgumentException();
    }
    launchDispatcher = (_state == TimerManager::UNINITIALIZED);
    if (launchDispatcher) {
      _state = TimerManager::STARTING;
    }
  }

  // Phase 2: create and start the thread without holding the monitor.
  if (launchDispatcher) {
    _dispatcherThread = _threadFactory->newThread(_dispatcher);
    _dispatcherThread->start();
  }

  // Phase 3: wait for the dispatcher (or a concurrent stop) to move the
  // manager out of STARTING.
  {
    Synchronized lock(_monitor);
    while (_state == TimerManager::STARTING) {
      _monitor.wait();
    }
    assert(_state != TimerManager::STARTING);
  }
}
/**
 * Stops the manager and blocks until it has reached the STOPPED state.
 *
 * - If the manager was never started it goes straight to STOPPED.
 * - If it is starting or running, it is moved to STOPPING, every thread
 *   waiting on the monitor is woken, and this call waits for the dispatcher
 *   to acknowledge by setting STOPPED.
 * - If another stop is already in progress (or finished), this call only
 *   waits for STOPPED.
 *
 * The caller that initiated the stop then discards all outstanding tasks and
 * detaches the dispatcher from this manager.
 */
void TimerManager::stop() {
  bool doStop = false;
  {
    Synchronized s(_monitor);
    if (_state == TimerManager::UNINITIALIZED) {
      _state = TimerManager::STOPPED;
    } else if (_state != STOPPING && _state != STOPPED) {
      doStop = true;
      _state = STOPPING;
      _monitor.notifyAll();
    }
    while (_state != STOPPED) {
      _monitor.wait();
    }
  }

  if (doStop) {
    // Clean up any outstanding tasks. clear() is used rather than erasing
    // element by element: erasing through an iterator invalidates it, so
    // advancing that iterator afterwards is undefined behavior.
    _taskMap.clear();

    // Remove dispatcher's reference to us.
    _dispatcher->_manager = NULL;
  }
}
/**
 * Returns the thread factory currently configured for this manager. The
 * value is read while holding the monitor.
 */
shared_ptr<const ThreadFactory> TimerManager::threadFactory() const {
  Synchronized guard(_monitor);
  return _threadFactory;
}
/**
 * Sets the thread factory that start() uses to create the dispatcher thread.
 * The value is stored while holding the monitor.
 *
 * @param value the factory to use
 */
void TimerManager::threadFactory(shared_ptr<const ThreadFactory> value) {
  Synchronized guard(_monitor);
  _threadFactory = value;
}
/**
 * Returns the current value of the task counter. The counter is read
 * without taking the monitor.
 */
size_t TimerManager::taskCount() const
{
  return _taskCount;
}
/**
 * Schedules a task to run after a relative delay.
 *
 * @param task    the work to run when the timer fires
 * @param timeout delay from now, in the units of Util::currentTime()
 * @throws IllegalStateException if the manager is not in the STARTED state
 */
void TimerManager::add(shared_ptr<Runnable> task, long long timeout) {
  long long now = Util::currentTime();

  // From here on, timeout is the absolute expiration time used as the map key.
  timeout += now;

  {
    Synchronized s(_monitor);
    if (_state != TimerManager::STARTED) {
      throw IllegalStateException();
    }

    // The dispatcher must be woken if the map was empty or if this task
    // expires earlier than anything already scheduled, so that it can
    // recompute how long to sleep. This has to be decided BEFORE inserting:
    // once the new entry is in the map, begin()->first can never be greater
    // than the new key, so the comparison would always be false.
    bool notifyRequired = _taskMap.empty() || timeout < _taskMap.begin()->first;

    _taskMap.insert(std::pair<long long, shared_ptr<Task> >(timeout, shared_ptr<Task>(new Task(task))));

    // Count the task only once it is actually in the map, so an exception
    // during insertion cannot leave the counter out of step with the map.
    _taskCount++;

    if (notifyRequired) {
      _monitor.notify();
    }
  }
}
/**
 * Schedules a task to run at an absolute time.
 *
 * The timespec is converted with Util::toMilliseconds() and compared with
 * Util::currentTime(); the remaining delay is then passed to the
 * relative-timeout overload of add().
 *
 * @param task  the work to run when the timer fires
 * @param value absolute expiration time
 * @throws InvalidArgumentException if the expiration is earlier than the
 *         current time
 */
void TimerManager::add(shared_ptr<Runnable> task, const struct timespec& value) {
  long long expiration;
  Util::toMilliseconds(expiration, value);

  const long long now = Util::currentTime();
  if (now > expiration) {
    throw InvalidArgumentException();
  }

  const long long delay = expiration - now;
  add(task, delay);
}
/**
 * Intended to remove a previously added task.
 *
 * As written, this only takes the monitor and verifies that the manager is
 * running; it does not search for or remove the task.
 *
 * @param task the task to remove (currently unused)
 * @throws IllegalStateException if the manager is not in the STARTED state
 */
void TimerManager::remove(shared_ptr<Runnable> task) {
  Synchronized guard(_monitor);

  const bool running = (_state == TimerManager::STARTED);
  if (!running) {
    throw IllegalStateException();
  }
}
/**
 * Returns the manager's current lifecycle state. The value is read without
 * taking the monitor.
 */
const TimerManager::STATE TimerManager::state() const
{
  return _state;
}
}}} // facebook::thrift::concurrency