blob: 74a3ec318e28839a9e6613f0bed6e5953fd1f35a [file] [log] [blame]
Marc Slemko66949872006-07-15 01:52:39 +00001#include "PosixThreadFactory.h"
2
3#include <assert.h>
4#include <pthread.h>
5
Marc Slemko6f038a72006-08-03 18:58:09 +00006#include <iostream>
7
8#include <boost/weak_ptr.hpp>
9
Marc Slemko66949872006-07-15 01:52:39 +000010namespace facebook { namespace thrift { namespace concurrency {
11
Marc Slemko6f038a72006-08-03 18:58:09 +000012using namespace boost;
13
Mark Sleef5f2be42006-09-05 21:05:31 +000014/**
15 * The POSIX thread class.
16 *
17 * @author marc
18 * @version $Id:$
19 */
Marc Slemko66949872006-07-15 01:52:39 +000020class PthreadThread: public Thread {
Mark Sleef5f2be42006-09-05 21:05:31 +000021 public:
Marc Slemko66949872006-07-15 01:52:39 +000022
Mark Sleef5f2be42006-09-05 21:05:31 +000023 enum STATE {
24 uninitialized,
25 starting,
26 started,
27 stopping,
28 stopped
Marc Slemko66949872006-07-15 01:52:39 +000029 };
30
31 static const int MB = 1024 * 1024;
32
Marc Slemko8a40a762006-07-19 17:46:50 +000033 static void* threadMain(void* arg);
34
Mark Sleef5f2be42006-09-05 21:05:31 +000035 private:
Mark Slee2f6404d2006-10-10 01:37:40 +000036 pthread_t pthread_;
37 STATE state_;
38 int policy_;
39 int priority_;
40 int stackSize_;
41 weak_ptr<PthreadThread> self_;
Marc Slemko6f038a72006-08-03 18:58:09 +000042
Mark Sleef5f2be42006-09-05 21:05:31 +000043 public:
Marc Slemko66949872006-07-15 01:52:39 +000044
Marc Slemko6f038a72006-08-03 18:58:09 +000045 PthreadThread(int policy, int priority, int stackSize, shared_ptr<Runnable> runnable) :
Mark Slee2f6404d2006-10-10 01:37:40 +000046 pthread_(0),
47 state_(uninitialized),
48 policy_(policy),
49 priority_(priority),
50 stackSize_(stackSize) {
Marc Slemkofe5ba12e2006-07-20 21:16:27 +000051
52 this->Thread::runnable(runnable);
53 }
Marc Slemko66949872006-07-15 01:52:39 +000054
Mark Sleef5f2be42006-09-05 21:05:31 +000055 ~PthreadThread() {}
Marc Slemko6f038a72006-08-03 18:58:09 +000056
Marc Slemko66949872006-07-15 01:52:39 +000057 void start() {
Mark Slee2f6404d2006-10-10 01:37:40 +000058 if (state_ != uninitialized) {
Marc Slemko66949872006-07-15 01:52:39 +000059 return;
60 }
61
Mark Slee2f6404d2006-10-10 01:37:40 +000062 state_ = starting;
Marc Slemko66949872006-07-15 01:52:39 +000063
64 pthread_attr_t thread_attr;
Marc Slemko66949872006-07-15 01:52:39 +000065 assert(pthread_attr_init(&thread_attr) == 0);
Marc Slemko66949872006-07-15 01:52:39 +000066 assert(pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_JOINABLE) == 0);
67
68 // Set thread stack size
Mark Slee2f6404d2006-10-10 01:37:40 +000069 assert(pthread_attr_setstacksize(&thread_attr, MB * stackSize_) == 0);
Marc Slemko66949872006-07-15 01:52:39 +000070
71 // Set thread policy
Mark Slee2f6404d2006-10-10 01:37:40 +000072 assert(pthread_attr_setschedpolicy(&thread_attr, policy_) == 0);
Marc Slemko66949872006-07-15 01:52:39 +000073
74 struct sched_param sched_param;
Mark Slee2f6404d2006-10-10 01:37:40 +000075 sched_param.sched_priority = priority_;
Marc Slemko66949872006-07-15 01:52:39 +000076
77 // Set thread priority
Marc Slemko6f038a72006-08-03 18:58:09 +000078 assert(pthread_attr_setschedparam(&thread_attr, &sched_param) == 0);
Marc Slemko66949872006-07-15 01:52:39 +000079
Mark Sleef5f2be42006-09-05 21:05:31 +000080 // Create reference
Marc Slemko6f038a72006-08-03 18:58:09 +000081 shared_ptr<PthreadThread>* selfRef = new shared_ptr<PthreadThread>();
Mark Slee2f6404d2006-10-10 01:37:40 +000082 *selfRef = self_.lock();
83 assert(pthread_create(&pthread_, &thread_attr, threadMain, (void*)selfRef) == 0);
Marc Slemko66949872006-07-15 01:52:39 +000084 }
85
86 void join() {
Mark Slee2f6404d2006-10-10 01:37:40 +000087 if (state_ != stopped) {
Marc Slemko66949872006-07-15 01:52:39 +000088 void* ignore;
Mark Slee2f6404d2006-10-10 01:37:40 +000089 pthread_join(pthread_, &ignore);
Marc Slemko66949872006-07-15 01:52:39 +000090 }
91 }
92
Mark Sleef5f2be42006-09-05 21:05:31 +000093 shared_ptr<Runnable> runnable() const { return Thread::runnable(); }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +000094
Mark Sleef5f2be42006-09-05 21:05:31 +000095 void runnable(shared_ptr<Runnable> value) { Thread::runnable(value); }
Marc Slemko66949872006-07-15 01:52:39 +000096
Marc Slemko6f038a72006-08-03 18:58:09 +000097 void weakRef(shared_ptr<PthreadThread> self) {
98 assert(self.get() == this);
Mark Slee2f6404d2006-10-10 01:37:40 +000099 self_ = weak_ptr<PthreadThread>(self);
Marc Slemko6f038a72006-08-03 18:58:09 +0000100 }
Marc Slemko66949872006-07-15 01:52:39 +0000101};
102
Marc Slemko8a40a762006-07-19 17:46:50 +0000103void* PthreadThread::threadMain(void* arg) {
104 // XXX need a lock here when testing thread state
Marc Slemko6f038a72006-08-03 18:58:09 +0000105 shared_ptr<PthreadThread> thread = *(shared_ptr<PthreadThread>*)arg;
Marc Slemko6f038a72006-08-03 18:58:09 +0000106 delete reinterpret_cast<shared_ptr<PthreadThread>*>(arg);
107
Mark Sleef5f2be42006-09-05 21:05:31 +0000108 if (thread == NULL) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000109 return (void*)0;
110 }
111
Mark Slee2f6404d2006-10-10 01:37:40 +0000112 if (thread->state_ != starting) {
Marc Slemko8a40a762006-07-19 17:46:50 +0000113 return (void*)0;
114 }
115
Mark Slee2f6404d2006-10-10 01:37:40 +0000116 thread->state_ = starting;
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000117 thread->runnable()->run();
Mark Slee2f6404d2006-10-10 01:37:40 +0000118 if (thread->state_ != stopping && thread->state_ != stopped) {
119 thread->state_ = stopping;
Marc Slemko8a40a762006-07-19 17:46:50 +0000120 }
121
122 return (void*)0;
123}
124
Mark Sleef5f2be42006-09-05 21:05:31 +0000125/**
126 * POSIX Thread factory implementation
127 */
Marc Slemko66949872006-07-15 01:52:39 +0000128class PosixThreadFactory::Impl {
129
Mark Sleef5f2be42006-09-05 21:05:31 +0000130 private:
Mark Slee2f6404d2006-10-10 01:37:40 +0000131 POLICY policy_;
132 PRIORITY priority_;
133 int stackSize_;
134 bool detached_;
Marc Slemko66949872006-07-15 01:52:39 +0000135
Mark Sleef5f2be42006-09-05 21:05:31 +0000136 /**
137 * Converts generic posix thread schedule policy enums into pthread
138 * API values.
139 */
Marc Slemko66949872006-07-15 01:52:39 +0000140 static int toPthreadPolicy(POLICY policy) {
141 switch(policy) {
142 case OTHER: return SCHED_OTHER; break;
143 case FIFO: return SCHED_FIFO; break;
144 case ROUND_ROBIN: return SCHED_RR; break;
145 default: return SCHED_OTHER; break;
146 }
147 }
148
Mark Sleef5f2be42006-09-05 21:05:31 +0000149 /**
150 * Converts relative thread priorities to absolute value based on posix
151 * thread scheduler policy
152 *
153 * The idea is simply to divide up the priority range for the given policy
154 * into the correpsonding relative priority level (lowest..highest) and
155 * then pro-rate accordingly.
156 */
Marc Slemko66949872006-07-15 01:52:39 +0000157 static int toPthreadPriority(POLICY policy, PRIORITY priority) {
Marc Slemko66949872006-07-15 01:52:39 +0000158 int pthread_policy = toPthreadPolicy(policy);
Marc Slemko66949872006-07-15 01:52:39 +0000159 int min_priority = sched_get_priority_min(pthread_policy);
Marc Slemko66949872006-07-15 01:52:39 +0000160 int max_priority = sched_get_priority_max(pthread_policy);
Marc Slemko66949872006-07-15 01:52:39 +0000161 int quanta = (HIGHEST - LOWEST) + 1;
Marc Slemko66949872006-07-15 01:52:39 +0000162 float stepsperquanta = (max_priority - min_priority) / quanta;
163
Mark Slee29050782006-09-29 00:12:30 +0000164 if (priority <= HIGHEST) {
Marc Slemko66949872006-07-15 01:52:39 +0000165 return (int)(min_priority + stepsperquanta * priority);
166 } else {
Marc Slemko66949872006-07-15 01:52:39 +0000167 // should never get here for priority increments.
Marc Slemko66949872006-07-15 01:52:39 +0000168 assert(false);
Marc Slemko66949872006-07-15 01:52:39 +0000169 return (int)(min_priority + stepsperquanta * NORMAL);
170 }
171 }
172
Mark Sleef5f2be42006-09-05 21:05:31 +0000173 public:
Marc Slemko66949872006-07-15 01:52:39 +0000174
175 Impl(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
Mark Slee2f6404d2006-10-10 01:37:40 +0000176 policy_(policy),
177 priority_(priority),
178 stackSize_(stackSize),
179 detached_(detached) {}
Marc Slemko66949872006-07-15 01:52:39 +0000180
Mark Sleef5f2be42006-09-05 21:05:31 +0000181 /**
182 * Creates a new POSIX thread to run the runnable object
183 *
184 * @param runnable A runnable object
185 */
Marc Slemko6f038a72006-08-03 18:58:09 +0000186 shared_ptr<Thread> newThread(shared_ptr<Runnable> runnable) const {
Mark Slee2f6404d2006-10-10 01:37:40 +0000187 shared_ptr<PthreadThread> result = shared_ptr<PthreadThread>(new PthreadThread(toPthreadPolicy(policy_), toPthreadPriority(policy_, priority_), stackSize_, runnable));
Marc Slemko6f038a72006-08-03 18:58:09 +0000188 result->weakRef(result);
189 runnable->thread(result);
190 return result;
Marc Slemko66949872006-07-15 01:52:39 +0000191 }
192
Mark Slee2f6404d2006-10-10 01:37:40 +0000193 int stackSize() const { return stackSize_; }
Marc Slemko66949872006-07-15 01:52:39 +0000194
Mark Slee2f6404d2006-10-10 01:37:40 +0000195 void stackSize(int value) { stackSize_ = value; }
Marc Slemko66949872006-07-15 01:52:39 +0000196
Mark Slee2f6404d2006-10-10 01:37:40 +0000197 PRIORITY priority() const { return priority_; }
Marc Slemko66949872006-07-15 01:52:39 +0000198
Mark Sleef5f2be42006-09-05 21:05:31 +0000199 /**
200 * Sets priority.
201 *
202 * XXX
203 * Need to handle incremental priorities properly.
204 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000205 void priority(PRIORITY value) { priority_ = value; }
Marc Slemko66949872006-07-15 01:52:39 +0000206};
207
208PosixThreadFactory::PosixThreadFactory(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
Mark Slee2f6404d2006-10-10 01:37:40 +0000209 impl_(new PosixThreadFactory::Impl(policy, priority, stackSize, detached)) {}
Marc Slemko66949872006-07-15 01:52:39 +0000210
Mark Slee2f6404d2006-10-10 01:37:40 +0000211shared_ptr<Thread> PosixThreadFactory::newThread(shared_ptr<Runnable> runnable) const { return impl_->newThread(runnable); }
Marc Slemko66949872006-07-15 01:52:39 +0000212
Mark Slee2f6404d2006-10-10 01:37:40 +0000213int PosixThreadFactory::stackSize() const { return impl_->stackSize(); }
Marc Slemko66949872006-07-15 01:52:39 +0000214
Mark Slee2f6404d2006-10-10 01:37:40 +0000215void PosixThreadFactory::stackSize(int value) { impl_->stackSize(value); }
Marc Slemko66949872006-07-15 01:52:39 +0000216
Mark Slee2f6404d2006-10-10 01:37:40 +0000217PosixThreadFactory::PRIORITY PosixThreadFactory::priority() const { return impl_->priority(); }
Marc Slemko66949872006-07-15 01:52:39 +0000218
Mark Slee2f6404d2006-10-10 01:37:40 +0000219void PosixThreadFactory::priority(PosixThreadFactory::PRIORITY value) { impl_->priority(value); }
Marc Slemko66949872006-07-15 01:52:39 +0000220
221}}} // facebook::thrift::concurrency