blob: 8be8a6e70889d0a1f414d8c74b7664daa5089ae4 [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
Marc Slemko0e53ccd2006-07-17 23:51:05 +000020#include "TimerManager.h"
Marc Slemko8a40a762006-07-19 17:46:50 +000021#include "Exception.h"
Marc Slemko0e53ccd2006-07-17 23:51:05 +000022#include "Util.h"
23
24#include <assert.h>
Marc Slemko8a40a762006-07-19 17:46:50 +000025#include <iostream>
Marc Slemko0e53ccd2006-07-17 23:51:05 +000026#include <set>
27
T Jake Lucianib5e62212009-01-31 22:36:20 +000028namespace apache { namespace thrift { namespace concurrency {
Marc Slemko0e53ccd2006-07-17 23:51:05 +000029
Mark Slee5ea15f92007-03-05 22:55:59 +000030using boost::shared_ptr;
31
Mark Sleef5f2be42006-09-05 21:05:31 +000032/**
33 * TimerManager class
34 *
Mark Sleef5f2be42006-09-05 21:05:31 +000035 * @version $Id:$
36 */
Marc Slemko0e53ccd2006-07-17 23:51:05 +000037class TimerManager::Task : public Runnable {
38
Mark Sleef5f2be42006-09-05 21:05:31 +000039 public:
Marc Slemko0e53ccd2006-07-17 23:51:05 +000040 enum STATE {
41 WAITING,
42 EXECUTING,
43 CANCELLED,
44 COMPLETE
45 };
46
Marc Slemko6f038a72006-08-03 18:58:09 +000047 Task(shared_ptr<Runnable> runnable) :
Mark Slee2f6404d2006-10-10 01:37:40 +000048 runnable_(runnable),
49 state_(WAITING) {}
David Reiss0c90f6f2008-02-06 22:18:40 +000050
Marc Slemko6f038a72006-08-03 18:58:09 +000051 ~Task() {
Mark Sleef5f2be42006-09-05 21:05:31 +000052 }
David Reiss0c90f6f2008-02-06 22:18:40 +000053
Marc Slemko0e53ccd2006-07-17 23:51:05 +000054 void run() {
Mark Slee2f6404d2006-10-10 01:37:40 +000055 if (state_ == EXECUTING) {
56 runnable_->run();
57 state_ = COMPLETE;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000058 }
59 }
60
61 private:
Mark Slee2f6404d2006-10-10 01:37:40 +000062 shared_ptr<Runnable> runnable_;
Marc Slemko8a40a762006-07-19 17:46:50 +000063 friend class TimerManager::Dispatcher;
Mark Slee2f6404d2006-10-10 01:37:40 +000064 STATE state_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000065};
66
67class TimerManager::Dispatcher: public Runnable {
68
Mark Sleef5f2be42006-09-05 21:05:31 +000069 public:
David Reiss0c90f6f2008-02-06 22:18:40 +000070 Dispatcher(TimerManager* manager) :
Mark Slee2f6404d2006-10-10 01:37:40 +000071 manager_(manager) {}
David Reiss0c90f6f2008-02-06 22:18:40 +000072
Marc Slemko67606e52007-06-04 21:01:19 +000073 ~Dispatcher() {}
David Reiss0c90f6f2008-02-06 22:18:40 +000074
Mark Sleef5f2be42006-09-05 21:05:31 +000075 /**
76 * Dispatcher entry point
77 *
Mark Slee2f6404d2006-10-10 01:37:40 +000078 * As long as dispatcher thread is running, pull tasks off the task taskMap_
Mark Sleef5f2be42006-09-05 21:05:31 +000079 * and execute.
80 */
Marc Slemko0e53ccd2006-07-17 23:51:05 +000081 void run() {
Mark Sleef5f2be42006-09-05 21:05:31 +000082 {
Mark Slee2f6404d2006-10-10 01:37:40 +000083 Synchronized s(manager_->monitor_);
84 if (manager_->state_ == TimerManager::STARTING) {
David Reiss96d23882007-07-26 21:10:32 +000085 manager_->state_ = TimerManager::STARTED;
86 manager_->monitor_.notifyAll();
Marc Slemko0e53ccd2006-07-17 23:51:05 +000087 }
88 }
89
90 do {
Marc Slemko6f038a72006-08-03 18:58:09 +000091 std::set<shared_ptr<TimerManager::Task> > expiredTasks;
Mark Sleef5f2be42006-09-05 21:05:31 +000092 {
Mark Slee2f6404d2006-10-10 01:37:40 +000093 Synchronized s(manager_->monitor_);
David Reiss96d23882007-07-26 21:10:32 +000094 task_iterator expiredTaskEnd;
95 int64_t now = Util::currentTime();
David Reiss0c90f6f2008-02-06 22:18:40 +000096 while (manager_->state_ == TimerManager::STARTED &&
Mark Slee2f6404d2006-10-10 01:37:40 +000097 (expiredTaskEnd = manager_->taskMap_.upper_bound(now)) == manager_->taskMap_.begin()) {
David Reiss96d23882007-07-26 21:10:32 +000098 int64_t timeout = 0LL;
99 if (!manager_->taskMap_.empty()) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000100 timeout = manager_->taskMap_.begin()->first - now;
David Reiss96d23882007-07-26 21:10:32 +0000101 }
Aditya Agarwal3f234da2007-04-01 01:19:57 +0000102 assert((timeout != 0 && manager_->taskCount_ > 0) || (timeout == 0 && manager_->taskCount_ == 0));
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000103 try {
Mark Slee2782d6d2007-05-23 04:55:30 +0000104 manager_->monitor_.wait(timeout);
Roger Meierb69d24d2012-10-04 18:02:15 +0000105 } catch (TimedOutException &) {}
David Reiss96d23882007-07-26 21:10:32 +0000106 now = Util::currentTime();
107 }
David Reiss0c90f6f2008-02-06 22:18:40 +0000108
David Reiss96d23882007-07-26 21:10:32 +0000109 if (manager_->state_ == TimerManager::STARTED) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000110 for (task_iterator ix = manager_->taskMap_.begin(); ix != expiredTaskEnd; ix++) {
David Reiss96d23882007-07-26 21:10:32 +0000111 shared_ptr<TimerManager::Task> task = ix->second;
Mark Sleef5f2be42006-09-05 21:05:31 +0000112 expiredTasks.insert(task);
David Reiss96d23882007-07-26 21:10:32 +0000113 if (task->state_ == TimerManager::Task::WAITING) {
114 task->state_ = TimerManager::Task::EXECUTING;
115 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000116 manager_->taskCount_--;
David Reiss96d23882007-07-26 21:10:32 +0000117 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000118 manager_->taskMap_.erase(manager_->taskMap_.begin(), expiredTaskEnd);
David Reiss96d23882007-07-26 21:10:32 +0000119 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000120 }
David Reiss0c90f6f2008-02-06 22:18:40 +0000121
Mark Sleef5f2be42006-09-05 21:05:31 +0000122 for (std::set<shared_ptr<Task> >::iterator ix = expiredTasks.begin(); ix != expiredTasks.end(); ix++) {
123 (*ix)->run();
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000124 }
David Reiss0c90f6f2008-02-06 22:18:40 +0000125
Mark Slee2f6404d2006-10-10 01:37:40 +0000126 } while (manager_->state_ == TimerManager::STARTED);
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000127
Mark Sleef5f2be42006-09-05 21:05:31 +0000128 {
Mark Slee2f6404d2006-10-10 01:37:40 +0000129 Synchronized s(manager_->monitor_);
130 if (manager_->state_ == TimerManager::STOPPING) {
David Reiss0c90f6f2008-02-06 22:18:40 +0000131 manager_->state_ = TimerManager::STOPPED;
David Reiss96d23882007-07-26 21:10:32 +0000132 manager_->monitor_.notify();
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000133 }
134 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000135 return;
136 }
137
138 private:
Mark Slee2f6404d2006-10-10 01:37:40 +0000139 TimerManager* manager_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000140 friend class TimerManager;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000141};
142
Roger Meierb69d24d2012-10-04 18:02:15 +0000143#if defined(_MSC_VER)
144#pragma warning(push)
145#pragma warning(disable: 4355) // 'this' used in base member initializer list
146#endif
147
Marc Slemko8a40a762006-07-19 17:46:50 +0000148TimerManager::TimerManager() :
Mark Slee2f6404d2006-10-10 01:37:40 +0000149 taskCount_(0),
150 state_(TimerManager::UNINITIALIZED),
151 dispatcher_(shared_ptr<Dispatcher>(new Dispatcher(this))) {
Marc Slemko8a40a762006-07-19 17:46:50 +0000152}
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000153
Roger Meierb69d24d2012-10-04 18:02:15 +0000154#if defined(_MSC_VER)
155#pragma warning(pop)
156#endif
Marc Slemko8a40a762006-07-19 17:46:50 +0000157
158TimerManager::~TimerManager() {
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000159
Mark Sleef5f2be42006-09-05 21:05:31 +0000160 // If we haven't been explicitly stopped, do so now. We don't need to grab
161 // the monitor here, since stop already takes care of reentrancy.
David Reiss0c90f6f2008-02-06 22:18:40 +0000162
Mark Slee2f6404d2006-10-10 01:37:40 +0000163 if (state_ != STOPPED) {
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000164 try {
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000165 stop();
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000166 } catch(...) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000167 throw;
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000168 // uhoh
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000169 }
170 }
Marc Slemko8a40a762006-07-19 17:46:50 +0000171}
172
173void TimerManager::start() {
Marc Slemko8a40a762006-07-19 17:46:50 +0000174 bool doStart = false;
Mark Sleef5f2be42006-09-05 21:05:31 +0000175 {
Mark Slee2f6404d2006-10-10 01:37:40 +0000176 Synchronized s(monitor_);
177 if (threadFactory_ == NULL) {
Mark Sleef5f2be42006-09-05 21:05:31 +0000178 throw InvalidArgumentException();
179 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000180 if (state_ == TimerManager::UNINITIALIZED) {
181 state_ = TimerManager::STARTING;
Marc Slemko8a40a762006-07-19 17:46:50 +0000182 doStart = true;
183 }
184 }
185
Mark Sleef5f2be42006-09-05 21:05:31 +0000186 if (doStart) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000187 dispatcherThread_ = threadFactory_->newThread(dispatcher_);
188 dispatcherThread_->start();
Marc Slemko8a40a762006-07-19 17:46:50 +0000189 }
190
Mark Sleef5f2be42006-09-05 21:05:31 +0000191 {
Mark Slee2f6404d2006-10-10 01:37:40 +0000192 Synchronized s(monitor_);
193 while (state_ == TimerManager::STARTING) {
194 monitor_.wait();
Marc Slemko8a40a762006-07-19 17:46:50 +0000195 }
Aditya Agarwal3f234da2007-04-01 01:19:57 +0000196 assert(state_ != TimerManager::STARTING);
Marc Slemko8a40a762006-07-19 17:46:50 +0000197 }
198}
199
200void TimerManager::stop() {
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000201 bool doStop = false;
Mark Sleef5f2be42006-09-05 21:05:31 +0000202 {
Mark Slee2f6404d2006-10-10 01:37:40 +0000203 Synchronized s(monitor_);
204 if (state_ == TimerManager::UNINITIALIZED) {
205 state_ = TimerManager::STOPPED;
206 } else if (state_ != STOPPING && state_ != STOPPED) {
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000207 doStop = true;
Mark Slee2f6404d2006-10-10 01:37:40 +0000208 state_ = STOPPING;
209 monitor_.notifyAll();
Marc Slemko8a40a762006-07-19 17:46:50 +0000210 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000211 while (state_ != STOPPED) {
212 monitor_.wait();
Marc Slemko8a40a762006-07-19 17:46:50 +0000213 }
214 }
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000215
Mark Sleef5f2be42006-09-05 21:05:31 +0000216 if (doStop) {
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000217 // Clean up any outstanding tasks
David Reiss5fa20da2009-06-04 00:32:47 +0000218 taskMap_.clear();
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000219
David Reiss0c90f6f2008-02-06 22:18:40 +0000220 // Remove dispatcher's reference to us.
Mark Slee2f6404d2006-10-10 01:37:40 +0000221 dispatcher_->manager_ = NULL;
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000222 }
Marc Slemko8a40a762006-07-19 17:46:50 +0000223}
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000224
Marc Slemko6f038a72006-08-03 18:58:09 +0000225shared_ptr<const ThreadFactory> TimerManager::threadFactory() const {
David Reiss0c90f6f2008-02-06 22:18:40 +0000226 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000227 return threadFactory_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000228}
David Reiss0c90f6f2008-02-06 22:18:40 +0000229
Marc Slemko6f038a72006-08-03 18:58:09 +0000230void TimerManager::threadFactory(shared_ptr<const ThreadFactory> value) {
David Reiss0c90f6f2008-02-06 22:18:40 +0000231 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000232 threadFactory_ = value;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000233}
234
Marc Slemko8a40a762006-07-19 17:46:50 +0000235size_t TimerManager::taskCount() const {
Mark Slee2f6404d2006-10-10 01:37:40 +0000236 return taskCount_;
Marc Slemko8a40a762006-07-19 17:46:50 +0000237}
David Reiss0c90f6f2008-02-06 22:18:40 +0000238
Mark Slee9b82d272007-05-23 05:16:07 +0000239void TimerManager::add(shared_ptr<Runnable> task, int64_t timeout) {
240 int64_t now = Util::currentTime();
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000241 timeout += now;
242
Mark Sleef5f2be42006-09-05 21:05:31 +0000243 {
David Reiss0c90f6f2008-02-06 22:18:40 +0000244 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000245 if (state_ != TimerManager::STARTED) {
Marc Slemko8a40a762006-07-19 17:46:50 +0000246 throw IllegalStateException();
247 }
248
David Reiss52687eb2009-06-04 00:32:57 +0000249 // If the task map is empty, we will kick the dispatcher for sure. Otherwise, we kick him
250 // if the expiration time is shorter than the current value. Need to test before we insert,
251 // because the new task might insert at the front.
252 bool notifyRequired = (taskCount_ == 0) ? true : timeout < taskMap_.begin()->first;
253
Mark Slee2f6404d2006-10-10 01:37:40 +0000254 taskCount_++;
Mark Slee9b82d272007-05-23 05:16:07 +0000255 taskMap_.insert(std::pair<int64_t, shared_ptr<Task> >(timeout, shared_ptr<Task>(new Task(task))));
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000256
Mark Sleef5f2be42006-09-05 21:05:31 +0000257 // If the task map was empty, or if we have an expiration that is earlier
258 // than any previously seen, kick the dispatcher so it can update its
259 // timeout
David Reiss52687eb2009-06-04 00:32:57 +0000260 if (notifyRequired) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000261 monitor_.notify();
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000262 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000263 }
264}
265
Marc Slemko6f038a72006-08-03 18:58:09 +0000266void TimerManager::add(shared_ptr<Runnable> task, const struct timespec& value) {
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000267
Mark Slee9b82d272007-05-23 05:16:07 +0000268 int64_t expiration;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000269 Util::toMilliseconds(expiration, value);
270
Mark Slee9b82d272007-05-23 05:16:07 +0000271 int64_t now = Util::currentTime();
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000272
Mark Sleef5f2be42006-09-05 21:05:31 +0000273 if (expiration < now) {
Marc Slemko8a40a762006-07-19 17:46:50 +0000274 throw InvalidArgumentException();
275 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000276
277 add(task, expiration - now);
278}
279
280
Marc Slemko6f038a72006-08-03 18:58:09 +0000281void TimerManager::remove(shared_ptr<Runnable> task) {
Roger Meier3b771a12010-11-17 22:11:26 +0000282 (void) task;
David Reiss0c90f6f2008-02-06 22:18:40 +0000283 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000284 if (state_ != TimerManager::STARTED) {
Mark Sleef5f2be42006-09-05 21:05:31 +0000285 throw IllegalStateException();
Marc Slemko8a40a762006-07-19 17:46:50 +0000286 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000287}
288
Roger Meier3b771a12010-11-17 22:11:26 +0000289TimerManager::STATE TimerManager::state() const { return state_; }
Marc Slemko8a40a762006-07-19 17:46:50 +0000290
T Jake Lucianib5e62212009-01-31 22:36:20 +0000291}}} // apache::thrift::concurrency
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000292