THRIFT-926. cpp: Better buffer management for TNonblockingServer

Add two improvements to memory management in TNonblockingServer:
- Separate the receive code into two distinct states: one for receiving
  the frame header and one for the frame content.  This allows us to
  size the initial read buffer based on the initial frame size, rather
  than allocating an arbitrary amount of memory before reading the
  header.
- Allow setting the initial write buffer size based on the application's
  expected response size.

git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@1005169 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/server/TNonblockingServer.cpp b/lib/cpp/src/server/TNonblockingServer.cpp
index 1bf4e68..41056ab 100644
--- a/lib/cpp/src/server/TNonblockingServer.cpp
+++ b/lib/cpp/src/server/TNonblockingServer.cpp
@@ -114,7 +114,7 @@
   writeBufferPos_ = 0;
   largestWriteBufferSize_ = 0;
 
-  socketState_ = SOCKET_RECV;
+  socketState_ = SOCKET_RECV_FRAMING;
   appState_ = APP_INIT;
   callsForResize_ = 0;
 
@@ -143,26 +143,52 @@
   uint32_t fetch = 0;
 
   switch (socketState_) {
-  case SOCKET_RECV:
-    // It is an error to be in this state if we already have all the data
-    assert(readBufferPos_ < readWant_);
+  case SOCKET_RECV_FRAMING:
+    union {
+      uint8_t buf[sizeof(uint32_t)];
+      int32_t size;
+    } framing;
 
-    // Double the buffer size until it is big enough
-    if (readWant_ > readBufferSize_) {
-      uint32_t newSize = readBufferSize_;
-      while (readWant_ > newSize) {
-        newSize *= 2;
-      }
-      uint8_t* newBuffer = (uint8_t*)std::realloc(readBuffer_, newSize);
-      if (newBuffer == NULL) {
-        GlobalOutput("TConnection::workSocket() realloc");
+    // if we've already received some bytes we kept them here
+    framing.size = readWant_;
+    // determine size of this frame
+    try {
+      // Read from the socket
+      fetch = tSocket_->read(&framing.buf[readBufferPos_],
+                             uint32_t(sizeof(framing.size) - readBufferPos_));
+      if (fetch == 0) {
+        // Whenever we get here it means a remote disconnect
         close();
         return;
       }
-      readBuffer_ = newBuffer;
-      readBufferSize_ = newSize;
+      readBufferPos_ += fetch;
+    } catch (TTransportException& te) {
+      GlobalOutput.printf("TConnection::workSocket(): %s", te.what());
+      close();
+
+      return;
     }
 
+    if (readBufferPos_ < sizeof(framing.size)) {
+      // more needed before frame size is known -- save what we have so far
+      readWant_ = framing.size;
+      return;
+    }
+
+    readWant_ = ntohl(framing.size);
+    if (static_cast<int>(readWant_) <= 0) {
+      GlobalOutput.printf("TConnection:workSocket() Negative frame size %d, remote side not using TFramedTransport?", static_cast<int>(readWant_));
+      close();
+      return;
+    }
+    // size known; now get the rest of the frame
+    transition();
+    return;
+
+  case SOCKET_RECV:
+    // It is an error to be in this state if we already have all the data
+    assert(readBufferPos_ < readWant_);
+
     try {
       // Read from the socket
       fetch = readWant_ - readBufferPos_;
@@ -365,14 +391,12 @@
     writeBufferPos_ = 0;
     writeBufferSize_ = 0;
 
-    // Set up read buffer for getting 4 bytes
-    readBufferPos_ = 0;
-    readWant_ = 4;
-
     // Into read4 state we go
-    socketState_ = SOCKET_RECV;
+    socketState_ = SOCKET_RECV_FRAMING;
     appState_ = APP_READ_FRAME_SIZE;
 
+    readBufferPos_ = 0;
+
     // Register read event
     setRead();
 
@@ -382,21 +406,30 @@
     return;
 
   case APP_READ_FRAME_SIZE:
-    // We just read the request length, deserialize it
-    sz = *(int32_t*)readBuffer_;
-    sz = (int32_t)ntohl(sz);
+    // We just read the request length
+    // Double the buffer size until it is big enough
+    if (readWant_ > readBufferSize_) {
+      if (readBufferSize_ == 0) {
+        readBufferSize_ = 1;
+      }
+      uint32_t newSize = readBufferSize_;
+      while (readWant_ > newSize) {
+        newSize *= 2;
+      }
 
-    if (sz <= 0) {
-      GlobalOutput.printf("TConnection:transition() Negative frame size %d, remote side not using TFramedTransport?", sz);
-      close();
-      return;
+      uint8_t* newBuffer = (uint8_t*)std::realloc(readBuffer_, newSize);
+      if (newBuffer == NULL) {
+        // nothing else to be done...
+        throw std::bad_alloc();
+      }
+      readBuffer_ = newBuffer;
+      readBufferSize_ = newSize;
     }
 
-    // Reset the read buffer
-    readWant_ = (uint32_t)sz;
     readBufferPos_= 0;
 
     // Move into read request state
+    socketState_ = SOCKET_RECV;
     appState_ = APP_READ_REQUEST;
 
     // Work the socket right away
@@ -501,17 +534,14 @@
 void TConnection::checkIdleBufferMemLimit(size_t readLimit,
                                           size_t writeLimit) {
   if (readLimit > 0 && readBufferSize_ > readLimit) {
-    readBufferSize_ = readLimit;
-    readBuffer_ = (uint8_t*)std::realloc(readBuffer_, readBufferSize_);
-    if (readBuffer_ == NULL) {
-      GlobalOutput("TConnection::checkIdleBufferMemLimit() realloc");
-      close();
-    }
+    free(readBuffer_);
+    readBuffer_ = NULL;
+    readBufferSize_ = 0;
   }
 
   if (writeLimit > 0 && largestWriteBufferSize_ > writeLimit) {
     // just start over
-    outputTransport_->resetBuffer(NULL, 0, TMemoryBuffer::TAKE_OWNERSHIP);
+    outputTransport_->resetBuffer(server_->getWriteBufferDefaultSize());
     largestWriteBufferSize_ = 0;
   }
 }
diff --git a/lib/cpp/src/server/TNonblockingServer.h b/lib/cpp/src/server/TNonblockingServer.h
index 501433c..0252f10 100644
--- a/lib/cpp/src/server/TNonblockingServer.h
+++ b/lib/cpp/src/server/TNonblockingServer.h
@@ -76,6 +76,9 @@
   /// Default limit on connections in handler/task processing
   static const int MAX_ACTIVE_PROCESSORS = INT_MAX;
 
+  /// Default size of write buffer
+  static const int WRITE_BUFFER_DEFAULT_SIZE = 1024;
+
   /// Maximum size of read buffer allocated to idle connection (0 = unlimited)
   static const int IDLE_READ_BUFFER_LIMIT = 1024;
 
@@ -135,10 +138,16 @@
   TOverloadAction overloadAction_;
 
   /**
+   * The write buffer is initialized (and when idleWriteBufferLimit_ is checked
+   * and found to be exceeded, reinitialized) to this size.
+   */
+  size_t writeBufferDefaultSize_;
+
+  /**
    * Max read buffer size for an idle TConnection.  When we place an idle
    * TConnection into connectionStack_ or on every resizeBufferEveryN_ calls,
-   * we insure that its read buffer is reduced to this size to insure that
-   * idle connections don't hog memory.  0 disables this check.
+   * we will free the buffer (such that it will be reinitialized by the next
+   * received frame) if it has exceeded this limit.  0 disables this check.
    */
   size_t idleReadBufferLimit_;
 
@@ -146,8 +155,8 @@
    * Max write buffer size for an idle connection.  When we place an idle
    * TConnection into connectionStack_ or on every resizeBufferEveryN_ calls,
    * we insure that its write buffer is <= to this size; otherwise we
-   * replace it with a new one to insure that idle connections don't hog
-   * memory. 0 disables this check.
+   * replace it with a new one of writeBufferDefaultSize_ bytes to insure that
+   * idle connections don't hog memory. 0 disables this check.
    */
   size_t idleWriteBufferLimit_;
 
@@ -203,6 +212,7 @@
     taskExpireTime_(0),
     overloadHysteresis_(0.8),
     overloadAction_(T_OVERLOAD_NO_ACTION),
+    writeBufferDefaultSize_(WRITE_BUFFER_DEFAULT_SIZE),
     idleReadBufferLimit_(IDLE_READ_BUFFER_LIMIT),
     idleWriteBufferLimit_(IDLE_WRITE_BUFFER_LIMIT),
     resizeBufferEveryN_(RESIZE_BUFFER_EVERY_N),
@@ -227,6 +237,7 @@
     taskExpireTime_(0),
     overloadHysteresis_(0.8),
     overloadAction_(T_OVERLOAD_NO_ACTION),
+    writeBufferDefaultSize_(WRITE_BUFFER_DEFAULT_SIZE),
     idleReadBufferLimit_(IDLE_READ_BUFFER_LIMIT),
     idleWriteBufferLimit_(IDLE_WRITE_BUFFER_LIMIT),
     resizeBufferEveryN_(RESIZE_BUFFER_EVERY_N),
@@ -260,6 +271,7 @@
     taskExpireTime_(0),
     overloadHysteresis_(0.8),
     overloadAction_(T_OVERLOAD_NO_ACTION),
+    writeBufferDefaultSize_(WRITE_BUFFER_DEFAULT_SIZE),
     idleReadBufferLimit_(IDLE_READ_BUFFER_LIMIT),
     idleWriteBufferLimit_(IDLE_WRITE_BUFFER_LIMIT),
     resizeBufferEveryN_(RESIZE_BUFFER_EVERY_N),
@@ -474,9 +486,27 @@
   bool drainPendingTask();
 
   /**
+   * Get the starting size of a TConnection object's write buffer.
+   *
+   * @return # bytes we initialize a TConnection object's write buffer to.
+   */
+  size_t getWriteBufferDefaultSize() const {
+    return writeBufferDefaultSize_;
+  }
+
+  /**
+   * Set the starting size of a TConnection object's write buffer.
+   *
+   * @param size # bytes we initialize a TConnection object's write buffer to.
+   */
+  void setWriteBufferDefaultSize(size_t size) {
+    writeBufferDefaultSize_ = size;
+  }
+
+  /**
    * Get the maximum size of read buffer allocated to idle TConnection objects.
    *
-   * @return # bytes beyond which we will shrink buffers when idle.
+   * @return # bytes beyond which we will dealloc idle buffer.
    */
   size_t getIdleReadBufferLimit() const {
     return idleReadBufferLimit_;
@@ -486,7 +516,7 @@
    * [NOTE: This is for backwards compatibility, use getIdleReadBufferLimit().]
    * Get the maximum size of read buffer allocated to idle TConnection objects.
    *
-   * @return # bytes beyond which we will shrink buffers when idle.
+   * @return # bytes beyond which we will dealloc idle buffer.
    */
   size_t getIdleBufferMemLimit() const {
     return idleReadBufferLimit_;
@@ -496,7 +526,8 @@
    * Set the maximum size read buffer allocated to idle TConnection objects.
    * If a TConnection object is found (either on connection close or between
    * calls when resizeBufferEveryN_ is set) with more than this much memory
-   * allocated to its read buffer, we shrink it to this value.
+   * allocated to its read buffer, we free it and allow it to be reinitialized
+   * on the next received frame.
    *
    * @param limit of bytes beyond which we will shrink buffers when checked.
    */
@@ -509,7 +540,8 @@
    * Set the maximum size read buffer allocated to idle TConnection objects.
    * If a TConnection object is found (either on connection close or between
    * calls when resizeBufferEveryN_ is set) with more than this much memory
-   * allocated to its read buffer, we shrink it to this value.
+   * allocated to its read buffer, we free it and allow it to be reinitialized
+   * on the next received frame.
    *
    * @param limit of bytes beyond which we will shrink buffers when checked.
    */
@@ -532,7 +564,8 @@
    * Set the maximum size write buffer allocated to idle TConnection objects.
    * If a TConnection object is found (either on connection close or between
    * calls when resizeBufferEveryN_ is set) with more than this much memory
-   * allocated to its write buffer, we destroy and construct that buffer.
+   * allocated to its write buffer, we destroy and construct that buffer with
+   * writeBufferDefaultSize_ bytes.
    *
    * @param limit of bytes beyond which we will shrink buffers when idle.
    */
@@ -651,8 +684,9 @@
   void serve();
 };
 
-/// Two states for sockets, recv and send mode
+/// Three states for sockets: recv frame size, recv data, and send mode
 enum TSocketState {
+  SOCKET_RECV_FRAMING,
   SOCKET_RECV,
   SOCKET_SEND
 };
@@ -681,9 +715,6 @@
 class TConnection {
  private:
 
-  /// Starting size for new connection buffer
-  static const int STARTING_CONNECTION_BUFFER_SIZE = 1024;
-
   /// Server handle
   TNonblockingServer* server_;
 
@@ -797,17 +828,14 @@
   /// Constructor
   TConnection(int socket, short eventFlags, TNonblockingServer *s,
               const sockaddr* addr, socklen_t addrLen) {
-    readBuffer_ = (uint8_t*)std::malloc(STARTING_CONNECTION_BUFFER_SIZE);
-    if (readBuffer_ == NULL) {
-      throw new apache::thrift::TException("Out of memory.");
-    }
-    readBufferSize_ = STARTING_CONNECTION_BUFFER_SIZE;
+    readBuffer_ = NULL;
+    readBufferSize_ = 0;
 
     // Allocate input and output tranpsorts
     // these only need to be allocated once per TConnection (they don't need to be
     // reallocated on init() call)
     inputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer(readBuffer_, readBufferSize_));
-    outputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer());
+    outputTransport_ = boost::shared_ptr<TMemoryBuffer>(new TMemoryBuffer(s->getWriteBufferDefaultSize()));
     tSocket_.reset(new TSocket());
 
     init(socket, eventFlags, s, addr, addrLen);