THRIFT-265. cpp: Reset buffers every 512 calls in TNonblockingServer
Author: Erik Frey
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@750153 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/server/TNonblockingServer.cpp b/lib/cpp/src/server/TNonblockingServer.cpp
index 8644526..cfaf3be 100644
--- a/lib/cpp/src/server/TNonblockingServer.cpp
+++ b/lib/cpp/src/server/TNonblockingServer.cpp
@@ -216,8 +216,24 @@
case APP_READ_REQUEST:
// We are done reading the request, package the read buffer into transport
// and get back some data from the dispatch function
+ // If we've used these transport buffers enough times, reset them to avoid bloating
+
inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
- outputTransport_->resetBuffer();
+ ++numReadsSinceReset_;
+ if (numWritesSinceReset_ < 512) {
+ outputTransport_->resetBuffer();
+ } else {
+ // reset the capacity of the output transport if we used it enough times that it might be bloated
+ try {
+ outputTransport_->resetBuffer(true);
+ numWritesSinceReset_ = 0;
+ } catch (TTransportException &ttx) {
+ GlobalOutput.printf("TTransportException: TMemoryBuffer::resetBuffer() %s", ttx.what());
+ close();
+ return;
+ }
+ }
+
// Prepend four bytes of blank space to the buffer so we can
// write the frame size there later.
outputTransport_->getWritePtr(4);
@@ -327,11 +343,27 @@
case APP_SEND_RESULT:
+ ++numWritesSinceReset_;
+
// N.B.: We also intentionally fall through here into the INIT state!
LABEL_APP_INIT:
case APP_INIT:
+ // reset the input buffer if we used it enough times that it might be bloated
+ if (numReadsSinceReset_ > 512)
+ {
+ void * new_buffer = std::realloc(readBuffer_, 1024);
+ if (new_buffer == NULL) {
+ GlobalOutput("TConnection::transition() realloc");
+ close();
+ return;
+ }
+ readBuffer_ = (uint8_t*) new_buffer;
+ readBufferSize_ = 1024;
+ numReadsSinceReset_ = 0;
+ }
+
// Clear write buffer variables
writeBuffer_ = NULL;
writeBufferPos_ = 0;
diff --git a/lib/cpp/src/server/TNonblockingServer.h b/lib/cpp/src/server/TNonblockingServer.h
index cf024ed..40ec574 100644
--- a/lib/cpp/src/server/TNonblockingServer.h
+++ b/lib/cpp/src/server/TNonblockingServer.h
@@ -248,6 +248,12 @@
// How far through writing are we?
uint32_t writeBufferPos_;
+ // How many times have we read since our last buffer reset?
+ uint32_t numReadsSinceReset_;
+
+ // How many times have we written since our last buffer reset?
+ uint32_t numWritesSinceReset_;
+
// Task handle
int taskHandle_;
@@ -304,6 +310,9 @@
}
readBufferSize_ = 1024;
+ numReadsSinceReset_ = 0;
+ numWritesSinceReset_ = 0;
+
// Allocate input and output tranpsorts
// these only need to be allocated once per TConnection (they don't need to be
// reallocated on init() call)
diff --git a/lib/cpp/src/transport/TBufferTransports.h b/lib/cpp/src/transport/TBufferTransports.h
index f0298df..9d9510d 100644
--- a/lib/cpp/src/transport/TBufferTransports.h
+++ b/lib/cpp/src/transport/TBufferTransports.h
@@ -552,7 +552,23 @@
str.append((char*)buf, sz);
}
- void resetBuffer() {
+ void resetBuffer(bool reset_capacity = false) {
+ if (reset_capacity)
+ {
+ assert(owner_);
+
+ void* new_buffer = std::realloc(buffer_, defaultSize);
+
+ if (new_buffer == NULL) {
+ throw TTransportException("Out of memory.");
+ }
+
+ buffer_ = (uint8_t*) new_buffer;
+ bufferSize_ = defaultSize;
+
+ wBound_ = buffer_ + bufferSize_;
+ }
+
rBase_ = buffer_;
rBound_ = buffer_;
wBase_ = buffer_;