blob: 9655a8658ff61c879a82659b8824c87dae31d7e6 [file] [log] [blame]
Mark Slee9f0c6512007-02-28 23:58:26 +00001// Copyright (c) 2006- Facebook
2// Distributed under the Thrift Software License
3//
4// See accompanying file LICENSE or visit the Thrift site at:
5// http://developers.facebook.com/thrift/
6
Marc Slemko66949872006-07-15 01:52:39 +00007#include "PosixThreadFactory.h"
Marc Slemko3a3b53b2007-05-22 23:59:54 +00008#include "Exception.h"
Marc Slemko66949872006-07-15 01:52:39 +00009
David Reissaf296952008-06-10 22:54:40 +000010#if GOOGLE_PERFTOOLS_REGISTER_THREAD
11# include <google/profiler.h>
12#endif
13
Marc Slemko66949872006-07-15 01:52:39 +000014#include <assert.h>
15#include <pthread.h>
16
Marc Slemko6f038a72006-08-03 18:58:09 +000017#include <iostream>
18
19#include <boost/weak_ptr.hpp>
20
T Jake Lucianib5e62212009-01-31 22:36:20 +000021namespace apache { namespace thrift { namespace concurrency {
Marc Slemko66949872006-07-15 01:52:39 +000022
David Reissd4a269c2007-08-23 02:37:19 +000023using boost::shared_ptr;
24using boost::weak_ptr;
Marc Slemko6f038a72006-08-03 18:58:09 +000025
Mark Sleef5f2be42006-09-05 21:05:31 +000026/**
Marc Slemko3a3b53b2007-05-22 23:59:54 +000027 * The POSIX thread class.
Mark Sleef5f2be42006-09-05 21:05:31 +000028 *
29 * @author marc
30 * @version $Id:$
31 */
Marc Slemko66949872006-07-15 01:52:39 +000032class PthreadThread: public Thread {
Mark Sleef5f2be42006-09-05 21:05:31 +000033 public:
Marc Slemko66949872006-07-15 01:52:39 +000034
Mark Sleef5f2be42006-09-05 21:05:31 +000035 enum STATE {
36 uninitialized,
37 starting,
38 started,
39 stopping,
40 stopped
Marc Slemko66949872006-07-15 01:52:39 +000041 };
42
43 static const int MB = 1024 * 1024;
44
Marc Slemko8a40a762006-07-19 17:46:50 +000045 static void* threadMain(void* arg);
46
Mark Sleef5f2be42006-09-05 21:05:31 +000047 private:
Mark Slee2f6404d2006-10-10 01:37:40 +000048 pthread_t pthread_;
49 STATE state_;
50 int policy_;
51 int priority_;
52 int stackSize_;
53 weak_ptr<PthreadThread> self_;
Marc Slemko67606e52007-06-04 21:01:19 +000054 bool detached_;
Marc Slemko6f038a72006-08-03 18:58:09 +000055
Mark Sleef5f2be42006-09-05 21:05:31 +000056 public:
Marc Slemko3a3b53b2007-05-22 23:59:54 +000057
Marc Slemko67606e52007-06-04 21:01:19 +000058 PthreadThread(int policy, int priority, int stackSize, bool detached, shared_ptr<Runnable> runnable) :
Mark Slee2f6404d2006-10-10 01:37:40 +000059 pthread_(0),
Marc Slemko3a3b53b2007-05-22 23:59:54 +000060 state_(uninitialized),
Mark Slee2f6404d2006-10-10 01:37:40 +000061 policy_(policy),
62 priority_(priority),
Marc Slemko67606e52007-06-04 21:01:19 +000063 stackSize_(stackSize),
64 detached_(detached) {
Marc Slemkofe5ba12e2006-07-20 21:16:27 +000065
66 this->Thread::runnable(runnable);
67 }
Marc Slemko66949872006-07-15 01:52:39 +000068
Marc Slemko67606e52007-06-04 21:01:19 +000069 ~PthreadThread() {
70 /* Nothing references this thread, if is is not detached, do a join
Marc Slemkoa6479032007-06-05 22:20:14 +000071 now, otherwise the thread-id and, possibly, other resources will
Marc Slemko67606e52007-06-04 21:01:19 +000072 be leaked. */
73 if(!detached_) {
74 try {
75 join();
76 } catch(...) {
Marc Slemkoa6479032007-06-05 22:20:14 +000077 // We're really hosed.
Marc Slemko67606e52007-06-04 21:01:19 +000078 }
79 }
80 }
Marc Slemko6f038a72006-08-03 18:58:09 +000081
Marc Slemko66949872006-07-15 01:52:39 +000082 void start() {
Mark Slee2f6404d2006-10-10 01:37:40 +000083 if (state_ != uninitialized) {
Marc Slemko66949872006-07-15 01:52:39 +000084 return;
85 }
86
Marc Slemko66949872006-07-15 01:52:39 +000087 pthread_attr_t thread_attr;
Mark Slee2782d6d2007-05-23 04:55:30 +000088 if (pthread_attr_init(&thread_attr) != 0) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +000089 throw SystemResourceException("pthread_attr_init failed");
90 }
Aditya Agarwal9dc57402007-03-31 17:45:12 +000091
Marc Slemkoa6479032007-06-05 22:20:14 +000092 if(pthread_attr_setdetachstate(&thread_attr,
93 detached_ ?
94 PTHREAD_CREATE_DETACHED :
Marc Slemko67606e52007-06-04 21:01:19 +000095 PTHREAD_CREATE_JOINABLE) != 0) {
96 throw SystemResourceException("pthread_attr_setdetachstate failed");
Marc Slemko3a3b53b2007-05-22 23:59:54 +000097 }
Marc Slemko66949872006-07-15 01:52:39 +000098
99 // Set thread stack size
Mark Slee2782d6d2007-05-23 04:55:30 +0000100 if (pthread_attr_setstacksize(&thread_attr, MB * stackSize_) != 0) {
101 throw SystemResourceException("pthread_attr_setstacksize failed");
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000102 }
Marc Slemko66949872006-07-15 01:52:39 +0000103
104 // Set thread policy
Mark Slee2782d6d2007-05-23 04:55:30 +0000105 if (pthread_attr_setschedpolicy(&thread_attr, policy_) != 0) {
106 throw SystemResourceException("pthread_attr_setschedpolicy failed");
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000107 }
Marc Slemko66949872006-07-15 01:52:39 +0000108
109 struct sched_param sched_param;
Mark Slee2f6404d2006-10-10 01:37:40 +0000110 sched_param.sched_priority = priority_;
Marc Slemko66949872006-07-15 01:52:39 +0000111
112 // Set thread priority
Mark Slee2782d6d2007-05-23 04:55:30 +0000113 if (pthread_attr_setschedparam(&thread_attr, &sched_param) != 0) {
114 throw SystemResourceException("pthread_attr_setschedparam failed");
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000115 }
Marc Slemko66949872006-07-15 01:52:39 +0000116
Mark Sleef5f2be42006-09-05 21:05:31 +0000117 // Create reference
Marc Slemko6f038a72006-08-03 18:58:09 +0000118 shared_ptr<PthreadThread>* selfRef = new shared_ptr<PthreadThread>();
Mark Slee2f6404d2006-10-10 01:37:40 +0000119 *selfRef = self_.lock();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000120
Marc Slemko67606e52007-06-04 21:01:19 +0000121 state_ = starting;
122
Mark Slee2782d6d2007-05-23 04:55:30 +0000123 if (pthread_create(&pthread_, &thread_attr, threadMain, (void*)selfRef) != 0) {
124 throw SystemResourceException("pthread_create failed");
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000125 }
Marc Slemko66949872006-07-15 01:52:39 +0000126 }
127
128 void join() {
Marc Slemko67606e52007-06-04 21:01:19 +0000129 if (!detached_ && state_ != uninitialized) {
Marc Slemko66949872006-07-15 01:52:39 +0000130 void* ignore;
Marc Slemkoa6479032007-06-05 22:20:14 +0000131 /* XXX
132 If join fails it is most likely due to the fact
133 that the last reference was the thread itself and cannot
134 join. This results in leaked threads and will eventually
135 cause the process to run out of thread resources.
136 We're beyond the point of throwing an exception. Not clear how
137 best to handle this. */
138 detached_ = pthread_join(pthread_, &ignore) == 0;
Marc Slemko66949872006-07-15 01:52:39 +0000139 }
140 }
141
David Reissfbb14ef2008-12-02 02:32:25 +0000142 Thread::id_t getId() {
143 return (Thread::id_t)pthread_;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000144 }
145
Mark Sleef5f2be42006-09-05 21:05:31 +0000146 shared_ptr<Runnable> runnable() const { return Thread::runnable(); }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000147
Mark Sleef5f2be42006-09-05 21:05:31 +0000148 void runnable(shared_ptr<Runnable> value) { Thread::runnable(value); }
Marc Slemko66949872006-07-15 01:52:39 +0000149
Marc Slemko6f038a72006-08-03 18:58:09 +0000150 void weakRef(shared_ptr<PthreadThread> self) {
Aditya Agarwal3f234da2007-04-01 01:19:57 +0000151 assert(self.get() == this);
Mark Slee2f6404d2006-10-10 01:37:40 +0000152 self_ = weak_ptr<PthreadThread>(self);
Marc Slemko6f038a72006-08-03 18:58:09 +0000153 }
Marc Slemko66949872006-07-15 01:52:39 +0000154};
155
Marc Slemko8a40a762006-07-19 17:46:50 +0000156void* PthreadThread::threadMain(void* arg) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000157 shared_ptr<PthreadThread> thread = *(shared_ptr<PthreadThread>*)arg;
Marc Slemko6f038a72006-08-03 18:58:09 +0000158 delete reinterpret_cast<shared_ptr<PthreadThread>*>(arg);
159
Mark Sleef5f2be42006-09-05 21:05:31 +0000160 if (thread == NULL) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000161 return (void*)0;
162 }
163
Mark Slee2f6404d2006-10-10 01:37:40 +0000164 if (thread->state_ != starting) {
Marc Slemko8a40a762006-07-19 17:46:50 +0000165 return (void*)0;
166 }
167
David Reissaf296952008-06-10 22:54:40 +0000168#if GOOGLE_PERFTOOLS_REGISTER_THREAD
169 ProfilerRegisterThread();
170#endif
171
Mark Slee2f6404d2006-10-10 01:37:40 +0000172 thread->state_ = starting;
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000173 thread->runnable()->run();
Mark Slee2f6404d2006-10-10 01:37:40 +0000174 if (thread->state_ != stopping && thread->state_ != stopped) {
175 thread->state_ = stopping;
Marc Slemko8a40a762006-07-19 17:46:50 +0000176 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000177
Marc Slemko8a40a762006-07-19 17:46:50 +0000178 return (void*)0;
179}
180
Mark Sleef5f2be42006-09-05 21:05:31 +0000181/**
182 * POSIX Thread factory implementation
183 */
Marc Slemko66949872006-07-15 01:52:39 +0000184class PosixThreadFactory::Impl {
185
Mark Sleef5f2be42006-09-05 21:05:31 +0000186 private:
Mark Slee2f6404d2006-10-10 01:37:40 +0000187 POLICY policy_;
188 PRIORITY priority_;
189 int stackSize_;
190 bool detached_;
Marc Slemko66949872006-07-15 01:52:39 +0000191
Mark Sleef5f2be42006-09-05 21:05:31 +0000192 /**
193 * Converts generic posix thread schedule policy enums into pthread
194 * API values.
195 */
Marc Slemko66949872006-07-15 01:52:39 +0000196 static int toPthreadPolicy(POLICY policy) {
Mark Sleeb9ff32a2006-11-16 01:00:24 +0000197 switch (policy) {
198 case OTHER:
199 return SCHED_OTHER;
200 case FIFO:
201 return SCHED_FIFO;
202 case ROUND_ROBIN:
203 return SCHED_RR;
Marc Slemko66949872006-07-15 01:52:39 +0000204 }
Mark Sleeb9ff32a2006-11-16 01:00:24 +0000205 return SCHED_OTHER;
Marc Slemko66949872006-07-15 01:52:39 +0000206 }
207
Mark Sleef5f2be42006-09-05 21:05:31 +0000208 /**
209 * Converts relative thread priorities to absolute value based on posix
210 * thread scheduler policy
211 *
212 * The idea is simply to divide up the priority range for the given policy
213 * into the correpsonding relative priority level (lowest..highest) and
214 * then pro-rate accordingly.
215 */
Marc Slemko66949872006-07-15 01:52:39 +0000216 static int toPthreadPriority(POLICY policy, PRIORITY priority) {
Marc Slemko66949872006-07-15 01:52:39 +0000217 int pthread_policy = toPthreadPolicy(policy);
Marc Slemko66949872006-07-15 01:52:39 +0000218 int min_priority = sched_get_priority_min(pthread_policy);
Marc Slemko66949872006-07-15 01:52:39 +0000219 int max_priority = sched_get_priority_max(pthread_policy);
Marc Slemko66949872006-07-15 01:52:39 +0000220 int quanta = (HIGHEST - LOWEST) + 1;
Marc Slemko66949872006-07-15 01:52:39 +0000221 float stepsperquanta = (max_priority - min_priority) / quanta;
222
Mark Slee29050782006-09-29 00:12:30 +0000223 if (priority <= HIGHEST) {
Marc Slemko66949872006-07-15 01:52:39 +0000224 return (int)(min_priority + stepsperquanta * priority);
225 } else {
Marc Slemko66949872006-07-15 01:52:39 +0000226 // should never get here for priority increments.
Marc Slemko66949872006-07-15 01:52:39 +0000227 assert(false);
Marc Slemko66949872006-07-15 01:52:39 +0000228 return (int)(min_priority + stepsperquanta * NORMAL);
229 }
230 }
231
Mark Sleef5f2be42006-09-05 21:05:31 +0000232 public:
Marc Slemko66949872006-07-15 01:52:39 +0000233
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000234 Impl(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
Mark Slee2f6404d2006-10-10 01:37:40 +0000235 policy_(policy),
236 priority_(priority),
237 stackSize_(stackSize),
238 detached_(detached) {}
Marc Slemko66949872006-07-15 01:52:39 +0000239
Mark Sleef5f2be42006-09-05 21:05:31 +0000240 /**
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000241 * Creates a new POSIX thread to run the runnable object
Mark Sleef5f2be42006-09-05 21:05:31 +0000242 *
243 * @param runnable A runnable object
244 */
Marc Slemko6f038a72006-08-03 18:58:09 +0000245 shared_ptr<Thread> newThread(shared_ptr<Runnable> runnable) const {
Marc Slemko67606e52007-06-04 21:01:19 +0000246 shared_ptr<PthreadThread> result = shared_ptr<PthreadThread>(new PthreadThread(toPthreadPolicy(policy_), toPthreadPriority(policy_, priority_), stackSize_, detached_, runnable));
Marc Slemko6f038a72006-08-03 18:58:09 +0000247 result->weakRef(result);
248 runnable->thread(result);
249 return result;
Marc Slemko66949872006-07-15 01:52:39 +0000250 }
251
Marc Slemkoa6479032007-06-05 22:20:14 +0000252 int getStackSize() const { return stackSize_; }
Marc Slemko66949872006-07-15 01:52:39 +0000253
Marc Slemkoa6479032007-06-05 22:20:14 +0000254 void setStackSize(int value) { stackSize_ = value; }
Marc Slemko66949872006-07-15 01:52:39 +0000255
Marc Slemkoa6479032007-06-05 22:20:14 +0000256 PRIORITY getPriority() const { return priority_; }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000257
Mark Sleef5f2be42006-09-05 21:05:31 +0000258 /**
259 * Sets priority.
260 *
261 * XXX
262 * Need to handle incremental priorities properly.
263 */
Marc Slemkoa6479032007-06-05 22:20:14 +0000264 void setPriority(PRIORITY value) { priority_ = value; }
265
266 bool isDetached() const { return detached_; }
267
268 void setDetached(bool value) { detached_ = value; }
269
Mark Slee98439152007-08-21 02:39:40 +0000270 Thread::id_t getCurrentThreadId() const {
271 // TODO(dreiss): Stop using C-style casts.
272 return (id_t)pthread_self();
273 }
Marc Slemkoa6479032007-06-05 22:20:14 +0000274
Marc Slemko66949872006-07-15 01:52:39 +0000275};
276
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000277PosixThreadFactory::PosixThreadFactory(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
Mark Slee2f6404d2006-10-10 01:37:40 +0000278 impl_(new PosixThreadFactory::Impl(policy, priority, stackSize, detached)) {}
Marc Slemko66949872006-07-15 01:52:39 +0000279
Mark Slee2f6404d2006-10-10 01:37:40 +0000280shared_ptr<Thread> PosixThreadFactory::newThread(shared_ptr<Runnable> runnable) const { return impl_->newThread(runnable); }
Marc Slemko66949872006-07-15 01:52:39 +0000281
Marc Slemkoa6479032007-06-05 22:20:14 +0000282int PosixThreadFactory::getStackSize() const { return impl_->getStackSize(); }
Marc Slemko66949872006-07-15 01:52:39 +0000283
Marc Slemkoa6479032007-06-05 22:20:14 +0000284void PosixThreadFactory::setStackSize(int value) { impl_->setStackSize(value); }
Marc Slemko66949872006-07-15 01:52:39 +0000285
Marc Slemkoa6479032007-06-05 22:20:14 +0000286PosixThreadFactory::PRIORITY PosixThreadFactory::getPriority() const { return impl_->getPriority(); }
Marc Slemko66949872006-07-15 01:52:39 +0000287
Marc Slemkoa6479032007-06-05 22:20:14 +0000288void PosixThreadFactory::setPriority(PosixThreadFactory::PRIORITY value) { impl_->setPriority(value); }
Marc Slemko66949872006-07-15 01:52:39 +0000289
Marc Slemkoa6479032007-06-05 22:20:14 +0000290bool PosixThreadFactory::isDetached() const { return impl_->isDetached(); }
291
292void PosixThreadFactory::setDetached(bool value) { impl_->setDetached(value); }
293
294Thread::id_t PosixThreadFactory::getCurrentThreadId() const { return impl_->getCurrentThreadId(); }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000295
T Jake Lucianib5e62212009-01-31 22:36:20 +0000296}}} // apache::thrift::concurrency