blob: abfcf6e7057530e34e3e5695744e670e8f9cc496 [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
Marc Slemko66949872006-07-15 01:52:39 +000020#include "ThreadManager.h"
Marc Slemkod466b212006-07-20 00:04:18 +000021#include "Exception.h"
Marc Slemko0e53ccd2006-07-17 23:51:05 +000022#include "Monitor.h"
Marc Slemko66949872006-07-15 01:52:39 +000023
Marc Slemko6f038a72006-08-03 18:58:09 +000024#include <boost/shared_ptr.hpp>
25
Marc Slemko66949872006-07-15 01:52:39 +000026#include <assert.h>
Marc Slemko0e53ccd2006-07-17 23:51:05 +000027#include <queue>
28#include <set>
Marc Slemko66949872006-07-15 01:52:39 +000029
Marc Slemko6f038a72006-08-03 18:58:09 +000030#if defined(DEBUG)
31#include <iostream>
32#endif //defined(DEBUG)
33
T Jake Lucianib5e62212009-01-31 22:36:20 +000034namespace apache { namespace thrift { namespace concurrency {
Marc Slemko66949872006-07-15 01:52:39 +000035
David Reissd4a269c2007-08-23 02:37:19 +000036using boost::shared_ptr;
37using boost::dynamic_pointer_cast;
Marc Slemko6f038a72006-08-03 18:58:09 +000038
Mark Sleef5f2be42006-09-05 21:05:31 +000039/**
40 * ThreadManager class
Marc Slemko3a3b53b2007-05-22 23:59:54 +000041 *
Mark Sleef5f2be42006-09-05 21:05:31 +000042 * This class manages a pool of threads. It uses a ThreadFactory to create
43 * threads. It never actually creates or destroys worker threads, rather
44 * it maintains statistics on number of idle threads, number of active threads,
45 * task backlog, and average wait and service times.
46 *
Mark Sleef5f2be42006-09-05 21:05:31 +000047 * @version $Id:$
48 */
Marc Slemko0e53ccd2006-07-17 23:51:05 +000049class ThreadManager::Impl : public ThreadManager {
50
51 public:
Marc Slemko3a3b53b2007-05-22 23:59:54 +000052 Impl() :
Mark Slee2f6404d2006-10-10 01:37:40 +000053 workerCount_(0),
54 workerMaxCount_(0),
55 idleCount_(0),
Marc Slemko3a3b53b2007-05-22 23:59:54 +000056 pendingTaskCountMax_(0),
Mark Slee2f6404d2006-10-10 01:37:40 +000057 state_(ThreadManager::UNINITIALIZED) {}
Marc Slemkod466b212006-07-20 00:04:18 +000058
Mark Sleef5f2be42006-09-05 21:05:31 +000059 ~Impl() { stop(); }
Marc Slemkod466b212006-07-20 00:04:18 +000060
Marc Slemkofe5ba12e2006-07-20 21:16:27 +000061 void start();
Marc Slemko0e53ccd2006-07-17 23:51:05 +000062
Mark Slee7c10eaf2007-03-01 02:45:10 +000063 void stop() { stopImpl(false); }
64
65 void join() { stopImpl(true); }
Marc Slemko0e53ccd2006-07-17 23:51:05 +000066
Mark Slee2f6404d2006-10-10 01:37:40 +000067 const ThreadManager::STATE state() const {
68 return state_;
69 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +000070
Marc Slemko6f038a72006-08-03 18:58:09 +000071 shared_ptr<ThreadFactory> threadFactory() const {
Marc Slemko3a3b53b2007-05-22 23:59:54 +000072 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +000073 return threadFactory_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000074 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +000075
76 void threadFactory(shared_ptr<ThreadFactory> value) {
Mark Slee2f6404d2006-10-10 01:37:40 +000077 Synchronized s(monitor_);
78 threadFactory_ = value;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000079 }
80
Marc Slemkod466b212006-07-20 00:04:18 +000081 void addWorker(size_t value);
Marc Slemko3a3b53b2007-05-22 23:59:54 +000082
Marc Slemkod466b212006-07-20 00:04:18 +000083 void removeWorker(size_t value);
Marc Slemko3a3b53b2007-05-22 23:59:54 +000084
Mark Slee2f6404d2006-10-10 01:37:40 +000085 size_t idleWorkerCount() const {
86 return idleCount_;
87 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +000088
89 size_t workerCount() const {
Marc Slemko3a3b53b2007-05-22 23:59:54 +000090 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +000091 return workerCount_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000092 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +000093
Marc Slemko0e53ccd2006-07-17 23:51:05 +000094 size_t pendingTaskCount() const {
Marc Slemko3a3b53b2007-05-22 23:59:54 +000095 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +000096 return tasks_.size();
Marc Slemko0e53ccd2006-07-17 23:51:05 +000097 }
98
99 size_t totalTaskCount() const {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000100 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000101 return tasks_.size() + workerCount_ - idleCount_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000102 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000103
104 size_t pendingTaskCountMax() const {
105 Synchronized s(monitor_);
106 return pendingTaskCountMax_;
107 }
108
109 void pendingTaskCountMax(const size_t value) {
110 Synchronized s(monitor_);
111 pendingTaskCountMax_ = value;
112 }
113
114 bool canSleep();
115
Mark Slee9b82d272007-05-23 05:16:07 +0000116 void add(shared_ptr<Runnable> value, int64_t timeout);
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000117
Marc Slemko6f038a72006-08-03 18:58:09 +0000118 void remove(shared_ptr<Runnable> task);
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000119
120private:
Mark Slee7c10eaf2007-03-01 02:45:10 +0000121 void stopImpl(bool join);
122
Mark Slee2f6404d2006-10-10 01:37:40 +0000123 size_t workerCount_;
124 size_t workerMaxCount_;
125 size_t idleCount_;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000126 size_t pendingTaskCountMax_;
127
Mark Slee2f6404d2006-10-10 01:37:40 +0000128 ThreadManager::STATE state_;
129 shared_ptr<ThreadFactory> threadFactory_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000130
Mark Sleef5f2be42006-09-05 21:05:31 +0000131
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000132 friend class ThreadManager::Task;
Mark Slee2f6404d2006-10-10 01:37:40 +0000133 std::queue<shared_ptr<Task> > tasks_;
134 Monitor monitor_;
135 Monitor workerMonitor_;
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000136
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000137 friend class ThreadManager::Worker;
Mark Slee2f6404d2006-10-10 01:37:40 +0000138 std::set<shared_ptr<Thread> > workers_;
139 std::set<shared_ptr<Thread> > deadWorkers_;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000140 std::map<const Thread::id_t, shared_ptr<Thread> > idMap_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000141};
Marc Slemko66949872006-07-15 01:52:39 +0000142
143class ThreadManager::Task : public Runnable {
144
Mark Sleef5f2be42006-09-05 21:05:31 +0000145 public:
Marc Slemko66949872006-07-15 01:52:39 +0000146 enum STATE {
147 WAITING,
148 EXECUTING,
149 CANCELLED,
150 COMPLETE
151 };
152
Marc Slemko6f038a72006-08-03 18:58:09 +0000153 Task(shared_ptr<Runnable> runnable) :
Mark Slee2f6404d2006-10-10 01:37:40 +0000154 runnable_(runnable),
155 state_(WAITING) {}
Marc Slemko66949872006-07-15 01:52:39 +0000156
Mark Sleef5f2be42006-09-05 21:05:31 +0000157 ~Task() {}
Marc Slemko66949872006-07-15 01:52:39 +0000158
159 void run() {
Mark Slee2f6404d2006-10-10 01:37:40 +0000160 if (state_ == EXECUTING) {
161 runnable_->run();
162 state_ = COMPLETE;
Marc Slemko66949872006-07-15 01:52:39 +0000163 }
164 }
165
166 private:
Mark Slee2f6404d2006-10-10 01:37:40 +0000167 shared_ptr<Runnable> runnable_;
Marc Slemkod466b212006-07-20 00:04:18 +0000168 friend class ThreadManager::Worker;
Mark Slee2f6404d2006-10-10 01:37:40 +0000169 STATE state_;
Marc Slemko66949872006-07-15 01:52:39 +0000170};
171
172class ThreadManager::Worker: public Runnable {
Marc Slemko66949872006-07-15 01:52:39 +0000173 enum STATE {
174 UNINITIALIZED,
175 STARTING,
176 STARTED,
177 STOPPING,
178 STOPPED
179 };
180
181 public:
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000182 Worker(ThreadManager::Impl* manager) :
Mark Slee2f6404d2006-10-10 01:37:40 +0000183 manager_(manager),
184 state_(UNINITIALIZED),
185 idle_(false) {}
Marc Slemko66949872006-07-15 01:52:39 +0000186
187 ~Worker() {}
188
Mark Slee7c10eaf2007-03-01 02:45:10 +0000189 private:
Mark Slee2f6404d2006-10-10 01:37:40 +0000190 bool isActive() const {
Mark Slee7c10eaf2007-03-01 02:45:10 +0000191 return
192 (manager_->workerCount_ <= manager_->workerMaxCount_) ||
193 (manager_->state_ == JOINING && !manager_->tasks_.empty());
Mark Slee2f6404d2006-10-10 01:37:40 +0000194 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000195
Mark Slee7c10eaf2007-03-01 02:45:10 +0000196 public:
Mark Sleef5f2be42006-09-05 21:05:31 +0000197 /**
198 * Worker entry point
199 *
200 * As long as worker thread is running, pull tasks off the task queue and
201 * execute.
202 */
Marc Slemko66949872006-07-15 01:52:39 +0000203 void run() {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000204 bool active = false;
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000205 bool notifyManager = false;
206
Mark Sleef5f2be42006-09-05 21:05:31 +0000207 /**
208 * Increment worker semaphore and notify manager if worker count reached
209 * desired max
210 *
211 * Note: We have to release the monitor and acquire the workerMonitor
212 * since that is what the manager blocks on for worker add/remove
213 */
214 {
Mark Slee2f6404d2006-10-10 01:37:40 +0000215 Synchronized s(manager_->monitor_);
216 active = manager_->workerCount_ < manager_->workerMaxCount_;
Mark Sleef5f2be42006-09-05 21:05:31 +0000217 if (active) {
David Reiss96d23882007-07-26 21:10:32 +0000218 manager_->workerCount_++;
219 notifyManager = manager_->workerCount_ == manager_->workerMaxCount_;
Marc Slemko66949872006-07-15 01:52:39 +0000220 }
221 }
222
Mark Sleef5f2be42006-09-05 21:05:31 +0000223 if (notifyManager) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000224 Synchronized s(manager_->workerMonitor_);
225 manager_->workerMonitor_.notify();
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000226 notifyManager = false;
227 }
228
Mark Sleef5f2be42006-09-05 21:05:31 +0000229 while (active) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000230 shared_ptr<ThreadManager::Task> task;
Marc Slemko66949872006-07-15 01:52:39 +0000231
Mark Sleef5f2be42006-09-05 21:05:31 +0000232 /**
233 * While holding manager monitor block for non-empty task queue (Also
234 * check that the thread hasn't been requested to stop). Once the queue
235 * is non-empty, dequeue a task, release monitor, and execute. If the
236 * worker max count has been decremented such that we exceed it, mark
237 * ourself inactive, decrement the worker count and notify the manager
238 * (technically we're notifying the next blocked thread but eventually
239 * the manager will see it.
240 */
241 {
Mark Slee2f6404d2006-10-10 01:37:40 +0000242 Synchronized s(manager_->monitor_);
David Reiss96d23882007-07-26 21:10:32 +0000243 active = isActive();
Mark Slee7c10eaf2007-03-01 02:45:10 +0000244
David Reiss96d23882007-07-26 21:10:32 +0000245 while (active && manager_->tasks_.empty()) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000246 manager_->idleCount_++;
Mark Slee7c10eaf2007-03-01 02:45:10 +0000247 idle_ = true;
Mark Slee2f6404d2006-10-10 01:37:40 +0000248 manager_->monitor_.wait();
Mark Sleef5f2be42006-09-05 21:05:31 +0000249 active = isActive();
Mark Slee7c10eaf2007-03-01 02:45:10 +0000250 idle_ = false;
Mark Slee2f6404d2006-10-10 01:37:40 +0000251 manager_->idleCount_--;
David Reiss96d23882007-07-26 21:10:32 +0000252 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000253
David Reiss96d23882007-07-26 21:10:32 +0000254 if (active) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000255 if (!manager_->tasks_.empty()) {
256 task = manager_->tasks_.front();
257 manager_->tasks_.pop();
258 if (task->state_ == ThreadManager::Task::WAITING) {
259 task->state_ = ThreadManager::Task::EXECUTING;
David Reiss96d23882007-07-26 21:10:32 +0000260 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000261
262 /* If we have a pending task max and we just dropped below it, wakeup any
263 thread that might be blocked on add. */
Mark Slee2782d6d2007-05-23 04:55:30 +0000264 if (manager_->pendingTaskCountMax_ != 0 &&
265 manager_->tasks_.size() == manager_->pendingTaskCountMax_ - 1) {
David Reisse3a64922008-06-10 22:55:04 +0000266 manager_->monitor_.notify();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000267 }
David Reiss96d23882007-07-26 21:10:32 +0000268 }
269 } else {
270 idle_ = true;
271 manager_->workerCount_--;
Mark Slee7c10eaf2007-03-01 02:45:10 +0000272 notifyManager = (manager_->workerCount_ == manager_->workerMaxCount_);
David Reiss96d23882007-07-26 21:10:32 +0000273 }
Marc Slemko66949872006-07-15 01:52:39 +0000274 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000275
Mark Sleef5f2be42006-09-05 21:05:31 +0000276 if (task != NULL) {
David Reiss96d23882007-07-26 21:10:32 +0000277 if (task->state_ == ThreadManager::Task::EXECUTING) {
278 try {
Mark Sleef5f2be42006-09-05 21:05:31 +0000279 task->run();
280 } catch(...) {
281 // XXX need to log this
David Reiss96d23882007-07-26 21:10:32 +0000282 }
283 }
Marc Slemko66949872006-07-15 01:52:39 +0000284 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000285 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000286
Mark Sleef5f2be42006-09-05 21:05:31 +0000287 {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000288 Synchronized s(manager_->workerMonitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000289 manager_->deadWorkers_.insert(this->thread());
Mark Sleef5f2be42006-09-05 21:05:31 +0000290 if (notifyManager) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000291 manager_->workerMonitor_.notify();
Marc Slemko66949872006-07-15 01:52:39 +0000292 }
293 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000294
Marc Slemko66949872006-07-15 01:52:39 +0000295 return;
296 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000297
Mark Sleef5f2be42006-09-05 21:05:31 +0000298 private:
Mark Slee2f6404d2006-10-10 01:37:40 +0000299 ThreadManager::Impl* manager_;
Mark Sleef5f2be42006-09-05 21:05:31 +0000300 friend class ThreadManager::Impl;
Mark Slee2f6404d2006-10-10 01:37:40 +0000301 STATE state_;
302 bool idle_;
Marc Slemko66949872006-07-15 01:52:39 +0000303};
304
Mark Sleef5f2be42006-09-05 21:05:31 +0000305
306 void ThreadManager::Impl::addWorker(size_t value) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000307 std::set<shared_ptr<Thread> > newThreads;
Mark Sleef5f2be42006-09-05 21:05:31 +0000308 for (size_t ix = 0; ix < value; ix++) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000309 class ThreadManager::Worker;
Marc Slemko6f038a72006-08-03 18:58:09 +0000310 shared_ptr<ThreadManager::Worker> worker = shared_ptr<ThreadManager::Worker>(new ThreadManager::Worker(this));
Mark Slee2f6404d2006-10-10 01:37:40 +0000311 newThreads.insert(threadFactory_->newThread(worker));
Marc Slemko66949872006-07-15 01:52:39 +0000312 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000313
Mark Sleef5f2be42006-09-05 21:05:31 +0000314 {
Mark Slee2f6404d2006-10-10 01:37:40 +0000315 Synchronized s(monitor_);
316 workerMaxCount_ += value;
317 workers_.insert(newThreads.begin(), newThreads.end());
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000318 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000319
Mark Sleef5f2be42006-09-05 21:05:31 +0000320 for (std::set<shared_ptr<Thread> >::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000321 shared_ptr<ThreadManager::Worker> worker = dynamic_pointer_cast<ThreadManager::Worker, Runnable>((*ix)->runnable());
Mark Slee2f6404d2006-10-10 01:37:40 +0000322 worker->state_ = ThreadManager::Worker::STARTING;
Marc Slemkod466b212006-07-20 00:04:18 +0000323 (*ix)->start();
Marc Slemkoa6479032007-06-05 22:20:14 +0000324 idMap_.insert(std::pair<const Thread::id_t, shared_ptr<Thread> >((*ix)->getId(), *ix));
Marc Slemkod466b212006-07-20 00:04:18 +0000325 }
326
Mark Sleef5f2be42006-09-05 21:05:31 +0000327 {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000328 Synchronized s(workerMonitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000329 while (workerCount_ != workerMaxCount_) {
330 workerMonitor_.wait();
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000331 }
332 }
333}
Marc Slemkod466b212006-07-20 00:04:18 +0000334
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000335void ThreadManager::Impl::start() {
336
Mark Slee2f6404d2006-10-10 01:37:40 +0000337 if (state_ == ThreadManager::STOPPED) {
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000338 return;
339 }
340
Mark Sleef5f2be42006-09-05 21:05:31 +0000341 {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000342 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000343 if (state_ == ThreadManager::UNINITIALIZED) {
344 if (threadFactory_ == NULL) {
Mark Sleef5f2be42006-09-05 21:05:31 +0000345 throw InvalidArgumentException();
346 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000347 state_ = ThreadManager::STARTED;
348 monitor_.notifyAll();
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000349 }
350
Mark Slee2f6404d2006-10-10 01:37:40 +0000351 while (state_ == STARTING) {
352 monitor_.wait();
Marc Slemkod466b212006-07-20 00:04:18 +0000353 }
354 }
355}
356
Mark Slee7c10eaf2007-03-01 02:45:10 +0000357void ThreadManager::Impl::stopImpl(bool join) {
Marc Slemkod466b212006-07-20 00:04:18 +0000358 bool doStop = false;
Mark Slee2f6404d2006-10-10 01:37:40 +0000359 if (state_ == ThreadManager::STOPPED) {
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000360 return;
361 }
362
Mark Sleef5f2be42006-09-05 21:05:31 +0000363 {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000364 Synchronized s(monitor_);
Mark Slee7c10eaf2007-03-01 02:45:10 +0000365 if (state_ != ThreadManager::STOPPING &&
366 state_ != ThreadManager::JOINING &&
367 state_ != ThreadManager::STOPPED) {
Marc Slemkod466b212006-07-20 00:04:18 +0000368 doStop = true;
Mark Slee7c10eaf2007-03-01 02:45:10 +0000369 state_ = join ? ThreadManager::JOINING : ThreadManager::STOPPING;
Marc Slemkod466b212006-07-20 00:04:18 +0000370 }
371 }
372
Mark Sleef5f2be42006-09-05 21:05:31 +0000373 if (doStop) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000374 removeWorker(workerCount_);
Marc Slemkod466b212006-07-20 00:04:18 +0000375 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000376
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000377 // XXX
Mark Sleef5f2be42006-09-05 21:05:31 +0000378 // should be able to block here for transition to STOPPED since we're no
379 // using shared_ptrs
Mark Slee7c10eaf2007-03-01 02:45:10 +0000380
381 {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000382 Synchronized s(monitor_);
Mark Slee7c10eaf2007-03-01 02:45:10 +0000383 state_ = ThreadManager::STOPPED;
384 }
385
Marc Slemkod466b212006-07-20 00:04:18 +0000386}
Mark Slee7c10eaf2007-03-01 02:45:10 +0000387
Marc Slemkod466b212006-07-20 00:04:18 +0000388void ThreadManager::Impl::removeWorker(size_t value) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000389 std::set<shared_ptr<Thread> > removedThreads;
Mark Sleef5f2be42006-09-05 21:05:31 +0000390 {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000391 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000392 if (value > workerMaxCount_) {
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000393 throw InvalidArgumentException();
394 }
395
Mark Slee2f6404d2006-10-10 01:37:40 +0000396 workerMaxCount_ -= value;
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000397
Mark Slee2f6404d2006-10-10 01:37:40 +0000398 if (idleCount_ < value) {
399 for (size_t ix = 0; ix < idleCount_; ix++) {
David Reiss96d23882007-07-26 21:10:32 +0000400 monitor_.notify();
Marc Slemko66949872006-07-15 01:52:39 +0000401 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000402 } else {
Mark Slee2f6404d2006-10-10 01:37:40 +0000403 monitor_.notifyAll();
Marc Slemko66949872006-07-15 01:52:39 +0000404 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000405 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000406
Mark Sleef5f2be42006-09-05 21:05:31 +0000407 {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000408 Synchronized s(workerMonitor_);
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000409
Mark Slee2f6404d2006-10-10 01:37:40 +0000410 while (workerCount_ != workerMaxCount_) {
411 workerMonitor_.wait();
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000412 }
413
Mark Slee2f6404d2006-10-10 01:37:40 +0000414 for (std::set<shared_ptr<Thread> >::iterator ix = deadWorkers_.begin(); ix != deadWorkers_.end(); ix++) {
415 workers_.erase(*ix);
Marc Slemkoa6479032007-06-05 22:20:14 +0000416 idMap_.erase((*ix)->getId());
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000417 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000418
Mark Slee2f6404d2006-10-10 01:37:40 +0000419 deadWorkers_.clear();
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000420 }
421}
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000422
423 bool ThreadManager::Impl::canSleep() {
Marc Slemkoa6479032007-06-05 22:20:14 +0000424 const Thread::id_t id = threadFactory_->getCurrentThreadId();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000425 return idMap_.find(id) == idMap_.end();
426 }
427
Mark Slee9b82d272007-05-23 05:16:07 +0000428 void ThreadManager::Impl::add(shared_ptr<Runnable> value, int64_t timeout) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000429 Synchronized s(monitor_);
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000430
Mark Slee2f6404d2006-10-10 01:37:40 +0000431 if (state_ != ThreadManager::STARTED) {
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000432 throw IllegalStateException();
433 }
Marc Slemkod466b212006-07-20 00:04:18 +0000434
Mark Slee2782d6d2007-05-23 04:55:30 +0000435 if (pendingTaskCountMax_ > 0 && (tasks_.size() >= pendingTaskCountMax_)) {
Aditya Agarwal4b6ff2d2007-12-25 22:58:50 +0000436 if (canSleep() && timeout >= 0) {
Mark Slee2782d6d2007-05-23 04:55:30 +0000437 while (pendingTaskCountMax_ > 0 && tasks_.size() >= pendingTaskCountMax_) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000438 monitor_.wait(timeout);
439 }
440 } else {
441 throw TooManyPendingTasksException();
442 }
443 }
444
Mark Slee2f6404d2006-10-10 01:37:40 +0000445 tasks_.push(shared_ptr<ThreadManager::Task>(new ThreadManager::Task(value)));
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000446
Mark Sleef5f2be42006-09-05 21:05:31 +0000447 // If idle thread is available notify it, otherwise all worker threads are
448 // running and will get around to this task in time.
Mark Slee2f6404d2006-10-10 01:37:40 +0000449 if (idleCount_ > 0) {
450 monitor_.notify();
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000451 }
Marc Slemko66949872006-07-15 01:52:39 +0000452 }
453
Marc Slemko6f038a72006-08-03 18:58:09 +0000454void ThreadManager::Impl::remove(shared_ptr<Runnable> task) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000455 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000456 if (state_ != ThreadManager::STARTED) {
Mark Sleef5f2be42006-09-05 21:05:31 +0000457 throw IllegalStateException();
458 }
Marc Slemko66949872006-07-15 01:52:39 +0000459}
460
Marc Slemkod466b212006-07-20 00:04:18 +0000461class SimpleThreadManager : public ThreadManager::Impl {
Marc Slemko66949872006-07-15 01:52:39 +0000462
Mark Slee2782d6d2007-05-23 04:55:30 +0000463 public:
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000464 SimpleThreadManager(size_t workerCount=4, size_t pendingTaskCountMax=0) :
Mark Slee2f6404d2006-10-10 01:37:40 +0000465 workerCount_(workerCount),
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000466 pendingTaskCountMax_(pendingTaskCountMax),
Mark Slee2f6404d2006-10-10 01:37:40 +0000467 firstTime_(true) {
Marc Slemkod466b212006-07-20 00:04:18 +0000468 }
Marc Slemko66949872006-07-15 01:52:39 +0000469
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000470 void start() {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000471 ThreadManager::Impl::pendingTaskCountMax(pendingTaskCountMax_);
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000472 ThreadManager::Impl::start();
Mark Slee2f6404d2006-10-10 01:37:40 +0000473 addWorker(workerCount_);
Marc Slemkod466b212006-07-20 00:04:18 +0000474 }
475
Mark Slee2782d6d2007-05-23 04:55:30 +0000476 private:
Mark Slee2f6404d2006-10-10 01:37:40 +0000477 const size_t workerCount_;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000478 const size_t pendingTaskCountMax_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000479 bool firstTime_;
480 Monitor monitor_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000481};
Marc Slemko66949872006-07-15 01:52:39 +0000482
Marc Slemko66949872006-07-15 01:52:39 +0000483
Marc Slemko6f038a72006-08-03 18:58:09 +0000484shared_ptr<ThreadManager> ThreadManager::newThreadManager() {
485 return shared_ptr<ThreadManager>(new ThreadManager::Impl());
Marc Slemkod466b212006-07-20 00:04:18 +0000486}
Marc Slemko66949872006-07-15 01:52:39 +0000487
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000488shared_ptr<ThreadManager> ThreadManager::newSimpleThreadManager(size_t count, size_t pendingTaskCountMax) {
489 return shared_ptr<ThreadManager>(new SimpleThreadManager(count, pendingTaskCountMax));
Marc Slemkod466b212006-07-20 00:04:18 +0000490}
Marc Slemko66949872006-07-15 01:52:39 +0000491
T Jake Lucianib5e62212009-01-31 22:36:20 +0000492}}} // apache::thrift::concurrency
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000493