More bullet proofing of timer manager

facebook::thrift::concurrency::TimerManager::stop
	Added proper cleanup of unprocessed tasks and shutdown of dispatcher thread to stop

facebook::thrift::concurrency::TimerManager::~TimerManager
	Call stop if manager wasn't explicitly stopped

facebook::thrift::concurrency::test.TimerManagerTest
	Calculate error margin for timeout expiration and verify it's within bounds
	Verify manager stops properly when it goes out of scope
	


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664724 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/concurrency/Monitor.cc b/lib/cpp/src/concurrency/Monitor.cc
index f6144ba..b1d7b72 100644
--- a/lib/cpp/src/concurrency/Monitor.cc
+++ b/lib/cpp/src/concurrency/Monitor.cc
@@ -59,6 +59,8 @@
 
     // XXX Need to assert that caller owns mutex
 
+    assert(timeout >= 0LL);
+
     if(timeout == 0LL) {
 
       assert(pthread_cond_wait(&_pthread_cond, &_pthread_mutex) == 0);
@@ -67,13 +69,15 @@
 
       struct timespec abstime;
 
+      long long now = Util::currentTime();
+
+      Util::toTimespec(abstime, now + timeout);
+
       int result  = pthread_cond_timedwait(&_pthread_cond, &_pthread_mutex, &abstime);
 
       if(result == ETIMEDOUT) {
 
-	// XXX Add assert once currentTime is fixed to have ms resolution or better
-
-	// assert(Util::currentTime() >= (now + timeout));
+	assert(Util::currentTime() >= (now + timeout));
       }
     }
   }
diff --git a/lib/cpp/src/concurrency/TimerManager.cc b/lib/cpp/src/concurrency/TimerManager.cc
index 015ffba..93b0dc5 100644
--- a/lib/cpp/src/concurrency/TimerManager.cc
+++ b/lib/cpp/src/concurrency/TimerManager.cc
@@ -82,22 +82,25 @@
 
       {Synchronized s(_manager->_monitor);
 
-	/* Update next timeout if necessary */
-
 	task_iterator expiredTaskEnd;
 
+	long long now = Util::currentTime();
+
 	while(_manager->_state == TimerManager::STARTED && 
-	      (expiredTaskEnd = _manager->_taskMap.upper_bound(Util::currentTime())) == _manager->_taskMap.begin()) {
+	      (expiredTaskEnd = _manager->_taskMap.upper_bound(now)) == _manager->_taskMap.begin()) {
 
 	  long long timeout = 0LL;
 
 	  if(!_manager->_taskMap.empty()) {
 
-	    timeout = Util::currentTime() - _manager->_taskMap.begin()->first;
+	    timeout = _manager->_taskMap.begin()->first - now;
 	  }
+
+	  assert((timeout != 0 && _manager->_taskCount > 0) || (timeout == 0 && _manager->_taskCount == 0));
 	    
  	  _manager->_monitor.wait(timeout);
-	  
+
+	  now = Util::currentTime();
 	}
 	
 	if(_manager->_state == TimerManager::STARTED) {
@@ -151,13 +154,29 @@
 };
 
 TimerManager::TimerManager() :
+  _taskCount(0),
   _state(TimerManager::UNINITIALIZED),
   _dispatcher(new Dispatcher(this)) {
 }
 
 
 TimerManager::~TimerManager() {
-  delete _dispatcher;
+
+  /* If we haven't been explicitly stopped, do so now.  We don't need to grab the monitor here, since
+     stop already takes care of reentrancy. */
+  
+  if(_state != STOPPED) {
+    
+    try {
+      
+      stop();
+      
+    } catch(...) {
+      
+      // uhoh
+      
+    }
+  }
 }
 
 void TimerManager::start() {
@@ -196,6 +215,8 @@
 
 void TimerManager::stop() {
 
+  bool doStop = false;
+
   {Synchronized s(_monitor);
 
     if(_state == TimerManager::UNINITIALIZED) {
@@ -204,6 +225,8 @@
 
     } else if(_state != STOPPING &&  _state != STOPPED) {
 
+      doStop = true;
+
       _state = STOPPING;
 
       _monitor.notifyAll();
@@ -214,6 +237,21 @@
       _monitor.wait();
     }
   }
+
+  if(doStop) {
+
+    // Clean up any outstanding tasks
+
+    for(task_iterator ix =  _taskMap.begin(); ix != _taskMap.end(); ix++) {
+
+      delete ix->second;
+
+      _taskMap.erase(ix);
+    }
+
+    delete _dispatcher;
+  }
+
 }
 
 const ThreadFactory* TimerManager::threadFactory() const {
diff --git a/lib/cpp/src/concurrency/Util.h b/lib/cpp/src/concurrency/Util.h
index 3ab8929..d04435d 100644
--- a/lib/cpp/src/concurrency/Util.h
+++ b/lib/cpp/src/concurrency/Util.h
@@ -2,7 +2,7 @@
 #define _concurrency_Util_h_ 1
 
 #include <assert.h>
-#include <time.h>
+#include <sys/time.h>
 
 namespace facebook { namespace thrift { namespace concurrency { 
 
@@ -19,45 +19,34 @@
 
  public:
 
-  /** Converts relative timeout specified as a duration in milliseconds to a struct timespec structure
-      specifying current time plus timeout 
+  /** Converts timespec to milliseconds
 
-      @param struct timespec& current time plus timeout result  
-      @param timeout time to delay in milliseconds */
+      @param struct timespec& result
+      @param time or duration in milliseconds */
 
-  static const void toAbsoluteTimespec(struct timespec& result, long long value) {
+  static void toTimespec(struct timespec& result, long long value) {
     
-    // XXX Darwin doesn't seem to have any readily useable hi-res clock.
+    result.tv_sec = value / 1000; // ms to s
     
-    time_t seconds; 
-    
-    assert(time(&seconds) != (time_t)-1);
-    
-    seconds+= (value / 1000);
-    
-    long nanoseconds = (value % 1000) * 1000000;
-    
-    result.tv_sec = seconds + (nanoseconds / 1000000000);
-    
-    result.tv_nsec = nanoseconds % 1000000000;
+    result.tv_nsec = (value % 1000) * 1000000; // ms to ns
   }
 
-  /** Converts absolute timespec to milliseconds from epoch */
+  /** Converts timespec to milliseconds */
 
   static const void toMilliseconds(long long& result, const struct timespec& value) {
 
-    result = value.tv_sec * 1000 + value.tv_nsec * 1000000;
+    result = value.tv_sec * 1000 + value.tv_nsec / 1000000;
   }
 
   /** Get current time as milliseconds from epoch */
 
   static const long long currentTime() {
 
-    time_t now;
+    struct timeval now;
 
-    time(&now);
+    assert(gettimeofday(&now, NULL) == 0);
 
-    return (long long)now * 1000;
+    return ((long long)now.tv_sec) * 1000LL + now.tv_usec / 1000;
   }
 };
 
diff --git a/lib/cpp/src/concurrency/test/TimerManagerTests.h b/lib/cpp/src/concurrency/test/TimerManagerTests.h
index 24f7964..f34f8b0 100644
--- a/lib/cpp/src/concurrency/test/TimerManagerTests.h
+++ b/lib/cpp/src/concurrency/test/TimerManagerTests.h
@@ -1,6 +1,7 @@
 #include <TimerManager.h>
 #include <PosixThreadFactory.h>
 #include <Monitor.h>
+#include <Util.h>
 
 #include <assert.h>
 #include <iostream>
@@ -20,12 +21,30 @@
 
   public:
     
-  Task(Monitor& monitor) : 
+  Task(Monitor& monitor, long long timeout) : 
+    _timeout(timeout),
+    _startTime(Util::currentTime()),
     _monitor(monitor),
+    _success(false),
     _done(false) {}
 
     void run() {
 
+      _endTime = Util::currentTime();
+
+      // Figure out error percentage
+
+      long long delta = _endTime - _startTime;
+
+
+      delta = delta > _timeout ?  delta - _timeout : _timeout - delta;
+
+      float error = delta / _timeout;
+
+      if(error < .10) {
+	_success = true;
+      }
+      
       std::cout << "\t\t\tHello World" << std::endl;
 
       _done = true;
@@ -34,37 +53,60 @@
 	_monitor.notifyAll();
       }
     }
-    
+
+
+    long long _timeout;
+    long long _startTime;
+    long long _endTime;
     Monitor& _monitor;
+    bool _success;
     bool _done;
   };
 
 public:
 
-  bool test00() {
+  /** This test creates two tasks and waits for the first to expire within 10% of the expected expiration time.  It then verifies that
+      the timer manager properly clean up itself and the remaining orphaned timeout task when the manager goes out of scope and its 
+      destructor is called. */
 
-    TimerManager* timerManager =  new TimerManager();
+  bool test00(long long timeout=1000LL) {
 
-    timerManager->threadFactory(new PosixThreadFactory());
+    TimerManagerTests::Task* orphanTask = new TimerManagerTests::Task(_monitor, 10 * timeout);
 
-    timerManager->start();
+    {
 
-    assert(timerManager->state() == TimerManager::STARTED);
+      TimerManager timerManager;
+      
+      timerManager.threadFactory(new PosixThreadFactory());
+      
+      timerManager.start();
+      
+      assert(timerManager.state() == TimerManager::STARTED);
 
-    TimerManagerTests::Task* task = new TimerManagerTests::Task(_monitor);
+      TimerManagerTests::Task* task = new TimerManagerTests::Task(_monitor, timeout);
 
-    {Synchronized s(_monitor);
+      {Synchronized s(_monitor);
 
-      timerManager->add(task, 1000LL);
+	timerManager.add(orphanTask, 10 * timeout);
 
-      _monitor.wait();
+	timerManager.add(task, timeout);
+
+	_monitor.wait();
+      }
+
+      assert(task->_done);
+
+
+      std::cout << "\t\t\t" << (task->_success ? "Success" : "Failure") << "!" << std::endl;
+
+      delete task;
     }
 
-    assert(task->_done);
+    // timerManager.stop(); This is where it happens via destructor
 
-    delete task;
+    assert(!orphanTask->_done);
 
-    std::cout << "\t\t\tSuccess!" << std::endl;
+    delete orphanTask;
 
     return true;
   }