blob: 09f7a9c5e1caae1073ca85110a26ec7717e4e2b6 [file] [log] [blame]
// Copyright (c) 2006- Facebook
// Distributed under the Thrift Software License
//
// See accompanying file LICENSE or visit the Thrift site at:
// http://developers.facebook.com/thrift/
6
#include "TimerManager.h"
#include "Exception.h"
#include "Util.h"

#include <assert.h>
#include <iostream>
#include <set>
#include <vector>
14
15namespace facebook { namespace thrift { namespace concurrency {
16
Marc Slemko6f038a72006-08-03 18:58:09 +000017typedef std::multimap<long long, shared_ptr<TimerManager::Task> >::iterator task_iterator;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000018typedef std::pair<task_iterator, task_iterator> task_range;
19
Mark Sleef5f2be42006-09-05 21:05:31 +000020/**
21 * TimerManager class
22 *
23 * @author marc
24 * @version $Id:$
25 */
Marc Slemko0e53ccd2006-07-17 23:51:05 +000026class TimerManager::Task : public Runnable {
27
Mark Sleef5f2be42006-09-05 21:05:31 +000028 public:
Marc Slemko0e53ccd2006-07-17 23:51:05 +000029 enum STATE {
30 WAITING,
31 EXECUTING,
32 CANCELLED,
33 COMPLETE
34 };
35
Marc Slemko6f038a72006-08-03 18:58:09 +000036 Task(shared_ptr<Runnable> runnable) :
Mark Slee2f6404d2006-10-10 01:37:40 +000037 runnable_(runnable),
38 state_(WAITING) {}
Marc Slemko0e53ccd2006-07-17 23:51:05 +000039
Marc Slemko6f038a72006-08-03 18:58:09 +000040 ~Task() {
Mark Sleef5f2be42006-09-05 21:05:31 +000041 //debug
42 std::cerr << "TimerManager::Task.dtor[" << this << "]" << std::endl;
43 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +000044
45 void run() {
Mark Slee2f6404d2006-10-10 01:37:40 +000046 if (state_ == EXECUTING) {
47 runnable_->run();
48 state_ = COMPLETE;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000049 }
50 }
51
52 private:
Mark Slee2f6404d2006-10-10 01:37:40 +000053 shared_ptr<Runnable> runnable_;
Marc Slemko8a40a762006-07-19 17:46:50 +000054 class TimerManager::Dispatcher;
Marc Slemko8a40a762006-07-19 17:46:50 +000055 friend class TimerManager::Dispatcher;
Mark Slee2f6404d2006-10-10 01:37:40 +000056 STATE state_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000057};
58
59class TimerManager::Dispatcher: public Runnable {
60
Mark Sleef5f2be42006-09-05 21:05:31 +000061 public:
Marc Slemko0e53ccd2006-07-17 23:51:05 +000062 Dispatcher(TimerManager* manager) :
Mark Slee2f6404d2006-10-10 01:37:40 +000063 manager_(manager) {}
Marc Slemko0e53ccd2006-07-17 23:51:05 +000064
Marc Slemko6f038a72006-08-03 18:58:09 +000065 ~Dispatcher() {
Mark Sleef5f2be42006-09-05 21:05:31 +000066 // debug
67 std::cerr << "Dispatcher::dtor[" << this << "]" << std::endl;
Marc Slemko6f038a72006-08-03 18:58:09 +000068 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +000069
Mark Sleef5f2be42006-09-05 21:05:31 +000070 /**
71 * Dispatcher entry point
72 *
Mark Slee2f6404d2006-10-10 01:37:40 +000073 * As long as dispatcher thread is running, pull tasks off the task taskMap_
Mark Sleef5f2be42006-09-05 21:05:31 +000074 * and execute.
75 */
Marc Slemko0e53ccd2006-07-17 23:51:05 +000076 void run() {
Mark Sleef5f2be42006-09-05 21:05:31 +000077 {
Mark Slee2f6404d2006-10-10 01:37:40 +000078 Synchronized s(manager_->monitor_);
79 if (manager_->state_ == TimerManager::STARTING) {
80 manager_->state_ = TimerManager::STARTED;
81 manager_->monitor_.notifyAll();
Marc Slemko0e53ccd2006-07-17 23:51:05 +000082 }
83 }
84
85 do {
Marc Slemko6f038a72006-08-03 18:58:09 +000086 std::set<shared_ptr<TimerManager::Task> > expiredTasks;
Mark Sleef5f2be42006-09-05 21:05:31 +000087 {
Mark Slee2f6404d2006-10-10 01:37:40 +000088 Synchronized s(manager_->monitor_);
Marc Slemko0e53ccd2006-07-17 23:51:05 +000089 task_iterator expiredTaskEnd;
Marc Slemko9f27a4e2006-07-19 20:02:22 +000090 long long now = Util::currentTime();
Mark Slee2f6404d2006-10-10 01:37:40 +000091 while (manager_->state_ == TimerManager::STARTED &&
92 (expiredTaskEnd = manager_->taskMap_.upper_bound(now)) == manager_->taskMap_.begin()) {
Marc Slemko8a40a762006-07-19 17:46:50 +000093 long long timeout = 0LL;
Mark Slee2f6404d2006-10-10 01:37:40 +000094 if (!manager_->taskMap_.empty()) {
95 timeout = manager_->taskMap_.begin()->first - now;
Marc Slemko8a40a762006-07-19 17:46:50 +000096 }
Mark Slee2f6404d2006-10-10 01:37:40 +000097 assert((timeout != 0 && manager_->taskCount_ > 0) || (timeout == 0 && manager_->taskCount_ == 0));
98 manager_->monitor_.wait(timeout);
Marc Slemko9f27a4e2006-07-19 20:02:22 +000099 now = Util::currentTime();
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000100 }
101
Mark Slee2f6404d2006-10-10 01:37:40 +0000102 if (manager_->state_ == TimerManager::STARTED) {
103 for (task_iterator ix = manager_->taskMap_.begin(); ix != expiredTaskEnd; ix++) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000104 shared_ptr<TimerManager::Task> task = ix->second;
Mark Sleef5f2be42006-09-05 21:05:31 +0000105 expiredTasks.insert(task);
Mark Slee2f6404d2006-10-10 01:37:40 +0000106 if (task->state_ == TimerManager::Task::WAITING) {
107 task->state_ = TimerManager::Task::EXECUTING;
Marc Slemko8a40a762006-07-19 17:46:50 +0000108 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000109 manager_->taskCount_--;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000110 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000111 manager_->taskMap_.erase(manager_->taskMap_.begin(), expiredTaskEnd);
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000112 }
113 }
114
Mark Sleef5f2be42006-09-05 21:05:31 +0000115 for (std::set<shared_ptr<Task> >::iterator ix = expiredTasks.begin(); ix != expiredTasks.end(); ix++) {
116 (*ix)->run();
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000117 }
118
Mark Slee2f6404d2006-10-10 01:37:40 +0000119 } while (manager_->state_ == TimerManager::STARTED);
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000120
Mark Sleef5f2be42006-09-05 21:05:31 +0000121 {
Mark Slee2f6404d2006-10-10 01:37:40 +0000122 Synchronized s(manager_->monitor_);
123 if (manager_->state_ == TimerManager::STOPPING) {
124 manager_->state_ = TimerManager::STOPPED;
125 manager_->monitor_.notify();
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000126 }
127 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000128 return;
129 }
130
131 private:
Mark Slee2f6404d2006-10-10 01:37:40 +0000132 TimerManager* manager_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000133 friend class TimerManager;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000134};
135
Marc Slemko8a40a762006-07-19 17:46:50 +0000136TimerManager::TimerManager() :
Mark Slee2f6404d2006-10-10 01:37:40 +0000137 taskCount_(0),
138 state_(TimerManager::UNINITIALIZED),
139 dispatcher_(shared_ptr<Dispatcher>(new Dispatcher(this))) {
Marc Slemko8a40a762006-07-19 17:46:50 +0000140}
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000141
Marc Slemko8a40a762006-07-19 17:46:50 +0000142
143TimerManager::~TimerManager() {
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000144
Mark Sleef5f2be42006-09-05 21:05:31 +0000145 // If we haven't been explicitly stopped, do so now. We don't need to grab
146 // the monitor here, since stop already takes care of reentrancy.
Marc Slemko6f038a72006-08-03 18:58:09 +0000147 std::cerr << "TimerManager::dtor[" << this << "]" << std::endl;
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000148
Mark Slee2f6404d2006-10-10 01:37:40 +0000149 if (state_ != STOPPED) {
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000150 try {
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000151 stop();
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000152 } catch(...) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000153 std::cerr << "TimerManager::dtor[" << this << "] uhoh " << std::endl;
154 throw;
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000155 // uhoh
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000156 }
157 }
Marc Slemko8a40a762006-07-19 17:46:50 +0000158}
159
160void TimerManager::start() {
Marc Slemko8a40a762006-07-19 17:46:50 +0000161 bool doStart = false;
Mark Sleef5f2be42006-09-05 21:05:31 +0000162 {
Mark Slee2f6404d2006-10-10 01:37:40 +0000163 Synchronized s(monitor_);
164 if (threadFactory_ == NULL) {
Mark Sleef5f2be42006-09-05 21:05:31 +0000165 throw InvalidArgumentException();
166 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000167 if (state_ == TimerManager::UNINITIALIZED) {
168 state_ = TimerManager::STARTING;
Marc Slemko8a40a762006-07-19 17:46:50 +0000169 doStart = true;
170 }
171 }
172
Mark Sleef5f2be42006-09-05 21:05:31 +0000173 if (doStart) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000174 dispatcherThread_ = threadFactory_->newThread(dispatcher_);
175 dispatcherThread_->start();
Marc Slemko8a40a762006-07-19 17:46:50 +0000176 }
177
Mark Sleef5f2be42006-09-05 21:05:31 +0000178 {
Mark Slee2f6404d2006-10-10 01:37:40 +0000179 Synchronized s(monitor_);
180 while (state_ == TimerManager::STARTING) {
181 monitor_.wait();
Marc Slemko8a40a762006-07-19 17:46:50 +0000182 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000183 assert(state_ != TimerManager::STARTING);
Marc Slemko8a40a762006-07-19 17:46:50 +0000184 }
185}
186
187void TimerManager::stop() {
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000188 bool doStop = false;
Mark Sleef5f2be42006-09-05 21:05:31 +0000189 {
Mark Slee2f6404d2006-10-10 01:37:40 +0000190 Synchronized s(monitor_);
191 if (state_ == TimerManager::UNINITIALIZED) {
192 state_ = TimerManager::STOPPED;
193 } else if (state_ != STOPPING && state_ != STOPPED) {
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000194 doStop = true;
Mark Slee2f6404d2006-10-10 01:37:40 +0000195 state_ = STOPPING;
196 monitor_.notifyAll();
Marc Slemko8a40a762006-07-19 17:46:50 +0000197 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000198 while (state_ != STOPPED) {
199 monitor_.wait();
Marc Slemko8a40a762006-07-19 17:46:50 +0000200 }
201 }
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000202
Mark Sleef5f2be42006-09-05 21:05:31 +0000203 if (doStop) {
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000204 // Clean up any outstanding tasks
Mark Slee2f6404d2006-10-10 01:37:40 +0000205 for (task_iterator ix = taskMap_.begin(); ix != taskMap_.end(); ix++) {
206 taskMap_.erase(ix);
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000207 }
208
Marc Slemko6f038a72006-08-03 18:58:09 +0000209 // Remove dispatcher's reference to us.
Mark Slee2f6404d2006-10-10 01:37:40 +0000210 dispatcher_->manager_ = NULL;
Marc Slemko9f27a4e2006-07-19 20:02:22 +0000211 }
Marc Slemko8a40a762006-07-19 17:46:50 +0000212}
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000213
Marc Slemko6f038a72006-08-03 18:58:09 +0000214shared_ptr<const ThreadFactory> TimerManager::threadFactory() const {
Mark Slee2f6404d2006-10-10 01:37:40 +0000215 Synchronized s(monitor_);
216 return threadFactory_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000217}
218
Marc Slemko6f038a72006-08-03 18:58:09 +0000219void TimerManager::threadFactory(shared_ptr<const ThreadFactory> value) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000220 Synchronized s(monitor_);
221 threadFactory_ = value;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000222}
223
Marc Slemko8a40a762006-07-19 17:46:50 +0000224size_t TimerManager::taskCount() const {
Mark Slee2f6404d2006-10-10 01:37:40 +0000225 return taskCount_;
Marc Slemko8a40a762006-07-19 17:46:50 +0000226}
227
Marc Slemko6f038a72006-08-03 18:58:09 +0000228void TimerManager::add(shared_ptr<Runnable> task, long long timeout) {
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000229 long long now = Util::currentTime();
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000230 timeout += now;
231
Mark Sleef5f2be42006-09-05 21:05:31 +0000232 {
Mark Slee2f6404d2006-10-10 01:37:40 +0000233 Synchronized s(monitor_);
234 if (state_ != TimerManager::STARTED) {
Marc Slemko8a40a762006-07-19 17:46:50 +0000235 throw IllegalStateException();
236 }
237
Mark Slee2f6404d2006-10-10 01:37:40 +0000238 taskCount_++;
239 taskMap_.insert(std::pair<long long, shared_ptr<Task> >(timeout, shared_ptr<Task>(new Task(task))));
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000240
Mark Sleef5f2be42006-09-05 21:05:31 +0000241 // If the task map was empty, or if we have an expiration that is earlier
242 // than any previously seen, kick the dispatcher so it can update its
243 // timeout
Mark Slee2f6404d2006-10-10 01:37:40 +0000244 if (taskCount_ == 1 || timeout < taskMap_.begin()->first) {
245 monitor_.notify();
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000246 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000247 }
248}
249
Marc Slemko6f038a72006-08-03 18:58:09 +0000250void TimerManager::add(shared_ptr<Runnable> task, const struct timespec& value) {
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000251
Mark Sleef5f2be42006-09-05 21:05:31 +0000252 long long expiration;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000253 Util::toMilliseconds(expiration, value);
254
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000255 long long now = Util::currentTime();
256
Mark Sleef5f2be42006-09-05 21:05:31 +0000257 if (expiration < now) {
Marc Slemko8a40a762006-07-19 17:46:50 +0000258 throw InvalidArgumentException();
259 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000260
261 add(task, expiration - now);
262}
263
264
Marc Slemko6f038a72006-08-03 18:58:09 +0000265void TimerManager::remove(shared_ptr<Runnable> task) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000266 Synchronized s(monitor_);
267 if (state_ != TimerManager::STARTED) {
Mark Sleef5f2be42006-09-05 21:05:31 +0000268 throw IllegalStateException();
Marc Slemko8a40a762006-07-19 17:46:50 +0000269 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000270}
271
Mark Slee2f6404d2006-10-10 01:37:40 +0000272const TimerManager::STATE TimerManager::state() const { return state_; }
Marc Slemko8a40a762006-07-19 17:46:50 +0000273
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000274}}} // facebook::thrift::concurrency
275