THRIFT-4515: cross server test improvement: graceful test server shutdown

This closes #1509
diff --git a/lib/cpp/src/thrift/TOutput.cpp b/lib/cpp/src/thrift/TOutput.cpp
index bb46263..ae3a9e2 100644
--- a/lib/cpp/src/thrift/TOutput.cpp
+++ b/lib/cpp/src/thrift/TOutput.cpp
@@ -94,7 +94,7 @@
 }
 
 void TOutput::perror(const char* message, int errno_copy) {
-  std::string out = message + strerror_s(errno_copy);
+  std::string out = message + std::string(": ") + strerror_s(errno_copy); // ": " separates message from the strerror_s() text
   f_(out.c_str());
 }
 
diff --git a/lib/cpp/src/thrift/server/TNonblockingServer.cpp b/lib/cpp/src/thrift/server/TNonblockingServer.cpp
index e60bffc..f89b5f7 100644
--- a/lib/cpp/src/thrift/server/TNonblockingServer.cpp
+++ b/lib/cpp/src/thrift/server/TNonblockingServer.cpp
@@ -659,6 +659,7 @@
         return;
       }
     }
+    // fallthrough
 
   // Intentionally fall through here, the call to process has written into
   // the writeBuffer_
@@ -707,6 +708,7 @@
                               server_->getIdleWriteBufferLimit());
       callsForResize_ = 0;
     }
+    // fallthrough
 
   // N.B.: We also intentionally fall through here into the INIT state!
 
diff --git a/lib/cpp/src/thrift/transport/TSSLSocket.cpp b/lib/cpp/src/thrift/transport/TSSLSocket.cpp
index 7bdacb0..7071698 100644
--- a/lib/cpp/src/thrift/transport/TSSLSocket.cpp
+++ b/lib/cpp/src/thrift/transport/TSSLSocket.cpp
@@ -304,6 +304,7 @@
               && (errno_copy != THRIFT_EAGAIN)) {
             break;
           }
+        // fallthrough
         case SSL_ERROR_WANT_READ:
         case SSL_ERROR_WANT_WRITE:
           // in the case of SSL_ERROR_SYSCALL we want to wait for an read event again
@@ -350,6 +351,7 @@
                   && (errno_copy != THRIFT_EAGAIN)) {
                 break;
               }
+            // fallthrough
             case SSL_ERROR_WANT_READ:
             case SSL_ERROR_WANT_WRITE:
               // in the case of SSL_ERROR_SYSCALL we want to wait for an write/read event again
@@ -415,6 +417,8 @@
           // a certain number
           break;
         }
+      // fallthrough
+
       case SSL_ERROR_WANT_READ:
       case SSL_ERROR_WANT_WRITE:
         if (isLibeventSafe()) {
@@ -471,6 +475,7 @@
               && (errno_copy != THRIFT_EAGAIN)) {
             break;
           }
+        // fallthrough
         case SSL_ERROR_WANT_READ:
         case SSL_ERROR_WANT_WRITE:
           if (isLibeventSafe()) {
@@ -515,6 +520,7 @@
               && (errno_copy != THRIFT_EAGAIN)) {
             break;
           }
+        // fallthrough
         case SSL_ERROR_WANT_READ:
         case SSL_ERROR_WANT_WRITE:
           if (isLibeventSafe()) {
@@ -602,6 +608,7 @@
                 && (errno_copy != THRIFT_EAGAIN)) {
               break;
             }
+          // fallthrough
           case SSL_ERROR_WANT_READ:
           case SSL_ERROR_WANT_WRITE:
             if (isLibeventSafe()) {
@@ -634,6 +641,7 @@
                 && (errno_copy != THRIFT_EAGAIN)) {
               break;
             }
+          // fallthrough
           case SSL_ERROR_WANT_READ:
           case SSL_ERROR_WANT_WRITE:
             if (isLibeventSafe()) {
diff --git a/lib/cpp/src/thrift/transport/TSocket.cpp b/lib/cpp/src/thrift/transport/TSocket.cpp
index c90593d..18cadbc 100644
--- a/lib/cpp/src/thrift/transport/TSocket.cpp
+++ b/lib/cpp/src/thrift/transport/TSocket.cpp
@@ -809,11 +809,15 @@
 
 string TSocket::getSocketInfo() {
   std::ostringstream oss;
-  if (host_.empty() || port_ == 0) {
-    oss << "<Host: " << getPeerAddress();
-    oss << " Port: " << getPeerPort() << ">";
+  if (path_.empty()) { // no path_ set: describe the socket by host and port
+    if (host_.empty() || port_ == 0) { // host_/port_ unset: use the peer accessors instead
+      oss << "<Host: " << getPeerAddress();
+      oss << " Port: " << getPeerPort() << ">";
+    } else {
+      oss << "<Host: " << host_ << " Port: " << port_ << ">";
+    }
   } else {
-    oss << "<Host: " << host_ << " Port: " << port_ << ">";
+    oss << "<Path: " << path_ << ">"; // path_ is non-empty: report it in place of host/port
   }
   return oss.str();
 }
diff --git a/lib/cpp/test/concurrency/TimerManagerTests.h b/lib/cpp/test/concurrency/TimerManagerTests.h
index 3779b0d..1c52c47 100644
--- a/lib/cpp/test/concurrency/TimerManagerTests.h
+++ b/lib/cpp/test/concurrency/TimerManagerTests.h
@@ -79,14 +79,13 @@
         = shared_ptr<TimerManagerTests::Task>(new TimerManagerTests::Task(_monitor, 10 * timeout));
 
     {
-
       TimerManager timerManager;
-
       timerManager.threadFactory(shared_ptr<PlatformThreadFactory>(new PlatformThreadFactory()));
-
       timerManager.start();
-
-      assert(timerManager.state() == TimerManager::STARTED);
+      if (timerManager.state() != TimerManager::STARTED) {
+        std::cerr << "timerManager is not in the STARTED state, but should be" << std::endl;
+        return false;
+      }
 
       // Don't create task yet, because its constructor sets the expected completion time, and we
       // need to delay between inserting the two tasks into the run queue.
@@ -94,34 +93,27 @@
 
       {
         Synchronized s(_monitor);
-
         timerManager.add(orphanTask, 10 * timeout);
 
-        try {
-          // Wait for 1 second in order to give timerManager a chance to start sleeping in response
-          // to adding orphanTask. We need to do this so we can verify that adding the second task
-          // kicks the dispatcher out of the current wait and starts the new 1 second wait.
-          _monitor.wait(1000);
-          assert(
-              0 == "ERROR: This wait should time out. TimerManager dispatcher may have a problem.");
-        } catch (TimedOutException&) {
-        }
+        THRIFT_SLEEP_USEC(timeout * 1000);
 
         task.reset(new TimerManagerTests::Task(_monitor, timeout));
-
         timerManager.add(task, timeout);
-
         _monitor.wait();
       }
 
-      assert(task->_done);
+      if (!task->_done) {
+        std::cerr << "task is not done, but it should have executed" << std::endl;
+        return false;
+      }
 
       std::cout << "\t\t\t" << (task->_success ? "Success" : "Failure") << "!" << std::endl;
     }
 
-    // timerManager.stop(); This is where it happens via destructor
-
-    assert(!orphanTask->_done);
+    if (orphanTask->_done) {
+      std::cerr << "orphan task is done, but it should not have executed" << std::endl;
+      return false;
+    }
 
     return true;
   }
diff --git a/lib/d/test/thrift_test_server.d b/lib/d/test/thrift_test_server.d
index 71ab917..b582253 100644
--- a/lib/d/test/thrift_test_server.d
+++ b/lib/d/test/thrift_test_server.d
@@ -16,8 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 module thrift_test_server;
 
+import core.stdc.errno : errno;
+import core.stdc.signal : signal, sigfn_t, SIGINT, SIG_DFL, SIG_ERR;
 import core.thread : dur, Thread;
 import std.algorithm;
 import std.exception : enforce;
@@ -40,6 +43,7 @@
 import thrift.transport.framed;
 import thrift.transport.http;
 import thrift.transport.ssl;
+import thrift.util.cancellation;
 import thrift.util.hashset;
 import test_utils;
 
@@ -205,14 +209,44 @@
   bool trace_;
 }
 
+shared(bool) gShutdown = false; // set by handleSignal, polled by ShutdownThread.run
+
+nothrow @nogc extern(C) void handleSignal(int sig) { // installed for SIGINT in main
+  gShutdown = true; // only records the request; the handler does nothing else
+}
+
+// Thread that waits for gShutdown to be set (by the
+// SIGINT handler) and then triggers the cancellation
+// it was constructed with, which main passes to
+// server.serve() so that the server stops.  The flag
+// is polled every 25 ms rather than read through a
+// signalfd, because signalfd is Linux-specific and
+// this test server aims for maximum portability.
+class ShutdownThread : Thread {
+  this(TCancellationOrigin cancellation) { // cancellation: origin to trigger on shutdown
+    cancellation_ = cancellation;
+    super(&run); // run() becomes the thread body
+  }
+  
+private:
+  void run() {
+    while (!gShutdown) { // poll the flag written by handleSignal
+      Thread.sleep(dur!("msecs")(25));
+    }
+    cancellation_.trigger(); // shutdown was requested: fire the cancellation
+  }
+  
+  TCancellationOrigin cancellation_;
+}
+
 void main(string[] args) {
   ushort port = 9090;
   ServerType serverType;
   ProtocolType protocolType;
   size_t numIOThreads = 1;
   TransportType transportType;
-  bool ssl;
-  bool trace;
+  bool ssl = false;
+  bool trace = true;
   size_t taskPoolSize = totalCPUs;
 
   getopt(args, "port", &port, "protocol", &protocolType, "server-type",
@@ -279,8 +313,26 @@
   auto server = createServer(serverType, numIOThreads, taskPoolSize,
     processor, serverSocket, transportFactory, protocolFactory);
 
+  // Route SIGINT to handleSignal; a SIG_ERR result means the handler was not installed.
+  sigfn_t oldHandler = signal(SIGINT, &handleSignal);
+  enforce(oldHandler != SIG_ERR, // message goes through format(), which expects %s, not {0}
+    "Could not replace the SIGINT signal handler: errno %s".format(errno()));
+  
+  // Set up a server cancellation trigger
+  auto cancel = new TCancellationOrigin();
+
+  // Set up a listener for the shutdown condition - this will
+  // wake up when the signal occurs and trigger cancellation.
+  auto shutdown = new ShutdownThread(cancel);
+  shutdown.start();
+  
+  // Serve from this thread; the signal will stop the server
+  // and control will return here
   writefln("Starting %s/%s %s ThriftTest server %son port %s...", protocolType,
     transportType, serverType, ssl ? "(using SSL) ": "", port);
-  server.serve();
+  server.serve(cancel);
+  shutdown.join();
+  signal(SIGINT, SIG_DFL);
+    
   writeln("done.");
 }
diff --git a/lib/perl/lib/Thrift/Server.pm b/lib/perl/lib/Thrift/Server.pm
index fc9ca30..f265d45 100644
--- a/lib/perl/lib/Thrift/Server.pm
+++ b/lib/perl/lib/Thrift/Server.pm
@@ -150,27 +150,31 @@
 sub serve
 {
     my $self = shift;
-
+    my $stop = 0;  # becomes 1 once accept() returns undef
+    
     $self->{serverTransport}->listen();
-    while (1)
-    {
+    while (!$stop) {  # keep accepting until accept() stops yielding clients
         my $client = $self->{serverTransport}->accept();
-        my $itrans = $self->{inputTransportFactory}->getTransport($client);
-        my $otrans = $self->{outputTransportFactory}->getTransport($client);
-        my $iprot  = $self->{inputProtocolFactory}->getProtocol($itrans);
-        my $oprot  = $self->{outputProtocolFactory}->getProtocol($otrans);
-        eval {
-            $self->_clientBegin($iprot, $oprot);
-            while (1)
-            {
-                $self->{processor}->process($iprot, $oprot);
+        if (defined $client) {  # a client connected: serve it until process() dies
+            my $itrans = $self->{inputTransportFactory}->getTransport($client);
+            my $otrans = $self->{outputTransportFactory}->getTransport($client);
+            my $iprot  = $self->{inputProtocolFactory}->getProtocol($itrans);
+            my $oprot  = $self->{outputProtocolFactory}->getProtocol($otrans);
+            eval {
+                $self->_clientBegin($iprot, $oprot);
+                while (1)
+                {
+                    $self->{processor}->process($iprot, $oprot);
+                }
+            }; if($@) {
+                $self->_handleException($@);
             }
-        }; if($@) {
-            $self->_handleException($@);
-        }
 
-        $itrans->close();
-        $otrans->close();
+            $itrans->close();
+            $otrans->close();
+        } else {  # accept() returned undef: leave the serve loop
+            $stop = 1;
+        }
     }
 }
 
diff --git a/lib/perl/lib/Thrift/ServerSocket.pm b/lib/perl/lib/Thrift/ServerSocket.pm
index 51f83b4..2c4d906 100644
--- a/lib/perl/lib/Thrift/ServerSocket.pm
+++ b/lib/perl/lib/Thrift/ServerSocket.pm
@@ -81,15 +81,24 @@
 {
     my $self = shift;
 
-    if ( exists $self->{handle} and defined $self->{handle} )
-    {
+    if ( exists $self->{handle} and defined $self->{handle} ) {
         my $client        = $self->{handle}->accept();
         my $result        = $self->__client();
         $result->{handle} = new IO::Select($client);
         return $result;
     }
 
-    return 0;
+    return undef;
+}
+
+sub close
+{
+    my $self = shift;
+
+    # Close the underlying handle, but only when one is actually present.
+    if ( exists $self->{handle} and defined $self->{handle} ) {
+        $self->{handle}->close();
+    }
 }
 
 ###