blob: 122d26ed8d71b3433460a2add5eec7120557a7bb [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
Roger Meier4285ba22013-06-10 21:17:23 +020020#include <thrift/concurrency/TimerManager.h>
21#include <thrift/concurrency/Exception.h>
22#include <thrift/concurrency/Util.h>
Marc Slemko0e53ccd2006-07-17 23:51:05 +000023
24#include <assert.h>
Marc Slemko8a40a762006-07-19 17:46:50 +000025#include <iostream>
Marc Slemko0e53ccd2006-07-17 23:51:05 +000026#include <set>
27
Konrad Grochowski16a23a62014-11-13 15:33:38 +010028namespace apache {
29namespace thrift {
30namespace concurrency {
Marc Slemko0e53ccd2006-07-17 23:51:05 +000031
Mark Slee5ea15f92007-03-05 22:55:59 +000032using boost::shared_ptr;
33
Mark Sleef5f2be42006-09-05 21:05:31 +000034/**
35 * TimerManager class
36 *
Mark Sleef5f2be42006-09-05 21:05:31 +000037 * @version $Id:$
38 */
Marc Slemko0e53ccd2006-07-17 23:51:05 +000039class TimerManager::Task : public Runnable {
40
Konrad Grochowski16a23a62014-11-13 15:33:38 +010041public:
42 enum STATE { WAITING, EXECUTING, CANCELLED, COMPLETE };
Marc Slemko0e53ccd2006-07-17 23:51:05 +000043
Konrad Grochowski16a23a62014-11-13 15:33:38 +010044 Task(shared_ptr<Runnable> runnable) : runnable_(runnable), state_(WAITING) {}
David Reiss0c90f6f2008-02-06 22:18:40 +000045
Konrad Grochowski16a23a62014-11-13 15:33:38 +010046 ~Task() {}
David Reiss0c90f6f2008-02-06 22:18:40 +000047
Marc Slemko0e53ccd2006-07-17 23:51:05 +000048 void run() {
Mark Slee2f6404d2006-10-10 01:37:40 +000049 if (state_ == EXECUTING) {
50 runnable_->run();
51 state_ = COMPLETE;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000052 }
53 }
54
Konrad Grochowski16a23a62014-11-13 15:33:38 +010055private:
Mark Slee2f6404d2006-10-10 01:37:40 +000056 shared_ptr<Runnable> runnable_;
Marc Slemko8a40a762006-07-19 17:46:50 +000057 friend class TimerManager::Dispatcher;
Mark Slee2f6404d2006-10-10 01:37:40 +000058 STATE state_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000059};
60
Konrad Grochowski16a23a62014-11-13 15:33:38 +010061class TimerManager::Dispatcher : public Runnable {
Marc Slemko0e53ccd2006-07-17 23:51:05 +000062
Konrad Grochowski16a23a62014-11-13 15:33:38 +010063public:
64 Dispatcher(TimerManager* manager) : manager_(manager) {}
David Reiss0c90f6f2008-02-06 22:18:40 +000065
Marc Slemko67606e52007-06-04 21:01:19 +000066 ~Dispatcher() {}
David Reiss0c90f6f2008-02-06 22:18:40 +000067
Mark Sleef5f2be42006-09-05 21:05:31 +000068 /**
69 * Dispatcher entry point
70 *
Mark Slee2f6404d2006-10-10 01:37:40 +000071 * As long as dispatcher thread is running, pull tasks off the task taskMap_
Mark Sleef5f2be42006-09-05 21:05:31 +000072 * and execute.
73 */
Marc Slemko0e53ccd2006-07-17 23:51:05 +000074 void run() {
Mark Sleef5f2be42006-09-05 21:05:31 +000075 {
Mark Slee2f6404d2006-10-10 01:37:40 +000076 Synchronized s(manager_->monitor_);
77 if (manager_->state_ == TimerManager::STARTING) {
David Reiss96d23882007-07-26 21:10:32 +000078 manager_->state_ = TimerManager::STARTED;
79 manager_->monitor_.notifyAll();
Marc Slemko0e53ccd2006-07-17 23:51:05 +000080 }
81 }
82
83 do {
Marc Slemko6f038a72006-08-03 18:58:09 +000084 std::set<shared_ptr<TimerManager::Task> > expiredTasks;
Mark Sleef5f2be42006-09-05 21:05:31 +000085 {
Mark Slee2f6404d2006-10-10 01:37:40 +000086 Synchronized s(manager_->monitor_);
David Reiss96d23882007-07-26 21:10:32 +000087 task_iterator expiredTaskEnd;
88 int64_t now = Util::currentTime();
Konrad Grochowski16a23a62014-11-13 15:33:38 +010089 while (manager_->state_ == TimerManager::STARTED
90 && (expiredTaskEnd = manager_->taskMap_.upper_bound(now))
91 == manager_->taskMap_.begin()) {
David Reiss96d23882007-07-26 21:10:32 +000092 int64_t timeout = 0LL;
93 if (!manager_->taskMap_.empty()) {
Mark Slee2f6404d2006-10-10 01:37:40 +000094 timeout = manager_->taskMap_.begin()->first - now;
David Reiss96d23882007-07-26 21:10:32 +000095 }
Konrad Grochowski16a23a62014-11-13 15:33:38 +010096 assert((timeout != 0 && manager_->taskCount_ > 0)
97 || (timeout == 0 && manager_->taskCount_ == 0));
Marc Slemko3a3b53b2007-05-22 23:59:54 +000098 try {
Mark Slee2782d6d2007-05-23 04:55:30 +000099 manager_->monitor_.wait(timeout);
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100100 } catch (TimedOutException&) {
101 }
David Reiss96d23882007-07-26 21:10:32 +0000102 now = Util::currentTime();
103 }
David Reiss0c90f6f2008-02-06 22:18:40 +0000104
David Reiss96d23882007-07-26 21:10:32 +0000105 if (manager_->state_ == TimerManager::STARTED) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000106 for (task_iterator ix = manager_->taskMap_.begin(); ix != expiredTaskEnd; ix++) {
David Reiss96d23882007-07-26 21:10:32 +0000107 shared_ptr<TimerManager::Task> task = ix->second;
Mark Sleef5f2be42006-09-05 21:05:31 +0000108 expiredTasks.insert(task);
David Reiss96d23882007-07-26 21:10:32 +0000109 if (task->state_ == TimerManager::Task::WAITING) {
110 task->state_ = TimerManager::Task::EXECUTING;
111 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000112 manager_->taskCount_--;
David Reiss96d23882007-07-26 21:10:32 +0000113 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000114 manager_->taskMap_.erase(manager_->taskMap_.begin(), expiredTaskEnd);
David Reiss96d23882007-07-26 21:10:32 +0000115 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000116 }
David Reiss0c90f6f2008-02-06 22:18:40 +0000117
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100118 for (std::set<shared_ptr<Task> >::iterator ix = expiredTasks.begin();
119 ix != expiredTasks.end();
Roger Meier71f2d8a2015-04-26 17:00:04 +0200120 ++ix) {
Mark Sleef5f2be42006-09-05 21:05:31 +0000121 (*ix)->run();
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000122 }
David Reiss0c90f6f2008-02-06 22:18:40 +0000123
Mark Slee2f6404d2006-10-10 01:37:40 +0000124 } while (manager_->state_ == TimerManager::STARTED);
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000125
Mark Sleef5f2be42006-09-05 21:05:31 +0000126 {
Mark Slee2f6404d2006-10-10 01:37:40 +0000127 Synchronized s(manager_->monitor_);
128 if (manager_->state_ == TimerManager::STOPPING) {
David Reiss0c90f6f2008-02-06 22:18:40 +0000129 manager_->state_ = TimerManager::STOPPED;
David Reiss96d23882007-07-26 21:10:32 +0000130 manager_->monitor_.notify();
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000131 }
132 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000133 return;
134 }
135
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100136private:
Mark Slee2f6404d2006-10-10 01:37:40 +0000137 TimerManager* manager_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000138 friend class TimerManager;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000139};
140
Roger Meierb69d24d2012-10-04 18:02:15 +0000141#if defined(_MSC_VER)
142#pragma warning(push)
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100143#pragma warning(disable : 4355) // 'this' used in base member initializer list
Roger Meierb69d24d2012-10-04 18:02:15 +0000144#endif
145
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100146TimerManager::TimerManager()
147 : taskCount_(0),
148 state_(TimerManager::UNINITIALIZED),
149 dispatcher_(shared_ptr<Dispatcher>(new Dispatcher(this))) {
Marc Slemko8a40a762006-07-19 17:46:50 +0000150}
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000151
Roger Meierb69d24d2012-10-04 18:02:15 +0000152#if defined(_MSC_VER)
153#pragma warning(pop)
154#endif
Marc Slemko8a40a762006-07-19 17:46:50 +0000155
156TimerManager::~TimerManager() {
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000157
Mark Sleef5f2be42006-09-05 21:05:31 +0000158 // If we haven't been explicitly stopped, do so now. We don't need to grab
159 // the monitor here, since stop already takes care of reentrancy.
David Reiss0c90f6f2008-02-06 22:18:40 +0000160
Mark Slee2f6404d2006-10-10 01:37:40 +0000161 if (state_ != STOPPED) {
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000162 try {
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000163 stop();
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100164 } catch (...) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000165 throw;
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000166 // uhoh
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000167 }
168 }
Marc Slemko8a40a762006-07-19 17:46:50 +0000169}
170
171void TimerManager::start() {
Marc Slemko8a40a762006-07-19 17:46:50 +0000172 bool doStart = false;
Mark Sleef5f2be42006-09-05 21:05:31 +0000173 {
Mark Slee2f6404d2006-10-10 01:37:40 +0000174 Synchronized s(monitor_);
Roger Meier72957452013-06-29 00:28:50 +0200175 if (!threadFactory_) {
Mark Sleef5f2be42006-09-05 21:05:31 +0000176 throw InvalidArgumentException();
177 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000178 if (state_ == TimerManager::UNINITIALIZED) {
179 state_ = TimerManager::STARTING;
Marc Slemko8a40a762006-07-19 17:46:50 +0000180 doStart = true;
181 }
182 }
183
Mark Sleef5f2be42006-09-05 21:05:31 +0000184 if (doStart) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000185 dispatcherThread_ = threadFactory_->newThread(dispatcher_);
186 dispatcherThread_->start();
Marc Slemko8a40a762006-07-19 17:46:50 +0000187 }
188
Mark Sleef5f2be42006-09-05 21:05:31 +0000189 {
Mark Slee2f6404d2006-10-10 01:37:40 +0000190 Synchronized s(monitor_);
191 while (state_ == TimerManager::STARTING) {
192 monitor_.wait();
Marc Slemko8a40a762006-07-19 17:46:50 +0000193 }
Aditya Agarwal3f234da2007-04-01 01:19:57 +0000194 assert(state_ != TimerManager::STARTING);
Marc Slemko8a40a762006-07-19 17:46:50 +0000195 }
196}
197
198void TimerManager::stop() {
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000199 bool doStop = false;
Mark Sleef5f2be42006-09-05 21:05:31 +0000200 {
Mark Slee2f6404d2006-10-10 01:37:40 +0000201 Synchronized s(monitor_);
202 if (state_ == TimerManager::UNINITIALIZED) {
203 state_ = TimerManager::STOPPED;
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100204 } else if (state_ != STOPPING && state_ != STOPPED) {
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000205 doStop = true;
Mark Slee2f6404d2006-10-10 01:37:40 +0000206 state_ = STOPPING;
207 monitor_.notifyAll();
Marc Slemko8a40a762006-07-19 17:46:50 +0000208 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000209 while (state_ != STOPPED) {
210 monitor_.wait();
Marc Slemko8a40a762006-07-19 17:46:50 +0000211 }
212 }
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000213
Mark Sleef5f2be42006-09-05 21:05:31 +0000214 if (doStop) {
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000215 // Clean up any outstanding tasks
David Reiss5fa20da2009-06-04 00:32:47 +0000216 taskMap_.clear();
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000217
David Reiss0c90f6f2008-02-06 22:18:40 +0000218 // Remove dispatcher's reference to us.
Mark Slee2f6404d2006-10-10 01:37:40 +0000219 dispatcher_->manager_ = NULL;
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000220 }
Marc Slemko8a40a762006-07-19 17:46:50 +0000221}
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000222
Marc Slemko6f038a72006-08-03 18:58:09 +0000223shared_ptr<const ThreadFactory> TimerManager::threadFactory() const {
David Reiss0c90f6f2008-02-06 22:18:40 +0000224 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000225 return threadFactory_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000226}
David Reiss0c90f6f2008-02-06 22:18:40 +0000227
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100228void TimerManager::threadFactory(shared_ptr<const ThreadFactory> value) {
David Reiss0c90f6f2008-02-06 22:18:40 +0000229 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000230 threadFactory_ = value;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000231}
232
Marc Slemko8a40a762006-07-19 17:46:50 +0000233size_t TimerManager::taskCount() const {
Mark Slee2f6404d2006-10-10 01:37:40 +0000234 return taskCount_;
Marc Slemko8a40a762006-07-19 17:46:50 +0000235}
David Reiss0c90f6f2008-02-06 22:18:40 +0000236
Mark Slee9b82d272007-05-23 05:16:07 +0000237void TimerManager::add(shared_ptr<Runnable> task, int64_t timeout) {
238 int64_t now = Util::currentTime();
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000239 timeout += now;
240
Mark Sleef5f2be42006-09-05 21:05:31 +0000241 {
David Reiss0c90f6f2008-02-06 22:18:40 +0000242 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000243 if (state_ != TimerManager::STARTED) {
Marc Slemko8a40a762006-07-19 17:46:50 +0000244 throw IllegalStateException();
245 }
246
David Reiss52687eb2009-06-04 00:32:57 +0000247 // If the task map is empty, we will kick the dispatcher for sure. Otherwise, we kick him
248 // if the expiration time is shorter than the current value. Need to test before we insert,
249 // because the new task might insert at the front.
250 bool notifyRequired = (taskCount_ == 0) ? true : timeout < taskMap_.begin()->first;
251
Mark Slee2f6404d2006-10-10 01:37:40 +0000252 taskCount_++;
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100253 taskMap_.insert(
254 std::pair<int64_t, shared_ptr<Task> >(timeout, shared_ptr<Task>(new Task(task))));
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000255
Mark Sleef5f2be42006-09-05 21:05:31 +0000256 // If the task map was empty, or if we have an expiration that is earlier
257 // than any previously seen, kick the dispatcher so it can update its
258 // timeout
David Reiss52687eb2009-06-04 00:32:57 +0000259 if (notifyRequired) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000260 monitor_.notify();
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000261 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000262 }
263}
264
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -0400265void TimerManager::add(shared_ptr<Runnable> task, const struct THRIFT_TIMESPEC& value) {
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000266
Mark Slee9b82d272007-05-23 05:16:07 +0000267 int64_t expiration;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000268 Util::toMilliseconds(expiration, value);
269
Mark Slee9b82d272007-05-23 05:16:07 +0000270 int64_t now = Util::currentTime();
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000271
Mark Sleef5f2be42006-09-05 21:05:31 +0000272 if (expiration < now) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100273 throw InvalidArgumentException();
Marc Slemko8a40a762006-07-19 17:46:50 +0000274 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000275
276 add(task, expiration - now);
277}
278
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -0400279void TimerManager::add(shared_ptr<Runnable> task, const struct timeval& value) {
280
281 int64_t expiration;
282 Util::toMilliseconds(expiration, value);
283
284 int64_t now = Util::currentTime();
285
286 if (expiration < now) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100287 throw InvalidArgumentException();
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -0400288 }
289
290 add(task, expiration - now);
291}
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000292
Marc Slemko6f038a72006-08-03 18:58:09 +0000293void TimerManager::remove(shared_ptr<Runnable> task) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100294 (void)task;
David Reiss0c90f6f2008-02-06 22:18:40 +0000295 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000296 if (state_ != TimerManager::STARTED) {
Mark Sleef5f2be42006-09-05 21:05:31 +0000297 throw IllegalStateException();
Marc Slemko8a40a762006-07-19 17:46:50 +0000298 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000299}
300
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100301TimerManager::STATE TimerManager::state() const {
302 return state_;
303}
304}
305}
306} // apache::thrift::concurrency