blob: 3f356811d9ba3f368e3472b22d83e9b554054d6a [file] [log] [blame]
Mark Slee9f0c6512007-02-28 23:58:26 +00001// Copyright (c) 2006- Facebook
2// Distributed under the Thrift Software License
3//
4// See accompanying file LICENSE or visit the Thrift site at:
5// http://developers.facebook.com/thrift/
6
Marc Slemko66949872006-07-15 01:52:39 +00007#include "PosixThreadFactory.h"
Marc Slemko3a3b53b2007-05-22 23:59:54 +00008#include "Exception.h"
Marc Slemko66949872006-07-15 01:52:39 +00009
10#include <assert.h>
11#include <pthread.h>
12
Marc Slemko6f038a72006-08-03 18:58:09 +000013#include <iostream>
14
15#include <boost/weak_ptr.hpp>
16
Marc Slemko66949872006-07-15 01:52:39 +000017namespace facebook { namespace thrift { namespace concurrency {
18
Marc Slemko6f038a72006-08-03 18:58:09 +000019using namespace boost;
20
Mark Sleef5f2be42006-09-05 21:05:31 +000021/**
Marc Slemko3a3b53b2007-05-22 23:59:54 +000022 * The POSIX thread class.
Mark Sleef5f2be42006-09-05 21:05:31 +000023 *
24 * @author marc
25 * @version $Id:$
26 */
Marc Slemko66949872006-07-15 01:52:39 +000027class PthreadThread: public Thread {
Mark Sleef5f2be42006-09-05 21:05:31 +000028 public:
Marc Slemko66949872006-07-15 01:52:39 +000029
Mark Sleef5f2be42006-09-05 21:05:31 +000030 enum STATE {
31 uninitialized,
32 starting,
33 started,
34 stopping,
35 stopped
Marc Slemko66949872006-07-15 01:52:39 +000036 };
37
38 static const int MB = 1024 * 1024;
39
Marc Slemko8a40a762006-07-19 17:46:50 +000040 static void* threadMain(void* arg);
41
Mark Sleef5f2be42006-09-05 21:05:31 +000042 private:
Mark Slee2f6404d2006-10-10 01:37:40 +000043 pthread_t pthread_;
44 STATE state_;
45 int policy_;
46 int priority_;
47 int stackSize_;
48 weak_ptr<PthreadThread> self_;
Marc Slemko67606e52007-06-04 21:01:19 +000049 bool detached_;
Marc Slemko6f038a72006-08-03 18:58:09 +000050
Mark Sleef5f2be42006-09-05 21:05:31 +000051 public:
Marc Slemko3a3b53b2007-05-22 23:59:54 +000052
Marc Slemko67606e52007-06-04 21:01:19 +000053 PthreadThread(int policy, int priority, int stackSize, bool detached, shared_ptr<Runnable> runnable) :
Mark Slee2f6404d2006-10-10 01:37:40 +000054 pthread_(0),
Marc Slemko3a3b53b2007-05-22 23:59:54 +000055 state_(uninitialized),
Mark Slee2f6404d2006-10-10 01:37:40 +000056 policy_(policy),
57 priority_(priority),
Marc Slemko67606e52007-06-04 21:01:19 +000058 stackSize_(stackSize),
59 detached_(detached) {
Marc Slemkofe5ba12e2006-07-20 21:16:27 +000060
61 this->Thread::runnable(runnable);
62 }
Marc Slemko66949872006-07-15 01:52:39 +000063
Marc Slemko67606e52007-06-04 21:01:19 +000064 ~PthreadThread() {
65 /* Nothing references this thread, if is is not detached, do a join
Marc Slemkoa6479032007-06-05 22:20:14 +000066 now, otherwise the thread-id and, possibly, other resources will
Marc Slemko67606e52007-06-04 21:01:19 +000067 be leaked. */
68 if(!detached_) {
69 try {
70 join();
71 } catch(...) {
Marc Slemkoa6479032007-06-05 22:20:14 +000072 // We're really hosed.
Marc Slemko67606e52007-06-04 21:01:19 +000073 }
74 }
75 }
Marc Slemko6f038a72006-08-03 18:58:09 +000076
Marc Slemko66949872006-07-15 01:52:39 +000077 void start() {
Mark Slee2f6404d2006-10-10 01:37:40 +000078 if (state_ != uninitialized) {
Marc Slemko66949872006-07-15 01:52:39 +000079 return;
80 }
81
Marc Slemko66949872006-07-15 01:52:39 +000082 pthread_attr_t thread_attr;
Mark Slee2782d6d2007-05-23 04:55:30 +000083 if (pthread_attr_init(&thread_attr) != 0) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +000084 throw SystemResourceException("pthread_attr_init failed");
85 }
Aditya Agarwal9dc57402007-03-31 17:45:12 +000086
Marc Slemkoa6479032007-06-05 22:20:14 +000087 if(pthread_attr_setdetachstate(&thread_attr,
88 detached_ ?
89 PTHREAD_CREATE_DETACHED :
Marc Slemko67606e52007-06-04 21:01:19 +000090 PTHREAD_CREATE_JOINABLE) != 0) {
91 throw SystemResourceException("pthread_attr_setdetachstate failed");
Marc Slemko3a3b53b2007-05-22 23:59:54 +000092 }
Marc Slemko66949872006-07-15 01:52:39 +000093
94 // Set thread stack size
Mark Slee2782d6d2007-05-23 04:55:30 +000095 if (pthread_attr_setstacksize(&thread_attr, MB * stackSize_) != 0) {
96 throw SystemResourceException("pthread_attr_setstacksize failed");
Marc Slemko3a3b53b2007-05-22 23:59:54 +000097 }
Marc Slemko66949872006-07-15 01:52:39 +000098
99 // Set thread policy
Mark Slee2782d6d2007-05-23 04:55:30 +0000100 if (pthread_attr_setschedpolicy(&thread_attr, policy_) != 0) {
101 throw SystemResourceException("pthread_attr_setschedpolicy failed");
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000102 }
Marc Slemko66949872006-07-15 01:52:39 +0000103
104 struct sched_param sched_param;
Mark Slee2f6404d2006-10-10 01:37:40 +0000105 sched_param.sched_priority = priority_;
Marc Slemko66949872006-07-15 01:52:39 +0000106
107 // Set thread priority
Mark Slee2782d6d2007-05-23 04:55:30 +0000108 if (pthread_attr_setschedparam(&thread_attr, &sched_param) != 0) {
109 throw SystemResourceException("pthread_attr_setschedparam failed");
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000110 }
Marc Slemko66949872006-07-15 01:52:39 +0000111
Mark Sleef5f2be42006-09-05 21:05:31 +0000112 // Create reference
Marc Slemko6f038a72006-08-03 18:58:09 +0000113 shared_ptr<PthreadThread>* selfRef = new shared_ptr<PthreadThread>();
Mark Slee2f6404d2006-10-10 01:37:40 +0000114 *selfRef = self_.lock();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000115
Marc Slemko67606e52007-06-04 21:01:19 +0000116 state_ = starting;
117
Mark Slee2782d6d2007-05-23 04:55:30 +0000118 if (pthread_create(&pthread_, &thread_attr, threadMain, (void*)selfRef) != 0) {
119 throw SystemResourceException("pthread_create failed");
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000120 }
Marc Slemko66949872006-07-15 01:52:39 +0000121 }
122
123 void join() {
Marc Slemko67606e52007-06-04 21:01:19 +0000124 if (!detached_ && state_ != uninitialized) {
Marc Slemko66949872006-07-15 01:52:39 +0000125 void* ignore;
Marc Slemkoa6479032007-06-05 22:20:14 +0000126 /* XXX
127 If join fails it is most likely due to the fact
128 that the last reference was the thread itself and cannot
129 join. This results in leaked threads and will eventually
130 cause the process to run out of thread resources.
131 We're beyond the point of throwing an exception. Not clear how
132 best to handle this. */
133 detached_ = pthread_join(pthread_, &ignore) == 0;
Marc Slemko66949872006-07-15 01:52:39 +0000134 }
135 }
136
Marc Slemkoa6479032007-06-05 22:20:14 +0000137 id_t getId() {
Mark Slee98439152007-08-21 02:39:40 +0000138 // TODO(dreiss): Stop using C-style casts.
139 return (id_t)pthread_;
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000140 }
141
Mark Sleef5f2be42006-09-05 21:05:31 +0000142 shared_ptr<Runnable> runnable() const { return Thread::runnable(); }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000143
Mark Sleef5f2be42006-09-05 21:05:31 +0000144 void runnable(shared_ptr<Runnable> value) { Thread::runnable(value); }
Marc Slemko66949872006-07-15 01:52:39 +0000145
Marc Slemko6f038a72006-08-03 18:58:09 +0000146 void weakRef(shared_ptr<PthreadThread> self) {
Aditya Agarwal3f234da2007-04-01 01:19:57 +0000147 assert(self.get() == this);
Mark Slee2f6404d2006-10-10 01:37:40 +0000148 self_ = weak_ptr<PthreadThread>(self);
Marc Slemko6f038a72006-08-03 18:58:09 +0000149 }
Marc Slemko66949872006-07-15 01:52:39 +0000150};
151
Marc Slemko8a40a762006-07-19 17:46:50 +0000152void* PthreadThread::threadMain(void* arg) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000153 shared_ptr<PthreadThread> thread = *(shared_ptr<PthreadThread>*)arg;
Marc Slemko6f038a72006-08-03 18:58:09 +0000154 delete reinterpret_cast<shared_ptr<PthreadThread>*>(arg);
155
Mark Sleef5f2be42006-09-05 21:05:31 +0000156 if (thread == NULL) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000157 return (void*)0;
158 }
159
Mark Slee2f6404d2006-10-10 01:37:40 +0000160 if (thread->state_ != starting) {
Marc Slemko8a40a762006-07-19 17:46:50 +0000161 return (void*)0;
162 }
163
Mark Slee2f6404d2006-10-10 01:37:40 +0000164 thread->state_ = starting;
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000165 thread->runnable()->run();
Mark Slee2f6404d2006-10-10 01:37:40 +0000166 if (thread->state_ != stopping && thread->state_ != stopped) {
167 thread->state_ = stopping;
Marc Slemko8a40a762006-07-19 17:46:50 +0000168 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000169
Marc Slemko8a40a762006-07-19 17:46:50 +0000170 return (void*)0;
171}
172
Mark Sleef5f2be42006-09-05 21:05:31 +0000173/**
174 * POSIX Thread factory implementation
175 */
Marc Slemko66949872006-07-15 01:52:39 +0000176class PosixThreadFactory::Impl {
177
Mark Sleef5f2be42006-09-05 21:05:31 +0000178 private:
Mark Slee2f6404d2006-10-10 01:37:40 +0000179 POLICY policy_;
180 PRIORITY priority_;
181 int stackSize_;
182 bool detached_;
Marc Slemko66949872006-07-15 01:52:39 +0000183
Mark Sleef5f2be42006-09-05 21:05:31 +0000184 /**
185 * Converts generic posix thread schedule policy enums into pthread
186 * API values.
187 */
Marc Slemko66949872006-07-15 01:52:39 +0000188 static int toPthreadPolicy(POLICY policy) {
Mark Sleeb9ff32a2006-11-16 01:00:24 +0000189 switch (policy) {
190 case OTHER:
191 return SCHED_OTHER;
192 case FIFO:
193 return SCHED_FIFO;
194 case ROUND_ROBIN:
195 return SCHED_RR;
Marc Slemko66949872006-07-15 01:52:39 +0000196 }
Mark Sleeb9ff32a2006-11-16 01:00:24 +0000197 return SCHED_OTHER;
Marc Slemko66949872006-07-15 01:52:39 +0000198 }
199
Mark Sleef5f2be42006-09-05 21:05:31 +0000200 /**
201 * Converts relative thread priorities to absolute value based on posix
202 * thread scheduler policy
203 *
204 * The idea is simply to divide up the priority range for the given policy
205 * into the correpsonding relative priority level (lowest..highest) and
206 * then pro-rate accordingly.
207 */
Marc Slemko66949872006-07-15 01:52:39 +0000208 static int toPthreadPriority(POLICY policy, PRIORITY priority) {
Marc Slemko66949872006-07-15 01:52:39 +0000209 int pthread_policy = toPthreadPolicy(policy);
Marc Slemko66949872006-07-15 01:52:39 +0000210 int min_priority = sched_get_priority_min(pthread_policy);
Marc Slemko66949872006-07-15 01:52:39 +0000211 int max_priority = sched_get_priority_max(pthread_policy);
Marc Slemko66949872006-07-15 01:52:39 +0000212 int quanta = (HIGHEST - LOWEST) + 1;
Marc Slemko66949872006-07-15 01:52:39 +0000213 float stepsperquanta = (max_priority - min_priority) / quanta;
214
Mark Slee29050782006-09-29 00:12:30 +0000215 if (priority <= HIGHEST) {
Marc Slemko66949872006-07-15 01:52:39 +0000216 return (int)(min_priority + stepsperquanta * priority);
217 } else {
Marc Slemko66949872006-07-15 01:52:39 +0000218 // should never get here for priority increments.
Marc Slemko66949872006-07-15 01:52:39 +0000219 assert(false);
Marc Slemko66949872006-07-15 01:52:39 +0000220 return (int)(min_priority + stepsperquanta * NORMAL);
221 }
222 }
223
Mark Sleef5f2be42006-09-05 21:05:31 +0000224 public:
Marc Slemko66949872006-07-15 01:52:39 +0000225
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000226 Impl(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
Mark Slee2f6404d2006-10-10 01:37:40 +0000227 policy_(policy),
228 priority_(priority),
229 stackSize_(stackSize),
230 detached_(detached) {}
Marc Slemko66949872006-07-15 01:52:39 +0000231
Mark Sleef5f2be42006-09-05 21:05:31 +0000232 /**
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000233 * Creates a new POSIX thread to run the runnable object
Mark Sleef5f2be42006-09-05 21:05:31 +0000234 *
235 * @param runnable A runnable object
236 */
Marc Slemko6f038a72006-08-03 18:58:09 +0000237 shared_ptr<Thread> newThread(shared_ptr<Runnable> runnable) const {
Marc Slemko67606e52007-06-04 21:01:19 +0000238 shared_ptr<PthreadThread> result = shared_ptr<PthreadThread>(new PthreadThread(toPthreadPolicy(policy_), toPthreadPriority(policy_, priority_), stackSize_, detached_, runnable));
Marc Slemko6f038a72006-08-03 18:58:09 +0000239 result->weakRef(result);
240 runnable->thread(result);
241 return result;
Marc Slemko66949872006-07-15 01:52:39 +0000242 }
243
Marc Slemkoa6479032007-06-05 22:20:14 +0000244 int getStackSize() const { return stackSize_; }
Marc Slemko66949872006-07-15 01:52:39 +0000245
Marc Slemkoa6479032007-06-05 22:20:14 +0000246 void setStackSize(int value) { stackSize_ = value; }
Marc Slemko66949872006-07-15 01:52:39 +0000247
Marc Slemkoa6479032007-06-05 22:20:14 +0000248 PRIORITY getPriority() const { return priority_; }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000249
Mark Sleef5f2be42006-09-05 21:05:31 +0000250 /**
251 * Sets priority.
252 *
253 * XXX
254 * Need to handle incremental priorities properly.
255 */
Marc Slemkoa6479032007-06-05 22:20:14 +0000256 void setPriority(PRIORITY value) { priority_ = value; }
257
258 bool isDetached() const { return detached_; }
259
260 void setDetached(bool value) { detached_ = value; }
261
Mark Slee98439152007-08-21 02:39:40 +0000262 Thread::id_t getCurrentThreadId() const {
263 // TODO(dreiss): Stop using C-style casts.
264 return (id_t)pthread_self();
265 }
Marc Slemkoa6479032007-06-05 22:20:14 +0000266
Marc Slemko66949872006-07-15 01:52:39 +0000267};
268
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000269PosixThreadFactory::PosixThreadFactory(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
Mark Slee2f6404d2006-10-10 01:37:40 +0000270 impl_(new PosixThreadFactory::Impl(policy, priority, stackSize, detached)) {}
Marc Slemko66949872006-07-15 01:52:39 +0000271
Mark Slee2f6404d2006-10-10 01:37:40 +0000272shared_ptr<Thread> PosixThreadFactory::newThread(shared_ptr<Runnable> runnable) const { return impl_->newThread(runnable); }
Marc Slemko66949872006-07-15 01:52:39 +0000273
Marc Slemkoa6479032007-06-05 22:20:14 +0000274int PosixThreadFactory::getStackSize() const { return impl_->getStackSize(); }
Marc Slemko66949872006-07-15 01:52:39 +0000275
Marc Slemkoa6479032007-06-05 22:20:14 +0000276void PosixThreadFactory::setStackSize(int value) { impl_->setStackSize(value); }
Marc Slemko66949872006-07-15 01:52:39 +0000277
Marc Slemkoa6479032007-06-05 22:20:14 +0000278PosixThreadFactory::PRIORITY PosixThreadFactory::getPriority() const { return impl_->getPriority(); }
Marc Slemko66949872006-07-15 01:52:39 +0000279
Marc Slemkoa6479032007-06-05 22:20:14 +0000280void PosixThreadFactory::setPriority(PosixThreadFactory::PRIORITY value) { impl_->setPriority(value); }
Marc Slemko66949872006-07-15 01:52:39 +0000281
Marc Slemkoa6479032007-06-05 22:20:14 +0000282bool PosixThreadFactory::isDetached() const { return impl_->isDetached(); }
283
284void PosixThreadFactory::setDetached(bool value) { impl_->setDetached(value); }
285
286Thread::id_t PosixThreadFactory::getCurrentThreadId() const { return impl_->getCurrentThreadId(); }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000287
Marc Slemko66949872006-07-15 01:52:39 +0000288}}} // facebook::thrift::concurrency