Checkpoint of initial cut at thread pool manager for thrift and related concurrency classes.
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664721 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/concurrency/Monitor.cc b/lib/cpp/src/concurrency/Monitor.cc
new file mode 100644
index 0000000..d1b83d1
--- /dev/null
+++ b/lib/cpp/src/concurrency/Monitor.cc
@@ -0,0 +1,138 @@
+#include "Monitor.h"
+
+#include <assert.h>
+#include <errno.h>
+#include <pthread.h>
+
+namespace facebook { namespace thrift { namespace concurrency {
+
+/** Monitor implementation using the POSIX pthread library
+
+ @author marc
+ @version $Id$ */
+
+class Monitor::Impl {
+
+ public:
+
+ Impl() :
+ mutexInitialized(false) {
+
+ /* XXX
+ Need to fix this to handle failures without leaking. */
+
+ assert(pthread_mutex_init(&_pthread_mutex, NULL) == 0);
+
+ mutexInitialized = true;
+
+ assert(pthread_cond_init(&_pthread_cond, NULL) == 0);
+ }
+
+ ~Impl() {
+
+ if(mutexInitialized) {
+
+ mutexInitialized = false;
+
+ assert(pthread_mutex_destroy(&_pthread_mutex) == 0);
+ }
+
+ if(condInitialized) {
+
+ condInitialized = false;
+
+ assert(pthread_cond_destroy(&_pthread_cond) == 0);
+ }
+ }
+
+ void lock() const {pthread_mutex_lock(&_pthread_mutex);}
+
+ void unlock() const {pthread_mutex_unlock(&_pthread_mutex);}
+
+ void wait(long long timeout) const {
+
+ // XXX Need to assert that caller owns mutex
+
+ if(timeout == 0LL) {
+
+ pthread_cond_wait(&_pthread_cond, &_pthread_mutex);
+
+ } else {
+
+ struct timespec abstime;
+
+ toAbsoluteTimespec(abstime, timeout);
+
+ int result = pthread_cond_timedwait(&_pthread_cond, &_pthread_mutex, &abstime);
+
+ if(result == ETIMEDOUT) {
+
+ // XXX If result is timeout need to throw timeout exception
+ }
+ }
+ }
+
+ void notify() {
+
+ // XXX Need to assert that caller owns mutex
+
+ assert(pthread_cond_signal(&_pthread_cond) == 0);
+ }
+
+ void notifyAll() {
+
+ // XXX Need to assert that caller owns mutex
+
+ assert(pthread_cond_broadcast(&_pthread_cond) == 0);
+ }
+
+private:
+
+ /** Converts relative timeout specified as a duration in milliseconds to a struct timespec structure
+ specifying current time plus timeout
+
+ @param timeout time to delay in milliseconds
+ @return struct timespec current time plus timeout */
+
+ static const void toAbsoluteTimespec(struct timespec& result, long long timeout) {
+
+ // XXX Darwin doesn't seem to have any readily useable hi-res clock.
+
+ time_t seconds;
+
+ assert(time(&seconds) != (time_t)-1);
+
+ seconds+= (timeout / 1000);
+
+ long nanoseconds = (timeout % 1000) * 1000000;
+
+ result.tv_sec = seconds + (nanoseconds / 1000000000);
+
+ result.tv_nsec = nanoseconds % 1000000000;
+ }
+
+ mutable pthread_mutex_t _pthread_mutex;
+
+ mutable bool mutexInitialized;
+
+ mutable pthread_cond_t _pthread_cond;
+
+ mutable bool condInitialized;
+};
+
+Monitor::Monitor() : _impl(new Monitor::Impl()) {}
+
+ Monitor::~Monitor() { delete _impl;}
+
+void Monitor::lock() const {_impl->lock();}
+
+void Monitor::unlock() const {_impl->unlock();}
+
+void Monitor::wait(long long timeout) const {_impl->wait(timeout);}
+
+void Monitor::notify() const {_impl->notify();}
+
+void Monitor::notifyAll() const {_impl->notifyAll();}
+
+}}} // facebook::thrift::concurrency
+
diff --git a/lib/cpp/src/concurrency/Monitor.h b/lib/cpp/src/concurrency/Monitor.h
new file mode 100644
index 0000000..82544f1
--- /dev/null
+++ b/lib/cpp/src/concurrency/Monitor.h
@@ -0,0 +1,59 @@
+#if !defined(_concurrency_mutex_h_)
+#define _concurrency_mutex_h_ 1
+
+namespace facebook { namespace thrift { namespace concurrency {
+
+/** A monitor is a combination mutex and condition-event. Waiting and notifying condition events requires that the caller own the mutex. Mutex
+ lock and unlock operations can be performed independently of condition events. This is more or less analogous to java.lang.Object multi-thread
+ operations
+
+ Note that all methods are const. Monitors implement logical constness, not bit constness. This allows const methods to call monitor
+ methods without needing to cast away constness or change to non-const signatures.
+
+ @author marc
+ @version $Id$ */
+
+class Monitor {
+
+ public:
+
+ Monitor();
+
+ virtual ~Monitor();
+
+ virtual void lock() const;
+
+ virtual void unlock() const;
+
+ virtual void wait(long long timeout=0LL) const;
+
+ virtual void notify() const;
+
+ virtual void notifyAll() const;
+
+ private:
+
+ class Impl;
+
+ Impl* _impl;
+};
+
+class Synchronized {
+ public:
+
+ Synchronized(const Monitor& value) : _monitor(value) {
+ _monitor.lock();
+ }
+
+ ~Synchronized() {
+ _monitor.unlock();
+ }
+
+ private:
+ const Monitor& _monitor;
+};
+
+
+}}} // facebook::thrift::concurrency
+
+#endif // !defined(_concurrency_mutex_h_)
diff --git a/lib/cpp/src/concurrency/Mutex.cc b/lib/cpp/src/concurrency/Mutex.cc
new file mode 100644
index 0000000..39d768e
--- /dev/null
+++ b/lib/cpp/src/concurrency/Mutex.cc
@@ -0,0 +1,38 @@
+#include "Mutex.h"
+
+#include <assert.h>
+#include <pthread.h>
+
+namespace facebook { namespace thrift { namespace concurrency {
+
+class Mutex::impl {
+public:
+ impl() : initialized(false) {
+ assert(pthread_mutex_init(&_pthread_mutex, NULL) == 0);
+ initialized = true;
+ }
+
+ ~impl() {
+ if(initialized) {
+ initialized = false;
+ assert(pthread_mutex_destroy(&_pthread_mutex) == 0);
+ }
+ }
+
+ void lock() const {pthread_mutex_lock(&_pthread_mutex);}
+
+ void unlock() const {pthread_mutex_unlock(&_pthread_mutex);}
+
+private:
+ mutable pthread_mutex_t _pthread_mutex;
+ mutable bool initialized;
+};
+
+Mutex::Mutex() : _impl(new Mutex::impl()) {}
+
+void Mutex::lock() const {_impl->lock();}
+
+void Mutex::unlock() const {_impl->unlock();}
+
+}}} // facebook::thrift::concurrency
+
diff --git a/lib/cpp/src/concurrency/Mutex.h b/lib/cpp/src/concurrency/Mutex.h
new file mode 100644
index 0000000..e8371ea
--- /dev/null
+++ b/lib/cpp/src/concurrency/Mutex.h
@@ -0,0 +1,43 @@
+#if !defined(_concurrency_mutex_h_)
+#define _concurrency_mutex_h_ 1
+
+namespace facebook { namespace thrift { namespace concurrency {
+
+class Mutex {
+
+ public:
+
+ Mutex();
+
+ virtual ~Mutex() {}
+
+ virtual void lock() const;
+
+ virtual void unlock() const;
+
+ private:
+
+ class impl;
+
+ impl* _impl;
+};
+
+class MutexMonitor {
+ public:
+
+ MutexMonitor(const Mutex& value) : _mutex(value) {
+ _mutex.lock();
+ }
+
+ ~MutexMonitor() {
+ _mutex.unlock();
+ }
+
+ private:
+ const Mutex& _mutex;
+};
+
+
+}}} // facebook::thrift::concurrency
+
+#endif // !defined(_concurrency_mutex_h_)
diff --git a/lib/cpp/src/concurrency/PosixThreadFactory.cc b/lib/cpp/src/concurrency/PosixThreadFactory.cc
new file mode 100644
index 0000000..e9d52f0
--- /dev/null
+++ b/lib/cpp/src/concurrency/PosixThreadFactory.cc
@@ -0,0 +1,215 @@
+#include "PosixThreadFactory.h"
+
+#include <assert.h>
+#include <pthread.h>
+
+namespace facebook { namespace thrift { namespace concurrency {
+
+/** The POSIX thread class. */
+
+class PthreadThread: public Thread {
+
+public:
+ enum STATE {uninitialized,
+ starting,
+ started,
+ stopping,
+ stopped
+ };
+
+ static const int MB = 1024 * 1024;
+
+private:
+
+ pthread_t _pthread;
+
+ STATE _state;
+
+ int _policy;
+
+ int _priority;
+
+ int _stackSize;
+
+ Runnable* _runnable;
+
+ static void* threadMain(void* arg) {
+
+ // XXX need a lock here when testing thread state
+
+ PthreadThread* thread = (PthreadThread*)arg;
+
+ if(thread->_state != starting) {
+ return (void*)0;
+ }
+
+ thread->_state = starting;
+
+ thread->_runnable->run();
+
+ if(thread->_state != stopping && thread->_state != stopped) {
+ thread->_state = stopping;
+ }
+
+ return (void*)0;
+ }
+
+public:
+
+ PthreadThread(int policy, int priority, int stackSize, Runnable* runnable) :
+ _pthread(0),
+ _state(uninitialized),
+ _policy(policy),
+ _priority(priority),
+ _stackSize(stackSize),
+ _runnable(runnable)
+ {}
+
+ void start() {
+
+ if(_state != uninitialized) {
+ return;
+ }
+
+ _state = starting;
+
+ pthread_attr_t thread_attr;
+
+ assert(pthread_attr_init(&thread_attr) == 0);
+
+ assert(pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_JOINABLE) == 0);
+
+ // Set thread stack size
+
+ assert(pthread_attr_setstacksize(&thread_attr, MB * _stackSize) == 0);
+
+ // Set thread policy
+
+ assert(pthread_attr_setschedpolicy(&thread_attr, _policy) == 0);
+
+ struct sched_param sched_param;
+ sched_param.sched_priority = _priority;
+
+ // Set thread priority
+
+ assert(pthread_attr_setschedparam(&thread_attr, &sched_param) == 0);
+
+ assert(pthread_create(&_pthread, &thread_attr, PthreadThread::threadMain, (void*)this) == 0);
+ }
+
+ void join() {
+
+ if(_state != stopped) {
+
+ void* ignore;
+
+ pthread_join(_pthread, &ignore);
+ }
+ }
+
+ const Runnable* runnable() const {return _runnable;}
+
+};
+
+/** POSIX Thread factory implementation */
+
+class PosixThreadFactory::Impl {
+
+private:
+
+ POLICY _policy;
+
+ PRIORITY _priority;
+
+ int _stackSize;
+
+ bool _detached;
+
+ /** Converts generic posix thread schedule policy enums into pthread API values. */
+
+ static int toPthreadPolicy(POLICY policy) {
+ switch(policy) {
+ case OTHER: return SCHED_OTHER; break;
+ case FIFO: return SCHED_FIFO; break;
+ case ROUND_ROBIN: return SCHED_RR; break;
+ default: return SCHED_OTHER; break;
+ }
+ }
+
+ /** Converts relative thread priorities to absolute value based on posix thread scheduler policy
+
+ The idea is simply to divide up the priority range for the given policy into the correpsonding relative
+ priority level (lowest..highest) and then prorate accordingly. */
+
+ static int toPthreadPriority(POLICY policy, PRIORITY priority) {
+
+ int pthread_policy = toPthreadPolicy(policy);
+
+ int min_priority = sched_get_priority_min(pthread_policy);
+
+ int max_priority = sched_get_priority_max(pthread_policy);
+
+ int quanta = (HIGHEST - LOWEST) + 1;
+
+ float stepsperquanta = (max_priority - min_priority) / quanta;
+
+ if(priority <= HIGHEST) {
+
+ return (int)(min_priority + stepsperquanta * priority);
+ } else {
+
+ // should never get here for priority increments.
+
+ assert(false);
+
+ return (int)(min_priority + stepsperquanta * NORMAL);
+ }
+ }
+
+public:
+
+ Impl(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
+ _policy(policy),
+ _priority(priority),
+ _stackSize(stackSize),
+ _detached(detached) {
+ }
+
+ /** Creates a new POSIX thread to run the runnable object
+
+ @param runnable A runnable object */
+
+ Thread* newThread(Runnable* runnable) const {
+
+ return new PthreadThread(toPthreadPolicy(_policy), toPthreadPriority(_policy, _priority), _stackSize, runnable);
+ }
+
+ int stackSize() const { return _stackSize;}
+
+ void stackSize(int value) { _stackSize = value;}
+
+ PRIORITY priority() const { return _priority;}
+
+ /** Sets priority.
+
+ XXX
+ Need to handle incremental priorities properl. */
+
+ void priority(PRIORITY value) { _priority = value;}
+
+};
+
+PosixThreadFactory::PosixThreadFactory(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
+ _impl(new PosixThreadFactory::Impl(policy, priority, stackSize, detached)) {}
+
+Thread* PosixThreadFactory::newThread(Runnable* runnable) const {return _impl->newThread(runnable);}
+
+int PosixThreadFactory::stackSize() const {return _impl->stackSize();}
+
+void PosixThreadFactory::stackSize(int value) {_impl->stackSize(value);}
+
+PosixThreadFactory::PRIORITY PosixThreadFactory::priority() const {return _impl->priority();}
+
+void PosixThreadFactory::priority(PosixThreadFactory::PRIORITY value) {_impl->priority(value);}
+
+}}} // facebook::thrift::concurrency
diff --git a/lib/cpp/src/concurrency/PosixThreadFactory.h b/lib/cpp/src/concurrency/PosixThreadFactory.h
new file mode 100644
index 0000000..88a0888
--- /dev/null
+++ b/lib/cpp/src/concurrency/PosixThreadFactory.h
@@ -0,0 +1,76 @@
+#if !defined(_concurrency_PosixThreadFactory_h_)
+#define _concurrency_PosixThreadFactory_h_ 1
+
+#include "Thread.h"
+
+namespace facebook { namespace thrift { namespace concurrency {
+
+/** A thread factory to create posix threads
+
+ @author marc */
+
+class PosixThreadFactory : public ThreadFactory {
+
+ public:
+
+ /** POSIX Thread scheduler policies */
+
+ enum POLICY {
+ OTHER,
+ FIFO,
+ ROUND_ROBIN
+ };
+
+ /** POSIX Thread scheduler relative priorities,
+
+ Absolute priority is determined by scheduler policy and OS. This enumeration specifies relative priorities such that one can
+ specify a priority withing a giving scheduler policy without knowing the absolute value of the priority. */
+
+ enum PRIORITY {
+ LOWEST = 0,
+ LOWER = 1,
+ LOW = 2,
+ NORMAL = 3,
+ HIGH = 4,
+ HIGHER = 5,
+ HIGHEST = 6,
+ INCREMENT = 7,
+ DECREMENT = 8
+ };
+
+ PosixThreadFactory(POLICY policy=ROUND_ROBIN, PRIORITY priority=NORMAL, int stackSize=1, bool detached=false);
+
+ // From ThreadFactory;
+
+ Thread* newThread(Runnable* runnable) const;
+
+ /** Sets stack size for created threads
+
+ @param value size in megabytes */
+
+ virtual void stackSize(int value);
+
+ /** Gets stack size for created threads
+
+ @return int size in megabytes */
+
+ virtual int stackSize() const;
+
+ /** Sets priority relative to current policy */
+
+ virtual void priority(PRIORITY priority);
+
+ /** Gets priority relative to current policy */
+
+ virtual PRIORITY priority() const;
+
+ private:
+
+ class Impl;
+
+ Impl* _impl;
+};
+
+}}} // facebook::thrift::concurrency
+
+#endif // !defined(_concurrency_PosixThreadFactory_h_)
diff --git a/lib/cpp/src/concurrency/Thread.h b/lib/cpp/src/concurrency/Thread.h
new file mode 100644
index 0000000..befb4fe
--- /dev/null
+++ b/lib/cpp/src/concurrency/Thread.h
@@ -0,0 +1,59 @@
+#if !defined(_concurrency_Thread_h_)
+#define _concurrency_Thread_h_ 1
+
+namespace facebook { namespace thrift { namespace concurrency {
+
+class Thread;
+
+/** Minimal runnable class. More or less analogous to java.lang.Runnable. */
+
+class Runnable {
+
+ public:
+
+ virtual ~Runnable() {};
+
+ virtual void run() = 0;
+};
+
+/** Minimal thread class. Returned by thread factory bound to a Runnable object and ready to start execution. More or less analogous to java.lang.Thread
+ (minus all the thread group, priority, mode and other baggage, since that is difficult to abstract across platforms and is left for platform-specific
+ ThreadFactory implemtations to deal with - @see facebook::thrift::concurrency::ThreadFactory) */
+
+
+class Thread {
+
+ public:
+
+ virtual ~Thread() {};
+
+ /** Starts the thread. Does platform specific thread creation and configuration then invokes the run method of the Runnable object bound to this
+ thread. */
+
+ virtual void start() = 0;
+
+ /** Join this thread
+
+ Current thread blocks until this target thread completes. */
+
+ virtual void join() = 0;
+
+ /** Gets the runnable object this thread is hosting */
+
+ virtual const Runnable* runnable() const = 0;
+};
+
+/** Factory to create platform-specific thread object and bind them to Runnable object for execution */
+
+class ThreadFactory {
+
+ public:
+
+ virtual ~ThreadFactory() {}
+
+ virtual Thread* newThread(Runnable* runnable) const = 0;
+};
+
+}}} // facebook::thrift::concurrency
+
+#endif // !defined(_concurrency_Thread_h_)
diff --git a/lib/cpp/src/concurrency/ThreadManager.cc b/lib/cpp/src/concurrency/ThreadManager.cc
new file mode 100644
index 0000000..c4ca2b1
--- /dev/null
+++ b/lib/cpp/src/concurrency/ThreadManager.cc
@@ -0,0 +1,303 @@
+#include "ThreadManager.h"
+
+#include <assert.h>
+
+namespace facebook { namespace thrift { namespace concurrency {
+
+/** ThreadManager class
+
+ This class manages a pool of threads. It uses a ThreadFactory to create threads. It never actually creates or destroys worker threads, rather
+ it maintains statistics on number of idle threads, number of active threads, task backlog, and average wait and service times and informs the
+ PoolPolicy object bound to instances of this manager of interesting transitions. It is then up the PoolPolicy object to decide if the thread pool
+ size needs to be adjusted and call this object addThread and removeThread methods to make changes.
+
+ This design allows different policy implementations to used this code to handle basic worker thread management and worker task execution and focus on
+ policy issues. The simplest policy, StaticPolicy, does nothing other than create a fixed number of threads.
+
+ @author marc
+ @version $Id */
+
+class ThreadManager::Task : public Runnable {
+
+public:
+ enum STATE {
+ WAITING,
+ EXECUTING,
+ CANCELLED,
+ COMPLETE
+ };
+
+ Task(Runnable* runnable) :
+ _runnable(runnable),
+ _state(WAITING)
+ {}
+
+ ~Task() {};
+
+ void run() {
+ if(_state == EXECUTING) {
+ _runnable->run();
+ _state = COMPLETE;
+ }
+ }
+
+ private:
+
+ Runnable* _runnable;
+
+ STATE _state;
+};
+
+class ThreadManager::Worker: public Runnable {
+
+ enum STATE {
+ UNINITIALIZED,
+ STARTING,
+ STARTED,
+ STOPPING,
+ STOPPED
+ };
+
+ public:
+ Worker(ThreadManager* manager) :
+ _manager(manager),
+ _state(UNINITIALIZED),
+ _idle(false)
+ {}
+
+ ~Worker() {}
+
+ /** Worker entry point
+
+ As long as worker thread is running, pull tasks off the task queue and execute. */
+
+ void run() {
+
+ {Synchronized(_manager->_monitor);
+
+ if(_state == STARTING) {
+ _state = STARTED;
+ }
+ }
+
+ do {
+
+ ThreadManager::Task* task = NULL;
+
+ /* While holding manager monitor block for non-empty task queue (Also check that the thread hasn't been requested to stop).
+
+ Once the queue is non-empty, dequeue a task, release monitor, and execute. */
+
+ {Synchronized(_manager->_monitor);
+
+ while(_state == STARTED && _manager->_tasks.empty()) {
+
+ _manager->_idleCount++;
+
+ _idle = true;
+
+ _manager->_monitor.wait();
+
+ _idle = false;
+
+ _manager->_idleCount--;
+ }
+
+ if(_state == STARTED) {
+
+ task = _manager->_tasks.front();
+ }
+ }
+
+ if(task != NULL) {
+
+ task->run();
+
+ delete task;
+ }
+
+ } while(_state == STARTED);
+
+ {Synchronized(_manager->_monitor);
+
+ if(_state == STOPPING) {
+
+ _state = STOPPED;
+
+ _manager->_monitor.notify();
+
+ }
+ }
+
+ return;
+ }
+
+ private:
+
+ ThreadManager* _manager;
+
+ friend class ThreadManager;
+
+ STATE _state;
+
+ bool _idle;
+};
+
+ThreadManager::ThreadManager(size_t highWatermark, size_t lowWatermark) :
+ _hiwat(highWatermark),
+ _lowat(lowWatermark) {
+}
+
+ThreadManager::~ThreadManager() {}
+
+size_t ThreadManager::ThreadManager::highWatermark() const {return _hiwat;}
+
+void ThreadManager::highWatermark(size_t value) {_hiwat = value;}
+
+size_t ThreadManager::lowWatermark() const {return _lowat;}
+
+void ThreadManager::lowWatermark(size_t value) {_lowat = value;}
+
+const PoolPolicy* ThreadManager::poolPolicy() const {
+
+ Synchronized s(_monitor);
+
+ return _poolPolicy;
+}
+
+void ThreadManager::poolPolicy(const PoolPolicy* value) {
+
+ Synchronized s(_monitor);
+
+ _poolPolicy = value;
+}
+
+const ThreadFactory* ThreadManager::threadFactory() const {
+
+ Synchronized s(_monitor);
+
+ return _threadFactory;
+}
+
+void ThreadManager::threadFactory(const ThreadFactory* value) {
+
+ Synchronized s(_monitor);
+
+ _threadFactory = value;
+}
+
+void ThreadManager::addThread(size_t value) {
+
+ std::set<Thread*> newThreads;
+
+ for(size_t ix = 0; ix < value; ix++) {
+
+ ThreadManager::Worker* worker = new ThreadManager::Worker(this);
+
+ newThreads.insert(_threadFactory->newThread(worker));
+ }
+
+ for(std::set<Thread*>::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) {
+
+ (*ix)->start();
+ }
+ for(std::set<Thread*>::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) {
+
+ (*ix)->start();
+ }
+
+ {Synchronized s(_monitor);
+
+ _workers.insert(newThreads.begin(), newThreads.end());
+ }
+}
+
+void ThreadManager::removeThread(size_t value) {
+
+ std::set<Thread*> removedThreads;
+
+ {Synchronized s(_monitor);
+
+ /* Overly clever loop
+
+ First time through, (idleOnly == 1) just look for idle threads. If that didn't find enough, go through again (idleOnly == 0)
+ and remove a sufficient number of busy threads. */
+
+ for(int idleOnly = 1; idleOnly <= 0; idleOnly--) {
+
+ for(std::set<Thread*>::iterator workerThread = _workers.begin(); (workerThread != _workers.end()) && (removedThreads.size() < value); workerThread++) {
+
+ Worker* worker = (Worker*)(*workerThread)->runnable();
+
+ if(worker->_idle || !idleOnly) {
+
+ removedThreads.insert(*workerThread);
+
+ _workers.erase(workerThread);
+ }
+ }
+ }
+
+ _monitor.notifyAll();
+ }
+
+
+ // Join removed threads and free worker
+
+ for(std::set<Thread*>::iterator workerThread = removedThreads.begin(); workerThread != removedThreads.end(); workerThread++) {
+
+ Worker* worker = (Worker*)(*workerThread)->runnable();
+
+ (*workerThread)->join();
+
+ delete worker;
+ }
+}
+
+size_t ThreadManager::idleWorkerCount() const {return _idleCount;}
+
+size_t ThreadManager::workerCount() const {
+
+ Synchronized s(_monitor);
+
+ return _workers.size();
+}
+
+size_t ThreadManager::pendingTaskCount() const {
+
+ Synchronized s(_monitor);
+
+ return _tasks.size();
+}
+
+size_t ThreadManager::totalTaskCount() const {
+
+ Synchronized s(_monitor);
+
+ return _tasks.size() + _workers.size() - _idleCount;
+}
+
+void ThreadManager::add(Runnable* value) {
+
+ Synchronized s(_monitor);
+
+ _tasks.push(new ThreadManager::Task(value));
+
+ /* If queue is empty notify a thread, otherwise all worker threads are running and will get around to this
+ task in time. */
+
+ if(_tasks.size() == 1) {
+
+ assert(_idleCount == _workers.size());
+
+ _monitor.notify();
+ }
+}
+
+void ThreadManager::remove(Runnable* task) {
+
+ Synchronized s(_monitor);
+}
+
+}}} // facebook::thrift::concurrency
+
diff --git a/lib/cpp/src/concurrency/ThreadManager.h b/lib/cpp/src/concurrency/ThreadManager.h
new file mode 100644
index 0000000..1742881
--- /dev/null
+++ b/lib/cpp/src/concurrency/ThreadManager.h
@@ -0,0 +1,122 @@
+#if !defined(_concurrency_ThreadManager_h_)
+#define _concurrency_ThreadManager_h_ 1
+
+#include "Monitor.h"
+#include "Thread.h"
+
+#include <set>
+#include <queue>
+
+namespace facebook { namespace thrift { namespace concurrency {
+
+class ThreadManager;
+
+/** PoolPolicy class
+
+ Tracks performance of ThreadManager object and makes desired changes in thread pool count if any. */
+
+class PoolPolicy {
+
+ public:
+
+ virtual ~PoolPolicy() = 0;
+
+ virtual void onlowWatermark(ThreadManager* source) const = 0;
+
+ virtual void onhighWatermark(ThreadManager* source) const = 0;
+
+};
+
+/** ThreadManager class
+
+ This class manages a pool of threads. It uses a ThreadFactory to create threads. It never actually creates or destroys worker threads, rather
+ it maintains statistics on number of idle threads, number of active threads, task backlog, and average wait and service times and informs the
+ PoolPolicy object bound to instances of this manager of interesting transitions. It is then up the PoolPolicy object to decide if the thread pool
+ size needs to be adjusted and call this object addThread and removeThread methods to make changes.
+
+ This design allows different policy implementations to used this code to handle basic worker thread management and worker task execution and focus on
+ policy issues. The simplest policy, StaticPolicy, does nothing other than create a fixed number of threads. */
+
+class ThreadManager {
+
+ public:
+
+ ThreadManager(size_t highWatermark=4, size_t lowWatermark=2);
+
+ virtual ~ThreadManager() = 0;
+
+ virtual const PoolPolicy* poolPolicy() const = 0;
+
+ virtual void poolPolicy(const PoolPolicy* value) = 0;
+
+ virtual const ThreadFactory* threadFactory() const = 0;
+
+ virtual void threadFactory(const ThreadFactory* value) = 0;
+
+ virtual size_t highWatermark() const = 0;
+
+ virtual void highWatermark(size_t value) = 0;
+
+ virtual size_t lowWatermark() const = 0;
+
+ virtual void lowWatermark(size_t value) = 0;
+
+ virtual void addThread(size_t value=1) = 0;
+
+ virtual void removeThread(size_t value=1) = 0;
+
+ /** Gets the current number of idle worker threads */
+
+ virtual size_t idleWorkerCount() const = 0;
+
+ /** Gets the current number of total worker threads */
+
+ virtual size_t workerCount() const = 0;
+
+ /** Gets the current number of pending tasks */
+
+ virtual size_t pendingTaskCount() const = 0;
+
+ /** Gets the current number of pending and executing tasks */
+
+ virtual size_t totalTaskCount() const = 0;
+
+ /** Adds a task to be execued at some time in the future by a worker thread. */
+
+ virtual void add(Runnable* value) = 0;
+
+ /** Removes a pending task */
+
+ virtual void remove(Runnable* task) = 0;
+
+ private:
+
+ size_t _hiwat;
+
+ size_t _lowat;
+
+ size_t _idleCount;
+
+ const PoolPolicy* _poolPolicy;;
+
+ const ThreadFactory* _threadFactory;;
+
+ class Task;
+
+ friend class Task;
+
+ std::queue<Task*> _tasks;
+
+ Monitor _monitor;
+
+ class Worker;
+
+ friend class Worker;
+
+ std::set<Thread*> _workers;
+
+};
+
+}}} // facebook::thrift::concurrency
+
+#endif // !defined(_concurrency_ThreadManager_h_)