blob: ca2bbb5f1ad27661a9f3c82829ef99989df638dc [file] [log] [blame]
Marc Slemko66949872006-07-15 01:52:39 +00001#include "ThreadManager.h"
Marc Slemkod466b212006-07-20 00:04:18 +00002#include "Exception.h"
Marc Slemko0e53ccd2006-07-17 23:51:05 +00003#include "Monitor.h"
Marc Slemko66949872006-07-15 01:52:39 +00004
5#include <assert.h>
Marc Slemko0e53ccd2006-07-17 23:51:05 +00006#include <queue>
7#include <set>
Marc Slemko66949872006-07-15 01:52:39 +00008
9namespace facebook { namespace thrift { namespace concurrency {
10
Marc Slemkofe5ba12e2006-07-20 21:16:27 +000011
Marc Slemko66949872006-07-15 01:52:39 +000012/** ThreadManager class
13
14 This class manages a pool of threads. It uses a ThreadFactory to create threads. It never actually creates or destroys worker threads, rather
Marc Slemkod466b212006-07-20 00:04:18 +000015 it maintains statistics on number of idle threads, number of active threads, task backlog, and average wait and service times.
Marc Slemko66949872006-07-15 01:52:39 +000016
17 @author marc
Marc Slemko0e53ccd2006-07-17 23:51:05 +000018 @version $Id:$ */
19
20class ThreadManager::Impl : public ThreadManager {
21
22 public:
23
Marc Slemkofe5ba12e2006-07-20 21:16:27 +000024 Impl() : _state(ThreadManager::UNINITIALIZED) {}
Marc Slemkod466b212006-07-20 00:04:18 +000025
Marc Slemkofe5ba12e2006-07-20 21:16:27 +000026 ~Impl() {stop();}
Marc Slemkod466b212006-07-20 00:04:18 +000027
Marc Slemkofe5ba12e2006-07-20 21:16:27 +000028 void start();
Marc Slemko0e53ccd2006-07-17 23:51:05 +000029
Marc Slemkod466b212006-07-20 00:04:18 +000030 void stop();
Marc Slemko0e53ccd2006-07-17 23:51:05 +000031
Marc Slemkofe5ba12e2006-07-20 21:16:27 +000032 const ThreadManager::STATE state() const {
33 return _state;
34 };
35
Marc Slemko0e53ccd2006-07-17 23:51:05 +000036 const ThreadFactory* threadFactory() const {
37
38 Synchronized s(_monitor);
39
40 return _threadFactory;
41 }
42
43 void threadFactory(const ThreadFactory* value) {
44
45 Synchronized s(_monitor);
46
47 _threadFactory = value;
48 }
49
Marc Slemkod466b212006-07-20 00:04:18 +000050 void addWorker(size_t value);
Marc Slemko0e53ccd2006-07-17 23:51:05 +000051
Marc Slemkod466b212006-07-20 00:04:18 +000052 void removeWorker(size_t value);
Marc Slemko0e53ccd2006-07-17 23:51:05 +000053
54 size_t idleWorkerCount() const {return _idleCount;}
55
56 size_t workerCount() const {
57
58 Synchronized s(_monitor);
59
Marc Slemkod466b212006-07-20 00:04:18 +000060 return _workerCount;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000061 }
62
63 size_t pendingTaskCount() const {
64
65 Synchronized s(_monitor);
66
67 return _tasks.size();
68 }
69
70 size_t totalTaskCount() const {
71
72 Synchronized s(_monitor);
73
Marc Slemkod466b212006-07-20 00:04:18 +000074 return _tasks.size() + _workerCount - _idleCount;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000075 }
76
77 void add(Runnable* value);
78
79 void remove(Runnable* task);
80
81private:
82
Marc Slemkod466b212006-07-20 00:04:18 +000083 size_t _workerCount;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000084
Marc Slemkofe5ba12e2006-07-20 21:16:27 +000085 size_t _workerMaxCount;
86
Marc Slemko0e53ccd2006-07-17 23:51:05 +000087 size_t _idleCount;
88
Marc Slemkofe5ba12e2006-07-20 21:16:27 +000089 ThreadManager::STATE _state;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000090
91 const ThreadFactory* _threadFactory;
92
93 friend class ThreadManager::Task;
94
95 std::queue<Task*> _tasks;
96
97 Monitor _monitor;
98
Marc Slemkofe5ba12e2006-07-20 21:16:27 +000099 Monitor _workerMonitor;
100
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000101 friend class ThreadManager::Worker;
102
103 std::set<Thread*> _workers;
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000104
105 std::set<Thread*> _deadWorkers;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000106};
Marc Slemko66949872006-07-15 01:52:39 +0000107
108class ThreadManager::Task : public Runnable {
109
110public:
111 enum STATE {
112 WAITING,
113 EXECUTING,
114 CANCELLED,
115 COMPLETE
116 };
117
118 Task(Runnable* runnable) :
119 _runnable(runnable),
120 _state(WAITING)
121 {}
122
123 ~Task() {};
124
125 void run() {
126 if(_state == EXECUTING) {
127 _runnable->run();
128 _state = COMPLETE;
129 }
130 }
131
132 private:
133
134 Runnable* _runnable;
Marc Slemkod466b212006-07-20 00:04:18 +0000135
136 friend class ThreadManager::Worker;
Marc Slemko66949872006-07-15 01:52:39 +0000137
138 STATE _state;
139};
140
141class ThreadManager::Worker: public Runnable {
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000142
Marc Slemko66949872006-07-15 01:52:39 +0000143 enum STATE {
144 UNINITIALIZED,
145 STARTING,
146 STARTED,
147 STOPPING,
148 STOPPED
149 };
150
151 public:
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000152
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000153 Worker(ThreadManager::Impl* manager) :
Marc Slemko66949872006-07-15 01:52:39 +0000154 _manager(manager),
155 _state(UNINITIALIZED),
156 _idle(false)
157 {}
158
159 ~Worker() {}
160
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000161 bool isActive() const { return _manager->_workerCount <= _manager->_workerMaxCount;}
162
Marc Slemko66949872006-07-15 01:52:39 +0000163 /** Worker entry point
164
165 As long as worker thread is running, pull tasks off the task queue and execute. */
166
167 void run() {
168
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000169 bool active = false;
170
171 bool notifyManager = false;
172
173 /** Increment worker semaphore and notify manager if worker count reached desired max
174
175 Note
176 We have to release the monitor and acquire the workerMonitor since that is what the manager
177 blocks on for worker add/remove */
178
Marc Slemko8a40a762006-07-19 17:46:50 +0000179 {Synchronized s(_manager->_monitor);
Marc Slemko66949872006-07-15 01:52:39 +0000180
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000181 active = _manager->_workerCount < _manager->_workerMaxCount;
182
183 if(active) {
184
185 _manager->_workerCount++;
186
187 notifyManager = _manager->_workerCount == _manager->_workerMaxCount;
Marc Slemko66949872006-07-15 01:52:39 +0000188 }
189 }
190
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000191 if(notifyManager) {
192
193 Synchronized s(_manager->_workerMonitor);
194
195 _manager->_workerMonitor.notify();
196
197 notifyManager = false;
198 }
199
200 while(active) {
Marc Slemko66949872006-07-15 01:52:39 +0000201
202 ThreadManager::Task* task = NULL;
203
204 /* While holding manager monitor block for non-empty task queue (Also check that the thread hasn't been requested to stop).
205
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000206 Once the queue is non-empty, dequeue a task, release monitor, and execute. If the worker max count has been decremented
207 such that we exceed it, mark ourself inactive, decrement the worker count and notify the manager (technically we're notifying
208 the next blocked thread but eventually the manager will see it. */
Marc Slemko66949872006-07-15 01:52:39 +0000209
Marc Slemko8a40a762006-07-19 17:46:50 +0000210 {Synchronized s(_manager->_monitor);
Marc Slemko66949872006-07-15 01:52:39 +0000211
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000212 active = isActive();
213
214 while(active && _manager->_tasks.empty()) {
Marc Slemko66949872006-07-15 01:52:39 +0000215
216 _manager->_idleCount++;
217
218 _idle = true;
219
220 _manager->_monitor.wait();
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000221
222 active = isActive();
Marc Slemko66949872006-07-15 01:52:39 +0000223
224 _idle = false;
225
226 _manager->_idleCount--;
227 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000228
229 if(active) {
Marc Slemkod466b212006-07-20 00:04:18 +0000230
231 if(!_manager->_tasks.empty()) {
Marc Slemko66949872006-07-15 01:52:39 +0000232
Marc Slemkod466b212006-07-20 00:04:18 +0000233 task = _manager->_tasks.front();
234
235 _manager->_tasks.pop();
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000236
Marc Slemkod466b212006-07-20 00:04:18 +0000237 if(task->_state == ThreadManager::Task::WAITING) {
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000238
Marc Slemkod466b212006-07-20 00:04:18 +0000239 task->_state = ThreadManager::Task::EXECUTING;
240 }
241 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000242 } else {
243
244 _idle = true;
245
246 _manager->_workerCount--;
247
248 notifyManager = _manager->_workerCount == _manager->_workerMaxCount;
Marc Slemko66949872006-07-15 01:52:39 +0000249 }
250 }
251
252 if(task != NULL) {
253
Marc Slemkod466b212006-07-20 00:04:18 +0000254 if(task->_state == ThreadManager::Task::EXECUTING) {
255 try {
256
257 task->run();
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000258
Marc Slemkod466b212006-07-20 00:04:18 +0000259 } catch(...) {
260
261 // XXX need to log this
262 }
263
264 delete task;
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000265
266 task = NULL;
Marc Slemkod466b212006-07-20 00:04:18 +0000267 }
Marc Slemko66949872006-07-15 01:52:39 +0000268 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000269 }
270
271 {Synchronized s(_manager->_workerMonitor);
272
273 _manager->_deadWorkers.insert(this->thread());
Marc Slemkod466b212006-07-20 00:04:18 +0000274
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000275 if(notifyManager) {
Marc Slemko66949872006-07-15 01:52:39 +0000276
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000277 _manager->_workerMonitor.notify();
Marc Slemko66949872006-07-15 01:52:39 +0000278 }
279 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000280
Marc Slemko66949872006-07-15 01:52:39 +0000281 return;
282 }
283
284 private:
285
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000286 ThreadManager::Impl* _manager;
Marc Slemko66949872006-07-15 01:52:39 +0000287
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000288 friend class ThreadManager::Impl;
Marc Slemko66949872006-07-15 01:52:39 +0000289
290 STATE _state;
291
292 bool _idle;
293};
294
Marc Slemkod466b212006-07-20 00:04:18 +0000295void ThreadManager::Impl::addWorker(size_t value) {
Marc Slemko66949872006-07-15 01:52:39 +0000296
Marc Slemkod466b212006-07-20 00:04:18 +0000297 std::set<Thread*> newThreads;
Marc Slemko66949872006-07-15 01:52:39 +0000298
Marc Slemkod466b212006-07-20 00:04:18 +0000299 for(size_t ix = 0; ix < value; ix++) {
Marc Slemko66949872006-07-15 01:52:39 +0000300
Marc Slemkod466b212006-07-20 00:04:18 +0000301 class ThreadManager::Worker;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000302
Marc Slemkod466b212006-07-20 00:04:18 +0000303 ThreadManager::Worker* worker = new ThreadManager::Worker(this);
Marc Slemko66949872006-07-15 01:52:39 +0000304
Marc Slemkod466b212006-07-20 00:04:18 +0000305 newThreads.insert(_threadFactory->newThread(worker));
Marc Slemko66949872006-07-15 01:52:39 +0000306 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000307
308 {Synchronized s(_monitor);
309
310 _workerMaxCount+= value;
311
312 _workers.insert(newThreads.begin(), newThreads.end());
313 }
Marc Slemko66949872006-07-15 01:52:39 +0000314
Marc Slemkod466b212006-07-20 00:04:18 +0000315 for(std::set<Thread*>::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) {
316
317 ThreadManager::Worker* worker = (ThreadManager::Worker*)(*ix)->runnable();
318
319 worker->_state = ThreadManager::Worker::STARTING;
320
321 (*ix)->start();
322 }
323
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000324 {Synchronized s(_workerMonitor);
Marc Slemkod466b212006-07-20 00:04:18 +0000325
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000326 while(_workerCount != _workerMaxCount) {
327 _workerMonitor.wait();
328 }
329 }
330}
Marc Slemkod466b212006-07-20 00:04:18 +0000331
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000332void ThreadManager::Impl::start() {
333
334 if(_state == ThreadManager::STOPPED) {
335 return;
336 }
337
338 {Synchronized s(_monitor);
339
340 if(_state == ThreadManager::UNINITIALIZED) {
341
342 if(_threadFactory == NULL) {throw InvalidArgumentException();}
343
344 _state = ThreadManager::STARTED;
345
346 _monitor.notifyAll();
347 }
348
349 while(_state == STARTING) {
350
Marc Slemkod466b212006-07-20 00:04:18 +0000351 _monitor.wait();
352 }
353 }
354}
355
356void ThreadManager::Impl::stop() {
357
358 bool doStop = false;
359
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000360 if(_state == ThreadManager::STOPPED) {
361 return;
362 }
363
Marc Slemkod466b212006-07-20 00:04:18 +0000364 {Synchronized s(_monitor);
365
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000366 if(!_state != ThreadManager::STOPPING && _state != ThreadManager::STOPPED) {
367
Marc Slemkod466b212006-07-20 00:04:18 +0000368 doStop = true;
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000369
370 _state = ThreadManager::STOPPING;
Marc Slemkod466b212006-07-20 00:04:18 +0000371 }
372 }
373
374 if(doStop) {
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000375
Marc Slemkod466b212006-07-20 00:04:18 +0000376 removeWorker(_workerCount);
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000377
378 _state = ThreadManager::STOPPING;
Marc Slemkod466b212006-07-20 00:04:18 +0000379 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000380
381 // Don't block for stopping->stopped transition here, since if stop is being performed in context of a delete, the monitor may be invalid
382
Marc Slemkod466b212006-07-20 00:04:18 +0000383}
384
385void ThreadManager::Impl::removeWorker(size_t value) {
Marc Slemko66949872006-07-15 01:52:39 +0000386
387 std::set<Thread*> removedThreads;
388
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000389 {Synchronized s(_monitor);
Marc Slemko66949872006-07-15 01:52:39 +0000390
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000391 if(value > _workerMaxCount) {
392
393 throw InvalidArgumentException();
394 }
395
396 _workerMaxCount-= value;
397
398 if(_idleCount < value) {
Marc Slemko66949872006-07-15 01:52:39 +0000399
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000400 for(size_t ix = 0; ix < _idleCount; ix++) {
Marc Slemko66949872006-07-15 01:52:39 +0000401
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000402 _monitor.notify();
Marc Slemko66949872006-07-15 01:52:39 +0000403 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000404 } else {
405
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000406 _monitor.notifyAll();
Marc Slemko66949872006-07-15 01:52:39 +0000407 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000408 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000409
410 {Synchronized s(_workerMonitor);
411
412 while(_workerCount != _workerMaxCount) {
413 _workerMonitor.wait();
414 }
415
416 for(std::set<Thread*>::iterator ix = _deadWorkers.begin(); ix != _deadWorkers.end(); ix++) {
417
418 _workers.erase(*ix);
419
420 delete (*ix)->runnable();
421
422 delete (*ix);
423 }
424
425 _deadWorkers.clear();
426 }
427}
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000428
429void ThreadManager::Impl::add(Runnable* value) {
430
431 Synchronized s(_monitor);
432
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000433 if(_state != ThreadManager::STARTED) {
434
435 throw IllegalStateException();
436 }
Marc Slemkod466b212006-07-20 00:04:18 +0000437
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000438 _tasks.push(new ThreadManager::Task(value));
439
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000440 /* If idle thread is available notify it, otherwise all worker threads are running and will get around to this
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000441 task in time. */
442
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000443 if(_idleCount > 0) {
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000444
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000445 _monitor.notify();
446 }
Marc Slemko66949872006-07-15 01:52:39 +0000447 }
448
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000449void ThreadManager::Impl::remove(Runnable* task) {
Marc Slemko66949872006-07-15 01:52:39 +0000450
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000451 Synchronized s(_monitor);
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000452
453 if(_state != ThreadManager::STARTED) {
454
455 throw IllegalStateException();
456 }
Marc Slemko66949872006-07-15 01:52:39 +0000457}
458
Marc Slemkod466b212006-07-20 00:04:18 +0000459class SimpleThreadManager : public ThreadManager::Impl {
Marc Slemko66949872006-07-15 01:52:39 +0000460
Marc Slemkod466b212006-07-20 00:04:18 +0000461public:
Marc Slemko66949872006-07-15 01:52:39 +0000462
Marc Slemkod466b212006-07-20 00:04:18 +0000463 SimpleThreadManager(size_t workerCount=4) :
464 _workerCount(workerCount),
465 _firstTime(true) {
466 }
Marc Slemko66949872006-07-15 01:52:39 +0000467
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000468 void start() {
469 ThreadManager::Impl::start();
Marc Slemko66949872006-07-15 01:52:39 +0000470
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000471 addWorker(_workerCount);
Marc Slemkod466b212006-07-20 00:04:18 +0000472 }
473
474private:
475
476 const size_t _workerCount;
477 bool _firstTime;
478 Monitor _monitor;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000479};
Marc Slemko66949872006-07-15 01:52:39 +0000480
Marc Slemko66949872006-07-15 01:52:39 +0000481
Marc Slemkod466b212006-07-20 00:04:18 +0000482ThreadManager* ThreadManager::newThreadManager() {
483 return new ThreadManager::Impl();
484}
Marc Slemko66949872006-07-15 01:52:39 +0000485
Marc Slemko525c2022006-07-20 00:29:35 +0000486ThreadManager* ThreadManager::newSimpleThreadManager(size_t count) {
487 return new SimpleThreadManager(count);
Marc Slemkod466b212006-07-20 00:04:18 +0000488}
Marc Slemko66949872006-07-15 01:52:39 +0000489
490}}} // facebook::thrift::concurrency
491