blob: 675e95d2b6abaaad69dac0f533206330efc6fafe [file] [log] [blame]
// Copyright (c) 2006- Facebook
// Distributed under the Thrift Software License
//
// See accompanying file LICENSE or visit the Thrift site at:
// http://developers.facebook.com/thrift/
6
Marc Slemko66949872006-07-15 01:52:39 +00007#include "PosixThreadFactory.h"
8
9#include <assert.h>
10#include <pthread.h>
11
Marc Slemko6f038a72006-08-03 18:58:09 +000012#include <iostream>
13
14#include <boost/weak_ptr.hpp>
15
Marc Slemko66949872006-07-15 01:52:39 +000016namespace facebook { namespace thrift { namespace concurrency {
17
Marc Slemko6f038a72006-08-03 18:58:09 +000018using namespace boost;
19
Mark Sleef5f2be42006-09-05 21:05:31 +000020/**
21 * The POSIX thread class.
22 *
23 * @author marc
24 * @version $Id:$
25 */
Marc Slemko66949872006-07-15 01:52:39 +000026class PthreadThread: public Thread {
Mark Sleef5f2be42006-09-05 21:05:31 +000027 public:
Marc Slemko66949872006-07-15 01:52:39 +000028
Mark Sleef5f2be42006-09-05 21:05:31 +000029 enum STATE {
30 uninitialized,
31 starting,
32 started,
33 stopping,
34 stopped
Marc Slemko66949872006-07-15 01:52:39 +000035 };
36
37 static const int MB = 1024 * 1024;
38
Marc Slemko8a40a762006-07-19 17:46:50 +000039 static void* threadMain(void* arg);
40
Mark Sleef5f2be42006-09-05 21:05:31 +000041 private:
Mark Slee2f6404d2006-10-10 01:37:40 +000042 pthread_t pthread_;
43 STATE state_;
44 int policy_;
45 int priority_;
46 int stackSize_;
47 weak_ptr<PthreadThread> self_;
Marc Slemko6f038a72006-08-03 18:58:09 +000048
Mark Sleef5f2be42006-09-05 21:05:31 +000049 public:
Marc Slemko66949872006-07-15 01:52:39 +000050
Marc Slemko6f038a72006-08-03 18:58:09 +000051 PthreadThread(int policy, int priority, int stackSize, shared_ptr<Runnable> runnable) :
Mark Slee2f6404d2006-10-10 01:37:40 +000052 pthread_(0),
53 state_(uninitialized),
54 policy_(policy),
55 priority_(priority),
56 stackSize_(stackSize) {
Marc Slemkofe5ba12e2006-07-20 21:16:27 +000057
58 this->Thread::runnable(runnable);
59 }
Marc Slemko66949872006-07-15 01:52:39 +000060
Mark Sleef5f2be42006-09-05 21:05:31 +000061 ~PthreadThread() {}
Marc Slemko6f038a72006-08-03 18:58:09 +000062
Marc Slemko66949872006-07-15 01:52:39 +000063 void start() {
Mark Slee2f6404d2006-10-10 01:37:40 +000064 if (state_ != uninitialized) {
Marc Slemko66949872006-07-15 01:52:39 +000065 return;
66 }
67
Mark Slee2f6404d2006-10-10 01:37:40 +000068 state_ = starting;
Marc Slemko66949872006-07-15 01:52:39 +000069
70 pthread_attr_t thread_attr;
Aditya Agarwal9dc57402007-03-31 17:45:12 +000071 int ret = pthread_attr_init(&thread_attr);
Aditya Agarwal3f234da2007-04-01 01:19:57 +000072 assert(ret == 0);
Aditya Agarwal9dc57402007-03-31 17:45:12 +000073
74 ret = pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_JOINABLE);
Aditya Agarwal3f234da2007-04-01 01:19:57 +000075 assert(ret == 0);
Marc Slemko66949872006-07-15 01:52:39 +000076
77 // Set thread stack size
Aditya Agarwal9dc57402007-03-31 17:45:12 +000078 ret = pthread_attr_setstacksize(&thread_attr, MB * stackSize_);
Aditya Agarwal3f234da2007-04-01 01:19:57 +000079 assert(ret == 0);
Marc Slemko66949872006-07-15 01:52:39 +000080
81 // Set thread policy
Aditya Agarwal9dc57402007-03-31 17:45:12 +000082 ret = pthread_attr_setschedpolicy(&thread_attr, policy_);
Aditya Agarwal3f234da2007-04-01 01:19:57 +000083 assert(ret == 0);
Marc Slemko66949872006-07-15 01:52:39 +000084
85 struct sched_param sched_param;
Mark Slee2f6404d2006-10-10 01:37:40 +000086 sched_param.sched_priority = priority_;
Marc Slemko66949872006-07-15 01:52:39 +000087
88 // Set thread priority
Aditya Agarwal9dc57402007-03-31 17:45:12 +000089 ret = pthread_attr_setschedparam(&thread_attr, &sched_param);
Aditya Agarwal3f234da2007-04-01 01:19:57 +000090 assert(ret == 0);
Marc Slemko66949872006-07-15 01:52:39 +000091
Mark Sleef5f2be42006-09-05 21:05:31 +000092 // Create reference
Marc Slemko6f038a72006-08-03 18:58:09 +000093 shared_ptr<PthreadThread>* selfRef = new shared_ptr<PthreadThread>();
Mark Slee2f6404d2006-10-10 01:37:40 +000094 *selfRef = self_.lock();
Aditya Agarwal9dc57402007-03-31 17:45:12 +000095 ret = pthread_create(&pthread_, &thread_attr, threadMain, (void*)selfRef);
Aditya Agarwal3f234da2007-04-01 01:19:57 +000096 assert(ret == 0);
Marc Slemko66949872006-07-15 01:52:39 +000097 }
98
99 void join() {
Mark Slee2f6404d2006-10-10 01:37:40 +0000100 if (state_ != stopped) {
Marc Slemko66949872006-07-15 01:52:39 +0000101 void* ignore;
Mark Slee2f6404d2006-10-10 01:37:40 +0000102 pthread_join(pthread_, &ignore);
Marc Slemko66949872006-07-15 01:52:39 +0000103 }
104 }
105
Mark Sleef5f2be42006-09-05 21:05:31 +0000106 shared_ptr<Runnable> runnable() const { return Thread::runnable(); }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000107
Mark Sleef5f2be42006-09-05 21:05:31 +0000108 void runnable(shared_ptr<Runnable> value) { Thread::runnable(value); }
Marc Slemko66949872006-07-15 01:52:39 +0000109
Marc Slemko6f038a72006-08-03 18:58:09 +0000110 void weakRef(shared_ptr<PthreadThread> self) {
Aditya Agarwal3f234da2007-04-01 01:19:57 +0000111 assert(self.get() == this);
Mark Slee2f6404d2006-10-10 01:37:40 +0000112 self_ = weak_ptr<PthreadThread>(self);
Marc Slemko6f038a72006-08-03 18:58:09 +0000113 }
Marc Slemko66949872006-07-15 01:52:39 +0000114};
115
Marc Slemko8a40a762006-07-19 17:46:50 +0000116void* PthreadThread::threadMain(void* arg) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000117 shared_ptr<PthreadThread> thread = *(shared_ptr<PthreadThread>*)arg;
Marc Slemko6f038a72006-08-03 18:58:09 +0000118 delete reinterpret_cast<shared_ptr<PthreadThread>*>(arg);
119
Mark Sleef5f2be42006-09-05 21:05:31 +0000120 if (thread == NULL) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000121 return (void*)0;
122 }
123
Mark Slee2f6404d2006-10-10 01:37:40 +0000124 if (thread->state_ != starting) {
Marc Slemko8a40a762006-07-19 17:46:50 +0000125 return (void*)0;
126 }
127
Mark Slee2f6404d2006-10-10 01:37:40 +0000128 thread->state_ = starting;
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000129 thread->runnable()->run();
Mark Slee2f6404d2006-10-10 01:37:40 +0000130 if (thread->state_ != stopping && thread->state_ != stopped) {
131 thread->state_ = stopping;
Marc Slemko8a40a762006-07-19 17:46:50 +0000132 }
133
134 return (void*)0;
135}
136
Mark Sleef5f2be42006-09-05 21:05:31 +0000137/**
138 * POSIX Thread factory implementation
139 */
Marc Slemko66949872006-07-15 01:52:39 +0000140class PosixThreadFactory::Impl {
141
Mark Sleef5f2be42006-09-05 21:05:31 +0000142 private:
Mark Slee2f6404d2006-10-10 01:37:40 +0000143 POLICY policy_;
144 PRIORITY priority_;
145 int stackSize_;
146 bool detached_;
Marc Slemko66949872006-07-15 01:52:39 +0000147
Mark Sleef5f2be42006-09-05 21:05:31 +0000148 /**
149 * Converts generic posix thread schedule policy enums into pthread
150 * API values.
151 */
Marc Slemko66949872006-07-15 01:52:39 +0000152 static int toPthreadPolicy(POLICY policy) {
Mark Sleeb9ff32a2006-11-16 01:00:24 +0000153 switch (policy) {
154 case OTHER:
155 return SCHED_OTHER;
156 case FIFO:
157 return SCHED_FIFO;
158 case ROUND_ROBIN:
159 return SCHED_RR;
Marc Slemko66949872006-07-15 01:52:39 +0000160 }
Mark Sleeb9ff32a2006-11-16 01:00:24 +0000161 return SCHED_OTHER;
Marc Slemko66949872006-07-15 01:52:39 +0000162 }
163
Mark Sleef5f2be42006-09-05 21:05:31 +0000164 /**
165 * Converts relative thread priorities to absolute value based on posix
166 * thread scheduler policy
167 *
168 * The idea is simply to divide up the priority range for the given policy
169 * into the correpsonding relative priority level (lowest..highest) and
170 * then pro-rate accordingly.
171 */
Marc Slemko66949872006-07-15 01:52:39 +0000172 static int toPthreadPriority(POLICY policy, PRIORITY priority) {
Marc Slemko66949872006-07-15 01:52:39 +0000173 int pthread_policy = toPthreadPolicy(policy);
Marc Slemko66949872006-07-15 01:52:39 +0000174 int min_priority = sched_get_priority_min(pthread_policy);
Marc Slemko66949872006-07-15 01:52:39 +0000175 int max_priority = sched_get_priority_max(pthread_policy);
Marc Slemko66949872006-07-15 01:52:39 +0000176 int quanta = (HIGHEST - LOWEST) + 1;
Marc Slemko66949872006-07-15 01:52:39 +0000177 float stepsperquanta = (max_priority - min_priority) / quanta;
178
Mark Slee29050782006-09-29 00:12:30 +0000179 if (priority <= HIGHEST) {
Marc Slemko66949872006-07-15 01:52:39 +0000180 return (int)(min_priority + stepsperquanta * priority);
181 } else {
Marc Slemko66949872006-07-15 01:52:39 +0000182 // should never get here for priority increments.
Marc Slemko66949872006-07-15 01:52:39 +0000183 assert(false);
Marc Slemko66949872006-07-15 01:52:39 +0000184 return (int)(min_priority + stepsperquanta * NORMAL);
185 }
186 }
187
Mark Sleef5f2be42006-09-05 21:05:31 +0000188 public:
Marc Slemko66949872006-07-15 01:52:39 +0000189
190 Impl(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
Mark Slee2f6404d2006-10-10 01:37:40 +0000191 policy_(policy),
192 priority_(priority),
193 stackSize_(stackSize),
194 detached_(detached) {}
Marc Slemko66949872006-07-15 01:52:39 +0000195
Mark Sleef5f2be42006-09-05 21:05:31 +0000196 /**
197 * Creates a new POSIX thread to run the runnable object
198 *
199 * @param runnable A runnable object
200 */
Marc Slemko6f038a72006-08-03 18:58:09 +0000201 shared_ptr<Thread> newThread(shared_ptr<Runnable> runnable) const {
Mark Slee2f6404d2006-10-10 01:37:40 +0000202 shared_ptr<PthreadThread> result = shared_ptr<PthreadThread>(new PthreadThread(toPthreadPolicy(policy_), toPthreadPriority(policy_, priority_), stackSize_, runnable));
Marc Slemko6f038a72006-08-03 18:58:09 +0000203 result->weakRef(result);
204 runnable->thread(result);
205 return result;
Marc Slemko66949872006-07-15 01:52:39 +0000206 }
207
Mark Slee2f6404d2006-10-10 01:37:40 +0000208 int stackSize() const { return stackSize_; }
Marc Slemko66949872006-07-15 01:52:39 +0000209
Mark Slee2f6404d2006-10-10 01:37:40 +0000210 void stackSize(int value) { stackSize_ = value; }
Marc Slemko66949872006-07-15 01:52:39 +0000211
Mark Slee2f6404d2006-10-10 01:37:40 +0000212 PRIORITY priority() const { return priority_; }
Marc Slemko66949872006-07-15 01:52:39 +0000213
Mark Sleef5f2be42006-09-05 21:05:31 +0000214 /**
215 * Sets priority.
216 *
217 * XXX
218 * Need to handle incremental priorities properly.
219 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000220 void priority(PRIORITY value) { priority_ = value; }
Marc Slemko66949872006-07-15 01:52:39 +0000221};
222
223PosixThreadFactory::PosixThreadFactory(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
Mark Slee2f6404d2006-10-10 01:37:40 +0000224 impl_(new PosixThreadFactory::Impl(policy, priority, stackSize, detached)) {}
Marc Slemko66949872006-07-15 01:52:39 +0000225
Mark Slee2f6404d2006-10-10 01:37:40 +0000226shared_ptr<Thread> PosixThreadFactory::newThread(shared_ptr<Runnable> runnable) const { return impl_->newThread(runnable); }
Marc Slemko66949872006-07-15 01:52:39 +0000227
Mark Slee2f6404d2006-10-10 01:37:40 +0000228int PosixThreadFactory::stackSize() const { return impl_->stackSize(); }
Marc Slemko66949872006-07-15 01:52:39 +0000229
Mark Slee2f6404d2006-10-10 01:37:40 +0000230void PosixThreadFactory::stackSize(int value) { impl_->stackSize(value); }
Marc Slemko66949872006-07-15 01:52:39 +0000231
Mark Slee2f6404d2006-10-10 01:37:40 +0000232PosixThreadFactory::PRIORITY PosixThreadFactory::priority() const { return impl_->priority(); }
Marc Slemko66949872006-07-15 01:52:39 +0000233
Mark Slee2f6404d2006-10-10 01:37:40 +0000234void PosixThreadFactory::priority(PosixThreadFactory::PRIORITY value) { impl_->priority(value); }
Marc Slemko66949872006-07-15 01:52:39 +0000235
236}}} // facebook::thrift::concurrency