blob: e917f4a478b8ebdf0d5897ef6f285e6af1c8daf8 [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
Konrad Grochowski9be4e682013-06-22 22:03:31 +020020#include <thrift/thrift-config.h>
Roger Meier12d70532011-12-14 23:35:28 +000021
Roger Meier4285ba22013-06-10 21:17:23 +020022#include <thrift/concurrency/ThreadManager.h>
23#include <thrift/concurrency/Exception.h>
24#include <thrift/concurrency/Monitor.h>
Marc Slemko66949872006-07-15 01:52:39 +000025
cyy316723a2019-01-05 16:35:14 +080026#include <memory>
Marc Slemko6f038a72006-08-03 18:58:09 +000027
James E. King, IIIdf899132016-11-12 15:16:30 -050028#include <stdexcept>
29#include <deque>
Marc Slemko0e53ccd2006-07-17 23:51:05 +000030#include <set>
Marc Slemko66949872006-07-15 01:52:39 +000031
Konrad Grochowski16a23a62014-11-13 15:33:38 +010032namespace apache {
33namespace thrift {
34namespace concurrency {
Marc Slemko66949872006-07-15 01:52:39 +000035
cyy316723a2019-01-05 16:35:14 +080036using std::shared_ptr;
cyybfdbd032019-01-12 14:38:28 +080037using std::unique_ptr;
cyy316723a2019-01-05 16:35:14 +080038using std::dynamic_pointer_cast;
Marc Slemko6f038a72006-08-03 18:58:09 +000039
Mark Sleef5f2be42006-09-05 21:05:31 +000040/**
41 * ThreadManager class
Marc Slemko3a3b53b2007-05-22 23:59:54 +000042 *
Mark Sleef5f2be42006-09-05 21:05:31 +000043 * This class manages a pool of threads. It uses a ThreadFactory to create
44 * threads. It never actually creates or destroys worker threads, rather
45 * it maintains statistics on number of idle threads, number of active threads,
46 * task backlog, and average wait and service times.
47 *
James E. King, IIIdf899132016-11-12 15:16:30 -050048 * There are three different monitors used for signaling different conditions
49 * however they all share the same mutex_.
50 *
Mark Sleef5f2be42006-09-05 21:05:31 +000051 * @version $Id:$
52 */
Konrad Grochowski16a23a62014-11-13 15:33:38 +010053class ThreadManager::Impl : public ThreadManager {
Marc Slemko0e53ccd2006-07-17 23:51:05 +000054
Konrad Grochowski16a23a62014-11-13 15:33:38 +010055public:
56 Impl()
57 : workerCount_(0),
58 workerMaxCount_(0),
59 idleCount_(0),
60 pendingTaskCountMax_(0),
61 expiredCount_(0),
62 state_(ThreadManager::UNINITIALIZED),
63 monitor_(&mutex_),
James E. King, IIIdf899132016-11-12 15:16:30 -050064 maxMonitor_(&mutex_),
65 workerMonitor_(&mutex_) {}
Marc Slemkod466b212006-07-20 00:04:18 +000066
Sebastian Zenker042580f2019-01-29 15:48:12 +010067 ~Impl() override { stop(); }
Marc Slemkod466b212006-07-20 00:04:18 +000068
Sebastian Zenker042580f2019-01-29 15:48:12 +010069 void start() override;
70 void stop() override;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000071
Sebastian Zenker042580f2019-01-29 15:48:12 +010072 ThreadManager::STATE state() const override { return state_; }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +000073
Sebastian Zenker042580f2019-01-29 15:48:12 +010074 shared_ptr<ThreadFactory> threadFactory() const override {
James E. King, IIIdf899132016-11-12 15:16:30 -050075 Guard g(mutex_);
Mark Slee2f6404d2006-10-10 01:37:40 +000076 return threadFactory_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000077 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +000078
Sebastian Zenker042580f2019-01-29 15:48:12 +010079 void threadFactory(shared_ptr<ThreadFactory> value) override {
James E. King, IIIdf899132016-11-12 15:16:30 -050080 Guard g(mutex_);
81 if (threadFactory_ && threadFactory_->isDetached() != value->isDetached()) {
82 throw InvalidArgumentException();
83 }
Mark Slee2f6404d2006-10-10 01:37:40 +000084 threadFactory_ = value;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000085 }
86
Sebastian Zenker042580f2019-01-29 15:48:12 +010087 void addWorker(size_t value) override;
Marc Slemko3a3b53b2007-05-22 23:59:54 +000088
Sebastian Zenker042580f2019-01-29 15:48:12 +010089 void removeWorker(size_t value) override;
Marc Slemko3a3b53b2007-05-22 23:59:54 +000090
Sebastian Zenker042580f2019-01-29 15:48:12 +010091 size_t idleWorkerCount() const override { return idleCount_; }
Marc Slemko0e53ccd2006-07-17 23:51:05 +000092
Sebastian Zenker042580f2019-01-29 15:48:12 +010093 size_t workerCount() const override {
James E. King, IIIdf899132016-11-12 15:16:30 -050094 Guard g(mutex_);
Mark Slee2f6404d2006-10-10 01:37:40 +000095 return workerCount_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +000096 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +000097
Sebastian Zenker042580f2019-01-29 15:48:12 +010098 size_t pendingTaskCount() const override {
James E. King, IIIdf899132016-11-12 15:16:30 -050099 Guard g(mutex_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000100 return tasks_.size();
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000101 }
102
Sebastian Zenker042580f2019-01-29 15:48:12 +0100103 size_t totalTaskCount() const override {
James E. King, IIIdf899132016-11-12 15:16:30 -0500104 Guard g(mutex_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000105 return tasks_.size() + workerCount_ - idleCount_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000106 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000107
Sebastian Zenker042580f2019-01-29 15:48:12 +0100108 size_t pendingTaskCountMax() const override {
James E. King, IIIdf899132016-11-12 15:16:30 -0500109 Guard g(mutex_);
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000110 return pendingTaskCountMax_;
111 }
112
Sebastian Zenker042580f2019-01-29 15:48:12 +0100113 size_t expiredTaskCount() const override {
James E. King, IIIdf899132016-11-12 15:16:30 -0500114 Guard g(mutex_);
115 return expiredCount_;
David Reiss068f4162010-03-09 05:19:45 +0000116 }
117
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000118 void pendingTaskCountMax(const size_t value) {
James E. King, IIIdf899132016-11-12 15:16:30 -0500119 Guard g(mutex_);
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000120 pendingTaskCountMax_ = value;
121 }
122
Sebastian Zenker042580f2019-01-29 15:48:12 +0100123 void add(shared_ptr<Runnable> value, int64_t timeout, int64_t expiration) override;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000124
Sebastian Zenker042580f2019-01-29 15:48:12 +0100125 void remove(shared_ptr<Runnable> task) override;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000126
Sebastian Zenker042580f2019-01-29 15:48:12 +0100127 shared_ptr<Runnable> removeNextPending() override;
David Reiss01fe1532010-03-09 05:19:25 +0000128
Sebastian Zenker042580f2019-01-29 15:48:12 +0100129 void removeExpiredTasks() override {
James E. King, IIIdf899132016-11-12 15:16:30 -0500130 removeExpired(false);
131 }
David Reiss068f4162010-03-09 05:19:45 +0000132
Sebastian Zenker042580f2019-01-29 15:48:12 +0100133 void setExpireCallback(ExpireCallback expireCallback) override;
David Reiss068f4162010-03-09 05:19:45 +0000134
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000135private:
James E. King, IIIdf899132016-11-12 15:16:30 -0500136 /**
137 * Remove one or more expired tasks.
138 * \param[in] justOne if true, try to remove just one task and return
139 */
140 void removeExpired(bool justOne);
141
142 /**
143 * \returns whether it is acceptable to block, depending on the current thread id
144 */
145 bool canSleep() const;
146
147 /**
148 * Lowers the maximum worker count and blocks until enough worker threads complete
149 * to get to the new maximum worker limit. The caller is responsible for acquiring
150 * a lock on the class mutex_.
151 */
152 void removeWorkersUnderLock(size_t value);
Mark Slee7c10eaf2007-03-01 02:45:10 +0000153
Mark Slee2f6404d2006-10-10 01:37:40 +0000154 size_t workerCount_;
155 size_t workerMaxCount_;
156 size_t idleCount_;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000157 size_t pendingTaskCountMax_;
David Reiss068f4162010-03-09 05:19:45 +0000158 size_t expiredCount_;
159 ExpireCallback expireCallback_;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000160
Mark Slee2f6404d2006-10-10 01:37:40 +0000161 ThreadManager::STATE state_;
162 shared_ptr<ThreadFactory> threadFactory_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000163
164 friend class ThreadManager::Task;
James E. King, IIIdf899132016-11-12 15:16:30 -0500165 typedef std::deque<shared_ptr<Task> > TaskQueue;
166 TaskQueue tasks_;
David Reissa0dbfef2010-03-09 05:19:32 +0000167 Mutex mutex_;
Mark Slee2f6404d2006-10-10 01:37:40 +0000168 Monitor monitor_;
David Reissa0dbfef2010-03-09 05:19:32 +0000169 Monitor maxMonitor_;
James E. King, IIIdf899132016-11-12 15:16:30 -0500170 Monitor workerMonitor_; // used to synchronize changes in worker count
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000171
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000172 friend class ThreadManager::Worker;
Mark Slee2f6404d2006-10-10 01:37:40 +0000173 std::set<shared_ptr<Thread> > workers_;
174 std::set<shared_ptr<Thread> > deadWorkers_;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000175 std::map<const Thread::id_t, shared_ptr<Thread> > idMap_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000176};
Marc Slemko66949872006-07-15 01:52:39 +0000177
178class ThreadManager::Task : public Runnable {
179
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100180public:
James E. King, IIIdf899132016-11-12 15:16:30 -0500181 enum STATE { WAITING, EXECUTING, TIMEDOUT, COMPLETE };
Marc Slemko66949872006-07-15 01:52:39 +0000182
cyybfdbd032019-01-12 14:38:28 +0800183 Task(shared_ptr<Runnable> runnable, uint64_t expiration = 0ULL)
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100184 : runnable_(runnable),
cyybfdbd032019-01-12 14:38:28 +0800185 state_(WAITING) {
186 if (expiration != 0ULL) {
187 expireTime_.reset(new std::chrono::steady_clock::time_point(std::chrono::steady_clock::now() + std::chrono::milliseconds(expiration)));
188 }
189 }
Marc Slemko66949872006-07-15 01:52:39 +0000190
Sebastian Zenker042580f2019-01-29 15:48:12 +0100191 ~Task() override = default;
Marc Slemko66949872006-07-15 01:52:39 +0000192
Sebastian Zenker042580f2019-01-29 15:48:12 +0100193 void run() override {
Mark Slee2f6404d2006-10-10 01:37:40 +0000194 if (state_ == EXECUTING) {
195 runnable_->run();
196 state_ = COMPLETE;
Marc Slemko66949872006-07-15 01:52:39 +0000197 }
198 }
199
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100200 shared_ptr<Runnable> getRunnable() { return runnable_; }
David Reiss01fe1532010-03-09 05:19:25 +0000201
cyybfdbd032019-01-12 14:38:28 +0800202 const unique_ptr<std::chrono::steady_clock::time_point> & getExpireTime() const { return expireTime_; }
David Reiss068f4162010-03-09 05:19:45 +0000203
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100204private:
Mark Slee2f6404d2006-10-10 01:37:40 +0000205 shared_ptr<Runnable> runnable_;
Marc Slemkod466b212006-07-20 00:04:18 +0000206 friend class ThreadManager::Worker;
Mark Slee2f6404d2006-10-10 01:37:40 +0000207 STATE state_;
cyybfdbd032019-01-12 14:38:28 +0800208 unique_ptr<std::chrono::steady_clock::time_point> expireTime_;
Marc Slemko66949872006-07-15 01:52:39 +0000209};
210
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100211class ThreadManager::Worker : public Runnable {
212 enum STATE { UNINITIALIZED, STARTING, STARTED, STOPPING, STOPPED };
Marc Slemko66949872006-07-15 01:52:39 +0000213
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100214public:
James E. King, IIIdf899132016-11-12 15:16:30 -0500215 Worker(ThreadManager::Impl* manager) : manager_(manager), state_(UNINITIALIZED) {}
Marc Slemko66949872006-07-15 01:52:39 +0000216
Sebastian Zenker042580f2019-01-29 15:48:12 +0100217 ~Worker() override = default;
Marc Slemko66949872006-07-15 01:52:39 +0000218
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100219private:
Mark Slee2f6404d2006-10-10 01:37:40 +0000220 bool isActive() const {
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100221 return (manager_->workerCount_ <= manager_->workerMaxCount_)
222 || (manager_->state_ == JOINING && !manager_->tasks_.empty());
Mark Slee2f6404d2006-10-10 01:37:40 +0000223 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000224
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100225public:
Mark Sleef5f2be42006-09-05 21:05:31 +0000226 /**
227 * Worker entry point
228 *
229 * As long as worker thread is running, pull tasks off the task queue and
230 * execute.
231 */
Sebastian Zenker042580f2019-01-29 15:48:12 +0100232 void run() override {
James E. King, IIIdf899132016-11-12 15:16:30 -0500233 Guard g(manager_->mutex_);
234
235 /**
236 * This method has three parts; one is to check for and account for
237 * admitting a task which happens under a lock. Then the lock is released
238 * and the task itself is executed. Finally we do some accounting
239 * under lock again when the task completes.
240 */
241
242 /**
243 * Admitting
244 */
245
Mark Sleef5f2be42006-09-05 21:05:31 +0000246 /**
247 * Increment worker semaphore and notify manager if worker count reached
248 * desired max
Mark Sleef5f2be42006-09-05 21:05:31 +0000249 */
James E. King, IIIdf899132016-11-12 15:16:30 -0500250 bool active = manager_->workerCount_ < manager_->workerMaxCount_;
251 if (active) {
252 if (++manager_->workerCount_ == manager_->workerMaxCount_) {
Jim King61b17082016-04-19 15:57:31 -0400253 manager_->workerMonitor_.notify();
254 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000255 }
256
Mark Sleef5f2be42006-09-05 21:05:31 +0000257 while (active) {
James E. King, IIIdf899132016-11-12 15:16:30 -0500258 /**
259 * While holding manager monitor block for non-empty task queue (Also
260 * check that the thread hasn't been requested to stop). Once the queue
261 * is non-empty, dequeue a task, release monitor, and execute. If the
262 * worker max count has been decremented such that we exceed it, mark
263 * ourself inactive, decrement the worker count and notify the manager
264 * (technically we're notifying the next blocked thread but eventually
265 * the manager will see it.
266 */
267 active = isActive();
268
269 while (active && manager_->tasks_.empty()) {
270 manager_->idleCount_++;
271 manager_->monitor_.wait();
272 active = isActive();
273 manager_->idleCount_--;
274 }
275
Marc Slemko6f038a72006-08-03 18:58:09 +0000276 shared_ptr<ThreadManager::Task> task;
Marc Slemko66949872006-07-15 01:52:39 +0000277
James E. King, IIIdf899132016-11-12 15:16:30 -0500278 if (active) {
279 if (!manager_->tasks_.empty()) {
280 task = manager_->tasks_.front();
281 manager_->tasks_.pop_front();
282 if (task->state_ == ThreadManager::Task::WAITING) {
283 // If the state is changed to anything other than EXECUTING or TIMEDOUT here
284 // then the execution loop needs to be changed below.
285 task->state_ =
cyybfdbd032019-01-12 14:38:28 +0800286 (task->getExpireTime() && *(task->getExpireTime()) < std::chrono::steady_clock::now()) ?
James E. King, IIIdf899132016-11-12 15:16:30 -0500287 ThreadManager::Task::TIMEDOUT :
288 ThreadManager::Task::EXECUTING;
289 }
David Reiss96d23882007-07-26 21:10:32 +0000290 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000291
James E. King, IIIdf899132016-11-12 15:16:30 -0500292 /* If we have a pending task max and we just dropped below it, wakeup any
293 thread that might be blocked on add. */
294 if (manager_->pendingTaskCountMax_ != 0
295 && manager_->tasks_.size() <= manager_->pendingTaskCountMax_ - 1) {
296 manager_->maxMonitor_.notify();
David Reiss96d23882007-07-26 21:10:32 +0000297 }
Marc Slemko66949872006-07-15 01:52:39 +0000298 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000299
James E. King, IIIdf899132016-11-12 15:16:30 -0500300 /**
301 * Execution - not holding a lock
302 */
Roger Meier72957452013-06-29 00:28:50 +0200303 if (task) {
David Reiss96d23882007-07-26 21:10:32 +0000304 if (task->state_ == ThreadManager::Task::EXECUTING) {
James E. King, IIIdf899132016-11-12 15:16:30 -0500305
306 // Release the lock so we can run the task without blocking the thread manager
307 manager_->mutex_.unlock();
308
David Reiss96d23882007-07-26 21:10:32 +0000309 try {
Mark Sleef5f2be42006-09-05 21:05:31 +0000310 task->run();
Konrad Grochowski3876ea72014-12-09 15:24:56 +0100311 } catch (const std::exception& e) {
Jens Geyerfb05cf62014-12-04 21:49:07 +0100312 GlobalOutput.printf("[ERROR] task->run() raised an exception: %s", e.what());
Konrad Grochowski3876ea72014-12-09 15:24:56 +0100313 } catch (...) {
Jens Geyerfb05cf62014-12-04 21:49:07 +0100314 GlobalOutput.printf("[ERROR] task->run() raised an unknown exception");
David Reiss96d23882007-07-26 21:10:32 +0000315 }
James E. King, IIIdf899132016-11-12 15:16:30 -0500316
317 // Re-acquire the lock to proceed in the thread manager
318 manager_->mutex_.lock();
319
320 } else if (manager_->expireCallback_) {
321 // The only other state the task could have been in is TIMEDOUT (see above)
Kanishth Karthik8ec58572020-03-26 01:29:50 +0530322 manager_->mutex_.unlock();
James E. King, IIIdf899132016-11-12 15:16:30 -0500323 manager_->expireCallback_(task->getRunnable());
Kanishth Karthik8ec58572020-03-26 01:29:50 +0530324 manager_->mutex_.lock();
James E. King, IIIdf899132016-11-12 15:16:30 -0500325 manager_->expiredCount_++;
David Reiss96d23882007-07-26 21:10:32 +0000326 }
Marc Slemko66949872006-07-15 01:52:39 +0000327 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000328 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000329
James E. King, IIIdf899132016-11-12 15:16:30 -0500330 /**
331 * Final accounting for the worker thread that is done working
332 */
333 manager_->deadWorkers_.insert(this->thread());
334 if (--manager_->workerCount_ == manager_->workerMaxCount_) {
335 manager_->workerMonitor_.notify();
Marc Slemko66949872006-07-15 01:52:39 +0000336 }
Marc Slemko66949872006-07-15 01:52:39 +0000337 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000338
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100339private:
340 ThreadManager::Impl* manager_;
341 friend class ThreadManager::Impl;
342 STATE state_;
Marc Slemko66949872006-07-15 01:52:39 +0000343};
344
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100345void ThreadManager::Impl::addWorker(size_t value) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000346 std::set<shared_ptr<Thread> > newThreads;
Mark Sleef5f2be42006-09-05 21:05:31 +0000347 for (size_t ix = 0; ix < value; ix++) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100348 shared_ptr<ThreadManager::Worker> worker
cyy64750162019-02-08 13:40:59 +0800349 = std::make_shared<ThreadManager::Worker>(this);
Mark Slee2f6404d2006-10-10 01:37:40 +0000350 newThreads.insert(threadFactory_->newThread(worker));
Marc Slemko66949872006-07-15 01:52:39 +0000351 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000352
James E. King, IIIdf899132016-11-12 15:16:30 -0500353 Guard g(mutex_);
354 workerMaxCount_ += value;
355 workers_.insert(newThreads.begin(), newThreads.end());
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000356
cyy64750162019-02-08 13:40:59 +0800357 for (const auto & newThread : newThreads) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100358 shared_ptr<ThreadManager::Worker> worker
cyy64750162019-02-08 13:40:59 +0800359 = dynamic_pointer_cast<ThreadManager::Worker, Runnable>(newThread->runnable());
Mark Slee2f6404d2006-10-10 01:37:40 +0000360 worker->state_ = ThreadManager::Worker::STARTING;
cyy64750162019-02-08 13:40:59 +0800361 newThread->start();
362 idMap_.insert(std::pair<const Thread::id_t, shared_ptr<Thread> >(newThread->getId(), newThread));
Marc Slemkod466b212006-07-20 00:04:18 +0000363 }
364
James E. King, IIIdf899132016-11-12 15:16:30 -0500365 while (workerCount_ != workerMaxCount_) {
366 workerMonitor_.wait();
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000367 }
368}
Marc Slemkod466b212006-07-20 00:04:18 +0000369
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000370void ThreadManager::Impl::start() {
James E. King, IIIdf899132016-11-12 15:16:30 -0500371 Guard g(mutex_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000372 if (state_ == ThreadManager::STOPPED) {
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000373 return;
374 }
375
James E. King, IIIdf899132016-11-12 15:16:30 -0500376 if (state_ == ThreadManager::UNINITIALIZED) {
377 if (!threadFactory_) {
378 throw InvalidArgumentException();
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000379 }
James E. King, IIIdf899132016-11-12 15:16:30 -0500380 state_ = ThreadManager::STARTED;
381 monitor_.notifyAll();
382 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000383
James E. King, IIIdf899132016-11-12 15:16:30 -0500384 while (state_ == STARTING) {
385 monitor_.wait();
Marc Slemkod466b212006-07-20 00:04:18 +0000386 }
387}
388
James E. King, IIIdf899132016-11-12 15:16:30 -0500389void ThreadManager::Impl::stop() {
390 Guard g(mutex_);
Marc Slemkod466b212006-07-20 00:04:18 +0000391 bool doStop = false;
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000392
James E. King, IIIdf899132016-11-12 15:16:30 -0500393 if (state_ != ThreadManager::STOPPING && state_ != ThreadManager::JOINING
394 && state_ != ThreadManager::STOPPED) {
395 doStop = true;
396 state_ = ThreadManager::JOINING;
Marc Slemkod466b212006-07-20 00:04:18 +0000397 }
398
Mark Sleef5f2be42006-09-05 21:05:31 +0000399 if (doStop) {
James E. King, IIIdf899132016-11-12 15:16:30 -0500400 removeWorkersUnderLock(workerCount_);
Marc Slemkod466b212006-07-20 00:04:18 +0000401 }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000402
James E. King, IIIdf899132016-11-12 15:16:30 -0500403 state_ = ThreadManager::STOPPED;
Marc Slemkod466b212006-07-20 00:04:18 +0000404}
Mark Slee7c10eaf2007-03-01 02:45:10 +0000405
Marc Slemkod466b212006-07-20 00:04:18 +0000406void ThreadManager::Impl::removeWorker(size_t value) {
James E. King, IIIdf899132016-11-12 15:16:30 -0500407 Guard g(mutex_);
408 removeWorkersUnderLock(value);
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000409}
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000410
James E. King, IIIdf899132016-11-12 15:16:30 -0500411void ThreadManager::Impl::removeWorkersUnderLock(size_t value) {
412 if (value > workerMaxCount_) {
413 throw InvalidArgumentException();
414 }
415
416 workerMaxCount_ -= value;
417
418 if (idleCount_ > value) {
419 // There are more idle workers than we need to remove,
420 // so notify enough of them so they can terminate.
421 for (size_t ix = 0; ix < value; ix++) {
422 monitor_.notify();
423 }
424 } else {
425 // There are as many or less idle workers than we need to remove,
426 // so just notify them all so they can terminate.
427 monitor_.notifyAll();
428 }
429
430 while (workerCount_ != workerMaxCount_) {
431 workerMonitor_.wait();
432 }
433
cyy64750162019-02-08 13:40:59 +0800434 for (const auto & deadWorker : deadWorkers_) {
James E. King, IIIdf899132016-11-12 15:16:30 -0500435
436 // when used with a joinable thread factory, we join the threads as we remove them
437 if (!threadFactory_->isDetached()) {
cyy64750162019-02-08 13:40:59 +0800438 deadWorker->join();
James E. King, IIIdf899132016-11-12 15:16:30 -0500439 }
440
cyy64750162019-02-08 13:40:59 +0800441 idMap_.erase(deadWorker->getId());
442 workers_.erase(deadWorker);
James E. King, IIIdf899132016-11-12 15:16:30 -0500443 }
444
445 deadWorkers_.clear();
446}
447
448bool ThreadManager::Impl::canSleep() const {
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100449 const Thread::id_t id = threadFactory_->getCurrentThreadId();
450 return idMap_.find(id) == idMap_.end();
451}
452
453void ThreadManager::Impl::add(shared_ptr<Runnable> value, int64_t timeout, int64_t expiration) {
454 Guard g(mutex_, timeout);
455
456 if (!g) {
457 throw TimedOutException();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000458 }
459
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100460 if (state_ != ThreadManager::STARTED) {
461 throw IllegalStateException(
462 "ThreadManager::Impl::add ThreadManager "
463 "not started");
464 }
David Reiss4e19f192010-03-09 05:19:59 +0000465
James E. King, IIIdf899132016-11-12 15:16:30 -0500466 // if we're at a limit, remove an expired task to see if the limit clears
467 if (pendingTaskCountMax_ > 0 && (tasks_.size() >= pendingTaskCountMax_)) {
468 removeExpired(true);
469 }
470
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100471 if (pendingTaskCountMax_ > 0 && (tasks_.size() >= pendingTaskCountMax_)) {
472 if (canSleep() && timeout >= 0) {
473 while (pendingTaskCountMax_ > 0 && tasks_.size() >= pendingTaskCountMax_) {
474 // This is thread safe because the mutex is shared between monitors.
475 maxMonitor_.wait(timeout);
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000476 }
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100477 } else {
478 throw TooManyPendingTasksException();
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000479 }
Marc Slemko66949872006-07-15 01:52:39 +0000480 }
481
cyy64750162019-02-08 13:40:59 +0800482 tasks_.push_back(std::make_shared<ThreadManager::Task>(value, expiration));
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100483
484 // If idle thread is available notify it, otherwise all worker threads are
485 // running and will get around to this task in time.
486 if (idleCount_ > 0) {
487 monitor_.notify();
488 }
489}
490
Marc Slemko6f038a72006-08-03 18:58:09 +0000491void ThreadManager::Impl::remove(shared_ptr<Runnable> task) {
James E. King, IIIdf899132016-11-12 15:16:30 -0500492 Guard g(mutex_);
Mark Slee2f6404d2006-10-10 01:37:40 +0000493 if (state_ != ThreadManager::STARTED) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100494 throw IllegalStateException(
495 "ThreadManager::Impl::remove ThreadManager not "
496 "started");
Mark Sleef5f2be42006-09-05 21:05:31 +0000497 }
James E. King, IIIdf899132016-11-12 15:16:30 -0500498
Sebastian Zenker042580f2019-01-29 15:48:12 +0100499 for (auto it = tasks_.begin(); it != tasks_.end(); ++it)
James E. King, IIIdf899132016-11-12 15:16:30 -0500500 {
501 if ((*it)->getRunnable() == task)
502 {
503 tasks_.erase(it);
504 return;
505 }
506 }
Marc Slemko66949872006-07-15 01:52:39 +0000507}
508
cyy316723a2019-01-05 16:35:14 +0800509std::shared_ptr<Runnable> ThreadManager::Impl::removeNextPending() {
David Reiss01fe1532010-03-09 05:19:25 +0000510 Guard g(mutex_);
511 if (state_ != ThreadManager::STARTED) {
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100512 throw IllegalStateException(
513 "ThreadManager::Impl::removeNextPending "
514 "ThreadManager not started");
David Reiss01fe1532010-03-09 05:19:25 +0000515 }
516
517 if (tasks_.empty()) {
cyy316723a2019-01-05 16:35:14 +0800518 return std::shared_ptr<Runnable>();
David Reiss01fe1532010-03-09 05:19:25 +0000519 }
520
521 shared_ptr<ThreadManager::Task> task = tasks_.front();
James E. King, IIIdf899132016-11-12 15:16:30 -0500522 tasks_.pop_front();
Roger Meier8b51bc62014-07-24 23:33:33 +0200523
David Reiss01fe1532010-03-09 05:19:25 +0000524 return task->getRunnable();
525}
526
James E. King, IIIdf899132016-11-12 15:16:30 -0500527void ThreadManager::Impl::removeExpired(bool justOne) {
528 // this is always called under a lock
cyybfdbd032019-01-12 14:38:28 +0800529 if (tasks_.empty()) {
530 return;
531 }
532 auto now = std::chrono::steady_clock::now();
David Reiss068f4162010-03-09 05:19:45 +0000533
Sebastian Zenker042580f2019-01-29 15:48:12 +0100534 for (auto it = tasks_.begin(); it != tasks_.end(); )
James E. King, IIIdf899132016-11-12 15:16:30 -0500535 {
cyybfdbd032019-01-12 14:38:28 +0800536 if ((*it)->getExpireTime() && *((*it)->getExpireTime()) < now) {
James E. King, IIIdf899132016-11-12 15:16:30 -0500537 if (expireCallback_) {
538 expireCallback_((*it)->getRunnable());
539 }
540 it = tasks_.erase(it);
541 ++expiredCount_;
542 if (justOne) {
543 return;
544 }
David Reiss068f4162010-03-09 05:19:45 +0000545 }
James E. King, IIIdf899132016-11-12 15:16:30 -0500546 else
547 {
548 ++it;
David Reiss068f4162010-03-09 05:19:45 +0000549 }
David Reiss068f4162010-03-09 05:19:45 +0000550 }
551}
552
David Reiss068f4162010-03-09 05:19:45 +0000553void ThreadManager::Impl::setExpireCallback(ExpireCallback expireCallback) {
James E. King, IIIdf899132016-11-12 15:16:30 -0500554 Guard g(mutex_);
David Reiss068f4162010-03-09 05:19:45 +0000555 expireCallback_ = expireCallback;
556}
557
Marc Slemkod466b212006-07-20 00:04:18 +0000558class SimpleThreadManager : public ThreadManager::Impl {
Marc Slemko66949872006-07-15 01:52:39 +0000559
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100560public:
561 SimpleThreadManager(size_t workerCount = 4, size_t pendingTaskCountMax = 0)
562 : workerCount_(workerCount), pendingTaskCountMax_(pendingTaskCountMax) {}
Marc Slemko66949872006-07-15 01:52:39 +0000563
Sebastian Zenker042580f2019-01-29 15:48:12 +0100564 void start() override {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000565 ThreadManager::Impl::pendingTaskCountMax(pendingTaskCountMax_);
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000566 ThreadManager::Impl::start();
Mark Slee2f6404d2006-10-10 01:37:40 +0000567 addWorker(workerCount_);
Marc Slemkod466b212006-07-20 00:04:18 +0000568 }
569
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100570private:
Mark Slee2f6404d2006-10-10 01:37:40 +0000571 const size_t workerCount_;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000572 const size_t pendingTaskCountMax_;
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000573};
Marc Slemko66949872006-07-15 01:52:39 +0000574
Marc Slemko6f038a72006-08-03 18:58:09 +0000575shared_ptr<ThreadManager> ThreadManager::newThreadManager() {
576 return shared_ptr<ThreadManager>(new ThreadManager::Impl());
Marc Slemkod466b212006-07-20 00:04:18 +0000577}
Marc Slemko66949872006-07-15 01:52:39 +0000578
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100579shared_ptr<ThreadManager> ThreadManager::newSimpleThreadManager(size_t count,
580 size_t pendingTaskCountMax) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000581 return shared_ptr<ThreadManager>(new SimpleThreadManager(count, pendingTaskCountMax));
Marc Slemkod466b212006-07-20 00:04:18 +0000582}
Konrad Grochowski16a23a62014-11-13 15:33:38 +0100583}
584}
585} // apache::thrift::concurrency