Modified PosixThreadFactory::PThread:
Pay attention to detached flags. If thread is create non-detached and has not been joined when all references are given up,
(ie boost::share_ptr calls ~PThread) do the join in the destructor to prevent thread ids from being leaked.
Modified ThreadFactoryTests.reapNThreads:
Loop M times for M threads where M x N is bigger than 32K to verify that thread ids aren't leaked
Modified TimerManager.cpp:
Removed debug messages.
Reviewed By: mcslee
Revert Plan: revertible
Test Plan: concurrency_test thread-factory passes
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@665129 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/concurrency/PosixThreadFactory.cpp b/lib/cpp/src/concurrency/PosixThreadFactory.cpp
index 5f86ea2..2e8ed47 100644
--- a/lib/cpp/src/concurrency/PosixThreadFactory.cpp
+++ b/lib/cpp/src/concurrency/PosixThreadFactory.cpp
@@ -46,35 +46,49 @@
int priority_;
int stackSize_;
weak_ptr<PthreadThread> self_;
+ bool detached_;
public:
- PthreadThread(int policy, int priority, int stackSize, shared_ptr<Runnable> runnable) :
+ PthreadThread(int policy, int priority, int stackSize, bool detached, shared_ptr<Runnable> runnable) :
pthread_(0),
state_(uninitialized),
policy_(policy),
priority_(priority),
- stackSize_(stackSize) {
+ stackSize_(stackSize),
+ detached_(detached) {
this->Thread::runnable(runnable);
}
- ~PthreadThread() {}
+ ~PthreadThread() {
+ /* Nothing references this thread, if is is not detached, do a join
+ now, otherwise the thread-id and, possibly, other resources will
+ be leaked. */
+ if(!detached_) {
+ try {
+ join();
+ } catch(...) {
+ // We're really hosed.
+ }
+ }
+ }
void start() {
if (state_ != uninitialized) {
return;
}
- state_ = starting;
-
pthread_attr_t thread_attr;
if (pthread_attr_init(&thread_attr) != 0) {
throw SystemResourceException("pthread_attr_init failed");
}
- if (pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_JOINABLE) != 0) {
- throw SystemResourceException("pthread_attr_setdetachstate failed");
+ if(pthread_attr_setdetachstate(&thread_attr,
+ detached_ ?
+ PTHREAD_CREATE_DETACHED :
+ PTHREAD_CREATE_JOINABLE) != 0) {
+ throw SystemResourceException("pthread_attr_setdetachstate failed");
}
// Set thread stack size
@@ -99,20 +113,23 @@
shared_ptr<PthreadThread>* selfRef = new shared_ptr<PthreadThread>();
*selfRef = self_.lock();
+ state_ = starting;
+
if (pthread_create(&pthread_, &thread_attr, threadMain, (void*)selfRef) != 0) {
throw SystemResourceException("pthread_create failed");
}
}
void join() {
- if (state_ != stopped) {
+ if (!detached_ && state_ != uninitialized) {
void* ignore;
pthread_join(pthread_, &ignore);
+ detached_ = true;
}
}
id_t id() {
- return pthread_;
+ return static_cast<id_t>(pthread_);
}
shared_ptr<Runnable> runnable() const { return Thread::runnable(); }
@@ -211,7 +228,7 @@
* @param runnable A runnable object
*/
shared_ptr<Thread> newThread(shared_ptr<Runnable> runnable) const {
- shared_ptr<PthreadThread> result = shared_ptr<PthreadThread>(new PthreadThread(toPthreadPolicy(policy_), toPthreadPriority(policy_, priority_), stackSize_, runnable));
+ shared_ptr<PthreadThread> result = shared_ptr<PthreadThread>(new PthreadThread(toPthreadPolicy(policy_), toPthreadPriority(policy_, priority_), stackSize_, detached_, runnable));
result->weakRef(result);
runnable->thread(result);
return result;
@@ -223,7 +240,7 @@
PRIORITY priority() const { return priority_; }
- Thread::id_t currentThreadId() const { return pthread_self(); }
+ Thread::id_t currentThreadId() const {return static_cast<id_t>(pthread_self());}
/**
* Sets priority.
diff --git a/lib/cpp/src/concurrency/PosixThreadFactory.h b/lib/cpp/src/concurrency/PosixThreadFactory.h
index 5f032c1..16be14d 100644
--- a/lib/cpp/src/concurrency/PosixThreadFactory.h
+++ b/lib/cpp/src/concurrency/PosixThreadFactory.h
@@ -52,6 +52,25 @@
DECREMENT = 8
};
+ /**
+ * Posix thread (pthread) factory. All threads created by a factory are reference-counted
+ * via boost::shared_ptr and boost::weak_ptr. The factory guarantees that threads and
+ * the Runnable tasks they host will be properly cleaned up once the last strong reference
+ * to both is given up.
+ *
+ * Threads are created with the specified policy, priority, stack-size and detachable-mode
+ * detached means the thread is free-running and will release all system resources the
+ * when it completes. A detachable thread is not joinable. The join method
+ * of a detachable thread will return immediately with no error.
+ *
+ * Joinable threads will detach themselves iff they were not explicitly joined and
+ * there are no remaining strong references to the thread. This guarantees that
+ * joinnable threads don't leak resources even when the application neglects to
+ * call join explicitly.
+ *
+ * By default threads are joinable.
+ */
+
PosixThreadFactory(POLICY policy=ROUND_ROBIN, PRIORITY priority=NORMAL, int stackSize=1, bool detached=false);
// From ThreadFactory;
diff --git a/lib/cpp/src/concurrency/TimerManager.cpp b/lib/cpp/src/concurrency/TimerManager.cpp
index 2134e37..b9e604c 100644
--- a/lib/cpp/src/concurrency/TimerManager.cpp
+++ b/lib/cpp/src/concurrency/TimerManager.cpp
@@ -40,8 +40,6 @@
state_(WAITING) {}
~Task() {
- //debug
- std::cerr << "TimerManager::Task.dtor[" << this << "]" << std::endl;
}
void run() {
@@ -64,10 +62,7 @@
Dispatcher(TimerManager* manager) :
manager_(manager) {}
- ~Dispatcher() {
- // debug
- std::cerr << "Dispatcher::dtor[" << this << "]" << std::endl;
- }
+ ~Dispatcher() {}
/**
* Dispatcher entry point
@@ -148,13 +143,11 @@
// If we haven't been explicitly stopped, do so now. We don't need to grab
// the monitor here, since stop already takes care of reentrancy.
- std::cerr << "TimerManager::dtor[" << this << "]" << std::endl;
if (state_ != STOPPED) {
try {
stop();
} catch(...) {
- std::cerr << "TimerManager::dtor[" << this << "] uhoh " << std::endl;
throw;
// uhoh
}
diff --git a/lib/cpp/src/concurrency/test/ThreadFactoryTests.h b/lib/cpp/src/concurrency/test/ThreadFactoryTests.h
index f7d607a..4cd6bd5 100644
--- a/lib/cpp/src/concurrency/test/ThreadFactoryTests.h
+++ b/lib/cpp/src/concurrency/test/ThreadFactoryTests.h
@@ -47,7 +47,7 @@
*/
bool helloWorldTest() {
- PosixThreadFactory threadFactory = PosixThreadFactory();
+ PosixThreadFactory threadFactory = PosixThreadFactory();
shared_ptr<Task> task = shared_ptr<Task>(new ThreadFactoryTests::Task());
@@ -90,35 +90,52 @@
int& _count;
};
- bool reapNThreads(int count=10) {
-
- Monitor* monitor = new Monitor();
-
- int* activeCount = new int(count);
+ bool reapNThreads(int loop=1, int count=10) {
PosixThreadFactory threadFactory = PosixThreadFactory();
- std::set<shared_ptr<Thread> > threads;
+ Monitor* monitor = new Monitor();
- for (int ix = 0; ix < count; ix++) {
- threads.insert(threadFactory.newThread(shared_ptr<Runnable>(new ReapNTask(*monitor, *activeCount))));
- }
+ for(int lix = 0; lix < loop; lix++) {
- for (std::set<shared_ptr<Thread> >::const_iterator thread = threads.begin(); thread != threads.end(); thread++) {
+ int* activeCount = new int(count);
- (*thread)->start();
- }
+ std::set<shared_ptr<Thread> > threads;
+ int tix;
- {
- Synchronized s(*monitor);
- while (*activeCount > 0) {
- monitor->wait(1000);
+ for (tix = 0; tix < count; tix++) {
+ try {
+ threads.insert(threadFactory.newThread(shared_ptr<Runnable>(new ReapNTask(*monitor, *activeCount))));
+ } catch(SystemResourceException& e) {
+ std::cout << "\t\t\tfailed to create " << lix * count + tix << " thread " << e.what() << std::endl;
+ throw e;
+ }
}
- }
+
+ tix = 0;
+ for (std::set<shared_ptr<Thread> >::const_iterator thread = threads.begin(); thread != threads.end(); tix++, ++thread) {
- for (std::set<shared_ptr<Thread> >::const_iterator thread = threads.begin(); thread != threads.end(); thread++) {
- threads.erase(*thread);
+ try {
+ (*thread)->start();
+ } catch(SystemResourceException& e) {
+ std::cout << "\t\t\tfailed to start " << lix * count + tix << " thread " << e.what() << std::endl;
+ throw e;
+ }
+ }
+
+ {
+ Synchronized s(*monitor);
+ while (*activeCount > 0) {
+ monitor->wait(1000);
+ }
+ }
+
+ for (std::set<shared_ptr<Thread> >::const_iterator thread = threads.begin(); thread != threads.end(); thread++) {
+ threads.erase(*thread);
+ }
+
+ std::cout << "\t\t\treaped " << lix * count << " threads" << std::endl;
}
std::cout << "\t\t\tSuccess!" << std::endl;