THRIFT-3891 TNonblockingServer configured with more than one IO thread does not always return from serve() upon stop()
Client: C++
Patch: additional changes by jking@apache.org to improve the test and stop cleanly in all cases

This closes #1080
This closes #1196
diff --git a/lib/cpp/test/TNonblockingServerTest.cpp b/lib/cpp/test/TNonblockingServerTest.cpp
index 48ea913..e933d6b 100644
--- a/lib/cpp/test/TNonblockingServerTest.cpp
+++ b/lib/cpp/test/TNonblockingServerTest.cpp
@@ -21,6 +21,7 @@
 #include <boost/test/unit_test.hpp>
 #include <boost/smart_ptr.hpp>
 
+#include "thrift/concurrency/Monitor.h"
 #include "thrift/concurrency/Thread.h"
 #include "thrift/server/TNonblockingServer.h"
 
@@ -29,6 +30,10 @@
 #include <event.h>
 
 using namespace apache::thrift;
+using apache::thrift::concurrency::Guard;
+using apache::thrift::concurrency::Monitor;
+using apache::thrift::concurrency::Mutex;
+using apache::thrift::server::TServerEventHandler;
 
 struct Handler : public test::ParentServiceIf {
   void addString(const std::string& s) { strings_.push_back(s); }
@@ -46,11 +51,31 @@
 
 class Fixture {
 private:
+  struct ListenEventHandler : public TServerEventHandler { // event handler that records and signals when preServe() has run
+    public:
+      explicit ListenEventHandler(Mutex* mutex) : listenMonitor_(mutex), ready_(false) {} // mutex: handed to the Monitor; starts "not ready"
+      // preServe(): marks the handler ready and notifies listenMonitor_ while holding the monitor's mutex.
+      void preServe() /* override */ {
+        Guard g(listenMonitor_.mutex()); // guard the flag update and the notify with the monitor's mutex
+        ready_ = true;
+        listenMonitor_.notify();
+      }
+      // Members are public because Runner::readyBarrier() reads ready_ and waits on listenMonitor_ directly.
+      Monitor listenMonitor_; // built from the caller-supplied Mutex* (presumably sharing it -- confirm in Monitor.h)
+      bool ready_;            // false at construction; set to true only by preServe()
+  };
+
   struct Runner : public apache::thrift::concurrency::Runnable {
     int port;
     boost::shared_ptr<event_base> userEventBase;
     boost::shared_ptr<TProcessor> processor;
     boost::shared_ptr<server::TNonblockingServer> server;
+    boost::shared_ptr<ListenEventHandler> listenHandler;
+    Mutex mutex_;
+
+    Runner() { // creates the listen handler and ties it to this runner's mutex_
+      listenHandler.reset(new ListenEventHandler(&mutex_)); // done in the body, not the init list: mutex_ is declared after listenHandler
+    }
 
     virtual void run() {
       // When binding to explicit port, allow retrying to workaround bind failures on ports in use
@@ -58,10 +83,18 @@
       startServer(retryCount);
     }
 
+    void readyBarrier() { // returns only once listenHandler->ready_ has been observed true
+      // block until server is listening and ready to accept connections
+      Guard g(mutex_); // the same mutex_ whose address was given to the handler's Monitor in Runner()
+      while (!listenHandler->ready_) { // re-check the flag after every wake-up rather than trusting a single notify
+        listenHandler->listenMonitor_.wait(); // NOTE(review): no timeout is passed; presumably blocks indefinitely if preServe() never runs
+      }
+    }
   private:
     void startServer(int retry_count) {
       try {
         server.reset(new server::TNonblockingServer(processor, port));
+        server->setServerEventHandler(listenHandler);
         if (userEventBase) {
           server->registerEvents(userEventBase.get());
         }
@@ -112,8 +145,8 @@
             false));
     thread = threadFactory->newThread(runner);
     thread->start();
-    // wait 100 ms for the server to begin listening
-    THRIFT_SLEEP_USEC(100000);
+    runner->readyBarrier();
+
     server = runner->server;
     return runner->port;
   }