blob: 5df87ec2b0000aa5da98325772f6f82a1213c631 [file] [log] [blame]
Marc Slemko66949872006-07-15 01:52:39 +00001#include "PosixThreadFactory.h"
2
3#include <assert.h>
4#include <pthread.h>
5
Marc Slemko6f038a72006-08-03 18:58:09 +00006#include <iostream>
7
8#include <boost/weak_ptr.hpp>
9
Marc Slemko66949872006-07-15 01:52:39 +000010namespace facebook { namespace thrift { namespace concurrency {
11
Marc Slemko6f038a72006-08-03 18:58:09 +000012using namespace boost;
13
Mark Sleef5f2be42006-09-05 21:05:31 +000014/**
15 * The POSIX thread class.
16 *
17 * @author marc
18 * @version $Id:$
19 */
Marc Slemko66949872006-07-15 01:52:39 +000020class PthreadThread: public Thread {
Mark Sleef5f2be42006-09-05 21:05:31 +000021 public:
Marc Slemko66949872006-07-15 01:52:39 +000022
Mark Sleef5f2be42006-09-05 21:05:31 +000023 enum STATE {
24 uninitialized,
25 starting,
26 started,
27 stopping,
28 stopped
Marc Slemko66949872006-07-15 01:52:39 +000029 };
30
31 static const int MB = 1024 * 1024;
32
Marc Slemko8a40a762006-07-19 17:46:50 +000033 static void* threadMain(void* arg);
34
Mark Sleef5f2be42006-09-05 21:05:31 +000035 private:
Marc Slemko66949872006-07-15 01:52:39 +000036 pthread_t _pthread;
Marc Slemko66949872006-07-15 01:52:39 +000037 STATE _state;
Marc Slemko66949872006-07-15 01:52:39 +000038 int _policy;
Marc Slemko66949872006-07-15 01:52:39 +000039 int _priority;
Marc Slemko66949872006-07-15 01:52:39 +000040 int _stackSize;
Marc Slemko6f038a72006-08-03 18:58:09 +000041 weak_ptr<PthreadThread> _self;
42
Mark Sleef5f2be42006-09-05 21:05:31 +000043 public:
Marc Slemko66949872006-07-15 01:52:39 +000044
Marc Slemko6f038a72006-08-03 18:58:09 +000045 PthreadThread(int policy, int priority, int stackSize, shared_ptr<Runnable> runnable) :
Marc Slemko66949872006-07-15 01:52:39 +000046 _pthread(0),
47 _state(uninitialized),
48 _policy(policy),
49 _priority(priority),
Marc Slemko6f038a72006-08-03 18:58:09 +000050 _stackSize(stackSize) {
Marc Slemkofe5ba12e2006-07-20 21:16:27 +000051
52 this->Thread::runnable(runnable);
53 }
Marc Slemko66949872006-07-15 01:52:39 +000054
Mark Sleef5f2be42006-09-05 21:05:31 +000055 ~PthreadThread() {}
Marc Slemko6f038a72006-08-03 18:58:09 +000056
Marc Slemko66949872006-07-15 01:52:39 +000057 void start() {
Mark Sleef5f2be42006-09-05 21:05:31 +000058 if (_state != uninitialized) {
Marc Slemko66949872006-07-15 01:52:39 +000059 return;
60 }
61
62 _state = starting;
63
64 pthread_attr_t thread_attr;
Marc Slemko66949872006-07-15 01:52:39 +000065 assert(pthread_attr_init(&thread_attr) == 0);
Marc Slemko66949872006-07-15 01:52:39 +000066 assert(pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_JOINABLE) == 0);
67
68 // Set thread stack size
Marc Slemko66949872006-07-15 01:52:39 +000069 assert(pthread_attr_setstacksize(&thread_attr, MB * _stackSize) == 0);
70
71 // Set thread policy
Marc Slemko66949872006-07-15 01:52:39 +000072 assert(pthread_attr_setschedpolicy(&thread_attr, _policy) == 0);
73
74 struct sched_param sched_param;
75 sched_param.sched_priority = _priority;
76
77 // Set thread priority
Marc Slemko6f038a72006-08-03 18:58:09 +000078 assert(pthread_attr_setschedparam(&thread_attr, &sched_param) == 0);
Marc Slemko66949872006-07-15 01:52:39 +000079
Mark Sleef5f2be42006-09-05 21:05:31 +000080 // Create reference
Marc Slemko6f038a72006-08-03 18:58:09 +000081 shared_ptr<PthreadThread>* selfRef = new shared_ptr<PthreadThread>();
Marc Slemko6f038a72006-08-03 18:58:09 +000082 *selfRef = _self.lock();
Marc Slemko6f038a72006-08-03 18:58:09 +000083 assert(pthread_create(&_pthread, &thread_attr, threadMain, (void*)selfRef) == 0);
Marc Slemko66949872006-07-15 01:52:39 +000084 }
85
86 void join() {
Mark Sleef5f2be42006-09-05 21:05:31 +000087 if (_state != stopped) {
Marc Slemko66949872006-07-15 01:52:39 +000088 void* ignore;
Marc Slemko66949872006-07-15 01:52:39 +000089 pthread_join(_pthread, &ignore);
90 }
91 }
92
Mark Sleef5f2be42006-09-05 21:05:31 +000093 shared_ptr<Runnable> runnable() const { return Thread::runnable(); }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +000094
Mark Sleef5f2be42006-09-05 21:05:31 +000095 void runnable(shared_ptr<Runnable> value) { Thread::runnable(value); }
Marc Slemko66949872006-07-15 01:52:39 +000096
Marc Slemko6f038a72006-08-03 18:58:09 +000097 void weakRef(shared_ptr<PthreadThread> self) {
98 assert(self.get() == this);
99 _self = weak_ptr<PthreadThread>(self);
100 }
Marc Slemko66949872006-07-15 01:52:39 +0000101};
102
Marc Slemko8a40a762006-07-19 17:46:50 +0000103void* PthreadThread::threadMain(void* arg) {
104 // XXX need a lock here when testing thread state
Marc Slemko6f038a72006-08-03 18:58:09 +0000105 shared_ptr<PthreadThread> thread = *(shared_ptr<PthreadThread>*)arg;
Marc Slemko6f038a72006-08-03 18:58:09 +0000106 delete reinterpret_cast<shared_ptr<PthreadThread>*>(arg);
107
Mark Sleef5f2be42006-09-05 21:05:31 +0000108 if (thread == NULL) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000109 return (void*)0;
110 }
111
Mark Sleef5f2be42006-09-05 21:05:31 +0000112 if (thread->_state != starting) {
Marc Slemko8a40a762006-07-19 17:46:50 +0000113 return (void*)0;
114 }
115
116 thread->_state = starting;
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000117 thread->runnable()->run();
Mark Sleef5f2be42006-09-05 21:05:31 +0000118 if (thread->_state != stopping && thread->_state != stopped) {
Marc Slemko8a40a762006-07-19 17:46:50 +0000119 thread->_state = stopping;
120 }
121
122 return (void*)0;
123}
124
Mark Sleef5f2be42006-09-05 21:05:31 +0000125/**
126 * POSIX Thread factory implementation
127 */
Marc Slemko66949872006-07-15 01:52:39 +0000128class PosixThreadFactory::Impl {
129
Mark Sleef5f2be42006-09-05 21:05:31 +0000130 private:
Marc Slemko66949872006-07-15 01:52:39 +0000131 POLICY _policy;
Marc Slemko66949872006-07-15 01:52:39 +0000132 PRIORITY _priority;
Marc Slemko66949872006-07-15 01:52:39 +0000133 int _stackSize;
Marc Slemko66949872006-07-15 01:52:39 +0000134 bool _detached;
135
Mark Sleef5f2be42006-09-05 21:05:31 +0000136 /**
137 * Converts generic posix thread schedule policy enums into pthread
138 * API values.
139 */
Marc Slemko66949872006-07-15 01:52:39 +0000140 static int toPthreadPolicy(POLICY policy) {
141 switch(policy) {
142 case OTHER: return SCHED_OTHER; break;
143 case FIFO: return SCHED_FIFO; break;
144 case ROUND_ROBIN: return SCHED_RR; break;
145 default: return SCHED_OTHER; break;
146 }
147 }
148
Mark Sleef5f2be42006-09-05 21:05:31 +0000149 /**
150 * Converts relative thread priorities to absolute value based on posix
151 * thread scheduler policy
152 *
153 * The idea is simply to divide up the priority range for the given policy
154 * into the correpsonding relative priority level (lowest..highest) and
155 * then pro-rate accordingly.
156 */
Marc Slemko66949872006-07-15 01:52:39 +0000157 static int toPthreadPriority(POLICY policy, PRIORITY priority) {
Marc Slemko66949872006-07-15 01:52:39 +0000158 int pthread_policy = toPthreadPolicy(policy);
Marc Slemko66949872006-07-15 01:52:39 +0000159 int min_priority = sched_get_priority_min(pthread_policy);
Marc Slemko66949872006-07-15 01:52:39 +0000160 int max_priority = sched_get_priority_max(pthread_policy);
Marc Slemko66949872006-07-15 01:52:39 +0000161 int quanta = (HIGHEST - LOWEST) + 1;
Marc Slemko66949872006-07-15 01:52:39 +0000162 float stepsperquanta = (max_priority - min_priority) / quanta;
163
164 if(priority <= HIGHEST) {
Marc Slemko66949872006-07-15 01:52:39 +0000165 return (int)(min_priority + stepsperquanta * priority);
166 } else {
Marc Slemko66949872006-07-15 01:52:39 +0000167 // should never get here for priority increments.
Marc Slemko66949872006-07-15 01:52:39 +0000168 assert(false);
Marc Slemko66949872006-07-15 01:52:39 +0000169 return (int)(min_priority + stepsperquanta * NORMAL);
170 }
171 }
172
Mark Sleef5f2be42006-09-05 21:05:31 +0000173 public:
Marc Slemko66949872006-07-15 01:52:39 +0000174
175 Impl(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
176 _policy(policy),
177 _priority(priority),
178 _stackSize(stackSize),
Mark Sleef5f2be42006-09-05 21:05:31 +0000179 _detached(detached) {}
Marc Slemko66949872006-07-15 01:52:39 +0000180
Mark Sleef5f2be42006-09-05 21:05:31 +0000181 /**
182 * Creates a new POSIX thread to run the runnable object
183 *
184 * @param runnable A runnable object
185 */
Marc Slemko6f038a72006-08-03 18:58:09 +0000186 shared_ptr<Thread> newThread(shared_ptr<Runnable> runnable) const {
Marc Slemko6f038a72006-08-03 18:58:09 +0000187 shared_ptr<PthreadThread> result = shared_ptr<PthreadThread>(new PthreadThread(toPthreadPolicy(_policy), toPthreadPriority(_policy, _priority), _stackSize, runnable));
188 result->weakRef(result);
189 runnable->thread(result);
190 return result;
Marc Slemko66949872006-07-15 01:52:39 +0000191 }
192
Mark Sleef5f2be42006-09-05 21:05:31 +0000193 int stackSize() const { return _stackSize; }
Marc Slemko66949872006-07-15 01:52:39 +0000194
Mark Sleef5f2be42006-09-05 21:05:31 +0000195 void stackSize(int value) { _stackSize = value; }
Marc Slemko66949872006-07-15 01:52:39 +0000196
Mark Sleef5f2be42006-09-05 21:05:31 +0000197 PRIORITY priority() const { return _priority; }
Marc Slemko66949872006-07-15 01:52:39 +0000198
Mark Sleef5f2be42006-09-05 21:05:31 +0000199 /**
200 * Sets priority.
201 *
202 * XXX
203 * Need to handle incremental priorities properly.
204 */
205 void priority(PRIORITY value) { _priority = value; }
Marc Slemko66949872006-07-15 01:52:39 +0000206};
207
208PosixThreadFactory::PosixThreadFactory(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
209 _impl(new PosixThreadFactory::Impl(policy, priority, stackSize, detached)) {}
210
Mark Sleef5f2be42006-09-05 21:05:31 +0000211shared_ptr<Thread> PosixThreadFactory::newThread(shared_ptr<Runnable> runnable) const { return _impl->newThread(runnable); }
Marc Slemko66949872006-07-15 01:52:39 +0000212
Mark Sleef5f2be42006-09-05 21:05:31 +0000213int PosixThreadFactory::stackSize() const { return _impl->stackSize(); }
Marc Slemko66949872006-07-15 01:52:39 +0000214
Mark Sleef5f2be42006-09-05 21:05:31 +0000215void PosixThreadFactory::stackSize(int value) { _impl->stackSize(value); }
Marc Slemko66949872006-07-15 01:52:39 +0000216
Mark Sleef5f2be42006-09-05 21:05:31 +0000217PosixThreadFactory::PRIORITY PosixThreadFactory::priority() const { return _impl->priority(); }
Marc Slemko66949872006-07-15 01:52:39 +0000218
Mark Sleef5f2be42006-09-05 21:05:31 +0000219void PosixThreadFactory::priority(PosixThreadFactory::PRIORITY value) { _impl->priority(value); }
Marc Slemko66949872006-07-15 01:52:39 +0000220
221}}} // facebook::thrift::concurrency