Modified facebook::thrift::concurrency::Monitor.wait:
Throw TimedOutException on wait timeout so caller can distinguish between timeout and event.
Modified facebook::thrift::concurrency::PthreadThread.start:
Throw SystemrResourceException on any pthread_* function call failure rather than asserting 0.
Added facebook::thrift::concurrency::Thread.id() and facebook::thrift::concurrency::ThreadFactory.currentThreadId():
Return thread-id of thread and current thread respectively. Needed for reentrancy tests in ThreadManager
Added facebook::thrift::concurrency::ThreadManager.pendingTaskCountMaxN
Modified facebook::thrift::concurrency::ThreadManager.add():
Now support a maximum pending task count and block if the current pending task count is max.
If timeout is specified for add, TimedOutException is thrown if pending task count doesn't decrease
in the timeout interval. If add() is called by a ThreadManager worker thread and the task cannot
be added, a TooManyPendingTasksException is thrown rather than blocking, since deadlocks can ensue
if worker threads block waiting for works threads to complete tasks.
Reviewed By: mcslee, aditya
Revert Plan: revertible
Test Plan: concurrency/test/ThreadManagerTests.h
run concurrency-test thread-manager
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@665120 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/concurrency/PosixThreadFactory.cpp b/lib/cpp/src/concurrency/PosixThreadFactory.cpp
index 675e95d..e433467 100644
--- a/lib/cpp/src/concurrency/PosixThreadFactory.cpp
+++ b/lib/cpp/src/concurrency/PosixThreadFactory.cpp
@@ -5,6 +5,7 @@
// http://developers.facebook.com/thrift/
#include "PosixThreadFactory.h"
+#include "Exception.h"
#include <assert.h>
#include <pthread.h>
@@ -18,7 +19,7 @@
using namespace boost;
/**
- * The POSIX thread class.
+ * The POSIX thread class.
*
* @author marc
* @version $Id:$
@@ -47,10 +48,10 @@
weak_ptr<PthreadThread> self_;
public:
-
- PthreadThread(int policy, int priority, int stackSize, shared_ptr<Runnable> runnable) :
+
+ PthreadThread(int policy, int priority, int stackSize, shared_ptr<Runnable> runnable) :
pthread_(0),
- state_(uninitialized),
+ state_(uninitialized),
policy_(policy),
priority_(priority),
stackSize_(stackSize) {
@@ -68,32 +69,39 @@
state_ = starting;
pthread_attr_t thread_attr;
- int ret = pthread_attr_init(&thread_attr);
- assert(ret == 0);
+ if(pthread_attr_init(&thread_attr) != 0) {
+ throw SystemResourceException("pthread_attr_init failed");
+ }
- ret = pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_JOINABLE);
- assert(ret == 0);
+ if(pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_JOINABLE) != 0) {
+ throw SystemResourceException("pthread_attr_setdetachstate failed");
+ }
// Set thread stack size
- ret = pthread_attr_setstacksize(&thread_attr, MB * stackSize_);
- assert(ret == 0);
+ if(pthread_attr_setstacksize(&thread_attr, MB * stackSize_) != 0) {
+ throw SystemResourceException("pthread_attr_setstacksize failed");
+ }
// Set thread policy
- ret = pthread_attr_setschedpolicy(&thread_attr, policy_);
- assert(ret == 0);
+ if(pthread_attr_setschedpolicy(&thread_attr, policy_) != 0) {
+ throw SystemResourceException("pthread_attr_setschedpolicy failed");
+ }
struct sched_param sched_param;
sched_param.sched_priority = priority_;
// Set thread priority
- ret = pthread_attr_setschedparam(&thread_attr, &sched_param);
- assert(ret == 0);
+ if(pthread_attr_setschedparam(&thread_attr, &sched_param) != 0) {
+ throw SystemResourceException("pthread_attr_setschedparam failed");
+ }
// Create reference
shared_ptr<PthreadThread>* selfRef = new shared_ptr<PthreadThread>();
*selfRef = self_.lock();
- ret = pthread_create(&pthread_, &thread_attr, threadMain, (void*)selfRef);
- assert(ret == 0);
+
+ if(pthread_create(&pthread_, &thread_attr, threadMain, (void*)selfRef) != 0) {
+ throw SystemResourceException("pthread_create failed");
+ }
}
void join() {
@@ -103,6 +111,10 @@
}
}
+ id_t id() {
+ return pthread_;
+ }
+
shared_ptr<Runnable> runnable() const { return Thread::runnable(); }
void runnable(shared_ptr<Runnable> value) { Thread::runnable(value); }
@@ -130,7 +142,7 @@
if (thread->state_ != stopping && thread->state_ != stopped) {
thread->state_ = stopping;
}
-
+
return (void*)0;
}
@@ -187,14 +199,14 @@
public:
- Impl(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
+ Impl(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
policy_(policy),
priority_(priority),
stackSize_(stackSize),
detached_(detached) {}
/**
- * Creates a new POSIX thread to run the runnable object
+ * Creates a new POSIX thread to run the runnable object
*
* @param runnable A runnable object
*/
@@ -211,6 +223,8 @@
PRIORITY priority() const { return priority_; }
+ Thread::id_t currentThreadId() const {return pthread_self();}
+
/**
* Sets priority.
*
@@ -220,7 +234,7 @@
void priority(PRIORITY value) { priority_ = value; }
};
-PosixThreadFactory::PosixThreadFactory(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
+PosixThreadFactory::PosixThreadFactory(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
impl_(new PosixThreadFactory::Impl(policy, priority, stackSize, detached)) {}
shared_ptr<Thread> PosixThreadFactory::newThread(shared_ptr<Runnable> runnable) const { return impl_->newThread(runnable); }
@@ -233,4 +247,6 @@
void PosixThreadFactory::priority(PosixThreadFactory::PRIORITY value) { impl_->priority(value); }
+Thread::id_t PosixThreadFactory::currentThreadId() const {return impl_->currentThreadId();}
+
}}} // facebook::thrift::concurrency