blob: 1bb2e49bcca4ac3dff9cbbf5014d63694622fe01 [file] [log] [blame]
Mark Sleeade2c832006-09-08 03:41:50 +00001<?php
2
3/** Inherits from Socket */
Mark Slee1c4a5592006-09-25 21:32:05 +00004include_once $GLOBALS['THRIFT_ROOT'].'/transport/TSocket.php';
Mark Sleeade2c832006-09-08 03:41:50 +00005
6/**
7 * This library makes use of APC cache to make hosts as down in a web
8 * environment. If you are running from the CLI or on a system without APC
9 * installed, then these null functions will step in and act like cache
10 * misses.
11 */
12if (!function_exists('apc_fetch')) {
13 function apc_fetch($key) { return FALSE; }
14 function apc_store($key, $var, $ttl=0) { return FALSE; }
15}
16
17/**
18 * Sockets implementation of the TTransport interface that allows connection
19 * to a pool of servers.
20 *
21 * @package thrift.transport
22 * @author Mark Slee <mcslee@facebook.com>
23 */
24class TSocketPool extends TSocket {
25
26 /**
27 * Remote hostname
28 *
29 * @var array
30 */
31 private $hosts_ = array('localhost');
32
33 /**
34 * Remote ports
35 *
36 * @var array
37 */
38 private $ports_ = array('9090');
39
40 /**
41 * How many times to retry each host in connect
42 *
43 * @var int
44 */
45 private $numRetries_ = 1;
46
47 /**
48 * Retry interval in seconds, how long to not try a host if it has been
49 * marked as down.
50 *
51 * @var int
52 */
53 private $retryInterval_ = 60;
54
55 /**
56 * Max consecutive failures before marking a host down.
57 *
58 * @var int
59 */
60 private $maxConsecutiveFailures_ = 1;
61
62 /**
63 * Try hosts in order? or Randomized?
64 *
65 * @var bool
66 */
67 private $randomize_ = TRUE;
68
69 /**
70 * Always try last host, even if marked down?
71 *
72 * @var bool
73 */
74 private $alwaysTryLast_ = TRUE;
75
76 /**
77 * Socket pool constructor
78 *
79 * @param array $hosts List of remote hostnames
80 * @param mixed $ports Array of remote ports, or a single common port
81 * @param bool $persist Whether to use a persistent socket
82 */
83 public function __construct($hosts=array('localhost'),
84 $ports=array(9090),
85 $persist=FALSE) {
86 parent::__construct(null, 0, $persist);
87 $this->hosts_ = $hosts;
88
89 // Ports may be an array or a single port
90 if (is_array($ports)) {
91 $this->ports_ = $ports;
92 } else {
93 $this->ports_ = array();
94 $num = count($hosts);
95 for ($i = 0; $i < $num; ++$i) {
96 $this->ports_ []= $ports;
97 }
98 }
99 }
100
101 /**
102 * Sets how many time to keep retrying a host in the connect function.
103 *
104 * @param int $numRetries
105 */
106 public function setNumRetries($numRetries) {
107 $this->numRetries_ = $numRetries;
108 }
109
110 /**
111 * Sets how long to wait until retrying a host if it was marked down
112 *
113 * @param int $numRetries
114 */
115 public function setRetryInterval($retryInterval) {
116 $this->retryInterval_ = $retryInterval;
117 }
118
119 /**
120 * Sets how many time to keep retrying a host before marking it as down.
121 *
122 * @param int $numRetries
123 */
124 public function setMaxConsecutiveFailures($maxConsecutiveFailures) {
125 $this->maxConsecutiveFailures_ = $maxConsecutiveFailures;
126 }
127
128 /**
129 * Turns randomization in connect order on or off.
130 *
131 * @param bool $randomize
132 */
133 public function setRandomize($randomize) {
134 $this->randomize_ = $randomize;
135 }
136
137 /**
138 * Whether to always try the last server.
139 *
140 * @param bool $alwaysTryLast
141 */
142 public function setAlwaysTryLast($alwaysTryLast) {
143 $this->alwaysTryLast_ = $alwaysTryLast;
144 }
145
146
147 /**
148 * Connects the socket by iterating through all the servers in the pool
149 * and trying to find one that works.
150 */
151 public function open() {
152 $numServers = count($this->hosts_);
153
154 // Check if a random server from the pool should be hit
155 if ($this->randomize_) {
156 $startingPoint = mt_rand(0, $numServers-1);
157 } else {
158 $startingPoint = 0;
159 }
160 $i = $startingPoint;
161
162 do {
163 $host = $this->hosts_[$i];
164 $port = $this->ports_[$i];
165
166 // Check APC cache for a record of this server being down
167 $failtimeKey = 'thrift_failtime:'.$host_.':'.$port.'~';
168
169 // Cache miss? Assume it's OK
170 $lastFailtime = apc_fetch($failtimeKey);
171 if ($lastFailtime === FALSE) {
172 $lastFailtime = 0;
173 }
174
175 $retryIntervalPassed = FALSE;
176
177 // Cache hit...make sure enough the retry interval has elapsed
178 if ($lastFailtime > 0) {
179 $elapsed = time() - $lastFailtime;
180 if ($elapsed > $retryInterval) {
181 $retryIntervalPassed = TRUE;
182 if ($this->debug_) {
183 error_log('TSocketPool: retryInterval '.
184 '('.$this->retryInterval_.') '.
185 'has passed for host '.$host.':'.$port);
186 }
187 }
188 }
189
190 // Only connect if not in the middle of a fail interval, OR if this
191 // is the LAST server we are trying, just hammer away on it
192 $isLastServer = FALSE;
193 if ($alwaysTryLast) {
194 $isLastServer =
195 ( (($i+1) % $numServers) == $startingPoint ) ? TRUE : FALSE;
196 }
197
198 if (($lastFailtime === 0) ||
199 ($isLastServer) ||
200 ($lastFailtime > 0 && $retryIntervalPassed)) {
201
202 // Set underlying TSocket params to this one
203 $this->host_ = $host;
204 $this->port_ = $port;
205
206 for ($attempt = 0; $attempt < $this->numRetries_; $attempt++) {
207 try {
208 parent::open();
209
210 // Only clear the failure counts if required to do so
211 if ($lastFailtime > 0) {
212 apc_store($failtimeKey, 0);
213 }
214 // Successful connection, return now
215 return;
216
217 } catch (Exception $x) {
218 // Connection failed
219 }
220 }
221
222 // Mark failure of this host in the cache
223 $consecfailsKey = 'thrift_consecfails:'.$host.':'.$port.'~';
224
225 // Ignore cache misses
226 $consecfails = apc_fetch($consecfailsKey);
227 if ($consecfails === FALSE) {
228 $consecfails = 0;
229 }
230
231 // Increment by one
232 $consecfails++;
233
234 // Log and cache this failure
235 if ($consecfails >= $this->maxConsecutiveFailures_) {
236 if ($this->debug_) {
237 error_log('TSocketPool: marking '.$host.':'.$port.
238 ' as down for '.$this->retryInterval.' seconds '.
239 'after '.$consecfails.' failed connect attempts.');
240 }
241 // Store the failure time
242 apc_store($failtimeKey, time());
243
244 // Clear the count of consecutive failures
245 apc_store($consecfailsKey, 0);
246 } else {
247 apc_store($consecfailsKey, $consecfails);
248 }
249 }
250 $i = ($i + 1) % $numServers;
251
252 } while ($i != $startingPoint);
253
254 // Holy shit we failed them all. The system is totally ill!
255 $error = 'TSocketPool: All hosts in pool are down. ';
256 $hostlist = implode(',', $this->hosts_);
257 $error .= '('.$hostlist.':'.$this->port_.')';
258 if ($this->debug_) {
259 error_log($error);
260 }
261 throw new Exception($error);
262 }
263}
264
265?>