Modified PosixThreadFactory::PThread:
        Pay attention to detached flags.  If thread is create non-detached and has not been joined when all references are given up,
        (ie boost::share_ptr calls ~PThread) do the join in the destructor to prevent thread ids from being leaked.

Modified ThreadFactoryTests.reapNThreads:
        Loop M times for M threads where M x N is bigger than 32K to verify that thread ids aren't leaked

Modified TimerManager.cpp:
        Removed debug messages.

Reviewed By: mcslee

Revert Plan: revertible

Test Plan: concurrency_test thread-factory passes


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@665129 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/concurrency/PosixThreadFactory.cpp b/lib/cpp/src/concurrency/PosixThreadFactory.cpp
index 5f86ea2..2e8ed47 100644
--- a/lib/cpp/src/concurrency/PosixThreadFactory.cpp
+++ b/lib/cpp/src/concurrency/PosixThreadFactory.cpp
@@ -46,35 +46,49 @@
   int priority_;
   int stackSize_;
   weak_ptr<PthreadThread> self_;
+  bool detached_;
 
  public:
 
-  PthreadThread(int policy, int priority, int stackSize, shared_ptr<Runnable> runnable) :
+  PthreadThread(int policy, int priority, int stackSize, bool detached, shared_ptr<Runnable> runnable) :
     pthread_(0),
     state_(uninitialized),
     policy_(policy),
     priority_(priority),
-    stackSize_(stackSize) {
+    stackSize_(stackSize),
+    detached_(detached) {
 
     this->Thread::runnable(runnable);
   }
 
-  ~PthreadThread() {}
+  ~PthreadThread() {
+    /* Nothing references this thread, if is is not detached, do a join
+       now, otherwise the thread-id and, possibly, other resources will 
+       be leaked. */
+    if(!detached_) {
+      try {
+        join();
+      } catch(...) {
+        // We're really hosed. 
+      }
+    }
+  }
 
   void start() {
     if (state_ != uninitialized) {
       return;
     }
 
-    state_ = starting;
-
     pthread_attr_t thread_attr;
     if (pthread_attr_init(&thread_attr) != 0) {
         throw SystemResourceException("pthread_attr_init failed");
     }
 
-    if (pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_JOINABLE) != 0) {
-      throw SystemResourceException("pthread_attr_setdetachstate failed");
+    if(pthread_attr_setdetachstate(&thread_attr, 
+                                   detached_ ? 
+                                   PTHREAD_CREATE_DETACHED : 
+                                   PTHREAD_CREATE_JOINABLE) != 0) {
+        throw SystemResourceException("pthread_attr_setdetachstate failed");
     }
 
     // Set thread stack size
@@ -99,20 +113,23 @@
     shared_ptr<PthreadThread>* selfRef = new shared_ptr<PthreadThread>();
     *selfRef = self_.lock();
 
+    state_ = starting;
+
     if (pthread_create(&pthread_, &thread_attr, threadMain, (void*)selfRef) != 0) {
       throw SystemResourceException("pthread_create failed");
     }
   }
 
   void join() {
-    if (state_ != stopped) {
+    if (!detached_ && state_ != uninitialized) {
       void* ignore;
       pthread_join(pthread_, &ignore);
+      detached_ = true;
     }
   }
 
   id_t id() {
-    return pthread_;
+    return static_cast<id_t>(pthread_);
   }
 
   shared_ptr<Runnable> runnable() const { return Thread::runnable(); }
@@ -211,7 +228,7 @@
    * @param runnable A runnable object
    */
   shared_ptr<Thread> newThread(shared_ptr<Runnable> runnable) const {
-    shared_ptr<PthreadThread> result = shared_ptr<PthreadThread>(new PthreadThread(toPthreadPolicy(policy_), toPthreadPriority(policy_, priority_), stackSize_, runnable));
+    shared_ptr<PthreadThread> result = shared_ptr<PthreadThread>(new PthreadThread(toPthreadPolicy(policy_), toPthreadPriority(policy_, priority_), stackSize_, detached_, runnable));
     result->weakRef(result);
     runnable->thread(result);
     return result;
@@ -223,7 +240,7 @@
 
   PRIORITY priority() const { return priority_; }
 
-  Thread::id_t currentThreadId() const { return pthread_self(); }
+  Thread::id_t currentThreadId() const {return static_cast<id_t>(pthread_self());}
 
   /**
    * Sets priority.
diff --git a/lib/cpp/src/concurrency/PosixThreadFactory.h b/lib/cpp/src/concurrency/PosixThreadFactory.h
index 5f032c1..16be14d 100644
--- a/lib/cpp/src/concurrency/PosixThreadFactory.h
+++ b/lib/cpp/src/concurrency/PosixThreadFactory.h
@@ -52,6 +52,25 @@
     DECREMENT = 8
   };
 
+  /** 
+   * Posix thread (pthread) factory.  All threads created by a factory are reference-counted 
+   * via boost::shared_ptr and boost::weak_ptr.  The factory guarantees that threads and 
+   * the Runnable tasks they host will be properly cleaned up once the last strong reference
+   * to both is given up.
+   *
+   * Threads are created with the specified policy, priority, stack-size and detachable-mode
+   * detached means the thread is free-running and will release all system resources the 
+   * when it completes.  A detachable thread is not joinable.  The join method 
+   * of a detachable thread will return immediately with no error.  
+   *
+   * Joinable threads will detach themselves iff they were not explicitly joined and
+   * there are no remaining strong references to the thread.  This guarantees that
+   * joinnable threads don't leak resources even when the application neglects to 
+   * call join explicitly.
+   *
+   * By default threads are joinable.
+   */
+
   PosixThreadFactory(POLICY policy=ROUND_ROBIN, PRIORITY priority=NORMAL, int stackSize=1, bool detached=false);
 
   // From ThreadFactory;
diff --git a/lib/cpp/src/concurrency/TimerManager.cpp b/lib/cpp/src/concurrency/TimerManager.cpp
index 2134e37..b9e604c 100644
--- a/lib/cpp/src/concurrency/TimerManager.cpp
+++ b/lib/cpp/src/concurrency/TimerManager.cpp
@@ -40,8 +40,6 @@
     state_(WAITING) {}
   
   ~Task() {
-    //debug
-    std::cerr << "TimerManager::Task.dtor[" << this << "]" << std::endl; 
   }
   
   void run() {
@@ -64,10 +62,7 @@
   Dispatcher(TimerManager* manager) : 
     manager_(manager) {}
   
-  ~Dispatcher() {
-    // debug
-    std::cerr << "Dispatcher::dtor[" << this << "]" << std::endl;
-  }
+  ~Dispatcher() {}
   
   /**
    * Dispatcher entry point
@@ -148,13 +143,11 @@
 
   // If we haven't been explicitly stopped, do so now.  We don't need to grab
   // the monitor here, since stop already takes care of reentrancy.
-  std::cerr << "TimerManager::dtor[" << this << "]" << std::endl;
   
   if (state_ != STOPPED) {
     try {
       stop();
     } catch(...) {
-      std::cerr << "TimerManager::dtor[" << this << "] uhoh " << std::endl;
       throw;
       // uhoh
     }
diff --git a/lib/cpp/src/concurrency/test/ThreadFactoryTests.h b/lib/cpp/src/concurrency/test/ThreadFactoryTests.h
index f7d607a..4cd6bd5 100644
--- a/lib/cpp/src/concurrency/test/ThreadFactoryTests.h
+++ b/lib/cpp/src/concurrency/test/ThreadFactoryTests.h
@@ -47,7 +47,7 @@
    */
   bool helloWorldTest() {
 
-    PosixThreadFactory threadFactory =  PosixThreadFactory();
+    PosixThreadFactory threadFactory = PosixThreadFactory();
 
     shared_ptr<Task> task = shared_ptr<Task>(new ThreadFactoryTests::Task());
 
@@ -90,35 +90,52 @@
     int& _count;
   };
 
-  bool reapNThreads(int count=10) {
-
-    Monitor* monitor = new Monitor();
-
-    int* activeCount  = new int(count);
+  bool reapNThreads(int loop=1, int count=10) {
 
     PosixThreadFactory threadFactory =  PosixThreadFactory();
 
-    std::set<shared_ptr<Thread> > threads;
+    Monitor* monitor = new Monitor();
 
-    for (int ix = 0; ix < count; ix++) {
-      threads.insert(threadFactory.newThread(shared_ptr<Runnable>(new ReapNTask(*monitor, *activeCount))));
-    }
+    for(int lix = 0; lix < loop; lix++) {
 
-    for (std::set<shared_ptr<Thread> >::const_iterator thread = threads.begin(); thread != threads.end(); thread++) {
+      int* activeCount  = new int(count);
 
-      (*thread)->start();
-    }
+      std::set<shared_ptr<Thread> > threads;
 
+      int tix;
 
-    {
-      Synchronized s(*monitor);
-      while (*activeCount > 0) {
-	monitor->wait(1000);
+      for (tix = 0; tix < count; tix++) {
+        try {
+          threads.insert(threadFactory.newThread(shared_ptr<Runnable>(new ReapNTask(*monitor, *activeCount))));
+        } catch(SystemResourceException& e) {
+          std::cout << "\t\t\tfailed to create " << lix * count + tix << " thread " << e.what() << std::endl;
+          throw e;
+        }
       }
-    }
+      
+      tix = 0;
+      for (std::set<shared_ptr<Thread> >::const_iterator thread = threads.begin(); thread != threads.end(); tix++, ++thread) {
 
-    for (std::set<shared_ptr<Thread> >::const_iterator thread = threads.begin(); thread != threads.end(); thread++) {
-      threads.erase(*thread);
+        try {
+          (*thread)->start();
+        } catch(SystemResourceException& e) {
+          std::cout << "\t\t\tfailed to start  " << lix * count + tix << " thread " << e.what() << std::endl;
+          throw e;
+        }
+      }
+      
+      {
+        Synchronized s(*monitor);
+        while (*activeCount > 0) {
+          monitor->wait(1000);
+        }
+      }
+      
+      for (std::set<shared_ptr<Thread> >::const_iterator thread = threads.begin(); thread != threads.end(); thread++) {
+        threads.erase(*thread);
+      }
+
+      std::cout << "\t\t\treaped " << lix * count << " threads" << std::endl;
     }
 
     std::cout << "\t\t\tSuccess!" << std::endl;