/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
19
20#include "TimerManager.h"
21#include "Exception.h"
22#include "Util.h"
23
24#include <assert.h>
25#include <iostream>
26#include <set>
27
28namespace apache { namespace thrift { namespace concurrency {
29
30using boost::shared_ptr;
31
32typedef std::multimap<int64_t, shared_ptr<TimerManager::Task> >::iterator task_iterator;
33typedef std::pair<task_iterator, task_iterator> task_range;
34
35/**
36 * TimerManager class
37 *
38 * @version $Id:$
39 */
40class TimerManager::Task : public Runnable {
41
42 public:
43 enum STATE {
44 WAITING,
45 EXECUTING,
46 CANCELLED,
47 COMPLETE
48 };
49
50 Task(shared_ptr<Runnable> runnable) :
51 runnable_(runnable),
52 state_(WAITING) {}
53
54 ~Task() {
55 }
56
57 void run() {
58 if (state_ == EXECUTING) {
59 runnable_->run();
60 state_ = COMPLETE;
61 }
62 }
63
64 private:
65 shared_ptr<Runnable> runnable_;
66 class TimerManager::Dispatcher;
67 friend class TimerManager::Dispatcher;
68 STATE state_;
69};
70
71class TimerManager::Dispatcher: public Runnable {
72
73 public:
74 Dispatcher(TimerManager* manager) :
75 manager_(manager) {}
76
77 ~Dispatcher() {}
78
79 /**
80 * Dispatcher entry point
81 *
82 * As long as dispatcher thread is running, pull tasks off the task taskMap_
83 * and execute.
84 */
85 void run() {
86 {
87 Synchronized s(manager_->monitor_);
88 if (manager_->state_ == TimerManager::STARTING) {
89 manager_->state_ = TimerManager::STARTED;
90 manager_->monitor_.notifyAll();
91 }
92 }
93
94 do {
95 std::set<shared_ptr<TimerManager::Task> > expiredTasks;
96 {
97 Synchronized s(manager_->monitor_);
98 task_iterator expiredTaskEnd;
99 int64_t now = Util::currentTime();
100 while (manager_->state_ == TimerManager::STARTED &&
101 (expiredTaskEnd = manager_->taskMap_.upper_bound(now)) == manager_->taskMap_.begin()) {
102 int64_t timeout = 0LL;
103 if (!manager_->taskMap_.empty()) {
104 timeout = manager_->taskMap_.begin()->first - now;
105 }
106 assert((timeout != 0 && manager_->taskCount_ > 0) || (timeout == 0 && manager_->taskCount_ == 0));
107 try {
108 manager_->monitor_.wait(timeout);
109 } catch (TimedOutException &e) {}
110 now = Util::currentTime();
111 }
112
113 if (manager_->state_ == TimerManager::STARTED) {
114 for (task_iterator ix = manager_->taskMap_.begin(); ix != expiredTaskEnd; ix++) {
115 shared_ptr<TimerManager::Task> task = ix->second;
116 expiredTasks.insert(task);
117 if (task->state_ == TimerManager::Task::WAITING) {
118 task->state_ = TimerManager::Task::EXECUTING;
119 }
120 manager_->taskCount_--;
121 }
122 manager_->taskMap_.erase(manager_->taskMap_.begin(), expiredTaskEnd);
123 }
124 }
125
126 for (std::set<shared_ptr<Task> >::iterator ix = expiredTasks.begin(); ix != expiredTasks.end(); ix++) {
127 (*ix)->run();
128 }
129
130 } while (manager_->state_ == TimerManager::STARTED);
131
132 {
133 Synchronized s(manager_->monitor_);
134 if (manager_->state_ == TimerManager::STOPPING) {
135 manager_->state_ = TimerManager::STOPPED;
136 manager_->monitor_.notify();
137 }
138 }
139 return;
140 }
141
142 private:
143 TimerManager* manager_;
144 friend class TimerManager;
145};
146
147TimerManager::TimerManager() :
148 taskCount_(0),
149 state_(TimerManager::UNINITIALIZED),
150 dispatcher_(shared_ptr<Dispatcher>(new Dispatcher(this))) {
151}
152
153
154TimerManager::~TimerManager() {
155
156 // If we haven't been explicitly stopped, do so now. We don't need to grab
157 // the monitor here, since stop already takes care of reentrancy.
158
159 if (state_ != STOPPED) {
160 try {
161 stop();
162 } catch(...) {
163 throw;
164 // uhoh
165 }
166 }
167}
168
169void TimerManager::start() {
170 bool doStart = false;
171 {
172 Synchronized s(monitor_);
173 if (threadFactory_ == NULL) {
174 throw InvalidArgumentException();
175 }
176 if (state_ == TimerManager::UNINITIALIZED) {
177 state_ = TimerManager::STARTING;
178 doStart = true;
179 }
180 }
181
182 if (doStart) {
183 dispatcherThread_ = threadFactory_->newThread(dispatcher_);
184 dispatcherThread_->start();
185 }
186
187 {
188 Synchronized s(monitor_);
189 while (state_ == TimerManager::STARTING) {
190 monitor_.wait();
191 }
192 assert(state_ != TimerManager::STARTING);
193 }
194}
195
196void TimerManager::stop() {
197 bool doStop = false;
198 {
199 Synchronized s(monitor_);
200 if (state_ == TimerManager::UNINITIALIZED) {
201 state_ = TimerManager::STOPPED;
202 } else if (state_ != STOPPING && state_ != STOPPED) {
203 doStop = true;
204 state_ = STOPPING;
205 monitor_.notifyAll();
206 }
207 while (state_ != STOPPED) {
208 monitor_.wait();
209 }
210 }
211
212 if (doStop) {
213 // Clean up any outstanding tasks
214 for (task_iterator ix = taskMap_.begin(); ix != taskMap_.end(); ix++) {
215 taskMap_.erase(ix);
216 }
217
218 // Remove dispatcher's reference to us.
219 dispatcher_->manager_ = NULL;
220 }
221}
222
223shared_ptr<const ThreadFactory> TimerManager::threadFactory() const {
224 Synchronized s(monitor_);
225 return threadFactory_;
226}
227
228void TimerManager::threadFactory(shared_ptr<const ThreadFactory> value) {
229 Synchronized s(monitor_);
230 threadFactory_ = value;
231}
232
233size_t TimerManager::taskCount() const {
234 return taskCount_;
235}
236
237void TimerManager::add(shared_ptr<Runnable> task, int64_t timeout) {
238 int64_t now = Util::currentTime();
239 timeout += now;
240
241 {
242 Synchronized s(monitor_);
243 if (state_ != TimerManager::STARTED) {
244 throw IllegalStateException();
245 }
246
247 taskCount_++;
248 taskMap_.insert(std::pair<int64_t, shared_ptr<Task> >(timeout, shared_ptr<Task>(new Task(task))));
249
250 // If the task map was empty, or if we have an expiration that is earlier
251 // than any previously seen, kick the dispatcher so it can update its
252 // timeout
253 if (taskCount_ == 1 || timeout < taskMap_.begin()->first) {
254 monitor_.notify();
255 }
256 }
257}
258
259void TimerManager::add(shared_ptr<Runnable> task, const struct timespec& value) {
260
261 int64_t expiration;
262 Util::toMilliseconds(expiration, value);
263
264 int64_t now = Util::currentTime();
265
266 if (expiration < now) {
267 throw InvalidArgumentException();
268 }
269
270 add(task, expiration - now);
271}
272
273
274void TimerManager::remove(shared_ptr<Runnable> task) {
275 Synchronized s(monitor_);
276 if (state_ != TimerManager::STARTED) {
277 throw IllegalStateException();
278 }
279}
280
281const TimerManager::STATE TimerManager::state() const { return state_; }
282
283}}} // apache::thrift::concurrency
284