blob: 42864401da0c507a922eda2c06d48d6d5d865e38 [file] [log] [blame]
Marc Slemko0e53ccd2006-07-17 23:51:05 +00001#include "TimerManager.h"
Marc Slemko8a40a762006-07-19 17:46:50 +00002#include "Exception.h"
Marc Slemko0e53ccd2006-07-17 23:51:05 +00003#include "Util.h"
4
5#include <assert.h>
Marc Slemko8a40a762006-07-19 17:46:50 +00006#include <iostream>
Marc Slemko0e53ccd2006-07-17 23:51:05 +00007#include <set>
8
9namespace facebook { namespace thrift { namespace concurrency {
10
Marc Slemko6f038a72006-08-03 18:58:09 +000011typedef std::multimap<long long, shared_ptr<TimerManager::Task> >::iterator task_iterator;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000012typedef std::pair<task_iterator, task_iterator> task_range;
13
Mark Sleef5f2be42006-09-05 21:05:31 +000014/**
15 * TimerManager class
16 *
17 * @author marc
18 * @version $Id:$
19 */
Marc Slemko0e53ccd2006-07-17 23:51:05 +000020class TimerManager::Task : public Runnable {
21
Mark Sleef5f2be42006-09-05 21:05:31 +000022 public:
Marc Slemko0e53ccd2006-07-17 23:51:05 +000023 enum STATE {
24 WAITING,
25 EXECUTING,
26 CANCELLED,
27 COMPLETE
28 };
29
Marc Slemko6f038a72006-08-03 18:58:09 +000030 Task(shared_ptr<Runnable> runnable) :
Mark Slee2f6404d2006-10-10 01:37:40 +000031 runnable_(runnable),
32 state_(WAITING) {}
Marc Slemko0e53ccd2006-07-17 23:51:05 +000033
Marc Slemko6f038a72006-08-03 18:58:09 +000034 ~Task() {
Mark Sleef5f2be42006-09-05 21:05:31 +000035 //debug
36 std::cerr << "TimerManager::Task.dtor[" << this << "]" << std::endl;
37 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +000038
39 void run() {
Mark Slee2f6404d2006-10-10 01:37:40 +000040 if (state_ == EXECUTING) {
41 runnable_->run();
42 state_ = COMPLETE;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000043 }
44 }
45
46 private:
Mark Slee2f6404d2006-10-10 01:37:40 +000047 shared_ptr<Runnable> runnable_;
Marc Slemko8a40a762006-07-19 17:46:50 +000048 class TimerManager::Dispatcher;
Marc Slemko8a40a762006-07-19 17:46:50 +000049 friend class TimerManager::Dispatcher;
Mark Slee2f6404d2006-10-10 01:37:40 +000050 STATE state_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000051};
52
53class TimerManager::Dispatcher: public Runnable {
54
Mark Sleef5f2be42006-09-05 21:05:31 +000055 public:
Marc Slemko0e53ccd2006-07-17 23:51:05 +000056 Dispatcher(TimerManager* manager) :
Mark Slee2f6404d2006-10-10 01:37:40 +000057 manager_(manager) {}
Marc Slemko0e53ccd2006-07-17 23:51:05 +000058
Marc Slemko6f038a72006-08-03 18:58:09 +000059 ~Dispatcher() {
Mark Sleef5f2be42006-09-05 21:05:31 +000060 // debug
61 std::cerr << "Dispatcher::dtor[" << this << "]" << std::endl;
Marc Slemko6f038a72006-08-03 18:58:09 +000062 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +000063
Mark Sleef5f2be42006-09-05 21:05:31 +000064 /**
65 * Dispatcher entry point
66 *
Mark Slee2f6404d2006-10-10 01:37:40 +000067 * As long as dispatcher thread is running, pull tasks off the task taskMap_
Mark Sleef5f2be42006-09-05 21:05:31 +000068 * and execute.
69 */
Marc Slemko0e53ccd2006-07-17 23:51:05 +000070 void run() {
Mark Sleef5f2be42006-09-05 21:05:31 +000071 {
Mark Slee2f6404d2006-10-10 01:37:40 +000072 Synchronized s(manager_->monitor_);
73 if (manager_->state_ == TimerManager::STARTING) {
74 manager_->state_ = TimerManager::STARTED;
75 manager_->monitor_.notifyAll();
Marc Slemko0e53ccd2006-07-17 23:51:05 +000076 }
77 }
78
79 do {
Marc Slemko6f038a72006-08-03 18:58:09 +000080 std::set<shared_ptr<TimerManager::Task> > expiredTasks;
Mark Sleef5f2be42006-09-05 21:05:31 +000081 {
Mark Slee2f6404d2006-10-10 01:37:40 +000082 Synchronized s(manager_->monitor_);
Marc Slemko0e53ccd2006-07-17 23:51:05 +000083 task_iterator expiredTaskEnd;
Marc Slemko9f27a4e2006-07-19 20:02:22 +000084 long long now = Util::currentTime();
Mark Slee2f6404d2006-10-10 01:37:40 +000085 while (manager_->state_ == TimerManager::STARTED &&
86 (expiredTaskEnd = manager_->taskMap_.upper_bound(now)) == manager_->taskMap_.begin()) {
Marc Slemko8a40a762006-07-19 17:46:50 +000087 long long timeout = 0LL;
Mark Slee2f6404d2006-10-10 01:37:40 +000088 if (!manager_->taskMap_.empty()) {
89 timeout = manager_->taskMap_.begin()->first - now;
Marc Slemko8a40a762006-07-19 17:46:50 +000090 }
Mark Slee2f6404d2006-10-10 01:37:40 +000091 assert((timeout != 0 && manager_->taskCount_ > 0) || (timeout == 0 && manager_->taskCount_ == 0));
92 manager_->monitor_.wait(timeout);
Marc Slemko9f27a4e2006-07-19 20:02:22 +000093 now = Util::currentTime();
Marc Slemko0e53ccd2006-07-17 23:51:05 +000094 }
95
Mark Slee2f6404d2006-10-10 01:37:40 +000096 if (manager_->state_ == TimerManager::STARTED) {
97 for (task_iterator ix = manager_->taskMap_.begin(); ix != expiredTaskEnd; ix++) {
Marc Slemko6f038a72006-08-03 18:58:09 +000098 shared_ptr<TimerManager::Task> task = ix->second;
Mark Sleef5f2be42006-09-05 21:05:31 +000099 expiredTasks.insert(task);
Mark Slee2f6404d2006-10-10 01:37:40 +0000100 if (task->state_ == TimerManager::Task::WAITING) {
101 task->state_ = TimerManager::Task::EXECUTING;
Marc Slemko8a40a762006-07-19 17:46:50 +0000102 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000103 manager_->taskCount_--;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000104 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000105 manager_->taskMap_.erase(manager_->taskMap_.begin(), expiredTaskEnd);
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000106 }
107 }
108
Mark Sleef5f2be42006-09-05 21:05:31 +0000109 for (std::set<shared_ptr<Task> >::iterator ix = expiredTasks.begin(); ix != expiredTasks.end(); ix++) {
110 (*ix)->run();
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000111 }
112
Mark Slee2f6404d2006-10-10 01:37:40 +0000113 } while (manager_->state_ == TimerManager::STARTED);
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000114
Mark Sleef5f2be42006-09-05 21:05:31 +0000115 {
Mark Slee2f6404d2006-10-10 01:37:40 +0000116 Synchronized s(manager_->monitor_);
117 if (manager_->state_ == TimerManager::STOPPING) {
118 manager_->state_ = TimerManager::STOPPED;
119 manager_->monitor_.notify();
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000120 }
121 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000122 return;
123 }
124
125 private:
Mark Slee2f6404d2006-10-10 01:37:40 +0000126 TimerManager* manager_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000127 friend class TimerManager;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000128};
129
Marc Slemko8a40a762006-07-19 17:46:50 +0000130TimerManager::TimerManager() :
Mark Slee2f6404d2006-10-10 01:37:40 +0000131 taskCount_(0),
132 state_(TimerManager::UNINITIALIZED),
133 dispatcher_(shared_ptr<Dispatcher>(new Dispatcher(this))) {
Marc Slemko8a40a762006-07-19 17:46:50 +0000134}
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000135
Marc Slemko8a40a762006-07-19 17:46:50 +0000136
137TimerManager::~TimerManager() {
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000138
Mark Sleef5f2be42006-09-05 21:05:31 +0000139 // If we haven't been explicitly stopped, do so now. We don't need to grab
140 // the monitor here, since stop already takes care of reentrancy.
Marc Slemko6f038a72006-08-03 18:58:09 +0000141 std::cerr << "TimerManager::dtor[" << this << "]" << std::endl;
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000142
Mark Slee2f6404d2006-10-10 01:37:40 +0000143 if (state_ != STOPPED) {
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000144 try {
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000145 stop();
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000146 } catch(...) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000147 std::cerr << "TimerManager::dtor[" << this << "] uhoh " << std::endl;
148 throw;
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000149 // uhoh
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000150 }
151 }
Marc Slemko8a40a762006-07-19 17:46:50 +0000152}
153
154void TimerManager::start() {
Marc Slemko8a40a762006-07-19 17:46:50 +0000155 bool doStart = false;
Mark Sleef5f2be42006-09-05 21:05:31 +0000156 {
Mark Slee2f6404d2006-10-10 01:37:40 +0000157 Synchronized s(monitor_);
158 if (threadFactory_ == NULL) {
Mark Sleef5f2be42006-09-05 21:05:31 +0000159 throw InvalidArgumentException();
160 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000161 if (state_ == TimerManager::UNINITIALIZED) {
162 state_ = TimerManager::STARTING;
Marc Slemko8a40a762006-07-19 17:46:50 +0000163 doStart = true;
164 }
165 }
166
Mark Sleef5f2be42006-09-05 21:05:31 +0000167 if (doStart) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000168 dispatcherThread_ = threadFactory_->newThread(dispatcher_);
169 dispatcherThread_->start();
Marc Slemko8a40a762006-07-19 17:46:50 +0000170 }
171
Mark Sleef5f2be42006-09-05 21:05:31 +0000172 {
Mark Slee2f6404d2006-10-10 01:37:40 +0000173 Synchronized s(monitor_);
174 while (state_ == TimerManager::STARTING) {
175 monitor_.wait();
Marc Slemko8a40a762006-07-19 17:46:50 +0000176 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000177 assert(state_ != TimerManager::STARTING);
Marc Slemko8a40a762006-07-19 17:46:50 +0000178 }
179}
180
181void TimerManager::stop() {
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000182 bool doStop = false;
Mark Sleef5f2be42006-09-05 21:05:31 +0000183 {
Mark Slee2f6404d2006-10-10 01:37:40 +0000184 Synchronized s(monitor_);
185 if (state_ == TimerManager::UNINITIALIZED) {
186 state_ = TimerManager::STOPPED;
187 } else if (state_ != STOPPING && state_ != STOPPED) {
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000188 doStop = true;
Mark Slee2f6404d2006-10-10 01:37:40 +0000189 state_ = STOPPING;
190 monitor_.notifyAll();
Marc Slemko8a40a762006-07-19 17:46:50 +0000191 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000192 while (state_ != STOPPED) {
193 monitor_.wait();
Marc Slemko8a40a762006-07-19 17:46:50 +0000194 }
195 }
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000196
Mark Sleef5f2be42006-09-05 21:05:31 +0000197 if (doStop) {
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000198 // Clean up any outstanding tasks
Mark Slee2f6404d2006-10-10 01:37:40 +0000199 for (task_iterator ix = taskMap_.begin(); ix != taskMap_.end(); ix++) {
200 taskMap_.erase(ix);
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000201 }
202
Marc Slemko6f038a72006-08-03 18:58:09 +0000203 // Remove dispatcher's reference to us.
Mark Slee2f6404d2006-10-10 01:37:40 +0000204 dispatcher_->manager_ = NULL;
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000205 }
Marc Slemko8a40a762006-07-19 17:46:50 +0000206}
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000207
Marc Slemko6f038a72006-08-03 18:58:09 +0000208shared_ptr<const ThreadFactory> TimerManager::threadFactory() const {
Mark Slee2f6404d2006-10-10 01:37:40 +0000209 Synchronized s(monitor_);
210 return threadFactory_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000211}
212
Marc Slemko6f038a72006-08-03 18:58:09 +0000213void TimerManager::threadFactory(shared_ptr<const ThreadFactory> value) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000214 Synchronized s(monitor_);
215 threadFactory_ = value;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000216}
217
Marc Slemko8a40a762006-07-19 17:46:50 +0000218size_t TimerManager::taskCount() const {
Mark Slee2f6404d2006-10-10 01:37:40 +0000219 return taskCount_;
Marc Slemko8a40a762006-07-19 17:46:50 +0000220}
221
Marc Slemko6f038a72006-08-03 18:58:09 +0000222void TimerManager::add(shared_ptr<Runnable> task, long long timeout) {
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000223 long long now = Util::currentTime();
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000224 timeout += now;
225
Mark Sleef5f2be42006-09-05 21:05:31 +0000226 {
Mark Slee2f6404d2006-10-10 01:37:40 +0000227 Synchronized s(monitor_);
228 if (state_ != TimerManager::STARTED) {
Marc Slemko8a40a762006-07-19 17:46:50 +0000229 throw IllegalStateException();
230 }
231
Mark Slee2f6404d2006-10-10 01:37:40 +0000232 taskCount_++;
233 taskMap_.insert(std::pair<long long, shared_ptr<Task> >(timeout, shared_ptr<Task>(new Task(task))));
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000234
Mark Sleef5f2be42006-09-05 21:05:31 +0000235 // If the task map was empty, or if we have an expiration that is earlier
236 // than any previously seen, kick the dispatcher so it can update its
237 // timeout
Mark Slee2f6404d2006-10-10 01:37:40 +0000238 if (taskCount_ == 1 || timeout < taskMap_.begin()->first) {
239 monitor_.notify();
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000240 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000241 }
242}
243
Marc Slemko6f038a72006-08-03 18:58:09 +0000244void TimerManager::add(shared_ptr<Runnable> task, const struct timespec& value) {
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000245
Mark Sleef5f2be42006-09-05 21:05:31 +0000246 long long expiration;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000247 Util::toMilliseconds(expiration, value);
248
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000249 long long now = Util::currentTime();
250
Mark Sleef5f2be42006-09-05 21:05:31 +0000251 if (expiration < now) {
Marc Slemko8a40a762006-07-19 17:46:50 +0000252 throw InvalidArgumentException();
253 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000254
255 add(task, expiration - now);
256}
257
258
Marc Slemko6f038a72006-08-03 18:58:09 +0000259void TimerManager::remove(shared_ptr<Runnable> task) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000260 Synchronized s(monitor_);
261 if (state_ != TimerManager::STARTED) {
Mark Sleef5f2be42006-09-05 21:05:31 +0000262 throw IllegalStateException();
Marc Slemko8a40a762006-07-19 17:46:50 +0000263 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000264}
265
Mark Slee2f6404d2006-10-10 01:37:40 +0000266const TimerManager::STATE TimerManager::state() const { return state_; }
Marc Slemko8a40a762006-07-19 17:46:50 +0000267
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000268}}} // facebook::thrift::concurrency
269