cpp: Let Monitors share Mutex instances

- Let Monitor objects share a Mutex() instance so that more than one
  condition can be implemented on top of a single mutex protecting an
  important data structure.
- Make Mutex and Monitor noncopyable
- Add an accessor to Mutex() so the underlying pthread_mutex_t* can be
  retrieved for passing to pthread_condwait
- Change Monitor to use the actual Mutex class instead of creating a
  naked pthread_mutex_t on its own
- Add new constructors to Monitor

git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@920666 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/concurrency/Monitor.cpp b/lib/cpp/src/concurrency/Monitor.cpp
index 2055caa..2943ef7 100644
--- a/lib/cpp/src/concurrency/Monitor.cpp
+++ b/lib/cpp/src/concurrency/Monitor.cpp
@@ -21,6 +21,8 @@
 #include "Exception.h"
 #include "Util.h"
 
+#include <boost/scoped_ptr.hpp>
+
 #include <assert.h>
 #include <errno.h>
 
@@ -30,6 +32,8 @@
 
 namespace apache { namespace thrift { namespace concurrency {
 
+using boost::scoped_ptr;
+
 /**
  * Monitor implementation using the POSIX pthread library
  *
@@ -39,43 +43,48 @@
 
  public:
 
-  Impl() :
-    mutexInitialized_(false),
-    condInitialized_(false) {
+  Impl()
+     : ownedMutex_(new Mutex()),
+       mutex_(NULL),
+       condInitialized_(false) {
+    init(ownedMutex_.get());
+  }
 
-    if (pthread_mutex_init(&pthread_mutex_, NULL) == 0) {
-      mutexInitialized_ = true;
+  Impl(Mutex* mutex)
+     : mutex_(NULL),
+       condInitialized_(false) {
+    init(mutex);
+  }
 
-      if (pthread_cond_init(&pthread_cond_, NULL) == 0) {
-        condInitialized_ = true;
-      }
-    }
-
-    if (!mutexInitialized_ || !condInitialized_) {
-      cleanup();
-      throw SystemResourceException();
-    }
+  Impl(Monitor* monitor)
+     : mutex_(NULL),
+       condInitialized_(false) {
+    init(&(monitor->mutex()));
   }
 
   ~Impl() { cleanup(); }
 
-  void lock() const { pthread_mutex_lock(&pthread_mutex_); }
-
-  void unlock() const { pthread_mutex_unlock(&pthread_mutex_); }
+  Mutex& mutex() { return *mutex_; }
+  void lock() { mutex().lock(); }
+  void unlock() { mutex().unlock(); }
 
   void wait(int64_t timeout) const {
+    assert(mutex_);
+    pthread_mutex_t* mutexImpl =
+      reinterpret_cast<pthread_mutex_t*>(mutex_->getUnderlyingImpl());
+    assert(mutexImpl);
 
     // XXX Need to assert that caller owns mutex
     assert(timeout >= 0LL);
     if (timeout == 0LL) {
-      int iret = pthread_cond_wait(&pthread_cond_, &pthread_mutex_);
+      int iret = pthread_cond_wait(&pthread_cond_, mutexImpl);
       assert(iret == 0);
     } else {
       struct timespec abstime;
       int64_t now = Util::currentTime();
       Util::toTimespec(abstime, now + timeout);
       int result = pthread_cond_timedwait(&pthread_cond_,
-                                          &pthread_mutex_,
+                                          mutexImpl,
                                           &abstime);
       if (result == ETIMEDOUT) {
         // pthread_cond_timedwait has been observed to return early on
@@ -100,13 +109,20 @@
 
  private:
 
-  void cleanup() {
-    if (mutexInitialized_) {
-      mutexInitialized_ = false;
-      int iret = pthread_mutex_destroy(&pthread_mutex_);
-      assert(iret == 0);
+  void init(Mutex* mutex) {
+    mutex_ = mutex;
+
+    if (pthread_cond_init(&pthread_cond_, NULL) == 0) {
+      condInitialized_ = true;
     }
 
+    if (!condInitialized_) {
+      cleanup();
+      throw SystemResourceException();
+    }
+  }
+
+  void cleanup() {
     if (condInitialized_) {
       condInitialized_ = false;
       int iret = pthread_cond_destroy(&pthread_cond_);
@@ -114,16 +130,21 @@
     }
   }
 
-  mutable pthread_mutex_t pthread_mutex_;
-  mutable bool mutexInitialized_;
+  scoped_ptr<Mutex> ownedMutex_;
+  Mutex* mutex_;
+
   mutable pthread_cond_t pthread_cond_;
   mutable bool condInitialized_;
 };
 
 Monitor::Monitor() : impl_(new Monitor::Impl()) {}
+Monitor::Monitor(Mutex* mutex) : impl_(new Monitor::Impl(mutex)) {}
+Monitor::Monitor(Monitor* monitor) : impl_(new Monitor::Impl(monitor)) {}
 
 Monitor::~Monitor() { delete impl_; }
 
+Mutex& Monitor::mutex() const { return impl_->mutex(); }
+
 void Monitor::lock() const { impl_->lock(); }
 
 void Monitor::unlock() const { impl_->unlock(); }
diff --git a/lib/cpp/src/concurrency/Monitor.h b/lib/cpp/src/concurrency/Monitor.h
index 234bf32..f29119d 100644
--- a/lib/cpp/src/concurrency/Monitor.h
+++ b/lib/cpp/src/concurrency/Monitor.h
@@ -21,6 +21,10 @@
 #define _THRIFT_CONCURRENCY_MONITOR_H_ 1
 
 #include "Exception.h"
+#include "Mutex.h"
+
+#include <boost/utility.hpp>
+
 
 namespace apache { namespace thrift { namespace concurrency {
 
@@ -29,7 +33,12 @@
  * notifying condition events requires that the caller own the mutex.  Mutex
  * lock and unlock operations can be performed independently of condition
  * events.  This is more or less analogous to java.lang.Object multi-thread
- * operations
+ * operations.
+ *
+ * Note the Monitor can create a new, internal mutex; alternatively, a
+ * separate Mutex can be passed in and the Monitor will re-use it without
+ * taking ownership.  It's the user's responsibility to make sure that the
+ * Mutex is not deallocated before the Monitor.
  *
  * Note that all methods are const.  Monitors implement logical constness, not
  * bit constness.  This allows const methods to call monitor methods without
@@ -37,14 +46,22 @@
  *
  * @version $Id:$
  */
-class Monitor {
-
+class Monitor : boost::noncopyable {
  public:
-
+  /** Creates a new mutex, and takes ownership of it. */
   Monitor();
 
+  /** Uses the provided mutex without taking ownership. */
+  explicit Monitor(Mutex* mutex);
+
+  /** Uses the mutex inside the provided Monitor without taking ownership. */
+  explicit Monitor(Monitor* monitor);
+
+  /** Deallocates the mutex only if we own it. */
   virtual ~Monitor();
 
+  Mutex& mutex() const;
+
   virtual void lock() const;
 
   virtual void unlock() const;
@@ -64,18 +81,11 @@
 
 class Synchronized {
  public:
-
- Synchronized(const Monitor& value) :
-   monitor_(value) {
-   monitor_.lock();
-  }
-
-  ~Synchronized() {
-    monitor_.unlock();
-  }
+ Synchronized(const Monitor* monitor) : g(monitor->mutex()) { }
+ Synchronized(const Monitor& monitor) : g(monitor.mutex()) { }
 
  private:
-  const Monitor& monitor_;
+  Guard g;
 };
 
 
diff --git a/lib/cpp/src/concurrency/Mutex.cpp b/lib/cpp/src/concurrency/Mutex.cpp
index 045dbdf..5d33c11 100644
--- a/lib/cpp/src/concurrency/Mutex.cpp
+++ b/lib/cpp/src/concurrency/Mutex.cpp
@@ -52,6 +52,8 @@
 
   void unlock() const { pthread_mutex_unlock(&pthread_mutex_); }
 
+  void* getUnderlyingImpl() const { return (void*) &pthread_mutex_; }
+
  private:
   mutable pthread_mutex_t pthread_mutex_;
   mutable bool initialized_;
@@ -59,6 +61,8 @@
 
 Mutex::Mutex(Initializer init) : impl_(new Mutex::impl(init)) {}
 
+void* Mutex::getUnderlyingImpl() const { return impl_->getUnderlyingImpl(); }
+
 void Mutex::lock() const { impl_->lock(); }
 
 bool Mutex::trylock() const { return impl_->trylock(); }
diff --git a/lib/cpp/src/concurrency/Mutex.h b/lib/cpp/src/concurrency/Mutex.h
index 73c73e0..f7ee9f2 100644
--- a/lib/cpp/src/concurrency/Mutex.h
+++ b/lib/cpp/src/concurrency/Mutex.h
@@ -39,6 +39,8 @@
   virtual bool trylock() const;
   virtual void unlock() const;
 
+  void* getUnderlyingImpl() const;
+
   static void DEFAULT_INITIALIZER(void*);
   static void ADAPTIVE_INITIALIZER(void*);
   static void RECURSIVE_INITIALIZER(void*);