Thrift now a TLP - INFRA-3116

git-svn-id: https://svn.apache.org/repos/asf/thrift/branches/0.1.x@1028168 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/concurrency/Exception.h b/lib/cpp/src/concurrency/Exception.h
new file mode 100644
index 0000000..ec46629
--- /dev/null
+++ b/lib/cpp/src/concurrency/Exception.h
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_CONCURRENCY_EXCEPTION_H_
+#define _THRIFT_CONCURRENCY_EXCEPTION_H_ 1
+
+#include <exception>
+#include <Thrift.h>
+
+namespace apache { namespace thrift { namespace concurrency {
+
+class NoSuchTaskException : public apache::thrift::TException {};
+
+class UncancellableTaskException : public apache::thrift::TException {};
+
+class InvalidArgumentException : public apache::thrift::TException {};
+
+class IllegalStateException : public apache::thrift::TException {};
+
+class TimedOutException : public apache::thrift::TException {
+public:
+  TimedOutException():TException("TimedOutException"){};
+  TimedOutException(const std::string& message ) :
+    TException(message) {}
+};
+
+class TooManyPendingTasksException : public apache::thrift::TException {
+public:
+  TooManyPendingTasksException():TException("TooManyPendingTasksException"){};
+  TooManyPendingTasksException(const std::string& message ) :
+    TException(message) {}
+};
+
+class SystemResourceException : public apache::thrift::TException {
+public:
+    SystemResourceException() {}
+
+    SystemResourceException(const std::string& message) :
+        TException(message) {}
+};
+
+}}} // apache::thrift::concurrency
+
+#endif // #ifndef _THRIFT_CONCURRENCY_EXCEPTION_H_
diff --git a/lib/cpp/src/concurrency/FunctionRunner.h b/lib/cpp/src/concurrency/FunctionRunner.h
new file mode 100644
index 0000000..2216927
--- /dev/null
+++ b/lib/cpp/src/concurrency/FunctionRunner.h
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_CONCURRENCY_FUNCTION_RUNNER_H
+#define _THRIFT_CONCURRENCY_FUNCTION_RUNNER_H 1
+
+#include <tr1/functional>
+#include "thrift/lib/cpp/concurrency/Thread.h"
+
+namespace apache { namespace thrift { namespace concurrency {
+
+/**
+ * Convenient implementation of Runnable that will execute arbitrary callbacks.
+ * Interfaces are provided to accept both a generic 'void(void)' callback, and
+ * a 'void* (void*)' pthread_create-style callback.
+ *
+ * Example use:
+ *  void* my_thread_main(void* arg);
+ *  shared_ptr<ThreadFactory> factory = ...;
+ *  shared_ptr<Thread> thread =
+ *    factory->newThread(shared_ptr<FunctionRunner>(
+ *      new FunctionRunner(my_thread_main, some_argument)));
+ *  thread->start();
+ *
+ *
+ */
+
+class FunctionRunner : public Runnable {
+ public:
+  // This is the type of callback 'pthread_create()' expects.
+  typedef void* (*PthreadFuncPtr)(void *arg);
+  // This a fully-generic void(void) callback for custom bindings.
+  typedef std::tr1::function<void()> VoidFunc;
+
+  /**
+   * Given a 'pthread_create' style callback, this FunctionRunner will
+   * execute the given callback.  Note that the 'void*' return value is ignored.
+   */
+  FunctionRunner(PthreadFuncPtr func, void* arg)
+   : func_(std::tr1::bind(func, arg))
+  { }
+
+  /**
+   * Given a generic callback, this FunctionRunner will execute it.
+   */
+  FunctionRunner(const VoidFunc& cob)
+   : func_(cob)
+  { }
+
+
+  void run() {
+    func_();
+  }
+
+ private:
+  VoidFunc func_;
+};
+
+}}} // apache::thrift::concurrency
+
+#endif // #ifndef _THRIFT_CONCURRENCY_FUNCTION_RUNNER_H
diff --git a/lib/cpp/src/concurrency/Monitor.cpp b/lib/cpp/src/concurrency/Monitor.cpp
new file mode 100644
index 0000000..2055caa
--- /dev/null
+++ b/lib/cpp/src/concurrency/Monitor.cpp
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "Monitor.h"
+#include "Exception.h"
+#include "Util.h"
+
+#include <assert.h>
+#include <errno.h>
+
+#include <iostream>
+
+#include <pthread.h>
+
+namespace apache { namespace thrift { namespace concurrency {
+
+/**
+ * Monitor implementation using the POSIX pthread library
+ *
+ * @version $Id:$
+ */
+class Monitor::Impl {
+
+ public:
+
+  Impl() :
+    mutexInitialized_(false),
+    condInitialized_(false) {
+
+    if (pthread_mutex_init(&pthread_mutex_, NULL) == 0) {
+      mutexInitialized_ = true;
+
+      if (pthread_cond_init(&pthread_cond_, NULL) == 0) {
+        condInitialized_ = true;
+      }
+    }
+
+    if (!mutexInitialized_ || !condInitialized_) {
+      cleanup();
+      throw SystemResourceException();
+    }
+  }
+
+  ~Impl() { cleanup(); }
+
+  void lock() const { pthread_mutex_lock(&pthread_mutex_); }
+
+  void unlock() const { pthread_mutex_unlock(&pthread_mutex_); }
+
+  void wait(int64_t timeout) const {
+
+    // XXX Need to assert that caller owns mutex
+    assert(timeout >= 0LL);
+    if (timeout == 0LL) {
+      int iret = pthread_cond_wait(&pthread_cond_, &pthread_mutex_);
+      assert(iret == 0);
+    } else {
+      struct timespec abstime;
+      int64_t now = Util::currentTime();
+      Util::toTimespec(abstime, now + timeout);
+      int result = pthread_cond_timedwait(&pthread_cond_,
+                                          &pthread_mutex_,
+                                          &abstime);
+      if (result == ETIMEDOUT) {
+        // pthread_cond_timedwait has been observed to return early on
+        // various platforms, so comment out this assert.
+        //assert(Util::currentTime() >= (now + timeout));
+        throw TimedOutException();
+      }
+    }
+  }
+
+  void notify() {
+    // XXX Need to assert that caller owns mutex
+    int iret = pthread_cond_signal(&pthread_cond_);
+    assert(iret == 0);
+  }
+
+  void notifyAll() {
+    // XXX Need to assert that caller owns mutex
+    int iret = pthread_cond_broadcast(&pthread_cond_);
+    assert(iret == 0);
+  }
+
+ private:
+
+  void cleanup() {
+    if (mutexInitialized_) {
+      mutexInitialized_ = false;
+      int iret = pthread_mutex_destroy(&pthread_mutex_);
+      assert(iret == 0);
+    }
+
+    if (condInitialized_) {
+      condInitialized_ = false;
+      int iret = pthread_cond_destroy(&pthread_cond_);
+      assert(iret == 0);
+    }
+  }
+
+  mutable pthread_mutex_t pthread_mutex_;
+  mutable bool mutexInitialized_;
+  mutable pthread_cond_t pthread_cond_;
+  mutable bool condInitialized_;
+};
+
+Monitor::Monitor() : impl_(new Monitor::Impl()) {}
+
+Monitor::~Monitor() { delete impl_; }
+
+void Monitor::lock() const { impl_->lock(); }
+
+void Monitor::unlock() const { impl_->unlock(); }
+
+void Monitor::wait(int64_t timeout) const { impl_->wait(timeout); }
+
+void Monitor::notify() const { impl_->notify(); }
+
+void Monitor::notifyAll() const { impl_->notifyAll(); }
+
+}}} // apache::thrift::concurrency
diff --git a/lib/cpp/src/concurrency/Monitor.h b/lib/cpp/src/concurrency/Monitor.h
new file mode 100644
index 0000000..234bf32
--- /dev/null
+++ b/lib/cpp/src/concurrency/Monitor.h
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_CONCURRENCY_MONITOR_H_
+#define _THRIFT_CONCURRENCY_MONITOR_H_ 1
+
+#include "Exception.h"
+
+namespace apache { namespace thrift { namespace concurrency {
+
+/**
+ * A monitor is a combination mutex and condition-event.  Waiting and
+ * notifying condition events requires that the caller own the mutex.  Mutex
+ * lock and unlock operations can be performed independently of condition
+ * events.  This is more or less analogous to java.lang.Object multi-thread
+ * operations
+ *
+ * Note that all methods are const.  Monitors implement logical constness, not
+ * bit constness.  This allows const methods to call monitor methods without
+ * needing to cast away constness or change to non-const signatures.
+ *
+ * @version $Id:$
+ */
+class Monitor {
+
+ public:
+
+  Monitor();
+
+  virtual ~Monitor();
+
+  virtual void lock() const;
+
+  virtual void unlock() const;
+
+  virtual void wait(int64_t timeout=0LL) const;
+
+  virtual void notify() const;
+
+  virtual void notifyAll() const;
+
+ private:
+
+  class Impl;
+
+  Impl* impl_;
+};
+
+class Synchronized {
+ public:
+
+ Synchronized(const Monitor& value) :
+   monitor_(value) {
+   monitor_.lock();
+  }
+
+  ~Synchronized() {
+    monitor_.unlock();
+  }
+
+ private:
+  const Monitor& monitor_;
+};
+
+
+}}} // apache::thrift::concurrency
+
+#endif // #ifndef _THRIFT_CONCURRENCY_MONITOR_H_
diff --git a/lib/cpp/src/concurrency/Mutex.cpp b/lib/cpp/src/concurrency/Mutex.cpp
new file mode 100644
index 0000000..045dbdf
--- /dev/null
+++ b/lib/cpp/src/concurrency/Mutex.cpp
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "Mutex.h"
+
+#include <assert.h>
+#include <pthread.h>
+
+using boost::shared_ptr;
+
+namespace apache { namespace thrift { namespace concurrency {
+
+/**
+ * Implementation of Mutex class using POSIX mutex
+ *
+ * @version $Id:$
+ */
+class Mutex::impl {
+ public:
+  impl(Initializer init) : initialized_(false) {
+    init(&pthread_mutex_);
+    initialized_ = true;
+  }
+
+  ~impl() {
+    if (initialized_) {
+      initialized_ = false;
+      int ret = pthread_mutex_destroy(&pthread_mutex_);
+      assert(ret == 0);
+    }
+  }
+
+  void lock() const { pthread_mutex_lock(&pthread_mutex_); }
+
+  bool trylock() const { return (0 == pthread_mutex_trylock(&pthread_mutex_)); }
+
+  void unlock() const { pthread_mutex_unlock(&pthread_mutex_); }
+
+ private:
+  mutable pthread_mutex_t pthread_mutex_;
+  mutable bool initialized_;
+};
+
+Mutex::Mutex(Initializer init) : impl_(new Mutex::impl(init)) {}
+
+void Mutex::lock() const { impl_->lock(); }
+
+bool Mutex::trylock() const { return impl_->trylock(); }
+
+void Mutex::unlock() const { impl_->unlock(); }
+
+void Mutex::DEFAULT_INITIALIZER(void* arg) {
+  pthread_mutex_t* pthread_mutex = (pthread_mutex_t*)arg;
+  int ret = pthread_mutex_init(pthread_mutex, NULL);
+  assert(ret == 0);
+}
+
+static void init_with_kind(pthread_mutex_t* mutex, int kind) {
+  pthread_mutexattr_t mutexattr;
+  int ret = pthread_mutexattr_init(&mutexattr);
+  assert(ret == 0);
+
+  // Apparently, this can fail.  Should we really be aborting?
+  ret = pthread_mutexattr_settype(&mutexattr, kind);
+  assert(ret == 0);
+
+  ret = pthread_mutex_init(mutex, &mutexattr);
+  assert(ret == 0);
+
+  ret = pthread_mutexattr_destroy(&mutexattr);
+  assert(ret == 0);
+}
+
+#ifdef PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP
+void Mutex::ADAPTIVE_INITIALIZER(void* arg) {
+  // From mysql source: mysys/my_thr_init.c
+  // Set mutex type to "fast" a.k.a "adaptive"
+  //
+  // In this case the thread may steal the mutex from some other thread
+  // that is waiting for the same mutex. This will save us some
+  // context switches but may cause a thread to 'starve forever' while
+  // waiting for the mutex (not likely if the code within the mutex is
+  // short).
+  init_with_kind((pthread_mutex_t*)arg, PTHREAD_MUTEX_ADAPTIVE_NP);
+}
+#endif
+
+#ifdef PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP
+void Mutex::RECURSIVE_INITIALIZER(void* arg) {
+  init_with_kind((pthread_mutex_t*)arg, PTHREAD_MUTEX_RECURSIVE_NP);
+}
+#endif
+
+
+/**
+ * Implementation of ReadWriteMutex class using POSIX rw lock
+ *
+ * @version $Id:$
+ */
+class ReadWriteMutex::impl {
+public:
+  impl() : initialized_(false) {
+    int ret = pthread_rwlock_init(&rw_lock_, NULL);
+    assert(ret == 0);
+    initialized_ = true;
+  }
+
+  ~impl() {
+    if(initialized_) {
+      initialized_ = false;
+      int ret = pthread_rwlock_destroy(&rw_lock_);
+      assert(ret == 0);
+    }
+  }
+
+  void acquireRead() const { pthread_rwlock_rdlock(&rw_lock_); }
+
+  void acquireWrite() const { pthread_rwlock_wrlock(&rw_lock_); }
+
+  bool attemptRead() const { return pthread_rwlock_tryrdlock(&rw_lock_); }
+
+  bool attemptWrite() const { return pthread_rwlock_trywrlock(&rw_lock_); }
+
+  void release() const { pthread_rwlock_unlock(&rw_lock_); }
+
+private:
+  mutable pthread_rwlock_t rw_lock_;
+  mutable bool initialized_;
+};
+
+ReadWriteMutex::ReadWriteMutex() : impl_(new ReadWriteMutex::impl()) {}
+
+void ReadWriteMutex::acquireRead() const { impl_->acquireRead(); }
+
+void ReadWriteMutex::acquireWrite() const { impl_->acquireWrite(); }
+
+bool ReadWriteMutex::attemptRead() const { return impl_->attemptRead(); }
+
+bool ReadWriteMutex::attemptWrite() const { return impl_->attemptWrite(); }
+
+void ReadWriteMutex::release() const { impl_->release(); }
+
+}}} // apache::thrift::concurrency
+
diff --git a/lib/cpp/src/concurrency/Mutex.h b/lib/cpp/src/concurrency/Mutex.h
new file mode 100644
index 0000000..884412b
--- /dev/null
+++ b/lib/cpp/src/concurrency/Mutex.h
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_CONCURRENCY_MUTEX_H_
+#define _THRIFT_CONCURRENCY_MUTEX_H_ 1
+
+#include <boost/shared_ptr.hpp>
+
+namespace apache { namespace thrift { namespace concurrency {
+
+/**
+ * A simple mutex class
+ *
+ * @version $Id:$
+ */
+class Mutex {
+ public:
+  typedef void (*Initializer)(void*);
+
+  Mutex(Initializer init = DEFAULT_INITIALIZER);
+  virtual ~Mutex() {}
+  virtual void lock() const;
+  virtual bool trylock() const;
+  virtual void unlock() const;
+
+  static void DEFAULT_INITIALIZER(void*);
+  static void ADAPTIVE_INITIALIZER(void*);
+  static void RECURSIVE_INITIALIZER(void*);
+
+ private:
+
+  class impl;
+  boost::shared_ptr<impl> impl_;
+};
+
+class ReadWriteMutex {
+public:
+  ReadWriteMutex();
+  virtual ~ReadWriteMutex() {}
+
+  // these get the lock and block until it is done successfully
+  virtual void acquireRead() const;
+  virtual void acquireWrite() const;
+
+  // these attempt to get the lock, returning false immediately if they fail
+  virtual bool attemptRead() const;
+  virtual bool attemptWrite() const;
+
+  // this releases both read and write locks
+  virtual void release() const;
+
+private:
+
+  class impl;
+  boost::shared_ptr<impl> impl_;
+};
+
+class Guard {
+ public:
+  Guard(const Mutex& value) : mutex_(value) {
+    mutex_.lock();
+  }
+  ~Guard() {
+    mutex_.unlock();
+  }
+
+ private:
+  const Mutex& mutex_;
+};
+
+class RWGuard {
+  public:
+    RWGuard(const ReadWriteMutex& value, bool write = 0) : rw_mutex_(value) {
+      if (write) {
+        rw_mutex_.acquireWrite();
+      } else {
+        rw_mutex_.acquireRead();
+      }
+    }
+    ~RWGuard() {
+      rw_mutex_.release();
+    }
+  private:
+    const ReadWriteMutex& rw_mutex_;
+};
+
+
+// A little hack to prevent someone from trying to do "Guard(m);"
+// Sorry for polluting the global namespace, but I think it's worth it.
+#define Guard(m) incorrect_use_of_Guard(m)
+#define RWGuard(m) incorrect_use_of_RWGuard(m)
+
+
+}}} // apache::thrift::concurrency
+
+#endif // #ifndef _THRIFT_CONCURRENCY_MUTEX_H_
diff --git a/lib/cpp/src/concurrency/PosixThreadFactory.cpp b/lib/cpp/src/concurrency/PosixThreadFactory.cpp
new file mode 100644
index 0000000..e48dce3
--- /dev/null
+++ b/lib/cpp/src/concurrency/PosixThreadFactory.cpp
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "PosixThreadFactory.h"
+#include "Exception.h"
+
+#if GOOGLE_PERFTOOLS_REGISTER_THREAD
+#  include <google/profiler.h>
+#endif
+
+#include <assert.h>
+#include <pthread.h>
+
+#include <iostream>
+
+#include <boost/weak_ptr.hpp>
+
+namespace apache { namespace thrift { namespace concurrency {
+
+using boost::shared_ptr;
+using boost::weak_ptr;
+
+/**
+ * The POSIX thread class.
+ *
+ * @version $Id:$
+ */
+class PthreadThread: public Thread {
+ public:
+
+  enum STATE {
+    uninitialized,
+    starting,
+    started,
+    stopping,
+    stopped
+  };
+
+  static const int MB = 1024 * 1024;
+
+  static void* threadMain(void* arg);
+
+ private:
+  pthread_t pthread_;
+  STATE state_;
+  int policy_;
+  int priority_;
+  int stackSize_;
+  weak_ptr<PthreadThread> self_;
+  bool detached_;
+
+ public:
+
+  PthreadThread(int policy, int priority, int stackSize, bool detached, shared_ptr<Runnable> runnable) :
+    pthread_(0),
+    state_(uninitialized),
+    policy_(policy),
+    priority_(priority),
+    stackSize_(stackSize),
+    detached_(detached) {
+
+    this->Thread::runnable(runnable);
+  }
+
+  ~PthreadThread() {
+    /* Nothing references this thread, if is is not detached, do a join
+       now, otherwise the thread-id and, possibly, other resources will
+       be leaked. */
+    if(!detached_) {
+      try {
+        join();
+      } catch(...) {
+        // We're really hosed.
+      }
+    }
+  }
+
+  void start() {
+    if (state_ != uninitialized) {
+      return;
+    }
+
+    pthread_attr_t thread_attr;
+    if (pthread_attr_init(&thread_attr) != 0) {
+        throw SystemResourceException("pthread_attr_init failed");
+    }
+
+    if(pthread_attr_setdetachstate(&thread_attr,
+                                   detached_ ?
+                                   PTHREAD_CREATE_DETACHED :
+                                   PTHREAD_CREATE_JOINABLE) != 0) {
+        throw SystemResourceException("pthread_attr_setdetachstate failed");
+    }
+
+    // Set thread stack size
+    if (pthread_attr_setstacksize(&thread_attr, MB * stackSize_) != 0) {
+      throw SystemResourceException("pthread_attr_setstacksize failed");
+    }
+
+    // Set thread policy
+    if (pthread_attr_setschedpolicy(&thread_attr, policy_) != 0) {
+      throw SystemResourceException("pthread_attr_setschedpolicy failed");
+    }
+
+    struct sched_param sched_param;
+    sched_param.sched_priority = priority_;
+
+    // Set thread priority
+    if (pthread_attr_setschedparam(&thread_attr, &sched_param) != 0) {
+      throw SystemResourceException("pthread_attr_setschedparam failed");
+    }
+
+    // Create reference
+    shared_ptr<PthreadThread>* selfRef = new shared_ptr<PthreadThread>();
+    *selfRef = self_.lock();
+
+    state_ = starting;
+
+    if (pthread_create(&pthread_, &thread_attr, threadMain, (void*)selfRef) != 0) {
+      throw SystemResourceException("pthread_create failed");
+    }
+  }
+
+  void join() {
+    if (!detached_ && state_ != uninitialized) {
+      void* ignore;
+      /* XXX
+         If join fails it is most likely due to the fact
+         that the last reference was the thread itself and cannot
+         join.  This results in leaked threads and will eventually
+         cause the process to run out of thread resources.
+         We're beyond the point of throwing an exception.  Not clear how
+         best to handle this. */
+      detached_ = pthread_join(pthread_, &ignore) == 0;
+    }
+  }
+
+  Thread::id_t getId() {
+    return (Thread::id_t)pthread_;
+  }
+
+  shared_ptr<Runnable> runnable() const { return Thread::runnable(); }
+
+  void runnable(shared_ptr<Runnable> value) { Thread::runnable(value); }
+
+  void weakRef(shared_ptr<PthreadThread> self) {
+    assert(self.get() == this);
+    self_ = weak_ptr<PthreadThread>(self);
+  }
+};
+
+void* PthreadThread::threadMain(void* arg) {
+  shared_ptr<PthreadThread> thread = *(shared_ptr<PthreadThread>*)arg;
+  delete reinterpret_cast<shared_ptr<PthreadThread>*>(arg);
+
+  if (thread == NULL) {
+    return (void*)0;
+  }
+
+  if (thread->state_ != starting) {
+    return (void*)0;
+  }
+
+#if GOOGLE_PERFTOOLS_REGISTER_THREAD
+  ProfilerRegisterThread();
+#endif
+
+  thread->state_ = starting;
+  thread->runnable()->run();
+  if (thread->state_ != stopping && thread->state_ != stopped) {
+    thread->state_ = stopping;
+  }
+
+  return (void*)0;
+}
+
+/**
+ * POSIX Thread factory implementation
+ */
+class PosixThreadFactory::Impl {
+
+ private:
+  POLICY policy_;
+  PRIORITY priority_;
+  int stackSize_;
+  bool detached_;
+
+  /**
+   * Converts generic posix thread schedule policy enums into pthread
+   * API values.
+   */
+  static int toPthreadPolicy(POLICY policy) {
+    switch (policy) {
+    case OTHER:
+      return SCHED_OTHER;
+    case FIFO:
+      return SCHED_FIFO;
+    case ROUND_ROBIN:
+      return SCHED_RR;
+    }
+    return SCHED_OTHER;
+  }
+
+  /**
+   * Converts relative thread priorities to absolute value based on posix
+   * thread scheduler policy
+   *
+   *  The idea is simply to divide up the priority range for the given policy
+   * into the correpsonding relative priority level (lowest..highest) and
+   * then pro-rate accordingly.
+   */
+  static int toPthreadPriority(POLICY policy, PRIORITY priority) {
+    int pthread_policy = toPthreadPolicy(policy);
+    int min_priority = sched_get_priority_min(pthread_policy);
+    int max_priority = sched_get_priority_max(pthread_policy);
+    int quanta = (HIGHEST - LOWEST) + 1;
+    float stepsperquanta = (max_priority - min_priority) / quanta;
+
+    if (priority <= HIGHEST) {
+      return (int)(min_priority + stepsperquanta * priority);
+    } else {
+      // should never get here for priority increments.
+      assert(false);
+      return (int)(min_priority + stepsperquanta * NORMAL);
+    }
+  }
+
+ public:
+
+  Impl(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
+    policy_(policy),
+    priority_(priority),
+    stackSize_(stackSize),
+    detached_(detached) {}
+
+  /**
+   * Creates a new POSIX thread to run the runnable object
+   *
+   * @param runnable A runnable object
+   */
+  shared_ptr<Thread> newThread(shared_ptr<Runnable> runnable) const {
+    shared_ptr<PthreadThread> result = shared_ptr<PthreadThread>(new PthreadThread(toPthreadPolicy(policy_), toPthreadPriority(policy_, priority_), stackSize_, detached_, runnable));
+    result->weakRef(result);
+    runnable->thread(result);
+    return result;
+  }
+
+  int getStackSize() const { return stackSize_; }
+
+  void setStackSize(int value) { stackSize_ = value; }
+
+  PRIORITY getPriority() const { return priority_; }
+
+  /**
+   * Sets priority.
+   *
+   *  XXX
+   *  Need to handle incremental priorities properly.
+   */
+  void setPriority(PRIORITY value) { priority_ = value; }
+
+  bool isDetached() const { return detached_; }
+
+  void setDetached(bool value) { detached_ = value; }
+
+  Thread::id_t getCurrentThreadId() const {
+    // TODO(dreiss): Stop using C-style casts.
+    return (id_t)pthread_self();
+  }
+
+};
+
+PosixThreadFactory::PosixThreadFactory(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
+  impl_(new PosixThreadFactory::Impl(policy, priority, stackSize, detached)) {}
+
+shared_ptr<Thread> PosixThreadFactory::newThread(shared_ptr<Runnable> runnable) const { return impl_->newThread(runnable); }
+
+int PosixThreadFactory::getStackSize() const { return impl_->getStackSize(); }
+
+void PosixThreadFactory::setStackSize(int value) { impl_->setStackSize(value); }
+
+PosixThreadFactory::PRIORITY PosixThreadFactory::getPriority() const { return impl_->getPriority(); }
+
+void PosixThreadFactory::setPriority(PosixThreadFactory::PRIORITY value) { impl_->setPriority(value); }
+
+bool PosixThreadFactory::isDetached() const { return impl_->isDetached(); }
+
+void PosixThreadFactory::setDetached(bool value) { impl_->setDetached(value); }
+
+Thread::id_t PosixThreadFactory::getCurrentThreadId() const { return impl_->getCurrentThreadId(); }
+
+}}} // apache::thrift::concurrency
diff --git a/lib/cpp/src/concurrency/PosixThreadFactory.h b/lib/cpp/src/concurrency/PosixThreadFactory.h
new file mode 100644
index 0000000..d6d83a3
--- /dev/null
+++ b/lib/cpp/src/concurrency/PosixThreadFactory.h
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_CONCURRENCY_POSIXTHREADFACTORY_H_
+#define _THRIFT_CONCURRENCY_POSIXTHREADFACTORY_H_ 1
+
+#include "Thread.h"
+
+#include <boost/shared_ptr.hpp>
+
+namespace apache { namespace thrift { namespace concurrency {
+
+/**
+ * A thread factory to create posix threads
+ *
+ * @version $Id:$
+ */
+class PosixThreadFactory : public ThreadFactory {
+
+ public:
+
+  /**
+   * POSIX Thread scheduler policies
+   */
+  enum POLICY {
+    OTHER,
+    FIFO,
+    ROUND_ROBIN
+  };
+
+  /**
+   * POSIX Thread scheduler relative priorities,
+   *
+   * Absolute priority is determined by scheduler policy and OS. This
+   * enumeration specifies relative priorities such that one can specify a
+   * priority withing a giving scheduler policy without knowing the absolute
+   * value of the priority.
+   */
+  enum PRIORITY {
+    LOWEST = 0,
+    LOWER = 1,
+    LOW = 2,
+    NORMAL = 3,
+    HIGH = 4,
+    HIGHER = 5,
+    HIGHEST = 6,
+    INCREMENT = 7,
+    DECREMENT = 8
+  };
+
+  /**
+   * Posix thread (pthread) factory.  All threads created by a factory are reference-counted
+   * via boost::shared_ptr and boost::weak_ptr.  The factory guarantees that threads and
+   * the Runnable tasks they host will be properly cleaned up once the last strong reference
+   * to both is given up.
+   *
+   * Threads are created with the specified policy, priority, stack-size and detachable-mode
+   * detached means the thread is free-running and will release all system resources the
+   * when it completes.  A detachable thread is not joinable.  The join method
+   * of a detachable thread will return immediately with no error.
+   *
+   * By default threads are not joinable.
+   */
+
+  PosixThreadFactory(POLICY policy=ROUND_ROBIN, PRIORITY priority=NORMAL, int stackSize=1, bool detached=true);
+
+  // From ThreadFactory;
+  boost::shared_ptr<Thread> newThread(boost::shared_ptr<Runnable> runnable) const;
+
+  // From ThreadFactory;
+  Thread::id_t getCurrentThreadId() const;
+
+  /**
+   * Gets stack size for created threads
+   *
+   * @return int size in megabytes
+   */
+  virtual int getStackSize() const;
+
+  /**
+   * Sets stack size for created threads
+   *
+   * @param value size in megabytes
+   */
+  virtual void setStackSize(int value);
+
+  /**
+   * Gets priority relative to current policy
+   */
+  virtual PRIORITY getPriority() const;
+
+  /**
+   * Sets priority relative to current policy
+   */
+  virtual void setPriority(PRIORITY priority);
+
+  /**
+   * Sets detached mode of threads
+   */
+  virtual void setDetached(bool detached);
+
+  /**
+   * Gets current detached mode
+   */
+  virtual bool isDetached() const;
+
+ private:
+  class Impl;
+  boost::shared_ptr<Impl> impl_;
+};
+
+}}} // apache::thrift::concurrency
+
+#endif // #ifndef _THRIFT_CONCURRENCY_POSIXTHREADFACTORY_H_
diff --git a/lib/cpp/src/concurrency/Thread.h b/lib/cpp/src/concurrency/Thread.h
new file mode 100644
index 0000000..d4282ad
--- /dev/null
+++ b/lib/cpp/src/concurrency/Thread.h
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_CONCURRENCY_THREAD_H_
+#define _THRIFT_CONCURRENCY_THREAD_H_ 1
+
+#include <stdint.h>
+#include <boost/shared_ptr.hpp>
+#include <boost/weak_ptr.hpp>
+
+namespace apache { namespace thrift { namespace concurrency {
+
+class Thread;
+
+/**
+ * Minimal runnable class.  More or less analogous to java.lang.Runnable.
+ *
+ * @version $Id:$
+ */
+class Runnable {
+
+ public:
+  virtual ~Runnable() {};
+  virtual void run() = 0;
+
+  /**
+   * Gets the thread object that is hosting this runnable object  - can return
+   * an empty boost::shared pointer if no references remain on thet thread  object
+   */
+  virtual boost::shared_ptr<Thread> thread() { return thread_.lock(); }
+
+  /**
+   * Sets the thread that is executing this object.  This is only meant for
+   * use by concrete implementations of Thread.
+   */
+  virtual void thread(boost::shared_ptr<Thread> value) { thread_ = value; }
+
+ private:
+  boost::weak_ptr<Thread> thread_;
+};
+
+/**
+ * Minimal thread class. Returned by thread factory bound to a Runnable object
+ * and ready to start execution.  More or less analogous to java.lang.Thread
+ * (minus all the thread group, priority, mode and other baggage, since that
+ * is difficult to abstract across platforms and is left for platform-specific
+ * ThreadFactory implemtations to deal with
+ *
+ * @see apache::thrift::concurrency::ThreadFactory)
+ */
+class Thread {
+
+ public:
+
+  typedef uint64_t id_t;
+
+  virtual ~Thread() {};
+
+  /**
+   * Starts the thread. Does platform specific thread creation and
+   * configuration then invokes the run method of the Runnable object bound
+   * to this thread.
+   */
+  virtual void start() = 0;
+
+  /**
+   * Join this thread. Current thread blocks until this target thread
+   * completes.
+   */
+  virtual void join() = 0;
+
+  /**
+   * Gets the thread's platform-specific ID
+   */
+  virtual id_t getId() = 0;
+
+  /**
+   * Gets the runnable object this thread is hosting
+   */
+  virtual boost::shared_ptr<Runnable> runnable() const { return _runnable; }
+
+ protected:
+  virtual void runnable(boost::shared_ptr<Runnable> value) { _runnable = value; }
+
+ private:
+  boost::shared_ptr<Runnable> _runnable;
+
+};
+
+/**
+ * Factory to create platform-specific thread object and bind them to Runnable
+ * object for execution
+ */
+class ThreadFactory {
+
+ public:
+  virtual ~ThreadFactory() {}
+  virtual boost::shared_ptr<Thread> newThread(boost::shared_ptr<Runnable> runnable) const = 0;
+
+  /** Gets the current thread id or unknown_thread_id if the current thread is not a thrift thread */
+
+  static const Thread::id_t unknown_thread_id;
+
+  virtual Thread::id_t getCurrentThreadId() const = 0;
+};
+
+}}} // apache::thrift::concurrency
+
+#endif // #ifndef _THRIFT_CONCURRENCY_THREAD_H_
diff --git a/lib/cpp/src/concurrency/ThreadManager.cpp b/lib/cpp/src/concurrency/ThreadManager.cpp
new file mode 100644
index 0000000..abfcf6e
--- /dev/null
+++ b/lib/cpp/src/concurrency/ThreadManager.cpp
@@ -0,0 +1,493 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "ThreadManager.h"
+#include "Exception.h"
+#include "Monitor.h"
+
+#include <boost/shared_ptr.hpp>
+
+#include <assert.h>
+#include <queue>
+#include <set>
+
+#if defined(DEBUG)
+#include <iostream>
+#endif //defined(DEBUG)
+
+namespace apache { namespace thrift { namespace concurrency {
+
+using boost::shared_ptr;
+using boost::dynamic_pointer_cast;
+
+/**
+ * ThreadManager class
+ *
+ * This class manages a pool of threads. It uses a ThreadFactory to create
+ * threads.  It never actually creates or destroys worker threads, rather
+ * it maintains statistics on number of idle threads, number of active threads,
+ * task backlog, and average wait and service times.
+ *
+ * @version $Id:$
+ */
+class ThreadManager::Impl : public ThreadManager  {
+
+ public:
+  Impl() :
+    workerCount_(0),
+    workerMaxCount_(0),
+    idleCount_(0),
+    pendingTaskCountMax_(0),
+    state_(ThreadManager::UNINITIALIZED) {}
+
+  ~Impl() { stop(); }
+
+  void start();
+
+  void stop() { stopImpl(false); }
+
+  void join() { stopImpl(true); }
+
+  const ThreadManager::STATE state() const {
+    return state_;
+  }
+
+  shared_ptr<ThreadFactory> threadFactory() const {
+    Synchronized s(monitor_);
+    return threadFactory_;
+  }
+
+  void threadFactory(shared_ptr<ThreadFactory> value) {
+    Synchronized s(monitor_);
+    threadFactory_ = value;
+  }
+
+  void addWorker(size_t value);
+
+  void removeWorker(size_t value);
+
+  size_t idleWorkerCount() const {
+    return idleCount_;
+  }
+
+  size_t workerCount() const {
+    Synchronized s(monitor_);
+    return workerCount_;
+  }
+
+  size_t pendingTaskCount() const {
+    Synchronized s(monitor_);
+    return tasks_.size();
+  }
+
+  size_t totalTaskCount() const {
+    Synchronized s(monitor_);
+    return tasks_.size() + workerCount_ - idleCount_;
+  }
+
+  size_t pendingTaskCountMax() const {
+    Synchronized s(monitor_);
+    return pendingTaskCountMax_;
+  }
+
+  void pendingTaskCountMax(const size_t value) {
+    Synchronized s(monitor_);
+    pendingTaskCountMax_ = value;
+  }
+
+  bool canSleep();
+
+  void add(shared_ptr<Runnable> value, int64_t timeout);
+
+  void remove(shared_ptr<Runnable> task);
+
+private:
+  void stopImpl(bool join);
+
+  size_t workerCount_;
+  size_t workerMaxCount_;
+  size_t idleCount_;
+  size_t pendingTaskCountMax_;
+
+  ThreadManager::STATE state_;
+  shared_ptr<ThreadFactory> threadFactory_;
+
+
+  friend class ThreadManager::Task;
+  std::queue<shared_ptr<Task> > tasks_;
+  Monitor monitor_;
+  Monitor workerMonitor_;
+
+  friend class ThreadManager::Worker;
+  std::set<shared_ptr<Thread> > workers_;
+  std::set<shared_ptr<Thread> > deadWorkers_;
+  std::map<const Thread::id_t, shared_ptr<Thread> > idMap_;
+};
+
+class ThreadManager::Task : public Runnable {
+
+ public:
+  enum STATE {
+    WAITING,
+    EXECUTING,
+    CANCELLED,
+    COMPLETE
+  };
+
+  Task(shared_ptr<Runnable> runnable) :
+    runnable_(runnable),
+    state_(WAITING) {}
+
+  ~Task() {}
+
+  void run() {
+    if (state_ == EXECUTING) {
+      runnable_->run();
+      state_ = COMPLETE;
+    }
+  }
+
+ private:
+  shared_ptr<Runnable> runnable_;
+  friend class ThreadManager::Worker;
+  STATE state_;
+};
+
+class ThreadManager::Worker: public Runnable {
+  enum STATE {
+    UNINITIALIZED,
+    STARTING,
+    STARTED,
+    STOPPING,
+    STOPPED
+  };
+
+ public:
+  Worker(ThreadManager::Impl* manager) :
+    manager_(manager),
+    state_(UNINITIALIZED),
+    idle_(false) {}
+
+  ~Worker() {}
+
+ private:
+  bool isActive() const {
+    return
+      (manager_->workerCount_ <= manager_->workerMaxCount_) ||
+      (manager_->state_ == JOINING && !manager_->tasks_.empty());
+  }
+
+ public:
+  /**
+   * Worker entry point
+   *
+   * As long as worker thread is running, pull tasks off the task queue and
+   * execute.
+   */
+  void run() {
+    bool active = false;
+    bool notifyManager = false;
+
+    /**
+     * Increment worker semaphore and notify manager if worker count reached
+     * desired max
+     *
+     * Note: We have to release the monitor and acquire the workerMonitor
+     * since that is what the manager blocks on for worker add/remove
+     */
+    {
+      Synchronized s(manager_->monitor_);
+      active = manager_->workerCount_ < manager_->workerMaxCount_;
+      if (active) {
+        manager_->workerCount_++;
+        notifyManager = manager_->workerCount_ == manager_->workerMaxCount_;
+      }
+    }
+
+    if (notifyManager) {
+      Synchronized s(manager_->workerMonitor_);
+      manager_->workerMonitor_.notify();
+      notifyManager = false;
+    }
+
+    while (active) {
+      shared_ptr<ThreadManager::Task> task;
+
+      /**
+       * While holding manager monitor block for non-empty task queue (Also
+       * check that the thread hasn't been requested to stop). Once the queue
+       * is non-empty, dequeue a task, release monitor, and execute. If the
+       * worker max count has been decremented such that we exceed it, mark
+       * ourself inactive, decrement the worker count and notify the manager
+       * (technically we're notifying the next blocked thread but eventually
+       * the manager will see it.
+       */
+      {
+        Synchronized s(manager_->monitor_);
+        active = isActive();
+
+        while (active && manager_->tasks_.empty()) {
+          manager_->idleCount_++;
+          idle_ = true;
+          manager_->monitor_.wait();
+          active = isActive();
+          idle_ = false;
+          manager_->idleCount_--;
+        }
+
+        if (active) {
+          if (!manager_->tasks_.empty()) {
+            task = manager_->tasks_.front();
+            manager_->tasks_.pop();
+            if (task->state_ == ThreadManager::Task::WAITING) {
+              task->state_ = ThreadManager::Task::EXECUTING;
+            }
+
+            /* If we have a pending task max and we just dropped below it, wakeup any
+               thread that might be blocked on add. */
+            if (manager_->pendingTaskCountMax_ != 0 &&
+                manager_->tasks_.size() == manager_->pendingTaskCountMax_ - 1) {
+              manager_->monitor_.notify();
+            }
+          }
+        } else {
+          idle_ = true;
+          manager_->workerCount_--;
+          notifyManager = (manager_->workerCount_ == manager_->workerMaxCount_);
+        }
+      }
+
+      if (task != NULL) {
+        if (task->state_ == ThreadManager::Task::EXECUTING) {
+          try {
+            task->run();
+          } catch(...) {
+            // XXX need to log this
+          }
+        }
+      }
+    }
+
+    {
+      Synchronized s(manager_->workerMonitor_);
+      manager_->deadWorkers_.insert(this->thread());
+      if (notifyManager) {
+        manager_->workerMonitor_.notify();
+      }
+    }
+
+    return;
+  }
+
+  private:
+    ThreadManager::Impl* manager_;
+    friend class ThreadManager::Impl;
+    STATE state_;
+    bool idle_;
+};
+
+
+  void ThreadManager::Impl::addWorker(size_t value) {
+  std::set<shared_ptr<Thread> > newThreads;
+  for (size_t ix = 0; ix < value; ix++) {
+    class ThreadManager::Worker;
+    shared_ptr<ThreadManager::Worker> worker = shared_ptr<ThreadManager::Worker>(new ThreadManager::Worker(this));
+    newThreads.insert(threadFactory_->newThread(worker));
+  }
+
+  {
+    Synchronized s(monitor_);
+    workerMaxCount_ += value;
+    workers_.insert(newThreads.begin(), newThreads.end());
+  }
+
+  for (std::set<shared_ptr<Thread> >::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) {
+    shared_ptr<ThreadManager::Worker> worker = dynamic_pointer_cast<ThreadManager::Worker, Runnable>((*ix)->runnable());
+    worker->state_ = ThreadManager::Worker::STARTING;
+    (*ix)->start();
+    idMap_.insert(std::pair<const Thread::id_t, shared_ptr<Thread> >((*ix)->getId(), *ix));
+  }
+
+  {
+    Synchronized s(workerMonitor_);
+    while (workerCount_ != workerMaxCount_) {
+      workerMonitor_.wait();
+    }
+  }
+}
+
+void ThreadManager::Impl::start() {
+
+  if (state_ == ThreadManager::STOPPED) {
+    return;
+  }
+
+  {
+    Synchronized s(monitor_);
+    if (state_ == ThreadManager::UNINITIALIZED) {
+      if (threadFactory_ == NULL) {
+        throw InvalidArgumentException();
+      }
+      state_ = ThreadManager::STARTED;
+      monitor_.notifyAll();
+    }
+
+    while (state_ == STARTING) {
+      monitor_.wait();
+    }
+  }
+}
+
+void ThreadManager::Impl::stopImpl(bool join) {
+  bool doStop = false;
+  if (state_ == ThreadManager::STOPPED) {
+    return;
+  }
+
+  {
+    Synchronized s(monitor_);
+    if (state_ != ThreadManager::STOPPING &&
+        state_ != ThreadManager::JOINING &&
+        state_ != ThreadManager::STOPPED) {
+      doStop = true;
+      state_ = join ? ThreadManager::JOINING : ThreadManager::STOPPING;
+    }
+  }
+
+  if (doStop) {
+    removeWorker(workerCount_);
+  }
+
+  // XXX
+  // should be able to block here for transition to STOPPED since we're no
+  // using shared_ptrs
+
+  {
+    Synchronized s(monitor_);
+    state_ = ThreadManager::STOPPED;
+  }
+
+}
+
+void ThreadManager::Impl::removeWorker(size_t value) {
+  std::set<shared_ptr<Thread> > removedThreads;
+  {
+    Synchronized s(monitor_);
+    if (value > workerMaxCount_) {
+      throw InvalidArgumentException();
+    }
+
+    workerMaxCount_ -= value;
+
+    if (idleCount_ < value) {
+      for (size_t ix = 0; ix < idleCount_; ix++) {
+        monitor_.notify();
+      }
+    } else {
+      monitor_.notifyAll();
+    }
+  }
+
+  {
+    Synchronized s(workerMonitor_);
+
+    while (workerCount_ != workerMaxCount_) {
+      workerMonitor_.wait();
+    }
+
+    for (std::set<shared_ptr<Thread> >::iterator ix = deadWorkers_.begin(); ix != deadWorkers_.end(); ix++) {
+      workers_.erase(*ix);
+      idMap_.erase((*ix)->getId());
+    }
+
+    deadWorkers_.clear();
+  }
+}
+
+  bool ThreadManager::Impl::canSleep() {
+    const Thread::id_t id = threadFactory_->getCurrentThreadId();
+    return idMap_.find(id) == idMap_.end();
+  }
+
+  void ThreadManager::Impl::add(shared_ptr<Runnable> value, int64_t timeout) {
+    Synchronized s(monitor_);
+
+    if (state_ != ThreadManager::STARTED) {
+      throw IllegalStateException();
+    }
+
+    if (pendingTaskCountMax_ > 0 && (tasks_.size() >= pendingTaskCountMax_)) {
+      if (canSleep() && timeout >= 0) {
+        while (pendingTaskCountMax_ > 0 && tasks_.size() >= pendingTaskCountMax_) {
+          monitor_.wait(timeout);
+        }
+      } else {
+        throw TooManyPendingTasksException();
+      }
+    }
+
+    tasks_.push(shared_ptr<ThreadManager::Task>(new ThreadManager::Task(value)));
+
+    // If idle thread is available notify it, otherwise all worker threads are
+    // running and will get around to this task in time.
+    if (idleCount_ > 0) {
+      monitor_.notify();
+    }
+  }
+
+void ThreadManager::Impl::remove(shared_ptr<Runnable> task) {
+  Synchronized s(monitor_);
+  if (state_ != ThreadManager::STARTED) {
+    throw IllegalStateException();
+  }
+}
+
+class SimpleThreadManager : public ThreadManager::Impl {
+
+ public:
+  SimpleThreadManager(size_t workerCount=4, size_t pendingTaskCountMax=0) :
+    workerCount_(workerCount),
+    pendingTaskCountMax_(pendingTaskCountMax),
+    firstTime_(true) {
+  }
+
+  void start() {
+    ThreadManager::Impl::pendingTaskCountMax(pendingTaskCountMax_);
+    ThreadManager::Impl::start();
+    addWorker(workerCount_);
+  }
+
+ private:
+  const size_t workerCount_;
+  const size_t pendingTaskCountMax_;
+  bool firstTime_;
+  Monitor monitor_;
+};
+
+
+shared_ptr<ThreadManager> ThreadManager::newThreadManager() {
+  return shared_ptr<ThreadManager>(new ThreadManager::Impl());
+}
+
+shared_ptr<ThreadManager> ThreadManager::newSimpleThreadManager(size_t count, size_t pendingTaskCountMax) {
+  return shared_ptr<ThreadManager>(new SimpleThreadManager(count, pendingTaskCountMax));
+}
+
+}}} // apache::thrift::concurrency
+
diff --git a/lib/cpp/src/concurrency/ThreadManager.h b/lib/cpp/src/concurrency/ThreadManager.h
new file mode 100644
index 0000000..6e5a178
--- /dev/null
+++ b/lib/cpp/src/concurrency/ThreadManager.h
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_CONCURRENCY_THREADMANAGER_H_
+#define _THRIFT_CONCURRENCY_THREADMANAGER_H_ 1
+
+#include <boost/shared_ptr.hpp>
+#include <sys/types.h>
+#include "Thread.h"
+
+namespace apache { namespace thrift { namespace concurrency {
+
+/**
+ * Thread Pool Manager and related classes
+ *
+ * @version $Id:$
+ */
+class ThreadManager;
+
+/**
+ * ThreadManager class
+ *
+ * This class manages a pool of threads. It uses a ThreadFactory to create
+ * threads. It never actually creates or destroys worker threads, rather
+ * It maintains statistics on number of idle threads, number of active threads,
+ * task backlog, and average wait and service times and informs the PoolPolicy
+ * object bound to instances of this manager of interesting transitions. It is
+ * then up the PoolPolicy object to decide if the thread pool size needs to be
+ * adjusted and call this object addWorker and removeWorker methods to make
+ * changes.
+ *
+ * This design allows different policy implementations to used this code to
+ * handle basic worker thread management and worker task execution and focus on
+ * policy issues. The simplest policy, StaticPolicy, does nothing other than
+ * create a fixed number of threads.
+ */
+class ThreadManager {
+
+ protected:
+  ThreadManager() {}
+
+ public:
+  virtual ~ThreadManager() {}
+
+  /**
+   * Starts the thread manager. Verifies all attributes have been properly
+   * initialized, then allocates necessary resources to begin operation
+   */
+  virtual void start() = 0;
+
+  /**
+   * Stops the thread manager. Aborts all remaining unprocessed task, shuts
+   * down all created worker threads, and realeases all allocated resources.
+   * This method blocks for all worker threads to complete, thus it can
+   * potentially block forever if a worker thread is running a task that
+   * won't terminate.
+   */
+  virtual void stop() = 0;
+
+  /**
+   * Joins the thread manager. This is the same as stop, except that it will
+   * block until all the workers have finished their work. At that point
+   * the ThreadManager will transition into the STOPPED state.
+   */
+  virtual void join() = 0;
+
+  enum STATE {
+    UNINITIALIZED,
+    STARTING,
+    STARTED,
+    JOINING,
+    STOPPING,
+    STOPPED
+  };
+
+  virtual const STATE state() const = 0;
+
+  virtual boost::shared_ptr<ThreadFactory> threadFactory() const = 0;
+
+  virtual void threadFactory(boost::shared_ptr<ThreadFactory> value) = 0;
+
+  virtual void addWorker(size_t value=1) = 0;
+
+  virtual void removeWorker(size_t value=1) = 0;
+
+  /**
+   * Gets the current number of idle worker threads
+   */
+  virtual size_t idleWorkerCount() const = 0;
+
+  /**
+   * Gets the current number of total worker threads
+   */
+  virtual size_t workerCount() const = 0;
+
+  /**
+   * Gets the current number of pending tasks
+   */
+  virtual size_t pendingTaskCount() const  = 0;
+
+  /**
+   * Gets the current number of pending and executing tasks
+   */
+  virtual size_t totalTaskCount() const = 0;
+
+  /**
+   * Gets the maximum pending task count.  0 indicates no maximum
+   */
+  virtual size_t pendingTaskCountMax() const = 0;
+
+  /**
+   * Adds a task to be executed at some time in the future by a worker thread.
+   *
+   * This method will block if pendingTaskCountMax() in not zero and pendingTaskCount()
+   * is greater than or equalt to pendingTaskCountMax().  If this method is called in the
+   * context of a ThreadManager worker thread it will throw a
+   * TooManyPendingTasksException
+   *
+   * @param task  The task to queue for execution
+   *
+   * @param timeout Time to wait in milliseconds to add a task when a pending-task-count
+   * is specified. Specific cases:
+   * timeout = 0  : Wait forever to queue task.
+   * timeout = -1 : Return immediately if pending task count exceeds specified max
+   *
+   * @throws TooManyPendingTasksException Pending task count exceeds max pending task count
+   */
+  virtual void add(boost::shared_ptr<Runnable>task, int64_t timeout=0LL) = 0;
+
+  /**
+   * Removes a pending task
+   */
+  virtual void remove(boost::shared_ptr<Runnable> task) = 0;
+
+  static boost::shared_ptr<ThreadManager> newThreadManager();
+
+  /**
+   * Creates a simple thread manager the uses count number of worker threads and has
+   * a pendingTaskCountMax maximum pending tasks. The default, 0, specified no limit
+   * on pending tasks
+   */
+  static boost::shared_ptr<ThreadManager> newSimpleThreadManager(size_t count=4, size_t pendingTaskCountMax=0);
+
+  class Task;
+
+  class Worker;
+
+  class Impl;
+};
+
+}}} // apache::thrift::concurrency
+
+#endif // #ifndef _THRIFT_CONCURRENCY_THREADMANAGER_H_
diff --git a/lib/cpp/src/concurrency/TimerManager.cpp b/lib/cpp/src/concurrency/TimerManager.cpp
new file mode 100644
index 0000000..25515dc
--- /dev/null
+++ b/lib/cpp/src/concurrency/TimerManager.cpp
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "TimerManager.h"
+#include "Exception.h"
+#include "Util.h"
+
+#include <assert.h>
+#include <iostream>
+#include <set>
+
+namespace apache { namespace thrift { namespace concurrency {
+
+using boost::shared_ptr;
+
+typedef std::multimap<int64_t, shared_ptr<TimerManager::Task> >::iterator task_iterator;
+typedef std::pair<task_iterator, task_iterator> task_range;
+
+/**
+ * TimerManager class
+ *
+ * @version $Id:$
+ */
+class TimerManager::Task : public Runnable {
+
+ public:
+  enum STATE {
+    WAITING,
+    EXECUTING,
+    CANCELLED,
+    COMPLETE
+  };
+
+  Task(shared_ptr<Runnable> runnable) :
+    runnable_(runnable),
+    state_(WAITING) {}
+
+  ~Task() {
+  }
+
+  void run() {
+    if (state_ == EXECUTING) {
+      runnable_->run();
+      state_ = COMPLETE;
+    }
+  }
+
+ private:
+  shared_ptr<Runnable> runnable_;
+  class TimerManager::Dispatcher;
+  friend class TimerManager::Dispatcher;
+  STATE state_;
+};
+
+class TimerManager::Dispatcher: public Runnable {
+
+ public:
+  Dispatcher(TimerManager* manager) :
+    manager_(manager) {}
+
+  ~Dispatcher() {}
+
+  /**
+   * Dispatcher entry point
+   *
+   * As long as dispatcher thread is running, pull tasks off the task taskMap_
+   * and execute.
+   */
+  void run() {
+    {
+      Synchronized s(manager_->monitor_);
+      if (manager_->state_ == TimerManager::STARTING) {
+        manager_->state_ = TimerManager::STARTED;
+        manager_->monitor_.notifyAll();
+      }
+    }
+
+    do {
+      std::set<shared_ptr<TimerManager::Task> > expiredTasks;
+      {
+        Synchronized s(manager_->monitor_);
+        task_iterator expiredTaskEnd;
+        int64_t now = Util::currentTime();
+        while (manager_->state_ == TimerManager::STARTED &&
+               (expiredTaskEnd = manager_->taskMap_.upper_bound(now)) == manager_->taskMap_.begin()) {
+          int64_t timeout = 0LL;
+          if (!manager_->taskMap_.empty()) {
+            timeout = manager_->taskMap_.begin()->first - now;
+          }
+          assert((timeout != 0 && manager_->taskCount_ > 0) || (timeout == 0 && manager_->taskCount_ == 0));
+          try {
+            manager_->monitor_.wait(timeout);
+          } catch (TimedOutException &e) {}
+          now = Util::currentTime();
+        }
+
+        if (manager_->state_ == TimerManager::STARTED) {
+          for (task_iterator ix = manager_->taskMap_.begin(); ix != expiredTaskEnd; ix++) {
+            shared_ptr<TimerManager::Task> task = ix->second;
+            expiredTasks.insert(task);
+            if (task->state_ == TimerManager::Task::WAITING) {
+              task->state_ = TimerManager::Task::EXECUTING;
+            }
+            manager_->taskCount_--;
+          }
+          manager_->taskMap_.erase(manager_->taskMap_.begin(), expiredTaskEnd);
+        }
+      }
+
+      for (std::set<shared_ptr<Task> >::iterator ix =  expiredTasks.begin(); ix != expiredTasks.end(); ix++) {
+        (*ix)->run();
+      }
+
+    } while (manager_->state_ == TimerManager::STARTED);
+
+    {
+      Synchronized s(manager_->monitor_);
+      if (manager_->state_ == TimerManager::STOPPING) {
+        manager_->state_ = TimerManager::STOPPED;
+        manager_->monitor_.notify();
+      }
+    }
+    return;
+  }
+
+ private:
+  TimerManager* manager_;
+  friend class TimerManager;
+};
+
+TimerManager::TimerManager() :
+  taskCount_(0),
+  state_(TimerManager::UNINITIALIZED),
+  dispatcher_(shared_ptr<Dispatcher>(new Dispatcher(this))) {
+}
+
+
+TimerManager::~TimerManager() {
+
+  // If we haven't been explicitly stopped, do so now.  We don't need to grab
+  // the monitor here, since stop already takes care of reentrancy.
+
+  if (state_ != STOPPED) {
+    try {
+      stop();
+    } catch(...) {
+      throw;
+      // uhoh
+    }
+  }
+}
+
+void TimerManager::start() {
+  bool doStart = false;
+  {
+    Synchronized s(monitor_);
+    if (threadFactory_ == NULL) {
+      throw InvalidArgumentException();
+    }
+    if (state_ == TimerManager::UNINITIALIZED) {
+      state_ = TimerManager::STARTING;
+      doStart = true;
+    }
+  }
+
+  if (doStart) {
+    dispatcherThread_ = threadFactory_->newThread(dispatcher_);
+    dispatcherThread_->start();
+  }
+
+  {
+    Synchronized s(monitor_);
+    while (state_ == TimerManager::STARTING) {
+      monitor_.wait();
+    }
+    assert(state_ != TimerManager::STARTING);
+  }
+}
+
+void TimerManager::stop() {
+  bool doStop = false;
+  {
+    Synchronized s(monitor_);
+    if (state_ == TimerManager::UNINITIALIZED) {
+      state_ = TimerManager::STOPPED;
+    } else if (state_ != STOPPING &&  state_ != STOPPED) {
+      doStop = true;
+      state_ = STOPPING;
+      monitor_.notifyAll();
+    }
+    while (state_ != STOPPED) {
+      monitor_.wait();
+    }
+  }
+
+  if (doStop) {
+    // Clean up any outstanding tasks
+    for (task_iterator ix =  taskMap_.begin(); ix != taskMap_.end(); ix++) {
+      taskMap_.erase(ix);
+    }
+
+    // Remove dispatcher's reference to us.
+    dispatcher_->manager_ = NULL;
+  }
+}
+
+shared_ptr<const ThreadFactory> TimerManager::threadFactory() const {
+  Synchronized s(monitor_);
+  return threadFactory_;
+}
+
+void TimerManager::threadFactory(shared_ptr<const ThreadFactory>  value) {
+  Synchronized s(monitor_);
+  threadFactory_ = value;
+}
+
+size_t TimerManager::taskCount() const {
+  return taskCount_;
+}
+
+void TimerManager::add(shared_ptr<Runnable> task, int64_t timeout) {
+  int64_t now = Util::currentTime();
+  timeout += now;
+
+  {
+    Synchronized s(monitor_);
+    if (state_ != TimerManager::STARTED) {
+      throw IllegalStateException();
+    }
+
+    taskCount_++;
+    taskMap_.insert(std::pair<int64_t, shared_ptr<Task> >(timeout, shared_ptr<Task>(new Task(task))));
+
+    // If the task map was empty, or if we have an expiration that is earlier
+    // than any previously seen, kick the dispatcher so it can update its
+    // timeout
+    if (taskCount_ == 1 || timeout < taskMap_.begin()->first) {
+      monitor_.notify();
+    }
+  }
+}
+
+void TimerManager::add(shared_ptr<Runnable> task, const struct timespec& value) {
+
+  int64_t expiration;
+  Util::toMilliseconds(expiration, value);
+
+  int64_t now = Util::currentTime();
+
+  if (expiration < now) {
+    throw  InvalidArgumentException();
+  }
+
+  add(task, expiration - now);
+}
+
+
+void TimerManager::remove(shared_ptr<Runnable> task) {
+  Synchronized s(monitor_);
+  if (state_ != TimerManager::STARTED) {
+    throw IllegalStateException();
+  }
+}
+
+const TimerManager::STATE TimerManager::state() const { return state_; }
+
+}}} // apache::thrift::concurrency
+
diff --git a/lib/cpp/src/concurrency/TimerManager.h b/lib/cpp/src/concurrency/TimerManager.h
new file mode 100644
index 0000000..f3f799f
--- /dev/null
+++ b/lib/cpp/src/concurrency/TimerManager.h
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_CONCURRENCY_TIMERMANAGER_H_
+#define _THRIFT_CONCURRENCY_TIMERMANAGER_H_ 1
+
+#include "Exception.h"
+#include "Monitor.h"
+#include "Thread.h"
+
+#include <boost/shared_ptr.hpp>
+#include <map>
+#include <time.h>
+
+namespace apache { namespace thrift { namespace concurrency {
+
+/**
+ * Timer Manager
+ *
+ * This class dispatches timer tasks when they fall due.
+ *
+ * @version $Id:$
+ */
+class TimerManager {
+
+ public:
+
+  TimerManager();
+
+  virtual ~TimerManager();
+
+  virtual boost::shared_ptr<const ThreadFactory> threadFactory() const;
+
+  virtual void threadFactory(boost::shared_ptr<const ThreadFactory> value);
+
+  /**
+   * Starts the timer manager service
+   *
+   * @throws IllegalArgumentException Missing thread factory attribute
+   */
+  virtual void start();
+
+  /**
+   * Stops the timer manager service
+   */
+  virtual void stop();
+
+  virtual size_t taskCount() const ;
+
+  /**
+   * Adds a task to be executed at some time in the future by a worker thread.
+   *
+   * @param task The task to execute
+   * @param timeout Time in milliseconds to delay before executing task
+   */
+  virtual void add(boost::shared_ptr<Runnable> task, int64_t timeout);
+
+  /**
+   * Adds a task to be executed at some time in the future by a worker thread.
+   *
+   * @param task The task to execute
+   * @param timeout Absolute time in the future to execute task.
+   */
+  virtual void add(boost::shared_ptr<Runnable> task, const struct timespec& timeout);
+
+  /**
+   * Removes a pending task
+   *
+   * @throws NoSuchTaskException Specified task doesn't exist. It was either
+   *                             processed already or this call was made for a
+   *                             task that was never added to this timer
+   *
+   * @throws UncancellableTaskException Specified task is already being
+   *                                    executed or has completed execution.
+   */
+  virtual void remove(boost::shared_ptr<Runnable> task);
+
+  enum STATE {
+    UNINITIALIZED,
+    STARTING,
+    STARTED,
+    STOPPING,
+    STOPPED
+  };
+
+  virtual const STATE state() const;
+
+ private:
+  boost::shared_ptr<const ThreadFactory> threadFactory_;
+  class Task;
+  friend class Task;
+  std::multimap<int64_t, boost::shared_ptr<Task> > taskMap_;
+  size_t taskCount_;
+  Monitor monitor_;
+  STATE state_;
+  class Dispatcher;
+  friend class Dispatcher;
+  boost::shared_ptr<Dispatcher> dispatcher_;
+  boost::shared_ptr<Thread> dispatcherThread_;
+};
+
+}}} // apache::thrift::concurrency
+
+#endif // #ifndef _THRIFT_CONCURRENCY_TIMERMANAGER_H_
diff --git a/lib/cpp/src/concurrency/Util.cpp b/lib/cpp/src/concurrency/Util.cpp
new file mode 100644
index 0000000..1c44937
--- /dev/null
+++ b/lib/cpp/src/concurrency/Util.cpp
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "Util.h"
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#if defined(HAVE_CLOCK_GETTIME)
+#include <time.h>
+#elif defined(HAVE_GETTIMEOFDAY)
+#include <sys/time.h>
+#endif // defined(HAVE_CLOCK_GETTIME)
+
+namespace apache { namespace thrift { namespace concurrency {
+
+const int64_t Util::currentTime() {
+  int64_t result;
+
+#if defined(HAVE_CLOCK_GETTIME)
+  struct timespec now;
+  int ret = clock_gettime(CLOCK_REALTIME, &now);
+  assert(ret == 0);
+  toMilliseconds(result, now);
+#elif defined(HAVE_GETTIMEOFDAY)
+  struct timeval now;
+  int ret = gettimeofday(&now, NULL);
+  assert(ret == 0);
+  toMilliseconds(result, now);
+#else
+#error "No high-precision clock is available."
+#endif // defined(HAVE_CLOCK_GETTIME)
+
+  return result;
+}
+
+
+}}} // apache::thrift::concurrency
diff --git a/lib/cpp/src/concurrency/Util.h b/lib/cpp/src/concurrency/Util.h
new file mode 100644
index 0000000..25fcc20
--- /dev/null
+++ b/lib/cpp/src/concurrency/Util.h
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _THRIFT_CONCURRENCY_UTIL_H_
+#define _THRIFT_CONCURRENCY_UTIL_H_ 1
+
+#include <assert.h>
+#include <stddef.h>
+#include <stdint.h>
+#include <time.h>
+#include <sys/time.h>
+
+namespace apache { namespace thrift { namespace concurrency {
+
+/**
+ * Utility methods
+ *
+ * This class contains basic utility methods for converting time formats,
+ * and other common platform-dependent concurrency operations.
+ * It should not be included in API headers for other concurrency library
+ * headers, since it will, by definition, pull in all sorts of horrid
+ * platform dependent crap.  Rather it should be inluded directly in
+ * concurrency library implementation source.
+ *
+ * @version $Id:$
+ */
+class Util {
+
+  static const int64_t NS_PER_S = 1000000000LL;
+  static const int64_t US_PER_S = 1000000LL;
+  static const int64_t MS_PER_S = 1000LL;
+
+  static const int64_t NS_PER_MS = NS_PER_S / MS_PER_S;
+  static const int64_t US_PER_MS = US_PER_S / MS_PER_S;
+
+ public:
+
+  /**
+   * Converts millisecond timestamp into a timespec struct
+   *
+   * @param struct timespec& result
+   * @param time or duration in milliseconds
+   */
+  static void toTimespec(struct timespec& result, int64_t value) {
+    result.tv_sec = value / MS_PER_S; // ms to s
+    result.tv_nsec = (value % MS_PER_S) * NS_PER_MS; // ms to ns
+  }
+
+  static void toTimeval(struct timeval& result, int64_t value) {
+    result.tv_sec = value / MS_PER_S; // ms to s
+    result.tv_usec = (value % MS_PER_S) * US_PER_MS; // ms to us
+  }
+
+  /**
+   * Converts struct timespec to milliseconds
+   */
+  static const void toMilliseconds(int64_t& result, const struct timespec& value) {
+    result = (value.tv_sec * MS_PER_S) + (value.tv_nsec / NS_PER_MS);
+    // round up -- int64_t cast is to avoid a compiler error for some GCCs
+    if (int64_t(value.tv_nsec) % NS_PER_MS >= (NS_PER_MS / 2)) {
+      ++result;
+    }
+  }
+
+  /**
+   * Converts struct timeval to milliseconds
+   */
+  static const void toMilliseconds(int64_t& result, const struct timeval& value) {
+    result = (value.tv_sec * MS_PER_S) + (value.tv_usec / US_PER_MS);
+    // round up -- int64_t cast is to avoid a compiler error for some GCCs
+    if (int64_t(value.tv_usec) % US_PER_MS >= (US_PER_MS / 2)) {
+      ++result;
+    }
+  }
+
+  /**
+   * Get current time as milliseconds from epoch
+   */
+  static const int64_t currentTime();
+};
+
+}}} // apache::thrift::concurrency
+
+#endif // #ifndef _THRIFT_CONCURRENCY_UTIL_H_
diff --git a/lib/cpp/src/concurrency/test/Tests.cpp b/lib/cpp/src/concurrency/test/Tests.cpp
new file mode 100644
index 0000000..c80bb88
--- /dev/null
+++ b/lib/cpp/src/concurrency/test/Tests.cpp
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <iostream>
+#include <vector>
+#include <string>
+
+#include "ThreadFactoryTests.h"
+#include "TimerManagerTests.h"
+#include "ThreadManagerTests.h"
+
+int main(int argc, char** argv) {
+
+  std::string arg;
+
+  std::vector<std::string>  args(argc - 1 > 1 ? argc - 1 : 1);
+
+  args[0] = "all";
+
+  for (int ix = 1; ix < argc; ix++) {
+    args[ix - 1] = std::string(argv[ix]);
+  }
+
+  bool runAll = args[0].compare("all") == 0;
+
+  if (runAll || args[0].compare("thread-factory") == 0) {
+
+    ThreadFactoryTests threadFactoryTests;
+
+    std::cout << "ThreadFactory tests..." << std::endl;
+
+    size_t count =  1000;
+    size_t floodLoops =  1;
+    size_t floodCount =  100000;
+
+    std::cout << "\t\tThreadFactory reap N threads test: N = " << count << std::endl;
+
+    assert(threadFactoryTests.reapNThreads(count));
+
+    std::cout << "\t\tThreadFactory floodN threads test: N = " << floodCount << std::endl;
+
+    assert(threadFactoryTests.floodNTest(floodLoops, floodCount));
+
+    std::cout << "\t\tThreadFactory synchronous start test" << std::endl;
+
+    assert(threadFactoryTests.synchStartTest());
+
+    std::cout << "\t\tThreadFactory monitor timeout test" << std::endl;
+
+    assert(threadFactoryTests.monitorTimeoutTest());
+  }
+
+  if (runAll || args[0].compare("util") == 0) {
+
+    std::cout << "Util tests..." << std::endl;
+
+    std::cout << "\t\tUtil minimum time" << std::endl;
+
+    int64_t time00 = Util::currentTime();
+    int64_t time01 = Util::currentTime();
+
+    std::cout << "\t\t\tMinimum time: " << time01 - time00 << "ms" << std::endl;
+
+    time00 = Util::currentTime();
+    time01 = time00;
+    size_t count = 0;
+
+    while (time01 < time00 + 10) {
+      count++;
+      time01 = Util::currentTime();
+    }
+
+    std::cout << "\t\t\tscall per ms: " << count / (time01 - time00) << std::endl;
+  }
+
+
+  if (runAll || args[0].compare("timer-manager") == 0) {
+
+    std::cout << "TimerManager tests..." << std::endl;
+
+    std::cout << "\t\tTimerManager test00" << std::endl;
+
+    TimerManagerTests timerManagerTests;
+
+    assert(timerManagerTests.test00());
+  }
+
+  if (runAll || args[0].compare("thread-manager") == 0) {
+
+    std::cout << "ThreadManager tests..." << std::endl;
+
+    {
+
+      size_t workerCount = 100;
+
+      size_t taskCount = 100000;
+
+      int64_t delay = 10LL;
+
+      std::cout << "\t\tThreadManager load test: worker count: " << workerCount << " task count: " << taskCount << " delay: " << delay << std::endl;
+
+      ThreadManagerTests threadManagerTests;
+
+      assert(threadManagerTests.loadTest(taskCount, delay, workerCount));
+
+      std::cout << "\t\tThreadManager block test: worker count: " << workerCount << " delay: " << delay << std::endl;
+
+      assert(threadManagerTests.blockTest(delay, workerCount));
+
+    }
+  }
+
+  if (runAll || args[0].compare("thread-manager-benchmark") == 0) {
+
+    std::cout << "ThreadManager benchmark tests..." << std::endl;
+
+    {
+
+      size_t minWorkerCount = 2;
+
+      size_t maxWorkerCount = 512;
+
+      size_t tasksPerWorker = 1000;
+
+      int64_t delay = 10LL;
+
+      for (size_t workerCount = minWorkerCount; workerCount < maxWorkerCount; workerCount*= 2) {
+
+        size_t taskCount = workerCount * tasksPerWorker;
+
+        std::cout << "\t\tThreadManager load test: worker count: " << workerCount << " task count: " << taskCount << " delay: " << delay << std::endl;
+
+        ThreadManagerTests threadManagerTests;
+
+        threadManagerTests.loadTest(taskCount, delay, workerCount);
+      }
+    }
+  }
+}
diff --git a/lib/cpp/src/concurrency/test/ThreadFactoryTests.h b/lib/cpp/src/concurrency/test/ThreadFactoryTests.h
new file mode 100644
index 0000000..859fbaf
--- /dev/null
+++ b/lib/cpp/src/concurrency/test/ThreadFactoryTests.h
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <config.h>
+#include <concurrency/Thread.h>
+#include <concurrency/PosixThreadFactory.h>
+#include <concurrency/Monitor.h>
+#include <concurrency/Util.h>
+
+#include <assert.h>
+#include <iostream>
+#include <set>
+
+namespace apache { namespace thrift { namespace concurrency { namespace test {
+
+using boost::shared_ptr;
+using namespace apache::thrift::concurrency;
+
+/**
+ * ThreadManagerTests class
+ *
+ * @version $Id:$
+ */
+class ThreadFactoryTests {
+
+public:
+
+  static const double ERROR;
+
+  class Task: public Runnable {
+
+  public:
+
+    Task() {}
+
+    void run() {
+      std::cout << "\t\t\tHello World" << std::endl;
+    }
+  };
+
+  /**
+   * Hello world test
+   */
+  bool helloWorldTest() {
+
+    PosixThreadFactory threadFactory = PosixThreadFactory();
+
+    shared_ptr<Task> task = shared_ptr<Task>(new ThreadFactoryTests::Task());
+
+    shared_ptr<Thread> thread = threadFactory.newThread(task);
+
+    thread->start();
+
+    thread->join();
+
+    std::cout << "\t\t\tSuccess!" << std::endl;
+
+    return true;
+  }
+
+  /**
+   * Reap N threads
+   */
+  class ReapNTask: public Runnable {
+
+   public:
+
+    ReapNTask(Monitor& monitor, int& activeCount) :
+      _monitor(monitor),
+      _count(activeCount) {}
+
+    void run() {
+      Synchronized s(_monitor);
+
+      _count--;
+
+      //std::cout << "\t\t\tthread count: " << _count << std::endl;
+
+      if (_count == 0) {
+        _monitor.notify();
+      }
+    }
+
+    Monitor& _monitor;
+
+    int& _count;
+  };
+
+  bool reapNThreads(int loop=1, int count=10) {
+
+    PosixThreadFactory threadFactory =  PosixThreadFactory();
+
+    Monitor* monitor = new Monitor();
+
+    for(int lix = 0; lix < loop; lix++) {
+
+      int* activeCount  = new int(count);
+
+      std::set<shared_ptr<Thread> > threads;
+
+      int tix;
+
+      for (tix = 0; tix < count; tix++) {
+        try {
+          threads.insert(threadFactory.newThread(shared_ptr<Runnable>(new ReapNTask(*monitor, *activeCount))));
+        } catch(SystemResourceException& e) {
+          std::cout << "\t\t\tfailed to create " << lix * count + tix << " thread " << e.what() << std::endl;
+          throw e;
+        }
+      }
+
+      tix = 0;
+      for (std::set<shared_ptr<Thread> >::const_iterator thread = threads.begin(); thread != threads.end(); tix++, ++thread) {
+
+        try {
+          (*thread)->start();
+        } catch(SystemResourceException& e) {
+          std::cout << "\t\t\tfailed to start  " << lix * count + tix << " thread " << e.what() << std::endl;
+          throw e;
+        }
+      }
+
+      {
+        Synchronized s(*monitor);
+        while (*activeCount > 0) {
+          monitor->wait(1000);
+        }
+      }
+
+      for (std::set<shared_ptr<Thread> >::const_iterator thread = threads.begin(); thread != threads.end(); thread++) {
+        threads.erase(*thread);
+      }
+
+      std::cout << "\t\t\treaped " << lix * count << " threads" << std::endl;
+    }
+
+    std::cout << "\t\t\tSuccess!" << std::endl;
+
+    return true;
+  }
+
+  class SynchStartTask: public Runnable {
+
+   public:
+
+    enum STATE {
+      UNINITIALIZED,
+      STARTING,
+      STARTED,
+      STOPPING,
+      STOPPED
+    };
+
+    SynchStartTask(Monitor& monitor, volatile  STATE& state) :
+      _monitor(monitor),
+      _state(state) {}
+
+    void run() {
+      {
+        Synchronized s(_monitor);
+        if (_state == SynchStartTask::STARTING) {
+          _state = SynchStartTask::STARTED;
+          _monitor.notify();
+        }
+      }
+
+      {
+        Synchronized s(_monitor);
+        while (_state == SynchStartTask::STARTED) {
+          _monitor.wait();
+        }
+
+        if (_state == SynchStartTask::STOPPING) {
+          _state = SynchStartTask::STOPPED;
+          _monitor.notifyAll();
+        }
+      }
+    }
+
+   private:
+    Monitor& _monitor;
+    volatile  STATE& _state;
+  };
+
+  bool synchStartTest() {
+
+    Monitor monitor;
+
+    SynchStartTask::STATE state = SynchStartTask::UNINITIALIZED;
+
+    shared_ptr<SynchStartTask> task = shared_ptr<SynchStartTask>(new SynchStartTask(monitor, state));
+
+    PosixThreadFactory threadFactory =  PosixThreadFactory();
+
+    shared_ptr<Thread> thread = threadFactory.newThread(task);
+
+    if (state == SynchStartTask::UNINITIALIZED) {
+
+      state = SynchStartTask::STARTING;
+
+      thread->start();
+    }
+
+    {
+      Synchronized s(monitor);
+      while (state == SynchStartTask::STARTING) {
+        monitor.wait();
+      }
+    }
+
+    assert(state != SynchStartTask::STARTING);
+
+    {
+      Synchronized s(monitor);
+
+      try {
+          monitor.wait(100);
+      } catch(TimedOutException& e) {
+      }
+
+      if (state == SynchStartTask::STARTED) {
+
+        state = SynchStartTask::STOPPING;
+
+        monitor.notify();
+      }
+
+      while (state == SynchStartTask::STOPPING) {
+        monitor.wait();
+      }
+    }
+
+    assert(state == SynchStartTask::STOPPED);
+
+    bool success = true;
+
+    std::cout << "\t\t\t" << (success ? "Success" : "Failure") << "!" << std::endl;
+
+    return true;
+  }
+
+  /** See how accurate monitor timeout is. */
+
+  bool monitorTimeoutTest(size_t count=1000, int64_t timeout=10) {
+
+    Monitor monitor;
+
+    int64_t startTime = Util::currentTime();
+
+    for (size_t ix = 0; ix < count; ix++) {
+      {
+        Synchronized s(monitor);
+        try {
+            monitor.wait(timeout);
+        } catch(TimedOutException& e) {
+        }
+      }
+    }
+
+    int64_t endTime = Util::currentTime();
+
+    double error = ((endTime - startTime) - (count * timeout)) / (double)(count * timeout);
+
+    if (error < 0.0)  {
+
+      error *= 1.0;
+    }
+
+    bool success = error < ThreadFactoryTests::ERROR;
+
+    std::cout << "\t\t\t" << (success ? "Success" : "Failure") << "! expected time: " << count * timeout << "ms elapsed time: "<< endTime - startTime << "ms error%: " << error * 100.0 << std::endl;
+
+    return success;
+  }
+
+
+  class FloodTask : public Runnable {
+  public:
+
+    FloodTask(const size_t id) :_id(id) {}
+    ~FloodTask(){
+      if(_id % 1000 == 0) {
+        std::cout << "\t\tthread " << _id << " done" << std::endl;
+      }
+    }
+
+    void run(){
+      if(_id % 1000 == 0) {
+        std::cout << "\t\tthread " << _id << " started" << std::endl;
+      }
+
+      usleep(1);
+    }
+    const size_t _id;
+  };
+
+  void foo(PosixThreadFactory *tf) {
+  }
+
+  bool floodNTest(size_t loop=1, size_t count=100000) {
+
+    bool success = false;
+
+    for(size_t lix = 0; lix < loop; lix++) {
+
+      PosixThreadFactory threadFactory = PosixThreadFactory();
+      threadFactory.setDetached(true);
+
+        for(size_t tix = 0; tix < count; tix++) {
+
+          try {
+
+            shared_ptr<FloodTask> task(new FloodTask(lix * count + tix ));
+
+            shared_ptr<Thread> thread = threadFactory.newThread(task);
+
+            thread->start();
+
+            usleep(1);
+
+          } catch (TException& e) {
+
+            std::cout << "\t\t\tfailed to start  " << lix * count + tix << " thread " << e.what() << std::endl;
+
+            return success;
+          }
+        }
+
+        std::cout << "\t\t\tflooded " << (lix + 1) * count << " threads" << std::endl;
+
+        success = true;
+    }
+
+    return success;
+  }
+};
+
+const double ThreadFactoryTests::ERROR = .20;
+
+}}}} // apache::thrift::concurrency::test
+
diff --git a/lib/cpp/src/concurrency/test/ThreadManagerTests.h b/lib/cpp/src/concurrency/test/ThreadManagerTests.h
new file mode 100644
index 0000000..e7b5174
--- /dev/null
+++ b/lib/cpp/src/concurrency/test/ThreadManagerTests.h
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <config.h>
+#include <concurrency/ThreadManager.h>
+#include <concurrency/PosixThreadFactory.h>
+#include <concurrency/Monitor.h>
+#include <concurrency/Util.h>
+
+#include <assert.h>
+#include <set>
+#include <iostream>
+#include <set>
+#include <stdint.h>
+
+namespace apache { namespace thrift { namespace concurrency { namespace test {
+
+using namespace apache::thrift::concurrency;
+
+/**
+ * ThreadManagerTests class
+ *
+ * @version $Id:$
+ */
+class ThreadManagerTests {
+
+public:
+
+  static const double ERROR;
+
+  class Task: public Runnable {
+
+  public:
+
+    Task(Monitor& monitor, size_t& count, int64_t timeout) :
+      _monitor(monitor),
+      _count(count),
+      _timeout(timeout),
+      _done(false) {}
+
+    void run() {
+
+      _startTime = Util::currentTime();
+
+      {
+        Synchronized s(_sleep);
+
+        try {
+          _sleep.wait(_timeout);
+        } catch(TimedOutException& e) {
+          ;
+        }catch(...) {
+          assert(0);
+        }
+      }
+
+      _endTime = Util::currentTime();
+
+      _done = true;
+
+      {
+        Synchronized s(_monitor);
+
+        // std::cout << "Thread " << _count << " completed " << std::endl;
+
+        _count--;
+
+        if (_count == 0) {
+
+          _monitor.notify();
+        }
+      }
+    }
+
+    Monitor& _monitor;
+    size_t& _count;
+    int64_t _timeout;
+    int64_t _startTime;
+    int64_t _endTime;
+    bool _done;
+    Monitor _sleep;
+  };
+
+  /**
+   * Dispatch count tasks, each of which blocks for timeout milliseconds then
+   * completes. Verify that all tasks completed and that thread manager cleans
+   * up properly on delete.
+   */
+  bool loadTest(size_t count=100, int64_t timeout=100LL, size_t workerCount=4) {
+
+    Monitor monitor;
+
+    size_t activeCount = count;
+
+    shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount);
+
+    shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory>(new PosixThreadFactory());
+
+    threadFactory->setPriority(PosixThreadFactory::HIGHEST);
+
+    threadManager->threadFactory(threadFactory);
+
+    threadManager->start();
+
+    std::set<shared_ptr<ThreadManagerTests::Task> > tasks;
+
+    for (size_t ix = 0; ix < count; ix++) {
+
+      tasks.insert(shared_ptr<ThreadManagerTests::Task>(new ThreadManagerTests::Task(monitor, activeCount, timeout)));
+    }
+
+    int64_t time00 = Util::currentTime();
+
+    for (std::set<shared_ptr<ThreadManagerTests::Task> >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) {
+
+        threadManager->add(*ix);
+    }
+
+    {
+      Synchronized s(monitor);
+
+      while(activeCount > 0) {
+
+        monitor.wait();
+      }
+    }
+
+    int64_t time01 = Util::currentTime();
+
+    int64_t firstTime = 9223372036854775807LL;
+    int64_t lastTime = 0;
+
+    double averageTime = 0;
+    int64_t minTime = 9223372036854775807LL;
+    int64_t maxTime = 0;
+
+    for (std::set<shared_ptr<ThreadManagerTests::Task> >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) {
+
+      shared_ptr<ThreadManagerTests::Task> task = *ix;
+
+      int64_t delta = task->_endTime - task->_startTime;
+
+      assert(delta > 0);
+
+      if (task->_startTime < firstTime) {
+        firstTime = task->_startTime;
+      }
+
+      if (task->_endTime > lastTime) {
+        lastTime = task->_endTime;
+      }
+
+      if (delta < minTime) {
+        minTime = delta;
+      }
+
+      if (delta > maxTime) {
+        maxTime = delta;
+      }
+
+      averageTime+= delta;
+    }
+
+    averageTime /= count;
+
+    std::cout << "\t\t\tfirst start: " << firstTime << "ms Last end: " << lastTime << "ms min: " << minTime << "ms max: " << maxTime << "ms average: " << averageTime << "ms" << std::endl;
+
+    double expectedTime = ((count + (workerCount - 1)) / workerCount) * timeout;
+
+    double error = ((time01 - time00) - expectedTime) / expectedTime;
+
+    if (error < 0) {
+      error*= -1.0;
+    }
+
+    bool success = error < ERROR;
+
+    std::cout << "\t\t\t" << (success ? "Success" : "Failure") << "! expected time: " << expectedTime << "ms elapsed time: "<< time01 - time00 << "ms error%: " << error * 100.0 << std::endl;
+
+    return success;
+  }
+
+  class BlockTask: public Runnable {
+
+  public:
+
+    BlockTask(Monitor& monitor, Monitor& bmonitor, size_t& count) :
+      _monitor(monitor),
+      _bmonitor(bmonitor),
+      _count(count) {}
+
+    void run() {
+      {
+        Synchronized s(_bmonitor);
+
+        _bmonitor.wait();
+
+      }
+
+      {
+        Synchronized s(_monitor);
+
+        _count--;
+
+        if (_count == 0) {
+
+          _monitor.notify();
+        }
+      }
+    }
+
+    Monitor& _monitor;
+    Monitor& _bmonitor;
+    size_t& _count;
+  };
+
+  /**
+   * Block test.  Create pendingTaskCountMax tasks.  Verify that we block adding the
+   * pendingTaskCountMax + 1th task.  Verify that we unblock when a task completes */
+
+  bool blockTest(int64_t timeout=100LL, size_t workerCount=2) {
+
+    bool success = false;
+
+    try {
+
+      Monitor bmonitor;
+      Monitor monitor;
+
+      size_t pendingTaskMaxCount = workerCount;
+
+      size_t activeCounts[] = {workerCount, pendingTaskMaxCount, 1};
+
+      shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount, pendingTaskMaxCount);
+
+      shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory>(new PosixThreadFactory());
+
+      threadFactory->setPriority(PosixThreadFactory::HIGHEST);
+
+      threadManager->threadFactory(threadFactory);
+
+      threadManager->start();
+
+      std::set<shared_ptr<ThreadManagerTests::BlockTask> > tasks;
+
+      for (size_t ix = 0; ix < workerCount; ix++) {
+
+        tasks.insert(shared_ptr<ThreadManagerTests::BlockTask>(new ThreadManagerTests::BlockTask(monitor, bmonitor,activeCounts[0])));
+      }
+
+      for (size_t ix = 0; ix < pendingTaskMaxCount; ix++) {
+
+        tasks.insert(shared_ptr<ThreadManagerTests::BlockTask>(new ThreadManagerTests::BlockTask(monitor, bmonitor,activeCounts[1])));
+      }
+
+      for (std::set<shared_ptr<ThreadManagerTests::BlockTask> >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) {
+        threadManager->add(*ix);
+      }
+
+      if(!(success = (threadManager->totalTaskCount() == pendingTaskMaxCount + workerCount))) {
+        throw TException("Unexpected pending task count");
+      }
+
+      shared_ptr<ThreadManagerTests::BlockTask> extraTask(new ThreadManagerTests::BlockTask(monitor, bmonitor, activeCounts[2]));
+
+      try {
+        threadManager->add(extraTask, 1);
+        throw TException("Unexpected success adding task in excess of pending task count");
+      } catch(TimedOutException& e) {
+      }
+
+      std::cout << "\t\t\t" << "Pending tasks " << threadManager->pendingTaskCount()  << std::endl;
+
+      {
+        Synchronized s(bmonitor);
+
+        bmonitor.notifyAll();
+      }
+
+      {
+        Synchronized s(monitor);
+
+        while(activeCounts[0] != 0) {
+          monitor.wait();
+        }
+      }
+
+      std::cout << "\t\t\t" << "Pending tasks " << threadManager->pendingTaskCount() << std::endl;
+
+      try {
+        threadManager->add(extraTask, 1);
+      } catch(TimedOutException& e) {
+        std::cout << "\t\t\t" << "add timed out unexpectedly"  << std::endl;
+        throw TException("Unexpected timeout adding task");
+
+      } catch(TooManyPendingTasksException& e) {
+        std::cout << "\t\t\t" << "add encountered too many pending exepctions" << std::endl;
+        throw TException("Unexpected timeout adding task");
+      }
+
+      // Wake up tasks that were pending before and wait for them to complete
+
+      {
+        Synchronized s(bmonitor);
+
+        bmonitor.notifyAll();
+      }
+
+      {
+        Synchronized s(monitor);
+
+        while(activeCounts[1] != 0) {
+          monitor.wait();
+        }
+      }
+
+      // Wake up the extra task and wait for it to complete
+
+      {
+        Synchronized s(bmonitor);
+
+        bmonitor.notifyAll();
+      }
+
+      {
+        Synchronized s(monitor);
+
+        while(activeCounts[2] != 0) {
+          monitor.wait();
+        }
+      }
+
+      if(!(success = (threadManager->totalTaskCount() == 0))) {
+        throw TException("Unexpected pending task count");
+      }
+
+    } catch(TException& e) {
+    }
+
+    std::cout << "\t\t\t" << (success ? "Success" : "Failure") << std::endl;
+    return success;
+ }
+};
+
+const double ThreadManagerTests::ERROR = .20;
+
+}}}} // apache::thrift::concurrency
+
+using namespace apache::thrift::concurrency::test;
+
diff --git a/lib/cpp/src/concurrency/test/TimerManagerTests.h b/lib/cpp/src/concurrency/test/TimerManagerTests.h
new file mode 100644
index 0000000..e6fe6ce
--- /dev/null
+++ b/lib/cpp/src/concurrency/test/TimerManagerTests.h
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <concurrency/TimerManager.h>
+#include <concurrency/PosixThreadFactory.h>
+#include <concurrency/Monitor.h>
+#include <concurrency/Util.h>
+
+#include <assert.h>
+#include <iostream>
+
+namespace apache { namespace thrift { namespace concurrency { namespace test {
+
+using namespace apache::thrift::concurrency;
+
+/**
+ * ThreadManagerTests class
+ *
+ * @version $Id:$
+ */
+class TimerManagerTests {
+
+ public:
+
+  static const double ERROR;
+
+  class Task: public Runnable {
+   public:
+
+    Task(Monitor& monitor, int64_t timeout) :
+      _timeout(timeout),
+      _startTime(Util::currentTime()),
+      _monitor(monitor),
+      _success(false),
+      _done(false) {}
+
+    ~Task() { std::cerr << this << std::endl; }
+
+    void run() {
+
+      _endTime = Util::currentTime();
+
+      // Figure out error percentage
+
+      int64_t delta = _endTime - _startTime;
+
+
+      delta = delta > _timeout ?  delta - _timeout : _timeout - delta;
+
+      float error = delta / _timeout;
+
+      if(error < ERROR) {
+        _success = true;
+      }
+
+      _done = true;
+
+      std::cout << "\t\t\tTimerManagerTests::Task[" << this << "] done" << std::endl; //debug
+
+      {Synchronized s(_monitor);
+        _monitor.notifyAll();
+      }
+    }
+
+    int64_t _timeout;
+    int64_t _startTime;
+    int64_t _endTime;
+    Monitor& _monitor;
+    bool _success;
+    bool _done;
+  };
+
+  /**
+   * This test creates two tasks and waits for the first to expire within 10%
+   * of the expected expiration time. It then verifies that the timer manager
+   * properly clean up itself and the remaining orphaned timeout task when the
+   * manager goes out of scope and its destructor is called.
+   */
+  bool test00(int64_t timeout=1000LL) {
+
+    shared_ptr<TimerManagerTests::Task> orphanTask = shared_ptr<TimerManagerTests::Task>(new TimerManagerTests::Task(_monitor, 10 * timeout));
+
+    {
+
+      TimerManager timerManager;
+
+      timerManager.threadFactory(shared_ptr<PosixThreadFactory>(new PosixThreadFactory()));
+
+      timerManager.start();
+
+      assert(timerManager.state() == TimerManager::STARTED);
+
+      shared_ptr<TimerManagerTests::Task> task = shared_ptr<TimerManagerTests::Task>(new TimerManagerTests::Task(_monitor, timeout));
+
+      {
+        Synchronized s(_monitor);
+
+        timerManager.add(orphanTask, 10 * timeout);
+
+        timerManager.add(task, timeout);
+
+        _monitor.wait();
+      }
+
+      assert(task->_done);
+
+
+      std::cout << "\t\t\t" << (task->_success ? "Success" : "Failure") << "!" << std::endl;
+    }
+
+    // timerManager.stop(); This is where it happens via destructor
+
+    assert(!orphanTask->_done);
+
+    return true;
+  }
+
+  friend class TestTask;
+
+  Monitor _monitor;
+};
+
+const double TimerManagerTests::ERROR = .20;
+
+}}}} // apache::thrift::concurrency
+