blob: d5d0d067bad47b0e80f5e9808d82bdca2dd3dd77 [file] [log] [blame]
// Copyright (c) 2006- Facebook
// Distributed under the Thrift Software License
//
// See accompanying file LICENSE or visit the Thrift site at:
// http://developers.facebook.com/thrift/
6
Marc Slemko66949872006-07-15 01:52:39 +00007#include "PosixThreadFactory.h"
8
9#include <assert.h>
10#include <pthread.h>
11
Marc Slemko6f038a72006-08-03 18:58:09 +000012#include <iostream>
13
14#include <boost/weak_ptr.hpp>
15
Marc Slemko66949872006-07-15 01:52:39 +000016namespace facebook { namespace thrift { namespace concurrency {
17
Marc Slemko6f038a72006-08-03 18:58:09 +000018using namespace boost;
19
Mark Sleef5f2be42006-09-05 21:05:31 +000020/**
21 * The POSIX thread class.
22 *
23 * @author marc
24 * @version $Id:$
25 */
Marc Slemko66949872006-07-15 01:52:39 +000026class PthreadThread: public Thread {
Mark Sleef5f2be42006-09-05 21:05:31 +000027 public:
Marc Slemko66949872006-07-15 01:52:39 +000028
Mark Sleef5f2be42006-09-05 21:05:31 +000029 enum STATE {
30 uninitialized,
31 starting,
32 started,
33 stopping,
34 stopped
Marc Slemko66949872006-07-15 01:52:39 +000035 };
36
37 static const int MB = 1024 * 1024;
38
Marc Slemko8a40a762006-07-19 17:46:50 +000039 static void* threadMain(void* arg);
40
Mark Sleef5f2be42006-09-05 21:05:31 +000041 private:
Mark Slee2f6404d2006-10-10 01:37:40 +000042 pthread_t pthread_;
43 STATE state_;
44 int policy_;
45 int priority_;
46 int stackSize_;
47 weak_ptr<PthreadThread> self_;
Marc Slemko6f038a72006-08-03 18:58:09 +000048
Mark Sleef5f2be42006-09-05 21:05:31 +000049 public:
Marc Slemko66949872006-07-15 01:52:39 +000050
Marc Slemko6f038a72006-08-03 18:58:09 +000051 PthreadThread(int policy, int priority, int stackSize, shared_ptr<Runnable> runnable) :
Mark Slee2f6404d2006-10-10 01:37:40 +000052 pthread_(0),
53 state_(uninitialized),
54 policy_(policy),
55 priority_(priority),
56 stackSize_(stackSize) {
Marc Slemkofe5ba12e2006-07-20 21:16:27 +000057
58 this->Thread::runnable(runnable);
59 }
Marc Slemko66949872006-07-15 01:52:39 +000060
Mark Sleef5f2be42006-09-05 21:05:31 +000061 ~PthreadThread() {}
Marc Slemko6f038a72006-08-03 18:58:09 +000062
Marc Slemko66949872006-07-15 01:52:39 +000063 void start() {
Mark Slee2f6404d2006-10-10 01:37:40 +000064 if (state_ != uninitialized) {
Marc Slemko66949872006-07-15 01:52:39 +000065 return;
66 }
67
Mark Slee2f6404d2006-10-10 01:37:40 +000068 state_ = starting;
Marc Slemko66949872006-07-15 01:52:39 +000069
70 pthread_attr_t thread_attr;
Marc Slemko66949872006-07-15 01:52:39 +000071 assert(pthread_attr_init(&thread_attr) == 0);
Marc Slemko66949872006-07-15 01:52:39 +000072 assert(pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_JOINABLE) == 0);
73
74 // Set thread stack size
Mark Slee2f6404d2006-10-10 01:37:40 +000075 assert(pthread_attr_setstacksize(&thread_attr, MB * stackSize_) == 0);
Marc Slemko66949872006-07-15 01:52:39 +000076
77 // Set thread policy
Mark Slee2f6404d2006-10-10 01:37:40 +000078 assert(pthread_attr_setschedpolicy(&thread_attr, policy_) == 0);
Marc Slemko66949872006-07-15 01:52:39 +000079
80 struct sched_param sched_param;
Mark Slee2f6404d2006-10-10 01:37:40 +000081 sched_param.sched_priority = priority_;
Marc Slemko66949872006-07-15 01:52:39 +000082
83 // Set thread priority
Marc Slemko6f038a72006-08-03 18:58:09 +000084 assert(pthread_attr_setschedparam(&thread_attr, &sched_param) == 0);
Marc Slemko66949872006-07-15 01:52:39 +000085
Mark Sleef5f2be42006-09-05 21:05:31 +000086 // Create reference
Marc Slemko6f038a72006-08-03 18:58:09 +000087 shared_ptr<PthreadThread>* selfRef = new shared_ptr<PthreadThread>();
Mark Slee2f6404d2006-10-10 01:37:40 +000088 *selfRef = self_.lock();
89 assert(pthread_create(&pthread_, &thread_attr, threadMain, (void*)selfRef) == 0);
Marc Slemko66949872006-07-15 01:52:39 +000090 }
91
92 void join() {
Mark Slee2f6404d2006-10-10 01:37:40 +000093 if (state_ != stopped) {
Marc Slemko66949872006-07-15 01:52:39 +000094 void* ignore;
Mark Slee2f6404d2006-10-10 01:37:40 +000095 pthread_join(pthread_, &ignore);
Marc Slemko66949872006-07-15 01:52:39 +000096 }
97 }
98
Mark Sleef5f2be42006-09-05 21:05:31 +000099 shared_ptr<Runnable> runnable() const { return Thread::runnable(); }
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000100
Mark Sleef5f2be42006-09-05 21:05:31 +0000101 void runnable(shared_ptr<Runnable> value) { Thread::runnable(value); }
Marc Slemko66949872006-07-15 01:52:39 +0000102
Marc Slemko6f038a72006-08-03 18:58:09 +0000103 void weakRef(shared_ptr<PthreadThread> self) {
104 assert(self.get() == this);
Mark Slee2f6404d2006-10-10 01:37:40 +0000105 self_ = weak_ptr<PthreadThread>(self);
Marc Slemko6f038a72006-08-03 18:58:09 +0000106 }
Marc Slemko66949872006-07-15 01:52:39 +0000107};
108
Marc Slemko8a40a762006-07-19 17:46:50 +0000109void* PthreadThread::threadMain(void* arg) {
110 // XXX need a lock here when testing thread state
Marc Slemko6f038a72006-08-03 18:58:09 +0000111 shared_ptr<PthreadThread> thread = *(shared_ptr<PthreadThread>*)arg;
Marc Slemko6f038a72006-08-03 18:58:09 +0000112 delete reinterpret_cast<shared_ptr<PthreadThread>*>(arg);
113
Mark Sleef5f2be42006-09-05 21:05:31 +0000114 if (thread == NULL) {
Marc Slemko6f038a72006-08-03 18:58:09 +0000115 return (void*)0;
116 }
117
Mark Slee2f6404d2006-10-10 01:37:40 +0000118 if (thread->state_ != starting) {
Marc Slemko8a40a762006-07-19 17:46:50 +0000119 return (void*)0;
120 }
121
Mark Slee2f6404d2006-10-10 01:37:40 +0000122 thread->state_ = starting;
Marc Slemkofe5ba12e2006-07-20 21:16:27 +0000123 thread->runnable()->run();
Mark Slee2f6404d2006-10-10 01:37:40 +0000124 if (thread->state_ != stopping && thread->state_ != stopped) {
125 thread->state_ = stopping;
Marc Slemko8a40a762006-07-19 17:46:50 +0000126 }
127
128 return (void*)0;
129}
130
Mark Sleef5f2be42006-09-05 21:05:31 +0000131/**
132 * POSIX Thread factory implementation
133 */
Marc Slemko66949872006-07-15 01:52:39 +0000134class PosixThreadFactory::Impl {
135
Mark Sleef5f2be42006-09-05 21:05:31 +0000136 private:
Mark Slee2f6404d2006-10-10 01:37:40 +0000137 POLICY policy_;
138 PRIORITY priority_;
139 int stackSize_;
140 bool detached_;
Marc Slemko66949872006-07-15 01:52:39 +0000141
Mark Sleef5f2be42006-09-05 21:05:31 +0000142 /**
143 * Converts generic posix thread schedule policy enums into pthread
144 * API values.
145 */
Marc Slemko66949872006-07-15 01:52:39 +0000146 static int toPthreadPolicy(POLICY policy) {
Mark Sleeb9ff32a2006-11-16 01:00:24 +0000147 switch (policy) {
148 case OTHER:
149 return SCHED_OTHER;
150 case FIFO:
151 return SCHED_FIFO;
152 case ROUND_ROBIN:
153 return SCHED_RR;
Marc Slemko66949872006-07-15 01:52:39 +0000154 }
Mark Sleeb9ff32a2006-11-16 01:00:24 +0000155 return SCHED_OTHER;
Marc Slemko66949872006-07-15 01:52:39 +0000156 }
157
Mark Sleef5f2be42006-09-05 21:05:31 +0000158 /**
159 * Converts relative thread priorities to absolute value based on posix
160 * thread scheduler policy
161 *
162 * The idea is simply to divide up the priority range for the given policy
163 * into the correpsonding relative priority level (lowest..highest) and
164 * then pro-rate accordingly.
165 */
Marc Slemko66949872006-07-15 01:52:39 +0000166 static int toPthreadPriority(POLICY policy, PRIORITY priority) {
Marc Slemko66949872006-07-15 01:52:39 +0000167 int pthread_policy = toPthreadPolicy(policy);
Marc Slemko66949872006-07-15 01:52:39 +0000168 int min_priority = sched_get_priority_min(pthread_policy);
Marc Slemko66949872006-07-15 01:52:39 +0000169 int max_priority = sched_get_priority_max(pthread_policy);
Marc Slemko66949872006-07-15 01:52:39 +0000170 int quanta = (HIGHEST - LOWEST) + 1;
Marc Slemko66949872006-07-15 01:52:39 +0000171 float stepsperquanta = (max_priority - min_priority) / quanta;
172
Mark Slee29050782006-09-29 00:12:30 +0000173 if (priority <= HIGHEST) {
Marc Slemko66949872006-07-15 01:52:39 +0000174 return (int)(min_priority + stepsperquanta * priority);
175 } else {
Marc Slemko66949872006-07-15 01:52:39 +0000176 // should never get here for priority increments.
Marc Slemko66949872006-07-15 01:52:39 +0000177 assert(false);
Marc Slemko66949872006-07-15 01:52:39 +0000178 return (int)(min_priority + stepsperquanta * NORMAL);
179 }
180 }
181
Mark Sleef5f2be42006-09-05 21:05:31 +0000182 public:
Marc Slemko66949872006-07-15 01:52:39 +0000183
184 Impl(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
Mark Slee2f6404d2006-10-10 01:37:40 +0000185 policy_(policy),
186 priority_(priority),
187 stackSize_(stackSize),
188 detached_(detached) {}
Marc Slemko66949872006-07-15 01:52:39 +0000189
Mark Sleef5f2be42006-09-05 21:05:31 +0000190 /**
191 * Creates a new POSIX thread to run the runnable object
192 *
193 * @param runnable A runnable object
194 */
Marc Slemko6f038a72006-08-03 18:58:09 +0000195 shared_ptr<Thread> newThread(shared_ptr<Runnable> runnable) const {
Mark Slee2f6404d2006-10-10 01:37:40 +0000196 shared_ptr<PthreadThread> result = shared_ptr<PthreadThread>(new PthreadThread(toPthreadPolicy(policy_), toPthreadPriority(policy_, priority_), stackSize_, runnable));
Marc Slemko6f038a72006-08-03 18:58:09 +0000197 result->weakRef(result);
198 runnable->thread(result);
199 return result;
Marc Slemko66949872006-07-15 01:52:39 +0000200 }
201
Mark Slee2f6404d2006-10-10 01:37:40 +0000202 int stackSize() const { return stackSize_; }
Marc Slemko66949872006-07-15 01:52:39 +0000203
Mark Slee2f6404d2006-10-10 01:37:40 +0000204 void stackSize(int value) { stackSize_ = value; }
Marc Slemko66949872006-07-15 01:52:39 +0000205
Mark Slee2f6404d2006-10-10 01:37:40 +0000206 PRIORITY priority() const { return priority_; }
Marc Slemko66949872006-07-15 01:52:39 +0000207
Mark Sleef5f2be42006-09-05 21:05:31 +0000208 /**
209 * Sets priority.
210 *
211 * XXX
212 * Need to handle incremental priorities properly.
213 */
Mark Slee2f6404d2006-10-10 01:37:40 +0000214 void priority(PRIORITY value) { priority_ = value; }
Marc Slemko66949872006-07-15 01:52:39 +0000215};
216
217PosixThreadFactory::PosixThreadFactory(POLICY policy, PRIORITY priority, int stackSize, bool detached) :
Mark Slee2f6404d2006-10-10 01:37:40 +0000218 impl_(new PosixThreadFactory::Impl(policy, priority, stackSize, detached)) {}
Marc Slemko66949872006-07-15 01:52:39 +0000219
Mark Slee2f6404d2006-10-10 01:37:40 +0000220shared_ptr<Thread> PosixThreadFactory::newThread(shared_ptr<Runnable> runnable) const { return impl_->newThread(runnable); }
Marc Slemko66949872006-07-15 01:52:39 +0000221
Mark Slee2f6404d2006-10-10 01:37:40 +0000222int PosixThreadFactory::stackSize() const { return impl_->stackSize(); }
Marc Slemko66949872006-07-15 01:52:39 +0000223
Mark Slee2f6404d2006-10-10 01:37:40 +0000224void PosixThreadFactory::stackSize(int value) { impl_->stackSize(value); }
Marc Slemko66949872006-07-15 01:52:39 +0000225
Mark Slee2f6404d2006-10-10 01:37:40 +0000226PosixThreadFactory::PRIORITY PosixThreadFactory::priority() const { return impl_->priority(); }
Marc Slemko66949872006-07-15 01:52:39 +0000227
Mark Slee2f6404d2006-10-10 01:37:40 +0000228void PosixThreadFactory::priority(PosixThreadFactory::PRIORITY value) { impl_->priority(value); }
Marc Slemko66949872006-07-15 01:52:39 +0000229
230}}} // facebook::thrift::concurrency