blob: b756bf150ee9511dd5b0aeeb4ac5815bf39a8377 [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
Roger Meier12d70532011-12-14 23:35:28 +000020#ifdef HAVE_CONFIG_H
21#include <config.h>
22#endif
23
Marc Slemko66949872006-07-15 01:52:39 +000024#include "ThreadManager.h"
Marc Slemkod466b212006-07-20 00:04:18 +000025#include "Exception.h"
Marc Slemko0e53ccd2006-07-17 23:51:05 +000026#include "Monitor.h"
David Reiss068f4162010-03-09 05:19:45 +000027#include "Util.h"
Marc Slemko66949872006-07-15 01:52:39 +000028
Marc Slemko6f038a72006-08-03 18:58:09 +000029#include <boost/shared_ptr.hpp>
30
Marc Slemko66949872006-07-15 01:52:39 +000031#include <assert.h>
Marc Slemko0e53ccd2006-07-17 23:51:05 +000032#include <queue>
33#include <set>
Marc Slemko66949872006-07-15 01:52:39 +000034
Marc Slemko6f038a72006-08-03 18:58:09 +000035#if defined(DEBUG)
36#include <iostream>
37#endif //defined(DEBUG)
38
T Jake Lucianib5e62212009-01-31 22:36:20 +000039namespace apache { namespace thrift { namespace concurrency {
Marc Slemko66949872006-07-15 01:52:39 +000040
David Reissd4a269c2007-08-23 02:37:19 +000041using boost::shared_ptr;
42using boost::dynamic_pointer_cast;
Marc Slemko6f038a72006-08-03 18:58:09 +000043
Mark Sleef5f2be42006-09-05 21:05:31 +000044/**
45 * ThreadManager class
Marc Slemko3a3b53b2007-05-22 23:59:54 +000046 *
Mark Sleef5f2be42006-09-05 21:05:31 +000047 * This class manages a pool of threads. It uses a ThreadFactory to create
48 * threads. It never actually creates or destroys worker threads, rather
49 * it maintains statistics on number of idle threads, number of active threads,
50 * task backlog, and average wait and service times.
51 *
Mark Sleef5f2be42006-09-05 21:05:31 +000052 * @version $Id:$
53 */
Marc Slemko0e53ccd2006-07-17 23:51:05 +000054class ThreadManager::Impl : public ThreadManager {
55
56 public:
Marc Slemko3a3b53b2007-05-22 23:59:54 +000057 Impl() :
Mark Slee2f6404d2006-10-10 01:37:40 +000058 workerCount_(0),
59 workerMaxCount_(0),
60 idleCount_(0),
Marc Slemko3a3b53b2007-05-22 23:59:54 +000061 pendingTaskCountMax_(0),
David Reiss068f4162010-03-09 05:19:45 +000062 expiredCount_(0),
David Reissa0dbfef2010-03-09 05:19:32 +000063 state_(ThreadManager::UNINITIALIZED),
64 monitor_(&mutex_),
65 maxMonitor_(&mutex_) {}
Marc Slemkod466b212006-07-20 00:04:18 +000066
Mark Sleef5f2be42006-09-05 21:05:31 +000067 ~Impl() { stop(); }
Marc Slemkod466b212006-07-20 00:04:18 +000068
Marc Slemkofe5ba12e2006-07-20 21:16:27 +000069 void start();
Marc Slemko0e53ccd2006-07-17 23:51:05 +000070
Mark Slee7c10eaf2007-03-01 02:45:10 +000071 void stop() { stopImpl(false); }
72
73 void join() { stopImpl(true); }
Marc Slemko0e53ccd2006-07-17 23:51:05 +000074
Roger Meier3b771a12010-11-17 22:11:26 +000075 ThreadManager::STATE state() const {
Mark Slee2f6404d2006-10-10 01:37:40 +000076 return state_;
77 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +000078
Marc Slemko6f038a72006-08-03 18:58:09 +000079 shared_ptr<ThreadFactory> threadFactory() const {
Marc Slemko3a3b53b2007-05-22 23:59:54 +000080 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +000081 return threadFactory_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000082 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +000083
84 void threadFactory(shared_ptr<ThreadFactory> value) {
Mark Slee2f6404d2006-10-10 01:37:40 +000085 Synchronized s(monitor_);
86 threadFactory_ = value;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000087 }
88
Marc Slemkod466b212006-07-20 00:04:18 +000089 void addWorker(size_t value);
Marc Slemko3a3b53b2007-05-22 23:59:54 +000090
Marc Slemkod466b212006-07-20 00:04:18 +000091 void removeWorker(size_t value);
Marc Slemko3a3b53b2007-05-22 23:59:54 +000092
Mark Slee2f6404d2006-10-10 01:37:40 +000093 size_t idleWorkerCount() const {
94 return idleCount_;
95 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +000096
97 size_t workerCount() const {
Marc Slemko3a3b53b2007-05-22 23:59:54 +000098 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +000099 return workerCount_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000100 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000101
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000102 size_t pendingTaskCount() const {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000103 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000104 return tasks_.size();
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000105 }
106
107 size_t totalTaskCount() const {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000108 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000109 return tasks_.size() + workerCount_ - idleCount_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000110 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000111
112 size_t pendingTaskCountMax() const {
113 Synchronized s(monitor_);
114 return pendingTaskCountMax_;
115 }
116
David Reiss068f4162010-03-09 05:19:45 +0000117 size_t expiredTaskCount() {
118 Synchronized s(monitor_);
119 size_t result = expiredCount_;
120 expiredCount_ = 0;
121 return result;
122 }
123
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000124 void pendingTaskCountMax(const size_t value) {
125 Synchronized s(monitor_);
126 pendingTaskCountMax_ = value;
127 }
128
129 bool canSleep();
130
David Reiss068f4162010-03-09 05:19:45 +0000131 void add(shared_ptr<Runnable> value, int64_t timeout, int64_t expiration);
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000132
Marc Slemko6f038a72006-08-03 18:58:09 +0000133 void remove(shared_ptr<Runnable> task);
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000134
David Reiss01fe1532010-03-09 05:19:25 +0000135 shared_ptr<Runnable> removeNextPending();
136
David Reiss068f4162010-03-09 05:19:45 +0000137 void removeExpiredTasks();
138
139 void setExpireCallback(ExpireCallback expireCallback);
140
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000141private:
Mark Slee7c10eaf2007-03-01 02:45:10 +0000142 void stopImpl(bool join);
143
Mark Slee2f6404d2006-10-10 01:37:40 +0000144 size_t workerCount_;
145 size_t workerMaxCount_;
146 size_t idleCount_;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000147 size_t pendingTaskCountMax_;
David Reiss068f4162010-03-09 05:19:45 +0000148 size_t expiredCount_;
149 ExpireCallback expireCallback_;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000150
Mark Slee2f6404d2006-10-10 01:37:40 +0000151 ThreadManager::STATE state_;
152 shared_ptr<ThreadFactory> threadFactory_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000153
Mark Sleef5f2be42006-09-05 21:05:31 +0000154
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000155 friend class ThreadManager::Task;
Mark Slee2f6404d2006-10-10 01:37:40 +0000156 std::queue<shared_ptr<Task> > tasks_;
David Reissa0dbfef2010-03-09 05:19:32 +0000157 Mutex mutex_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000158 Monitor monitor_;
David Reissa0dbfef2010-03-09 05:19:32 +0000159 Monitor maxMonitor_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000160 Monitor workerMonitor_;
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000161
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000162 friend class ThreadManager::Worker;
Mark Slee2f6404d2006-10-10 01:37:40 +0000163 std::set<shared_ptr<Thread> > workers_;
164 std::set<shared_ptr<Thread> > deadWorkers_;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000165 std::map<const Thread::id_t, shared_ptr<Thread> > idMap_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000166};
Marc Slemko66949872006-07-15 01:52:39 +0000167
168class ThreadManager::Task : public Runnable {
169
Mark Sleef5f2be42006-09-05 21:05:31 +0000170 public:
Marc Slemko66949872006-07-15 01:52:39 +0000171 enum STATE {
172 WAITING,
173 EXECUTING,
174 CANCELLED,
175 COMPLETE
176 };
177
David Reiss068f4162010-03-09 05:19:45 +0000178 Task(shared_ptr<Runnable> runnable, int64_t expiration=0LL) :
Mark Slee2f6404d2006-10-10 01:37:40 +0000179 runnable_(runnable),
David Reiss068f4162010-03-09 05:19:45 +0000180 state_(WAITING),
181 expireTime_(expiration != 0LL ? Util::currentTime() + expiration : 0LL) {}
Marc Slemko66949872006-07-15 01:52:39 +0000182
Mark Sleef5f2be42006-09-05 21:05:31 +0000183 ~Task() {}
Marc Slemko66949872006-07-15 01:52:39 +0000184
185 void run() {
Mark Slee2f6404d2006-10-10 01:37:40 +0000186 if (state_ == EXECUTING) {
187 runnable_->run();
188 state_ = COMPLETE;
Marc Slemko66949872006-07-15 01:52:39 +0000189 }
190 }
191
David Reiss01fe1532010-03-09 05:19:25 +0000192 shared_ptr<Runnable> getRunnable() {
193 return runnable_;
194 }
195
David Reiss068f4162010-03-09 05:19:45 +0000196 int64_t getExpireTime() const {
197 return expireTime_;
198 }
199
Marc Slemko66949872006-07-15 01:52:39 +0000200 private:
Mark Slee2f6404d2006-10-10 01:37:40 +0000201 shared_ptr<Runnable> runnable_;
Marc Slemkod466b212006-07-20 00:04:18 +0000202 friend class ThreadManager::Worker;
Mark Slee2f6404d2006-10-10 01:37:40 +0000203 STATE state_;
David Reiss068f4162010-03-09 05:19:45 +0000204 int64_t expireTime_;
Marc Slemko66949872006-07-15 01:52:39 +0000205};
206
207class ThreadManager::Worker: public Runnable {
Marc Slemko66949872006-07-15 01:52:39 +0000208 enum STATE {
209 UNINITIALIZED,
210 STARTING,
211 STARTED,
212 STOPPING,
213 STOPPED
214 };
215
216 public:
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000217 Worker(ThreadManager::Impl* manager) :
Mark Slee2f6404d2006-10-10 01:37:40 +0000218 manager_(manager),
219 state_(UNINITIALIZED),
220 idle_(false) {}
Marc Slemko66949872006-07-15 01:52:39 +0000221
222 ~Worker() {}
223
Mark Slee7c10eaf2007-03-01 02:45:10 +0000224 private:
Mark Slee2f6404d2006-10-10 01:37:40 +0000225 bool isActive() const {
Mark Slee7c10eaf2007-03-01 02:45:10 +0000226 return
227 (manager_->workerCount_ <= manager_->workerMaxCount_) ||
228 (manager_->state_ == JOINING && !manager_->tasks_.empty());
Mark Slee2f6404d2006-10-10 01:37:40 +0000229 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000230
Mark Slee7c10eaf2007-03-01 02:45:10 +0000231 public:
Mark Sleef5f2be42006-09-05 21:05:31 +0000232 /**
233 * Worker entry point
234 *
235 * As long as worker thread is running, pull tasks off the task queue and
236 * execute.
237 */
Marc Slemko66949872006-07-15 01:52:39 +0000238 void run() {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000239 bool active = false;
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000240 bool notifyManager = false;
241
Mark Sleef5f2be42006-09-05 21:05:31 +0000242 /**
243 * Increment worker semaphore and notify manager if worker count reached
244 * desired max
245 *
246 * Note: We have to release the monitor and acquire the workerMonitor
247 * since that is what the manager blocks on for worker add/remove
248 */
249 {
Mark Slee2f6404d2006-10-10 01:37:40 +0000250 Synchronized s(manager_->monitor_);
251 active = manager_->workerCount_ < manager_->workerMaxCount_;
Mark Sleef5f2be42006-09-05 21:05:31 +0000252 if (active) {
David Reiss96d23882007-07-26 21:10:32 +0000253 manager_->workerCount_++;
254 notifyManager = manager_->workerCount_ == manager_->workerMaxCount_;
Marc Slemko66949872006-07-15 01:52:39 +0000255 }
256 }
257
Mark Sleef5f2be42006-09-05 21:05:31 +0000258 if (notifyManager) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000259 Synchronized s(manager_->workerMonitor_);
260 manager_->workerMonitor_.notify();
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000261 notifyManager = false;
262 }
263
Mark Sleef5f2be42006-09-05 21:05:31 +0000264 while (active) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000265 shared_ptr<ThreadManager::Task> task;
Marc Slemko66949872006-07-15 01:52:39 +0000266
Mark Sleef5f2be42006-09-05 21:05:31 +0000267 /**
268 * While holding manager monitor block for non-empty task queue (Also
269 * check that the thread hasn't been requested to stop). Once the queue
270 * is non-empty, dequeue a task, release monitor, and execute. If the
271 * worker max count has been decremented such that we exceed it, mark
272 * ourself inactive, decrement the worker count and notify the manager
273 * (technically we're notifying the next blocked thread but eventually
274 * the manager will see it.
275 */
276 {
David Reissa0dbfef2010-03-09 05:19:32 +0000277 Guard g(manager_->mutex_);
David Reiss96d23882007-07-26 21:10:32 +0000278 active = isActive();
Mark Slee7c10eaf2007-03-01 02:45:10 +0000279
David Reiss96d23882007-07-26 21:10:32 +0000280 while (active && manager_->tasks_.empty()) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000281 manager_->idleCount_++;
Mark Slee7c10eaf2007-03-01 02:45:10 +0000282 idle_ = true;
Mark Slee2f6404d2006-10-10 01:37:40 +0000283 manager_->monitor_.wait();
Mark Sleef5f2be42006-09-05 21:05:31 +0000284 active = isActive();
Mark Slee7c10eaf2007-03-01 02:45:10 +0000285 idle_ = false;
Mark Slee2f6404d2006-10-10 01:37:40 +0000286 manager_->idleCount_--;
David Reiss96d23882007-07-26 21:10:32 +0000287 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000288
David Reiss96d23882007-07-26 21:10:32 +0000289 if (active) {
David Reiss068f4162010-03-09 05:19:45 +0000290 manager_->removeExpiredTasks();
291
Mark Slee2f6404d2006-10-10 01:37:40 +0000292 if (!manager_->tasks_.empty()) {
293 task = manager_->tasks_.front();
294 manager_->tasks_.pop();
295 if (task->state_ == ThreadManager::Task::WAITING) {
296 task->state_ = ThreadManager::Task::EXECUTING;
David Reiss96d23882007-07-26 21:10:32 +0000297 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000298
299 /* If we have a pending task max and we just dropped below it, wakeup any
300 thread that might be blocked on add. */
Mark Slee2782d6d2007-05-23 04:55:30 +0000301 if (manager_->pendingTaskCountMax_ != 0 &&
David Reiss4cc07552010-03-09 05:20:01 +0000302 manager_->tasks_.size() <= manager_->pendingTaskCountMax_ - 1) {
David Reissa0dbfef2010-03-09 05:19:32 +0000303 manager_->maxMonitor_.notify();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000304 }
David Reiss96d23882007-07-26 21:10:32 +0000305 }
306 } else {
307 idle_ = true;
308 manager_->workerCount_--;
Mark Slee7c10eaf2007-03-01 02:45:10 +0000309 notifyManager = (manager_->workerCount_ == manager_->workerMaxCount_);
David Reiss96d23882007-07-26 21:10:32 +0000310 }
Marc Slemko66949872006-07-15 01:52:39 +0000311 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000312
Mark Sleef5f2be42006-09-05 21:05:31 +0000313 if (task != NULL) {
David Reiss96d23882007-07-26 21:10:32 +0000314 if (task->state_ == ThreadManager::Task::EXECUTING) {
315 try {
Mark Sleef5f2be42006-09-05 21:05:31 +0000316 task->run();
317 } catch(...) {
318 // XXX need to log this
David Reiss96d23882007-07-26 21:10:32 +0000319 }
320 }
Marc Slemko66949872006-07-15 01:52:39 +0000321 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000322 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000323
Mark Sleef5f2be42006-09-05 21:05:31 +0000324 {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000325 Synchronized s(manager_->workerMonitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000326 manager_->deadWorkers_.insert(this->thread());
Mark Sleef5f2be42006-09-05 21:05:31 +0000327 if (notifyManager) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000328 manager_->workerMonitor_.notify();
Marc Slemko66949872006-07-15 01:52:39 +0000329 }
330 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000331
Marc Slemko66949872006-07-15 01:52:39 +0000332 return;
333 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000334
Mark Sleef5f2be42006-09-05 21:05:31 +0000335 private:
Mark Slee2f6404d2006-10-10 01:37:40 +0000336 ThreadManager::Impl* manager_;
Mark Sleef5f2be42006-09-05 21:05:31 +0000337 friend class ThreadManager::Impl;
Mark Slee2f6404d2006-10-10 01:37:40 +0000338 STATE state_;
339 bool idle_;
Marc Slemko66949872006-07-15 01:52:39 +0000340};
341
Mark Sleef5f2be42006-09-05 21:05:31 +0000342
343 void ThreadManager::Impl::addWorker(size_t value) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000344 std::set<shared_ptr<Thread> > newThreads;
Mark Sleef5f2be42006-09-05 21:05:31 +0000345 for (size_t ix = 0; ix < value; ix++) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000346 shared_ptr<ThreadManager::Worker> worker = shared_ptr<ThreadManager::Worker>(new ThreadManager::Worker(this));
Mark Slee2f6404d2006-10-10 01:37:40 +0000347 newThreads.insert(threadFactory_->newThread(worker));
Marc Slemko66949872006-07-15 01:52:39 +0000348 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000349
Mark Sleef5f2be42006-09-05 21:05:31 +0000350 {
Mark Slee2f6404d2006-10-10 01:37:40 +0000351 Synchronized s(monitor_);
352 workerMaxCount_ += value;
353 workers_.insert(newThreads.begin(), newThreads.end());
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000354 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000355
Mark Sleef5f2be42006-09-05 21:05:31 +0000356 for (std::set<shared_ptr<Thread> >::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000357 shared_ptr<ThreadManager::Worker> worker = dynamic_pointer_cast<ThreadManager::Worker, Runnable>((*ix)->runnable());
Mark Slee2f6404d2006-10-10 01:37:40 +0000358 worker->state_ = ThreadManager::Worker::STARTING;
Marc Slemkod466b212006-07-20 00:04:18 +0000359 (*ix)->start();
Marc Slemkoa6479032007-06-05 22:20:14 +0000360 idMap_.insert(std::pair<const Thread::id_t, shared_ptr<Thread> >((*ix)->getId(), *ix));
Marc Slemkod466b212006-07-20 00:04:18 +0000361 }
362
Mark Sleef5f2be42006-09-05 21:05:31 +0000363 {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000364 Synchronized s(workerMonitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000365 while (workerCount_ != workerMaxCount_) {
366 workerMonitor_.wait();
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000367 }
368 }
369}
Marc Slemkod466b212006-07-20 00:04:18 +0000370
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000371void ThreadManager::Impl::start() {
372
Mark Slee2f6404d2006-10-10 01:37:40 +0000373 if (state_ == ThreadManager::STOPPED) {
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000374 return;
375 }
376
Mark Sleef5f2be42006-09-05 21:05:31 +0000377 {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000378 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000379 if (state_ == ThreadManager::UNINITIALIZED) {
380 if (threadFactory_ == NULL) {
Mark Sleef5f2be42006-09-05 21:05:31 +0000381 throw InvalidArgumentException();
382 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000383 state_ = ThreadManager::STARTED;
384 monitor_.notifyAll();
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000385 }
386
Mark Slee2f6404d2006-10-10 01:37:40 +0000387 while (state_ == STARTING) {
388 monitor_.wait();
Marc Slemkod466b212006-07-20 00:04:18 +0000389 }
390 }
391}
392
Mark Slee7c10eaf2007-03-01 02:45:10 +0000393void ThreadManager::Impl::stopImpl(bool join) {
Marc Slemkod466b212006-07-20 00:04:18 +0000394 bool doStop = false;
Mark Slee2f6404d2006-10-10 01:37:40 +0000395 if (state_ == ThreadManager::STOPPED) {
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000396 return;
397 }
398
Mark Sleef5f2be42006-09-05 21:05:31 +0000399 {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000400 Synchronized s(monitor_);
Mark Slee7c10eaf2007-03-01 02:45:10 +0000401 if (state_ != ThreadManager::STOPPING &&
402 state_ != ThreadManager::JOINING &&
403 state_ != ThreadManager::STOPPED) {
Marc Slemkod466b212006-07-20 00:04:18 +0000404 doStop = true;
Mark Slee7c10eaf2007-03-01 02:45:10 +0000405 state_ = join ? ThreadManager::JOINING : ThreadManager::STOPPING;
Marc Slemkod466b212006-07-20 00:04:18 +0000406 }
407 }
408
Mark Sleef5f2be42006-09-05 21:05:31 +0000409 if (doStop) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000410 removeWorker(workerCount_);
Marc Slemkod466b212006-07-20 00:04:18 +0000411 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000412
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000413 // XXX
Mark Sleef5f2be42006-09-05 21:05:31 +0000414 // should be able to block here for transition to STOPPED since we're no
415 // using shared_ptrs
Mark Slee7c10eaf2007-03-01 02:45:10 +0000416
417 {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000418 Synchronized s(monitor_);
Mark Slee7c10eaf2007-03-01 02:45:10 +0000419 state_ = ThreadManager::STOPPED;
420 }
421
Marc Slemkod466b212006-07-20 00:04:18 +0000422}
Mark Slee7c10eaf2007-03-01 02:45:10 +0000423
Marc Slemkod466b212006-07-20 00:04:18 +0000424void ThreadManager::Impl::removeWorker(size_t value) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000425 std::set<shared_ptr<Thread> > removedThreads;
Mark Sleef5f2be42006-09-05 21:05:31 +0000426 {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000427 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000428 if (value > workerMaxCount_) {
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000429 throw InvalidArgumentException();
430 }
431
Mark Slee2f6404d2006-10-10 01:37:40 +0000432 workerMaxCount_ -= value;
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000433
Mark Slee2f6404d2006-10-10 01:37:40 +0000434 if (idleCount_ < value) {
435 for (size_t ix = 0; ix < idleCount_; ix++) {
David Reiss96d23882007-07-26 21:10:32 +0000436 monitor_.notify();
Marc Slemko66949872006-07-15 01:52:39 +0000437 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000438 } else {
Mark Slee2f6404d2006-10-10 01:37:40 +0000439 monitor_.notifyAll();
Marc Slemko66949872006-07-15 01:52:39 +0000440 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000441 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000442
Mark Sleef5f2be42006-09-05 21:05:31 +0000443 {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000444 Synchronized s(workerMonitor_);
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000445
Mark Slee2f6404d2006-10-10 01:37:40 +0000446 while (workerCount_ != workerMaxCount_) {
447 workerMonitor_.wait();
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000448 }
449
Mark Slee2f6404d2006-10-10 01:37:40 +0000450 for (std::set<shared_ptr<Thread> >::iterator ix = deadWorkers_.begin(); ix != deadWorkers_.end(); ix++) {
451 workers_.erase(*ix);
Marc Slemkoa6479032007-06-05 22:20:14 +0000452 idMap_.erase((*ix)->getId());
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000453 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000454
Mark Slee2f6404d2006-10-10 01:37:40 +0000455 deadWorkers_.clear();
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000456 }
457}
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000458
459 bool ThreadManager::Impl::canSleep() {
Marc Slemkoa6479032007-06-05 22:20:14 +0000460 const Thread::id_t id = threadFactory_->getCurrentThreadId();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000461 return idMap_.find(id) == idMap_.end();
462 }
463
David Reiss068f4162010-03-09 05:19:45 +0000464 void ThreadManager::Impl::add(shared_ptr<Runnable> value,
465 int64_t timeout,
466 int64_t expiration) {
David Reiss4e19f192010-03-09 05:19:59 +0000467 Guard g(mutex_, timeout);
468
469 if (!g) {
470 throw TimedOutException();
471 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000472
Mark Slee2f6404d2006-10-10 01:37:40 +0000473 if (state_ != ThreadManager::STARTED) {
Jake Farrell4013fa32011-09-09 04:10:32 +0000474 throw IllegalStateException("ThreadManager::Impl::add ThreadManager "
475 "not started");
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000476 }
Marc Slemkod466b212006-07-20 00:04:18 +0000477
David Reiss068f4162010-03-09 05:19:45 +0000478 removeExpiredTasks();
Mark Slee2782d6d2007-05-23 04:55:30 +0000479 if (pendingTaskCountMax_ > 0 && (tasks_.size() >= pendingTaskCountMax_)) {
Aditya Agarwal4b6ff2d2007-12-25 22:58:50 +0000480 if (canSleep() && timeout >= 0) {
Mark Slee2782d6d2007-05-23 04:55:30 +0000481 while (pendingTaskCountMax_ > 0 && tasks_.size() >= pendingTaskCountMax_) {
David Reissa0dbfef2010-03-09 05:19:32 +0000482 // This is thread safe because the mutex is shared between monitors.
483 maxMonitor_.wait(timeout);
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000484 }
485 } else {
486 throw TooManyPendingTasksException();
487 }
488 }
489
David Reiss068f4162010-03-09 05:19:45 +0000490 tasks_.push(shared_ptr<ThreadManager::Task>(new ThreadManager::Task(value, expiration)));
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000491
Mark Sleef5f2be42006-09-05 21:05:31 +0000492 // If idle thread is available notify it, otherwise all worker threads are
493 // running and will get around to this task in time.
Mark Slee2f6404d2006-10-10 01:37:40 +0000494 if (idleCount_ > 0) {
495 monitor_.notify();
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000496 }
Marc Slemko66949872006-07-15 01:52:39 +0000497 }
498
Marc Slemko6f038a72006-08-03 18:58:09 +0000499void ThreadManager::Impl::remove(shared_ptr<Runnable> task) {
Roger Meier3b771a12010-11-17 22:11:26 +0000500 (void) task;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000501 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000502 if (state_ != ThreadManager::STARTED) {
Jake Farrell4013fa32011-09-09 04:10:32 +0000503 throw IllegalStateException("ThreadManager::Impl::remove ThreadManager not "
504 "started");
Mark Sleef5f2be42006-09-05 21:05:31 +0000505 }
Marc Slemko66949872006-07-15 01:52:39 +0000506}
507
David Reiss01fe1532010-03-09 05:19:25 +0000508boost::shared_ptr<Runnable> ThreadManager::Impl::removeNextPending() {
509 Guard g(mutex_);
510 if (state_ != ThreadManager::STARTED) {
Jake Farrell4013fa32011-09-09 04:10:32 +0000511 throw IllegalStateException("ThreadManager::Impl::removeNextPending "
512 "ThreadManager not started");
David Reiss01fe1532010-03-09 05:19:25 +0000513 }
514
515 if (tasks_.empty()) {
516 return boost::shared_ptr<Runnable>();
517 }
518
519 shared_ptr<ThreadManager::Task> task = tasks_.front();
520 tasks_.pop();
521
522 return task->getRunnable();
523}
524
David Reiss068f4162010-03-09 05:19:45 +0000525void ThreadManager::Impl::removeExpiredTasks() {
526 int64_t now = 0LL; // we won't ask for the time untile we need it
527
528 // note that this loop breaks at the first non-expiring task
529 while (!tasks_.empty()) {
530 shared_ptr<ThreadManager::Task> task = tasks_.front();
531 if (task->getExpireTime() == 0LL) {
532 break;
533 }
534 if (now == 0LL) {
535 now = Util::currentTime();
536 }
537 if (task->getExpireTime() > now) {
538 break;
539 }
540 if (expireCallback_) {
541 expireCallback_(task->getRunnable());
542 }
543 tasks_.pop();
544 expiredCount_++;
545 }
546}
547
548
549void ThreadManager::Impl::setExpireCallback(ExpireCallback expireCallback) {
550 expireCallback_ = expireCallback;
551}
552
Marc Slemkod466b212006-07-20 00:04:18 +0000553class SimpleThreadManager : public ThreadManager::Impl {
Marc Slemko66949872006-07-15 01:52:39 +0000554
Mark Slee2782d6d2007-05-23 04:55:30 +0000555 public:
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000556 SimpleThreadManager(size_t workerCount=4, size_t pendingTaskCountMax=0) :
Mark Slee2f6404d2006-10-10 01:37:40 +0000557 workerCount_(workerCount),
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000558 pendingTaskCountMax_(pendingTaskCountMax),
Mark Slee2f6404d2006-10-10 01:37:40 +0000559 firstTime_(true) {
Marc Slemkod466b212006-07-20 00:04:18 +0000560 }
Marc Slemko66949872006-07-15 01:52:39 +0000561
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000562 void start() {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000563 ThreadManager::Impl::pendingTaskCountMax(pendingTaskCountMax_);
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000564 ThreadManager::Impl::start();
Mark Slee2f6404d2006-10-10 01:37:40 +0000565 addWorker(workerCount_);
Marc Slemkod466b212006-07-20 00:04:18 +0000566 }
567
Mark Slee2782d6d2007-05-23 04:55:30 +0000568 private:
Mark Slee2f6404d2006-10-10 01:37:40 +0000569 const size_t workerCount_;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000570 const size_t pendingTaskCountMax_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000571 bool firstTime_;
572 Monitor monitor_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000573};
Marc Slemko66949872006-07-15 01:52:39 +0000574
Marc Slemko66949872006-07-15 01:52:39 +0000575
Marc Slemko6f038a72006-08-03 18:58:09 +0000576shared_ptr<ThreadManager> ThreadManager::newThreadManager() {
577 return shared_ptr<ThreadManager>(new ThreadManager::Impl());
Marc Slemkod466b212006-07-20 00:04:18 +0000578}
Marc Slemko66949872006-07-15 01:52:39 +0000579
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000580shared_ptr<ThreadManager> ThreadManager::newSimpleThreadManager(size_t count, size_t pendingTaskCountMax) {
581 return shared_ptr<ThreadManager>(new SimpleThreadManager(count, pendingTaskCountMax));
Marc Slemkod466b212006-07-20 00:04:18 +0000582}
Marc Slemko66949872006-07-15 01:52:39 +0000583
T Jake Lucianib5e62212009-01-31 22:36:20 +0000584}}} // apache::thrift::concurrency
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000585