blob: 8a866671191dfcd9436ed20c3f4c84e166377c02 [file] [log] [blame]
Mark Slee9f0c6512007-02-28 23:58:26 +00001// Copyright (c) 2006- Facebook
2// Distributed under the Thrift Software License
3//
4// See accompanying file LICENSE or visit the Thrift site at:
5// http://developers.facebook.com/thrift/
6
Marc Slemko66949872006-07-15 01:52:39 +00007#include "ThreadManager.h"
Marc Slemkod466b212006-07-20 00:04:18 +00008#include "Exception.h"
Marc Slemko0e53ccd2006-07-17 23:51:05 +00009#include "Monitor.h"
Marc Slemko66949872006-07-15 01:52:39 +000010
Marc Slemko6f038a72006-08-03 18:58:09 +000011#include <boost/shared_ptr.hpp>
12
Marc Slemko66949872006-07-15 01:52:39 +000013#include <assert.h>
Marc Slemko0e53ccd2006-07-17 23:51:05 +000014#include <queue>
15#include <set>
Marc Slemko66949872006-07-15 01:52:39 +000016
Marc Slemko6f038a72006-08-03 18:58:09 +000017#if defined(DEBUG)
18#include <iostream>
19#endif //defined(DEBUG)
20
T Jake Lucianib5e62212009-01-31 22:36:20 +000021namespace apache { namespace thrift { namespace concurrency {
Marc Slemko66949872006-07-15 01:52:39 +000022
David Reissd4a269c2007-08-23 02:37:19 +000023using boost::shared_ptr;
24using boost::dynamic_pointer_cast;
Marc Slemko6f038a72006-08-03 18:58:09 +000025
Mark Sleef5f2be42006-09-05 21:05:31 +000026/**
27 * ThreadManager class
Marc Slemko3a3b53b2007-05-22 23:59:54 +000028 *
Mark Sleef5f2be42006-09-05 21:05:31 +000029 * This class manages a pool of threads. It uses a ThreadFactory to create
30 * threads. It never actually creates or destroys worker threads, rather
31 * it maintains statistics on number of idle threads, number of active threads,
32 * task backlog, and average wait and service times.
33 *
Mark Sleef5f2be42006-09-05 21:05:31 +000034 * @version $Id:$
35 */
Marc Slemko0e53ccd2006-07-17 23:51:05 +000036class ThreadManager::Impl : public ThreadManager {
37
38 public:
Marc Slemko3a3b53b2007-05-22 23:59:54 +000039 Impl() :
Mark Slee2f6404d2006-10-10 01:37:40 +000040 workerCount_(0),
41 workerMaxCount_(0),
42 idleCount_(0),
Marc Slemko3a3b53b2007-05-22 23:59:54 +000043 pendingTaskCountMax_(0),
Mark Slee2f6404d2006-10-10 01:37:40 +000044 state_(ThreadManager::UNINITIALIZED) {}
Marc Slemkod466b212006-07-20 00:04:18 +000045
Mark Sleef5f2be42006-09-05 21:05:31 +000046 ~Impl() { stop(); }
Marc Slemkod466b212006-07-20 00:04:18 +000047
Marc Slemkofe5ba12e2006-07-20 21:16:27 +000048 void start();
Marc Slemko0e53ccd2006-07-17 23:51:05 +000049
Mark Slee7c10eaf2007-03-01 02:45:10 +000050 void stop() { stopImpl(false); }
51
52 void join() { stopImpl(true); }
Marc Slemko0e53ccd2006-07-17 23:51:05 +000053
Mark Slee2f6404d2006-10-10 01:37:40 +000054 const ThreadManager::STATE state() const {
55 return state_;
56 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +000057
Marc Slemko6f038a72006-08-03 18:58:09 +000058 shared_ptr<ThreadFactory> threadFactory() const {
Marc Slemko3a3b53b2007-05-22 23:59:54 +000059 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +000060 return threadFactory_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000061 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +000062
63 void threadFactory(shared_ptr<ThreadFactory> value) {
Mark Slee2f6404d2006-10-10 01:37:40 +000064 Synchronized s(monitor_);
65 threadFactory_ = value;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000066 }
67
Marc Slemkod466b212006-07-20 00:04:18 +000068 void addWorker(size_t value);
Marc Slemko3a3b53b2007-05-22 23:59:54 +000069
Marc Slemkod466b212006-07-20 00:04:18 +000070 void removeWorker(size_t value);
Marc Slemko3a3b53b2007-05-22 23:59:54 +000071
Mark Slee2f6404d2006-10-10 01:37:40 +000072 size_t idleWorkerCount() const {
73 return idleCount_;
74 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +000075
76 size_t workerCount() const {
Marc Slemko3a3b53b2007-05-22 23:59:54 +000077 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +000078 return workerCount_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000079 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +000080
Marc Slemko0e53ccd2006-07-17 23:51:05 +000081 size_t pendingTaskCount() const {
Marc Slemko3a3b53b2007-05-22 23:59:54 +000082 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +000083 return tasks_.size();
Marc Slemko0e53ccd2006-07-17 23:51:05 +000084 }
85
86 size_t totalTaskCount() const {
Marc Slemko3a3b53b2007-05-22 23:59:54 +000087 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +000088 return tasks_.size() + workerCount_ - idleCount_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000089 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +000090
91 size_t pendingTaskCountMax() const {
92 Synchronized s(monitor_);
93 return pendingTaskCountMax_;
94 }
95
96 void pendingTaskCountMax(const size_t value) {
97 Synchronized s(monitor_);
98 pendingTaskCountMax_ = value;
99 }
100
101 bool canSleep();
102
Mark Slee9b82d272007-05-23 05:16:07 +0000103 void add(shared_ptr<Runnable> value, int64_t timeout);
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000104
Marc Slemko6f038a72006-08-03 18:58:09 +0000105 void remove(shared_ptr<Runnable> task);
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000106
107private:
Mark Slee7c10eaf2007-03-01 02:45:10 +0000108 void stopImpl(bool join);
109
Mark Slee2f6404d2006-10-10 01:37:40 +0000110 size_t workerCount_;
111 size_t workerMaxCount_;
112 size_t idleCount_;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000113 size_t pendingTaskCountMax_;
114
Mark Slee2f6404d2006-10-10 01:37:40 +0000115 ThreadManager::STATE state_;
116 shared_ptr<ThreadFactory> threadFactory_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000117
Mark Sleef5f2be42006-09-05 21:05:31 +0000118
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000119 friend class ThreadManager::Task;
Mark Slee2f6404d2006-10-10 01:37:40 +0000120 std::queue<shared_ptr<Task> > tasks_;
121 Monitor monitor_;
122 Monitor workerMonitor_;
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000123
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000124 friend class ThreadManager::Worker;
Mark Slee2f6404d2006-10-10 01:37:40 +0000125 std::set<shared_ptr<Thread> > workers_;
126 std::set<shared_ptr<Thread> > deadWorkers_;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000127 std::map<const Thread::id_t, shared_ptr<Thread> > idMap_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000128};
Marc Slemko66949872006-07-15 01:52:39 +0000129
130class ThreadManager::Task : public Runnable {
131
Mark Sleef5f2be42006-09-05 21:05:31 +0000132 public:
Marc Slemko66949872006-07-15 01:52:39 +0000133 enum STATE {
134 WAITING,
135 EXECUTING,
136 CANCELLED,
137 COMPLETE
138 };
139
Marc Slemko6f038a72006-08-03 18:58:09 +0000140 Task(shared_ptr<Runnable> runnable) :
Mark Slee2f6404d2006-10-10 01:37:40 +0000141 runnable_(runnable),
142 state_(WAITING) {}
Marc Slemko66949872006-07-15 01:52:39 +0000143
Mark Sleef5f2be42006-09-05 21:05:31 +0000144 ~Task() {}
Marc Slemko66949872006-07-15 01:52:39 +0000145
146 void run() {
Mark Slee2f6404d2006-10-10 01:37:40 +0000147 if (state_ == EXECUTING) {
148 runnable_->run();
149 state_ = COMPLETE;
Marc Slemko66949872006-07-15 01:52:39 +0000150 }
151 }
152
153 private:
Mark Slee2f6404d2006-10-10 01:37:40 +0000154 shared_ptr<Runnable> runnable_;
Marc Slemkod466b212006-07-20 00:04:18 +0000155 friend class ThreadManager::Worker;
Mark Slee2f6404d2006-10-10 01:37:40 +0000156 STATE state_;
Marc Slemko66949872006-07-15 01:52:39 +0000157};
158
159class ThreadManager::Worker: public Runnable {
Marc Slemko66949872006-07-15 01:52:39 +0000160 enum STATE {
161 UNINITIALIZED,
162 STARTING,
163 STARTED,
164 STOPPING,
165 STOPPED
166 };
167
168 public:
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000169 Worker(ThreadManager::Impl* manager) :
Mark Slee2f6404d2006-10-10 01:37:40 +0000170 manager_(manager),
171 state_(UNINITIALIZED),
172 idle_(false) {}
Marc Slemko66949872006-07-15 01:52:39 +0000173
174 ~Worker() {}
175
Mark Slee7c10eaf2007-03-01 02:45:10 +0000176 private:
Mark Slee2f6404d2006-10-10 01:37:40 +0000177 bool isActive() const {
Mark Slee7c10eaf2007-03-01 02:45:10 +0000178 return
179 (manager_->workerCount_ <= manager_->workerMaxCount_) ||
180 (manager_->state_ == JOINING && !manager_->tasks_.empty());
Mark Slee2f6404d2006-10-10 01:37:40 +0000181 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000182
Mark Slee7c10eaf2007-03-01 02:45:10 +0000183 public:
Mark Sleef5f2be42006-09-05 21:05:31 +0000184 /**
185 * Worker entry point
186 *
187 * As long as worker thread is running, pull tasks off the task queue and
188 * execute.
189 */
Marc Slemko66949872006-07-15 01:52:39 +0000190 void run() {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000191 bool active = false;
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000192 bool notifyManager = false;
193
Mark Sleef5f2be42006-09-05 21:05:31 +0000194 /**
195 * Increment worker semaphore and notify manager if worker count reached
196 * desired max
197 *
198 * Note: We have to release the monitor and acquire the workerMonitor
199 * since that is what the manager blocks on for worker add/remove
200 */
201 {
Mark Slee2f6404d2006-10-10 01:37:40 +0000202 Synchronized s(manager_->monitor_);
203 active = manager_->workerCount_ < manager_->workerMaxCount_;
Mark Sleef5f2be42006-09-05 21:05:31 +0000204 if (active) {
David Reiss96d23882007-07-26 21:10:32 +0000205 manager_->workerCount_++;
206 notifyManager = manager_->workerCount_ == manager_->workerMaxCount_;
Marc Slemko66949872006-07-15 01:52:39 +0000207 }
208 }
209
Mark Sleef5f2be42006-09-05 21:05:31 +0000210 if (notifyManager) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000211 Synchronized s(manager_->workerMonitor_);
212 manager_->workerMonitor_.notify();
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000213 notifyManager = false;
214 }
215
Mark Sleef5f2be42006-09-05 21:05:31 +0000216 while (active) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000217 shared_ptr<ThreadManager::Task> task;
Marc Slemko66949872006-07-15 01:52:39 +0000218
Mark Sleef5f2be42006-09-05 21:05:31 +0000219 /**
220 * While holding manager monitor block for non-empty task queue (Also
221 * check that the thread hasn't been requested to stop). Once the queue
222 * is non-empty, dequeue a task, release monitor, and execute. If the
223 * worker max count has been decremented such that we exceed it, mark
224 * ourself inactive, decrement the worker count and notify the manager
225 * (technically we're notifying the next blocked thread but eventually
226 * the manager will see it.
227 */
228 {
Mark Slee2f6404d2006-10-10 01:37:40 +0000229 Synchronized s(manager_->monitor_);
David Reiss96d23882007-07-26 21:10:32 +0000230 active = isActive();
Mark Slee7c10eaf2007-03-01 02:45:10 +0000231
David Reiss96d23882007-07-26 21:10:32 +0000232 while (active && manager_->tasks_.empty()) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000233 manager_->idleCount_++;
Mark Slee7c10eaf2007-03-01 02:45:10 +0000234 idle_ = true;
Mark Slee2f6404d2006-10-10 01:37:40 +0000235 manager_->monitor_.wait();
Mark Sleef5f2be42006-09-05 21:05:31 +0000236 active = isActive();
Mark Slee7c10eaf2007-03-01 02:45:10 +0000237 idle_ = false;
Mark Slee2f6404d2006-10-10 01:37:40 +0000238 manager_->idleCount_--;
David Reiss96d23882007-07-26 21:10:32 +0000239 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000240
David Reiss96d23882007-07-26 21:10:32 +0000241 if (active) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000242 if (!manager_->tasks_.empty()) {
243 task = manager_->tasks_.front();
244 manager_->tasks_.pop();
245 if (task->state_ == ThreadManager::Task::WAITING) {
246 task->state_ = ThreadManager::Task::EXECUTING;
David Reiss96d23882007-07-26 21:10:32 +0000247 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000248
249 /* If we have a pending task max and we just dropped below it, wakeup any
250 thread that might be blocked on add. */
Mark Slee2782d6d2007-05-23 04:55:30 +0000251 if (manager_->pendingTaskCountMax_ != 0 &&
252 manager_->tasks_.size() == manager_->pendingTaskCountMax_ - 1) {
David Reisse3a64922008-06-10 22:55:04 +0000253 manager_->monitor_.notify();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000254 }
David Reiss96d23882007-07-26 21:10:32 +0000255 }
256 } else {
257 idle_ = true;
258 manager_->workerCount_--;
Mark Slee7c10eaf2007-03-01 02:45:10 +0000259 notifyManager = (manager_->workerCount_ == manager_->workerMaxCount_);
David Reiss96d23882007-07-26 21:10:32 +0000260 }
Marc Slemko66949872006-07-15 01:52:39 +0000261 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000262
Mark Sleef5f2be42006-09-05 21:05:31 +0000263 if (task != NULL) {
David Reiss96d23882007-07-26 21:10:32 +0000264 if (task->state_ == ThreadManager::Task::EXECUTING) {
265 try {
Mark Sleef5f2be42006-09-05 21:05:31 +0000266 task->run();
267 } catch(...) {
268 // XXX need to log this
David Reiss96d23882007-07-26 21:10:32 +0000269 }
270 }
Marc Slemko66949872006-07-15 01:52:39 +0000271 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000272 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000273
Mark Sleef5f2be42006-09-05 21:05:31 +0000274 {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000275 Synchronized s(manager_->workerMonitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000276 manager_->deadWorkers_.insert(this->thread());
Mark Sleef5f2be42006-09-05 21:05:31 +0000277 if (notifyManager) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000278 manager_->workerMonitor_.notify();
Marc Slemko66949872006-07-15 01:52:39 +0000279 }
280 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000281
Marc Slemko66949872006-07-15 01:52:39 +0000282 return;
283 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000284
Mark Sleef5f2be42006-09-05 21:05:31 +0000285 private:
Mark Slee2f6404d2006-10-10 01:37:40 +0000286 ThreadManager::Impl* manager_;
Mark Sleef5f2be42006-09-05 21:05:31 +0000287 friend class ThreadManager::Impl;
Mark Slee2f6404d2006-10-10 01:37:40 +0000288 STATE state_;
289 bool idle_;
Marc Slemko66949872006-07-15 01:52:39 +0000290};
291
Mark Sleef5f2be42006-09-05 21:05:31 +0000292
293 void ThreadManager::Impl::addWorker(size_t value) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000294 std::set<shared_ptr<Thread> > newThreads;
Mark Sleef5f2be42006-09-05 21:05:31 +0000295 for (size_t ix = 0; ix < value; ix++) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000296 class ThreadManager::Worker;
Marc Slemko6f038a72006-08-03 18:58:09 +0000297 shared_ptr<ThreadManager::Worker> worker = shared_ptr<ThreadManager::Worker>(new ThreadManager::Worker(this));
Mark Slee2f6404d2006-10-10 01:37:40 +0000298 newThreads.insert(threadFactory_->newThread(worker));
Marc Slemko66949872006-07-15 01:52:39 +0000299 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000300
Mark Sleef5f2be42006-09-05 21:05:31 +0000301 {
Mark Slee2f6404d2006-10-10 01:37:40 +0000302 Synchronized s(monitor_);
303 workerMaxCount_ += value;
304 workers_.insert(newThreads.begin(), newThreads.end());
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000305 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000306
Mark Sleef5f2be42006-09-05 21:05:31 +0000307 for (std::set<shared_ptr<Thread> >::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000308 shared_ptr<ThreadManager::Worker> worker = dynamic_pointer_cast<ThreadManager::Worker, Runnable>((*ix)->runnable());
Mark Slee2f6404d2006-10-10 01:37:40 +0000309 worker->state_ = ThreadManager::Worker::STARTING;
Marc Slemkod466b212006-07-20 00:04:18 +0000310 (*ix)->start();
Marc Slemkoa6479032007-06-05 22:20:14 +0000311 idMap_.insert(std::pair<const Thread::id_t, shared_ptr<Thread> >((*ix)->getId(), *ix));
Marc Slemkod466b212006-07-20 00:04:18 +0000312 }
313
Mark Sleef5f2be42006-09-05 21:05:31 +0000314 {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000315 Synchronized s(workerMonitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000316 while (workerCount_ != workerMaxCount_) {
317 workerMonitor_.wait();
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000318 }
319 }
320}
Marc Slemkod466b212006-07-20 00:04:18 +0000321
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000322void ThreadManager::Impl::start() {
323
Mark Slee2f6404d2006-10-10 01:37:40 +0000324 if (state_ == ThreadManager::STOPPED) {
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000325 return;
326 }
327
Mark Sleef5f2be42006-09-05 21:05:31 +0000328 {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000329 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000330 if (state_ == ThreadManager::UNINITIALIZED) {
331 if (threadFactory_ == NULL) {
Mark Sleef5f2be42006-09-05 21:05:31 +0000332 throw InvalidArgumentException();
333 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000334 state_ = ThreadManager::STARTED;
335 monitor_.notifyAll();
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000336 }
337
Mark Slee2f6404d2006-10-10 01:37:40 +0000338 while (state_ == STARTING) {
339 monitor_.wait();
Marc Slemkod466b212006-07-20 00:04:18 +0000340 }
341 }
342}
343
Mark Slee7c10eaf2007-03-01 02:45:10 +0000344void ThreadManager::Impl::stopImpl(bool join) {
Marc Slemkod466b212006-07-20 00:04:18 +0000345 bool doStop = false;
Mark Slee2f6404d2006-10-10 01:37:40 +0000346 if (state_ == ThreadManager::STOPPED) {
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000347 return;
348 }
349
Mark Sleef5f2be42006-09-05 21:05:31 +0000350 {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000351 Synchronized s(monitor_);
Mark Slee7c10eaf2007-03-01 02:45:10 +0000352 if (state_ != ThreadManager::STOPPING &&
353 state_ != ThreadManager::JOINING &&
354 state_ != ThreadManager::STOPPED) {
Marc Slemkod466b212006-07-20 00:04:18 +0000355 doStop = true;
Mark Slee7c10eaf2007-03-01 02:45:10 +0000356 state_ = join ? ThreadManager::JOINING : ThreadManager::STOPPING;
Marc Slemkod466b212006-07-20 00:04:18 +0000357 }
358 }
359
Mark Sleef5f2be42006-09-05 21:05:31 +0000360 if (doStop) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000361 removeWorker(workerCount_);
Marc Slemkod466b212006-07-20 00:04:18 +0000362 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000363
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000364 // XXX
Mark Sleef5f2be42006-09-05 21:05:31 +0000365 // should be able to block here for transition to STOPPED since we're no
366 // using shared_ptrs
Mark Slee7c10eaf2007-03-01 02:45:10 +0000367
368 {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000369 Synchronized s(monitor_);
Mark Slee7c10eaf2007-03-01 02:45:10 +0000370 state_ = ThreadManager::STOPPED;
371 }
372
Marc Slemkod466b212006-07-20 00:04:18 +0000373}
Mark Slee7c10eaf2007-03-01 02:45:10 +0000374
Marc Slemkod466b212006-07-20 00:04:18 +0000375void ThreadManager::Impl::removeWorker(size_t value) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000376 std::set<shared_ptr<Thread> > removedThreads;
Mark Sleef5f2be42006-09-05 21:05:31 +0000377 {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000378 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000379 if (value > workerMaxCount_) {
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000380 throw InvalidArgumentException();
381 }
382
Mark Slee2f6404d2006-10-10 01:37:40 +0000383 workerMaxCount_ -= value;
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000384
Mark Slee2f6404d2006-10-10 01:37:40 +0000385 if (idleCount_ < value) {
386 for (size_t ix = 0; ix < idleCount_; ix++) {
David Reiss96d23882007-07-26 21:10:32 +0000387 monitor_.notify();
Marc Slemko66949872006-07-15 01:52:39 +0000388 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000389 } else {
Mark Slee2f6404d2006-10-10 01:37:40 +0000390 monitor_.notifyAll();
Marc Slemko66949872006-07-15 01:52:39 +0000391 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000392 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000393
Mark Sleef5f2be42006-09-05 21:05:31 +0000394 {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000395 Synchronized s(workerMonitor_);
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000396
Mark Slee2f6404d2006-10-10 01:37:40 +0000397 while (workerCount_ != workerMaxCount_) {
398 workerMonitor_.wait();
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000399 }
400
Mark Slee2f6404d2006-10-10 01:37:40 +0000401 for (std::set<shared_ptr<Thread> >::iterator ix = deadWorkers_.begin(); ix != deadWorkers_.end(); ix++) {
402 workers_.erase(*ix);
Marc Slemkoa6479032007-06-05 22:20:14 +0000403 idMap_.erase((*ix)->getId());
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000404 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000405
Mark Slee2f6404d2006-10-10 01:37:40 +0000406 deadWorkers_.clear();
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000407 }
408}
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000409
410 bool ThreadManager::Impl::canSleep() {
Marc Slemkoa6479032007-06-05 22:20:14 +0000411 const Thread::id_t id = threadFactory_->getCurrentThreadId();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000412 return idMap_.find(id) == idMap_.end();
413 }
414
Mark Slee9b82d272007-05-23 05:16:07 +0000415 void ThreadManager::Impl::add(shared_ptr<Runnable> value, int64_t timeout) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000416 Synchronized s(monitor_);
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000417
Mark Slee2f6404d2006-10-10 01:37:40 +0000418 if (state_ != ThreadManager::STARTED) {
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000419 throw IllegalStateException();
420 }
Marc Slemkod466b212006-07-20 00:04:18 +0000421
Mark Slee2782d6d2007-05-23 04:55:30 +0000422 if (pendingTaskCountMax_ > 0 && (tasks_.size() >= pendingTaskCountMax_)) {
Aditya Agarwal4b6ff2d2007-12-25 22:58:50 +0000423 if (canSleep() && timeout >= 0) {
Mark Slee2782d6d2007-05-23 04:55:30 +0000424 while (pendingTaskCountMax_ > 0 && tasks_.size() >= pendingTaskCountMax_) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000425 monitor_.wait(timeout);
426 }
427 } else {
428 throw TooManyPendingTasksException();
429 }
430 }
431
Mark Slee2f6404d2006-10-10 01:37:40 +0000432 tasks_.push(shared_ptr<ThreadManager::Task>(new ThreadManager::Task(value)));
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000433
Mark Sleef5f2be42006-09-05 21:05:31 +0000434 // If idle thread is available notify it, otherwise all worker threads are
435 // running and will get around to this task in time.
Mark Slee2f6404d2006-10-10 01:37:40 +0000436 if (idleCount_ > 0) {
437 monitor_.notify();
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000438 }
Marc Slemko66949872006-07-15 01:52:39 +0000439 }
440
Marc Slemko6f038a72006-08-03 18:58:09 +0000441void ThreadManager::Impl::remove(shared_ptr<Runnable> task) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000442 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000443 if (state_ != ThreadManager::STARTED) {
Mark Sleef5f2be42006-09-05 21:05:31 +0000444 throw IllegalStateException();
445 }
Marc Slemko66949872006-07-15 01:52:39 +0000446}
447
Marc Slemkod466b212006-07-20 00:04:18 +0000448class SimpleThreadManager : public ThreadManager::Impl {
Marc Slemko66949872006-07-15 01:52:39 +0000449
Mark Slee2782d6d2007-05-23 04:55:30 +0000450 public:
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000451 SimpleThreadManager(size_t workerCount=4, size_t pendingTaskCountMax=0) :
Mark Slee2f6404d2006-10-10 01:37:40 +0000452 workerCount_(workerCount),
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000453 pendingTaskCountMax_(pendingTaskCountMax),
Mark Slee2f6404d2006-10-10 01:37:40 +0000454 firstTime_(true) {
Marc Slemkod466b212006-07-20 00:04:18 +0000455 }
Marc Slemko66949872006-07-15 01:52:39 +0000456
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000457 void start() {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000458 ThreadManager::Impl::pendingTaskCountMax(pendingTaskCountMax_);
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000459 ThreadManager::Impl::start();
Mark Slee2f6404d2006-10-10 01:37:40 +0000460 addWorker(workerCount_);
Marc Slemkod466b212006-07-20 00:04:18 +0000461 }
462
Mark Slee2782d6d2007-05-23 04:55:30 +0000463 private:
Mark Slee2f6404d2006-10-10 01:37:40 +0000464 const size_t workerCount_;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000465 const size_t pendingTaskCountMax_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000466 bool firstTime_;
467 Monitor monitor_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000468};
Marc Slemko66949872006-07-15 01:52:39 +0000469
Marc Slemko66949872006-07-15 01:52:39 +0000470
Marc Slemko6f038a72006-08-03 18:58:09 +0000471shared_ptr<ThreadManager> ThreadManager::newThreadManager() {
472 return shared_ptr<ThreadManager>(new ThreadManager::Impl());
Marc Slemkod466b212006-07-20 00:04:18 +0000473}
Marc Slemko66949872006-07-15 01:52:39 +0000474
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000475shared_ptr<ThreadManager> ThreadManager::newSimpleThreadManager(size_t count, size_t pendingTaskCountMax) {
476 return shared_ptr<ThreadManager>(new SimpleThreadManager(count, pendingTaskCountMax));
Marc Slemkod466b212006-07-20 00:04:18 +0000477}
Marc Slemko66949872006-07-15 01:52:39 +0000478
T Jake Lucianib5e62212009-01-31 22:36:20 +0000479}}} // apache::thrift::concurrency
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000480