THRIFT-265. cpp: Reset buffers every 512 calls in TNonblockingServer

Author: Erik Frey

git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@750153 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/server/TNonblockingServer.cpp b/lib/cpp/src/server/TNonblockingServer.cpp
index 8644526..cfaf3be 100644
--- a/lib/cpp/src/server/TNonblockingServer.cpp
+++ b/lib/cpp/src/server/TNonblockingServer.cpp
@@ -216,8 +216,24 @@
   case APP_READ_REQUEST:
     // We are done reading the request, package the read buffer into transport
     // and get back some data from the dispatch function
+    // If we've used these transport buffers enough times, reset them to avoid bloating
+
     inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
-    outputTransport_->resetBuffer();
+    ++numReadsSinceReset_;
+    if (numWritesSinceReset_ < 512) {
+      outputTransport_->resetBuffer();
+    } else {
+      // reset the capacity of the output transport if we used it enough times that it might be bloated
+      try {
+        outputTransport_->resetBuffer(true);
+        numWritesSinceReset_ = 0;
+      } catch (TTransportException &ttx) {
+        GlobalOutput.printf("TTransportException: TMemoryBuffer::resetBuffer() %s", ttx.what());
+        close();
+        return;
+      }
+    }
+
     // Prepend four bytes of blank space to the buffer so we can
     // write the frame size there later.
     outputTransport_->getWritePtr(4);
@@ -327,11 +343,27 @@
 
   case APP_SEND_RESULT:
 
+    ++numWritesSinceReset_;
+
     // N.B.: We also intentionally fall through here into the INIT state!
 
   LABEL_APP_INIT:
   case APP_INIT:
 
+    // reset the input buffer if we used it enough times that it might be bloated
+    if (numReadsSinceReset_ > 512)
+    {
+      void * new_buffer = std::realloc(readBuffer_, 1024);
+      if (new_buffer == NULL) {
+        GlobalOutput("TConnection::transition() realloc");
+        close();
+        return;
+      }
+      readBuffer_ = (uint8_t*) new_buffer;
+      readBufferSize_ = 1024;
+      numReadsSinceReset_ = 0;
+    }
+
     // Clear write buffer variables
     writeBuffer_ = NULL;
     writeBufferPos_ = 0;
diff --git a/lib/cpp/src/server/TNonblockingServer.h b/lib/cpp/src/server/TNonblockingServer.h
index cf024ed..40ec574 100644
--- a/lib/cpp/src/server/TNonblockingServer.h
+++ b/lib/cpp/src/server/TNonblockingServer.h
@@ -248,6 +248,12 @@
   // How far through writing are we?
   uint32_t writeBufferPos_;
 
+  // How many times have we read since our last buffer reset?
+  uint32_t numReadsSinceReset_;
+
+  // How many times have we written since our last buffer reset?
+  uint32_t numWritesSinceReset_;
+
   // Task handle
   int taskHandle_;
 
@@ -304,6 +310,9 @@
     }
     readBufferSize_ = 1024;
 
+    numReadsSinceReset_ = 0;
+    numWritesSinceReset_ = 0;
+
     // Allocate input and output tranpsorts
     // these only need to be allocated once per TConnection (they don't need to be
     // reallocated on init() call)
diff --git a/lib/cpp/src/transport/TBufferTransports.h b/lib/cpp/src/transport/TBufferTransports.h
index f0298df..9d9510d 100644
--- a/lib/cpp/src/transport/TBufferTransports.h
+++ b/lib/cpp/src/transport/TBufferTransports.h
@@ -552,7 +552,23 @@
     str.append((char*)buf, sz);
   }
 
-  void resetBuffer() {
+  void resetBuffer(bool reset_capacity = false) {
+    if (reset_capacity)
+    {
+      assert(owner_);
+
+      void* new_buffer = std::realloc(buffer_, defaultSize);
+
+      if (new_buffer == NULL) {
+        throw TTransportException("Out of memory.");
+      }
+
+      buffer_ = (uint8_t*) new_buffer;
+      bufferSize_ = defaultSize;
+
+      wBound_ = buffer_ + bufferSize_;
+    }
+
     rBase_ = buffer_;
     rBound_ = buffer_;
     wBase_ = buffer_;