|  | // Copyright (c) 2006- Facebook | 
|  | // Distributed under the Thrift Software License | 
|  | // | 
|  | // See accompanying file LICENSE or visit the Thrift site at: | 
|  | // http://developers.facebook.com/thrift/ | 
|  |  | 
|  | #include "ThreadManager.h" | 
|  | #include "Exception.h" | 
|  | #include "Monitor.h" | 
|  |  | 
|  | #include <boost/shared_ptr.hpp> | 
|  |  | 
|  | #include <assert.h> | 
|  | #include <queue> | 
|  | #include <set> | 
|  |  | 
|  | #if defined(DEBUG) | 
|  | #include <iostream> | 
|  | #endif //defined(DEBUG) | 
|  |  | 
|  | namespace facebook { namespace thrift { namespace concurrency { | 
|  |  | 
|  | using namespace boost; | 
|  |  | 
|  |  | 
|  | /** | 
|  | * ThreadManager class | 
|  | * | 
|  | * This class manages a pool of threads. It uses a ThreadFactory to create | 
|  | * threads.  It never actually creates or destroys worker threads, rather | 
|  | * it maintains statistics on number of idle threads, number of active threads, | 
|  | * task backlog, and average wait and service times. | 
|  | * | 
|  | * @author marc | 
|  | * @version $Id:$ | 
|  | */ | 
|  | class ThreadManager::Impl : public ThreadManager  { | 
|  |  | 
|  | public: | 
|  | Impl() : | 
|  | workerCount_(0), | 
|  | workerMaxCount_(0), | 
|  | idleCount_(0), | 
|  | state_(ThreadManager::UNINITIALIZED) {} | 
|  |  | 
|  | ~Impl() { stop(); } | 
|  |  | 
|  | void start(); | 
|  |  | 
|  | void stop(); | 
|  |  | 
|  | const ThreadManager::STATE state() const { | 
|  | return state_; | 
|  | } | 
|  |  | 
|  | shared_ptr<ThreadFactory> threadFactory() const { | 
|  | Synchronized s(monitor_); | 
|  | return threadFactory_; | 
|  | } | 
|  |  | 
|  | void threadFactory(shared_ptr<ThreadFactory> value) { | 
|  | Synchronized s(monitor_); | 
|  | threadFactory_ = value; | 
|  | } | 
|  |  | 
|  | void addWorker(size_t value); | 
|  |  | 
|  | void removeWorker(size_t value); | 
|  |  | 
|  | size_t idleWorkerCount() const { | 
|  | return idleCount_; | 
|  | } | 
|  |  | 
|  | size_t workerCount() const { | 
|  | Synchronized s(monitor_); | 
|  | return workerCount_; | 
|  | } | 
|  |  | 
|  | size_t pendingTaskCount() const { | 
|  | Synchronized s(monitor_); | 
|  | return tasks_.size(); | 
|  | } | 
|  |  | 
|  | size_t totalTaskCount() const { | 
|  | Synchronized s(monitor_); | 
|  | return tasks_.size() + workerCount_ - idleCount_; | 
|  | } | 
|  |  | 
|  | void add(shared_ptr<Runnable> value); | 
|  |  | 
|  | void remove(shared_ptr<Runnable> task); | 
|  |  | 
|  | private: | 
|  | size_t workerCount_; | 
|  | size_t workerMaxCount_; | 
|  | size_t idleCount_; | 
|  | ThreadManager::STATE state_; | 
|  | shared_ptr<ThreadFactory> threadFactory_; | 
|  |  | 
|  |  | 
|  | friend class ThreadManager::Task; | 
|  | std::queue<shared_ptr<Task> > tasks_; | 
|  | Monitor monitor_; | 
|  | Monitor workerMonitor_; | 
|  |  | 
|  | friend class ThreadManager::Worker; | 
|  | std::set<shared_ptr<Thread> > workers_; | 
|  | std::set<shared_ptr<Thread> > deadWorkers_; | 
|  | }; | 
|  |  | 
|  | class ThreadManager::Task : public Runnable { | 
|  |  | 
|  | public: | 
|  | enum STATE { | 
|  | WAITING, | 
|  | EXECUTING, | 
|  | CANCELLED, | 
|  | COMPLETE | 
|  | }; | 
|  |  | 
|  | Task(shared_ptr<Runnable> runnable) : | 
|  | runnable_(runnable), | 
|  | state_(WAITING) {} | 
|  |  | 
|  | ~Task() {} | 
|  |  | 
|  | void run() { | 
|  | if (state_ == EXECUTING) { | 
|  | runnable_->run(); | 
|  | state_ = COMPLETE; | 
|  | } | 
|  | } | 
|  |  | 
|  | private: | 
|  | shared_ptr<Runnable> runnable_; | 
|  | friend class ThreadManager::Worker; | 
|  | STATE state_; | 
|  | }; | 
|  |  | 
|  | class ThreadManager::Worker: public Runnable { | 
|  | enum STATE { | 
|  | UNINITIALIZED, | 
|  | STARTING, | 
|  | STARTED, | 
|  | STOPPING, | 
|  | STOPPED | 
|  | }; | 
|  |  | 
|  | public: | 
|  | Worker(ThreadManager::Impl* manager) : | 
|  | manager_(manager), | 
|  | state_(UNINITIALIZED), | 
|  | idle_(false) {} | 
|  |  | 
|  | ~Worker() {} | 
|  |  | 
|  | bool isActive() const { | 
|  | return manager_->workerCount_ <= manager_->workerMaxCount_; | 
|  | } | 
|  |  | 
|  | /** | 
|  | * Worker entry point | 
|  | * | 
|  | * As long as worker thread is running, pull tasks off the task queue and | 
|  | * execute. | 
|  | */ | 
|  | void run() { | 
|  | bool active = false; | 
|  | bool notifyManager = false; | 
|  |  | 
|  | /** | 
|  | * Increment worker semaphore and notify manager if worker count reached | 
|  | * desired max | 
|  | * | 
|  | * Note: We have to release the monitor and acquire the workerMonitor | 
|  | * since that is what the manager blocks on for worker add/remove | 
|  | */ | 
|  | { | 
|  | Synchronized s(manager_->monitor_); | 
|  | active = manager_->workerCount_ < manager_->workerMaxCount_; | 
|  | if (active) { | 
|  | manager_->workerCount_++; | 
|  | notifyManager = manager_->workerCount_ == manager_->workerMaxCount_; | 
|  | } | 
|  | } | 
|  |  | 
|  | if (notifyManager) { | 
|  | Synchronized s(manager_->workerMonitor_); | 
|  | manager_->workerMonitor_.notify(); | 
|  | notifyManager = false; | 
|  | } | 
|  |  | 
|  | while (active) { | 
|  | shared_ptr<ThreadManager::Task> task; | 
|  |  | 
|  | /** | 
|  | * While holding manager monitor block for non-empty task queue (Also | 
|  | * check that the thread hasn't been requested to stop). Once the queue | 
|  | * is non-empty, dequeue a task, release monitor, and execute. If the | 
|  | * worker max count has been decremented such that we exceed it, mark | 
|  | * ourself inactive, decrement the worker count and notify the manager | 
|  | * (technically we're notifying the next blocked thread but eventually | 
|  | * the manager will see it. | 
|  | */ | 
|  | { | 
|  | Synchronized s(manager_->monitor_); | 
|  | active = isActive(); | 
|  | while (active && manager_->tasks_.empty()) { | 
|  | manager_->idleCount_++; | 
|  | idle_ = true; | 
|  | manager_->monitor_.wait(); | 
|  | active = isActive(); | 
|  | idle_ = false; | 
|  | manager_->idleCount_--; | 
|  | } | 
|  |  | 
|  | if (active) { | 
|  | if (!manager_->tasks_.empty()) { | 
|  | task = manager_->tasks_.front(); | 
|  | manager_->tasks_.pop(); | 
|  | if (task->state_ == ThreadManager::Task::WAITING) { | 
|  | task->state_ = ThreadManager::Task::EXECUTING; | 
|  | } | 
|  | } | 
|  | } else { | 
|  | idle_ = true; | 
|  | manager_->workerCount_--; | 
|  | notifyManager = manager_->workerCount_ == manager_->workerMaxCount_; | 
|  | } | 
|  | } | 
|  |  | 
|  | if (task != NULL) { | 
|  | if (task->state_ == ThreadManager::Task::EXECUTING) { | 
|  | try { | 
|  | task->run(); | 
|  | } catch(...) { | 
|  | // XXX need to log this | 
|  | } | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | { | 
|  | Synchronized s(manager_->workerMonitor_); | 
|  | manager_->deadWorkers_.insert(this->thread()); | 
|  | if (notifyManager) { | 
|  | manager_->workerMonitor_.notify(); | 
|  | } | 
|  | } | 
|  |  | 
|  | return; | 
|  | } | 
|  |  | 
|  | private: | 
|  | ThreadManager::Impl* manager_; | 
|  | friend class ThreadManager::Impl; | 
|  | STATE state_; | 
|  | bool idle_; | 
|  | }; | 
|  |  | 
|  |  | 
|  | void ThreadManager::Impl::addWorker(size_t value) { | 
|  | std::set<shared_ptr<Thread> > newThreads; | 
|  | for (size_t ix = 0; ix < value; ix++) { | 
|  | class ThreadManager::Worker; | 
|  | shared_ptr<ThreadManager::Worker> worker = shared_ptr<ThreadManager::Worker>(new ThreadManager::Worker(this)); | 
|  | newThreads.insert(threadFactory_->newThread(worker)); | 
|  | } | 
|  |  | 
|  | { | 
|  | Synchronized s(monitor_); | 
|  | workerMaxCount_ += value; | 
|  | workers_.insert(newThreads.begin(), newThreads.end()); | 
|  | } | 
|  |  | 
|  | for (std::set<shared_ptr<Thread> >::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) { | 
|  | shared_ptr<ThreadManager::Worker> worker = dynamic_pointer_cast<ThreadManager::Worker, Runnable>((*ix)->runnable()); | 
|  | worker->state_ = ThreadManager::Worker::STARTING; | 
|  | (*ix)->start(); | 
|  | } | 
|  |  | 
|  | { | 
|  | Synchronized s(workerMonitor_); | 
|  | while (workerCount_ != workerMaxCount_) { | 
|  | workerMonitor_.wait(); | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | void ThreadManager::Impl::start() { | 
|  |  | 
|  | if (state_ == ThreadManager::STOPPED) { | 
|  | return; | 
|  | } | 
|  |  | 
|  | { | 
|  | Synchronized s(monitor_); | 
|  | if (state_ == ThreadManager::UNINITIALIZED) { | 
|  | if (threadFactory_ == NULL) { | 
|  | throw InvalidArgumentException(); | 
|  | } | 
|  | state_ = ThreadManager::STARTED; | 
|  | monitor_.notifyAll(); | 
|  | } | 
|  |  | 
|  | while (state_ == STARTING) { | 
|  | monitor_.wait(); | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | void ThreadManager::Impl::stop() { | 
|  | bool doStop = false; | 
|  | if (state_ == ThreadManager::STOPPED) { | 
|  | return; | 
|  | } | 
|  |  | 
|  | { | 
|  | Synchronized s(monitor_); | 
|  | if (!state_ != ThreadManager::STOPPING && state_ != ThreadManager::STOPPED) { | 
|  | doStop = true; | 
|  | state_ = ThreadManager::STOPPING; | 
|  | } | 
|  | } | 
|  |  | 
|  | if (doStop) { | 
|  | removeWorker(workerCount_); | 
|  | state_ = ThreadManager::STOPPING; | 
|  | } | 
|  |  | 
|  | // XXX | 
|  | // should be able to block here for transition to STOPPED since we're no | 
|  | // using shared_ptrs | 
|  | } | 
|  |  | 
|  | void ThreadManager::Impl::removeWorker(size_t value) { | 
|  | std::set<shared_ptr<Thread> > removedThreads; | 
|  | { | 
|  | Synchronized s(monitor_); | 
|  | if (value > workerMaxCount_) { | 
|  | throw InvalidArgumentException(); | 
|  | } | 
|  |  | 
|  | workerMaxCount_ -= value; | 
|  |  | 
|  | if (idleCount_ < value) { | 
|  | for (size_t ix = 0; ix < idleCount_; ix++) { | 
|  | monitor_.notify(); | 
|  | } | 
|  | } else { | 
|  | monitor_.notifyAll(); | 
|  | } | 
|  | } | 
|  |  | 
|  | { | 
|  | Synchronized s(workerMonitor_); | 
|  |  | 
|  | while (workerCount_ != workerMaxCount_) { | 
|  | workerMonitor_.wait(); | 
|  | } | 
|  |  | 
|  | for (std::set<shared_ptr<Thread> >::iterator ix = deadWorkers_.begin(); ix != deadWorkers_.end(); ix++) { | 
|  | workers_.erase(*ix); | 
|  | } | 
|  |  | 
|  | deadWorkers_.clear(); | 
|  | } | 
|  | } | 
|  |  | 
|  | void ThreadManager::Impl::add(shared_ptr<Runnable> value) { | 
|  | Synchronized s(monitor_); | 
|  |  | 
|  | if (state_ != ThreadManager::STARTED) { | 
|  | throw IllegalStateException(); | 
|  | } | 
|  |  | 
|  | tasks_.push(shared_ptr<ThreadManager::Task>(new ThreadManager::Task(value))); | 
|  |  | 
|  | // If idle thread is available notify it, otherwise all worker threads are | 
|  | // running and will get around to this task in time. | 
|  | if (idleCount_ > 0) { | 
|  | monitor_.notify(); | 
|  | } | 
|  | } | 
|  |  | 
|  | void ThreadManager::Impl::remove(shared_ptr<Runnable> task) { | 
|  | Synchronized s(monitor_); | 
|  | if (state_ != ThreadManager::STARTED) { | 
|  | throw IllegalStateException(); | 
|  | } | 
|  | } | 
|  |  | 
|  | class SimpleThreadManager : public ThreadManager::Impl { | 
|  |  | 
|  | public: | 
|  | SimpleThreadManager(size_t workerCount=4) : | 
|  | workerCount_(workerCount), | 
|  | firstTime_(true) { | 
|  | } | 
|  |  | 
|  | void start() { | 
|  | ThreadManager::Impl::start(); | 
|  | addWorker(workerCount_); | 
|  | } | 
|  |  | 
|  | private: | 
|  | const size_t workerCount_; | 
|  | bool firstTime_; | 
|  | Monitor monitor_; | 
|  | }; | 
|  |  | 
|  |  | 
|  | shared_ptr<ThreadManager> ThreadManager::newThreadManager() { | 
|  | return shared_ptr<ThreadManager>(new ThreadManager::Impl()); | 
|  | } | 
|  |  | 
|  | shared_ptr<ThreadManager> ThreadManager::newSimpleThreadManager(size_t count) { | 
|  | return shared_ptr<ThreadManager>(new SimpleThreadManager(count)); | 
|  | } | 
|  |  | 
|  | }}} // facebook::thrift::concurrency |