THRIFT-926. cpp: Add configurable buffer recycling for TNonblockingServer
Add methods to TNonblockingServer to set the maximum size of idle read
and write buffers and the check interval (in calls). When checked, if
the buffers are larger than the configured maximum, they will be resized
down the to maximum size.
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@1005164 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/server/TNonblockingServer.cpp b/lib/cpp/src/server/TNonblockingServer.cpp
index 73edd93..1bf4e68 100644
--- a/lib/cpp/src/server/TNonblockingServer.cpp
+++ b/lib/cpp/src/server/TNonblockingServer.cpp
@@ -112,9 +112,11 @@
writeBuffer_ = NULL;
writeBufferSize_ = 0;
writeBufferPos_ = 0;
+ largestWriteBufferSize_ = 0;
socketState_ = SOCKET_RECV;
appState_ = APP_INIT;
+ callsForResize_ = 0;
// Set flags, which also registers the event
setFlags(eventFlags);
@@ -342,6 +344,16 @@
goto LABEL_APP_INIT;
case APP_SEND_RESULT:
+ // it's now safe to perform buffer size housekeeping.
+ if (writeBufferSize_ > largestWriteBufferSize_) {
+ largestWriteBufferSize_ = writeBufferSize_;
+ }
+ if (server_->getResizeBufferEveryN() > 0
+ && ++callsForResize_ >= server_->getResizeBufferEveryN()) {
+ checkIdleBufferMemLimit(server_->getIdleReadBufferLimit(),
+ server_->getIdleWriteBufferLimit());
+ callsForResize_ = 0;
+ }
// N.B.: We also intentionally fall through here into the INIT state!
@@ -486,15 +498,22 @@
server_->returnConnection(this);
}
-void TConnection::checkIdleBufferMemLimit(size_t limit) {
- if (readBufferSize_ > limit) {
- readBufferSize_ = limit;
+void TConnection::checkIdleBufferMemLimit(size_t readLimit,
+ size_t writeLimit) {
+ if (readLimit > 0 && readBufferSize_ > readLimit) {
+ readBufferSize_ = readLimit;
readBuffer_ = (uint8_t*)std::realloc(readBuffer_, readBufferSize_);
if (readBuffer_ == NULL) {
GlobalOutput("TConnection::checkIdleBufferMemLimit() realloc");
close();
}
}
+
+ if (writeLimit > 0 && largestWriteBufferSize_ > writeLimit) {
+ // just start over
+ outputTransport_->resetBuffer(NULL, 0, TMemoryBuffer::TAKE_OWNERSHIP);
+ largestWriteBufferSize_ = 0;
+ }
}
TNonblockingServer::~TNonblockingServer() {
@@ -546,7 +565,7 @@
(connectionStack_.size() >= connectionStackLimit_)) {
delete connection;
} else {
- connection->checkIdleBufferMemLimit(idleBufferMemLimit_);
+ connection->checkIdleBufferMemLimit(idleReadBufferLimit_, idleWriteBufferLimit_);
connectionStack_.push(connection);
}
}
diff --git a/lib/cpp/src/server/TNonblockingServer.h b/lib/cpp/src/server/TNonblockingServer.h
index 3ad49c1..501433c 100644
--- a/lib/cpp/src/server/TNonblockingServer.h
+++ b/lib/cpp/src/server/TNonblockingServer.h
@@ -70,15 +70,21 @@
/// Default limit on size of idle connection pool
static const size_t CONNECTION_STACK_LIMIT = 1024;
- /// Maximum size of buffer allocated to idle connection
- static const uint32_t IDLE_BUFFER_MEM_LIMIT = 8192;
-
/// Default limit on total number of connected sockets
static const int MAX_CONNECTIONS = INT_MAX;
/// Default limit on connections in handler/task processing
static const int MAX_ACTIVE_PROCESSORS = INT_MAX;
+ /// Maximum size of read buffer allocated to idle connection (0 = unlimited)
+ static const int IDLE_READ_BUFFER_LIMIT = 1024;
+
+ /// Maximum size of write buffer allocated to idle connection (0 = unlimited)
+ static const int IDLE_WRITE_BUFFER_LIMIT = 1024;
+
+ /// # of calls before resizing oversized buffers (0 = check only on close)
+ static const int RESIZE_BUFFER_EVERY_N = 512;
+
/// Server socket file descriptor
int serverSocket_;
@@ -129,11 +135,27 @@
TOverloadAction overloadAction_;
/**
- * Max read buffer size for an idle connection. When we place an idle
- * TConnection into connectionStack_, we insure that its read buffer is
- * reduced to this size to insure that idle connections don't hog memory.
+ * Max read buffer size for an idle TConnection. When we place an idle
+ * TConnection into connectionStack_ or on every resizeBufferEveryN_ calls,
+ * we insure that its read buffer is reduced to this size to insure that
+ * idle connections don't hog memory. 0 disables this check.
*/
- size_t idleBufferMemLimit_;
+ size_t idleReadBufferLimit_;
+
+ /**
+ * Max write buffer size for an idle connection. When we place an idle
+ * TConnection into connectionStack_ or on every resizeBufferEveryN_ calls,
+ * we insure that its write buffer is <= to this size; otherwise we
+ * replace it with a new one to insure that idle connections don't hog
+ * memory. 0 disables this check.
+ */
+ size_t idleWriteBufferLimit_;
+
+ /**
+ * Every N calls we check the buffer size limits on a connected TConnection.
+ * 0 disables (i.e. the checks are only done when a connection closes).
+ */
+ int32_t resizeBufferEveryN_;
/// Set if we are currently in an overloaded state.
bool overloaded_;
@@ -181,7 +203,9 @@
taskExpireTime_(0),
overloadHysteresis_(0.8),
overloadAction_(T_OVERLOAD_NO_ACTION),
- idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT),
+ idleReadBufferLimit_(IDLE_READ_BUFFER_LIMIT),
+ idleWriteBufferLimit_(IDLE_WRITE_BUFFER_LIMIT),
+ resizeBufferEveryN_(RESIZE_BUFFER_EVERY_N),
overloaded_(false),
nConnectionsDropped_(0),
nTotalConnectionsDropped_(0) {}
@@ -203,7 +227,9 @@
taskExpireTime_(0),
overloadHysteresis_(0.8),
overloadAction_(T_OVERLOAD_NO_ACTION),
- idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT),
+ idleReadBufferLimit_(IDLE_READ_BUFFER_LIMIT),
+ idleWriteBufferLimit_(IDLE_WRITE_BUFFER_LIMIT),
+ resizeBufferEveryN_(RESIZE_BUFFER_EVERY_N),
overloaded_(false),
nConnectionsDropped_(0),
nTotalConnectionsDropped_(0) {
@@ -234,7 +260,9 @@
taskExpireTime_(0),
overloadHysteresis_(0.8),
overloadAction_(T_OVERLOAD_NO_ACTION),
- idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT),
+ idleReadBufferLimit_(IDLE_READ_BUFFER_LIMIT),
+ idleWriteBufferLimit_(IDLE_WRITE_BUFFER_LIMIT),
+ resizeBufferEveryN_(RESIZE_BUFFER_EVERY_N),
overloaded_(false),
nConnectionsDropped_(0),
nTotalConnectionsDropped_(0) {
@@ -446,26 +474,95 @@
bool drainPendingTask();
/**
- * Get the maximum limit of memory allocated to idle TConnection objects.
+ * Get the maximum size of read buffer allocated to idle TConnection objects.
+ *
+ * @return # bytes beyond which we will shrink buffers when idle.
+ */
+ size_t getIdleReadBufferLimit() const {
+ return idleReadBufferLimit_;
+ }
+
+ /**
+ * [NOTE: This is for backwards compatibility, use getIdleReadBufferLimit().]
+ * Get the maximum size of read buffer allocated to idle TConnection objects.
*
* @return # bytes beyond which we will shrink buffers when idle.
*/
size_t getIdleBufferMemLimit() const {
- return idleBufferMemLimit_;
+ return idleReadBufferLimit_;
}
/**
- * Set the maximum limit of memory allocated to idle TConnection objects.
- * If a TConnection object goes idle with more than this much memory
- * allocated to its buffer, we shrink it to this value.
+ * Set the maximum size read buffer allocated to idle TConnection objects.
+ * If a TConnection object is found (either on connection close or between
+ * calls when resizeBufferEveryN_ is set) with more than this much memory
+ * allocated to its read buffer, we shrink it to this value.
+ *
+ * @param limit of bytes beyond which we will shrink buffers when checked.
+ */
+ void setIdleReadBufferLimit(size_t limit) {
+ idleReadBufferLimit_ = limit;
+ }
+
+ /**
+ * [NOTE: This is for backwards compatibility, use setIdleReadBufferLimit().]
+ * Set the maximum size read buffer allocated to idle TConnection objects.
+ * If a TConnection object is found (either on connection close or between
+ * calls when resizeBufferEveryN_ is set) with more than this much memory
+ * allocated to its read buffer, we shrink it to this value.
+ *
+ * @param limit of bytes beyond which we will shrink buffers when checked.
+ */
+ void setIdleBufferMemLimit(size_t limit) {
+ idleReadBufferLimit_ = limit;
+ }
+
+
+
+ /**
+ * Get the maximum size of write buffer allocated to idle TConnection objects.
+ *
+ * @return # bytes beyond which we will reallocate buffers when checked.
+ */
+ size_t getIdleWriteBufferLimit() const {
+ return idleWriteBufferLimit_;
+ }
+
+ /**
+ * Set the maximum size write buffer allocated to idle TConnection objects.
+ * If a TConnection object is found (either on connection close or between
+ * calls when resizeBufferEveryN_ is set) with more than this much memory
+ * allocated to its write buffer, we destroy and construct that buffer.
*
* @param limit of bytes beyond which we will shrink buffers when idle.
*/
- void setIdleBufferMemLimit(size_t limit) {
- idleBufferMemLimit_ = limit;
+ void setIdleWriteBufferLimit(size_t limit) {
+ idleWriteBufferLimit_ = limit;
}
/**
+ * Get # of calls made between buffer size checks. 0 means disabled.
+ *
+ * @return # of calls between buffer size checks.
+ */
+ int32_t getResizeBufferEveryN() const {
+ return resizeBufferEveryN_;
+ }
+
+ /**
+ * Check buffer sizes every "count" calls. This allows buffer limits
+ * to be enforced for persistant connections with a controllable degree
+ * of overhead. 0 disables checks except at connection close.
+ *
+ * @param count the number of calls between checks, or 0 to disable
+ */
+ void setResizeBufferEveryN(int32_t count) {
+ resizeBufferEveryN_ = count;
+ }
+
+
+
+ /**
* Return an initialized connection object. Creates or recovers from
* pool a TConnection and initializes it with the provided socket FD
* and flags.
@@ -581,7 +678,7 @@
* Represents a connection that is handled via libevent. This connection
* essentially encapsulates a socket that has some associated libevent state.
*/
- class TConnection {
+class TConnection {
private:
/// Starting size for new connection buffer
@@ -626,6 +723,12 @@
/// How far through writing are we?
uint32_t writeBufferPos_;
+ /// Largest size of write buffer seen since buffer was constructed
+ size_t largestWriteBufferSize_;
+
+ /// Count of the number of calls for use with getResizeBufferEveryN().
+ int32_t callsForResize_;
+
/// Task handle
int taskHandle_;
@@ -716,12 +819,13 @@
server_->decrementNumConnections();
}
- /**
- * Check read buffer against a given limit and shrink it if exceeded.
+ /**
+ * Check buffers against any size limits and shrink it if exceeded.
*
- * @param limit we limit buffer size to.
+ * @param readLimit we reduce read buffer size to this (if nonzero).
+ * @param writeLimit if nonzero and write buffer is larger, replace it.
*/
- void checkIdleBufferMemLimit(size_t limit);
+ void checkIdleBufferMemLimit(size_t readLimit, size_t writeLimit);
/// Initialize
void init(int socket, short eventFlags, TNonblockingServer *s,