blob: ac46c360529a227da4402a57002e30c5c576c533 [file] [log] [blame]
Mark Sleeade2c832006-09-08 03:41:50 +00001<?php
David Reissea2cba82009-03-30 21:35:00 +00002/*
3 * Licensed to the Apache Software Foundation (ASF) under one
4 * or more contributor license agreements. See the NOTICE file
5 * distributed with this work for additional information
6 * regarding copyright ownership. The ASF licenses this file
7 * to you under the Apache License, Version 2.0 (the
8 * "License"); you may not use this file except in compliance
9 * with the License. You may obtain a copy of the License at
Mark Slee4902c052007-03-01 00:31:30 +000010 *
David Reissea2cba82009-03-30 21:35:00 +000011 * http://www.apache.org/licenses/LICENSE-2.0
12 *
13 * Unless required by applicable law or agreed to in writing,
14 * software distributed under the License is distributed on an
15 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16 * KIND, either express or implied. See the License for the
17 * specific language governing permissions and limitations
18 * under the License.
Mark Slee4902c052007-03-01 00:31:30 +000019 *
20 * @package thrift.transport
Mark Slee4902c052007-03-01 00:31:30 +000021 */
22
David Reissea2cba82009-03-30 21:35:00 +000023
Mark Sleeade2c832006-09-08 03:41:50 +000024/** Inherits from Socket */
Mark Slee1c4a5592006-09-25 21:32:05 +000025include_once $GLOBALS['THRIFT_ROOT'].'/transport/TSocket.php';
Mark Sleeade2c832006-09-08 03:41:50 +000026
27/**
28 * This library makes use of APC cache to make hosts as down in a web
29 * environment. If you are running from the CLI or on a system without APC
30 * installed, then these null functions will step in and act like cache
31 * misses.
32 */
33if (!function_exists('apc_fetch')) {
34 function apc_fetch($key) { return FALSE; }
35 function apc_store($key, $var, $ttl=0) { return FALSE; }
36}
37
38/**
39 * Sockets implementation of the TTransport interface that allows connection
40 * to a pool of servers.
41 *
42 * @package thrift.transport
Mark Sleeade2c832006-09-08 03:41:50 +000043 */
44class TSocketPool extends TSocket {
45
46 /**
Mark Slee3f11b7a2006-10-04 19:02:03 +000047 * Remote servers. Array of associative arrays with 'host' and 'port' keys
Mark Sleeade2c832006-09-08 03:41:50 +000048 */
Mark Slee3f11b7a2006-10-04 19:02:03 +000049 private $servers_ = array();
Mark Sleeade2c832006-09-08 03:41:50 +000050
51 /**
52 * How many times to retry each host in connect
53 *
54 * @var int
55 */
56 private $numRetries_ = 1;
57
58 /**
59 * Retry interval in seconds, how long to not try a host if it has been
60 * marked as down.
61 *
62 * @var int
63 */
64 private $retryInterval_ = 60;
65
66 /**
67 * Max consecutive failures before marking a host down.
68 *
69 * @var int
70 */
71 private $maxConsecutiveFailures_ = 1;
72
73 /**
74 * Try hosts in order? or Randomized?
75 *
76 * @var bool
77 */
78 private $randomize_ = TRUE;
79
80 /**
81 * Always try last host, even if marked down?
82 *
83 * @var bool
84 */
85 private $alwaysTryLast_ = TRUE;
86
87 /**
88 * Socket pool constructor
89 *
Mark Sleead58f952007-01-03 19:23:50 +000090 * @param array $hosts List of remote hostnames
91 * @param mixed $ports Array of remote ports, or a single common port
92 * @param bool $persist Whether to use a persistent socket
93 * @param mixed $debugHandler Function for error logging
Mark Sleeade2c832006-09-08 03:41:50 +000094 */
95 public function __construct($hosts=array('localhost'),
96 $ports=array(9090),
Mark Sleead58f952007-01-03 19:23:50 +000097 $persist=FALSE,
98 $debugHandler=null) {
99 parent::__construct(null, 0, $persist, $debugHandler);
100
Mark Slee3f11b7a2006-10-04 19:02:03 +0000101 if (!is_array($ports)) {
102 $port = $ports;
103 $ports = array();
104 foreach ($hosts as $key => $val) {
105 $ports[$key] = $port;
Mark Sleeade2c832006-09-08 03:41:50 +0000106 }
107 }
Mark Slee3f11b7a2006-10-04 19:02:03 +0000108
109 foreach ($hosts as $key => $host) {
110 $this->servers_ []= array('host' => $host,
111 'port' => $ports[$key]);
112 }
Mark Sleeade2c832006-09-08 03:41:50 +0000113 }
114
115 /**
Mark Slee0cdc6c82007-11-13 10:19:08 +0000116 * Add a server to the pool
117 *
118 * This function does not prevent you from adding a duplicate server entry.
119 *
120 * @param string $host hostname or IP
121 * @param int $port port
122 */
123 public function addServer($host, $port) {
124 $this->servers_[] = array('host' => $host, 'port' => $port);
125 }
126
127 /**
Mark Sleeade2c832006-09-08 03:41:50 +0000128 * Sets how many time to keep retrying a host in the connect function.
129 *
130 * @param int $numRetries
131 */
132 public function setNumRetries($numRetries) {
133 $this->numRetries_ = $numRetries;
134 }
135
136 /**
137 * Sets how long to wait until retrying a host if it was marked down
138 *
139 * @param int $numRetries
140 */
141 public function setRetryInterval($retryInterval) {
142 $this->retryInterval_ = $retryInterval;
143 }
144
145 /**
146 * Sets how many time to keep retrying a host before marking it as down.
147 *
148 * @param int $numRetries
149 */
150 public function setMaxConsecutiveFailures($maxConsecutiveFailures) {
151 $this->maxConsecutiveFailures_ = $maxConsecutiveFailures;
152 }
153
154 /**
155 * Turns randomization in connect order on or off.
156 *
157 * @param bool $randomize
158 */
159 public function setRandomize($randomize) {
160 $this->randomize_ = $randomize;
161 }
162
163 /**
164 * Whether to always try the last server.
165 *
166 * @param bool $alwaysTryLast
167 */
168 public function setAlwaysTryLast($alwaysTryLast) {
169 $this->alwaysTryLast_ = $alwaysTryLast;
170 }
171
172
173 /**
174 * Connects the socket by iterating through all the servers in the pool
175 * and trying to find one that works.
176 */
177 public function open() {
Mark Slee3f11b7a2006-10-04 19:02:03 +0000178 // Check if we want order randomization
Mark Sleeade2c832006-09-08 03:41:50 +0000179 if ($this->randomize_) {
Mark Slee3f11b7a2006-10-04 19:02:03 +0000180 shuffle($this->servers_);
Mark Sleeade2c832006-09-08 03:41:50 +0000181 }
Mark Sleeade2c832006-09-08 03:41:50 +0000182
Mark Slee3f11b7a2006-10-04 19:02:03 +0000183 // Count servers to identify the "last" one
184 $numServers = count($this->servers_);
185
186 for ($i = 0; $i < $numServers; ++$i) {
187
188 // This extracts the $host and $port variables
189 extract($this->servers_[$i]);
Mark Sleeade2c832006-09-08 03:41:50 +0000190
191 // Check APC cache for a record of this server being down
Mark Slee3f11b7a2006-10-04 19:02:03 +0000192 $failtimeKey = 'thrift_failtime:'.$host.':'.$port.'~';
Mark Sleeade2c832006-09-08 03:41:50 +0000193
194 // Cache miss? Assume it's OK
195 $lastFailtime = apc_fetch($failtimeKey);
196 if ($lastFailtime === FALSE) {
197 $lastFailtime = 0;
198 }
199
200 $retryIntervalPassed = FALSE;
201
202 // Cache hit...make sure enough the retry interval has elapsed
203 if ($lastFailtime > 0) {
204 $elapsed = time() - $lastFailtime;
robertb0fac3e2007-01-15 23:53:25 +0000205 if ($elapsed > $this->retryInterval_) {
Mark Sleeade2c832006-09-08 03:41:50 +0000206 $retryIntervalPassed = TRUE;
207 if ($this->debug_) {
Mark Sleee7714a62007-01-11 01:26:00 +0000208 call_user_func($this->debugHandler_,
209 'TSocketPool: retryInterval '.
210 '('.$this->retryInterval_.') '.
211 'has passed for host '.$host.':'.$port);
Mark Sleeade2c832006-09-08 03:41:50 +0000212 }
213 }
214 }
215
216 // Only connect if not in the middle of a fail interval, OR if this
217 // is the LAST server we are trying, just hammer away on it
218 $isLastServer = FALSE;
Mark Sleea09e34e2007-01-03 18:45:04 +0000219 if ($this->alwaysTryLast_) {
Mark Slee3f11b7a2006-10-04 19:02:03 +0000220 $isLastServer = ($i == ($numServers - 1));
Mark Sleeade2c832006-09-08 03:41:50 +0000221 }
222
223 if (($lastFailtime === 0) ||
224 ($isLastServer) ||
225 ($lastFailtime > 0 && $retryIntervalPassed)) {
226
227 // Set underlying TSocket params to this one
228 $this->host_ = $host;
229 $this->port_ = $port;
Mark Slee0cdc6c82007-11-13 10:19:08 +0000230
Mark Slee3f11b7a2006-10-04 19:02:03 +0000231 // Try up to numRetries_ connections per server
Mark Sleeade2c832006-09-08 03:41:50 +0000232 for ($attempt = 0; $attempt < $this->numRetries_; $attempt++) {
233 try {
Mark Slee3f11b7a2006-10-04 19:02:03 +0000234 // Use the underlying TSocket open function
Mark Sleeade2c832006-09-08 03:41:50 +0000235 parent::open();
236
237 // Only clear the failure counts if required to do so
238 if ($lastFailtime > 0) {
239 apc_store($failtimeKey, 0);
240 }
Mark Slee3f11b7a2006-10-04 19:02:03 +0000241
Mark Sleeade2c832006-09-08 03:41:50 +0000242 // Successful connection, return now
243 return;
244
Mark Slee76791962007-03-14 02:47:35 +0000245 } catch (TException $tx) {
Mark Sleeade2c832006-09-08 03:41:50 +0000246 // Connection failed
247 }
248 }
249
250 // Mark failure of this host in the cache
251 $consecfailsKey = 'thrift_consecfails:'.$host.':'.$port.'~';
252
253 // Ignore cache misses
254 $consecfails = apc_fetch($consecfailsKey);
255 if ($consecfails === FALSE) {
256 $consecfails = 0;
257 }
258
259 // Increment by one
260 $consecfails++;
261
262 // Log and cache this failure
263 if ($consecfails >= $this->maxConsecutiveFailures_) {
264 if ($this->debug_) {
Mark Sleee7714a62007-01-11 01:26:00 +0000265 call_user_func($this->debugHandler_,
266 'TSocketPool: marking '.$host.':'.$port.
Karl Lehenbauer893ef722007-01-17 18:56:10 +0000267 ' as down for '.$this->retryInterval_.' secs '.
Mark Sleee7714a62007-01-11 01:26:00 +0000268 'after '.$consecfails.' failed attempts.');
Mark Sleeade2c832006-09-08 03:41:50 +0000269 }
270 // Store the failure time
271 apc_store($failtimeKey, time());
272
273 // Clear the count of consecutive failures
274 apc_store($consecfailsKey, 0);
275 } else {
276 apc_store($consecfailsKey, $consecfails);
277 }
278 }
Mark Slee3f11b7a2006-10-04 19:02:03 +0000279 }
Mark Sleeade2c832006-09-08 03:41:50 +0000280
David Reiss3bb5e052010-01-25 19:31:31 +0000281 // Oh no; we failed them all. The system is totally ill!
Mark Sleeade2c832006-09-08 03:41:50 +0000282 $error = 'TSocketPool: All hosts in pool are down. ';
Mark Slee588e4522006-11-15 22:23:06 +0000283 $hosts = array();
284 foreach ($this->servers_ as $server) {
285 $hosts []= $server['host'].':'.$server['port'];
286 }
287 $hostlist = implode(',', $hosts);
288 $error .= '('.$hostlist.')';
Mark Sleeade2c832006-09-08 03:41:50 +0000289 if ($this->debug_) {
Mark Sleee7714a62007-01-11 01:26:00 +0000290 call_user_func($this->debugHandler_, $error);
Mark Sleeade2c832006-09-08 03:41:50 +0000291 }
Mark Slee76791962007-03-14 02:47:35 +0000292 throw new TException($error);
Mark Sleeade2c832006-09-08 03:41:50 +0000293 }
294}