blob: 8c6feeb9e6ff2fc7bc6da867e73886986ad8f59f [file] [log] [blame]
Mark Sleeade2c832006-09-08 03:41:50 +00001<?php
2
3/** Inherits from Socket */
Mark Slee1c4a5592006-09-25 21:32:05 +00004include_once $GLOBALS['THRIFT_ROOT'].'/transport/TSocket.php';
Mark Sleeade2c832006-09-08 03:41:50 +00005
/**
 * This library makes use of the APC cache to mark hosts as down in a web
 * environment. If you are running from the CLI or on a system without APC
 * installed, then these null functions will step in and act like cache
 * misses.
 *
 * Each stub is guarded independently so that a runtime which provides only
 * one of the two functions neither triggers a "cannot redeclare" fatal error
 * nor is left without the other function.
 */
if (!function_exists('apc_fetch')) {
  /**
   * Fallback cache read: always reports a miss.
   *
   * @param  string $key Cache key (ignored)
   * @return bool   Always FALSE
   */
  function apc_fetch($key) { return FALSE; }
}

if (!function_exists('apc_store')) {
  /**
   * Fallback cache write: stores nothing.
   *
   * @param  string $key Cache key (ignored)
   * @param  mixed  $var Value to store (ignored)
   * @param  int    $ttl Time to live in seconds (ignored)
   * @return bool   Always FALSE
   */
  function apc_store($key, $var, $ttl=0) { return FALSE; }
}
16
/**
 * Sockets implementation of the TTransport interface that allows connection
 * to a pool of servers.
 *
 * Host health is tracked through apc_fetch()/apc_store(): once a host has
 * accumulated maxConsecutiveFailures_ failed open() rounds it is marked down
 * and skipped until retryInterval_ seconds have elapsed.
 *
 * @package thrift.transport
 * @author Mark Slee <mcslee@facebook.com>
 */
class TSocketPool extends TSocket {

  /**
   * Remote servers. Array of associative arrays with 'host' and 'port' keys
   */
  private $servers_ = array();

  /**
   * How many times to retry each host in connect
   *
   * @var int
   */
  private $numRetries_ = 1;

  /**
   * Retry interval in seconds, how long to not try a host if it has been
   * marked as down.
   *
   * @var int
   */
  private $retryInterval_ = 60;

  /**
   * Max consecutive failures before marking a host down.
   *
   * @var int
   */
  private $maxConsecutiveFailures_ = 1;

  /**
   * Try hosts in order? or Randomized?
   *
   * @var bool
   */
  private $randomize_ = TRUE;

  /**
   * Always try last host, even if marked down?
   *
   * @var bool
   */
  private $alwaysTryLast_ = TRUE;

  /**
   * Socket pool constructor
   *
   * When $ports is an array it is looked up with the same keys as $hosts,
   * so the two arrays must be keyed identically.
   *
   * @param array $hosts   List of remote hostnames
   * @param mixed $ports   Array of remote ports, or a single common port
   * @param bool  $persist Whether to use a persistent socket
   */
  public function __construct($hosts=array('localhost'),
                              $ports=array(9090),
                              $persist=FALSE) {
    // The real host and port are assigned per attempt inside open()
    parent::__construct(null, 0, $persist);

    // A single scalar port applies to every host
    if (!is_array($ports)) {
      $port = $ports;
      $ports = array();
      foreach ($hosts as $key => $val) {
        $ports[$key] = $port;
      }
    }

    foreach ($hosts as $key => $host) {
      $this->servers_ []= array('host' => $host,
                                'port' => $ports[$key]);
    }
  }

  /**
   * Sets how many time to keep retrying a host in the connect function.
   *
   * @param int $numRetries
   */
  public function setNumRetries($numRetries) {
    $this->numRetries_ = $numRetries;
  }

  /**
   * Sets how long to wait until retrying a host if it was marked down
   *
   * @param int $retryInterval Interval in seconds
   */
  public function setRetryInterval($retryInterval) {
    $this->retryInterval_ = $retryInterval;
  }

  /**
   * Sets how many time to keep retrying a host before marking it as down.
   *
   * @param int $maxConsecutiveFailures
   */
  public function setMaxConsecutiveFailures($maxConsecutiveFailures) {
    $this->maxConsecutiveFailures_ = $maxConsecutiveFailures;
  }

  /**
   * Turns randomization in connect order on or off.
   *
   * @param bool $randomize
   */
  public function setRandomize($randomize) {
    $this->randomize_ = $randomize;
  }

  /**
   * Whether to always try the last server.
   *
   * @param bool $alwaysTryLast
   */
  public function setAlwaysTryLast($alwaysTryLast) {
    $this->alwaysTryLast_ = $alwaysTryLast;
  }


  /**
   * Connects the socket by iterating through all the servers in the pool
   * and trying to find one that works.
   *
   * A server is attempted when it has no recorded failure time, when its
   * retry interval has elapsed, or when it is the last server in the
   * (possibly shuffled) list and alwaysTryLast_ is enabled. Each attempted
   * server gets up to numRetries_ connection attempts.
   *
   * @throws Exception if no server in the pool could be connected to
   */
  public function open() {
    // Check if we want order randomization
    if ($this->randomize_) {
      shuffle($this->servers_);
    }

    // Count servers to identify the "last" one
    $numServers = count($this->servers_);

    for ($i = 0; $i < $numServers; ++$i) {

      // Read the entry explicitly instead of extract(), so no unexpected
      // key can overwrite a local variable
      $host = $this->servers_[$i]['host'];
      $port = $this->servers_[$i]['port'];

      // Check APC cache for a record of this server being down
      $failtimeKey = 'thrift_failtime:'.$host.':'.$port.'~';

      // Cache miss? Assume it's OK
      $lastFailtime = apc_fetch($failtimeKey);
      if ($lastFailtime === FALSE) {
        $lastFailtime = 0;
      }

      $retryIntervalPassed = FALSE;

      // Cache hit...make sure the retry interval has elapsed
      if ($lastFailtime > 0) {
        $elapsed = time() - $lastFailtime;
        if ($elapsed > $this->retryInterval_) {
          $retryIntervalPassed = TRUE;
          if ($this->debug_) {
            error_log('TSocketPool: retryInterval '.
                      '('.$this->retryInterval_.') '.
                      'has passed for host '.$host.':'.$port);
          }
        }
      }

      // Only connect if not in the middle of a fail interval, OR if this
      // is the LAST server we are trying, just hammer away on it
      $isLastServer = FALSE;
      if ($this->alwaysTryLast_) {
        $isLastServer = ($i == ($numServers - 1));
      }

      if (($lastFailtime === 0) ||
          ($isLastServer) ||
          ($lastFailtime > 0 && $retryIntervalPassed)) {

        // Set underlying TSocket params to this one
        $this->host_ = $host;
        $this->port_ = $port;

        // Try up to numRetries_ connections per server
        for ($attempt = 0; $attempt < $this->numRetries_; $attempt++) {
          try {
            // Use the underlying TSocket open function
            parent::open();

            // Only clear the failure time if one was recorded, to avoid a
            // cache write on every successful connect
            if ($lastFailtime > 0) {
              apc_store($failtimeKey, 0);
            }

            // Successful connection, return now
            return;

          } catch (Exception $x) {
            // Connection failed; deliberately ignored so the next attempt
            // (or the next server) can be tried
          }
        }

        // Mark failure of this host in the cache
        $consecfailsKey = 'thrift_consecfails:'.$host.':'.$port.'~';

        // Ignore cache misses
        $consecfails = apc_fetch($consecfailsKey);
        if ($consecfails === FALSE) {
          $consecfails = 0;
        }

        // Increment by one
        // NOTE(review): this counter is not reset on a successful connect,
        // so it may count non-consecutive failures -- confirm intent.
        $consecfails++;

        // Log and cache this failure
        if ($consecfails >= $this->maxConsecutiveFailures_) {
          if ($this->debug_) {
            error_log('TSocketPool: marking '.$host.':'.$port.
                      ' as down for '.$this->retryInterval_.' seconds '.
                      'after '.$consecfails.' failed connect attempts.');
          }
          // Store the failure time
          apc_store($failtimeKey, time());

          // Clear the count of consecutive failures
          apc_store($consecfailsKey, 0);
        } else {
          apc_store($consecfailsKey, $consecfails);
        }
      }
    }

    // Every server was either skipped or failed to connect. Report the
    // whole pool as host:port pairs built from the configured server list.
    $hostlist = array();
    foreach ($this->servers_ as $server) {
      $hostlist []= $server['host'].':'.$server['port'];
    }
    $error = 'TSocketPool: All hosts in pool are down. ';
    $error .= '('.implode(',', $hostlist).')';
    if ($this->debug_) {
      error_log($error);
    }
    throw new Exception($error);
  }
}
255
256?>