blob: e7f84cdd4fbd1a88909e7889bd2834c990f5ed22 [file] [log] [blame]
Marc Slemko66949872006-07-15 01:52:39 +00001#include "PosixThreadFactory.h"
2
3#include <assert.h>
4#include <pthread.h>
5
6namespace facebook { namespace thrift { namespace concurrency {
7
Marc Slemko0e53ccd2006-07-17 23:51:05 +00008/** The POSIX thread class.
9
10 @author marc
11 @version $Id:$ */
Marc Slemko66949872006-07-15 01:52:39 +000012
13class PthreadThread: public Thread {
14
15public:
16 enum STATE {uninitialized,
17 starting,
18 started,
19 stopping,
20 stopped
21 };
22
23 static const int MB = 1024 * 1024;
24
25private:
26
27 pthread_t _pthread;
28
29 STATE _state;
30
31 int _policy;
32
33 int _priority;
34
35 int _stackSize;
36
37 Runnable* _runnable;
38
39 static void* threadMain(void* arg) {
40
41 // XXX need a lock here when testing thread state
42
43 PthreadThread* thread = (PthreadThread*)arg;
44
45 if(thread->_state != starting) {
46 return (void*)0;
47 }
48
49 thread->_state = starting;
50
51 thread->_runnable->run();
52
53 if(thread->_state != stopping && thread->_state != stopped) {
54 thread->_state = stopping;
55 }
56
57 return (void*)0;
58 }
59
60public:
61
62 PthreadThread(int policy, int priority, int stackSize, Runnable* runnable) :
63 _pthread(0),
64 _state(uninitialized),
65 _policy(policy),
66 _priority(priority),
67 _stackSize(stackSize),
68 _runnable(runnable)
69 {}
70
71 void start() {
72
73 if(_state != uninitialized) {
74 return;
75 }
76
77 _state = starting;
78
79 pthread_attr_t thread_attr;
80
81 assert(pthread_attr_init(&thread_attr) == 0);
82
83 assert(pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_JOINABLE) == 0);
84
85 // Set thread stack size
86
87 assert(pthread_attr_setstacksize(&thread_attr, MB * _stackSize) == 0);
88
89 // Set thread policy
90
91 assert(pthread_attr_setschedpolicy(&thread_attr, _policy) == 0);
92
93 struct sched_param sched_param;
94 sched_param.sched_priority = _priority;
95
96 // Set thread priority
97
98 assert(pthread_attr_setschedparam(&thread_attr, &sched_param) == 0);
99
100 assert(pthread_create(&_pthread, &thread_attr, PthreadThread::threadMain, (void*)this) == 0);
101 }
102
103 void join() {
104
105 if(_state != stopped) {
106
107 void* ignore;
108
109 pthread_join(_pthread, &ignore);
110 }
111 }
112
113 const Runnable* runnable() const {return _runnable;}
114
115};
116
117/** POSIX Thread factory implementation */
118
119class PosixThreadFactory::Impl {
120
121private:
122
123 POLICY _policy;
124
125 PRIORITY _priority;
126
127 int _stackSize;
128
129 bool _detached;
130
131 /** Converts generic posix thread schedule policy enums into pthread API values. */
132
133 static int toPthreadPolicy(POLICY policy) {
134 switch(policy) {
135 case OTHER: return SCHED_OTHER; break;
136 case FIFO: return SCHED_FIFO; break;
137 case ROUND_ROBIN: return SCHED_RR; break;
138 default: return SCHED_OTHER; break;
139 }
140 }
141
142 /** Converts relative thread priorities to absolute value based on posix thread scheduler policy
143
144 The idea is simply to divide up the priority range for the given policy into the correpsonding relative
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000145 priority level (lowest..highest) and then pro-rate accordingly. */
Marc Slemko66949872006-07-15 01:52:39 +0000146
147 static int toPthreadPriority(POLICY policy, PRIORITY priority) {
148
149 int pthread_policy = toPthreadPolicy(policy);
150
151 int min_priority = sched_get_priority_min(pthread_policy);
152
153 int max_priority = sched_get_priority_max(pthread_policy);
154
155 int quanta = (HIGHEST - LOWEST) + 1;
156
157 float stepsperquanta = (max_priority - min_priority) / quanta;
158
159 if(priority <= HIGHEST) {
160
161 return (int)(min_priority + stepsperquanta * priority);
162 } else {
163
164 // should never get here for priority increments.
165
166 assert(false);
167
168 return (int)(min_priority + stepsperquanta * NORMAL);
169 }
170 }
171
172public:
173
174 Impl(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
175 _policy(policy),
176 _priority(priority),
177 _stackSize(stackSize),
178 _detached(detached) {
179 }
180
181 /** Creates a new POSIX thread to run the runnable object
182
183 @param runnable A runnable object */
184
185 Thread* newThread(Runnable* runnable) const {
186
187 return new PthreadThread(toPthreadPolicy(_policy), toPthreadPriority(_policy, _priority), _stackSize, runnable);
188 }
189
190 int stackSize() const { return _stackSize;}
191
192 void stackSize(int value) { _stackSize = value;}
193
194 PRIORITY priority() const { return _priority;}
195
196 /** Sets priority.
197
198 XXX
199 Need to handle incremental priorities properl. */
200
201 void priority(PRIORITY value) { _priority = value;}
202
203};
204
205PosixThreadFactory::PosixThreadFactory(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
206 _impl(new PosixThreadFactory::Impl(policy, priority, stackSize, detached)) {}
207
208Thread* PosixThreadFactory::newThread(Runnable* runnable) const {return _impl->newThread(runnable);}
209
210int PosixThreadFactory::stackSize() const {return _impl->stackSize();}
211
212void PosixThreadFactory::stackSize(int value) {_impl->stackSize(value);}
213
214PosixThreadFactory::PRIORITY PosixThreadFactory::priority() const {return _impl->priority();}
215
216void PosixThreadFactory::priority(PosixThreadFactory::PRIORITY value) {_impl->priority(value);}
217
218}}} // facebook::thrift::concurrency