Checkpoint of initial cut at thread pool manager for thrift and related concurrency classes.


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664721 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/concurrency/Monitor.cc b/lib/cpp/src/concurrency/Monitor.cc
new file mode 100644
index 0000000..d1b83d1
--- /dev/null
+++ b/lib/cpp/src/concurrency/Monitor.cc
@@ -0,0 +1,138 @@
+#include "Monitor.h" 
+
+#include <assert.h>
+#include <errno.h>
+#include <pthread.h>
+
+namespace facebook { namespace thrift { namespace concurrency { 
+
+/** Monitor implementation using the POSIX pthread library
+    
+    @author marc
+    @version $Id$ */
+
+class Monitor::Impl {
+
+ public:
+
+  Impl() : 
+    mutexInitialized(false) {
+    
+    /* XXX
+       Need to fix this to handle failures without leaking.  */
+
+    assert(pthread_mutex_init(&_pthread_mutex, NULL) == 0);
+
+    mutexInitialized = true;
+
+    assert(pthread_cond_init(&_pthread_cond, NULL) == 0);
+  }
+
+  ~Impl() {
+
+    if(mutexInitialized) {
+
+      mutexInitialized = false;
+
+      assert(pthread_mutex_destroy(&_pthread_mutex) == 0);
+    }
+
+    if(condInitialized) {
+
+      condInitialized = false;
+
+      assert(pthread_cond_destroy(&_pthread_cond) == 0);
+    }
+  }
+
+  void lock() const {pthread_mutex_lock(&_pthread_mutex);}
+
+  void unlock() const {pthread_mutex_unlock(&_pthread_mutex);}
+
+  void wait(long long timeout) const {
+
+    // XXX Need to assert that caller owns mutex
+
+    if(timeout == 0LL) {
+
+      pthread_cond_wait(&_pthread_cond, &_pthread_mutex);
+
+    } else {
+
+      struct timespec abstime;
+
+      toAbsoluteTimespec(abstime, timeout);
+
+      int result  = pthread_cond_timedwait(&_pthread_cond, &_pthread_mutex, &abstime);
+
+      if(result == ETIMEDOUT) {
+
+	// XXX If result is timeout need to throw timeout exception
+      }
+    }
+  }
+
+  void notify() {
+
+    // XXX Need to assert that caller owns mutex
+
+    assert(pthread_cond_signal(&_pthread_cond) == 0);
+  }
+
+  void notifyAll() {
+
+    // XXX Need to assert that caller owns mutex
+
+    assert(pthread_cond_broadcast(&_pthread_cond) == 0);
+  }
+
+private:
+
+  /** Converts relative timeout specified as a duration in milliseconds to a struct timespec structure
+      specifying current time plus timeout 
+
+      @param timeout time to delay in milliseconds
+      @return struct timespec current time plus timeout  */
+
+  static const void toAbsoluteTimespec(struct timespec& result, long long timeout) {
+
+    // XXX Darwin doesn't seem to have any readily useable hi-res clock.
+
+    time_t seconds; 
+
+    assert(time(&seconds) != (time_t)-1);
+
+    seconds+= (timeout / 1000);
+
+    long nanoseconds = (timeout % 1000) * 1000000;
+
+    result.tv_sec = seconds + (nanoseconds / 1000000000);
+
+    result.tv_nsec = nanoseconds % 1000000000;
+  }
+
+  mutable pthread_mutex_t _pthread_mutex;
+
+  mutable bool mutexInitialized;
+
+  mutable pthread_cond_t _pthread_cond;
+
+  mutable bool condInitialized;
+};
+
+Monitor::Monitor() : _impl(new Monitor::Impl()) {}
+
+      Monitor::~Monitor() { delete _impl;}
+
+void Monitor::lock() const {_impl->lock();}
+
+void Monitor::unlock() const {_impl->unlock();}
+
+void Monitor::wait(long long timeout) const {_impl->wait(timeout);}
+
+void Monitor::notify() const {_impl->notify();}
+
+void Monitor::notifyAll() const {_impl->notifyAll();}
+
+}}} // facebook::thrift::concurrency
+
diff --git a/lib/cpp/src/concurrency/Monitor.h b/lib/cpp/src/concurrency/Monitor.h
new file mode 100644
index 0000000..82544f1
--- /dev/null
+++ b/lib/cpp/src/concurrency/Monitor.h
@@ -0,0 +1,59 @@
+#if !defined(_concurrency_mutex_h_)
+#define _concurrency_mutex_h_ 1
+
+namespace facebook { namespace thrift { namespace concurrency { 
+
+/**  A monitor is a combination mutex and condition-event.  Waiting and notifying condition events requires that the caller own the mutex.  Mutex
+     lock and unlock operations can be performed independently of condition events.  This is more or less analogous to java.lang.Object multi-thread
+     operations
+
+     Note that all methods are const.  Monitors implement logical constness, not bit constness.  This allows const methods to call monitor
+     methods without needing to cast away constness or change to non-const signatures.  
+
+     @author marc
+     @version $Id$ */
+
+class Monitor {
+
+ public:
+
+  Monitor();
+
+  virtual ~Monitor();
+
+  virtual void lock() const;
+
+  virtual void unlock() const;
+
+  virtual void wait(long long timeout=0LL) const;
+
+  virtual void notify() const;
+
+  virtual void notifyAll() const;
+
+ private:
+
+  class Impl;
+
+  Impl* _impl;
+};
+
+class Synchronized {
+ public:
+  
+ Synchronized(const Monitor& value) : _monitor(value) {
+    _monitor.lock();
+  }
+
+  ~Synchronized() {
+    _monitor.unlock();
+  }
+
+ private:
+  const Monitor& _monitor;
+};
+
+
+}}} // facebook::thrift::concurrency
+
+#endif // !defined(_concurrency_mutex_h_)
diff --git a/lib/cpp/src/concurrency/Mutex.cc b/lib/cpp/src/concurrency/Mutex.cc
new file mode 100644
index 0000000..39d768e
--- /dev/null
+++ b/lib/cpp/src/concurrency/Mutex.cc
@@ -0,0 +1,38 @@
+#include "Mutex.h"
+
+#include <assert.h>
+#include <pthread.h>
+
+namespace facebook { namespace thrift { namespace concurrency { 
+
+class Mutex::impl {
+public:
+  impl() : initialized(false) {
+    assert(pthread_mutex_init(&_pthread_mutex, NULL) == 0);
+    initialized = true;
+  }
+
+  ~impl() {
+    if(initialized) {
+      initialized = false;
+      assert(pthread_mutex_destroy(&_pthread_mutex) == 0);
+    }
+  }
+
+  void lock() const {pthread_mutex_lock(&_pthread_mutex);}
+
+  void unlock() const {pthread_mutex_unlock(&_pthread_mutex);}
+
+private:
+  mutable pthread_mutex_t _pthread_mutex;
+  mutable bool initialized;
+};
+
+Mutex::Mutex() : _impl(new Mutex::impl()) {}
+
+void Mutex::lock() const {_impl->lock();}
+
+void Mutex::unlock() const {_impl->unlock();}
+
+}}} // facebook::thrift::concurrency
+
diff --git a/lib/cpp/src/concurrency/Mutex.h b/lib/cpp/src/concurrency/Mutex.h
new file mode 100644
index 0000000..e8371ea
--- /dev/null
+++ b/lib/cpp/src/concurrency/Mutex.h
@@ -0,0 +1,43 @@
+#if !defined(_concurrency_mutex_h_)
+#define _concurrency_mutex_h_ 1
+
+namespace facebook { namespace thrift { namespace concurrency { 
+
+class Mutex {
+
+ public:
+
+  Mutex();
+
+  virtual ~Mutex() {}
+
+  virtual void lock() const;
+
+  virtual void unlock() const;
+
+ private:
+
+  class impl;
+
+  impl* _impl;
+};
+
+class MutexMonitor {
+ public:
+  
+  MutexMonitor(const Mutex& value) : _mutex(value) {
+    _mutex.lock();
+  }
+
+  ~MutexMonitor() {
+    _mutex.unlock();
+  }
+
+ private:
+  const Mutex& _mutex;
+};
+
+
+}}} // facebook::thrift::concurrency
+
+#endif // !defined(_concurrency_mutex_h_)
diff --git a/lib/cpp/src/concurrency/PosixThreadFactory.cc b/lib/cpp/src/concurrency/PosixThreadFactory.cc
new file mode 100644
index 0000000..e9d52f0
--- /dev/null
+++ b/lib/cpp/src/concurrency/PosixThreadFactory.cc
@@ -0,0 +1,215 @@
+#include "PosixThreadFactory.h"
+
+#include <assert.h>
+#include <pthread.h>
+
+namespace facebook { namespace thrift { namespace concurrency {
+
+/**  The POSIX thread class.  */
+
+class PthreadThread: public Thread {
+
+public:
+  enum STATE {uninitialized, 
+	      starting,
+	      started,
+	      stopping,
+	      stopped
+  };
+
+  static const int MB = 1024 * 1024;
+
+private:
+
+  pthread_t _pthread;
+
+  STATE _state;
+
+  int _policy;
+
+  int _priority;
+
+  int _stackSize;
+
+  Runnable* _runnable;
+
+  static void* threadMain(void* arg) {
+
+    // XXX need a lock here when testing thread state
+
+    PthreadThread* thread = (PthreadThread*)arg;
+
+    if(thread->_state != starting) {
+      return (void*)0;
+    }
+
+    thread->_state = starting;
+
+    thread->_runnable->run();
+
+    if(thread->_state != stopping && thread->_state != stopped) {
+      thread->_state = stopping;
+    }
+    
+    return (void*)0;
+  }
+
+public:
+  
+  PthreadThread(int policy, int priority, int stackSize, Runnable* runnable) : 
+    _pthread(0),
+    _state(uninitialized), 
+    _policy(policy),
+    _priority(priority),
+    _stackSize(stackSize),
+    _runnable(runnable)
+  {}
+
+  void start() {
+
+    if(_state != uninitialized) {
+      return;
+    }
+
+    _state = starting;
+
+    pthread_attr_t thread_attr;
+
+    assert(pthread_attr_init(&thread_attr) == 0);
+
+    assert(pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_JOINABLE) == 0);
+
+    // Set thread stack size
+
+    assert(pthread_attr_setstacksize(&thread_attr, MB * _stackSize) == 0);
+
+    // Set thread policy
+
+    assert(pthread_attr_setschedpolicy(&thread_attr, _policy) == 0);
+
+    struct sched_param sched_param;
+    sched_param.sched_priority = _priority;
+
+    // Set thread priority
+
+    assert(pthread_attr_setschedparam(&thread_attr, &sched_param) == 0);
+
+    assert(pthread_create(&_pthread, &thread_attr, PthreadThread::threadMain, (void*)this) == 0);
+  }
+
+  void join() {
+    
+    if(_state != stopped) {
+      
+      void* ignore;
+      
+      pthread_join(_pthread, &ignore);
+    }
+  }
+
+  const Runnable* runnable() const {return _runnable;}
+
+};
+
+/** POSIX Thread factory implementation */
+
+class PosixThreadFactory::Impl {
+
+private:
+
+  POLICY _policy;
+
+  PRIORITY _priority;
+
+  int _stackSize;
+
+  bool _detached;
+
+  /** Converts generic posix thread schedule policy enums into pthread API values. */
+
+  static int toPthreadPolicy(POLICY policy) {
+    switch(policy) {
+    case OTHER: return SCHED_OTHER; break;
+    case FIFO: return SCHED_FIFO; break;
+    case ROUND_ROBIN: return SCHED_RR; break;
+    default: return SCHED_OTHER; break;
+    }
+  }
+
+  /** Converts relative thread priorities to absolute value based on posix thread scheduler policy
+
+      The idea is simply to divide up the priority range for the given policy into the correpsonding relative
+      priority level (lowest..highest) and then prorate accordingly. */
+
+  static int toPthreadPriority(POLICY policy, PRIORITY priority) {
+
+    int pthread_policy = toPthreadPolicy(policy);
+
+    int min_priority = sched_get_priority_min(pthread_policy);
+
+    int max_priority = sched_get_priority_max(pthread_policy);
+
+    int quanta = (HIGHEST - LOWEST) + 1;
+
+    float stepsperquanta = (max_priority - min_priority) / quanta;
+
+    if(priority <= HIGHEST) {
+
+      return (int)(min_priority + stepsperquanta * priority);
+    } else {
+
+      // should never get here for priority increments.
+
+      assert(false);
+
+      return (int)(min_priority + stepsperquanta * NORMAL);
+    }
+  }
+
+public:
+
+  Impl(POLICY policy, PRIORITY priority, int stackSize, bool detached) : 
+    _policy(policy),
+    _priority(priority),
+    _stackSize(stackSize),
+    _detached(detached) {
+  }
+
+  /** Creates a new POSIX thread to run the runnable object 
+
+      @param runnable A runnable object */
+
+  Thread* newThread(Runnable* runnable) const {
+
+    return new PthreadThread(toPthreadPolicy(_policy), toPthreadPriority(_policy, _priority), _stackSize, runnable);
+  }
+
+  int stackSize() const { return _stackSize;}
+
+  void stackSize(int value) { _stackSize = value;}
+
+  PRIORITY priority() const { return _priority;}
+
+  /** Sets priority.
+      
+      XXX
+      Need to handle incremental priorities properl. */
+
+  void priority(PRIORITY value) { _priority = value;}
+
+};
+
+PosixThreadFactory::PosixThreadFactory(POLICY policy, PRIORITY priority, int stackSize, bool detached) : 
+  _impl(new PosixThreadFactory::Impl(policy, priority, stackSize, detached)) {}
+
+Thread* PosixThreadFactory::newThread(Runnable* runnable) const {return _impl->newThread(runnable);}
+
+int PosixThreadFactory::stackSize() const {return _impl->stackSize();}
+
+void PosixThreadFactory::stackSize(int value) {_impl->stackSize(value);}
+
+PosixThreadFactory::PRIORITY PosixThreadFactory::priority() const {return _impl->priority();}
+
+void PosixThreadFactory::priority(PosixThreadFactory::PRIORITY value) {_impl->priority(value);}
+
+}}} // facebook::thrift::concurrency
diff --git a/lib/cpp/src/concurrency/PosixThreadFactory.h b/lib/cpp/src/concurrency/PosixThreadFactory.h
new file mode 100644
index 0000000..88a0888
--- /dev/null
+++ b/lib/cpp/src/concurrency/PosixThreadFactory.h
@@ -0,0 +1,76 @@
+#if !defined(_concurrency_PosixThreadFactory_h_)
+#define _concurrency_PosixThreadFactory_h_ 1
+
+#include "Thread.h"
+
+namespace facebook { namespace thrift { namespace concurrency { 
+
+/** A thread factory to create posix threads 
+
+    @author marc */
+
+class PosixThreadFactory : public ThreadFactory {
+
+ public:
+
+  /** POSIX Thread scheduler policies */
+
+  enum POLICY {
+    OTHER,
+    FIFO,
+    ROUND_ROBIN
+  };
+
+  /** POSIX Thread scheduler relative priorities,
+      
+      Absolute priority is determined by scheduler policy and OS.  This enumeration specifies relative priorities such that one can
+      specify a priority withing a giving scheduler policy without knowing the absolute value of the priority. */
+
+  enum PRIORITY {
+    LOWEST = 0,
+    LOWER = 1,
+    LOW = 2,
+    NORMAL = 3,
+    HIGH = 4,
+    HIGHER = 5,
+    HIGHEST = 6,
+    INCREMENT = 7,
+    DECREMENT = 8
+  };
+
+  PosixThreadFactory(POLICY policy=ROUND_ROBIN, PRIORITY priority=NORMAL, int stackSize=1, bool detached=false);
+
+  // From ThreadFactory;
+
+  Thread* newThread(Runnable* runnable) const;
+
+  /** Sets stack size for created threads
+
+     @param value size in megabytes */
+
+  virtual void stackSize(int value);
+
+  /** Gets stack size for created threads
+
+      @return int size in megabytes */
+
+  virtual int stackSize() const;
+
+  /** Sets priority relative to current policy */
+
+  virtual void priority(PRIORITY priority);
+
+  /** Gets priority relative to current policy */
+
+  virtual PRIORITY priority() const;
+  
+ private:
+  
+  class Impl;
+
+  Impl* _impl;
+};
+
+}}} // facebook::thrift::concurrency
+
+#endif // !defined(_concurrency_PosixThreadFactory_h_)
diff --git a/lib/cpp/src/concurrency/Thread.h b/lib/cpp/src/concurrency/Thread.h
new file mode 100644
index 0000000..befb4fe
--- /dev/null
+++ b/lib/cpp/src/concurrency/Thread.h
@@ -0,0 +1,59 @@
+#if !defined(_concurrency_Thread_h_)
+#define _concurrency_Thread_h_ 1
+
+namespace facebook { namespace thrift { namespace concurrency { 
+
+class Thread;
+
+/** Minimal runnable class.  More or less analogous to java.lang.Runnable. */
+
+class Runnable {
+
+ public:
+  
+  virtual ~Runnable() {};
+
+  virtual void run() = 0;
+};
+
+/** Minimal thread class.  Returned by thread factory bound to a Runnable object and ready to start execution.  More or less analogous to java.lang.Thread
+    (minus all the thread group, priority, mode and other baggage, since that is difficult to abstract across platforms and is left for platform-specific
+    ThreadFactory implemtations to deal with - @see facebook::thrift::concurrency::ThreadFactory) */
+    
+ 
+class Thread {
+  
+ public:
+
+  virtual ~Thread() {};
+
+  /** Starts the thread.  Does platform specific thread creation and configuration then invokes the run method of the Runnable object bound to this 
+      thread. */
+
+  virtual void start() = 0;
+
+  /** Join this thread
+
+      Current thread blocks until this target thread completes. */
+
+  virtual void join() = 0;
+
+  /** Gets the runnable object this thread is hosting */
+  
+  virtual const Runnable* runnable() const = 0;
+};
+
+/** Factory to create platform-specific thread object and bind them to Runnable object for execution */
+
+class ThreadFactory {
+
+ public:
+
+  virtual ~ThreadFactory() {}
+
+  virtual Thread* newThread(Runnable* runnable) const = 0;
+};
+
+}}} // facebook::thrift::concurrency
+
+#endif // !defined(_concurrency_Thread_h_)
diff --git a/lib/cpp/src/concurrency/ThreadManager.cc b/lib/cpp/src/concurrency/ThreadManager.cc
new file mode 100644
index 0000000..c4ca2b1
--- /dev/null
+++ b/lib/cpp/src/concurrency/ThreadManager.cc
@@ -0,0 +1,303 @@
+#include "ThreadManager.h"
+
+#include <assert.h>
+
+namespace facebook { namespace thrift { namespace concurrency { 
+
+/** ThreadManager class
+    
+    This class manages a pool of threads.  It uses a ThreadFactory to create threads.  It never actually creates or destroys worker threads, rather
+    it maintains statistics on number of idle threads, number of active threads, task backlog, and average wait and service times and informs the
+    PoolPolicy object bound to instances of this manager of interesting transitions.  It is then up the PoolPolicy object to decide if the thread pool
+    size needs to be adjusted and call this object addThread and removeThread methods to make changes.
+
+    This design allows different policy implementations to used this code to handle basic worker thread management and worker task execution and focus on
+    policy issues.  The simplest policy, StaticPolicy, does nothing other than create a fixed number of threads.
+
+    @author marc
+    @version $Id */
+
+class ThreadManager::Task : public Runnable {
+
+public:
+  enum STATE {
+    WAITING,
+    EXECUTING,
+    CANCELLED,
+    COMPLETE
+  };
+
+  Task(Runnable* runnable) :
+    _runnable(runnable),
+    _state(WAITING) 
+    {}
+
+  ~Task() {};
+
+  void run() {
+    if(_state == EXECUTING) {
+      _runnable->run();
+      _state = COMPLETE;
+    }
+  }
+
+ private:
+
+  Runnable* _runnable;
+
+  STATE _state;
+};
+
+class ThreadManager::Worker: public Runnable {
+
+  enum STATE {
+    UNINITIALIZED,
+    STARTING,
+    STARTED,
+    STOPPING,
+    STOPPED
+  };
+
+ public:
+  Worker(ThreadManager* manager) : 
+    _manager(manager),
+    _state(UNINITIALIZED),
+    _idle(false)
+  {}
+
+  ~Worker() {}
+
+  /** Worker entry point
+
+      As long as worker thread is running, pull tasks off the task queue and execute. */
+
+  void run() {
+
+    {Synchronized(_manager->_monitor);
+
+      if(_state == STARTING) {
+	_state = STARTED;
+      }
+    }
+
+    do {
+
+      ThreadManager::Task* task = NULL;
+
+      /* While holding manager monitor block for non-empty task queue (Also check that the thread hasn't been requested to stop).
+
+	 Once the queue is non-empty, dequeue a task, release monitor, and execute. */
+
+      {Synchronized(_manager->_monitor);
+
+	while(_state == STARTED && _manager->_tasks.empty()) {
+	  
+	  _manager->_idleCount++;
+
+	  _idle = true;
+	  
+	  _manager->_monitor.wait();
+
+	  _idle = false;
+	  
+	  _manager->_idleCount--;
+	}
+	
+	if(_state == STARTED) {
+	  
+	  task = _manager->_tasks.front();
+	}
+      }
+
+      if(task != NULL) {
+
+	task->run();
+
+	delete task;
+      }
+
+    } while(_state == STARTED);
+
+    {Synchronized(_manager->_monitor);
+
+      if(_state == STOPPING) {
+
+	_state = STOPPED; 
+
+	_manager->_monitor.notify();
+
+      }
+    }
+    
+    return;
+  }
+
+ private:
+
+  ThreadManager* _manager;
+
+  friend class ThreadManager;
+
+  STATE _state;
+
+  bool _idle;
+};
+
+ThreadManager::ThreadManager(size_t highWatermark, size_t lowWatermark) : 
+  _hiwat(highWatermark), 
+  _lowat(lowWatermark) {
+}
+
+ThreadManager::~ThreadManager() {}
+
+size_t ThreadManager::ThreadManager::highWatermark() const {return _hiwat;}
+
+void ThreadManager::highWatermark(size_t value) {_hiwat = value;}
+
+size_t ThreadManager::lowWatermark() const {return _lowat;}
+
+void ThreadManager::lowWatermark(size_t value) {_lowat = value;}
+
+const PoolPolicy* ThreadManager::poolPolicy() const {
+
+  Synchronized s(_monitor); 
+  
+  return _poolPolicy;
+}
+  
+void ThreadManager::poolPolicy(const PoolPolicy*  value) {
+
+  Synchronized s(_monitor); 
+
+  _poolPolicy = value;
+}
+
+const ThreadFactory* ThreadManager::threadFactory() const {
+
+  Synchronized s(_monitor); 
+
+  return _threadFactory;
+}
+      
+void ThreadManager::threadFactory(const ThreadFactory*  value) {
+    
+  Synchronized s(_monitor); 
+  
+  _threadFactory = value;
+}
+
+void ThreadManager::addThread(size_t value) {
+
+  std::set<Thread*> newThreads;
+  
+  for(size_t ix = 0; ix < value; ix++) {
+
+    ThreadManager::Worker* worker = new ThreadManager::Worker(this);
+
+    newThreads.insert(_threadFactory->newThread(worker));
+  }
+  
+  for(std::set<Thread*>::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) {
+
+    (*ix)->start();
+  }
+  for(std::set<Thread*>::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) {
+
+    (*ix)->start();
+  }
+
+  {Synchronized s(_monitor); 
+
+    _workers.insert(newThreads.begin(), newThreads.end());
+  }
+}
+
+void ThreadManager::removeThread(size_t value) {
+
+  std::set<Thread*> removedThreads;
+
+  {Synchronized s(_monitor); 
+  
+    /* Overly clever loop
+
+       First time through, (idleOnly == 1) just look for idle threads.  If that didn't find enough, go through again (idleOnly == 0)
+       and remove a sufficient number of busy threads. */
+
+    for(int idleOnly = 1; idleOnly <= 0; idleOnly--) {
+      
+      for(std::set<Thread*>::iterator workerThread = _workers.begin(); (workerThread != _workers.end()) && (removedThreads.size() < value); workerThread++) {
+
+	Worker* worker = (Worker*)(*workerThread)->runnable();
+
+	if(worker->_idle || !idleOnly) {
+	
+	  removedThreads.insert(*workerThread);
+	
+	  _workers.erase(workerThread);
+	}
+      }
+    }
+    
+    _monitor.notifyAll();
+  }
+
+
+  // Join removed threads and free worker 
+
+  for(std::set<Thread*>::iterator workerThread = removedThreads.begin(); workerThread != removedThreads.end(); workerThread++) {
+
+    Worker* worker = (Worker*)(*workerThread)->runnable();
+
+    (*workerThread)->join();
+
+    delete worker;
+  }
+}
+
+size_t ThreadManager::idleWorkerCount() const {return _idleCount;}
+
+size_t ThreadManager::workerCount() const {
+
+  Synchronized s(_monitor); 
+
+  return _workers.size();
+}
+
+size_t ThreadManager::pendingTaskCount() const {
+
+  Synchronized s(_monitor); 
+
+  return _tasks.size();
+}
+
+size_t ThreadManager::totalTaskCount() const {
+
+  Synchronized s(_monitor); 
+
+  return _tasks.size() + _workers.size() - _idleCount;
+}
+
+void ThreadManager::add(Runnable* value) {
+
+  Synchronized s(_monitor); 
+
+  _tasks.push(new ThreadManager::Task(value));
+
+  /* If queue is empty notify a thread, otherwise all worker threads are running and will get around to this
+     task in time. */
+
+  if(_tasks.size() == 1) {
+
+    assert(_idleCount == _workers.size());
+
+    _monitor.notify();
+  }
+}
+
+void ThreadManager::remove(Runnable* task) {
+
+  Synchronized s(_monitor); 
+}
+
+}}} // facebook::thrift::concurrency
+
diff --git a/lib/cpp/src/concurrency/ThreadManager.h b/lib/cpp/src/concurrency/ThreadManager.h
new file mode 100644
index 0000000..1742881
--- /dev/null
+++ b/lib/cpp/src/concurrency/ThreadManager.h
@@ -0,0 +1,122 @@
+#if !defined(_concurrency_ThreadManager_h_)
+#define _concurrency_ThreadManager_h_ 1
+
+#include "Monitor.h"
+#include "Thread.h"
+
+#include <set>
+#include <queue>
+
+namespace facebook { namespace thrift { namespace concurrency { 
+
+class ThreadManager;
+
+/** PoolPolicy class 
+
+    Tracks performance of ThreadManager object and makes desired changes in thread pool count if any. */
+
+class PoolPolicy {
+
+ public:
+
+  virtual ~PoolPolicy() = 0;
+
+  virtual void onlowWatermark(ThreadManager* source) const = 0;
+
+  virtual void onhighWatermark(ThreadManager* source) const = 0;
+
+};
+
+/** ThreadManager class
+    
+    This class manages a pool of threads.  It uses a ThreadFactory to create threads.  It never actually creates or destroys worker threads, rather
+    it maintains statistics on number of idle threads, number of active threads, task backlog, and average wait and service times and informs the
+    PoolPolicy object bound to instances of this manager of interesting transitions.  It is then up the PoolPolicy object to decide if the thread pool
+    size needs to be adjusted and call this object addThread and removeThread methods to make changes.
+
+    This design allows different policy implementations to used this code to handle basic worker thread management and worker task execution and focus on
+    policy issues.  The simplest policy, StaticPolicy, does nothing other than create a fixed number of threads. */
+
+class ThreadManager {
+
+ public:
+
+  ThreadManager(size_t highWatermark=4, size_t lowWatermark=2);
+
+  virtual ~ThreadManager() = 0;
+
+  virtual const PoolPolicy* poolPolicy() const = 0;
+
+  virtual void poolPolicy(const PoolPolicy* value) = 0;
+
+  virtual const ThreadFactory* threadFactory() const = 0;
+
+  virtual void threadFactory(const ThreadFactory* value) = 0;
+
+  virtual size_t highWatermark() const = 0;
+
+  virtual void highWatermark(size_t value) = 0;
+
+  virtual size_t lowWatermark() const = 0;
+
+  virtual void lowWatermark(size_t value) = 0;
+
+  virtual void addThread(size_t value=1) = 0;
+
+  virtual void removeThread(size_t value=1) = 0;
+
+  /** Gets the current number of idle worker threads */
+
+  virtual size_t idleWorkerCount() const = 0;
+
+  /** Gets the current number of total worker threads */
+
+  virtual size_t workerCount() const = 0;
+
+  /** Gets the current number of pending tasks */
+
+  virtual size_t pendingTaskCount() const  = 0;
+
+  /** Gets the current number of pending and executing tasks */
+
+  virtual size_t totalTaskCount() const = 0;
+
+  /** Adds a task to be execued at some time in the future by a worker thread. */
+
+  virtual void add(Runnable* value) = 0;
+
+  /** Removes a pending task */
+
+  virtual void remove(Runnable* task) = 0;
+
+ private:
+
+  size_t _hiwat;
+
+  size_t _lowat;
+
+  size_t _idleCount;
+
+  const PoolPolicy* _poolPolicy;;
+
+  const ThreadFactory* _threadFactory;;
+
+  class Task;
+
+  friend class Task;
+
+  std::queue<Task*> _tasks;
+
+  Monitor _monitor;
+
+  class Worker;
+
+  friend class Worker;
+
+  std::set<Thread*> _workers;
+
+};
+
+}}} // facebook::thrift::concurrency
+
+#endif // !defined(_concurrency_ThreadManager_h_)