Converted concurrency classes to use boost::shared_ptr and boost::weak_ptr.
Wrapped all thrift code in the facebook::thrift namespace.
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664735 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/concurrency/Monitor.cc b/lib/cpp/src/concurrency/Monitor.cc
index b1d7b72..7493ec3 100644
--- a/lib/cpp/src/concurrency/Monitor.cc
+++ b/lib/cpp/src/concurrency/Monitor.cc
@@ -22,35 +22,26 @@
public:
Impl() :
- mutexInitialized(false) {
+ mutexInitialized(false),
+ condInitialized(false) {
- /* XXX
- Need to fix this to handle failures without leaking. */
+ try {
- assert(pthread_mutex_init(&_pthread_mutex, NULL) == 0);
+ assert(pthread_mutex_init(&_pthread_mutex, NULL) == 0);
- mutexInitialized = true;
+ mutexInitialized = true;
- assert(pthread_cond_init(&_pthread_cond, NULL) == 0);
- }
+ assert(pthread_cond_init(&_pthread_cond, NULL) == 0);
- ~Impl() {
+ condInitialized = true;
- if(mutexInitialized) {
-
- mutexInitialized = false;
-
- assert(pthread_mutex_destroy(&_pthread_mutex) == 0);
- }
-
- if(condInitialized) {
-
- condInitialized = false;
-
- assert(pthread_cond_destroy(&_pthread_cond) == 0);
+ } catch(...) {
+ cleanup();
}
}
+ ~Impl() {cleanup();}
+
void lock() const {pthread_mutex_lock(&_pthread_mutex);}
void unlock() const {pthread_mutex_unlock(&_pthread_mutex);}
@@ -98,6 +89,23 @@
private:
+ void cleanup() {
+
+ if(mutexInitialized) {
+
+ mutexInitialized = false;
+
+ assert(pthread_mutex_destroy(&_pthread_mutex) == 0);
+ }
+
+ if(condInitialized) {
+
+ condInitialized = false;
+
+ assert(pthread_cond_destroy(&_pthread_cond) == 0);
+ }
+ }
+
mutable pthread_mutex_t _pthread_mutex;
mutable bool mutexInitialized;
diff --git a/lib/cpp/src/concurrency/PosixThreadFactory.cc b/lib/cpp/src/concurrency/PosixThreadFactory.cc
index 00b2613..f07db86 100644
--- a/lib/cpp/src/concurrency/PosixThreadFactory.cc
+++ b/lib/cpp/src/concurrency/PosixThreadFactory.cc
@@ -3,8 +3,14 @@
#include <assert.h>
#include <pthread.h>
+#include <iostream>
+
+#include <boost/weak_ptr.hpp>
+
namespace facebook { namespace thrift { namespace concurrency {
+using namespace boost;
+
/** The POSIX thread class.
@author marc
@@ -36,18 +42,23 @@
int _stackSize;
+ weak_ptr<PthreadThread> _self;
+
public:
- PthreadThread(int policy, int priority, int stackSize, Runnable* runnable) :
+ PthreadThread(int policy, int priority, int stackSize, shared_ptr<Runnable> runnable) :
_pthread(0),
_state(uninitialized),
_policy(policy),
_priority(priority),
- _stackSize(stackSize) {
+ _stackSize(stackSize) {
this->Thread::runnable(runnable);
}
+ ~PthreadThread() {
+ }
+
void start() {
if(_state != uninitialized) {
@@ -75,9 +86,13 @@
// Set thread priority
- // assert(pthread_attr_setschedparam(&thread_attr, &sched_param) == 0);
+ assert(pthread_attr_setschedparam(&thread_attr, &sched_param) == 0);
- assert(pthread_create(&_pthread, &thread_attr, threadMain, (void*)this) == 0);
+ shared_ptr<PthreadThread>* selfRef = new shared_ptr<PthreadThread>();
+
+ *selfRef = _self.lock();
+
+ assert(pthread_create(&_pthread, &thread_attr, threadMain, (void*)selfRef) == 0);
}
void join() {
@@ -90,17 +105,27 @@
}
}
- Runnable* runnable() const {return Thread::runnable();}
+ shared_ptr<Runnable> runnable() const {return Thread::runnable();}
- void runnable(Runnable* value) {Thread::runnable(value);}
+ void runnable(shared_ptr<Runnable> value) {Thread::runnable(value);}
+ void weakRef(shared_ptr<PthreadThread> self) {
+ assert(self.get() == this);
+ _self = weak_ptr<PthreadThread>(self);
+ }
};
void* PthreadThread::threadMain(void* arg) {
// XXX need a lock here when testing thread state
- PthreadThread* thread = (PthreadThread*)arg;
-
+ shared_ptr<PthreadThread> thread = *(shared_ptr<PthreadThread>*)arg;
+
+ delete reinterpret_cast<shared_ptr<PthreadThread>*>(arg);
+
+ if(thread == NULL) {
+ return (void*)0;
+ }
+
if(thread->_state != starting) {
return (void*)0;
}
@@ -184,9 +209,12 @@
@param runnable A runnable object */
- Thread* newThread(Runnable* runnable) const {
+ shared_ptr<Thread> newThread(shared_ptr<Runnable> runnable) const {
- return new PthreadThread(toPthreadPolicy(_policy), toPthreadPriority(_policy, _priority), _stackSize, runnable);
+ shared_ptr<PthreadThread> result = shared_ptr<PthreadThread>(new PthreadThread(toPthreadPolicy(_policy), toPthreadPriority(_policy, _priority), _stackSize, runnable));
+ result->weakRef(result);
+ runnable->thread(result);
+ return result;
}
int stackSize() const { return _stackSize;}
@@ -198,7 +226,7 @@
/** Sets priority.
XXX
- Need to handle incremental priorities properl. */
+ Need to handle incremental priorities properly. */
void priority(PRIORITY value) { _priority = value;}
@@ -207,7 +235,7 @@
PosixThreadFactory::PosixThreadFactory(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
_impl(new PosixThreadFactory::Impl(policy, priority, stackSize, detached)) {}
-Thread* PosixThreadFactory::newThread(Runnable* runnable) const {return _impl->newThread(runnable);}
+shared_ptr<Thread> PosixThreadFactory::newThread(shared_ptr<Runnable> runnable) const {return _impl->newThread(runnable);}
int PosixThreadFactory::stackSize() const {return _impl->stackSize();}
diff --git a/lib/cpp/src/concurrency/PosixThreadFactory.h b/lib/cpp/src/concurrency/PosixThreadFactory.h
index b42981b..0095cf8 100644
--- a/lib/cpp/src/concurrency/PosixThreadFactory.h
+++ b/lib/cpp/src/concurrency/PosixThreadFactory.h
@@ -3,8 +3,12 @@
#include "Thread.h"
+#include <boost/shared_ptr.hpp>
+
namespace facebook { namespace thrift { namespace concurrency {
+using namespace boost;
+
/** A thread factory to create posix threads
@author marc
@@ -43,7 +47,7 @@
// From ThreadFactory;
- Thread* newThread(Runnable* runnable) const;
+ shared_ptr<Thread> newThread(shared_ptr<Runnable> runnable) const;
/** Sets stack size for created threads
@@ -69,7 +73,7 @@
class Impl;
- Impl* _impl;
+ shared_ptr<Impl> _impl;
};
}}} // facebook::thrift::concurrency
diff --git a/lib/cpp/src/concurrency/Thread.h b/lib/cpp/src/concurrency/Thread.h
index 2416887..ea6b999 100644
--- a/lib/cpp/src/concurrency/Thread.h
+++ b/lib/cpp/src/concurrency/Thread.h
@@ -1,8 +1,13 @@
#if !defined(_concurrency_Thread_h_)
#define _concurrency_Thread_h_ 1
+#include <boost/shared_ptr.hpp>
+#include <boost/weak_ptr.hpp>
+
namespace facebook { namespace thrift { namespace concurrency {
+using namespace boost;
+
class Thread;
/** Minimal runnable class. More or less analogous to java.lang.Runnable.
@@ -18,17 +23,17 @@
virtual void run() = 0;
- virtual Thread* thread() {return _thread;}
+ /** Gets the thread object that is hosting this runnable object - can return an empty shared pointer if no references remain on the thread object */
- private:
+ virtual shared_ptr<Thread> thread() {return _thread.lock();}
/** Sets the thread that is executing this object. This is only meant for use by concrete implementations of Thread. */
- friend class Thread;
+ virtual void thread(shared_ptr<Thread> value) {_thread = value;}
- virtual void thread(Thread* value) {_thread = value;}
+ private:
- Thread* _thread;
+ weak_ptr<Thread> _thread;
};
/** Minimal thread class. Returned by thread factory bound to a Runnable object and ready to start execution. More or less analogous to java.lang.Thread
@@ -55,15 +60,14 @@
/** Gets the runnable object this thread is hosting */
- virtual Runnable* runnable() const {return _runnable;}
+ virtual shared_ptr<Runnable> runnable() const {return _runnable;}
protected:
- virtual void runnable(Runnable* value, bool x=false) {_runnable = value; _runnable->thread(this);}
+ virtual void runnable(shared_ptr<Runnable> value) {_runnable = value;}
private:
-
- Runnable* _runnable;
+ shared_ptr<Runnable> _runnable;
};
/** Factory to create platform-specific thread object and bind them to Runnable object for execution */
@@ -74,7 +78,7 @@
virtual ~ThreadFactory() {}
- virtual Thread* newThread(Runnable* runnable) const = 0;
+ virtual shared_ptr<Thread> newThread(shared_ptr<Runnable> runnable) const = 0;
};
}}} // facebook::thrift::concurrency
diff --git a/lib/cpp/src/concurrency/ThreadManager.cc b/lib/cpp/src/concurrency/ThreadManager.cc
index ca2bbb5..a5b8f05 100644
--- a/lib/cpp/src/concurrency/ThreadManager.cc
+++ b/lib/cpp/src/concurrency/ThreadManager.cc
@@ -2,12 +2,20 @@
#include "Exception.h"
#include "Monitor.h"
+#include <boost/shared_ptr.hpp>
+
#include <assert.h>
#include <queue>
#include <set>
+#if defined(DEBUG)
+#include <iostream>
+#endif //defined(DEBUG)
+
namespace facebook { namespace thrift { namespace concurrency {
+using namespace boost;
+
/** ThreadManager class
@@ -21,9 +29,16 @@
public:
- Impl() : _state(ThreadManager::UNINITIALIZED) {}
+ Impl() :
+ _workerCount(0),
+ _workerMaxCount(0),
+ _idleCount(0),
+ _state(ThreadManager::UNINITIALIZED)
+ {}
- ~Impl() {stop();}
+ ~Impl() {
+ stop();
+ }
void start();
@@ -33,14 +48,14 @@
return _state;
};
- const ThreadFactory* threadFactory() const {
+ shared_ptr<ThreadFactory> threadFactory() const {
Synchronized s(_monitor);
return _threadFactory;
}
- void threadFactory(const ThreadFactory* value) {
+ void threadFactory(shared_ptr<ThreadFactory> value) {
Synchronized s(_monitor);
@@ -74,9 +89,9 @@
return _tasks.size() + _workerCount - _idleCount;
}
- void add(Runnable* value);
+ void add(shared_ptr<Runnable> value);
- void remove(Runnable* task);
+ void remove(shared_ptr<Runnable> task);
private:
@@ -88,11 +103,11 @@
ThreadManager::STATE _state;
- const ThreadFactory* _threadFactory;
+ shared_ptr<ThreadFactory> _threadFactory;
friend class ThreadManager::Task;
- std::queue<Task*> _tasks;
+ std::queue<shared_ptr<Task> > _tasks;
Monitor _monitor;
@@ -100,9 +115,9 @@
friend class ThreadManager::Worker;
- std::set<Thread*> _workers;
+ std::set<shared_ptr<Thread> > _workers;
- std::set<Thread*> _deadWorkers;
+ std::set<shared_ptr<Thread> > _deadWorkers;
};
class ThreadManager::Task : public Runnable {
@@ -115,7 +130,7 @@
COMPLETE
};
- Task(Runnable* runnable) :
+ Task(shared_ptr<Runnable> runnable) :
_runnable(runnable),
_state(WAITING)
{}
@@ -131,7 +146,7 @@
private:
- Runnable* _runnable;
+ shared_ptr<Runnable> _runnable;
friend class ThreadManager::Worker;
@@ -199,7 +214,7 @@
while(active) {
- ThreadManager::Task* task = NULL;
+ shared_ptr<ThreadManager::Task> task;
/* While holding manager monitor block for non-empty task queue (Also check that the thread hasn't been requested to stop).
@@ -260,10 +275,6 @@
// XXX need to log this
}
-
- delete task;
-
- task = NULL;
}
}
}
@@ -294,13 +305,13 @@
void ThreadManager::Impl::addWorker(size_t value) {
- std::set<Thread*> newThreads;
+ std::set<shared_ptr<Thread> > newThreads;
for(size_t ix = 0; ix < value; ix++) {
class ThreadManager::Worker;
- ThreadManager::Worker* worker = new ThreadManager::Worker(this);
+ shared_ptr<ThreadManager::Worker> worker = shared_ptr<ThreadManager::Worker>(new ThreadManager::Worker(this));
newThreads.insert(_threadFactory->newThread(worker));
}
@@ -312,9 +323,9 @@
_workers.insert(newThreads.begin(), newThreads.end());
}
- for(std::set<Thread*>::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) {
+ for(std::set<shared_ptr<Thread> >::iterator ix = newThreads.begin(); ix != newThreads.end(); ix++) {
- ThreadManager::Worker* worker = (ThreadManager::Worker*)(*ix)->runnable();
+ shared_ptr<ThreadManager::Worker> worker = dynamic_pointer_cast<ThreadManager::Worker, Runnable>((*ix)->runnable());
worker->_state = ThreadManager::Worker::STARTING;
@@ -378,13 +389,14 @@
_state = ThreadManager::STOPPING;
}
- // Don't block for stopping->stopped transition here, since if stop is being performed in context of a delete, the monitor may be invalid
+ // XXX
+ // should be able to block here for transition to STOPPED since we're now using shared_ptrs
}
void ThreadManager::Impl::removeWorker(size_t value) {
- std::set<Thread*> removedThreads;
+ std::set<shared_ptr<Thread> > removedThreads;
{Synchronized s(_monitor);
@@ -413,20 +425,17 @@
_workerMonitor.wait();
}
- for(std::set<Thread*>::iterator ix = _deadWorkers.begin(); ix != _deadWorkers.end(); ix++) {
+ for(std::set<shared_ptr<Thread> >::iterator ix = _deadWorkers.begin(); ix != _deadWorkers.end(); ix++) {
_workers.erase(*ix);
- delete (*ix)->runnable();
-
- delete (*ix);
}
_deadWorkers.clear();
}
}
-void ThreadManager::Impl::add(Runnable* value) {
+void ThreadManager::Impl::add(shared_ptr<Runnable> value) {
Synchronized s(_monitor);
@@ -435,7 +444,7 @@
throw IllegalStateException();
}
- _tasks.push(new ThreadManager::Task(value));
+ _tasks.push(shared_ptr<ThreadManager::Task>(new ThreadManager::Task(value)));
/* If idle thread is available notify it, otherwise all worker threads are running and will get around to this
task in time. */
@@ -446,7 +455,7 @@
}
}
-void ThreadManager::Impl::remove(Runnable* task) {
+void ThreadManager::Impl::remove(shared_ptr<Runnable> task) {
Synchronized s(_monitor);
@@ -479,12 +488,12 @@
};
-ThreadManager* ThreadManager::newThreadManager() {
- return new ThreadManager::Impl();
+shared_ptr<ThreadManager> ThreadManager::newThreadManager() {
+ return shared_ptr<ThreadManager>(new ThreadManager::Impl());
}
-ThreadManager* ThreadManager::newSimpleThreadManager(size_t count) {
- return new SimpleThreadManager(count);
+shared_ptr<ThreadManager> ThreadManager::newSimpleThreadManager(size_t count) {
+ return shared_ptr<ThreadManager>(new SimpleThreadManager(count));
}
}}} // facebook::thrift::concurrency
diff --git a/lib/cpp/src/concurrency/ThreadManager.h b/lib/cpp/src/concurrency/ThreadManager.h
index aa5a98a..f365643 100644
--- a/lib/cpp/src/concurrency/ThreadManager.h
+++ b/lib/cpp/src/concurrency/ThreadManager.h
@@ -1,12 +1,16 @@
#if !defined(_concurrency_ThreadManager_h_)
#define _concurrency_ThreadManager_h_ 1
+#include <boost/shared_ptr.hpp>
+
#include <sys/types.h>
#include "Thread.h"
namespace facebook { namespace thrift { namespace concurrency {
+using namespace boost;
+
/** Thread Pool Manager and related classes
@author marc
@@ -52,9 +56,9 @@
virtual const STATE state() const = 0;
- virtual const ThreadFactory* threadFactory() const = 0;
+ virtual shared_ptr<ThreadFactory> threadFactory() const = 0;
- virtual void threadFactory(const ThreadFactory* value) = 0;
+ virtual void threadFactory(shared_ptr<ThreadFactory> value) = 0;
virtual void addWorker(size_t value=1) = 0;
@@ -76,19 +80,22 @@
virtual size_t totalTaskCount() const = 0;
- /** Adds a task to be execued at some time in the future by a worker thread. */
+ /** Adds a task to be executed at some time in the future by a worker thread.
- virtual void add(Runnable* value) = 0;
+ @param value The task to run */
+
+
+ virtual void add(shared_ptr<Runnable>value) = 0;
/** Removes a pending task */
- virtual void remove(Runnable* task) = 0;
+ virtual void remove(shared_ptr<Runnable> task) = 0;
- static ThreadManager* newThreadManager();
+ static shared_ptr<ThreadManager> newThreadManager();
/** Creates a simple thread manager the uses count number of worker threads */
- static ThreadManager* newSimpleThreadManager(size_t count=4);
+ static shared_ptr<ThreadManager> newSimpleThreadManager(size_t count=4);
class Task;
diff --git a/lib/cpp/src/concurrency/TimerManager.cc b/lib/cpp/src/concurrency/TimerManager.cc
index 7952122..a223a77 100644
--- a/lib/cpp/src/concurrency/TimerManager.cc
+++ b/lib/cpp/src/concurrency/TimerManager.cc
@@ -13,7 +13,7 @@
@author marc
@version $Id:$ */
-typedef std::multimap<long long, TimerManager::Task*>::iterator task_iterator;
+typedef std::multimap<long long, shared_ptr<TimerManager::Task> >::iterator task_iterator;
typedef std::pair<task_iterator, task_iterator> task_range;
class TimerManager::Task : public Runnable {
@@ -26,12 +26,14 @@
COMPLETE
};
- Task(Runnable* runnable) :
+ Task(shared_ptr<Runnable> runnable) :
_runnable(runnable),
_state(WAITING)
{}
- ~Task() {};
+ ~Task() {
+ std::cerr << "TimerManager::Task.dtor[" << this << "]" << std::endl; //debug
+};
void run() {
if(_state == EXECUTING) {
@@ -42,7 +44,7 @@
private:
- Runnable* _runnable;
+ shared_ptr<Runnable> _runnable;
class TimerManager::Dispatcher;
@@ -58,7 +60,9 @@
_manager(manager) {
}
- ~Dispatcher() {}
+ ~Dispatcher() {
+ std::cerr << "Dispatcher::dtor[" << this << "]" << std::endl; //debug
+ }
/** Dispatcher entry point
@@ -78,7 +82,7 @@
do {
- std::set<TimerManager::Task*> expiredTasks;
+ std::set<shared_ptr<TimerManager::Task> > expiredTasks;
{Synchronized s(_manager->_monitor);
@@ -107,7 +111,7 @@
for(task_iterator ix = _manager->_taskMap.begin(); ix != expiredTaskEnd; ix++) {
- TimerManager::Task* task = ix->second;
+ shared_ptr<TimerManager::Task> task = ix->second;
expiredTasks.insert(task);
@@ -123,11 +127,9 @@
}
}
- for(std::set<Task*>::iterator ix = expiredTasks.begin(); ix != expiredTasks.end(); ix++) {
+ for(std::set<shared_ptr<Task> >::iterator ix = expiredTasks.begin(); ix != expiredTasks.end(); ix++) {
(*ix)->run();
-
- delete *ix;
}
} while(_manager->_state == TimerManager::STARTED);
@@ -156,7 +158,7 @@
TimerManager::TimerManager() :
_taskCount(0),
_state(TimerManager::UNINITIALIZED),
- _dispatcher(new Dispatcher(this)) {
+ _dispatcher(shared_ptr<Dispatcher>(new Dispatcher(this))) {
}
@@ -164,6 +166,8 @@
/* If we haven't been explicitly stopped, do so now. We don't need to grab the monitor here, since
stop already takes care of reentrancy. */
+
+ std::cerr << "TimerManager::dtor[" << this << "]" << std::endl;
if(_state != STOPPED) {
@@ -172,6 +176,8 @@
stop();
} catch(...) {
+ std::cerr << "TimerManager::dtor[" << this << "] uhoh " << std::endl;
+ throw;
// uhoh
@@ -244,23 +250,23 @@
for(task_iterator ix = _taskMap.begin(); ix != _taskMap.end(); ix++) {
- delete ix->second;
-
_taskMap.erase(ix);
}
- delete _dispatcher;
+ // Remove dispatcher's reference to us.
+
+ _dispatcher->_manager = NULL;
}
}
-const ThreadFactory* TimerManager::threadFactory() const {
+shared_ptr<const ThreadFactory> TimerManager::threadFactory() const {
Synchronized s(_monitor);
return _threadFactory;
}
-void TimerManager::threadFactory(const ThreadFactory* value) {
+void TimerManager::threadFactory(shared_ptr<const ThreadFactory> value) {
Synchronized s(_monitor);
@@ -272,7 +278,7 @@
return _taskCount;
}
-void TimerManager::add(Runnable* task, long long timeout) {
+void TimerManager::add(shared_ptr<Runnable> task, long long timeout) {
long long now = Util::currentTime();
@@ -286,7 +292,7 @@
_taskCount++;
- _taskMap.insert(std::pair<long long, Task*>(timeout, new Task(task)));
+ _taskMap.insert(std::pair<long long, shared_ptr<Task> >(timeout, shared_ptr<Task>(new Task(task))));
/* If the task map was empty, or if we have an expiration that is earlier than any previously seen,
kick the dispatcher so it can update its timeout */
@@ -298,7 +304,7 @@
}
}
-void TimerManager::add(Runnable* task, const struct timespec& value) {
+void TimerManager::add(shared_ptr<Runnable> task, const struct timespec& value) {
long long expiration;
@@ -314,7 +320,7 @@
}
-void TimerManager::remove(Runnable* task) {
+void TimerManager::remove(shared_ptr<Runnable> task) {
{Synchronized s(_monitor);
if(_state != TimerManager::STARTED) {
diff --git a/lib/cpp/src/concurrency/TimerManager.h b/lib/cpp/src/concurrency/TimerManager.h
index 2b92f23..c0e6340 100644
--- a/lib/cpp/src/concurrency/TimerManager.h
+++ b/lib/cpp/src/concurrency/TimerManager.h
@@ -5,12 +5,16 @@
#include "Monitor.h"
#include "Thread.h"
+#include <boost/shared_ptr.hpp>
+
#include <map>
#include <time.h>
namespace facebook { namespace thrift { namespace concurrency {
+using namespace boost;
+
/** Timer Manager
This class dispatches timer tasks when they fall due.
@@ -26,9 +30,9 @@
virtual ~TimerManager();
- virtual const ThreadFactory* threadFactory() const;
+ virtual shared_ptr<const ThreadFactory> threadFactory() const;
- virtual void threadFactory(const ThreadFactory* value);
+ virtual void threadFactory(shared_ptr<const ThreadFactory> value);
/** Starts the timer manager service
@@ -47,14 +51,14 @@
@param task The task to execute
@param timeout Time in milliseconds to delay before executing task */
- virtual void add(Runnable* task, long long timeout);
+ virtual void add(shared_ptr<Runnable> task, long long timeout);
/** Adds a task to be executed at some time in the future by a worker thread.
@param task The task to execute
@param timeout Absolute time in the future to execute task. */
- virtual void add(Runnable* task, const struct timespec& timeout);
+ virtual void add(shared_ptr<Runnable> task, const struct timespec& timeout);
/** Removes a pending task
@@ -63,7 +67,7 @@
@throws UncancellableTaskException Specified task is already being executed or has completed execution. */
- virtual void remove(Runnable* task);
+ virtual void remove(shared_ptr<Runnable> task);
enum STATE {
UNINITIALIZED,
@@ -77,13 +81,13 @@
private:
- const ThreadFactory* _threadFactory;
+ shared_ptr<const ThreadFactory> _threadFactory;
class Task;
friend class Task;
- std::multimap<long long, Task*> _taskMap;
+ std::multimap<long long, shared_ptr<Task> > _taskMap;
size_t _taskCount;
@@ -95,9 +99,9 @@
friend class Dispatcher;
- Dispatcher* _dispatcher;
+ shared_ptr<Dispatcher> _dispatcher;
- Thread* _dispatcherThread;
+ shared_ptr<Thread> _dispatcherThread;
};
diff --git a/lib/cpp/src/concurrency/Util.h b/lib/cpp/src/concurrency/Util.h
index 6e8891d..fc24f74 100644
--- a/lib/cpp/src/concurrency/Util.h
+++ b/lib/cpp/src/concurrency/Util.h
@@ -5,7 +5,7 @@
#include <assert.h>
#include <stddef.h>
-#include <sys/time.h>
+#include <time.h>
namespace facebook { namespace thrift { namespace concurrency {
@@ -55,9 +55,9 @@
struct timespec now;
- assert(clock_gettime(&now, NULL) == 0);
+ assert(clock_gettime(CLOCK_REALTIME, &now) == 0);
- return = (now.tv_sec * MS_PER_S) + (now.tv_nsec / NS_PER_MS) + (now.tv_nsec % NS_PER_MS >= 500000 ? 1 : 0) ;
+ return (now.tv_sec * MS_PER_S) + (now.tv_nsec / NS_PER_MS) + (now.tv_nsec % NS_PER_MS >= 500000 ? 1 : 0) ;
#elif defined(HAVE_GETTIMEOFDAY)
@@ -66,11 +66,11 @@
assert(gettimeofday(&now, NULL) == 0);
return (((long long)now.tv_sec) * MS_PER_S) + (now.tv_usec / MS_PER_S) + (now.tv_usec % MS_PER_S >= 500 ? 1 : 0);
+
#endif // defined(HAVE_GETTIMEDAY)
}
};
-
}}} // facebook::thrift::concurrency
#endif // !defined(_concurrency_Util_h_)
diff --git a/lib/cpp/src/concurrency/test/ThreadFactoryTests.h b/lib/cpp/src/concurrency/test/ThreadFactoryTests.h
index d1ec0df..c019159 100644
--- a/lib/cpp/src/concurrency/test/ThreadFactoryTests.h
+++ b/lib/cpp/src/concurrency/test/ThreadFactoryTests.h
@@ -1,7 +1,7 @@
-#include <Thread.h>
-#include <PosixThreadFactory.h>
-#include <Monitor.h>
-#include <Util.h>
+#include <concurrency/Thread.h>
+#include <concurrency/PosixThreadFactory.h>
+#include <concurrency/Monitor.h>
+#include <concurrency/Util.h>
#include <assert.h>
#include <iostream>
@@ -18,10 +18,15 @@
class ThreadFactoryTests {
- class Task: public Runnable {
+public:
+
+ static const double ERROR;
+
+ class Task: public Runnable {
public:
+
Task() {}
void run() {
@@ -29,26 +34,20 @@
}
};
-public:
-
/** Hello world test */
bool helloWorldTest() {
PosixThreadFactory threadFactory = PosixThreadFactory();
- Task* task = new ThreadFactoryTests::Task();
+ shared_ptr<Task> task = shared_ptr<Task>(new ThreadFactoryTests::Task());
- Thread* thread = threadFactory.newThread(task);
+ shared_ptr<Thread> thread = threadFactory.newThread(task);
thread->start();
thread->join();
- delete thread;
-
- delete task;
-
std::cout << "\t\t\tSuccess!" << std::endl;
return true;
@@ -92,13 +91,13 @@
PosixThreadFactory threadFactory = PosixThreadFactory();
- std::set<Thread*> threads;
+ std::set<shared_ptr<Thread> > threads;
for(int ix = 0; ix < count; ix++) {
- threads.insert(threadFactory.newThread(new ReapNTask(*monitor, *activeCount)));
+ threads.insert(threadFactory.newThread(shared_ptr<Runnable>(new ReapNTask(*monitor, *activeCount))));
}
- for(std::set<Thread*>::const_iterator thread = threads.begin(); thread != threads.end(); thread++) {
+ for(std::set<shared_ptr<Thread> >::const_iterator thread = threads.begin(); thread != threads.end(); thread++) {
(*thread)->start();
}
@@ -111,11 +110,9 @@
}
}
- for(std::set<Thread*>::const_iterator thread = threads.begin(); thread != threads.end(); thread++) {
+ for(std::set<shared_ptr<Thread> >::const_iterator thread = threads.begin(); thread != threads.end(); thread++) {
- delete (*thread)->runnable();
-
- delete *thread;
+ threads.erase(*thread);
}
std::cout << "\t\t\tSuccess!" << std::endl;
@@ -177,11 +174,11 @@
SynchStartTask::STATE state = SynchStartTask::UNINITIALIZED;
- SynchStartTask* task = new SynchStartTask(monitor, state);
+ shared_ptr<SynchStartTask> task = shared_ptr<SynchStartTask>(new SynchStartTask(monitor, state));
PosixThreadFactory threadFactory = PosixThreadFactory();
- Thread* thread = threadFactory.newThread(task);
+ shared_ptr<Thread> thread = threadFactory.newThread(task);
if(state == SynchStartTask::UNINITIALIZED) {
@@ -247,16 +244,15 @@
error *= 1.0;
}
- bool success = error < .10;
+ bool success = error < ThreadFactoryTests::ERROR;
std::cout << "\t\t\t" << (success ? "Success" : "Failure") << "! expected time: " << count * timeout << "ms elapsed time: "<< endTime - startTime << "ms error%: " << error * 100.0 << std::endl;
return success;
}
};
-
-}}}} // facebook::thrift::concurrency
+const double ThreadFactoryTests::ERROR = .20;
-using namespace facebook::thrift::concurrency::test;
+}}}} // facebook::thrift::concurrency::test
diff --git a/lib/cpp/src/concurrency/test/ThreadManagerTests.h b/lib/cpp/src/concurrency/test/ThreadManagerTests.h
index 8b2dda8..7e74aac 100644
--- a/lib/cpp/src/concurrency/test/ThreadManagerTests.h
+++ b/lib/cpp/src/concurrency/test/ThreadManagerTests.h
@@ -1,8 +1,8 @@
#include <config.h>
-#include <ThreadManager.h>
-#include <PosixThreadFactory.h>
-#include <Monitor.h>
-#include <Util.h>
+#include <concurrency/ThreadManager.h>
+#include <concurrency/PosixThreadFactory.h>
+#include <concurrency/Monitor.h>
+#include <concurrency/Util.h>
#include <assert.h>
#include <set>
@@ -23,6 +23,8 @@
public:
+ static const double ERROR;
+
class Task: public Runnable {
public:
@@ -78,9 +80,9 @@
size_t activeCount = count;
- ThreadManager* threadManager = ThreadManager::newSimpleThreadManager(workerCount);
+ shared_ptr<ThreadManager> threadManager = ThreadManager::newSimpleThreadManager(workerCount);
- PosixThreadFactory* threadFactory = new PosixThreadFactory();
+ shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory>(new PosixThreadFactory());
threadFactory->priority(PosixThreadFactory::HIGHEST);
@@ -88,16 +90,16 @@
threadManager->start();
- std::set<ThreadManagerTests::Task*> tasks;
+ std::set<shared_ptr<ThreadManagerTests::Task> > tasks;
for(size_t ix = 0; ix < count; ix++) {
- tasks.insert(new ThreadManagerTests::Task(monitor, activeCount, timeout));
+ tasks.insert(shared_ptr<ThreadManagerTests::Task>(new ThreadManagerTests::Task(monitor, activeCount, timeout)));
}
long long time00 = Util::currentTime();
- for(std::set<ThreadManagerTests::Task*>::iterator ix = tasks.begin(); ix != tasks.end(); ix++) {
+ for(std::set<shared_ptr<ThreadManagerTests::Task> >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) {
threadManager->add(*ix);
}
@@ -119,9 +121,9 @@
long long minTime = 9223372036854775807LL;
long long maxTime = 0;
- for(std::set<ThreadManagerTests::Task*>::iterator ix = tasks.begin(); ix != tasks.end(); ix++) {
+ for(std::set<shared_ptr<ThreadManagerTests::Task> >::iterator ix = tasks.begin(); ix != tasks.end(); ix++) {
- ThreadManagerTests::Task* task = *ix;
+ shared_ptr<ThreadManagerTests::Task> task = *ix;
long long delta = task->_endTime - task->_startTime;
@@ -144,8 +146,6 @@
}
averageTime+= delta;
-
- delete *ix;
}
averageTime /= count;
@@ -160,18 +160,16 @@
error*= -1.0;
}
- bool success = error < .10;
-
- delete threadManager;
-
- delete threadFactory;
+ bool success = error < ERROR;
std::cout << "\t\t\t" << (success ? "Success" : "Failure") << "! expected time: " << expectedTime << "ms elapsed time: "<< time01 - time00 << "ms error%: " << error * 100.0 << std::endl;
return success;
}
};
-
+
+const double ThreadManagerTests::ERROR = .20;
+
}}}} // facebook::thrift::concurrency
using namespace facebook::thrift::concurrency::test;
diff --git a/lib/cpp/src/concurrency/test/TimerManagerTests.h b/lib/cpp/src/concurrency/test/TimerManagerTests.h
index 3c7fc0b..fe56d31 100644
--- a/lib/cpp/src/concurrency/test/TimerManagerTests.h
+++ b/lib/cpp/src/concurrency/test/TimerManagerTests.h
@@ -1,7 +1,7 @@
-#include <TimerManager.h>
-#include <PosixThreadFactory.h>
-#include <Monitor.h>
-#include <Util.h>
+#include <concurrency/TimerManager.h>
+#include <concurrency/PosixThreadFactory.h>
+#include <concurrency/Monitor.h>
+#include <concurrency/Util.h>
#include <assert.h>
#include <iostream>
@@ -17,17 +17,23 @@
class TimerManagerTests {
+ public:
+
+ static const double ERROR;
+
class Task: public Runnable {
public:
- Task(Monitor& monitor, long long timeout) :
- _timeout(timeout),
- _startTime(Util::currentTime()),
- _monitor(monitor),
- _success(false),
+ Task(Monitor& monitor, long long timeout) :
+ _timeout(timeout),
+ _startTime(Util::currentTime()),
+ _monitor(monitor),
+ _success(false),
_done(false) {}
+ ~Task() {std::cerr << this << std::endl;}
+
void run() {
_endTime = Util::currentTime();
@@ -41,19 +47,20 @@
float error = delta / _timeout;
- if(error < .10) {
+ if(error < ERROR) {
_success = true;
}
- std::cout << "\t\t\tHello World" << std::endl;
-
_done = true;
-
+
+ std::cout << "\t\t\tTimerManagerTests::Task[" << this << "] done" << std::endl; //debug
+
{Synchronized s(_monitor);
_monitor.notifyAll();
}
}
+
long long _timeout;
long long _startTime;
@@ -63,27 +70,25 @@
bool _done;
};
-public:
-
/** This test creates two tasks and waits for the first to expire within 10% of the expected expiration time. It then verifies that
the timer manager properly clean up itself and the remaining orphaned timeout task when the manager goes out of scope and its
destructor is called. */
bool test00(long long timeout=1000LL) {
- TimerManagerTests::Task* orphanTask = new TimerManagerTests::Task(_monitor, 10 * timeout);
+ shared_ptr<TimerManagerTests::Task> orphanTask = shared_ptr<TimerManagerTests::Task>(new TimerManagerTests::Task(_monitor, 10 * timeout));
{
TimerManager timerManager;
- timerManager.threadFactory(new PosixThreadFactory());
+ timerManager.threadFactory(shared_ptr<PosixThreadFactory>(new PosixThreadFactory()));
timerManager.start();
assert(timerManager.state() == TimerManager::STARTED);
- TimerManagerTests::Task* task = new TimerManagerTests::Task(_monitor, timeout);
+ shared_ptr<TimerManagerTests::Task> task = shared_ptr<TimerManagerTests::Task>(new TimerManagerTests::Task(_monitor, timeout));
{Synchronized s(_monitor);
@@ -98,16 +103,12 @@
std::cout << "\t\t\t" << (task->_success ? "Success" : "Failure") << "!" << std::endl;
-
- delete task;
}
// timerManager.stop(); This is where it happens via destructor
assert(!orphanTask->_done);
- delete orphanTask;
-
return true;
}
@@ -115,7 +116,8 @@
Monitor _monitor;
};
-
+const double TimerManagerTests::ERROR = .20;
+
}}}} // facebook::thrift::concurrency