THRIFT-1190. java: readBufferBytesAllocated in TNonblockingServer.java should be AtomicLong to fix FD leakage and general server malfunction

There was a race condition in the use of the memory limiting feature that would lead to memory loss.

Patch: Tom May

git-svn-id: https://svn.apache.org/repos/asf/thrift/trunk@1130231 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/java/src/org/apache/thrift/server/TNonblockingServer.java b/lib/java/src/org/apache/thrift/server/TNonblockingServer.java
index 39177a3..d44d460 100644
--- a/lib/java/src/org/apache/thrift/server/TNonblockingServer.java
+++ b/lib/java/src/org/apache/thrift/server/TNonblockingServer.java
@@ -28,6 +28,7 @@
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.thrift.TByteArrayOutputStream;
 import org.apache.thrift.TException;
@@ -87,7 +88,7 @@
   /**
    * How many bytes are currently allocated to read buffers.
    */
-  private long readBufferBytesAllocated = 0;
+  private final AtomicLong readBufferBytesAllocated = new AtomicLong(0);
 
   public TNonblockingServer(AbstractNonblockingServerArgs args) {
     super(args);
@@ -479,12 +480,12 @@
 
           // if this frame will push us over the memory limit, then return.
           // with luck, more memory will free up the next time around.
-          if (readBufferBytesAllocated + frameSize > MAX_READ_BUFFER_BYTES) {
+          if (readBufferBytesAllocated.get() + frameSize > MAX_READ_BUFFER_BYTES) {
             return true;
           }
 
           // increment the amount of memory allocated to read buffers
-          readBufferBytesAllocated += frameSize;
+          readBufferBytesAllocated.addAndGet(frameSize);
 
           // reallocate the readbuffer as a frame-sized buffer
           buffer_ = ByteBuffer.allocate(frameSize);
@@ -576,7 +577,7 @@
       // if we're being closed due to an error, we might have allocated a
       // buffer that we need to subtract for our memory accounting.
       if (state_ == READING_FRAME || state_ == READ_FRAME_COMPLETE) {
-        readBufferBytesAllocated -= buffer_.array().length;
+        readBufferBytesAllocated.addAndGet(-buffer_.array().length);
       }
       trans_.close();
     }
@@ -600,7 +601,7 @@
       // our read buffer count. we do this here as well as in close because
       // we'd like to free this read memory up as quickly as possible for other
       // clients.
-      readBufferBytesAllocated -= buffer_.array().length;
+      readBufferBytesAllocated.addAndGet(-buffer_.array().length);
 
       if (response_.len() == 0) {
         // go straight to reading again. this was probably an oneway method