|  | // Copyright (c) 2006- Facebook | 
|  | // Distributed under the Thrift Software License | 
|  | // | 
|  | // See accompanying file LICENSE or visit the Thrift site at: | 
|  | // http://developers.facebook.com/thrift/ | 
|  |  | 
|  | #include "TimerManager.h" | 
|  | #include "Exception.h" | 
|  | #include "Util.h" | 
|  |  | 
|  | #include <assert.h> | 
|  | #include <iostream> | 
|  | #include <set> | 
|  |  | 
|  | namespace facebook { namespace thrift { namespace concurrency { | 
|  |  | 
|  | using boost::shared_ptr; | 
|  |  | 
|  | typedef std::multimap<int64_t, shared_ptr<TimerManager::Task> >::iterator task_iterator; | 
|  | typedef std::pair<task_iterator, task_iterator> task_range; | 
|  |  | 
|  | /** | 
|  | * TimerManager class | 
|  | * | 
|  | * @author marc | 
|  | * @version $Id:$ | 
|  | */ | 
|  | class TimerManager::Task : public Runnable { | 
|  |  | 
|  | public: | 
|  | enum STATE { | 
|  | WAITING, | 
|  | EXECUTING, | 
|  | CANCELLED, | 
|  | COMPLETE | 
|  | }; | 
|  |  | 
|  | Task(shared_ptr<Runnable> runnable) : | 
|  | runnable_(runnable), | 
|  | state_(WAITING) {} | 
|  |  | 
|  | ~Task() { | 
|  | } | 
|  |  | 
|  | void run() { | 
|  | if (state_ == EXECUTING) { | 
|  | runnable_->run(); | 
|  | state_ = COMPLETE; | 
|  | } | 
|  | } | 
|  |  | 
|  | private: | 
|  | shared_ptr<Runnable> runnable_; | 
|  | class TimerManager::Dispatcher; | 
|  | friend class TimerManager::Dispatcher; | 
|  | STATE state_; | 
|  | }; | 
|  |  | 
|  | class TimerManager::Dispatcher: public Runnable { | 
|  |  | 
|  | public: | 
|  | Dispatcher(TimerManager* manager) : | 
|  | manager_(manager) {} | 
|  |  | 
|  | ~Dispatcher() {} | 
|  |  | 
|  | /** | 
|  | * Dispatcher entry point | 
|  | * | 
|  | * As long as dispatcher thread is running, pull tasks off the task taskMap_ | 
|  | * and execute. | 
|  | */ | 
|  | void run() { | 
|  | { | 
|  | Synchronized s(manager_->monitor_); | 
|  | if (manager_->state_ == TimerManager::STARTING) { | 
|  | manager_->state_ = TimerManager::STARTED; | 
|  | manager_->monitor_.notifyAll(); | 
|  | } | 
|  | } | 
|  |  | 
|  | do { | 
|  | std::set<shared_ptr<TimerManager::Task> > expiredTasks; | 
|  | { | 
|  | Synchronized s(manager_->monitor_); | 
|  | task_iterator expiredTaskEnd; | 
|  | int64_t now = Util::currentTime(); | 
|  | while (manager_->state_ == TimerManager::STARTED && | 
|  | (expiredTaskEnd = manager_->taskMap_.upper_bound(now)) == manager_->taskMap_.begin()) { | 
|  | int64_t timeout = 0LL; | 
|  | if (!manager_->taskMap_.empty()) { | 
|  | timeout = manager_->taskMap_.begin()->first - now; | 
|  | } | 
|  | assert((timeout != 0 && manager_->taskCount_ > 0) || (timeout == 0 && manager_->taskCount_ == 0)); | 
|  | try { | 
|  | manager_->monitor_.wait(timeout); | 
|  | } catch (TimedOutException &e) {} | 
|  | now = Util::currentTime(); | 
|  | } | 
|  |  | 
|  | if (manager_->state_ == TimerManager::STARTED) { | 
|  | for (task_iterator ix = manager_->taskMap_.begin(); ix != expiredTaskEnd; ix++) { | 
|  | shared_ptr<TimerManager::Task> task = ix->second; | 
|  | expiredTasks.insert(task); | 
|  | if (task->state_ == TimerManager::Task::WAITING) { | 
|  | task->state_ = TimerManager::Task::EXECUTING; | 
|  | } | 
|  | manager_->taskCount_--; | 
|  | } | 
|  | manager_->taskMap_.erase(manager_->taskMap_.begin(), expiredTaskEnd); | 
|  | } | 
|  | } | 
|  |  | 
|  | for (std::set<shared_ptr<Task> >::iterator ix =  expiredTasks.begin(); ix != expiredTasks.end(); ix++) { | 
|  | (*ix)->run(); | 
|  | } | 
|  |  | 
|  | } while (manager_->state_ == TimerManager::STARTED); | 
|  |  | 
|  | { | 
|  | Synchronized s(manager_->monitor_); | 
|  | if (manager_->state_ == TimerManager::STOPPING) { | 
|  | manager_->state_ = TimerManager::STOPPED; | 
|  | manager_->monitor_.notify(); | 
|  | } | 
|  | } | 
|  | return; | 
|  | } | 
|  |  | 
|  | private: | 
|  | TimerManager* manager_; | 
|  | friend class TimerManager; | 
|  | }; | 
|  |  | 
|  | TimerManager::TimerManager() : | 
|  | taskCount_(0), | 
|  | state_(TimerManager::UNINITIALIZED), | 
|  | dispatcher_(shared_ptr<Dispatcher>(new Dispatcher(this))) { | 
|  | } | 
|  |  | 
|  |  | 
|  | TimerManager::~TimerManager() { | 
|  |  | 
|  | // If we haven't been explicitly stopped, do so now.  We don't need to grab | 
|  | // the monitor here, since stop already takes care of reentrancy. | 
|  |  | 
|  | if (state_ != STOPPED) { | 
|  | try { | 
|  | stop(); | 
|  | } catch(...) { | 
|  | throw; | 
|  | // uhoh | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | void TimerManager::start() { | 
|  | bool doStart = false; | 
|  | { | 
|  | Synchronized s(monitor_); | 
|  | if (threadFactory_ == NULL) { | 
|  | throw InvalidArgumentException(); | 
|  | } | 
|  | if (state_ == TimerManager::UNINITIALIZED) { | 
|  | state_ = TimerManager::STARTING; | 
|  | doStart = true; | 
|  | } | 
|  | } | 
|  |  | 
|  | if (doStart) { | 
|  | dispatcherThread_ = threadFactory_->newThread(dispatcher_); | 
|  | dispatcherThread_->start(); | 
|  | } | 
|  |  | 
|  | { | 
|  | Synchronized s(monitor_); | 
|  | while (state_ == TimerManager::STARTING) { | 
|  | monitor_.wait(); | 
|  | } | 
|  | assert(state_ != TimerManager::STARTING); | 
|  | } | 
|  | } | 
|  |  | 
|  | void TimerManager::stop() { | 
|  | bool doStop = false; | 
|  | { | 
|  | Synchronized s(monitor_); | 
|  | if (state_ == TimerManager::UNINITIALIZED) { | 
|  | state_ = TimerManager::STOPPED; | 
|  | } else if (state_ != STOPPING &&  state_ != STOPPED) { | 
|  | doStop = true; | 
|  | state_ = STOPPING; | 
|  | monitor_.notifyAll(); | 
|  | } | 
|  | while (state_ != STOPPED) { | 
|  | monitor_.wait(); | 
|  | } | 
|  | } | 
|  |  | 
|  | if (doStop) { | 
|  | // Clean up any outstanding tasks | 
|  | for (task_iterator ix =  taskMap_.begin(); ix != taskMap_.end(); ix++) { | 
|  | taskMap_.erase(ix); | 
|  | } | 
|  |  | 
|  | // Remove dispatcher's reference to us. | 
|  | dispatcher_->manager_ = NULL; | 
|  | } | 
|  | } | 
|  |  | 
|  | shared_ptr<const ThreadFactory> TimerManager::threadFactory() const { | 
|  | Synchronized s(monitor_); | 
|  | return threadFactory_; | 
|  | } | 
|  |  | 
|  | void TimerManager::threadFactory(shared_ptr<const ThreadFactory>  value) { | 
|  | Synchronized s(monitor_); | 
|  | threadFactory_ = value; | 
|  | } | 
|  |  | 
|  | size_t TimerManager::taskCount() const { | 
|  | return taskCount_; | 
|  | } | 
|  |  | 
|  | void TimerManager::add(shared_ptr<Runnable> task, int64_t timeout) { | 
|  | int64_t now = Util::currentTime(); | 
|  | timeout += now; | 
|  |  | 
|  | { | 
|  | Synchronized s(monitor_); | 
|  | if (state_ != TimerManager::STARTED) { | 
|  | throw IllegalStateException(); | 
|  | } | 
|  |  | 
|  | taskCount_++; | 
|  | taskMap_.insert(std::pair<int64_t, shared_ptr<Task> >(timeout, shared_ptr<Task>(new Task(task)))); | 
|  |  | 
|  | // If the task map was empty, or if we have an expiration that is earlier | 
|  | // than any previously seen, kick the dispatcher so it can update its | 
|  | // timeout | 
|  | if (taskCount_ == 1 || timeout < taskMap_.begin()->first) { | 
|  | monitor_.notify(); | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | void TimerManager::add(shared_ptr<Runnable> task, const struct timespec& value) { | 
|  |  | 
|  | int64_t expiration; | 
|  | Util::toMilliseconds(expiration, value); | 
|  |  | 
|  | int64_t now = Util::currentTime(); | 
|  |  | 
|  | if (expiration < now) { | 
|  | throw  InvalidArgumentException(); | 
|  | } | 
|  |  | 
|  | add(task, expiration - now); | 
|  | } | 
|  |  | 
|  |  | 
|  | void TimerManager::remove(shared_ptr<Runnable> task) { | 
|  | Synchronized s(monitor_); | 
|  | if (state_ != TimerManager::STARTED) { | 
|  | throw IllegalStateException(); | 
|  | } | 
|  | } | 
|  |  | 
|  | const TimerManager::STATE TimerManager::state() const { return state_; } | 
|  |  | 
|  | }}} // facebook::thrift::concurrency | 
|  |  |