cpp: TNonBlockingServer: Use separate monitor for max queue

We were using the same monitor for max queue size and empty queue, this
meant the notifies might be going to the wrong place.

This change significantly reduces the time spent in futex calls in
loaded servers.

git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@920667 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/concurrency/ThreadManager.cpp b/lib/cpp/src/concurrency/ThreadManager.cpp
index 7bba0e6..52473c7 100644
--- a/lib/cpp/src/concurrency/ThreadManager.cpp
+++ b/lib/cpp/src/concurrency/ThreadManager.cpp
@@ -54,7 +54,9 @@
     workerMaxCount_(0),
     idleCount_(0),
     pendingTaskCountMax_(0),
-    state_(ThreadManager::UNINITIALIZED) {}
+    state_(ThreadManager::UNINITIALIZED),
+    monitor_(&mutex_),
+    maxMonitor_(&mutex_) {}
 
   ~Impl() { stop(); }
 
@@ -133,7 +135,9 @@
 
   friend class ThreadManager::Task;
   std::queue<shared_ptr<Task> > tasks_;
+  Mutex mutex_;
   Monitor monitor_;
+  Monitor maxMonitor_;
   Monitor workerMonitor_;
 
   friend class ThreadManager::Worker;
@@ -245,7 +249,7 @@
        * the manager will see it.
        */
       {
-        Synchronized s(manager_->monitor_);
+        Guard g(manager_->mutex_);
         active = isActive();
 
         while (active && manager_->tasks_.empty()) {
@@ -269,7 +273,7 @@
                thread that might be blocked on add. */
             if (manager_->pendingTaskCountMax_ != 0 &&
                 manager_->tasks_.size() == manager_->pendingTaskCountMax_ - 1) {
-              manager_->monitor_.notify();
+              manager_->maxMonitor_.notify();
             }
           }
         } else {
@@ -432,7 +436,7 @@
   }
 
   void ThreadManager::Impl::add(shared_ptr<Runnable> value, int64_t timeout) {
-    Synchronized s(monitor_);
+    Guard g(mutex_);
 
     if (state_ != ThreadManager::STARTED) {
       throw IllegalStateException();
@@ -441,7 +445,8 @@
     if (pendingTaskCountMax_ > 0 && (tasks_.size() >= pendingTaskCountMax_)) {
       if (canSleep() && timeout >= 0) {
         while (pendingTaskCountMax_ > 0 && tasks_.size() >= pendingTaskCountMax_) {
-          monitor_.wait(timeout);
+          // This is thread safe because the mutex is shared between monitors.
+          maxMonitor_.wait(timeout);
         }
       } else {
         throw TooManyPendingTasksException();