Checkpoint of initial cut at thread pool manager for thrift and related concurrency classes.
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664721 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/concurrency/ThreadManager.cc b/lib/cpp/src/concurrency/ThreadManager.cc
new file mode 100644
index 0000000..c4ca2b1
--- /dev/null
+++ b/lib/cpp/src/concurrency/ThreadManager.cc
@@ -0,0 +1,303 @@
+#include "ThreadManager.h"
+
+#include <assert.h>
+
+namespace facebook { namespace thrift { namespace concurrency {
+
+/** ThreadManager class
+
+ This class manages a pool of threads. It uses a ThreadFactory to create threads. It never actually creates or destroys worker threads, rather
+ it maintains statistics on number of idle threads, number of active threads, task backlog, and average wait and service times and informs the
+ PoolPolicy object bound to instances of this manager of interesting transitions. It is then up the PoolPolicy object to decide if the thread pool
+ size needs to be adjusted and call this object addThread and removeThread methods to make changes.
+
+ This design allows different policy implementations to used this code to handle basic worker thread management and worker task execution and focus on
+ policy issues. The simplest policy, StaticPolicy, does nothing other than create a fixed number of threads.
+
+ @author marc
+ @version $Id */
+
+class ThreadManager::Task : public Runnable {
+
+public:
+ enum STATE {
+ WAITING,
+ EXECUTING,
+ CANCELLED,
+ COMPLETE
+ };
+
+ Task(Runnable* runnable) :
+ _runnable(runnable),
+ _state(WAITING)
+ {}
+
+ ~Task() {};
+
+ void run() {
+ if(_state == EXECUTING) {
+ _runnable->run();
+ _state = COMPLETE;
+ }
+ }
+
+ private:
+
+ Runnable* _runnable;
+
+ STATE _state;
+};
+
+class ThreadManager::Worker: public Runnable {
+
+ enum STATE {
+ UNINITIALIZED,
+ STARTING,
+ STARTED,
+ STOPPING,
+ STOPPED
+ };
+
+ public:
+ Worker(ThreadManager* manager) :
+ _manager(manager),
+ _state(UNINITIALIZED),
+ _idle(false)
+ {}
+
+ ~Worker() {}
+
+ /** Worker entry point
+
+ As long as worker thread is running, pull tasks off the task queue and execute. */
+
+ void run() {
+
+ {Synchronized(_manager->_monitor);
+
+ if(_state == STARTING) {
+ _state = STARTED;
+ }
+ }
+
+ do {
+
+ ThreadManager::Task* task = NULL;
+
+ /* While holding manager monitor block for non-empty task queue (Also check that the thread hasn't been requested to stop).
+
+ Once the queue is non-empty, dequeue a task, release monitor, and execute. */
+
+ {Synchronized(_manager->_monitor);
+
+ while(_state == STARTED && _manager->_tasks.empty()) {
+
+ _manager->_idleCount++;
+
+ _idle = true;
+
+ _manager->_monitor.wait();
+
+ _idle = false;
+
+ _manager->_idleCount--;
+ }
+
+ if(_state == STARTED) {
+
+ task = _manager->_tasks.front();
+ }
+ }
+
+ if(task != NULL) {
+
+ task->run();
+
+ delete task;
+ }
+
+ } while(_state == STARTED);
+
+ {Synchronized(_manager->_monitor);
+
+ if(_state == STOPPING) {
+
+ _state = STOPPED;
+
+ _manager->_monitor.notify();
+
+ }
+ }
+
+ return;
+ }
+
+ private:
+
+ ThreadManager* _manager;
+
+ friend class ThreadManager;
+
+ STATE _state;
+
+ bool _idle;
+};
+
+ThreadManager::ThreadManager(size_t highWatermark, size_t lowWatermark) :
+ _hiwat(highWatermark),
+ _lowat(lowWatermark) {
+}
+
+ThreadManager::~ThreadManager() {}
+
+size_t ThreadManager::ThreadManager::highWatermark() const {return _hiwat;}
+
+void ThreadManager::highWatermark(size_t value) {_hiwat = value;}
+
+size_t ThreadManager::lowWatermark() const {return _lowat;}
+
+void ThreadManager::lowWatermark(size_t value) {_lowat = value;}
+
+const PoolPolicy* ThreadManager::poolPolicy() const {
+
+ Synchronized s(_monitor);
+
+ return _poolPolicy;
+}
+
+void ThreadManager::poolPolicy(const PoolPolicy* value) {
+
+ Synchronized s(_monitor);
+
+ _poolPolicy = value;
+}
+
+const ThreadFactory* ThreadManager::threadFactory() const {
+
+ Synchronized s(_monitor);
+
+ return _threadFactory;
+}
+
+void ThreadManager::threadFactory(const ThreadFactory* value) {
+
+ Synchronized s(_monitor);
+
+ _threadFactory = value;
+}
+
+void ThreadManager::addThread(size_t value) {
+
+ std::set<Thread*> newThreads;
+
+ for(size_t ix = 0; ix < value; ix++) {
+
+ ThreadManager::Worker* worker = new ThreadManager::Worker(this);
+
+ newThreads.insert(_threadFactory->newThread(worker));
+ }
+
+ for(std::set<Thread*>::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) {
+
+ (*ix)->start();
+ }
+ for(std::set<Thread*>::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) {
+
+ (*ix)->start();
+ }
+
+ {Synchronized s(_monitor);
+
+ _workers.insert(newThreads.begin(), newThreads.end());
+ }
+}
+
+void ThreadManager::removeThread(size_t value) {
+
+ std::set<Thread*> removedThreads;
+
+ {Synchronized s(_monitor);
+
+ /* Overly clever loop
+
+ First time through, (idleOnly == 1) just look for idle threads. If that didn't find enough, go through again (idleOnly == 0)
+ and remove a sufficient number of busy threads. */
+
+ for(int idleOnly = 1; idleOnly <= 0; idleOnly--) {
+
+ for(std::set<Thread*>::iterator workerThread = _workers.begin(); (workerThread != _workers.end()) && (removedThreads.size() < value); workerThread++) {
+
+ Worker* worker = (Worker*)(*workerThread)->runnable();
+
+ if(worker->_idle || !idleOnly) {
+
+ removedThreads.insert(*workerThread);
+
+ _workers.erase(workerThread);
+ }
+ }
+ }
+
+ _monitor.notifyAll();
+ }
+
+
+ // Join removed threads and free worker
+
+ for(std::set<Thread*>::iterator workerThread = removedThreads.begin(); workerThread != removedThreads.end(); workerThread++) {
+
+ Worker* worker = (Worker*)(*workerThread)->runnable();
+
+ (*workerThread)->join();
+
+ delete worker;
+ }
+}
+
+size_t ThreadManager::idleWorkerCount() const {return _idleCount;}
+
+size_t ThreadManager::workerCount() const {
+
+ Synchronized s(_monitor);
+
+ return _workers.size();
+}
+
+size_t ThreadManager::pendingTaskCount() const {
+
+ Synchronized s(_monitor);
+
+ return _tasks.size();
+}
+
+size_t ThreadManager::totalTaskCount() const {
+
+ Synchronized s(_monitor);
+
+ return _tasks.size() + _workers.size() - _idleCount;
+}
+
+void ThreadManager::add(Runnable* value) {
+
+ Synchronized s(_monitor);
+
+ _tasks.push(new ThreadManager::Task(value));
+
+ /* If queue is empty notify a thread, otherwise all worker threads are running and will get around to this
+ task in time. */
+
+ if(_tasks.size() == 1) {
+
+ assert(_idleCount == _workers.size());
+
+ _monitor.notify();
+ }
+}
+
+void ThreadManager::remove(Runnable* task) {
+
+ Synchronized s(_monitor);
+}
+
+}}} // facebook::thrift::concurrency
+