blob: 59d7cfa7ac59285013222f23fefa298d4ecedb36 [file] [log] [blame]
Mark Slee9f0c6512007-02-28 23:58:26 +00001// Copyright (c) 2006- Facebook
2// Distributed under the Thrift Software License
3//
4// See accompanying file LICENSE or visit the Thrift site at:
5// http://developers.facebook.com/thrift/
6
Marc Slemko66949872006-07-15 01:52:39 +00007#include "PosixThreadFactory.h"
Marc Slemko3a3b53b2007-05-22 23:59:54 +00008#include "Exception.h"
Marc Slemko66949872006-07-15 01:52:39 +00009
David Reissaf296952008-06-10 22:54:40 +000010#if GOOGLE_PERFTOOLS_REGISTER_THREAD
11# include <google/profiler.h>
12#endif
13
Marc Slemko66949872006-07-15 01:52:39 +000014#include <assert.h>
15#include <pthread.h>
16
Marc Slemko6f038a72006-08-03 18:58:09 +000017#include <iostream>
18
19#include <boost/weak_ptr.hpp>
20
T Jake Lucianib5e62212009-01-31 22:36:20 +000021namespace apache { namespace thrift { namespace concurrency {
Marc Slemko66949872006-07-15 01:52:39 +000022
David Reissd4a269c2007-08-23 02:37:19 +000023using boost::shared_ptr;
24using boost::weak_ptr;
Marc Slemko6f038a72006-08-03 18:58:09 +000025
Mark Sleef5f2be42006-09-05 21:05:31 +000026/**
Marc Slemko3a3b53b2007-05-22 23:59:54 +000027 * The POSIX thread class.
Mark Sleef5f2be42006-09-05 21:05:31 +000028 *
Mark Sleef5f2be42006-09-05 21:05:31 +000029 * @version $Id:$
30 */
Marc Slemko66949872006-07-15 01:52:39 +000031class PthreadThread: public Thread {
Mark Sleef5f2be42006-09-05 21:05:31 +000032 public:
Marc Slemko66949872006-07-15 01:52:39 +000033
Mark Sleef5f2be42006-09-05 21:05:31 +000034 enum STATE {
35 uninitialized,
36 starting,
37 started,
38 stopping,
39 stopped
Marc Slemko66949872006-07-15 01:52:39 +000040 };
41
42 static const int MB = 1024 * 1024;
43
Marc Slemko8a40a762006-07-19 17:46:50 +000044 static void* threadMain(void* arg);
45
Mark Sleef5f2be42006-09-05 21:05:31 +000046 private:
Mark Slee2f6404d2006-10-10 01:37:40 +000047 pthread_t pthread_;
48 STATE state_;
49 int policy_;
50 int priority_;
51 int stackSize_;
52 weak_ptr<PthreadThread> self_;
Marc Slemko67606e52007-06-04 21:01:19 +000053 bool detached_;
Marc Slemko6f038a72006-08-03 18:58:09 +000054
Mark Sleef5f2be42006-09-05 21:05:31 +000055 public:
Marc Slemko3a3b53b2007-05-22 23:59:54 +000056
Marc Slemko67606e52007-06-04 21:01:19 +000057 PthreadThread(int policy, int priority, int stackSize, bool detached, shared_ptr<Runnable> runnable) :
Mark Slee2f6404d2006-10-10 01:37:40 +000058 pthread_(0),
Marc Slemko3a3b53b2007-05-22 23:59:54 +000059 state_(uninitialized),
Mark Slee2f6404d2006-10-10 01:37:40 +000060 policy_(policy),
61 priority_(priority),
Marc Slemko67606e52007-06-04 21:01:19 +000062 stackSize_(stackSize),
63 detached_(detached) {
Marc Slemkofe5ba12e2006-07-20 21:16:27 +000064
65 this->Thread::runnable(runnable);
66 }
Marc Slemko66949872006-07-15 01:52:39 +000067
Marc Slemko67606e52007-06-04 21:01:19 +000068 ~PthreadThread() {
69 /* Nothing references this thread, if is is not detached, do a join
Marc Slemkoa6479032007-06-05 22:20:14 +000070 now, otherwise the thread-id and, possibly, other resources will
Marc Slemko67606e52007-06-04 21:01:19 +000071 be leaked. */
72 if(!detached_) {
73 try {
74 join();
75 } catch(...) {
Marc Slemkoa6479032007-06-05 22:20:14 +000076 // We're really hosed.
Marc Slemko67606e52007-06-04 21:01:19 +000077 }
78 }
79 }
Marc Slemko6f038a72006-08-03 18:58:09 +000080
Marc Slemko66949872006-07-15 01:52:39 +000081 void start() {
Mark Slee2f6404d2006-10-10 01:37:40 +000082 if (state_ != uninitialized) {
Marc Slemko66949872006-07-15 01:52:39 +000083 return;
84 }
85
Marc Slemko66949872006-07-15 01:52:39 +000086 pthread_attr_t thread_attr;
Mark Slee2782d6d2007-05-23 04:55:30 +000087 if (pthread_attr_init(&thread_attr) != 0) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +000088 throw SystemResourceException("pthread_attr_init failed");
89 }
Aditya Agarwal9dc57402007-03-31 17:45:12 +000090
Marc Slemkoa6479032007-06-05 22:20:14 +000091 if(pthread_attr_setdetachstate(&thread_attr,
92 detached_ ?
93 PTHREAD_CREATE_DETACHED :
Marc Slemko67606e52007-06-04 21:01:19 +000094 PTHREAD_CREATE_JOINABLE) != 0) {
95 throw SystemResourceException("pthread_attr_setdetachstate failed");
Marc Slemko3a3b53b2007-05-22 23:59:54 +000096 }
Marc Slemko66949872006-07-15 01:52:39 +000097
98 // Set thread stack size
Mark Slee2782d6d2007-05-23 04:55:30 +000099 if (pthread_attr_setstacksize(&thread_attr, MB * stackSize_) != 0) {
100 throw SystemResourceException("pthread_attr_setstacksize failed");
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000101 }
Marc Slemko66949872006-07-15 01:52:39 +0000102
103 // Set thread policy
Mark Slee2782d6d2007-05-23 04:55:30 +0000104 if (pthread_attr_setschedpolicy(&thread_attr, policy_) != 0) {
105 throw SystemResourceException("pthread_attr_setschedpolicy failed");
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000106 }
Marc Slemko66949872006-07-15 01:52:39 +0000107
108 struct sched_param sched_param;
Mark Slee2f6404d2006-10-10 01:37:40 +0000109 sched_param.sched_priority = priority_;
Marc Slemko66949872006-07-15 01:52:39 +0000110
111 // Set thread priority
Mark Slee2782d6d2007-05-23 04:55:30 +0000112 if (pthread_attr_setschedparam(&thread_attr, &sched_param) != 0) {
113 throw SystemResourceException("pthread_attr_setschedparam failed");
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000114 }
Marc Slemko66949872006-07-15 01:52:39 +0000115
Mark Sleef5f2be42006-09-05 21:05:31 +0000116 // Create reference
Marc Slemko6f038a72006-08-03 18:58:09 +0000117 shared_ptr<PthreadThread>* selfRef = new shared_ptr<PthreadThread>();
Mark Slee2f6404d2006-10-10 01:37:40 +0000118 *selfRef = self_.lock();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000119
Marc Slemko67606e52007-06-04 21:01:19 +0000120 state_ = starting;
121
Mark Slee2782d6d2007-05-23 04:55:30 +0000122 if (pthread_create(&pthread_, &thread_attr, threadMain, (void*)selfRef) != 0) {
123 throw SystemResourceException("pthread_create failed");
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000124 }
Marc Slemko66949872006-07-15 01:52:39 +0000125 }
126
127 void join() {
Marc Slemko67606e52007-06-04 21:01:19 +0000128 if (!detached_ && state_ != uninitialized) {
Marc Slemko66949872006-07-15 01:52:39 +0000129 void* ignore;
Marc Slemkoa6479032007-06-05 22:20:14 +0000130 /* XXX
131 If join fails it is most likely due to the fact
132 that the last reference was the thread itself and cannot
133 join. This results in leaked threads and will eventually
134 cause the process to run out of thread resources.
135 We're beyond the point of throwing an exception. Not clear how
136 best to handle this. */
137 detached_ = pthread_join(pthread_, &ignore) == 0;
Marc Slemko66949872006-07-15 01:52:39 +0000138 }
139 }
140
David Reissfbb14ef2008-12-02 02:32:25 +0000141 Thread::id_t getId() {
142 return (Thread::id_t)pthread_;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000143 }
144
Mark Sleef5f2be42006-09-05 21:05:31 +0000145 shared_ptr<Runnable> runnable() const { return Thread::runnable(); }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000146
Mark Sleef5f2be42006-09-05 21:05:31 +0000147 void runnable(shared_ptr<Runnable> value) { Thread::runnable(value); }
Marc Slemko66949872006-07-15 01:52:39 +0000148
Marc Slemko6f038a72006-08-03 18:58:09 +0000149 void weakRef(shared_ptr<PthreadThread> self) {
Aditya Agarwal3f234da2007-04-01 01:19:57 +0000150 assert(self.get() == this);
Mark Slee2f6404d2006-10-10 01:37:40 +0000151 self_ = weak_ptr<PthreadThread>(self);
Marc Slemko6f038a72006-08-03 18:58:09 +0000152 }
Marc Slemko66949872006-07-15 01:52:39 +0000153};
154
Marc Slemko8a40a762006-07-19 17:46:50 +0000155void* PthreadThread::threadMain(void* arg) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000156 shared_ptr<PthreadThread> thread = *(shared_ptr<PthreadThread>*)arg;
Marc Slemko6f038a72006-08-03 18:58:09 +0000157 delete reinterpret_cast<shared_ptr<PthreadThread>*>(arg);
158
Mark Sleef5f2be42006-09-05 21:05:31 +0000159 if (thread == NULL) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000160 return (void*)0;
161 }
162
Mark Slee2f6404d2006-10-10 01:37:40 +0000163 if (thread->state_ != starting) {
Marc Slemko8a40a762006-07-19 17:46:50 +0000164 return (void*)0;
165 }
166
David Reissaf296952008-06-10 22:54:40 +0000167#if GOOGLE_PERFTOOLS_REGISTER_THREAD
168 ProfilerRegisterThread();
169#endif
170
Mark Slee2f6404d2006-10-10 01:37:40 +0000171 thread->state_ = starting;
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000172 thread->runnable()->run();
Mark Slee2f6404d2006-10-10 01:37:40 +0000173 if (thread->state_ != stopping && thread->state_ != stopped) {
174 thread->state_ = stopping;
Marc Slemko8a40a762006-07-19 17:46:50 +0000175 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000176
Marc Slemko8a40a762006-07-19 17:46:50 +0000177 return (void*)0;
178}
179
Mark Sleef5f2be42006-09-05 21:05:31 +0000180/**
181 * POSIX Thread factory implementation
182 */
Marc Slemko66949872006-07-15 01:52:39 +0000183class PosixThreadFactory::Impl {
184
Mark Sleef5f2be42006-09-05 21:05:31 +0000185 private:
Mark Slee2f6404d2006-10-10 01:37:40 +0000186 POLICY policy_;
187 PRIORITY priority_;
188 int stackSize_;
189 bool detached_;
Marc Slemko66949872006-07-15 01:52:39 +0000190
Mark Sleef5f2be42006-09-05 21:05:31 +0000191 /**
192 * Converts generic posix thread schedule policy enums into pthread
193 * API values.
194 */
Marc Slemko66949872006-07-15 01:52:39 +0000195 static int toPthreadPolicy(POLICY policy) {
Mark Sleeb9ff32a2006-11-16 01:00:24 +0000196 switch (policy) {
197 case OTHER:
198 return SCHED_OTHER;
199 case FIFO:
200 return SCHED_FIFO;
201 case ROUND_ROBIN:
202 return SCHED_RR;
Marc Slemko66949872006-07-15 01:52:39 +0000203 }
Mark Sleeb9ff32a2006-11-16 01:00:24 +0000204 return SCHED_OTHER;
Marc Slemko66949872006-07-15 01:52:39 +0000205 }
206
Mark Sleef5f2be42006-09-05 21:05:31 +0000207 /**
208 * Converts relative thread priorities to absolute value based on posix
209 * thread scheduler policy
210 *
211 * The idea is simply to divide up the priority range for the given policy
212 * into the correpsonding relative priority level (lowest..highest) and
213 * then pro-rate accordingly.
214 */
Marc Slemko66949872006-07-15 01:52:39 +0000215 static int toPthreadPriority(POLICY policy, PRIORITY priority) {
Marc Slemko66949872006-07-15 01:52:39 +0000216 int pthread_policy = toPthreadPolicy(policy);
Marc Slemko66949872006-07-15 01:52:39 +0000217 int min_priority = sched_get_priority_min(pthread_policy);
Marc Slemko66949872006-07-15 01:52:39 +0000218 int max_priority = sched_get_priority_max(pthread_policy);
Marc Slemko66949872006-07-15 01:52:39 +0000219 int quanta = (HIGHEST - LOWEST) + 1;
Marc Slemko66949872006-07-15 01:52:39 +0000220 float stepsperquanta = (max_priority - min_priority) / quanta;
221
Mark Slee29050782006-09-29 00:12:30 +0000222 if (priority <= HIGHEST) {
Marc Slemko66949872006-07-15 01:52:39 +0000223 return (int)(min_priority + stepsperquanta * priority);
224 } else {
Marc Slemko66949872006-07-15 01:52:39 +0000225 // should never get here for priority increments.
Marc Slemko66949872006-07-15 01:52:39 +0000226 assert(false);
Marc Slemko66949872006-07-15 01:52:39 +0000227 return (int)(min_priority + stepsperquanta * NORMAL);
228 }
229 }
230
Mark Sleef5f2be42006-09-05 21:05:31 +0000231 public:
Marc Slemko66949872006-07-15 01:52:39 +0000232
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000233 Impl(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
Mark Slee2f6404d2006-10-10 01:37:40 +0000234 policy_(policy),
235 priority_(priority),
236 stackSize_(stackSize),
237 detached_(detached) {}
Marc Slemko66949872006-07-15 01:52:39 +0000238
Mark Sleef5f2be42006-09-05 21:05:31 +0000239 /**
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000240 * Creates a new POSIX thread to run the runnable object
Mark Sleef5f2be42006-09-05 21:05:31 +0000241 *
242 * @param runnable A runnable object
243 */
Marc Slemko6f038a72006-08-03 18:58:09 +0000244 shared_ptr<Thread> newThread(shared_ptr<Runnable> runnable) const {
Marc Slemko67606e52007-06-04 21:01:19 +0000245 shared_ptr<PthreadThread> result = shared_ptr<PthreadThread>(new PthreadThread(toPthreadPolicy(policy_), toPthreadPriority(policy_, priority_), stackSize_, detached_, runnable));
Marc Slemko6f038a72006-08-03 18:58:09 +0000246 result->weakRef(result);
247 runnable->thread(result);
248 return result;
Marc Slemko66949872006-07-15 01:52:39 +0000249 }
250
Marc Slemkoa6479032007-06-05 22:20:14 +0000251 int getStackSize() const { return stackSize_; }
Marc Slemko66949872006-07-15 01:52:39 +0000252
Marc Slemkoa6479032007-06-05 22:20:14 +0000253 void setStackSize(int value) { stackSize_ = value; }
Marc Slemko66949872006-07-15 01:52:39 +0000254
Marc Slemkoa6479032007-06-05 22:20:14 +0000255 PRIORITY getPriority() const { return priority_; }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000256
Mark Sleef5f2be42006-09-05 21:05:31 +0000257 /**
258 * Sets priority.
259 *
260 * XXX
261 * Need to handle incremental priorities properly.
262 */
Marc Slemkoa6479032007-06-05 22:20:14 +0000263 void setPriority(PRIORITY value) { priority_ = value; }
264
265 bool isDetached() const { return detached_; }
266
267 void setDetached(bool value) { detached_ = value; }
268
Mark Slee98439152007-08-21 02:39:40 +0000269 Thread::id_t getCurrentThreadId() const {
270 // TODO(dreiss): Stop using C-style casts.
271 return (id_t)pthread_self();
272 }
Marc Slemkoa6479032007-06-05 22:20:14 +0000273
Marc Slemko66949872006-07-15 01:52:39 +0000274};
275
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000276PosixThreadFactory::PosixThreadFactory(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
Mark Slee2f6404d2006-10-10 01:37:40 +0000277 impl_(new PosixThreadFactory::Impl(policy, priority, stackSize, detached)) {}
Marc Slemko66949872006-07-15 01:52:39 +0000278
Mark Slee2f6404d2006-10-10 01:37:40 +0000279shared_ptr<Thread> PosixThreadFactory::newThread(shared_ptr<Runnable> runnable) const { return impl_->newThread(runnable); }
Marc Slemko66949872006-07-15 01:52:39 +0000280
Marc Slemkoa6479032007-06-05 22:20:14 +0000281int PosixThreadFactory::getStackSize() const { return impl_->getStackSize(); }
Marc Slemko66949872006-07-15 01:52:39 +0000282
Marc Slemkoa6479032007-06-05 22:20:14 +0000283void PosixThreadFactory::setStackSize(int value) { impl_->setStackSize(value); }
Marc Slemko66949872006-07-15 01:52:39 +0000284
Marc Slemkoa6479032007-06-05 22:20:14 +0000285PosixThreadFactory::PRIORITY PosixThreadFactory::getPriority() const { return impl_->getPriority(); }
Marc Slemko66949872006-07-15 01:52:39 +0000286
Marc Slemkoa6479032007-06-05 22:20:14 +0000287void PosixThreadFactory::setPriority(PosixThreadFactory::PRIORITY value) { impl_->setPriority(value); }
Marc Slemko66949872006-07-15 01:52:39 +0000288
Marc Slemkoa6479032007-06-05 22:20:14 +0000289bool PosixThreadFactory::isDetached() const { return impl_->isDetached(); }
290
291void PosixThreadFactory::setDetached(bool value) { impl_->setDetached(value); }
292
293Thread::id_t PosixThreadFactory::getCurrentThreadId() const { return impl_->getCurrentThreadId(); }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000294
T Jake Lucianib5e62212009-01-31 22:36:20 +0000295}}} // apache::thrift::concurrency