| Marc Slemko | 6694987 | 2006-07-15 01:52:39 +0000 | [diff] [blame] | 1 | #include "PosixThreadFactory.h" | 
 | 2 |  | 
 | 3 | #include <assert.h> | 
 | 4 | #include <pthread.h> | 
 | 5 |  | 
 | 6 | namespace facebook { namespace thrift { namespace concurrency { | 
 | 7 |  | 
| Marc Slemko | 0e53ccd | 2006-07-17 23:51:05 +0000 | [diff] [blame^] | 8 | /**  The POSIX thread class.  | 
 | 9 |  | 
 | 10 |      @author marc | 
 | 11 |      @version $Id:$ */ | 
| Marc Slemko | 6694987 | 2006-07-15 01:52:39 +0000 | [diff] [blame] | 12 |  | 
 | 13 | class PthreadThread: public Thread { | 
 | 14 |  | 
 | 15 | public: | 
 | 16 |   enum STATE {uninitialized,  | 
 | 17 | 	      starting, | 
 | 18 | 	      started, | 
 | 19 | 	      stopping, | 
 | 20 | 	      stopped | 
 | 21 |   }; | 
 | 22 |  | 
 | 23 |   static const int MB = 1024 * 1024; | 
 | 24 |  | 
 | 25 | private: | 
 | 26 |  | 
 | 27 |   pthread_t _pthread; | 
 | 28 |  | 
 | 29 |   STATE _state; | 
 | 30 |  | 
 | 31 |   int _policy; | 
 | 32 |  | 
 | 33 |   int _priority; | 
 | 34 |  | 
 | 35 |   int _stackSize; | 
 | 36 |  | 
 | 37 |   Runnable* _runnable; | 
 | 38 |  | 
 | 39 |   static void* threadMain(void* arg) { | 
 | 40 |  | 
 | 41 |     // XXX need a lock here when testing thread state | 
 | 42 |  | 
 | 43 |     PthreadThread* thread = (PthreadThread*)arg; | 
 | 44 |  | 
 | 45 |     if(thread->_state != starting) { | 
 | 46 |       return (void*)0; | 
 | 47 |     } | 
 | 48 |  | 
 | 49 |     thread->_state = starting; | 
 | 50 |  | 
 | 51 |     thread->_runnable->run(); | 
 | 52 |  | 
 | 53 |     if(thread->_state != stopping && thread->_state != stopped) { | 
 | 54 |       thread->_state = stopping; | 
 | 55 |     } | 
 | 56 |      | 
 | 57 |     return (void*)0; | 
 | 58 |   } | 
 | 59 |  | 
 | 60 | public: | 
 | 61 |    | 
 | 62 |   PthreadThread(int policy, int priority, int stackSize, Runnable* runnable) :  | 
 | 63 |     _pthread(0), | 
 | 64 |     _state(uninitialized),  | 
 | 65 |     _policy(policy), | 
 | 66 |     _priority(priority), | 
 | 67 |     _stackSize(stackSize), | 
 | 68 |     _runnable(runnable) | 
 | 69 |   {} | 
 | 70 |  | 
 | 71 |   void start() { | 
 | 72 |  | 
 | 73 |     if(_state != uninitialized) { | 
 | 74 |       return; | 
 | 75 |     } | 
 | 76 |  | 
 | 77 |     _state = starting; | 
 | 78 |  | 
 | 79 |     pthread_attr_t thread_attr; | 
 | 80 |  | 
 | 81 |     assert(pthread_attr_init(&thread_attr) == 0); | 
 | 82 |  | 
 | 83 |     assert(pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_JOINABLE) == 0); | 
 | 84 |  | 
 | 85 |     // Set thread stack size | 
 | 86 |  | 
 | 87 |     assert(pthread_attr_setstacksize(&thread_attr, MB * _stackSize) == 0); | 
 | 88 |  | 
 | 89 |     // Set thread policy | 
 | 90 |  | 
 | 91 |     assert(pthread_attr_setschedpolicy(&thread_attr, _policy) == 0); | 
 | 92 |  | 
 | 93 |     struct sched_param sched_param; | 
 | 94 |     sched_param.sched_priority = _priority; | 
 | 95 |  | 
 | 96 |     // Set thread priority | 
 | 97 |  | 
 | 98 |     assert(pthread_attr_setschedparam(&thread_attr, &sched_param) == 0); | 
 | 99 |  | 
 | 100 |     assert(pthread_create(&_pthread, &thread_attr, PthreadThread::threadMain, (void*)this) == 0); | 
 | 101 |   } | 
 | 102 |  | 
 | 103 |   void join() { | 
 | 104 |      | 
 | 105 |     if(_state != stopped) { | 
 | 106 |        | 
 | 107 |       void* ignore; | 
 | 108 |        | 
 | 109 |       pthread_join(_pthread, &ignore); | 
 | 110 |     } | 
 | 111 |   } | 
 | 112 |  | 
 | 113 |   const Runnable* runnable() const {return _runnable;} | 
 | 114 |  | 
 | 115 | }; | 
 | 116 |  | 
 | 117 | /** POSIX Thread factory implementation */ | 
 | 118 |  | 
 | 119 | class PosixThreadFactory::Impl { | 
 | 120 |  | 
 | 121 | private: | 
 | 122 |  | 
 | 123 |   POLICY _policy; | 
 | 124 |  | 
 | 125 |   PRIORITY _priority; | 
 | 126 |  | 
 | 127 |   int _stackSize; | 
 | 128 |  | 
 | 129 |   bool _detached; | 
 | 130 |  | 
 | 131 |   /** Converts generic posix thread schedule policy enums into pthread API values. */ | 
 | 132 |  | 
 | 133 |   static int toPthreadPolicy(POLICY policy) { | 
 | 134 |     switch(policy) { | 
 | 135 |     case OTHER: return SCHED_OTHER; break; | 
 | 136 |     case FIFO: return SCHED_FIFO; break; | 
 | 137 |     case ROUND_ROBIN: return SCHED_RR; break; | 
 | 138 |     default: return SCHED_OTHER; break; | 
 | 139 |     } | 
 | 140 |   } | 
 | 141 |  | 
 | 142 |   /** Converts relative thread priorities to absolute value based on posix thread scheduler policy | 
 | 143 |  | 
 | 144 |       The idea is simply to divide up the priority range for the given policy into the correpsonding relative | 
| Marc Slemko | 0e53ccd | 2006-07-17 23:51:05 +0000 | [diff] [blame^] | 145 |       priority level (lowest..highest) and then pro-rate accordingly. */ | 
| Marc Slemko | 6694987 | 2006-07-15 01:52:39 +0000 | [diff] [blame] | 146 |  | 
 | 147 |   static int toPthreadPriority(POLICY policy, PRIORITY priority) { | 
 | 148 |  | 
 | 149 |     int pthread_policy = toPthreadPolicy(policy); | 
 | 150 |  | 
 | 151 |     int min_priority = sched_get_priority_min(pthread_policy); | 
 | 152 |  | 
 | 153 |     int max_priority = sched_get_priority_max(pthread_policy); | 
 | 154 |  | 
 | 155 |     int quanta = (HIGHEST - LOWEST) + 1; | 
 | 156 |  | 
 | 157 |     float stepsperquanta = (max_priority - min_priority) / quanta; | 
 | 158 |  | 
 | 159 |     if(priority <= HIGHEST) { | 
 | 160 |  | 
 | 161 |       return (int)(min_priority + stepsperquanta * priority); | 
 | 162 |     } else { | 
 | 163 |  | 
 | 164 |       // should never get here for priority increments. | 
 | 165 |  | 
 | 166 |       assert(false); | 
 | 167 |  | 
 | 168 |       return (int)(min_priority + stepsperquanta * NORMAL); | 
 | 169 |     } | 
 | 170 |   } | 
 | 171 |  | 
 | 172 | public: | 
 | 173 |  | 
 | 174 |   Impl(POLICY policy, PRIORITY priority, int stackSize, bool detached) :  | 
 | 175 |     _policy(policy), | 
 | 176 |     _priority(priority), | 
 | 177 |     _stackSize(stackSize), | 
 | 178 |     _detached(detached) { | 
 | 179 |   } | 
 | 180 |  | 
 | 181 |   /** Creates a new POSIX thread to run the runnable object  | 
 | 182 |  | 
 | 183 |       @param runnable A runnable object */ | 
 | 184 |  | 
 | 185 |   Thread* newThread(Runnable* runnable) const { | 
 | 186 |  | 
 | 187 |     return new PthreadThread(toPthreadPolicy(_policy), toPthreadPriority(_policy, _priority), _stackSize, runnable); | 
 | 188 |   } | 
 | 189 |  | 
 | 190 |   int stackSize() const { return _stackSize;} | 
 | 191 |  | 
 | 192 |   void stackSize(int value) { _stackSize = value;} | 
 | 193 |  | 
 | 194 |   PRIORITY priority() const { return _priority;} | 
 | 195 |  | 
 | 196 |   /** Sets priority. | 
 | 197 |        | 
 | 198 |       XXX | 
 | 199 |       Need to handle incremental priorities properl. */ | 
 | 200 |  | 
 | 201 |   void priority(PRIORITY value) { _priority = value;} | 
 | 202 |  | 
 | 203 | }; | 
 | 204 |  | 
 | 205 | PosixThreadFactory::PosixThreadFactory(POLICY policy, PRIORITY priority, int stackSize, bool detached) :  | 
 | 206 |   _impl(new PosixThreadFactory::Impl(policy, priority, stackSize, detached)) {} | 
 | 207 |  | 
 | 208 | Thread* PosixThreadFactory::newThread(Runnable* runnable) const {return _impl->newThread(runnable);} | 
 | 209 |  | 
 | 210 | int PosixThreadFactory::stackSize() const {return _impl->stackSize();} | 
 | 211 |  | 
 | 212 | void PosixThreadFactory::stackSize(int value) {_impl->stackSize(value);} | 
 | 213 |  | 
 | 214 | PosixThreadFactory::PRIORITY PosixThreadFactory::priority() const {return _impl->priority();} | 
 | 215 |  | 
 | 216 | void PosixThreadFactory::priority(PosixThreadFactory::PRIORITY value) {_impl->priority(value);} | 
 | 217 |  | 
 | 218 | }}} // facebook::thrift::concurrency |