blob: 6924aa64c165d76e46e85c405262573b547d6441 [file] [log] [blame]
David Reissea2cba82009-03-30 21:35:00 +00001/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
Mark Slee9f0c6512007-02-28 23:58:26 +000019
Marc Slemko66949872006-07-15 01:52:39 +000020#include "PosixThreadFactory.h"
Marc Slemko3a3b53b2007-05-22 23:59:54 +000021#include "Exception.h"
Marc Slemko66949872006-07-15 01:52:39 +000022
David Reissaf296952008-06-10 22:54:40 +000023#if GOOGLE_PERFTOOLS_REGISTER_THREAD
24# include <google/profiler.h>
25#endif
26
Marc Slemko66949872006-07-15 01:52:39 +000027#include <assert.h>
28#include <pthread.h>
29
Marc Slemko6f038a72006-08-03 18:58:09 +000030#include <iostream>
31
32#include <boost/weak_ptr.hpp>
33
T Jake Lucianib5e62212009-01-31 22:36:20 +000034namespace apache { namespace thrift { namespace concurrency {
Marc Slemko66949872006-07-15 01:52:39 +000035
David Reissd4a269c2007-08-23 02:37:19 +000036using boost::shared_ptr;
37using boost::weak_ptr;
Marc Slemko6f038a72006-08-03 18:58:09 +000038
Mark Sleef5f2be42006-09-05 21:05:31 +000039/**
Marc Slemko3a3b53b2007-05-22 23:59:54 +000040 * The POSIX thread class.
Mark Sleef5f2be42006-09-05 21:05:31 +000041 *
Mark Sleef5f2be42006-09-05 21:05:31 +000042 * @version $Id:$
43 */
Marc Slemko66949872006-07-15 01:52:39 +000044class PthreadThread: public Thread {
Mark Sleef5f2be42006-09-05 21:05:31 +000045 public:
Marc Slemko66949872006-07-15 01:52:39 +000046
Mark Sleef5f2be42006-09-05 21:05:31 +000047 enum STATE {
48 uninitialized,
49 starting,
50 started,
51 stopping,
52 stopped
Marc Slemko66949872006-07-15 01:52:39 +000053 };
54
55 static const int MB = 1024 * 1024;
56
Marc Slemko8a40a762006-07-19 17:46:50 +000057 static void* threadMain(void* arg);
58
Mark Sleef5f2be42006-09-05 21:05:31 +000059 private:
Mark Slee2f6404d2006-10-10 01:37:40 +000060 pthread_t pthread_;
61 STATE state_;
62 int policy_;
63 int priority_;
64 int stackSize_;
65 weak_ptr<PthreadThread> self_;
Marc Slemko67606e52007-06-04 21:01:19 +000066 bool detached_;
Marc Slemko6f038a72006-08-03 18:58:09 +000067
Mark Sleef5f2be42006-09-05 21:05:31 +000068 public:
Marc Slemko3a3b53b2007-05-22 23:59:54 +000069
Marc Slemko67606e52007-06-04 21:01:19 +000070 PthreadThread(int policy, int priority, int stackSize, bool detached, shared_ptr<Runnable> runnable) :
Roger Meier84e4a3c2011-09-16 20:58:44 +000071
72#ifndef _WIN32
Mark Slee2f6404d2006-10-10 01:37:40 +000073 pthread_(0),
Roger Meier84e4a3c2011-09-16 20:58:44 +000074#endif // _WIN32
75
Marc Slemko3a3b53b2007-05-22 23:59:54 +000076 state_(uninitialized),
Mark Slee2f6404d2006-10-10 01:37:40 +000077 policy_(policy),
78 priority_(priority),
Marc Slemko67606e52007-06-04 21:01:19 +000079 stackSize_(stackSize),
80 detached_(detached) {
Marc Slemkofe5ba12e2006-07-20 21:16:27 +000081
82 this->Thread::runnable(runnable);
83 }
Marc Slemko66949872006-07-15 01:52:39 +000084
Marc Slemko67606e52007-06-04 21:01:19 +000085 ~PthreadThread() {
86 /* Nothing references this thread, if is is not detached, do a join
Marc Slemkoa6479032007-06-05 22:20:14 +000087 now, otherwise the thread-id and, possibly, other resources will
Marc Slemko67606e52007-06-04 21:01:19 +000088 be leaked. */
89 if(!detached_) {
90 try {
91 join();
92 } catch(...) {
Marc Slemkoa6479032007-06-05 22:20:14 +000093 // We're really hosed.
Marc Slemko67606e52007-06-04 21:01:19 +000094 }
95 }
96 }
Marc Slemko6f038a72006-08-03 18:58:09 +000097
Marc Slemko66949872006-07-15 01:52:39 +000098 void start() {
Mark Slee2f6404d2006-10-10 01:37:40 +000099 if (state_ != uninitialized) {
Marc Slemko66949872006-07-15 01:52:39 +0000100 return;
101 }
102
Marc Slemko66949872006-07-15 01:52:39 +0000103 pthread_attr_t thread_attr;
Mark Slee2782d6d2007-05-23 04:55:30 +0000104 if (pthread_attr_init(&thread_attr) != 0) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000105 throw SystemResourceException("pthread_attr_init failed");
106 }
Aditya Agarwal9dc57402007-03-31 17:45:12 +0000107
Marc Slemkoa6479032007-06-05 22:20:14 +0000108 if(pthread_attr_setdetachstate(&thread_attr,
109 detached_ ?
110 PTHREAD_CREATE_DETACHED :
Marc Slemko67606e52007-06-04 21:01:19 +0000111 PTHREAD_CREATE_JOINABLE) != 0) {
112 throw SystemResourceException("pthread_attr_setdetachstate failed");
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000113 }
Marc Slemko66949872006-07-15 01:52:39 +0000114
115 // Set thread stack size
Mark Slee2782d6d2007-05-23 04:55:30 +0000116 if (pthread_attr_setstacksize(&thread_attr, MB * stackSize_) != 0) {
117 throw SystemResourceException("pthread_attr_setstacksize failed");
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000118 }
Marc Slemko66949872006-07-15 01:52:39 +0000119
120 // Set thread policy
Roger Meier3516e0e2011-09-30 20:23:34 +0000121 #ifdef _WIN32
122 //WIN32 Pthread implementation doesn't seem to support sheduling policies other then PosixThreadFactory::OTHER - runtime error
123 policy_ = PosixThreadFactory::OTHER;
124 #endif
125
Mark Slee2782d6d2007-05-23 04:55:30 +0000126 if (pthread_attr_setschedpolicy(&thread_attr, policy_) != 0) {
127 throw SystemResourceException("pthread_attr_setschedpolicy failed");
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000128 }
Marc Slemko66949872006-07-15 01:52:39 +0000129
130 struct sched_param sched_param;
Mark Slee2f6404d2006-10-10 01:37:40 +0000131 sched_param.sched_priority = priority_;
Marc Slemko66949872006-07-15 01:52:39 +0000132
133 // Set thread priority
Mark Slee2782d6d2007-05-23 04:55:30 +0000134 if (pthread_attr_setschedparam(&thread_attr, &sched_param) != 0) {
135 throw SystemResourceException("pthread_attr_setschedparam failed");
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000136 }
Marc Slemko66949872006-07-15 01:52:39 +0000137
Mark Sleef5f2be42006-09-05 21:05:31 +0000138 // Create reference
Marc Slemko6f038a72006-08-03 18:58:09 +0000139 shared_ptr<PthreadThread>* selfRef = new shared_ptr<PthreadThread>();
Mark Slee2f6404d2006-10-10 01:37:40 +0000140 *selfRef = self_.lock();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000141
Marc Slemko67606e52007-06-04 21:01:19 +0000142 state_ = starting;
143
Mark Slee2782d6d2007-05-23 04:55:30 +0000144 if (pthread_create(&pthread_, &thread_attr, threadMain, (void*)selfRef) != 0) {
145 throw SystemResourceException("pthread_create failed");
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000146 }
Marc Slemko66949872006-07-15 01:52:39 +0000147 }
148
149 void join() {
Marc Slemko67606e52007-06-04 21:01:19 +0000150 if (!detached_ && state_ != uninitialized) {
Marc Slemko66949872006-07-15 01:52:39 +0000151 void* ignore;
Marc Slemkoa6479032007-06-05 22:20:14 +0000152 /* XXX
153 If join fails it is most likely due to the fact
154 that the last reference was the thread itself and cannot
155 join. This results in leaked threads and will eventually
156 cause the process to run out of thread resources.
157 We're beyond the point of throwing an exception. Not clear how
158 best to handle this. */
Jake Farrellb0d95602011-12-06 01:17:26 +0000159 int res = pthread_join(pthread_, &ignore);
160 detached_ = (res == 0);
161 if (res != 0) {
162 GlobalOutput.printf("PthreadThread::join(): fail with code %d", res);
163 }
164 } else {
165 GlobalOutput.printf("PthreadThread::join(): detached thread");
Marc Slemko66949872006-07-15 01:52:39 +0000166 }
167 }
168
David Reissfbb14ef2008-12-02 02:32:25 +0000169 Thread::id_t getId() {
Roger Meier84e4a3c2011-09-16 20:58:44 +0000170
171#ifndef _WIN32
David Reissfbb14ef2008-12-02 02:32:25 +0000172 return (Thread::id_t)pthread_;
Roger Meier84e4a3c2011-09-16 20:58:44 +0000173#else
174 return (Thread::id_t)pthread_.p;
175#endif // _WIN32
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000176 }
177
Mark Sleef5f2be42006-09-05 21:05:31 +0000178 shared_ptr<Runnable> runnable() const { return Thread::runnable(); }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000179
Mark Sleef5f2be42006-09-05 21:05:31 +0000180 void runnable(shared_ptr<Runnable> value) { Thread::runnable(value); }
Marc Slemko66949872006-07-15 01:52:39 +0000181
Marc Slemko6f038a72006-08-03 18:58:09 +0000182 void weakRef(shared_ptr<PthreadThread> self) {
Aditya Agarwal3f234da2007-04-01 01:19:57 +0000183 assert(self.get() == this);
Mark Slee2f6404d2006-10-10 01:37:40 +0000184 self_ = weak_ptr<PthreadThread>(self);
Marc Slemko6f038a72006-08-03 18:58:09 +0000185 }
Marc Slemko66949872006-07-15 01:52:39 +0000186};
187
Marc Slemko8a40a762006-07-19 17:46:50 +0000188void* PthreadThread::threadMain(void* arg) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000189 shared_ptr<PthreadThread> thread = *(shared_ptr<PthreadThread>*)arg;
Marc Slemko6f038a72006-08-03 18:58:09 +0000190 delete reinterpret_cast<shared_ptr<PthreadThread>*>(arg);
191
Mark Sleef5f2be42006-09-05 21:05:31 +0000192 if (thread == NULL) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000193 return (void*)0;
194 }
195
Mark Slee2f6404d2006-10-10 01:37:40 +0000196 if (thread->state_ != starting) {
Marc Slemko8a40a762006-07-19 17:46:50 +0000197 return (void*)0;
198 }
199
David Reissaf296952008-06-10 22:54:40 +0000200#if GOOGLE_PERFTOOLS_REGISTER_THREAD
201 ProfilerRegisterThread();
202#endif
203
Roger Meier3faaedf2011-10-02 10:51:45 +0000204 thread->state_ = started;
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000205 thread->runnable()->run();
Mark Slee2f6404d2006-10-10 01:37:40 +0000206 if (thread->state_ != stopping && thread->state_ != stopped) {
207 thread->state_ = stopping;
Marc Slemko8a40a762006-07-19 17:46:50 +0000208 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000209
Marc Slemko8a40a762006-07-19 17:46:50 +0000210 return (void*)0;
211}
212
Mark Sleef5f2be42006-09-05 21:05:31 +0000213/**
214 * POSIX Thread factory implementation
215 */
Marc Slemko66949872006-07-15 01:52:39 +0000216class PosixThreadFactory::Impl {
217
Mark Sleef5f2be42006-09-05 21:05:31 +0000218 private:
Mark Slee2f6404d2006-10-10 01:37:40 +0000219 POLICY policy_;
220 PRIORITY priority_;
221 int stackSize_;
222 bool detached_;
Marc Slemko66949872006-07-15 01:52:39 +0000223
Mark Sleef5f2be42006-09-05 21:05:31 +0000224 /**
225 * Converts generic posix thread schedule policy enums into pthread
226 * API values.
227 */
Marc Slemko66949872006-07-15 01:52:39 +0000228 static int toPthreadPolicy(POLICY policy) {
Mark Sleeb9ff32a2006-11-16 01:00:24 +0000229 switch (policy) {
230 case OTHER:
231 return SCHED_OTHER;
232 case FIFO:
233 return SCHED_FIFO;
234 case ROUND_ROBIN:
235 return SCHED_RR;
Marc Slemko66949872006-07-15 01:52:39 +0000236 }
Mark Sleeb9ff32a2006-11-16 01:00:24 +0000237 return SCHED_OTHER;
Marc Slemko66949872006-07-15 01:52:39 +0000238 }
239
Mark Sleef5f2be42006-09-05 21:05:31 +0000240 /**
241 * Converts relative thread priorities to absolute value based on posix
242 * thread scheduler policy
243 *
244 * The idea is simply to divide up the priority range for the given policy
245 * into the correpsonding relative priority level (lowest..highest) and
246 * then pro-rate accordingly.
247 */
Marc Slemko66949872006-07-15 01:52:39 +0000248 static int toPthreadPriority(POLICY policy, PRIORITY priority) {
Marc Slemko66949872006-07-15 01:52:39 +0000249 int pthread_policy = toPthreadPolicy(policy);
David Reisse4ca1792009-05-21 02:28:19 +0000250 int min_priority = 0;
251 int max_priority = 0;
252#ifdef HAVE_SCHED_GET_PRIORITY_MIN
253 min_priority = sched_get_priority_min(pthread_policy);
254#endif
255#ifdef HAVE_SCHED_GET_PRIORITY_MAX
256 max_priority = sched_get_priority_max(pthread_policy);
257#endif
Marc Slemko66949872006-07-15 01:52:39 +0000258 int quanta = (HIGHEST - LOWEST) + 1;
Roger Meier3516e0e2011-09-30 20:23:34 +0000259 float stepsperquanta = (float)(max_priority - min_priority) / quanta;
Marc Slemko66949872006-07-15 01:52:39 +0000260
Mark Slee29050782006-09-29 00:12:30 +0000261 if (priority <= HIGHEST) {
Marc Slemko66949872006-07-15 01:52:39 +0000262 return (int)(min_priority + stepsperquanta * priority);
263 } else {
Marc Slemko66949872006-07-15 01:52:39 +0000264 // should never get here for priority increments.
Marc Slemko66949872006-07-15 01:52:39 +0000265 assert(false);
Marc Slemko66949872006-07-15 01:52:39 +0000266 return (int)(min_priority + stepsperquanta * NORMAL);
267 }
268 }
269
Mark Sleef5f2be42006-09-05 21:05:31 +0000270 public:
Marc Slemko66949872006-07-15 01:52:39 +0000271
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000272 Impl(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
Mark Slee2f6404d2006-10-10 01:37:40 +0000273 policy_(policy),
274 priority_(priority),
275 stackSize_(stackSize),
276 detached_(detached) {}
Marc Slemko66949872006-07-15 01:52:39 +0000277
Mark Sleef5f2be42006-09-05 21:05:31 +0000278 /**
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000279 * Creates a new POSIX thread to run the runnable object
Mark Sleef5f2be42006-09-05 21:05:31 +0000280 *
281 * @param runnable A runnable object
282 */
Marc Slemko6f038a72006-08-03 18:58:09 +0000283 shared_ptr<Thread> newThread(shared_ptr<Runnable> runnable) const {
Marc Slemko67606e52007-06-04 21:01:19 +0000284 shared_ptr<PthreadThread> result = shared_ptr<PthreadThread>(new PthreadThread(toPthreadPolicy(policy_), toPthreadPriority(policy_, priority_), stackSize_, detached_, runnable));
Marc Slemko6f038a72006-08-03 18:58:09 +0000285 result->weakRef(result);
286 runnable->thread(result);
287 return result;
Marc Slemko66949872006-07-15 01:52:39 +0000288 }
289
Marc Slemkoa6479032007-06-05 22:20:14 +0000290 int getStackSize() const { return stackSize_; }
Marc Slemko66949872006-07-15 01:52:39 +0000291
Marc Slemkoa6479032007-06-05 22:20:14 +0000292 void setStackSize(int value) { stackSize_ = value; }
Marc Slemko66949872006-07-15 01:52:39 +0000293
Marc Slemkoa6479032007-06-05 22:20:14 +0000294 PRIORITY getPriority() const { return priority_; }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000295
Mark Sleef5f2be42006-09-05 21:05:31 +0000296 /**
297 * Sets priority.
298 *
299 * XXX
300 * Need to handle incremental priorities properly.
301 */
Marc Slemkoa6479032007-06-05 22:20:14 +0000302 void setPriority(PRIORITY value) { priority_ = value; }
303
304 bool isDetached() const { return detached_; }
305
306 void setDetached(bool value) { detached_ = value; }
307
Mark Slee98439152007-08-21 02:39:40 +0000308 Thread::id_t getCurrentThreadId() const {
Roger Meier84e4a3c2011-09-16 20:58:44 +0000309
310#ifndef _WIN32
David Reissffff2b32009-09-01 18:03:07 +0000311 return (Thread::id_t)pthread_self();
Roger Meier84e4a3c2011-09-16 20:58:44 +0000312#else
313 return (Thread::id_t)pthread_self().p;
314#endif // _WIN32
315
Mark Slee98439152007-08-21 02:39:40 +0000316 }
Marc Slemkoa6479032007-06-05 22:20:14 +0000317
Marc Slemko66949872006-07-15 01:52:39 +0000318};
319
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000320PosixThreadFactory::PosixThreadFactory(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
Mark Slee2f6404d2006-10-10 01:37:40 +0000321 impl_(new PosixThreadFactory::Impl(policy, priority, stackSize, detached)) {}
Marc Slemko66949872006-07-15 01:52:39 +0000322
Mark Slee2f6404d2006-10-10 01:37:40 +0000323shared_ptr<Thread> PosixThreadFactory::newThread(shared_ptr<Runnable> runnable) const { return impl_->newThread(runnable); }
Marc Slemko66949872006-07-15 01:52:39 +0000324
Marc Slemkoa6479032007-06-05 22:20:14 +0000325int PosixThreadFactory::getStackSize() const { return impl_->getStackSize(); }
Marc Slemko66949872006-07-15 01:52:39 +0000326
Marc Slemkoa6479032007-06-05 22:20:14 +0000327void PosixThreadFactory::setStackSize(int value) { impl_->setStackSize(value); }
Marc Slemko66949872006-07-15 01:52:39 +0000328
Marc Slemkoa6479032007-06-05 22:20:14 +0000329PosixThreadFactory::PRIORITY PosixThreadFactory::getPriority() const { return impl_->getPriority(); }
Marc Slemko66949872006-07-15 01:52:39 +0000330
Marc Slemkoa6479032007-06-05 22:20:14 +0000331void PosixThreadFactory::setPriority(PosixThreadFactory::PRIORITY value) { impl_->setPriority(value); }
Marc Slemko66949872006-07-15 01:52:39 +0000332
Marc Slemkoa6479032007-06-05 22:20:14 +0000333bool PosixThreadFactory::isDetached() const { return impl_->isDetached(); }
334
335void PosixThreadFactory::setDetached(bool value) { impl_->setDetached(value); }
336
337Thread::id_t PosixThreadFactory::getCurrentThreadId() const { return impl_->getCurrentThreadId(); }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000338
T Jake Lucianib5e62212009-01-31 22:36:20 +0000339}}} // apache::thrift::concurrency