blob: 24bfeec448e77e7ca2d7b4558ad4addca9e52b5d [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
Konrad Grochowski9be4e682013-06-22 22:03:31 +020020#include <thrift/thrift-config.h>
Roger Meier12d70532011-12-14 23:35:28 +000021
Roger Meier4285ba22013-06-10 21:17:23 +020022#include <thrift/concurrency/ThreadManager.h>
23#include <thrift/concurrency/Exception.h>
24#include <thrift/concurrency/Monitor.h>
25#include <thrift/concurrency/Util.h>
Marc Slemko66949872006-07-15 01:52:39 +000026
Marc Slemko6f038a72006-08-03 18:58:09 +000027#include <boost/shared_ptr.hpp>
28
Marc Slemko66949872006-07-15 01:52:39 +000029#include <assert.h>
Marc Slemko0e53ccd2006-07-17 23:51:05 +000030#include <queue>
31#include <set>
Marc Slemko66949872006-07-15 01:52:39 +000032
Marc Slemko6f038a72006-08-03 18:58:09 +000033#if defined(DEBUG)
34#include <iostream>
Konrad Grochowski16a23a62014-11-13 15:33:38 +010035#endif // defined(DEBUG)
Marc Slemko6f038a72006-08-03 18:58:09 +000036
Konrad Grochowski16a23a62014-11-13 15:33:38 +010037namespace apache {
38namespace thrift {
39namespace concurrency {
Marc Slemko66949872006-07-15 01:52:39 +000040
David Reissd4a269c2007-08-23 02:37:19 +000041using boost::shared_ptr;
42using boost::dynamic_pointer_cast;
Marc Slemko6f038a72006-08-03 18:58:09 +000043
Mark Sleef5f2be42006-09-05 21:05:31 +000044/**
45 * ThreadManager class
Marc Slemko3a3b53b2007-05-22 23:59:54 +000046 *
Mark Sleef5f2be42006-09-05 21:05:31 +000047 * This class manages a pool of threads. It uses a ThreadFactory to create
48 * threads. It never actually creates or destroys worker threads, rather
49 * it maintains statistics on number of idle threads, number of active threads,
50 * task backlog, and average wait and service times.
51 *
Mark Sleef5f2be42006-09-05 21:05:31 +000052 * @version $Id:$
53 */
Konrad Grochowski16a23a62014-11-13 15:33:38 +010054class ThreadManager::Impl : public ThreadManager {
Marc Slemko0e53ccd2006-07-17 23:51:05 +000055
Konrad Grochowski16a23a62014-11-13 15:33:38 +010056public:
57 Impl()
58 : workerCount_(0),
59 workerMaxCount_(0),
60 idleCount_(0),
61 pendingTaskCountMax_(0),
62 expiredCount_(0),
63 state_(ThreadManager::UNINITIALIZED),
64 monitor_(&mutex_),
65 maxMonitor_(&mutex_) {}
Marc Slemkod466b212006-07-20 00:04:18 +000066
Mark Sleef5f2be42006-09-05 21:05:31 +000067 ~Impl() { stop(); }
Marc Slemkod466b212006-07-20 00:04:18 +000068
Marc Slemkofe5ba12e2006-07-20 21:16:27 +000069 void start();
Marc Slemko0e53ccd2006-07-17 23:51:05 +000070
Mark Slee7c10eaf2007-03-01 02:45:10 +000071 void stop() { stopImpl(false); }
72
73 void join() { stopImpl(true); }
Marc Slemko0e53ccd2006-07-17 23:51:05 +000074
Konrad Grochowski16a23a62014-11-13 15:33:38 +010075 ThreadManager::STATE state() const { return state_; }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +000076
Marc Slemko6f038a72006-08-03 18:58:09 +000077 shared_ptr<ThreadFactory> threadFactory() const {
Marc Slemko3a3b53b2007-05-22 23:59:54 +000078 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +000079 return threadFactory_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000080 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +000081
82 void threadFactory(shared_ptr<ThreadFactory> value) {
Mark Slee2f6404d2006-10-10 01:37:40 +000083 Synchronized s(monitor_);
84 threadFactory_ = value;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000085 }
86
Marc Slemkod466b212006-07-20 00:04:18 +000087 void addWorker(size_t value);
Marc Slemko3a3b53b2007-05-22 23:59:54 +000088
Marc Slemkod466b212006-07-20 00:04:18 +000089 void removeWorker(size_t value);
Marc Slemko3a3b53b2007-05-22 23:59:54 +000090
Konrad Grochowski16a23a62014-11-13 15:33:38 +010091 size_t idleWorkerCount() const { return idleCount_; }
Marc Slemko0e53ccd2006-07-17 23:51:05 +000092
93 size_t workerCount() const {
Marc Slemko3a3b53b2007-05-22 23:59:54 +000094 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +000095 return workerCount_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000096 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +000097
Marc Slemko0e53ccd2006-07-17 23:51:05 +000098 size_t pendingTaskCount() const {
Marc Slemko3a3b53b2007-05-22 23:59:54 +000099 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000100 return tasks_.size();
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000101 }
102
103 size_t totalTaskCount() const {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000104 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000105 return tasks_.size() + workerCount_ - idleCount_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000106 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000107
108 size_t pendingTaskCountMax() const {
109 Synchronized s(monitor_);
110 return pendingTaskCountMax_;
111 }
112
David Reiss068f4162010-03-09 05:19:45 +0000113 size_t expiredTaskCount() {
114 Synchronized s(monitor_);
115 size_t result = expiredCount_;
116 expiredCount_ = 0;
117 return result;
118 }
119
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000120 void pendingTaskCountMax(const size_t value) {
121 Synchronized s(monitor_);
122 pendingTaskCountMax_ = value;
123 }
124
125 bool canSleep();
126
David Reiss068f4162010-03-09 05:19:45 +0000127 void add(shared_ptr<Runnable> value, int64_t timeout, int64_t expiration);
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000128
Marc Slemko6f038a72006-08-03 18:58:09 +0000129 void remove(shared_ptr<Runnable> task);
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000130
David Reiss01fe1532010-03-09 05:19:25 +0000131 shared_ptr<Runnable> removeNextPending();
132
David Reiss068f4162010-03-09 05:19:45 +0000133 void removeExpiredTasks();
134
135 void setExpireCallback(ExpireCallback expireCallback);
136
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000137private:
Mark Slee7c10eaf2007-03-01 02:45:10 +0000138 void stopImpl(bool join);
139
Mark Slee2f6404d2006-10-10 01:37:40 +0000140 size_t workerCount_;
141 size_t workerMaxCount_;
142 size_t idleCount_;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000143 size_t pendingTaskCountMax_;
David Reiss068f4162010-03-09 05:19:45 +0000144 size_t expiredCount_;
145 ExpireCallback expireCallback_;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000146
Mark Slee2f6404d2006-10-10 01:37:40 +0000147 ThreadManager::STATE state_;
148 shared_ptr<ThreadFactory> threadFactory_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000149
150 friend class ThreadManager::Task;
Mark Slee2f6404d2006-10-10 01:37:40 +0000151 std::queue<shared_ptr<Task> > tasks_;
David Reissa0dbfef2010-03-09 05:19:32 +0000152 Mutex mutex_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000153 Monitor monitor_;
David Reissa0dbfef2010-03-09 05:19:32 +0000154 Monitor maxMonitor_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000155 Monitor workerMonitor_;
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000156
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000157 friend class ThreadManager::Worker;
Mark Slee2f6404d2006-10-10 01:37:40 +0000158 std::set<shared_ptr<Thread> > workers_;
159 std::set<shared_ptr<Thread> > deadWorkers_;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000160 std::map<const Thread::id_t, shared_ptr<Thread> > idMap_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000161};
Marc Slemko66949872006-07-15 01:52:39 +0000162
163class ThreadManager::Task : public Runnable {
164
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100165public:
166 enum STATE { WAITING, EXECUTING, CANCELLED, COMPLETE };
Marc Slemko66949872006-07-15 01:52:39 +0000167
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100168 Task(shared_ptr<Runnable> runnable, int64_t expiration = 0LL)
169 : runnable_(runnable),
170 state_(WAITING),
171 expireTime_(expiration != 0LL ? Util::currentTime() + expiration : 0LL) {}
Marc Slemko66949872006-07-15 01:52:39 +0000172
Mark Sleef5f2be42006-09-05 21:05:31 +0000173 ~Task() {}
Marc Slemko66949872006-07-15 01:52:39 +0000174
175 void run() {
Mark Slee2f6404d2006-10-10 01:37:40 +0000176 if (state_ == EXECUTING) {
177 runnable_->run();
178 state_ = COMPLETE;
Marc Slemko66949872006-07-15 01:52:39 +0000179 }
180 }
181
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100182 shared_ptr<Runnable> getRunnable() { return runnable_; }
David Reiss01fe1532010-03-09 05:19:25 +0000183
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100184 int64_t getExpireTime() const { return expireTime_; }
David Reiss068f4162010-03-09 05:19:45 +0000185
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100186private:
Mark Slee2f6404d2006-10-10 01:37:40 +0000187 shared_ptr<Runnable> runnable_;
Marc Slemkod466b212006-07-20 00:04:18 +0000188 friend class ThreadManager::Worker;
Mark Slee2f6404d2006-10-10 01:37:40 +0000189 STATE state_;
David Reiss068f4162010-03-09 05:19:45 +0000190 int64_t expireTime_;
Marc Slemko66949872006-07-15 01:52:39 +0000191};
192
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100193class ThreadManager::Worker : public Runnable {
194 enum STATE { UNINITIALIZED, STARTING, STARTED, STOPPING, STOPPED };
Marc Slemko66949872006-07-15 01:52:39 +0000195
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100196public:
197 Worker(ThreadManager::Impl* manager) : manager_(manager), state_(UNINITIALIZED), idle_(false) {}
Marc Slemko66949872006-07-15 01:52:39 +0000198
199 ~Worker() {}
200
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100201private:
Mark Slee2f6404d2006-10-10 01:37:40 +0000202 bool isActive() const {
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100203 return (manager_->workerCount_ <= manager_->workerMaxCount_)
204 || (manager_->state_ == JOINING && !manager_->tasks_.empty());
Mark Slee2f6404d2006-10-10 01:37:40 +0000205 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000206
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100207public:
Mark Sleef5f2be42006-09-05 21:05:31 +0000208 /**
209 * Worker entry point
210 *
211 * As long as worker thread is running, pull tasks off the task queue and
212 * execute.
213 */
Marc Slemko66949872006-07-15 01:52:39 +0000214 void run() {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000215 bool active = false;
Mark Sleef5f2be42006-09-05 21:05:31 +0000216 /**
217 * Increment worker semaphore and notify manager if worker count reached
218 * desired max
219 *
220 * Note: We have to release the monitor and acquire the workerMonitor
221 * since that is what the manager blocks on for worker add/remove
222 */
223 {
Jim King61b17082016-04-19 15:57:31 -0400224 bool notifyManager = false;
225 {
226 Synchronized s(manager_->monitor_);
227 active = manager_->workerCount_ < manager_->workerMaxCount_;
228 if (active) {
229 manager_->workerCount_++;
230 notifyManager = manager_->workerCount_ == manager_->workerMaxCount_;
231 }
Marc Slemko66949872006-07-15 01:52:39 +0000232 }
Marc Slemko66949872006-07-15 01:52:39 +0000233
Jim King61b17082016-04-19 15:57:31 -0400234 if (notifyManager) {
235 Synchronized s(manager_->workerMonitor_);
236 manager_->workerMonitor_.notify();
237 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000238 }
239
Mark Sleef5f2be42006-09-05 21:05:31 +0000240 while (active) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000241 shared_ptr<ThreadManager::Task> task;
Marc Slemko66949872006-07-15 01:52:39 +0000242
Mark Sleef5f2be42006-09-05 21:05:31 +0000243 /**
244 * While holding manager monitor block for non-empty task queue (Also
245 * check that the thread hasn't been requested to stop). Once the queue
246 * is non-empty, dequeue a task, release monitor, and execute. If the
247 * worker max count has been decremented such that we exceed it, mark
248 * ourself inactive, decrement the worker count and notify the manager
249 * (technically we're notifying the next blocked thread but eventually
250 * the manager will see it.
251 */
252 {
David Reissa0dbfef2010-03-09 05:19:32 +0000253 Guard g(manager_->mutex_);
David Reiss96d23882007-07-26 21:10:32 +0000254 active = isActive();
Mark Slee7c10eaf2007-03-01 02:45:10 +0000255
David Reiss96d23882007-07-26 21:10:32 +0000256 while (active && manager_->tasks_.empty()) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000257 manager_->idleCount_++;
Mark Slee7c10eaf2007-03-01 02:45:10 +0000258 idle_ = true;
Mark Slee2f6404d2006-10-10 01:37:40 +0000259 manager_->monitor_.wait();
Mark Sleef5f2be42006-09-05 21:05:31 +0000260 active = isActive();
Mark Slee7c10eaf2007-03-01 02:45:10 +0000261 idle_ = false;
Mark Slee2f6404d2006-10-10 01:37:40 +0000262 manager_->idleCount_--;
David Reiss96d23882007-07-26 21:10:32 +0000263 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000264
David Reiss96d23882007-07-26 21:10:32 +0000265 if (active) {
David Reiss068f4162010-03-09 05:19:45 +0000266 manager_->removeExpiredTasks();
267
Mark Slee2f6404d2006-10-10 01:37:40 +0000268 if (!manager_->tasks_.empty()) {
269 task = manager_->tasks_.front();
270 manager_->tasks_.pop();
271 if (task->state_ == ThreadManager::Task::WAITING) {
272 task->state_ = ThreadManager::Task::EXECUTING;
David Reiss96d23882007-07-26 21:10:32 +0000273 }
Qiao Mu2fadc8d2014-12-03 10:48:36 +0800274 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000275
Qiao Mu2fadc8d2014-12-03 10:48:36 +0800276 /* If we have a pending task max and we just dropped below it, wakeup any
277 thread that might be blocked on add. */
278 if (manager_->pendingTaskCountMax_ != 0
Konrad Grochowski7f4be5f2015-11-05 20:23:11 +0100279 && manager_->tasks_.size() <= manager_->pendingTaskCountMax_ - 1) {
280 manager_->maxMonitor_.notify();
David Reiss96d23882007-07-26 21:10:32 +0000281 }
David Reiss96d23882007-07-26 21:10:32 +0000282 }
Marc Slemko66949872006-07-15 01:52:39 +0000283 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000284
Roger Meier72957452013-06-29 00:28:50 +0200285 if (task) {
David Reiss96d23882007-07-26 21:10:32 +0000286 if (task->state_ == ThreadManager::Task::EXECUTING) {
287 try {
Mark Sleef5f2be42006-09-05 21:05:31 +0000288 task->run();
Konrad Grochowski3876ea72014-12-09 15:24:56 +0100289 } catch (const std::exception& e) {
Jens Geyerfb05cf62014-12-04 21:49:07 +0100290 GlobalOutput.printf("[ERROR] task->run() raised an exception: %s", e.what());
Konrad Grochowski3876ea72014-12-09 15:24:56 +0100291 } catch (...) {
Jens Geyerfb05cf62014-12-04 21:49:07 +0100292 GlobalOutput.printf("[ERROR] task->run() raised an unknown exception");
David Reiss96d23882007-07-26 21:10:32 +0000293 }
294 }
Marc Slemko66949872006-07-15 01:52:39 +0000295 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000296 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000297
Mark Sleef5f2be42006-09-05 21:05:31 +0000298 {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000299 Synchronized s(manager_->workerMonitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000300 manager_->deadWorkers_.insert(this->thread());
Jim King61b17082016-04-19 15:57:31 -0400301 idle_ = true;
302 manager_->workerCount_--;
303 bool notifyManager = (manager_->workerCount_ == manager_->workerMaxCount_);
Mark Sleef5f2be42006-09-05 21:05:31 +0000304 if (notifyManager) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000305 manager_->workerMonitor_.notify();
Marc Slemko66949872006-07-15 01:52:39 +0000306 }
307 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000308
Marc Slemko66949872006-07-15 01:52:39 +0000309 return;
310 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000311
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100312private:
313 ThreadManager::Impl* manager_;
314 friend class ThreadManager::Impl;
315 STATE state_;
316 bool idle_;
Marc Slemko66949872006-07-15 01:52:39 +0000317};
318
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100319void ThreadManager::Impl::addWorker(size_t value) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000320 std::set<shared_ptr<Thread> > newThreads;
Mark Sleef5f2be42006-09-05 21:05:31 +0000321 for (size_t ix = 0; ix < value; ix++) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100322 shared_ptr<ThreadManager::Worker> worker
323 = shared_ptr<ThreadManager::Worker>(new ThreadManager::Worker(this));
Mark Slee2f6404d2006-10-10 01:37:40 +0000324 newThreads.insert(threadFactory_->newThread(worker));
Marc Slemko66949872006-07-15 01:52:39 +0000325 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000326
Mark Sleef5f2be42006-09-05 21:05:31 +0000327 {
Mark Slee2f6404d2006-10-10 01:37:40 +0000328 Synchronized s(monitor_);
329 workerMaxCount_ += value;
330 workers_.insert(newThreads.begin(), newThreads.end());
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000331 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000332
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100333 for (std::set<shared_ptr<Thread> >::iterator ix = newThreads.begin(); ix != newThreads.end();
Roger Meier71f2d8a2015-04-26 17:00:04 +0200334 ++ix) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100335 shared_ptr<ThreadManager::Worker> worker
336 = dynamic_pointer_cast<ThreadManager::Worker, Runnable>((*ix)->runnable());
Mark Slee2f6404d2006-10-10 01:37:40 +0000337 worker->state_ = ThreadManager::Worker::STARTING;
Marc Slemkod466b212006-07-20 00:04:18 +0000338 (*ix)->start();
Marc Slemkoa6479032007-06-05 22:20:14 +0000339 idMap_.insert(std::pair<const Thread::id_t, shared_ptr<Thread> >((*ix)->getId(), *ix));
Marc Slemkod466b212006-07-20 00:04:18 +0000340 }
341
Mark Sleef5f2be42006-09-05 21:05:31 +0000342 {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000343 Synchronized s(workerMonitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000344 while (workerCount_ != workerMaxCount_) {
345 workerMonitor_.wait();
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000346 }
347 }
348}
Marc Slemkod466b212006-07-20 00:04:18 +0000349
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000350void ThreadManager::Impl::start() {
351
Mark Slee2f6404d2006-10-10 01:37:40 +0000352 if (state_ == ThreadManager::STOPPED) {
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000353 return;
354 }
355
Mark Sleef5f2be42006-09-05 21:05:31 +0000356 {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000357 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000358 if (state_ == ThreadManager::UNINITIALIZED) {
Roger Meier72957452013-06-29 00:28:50 +0200359 if (!threadFactory_) {
Mark Sleef5f2be42006-09-05 21:05:31 +0000360 throw InvalidArgumentException();
361 }
Mark Slee2f6404d2006-10-10 01:37:40 +0000362 state_ = ThreadManager::STARTED;
363 monitor_.notifyAll();
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000364 }
365
Mark Slee2f6404d2006-10-10 01:37:40 +0000366 while (state_ == STARTING) {
367 monitor_.wait();
Marc Slemkod466b212006-07-20 00:04:18 +0000368 }
369 }
370}
371
Mark Slee7c10eaf2007-03-01 02:45:10 +0000372void ThreadManager::Impl::stopImpl(bool join) {
Marc Slemkod466b212006-07-20 00:04:18 +0000373 bool doStop = false;
Mark Slee2f6404d2006-10-10 01:37:40 +0000374 if (state_ == ThreadManager::STOPPED) {
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000375 return;
376 }
377
Mark Sleef5f2be42006-09-05 21:05:31 +0000378 {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000379 Synchronized s(monitor_);
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100380 if (state_ != ThreadManager::STOPPING && state_ != ThreadManager::JOINING
381 && state_ != ThreadManager::STOPPED) {
Marc Slemkod466b212006-07-20 00:04:18 +0000382 doStop = true;
Mark Slee7c10eaf2007-03-01 02:45:10 +0000383 state_ = join ? ThreadManager::JOINING : ThreadManager::STOPPING;
Marc Slemkod466b212006-07-20 00:04:18 +0000384 }
385 }
386
Mark Sleef5f2be42006-09-05 21:05:31 +0000387 if (doStop) {
Mark Slee2f6404d2006-10-10 01:37:40 +0000388 removeWorker(workerCount_);
Marc Slemkod466b212006-07-20 00:04:18 +0000389 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000390
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000391 // XXX
Mark Sleef5f2be42006-09-05 21:05:31 +0000392 // should be able to block here for transition to STOPPED since we're no
393 // using shared_ptrs
Mark Slee7c10eaf2007-03-01 02:45:10 +0000394
395 {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000396 Synchronized s(monitor_);
Mark Slee7c10eaf2007-03-01 02:45:10 +0000397 state_ = ThreadManager::STOPPED;
398 }
Marc Slemkod466b212006-07-20 00:04:18 +0000399}
Mark Slee7c10eaf2007-03-01 02:45:10 +0000400
Marc Slemkod466b212006-07-20 00:04:18 +0000401void ThreadManager::Impl::removeWorker(size_t value) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000402 std::set<shared_ptr<Thread> > removedThreads;
Mark Sleef5f2be42006-09-05 21:05:31 +0000403 {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000404 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000405 if (value > workerMaxCount_) {
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000406 throw InvalidArgumentException();
407 }
408
Mark Slee2f6404d2006-10-10 01:37:40 +0000409 workerMaxCount_ -= value;
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000410
Mark Slee2f6404d2006-10-10 01:37:40 +0000411 if (idleCount_ < value) {
412 for (size_t ix = 0; ix < idleCount_; ix++) {
David Reiss96d23882007-07-26 21:10:32 +0000413 monitor_.notify();
Marc Slemko66949872006-07-15 01:52:39 +0000414 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000415 } else {
Mark Slee2f6404d2006-10-10 01:37:40 +0000416 monitor_.notifyAll();
Marc Slemko66949872006-07-15 01:52:39 +0000417 }
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000418 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000419
Mark Sleef5f2be42006-09-05 21:05:31 +0000420 {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000421 Synchronized s(workerMonitor_);
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000422
Mark Slee2f6404d2006-10-10 01:37:40 +0000423 while (workerCount_ != workerMaxCount_) {
424 workerMonitor_.wait();
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000425 }
426
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100427 for (std::set<shared_ptr<Thread> >::iterator ix = deadWorkers_.begin();
428 ix != deadWorkers_.end();
Roger Meier71f2d8a2015-04-26 17:00:04 +0200429 ++ix) {
Marc Slemkoa6479032007-06-05 22:20:14 +0000430 idMap_.erase((*ix)->getId());
Roger Meierffbfd032013-06-29 14:51:30 +0200431 workers_.erase(*ix);
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000432 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000433
Mark Slee2f6404d2006-10-10 01:37:40 +0000434 deadWorkers_.clear();
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000435 }
436}
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000437
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100438bool ThreadManager::Impl::canSleep() {
439 const Thread::id_t id = threadFactory_->getCurrentThreadId();
440 return idMap_.find(id) == idMap_.end();
441}
442
443void ThreadManager::Impl::add(shared_ptr<Runnable> value, int64_t timeout, int64_t expiration) {
444 Guard g(mutex_, timeout);
445
446 if (!g) {
447 throw TimedOutException();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000448 }
449
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100450 if (state_ != ThreadManager::STARTED) {
451 throw IllegalStateException(
452 "ThreadManager::Impl::add ThreadManager "
453 "not started");
454 }
David Reiss4e19f192010-03-09 05:19:59 +0000455
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100456 removeExpiredTasks();
457 if (pendingTaskCountMax_ > 0 && (tasks_.size() >= pendingTaskCountMax_)) {
458 if (canSleep() && timeout >= 0) {
459 while (pendingTaskCountMax_ > 0 && tasks_.size() >= pendingTaskCountMax_) {
460 // This is thread safe because the mutex is shared between monitors.
461 maxMonitor_.wait(timeout);
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000462 }
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100463 } else {
464 throw TooManyPendingTasksException();
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000465 }
Marc Slemko66949872006-07-15 01:52:39 +0000466 }
467
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100468 tasks_.push(shared_ptr<ThreadManager::Task>(new ThreadManager::Task(value, expiration)));
469
470 // If idle thread is available notify it, otherwise all worker threads are
471 // running and will get around to this task in time.
472 if (idleCount_ > 0) {
473 monitor_.notify();
474 }
475}
476
Marc Slemko6f038a72006-08-03 18:58:09 +0000477void ThreadManager::Impl::remove(shared_ptr<Runnable> task) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100478 (void)task;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000479 Synchronized s(monitor_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000480 if (state_ != ThreadManager::STARTED) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100481 throw IllegalStateException(
482 "ThreadManager::Impl::remove ThreadManager not "
483 "started");
Mark Sleef5f2be42006-09-05 21:05:31 +0000484 }
Marc Slemko66949872006-07-15 01:52:39 +0000485}
486
David Reiss01fe1532010-03-09 05:19:25 +0000487boost::shared_ptr<Runnable> ThreadManager::Impl::removeNextPending() {
488 Guard g(mutex_);
489 if (state_ != ThreadManager::STARTED) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100490 throw IllegalStateException(
491 "ThreadManager::Impl::removeNextPending "
492 "ThreadManager not started");
David Reiss01fe1532010-03-09 05:19:25 +0000493 }
494
495 if (tasks_.empty()) {
496 return boost::shared_ptr<Runnable>();
497 }
498
499 shared_ptr<ThreadManager::Task> task = tasks_.front();
500 tasks_.pop();
Roger Meier8b51bc62014-07-24 23:33:33 +0200501
David Reiss01fe1532010-03-09 05:19:25 +0000502 return task->getRunnable();
503}
504
David Reiss068f4162010-03-09 05:19:45 +0000505void ThreadManager::Impl::removeExpiredTasks() {
506 int64_t now = 0LL; // we won't ask for the time untile we need it
507
508 // note that this loop breaks at the first non-expiring task
509 while (!tasks_.empty()) {
510 shared_ptr<ThreadManager::Task> task = tasks_.front();
511 if (task->getExpireTime() == 0LL) {
512 break;
513 }
514 if (now == 0LL) {
515 now = Util::currentTime();
516 }
517 if (task->getExpireTime() > now) {
518 break;
519 }
520 if (expireCallback_) {
521 expireCallback_(task->getRunnable());
522 }
523 tasks_.pop();
524 expiredCount_++;
525 }
526}
527
David Reiss068f4162010-03-09 05:19:45 +0000528void ThreadManager::Impl::setExpireCallback(ExpireCallback expireCallback) {
529 expireCallback_ = expireCallback;
530}
531
Marc Slemkod466b212006-07-20 00:04:18 +0000532class SimpleThreadManager : public ThreadManager::Impl {
Marc Slemko66949872006-07-15 01:52:39 +0000533
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100534public:
535 SimpleThreadManager(size_t workerCount = 4, size_t pendingTaskCountMax = 0)
536 : workerCount_(workerCount), pendingTaskCountMax_(pendingTaskCountMax) {}
Marc Slemko66949872006-07-15 01:52:39 +0000537
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000538 void start() {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000539 ThreadManager::Impl::pendingTaskCountMax(pendingTaskCountMax_);
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000540 ThreadManager::Impl::start();
Mark Slee2f6404d2006-10-10 01:37:40 +0000541 addWorker(workerCount_);
Marc Slemkod466b212006-07-20 00:04:18 +0000542 }
543
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100544private:
Mark Slee2f6404d2006-10-10 01:37:40 +0000545 const size_t workerCount_;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000546 const size_t pendingTaskCountMax_;
Jens Geyere5fbedd2016-07-08 22:00:37 +0200547 Monitor monitor_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000548};
Marc Slemko66949872006-07-15 01:52:39 +0000549
Marc Slemko6f038a72006-08-03 18:58:09 +0000550shared_ptr<ThreadManager> ThreadManager::newThreadManager() {
551 return shared_ptr<ThreadManager>(new ThreadManager::Impl());
Marc Slemkod466b212006-07-20 00:04:18 +0000552}
Marc Slemko66949872006-07-15 01:52:39 +0000553
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100554shared_ptr<ThreadManager> ThreadManager::newSimpleThreadManager(size_t count,
555 size_t pendingTaskCountMax) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000556 return shared_ptr<ThreadManager>(new SimpleThreadManager(count, pendingTaskCountMax));
Marc Slemkod466b212006-07-20 00:04:18 +0000557}
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100558}
559}
560} // apache::thrift::concurrency