|  | # | 
|  | # Licensed to the Apache Software Foundation (ASF) under one | 
|  | # or more contributor license agreements. See the NOTICE file | 
|  | # distributed with this work for additional information | 
|  | # regarding copyright ownership. The ASF licenses this file | 
|  | # to you under the Apache License, Version 2.0 (the | 
|  | # "License"); you may not use this file except in compliance | 
|  | # with the License. You may obtain a copy of the License at | 
|  | # | 
|  | #   http://www.apache.org/licenses/LICENSE-2.0 | 
|  | # | 
|  | # Unless required by applicable law or agreed to in writing, | 
|  | # software distributed under the License is distributed on an | 
|  | # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | 
|  | # KIND, either express or implied. See the License for the | 
|  | # specific language governing permissions and limitations | 
|  | # under the License. | 
|  | # | 
|  |  | 
|  | import contextlib | 
|  | import multiprocessing | 
|  | import multiprocessing.managers | 
|  | import os | 
|  | import platform | 
|  | import random | 
|  | import socket | 
|  | import subprocess | 
|  | import sys | 
|  | import time | 
|  |  | 
|  | from .compat import str_join | 
|  | from .report import ExecReporter, SummaryReporter | 
|  | from .test import TestEntry | 
|  | from .util import domain_socket_path | 
|  |  | 
# Result flags; run_test ORs RESULT_ERROR into a test's result code, and
# TestDispatcher compares the code against RESULT_TIMEOUT.
RESULT_ERROR = 64
RESULT_TIMEOUT = 128
# Pseudo-signal: "send nothing, just wait for the process to end".
SIGNONE = 0
# Signal used to forcibly stop a child process.
# NOTE(review): 15 is the POSIX number of SIGTERM (SIGKILL is 9) -- confirm
# that the name/value mismatch is intentional before changing either.
SIGKILL = 15

# globals
# Shared port allocator and shutdown event; both are installed by
# TestDispatcher (directly, or through its pool initializer).
ports = None
stop = None
|  |  | 
|  |  | 
class ExecutionContext(object):
    """Run one test program (server or client) as a child process.

    The child is started with ``subprocess.Popen``; its stdout and stderr are
    both sent to ``report.out``.  The child is placed in its own session
    (POSIX) or process group (Windows) so that signalling it also reaches the
    processes it spawned.

    Args:
        cmd: Argument list passed to ``subprocess.Popen``.
        cwd: Working directory for the child.
        env: Environment mapping for the child.
        stop_signal: Signal number used to stop a server, or ``SIGNONE`` when
            the server should simply be killed.
        is_server: True when the program is the server side of a test.
        report: Reporter object; this class uses its ``out``, ``logpath``,
            ``begin()``, ``end()``, ``died()`` and ``killed()`` members.

    Attributes:
        expired: Set when waiting without a signal (``SIGNONE``) timed out.
        killed: Set once ``SIGKILL`` has been sent to the process.
        proc: The ``subprocess.Popen`` object, or None before ``start()``.
    """

    def __init__(self, cmd, cwd, env, stop_signal, is_server, report):
        self._log = multiprocessing.get_logger()
        self.cmd = cmd
        self.cwd = cwd
        self.env = env
        self.stop_signal = stop_signal
        self.is_server = is_server
        self.report = report
        self.expired = False
        self.killed = False
        self.proc = None

    def _popen_args(self):
        """Return the keyword arguments used to spawn the child process."""
        args = {
            'cwd': self.cwd,
            'env': self.env,
            'stdout': self.report.out,
            'stderr': subprocess.STDOUT,
        }
        # Make sure child processes do not remain after killing: give the
        # child its own process group/session so the whole group can be
        # signalled at once.
        if platform.system() == 'Windows':
            DETACHED_PROCESS = 0x00000008
            args.update(creationflags=DETACHED_PROCESS | subprocess.CREATE_NEW_PROCESS_GROUP)
        else:
            args.update(preexec_fn=os.setsid)
        return args

    def start(self):
        """Spawn the process and return a context manager that stops it.

        Returns:
            A context manager yielding this object; leaving the ``with``
            block runs the shutdown/reporting logic of ``_scoped``.
        """
        joined = str_join(' ', self.cmd)
        self._log.debug('COMMAND: %s', joined)
        self._log.debug('WORKDIR: %s', self.cwd)
        self._log.debug('LOGFILE: %s', self.report.logpath)
        self.report.begin()
        self.proc = subprocess.Popen(self.cmd, **self._popen_args())
        self._log.debug('    PID: %d', self.proc.pid)
        # os.getpgid() only exists on Unix; calling it unconditionally made
        # start() fail on Windows although the rest of the class supports it.
        if platform.system() != 'Windows':
            self._log.debug('   PGID: %d', os.getpgid(self.proc.pid))
        return self._scoped()

    @contextlib.contextmanager
    def _scoped(self):
        """Yield this context, then stop the process and report the outcome.

        The cleanup runs in a ``finally`` clause so that the child process is
        also stopped when the body of the ``with`` statement raises.
        """
        try:
            yield self
        finally:
            if self.is_server:
                # the server is supposed to run until we stop it
                if self.returncode is not None:
                    self.report.died()
                else:
                    if self.stop_signal != SIGNONE:
                        # NOTE(review): sigwait() already calls report.end()
                        # when it returns True, so end() runs twice here.
                        if self.sigwait(self.stop_signal):
                            self.report.end(self.returncode)
                        else:
                            self.report.killed()
                    else:
                        self.sigwait(SIGKILL)
            else:
                # the client is supposed to exit normally
                if self.returncode is not None:
                    self.report.end(self.returncode)
                else:
                    self.sigwait(SIGKILL)
                    self.report.killed()
            self._log.debug('[{0}] exited with return code {1}'.format(self.proc.pid, self.returncode))

    def sigwait(self, sig=SIGKILL, timeout=2):
        """Send a signal to the process and then wait for it to end.

        If ``sig`` is ``SIGNONE`` no signal is sent and we just wait for the
        process to end; if it does not end within ``timeout`` it is marked as
        expired.  If the process fails to end and the signal was not
        ``SIGKILL``, the call is repeated with ``SIGKILL`` (1 second timeout)
        so that a real process kill occurs.

        Args:
            sig: Signal number to send, or ``SIGNONE``.
            timeout: Seconds to wait for the process to end.

        Returns:
            True if the process ended, False if it may not have.
        """
        try:
            if sig != SIGNONE:
                self._log.debug('[{0}] send signal {1}'.format(self.proc.pid, sig))
                if sig == SIGKILL:
                    self.killed = True
                try:
                    if platform.system() != 'Windows':
                        # signal the whole process group created in _popen_args
                        os.killpg(os.getpgid(self.proc.pid), sig)
                    else:
                        self.proc.send_signal(sig)
                except Exception:
                    # best effort: the process may already be gone
                    self._log.info('[{0}] Failed to kill process'.format(self.proc.pid), exc_info=sys.exc_info())
            self._log.debug('[{0}] wait begin, timeout {1} sec(s)'.format(self.proc.pid, timeout))
            self.proc.communicate(timeout=timeout)
            self._log.debug('[{0}] process ended with return code {1}'.format(self.proc.pid, self.returncode))
            self.report.end(self.returncode)
            return True
        except subprocess.TimeoutExpired:
            self._log.info('[{0}] timeout waiting for process to end'.format(self.proc.pid))
            if sig == SIGNONE:
                self.expired = True
            return False if sig == SIGKILL else self.sigwait(SIGKILL, 1)

    def wait(self, timeout):
        """Wait up to ``timeout`` seconds for the (client) process to end naturally."""
        self.sigwait(SIGNONE, timeout)

    @property
    def returncode(self):
        """Return code recorded on ``proc``, or None if not started/not ended."""
        return self.proc.returncode if self.proc else None
|  |  | 
|  |  | 
def exec_context(port, logdir, test, prog, is_server):
    """Create the ExecutionContext that runs ``prog`` as part of ``test``.

    Args:
        port: Port number handed to ``prog.build_command``.
        logdir: Log directory passed to the ExecReporter.
        test: The test entry the program belongs to.
        prog: Program description providing ``build_command``, ``command``,
            ``workdir``, ``env`` and ``stop_signal``.
        is_server: True when ``prog`` is the server side of the test.

    Returns:
        A not-yet-started ExecutionContext with its own ExecReporter.
    """
    reporter = ExecReporter(logdir, test, prog)
    # Build the command for this port before prog.command is read below
    # (presumably build_command populates it -- confirm in the program class).
    prog.build_command(port)
    return ExecutionContext(
        cmd=prog.command,
        cwd=prog.workdir,
        env=prog.env,
        stop_signal=prog.stop_signal,
        is_server=is_server,
        report=reporter,
    )
|  |  | 
|  |  | 
def run_test(testdir, logdir, test_dict, max_retry, async_mode=True):
    """Run one server/client test pair, retrying on failure.

    Relies on the module globals ``ports`` (port allocator) and ``stop``
    (shutdown event) having been installed beforehand.

    Args:
        testdir: Test directory passed to ``TestEntry``.
        logdir: Directory passed to the per-process reporters.
        test_dict: Keyword arguments used to build the ``TestEntry``.
        max_retry: Maximum number of retries after a failed run.
        async_mode: When False, exceptions are re-raised to the caller
            instead of being converted into an error result.

    Returns:
        A ``(retry_count, result)`` tuple.  ``result`` is None when the test
        was skipped because a shutdown is in progress; otherwise it is the
        client's return code, ``RESULT_TIMEOUT`` or ``RESULT_ERROR``, possibly
        OR-ed with ``RESULT_ERROR`` when the server did not stop as expected.
    """
    logger = multiprocessing.get_logger()

    def ensure_socket_open(sv, port, test):
        """Check whether the server's socket is available; return True if so."""
        slept = 0.1
        time.sleep(slept)
        sleep_step = 0.1
        # NOTE(review): every path through this loop body returns, so the
        # socket is probed at most once after the initial sleep.  A
        # poll-until-test.delay loop looks intended -- confirm before changing.
        while True:
            if slept > test.delay:
                logger.warning('[{0}] slept for {1} seconds but server is not open'.format(sv.proc.pid, slept))
                return False
            if test.socket == 'domain':
                if not os.path.exists(domain_socket_path(port)):
                    logger.debug('[{0}] domain(unix) socket not available yet. slept for {1} seconds so far'.format(sv.proc.pid, slept))
                    time.sleep(sleep_step)
                    slept += sleep_step
            elif test.socket == 'abstract':
                return True
            else:
                # Create sockets every iteration because refused sockets cannot be
                # reused on some systems.
                sock4 = socket.socket()
                sock6 = socket.socket(family=socket.AF_INET6)
                try:
                    if sock4.connect_ex(('127.0.0.1', port)) == 0 \
                            or sock6.connect_ex(('::1', port)) == 0:
                        return True
                    if sv.proc.poll() is not None:
                        logger.warning('[{0}] server process is exited'.format(sv.proc.pid))
                        return False
                    logger.debug('[{0}] socket not available yet. slept for {1} seconds so far'.format(sv.proc.pid, slept))
                    time.sleep(sleep_step)
                    slept += sleep_step
                finally:
                    sock4.close()
                    sock6.close()
            logger.debug('[{0}] server ready - waited for {1} seconds'.format(sv.proc.pid, slept))
            return True

    try:
        max_bind_retry = 3
        retry_count = 0
        bind_retry_count = 0
        test = TestEntry(testdir, **test_dict)
        while True:
            if stop.is_set():
                logger.debug('Skipping because shutting down')
                return (retry_count, None)
            logger.debug('Start')
            with PortAllocator.alloc_port_scoped(ports, test.socket) as port:
                logger.debug('Start with port %d' % port)
                sv = exec_context(port, logdir, test, test.server, True)
                cl = exec_context(port, logdir, test, test.client, False)

                logger.debug('Starting server')
                with sv.start():
                    port_ok = ensure_socket_open(sv, port, test)
                    if port_ok:
                        connect_retry_count = 0
                        max_connect_retry = 12
                        connect_retry_wait = 0.25
                        while True:
                            if sv.proc.poll() is not None:
                                logger.info('not starting client because server process is absent')
                                break
                            logger.debug('Starting client')
                            cl.start()
                            logger.debug('Waiting client (up to %d secs)' % test.timeout)
                            cl.wait(test.timeout)
                            if not cl.report.maybe_false_positive() or connect_retry_count >= max_connect_retry:
                                if connect_retry_count > 0 and connect_retry_count < max_connect_retry:
                                    logger.info('[%s]: Connected after %d retry (%.2f sec each)' % (test.server.name, connect_retry_count, connect_retry_wait))
                                # Wait for 50ms to see if server does not die at the end.
                                time.sleep(0.05)
                                break
                            logger.debug('Server may not be ready, waiting %.2f second...' % connect_retry_wait)
                            time.sleep(connect_retry_wait)
                            connect_retry_count += 1

            # The server has been stopped by now (leaving ``with sv.start()``),
            # so sv.killed / sv.returncode reflect how it went down.
            if sv.report.maybe_false_positive() and bind_retry_count < max_bind_retry:
                logger.warning('[%s]: Detected socket bind failure, retrying...', test.server.name)
                bind_retry_count += 1
            else:
                if cl.expired:
                    result = RESULT_TIMEOUT
                elif (cl.proc and cl.proc.poll()) is not None:
                    result = cl.returncode
                else:
                    # client never started or is still running
                    result = RESULT_ERROR

                # For servers that handle a controlled shutdown by signal
                # if they are killed, or return an error code, that is a
                # problem.  For servers that are not signal-aware, we simply
                # kill them off; if we didn't kill them off, something else
                # happened (crashed?)
                if test.server.stop_signal != 0:
                    # for bash scripts, 128+N is the exit code for signal N, since we are sending
                    # DEFAULT_SIGNAL=1, 128 + 1 is the expected err code
                    # http://www.gnu.org/software/bash/manual/html_node/Exit-Status.html
                    allowed_return_code = {-1, 0, 128 + 1}
                    if sv.killed or sv.returncode not in allowed_return_code:
                        result |= RESULT_ERROR
                else:
                    if not sv.killed:
                        result |= RESULT_ERROR

                if result == 0 or retry_count >= max_retry:
                    return (retry_count, result)
                else:
                    logger.info('[%s-%s]: test failed, retrying...', test.server.name, test.client.name)
                    retry_count += 1
    except KeyboardInterrupt:
        # This handler used to be a second ``except Exception`` clause and was
        # therefore unreachable; an interrupt asks all workers to stop.
        logger.info('Interrupted execution', exc_info=True)
        if not async_mode:
            raise
        stop.set()
        return (retry_count, RESULT_ERROR)
    except Exception:
        if not async_mode:
            raise
        logger.warning('Error executing [%s]', test.name, exc_info=True)
        return (retry_count, RESULT_ERROR)
|  |  | 
|  |  | 
class PortAllocator(object):
    """Hand out port numbers so that concurrent tests do not collide.

    TCP ports are obtained by binding a throw-away socket to port 0; "domain"
    and "abstract" socket types get a random number that is only used as a
    unique identifier.  Allocated numbers are remembered until ``free_port``
    is called for them.
    """

    def __init__(self):
        self._log = multiprocessing.get_logger()
        self._lock = multiprocessing.Lock()
        self._ports = set()       # TCP ports currently handed out
        self._dom_ports = set()   # domain/abstract identifiers handed out
        self._last_alloc = 0      # time.time() of the last TCP allocation

    def _get_tcp_port(self):
        """Return a TCP port chosen by the OS that is not already handed out."""
        while True:
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                sock.bind(('', 0))
                port = sock.getsockname()[1]
                with self._lock:
                    if port not in self._ports:
                        self._ports.add(port)
                        self._last_alloc = time.time()
                        return port
            finally:
                # always release the probe socket, also when bind() fails
                sock.close()

    def _get_domain_port(self):
        """Return a random identifier that is not already handed out."""
        while True:
            # randint() is inclusive on both ends; stay within 16-bit range
            port = random.randint(1024, 65535)
            with self._lock:
                if port not in self._dom_ports:
                    self._dom_ports.add(port)
                    return port

    def alloc_port(self, socket_type):
        """Allocate a port number for the given socket type.

        Args:
            socket_type: 'domain' or 'abstract' for an identifier from the
                domain pool; anything else allocates a TCP port.

        Returns:
            The allocated port number.
        """
        if socket_type in ('domain', 'abstract'):
            return self._get_domain_port()
        else:
            return self._get_tcp_port()

    # static method for inter-process invocation
    @staticmethod
    @contextlib.contextmanager
    def alloc_port_scoped(allocator, socket_type):
        """Context manager yielding a port that is freed on exit.

        Args:
            allocator: Object providing ``alloc_port`` and ``free_port``.
            socket_type: Socket type forwarded to the allocator.

        The port is released in a ``finally`` clause so it is not leaked when
        the body of the ``with`` statement raises.
        """
        port = allocator.alloc_port(socket_type)
        try:
            yield port
        finally:
            allocator.free_port(socket_type, port)

    def free_port(self, socket_type, port):
        """Return a previously allocated port to the pool.

        For 'domain' sockets the socket file is removed as well if it exists.

        Raises:
            KeyError: If ``port`` is not currently allocated for that type.
        """
        self._log.debug('free_port')
        with self._lock:
            try:
                if socket_type == 'domain':
                    self._dom_ports.remove(port)
                    path = domain_socket_path(port)
                    if os.path.exists(path):
                        os.remove(path)
                elif socket_type == 'abstract':
                    self._dom_ports.remove(port)
                else:
                    self._ports.remove(port)
            except IOError:
                self._log.info('Error while freeing port', exc_info=sys.exc_info())
|  |  | 
|  |  | 
class NonAsyncResult(object):
    """Already-completed result object for tests that ran synchronously.

    Offers the ``get`` / ``wait`` / ``ready`` / ``successful`` methods so a
    synchronous run can be handled like the object returned by
    ``Pool.apply_async``.
    """

    def __init__(self, value):
        # The value is known up front; nothing is computed later.
        self._value = value

    def get(self, timeout=None):
        """Return the stored value; ``timeout`` is accepted but ignored."""
        stored = self._value
        return stored

    def wait(self, timeout=None):
        """Do nothing: the result is available immediately."""
        return None

    def ready(self):
        """Always True, since the value was supplied at construction."""
        return True

    def successful(self):
        """Return whether the stored value compares equal to 0."""
        is_zero = self._value == 0
        return is_zero
|  |  | 
|  |  | 
class TestDispatcher(object):
    """Dispatch tests to ``run_test`` and collect results in a SummaryReporter.

    With ``concurrency > 1`` tests run in a ``multiprocessing.Pool`` whose
    workers share one PortAllocator served by a ``BaseManager``; otherwise
    tests run inline in the calling process.

    Args:
        testdir: Test directory forwarded to ``run_test``.
        basedir: Base directory passed to the SummaryReporter.
        logdir_rel: Relative log directory passed to the SummaryReporter.
        concurrency: Number of concurrent jobs.
    """

    def __init__(self, testdir, basedir, logdir_rel, concurrency):
        global stop
        global ports
        parallel = concurrency > 1
        self._log = multiprocessing.get_logger()
        self.testdir = testdir
        self._report = SummaryReporter(basedir, logdir_rel, parallel)
        self.logdir = self._report.testdir
        # seems needed for python 2.x to handle keyboard interrupt
        self._stop = multiprocessing.Event()
        self._async = parallel
        if parallel:
            # Serve a single PortAllocator from a manager process; each pool
            # worker connects to it in _pool_init.
            self._m = multiprocessing.managers.BaseManager()
            self._m.register('ports', PortAllocator)
            self._m.start()
            self._pool = multiprocessing.Pool(concurrency, self._pool_init, (self._m.address,))
        else:
            # Tests run in this process, so install the globals right here.
            self._pool = None
            stop = self._stop
            ports = PortAllocator()
        self._log.debug(
            'TestDispatcher started with %d concurrent jobs' % concurrency)

    def _pool_init(self, address):
        """Pool initializer: install the module globals used by ``run_test``."""
        global stop
        global m
        global ports
        stop = self._stop
        # ``m`` is stored in a module global as well as ``ports``.
        m = multiprocessing.managers.BaseManager(address)
        m.connect()
        ports = m.ports()

    def _dispatch_sync(self, test, cont, max_retry):
        """Run ``test`` inline, feed the result to ``cont`` and wrap it."""
        outcome = run_test(self.testdir, self.logdir, test, max_retry, async_mode=False)
        cont(outcome)
        return NonAsyncResult(outcome)

    def _dispatch_async(self, test, cont, max_retry):
        """Submit ``test`` to the pool with ``cont`` as completion callback."""
        self._log.debug('_dispatch_async')
        job_args = (self.testdir, self.logdir, test, max_retry)
        return self._pool.apply_async(func=run_test, args=job_args, callback=cont)

    def dispatch(self, test, max_retry):
        """Register ``test`` with the report and schedule its execution.

        Returns:
            The pool's async result in async mode, otherwise a NonAsyncResult.
        """
        index = self._report.add_test(test)

        def cont(result):
            # Results that arrive after a stop request are dropped.
            if self._stop.is_set():
                return
            if result and len(result) == 2:
                retry_count, returncode = result
            else:
                retry_count, returncode = 0, RESULT_ERROR
            self._log.debug('freeing port')
            self._log.debug('adding result')
            expired = returncode == RESULT_TIMEOUT
            self._report.add_result(index, returncode, expired, retry_count)
            self._log.debug('finish continuation')

        if self._async:
            return self._dispatch_async(test, cont, max_retry)
        return self._dispatch_sync(test, cont, max_retry)

    def _join_workers(self):
        """Wait for the pool to finish, then shut the manager down."""
        self._pool.join()
        self._m.shutdown()

    def wait(self):
        """Wait for all dispatched tests and return the report's ``end()`` value."""
        if self._async:
            self._pool.close()
            self._join_workers()
        return self._report.end()

    def terminate(self):
        """Signal a stop and, in async mode, terminate the worker pool."""
        self._stop.set()
        if self._async:
            self._pool.terminate()
            self._join_workers()