THRIFT-4251: Fix JDK Epoll Bug in Thrift of TThreadedSelectorServer model.
Client: Java
This closes #1313
diff --git a/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java b/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java
index 9c94b76..5c62b99 100644
--- a/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java
+++ b/lib/java/src/org/apache/thrift/server/AbstractNonblockingServer.java
@@ -19,15 +19,6 @@
package org.apache.thrift.server;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.spi.SelectorProvider;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.thrift.TAsyncProcessor;
import org.apache.thrift.TByteArrayOutputStream;
import org.apache.thrift.TException;
@@ -42,6 +33,15 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.spi.SelectorProvider;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
/**
* Provides common methods and classes used by nonblocking TServer
* implementations.
@@ -102,7 +102,7 @@
/**
* Starts any threads required for serving.
- *
+ *
* @return true if everything went ok, false if threads could not be started.
*/
protected abstract boolean startThreads();
@@ -115,7 +115,7 @@
/**
* Have the server transport start accepting connections.
- *
+ *
* @return true if we started listening successfully, false if something went
* wrong.
*/
@@ -139,7 +139,7 @@
/**
* Perform an invocation. This method could behave several different ways -
* invoke immediately inline, queue for separate execution, etc.
- *
+ *
* @return true if invocation was successfully requested, which is not a
* guarantee that invocation has completed. False if the request
* failed.
@@ -152,7 +152,7 @@
* corresponding to requests.
*/
protected abstract class AbstractSelectThread extends Thread {
- protected final Selector selector;
+ protected Selector selector;
// List of FrameBuffers that want to change their selection interests.
protected final Set<FrameBuffer> selectInterestChanges = new HashSet<FrameBuffer>();
@@ -285,21 +285,21 @@
protected ByteBuffer buffer_;
protected final TByteArrayOutputStream response_;
-
+
// the frame that the TTransport should wrap.
protected final TMemoryInputTransport frameTrans_;
-
+
// the transport that should be used to connect to clients
protected final TTransport inTrans_;
-
+
protected final TTransport outTrans_;
-
+
// the input protocol to use on frames
protected final TProtocol inProt_;
-
+
// the output protocol to use on frames
protected final TProtocol outProt_;
-
+
// context associated with this connection
protected final ServerContext context_;
@@ -328,7 +328,7 @@
/**
* Give this FrameBuffer a chance to read. The selector loop should have
* received a read event for this FrameBuffer.
- *
+ *
* @return true if the connection should live on, false if it should be
* closed
*/
@@ -455,7 +455,7 @@
public void close() {
// if we're being closed due to an error, we might have allocated a
// buffer that we need to subtract for our memory accounting.
- if (state_ == FrameBufferState.READING_FRAME ||
+ if (state_ == FrameBufferState.READING_FRAME ||
state_ == FrameBufferState.READ_FRAME_COMPLETE ||
state_ == FrameBufferState.AWAITING_CLOSE) {
readBufferBytesAllocated.addAndGet(-buffer_.array().length);
@@ -510,7 +510,7 @@
public void invoke() {
frameTrans_.reset(buffer_.array());
response_.reset();
-
+
try {
if (eventHandler_ != null) {
eventHandler_.processContext(context_, inTrans_, outTrans_);
@@ -530,7 +530,7 @@
/**
* Perform a read into buffer.
- *
+ *
* @return true if the read succeeded, false if there was an error or the
* connection closed.
*/
diff --git a/lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java b/lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java
index 353b8e0..038507e 100644
--- a/lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java
+++ b/lib/java/src/org/apache/thrift/server/TThreadedSelectorServer.java
@@ -19,7 +19,15 @@
package org.apache.thrift.server;
+import org.apache.thrift.transport.TNonblockingServerTransport;
+import org.apache.thrift.transport.TNonblockingTransport;
+import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.IOException;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;
@@ -37,24 +45,18 @@
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
-import org.apache.thrift.transport.TNonblockingServerTransport;
-import org.apache.thrift.transport.TNonblockingTransport;
-import org.apache.thrift.transport.TTransportException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
/**
* A Half-Sync/Half-Async server with a separate pool of threads to handle
* non-blocking I/O. Accepts are handled on a single thread, and a configurable
* number of nonblocking selector threads manage reading and writing of client
* connections. A synchronous worker thread pool handles processing of requests.
- *
+ *
* Performs better than TNonblockingServer/THsHaServer in multi-core
* environments when the the bottleneck is CPU on the single selector thread
* handling I/O. In addition, because the accept handling is decoupled from
* reads/writes and invocation, the server has better ability to handle back-
* pressure from new connections (e.g. stop accepting when busy).
- *
+ *
* Like TNonblockingServer, it relies on the use of TFramedTransport.
*/
public class TThreadedSelectorServer extends AbstractNonblockingServer {
@@ -205,7 +207,7 @@
/**
* Start the accept and selector threads running to deal with clients.
- *
+ *
* @return true if everything went ok, false if we couldn't start for some
* reason.
*/
@@ -349,7 +351,7 @@
/**
* Set up the AcceptThead
- *
+ *
* @throws IOException
*/
public AcceptThread(TNonblockingServerTransport serverTransport,
@@ -478,10 +480,13 @@
// Accepted connections added by the accept thread.
private final BlockingQueue<TNonblockingTransport> acceptedQueue;
+ private int SELECTOR_AUTO_REBUILD_THRESHOLD = 512;
+ private long MONITOR_PERIOD = 1000L;
+ private int jvmBug = 0;
/**
* Set up the SelectorThread with an unbounded queue for incoming accepts.
- *
+ *
* @throws IOException
* if a selector cannot be created
*/
@@ -491,7 +496,7 @@
/**
* Set up the SelectorThread with an bounded queue for incoming accepts.
- *
+ *
* @throws IOException
* if a selector cannot be created
*/
@@ -501,7 +506,7 @@
/**
* Set up the SelectorThread with a specified queue for connections.
- *
+ *
* @param acceptedQueue
* The BlockingQueue implementation for holding incoming accepted
* connections.
@@ -515,7 +520,7 @@
/**
* Hands off an accepted connection to be handled by this thread. This
* method will block if the queue for new connections is at capacity.
- *
+ *
* @param accepted
* The connection that has been accepted.
* @return true if the connection has been successfully added.
@@ -566,8 +571,8 @@
*/
private void select() {
try {
- // wait for io events.
- selector.select();
+
+ doSelect();
// process the io events we received
Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
@@ -596,6 +601,77 @@
}
}
+ /**
+ * Do select and judge epoll bug happen.
+ * See : https://issues.apache.org/jira/browse/THRIFT-4251
+ */
+ private void doSelect() throws IOException {
+ long beforeSelect = System.currentTimeMillis();
+ int selectedNums = selector.select();
+ long afterSelect = System.currentTimeMillis();
+
+ if (selectedNums == 0) {
+ jvmBug++;
+ } else {
+ jvmBug = 0;
+ }
+
+ long selectedTime = afterSelect - beforeSelect;
+ if (selectedTime >= MONITOR_PERIOD) {
+ jvmBug = 0;
+ } else if (jvmBug > SELECTOR_AUTO_REBUILD_THRESHOLD) {
+ LOGGER.warn("In {} ms happen {} times jvm bug; rebuilding selector.", MONITOR_PERIOD, jvmBug);
+ rebuildSelector();
+ selector.selectNow();
+ jvmBug = 0;
+ }
+
+ }
+
+ /**
+ * Replaces the current Selector of this SelectorThread with newly created Selector to work
+ * around the infamous epoll 100% CPU bug.
+ */
+ private synchronized void rebuildSelector() {
+ final Selector oldSelector = selector;
+ if (oldSelector == null) {
+ return;
+ }
+ Selector newSelector = null;
+ try {
+ newSelector = Selector.open();
+ LOGGER.warn("Created new Selector.");
+ } catch (IOException e) {
+ LOGGER.error("Create new Selector error.", e);
+ }
+
+ for (SelectionKey key : oldSelector.selectedKeys()) {
+ if (!key.isValid() && key.readyOps() == 0)
+ continue;
+ SelectableChannel channel = key.channel();
+ Object attachment = key.attachment();
+
+ try {
+ if (attachment == null) {
+ channel.register(newSelector, key.readyOps());
+ } else {
+ channel.register(newSelector, key.readyOps(), attachment);
+ }
+ } catch (ClosedChannelException e) {
+ LOGGER.error("Register new selector key error.", e);
+ }
+
+ }
+
+ selector = newSelector;
+ try {
+ oldSelector.close();
+ } catch (IOException e) {
+ LOGGER.error("Close old selector error.", e);
+ }
+ LOGGER.warn("Replace new selector success.");
+ }
+
private void processAcceptedConnections() {
// Register accepted connections
while (!stopped_) {