THRIFT-926. cpp: Better buffer management for TNonblockingServer
Add two improvements to memory management in TNonblockingServer:
- Separate the receive code into two distinct states: one for receiving
the frame header and one for the frame content. This allows us to
size the initial read buffer based on the initial frame size, rather
than allocating an arbitrary amount of memory before reading the
header.
- Allow setting the initial write buffer size based on the application's
expected response size.
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@1005169 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/server/TNonblockingServer.cpp b/lib/cpp/src/server/TNonblockingServer.cpp
index 1bf4e68..41056ab 100644
--- a/lib/cpp/src/server/TNonblockingServer.cpp
+++ b/lib/cpp/src/server/TNonblockingServer.cpp
@@ -114,7 +114,7 @@
writeBufferPos_ = 0;
largestWriteBufferSize_ = 0;
- socketState_ = SOCKET_RECV;
+ socketState_ = SOCKET_RECV_FRAMING;
appState_ = APP_INIT;
callsForResize_ = 0;
@@ -143,26 +143,52 @@
uint32_t fetch = 0;
switch (socketState_) {
- case SOCKET_RECV:
- // It is an error to be in this state if we already have all the data
- assert(readBufferPos_ < readWant_);
+ case SOCKET_RECV_FRAMING:
+ union {
+ uint8_t buf[sizeof(uint32_t)];
+ int32_t size;
+ } framing;
- // Double the buffer size until it is big enough
- if (readWant_ > readBufferSize_) {
- uint32_t newSize = readBufferSize_;
- while (readWant_ > newSize) {
- newSize *= 2;
- }
- uint8_t* newBuffer = (uint8_t*)std::realloc(readBuffer_, newSize);
- if (newBuffer == NULL) {
- GlobalOutput("TConnection::workSocket() realloc");
+ // if we've already received some bytes we kept them here
+ framing.size = readWant_;
+ // determine size of this frame
+ try {
+ // Read from the socket
+ fetch = tSocket_->read(&framing.buf[readBufferPos_],
+ uint32_t(sizeof(framing.size) - readBufferPos_));
+ if (fetch == 0) {
+ // Whenever we get here it means a remote disconnect
close();
return;
}
- readBuffer_ = newBuffer;
- readBufferSize_ = newSize;
+ readBufferPos_ += fetch;
+ } catch (TTransportException& te) {
+ GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
+ close();
+
+ return;
}
+ if (readBufferPos_ < sizeof(framing.size)) {
+ // more needed before frame size is known -- save what we have so far
+ readWant_ = framing.size;
+ return;
+ }
+
+ readWant_ = ntohl(framing.size);
+ if (static_cast<int>(readWant_) <= 0) {
+ GlobalOutput.printf("TConnection:workSocket() Negative frame size %d, remote side not using TFramedTransport?", static_cast<int>(readWant_));
+ close();
+ return;
+ }
+ // size known; now get the rest of the frame
+ transition();
+ return;
+
+ case SOCKET_RECV:
+ // It is an error to be in this state if we already have all the data
+ assert(readBufferPos_ < readWant_);
+
try {
// Read from the socket
fetch = readWant_ - readBufferPos_;
@@ -365,14 +391,12 @@
writeBufferPos_ = 0;
writeBufferSize_ = 0;
- // Set up read buffer for getting 4 bytes
- readBufferPos_ = 0;
- readWant_ = 4;
-
// Into read4 state we go
- socketState_ = SOCKET_RECV;
+ socketState_ = SOCKET_RECV_FRAMING;
appState_ = APP_READ_FRAME_SIZE;
+ readBufferPos_ = 0;
+
// Register read event
setRead();
@@ -382,21 +406,30 @@
return;
case APP_READ_FRAME_SIZE:
- // We just read the request length, deserialize it
- sz = *(int32_t*)readBuffer_;
- sz = (int32_t)ntohl(sz);
+ // We just read the request length
+ // Double the buffer size until it is big enough
+ if (readWant_ > readBufferSize_) {
+ if (readBufferSize_ == 0) {
+ readBufferSize_ = 1;
+ }
+ uint32_t newSize = readBufferSize_;
+ while (readWant_ > newSize) {
+ newSize *= 2;
+ }
- if (sz <= 0) {
- GlobalOutput.printf("TConnection:transition() Negative frame size %d, remote side not using TFramedTransport?", sz);
- close();
- return;
+ uint8_t* newBuffer = (uint8_t*)std::realloc(readBuffer_, newSize);
+ if (newBuffer == NULL) {
+ // nothing else to be done...
+ throw std::bad_alloc();
+ }
+ readBuffer_ = newBuffer;
+ readBufferSize_ = newSize;
}
- // Reset the read buffer
- readWant_ = (uint32_t)sz;
readBufferPos_= 0;
// Move into read request state
+ socketState_ = SOCKET_RECV;
appState_ = APP_READ_REQUEST;
// Work the socket right away
@@ -501,17 +534,14 @@
void TConnection::checkIdleBufferMemLimit(size_t readLimit,
size_t writeLimit) {
if (readLimit > 0 && readBufferSize_ > readLimit) {
- readBufferSize_ = readLimit;
- readBuffer_ = (uint8_t*)std::realloc(readBuffer_, readBufferSize_);
- if (readBuffer_ == NULL) {
- GlobalOutput("TConnection::checkIdleBufferMemLimit() realloc");
- close();
- }
+ free(readBuffer_);
+ readBuffer_ = NULL;
+ readBufferSize_ = 0;
}
if (writeLimit > 0 && largestWriteBufferSize_ > writeLimit) {
// just start over
- outputTransport_->resetBuffer(NULL, 0, TMemoryBuffer::TAKE_OWNERSHIP);
+ outputTransport_->resetBuffer(server_->getWriteBufferDefaultSize());
largestWriteBufferSize_ = 0;
}
}