blob: 76ecd935079f1424bb7641eeb97548b178c6d473 [file] [log] [blame]
Mark Slee9f0c6512007-02-28 23:58:26 +00001// Copyright (c) 2006- Facebook
2// Distributed under the Thrift Software License
3//
4// See accompanying file LICENSE or visit the Thrift site at:
5// http://developers.facebook.com/thrift/
6
Marc Slemko0e53ccd2006-07-17 23:51:05 +00007#include "TimerManager.h"
Marc Slemko8a40a762006-07-19 17:46:50 +00008#include "Exception.h"
Marc Slemko0e53ccd2006-07-17 23:51:05 +00009#include "Util.h"
10
11#include <assert.h>
Marc Slemko8a40a762006-07-19 17:46:50 +000012#include <iostream>
Marc Slemko0e53ccd2006-07-17 23:51:05 +000013#include <set>
14
T Jake Lucianib5e62212009-01-31 22:36:20 +000015namespace apache { namespace thrift { namespace concurrency {
Marc Slemko0e53ccd2006-07-17 23:51:05 +000016
Mark Slee5ea15f92007-03-05 22:55:59 +000017using boost::shared_ptr;
18
Mark Slee9b82d272007-05-23 05:16:07 +000019typedef std::multimap<int64_t, shared_ptr<TimerManager::Task> >::iterator task_iterator;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000020typedef std::pair<task_iterator, task_iterator> task_range;
21
Mark Sleef5f2be42006-09-05 21:05:31 +000022/**
23 * TimerManager class
24 *
Mark Sleef5f2be42006-09-05 21:05:31 +000025 * @version $Id:$
26 */
Marc Slemko0e53ccd2006-07-17 23:51:05 +000027class TimerManager::Task : public Runnable {
28
Mark Sleef5f2be42006-09-05 21:05:31 +000029 public:
Marc Slemko0e53ccd2006-07-17 23:51:05 +000030 enum STATE {
31 WAITING,
32 EXECUTING,
33 CANCELLED,
34 COMPLETE
35 };
36
Marc Slemko6f038a72006-08-03 18:58:09 +000037 Task(shared_ptr<Runnable> runnable) :
Mark Slee2f6404d2006-10-10 01:37:40 +000038 runnable_(runnable),
39 state_(WAITING) {}
David Reiss0c90f6f2008-02-06 22:18:40 +000040
Marc Slemko6f038a72006-08-03 18:58:09 +000041 ~Task() {
Mark Sleef5f2be42006-09-05 21:05:31 +000042 }
David Reiss0c90f6f2008-02-06 22:18:40 +000043
Marc Slemko0e53ccd2006-07-17 23:51:05 +000044 void run() {
Mark Slee2f6404d2006-10-10 01:37:40 +000045 if (state_ == EXECUTING) {
46 runnable_->run();
47 state_ = COMPLETE;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000048 }
49 }
50
51 private:
Mark Slee2f6404d2006-10-10 01:37:40 +000052 shared_ptr<Runnable> runnable_;
Marc Slemko8a40a762006-07-19 17:46:50 +000053 class TimerManager::Dispatcher;
Marc Slemko8a40a762006-07-19 17:46:50 +000054 friend class TimerManager::Dispatcher;
Mark Slee2f6404d2006-10-10 01:37:40 +000055 STATE state_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000056};
57
58class TimerManager::Dispatcher: public Runnable {
59
Mark Sleef5f2be42006-09-05 21:05:31 +000060 public:
David Reiss0c90f6f2008-02-06 22:18:40 +000061 Dispatcher(TimerManager* manager) :
Mark Slee2f6404d2006-10-10 01:37:40 +000062 manager_(manager) {}
David Reiss0c90f6f2008-02-06 22:18:40 +000063
Marc Slemko67606e52007-06-04 21:01:19 +000064 ~Dispatcher() {}
David Reiss0c90f6f2008-02-06 22:18:40 +000065
Mark Sleef5f2be42006-09-05 21:05:31 +000066 /**
67 * Dispatcher entry point
68 *
Mark Slee2f6404d2006-10-10 01:37:40 +000069 * As long as dispatcher thread is running, pull tasks off the task taskMap_
Mark Sleef5f2be42006-09-05 21:05:31 +000070 * and execute.
71 */
Marc Slemko0e53ccd2006-07-17 23:51:05 +000072 void run() {
Mark Sleef5f2be42006-09-05 21:05:31 +000073 {
Mark Slee2f6404d2006-10-10 01:37:40 +000074 Synchronized s(manager_->monitor_);
75 if (manager_->state_ == TimerManager::STARTING) {
David Reiss96d23882007-07-26 21:10:32 +000076 manager_->state_ = TimerManager::STARTED;
77 manager_->monitor_.notifyAll();
Marc Slemko0e53ccd2006-07-17 23:51:05 +000078 }
79 }
80
81 do {
Marc Slemko6f038a72006-08-03 18:58:09 +000082 std::set<shared_ptr<TimerManager::Task> > expiredTasks;
Mark Sleef5f2be42006-09-05 21:05:31 +000083 {
Mark Slee2f6404d2006-10-10 01:37:40 +000084 Synchronized s(manager_->monitor_);
David Reiss96d23882007-07-26 21:10:32 +000085 task_iterator expiredTaskEnd;
86 int64_t now = Util::currentTime();
David Reiss0c90f6f2008-02-06 22:18:40 +000087 while (manager_->state_ == TimerManager::STARTED &&
Mark Slee2f6404d2006-10-10 01:37:40 +000088 (expiredTaskEnd = manager_->taskMap_.upper_bound(now)) == manager_->taskMap_.begin()) {
David Reiss96d23882007-07-26 21:10:32 +000089 int64_t timeout = 0LL;
90 if (!manager_->taskMap_.empty()) {
Mark Slee2f6404d2006-10-10 01:37:40 +000091 timeout = manager_->taskMap_.begin()->first - now;
David Reiss96d23882007-07-26 21:10:32 +000092 }
Aditya Agarwal3f234da2007-04-01 01:19:57 +000093 assert((timeout != 0 && manager_->taskCount_ > 0) || (timeout == 0 && manager_->taskCount_ == 0));
Marc Slemko3a3b53b2007-05-22 23:59:54 +000094 try {
Mark Slee2782d6d2007-05-23 04:55:30 +000095 manager_->monitor_.wait(timeout);
96 } catch (TimedOutException &e) {}
David Reiss96d23882007-07-26 21:10:32 +000097 now = Util::currentTime();
98 }
David Reiss0c90f6f2008-02-06 22:18:40 +000099
David Reiss96d23882007-07-26 21:10:32 +0000100 if (manager_->state_ == TimerManager::STARTED) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000101 for (task_iterator ix = manager_->taskMap_.begin(); ix != expiredTaskEnd; ix++) {
David Reiss96d23882007-07-26 21:10:32 +0000102 shared_ptr<TimerManager::Task> task = ix->second;
Mark Sleef5f2be42006-09-05 21:05:31 +0000103 expiredTasks.insert(task);
David Reiss96d23882007-07-26 21:10:32 +0000104 if (task->state_ == TimerManager::Task::WAITING) {
105 task->state_ = TimerManager::Task::EXECUTING;
106 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000107 manager_->taskCount_--;
David Reiss96d23882007-07-26 21:10:32 +0000108 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000109 manager_->taskMap_.erase(manager_->taskMap_.begin(), expiredTaskEnd);
David Reiss96d23882007-07-26 21:10:32 +0000110 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000111 }
David Reiss0c90f6f2008-02-06 22:18:40 +0000112
Mark Sleef5f2be42006-09-05 21:05:31 +0000113 for (std::set<shared_ptr<Task> >::iterator ix = expiredTasks.begin(); ix != expiredTasks.end(); ix++) {
114 (*ix)->run();
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000115 }
David Reiss0c90f6f2008-02-06 22:18:40 +0000116
Mark Slee2f6404d2006-10-10 01:37:40 +0000117 } while (manager_->state_ == TimerManager::STARTED);
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000118
Mark Sleef5f2be42006-09-05 21:05:31 +0000119 {
Mark Slee2f6404d2006-10-10 01:37:40 +0000120 Synchronized s(manager_->monitor_);
121 if (manager_->state_ == TimerManager::STOPPING) {
David Reiss0c90f6f2008-02-06 22:18:40 +0000122 manager_->state_ = TimerManager::STOPPED;
David Reiss96d23882007-07-26 21:10:32 +0000123 manager_->monitor_.notify();
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000124 }
125 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000126 return;
127 }
128
129 private:
Mark Slee2f6404d2006-10-10 01:37:40 +0000130 TimerManager* manager_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000131 friend class TimerManager;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000132};
133
Marc Slemko8a40a762006-07-19 17:46:50 +0000134TimerManager::TimerManager() :
Mark Slee2f6404d2006-10-10 01:37:40 +0000135 taskCount_(0),
136 state_(TimerManager::UNINITIALIZED),
137 dispatcher_(shared_ptr<Dispatcher>(new Dispatcher(this))) {
Marc Slemko8a40a762006-07-19 17:46:50 +0000138}
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000139
Marc Slemko8a40a762006-07-19 17:46:50 +0000140
141TimerManager::~TimerManager() {
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000142
Mark Sleef5f2be42006-09-05 21:05:31 +0000143 // If we haven't been explicitly stopped, do so now. We don't need to grab
144 // the monitor here, since stop already takes care of reentrancy.
David Reiss0c90f6f2008-02-06 22:18:40 +0000145
Mark Slee2f6404d2006-10-10 01:37:40 +0000146 if (state_ != STOPPED) {
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000147 try {
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000148 stop();
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000149 } catch(...) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000150 throw;
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000151 // uhoh
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000152 }
153 }
Marc Slemko8a40a762006-07-19 17:46:50 +0000154}
155
156void TimerManager::start() {
Marc Slemko8a40a762006-07-19 17:46:50 +0000157 bool doStart = false;
Mark Sleef5f2be42006-09-05 21:05:31 +0000158 {
Mark Slee2f6404d2006-10-10 01:37:40 +0000159 Synchronized s(monitor_);
160 if (threadFactory_ == NULL) {
Mark Sleef5f2be42006-09-05 21:05:31 +0000161 throw InvalidArgumentException();
162 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000163 if (state_ == TimerManager::UNINITIALIZED) {
164 state_ = TimerManager::STARTING;
Marc Slemko8a40a762006-07-19 17:46:50 +0000165 doStart = true;
166 }
167 }
168
Mark Sleef5f2be42006-09-05 21:05:31 +0000169 if (doStart) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000170 dispatcherThread_ = threadFactory_->newThread(dispatcher_);
171 dispatcherThread_->start();
Marc Slemko8a40a762006-07-19 17:46:50 +0000172 }
173
Mark Sleef5f2be42006-09-05 21:05:31 +0000174 {
Mark Slee2f6404d2006-10-10 01:37:40 +0000175 Synchronized s(monitor_);
176 while (state_ == TimerManager::STARTING) {
177 monitor_.wait();
Marc Slemko8a40a762006-07-19 17:46:50 +0000178 }
Aditya Agarwal3f234da2007-04-01 01:19:57 +0000179 assert(state_ != TimerManager::STARTING);
Marc Slemko8a40a762006-07-19 17:46:50 +0000180 }
181}
182
183void TimerManager::stop() {
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000184 bool doStop = false;
Mark Sleef5f2be42006-09-05 21:05:31 +0000185 {
Mark Slee2f6404d2006-10-10 01:37:40 +0000186 Synchronized s(monitor_);
187 if (state_ == TimerManager::UNINITIALIZED) {
188 state_ = TimerManager::STOPPED;
189 } else if (state_ != STOPPING && state_ != STOPPED) {
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000190 doStop = true;
Mark Slee2f6404d2006-10-10 01:37:40 +0000191 state_ = STOPPING;
192 monitor_.notifyAll();
Marc Slemko8a40a762006-07-19 17:46:50 +0000193 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000194 while (state_ != STOPPED) {
195 monitor_.wait();
Marc Slemko8a40a762006-07-19 17:46:50 +0000196 }
197 }
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000198
Mark Sleef5f2be42006-09-05 21:05:31 +0000199 if (doStop) {
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000200 // Clean up any outstanding tasks
Mark Slee2f6404d2006-10-10 01:37:40 +0000201 for (task_iterator ix = taskMap_.begin(); ix != taskMap_.end(); ix++) {
202 taskMap_.erase(ix);
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000203 }
204
David Reiss0c90f6f2008-02-06 22:18:40 +0000205 // Remove dispatcher's reference to us.
Mark Slee2f6404d2006-10-10 01:37:40 +0000206 dispatcher_->manager_ = NULL;
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000207 }
Marc Slemko8a40a762006-07-19 17:46:50 +0000208}
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000209
Marc Slemko6f038a72006-08-03 18:58:09 +0000210shared_ptr<const ThreadFactory> TimerManager::threadFactory() const {
David Reiss0c90f6f2008-02-06 22:18:40 +0000211 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000212 return threadFactory_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000213}
David Reiss0c90f6f2008-02-06 22:18:40 +0000214
Marc Slemko6f038a72006-08-03 18:58:09 +0000215void TimerManager::threadFactory(shared_ptr<const ThreadFactory> value) {
David Reiss0c90f6f2008-02-06 22:18:40 +0000216 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000217 threadFactory_ = value;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000218}
219
Marc Slemko8a40a762006-07-19 17:46:50 +0000220size_t TimerManager::taskCount() const {
Mark Slee2f6404d2006-10-10 01:37:40 +0000221 return taskCount_;
Marc Slemko8a40a762006-07-19 17:46:50 +0000222}
David Reiss0c90f6f2008-02-06 22:18:40 +0000223
Mark Slee9b82d272007-05-23 05:16:07 +0000224void TimerManager::add(shared_ptr<Runnable> task, int64_t timeout) {
225 int64_t now = Util::currentTime();
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000226 timeout += now;
227
Mark Sleef5f2be42006-09-05 21:05:31 +0000228 {
David Reiss0c90f6f2008-02-06 22:18:40 +0000229 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000230 if (state_ != TimerManager::STARTED) {
Marc Slemko8a40a762006-07-19 17:46:50 +0000231 throw IllegalStateException();
232 }
233
Mark Slee2f6404d2006-10-10 01:37:40 +0000234 taskCount_++;
Mark Slee9b82d272007-05-23 05:16:07 +0000235 taskMap_.insert(std::pair<int64_t, shared_ptr<Task> >(timeout, shared_ptr<Task>(new Task(task))));
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000236
Mark Sleef5f2be42006-09-05 21:05:31 +0000237 // If the task map was empty, or if we have an expiration that is earlier
238 // than any previously seen, kick the dispatcher so it can update its
239 // timeout
Mark Slee2f6404d2006-10-10 01:37:40 +0000240 if (taskCount_ == 1 || timeout < taskMap_.begin()->first) {
241 monitor_.notify();
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000242 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000243 }
244}
245
Marc Slemko6f038a72006-08-03 18:58:09 +0000246void TimerManager::add(shared_ptr<Runnable> task, const struct timespec& value) {
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000247
Mark Slee9b82d272007-05-23 05:16:07 +0000248 int64_t expiration;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000249 Util::toMilliseconds(expiration, value);
250
Mark Slee9b82d272007-05-23 05:16:07 +0000251 int64_t now = Util::currentTime();
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000252
Mark Sleef5f2be42006-09-05 21:05:31 +0000253 if (expiration < now) {
Marc Slemko8a40a762006-07-19 17:46:50 +0000254 throw InvalidArgumentException();
255 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000256
257 add(task, expiration - now);
258}
259
260
Marc Slemko6f038a72006-08-03 18:58:09 +0000261void TimerManager::remove(shared_ptr<Runnable> task) {
David Reiss0c90f6f2008-02-06 22:18:40 +0000262 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000263 if (state_ != TimerManager::STARTED) {
Mark Sleef5f2be42006-09-05 21:05:31 +0000264 throw IllegalStateException();
Marc Slemko8a40a762006-07-19 17:46:50 +0000265 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000266}
267
Mark Slee2f6404d2006-10-10 01:37:40 +0000268const TimerManager::STATE TimerManager::state() const { return state_; }
Marc Slemko8a40a762006-07-19 17:46:50 +0000269
T Jake Lucianib5e62212009-01-31 22:36:20 +0000270}}} // apache::thrift::concurrency
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000271