Marc Slemko | 6694987 | 2006-07-15 01:52:39 +0000 | [diff] [blame] | 1 | #include "ThreadManager.h" |
Marc Slemko | d466b21 | 2006-07-20 00:04:18 +0000 | [diff] [blame] | 2 | #include "Exception.h" |
Marc Slemko | 0e53ccd | 2006-07-17 23:51:05 +0000 | [diff] [blame] | 3 | #include "Monitor.h" |
Marc Slemko | 6694987 | 2006-07-15 01:52:39 +0000 | [diff] [blame] | 4 | |
Marc Slemko | 6f038a7 | 2006-08-03 18:58:09 +0000 | [diff] [blame] | 5 | #include <boost/shared_ptr.hpp> |
| 6 | |
Marc Slemko | 6694987 | 2006-07-15 01:52:39 +0000 | [diff] [blame] | 7 | #include <assert.h> |
Marc Slemko | 0e53ccd | 2006-07-17 23:51:05 +0000 | [diff] [blame] | 8 | #include <queue> |
| 9 | #include <set> |
Marc Slemko | 6694987 | 2006-07-15 01:52:39 +0000 | [diff] [blame] | 10 | |
Marc Slemko | 6f038a7 | 2006-08-03 18:58:09 +0000 | [diff] [blame] | 11 | #if defined(DEBUG) |
| 12 | #include <iostream> |
| 13 | #endif //defined(DEBUG) |
| 14 | |
Marc Slemko | 6694987 | 2006-07-15 01:52:39 +0000 | [diff] [blame] | 15 | namespace facebook { namespace thrift { namespace concurrency { |
| 16 | |
Marc Slemko | 6f038a7 | 2006-08-03 18:58:09 +0000 | [diff] [blame] | 17 | using namespace boost; |
| 18 | |
Marc Slemko | fe5ba12e | 2006-07-20 21:16:27 +0000 | [diff] [blame] | 19 | |
Mark Slee | f5f2be4 | 2006-09-05 21:05:31 +0000 | [diff] [blame] | 20 | /** |
| 21 | * ThreadManager class |
| 22 | * |
| 23 | * This class manages a pool of threads. It uses a ThreadFactory to create |
| 24 | * threads. It never actually creates or destroys worker threads, rather |
| 25 | * it maintains statistics on number of idle threads, number of active threads, |
| 26 | * task backlog, and average wait and service times. |
| 27 | * |
| 28 | * @author marc |
| 29 | * @version $Id:$ |
| 30 | */ |
Marc Slemko | 0e53ccd | 2006-07-17 23:51:05 +0000 | [diff] [blame] | 31 | class ThreadManager::Impl : public ThreadManager { |
| 32 | |
| 33 | public: |
Marc Slemko | 6f038a7 | 2006-08-03 18:58:09 +0000 | [diff] [blame] | 34 | Impl() : |
| 35 | _workerCount(0), |
| 36 | _workerMaxCount(0), |
| 37 | _idleCount(0), |
Mark Slee | f5f2be4 | 2006-09-05 21:05:31 +0000 | [diff] [blame] | 38 | _state(ThreadManager::UNINITIALIZED) {} |
Marc Slemko | d466b21 | 2006-07-20 00:04:18 +0000 | [diff] [blame] | 39 | |
Mark Slee | f5f2be4 | 2006-09-05 21:05:31 +0000 | [diff] [blame] | 40 | ~Impl() { stop(); } |
Marc Slemko | d466b21 | 2006-07-20 00:04:18 +0000 | [diff] [blame] | 41 | |
Marc Slemko | fe5ba12e | 2006-07-20 21:16:27 +0000 | [diff] [blame] | 42 | void start(); |
Marc Slemko | 0e53ccd | 2006-07-17 23:51:05 +0000 | [diff] [blame] | 43 | |
Marc Slemko | d466b21 | 2006-07-20 00:04:18 +0000 | [diff] [blame] | 44 | void stop(); |
Marc Slemko | 0e53ccd | 2006-07-17 23:51:05 +0000 | [diff] [blame] | 45 | |
Mark Slee | f5f2be4 | 2006-09-05 21:05:31 +0000 | [diff] [blame] | 46 | const ThreadManager::STATE state() const { return _state; } |
Marc Slemko | fe5ba12e | 2006-07-20 21:16:27 +0000 | [diff] [blame] | 47 | |
Marc Slemko | 6f038a7 | 2006-08-03 18:58:09 +0000 | [diff] [blame] | 48 | shared_ptr<ThreadFactory> threadFactory() const { |
Marc Slemko | 0e53ccd | 2006-07-17 23:51:05 +0000 | [diff] [blame] | 49 | Synchronized s(_monitor); |
Marc Slemko | 0e53ccd | 2006-07-17 23:51:05 +0000 | [diff] [blame] | 50 | return _threadFactory; |
| 51 | } |
| 52 | |
Mark Slee | f5f2be4 | 2006-09-05 21:05:31 +0000 | [diff] [blame] | 53 | void threadFactory(shared_ptr<ThreadFactory> value) { |
| 54 | Synchronized s(_monitor); |
Marc Slemko | 0e53ccd | 2006-07-17 23:51:05 +0000 | [diff] [blame] | 55 | _threadFactory = value; |
| 56 | } |
| 57 | |
Marc Slemko | d466b21 | 2006-07-20 00:04:18 +0000 | [diff] [blame] | 58 | void addWorker(size_t value); |
Marc Slemko | 0e53ccd | 2006-07-17 23:51:05 +0000 | [diff] [blame] | 59 | |
Marc Slemko | d466b21 | 2006-07-20 00:04:18 +0000 | [diff] [blame] | 60 | void removeWorker(size_t value); |
Marc Slemko | 0e53ccd | 2006-07-17 23:51:05 +0000 | [diff] [blame] | 61 | |
Mark Slee | f5f2be4 | 2006-09-05 21:05:31 +0000 | [diff] [blame] | 62 | size_t idleWorkerCount() const { return _idleCount; } |
Marc Slemko | 0e53ccd | 2006-07-17 23:51:05 +0000 | [diff] [blame] | 63 | |
| 64 | size_t workerCount() const { |
Marc Slemko | 0e53ccd | 2006-07-17 23:51:05 +0000 | [diff] [blame] | 65 | Synchronized s(_monitor); |
Marc Slemko | d466b21 | 2006-07-20 00:04:18 +0000 | [diff] [blame] | 66 | return _workerCount; |
Marc Slemko | 0e53ccd | 2006-07-17 23:51:05 +0000 | [diff] [blame] | 67 | } |
| 68 | |
| 69 | size_t pendingTaskCount() const { |
Marc Slemko | 0e53ccd | 2006-07-17 23:51:05 +0000 | [diff] [blame] | 70 | Synchronized s(_monitor); |
Marc Slemko | 0e53ccd | 2006-07-17 23:51:05 +0000 | [diff] [blame] | 71 | return _tasks.size(); |
| 72 | } |
| 73 | |
| 74 | size_t totalTaskCount() const { |
Mark Slee | f5f2be4 | 2006-09-05 21:05:31 +0000 | [diff] [blame] | 75 | Synchronized s(_monitor); |
Marc Slemko | d466b21 | 2006-07-20 00:04:18 +0000 | [diff] [blame] | 76 | return _tasks.size() + _workerCount - _idleCount; |
Marc Slemko | 0e53ccd | 2006-07-17 23:51:05 +0000 | [diff] [blame] | 77 | } |
| 78 | |
Marc Slemko | 6f038a7 | 2006-08-03 18:58:09 +0000 | [diff] [blame] | 79 | void add(shared_ptr<Runnable> value); |
Marc Slemko | 0e53ccd | 2006-07-17 23:51:05 +0000 | [diff] [blame] | 80 | |
Marc Slemko | 6f038a7 | 2006-08-03 18:58:09 +0000 | [diff] [blame] | 81 | void remove(shared_ptr<Runnable> task); |
Marc Slemko | 0e53ccd | 2006-07-17 23:51:05 +0000 | [diff] [blame] | 82 | |
| 83 | private: |
Marc Slemko | d466b21 | 2006-07-20 00:04:18 +0000 | [diff] [blame] | 84 | size_t _workerCount; |
Marc Slemko | fe5ba12e | 2006-07-20 21:16:27 +0000 | [diff] [blame] | 85 | size_t _workerMaxCount; |
Marc Slemko | 0e53ccd | 2006-07-17 23:51:05 +0000 | [diff] [blame] | 86 | size_t _idleCount; |
Marc Slemko | fe5ba12e | 2006-07-20 21:16:27 +0000 | [diff] [blame] | 87 | ThreadManager::STATE _state; |
Marc Slemko | 6f038a7 | 2006-08-03 18:58:09 +0000 | [diff] [blame] | 88 | shared_ptr<ThreadFactory> _threadFactory; |
Marc Slemko | 0e53ccd | 2006-07-17 23:51:05 +0000 | [diff] [blame] | 89 | |
Mark Slee | f5f2be4 | 2006-09-05 21:05:31 +0000 | [diff] [blame] | 90 | |
Marc Slemko | 0e53ccd | 2006-07-17 23:51:05 +0000 | [diff] [blame] | 91 | friend class ThreadManager::Task; |
Marc Slemko | 6f038a7 | 2006-08-03 18:58:09 +0000 | [diff] [blame] | 92 | std::queue<shared_ptr<Task> > _tasks; |
Marc Slemko | 0e53ccd | 2006-07-17 23:51:05 +0000 | [diff] [blame] | 93 | Monitor _monitor; |
Marc Slemko | fe5ba12e | 2006-07-20 21:16:27 +0000 | [diff] [blame] | 94 | Monitor _workerMonitor; |
| 95 | |
Marc Slemko | 0e53ccd | 2006-07-17 23:51:05 +0000 | [diff] [blame] | 96 | friend class ThreadManager::Worker; |
Marc Slemko | 6f038a7 | 2006-08-03 18:58:09 +0000 | [diff] [blame] | 97 | std::set<shared_ptr<Thread> > _workers; |
Marc Slemko | 6f038a7 | 2006-08-03 18:58:09 +0000 | [diff] [blame] | 98 | std::set<shared_ptr<Thread> > _deadWorkers; |
Marc Slemko | 0e53ccd | 2006-07-17 23:51:05 +0000 | [diff] [blame] | 99 | }; |
Marc Slemko | 6694987 | 2006-07-15 01:52:39 +0000 | [diff] [blame] | 100 | |
| 101 | class ThreadManager::Task : public Runnable { |
| 102 | |
Mark Slee | f5f2be4 | 2006-09-05 21:05:31 +0000 | [diff] [blame] | 103 | public: |
Marc Slemko | 6694987 | 2006-07-15 01:52:39 +0000 | [diff] [blame] | 104 | enum STATE { |
| 105 | WAITING, |
| 106 | EXECUTING, |
| 107 | CANCELLED, |
| 108 | COMPLETE |
| 109 | }; |
| 110 | |
Marc Slemko | 6f038a7 | 2006-08-03 18:58:09 +0000 | [diff] [blame] | 111 | Task(shared_ptr<Runnable> runnable) : |
Marc Slemko | 6694987 | 2006-07-15 01:52:39 +0000 | [diff] [blame] | 112 | _runnable(runnable), |
Mark Slee | f5f2be4 | 2006-09-05 21:05:31 +0000 | [diff] [blame] | 113 | _state(WAITING) {} |
Marc Slemko | 6694987 | 2006-07-15 01:52:39 +0000 | [diff] [blame] | 114 | |
Mark Slee | f5f2be4 | 2006-09-05 21:05:31 +0000 | [diff] [blame] | 115 | ~Task() {} |
Marc Slemko | 6694987 | 2006-07-15 01:52:39 +0000 | [diff] [blame] | 116 | |
| 117 | void run() { |
Mark Slee | f5f2be4 | 2006-09-05 21:05:31 +0000 | [diff] [blame] | 118 | if (_state == EXECUTING) { |
Marc Slemko | 6694987 | 2006-07-15 01:52:39 +0000 | [diff] [blame] | 119 | _runnable->run(); |
| 120 | _state = COMPLETE; |
| 121 | } |
| 122 | } |
| 123 | |
| 124 | private: |
Marc Slemko | 6f038a7 | 2006-08-03 18:58:09 +0000 | [diff] [blame] | 125 | shared_ptr<Runnable> _runnable; |
Marc Slemko | d466b21 | 2006-07-20 00:04:18 +0000 | [diff] [blame] | 126 | friend class ThreadManager::Worker; |
Marc Slemko | 6694987 | 2006-07-15 01:52:39 +0000 | [diff] [blame] | 127 | STATE _state; |
| 128 | }; |
| 129 | |
| 130 | class ThreadManager::Worker: public Runnable { |
Marc Slemko | 6694987 | 2006-07-15 01:52:39 +0000 | [diff] [blame] | 131 | enum STATE { |
| 132 | UNINITIALIZED, |
| 133 | STARTING, |
| 134 | STARTED, |
| 135 | STOPPING, |
| 136 | STOPPED |
| 137 | }; |
| 138 | |
| 139 | public: |
Marc Slemko | 0e53ccd | 2006-07-17 23:51:05 +0000 | [diff] [blame] | 140 | Worker(ThreadManager::Impl* manager) : |
Marc Slemko | 6694987 | 2006-07-15 01:52:39 +0000 | [diff] [blame] | 141 | _manager(manager), |
| 142 | _state(UNINITIALIZED), |
Mark Slee | f5f2be4 | 2006-09-05 21:05:31 +0000 | [diff] [blame] | 143 | _idle(false) {} |
Marc Slemko | 6694987 | 2006-07-15 01:52:39 +0000 | [diff] [blame] | 144 | |
| 145 | ~Worker() {} |
| 146 | |
Mark Slee | f5f2be4 | 2006-09-05 21:05:31 +0000 | [diff] [blame] | 147 | bool isActive() const { return _manager->_workerCount <= _manager->_workerMaxCount; } |
Marc Slemko | fe5ba12e | 2006-07-20 21:16:27 +0000 | [diff] [blame] | 148 | |
Mark Slee | f5f2be4 | 2006-09-05 21:05:31 +0000 | [diff] [blame] | 149 | /** |
| 150 | * Worker entry point |
| 151 | * |
| 152 | * As long as worker thread is running, pull tasks off the task queue and |
| 153 | * execute. |
| 154 | */ |
Marc Slemko | 6694987 | 2006-07-15 01:52:39 +0000 | [diff] [blame] | 155 | void run() { |
Mark Slee | f5f2be4 | 2006-09-05 21:05:31 +0000 | [diff] [blame] | 156 | bool active = false; |
Marc Slemko | fe5ba12e | 2006-07-20 21:16:27 +0000 | [diff] [blame] | 157 | bool notifyManager = false; |
| 158 | |
Mark Slee | f5f2be4 | 2006-09-05 21:05:31 +0000 | [diff] [blame] | 159 | /** |
| 160 | * Increment worker semaphore and notify manager if worker count reached |
| 161 | * desired max |
| 162 | * |
| 163 | * Note: We have to release the monitor and acquire the workerMonitor |
| 164 | * since that is what the manager blocks on for worker add/remove |
| 165 | */ |
| 166 | { |
| 167 | Synchronized s(_manager->_monitor); |
Marc Slemko | fe5ba12e | 2006-07-20 21:16:27 +0000 | [diff] [blame] | 168 | active = _manager->_workerCount < _manager->_workerMaxCount; |
Mark Slee | f5f2be4 | 2006-09-05 21:05:31 +0000 | [diff] [blame] | 169 | if (active) { |
Marc Slemko | fe5ba12e | 2006-07-20 21:16:27 +0000 | [diff] [blame] | 170 | _manager->_workerCount++; |
Marc Slemko | fe5ba12e | 2006-07-20 21:16:27 +0000 | [diff] [blame] | 171 | notifyManager = _manager->_workerCount == _manager->_workerMaxCount; |
Marc Slemko | 6694987 | 2006-07-15 01:52:39 +0000 | [diff] [blame] | 172 | } |
| 173 | } |
| 174 | |
Mark Slee | f5f2be4 | 2006-09-05 21:05:31 +0000 | [diff] [blame] | 175 | if (notifyManager) { |
Marc Slemko | fe5ba12e | 2006-07-20 21:16:27 +0000 | [diff] [blame] | 176 | Synchronized s(_manager->_workerMonitor); |
Marc Slemko | fe5ba12e | 2006-07-20 21:16:27 +0000 | [diff] [blame] | 177 | _manager->_workerMonitor.notify(); |
Marc Slemko | fe5ba12e | 2006-07-20 21:16:27 +0000 | [diff] [blame] | 178 | notifyManager = false; |
| 179 | } |
| 180 | |
Mark Slee | f5f2be4 | 2006-09-05 21:05:31 +0000 | [diff] [blame] | 181 | while (active) { |
Marc Slemko | 6f038a7 | 2006-08-03 18:58:09 +0000 | [diff] [blame] | 182 | shared_ptr<ThreadManager::Task> task; |
Marc Slemko | 6694987 | 2006-07-15 01:52:39 +0000 | [diff] [blame] | 183 | |
Mark Slee | f5f2be4 | 2006-09-05 21:05:31 +0000 | [diff] [blame] | 184 | /** |
| 185 | * While holding manager monitor block for non-empty task queue (Also |
| 186 | * check that the thread hasn't been requested to stop). Once the queue |
| 187 | * is non-empty, dequeue a task, release monitor, and execute. If the |
| 188 | * worker max count has been decremented such that we exceed it, mark |
| 189 | * ourself inactive, decrement the worker count and notify the manager |
| 190 | * (technically we're notifying the next blocked thread but eventually |
| 191 | * the manager will see it. |
| 192 | */ |
| 193 | { |
| 194 | Synchronized s(_manager->_monitor); |
Marc Slemko | fe5ba12e | 2006-07-20 21:16:27 +0000 | [diff] [blame] | 195 | active = isActive(); |
Mark Slee | f5f2be4 | 2006-09-05 21:05:31 +0000 | [diff] [blame] | 196 | while (active && _manager->_tasks.empty()) { |
| 197 | _manager->_idleCount++; |
Marc Slemko | 6694987 | 2006-07-15 01:52:39 +0000 | [diff] [blame] | 198 | _idle = true; |
Mark Slee | f5f2be4 | 2006-09-05 21:05:31 +0000 | [diff] [blame] | 199 | _manager->_monitor.wait(); |
| 200 | active = isActive(); |
Marc Slemko | 6694987 | 2006-07-15 01:52:39 +0000 | [diff] [blame] | 201 | _idle = false; |
Mark Slee | f5f2be4 | 2006-09-05 21:05:31 +0000 | [diff] [blame] | 202 | _manager->_idleCount--; |
Marc Slemko | 6694987 | 2006-07-15 01:52:39 +0000 | [diff] [blame] | 203 | } |
Marc Slemko | fe5ba12e | 2006-07-20 21:16:27 +0000 | [diff] [blame] | 204 | |
Mark Slee | f5f2be4 | 2006-09-05 21:05:31 +0000 | [diff] [blame] | 205 | if (active) { |
| 206 | if (!_manager->_tasks.empty()) { |
| 207 | task = _manager->_tasks.front(); |
| 208 | _manager->_tasks.pop(); |
| 209 | if (task->_state == ThreadManager::Task::WAITING) { |
| 210 | task->_state = ThreadManager::Task::EXECUTING; |
Marc Slemko | d466b21 | 2006-07-20 00:04:18 +0000 | [diff] [blame] | 211 | } |
| 212 | } |
Marc Slemko | fe5ba12e | 2006-07-20 21:16:27 +0000 | [diff] [blame] | 213 | } else { |
Mark Slee | f5f2be4 | 2006-09-05 21:05:31 +0000 | [diff] [blame] | 214 | _idle = true; |
Marc Slemko | fe5ba12e | 2006-07-20 21:16:27 +0000 | [diff] [blame] | 215 | _manager->_workerCount--; |
Mark Slee | f5f2be4 | 2006-09-05 21:05:31 +0000 | [diff] [blame] | 216 | notifyManager = _manager->_workerCount == _manager->_workerMaxCount; |
Marc Slemko | 6694987 | 2006-07-15 01:52:39 +0000 | [diff] [blame] | 217 | } |
| 218 | } |
Mark Slee | f5f2be4 | 2006-09-05 21:05:31 +0000 | [diff] [blame] | 219 | |
| 220 | if (task != NULL) { |
| 221 | if (task->_state == ThreadManager::Task::EXECUTING) { |
Marc Slemko | d466b21 | 2006-07-20 00:04:18 +0000 | [diff] [blame] | 222 | try { |
Mark Slee | f5f2be4 | 2006-09-05 21:05:31 +0000 | [diff] [blame] | 223 | task->run(); |
| 224 | } catch(...) { |
| 225 | // XXX need to log this |
Marc Slemko | d466b21 | 2006-07-20 00:04:18 +0000 | [diff] [blame] | 226 | } |
Marc Slemko | d466b21 | 2006-07-20 00:04:18 +0000 | [diff] [blame] | 227 | } |
Marc Slemko | 6694987 | 2006-07-15 01:52:39 +0000 | [diff] [blame] | 228 | } |
Marc Slemko | fe5ba12e | 2006-07-20 21:16:27 +0000 | [diff] [blame] | 229 | } |
Mark Slee | f5f2be4 | 2006-09-05 21:05:31 +0000 | [diff] [blame] | 230 | |
| 231 | { |
| 232 | Synchronized s(_manager->_workerMonitor); |
Marc Slemko | fe5ba12e | 2006-07-20 21:16:27 +0000 | [diff] [blame] | 233 | _manager->_deadWorkers.insert(this->thread()); |
Mark Slee | f5f2be4 | 2006-09-05 21:05:31 +0000 | [diff] [blame] | 234 | if (notifyManager) { |
| 235 | _manager->_workerMonitor.notify(); |
Marc Slemko | 6694987 | 2006-07-15 01:52:39 +0000 | [diff] [blame] | 236 | } |
| 237 | } |
Mark Slee | f5f2be4 | 2006-09-05 21:05:31 +0000 | [diff] [blame] | 238 | |
Marc Slemko | 6694987 | 2006-07-15 01:52:39 +0000 | [diff] [blame] | 239 | return; |
| 240 | } |
Mark Slee | f5f2be4 | 2006-09-05 21:05:31 +0000 | [diff] [blame] | 241 | |
| 242 | private: |
| 243 | ThreadManager::Impl* _manager; |
| 244 | friend class ThreadManager::Impl; |
| 245 | STATE _state; |
| 246 | bool _idle; |
Marc Slemko | 6694987 | 2006-07-15 01:52:39 +0000 | [diff] [blame] | 247 | }; |
| 248 | |
Mark Slee | f5f2be4 | 2006-09-05 21:05:31 +0000 | [diff] [blame] | 249 | |
| 250 | void ThreadManager::Impl::addWorker(size_t value) { |
Marc Slemko | 6f038a7 | 2006-08-03 18:58:09 +0000 | [diff] [blame] | 251 | std::set<shared_ptr<Thread> > newThreads; |
Mark Slee | f5f2be4 | 2006-09-05 21:05:31 +0000 | [diff] [blame] | 252 | for (size_t ix = 0; ix < value; ix++) { |
| 253 | class ThreadManager::Worker; |
Marc Slemko | 6f038a7 | 2006-08-03 18:58:09 +0000 | [diff] [blame] | 254 | shared_ptr<ThreadManager::Worker> worker = shared_ptr<ThreadManager::Worker>(new ThreadManager::Worker(this)); |
Marc Slemko | d466b21 | 2006-07-20 00:04:18 +0000 | [diff] [blame] | 255 | newThreads.insert(_threadFactory->newThread(worker)); |
Marc Slemko | 6694987 | 2006-07-15 01:52:39 +0000 | [diff] [blame] | 256 | } |
Marc Slemko | fe5ba12e | 2006-07-20 21:16:27 +0000 | [diff] [blame] | 257 | |
Mark Slee | f5f2be4 | 2006-09-05 21:05:31 +0000 | [diff] [blame] | 258 | { |
| 259 | Synchronized s(_monitor); |
Marc Slemko | fe5ba12e | 2006-07-20 21:16:27 +0000 | [diff] [blame] | 260 | _workerMaxCount+= value; |
Marc Slemko | fe5ba12e | 2006-07-20 21:16:27 +0000 | [diff] [blame] | 261 | _workers.insert(newThreads.begin(), newThreads.end()); |
| 262 | } |
Marc Slemko | 6694987 | 2006-07-15 01:52:39 +0000 | [diff] [blame] | 263 | |
Mark Slee | f5f2be4 | 2006-09-05 21:05:31 +0000 | [diff] [blame] | 264 | for (std::set<shared_ptr<Thread> >::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) { |
Marc Slemko | 6f038a7 | 2006-08-03 18:58:09 +0000 | [diff] [blame] | 265 | shared_ptr<ThreadManager::Worker> worker = dynamic_pointer_cast<ThreadManager::Worker, Runnable>((*ix)->runnable()); |
Marc Slemko | d466b21 | 2006-07-20 00:04:18 +0000 | [diff] [blame] | 266 | worker->_state = ThreadManager::Worker::STARTING; |
Marc Slemko | d466b21 | 2006-07-20 00:04:18 +0000 | [diff] [blame] | 267 | (*ix)->start(); |
| 268 | } |
| 269 | |
Mark Slee | f5f2be4 | 2006-09-05 21:05:31 +0000 | [diff] [blame] | 270 | { |
| 271 | Synchronized s(_workerMonitor); |
| 272 | while (_workerCount != _workerMaxCount) { |
Marc Slemko | fe5ba12e | 2006-07-20 21:16:27 +0000 | [diff] [blame] | 273 | _workerMonitor.wait(); |
| 274 | } |
| 275 | } |
| 276 | } |
Marc Slemko | d466b21 | 2006-07-20 00:04:18 +0000 | [diff] [blame] | 277 | |
Marc Slemko | fe5ba12e | 2006-07-20 21:16:27 +0000 | [diff] [blame] | 278 | void ThreadManager::Impl::start() { |
| 279 | |
Mark Slee | f5f2be4 | 2006-09-05 21:05:31 +0000 | [diff] [blame] | 280 | if (_state == ThreadManager::STOPPED) { |
Marc Slemko | fe5ba12e | 2006-07-20 21:16:27 +0000 | [diff] [blame] | 281 | return; |
| 282 | } |
| 283 | |
Mark Slee | f5f2be4 | 2006-09-05 21:05:31 +0000 | [diff] [blame] | 284 | { |
| 285 | Synchronized s(_monitor); |
| 286 | if (_state == ThreadManager::UNINITIALIZED) { |
| 287 | if (_threadFactory == NULL) { |
| 288 | throw InvalidArgumentException(); |
| 289 | } |
Marc Slemko | fe5ba12e | 2006-07-20 21:16:27 +0000 | [diff] [blame] | 290 | _state = ThreadManager::STARTED; |
Marc Slemko | fe5ba12e | 2006-07-20 21:16:27 +0000 | [diff] [blame] | 291 | _monitor.notifyAll(); |
| 292 | } |
| 293 | |
Mark Slee | f5f2be4 | 2006-09-05 21:05:31 +0000 | [diff] [blame] | 294 | while (_state == STARTING) { |
Marc Slemko | d466b21 | 2006-07-20 00:04:18 +0000 | [diff] [blame] | 295 | _monitor.wait(); |
| 296 | } |
| 297 | } |
| 298 | } |
| 299 | |
| 300 | void ThreadManager::Impl::stop() { |
Marc Slemko | d466b21 | 2006-07-20 00:04:18 +0000 | [diff] [blame] | 301 | bool doStop = false; |
Mark Slee | f5f2be4 | 2006-09-05 21:05:31 +0000 | [diff] [blame] | 302 | if (_state == ThreadManager::STOPPED) { |
Marc Slemko | fe5ba12e | 2006-07-20 21:16:27 +0000 | [diff] [blame] | 303 | return; |
| 304 | } |
| 305 | |
Mark Slee | f5f2be4 | 2006-09-05 21:05:31 +0000 | [diff] [blame] | 306 | { |
| 307 | Synchronized s(_monitor); |
| 308 | if (!_state != ThreadManager::STOPPING && _state != ThreadManager::STOPPED) { |
Marc Slemko | d466b21 | 2006-07-20 00:04:18 +0000 | [diff] [blame] | 309 | doStop = true; |
Marc Slemko | fe5ba12e | 2006-07-20 21:16:27 +0000 | [diff] [blame] | 310 | _state = ThreadManager::STOPPING; |
Marc Slemko | d466b21 | 2006-07-20 00:04:18 +0000 | [diff] [blame] | 311 | } |
| 312 | } |
| 313 | |
Mark Slee | f5f2be4 | 2006-09-05 21:05:31 +0000 | [diff] [blame] | 314 | if (doStop) { |
Marc Slemko | d466b21 | 2006-07-20 00:04:18 +0000 | [diff] [blame] | 315 | removeWorker(_workerCount); |
Marc Slemko | fe5ba12e | 2006-07-20 21:16:27 +0000 | [diff] [blame] | 316 | _state = ThreadManager::STOPPING; |
Marc Slemko | d466b21 | 2006-07-20 00:04:18 +0000 | [diff] [blame] | 317 | } |
Marc Slemko | fe5ba12e | 2006-07-20 21:16:27 +0000 | [diff] [blame] | 318 | |
Marc Slemko | 6f038a7 | 2006-08-03 18:58:09 +0000 | [diff] [blame] | 319 | // XXX |
Mark Slee | f5f2be4 | 2006-09-05 21:05:31 +0000 | [diff] [blame] | 320 | // should be able to block here for transition to STOPPED since we're no |
| 321 | // using shared_ptrs |
Marc Slemko | d466b21 | 2006-07-20 00:04:18 +0000 | [diff] [blame] | 322 | } |
| 323 | |
| 324 | void ThreadManager::Impl::removeWorker(size_t value) { |
Marc Slemko | 6f038a7 | 2006-08-03 18:58:09 +0000 | [diff] [blame] | 325 | std::set<shared_ptr<Thread> > removedThreads; |
Mark Slee | f5f2be4 | 2006-09-05 21:05:31 +0000 | [diff] [blame] | 326 | { |
| 327 | Synchronized s(_monitor); |
| 328 | if (value > _workerMaxCount) { |
Marc Slemko | fe5ba12e | 2006-07-20 21:16:27 +0000 | [diff] [blame] | 329 | throw InvalidArgumentException(); |
| 330 | } |
| 331 | |
| 332 | _workerMaxCount-= value; |
| 333 | |
Mark Slee | f5f2be4 | 2006-09-05 21:05:31 +0000 | [diff] [blame] | 334 | if (_idleCount < value) { |
| 335 | for (size_t ix = 0; ix < _idleCount; ix++) { |
Marc Slemko | fe5ba12e | 2006-07-20 21:16:27 +0000 | [diff] [blame] | 336 | _monitor.notify(); |
Marc Slemko | 6694987 | 2006-07-15 01:52:39 +0000 | [diff] [blame] | 337 | } |
Marc Slemko | fe5ba12e | 2006-07-20 21:16:27 +0000 | [diff] [blame] | 338 | } else { |
Marc Slemko | 0e53ccd | 2006-07-17 23:51:05 +0000 | [diff] [blame] | 339 | _monitor.notifyAll(); |
Marc Slemko | 6694987 | 2006-07-15 01:52:39 +0000 | [diff] [blame] | 340 | } |
Marc Slemko | 0e53ccd | 2006-07-17 23:51:05 +0000 | [diff] [blame] | 341 | } |
Marc Slemko | fe5ba12e | 2006-07-20 21:16:27 +0000 | [diff] [blame] | 342 | |
Mark Slee | f5f2be4 | 2006-09-05 21:05:31 +0000 | [diff] [blame] | 343 | { |
| 344 | Synchronized s(_workerMonitor); |
Marc Slemko | fe5ba12e | 2006-07-20 21:16:27 +0000 | [diff] [blame] | 345 | |
Mark Slee | f5f2be4 | 2006-09-05 21:05:31 +0000 | [diff] [blame] | 346 | while (_workerCount != _workerMaxCount) { |
Marc Slemko | fe5ba12e | 2006-07-20 21:16:27 +0000 | [diff] [blame] | 347 | _workerMonitor.wait(); |
| 348 | } |
| 349 | |
Mark Slee | f5f2be4 | 2006-09-05 21:05:31 +0000 | [diff] [blame] | 350 | for (std::set<shared_ptr<Thread> >::iterator ix = _deadWorkers.begin(); ix != _deadWorkers.end(); ix++) { |
| 351 | _workers.erase(*ix); |
Marc Slemko | fe5ba12e | 2006-07-20 21:16:27 +0000 | [diff] [blame] | 352 | } |
Mark Slee | f5f2be4 | 2006-09-05 21:05:31 +0000 | [diff] [blame] | 353 | |
Marc Slemko | fe5ba12e | 2006-07-20 21:16:27 +0000 | [diff] [blame] | 354 | _deadWorkers.clear(); |
| 355 | } |
| 356 | } |
Marc Slemko | 0e53ccd | 2006-07-17 23:51:05 +0000 | [diff] [blame] | 357 | |
Marc Slemko | 6f038a7 | 2006-08-03 18:58:09 +0000 | [diff] [blame] | 358 | void ThreadManager::Impl::add(shared_ptr<Runnable> value) { |
Marc Slemko | 0e53ccd | 2006-07-17 23:51:05 +0000 | [diff] [blame] | 359 | Synchronized s(_monitor); |
| 360 | |
Mark Slee | f5f2be4 | 2006-09-05 21:05:31 +0000 | [diff] [blame] | 361 | if (_state != ThreadManager::STARTED) { |
Marc Slemko | fe5ba12e | 2006-07-20 21:16:27 +0000 | [diff] [blame] | 362 | throw IllegalStateException(); |
| 363 | } |
Marc Slemko | d466b21 | 2006-07-20 00:04:18 +0000 | [diff] [blame] | 364 | |
Marc Slemko | 6f038a7 | 2006-08-03 18:58:09 +0000 | [diff] [blame] | 365 | _tasks.push(shared_ptr<ThreadManager::Task>(new ThreadManager::Task(value))); |
Marc Slemko | 0e53ccd | 2006-07-17 23:51:05 +0000 | [diff] [blame] | 366 | |
Mark Slee | f5f2be4 | 2006-09-05 21:05:31 +0000 | [diff] [blame] | 367 | // If idle thread is available notify it, otherwise all worker threads are |
| 368 | // running and will get around to this task in time. |
| 369 | if (_idleCount > 0) { |
Marc Slemko | 0e53ccd | 2006-07-17 23:51:05 +0000 | [diff] [blame] | 370 | _monitor.notify(); |
| 371 | } |
Marc Slemko | 6694987 | 2006-07-15 01:52:39 +0000 | [diff] [blame] | 372 | } |
| 373 | |
Marc Slemko | 6f038a7 | 2006-08-03 18:58:09 +0000 | [diff] [blame] | 374 | void ThreadManager::Impl::remove(shared_ptr<Runnable> task) { |
Mark Slee | f5f2be4 | 2006-09-05 21:05:31 +0000 | [diff] [blame] | 375 | Synchronized s(_monitor); |
| 376 | if (_state != ThreadManager::STARTED) { |
| 377 | throw IllegalStateException(); |
| 378 | } |
Marc Slemko | 6694987 | 2006-07-15 01:52:39 +0000 | [diff] [blame] | 379 | } |
| 380 | |
Marc Slemko | d466b21 | 2006-07-20 00:04:18 +0000 | [diff] [blame] | 381 | class SimpleThreadManager : public ThreadManager::Impl { |
Marc Slemko | 6694987 | 2006-07-15 01:52:39 +0000 | [diff] [blame] | 382 | |
Marc Slemko | d466b21 | 2006-07-20 00:04:18 +0000 | [diff] [blame] | 383 | public: |
Marc Slemko | d466b21 | 2006-07-20 00:04:18 +0000 | [diff] [blame] | 384 | SimpleThreadManager(size_t workerCount=4) : |
| 385 | _workerCount(workerCount), |
| 386 | _firstTime(true) { |
| 387 | } |
Marc Slemko | 6694987 | 2006-07-15 01:52:39 +0000 | [diff] [blame] | 388 | |
Marc Slemko | fe5ba12e | 2006-07-20 21:16:27 +0000 | [diff] [blame] | 389 | void start() { |
| 390 | ThreadManager::Impl::start(); |
Marc Slemko | fe5ba12e | 2006-07-20 21:16:27 +0000 | [diff] [blame] | 391 | addWorker(_workerCount); |
Marc Slemko | d466b21 | 2006-07-20 00:04:18 +0000 | [diff] [blame] | 392 | } |
| 393 | |
| 394 | private: |
Marc Slemko | d466b21 | 2006-07-20 00:04:18 +0000 | [diff] [blame] | 395 | const size_t _workerCount; |
| 396 | bool _firstTime; |
| 397 | Monitor _monitor; |
Marc Slemko | 0e53ccd | 2006-07-17 23:51:05 +0000 | [diff] [blame] | 398 | }; |
Marc Slemko | 6694987 | 2006-07-15 01:52:39 +0000 | [diff] [blame] | 399 | |
Marc Slemko | 6694987 | 2006-07-15 01:52:39 +0000 | [diff] [blame] | 400 | |
Marc Slemko | 6f038a7 | 2006-08-03 18:58:09 +0000 | [diff] [blame] | 401 | shared_ptr<ThreadManager> ThreadManager::newThreadManager() { |
| 402 | return shared_ptr<ThreadManager>(new ThreadManager::Impl()); |
Marc Slemko | d466b21 | 2006-07-20 00:04:18 +0000 | [diff] [blame] | 403 | } |
Marc Slemko | 6694987 | 2006-07-15 01:52:39 +0000 | [diff] [blame] | 404 | |
Marc Slemko | 6f038a7 | 2006-08-03 18:58:09 +0000 | [diff] [blame] | 405 | shared_ptr<ThreadManager> ThreadManager::newSimpleThreadManager(size_t count) { |
| 406 | return shared_ptr<ThreadManager>(new SimpleThreadManager(count)); |
Marc Slemko | d466b21 | 2006-07-20 00:04:18 +0000 | [diff] [blame] | 407 | } |
Marc Slemko | 6694987 | 2006-07-15 01:52:39 +0000 | [diff] [blame] | 408 | |
| 409 | }}} // facebook::thrift::concurrency |