blob: 5f86ea27142f529e19dfc9b77af597a34dedc3cd [file] [log] [blame]
Mark Slee9f0c6512007-02-28 23:58:26 +00001// Copyright (c) 2006- Facebook
2// Distributed under the Thrift Software License
3//
4// See accompanying file LICENSE or visit the Thrift site at:
5// http://developers.facebook.com/thrift/
6
Marc Slemko66949872006-07-15 01:52:39 +00007#include "PosixThreadFactory.h"
Marc Slemko3a3b53b2007-05-22 23:59:54 +00008#include "Exception.h"
Marc Slemko66949872006-07-15 01:52:39 +00009
10#include <assert.h>
11#include <pthread.h>
12
Marc Slemko6f038a72006-08-03 18:58:09 +000013#include <iostream>
14
15#include <boost/weak_ptr.hpp>
16
Marc Slemko66949872006-07-15 01:52:39 +000017namespace facebook { namespace thrift { namespace concurrency {
18
Marc Slemko6f038a72006-08-03 18:58:09 +000019using namespace boost;
20
Mark Sleef5f2be42006-09-05 21:05:31 +000021/**
Marc Slemko3a3b53b2007-05-22 23:59:54 +000022 * The POSIX thread class.
Mark Sleef5f2be42006-09-05 21:05:31 +000023 *
24 * @author marc
25 * @version $Id:$
26 */
Marc Slemko66949872006-07-15 01:52:39 +000027class PthreadThread: public Thread {
Mark Sleef5f2be42006-09-05 21:05:31 +000028 public:
Marc Slemko66949872006-07-15 01:52:39 +000029
Mark Sleef5f2be42006-09-05 21:05:31 +000030 enum STATE {
31 uninitialized,
32 starting,
33 started,
34 stopping,
35 stopped
Marc Slemko66949872006-07-15 01:52:39 +000036 };
37
38 static const int MB = 1024 * 1024;
39
Marc Slemko8a40a762006-07-19 17:46:50 +000040 static void* threadMain(void* arg);
41
Mark Sleef5f2be42006-09-05 21:05:31 +000042 private:
Mark Slee2f6404d2006-10-10 01:37:40 +000043 pthread_t pthread_;
44 STATE state_;
45 int policy_;
46 int priority_;
47 int stackSize_;
48 weak_ptr<PthreadThread> self_;
Marc Slemko6f038a72006-08-03 18:58:09 +000049
Mark Sleef5f2be42006-09-05 21:05:31 +000050 public:
Marc Slemko3a3b53b2007-05-22 23:59:54 +000051
52 PthreadThread(int policy, int priority, int stackSize, shared_ptr<Runnable> runnable) :
Mark Slee2f6404d2006-10-10 01:37:40 +000053 pthread_(0),
Marc Slemko3a3b53b2007-05-22 23:59:54 +000054 state_(uninitialized),
Mark Slee2f6404d2006-10-10 01:37:40 +000055 policy_(policy),
56 priority_(priority),
57 stackSize_(stackSize) {
Marc Slemkofe5ba12e2006-07-20 21:16:27 +000058
59 this->Thread::runnable(runnable);
60 }
Marc Slemko66949872006-07-15 01:52:39 +000061
Mark Sleef5f2be42006-09-05 21:05:31 +000062 ~PthreadThread() {}
Marc Slemko6f038a72006-08-03 18:58:09 +000063
Marc Slemko66949872006-07-15 01:52:39 +000064 void start() {
Mark Slee2f6404d2006-10-10 01:37:40 +000065 if (state_ != uninitialized) {
Marc Slemko66949872006-07-15 01:52:39 +000066 return;
67 }
68
Mark Slee2f6404d2006-10-10 01:37:40 +000069 state_ = starting;
Marc Slemko66949872006-07-15 01:52:39 +000070
71 pthread_attr_t thread_attr;
Mark Slee2782d6d2007-05-23 04:55:30 +000072 if (pthread_attr_init(&thread_attr) != 0) {
Marc Slemko3a3b53b2007-05-22 23:59:54 +000073 throw SystemResourceException("pthread_attr_init failed");
74 }
Aditya Agarwal9dc57402007-03-31 17:45:12 +000075
Mark Slee2782d6d2007-05-23 04:55:30 +000076 if (pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_JOINABLE) != 0) {
77 throw SystemResourceException("pthread_attr_setdetachstate failed");
Marc Slemko3a3b53b2007-05-22 23:59:54 +000078 }
Marc Slemko66949872006-07-15 01:52:39 +000079
80 // Set thread stack size
Mark Slee2782d6d2007-05-23 04:55:30 +000081 if (pthread_attr_setstacksize(&thread_attr, MB * stackSize_) != 0) {
82 throw SystemResourceException("pthread_attr_setstacksize failed");
Marc Slemko3a3b53b2007-05-22 23:59:54 +000083 }
Marc Slemko66949872006-07-15 01:52:39 +000084
85 // Set thread policy
Mark Slee2782d6d2007-05-23 04:55:30 +000086 if (pthread_attr_setschedpolicy(&thread_attr, policy_) != 0) {
87 throw SystemResourceException("pthread_attr_setschedpolicy failed");
Marc Slemko3a3b53b2007-05-22 23:59:54 +000088 }
Marc Slemko66949872006-07-15 01:52:39 +000089
90 struct sched_param sched_param;
Mark Slee2f6404d2006-10-10 01:37:40 +000091 sched_param.sched_priority = priority_;
Marc Slemko66949872006-07-15 01:52:39 +000092
93 // Set thread priority
Mark Slee2782d6d2007-05-23 04:55:30 +000094 if (pthread_attr_setschedparam(&thread_attr, &sched_param) != 0) {
95 throw SystemResourceException("pthread_attr_setschedparam failed");
Marc Slemko3a3b53b2007-05-22 23:59:54 +000096 }
Marc Slemko66949872006-07-15 01:52:39 +000097
Mark Sleef5f2be42006-09-05 21:05:31 +000098 // Create reference
Marc Slemko6f038a72006-08-03 18:58:09 +000099 shared_ptr<PthreadThread>* selfRef = new shared_ptr<PthreadThread>();
Mark Slee2f6404d2006-10-10 01:37:40 +0000100 *selfRef = self_.lock();
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000101
Mark Slee2782d6d2007-05-23 04:55:30 +0000102 if (pthread_create(&pthread_, &thread_attr, threadMain, (void*)selfRef) != 0) {
103 throw SystemResourceException("pthread_create failed");
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000104 }
Marc Slemko66949872006-07-15 01:52:39 +0000105 }
106
107 void join() {
Mark Slee2f6404d2006-10-10 01:37:40 +0000108 if (state_ != stopped) {
Marc Slemko66949872006-07-15 01:52:39 +0000109 void* ignore;
Mark Slee2f6404d2006-10-10 01:37:40 +0000110 pthread_join(pthread_, &ignore);
Marc Slemko66949872006-07-15 01:52:39 +0000111 }
112 }
113
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000114 id_t id() {
115 return pthread_;
116 }
117
Mark Sleef5f2be42006-09-05 21:05:31 +0000118 shared_ptr<Runnable> runnable() const { return Thread::runnable(); }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000119
Mark Sleef5f2be42006-09-05 21:05:31 +0000120 void runnable(shared_ptr<Runnable> value) { Thread::runnable(value); }
Marc Slemko66949872006-07-15 01:52:39 +0000121
Marc Slemko6f038a72006-08-03 18:58:09 +0000122 void weakRef(shared_ptr<PthreadThread> self) {
Aditya Agarwal3f234da2007-04-01 01:19:57 +0000123 assert(self.get() == this);
Mark Slee2f6404d2006-10-10 01:37:40 +0000124 self_ = weak_ptr<PthreadThread>(self);
Marc Slemko6f038a72006-08-03 18:58:09 +0000125 }
Marc Slemko66949872006-07-15 01:52:39 +0000126};
127
Marc Slemko8a40a762006-07-19 17:46:50 +0000128void* PthreadThread::threadMain(void* arg) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000129 shared_ptr<PthreadThread> thread = *(shared_ptr<PthreadThread>*)arg;
Marc Slemko6f038a72006-08-03 18:58:09 +0000130 delete reinterpret_cast<shared_ptr<PthreadThread>*>(arg);
131
Mark Sleef5f2be42006-09-05 21:05:31 +0000132 if (thread == NULL) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000133 return (void*)0;
134 }
135
Mark Slee2f6404d2006-10-10 01:37:40 +0000136 if (thread->state_ != starting) {
Marc Slemko8a40a762006-07-19 17:46:50 +0000137 return (void*)0;
138 }
139
Mark Slee2f6404d2006-10-10 01:37:40 +0000140 thread->state_ = starting;
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000141 thread->runnable()->run();
Mark Slee2f6404d2006-10-10 01:37:40 +0000142 if (thread->state_ != stopping && thread->state_ != stopped) {
143 thread->state_ = stopping;
Marc Slemko8a40a762006-07-19 17:46:50 +0000144 }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000145
Marc Slemko8a40a762006-07-19 17:46:50 +0000146 return (void*)0;
147}
148
Mark Sleef5f2be42006-09-05 21:05:31 +0000149/**
150 * POSIX Thread factory implementation
151 */
Marc Slemko66949872006-07-15 01:52:39 +0000152class PosixThreadFactory::Impl {
153
Mark Sleef5f2be42006-09-05 21:05:31 +0000154 private:
Mark Slee2f6404d2006-10-10 01:37:40 +0000155 POLICY policy_;
156 PRIORITY priority_;
157 int stackSize_;
158 bool detached_;
Marc Slemko66949872006-07-15 01:52:39 +0000159
Mark Sleef5f2be42006-09-05 21:05:31 +0000160 /**
161 * Converts generic posix thread schedule policy enums into pthread
162 * API values.
163 */
Marc Slemko66949872006-07-15 01:52:39 +0000164 static int toPthreadPolicy(POLICY policy) {
Mark Sleeb9ff32a2006-11-16 01:00:24 +0000165 switch (policy) {
166 case OTHER:
167 return SCHED_OTHER;
168 case FIFO:
169 return SCHED_FIFO;
170 case ROUND_ROBIN:
171 return SCHED_RR;
Marc Slemko66949872006-07-15 01:52:39 +0000172 }
Mark Sleeb9ff32a2006-11-16 01:00:24 +0000173 return SCHED_OTHER;
Marc Slemko66949872006-07-15 01:52:39 +0000174 }
175
Mark Sleef5f2be42006-09-05 21:05:31 +0000176 /**
177 * Converts relative thread priorities to absolute value based on posix
178 * thread scheduler policy
179 *
180 * The idea is simply to divide up the priority range for the given policy
181 * into the correpsonding relative priority level (lowest..highest) and
182 * then pro-rate accordingly.
183 */
Marc Slemko66949872006-07-15 01:52:39 +0000184 static int toPthreadPriority(POLICY policy, PRIORITY priority) {
Marc Slemko66949872006-07-15 01:52:39 +0000185 int pthread_policy = toPthreadPolicy(policy);
Marc Slemko66949872006-07-15 01:52:39 +0000186 int min_priority = sched_get_priority_min(pthread_policy);
Marc Slemko66949872006-07-15 01:52:39 +0000187 int max_priority = sched_get_priority_max(pthread_policy);
Marc Slemko66949872006-07-15 01:52:39 +0000188 int quanta = (HIGHEST - LOWEST) + 1;
Marc Slemko66949872006-07-15 01:52:39 +0000189 float stepsperquanta = (max_priority - min_priority) / quanta;
190
Mark Slee29050782006-09-29 00:12:30 +0000191 if (priority <= HIGHEST) {
Marc Slemko66949872006-07-15 01:52:39 +0000192 return (int)(min_priority + stepsperquanta * priority);
193 } else {
Marc Slemko66949872006-07-15 01:52:39 +0000194 // should never get here for priority increments.
Marc Slemko66949872006-07-15 01:52:39 +0000195 assert(false);
Marc Slemko66949872006-07-15 01:52:39 +0000196 return (int)(min_priority + stepsperquanta * NORMAL);
197 }
198 }
199
Mark Sleef5f2be42006-09-05 21:05:31 +0000200 public:
Marc Slemko66949872006-07-15 01:52:39 +0000201
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000202 Impl(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
Mark Slee2f6404d2006-10-10 01:37:40 +0000203 policy_(policy),
204 priority_(priority),
205 stackSize_(stackSize),
206 detached_(detached) {}
Marc Slemko66949872006-07-15 01:52:39 +0000207
Mark Sleef5f2be42006-09-05 21:05:31 +0000208 /**
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000209 * Creates a new POSIX thread to run the runnable object
Mark Sleef5f2be42006-09-05 21:05:31 +0000210 *
211 * @param runnable A runnable object
212 */
Marc Slemko6f038a72006-08-03 18:58:09 +0000213 shared_ptr<Thread> newThread(shared_ptr<Runnable> runnable) const {
Mark Slee2f6404d2006-10-10 01:37:40 +0000214 shared_ptr<PthreadThread> result = shared_ptr<PthreadThread>(new PthreadThread(toPthreadPolicy(policy_), toPthreadPriority(policy_, priority_), stackSize_, runnable));
Marc Slemko6f038a72006-08-03 18:58:09 +0000215 result->weakRef(result);
216 runnable->thread(result);
217 return result;
Marc Slemko66949872006-07-15 01:52:39 +0000218 }
219
Mark Slee2f6404d2006-10-10 01:37:40 +0000220 int stackSize() const { return stackSize_; }
Marc Slemko66949872006-07-15 01:52:39 +0000221
Mark Slee2f6404d2006-10-10 01:37:40 +0000222 void stackSize(int value) { stackSize_ = value; }
Marc Slemko66949872006-07-15 01:52:39 +0000223
Mark Slee2f6404d2006-10-10 01:37:40 +0000224 PRIORITY priority() const { return priority_; }
Marc Slemko66949872006-07-15 01:52:39 +0000225
Mark Slee2782d6d2007-05-23 04:55:30 +0000226 Thread::id_t currentThreadId() const { return pthread_self(); }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000227
Mark Sleef5f2be42006-09-05 21:05:31 +0000228 /**
229 * Sets priority.
230 *
231 * XXX
232 * Need to handle incremental priorities properly.
233 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000234 void priority(PRIORITY value) { priority_ = value; }
Marc Slemko66949872006-07-15 01:52:39 +0000235};
236
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000237PosixThreadFactory::PosixThreadFactory(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
Mark Slee2f6404d2006-10-10 01:37:40 +0000238 impl_(new PosixThreadFactory::Impl(policy, priority, stackSize, detached)) {}
Marc Slemko66949872006-07-15 01:52:39 +0000239
Mark Slee2f6404d2006-10-10 01:37:40 +0000240shared_ptr<Thread> PosixThreadFactory::newThread(shared_ptr<Runnable> runnable) const { return impl_->newThread(runnable); }
Marc Slemko66949872006-07-15 01:52:39 +0000241
Mark Slee2f6404d2006-10-10 01:37:40 +0000242int PosixThreadFactory::stackSize() const { return impl_->stackSize(); }
Marc Slemko66949872006-07-15 01:52:39 +0000243
Mark Slee2f6404d2006-10-10 01:37:40 +0000244void PosixThreadFactory::stackSize(int value) { impl_->stackSize(value); }
Marc Slemko66949872006-07-15 01:52:39 +0000245
Mark Slee2f6404d2006-10-10 01:37:40 +0000246PosixThreadFactory::PRIORITY PosixThreadFactory::priority() const { return impl_->priority(); }
Marc Slemko66949872006-07-15 01:52:39 +0000247
Mark Slee2f6404d2006-10-10 01:37:40 +0000248void PosixThreadFactory::priority(PosixThreadFactory::PRIORITY value) { impl_->priority(value); }
Marc Slemko66949872006-07-15 01:52:39 +0000249
Mark Slee2782d6d2007-05-23 04:55:30 +0000250Thread::id_t PosixThreadFactory::currentThreadId() const { return impl_->currentThreadId(); }
Marc Slemko3a3b53b2007-05-22 23:59:54 +0000251
Marc Slemko66949872006-07-15 01:52:39 +0000252}}} // facebook::thrift::concurrency