Checkpoint of initial cut at thread pool manager for thrift and related concurrency classes.


git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@664721 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/concurrency/PosixThreadFactory.cc b/lib/cpp/src/concurrency/PosixThreadFactory.cc
new file mode 100644
index 0000000..e9d52f0
--- /dev/null
+++ b/lib/cpp/src/concurrency/PosixThreadFactory.cc
@@ -0,0 +1,215 @@
+#include "PosixThreadFactory.h"
+
+#include <assert.h>
+#include <pthread.h>
+
+namespace facebook { namespace thrift { namespace concurrency {
+
+/**  The POSIX thread class.  */
+
+class PthreadThread: public Thread {
+
+public:
+  enum STATE {uninitialized, 
+	      starting,
+	      started,
+	      stopping,
+	      stopped
+  };
+
+  static const int MB = 1024 * 1024;
+
+private:
+
+  pthread_t _pthread;
+
+  STATE _state;
+
+  int _policy;
+
+  int _priority;
+
+  int _stackSize;
+
+  Runnable* _runnable;
+
+  static void* threadMain(void* arg) {
+
+    // XXX need a lock here when testing thread state
+
+    PthreadThread* thread = (PthreadThread*)arg;
+
+    if(thread->_state != starting) {
+      return (void*)0;
+    }
+
+    thread->_state = starting;
+
+    thread->_runnable->run();
+
+    if(thread->_state != stopping && thread->_state != stopped) {
+      thread->_state = stopping;
+    }
+    
+    return (void*)0;
+  }
+
+public:
+  
+  PthreadThread(int policy, int priority, int stackSize, Runnable* runnable) : 
+    _pthread(0),
+    _state(uninitialized), 
+    _policy(policy),
+    _priority(priority),
+    _stackSize(stackSize),
+    _runnable(runnable)
+  {}
+
+  void start() {
+
+    if(_state != uninitialized) {
+      return;
+    }
+
+    _state = starting;
+
+    pthread_attr_t thread_attr;
+
+    assert(pthread_attr_init(&thread_attr) == 0);
+
+    assert(pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_JOINABLE) == 0);
+
+    // Set thread stack size
+
+    assert(pthread_attr_setstacksize(&thread_attr, MB * _stackSize) == 0);
+
+    // Set thread policy
+
+    assert(pthread_attr_setschedpolicy(&thread_attr, _policy) == 0);
+
+    struct sched_param sched_param;
+    sched_param.sched_priority = _priority;
+
+    // Set thread priority
+
+    assert(pthread_attr_setschedparam(&thread_attr, &sched_param) == 0);
+
+    assert(pthread_create(&_pthread, &thread_attr, PthreadThread::threadMain, (void*)this) == 0);
+  }
+
+  void join() {
+    
+    if(_state != stopped) {
+      
+      void* ignore;
+      
+      pthread_join(_pthread, &ignore);
+    }
+  }
+
+  const Runnable* runnable() const {return _runnable;}
+
+};
+
+/** POSIX Thread factory implementation */
+
+class PosixThreadFactory::Impl {
+
+private:
+
+  POLICY _policy;
+
+  PRIORITY _priority;
+
+  int _stackSize;
+
+  bool _detached;
+
+  /** Converts generic posix thread schedule policy enums into pthread API values. */
+
+  static int toPthreadPolicy(POLICY policy) {
+    switch(policy) {
+    case OTHER: return SCHED_OTHER; break;
+    case FIFO: return SCHED_FIFO; break;
+    case ROUND_ROBIN: return SCHED_RR; break;
+    default: return SCHED_OTHER; break;
+    }
+  }
+
+  /** Converts relative thread priorities to absolute value based on posix thread scheduler policy
+
+      The idea is simply to divide up the priority range for the given policy into the correpsonding relative
+      priority level (lowest..highest) and then prorate accordingly. */
+
+  static int toPthreadPriority(POLICY policy, PRIORITY priority) {
+
+    int pthread_policy = toPthreadPolicy(policy);
+
+    int min_priority = sched_get_priority_min(pthread_policy);
+
+    int max_priority = sched_get_priority_max(pthread_policy);
+
+    int quanta = (HIGHEST - LOWEST) + 1;
+
+    float stepsperquanta = (max_priority - min_priority) / quanta;
+
+    if(priority <= HIGHEST) {
+
+      return (int)(min_priority + stepsperquanta * priority);
+    } else {
+
+      // should never get here for priority increments.
+
+      assert(false);
+
+      return (int)(min_priority + stepsperquanta * NORMAL);
+    }
+  }
+
+public:
+
+  Impl(POLICY policy, PRIORITY priority, int stackSize, bool detached) : 
+    _policy(policy),
+    _priority(priority),
+    _stackSize(stackSize),
+    _detached(detached) {
+  }
+
+  /** Creates a new POSIX thread to run the runnable object 
+
+      @param runnable A runnable object */
+
+  Thread* newThread(Runnable* runnable) const {
+
+    return new PthreadThread(toPthreadPolicy(_policy), toPthreadPriority(_policy, _priority), _stackSize, runnable);
+  }
+
+  int stackSize() const { return _stackSize;}
+
+  void stackSize(int value) { _stackSize = value;}
+
+  PRIORITY priority() const { return _priority;}
+
+  /** Sets priority.
+      
+      XXX
+      Need to handle incremental priorities properl. */
+
+  void priority(PRIORITY value) { _priority = value;}
+
+};
+
+PosixThreadFactory::PosixThreadFactory(POLICY policy, PRIORITY priority, int stackSize, bool detached) : 
+  _impl(new PosixThreadFactory::Impl(policy, priority, stackSize, detached)) {}
+
+Thread* PosixThreadFactory::newThread(Runnable* runnable) const {return _impl->newThread(runnable);}
+
+int PosixThreadFactory::stackSize() const {return _impl->stackSize();}
+
+void PosixThreadFactory::stackSize(int value) {_impl->stackSize(value);}
+
+PosixThreadFactory::PRIORITY PosixThreadFactory::priority() const {return _impl->priority();}
+
+void PosixThreadFactory::priority(PosixThreadFactory::PRIORITY value) {_impl->priority(value);}
+
+}}} // facebook::thrift::concurrency