blob: f07db86a229b6674a3cae3af7abc5c006cd9b3ab [file] [log] [blame]
#include "PosixThreadFactory.h"
#include <assert.h>
#include <pthread.h>
#include <iostream>
#include <boost/weak_ptr.hpp>
namespace facebook { namespace thrift { namespace concurrency {
using namespace boost;
/** The POSIX thread class.
@author marc
@version $Id:$ */
class PthreadThread: public Thread {
public:
enum STATE {uninitialized,
starting,
started,
stopping,
stopped
};
static const int MB = 1024 * 1024;
static void* threadMain(void* arg);
private:
pthread_t _pthread;
STATE _state;
int _policy;
int _priority;
int _stackSize;
weak_ptr<PthreadThread> _self;
public:
PthreadThread(int policy, int priority, int stackSize, shared_ptr<Runnable> runnable) :
_pthread(0),
_state(uninitialized),
_policy(policy),
_priority(priority),
_stackSize(stackSize) {
this->Thread::runnable(runnable);
}
~PthreadThread() {
}
void start() {
if(_state != uninitialized) {
return;
}
_state = starting;
pthread_attr_t thread_attr;
assert(pthread_attr_init(&thread_attr) == 0);
assert(pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_JOINABLE) == 0);
// Set thread stack size
assert(pthread_attr_setstacksize(&thread_attr, MB * _stackSize) == 0);
// Set thread policy
assert(pthread_attr_setschedpolicy(&thread_attr, _policy) == 0);
struct sched_param sched_param;
sched_param.sched_priority = _priority;
// Set thread priority
assert(pthread_attr_setschedparam(&thread_attr, &sched_param) == 0);
shared_ptr<PthreadThread>* selfRef = new shared_ptr<PthreadThread>();
*selfRef = _self.lock();
assert(pthread_create(&_pthread, &thread_attr, threadMain, (void*)selfRef) == 0);
}
void join() {
if(_state != stopped) {
void* ignore;
pthread_join(_pthread, &ignore);
}
}
shared_ptr<Runnable> runnable() const {return Thread::runnable();}
void runnable(shared_ptr<Runnable> value) {Thread::runnable(value);}
void weakRef(shared_ptr<PthreadThread> self) {
assert(self.get() == this);
_self = weak_ptr<PthreadThread>(self);
}
};
void* PthreadThread::threadMain(void* arg) {
// XXX need a lock here when testing thread state
shared_ptr<PthreadThread> thread = *(shared_ptr<PthreadThread>*)arg;
delete reinterpret_cast<shared_ptr<PthreadThread>*>(arg);
if(thread == NULL) {
return (void*)0;
}
if(thread->_state != starting) {
return (void*)0;
}
thread->_state = starting;
thread->runnable()->run();
if(thread->_state != stopping && thread->_state != stopped) {
thread->_state = stopping;
}
return (void*)0;
}
/** POSIX Thread factory implementation */
class PosixThreadFactory::Impl {
private:
POLICY _policy;
PRIORITY _priority;
int _stackSize;
bool _detached;
/** Converts generic posix thread schedule policy enums into pthread API values. */
static int toPthreadPolicy(POLICY policy) {
switch(policy) {
case OTHER: return SCHED_OTHER; break;
case FIFO: return SCHED_FIFO; break;
case ROUND_ROBIN: return SCHED_RR; break;
default: return SCHED_OTHER; break;
}
}
/** Converts relative thread priorities to absolute value based on posix thread scheduler policy
The idea is simply to divide up the priority range for the given policy into the correpsonding relative
priority level (lowest..highest) and then pro-rate accordingly. */
static int toPthreadPriority(POLICY policy, PRIORITY priority) {
int pthread_policy = toPthreadPolicy(policy);
int min_priority = sched_get_priority_min(pthread_policy);
int max_priority = sched_get_priority_max(pthread_policy);
int quanta = (HIGHEST - LOWEST) + 1;
float stepsperquanta = (max_priority - min_priority) / quanta;
if(priority <= HIGHEST) {
return (int)(min_priority + stepsperquanta * priority);
} else {
// should never get here for priority increments.
assert(false);
return (int)(min_priority + stepsperquanta * NORMAL);
}
}
public:
Impl(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
_policy(policy),
_priority(priority),
_stackSize(stackSize),
_detached(detached) {
}
/** Creates a new POSIX thread to run the runnable object
@param runnable A runnable object */
shared_ptr<Thread> newThread(shared_ptr<Runnable> runnable) const {
shared_ptr<PthreadThread> result = shared_ptr<PthreadThread>(new PthreadThread(toPthreadPolicy(_policy), toPthreadPriority(_policy, _priority), _stackSize, runnable));
result->weakRef(result);
runnable->thread(result);
return result;
}
int stackSize() const { return _stackSize;}
void stackSize(int value) { _stackSize = value;}
PRIORITY priority() const { return _priority;}
/** Sets priority.
XXX
Need to handle incremental priorities properly. */
void priority(PRIORITY value) { _priority = value;}
};
PosixThreadFactory::PosixThreadFactory(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
_impl(new PosixThreadFactory::Impl(policy, priority, stackSize, detached)) {}
shared_ptr<Thread> PosixThreadFactory::newThread(shared_ptr<Runnable> runnable) const {return _impl->newThread(runnable);}
int PosixThreadFactory::stackSize() const {return _impl->stackSize();}
void PosixThreadFactory::stackSize(int value) {_impl->stackSize(value);}
PosixThreadFactory::PRIORITY PosixThreadFactory::priority() const {return _impl->priority();}
void PosixThreadFactory::priority(PosixThreadFactory::PRIORITY value) {_impl->priority(value);}
}}} // facebook::thrift::concurrency