blob: 93b0dc5491ca29c36da59e53264bfd3091ea79cc [file] [log] [blame]
Marc Slemko0e53ccd2006-07-17 23:51:05 +00001#include "TimerManager.h"
Marc Slemko8a40a762006-07-19 17:46:50 +00002#include "Exception.h"
Marc Slemko0e53ccd2006-07-17 23:51:05 +00003#include "Util.h"
4
5#include <assert.h>
Marc Slemko8a40a762006-07-19 17:46:50 +00006#include <iostream>
Marc Slemko0e53ccd2006-07-17 23:51:05 +00007#include <set>
8
9namespace facebook { namespace thrift { namespace concurrency {
10
11/** TimerManager class
12
13 @author marc
14 @version $Id:$ */
15
16typedef std::multimap<long long, TimerManager::Task*>::iterator task_iterator;
17typedef std::pair<task_iterator, task_iterator> task_range;
18
19class TimerManager::Task : public Runnable {
20
21public:
22 enum STATE {
23 WAITING,
24 EXECUTING,
25 CANCELLED,
26 COMPLETE
27 };
28
29 Task(Runnable* runnable) :
30 _runnable(runnable),
31 _state(WAITING)
32 {}
33
34 ~Task() {};
35
36 void run() {
37 if(_state == EXECUTING) {
38 _runnable->run();
39 _state = COMPLETE;
40 }
41 }
42
43 private:
44
45 Runnable* _runnable;
46
Marc Slemko8a40a762006-07-19 17:46:50 +000047 class TimerManager::Dispatcher;
48
49 friend class TimerManager::Dispatcher;
50
Marc Slemko0e53ccd2006-07-17 23:51:05 +000051 STATE _state;
52};
53
54class TimerManager::Dispatcher: public Runnable {
55
Marc Slemko0e53ccd2006-07-17 23:51:05 +000056public:
57 Dispatcher(TimerManager* manager) :
Marc Slemko8a40a762006-07-19 17:46:50 +000058 _manager(manager) {
59}
Marc Slemko0e53ccd2006-07-17 23:51:05 +000060
61 ~Dispatcher() {}
62
63 /** Dispatcher entry point
64
65 As long as dispatcher thread is running, pull tasks off the task _taskMap and execute. */
66
67 void run() {
68
Marc Slemko8a40a762006-07-19 17:46:50 +000069 {Synchronized s(_manager->_monitor);
Marc Slemko0e53ccd2006-07-17 23:51:05 +000070
Marc Slemko8a40a762006-07-19 17:46:50 +000071 if(_manager->_state == TimerManager::STARTING) {
72
73 _manager->_state = TimerManager::STARTED;
74
75 _manager->_monitor.notifyAll();
Marc Slemko0e53ccd2006-07-17 23:51:05 +000076 }
77 }
78
79 do {
80
81 std::set<TimerManager::Task*> expiredTasks;
82
Marc Slemko8a40a762006-07-19 17:46:50 +000083 {Synchronized s(_manager->_monitor);
84
Marc Slemko0e53ccd2006-07-17 23:51:05 +000085 task_iterator expiredTaskEnd;
Marc Slemko8a40a762006-07-19 17:46:50 +000086
Marc Slemko9f27a4e2006-07-19 20:02:22 +000087 long long now = Util::currentTime();
88
Marc Slemko8a40a762006-07-19 17:46:50 +000089 while(_manager->_state == TimerManager::STARTED &&
Marc Slemko9f27a4e2006-07-19 20:02:22 +000090 (expiredTaskEnd = _manager->_taskMap.upper_bound(now)) == _manager->_taskMap.begin()) {
Marc Slemko8a40a762006-07-19 17:46:50 +000091
92 long long timeout = 0LL;
93
94 if(!_manager->_taskMap.empty()) {
95
Marc Slemko9f27a4e2006-07-19 20:02:22 +000096 timeout = _manager->_taskMap.begin()->first - now;
Marc Slemko8a40a762006-07-19 17:46:50 +000097 }
Marc Slemko9f27a4e2006-07-19 20:02:22 +000098
99 assert((timeout != 0 && _manager->_taskCount > 0) || (timeout == 0 && _manager->_taskCount == 0));
Marc Slemko8a40a762006-07-19 17:46:50 +0000100
101 _manager->_monitor.wait(timeout);
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000102
103 now = Util::currentTime();
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000104 }
105
Marc Slemko8a40a762006-07-19 17:46:50 +0000106 if(_manager->_state == TimerManager::STARTED) {
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000107
108 for(task_iterator ix = _manager->_taskMap.begin(); ix != expiredTaskEnd; ix++) {
109
110 TimerManager::Task* task = ix->second;
111
112 expiredTasks.insert(task);
Marc Slemko8a40a762006-07-19 17:46:50 +0000113
114 if(task->_state == TimerManager::Task::WAITING) {
115
116 task->_state = TimerManager::Task::EXECUTING;
117 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000118
119 _manager->_taskCount--;
120 }
121
122 _manager->_taskMap.erase(_manager->_taskMap.begin(), expiredTaskEnd);
123 }
124 }
125
126 for(std::set<Task*>::iterator ix = expiredTasks.begin(); ix != expiredTasks.end(); ix++) {
127
128 (*ix)->run();
129
130 delete *ix;
131 }
132
Marc Slemko8a40a762006-07-19 17:46:50 +0000133 } while(_manager->_state == TimerManager::STARTED);
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000134
Marc Slemko8a40a762006-07-19 17:46:50 +0000135 {Synchronized s(_manager->_monitor);
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000136
Marc Slemko8a40a762006-07-19 17:46:50 +0000137 if(_manager->_state == TimerManager::STOPPING) {
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000138
Marc Slemko8a40a762006-07-19 17:46:50 +0000139 _manager->_state = TimerManager::STOPPED;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000140
141 _manager->_monitor.notify();
142
143 }
144 }
145
146 return;
147 }
148
149 private:
150
151 TimerManager* _manager;
152
153 friend class TimerManager;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000154};
155
Marc Slemko8a40a762006-07-19 17:46:50 +0000156TimerManager::TimerManager() :
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000157 _taskCount(0),
Marc Slemko8a40a762006-07-19 17:46:50 +0000158 _state(TimerManager::UNINITIALIZED),
159 _dispatcher(new Dispatcher(this)) {
160}
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000161
Marc Slemko8a40a762006-07-19 17:46:50 +0000162
163TimerManager::~TimerManager() {
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000164
165 /* If we haven't been explicitly stopped, do so now. We don't need to grab the monitor here, since
166 stop already takes care of reentrancy. */
167
168 if(_state != STOPPED) {
169
170 try {
171
172 stop();
173
174 } catch(...) {
175
176 // uhoh
177
178 }
179 }
Marc Slemko8a40a762006-07-19 17:46:50 +0000180}
181
182void TimerManager::start() {
183
184 bool doStart = false;
185
186 {Synchronized s(_monitor);
187
188 if(_threadFactory == NULL) {throw InvalidArgumentException();}
189
190 if(_state == TimerManager::UNINITIALIZED) {
191
192 _state = TimerManager::STARTING;
193
194 doStart = true;
195 }
196 }
197
198 if(doStart) {
199
200 _dispatcherThread = _threadFactory->newThread(_dispatcher);
201
202 _dispatcherThread->start();
203 }
204
205 {Synchronized s(_monitor);
206
207 while(_state == TimerManager::STARTING) {
208
209 _monitor.wait();
210 }
211
212 assert(_state != TimerManager::STARTING);
213 }
214}
215
216void TimerManager::stop() {
217
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000218 bool doStop = false;
219
Marc Slemko8a40a762006-07-19 17:46:50 +0000220 {Synchronized s(_monitor);
221
222 if(_state == TimerManager::UNINITIALIZED) {
223
224 _state = TimerManager::STOPPED;
225
226 } else if(_state != STOPPING && _state != STOPPED) {
227
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000228 doStop = true;
229
Marc Slemko8a40a762006-07-19 17:46:50 +0000230 _state = STOPPING;
231
232 _monitor.notifyAll();
233 }
234
235 while(_state != STOPPED) {
236
237 _monitor.wait();
238 }
239 }
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000240
241 if(doStop) {
242
243 // Clean up any outstanding tasks
244
245 for(task_iterator ix = _taskMap.begin(); ix != _taskMap.end(); ix++) {
246
247 delete ix->second;
248
249 _taskMap.erase(ix);
250 }
251
252 delete _dispatcher;
253 }
254
Marc Slemko8a40a762006-07-19 17:46:50 +0000255}
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000256
257const ThreadFactory* TimerManager::threadFactory() const {
258
259 Synchronized s(_monitor);
260
261 return _threadFactory;
262}
263
264void TimerManager::threadFactory(const ThreadFactory* value) {
265
266 Synchronized s(_monitor);
267
268 _threadFactory = value;
269}
270
Marc Slemko8a40a762006-07-19 17:46:50 +0000271size_t TimerManager::taskCount() const {
272
273 return _taskCount;
274}
275
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000276void TimerManager::add(Runnable* task, long long timeout) {
277
278 long long now = Util::currentTime();
279
280 timeout += now;
281
282 {Synchronized s(_monitor);
283
Marc Slemko8a40a762006-07-19 17:46:50 +0000284 if(_state != TimerManager::STARTED) {
285 throw IllegalStateException();
286 }
287
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000288 _taskCount++;
289
290 _taskMap.insert(std::pair<long long, Task*>(timeout, new Task(task)));
291
292 /* If the task map was empty, or if we have an expiration that is earlier than any previously seen,
293 kick the dispatcher so it can update its timeout */
294
Marc Slemko8a40a762006-07-19 17:46:50 +0000295 if(_taskCount == 1 || timeout < _taskMap.begin()->first) {
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000296
297 _monitor.notify();
298 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000299 }
300}
301
302void TimerManager::add(Runnable* task, const struct timespec& value) {
303
304 long long expiration;
305
306 Util::toMilliseconds(expiration, value);
307
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000308 long long now = Util::currentTime();
309
Marc Slemko8a40a762006-07-19 17:46:50 +0000310 if(expiration < now) {
311 throw InvalidArgumentException();
312 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000313
314 add(task, expiration - now);
315}
316
317
318void TimerManager::remove(Runnable* task) {
Marc Slemko8a40a762006-07-19 17:46:50 +0000319 {Synchronized s(_monitor);
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000320
Marc Slemko8a40a762006-07-19 17:46:50 +0000321 if(_state != TimerManager::STARTED) {
322 throw IllegalStateException();
323 }
324 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000325}
326
Marc Slemko8a40a762006-07-19 17:46:50 +0000327const TimerManager::STATE TimerManager::state() const { return _state;}
328
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000329}}} // facebook::thrift::concurrency
330