Thrift-357. cpp: Fix buffer and connection bloat in TNonblockingServer
Author: Anthony Giardullo
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@755824 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/server/TNonblockingServer.cpp b/lib/cpp/src/server/TNonblockingServer.cpp
index cfaf3be..583877b 100644
--- a/lib/cpp/src/server/TNonblockingServer.cpp
+++ b/lib/cpp/src/server/TNonblockingServer.cpp
@@ -495,6 +495,17 @@
server_->returnConnection(this);
}
+/**
+ * Shrinks the read buffer of this connection down to `limit` bytes when it
+ * is currently larger, so that an idle connection does not keep a large
+ * allocation alive.
+ *
+ * A limit of 0 is treated as "no limit" and leaves the buffer untouched;
+ * this avoids calling realloc() with a size of 0, whose result is
+ * implementation-defined.
+ *
+ * @param limit maximum number of bytes the read buffer may keep.
+ */
+void TConnection::checkIdleBufferMemLimit(uint32_t limit) {
+  if (limit == 0 || readBufferSize_ <= limit) {
+    return;
+  }
+
+  // Go through a temporary: when realloc() fails it returns NULL and leaves
+  // the original block allocated, so assigning straight to readBuffer_ would
+  // leak that block and leave readBufferSize_ describing a buffer we no
+  // longer hold.
+  uint8_t* shrunk = static_cast<uint8_t*>(std::realloc(readBuffer_, limit));
+  if (shrunk == NULL) {
+    // The previous (larger) buffer is still valid, so keep it and only
+    // report the failure.
+    GlobalOutput("TConnection::checkIdleBufferMemLimit() realloc");
+    return;
+  }
+
+  readBuffer_ = shrunk;
+  readBufferSize_ = limit;
+}
+
/**
* Creates a new connection either by reusing an object off the stack or
* by allocating a new one entirely
@@ -515,7 +526,13 @@
* Returns a connection to the stack
*/
void TNonblockingServer::returnConnection(TConnection* connection) {
- connectionStack_.push(connection);
+  // A connectionStackLimit_ of 0 disables the cap on pooled connections.
+  const bool poolIsFull = connectionStackLimit_ != 0 &&
+                          connectionStack_.size() >= connectionStackLimit_;
+  if (poolIsFull) {
+    // No room left in the pool: release the object instead of caching it.
+    delete connection;
+    return;
+  }
+
+  // Trim an oversized read buffer before parking the connection.
+  connection->checkIdleBufferMemLimit(idleBufferMemLimit_);
+  connectionStack_.push(connection);
}
/**
diff --git a/lib/cpp/src/server/TNonblockingServer.h b/lib/cpp/src/server/TNonblockingServer.h
index 40ec574..a47a2c0 100644
--- a/lib/cpp/src/server/TNonblockingServer.h
+++ b/lib/cpp/src/server/TNonblockingServer.h
@@ -43,6 +43,12 @@
// Listen backlog
static const int LISTEN_BACKLOG = 1024;
+ // Default limit on size of idle connection pool
+ static const size_t CONNECTION_STACK_LIMIT = 1024;
+
+ // Maximum size of buffer allocated to idle connection
+ static const uint32_t IDLE_BUFFER_MEM_LIMIT = 8192;
+
// Server socket file descriptor
int serverSocket_;
@@ -64,6 +70,16 @@
// Number of TConnection object we've created
size_t numTConnections_;
+ // Limit for how many TConnection objects to cache
+ size_t connectionStackLimit_;
+
+ /**
+ * Max read buffer size for an idle connection. When we place an idle
+ * TConnection into connectionStack_, we ensure that its read buffer is
+ * reduced to this size so that idle connections don't hog memory.
+ */
+ uint32_t idleBufferMemLimit_;
+
/**
* This is a stack of all the objects that have been created but that
* are NOT currently in use. When we close a connection, we place it on this
@@ -82,7 +98,9 @@
port_(port),
threadPoolProcessing_(false),
eventBase_(NULL),
- numTConnections_(0) {}
+ numTConnections_(0),
+ connectionStackLimit_(CONNECTION_STACK_LIMIT),
+ idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT) {}
TNonblockingServer(boost::shared_ptr<TProcessor> processor,
boost::shared_ptr<TProtocolFactory> protocolFactory,
@@ -93,7 +111,9 @@
port_(port),
threadManager_(threadManager),
eventBase_(NULL),
- numTConnections_(0) {
+ numTConnections_(0),
+ connectionStackLimit_(CONNECTION_STACK_LIMIT),
+ idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT) {
setInputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
setOutputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
setInputProtocolFactory(protocolFactory);
@@ -113,7 +133,9 @@
port_(port),
threadManager_(threadManager),
eventBase_(NULL),
- numTConnections_(0) {
+ numTConnections_(0),
+ connectionStackLimit_(CONNECTION_STACK_LIMIT),
+ idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT) {
setInputTransportFactory(inputTransportFactory);
setOutputTransportFactory(outputTransportFactory);
setInputProtocolFactory(inputProtocolFactory);
@@ -132,6 +154,24 @@
return threadManager_;
}
+  /**
+   * Report the cap on how many unused TConnection objects are kept in
+   * reserve.
+   *
+   * @return the current limit on TConnection pool size, converted from the
+   *         size_t member to int.
+   */
+  int getConnectionStackLimit() const {
+    return static_cast<int>(connectionStackLimit_);
+  }
+
+  /**
+   * Change the cap on how many unused TConnection objects are kept in
+   * reserve. returnConnection() treats a limit of 0 as "no limit".
+   *
+   * @param sz the new limit for TConnection pool size. NOTE(review): the
+   *           value is stored in a size_t, so a negative sz wraps to a very
+   *           large limit.
+   */
+  void setConnectionStackLimit(int sz) {
+    connectionStackLimit_ = static_cast<size_t>(sz);
+  }
+
bool isThreadPoolProcessing() const {
return threadPoolProcessing_;
}
@@ -160,6 +200,26 @@
return connectionStack_.size();
}
+  /**
+   * Report the read-buffer size cap applied to idle TConnection objects.
+   *
+   * @return # bytes beyond which we will shrink buffers when idle.
+   */
+  size_t getIdleBufferMemLimit() const {
+    return static_cast<size_t>(idleBufferMemLimit_);
+  }
+
+  /**
+   * Set the maximum limit of memory allocated to idle TConnection objects.
+   * If a TConnection object goes idle with more than this much memory
+   * allocated to its buffer, we shrink it to this value.
+   *
+   * The limit is stored as a uint32_t; values that do not fit are clamped to
+   * the largest uint32_t instead of being silently truncated (which could
+   * turn a huge limit into a tiny one, or into 0).
+   *
+   * @param limit of bytes beyond which we will shrink buffers when idle.
+   */
+  void setIdleBufferMemLimit(size_t limit) {
+    const uint32_t maxLimit = ~static_cast<uint32_t>(0);
+    idleBufferMemLimit_ =
+      (limit > maxLimit) ? maxLimit : static_cast<uint32_t>(limit);
+  }
+
TConnection* createConnection(int socket, short flags);
void returnConnection(TConnection* connection);
@@ -327,6 +387,13 @@
server_->decrementNumConnections();
}
+ /**
+ * Check read buffer against a given limit and shrink it if exceeded.
+ *
+ * @param limit we limit buffer size to.
+ */
+ void checkIdleBufferMemLimit(uint32_t limit);
+
// Initialize
void init(int socket, short eventFlags, TNonblockingServer *s);