THRIFT-4515: cross server test improvement: graceful test server shutdown
This closes #1509
diff --git a/test/cpp/src/TestServer.cpp b/test/cpp/src/TestServer.cpp
index b93e5ea..1c38124 100644
--- a/test/cpp/src/TestServer.cpp
+++ b/test/cpp/src/TestServer.cpp
@@ -49,6 +49,9 @@
 #ifdef HAVE_INTTYPES_H
 #include <inttypes.h>
 #endif
+#ifdef HAVE_SIGNAL_H
+#include <signal.h>
+#endif
 
 #include <iostream>
 #include <stdexcept>
@@ -59,7 +62,6 @@
 #include <boost/filesystem.hpp>
 #include <thrift/stdcxx.h>
 
-#include <signal.h>
 #if _WIN32
 #include <thrift/windows/TWinsockSingleton.h>
 #endif
@@ -75,6 +77,17 @@
 
 using namespace thrift::test;
 
+// gMonitor is defined unconditionally: main() waits on it even when
+// HAVE_SIGNAL_H is absent; the SIGINT handler below wakes that wait.
+apache::thrift::concurrency::Monitor gMonitor;
+#ifdef HAVE_SIGNAL_H
+void signal_handler(int signum) {
+  if (signum == SIGINT) {
+    gMonitor.notifyAll(); // NOTE(review): confirm this is safe to call from a signal handler
+  }
+}
+#endif
+
 class TestHandler : public ThriftTestIf {
 public:
   TestHandler() {}
@@ -635,6 +648,12 @@
     ssl = true;
   }
 
+#if defined(HAVE_SIGNAL_H) && defined(SIGPIPE)
+  if (ssl) {
+    signal(SIGPIPE, SIG_IGN); // for OpenSSL, otherwise we end abruptly
+  }
+#endif
+
   if (vm.count("abstract-namespace")) {
     abstract_namespace = true;
   }
@@ -770,14 +789,14 @@
       TEvhttpServer nonblockingServer(testBufferProcessor, port);
       nonblockingServer.serve();
     } else if (transport_type == "framed") {
-	  stdcxx::shared_ptr<transport::TNonblockingServerTransport> nbSocket;
-	  nbSocket.reset(
-		ssl ? new transport::TNonblockingSSLServerSocket(port, sslSocketFactory)
-		    : new transport::TNonblockingServerSocket(port));
+      stdcxx::shared_ptr<transport::TNonblockingServerTransport> nbSocket;
+      nbSocket.reset(
+          ssl ? new transport::TNonblockingSSLServerSocket(port, sslSocketFactory)
+              : new transport::TNonblockingServerSocket(port));
       server.reset(new TNonblockingServer(testProcessor, protocolFactory, nbSocket));
     } else {
-	  cerr << "server-type nonblocking requires transport of http or framed" << endl;
-	  exit(1);
+      cerr << "server-type nonblocking requires transport of http or framed" << endl;
+      exit(1);
     }
   }
 
@@ -787,18 +806,23 @@
       // if using header
       server->setOutputProtocolFactory(stdcxx::shared_ptr<TProtocolFactory>());
     }
+    
     apache::thrift::concurrency::PlatformThreadFactory factory;
     factory.setDetached(false);
     stdcxx::shared_ptr<apache::thrift::concurrency::Runnable> serverThreadRunner(server);
     stdcxx::shared_ptr<apache::thrift::concurrency::Thread> thread
         = factory.newThread(serverThreadRunner);
-    thread->start();
 
-	// THRIFT-4515: this needs to be improved
-    while (1) {
-		  THRIFT_SLEEP_SEC(1);	// do something other than chew up CPU like crazy
-    }
-	// NOTREACHED
+#ifdef HAVE_SIGNAL_H
+    signal(SIGINT, signal_handler);
+#endif
+
+    thread->start();
+    gMonitor.waitForever();         // wait for a shutdown signal
+    
+#ifdef HAVE_SIGNAL_H
+    signal(SIGINT, SIG_DFL);
+#endif
 
     server->stop();
     thread->join();
@@ -808,3 +832,4 @@
   cout << "done." << endl;
   return 0;
 }
+
diff --git a/test/crossrunner/collect.py b/test/crossrunner/collect.py
index 03b0c36..e2d8978 100644
--- a/test/crossrunner/collect.py
+++ b/test/crossrunner/collect.py
@@ -51,6 +51,7 @@
 ]
 
 DEFAULT_MAX_DELAY = 5
+DEFAULT_SIGNAL = 1
 DEFAULT_TIMEOUT = 5
 
 
@@ -112,7 +113,7 @@
                     yield name, impl1, impl2
 
     def maybe_max(key, o1, o2, default):
-        """maximum of two if present, otherwise defult value"""
+        """maximum of two if present, otherwise default value"""
         v1 = o1.get(key)
         v2 = o2.get(key)
         return max(v1, v2) if v1 and v2 else v1 or v2 or default
@@ -138,6 +139,7 @@
                         'server': merge_metadata(sv, **{'protocol': proto1, 'transport': trans1}),
                         'client': merge_metadata(cl, **{'protocol': proto2, 'transport': trans2}),
                         'delay': maybe_max('delay', sv, cl, DEFAULT_MAX_DELAY),
+                        'stop_signal': maybe_max('stop_signal', sv, cl, DEFAULT_SIGNAL),
                         'timeout': maybe_max('timeout', sv, cl, DEFAULT_TIMEOUT),
                         'protocol': proto,
                         'transport': trans,
diff --git a/test/crossrunner/report.py b/test/crossrunner/report.py
index 76324ed..75f36db 100644
--- a/test/crossrunner/report.py
+++ b/test/crossrunner/report.py
@@ -157,8 +157,10 @@
         ])),
         'client': list(map(re.compile, [
             '[Cc]onnection refused',
-            'Could not connect to localhost',
+            'Could not connect to',
             'ECONNREFUSED',
+            'econnrefused',               # erl
+            'CONNECTION-REFUSED-ERROR',   # cl
             'No such file or directory',  # domain socket
         ])),
     }
@@ -174,6 +176,7 @@
             def match(line):
                 for expr in exprs:
                     if expr.search(line):
+                        self._log.info("maybe false positive: %s" % line)
                         return True
 
             with logfile_open(self.logpath, 'r') as fp:
@@ -204,7 +207,7 @@
     def _print_footer(self, returncode=None):
         self._print_bar()
         if returncode is not None:
-            print('Return code: %d' % returncode, file=self.out)
+            print('Return code: %d (negative values indicate kill by signal)' % returncode, file=self.out)
         else:
             print('Process is killed.', file=self.out)
         self._print_exec_time()
@@ -261,7 +264,8 @@
         if not with_result:
             return '{:24s}{:18s}{:25s}'.format(name[:23], test.protocol[:17], trans[:24])
         else:
-            return '{:24s}{:18s}{:25s}{:s}\n'.format(name[:23], test.protocol[:17], trans[:24], self._result_string(test))
+            return '{:24s}{:18s}{:25s}{:s}\n'.format(name[:23], test.protocol[:17],
+                                                     trans[:24], self._result_string(test))
 
     def _print_test_header(self):
         self._print_bar()
diff --git a/test/crossrunner/run.py b/test/crossrunner/run.py
index f522bb1..25c58ce 100644
--- a/test/crossrunner/run.py
+++ b/test/crossrunner/run.py
@@ -23,19 +23,20 @@
 import os
 import platform
 import random
-import signal
 import socket
 import subprocess
 import sys
-import threading
 import time
 
 from .compat import str_join
-from .test import TestEntry, domain_socket_path
 from .report import ExecReporter, SummaryReporter
+from .test import TestEntry
+from .util import domain_socket_path
 
-RESULT_TIMEOUT = 128
 RESULT_ERROR = 64
+RESULT_TIMEOUT = 128
+SIGNONE = 0  # sentinel for sigwait(): send no signal, only wait
+SIGKILL = 15  # NOTE(review): 15 is SIGTERM on POSIX; SIGKILL is 9 - confirm
 
 # globals
 ports = None
@@ -43,35 +44,18 @@
 
 
 class ExecutionContext(object):
-    def __init__(self, cmd, cwd, env, report):
+    def __init__(self, cmd, cwd, env, stop_signal, is_server, report):
         self._log = multiprocessing.get_logger()
-        self.report = report
         self.cmd = cmd
         self.cwd = cwd
         self.env = env
-        self.timer = None
+        self.stop_signal = stop_signal
+        self.is_server = is_server
+        self.report = report
         self.expired = False
         self.killed = False
         self.proc = None
 
-    def _expire(self):
-        self._log.info('Timeout')
-        self.expired = True
-        self.kill()
-
-    def kill(self):
-        self._log.debug('Killing process : %d' % self.proc.pid)
-        self.killed = True
-        if platform.system() != 'Windows':
-            try:
-                os.killpg(self.proc.pid, signal.SIGKILL)
-            except Exception:
-                self._log.info('Failed to kill process group', exc_info=sys.exc_info())
-        try:
-            self.proc.kill()
-        except Exception:
-            self._log.info('Failed to kill process', exc_info=sys.exc_info())
-
     def _popen_args(self):
         args = {
             'cwd': self.cwd,
@@ -87,75 +71,125 @@
             args.update(preexec_fn=os.setsid)
         return args
 
-    def start(self, timeout=0):
+    def start(self):
         joined = str_join(' ', self.cmd)
         self._log.debug('COMMAND: %s', joined)
         self._log.debug('WORKDIR: %s', self.cwd)
         self._log.debug('LOGFILE: %s', self.report.logpath)
         self.report.begin()
         self.proc = subprocess.Popen(self.cmd, **self._popen_args())
-        if timeout > 0:
-            self.timer = threading.Timer(timeout, self._expire)
-            self.timer.start()
+        self._log.debug('    PID: %d', self.proc.pid)
+        self._log.debug('   PGID: %d', os.getpgid(self.proc.pid))
         return self._scoped()
 
     @contextlib.contextmanager
     def _scoped(self):
         yield self
-        self._log.debug('Killing scoped process')
-        if self.proc.poll() is None:
-            self.kill()
-            self.report.killed()
+        if self.is_server:
+            # the server is supposed to run until we stop it
+            if self.returncode is not None:
+                self.report.died()
+            else:
+                if self.stop_signal != SIGNONE:
+                    if self.sigwait(self.stop_signal):
+                        self.report.end(self.returncode)
+                    else:
+                        self.report.killed()
+                else:
+                    self.sigwait(SIGKILL)
         else:
-            self._log.debug('Process died unexpectedly')
-            self.report.died()
+            # the client is supposed to exit normally
+            if self.returncode is not None:
+                self.report.end(self.returncode)
+            else:
+                self.sigwait(SIGKILL)
+                self.report.killed()
+        self._log.debug('[{0}] exited with return code {1}'.format(self.proc.pid, self.returncode))
 
-    def wait(self):
-        self.proc.communicate()
-        if self.timer:
-            self.timer.cancel()
-        self.report.end(self.returncode)
+    # Send `sig` to the process (to its process group on non-Windows) and
+    # wait up to `timeout` seconds for it to end.  With SIGNONE no signal
+    # is sent and we only wait; a timeout then marks this context expired.
+    # Sending SIGKILL marks the context as killed.  If the wait times out
+    # and `sig` was not SIGKILL, the call is retried once with SIGKILL and
+    # a 1 second timeout.  When the process ends, report.end() is called.
+    # Returns True if the process ended, False if it may still be running.
+    def sigwait(self, sig=SIGKILL, timeout=2):
+        try:
+            if sig != SIGNONE:
+                self._log.debug('[{0}] send signal {1}'.format(self.proc.pid, sig))
+                if sig == SIGKILL:
+                    self.killed = True
+                try:
+                    if platform.system() != 'Windows':
+                        os.killpg(os.getpgid(self.proc.pid), sig)
+                    else:
+                        self.proc.send_signal(sig)
+                except Exception:
+                    self._log.info('[{0}] Failed to kill process'.format(self.proc.pid), exc_info=sys.exc_info())
+            self._log.debug('[{0}] wait begin, timeout {1} sec(s)'.format(self.proc.pid, timeout))
+            self.proc.communicate(timeout=timeout)
+            self._log.debug('[{0}] process ended with return code {1}'.format(self.proc.pid, self.returncode))
+            self.report.end(self.returncode)
+            return True
+        except subprocess.TimeoutExpired:
+            self._log.info('[{0}] timeout waiting for process to end'.format(self.proc.pid))
+            if sig == SIGNONE:
+                self.expired = True
+            return False if sig == SIGKILL else self.sigwait(SIGKILL, 1)
+
+    # called on the client process to wait for it to end naturally
+    def wait(self, timeout):
+        self.sigwait(SIGNONE, timeout)
 
     @property
     def returncode(self):
         return self.proc.returncode if self.proc else None
 
 
-def exec_context(port, logdir, test, prog):
+def exec_context(port, logdir, test, prog, is_server):
     report = ExecReporter(logdir, test, prog)
     prog.build_command(port)
-    return ExecutionContext(prog.command, prog.workdir, prog.env, report)
+    return ExecutionContext(prog.command, prog.workdir, prog.env, prog.stop_signal, is_server, report)
 
 
 def run_test(testdir, logdir, test_dict, max_retry, async=True):
     logger = multiprocessing.get_logger()
 
-    def ensure_socket_open(proc, port, max_delay):
-        sleeped = 0.1
-        time.sleep(sleeped)
-        sleep_step = 0.2
+    def ensure_socket_open(sv, port, test):
+        """Poll until the server endpoint is available; False on timeout or server exit."""
+        slept = 0.1
+        time.sleep(slept)
+        sleep_step = 0.1
         while True:
-            # Create sockets every iteration because refused sockets cannot be
-            # reused on some systems.
-            sock4 = socket.socket()
-            sock6 = socket.socket(family=socket.AF_INET6)
-            try:
-                if sock4.connect_ex(('127.0.0.1', port)) == 0 \
-                        or sock6.connect_ex(('::1', port)) == 0:
-                    return True
-                if proc.poll() is not None:
-                    logger.warn('server process is exited')
-                    return False
-                if sleeped > max_delay:
-                    logger.warn('sleeped for %f seconds but server port is not open' % sleeped)
-                    return False
-                time.sleep(sleep_step)
-                sleeped += sleep_step
-            finally:
-                sock4.close()
-                sock6.close()
-        logger.debug('waited %f sec for server port open' % sleeped)
-        return True
+            if slept > test.delay:
+                logger.warn('[{0}] slept for {1} seconds but server is not open'.format(sv.proc.pid, slept))
+                return False
+            if test.socket == 'domain':
+                if os.path.exists(domain_socket_path(port)):
+                    logger.debug('[{0}] server ready - waited for {1} seconds'.format(sv.proc.pid, slept))
+                    return True
+                logger.debug('[{0}] domain(unix) socket not available yet. slept for {1} seconds so far'.format(sv.proc.pid, slept))
+                time.sleep(sleep_step)
+                slept += sleep_step
+            elif test.socket == 'abstract':
+                return True  # nothing is probed for abstract sockets; treated as ready
+            else:
+                # Fresh sockets every iteration: refused sockets cannot be reused on some systems.
+                sock4 = socket.socket()
+                sock6 = socket.socket(family=socket.AF_INET6)
+                try:
+                    if sock4.connect_ex(('127.0.0.1', port)) == 0 \
+                            or sock6.connect_ex(('::1', port)) == 0:
+                        return True
+                    if sv.proc.poll() is not None:
+                        logger.warn('[{0}] server process is exited'.format(sv.proc.pid))
+                        return False
+                    logger.debug('[{0}] socket not available yet. slept for {1} seconds so far'.format(sv.proc.pid, slept))
+                    time.sleep(sleep_step)
+                    slept += sleep_step
+                finally:
+                    sock4.close()
+                    sock6.close()
 
     try:
         max_bind_retry = 3
@@ -169,31 +203,27 @@
             logger.debug('Start')
             with PortAllocator.alloc_port_scoped(ports, test.socket) as port:
                 logger.debug('Start with port %d' % port)
-                sv = exec_context(port, logdir, test, test.server)
-                cl = exec_context(port, logdir, test, test.client)
+                sv = exec_context(port, logdir, test, test.server, True)
+                cl = exec_context(port, logdir, test, test.client, False)
 
                 logger.debug('Starting server')
                 with sv.start():
-                    if test.socket in ('domain', 'abstract'):
-                        time.sleep(0.1)
-                        port_ok = True
-                    else:
-                        port_ok = ensure_socket_open(sv.proc, port, test.delay)
+                    port_ok = ensure_socket_open(sv, port, test)
                     if port_ok:
                         connect_retry_count = 0
-                        max_connect_retry = 3
-                        connect_retry_wait = 0.5
+                        max_connect_retry = 12
+                        connect_retry_wait = 0.25
                         while True:
                             if sv.proc.poll() is not None:
                                 logger.info('not starting client because server process is absent')
                                 break
                             logger.debug('Starting client')
-                            cl.start(test.timeout)
-                            logger.debug('Waiting client')
-                            cl.wait()
+                            cl.start()
+                            logger.debug('Waiting client (up to %d secs)' % test.timeout)
+                            cl.wait(test.timeout)
                             if not cl.report.maybe_false_positive() or connect_retry_count >= max_connect_retry:
                                 if connect_retry_count > 0 and connect_retry_count < max_connect_retry:
-                                    logger.warn('[%s]: Connected after %d retry (%.2f sec each)' % (test.server.name, connect_retry_count, connect_retry_wait))
+                                    logger.info('[%s]: Connected after %d retry (%.2f sec each)' % (test.server.name, connect_retry_count, connect_retry_wait))
                                 # Wait for 50ms to see if server does not die at the end.
                                 time.sleep(0.05)
                                 break
@@ -205,12 +235,18 @@
                 logger.warn('[%s]: Detected socket bind failure, retrying...', test.server.name)
                 bind_retry_count += 1
             else:
-                if cl.expired:
-                    result = RESULT_TIMEOUT
+                result = RESULT_TIMEOUT if cl.expired else cl.returncode if cl.proc.poll() is not None else RESULT_ERROR
+
+                # For servers that handle a controlled shutdown by signal
+                # if they are killed, or return an error code, that is a
+                # problem.  For servers that are not signal-aware, we simply
+                # kill them off; if we didn't kill them off, something else
+                # happened (crashed?)
+                if test.server.stop_signal != 0:
+                    if sv.killed or sv.returncode > 0:
+                        result |= RESULT_ERROR
                 else:
-                    result = cl.proc.returncode if cl.proc else RESULT_ERROR
                     if not sv.killed:
-                        # Server died without being killed.
                         result |= RESULT_ERROR
 
                 if result == 0 or retry_count >= max_retry:
diff --git a/test/crossrunner/test.py b/test/crossrunner/test.py
index 74fd916..633e926 100644
--- a/test/crossrunner/test.py
+++ b/test/crossrunner/test.py
@@ -22,22 +22,20 @@
 import os
 import sys
 from .compat import path_join
-from .util import merge_dict
-
-
-def domain_socket_path(port):
-    return '/tmp/ThriftTest.thrift.%d' % port
+from .util import merge_dict, domain_socket_path
 
 
 class TestProgram(object):
-    def __init__(self, kind, name, protocol, transport, socket, workdir, command, env=None,
+    def __init__(self, kind, name, protocol, transport, socket, workdir, stop_signal, command, env=None,
                  extra_args=[], extra_args2=[], join_args=False, **kwargs):
+
         self.kind = kind
         self.name = name
         self.protocol = protocol
         self.transport = transport
         self.socket = socket
         self.workdir = workdir
+        self.stop_signal = stop_signal
         self.command = None
         self._base_command = self._fix_cmd_path(command)
         if env:
diff --git a/test/crossrunner/util.py b/test/crossrunner/util.py
index e2d195a..c214df8 100644
--- a/test/crossrunner/util.py
+++ b/test/crossrunner/util.py
@@ -20,6 +20,10 @@
 import copy
 
 
+def domain_socket_path(port):
+    return '/tmp/ThriftTest.thrift.%d' % port
+
+
 def merge_dict(base, update):
     """Update dict concatenating list values"""
     res = copy.deepcopy(base)
diff --git a/test/perl/TestServer.pl b/test/perl/TestServer.pl
index 7d8f929..e8c1cfa 100644
--- a/test/perl/TestServer.pl
+++ b/test/perl/TestServer.pl
@@ -26,6 +26,8 @@
 use Getopt::Long qw(GetOptions);
 use Time::HiRes qw(gettimeofday);
 
+$SIG{INT} = \&sigint_handler;
+ 
 use lib '../../lib/perl/lib';
 use lib 'gen-perl';
 
@@ -146,6 +148,12 @@
 my $server = new Thrift::SimpleServer($processor, $serversocket, $transport, $protocol);
 print "Starting \"simple\" server ($opts{transport}/$opts{protocol}) listen on: $listening_on\n";
 $server->serve();
+print "done.\n";
+
+sub sigint_handler {
+  print "received SIGINT, stopping...\n";
+  $server->stop();
+}
 
 ###
 ### Test server implementation
diff --git a/test/test.py b/test/test.py
index 5a015ea..24e7c4e 100755
--- a/test/test.py
+++ b/test/test.py
@@ -1,4 +1,4 @@
-#!/usr/bin/env python
+#!/usr/bin/env python3
 #
 # Licensed to the Apache Software Foundation (ASF) under one
 # or more contributor license agreements. See the NOTICE file
@@ -18,12 +18,13 @@
 # under the License.
 #
 
-# Apache Thrift - integration test suite
+#
+# Apache Thrift - integration (cross) test suite
 #
 # tests different server-client, protocol and transport combinations
 #
-# This script supports python 2.7 and later.
-# python 3.x is recommended for better stability.
+# This script requires python 3.x due to the improvements in
+# subprocess management that are needed for reliability.
 #
 
 from __future__ import print_function
@@ -38,6 +39,12 @@
 import crossrunner
 from crossrunner.compat import path_join
 
+# 3.3 introduced subprocess timeouts on waiting for child
+req_version = (3, 3)
+cur_version = sys.version_info
+assert (cur_version >= req_version), "Python 3.3 or later is required for proper operation."
+
+
 ROOT_DIR = os.path.dirname(os.path.realpath(os.path.dirname(__file__)))
 TEST_DIR_RELATIVE = 'test'
 TEST_DIR = path_join(ROOT_DIR, TEST_DIR_RELATIVE)
@@ -161,9 +168,11 @@
             options.update_failures, options.print_failures)
     elif options.features is not None:
         features = options.features or ['.*']
-        res = run_feature_tests(server_match, features, options.jobs, options.skip_known_failures, options.retry_count, options.regex)
+        res = run_feature_tests(server_match, features, options.jobs,
+                                options.skip_known_failures, options.retry_count, options.regex)
     else:
-        res = run_cross_tests(server_match, client_match, options.jobs, options.skip_known_failures, options.retry_count, options.regex)
+        res = run_cross_tests(server_match, client_match, options.jobs,
+                              options.skip_known_failures, options.retry_count, options.regex)
     return 0 if res else 1
 
 
diff --git a/test/tests.json b/test/tests.json
index 671c667..9c7668d 100644
--- a/test/tests.json
+++ b/test/tests.json
@@ -63,7 +63,8 @@
     "name": "d",
     "server": {
       "command": [
-        "thrift_test_server"
+        "thrift_test_server",
+	"--trace"
       ]
     },
     "client": {
@@ -438,12 +439,12 @@
       "compact",
       "json"
     ],
-        "server": {
+    "server": {
       "command": [
         "dotnet",
         "run",
-  "--no-build",
-                "--project=Server/Server.csproj",
+        "--no-build",
+        "--project=Server/Server.csproj",
         "server"
       ]
     },
@@ -452,8 +453,8 @@
       "command": [
         "dotnet",
         "run",
-  "--no-build",
-                "--project=Client/Client.csproj",
+        "--no-build",
+        "--project=Client/Client.csproj",
         "client"
       ]
     },