blob: 07f05adc1e87351efc040f18331b2a11240b22a8 [file] [log] [blame]
Mark Slee9f0c6512007-02-28 23:58:26 +00001// Copyright (c) 2006- Facebook
2// Distributed under the Thrift Software License
3//
4// See accompanying file LICENSE or visit the Thrift site at:
5// http://developers.facebook.com/thrift/
6
Marc Slemko0e53ccd2006-07-17 23:51:05 +00007#include "TimerManager.h"
Marc Slemko8a40a762006-07-19 17:46:50 +00008#include "Exception.h"
Marc Slemko0e53ccd2006-07-17 23:51:05 +00009#include "Util.h"
10
11#include <assert.h>
Marc Slemko8a40a762006-07-19 17:46:50 +000012#include <iostream>
Marc Slemko0e53ccd2006-07-17 23:51:05 +000013#include <set>
14
15namespace facebook { namespace thrift { namespace concurrency {
16
Mark Slee5ea15f92007-03-05 22:55:59 +000017using boost::shared_ptr;
18
Marc Slemko6f038a72006-08-03 18:58:09 +000019typedef std::multimap<long long, shared_ptr<TimerManager::Task> >::iterator task_iterator;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000020typedef std::pair<task_iterator, task_iterator> task_range;
21
Mark Sleef5f2be42006-09-05 21:05:31 +000022/**
23 * TimerManager class
24 *
25 * @author marc
26 * @version $Id:$
27 */
Marc Slemko0e53ccd2006-07-17 23:51:05 +000028class TimerManager::Task : public Runnable {
29
Mark Sleef5f2be42006-09-05 21:05:31 +000030 public:
Marc Slemko0e53ccd2006-07-17 23:51:05 +000031 enum STATE {
32 WAITING,
33 EXECUTING,
34 CANCELLED,
35 COMPLETE
36 };
37
Marc Slemko6f038a72006-08-03 18:58:09 +000038 Task(shared_ptr<Runnable> runnable) :
Mark Slee2f6404d2006-10-10 01:37:40 +000039 runnable_(runnable),
40 state_(WAITING) {}
Marc Slemko0e53ccd2006-07-17 23:51:05 +000041
Marc Slemko6f038a72006-08-03 18:58:09 +000042 ~Task() {
Mark Sleef5f2be42006-09-05 21:05:31 +000043 //debug
44 std::cerr << "TimerManager::Task.dtor[" << this << "]" << std::endl;
45 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +000046
47 void run() {
Mark Slee2f6404d2006-10-10 01:37:40 +000048 if (state_ == EXECUTING) {
49 runnable_->run();
50 state_ = COMPLETE;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000051 }
52 }
53
54 private:
Mark Slee2f6404d2006-10-10 01:37:40 +000055 shared_ptr<Runnable> runnable_;
Marc Slemko8a40a762006-07-19 17:46:50 +000056 class TimerManager::Dispatcher;
Marc Slemko8a40a762006-07-19 17:46:50 +000057 friend class TimerManager::Dispatcher;
Mark Slee2f6404d2006-10-10 01:37:40 +000058 STATE state_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000059};
60
61class TimerManager::Dispatcher: public Runnable {
62
Mark Sleef5f2be42006-09-05 21:05:31 +000063 public:
Marc Slemko0e53ccd2006-07-17 23:51:05 +000064 Dispatcher(TimerManager* manager) :
Mark Slee2f6404d2006-10-10 01:37:40 +000065 manager_(manager) {}
Marc Slemko0e53ccd2006-07-17 23:51:05 +000066
Marc Slemko6f038a72006-08-03 18:58:09 +000067 ~Dispatcher() {
Mark Sleef5f2be42006-09-05 21:05:31 +000068 // debug
69 std::cerr << "Dispatcher::dtor[" << this << "]" << std::endl;
Marc Slemko6f038a72006-08-03 18:58:09 +000070 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +000071
Mark Sleef5f2be42006-09-05 21:05:31 +000072 /**
73 * Dispatcher entry point
74 *
Mark Slee2f6404d2006-10-10 01:37:40 +000075 * As long as dispatcher thread is running, pull tasks off the task taskMap_
Mark Sleef5f2be42006-09-05 21:05:31 +000076 * and execute.
77 */
Marc Slemko0e53ccd2006-07-17 23:51:05 +000078 void run() {
Mark Sleef5f2be42006-09-05 21:05:31 +000079 {
Mark Slee2f6404d2006-10-10 01:37:40 +000080 Synchronized s(manager_->monitor_);
81 if (manager_->state_ == TimerManager::STARTING) {
82 manager_->state_ = TimerManager::STARTED;
83 manager_->monitor_.notifyAll();
Marc Slemko0e53ccd2006-07-17 23:51:05 +000084 }
85 }
86
87 do {
Marc Slemko6f038a72006-08-03 18:58:09 +000088 std::set<shared_ptr<TimerManager::Task> > expiredTasks;
Mark Sleef5f2be42006-09-05 21:05:31 +000089 {
Mark Slee2f6404d2006-10-10 01:37:40 +000090 Synchronized s(manager_->monitor_);
Marc Slemko0e53ccd2006-07-17 23:51:05 +000091 task_iterator expiredTaskEnd;
Marc Slemko9f27a4e2006-07-19 20:02:22 +000092 long long now = Util::currentTime();
Mark Slee2f6404d2006-10-10 01:37:40 +000093 while (manager_->state_ == TimerManager::STARTED &&
94 (expiredTaskEnd = manager_->taskMap_.upper_bound(now)) == manager_->taskMap_.begin()) {
Marc Slemko8a40a762006-07-19 17:46:50 +000095 long long timeout = 0LL;
Mark Slee2f6404d2006-10-10 01:37:40 +000096 if (!manager_->taskMap_.empty()) {
97 timeout = manager_->taskMap_.begin()->first - now;
Marc Slemko8a40a762006-07-19 17:46:50 +000098 }
Aditya Agarwal3f234da2007-04-01 01:19:57 +000099 assert((timeout != 0 && manager_->taskCount_ > 0) || (timeout == 0 && manager_->taskCount_ == 0));
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000100 try {
Mark Slee2782d6d2007-05-23 04:55:30 +0000101 manager_->monitor_.wait(timeout);
102 } catch (TimedOutException &e) {}
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000103 now = Util::currentTime();
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000104 }
105
Mark Slee2f6404d2006-10-10 01:37:40 +0000106 if (manager_->state_ == TimerManager::STARTED) {
107 for (task_iterator ix = manager_->taskMap_.begin(); ix != expiredTaskEnd; ix++) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000108 shared_ptr<TimerManager::Task> task = ix->second;
Mark Sleef5f2be42006-09-05 21:05:31 +0000109 expiredTasks.insert(task);
Mark Slee2f6404d2006-10-10 01:37:40 +0000110 if (task->state_ == TimerManager::Task::WAITING) {
111 task->state_ = TimerManager::Task::EXECUTING;
Marc Slemko8a40a762006-07-19 17:46:50 +0000112 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000113 manager_->taskCount_--;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000114 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000115 manager_->taskMap_.erase(manager_->taskMap_.begin(), expiredTaskEnd);
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000116 }
117 }
118
Mark Sleef5f2be42006-09-05 21:05:31 +0000119 for (std::set<shared_ptr<Task> >::iterator ix = expiredTasks.begin(); ix != expiredTasks.end(); ix++) {
120 (*ix)->run();
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000121 }
122
Mark Slee2f6404d2006-10-10 01:37:40 +0000123 } while (manager_->state_ == TimerManager::STARTED);
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000124
Mark Sleef5f2be42006-09-05 21:05:31 +0000125 {
Mark Slee2f6404d2006-10-10 01:37:40 +0000126 Synchronized s(manager_->monitor_);
127 if (manager_->state_ == TimerManager::STOPPING) {
128 manager_->state_ = TimerManager::STOPPED;
129 manager_->monitor_.notify();
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000130 }
131 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000132 return;
133 }
134
135 private:
Mark Slee2f6404d2006-10-10 01:37:40 +0000136 TimerManager* manager_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000137 friend class TimerManager;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000138};
139
Marc Slemko8a40a762006-07-19 17:46:50 +0000140TimerManager::TimerManager() :
Mark Slee2f6404d2006-10-10 01:37:40 +0000141 taskCount_(0),
142 state_(TimerManager::UNINITIALIZED),
143 dispatcher_(shared_ptr<Dispatcher>(new Dispatcher(this))) {
Marc Slemko8a40a762006-07-19 17:46:50 +0000144}
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000145
Marc Slemko8a40a762006-07-19 17:46:50 +0000146
147TimerManager::~TimerManager() {
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000148
Mark Sleef5f2be42006-09-05 21:05:31 +0000149 // If we haven't been explicitly stopped, do so now. We don't need to grab
150 // the monitor here, since stop already takes care of reentrancy.
Marc Slemko6f038a72006-08-03 18:58:09 +0000151 std::cerr << "TimerManager::dtor[" << this << "]" << std::endl;
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000152
Mark Slee2f6404d2006-10-10 01:37:40 +0000153 if (state_ != STOPPED) {
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000154 try {
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000155 stop();
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000156 } catch(...) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000157 std::cerr << "TimerManager::dtor[" << this << "] uhoh " << std::endl;
158 throw;
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000159 // uhoh
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000160 }
161 }
Marc Slemko8a40a762006-07-19 17:46:50 +0000162}
163
164void TimerManager::start() {
Marc Slemko8a40a762006-07-19 17:46:50 +0000165 bool doStart = false;
Mark Sleef5f2be42006-09-05 21:05:31 +0000166 {
Mark Slee2f6404d2006-10-10 01:37:40 +0000167 Synchronized s(monitor_);
168 if (threadFactory_ == NULL) {
Mark Sleef5f2be42006-09-05 21:05:31 +0000169 throw InvalidArgumentException();
170 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000171 if (state_ == TimerManager::UNINITIALIZED) {
172 state_ = TimerManager::STARTING;
Marc Slemko8a40a762006-07-19 17:46:50 +0000173 doStart = true;
174 }
175 }
176
Mark Sleef5f2be42006-09-05 21:05:31 +0000177 if (doStart) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000178 dispatcherThread_ = threadFactory_->newThread(dispatcher_);
179 dispatcherThread_->start();
Marc Slemko8a40a762006-07-19 17:46:50 +0000180 }
181
Mark Sleef5f2be42006-09-05 21:05:31 +0000182 {
Mark Slee2f6404d2006-10-10 01:37:40 +0000183 Synchronized s(monitor_);
184 while (state_ == TimerManager::STARTING) {
185 monitor_.wait();
Marc Slemko8a40a762006-07-19 17:46:50 +0000186 }
Aditya Agarwal3f234da2007-04-01 01:19:57 +0000187 assert(state_ != TimerManager::STARTING);
Marc Slemko8a40a762006-07-19 17:46:50 +0000188 }
189}
190
191void TimerManager::stop() {
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000192 bool doStop = false;
Mark Sleef5f2be42006-09-05 21:05:31 +0000193 {
Mark Slee2f6404d2006-10-10 01:37:40 +0000194 Synchronized s(monitor_);
195 if (state_ == TimerManager::UNINITIALIZED) {
196 state_ = TimerManager::STOPPED;
197 } else if (state_ != STOPPING && state_ != STOPPED) {
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000198 doStop = true;
Mark Slee2f6404d2006-10-10 01:37:40 +0000199 state_ = STOPPING;
200 monitor_.notifyAll();
Marc Slemko8a40a762006-07-19 17:46:50 +0000201 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000202 while (state_ != STOPPED) {
203 monitor_.wait();
Marc Slemko8a40a762006-07-19 17:46:50 +0000204 }
205 }
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000206
Mark Sleef5f2be42006-09-05 21:05:31 +0000207 if (doStop) {
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000208 // Clean up any outstanding tasks
Mark Slee2f6404d2006-10-10 01:37:40 +0000209 for (task_iterator ix = taskMap_.begin(); ix != taskMap_.end(); ix++) {
210 taskMap_.erase(ix);
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000211 }
212
Marc Slemko6f038a72006-08-03 18:58:09 +0000213 // Remove dispatcher's reference to us.
Mark Slee2f6404d2006-10-10 01:37:40 +0000214 dispatcher_->manager_ = NULL;
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000215 }
Marc Slemko8a40a762006-07-19 17:46:50 +0000216}
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000217
Marc Slemko6f038a72006-08-03 18:58:09 +0000218shared_ptr<const ThreadFactory> TimerManager::threadFactory() const {
Mark Slee2f6404d2006-10-10 01:37:40 +0000219 Synchronized s(monitor_);
220 return threadFactory_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000221}
222
Marc Slemko6f038a72006-08-03 18:58:09 +0000223void TimerManager::threadFactory(shared_ptr<const ThreadFactory> value) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000224 Synchronized s(monitor_);
225 threadFactory_ = value;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000226}
227
Marc Slemko8a40a762006-07-19 17:46:50 +0000228size_t TimerManager::taskCount() const {
Mark Slee2f6404d2006-10-10 01:37:40 +0000229 return taskCount_;
Marc Slemko8a40a762006-07-19 17:46:50 +0000230}
231
Marc Slemko6f038a72006-08-03 18:58:09 +0000232void TimerManager::add(shared_ptr<Runnable> task, long long timeout) {
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000233 long long now = Util::currentTime();
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000234 timeout += now;
235
Mark Sleef5f2be42006-09-05 21:05:31 +0000236 {
Mark Slee2f6404d2006-10-10 01:37:40 +0000237 Synchronized s(monitor_);
238 if (state_ != TimerManager::STARTED) {
Marc Slemko8a40a762006-07-19 17:46:50 +0000239 throw IllegalStateException();
240 }
241
Mark Slee2f6404d2006-10-10 01:37:40 +0000242 taskCount_++;
243 taskMap_.insert(std::pair<long long, shared_ptr<Task> >(timeout, shared_ptr<Task>(new Task(task))));
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000244
Mark Sleef5f2be42006-09-05 21:05:31 +0000245 // If the task map was empty, or if we have an expiration that is earlier
246 // than any previously seen, kick the dispatcher so it can update its
247 // timeout
Mark Slee2f6404d2006-10-10 01:37:40 +0000248 if (taskCount_ == 1 || timeout < taskMap_.begin()->first) {
249 monitor_.notify();
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000250 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000251 }
252}
253
Marc Slemko6f038a72006-08-03 18:58:09 +0000254void TimerManager::add(shared_ptr<Runnable> task, const struct timespec& value) {
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000255
Mark Sleef5f2be42006-09-05 21:05:31 +0000256 long long expiration;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000257 Util::toMilliseconds(expiration, value);
258
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000259 long long now = Util::currentTime();
260
Mark Sleef5f2be42006-09-05 21:05:31 +0000261 if (expiration < now) {
Marc Slemko8a40a762006-07-19 17:46:50 +0000262 throw InvalidArgumentException();
263 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000264
265 add(task, expiration - now);
266}
267
268
Marc Slemko6f038a72006-08-03 18:58:09 +0000269void TimerManager::remove(shared_ptr<Runnable> task) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000270 Synchronized s(monitor_);
271 if (state_ != TimerManager::STARTED) {
Mark Sleef5f2be42006-09-05 21:05:31 +0000272 throw IllegalStateException();
Marc Slemko8a40a762006-07-19 17:46:50 +0000273 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000274}
275
Mark Slee2f6404d2006-10-10 01:37:40 +0000276const TimerManager::STATE TimerManager::state() const { return state_; }
Marc Slemko8a40a762006-07-19 17:46:50 +0000277
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000278}}} // facebook::thrift::concurrency
279