blob: 25b838aefbc3931dd921c25253ff5123d437207e [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
Konrad Grochowski9be4e682013-06-22 22:03:31 +020020#include <thrift/thrift-config.h>
Roger Meier12d70532011-12-14 23:35:28 +000021
Roger Meier4285ba22013-06-10 21:17:23 +020022#include <thrift/concurrency/ThreadManager.h>
23#include <thrift/concurrency/Exception.h>
24#include <thrift/concurrency/Monitor.h>
Marc Slemko66949872006-07-15 01:52:39 +000025
cyy316723a2019-01-05 16:35:14 +080026#include <memory>
Marc Slemko6f038a72006-08-03 18:58:09 +000027
James E. King, IIIdf899132016-11-12 15:16:30 -050028#include <stdexcept>
29#include <deque>
Marc Slemko0e53ccd2006-07-17 23:51:05 +000030#include <set>
Marc Slemko66949872006-07-15 01:52:39 +000031
Konrad Grochowski16a23a62014-11-13 15:33:38 +010032namespace apache {
33namespace thrift {
34namespace concurrency {
Marc Slemko66949872006-07-15 01:52:39 +000035
cyy316723a2019-01-05 16:35:14 +080036using std::shared_ptr;
cyybfdbd032019-01-12 14:38:28 +080037using std::unique_ptr;
cyy316723a2019-01-05 16:35:14 +080038using std::dynamic_pointer_cast;
Marc Slemko6f038a72006-08-03 18:58:09 +000039
Mark Sleef5f2be42006-09-05 21:05:31 +000040/**
41 * ThreadManager class
Marc Slemko3a3b53b2007-05-22 23:59:54 +000042 *
Mark Sleef5f2be42006-09-05 21:05:31 +000043 * This class manages a pool of threads. It uses a ThreadFactory to create
44 * threads. It never actually creates or destroys worker threads, rather
45 * it maintains statistics on number of idle threads, number of active threads,
46 * task backlog, and average wait and service times.
47 *
James E. King, IIIdf899132016-11-12 15:16:30 -050048 * There are three different monitors used for signaling different conditions
49 * however they all share the same mutex_.
50 *
Mark Sleef5f2be42006-09-05 21:05:31 +000051 * @version $Id:$
52 */
Konrad Grochowski16a23a62014-11-13 15:33:38 +010053class ThreadManager::Impl : public ThreadManager {
Marc Slemko0e53ccd2006-07-17 23:51:05 +000054
Konrad Grochowski16a23a62014-11-13 15:33:38 +010055public:
56 Impl()
57 : workerCount_(0),
58 workerMaxCount_(0),
59 idleCount_(0),
60 pendingTaskCountMax_(0),
61 expiredCount_(0),
62 state_(ThreadManager::UNINITIALIZED),
63 monitor_(&mutex_),
James E. King, IIIdf899132016-11-12 15:16:30 -050064 maxMonitor_(&mutex_),
65 workerMonitor_(&mutex_) {}
Marc Slemkod466b212006-07-20 00:04:18 +000066
Sebastian Zenker042580f2019-01-29 15:48:12 +010067 ~Impl() override { stop(); }
Marc Slemkod466b212006-07-20 00:04:18 +000068
Sebastian Zenker042580f2019-01-29 15:48:12 +010069 void start() override;
70 void stop() override;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000071
Sebastian Zenker042580f2019-01-29 15:48:12 +010072 ThreadManager::STATE state() const override { return state_; }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +000073
Sebastian Zenker042580f2019-01-29 15:48:12 +010074 shared_ptr<ThreadFactory> threadFactory() const override {
James E. King, IIIdf899132016-11-12 15:16:30 -050075 Guard g(mutex_);
Mark Slee2f6404d2006-10-10 01:37:40 +000076 return threadFactory_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000077 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +000078
Sebastian Zenker042580f2019-01-29 15:48:12 +010079 void threadFactory(shared_ptr<ThreadFactory> value) override {
James E. King, IIIdf899132016-11-12 15:16:30 -050080 Guard g(mutex_);
81 if (threadFactory_ && threadFactory_->isDetached() != value->isDetached()) {
82 throw InvalidArgumentException();
83 }
Mark Slee2f6404d2006-10-10 01:37:40 +000084 threadFactory_ = value;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000085 }
86
Sebastian Zenker042580f2019-01-29 15:48:12 +010087 void addWorker(size_t value) override;
Marc Slemko3a3b53b2007-05-22 23:59:54 +000088
Sebastian Zenker042580f2019-01-29 15:48:12 +010089 void removeWorker(size_t value) override;
Marc Slemko3a3b53b2007-05-22 23:59:54 +000090
Sebastian Zenker042580f2019-01-29 15:48:12 +010091 size_t idleWorkerCount() const override { return idleCount_; }
Marc Slemko0e53ccd2006-07-17 23:51:05 +000092
Sebastian Zenker042580f2019-01-29 15:48:12 +010093 size_t workerCount() const override {
James E. King, IIIdf899132016-11-12 15:16:30 -050094 Guard g(mutex_);
Mark Slee2f6404d2006-10-10 01:37:40 +000095 return workerCount_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000096 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +000097
Sebastian Zenker042580f2019-01-29 15:48:12 +010098 size_t pendingTaskCount() const override {
James E. King, IIIdf899132016-11-12 15:16:30 -050099 Guard g(mutex_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000100 return tasks_.size();
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000101 }
102
Sebastian Zenker042580f2019-01-29 15:48:12 +0100103 size_t totalTaskCount() const override {
James E. King, IIIdf899132016-11-12 15:16:30 -0500104 Guard g(mutex_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000105 return tasks_.size() + workerCount_ - idleCount_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000106 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000107
Sebastian Zenker042580f2019-01-29 15:48:12 +0100108 size_t pendingTaskCountMax() const override {
James E. King, IIIdf899132016-11-12 15:16:30 -0500109 Guard g(mutex_);
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000110 return pendingTaskCountMax_;
111 }
112
Sebastian Zenker042580f2019-01-29 15:48:12 +0100113 size_t expiredTaskCount() const override {
James E. King, IIIdf899132016-11-12 15:16:30 -0500114 Guard g(mutex_);
115 return expiredCount_;
David Reiss068f4162010-03-09 05:19:45 +0000116 }
117
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000118 void pendingTaskCountMax(const size_t value) {
James E. King, IIIdf899132016-11-12 15:16:30 -0500119 Guard g(mutex_);
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000120 pendingTaskCountMax_ = value;
121 }
122
Sebastian Zenker042580f2019-01-29 15:48:12 +0100123 void add(shared_ptr<Runnable> value, int64_t timeout, int64_t expiration) override;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000124
Sebastian Zenker042580f2019-01-29 15:48:12 +0100125 void remove(shared_ptr<Runnable> task) override;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000126
Sebastian Zenker042580f2019-01-29 15:48:12 +0100127 shared_ptr<Runnable> removeNextPending() override;
David Reiss01fe1532010-03-09 05:19:25 +0000128
Sebastian Zenker042580f2019-01-29 15:48:12 +0100129 void removeExpiredTasks() override {
James E. King, IIIdf899132016-11-12 15:16:30 -0500130 removeExpired(false);
131 }
David Reiss068f4162010-03-09 05:19:45 +0000132
Sebastian Zenker042580f2019-01-29 15:48:12 +0100133 void setExpireCallback(ExpireCallback expireCallback) override;
David Reiss068f4162010-03-09 05:19:45 +0000134
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000135private:
James E. King, IIIdf899132016-11-12 15:16:30 -0500136 /**
137 * Remove one or more expired tasks.
138 * \param[in] justOne if true, try to remove just one task and return
139 */
140 void removeExpired(bool justOne);
141
142 /**
143 * \returns whether it is acceptable to block, depending on the current thread id
144 */
145 bool canSleep() const;
146
147 /**
148 * Lowers the maximum worker count and blocks until enough worker threads complete
149 * to get to the new maximum worker limit. The caller is responsible for acquiring
150 * a lock on the class mutex_.
151 */
152 void removeWorkersUnderLock(size_t value);
Mark Slee7c10eaf2007-03-01 02:45:10 +0000153
Mark Slee2f6404d2006-10-10 01:37:40 +0000154 size_t workerCount_;
155 size_t workerMaxCount_;
156 size_t idleCount_;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000157 size_t pendingTaskCountMax_;
David Reiss068f4162010-03-09 05:19:45 +0000158 size_t expiredCount_;
159 ExpireCallback expireCallback_;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000160
Mark Slee2f6404d2006-10-10 01:37:40 +0000161 ThreadManager::STATE state_;
162 shared_ptr<ThreadFactory> threadFactory_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000163
164 friend class ThreadManager::Task;
James E. King, IIIdf899132016-11-12 15:16:30 -0500165 typedef std::deque<shared_ptr<Task> > TaskQueue;
166 TaskQueue tasks_;
David Reissa0dbfef2010-03-09 05:19:32 +0000167 Mutex mutex_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000168 Monitor monitor_;
David Reissa0dbfef2010-03-09 05:19:32 +0000169 Monitor maxMonitor_;
James E. King, IIIdf899132016-11-12 15:16:30 -0500170 Monitor workerMonitor_; // used to synchronize changes in worker count
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000171
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000172 friend class ThreadManager::Worker;
Mark Slee2f6404d2006-10-10 01:37:40 +0000173 std::set<shared_ptr<Thread> > workers_;
174 std::set<shared_ptr<Thread> > deadWorkers_;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000175 std::map<const Thread::id_t, shared_ptr<Thread> > idMap_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000176};
Marc Slemko66949872006-07-15 01:52:39 +0000177
178class ThreadManager::Task : public Runnable {
179
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100180public:
James E. King, IIIdf899132016-11-12 15:16:30 -0500181 enum STATE { WAITING, EXECUTING, TIMEDOUT, COMPLETE };
Marc Slemko66949872006-07-15 01:52:39 +0000182
cyybfdbd032019-01-12 14:38:28 +0800183 Task(shared_ptr<Runnable> runnable, uint64_t expiration = 0ULL)
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100184 : runnable_(runnable),
cyybfdbd032019-01-12 14:38:28 +0800185 state_(WAITING) {
186 if (expiration != 0ULL) {
187 expireTime_.reset(new std::chrono::steady_clock::time_point(std::chrono::steady_clock::now() + std::chrono::milliseconds(expiration)));
188 }
189 }
Marc Slemko66949872006-07-15 01:52:39 +0000190
Sebastian Zenker042580f2019-01-29 15:48:12 +0100191 ~Task() override = default;
Marc Slemko66949872006-07-15 01:52:39 +0000192
Sebastian Zenker042580f2019-01-29 15:48:12 +0100193 void run() override {
Mark Slee2f6404d2006-10-10 01:37:40 +0000194 if (state_ == EXECUTING) {
195 runnable_->run();
196 state_ = COMPLETE;
Marc Slemko66949872006-07-15 01:52:39 +0000197 }
198 }
199
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100200 shared_ptr<Runnable> getRunnable() { return runnable_; }
David Reiss01fe1532010-03-09 05:19:25 +0000201
cyybfdbd032019-01-12 14:38:28 +0800202 const unique_ptr<std::chrono::steady_clock::time_point> & getExpireTime() const { return expireTime_; }
David Reiss068f4162010-03-09 05:19:45 +0000203
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100204private:
Mark Slee2f6404d2006-10-10 01:37:40 +0000205 shared_ptr<Runnable> runnable_;
Marc Slemkod466b212006-07-20 00:04:18 +0000206 friend class ThreadManager::Worker;
Mark Slee2f6404d2006-10-10 01:37:40 +0000207 STATE state_;
cyybfdbd032019-01-12 14:38:28 +0800208 unique_ptr<std::chrono::steady_clock::time_point> expireTime_;
Marc Slemko66949872006-07-15 01:52:39 +0000209};
210
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100211class ThreadManager::Worker : public Runnable {
212 enum STATE { UNINITIALIZED, STARTING, STARTED, STOPPING, STOPPED };
Marc Slemko66949872006-07-15 01:52:39 +0000213
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100214public:
James E. King, IIIdf899132016-11-12 15:16:30 -0500215 Worker(ThreadManager::Impl* manager) : manager_(manager), state_(UNINITIALIZED) {}
Marc Slemko66949872006-07-15 01:52:39 +0000216
Sebastian Zenker042580f2019-01-29 15:48:12 +0100217 ~Worker() override = default;
Marc Slemko66949872006-07-15 01:52:39 +0000218
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100219private:
Mark Slee2f6404d2006-10-10 01:37:40 +0000220 bool isActive() const {
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100221 return (manager_->workerCount_ <= manager_->workerMaxCount_)
222 || (manager_->state_ == JOINING && !manager_->tasks_.empty());
Mark Slee2f6404d2006-10-10 01:37:40 +0000223 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000224
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100225public:
Mark Sleef5f2be42006-09-05 21:05:31 +0000226 /**
227 * Worker entry point
228 *
229 * As long as worker thread is running, pull tasks off the task queue and
230 * execute.
231 */
Sebastian Zenker042580f2019-01-29 15:48:12 +0100232 void run() override {
James E. King, IIIdf899132016-11-12 15:16:30 -0500233 Guard g(manager_->mutex_);
234
235 /**
236 * This method has three parts; one is to check for and account for
237 * admitting a task which happens under a lock. Then the lock is released
238 * and the task itself is executed. Finally we do some accounting
239 * under lock again when the task completes.
240 */
241
242 /**
243 * Admitting
244 */
245
Mark Sleef5f2be42006-09-05 21:05:31 +0000246 /**
247 * Increment worker semaphore and notify manager if worker count reached
248 * desired max
Mark Sleef5f2be42006-09-05 21:05:31 +0000249 */
James E. King, IIIdf899132016-11-12 15:16:30 -0500250 bool active = manager_->workerCount_ < manager_->workerMaxCount_;
251 if (active) {
252 if (++manager_->workerCount_ == manager_->workerMaxCount_) {
Jim King61b17082016-04-19 15:57:31 -0400253 manager_->workerMonitor_.notify();
254 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000255 }
256
Mark Sleef5f2be42006-09-05 21:05:31 +0000257 while (active) {
James E. King, IIIdf899132016-11-12 15:16:30 -0500258 /**
259 * While holding manager monitor block for non-empty task queue (Also
260 * check that the thread hasn't been requested to stop). Once the queue
261 * is non-empty, dequeue a task, release monitor, and execute. If the
262 * worker max count has been decremented such that we exceed it, mark
263 * ourself inactive, decrement the worker count and notify the manager
264 * (technically we're notifying the next blocked thread but eventually
265 * the manager will see it.
266 */
267 active = isActive();
268
269 while (active && manager_->tasks_.empty()) {
270 manager_->idleCount_++;
271 manager_->monitor_.wait();
272 active = isActive();
273 manager_->idleCount_--;
274 }
275
Marc Slemko6f038a72006-08-03 18:58:09 +0000276 shared_ptr<ThreadManager::Task> task;
Marc Slemko66949872006-07-15 01:52:39 +0000277
James E. King, IIIdf899132016-11-12 15:16:30 -0500278 if (active) {
279 if (!manager_->tasks_.empty()) {
280 task = manager_->tasks_.front();
281 manager_->tasks_.pop_front();
282 if (task->state_ == ThreadManager::Task::WAITING) {
283 // If the state is changed to anything other than EXECUTING or TIMEDOUT here
284 // then the execution loop needs to be changed below.
285 task->state_ =
cyybfdbd032019-01-12 14:38:28 +0800286 (task->getExpireTime() && *(task->getExpireTime()) < std::chrono::steady_clock::now()) ?
James E. King, IIIdf899132016-11-12 15:16:30 -0500287 ThreadManager::Task::TIMEDOUT :
288 ThreadManager::Task::EXECUTING;
289 }
David Reiss96d23882007-07-26 21:10:32 +0000290 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000291
James E. King, IIIdf899132016-11-12 15:16:30 -0500292 /* If we have a pending task max and we just dropped below it, wakeup any
293 thread that might be blocked on add. */
294 if (manager_->pendingTaskCountMax_ != 0
295 && manager_->tasks_.size() <= manager_->pendingTaskCountMax_ - 1) {
296 manager_->maxMonitor_.notify();
David Reiss96d23882007-07-26 21:10:32 +0000297 }
Marc Slemko66949872006-07-15 01:52:39 +0000298 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000299
James E. King, IIIdf899132016-11-12 15:16:30 -0500300 /**
301 * Execution - not holding a lock
302 */
Roger Meier72957452013-06-29 00:28:50 +0200303 if (task) {
David Reiss96d23882007-07-26 21:10:32 +0000304 if (task->state_ == ThreadManager::Task::EXECUTING) {
James E. King, IIIdf899132016-11-12 15:16:30 -0500305
306 // Release the lock so we can run the task without blocking the thread manager
307 manager_->mutex_.unlock();
308
David Reiss96d23882007-07-26 21:10:32 +0000309 try {
Mark Sleef5f2be42006-09-05 21:05:31 +0000310 task->run();
Konrad Grochowski3876ea72014-12-09 15:24:56 +0100311 } catch (const std::exception& e) {
Jens Geyerfb05cf62014-12-04 21:49:07 +0100312 GlobalOutput.printf("[ERROR] task->run() raised an exception: %s", e.what());
Konrad Grochowski3876ea72014-12-09 15:24:56 +0100313 } catch (...) {
Jens Geyerfb05cf62014-12-04 21:49:07 +0100314 GlobalOutput.printf("[ERROR] task->run() raised an unknown exception");
David Reiss96d23882007-07-26 21:10:32 +0000315 }
James E. King, IIIdf899132016-11-12 15:16:30 -0500316
317 // Re-acquire the lock to proceed in the thread manager
318 manager_->mutex_.lock();
319
320 } else if (manager_->expireCallback_) {
321 // The only other state the task could have been in is TIMEDOUT (see above)
322 manager_->expireCallback_(task->getRunnable());
323 manager_->expiredCount_++;
David Reiss96d23882007-07-26 21:10:32 +0000324 }
Marc Slemko66949872006-07-15 01:52:39 +0000325 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000326 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000327
James E. King, IIIdf899132016-11-12 15:16:30 -0500328 /**
329 * Final accounting for the worker thread that is done working
330 */
331 manager_->deadWorkers_.insert(this->thread());
332 if (--manager_->workerCount_ == manager_->workerMaxCount_) {
333 manager_->workerMonitor_.notify();
Marc Slemko66949872006-07-15 01:52:39 +0000334 }
Marc Slemko66949872006-07-15 01:52:39 +0000335 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000336
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100337private:
338 ThreadManager::Impl* manager_;
339 friend class ThreadManager::Impl;
340 STATE state_;
Marc Slemko66949872006-07-15 01:52:39 +0000341};
342
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100343void ThreadManager::Impl::addWorker(size_t value) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000344 std::set<shared_ptr<Thread> > newThreads;
Mark Sleef5f2be42006-09-05 21:05:31 +0000345 for (size_t ix = 0; ix < value; ix++) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100346 shared_ptr<ThreadManager::Worker> worker
cyy64750162019-02-08 13:40:59 +0800347 = std::make_shared<ThreadManager::Worker>(this);
Mark Slee2f6404d2006-10-10 01:37:40 +0000348 newThreads.insert(threadFactory_->newThread(worker));
Marc Slemko66949872006-07-15 01:52:39 +0000349 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000350
James E. King, IIIdf899132016-11-12 15:16:30 -0500351 Guard g(mutex_);
352 workerMaxCount_ += value;
353 workers_.insert(newThreads.begin(), newThreads.end());
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000354
cyy64750162019-02-08 13:40:59 +0800355 for (const auto & newThread : newThreads) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100356 shared_ptr<ThreadManager::Worker> worker
cyy64750162019-02-08 13:40:59 +0800357 = dynamic_pointer_cast<ThreadManager::Worker, Runnable>(newThread->runnable());
Mark Slee2f6404d2006-10-10 01:37:40 +0000358 worker->state_ = ThreadManager::Worker::STARTING;
cyy64750162019-02-08 13:40:59 +0800359 newThread->start();
360 idMap_.insert(std::pair<const Thread::id_t, shared_ptr<Thread> >(newThread->getId(), newThread));
Marc Slemkod466b212006-07-20 00:04:18 +0000361 }
362
James E. King, IIIdf899132016-11-12 15:16:30 -0500363 while (workerCount_ != workerMaxCount_) {
364 workerMonitor_.wait();
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000365 }
366}
Marc Slemkod466b212006-07-20 00:04:18 +0000367
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000368void ThreadManager::Impl::start() {
James E. King, IIIdf899132016-11-12 15:16:30 -0500369 Guard g(mutex_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000370 if (state_ == ThreadManager::STOPPED) {
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000371 return;
372 }
373
James E. King, IIIdf899132016-11-12 15:16:30 -0500374 if (state_ == ThreadManager::UNINITIALIZED) {
375 if (!threadFactory_) {
376 throw InvalidArgumentException();
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000377 }
James E. King, IIIdf899132016-11-12 15:16:30 -0500378 state_ = ThreadManager::STARTED;
379 monitor_.notifyAll();
380 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000381
James E. King, IIIdf899132016-11-12 15:16:30 -0500382 while (state_ == STARTING) {
383 monitor_.wait();
Marc Slemkod466b212006-07-20 00:04:18 +0000384 }
385}
386
James E. King, IIIdf899132016-11-12 15:16:30 -0500387void ThreadManager::Impl::stop() {
388 Guard g(mutex_);
Marc Slemkod466b212006-07-20 00:04:18 +0000389 bool doStop = false;
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000390
James E. King, IIIdf899132016-11-12 15:16:30 -0500391 if (state_ != ThreadManager::STOPPING && state_ != ThreadManager::JOINING
392 && state_ != ThreadManager::STOPPED) {
393 doStop = true;
394 state_ = ThreadManager::JOINING;
Marc Slemkod466b212006-07-20 00:04:18 +0000395 }
396
Mark Sleef5f2be42006-09-05 21:05:31 +0000397 if (doStop) {
James E. King, IIIdf899132016-11-12 15:16:30 -0500398 removeWorkersUnderLock(workerCount_);
Marc Slemkod466b212006-07-20 00:04:18 +0000399 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000400
James E. King, IIIdf899132016-11-12 15:16:30 -0500401 state_ = ThreadManager::STOPPED;
Marc Slemkod466b212006-07-20 00:04:18 +0000402}
Mark Slee7c10eaf2007-03-01 02:45:10 +0000403
Marc Slemkod466b212006-07-20 00:04:18 +0000404void ThreadManager::Impl::removeWorker(size_t value) {
James E. King, IIIdf899132016-11-12 15:16:30 -0500405 Guard g(mutex_);
406 removeWorkersUnderLock(value);
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000407}
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000408
James E. King, IIIdf899132016-11-12 15:16:30 -0500409void ThreadManager::Impl::removeWorkersUnderLock(size_t value) {
410 if (value > workerMaxCount_) {
411 throw InvalidArgumentException();
412 }
413
414 workerMaxCount_ -= value;
415
416 if (idleCount_ > value) {
417 // There are more idle workers than we need to remove,
418 // so notify enough of them so they can terminate.
419 for (size_t ix = 0; ix < value; ix++) {
420 monitor_.notify();
421 }
422 } else {
423 // There are as many or less idle workers than we need to remove,
424 // so just notify them all so they can terminate.
425 monitor_.notifyAll();
426 }
427
428 while (workerCount_ != workerMaxCount_) {
429 workerMonitor_.wait();
430 }
431
cyy64750162019-02-08 13:40:59 +0800432 for (const auto & deadWorker : deadWorkers_) {
James E. King, IIIdf899132016-11-12 15:16:30 -0500433
434 // when used with a joinable thread factory, we join the threads as we remove them
435 if (!threadFactory_->isDetached()) {
cyy64750162019-02-08 13:40:59 +0800436 deadWorker->join();
James E. King, IIIdf899132016-11-12 15:16:30 -0500437 }
438
cyy64750162019-02-08 13:40:59 +0800439 idMap_.erase(deadWorker->getId());
440 workers_.erase(deadWorker);
James E. King, IIIdf899132016-11-12 15:16:30 -0500441 }
442
443 deadWorkers_.clear();
444}
445
446bool ThreadManager::Impl::canSleep() const {
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100447 const Thread::id_t id = threadFactory_->getCurrentThreadId();
448 return idMap_.find(id) == idMap_.end();
449}
450
451void ThreadManager::Impl::add(shared_ptr<Runnable> value, int64_t timeout, int64_t expiration) {
452 Guard g(mutex_, timeout);
453
454 if (!g) {
455 throw TimedOutException();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000456 }
457
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100458 if (state_ != ThreadManager::STARTED) {
459 throw IllegalStateException(
460 "ThreadManager::Impl::add ThreadManager "
461 "not started");
462 }
David Reiss4e19f192010-03-09 05:19:59 +0000463
James E. King, IIIdf899132016-11-12 15:16:30 -0500464 // if we're at a limit, remove an expired task to see if the limit clears
465 if (pendingTaskCountMax_ > 0 && (tasks_.size() >= pendingTaskCountMax_)) {
466 removeExpired(true);
467 }
468
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100469 if (pendingTaskCountMax_ > 0 && (tasks_.size() >= pendingTaskCountMax_)) {
470 if (canSleep() && timeout >= 0) {
471 while (pendingTaskCountMax_ > 0 && tasks_.size() >= pendingTaskCountMax_) {
472 // This is thread safe because the mutex is shared between monitors.
473 maxMonitor_.wait(timeout);
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000474 }
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100475 } else {
476 throw TooManyPendingTasksException();
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000477 }
Marc Slemko66949872006-07-15 01:52:39 +0000478 }
479
cyy64750162019-02-08 13:40:59 +0800480 tasks_.push_back(std::make_shared<ThreadManager::Task>(value, expiration));
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100481
482 // If idle thread is available notify it, otherwise all worker threads are
483 // running and will get around to this task in time.
484 if (idleCount_ > 0) {
485 monitor_.notify();
486 }
487}
488
Marc Slemko6f038a72006-08-03 18:58:09 +0000489void ThreadManager::Impl::remove(shared_ptr<Runnable> task) {
James E. King, IIIdf899132016-11-12 15:16:30 -0500490 Guard g(mutex_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000491 if (state_ != ThreadManager::STARTED) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100492 throw IllegalStateException(
493 "ThreadManager::Impl::remove ThreadManager not "
494 "started");
Mark Sleef5f2be42006-09-05 21:05:31 +0000495 }
James E. King, IIIdf899132016-11-12 15:16:30 -0500496
Sebastian Zenker042580f2019-01-29 15:48:12 +0100497 for (auto it = tasks_.begin(); it != tasks_.end(); ++it)
James E. King, IIIdf899132016-11-12 15:16:30 -0500498 {
499 if ((*it)->getRunnable() == task)
500 {
501 tasks_.erase(it);
502 return;
503 }
504 }
Marc Slemko66949872006-07-15 01:52:39 +0000505}
506
cyy316723a2019-01-05 16:35:14 +0800507std::shared_ptr<Runnable> ThreadManager::Impl::removeNextPending() {
David Reiss01fe1532010-03-09 05:19:25 +0000508 Guard g(mutex_);
509 if (state_ != ThreadManager::STARTED) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100510 throw IllegalStateException(
511 "ThreadManager::Impl::removeNextPending "
512 "ThreadManager not started");
David Reiss01fe1532010-03-09 05:19:25 +0000513 }
514
515 if (tasks_.empty()) {
cyy316723a2019-01-05 16:35:14 +0800516 return std::shared_ptr<Runnable>();
David Reiss01fe1532010-03-09 05:19:25 +0000517 }
518
519 shared_ptr<ThreadManager::Task> task = tasks_.front();
James E. King, IIIdf899132016-11-12 15:16:30 -0500520 tasks_.pop_front();
Roger Meier8b51bc62014-07-24 23:33:33 +0200521
David Reiss01fe1532010-03-09 05:19:25 +0000522 return task->getRunnable();
523}
524
James E. King, IIIdf899132016-11-12 15:16:30 -0500525void ThreadManager::Impl::removeExpired(bool justOne) {
526 // this is always called under a lock
cyybfdbd032019-01-12 14:38:28 +0800527 if (tasks_.empty()) {
528 return;
529 }
530 auto now = std::chrono::steady_clock::now();
David Reiss068f4162010-03-09 05:19:45 +0000531
Sebastian Zenker042580f2019-01-29 15:48:12 +0100532 for (auto it = tasks_.begin(); it != tasks_.end(); )
James E. King, IIIdf899132016-11-12 15:16:30 -0500533 {
cyybfdbd032019-01-12 14:38:28 +0800534 if ((*it)->getExpireTime() && *((*it)->getExpireTime()) < now) {
James E. King, IIIdf899132016-11-12 15:16:30 -0500535 if (expireCallback_) {
536 expireCallback_((*it)->getRunnable());
537 }
538 it = tasks_.erase(it);
539 ++expiredCount_;
540 if (justOne) {
541 return;
542 }
David Reiss068f4162010-03-09 05:19:45 +0000543 }
James E. King, IIIdf899132016-11-12 15:16:30 -0500544 else
545 {
546 ++it;
David Reiss068f4162010-03-09 05:19:45 +0000547 }
David Reiss068f4162010-03-09 05:19:45 +0000548 }
549}
550
David Reiss068f4162010-03-09 05:19:45 +0000551void ThreadManager::Impl::setExpireCallback(ExpireCallback expireCallback) {
James E. King, IIIdf899132016-11-12 15:16:30 -0500552 Guard g(mutex_);
David Reiss068f4162010-03-09 05:19:45 +0000553 expireCallback_ = expireCallback;
554}
555
Marc Slemkod466b212006-07-20 00:04:18 +0000556class SimpleThreadManager : public ThreadManager::Impl {
Marc Slemko66949872006-07-15 01:52:39 +0000557
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100558public:
559 SimpleThreadManager(size_t workerCount = 4, size_t pendingTaskCountMax = 0)
560 : workerCount_(workerCount), pendingTaskCountMax_(pendingTaskCountMax) {}
Marc Slemko66949872006-07-15 01:52:39 +0000561
Sebastian Zenker042580f2019-01-29 15:48:12 +0100562 void start() override {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000563 ThreadManager::Impl::pendingTaskCountMax(pendingTaskCountMax_);
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000564 ThreadManager::Impl::start();
Mark Slee2f6404d2006-10-10 01:37:40 +0000565 addWorker(workerCount_);
Marc Slemkod466b212006-07-20 00:04:18 +0000566 }
567
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100568private:
Mark Slee2f6404d2006-10-10 01:37:40 +0000569 const size_t workerCount_;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000570 const size_t pendingTaskCountMax_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000571};
Marc Slemko66949872006-07-15 01:52:39 +0000572
Marc Slemko6f038a72006-08-03 18:58:09 +0000573shared_ptr<ThreadManager> ThreadManager::newThreadManager() {
574 return shared_ptr<ThreadManager>(new ThreadManager::Impl());
Marc Slemkod466b212006-07-20 00:04:18 +0000575}
Marc Slemko66949872006-07-15 01:52:39 +0000576
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100577shared_ptr<ThreadManager> ThreadManager::newSimpleThreadManager(size_t count,
578 size_t pendingTaskCountMax) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000579 return shared_ptr<ThreadManager>(new SimpleThreadManager(count, pendingTaskCountMax));
Marc Slemkod466b212006-07-20 00:04:18 +0000580}
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100581}
582}
583} // apache::thrift::concurrency