cpp: non-blocking add for ThreadManager
It's rare for the ThreadManager mutex to be contended, but it is
possible. For nonblocking applications, it is necessary to have a
strict timeout for the lock acquisition. With this change, that timeout
is enforced. Also add Mutex::timedlock and a timeout parameter to
Guard::Guard to support this feature.
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@920679 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/concurrency/Mutex.cpp b/lib/cpp/src/concurrency/Mutex.cpp
index 5d33c11..67d7a2c 100644
--- a/lib/cpp/src/concurrency/Mutex.cpp
+++ b/lib/cpp/src/concurrency/Mutex.cpp
@@ -18,6 +18,7 @@
*/
#include "Mutex.h"
+#include "Util.h"
#include <assert.h>
#include <pthread.h>
@@ -50,6 +51,12 @@
bool trylock() const { return (0 == pthread_mutex_trylock(&pthread_mutex_)); }
+ bool timedlock(int64_t milliseconds) const { // milliseconds: relative timeout; returns true iff the lock was acquired
+ struct timespec ts;
+ Util::toTimespec(ts, milliseconds + Util::currentTime()); // pthread_mutex_timedlock wants an absolute deadline, not a duration (assumes currentTime() is ms since the epoch -- confirm)
+ return (0 == pthread_mutex_timedlock(&pthread_mutex_, &ts));
+ }
+
void unlock() const { pthread_mutex_unlock(&pthread_mutex_); }
void* getUnderlyingImpl() const { return (void*) &pthread_mutex_; }
@@ -67,6 +74,8 @@
bool Mutex::trylock() const { return impl_->trylock(); }
+bool Mutex::timedlock(int64_t ms) const { return impl_->timedlock(ms); }
+
void Mutex::unlock() const { impl_->unlock(); }
void Mutex::DEFAULT_INITIALIZER(void* arg) {
diff --git a/lib/cpp/src/concurrency/Mutex.h b/lib/cpp/src/concurrency/Mutex.h
index a33b995..94eb835 100644
--- a/lib/cpp/src/concurrency/Mutex.h
+++ b/lib/cpp/src/concurrency/Mutex.h
@@ -37,6 +37,7 @@
virtual ~Mutex() {}
virtual void lock() const;
virtual bool trylock() const;
+ virtual bool timedlock(int64_t milliseconds) const;
virtual void unlock() const;
void* getUnderlyingImpl() const;
@@ -75,18 +76,33 @@
class Guard {
public:
- Guard(const Mutex& value) : mutex_(value) {
- mutex_.lock();
+ Guard(const Mutex& value, int64_t timeout = 0) : mutex_(&value) {
+ if (timeout == 0) {
+ value.lock();
+ } else if (timeout < 0) {
+ if (!value.trylock()) {
+ mutex_ = NULL;
+ }
+ } else {
+ if (!value.timedlock(timeout)) {
+ mutex_ = NULL;
+ }
+ }
}
~Guard() {
- mutex_.unlock();
+ if (mutex_) {
+ mutex_->unlock();
+ }
+ }
+
+ operator bool() const {
+ return (mutex_ != NULL);
}
private:
- const Mutex& mutex_;
+ const Mutex* mutex_;
};
-
// Can be used as second argument to RWGuard to make code more readable
// as to whether we're doing acquireRead() or acquireWrite().
enum RWGuardType {
diff --git a/lib/cpp/src/concurrency/ThreadManager.cpp b/lib/cpp/src/concurrency/ThreadManager.cpp
index a02ad74..d0bb41f 100644
--- a/lib/cpp/src/concurrency/ThreadManager.cpp
+++ b/lib/cpp/src/concurrency/ThreadManager.cpp
@@ -461,7 +461,11 @@
void ThreadManager::Impl::add(shared_ptr<Runnable> value,
int64_t timeout,
int64_t expiration) {
- Guard g(mutex_);
+ Guard g(mutex_, timeout);
+
+ if (!g) {
+ throw TimedOutException();
+ }
if (state_ != ThreadManager::STARTED) {
throw IllegalStateException();