THRIFT-4515: cross server test improvement: graceful test server shutdown
This closes #1509
diff --git a/test/crossrunner/run.py b/test/crossrunner/run.py
index f522bb1..25c58ce 100644
--- a/test/crossrunner/run.py
+++ b/test/crossrunner/run.py
@@ -23,19 +23,20 @@
import os
import platform
import random
-import signal
import socket
import subprocess
import sys
-import threading
import time
from .compat import str_join
-from .test import TestEntry, domain_socket_path
from .report import ExecReporter, SummaryReporter
+from .test import TestEntry
+from .util import domain_socket_path
-RESULT_TIMEOUT = 128
RESULT_ERROR = 64
+RESULT_TIMEOUT = 128
+SIGNONE = 0
+SIGKILL = 15
# globals
ports = None
@@ -43,35 +44,18 @@
class ExecutionContext(object):
- def __init__(self, cmd, cwd, env, report):
+ def __init__(self, cmd, cwd, env, stop_signal, is_server, report):
self._log = multiprocessing.get_logger()
- self.report = report
self.cmd = cmd
self.cwd = cwd
self.env = env
- self.timer = None
+ self.stop_signal = stop_signal
+ self.is_server = is_server
+ self.report = report
self.expired = False
self.killed = False
self.proc = None
- def _expire(self):
- self._log.info('Timeout')
- self.expired = True
- self.kill()
-
- def kill(self):
- self._log.debug('Killing process : %d' % self.proc.pid)
- self.killed = True
- if platform.system() != 'Windows':
- try:
- os.killpg(self.proc.pid, signal.SIGKILL)
- except Exception:
- self._log.info('Failed to kill process group', exc_info=sys.exc_info())
- try:
- self.proc.kill()
- except Exception:
- self._log.info('Failed to kill process', exc_info=sys.exc_info())
-
def _popen_args(self):
args = {
'cwd': self.cwd,
@@ -87,75 +71,125 @@
args.update(preexec_fn=os.setsid)
return args
- def start(self, timeout=0):
+ def start(self):
joined = str_join(' ', self.cmd)
self._log.debug('COMMAND: %s', joined)
self._log.debug('WORKDIR: %s', self.cwd)
self._log.debug('LOGFILE: %s', self.report.logpath)
self.report.begin()
self.proc = subprocess.Popen(self.cmd, **self._popen_args())
- if timeout > 0:
- self.timer = threading.Timer(timeout, self._expire)
- self.timer.start()
+ self._log.debug(' PID: %d', self.proc.pid)
+ self._log.debug(' PGID: %d', os.getpgid(self.proc.pid))
return self._scoped()
@contextlib.contextmanager
def _scoped(self):
yield self
- self._log.debug('Killing scoped process')
- if self.proc.poll() is None:
- self.kill()
- self.report.killed()
+ if self.is_server:
+ # the server is supposed to run until we stop it
+ if self.returncode is not None:
+ self.report.died()
+ else:
+ if self.stop_signal != SIGNONE:
+ if self.sigwait(self.stop_signal):
+ self.report.end(self.returncode)
+ else:
+ self.report.killed()
+ else:
+ self.sigwait(SIGKILL)
else:
- self._log.debug('Process died unexpectedly')
- self.report.died()
+ # the client is supposed to exit normally
+ if self.returncode is not None:
+ self.report.end(self.returncode)
+ else:
+ self.sigwait(SIGKILL)
+ self.report.killed()
+ self._log.debug('[{0}] exited with return code {1}'.format(self.proc.pid, self.returncode))
- def wait(self):
- self.proc.communicate()
- if self.timer:
- self.timer.cancel()
- self.report.end(self.returncode)
+ # Send a signal to the process and then wait for it to end.
+ # If the requested signal is SIGNONE, no signal is sent and we
+ # simply wait for the process to end; if it has not ended when
+ # the timeout elapses, we mark it as expired.
+ # If the process fails to end and the signal was not SIGKILL,
+ # sigwait re-runs with SIGKILL so that a real process kill occurs.
+ # Returns True if the process ended, False if it may not have.
+ def sigwait(self, sig=SIGKILL, timeout=2):
+ try:
+ if sig != SIGNONE:
+ self._log.debug('[{0}] send signal {1}'.format(self.proc.pid, sig))
+ if sig == SIGKILL:
+ self.killed = True
+ try:
+ if platform.system() != 'Windows':
+ os.killpg(os.getpgid(self.proc.pid), sig)
+ else:
+ self.proc.send_signal(sig)
+ except Exception:
+ self._log.info('[{0}] Failed to kill process'.format(self.proc.pid), exc_info=sys.exc_info())
+ self._log.debug('[{0}] wait begin, timeout {1} sec(s)'.format(self.proc.pid, timeout))
+ self.proc.communicate(timeout=timeout)
+ self._log.debug('[{0}] process ended with return code {1}'.format(self.proc.pid, self.returncode))
+ self.report.end(self.returncode)
+ return True
+ except subprocess.TimeoutExpired:
+ self._log.info('[{0}] timeout waiting for process to end'.format(self.proc.pid))
+ if sig == SIGNONE:
+ self.expired = True
+ return False if sig == SIGKILL else self.sigwait(SIGKILL, 1)
+
+ # called on the client process to wait for it to end naturally
+ def wait(self, timeout):
+ self.sigwait(SIGNONE, timeout)
@property
def returncode(self):
return self.proc.returncode if self.proc else None
-def exec_context(port, logdir, test, prog):
+def exec_context(port, logdir, test, prog, is_server):
report = ExecReporter(logdir, test, prog)
prog.build_command(port)
- return ExecutionContext(prog.command, prog.workdir, prog.env, report)
+ return ExecutionContext(prog.command, prog.workdir, prog.env, prog.stop_signal, is_server, report)
def run_test(testdir, logdir, test_dict, max_retry, async=True):
logger = multiprocessing.get_logger()
- def ensure_socket_open(proc, port, max_delay):
- sleeped = 0.1
- time.sleep(sleeped)
- sleep_step = 0.2
+ def ensure_socket_open(sv, port, test):
+ slept = 0.1
+ time.sleep(slept)
+ sleep_step = 0.1
while True:
- # Create sockets every iteration because refused sockets cannot be
- # reused on some systems.
- sock4 = socket.socket()
- sock6 = socket.socket(family=socket.AF_INET6)
- try:
- if sock4.connect_ex(('127.0.0.1', port)) == 0 \
- or sock6.connect_ex(('::1', port)) == 0:
- return True
- if proc.poll() is not None:
- logger.warn('server process is exited')
- return False
- if sleeped > max_delay:
- logger.warn('sleeped for %f seconds but server port is not open' % sleeped)
- return False
- time.sleep(sleep_step)
- sleeped += sleep_step
- finally:
- sock4.close()
- sock6.close()
- logger.debug('waited %f sec for server port open' % sleeped)
- return True
+ if slept > test.delay:
+ logger.warn('[{0}] slept for {1} seconds but server is not open'.format(sv.proc.pid, slept))
+ return False
+ if test.socket == 'domain':
+ if not os.path.exists(domain_socket_path(port)):
+ logger.debug('[{0}] domain(unix) socket not available yet. slept for {1} seconds so far'.format(sv.proc.pid, slept))
+ time.sleep(sleep_step)
+ slept += sleep_step
+ elif test.socket == 'abstract':
+ return True
+ else:
+ # Create sockets every iteration because refused sockets cannot be
+ # reused on some systems.
+ sock4 = socket.socket()
+ sock6 = socket.socket(family=socket.AF_INET6)
+ try:
+ if sock4.connect_ex(('127.0.0.1', port)) == 0 \
+ or sock6.connect_ex(('::1', port)) == 0:
+ return True
+ if sv.proc.poll() is not None:
+ logger.warn('[{0}] server process is exited'.format(sv.proc.pid))
+ return False
+ logger.debug('[{0}] socket not available yet. slept for {1} seconds so far'.format(sv.proc.pid, slept))
+ time.sleep(sleep_step)
+ slept += sleep_step
+ finally:
+ sock4.close()
+ sock6.close()
+ logger.debug('[{0}] server ready - waited for {1} seconds'.format(sv.proc.pid, slept))
+ return True
try:
max_bind_retry = 3
@@ -169,31 +203,27 @@
logger.debug('Start')
with PortAllocator.alloc_port_scoped(ports, test.socket) as port:
logger.debug('Start with port %d' % port)
- sv = exec_context(port, logdir, test, test.server)
- cl = exec_context(port, logdir, test, test.client)
+ sv = exec_context(port, logdir, test, test.server, True)
+ cl = exec_context(port, logdir, test, test.client, False)
logger.debug('Starting server')
with sv.start():
- if test.socket in ('domain', 'abstract'):
- time.sleep(0.1)
- port_ok = True
- else:
- port_ok = ensure_socket_open(sv.proc, port, test.delay)
+ port_ok = ensure_socket_open(sv, port, test)
if port_ok:
connect_retry_count = 0
- max_connect_retry = 3
- connect_retry_wait = 0.5
+ max_connect_retry = 12
+ connect_retry_wait = 0.25
while True:
if sv.proc.poll() is not None:
logger.info('not starting client because server process is absent')
break
logger.debug('Starting client')
- cl.start(test.timeout)
- logger.debug('Waiting client')
- cl.wait()
+ cl.start()
+ logger.debug('Waiting client (up to %d secs)' % test.timeout)
+ cl.wait(test.timeout)
if not cl.report.maybe_false_positive() or connect_retry_count >= max_connect_retry:
if connect_retry_count > 0 and connect_retry_count < max_connect_retry:
- logger.warn('[%s]: Connected after %d retry (%.2f sec each)' % (test.server.name, connect_retry_count, connect_retry_wait))
+ logger.info('[%s]: Connected after %d retry (%.2f sec each)' % (test.server.name, connect_retry_count, connect_retry_wait))
# Wait for 50ms to see if server does not die at the end.
time.sleep(0.05)
break
@@ -205,12 +235,18 @@
logger.warn('[%s]: Detected socket bind failure, retrying...', test.server.name)
bind_retry_count += 1
else:
- if cl.expired:
- result = RESULT_TIMEOUT
+ result = RESULT_TIMEOUT if cl.expired else cl.returncode if cl.proc.poll() is not None else RESULT_ERROR
+
+ # For servers that handle a controlled shutdown by signal,
+ # it is a problem if they had to be killed or returned an
+ # error code. For servers that are not signal-aware, we simply
+ # kill them off; if we didn't kill them off, something else
+ # happened (crashed?).
+ if test.server.stop_signal != 0:
+ if sv.killed or sv.returncode > 0:
+ result |= RESULT_ERROR
else:
- result = cl.proc.returncode if cl.proc else RESULT_ERROR
if not sv.killed:
- # Server died without being killed.
result |= RESULT_ERROR
if result == 0 or retry_count >= max_retry: