THRIFT-4515: cross server test improvement: graceful test server shutdown

This closes #1509
diff --git a/test/crossrunner/collect.py b/test/crossrunner/collect.py
index 03b0c36..e2d8978 100644
--- a/test/crossrunner/collect.py
+++ b/test/crossrunner/collect.py
@@ -51,6 +51,7 @@
 ]
 
 DEFAULT_MAX_DELAY = 5
+DEFAULT_SIGNAL = 1
 DEFAULT_TIMEOUT = 5
 
 
@@ -112,7 +113,7 @@
                     yield name, impl1, impl2
 
     def maybe_max(key, o1, o2, default):
-        """maximum of two if present, otherwise defult value"""
+        """maximum of two if present, otherwise default value"""
         v1 = o1.get(key)
         v2 = o2.get(key)
         return max(v1, v2) if v1 and v2 else v1 or v2 or default
@@ -138,6 +139,7 @@
                         'server': merge_metadata(sv, **{'protocol': proto1, 'transport': trans1}),
                         'client': merge_metadata(cl, **{'protocol': proto2, 'transport': trans2}),
                         'delay': maybe_max('delay', sv, cl, DEFAULT_MAX_DELAY),
+                        'stop_signal': maybe_max('stop_signal', sv, cl, DEFAULT_SIGNAL),
                         'timeout': maybe_max('timeout', sv, cl, DEFAULT_TIMEOUT),
                         'protocol': proto,
                         'transport': trans,
diff --git a/test/crossrunner/report.py b/test/crossrunner/report.py
index 76324ed..75f36db 100644
--- a/test/crossrunner/report.py
+++ b/test/crossrunner/report.py
@@ -157,8 +157,10 @@
         ])),
         'client': list(map(re.compile, [
             '[Cc]onnection refused',
-            'Could not connect to localhost',
+            'Could not connect to',
             'ECONNREFUSED',
+            'econnrefused',               # erl
+            'CONNECTION-REFUSED-ERROR',   # cl
             'No such file or directory',  # domain socket
         ])),
     }
@@ -174,6 +176,7 @@
             def match(line):
                 for expr in exprs:
                     if expr.search(line):
+                        self._log.info("maybe false positive: %s" % line)
                         return True
 
             with logfile_open(self.logpath, 'r') as fp:
@@ -204,7 +207,7 @@
     def _print_footer(self, returncode=None):
         self._print_bar()
         if returncode is not None:
-            print('Return code: %d' % returncode, file=self.out)
+            print('Return code: %d (negative values indicate kill by signal)' % returncode, file=self.out)
         else:
             print('Process is killed.', file=self.out)
         self._print_exec_time()
@@ -261,7 +264,8 @@
         if not with_result:
             return '{:24s}{:18s}{:25s}'.format(name[:23], test.protocol[:17], trans[:24])
         else:
-            return '{:24s}{:18s}{:25s}{:s}\n'.format(name[:23], test.protocol[:17], trans[:24], self._result_string(test))
+            return '{:24s}{:18s}{:25s}{:s}\n'.format(name[:23], test.protocol[:17],
+                                                     trans[:24], self._result_string(test))
 
     def _print_test_header(self):
         self._print_bar()
diff --git a/test/crossrunner/run.py b/test/crossrunner/run.py
index f522bb1..25c58ce 100644
--- a/test/crossrunner/run.py
+++ b/test/crossrunner/run.py
@@ -23,19 +23,20 @@
 import os
 import platform
 import random
-import signal
 import socket
 import subprocess
 import sys
-import threading
 import time
 
 from .compat import str_join
-from .test import TestEntry, domain_socket_path
 from .report import ExecReporter, SummaryReporter
+from .test import TestEntry
+from .util import domain_socket_path
 
-RESULT_TIMEOUT = 128
 RESULT_ERROR = 64
+RESULT_TIMEOUT = 128
+SIGNONE = 0
+SIGKILL = 15
 
 # globals
 ports = None
@@ -43,35 +44,18 @@
 
 
 class ExecutionContext(object):
-    def __init__(self, cmd, cwd, env, report):
+    def __init__(self, cmd, cwd, env, stop_signal, is_server, report):
         self._log = multiprocessing.get_logger()
-        self.report = report
         self.cmd = cmd
         self.cwd = cwd
         self.env = env
-        self.timer = None
+        self.stop_signal = stop_signal
+        self.is_server = is_server
+        self.report = report
         self.expired = False
         self.killed = False
         self.proc = None
 
-    def _expire(self):
-        self._log.info('Timeout')
-        self.expired = True
-        self.kill()
-
-    def kill(self):
-        self._log.debug('Killing process : %d' % self.proc.pid)
-        self.killed = True
-        if platform.system() != 'Windows':
-            try:
-                os.killpg(self.proc.pid, signal.SIGKILL)
-            except Exception:
-                self._log.info('Failed to kill process group', exc_info=sys.exc_info())
-        try:
-            self.proc.kill()
-        except Exception:
-            self._log.info('Failed to kill process', exc_info=sys.exc_info())
-
     def _popen_args(self):
         args = {
             'cwd': self.cwd,
@@ -87,75 +71,125 @@
             args.update(preexec_fn=os.setsid)
         return args
 
-    def start(self, timeout=0):
+    def start(self):
         joined = str_join(' ', self.cmd)
         self._log.debug('COMMAND: %s', joined)
         self._log.debug('WORKDIR: %s', self.cwd)
         self._log.debug('LOGFILE: %s', self.report.logpath)
         self.report.begin()
         self.proc = subprocess.Popen(self.cmd, **self._popen_args())
-        if timeout > 0:
-            self.timer = threading.Timer(timeout, self._expire)
-            self.timer.start()
+        self._log.debug('    PID: %d', self.proc.pid)
+        self._log.debug('   PGID: %d', os.getpgid(self.proc.pid))
         return self._scoped()
 
     @contextlib.contextmanager
     def _scoped(self):
         yield self
-        self._log.debug('Killing scoped process')
-        if self.proc.poll() is None:
-            self.kill()
-            self.report.killed()
+        if self.is_server:
+            # the server is supposed to run until we stop it
+            if self.returncode is not None:
+                self.report.died()
+            else:
+                if self.stop_signal != SIGNONE:
+                    if self.sigwait(self.stop_signal):
+                        self.report.end(self.returncode)
+                    else:
+                        self.report.killed()
+                else:
+                    self.sigwait(SIGKILL)
         else:
-            self._log.debug('Process died unexpectedly')
-            self.report.died()
+            # the client is supposed to exit normally
+            if self.returncode is not None:
+                self.report.end(self.returncode)
+            else:
+                self.sigwait(SIGKILL)
+                self.report.killed()
+        self._log.debug('[{0}] exited with return code {1}'.format(self.proc.pid, self.returncode))
 
-    def wait(self):
-        self.proc.communicate()
-        if self.timer:
-            self.timer.cancel()
-        self.report.end(self.returncode)
+    # Send a signal to the process and then wait for it to end
+    # If the signal requested is SIGNONE, no signal is sent, and
+    # instead we just wait for the process to end; further if it
+    # does not end normally with SIGNONE, we mark it as expired.
+    # If the process fails to end and the signal is not SIGKILL,
+    # it re-runs with SIGKILL so that a real process kill occurs
+    # returns True if the process ended, False if it may not have
+    def sigwait(self, sig=SIGKILL, timeout=2):
+        try:
+            if sig != SIGNONE:
+                self._log.debug('[{0}] send signal {1}'.format(self.proc.pid, sig))
+                if sig == SIGKILL:
+                    self.killed = True
+                try:
+                    if platform.system() != 'Windows':
+                        os.killpg(os.getpgid(self.proc.pid), sig)
+                    else:
+                        self.proc.send_signal(sig)
+                except Exception:
+                    self._log.info('[{0}] Failed to kill process'.format(self.proc.pid), exc_info=sys.exc_info())
+            self._log.debug('[{0}] wait begin, timeout {1} sec(s)'.format(self.proc.pid, timeout))
+            self.proc.communicate(timeout=timeout)
+            self._log.debug('[{0}] process ended with return code {1}'.format(self.proc.pid, self.returncode))
+            self.report.end(self.returncode)
+            return True
+        except subprocess.TimeoutExpired:
+            self._log.info('[{0}] timeout waiting for process to end'.format(self.proc.pid))
+            if sig == SIGNONE:
+                self.expired = True
+            return False if sig == SIGKILL else self.sigwait(SIGKILL, 1)
+
+    # called on the client process to wait for it to end naturally
+    def wait(self, timeout):
+        self.sigwait(SIGNONE, timeout)
 
     @property
     def returncode(self):
         return self.proc.returncode if self.proc else None
 
 
-def exec_context(port, logdir, test, prog):
+def exec_context(port, logdir, test, prog, is_server):
     report = ExecReporter(logdir, test, prog)
     prog.build_command(port)
-    return ExecutionContext(prog.command, prog.workdir, prog.env, report)
+    return ExecutionContext(prog.command, prog.workdir, prog.env, prog.stop_signal, is_server, report)
 
 
 def run_test(testdir, logdir, test_dict, max_retry, async=True):
     logger = multiprocessing.get_logger()
 
-    def ensure_socket_open(proc, port, max_delay):
-        sleeped = 0.1
-        time.sleep(sleeped)
-        sleep_step = 0.2
+    def ensure_socket_open(sv, port, test):
+        slept = 0.1
+        time.sleep(slept)
+        sleep_step = 0.1
         while True:
-            # Create sockets every iteration because refused sockets cannot be
-            # reused on some systems.
-            sock4 = socket.socket()
-            sock6 = socket.socket(family=socket.AF_INET6)
-            try:
-                if sock4.connect_ex(('127.0.0.1', port)) == 0 \
-                        or sock6.connect_ex(('::1', port)) == 0:
-                    return True
-                if proc.poll() is not None:
-                    logger.warn('server process is exited')
-                    return False
-                if sleeped > max_delay:
-                    logger.warn('sleeped for %f seconds but server port is not open' % sleeped)
-                    return False
-                time.sleep(sleep_step)
-                sleeped += sleep_step
-            finally:
-                sock4.close()
-                sock6.close()
-        logger.debug('waited %f sec for server port open' % sleeped)
-        return True
+            if slept > test.delay:
+                logger.warn('[{0}] slept for {1} seconds but server is not open'.format(sv.proc.pid, slept))
+                return False
+            if test.socket == 'domain':
+                if not os.path.exists(domain_socket_path(port)):
+                    logger.debug('[{0}] domain(unix) socket not available yet. slept for {1} seconds so far'.format(sv.proc.pid, slept))
+                    time.sleep(sleep_step)
+                    slept += sleep_step
+            elif test.socket == 'abstract':
+                return True
+            else:
+                # Create sockets every iteration because refused sockets cannot be
+                # reused on some systems.
+                sock4 = socket.socket()
+                sock6 = socket.socket(family=socket.AF_INET6)
+                try:
+                    if sock4.connect_ex(('127.0.0.1', port)) == 0 \
+                            or sock6.connect_ex(('::1', port)) == 0:
+                        return True
+                    if sv.proc.poll() is not None:
+                        logger.warn('[{0}] server process is exited'.format(sv.proc.pid))
+                        return False
+                    logger.debug('[{0}] socket not available yet. slept for {1} seconds so far'.format(sv.proc.pid, slept))
+                    time.sleep(sleep_step)
+                    slept += sleep_step
+                finally:
+                    sock4.close()
+                    sock6.close()
+            logger.debug('[{0}] server ready - waited for {1} seconds'.format(sv.proc.pid, slept))
+            return True
 
     try:
         max_bind_retry = 3
@@ -169,31 +203,27 @@
             logger.debug('Start')
             with PortAllocator.alloc_port_scoped(ports, test.socket) as port:
                 logger.debug('Start with port %d' % port)
-                sv = exec_context(port, logdir, test, test.server)
-                cl = exec_context(port, logdir, test, test.client)
+                sv = exec_context(port, logdir, test, test.server, True)
+                cl = exec_context(port, logdir, test, test.client, False)
 
                 logger.debug('Starting server')
                 with sv.start():
-                    if test.socket in ('domain', 'abstract'):
-                        time.sleep(0.1)
-                        port_ok = True
-                    else:
-                        port_ok = ensure_socket_open(sv.proc, port, test.delay)
+                    port_ok = ensure_socket_open(sv, port, test)
                     if port_ok:
                         connect_retry_count = 0
-                        max_connect_retry = 3
-                        connect_retry_wait = 0.5
+                        max_connect_retry = 12
+                        connect_retry_wait = 0.25
                         while True:
                             if sv.proc.poll() is not None:
                                 logger.info('not starting client because server process is absent')
                                 break
                             logger.debug('Starting client')
-                            cl.start(test.timeout)
-                            logger.debug('Waiting client')
-                            cl.wait()
+                            cl.start()
+                            logger.debug('Waiting client (up to %d secs)' % test.timeout)
+                            cl.wait(test.timeout)
                             if not cl.report.maybe_false_positive() or connect_retry_count >= max_connect_retry:
                                 if connect_retry_count > 0 and connect_retry_count < max_connect_retry:
-                                    logger.warn('[%s]: Connected after %d retry (%.2f sec each)' % (test.server.name, connect_retry_count, connect_retry_wait))
+                                    logger.info('[%s]: Connected after %d retry (%.2f sec each)' % (test.server.name, connect_retry_count, connect_retry_wait))
                                 # Wait for 50ms to see if server does not die at the end.
                                 time.sleep(0.05)
                                 break
@@ -205,12 +235,18 @@
                 logger.warn('[%s]: Detected socket bind failure, retrying...', test.server.name)
                 bind_retry_count += 1
             else:
-                if cl.expired:
-                    result = RESULT_TIMEOUT
+                result = RESULT_TIMEOUT if cl.expired else cl.returncode if cl.proc.poll() is not None else RESULT_ERROR
+
+                # For servers that handle a controlled shutdown by signal
+                # if they are killed, or return an error code, that is a
+                # problem.  For servers that are not signal-aware, we simply
+                # kill them off; if we didn't kill them off, something else
+                # happened (crashed?)
+                if test.server.stop_signal != 0:
+                    if sv.killed or sv.returncode > 0:
+                        result |= RESULT_ERROR
                 else:
-                    result = cl.proc.returncode if cl.proc else RESULT_ERROR
                     if not sv.killed:
-                        # Server died without being killed.
                         result |= RESULT_ERROR
 
                 if result == 0 or retry_count >= max_retry:
diff --git a/test/crossrunner/test.py b/test/crossrunner/test.py
index 74fd916..633e926 100644
--- a/test/crossrunner/test.py
+++ b/test/crossrunner/test.py
@@ -22,22 +22,20 @@
 import os
 import sys
 from .compat import path_join
-from .util import merge_dict
-
-
-def domain_socket_path(port):
-    return '/tmp/ThriftTest.thrift.%d' % port
+from .util import merge_dict, domain_socket_path
 
 
 class TestProgram(object):
-    def __init__(self, kind, name, protocol, transport, socket, workdir, command, env=None,
+    def __init__(self, kind, name, protocol, transport, socket, workdir, stop_signal, command, env=None,
                  extra_args=[], extra_args2=[], join_args=False, **kwargs):
+
         self.kind = kind
         self.name = name
         self.protocol = protocol
         self.transport = transport
         self.socket = socket
         self.workdir = workdir
+        self.stop_signal = stop_signal
         self.command = None
         self._base_command = self._fix_cmd_path(command)
         if env:
diff --git a/test/crossrunner/util.py b/test/crossrunner/util.py
index e2d195a..c214df8 100644
--- a/test/crossrunner/util.py
+++ b/test/crossrunner/util.py
@@ -20,6 +20,10 @@
 import copy
 
 
+def domain_socket_path(port):
+    return '/tmp/ThriftTest.thrift.%d' % port
+
+
 def merge_dict(base, update):
     """Update dict concatenating list values"""
     res = copy.deepcopy(base)