blob: e48dce39e1e8aaffaa53d4b6912c81aa8aefe363 [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
Marc Slemko66949872006-07-15 01:52:39 +000020#include "PosixThreadFactory.h"
Marc Slemko3a3b53b2007-05-22 23:59:54 +000021#include "Exception.h"
Marc Slemko66949872006-07-15 01:52:39 +000022
David Reissaf296952008-06-10 22:54:40 +000023#if GOOGLE_PERFTOOLS_REGISTER_THREAD
24# include <google/profiler.h>
25#endif
26
Marc Slemko66949872006-07-15 01:52:39 +000027#include <assert.h>
28#include <pthread.h>
29
Marc Slemko6f038a72006-08-03 18:58:09 +000030#include <iostream>
31
32#include <boost/weak_ptr.hpp>
33
T Jake Lucianib5e62212009-01-31 22:36:20 +000034namespace apache { namespace thrift { namespace concurrency {
Marc Slemko66949872006-07-15 01:52:39 +000035
David Reissd4a269c2007-08-23 02:37:19 +000036using boost::shared_ptr;
37using boost::weak_ptr;
Marc Slemko6f038a72006-08-03 18:58:09 +000038
Mark Sleef5f2be42006-09-05 21:05:31 +000039/**
Marc Slemko3a3b53b2007-05-22 23:59:54 +000040 * The POSIX thread class.
Mark Sleef5f2be42006-09-05 21:05:31 +000041 *
Mark Sleef5f2be42006-09-05 21:05:31 +000042 * @version $Id:$
43 */
Marc Slemko66949872006-07-15 01:52:39 +000044class PthreadThread: public Thread {
Mark Sleef5f2be42006-09-05 21:05:31 +000045 public:
Marc Slemko66949872006-07-15 01:52:39 +000046
Mark Sleef5f2be42006-09-05 21:05:31 +000047 enum STATE {
48 uninitialized,
49 starting,
50 started,
51 stopping,
52 stopped
Marc Slemko66949872006-07-15 01:52:39 +000053 };
54
55 static const int MB = 1024 * 1024;
56
Marc Slemko8a40a762006-07-19 17:46:50 +000057 static void* threadMain(void* arg);
58
Mark Sleef5f2be42006-09-05 21:05:31 +000059 private:
Mark Slee2f6404d2006-10-10 01:37:40 +000060 pthread_t pthread_;
61 STATE state_;
62 int policy_;
63 int priority_;
64 int stackSize_;
65 weak_ptr<PthreadThread> self_;
Marc Slemko67606e52007-06-04 21:01:19 +000066 bool detached_;
Marc Slemko6f038a72006-08-03 18:58:09 +000067
Mark Sleef5f2be42006-09-05 21:05:31 +000068 public:
Marc Slemko3a3b53b2007-05-22 23:59:54 +000069
Marc Slemko67606e52007-06-04 21:01:19 +000070 PthreadThread(int policy, int priority, int stackSize, bool detached, shared_ptr<Runnable> runnable) :
Mark Slee2f6404d2006-10-10 01:37:40 +000071 pthread_(0),
Marc Slemko3a3b53b2007-05-22 23:59:54 +000072 state_(uninitialized),
Mark Slee2f6404d2006-10-10 01:37:40 +000073 policy_(policy),
74 priority_(priority),
Marc Slemko67606e52007-06-04 21:01:19 +000075 stackSize_(stackSize),
76 detached_(detached) {
Marc Slemkofe5ba12e2006-07-20 21:16:27 +000077
78 this->Thread::runnable(runnable);
79 }
Marc Slemko66949872006-07-15 01:52:39 +000080
Marc Slemko67606e52007-06-04 21:01:19 +000081 ~PthreadThread() {
82 /* Nothing references this thread, if is is not detached, do a join
Marc Slemkoa6479032007-06-05 22:20:14 +000083 now, otherwise the thread-id and, possibly, other resources will
Marc Slemko67606e52007-06-04 21:01:19 +000084 be leaked. */
85 if(!detached_) {
86 try {
87 join();
88 } catch(...) {
Marc Slemkoa6479032007-06-05 22:20:14 +000089 // We're really hosed.
Marc Slemko67606e52007-06-04 21:01:19 +000090 }
91 }
92 }
Marc Slemko6f038a72006-08-03 18:58:09 +000093
Marc Slemko66949872006-07-15 01:52:39 +000094 void start() {
Mark Slee2f6404d2006-10-10 01:37:40 +000095 if (state_ != uninitialized) {
Marc Slemko66949872006-07-15 01:52:39 +000096 return;
97 }
98
Marc Slemko66949872006-07-15 01:52:39 +000099 pthread_attr_t thread_attr;
Mark Slee2782d6d2007-05-23 04:55:30 +0000100 if (pthread_attr_init(&thread_attr) != 0) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000101 throw SystemResourceException("pthread_attr_init failed");
102 }
Aditya Agarwal9dc57402007-03-31 17:45:12 +0000103
Marc Slemkoa6479032007-06-05 22:20:14 +0000104 if(pthread_attr_setdetachstate(&thread_attr,
105 detached_ ?
106 PTHREAD_CREATE_DETACHED :
Marc Slemko67606e52007-06-04 21:01:19 +0000107 PTHREAD_CREATE_JOINABLE) != 0) {
108 throw SystemResourceException("pthread_attr_setdetachstate failed");
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000109 }
Marc Slemko66949872006-07-15 01:52:39 +0000110
111 // Set thread stack size
Mark Slee2782d6d2007-05-23 04:55:30 +0000112 if (pthread_attr_setstacksize(&thread_attr, MB * stackSize_) != 0) {
113 throw SystemResourceException("pthread_attr_setstacksize failed");
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000114 }
Marc Slemko66949872006-07-15 01:52:39 +0000115
116 // Set thread policy
Mark Slee2782d6d2007-05-23 04:55:30 +0000117 if (pthread_attr_setschedpolicy(&thread_attr, policy_) != 0) {
118 throw SystemResourceException("pthread_attr_setschedpolicy failed");
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000119 }
Marc Slemko66949872006-07-15 01:52:39 +0000120
121 struct sched_param sched_param;
Mark Slee2f6404d2006-10-10 01:37:40 +0000122 sched_param.sched_priority = priority_;
Marc Slemko66949872006-07-15 01:52:39 +0000123
124 // Set thread priority
Mark Slee2782d6d2007-05-23 04:55:30 +0000125 if (pthread_attr_setschedparam(&thread_attr, &sched_param) != 0) {
126 throw SystemResourceException("pthread_attr_setschedparam failed");
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000127 }
Marc Slemko66949872006-07-15 01:52:39 +0000128
Mark Sleef5f2be42006-09-05 21:05:31 +0000129 // Create reference
Marc Slemko6f038a72006-08-03 18:58:09 +0000130 shared_ptr<PthreadThread>* selfRef = new shared_ptr<PthreadThread>();
Mark Slee2f6404d2006-10-10 01:37:40 +0000131 *selfRef = self_.lock();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000132
Marc Slemko67606e52007-06-04 21:01:19 +0000133 state_ = starting;
134
Mark Slee2782d6d2007-05-23 04:55:30 +0000135 if (pthread_create(&pthread_, &thread_attr, threadMain, (void*)selfRef) != 0) {
136 throw SystemResourceException("pthread_create failed");
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000137 }
Marc Slemko66949872006-07-15 01:52:39 +0000138 }
139
140 void join() {
Marc Slemko67606e52007-06-04 21:01:19 +0000141 if (!detached_ && state_ != uninitialized) {
Marc Slemko66949872006-07-15 01:52:39 +0000142 void* ignore;
Marc Slemkoa6479032007-06-05 22:20:14 +0000143 /* XXX
144 If join fails it is most likely due to the fact
145 that the last reference was the thread itself and cannot
146 join. This results in leaked threads and will eventually
147 cause the process to run out of thread resources.
148 We're beyond the point of throwing an exception. Not clear how
149 best to handle this. */
150 detached_ = pthread_join(pthread_, &ignore) == 0;
Marc Slemko66949872006-07-15 01:52:39 +0000151 }
152 }
153
David Reissfbb14ef2008-12-02 02:32:25 +0000154 Thread::id_t getId() {
155 return (Thread::id_t)pthread_;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000156 }
157
Mark Sleef5f2be42006-09-05 21:05:31 +0000158 shared_ptr<Runnable> runnable() const { return Thread::runnable(); }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000159
Mark Sleef5f2be42006-09-05 21:05:31 +0000160 void runnable(shared_ptr<Runnable> value) { Thread::runnable(value); }
Marc Slemko66949872006-07-15 01:52:39 +0000161
Marc Slemko6f038a72006-08-03 18:58:09 +0000162 void weakRef(shared_ptr<PthreadThread> self) {
Aditya Agarwal3f234da2007-04-01 01:19:57 +0000163 assert(self.get() == this);
Mark Slee2f6404d2006-10-10 01:37:40 +0000164 self_ = weak_ptr<PthreadThread>(self);
Marc Slemko6f038a72006-08-03 18:58:09 +0000165 }
Marc Slemko66949872006-07-15 01:52:39 +0000166};
167
Marc Slemko8a40a762006-07-19 17:46:50 +0000168void* PthreadThread::threadMain(void* arg) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000169 shared_ptr<PthreadThread> thread = *(shared_ptr<PthreadThread>*)arg;
Marc Slemko6f038a72006-08-03 18:58:09 +0000170 delete reinterpret_cast<shared_ptr<PthreadThread>*>(arg);
171
Mark Sleef5f2be42006-09-05 21:05:31 +0000172 if (thread == NULL) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000173 return (void*)0;
174 }
175
Mark Slee2f6404d2006-10-10 01:37:40 +0000176 if (thread->state_ != starting) {
Marc Slemko8a40a762006-07-19 17:46:50 +0000177 return (void*)0;
178 }
179
David Reissaf296952008-06-10 22:54:40 +0000180#if GOOGLE_PERFTOOLS_REGISTER_THREAD
181 ProfilerRegisterThread();
182#endif
183
Mark Slee2f6404d2006-10-10 01:37:40 +0000184 thread->state_ = starting;
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000185 thread->runnable()->run();
Mark Slee2f6404d2006-10-10 01:37:40 +0000186 if (thread->state_ != stopping && thread->state_ != stopped) {
187 thread->state_ = stopping;
Marc Slemko8a40a762006-07-19 17:46:50 +0000188 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000189
Marc Slemko8a40a762006-07-19 17:46:50 +0000190 return (void*)0;
191}
192
Mark Sleef5f2be42006-09-05 21:05:31 +0000193/**
194 * POSIX Thread factory implementation
195 */
Marc Slemko66949872006-07-15 01:52:39 +0000196class PosixThreadFactory::Impl {
197
Mark Sleef5f2be42006-09-05 21:05:31 +0000198 private:
Mark Slee2f6404d2006-10-10 01:37:40 +0000199 POLICY policy_;
200 PRIORITY priority_;
201 int stackSize_;
202 bool detached_;
Marc Slemko66949872006-07-15 01:52:39 +0000203
Mark Sleef5f2be42006-09-05 21:05:31 +0000204 /**
205 * Converts generic posix thread schedule policy enums into pthread
206 * API values.
207 */
Marc Slemko66949872006-07-15 01:52:39 +0000208 static int toPthreadPolicy(POLICY policy) {
Mark Sleeb9ff32a2006-11-16 01:00:24 +0000209 switch (policy) {
210 case OTHER:
211 return SCHED_OTHER;
212 case FIFO:
213 return SCHED_FIFO;
214 case ROUND_ROBIN:
215 return SCHED_RR;
Marc Slemko66949872006-07-15 01:52:39 +0000216 }
Mark Sleeb9ff32a2006-11-16 01:00:24 +0000217 return SCHED_OTHER;
Marc Slemko66949872006-07-15 01:52:39 +0000218 }
219
Mark Sleef5f2be42006-09-05 21:05:31 +0000220 /**
221 * Converts relative thread priorities to absolute value based on posix
222 * thread scheduler policy
223 *
224 * The idea is simply to divide up the priority range for the given policy
225 * into the correpsonding relative priority level (lowest..highest) and
226 * then pro-rate accordingly.
227 */
Marc Slemko66949872006-07-15 01:52:39 +0000228 static int toPthreadPriority(POLICY policy, PRIORITY priority) {
Marc Slemko66949872006-07-15 01:52:39 +0000229 int pthread_policy = toPthreadPolicy(policy);
Marc Slemko66949872006-07-15 01:52:39 +0000230 int min_priority = sched_get_priority_min(pthread_policy);
Marc Slemko66949872006-07-15 01:52:39 +0000231 int max_priority = sched_get_priority_max(pthread_policy);
Marc Slemko66949872006-07-15 01:52:39 +0000232 int quanta = (HIGHEST - LOWEST) + 1;
Marc Slemko66949872006-07-15 01:52:39 +0000233 float stepsperquanta = (max_priority - min_priority) / quanta;
234
Mark Slee29050782006-09-29 00:12:30 +0000235 if (priority <= HIGHEST) {
Marc Slemko66949872006-07-15 01:52:39 +0000236 return (int)(min_priority + stepsperquanta * priority);
237 } else {
Marc Slemko66949872006-07-15 01:52:39 +0000238 // should never get here for priority increments.
Marc Slemko66949872006-07-15 01:52:39 +0000239 assert(false);
Marc Slemko66949872006-07-15 01:52:39 +0000240 return (int)(min_priority + stepsperquanta * NORMAL);
241 }
242 }
243
Mark Sleef5f2be42006-09-05 21:05:31 +0000244 public:
Marc Slemko66949872006-07-15 01:52:39 +0000245
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000246 Impl(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
Mark Slee2f6404d2006-10-10 01:37:40 +0000247 policy_(policy),
248 priority_(priority),
249 stackSize_(stackSize),
250 detached_(detached) {}
Marc Slemko66949872006-07-15 01:52:39 +0000251
Mark Sleef5f2be42006-09-05 21:05:31 +0000252 /**
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000253 * Creates a new POSIX thread to run the runnable object
Mark Sleef5f2be42006-09-05 21:05:31 +0000254 *
255 * @param runnable A runnable object
256 */
Marc Slemko6f038a72006-08-03 18:58:09 +0000257 shared_ptr<Thread> newThread(shared_ptr<Runnable> runnable) const {
Marc Slemko67606e52007-06-04 21:01:19 +0000258 shared_ptr<PthreadThread> result = shared_ptr<PthreadThread>(new PthreadThread(toPthreadPolicy(policy_), toPthreadPriority(policy_, priority_), stackSize_, detached_, runnable));
Marc Slemko6f038a72006-08-03 18:58:09 +0000259 result->weakRef(result);
260 runnable->thread(result);
261 return result;
Marc Slemko66949872006-07-15 01:52:39 +0000262 }
263
Marc Slemkoa6479032007-06-05 22:20:14 +0000264 int getStackSize() const { return stackSize_; }
Marc Slemko66949872006-07-15 01:52:39 +0000265
Marc Slemkoa6479032007-06-05 22:20:14 +0000266 void setStackSize(int value) { stackSize_ = value; }
Marc Slemko66949872006-07-15 01:52:39 +0000267
Marc Slemkoa6479032007-06-05 22:20:14 +0000268 PRIORITY getPriority() const { return priority_; }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000269
Mark Sleef5f2be42006-09-05 21:05:31 +0000270 /**
271 * Sets priority.
272 *
273 * XXX
274 * Need to handle incremental priorities properly.
275 */
Marc Slemkoa6479032007-06-05 22:20:14 +0000276 void setPriority(PRIORITY value) { priority_ = value; }
277
278 bool isDetached() const { return detached_; }
279
280 void setDetached(bool value) { detached_ = value; }
281
Mark Slee98439152007-08-21 02:39:40 +0000282 Thread::id_t getCurrentThreadId() const {
283 // TODO(dreiss): Stop using C-style casts.
284 return (id_t)pthread_self();
285 }
Marc Slemkoa6479032007-06-05 22:20:14 +0000286
Marc Slemko66949872006-07-15 01:52:39 +0000287};
288
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000289PosixThreadFactory::PosixThreadFactory(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
Mark Slee2f6404d2006-10-10 01:37:40 +0000290 impl_(new PosixThreadFactory::Impl(policy, priority, stackSize, detached)) {}
Marc Slemko66949872006-07-15 01:52:39 +0000291
Mark Slee2f6404d2006-10-10 01:37:40 +0000292shared_ptr<Thread> PosixThreadFactory::newThread(shared_ptr<Runnable> runnable) const { return impl_->newThread(runnable); }
Marc Slemko66949872006-07-15 01:52:39 +0000293
Marc Slemkoa6479032007-06-05 22:20:14 +0000294int PosixThreadFactory::getStackSize() const { return impl_->getStackSize(); }
Marc Slemko66949872006-07-15 01:52:39 +0000295
Marc Slemkoa6479032007-06-05 22:20:14 +0000296void PosixThreadFactory::setStackSize(int value) { impl_->setStackSize(value); }
Marc Slemko66949872006-07-15 01:52:39 +0000297
Marc Slemkoa6479032007-06-05 22:20:14 +0000298PosixThreadFactory::PRIORITY PosixThreadFactory::getPriority() const { return impl_->getPriority(); }
Marc Slemko66949872006-07-15 01:52:39 +0000299
Marc Slemkoa6479032007-06-05 22:20:14 +0000300void PosixThreadFactory::setPriority(PosixThreadFactory::PRIORITY value) { impl_->setPriority(value); }
Marc Slemko66949872006-07-15 01:52:39 +0000301
Marc Slemkoa6479032007-06-05 22:20:14 +0000302bool PosixThreadFactory::isDetached() const { return impl_->isDetached(); }
303
304void PosixThreadFactory::setDetached(bool value) { impl_->setDetached(value); }
305
306Thread::id_t PosixThreadFactory::getCurrentThreadId() const { return impl_->getCurrentThreadId(); }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000307
T Jake Lucianib5e62212009-01-31 22:36:20 +0000308}}} // apache::thrift::concurrency