THRIFT-4515: cross server test improvement: graceful test server shutdown

This closes #1509
diff --git a/build/cmake/ConfigureChecks.cmake b/build/cmake/ConfigureChecks.cmake
index e4793d4..6b9c6a3 100644
--- a/build/cmake/ConfigureChecks.cmake
+++ b/build/cmake/ConfigureChecks.cmake
@@ -34,6 +34,7 @@
 check_include_file(inttypes.h HAVE_INTTYPES_H)
 check_include_file(netdb.h HAVE_NETDB_H)
 check_include_file(netinet/in.h HAVE_NETINET_IN_H)
+check_include_file(signal.h HAVE_SIGNAL_H)
 check_include_file(stdint.h HAVE_STDINT_H)
 check_include_file(unistd.h HAVE_UNISTD_H)
 check_include_file(pthread.h HAVE_PTHREAD_H)
diff --git a/build/cmake/config.h.in b/build/cmake/config.h.in
index 21561b2..c5d4d30 100644
--- a/build/cmake/config.h.in
+++ b/build/cmake/config.h.in
@@ -91,6 +91,9 @@
 /* Define to 1 if you have the <netinet/in.h> header file. */
 #cmakedefine HAVE_NETINET_IN_H 1
 
+/* Define to 1 if you have the <signal.h> header file. */
+#cmakedefine HAVE_SIGNAL_H 1
+
 /* Define to 1 if you have the <stdint.h> header file. */
 #cmakedefine HAVE_STDINT_H 1
 
diff --git a/build/docker/scripts/autotools.sh b/build/docker/scripts/autotools.sh
index 67e4f2d..8388f72 100755
--- a/build/docker/scripts/autotools.sh
+++ b/build/docker/scripts/autotools.sh
@@ -1,10 +1,6 @@
 #!/bin/sh
 set -ev
 
-# haxe hxcpp > 3.4.188 will enable c++11 by default, and break the
-#      build when compiling C files with clang++ by adding -std=c++11
-export HXCPP_NO_CPP11=1
-
 ./bootstrap.sh
 ./configure $*
 make check -j3
diff --git a/configure.ac b/configure.ac
index 9efc28b..917f6fa 100755
--- a/configure.ac
+++ b/configure.ac
@@ -633,6 +633,7 @@
 AC_CHECK_HEADERS([netdb.h])
 AC_CHECK_HEADERS([netinet/in.h])
 AC_CHECK_HEADERS([pthread.h])
+AC_CHECK_HEADERS([signal.h])
 AC_CHECK_HEADERS([stddef.h])
 AC_CHECK_HEADERS([stdlib.h])
 AC_CHECK_HEADERS([sys/ioctl.h])
diff --git a/lib/cpp/src/thrift/TOutput.cpp b/lib/cpp/src/thrift/TOutput.cpp
index bb46263..ae3a9e2 100644
--- a/lib/cpp/src/thrift/TOutput.cpp
+++ b/lib/cpp/src/thrift/TOutput.cpp
@@ -94,7 +94,7 @@
 }
 
 void TOutput::perror(const char* message, int errno_copy) {
-  std::string out = message + strerror_s(errno_copy);
+  std::string out = message + std::string(": ") + strerror_s(errno_copy);
   f_(out.c_str());
 }
 
diff --git a/lib/cpp/src/thrift/server/TNonblockingServer.cpp b/lib/cpp/src/thrift/server/TNonblockingServer.cpp
index e60bffc..f89b5f7 100644
--- a/lib/cpp/src/thrift/server/TNonblockingServer.cpp
+++ b/lib/cpp/src/thrift/server/TNonblockingServer.cpp
@@ -659,6 +659,7 @@
         return;
       }
     }
+    // fallthrough
 
   // Intentionally fall through here, the call to process has written into
   // the writeBuffer_
@@ -707,6 +708,7 @@
                               server_->getIdleWriteBufferLimit());
       callsForResize_ = 0;
     }
+    // fallthrough
 
   // N.B.: We also intentionally fall through here into the INIT state!
 
diff --git a/lib/cpp/src/thrift/transport/TSSLSocket.cpp b/lib/cpp/src/thrift/transport/TSSLSocket.cpp
index 7bdacb0..7071698 100644
--- a/lib/cpp/src/thrift/transport/TSSLSocket.cpp
+++ b/lib/cpp/src/thrift/transport/TSSLSocket.cpp
@@ -304,6 +304,7 @@
               && (errno_copy != THRIFT_EAGAIN)) {
             break;
           }
+        // fallthrough
         case SSL_ERROR_WANT_READ:
         case SSL_ERROR_WANT_WRITE:
           // in the case of SSL_ERROR_SYSCALL we want to wait for an read event again
@@ -350,6 +351,7 @@
                   && (errno_copy != THRIFT_EAGAIN)) {
                 break;
               }
+            // fallthrough
             case SSL_ERROR_WANT_READ:
             case SSL_ERROR_WANT_WRITE:
               // in the case of SSL_ERROR_SYSCALL we want to wait for an write/read event again
@@ -415,6 +417,8 @@
           // a certain number
           break;
         }
+      // fallthrough
+
       case SSL_ERROR_WANT_READ:
       case SSL_ERROR_WANT_WRITE:
         if (isLibeventSafe()) {
@@ -471,6 +475,7 @@
               && (errno_copy != THRIFT_EAGAIN)) {
             break;
           }
+        // fallthrough        
         case SSL_ERROR_WANT_READ:
         case SSL_ERROR_WANT_WRITE:
           if (isLibeventSafe()) {
@@ -515,6 +520,7 @@
               && (errno_copy != THRIFT_EAGAIN)) {
             break;
           }
+        // fallthrough
         case SSL_ERROR_WANT_READ:
         case SSL_ERROR_WANT_WRITE:
           if (isLibeventSafe()) {
@@ -602,6 +608,7 @@
                 && (errno_copy != THRIFT_EAGAIN)) {
               break;
             }
+          // fallthrough
           case SSL_ERROR_WANT_READ:
           case SSL_ERROR_WANT_WRITE:
             if (isLibeventSafe()) {
@@ -634,6 +641,7 @@
                 && (errno_copy != THRIFT_EAGAIN)) {
               break;
             }
+          // fallthrough
           case SSL_ERROR_WANT_READ:
           case SSL_ERROR_WANT_WRITE:
             if (isLibeventSafe()) {
diff --git a/lib/cpp/src/thrift/transport/TSocket.cpp b/lib/cpp/src/thrift/transport/TSocket.cpp
index c90593d..18cadbc 100644
--- a/lib/cpp/src/thrift/transport/TSocket.cpp
+++ b/lib/cpp/src/thrift/transport/TSocket.cpp
@@ -809,11 +809,15 @@
 
 string TSocket::getSocketInfo() {
   std::ostringstream oss;
-  if (host_.empty() || port_ == 0) {
-    oss << "<Host: " << getPeerAddress();
-    oss << " Port: " << getPeerPort() << ">";
+  if (path_.empty()) {
+    if (host_.empty() || port_ == 0) {
+      oss << "<Host: " << getPeerAddress();
+      oss << " Port: " << getPeerPort() << ">";
+    } else {
+      oss << "<Host: " << host_ << " Port: " << port_ << ">";
+    }
   } else {
-    oss << "<Host: " << host_ << " Port: " << port_ << ">";
+    oss << "<Path: " << path_ << ">";
   }
   return oss.str();
 }
diff --git a/lib/cpp/test/concurrency/TimerManagerTests.h b/lib/cpp/test/concurrency/TimerManagerTests.h
index 3779b0d..1c52c47 100644
--- a/lib/cpp/test/concurrency/TimerManagerTests.h
+++ b/lib/cpp/test/concurrency/TimerManagerTests.h
@@ -79,14 +79,13 @@
         = shared_ptr<TimerManagerTests::Task>(new TimerManagerTests::Task(_monitor, 10 * timeout));
 
     {
-
       TimerManager timerManager;
-
       timerManager.threadFactory(shared_ptr<PlatformThreadFactory>(new PlatformThreadFactory()));
-
       timerManager.start();
-
-      assert(timerManager.state() == TimerManager::STARTED);
+      if (timerManager.state() != TimerManager::STARTED) {
+        std::cerr << "timerManager is not in the STARTED state, but should be" << std::endl;
+        return false;
+      }
 
       // Don't create task yet, because its constructor sets the expected completion time, and we
       // need to delay between inserting the two tasks into the run queue.
@@ -94,34 +93,27 @@
 
       {
         Synchronized s(_monitor);
-
         timerManager.add(orphanTask, 10 * timeout);
 
-        try {
-          // Wait for 1 second in order to give timerManager a chance to start sleeping in response
-          // to adding orphanTask. We need to do this so we can verify that adding the second task
-          // kicks the dispatcher out of the current wait and starts the new 1 second wait.
-          _monitor.wait(1000);
-          assert(
-              0 == "ERROR: This wait should time out. TimerManager dispatcher may have a problem.");
-        } catch (TimedOutException&) {
-        }
+        THRIFT_SLEEP_USEC(timeout * 1000);
 
         task.reset(new TimerManagerTests::Task(_monitor, timeout));
-
         timerManager.add(task, timeout);
-
         _monitor.wait();
       }
 
-      assert(task->_done);
+      if (!task->_done) {
+        std::cerr << "task is not done, but it should have executed" << std::endl;
+        return false;
+      }
 
       std::cout << "\t\t\t" << (task->_success ? "Success" : "Failure") << "!" << std::endl;
     }
 
-    // timerManager.stop(); This is where it happens via destructor
-
-    assert(!orphanTask->_done);
+    if (orphanTask->_done) {
+      std::cerr << "orphan task is done, but it should not have executed" << std::endl;
+      return false;
+    }
 
     return true;
   }
diff --git a/lib/d/test/thrift_test_server.d b/lib/d/test/thrift_test_server.d
index 71ab917..b582253 100644
--- a/lib/d/test/thrift_test_server.d
+++ b/lib/d/test/thrift_test_server.d
@@ -16,8 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 module thrift_test_server;
 
+import core.stdc.errno : errno;
+import core.stdc.signal : signal, sigfn_t, SIGINT, SIG_DFL, SIG_ERR;
 import core.thread : dur, Thread;
 import std.algorithm;
 import std.exception : enforce;
@@ -40,6 +43,7 @@
 import thrift.transport.framed;
 import thrift.transport.http;
 import thrift.transport.ssl;
+import thrift.util.cancellation;
 import thrift.util.hashset;
 import test_utils;
 
@@ -205,14 +209,44 @@
   bool trace_;
 }
 
+shared(bool) gShutdown = false;
+
+nothrow @nogc extern(C) void handleSignal(int sig) {
+  gShutdown = true;
+}
+
+// Runs a thread that waits for shutdown to be
+// signaled and then triggers cancellation,
+// causing the server to stop.  While we could
+// use a signalfd for this purpose, we are instead
+// opting for a busy waiting scheme for maximum
+// portability since signalfd is a linux thing.
+
+class ShutdownThread : Thread {
+  this(TCancellationOrigin cancellation) {
+    cancellation_ = cancellation;
+    super(&run);
+  }
+  
+private:
+  void run() {
+    while (!gShutdown) {
+      Thread.sleep(dur!("msecs")(25));
+    }
+    cancellation_.trigger();
+  }
+  
+  TCancellationOrigin cancellation_;
+}
+
 void main(string[] args) {
   ushort port = 9090;
   ServerType serverType;
   ProtocolType protocolType;
   size_t numIOThreads = 1;
   TransportType transportType;
-  bool ssl;
-  bool trace;
+  bool ssl = false;
+  bool trace = true;
   size_t taskPoolSize = totalCPUs;
 
   getopt(args, "port", &port, "protocol", &protocolType, "server-type",
@@ -279,8 +313,26 @@
   auto server = createServer(serverType, numIOThreads, taskPoolSize,
     processor, serverSocket, transportFactory, protocolFactory);
 
+  // Set up SIGINT signal handling
+  sigfn_t oldHandler = signal(SIGINT, &handleSignal);
+  enforce(oldHandler != SIG_ERR,
+    "Could not replace the SIGINT signal handler: errno {0}".format(errno()));
+  
+  // Set up a server cancellation trigger
+  auto cancel = new TCancellationOrigin();
+
+  // Set up a listener for the shutdown condition - this will
+  // wake up when the signal occurs and trigger cancellation.
+  auto shutdown = new ShutdownThread(cancel);
+  shutdown.start();
+  
+  // Serve from this thread; the signal will stop the server
+  // and control will return here
   writefln("Starting %s/%s %s ThriftTest server %son port %s...", protocolType,
     transportType, serverType, ssl ? "(using SSL) ": "", port);
-  server.serve();
+  server.serve(cancel);
+  shutdown.join();
+  signal(SIGINT, SIG_DFL);
+    
   writeln("done.");
 }
diff --git a/lib/perl/lib/Thrift/Server.pm b/lib/perl/lib/Thrift/Server.pm
index fc9ca30..f265d45 100644
--- a/lib/perl/lib/Thrift/Server.pm
+++ b/lib/perl/lib/Thrift/Server.pm
@@ -150,27 +150,31 @@
 sub serve
 {
     my $self = shift;
-
+    my $stop = 0;
+    
     $self->{serverTransport}->listen();
-    while (1)
-    {
+    while (!$stop) {
         my $client = $self->{serverTransport}->accept();
-        my $itrans = $self->{inputTransportFactory}->getTransport($client);
-        my $otrans = $self->{outputTransportFactory}->getTransport($client);
-        my $iprot  = $self->{inputProtocolFactory}->getProtocol($itrans);
-        my $oprot  = $self->{outputProtocolFactory}->getProtocol($otrans);
-        eval {
-            $self->_clientBegin($iprot, $oprot);
-            while (1)
-            {
-                $self->{processor}->process($iprot, $oprot);
+        if (defined $client) {
+            my $itrans = $self->{inputTransportFactory}->getTransport($client);
+            my $otrans = $self->{outputTransportFactory}->getTransport($client);
+            my $iprot  = $self->{inputProtocolFactory}->getProtocol($itrans);
+            my $oprot  = $self->{outputProtocolFactory}->getProtocol($otrans);
+            eval {
+                $self->_clientBegin($iprot, $oprot);
+                while (1)
+                {
+                    $self->{processor}->process($iprot, $oprot);
+                }
+            }; if($@) {
+                $self->_handleException($@);
             }
-        }; if($@) {
-            $self->_handleException($@);
-        }
 
-        $itrans->close();
-        $otrans->close();
+            $itrans->close();
+            $otrans->close();
+        } else {
+            $stop = 1;
+        }
     }
 }
 
diff --git a/lib/perl/lib/Thrift/ServerSocket.pm b/lib/perl/lib/Thrift/ServerSocket.pm
index 51f83b4..2c4d906 100644
--- a/lib/perl/lib/Thrift/ServerSocket.pm
+++ b/lib/perl/lib/Thrift/ServerSocket.pm
@@ -81,15 +81,24 @@
 {
     my $self = shift;
 
-    if ( exists $self->{handle} and defined $self->{handle} )
-    {
+    if ( exists $self->{handle} and defined $self->{handle} ) {
         my $client        = $self->{handle}->accept();
         my $result        = $self->__client();
         $result->{handle} = new IO::Select($client);
         return $result;
     }
 
-    return 0;
+    return undef;
+}
+
+sub close
+{
+	my $self = shift;
+	
+    if ( exists $self->{handle} and defined $self->{handle} )
+    {
+        $self->{handle}->close();
+	}
 }
 
 ###
diff --git a/test/cpp/src/TestServer.cpp b/test/cpp/src/TestServer.cpp
index b93e5ea..1c38124 100644
--- a/test/cpp/src/TestServer.cpp
+++ b/test/cpp/src/TestServer.cpp
@@ -49,6 +49,9 @@
 #ifdef HAVE_INTTYPES_H
 #include <inttypes.h>
 #endif
+#ifdef HAVE_SIGNAL_H
+#include <signal.h>
+#endif
 
 #include <iostream>
 #include <stdexcept>
@@ -59,7 +62,6 @@
 #include <boost/filesystem.hpp>
 #include <thrift/stdcxx.h>
 
-#include <signal.h>
 #if _WIN32
 #include <thrift/windows/TWinsockSingleton.h>
 #endif
@@ -75,6 +77,17 @@
 
 using namespace thrift::test;
 
+// to handle a controlled shutdown, signal handling is mandatory
+#ifdef HAVE_SIGNAL_H
+apache::thrift::concurrency::Monitor gMonitor;
+void signal_handler(int signum)
+{
+  if (signum == SIGINT) {
+    gMonitor.notifyAll();
+  }
+}
+#endif
+
 class TestHandler : public ThriftTestIf {
 public:
   TestHandler() {}
@@ -635,6 +648,12 @@
     ssl = true;
   }
 
+#if defined(HAVE_SIGNAL_H) && defined(SIGPIPE)
+  if (ssl) {
+    signal(SIGPIPE, SIG_IGN); // for OpenSSL, otherwise we end abruptly
+  }
+#endif
+
   if (vm.count("abstract-namespace")) {
     abstract_namespace = true;
   }
@@ -770,14 +789,14 @@
       TEvhttpServer nonblockingServer(testBufferProcessor, port);
       nonblockingServer.serve();
     } else if (transport_type == "framed") {
-	  stdcxx::shared_ptr<transport::TNonblockingServerTransport> nbSocket;
-	  nbSocket.reset(
-		ssl ? new transport::TNonblockingSSLServerSocket(port, sslSocketFactory)
-		    : new transport::TNonblockingServerSocket(port));
+      stdcxx::shared_ptr<transport::TNonblockingServerTransport> nbSocket;
+      nbSocket.reset(
+          ssl ? new transport::TNonblockingSSLServerSocket(port, sslSocketFactory)
+              : new transport::TNonblockingServerSocket(port));
       server.reset(new TNonblockingServer(testProcessor, protocolFactory, nbSocket));
     } else {
-	  cerr << "server-type nonblocking requires transport of http or framed" << endl;
-	  exit(1);
+      cerr << "server-type nonblocking requires transport of http or framed" << endl;
+      exit(1);
     }
   }
 
@@ -787,18 +806,23 @@
       // if using header
       server->setOutputProtocolFactory(stdcxx::shared_ptr<TProtocolFactory>());
     }
+    
     apache::thrift::concurrency::PlatformThreadFactory factory;
     factory.setDetached(false);
     stdcxx::shared_ptr<apache::thrift::concurrency::Runnable> serverThreadRunner(server);
     stdcxx::shared_ptr<apache::thrift::concurrency::Thread> thread
         = factory.newThread(serverThreadRunner);
-    thread->start();
 
-	// THRIFT-4515: this needs to be improved
-    while (1) {
-		  THRIFT_SLEEP_SEC(1);	// do something other than chew up CPU like crazy
-    }
-	// NOTREACHED
+#ifdef HAVE_SIGNAL_H
+    signal(SIGINT, signal_handler);
+#endif
+
+    thread->start();
+    gMonitor.waitForever();         // wait for a shutdown signal
+    
+#ifdef HAVE_SIGNAL_H
+    signal(SIGINT, SIG_DFL);
+#endif
 
     server->stop();
     thread->join();
@@ -808,3 +832,4 @@
   cout << "done." << endl;
   return 0;
 }
+
diff --git a/test/crossrunner/collect.py b/test/crossrunner/collect.py
index 03b0c36..e2d8978 100644
--- a/test/crossrunner/collect.py
+++ b/test/crossrunner/collect.py
@@ -51,6 +51,7 @@
 ]
 
 DEFAULT_MAX_DELAY = 5
+DEFAULT_SIGNAL = 1
 DEFAULT_TIMEOUT = 5
 
 
@@ -112,7 +113,7 @@
                     yield name, impl1, impl2
 
     def maybe_max(key, o1, o2, default):
-        """maximum of two if present, otherwise defult value"""
+        """maximum of two if present, otherwise default value"""
         v1 = o1.get(key)
         v2 = o2.get(key)
         return max(v1, v2) if v1 and v2 else v1 or v2 or default
@@ -138,6 +139,7 @@
                         'server': merge_metadata(sv, **{'protocol': proto1, 'transport': trans1}),
                         'client': merge_metadata(cl, **{'protocol': proto2, 'transport': trans2}),
                         'delay': maybe_max('delay', sv, cl, DEFAULT_MAX_DELAY),
+                        'stop_signal': maybe_max('stop_signal', sv, cl, DEFAULT_SIGNAL),
                         'timeout': maybe_max('timeout', sv, cl, DEFAULT_TIMEOUT),
                         'protocol': proto,
                         'transport': trans,
diff --git a/test/crossrunner/report.py b/test/crossrunner/report.py
index 76324ed..75f36db 100644
--- a/test/crossrunner/report.py
+++ b/test/crossrunner/report.py
@@ -157,8 +157,10 @@
         ])),
         'client': list(map(re.compile, [
             '[Cc]onnection refused',
-            'Could not connect to localhost',
+            'Could not connect to',
             'ECONNREFUSED',
+            'econnrefused',               # erl
+            'CONNECTION-REFUSED-ERROR',   # cl
             'No such file or directory',  # domain socket
         ])),
     }
@@ -174,6 +176,7 @@
             def match(line):
                 for expr in exprs:
                     if expr.search(line):
+                        self._log.info("maybe false positive: %s" % line)
                         return True
 
             with logfile_open(self.logpath, 'r') as fp:
@@ -204,7 +207,7 @@
     def _print_footer(self, returncode=None):
         self._print_bar()
         if returncode is not None:
-            print('Return code: %d' % returncode, file=self.out)
+            print('Return code: %d (negative values indicate kill by signal)' % returncode, file=self.out)
         else:
             print('Process is killed.', file=self.out)
         self._print_exec_time()
@@ -261,7 +264,8 @@
         if not with_result:
             return '{:24s}{:18s}{:25s}'.format(name[:23], test.protocol[:17], trans[:24])
         else:
-            return '{:24s}{:18s}{:25s}{:s}\n'.format(name[:23], test.protocol[:17], trans[:24], self._result_string(test))
+            return '{:24s}{:18s}{:25s}{:s}\n'.format(name[:23], test.protocol[:17],
+                                                     trans[:24], self._result_string(test))
 
     def _print_test_header(self):
         self._print_bar()
diff --git a/test/crossrunner/run.py b/test/crossrunner/run.py
index f522bb1..25c58ce 100644
--- a/test/crossrunner/run.py
+++ b/test/crossrunner/run.py
@@ -23,19 +23,20 @@
 import os
 import platform
 import random
-import signal
 import socket
 import subprocess
 import sys
-import threading
 import time
 
 from .compat import str_join
-from .test import TestEntry, domain_socket_path
 from .report import ExecReporter, SummaryReporter
+from .test import TestEntry
+from .util import domain_socket_path
 
-RESULT_TIMEOUT = 128
 RESULT_ERROR = 64
+RESULT_TIMEOUT = 128
+SIGNONE = 0
+SIGKILL = 15
 
 # globals
 ports = None
@@ -43,35 +44,18 @@
 
 
 class ExecutionContext(object):
-    def __init__(self, cmd, cwd, env, report):
+    def __init__(self, cmd, cwd, env, stop_signal, is_server, report):
         self._log = multiprocessing.get_logger()
-        self.report = report
         self.cmd = cmd
         self.cwd = cwd
         self.env = env
-        self.timer = None
+        self.stop_signal = stop_signal
+        self.is_server = is_server
+        self.report = report
         self.expired = False
         self.killed = False
         self.proc = None
 
-    def _expire(self):
-        self._log.info('Timeout')
-        self.expired = True
-        self.kill()
-
-    def kill(self):
-        self._log.debug('Killing process : %d' % self.proc.pid)
-        self.killed = True
-        if platform.system() != 'Windows':
-            try:
-                os.killpg(self.proc.pid, signal.SIGKILL)
-            except Exception:
-                self._log.info('Failed to kill process group', exc_info=sys.exc_info())
-        try:
-            self.proc.kill()
-        except Exception:
-            self._log.info('Failed to kill process', exc_info=sys.exc_info())
-
     def _popen_args(self):
         args = {
             'cwd': self.cwd,
@@ -87,75 +71,125 @@
             args.update(preexec_fn=os.setsid)
         return args
 
-    def start(self, timeout=0):
+    def start(self):
         joined = str_join(' ', self.cmd)
         self._log.debug('COMMAND: %s', joined)
         self._log.debug('WORKDIR: %s', self.cwd)
         self._log.debug('LOGFILE: %s', self.report.logpath)
         self.report.begin()
         self.proc = subprocess.Popen(self.cmd, **self._popen_args())
-        if timeout > 0:
-            self.timer = threading.Timer(timeout, self._expire)
-            self.timer.start()
+        self._log.debug('    PID: %d', self.proc.pid)
+        self._log.debug('   PGID: %d', os.getpgid(self.proc.pid))
         return self._scoped()
 
     @contextlib.contextmanager
     def _scoped(self):
         yield self
-        self._log.debug('Killing scoped process')
-        if self.proc.poll() is None:
-            self.kill()
-            self.report.killed()
+        if self.is_server:
+            # the server is supposed to run until we stop it
+            if self.returncode is not None:
+                self.report.died()
+            else:
+                if self.stop_signal != SIGNONE:
+                    if self.sigwait(self.stop_signal):
+                        self.report.end(self.returncode)
+                    else:
+                        self.report.killed()
+                else:
+                    self.sigwait(SIGKILL)
         else:
-            self._log.debug('Process died unexpectedly')
-            self.report.died()
+            # the client is supposed to exit normally
+            if self.returncode is not None:
+                self.report.end(self.returncode)
+            else:
+                self.sigwait(SIGKILL)
+                self.report.killed()
+        self._log.debug('[{0}] exited with return code {1}'.format(self.proc.pid, self.returncode))
 
-    def wait(self):
-        self.proc.communicate()
-        if self.timer:
-            self.timer.cancel()
-        self.report.end(self.returncode)
+    # Send a signal to the process and then wait for it to end
+    # If the signal requested is SIGNONE, no signal is sent, and
+    # instead we just wait for the process to end; further if it
+    # does not end normally with SIGNONE, we mark it as expired.
+    # If the process fails to end and the signal is not SIGKILL,
+    # it re-runs with SIGKILL so that a real process kill occurs
+    # returns True if the process ended, False if it may not have
+    def sigwait(self, sig=SIGKILL, timeout=2):
+        try:
+            if sig != SIGNONE:
+                self._log.debug('[{0}] send signal {1}'.format(self.proc.pid, sig))
+                if sig == SIGKILL:
+                    self.killed = True
+                try:
+                    if platform.system() != 'Windows':
+                        os.killpg(os.getpgid(self.proc.pid), sig)
+                    else:
+                        self.proc.send_signal(sig)
+                except Exception:
+                    self._log.info('[{0}] Failed to kill process'.format(self.proc.pid), exc_info=sys.exc_info())
+            self._log.debug('[{0}] wait begin, timeout {1} sec(s)'.format(self.proc.pid, timeout))
+            self.proc.communicate(timeout=timeout)
+            self._log.debug('[{0}] process ended with return code {1}'.format(self.proc.pid, self.returncode))
+            self.report.end(self.returncode)
+            return True
+        except subprocess.TimeoutExpired:
+            self._log.info('[{0}] timeout waiting for process to end'.format(self.proc.pid))
+            if sig == SIGNONE:
+                self.expired = True
+            return False if sig == SIGKILL else self.sigwait(SIGKILL, 1)
+
+    # called on the client process to wait for it to end naturally
+    def wait(self, timeout):
+        self.sigwait(SIGNONE, timeout)
 
     @property
     def returncode(self):
         return self.proc.returncode if self.proc else None
 
 
-def exec_context(port, logdir, test, prog):
+def exec_context(port, logdir, test, prog, is_server):
     report = ExecReporter(logdir, test, prog)
     prog.build_command(port)
-    return ExecutionContext(prog.command, prog.workdir, prog.env, report)
+    return ExecutionContext(prog.command, prog.workdir, prog.env, prog.stop_signal, is_server, report)
 
 
 def run_test(testdir, logdir, test_dict, max_retry, async=True):
     logger = multiprocessing.get_logger()
 
-    def ensure_socket_open(proc, port, max_delay):
-        sleeped = 0.1
-        time.sleep(sleeped)
-        sleep_step = 0.2
+    def ensure_socket_open(sv, port, test):
+        slept = 0.1
+        time.sleep(slept)
+        sleep_step = 0.1
         while True:
-            # Create sockets every iteration because refused sockets cannot be
-            # reused on some systems.
-            sock4 = socket.socket()
-            sock6 = socket.socket(family=socket.AF_INET6)
-            try:
-                if sock4.connect_ex(('127.0.0.1', port)) == 0 \
-                        or sock6.connect_ex(('::1', port)) == 0:
-                    return True
-                if proc.poll() is not None:
-                    logger.warn('server process is exited')
-                    return False
-                if sleeped > max_delay:
-                    logger.warn('sleeped for %f seconds but server port is not open' % sleeped)
-                    return False
-                time.sleep(sleep_step)
-                sleeped += sleep_step
-            finally:
-                sock4.close()
-                sock6.close()
-        logger.debug('waited %f sec for server port open' % sleeped)
-        return True
+            if slept > test.delay:
+                logger.warn('[{0}] slept for {1} seconds but server is not open'.format(sv.proc.pid, slept))
+                return False
+            if test.socket == 'domain':
+                if not os.path.exists(domain_socket_path(port)):
+                    logger.debug('[{0}] domain(unix) socket not available yet. slept for {1} seconds so far'.format(sv.proc.pid, slept))
+                    time.sleep(sleep_step)
+                    slept += sleep_step
+            elif test.socket == 'abstract':
+                return True
+            else:
+                # Create sockets every iteration because refused sockets cannot be
+                # reused on some systems.
+                sock4 = socket.socket()
+                sock6 = socket.socket(family=socket.AF_INET6)
+                try:
+                    if sock4.connect_ex(('127.0.0.1', port)) == 0 \
+                            or sock6.connect_ex(('::1', port)) == 0:
+                        return True
+                    if sv.proc.poll() is not None:
+                        logger.warn('[{0}] server process is exited'.format(sv.proc.pid))
+                        return False
+                    logger.debug('[{0}] socket not available yet. slept for {1} seconds so far'.format(sv.proc.pid, slept))
+                    time.sleep(sleep_step)
+                    slept += sleep_step
+                finally:
+                    sock4.close()
+                    sock6.close()
+            logger.debug('[{0}] server ready - waited for {1} seconds'.format(sv.proc.pid, slept))
+            return True
 
     try:
         max_bind_retry = 3
@@ -169,31 +203,27 @@
             logger.debug('Start')
             with PortAllocator.alloc_port_scoped(ports, test.socket) as port:
                 logger.debug('Start with port %d' % port)
-                sv = exec_context(port, logdir, test, test.server)
-                cl = exec_context(port, logdir, test, test.client)
+                sv = exec_context(port, logdir, test, test.server, True)
+                cl = exec_context(port, logdir, test, test.client, False)
 
                 logger.debug('Starting server')
                 with sv.start():
-                    if test.socket in ('domain', 'abstract'):
-                        time.sleep(0.1)
-                        port_ok = True
-                    else:
-                        port_ok = ensure_socket_open(sv.proc, port, test.delay)
+                    port_ok = ensure_socket_open(sv, port, test)
                     if port_ok:
                         connect_retry_count = 0
-                        max_connect_retry = 3
-                        connect_retry_wait = 0.5
+                        max_connect_retry = 12
+                        connect_retry_wait = 0.25
                         while True:
                             if sv.proc.poll() is not None:
                                 logger.info('not starting client because server process is absent')
                                 break
                             logger.debug('Starting client')
-                            cl.start(test.timeout)
-                            logger.debug('Waiting client')
-                            cl.wait()
+                            cl.start()
+                            logger.debug('Waiting client (up to %d secs)' % test.timeout)
+                            cl.wait(test.timeout)
                             if not cl.report.maybe_false_positive() or connect_retry_count >= max_connect_retry:
                                 if connect_retry_count > 0 and connect_retry_count < max_connect_retry:
-                                    logger.warn('[%s]: Connected after %d retry (%.2f sec each)' % (test.server.name, connect_retry_count, connect_retry_wait))
+                                    logger.info('[%s]: Connected after %d retry (%.2f sec each)' % (test.server.name, connect_retry_count, connect_retry_wait))
                                 # Wait for 50ms to see if server does not die at the end.
                                 time.sleep(0.05)
                                 break
@@ -205,12 +235,18 @@
                 logger.warn('[%s]: Detected socket bind failure, retrying...', test.server.name)
                 bind_retry_count += 1
             else:
-                if cl.expired:
-                    result = RESULT_TIMEOUT
+                result = RESULT_TIMEOUT if cl.expired else cl.returncode if cl.proc.poll() is not None else RESULT_ERROR
+
+                # For servers that handle a controlled shutdown by signal
+                # if they are killed, or return an error code, that is a
+                # problem.  For servers that are not signal-aware, we simply
+                # kill them off; if we didn't kill them off, something else
+                # happened (crashed?)
+                if test.server.stop_signal != 0:
+                    if sv.killed or sv.returncode > 0:
+                        result |= RESULT_ERROR
                 else:
-                    result = cl.proc.returncode if cl.proc else RESULT_ERROR
                     if not sv.killed:
-                        # Server died without being killed.
                         result |= RESULT_ERROR
 
                 if result == 0 or retry_count >= max_retry:
diff --git a/test/crossrunner/test.py b/test/crossrunner/test.py
index 74fd916..633e926 100644
--- a/test/crossrunner/test.py
+++ b/test/crossrunner/test.py
@@ -22,22 +22,20 @@
 import os
 import sys
 from .compat import path_join
-from .util import merge_dict
-
-
-def domain_socket_path(port):
-    return '/tmp/ThriftTest.thrift.%d' % port
+from .util import merge_dict, domain_socket_path
 
 
 class TestProgram(object):
-    def __init__(self, kind, name, protocol, transport, socket, workdir, command, env=None,
+    def __init__(self, kind, name, protocol, transport, socket, workdir, stop_signal, command, env=None,
                  extra_args=[], extra_args2=[], join_args=False, **kwargs):
+
         self.kind = kind
         self.name = name
         self.protocol = protocol
         self.transport = transport
         self.socket = socket
         self.workdir = workdir
+        self.stop_signal = stop_signal
         self.command = None
         self._base_command = self._fix_cmd_path(command)
         if env:
diff --git a/test/crossrunner/util.py b/test/crossrunner/util.py
index e2d195a..c214df8 100644
--- a/test/crossrunner/util.py
+++ b/test/crossrunner/util.py
@@ -20,6 +20,10 @@
 import copy
 
 
+def domain_socket_path(port):
+    return '/tmp/ThriftTest.thrift.%d' % port
+
+
 def merge_dict(base, update):
     """Update dict concatenating list values"""
     res = copy.deepcopy(base)
diff --git a/test/perl/TestServer.pl b/test/perl/TestServer.pl
index 7d8f929..e8c1cfa 100644
--- a/test/perl/TestServer.pl
+++ b/test/perl/TestServer.pl
@@ -26,6 +26,8 @@
 use Getopt::Long qw(GetOptions);
 use Time::HiRes qw(gettimeofday);
 
+$SIG{INT} = \&sigint_handler;
+ 
 use lib '../../lib/perl/lib';
 use lib 'gen-perl';
 
@@ -146,6 +148,12 @@
 my $server = new Thrift::SimpleServer($processor, $serversocket, $transport, $protocol);
 print "Starting \"simple\" server ($opts{transport}/$opts{protocol}) listen on: $listening_on\n";
 $server->serve();
+print "done.\n";
+
+sub sigint_handler {
+  print "received SIGINT, stopping...\n";
+  $server->stop();
+}
 
 ###
 ### Test server implementation
diff --git a/test/test.py b/test/test.py
index 5a015ea..24e7c4e 100755
--- a/test/test.py
+++ b/test/test.py
@@ -1,4 +1,4 @@
-#!/usr/bin/env python
+#!/usr/bin/env python3
 #
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements. See the NOTICE file
@@ -18,12 +18,13 @@
 # under the License.
 #
 
-# Apache Thrift - integration test suite
+#
+# Apache Thrift - integration (cross) test suite
 #
 # tests different server-client, protocol and transport combinations
 #
-# This script supports python 2.7 and later.
-# python 3.x is recommended for better stability.
+# This script requires python 3.x due to the improvements in
+# subprocess management that are needed for reliability.
 #
 
 from __future__ import print_function
@@ -38,6 +39,12 @@
 import crossrunner
 from crossrunner.compat import path_join
 
+# 3.3 introduced subprocess timeouts on waiting for child
+req_version = (3, 3)
+cur_version = sys.version_info
+assert (cur_version >= req_version), "Python 3.3 or later is required for proper operation."
+
+
 ROOT_DIR = os.path.dirname(os.path.realpath(os.path.dirname(__file__)))
 TEST_DIR_RELATIVE = 'test'
 TEST_DIR = path_join(ROOT_DIR, TEST_DIR_RELATIVE)
@@ -161,9 +168,11 @@
             options.update_failures, options.print_failures)
     elif options.features is not None:
         features = options.features or ['.*']
-        res = run_feature_tests(server_match, features, options.jobs, options.skip_known_failures, options.retry_count, options.regex)
+        res = run_feature_tests(server_match, features, options.jobs,
+                                options.skip_known_failures, options.retry_count, options.regex)
     else:
-        res = run_cross_tests(server_match, client_match, options.jobs, options.skip_known_failures, options.retry_count, options.regex)
+        res = run_cross_tests(server_match, client_match, options.jobs,
+                              options.skip_known_failures, options.retry_count, options.regex)
     return 0 if res else 1
 
 
diff --git a/test/tests.json b/test/tests.json
index 671c667..9c7668d 100644
--- a/test/tests.json
+++ b/test/tests.json
@@ -63,7 +63,8 @@
     "name": "d",
     "server": {
       "command": [
-        "thrift_test_server"
+        "thrift_test_server",
+	"--trace"
       ]
     },
     "client": {
@@ -438,12 +439,12 @@
       "compact",
       "json"
     ],
-        "server": {
+    "server": {
       "command": [
         "dotnet",
         "run",
-  "--no-build",
-                "--project=Server/Server.csproj",
+        "--no-build",
+        "--project=Server/Server.csproj",
         "server"
       ]
     },
@@ -452,8 +453,8 @@
       "command": [
         "dotnet",
         "run",
-  "--no-build",
-                "--project=Client/Client.csproj",
+        "--no-build",
+        "--project=Client/Client.csproj",
         "client"
       ]
     },