blob: aee57f31b7eba297ebea0e04877ac43ccbe03008 [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
Marc Slemko66949872006-07-15 01:52:39 +000020#include "ThreadManager.h"
Marc Slemkod466b212006-07-20 00:04:18 +000021#include "Exception.h"
Marc Slemko0e53ccd2006-07-17 23:51:05 +000022#include "Monitor.h"
David Reiss068f4162010-03-09 05:19:45 +000023#include "Util.h"
Marc Slemko66949872006-07-15 01:52:39 +000024
Marc Slemko6f038a72006-08-03 18:58:09 +000025#include <boost/shared_ptr.hpp>
26
Marc Slemko66949872006-07-15 01:52:39 +000027#include <assert.h>
Marc Slemko0e53ccd2006-07-17 23:51:05 +000028#include <queue>
29#include <set>
Marc Slemko66949872006-07-15 01:52:39 +000030
Marc Slemko6f038a72006-08-03 18:58:09 +000031#if defined(DEBUG)
32#include <iostream>
33#endif //defined(DEBUG)
34
T Jake Lucianib5e62212009-01-31 22:36:20 +000035namespace apache { namespace thrift { namespace concurrency {
Marc Slemko66949872006-07-15 01:52:39 +000036
David Reissd4a269c2007-08-23 02:37:19 +000037using boost::shared_ptr;
38using boost::dynamic_pointer_cast;
Marc Slemko6f038a72006-08-03 18:58:09 +000039
Mark Sleef5f2be42006-09-05 21:05:31 +000040/**
41 * ThreadManager class
Marc Slemko3a3b53b2007-05-22 23:59:54 +000042 *
Mark Sleef5f2be42006-09-05 21:05:31 +000043 * This class manages a pool of threads. It uses a ThreadFactory to create
44 * threads. It never actually creates or destroys worker threads, rather
45 * it maintains statistics on number of idle threads, number of active threads,
46 * task backlog, and average wait and service times.
47 *
Mark Sleef5f2be42006-09-05 21:05:31 +000048 * @version $Id:$
49 */
Marc Slemko0e53ccd2006-07-17 23:51:05 +000050class ThreadManager::Impl : public ThreadManager {
51
52 public:
Marc Slemko3a3b53b2007-05-22 23:59:54 +000053 Impl() :
Mark Slee2f6404d2006-10-10 01:37:40 +000054 workerCount_(0),
55 workerMaxCount_(0),
56 idleCount_(0),
Marc Slemko3a3b53b2007-05-22 23:59:54 +000057 pendingTaskCountMax_(0),
David Reiss068f4162010-03-09 05:19:45 +000058 expiredCount_(0),
David Reissa0dbfef2010-03-09 05:19:32 +000059 state_(ThreadManager::UNINITIALIZED),
60 monitor_(&mutex_),
61 maxMonitor_(&mutex_) {}
Marc Slemkod466b212006-07-20 00:04:18 +000062
Mark Sleef5f2be42006-09-05 21:05:31 +000063 ~Impl() { stop(); }
Marc Slemkod466b212006-07-20 00:04:18 +000064
Marc Slemkofe5ba12e2006-07-20 21:16:27 +000065 void start();
Marc Slemko0e53ccd2006-07-17 23:51:05 +000066
Mark Slee7c10eaf2007-03-01 02:45:10 +000067 void stop() { stopImpl(false); }
68
69 void join() { stopImpl(true); }
Marc Slemko0e53ccd2006-07-17 23:51:05 +000070
Mark Slee2f6404d2006-10-10 01:37:40 +000071 const ThreadManager::STATE state() const {
72 return state_;
73 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +000074
Marc Slemko6f038a72006-08-03 18:58:09 +000075 shared_ptr<ThreadFactory> threadFactory() const {
Marc Slemko3a3b53b2007-05-22 23:59:54 +000076 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +000077 return threadFactory_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000078 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +000079
80 void threadFactory(shared_ptr<ThreadFactory> value) {
Mark Slee2f6404d2006-10-10 01:37:40 +000081 Synchronized s(monitor_);
82 threadFactory_ = value;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000083 }
84
Marc Slemkod466b212006-07-20 00:04:18 +000085 void addWorker(size_t value);
Marc Slemko3a3b53b2007-05-22 23:59:54 +000086
Marc Slemkod466b212006-07-20 00:04:18 +000087 void removeWorker(size_t value);
Marc Slemko3a3b53b2007-05-22 23:59:54 +000088
Mark Slee2f6404d2006-10-10 01:37:40 +000089 size_t idleWorkerCount() const {
90 return idleCount_;
91 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +000092
93 size_t workerCount() const {
Marc Slemko3a3b53b2007-05-22 23:59:54 +000094 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +000095 return workerCount_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000096 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +000097
Marc Slemko0e53ccd2006-07-17 23:51:05 +000098 size_t pendingTaskCount() const {
Marc Slemko3a3b53b2007-05-22 23:59:54 +000099 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000100 return tasks_.size();
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000101 }
102
103 size_t totalTaskCount() const {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000104 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000105 return tasks_.size() + workerCount_ - idleCount_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000106 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000107
108 size_t pendingTaskCountMax() const {
109 Synchronized s(monitor_);
110 return pendingTaskCountMax_;
111 }
112
David Reiss068f4162010-03-09 05:19:45 +0000113 size_t expiredTaskCount() {
114 Synchronized s(monitor_);
115 size_t result = expiredCount_;
116 expiredCount_ = 0;
117 return result;
118 }
119
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000120 void pendingTaskCountMax(const size_t value) {
121 Synchronized s(monitor_);
122 pendingTaskCountMax_ = value;
123 }
124
125 bool canSleep();
126
David Reiss068f4162010-03-09 05:19:45 +0000127 void add(shared_ptr<Runnable> value, int64_t timeout, int64_t expiration);
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000128
Marc Slemko6f038a72006-08-03 18:58:09 +0000129 void remove(shared_ptr<Runnable> task);
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000130
David Reiss01fe1532010-03-09 05:19:25 +0000131 shared_ptr<Runnable> removeNextPending();
132
David Reiss068f4162010-03-09 05:19:45 +0000133 void removeExpiredTasks();
134
135 void setExpireCallback(ExpireCallback expireCallback);
136
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000137private:
Mark Slee7c10eaf2007-03-01 02:45:10 +0000138 void stopImpl(bool join);
139
Mark Slee2f6404d2006-10-10 01:37:40 +0000140 size_t workerCount_;
141 size_t workerMaxCount_;
142 size_t idleCount_;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000143 size_t pendingTaskCountMax_;
David Reiss068f4162010-03-09 05:19:45 +0000144 size_t expiredCount_;
145 ExpireCallback expireCallback_;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000146
Mark Slee2f6404d2006-10-10 01:37:40 +0000147 ThreadManager::STATE state_;
148 shared_ptr<ThreadFactory> threadFactory_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000149
Mark Sleef5f2be42006-09-05 21:05:31 +0000150
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000151 friend class ThreadManager::Task;
Mark Slee2f6404d2006-10-10 01:37:40 +0000152 std::queue<shared_ptr<Task> > tasks_;
David Reissa0dbfef2010-03-09 05:19:32 +0000153 Mutex mutex_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000154 Monitor monitor_;
David Reissa0dbfef2010-03-09 05:19:32 +0000155 Monitor maxMonitor_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000156 Monitor workerMonitor_;
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000157
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000158 friend class ThreadManager::Worker;
Mark Slee2f6404d2006-10-10 01:37:40 +0000159 std::set<shared_ptr<Thread> > workers_;
160 std::set<shared_ptr<Thread> > deadWorkers_;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000161 std::map<const Thread::id_t, shared_ptr<Thread> > idMap_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000162};
Marc Slemko66949872006-07-15 01:52:39 +0000163
164class ThreadManager::Task : public Runnable {
165
Mark Sleef5f2be42006-09-05 21:05:31 +0000166 public:
Marc Slemko66949872006-07-15 01:52:39 +0000167 enum STATE {
168 WAITING,
169 EXECUTING,
170 CANCELLED,
171 COMPLETE
172 };
173
David Reiss068f4162010-03-09 05:19:45 +0000174 Task(shared_ptr<Runnable> runnable, int64_t expiration=0LL) :
Mark Slee2f6404d2006-10-10 01:37:40 +0000175 runnable_(runnable),
David Reiss068f4162010-03-09 05:19:45 +0000176 state_(WAITING),
177 expireTime_(expiration != 0LL ? Util::currentTime() + expiration : 0LL) {}
Marc Slemko66949872006-07-15 01:52:39 +0000178
Mark Sleef5f2be42006-09-05 21:05:31 +0000179 ~Task() {}
Marc Slemko66949872006-07-15 01:52:39 +0000180
181 void run() {
Mark Slee2f6404d2006-10-10 01:37:40 +0000182 if (state_ == EXECUTING) {
183 runnable_->run();
184 state_ = COMPLETE;
Marc Slemko66949872006-07-15 01:52:39 +0000185 }
186 }
187
David Reiss01fe1532010-03-09 05:19:25 +0000188 shared_ptr<Runnable> getRunnable() {
189 return runnable_;
190 }
191
David Reiss068f4162010-03-09 05:19:45 +0000192 int64_t getExpireTime() const {
193 return expireTime_;
194 }
195
Marc Slemko66949872006-07-15 01:52:39 +0000196 private:
Mark Slee2f6404d2006-10-10 01:37:40 +0000197 shared_ptr<Runnable> runnable_;
Marc Slemkod466b212006-07-20 00:04:18 +0000198 friend class ThreadManager::Worker;
Mark Slee2f6404d2006-10-10 01:37:40 +0000199 STATE state_;
David Reiss068f4162010-03-09 05:19:45 +0000200 int64_t expireTime_;
Marc Slemko66949872006-07-15 01:52:39 +0000201};
202
203class ThreadManager::Worker: public Runnable {
Marc Slemko66949872006-07-15 01:52:39 +0000204 enum STATE {
205 UNINITIALIZED,
206 STARTING,
207 STARTED,
208 STOPPING,
209 STOPPED
210 };
211
212 public:
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000213 Worker(ThreadManager::Impl* manager) :
Mark Slee2f6404d2006-10-10 01:37:40 +0000214 manager_(manager),
215 state_(UNINITIALIZED),
216 idle_(false) {}
Marc Slemko66949872006-07-15 01:52:39 +0000217
218 ~Worker() {}
219
Mark Slee7c10eaf2007-03-01 02:45:10 +0000220 private:
Mark Slee2f6404d2006-10-10 01:37:40 +0000221 bool isActive() const {
Mark Slee7c10eaf2007-03-01 02:45:10 +0000222 return
223 (manager_->workerCount_ <= manager_->workerMaxCount_) ||
224 (manager_->state_ == JOINING && !manager_->tasks_.empty());
Mark Slee2f6404d2006-10-10 01:37:40 +0000225 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000226
Mark Slee7c10eaf2007-03-01 02:45:10 +0000227 public:
Mark Sleef5f2be42006-09-05 21:05:31 +0000228 /**
229 * Worker entry point
230 *
231 * As long as worker thread is running, pull tasks off the task queue and
232 * execute.
233 */
Marc Slemko66949872006-07-15 01:52:39 +0000234 void run() {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000235 bool active = false;
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000236 bool notifyManager = false;
237
Mark Sleef5f2be42006-09-05 21:05:31 +0000238 /**
239 * Increment worker semaphore and notify manager if worker count reached
240 * desired max
241 *
242 * Note: We have to release the monitor and acquire the workerMonitor
243 * since that is what the manager blocks on for worker add/remove
244 */
245 {
Mark Slee2f6404d2006-10-10 01:37:40 +0000246 Synchronized s(manager_->monitor_);
247 active = manager_->workerCount_ < manager_->workerMaxCount_;
Mark Sleef5f2be42006-09-05 21:05:31 +0000248 if (active) {
David Reiss96d23882007-07-26 21:10:32 +0000249 manager_->workerCount_++;
250 notifyManager = manager_->workerCount_ == manager_->workerMaxCount_;
Marc Slemko66949872006-07-15 01:52:39 +0000251 }
252 }
253
Mark Sleef5f2be42006-09-05 21:05:31 +0000254 if (notifyManager) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000255 Synchronized s(manager_->workerMonitor_);
256 manager_->workerMonitor_.notify();
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000257 notifyManager = false;
258 }
259
Mark Sleef5f2be42006-09-05 21:05:31 +0000260 while (active) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000261 shared_ptr<ThreadManager::Task> task;
Marc Slemko66949872006-07-15 01:52:39 +0000262
Mark Sleef5f2be42006-09-05 21:05:31 +0000263 /**
264 * While holding manager monitor block for non-empty task queue (Also
265 * check that the thread hasn't been requested to stop). Once the queue
266 * is non-empty, dequeue a task, release monitor, and execute. If the
267 * worker max count has been decremented such that we exceed it, mark
268 * ourself inactive, decrement the worker count and notify the manager
269 * (technically we're notifying the next blocked thread but eventually
270 * the manager will see it.
271 */
272 {
David Reissa0dbfef2010-03-09 05:19:32 +0000273 Guard g(manager_->mutex_);
David Reiss96d23882007-07-26 21:10:32 +0000274 active = isActive();
Mark Slee7c10eaf2007-03-01 02:45:10 +0000275
David Reiss96d23882007-07-26 21:10:32 +0000276 while (active && manager_->tasks_.empty()) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000277 manager_->idleCount_++;
Mark Slee7c10eaf2007-03-01 02:45:10 +0000278 idle_ = true;
Mark Slee2f6404d2006-10-10 01:37:40 +0000279 manager_->monitor_.wait();
Mark Sleef5f2be42006-09-05 21:05:31 +0000280 active = isActive();
Mark Slee7c10eaf2007-03-01 02:45:10 +0000281 idle_ = false;
Mark Slee2f6404d2006-10-10 01:37:40 +0000282 manager_->idleCount_--;
David Reiss96d23882007-07-26 21:10:32 +0000283 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000284
David Reiss96d23882007-07-26 21:10:32 +0000285 if (active) {
David Reiss068f4162010-03-09 05:19:45 +0000286 manager_->removeExpiredTasks();
287
Mark Slee2f6404d2006-10-10 01:37:40 +0000288 if (!manager_->tasks_.empty()) {
289 task = manager_->tasks_.front();
290 manager_->tasks_.pop();
291 if (task->state_ == ThreadManager::Task::WAITING) {
292 task->state_ = ThreadManager::Task::EXECUTING;
David Reiss96d23882007-07-26 21:10:32 +0000293 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000294
295 /* If we have a pending task max and we just dropped below it, wakeup any
296 thread that might be blocked on add. */
Mark Slee2782d6d2007-05-23 04:55:30 +0000297 if (manager_->pendingTaskCountMax_ != 0 &&
David Reiss4cc07552010-03-09 05:20:01 +0000298 manager_->tasks_.size() <= manager_->pendingTaskCountMax_ - 1) {
David Reissa0dbfef2010-03-09 05:19:32 +0000299 manager_->maxMonitor_.notify();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000300 }
David Reiss96d23882007-07-26 21:10:32 +0000301 }
302 } else {
303 idle_ = true;
304 manager_->workerCount_--;
Mark Slee7c10eaf2007-03-01 02:45:10 +0000305 notifyManager = (manager_->workerCount_ == manager_->workerMaxCount_);
David Reiss96d23882007-07-26 21:10:32 +0000306 }
Marc Slemko66949872006-07-15 01:52:39 +0000307 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000308
Mark Sleef5f2be42006-09-05 21:05:31 +0000309 if (task != NULL) {
David Reiss96d23882007-07-26 21:10:32 +0000310 if (task->state_ == ThreadManager::Task::EXECUTING) {
311 try {
Mark Sleef5f2be42006-09-05 21:05:31 +0000312 task->run();
313 } catch(...) {
314 // XXX need to log this
David Reiss96d23882007-07-26 21:10:32 +0000315 }
316 }
Marc Slemko66949872006-07-15 01:52:39 +0000317 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000318 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000319
Mark Sleef5f2be42006-09-05 21:05:31 +0000320 {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000321 Synchronized s(manager_->workerMonitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000322 manager_->deadWorkers_.insert(this->thread());
Mark Sleef5f2be42006-09-05 21:05:31 +0000323 if (notifyManager) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000324 manager_->workerMonitor_.notify();
Marc Slemko66949872006-07-15 01:52:39 +0000325 }
326 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000327
Marc Slemko66949872006-07-15 01:52:39 +0000328 return;
329 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000330
Mark Sleef5f2be42006-09-05 21:05:31 +0000331 private:
Mark Slee2f6404d2006-10-10 01:37:40 +0000332 ThreadManager::Impl* manager_;
Mark Sleef5f2be42006-09-05 21:05:31 +0000333 friend class ThreadManager::Impl;
Mark Slee2f6404d2006-10-10 01:37:40 +0000334 STATE state_;
335 bool idle_;
Marc Slemko66949872006-07-15 01:52:39 +0000336};
337
Mark Sleef5f2be42006-09-05 21:05:31 +0000338
339 void ThreadManager::Impl::addWorker(size_t value) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000340 std::set<shared_ptr<Thread> > newThreads;
Mark Sleef5f2be42006-09-05 21:05:31 +0000341 for (size_t ix = 0; ix < value; ix++) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000342 class ThreadManager::Worker;
Marc Slemko6f038a72006-08-03 18:58:09 +0000343 shared_ptr<ThreadManager::Worker> worker = shared_ptr<ThreadManager::Worker>(new ThreadManager::Worker(this));
Mark Slee2f6404d2006-10-10 01:37:40 +0000344 newThreads.insert(threadFactory_->newThread(worker));
Marc Slemko66949872006-07-15 01:52:39 +0000345 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000346
Mark Sleef5f2be42006-09-05 21:05:31 +0000347 {
Mark Slee2f6404d2006-10-10 01:37:40 +0000348 Synchronized s(monitor_);
349 workerMaxCount_ += value;
350 workers_.insert(newThreads.begin(), newThreads.end());
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000351 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000352
Mark Sleef5f2be42006-09-05 21:05:31 +0000353 for (std::set<shared_ptr<Thread> >::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000354 shared_ptr<ThreadManager::Worker> worker = dynamic_pointer_cast<ThreadManager::Worker, Runnable>((*ix)->runnable());
Mark Slee2f6404d2006-10-10 01:37:40 +0000355 worker->state_ = ThreadManager::Worker::STARTING;
Marc Slemkod466b212006-07-20 00:04:18 +0000356 (*ix)->start();
Marc Slemkoa6479032007-06-05 22:20:14 +0000357 idMap_.insert(std::pair<const Thread::id_t, shared_ptr<Thread> >((*ix)->getId(), *ix));
Marc Slemkod466b212006-07-20 00:04:18 +0000358 }
359
Mark Sleef5f2be42006-09-05 21:05:31 +0000360 {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000361 Synchronized s(workerMonitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000362 while (workerCount_ != workerMaxCount_) {
363 workerMonitor_.wait();
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000364 }
365 }
366}
Marc Slemkod466b212006-07-20 00:04:18 +0000367
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000368void ThreadManager::Impl::start() {
369
Mark Slee2f6404d2006-10-10 01:37:40 +0000370 if (state_ == ThreadManager::STOPPED) {
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000371 return;
372 }
373
Mark Sleef5f2be42006-09-05 21:05:31 +0000374 {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000375 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000376 if (state_ == ThreadManager::UNINITIALIZED) {
377 if (threadFactory_ == NULL) {
Mark Sleef5f2be42006-09-05 21:05:31 +0000378 throw InvalidArgumentException();
379 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000380 state_ = ThreadManager::STARTED;
381 monitor_.notifyAll();
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000382 }
383
Mark Slee2f6404d2006-10-10 01:37:40 +0000384 while (state_ == STARTING) {
385 monitor_.wait();
Marc Slemkod466b212006-07-20 00:04:18 +0000386 }
387 }
388}
389
Mark Slee7c10eaf2007-03-01 02:45:10 +0000390void ThreadManager::Impl::stopImpl(bool join) {
Marc Slemkod466b212006-07-20 00:04:18 +0000391 bool doStop = false;
Mark Slee2f6404d2006-10-10 01:37:40 +0000392 if (state_ == ThreadManager::STOPPED) {
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000393 return;
394 }
395
Mark Sleef5f2be42006-09-05 21:05:31 +0000396 {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000397 Synchronized s(monitor_);
Mark Slee7c10eaf2007-03-01 02:45:10 +0000398 if (state_ != ThreadManager::STOPPING &&
399 state_ != ThreadManager::JOINING &&
400 state_ != ThreadManager::STOPPED) {
Marc Slemkod466b212006-07-20 00:04:18 +0000401 doStop = true;
Mark Slee7c10eaf2007-03-01 02:45:10 +0000402 state_ = join ? ThreadManager::JOINING : ThreadManager::STOPPING;
Marc Slemkod466b212006-07-20 00:04:18 +0000403 }
404 }
405
Mark Sleef5f2be42006-09-05 21:05:31 +0000406 if (doStop) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000407 removeWorker(workerCount_);
Marc Slemkod466b212006-07-20 00:04:18 +0000408 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000409
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000410 // XXX
Mark Sleef5f2be42006-09-05 21:05:31 +0000411 // should be able to block here for transition to STOPPED since we're no
412 // using shared_ptrs
Mark Slee7c10eaf2007-03-01 02:45:10 +0000413
414 {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000415 Synchronized s(monitor_);
Mark Slee7c10eaf2007-03-01 02:45:10 +0000416 state_ = ThreadManager::STOPPED;
417 }
418
Marc Slemkod466b212006-07-20 00:04:18 +0000419}
Mark Slee7c10eaf2007-03-01 02:45:10 +0000420
Marc Slemkod466b212006-07-20 00:04:18 +0000421void ThreadManager::Impl::removeWorker(size_t value) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000422 std::set<shared_ptr<Thread> > removedThreads;
Mark Sleef5f2be42006-09-05 21:05:31 +0000423 {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000424 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000425 if (value > workerMaxCount_) {
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000426 throw InvalidArgumentException();
427 }
428
Mark Slee2f6404d2006-10-10 01:37:40 +0000429 workerMaxCount_ -= value;
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000430
Mark Slee2f6404d2006-10-10 01:37:40 +0000431 if (idleCount_ < value) {
432 for (size_t ix = 0; ix < idleCount_; ix++) {
David Reiss96d23882007-07-26 21:10:32 +0000433 monitor_.notify();
Marc Slemko66949872006-07-15 01:52:39 +0000434 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000435 } else {
Mark Slee2f6404d2006-10-10 01:37:40 +0000436 monitor_.notifyAll();
Marc Slemko66949872006-07-15 01:52:39 +0000437 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000438 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000439
Mark Sleef5f2be42006-09-05 21:05:31 +0000440 {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000441 Synchronized s(workerMonitor_);
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000442
Mark Slee2f6404d2006-10-10 01:37:40 +0000443 while (workerCount_ != workerMaxCount_) {
444 workerMonitor_.wait();
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000445 }
446
Mark Slee2f6404d2006-10-10 01:37:40 +0000447 for (std::set<shared_ptr<Thread> >::iterator ix = deadWorkers_.begin(); ix != deadWorkers_.end(); ix++) {
448 workers_.erase(*ix);
Marc Slemkoa6479032007-06-05 22:20:14 +0000449 idMap_.erase((*ix)->getId());
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000450 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000451
Mark Slee2f6404d2006-10-10 01:37:40 +0000452 deadWorkers_.clear();
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000453 }
454}
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000455
456 bool ThreadManager::Impl::canSleep() {
Marc Slemkoa6479032007-06-05 22:20:14 +0000457 const Thread::id_t id = threadFactory_->getCurrentThreadId();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000458 return idMap_.find(id) == idMap_.end();
459 }
460
David Reiss068f4162010-03-09 05:19:45 +0000461 void ThreadManager::Impl::add(shared_ptr<Runnable> value,
462 int64_t timeout,
463 int64_t expiration) {
David Reiss4e19f192010-03-09 05:19:59 +0000464 Guard g(mutex_, timeout);
465
466 if (!g) {
467 throw TimedOutException();
468 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000469
Mark Slee2f6404d2006-10-10 01:37:40 +0000470 if (state_ != ThreadManager::STARTED) {
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000471 throw IllegalStateException();
472 }
Marc Slemkod466b212006-07-20 00:04:18 +0000473
David Reiss068f4162010-03-09 05:19:45 +0000474 removeExpiredTasks();
Mark Slee2782d6d2007-05-23 04:55:30 +0000475 if (pendingTaskCountMax_ > 0 && (tasks_.size() >= pendingTaskCountMax_)) {
Aditya Agarwal4b6ff2d2007-12-25 22:58:50 +0000476 if (canSleep() && timeout >= 0) {
Mark Slee2782d6d2007-05-23 04:55:30 +0000477 while (pendingTaskCountMax_ > 0 && tasks_.size() >= pendingTaskCountMax_) {
David Reissa0dbfef2010-03-09 05:19:32 +0000478 // This is thread safe because the mutex is shared between monitors.
479 maxMonitor_.wait(timeout);
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000480 }
481 } else {
482 throw TooManyPendingTasksException();
483 }
484 }
485
David Reiss068f4162010-03-09 05:19:45 +0000486 tasks_.push(shared_ptr<ThreadManager::Task>(new ThreadManager::Task(value, expiration)));
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000487
Mark Sleef5f2be42006-09-05 21:05:31 +0000488 // If idle thread is available notify it, otherwise all worker threads are
489 // running and will get around to this task in time.
Mark Slee2f6404d2006-10-10 01:37:40 +0000490 if (idleCount_ > 0) {
491 monitor_.notify();
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000492 }
Marc Slemko66949872006-07-15 01:52:39 +0000493 }
494
Marc Slemko6f038a72006-08-03 18:58:09 +0000495void ThreadManager::Impl::remove(shared_ptr<Runnable> task) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000496 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000497 if (state_ != ThreadManager::STARTED) {
Mark Sleef5f2be42006-09-05 21:05:31 +0000498 throw IllegalStateException();
499 }
Marc Slemko66949872006-07-15 01:52:39 +0000500}
501
David Reiss01fe1532010-03-09 05:19:25 +0000502boost::shared_ptr<Runnable> ThreadManager::Impl::removeNextPending() {
503 Guard g(mutex_);
504 if (state_ != ThreadManager::STARTED) {
505 throw IllegalStateException();
506 }
507
508 if (tasks_.empty()) {
509 return boost::shared_ptr<Runnable>();
510 }
511
512 shared_ptr<ThreadManager::Task> task = tasks_.front();
513 tasks_.pop();
514
515 return task->getRunnable();
516}
517
David Reiss068f4162010-03-09 05:19:45 +0000518void ThreadManager::Impl::removeExpiredTasks() {
519 int64_t now = 0LL; // we won't ask for the time untile we need it
520
521 // note that this loop breaks at the first non-expiring task
522 while (!tasks_.empty()) {
523 shared_ptr<ThreadManager::Task> task = tasks_.front();
524 if (task->getExpireTime() == 0LL) {
525 break;
526 }
527 if (now == 0LL) {
528 now = Util::currentTime();
529 }
530 if (task->getExpireTime() > now) {
531 break;
532 }
533 if (expireCallback_) {
534 expireCallback_(task->getRunnable());
535 }
536 tasks_.pop();
537 expiredCount_++;
538 }
539}
540
541
542void ThreadManager::Impl::setExpireCallback(ExpireCallback expireCallback) {
543 expireCallback_ = expireCallback;
544}
545
Marc Slemkod466b212006-07-20 00:04:18 +0000546class SimpleThreadManager : public ThreadManager::Impl {
Marc Slemko66949872006-07-15 01:52:39 +0000547
Mark Slee2782d6d2007-05-23 04:55:30 +0000548 public:
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000549 SimpleThreadManager(size_t workerCount=4, size_t pendingTaskCountMax=0) :
Mark Slee2f6404d2006-10-10 01:37:40 +0000550 workerCount_(workerCount),
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000551 pendingTaskCountMax_(pendingTaskCountMax),
Mark Slee2f6404d2006-10-10 01:37:40 +0000552 firstTime_(true) {
Marc Slemkod466b212006-07-20 00:04:18 +0000553 }
Marc Slemko66949872006-07-15 01:52:39 +0000554
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000555 void start() {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000556 ThreadManager::Impl::pendingTaskCountMax(pendingTaskCountMax_);
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000557 ThreadManager::Impl::start();
Mark Slee2f6404d2006-10-10 01:37:40 +0000558 addWorker(workerCount_);
Marc Slemkod466b212006-07-20 00:04:18 +0000559 }
560
Mark Slee2782d6d2007-05-23 04:55:30 +0000561 private:
Mark Slee2f6404d2006-10-10 01:37:40 +0000562 const size_t workerCount_;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000563 const size_t pendingTaskCountMax_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000564 bool firstTime_;
565 Monitor monitor_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000566};
Marc Slemko66949872006-07-15 01:52:39 +0000567
Marc Slemko66949872006-07-15 01:52:39 +0000568
Marc Slemko6f038a72006-08-03 18:58:09 +0000569shared_ptr<ThreadManager> ThreadManager::newThreadManager() {
570 return shared_ptr<ThreadManager>(new ThreadManager::Impl());
Marc Slemkod466b212006-07-20 00:04:18 +0000571}
Marc Slemko66949872006-07-15 01:52:39 +0000572
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000573shared_ptr<ThreadManager> ThreadManager::newSimpleThreadManager(size_t count, size_t pendingTaskCountMax) {
574 return shared_ptr<ThreadManager>(new SimpleThreadManager(count, pendingTaskCountMax));
Marc Slemkod466b212006-07-20 00:04:18 +0000575}
Marc Slemko66949872006-07-15 01:52:39 +0000576
T Jake Lucianib5e62212009-01-31 22:36:20 +0000577}}} // apache::thrift::concurrency
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000578