Thrift-357. cpp: Fix buffer and connection bloat in TNonBlockingServer

Author: Anthony Giardullo

git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@755824 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/server/TNonblockingServer.cpp b/lib/cpp/src/server/TNonblockingServer.cpp
index cfaf3be..583877b 100644
--- a/lib/cpp/src/server/TNonblockingServer.cpp
+++ b/lib/cpp/src/server/TNonblockingServer.cpp
@@ -495,6 +495,37 @@
   server_->returnConnection(this);
 }
 
+/**
+ * Shrinks the read buffer to at most `limit` bytes.
+ *
+ * Shrinking is best effort: if realloc() fails, the existing (larger) buffer
+ * is still valid, so it is kept together with its recorded size and the
+ * failure is only logged. The connection is deliberately not closed here;
+ * NOTE(review): close() appears to hand the connection back to
+ * TNonblockingServer::returnConnection(), which is what calls this method,
+ * so closing here would re-enter the return path -- confirm.
+ *
+ * A limit of 0 is ignored, because realloc() with a size of zero has
+ * implementation-defined behaviour and may free the buffer and return NULL.
+ *
+ * @param limit maximum read-buffer size, in bytes, to keep while idle.
+ */
+void TConnection::checkIdleBufferMemLimit(uint32_t limit) {
+  if (limit == 0 || readBufferSize_ <= limit) {
+    return;
+  }
+
+  // Keep readBuffer_ untouched until realloc succeeds so it is never leaked.
+  void* shrunk = std::realloc(readBuffer_, limit);
+  if (shrunk == NULL) {
+    GlobalOutput("TConnection::checkIdleBufferMemLimit() realloc");
+    return;
+  }
+
+  readBuffer_ = static_cast<uint8_t*>(shrunk);
+  readBufferSize_ = limit;
+}
+
 /**
  * Creates a new connection either by reusing an object off the stack or
  * by allocating a new one entirely
@@ -515,7 +526,13 @@
  * Returns a connection to the stack
  */
 void TNonblockingServer::returnConnection(TConnection* connection) {
-  connectionStack_.push(connection);
+  if (connectionStackLimit_ == 0 ||  // a limit of 0 means "unbounded"
+      connectionStack_.size() < connectionStackLimit_) {
+    connection->checkIdleBufferMemLimit(idleBufferMemLimit_);  // trim first
+    connectionStack_.push(connection);
+  } else {
+    delete connection;  // pool is full: free instead of caching
+  }
 }
 
 /**
diff --git a/lib/cpp/src/server/TNonblockingServer.h b/lib/cpp/src/server/TNonblockingServer.h
index 40ec574..a47a2c0 100644
--- a/lib/cpp/src/server/TNonblockingServer.h
+++ b/lib/cpp/src/server/TNonblockingServer.h
@@ -43,6 +43,12 @@
   // Listen backlog
   static const int LISTEN_BACKLOG = 1024;
 
+  // Default limit on size of idle connection pool
+  static const size_t CONNECTION_STACK_LIMIT = 1024;
+
+  // Maximum size of buffer allocated to idle connection
+  static const uint32_t IDLE_BUFFER_MEM_LIMIT = 8192;
+
   // Server socket file descriptor
   int serverSocket_;
 
@@ -64,6 +70,16 @@
   // Number of TConnection object we've created
   size_t numTConnections_;
 
+  // Limit for how many TConnection objects to cache
+  size_t connectionStackLimit_;
+
+  /**
+   * Max read buffer size for an idle connection.  When we place an idle
+   * TConnection into connectionStack_, we ensure that its read buffer is
+   * reduced to this size so that idle connections don't hog memory.
+   */
+  uint32_t idleBufferMemLimit_;
+
   /**
    * This is a stack of all the objects that have been created but that
    * are NOT currently in use. When we close a connection, we place it on this
@@ -82,7 +98,9 @@
     port_(port),
     threadPoolProcessing_(false),
     eventBase_(NULL),
-    numTConnections_(0) {}
+    numTConnections_(0),
+    connectionStackLimit_(CONNECTION_STACK_LIMIT),
+    idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT) {}
 
   TNonblockingServer(boost::shared_ptr<TProcessor> processor,
                      boost::shared_ptr<TProtocolFactory> protocolFactory,
@@ -93,7 +111,9 @@
     port_(port),
     threadManager_(threadManager),
     eventBase_(NULL),
-    numTConnections_(0) {
+    numTConnections_(0),
+    connectionStackLimit_(CONNECTION_STACK_LIMIT),
+    idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT) {
     setInputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
     setOutputTransportFactory(boost::shared_ptr<TTransportFactory>(new TTransportFactory()));
     setInputProtocolFactory(protocolFactory);
@@ -113,7 +133,9 @@
     port_(port),
     threadManager_(threadManager),
     eventBase_(NULL),
-    numTConnections_(0) {
+    numTConnections_(0),
+    connectionStackLimit_(CONNECTION_STACK_LIMIT),
+    idleBufferMemLimit_(IDLE_BUFFER_MEM_LIMIT) {
     setInputTransportFactory(inputTransportFactory);
     setOutputTransportFactory(outputTransportFactory);
     setInputProtocolFactory(inputProtocolFactory);
@@ -132,6 +154,24 @@
     return threadManager_;
   }
 
+  /**
+   * Get the cap on how many idle TConnection objects are kept for reuse.
+   *
+   * @return the current pool-size limit; 0 means the pool is unbounded.
+   */
+  int getConnectionStackLimit() const {
+    return static_cast<int>(connectionStackLimit_);
+  }
+
+  /**
+   * Set the cap on how many idle TConnection objects are kept for reuse.
+   *
+   * @param sz the new pool-size limit; 0 removes the limit.
+   */
+  void setConnectionStackLimit(int sz) {
+    connectionStackLimit_ = static_cast<size_t>(sz);
+  }
+
   bool isThreadPoolProcessing() const {
     return threadPoolProcessing_;
   }
@@ -160,6 +200,26 @@
     return connectionStack_.size();
   }
 
+  /**
+   * Get the read-buffer size cap applied to idle TConnection objects.
+   *
+   * @return # bytes beyond which we will shrink buffers when idle.
+   */
+  size_t getIdleBufferMemLimit() const {
+    return static_cast<size_t>(idleBufferMemLimit_);
+  }
+
+  /**
+   * Set the read-buffer size to which idle TConnection objects are shrunk.
+   * Values too large for the 32-bit field are clamped, not wrapped.
+   *
+   * @param limit of bytes beyond which we will shrink buffers when idle.
+   */
+  void setIdleBufferMemLimit(size_t limit) {
+    const uint32_t cap = static_cast<uint32_t>(-1);  // largest storable value
+    idleBufferMemLimit_ = (limit > cap) ? cap : static_cast<uint32_t>(limit);
+  }
+
   TConnection* createConnection(int socket, short flags);
 
   void returnConnection(TConnection* connection);
@@ -327,6 +387,13 @@
     server_->decrementNumConnections();
   }
 
+  /**
+   * Check the read buffer against a given limit and shrink it if exceeded.
+   *
+   * @param limit maximum read-buffer size, in bytes, to keep.
+   */
+  void checkIdleBufferMemLimit(uint32_t limit);
+
   // Initialize
   void init(int socket, short eventFlags, TNonblockingServer *s);