| #include "PosixThreadFactory.h" |
| |
| #include <assert.h> |
| #include <pthread.h> |
| |
| #include <iostream> |
| |
| #include <boost/weak_ptr.hpp> |
| |
| namespace facebook { namespace thrift { namespace concurrency { |
| |
| using namespace boost; |
| |
| /** |
| * The POSIX thread class. |
| * |
| * @author marc |
| * @version $Id:$ |
| */ |
| class PthreadThread: public Thread { |
| public: |
| |
| enum STATE { |
| uninitialized, |
| starting, |
| started, |
| stopping, |
| stopped |
| }; |
| |
| static const int MB = 1024 * 1024; |
| |
| static void* threadMain(void* arg); |
| |
| private: |
| pthread_t _pthread; |
| STATE _state; |
| int _policy; |
| int _priority; |
| int _stackSize; |
| weak_ptr<PthreadThread> _self; |
| |
| public: |
| |
| PthreadThread(int policy, int priority, int stackSize, shared_ptr<Runnable> runnable) : |
| _pthread(0), |
| _state(uninitialized), |
| _policy(policy), |
| _priority(priority), |
| _stackSize(stackSize) { |
| |
| this->Thread::runnable(runnable); |
| } |
| |
| ~PthreadThread() {} |
| |
| void start() { |
| if (_state != uninitialized) { |
| return; |
| } |
| |
| _state = starting; |
| |
| pthread_attr_t thread_attr; |
| assert(pthread_attr_init(&thread_attr) == 0); |
| assert(pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_JOINABLE) == 0); |
| |
| // Set thread stack size |
| assert(pthread_attr_setstacksize(&thread_attr, MB * _stackSize) == 0); |
| |
| // Set thread policy |
| assert(pthread_attr_setschedpolicy(&thread_attr, _policy) == 0); |
| |
| struct sched_param sched_param; |
| sched_param.sched_priority = _priority; |
| |
| // Set thread priority |
| assert(pthread_attr_setschedparam(&thread_attr, &sched_param) == 0); |
| |
| // Create reference |
| shared_ptr<PthreadThread>* selfRef = new shared_ptr<PthreadThread>(); |
| *selfRef = _self.lock(); |
| assert(pthread_create(&_pthread, &thread_attr, threadMain, (void*)selfRef) == 0); |
| } |
| |
| void join() { |
| if (_state != stopped) { |
| void* ignore; |
| pthread_join(_pthread, &ignore); |
| } |
| } |
| |
| shared_ptr<Runnable> runnable() const { return Thread::runnable(); } |
| |
| void runnable(shared_ptr<Runnable> value) { Thread::runnable(value); } |
| |
| void weakRef(shared_ptr<PthreadThread> self) { |
| assert(self.get() == this); |
| _self = weak_ptr<PthreadThread>(self); |
| } |
| }; |
| |
| void* PthreadThread::threadMain(void* arg) { |
| // XXX need a lock here when testing thread state |
| shared_ptr<PthreadThread> thread = *(shared_ptr<PthreadThread>*)arg; |
| delete reinterpret_cast<shared_ptr<PthreadThread>*>(arg); |
| |
| if (thread == NULL) { |
| return (void*)0; |
| } |
| |
| if (thread->_state != starting) { |
| return (void*)0; |
| } |
| |
| thread->_state = starting; |
| thread->runnable()->run(); |
| if (thread->_state != stopping && thread->_state != stopped) { |
| thread->_state = stopping; |
| } |
| |
| return (void*)0; |
| } |
| |
| /** |
| * POSIX Thread factory implementation |
| */ |
| class PosixThreadFactory::Impl { |
| |
| private: |
| POLICY _policy; |
| PRIORITY _priority; |
| int _stackSize; |
| bool _detached; |
| |
| /** |
| * Converts generic posix thread schedule policy enums into pthread |
| * API values. |
| */ |
| static int toPthreadPolicy(POLICY policy) { |
| switch(policy) { |
| case OTHER: return SCHED_OTHER; break; |
| case FIFO: return SCHED_FIFO; break; |
| case ROUND_ROBIN: return SCHED_RR; break; |
| default: return SCHED_OTHER; break; |
| } |
| } |
| |
| /** |
| * Converts relative thread priorities to absolute value based on posix |
| * thread scheduler policy |
| * |
| * The idea is simply to divide up the priority range for the given policy |
| * into the correpsonding relative priority level (lowest..highest) and |
| * then pro-rate accordingly. |
| */ |
| static int toPthreadPriority(POLICY policy, PRIORITY priority) { |
| int pthread_policy = toPthreadPolicy(policy); |
| int min_priority = sched_get_priority_min(pthread_policy); |
| int max_priority = sched_get_priority_max(pthread_policy); |
| int quanta = (HIGHEST - LOWEST) + 1; |
| float stepsperquanta = (max_priority - min_priority) / quanta; |
| |
| if (priority <= HIGHEST) { |
| return (int)(min_priority + stepsperquanta * priority); |
| } else { |
| // should never get here for priority increments. |
| assert(false); |
| return (int)(min_priority + stepsperquanta * NORMAL); |
| } |
| } |
| |
| public: |
| |
| Impl(POLICY policy, PRIORITY priority, int stackSize, bool detached) : |
| _policy(policy), |
| _priority(priority), |
| _stackSize(stackSize), |
| _detached(detached) {} |
| |
| /** |
| * Creates a new POSIX thread to run the runnable object |
| * |
| * @param runnable A runnable object |
| */ |
| shared_ptr<Thread> newThread(shared_ptr<Runnable> runnable) const { |
| shared_ptr<PthreadThread> result = shared_ptr<PthreadThread>(new PthreadThread(toPthreadPolicy(_policy), toPthreadPriority(_policy, _priority), _stackSize, runnable)); |
| result->weakRef(result); |
| runnable->thread(result); |
| return result; |
| } |
| |
| int stackSize() const { return _stackSize; } |
| |
| void stackSize(int value) { _stackSize = value; } |
| |
| PRIORITY priority() const { return _priority; } |
| |
| /** |
| * Sets priority. |
| * |
| * XXX |
| * Need to handle incremental priorities properly. |
| */ |
| void priority(PRIORITY value) { _priority = value; } |
| }; |
| |
| PosixThreadFactory::PosixThreadFactory(POLICY policy, PRIORITY priority, int stackSize, bool detached) : |
| _impl(new PosixThreadFactory::Impl(policy, priority, stackSize, detached)) {} |
| |
| shared_ptr<Thread> PosixThreadFactory::newThread(shared_ptr<Runnable> runnable) const { return _impl->newThread(runnable); } |
| |
| int PosixThreadFactory::stackSize() const { return _impl->stackSize(); } |
| |
| void PosixThreadFactory::stackSize(int value) { _impl->stackSize(value); } |
| |
| PosixThreadFactory::PRIORITY PosixThreadFactory::priority() const { return _impl->priority(); } |
| |
| void PosixThreadFactory::priority(PosixThreadFactory::PRIORITY value) { _impl->priority(value); } |
| |
| }}} // facebook::thrift::concurrency |