cpp: Add profiling hooks to Mutex, ReadWriteMutex() classes
Extend the Thrift C++ Concurrency library by allowing a user to register
a callback and a sample rate for lock primitive contention profiling.
The callback will be invoked approximately once every sampleRate calls
to Mutex::lock(), Mutex::timedlock(), ReadWriteLock::acquireRead(), or
ReadWriteLock::acquireWrite().
The callback receives a pointer to the mutex responsible and the time
waited on the lock in micros (whether the lock was successfuly acquire
or not). The user can then implement a registry of his choice to
log/collect this data as needed.
This can all be easily compiled out if it harms performance. By
default, there is no profiling callback, so the overhead is minimal
(one branch).
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@920681 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/concurrency/Mutex.cpp b/lib/cpp/src/concurrency/Mutex.cpp
index 67d7a2c..d72267d 100644
--- a/lib/cpp/src/concurrency/Mutex.cpp
+++ b/lib/cpp/src/concurrency/Mutex.cpp
@@ -22,11 +22,87 @@
#include <assert.h>
#include <pthread.h>
+#include <signal.h>
using boost::shared_ptr;
namespace apache { namespace thrift { namespace concurrency {
+#ifndef THRIFT_NO_CONTENTION_PROFILING
+
+static sig_atomic_t mutexProfilingSampleRate = 0;
+static MutexWaitCallback mutexProfilingCallback = 0;
+
+volatile static sig_atomic_t mutexProfilingCounter = 0;
+
+void enableMutexProfiling(int32_t profilingSampleRate,
+ MutexWaitCallback callback) {
+ mutexProfilingSampleRate = profilingSampleRate;
+ mutexProfilingCallback = callback;
+}
+
+#define PROFILE_MUTEX_START_LOCK() \
+ int64_t _lock_startTime = maybeGetProfilingStartTime();
+
+#define PROFILE_MUTEX_NOT_LOCKED() \
+ do { \
+ if (_lock_startTime > 0) { \
+ int64_t endTime = Util::currentTimeUsec(); \
+ (*mutexProfilingCallback)(this, endTime - _lock_startTime); \
+ } \
+ } while (0)
+
+#define PROFILE_MUTEX_LOCKED() \
+ do { \
+ profileTime_ = _lock_startTime; \
+ if (profileTime_ > 0) { \
+ profileTime_ = Util::currentTimeUsec() - profileTime_; \
+ } \
+ } while (0)
+
+#define PROFILE_MUTEX_START_UNLOCK() \
+ int64_t _temp_profileTime = profileTime_; \
+ profileTime_ = 0;
+
+#define PROFILE_MUTEX_UNLOCKED() \
+ do { \
+ if (_temp_profileTime > 0) { \
+ (*mutexProfilingCallback)(this, _temp_profileTime); \
+ } \
+ } while (0)
+
+static inline int64_t maybeGetProfilingStartTime() {
+ if (mutexProfilingSampleRate && mutexProfilingCallback) {
+ // This block is unsynchronized, but should produce a reasonable sampling
+ // rate on most architectures. The main race conditions are the gap
+ // between the decrement and the test, the non-atomicity of decrement, and
+ // potential caching of different values at different CPUs.
+ //
+ // - if two decrements race, the likeliest result is that the counter
+ // decrements slowly (perhaps much more slowly) than intended.
+ //
+ // - many threads could potentially decrement before resetting the counter
+ // to its large value, causing each additional incoming thread to
+ // profile every call. This situation is unlikely to persist for long
+ // as the critical gap is quite short, but profiling could be bursty.
+ sig_atomic_t localValue = --mutexProfilingCounter;
+ if (localValue <= 0) {
+ mutexProfilingCounter = mutexProfilingSampleRate;
+ return Util::currentTimeUsec();
+ }
+ }
+
+ return 0;
+}
+
+#else
+# define PROFILE_MUTEX_START_LOCK()
+# define PROFILE_MUTEX_NOT_LOCKED()
+# define PROFILE_MUTEX_LOCKED()
+# define PROFILE_MUTEX_START_UNLOCK()
+# define PROFILE_MUTEX_UNLOCKED()
+#endif // THRIFT_NO_CONTENTION_PROFILING
+
/**
* Implementation of Mutex class using POSIX mutex
*
@@ -35,6 +111,9 @@
class Mutex::impl {
public:
impl(Initializer init) : initialized_(false) {
+#ifndef THRIFT_NO_CONTENTION_PROFILING
+ profileTime_ = 0;
+#endif
init(&pthread_mutex_);
initialized_ = true;
}
@@ -47,23 +126,43 @@
}
}
- void lock() const { pthread_mutex_lock(&pthread_mutex_); }
+ void lock() const {
+ PROFILE_MUTEX_START_LOCK();
+ pthread_mutex_lock(&pthread_mutex_);
+ PROFILE_MUTEX_LOCKED();
+ }
bool trylock() const { return (0 == pthread_mutex_trylock(&pthread_mutex_)); }
bool timedlock(int64_t milliseconds) const {
+ PROFILE_MUTEX_START_LOCK();
+
struct timespec ts;
Util::toTimespec(ts, milliseconds);
- return (0 == pthread_mutex_timedlock(&pthread_mutex_, &ts));
+ int ret = pthread_mutex_timedlock(&pthread_mutex_, &ts);
+ if (ret == 0) {
+ PROFILE_MUTEX_LOCKED();
+ return true;
+ }
+
+ PROFILE_MUTEX_NOT_LOCKED();
+ return false;
}
- void unlock() const { pthread_mutex_unlock(&pthread_mutex_); }
+ void unlock() const {
+ PROFILE_MUTEX_START_UNLOCK();
+ pthread_mutex_unlock(&pthread_mutex_);
+ PROFILE_MUTEX_UNLOCKED();
+ }
void* getUnderlyingImpl() const { return (void*) &pthread_mutex_; }
private:
mutable pthread_mutex_t pthread_mutex_;
mutable bool initialized_;
+#ifndef THRIFT_NO_CONTENTION_PROFILING
+ mutable int64_t profileTime_;
+#endif
};
Mutex::Mutex(Initializer init) : impl_(new Mutex::impl(init)) {}
@@ -129,6 +228,9 @@
class ReadWriteMutex::impl {
public:
impl() : initialized_(false) {
+#ifndef THRIFT_NO_CONTENTION_PROFILING
+ profileTime_ = 0;
+#endif
int ret = pthread_rwlock_init(&rw_lock_, NULL);
assert(ret == 0);
initialized_ = true;
@@ -142,19 +244,34 @@
}
}
- void acquireRead() const { pthread_rwlock_rdlock(&rw_lock_); }
+ void acquireRead() const {
+ PROFILE_MUTEX_START_LOCK();
+ pthread_rwlock_rdlock(&rw_lock_);
+ PROFILE_MUTEX_NOT_LOCKED(); // not exclusive, so use not-locked path
+ }
- void acquireWrite() const { pthread_rwlock_wrlock(&rw_lock_); }
+ void acquireWrite() const {
+ PROFILE_MUTEX_START_LOCK();
+ pthread_rwlock_wrlock(&rw_lock_);
+ PROFILE_MUTEX_LOCKED();
+ }
bool attemptRead() const { return pthread_rwlock_tryrdlock(&rw_lock_); }
bool attemptWrite() const { return pthread_rwlock_trywrlock(&rw_lock_); }
- void release() const { pthread_rwlock_unlock(&rw_lock_); }
+ void release() const {
+ PROFILE_MUTEX_START_UNLOCK();
+ pthread_rwlock_unlock(&rw_lock_);
+ PROFILE_MUTEX_UNLOCKED();
+ }
private:
mutable pthread_rwlock_t rw_lock_;
mutable bool initialized_;
+#ifndef THRIFT_NO_CONTENTION_PROFILING
+ mutable int64_t profileTime_;
+#endif
};
ReadWriteMutex::ReadWriteMutex() : impl_(new ReadWriteMutex::impl()) {}