Persistent conns in TSocketPool
Summary: Added support for persistent conns in TSocketPool
Also, added some util functions in TNonblockingServer
Reviewed By: mcslee
Test Plan: Ran a test search cluster with these changes - open was only called once (I put fprintfs in open and close), after which the socket was reused
Revert: OK
TracCamp Project: Thrift
DiffCamp Revision: 11425
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@665668 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/transport/TSocketPool.cpp b/lib/cpp/src/transport/TSocketPool.cpp
index e22a4b8..d5a6501 100644
--- a/lib/cpp/src/transport/TSocketPool.cpp
+++ b/lib/cpp/src/transport/TSocketPool.cpp
@@ -23,6 +23,7 @@
TSocketPoolServer::TSocketPoolServer()
: host_(""),
port_(0),
+ socket_(-1),
lastFailTime_(0),
consecutiveFailures_(0) {}
@@ -32,6 +33,7 @@
TSocketPoolServer::TSocketPoolServer(const string &host, int port)
: host_(host),
port_(port),
+ socket_(-1),
lastFailTime_(0),
consecutiveFailures_(0) {}
@@ -100,7 +102,12 @@
}
TSocketPool::~TSocketPool() {
- close();
+ vector< shared_ptr<TSocketPoolServer> >::const_iterator iter = servers_.begin();
+ vector< shared_ptr<TSocketPoolServer> >::const_iterator iterEnd = servers_.end();
+ for (; iter != iterEnd; ++iter) {
+ setCurrentServer(*iter);
+ TSocketPool::close();
+ }
}
void TSocketPool::addServer(const string& host, int port) {
@@ -136,6 +143,13 @@
alwaysTryLast_ = alwaysTryLast;
}
+void TSocketPool::setCurrentServer(const shared_ptr<TSocketPoolServer> &server) {
+ currentServer_ = server;
+ host_ = server->host_;
+ port_ = server->port_;
+ socket_ = server->socket_;
+}
+
/* TODO: without apc we ignore a lot of functionality from the php version */
void TSocketPool::open() {
if (randomize_) {
@@ -145,16 +159,21 @@
unsigned int numServers = servers_.size();
for (unsigned int i = 0; i < numServers; ++i) {
- TSocketPoolServer &server = *(servers_[i]);
- bool retryIntervalPassed = (server.lastFailTime_ == 0);
+ shared_ptr<TSocketPoolServer> &server = servers_[i];
+ bool retryIntervalPassed = (server->lastFailTime_ == 0);
bool isLastServer = alwaysTryLast_ ? (i == (numServers - 1)) : false;
- host_ = server.host_;
- port_ = server.port_;
+ // Impersonate the server socket
+ setCurrentServer(server);
- if (server.lastFailTime_ > 0) {
+ if (isOpen()) {
+ // already open means we're done
+ return;
+ }
+
+ if (server->lastFailTime_ > 0) {
// The server was marked as down, so check if enough time has elapsed to retry
- int elapsedTime = time(NULL) - server.lastFailTime_;
+ int elapsedTime = time(NULL) - server->lastFailTime_;
if (elapsedTime > retryInterval_) {
retryIntervalPassed = true;
}
@@ -165,23 +184,28 @@
try {
TSocket::open();
+ // Copy over the opened socket so that we can keep it persistent
+ server->socket_ = socket_;
+
// reset lastFailTime_ is required
- if (server.lastFailTime_) {
- server.lastFailTime_ = 0;
+ if (server->lastFailTime_) {
+ server->lastFailTime_ = 0;
}
// success
return;
} catch (TException e) {
+ string errStr = "TSocketPool::open failed "+getSocketInfo()+": "+e.what();
+ GlobalOutput(errStr.c_str());
// connection failed
}
}
- ++server.consecutiveFailures_;
- if (server.consecutiveFailures_ > maxConsecutiveFailures_) {
+ ++server->consecutiveFailures_;
+ if (server->consecutiveFailures_ > maxConsecutiveFailures_) {
// Mark server as down
- server.consecutiveFailures_ = 0;
- server.lastFailTime_ = time(NULL);
+ server->consecutiveFailures_ = 0;
+ server->lastFailTime_ = time(NULL);
}
}
}
@@ -190,4 +214,11 @@
throw TTransportException(TTransportException::NOT_OPEN);
}
+void TSocketPool::close() {
+ if (isOpen()) {
+ TSocket::close();
+ currentServer_->socket_ = -1;
+ }
+}
+
}}} // facebook::thrift::transport