cpp: non-blocking add for ThreadManager

It's rare for the ThreadManager mutex to be contended, but it is
possible.  For nonblocking applications, it is necessary to have a
strict timeout for the lock acquisition.  With this change, that timeout
is enforced.  Also add timeout parameters to Mutex::lock and
Guard::Guard to support this feature.

git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@920679 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/concurrency/Mutex.cpp b/lib/cpp/src/concurrency/Mutex.cpp
index 5d33c11..67d7a2c 100644
--- a/lib/cpp/src/concurrency/Mutex.cpp
+++ b/lib/cpp/src/concurrency/Mutex.cpp
@@ -18,6 +18,7 @@
  */
 
 #include "Mutex.h"
+#include "Util.h"
 
 #include <assert.h>
 #include <pthread.h>
@@ -50,6 +51,12 @@
 
   bool trylock() const { return (0 == pthread_mutex_trylock(&pthread_mutex_)); }
 
+  bool timedlock(int64_t milliseconds) const {
+    struct timespec ts;
+    Util::toTimespec(ts, milliseconds);
+    return (0 == pthread_mutex_timedlock(&pthread_mutex_, &ts));
+  }
+
   void unlock() const { pthread_mutex_unlock(&pthread_mutex_); }
 
   void* getUnderlyingImpl() const { return (void*) &pthread_mutex_; }
@@ -67,6 +74,8 @@
 
 bool Mutex::trylock() const { return impl_->trylock(); }
 
+bool Mutex::timedlock(int64_t ms) const { return impl_->timedlock(ms); }
+
 void Mutex::unlock() const { impl_->unlock(); }
 
 void Mutex::DEFAULT_INITIALIZER(void* arg) {
diff --git a/lib/cpp/src/concurrency/Mutex.h b/lib/cpp/src/concurrency/Mutex.h
index a33b995..94eb835 100644
--- a/lib/cpp/src/concurrency/Mutex.h
+++ b/lib/cpp/src/concurrency/Mutex.h
@@ -37,6 +37,7 @@
   virtual ~Mutex() {}
   virtual void lock() const;
   virtual bool trylock() const;
+  virtual bool timedlock(int64_t milliseconds) const;
   virtual void unlock() const;
 
   void* getUnderlyingImpl() const;
@@ -75,18 +76,33 @@
 
 class Guard {
  public:
-  Guard(const Mutex& value) : mutex_(value) {
-    mutex_.lock();
+  Guard(const Mutex& value, int64_t timeout = 0) : mutex_(&value) {
+    if (timeout == 0) {
+      value.lock();
+    } else if (timeout < 0) {
+      if (!value.trylock()) {
+        mutex_ = NULL;
+      }
+    } else {
+      if (!value.timedlock(timeout)) {
+        mutex_ = NULL;
+      }
+    }
   }
   ~Guard() {
-    mutex_.unlock();
+    if (mutex_) {
+      mutex_->unlock();
+    }
+  }
+
+  operator bool() const {
+    return (mutex_ != NULL);
   }
 
  private:
-  const Mutex& mutex_;
+  const Mutex* mutex_;
 };
 
-
 // Can be used as second argument to RWGuard to make code more readable
 // as to whether we're doing acquireRead() or acquireWrite().
 enum RWGuardType {
diff --git a/lib/cpp/src/concurrency/ThreadManager.cpp b/lib/cpp/src/concurrency/ThreadManager.cpp
index a02ad74..d0bb41f 100644
--- a/lib/cpp/src/concurrency/ThreadManager.cpp
+++ b/lib/cpp/src/concurrency/ThreadManager.cpp
@@ -461,7 +461,11 @@
   void ThreadManager::Impl::add(shared_ptr<Runnable> value,
                                 int64_t timeout,
                                 int64_t expiration) {
-    Guard g(mutex_);
+    Guard g(mutex_, timeout);
+
+    if (!g) {
+      throw TimedOutException();
+    }
 
     if (state_ != ThreadManager::STARTED) {
       throw IllegalStateException();