Modified PosixThreadFactory
Added explicit detached getter and setter
Modified PosixThreadFactory::~PThread:
Check for join failing and don't transition to detached_ state if it does. Potential thread-handle leak for
threads created joinable who aren't referenced by any external thread. Solution for now has to be
"DONT DO THAT", the clever approach doesn't always work.
Added ThreadFactoryTests.floodNThreads:
Loop M times for N threads where M x N is bigger than 32K to verify that detached threads can be created
ad infinitum.
Reviewed By: mcslee
Revert Plan: revertible
Test Plan: concurrency_test thread-factory passes
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@665130 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/concurrency/PosixThreadFactory.cpp b/lib/cpp/src/concurrency/PosixThreadFactory.cpp
index 2e8ed47..73aba61 100644
--- a/lib/cpp/src/concurrency/PosixThreadFactory.cpp
+++ b/lib/cpp/src/concurrency/PosixThreadFactory.cpp
@@ -63,13 +63,13 @@
~PthreadThread() {
/* Nothing references this thread, if is is not detached, do a join
- now, otherwise the thread-id and, possibly, other resources will
+ now, otherwise the thread-id and, possibly, other resources will
be leaked. */
if(!detached_) {
try {
join();
} catch(...) {
- // We're really hosed.
+ // We're really hosed.
}
}
}
@@ -84,9 +84,9 @@
throw SystemResourceException("pthread_attr_init failed");
}
- if(pthread_attr_setdetachstate(&thread_attr,
- detached_ ?
- PTHREAD_CREATE_DETACHED :
+ if(pthread_attr_setdetachstate(&thread_attr,
+ detached_ ?
+ PTHREAD_CREATE_DETACHED :
PTHREAD_CREATE_JOINABLE) != 0) {
throw SystemResourceException("pthread_attr_setdetachstate failed");
}
@@ -123,12 +123,18 @@
void join() {
if (!detached_ && state_ != uninitialized) {
void* ignore;
- pthread_join(pthread_, &ignore);
- detached_ = true;
+ /* XXX
+ If join fails it is most likely due to the fact
+ that the last reference was the thread itself and cannot
+ join. This results in leaked threads and will eventually
+ cause the process to run out of thread resources.
+ We're beyond the point of throwing an exception. Not clear how
+ best to handle this. */
+ detached_ = pthread_join(pthread_, &ignore) == 0;
}
}
- id_t id() {
+ id_t getId() {
return static_cast<id_t>(pthread_);
}
@@ -234,13 +240,11 @@
return result;
}
- int stackSize() const { return stackSize_; }
+ int getStackSize() const { return stackSize_; }
- void stackSize(int value) { stackSize_ = value; }
+ void setStackSize(int value) { stackSize_ = value; }
- PRIORITY priority() const { return priority_; }
-
- Thread::id_t currentThreadId() const {return static_cast<id_t>(pthread_self());}
+ PRIORITY getPriority() const { return priority_; }
/**
* Sets priority.
@@ -248,7 +252,14 @@
* XXX
* Need to handle incremental priorities properly.
*/
- void priority(PRIORITY value) { priority_ = value; }
+ void setPriority(PRIORITY value) { priority_ = value; }
+
+ bool isDetached() const { return detached_; }
+
+ void setDetached(bool value) { detached_ = value; }
+
+ Thread::id_t getCurrentThreadId() const {return static_cast<id_t>(pthread_self());}
+
};
PosixThreadFactory::PosixThreadFactory(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
@@ -256,14 +267,18 @@
shared_ptr<Thread> PosixThreadFactory::newThread(shared_ptr<Runnable> runnable) const { return impl_->newThread(runnable); }
-int PosixThreadFactory::stackSize() const { return impl_->stackSize(); }
+int PosixThreadFactory::getStackSize() const { return impl_->getStackSize(); }
-void PosixThreadFactory::stackSize(int value) { impl_->stackSize(value); }
+void PosixThreadFactory::setStackSize(int value) { impl_->setStackSize(value); }
-PosixThreadFactory::PRIORITY PosixThreadFactory::priority() const { return impl_->priority(); }
+PosixThreadFactory::PRIORITY PosixThreadFactory::getPriority() const { return impl_->getPriority(); }
-void PosixThreadFactory::priority(PosixThreadFactory::PRIORITY value) { impl_->priority(value); }
+void PosixThreadFactory::setPriority(PosixThreadFactory::PRIORITY value) { impl_->setPriority(value); }
-Thread::id_t PosixThreadFactory::currentThreadId() const { return impl_->currentThreadId(); }
+bool PosixThreadFactory::isDetached() const { return impl_->isDetached(); }
+
+void PosixThreadFactory::setDetached(bool value) { impl_->setDetached(value); }
+
+Thread::id_t PosixThreadFactory::getCurrentThreadId() const { return impl_->getCurrentThreadId(); }
}}} // facebook::thrift::concurrency
diff --git a/lib/cpp/src/concurrency/PosixThreadFactory.h b/lib/cpp/src/concurrency/PosixThreadFactory.h
index 16be14d..894d5f5 100644
--- a/lib/cpp/src/concurrency/PosixThreadFactory.h
+++ b/lib/cpp/src/concurrency/PosixThreadFactory.h
@@ -11,10 +11,10 @@
#include <boost/shared_ptr.hpp>
-namespace facebook { namespace thrift { namespace concurrency {
+namespace facebook { namespace thrift { namespace concurrency {
/**
- * A thread factory to create posix threads
+ * A thread factory to create posix threads
*
* @author marc
* @version $Id:$
@@ -52,20 +52,20 @@
DECREMENT = 8
};
- /**
- * Posix thread (pthread) factory. All threads created by a factory are reference-counted
- * via boost::shared_ptr and boost::weak_ptr. The factory guarantees that threads and
+ /**
+ * Posix thread (pthread) factory. All threads created by a factory are reference-counted
+ * via boost::shared_ptr and boost::weak_ptr. The factory guarantees that threads and
* the Runnable tasks they host will be properly cleaned up once the last strong reference
* to both is given up.
*
* Threads are created with the specified policy, priority, stack-size and detachable-mode
- * detached means the thread is free-running and will release all system resources the
- * when it completes. A detachable thread is not joinable. The join method
- * of a detachable thread will return immediately with no error.
+ * detached means the thread is free-running and will release all system resources the
+ * when it completes. A detachable thread is not joinable. The join method
+ * of a detachable thread will return immediately with no error.
*
* Joinable threads will detach themselves iff they were not explicitly joined and
* there are no remaining strong references to the thread. This guarantees that
- * joinnable threads don't leak resources even when the application neglects to
+ * joinnable threads don't leak resources even when the application neglects to
* call join explicitly.
*
* By default threads are joinable.
@@ -77,32 +77,42 @@
boost::shared_ptr<Thread> newThread(boost::shared_ptr<Runnable> runnable) const;
// From ThreadFactory;
- Thread::id_t currentThreadId() const;
-
- /**
- * Sets stack size for created threads
- *
- * @param value size in megabytes
- */
- virtual void stackSize(int value);
+ Thread::id_t getCurrentThreadId() const;
/**
* Gets stack size for created threads
*
* @return int size in megabytes
*/
- virtual int stackSize() const;
+ virtual int getStackSize() const;
/**
- * Sets priority relative to current policy
+ * Sets stack size for created threads
+ *
+ * @param value size in megabytes
*/
- virtual void priority(PRIORITY priority);
+ virtual void setStackSize(int value);
/**
* Gets priority relative to current policy
*/
- virtual PRIORITY priority() const;
-
+ virtual PRIORITY getPriority() const;
+
+ /**
+ * Sets priority relative to current policy
+ */
+ virtual void setPriority(PRIORITY priority);
+
+ /**
+ * Sets detached mode of threads
+ */
+ virtual void setDetached(bool detached);
+
+ /**
+ * Gets current detached mode
+ */
+ virtual bool isDetached() const;
+
private:
class Impl;
boost::shared_ptr<Impl> impl_;
diff --git a/lib/cpp/src/concurrency/Thread.h b/lib/cpp/src/concurrency/Thread.h
index 3e5964f..39ee816 100644
--- a/lib/cpp/src/concurrency/Thread.h
+++ b/lib/cpp/src/concurrency/Thread.h
@@ -75,7 +75,7 @@
/**
* Gets the thread's platform-specific ID
*/
- virtual id_t id() = 0;
+ virtual id_t getId() = 0;
/**
* Gets the runnable object this thread is hosting
@@ -104,7 +104,7 @@
static const Thread::id_t unknown_thread_id;
- virtual Thread::id_t currentThreadId() const = 0;
+ virtual Thread::id_t getCurrentThreadId() const = 0;
};
}}} // facebook::thrift::concurrency
diff --git a/lib/cpp/src/concurrency/ThreadManager.cpp b/lib/cpp/src/concurrency/ThreadManager.cpp
index 56de4d0..3d87724 100644
--- a/lib/cpp/src/concurrency/ThreadManager.cpp
+++ b/lib/cpp/src/concurrency/ThreadManager.cpp
@@ -308,7 +308,7 @@
shared_ptr<ThreadManager::Worker> worker = dynamic_pointer_cast<ThreadManager::Worker, Runnable>((*ix)->runnable());
worker->state_ = ThreadManager::Worker::STARTING;
(*ix)->start();
- idMap_.insert(std::pair<const Thread::id_t, shared_ptr<Thread> >((*ix)->id(), *ix));
+ idMap_.insert(std::pair<const Thread::id_t, shared_ptr<Thread> >((*ix)->getId(), *ix));
}
{
@@ -400,7 +400,7 @@
for (std::set<shared_ptr<Thread> >::iterator ix = deadWorkers_.begin(); ix != deadWorkers_.end(); ix++) {
workers_.erase(*ix);
- idMap_.erase((*ix)->id());
+ idMap_.erase((*ix)->getId());
}
deadWorkers_.clear();
@@ -408,7 +408,7 @@
}
bool ThreadManager::Impl::canSleep() {
- const Thread::id_t id = threadFactory_->currentThreadId();
+ const Thread::id_t id = threadFactory_->getCurrentThreadId();
return idMap_.find(id) == idMap_.end();
}
diff --git a/lib/cpp/src/concurrency/test/Tests.cpp b/lib/cpp/src/concurrency/test/Tests.cpp
index 5fe4651..a160472 100644
--- a/lib/cpp/src/concurrency/test/Tests.cpp
+++ b/lib/cpp/src/concurrency/test/Tests.cpp
@@ -33,11 +33,17 @@
std::cout << "ThreadFactory tests..." << std::endl;
size_t count = 1000;
+ size_t floodLoops = 1;
+ size_t floodCount = 100000;
std::cout << "\t\tThreadFactory reap N threads test: N = " << count << std::endl;
assert(threadFactoryTests.reapNThreads(count));
+ std::cout << "\t\tThreadFactory floodN threads test: N = " << floodCount << std::endl;
+
+ assert(threadFactoryTests.floodNTest(floodLoops, floodCount));
+
std::cout << "\t\tThreadFactory synchronous start test" << std::endl;
assert(threadFactoryTests.synchStartTest());
@@ -134,4 +140,3 @@
}
}
}
-
diff --git a/lib/cpp/src/concurrency/test/ThreadFactoryTests.h b/lib/cpp/src/concurrency/test/ThreadFactoryTests.h
index 4cd6bd5..99bc94e 100644
--- a/lib/cpp/src/concurrency/test/ThreadFactoryTests.h
+++ b/lib/cpp/src/concurrency/test/ThreadFactoryTests.h
@@ -112,7 +112,7 @@
throw e;
}
}
-
+
tix = 0;
for (std::set<shared_ptr<Thread> >::const_iterator thread = threads.begin(); thread != threads.end(); tix++, ++thread) {
@@ -123,14 +123,14 @@
throw e;
}
}
-
+
{
Synchronized s(*monitor);
while (*activeCount > 0) {
monitor->wait(1000);
}
}
-
+
for (std::set<shared_ptr<Thread> >::const_iterator thread = threads.begin(); thread != threads.end(); thread++) {
threads.erase(*thread);
}
@@ -276,6 +276,67 @@
return success;
}
+
+
+ class FloodTask : public Runnable {
+ public:
+
+ FloodTask(const size_t id) :_id(id) {}
+ ~FloodTask(){
+ if(_id % 1000 == 0) {
+ std::cout << "\t\tthread " << _id << " done" << std::endl;
+ }
+ }
+
+ void run(){
+ if(_id % 1000 == 0) {
+ std::cout << "\t\tthread " << _id << " started" << std::endl;
+ }
+
+ usleep(1);
+ }
+ const size_t _id;
+ };
+
+ void foo(PosixThreadFactory *tf) {
+ }
+
+ bool floodNTest(size_t loop=1, size_t count=100000) {
+
+ bool success = false;
+
+ for(size_t lix = 0; lix < loop; lix++) {
+
+ PosixThreadFactory threadFactory = PosixThreadFactory();
+ threadFactory.setDetached(true);
+
+ for(size_t tix = 0; tix < count; tix++) {
+
+ try {
+
+ shared_ptr<FloodTask> task(new FloodTask(lix * count + tix ));
+
+ shared_ptr<Thread> thread = threadFactory.newThread(task);
+
+ thread->start();
+
+ usleep(1);
+
+ } catch (TException& e) {
+
+ std::cout << "\t\t\tfailed to start " << lix * count + tix << " thread " << e.what() << std::endl;
+
+ return success;
+ }
+ }
+
+ std::cout << "\t\t\tflooded " << (lix + 1) * count << " threads" << std::endl;
+
+ success = true;
+ }
+
+ return success;
+ }
};
const double ThreadFactoryTests::ERROR = .20;
diff --git a/lib/cpp/src/concurrency/test/ThreadManagerTests.h b/lib/cpp/src/concurrency/test/ThreadManagerTests.h
index 5f518ca..a8fdcda 100644
--- a/lib/cpp/src/concurrency/test/ThreadManagerTests.h
+++ b/lib/cpp/src/concurrency/test/ThreadManagerTests.h
@@ -100,7 +100,7 @@
shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory>(new PosixThreadFactory());
- threadFactory->priority(PosixThreadFactory::HIGHEST);
+ threadFactory->setPriority(PosixThreadFactory::HIGHEST);
threadManager->threadFactory(threadFactory);
@@ -239,7 +239,7 @@
shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory>(new PosixThreadFactory());
- threadFactory->priority(PosixThreadFactory::HIGHEST);
+ threadFactory->setPriority(PosixThreadFactory::HIGHEST);
threadManager->threadFactory(threadFactory);