blob: 7bba0e665c3627832787858f6fc48d4e665eb696 [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
Marc Slemko66949872006-07-15 01:52:39 +000020#include "ThreadManager.h"
Marc Slemkod466b212006-07-20 00:04:18 +000021#include "Exception.h"
Marc Slemko0e53ccd2006-07-17 23:51:05 +000022#include "Monitor.h"
Marc Slemko66949872006-07-15 01:52:39 +000023
Marc Slemko6f038a72006-08-03 18:58:09 +000024#include <boost/shared_ptr.hpp>
25
Marc Slemko66949872006-07-15 01:52:39 +000026#include <assert.h>
Marc Slemko0e53ccd2006-07-17 23:51:05 +000027#include <queue>
28#include <set>
Marc Slemko66949872006-07-15 01:52:39 +000029
Marc Slemko6f038a72006-08-03 18:58:09 +000030#if defined(DEBUG)
31#include <iostream>
32#endif //defined(DEBUG)
33
T Jake Lucianib5e62212009-01-31 22:36:20 +000034namespace apache { namespace thrift { namespace concurrency {
Marc Slemko66949872006-07-15 01:52:39 +000035
David Reissd4a269c2007-08-23 02:37:19 +000036using boost::shared_ptr;
37using boost::dynamic_pointer_cast;
Marc Slemko6f038a72006-08-03 18:58:09 +000038
Mark Sleef5f2be42006-09-05 21:05:31 +000039/**
40 * ThreadManager class
Marc Slemko3a3b53b2007-05-22 23:59:54 +000041 *
Mark Sleef5f2be42006-09-05 21:05:31 +000042 * This class manages a pool of threads. It uses a ThreadFactory to create
43 * threads. It never actually creates or destroys worker threads, rather
44 * it maintains statistics on number of idle threads, number of active threads,
45 * task backlog, and average wait and service times.
46 *
Mark Sleef5f2be42006-09-05 21:05:31 +000047 * @version $Id:$
48 */
Marc Slemko0e53ccd2006-07-17 23:51:05 +000049class ThreadManager::Impl : public ThreadManager {
50
51 public:
Marc Slemko3a3b53b2007-05-22 23:59:54 +000052 Impl() :
Mark Slee2f6404d2006-10-10 01:37:40 +000053 workerCount_(0),
54 workerMaxCount_(0),
55 idleCount_(0),
Marc Slemko3a3b53b2007-05-22 23:59:54 +000056 pendingTaskCountMax_(0),
Mark Slee2f6404d2006-10-10 01:37:40 +000057 state_(ThreadManager::UNINITIALIZED) {}
Marc Slemkod466b212006-07-20 00:04:18 +000058
Mark Sleef5f2be42006-09-05 21:05:31 +000059 ~Impl() { stop(); }
Marc Slemkod466b212006-07-20 00:04:18 +000060
Marc Slemkofe5ba12e2006-07-20 21:16:27 +000061 void start();
Marc Slemko0e53ccd2006-07-17 23:51:05 +000062
Mark Slee7c10eaf2007-03-01 02:45:10 +000063 void stop() { stopImpl(false); }
64
65 void join() { stopImpl(true); }
Marc Slemko0e53ccd2006-07-17 23:51:05 +000066
Mark Slee2f6404d2006-10-10 01:37:40 +000067 const ThreadManager::STATE state() const {
68 return state_;
69 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +000070
Marc Slemko6f038a72006-08-03 18:58:09 +000071 shared_ptr<ThreadFactory> threadFactory() const {
Marc Slemko3a3b53b2007-05-22 23:59:54 +000072 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +000073 return threadFactory_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000074 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +000075
76 void threadFactory(shared_ptr<ThreadFactory> value) {
Mark Slee2f6404d2006-10-10 01:37:40 +000077 Synchronized s(monitor_);
78 threadFactory_ = value;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000079 }
80
Marc Slemkod466b212006-07-20 00:04:18 +000081 void addWorker(size_t value);
Marc Slemko3a3b53b2007-05-22 23:59:54 +000082
Marc Slemkod466b212006-07-20 00:04:18 +000083 void removeWorker(size_t value);
Marc Slemko3a3b53b2007-05-22 23:59:54 +000084
Mark Slee2f6404d2006-10-10 01:37:40 +000085 size_t idleWorkerCount() const {
86 return idleCount_;
87 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +000088
89 size_t workerCount() const {
Marc Slemko3a3b53b2007-05-22 23:59:54 +000090 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +000091 return workerCount_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000092 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +000093
Marc Slemko0e53ccd2006-07-17 23:51:05 +000094 size_t pendingTaskCount() const {
Marc Slemko3a3b53b2007-05-22 23:59:54 +000095 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +000096 return tasks_.size();
Marc Slemko0e53ccd2006-07-17 23:51:05 +000097 }
98
99 size_t totalTaskCount() const {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000100 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000101 return tasks_.size() + workerCount_ - idleCount_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000102 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000103
104 size_t pendingTaskCountMax() const {
105 Synchronized s(monitor_);
106 return pendingTaskCountMax_;
107 }
108
109 void pendingTaskCountMax(const size_t value) {
110 Synchronized s(monitor_);
111 pendingTaskCountMax_ = value;
112 }
113
114 bool canSleep();
115
Mark Slee9b82d272007-05-23 05:16:07 +0000116 void add(shared_ptr<Runnable> value, int64_t timeout);
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000117
Marc Slemko6f038a72006-08-03 18:58:09 +0000118 void remove(shared_ptr<Runnable> task);
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000119
David Reiss01fe1532010-03-09 05:19:25 +0000120 shared_ptr<Runnable> removeNextPending();
121
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000122private:
Mark Slee7c10eaf2007-03-01 02:45:10 +0000123 void stopImpl(bool join);
124
Mark Slee2f6404d2006-10-10 01:37:40 +0000125 size_t workerCount_;
126 size_t workerMaxCount_;
127 size_t idleCount_;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000128 size_t pendingTaskCountMax_;
129
Mark Slee2f6404d2006-10-10 01:37:40 +0000130 ThreadManager::STATE state_;
131 shared_ptr<ThreadFactory> threadFactory_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000132
Mark Sleef5f2be42006-09-05 21:05:31 +0000133
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000134 friend class ThreadManager::Task;
Mark Slee2f6404d2006-10-10 01:37:40 +0000135 std::queue<shared_ptr<Task> > tasks_;
136 Monitor monitor_;
137 Monitor workerMonitor_;
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000138
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000139 friend class ThreadManager::Worker;
Mark Slee2f6404d2006-10-10 01:37:40 +0000140 std::set<shared_ptr<Thread> > workers_;
141 std::set<shared_ptr<Thread> > deadWorkers_;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000142 std::map<const Thread::id_t, shared_ptr<Thread> > idMap_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000143};
Marc Slemko66949872006-07-15 01:52:39 +0000144
145class ThreadManager::Task : public Runnable {
146
Mark Sleef5f2be42006-09-05 21:05:31 +0000147 public:
Marc Slemko66949872006-07-15 01:52:39 +0000148 enum STATE {
149 WAITING,
150 EXECUTING,
151 CANCELLED,
152 COMPLETE
153 };
154
Marc Slemko6f038a72006-08-03 18:58:09 +0000155 Task(shared_ptr<Runnable> runnable) :
Mark Slee2f6404d2006-10-10 01:37:40 +0000156 runnable_(runnable),
157 state_(WAITING) {}
Marc Slemko66949872006-07-15 01:52:39 +0000158
Mark Sleef5f2be42006-09-05 21:05:31 +0000159 ~Task() {}
Marc Slemko66949872006-07-15 01:52:39 +0000160
161 void run() {
Mark Slee2f6404d2006-10-10 01:37:40 +0000162 if (state_ == EXECUTING) {
163 runnable_->run();
164 state_ = COMPLETE;
Marc Slemko66949872006-07-15 01:52:39 +0000165 }
166 }
167
David Reiss01fe1532010-03-09 05:19:25 +0000168 shared_ptr<Runnable> getRunnable() {
169 return runnable_;
170 }
171
Marc Slemko66949872006-07-15 01:52:39 +0000172 private:
Mark Slee2f6404d2006-10-10 01:37:40 +0000173 shared_ptr<Runnable> runnable_;
Marc Slemkod466b212006-07-20 00:04:18 +0000174 friend class ThreadManager::Worker;
Mark Slee2f6404d2006-10-10 01:37:40 +0000175 STATE state_;
Marc Slemko66949872006-07-15 01:52:39 +0000176};
177
178class ThreadManager::Worker: public Runnable {
Marc Slemko66949872006-07-15 01:52:39 +0000179 enum STATE {
180 UNINITIALIZED,
181 STARTING,
182 STARTED,
183 STOPPING,
184 STOPPED
185 };
186
187 public:
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000188 Worker(ThreadManager::Impl* manager) :
Mark Slee2f6404d2006-10-10 01:37:40 +0000189 manager_(manager),
190 state_(UNINITIALIZED),
191 idle_(false) {}
Marc Slemko66949872006-07-15 01:52:39 +0000192
193 ~Worker() {}
194
Mark Slee7c10eaf2007-03-01 02:45:10 +0000195 private:
Mark Slee2f6404d2006-10-10 01:37:40 +0000196 bool isActive() const {
Mark Slee7c10eaf2007-03-01 02:45:10 +0000197 return
198 (manager_->workerCount_ <= manager_->workerMaxCount_) ||
199 (manager_->state_ == JOINING && !manager_->tasks_.empty());
Mark Slee2f6404d2006-10-10 01:37:40 +0000200 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000201
Mark Slee7c10eaf2007-03-01 02:45:10 +0000202 public:
Mark Sleef5f2be42006-09-05 21:05:31 +0000203 /**
204 * Worker entry point
205 *
206 * As long as worker thread is running, pull tasks off the task queue and
207 * execute.
208 */
Marc Slemko66949872006-07-15 01:52:39 +0000209 void run() {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000210 bool active = false;
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000211 bool notifyManager = false;
212
Mark Sleef5f2be42006-09-05 21:05:31 +0000213 /**
214 * Increment worker semaphore and notify manager if worker count reached
215 * desired max
216 *
217 * Note: We have to release the monitor and acquire the workerMonitor
218 * since that is what the manager blocks on for worker add/remove
219 */
220 {
Mark Slee2f6404d2006-10-10 01:37:40 +0000221 Synchronized s(manager_->monitor_);
222 active = manager_->workerCount_ < manager_->workerMaxCount_;
Mark Sleef5f2be42006-09-05 21:05:31 +0000223 if (active) {
David Reiss96d23882007-07-26 21:10:32 +0000224 manager_->workerCount_++;
225 notifyManager = manager_->workerCount_ == manager_->workerMaxCount_;
Marc Slemko66949872006-07-15 01:52:39 +0000226 }
227 }
228
Mark Sleef5f2be42006-09-05 21:05:31 +0000229 if (notifyManager) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000230 Synchronized s(manager_->workerMonitor_);
231 manager_->workerMonitor_.notify();
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000232 notifyManager = false;
233 }
234
Mark Sleef5f2be42006-09-05 21:05:31 +0000235 while (active) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000236 shared_ptr<ThreadManager::Task> task;
Marc Slemko66949872006-07-15 01:52:39 +0000237
Mark Sleef5f2be42006-09-05 21:05:31 +0000238 /**
239 * While holding manager monitor block for non-empty task queue (Also
240 * check that the thread hasn't been requested to stop). Once the queue
241 * is non-empty, dequeue a task, release monitor, and execute. If the
242 * worker max count has been decremented such that we exceed it, mark
243 * ourself inactive, decrement the worker count and notify the manager
244 * (technically we're notifying the next blocked thread but eventually
245 * the manager will see it.
246 */
247 {
Mark Slee2f6404d2006-10-10 01:37:40 +0000248 Synchronized s(manager_->monitor_);
David Reiss96d23882007-07-26 21:10:32 +0000249 active = isActive();
Mark Slee7c10eaf2007-03-01 02:45:10 +0000250
David Reiss96d23882007-07-26 21:10:32 +0000251 while (active && manager_->tasks_.empty()) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000252 manager_->idleCount_++;
Mark Slee7c10eaf2007-03-01 02:45:10 +0000253 idle_ = true;
Mark Slee2f6404d2006-10-10 01:37:40 +0000254 manager_->monitor_.wait();
Mark Sleef5f2be42006-09-05 21:05:31 +0000255 active = isActive();
Mark Slee7c10eaf2007-03-01 02:45:10 +0000256 idle_ = false;
Mark Slee2f6404d2006-10-10 01:37:40 +0000257 manager_->idleCount_--;
David Reiss96d23882007-07-26 21:10:32 +0000258 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000259
David Reiss96d23882007-07-26 21:10:32 +0000260 if (active) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000261 if (!manager_->tasks_.empty()) {
262 task = manager_->tasks_.front();
263 manager_->tasks_.pop();
264 if (task->state_ == ThreadManager::Task::WAITING) {
265 task->state_ = ThreadManager::Task::EXECUTING;
David Reiss96d23882007-07-26 21:10:32 +0000266 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000267
268 /* If we have a pending task max and we just dropped below it, wakeup any
269 thread that might be blocked on add. */
Mark Slee2782d6d2007-05-23 04:55:30 +0000270 if (manager_->pendingTaskCountMax_ != 0 &&
271 manager_->tasks_.size() == manager_->pendingTaskCountMax_ - 1) {
David Reisse3a64922008-06-10 22:55:04 +0000272 manager_->monitor_.notify();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000273 }
David Reiss96d23882007-07-26 21:10:32 +0000274 }
275 } else {
276 idle_ = true;
277 manager_->workerCount_--;
Mark Slee7c10eaf2007-03-01 02:45:10 +0000278 notifyManager = (manager_->workerCount_ == manager_->workerMaxCount_);
David Reiss96d23882007-07-26 21:10:32 +0000279 }
Marc Slemko66949872006-07-15 01:52:39 +0000280 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000281
Mark Sleef5f2be42006-09-05 21:05:31 +0000282 if (task != NULL) {
David Reiss96d23882007-07-26 21:10:32 +0000283 if (task->state_ == ThreadManager::Task::EXECUTING) {
284 try {
Mark Sleef5f2be42006-09-05 21:05:31 +0000285 task->run();
286 } catch(...) {
287 // XXX need to log this
David Reiss96d23882007-07-26 21:10:32 +0000288 }
289 }
Marc Slemko66949872006-07-15 01:52:39 +0000290 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000291 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000292
Mark Sleef5f2be42006-09-05 21:05:31 +0000293 {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000294 Synchronized s(manager_->workerMonitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000295 manager_->deadWorkers_.insert(this->thread());
Mark Sleef5f2be42006-09-05 21:05:31 +0000296 if (notifyManager) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000297 manager_->workerMonitor_.notify();
Marc Slemko66949872006-07-15 01:52:39 +0000298 }
299 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000300
Marc Slemko66949872006-07-15 01:52:39 +0000301 return;
302 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000303
Mark Sleef5f2be42006-09-05 21:05:31 +0000304 private:
Mark Slee2f6404d2006-10-10 01:37:40 +0000305 ThreadManager::Impl* manager_;
Mark Sleef5f2be42006-09-05 21:05:31 +0000306 friend class ThreadManager::Impl;
Mark Slee2f6404d2006-10-10 01:37:40 +0000307 STATE state_;
308 bool idle_;
Marc Slemko66949872006-07-15 01:52:39 +0000309};
310
Mark Sleef5f2be42006-09-05 21:05:31 +0000311
312 void ThreadManager::Impl::addWorker(size_t value) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000313 std::set<shared_ptr<Thread> > newThreads;
Mark Sleef5f2be42006-09-05 21:05:31 +0000314 for (size_t ix = 0; ix < value; ix++) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000315 class ThreadManager::Worker;
Marc Slemko6f038a72006-08-03 18:58:09 +0000316 shared_ptr<ThreadManager::Worker> worker = shared_ptr<ThreadManager::Worker>(new ThreadManager::Worker(this));
Mark Slee2f6404d2006-10-10 01:37:40 +0000317 newThreads.insert(threadFactory_->newThread(worker));
Marc Slemko66949872006-07-15 01:52:39 +0000318 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000319
Mark Sleef5f2be42006-09-05 21:05:31 +0000320 {
Mark Slee2f6404d2006-10-10 01:37:40 +0000321 Synchronized s(monitor_);
322 workerMaxCount_ += value;
323 workers_.insert(newThreads.begin(), newThreads.end());
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000324 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000325
Mark Sleef5f2be42006-09-05 21:05:31 +0000326 for (std::set<shared_ptr<Thread> >::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000327 shared_ptr<ThreadManager::Worker> worker = dynamic_pointer_cast<ThreadManager::Worker, Runnable>((*ix)->runnable());
Mark Slee2f6404d2006-10-10 01:37:40 +0000328 worker->state_ = ThreadManager::Worker::STARTING;
Marc Slemkod466b212006-07-20 00:04:18 +0000329 (*ix)->start();
Marc Slemkoa6479032007-06-05 22:20:14 +0000330 idMap_.insert(std::pair<const Thread::id_t, shared_ptr<Thread> >((*ix)->getId(), *ix));
Marc Slemkod466b212006-07-20 00:04:18 +0000331 }
332
Mark Sleef5f2be42006-09-05 21:05:31 +0000333 {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000334 Synchronized s(workerMonitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000335 while (workerCount_ != workerMaxCount_) {
336 workerMonitor_.wait();
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000337 }
338 }
339}
Marc Slemkod466b212006-07-20 00:04:18 +0000340
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000341void ThreadManager::Impl::start() {
342
Mark Slee2f6404d2006-10-10 01:37:40 +0000343 if (state_ == ThreadManager::STOPPED) {
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000344 return;
345 }
346
Mark Sleef5f2be42006-09-05 21:05:31 +0000347 {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000348 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000349 if (state_ == ThreadManager::UNINITIALIZED) {
350 if (threadFactory_ == NULL) {
Mark Sleef5f2be42006-09-05 21:05:31 +0000351 throw InvalidArgumentException();
352 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000353 state_ = ThreadManager::STARTED;
354 monitor_.notifyAll();
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000355 }
356
Mark Slee2f6404d2006-10-10 01:37:40 +0000357 while (state_ == STARTING) {
358 monitor_.wait();
Marc Slemkod466b212006-07-20 00:04:18 +0000359 }
360 }
361}
362
Mark Slee7c10eaf2007-03-01 02:45:10 +0000363void ThreadManager::Impl::stopImpl(bool join) {
Marc Slemkod466b212006-07-20 00:04:18 +0000364 bool doStop = false;
Mark Slee2f6404d2006-10-10 01:37:40 +0000365 if (state_ == ThreadManager::STOPPED) {
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000366 return;
367 }
368
Mark Sleef5f2be42006-09-05 21:05:31 +0000369 {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000370 Synchronized s(monitor_);
Mark Slee7c10eaf2007-03-01 02:45:10 +0000371 if (state_ != ThreadManager::STOPPING &&
372 state_ != ThreadManager::JOINING &&
373 state_ != ThreadManager::STOPPED) {
Marc Slemkod466b212006-07-20 00:04:18 +0000374 doStop = true;
Mark Slee7c10eaf2007-03-01 02:45:10 +0000375 state_ = join ? ThreadManager::JOINING : ThreadManager::STOPPING;
Marc Slemkod466b212006-07-20 00:04:18 +0000376 }
377 }
378
Mark Sleef5f2be42006-09-05 21:05:31 +0000379 if (doStop) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000380 removeWorker(workerCount_);
Marc Slemkod466b212006-07-20 00:04:18 +0000381 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000382
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000383 // XXX
Mark Sleef5f2be42006-09-05 21:05:31 +0000384 // should be able to block here for transition to STOPPED since we're no
385 // using shared_ptrs
Mark Slee7c10eaf2007-03-01 02:45:10 +0000386
387 {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000388 Synchronized s(monitor_);
Mark Slee7c10eaf2007-03-01 02:45:10 +0000389 state_ = ThreadManager::STOPPED;
390 }
391
Marc Slemkod466b212006-07-20 00:04:18 +0000392}
Mark Slee7c10eaf2007-03-01 02:45:10 +0000393
Marc Slemkod466b212006-07-20 00:04:18 +0000394void ThreadManager::Impl::removeWorker(size_t value) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000395 std::set<shared_ptr<Thread> > removedThreads;
Mark Sleef5f2be42006-09-05 21:05:31 +0000396 {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000397 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000398 if (value > workerMaxCount_) {
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000399 throw InvalidArgumentException();
400 }
401
Mark Slee2f6404d2006-10-10 01:37:40 +0000402 workerMaxCount_ -= value;
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000403
Mark Slee2f6404d2006-10-10 01:37:40 +0000404 if (idleCount_ < value) {
405 for (size_t ix = 0; ix < idleCount_; ix++) {
David Reiss96d23882007-07-26 21:10:32 +0000406 monitor_.notify();
Marc Slemko66949872006-07-15 01:52:39 +0000407 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000408 } else {
Mark Slee2f6404d2006-10-10 01:37:40 +0000409 monitor_.notifyAll();
Marc Slemko66949872006-07-15 01:52:39 +0000410 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000411 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000412
Mark Sleef5f2be42006-09-05 21:05:31 +0000413 {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000414 Synchronized s(workerMonitor_);
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000415
Mark Slee2f6404d2006-10-10 01:37:40 +0000416 while (workerCount_ != workerMaxCount_) {
417 workerMonitor_.wait();
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000418 }
419
Mark Slee2f6404d2006-10-10 01:37:40 +0000420 for (std::set<shared_ptr<Thread> >::iterator ix = deadWorkers_.begin(); ix != deadWorkers_.end(); ix++) {
421 workers_.erase(*ix);
Marc Slemkoa6479032007-06-05 22:20:14 +0000422 idMap_.erase((*ix)->getId());
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000423 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000424
Mark Slee2f6404d2006-10-10 01:37:40 +0000425 deadWorkers_.clear();
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000426 }
427}
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000428
429 bool ThreadManager::Impl::canSleep() {
Marc Slemkoa6479032007-06-05 22:20:14 +0000430 const Thread::id_t id = threadFactory_->getCurrentThreadId();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000431 return idMap_.find(id) == idMap_.end();
432 }
433
Mark Slee9b82d272007-05-23 05:16:07 +0000434 void ThreadManager::Impl::add(shared_ptr<Runnable> value, int64_t timeout) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000435 Synchronized s(monitor_);
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000436
Mark Slee2f6404d2006-10-10 01:37:40 +0000437 if (state_ != ThreadManager::STARTED) {
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000438 throw IllegalStateException();
439 }
Marc Slemkod466b212006-07-20 00:04:18 +0000440
Mark Slee2782d6d2007-05-23 04:55:30 +0000441 if (pendingTaskCountMax_ > 0 && (tasks_.size() >= pendingTaskCountMax_)) {
Aditya Agarwal4b6ff2d2007-12-25 22:58:50 +0000442 if (canSleep() && timeout >= 0) {
Mark Slee2782d6d2007-05-23 04:55:30 +0000443 while (pendingTaskCountMax_ > 0 && tasks_.size() >= pendingTaskCountMax_) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000444 monitor_.wait(timeout);
445 }
446 } else {
447 throw TooManyPendingTasksException();
448 }
449 }
450
Mark Slee2f6404d2006-10-10 01:37:40 +0000451 tasks_.push(shared_ptr<ThreadManager::Task>(new ThreadManager::Task(value)));
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000452
Mark Sleef5f2be42006-09-05 21:05:31 +0000453 // If idle thread is available notify it, otherwise all worker threads are
454 // running and will get around to this task in time.
Mark Slee2f6404d2006-10-10 01:37:40 +0000455 if (idleCount_ > 0) {
456 monitor_.notify();
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000457 }
Marc Slemko66949872006-07-15 01:52:39 +0000458 }
459
Marc Slemko6f038a72006-08-03 18:58:09 +0000460void ThreadManager::Impl::remove(shared_ptr<Runnable> task) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000461 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000462 if (state_ != ThreadManager::STARTED) {
Mark Sleef5f2be42006-09-05 21:05:31 +0000463 throw IllegalStateException();
464 }
Marc Slemko66949872006-07-15 01:52:39 +0000465}
466
David Reiss01fe1532010-03-09 05:19:25 +0000467boost::shared_ptr<Runnable> ThreadManager::Impl::removeNextPending() {
468 Guard g(mutex_);
469 if (state_ != ThreadManager::STARTED) {
470 throw IllegalStateException();
471 }
472
473 if (tasks_.empty()) {
474 return boost::shared_ptr<Runnable>();
475 }
476
477 shared_ptr<ThreadManager::Task> task = tasks_.front();
478 tasks_.pop();
479
480 return task->getRunnable();
481}
482
Marc Slemkod466b212006-07-20 00:04:18 +0000483class SimpleThreadManager : public ThreadManager::Impl {
Marc Slemko66949872006-07-15 01:52:39 +0000484
Mark Slee2782d6d2007-05-23 04:55:30 +0000485 public:
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000486 SimpleThreadManager(size_t workerCount=4, size_t pendingTaskCountMax=0) :
Mark Slee2f6404d2006-10-10 01:37:40 +0000487 workerCount_(workerCount),
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000488 pendingTaskCountMax_(pendingTaskCountMax),
Mark Slee2f6404d2006-10-10 01:37:40 +0000489 firstTime_(true) {
Marc Slemkod466b212006-07-20 00:04:18 +0000490 }
Marc Slemko66949872006-07-15 01:52:39 +0000491
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000492 void start() {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000493 ThreadManager::Impl::pendingTaskCountMax(pendingTaskCountMax_);
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000494 ThreadManager::Impl::start();
Mark Slee2f6404d2006-10-10 01:37:40 +0000495 addWorker(workerCount_);
Marc Slemkod466b212006-07-20 00:04:18 +0000496 }
497
Mark Slee2782d6d2007-05-23 04:55:30 +0000498 private:
Mark Slee2f6404d2006-10-10 01:37:40 +0000499 const size_t workerCount_;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000500 const size_t pendingTaskCountMax_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000501 bool firstTime_;
502 Monitor monitor_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000503};
Marc Slemko66949872006-07-15 01:52:39 +0000504
Marc Slemko66949872006-07-15 01:52:39 +0000505
Marc Slemko6f038a72006-08-03 18:58:09 +0000506shared_ptr<ThreadManager> ThreadManager::newThreadManager() {
507 return shared_ptr<ThreadManager>(new ThreadManager::Impl());
Marc Slemkod466b212006-07-20 00:04:18 +0000508}
Marc Slemko66949872006-07-15 01:52:39 +0000509
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000510shared_ptr<ThreadManager> ThreadManager::newSimpleThreadManager(size_t count, size_t pendingTaskCountMax) {
511 return shared_ptr<ThreadManager>(new SimpleThreadManager(count, pendingTaskCountMax));
Marc Slemkod466b212006-07-20 00:04:18 +0000512}
Marc Slemko66949872006-07-15 01:52:39 +0000513
T Jake Lucianib5e62212009-01-31 22:36:20 +0000514}}} // apache::thrift::concurrency
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000515