THRIFT-4515: cross server test improvement: graceful test server shutdown

This closes #1509
diff --git a/test/crossrunner/run.py b/test/crossrunner/run.py
index f522bb1..25c58ce 100644
--- a/test/crossrunner/run.py
+++ b/test/crossrunner/run.py
@@ -23,19 +23,20 @@
 import os
 import platform
 import random
-import signal
 import socket
 import subprocess
 import sys
-import threading
 import time
 
 from .compat import str_join
-from .test import TestEntry, domain_socket_path
 from .report import ExecReporter, SummaryReporter
+from .test import TestEntry
+from .util import domain_socket_path
 
-RESULT_TIMEOUT = 128
 RESULT_ERROR = 64
+RESULT_TIMEOUT = 128
+SIGNONE = 0  # "send no signal": sigwait() only waits for the process
+SIGKILL = 15 if platform.system() == 'Windows' else 9  # real SIGKILL on POSIX; Windows send_signal(15) = terminate()
 
 # globals
 ports = None
@@ -43,35 +44,18 @@
 
 
 class ExecutionContext(object):
-    def __init__(self, cmd, cwd, env, report):
+    def __init__(self, cmd, cwd, env, stop_signal, is_server, report):
         self._log = multiprocessing.get_logger()
-        self.report = report
         self.cmd = cmd
         self.cwd = cwd
         self.env = env
-        self.timer = None
+        self.stop_signal = stop_signal  # server stop signal; SIGNONE means "just kill it" in _scoped()
+        self.is_server = is_server  # servers are stopped by _scoped(); clients are expected to exit on their own
+        self.report = report
         self.expired = False
         self.killed = False
         self.proc = None
 
-    def _expire(self):
-        self._log.info('Timeout')
-        self.expired = True
-        self.kill()
-
-    def kill(self):
-        self._log.debug('Killing process : %d' % self.proc.pid)
-        self.killed = True
-        if platform.system() != 'Windows':
-            try:
-                os.killpg(self.proc.pid, signal.SIGKILL)
-            except Exception:
-                self._log.info('Failed to kill process group', exc_info=sys.exc_info())
-        try:
-            self.proc.kill()
-        except Exception:
-            self._log.info('Failed to kill process', exc_info=sys.exc_info())
-
     def _popen_args(self):
         args = {
             'cwd': self.cwd,
@@ -87,75 +71,125 @@
             args.update(preexec_fn=os.setsid)
         return args
 
-    def start(self, timeout=0):
+    def start(self):  # returns a context manager (_scoped) that stops/reports the process on exit
         joined = str_join(' ', self.cmd)
         self._log.debug('COMMAND: %s', joined)
         self._log.debug('WORKDIR: %s', self.cwd)
         self._log.debug('LOGFILE: %s', self.report.logpath)
         self.report.begin()
         self.proc = subprocess.Popen(self.cmd, **self._popen_args())
-        if timeout > 0:
-            self.timer = threading.Timer(timeout, self._expire)
-            self.timer.start()
+        self._log.debug('    PID: %d', self.proc.pid)
+        self._log.debug('   PGID: %s', os.getpgid(self.proc.pid) if platform.system() != 'Windows' else 'n/a')
         return self._scoped()
 
     @contextlib.contextmanager
     def _scoped(self):
         yield self
-        self._log.debug('Killing scoped process')
-        if self.proc.poll() is None:
-            self.kill()
-            self.report.killed()
+        if self.is_server:
+            # the server should still be running; poll() refreshes returncode
+            if self.proc.poll() is not None:
+                self.report.died()
+            elif self.stop_signal == SIGNONE:
+                # not signal-aware: just take it down (sigwait reports the end)
+                self.sigwait(SIGKILL)
+            elif not self.sigwait(self.stop_signal):
+                # sigwait could not confirm that the process ended
+                self.report.killed()
+            # else: sigwait() succeeded and has already called report.end()
+            # with the exit code, so it is deliberately not reported again
         else:
-            self._log.debug('Process died unexpectedly')
-            self.report.died()
+            # the client is supposed to exit normally
+            if self.returncode is not None:
+                self.report.end(self.returncode)
+            else:
+                self.sigwait(SIGKILL)
+                self.report.killed()
+        self._log.debug('[{0}] exited with return code {1}'.format(self.proc.pid, self.returncode))
 
-    def wait(self):
-        self.proc.communicate()
-        if self.timer:
-            self.timer.cancel()
-        self.report.end(self.returncode)
+    def sigwait(self, sig=SIGKILL, timeout=2):
+        """Optionally signal the process (group), then wait up to `timeout` s.
+
+        SIGNONE sends nothing and only waits; a timeout then sets `expired`.
+        Sending SIGKILL sets `killed`. A timeout with any signal other than
+        SIGKILL escalates to sigwait(SIGKILL, 1). Returns True, after calling
+        report.end(), if the process ended; False if it may still be running.
+        """
+        try:
+            if sig != SIGNONE:
+                self._log.debug('[{0}] send signal {1}'.format(self.proc.pid, sig))
+                if sig == SIGKILL:
+                    self.killed = True
+                try:
+                    if platform.system() != 'Windows':
+                        os.killpg(os.getpgid(self.proc.pid), sig)
+                    else:
+                        self.proc.send_signal(sig)
+                except Exception:
+                    self._log.info('[{0}] Failed to kill process'.format(self.proc.pid), exc_info=sys.exc_info())
+            self._log.debug('[{0}] wait begin, timeout {1} sec(s)'.format(self.proc.pid, timeout))
+            self.proc.communicate(timeout=timeout)
+            self._log.debug('[{0}] process ended with return code {1}'.format(self.proc.pid, self.returncode))
+            self.report.end(self.returncode)
+            return True
+        except subprocess.TimeoutExpired:
+            self._log.info('[{0}] timeout waiting for process to end'.format(self.proc.pid))
+            if sig == SIGNONE:
+                self.expired = True
+            return False if sig == SIGKILL else self.sigwait(SIGKILL, 1)  # escalate with a 1 s grace period
+
+    # Client side: wait up to `timeout` secs for a natural exit; sigwait() kills it on timeout.
+    def wait(self, timeout):
+        self.sigwait(SIGNONE, timeout)

     @property
     def returncode(self):
         return self.proc.returncode if self.proc else None
 
 
-def exec_context(port, logdir, test, prog):
+def exec_context(port, logdir, test, prog, is_server):  # is_server selects the stop policy in ExecutionContext._scoped
     report = ExecReporter(logdir, test, prog)
     prog.build_command(port)
-    return ExecutionContext(prog.command, prog.workdir, prog.env, report)
+    return ExecutionContext(prog.command, prog.workdir, prog.env, prog.stop_signal, is_server, report)
 
 
 def run_test(testdir, logdir, test_dict, max_retry, async=True):
     logger = multiprocessing.get_logger()
 
-    def ensure_socket_open(proc, port, max_delay):
-        sleeped = 0.1
-        time.sleep(sleeped)
-        sleep_step = 0.2
+    def ensure_socket_open(sv, port, test):
+        """Wait until sv accepts on port; False if sv exits or test.delay passes."""
+        slept = sleep_step = 0.1
+        time.sleep(slept)
         while True:
-            # Create sockets every iteration because refused sockets cannot be
-            # reused on some systems.
-            sock4 = socket.socket()
-            sock6 = socket.socket(family=socket.AF_INET6)
-            try:
-                if sock4.connect_ex(('127.0.0.1', port)) == 0 \
-                        or sock6.connect_ex(('::1', port)) == 0:
-                    return True
-                if proc.poll() is not None:
-                    logger.warn('server process is exited')
-                    return False
-                if sleeped > max_delay:
-                    logger.warn('sleeped for %f seconds but server port is not open' % sleeped)
-                    return False
-                time.sleep(sleep_step)
-                sleeped += sleep_step
-            finally:
-                sock4.close()
-                sock6.close()
-        logger.debug('waited %f sec for server port open' % sleeped)
-        return True
+            if slept > test.delay:
+                logger.warn('[{0}] slept for {1} seconds but server is not open'.format(sv.proc.pid, slept))
+                return False
+            if test.socket == 'abstract':
+                # nothing to probe; rely on the initial sleep above
+                return True
+            if test.socket == 'domain':
+                ready = os.path.exists(domain_socket_path(port))
+                waiting = 'domain(unix) socket not available yet'
+            else:
+                # Create sockets every iteration because refused sockets cannot be
+                # reused on some systems.
+                sock4 = socket.socket()
+                sock6 = socket.socket(family=socket.AF_INET6)
+                try:
+                    ready = (sock4.connect_ex(('127.0.0.1', port)) == 0
+                             or sock6.connect_ex(('::1', port)) == 0)
+                finally:
+                    sock4.close()
+                    sock6.close()
+                waiting = 'socket not available yet'
+            if ready:
+                logger.debug('[{0}] server ready - waited for {1} seconds'.format(sv.proc.pid, slept))
+                return True
+            if sv.proc.poll() is not None:
+                logger.warn('[{0}] server process is exited'.format(sv.proc.pid))
+                return False
+            logger.debug('[{0}] {1}. slept for {2} seconds so far'.format(sv.proc.pid, waiting, slept))
+            time.sleep(sleep_step)
+            slept += sleep_step
 
     try:
         max_bind_retry = 3
@@ -169,31 +203,27 @@
             logger.debug('Start')
             with PortAllocator.alloc_port_scoped(ports, test.socket) as port:
                 logger.debug('Start with port %d' % port)
-                sv = exec_context(port, logdir, test, test.server)
-                cl = exec_context(port, logdir, test, test.client)
+                sv = exec_context(port, logdir, test, test.server, True)  # is_server=True
+                cl = exec_context(port, logdir, test, test.client, False)  # is_server=False

                 logger.debug('Starting server')
                 with sv.start():
-                    if test.socket in ('domain', 'abstract'):
-                        time.sleep(0.1)
-                        port_ok = True
-                    else:
-                        port_ok = ensure_socket_open(sv.proc, port, test.delay)
+                    port_ok = ensure_socket_open(sv, port, test)  # False if the server exited or test.delay elapsed
                     if port_ok:
                         connect_retry_count = 0
-                        max_connect_retry = 3
-                        connect_retry_wait = 0.5
+                        max_connect_retry = 12
+                        connect_retry_wait = 0.25
                         while True:
                             if sv.proc.poll() is not None:
                                 logger.info('not starting client because server process is absent')
                                 break
                             logger.debug('Starting client')
-                            cl.start(test.timeout)
-                            logger.debug('Waiting client')
-                            cl.wait()
+                            cl.start()  # NOTE(review): the scope returned by start() is discarded, so the client's _scoped() cleanup never runs
+                            logger.debug('Waiting client (up to %d secs)' % test.timeout)
+                            cl.wait(test.timeout)  # on timeout sigwait() flags cl.expired and kills the client
                             if not cl.report.maybe_false_positive() or connect_retry_count >= max_connect_retry:
                                 if connect_retry_count > 0 and connect_retry_count < max_connect_retry:
-                                    logger.warn('[%s]: Connected after %d retry (%.2f sec each)' % (test.server.name, connect_retry_count, connect_retry_wait))
+                                    logger.info('[%s]: Connected after %d retry (%.2f sec each)' % (test.server.name, connect_retry_count, connect_retry_wait))
                                 # Wait for 50ms to see if server does not die at the end.
                                 time.sleep(0.05)
                                 break
@@ -205,12 +235,18 @@
                 logger.warn('[%s]: Detected socket bind failure, retrying...', test.server.name)
                 bind_retry_count += 1
             else:
-                if cl.expired:
-                    result = RESULT_TIMEOUT
+                client_done = cl.proc is not None and cl.proc.poll() is not None
+                result = RESULT_TIMEOUT if cl.expired else cl.returncode if client_done else RESULT_ERROR
+                # Servers with a stop_signal are expected to shut down cleanly
+                # when signalled, so being killed or exiting with a positive
+                # code is an error.  Servers without one are simply killed; if
+                # we did not kill them they exited on their own (crashed?),
+                # which is an error as well.
+                if test.server.stop_signal != SIGNONE:
+                    if sv.killed or sv.returncode > 0:
+                        result |= RESULT_ERROR
                 else:
-                    result = cl.proc.returncode if cl.proc else RESULT_ERROR
                     if not sv.killed:
-                        # Server died without being killed.
                         result |= RESULT_ERROR
 
                 if result == 0 or retry_count >= max_retry: