blob: a65394b597676d47cc568ada2ce07a3b877640a7 [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
Marc Slemko66949872006-07-15 01:52:39 +000020#include "ThreadManager.h"
Marc Slemkod466b212006-07-20 00:04:18 +000021#include "Exception.h"
Marc Slemko0e53ccd2006-07-17 23:51:05 +000022#include "Monitor.h"
David Reiss068f4162010-03-09 05:19:45 +000023#include "Util.h"
Marc Slemko66949872006-07-15 01:52:39 +000024
Marc Slemko6f038a72006-08-03 18:58:09 +000025#include <boost/shared_ptr.hpp>
26
Marc Slemko66949872006-07-15 01:52:39 +000027#include <assert.h>
Marc Slemko0e53ccd2006-07-17 23:51:05 +000028#include <queue>
29#include <set>
Marc Slemko66949872006-07-15 01:52:39 +000030
Marc Slemko6f038a72006-08-03 18:58:09 +000031#if defined(DEBUG)
32#include <iostream>
33#endif //defined(DEBUG)
34
T Jake Lucianib5e62212009-01-31 22:36:20 +000035namespace apache { namespace thrift { namespace concurrency {
Marc Slemko66949872006-07-15 01:52:39 +000036
David Reissd4a269c2007-08-23 02:37:19 +000037using boost::shared_ptr;
38using boost::dynamic_pointer_cast;
Marc Slemko6f038a72006-08-03 18:58:09 +000039
/**
 * ThreadManager class
 *
 * This class manages a pool of worker threads. It never creates or destroys
 * threads directly; thread creation is delegated to a ThreadFactory. The
 * manager itself maintains statistics on number of idle threads, number of
 * active threads, task backlog, and average wait and service times.
 *
 * @version $Id:$
 */
Marc Slemko0e53ccd2006-07-17 23:51:05 +000050class ThreadManager::Impl : public ThreadManager {
51
52 public:
Marc Slemko3a3b53b2007-05-22 23:59:54 +000053 Impl() :
Mark Slee2f6404d2006-10-10 01:37:40 +000054 workerCount_(0),
55 workerMaxCount_(0),
56 idleCount_(0),
Marc Slemko3a3b53b2007-05-22 23:59:54 +000057 pendingTaskCountMax_(0),
David Reiss068f4162010-03-09 05:19:45 +000058 expiredCount_(0),
David Reissa0dbfef2010-03-09 05:19:32 +000059 state_(ThreadManager::UNINITIALIZED),
60 monitor_(&mutex_),
61 maxMonitor_(&mutex_) {}
Marc Slemkod466b212006-07-20 00:04:18 +000062
Mark Sleef5f2be42006-09-05 21:05:31 +000063 ~Impl() { stop(); }
Marc Slemkod466b212006-07-20 00:04:18 +000064
Marc Slemkofe5ba12e2006-07-20 21:16:27 +000065 void start();
Marc Slemko0e53ccd2006-07-17 23:51:05 +000066
Mark Slee7c10eaf2007-03-01 02:45:10 +000067 void stop() { stopImpl(false); }
68
69 void join() { stopImpl(true); }
Marc Slemko0e53ccd2006-07-17 23:51:05 +000070
Mark Slee2f6404d2006-10-10 01:37:40 +000071 const ThreadManager::STATE state() const {
72 return state_;
73 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +000074
Marc Slemko6f038a72006-08-03 18:58:09 +000075 shared_ptr<ThreadFactory> threadFactory() const {
Marc Slemko3a3b53b2007-05-22 23:59:54 +000076 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +000077 return threadFactory_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000078 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +000079
80 void threadFactory(shared_ptr<ThreadFactory> value) {
Mark Slee2f6404d2006-10-10 01:37:40 +000081 Synchronized s(monitor_);
82 threadFactory_ = value;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000083 }
84
Marc Slemkod466b212006-07-20 00:04:18 +000085 void addWorker(size_t value);
Marc Slemko3a3b53b2007-05-22 23:59:54 +000086
Marc Slemkod466b212006-07-20 00:04:18 +000087 void removeWorker(size_t value);
Marc Slemko3a3b53b2007-05-22 23:59:54 +000088
Mark Slee2f6404d2006-10-10 01:37:40 +000089 size_t idleWorkerCount() const {
90 return idleCount_;
91 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +000092
93 size_t workerCount() const {
Marc Slemko3a3b53b2007-05-22 23:59:54 +000094 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +000095 return workerCount_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000096 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +000097
Marc Slemko0e53ccd2006-07-17 23:51:05 +000098 size_t pendingTaskCount() const {
Marc Slemko3a3b53b2007-05-22 23:59:54 +000099 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000100 return tasks_.size();
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000101 }
102
103 size_t totalTaskCount() const {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000104 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000105 return tasks_.size() + workerCount_ - idleCount_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000106 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000107
108 size_t pendingTaskCountMax() const {
109 Synchronized s(monitor_);
110 return pendingTaskCountMax_;
111 }
112
David Reiss068f4162010-03-09 05:19:45 +0000113 size_t expiredTaskCount() {
114 Synchronized s(monitor_);
115 size_t result = expiredCount_;
116 expiredCount_ = 0;
117 return result;
118 }
119
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000120 void pendingTaskCountMax(const size_t value) {
121 Synchronized s(monitor_);
122 pendingTaskCountMax_ = value;
123 }
124
125 bool canSleep();
126
David Reiss068f4162010-03-09 05:19:45 +0000127 void add(shared_ptr<Runnable> value, int64_t timeout, int64_t expiration);
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000128
Marc Slemko6f038a72006-08-03 18:58:09 +0000129 void remove(shared_ptr<Runnable> task);
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000130
David Reiss01fe1532010-03-09 05:19:25 +0000131 shared_ptr<Runnable> removeNextPending();
132
David Reiss068f4162010-03-09 05:19:45 +0000133 void removeExpiredTasks();
134
135 void setExpireCallback(ExpireCallback expireCallback);
136
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000137private:
Mark Slee7c10eaf2007-03-01 02:45:10 +0000138 void stopImpl(bool join);
139
Mark Slee2f6404d2006-10-10 01:37:40 +0000140 size_t workerCount_;
141 size_t workerMaxCount_;
142 size_t idleCount_;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000143 size_t pendingTaskCountMax_;
David Reiss068f4162010-03-09 05:19:45 +0000144 size_t expiredCount_;
145 ExpireCallback expireCallback_;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000146
Mark Slee2f6404d2006-10-10 01:37:40 +0000147 ThreadManager::STATE state_;
148 shared_ptr<ThreadFactory> threadFactory_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000149
Mark Sleef5f2be42006-09-05 21:05:31 +0000150
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000151 friend class ThreadManager::Task;
Mark Slee2f6404d2006-10-10 01:37:40 +0000152 std::queue<shared_ptr<Task> > tasks_;
David Reissa0dbfef2010-03-09 05:19:32 +0000153 Mutex mutex_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000154 Monitor monitor_;
David Reissa0dbfef2010-03-09 05:19:32 +0000155 Monitor maxMonitor_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000156 Monitor workerMonitor_;
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000157
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000158 friend class ThreadManager::Worker;
Mark Slee2f6404d2006-10-10 01:37:40 +0000159 std::set<shared_ptr<Thread> > workers_;
160 std::set<shared_ptr<Thread> > deadWorkers_;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000161 std::map<const Thread::id_t, shared_ptr<Thread> > idMap_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000162};
Marc Slemko66949872006-07-15 01:52:39 +0000163
164class ThreadManager::Task : public Runnable {
165
Mark Sleef5f2be42006-09-05 21:05:31 +0000166 public:
Marc Slemko66949872006-07-15 01:52:39 +0000167 enum STATE {
168 WAITING,
169 EXECUTING,
170 CANCELLED,
171 COMPLETE
172 };
173
David Reiss068f4162010-03-09 05:19:45 +0000174 Task(shared_ptr<Runnable> runnable, int64_t expiration=0LL) :
Mark Slee2f6404d2006-10-10 01:37:40 +0000175 runnable_(runnable),
David Reiss068f4162010-03-09 05:19:45 +0000176 state_(WAITING),
177 expireTime_(expiration != 0LL ? Util::currentTime() + expiration : 0LL) {}
Marc Slemko66949872006-07-15 01:52:39 +0000178
Mark Sleef5f2be42006-09-05 21:05:31 +0000179 ~Task() {}
Marc Slemko66949872006-07-15 01:52:39 +0000180
181 void run() {
Mark Slee2f6404d2006-10-10 01:37:40 +0000182 if (state_ == EXECUTING) {
183 runnable_->run();
184 state_ = COMPLETE;
Marc Slemko66949872006-07-15 01:52:39 +0000185 }
186 }
187
David Reiss01fe1532010-03-09 05:19:25 +0000188 shared_ptr<Runnable> getRunnable() {
189 return runnable_;
190 }
191
David Reiss068f4162010-03-09 05:19:45 +0000192 int64_t getExpireTime() const {
193 return expireTime_;
194 }
195
Marc Slemko66949872006-07-15 01:52:39 +0000196 private:
Mark Slee2f6404d2006-10-10 01:37:40 +0000197 shared_ptr<Runnable> runnable_;
Marc Slemkod466b212006-07-20 00:04:18 +0000198 friend class ThreadManager::Worker;
Mark Slee2f6404d2006-10-10 01:37:40 +0000199 STATE state_;
David Reiss068f4162010-03-09 05:19:45 +0000200 int64_t expireTime_;
Marc Slemko66949872006-07-15 01:52:39 +0000201};
202
203class ThreadManager::Worker: public Runnable {
Marc Slemko66949872006-07-15 01:52:39 +0000204 enum STATE {
205 UNINITIALIZED,
206 STARTING,
207 STARTED,
208 STOPPING,
209 STOPPED
210 };
211
212 public:
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000213 Worker(ThreadManager::Impl* manager) :
Mark Slee2f6404d2006-10-10 01:37:40 +0000214 manager_(manager),
215 state_(UNINITIALIZED),
216 idle_(false) {}
Marc Slemko66949872006-07-15 01:52:39 +0000217
218 ~Worker() {}
219
Mark Slee7c10eaf2007-03-01 02:45:10 +0000220 private:
Mark Slee2f6404d2006-10-10 01:37:40 +0000221 bool isActive() const {
Mark Slee7c10eaf2007-03-01 02:45:10 +0000222 return
223 (manager_->workerCount_ <= manager_->workerMaxCount_) ||
224 (manager_->state_ == JOINING && !manager_->tasks_.empty());
Mark Slee2f6404d2006-10-10 01:37:40 +0000225 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000226
Mark Slee7c10eaf2007-03-01 02:45:10 +0000227 public:
Mark Sleef5f2be42006-09-05 21:05:31 +0000228 /**
229 * Worker entry point
230 *
231 * As long as worker thread is running, pull tasks off the task queue and
232 * execute.
233 */
Marc Slemko66949872006-07-15 01:52:39 +0000234 void run() {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000235 bool active = false;
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000236 bool notifyManager = false;
237
Mark Sleef5f2be42006-09-05 21:05:31 +0000238 /**
239 * Increment worker semaphore and notify manager if worker count reached
240 * desired max
241 *
242 * Note: We have to release the monitor and acquire the workerMonitor
243 * since that is what the manager blocks on for worker add/remove
244 */
245 {
Mark Slee2f6404d2006-10-10 01:37:40 +0000246 Synchronized s(manager_->monitor_);
247 active = manager_->workerCount_ < manager_->workerMaxCount_;
Mark Sleef5f2be42006-09-05 21:05:31 +0000248 if (active) {
David Reiss96d23882007-07-26 21:10:32 +0000249 manager_->workerCount_++;
250 notifyManager = manager_->workerCount_ == manager_->workerMaxCount_;
Marc Slemko66949872006-07-15 01:52:39 +0000251 }
252 }
253
Mark Sleef5f2be42006-09-05 21:05:31 +0000254 if (notifyManager) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000255 Synchronized s(manager_->workerMonitor_);
256 manager_->workerMonitor_.notify();
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000257 notifyManager = false;
258 }
259
Mark Sleef5f2be42006-09-05 21:05:31 +0000260 while (active) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000261 shared_ptr<ThreadManager::Task> task;
Marc Slemko66949872006-07-15 01:52:39 +0000262
Mark Sleef5f2be42006-09-05 21:05:31 +0000263 /**
264 * While holding manager monitor block for non-empty task queue (Also
265 * check that the thread hasn't been requested to stop). Once the queue
266 * is non-empty, dequeue a task, release monitor, and execute. If the
267 * worker max count has been decremented such that we exceed it, mark
268 * ourself inactive, decrement the worker count and notify the manager
269 * (technically we're notifying the next blocked thread but eventually
270 * the manager will see it.
271 */
272 {
David Reissa0dbfef2010-03-09 05:19:32 +0000273 Guard g(manager_->mutex_);
David Reiss96d23882007-07-26 21:10:32 +0000274 active = isActive();
Mark Slee7c10eaf2007-03-01 02:45:10 +0000275
David Reiss96d23882007-07-26 21:10:32 +0000276 while (active && manager_->tasks_.empty()) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000277 manager_->idleCount_++;
Mark Slee7c10eaf2007-03-01 02:45:10 +0000278 idle_ = true;
Mark Slee2f6404d2006-10-10 01:37:40 +0000279 manager_->monitor_.wait();
Mark Sleef5f2be42006-09-05 21:05:31 +0000280 active = isActive();
Mark Slee7c10eaf2007-03-01 02:45:10 +0000281 idle_ = false;
Mark Slee2f6404d2006-10-10 01:37:40 +0000282 manager_->idleCount_--;
David Reiss96d23882007-07-26 21:10:32 +0000283 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000284
David Reiss96d23882007-07-26 21:10:32 +0000285 if (active) {
David Reiss068f4162010-03-09 05:19:45 +0000286 manager_->removeExpiredTasks();
287
Mark Slee2f6404d2006-10-10 01:37:40 +0000288 if (!manager_->tasks_.empty()) {
289 task = manager_->tasks_.front();
290 manager_->tasks_.pop();
291 if (task->state_ == ThreadManager::Task::WAITING) {
292 task->state_ = ThreadManager::Task::EXECUTING;
David Reiss96d23882007-07-26 21:10:32 +0000293 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000294
295 /* If we have a pending task max and we just dropped below it, wakeup any
296 thread that might be blocked on add. */
Mark Slee2782d6d2007-05-23 04:55:30 +0000297 if (manager_->pendingTaskCountMax_ != 0 &&
David Reiss4cc07552010-03-09 05:20:01 +0000298 manager_->tasks_.size() <= manager_->pendingTaskCountMax_ - 1) {
David Reissa0dbfef2010-03-09 05:19:32 +0000299 manager_->maxMonitor_.notify();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000300 }
David Reiss96d23882007-07-26 21:10:32 +0000301 }
302 } else {
303 idle_ = true;
304 manager_->workerCount_--;
Mark Slee7c10eaf2007-03-01 02:45:10 +0000305 notifyManager = (manager_->workerCount_ == manager_->workerMaxCount_);
David Reiss96d23882007-07-26 21:10:32 +0000306 }
Marc Slemko66949872006-07-15 01:52:39 +0000307 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000308
Mark Sleef5f2be42006-09-05 21:05:31 +0000309 if (task != NULL) {
David Reiss96d23882007-07-26 21:10:32 +0000310 if (task->state_ == ThreadManager::Task::EXECUTING) {
311 try {
Mark Sleef5f2be42006-09-05 21:05:31 +0000312 task->run();
313 } catch(...) {
314 // XXX need to log this
David Reiss96d23882007-07-26 21:10:32 +0000315 }
316 }
Marc Slemko66949872006-07-15 01:52:39 +0000317 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000318 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000319
Mark Sleef5f2be42006-09-05 21:05:31 +0000320 {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000321 Synchronized s(manager_->workerMonitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000322 manager_->deadWorkers_.insert(this->thread());
Mark Sleef5f2be42006-09-05 21:05:31 +0000323 if (notifyManager) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000324 manager_->workerMonitor_.notify();
Marc Slemko66949872006-07-15 01:52:39 +0000325 }
326 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000327
Marc Slemko66949872006-07-15 01:52:39 +0000328 return;
329 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000330
Mark Sleef5f2be42006-09-05 21:05:31 +0000331 private:
Mark Slee2f6404d2006-10-10 01:37:40 +0000332 ThreadManager::Impl* manager_;
Mark Sleef5f2be42006-09-05 21:05:31 +0000333 friend class ThreadManager::Impl;
Mark Slee2f6404d2006-10-10 01:37:40 +0000334 STATE state_;
335 bool idle_;
Marc Slemko66949872006-07-15 01:52:39 +0000336};
337
Mark Sleef5f2be42006-09-05 21:05:31 +0000338
339 void ThreadManager::Impl::addWorker(size_t value) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000340 std::set<shared_ptr<Thread> > newThreads;
Mark Sleef5f2be42006-09-05 21:05:31 +0000341 for (size_t ix = 0; ix < value; ix++) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000342 shared_ptr<ThreadManager::Worker> worker = shared_ptr<ThreadManager::Worker>(new ThreadManager::Worker(this));
Mark Slee2f6404d2006-10-10 01:37:40 +0000343 newThreads.insert(threadFactory_->newThread(worker));
Marc Slemko66949872006-07-15 01:52:39 +0000344 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000345
Mark Sleef5f2be42006-09-05 21:05:31 +0000346 {
Mark Slee2f6404d2006-10-10 01:37:40 +0000347 Synchronized s(monitor_);
348 workerMaxCount_ += value;
349 workers_.insert(newThreads.begin(), newThreads.end());
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000350 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000351
Mark Sleef5f2be42006-09-05 21:05:31 +0000352 for (std::set<shared_ptr<Thread> >::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000353 shared_ptr<ThreadManager::Worker> worker = dynamic_pointer_cast<ThreadManager::Worker, Runnable>((*ix)->runnable());
Mark Slee2f6404d2006-10-10 01:37:40 +0000354 worker->state_ = ThreadManager::Worker::STARTING;
Marc Slemkod466b212006-07-20 00:04:18 +0000355 (*ix)->start();
Marc Slemkoa6479032007-06-05 22:20:14 +0000356 idMap_.insert(std::pair<const Thread::id_t, shared_ptr<Thread> >((*ix)->getId(), *ix));
Marc Slemkod466b212006-07-20 00:04:18 +0000357 }
358
Mark Sleef5f2be42006-09-05 21:05:31 +0000359 {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000360 Synchronized s(workerMonitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000361 while (workerCount_ != workerMaxCount_) {
362 workerMonitor_.wait();
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000363 }
364 }
365}
Marc Slemkod466b212006-07-20 00:04:18 +0000366
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000367void ThreadManager::Impl::start() {
368
Mark Slee2f6404d2006-10-10 01:37:40 +0000369 if (state_ == ThreadManager::STOPPED) {
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000370 return;
371 }
372
Mark Sleef5f2be42006-09-05 21:05:31 +0000373 {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000374 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000375 if (state_ == ThreadManager::UNINITIALIZED) {
376 if (threadFactory_ == NULL) {
Mark Sleef5f2be42006-09-05 21:05:31 +0000377 throw InvalidArgumentException();
378 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000379 state_ = ThreadManager::STARTED;
380 monitor_.notifyAll();
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000381 }
382
Mark Slee2f6404d2006-10-10 01:37:40 +0000383 while (state_ == STARTING) {
384 monitor_.wait();
Marc Slemkod466b212006-07-20 00:04:18 +0000385 }
386 }
387}
388
Mark Slee7c10eaf2007-03-01 02:45:10 +0000389void ThreadManager::Impl::stopImpl(bool join) {
Marc Slemkod466b212006-07-20 00:04:18 +0000390 bool doStop = false;
Mark Slee2f6404d2006-10-10 01:37:40 +0000391 if (state_ == ThreadManager::STOPPED) {
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000392 return;
393 }
394
Mark Sleef5f2be42006-09-05 21:05:31 +0000395 {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000396 Synchronized s(monitor_);
Mark Slee7c10eaf2007-03-01 02:45:10 +0000397 if (state_ != ThreadManager::STOPPING &&
398 state_ != ThreadManager::JOINING &&
399 state_ != ThreadManager::STOPPED) {
Marc Slemkod466b212006-07-20 00:04:18 +0000400 doStop = true;
Mark Slee7c10eaf2007-03-01 02:45:10 +0000401 state_ = join ? ThreadManager::JOINING : ThreadManager::STOPPING;
Marc Slemkod466b212006-07-20 00:04:18 +0000402 }
403 }
404
Mark Sleef5f2be42006-09-05 21:05:31 +0000405 if (doStop) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000406 removeWorker(workerCount_);
Marc Slemkod466b212006-07-20 00:04:18 +0000407 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000408
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000409 // XXX
Mark Sleef5f2be42006-09-05 21:05:31 +0000410 // should be able to block here for transition to STOPPED since we're no
411 // using shared_ptrs
Mark Slee7c10eaf2007-03-01 02:45:10 +0000412
413 {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000414 Synchronized s(monitor_);
Mark Slee7c10eaf2007-03-01 02:45:10 +0000415 state_ = ThreadManager::STOPPED;
416 }
417
Marc Slemkod466b212006-07-20 00:04:18 +0000418}
Mark Slee7c10eaf2007-03-01 02:45:10 +0000419
Marc Slemkod466b212006-07-20 00:04:18 +0000420void ThreadManager::Impl::removeWorker(size_t value) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000421 std::set<shared_ptr<Thread> > removedThreads;
Mark Sleef5f2be42006-09-05 21:05:31 +0000422 {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000423 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000424 if (value > workerMaxCount_) {
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000425 throw InvalidArgumentException();
426 }
427
Mark Slee2f6404d2006-10-10 01:37:40 +0000428 workerMaxCount_ -= value;
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000429
Mark Slee2f6404d2006-10-10 01:37:40 +0000430 if (idleCount_ < value) {
431 for (size_t ix = 0; ix < idleCount_; ix++) {
David Reiss96d23882007-07-26 21:10:32 +0000432 monitor_.notify();
Marc Slemko66949872006-07-15 01:52:39 +0000433 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000434 } else {
Mark Slee2f6404d2006-10-10 01:37:40 +0000435 monitor_.notifyAll();
Marc Slemko66949872006-07-15 01:52:39 +0000436 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000437 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000438
Mark Sleef5f2be42006-09-05 21:05:31 +0000439 {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000440 Synchronized s(workerMonitor_);
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000441
Mark Slee2f6404d2006-10-10 01:37:40 +0000442 while (workerCount_ != workerMaxCount_) {
443 workerMonitor_.wait();
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000444 }
445
Mark Slee2f6404d2006-10-10 01:37:40 +0000446 for (std::set<shared_ptr<Thread> >::iterator ix = deadWorkers_.begin(); ix != deadWorkers_.end(); ix++) {
447 workers_.erase(*ix);
Marc Slemkoa6479032007-06-05 22:20:14 +0000448 idMap_.erase((*ix)->getId());
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000449 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000450
Mark Slee2f6404d2006-10-10 01:37:40 +0000451 deadWorkers_.clear();
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000452 }
453}
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000454
455 bool ThreadManager::Impl::canSleep() {
Marc Slemkoa6479032007-06-05 22:20:14 +0000456 const Thread::id_t id = threadFactory_->getCurrentThreadId();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000457 return idMap_.find(id) == idMap_.end();
458 }
459
David Reiss068f4162010-03-09 05:19:45 +0000460 void ThreadManager::Impl::add(shared_ptr<Runnable> value,
461 int64_t timeout,
462 int64_t expiration) {
David Reiss4e19f192010-03-09 05:19:59 +0000463 Guard g(mutex_, timeout);
464
465 if (!g) {
466 throw TimedOutException();
467 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000468
Mark Slee2f6404d2006-10-10 01:37:40 +0000469 if (state_ != ThreadManager::STARTED) {
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000470 throw IllegalStateException();
471 }
Marc Slemkod466b212006-07-20 00:04:18 +0000472
David Reiss068f4162010-03-09 05:19:45 +0000473 removeExpiredTasks();
Mark Slee2782d6d2007-05-23 04:55:30 +0000474 if (pendingTaskCountMax_ > 0 && (tasks_.size() >= pendingTaskCountMax_)) {
Aditya Agarwal4b6ff2d2007-12-25 22:58:50 +0000475 if (canSleep() && timeout >= 0) {
Mark Slee2782d6d2007-05-23 04:55:30 +0000476 while (pendingTaskCountMax_ > 0 && tasks_.size() >= pendingTaskCountMax_) {
David Reissa0dbfef2010-03-09 05:19:32 +0000477 // This is thread safe because the mutex is shared between monitors.
478 maxMonitor_.wait(timeout);
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000479 }
480 } else {
481 throw TooManyPendingTasksException();
482 }
483 }
484
David Reiss068f4162010-03-09 05:19:45 +0000485 tasks_.push(shared_ptr<ThreadManager::Task>(new ThreadManager::Task(value, expiration)));
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000486
Mark Sleef5f2be42006-09-05 21:05:31 +0000487 // If idle thread is available notify it, otherwise all worker threads are
488 // running and will get around to this task in time.
Mark Slee2f6404d2006-10-10 01:37:40 +0000489 if (idleCount_ > 0) {
490 monitor_.notify();
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000491 }
Marc Slemko66949872006-07-15 01:52:39 +0000492 }
493
Marc Slemko6f038a72006-08-03 18:58:09 +0000494void ThreadManager::Impl::remove(shared_ptr<Runnable> task) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000495 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000496 if (state_ != ThreadManager::STARTED) {
Mark Sleef5f2be42006-09-05 21:05:31 +0000497 throw IllegalStateException();
498 }
Marc Slemko66949872006-07-15 01:52:39 +0000499}
500
David Reiss01fe1532010-03-09 05:19:25 +0000501boost::shared_ptr<Runnable> ThreadManager::Impl::removeNextPending() {
502 Guard g(mutex_);
503 if (state_ != ThreadManager::STARTED) {
504 throw IllegalStateException();
505 }
506
507 if (tasks_.empty()) {
508 return boost::shared_ptr<Runnable>();
509 }
510
511 shared_ptr<ThreadManager::Task> task = tasks_.front();
512 tasks_.pop();
513
514 return task->getRunnable();
515}
516
David Reiss068f4162010-03-09 05:19:45 +0000517void ThreadManager::Impl::removeExpiredTasks() {
518 int64_t now = 0LL; // we won't ask for the time untile we need it
519
520 // note that this loop breaks at the first non-expiring task
521 while (!tasks_.empty()) {
522 shared_ptr<ThreadManager::Task> task = tasks_.front();
523 if (task->getExpireTime() == 0LL) {
524 break;
525 }
526 if (now == 0LL) {
527 now = Util::currentTime();
528 }
529 if (task->getExpireTime() > now) {
530 break;
531 }
532 if (expireCallback_) {
533 expireCallback_(task->getRunnable());
534 }
535 tasks_.pop();
536 expiredCount_++;
537 }
538}
539
540
541void ThreadManager::Impl::setExpireCallback(ExpireCallback expireCallback) {
542 expireCallback_ = expireCallback;
543}
544
Marc Slemkod466b212006-07-20 00:04:18 +0000545class SimpleThreadManager : public ThreadManager::Impl {
Marc Slemko66949872006-07-15 01:52:39 +0000546
Mark Slee2782d6d2007-05-23 04:55:30 +0000547 public:
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000548 SimpleThreadManager(size_t workerCount=4, size_t pendingTaskCountMax=0) :
Mark Slee2f6404d2006-10-10 01:37:40 +0000549 workerCount_(workerCount),
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000550 pendingTaskCountMax_(pendingTaskCountMax),
Mark Slee2f6404d2006-10-10 01:37:40 +0000551 firstTime_(true) {
Marc Slemkod466b212006-07-20 00:04:18 +0000552 }
Marc Slemko66949872006-07-15 01:52:39 +0000553
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000554 void start() {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000555 ThreadManager::Impl::pendingTaskCountMax(pendingTaskCountMax_);
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000556 ThreadManager::Impl::start();
Mark Slee2f6404d2006-10-10 01:37:40 +0000557 addWorker(workerCount_);
Marc Slemkod466b212006-07-20 00:04:18 +0000558 }
559
Mark Slee2782d6d2007-05-23 04:55:30 +0000560 private:
Mark Slee2f6404d2006-10-10 01:37:40 +0000561 const size_t workerCount_;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000562 const size_t pendingTaskCountMax_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000563 bool firstTime_;
564 Monitor monitor_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000565};
Marc Slemko66949872006-07-15 01:52:39 +0000566
Marc Slemko66949872006-07-15 01:52:39 +0000567
Marc Slemko6f038a72006-08-03 18:58:09 +0000568shared_ptr<ThreadManager> ThreadManager::newThreadManager() {
569 return shared_ptr<ThreadManager>(new ThreadManager::Impl());
Marc Slemkod466b212006-07-20 00:04:18 +0000570}
Marc Slemko66949872006-07-15 01:52:39 +0000571
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000572shared_ptr<ThreadManager> ThreadManager::newSimpleThreadManager(size_t count, size_t pendingTaskCountMax) {
573 return shared_ptr<ThreadManager>(new SimpleThreadManager(count, pendingTaskCountMax));
Marc Slemkod466b212006-07-20 00:04:18 +0000574}
Marc Slemko66949872006-07-15 01:52:39 +0000575
T Jake Lucianib5e62212009-01-31 22:36:20 +0000576}}} // apache::thrift::concurrency
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000577