blob: f8a5c22442ee6abdcc29f6a870966ab3e16c0174 [file] [log] [blame]
Marc Slemko66949872006-07-15 01:52:39 +00001#include "ThreadManager.h"
Marc Slemkod466b212006-07-20 00:04:18 +00002#include "Exception.h"
Marc Slemko0e53ccd2006-07-17 23:51:05 +00003#include "Monitor.h"
Marc Slemko66949872006-07-15 01:52:39 +00004
5#include <assert.h>
Marc Slemko0e53ccd2006-07-17 23:51:05 +00006#include <queue>
7#include <set>
Marc Slemko66949872006-07-15 01:52:39 +00008
9namespace facebook { namespace thrift { namespace concurrency {
10
/** ThreadManager class

    This class manages a pool of worker threads. It does not construct threads directly: it obtains them from a
    ThreadFactory and then starts, stops, and joins them as workers are added and removed. It tracks the number of
    idle workers, the number of active workers, and the backlog of pending tasks. Average wait and service times are
    not tracked by this implementation yet.

    @author marc
    @version $Id:$ */
18
19class ThreadManager::Impl : public ThreadManager {
20
21 public:
22
Marc Slemkod466b212006-07-20 00:04:18 +000023 Impl() : _stopped(false) {}
24
25
26
27 ~Impl() {
28
29 if(!_stopped) {
30 stop();
31 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +000032 }
33
Marc Slemkod466b212006-07-20 00:04:18 +000034 void stop();
Marc Slemko0e53ccd2006-07-17 23:51:05 +000035
36 const ThreadFactory* threadFactory() const {
37
38 Synchronized s(_monitor);
39
40 return _threadFactory;
41 }
42
43 void threadFactory(const ThreadFactory* value) {
44
45 Synchronized s(_monitor);
46
47 _threadFactory = value;
48 }
49
Marc Slemkod466b212006-07-20 00:04:18 +000050 void addWorker(size_t value);
Marc Slemko0e53ccd2006-07-17 23:51:05 +000051
Marc Slemkod466b212006-07-20 00:04:18 +000052 void removeWorker(size_t value);
Marc Slemko0e53ccd2006-07-17 23:51:05 +000053
54 size_t idleWorkerCount() const {return _idleCount;}
55
56 size_t workerCount() const {
57
58 Synchronized s(_monitor);
59
Marc Slemkod466b212006-07-20 00:04:18 +000060 return _workerCount;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000061 }
62
63 size_t pendingTaskCount() const {
64
65 Synchronized s(_monitor);
66
67 return _tasks.size();
68 }
69
70 size_t totalTaskCount() const {
71
72 Synchronized s(_monitor);
73
Marc Slemkod466b212006-07-20 00:04:18 +000074 return _tasks.size() + _workerCount - _idleCount;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000075 }
76
77 void add(Runnable* value);
78
79 void remove(Runnable* task);
80
81private:
82
Marc Slemkod466b212006-07-20 00:04:18 +000083 size_t _workerCount;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000084
85 size_t _idleCount;
86
Marc Slemkod466b212006-07-20 00:04:18 +000087 bool _stopped;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000088
89 const ThreadFactory* _threadFactory;
90
91 friend class ThreadManager::Task;
92
93 std::queue<Task*> _tasks;
94
95 Monitor _monitor;
96
97 friend class ThreadManager::Worker;
98
99 std::set<Thread*> _workers;
100};
Marc Slemko66949872006-07-15 01:52:39 +0000101
102class ThreadManager::Task : public Runnable {
103
104public:
105 enum STATE {
106 WAITING,
107 EXECUTING,
108 CANCELLED,
109 COMPLETE
110 };
111
112 Task(Runnable* runnable) :
113 _runnable(runnable),
114 _state(WAITING)
115 {}
116
117 ~Task() {};
118
119 void run() {
120 if(_state == EXECUTING) {
121 _runnable->run();
122 _state = COMPLETE;
123 }
124 }
125
126 private:
127
128 Runnable* _runnable;
Marc Slemkod466b212006-07-20 00:04:18 +0000129
130 friend class ThreadManager::Worker;
Marc Slemko66949872006-07-15 01:52:39 +0000131
132 STATE _state;
133};
134
135class ThreadManager::Worker: public Runnable {
Marc Slemko66949872006-07-15 01:52:39 +0000136 enum STATE {
137 UNINITIALIZED,
138 STARTING,
139 STARTED,
140 STOPPING,
141 STOPPED
142 };
143
144 public:
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000145 Worker(ThreadManager::Impl* manager) :
Marc Slemko66949872006-07-15 01:52:39 +0000146 _manager(manager),
147 _state(UNINITIALIZED),
148 _idle(false)
149 {}
150
151 ~Worker() {}
152
153 /** Worker entry point
154
155 As long as worker thread is running, pull tasks off the task queue and execute. */
156
157 void run() {
158
Marc Slemko8a40a762006-07-19 17:46:50 +0000159 {Synchronized s(_manager->_monitor);
Marc Slemko66949872006-07-15 01:52:39 +0000160
161 if(_state == STARTING) {
162 _state = STARTED;
163 }
Marc Slemkod466b212006-07-20 00:04:18 +0000164
165 _manager->_workerCount++;
166
167 _manager->_monitor.notify();
Marc Slemko66949872006-07-15 01:52:39 +0000168 }
169
170 do {
171
172 ThreadManager::Task* task = NULL;
173
174 /* While holding manager monitor block for non-empty task queue (Also check that the thread hasn't been requested to stop).
175
176 Once the queue is non-empty, dequeue a task, release monitor, and execute. */
177
Marc Slemko8a40a762006-07-19 17:46:50 +0000178 {Synchronized s(_manager->_monitor);
Marc Slemko66949872006-07-15 01:52:39 +0000179
180 while(_state == STARTED && _manager->_tasks.empty()) {
181
182 _manager->_idleCount++;
183
184 _idle = true;
185
186 _manager->_monitor.wait();
187
188 _idle = false;
189
190 _manager->_idleCount--;
191 }
192
193 if(_state == STARTED) {
Marc Slemkod466b212006-07-20 00:04:18 +0000194
195 if(!_manager->_tasks.empty()) {
Marc Slemko66949872006-07-15 01:52:39 +0000196
Marc Slemkod466b212006-07-20 00:04:18 +0000197 task = _manager->_tasks.front();
198
199 _manager->_tasks.pop();
200
201 if(task->_state == ThreadManager::Task::WAITING) {
202
203 task->_state = ThreadManager::Task::EXECUTING;
204 }
205 }
Marc Slemko66949872006-07-15 01:52:39 +0000206 }
207 }
208
209 if(task != NULL) {
210
Marc Slemkod466b212006-07-20 00:04:18 +0000211 if(task->_state == ThreadManager::Task::EXECUTING) {
212 try {
213
214 task->run();
Marc Slemko66949872006-07-15 01:52:39 +0000215
Marc Slemkod466b212006-07-20 00:04:18 +0000216 } catch(...) {
217
218 // XXX need to log this
219 }
220
221 delete task;
222 }
Marc Slemko66949872006-07-15 01:52:39 +0000223 }
Marc Slemkod466b212006-07-20 00:04:18 +0000224
Marc Slemko66949872006-07-15 01:52:39 +0000225 } while(_state == STARTED);
226
Marc Slemko8a40a762006-07-19 17:46:50 +0000227 {Synchronized s(_manager->_monitor);
Marc Slemko66949872006-07-15 01:52:39 +0000228
Marc Slemkod466b212006-07-20 00:04:18 +0000229 _manager->_workerCount--;
230
Marc Slemko66949872006-07-15 01:52:39 +0000231 if(_state == STOPPING) {
232
233 _state = STOPPED;
234
235 _manager->_monitor.notify();
236
237 }
238 }
239
240 return;
241 }
242
243 private:
244
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000245 ThreadManager::Impl* _manager;
Marc Slemko66949872006-07-15 01:52:39 +0000246
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000247 friend class ThreadManager::Impl;
Marc Slemko66949872006-07-15 01:52:39 +0000248
249 STATE _state;
250
251 bool _idle;
252};
253
Marc Slemkod466b212006-07-20 00:04:18 +0000254void ThreadManager::Impl::addWorker(size_t value) {
Marc Slemko66949872006-07-15 01:52:39 +0000255
Marc Slemkod466b212006-07-20 00:04:18 +0000256 std::set<Thread*> newThreads;
Marc Slemko66949872006-07-15 01:52:39 +0000257
Marc Slemkod466b212006-07-20 00:04:18 +0000258 for(size_t ix = 0; ix < value; ix++) {
Marc Slemko66949872006-07-15 01:52:39 +0000259
Marc Slemkod466b212006-07-20 00:04:18 +0000260 class ThreadManager::Worker;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000261
Marc Slemkod466b212006-07-20 00:04:18 +0000262 ThreadManager::Worker* worker = new ThreadManager::Worker(this);
Marc Slemko66949872006-07-15 01:52:39 +0000263
Marc Slemkod466b212006-07-20 00:04:18 +0000264 newThreads.insert(_threadFactory->newThread(worker));
Marc Slemko66949872006-07-15 01:52:39 +0000265 }
266
Marc Slemkod466b212006-07-20 00:04:18 +0000267 for(std::set<Thread*>::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) {
268
269 ThreadManager::Worker* worker = (ThreadManager::Worker*)(*ix)->runnable();
270
271 worker->_state = ThreadManager::Worker::STARTING;
272
273 (*ix)->start();
274 }
275
276 {Synchronized s(_monitor);
277
278 _workers.insert(newThreads.begin(), newThreads.end());
279
280 while(_workerCount != _workers.size()) {
281 _monitor.wait();
282 }
283 }
284}
285
286void ThreadManager::Impl::stop() {
287
288 bool doStop = false;
289
290 {Synchronized s(_monitor);
291
292 if(!_stopped) {
293 doStop = true;
294 _stopped = true;
295 }
296 }
297
298 if(doStop) {
299 removeWorker(_workerCount);
300 }
301}
302
303void ThreadManager::Impl::removeWorker(size_t value) {
Marc Slemko66949872006-07-15 01:52:39 +0000304
305 std::set<Thread*> removedThreads;
306
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000307 {Synchronized s(_monitor);
Marc Slemko66949872006-07-15 01:52:39 +0000308
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000309 /* Overly clever loop
310
311 First time through, (idleOnly == 1) just look for idle threads. If that didn't find enough, go through again (idleOnly == 0)
312 and remove a sufficient number of busy threads. */
Marc Slemko66949872006-07-15 01:52:39 +0000313
Marc Slemkod466b212006-07-20 00:04:18 +0000314 for(int idleOnly = 1; idleOnly >= 0; idleOnly--) {
Marc Slemko66949872006-07-15 01:52:39 +0000315
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000316 for(std::set<Thread*>::iterator workerThread = _workers.begin(); (workerThread != _workers.end()) && (removedThreads.size() < value); workerThread++) {
Marc Slemko66949872006-07-15 01:52:39 +0000317
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000318 Worker* worker = (Worker*)(*workerThread)->runnable();
Marc Slemko66949872006-07-15 01:52:39 +0000319
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000320 if(worker->_idle || !idleOnly) {
Marc Slemkod466b212006-07-20 00:04:18 +0000321
322 if(worker->_state == ThreadManager::Worker::STARTED) {
323
324 worker->_state = ThreadManager::Worker::STOPPING;
325 }
326
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000327 removedThreads.insert(*workerThread);
Marc Slemko66949872006-07-15 01:52:39 +0000328
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000329 _workers.erase(workerThread);
330 }
Marc Slemko66949872006-07-15 01:52:39 +0000331 }
332 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000333
334 _monitor.notifyAll();
Marc Slemko66949872006-07-15 01:52:39 +0000335 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000336
Marc Slemko66949872006-07-15 01:52:39 +0000337
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000338 // Join removed threads and free worker
339
340 for(std::set<Thread*>::iterator workerThread = removedThreads.begin(); workerThread != removedThreads.end(); workerThread++) {
341
342 Worker* worker = (Worker*)(*workerThread)->runnable();
343
344 (*workerThread)->join();
345
346 delete worker;
347 }
348 }
349
350void ThreadManager::Impl::add(Runnable* value) {
351
352 Synchronized s(_monitor);
353
Marc Slemkod466b212006-07-20 00:04:18 +0000354 bool isEmpty = _tasks.empty();
355
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000356 _tasks.push(new ThreadManager::Task(value));
357
Marc Slemkod466b212006-07-20 00:04:18 +0000358 /* If queue was empty notify a thread, otherwise all worker threads are running and will get around to this
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000359 task in time. */
360
Marc Slemkod466b212006-07-20 00:04:18 +0000361 if(isEmpty) {
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000362
Marc Slemkod466b212006-07-20 00:04:18 +0000363 assert(_idleCount == _workerCount);
364
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000365 _monitor.notify();
366 }
Marc Slemko66949872006-07-15 01:52:39 +0000367 }
368
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000369void ThreadManager::Impl::remove(Runnable* task) {
Marc Slemko66949872006-07-15 01:52:39 +0000370
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000371 Synchronized s(_monitor);
Marc Slemko66949872006-07-15 01:52:39 +0000372}
373
Marc Slemkod466b212006-07-20 00:04:18 +0000374class SimpleThreadManager : public ThreadManager::Impl {
Marc Slemko66949872006-07-15 01:52:39 +0000375
Marc Slemkod466b212006-07-20 00:04:18 +0000376public:
Marc Slemko66949872006-07-15 01:52:39 +0000377
Marc Slemkod466b212006-07-20 00:04:18 +0000378 SimpleThreadManager(size_t workerCount=4) :
379 _workerCount(workerCount),
380 _firstTime(true) {
381 }
Marc Slemko66949872006-07-15 01:52:39 +0000382
Marc Slemkod466b212006-07-20 00:04:18 +0000383 void add(Runnable* task) {
Marc Slemko66949872006-07-15 01:52:39 +0000384
Marc Slemkod466b212006-07-20 00:04:18 +0000385 bool addWorkers = false;
Marc Slemko66949872006-07-15 01:52:39 +0000386
Marc Slemkod466b212006-07-20 00:04:18 +0000387 {Synchronized s(_monitor);
Marc Slemko66949872006-07-15 01:52:39 +0000388
Marc Slemkod466b212006-07-20 00:04:18 +0000389 if(_firstTime) {
Marc Slemko66949872006-07-15 01:52:39 +0000390
Marc Slemkod466b212006-07-20 00:04:18 +0000391 _firstTime = false;
392
393 addWorkers = true;
394 }
395 }
396
397 if(addWorkers) {
398
399 addWorker(_workerCount);
400 }
401
402 Impl::add(task);
403 }
404
405private:
406
407 const size_t _workerCount;
408 bool _firstTime;
409 Monitor _monitor;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000410};
Marc Slemko66949872006-07-15 01:52:39 +0000411
Marc Slemko66949872006-07-15 01:52:39 +0000412
Marc Slemkod466b212006-07-20 00:04:18 +0000413ThreadManager* ThreadManager::newThreadManager() {
414 return new ThreadManager::Impl();
415}
Marc Slemko66949872006-07-15 01:52:39 +0000416
Marc Slemkod466b212006-07-20 00:04:18 +0000417ThreadManager* ThreadManager::newSimpleThreadManager() {
418 return new SimpleThreadManager();
419}
Marc Slemko66949872006-07-15 01:52:39 +0000420
421}}} // facebook::thrift::concurrency
422