THRIFT-4847: CancelledKeyException causes TThreadedSelectorServer to fail
diff --git a/lib/java/src/main/java/org/apache/thrift/server/AbstractNonblockingServer.java b/lib/java/src/main/java/org/apache/thrift/server/AbstractNonblockingServer.java
index 954aaeb..a573f0c 100644
--- a/lib/java/src/main/java/org/apache/thrift/server/AbstractNonblockingServer.java
+++ b/lib/java/src/main/java/org/apache/thrift/server/AbstractNonblockingServer.java
@@ -248,7 +248,7 @@
protected final TNonblockingTransport trans_;
// the SelectionKey that corresponds to our transport
- protected final SelectionKey selectionKey_;
+ protected SelectionKey selectionKey_;
// the SelectThread that owns the registration of our transport
protected final AbstractSelectThread selectThread_;
@@ -303,6 +303,14 @@
}
/**
+ * Sets the selection key (this is not thread safe).
+ * @param selectionKey the new key to set.
+ */
+ public void setSelectionKey(SelectionKey selectionKey) {
+ selectionKey_ = selectionKey;
+ }
+
+ /**
* Give this FrameBuffer a chance to read. The selector loop should have received a read event
* for this FrameBuffer.
*
@@ -375,7 +383,11 @@
// modify our selection key directly.
if (buffer_.remaining() == 0) {
// get rid of the read select interests
- selectionKey_.interestOps(0);
+ if (selectionKey_.isValid()) {
+ selectionKey_.interestOps(0);
+ } else {
+ LOGGER.warn("SelectionKey was invalidated during read");
+ }
state_ = FrameBufferState.READ_FRAME_COMPLETE;
}
@@ -415,8 +427,12 @@
switch (state_) {
case AWAITING_REGISTER_WRITE:
// set the OP_WRITE interest
- selectionKey_.interestOps(SelectionKey.OP_WRITE);
- state_ = FrameBufferState.WRITING;
+ if (selectionKey_.isValid()) {
+ selectionKey_.interestOps(SelectionKey.OP_WRITE);
+ state_ = FrameBufferState.WRITING;
+ } else {
+ LOGGER.warn("SelectionKey was invalidated before write");
+ }
break;
case AWAITING_REGISTER_READ:
prepareRead();
@@ -520,7 +536,11 @@
private void prepareRead() {
// we can set our interest directly without using the queue because
// we're in the select thread.
- selectionKey_.interestOps(SelectionKey.OP_READ);
+ if (selectionKey_.isValid()) {
+ selectionKey_.interestOps(SelectionKey.OP_READ);
+ } else {
+ LOGGER.warn("SelectionKey was invalidated before read");
+ }
// get ready for another go-around
buffer_ = ByteBuffer.allocate(4);
state_ = FrameBufferState.READING_FRAME_SIZE;
diff --git a/lib/java/src/main/java/org/apache/thrift/server/TThreadedSelectorServer.java b/lib/java/src/main/java/org/apache/thrift/server/TThreadedSelectorServer.java
index 86b8dfd..2a95fe1 100644
--- a/lib/java/src/main/java/org/apache/thrift/server/TThreadedSelectorServer.java
+++ b/lib/java/src/main/java/org/apache/thrift/server/TThreadedSelectorServer.java
@@ -626,16 +626,23 @@
LOGGER.error("Create new Selector error.", e);
}
- for (SelectionKey key : oldSelector.selectedKeys()) {
- if (!key.isValid() && key.readyOps() == 0) continue;
+ for (SelectionKey key : oldSelector.keys()) {
+ if (!key.isValid() || key.interestOps() == 0 || key.channel().keyFor(newSelector) != null) {
+ continue;
+ }
SelectableChannel channel = key.channel();
Object attachment = key.attachment();
+ int interestOps = key.interestOps();
+ SelectionKey newKey;
try {
if (attachment == null) {
- channel.register(newSelector, key.readyOps());
+ newKey = channel.register(newSelector, interestOpts);
} else {
- channel.register(newSelector, key.readyOps(), attachment);
+ newKey = channel.register(newSelector, interestOpts, attachment);
+ if (attachment instanceof FrameBuffer) {
+ ((FrameBuffer) attachment.setSelectionKey(newKey);
+ }
}
} catch (ClosedChannelException e) {
LOGGER.error("Register new selector key error.", e);