David Reiss | 2324871 | 2010-10-06 17:10:08 +0000 | [diff] [blame] | 1 | /* |
| 2 | * Licensed to the Apache Software Foundation (ASF) under one |
| 3 | * or more contributor license agreements. See the NOTICE file |
| 4 | * distributed with this work for additional information |
| 5 | * regarding copyright ownership. The ASF licenses this file |
| 6 | * to you under the Apache License, Version 2.0 (the |
| 7 | * "License"); you may not use this file except in compliance |
| 8 | * with the License. You may obtain a copy of the License at |
| 9 | * |
| 10 | * http://www.apache.org/licenses/LICENSE-2.0 |
| 11 | * |
| 12 | * Unless required by applicable law or agreed to in writing, |
| 13 | * software distributed under the License is distributed on an |
| 14 | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| 15 | * KIND, either express or implied. See the License for the |
| 16 | * specific language governing permissions and limitations |
| 17 | * under the License. |
| 18 | */ |
| 19 | |
| 20 | #ifndef _FACEBOOK_THRIFT_SERVER_TCLIENTINFO_H_ |
| 21 | #define _FACEBOOK_THRIFT_SERVER_TCLIENTINFO_H_ 1 |
| 22 | |
| 23 | // for inet_ntop -- |
| 24 | #include <arpa/inet.h> |
| 25 | #include <server/TServer.h> |
| 26 | #include <transport/TSocket.h> |
| 27 | #include <concurrency/Mutex.h> |
| 28 | |
| 29 | namespace apache { namespace thrift { namespace server { |
| 30 | |
| 31 | using namespace apache::thrift; |
| 32 | using namespace apache::thrift::transport; |
| 33 | using namespace apache::thrift::concurrency; |
| 34 | using boost::shared_ptr; |
| 35 | using std::string; |
| 36 | using std::vector; |
| 37 | |
| 38 | /** |
| 39 | * StableVector -- a minimal vector class where growth is automatic and |
| 40 | * vector elements never move as the vector grows. Allocates new space |
| 41 | * as needed, but does not copy old values. |
| 42 | * |
| 43 | * A level vector stores a list of storage vectors containing the actual |
| 44 | * elements. Levels are added as needed, doubling in size each time. |
| 45 | * Locking is only done when a level is added. Access is amortized |
| 46 | * constant time. |
| 47 | */ |
| 48 | template <typename T> |
| 49 | class StableVector { |
| 50 | /// The initial allocation as an exponent of 2 |
| 51 | static const uint32_t kInitialSizePowOf2 = 10; |
| 52 | /// The initial allocation size |
| 53 | static const uint32_t kInitialVectorSize = 1 << kInitialSizePowOf2; |
| 54 | /// This bound is guaranteed not to be exceeded on 64-bit archs |
| 55 | static const int kMaxLevels = 64; |
| 56 | |
| 57 | /// Values are kept in one or more of these |
| 58 | typedef vector<T> Vect; |
| 59 | /// One or more value vectors are kept in one of these |
| 60 | typedef vector<Vect*> LevelVector; |
| 61 | |
| 62 | Mutex mutex_; |
| 63 | /// current size |
| 64 | size_t size_; |
| 65 | _Atomic_word vectLvl_; |
| 66 | LevelVector vects_; |
| 67 | |
| 68 | public: |
| 69 | /** |
| 70 | * Constructor -- initialize the level vector and allocate the |
| 71 | * initial storage vector |
| 72 | */ |
| 73 | StableVector() |
| 74 | : size_(0) |
| 75 | , vectLvl_(0) { |
| 76 | vects_.reserve(kMaxLevels); |
| 77 | Vect* storageVector(new Vect(1 << kInitialSizePowOf2)); |
| 78 | vects_.push_back(storageVector); |
| 79 | } |
| 80 | |
| 81 | private: |
| 82 | /** |
| 83 | * make sure the requested number of storage levels have been allocated. |
| 84 | */ |
| 85 | void expand(uint32_t level) { |
| 86 | // we need the guard to insure that we only allocate once. |
| 87 | Guard g(mutex_); |
| 88 | while (level > vectLvl_) { |
| 89 | Vect* levelVect(new Vect(1 << (vectLvl_ + kInitialSizePowOf2))); |
| 90 | vects_.push_back(levelVect); |
| 91 | // we need to make sure this is done after levelVect is inserted |
| 92 | // (what we want is effectively a memory barrier here). |
| 93 | __gnu_cxx::__atomic_add(&vectLvl_, 1); |
| 94 | } |
| 95 | } |
| 96 | |
| 97 | /** |
| 98 | * Given an index, determine which level and element of that level is |
| 99 | * required. Grows if needed. |
| 100 | */ |
| 101 | void which(uint32_t n, uint32_t* vno, uint32_t* idx) { |
| 102 | if (n >= size_) { |
| 103 | size_ = n + 1; |
| 104 | } |
| 105 | if (n < kInitialVectorSize) { |
| 106 | *idx = n; |
| 107 | *vno = 0; |
| 108 | } else { |
| 109 | uint32_t upper = n >> kInitialSizePowOf2; |
| 110 | *vno = CHAR_BIT*sizeof(upper) - __builtin_clz(upper); |
| 111 | *idx = n - (1 << (*vno + kInitialSizePowOf2 - 1)); |
| 112 | if (*vno > vectLvl_) { |
| 113 | expand(*vno); |
| 114 | } |
| 115 | } |
| 116 | } |
| 117 | |
| 118 | public: |
| 119 | /** |
| 120 | * Given an index, return a reference to that element, perhaps after |
| 121 | * allocating additional space. |
| 122 | * |
| 123 | * @param n a positive integer |
| 124 | */ |
| 125 | T& operator[](uint32_t n) { |
| 126 | uint32_t vno; |
| 127 | uint32_t idx; |
| 128 | which(n, &vno, &idx); |
| 129 | return (*vects_[vno])[idx]; |
| 130 | } |
| 131 | |
| 132 | /** |
| 133 | * Return the present size of the vector. |
| 134 | */ |
| 135 | size_t size() const { return size_; } |
| 136 | }; |
| 137 | |
| 138 | |
| 139 | /** |
| 140 | * This class embodies the representation of a single connection during |
| 141 | * processing. We'll keep one of these per file descriptor in TClientInfo. |
| 142 | */ |
| 143 | class TClientInfoConnection { |
| 144 | public: |
| 145 | const static int kNameLen = 32; |
| 146 | |
| 147 | private: |
| 148 | typedef union IPAddrUnion { |
| 149 | sockaddr_in ipv4; |
| 150 | sockaddr_in6 ipv6; |
| 151 | }; |
| 152 | |
| 153 | char call_[kNameLen]; ///< The name of the thrift call |
| 154 | IPAddrUnion addr_; ///< The client's IP address |
| 155 | timespec time_; ///< Time processing started |
| 156 | uint64_t ncalls_; ///< # of calls processed |
| 157 | |
| 158 | public: |
| 159 | /** |
| 160 | * Constructor; insure that no client address or thrift call name is |
| 161 | * represented. |
| 162 | */ |
| 163 | TClientInfoConnection(); |
| 164 | |
| 165 | /** |
| 166 | * A connection has been made; record its address. Since this is the |
| 167 | * first we'll know of a connection we start the timer here as well. |
| 168 | */ |
| 169 | void recordAddr(const sockaddr* addr); |
| 170 | |
| 171 | /** |
| 172 | * Mark the address as empty/unknown. |
| 173 | */ |
| 174 | void eraseAddr(); |
| 175 | |
| 176 | /** |
| 177 | * Return a string representing the present address, or NULL if none. |
| 178 | * Copies the string into the buffer provided. |
| 179 | */ |
| 180 | const char* getAddr(char* buf, int len) const; |
| 181 | |
| 182 | /** |
| 183 | * A call has been made on this connection; record its name. Since this is |
| 184 | * called for every thrift call processed, we also do our call count here. |
| 185 | */ |
| 186 | void recordCall(const char* name); |
| 187 | |
| 188 | /** |
| 189 | * Invoked when processing has ended to clear the call name. |
| 190 | */ |
| 191 | void eraseCall(); |
| 192 | |
| 193 | /** |
| 194 | * Return as string the thrift call either currently being processed or |
| 195 | * most recently processed if the connection is still open for additonal |
| 196 | * calls. Returns NULL if a call hasn't been made yet or processing |
| 197 | * has ended. |
| 198 | */ |
| 199 | const char* getCall() const; |
| 200 | |
| 201 | /** |
| 202 | * Get the timespec for the start of this connection (specifically, when |
| 203 | * recordAddr() was first called). |
| 204 | */ |
| 205 | void getTime(timespec* time) const; |
| 206 | |
| 207 | /** |
| 208 | * Return the number of calls made on this connection. |
| 209 | */ |
| 210 | uint64_t getNCalls() const; |
| 211 | |
| 212 | private: |
| 213 | void initTime(); |
| 214 | }; |
| 215 | |
| 216 | |
| 217 | /** |
| 218 | * Store for info about a server's clients -- specifically, the client's IP |
| 219 | * address and the call it is executing. This information is indexed by |
| 220 | * socket file descriptor and in the present implementation is updated |
| 221 | * asynchronously, so it may only approximate reality. |
| 222 | */ |
| 223 | class TClientInfo { |
| 224 | private: |
| 225 | StableVector<TClientInfoConnection> info_; |
| 226 | |
| 227 | public: |
| 228 | /** |
| 229 | * Return the info object for a given file descriptor. If "grow" is true |
| 230 | * extend the info vector if required (such as for a file descriptor not seen |
| 231 | * before). If "grow" is false and the info vector isn't large enough, |
| 232 | * or if "fd" is negative, return NULL. |
| 233 | */ |
| 234 | TClientInfoConnection* getConnection(int fd, bool grow); |
| 235 | |
| 236 | size_t size() const; |
| 237 | }; |
| 238 | |
| 239 | /** |
| 240 | * This derivation of TServerEventHandler encapsulates the main status vector |
| 241 | * and provides context to the server's processing loop via overrides. |
| 242 | * Together with TClientInfoCallHandler (derived from TProcessorEventHandler) |
| 243 | * it integrates client info collection into the server. |
| 244 | */ |
| 245 | class TClientInfoServerHandler : public TServerEventHandler { |
| 246 | private: |
| 247 | TClientInfo clientInfo_; |
| 248 | |
| 249 | public: |
| 250 | /** |
| 251 | * One of these is constructed for each open connection/descriptor and links |
| 252 | * to both the status vector (clientInfo_) and that descriptor's entry |
| 253 | * within it. |
| 254 | */ |
| 255 | struct Connect { |
| 256 | TClientInfo* clientInfo_; |
| 257 | TClientInfoConnection* callInfo_; |
| 258 | |
| 259 | explicit Connect(TClientInfo* clientInfo) |
| 260 | : clientInfo_(clientInfo) |
| 261 | , callInfo_(NULL) { |
| 262 | } |
| 263 | }; |
| 264 | |
| 265 | /** |
| 266 | * Generate processor context; we don't know what descriptor we belong to |
| 267 | * yet -- we'll get hooked up in contextProcess(). |
| 268 | */ |
| 269 | void* createContext(boost::shared_ptr<TProtocol> input, |
| 270 | boost::shared_ptr<TProtocol> output); |
| 271 | |
| 272 | /** |
| 273 | * Mark our slot as unused and delete the context created in createContext(). |
| 274 | */ |
| 275 | void deleteContext(void* processorContext, |
| 276 | boost::shared_ptr<TProtocol> input, |
| 277 | boost::shared_ptr<TProtocol> output); |
| 278 | |
| 279 | /** |
| 280 | * Called in the processing loop just before the server invokes the |
| 281 | * processor itself, on the first call we establish which descriptor |
| 282 | * we correspond to and set it to that socket's peer IP address. This |
| 283 | * also has the side effect of initializing call counting and connection |
| 284 | * timing. We won't know which call we're handling until the handler |
| 285 | * first gets called in TClientInfoCallHandler::getContext(). |
| 286 | */ |
| 287 | void processContext(void* processorContext, |
| 288 | shared_ptr<TTransport> transport); |
| 289 | |
| 290 | /** |
| 291 | * Get status report for server in the form of a vector of strings. |
| 292 | * Each active client appears as one string in the format: |
| 293 | * |
| 294 | * FD IPADDR CALLNAME DURATION NCALLS |
| 295 | * |
| 296 | * where "FD" is the file descriptor for the client's socket, "IPADDR" |
| 297 | * is the IP address (as reported by accept()), "CALLNAME" is the |
| 298 | * current or most recent Thrift function name, "DURATION" is the |
| 299 | * duration of the connection, while NCALLS is the number of Thrift |
| 300 | * calls made since the connection was made. A single space separates |
| 301 | * fields. |
| 302 | */ |
| 303 | void getStatsStrings(vector<string>& result); |
| 304 | }; |
| 305 | |
| 306 | /** |
| 307 | * This class derives from TProcessorEventHandler to gain access to the |
| 308 | * function name for the current Thrift call. We need two versions of |
| 309 | * this -- TClientInfoCallStatsHandler is the other -- since in the latter |
| 310 | * case we pass through to TFunctionStatHandler to perform Thrift call |
| 311 | * stats. |
| 312 | */ |
| 313 | class TClientInfoCallHandler : public TProcessorEventHandler { |
| 314 | public: |
| 315 | virtual void* getContext(const char* fn_name, void* serverContext); |
| 316 | }; |
| 317 | |
| 318 | } } } // namespace apache::thrift::server |
| 319 | |
| 320 | #endif // !_FACEBOOK_THRIFT_SERVER_TCLIENTINFO_H_ |