blob: d13ce7ba69a81030d3e2b823440962f809d008a5 [file] [log] [blame]
Marc Slemko66949872006-07-15 01:52:39 +00001#include "ThreadManager.h"
Marc Slemko0e53ccd2006-07-17 23:51:05 +00002#include "Monitor.h"
Marc Slemko66949872006-07-15 01:52:39 +00003
4#include <assert.h>
Marc Slemko0e53ccd2006-07-17 23:51:05 +00005#include <queue>
6#include <set>
Marc Slemko66949872006-07-15 01:52:39 +00007
8namespace facebook { namespace thrift { namespace concurrency {
9
10/** ThreadManager class
11
 This class manages a pool of threads. It uses a ThreadFactory to create threads. It never actually creates or destroys worker threads; rather,
 it maintains statistics on the number of idle threads, number of active threads, task backlog, and average wait and service times and informs the
 PoolPolicy object bound to instances of this manager of interesting transitions. It is then up to the PoolPolicy object to decide if the thread pool
 size needs to be adjusted and to call this object's addThread and removeThread methods to make changes.
16
 This design allows different policy implementations to use this code to handle basic worker thread management and worker task execution and focus on
 policy issues. The simplest policy, StaticPolicy, does nothing other than create a fixed number of threads.
19
20 @author marc
Marc Slemko0e53ccd2006-07-17 23:51:05 +000021 @version $Id:$ */
22
23class ThreadManager::Impl : public ThreadManager {
24
25 public:
26
27 Impl(size_t highWatermark, size_t lowWatermark) :
28 _hiwat(highWatermark),
29 _lowat(lowWatermark) {
30 }
31
32 ~Impl() {}
33
34 size_t highWatermark() const {return _hiwat;}
35
36 void highWatermark(size_t value) {_hiwat = value;}
37
38 size_t lowWatermark() const {return _lowat;}
39
40 void lowWatermark(size_t value) {_lowat = value;}
41
42 const PoolPolicy* poolPolicy() const {
43
44 Synchronized s(_monitor);
45
46 return _poolPolicy;
47 }
48
49 void poolPolicy(const PoolPolicy* value) {
50
51 Synchronized s(_monitor);
52
53 _poolPolicy = value;
54 }
55
56 const ThreadFactory* threadFactory() const {
57
58 Synchronized s(_monitor);
59
60 return _threadFactory;
61 }
62
63 void threadFactory(const ThreadFactory* value) {
64
65 Synchronized s(_monitor);
66
67 _threadFactory = value;
68 }
69
70 void addThread(size_t value);
71
72 void removeThread(size_t value);
73
74 size_t idleWorkerCount() const {return _idleCount;}
75
76 size_t workerCount() const {
77
78 Synchronized s(_monitor);
79
80 return _workers.size();
81 }
82
83 size_t pendingTaskCount() const {
84
85 Synchronized s(_monitor);
86
87 return _tasks.size();
88 }
89
90 size_t totalTaskCount() const {
91
92 Synchronized s(_monitor);
93
94 return _tasks.size() + _workers.size() - _idleCount;
95 }
96
97 void add(Runnable* value);
98
99 void remove(Runnable* task);
100
101private:
102
103 size_t _hiwat;
104
105 size_t _lowat;
106
107 size_t _idleCount;
108
109 const PoolPolicy* _poolPolicy;;
110
111 const ThreadFactory* _threadFactory;
112
113 friend class ThreadManager::Task;
114
115 std::queue<Task*> _tasks;
116
117 Monitor _monitor;
118
119 friend class ThreadManager::Worker;
120
121 std::set<Thread*> _workers;
122};
Marc Slemko66949872006-07-15 01:52:39 +0000123
124class ThreadManager::Task : public Runnable {
125
126public:
127 enum STATE {
128 WAITING,
129 EXECUTING,
130 CANCELLED,
131 COMPLETE
132 };
133
134 Task(Runnable* runnable) :
135 _runnable(runnable),
136 _state(WAITING)
137 {}
138
139 ~Task() {};
140
141 void run() {
142 if(_state == EXECUTING) {
143 _runnable->run();
144 _state = COMPLETE;
145 }
146 }
147
148 private:
149
150 Runnable* _runnable;
151
152 STATE _state;
153};
154
155class ThreadManager::Worker: public Runnable {
Marc Slemko66949872006-07-15 01:52:39 +0000156 enum STATE {
157 UNINITIALIZED,
158 STARTING,
159 STARTED,
160 STOPPING,
161 STOPPED
162 };
163
164 public:
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000165 Worker(ThreadManager::Impl* manager) :
Marc Slemko66949872006-07-15 01:52:39 +0000166 _manager(manager),
167 _state(UNINITIALIZED),
168 _idle(false)
169 {}
170
171 ~Worker() {}
172
173 /** Worker entry point
174
175 As long as worker thread is running, pull tasks off the task queue and execute. */
176
177 void run() {
178
Marc Slemko8a40a762006-07-19 17:46:50 +0000179 {Synchronized s(_manager->_monitor);
Marc Slemko66949872006-07-15 01:52:39 +0000180
181 if(_state == STARTING) {
182 _state = STARTED;
183 }
184 }
185
186 do {
187
188 ThreadManager::Task* task = NULL;
189
190 /* While holding manager monitor block for non-empty task queue (Also check that the thread hasn't been requested to stop).
191
192 Once the queue is non-empty, dequeue a task, release monitor, and execute. */
193
Marc Slemko8a40a762006-07-19 17:46:50 +0000194 {Synchronized s(_manager->_monitor);
Marc Slemko66949872006-07-15 01:52:39 +0000195
196 while(_state == STARTED && _manager->_tasks.empty()) {
197
198 _manager->_idleCount++;
199
200 _idle = true;
201
202 _manager->_monitor.wait();
203
204 _idle = false;
205
206 _manager->_idleCount--;
207 }
208
209 if(_state == STARTED) {
210
211 task = _manager->_tasks.front();
212 }
213 }
214
215 if(task != NULL) {
216
217 task->run();
218
219 delete task;
220 }
221
222 } while(_state == STARTED);
223
Marc Slemko8a40a762006-07-19 17:46:50 +0000224 {Synchronized s(_manager->_monitor);
Marc Slemko66949872006-07-15 01:52:39 +0000225
226 if(_state == STOPPING) {
227
228 _state = STOPPED;
229
230 _manager->_monitor.notify();
231
232 }
233 }
234
235 return;
236 }
237
238 private:
239
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000240 ThreadManager::Impl* _manager;
Marc Slemko66949872006-07-15 01:52:39 +0000241
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000242 friend class ThreadManager::Impl;
Marc Slemko66949872006-07-15 01:52:39 +0000243
244 STATE _state;
245
246 bool _idle;
247};
248
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000249void ThreadManager::Impl::addThread(size_t value) {
Marc Slemko66949872006-07-15 01:52:39 +0000250
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000251 std::set<Thread*> newThreads;
Marc Slemko66949872006-07-15 01:52:39 +0000252
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000253 for(size_t ix = 0; ix < value; ix++) {
Marc Slemko66949872006-07-15 01:52:39 +0000254
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000255 class ThreadManager::Worker;
256
257 ThreadManager::Worker* worker = new ThreadManager::Worker(this);
Marc Slemko66949872006-07-15 01:52:39 +0000258
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000259 newThreads.insert(_threadFactory->newThread(worker));
260 }
Marc Slemko66949872006-07-15 01:52:39 +0000261
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000262 for(std::set<Thread*>::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) {
Marc Slemko66949872006-07-15 01:52:39 +0000263
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000264 (*ix)->start();
265 }
266 for(std::set<Thread*>::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) {
Marc Slemko66949872006-07-15 01:52:39 +0000267
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000268 (*ix)->start();
269 }
270
271 {Synchronized s(_monitor);
272
273 _workers.insert(newThreads.begin(), newThreads.end());
274 }
Marc Slemko66949872006-07-15 01:52:39 +0000275 }
276
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000277void ThreadManager::Impl::removeThread(size_t value) {
Marc Slemko66949872006-07-15 01:52:39 +0000278
279 std::set<Thread*> removedThreads;
280
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000281 {Synchronized s(_monitor);
Marc Slemko66949872006-07-15 01:52:39 +0000282
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000283 /* Overly clever loop
284
285 First time through, (idleOnly == 1) just look for idle threads. If that didn't find enough, go through again (idleOnly == 0)
286 and remove a sufficient number of busy threads. */
Marc Slemko66949872006-07-15 01:52:39 +0000287
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000288 for(int idleOnly = 1; idleOnly <= 0; idleOnly--) {
Marc Slemko66949872006-07-15 01:52:39 +0000289
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000290 for(std::set<Thread*>::iterator workerThread = _workers.begin(); (workerThread != _workers.end()) && (removedThreads.size() < value); workerThread++) {
Marc Slemko66949872006-07-15 01:52:39 +0000291
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000292 Worker* worker = (Worker*)(*workerThread)->runnable();
Marc Slemko66949872006-07-15 01:52:39 +0000293
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000294 if(worker->_idle || !idleOnly) {
Marc Slemko66949872006-07-15 01:52:39 +0000295
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000296 removedThreads.insert(*workerThread);
Marc Slemko66949872006-07-15 01:52:39 +0000297
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000298 _workers.erase(workerThread);
299 }
Marc Slemko66949872006-07-15 01:52:39 +0000300 }
301 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000302
303 _monitor.notifyAll();
Marc Slemko66949872006-07-15 01:52:39 +0000304 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000305
Marc Slemko66949872006-07-15 01:52:39 +0000306
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000307 // Join removed threads and free worker
308
309 for(std::set<Thread*>::iterator workerThread = removedThreads.begin(); workerThread != removedThreads.end(); workerThread++) {
310
311 Worker* worker = (Worker*)(*workerThread)->runnable();
312
313 (*workerThread)->join();
314
315 delete worker;
316 }
317 }
318
319void ThreadManager::Impl::add(Runnable* value) {
320
321 Synchronized s(_monitor);
322
323 _tasks.push(new ThreadManager::Task(value));
324
325 /* If queue is empty notify a thread, otherwise all worker threads are running and will get around to this
326 task in time. */
327
328 if(_tasks.size() == 1) {
329
330 assert(_idleCount == _workers.size());
331
332 _monitor.notify();
333 }
Marc Slemko66949872006-07-15 01:52:39 +0000334 }
335
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000336void ThreadManager::Impl::remove(Runnable* task) {
Marc Slemko66949872006-07-15 01:52:39 +0000337
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000338 Synchronized s(_monitor);
Marc Slemko66949872006-07-15 01:52:39 +0000339 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000340
341ThreadManager* ThreadManager::newThreadManager(size_t lowWatermark, size_t highWatermark) {
342 return new ThreadManager::Impl(lowWatermark, highWatermark);
Marc Slemko66949872006-07-15 01:52:39 +0000343}
344
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000345/** Basic Pool Policy Implementation */
Marc Slemko66949872006-07-15 01:52:39 +0000346
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000347class BasicPoolPolicy::Impl : public PoolPolicy {
Marc Slemko66949872006-07-15 01:52:39 +0000348
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000349 public:
Marc Slemko66949872006-07-15 01:52:39 +0000350
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000351 Impl() {}
Marc Slemko66949872006-07-15 01:52:39 +0000352
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000353 ~Impl() {}
Marc Slemko66949872006-07-15 01:52:39 +0000354
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000355 void onEmpty(ThreadManager* source) const {}
Marc Slemko66949872006-07-15 01:52:39 +0000356
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000357 void onLowWatermark(ThreadManager* source) const {}
Marc Slemko66949872006-07-15 01:52:39 +0000358
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000359 void onHighWatermark(ThreadManager* source) const {}
360};
Marc Slemko66949872006-07-15 01:52:39 +0000361
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000362BasicPoolPolicy::BasicPoolPolicy() : _impl(new BasicPoolPolicy::Impl()) {}
Marc Slemko66949872006-07-15 01:52:39 +0000363
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000364BasicPoolPolicy::~BasicPoolPolicy() { delete _impl;}
Marc Slemko66949872006-07-15 01:52:39 +0000365
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000366void BasicPoolPolicy::onEmpty(ThreadManager* source) const {_impl->onEmpty(source);}
Marc Slemko66949872006-07-15 01:52:39 +0000367
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000368void BasicPoolPolicy::onLowWatermark(ThreadManager* source) const {_impl->onLowWatermark(source);}
Marc Slemko66949872006-07-15 01:52:39 +0000369
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000370void BasicPoolPolicy::onHighWatermark(ThreadManager* source) const {_impl->onHighWatermark(source);}
Marc Slemko66949872006-07-15 01:52:39 +0000371
Marc Slemko66949872006-07-15 01:52:39 +0000372
373}}} // facebook::thrift::concurrency
374