blob: 4f77b2f5c1c07a072f690326a086983b310f5a0f [file] [log] [blame]
// Copyright (c) 2006- Facebook
// Distributed under the Thrift Software License
//
// See accompanying file LICENSE or visit the Thrift site at:
// http://developers.facebook.com/thrift/
6
Marc Slemko0e53ccd2006-07-17 23:51:05 +00007#include "TimerManager.h"
Marc Slemko8a40a762006-07-19 17:46:50 +00008#include "Exception.h"
Marc Slemko0e53ccd2006-07-17 23:51:05 +00009#include "Util.h"
10
11#include <assert.h>
Marc Slemko8a40a762006-07-19 17:46:50 +000012#include <iostream>
Marc Slemko0e53ccd2006-07-17 23:51:05 +000013#include <set>
14
15namespace facebook { namespace thrift { namespace concurrency {
16
Mark Slee5ea15f92007-03-05 22:55:59 +000017using boost::shared_ptr;
18
Mark Slee9b82d272007-05-23 05:16:07 +000019typedef std::multimap<int64_t, shared_ptr<TimerManager::Task> >::iterator task_iterator;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000020typedef std::pair<task_iterator, task_iterator> task_range;
21
Mark Sleef5f2be42006-09-05 21:05:31 +000022/**
23 * TimerManager class
24 *
25 * @author marc
26 * @version $Id:$
27 */
Marc Slemko0e53ccd2006-07-17 23:51:05 +000028class TimerManager::Task : public Runnable {
29
Mark Sleef5f2be42006-09-05 21:05:31 +000030 public:
Marc Slemko0e53ccd2006-07-17 23:51:05 +000031 enum STATE {
32 WAITING,
33 EXECUTING,
34 CANCELLED,
35 COMPLETE
36 };
37
Marc Slemko6f038a72006-08-03 18:58:09 +000038 Task(shared_ptr<Runnable> runnable) :
Mark Slee2f6404d2006-10-10 01:37:40 +000039 runnable_(runnable),
40 state_(WAITING) {}
Marc Slemko0e53ccd2006-07-17 23:51:05 +000041
Marc Slemko6f038a72006-08-03 18:58:09 +000042 ~Task() {
Mark Sleef5f2be42006-09-05 21:05:31 +000043 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +000044
45 void run() {
Mark Slee2f6404d2006-10-10 01:37:40 +000046 if (state_ == EXECUTING) {
47 runnable_->run();
48 state_ = COMPLETE;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000049 }
50 }
51
52 private:
Mark Slee2f6404d2006-10-10 01:37:40 +000053 shared_ptr<Runnable> runnable_;
Marc Slemko8a40a762006-07-19 17:46:50 +000054 class TimerManager::Dispatcher;
Marc Slemko8a40a762006-07-19 17:46:50 +000055 friend class TimerManager::Dispatcher;
Mark Slee2f6404d2006-10-10 01:37:40 +000056 STATE state_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000057};
58
59class TimerManager::Dispatcher: public Runnable {
60
Mark Sleef5f2be42006-09-05 21:05:31 +000061 public:
Marc Slemko0e53ccd2006-07-17 23:51:05 +000062 Dispatcher(TimerManager* manager) :
Mark Slee2f6404d2006-10-10 01:37:40 +000063 manager_(manager) {}
Marc Slemko0e53ccd2006-07-17 23:51:05 +000064
Marc Slemko67606e52007-06-04 21:01:19 +000065 ~Dispatcher() {}
Marc Slemko0e53ccd2006-07-17 23:51:05 +000066
Mark Sleef5f2be42006-09-05 21:05:31 +000067 /**
68 * Dispatcher entry point
69 *
Mark Slee2f6404d2006-10-10 01:37:40 +000070 * As long as dispatcher thread is running, pull tasks off the task taskMap_
Mark Sleef5f2be42006-09-05 21:05:31 +000071 * and execute.
72 */
Marc Slemko0e53ccd2006-07-17 23:51:05 +000073 void run() {
Mark Sleef5f2be42006-09-05 21:05:31 +000074 {
Mark Slee2f6404d2006-10-10 01:37:40 +000075 Synchronized s(manager_->monitor_);
76 if (manager_->state_ == TimerManager::STARTING) {
David Reiss96d23882007-07-26 21:10:32 +000077 manager_->state_ = TimerManager::STARTED;
78 manager_->monitor_.notifyAll();
Marc Slemko0e53ccd2006-07-17 23:51:05 +000079 }
80 }
81
82 do {
Marc Slemko6f038a72006-08-03 18:58:09 +000083 std::set<shared_ptr<TimerManager::Task> > expiredTasks;
Mark Sleef5f2be42006-09-05 21:05:31 +000084 {
Mark Slee2f6404d2006-10-10 01:37:40 +000085 Synchronized s(manager_->monitor_);
David Reiss96d23882007-07-26 21:10:32 +000086 task_iterator expiredTaskEnd;
87 int64_t now = Util::currentTime();
88 while (manager_->state_ == TimerManager::STARTED &&
Mark Slee2f6404d2006-10-10 01:37:40 +000089 (expiredTaskEnd = manager_->taskMap_.upper_bound(now)) == manager_->taskMap_.begin()) {
David Reiss96d23882007-07-26 21:10:32 +000090 int64_t timeout = 0LL;
91 if (!manager_->taskMap_.empty()) {
Mark Slee2f6404d2006-10-10 01:37:40 +000092 timeout = manager_->taskMap_.begin()->first - now;
David Reiss96d23882007-07-26 21:10:32 +000093 }
Aditya Agarwal3f234da2007-04-01 01:19:57 +000094 assert((timeout != 0 && manager_->taskCount_ > 0) || (timeout == 0 && manager_->taskCount_ == 0));
Marc Slemko3a3b53b2007-05-22 23:59:54 +000095 try {
Mark Slee2782d6d2007-05-23 04:55:30 +000096 manager_->monitor_.wait(timeout);
97 } catch (TimedOutException &e) {}
David Reiss96d23882007-07-26 21:10:32 +000098 now = Util::currentTime();
99 }
100
101 if (manager_->state_ == TimerManager::STARTED) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000102 for (task_iterator ix = manager_->taskMap_.begin(); ix != expiredTaskEnd; ix++) {
David Reiss96d23882007-07-26 21:10:32 +0000103 shared_ptr<TimerManager::Task> task = ix->second;
Mark Sleef5f2be42006-09-05 21:05:31 +0000104 expiredTasks.insert(task);
David Reiss96d23882007-07-26 21:10:32 +0000105 if (task->state_ == TimerManager::Task::WAITING) {
106 task->state_ = TimerManager::Task::EXECUTING;
107 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000108 manager_->taskCount_--;
David Reiss96d23882007-07-26 21:10:32 +0000109 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000110 manager_->taskMap_.erase(manager_->taskMap_.begin(), expiredTaskEnd);
David Reiss96d23882007-07-26 21:10:32 +0000111 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000112 }
113
Mark Sleef5f2be42006-09-05 21:05:31 +0000114 for (std::set<shared_ptr<Task> >::iterator ix = expiredTasks.begin(); ix != expiredTasks.end(); ix++) {
115 (*ix)->run();
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000116 }
117
Mark Slee2f6404d2006-10-10 01:37:40 +0000118 } while (manager_->state_ == TimerManager::STARTED);
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000119
Mark Sleef5f2be42006-09-05 21:05:31 +0000120 {
Mark Slee2f6404d2006-10-10 01:37:40 +0000121 Synchronized s(manager_->monitor_);
122 if (manager_->state_ == TimerManager::STOPPING) {
David Reiss96d23882007-07-26 21:10:32 +0000123 manager_->state_ = TimerManager::STOPPED;
124 manager_->monitor_.notify();
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000125 }
126 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000127 return;
128 }
129
130 private:
Mark Slee2f6404d2006-10-10 01:37:40 +0000131 TimerManager* manager_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000132 friend class TimerManager;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000133};
134
Marc Slemko8a40a762006-07-19 17:46:50 +0000135TimerManager::TimerManager() :
Mark Slee2f6404d2006-10-10 01:37:40 +0000136 taskCount_(0),
137 state_(TimerManager::UNINITIALIZED),
138 dispatcher_(shared_ptr<Dispatcher>(new Dispatcher(this))) {
Marc Slemko8a40a762006-07-19 17:46:50 +0000139}
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000140
Marc Slemko8a40a762006-07-19 17:46:50 +0000141
142TimerManager::~TimerManager() {
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000143
Mark Sleef5f2be42006-09-05 21:05:31 +0000144 // If we haven't been explicitly stopped, do so now. We don't need to grab
145 // the monitor here, since stop already takes care of reentrancy.
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000146
Mark Slee2f6404d2006-10-10 01:37:40 +0000147 if (state_ != STOPPED) {
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000148 try {
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000149 stop();
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000150 } catch(...) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000151 throw;
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000152 // uhoh
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000153 }
154 }
Marc Slemko8a40a762006-07-19 17:46:50 +0000155}
156
157void TimerManager::start() {
Marc Slemko8a40a762006-07-19 17:46:50 +0000158 bool doStart = false;
Mark Sleef5f2be42006-09-05 21:05:31 +0000159 {
Mark Slee2f6404d2006-10-10 01:37:40 +0000160 Synchronized s(monitor_);
161 if (threadFactory_ == NULL) {
Mark Sleef5f2be42006-09-05 21:05:31 +0000162 throw InvalidArgumentException();
163 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000164 if (state_ == TimerManager::UNINITIALIZED) {
165 state_ = TimerManager::STARTING;
Marc Slemko8a40a762006-07-19 17:46:50 +0000166 doStart = true;
167 }
168 }
169
Mark Sleef5f2be42006-09-05 21:05:31 +0000170 if (doStart) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000171 dispatcherThread_ = threadFactory_->newThread(dispatcher_);
172 dispatcherThread_->start();
Marc Slemko8a40a762006-07-19 17:46:50 +0000173 }
174
Mark Sleef5f2be42006-09-05 21:05:31 +0000175 {
Mark Slee2f6404d2006-10-10 01:37:40 +0000176 Synchronized s(monitor_);
177 while (state_ == TimerManager::STARTING) {
178 monitor_.wait();
Marc Slemko8a40a762006-07-19 17:46:50 +0000179 }
Aditya Agarwal3f234da2007-04-01 01:19:57 +0000180 assert(state_ != TimerManager::STARTING);
Marc Slemko8a40a762006-07-19 17:46:50 +0000181 }
182}
183
184void TimerManager::stop() {
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000185 bool doStop = false;
Mark Sleef5f2be42006-09-05 21:05:31 +0000186 {
Mark Slee2f6404d2006-10-10 01:37:40 +0000187 Synchronized s(monitor_);
188 if (state_ == TimerManager::UNINITIALIZED) {
189 state_ = TimerManager::STOPPED;
190 } else if (state_ != STOPPING && state_ != STOPPED) {
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000191 doStop = true;
Mark Slee2f6404d2006-10-10 01:37:40 +0000192 state_ = STOPPING;
193 monitor_.notifyAll();
Marc Slemko8a40a762006-07-19 17:46:50 +0000194 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000195 while (state_ != STOPPED) {
196 monitor_.wait();
Marc Slemko8a40a762006-07-19 17:46:50 +0000197 }
198 }
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000199
Mark Sleef5f2be42006-09-05 21:05:31 +0000200 if (doStop) {
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000201 // Clean up any outstanding tasks
Mark Slee2f6404d2006-10-10 01:37:40 +0000202 for (task_iterator ix = taskMap_.begin(); ix != taskMap_.end(); ix++) {
203 taskMap_.erase(ix);
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000204 }
205
Marc Slemko6f038a72006-08-03 18:58:09 +0000206 // Remove dispatcher's reference to us.
Mark Slee2f6404d2006-10-10 01:37:40 +0000207 dispatcher_->manager_ = NULL;
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000208 }
Marc Slemko8a40a762006-07-19 17:46:50 +0000209}
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000210
Marc Slemko6f038a72006-08-03 18:58:09 +0000211shared_ptr<const ThreadFactory> TimerManager::threadFactory() const {
Mark Slee2f6404d2006-10-10 01:37:40 +0000212 Synchronized s(monitor_);
213 return threadFactory_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000214}
215
Marc Slemko6f038a72006-08-03 18:58:09 +0000216void TimerManager::threadFactory(shared_ptr<const ThreadFactory> value) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000217 Synchronized s(monitor_);
218 threadFactory_ = value;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000219}
220
Marc Slemko8a40a762006-07-19 17:46:50 +0000221size_t TimerManager::taskCount() const {
Mark Slee2f6404d2006-10-10 01:37:40 +0000222 return taskCount_;
Marc Slemko8a40a762006-07-19 17:46:50 +0000223}
224
Mark Slee9b82d272007-05-23 05:16:07 +0000225void TimerManager::add(shared_ptr<Runnable> task, int64_t timeout) {
226 int64_t now = Util::currentTime();
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000227 timeout += now;
228
Mark Sleef5f2be42006-09-05 21:05:31 +0000229 {
Mark Slee2f6404d2006-10-10 01:37:40 +0000230 Synchronized s(monitor_);
231 if (state_ != TimerManager::STARTED) {
Marc Slemko8a40a762006-07-19 17:46:50 +0000232 throw IllegalStateException();
233 }
234
Mark Slee2f6404d2006-10-10 01:37:40 +0000235 taskCount_++;
Mark Slee9b82d272007-05-23 05:16:07 +0000236 taskMap_.insert(std::pair<int64_t, shared_ptr<Task> >(timeout, shared_ptr<Task>(new Task(task))));
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000237
Mark Sleef5f2be42006-09-05 21:05:31 +0000238 // If the task map was empty, or if we have an expiration that is earlier
239 // than any previously seen, kick the dispatcher so it can update its
240 // timeout
Mark Slee2f6404d2006-10-10 01:37:40 +0000241 if (taskCount_ == 1 || timeout < taskMap_.begin()->first) {
242 monitor_.notify();
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000243 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000244 }
245}
246
Marc Slemko6f038a72006-08-03 18:58:09 +0000247void TimerManager::add(shared_ptr<Runnable> task, const struct timespec& value) {
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000248
Mark Slee9b82d272007-05-23 05:16:07 +0000249 int64_t expiration;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000250 Util::toMilliseconds(expiration, value);
251
Mark Slee9b82d272007-05-23 05:16:07 +0000252 int64_t now = Util::currentTime();
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000253
Mark Sleef5f2be42006-09-05 21:05:31 +0000254 if (expiration < now) {
Marc Slemko8a40a762006-07-19 17:46:50 +0000255 throw InvalidArgumentException();
256 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000257
258 add(task, expiration - now);
259}
260
261
Marc Slemko6f038a72006-08-03 18:58:09 +0000262void TimerManager::remove(shared_ptr<Runnable> task) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000263 Synchronized s(monitor_);
264 if (state_ != TimerManager::STARTED) {
Mark Sleef5f2be42006-09-05 21:05:31 +0000265 throw IllegalStateException();
Marc Slemko8a40a762006-07-19 17:46:50 +0000266 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000267}
268
Mark Slee2f6404d2006-10-10 01:37:40 +0000269const TimerManager::STATE TimerManager::state() const { return state_; }
Marc Slemko8a40a762006-07-19 17:46:50 +0000270
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000271}}} // facebook::thrift::concurrency
272