blob: f6ab6c7297c262bdde90e6aa3a85e76a61bf5783 [file] [log] [blame]
Mark Slee9f0c6512007-02-28 23:58:26 +00001// Copyright (c) 2006- Facebook
2// Distributed under the Thrift Software License
3//
4// See accompanying file LICENSE or visit the Thrift site at:
5// http://developers.facebook.com/thrift/
6
Marc Slemko66949872006-07-15 01:52:39 +00007#include "PosixThreadFactory.h"
Marc Slemko3a3b53b2007-05-22 23:59:54 +00008#include "Exception.h"
Marc Slemko66949872006-07-15 01:52:39 +00009
10#include <assert.h>
11#include <pthread.h>
12
Marc Slemko6f038a72006-08-03 18:58:09 +000013#include <iostream>
14
15#include <boost/weak_ptr.hpp>
16
Marc Slemko66949872006-07-15 01:52:39 +000017namespace facebook { namespace thrift { namespace concurrency {
18
Marc Slemko6f038a72006-08-03 18:58:09 +000019using namespace boost;
20
Mark Sleef5f2be42006-09-05 21:05:31 +000021/**
Marc Slemko3a3b53b2007-05-22 23:59:54 +000022 * The POSIX thread class.
Mark Sleef5f2be42006-09-05 21:05:31 +000023 *
24 * @author marc
25 * @version $Id:$
26 */
Marc Slemko66949872006-07-15 01:52:39 +000027class PthreadThread: public Thread {
Mark Sleef5f2be42006-09-05 21:05:31 +000028 public:
Marc Slemko66949872006-07-15 01:52:39 +000029
Mark Sleef5f2be42006-09-05 21:05:31 +000030 enum STATE {
31 uninitialized,
32 starting,
33 started,
34 stopping,
35 stopped
Marc Slemko66949872006-07-15 01:52:39 +000036 };
37
38 static const int MB = 1024 * 1024;
39
Marc Slemko8a40a762006-07-19 17:46:50 +000040 static void* threadMain(void* arg);
41
Mark Sleef5f2be42006-09-05 21:05:31 +000042 private:
Mark Slee2f6404d2006-10-10 01:37:40 +000043 pthread_t pthread_;
44 STATE state_;
45 int policy_;
46 int priority_;
47 int stackSize_;
48 weak_ptr<PthreadThread> self_;
Marc Slemko67606e52007-06-04 21:01:19 +000049 bool detached_;
Marc Slemko6f038a72006-08-03 18:58:09 +000050
Mark Sleef5f2be42006-09-05 21:05:31 +000051 public:
Marc Slemko3a3b53b2007-05-22 23:59:54 +000052
Marc Slemko67606e52007-06-04 21:01:19 +000053 PthreadThread(int policy, int priority, int stackSize, bool detached, shared_ptr<Runnable> runnable) :
Mark Slee2f6404d2006-10-10 01:37:40 +000054 pthread_(0),
Marc Slemko3a3b53b2007-05-22 23:59:54 +000055 state_(uninitialized),
Mark Slee2f6404d2006-10-10 01:37:40 +000056 policy_(policy),
57 priority_(priority),
Marc Slemko67606e52007-06-04 21:01:19 +000058 stackSize_(stackSize),
59 detached_(detached) {
Marc Slemkofe5ba12e2006-07-20 21:16:27 +000060
61 this->Thread::runnable(runnable);
62 }
Marc Slemko66949872006-07-15 01:52:39 +000063
Marc Slemko67606e52007-06-04 21:01:19 +000064 ~PthreadThread() {
65 /* Nothing references this thread, if is is not detached, do a join
Marc Slemkoa6479032007-06-05 22:20:14 +000066 now, otherwise the thread-id and, possibly, other resources will
Marc Slemko67606e52007-06-04 21:01:19 +000067 be leaked. */
68 if(!detached_) {
69 try {
70 join();
71 } catch(...) {
Marc Slemkoa6479032007-06-05 22:20:14 +000072 // We're really hosed.
Marc Slemko67606e52007-06-04 21:01:19 +000073 }
74 }
75 }
Marc Slemko6f038a72006-08-03 18:58:09 +000076
Marc Slemko66949872006-07-15 01:52:39 +000077 void start() {
Mark Slee2f6404d2006-10-10 01:37:40 +000078 if (state_ != uninitialized) {
Marc Slemko66949872006-07-15 01:52:39 +000079 return;
80 }
81
Marc Slemko66949872006-07-15 01:52:39 +000082 pthread_attr_t thread_attr;
Mark Slee2782d6d2007-05-23 04:55:30 +000083 if (pthread_attr_init(&thread_attr) != 0) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +000084 throw SystemResourceException("pthread_attr_init failed");
85 }
Aditya Agarwal9dc57402007-03-31 17:45:12 +000086
Marc Slemkoa6479032007-06-05 22:20:14 +000087 if(pthread_attr_setdetachstate(&thread_attr,
88 detached_ ?
89 PTHREAD_CREATE_DETACHED :
Marc Slemko67606e52007-06-04 21:01:19 +000090 PTHREAD_CREATE_JOINABLE) != 0) {
91 throw SystemResourceException("pthread_attr_setdetachstate failed");
Marc Slemko3a3b53b2007-05-22 23:59:54 +000092 }
Marc Slemko66949872006-07-15 01:52:39 +000093
94 // Set thread stack size
Mark Slee2782d6d2007-05-23 04:55:30 +000095 if (pthread_attr_setstacksize(&thread_attr, MB * stackSize_) != 0) {
96 throw SystemResourceException("pthread_attr_setstacksize failed");
Marc Slemko3a3b53b2007-05-22 23:59:54 +000097 }
Marc Slemko66949872006-07-15 01:52:39 +000098
99 // Set thread policy
Mark Slee2782d6d2007-05-23 04:55:30 +0000100 if (pthread_attr_setschedpolicy(&thread_attr, policy_) != 0) {
101 throw SystemResourceException("pthread_attr_setschedpolicy failed");
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000102 }
Marc Slemko66949872006-07-15 01:52:39 +0000103
104 struct sched_param sched_param;
Mark Slee2f6404d2006-10-10 01:37:40 +0000105 sched_param.sched_priority = priority_;
Marc Slemko66949872006-07-15 01:52:39 +0000106
107 // Set thread priority
Mark Slee2782d6d2007-05-23 04:55:30 +0000108 if (pthread_attr_setschedparam(&thread_attr, &sched_param) != 0) {
109 throw SystemResourceException("pthread_attr_setschedparam failed");
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000110 }
Marc Slemko66949872006-07-15 01:52:39 +0000111
Mark Sleef5f2be42006-09-05 21:05:31 +0000112 // Create reference
Marc Slemko6f038a72006-08-03 18:58:09 +0000113 shared_ptr<PthreadThread>* selfRef = new shared_ptr<PthreadThread>();
Mark Slee2f6404d2006-10-10 01:37:40 +0000114 *selfRef = self_.lock();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000115
Marc Slemko67606e52007-06-04 21:01:19 +0000116 state_ = starting;
117
Mark Slee2782d6d2007-05-23 04:55:30 +0000118 if (pthread_create(&pthread_, &thread_attr, threadMain, (void*)selfRef) != 0) {
119 throw SystemResourceException("pthread_create failed");
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000120 }
Marc Slemko66949872006-07-15 01:52:39 +0000121 }
122
123 void join() {
Marc Slemko67606e52007-06-04 21:01:19 +0000124 if (!detached_ && state_ != uninitialized) {
Marc Slemko66949872006-07-15 01:52:39 +0000125 void* ignore;
Marc Slemkoa6479032007-06-05 22:20:14 +0000126 /* XXX
127 If join fails it is most likely due to the fact
128 that the last reference was the thread itself and cannot
129 join. This results in leaked threads and will eventually
130 cause the process to run out of thread resources.
131 We're beyond the point of throwing an exception. Not clear how
132 best to handle this. */
133 detached_ = pthread_join(pthread_, &ignore) == 0;
Marc Slemko66949872006-07-15 01:52:39 +0000134 }
135 }
136
Marc Slemkoa6479032007-06-05 22:20:14 +0000137 id_t getId() {
Mark Slee04f0b7b2007-08-21 01:54:36 +0000138 return reinterpret_cast<id_t>(pthread_);
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000139 }
140
Mark Sleef5f2be42006-09-05 21:05:31 +0000141 shared_ptr<Runnable> runnable() const { return Thread::runnable(); }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000142
Mark Sleef5f2be42006-09-05 21:05:31 +0000143 void runnable(shared_ptr<Runnable> value) { Thread::runnable(value); }
Marc Slemko66949872006-07-15 01:52:39 +0000144
Marc Slemko6f038a72006-08-03 18:58:09 +0000145 void weakRef(shared_ptr<PthreadThread> self) {
Aditya Agarwal3f234da2007-04-01 01:19:57 +0000146 assert(self.get() == this);
Mark Slee2f6404d2006-10-10 01:37:40 +0000147 self_ = weak_ptr<PthreadThread>(self);
Marc Slemko6f038a72006-08-03 18:58:09 +0000148 }
Marc Slemko66949872006-07-15 01:52:39 +0000149};
150
Marc Slemko8a40a762006-07-19 17:46:50 +0000151void* PthreadThread::threadMain(void* arg) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000152 shared_ptr<PthreadThread> thread = *(shared_ptr<PthreadThread>*)arg;
Marc Slemko6f038a72006-08-03 18:58:09 +0000153 delete reinterpret_cast<shared_ptr<PthreadThread>*>(arg);
154
Mark Sleef5f2be42006-09-05 21:05:31 +0000155 if (thread == NULL) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000156 return (void*)0;
157 }
158
Mark Slee2f6404d2006-10-10 01:37:40 +0000159 if (thread->state_ != starting) {
Marc Slemko8a40a762006-07-19 17:46:50 +0000160 return (void*)0;
161 }
162
Mark Slee2f6404d2006-10-10 01:37:40 +0000163 thread->state_ = starting;
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000164 thread->runnable()->run();
Mark Slee2f6404d2006-10-10 01:37:40 +0000165 if (thread->state_ != stopping && thread->state_ != stopped) {
166 thread->state_ = stopping;
Marc Slemko8a40a762006-07-19 17:46:50 +0000167 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000168
Marc Slemko8a40a762006-07-19 17:46:50 +0000169 return (void*)0;
170}
171
Mark Sleef5f2be42006-09-05 21:05:31 +0000172/**
173 * POSIX Thread factory implementation
174 */
Marc Slemko66949872006-07-15 01:52:39 +0000175class PosixThreadFactory::Impl {
176
Mark Sleef5f2be42006-09-05 21:05:31 +0000177 private:
Mark Slee2f6404d2006-10-10 01:37:40 +0000178 POLICY policy_;
179 PRIORITY priority_;
180 int stackSize_;
181 bool detached_;
Marc Slemko66949872006-07-15 01:52:39 +0000182
Mark Sleef5f2be42006-09-05 21:05:31 +0000183 /**
184 * Converts generic posix thread schedule policy enums into pthread
185 * API values.
186 */
Marc Slemko66949872006-07-15 01:52:39 +0000187 static int toPthreadPolicy(POLICY policy) {
Mark Sleeb9ff32a2006-11-16 01:00:24 +0000188 switch (policy) {
189 case OTHER:
190 return SCHED_OTHER;
191 case FIFO:
192 return SCHED_FIFO;
193 case ROUND_ROBIN:
194 return SCHED_RR;
Marc Slemko66949872006-07-15 01:52:39 +0000195 }
Mark Sleeb9ff32a2006-11-16 01:00:24 +0000196 return SCHED_OTHER;
Marc Slemko66949872006-07-15 01:52:39 +0000197 }
198
Mark Sleef5f2be42006-09-05 21:05:31 +0000199 /**
200 * Converts relative thread priorities to absolute value based on posix
201 * thread scheduler policy
202 *
203 * The idea is simply to divide up the priority range for the given policy
204 * into the correpsonding relative priority level (lowest..highest) and
205 * then pro-rate accordingly.
206 */
Marc Slemko66949872006-07-15 01:52:39 +0000207 static int toPthreadPriority(POLICY policy, PRIORITY priority) {
Marc Slemko66949872006-07-15 01:52:39 +0000208 int pthread_policy = toPthreadPolicy(policy);
Marc Slemko66949872006-07-15 01:52:39 +0000209 int min_priority = sched_get_priority_min(pthread_policy);
Marc Slemko66949872006-07-15 01:52:39 +0000210 int max_priority = sched_get_priority_max(pthread_policy);
Marc Slemko66949872006-07-15 01:52:39 +0000211 int quanta = (HIGHEST - LOWEST) + 1;
Marc Slemko66949872006-07-15 01:52:39 +0000212 float stepsperquanta = (max_priority - min_priority) / quanta;
213
Mark Slee29050782006-09-29 00:12:30 +0000214 if (priority <= HIGHEST) {
Marc Slemko66949872006-07-15 01:52:39 +0000215 return (int)(min_priority + stepsperquanta * priority);
216 } else {
Marc Slemko66949872006-07-15 01:52:39 +0000217 // should never get here for priority increments.
Marc Slemko66949872006-07-15 01:52:39 +0000218 assert(false);
Marc Slemko66949872006-07-15 01:52:39 +0000219 return (int)(min_priority + stepsperquanta * NORMAL);
220 }
221 }
222
Mark Sleef5f2be42006-09-05 21:05:31 +0000223 public:
Marc Slemko66949872006-07-15 01:52:39 +0000224
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000225 Impl(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
Mark Slee2f6404d2006-10-10 01:37:40 +0000226 policy_(policy),
227 priority_(priority),
228 stackSize_(stackSize),
229 detached_(detached) {}
Marc Slemko66949872006-07-15 01:52:39 +0000230
Mark Sleef5f2be42006-09-05 21:05:31 +0000231 /**
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000232 * Creates a new POSIX thread to run the runnable object
Mark Sleef5f2be42006-09-05 21:05:31 +0000233 *
234 * @param runnable A runnable object
235 */
Marc Slemko6f038a72006-08-03 18:58:09 +0000236 shared_ptr<Thread> newThread(shared_ptr<Runnable> runnable) const {
Marc Slemko67606e52007-06-04 21:01:19 +0000237 shared_ptr<PthreadThread> result = shared_ptr<PthreadThread>(new PthreadThread(toPthreadPolicy(policy_), toPthreadPriority(policy_, priority_), stackSize_, detached_, runnable));
Marc Slemko6f038a72006-08-03 18:58:09 +0000238 result->weakRef(result);
239 runnable->thread(result);
240 return result;
Marc Slemko66949872006-07-15 01:52:39 +0000241 }
242
Marc Slemkoa6479032007-06-05 22:20:14 +0000243 int getStackSize() const { return stackSize_; }
Marc Slemko66949872006-07-15 01:52:39 +0000244
Marc Slemkoa6479032007-06-05 22:20:14 +0000245 void setStackSize(int value) { stackSize_ = value; }
Marc Slemko66949872006-07-15 01:52:39 +0000246
Marc Slemkoa6479032007-06-05 22:20:14 +0000247 PRIORITY getPriority() const { return priority_; }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000248
Mark Sleef5f2be42006-09-05 21:05:31 +0000249 /**
250 * Sets priority.
251 *
252 * XXX
253 * Need to handle incremental priorities properly.
254 */
Marc Slemkoa6479032007-06-05 22:20:14 +0000255 void setPriority(PRIORITY value) { priority_ = value; }
256
257 bool isDetached() const { return detached_; }
258
259 void setDetached(bool value) { detached_ = value; }
260
Mark Slee04f0b7b2007-08-21 01:54:36 +0000261 Thread::id_t getCurrentThreadId() const {return reinterpret_cast<id_t>(pthread_self());}
Marc Slemkoa6479032007-06-05 22:20:14 +0000262
Marc Slemko66949872006-07-15 01:52:39 +0000263};
264
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000265PosixThreadFactory::PosixThreadFactory(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
Mark Slee2f6404d2006-10-10 01:37:40 +0000266 impl_(new PosixThreadFactory::Impl(policy, priority, stackSize, detached)) {}
Marc Slemko66949872006-07-15 01:52:39 +0000267
Mark Slee2f6404d2006-10-10 01:37:40 +0000268shared_ptr<Thread> PosixThreadFactory::newThread(shared_ptr<Runnable> runnable) const { return impl_->newThread(runnable); }
Marc Slemko66949872006-07-15 01:52:39 +0000269
Marc Slemkoa6479032007-06-05 22:20:14 +0000270int PosixThreadFactory::getStackSize() const { return impl_->getStackSize(); }
Marc Slemko66949872006-07-15 01:52:39 +0000271
Marc Slemkoa6479032007-06-05 22:20:14 +0000272void PosixThreadFactory::setStackSize(int value) { impl_->setStackSize(value); }
Marc Slemko66949872006-07-15 01:52:39 +0000273
Marc Slemkoa6479032007-06-05 22:20:14 +0000274PosixThreadFactory::PRIORITY PosixThreadFactory::getPriority() const { return impl_->getPriority(); }
Marc Slemko66949872006-07-15 01:52:39 +0000275
Marc Slemkoa6479032007-06-05 22:20:14 +0000276void PosixThreadFactory::setPriority(PosixThreadFactory::PRIORITY value) { impl_->setPriority(value); }
Marc Slemko66949872006-07-15 01:52:39 +0000277
Marc Slemkoa6479032007-06-05 22:20:14 +0000278bool PosixThreadFactory::isDetached() const { return impl_->isDetached(); }
279
280void PosixThreadFactory::setDetached(bool value) { impl_->setDetached(value); }
281
282Thread::id_t PosixThreadFactory::getCurrentThreadId() const { return impl_->getCurrentThreadId(); }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000283
Marc Slemko66949872006-07-15 01:52:39 +0000284}}} // facebook::thrift::concurrency