THRIFT-3978: tighten up pthread mutex implementation, removing asserts and replacing them with exceptions
Client: cpp
This closes #1228
diff --git a/lib/cpp/src/thrift/concurrency/BoostMutex.cpp b/lib/cpp/src/thrift/concurrency/BoostMutex.cpp
index f7cadab..4e556df 100644
--- a/lib/cpp/src/thrift/concurrency/BoostMutex.cpp
+++ b/lib/cpp/src/thrift/concurrency/BoostMutex.cpp
@@ -33,7 +33,9 @@
namespace concurrency {
/**
- * Implementation of Mutex class using boost interprocess mutex
+ * Implementation of Mutex class using boost::timed_mutex
+ *
+ * Methods throw boost::lock_error on error.
*
* @version $Id:$
*/
diff --git a/lib/cpp/src/thrift/concurrency/Mutex.cpp b/lib/cpp/src/thrift/concurrency/Mutex.cpp
index b6b915d..bcab05e 100644
--- a/lib/cpp/src/thrift/concurrency/Mutex.cpp
+++ b/lib/cpp/src/thrift/concurrency/Mutex.cpp
@@ -17,18 +17,24 @@
* under the License.
*/
+// must precede <pthread.h> so the PTHREAD_*_MUTEX_INITIALIZER_NP macros tested below (pthread implementation capabilities) are visible:
+#define __USE_GNU
+
#include <thrift/thrift-config.h>
#include <thrift/Thrift.h>
+#include <thrift/concurrency/Exception.h>
#include <thrift/concurrency/Mutex.h>
#include <thrift/concurrency/Util.h>
#include <assert.h>
-#ifdef HAVE_PTHREAD_H
+#include <stdlib.h>
#include <pthread.h>
-#endif
#include <signal.h>
+#include <string.h>
+#include <boost/format.hpp>
+#include <boost/shared_ptr.hpp>
using boost::shared_ptr;
namespace apache {
@@ -110,9 +116,17 @@
#define PROFILE_MUTEX_UNLOCKED()
#endif // THRIFT_PTHREAD_MUTEX_CONTENTION_PROFILING
+#define EINTR_LOOP(_CALL) int ret; do { ret = _CALL; } while (ret == EINTR)
+#define ABORT_ONFAIL(_CALL) { EINTR_LOOP(_CALL); if (ret) { abort(); } }
+#define THROW_SRE(_CALLSTR, RET) { throw SystemResourceException(boost::str(boost::format("%1% returned %2% (%3%)") % _CALLSTR % RET % ::strerror(RET))); }
+#define THROW_SRE_ONFAIL(_CALL) { EINTR_LOOP(_CALL); if (ret) { THROW_SRE(#_CALL, ret); } }
+#define THROW_SRE_TRYFAIL(_CALL) { EINTR_LOOP(_CALL); if (ret == 0) { return true; } else if (ret == EBUSY) { return false; } THROW_SRE(#_CALL, ret); }
+
/**
* Implementation of Mutex class using POSIX mutex
*
+ * Methods throw apache::thrift::concurrency::SystemResourceException on error; the destructor calls abort() instead of throwing.
+ *
* @version $Id:$
*/
class Mutex::impl {
@@ -128,19 +142,19 @@
~impl() {
if (initialized_) {
initialized_ = false;
- int ret = pthread_mutex_destroy(&pthread_mutex_);
- THRIFT_UNUSED_VARIABLE(ret);
- assert(ret == 0);
+ ABORT_ONFAIL(pthread_mutex_destroy(&pthread_mutex_));
}
}
void lock() const {
PROFILE_MUTEX_START_LOCK();
- pthread_mutex_lock(&pthread_mutex_);
+ THROW_SRE_ONFAIL(pthread_mutex_lock(&pthread_mutex_));
PROFILE_MUTEX_LOCKED();
}
- bool trylock() const { return (0 == pthread_mutex_trylock(&pthread_mutex_)); }
+ bool trylock() const {
+ THROW_SRE_TRYFAIL(pthread_mutex_trylock(&pthread_mutex_));
+ }
bool timedlock(int64_t milliseconds) const {
#if defined(_POSIX_TIMEOUTS) && _POSIX_TIMEOUTS >= 200112L
@@ -148,14 +162,16 @@
struct THRIFT_TIMESPEC ts;
Util::toTimespec(ts, milliseconds + Util::currentTime());
- int ret = pthread_mutex_timedlock(&pthread_mutex_, &ts);
+ EINTR_LOOP(pthread_mutex_timedlock(&pthread_mutex_, &ts));
if (ret == 0) {
PROFILE_MUTEX_LOCKED();
return true;
+ } else if (ret == ETIMEDOUT) {
+ PROFILE_MUTEX_NOT_LOCKED();
+ return false;
}
- PROFILE_MUTEX_NOT_LOCKED();
- return false;
+ THROW_SRE("pthread_mutex_timedlock(&pthread_mutex_, &ts)", ret);
#else
/* Otherwise follow solution used by Mono for Android */
struct THRIFT_TIMESPEC sleepytime, now, to;
@@ -180,7 +196,7 @@
void unlock() const {
PROFILE_MUTEX_START_UNLOCK();
- pthread_mutex_unlock(&pthread_mutex_);
+ THROW_SRE_ONFAIL(pthread_mutex_unlock(&pthread_mutex_));
PROFILE_MUTEX_UNLOCKED();
}
@@ -219,28 +235,16 @@
void Mutex::DEFAULT_INITIALIZER(void* arg) {
pthread_mutex_t* pthread_mutex = (pthread_mutex_t*)arg;
- int ret = pthread_mutex_init(pthread_mutex, NULL);
- THRIFT_UNUSED_VARIABLE(ret);
- assert(ret == 0);
+ THROW_SRE_ONFAIL(pthread_mutex_init(pthread_mutex, NULL));
}
-#if defined(PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP) \
- || defined(PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP)
+#if defined(PTHREAD_ADAPTIVE_MUTEX_INITIALIZER_NP) || defined(PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP) || defined(PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP)
static void init_with_kind(pthread_mutex_t* mutex, int kind) {
pthread_mutexattr_t mutexattr;
- int ret = pthread_mutexattr_init(&mutexattr);
- assert(ret == 0);
-
- // Apparently, this can fail. Should we really be aborting?
- ret = pthread_mutexattr_settype(&mutexattr, kind);
- assert(ret == 0);
-
- ret = pthread_mutex_init(mutex, &mutexattr);
- assert(ret == 0);
-
- ret = pthread_mutexattr_destroy(&mutexattr);
- assert(ret == 0);
- THRIFT_UNUSED_VARIABLE(ret);
+ THROW_SRE_ONFAIL(pthread_mutexattr_init(&mutexattr));
+ THROW_SRE_ONFAIL(pthread_mutexattr_settype(&mutexattr, kind));
+ THROW_SRE_ONFAIL(pthread_mutex_init(mutex, &mutexattr));
+ THROW_SRE_ONFAIL(pthread_mutexattr_destroy(&mutexattr));
}
#endif
@@ -258,6 +262,12 @@
}
#endif
+#ifdef PTHREAD_ERRORCHECK_MUTEX_INITIALIZER_NP
+void Mutex::ERRORCHECK_INITIALIZER(void* arg) {
+ init_with_kind((pthread_mutex_t*)arg, PTHREAD_MUTEX_ERRORCHECK);
+}
+#endif
+
#ifdef PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP
void Mutex::RECURSIVE_INITIALIZER(void* arg) {
init_with_kind((pthread_mutex_t*)arg, PTHREAD_MUTEX_RECURSIVE_NP);
@@ -275,40 +285,36 @@
#ifdef THRIFT_PTHREAD_MUTEX_CONTENTION_PROFILING
profileTime_ = 0;
#endif
- int ret = pthread_rwlock_init(&rw_lock_, NULL);
- THRIFT_UNUSED_VARIABLE(ret);
- assert(ret == 0);
+ THROW_SRE_ONFAIL(pthread_rwlock_init(&rw_lock_, NULL));
initialized_ = true;
}
~impl() {
if (initialized_) {
initialized_ = false;
- int ret = pthread_rwlock_destroy(&rw_lock_);
- THRIFT_UNUSED_VARIABLE(ret);
- assert(ret == 0);
+ ABORT_ONFAIL(pthread_rwlock_destroy(&rw_lock_));
}
}
void acquireRead() const {
PROFILE_MUTEX_START_LOCK();
- pthread_rwlock_rdlock(&rw_lock_);
+ THROW_SRE_ONFAIL(pthread_rwlock_rdlock(&rw_lock_));
PROFILE_MUTEX_NOT_LOCKED(); // not exclusive, so use not-locked path
}
void acquireWrite() const {
PROFILE_MUTEX_START_LOCK();
- pthread_rwlock_wrlock(&rw_lock_);
+ THROW_SRE_ONFAIL(pthread_rwlock_wrlock(&rw_lock_));
PROFILE_MUTEX_LOCKED();
}
- bool attemptRead() const { return !pthread_rwlock_tryrdlock(&rw_lock_); }
+ bool attemptRead() const { THROW_SRE_TRYFAIL(pthread_rwlock_tryrdlock(&rw_lock_)); }
- bool attemptWrite() const { return !pthread_rwlock_trywrlock(&rw_lock_); }
+ bool attemptWrite() const { THROW_SRE_TRYFAIL(pthread_rwlock_trywrlock(&rw_lock_)); }
void release() const {
PROFILE_MUTEX_START_UNLOCK();
- pthread_rwlock_unlock(&rw_lock_);
+ THROW_SRE_ONFAIL(pthread_rwlock_unlock(&rw_lock_));
PROFILE_MUTEX_UNLOCKED();
}
diff --git a/lib/cpp/src/thrift/concurrency/Mutex.h b/lib/cpp/src/thrift/concurrency/Mutex.h
index 6f892dc..e1e395e 100644
--- a/lib/cpp/src/thrift/concurrency/Mutex.h
+++ b/lib/cpp/src/thrift/concurrency/Mutex.h
@@ -54,6 +54,11 @@
#endif
/**
+ * NOTE: All mutex implementations throw an exception on failure. See each
+ * specific implementation to understand the exception type(s) used.
+ */
+
+/**
* A simple mutex class
*
* @version $Id:$
@@ -64,6 +69,7 @@
Mutex(Initializer init = DEFAULT_INITIALIZER);
virtual ~Mutex() {}
+
virtual void lock() const;
virtual bool trylock() const;
virtual bool timedlock(int64_t milliseconds) const;
@@ -71,8 +77,11 @@
void* getUnderlyingImpl() const;
- static void DEFAULT_INITIALIZER(void*);
+ // If you reference one of these initializers and it fails to link, your
+ // pthreads implementation does not support that mutex kind - choose a different initializer.
static void ADAPTIVE_INITIALIZER(void*);
+ static void DEFAULT_INITIALIZER(void*);
+ static void ERRORCHECK_INITIALIZER(void*);
static void RECURSIVE_INITIALIZER(void*);
private:
diff --git a/lib/cpp/src/thrift/concurrency/StdMutex.cpp b/lib/cpp/src/thrift/concurrency/StdMutex.cpp
index 49c18d8..e0f79fa 100644
--- a/lib/cpp/src/thrift/concurrency/StdMutex.cpp
+++ b/lib/cpp/src/thrift/concurrency/StdMutex.cpp
@@ -33,6 +33,8 @@
/**
* Implementation of Mutex class using C++11 std::timed_mutex
*
+ * Methods throw std::system_error on error.
+ *
* @version $Id:$
*/
class Mutex::impl : public std::timed_mutex {};
diff --git a/lib/cpp/src/thrift/concurrency/ThreadManager.cpp b/lib/cpp/src/thrift/concurrency/ThreadManager.cpp
index c4726dd..88cd59a 100644
--- a/lib/cpp/src/thrift/concurrency/ThreadManager.cpp
+++ b/lib/cpp/src/thrift/concurrency/ThreadManager.cpp
@@ -30,10 +30,6 @@
#include <deque>
#include <set>
-#if defined(DEBUG)
-#include <iostream>
-#endif // defined(DEBUG)
-
namespace apache {
namespace thrift {
namespace concurrency {