THRIFT-3891 TNonblockingServer configured with more than one IO thread does not always return from serve() upon stop()
Client: C++
Patch: additional changes by jking@apache.org to improve the test and stop cleanly in all cases
This closes #1080
This closes #1196
diff --git a/lib/cpp/test/TNonblockingServerTest.cpp b/lib/cpp/test/TNonblockingServerTest.cpp
index 48ea913..e933d6b 100644
--- a/lib/cpp/test/TNonblockingServerTest.cpp
+++ b/lib/cpp/test/TNonblockingServerTest.cpp
@@ -21,6 +21,7 @@
#include <boost/test/unit_test.hpp>
#include <boost/smart_ptr.hpp>
+#include "thrift/concurrency/Monitor.h"
#include "thrift/concurrency/Thread.h"
#include "thrift/server/TNonblockingServer.h"
@@ -29,6 +30,10 @@
#include <event.h>
using namespace apache::thrift;
+using apache::thrift::concurrency::Guard;
+using apache::thrift::concurrency::Monitor;
+using apache::thrift::concurrency::Mutex;
+using apache::thrift::server::TServerEventHandler;
struct Handler : public test::ParentServiceIf {
void addString(const std::string& s) { strings_.push_back(s); }
@@ -46,11 +51,31 @@
class Fixture {
private:
+ struct ListenEventHandler : public TServerEventHandler { // Event handler that records, via ready_, that preServe() has run and notifies a waiter.
+ public:
+ ListenEventHandler(Mutex* mutex) : listenMonitor_(mutex), ready_(false) {} // builds the monitor on the caller-supplied mutex; ready_ starts out false
+
+ void preServe() /* override */ { // presumably invoked by the server once it is ready to serve -- confirm against TServerEventHandler
+ Guard g(listenMonitor_.mutex()); // hold the monitor's mutex while the flag is updated and the waiter is notified
+ ready_ = true;
+ listenMonitor_.notify();
+ }
+
+ Monitor listenMonitor_; // monitor that preServe() notifies
+ bool ready_; // false after construction, set to true by preServe()
+ };
+
struct Runner : public apache::thrift::concurrency::Runnable {
int port;
boost::shared_ptr<event_base> userEventBase;
boost::shared_ptr<TProcessor> processor;
boost::shared_ptr<server::TNonblockingServer> server;
+ boost::shared_ptr<ListenEventHandler> listenHandler;
+ Mutex mutex_;
+
+ Runner() { // creates the listen handler at construction, handing it the address of this Runner's mutex_
+ listenHandler.reset(new ListenEventHandler(&mutex_));
+ }
virtual void run() {
// When binding to explicit port, allow retrying to workaround bind failures on ports in use
@@ -58,10 +83,18 @@
startServer(retryCount);
}
+ void readyBarrier() { // returns only once listenHandler->ready_ has been observed to be true
+ // block until server is listening and ready to accept connections
+ Guard g(mutex_); // mutex_ is the mutex whose address Runner() passes to the ListenEventHandler
+ while (!listenHandler->ready_) { // loop so the flag is re-checked after every return from wait()
+ listenHandler->listenMonitor_.wait(); // NOTE(review): no timeout is passed; if ready_ is never set this presumably blocks forever -- confirm
+ }
+ }
private:
void startServer(int retry_count) {
try {
server.reset(new server::TNonblockingServer(processor, port));
+ server->setServerEventHandler(listenHandler);
if (userEventBase) {
server->registerEvents(userEventBase.get());
}
@@ -112,8 +145,8 @@
false));
thread = threadFactory->newThread(runner);
thread->start();
- // wait 100 ms for the server to begin listening
- THRIFT_SLEEP_USEC(100000);
+ runner->readyBarrier();
+
server = runner->server;
return runner->port;
}