blob: e48dce39e1e8aaffaa53d4b6912c81aa8aefe363 [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
19
20#include "PosixThreadFactory.h"
21#include "Exception.h"
22
23#if GOOGLE_PERFTOOLS_REGISTER_THREAD
24# include <google/profiler.h>
25#endif
26
27#include <assert.h>
28#include <pthread.h>
29
30#include <iostream>
31
32#include <boost/weak_ptr.hpp>
33
34namespace apache { namespace thrift { namespace concurrency {
35
36using boost::shared_ptr;
37using boost::weak_ptr;
38
39/**
40 * The POSIX thread class.
41 *
42 * @version $Id:$
43 */
44class PthreadThread: public Thread {
45 public:
46
47 enum STATE {
48 uninitialized,
49 starting,
50 started,
51 stopping,
52 stopped
53 };
54
55 static const int MB = 1024 * 1024;
56
57 static void* threadMain(void* arg);
58
59 private:
60 pthread_t pthread_;
61 STATE state_;
62 int policy_;
63 int priority_;
64 int stackSize_;
65 weak_ptr<PthreadThread> self_;
66 bool detached_;
67
68 public:
69
70 PthreadThread(int policy, int priority, int stackSize, bool detached, shared_ptr<Runnable> runnable) :
71 pthread_(0),
72 state_(uninitialized),
73 policy_(policy),
74 priority_(priority),
75 stackSize_(stackSize),
76 detached_(detached) {
77
78 this->Thread::runnable(runnable);
79 }
80
81 ~PthreadThread() {
82 /* Nothing references this thread, if is is not detached, do a join
83 now, otherwise the thread-id and, possibly, other resources will
84 be leaked. */
85 if(!detached_) {
86 try {
87 join();
88 } catch(...) {
89 // We're really hosed.
90 }
91 }
92 }
93
94 void start() {
95 if (state_ != uninitialized) {
96 return;
97 }
98
99 pthread_attr_t thread_attr;
100 if (pthread_attr_init(&thread_attr) != 0) {
101 throw SystemResourceException("pthread_attr_init failed");
102 }
103
104 if(pthread_attr_setdetachstate(&thread_attr,
105 detached_ ?
106 PTHREAD_CREATE_DETACHED :
107 PTHREAD_CREATE_JOINABLE) != 0) {
108 throw SystemResourceException("pthread_attr_setdetachstate failed");
109 }
110
111 // Set thread stack size
112 if (pthread_attr_setstacksize(&thread_attr, MB * stackSize_) != 0) {
113 throw SystemResourceException("pthread_attr_setstacksize failed");
114 }
115
116 // Set thread policy
117 if (pthread_attr_setschedpolicy(&thread_attr, policy_) != 0) {
118 throw SystemResourceException("pthread_attr_setschedpolicy failed");
119 }
120
121 struct sched_param sched_param;
122 sched_param.sched_priority = priority_;
123
124 // Set thread priority
125 if (pthread_attr_setschedparam(&thread_attr, &sched_param) != 0) {
126 throw SystemResourceException("pthread_attr_setschedparam failed");
127 }
128
129 // Create reference
130 shared_ptr<PthreadThread>* selfRef = new shared_ptr<PthreadThread>();
131 *selfRef = self_.lock();
132
133 state_ = starting;
134
135 if (pthread_create(&pthread_, &thread_attr, threadMain, (void*)selfRef) != 0) {
136 throw SystemResourceException("pthread_create failed");
137 }
138 }
139
140 void join() {
141 if (!detached_ && state_ != uninitialized) {
142 void* ignore;
143 /* XXX
144 If join fails it is most likely due to the fact
145 that the last reference was the thread itself and cannot
146 join. This results in leaked threads and will eventually
147 cause the process to run out of thread resources.
148 We're beyond the point of throwing an exception. Not clear how
149 best to handle this. */
150 detached_ = pthread_join(pthread_, &ignore) == 0;
151 }
152 }
153
154 Thread::id_t getId() {
155 return (Thread::id_t)pthread_;
156 }
157
158 shared_ptr<Runnable> runnable() const { return Thread::runnable(); }
159
160 void runnable(shared_ptr<Runnable> value) { Thread::runnable(value); }
161
162 void weakRef(shared_ptr<PthreadThread> self) {
163 assert(self.get() == this);
164 self_ = weak_ptr<PthreadThread>(self);
165 }
166};
167
168void* PthreadThread::threadMain(void* arg) {
169 shared_ptr<PthreadThread> thread = *(shared_ptr<PthreadThread>*)arg;
170 delete reinterpret_cast<shared_ptr<PthreadThread>*>(arg);
171
172 if (thread == NULL) {
173 return (void*)0;
174 }
175
176 if (thread->state_ != starting) {
177 return (void*)0;
178 }
179
180#if GOOGLE_PERFTOOLS_REGISTER_THREAD
181 ProfilerRegisterThread();
182#endif
183
184 thread->state_ = starting;
185 thread->runnable()->run();
186 if (thread->state_ != stopping && thread->state_ != stopped) {
187 thread->state_ = stopping;
188 }
189
190 return (void*)0;
191}
192
193/**
194 * POSIX Thread factory implementation
195 */
196class PosixThreadFactory::Impl {
197
198 private:
199 POLICY policy_;
200 PRIORITY priority_;
201 int stackSize_;
202 bool detached_;
203
204 /**
205 * Converts generic posix thread schedule policy enums into pthread
206 * API values.
207 */
208 static int toPthreadPolicy(POLICY policy) {
209 switch (policy) {
210 case OTHER:
211 return SCHED_OTHER;
212 case FIFO:
213 return SCHED_FIFO;
214 case ROUND_ROBIN:
215 return SCHED_RR;
216 }
217 return SCHED_OTHER;
218 }
219
220 /**
221 * Converts relative thread priorities to absolute value based on posix
222 * thread scheduler policy
223 *
224 * The idea is simply to divide up the priority range for the given policy
225 * into the correpsonding relative priority level (lowest..highest) and
226 * then pro-rate accordingly.
227 */
228 static int toPthreadPriority(POLICY policy, PRIORITY priority) {
229 int pthread_policy = toPthreadPolicy(policy);
230 int min_priority = sched_get_priority_min(pthread_policy);
231 int max_priority = sched_get_priority_max(pthread_policy);
232 int quanta = (HIGHEST - LOWEST) + 1;
233 float stepsperquanta = (max_priority - min_priority) / quanta;
234
235 if (priority <= HIGHEST) {
236 return (int)(min_priority + stepsperquanta * priority);
237 } else {
238 // should never get here for priority increments.
239 assert(false);
240 return (int)(min_priority + stepsperquanta * NORMAL);
241 }
242 }
243
244 public:
245
246 Impl(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
247 policy_(policy),
248 priority_(priority),
249 stackSize_(stackSize),
250 detached_(detached) {}
251
252 /**
253 * Creates a new POSIX thread to run the runnable object
254 *
255 * @param runnable A runnable object
256 */
257 shared_ptr<Thread> newThread(shared_ptr<Runnable> runnable) const {
258 shared_ptr<PthreadThread> result = shared_ptr<PthreadThread>(new PthreadThread(toPthreadPolicy(policy_), toPthreadPriority(policy_, priority_), stackSize_, detached_, runnable));
259 result->weakRef(result);
260 runnable->thread(result);
261 return result;
262 }
263
264 int getStackSize() const { return stackSize_; }
265
266 void setStackSize(int value) { stackSize_ = value; }
267
268 PRIORITY getPriority() const { return priority_; }
269
270 /**
271 * Sets priority.
272 *
273 * XXX
274 * Need to handle incremental priorities properly.
275 */
276 void setPriority(PRIORITY value) { priority_ = value; }
277
278 bool isDetached() const { return detached_; }
279
280 void setDetached(bool value) { detached_ = value; }
281
282 Thread::id_t getCurrentThreadId() const {
283 // TODO(dreiss): Stop using C-style casts.
284 return (id_t)pthread_self();
285 }
286
287};
288
289PosixThreadFactory::PosixThreadFactory(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
290 impl_(new PosixThreadFactory::Impl(policy, priority, stackSize, detached)) {}
291
292shared_ptr<Thread> PosixThreadFactory::newThread(shared_ptr<Runnable> runnable) const { return impl_->newThread(runnable); }
293
294int PosixThreadFactory::getStackSize() const { return impl_->getStackSize(); }
295
296void PosixThreadFactory::setStackSize(int value) { impl_->setStackSize(value); }
297
298PosixThreadFactory::PRIORITY PosixThreadFactory::getPriority() const { return impl_->getPriority(); }
299
300void PosixThreadFactory::setPriority(PosixThreadFactory::PRIORITY value) { impl_->setPriority(value); }
301
302bool PosixThreadFactory::isDetached() const { return impl_->isDetached(); }
303
304void PosixThreadFactory::setDetached(bool value) { impl_->setDetached(value); }
305
306Thread::id_t PosixThreadFactory::getCurrentThreadId() const { return impl_->getCurrentThreadId(); }
307
308}}} // apache::thrift::concurrency