blob: bd68264cec4a0c1b68dbd7afb5bd0a2a76d47c80 [file] [log] [blame]
Marc Slemko0e53ccd2006-07-17 23:51:05 +00001#include "TimerManager.h"
2#include "Util.h"
3
4#include <assert.h>
5
6#include <set>
7
8namespace facebook { namespace thrift { namespace concurrency {
9
10/** TimerManager class
11
12 @author marc
13 @version $Id:$ */
14
15typedef std::multimap<long long, TimerManager::Task*>::iterator task_iterator;
16typedef std::pair<task_iterator, task_iterator> task_range;
17
18class TimerManager::Task : public Runnable {
19
20public:
21 enum STATE {
22 WAITING,
23 EXECUTING,
24 CANCELLED,
25 COMPLETE
26 };
27
28 Task(Runnable* runnable) :
29 _runnable(runnable),
30 _state(WAITING)
31 {}
32
33 ~Task() {};
34
35 void run() {
36 if(_state == EXECUTING) {
37 _runnable->run();
38 _state = COMPLETE;
39 }
40 }
41
42 private:
43
44 Runnable* _runnable;
45
46 STATE _state;
47};
48
49class TimerManager::Dispatcher: public Runnable {
50
51 enum STATE {
52 UNINITIALIZED,
53 STARTING,
54 STARTED,
55 STOPPING,
56 STOPPED
57 };
58
59public:
60 Dispatcher(TimerManager* manager) :
61 _manager(manager),
62 _state(UNINITIALIZED)
63 {}
64
65 ~Dispatcher() {}
66
67 /** Dispatcher entry point
68
69 As long as dispatcher thread is running, pull tasks off the task _taskMap and execute. */
70
71 void run() {
72
73 {Synchronized(_manager->_monitor);
74
75 if(_state == STARTING) {
76 _state = STARTED;
77 }
78 }
79
80 do {
81
82 std::set<TimerManager::Task*> expiredTasks;
83
84 {Synchronized(_manager->_monitor);
85
86 long long now = Util::currentTime();
87
88 task_iterator expiredTaskEnd;
89
90 while(_state == STARTED &&
91 (expiredTaskEnd = _manager->_taskMap.upper_bound(now)) == _manager->_taskMap.end()) {
92
93 _manager->_monitor.wait(_manager->_nextTimeout - now);
94
95 }
96
97 if(_state == STARTED) {
98
99 for(task_iterator ix = _manager->_taskMap.begin(); ix != expiredTaskEnd; ix++) {
100
101 TimerManager::Task* task = ix->second;
102
103 expiredTasks.insert(task);
104
105 _manager->_taskCount--;
106 }
107
108 _manager->_taskMap.erase(_manager->_taskMap.begin(), expiredTaskEnd);
109 }
110 }
111
112 for(std::set<Task*>::iterator ix = expiredTasks.begin(); ix != expiredTasks.end(); ix++) {
113
114 (*ix)->run();
115
116 delete *ix;
117 }
118
119 } while(_state == STARTED);
120
121 {Synchronized(_manager->_monitor);
122
123 if(_state == STOPPING) {
124
125 _state = STOPPED;
126
127 _manager->_monitor.notify();
128
129 }
130 }
131
132 return;
133 }
134
135 private:
136
137 TimerManager* _manager;
138
139 friend class TimerManager;
140
141 STATE _state;
142};
143
144TimerManager::TimerManager() {}
145
146TimerManager::~TimerManager() {}
147
148const ThreadFactory* TimerManager::threadFactory() const {
149
150 Synchronized s(_monitor);
151
152 return _threadFactory;
153}
154
155void TimerManager::threadFactory(const ThreadFactory* value) {
156
157 Synchronized s(_monitor);
158
159 _threadFactory = value;
160}
161
162void TimerManager::add(Runnable* task, long long timeout) {
163
164 long long now = Util::currentTime();
165
166 timeout += now;
167
168 {Synchronized s(_monitor);
169
170 _taskCount++;
171
172 _taskMap.insert(std::pair<long long, Task*>(timeout, new Task(task)));
173
174 /* If the task map was empty, or if we have an expiration that is earlier than any previously seen,
175 kick the dispatcher so it can update its timeout */
176
177 if(_taskCount == 1 || timeout < _nextTimeout) {
178
179 _monitor.notify();
180 }
181
182 if(timeout < _nextTimeout) {
183
184 _nextTimeout = timeout;
185 }
186 }
187}
188
189void TimerManager::add(Runnable* task, const struct timespec& value) {
190
191 long long expiration;
192
193 Util::toMilliseconds(expiration, value);
194
195 /* XXX
196 Need to convert this to an explicit exception */
197
198 long long now = Util::currentTime();
199
200 assert(expiration < now);
201
202 add(task, expiration - now);
203}
204
205
206void TimerManager::remove(Runnable* task) {
207
208}
209
210}}} // facebook::thrift::concurrency
211