blob: f07db86a229b6674a3cae3af7abc5c006cd9b3ab [file] [log] [blame]
Marc Slemko66949872006-07-15 01:52:39 +00001#include "PosixThreadFactory.h"
2
3#include <assert.h>
4#include <pthread.h>
5
Marc Slemko6f038a72006-08-03 18:58:09 +00006#include <iostream>
7
8#include <boost/weak_ptr.hpp>
9
Marc Slemko66949872006-07-15 01:52:39 +000010namespace facebook { namespace thrift { namespace concurrency {
11
Marc Slemko6f038a72006-08-03 18:58:09 +000012using namespace boost;
13
Marc Slemko0e53ccd2006-07-17 23:51:05 +000014/** The POSIX thread class.
15
16 @author marc
17 @version $Id:$ */
Marc Slemko66949872006-07-15 01:52:39 +000018
19class PthreadThread: public Thread {
20
21public:
22 enum STATE {uninitialized,
23 starting,
24 started,
25 stopping,
26 stopped
27 };
28
29 static const int MB = 1024 * 1024;
30
Marc Slemko8a40a762006-07-19 17:46:50 +000031 static void* threadMain(void* arg);
32
Marc Slemko66949872006-07-15 01:52:39 +000033private:
34
35 pthread_t _pthread;
36
37 STATE _state;
38
39 int _policy;
40
41 int _priority;
42
43 int _stackSize;
44
Marc Slemko6f038a72006-08-03 18:58:09 +000045 weak_ptr<PthreadThread> _self;
46
Marc Slemko66949872006-07-15 01:52:39 +000047public:
48
Marc Slemko6f038a72006-08-03 18:58:09 +000049 PthreadThread(int policy, int priority, int stackSize, shared_ptr<Runnable> runnable) :
Marc Slemko66949872006-07-15 01:52:39 +000050 _pthread(0),
51 _state(uninitialized),
52 _policy(policy),
53 _priority(priority),
Marc Slemko6f038a72006-08-03 18:58:09 +000054 _stackSize(stackSize) {
Marc Slemkofe5ba12e2006-07-20 21:16:27 +000055
56 this->Thread::runnable(runnable);
57 }
Marc Slemko66949872006-07-15 01:52:39 +000058
Marc Slemko6f038a72006-08-03 18:58:09 +000059 ~PthreadThread() {
60 }
61
Marc Slemko66949872006-07-15 01:52:39 +000062 void start() {
63
64 if(_state != uninitialized) {
65 return;
66 }
67
68 _state = starting;
69
70 pthread_attr_t thread_attr;
71
72 assert(pthread_attr_init(&thread_attr) == 0);
73
74 assert(pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_JOINABLE) == 0);
75
76 // Set thread stack size
77
78 assert(pthread_attr_setstacksize(&thread_attr, MB * _stackSize) == 0);
79
80 // Set thread policy
81
82 assert(pthread_attr_setschedpolicy(&thread_attr, _policy) == 0);
83
84 struct sched_param sched_param;
85 sched_param.sched_priority = _priority;
86
87 // Set thread priority
88
Marc Slemko6f038a72006-08-03 18:58:09 +000089 assert(pthread_attr_setschedparam(&thread_attr, &sched_param) == 0);
Marc Slemko66949872006-07-15 01:52:39 +000090
Marc Slemko6f038a72006-08-03 18:58:09 +000091 shared_ptr<PthreadThread>* selfRef = new shared_ptr<PthreadThread>();
92
93 *selfRef = _self.lock();
94
95 assert(pthread_create(&_pthread, &thread_attr, threadMain, (void*)selfRef) == 0);
Marc Slemko66949872006-07-15 01:52:39 +000096 }
97
98 void join() {
99
100 if(_state != stopped) {
101
102 void* ignore;
103
104 pthread_join(_pthread, &ignore);
105 }
106 }
107
Marc Slemko6f038a72006-08-03 18:58:09 +0000108 shared_ptr<Runnable> runnable() const {return Thread::runnable();}
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000109
Marc Slemko6f038a72006-08-03 18:58:09 +0000110 void runnable(shared_ptr<Runnable> value) {Thread::runnable(value);}
Marc Slemko66949872006-07-15 01:52:39 +0000111
Marc Slemko6f038a72006-08-03 18:58:09 +0000112 void weakRef(shared_ptr<PthreadThread> self) {
113 assert(self.get() == this);
114 _self = weak_ptr<PthreadThread>(self);
115 }
Marc Slemko66949872006-07-15 01:52:39 +0000116};
117
Marc Slemko8a40a762006-07-19 17:46:50 +0000118void* PthreadThread::threadMain(void* arg) {
119 // XXX need a lock here when testing thread state
120
Marc Slemko6f038a72006-08-03 18:58:09 +0000121 shared_ptr<PthreadThread> thread = *(shared_ptr<PthreadThread>*)arg;
122
123 delete reinterpret_cast<shared_ptr<PthreadThread>*>(arg);
124
125 if(thread == NULL) {
126 return (void*)0;
127 }
128
Marc Slemko8a40a762006-07-19 17:46:50 +0000129 if(thread->_state != starting) {
130 return (void*)0;
131 }
132
133 thread->_state = starting;
134
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000135 thread->runnable()->run();
Marc Slemko8a40a762006-07-19 17:46:50 +0000136
137 if(thread->_state != stopping && thread->_state != stopped) {
138 thread->_state = stopping;
139 }
140
141 return (void*)0;
142}
143
Marc Slemko66949872006-07-15 01:52:39 +0000144/** POSIX Thread factory implementation */
145
146class PosixThreadFactory::Impl {
147
148private:
149
150 POLICY _policy;
151
152 PRIORITY _priority;
153
154 int _stackSize;
155
156 bool _detached;
157
158 /** Converts generic posix thread schedule policy enums into pthread API values. */
159
160 static int toPthreadPolicy(POLICY policy) {
161 switch(policy) {
162 case OTHER: return SCHED_OTHER; break;
163 case FIFO: return SCHED_FIFO; break;
164 case ROUND_ROBIN: return SCHED_RR; break;
165 default: return SCHED_OTHER; break;
166 }
167 }
168
169 /** Converts relative thread priorities to absolute value based on posix thread scheduler policy
170
171 The idea is simply to divide up the priority range for the given policy into the correpsonding relative
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000172 priority level (lowest..highest) and then pro-rate accordingly. */
Marc Slemko66949872006-07-15 01:52:39 +0000173
174 static int toPthreadPriority(POLICY policy, PRIORITY priority) {
175
176 int pthread_policy = toPthreadPolicy(policy);
177
178 int min_priority = sched_get_priority_min(pthread_policy);
179
180 int max_priority = sched_get_priority_max(pthread_policy);
181
182 int quanta = (HIGHEST - LOWEST) + 1;
183
184 float stepsperquanta = (max_priority - min_priority) / quanta;
185
186 if(priority <= HIGHEST) {
187
188 return (int)(min_priority + stepsperquanta * priority);
189 } else {
190
191 // should never get here for priority increments.
192
193 assert(false);
194
195 return (int)(min_priority + stepsperquanta * NORMAL);
196 }
197 }
198
199public:
200
201 Impl(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
202 _policy(policy),
203 _priority(priority),
204 _stackSize(stackSize),
205 _detached(detached) {
206 }
207
208 /** Creates a new POSIX thread to run the runnable object
209
210 @param runnable A runnable object */
211
Marc Slemko6f038a72006-08-03 18:58:09 +0000212 shared_ptr<Thread> newThread(shared_ptr<Runnable> runnable) const {
Marc Slemko66949872006-07-15 01:52:39 +0000213
Marc Slemko6f038a72006-08-03 18:58:09 +0000214 shared_ptr<PthreadThread> result = shared_ptr<PthreadThread>(new PthreadThread(toPthreadPolicy(_policy), toPthreadPriority(_policy, _priority), _stackSize, runnable));
215 result->weakRef(result);
216 runnable->thread(result);
217 return result;
Marc Slemko66949872006-07-15 01:52:39 +0000218 }
219
220 int stackSize() const { return _stackSize;}
221
222 void stackSize(int value) { _stackSize = value;}
223
224 PRIORITY priority() const { return _priority;}
225
226 /** Sets priority.
227
228 XXX
Marc Slemko6f038a72006-08-03 18:58:09 +0000229 Need to handle incremental priorities properly. */
Marc Slemko66949872006-07-15 01:52:39 +0000230
231 void priority(PRIORITY value) { _priority = value;}
232
233};
234
235PosixThreadFactory::PosixThreadFactory(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
236 _impl(new PosixThreadFactory::Impl(policy, priority, stackSize, detached)) {}
237
Marc Slemko6f038a72006-08-03 18:58:09 +0000238shared_ptr<Thread> PosixThreadFactory::newThread(shared_ptr<Runnable> runnable) const {return _impl->newThread(runnable);}
Marc Slemko66949872006-07-15 01:52:39 +0000239
240int PosixThreadFactory::stackSize() const {return _impl->stackSize();}
241
242void PosixThreadFactory::stackSize(int value) {_impl->stackSize(value);}
243
244PosixThreadFactory::PRIORITY PosixThreadFactory::priority() const {return _impl->priority();}
245
246void PosixThreadFactory::priority(PosixThreadFactory::PRIORITY value) {_impl->priority(value);}
247
248}}} // facebook::thrift::concurrency