Fix TNonBlockingServer libevent issue in ThreadPool mode
Summary: If using TNonBlockingServer with a ThreadManager, when you send a task off to the threadmanager you need to cancel the event that you have set on that client socket. Otherwise, when you give control back to libevent, it might trigger more read events if there are more requests coming down the pipe. This is an issue, because the server will be in the wrong state at that point and will have no way of handling reading more data if it is still in the WAIT_TASK state trying to see if it should write something back to the client. So, when we hit that control flow, we must setIdle() on the TConnection so that libevent doesn't trigger it anymore. Later, after the result is written, we'll setRead() and go back to the init state.
Reviewed By: akhil
Test Plan: Akhil's async + TNonBlocking karma server
git-svn-id: https://svn.apache.org/repos/asf/incubator/thrift/trunk@665217 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/lib/cpp/src/server/TNonblockingServer.cpp b/lib/cpp/src/server/TNonblockingServer.cpp
index ceba960..fcfe797 100644
--- a/lib/cpp/src/server/TNonblockingServer.cpp
+++ b/lib/cpp/src/server/TNonblockingServer.cpp
@@ -118,7 +118,7 @@
// Read from the socket
fetch = readWant_ - readBufferPos_;
got = recv(socket_, readBuffer_ + readBufferPos_, fetch, 0);
-
+
if (got > 0) {
// Move along in the buffer
readBufferPos_ += got;
@@ -215,7 +215,7 @@
// and get back some data from the dispatch function
inputTransport_->resetBuffer(readBuffer_, readBufferPos_);
outputTransport_->resetBuffer();
-
+
if (server_->isThreadPoolProcessing()) {
// We are setting up a Task to do this work and we will wait on it
int sv[2];
@@ -242,6 +242,11 @@
return;
}
server_->addTask(task);
+
+ // Set this connection idle so that libevent doesn't process more
+ // data on it while we're still waiting for the threadmanager to
+ // finish this task
+ setIdle();
return;
}
} else {
@@ -263,6 +268,9 @@
}
}
+ // Intentionally fall through here, the call to process has written into
+ // the writeBuffer_
+
case APP_WAIT_TASK:
// We have now finished processing a task and the result has been written
// into the outputTransport_, so we grab its contents and place them into
@@ -392,6 +400,11 @@
// Update in memory structure
eventFlags_ = eventFlags;
+ // Do not call event_set if there are no flags
+ if (!eventFlags_) {
+ return;
+ }
+
/**
* event_set:
*
diff --git a/lib/cpp/src/server/TNonblockingServer.h b/lib/cpp/src/server/TNonblockingServer.h
index 6997c45..5470ad4 100644
--- a/lib/cpp/src/server/TNonblockingServer.h
+++ b/lib/cpp/src/server/TNonblockingServer.h
@@ -267,6 +267,11 @@
setFlags(EV_WRITE | EV_PERSIST);
}
+ // Set socket idle
+ void setIdle() {
+ setFlags(0);
+ }
+
// Set event flags
void setFlags(short eventFlags);