#include "TimerManager.h"
#include "Exception.h"
#include "Util.h"

#include <assert.h>
#include <iostream>
#include <map>
#include <set>
#include <vector>
| |
| namespace facebook { namespace thrift { namespace concurrency { |
| |
| /** TimerManager class |
| |
| @author marc |
| @version $Id:$ */ |
| |
| typedef std::multimap<long long, shared_ptr<TimerManager::Task> >::iterator task_iterator; |
| typedef std::pair<task_iterator, task_iterator> task_range; |
| |
| class TimerManager::Task : public Runnable { |
| |
| public: |
| enum STATE { |
| WAITING, |
| EXECUTING, |
| CANCELLED, |
| COMPLETE |
| }; |
| |
| Task(shared_ptr<Runnable> runnable) : |
| _runnable(runnable), |
| _state(WAITING) |
| {} |
| |
| ~Task() { |
| std::cerr << "TimerManager::Task.dtor[" << this << "]" << std::endl; //debug |
| }; |
| |
| void run() { |
| if(_state == EXECUTING) { |
| _runnable->run(); |
| _state = COMPLETE; |
| } |
| } |
| |
| private: |
| |
| shared_ptr<Runnable> _runnable; |
| |
| class TimerManager::Dispatcher; |
| |
| friend class TimerManager::Dispatcher; |
| |
| STATE _state; |
| }; |
| |
| class TimerManager::Dispatcher: public Runnable { |
| |
| public: |
| Dispatcher(TimerManager* manager) : |
| _manager(manager) { |
| } |
| |
| ~Dispatcher() { |
| std::cerr << "Dispatcher::dtor[" << this << "]" << std::endl; //debug |
| } |
| |
| /** Dispatcher entry point |
| |
| As long as dispatcher thread is running, pull tasks off the task _taskMap and execute. */ |
| |
| void run() { |
| |
| {Synchronized s(_manager->_monitor); |
| |
| if(_manager->_state == TimerManager::STARTING) { |
| |
| _manager->_state = TimerManager::STARTED; |
| |
| _manager->_monitor.notifyAll(); |
| } |
| } |
| |
| do { |
| |
| std::set<shared_ptr<TimerManager::Task> > expiredTasks; |
| |
| {Synchronized s(_manager->_monitor); |
| |
| task_iterator expiredTaskEnd; |
| |
| long long now = Util::currentTime(); |
| |
| while(_manager->_state == TimerManager::STARTED && |
| (expiredTaskEnd = _manager->_taskMap.upper_bound(now)) == _manager->_taskMap.begin()) { |
| |
| long long timeout = 0LL; |
| |
| if(!_manager->_taskMap.empty()) { |
| |
| timeout = _manager->_taskMap.begin()->first - now; |
| } |
| |
| assert((timeout != 0 && _manager->_taskCount > 0) || (timeout == 0 && _manager->_taskCount == 0)); |
| |
| _manager->_monitor.wait(timeout); |
| |
| now = Util::currentTime(); |
| } |
| |
| if(_manager->_state == TimerManager::STARTED) { |
| |
| for(task_iterator ix = _manager->_taskMap.begin(); ix != expiredTaskEnd; ix++) { |
| |
| shared_ptr<TimerManager::Task> task = ix->second; |
| |
| expiredTasks.insert(task); |
| |
| if(task->_state == TimerManager::Task::WAITING) { |
| |
| task->_state = TimerManager::Task::EXECUTING; |
| } |
| |
| _manager->_taskCount--; |
| } |
| |
| _manager->_taskMap.erase(_manager->_taskMap.begin(), expiredTaskEnd); |
| } |
| } |
| |
| for(std::set<shared_ptr<Task> >::iterator ix = expiredTasks.begin(); ix != expiredTasks.end(); ix++) { |
| |
| (*ix)->run(); |
| } |
| |
| } while(_manager->_state == TimerManager::STARTED); |
| |
| {Synchronized s(_manager->_monitor); |
| |
| if(_manager->_state == TimerManager::STOPPING) { |
| |
| _manager->_state = TimerManager::STOPPED; |
| |
| _manager->_monitor.notify(); |
| |
| } |
| } |
| |
| return; |
| } |
| |
| private: |
| |
| TimerManager* _manager; |
| |
| friend class TimerManager; |
| }; |
| |
| TimerManager::TimerManager() : |
| _taskCount(0), |
| _state(TimerManager::UNINITIALIZED), |
| _dispatcher(shared_ptr<Dispatcher>(new Dispatcher(this))) { |
| } |
| |
| |
| TimerManager::~TimerManager() { |
| |
| /* If we haven't been explicitly stopped, do so now. We don't need to grab the monitor here, since |
| stop already takes care of reentrancy. */ |
| |
| std::cerr << "TimerManager::dtor[" << this << "]" << std::endl; |
| |
| if(_state != STOPPED) { |
| |
| try { |
| |
| stop(); |
| |
| } catch(...) { |
| std::cerr << "TimerManager::dtor[" << this << "] uhoh " << std::endl; |
| throw; |
| |
| // uhoh |
| |
| } |
| } |
| } |
| |
| void TimerManager::start() { |
| |
| bool doStart = false; |
| |
| {Synchronized s(_monitor); |
| |
| if(_threadFactory == NULL) {throw InvalidArgumentException();} |
| |
| if(_state == TimerManager::UNINITIALIZED) { |
| |
| _state = TimerManager::STARTING; |
| |
| doStart = true; |
| } |
| } |
| |
| if(doStart) { |
| |
| _dispatcherThread = _threadFactory->newThread(_dispatcher); |
| |
| _dispatcherThread->start(); |
| } |
| |
| {Synchronized s(_monitor); |
| |
| while(_state == TimerManager::STARTING) { |
| |
| _monitor.wait(); |
| } |
| |
| assert(_state != TimerManager::STARTING); |
| } |
| } |
| |
| void TimerManager::stop() { |
| |
| bool doStop = false; |
| |
| {Synchronized s(_monitor); |
| |
| if(_state == TimerManager::UNINITIALIZED) { |
| |
| _state = TimerManager::STOPPED; |
| |
| } else if(_state != STOPPING && _state != STOPPED) { |
| |
| doStop = true; |
| |
| _state = STOPPING; |
| |
| _monitor.notifyAll(); |
| } |
| |
| while(_state != STOPPED) { |
| |
| _monitor.wait(); |
| } |
| } |
| |
| if(doStop) { |
| |
| // Clean up any outstanding tasks |
| |
| for(task_iterator ix = _taskMap.begin(); ix != _taskMap.end(); ix++) { |
| |
| _taskMap.erase(ix); |
| } |
| |
| // Remove dispatcher's reference to us. |
| |
| _dispatcher->_manager = NULL; |
| } |
| } |
| |
| shared_ptr<const ThreadFactory> TimerManager::threadFactory() const { |
| |
| Synchronized s(_monitor); |
| |
| return _threadFactory; |
| } |
| |
| void TimerManager::threadFactory(shared_ptr<const ThreadFactory> value) { |
| |
| Synchronized s(_monitor); |
| |
| _threadFactory = value; |
| } |
| |
| size_t TimerManager::taskCount() const { |
| |
| return _taskCount; |
| } |
| |
| void TimerManager::add(shared_ptr<Runnable> task, long long timeout) { |
| |
| long long now = Util::currentTime(); |
| |
| timeout += now; |
| |
| {Synchronized s(_monitor); |
| |
| if(_state != TimerManager::STARTED) { |
| throw IllegalStateException(); |
| } |
| |
| _taskCount++; |
| |
| _taskMap.insert(std::pair<long long, shared_ptr<Task> >(timeout, shared_ptr<Task>(new Task(task)))); |
| |
| /* If the task map was empty, or if we have an expiration that is earlier than any previously seen, |
| kick the dispatcher so it can update its timeout */ |
| |
| if(_taskCount == 1 || timeout < _taskMap.begin()->first) { |
| |
| _monitor.notify(); |
| } |
| } |
| } |
| |
| void TimerManager::add(shared_ptr<Runnable> task, const struct timespec& value) { |
| |
| long long expiration; |
| |
| Util::toMilliseconds(expiration, value); |
| |
| long long now = Util::currentTime(); |
| |
| if(expiration < now) { |
| throw InvalidArgumentException(); |
| } |
| |
| add(task, expiration - now); |
| } |
| |
| |
| void TimerManager::remove(shared_ptr<Runnable> task) { |
| {Synchronized s(_monitor); |
| |
| if(_state != TimerManager::STARTED) { |
| throw IllegalStateException(); |
| } |
| } |
| } |
| |
| const TimerManager::STATE TimerManager::state() const { return _state;} |
| |
| }}} // facebook::thrift::concurrency |
| |