blob: 604602e04aa12242839f24ea57973ec4d49f6b63 [file] [log] [blame]
Mark Slee9f0c6512007-02-28 23:58:26 +00001// Copyright (c) 2006- Facebook
2// Distributed under the Thrift Software License
3//
4// See accompanying file LICENSE or visit the Thrift site at:
5// http://developers.facebook.com/thrift/
6
Marc Slemko66949872006-07-15 01:52:39 +00007#include "ThreadManager.h"
Marc Slemkod466b212006-07-20 00:04:18 +00008#include "Exception.h"
Marc Slemko0e53ccd2006-07-17 23:51:05 +00009#include "Monitor.h"
Marc Slemko66949872006-07-15 01:52:39 +000010
Marc Slemko6f038a72006-08-03 18:58:09 +000011#include <boost/shared_ptr.hpp>
12
Marc Slemko66949872006-07-15 01:52:39 +000013#include <assert.h>
Marc Slemko0e53ccd2006-07-17 23:51:05 +000014#include <queue>
15#include <set>
Marc Slemko66949872006-07-15 01:52:39 +000016
Marc Slemko6f038a72006-08-03 18:58:09 +000017#if defined(DEBUG)
18#include <iostream>
19#endif //defined(DEBUG)
20
Marc Slemko3a3b53b2007-05-22 23:59:54 +000021namespace facebook { namespace thrift { namespace concurrency {
Marc Slemko66949872006-07-15 01:52:39 +000022
David Reissd4a269c2007-08-23 02:37:19 +000023using boost::shared_ptr;
24using boost::dynamic_pointer_cast;
Marc Slemko6f038a72006-08-03 18:58:09 +000025
Mark Sleef5f2be42006-09-05 21:05:31 +000026/**
27 * ThreadManager class
Marc Slemko3a3b53b2007-05-22 23:59:54 +000028 *
Mark Sleef5f2be42006-09-05 21:05:31 +000029 * This class manages a pool of threads. It uses a ThreadFactory to create
30 * threads. It never actually creates or destroys worker threads, rather
31 * it maintains statistics on number of idle threads, number of active threads,
32 * task backlog, and average wait and service times.
33 *
34 * @author marc
35 * @version $Id:$
36 */
Marc Slemko0e53ccd2006-07-17 23:51:05 +000037class ThreadManager::Impl : public ThreadManager {
38
39 public:
Marc Slemko3a3b53b2007-05-22 23:59:54 +000040 Impl() :
Mark Slee2f6404d2006-10-10 01:37:40 +000041 workerCount_(0),
42 workerMaxCount_(0),
43 idleCount_(0),
Marc Slemko3a3b53b2007-05-22 23:59:54 +000044 pendingTaskCountMax_(0),
Mark Slee2f6404d2006-10-10 01:37:40 +000045 state_(ThreadManager::UNINITIALIZED) {}
Marc Slemkod466b212006-07-20 00:04:18 +000046
Mark Sleef5f2be42006-09-05 21:05:31 +000047 ~Impl() { stop(); }
Marc Slemkod466b212006-07-20 00:04:18 +000048
Marc Slemkofe5ba12e2006-07-20 21:16:27 +000049 void start();
Marc Slemko0e53ccd2006-07-17 23:51:05 +000050
Mark Slee7c10eaf2007-03-01 02:45:10 +000051 void stop() { stopImpl(false); }
52
53 void join() { stopImpl(true); }
Marc Slemko0e53ccd2006-07-17 23:51:05 +000054
Mark Slee2f6404d2006-10-10 01:37:40 +000055 const ThreadManager::STATE state() const {
56 return state_;
57 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +000058
Marc Slemko6f038a72006-08-03 18:58:09 +000059 shared_ptr<ThreadFactory> threadFactory() const {
Marc Slemko3a3b53b2007-05-22 23:59:54 +000060 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +000061 return threadFactory_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000062 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +000063
64 void threadFactory(shared_ptr<ThreadFactory> value) {
Mark Slee2f6404d2006-10-10 01:37:40 +000065 Synchronized s(monitor_);
66 threadFactory_ = value;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000067 }
68
Marc Slemkod466b212006-07-20 00:04:18 +000069 void addWorker(size_t value);
Marc Slemko3a3b53b2007-05-22 23:59:54 +000070
Marc Slemkod466b212006-07-20 00:04:18 +000071 void removeWorker(size_t value);
Marc Slemko3a3b53b2007-05-22 23:59:54 +000072
Mark Slee2f6404d2006-10-10 01:37:40 +000073 size_t idleWorkerCount() const {
74 return idleCount_;
75 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +000076
77 size_t workerCount() const {
Marc Slemko3a3b53b2007-05-22 23:59:54 +000078 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +000079 return workerCount_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000080 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +000081
Marc Slemko0e53ccd2006-07-17 23:51:05 +000082 size_t pendingTaskCount() const {
Marc Slemko3a3b53b2007-05-22 23:59:54 +000083 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +000084 return tasks_.size();
Marc Slemko0e53ccd2006-07-17 23:51:05 +000085 }
86
87 size_t totalTaskCount() const {
Marc Slemko3a3b53b2007-05-22 23:59:54 +000088 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +000089 return tasks_.size() + workerCount_ - idleCount_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000090 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +000091
92 size_t pendingTaskCountMax() const {
93 Synchronized s(monitor_);
94 return pendingTaskCountMax_;
95 }
96
97 void pendingTaskCountMax(const size_t value) {
98 Synchronized s(monitor_);
99 pendingTaskCountMax_ = value;
100 }
101
102 bool canSleep();
103
Mark Slee9b82d272007-05-23 05:16:07 +0000104 void add(shared_ptr<Runnable> value, int64_t timeout);
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000105
Marc Slemko6f038a72006-08-03 18:58:09 +0000106 void remove(shared_ptr<Runnable> task);
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000107
108private:
Mark Slee7c10eaf2007-03-01 02:45:10 +0000109 void stopImpl(bool join);
110
Mark Slee2f6404d2006-10-10 01:37:40 +0000111 size_t workerCount_;
112 size_t workerMaxCount_;
113 size_t idleCount_;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000114 size_t pendingTaskCountMax_;
115
Mark Slee2f6404d2006-10-10 01:37:40 +0000116 ThreadManager::STATE state_;
117 shared_ptr<ThreadFactory> threadFactory_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000118
Mark Sleef5f2be42006-09-05 21:05:31 +0000119
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000120 friend class ThreadManager::Task;
Mark Slee2f6404d2006-10-10 01:37:40 +0000121 std::queue<shared_ptr<Task> > tasks_;
122 Monitor monitor_;
123 Monitor workerMonitor_;
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000124
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000125 friend class ThreadManager::Worker;
Mark Slee2f6404d2006-10-10 01:37:40 +0000126 std::set<shared_ptr<Thread> > workers_;
127 std::set<shared_ptr<Thread> > deadWorkers_;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000128 std::map<const Thread::id_t, shared_ptr<Thread> > idMap_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000129};
Marc Slemko66949872006-07-15 01:52:39 +0000130
131class ThreadManager::Task : public Runnable {
132
Mark Sleef5f2be42006-09-05 21:05:31 +0000133 public:
Marc Slemko66949872006-07-15 01:52:39 +0000134 enum STATE {
135 WAITING,
136 EXECUTING,
137 CANCELLED,
138 COMPLETE
139 };
140
Marc Slemko6f038a72006-08-03 18:58:09 +0000141 Task(shared_ptr<Runnable> runnable) :
Mark Slee2f6404d2006-10-10 01:37:40 +0000142 runnable_(runnable),
143 state_(WAITING) {}
Marc Slemko66949872006-07-15 01:52:39 +0000144
Mark Sleef5f2be42006-09-05 21:05:31 +0000145 ~Task() {}
Marc Slemko66949872006-07-15 01:52:39 +0000146
147 void run() {
Mark Slee2f6404d2006-10-10 01:37:40 +0000148 if (state_ == EXECUTING) {
149 runnable_->run();
150 state_ = COMPLETE;
Marc Slemko66949872006-07-15 01:52:39 +0000151 }
152 }
153
154 private:
Mark Slee2f6404d2006-10-10 01:37:40 +0000155 shared_ptr<Runnable> runnable_;
Marc Slemkod466b212006-07-20 00:04:18 +0000156 friend class ThreadManager::Worker;
Mark Slee2f6404d2006-10-10 01:37:40 +0000157 STATE state_;
Marc Slemko66949872006-07-15 01:52:39 +0000158};
159
160class ThreadManager::Worker: public Runnable {
Marc Slemko66949872006-07-15 01:52:39 +0000161 enum STATE {
162 UNINITIALIZED,
163 STARTING,
164 STARTED,
165 STOPPING,
166 STOPPED
167 };
168
169 public:
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000170 Worker(ThreadManager::Impl* manager) :
Mark Slee2f6404d2006-10-10 01:37:40 +0000171 manager_(manager),
172 state_(UNINITIALIZED),
173 idle_(false) {}
Marc Slemko66949872006-07-15 01:52:39 +0000174
175 ~Worker() {}
176
Mark Slee7c10eaf2007-03-01 02:45:10 +0000177 private:
Mark Slee2f6404d2006-10-10 01:37:40 +0000178 bool isActive() const {
Mark Slee7c10eaf2007-03-01 02:45:10 +0000179 return
180 (manager_->workerCount_ <= manager_->workerMaxCount_) ||
181 (manager_->state_ == JOINING && !manager_->tasks_.empty());
Mark Slee2f6404d2006-10-10 01:37:40 +0000182 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000183
Mark Slee7c10eaf2007-03-01 02:45:10 +0000184 public:
Mark Sleef5f2be42006-09-05 21:05:31 +0000185 /**
186 * Worker entry point
187 *
188 * As long as worker thread is running, pull tasks off the task queue and
189 * execute.
190 */
Marc Slemko66949872006-07-15 01:52:39 +0000191 void run() {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000192 bool active = false;
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000193 bool notifyManager = false;
194
Mark Sleef5f2be42006-09-05 21:05:31 +0000195 /**
196 * Increment worker semaphore and notify manager if worker count reached
197 * desired max
198 *
199 * Note: We have to release the monitor and acquire the workerMonitor
200 * since that is what the manager blocks on for worker add/remove
201 */
202 {
Mark Slee2f6404d2006-10-10 01:37:40 +0000203 Synchronized s(manager_->monitor_);
204 active = manager_->workerCount_ < manager_->workerMaxCount_;
Mark Sleef5f2be42006-09-05 21:05:31 +0000205 if (active) {
David Reiss96d23882007-07-26 21:10:32 +0000206 manager_->workerCount_++;
207 notifyManager = manager_->workerCount_ == manager_->workerMaxCount_;
Marc Slemko66949872006-07-15 01:52:39 +0000208 }
209 }
210
Mark Sleef5f2be42006-09-05 21:05:31 +0000211 if (notifyManager) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000212 Synchronized s(manager_->workerMonitor_);
213 manager_->workerMonitor_.notify();
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000214 notifyManager = false;
215 }
216
Mark Sleef5f2be42006-09-05 21:05:31 +0000217 while (active) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000218 shared_ptr<ThreadManager::Task> task;
Marc Slemko66949872006-07-15 01:52:39 +0000219
Mark Sleef5f2be42006-09-05 21:05:31 +0000220 /**
221 * While holding manager monitor block for non-empty task queue (Also
222 * check that the thread hasn't been requested to stop). Once the queue
223 * is non-empty, dequeue a task, release monitor, and execute. If the
224 * worker max count has been decremented such that we exceed it, mark
225 * ourself inactive, decrement the worker count and notify the manager
226 * (technically we're notifying the next blocked thread but eventually
227 * the manager will see it.
228 */
229 {
Mark Slee2f6404d2006-10-10 01:37:40 +0000230 Synchronized s(manager_->monitor_);
David Reiss96d23882007-07-26 21:10:32 +0000231 active = isActive();
Mark Slee7c10eaf2007-03-01 02:45:10 +0000232
David Reiss96d23882007-07-26 21:10:32 +0000233 while (active && manager_->tasks_.empty()) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000234 manager_->idleCount_++;
Mark Slee7c10eaf2007-03-01 02:45:10 +0000235 idle_ = true;
Mark Slee2f6404d2006-10-10 01:37:40 +0000236 manager_->monitor_.wait();
Mark Sleef5f2be42006-09-05 21:05:31 +0000237 active = isActive();
Mark Slee7c10eaf2007-03-01 02:45:10 +0000238 idle_ = false;
Mark Slee2f6404d2006-10-10 01:37:40 +0000239 manager_->idleCount_--;
David Reiss96d23882007-07-26 21:10:32 +0000240 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000241
David Reiss96d23882007-07-26 21:10:32 +0000242 if (active) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000243 if (!manager_->tasks_.empty()) {
244 task = manager_->tasks_.front();
245 manager_->tasks_.pop();
246 if (task->state_ == ThreadManager::Task::WAITING) {
247 task->state_ = ThreadManager::Task::EXECUTING;
David Reiss96d23882007-07-26 21:10:32 +0000248 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000249
250 /* If we have a pending task max and we just dropped below it, wakeup any
251 thread that might be blocked on add. */
Mark Slee2782d6d2007-05-23 04:55:30 +0000252 if (manager_->pendingTaskCountMax_ != 0 &&
253 manager_->tasks_.size() == manager_->pendingTaskCountMax_ - 1) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000254 manager_->workerMonitor_.notify();
255 }
David Reiss96d23882007-07-26 21:10:32 +0000256 }
257 } else {
258 idle_ = true;
259 manager_->workerCount_--;
Mark Slee7c10eaf2007-03-01 02:45:10 +0000260 notifyManager = (manager_->workerCount_ == manager_->workerMaxCount_);
David Reiss96d23882007-07-26 21:10:32 +0000261 }
Marc Slemko66949872006-07-15 01:52:39 +0000262 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000263
Mark Sleef5f2be42006-09-05 21:05:31 +0000264 if (task != NULL) {
David Reiss96d23882007-07-26 21:10:32 +0000265 if (task->state_ == ThreadManager::Task::EXECUTING) {
266 try {
Mark Sleef5f2be42006-09-05 21:05:31 +0000267 task->run();
268 } catch(...) {
269 // XXX need to log this
David Reiss96d23882007-07-26 21:10:32 +0000270 }
271 }
Marc Slemko66949872006-07-15 01:52:39 +0000272 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000273 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000274
Mark Sleef5f2be42006-09-05 21:05:31 +0000275 {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000276 Synchronized s(manager_->workerMonitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000277 manager_->deadWorkers_.insert(this->thread());
Mark Sleef5f2be42006-09-05 21:05:31 +0000278 if (notifyManager) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000279 manager_->workerMonitor_.notify();
Marc Slemko66949872006-07-15 01:52:39 +0000280 }
281 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000282
Marc Slemko66949872006-07-15 01:52:39 +0000283 return;
284 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000285
Mark Sleef5f2be42006-09-05 21:05:31 +0000286 private:
Mark Slee2f6404d2006-10-10 01:37:40 +0000287 ThreadManager::Impl* manager_;
Mark Sleef5f2be42006-09-05 21:05:31 +0000288 friend class ThreadManager::Impl;
Mark Slee2f6404d2006-10-10 01:37:40 +0000289 STATE state_;
290 bool idle_;
Marc Slemko66949872006-07-15 01:52:39 +0000291};
292
Mark Sleef5f2be42006-09-05 21:05:31 +0000293
294 void ThreadManager::Impl::addWorker(size_t value) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000295 std::set<shared_ptr<Thread> > newThreads;
Mark Sleef5f2be42006-09-05 21:05:31 +0000296 for (size_t ix = 0; ix < value; ix++) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000297 class ThreadManager::Worker;
Marc Slemko6f038a72006-08-03 18:58:09 +0000298 shared_ptr<ThreadManager::Worker> worker = shared_ptr<ThreadManager::Worker>(new ThreadManager::Worker(this));
Mark Slee2f6404d2006-10-10 01:37:40 +0000299 newThreads.insert(threadFactory_->newThread(worker));
Marc Slemko66949872006-07-15 01:52:39 +0000300 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000301
Mark Sleef5f2be42006-09-05 21:05:31 +0000302 {
Mark Slee2f6404d2006-10-10 01:37:40 +0000303 Synchronized s(monitor_);
304 workerMaxCount_ += value;
305 workers_.insert(newThreads.begin(), newThreads.end());
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000306 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000307
Mark Sleef5f2be42006-09-05 21:05:31 +0000308 for (std::set<shared_ptr<Thread> >::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000309 shared_ptr<ThreadManager::Worker> worker = dynamic_pointer_cast<ThreadManager::Worker, Runnable>((*ix)->runnable());
Mark Slee2f6404d2006-10-10 01:37:40 +0000310 worker->state_ = ThreadManager::Worker::STARTING;
Marc Slemkod466b212006-07-20 00:04:18 +0000311 (*ix)->start();
Marc Slemkoa6479032007-06-05 22:20:14 +0000312 idMap_.insert(std::pair<const Thread::id_t, shared_ptr<Thread> >((*ix)->getId(), *ix));
Marc Slemkod466b212006-07-20 00:04:18 +0000313 }
314
Mark Sleef5f2be42006-09-05 21:05:31 +0000315 {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000316 Synchronized s(workerMonitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000317 while (workerCount_ != workerMaxCount_) {
318 workerMonitor_.wait();
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000319 }
320 }
321}
Marc Slemkod466b212006-07-20 00:04:18 +0000322
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000323void ThreadManager::Impl::start() {
324
Mark Slee2f6404d2006-10-10 01:37:40 +0000325 if (state_ == ThreadManager::STOPPED) {
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000326 return;
327 }
328
Mark Sleef5f2be42006-09-05 21:05:31 +0000329 {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000330 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000331 if (state_ == ThreadManager::UNINITIALIZED) {
332 if (threadFactory_ == NULL) {
Mark Sleef5f2be42006-09-05 21:05:31 +0000333 throw InvalidArgumentException();
334 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000335 state_ = ThreadManager::STARTED;
336 monitor_.notifyAll();
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000337 }
338
Mark Slee2f6404d2006-10-10 01:37:40 +0000339 while (state_ == STARTING) {
340 monitor_.wait();
Marc Slemkod466b212006-07-20 00:04:18 +0000341 }
342 }
343}
344
Mark Slee7c10eaf2007-03-01 02:45:10 +0000345void ThreadManager::Impl::stopImpl(bool join) {
Marc Slemkod466b212006-07-20 00:04:18 +0000346 bool doStop = false;
Mark Slee2f6404d2006-10-10 01:37:40 +0000347 if (state_ == ThreadManager::STOPPED) {
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000348 return;
349 }
350
Mark Sleef5f2be42006-09-05 21:05:31 +0000351 {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000352 Synchronized s(monitor_);
Mark Slee7c10eaf2007-03-01 02:45:10 +0000353 if (state_ != ThreadManager::STOPPING &&
354 state_ != ThreadManager::JOINING &&
355 state_ != ThreadManager::STOPPED) {
Marc Slemkod466b212006-07-20 00:04:18 +0000356 doStop = true;
Mark Slee7c10eaf2007-03-01 02:45:10 +0000357 state_ = join ? ThreadManager::JOINING : ThreadManager::STOPPING;
Marc Slemkod466b212006-07-20 00:04:18 +0000358 }
359 }
360
Mark Sleef5f2be42006-09-05 21:05:31 +0000361 if (doStop) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000362 removeWorker(workerCount_);
Marc Slemkod466b212006-07-20 00:04:18 +0000363 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000364
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000365 // XXX
Mark Sleef5f2be42006-09-05 21:05:31 +0000366 // should be able to block here for transition to STOPPED since we're no
367 // using shared_ptrs
Mark Slee7c10eaf2007-03-01 02:45:10 +0000368
369 {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000370 Synchronized s(monitor_);
Mark Slee7c10eaf2007-03-01 02:45:10 +0000371 state_ = ThreadManager::STOPPED;
372 }
373
Marc Slemkod466b212006-07-20 00:04:18 +0000374}
Mark Slee7c10eaf2007-03-01 02:45:10 +0000375
Marc Slemkod466b212006-07-20 00:04:18 +0000376void ThreadManager::Impl::removeWorker(size_t value) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000377 std::set<shared_ptr<Thread> > removedThreads;
Mark Sleef5f2be42006-09-05 21:05:31 +0000378 {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000379 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000380 if (value > workerMaxCount_) {
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000381 throw InvalidArgumentException();
382 }
383
Mark Slee2f6404d2006-10-10 01:37:40 +0000384 workerMaxCount_ -= value;
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000385
Mark Slee2f6404d2006-10-10 01:37:40 +0000386 if (idleCount_ < value) {
387 for (size_t ix = 0; ix < idleCount_; ix++) {
David Reiss96d23882007-07-26 21:10:32 +0000388 monitor_.notify();
Marc Slemko66949872006-07-15 01:52:39 +0000389 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000390 } else {
Mark Slee2f6404d2006-10-10 01:37:40 +0000391 monitor_.notifyAll();
Marc Slemko66949872006-07-15 01:52:39 +0000392 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000393 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000394
Mark Sleef5f2be42006-09-05 21:05:31 +0000395 {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000396 Synchronized s(workerMonitor_);
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000397
Mark Slee2f6404d2006-10-10 01:37:40 +0000398 while (workerCount_ != workerMaxCount_) {
399 workerMonitor_.wait();
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000400 }
401
Mark Slee2f6404d2006-10-10 01:37:40 +0000402 for (std::set<shared_ptr<Thread> >::iterator ix = deadWorkers_.begin(); ix != deadWorkers_.end(); ix++) {
403 workers_.erase(*ix);
Marc Slemkoa6479032007-06-05 22:20:14 +0000404 idMap_.erase((*ix)->getId());
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000405 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000406
Mark Slee2f6404d2006-10-10 01:37:40 +0000407 deadWorkers_.clear();
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000408 }
409}
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000410
411 bool ThreadManager::Impl::canSleep() {
Marc Slemkoa6479032007-06-05 22:20:14 +0000412 const Thread::id_t id = threadFactory_->getCurrentThreadId();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000413 return idMap_.find(id) == idMap_.end();
414 }
415
Mark Slee9b82d272007-05-23 05:16:07 +0000416 void ThreadManager::Impl::add(shared_ptr<Runnable> value, int64_t timeout) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000417 Synchronized s(monitor_);
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000418
Mark Slee2f6404d2006-10-10 01:37:40 +0000419 if (state_ != ThreadManager::STARTED) {
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000420 throw IllegalStateException();
421 }
Marc Slemkod466b212006-07-20 00:04:18 +0000422
Mark Slee2782d6d2007-05-23 04:55:30 +0000423 if (pendingTaskCountMax_ > 0 && (tasks_.size() >= pendingTaskCountMax_)) {
Aditya Agarwal4b6ff2d2007-12-25 22:58:50 +0000424 if (canSleep() && timeout >= 0) {
Mark Slee2782d6d2007-05-23 04:55:30 +0000425 while (pendingTaskCountMax_ > 0 && tasks_.size() >= pendingTaskCountMax_) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000426 monitor_.wait(timeout);
427 }
428 } else {
429 throw TooManyPendingTasksException();
430 }
431 }
432
Mark Slee2f6404d2006-10-10 01:37:40 +0000433 tasks_.push(shared_ptr<ThreadManager::Task>(new ThreadManager::Task(value)));
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000434
Mark Sleef5f2be42006-09-05 21:05:31 +0000435 // If idle thread is available notify it, otherwise all worker threads are
436 // running and will get around to this task in time.
Mark Slee2f6404d2006-10-10 01:37:40 +0000437 if (idleCount_ > 0) {
438 monitor_.notify();
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000439 }
Marc Slemko66949872006-07-15 01:52:39 +0000440 }
441
Marc Slemko6f038a72006-08-03 18:58:09 +0000442void ThreadManager::Impl::remove(shared_ptr<Runnable> task) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000443 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000444 if (state_ != ThreadManager::STARTED) {
Mark Sleef5f2be42006-09-05 21:05:31 +0000445 throw IllegalStateException();
446 }
Marc Slemko66949872006-07-15 01:52:39 +0000447}
448
Marc Slemkod466b212006-07-20 00:04:18 +0000449class SimpleThreadManager : public ThreadManager::Impl {
Marc Slemko66949872006-07-15 01:52:39 +0000450
Mark Slee2782d6d2007-05-23 04:55:30 +0000451 public:
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000452 SimpleThreadManager(size_t workerCount=4, size_t pendingTaskCountMax=0) :
Mark Slee2f6404d2006-10-10 01:37:40 +0000453 workerCount_(workerCount),
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000454 pendingTaskCountMax_(pendingTaskCountMax),
Mark Slee2f6404d2006-10-10 01:37:40 +0000455 firstTime_(true) {
Marc Slemkod466b212006-07-20 00:04:18 +0000456 }
Marc Slemko66949872006-07-15 01:52:39 +0000457
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000458 void start() {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000459 ThreadManager::Impl::pendingTaskCountMax(pendingTaskCountMax_);
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000460 ThreadManager::Impl::start();
Mark Slee2f6404d2006-10-10 01:37:40 +0000461 addWorker(workerCount_);
Marc Slemkod466b212006-07-20 00:04:18 +0000462 }
463
Mark Slee2782d6d2007-05-23 04:55:30 +0000464 private:
Mark Slee2f6404d2006-10-10 01:37:40 +0000465 const size_t workerCount_;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000466 const size_t pendingTaskCountMax_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000467 bool firstTime_;
468 Monitor monitor_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000469};
Marc Slemko66949872006-07-15 01:52:39 +0000470
Marc Slemko66949872006-07-15 01:52:39 +0000471
Marc Slemko6f038a72006-08-03 18:58:09 +0000472shared_ptr<ThreadManager> ThreadManager::newThreadManager() {
473 return shared_ptr<ThreadManager>(new ThreadManager::Impl());
Marc Slemkod466b212006-07-20 00:04:18 +0000474}
Marc Slemko66949872006-07-15 01:52:39 +0000475
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000476shared_ptr<ThreadManager> ThreadManager::newSimpleThreadManager(size_t count, size_t pendingTaskCountMax) {
477 return shared_ptr<ThreadManager>(new SimpleThreadManager(count, pendingTaskCountMax));
Marc Slemkod466b212006-07-20 00:04:18 +0000478}
Marc Slemko66949872006-07-15 01:52:39 +0000479
480}}} // facebook::thrift::concurrency
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000481