Persistent conns in TSocketPool
Summary: Added support for persistent conns in TSocketPool
Also, added some util functions in TNonblockingServer
Reviewed By: mcslee
Test Plan: Ran a test search cluster with these changes - open was only called once (I put fprintfs in open and close), after which the socket was reused
Revert: OK
TracCamp Project: Thrift
DiffCamp Revision: 11425
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@665668 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/server/TNonblockingServer.h b/lib/cpp/src/server/TNonblockingServer.h
index 7c6bc7f..0c3c6d5 100644
--- a/lib/cpp/src/server/TNonblockingServer.h
+++ b/lib/cpp/src/server/TNonblockingServer.h
@@ -64,6 +64,9 @@
// Event struct, for use with eventBase_
struct event serverEvent_;
+ // Number of TConnection object we've created
+ size_t numTConnections_;
+
/**
* This is a stack of all the objects that have been created but that
* are NOT currently in use. When we close a connection, we place it on this
@@ -82,7 +85,8 @@
port_(port),
frameResponses_(true),
threadPoolProcessing_(false),
- eventBase_(NULL) {}
+ eventBase_(NULL),
+ numTConnections_(0) {}
TNonblockingServer(boost::shared_ptr<TProcessor> processor,
boost::shared_ptr<TProtocolFactory> protocolFactory,
@@ -93,7 +97,8 @@
port_(port),
frameResponses_(true),
threadManager_(threadManager),
- eventBase_(NULL) {
+ eventBase_(NULL),
+ numTConnections_(0) {
setInputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
setOutputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
setInputProtocolFactory(protocolFactory);
@@ -113,7 +118,8 @@
port_(port),
frameResponses_(true),
threadManager_(threadManager),
- eventBase_(NULL) {
+ eventBase_(NULL),
+ numTConnections_(0) {
setInputTransportFactory(inputTransportFactory);
setOutputTransportFactory(outputTransportFactory);
setInputProtocolFactory(inputProtocolFactory);
@@ -128,6 +134,10 @@
threadPoolProcessing_ = (threadManager != NULL);
}
+ boost::shared_ptr<ThreadManager> getThreadManager() {
+ return threadManager_;
+ }
+
bool isThreadPoolProcessing() const {
return threadPoolProcessing_;
}
@@ -148,6 +158,18 @@
return eventBase_;
}
+ void incrementNumConnections(size_t incr=1) {
+ numTConnections_ += incr;
+ }
+
+ size_t getNumConnections() {
+ return numTConnections_;
+ }
+
+ size_t getNumIdleConnections() {
+ return connectionStack_.size();
+ }
+
TConnection* createConnection(int socket, short flags);
void returnConnection(TConnection* connection);
@@ -163,7 +185,6 @@
void registerEvents(event_base* base);
void serve();
-
};
/**
@@ -304,6 +325,11 @@
outputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer());
init(socket, eventFlags, s);
+ server_->incrementNumConnections();
+ }
+
+ ~TConnection() {
+ server_->incrementNumConnections(-1);
}
// Initialize
diff --git a/lib/cpp/src/transport/TSocketPool.cpp b/lib/cpp/src/transport/TSocketPool.cpp
index e22a4b8..d5a6501 100644
--- a/lib/cpp/src/transport/TSocketPool.cpp
+++ b/lib/cpp/src/transport/TSocketPool.cpp
@@ -23,6 +23,7 @@
TSocketPoolServer::TSocketPoolServer()
: host_(""),
port_(0),
+ socket_(-1),
lastFailTime_(0),
consecutiveFailures_(0) {}
@@ -32,6 +33,7 @@
TSocketPoolServer::TSocketPoolServer(const string &host, int port)
: host_(host),
port_(port),
+ socket_(-1),
lastFailTime_(0),
consecutiveFailures_(0) {}
@@ -100,7 +102,12 @@
}
TSocketPool::~TSocketPool() {
- close();
+ vector< shared_ptr<TSocketPoolServer> >::const_iterator iter = servers_.begin();
+ vector< shared_ptr<TSocketPoolServer> >::const_iterator iterEnd = servers_.end();
+ for (; iter != iterEnd; ++iter) {
+ setCurrentServer(*iter);
+ TSocketPool::close();
+ }
}
void TSocketPool::addServer(const string& host, int port) {
@@ -136,6 +143,13 @@
alwaysTryLast_ = alwaysTryLast;
}
+void TSocketPool::setCurrentServer(const shared_ptr<TSocketPoolServer> &server) {
+ currentServer_ = server;
+ host_ = server->host_;
+ port_ = server->port_;
+ socket_ = server->socket_;
+}
+
/* TODO: without apc we ignore a lot of functionality from the php version */
void TSocketPool::open() {
if (randomize_) {
@@ -145,16 +159,21 @@
unsigned int numServers = servers_.size();
for (unsigned int i = 0; i < numServers; ++i) {
- TSocketPoolServer &server = *(servers_[i]);
- bool retryIntervalPassed = (server.lastFailTime_ == 0);
+ shared_ptr<TSocketPoolServer> &server = servers_[i];
+ bool retryIntervalPassed = (server->lastFailTime_ == 0);
bool isLastServer = alwaysTryLast_ ? (i == (numServers - 1)) : false;
- host_ = server.host_;
- port_ = server.port_;
+ // Impersonate the server socket
+ setCurrentServer(server);
- if (server.lastFailTime_ > 0) {
+ if (isOpen()) {
+ // already open means we're done
+ return;
+ }
+
+ if (server->lastFailTime_ > 0) {
// The server was marked as down, so check if enough time has elapsed to retry
- int elapsedTime = time(NULL) - server.lastFailTime_;
+ int elapsedTime = time(NULL) - server->lastFailTime_;
if (elapsedTime > retryInterval_) {
retryIntervalPassed = true;
}
@@ -165,23 +184,28 @@
try {
TSocket::open();
+ // Copy over the opened socket so that we can keep it persistent
+ server->socket_ = socket_;
+
// reset lastFailTime_ is required
- if (server.lastFailTime_) {
- server.lastFailTime_ = 0;
+ if (server->lastFailTime_) {
+ server->lastFailTime_ = 0;
}
// success
return;
} catch (TException e) {
+ string errStr = "TSocketPool::open failed "+getSocketInfo()+": "+e.what();
+ GlobalOutput(errStr.c_str());
// connection failed
}
}
- ++server.consecutiveFailures_;
- if (server.consecutiveFailures_ > maxConsecutiveFailures_) {
+ ++server->consecutiveFailures_;
+ if (server->consecutiveFailures_ > maxConsecutiveFailures_) {
// Mark server as down
- server.consecutiveFailures_ = 0;
- server.lastFailTime_ = time(NULL);
+ server->consecutiveFailures_ = 0;
+ server->lastFailTime_ = time(NULL);
}
}
}
@@ -190,4 +214,11 @@
throw TTransportException(TTransportException::NOT_OPEN);
}
+void TSocketPool::close() {
+ if (isOpen()) {
+ TSocket::close();
+ currentServer_->socket_ = -1;
+ }
+}
+
}}} // facebook::thrift::transport
diff --git a/lib/cpp/src/transport/TSocketPool.h b/lib/cpp/src/transport/TSocketPool.h
index 6c3b0f7..a455aca 100644
--- a/lib/cpp/src/transport/TSocketPool.h
+++ b/lib/cpp/src/transport/TSocketPool.h
@@ -36,6 +36,9 @@
// Port to connect on
int port_;
+ // Socket for the server
+ int socket_;
+
// Last time connecting to this server failed
int lastFailTime_;
@@ -138,11 +141,21 @@
*/
void open();
+ /*
+ * Closes the UNIX socket
+ */
+ void close();
+
protected:
+ void setCurrentServer(const boost::shared_ptr<TSocketPoolServer> &server);
+
/** List of servers to connect to */
std::vector< boost::shared_ptr<TSocketPoolServer> > servers_;
+ /** Current server */
+ boost::shared_ptr<TSocketPoolServer> currentServer_;
+
/** How many times to retry each host in connect */
int numRetries_;