blob: 015ffba5c3e105bbd1ceefb19037bc2239fdf84c [file] [log] [blame]
Marc Slemko0e53ccd2006-07-17 23:51:05 +00001#include "TimerManager.h"
Marc Slemko8a40a762006-07-19 17:46:50 +00002#include "Exception.h"
Marc Slemko0e53ccd2006-07-17 23:51:05 +00003#include "Util.h"
4
5#include <assert.h>
Marc Slemko8a40a762006-07-19 17:46:50 +00006#include <iostream>
Marc Slemko0e53ccd2006-07-17 23:51:05 +00007#include <set>
8
9namespace facebook { namespace thrift { namespace concurrency {
10
11/** TimerManager class
12
13 @author marc
14 @version $Id:$ */
15
16typedef std::multimap<long long, TimerManager::Task*>::iterator task_iterator;
17typedef std::pair<task_iterator, task_iterator> task_range;
18
19class TimerManager::Task : public Runnable {
20
21public:
22 enum STATE {
23 WAITING,
24 EXECUTING,
25 CANCELLED,
26 COMPLETE
27 };
28
29 Task(Runnable* runnable) :
30 _runnable(runnable),
31 _state(WAITING)
32 {}
33
34 ~Task() {};
35
36 void run() {
37 if(_state == EXECUTING) {
38 _runnable->run();
39 _state = COMPLETE;
40 }
41 }
42
43 private:
44
45 Runnable* _runnable;
46
Marc Slemko8a40a762006-07-19 17:46:50 +000047 class TimerManager::Dispatcher;
48
49 friend class TimerManager::Dispatcher;
50
Marc Slemko0e53ccd2006-07-17 23:51:05 +000051 STATE _state;
52};
53
54class TimerManager::Dispatcher: public Runnable {
55
Marc Slemko0e53ccd2006-07-17 23:51:05 +000056public:
57 Dispatcher(TimerManager* manager) :
Marc Slemko8a40a762006-07-19 17:46:50 +000058 _manager(manager) {
59}
Marc Slemko0e53ccd2006-07-17 23:51:05 +000060
61 ~Dispatcher() {}
62
63 /** Dispatcher entry point
64
65 As long as dispatcher thread is running, pull tasks off the task _taskMap and execute. */
66
67 void run() {
68
Marc Slemko8a40a762006-07-19 17:46:50 +000069 {Synchronized s(_manager->_monitor);
Marc Slemko0e53ccd2006-07-17 23:51:05 +000070
Marc Slemko8a40a762006-07-19 17:46:50 +000071 if(_manager->_state == TimerManager::STARTING) {
72
73 _manager->_state = TimerManager::STARTED;
74
75 _manager->_monitor.notifyAll();
Marc Slemko0e53ccd2006-07-17 23:51:05 +000076 }
77 }
78
79 do {
80
81 std::set<TimerManager::Task*> expiredTasks;
82
Marc Slemko8a40a762006-07-19 17:46:50 +000083 {Synchronized s(_manager->_monitor);
84
85 /* Update next timeout if necessary */
Marc Slemko0e53ccd2006-07-17 23:51:05 +000086
87 task_iterator expiredTaskEnd;
Marc Slemko8a40a762006-07-19 17:46:50 +000088
89 while(_manager->_state == TimerManager::STARTED &&
90 (expiredTaskEnd = _manager->_taskMap.upper_bound(Util::currentTime())) == _manager->_taskMap.begin()) {
91
92 long long timeout = 0LL;
93
94 if(!_manager->_taskMap.empty()) {
95
96 timeout = Util::currentTime() - _manager->_taskMap.begin()->first;
97 }
98
99 _manager->_monitor.wait(timeout);
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000100
101 }
102
Marc Slemko8a40a762006-07-19 17:46:50 +0000103 if(_manager->_state == TimerManager::STARTED) {
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000104
105 for(task_iterator ix = _manager->_taskMap.begin(); ix != expiredTaskEnd; ix++) {
106
107 TimerManager::Task* task = ix->second;
108
109 expiredTasks.insert(task);
Marc Slemko8a40a762006-07-19 17:46:50 +0000110
111 if(task->_state == TimerManager::Task::WAITING) {
112
113 task->_state = TimerManager::Task::EXECUTING;
114 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000115
116 _manager->_taskCount--;
117 }
118
119 _manager->_taskMap.erase(_manager->_taskMap.begin(), expiredTaskEnd);
120 }
121 }
122
123 for(std::set<Task*>::iterator ix = expiredTasks.begin(); ix != expiredTasks.end(); ix++) {
124
125 (*ix)->run();
126
127 delete *ix;
128 }
129
Marc Slemko8a40a762006-07-19 17:46:50 +0000130 } while(_manager->_state == TimerManager::STARTED);
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000131
Marc Slemko8a40a762006-07-19 17:46:50 +0000132 {Synchronized s(_manager->_monitor);
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000133
Marc Slemko8a40a762006-07-19 17:46:50 +0000134 if(_manager->_state == TimerManager::STOPPING) {
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000135
Marc Slemko8a40a762006-07-19 17:46:50 +0000136 _manager->_state = TimerManager::STOPPED;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000137
138 _manager->_monitor.notify();
139
140 }
141 }
142
143 return;
144 }
145
146 private:
147
148 TimerManager* _manager;
149
150 friend class TimerManager;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000151};
152
Marc Slemko8a40a762006-07-19 17:46:50 +0000153TimerManager::TimerManager() :
154 _state(TimerManager::UNINITIALIZED),
155 _dispatcher(new Dispatcher(this)) {
156}
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000157
Marc Slemko8a40a762006-07-19 17:46:50 +0000158
159TimerManager::~TimerManager() {
160 delete _dispatcher;
161}
162
163void TimerManager::start() {
164
165 bool doStart = false;
166
167 {Synchronized s(_monitor);
168
169 if(_threadFactory == NULL) {throw InvalidArgumentException();}
170
171 if(_state == TimerManager::UNINITIALIZED) {
172
173 _state = TimerManager::STARTING;
174
175 doStart = true;
176 }
177 }
178
179 if(doStart) {
180
181 _dispatcherThread = _threadFactory->newThread(_dispatcher);
182
183 _dispatcherThread->start();
184 }
185
186 {Synchronized s(_monitor);
187
188 while(_state == TimerManager::STARTING) {
189
190 _monitor.wait();
191 }
192
193 assert(_state != TimerManager::STARTING);
194 }
195}
196
197void TimerManager::stop() {
198
199 {Synchronized s(_monitor);
200
201 if(_state == TimerManager::UNINITIALIZED) {
202
203 _state = TimerManager::STOPPED;
204
205 } else if(_state != STOPPING && _state != STOPPED) {
206
207 _state = STOPPING;
208
209 _monitor.notifyAll();
210 }
211
212 while(_state != STOPPED) {
213
214 _monitor.wait();
215 }
216 }
217}
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000218
219const ThreadFactory* TimerManager::threadFactory() const {
220
221 Synchronized s(_monitor);
222
223 return _threadFactory;
224}
225
226void TimerManager::threadFactory(const ThreadFactory* value) {
227
228 Synchronized s(_monitor);
229
230 _threadFactory = value;
231}
232
Marc Slemko8a40a762006-07-19 17:46:50 +0000233size_t TimerManager::taskCount() const {
234
235 return _taskCount;
236}
237
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000238void TimerManager::add(Runnable* task, long long timeout) {
239
240 long long now = Util::currentTime();
241
242 timeout += now;
243
244 {Synchronized s(_monitor);
245
Marc Slemko8a40a762006-07-19 17:46:50 +0000246 if(_state != TimerManager::STARTED) {
247 throw IllegalStateException();
248 }
249
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000250 _taskCount++;
251
252 _taskMap.insert(std::pair<long long, Task*>(timeout, new Task(task)));
253
254 /* If the task map was empty, or if we have an expiration that is earlier than any previously seen,
255 kick the dispatcher so it can update its timeout */
256
Marc Slemko8a40a762006-07-19 17:46:50 +0000257 if(_taskCount == 1 || timeout < _taskMap.begin()->first) {
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000258
259 _monitor.notify();
260 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000261 }
262}
263
264void TimerManager::add(Runnable* task, const struct timespec& value) {
265
266 long long expiration;
267
268 Util::toMilliseconds(expiration, value);
269
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000270 long long now = Util::currentTime();
271
Marc Slemko8a40a762006-07-19 17:46:50 +0000272 if(expiration < now) {
273 throw InvalidArgumentException();
274 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000275
276 add(task, expiration - now);
277}
278
279
280void TimerManager::remove(Runnable* task) {
Marc Slemko8a40a762006-07-19 17:46:50 +0000281 {Synchronized s(_monitor);
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000282
Marc Slemko8a40a762006-07-19 17:46:50 +0000283 if(_state != TimerManager::STARTED) {
284 throw IllegalStateException();
285 }
286 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000287}
288
Marc Slemko8a40a762006-07-19 17:46:50 +0000289const TimerManager::STATE TimerManager::state() const { return _state;}
290
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000291}}} // facebook::thrift::concurrency
292