cpp: Add profiling hooks to Mutex, ReadWriteMutex() classes
Extend the Thrift C++ Concurrency library by allowing a user to register
a callback and a sample rate for lock primitive contention profiling.
The callback will be invoked approximately once every sampleRate calls
to Mutex::lock(), Mutex::timedlock(), ReadWriteLock::acquireRead(), or
ReadWriteLock::acquireWrite().
The callback receives a pointer to the mutex responsible and the time
waited on the lock in micros (whether the lock was successfuly acquire
or not). The user can then implement a registry of his choice to
log/collect this data as needed.
This can all be easily compiled out if it harms performance. By
default, there is no profiling callback, so the overhead is minimal
(one branch).
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@920681 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/concurrency/Mutex.cpp b/lib/cpp/src/concurrency/Mutex.cpp
index 67d7a2c..d72267d 100644
--- a/lib/cpp/src/concurrency/Mutex.cpp
+++ b/lib/cpp/src/concurrency/Mutex.cpp
@@ -22,11 +22,87 @@
#include <assert.h>
#include <pthread.h>
+#include <signal.h>
using boost::shared_ptr;
namespace apache { namespace thrift { namespace concurrency {
+#ifndef THRIFT_NO_CONTENTION_PROFILING
+
+static sig_atomic_t mutexProfilingSampleRate = 0;
+static MutexWaitCallback mutexProfilingCallback = 0;
+
+volatile static sig_atomic_t mutexProfilingCounter = 0;
+
+void enableMutexProfiling(int32_t profilingSampleRate,
+ MutexWaitCallback callback) {
+ mutexProfilingSampleRate = profilingSampleRate;
+ mutexProfilingCallback = callback;
+}
+
+#define PROFILE_MUTEX_START_LOCK() \
+ int64_t _lock_startTime = maybeGetProfilingStartTime();
+
+#define PROFILE_MUTEX_NOT_LOCKED() \
+ do { \
+ if (_lock_startTime > 0) { \
+ int64_t endTime = Util::currentTimeUsec(); \
+ (*mutexProfilingCallback)(this, endTime - _lock_startTime); \
+ } \
+ } while (0)
+
+#define PROFILE_MUTEX_LOCKED() \
+ do { \
+ profileTime_ = _lock_startTime; \
+ if (profileTime_ > 0) { \
+ profileTime_ = Util::currentTimeUsec() - profileTime_; \
+ } \
+ } while (0)
+
+#define PROFILE_MUTEX_START_UNLOCK() \
+ int64_t _temp_profileTime = profileTime_; \
+ profileTime_ = 0;
+
+#define PROFILE_MUTEX_UNLOCKED() \
+ do { \
+ if (_temp_profileTime > 0) { \
+ (*mutexProfilingCallback)(this, _temp_profileTime); \
+ } \
+ } while (0)
+
+static inline int64_t maybeGetProfilingStartTime() {
+ if (mutexProfilingSampleRate && mutexProfilingCallback) {
+ // This block is unsynchronized, but should produce a reasonable sampling
+ // rate on most architectures. The main race conditions are the gap
+ // between the decrement and the test, the non-atomicity of decrement, and
+ // potential caching of different values at different CPUs.
+ //
+ // - if two decrements race, the likeliest result is that the counter
+ // decrements slowly (perhaps much more slowly) than intended.
+ //
+ // - many threads could potentially decrement before resetting the counter
+ // to its large value, causing each additional incoming thread to
+ // profile every call. This situation is unlikely to persist for long
+ // as the critical gap is quite short, but profiling could be bursty.
+ sig_atomic_t localValue = --mutexProfilingCounter;
+ if (localValue <= 0) {
+ mutexProfilingCounter = mutexProfilingSampleRate;
+ return Util::currentTimeUsec();
+ }
+ }
+
+ return 0;
+}
+
+#else
+# define PROFILE_MUTEX_START_LOCK()
+# define PROFILE_MUTEX_NOT_LOCKED()
+# define PROFILE_MUTEX_LOCKED()
+# define PROFILE_MUTEX_START_UNLOCK()
+# define PROFILE_MUTEX_UNLOCKED()
+#endif // THRIFT_NO_CONTENTION_PROFILING
+
/**
* Implementation of Mutex class using POSIX mutex
*
@@ -35,6 +111,9 @@
class Mutex::impl {
public:
impl(Initializer init) : initialized_(false) {
+#ifndef THRIFT_NO_CONTENTION_PROFILING
+ profileTime_ = 0;
+#endif
init(&pthread_mutex_);
initialized_ = true;
}
@@ -47,23 +126,43 @@
}
}
- void lock() const { pthread_mutex_lock(&pthread_mutex_); }
+ void lock() const {
+ PROFILE_MUTEX_START_LOCK();
+ pthread_mutex_lock(&pthread_mutex_);
+ PROFILE_MUTEX_LOCKED();
+ }
bool trylock() const { return (0 == pthread_mutex_trylock(&pthread_mutex_)); }
bool timedlock(int64_t milliseconds) const {
+ PROFILE_MUTEX_START_LOCK();
+
struct timespec ts;
Util::toTimespec(ts, milliseconds);
- return (0 == pthread_mutex_timedlock(&pthread_mutex_, &ts));
+ int ret = pthread_mutex_timedlock(&pthread_mutex_, &ts);
+ if (ret == 0) {
+ PROFILE_MUTEX_LOCKED();
+ return true;
+ }
+
+ PROFILE_MUTEX_NOT_LOCKED();
+ return false;
}
- void unlock() const { pthread_mutex_unlock(&pthread_mutex_); }
+ void unlock() const {
+ PROFILE_MUTEX_START_UNLOCK();
+ pthread_mutex_unlock(&pthread_mutex_);
+ PROFILE_MUTEX_UNLOCKED();
+ }
void* getUnderlyingImpl() const { return (void*) &pthread_mutex_; }
private:
mutable pthread_mutex_t pthread_mutex_;
mutable bool initialized_;
+#ifndef THRIFT_NO_CONTENTION_PROFILING
+ mutable int64_t profileTime_;
+#endif
};
Mutex::Mutex(Initializer init) : impl_(new Mutex::impl(init)) {}
@@ -129,6 +228,9 @@
class ReadWriteMutex::impl {
public:
impl() : initialized_(false) {
+#ifndef THRIFT_NO_CONTENTION_PROFILING
+ profileTime_ = 0;
+#endif
int ret = pthread_rwlock_init(&rw_lock_, NULL);
assert(ret == 0);
initialized_ = true;
@@ -142,19 +244,34 @@
}
}
- void acquireRead() const { pthread_rwlock_rdlock(&rw_lock_); }
+ void acquireRead() const {
+ PROFILE_MUTEX_START_LOCK();
+ pthread_rwlock_rdlock(&rw_lock_);
+ PROFILE_MUTEX_NOT_LOCKED(); // not exclusive, so use not-locked path
+ }
- void acquireWrite() const { pthread_rwlock_wrlock(&rw_lock_); }
+ void acquireWrite() const {
+ PROFILE_MUTEX_START_LOCK();
+ pthread_rwlock_wrlock(&rw_lock_);
+ PROFILE_MUTEX_LOCKED();
+ }
bool attemptRead() const { return pthread_rwlock_tryrdlock(&rw_lock_); }
bool attemptWrite() const { return pthread_rwlock_trywrlock(&rw_lock_); }
- void release() const { pthread_rwlock_unlock(&rw_lock_); }
+ void release() const {
+ PROFILE_MUTEX_START_UNLOCK();
+ pthread_rwlock_unlock(&rw_lock_);
+ PROFILE_MUTEX_UNLOCKED();
+ }
private:
mutable pthread_rwlock_t rw_lock_;
mutable bool initialized_;
+#ifndef THRIFT_NO_CONTENTION_PROFILING
+ mutable int64_t profileTime_;
+#endif
};
ReadWriteMutex::ReadWriteMutex() : impl_(new ReadWriteMutex::impl()) {}
diff --git a/lib/cpp/src/concurrency/Mutex.h b/lib/cpp/src/concurrency/Mutex.h
index 94eb835..7cedbe3 100644
--- a/lib/cpp/src/concurrency/Mutex.h
+++ b/lib/cpp/src/concurrency/Mutex.h
@@ -24,6 +24,32 @@
namespace apache { namespace thrift { namespace concurrency {
+#ifndef THRIFT_NO_CONTENTION_PROFILING
+
+/**
+ * Determines if the Thrift Mutex and ReadWriteMutex classes will attempt to
+ * profile their blocking acquire methods. If this value is set to non-zero,
+ * Thrift will attempt to invoke the callback once every profilingSampleRate
+ * times. However, as the sampling is not synchronized the rate is not
+ * guranateed, and could be subject to big bursts and swings. Please ensure
+ * your sampling callback is as performant as your application requires.
+ *
+ * The callback will get called with the wait time taken to lock the mutex in
+ * usec and a (void*) that uniquely identifies the Mutex (or ReadWriteMutex)
+ * being locked.
+ *
+ * The enableMutexProfiling() function is unsynchronized; calling this function
+ * while profiling is already enabled may result in race conditions. On
+ * architectures where a pointer assignment is atomic, this is safe but there
+ * is no guarantee threads will agree on a single callback within any
+ * particular time period.
+ */
+typedef void (*MutexWaitCallback)(const void* id, int64_t waitTimeMicros);
+void enableMutexProfiling(int32_t profilingSampleRate,
+ MutexWaitCallback callback);
+
+#endif
+
/**
* A simple mutex class
*
diff --git a/lib/cpp/src/concurrency/Util.cpp b/lib/cpp/src/concurrency/Util.cpp
index 1c44937..ac2a460 100644
--- a/lib/cpp/src/concurrency/Util.cpp
+++ b/lib/cpp/src/concurrency/Util.cpp
@@ -31,19 +31,19 @@
namespace apache { namespace thrift { namespace concurrency {
-const int64_t Util::currentTime() {
+const int64_t Util::currentTimeTicks(int64_t ticksPerSec) {
int64_t result;
#if defined(HAVE_CLOCK_GETTIME)
struct timespec now;
int ret = clock_gettime(CLOCK_REALTIME, &now);
assert(ret == 0);
- toMilliseconds(result, now);
+ toTicks(result, now, ticksPerSec);
#elif defined(HAVE_GETTIMEOFDAY)
struct timeval now;
int ret = gettimeofday(&now, NULL);
assert(ret == 0);
- toMilliseconds(result, now);
+ toTicks(result, now, ticksPerSec);
#else
#error "No high-precision clock is available."
#endif // defined(HAVE_CLOCK_GETTIME)
@@ -51,5 +51,4 @@
return result;
}
-
}}} // apache::thrift::concurrency
diff --git a/lib/cpp/src/concurrency/Util.h b/lib/cpp/src/concurrency/Util.h
index 4630dbb..18a221d 100644
--- a/lib/cpp/src/concurrency/Util.h
+++ b/lib/cpp/src/concurrency/Util.h
@@ -47,6 +47,7 @@
static const int64_t MS_PER_S = 1000LL;
static const int64_t NS_PER_MS = NS_PER_S / MS_PER_S;
+ static const int64_t NS_PER_US = NS_PER_S / US_PER_S;
static const int64_t US_PER_MS = US_PER_S / MS_PER_S;
public:
@@ -67,32 +68,78 @@
result.tv_usec = (value % MS_PER_S) * US_PER_MS; // ms to us
}
+ static const void toTicks(int64_t& result, int64_t secs, int64_t oldTicks,
+ int64_t oldTicksPerSec, int64_t newTicksPerSec) {
+ result = secs * newTicksPerSec;
+ result += oldTicks * newTicksPerSec / oldTicksPerSec;
+
+ int64_t oldPerNew = oldTicksPerSec / newTicksPerSec;
+ if (oldPerNew && ((oldTicks % oldPerNew) >= (oldPerNew / 2))) {
+ ++result;
+ }
+ }
+ /**
+ * Converts struct timespec to arbitrary-sized ticks since epoch
+ */
+ static const void toTicks(int64_t& result,
+ const struct timespec& value,
+ int64_t ticksPerSec) {
+ return toTicks(result, value.tv_sec, value.tv_nsec, NS_PER_S, ticksPerSec);
+ }
+
+ /**
+ * Converts struct timeval to arbitrary-sized ticks since epoch
+ */
+ static const void toTicks(int64_t& result,
+ const struct timeval& value,
+ int64_t ticksPerSec) {
+ return toTicks(result, value.tv_sec, value.tv_usec, US_PER_S, ticksPerSec);
+ }
+
/**
* Converts struct timespec to milliseconds
*/
- static const void toMilliseconds(int64_t& result, const struct timespec& value) {
- result = (value.tv_sec * MS_PER_S) + (value.tv_nsec / NS_PER_MS);
- // round up -- int64_t cast is to avoid a compiler error for some GCCs
- if (int64_t(value.tv_nsec) % NS_PER_MS >= (NS_PER_MS / 2)) {
- ++result;
- }
+ static const void toMilliseconds(int64_t& result,
+ const struct timespec& value) {
+ return toTicks(result, value, MS_PER_S);
}
/**
* Converts struct timeval to milliseconds
*/
- static const void toMilliseconds(int64_t& result, const struct timeval& value) {
- result = (value.tv_sec * MS_PER_S) + (value.tv_usec / US_PER_MS);
- // round up -- int64_t cast is to avoid a compiler error for some GCCs
- if (int64_t(value.tv_usec) % US_PER_MS >= (US_PER_MS / 2)) {
- ++result;
- }
+ static const void toMilliseconds(int64_t& result,
+ const struct timeval& value) {
+ return toTicks(result, value, MS_PER_S);
}
/**
+ * Converts struct timespec to microseconds
+ */
+ static const void toUsec(int64_t& result, const struct timespec& value) {
+ return toTicks(result, value, US_PER_S);
+ }
+
+ /**
+ * Converts struct timeval to microseconds
+ */
+ static const void toUsec(int64_t& result, const struct timeval& value) {
+ return toTicks(result, value, US_PER_S);
+ }
+
+ /**
+ * Get current time as a number of arbitrary-size ticks from epoch
+ */
+ static const int64_t currentTimeTicks(int64_t ticksPerSec);
+
+ /**
* Get current time as milliseconds from epoch
*/
- static const int64_t currentTime();
+ static const int64_t currentTime() { return currentTimeTicks(MS_PER_S); }
+
+ /**
+ * Get current time as micros from epoch
+ */
+ static const int64_t currentTimeUsec() { return currentTimeTicks(US_PER_S); }
};
}}} // apache::thrift::concurrency