THRIFT-81. java: TNonblockingServer: Support a limit on read buffer size
This change makes it possible to set a maximum amount of memory that
TNonblockingServer will use for all read buffers (combined).
If it is exceeded, no new data will be read from clients until
memory is freed. The current implementation does a busy wait in
the main thread when this happens.
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@719741 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/java/src/com/facebook/thrift/TByteArrayOutputStream.java b/lib/java/src/com/facebook/thrift/TByteArrayOutputStream.java
index 94ce003..4d3ffbf 100644
--- a/lib/java/src/com/facebook/thrift/TByteArrayOutputStream.java
+++ b/lib/java/src/com/facebook/thrift/TByteArrayOutputStream.java
@@ -19,6 +19,11 @@
super(size);
}
+ public TByteArrayOutputStream() {
+ super();
+ }
+
+
public byte[] get() {
return buf;
}
diff --git a/lib/java/src/com/facebook/thrift/server/THsHaServer.java b/lib/java/src/com/facebook/thrift/server/THsHaServer.java
index a8764ec..8790d84 100644
--- a/lib/java/src/com/facebook/thrift/server/THsHaServer.java
+++ b/lib/java/src/com/facebook/thrift/server/THsHaServer.java
@@ -33,7 +33,10 @@
// for the passing of Invocations from the Selector to workers.
private ExecutorService invoker;
- private final Options options;
+ protected final int MIN_WORKER_THREADS;
+ protected final int MAX_WORKER_THREADS;
+ protected final int STOP_TIMEOUT_VAL;
+ protected final TimeUnit STOP_TIMEOUT_UNIT;
/**
* Create server with given processor, and server transport. Default server
@@ -188,8 +191,13 @@
{
super(processorFactory, serverTransport,
inputTransportFactory, outputTransportFactory,
- inputProtocolFactory, outputProtocolFactory);
- this.options = options;
+ inputProtocolFactory, outputProtocolFactory,
+ options);
+
+ MIN_WORKER_THREADS = options.minWorkerThreads;
+ MAX_WORKER_THREADS = options.maxWorkerThreads;
+ STOP_TIMEOUT_VAL = options.stopTimeoutVal;
+ STOP_TIMEOUT_UNIT = options.stopTimeoutUnit;
}
/** @inheritdoc */
@@ -223,9 +231,8 @@
protected boolean startInvokerPool() {
// start the invoker pool
LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
- invoker = new ThreadPoolExecutor(options.minWorkerThreads,
- options.maxWorkerThreads, options.stopTimeoutVal, options.stopTimeoutUnit,
- queue);
+ invoker = new ThreadPoolExecutor(MIN_WORKER_THREADS, MAX_WORKER_THREADS,
+ STOP_TIMEOUT_VAL, STOP_TIMEOUT_UNIT, queue);
return true;
}
@@ -279,7 +286,7 @@
}
}
- public static class Options {
+ public static class Options extends TNonblockingServer.Options {
public int minWorkerThreads = 5;
public int maxWorkerThreads = Integer.MAX_VALUE;
public int stopTimeoutVal = 60;
diff --git a/lib/java/src/com/facebook/thrift/server/TNonblockingServer.java b/lib/java/src/com/facebook/thrift/server/TNonblockingServer.java
index 47fdd5f..8e9d3bd 100644
--- a/lib/java/src/com/facebook/thrift/server/TNonblockingServer.java
+++ b/lib/java/src/com/facebook/thrift/server/TNonblockingServer.java
@@ -14,6 +14,7 @@
import com.facebook.thrift.transport.TNonblockingTransport;
import com.facebook.thrift.transport.TTransportException;
import com.facebook.thrift.transport.TTransportFactory;
+import com.facebook.thrift.TByteArrayOutputStream;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -60,6 +61,20 @@
private SelectThread selectThread_;
/**
+ * The maximum amount of memory we will allocate to client IO buffers at a
+ * time. Without this limit, the server will gladly allocate client buffers
+ * right into an out of memory exception, rather than waiting.
+ */
+ private final long MAX_READ_BUFFER_BYTES;
+
+ protected final Options options_;
+
+ /**
+ * How many bytes are currently allocated to read buffers.
+ */
+ private long readBufferBytesAllocated = 0;
+
+ /**
* Create server with given processor and server transport, using
* TBinaryProtocol for the protocol, TFramedTransport.Factory on both input
* and output transports. A TProcessorFactory will be created that always
@@ -125,9 +140,25 @@
TFramedTransport.Factory outputTransportFactory,
TProtocolFactory inputProtocolFactory,
TProtocolFactory outputProtocolFactory) {
+ this(processorFactory, serverTransport,
+ inputTransportFactory, outputTransportFactory,
+ inputProtocolFactory, outputProtocolFactory,
+ new Options());
+ }
+
+ public TNonblockingServer(TProcessorFactory processorFactory,
+ TNonblockingServerTransport serverTransport,
+ TFramedTransport.Factory inputTransportFactory,
+ TFramedTransport.Factory outputTransportFactory,
+ TProtocolFactory inputProtocolFactory,
+ TProtocolFactory outputProtocolFactory,
+ Options options) {
super(processorFactory, serverTransport,
inputTransportFactory, outputTransportFactory,
inputProtocolFactory, outputProtocolFactory);
+ options_ = options;
+ options_.validate();
+ MAX_READ_BUFFER_BYTES = options.maxReadBufferBytes;
}
/**
@@ -446,7 +477,7 @@
// the ByteBuffer we'll be using to write and read, depending on the state
private ByteBuffer buffer_;
- private ByteArrayOutputStream response_;
+ private TByteArrayOutputStream response_;
public FrameBuffer( final TNonblockingTransport trans,
final SelectionKey selectionKey) {
@@ -479,6 +510,24 @@
+ ". Are you using TFramedTransport on the client side?");
return false;
}
+
+ // if this frame will always be too large for this server, log the
+ // error and close the connection.
+ if (frameSize + 4 > MAX_READ_BUFFER_BYTES) {
+ LOGGER.severe("Read a frame size of " + frameSize
+ + ", which is bigger than the maximum allowable buffer size for ALL connections.");
+ return false;
+ }
+
+ // if this frame will push us over the memory limit, then return.
+ // with luck, more memory will free up the next time around.
+ if (readBufferBytesAllocated + frameSize + 4 > MAX_READ_BUFFER_BYTES) {
+ return true;
+ }
+
+ // incremement the amount of memory allocated to read buffers
+ readBufferBytesAllocated += frameSize + 4;
+
// reallocate the readbuffer as a frame-sized buffer
buffer_ = ByteBuffer.allocate(frameSize + 4);
// put the frame size at the head of the buffer
@@ -568,6 +617,11 @@
* Shut the connection down.
*/
public void close() {
+ // if we're being closed due to an error, we might have allocated a
+ // buffer that we need to subtract for our memory accounting.
+ if (state_ == READING_FRAME || state_ == READ_FRAME_COMPLETE) {
+ readBufferBytesAllocated -= buffer_.array().length;
+ }
trans_.close();
}
@@ -586,14 +640,18 @@
* trying to write and instead go back to reading.
*/
public void responseReady() {
- // capture the data we want to write as a byte array.
- byte[] bytes = response_.toByteArray();
+ // the read buffer is definitely no longer in use, so we will decrement
+ // our read buffer count. we do this here as well as in close because
+ // we'd like to free this read memory up as quickly as possible for other
+ // clients.
+ readBufferBytesAllocated -= buffer_.array().length;
- if (bytes.length <= 0) {
+ if (response_.len() == 0) {
// go straight to reading again. this was probably an async method
state_ = AWAITING_REGISTER_READ;
+ buffer_ = null;
} else {
- buffer_ = ByteBuffer.wrap(bytes);
+ buffer_ = ByteBuffer.wrap(response_.get(), 0, response_.len());
// set state that we're waiting to be switched to write. we do this
// asynchronously through requestSelectInterestChange() because there is a
@@ -640,7 +698,7 @@
* Get the transport that should be used by the invoker for responding.
*/
private TTransport getOutputTransport() {
- response_ = new ByteArrayOutputStream();
+ response_ = new TByteArrayOutputStream();
return outputTransportFactory_.getTransport(new TIOStreamTransport(response_));
}
@@ -690,4 +748,16 @@
}
} // FrameBuffer
+
+ public static class Options {
+ public long maxReadBufferBytes = Long.MAX_VALUE;
+
+ public Options() {}
+
+ public void validate() {
+ if (maxReadBufferBytes <= 1024) {
+ throw new IllegalArgumentException("You must allocate at least 1KB to the read buffer.");
+ }
+ }
+ }
}
diff --git a/test/java/TestNonblockingServer b/test/java/TestNonblockingServer
index cb330d4..ee2ba9d 100644
--- a/test/java/TestNonblockingServer
+++ b/test/java/TestNonblockingServer
@@ -1,2 +1,2 @@
#!/bin/bash -v
-java -server -cp thrifttest.jar:../../lib/java/libthrift.jar com.facebook.thrift.test.TestNonblockingServer $*
+java -server -Xmx256m -cp thrifttest.jar:../../lib/java/libthrift.jar com.facebook.thrift.test.TestNonblockingServer $*
diff --git a/test/java/src/OverloadNonblockingServer.java b/test/java/src/OverloadNonblockingServer.java
new file mode 100644
index 0000000..26f5354
--- /dev/null
+++ b/test/java/src/OverloadNonblockingServer.java
@@ -0,0 +1,44 @@
+
+package com.facebook.thrift;
+
+import thrift.test.*;
+
+import com.facebook.thrift.TApplicationException;
+import com.facebook.thrift.TSerializer;
+import com.facebook.thrift.transport.TTransport;
+import com.facebook.thrift.transport.TSocket;
+import com.facebook.thrift.transport.TFramedTransport;
+import com.facebook.thrift.transport.TTransportException;
+import com.facebook.thrift.protocol.TBinaryProtocol;
+import com.facebook.thrift.protocol.TSimpleJSONProtocol;
+
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.List;
+import java.util.ArrayList;
+
+
+public class OverloadNonblockingServer {
+
+ public static void main(String[] args) throws Exception {
+ int msg_size_mb = Integer.parseInt(args[0]);
+ int msg_size = msg_size_mb * 1024 * 1024;
+
+ TSocket socket = new TSocket("localhost", 9090);
+ TBinaryProtocol binprot = new TBinaryProtocol(socket);
+ socket.open();
+ binprot.writeI32(msg_size);
+ binprot.writeI32(1);
+ socket.flush();
+
+ System.in.read();
+ // Thread.sleep(30000);
+ for (int i = 0; i < msg_size_mb; i++) {
+ binprot.writeBinary(new byte[1024 * 1024]);
+ }
+
+ socket.close();
+ }
+}