|  | # | 
|  | # Licensed to the Apache Software Foundation (ASF) under one | 
|  | # or more contributor license agreements. See the NOTICE file | 
|  | # distributed with this work for additional information | 
|  | # regarding copyright ownership. The ASF licenses this file | 
|  | # to you under the Apache License, Version 2.0 (the | 
|  | # "License"); you may not use this file except in compliance | 
|  | # with the License. You may obtain a copy of the License at | 
|  | # | 
|  | #   http://www.apache.org/licenses/LICENSE-2.0 | 
|  | # | 
|  | # Unless required by applicable law or agreed to in writing, | 
|  | # software distributed under the License is distributed on an | 
|  | # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | 
|  | # KIND, either express or implied. See the License for the | 
|  | # specific language governing permissions and limitations | 
|  | # under the License. | 
|  | # | 
|  |  | 
|  | import contextlib | 
|  | import multiprocessing | 
|  | import multiprocessing.managers | 
|  | import os | 
|  | import platform | 
|  | import random | 
|  | import signal | 
|  | import socket | 
|  | import subprocess | 
|  | import sys | 
|  | import threading | 
|  | import time | 
|  |  | 
|  | from .compat import str_join | 
|  | from .test import TestEntry, domain_socket_path | 
|  | from .report import ExecReporter, SummaryReporter | 
|  |  | 
|  | RESULT_TIMEOUT = 128 | 
|  | RESULT_ERROR = 64 | 
|  |  | 
|  |  | 
|  | class ExecutionContext(object): | 
|  | def __init__(self, cmd, cwd, env, report): | 
|  | self._log = multiprocessing.get_logger() | 
|  | self.report = report | 
|  | self.cmd = cmd | 
|  | self.cwd = cwd | 
|  | self.env = env | 
|  | self.timer = None | 
|  | self.expired = False | 
|  | self.killed = False | 
|  | self.proc = None | 
|  |  | 
|  | def _expire(self): | 
|  | self._log.info('Timeout') | 
|  | self.expired = True | 
|  | self.kill() | 
|  |  | 
|  | def kill(self): | 
|  | self._log.debug('Killing process : %d' % self.proc.pid) | 
|  | self.killed = True | 
|  | if platform.system() != 'Windows': | 
|  | try: | 
|  | os.killpg(self.proc.pid, signal.SIGKILL) | 
|  | except Exception: | 
|  | self._log.info('Failed to kill process group', exc_info=sys.exc_info()) | 
|  | try: | 
|  | self.proc.kill() | 
|  | except Exception: | 
|  | self._log.info('Failed to kill process', exc_info=sys.exc_info()) | 
|  |  | 
|  | def _popen_args(self): | 
|  | args = { | 
|  | 'cwd': self.cwd, | 
|  | 'env': self.env, | 
|  | 'stdout': self.report.out, | 
|  | 'stderr': subprocess.STDOUT, | 
|  | } | 
|  | # make sure child processes doesn't remain after killing | 
|  | if platform.system() == 'Windows': | 
|  | DETACHED_PROCESS = 0x00000008 | 
|  | args.update(creationflags=DETACHED_PROCESS | subprocess.CREATE_NEW_PROCESS_GROUP) | 
|  | else: | 
|  | args.update(preexec_fn=os.setsid) | 
|  | return args | 
|  |  | 
|  | def start(self, timeout=0): | 
|  | joined = str_join(' ', self.cmd) | 
|  | self._log.debug('COMMAND: %s', joined) | 
|  | self._log.debug('WORKDIR: %s', self.cwd) | 
|  | self._log.debug('LOGFILE: %s', self.report.logpath) | 
|  | self.report.begin() | 
|  | self.proc = subprocess.Popen(self.cmd, **self._popen_args()) | 
|  | if timeout > 0: | 
|  | self.timer = threading.Timer(timeout, self._expire) | 
|  | self.timer.start() | 
|  | return self._scoped() | 
|  |  | 
|  | @contextlib.contextmanager | 
|  | def _scoped(self): | 
|  | yield self | 
|  | self._log.debug('Killing scoped process') | 
|  | if self.proc.poll() is None: | 
|  | self.kill() | 
|  | self.report.killed() | 
|  | else: | 
|  | self._log.debug('Process died unexpectedly') | 
|  | self.report.died() | 
|  |  | 
|  | def wait(self): | 
|  | self.proc.communicate() | 
|  | if self.timer: | 
|  | self.timer.cancel() | 
|  | self.report.end(self.returncode) | 
|  |  | 
|  | @property | 
|  | def returncode(self): | 
|  | return self.proc.returncode if self.proc else None | 
|  |  | 
|  |  | 
|  | def exec_context(port, logdir, test, prog): | 
|  | report = ExecReporter(logdir, test, prog) | 
|  | prog.build_command(port) | 
|  | return ExecutionContext(prog.command, prog.workdir, prog.env, report) | 
|  |  | 
|  |  | 
|  | def run_test(testdir, logdir, test_dict, max_retry, async=True): | 
|  | logger = multiprocessing.get_logger() | 
|  |  | 
|  | def ensure_socket_open(proc, port, max_delay): | 
|  | sleeped = 0.1 | 
|  | time.sleep(sleeped) | 
|  | sock4 = socket.socket() | 
|  | sock6 = socket.socket(family=socket.AF_INET6) | 
|  | sleep_step = 0.2 | 
|  | try: | 
|  | while sock4.connect_ex(('127.0.0.1', port)) and sock6.connect_ex(('::1', port)): | 
|  | if proc.poll() is not None: | 
|  | logger.warn('server process is exited') | 
|  | return False | 
|  | if sleeped > max_delay: | 
|  | logger.warn('sleeped for %f seconds but server port is not open' % sleeped) | 
|  | return False | 
|  | time.sleep(sleep_step) | 
|  | sleeped += sleep_step | 
|  | logger.debug('waited %f sec for server port open' % sleeped) | 
|  | return True | 
|  | finally: | 
|  | sock4.close() | 
|  | sock6.close() | 
|  |  | 
|  | try: | 
|  | max_bind_retry = 3 | 
|  | retry_count = 0 | 
|  | bind_retry_count = 0 | 
|  | test = TestEntry(testdir, **test_dict) | 
|  | while True: | 
|  | if stop.is_set(): | 
|  | logger.debug('Skipping because shutting down') | 
|  | return (retry_count, None) | 
|  | logger.debug('Start') | 
|  | with PortAllocator.alloc_port_scoped(ports, test.socket) as port: | 
|  | logger.debug('Start with port %d' % port) | 
|  | sv = exec_context(port, logdir, test, test.server) | 
|  | cl = exec_context(port, logdir, test, test.client) | 
|  |  | 
|  | logger.debug('Starting server') | 
|  | with sv.start(): | 
|  | if test.socket in ('domain', 'abstract'): | 
|  | time.sleep(0.1) | 
|  | else: | 
|  | if not ensure_socket_open(sv.proc, port, test.delay): | 
|  | break | 
|  | connect_retry_count = 0 | 
|  | max_connect_retry = 3 | 
|  | connect_retry_wait = 0.5 | 
|  | while True: | 
|  | if sv.proc.poll() is not None: | 
|  | logger.info('not starting client because server process is absent') | 
|  | break | 
|  | logger.debug('Starting client') | 
|  | cl.start(test.timeout) | 
|  | logger.debug('Waiting client') | 
|  | cl.wait() | 
|  | if not cl.report.maybe_false_positive() or connect_retry_count >= max_connect_retry: | 
|  | if connect_retry_count > 0 and connect_retry_count < max_connect_retry: | 
|  | logger.warn('[%s]: Connected after %d retry (%.2f sec each)' % (test.server.name, connect_retry_count, connect_retry_wait)) | 
|  | # Wait for 50ms to see if server does not die at the end. | 
|  | time.sleep(0.05) | 
|  | break | 
|  | logger.debug('Server may not be ready, waiting %.2f second...' % connect_retry_wait) | 
|  | time.sleep(connect_retry_wait) | 
|  | connect_retry_count += 1 | 
|  |  | 
|  | if sv.report.maybe_false_positive() and bind_retry_count < max_bind_retry: | 
|  | logger.warn('[%s]: Detected socket bind failure, retrying...', test.server.name) | 
|  | bind_retry_count += 1 | 
|  | else: | 
|  | if cl.expired: | 
|  | result = RESULT_TIMEOUT | 
|  | else: | 
|  | result = cl.proc.returncode if cl.proc else RESULT_ERROR | 
|  | if not sv.killed: | 
|  | # Server died without being killed. | 
|  | result |= RESULT_ERROR | 
|  |  | 
|  | if result == 0 or retry_count >= max_retry: | 
|  | return (retry_count, result) | 
|  | else: | 
|  | logger.info('[%s-%s]: test failed, retrying...', test.server.name, test.client.name) | 
|  | retry_count += 1 | 
|  | except Exception: | 
|  | if not async: | 
|  | raise | 
|  | logger.warn('Error executing [%s]', test.name, exc_info=True) | 
|  | return (retry_count, RESULT_ERROR) | 
|  | except: | 
|  | logger.info('Interrupted execution', exc_info=True) | 
|  | if not async: | 
|  | raise | 
|  | stop.set() | 
|  | return (retry_count, RESULT_ERROR) | 
|  |  | 
|  |  | 
|  | class PortAllocator(object): | 
|  | def __init__(self): | 
|  | self._log = multiprocessing.get_logger() | 
|  | self._lock = multiprocessing.Lock() | 
|  | self._ports = set() | 
|  | self._dom_ports = set() | 
|  | self._last_alloc = 0 | 
|  |  | 
|  | def _get_tcp_port(self): | 
|  | sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) | 
|  | sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) | 
|  | sock.bind(('', 0)) | 
|  | port = sock.getsockname()[1] | 
|  | self._lock.acquire() | 
|  | try: | 
|  | ok = port not in self._ports | 
|  | if ok: | 
|  | self._ports.add(port) | 
|  | self._last_alloc = time.time() | 
|  | finally: | 
|  | self._lock.release() | 
|  | sock.close() | 
|  | return port if ok else self._get_tcp_port() | 
|  |  | 
|  | def _get_domain_port(self): | 
|  | port = random.randint(1024, 65536) | 
|  | self._lock.acquire() | 
|  | try: | 
|  | ok = port not in self._dom_ports | 
|  | if ok: | 
|  | self._dom_ports.add(port) | 
|  | finally: | 
|  | self._lock.release() | 
|  | return port if ok else self._get_domain_port() | 
|  |  | 
|  | def alloc_port(self, socket_type): | 
|  | if socket_type in ('domain', 'abstract'): | 
|  | return self._get_domain_port() | 
|  | else: | 
|  | return self._get_tcp_port() | 
|  |  | 
|  | # static method for inter-process invokation | 
|  | @staticmethod | 
|  | @contextlib.contextmanager | 
|  | def alloc_port_scoped(allocator, socket_type): | 
|  | port = allocator.alloc_port(socket_type) | 
|  | yield port | 
|  | allocator.free_port(socket_type, port) | 
|  |  | 
|  | def free_port(self, socket_type, port): | 
|  | self._log.debug('free_port') | 
|  | self._lock.acquire() | 
|  | try: | 
|  | if socket_type == 'domain': | 
|  | self._dom_ports.remove(port) | 
|  | path = domain_socket_path(port) | 
|  | if os.path.exists(path): | 
|  | os.remove(path) | 
|  | elif socket_type == 'abstract': | 
|  | self._dom_ports.remove(port) | 
|  | else: | 
|  | self._ports.remove(port) | 
|  | except IOError: | 
|  | self._log.info('Error while freeing port', exc_info=sys.exc_info()) | 
|  | finally: | 
|  | self._lock.release() | 
|  |  | 
|  |  | 
|  | class NonAsyncResult(object): | 
|  | def __init__(self, value): | 
|  | self._value = value | 
|  |  | 
|  | def get(self, timeout=None): | 
|  | return self._value | 
|  |  | 
|  | def wait(self, timeout=None): | 
|  | pass | 
|  |  | 
|  | def ready(self): | 
|  | return True | 
|  |  | 
|  | def successful(self): | 
|  | return self._value == 0 | 
|  |  | 
|  |  | 
|  | class TestDispatcher(object): | 
|  | def __init__(self, testdir, basedir, logdir_rel, concurrency): | 
|  | self._log = multiprocessing.get_logger() | 
|  | self.testdir = testdir | 
|  | self._report = SummaryReporter(basedir, logdir_rel, concurrency > 1) | 
|  | self.logdir = self._report.testdir | 
|  | # seems needed for python 2.x to handle keyboard interrupt | 
|  | self._stop = multiprocessing.Event() | 
|  | self._async = concurrency > 1 | 
|  | if not self._async: | 
|  | self._pool = None | 
|  | global stop | 
|  | global ports | 
|  | stop = self._stop | 
|  | ports = PortAllocator() | 
|  | else: | 
|  | self._m = multiprocessing.managers.BaseManager() | 
|  | self._m.register('ports', PortAllocator) | 
|  | self._m.start() | 
|  | self._pool = multiprocessing.Pool(concurrency, self._pool_init, (self._m.address,)) | 
|  | self._log.debug( | 
|  | 'TestDispatcher started with %d concurrent jobs' % concurrency) | 
|  |  | 
|  | def _pool_init(self, address): | 
|  | global stop | 
|  | global m | 
|  | global ports | 
|  | stop = self._stop | 
|  | m = multiprocessing.managers.BaseManager(address) | 
|  | m.connect() | 
|  | ports = m.ports() | 
|  |  | 
|  | def _dispatch_sync(self, test, cont, max_retry): | 
|  | r = run_test(self.testdir, self.logdir, test, max_retry, False) | 
|  | cont(r) | 
|  | return NonAsyncResult(r) | 
|  |  | 
|  | def _dispatch_async(self, test, cont, max_retry): | 
|  | self._log.debug('_dispatch_async') | 
|  | return self._pool.apply_async(func=run_test, args=(self.testdir, self.logdir, test, max_retry), callback=cont) | 
|  |  | 
|  | def dispatch(self, test, max_retry): | 
|  | index = self._report.add_test(test) | 
|  |  | 
|  | def cont(result): | 
|  | if not self._stop.is_set(): | 
|  | if result and len(result) == 2: | 
|  | retry_count, returncode = result | 
|  | else: | 
|  | retry_count = 0 | 
|  | returncode = RESULT_ERROR | 
|  | self._log.debug('freeing port') | 
|  | self._log.debug('adding result') | 
|  | self._report.add_result(index, returncode, returncode == RESULT_TIMEOUT, retry_count) | 
|  | self._log.debug('finish continuation') | 
|  | fn = self._dispatch_async if self._async else self._dispatch_sync | 
|  | return fn(test, cont, max_retry) | 
|  |  | 
|  | def wait(self): | 
|  | if self._async: | 
|  | self._pool.close() | 
|  | self._pool.join() | 
|  | self._m.shutdown() | 
|  | return self._report.end() | 
|  |  | 
|  | def terminate(self): | 
|  | self._stop.set() | 
|  | if self._async: | 
|  | self._pool.terminate() | 
|  | self._pool.join() | 
|  | self._m.shutdown() |