blob: 2e8ed474105e82f6a4c17b5e5af6fac3e734e667 [file] [log] [blame]
// Copyright (c) 2006- Facebook
// Distributed under the Thrift Software License
//
// See accompanying file LICENSE or visit the Thrift site at:
// http://developers.facebook.com/thrift/
6
Marc Slemko66949872006-07-15 01:52:39 +00007#include "PosixThreadFactory.h"
Marc Slemko3a3b53b2007-05-22 23:59:54 +00008#include "Exception.h"
Marc Slemko66949872006-07-15 01:52:39 +00009
10#include <assert.h>
11#include <pthread.h>
12
Marc Slemko6f038a72006-08-03 18:58:09 +000013#include <iostream>
14
15#include <boost/weak_ptr.hpp>
16
Marc Slemko66949872006-07-15 01:52:39 +000017namespace facebook { namespace thrift { namespace concurrency {
18
Marc Slemko6f038a72006-08-03 18:58:09 +000019using namespace boost;
20
Mark Sleef5f2be42006-09-05 21:05:31 +000021/**
Marc Slemko3a3b53b2007-05-22 23:59:54 +000022 * The POSIX thread class.
Mark Sleef5f2be42006-09-05 21:05:31 +000023 *
24 * @author marc
25 * @version $Id:$
26 */
Marc Slemko66949872006-07-15 01:52:39 +000027class PthreadThread: public Thread {
Mark Sleef5f2be42006-09-05 21:05:31 +000028 public:
Marc Slemko66949872006-07-15 01:52:39 +000029
Mark Sleef5f2be42006-09-05 21:05:31 +000030 enum STATE {
31 uninitialized,
32 starting,
33 started,
34 stopping,
35 stopped
Marc Slemko66949872006-07-15 01:52:39 +000036 };
37
38 static const int MB = 1024 * 1024;
39
Marc Slemko8a40a762006-07-19 17:46:50 +000040 static void* threadMain(void* arg);
41
Mark Sleef5f2be42006-09-05 21:05:31 +000042 private:
Mark Slee2f6404d2006-10-10 01:37:40 +000043 pthread_t pthread_;
44 STATE state_;
45 int policy_;
46 int priority_;
47 int stackSize_;
48 weak_ptr<PthreadThread> self_;
Marc Slemko67606e52007-06-04 21:01:19 +000049 bool detached_;
Marc Slemko6f038a72006-08-03 18:58:09 +000050
Mark Sleef5f2be42006-09-05 21:05:31 +000051 public:
Marc Slemko3a3b53b2007-05-22 23:59:54 +000052
Marc Slemko67606e52007-06-04 21:01:19 +000053 PthreadThread(int policy, int priority, int stackSize, bool detached, shared_ptr<Runnable> runnable) :
Mark Slee2f6404d2006-10-10 01:37:40 +000054 pthread_(0),
Marc Slemko3a3b53b2007-05-22 23:59:54 +000055 state_(uninitialized),
Mark Slee2f6404d2006-10-10 01:37:40 +000056 policy_(policy),
57 priority_(priority),
Marc Slemko67606e52007-06-04 21:01:19 +000058 stackSize_(stackSize),
59 detached_(detached) {
Marc Slemkofe5ba12e2006-07-20 21:16:27 +000060
61 this->Thread::runnable(runnable);
62 }
Marc Slemko66949872006-07-15 01:52:39 +000063
Marc Slemko67606e52007-06-04 21:01:19 +000064 ~PthreadThread() {
65 /* Nothing references this thread, if is is not detached, do a join
66 now, otherwise the thread-id and, possibly, other resources will
67 be leaked. */
68 if(!detached_) {
69 try {
70 join();
71 } catch(...) {
72 // We're really hosed.
73 }
74 }
75 }
Marc Slemko6f038a72006-08-03 18:58:09 +000076
Marc Slemko66949872006-07-15 01:52:39 +000077 void start() {
Mark Slee2f6404d2006-10-10 01:37:40 +000078 if (state_ != uninitialized) {
Marc Slemko66949872006-07-15 01:52:39 +000079 return;
80 }
81
Marc Slemko66949872006-07-15 01:52:39 +000082 pthread_attr_t thread_attr;
Mark Slee2782d6d2007-05-23 04:55:30 +000083 if (pthread_attr_init(&thread_attr) != 0) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +000084 throw SystemResourceException("pthread_attr_init failed");
85 }
Aditya Agarwal9dc57402007-03-31 17:45:12 +000086
Marc Slemko67606e52007-06-04 21:01:19 +000087 if(pthread_attr_setdetachstate(&thread_attr,
88 detached_ ?
89 PTHREAD_CREATE_DETACHED :
90 PTHREAD_CREATE_JOINABLE) != 0) {
91 throw SystemResourceException("pthread_attr_setdetachstate failed");
Marc Slemko3a3b53b2007-05-22 23:59:54 +000092 }
Marc Slemko66949872006-07-15 01:52:39 +000093
94 // Set thread stack size
Mark Slee2782d6d2007-05-23 04:55:30 +000095 if (pthread_attr_setstacksize(&thread_attr, MB * stackSize_) != 0) {
96 throw SystemResourceException("pthread_attr_setstacksize failed");
Marc Slemko3a3b53b2007-05-22 23:59:54 +000097 }
Marc Slemko66949872006-07-15 01:52:39 +000098
99 // Set thread policy
Mark Slee2782d6d2007-05-23 04:55:30 +0000100 if (pthread_attr_setschedpolicy(&thread_attr, policy_) != 0) {
101 throw SystemResourceException("pthread_attr_setschedpolicy failed");
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000102 }
Marc Slemko66949872006-07-15 01:52:39 +0000103
104 struct sched_param sched_param;
Mark Slee2f6404d2006-10-10 01:37:40 +0000105 sched_param.sched_priority = priority_;
Marc Slemko66949872006-07-15 01:52:39 +0000106
107 // Set thread priority
Mark Slee2782d6d2007-05-23 04:55:30 +0000108 if (pthread_attr_setschedparam(&thread_attr, &sched_param) != 0) {
109 throw SystemResourceException("pthread_attr_setschedparam failed");
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000110 }
Marc Slemko66949872006-07-15 01:52:39 +0000111
Mark Sleef5f2be42006-09-05 21:05:31 +0000112 // Create reference
Marc Slemko6f038a72006-08-03 18:58:09 +0000113 shared_ptr<PthreadThread>* selfRef = new shared_ptr<PthreadThread>();
Mark Slee2f6404d2006-10-10 01:37:40 +0000114 *selfRef = self_.lock();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000115
Marc Slemko67606e52007-06-04 21:01:19 +0000116 state_ = starting;
117
Mark Slee2782d6d2007-05-23 04:55:30 +0000118 if (pthread_create(&pthread_, &thread_attr, threadMain, (void*)selfRef) != 0) {
119 throw SystemResourceException("pthread_create failed");
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000120 }
Marc Slemko66949872006-07-15 01:52:39 +0000121 }
122
123 void join() {
Marc Slemko67606e52007-06-04 21:01:19 +0000124 if (!detached_ && state_ != uninitialized) {
Marc Slemko66949872006-07-15 01:52:39 +0000125 void* ignore;
Mark Slee2f6404d2006-10-10 01:37:40 +0000126 pthread_join(pthread_, &ignore);
Marc Slemko67606e52007-06-04 21:01:19 +0000127 detached_ = true;
Marc Slemko66949872006-07-15 01:52:39 +0000128 }
129 }
130
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000131 id_t id() {
Marc Slemko67606e52007-06-04 21:01:19 +0000132 return static_cast<id_t>(pthread_);
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000133 }
134
Mark Sleef5f2be42006-09-05 21:05:31 +0000135 shared_ptr<Runnable> runnable() const { return Thread::runnable(); }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000136
Mark Sleef5f2be42006-09-05 21:05:31 +0000137 void runnable(shared_ptr<Runnable> value) { Thread::runnable(value); }
Marc Slemko66949872006-07-15 01:52:39 +0000138
Marc Slemko6f038a72006-08-03 18:58:09 +0000139 void weakRef(shared_ptr<PthreadThread> self) {
Aditya Agarwal3f234da2007-04-01 01:19:57 +0000140 assert(self.get() == this);
Mark Slee2f6404d2006-10-10 01:37:40 +0000141 self_ = weak_ptr<PthreadThread>(self);
Marc Slemko6f038a72006-08-03 18:58:09 +0000142 }
Marc Slemko66949872006-07-15 01:52:39 +0000143};
144
Marc Slemko8a40a762006-07-19 17:46:50 +0000145void* PthreadThread::threadMain(void* arg) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000146 shared_ptr<PthreadThread> thread = *(shared_ptr<PthreadThread>*)arg;
Marc Slemko6f038a72006-08-03 18:58:09 +0000147 delete reinterpret_cast<shared_ptr<PthreadThread>*>(arg);
148
Mark Sleef5f2be42006-09-05 21:05:31 +0000149 if (thread == NULL) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000150 return (void*)0;
151 }
152
Mark Slee2f6404d2006-10-10 01:37:40 +0000153 if (thread->state_ != starting) {
Marc Slemko8a40a762006-07-19 17:46:50 +0000154 return (void*)0;
155 }
156
Mark Slee2f6404d2006-10-10 01:37:40 +0000157 thread->state_ = starting;
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000158 thread->runnable()->run();
Mark Slee2f6404d2006-10-10 01:37:40 +0000159 if (thread->state_ != stopping && thread->state_ != stopped) {
160 thread->state_ = stopping;
Marc Slemko8a40a762006-07-19 17:46:50 +0000161 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000162
Marc Slemko8a40a762006-07-19 17:46:50 +0000163 return (void*)0;
164}
165
Mark Sleef5f2be42006-09-05 21:05:31 +0000166/**
167 * POSIX Thread factory implementation
168 */
Marc Slemko66949872006-07-15 01:52:39 +0000169class PosixThreadFactory::Impl {
170
Mark Sleef5f2be42006-09-05 21:05:31 +0000171 private:
Mark Slee2f6404d2006-10-10 01:37:40 +0000172 POLICY policy_;
173 PRIORITY priority_;
174 int stackSize_;
175 bool detached_;
Marc Slemko66949872006-07-15 01:52:39 +0000176
Mark Sleef5f2be42006-09-05 21:05:31 +0000177 /**
178 * Converts generic posix thread schedule policy enums into pthread
179 * API values.
180 */
Marc Slemko66949872006-07-15 01:52:39 +0000181 static int toPthreadPolicy(POLICY policy) {
Mark Sleeb9ff32a2006-11-16 01:00:24 +0000182 switch (policy) {
183 case OTHER:
184 return SCHED_OTHER;
185 case FIFO:
186 return SCHED_FIFO;
187 case ROUND_ROBIN:
188 return SCHED_RR;
Marc Slemko66949872006-07-15 01:52:39 +0000189 }
Mark Sleeb9ff32a2006-11-16 01:00:24 +0000190 return SCHED_OTHER;
Marc Slemko66949872006-07-15 01:52:39 +0000191 }
192
Mark Sleef5f2be42006-09-05 21:05:31 +0000193 /**
194 * Converts relative thread priorities to absolute value based on posix
195 * thread scheduler policy
196 *
197 * The idea is simply to divide up the priority range for the given policy
198 * into the correpsonding relative priority level (lowest..highest) and
199 * then pro-rate accordingly.
200 */
Marc Slemko66949872006-07-15 01:52:39 +0000201 static int toPthreadPriority(POLICY policy, PRIORITY priority) {
Marc Slemko66949872006-07-15 01:52:39 +0000202 int pthread_policy = toPthreadPolicy(policy);
Marc Slemko66949872006-07-15 01:52:39 +0000203 int min_priority = sched_get_priority_min(pthread_policy);
Marc Slemko66949872006-07-15 01:52:39 +0000204 int max_priority = sched_get_priority_max(pthread_policy);
Marc Slemko66949872006-07-15 01:52:39 +0000205 int quanta = (HIGHEST - LOWEST) + 1;
Marc Slemko66949872006-07-15 01:52:39 +0000206 float stepsperquanta = (max_priority - min_priority) / quanta;
207
Mark Slee29050782006-09-29 00:12:30 +0000208 if (priority <= HIGHEST) {
Marc Slemko66949872006-07-15 01:52:39 +0000209 return (int)(min_priority + stepsperquanta * priority);
210 } else {
Marc Slemko66949872006-07-15 01:52:39 +0000211 // should never get here for priority increments.
Marc Slemko66949872006-07-15 01:52:39 +0000212 assert(false);
Marc Slemko66949872006-07-15 01:52:39 +0000213 return (int)(min_priority + stepsperquanta * NORMAL);
214 }
215 }
216
Mark Sleef5f2be42006-09-05 21:05:31 +0000217 public:
Marc Slemko66949872006-07-15 01:52:39 +0000218
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000219 Impl(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
Mark Slee2f6404d2006-10-10 01:37:40 +0000220 policy_(policy),
221 priority_(priority),
222 stackSize_(stackSize),
223 detached_(detached) {}
Marc Slemko66949872006-07-15 01:52:39 +0000224
Mark Sleef5f2be42006-09-05 21:05:31 +0000225 /**
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000226 * Creates a new POSIX thread to run the runnable object
Mark Sleef5f2be42006-09-05 21:05:31 +0000227 *
228 * @param runnable A runnable object
229 */
Marc Slemko6f038a72006-08-03 18:58:09 +0000230 shared_ptr<Thread> newThread(shared_ptr<Runnable> runnable) const {
Marc Slemko67606e52007-06-04 21:01:19 +0000231 shared_ptr<PthreadThread> result = shared_ptr<PthreadThread>(new PthreadThread(toPthreadPolicy(policy_), toPthreadPriority(policy_, priority_), stackSize_, detached_, runnable));
Marc Slemko6f038a72006-08-03 18:58:09 +0000232 result->weakRef(result);
233 runnable->thread(result);
234 return result;
Marc Slemko66949872006-07-15 01:52:39 +0000235 }
236
Mark Slee2f6404d2006-10-10 01:37:40 +0000237 int stackSize() const { return stackSize_; }
Marc Slemko66949872006-07-15 01:52:39 +0000238
Mark Slee2f6404d2006-10-10 01:37:40 +0000239 void stackSize(int value) { stackSize_ = value; }
Marc Slemko66949872006-07-15 01:52:39 +0000240
Mark Slee2f6404d2006-10-10 01:37:40 +0000241 PRIORITY priority() const { return priority_; }
Marc Slemko66949872006-07-15 01:52:39 +0000242
Marc Slemko67606e52007-06-04 21:01:19 +0000243 Thread::id_t currentThreadId() const {return static_cast<id_t>(pthread_self());}
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000244
Mark Sleef5f2be42006-09-05 21:05:31 +0000245 /**
246 * Sets priority.
247 *
248 * XXX
249 * Need to handle incremental priorities properly.
250 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000251 void priority(PRIORITY value) { priority_ = value; }
Marc Slemko66949872006-07-15 01:52:39 +0000252};
253
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000254PosixThreadFactory::PosixThreadFactory(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
Mark Slee2f6404d2006-10-10 01:37:40 +0000255 impl_(new PosixThreadFactory::Impl(policy, priority, stackSize, detached)) {}
Marc Slemko66949872006-07-15 01:52:39 +0000256
Mark Slee2f6404d2006-10-10 01:37:40 +0000257shared_ptr<Thread> PosixThreadFactory::newThread(shared_ptr<Runnable> runnable) const { return impl_->newThread(runnable); }
Marc Slemko66949872006-07-15 01:52:39 +0000258
Mark Slee2f6404d2006-10-10 01:37:40 +0000259int PosixThreadFactory::stackSize() const { return impl_->stackSize(); }
Marc Slemko66949872006-07-15 01:52:39 +0000260
Mark Slee2f6404d2006-10-10 01:37:40 +0000261void PosixThreadFactory::stackSize(int value) { impl_->stackSize(value); }
Marc Slemko66949872006-07-15 01:52:39 +0000262
Mark Slee2f6404d2006-10-10 01:37:40 +0000263PosixThreadFactory::PRIORITY PosixThreadFactory::priority() const { return impl_->priority(); }
Marc Slemko66949872006-07-15 01:52:39 +0000264
Mark Slee2f6404d2006-10-10 01:37:40 +0000265void PosixThreadFactory::priority(PosixThreadFactory::PRIORITY value) { impl_->priority(value); }
Marc Slemko66949872006-07-15 01:52:39 +0000266
Mark Slee2782d6d2007-05-23 04:55:30 +0000267Thread::id_t PosixThreadFactory::currentThreadId() const { return impl_->currentThreadId(); }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000268
Marc Slemko66949872006-07-15 01:52:39 +0000269}}} // facebook::thrift::concurrency