blob: 703c19ed1dbd9d4f343ca2b0b048777f8b2ba702 [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
Roger Meier4285ba22013-06-10 21:17:23 +020020#include <thrift/concurrency/TimerManager.h>
21#include <thrift/concurrency/Exception.h>
Marc Slemko0e53ccd2006-07-17 23:51:05 +000022
23#include <assert.h>
Marc Slemko8a40a762006-07-19 17:46:50 +000024#include <iostream>
cyy64750162019-02-08 13:40:59 +080025#include <memory>
Marc Slemko0e53ccd2006-07-17 23:51:05 +000026#include <set>
27
Konrad Grochowski16a23a62014-11-13 15:33:38 +010028namespace apache {
29namespace thrift {
30namespace concurrency {
Marc Slemko0e53ccd2006-07-17 23:51:05 +000031
cyy316723a2019-01-05 16:35:14 +080032using std::shared_ptr;
33using std::weak_ptr;
Mark Slee5ea15f92007-03-05 22:55:59 +000034
Mark Sleef5f2be42006-09-05 21:05:31 +000035/**
36 * TimerManager class
37 *
Mark Sleef5f2be42006-09-05 21:05:31 +000038 * @version $Id:$
39 */
Marc Slemko0e53ccd2006-07-17 23:51:05 +000040class TimerManager::Task : public Runnable {
41
Konrad Grochowski16a23a62014-11-13 15:33:38 +010042public:
43 enum STATE { WAITING, EXECUTING, CANCELLED, COMPLETE };
Marc Slemko0e53ccd2006-07-17 23:51:05 +000044
Konrad Grochowski16a23a62014-11-13 15:33:38 +010045 Task(shared_ptr<Runnable> runnable) : runnable_(runnable), state_(WAITING) {}
David Reiss0c90f6f2008-02-06 22:18:40 +000046
Sebastian Zenker042580f2019-01-29 15:48:12 +010047 ~Task() override = default;
David Reiss0c90f6f2008-02-06 22:18:40 +000048
Sebastian Zenker042580f2019-01-29 15:48:12 +010049 void run() override {
Mark Slee2f6404d2006-10-10 01:37:40 +000050 if (state_ == EXECUTING) {
51 runnable_->run();
52 state_ = COMPLETE;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000053 }
54 }
55
Francois Ferrandcc2d5582017-08-25 09:01:26 +020056 bool operator==(const shared_ptr<Runnable> & runnable) const { return runnable_ == runnable; }
57
Francois Ferrand69603702017-09-11 12:09:40 +020058 task_iterator it_;
59
Konrad Grochowski16a23a62014-11-13 15:33:38 +010060private:
Mark Slee2f6404d2006-10-10 01:37:40 +000061 shared_ptr<Runnable> runnable_;
Marc Slemko8a40a762006-07-19 17:46:50 +000062 friend class TimerManager::Dispatcher;
Mark Slee2f6404d2006-10-10 01:37:40 +000063 STATE state_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000064};
65
Konrad Grochowski16a23a62014-11-13 15:33:38 +010066class TimerManager::Dispatcher : public Runnable {
Marc Slemko0e53ccd2006-07-17 23:51:05 +000067
Konrad Grochowski16a23a62014-11-13 15:33:38 +010068public:
69 Dispatcher(TimerManager* manager) : manager_(manager) {}
David Reiss0c90f6f2008-02-06 22:18:40 +000070
Sebastian Zenker042580f2019-01-29 15:48:12 +010071 ~Dispatcher() override = default;
David Reiss0c90f6f2008-02-06 22:18:40 +000072
Mark Sleef5f2be42006-09-05 21:05:31 +000073 /**
74 * Dispatcher entry point
75 *
Mark Slee2f6404d2006-10-10 01:37:40 +000076 * As long as dispatcher thread is running, pull tasks off the task taskMap_
Mark Sleef5f2be42006-09-05 21:05:31 +000077 * and execute.
78 */
Sebastian Zenker042580f2019-01-29 15:48:12 +010079 void run() override {
Mark Sleef5f2be42006-09-05 21:05:31 +000080 {
Mark Slee2f6404d2006-10-10 01:37:40 +000081 Synchronized s(manager_->monitor_);
82 if (manager_->state_ == TimerManager::STARTING) {
David Reiss96d23882007-07-26 21:10:32 +000083 manager_->state_ = TimerManager::STARTED;
84 manager_->monitor_.notifyAll();
Marc Slemko0e53ccd2006-07-17 23:51:05 +000085 }
86 }
87
88 do {
Marc Slemko6f038a72006-08-03 18:58:09 +000089 std::set<shared_ptr<TimerManager::Task> > expiredTasks;
Mark Sleef5f2be42006-09-05 21:05:31 +000090 {
Mark Slee2f6404d2006-10-10 01:37:40 +000091 Synchronized s(manager_->monitor_);
David Reiss96d23882007-07-26 21:10:32 +000092 task_iterator expiredTaskEnd;
cyybfdbd032019-01-12 14:38:28 +080093 auto now = std::chrono::steady_clock::now();
Konrad Grochowski16a23a62014-11-13 15:33:38 +010094 while (manager_->state_ == TimerManager::STARTED
95 && (expiredTaskEnd = manager_->taskMap_.upper_bound(now))
96 == manager_->taskMap_.begin()) {
cyybfdbd032019-01-12 14:38:28 +080097 std::chrono::milliseconds timeout(0);
David Reiss96d23882007-07-26 21:10:32 +000098 if (!manager_->taskMap_.empty()) {
cyybfdbd032019-01-12 14:38:28 +080099 timeout = std::chrono::duration_cast<std::chrono::milliseconds>(manager_->taskMap_.begin()->first - now);
100 //because the unit of steady_clock is smaller than millisecond,timeout may be 0.
101 if (timeout.count() == 0) {
102 timeout = std::chrono::milliseconds(1);
103 }
104 manager_->monitor_.waitForTimeRelative(timeout);
105 } else {
106 manager_->monitor_.waitForTimeRelative(0);
David Reiss96d23882007-07-26 21:10:32 +0000107 }
cyybfdbd032019-01-12 14:38:28 +0800108 now = std::chrono::steady_clock::now();
David Reiss96d23882007-07-26 21:10:32 +0000109 }
David Reiss0c90f6f2008-02-06 22:18:40 +0000110
David Reiss96d23882007-07-26 21:10:32 +0000111 if (manager_->state_ == TimerManager::STARTED) {
Sebastian Zenker042580f2019-01-29 15:48:12 +0100112 for (auto ix = manager_->taskMap_.begin(); ix != expiredTaskEnd; ix++) {
David Reiss96d23882007-07-26 21:10:32 +0000113 shared_ptr<TimerManager::Task> task = ix->second;
Mark Sleef5f2be42006-09-05 21:05:31 +0000114 expiredTasks.insert(task);
Francois Ferrand69603702017-09-11 12:09:40 +0200115 task->it_ = manager_->taskMap_.end();
David Reiss96d23882007-07-26 21:10:32 +0000116 if (task->state_ == TimerManager::Task::WAITING) {
117 task->state_ = TimerManager::Task::EXECUTING;
118 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000119 manager_->taskCount_--;
David Reiss96d23882007-07-26 21:10:32 +0000120 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000121 manager_->taskMap_.erase(manager_->taskMap_.begin(), expiredTaskEnd);
David Reiss96d23882007-07-26 21:10:32 +0000122 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000123 }
David Reiss0c90f6f2008-02-06 22:18:40 +0000124
cyy64750162019-02-08 13:40:59 +0800125 for (const auto & expiredTask : expiredTasks) {
126 expiredTask->run();
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000127 }
David Reiss0c90f6f2008-02-06 22:18:40 +0000128
Mark Slee2f6404d2006-10-10 01:37:40 +0000129 } while (manager_->state_ == TimerManager::STARTED);
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000130
Mark Sleef5f2be42006-09-05 21:05:31 +0000131 {
Mark Slee2f6404d2006-10-10 01:37:40 +0000132 Synchronized s(manager_->monitor_);
133 if (manager_->state_ == TimerManager::STOPPING) {
David Reiss0c90f6f2008-02-06 22:18:40 +0000134 manager_->state_ = TimerManager::STOPPED;
Guillaume Blanc5e9203b2019-09-20 17:19:57 +0200135 manager_->monitor_.notifyAll();
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000136 }
137 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000138 return;
139 }
140
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100141private:
Mark Slee2f6404d2006-10-10 01:37:40 +0000142 TimerManager* manager_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000143 friend class TimerManager;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000144};
145
Roger Meierb69d24d2012-10-04 18:02:15 +0000146#if defined(_MSC_VER)
147#pragma warning(push)
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100148#pragma warning(disable : 4355) // 'this' used in base member initializer list
Roger Meierb69d24d2012-10-04 18:02:15 +0000149#endif
150
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100151TimerManager::TimerManager()
152 : taskCount_(0),
153 state_(TimerManager::UNINITIALIZED),
cyy64750162019-02-08 13:40:59 +0800154 dispatcher_(std::make_shared<Dispatcher>(this)) {
Marc Slemko8a40a762006-07-19 17:46:50 +0000155}
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000156
Roger Meierb69d24d2012-10-04 18:02:15 +0000157#if defined(_MSC_VER)
158#pragma warning(pop)
159#endif
Marc Slemko8a40a762006-07-19 17:46:50 +0000160
161TimerManager::~TimerManager() {
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000162
Mark Sleef5f2be42006-09-05 21:05:31 +0000163 // If we haven't been explicitly stopped, do so now. We don't need to grab
164 // the monitor here, since stop already takes care of reentrancy.
David Reiss0c90f6f2008-02-06 22:18:40 +0000165
Mark Slee2f6404d2006-10-10 01:37:40 +0000166 if (state_ != STOPPED) {
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000167 try {
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000168 stop();
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100169 } catch (...) {
tpcwangf98d59f2016-03-23 16:18:52 -0700170 // We're really hosed.
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000171 }
172 }
Marc Slemko8a40a762006-07-19 17:46:50 +0000173}
174
175void TimerManager::start() {
Marc Slemko8a40a762006-07-19 17:46:50 +0000176 bool doStart = false;
Mark Sleef5f2be42006-09-05 21:05:31 +0000177 {
Mark Slee2f6404d2006-10-10 01:37:40 +0000178 Synchronized s(monitor_);
Roger Meier72957452013-06-29 00:28:50 +0200179 if (!threadFactory_) {
Mark Sleef5f2be42006-09-05 21:05:31 +0000180 throw InvalidArgumentException();
181 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000182 if (state_ == TimerManager::UNINITIALIZED) {
183 state_ = TimerManager::STARTING;
Marc Slemko8a40a762006-07-19 17:46:50 +0000184 doStart = true;
185 }
186 }
187
Mark Sleef5f2be42006-09-05 21:05:31 +0000188 if (doStart) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000189 dispatcherThread_ = threadFactory_->newThread(dispatcher_);
190 dispatcherThread_->start();
Marc Slemko8a40a762006-07-19 17:46:50 +0000191 }
192
Mark Sleef5f2be42006-09-05 21:05:31 +0000193 {
Mark Slee2f6404d2006-10-10 01:37:40 +0000194 Synchronized s(monitor_);
195 while (state_ == TimerManager::STARTING) {
196 monitor_.wait();
Marc Slemko8a40a762006-07-19 17:46:50 +0000197 }
Aditya Agarwal3f234da2007-04-01 01:19:57 +0000198 assert(state_ != TimerManager::STARTING);
Marc Slemko8a40a762006-07-19 17:46:50 +0000199 }
200}
201
202void TimerManager::stop() {
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000203 bool doStop = false;
Mark Sleef5f2be42006-09-05 21:05:31 +0000204 {
Mark Slee2f6404d2006-10-10 01:37:40 +0000205 Synchronized s(monitor_);
206 if (state_ == TimerManager::UNINITIALIZED) {
207 state_ = TimerManager::STOPPED;
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100208 } else if (state_ != STOPPING && state_ != STOPPED) {
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000209 doStop = true;
Mark Slee2f6404d2006-10-10 01:37:40 +0000210 state_ = STOPPING;
211 monitor_.notifyAll();
Marc Slemko8a40a762006-07-19 17:46:50 +0000212 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000213 while (state_ != STOPPED) {
214 monitor_.wait();
Marc Slemko8a40a762006-07-19 17:46:50 +0000215 }
216 }
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000217
Mark Sleef5f2be42006-09-05 21:05:31 +0000218 if (doStop) {
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000219 // Clean up any outstanding tasks
David Reiss5fa20da2009-06-04 00:32:47 +0000220 taskMap_.clear();
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000221
David Reiss0c90f6f2008-02-06 22:18:40 +0000222 // Remove dispatcher's reference to us.
Sebastian Zenker042580f2019-01-29 15:48:12 +0100223 dispatcher_->manager_ = nullptr;
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000224 }
Marc Slemko8a40a762006-07-19 17:46:50 +0000225}
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000226
Marc Slemko6f038a72006-08-03 18:58:09 +0000227shared_ptr<const ThreadFactory> TimerManager::threadFactory() const {
David Reiss0c90f6f2008-02-06 22:18:40 +0000228 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000229 return threadFactory_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000230}
David Reiss0c90f6f2008-02-06 22:18:40 +0000231
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100232void TimerManager::threadFactory(shared_ptr<const ThreadFactory> value) {
David Reiss0c90f6f2008-02-06 22:18:40 +0000233 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000234 threadFactory_ = value;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000235}
236
Marc Slemko8a40a762006-07-19 17:46:50 +0000237size_t TimerManager::taskCount() const {
Mark Slee2f6404d2006-10-10 01:37:40 +0000238 return taskCount_;
Marc Slemko8a40a762006-07-19 17:46:50 +0000239}
David Reiss0c90f6f2008-02-06 22:18:40 +0000240
cyybfdbd032019-01-12 14:38:28 +0800241TimerManager::Timer TimerManager::add(shared_ptr<Runnable> task, const std::chrono::milliseconds &timeout) {
242 return add(task, std::chrono::steady_clock::now() + timeout);
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000243}
244
Francois Ferrand69603702017-09-11 12:09:40 +0200245TimerManager::Timer TimerManager::add(shared_ptr<Runnable> task,
cyybfdbd032019-01-12 14:38:28 +0800246 const std::chrono::time_point<std::chrono::steady_clock>& abstime) {
247 auto now = std::chrono::steady_clock::now();
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000248
cyybfdbd032019-01-12 14:38:28 +0800249 if (abstime < now) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100250 throw InvalidArgumentException();
Marc Slemko8a40a762006-07-19 17:46:50 +0000251 }
cyybfdbd032019-01-12 14:38:28 +0800252 Synchronized s(monitor_);
253 if (state_ != TimerManager::STARTED) {
254 throw IllegalStateException();
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -0400255 }
256
cyybfdbd032019-01-12 14:38:28 +0800257 // If the task map is empty, we will kick the dispatcher for sure. Otherwise, we kick him
258 // if the expiration time is shorter than the current value. Need to test before we insert,
259 // because the new task might insert at the front.
260 bool notifyRequired = (taskCount_ == 0) ? true : abstime < taskMap_.begin()->first;
261
262 shared_ptr<Task> timer(new Task(task));
263 taskCount_++;
264 timer->it_ = taskMap_.emplace(abstime, timer);
265
266 // If the task map was empty, or if we have an expiration that is earlier
267 // than any previously seen, kick the dispatcher so it can update its
268 // timeout
269 if (notifyRequired) {
270 monitor_.notify();
271 }
272
273 return timer;
Carl Yeksigian7cb7fc82013-06-07 07:33:01 -0400274}
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000275
Marc Slemko6f038a72006-08-03 18:58:09 +0000276void TimerManager::remove(shared_ptr<Runnable> task) {
David Reiss0c90f6f2008-02-06 22:18:40 +0000277 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000278 if (state_ != TimerManager::STARTED) {
Mark Sleef5f2be42006-09-05 21:05:31 +0000279 throw IllegalStateException();
Marc Slemko8a40a762006-07-19 17:46:50 +0000280 }
Francois Ferrandcc2d5582017-08-25 09:01:26 +0200281 bool found = false;
Sebastian Zenker042580f2019-01-29 15:48:12 +0100282 for (auto ix = taskMap_.begin(); ix != taskMap_.end();) {
Francois Ferrandcc2d5582017-08-25 09:01:26 +0200283 if (*ix->second == task) {
284 found = true;
285 taskCount_--;
286 taskMap_.erase(ix++);
287 } else {
288 ++ix;
289 }
290 }
291 if (!found) {
292 throw NoSuchTaskException();
293 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000294}
295
Francois Ferrand69603702017-09-11 12:09:40 +0200296void TimerManager::remove(Timer handle) {
297 Synchronized s(monitor_);
298 if (state_ != TimerManager::STARTED) {
299 throw IllegalStateException();
300 }
301
302 shared_ptr<Task> task = handle.lock();
303 if (!task) {
304 throw NoSuchTaskException();
305 }
306
307 if (task->it_ == taskMap_.end()) {
308 // Task is being executed
309 throw UncancellableTaskException();
310 }
311
312 taskMap_.erase(task->it_);
313 taskCount_--;
314}
315
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100316TimerManager::STATE TimerManager::state() const {
317 return state_;
318}
319}
320}
321} // apache::thrift::concurrency