|  | #include "PosixThreadFactory.h" | 
|  |  | 
|  | #include <assert.h> | 
|  | #include <pthread.h> | 
|  |  | 
|  | #include <iostream> | 
|  |  | 
|  | #include <boost/weak_ptr.hpp> | 
|  |  | 
|  | namespace facebook { namespace thrift { namespace concurrency { | 
|  |  | 
|  | using namespace boost; | 
|  |  | 
|  | /** | 
|  | * The POSIX thread class. | 
|  | * | 
|  | * @author marc | 
|  | * @version $Id:$ | 
|  | */ | 
|  | class PthreadThread: public Thread { | 
|  | public: | 
|  |  | 
|  | enum STATE { | 
|  | uninitialized, | 
|  | starting, | 
|  | started, | 
|  | stopping, | 
|  | stopped | 
|  | }; | 
|  |  | 
|  | static const int MB = 1024 * 1024; | 
|  |  | 
|  | static void* threadMain(void* arg); | 
|  |  | 
|  | private: | 
|  | pthread_t _pthread; | 
|  | STATE _state; | 
|  | int _policy; | 
|  | int _priority; | 
|  | int _stackSize; | 
|  | weak_ptr<PthreadThread> _self; | 
|  |  | 
|  | public: | 
|  |  | 
|  | PthreadThread(int policy, int priority, int stackSize, shared_ptr<Runnable> runnable) : | 
|  | _pthread(0), | 
|  | _state(uninitialized), | 
|  | _policy(policy), | 
|  | _priority(priority), | 
|  | _stackSize(stackSize) { | 
|  |  | 
|  | this->Thread::runnable(runnable); | 
|  | } | 
|  |  | 
|  | ~PthreadThread() {} | 
|  |  | 
|  | void start() { | 
|  | if (_state != uninitialized) { | 
|  | return; | 
|  | } | 
|  |  | 
|  | _state = starting; | 
|  |  | 
|  | pthread_attr_t thread_attr; | 
|  | assert(pthread_attr_init(&thread_attr) == 0); | 
|  | assert(pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_JOINABLE) == 0); | 
|  |  | 
|  | // Set thread stack size | 
|  | assert(pthread_attr_setstacksize(&thread_attr, MB * _stackSize) == 0); | 
|  |  | 
|  | // Set thread policy | 
|  | assert(pthread_attr_setschedpolicy(&thread_attr, _policy) == 0); | 
|  |  | 
|  | struct sched_param sched_param; | 
|  | sched_param.sched_priority = _priority; | 
|  |  | 
|  | // Set thread priority | 
|  | assert(pthread_attr_setschedparam(&thread_attr, &sched_param) == 0); | 
|  |  | 
|  | // Create reference | 
|  | shared_ptr<PthreadThread>* selfRef = new shared_ptr<PthreadThread>(); | 
|  | *selfRef = _self.lock(); | 
|  | assert(pthread_create(&_pthread, &thread_attr, threadMain, (void*)selfRef) == 0); | 
|  | } | 
|  |  | 
|  | void join() { | 
|  | if (_state != stopped) { | 
|  | void* ignore; | 
|  | pthread_join(_pthread, &ignore); | 
|  | } | 
|  | } | 
|  |  | 
|  | shared_ptr<Runnable> runnable() const { return Thread::runnable(); } | 
|  |  | 
|  | void runnable(shared_ptr<Runnable> value) { Thread::runnable(value); } | 
|  |  | 
|  | void weakRef(shared_ptr<PthreadThread> self) { | 
|  | assert(self.get() == this); | 
|  | _self = weak_ptr<PthreadThread>(self); | 
|  | } | 
|  | }; | 
|  |  | 
|  | void* PthreadThread::threadMain(void* arg) { | 
|  | // XXX need a lock here when testing thread state | 
|  | shared_ptr<PthreadThread> thread = *(shared_ptr<PthreadThread>*)arg; | 
|  | delete reinterpret_cast<shared_ptr<PthreadThread>*>(arg); | 
|  |  | 
|  | if (thread == NULL) { | 
|  | return (void*)0; | 
|  | } | 
|  |  | 
|  | if (thread->_state != starting) { | 
|  | return (void*)0; | 
|  | } | 
|  |  | 
|  | thread->_state = starting; | 
|  | thread->runnable()->run(); | 
|  | if (thread->_state != stopping && thread->_state != stopped) { | 
|  | thread->_state = stopping; | 
|  | } | 
|  |  | 
|  | return (void*)0; | 
|  | } | 
|  |  | 
|  | /** | 
|  | * POSIX Thread factory implementation | 
|  | */ | 
|  | class PosixThreadFactory::Impl { | 
|  |  | 
|  | private: | 
|  | POLICY _policy; | 
|  | PRIORITY _priority; | 
|  | int _stackSize; | 
|  | bool _detached; | 
|  |  | 
|  | /** | 
|  | * Converts generic posix thread schedule policy enums into pthread | 
|  | * API values. | 
|  | */ | 
|  | static int toPthreadPolicy(POLICY policy) { | 
|  | switch(policy) { | 
|  | case OTHER: return SCHED_OTHER; break; | 
|  | case FIFO: return SCHED_FIFO; break; | 
|  | case ROUND_ROBIN: return SCHED_RR; break; | 
|  | default: return SCHED_OTHER; break; | 
|  | } | 
|  | } | 
|  |  | 
|  | /** | 
|  | * Converts relative thread priorities to absolute value based on posix | 
|  | * thread scheduler policy | 
|  | * | 
|  | *  The idea is simply to divide up the priority range for the given policy | 
|  | * into the correpsonding relative priority level (lowest..highest) and | 
|  | * then pro-rate accordingly. | 
|  | */ | 
|  | static int toPthreadPriority(POLICY policy, PRIORITY priority) { | 
|  | int pthread_policy = toPthreadPolicy(policy); | 
|  | int min_priority = sched_get_priority_min(pthread_policy); | 
|  | int max_priority = sched_get_priority_max(pthread_policy); | 
|  | int quanta = (HIGHEST - LOWEST) + 1; | 
|  | float stepsperquanta = (max_priority - min_priority) / quanta; | 
|  |  | 
|  | if(priority <= HIGHEST) { | 
|  | return (int)(min_priority + stepsperquanta * priority); | 
|  | } else { | 
|  | // should never get here for priority increments. | 
|  | assert(false); | 
|  | return (int)(min_priority + stepsperquanta * NORMAL); | 
|  | } | 
|  | } | 
|  |  | 
|  | public: | 
|  |  | 
|  | Impl(POLICY policy, PRIORITY priority, int stackSize, bool detached) : | 
|  | _policy(policy), | 
|  | _priority(priority), | 
|  | _stackSize(stackSize), | 
|  | _detached(detached) {} | 
|  |  | 
|  | /** | 
|  | * Creates a new POSIX thread to run the runnable object | 
|  | * | 
|  | * @param runnable A runnable object | 
|  | */ | 
|  | shared_ptr<Thread> newThread(shared_ptr<Runnable> runnable) const { | 
|  | shared_ptr<PthreadThread> result = shared_ptr<PthreadThread>(new PthreadThread(toPthreadPolicy(_policy), toPthreadPriority(_policy, _priority), _stackSize, runnable)); | 
|  | result->weakRef(result); | 
|  | runnable->thread(result); | 
|  | return result; | 
|  | } | 
|  |  | 
|  | int stackSize() const { return _stackSize; } | 
|  |  | 
|  | void stackSize(int value) { _stackSize = value; } | 
|  |  | 
|  | PRIORITY priority() const { return _priority; } | 
|  |  | 
|  | /** | 
|  | * Sets priority. | 
|  | * | 
|  | *  XXX | 
|  | *  Need to handle incremental priorities properly. | 
|  | */ | 
|  | void priority(PRIORITY value) { _priority = value; } | 
|  | }; | 
|  |  | 
|  | PosixThreadFactory::PosixThreadFactory(POLICY policy, PRIORITY priority, int stackSize, bool detached) : | 
|  | _impl(new PosixThreadFactory::Impl(policy, priority, stackSize, detached)) {} | 
|  |  | 
|  | shared_ptr<Thread> PosixThreadFactory::newThread(shared_ptr<Runnable> runnable) const { return _impl->newThread(runnable); } | 
|  |  | 
|  | int PosixThreadFactory::stackSize() const { return _impl->stackSize(); } | 
|  |  | 
|  | void PosixThreadFactory::stackSize(int value) { _impl->stackSize(value); } | 
|  |  | 
|  | PosixThreadFactory::PRIORITY PosixThreadFactory::priority() const { return _impl->priority(); } | 
|  |  | 
|  | void PosixThreadFactory::priority(PosixThreadFactory::PRIORITY value) { _impl->priority(value); } | 
|  |  | 
|  | }}} // facebook::thrift::concurrency |