blob: fe5ba123c3f69105943060fa5a81b8ee62202a68 [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
Marc Slemko66949872006-07-15 01:52:39 +000020#include "PosixThreadFactory.h"
Marc Slemko3a3b53b2007-05-22 23:59:54 +000021#include "Exception.h"
Marc Slemko66949872006-07-15 01:52:39 +000022
David Reissaf296952008-06-10 22:54:40 +000023#if GOOGLE_PERFTOOLS_REGISTER_THREAD
24# include <google/profiler.h>
25#endif
26
Marc Slemko66949872006-07-15 01:52:39 +000027#include <assert.h>
28#include <pthread.h>
29
Marc Slemko6f038a72006-08-03 18:58:09 +000030#include <iostream>
31
32#include <boost/weak_ptr.hpp>
33
T Jake Lucianib5e62212009-01-31 22:36:20 +000034namespace apache { namespace thrift { namespace concurrency {
Marc Slemko66949872006-07-15 01:52:39 +000035
David Reissd4a269c2007-08-23 02:37:19 +000036using boost::shared_ptr;
37using boost::weak_ptr;
Marc Slemko6f038a72006-08-03 18:58:09 +000038
Mark Sleef5f2be42006-09-05 21:05:31 +000039/**
Marc Slemko3a3b53b2007-05-22 23:59:54 +000040 * The POSIX thread class.
Mark Sleef5f2be42006-09-05 21:05:31 +000041 *
Mark Sleef5f2be42006-09-05 21:05:31 +000042 * @version $Id:$
43 */
Marc Slemko66949872006-07-15 01:52:39 +000044class PthreadThread: public Thread {
Mark Sleef5f2be42006-09-05 21:05:31 +000045 public:
Marc Slemko66949872006-07-15 01:52:39 +000046
Mark Sleef5f2be42006-09-05 21:05:31 +000047 enum STATE {
48 uninitialized,
49 starting,
50 started,
51 stopping,
52 stopped
Marc Slemko66949872006-07-15 01:52:39 +000053 };
54
55 static const int MB = 1024 * 1024;
56
Marc Slemko8a40a762006-07-19 17:46:50 +000057 static void* threadMain(void* arg);
58
Mark Sleef5f2be42006-09-05 21:05:31 +000059 private:
Mark Slee2f6404d2006-10-10 01:37:40 +000060 pthread_t pthread_;
61 STATE state_;
62 int policy_;
63 int priority_;
64 int stackSize_;
65 weak_ptr<PthreadThread> self_;
Marc Slemko67606e52007-06-04 21:01:19 +000066 bool detached_;
Marc Slemko6f038a72006-08-03 18:58:09 +000067
Mark Sleef5f2be42006-09-05 21:05:31 +000068 public:
Marc Slemko3a3b53b2007-05-22 23:59:54 +000069
Marc Slemko67606e52007-06-04 21:01:19 +000070 PthreadThread(int policy, int priority, int stackSize, bool detached, shared_ptr<Runnable> runnable) :
Roger Meier84e4a3c2011-09-16 20:58:44 +000071
72#ifndef _WIN32
Mark Slee2f6404d2006-10-10 01:37:40 +000073 pthread_(0),
Roger Meier84e4a3c2011-09-16 20:58:44 +000074#endif // _WIN32
75
Marc Slemko3a3b53b2007-05-22 23:59:54 +000076 state_(uninitialized),
Mark Slee2f6404d2006-10-10 01:37:40 +000077 policy_(policy),
78 priority_(priority),
Marc Slemko67606e52007-06-04 21:01:19 +000079 stackSize_(stackSize),
80 detached_(detached) {
Marc Slemkofe5ba12e2006-07-20 21:16:27 +000081
82 this->Thread::runnable(runnable);
83 }
Marc Slemko66949872006-07-15 01:52:39 +000084
Marc Slemko67606e52007-06-04 21:01:19 +000085 ~PthreadThread() {
86 /* Nothing references this thread, if is is not detached, do a join
Marc Slemkoa6479032007-06-05 22:20:14 +000087 now, otherwise the thread-id and, possibly, other resources will
Marc Slemko67606e52007-06-04 21:01:19 +000088 be leaked. */
89 if(!detached_) {
90 try {
91 join();
92 } catch(...) {
Marc Slemkoa6479032007-06-05 22:20:14 +000093 // We're really hosed.
Marc Slemko67606e52007-06-04 21:01:19 +000094 }
95 }
96 }
Marc Slemko6f038a72006-08-03 18:58:09 +000097
Marc Slemko66949872006-07-15 01:52:39 +000098 void start() {
Mark Slee2f6404d2006-10-10 01:37:40 +000099 if (state_ != uninitialized) {
Marc Slemko66949872006-07-15 01:52:39 +0000100 return;
101 }
102
Marc Slemko66949872006-07-15 01:52:39 +0000103 pthread_attr_t thread_attr;
Mark Slee2782d6d2007-05-23 04:55:30 +0000104 if (pthread_attr_init(&thread_attr) != 0) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000105 throw SystemResourceException("pthread_attr_init failed");
106 }
Aditya Agarwal9dc57402007-03-31 17:45:12 +0000107
Marc Slemkoa6479032007-06-05 22:20:14 +0000108 if(pthread_attr_setdetachstate(&thread_attr,
109 detached_ ?
110 PTHREAD_CREATE_DETACHED :
Marc Slemko67606e52007-06-04 21:01:19 +0000111 PTHREAD_CREATE_JOINABLE) != 0) {
112 throw SystemResourceException("pthread_attr_setdetachstate failed");
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000113 }
Marc Slemko66949872006-07-15 01:52:39 +0000114
115 // Set thread stack size
Mark Slee2782d6d2007-05-23 04:55:30 +0000116 if (pthread_attr_setstacksize(&thread_attr, MB * stackSize_) != 0) {
117 throw SystemResourceException("pthread_attr_setstacksize failed");
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000118 }
Marc Slemko66949872006-07-15 01:52:39 +0000119
120 // Set thread policy
Roger Meier3516e0e2011-09-30 20:23:34 +0000121 #ifdef _WIN32
122 //WIN32 Pthread implementation doesn't seem to support sheduling policies other then PosixThreadFactory::OTHER - runtime error
123 policy_ = PosixThreadFactory::OTHER;
124 #endif
125
Mark Slee2782d6d2007-05-23 04:55:30 +0000126 if (pthread_attr_setschedpolicy(&thread_attr, policy_) != 0) {
127 throw SystemResourceException("pthread_attr_setschedpolicy failed");
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000128 }
Marc Slemko66949872006-07-15 01:52:39 +0000129
130 struct sched_param sched_param;
Mark Slee2f6404d2006-10-10 01:37:40 +0000131 sched_param.sched_priority = priority_;
Marc Slemko66949872006-07-15 01:52:39 +0000132
133 // Set thread priority
Mark Slee2782d6d2007-05-23 04:55:30 +0000134 if (pthread_attr_setschedparam(&thread_attr, &sched_param) != 0) {
135 throw SystemResourceException("pthread_attr_setschedparam failed");
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000136 }
Marc Slemko66949872006-07-15 01:52:39 +0000137
Mark Sleef5f2be42006-09-05 21:05:31 +0000138 // Create reference
Marc Slemko6f038a72006-08-03 18:58:09 +0000139 shared_ptr<PthreadThread>* selfRef = new shared_ptr<PthreadThread>();
Mark Slee2f6404d2006-10-10 01:37:40 +0000140 *selfRef = self_.lock();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000141
Marc Slemko67606e52007-06-04 21:01:19 +0000142 state_ = starting;
143
Mark Slee2782d6d2007-05-23 04:55:30 +0000144 if (pthread_create(&pthread_, &thread_attr, threadMain, (void*)selfRef) != 0) {
145 throw SystemResourceException("pthread_create failed");
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000146 }
Marc Slemko66949872006-07-15 01:52:39 +0000147 }
148
149 void join() {
Marc Slemko67606e52007-06-04 21:01:19 +0000150 if (!detached_ && state_ != uninitialized) {
Marc Slemko66949872006-07-15 01:52:39 +0000151 void* ignore;
Marc Slemkoa6479032007-06-05 22:20:14 +0000152 /* XXX
153 If join fails it is most likely due to the fact
154 that the last reference was the thread itself and cannot
155 join. This results in leaked threads and will eventually
156 cause the process to run out of thread resources.
157 We're beyond the point of throwing an exception. Not clear how
158 best to handle this. */
159 detached_ = pthread_join(pthread_, &ignore) == 0;
Marc Slemko66949872006-07-15 01:52:39 +0000160 }
161 }
162
David Reissfbb14ef2008-12-02 02:32:25 +0000163 Thread::id_t getId() {
Roger Meier84e4a3c2011-09-16 20:58:44 +0000164
165#ifndef _WIN32
David Reissfbb14ef2008-12-02 02:32:25 +0000166 return (Thread::id_t)pthread_;
Roger Meier84e4a3c2011-09-16 20:58:44 +0000167#else
168 return (Thread::id_t)pthread_.p;
169#endif // _WIN32
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000170 }
171
Mark Sleef5f2be42006-09-05 21:05:31 +0000172 shared_ptr<Runnable> runnable() const { return Thread::runnable(); }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000173
Mark Sleef5f2be42006-09-05 21:05:31 +0000174 void runnable(shared_ptr<Runnable> value) { Thread::runnable(value); }
Marc Slemko66949872006-07-15 01:52:39 +0000175
Marc Slemko6f038a72006-08-03 18:58:09 +0000176 void weakRef(shared_ptr<PthreadThread> self) {
Aditya Agarwal3f234da2007-04-01 01:19:57 +0000177 assert(self.get() == this);
Mark Slee2f6404d2006-10-10 01:37:40 +0000178 self_ = weak_ptr<PthreadThread>(self);
Marc Slemko6f038a72006-08-03 18:58:09 +0000179 }
Marc Slemko66949872006-07-15 01:52:39 +0000180};
181
Marc Slemko8a40a762006-07-19 17:46:50 +0000182void* PthreadThread::threadMain(void* arg) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000183 shared_ptr<PthreadThread> thread = *(shared_ptr<PthreadThread>*)arg;
Marc Slemko6f038a72006-08-03 18:58:09 +0000184 delete reinterpret_cast<shared_ptr<PthreadThread>*>(arg);
185
Mark Sleef5f2be42006-09-05 21:05:31 +0000186 if (thread == NULL) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000187 return (void*)0;
188 }
189
Mark Slee2f6404d2006-10-10 01:37:40 +0000190 if (thread->state_ != starting) {
Marc Slemko8a40a762006-07-19 17:46:50 +0000191 return (void*)0;
192 }
193
David Reissaf296952008-06-10 22:54:40 +0000194#if GOOGLE_PERFTOOLS_REGISTER_THREAD
195 ProfilerRegisterThread();
196#endif
197
Mark Slee2f6404d2006-10-10 01:37:40 +0000198 thread->state_ = starting;
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000199 thread->runnable()->run();
Mark Slee2f6404d2006-10-10 01:37:40 +0000200 if (thread->state_ != stopping && thread->state_ != stopped) {
201 thread->state_ = stopping;
Marc Slemko8a40a762006-07-19 17:46:50 +0000202 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000203
Marc Slemko8a40a762006-07-19 17:46:50 +0000204 return (void*)0;
205}
206
Mark Sleef5f2be42006-09-05 21:05:31 +0000207/**
208 * POSIX Thread factory implementation
209 */
Marc Slemko66949872006-07-15 01:52:39 +0000210class PosixThreadFactory::Impl {
211
Mark Sleef5f2be42006-09-05 21:05:31 +0000212 private:
Mark Slee2f6404d2006-10-10 01:37:40 +0000213 POLICY policy_;
214 PRIORITY priority_;
215 int stackSize_;
216 bool detached_;
Marc Slemko66949872006-07-15 01:52:39 +0000217
Mark Sleef5f2be42006-09-05 21:05:31 +0000218 /**
219 * Converts generic posix thread schedule policy enums into pthread
220 * API values.
221 */
Marc Slemko66949872006-07-15 01:52:39 +0000222 static int toPthreadPolicy(POLICY policy) {
Mark Sleeb9ff32a2006-11-16 01:00:24 +0000223 switch (policy) {
224 case OTHER:
225 return SCHED_OTHER;
226 case FIFO:
227 return SCHED_FIFO;
228 case ROUND_ROBIN:
229 return SCHED_RR;
Marc Slemko66949872006-07-15 01:52:39 +0000230 }
Mark Sleeb9ff32a2006-11-16 01:00:24 +0000231 return SCHED_OTHER;
Marc Slemko66949872006-07-15 01:52:39 +0000232 }
233
Mark Sleef5f2be42006-09-05 21:05:31 +0000234 /**
235 * Converts relative thread priorities to absolute value based on posix
236 * thread scheduler policy
237 *
238 * The idea is simply to divide up the priority range for the given policy
239 * into the correpsonding relative priority level (lowest..highest) and
240 * then pro-rate accordingly.
241 */
Marc Slemko66949872006-07-15 01:52:39 +0000242 static int toPthreadPriority(POLICY policy, PRIORITY priority) {
Marc Slemko66949872006-07-15 01:52:39 +0000243 int pthread_policy = toPthreadPolicy(policy);
David Reisse4ca1792009-05-21 02:28:19 +0000244 int min_priority = 0;
245 int max_priority = 0;
246#ifdef HAVE_SCHED_GET_PRIORITY_MIN
247 min_priority = sched_get_priority_min(pthread_policy);
248#endif
249#ifdef HAVE_SCHED_GET_PRIORITY_MAX
250 max_priority = sched_get_priority_max(pthread_policy);
251#endif
Marc Slemko66949872006-07-15 01:52:39 +0000252 int quanta = (HIGHEST - LOWEST) + 1;
Roger Meier3516e0e2011-09-30 20:23:34 +0000253 float stepsperquanta = (float)(max_priority - min_priority) / quanta;
Marc Slemko66949872006-07-15 01:52:39 +0000254
Mark Slee29050782006-09-29 00:12:30 +0000255 if (priority <= HIGHEST) {
Marc Slemko66949872006-07-15 01:52:39 +0000256 return (int)(min_priority + stepsperquanta * priority);
257 } else {
Marc Slemko66949872006-07-15 01:52:39 +0000258 // should never get here for priority increments.
Marc Slemko66949872006-07-15 01:52:39 +0000259 assert(false);
Marc Slemko66949872006-07-15 01:52:39 +0000260 return (int)(min_priority + stepsperquanta * NORMAL);
261 }
262 }
263
Mark Sleef5f2be42006-09-05 21:05:31 +0000264 public:
Marc Slemko66949872006-07-15 01:52:39 +0000265
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000266 Impl(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
Mark Slee2f6404d2006-10-10 01:37:40 +0000267 policy_(policy),
268 priority_(priority),
269 stackSize_(stackSize),
270 detached_(detached) {}
Marc Slemko66949872006-07-15 01:52:39 +0000271
Mark Sleef5f2be42006-09-05 21:05:31 +0000272 /**
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000273 * Creates a new POSIX thread to run the runnable object
Mark Sleef5f2be42006-09-05 21:05:31 +0000274 *
275 * @param runnable A runnable object
276 */
Marc Slemko6f038a72006-08-03 18:58:09 +0000277 shared_ptr<Thread> newThread(shared_ptr<Runnable> runnable) const {
Marc Slemko67606e52007-06-04 21:01:19 +0000278 shared_ptr<PthreadThread> result = shared_ptr<PthreadThread>(new PthreadThread(toPthreadPolicy(policy_), toPthreadPriority(policy_, priority_), stackSize_, detached_, runnable));
Marc Slemko6f038a72006-08-03 18:58:09 +0000279 result->weakRef(result);
280 runnable->thread(result);
281 return result;
Marc Slemko66949872006-07-15 01:52:39 +0000282 }
283
Marc Slemkoa6479032007-06-05 22:20:14 +0000284 int getStackSize() const { return stackSize_; }
Marc Slemko66949872006-07-15 01:52:39 +0000285
Marc Slemkoa6479032007-06-05 22:20:14 +0000286 void setStackSize(int value) { stackSize_ = value; }
Marc Slemko66949872006-07-15 01:52:39 +0000287
Marc Slemkoa6479032007-06-05 22:20:14 +0000288 PRIORITY getPriority() const { return priority_; }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000289
Mark Sleef5f2be42006-09-05 21:05:31 +0000290 /**
291 * Sets priority.
292 *
293 * XXX
294 * Need to handle incremental priorities properly.
295 */
Marc Slemkoa6479032007-06-05 22:20:14 +0000296 void setPriority(PRIORITY value) { priority_ = value; }
297
298 bool isDetached() const { return detached_; }
299
300 void setDetached(bool value) { detached_ = value; }
301
Mark Slee98439152007-08-21 02:39:40 +0000302 Thread::id_t getCurrentThreadId() const {
Roger Meier84e4a3c2011-09-16 20:58:44 +0000303
304#ifndef _WIN32
David Reissffff2b32009-09-01 18:03:07 +0000305 return (Thread::id_t)pthread_self();
Roger Meier84e4a3c2011-09-16 20:58:44 +0000306#else
307 return (Thread::id_t)pthread_self().p;
308#endif // _WIN32
309
Mark Slee98439152007-08-21 02:39:40 +0000310 }
Marc Slemkoa6479032007-06-05 22:20:14 +0000311
Marc Slemko66949872006-07-15 01:52:39 +0000312};
313
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000314PosixThreadFactory::PosixThreadFactory(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
Mark Slee2f6404d2006-10-10 01:37:40 +0000315 impl_(new PosixThreadFactory::Impl(policy, priority, stackSize, detached)) {}
Marc Slemko66949872006-07-15 01:52:39 +0000316
Mark Slee2f6404d2006-10-10 01:37:40 +0000317shared_ptr<Thread> PosixThreadFactory::newThread(shared_ptr<Runnable> runnable) const { return impl_->newThread(runnable); }
Marc Slemko66949872006-07-15 01:52:39 +0000318
Marc Slemkoa6479032007-06-05 22:20:14 +0000319int PosixThreadFactory::getStackSize() const { return impl_->getStackSize(); }
Marc Slemko66949872006-07-15 01:52:39 +0000320
Marc Slemkoa6479032007-06-05 22:20:14 +0000321void PosixThreadFactory::setStackSize(int value) { impl_->setStackSize(value); }
Marc Slemko66949872006-07-15 01:52:39 +0000322
Marc Slemkoa6479032007-06-05 22:20:14 +0000323PosixThreadFactory::PRIORITY PosixThreadFactory::getPriority() const { return impl_->getPriority(); }
Marc Slemko66949872006-07-15 01:52:39 +0000324
Marc Slemkoa6479032007-06-05 22:20:14 +0000325void PosixThreadFactory::setPriority(PosixThreadFactory::PRIORITY value) { impl_->setPriority(value); }
Marc Slemko66949872006-07-15 01:52:39 +0000326
Marc Slemkoa6479032007-06-05 22:20:14 +0000327bool PosixThreadFactory::isDetached() const { return impl_->isDetached(); }
328
329void PosixThreadFactory::setDetached(bool value) { impl_->setDetached(value); }
330
331Thread::id_t PosixThreadFactory::getCurrentThreadId() const { return impl_->getCurrentThreadId(); }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000332
T Jake Lucianib5e62212009-01-31 22:36:20 +0000333}}} // apache::thrift::concurrency