Added thread factory test - problems in thread

Fixed stupid typo in  TimerManager::start


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664723 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/concurrency/Monitor.cc b/lib/cpp/src/concurrency/Monitor.cc
index a2e4276..f6144ba 100644
--- a/lib/cpp/src/concurrency/Monitor.cc
+++ b/lib/cpp/src/concurrency/Monitor.cc
@@ -1,8 +1,12 @@
 #include "Monitor.h" 
+#include "Exception.h" 
 #include "Util.h"
 
 #include <assert.h>
 #include <errno.h>
+
+#include <iostream>
+
 #include <pthread.h>
 
 
@@ -57,19 +61,19 @@
 
     if(timeout == 0LL) {
 
-      pthread_cond_wait(&_pthread_cond, &_pthread_mutex);
+      assert(pthread_cond_wait(&_pthread_cond, &_pthread_mutex) == 0);
 
     } else {
 
       struct timespec abstime;
 
-      Util::toAbsoluteTimespec(abstime, timeout);
-
       int result  = pthread_cond_timedwait(&_pthread_cond, &_pthread_mutex, &abstime);
 
       if(result == ETIMEDOUT) {
 
-	// XXX If result is timeout need to throw timeout exception
+	// XXX Add assert once currentTime is fixed to have ms resolution or better
+
+	// assert(Util::currentTime() >= (now + timeout));
       }
     }
   }
@@ -101,7 +105,7 @@
 
 Monitor::Monitor() : _impl(new Monitor::Impl()) {}
 
-      Monitor::~Monitor() { delete _impl;}
+Monitor::~Monitor() { delete _impl;}
 
 void Monitor::lock() const {_impl->lock();}
 
diff --git a/lib/cpp/src/concurrency/PosixThreadFactory.cc b/lib/cpp/src/concurrency/PosixThreadFactory.cc
index e7f84cd..bac122d 100644
--- a/lib/cpp/src/concurrency/PosixThreadFactory.cc
+++ b/lib/cpp/src/concurrency/PosixThreadFactory.cc
@@ -22,6 +22,8 @@
 
   static const int MB = 1024 * 1024;
 
+  static void* threadMain(void* arg);
+
 private:
 
   pthread_t _pthread;
@@ -36,26 +38,6 @@
 
   Runnable* _runnable;
 
-  static void* threadMain(void* arg) {
-
-    // XXX need a lock here when testing thread state
-
-    PthreadThread* thread = (PthreadThread*)arg;
-
-    if(thread->_state != starting) {
-      return (void*)0;
-    }
-
-    thread->_state = starting;
-
-    thread->_runnable->run();
-
-    if(thread->_state != stopping && thread->_state != stopped) {
-      thread->_state = stopping;
-    }
-    
-    return (void*)0;
-  }
 
 public:
   
@@ -95,9 +77,9 @@
 
     // Set thread priority
 
-    assert(pthread_attr_setschedparam(&thread_attr, &sched_param) == 0);
+    // assert(pthread_attr_setschedparam(&thread_attr, &sched_param) == 0);
 
-    assert(pthread_create(&_pthread, &thread_attr, PthreadThread::threadMain, (void*)this) == 0);
+    assert(pthread_create(&_pthread, &thread_attr, threadMain, (void*)this) == 0);
   }
 
   void join() {
@@ -114,6 +96,26 @@
 
 };
 
+void* PthreadThread::threadMain(void* arg) {
+  // XXX need a lock here when testing thread state
+
+  PthreadThread* thread = (PthreadThread*)arg;
+  
+  if(thread->_state != starting) {
+    return (void*)0;
+  }
+
+  thread->_state = starting;
+
+  thread->_runnable->run();
+
+  if(thread->_state != stopping && thread->_state != stopped) {
+    thread->_state = stopping;
+  }
+    
+  return (void*)0;
+}
+
 /** POSIX Thread factory implementation */
 
 class PosixThreadFactory::Impl {
diff --git a/lib/cpp/src/concurrency/ThreadManager.cc b/lib/cpp/src/concurrency/ThreadManager.cc
index b5d02e6..d13ce7b 100644
--- a/lib/cpp/src/concurrency/ThreadManager.cc
+++ b/lib/cpp/src/concurrency/ThreadManager.cc
@@ -176,7 +176,7 @@
 
   void run() {
 
-    {Synchronized(_manager->_monitor);
+    {Synchronized s(_manager->_monitor);
 
       if(_state == STARTING) {
 	_state = STARTED;
@@ -191,7 +191,7 @@
 
 	 Once the queue is non-empty, dequeue a task, release monitor, and execute. */
 
-      {Synchronized(_manager->_monitor);
+      {Synchronized s(_manager->_monitor);
 
 	while(_state == STARTED && _manager->_tasks.empty()) {
 	  
@@ -221,7 +221,7 @@
 
     } while(_state == STARTED);
 
-    {Synchronized(_manager->_monitor);
+    {Synchronized s(_manager->_monitor);
 
       if(_state == STOPPING) {
 
diff --git a/lib/cpp/src/concurrency/TimerManager.cc b/lib/cpp/src/concurrency/TimerManager.cc
index bd68264..015ffba 100644
--- a/lib/cpp/src/concurrency/TimerManager.cc
+++ b/lib/cpp/src/concurrency/TimerManager.cc
@@ -1,8 +1,9 @@
 #include "TimerManager.h"
+#include "Exception.h"
 #include "Util.h"
 
 #include <assert.h>
-
+#include <iostream>
 #include <set>
 
 namespace facebook { namespace thrift { namespace concurrency { 
@@ -43,24 +44,19 @@
 
   Runnable* _runnable;
 
+  class TimerManager::Dispatcher;
+
+  friend class TimerManager::Dispatcher;
+
   STATE _state;
 };
 
 class TimerManager::Dispatcher: public Runnable {
 
-  enum STATE {
-    UNINITIALIZED,
-    STARTING,
-    STARTED,
-    STOPPING,
-    STOPPED
-  };
-  
 public:
   Dispatcher(TimerManager* manager) : 
-    _manager(manager),
-    _state(UNINITIALIZED)
-  {}
+    _manager(manager) {
+}
   
   ~Dispatcher() {}
   
@@ -70,10 +66,13 @@
 
   void run() {
 
-    {Synchronized(_manager->_monitor);
+    {Synchronized s(_manager->_monitor);
 
-      if(_state == STARTING) {
-	_state = STARTED;
+      if(_manager->_state == TimerManager::STARTING) {
+
+	_manager->_state = TimerManager::STARTED;
+
+	_manager->_monitor.notifyAll();
       }
     }
 
@@ -81,26 +80,38 @@
 
       std::set<TimerManager::Task*> expiredTasks;
 
-      {Synchronized(_manager->_monitor);
-	
-	long long now = Util::currentTime();
+      {Synchronized s(_manager->_monitor);
+
+	/* Update next timeout if necessary */
 
 	task_iterator expiredTaskEnd;
-	
-	while(_state == STARTED && 
-	      (expiredTaskEnd = _manager->_taskMap.upper_bound(now)) == _manager->_taskMap.end()) {
-	  
-	  _manager->_monitor.wait(_manager->_nextTimeout - now);
+
+	while(_manager->_state == TimerManager::STARTED && 
+	      (expiredTaskEnd = _manager->_taskMap.upper_bound(Util::currentTime())) == _manager->_taskMap.begin()) {
+
+	  long long timeout = 0LL;
+
+	  if(!_manager->_taskMap.empty()) {
+
+	    timeout = Util::currentTime() - _manager->_taskMap.begin()->first;
+	  }
+	    
+ 	  _manager->_monitor.wait(timeout);
 	  
 	}
 	
-	if(_state == STARTED) {
+	if(_manager->_state == TimerManager::STARTED) {
 	  
 	  for(task_iterator ix = _manager->_taskMap.begin(); ix != expiredTaskEnd; ix++) {
 
 	    TimerManager::Task* task = ix->second;
 	    
 	    expiredTasks.insert(task);
+
+	    if(task->_state == TimerManager::Task::WAITING) {
+
+	      task->_state = TimerManager::Task::EXECUTING;
+	    }
 	    
 	    _manager->_taskCount--;
 	  }
@@ -116,13 +127,13 @@
 	delete *ix;
       }
       
-    } while(_state == STARTED);
+    } while(_manager->_state == TimerManager::STARTED);
 
-    {Synchronized(_manager->_monitor);
+    {Synchronized s(_manager->_monitor);
 
-      if(_state == STOPPING) {
+      if(_manager->_state == TimerManager::STOPPING) {
 
-	_state = STOPPED; 
+	_manager->_state = TimerManager::STOPPED; 
 
 	_manager->_monitor.notify();
 
@@ -137,13 +148,73 @@
   TimerManager* _manager;
 
   friend class TimerManager;
-
-  STATE _state;
 };
 
-TimerManager::TimerManager() {}
+TimerManager::TimerManager() :
+  _state(TimerManager::UNINITIALIZED),
+  _dispatcher(new Dispatcher(this)) {
+}
 
-TimerManager::~TimerManager() {}
+
+TimerManager::~TimerManager() {
+  delete _dispatcher;
+}
+
+void TimerManager::start() {
+
+  bool doStart = false;
+
+  {Synchronized s(_monitor);
+
+    if(_threadFactory == NULL) {throw InvalidArgumentException();}
+
+    if(_state == TimerManager::UNINITIALIZED) {
+
+      _state = TimerManager::STARTING;
+
+      doStart = true;
+    }
+  }
+
+  if(doStart) {
+
+    _dispatcherThread = _threadFactory->newThread(_dispatcher);
+
+    _dispatcherThread->start();
+  }
+
+  {Synchronized s(_monitor);
+
+    while(_state == TimerManager::STARTING) {
+
+      _monitor.wait();
+    }
+    
+    assert(_state != TimerManager::STARTING);
+  }
+}
+
+void TimerManager::stop() {
+
+  {Synchronized s(_monitor);
+
+    if(_state == TimerManager::UNINITIALIZED) {
+
+      _state = TimerManager::STOPPED;
+
+    } else if(_state != STOPPING &&  _state != STOPPED) {
+
+      _state = STOPPING;
+
+      _monitor.notifyAll();
+    }
+
+    while(_state != STOPPED) {
+
+      _monitor.wait();
+    }
+  }
+}
 
 const ThreadFactory* TimerManager::threadFactory() const {
 
@@ -159,6 +230,11 @@
   _threadFactory = value;
 }
 
+size_t TimerManager::taskCount() const {
+
+  return _taskCount;
+}
+      
 void TimerManager::add(Runnable* task, long long timeout) {
 
   long long now = Util::currentTime();
@@ -167,6 +243,10 @@
 
   {Synchronized s(_monitor); 
 
+    if(_state != TimerManager::STARTED) {
+      throw IllegalStateException();
+    }
+
     _taskCount++;
 
     _taskMap.insert(std::pair<long long, Task*>(timeout, new Task(task)));
@@ -174,15 +254,10 @@
     /* If the task map was empty, or if we have an expiration that is earlier than any previously seen,
        kick the dispatcher so it can update its timeout */
 
-    if(_taskCount == 1 || timeout < _nextTimeout) {
+    if(_taskCount == 1 || timeout < _taskMap.begin()->first) {
 
       _monitor.notify();
     }
-    
-    if(timeout < _nextTimeout) {
-
-      _nextTimeout = timeout;
-    }
   }
 }
 
@@ -192,20 +267,26 @@
 
   Util::toMilliseconds(expiration, value);
 
-  /* XXX
-     Need to convert this to an explicit exception */
-
   long long now = Util::currentTime();
 
-  assert(expiration < now);
+  if(expiration < now) {
+    throw  InvalidArgumentException();
+  }
 
   add(task, expiration - now);
 }
 
 
 void TimerManager::remove(Runnable* task) {
+  {Synchronized s(_monitor); 
 
+    if(_state != TimerManager::STARTED) {
+      throw IllegalStateException();
+    }
+  }
 }
 
+const TimerManager::STATE TimerManager::state() const { return _state;}
+
 }}} // facebook::thrift::concurrency
 
diff --git a/lib/cpp/src/concurrency/TimerManager.h b/lib/cpp/src/concurrency/TimerManager.h
index 002460a..2681a5a 100644
--- a/lib/cpp/src/concurrency/TimerManager.h
+++ b/lib/cpp/src/concurrency/TimerManager.h
@@ -1,6 +1,7 @@
 #if !defined(_concurrency_TimerManager_h_)
 #define _concurrency_TimerManager_h_ 1
 
+#include "Exception.h"
 #include "Monitor.h"
 #include "Thread.h"
 
@@ -9,7 +10,7 @@
 #include <time.h>
 
 namespace facebook { namespace thrift { namespace concurrency { 
-      
+
 /** Timer Manager 
 	  
     This class dispatches timer tasks when they fall due.
@@ -23,31 +24,56 @@
 
   TimerManager();
 
-  virtual ~TimerManager() = 0;
+  virtual ~TimerManager();
 
-  virtual const ThreadFactory* threadFactory() const = 0;
+  virtual const ThreadFactory* threadFactory() const;
 
-  virtual void threadFactory(const ThreadFactory* value) = 0;
+  virtual void threadFactory(const ThreadFactory* value);
 
-  virtual size_t taskCount() const  = 0;
+  /** Starts the timer manager service 
+
+      @throws IllegalArgumentException Missing thread factory attribute */
+
+  virtual void start();
+
+  /** Stops the timer manager service */
+
+  virtual void stop();
+
+  virtual size_t taskCount() const ;
 
   /** Adds a task to be executed at some time in the future by a worker thread.
       
       @param task The task to execute
       @param timeout Time in milliseconds to delay before executing task */
 
-  virtual void add(Runnable* task, long long timeout) = 0;
+  virtual void add(Runnable* task, long long timeout);
 
   /** Adds a task to be executed at some time in the future by a worker thread.
       
       @param task The task to execute
       @param timeout Absolute time in the future to execute task. */ 
 
-  virtual void add(Runnable* task, const struct timespec& timeout) = 0;
+  virtual void add(Runnable* task, const struct timespec& timeout);
 
-  /** Removes a pending task */
+  /** Removes a pending task 
 
-  virtual void remove(Runnable* task) = 0;
+      @throws NoSuchTaskException Specified task doesn't exist.  It was either processed already or this call was made for a task that
+      was never added to this timer
+
+      @throws UncancellableTaskException Specified task is already being executed or has completed execution. */
+
+  virtual void remove(Runnable* task);
+
+  enum STATE {
+    UNINITIALIZED = 1000,
+    STARTING = 1001,
+    STARTED = 1002,
+    STOPPING = 1003,
+    STOPPED = 1004
+  };
+  
+  virtual const STATE state() const;
 
  private:
   
@@ -61,15 +87,18 @@
 
   size_t _taskCount;
 
-  long long _nextTimeout;
-
   Monitor _monitor;
 
+  STATE _state;
+
   class Dispatcher;
 
   friend class Dispatcher;
 
   Dispatcher* _dispatcher;
+
+  Thread* _dispatcherThread;
+
 };
 
 }}} // facebook::thrift::concurrency
diff --git a/lib/cpp/src/concurrency/test/Tests.cc b/lib/cpp/src/concurrency/test/Tests.cc
new file mode 100644
index 0000000..36f29ff
--- /dev/null
+++ b/lib/cpp/src/concurrency/test/Tests.cc
@@ -0,0 +1,52 @@
+#include <iostream>
+#include <string>
+
+#include "ThreadFactoryTests.h"
+#include "TimerManagerTests.h"
+
+int main(int argc, char** argv) {
+
+  std::string arg;
+
+  if(argc < 2) {
+
+    arg = "all";
+
+  } else {
+    
+    arg = std::string(argv[1]);
+  }
+
+  bool runAll = arg.compare("all") == 0;
+
+  if(runAll || arg.compare("thread-factory") == 0) {
+
+    ThreadFactoryTests threadFactoryTests;
+    
+    std::cout << "ThreadFactory tests..." << std::endl;
+    
+    std::cout << "\tThreadFactory hello-world test" << std::endl;
+
+    assert(threadFactoryTests.helloWorldTest());
+
+    std::cout << "\t\tThreadFactory reap N threads test: N = 100" << std::endl;
+
+    assert(threadFactoryTests.reapNThreads(100));
+
+    std::cout << "\t\tThreadFactory synchrous start test" << std::endl;
+
+    assert(threadFactoryTests.synchStartTest());
+  }
+
+  if(runAll || arg.compare("timer-manager") == 0) {
+
+    std::cout << "TimerManager tests..." << std::endl;
+
+    std::cout << "\t\tTimerManager test00" << std::endl;
+
+    TimerManagerTests timerManagerTests;
+
+    assert(timerManagerTests.test00());
+  }
+}
+
diff --git a/lib/cpp/src/concurrency/test/ThreadFactoryTests.h b/lib/cpp/src/concurrency/test/ThreadFactoryTests.h
new file mode 100644
index 0000000..0d93564
--- /dev/null
+++ b/lib/cpp/src/concurrency/test/ThreadFactoryTests.h
@@ -0,0 +1,228 @@
+#include <Thread.h>
+#include <PosixThreadFactory.h>
+#include <Monitor.h>
+
+#include <assert.h>
+#include <iostream>
+#include <set>
+
+namespace facebook { namespace thrift { namespace concurrency { namespace test {
+
+using namespace facebook::thrift::concurrency;
+
+/** ThreadManagerTests class 
+
+    @author marc
+    @version $Id:$ */
+
+class ThreadFactoryTests {
+
+ class Task: public Runnable {
+
+  public:
+
+    Task() {}
+
+    void run() {
+      std::cout << "\t\t\tHello World" << std::endl;
+    }
+  };
+
+public:
+
+  /** Hello world test */
+
+  bool helloWorldTest() {
+
+    PosixThreadFactory threadFactory =  PosixThreadFactory();
+
+    Task* task = new ThreadFactoryTests::Task();
+
+    Thread* thread = threadFactory.newThread(task);
+
+    thread->start();
+
+    thread->join();
+
+    delete thread;
+
+    delete task;
+
+    std::cout << "\t\t\tSuccess!" << std::endl;
+
+    return true;
+  }
+
+  /** Reap N threads  */
+
+ class ReapNTask: public Runnable {
+
+  public:
+
+  ReapNTask(Monitor& monitor, int& activeCount) :
+    _monitor(monitor),
+      _count(activeCount) {
+      }
+    
+    void run() {
+
+      {Synchronized s(_monitor);
+
+	_count--;
+
+	//std::cout << "\t\t\tthread count: " << _count << std::endl;
+
+	if(_count == 0) {
+	  _monitor.notify();
+	}
+      }
+    }
+
+    Monitor& _monitor;
+
+    int& _count;
+  };
+
+  bool reapNThreads(int count=100) {
+
+    Monitor* monitor = new Monitor();
+
+    int* activeCount  = new int(count);
+
+    PosixThreadFactory threadFactory =  PosixThreadFactory();
+
+    std::set<Thread*> threads;
+
+    for(int ix = 0; ix < count; ix++) {
+      threads.insert(threadFactory.newThread(new ReapNTask(*monitor, *activeCount)));
+    }
+
+    for(std::set<Thread*>::const_iterator thread = threads.begin(); thread != threads.end(); thread++) {
+
+      (*thread)->start();
+    }
+
+
+    {Synchronized s(*monitor);
+
+      while(*activeCount > 0) {
+	monitor->wait(1000);
+      }
+    }
+
+    for(std::set<Thread*>::const_iterator thread = threads.begin(); thread != threads.end(); thread++) {
+
+      delete (*thread)->runnable();
+
+      delete *thread;
+    }
+
+    std::cout << "\t\t\tSuccess!" << std::endl;
+
+    return true;
+  }
+
+ class SynchStartTask: public Runnable {
+
+  public:
+
+    enum STATE {
+      UNINITIALIZED = 1000,
+      STARTING = 1001,
+      STARTED = 1002,
+      STOPPING = 1003,
+      STOPPED = 1004
+    };
+
+  SynchStartTask(Monitor& monitor,
+		 volatile  STATE& state) :
+    _monitor(monitor),
+    _state(state) {
+    }
+
+    void run() {
+
+      {Synchronized s(_monitor);
+
+	if(_state == SynchStartTask::STARTING) {
+	  _state = SynchStartTask::STARTED;
+	  _monitor.notify();
+	}
+      }
+
+      {Synchronized s(_monitor);
+	
+	while(_state == SynchStartTask::STARTED) {
+	  _monitor.wait();
+	}
+
+	if(_state == SynchStartTask::STOPPING) {
+	  
+	  _state = SynchStartTask::STOPPED;
+	  
+	  _monitor.notifyAll();
+	}
+      }
+    }
+
+    private:
+    Monitor& _monitor;
+    volatile  STATE& _state;
+  };
+
+  bool synchStartTest() {
+
+    Monitor monitor;
+    
+    SynchStartTask::STATE state = SynchStartTask::UNINITIALIZED;
+    
+    SynchStartTask* task = new SynchStartTask(monitor, state);
+
+    PosixThreadFactory threadFactory =  PosixThreadFactory();
+
+    Thread* thread = threadFactory.newThread(task);
+
+    if(state == SynchStartTask::UNINITIALIZED) {
+
+      state = SynchStartTask::STARTING;
+
+      thread->start();
+    }
+
+    {Synchronized s(monitor);
+      
+      while(state == SynchStartTask::STARTING) {
+	monitor.wait();
+      }
+    }
+
+    assert(state != SynchStartTask::STARTING);
+
+    {Synchronized s(monitor);
+
+      monitor.wait(100);
+
+      if(state == SynchStartTask::STARTED) {
+
+	state = SynchStartTask::STOPPING;
+
+	monitor.notify();
+      }
+      
+      while(state == SynchStartTask::STOPPING) {
+	monitor.wait();
+      }
+    }
+
+    assert(state == SynchStartTask::STOPPED);
+
+    return true;
+  }
+
+};
+  
+
+}}}} // facebook::thrift::concurrency
+
+using namespace facebook::thrift::concurrency::test;
+
diff --git a/lib/cpp/src/concurrency/test/TimerManagerTests.cc b/lib/cpp/src/concurrency/test/TimerManagerTests.cc
deleted file mode 100644
index abd0e95..0000000
--- a/lib/cpp/src/concurrency/test/TimerManagerTests.cc
+++ /dev/null
@@ -1,33 +0,0 @@
-#include <ThreadManager.h>
-#include <PosixThreadFactory.h>
-
-#include <assert.h>
-
-namespace facebook { namespace thrift { namespace concurrency { namespace test {
-
-/** ThreadManagerTests class */
-
-
-class ThreadManagerTests {
-
-  void init() {
-
-    ThreadManager* threadManager =  ThreadManager::newThreadManager();
-
-    threadManager->poolPolicy(new BasicPoolPolicy());
-
-    threadManager->threadFactory(new PosixThreadFactory());
-
-    threadManager->poolPolicy(new BasicPoolPolicy());
-  }
-};
-  
-
-}}}} // facebook::thrift::concurrency
-
-int main(int argc, char** argv) {
-
-  return 0;
-
-}
-
diff --git a/lib/cpp/src/concurrency/test/TimerManagerTests.h b/lib/cpp/src/concurrency/test/TimerManagerTests.h
new file mode 100644
index 0000000..24f7964
--- /dev/null
+++ b/lib/cpp/src/concurrency/test/TimerManagerTests.h
@@ -0,0 +1,81 @@
+#include <TimerManager.h>
+#include <PosixThreadFactory.h>
+#include <Monitor.h>
+
+#include <assert.h>
+#include <iostream>
+
+namespace facebook { namespace thrift { namespace concurrency { namespace test {
+
+using namespace facebook::thrift::concurrency;
+
+/** ThreadManagerTests class 
+
+    @author marc
+    @version $Id:$ */
+
+class TimerManagerTests {
+
+ class Task: public Runnable {
+
+  public:
+    
+  Task(Monitor& monitor) : 
+    _monitor(monitor),
+    _done(false) {}
+
+    void run() {
+
+      std::cout << "\t\t\tHello World" << std::endl;
+
+      _done = true;
+      
+      {Synchronized s(_monitor);
+	_monitor.notifyAll();
+      }
+    }
+    
+    Monitor& _monitor;
+    bool _done;
+  };
+
+public:
+
+  bool test00() {
+
+    TimerManager* timerManager =  new TimerManager();
+
+    timerManager->threadFactory(new PosixThreadFactory());
+
+    timerManager->start();
+
+    assert(timerManager->state() == TimerManager::STARTED);
+
+    TimerManagerTests::Task* task = new TimerManagerTests::Task(_monitor);
+
+    {Synchronized s(_monitor);
+
+      timerManager->add(task, 1000LL);
+
+      _monitor.wait();
+    }
+
+    assert(task->_done);
+
+    delete task;
+
+    std::cout << "\t\t\tSuccess!" << std::endl;
+
+    return true;
+  }
+
+  friend class TestTask;
+
+  Monitor _monitor;
+};
+  
+
+}}}} // facebook::thrift::concurrency
+
+using namespace facebook::thrift::concurrency::test;
+