Another checkpoint of initial cut at thread pool manager for thrift and related concurrency classes.
Added TimerManager - I can't live without one after all.
Added Util - handy place for common time operations et al.
Initial test code
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664722 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/Makefile b/lib/cpp/Makefile
index 8403d7c..fde4411 100644
--- a/lib/cpp/Makefile
+++ b/lib/cpp/Makefile
@@ -8,11 +8,19 @@
# Author:
# Mark Slee <mcslee@facebook.com>
-target: libthrift
+target: libthrift.so libconcurrency.so
# Tools
LD = g++
-LDFL = -shared -Wall -Isrc -fPIC -Wl,-soname=libthrift.so
+CPP = g++
+
+CC_COMMON_FLAGS = -g -c -Wall -Isrc -fPIC -fno-common
+
+LD_COMMON_FLAGS=
+
+LD_APP_FLAGS= $(LD_COMMON_FLAGS)
+
+LD_LIB_FLAGS= -dynamiclib $(LD_COMMON_FLAGS)
# Source files
SRCS = src/protocol/TBinaryProtocol.cc \
@@ -22,13 +30,61 @@
src/transport/TServerSocket.cc \
src/server/TSimpleServer.cc
-# Linked library
-libthrift:
- $(LD) -o libthrift.so $(LDFL) $(SRCS)
+# Concurreny Utility Source files
+CONCURRENCY_SRCS = src/concurrency/Monitor.cc \
+ src/concurrency/Mutex.cc \
+ src/concurrency/PosixThreadFactory.cc \
+ src/concurrency/ThreadManager.cc \
+ src/concurrency/TimerManager.cc
+
+CONCURRENCY_OBJS = $(patsubst %.cc,%.o,$(CONCURRENCY_SRCS))
+
+$(CONCURRENCY_OBJS): %.o : %.cc
+ $(CC) $(CC_COMMON_FLAGS) $< -o $@
+
+CONCURRENCY_TEST_SRCS = src/concurrency/test/TimerManagerTests.cc
+
+CONCURRENCY_TEST_OBJS = $(patsubst %.cc,%.o,$(CONCURRENCY_TEST_SRCS))
+
+$(CONCURRENCY_TEST_OBJS): %.o : %.cc
+ $(CC) $(CC_COMMON_FLAGS) -I src/concurrency $< -o $@
+
+# Linked libraries
+
+# thrift library
+
+THRIFT_OBJS = $(patsubst %.cc,%.o, $(SRCS))
+
+$(THRIFT_OBJS): %.o : %.cc
+ $(CC) $(CC_COMMON_FLAGS) $< -o $@
+
+libthrift.so: $(THRIFT_OBJS)
+ $(LD) -o $@ $(LD_LIB_FLAGS) $(THRIFT_OBJS)
+
+# concurrency util library
+
+libconcurrency.so: $(CONCURRENCY_OBJS)
+ $(LD) -o $@ $(LD_LIB_FLAGS) $(CONCURRENCY_OBJS)
+
+concurrency_tests: libconcurrency.so $(CONCURRENCY_TEST_OBJS)
+ $(LD) -o $@ $(LD_APP_FLAGS) -L. $(CONCURRENCY_TEST_OBJS) libconcurrency.so
+
+tests: concurrency_tests
+
+clean_libthrift:
+ rm -f libthrift.so
+ rm -f $(THRIFT_OBJS)
+
+clean_libconcurrency:
+ rm -f libconcurrency.so
+ rm -f $(CONCURRENCY_OBJS)
+
+clean_tests:
+ rm -f concurrency_tests
+ rm -f $(CONCURRENTY_TEST_OBJS)
# Clean it up
-clean:
- rm -f libthrift.so
+clean: clean_libthrift clean_libconcurrency clean_tests
# Install
install: libthrift
diff --git a/lib/cpp/src/concurrency/Monitor.cc b/lib/cpp/src/concurrency/Monitor.cc
index d1b83d1..a2e4276 100644
--- a/lib/cpp/src/concurrency/Monitor.cc
+++ b/lib/cpp/src/concurrency/Monitor.cc
@@ -1,15 +1,17 @@
#include "Monitor.h"
+#include "Util.h"
#include <assert.h>
#include <errno.h>
#include <pthread.h>
+
namespace facebook { namespace thrift { namespace concurrency {
/** Monitor implementation using the POSIX pthread library
@author marc
- @version $Id$ */
+ @version $Id:$ */
class Monitor::Impl {
@@ -61,7 +63,7 @@
struct timespec abstime;
- toAbsoluteTimespec(abstime, timeout);
+ Util::toAbsoluteTimespec(abstime, timeout);
int result = pthread_cond_timedwait(&_pthread_cond, &_pthread_mutex, &abstime);
@@ -88,29 +90,6 @@
private:
- /** Converts relative timeout specified as a duration in milliseconds to a struct timespec structure
- specifying current time plus timeout
-
- @param timeout time to delay in milliseconds
- @return struct timespec current time plus timeout */
-
- static const void toAbsoluteTimespec(struct timespec& result, long long timeout) {
-
- // XXX Darwin doesn't seem to have any readily useable hi-res clock.
-
- time_t seconds;
-
- assert(time(&seconds) != (time_t)-1);
-
- seconds+= (timeout / 1000);
-
- long nanoseconds = (timeout % 1000) * 1000000;
-
- result.tv_sec = seconds + (nanoseconds / 1000000000);
-
- result.tv_nsec = nanoseconds % 1000000000;
- }
-
mutable pthread_mutex_t _pthread_mutex;
mutable bool mutexInitialized;
diff --git a/lib/cpp/src/concurrency/Monitor.h b/lib/cpp/src/concurrency/Monitor.h
index 82544f1..13dec18 100644
--- a/lib/cpp/src/concurrency/Monitor.h
+++ b/lib/cpp/src/concurrency/Monitor.h
@@ -1,5 +1,5 @@
-#if !defined(_concurrency_mutex_h_)
-#define _concurrency_mutex_h_ 1
+#if !defined(_concurrency_Monitor_h_)
+#define _concurrency_Monitor_h_ 1
namespace facebook { namespace thrift { namespace concurrency {
@@ -11,7 +11,7 @@
methods without needing to cast away constness or change to non-const signatures.
@author marc
- @version $Id$ */
+ @version $Id:$ */
class Monitor {
@@ -56,4 +56,4 @@
}}} // facebook::thrift::concurrency
-#endif // !defined(_concurrency_mutex_h_)
+#endif // !defined(_concurrency_Monitor_h_)
diff --git a/lib/cpp/src/concurrency/Mutex.cc b/lib/cpp/src/concurrency/Mutex.cc
index 39d768e..8282e73 100644
--- a/lib/cpp/src/concurrency/Mutex.cc
+++ b/lib/cpp/src/concurrency/Mutex.cc
@@ -3,6 +3,11 @@
#include <assert.h>
#include <pthread.h>
+/** Implementation of Mutex class using POSIX mutex
+
+ @author marc
+ @version $Id:$ */
+
namespace facebook { namespace thrift { namespace concurrency {
class Mutex::impl {
diff --git a/lib/cpp/src/concurrency/Mutex.h b/lib/cpp/src/concurrency/Mutex.h
index e8371ea..20d4c0b 100644
--- a/lib/cpp/src/concurrency/Mutex.h
+++ b/lib/cpp/src/concurrency/Mutex.h
@@ -3,6 +3,11 @@
namespace facebook { namespace thrift { namespace concurrency {
+/** A simple mutex class
+
+ @author marc
+ @version $Id:$ */
+
class Mutex {
public:
diff --git a/lib/cpp/src/concurrency/PosixThreadFactory.cc b/lib/cpp/src/concurrency/PosixThreadFactory.cc
index e9d52f0..e7f84cd 100644
--- a/lib/cpp/src/concurrency/PosixThreadFactory.cc
+++ b/lib/cpp/src/concurrency/PosixThreadFactory.cc
@@ -5,7 +5,10 @@
namespace facebook { namespace thrift { namespace concurrency {
-/** The POSIX thread class. */
+/** The POSIX thread class.
+
+ @author marc
+ @version $Id:$ */
class PthreadThread: public Thread {
@@ -139,7 +142,7 @@
/** Converts relative thread priorities to absolute value based on posix thread scheduler policy
The idea is simply to divide up the priority range for the given policy into the correpsonding relative
- priority level (lowest..highest) and then prorate accordingly. */
+ priority level (lowest..highest) and then pro-rate accordingly. */
static int toPthreadPriority(POLICY policy, PRIORITY priority) {
diff --git a/lib/cpp/src/concurrency/PosixThreadFactory.h b/lib/cpp/src/concurrency/PosixThreadFactory.h
index 88a0888..b42981b 100644
--- a/lib/cpp/src/concurrency/PosixThreadFactory.h
+++ b/lib/cpp/src/concurrency/PosixThreadFactory.h
@@ -7,7 +7,8 @@
/** A thread factory to create posix threads
- @author marc */
+ @author marc
+ @version $Id:$ */
class PosixThreadFactory : public ThreadFactory {
diff --git a/lib/cpp/src/concurrency/Thread.h b/lib/cpp/src/concurrency/Thread.h
index befb4fe..3fc094d 100644
--- a/lib/cpp/src/concurrency/Thread.h
+++ b/lib/cpp/src/concurrency/Thread.h
@@ -5,7 +5,10 @@
class Thread;
-/** Minimal runnable class. More or less analogous to java.lang.Runnable. */
+/** Minimal runnable class. More or less analogous to java.lang.Runnable.
+
+ @author marc
+ @version $Id:$ */
class Runnable {
diff --git a/lib/cpp/src/concurrency/ThreadManager.cc b/lib/cpp/src/concurrency/ThreadManager.cc
index c4ca2b1..b5d02e6 100644
--- a/lib/cpp/src/concurrency/ThreadManager.cc
+++ b/lib/cpp/src/concurrency/ThreadManager.cc
@@ -1,6 +1,9 @@
#include "ThreadManager.h"
+#include "Monitor.h"
#include <assert.h>
+#include <queue>
+#include <set>
namespace facebook { namespace thrift { namespace concurrency {
@@ -15,7 +18,108 @@
policy issues. The simplest policy, StaticPolicy, does nothing other than create a fixed number of threads.
@author marc
- @version $Id */
+ @version $Id:$ */
+
+class ThreadManager::Impl : public ThreadManager {
+
+ public:
+
+ Impl(size_t highWatermark, size_t lowWatermark) :
+ _hiwat(highWatermark),
+ _lowat(lowWatermark) {
+ }
+
+ ~Impl() {}
+
+ size_t highWatermark() const {return _hiwat;}
+
+ void highWatermark(size_t value) {_hiwat = value;}
+
+ size_t lowWatermark() const {return _lowat;}
+
+ void lowWatermark(size_t value) {_lowat = value;}
+
+ const PoolPolicy* poolPolicy() const {
+
+ Synchronized s(_monitor);
+
+ return _poolPolicy;
+ }
+
+ void poolPolicy(const PoolPolicy* value) {
+
+ Synchronized s(_monitor);
+
+ _poolPolicy = value;
+ }
+
+ const ThreadFactory* threadFactory() const {
+
+ Synchronized s(_monitor);
+
+ return _threadFactory;
+ }
+
+ void threadFactory(const ThreadFactory* value) {
+
+ Synchronized s(_monitor);
+
+ _threadFactory = value;
+ }
+
+ void addThread(size_t value);
+
+ void removeThread(size_t value);
+
+ size_t idleWorkerCount() const {return _idleCount;}
+
+ size_t workerCount() const {
+
+ Synchronized s(_monitor);
+
+ return _workers.size();
+ }
+
+ size_t pendingTaskCount() const {
+
+ Synchronized s(_monitor);
+
+ return _tasks.size();
+ }
+
+ size_t totalTaskCount() const {
+
+ Synchronized s(_monitor);
+
+ return _tasks.size() + _workers.size() - _idleCount;
+ }
+
+ void add(Runnable* value);
+
+ void remove(Runnable* task);
+
+private:
+
+ size_t _hiwat;
+
+ size_t _lowat;
+
+ size_t _idleCount;
+
+ const PoolPolicy* _poolPolicy;;
+
+ const ThreadFactory* _threadFactory;
+
+ friend class ThreadManager::Task;
+
+ std::queue<Task*> _tasks;
+
+ Monitor _monitor;
+
+ friend class ThreadManager::Worker;
+
+ std::set<Thread*> _workers;
+};
class ThreadManager::Task : public Runnable {
@@ -49,7 +153,6 @@
};
class ThreadManager::Worker: public Runnable {
-
enum STATE {
UNINITIALIZED,
STARTING,
@@ -59,7 +162,7 @@
};
public:
- Worker(ThreadManager* manager) :
+ Worker(ThreadManager::Impl* manager) :
_manager(manager),
_state(UNINITIALIZED),
_idle(false)
@@ -134,170 +237,138 @@
private:
- ThreadManager* _manager;
+ ThreadManager::Impl* _manager;
- friend class ThreadManager;
+ friend class ThreadManager::Impl;
STATE _state;
bool _idle;
};
-ThreadManager::ThreadManager(size_t highWatermark, size_t lowWatermark) :
- _hiwat(highWatermark),
- _lowat(lowWatermark) {
-}
-
-ThreadManager::~ThreadManager() {}
-
-size_t ThreadManager::ThreadManager::highWatermark() const {return _hiwat;}
-
-void ThreadManager::highWatermark(size_t value) {_hiwat = value;}
-
-size_t ThreadManager::lowWatermark() const {return _lowat;}
-
-void ThreadManager::lowWatermark(size_t value) {_lowat = value;}
-
-const PoolPolicy* ThreadManager::poolPolicy() const {
-
- Synchronized s(_monitor);
-
- return _poolPolicy;
-}
-
-void ThreadManager::poolPolicy(const PoolPolicy* value) {
-
- Synchronized s(_monitor);
-
- _poolPolicy = value;
-}
-
-const ThreadFactory* ThreadManager::threadFactory() const {
-
- Synchronized s(_monitor);
-
- return _threadFactory;
-}
-
-void ThreadManager::threadFactory(const ThreadFactory* value) {
+void ThreadManager::Impl::addThread(size_t value) {
- Synchronized s(_monitor);
+ std::set<Thread*> newThreads;
- _threadFactory = value;
-}
+ for(size_t ix = 0; ix < value; ix++) {
-void ThreadManager::addThread(size_t value) {
+ class ThreadManager::Worker;
+
+ ThreadManager::Worker* worker = new ThreadManager::Worker(this);
- std::set<Thread*> newThreads;
+ newThreads.insert(_threadFactory->newThread(worker));
+ }
- for(size_t ix = 0; ix < value; ix++) {
+ for(std::set<Thread*>::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) {
- ThreadManager::Worker* worker = new ThreadManager::Worker(this);
+ (*ix)->start();
+ }
+ for(std::set<Thread*>::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) {
- newThreads.insert(_threadFactory->newThread(worker));
+ (*ix)->start();
+ }
+
+ {Synchronized s(_monitor);
+
+ _workers.insert(newThreads.begin(), newThreads.end());
+ }
}
- for(std::set<Thread*>::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) {
-
- (*ix)->start();
- }
- for(std::set<Thread*>::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) {
-
- (*ix)->start();
- }
-
- {Synchronized s(_monitor);
-
- _workers.insert(newThreads.begin(), newThreads.end());
- }
-}
-
-void ThreadManager::removeThread(size_t value) {
+void ThreadManager::Impl::removeThread(size_t value) {
std::set<Thread*> removedThreads;
- {Synchronized s(_monitor);
+ {Synchronized s(_monitor);
- /* Overly clever loop
+ /* Overly clever loop
+
+ First time through, (idleOnly == 1) just look for idle threads. If that didn't find enough, go through again (idleOnly == 0)
+ and remove a sufficient number of busy threads. */
- First time through, (idleOnly == 1) just look for idle threads. If that didn't find enough, go through again (idleOnly == 0)
- and remove a sufficient number of busy threads. */
-
- for(int idleOnly = 1; idleOnly <= 0; idleOnly--) {
+ for(int idleOnly = 1; idleOnly <= 0; idleOnly--) {
- for(std::set<Thread*>::iterator workerThread = _workers.begin(); (workerThread != _workers.end()) && (removedThreads.size() < value); workerThread++) {
+ for(std::set<Thread*>::iterator workerThread = _workers.begin(); (workerThread != _workers.end()) && (removedThreads.size() < value); workerThread++) {
- Worker* worker = (Worker*)(*workerThread)->runnable();
+ Worker* worker = (Worker*)(*workerThread)->runnable();
- if(worker->_idle || !idleOnly) {
+ if(worker->_idle || !idleOnly) {
- removedThreads.insert(*workerThread);
+ removedThreads.insert(*workerThread);
- _workers.erase(workerThread);
+ _workers.erase(workerThread);
+ }
}
}
+
+ _monitor.notifyAll();
}
+
- _monitor.notifyAll();
+ // Join removed threads and free worker
+
+ for(std::set<Thread*>::iterator workerThread = removedThreads.begin(); workerThread != removedThreads.end(); workerThread++) {
+
+ Worker* worker = (Worker*)(*workerThread)->runnable();
+
+ (*workerThread)->join();
+
+ delete worker;
+ }
+ }
+
+void ThreadManager::Impl::add(Runnable* value) {
+
+ Synchronized s(_monitor);
+
+ _tasks.push(new ThreadManager::Task(value));
+
+ /* If queue is empty notify a thread, otherwise all worker threads are running and will get around to this
+ task in time. */
+
+ if(_tasks.size() == 1) {
+
+ assert(_idleCount == _workers.size());
+
+ _monitor.notify();
+ }
}
+void ThreadManager::Impl::remove(Runnable* task) {
- // Join removed threads and free worker
-
- for(std::set<Thread*>::iterator workerThread = removedThreads.begin(); workerThread != removedThreads.end(); workerThread++) {
-
- Worker* worker = (Worker*)(*workerThread)->runnable();
-
- (*workerThread)->join();
-
- delete worker;
+ Synchronized s(_monitor);
}
+
+ThreadManager* ThreadManager::newThreadManager(size_t lowWatermark, size_t highWatermark) {
+ return new ThreadManager::Impl(lowWatermark, highWatermark);
}
-size_t ThreadManager::idleWorkerCount() const {return _idleCount;}
+/** Basic Pool Policy Implementation */
-size_t ThreadManager::workerCount() const {
+class BasicPoolPolicy::Impl : public PoolPolicy {
- Synchronized s(_monitor);
+ public:
- return _workers.size();
-}
+ Impl() {}
-size_t ThreadManager::pendingTaskCount() const {
+ ~Impl() {}
- Synchronized s(_monitor);
+ void onEmpty(ThreadManager* source) const {}
- return _tasks.size();
-}
+ void onLowWatermark(ThreadManager* source) const {}
-size_t ThreadManager::totalTaskCount() const {
+ void onHighWatermark(ThreadManager* source) const {}
+};
- Synchronized s(_monitor);
+BasicPoolPolicy::BasicPoolPolicy() : _impl(new BasicPoolPolicy::Impl()) {}
- return _tasks.size() + _workers.size() - _idleCount;
-}
+BasicPoolPolicy::~BasicPoolPolicy() { delete _impl;}
-void ThreadManager::add(Runnable* value) {
+void BasicPoolPolicy::onEmpty(ThreadManager* source) const {_impl->onEmpty(source);}
- Synchronized s(_monitor);
+void BasicPoolPolicy::onLowWatermark(ThreadManager* source) const {_impl->onLowWatermark(source);}
- _tasks.push(new ThreadManager::Task(value));
+void BasicPoolPolicy::onHighWatermark(ThreadManager* source) const {_impl->onHighWatermark(source);}
- /* If queue is empty notify a thread, otherwise all worker threads are running and will get around to this
- task in time. */
-
- if(_tasks.size() == 1) {
-
- assert(_idleCount == _workers.size());
-
- _monitor.notify();
- }
-}
-
-void ThreadManager::remove(Runnable* task) {
-
- Synchronized s(_monitor);
-}
}}} // facebook::thrift::concurrency
diff --git a/lib/cpp/src/concurrency/ThreadManager.h b/lib/cpp/src/concurrency/ThreadManager.h
index 1742881..352afb9 100644
--- a/lib/cpp/src/concurrency/ThreadManager.h
+++ b/lib/cpp/src/concurrency/ThreadManager.h
@@ -1,14 +1,17 @@
#if !defined(_concurrency_ThreadManager_h_)
#define _concurrency_ThreadManager_h_ 1
-#include "Monitor.h"
+#include <sys/types.h>
+
#include "Thread.h"
-#include <set>
-#include <queue>
-
namespace facebook { namespace thrift { namespace concurrency {
+/** Thread Pool Manager and related classes
+
+ @author marc
+ @version $Id:$ */
+
class ThreadManager;
/** PoolPolicy class
@@ -19,12 +22,37 @@
public:
- virtual ~PoolPolicy() = 0;
+ PoolPolicy() {}
- virtual void onlowWatermark(ThreadManager* source) const = 0;
+ virtual ~PoolPolicy() {}
- virtual void onhighWatermark(ThreadManager* source) const = 0;
+ virtual void onEmpty(ThreadManager* source) const = 0;
+ virtual void onLowWatermark(ThreadManager* source) const = 0;
+
+ virtual void onHighWatermark(ThreadManager* source) const = 0;
+
+};
+
+class BasicPoolPolicy : public PoolPolicy {
+
+ public:
+
+ BasicPoolPolicy();
+
+ virtual ~BasicPoolPolicy();
+
+ virtual void onEmpty(ThreadManager* source) const;
+
+ virtual void onLowWatermark(ThreadManager* source) const;
+
+ virtual void onHighWatermark(ThreadManager* source) const;
+
+ private:
+
+ class Impl;
+
+ Impl* _impl;
};
/** ThreadManager class
@@ -41,9 +69,9 @@
public:
- ThreadManager(size_t highWatermark=4, size_t lowWatermark=2);
+ ThreadManager(size_t highWatermark=4, size_t lowWatermark=2) {};
- virtual ~ThreadManager() = 0;
+ virtual ~ThreadManager() {};
virtual const PoolPolicy* poolPolicy() const = 0;
@@ -89,32 +117,13 @@
virtual void remove(Runnable* task) = 0;
- private:
-
- size_t _hiwat;
-
- size_t _lowat;
-
- size_t _idleCount;
-
- const PoolPolicy* _poolPolicy;;
-
- const ThreadFactory* _threadFactory;;
+ static ThreadManager* newThreadManager(size_t lowWatermark=2, size_t highWatermark=4);
class Task;
-
- friend class Task;
-
- std::queue<Task*> _tasks;
-
- Monitor _monitor;
-
+
class Worker;
- friend class Worker;
-
- std::set<Thread*> _workers;
-
+ class Impl;
};
}}} // facebook::thrift::concurrency
diff --git a/lib/cpp/src/concurrency/TimerManager.cc b/lib/cpp/src/concurrency/TimerManager.cc
new file mode 100644
index 0000000..bd68264
--- /dev/null
+++ b/lib/cpp/src/concurrency/TimerManager.cc
@@ -0,0 +1,211 @@
+#include "TimerManager.h"
+#include "Util.h"
+
+#include <assert.h>
+
+#include <set>
+
+namespace facebook { namespace thrift { namespace concurrency {
+
+/** TimerManager class
+
+ @author marc
+ @version $Id:$ */
+
+typedef std::multimap<long long, TimerManager::Task*>::iterator task_iterator;
+typedef std::pair<task_iterator, task_iterator> task_range;
+
+class TimerManager::Task : public Runnable {
+
+public:
+ enum STATE {
+ WAITING,
+ EXECUTING,
+ CANCELLED,
+ COMPLETE
+ };
+
+ Task(Runnable* runnable) :
+ _runnable(runnable),
+ _state(WAITING)
+ {}
+
+ ~Task() {};
+
+ void run() {
+ if(_state == EXECUTING) {
+ _runnable->run();
+ _state = COMPLETE;
+ }
+ }
+
+ private:
+
+ Runnable* _runnable;
+
+ STATE _state;
+};
+
+class TimerManager::Dispatcher: public Runnable {
+
+ enum STATE {
+ UNINITIALIZED,
+ STARTING,
+ STARTED,
+ STOPPING,
+ STOPPED
+ };
+
+public:
+ Dispatcher(TimerManager* manager) :
+ _manager(manager),
+ _state(UNINITIALIZED)
+ {}
+
+ ~Dispatcher() {}
+
+ /** Dispatcher entry point
+
+ As long as dispatcher thread is running, pull tasks off the task _taskMap and execute. */
+
+ void run() {
+
+ {Synchronized(_manager->_monitor);
+
+ if(_state == STARTING) {
+ _state = STARTED;
+ }
+ }
+
+ do {
+
+ std::set<TimerManager::Task*> expiredTasks;
+
+ {Synchronized(_manager->_monitor);
+
+ long long now = Util::currentTime();
+
+ task_iterator expiredTaskEnd;
+
+ while(_state == STARTED &&
+ (expiredTaskEnd = _manager->_taskMap.upper_bound(now)) == _manager->_taskMap.end()) {
+
+ _manager->_monitor.wait(_manager->_nextTimeout - now);
+
+ }
+
+ if(_state == STARTED) {
+
+ for(task_iterator ix = _manager->_taskMap.begin(); ix != expiredTaskEnd; ix++) {
+
+ TimerManager::Task* task = ix->second;
+
+ expiredTasks.insert(task);
+
+ _manager->_taskCount--;
+ }
+
+ _manager->_taskMap.erase(_manager->_taskMap.begin(), expiredTaskEnd);
+ }
+ }
+
+ for(std::set<Task*>::iterator ix = expiredTasks.begin(); ix != expiredTasks.end(); ix++) {
+
+ (*ix)->run();
+
+ delete *ix;
+ }
+
+ } while(_state == STARTED);
+
+ {Synchronized(_manager->_monitor);
+
+ if(_state == STOPPING) {
+
+ _state = STOPPED;
+
+ _manager->_monitor.notify();
+
+ }
+ }
+
+ return;
+ }
+
+ private:
+
+ TimerManager* _manager;
+
+ friend class TimerManager;
+
+ STATE _state;
+};
+
+TimerManager::TimerManager() {}
+
+TimerManager::~TimerManager() {}
+
+const ThreadFactory* TimerManager::threadFactory() const {
+
+ Synchronized s(_monitor);
+
+ return _threadFactory;
+}
+
+void TimerManager::threadFactory(const ThreadFactory* value) {
+
+ Synchronized s(_monitor);
+
+ _threadFactory = value;
+}
+
+void TimerManager::add(Runnable* task, long long timeout) {
+
+ long long now = Util::currentTime();
+
+ timeout += now;
+
+ {Synchronized s(_monitor);
+
+ _taskCount++;
+
+ _taskMap.insert(std::pair<long long, Task*>(timeout, new Task(task)));
+
+ /* If the task map was empty, or if we have an expiration that is earlier than any previously seen,
+ kick the dispatcher so it can update its timeout */
+
+ if(_taskCount == 1 || timeout < _nextTimeout) {
+
+ _monitor.notify();
+ }
+
+ if(timeout < _nextTimeout) {
+
+ _nextTimeout = timeout;
+ }
+ }
+}
+
+void TimerManager::add(Runnable* task, const struct timespec& value) {
+
+ long long expiration;
+
+ Util::toMilliseconds(expiration, value);
+
+ /* XXX
+ Need to convert this to an explicit exception */
+
+ long long now = Util::currentTime();
+
+ assert(expiration < now);
+
+ add(task, expiration - now);
+}
+
+
+void TimerManager::remove(Runnable* task) {
+
+}
+
+}}} // facebook::thrift::concurrency
+
diff --git a/lib/cpp/src/concurrency/TimerManager.h b/lib/cpp/src/concurrency/TimerManager.h
new file mode 100644
index 0000000..002460a
--- /dev/null
+++ b/lib/cpp/src/concurrency/TimerManager.h
@@ -0,0 +1,77 @@
+#if !defined(_concurrency_TimerManager_h_)
+#define _concurrency_TimerManager_h_ 1
+
+#include "Monitor.h"
+#include "Thread.h"
+
+#include <map>
+
+#include <time.h>
+
+namespace facebook { namespace thrift { namespace concurrency {
+
+/** Timer Manager
+
+ This class dispatches timer tasks when they fall due.
+
+ @author marc
+ @version $Id:$ */
+
+class TimerManager {
+
+ public:
+
+ TimerManager();
+
+ virtual ~TimerManager() = 0;
+
+ virtual const ThreadFactory* threadFactory() const = 0;
+
+ virtual void threadFactory(const ThreadFactory* value) = 0;
+
+ virtual size_t taskCount() const = 0;
+
+ /** Adds a task to be executed at some time in the future by a worker thread.
+
+ @param task The task to execute
+ @param timeout Time in milliseconds to delay before executing task */
+
+ virtual void add(Runnable* task, long long timeout) = 0;
+
+ /** Adds a task to be executed at some time in the future by a worker thread.
+
+ @param task The task to execute
+ @param timeout Absolute time in the future to execute task. */
+
+ virtual void add(Runnable* task, const struct timespec& timeout) = 0;
+
+ /** Removes a pending task */
+
+ virtual void remove(Runnable* task) = 0;
+
+ private:
+
+ const ThreadFactory* _threadFactory;
+
+ class Task;
+
+ friend class Task;
+
+ std::multimap<long long, Task*> _taskMap;
+
+ size_t _taskCount;
+
+ long long _nextTimeout;
+
+ Monitor _monitor;
+
+ class Dispatcher;
+
+ friend class Dispatcher;
+
+ Dispatcher* _dispatcher;
+};
+
+}}} // facebook::thrift::concurrency
+
+#endif // !defined(_concurrency_TimerManager_h_)
diff --git a/lib/cpp/src/concurrency/Util.h b/lib/cpp/src/concurrency/Util.h
new file mode 100644
index 0000000..3ab8929
--- /dev/null
+++ b/lib/cpp/src/concurrency/Util.h
@@ -0,0 +1,67 @@
+#if !defined(_concurrency_Util_h_)
+#define _concurrency_Util_h_ 1
+
+#include <assert.h>
+#include <time.h>
+
+namespace facebook { namespace thrift { namespace concurrency {
+
+/** Utility methods
+
+ This class contains basic utility methods for converting time formats, and other common platform-dependent concurrency operations.
+ It should not be included in API headers for other concurrency library headers, since it will, by definition, pull in all sorts of
+ horrid platform dependent crap. Rather it should be inluded directly in concurrency library implementation source.
+
+ @author marc
+ @version $Id:$ */
+
+class Util {
+
+ public:
+
+ /** Converts relative timeout specified as a duration in milliseconds to a struct timespec structure
+ specifying current time plus timeout
+
+ @param struct timespec& current time plus timeout result
+ @param timeout time to delay in milliseconds */
+
+ static const void toAbsoluteTimespec(struct timespec& result, long long value) {
+
+ // XXX Darwin doesn't seem to have any readily useable hi-res clock.
+
+ time_t seconds;
+
+ assert(time(&seconds) != (time_t)-1);
+
+ seconds+= (value / 1000);
+
+ long nanoseconds = (value % 1000) * 1000000;
+
+ result.tv_sec = seconds + (nanoseconds / 1000000000);
+
+ result.tv_nsec = nanoseconds % 1000000000;
+ }
+
+ /** Converts absolute timespec to milliseconds from epoch */
+
+ static const void toMilliseconds(long long& result, const struct timespec& value) {
+
+ result = value.tv_sec * 1000 + value.tv_nsec * 1000000;
+ }
+
+ /** Get current time as milliseconds from epoch */
+
+ static const long long currentTime() {
+
+ time_t now;
+
+ time(&now);
+
+ return (long long)now * 1000;
+ }
+};
+
+
+}}} // facebook::thrift::concurrency
+
+#endif // !defined(_concurrency_Util_h_)
diff --git a/lib/cpp/src/concurrency/test/TimerManagerTests.cc b/lib/cpp/src/concurrency/test/TimerManagerTests.cc
new file mode 100644
index 0000000..abd0e95
--- /dev/null
+++ b/lib/cpp/src/concurrency/test/TimerManagerTests.cc
@@ -0,0 +1,33 @@
+#include <ThreadManager.h>
+#include <PosixThreadFactory.h>
+
+#include <assert.h>
+
+namespace facebook { namespace thrift { namespace concurrency { namespace test {
+
+/** ThreadManagerTests class */
+
+
+class ThreadManagerTests {
+
+ void init() {
+
+ ThreadManager* threadManager = ThreadManager::newThreadManager();
+
+ threadManager->poolPolicy(new BasicPoolPolicy());
+
+ threadManager->threadFactory(new PosixThreadFactory());
+
+ threadManager->poolPolicy(new BasicPoolPolicy());
+ }
+};
+
+
+}}}} // facebook::thrift::concurrency
+
+int main(int argc, char** argv) {
+
+ return 0;
+
+}
+