blob: a223a77a9054adb5fede663f48f3555d5dcfae91 [file] [log] [blame]
Marc Slemko0e53ccd2006-07-17 23:51:05 +00001#include "TimerManager.h"
Marc Slemko8a40a762006-07-19 17:46:50 +00002#include "Exception.h"
Marc Slemko0e53ccd2006-07-17 23:51:05 +00003#include "Util.h"
4
5#include <assert.h>
Marc Slemko8a40a762006-07-19 17:46:50 +00006#include <iostream>
Marc Slemko0e53ccd2006-07-17 23:51:05 +00007#include <set>
8
9namespace facebook { namespace thrift { namespace concurrency {
10
/** TimerManager class

    @author marc
    @version $Id:$ */
15
Marc Slemko6f038a72006-08-03 18:58:09 +000016typedef std::multimap<long long, shared_ptr<TimerManager::Task> >::iterator task_iterator;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000017typedef std::pair<task_iterator, task_iterator> task_range;
18
19class TimerManager::Task : public Runnable {
20
21public:
22 enum STATE {
23 WAITING,
24 EXECUTING,
25 CANCELLED,
26 COMPLETE
27 };
28
Marc Slemko6f038a72006-08-03 18:58:09 +000029 Task(shared_ptr<Runnable> runnable) :
Marc Slemko0e53ccd2006-07-17 23:51:05 +000030 _runnable(runnable),
31 _state(WAITING)
32 {}
33
Marc Slemko6f038a72006-08-03 18:58:09 +000034 ~Task() {
35 std::cerr << "TimerManager::Task.dtor[" << this << "]" << std::endl; //debug
36};
Marc Slemko0e53ccd2006-07-17 23:51:05 +000037
38 void run() {
39 if(_state == EXECUTING) {
40 _runnable->run();
41 _state = COMPLETE;
42 }
43 }
44
45 private:
46
Marc Slemko6f038a72006-08-03 18:58:09 +000047 shared_ptr<Runnable> _runnable;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000048
Marc Slemko8a40a762006-07-19 17:46:50 +000049 class TimerManager::Dispatcher;
50
51 friend class TimerManager::Dispatcher;
52
Marc Slemko0e53ccd2006-07-17 23:51:05 +000053 STATE _state;
54};
55
56class TimerManager::Dispatcher: public Runnable {
57
Marc Slemko0e53ccd2006-07-17 23:51:05 +000058public:
59 Dispatcher(TimerManager* manager) :
Marc Slemko8a40a762006-07-19 17:46:50 +000060 _manager(manager) {
61}
Marc Slemko0e53ccd2006-07-17 23:51:05 +000062
Marc Slemko6f038a72006-08-03 18:58:09 +000063 ~Dispatcher() {
64 std::cerr << "Dispatcher::dtor[" << this << "]" << std::endl; //debug
65 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +000066
67 /** Dispatcher entry point
68
69 As long as dispatcher thread is running, pull tasks off the task _taskMap and execute. */
70
71 void run() {
72
Marc Slemko8a40a762006-07-19 17:46:50 +000073 {Synchronized s(_manager->_monitor);
Marc Slemko0e53ccd2006-07-17 23:51:05 +000074
Marc Slemko8a40a762006-07-19 17:46:50 +000075 if(_manager->_state == TimerManager::STARTING) {
76
77 _manager->_state = TimerManager::STARTED;
78
79 _manager->_monitor.notifyAll();
Marc Slemko0e53ccd2006-07-17 23:51:05 +000080 }
81 }
82
83 do {
84
Marc Slemko6f038a72006-08-03 18:58:09 +000085 std::set<shared_ptr<TimerManager::Task> > expiredTasks;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000086
Marc Slemko8a40a762006-07-19 17:46:50 +000087 {Synchronized s(_manager->_monitor);
88
Marc Slemko0e53ccd2006-07-17 23:51:05 +000089 task_iterator expiredTaskEnd;
Marc Slemko8a40a762006-07-19 17:46:50 +000090
Marc Slemko9f27a4e2006-07-19 20:02:22 +000091 long long now = Util::currentTime();
92
Marc Slemko8a40a762006-07-19 17:46:50 +000093 while(_manager->_state == TimerManager::STARTED &&
Marc Slemko9f27a4e2006-07-19 20:02:22 +000094 (expiredTaskEnd = _manager->_taskMap.upper_bound(now)) == _manager->_taskMap.begin()) {
Marc Slemko8a40a762006-07-19 17:46:50 +000095
96 long long timeout = 0LL;
97
98 if(!_manager->_taskMap.empty()) {
99
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000100 timeout = _manager->_taskMap.begin()->first - now;
Marc Slemko8a40a762006-07-19 17:46:50 +0000101 }
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000102
103 assert((timeout != 0 && _manager->_taskCount > 0) || (timeout == 0 && _manager->_taskCount == 0));
Marc Slemko8a40a762006-07-19 17:46:50 +0000104
105 _manager->_monitor.wait(timeout);
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000106
107 now = Util::currentTime();
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000108 }
109
Marc Slemko8a40a762006-07-19 17:46:50 +0000110 if(_manager->_state == TimerManager::STARTED) {
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000111
112 for(task_iterator ix = _manager->_taskMap.begin(); ix != expiredTaskEnd; ix++) {
113
Marc Slemko6f038a72006-08-03 18:58:09 +0000114 shared_ptr<TimerManager::Task> task = ix->second;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000115
116 expiredTasks.insert(task);
Marc Slemko8a40a762006-07-19 17:46:50 +0000117
118 if(task->_state == TimerManager::Task::WAITING) {
119
120 task->_state = TimerManager::Task::EXECUTING;
121 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000122
123 _manager->_taskCount--;
124 }
125
126 _manager->_taskMap.erase(_manager->_taskMap.begin(), expiredTaskEnd);
127 }
128 }
129
Marc Slemko6f038a72006-08-03 18:58:09 +0000130 for(std::set<shared_ptr<Task> >::iterator ix = expiredTasks.begin(); ix != expiredTasks.end(); ix++) {
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000131
132 (*ix)->run();
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000133 }
134
Marc Slemko8a40a762006-07-19 17:46:50 +0000135 } while(_manager->_state == TimerManager::STARTED);
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000136
Marc Slemko8a40a762006-07-19 17:46:50 +0000137 {Synchronized s(_manager->_monitor);
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000138
Marc Slemko8a40a762006-07-19 17:46:50 +0000139 if(_manager->_state == TimerManager::STOPPING) {
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000140
Marc Slemko8a40a762006-07-19 17:46:50 +0000141 _manager->_state = TimerManager::STOPPED;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000142
143 _manager->_monitor.notify();
144
145 }
146 }
147
148 return;
149 }
150
151 private:
152
153 TimerManager* _manager;
154
155 friend class TimerManager;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000156};
157
Marc Slemko8a40a762006-07-19 17:46:50 +0000158TimerManager::TimerManager() :
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000159 _taskCount(0),
Marc Slemko8a40a762006-07-19 17:46:50 +0000160 _state(TimerManager::UNINITIALIZED),
Marc Slemko6f038a72006-08-03 18:58:09 +0000161 _dispatcher(shared_ptr<Dispatcher>(new Dispatcher(this))) {
Marc Slemko8a40a762006-07-19 17:46:50 +0000162}
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000163
Marc Slemko8a40a762006-07-19 17:46:50 +0000164
165TimerManager::~TimerManager() {
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000166
167 /* If we haven't been explicitly stopped, do so now. We don't need to grab the monitor here, since
168 stop already takes care of reentrancy. */
Marc Slemko6f038a72006-08-03 18:58:09 +0000169
170 std::cerr << "TimerManager::dtor[" << this << "]" << std::endl;
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000171
172 if(_state != STOPPED) {
173
174 try {
175
176 stop();
177
178 } catch(...) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000179 std::cerr << "TimerManager::dtor[" << this << "] uhoh " << std::endl;
180 throw;
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000181
182 // uhoh
183
184 }
185 }
Marc Slemko8a40a762006-07-19 17:46:50 +0000186}
187
188void TimerManager::start() {
189
190 bool doStart = false;
191
192 {Synchronized s(_monitor);
193
194 if(_threadFactory == NULL) {throw InvalidArgumentException();}
195
196 if(_state == TimerManager::UNINITIALIZED) {
197
198 _state = TimerManager::STARTING;
199
200 doStart = true;
201 }
202 }
203
204 if(doStart) {
205
206 _dispatcherThread = _threadFactory->newThread(_dispatcher);
207
208 _dispatcherThread->start();
209 }
210
211 {Synchronized s(_monitor);
212
213 while(_state == TimerManager::STARTING) {
214
215 _monitor.wait();
216 }
217
218 assert(_state != TimerManager::STARTING);
219 }
220}
221
222void TimerManager::stop() {
223
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000224 bool doStop = false;
225
Marc Slemko8a40a762006-07-19 17:46:50 +0000226 {Synchronized s(_monitor);
227
228 if(_state == TimerManager::UNINITIALIZED) {
229
230 _state = TimerManager::STOPPED;
231
232 } else if(_state != STOPPING && _state != STOPPED) {
233
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000234 doStop = true;
235
Marc Slemko8a40a762006-07-19 17:46:50 +0000236 _state = STOPPING;
237
238 _monitor.notifyAll();
239 }
240
241 while(_state != STOPPED) {
242
243 _monitor.wait();
244 }
245 }
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000246
247 if(doStop) {
248
249 // Clean up any outstanding tasks
250
251 for(task_iterator ix = _taskMap.begin(); ix != _taskMap.end(); ix++) {
252
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000253 _taskMap.erase(ix);
254 }
255
Marc Slemko6f038a72006-08-03 18:58:09 +0000256 // Remove dispatcher's reference to us.
257
258 _dispatcher->_manager = NULL;
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000259 }
Marc Slemko8a40a762006-07-19 17:46:50 +0000260}
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000261
Marc Slemko6f038a72006-08-03 18:58:09 +0000262shared_ptr<const ThreadFactory> TimerManager::threadFactory() const {
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000263
264 Synchronized s(_monitor);
265
266 return _threadFactory;
267}
268
Marc Slemko6f038a72006-08-03 18:58:09 +0000269void TimerManager::threadFactory(shared_ptr<const ThreadFactory> value) {
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000270
271 Synchronized s(_monitor);
272
273 _threadFactory = value;
274}
275
Marc Slemko8a40a762006-07-19 17:46:50 +0000276size_t TimerManager::taskCount() const {
277
278 return _taskCount;
279}
280
Marc Slemko6f038a72006-08-03 18:58:09 +0000281void TimerManager::add(shared_ptr<Runnable> task, long long timeout) {
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000282
283 long long now = Util::currentTime();
284
285 timeout += now;
286
287 {Synchronized s(_monitor);
288
Marc Slemko8a40a762006-07-19 17:46:50 +0000289 if(_state != TimerManager::STARTED) {
290 throw IllegalStateException();
291 }
292
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000293 _taskCount++;
294
Marc Slemko6f038a72006-08-03 18:58:09 +0000295 _taskMap.insert(std::pair<long long, shared_ptr<Task> >(timeout, shared_ptr<Task>(new Task(task))));
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000296
297 /* If the task map was empty, or if we have an expiration that is earlier than any previously seen,
298 kick the dispatcher so it can update its timeout */
299
Marc Slemko8a40a762006-07-19 17:46:50 +0000300 if(_taskCount == 1 || timeout < _taskMap.begin()->first) {
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000301
302 _monitor.notify();
303 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000304 }
305}
306
Marc Slemko6f038a72006-08-03 18:58:09 +0000307void TimerManager::add(shared_ptr<Runnable> task, const struct timespec& value) {
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000308
309 long long expiration;
310
311 Util::toMilliseconds(expiration, value);
312
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000313 long long now = Util::currentTime();
314
Marc Slemko8a40a762006-07-19 17:46:50 +0000315 if(expiration < now) {
316 throw InvalidArgumentException();
317 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000318
319 add(task, expiration - now);
320}
321
322
Marc Slemko6f038a72006-08-03 18:58:09 +0000323void TimerManager::remove(shared_ptr<Runnable> task) {
Marc Slemko8a40a762006-07-19 17:46:50 +0000324 {Synchronized s(_monitor);
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000325
Marc Slemko8a40a762006-07-19 17:46:50 +0000326 if(_state != TimerManager::STARTED) {
327 throw IllegalStateException();
328 }
329 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000330}
331
Marc Slemko8a40a762006-07-19 17:46:50 +0000332const TimerManager::STATE TimerManager::state() const { return _state;}
333
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000334}}} // facebook::thrift::concurrency
335