| Gavin McDonald | 0b75e1a | 2010-10-28 02:12:01 +0000 | [diff] [blame^] | 1 | /* | 
 | 2 |  * Licensed to the Apache Software Foundation (ASF) under one | 
 | 3 |  * or more contributor license agreements. See the NOTICE file | 
 | 4 |  * distributed with this work for additional information | 
 | 5 |  * regarding copyright ownership. The ASF licenses this file | 
 | 6 |  * to you under the Apache License, Version 2.0 (the | 
 | 7 |  * "License"); you may not use this file except in compliance | 
 | 8 |  * with the License. You may obtain a copy of the License at | 
 | 9 |  * | 
 | 10 |  *   http://www.apache.org/licenses/LICENSE-2.0 | 
 | 11 |  * | 
 | 12 |  * Unless required by applicable law or agreed to in writing, | 
 | 13 |  * software distributed under the License is distributed on an | 
 | 14 |  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | 
 | 15 |  * KIND, either express or implied. See the License for the | 
 | 16 |  * specific language governing permissions and limitations | 
 | 17 |  * under the License. | 
 | 18 |  */ | 
 | 19 |  | 
 | 20 | #include <algorithm> | 
 | 21 | #include <iostream> | 
 | 22 |  | 
 | 23 | #include "TSocketPool.h" | 
 | 24 |  | 
 | 25 | namespace apache { namespace thrift { namespace transport { | 
 | 26 |  | 
 | 27 | using namespace std; | 
 | 28 |  | 
 | 29 | using boost::shared_ptr; | 
 | 30 |  | 
 | 31 | /** | 
 | 32 |  * TSocketPoolServer implementation | 
 | 33 |  * | 
 | 34 |  */ | 
 | 35 | TSocketPoolServer::TSocketPoolServer() | 
 | 36 |   : host_(""), | 
 | 37 |     port_(0), | 
 | 38 |     socket_(-1), | 
 | 39 |     lastFailTime_(0), | 
 | 40 |     consecutiveFailures_(0) {} | 
 | 41 |  | 
 | 42 | /** | 
 | 43 |  * Constructor for TSocketPool server | 
 | 44 |  */ | 
 | 45 | TSocketPoolServer::TSocketPoolServer(const string &host, int port) | 
 | 46 |   : host_(host), | 
 | 47 |     port_(port), | 
 | 48 |     socket_(-1), | 
 | 49 |     lastFailTime_(0), | 
 | 50 |     consecutiveFailures_(0) {} | 
 | 51 |  | 
 | 52 | /** | 
 | 53 |  * TSocketPool implementation. | 
 | 54 |  * | 
 | 55 |  */ | 
 | 56 |  | 
 | 57 | TSocketPool::TSocketPool() : TSocket(), | 
 | 58 |   numRetries_(1), | 
 | 59 |   retryInterval_(60), | 
 | 60 |   maxConsecutiveFailures_(1), | 
 | 61 |   randomize_(true), | 
 | 62 |   alwaysTryLast_(true) { | 
 | 63 | } | 
 | 64 |  | 
 | 65 | TSocketPool::TSocketPool(const vector<string> &hosts, | 
 | 66 |                          const vector<int> &ports) : TSocket(), | 
 | 67 |   numRetries_(1), | 
 | 68 |   retryInterval_(60), | 
 | 69 |   maxConsecutiveFailures_(1), | 
 | 70 |   randomize_(true), | 
 | 71 |   alwaysTryLast_(true) | 
 | 72 | { | 
 | 73 |   if (hosts.size() != ports.size()) { | 
 | 74 |     GlobalOutput("TSocketPool::TSocketPool: hosts.size != ports.size"); | 
 | 75 |     throw TTransportException(TTransportException::BAD_ARGS); | 
 | 76 |   } | 
 | 77 |  | 
 | 78 |   for (unsigned int i = 0; i < hosts.size(); ++i) { | 
 | 79 |     addServer(hosts[i], ports[i]); | 
 | 80 |   } | 
 | 81 | } | 
 | 82 |  | 
 | 83 | TSocketPool::TSocketPool(const vector<pair<string, int> >& servers) : TSocket(), | 
 | 84 |   numRetries_(1), | 
 | 85 |   retryInterval_(60), | 
 | 86 |   maxConsecutiveFailures_(1), | 
 | 87 |   randomize_(true), | 
 | 88 |   alwaysTryLast_(true) | 
 | 89 | { | 
 | 90 |   for (unsigned i = 0; i < servers.size(); ++i) { | 
 | 91 |     addServer(servers[i].first, servers[i].second); | 
 | 92 |   } | 
 | 93 | } | 
 | 94 |  | 
 | 95 | TSocketPool::TSocketPool(const vector< shared_ptr<TSocketPoolServer> >& servers) : TSocket(), | 
 | 96 |   servers_(servers), | 
 | 97 |   numRetries_(1), | 
 | 98 |   retryInterval_(60), | 
 | 99 |   maxConsecutiveFailures_(1), | 
 | 100 |   randomize_(true), | 
 | 101 |   alwaysTryLast_(true) | 
 | 102 | { | 
 | 103 | } | 
 | 104 |  | 
 | 105 | TSocketPool::TSocketPool(const string& host, int port) : TSocket(), | 
 | 106 |   numRetries_(1), | 
 | 107 |   retryInterval_(60), | 
 | 108 |   maxConsecutiveFailures_(1), | 
 | 109 |   randomize_(true), | 
 | 110 |   alwaysTryLast_(true) | 
 | 111 | { | 
 | 112 |   addServer(host, port); | 
 | 113 | } | 
 | 114 |  | 
 | 115 | TSocketPool::~TSocketPool() { | 
 | 116 |   vector< shared_ptr<TSocketPoolServer> >::const_iterator iter = servers_.begin(); | 
 | 117 |   vector< shared_ptr<TSocketPoolServer> >::const_iterator iterEnd = servers_.end(); | 
 | 118 |   for (; iter != iterEnd; ++iter) { | 
 | 119 |     setCurrentServer(*iter); | 
 | 120 |     TSocketPool::close(); | 
 | 121 |   } | 
 | 122 | } | 
 | 123 |  | 
 | 124 | void TSocketPool::addServer(const string& host, int port) { | 
 | 125 |   servers_.push_back(shared_ptr<TSocketPoolServer>(new TSocketPoolServer(host, port))); | 
 | 126 | } | 
 | 127 |  | 
 | 128 | void TSocketPool::setServers(const vector< shared_ptr<TSocketPoolServer> >& servers) { | 
 | 129 |   servers_ = servers; | 
 | 130 | } | 
 | 131 |  | 
 | 132 | void TSocketPool::getServers(vector< shared_ptr<TSocketPoolServer> >& servers) { | 
 | 133 |   servers = servers_; | 
 | 134 | } | 
 | 135 |  | 
 | 136 | void TSocketPool::setNumRetries(int numRetries) { | 
 | 137 |   numRetries_ = numRetries; | 
 | 138 | } | 
 | 139 |  | 
 | 140 | void TSocketPool::setRetryInterval(int retryInterval) { | 
 | 141 |   retryInterval_ = retryInterval; | 
 | 142 | } | 
 | 143 |  | 
 | 144 |  | 
 | 145 | void TSocketPool::setMaxConsecutiveFailures(int maxConsecutiveFailures) { | 
 | 146 |   maxConsecutiveFailures_ = maxConsecutiveFailures; | 
 | 147 | } | 
 | 148 |  | 
 | 149 | void TSocketPool::setRandomize(bool randomize) { | 
 | 150 |   randomize_ = randomize; | 
 | 151 | } | 
 | 152 |  | 
 | 153 | void TSocketPool::setAlwaysTryLast(bool alwaysTryLast) { | 
 | 154 |   alwaysTryLast_ = alwaysTryLast; | 
 | 155 | } | 
 | 156 |  | 
 | 157 | void TSocketPool::setCurrentServer(const shared_ptr<TSocketPoolServer> &server) { | 
 | 158 |   currentServer_ = server; | 
 | 159 |   host_ = server->host_; | 
 | 160 |   port_ = server->port_; | 
 | 161 |   socket_ = server->socket_; | 
 | 162 | } | 
 | 163 |  | 
 | 164 | /* TODO: without apc we ignore a lot of functionality from the php version */ | 
 | 165 | void TSocketPool::open() { | 
 | 166 |   if (randomize_) { | 
 | 167 |     random_shuffle(servers_.begin(), servers_.end()); | 
 | 168 |   } | 
 | 169 |  | 
 | 170 |   unsigned int numServers = servers_.size(); | 
 | 171 |   for (unsigned int i = 0; i < numServers; ++i) { | 
 | 172 |  | 
 | 173 |     shared_ptr<TSocketPoolServer> &server = servers_[i]; | 
 | 174 |     bool retryIntervalPassed = (server->lastFailTime_ == 0); | 
 | 175 |     bool isLastServer = alwaysTryLast_ ? (i == (numServers - 1)) : false; | 
 | 176 |  | 
 | 177 |     // Impersonate the server socket | 
 | 178 |     setCurrentServer(server); | 
 | 179 |  | 
 | 180 |     if (isOpen()) { | 
 | 181 |       // already open means we're done | 
 | 182 |       return; | 
 | 183 |     } | 
 | 184 |  | 
 | 185 |     if (server->lastFailTime_ > 0) { | 
 | 186 |       // The server was marked as down, so check if enough time has elapsed to retry | 
 | 187 |       int elapsedTime = time(NULL) - server->lastFailTime_; | 
 | 188 |       if (elapsedTime > retryInterval_) { | 
 | 189 |         retryIntervalPassed = true; | 
 | 190 |       } | 
 | 191 |     } | 
 | 192 |  | 
 | 193 |     if (retryIntervalPassed || isLastServer) { | 
 | 194 |       for (int j = 0; j < numRetries_; ++j) { | 
 | 195 |         try { | 
 | 196 |           TSocket::open(); | 
 | 197 |  | 
 | 198 |           // Copy over the opened socket so that we can keep it persistent | 
 | 199 |           server->socket_ = socket_; | 
 | 200 |  | 
 | 201 |           // reset lastFailTime_ is required | 
 | 202 |           if (server->lastFailTime_) { | 
 | 203 |             server->lastFailTime_ = 0; | 
 | 204 |           } | 
 | 205 |  | 
 | 206 |           // success | 
 | 207 |           return; | 
 | 208 |         } catch (TException e) { | 
 | 209 |           string errStr = "TSocketPool::open failed "+getSocketInfo()+": "+e.what(); | 
 | 210 |           GlobalOutput(errStr.c_str()); | 
 | 211 |           // connection failed | 
 | 212 |         } | 
 | 213 |       } | 
 | 214 |  | 
 | 215 |       ++server->consecutiveFailures_; | 
 | 216 |       if (server->consecutiveFailures_ > maxConsecutiveFailures_) { | 
 | 217 |         // Mark server as down | 
 | 218 |         server->consecutiveFailures_ = 0; | 
 | 219 |         server->lastFailTime_ = time(NULL); | 
 | 220 |       } | 
 | 221 |     } | 
 | 222 |   } | 
 | 223 |  | 
 | 224 |   GlobalOutput("TSocketPool::open: all connections failed"); | 
 | 225 |   throw TTransportException(TTransportException::NOT_OPEN); | 
 | 226 | } | 
 | 227 |  | 
 | 228 | void TSocketPool::close() { | 
 | 229 |   if (isOpen()) { | 
 | 230 |     TSocket::close(); | 
 | 231 |     currentServer_->socket_ = -1; | 
 | 232 |   } | 
 | 233 | } | 
 | 234 |  | 
 | 235 | }}} // apache::thrift::transport |