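Modified PosixThreadFactory:
        Added explicit detached getter and setter (isDetached/setDetached) and renamed the existing accessors to
        get/set style (e.g. getStackSize/setStackSize, getPriority/setPriority, getCurrentThreadId, Thread::getId).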
Modified PosixThreadFactory's PthreadThread::join (called from ~PthreadThread):
        Check whether pthread_join fails, and don't transition to the detached_ state if it does.  There is still a
        potential thread-handle leak for threads created joinable that aren't referenced by any external thread.
        The solution for now has to be "DON'T DO THAT"; the clever approach doesn't always work.

Added ThreadFactoryTests.floodNTest:
        Loop M times for N threads, where M x N is bigger than 32K, to verify that detached threads can be created
        ad infinitum.

Reviewed By: mcslee

Revert Plan: revertible

Test Plan: concurrency_test thread-factory passes


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@665130 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/concurrency/PosixThreadFactory.cpp b/lib/cpp/src/concurrency/PosixThreadFactory.cpp
index 2e8ed47..73aba61 100644
--- a/lib/cpp/src/concurrency/PosixThreadFactory.cpp
+++ b/lib/cpp/src/concurrency/PosixThreadFactory.cpp
@@ -63,13 +63,13 @@
 
   ~PthreadThread() {
     /* Nothing references this thread, if is is not detached, do a join
-       now, otherwise the thread-id and, possibly, other resources will 
+       now, otherwise the thread-id and, possibly, other resources will
        be leaked. */
     if(!detached_) {
       try {
         join();
       } catch(...) {
-        // We're really hosed. 
+        // We're really hosed.
       }
     }
   }
@@ -84,9 +84,9 @@
         throw SystemResourceException("pthread_attr_init failed");
     }
 
-    if(pthread_attr_setdetachstate(&thread_attr, 
-                                   detached_ ? 
-                                   PTHREAD_CREATE_DETACHED : 
+    if(pthread_attr_setdetachstate(&thread_attr,
+                                   detached_ ?
+                                   PTHREAD_CREATE_DETACHED :
                                    PTHREAD_CREATE_JOINABLE) != 0) {
         throw SystemResourceException("pthread_attr_setdetachstate failed");
     }
@@ -123,12 +123,18 @@
   void join() {
     if (!detached_ && state_ != uninitialized) {
       void* ignore;
-      pthread_join(pthread_, &ignore);
-      detached_ = true;
+      /* XXX
+         If join fails, it is most likely because the last
+         reference was held by the thread itself, which cannot join
+         itself.  This results in leaked threads and will eventually
+         cause the process to run out of thread resources.
+         We're beyond the point of throwing an exception.  Not clear how
+         best to handle this. */
+      detached_ = pthread_join(pthread_, &ignore) == 0;
     }
   }
 
-  id_t id() {
+  id_t getId() {
     return static_cast<id_t>(pthread_);
   }
 
@@ -234,13 +240,11 @@
     return result;
   }
 
-  int stackSize() const { return stackSize_; }
+  int getStackSize() const { return stackSize_; }
 
-  void stackSize(int value) { stackSize_ = value; }
+  void setStackSize(int value) { stackSize_ = value; }
 
-  PRIORITY priority() const { return priority_; }
-
-  Thread::id_t currentThreadId() const {return static_cast<id_t>(pthread_self());}
+  PRIORITY getPriority() const { return priority_; }
 
   /**
    * Sets priority.
@@ -248,7 +252,14 @@
    *  XXX
    *  Need to handle incremental priorities properly.
    */
-  void priority(PRIORITY value) { priority_ = value; }
+  void setPriority(PRIORITY value) { priority_ = value; }
+
+  bool isDetached() const { return detached_; }
+
+  void setDetached(bool value) { detached_ = value; }
+
+  Thread::id_t getCurrentThreadId() const {return static_cast<id_t>(pthread_self());}
+
 };
 
 PosixThreadFactory::PosixThreadFactory(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
@@ -256,14 +267,18 @@
 
 shared_ptr<Thread> PosixThreadFactory::newThread(shared_ptr<Runnable> runnable) const { return impl_->newThread(runnable); }
 
-int PosixThreadFactory::stackSize() const { return impl_->stackSize(); }
+int PosixThreadFactory::getStackSize() const { return impl_->getStackSize(); }
 
-void PosixThreadFactory::stackSize(int value) { impl_->stackSize(value); }
+void PosixThreadFactory::setStackSize(int value) { impl_->setStackSize(value); }
 
-PosixThreadFactory::PRIORITY PosixThreadFactory::priority() const { return impl_->priority(); }
+PosixThreadFactory::PRIORITY PosixThreadFactory::getPriority() const { return impl_->getPriority(); }
 
-void PosixThreadFactory::priority(PosixThreadFactory::PRIORITY value) { impl_->priority(value); }
+void PosixThreadFactory::setPriority(PosixThreadFactory::PRIORITY value) { impl_->setPriority(value); }
 
-Thread::id_t PosixThreadFactory::currentThreadId() const { return impl_->currentThreadId(); }
+bool PosixThreadFactory::isDetached() const { return impl_->isDetached(); }
+
+void PosixThreadFactory::setDetached(bool value) { impl_->setDetached(value); }
+
+Thread::id_t PosixThreadFactory::getCurrentThreadId() const { return impl_->getCurrentThreadId(); }
 
 }}} // facebook::thrift::concurrency
diff --git a/lib/cpp/src/concurrency/PosixThreadFactory.h b/lib/cpp/src/concurrency/PosixThreadFactory.h
index 16be14d..894d5f5 100644
--- a/lib/cpp/src/concurrency/PosixThreadFactory.h
+++ b/lib/cpp/src/concurrency/PosixThreadFactory.h
@@ -11,10 +11,10 @@
 
 #include <boost/shared_ptr.hpp>
 
-namespace facebook { namespace thrift { namespace concurrency { 
+namespace facebook { namespace thrift { namespace concurrency {
 
 /**
- * A thread factory to create posix threads 
+ * A thread factory to create posix threads
  *
  * @author marc
  * @version $Id:$
@@ -52,20 +52,20 @@
     DECREMENT = 8
   };
 
-  /** 
-   * Posix thread (pthread) factory.  All threads created by a factory are reference-counted 
-   * via boost::shared_ptr and boost::weak_ptr.  The factory guarantees that threads and 
+  /**
+   * Posix thread (pthread) factory.  All threads created by a factory are reference-counted
+   * via boost::shared_ptr and boost::weak_ptr.  The factory guarantees that threads and
    * the Runnable tasks they host will be properly cleaned up once the last strong reference
    * to both is given up.
    *
    * Threads are created with the specified policy, priority, stack-size and detachable-mode
-   * detached means the thread is free-running and will release all system resources the 
-   * when it completes.  A detachable thread is not joinable.  The join method 
-   * of a detachable thread will return immediately with no error.  
+   * detached means the thread is free-running and will release all system resources
+   * when it completes.  A detachable thread is not joinable.  The join method
+   * of a detachable thread will return immediately with no error.
    *
    * Joinable threads will detach themselves iff they were not explicitly joined and
    * there are no remaining strong references to the thread.  This guarantees that
-   * joinnable threads don't leak resources even when the application neglects to 
+   * joinable threads don't leak resources even when the application neglects to
    * call join explicitly.
    *
    * By default threads are joinable.
@@ -77,32 +77,42 @@
   boost::shared_ptr<Thread> newThread(boost::shared_ptr<Runnable> runnable) const;
 
   // From ThreadFactory;
-  Thread::id_t currentThreadId() const;
-
-  /**
-   * Sets stack size for created threads
-   *
-   * @param value size in megabytes
-   */
-  virtual void stackSize(int value);
+  Thread::id_t getCurrentThreadId() const;
 
   /**
    * Gets stack size for created threads
    *
    * @return int size in megabytes
    */
-  virtual int stackSize() const;
+  virtual int getStackSize() const;
 
   /**
-   * Sets priority relative to current policy
+   * Sets stack size for created threads
+   *
+   * @param value size in megabytes
    */
-  virtual void priority(PRIORITY priority);
+  virtual void setStackSize(int value);
 
   /**
    * Gets priority relative to current policy
    */
-  virtual PRIORITY priority() const;
-  
+  virtual PRIORITY getPriority() const;
+
+  /**
+   * Sets priority relative to current policy
+   */
+  virtual void setPriority(PRIORITY priority);
+
+  /**
+   * Sets detached mode of threads
+   */
+  virtual void setDetached(bool detached);
+
+  /**
+   * Gets current detached mode
+   */
+  virtual bool isDetached() const;
+
  private:
   class Impl;
   boost::shared_ptr<Impl> impl_;
diff --git a/lib/cpp/src/concurrency/Thread.h b/lib/cpp/src/concurrency/Thread.h
index 3e5964f..39ee816 100644
--- a/lib/cpp/src/concurrency/Thread.h
+++ b/lib/cpp/src/concurrency/Thread.h
@@ -75,7 +75,7 @@
   /**
    * Gets the thread's platform-specific ID
    */
-  virtual id_t id() = 0;
+  virtual id_t getId() = 0;
 
   /**
    * Gets the runnable object this thread is hosting
@@ -104,7 +104,7 @@
 
   static const Thread::id_t unknown_thread_id;
 
-  virtual Thread::id_t currentThreadId() const = 0;
+  virtual Thread::id_t getCurrentThreadId() const = 0;
 };
 
 }}} // facebook::thrift::concurrency
diff --git a/lib/cpp/src/concurrency/ThreadManager.cpp b/lib/cpp/src/concurrency/ThreadManager.cpp
index 56de4d0..3d87724 100644
--- a/lib/cpp/src/concurrency/ThreadManager.cpp
+++ b/lib/cpp/src/concurrency/ThreadManager.cpp
@@ -308,7 +308,7 @@
     shared_ptr<ThreadManager::Worker> worker = dynamic_pointer_cast<ThreadManager::Worker, Runnable>((*ix)->runnable());
     worker->state_ = ThreadManager::Worker::STARTING;
     (*ix)->start();
-    idMap_.insert(std::pair<const Thread::id_t, shared_ptr<Thread> >((*ix)->id(), *ix));
+    idMap_.insert(std::pair<const Thread::id_t, shared_ptr<Thread> >((*ix)->getId(), *ix));
   }
 
   {
@@ -400,7 +400,7 @@
 
     for (std::set<shared_ptr<Thread> >::iterator ix = deadWorkers_.begin(); ix != deadWorkers_.end(); ix++) {
       workers_.erase(*ix);
-      idMap_.erase((*ix)->id());
+      idMap_.erase((*ix)->getId());
     }
 
     deadWorkers_.clear();
@@ -408,7 +408,7 @@
 }
 
   bool ThreadManager::Impl::canSleep() {
-    const Thread::id_t id = threadFactory_->currentThreadId();
+    const Thread::id_t id = threadFactory_->getCurrentThreadId();
     return idMap_.find(id) == idMap_.end();
   }
 
diff --git a/lib/cpp/src/concurrency/test/Tests.cpp b/lib/cpp/src/concurrency/test/Tests.cpp
index 5fe4651..a160472 100644
--- a/lib/cpp/src/concurrency/test/Tests.cpp
+++ b/lib/cpp/src/concurrency/test/Tests.cpp
@@ -33,11 +33,17 @@
     std::cout << "ThreadFactory tests..." << std::endl;
 
     size_t count =  1000;
+    size_t floodLoops =  1;
+    size_t floodCount =  100000;
 
     std::cout << "\t\tThreadFactory reap N threads test: N = " << count << std::endl;
 
     assert(threadFactoryTests.reapNThreads(count));
 
+    std::cout << "\t\tThreadFactory floodN threads test: N = " << floodCount << std::endl;
+
+    assert(threadFactoryTests.floodNTest(floodLoops, floodCount));
+
     std::cout << "\t\tThreadFactory synchronous start test" << std::endl;
 
     assert(threadFactoryTests.synchStartTest());
@@ -134,4 +140,3 @@
     }
   }
 }
-
diff --git a/lib/cpp/src/concurrency/test/ThreadFactoryTests.h b/lib/cpp/src/concurrency/test/ThreadFactoryTests.h
index 4cd6bd5..99bc94e 100644
--- a/lib/cpp/src/concurrency/test/ThreadFactoryTests.h
+++ b/lib/cpp/src/concurrency/test/ThreadFactoryTests.h
@@ -112,7 +112,7 @@
           throw e;
         }
       }
-      
+
       tix = 0;
       for (std::set<shared_ptr<Thread> >::const_iterator thread = threads.begin(); thread != threads.end(); tix++, ++thread) {
 
@@ -123,14 +123,14 @@
           throw e;
         }
       }
-      
+
       {
         Synchronized s(*monitor);
         while (*activeCount > 0) {
           monitor->wait(1000);
         }
       }
-      
+
       for (std::set<shared_ptr<Thread> >::const_iterator thread = threads.begin(); thread != threads.end(); thread++) {
         threads.erase(*thread);
       }
@@ -276,6 +276,67 @@
 
     return success;
   }
+
+
+  class FloodTask : public Runnable {
+  public:
+
+    FloodTask(const size_t id) :_id(id) {}
+    ~FloodTask(){
+      if(_id % 1000 == 0) {
+        std::cout << "\t\tthread " << _id << " done" << std::endl;
+      }
+    }
+
+    void run(){
+      if(_id % 1000 == 0) {
+        std::cout << "\t\tthread " << _id << " started" << std::endl;
+      }
+
+      usleep(1);
+    }
+    const size_t _id;
+  };
+
+  void foo(PosixThreadFactory *tf) {
+  }
+
+  bool floodNTest(size_t loop=1, size_t count=100000) {
+
+    bool success = false;
+
+    for(size_t lix = 0; lix < loop; lix++) {
+
+      PosixThreadFactory threadFactory = PosixThreadFactory();
+      threadFactory.setDetached(true);
+
+        for(size_t tix = 0; tix < count; tix++) {
+
+          try {
+
+            shared_ptr<FloodTask> task(new FloodTask(lix * count + tix ));
+
+            shared_ptr<Thread> thread = threadFactory.newThread(task);
+
+            thread->start();
+
+            usleep(1);
+
+          } catch (TException& e) {
+
+            std::cout << "\t\t\tfailed to start  " << lix * count + tix << " thread " << e.what() << std::endl;
+
+            return success;
+          }
+        }
+
+        std::cout << "\t\t\tflooded " << (lix + 1) * count << " threads" << std::endl;
+
+        success = true;
+    }
+
+    return success;
+  }
 };
 
 const double ThreadFactoryTests::ERROR = .20;
diff --git a/lib/cpp/src/concurrency/test/ThreadManagerTests.h b/lib/cpp/src/concurrency/test/ThreadManagerTests.h
index 5f518ca..a8fdcda 100644
--- a/lib/cpp/src/concurrency/test/ThreadManagerTests.h
+++ b/lib/cpp/src/concurrency/test/ThreadManagerTests.h
@@ -100,7 +100,7 @@
 
     shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory>(new PosixThreadFactory());
 
-    threadFactory->priority(PosixThreadFactory::HIGHEST);
+    threadFactory->setPriority(PosixThreadFactory::HIGHEST);
 
     threadManager->threadFactory(threadFactory);
 
@@ -239,7 +239,7 @@
 
       shared_ptr<PosixThreadFactory> threadFactory = shared_ptr<PosixThreadFactory>(new PosixThreadFactory());
 
-      threadFactory->priority(PosixThreadFactory::HIGHEST);
+      threadFactory->setPriority(PosixThreadFactory::HIGHEST);
 
       threadManager->threadFactory(threadFactory);