THRIFT-3891 TNonblockingServer configured with more than one IO threads does not always return from serve() upon stop()
Client: C++
Patch: additional changes by jking@apache.org to improve the test and stop clean in all cases
This closes #1080
This closes #1196
diff --git a/lib/cpp/src/thrift/server/TNonblockingServer.cpp b/lib/cpp/src/thrift/server/TNonblockingServer.cpp
index 649910f..d4418bd 100644
--- a/lib/cpp/src/thrift/server/TNonblockingServer.cpp
+++ b/lib/cpp/src/thrift/server/TNonblockingServer.cpp
@@ -510,7 +510,7 @@
// If there is no data to send, then let us move on
if (writeBufferPos_ == writeBufferSize_) {
- GlobalOutput("WARNING: Send state with no data to send\n");
+ GlobalOutput("WARNING: Send state with no data to send");
transition();
return;
}
@@ -765,11 +765,9 @@
}
// Delete a previously existing event
- if (eventFlags_ != 0) {
- if (event_del(&event_) == -1) {
- GlobalOutput("TConnection::setFlags event_del");
- return;
- }
+ if (eventFlags_ && event_del(&event_) == -1) {
+ GlobalOutput.perror("TConnection::setFlags() event_del", THRIFT_GET_SOCKET_ERROR);
+ return;
}
// Update in memory structure
@@ -812,7 +810,7 @@
// Add the event
if (event_add(&event_, 0) == -1) {
- GlobalOutput("TConnection::setFlags(): could not event_add");
+ GlobalOutput.perror("TConnection::setFlags(): could not event_add", THRIFT_GET_SOCKET_ERROR);
}
}
@@ -820,9 +818,9 @@
* Closes a connection
*/
void TNonblockingServer::TConnection::close() {
- // Delete the registered libevent
- if (event_del(&event_) == -1) {
+ if (eventFlags_ && event_del(&event_) == -1) {
GlobalOutput.perror("TConnection::close() event_del", THRIFT_GET_SOCKET_ERROR);
+ return;
}
if (serverEventHandler_) {
@@ -1066,7 +1064,7 @@
if (res->ai_family == AF_INET6) {
int zero = 0;
if (-1 == setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, const_cast_sockopt(&zero), sizeof(zero))) {
- GlobalOutput("TServerSocket::listen() IPV6_V6ONLY");
+ GlobalOutput.perror("TServerSocket::listen() IPV6_V6ONLY", THRIFT_GET_SOCKET_ERROR);
}
}
#endif // #ifdef IPV6_V6ONLY
@@ -1486,6 +1484,7 @@
if (nBytes == kSize) {
if (connection == NULL) {
// this is the command to stop our thread, exit the handler!
+ ioThread->breakLoop(false);
return;
}
connection->transition();
@@ -1496,6 +1495,7 @@
return;
} else if (nBytes == 0) {
GlobalOutput.printf("notifyHandler: Notify socket closed!");
+ ioThread->breakLoop(false);
// exit the loop
break;
} else { // nBytes < 0
@@ -1520,19 +1520,15 @@
::abort();
}
- // sets a flag so that the loop exits on the next event
- event_base_loopbreak(eventBase_);
-
- // event_base_loopbreak() only causes the loop to exit the next time
- // it wakes up. We need to force it to wake up, in case there are
- // no real events it needs to process.
- //
// If we're running in the same thread, we can't use the notify(0)
// mechanism to stop the thread, but happily if we're running in the
// same thread, this means the thread can't be blocking in the event
// loop either.
if (!Thread::is_current(threadId_)) {
notify(NULL);
+ } else {
+ // cause the loop to stop ASAP - even if it has things to do in it
+ event_base_loopbreak(eventBase_);
}
}
@@ -1566,25 +1562,27 @@
}
void TNonblockingIOThread::run() {
- if (eventBase_ == NULL)
+ if (eventBase_ == NULL) {
registerEvents();
-
- GlobalOutput.printf("TNonblockingServer: IO thread #%d entering loop...", number_);
-
+ }
if (useHighPriority_) {
setCurrentThreadHighPriority(true);
}
- // Run libevent engine, never returns, invokes calls to eventHandler
- event_base_loop(eventBase_, 0);
+ if (eventBase_ != NULL)
+ {
+ GlobalOutput.printf("TNonblockingServer: IO thread #%d entering loop...", number_);
+ // Run libevent engine, never returns, invokes calls to eventHandler
+ event_base_loop(eventBase_, 0);
- if (useHighPriority_) {
- setCurrentThreadHighPriority(false);
+ if (useHighPriority_) {
+ setCurrentThreadHighPriority(false);
+ }
+
+ // cleans up our registered events
+ cleanupEvents();
}
- // cleans up our registered events
- cleanupEvents();
-
GlobalOutput.printf("TNonblockingServer: IO thread #%d run() done!", number_);
}
diff --git a/lib/cpp/test/TNonblockingServerTest.cpp b/lib/cpp/test/TNonblockingServerTest.cpp
index 48ea913..e933d6b 100644
--- a/lib/cpp/test/TNonblockingServerTest.cpp
+++ b/lib/cpp/test/TNonblockingServerTest.cpp
@@ -21,6 +21,7 @@
#include <boost/test/unit_test.hpp>
#include <boost/smart_ptr.hpp>
+#include "thrift/concurrency/Monitor.h"
#include "thrift/concurrency/Thread.h"
#include "thrift/server/TNonblockingServer.h"
@@ -29,6 +30,10 @@
#include <event.h>
using namespace apache::thrift;
+using apache::thrift::concurrency::Guard;
+using apache::thrift::concurrency::Monitor;
+using apache::thrift::concurrency::Mutex;
+using apache::thrift::server::TServerEventHandler;
struct Handler : public test::ParentServiceIf {
void addString(const std::string& s) { strings_.push_back(s); }
@@ -46,11 +51,31 @@
class Fixture {
private:
+ struct ListenEventHandler : public TServerEventHandler {
+ public:
+ ListenEventHandler(Mutex* mutex) : listenMonitor_(mutex), ready_(false) {}
+
+ void preServe() /* override */ {
+ Guard g(listenMonitor_.mutex());
+ ready_ = true;
+ listenMonitor_.notify();
+ }
+
+ Monitor listenMonitor_;
+ bool ready_;
+ };
+
struct Runner : public apache::thrift::concurrency::Runnable {
int port;
boost::shared_ptr<event_base> userEventBase;
boost::shared_ptr<TProcessor> processor;
boost::shared_ptr<server::TNonblockingServer> server;
+ boost::shared_ptr<ListenEventHandler> listenHandler;
+ Mutex mutex_;
+
+ Runner() {
+ listenHandler.reset(new ListenEventHandler(&mutex_));
+ }
virtual void run() {
// When binding to explicit port, allow retrying to workaround bind failures on ports in use
@@ -58,10 +83,18 @@
startServer(retryCount);
}
+ void readyBarrier() {
+ // block until server is listening and ready to accept connections
+ Guard g(mutex_);
+ while (!listenHandler->ready_) {
+ listenHandler->listenMonitor_.wait();
+ }
+ }
private:
void startServer(int retry_count) {
try {
server.reset(new server::TNonblockingServer(processor, port));
+ server->setServerEventHandler(listenHandler);
if (userEventBase) {
server->registerEvents(userEventBase.get());
}
@@ -112,8 +145,8 @@
false));
thread = threadFactory->newThread(runner);
thread->start();
- // wait 100 ms for the server to begin listening
- THRIFT_SLEEP_USEC(100000);
+ runner->readyBarrier();
+
server = runner->server;
return runner->port;
}