blob: a043418d5f1315c0755e5a602973cf44376cbfaa [file] [log] [blame]
// Copyright (c) 2006- Facebook
// Distributed under the Thrift Software License
//
// See accompanying file LICENSE or visit the Thrift site at:
// http://developers.facebook.com/thrift/
6
Marc Slemko66949872006-07-15 01:52:39 +00007#include "PosixThreadFactory.h"
8
9#include <assert.h>
10#include <pthread.h>
11
Marc Slemko6f038a72006-08-03 18:58:09 +000012#include <iostream>
13
14#include <boost/weak_ptr.hpp>
15
Marc Slemko66949872006-07-15 01:52:39 +000016namespace facebook { namespace thrift { namespace concurrency {
17
Marc Slemko6f038a72006-08-03 18:58:09 +000018using namespace boost;
19
Mark Sleef5f2be42006-09-05 21:05:31 +000020/**
21 * The POSIX thread class.
22 *
23 * @author marc
24 * @version $Id:$
25 */
Marc Slemko66949872006-07-15 01:52:39 +000026class PthreadThread: public Thread {
Mark Sleef5f2be42006-09-05 21:05:31 +000027 public:
Marc Slemko66949872006-07-15 01:52:39 +000028
Mark Sleef5f2be42006-09-05 21:05:31 +000029 enum STATE {
30 uninitialized,
31 starting,
32 started,
33 stopping,
34 stopped
Marc Slemko66949872006-07-15 01:52:39 +000035 };
36
37 static const int MB = 1024 * 1024;
38
Marc Slemko8a40a762006-07-19 17:46:50 +000039 static void* threadMain(void* arg);
40
Mark Sleef5f2be42006-09-05 21:05:31 +000041 private:
Mark Slee2f6404d2006-10-10 01:37:40 +000042 pthread_t pthread_;
43 STATE state_;
44 int policy_;
45 int priority_;
46 int stackSize_;
47 weak_ptr<PthreadThread> self_;
Marc Slemko6f038a72006-08-03 18:58:09 +000048
Mark Sleef5f2be42006-09-05 21:05:31 +000049 public:
Marc Slemko66949872006-07-15 01:52:39 +000050
Marc Slemko6f038a72006-08-03 18:58:09 +000051 PthreadThread(int policy, int priority, int stackSize, shared_ptr<Runnable> runnable) :
Mark Slee2f6404d2006-10-10 01:37:40 +000052 pthread_(0),
53 state_(uninitialized),
54 policy_(policy),
55 priority_(priority),
56 stackSize_(stackSize) {
Marc Slemkofe5ba12e2006-07-20 21:16:27 +000057
58 this->Thread::runnable(runnable);
59 }
Marc Slemko66949872006-07-15 01:52:39 +000060
Mark Sleef5f2be42006-09-05 21:05:31 +000061 ~PthreadThread() {}
Marc Slemko6f038a72006-08-03 18:58:09 +000062
Marc Slemko66949872006-07-15 01:52:39 +000063 void start() {
Mark Slee2f6404d2006-10-10 01:37:40 +000064 if (state_ != uninitialized) {
Marc Slemko66949872006-07-15 01:52:39 +000065 return;
66 }
67
Mark Slee2f6404d2006-10-10 01:37:40 +000068 state_ = starting;
Marc Slemko66949872006-07-15 01:52:39 +000069
70 pthread_attr_t thread_attr;
Aditya Agarwal9dc57402007-03-31 17:45:12 +000071 int ret = pthread_attr_init(&thread_attr);
72 assert(ret);
73
74 ret = pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_JOINABLE);
75 assert(ret);
Marc Slemko66949872006-07-15 01:52:39 +000076
77 // Set thread stack size
Aditya Agarwal9dc57402007-03-31 17:45:12 +000078 ret = pthread_attr_setstacksize(&thread_attr, MB * stackSize_);
79 assert(ret);
Marc Slemko66949872006-07-15 01:52:39 +000080
81 // Set thread policy
Aditya Agarwal9dc57402007-03-31 17:45:12 +000082 ret = pthread_attr_setschedpolicy(&thread_attr, policy_);
83 assert(ret);
Marc Slemko66949872006-07-15 01:52:39 +000084
85 struct sched_param sched_param;
Mark Slee2f6404d2006-10-10 01:37:40 +000086 sched_param.sched_priority = priority_;
Marc Slemko66949872006-07-15 01:52:39 +000087
88 // Set thread priority
Aditya Agarwal9dc57402007-03-31 17:45:12 +000089 ret = pthread_attr_setschedparam(&thread_attr, &sched_param);
90 assert(ret);
Marc Slemko66949872006-07-15 01:52:39 +000091
Mark Sleef5f2be42006-09-05 21:05:31 +000092 // Create reference
Marc Slemko6f038a72006-08-03 18:58:09 +000093 shared_ptr<PthreadThread>* selfRef = new shared_ptr<PthreadThread>();
Mark Slee2f6404d2006-10-10 01:37:40 +000094 *selfRef = self_.lock();
Aditya Agarwal9dc57402007-03-31 17:45:12 +000095 ret = pthread_create(&pthread_, &thread_attr, threadMain, (void*)selfRef);
96 assert(ret);
Marc Slemko66949872006-07-15 01:52:39 +000097 }
98
99 void join() {
Mark Slee2f6404d2006-10-10 01:37:40 +0000100 if (state_ != stopped) {
Marc Slemko66949872006-07-15 01:52:39 +0000101 void* ignore;
Mark Slee2f6404d2006-10-10 01:37:40 +0000102 pthread_join(pthread_, &ignore);
Marc Slemko66949872006-07-15 01:52:39 +0000103 }
104 }
105
Mark Sleef5f2be42006-09-05 21:05:31 +0000106 shared_ptr<Runnable> runnable() const { return Thread::runnable(); }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000107
Mark Sleef5f2be42006-09-05 21:05:31 +0000108 void runnable(shared_ptr<Runnable> value) { Thread::runnable(value); }
Marc Slemko66949872006-07-15 01:52:39 +0000109
Marc Slemko6f038a72006-08-03 18:58:09 +0000110 void weakRef(shared_ptr<PthreadThread> self) {
Aditya Agarwal9dc57402007-03-31 17:45:12 +0000111 bool ret = (self.get() == this);
112 assert(ret);
Mark Slee2f6404d2006-10-10 01:37:40 +0000113 self_ = weak_ptr<PthreadThread>(self);
Marc Slemko6f038a72006-08-03 18:58:09 +0000114 }
Marc Slemko66949872006-07-15 01:52:39 +0000115};
116
Marc Slemko8a40a762006-07-19 17:46:50 +0000117void* PthreadThread::threadMain(void* arg) {
118 // XXX need a lock here when testing thread state
Marc Slemko6f038a72006-08-03 18:58:09 +0000119 shared_ptr<PthreadThread> thread = *(shared_ptr<PthreadThread>*)arg;
Marc Slemko6f038a72006-08-03 18:58:09 +0000120 delete reinterpret_cast<shared_ptr<PthreadThread>*>(arg);
121
Mark Sleef5f2be42006-09-05 21:05:31 +0000122 if (thread == NULL) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000123 return (void*)0;
124 }
125
Mark Slee2f6404d2006-10-10 01:37:40 +0000126 if (thread->state_ != starting) {
Marc Slemko8a40a762006-07-19 17:46:50 +0000127 return (void*)0;
128 }
129
Mark Slee2f6404d2006-10-10 01:37:40 +0000130 thread->state_ = starting;
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000131 thread->runnable()->run();
Mark Slee2f6404d2006-10-10 01:37:40 +0000132 if (thread->state_ != stopping && thread->state_ != stopped) {
133 thread->state_ = stopping;
Marc Slemko8a40a762006-07-19 17:46:50 +0000134 }
135
136 return (void*)0;
137}
138
Mark Sleef5f2be42006-09-05 21:05:31 +0000139/**
140 * POSIX Thread factory implementation
141 */
Marc Slemko66949872006-07-15 01:52:39 +0000142class PosixThreadFactory::Impl {
143
Mark Sleef5f2be42006-09-05 21:05:31 +0000144 private:
Mark Slee2f6404d2006-10-10 01:37:40 +0000145 POLICY policy_;
146 PRIORITY priority_;
147 int stackSize_;
148 bool detached_;
Marc Slemko66949872006-07-15 01:52:39 +0000149
Mark Sleef5f2be42006-09-05 21:05:31 +0000150 /**
151 * Converts generic posix thread schedule policy enums into pthread
152 * API values.
153 */
Marc Slemko66949872006-07-15 01:52:39 +0000154 static int toPthreadPolicy(POLICY policy) {
Mark Sleeb9ff32a2006-11-16 01:00:24 +0000155 switch (policy) {
156 case OTHER:
157 return SCHED_OTHER;
158 case FIFO:
159 return SCHED_FIFO;
160 case ROUND_ROBIN:
161 return SCHED_RR;
Marc Slemko66949872006-07-15 01:52:39 +0000162 }
Mark Sleeb9ff32a2006-11-16 01:00:24 +0000163 return SCHED_OTHER;
Marc Slemko66949872006-07-15 01:52:39 +0000164 }
165
Mark Sleef5f2be42006-09-05 21:05:31 +0000166 /**
167 * Converts relative thread priorities to absolute value based on posix
168 * thread scheduler policy
169 *
170 * The idea is simply to divide up the priority range for the given policy
171 * into the correpsonding relative priority level (lowest..highest) and
172 * then pro-rate accordingly.
173 */
Marc Slemko66949872006-07-15 01:52:39 +0000174 static int toPthreadPriority(POLICY policy, PRIORITY priority) {
Marc Slemko66949872006-07-15 01:52:39 +0000175 int pthread_policy = toPthreadPolicy(policy);
Marc Slemko66949872006-07-15 01:52:39 +0000176 int min_priority = sched_get_priority_min(pthread_policy);
Marc Slemko66949872006-07-15 01:52:39 +0000177 int max_priority = sched_get_priority_max(pthread_policy);
Marc Slemko66949872006-07-15 01:52:39 +0000178 int quanta = (HIGHEST - LOWEST) + 1;
Marc Slemko66949872006-07-15 01:52:39 +0000179 float stepsperquanta = (max_priority - min_priority) / quanta;
180
Mark Slee29050782006-09-29 00:12:30 +0000181 if (priority <= HIGHEST) {
Marc Slemko66949872006-07-15 01:52:39 +0000182 return (int)(min_priority + stepsperquanta * priority);
183 } else {
Marc Slemko66949872006-07-15 01:52:39 +0000184 // should never get here for priority increments.
Marc Slemko66949872006-07-15 01:52:39 +0000185 assert(false);
Marc Slemko66949872006-07-15 01:52:39 +0000186 return (int)(min_priority + stepsperquanta * NORMAL);
187 }
188 }
189
Mark Sleef5f2be42006-09-05 21:05:31 +0000190 public:
Marc Slemko66949872006-07-15 01:52:39 +0000191
192 Impl(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
Mark Slee2f6404d2006-10-10 01:37:40 +0000193 policy_(policy),
194 priority_(priority),
195 stackSize_(stackSize),
196 detached_(detached) {}
Marc Slemko66949872006-07-15 01:52:39 +0000197
Mark Sleef5f2be42006-09-05 21:05:31 +0000198 /**
199 * Creates a new POSIX thread to run the runnable object
200 *
201 * @param runnable A runnable object
202 */
Marc Slemko6f038a72006-08-03 18:58:09 +0000203 shared_ptr<Thread> newThread(shared_ptr<Runnable> runnable) const {
Mark Slee2f6404d2006-10-10 01:37:40 +0000204 shared_ptr<PthreadThread> result = shared_ptr<PthreadThread>(new PthreadThread(toPthreadPolicy(policy_), toPthreadPriority(policy_, priority_), stackSize_, runnable));
Marc Slemko6f038a72006-08-03 18:58:09 +0000205 result->weakRef(result);
206 runnable->thread(result);
207 return result;
Marc Slemko66949872006-07-15 01:52:39 +0000208 }
209
Mark Slee2f6404d2006-10-10 01:37:40 +0000210 int stackSize() const { return stackSize_; }
Marc Slemko66949872006-07-15 01:52:39 +0000211
Mark Slee2f6404d2006-10-10 01:37:40 +0000212 void stackSize(int value) { stackSize_ = value; }
Marc Slemko66949872006-07-15 01:52:39 +0000213
Mark Slee2f6404d2006-10-10 01:37:40 +0000214 PRIORITY priority() const { return priority_; }
Marc Slemko66949872006-07-15 01:52:39 +0000215
Mark Sleef5f2be42006-09-05 21:05:31 +0000216 /**
217 * Sets priority.
218 *
219 * XXX
220 * Need to handle incremental priorities properly.
221 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000222 void priority(PRIORITY value) { priority_ = value; }
Marc Slemko66949872006-07-15 01:52:39 +0000223};
224
225PosixThreadFactory::PosixThreadFactory(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
Mark Slee2f6404d2006-10-10 01:37:40 +0000226 impl_(new PosixThreadFactory::Impl(policy, priority, stackSize, detached)) {}
Marc Slemko66949872006-07-15 01:52:39 +0000227
Mark Slee2f6404d2006-10-10 01:37:40 +0000228shared_ptr<Thread> PosixThreadFactory::newThread(shared_ptr<Runnable> runnable) const { return impl_->newThread(runnable); }
Marc Slemko66949872006-07-15 01:52:39 +0000229
Mark Slee2f6404d2006-10-10 01:37:40 +0000230int PosixThreadFactory::stackSize() const { return impl_->stackSize(); }
Marc Slemko66949872006-07-15 01:52:39 +0000231
Mark Slee2f6404d2006-10-10 01:37:40 +0000232void PosixThreadFactory::stackSize(int value) { impl_->stackSize(value); }
Marc Slemko66949872006-07-15 01:52:39 +0000233
Mark Slee2f6404d2006-10-10 01:37:40 +0000234PosixThreadFactory::PRIORITY PosixThreadFactory::priority() const { return impl_->priority(); }
Marc Slemko66949872006-07-15 01:52:39 +0000235
Mark Slee2f6404d2006-10-10 01:37:40 +0000236void PosixThreadFactory::priority(PosixThreadFactory::PRIORITY value) { impl_->priority(value); }
Marc Slemko66949872006-07-15 01:52:39 +0000237
238}}} // facebook::thrift::concurrency