blob: bac122de589f8f612528103e26608d4418723428 [file] [log] [blame]
Marc Slemko66949872006-07-15 01:52:39 +00001#include "PosixThreadFactory.h"
2
3#include <assert.h>
4#include <pthread.h>
5
6namespace facebook { namespace thrift { namespace concurrency {
7
Marc Slemko0e53ccd2006-07-17 23:51:05 +00008/** The POSIX thread class.
9
10 @author marc
11 @version $Id:$ */
Marc Slemko66949872006-07-15 01:52:39 +000012
13class PthreadThread: public Thread {
14
15public:
16 enum STATE {uninitialized,
17 starting,
18 started,
19 stopping,
20 stopped
21 };
22
23 static const int MB = 1024 * 1024;
24
Marc Slemko8a40a762006-07-19 17:46:50 +000025 static void* threadMain(void* arg);
26
Marc Slemko66949872006-07-15 01:52:39 +000027private:
28
29 pthread_t _pthread;
30
31 STATE _state;
32
33 int _policy;
34
35 int _priority;
36
37 int _stackSize;
38
39 Runnable* _runnable;
40
Marc Slemko66949872006-07-15 01:52:39 +000041
42public:
43
44 PthreadThread(int policy, int priority, int stackSize, Runnable* runnable) :
45 _pthread(0),
46 _state(uninitialized),
47 _policy(policy),
48 _priority(priority),
49 _stackSize(stackSize),
50 _runnable(runnable)
51 {}
52
53 void start() {
54
55 if(_state != uninitialized) {
56 return;
57 }
58
59 _state = starting;
60
61 pthread_attr_t thread_attr;
62
63 assert(pthread_attr_init(&thread_attr) == 0);
64
65 assert(pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_JOINABLE) == 0);
66
67 // Set thread stack size
68
69 assert(pthread_attr_setstacksize(&thread_attr, MB * _stackSize) == 0);
70
71 // Set thread policy
72
73 assert(pthread_attr_setschedpolicy(&thread_attr, _policy) == 0);
74
75 struct sched_param sched_param;
76 sched_param.sched_priority = _priority;
77
78 // Set thread priority
79
Marc Slemko8a40a762006-07-19 17:46:50 +000080 // assert(pthread_attr_setschedparam(&thread_attr, &sched_param) == 0);
Marc Slemko66949872006-07-15 01:52:39 +000081
Marc Slemko8a40a762006-07-19 17:46:50 +000082 assert(pthread_create(&_pthread, &thread_attr, threadMain, (void*)this) == 0);
Marc Slemko66949872006-07-15 01:52:39 +000083 }
84
85 void join() {
86
87 if(_state != stopped) {
88
89 void* ignore;
90
91 pthread_join(_pthread, &ignore);
92 }
93 }
94
95 const Runnable* runnable() const {return _runnable;}
96
97};
98
Marc Slemko8a40a762006-07-19 17:46:50 +000099void* PthreadThread::threadMain(void* arg) {
100 // XXX need a lock here when testing thread state
101
102 PthreadThread* thread = (PthreadThread*)arg;
103
104 if(thread->_state != starting) {
105 return (void*)0;
106 }
107
108 thread->_state = starting;
109
110 thread->_runnable->run();
111
112 if(thread->_state != stopping && thread->_state != stopped) {
113 thread->_state = stopping;
114 }
115
116 return (void*)0;
117}
118
Marc Slemko66949872006-07-15 01:52:39 +0000119/** POSIX Thread factory implementation */
120
121class PosixThreadFactory::Impl {
122
123private:
124
125 POLICY _policy;
126
127 PRIORITY _priority;
128
129 int _stackSize;
130
131 bool _detached;
132
133 /** Converts generic posix thread schedule policy enums into pthread API values. */
134
135 static int toPthreadPolicy(POLICY policy) {
136 switch(policy) {
137 case OTHER: return SCHED_OTHER; break;
138 case FIFO: return SCHED_FIFO; break;
139 case ROUND_ROBIN: return SCHED_RR; break;
140 default: return SCHED_OTHER; break;
141 }
142 }
143
144 /** Converts relative thread priorities to absolute value based on posix thread scheduler policy
145
146 The idea is simply to divide up the priority range for the given policy into the correpsonding relative
Marc Slemko0e53ccd2006-07-17 23:51:05 +0000147 priority level (lowest..highest) and then pro-rate accordingly. */
Marc Slemko66949872006-07-15 01:52:39 +0000148
149 static int toPthreadPriority(POLICY policy, PRIORITY priority) {
150
151 int pthread_policy = toPthreadPolicy(policy);
152
153 int min_priority = sched_get_priority_min(pthread_policy);
154
155 int max_priority = sched_get_priority_max(pthread_policy);
156
157 int quanta = (HIGHEST - LOWEST) + 1;
158
159 float stepsperquanta = (max_priority - min_priority) / quanta;
160
161 if(priority <= HIGHEST) {
162
163 return (int)(min_priority + stepsperquanta * priority);
164 } else {
165
166 // should never get here for priority increments.
167
168 assert(false);
169
170 return (int)(min_priority + stepsperquanta * NORMAL);
171 }
172 }
173
174public:
175
176 Impl(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
177 _policy(policy),
178 _priority(priority),
179 _stackSize(stackSize),
180 _detached(detached) {
181 }
182
183 /** Creates a new POSIX thread to run the runnable object
184
185 @param runnable A runnable object */
186
187 Thread* newThread(Runnable* runnable) const {
188
189 return new PthreadThread(toPthreadPolicy(_policy), toPthreadPriority(_policy, _priority), _stackSize, runnable);
190 }
191
192 int stackSize() const { return _stackSize;}
193
194 void stackSize(int value) { _stackSize = value;}
195
196 PRIORITY priority() const { return _priority;}
197
198 /** Sets priority.
199
200 XXX
201 Need to handle incremental priorities properl. */
202
203 void priority(PRIORITY value) { _priority = value;}
204
205};
206
207PosixThreadFactory::PosixThreadFactory(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
208 _impl(new PosixThreadFactory::Impl(policy, priority, stackSize, detached)) {}
209
210Thread* PosixThreadFactory::newThread(Runnable* runnable) const {return _impl->newThread(runnable);}
211
212int PosixThreadFactory::stackSize() const {return _impl->stackSize();}
213
214void PosixThreadFactory::stackSize(int value) {_impl->stackSize(value);}
215
216PosixThreadFactory::PRIORITY PosixThreadFactory::priority() const {return _impl->priority();}
217
218void PosixThreadFactory::priority(PosixThreadFactory::PRIORITY value) {_impl->priority(value);}
219
220}}} // facebook::thrift::concurrency