blob: ac46c360529a227da4402a57002e30c5c576c533 [file] [log] [blame]
<?php
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
* @package thrift.transport
*/
/** Inherits from Socket */
include_once $GLOBALS['THRIFT_ROOT'].'/transport/TSocket.php';
/**
* This library makes use of APC cache to make hosts as down in a web
* environment. If you are running from the CLI or on a system without APC
* installed, then these null functions will step in and act like cache
* misses.
*/
if (!function_exists('apc_fetch')) {
function apc_fetch($key) { return FALSE; }
function apc_store($key, $var, $ttl=0) { return FALSE; }
}
/**
* Sockets implementation of the TTransport interface that allows connection
* to a pool of servers.
*
* @package thrift.transport
*/
class TSocketPool extends TSocket {
/**
* Remote servers. Array of associative arrays with 'host' and 'port' keys
*/
private $servers_ = array();
/**
* How many times to retry each host in connect
*
* @var int
*/
private $numRetries_ = 1;
/**
* Retry interval in seconds, how long to not try a host if it has been
* marked as down.
*
* @var int
*/
private $retryInterval_ = 60;
/**
* Max consecutive failures before marking a host down.
*
* @var int
*/
private $maxConsecutiveFailures_ = 1;
/**
* Try hosts in order? or Randomized?
*
* @var bool
*/
private $randomize_ = TRUE;
/**
* Always try last host, even if marked down?
*
* @var bool
*/
private $alwaysTryLast_ = TRUE;
/**
* Socket pool constructor
*
* @param array $hosts List of remote hostnames
* @param mixed $ports Array of remote ports, or a single common port
* @param bool $persist Whether to use a persistent socket
* @param mixed $debugHandler Function for error logging
*/
public function __construct($hosts=array('localhost'),
$ports=array(9090),
$persist=FALSE,
$debugHandler=null) {
parent::__construct(null, 0, $persist, $debugHandler);
if (!is_array($ports)) {
$port = $ports;
$ports = array();
foreach ($hosts as $key => $val) {
$ports[$key] = $port;
}
}
foreach ($hosts as $key => $host) {
$this->servers_ []= array('host' => $host,
'port' => $ports[$key]);
}
}
/**
* Add a server to the pool
*
* This function does not prevent you from adding a duplicate server entry.
*
* @param string $host hostname or IP
* @param int $port port
*/
public function addServer($host, $port) {
$this->servers_[] = array('host' => $host, 'port' => $port);
}
/**
* Sets how many time to keep retrying a host in the connect function.
*
* @param int $numRetries
*/
public function setNumRetries($numRetries) {
$this->numRetries_ = $numRetries;
}
/**
* Sets how long to wait until retrying a host if it was marked down
*
* @param int $numRetries
*/
public function setRetryInterval($retryInterval) {
$this->retryInterval_ = $retryInterval;
}
/**
* Sets how many time to keep retrying a host before marking it as down.
*
* @param int $numRetries
*/
public function setMaxConsecutiveFailures($maxConsecutiveFailures) {
$this->maxConsecutiveFailures_ = $maxConsecutiveFailures;
}
/**
* Turns randomization in connect order on or off.
*
* @param bool $randomize
*/
public function setRandomize($randomize) {
$this->randomize_ = $randomize;
}
/**
* Whether to always try the last server.
*
* @param bool $alwaysTryLast
*/
public function setAlwaysTryLast($alwaysTryLast) {
$this->alwaysTryLast_ = $alwaysTryLast;
}
/**
* Connects the socket by iterating through all the servers in the pool
* and trying to find one that works.
*/
public function open() {
// Check if we want order randomization
if ($this->randomize_) {
shuffle($this->servers_);
}
// Count servers to identify the "last" one
$numServers = count($this->servers_);
for ($i = 0; $i < $numServers; ++$i) {
// This extracts the $host and $port variables
extract($this->servers_[$i]);
// Check APC cache for a record of this server being down
$failtimeKey = 'thrift_failtime:'.$host.':'.$port.'~';
// Cache miss? Assume it's OK
$lastFailtime = apc_fetch($failtimeKey);
if ($lastFailtime === FALSE) {
$lastFailtime = 0;
}
$retryIntervalPassed = FALSE;
// Cache hit...make sure enough the retry interval has elapsed
if ($lastFailtime > 0) {
$elapsed = time() - $lastFailtime;
if ($elapsed > $this->retryInterval_) {
$retryIntervalPassed = TRUE;
if ($this->debug_) {
call_user_func($this->debugHandler_,
'TSocketPool: retryInterval '.
'('.$this->retryInterval_.') '.
'has passed for host '.$host.':'.$port);
}
}
}
// Only connect if not in the middle of a fail interval, OR if this
// is the LAST server we are trying, just hammer away on it
$isLastServer = FALSE;
if ($this->alwaysTryLast_) {
$isLastServer = ($i == ($numServers - 1));
}
if (($lastFailtime === 0) ||
($isLastServer) ||
($lastFailtime > 0 && $retryIntervalPassed)) {
// Set underlying TSocket params to this one
$this->host_ = $host;
$this->port_ = $port;
// Try up to numRetries_ connections per server
for ($attempt = 0; $attempt < $this->numRetries_; $attempt++) {
try {
// Use the underlying TSocket open function
parent::open();
// Only clear the failure counts if required to do so
if ($lastFailtime > 0) {
apc_store($failtimeKey, 0);
}
// Successful connection, return now
return;
} catch (TException $tx) {
// Connection failed
}
}
// Mark failure of this host in the cache
$consecfailsKey = 'thrift_consecfails:'.$host.':'.$port.'~';
// Ignore cache misses
$consecfails = apc_fetch($consecfailsKey);
if ($consecfails === FALSE) {
$consecfails = 0;
}
// Increment by one
$consecfails++;
// Log and cache this failure
if ($consecfails >= $this->maxConsecutiveFailures_) {
if ($this->debug_) {
call_user_func($this->debugHandler_,
'TSocketPool: marking '.$host.':'.$port.
' as down for '.$this->retryInterval_.' secs '.
'after '.$consecfails.' failed attempts.');
}
// Store the failure time
apc_store($failtimeKey, time());
// Clear the count of consecutive failures
apc_store($consecfailsKey, 0);
} else {
apc_store($consecfailsKey, $consecfails);
}
}
}
// Oh no; we failed them all. The system is totally ill!
$error = 'TSocketPool: All hosts in pool are down. ';
$hosts = array();
foreach ($this->servers_ as $server) {
$hosts []= $server['host'].':'.$server['port'];
}
$hostlist = implode(',', $hosts);
$error .= '('.$hostlist.')';
if ($this->debug_) {
call_user_func($this->debugHandler_, $error);
}
throw new TException($error);
}
}