blob: e22a4b8bb2859c55090e45d7ae46e637ddca4de9 [file] [log] [blame]
// Copyright (c) 2007- Facebook
// Distributed under the Thrift Software License
//
// See accompanying file LICENSE or visit the Thrift site at:
// http://developers.facebook.com/thrift/
#include <algorithm>
#include <iostream>
#include "TSocketPool.h"
namespace facebook { namespace thrift { namespace transport {
using namespace std;
using boost::shared_ptr;
/**
* TSocketPoolServer implementation
*
* @author Akhil Wable <akhil@facebook.com>
*/
TSocketPoolServer::TSocketPoolServer()
: host_(""),
port_(0),
lastFailTime_(0),
consecutiveFailures_(0) {}
/**
* Constructor for TSocketPool server
*/
TSocketPoolServer::TSocketPoolServer(const string &host, int port)
: host_(host),
port_(port),
lastFailTime_(0),
consecutiveFailures_(0) {}
/**
* TSocketPool implementation.
*
* @author Jason Sobel <jsobel@facebook.com>
*/
TSocketPool::TSocketPool() : TSocket(),
numRetries_(1),
retryInterval_(60),
maxConsecutiveFailures_(1),
randomize_(true),
alwaysTryLast_(true) {
}
TSocketPool::TSocketPool(const vector<string> &hosts,
const vector<int> &ports) : TSocket(),
numRetries_(1),
retryInterval_(60),
maxConsecutiveFailures_(1),
randomize_(true),
alwaysTryLast_(true)
{
if (hosts.size() != ports.size()) {
GlobalOutput("TSocketPool::TSocketPool: hosts.size != ports.size");
throw TTransportException(TTransportException::BAD_ARGS);
}
for (unsigned int i = 0; i < hosts.size(); ++i) {
addServer(hosts[i], ports[i]);
}
}
TSocketPool::TSocketPool(const vector<pair<string, int> >& servers) : TSocket(),
numRetries_(1),
retryInterval_(60),
maxConsecutiveFailures_(1),
randomize_(true),
alwaysTryLast_(true)
{
for (unsigned i = 0; i < servers.size(); ++i) {
addServer(servers[i].first, servers[i].second);
}
}
TSocketPool::TSocketPool(const vector< shared_ptr<TSocketPoolServer> >& servers) : TSocket(),
servers_(servers),
numRetries_(1),
retryInterval_(60),
maxConsecutiveFailures_(1),
randomize_(true),
alwaysTryLast_(true)
{
}
TSocketPool::TSocketPool(const string& host, int port) : TSocket(),
numRetries_(1),
retryInterval_(60),
maxConsecutiveFailures_(1),
randomize_(true),
alwaysTryLast_(true)
{
addServer(host, port);
}
TSocketPool::~TSocketPool() {
close();
}
void TSocketPool::addServer(const string& host, int port) {
servers_.push_back(shared_ptr<TSocketPoolServer>(new TSocketPoolServer(host, port)));
}
void TSocketPool::setServers(const vector< shared_ptr<TSocketPoolServer> >& servers) {
servers_ = servers;
}
void TSocketPool::getServers(vector< shared_ptr<TSocketPoolServer> >& servers) {
servers = servers_;
}
void TSocketPool::setNumRetries(int numRetries) {
numRetries_ = numRetries;
}
void TSocketPool::setRetryInterval(int retryInterval) {
retryInterval_ = retryInterval;
}
void TSocketPool::setMaxConsecutiveFailures(int maxConsecutiveFailures) {
maxConsecutiveFailures_ = maxConsecutiveFailures;
}
void TSocketPool::setRandomize(bool randomize) {
randomize_ = randomize;
}
void TSocketPool::setAlwaysTryLast(bool alwaysTryLast) {
alwaysTryLast_ = alwaysTryLast;
}
/* TODO: without apc we ignore a lot of functionality from the php version */
void TSocketPool::open() {
if (randomize_) {
random_shuffle(servers_.begin(), servers_.end());
}
unsigned int numServers = servers_.size();
for (unsigned int i = 0; i < numServers; ++i) {
TSocketPoolServer &server = *(servers_[i]);
bool retryIntervalPassed = (server.lastFailTime_ == 0);
bool isLastServer = alwaysTryLast_ ? (i == (numServers - 1)) : false;
host_ = server.host_;
port_ = server.port_;
if (server.lastFailTime_ > 0) {
// The server was marked as down, so check if enough time has elapsed to retry
int elapsedTime = time(NULL) - server.lastFailTime_;
if (elapsedTime > retryInterval_) {
retryIntervalPassed = true;
}
}
if (retryIntervalPassed || isLastServer) {
for (int j = 0; j < numRetries_; ++j) {
try {
TSocket::open();
// reset lastFailTime_ is required
if (server.lastFailTime_) {
server.lastFailTime_ = 0;
}
// success
return;
} catch (TException e) {
// connection failed
}
}
++server.consecutiveFailures_;
if (server.consecutiveFailures_ > maxConsecutiveFailures_) {
// Mark server as down
server.consecutiveFailures_ = 0;
server.lastFailTime_ = time(NULL);
}
}
}
GlobalOutput("TSocketPool::open: all connections failed");
throw TTransportException(TTransportException::NOT_OPEN);
}
}}} // facebook::thrift::transport