blob: e9d52f06f320eb6278367f76f437d19a5e86989e [file] [log] [blame]
Marc Slemko66949872006-07-15 01:52:39 +00001#include "PosixThreadFactory.h"
2
3#include <assert.h>
4#include <pthread.h>
5
6namespace facebook { namespace thrift { namespace concurrency {
7
8/** The POSIX thread class. */
9
10class PthreadThread: public Thread {
11
12public:
13 enum STATE {uninitialized,
14 starting,
15 started,
16 stopping,
17 stopped
18 };
19
20 static const int MB = 1024 * 1024;
21
22private:
23
24 pthread_t _pthread;
25
26 STATE _state;
27
28 int _policy;
29
30 int _priority;
31
32 int _stackSize;
33
34 Runnable* _runnable;
35
36 static void* threadMain(void* arg) {
37
38 // XXX need a lock here when testing thread state
39
40 PthreadThread* thread = (PthreadThread*)arg;
41
42 if(thread->_state != starting) {
43 return (void*)0;
44 }
45
46 thread->_state = starting;
47
48 thread->_runnable->run();
49
50 if(thread->_state != stopping && thread->_state != stopped) {
51 thread->_state = stopping;
52 }
53
54 return (void*)0;
55 }
56
57public:
58
59 PthreadThread(int policy, int priority, int stackSize, Runnable* runnable) :
60 _pthread(0),
61 _state(uninitialized),
62 _policy(policy),
63 _priority(priority),
64 _stackSize(stackSize),
65 _runnable(runnable)
66 {}
67
68 void start() {
69
70 if(_state != uninitialized) {
71 return;
72 }
73
74 _state = starting;
75
76 pthread_attr_t thread_attr;
77
78 assert(pthread_attr_init(&thread_attr) == 0);
79
80 assert(pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_JOINABLE) == 0);
81
82 // Set thread stack size
83
84 assert(pthread_attr_setstacksize(&thread_attr, MB * _stackSize) == 0);
85
86 // Set thread policy
87
88 assert(pthread_attr_setschedpolicy(&thread_attr, _policy) == 0);
89
90 struct sched_param sched_param;
91 sched_param.sched_priority = _priority;
92
93 // Set thread priority
94
95 assert(pthread_attr_setschedparam(&thread_attr, &sched_param) == 0);
96
97 assert(pthread_create(&_pthread, &thread_attr, PthreadThread::threadMain, (void*)this) == 0);
98 }
99
100 void join() {
101
102 if(_state != stopped) {
103
104 void* ignore;
105
106 pthread_join(_pthread, &ignore);
107 }
108 }
109
110 const Runnable* runnable() const {return _runnable;}
111
112};
113
114/** POSIX Thread factory implementation */
115
116class PosixThreadFactory::Impl {
117
118private:
119
120 POLICY _policy;
121
122 PRIORITY _priority;
123
124 int _stackSize;
125
126 bool _detached;
127
128 /** Converts generic posix thread schedule policy enums into pthread API values. */
129
130 static int toPthreadPolicy(POLICY policy) {
131 switch(policy) {
132 case OTHER: return SCHED_OTHER; break;
133 case FIFO: return SCHED_FIFO; break;
134 case ROUND_ROBIN: return SCHED_RR; break;
135 default: return SCHED_OTHER; break;
136 }
137 }
138
139 /** Converts relative thread priorities to absolute value based on posix thread scheduler policy
140
141 The idea is simply to divide up the priority range for the given policy into the correpsonding relative
142 priority level (lowest..highest) and then prorate accordingly. */
143
144 static int toPthreadPriority(POLICY policy, PRIORITY priority) {
145
146 int pthread_policy = toPthreadPolicy(policy);
147
148 int min_priority = sched_get_priority_min(pthread_policy);
149
150 int max_priority = sched_get_priority_max(pthread_policy);
151
152 int quanta = (HIGHEST - LOWEST) + 1;
153
154 float stepsperquanta = (max_priority - min_priority) / quanta;
155
156 if(priority <= HIGHEST) {
157
158 return (int)(min_priority + stepsperquanta * priority);
159 } else {
160
161 // should never get here for priority increments.
162
163 assert(false);
164
165 return (int)(min_priority + stepsperquanta * NORMAL);
166 }
167 }
168
169public:
170
171 Impl(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
172 _policy(policy),
173 _priority(priority),
174 _stackSize(stackSize),
175 _detached(detached) {
176 }
177
178 /** Creates a new POSIX thread to run the runnable object
179
180 @param runnable A runnable object */
181
182 Thread* newThread(Runnable* runnable) const {
183
184 return new PthreadThread(toPthreadPolicy(_policy), toPthreadPriority(_policy, _priority), _stackSize, runnable);
185 }
186
187 int stackSize() const { return _stackSize;}
188
189 void stackSize(int value) { _stackSize = value;}
190
191 PRIORITY priority() const { return _priority;}
192
193 /** Sets priority.
194
195 XXX
196 Need to handle incremental priorities properl. */
197
198 void priority(PRIORITY value) { _priority = value;}
199
200};
201
202PosixThreadFactory::PosixThreadFactory(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
203 _impl(new PosixThreadFactory::Impl(policy, priority, stackSize, detached)) {}
204
205Thread* PosixThreadFactory::newThread(Runnable* runnable) const {return _impl->newThread(runnable);}
206
207int PosixThreadFactory::stackSize() const {return _impl->stackSize();}
208
209void PosixThreadFactory::stackSize(int value) {_impl->stackSize(value);}
210
211PosixThreadFactory::PRIORITY PosixThreadFactory::priority() const {return _impl->priority();}
212
213void PosixThreadFactory::priority(PosixThreadFactory::PRIORITY value) {_impl->priority(value);}
214
215}}} // facebook::thrift::concurrency