blob: 1631541de454d5c642c1b9f8cd61892f2e3dff5f [file] [log] [blame]
Mark Slee9f0c6512007-02-28 23:58:26 +00001// Copyright (c) 2006- Facebook
2// Distributed under the Thrift Software License
3//
4// See accompanying file LICENSE or visit the Thrift site at:
5// http://developers.facebook.com/thrift/
6
Marc Slemko66949872006-07-15 01:52:39 +00007#include "ThreadManager.h"
Marc Slemkod466b212006-07-20 00:04:18 +00008#include "Exception.h"
Marc Slemko0e53ccd2006-07-17 23:51:05 +00009#include "Monitor.h"
Marc Slemko66949872006-07-15 01:52:39 +000010
Marc Slemko6f038a72006-08-03 18:58:09 +000011#include <boost/shared_ptr.hpp>
12
Marc Slemko66949872006-07-15 01:52:39 +000013#include <assert.h>
Marc Slemko0e53ccd2006-07-17 23:51:05 +000014#include <queue>
15#include <set>
Marc Slemko66949872006-07-15 01:52:39 +000016
Marc Slemko6f038a72006-08-03 18:58:09 +000017#if defined(DEBUG)
18#include <iostream>
19#endif //defined(DEBUG)
20
Marc Slemko66949872006-07-15 01:52:39 +000021namespace facebook { namespace thrift { namespace concurrency {
22
Marc Slemko6f038a72006-08-03 18:58:09 +000023using namespace boost;
24
Marc Slemkofe5ba12e2006-07-20 21:16:27 +000025
Mark Sleef5f2be42006-09-05 21:05:31 +000026/**
27 * ThreadManager class
28 *
29 * This class manages a pool of threads. It uses a ThreadFactory to create
30 * threads. It never actually creates or destroys worker threads, rather
31 * it maintains statistics on number of idle threads, number of active threads,
32 * task backlog, and average wait and service times.
33 *
34 * @author marc
35 * @version $Id:$
36 */
Marc Slemko0e53ccd2006-07-17 23:51:05 +000037class ThreadManager::Impl : public ThreadManager {
38
39 public:
Marc Slemko6f038a72006-08-03 18:58:09 +000040 Impl() :
Mark Slee2f6404d2006-10-10 01:37:40 +000041 workerCount_(0),
42 workerMaxCount_(0),
43 idleCount_(0),
44 state_(ThreadManager::UNINITIALIZED) {}
Marc Slemkod466b212006-07-20 00:04:18 +000045
Mark Sleef5f2be42006-09-05 21:05:31 +000046 ~Impl() { stop(); }
Marc Slemkod466b212006-07-20 00:04:18 +000047
Marc Slemkofe5ba12e2006-07-20 21:16:27 +000048 void start();
Marc Slemko0e53ccd2006-07-17 23:51:05 +000049
Mark Slee7c10eaf2007-03-01 02:45:10 +000050 void stop() { stopImpl(false); }
51
52 void join() { stopImpl(true); }
Marc Slemko0e53ccd2006-07-17 23:51:05 +000053
Mark Slee2f6404d2006-10-10 01:37:40 +000054 const ThreadManager::STATE state() const {
55 return state_;
56 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +000057
Marc Slemko6f038a72006-08-03 18:58:09 +000058 shared_ptr<ThreadFactory> threadFactory() const {
Mark Slee2f6404d2006-10-10 01:37:40 +000059 Synchronized s(monitor_);
60 return threadFactory_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000061 }
62
Mark Sleef5f2be42006-09-05 21:05:31 +000063 void threadFactory(shared_ptr<ThreadFactory> value) {
Mark Slee2f6404d2006-10-10 01:37:40 +000064 Synchronized s(monitor_);
65 threadFactory_ = value;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000066 }
67
Marc Slemkod466b212006-07-20 00:04:18 +000068 void addWorker(size_t value);
Marc Slemko0e53ccd2006-07-17 23:51:05 +000069
Marc Slemkod466b212006-07-20 00:04:18 +000070 void removeWorker(size_t value);
Marc Slemko0e53ccd2006-07-17 23:51:05 +000071
Mark Slee2f6404d2006-10-10 01:37:40 +000072 size_t idleWorkerCount() const {
73 return idleCount_;
74 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +000075
76 size_t workerCount() const {
Mark Slee2f6404d2006-10-10 01:37:40 +000077 Synchronized s(monitor_);
78 return workerCount_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000079 }
80
81 size_t pendingTaskCount() const {
Mark Slee2f6404d2006-10-10 01:37:40 +000082 Synchronized s(monitor_);
83 return tasks_.size();
Marc Slemko0e53ccd2006-07-17 23:51:05 +000084 }
85
86 size_t totalTaskCount() const {
Mark Slee2f6404d2006-10-10 01:37:40 +000087 Synchronized s(monitor_);
88 return tasks_.size() + workerCount_ - idleCount_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000089 }
90
Marc Slemko6f038a72006-08-03 18:58:09 +000091 void add(shared_ptr<Runnable> value);
Marc Slemko0e53ccd2006-07-17 23:51:05 +000092
Marc Slemko6f038a72006-08-03 18:58:09 +000093 void remove(shared_ptr<Runnable> task);
Marc Slemko0e53ccd2006-07-17 23:51:05 +000094
95private:
Mark Slee7c10eaf2007-03-01 02:45:10 +000096 void stopImpl(bool join);
97
Mark Slee2f6404d2006-10-10 01:37:40 +000098 size_t workerCount_;
99 size_t workerMaxCount_;
100 size_t idleCount_;
101 ThreadManager::STATE state_;
102 shared_ptr<ThreadFactory> threadFactory_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000103
Mark Sleef5f2be42006-09-05 21:05:31 +0000104
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000105 friend class ThreadManager::Task;
Mark Slee2f6404d2006-10-10 01:37:40 +0000106 std::queue<shared_ptr<Task> > tasks_;
107 Monitor monitor_;
108 Monitor workerMonitor_;
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000109
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000110 friend class ThreadManager::Worker;
Mark Slee2f6404d2006-10-10 01:37:40 +0000111 std::set<shared_ptr<Thread> > workers_;
112 std::set<shared_ptr<Thread> > deadWorkers_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000113};
Marc Slemko66949872006-07-15 01:52:39 +0000114
115class ThreadManager::Task : public Runnable {
116
Mark Sleef5f2be42006-09-05 21:05:31 +0000117 public:
Marc Slemko66949872006-07-15 01:52:39 +0000118 enum STATE {
119 WAITING,
120 EXECUTING,
121 CANCELLED,
122 COMPLETE
123 };
124
Marc Slemko6f038a72006-08-03 18:58:09 +0000125 Task(shared_ptr<Runnable> runnable) :
Mark Slee2f6404d2006-10-10 01:37:40 +0000126 runnable_(runnable),
127 state_(WAITING) {}
Marc Slemko66949872006-07-15 01:52:39 +0000128
Mark Sleef5f2be42006-09-05 21:05:31 +0000129 ~Task() {}
Marc Slemko66949872006-07-15 01:52:39 +0000130
131 void run() {
Mark Slee2f6404d2006-10-10 01:37:40 +0000132 if (state_ == EXECUTING) {
133 runnable_->run();
134 state_ = COMPLETE;
Marc Slemko66949872006-07-15 01:52:39 +0000135 }
136 }
137
138 private:
Mark Slee2f6404d2006-10-10 01:37:40 +0000139 shared_ptr<Runnable> runnable_;
Marc Slemkod466b212006-07-20 00:04:18 +0000140 friend class ThreadManager::Worker;
Mark Slee2f6404d2006-10-10 01:37:40 +0000141 STATE state_;
Marc Slemko66949872006-07-15 01:52:39 +0000142};
143
144class ThreadManager::Worker: public Runnable {
Marc Slemko66949872006-07-15 01:52:39 +0000145 enum STATE {
146 UNINITIALIZED,
147 STARTING,
148 STARTED,
149 STOPPING,
150 STOPPED
151 };
152
153 public:
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000154 Worker(ThreadManager::Impl* manager) :
Mark Slee2f6404d2006-10-10 01:37:40 +0000155 manager_(manager),
156 state_(UNINITIALIZED),
157 idle_(false) {}
Marc Slemko66949872006-07-15 01:52:39 +0000158
159 ~Worker() {}
160
Mark Slee7c10eaf2007-03-01 02:45:10 +0000161 private:
Mark Slee2f6404d2006-10-10 01:37:40 +0000162 bool isActive() const {
Mark Slee7c10eaf2007-03-01 02:45:10 +0000163 return
164 (manager_->workerCount_ <= manager_->workerMaxCount_) ||
165 (manager_->state_ == JOINING && !manager_->tasks_.empty());
Mark Slee2f6404d2006-10-10 01:37:40 +0000166 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000167
Mark Slee7c10eaf2007-03-01 02:45:10 +0000168 public:
Mark Sleef5f2be42006-09-05 21:05:31 +0000169 /**
170 * Worker entry point
171 *
172 * As long as worker thread is running, pull tasks off the task queue and
173 * execute.
174 */
Marc Slemko66949872006-07-15 01:52:39 +0000175 void run() {
Mark Sleef5f2be42006-09-05 21:05:31 +0000176 bool active = false;
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000177 bool notifyManager = false;
178
Mark Sleef5f2be42006-09-05 21:05:31 +0000179 /**
180 * Increment worker semaphore and notify manager if worker count reached
181 * desired max
182 *
183 * Note: We have to release the monitor and acquire the workerMonitor
184 * since that is what the manager blocks on for worker add/remove
185 */
186 {
Mark Slee2f6404d2006-10-10 01:37:40 +0000187 Synchronized s(manager_->monitor_);
188 active = manager_->workerCount_ < manager_->workerMaxCount_;
Mark Sleef5f2be42006-09-05 21:05:31 +0000189 if (active) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000190 manager_->workerCount_++;
191 notifyManager = manager_->workerCount_ == manager_->workerMaxCount_;
Marc Slemko66949872006-07-15 01:52:39 +0000192 }
193 }
194
Mark Sleef5f2be42006-09-05 21:05:31 +0000195 if (notifyManager) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000196 Synchronized s(manager_->workerMonitor_);
197 manager_->workerMonitor_.notify();
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000198 notifyManager = false;
199 }
200
Mark Sleef5f2be42006-09-05 21:05:31 +0000201 while (active) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000202 shared_ptr<ThreadManager::Task> task;
Marc Slemko66949872006-07-15 01:52:39 +0000203
Mark Sleef5f2be42006-09-05 21:05:31 +0000204 /**
205 * While holding manager monitor block for non-empty task queue (Also
206 * check that the thread hasn't been requested to stop). Once the queue
207 * is non-empty, dequeue a task, release monitor, and execute. If the
208 * worker max count has been decremented such that we exceed it, mark
209 * ourself inactive, decrement the worker count and notify the manager
210 * (technically we're notifying the next blocked thread but eventually
211 * the manager will see it.
212 */
213 {
Mark Slee2f6404d2006-10-10 01:37:40 +0000214 Synchronized s(manager_->monitor_);
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000215 active = isActive();
Mark Slee7c10eaf2007-03-01 02:45:10 +0000216
Mark Slee2f6404d2006-10-10 01:37:40 +0000217 while (active && manager_->tasks_.empty()) {
218 manager_->idleCount_++;
Mark Slee7c10eaf2007-03-01 02:45:10 +0000219 idle_ = true;
Mark Slee2f6404d2006-10-10 01:37:40 +0000220 manager_->monitor_.wait();
Mark Sleef5f2be42006-09-05 21:05:31 +0000221 active = isActive();
Mark Slee7c10eaf2007-03-01 02:45:10 +0000222 idle_ = false;
Mark Slee2f6404d2006-10-10 01:37:40 +0000223 manager_->idleCount_--;
Marc Slemko66949872006-07-15 01:52:39 +0000224 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000225
Mark Sleef5f2be42006-09-05 21:05:31 +0000226 if (active) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000227 if (!manager_->tasks_.empty()) {
228 task = manager_->tasks_.front();
229 manager_->tasks_.pop();
230 if (task->state_ == ThreadManager::Task::WAITING) {
231 task->state_ = ThreadManager::Task::EXECUTING;
Marc Slemkod466b212006-07-20 00:04:18 +0000232 }
233 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000234 } else {
Mark Slee7c10eaf2007-03-01 02:45:10 +0000235 idle_ = true;
Mark Slee2f6404d2006-10-10 01:37:40 +0000236 manager_->workerCount_--;
Mark Slee7c10eaf2007-03-01 02:45:10 +0000237 notifyManager = (manager_->workerCount_ == manager_->workerMaxCount_);
Marc Slemko66949872006-07-15 01:52:39 +0000238 }
239 }
Mark Sleef5f2be42006-09-05 21:05:31 +0000240
241 if (task != NULL) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000242 if (task->state_ == ThreadManager::Task::EXECUTING) {
Marc Slemkod466b212006-07-20 00:04:18 +0000243 try {
Mark Sleef5f2be42006-09-05 21:05:31 +0000244 task->run();
245 } catch(...) {
246 // XXX need to log this
Marc Slemkod466b212006-07-20 00:04:18 +0000247 }
Marc Slemkod466b212006-07-20 00:04:18 +0000248 }
Marc Slemko66949872006-07-15 01:52:39 +0000249 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000250 }
Mark Sleef5f2be42006-09-05 21:05:31 +0000251
252 {
Mark Slee2f6404d2006-10-10 01:37:40 +0000253 Synchronized s(manager_->workerMonitor_);
254 manager_->deadWorkers_.insert(this->thread());
Mark Sleef5f2be42006-09-05 21:05:31 +0000255 if (notifyManager) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000256 manager_->workerMonitor_.notify();
Marc Slemko66949872006-07-15 01:52:39 +0000257 }
258 }
Mark Sleef5f2be42006-09-05 21:05:31 +0000259
Marc Slemko66949872006-07-15 01:52:39 +0000260 return;
261 }
Mark Sleef5f2be42006-09-05 21:05:31 +0000262
263 private:
Mark Slee2f6404d2006-10-10 01:37:40 +0000264 ThreadManager::Impl* manager_;
Mark Sleef5f2be42006-09-05 21:05:31 +0000265 friend class ThreadManager::Impl;
Mark Slee2f6404d2006-10-10 01:37:40 +0000266 STATE state_;
267 bool idle_;
Marc Slemko66949872006-07-15 01:52:39 +0000268};
269
Mark Sleef5f2be42006-09-05 21:05:31 +0000270
271 void ThreadManager::Impl::addWorker(size_t value) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000272 std::set<shared_ptr<Thread> > newThreads;
Mark Sleef5f2be42006-09-05 21:05:31 +0000273 for (size_t ix = 0; ix < value; ix++) {
274 class ThreadManager::Worker;
Marc Slemko6f038a72006-08-03 18:58:09 +0000275 shared_ptr<ThreadManager::Worker> worker = shared_ptr<ThreadManager::Worker>(new ThreadManager::Worker(this));
Mark Slee2f6404d2006-10-10 01:37:40 +0000276 newThreads.insert(threadFactory_->newThread(worker));
Marc Slemko66949872006-07-15 01:52:39 +0000277 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000278
Mark Sleef5f2be42006-09-05 21:05:31 +0000279 {
Mark Slee2f6404d2006-10-10 01:37:40 +0000280 Synchronized s(monitor_);
281 workerMaxCount_ += value;
282 workers_.insert(newThreads.begin(), newThreads.end());
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000283 }
Marc Slemko66949872006-07-15 01:52:39 +0000284
Mark Sleef5f2be42006-09-05 21:05:31 +0000285 for (std::set<shared_ptr<Thread> >::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000286 shared_ptr<ThreadManager::Worker> worker = dynamic_pointer_cast<ThreadManager::Worker, Runnable>((*ix)->runnable());
Mark Slee2f6404d2006-10-10 01:37:40 +0000287 worker->state_ = ThreadManager::Worker::STARTING;
Marc Slemkod466b212006-07-20 00:04:18 +0000288 (*ix)->start();
289 }
290
Mark Sleef5f2be42006-09-05 21:05:31 +0000291 {
Mark Slee2f6404d2006-10-10 01:37:40 +0000292 Synchronized s(workerMonitor_);
293 while (workerCount_ != workerMaxCount_) {
294 workerMonitor_.wait();
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000295 }
296 }
297}
Marc Slemkod466b212006-07-20 00:04:18 +0000298
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000299void ThreadManager::Impl::start() {
300
Mark Slee2f6404d2006-10-10 01:37:40 +0000301 if (state_ == ThreadManager::STOPPED) {
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000302 return;
303 }
304
Mark Sleef5f2be42006-09-05 21:05:31 +0000305 {
Mark Slee2f6404d2006-10-10 01:37:40 +0000306 Synchronized s(monitor_);
307 if (state_ == ThreadManager::UNINITIALIZED) {
308 if (threadFactory_ == NULL) {
Mark Sleef5f2be42006-09-05 21:05:31 +0000309 throw InvalidArgumentException();
310 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000311 state_ = ThreadManager::STARTED;
312 monitor_.notifyAll();
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000313 }
314
Mark Slee2f6404d2006-10-10 01:37:40 +0000315 while (state_ == STARTING) {
316 monitor_.wait();
Marc Slemkod466b212006-07-20 00:04:18 +0000317 }
318 }
319}
320
Mark Slee7c10eaf2007-03-01 02:45:10 +0000321void ThreadManager::Impl::stopImpl(bool join) {
Marc Slemkod466b212006-07-20 00:04:18 +0000322 bool doStop = false;
Mark Slee2f6404d2006-10-10 01:37:40 +0000323 if (state_ == ThreadManager::STOPPED) {
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000324 return;
325 }
326
Mark Sleef5f2be42006-09-05 21:05:31 +0000327 {
Mark Slee2f6404d2006-10-10 01:37:40 +0000328 Synchronized s(monitor_);
Mark Slee7c10eaf2007-03-01 02:45:10 +0000329 if (state_ != ThreadManager::STOPPING &&
330 state_ != ThreadManager::JOINING &&
331 state_ != ThreadManager::STOPPED) {
Marc Slemkod466b212006-07-20 00:04:18 +0000332 doStop = true;
Mark Slee7c10eaf2007-03-01 02:45:10 +0000333 state_ = join ? ThreadManager::JOINING : ThreadManager::STOPPING;
Marc Slemkod466b212006-07-20 00:04:18 +0000334 }
335 }
336
Mark Sleef5f2be42006-09-05 21:05:31 +0000337 if (doStop) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000338 removeWorker(workerCount_);
Marc Slemkod466b212006-07-20 00:04:18 +0000339 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000340
Marc Slemko6f038a72006-08-03 18:58:09 +0000341 // XXX
Mark Sleef5f2be42006-09-05 21:05:31 +0000342 // should be able to block here for transition to STOPPED since we're no
343 // using shared_ptrs
Mark Slee7c10eaf2007-03-01 02:45:10 +0000344
345 {
346 Synchronized s(monitor_);
347 state_ = ThreadManager::STOPPED;
348 }
349
Marc Slemkod466b212006-07-20 00:04:18 +0000350}
Mark Slee7c10eaf2007-03-01 02:45:10 +0000351
Marc Slemkod466b212006-07-20 00:04:18 +0000352void ThreadManager::Impl::removeWorker(size_t value) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000353 std::set<shared_ptr<Thread> > removedThreads;
Mark Sleef5f2be42006-09-05 21:05:31 +0000354 {
Mark Slee2f6404d2006-10-10 01:37:40 +0000355 Synchronized s(monitor_);
356 if (value > workerMaxCount_) {
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000357 throw InvalidArgumentException();
358 }
359
Mark Slee2f6404d2006-10-10 01:37:40 +0000360 workerMaxCount_ -= value;
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000361
Mark Slee2f6404d2006-10-10 01:37:40 +0000362 if (idleCount_ < value) {
363 for (size_t ix = 0; ix < idleCount_; ix++) {
364 monitor_.notify();
Marc Slemko66949872006-07-15 01:52:39 +0000365 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000366 } else {
Mark Slee2f6404d2006-10-10 01:37:40 +0000367 monitor_.notifyAll();
Marc Slemko66949872006-07-15 01:52:39 +0000368 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000369 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000370
Mark Sleef5f2be42006-09-05 21:05:31 +0000371 {
Mark Slee2f6404d2006-10-10 01:37:40 +0000372 Synchronized s(workerMonitor_);
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000373
Mark Slee2f6404d2006-10-10 01:37:40 +0000374 while (workerCount_ != workerMaxCount_) {
375 workerMonitor_.wait();
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000376 }
377
Mark Slee2f6404d2006-10-10 01:37:40 +0000378 for (std::set<shared_ptr<Thread> >::iterator ix = deadWorkers_.begin(); ix != deadWorkers_.end(); ix++) {
379 workers_.erase(*ix);
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000380 }
Mark Sleef5f2be42006-09-05 21:05:31 +0000381
Mark Slee2f6404d2006-10-10 01:37:40 +0000382 deadWorkers_.clear();
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000383 }
384}
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000385
Marc Slemko6f038a72006-08-03 18:58:09 +0000386void ThreadManager::Impl::add(shared_ptr<Runnable> value) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000387 Synchronized s(monitor_);
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000388
Mark Slee2f6404d2006-10-10 01:37:40 +0000389 if (state_ != ThreadManager::STARTED) {
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000390 throw IllegalStateException();
391 }
Marc Slemkod466b212006-07-20 00:04:18 +0000392
Mark Slee2f6404d2006-10-10 01:37:40 +0000393 tasks_.push(shared_ptr<ThreadManager::Task>(new ThreadManager::Task(value)));
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000394
Mark Sleef5f2be42006-09-05 21:05:31 +0000395 // If idle thread is available notify it, otherwise all worker threads are
396 // running and will get around to this task in time.
Mark Slee2f6404d2006-10-10 01:37:40 +0000397 if (idleCount_ > 0) {
398 monitor_.notify();
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000399 }
Marc Slemko66949872006-07-15 01:52:39 +0000400 }
401
Marc Slemko6f038a72006-08-03 18:58:09 +0000402void ThreadManager::Impl::remove(shared_ptr<Runnable> task) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000403 Synchronized s(monitor_);
404 if (state_ != ThreadManager::STARTED) {
Mark Sleef5f2be42006-09-05 21:05:31 +0000405 throw IllegalStateException();
406 }
Marc Slemko66949872006-07-15 01:52:39 +0000407}
408
Marc Slemkod466b212006-07-20 00:04:18 +0000409class SimpleThreadManager : public ThreadManager::Impl {
Marc Slemko66949872006-07-15 01:52:39 +0000410
Marc Slemkod466b212006-07-20 00:04:18 +0000411public:
Marc Slemkod466b212006-07-20 00:04:18 +0000412 SimpleThreadManager(size_t workerCount=4) :
Mark Slee2f6404d2006-10-10 01:37:40 +0000413 workerCount_(workerCount),
414 firstTime_(true) {
Marc Slemkod466b212006-07-20 00:04:18 +0000415 }
Marc Slemko66949872006-07-15 01:52:39 +0000416
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000417 void start() {
418 ThreadManager::Impl::start();
Mark Slee2f6404d2006-10-10 01:37:40 +0000419 addWorker(workerCount_);
Marc Slemkod466b212006-07-20 00:04:18 +0000420 }
421
422private:
Mark Slee2f6404d2006-10-10 01:37:40 +0000423 const size_t workerCount_;
424 bool firstTime_;
425 Monitor monitor_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000426};
Marc Slemko66949872006-07-15 01:52:39 +0000427
Marc Slemko66949872006-07-15 01:52:39 +0000428
Marc Slemko6f038a72006-08-03 18:58:09 +0000429shared_ptr<ThreadManager> ThreadManager::newThreadManager() {
430 return shared_ptr<ThreadManager>(new ThreadManager::Impl());
Marc Slemkod466b212006-07-20 00:04:18 +0000431}
Marc Slemko66949872006-07-15 01:52:39 +0000432
Marc Slemko6f038a72006-08-03 18:58:09 +0000433shared_ptr<ThreadManager> ThreadManager::newSimpleThreadManager(size_t count) {
434 return shared_ptr<ThreadManager>(new SimpleThreadManager(count));
Marc Slemkod466b212006-07-20 00:04:18 +0000435}
Marc Slemko66949872006-07-15 01:52:39 +0000436
437}}} // facebook::thrift::concurrency