THRIFT-3596 Better conformance to PEP8
This closes #832
diff --git a/test/crossrunner/run.py b/test/crossrunner/run.py
index 68bd928..18c1623 100644
--- a/test/crossrunner/run.py
+++ b/test/crossrunner/run.py
@@ -39,307 +39,307 @@
class ExecutionContext(object):
-  def __init__(self, cmd, cwd, env, report):
-    self._log = multiprocessing.get_logger()
-    self.report = report
-    self.cmd = cmd
-    self.cwd = cwd
-    self.env = env
-    self.timer = None
-    self.expired = False
-    self.killed = False
+    def __init__(self, cmd, cwd, env, report):
+        self._log = multiprocessing.get_logger()
+        self.report = report
+        self.cmd = cmd
+        self.cwd = cwd
+        self.env = env
+        self.timer = None
+        self.expired = False
+        self.killed = False
-  def _expire(self):
-    self._log.info('Timeout')
-    self.expired = True
-    self.kill()
+    def _expire(self):
+        self._log.info('Timeout')
+        self.expired = True
+        self.kill()
-  def kill(self):
-    self._log.debug('Killing process : %d' % self.proc.pid)
-    self.killed = True
-    if platform.system() != 'Windows':
-      try:
-        os.killpg(self.proc.pid, signal.SIGKILL)
-      except Exception:
-        self._log.info('Failed to kill process group', exc_info=sys.exc_info())
-    try:
-      self.proc.kill()
-    except Exception:
-      self._log.info('Failed to kill process', exc_info=sys.exc_info())
+    def kill(self):
+        self._log.debug('Killing process : %d' % self.proc.pid)
+        self.killed = True
+        if platform.system() != 'Windows':
+            try:
+                os.killpg(self.proc.pid, signal.SIGKILL)
+            except Exception:
+                self._log.info('Failed to kill process group', exc_info=sys.exc_info())
+        try:
+            self.proc.kill()
+        except Exception:
+            self._log.info('Failed to kill process', exc_info=sys.exc_info())
-  def _popen_args(self):
-    args = {
-      'cwd': self.cwd,
-      'env': self.env,
-      'stdout': self.report.out,
-      'stderr': subprocess.STDOUT,
-    }
-    # make sure child processes doesn't remain after killing
-    if platform.system() == 'Windows':
-      DETACHED_PROCESS = 0x00000008
-      args.update(creationflags=DETACHED_PROCESS | subprocess.CREATE_NEW_PROCESS_GROUP)
-    else:
-      args.update(preexec_fn=os.setsid)
-    return args
+    def _popen_args(self):
+        args = {
+            'cwd': self.cwd,
+            'env': self.env,
+            'stdout': self.report.out,
+            'stderr': subprocess.STDOUT,
+        }
+        # make sure child processes doesn't remain after killing
+        if platform.system() == 'Windows':
+            DETACHED_PROCESS = 0x00000008
+            args.update(creationflags=DETACHED_PROCESS | subprocess.CREATE_NEW_PROCESS_GROUP)
+        else:
+            args.update(preexec_fn=os.setsid)
+        return args
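The `_popen_args`/`kill` pair above leans on process groups so that no grandchildren survive a kill: on POSIX, `preexec_fn=os.setsid` puts the child into its own session (whose group ID equals the child's PID) and `os.killpg` later tears down the whole group, while on Windows the detached/new-process-group creation flags serve a similar purpose. A minimal POSIX-only sketch of the same pattern, using a throwaway shell command rather than the crossrunner programs:

```python
import os
import signal
import subprocess
import time

# The shell forks its own children; os.setsid makes the shell the leader of a
# new process group whose pgid equals proc.pid.
proc = subprocess.Popen(['sh', '-c', 'sleep 60 & sleep 60 & wait'],
                        preexec_fn=os.setsid)
time.sleep(1)

try:
    os.killpg(proc.pid, signal.SIGKILL)   # kills the shell and both sleeps
except OSError:
    pass                                  # the group may already be gone
proc.wait()                               # reap the group leader
```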
-  def start(self, timeout=0):
-    joined = str_join(' ', self.cmd)
-    self._log.debug('COMMAND: %s', joined)
-    self._log.debug('WORKDIR: %s', self.cwd)
-    self._log.debug('LOGFILE: %s', self.report.logpath)
-    self.report.begin()
-    self.proc = subprocess.Popen(self.cmd, **self._popen_args())
-    if timeout > 0:
-      self.timer = threading.Timer(timeout, self._expire)
-      self.timer.start()
-    return self._scoped()
+    def start(self, timeout=0):
+        joined = str_join(' ', self.cmd)
+        self._log.debug('COMMAND: %s', joined)
+        self._log.debug('WORKDIR: %s', self.cwd)
+        self._log.debug('LOGFILE: %s', self.report.logpath)
+        self.report.begin()
+        self.proc = subprocess.Popen(self.cmd, **self._popen_args())
+        if timeout > 0:
+            self.timer = threading.Timer(timeout, self._expire)
+            self.timer.start()
+        return self._scoped()
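`start()` arms a `threading.Timer` that fires `_expire` (and therefore `kill`) if the process outlives its budget, and `wait()` cancels the timer on normal completion. The same watchdog pattern in isolation, with a plain `sleep` standing in for the test client:

```python
import subprocess
import threading

proc = subprocess.Popen(['sleep', '30'])
expired = []

def _expire():
    expired.append(True)   # remember that the deadline was hit
    proc.kill()

timer = threading.Timer(2.0, _expire)   # two-second budget
timer.start()
proc.wait()
timer.cancel()                          # no-op if the timer already fired

print('timed out' if expired else 'finished in time')
```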
-  @contextlib.contextmanager
-  def _scoped(self):
-    yield self
-    self._log.debug('Killing scoped process')
-    if self.proc.poll() is None:
-      self.kill()
-      self.report.killed()
-    else:
-      self._log.debug('Process died unexpectedly')
-      self.report.died()
+    @contextlib.contextmanager
+    def _scoped(self):
+        yield self
+        self._log.debug('Killing scoped process')
+        if self.proc.poll() is None:
+            self.kill()
+            self.report.killed()
+        else:
+            self._log.debug('Process died unexpectedly')
+            self.report.died()
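`_scoped` is the generator-based context manager returned by `start()`: the `with sv.start():` block in `run_test` runs while the server is alive, and when the block ends the server is either killed (the expected case) or reported as having died early. A stripped-down sketch of the same idea; the `try/finally` here is an addition so cleanup also happens if the body raises:

```python
import contextlib
import subprocess

@contextlib.contextmanager
def scoped_process(cmd):
    proc = subprocess.Popen(cmd)
    try:
        yield proc
    finally:
        if proc.poll() is None:   # still running: kill it on scope exit
            proc.kill()
        proc.wait()

with scoped_process(['sleep', '60']) as server:
    print('server pid:', server.pid)
# leaving the block guarantees the child is gone
```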
-  def wait(self):
-    self.proc.communicate()
-    if self.timer:
-      self.timer.cancel()
-    self.report.end(self.returncode)
+    def wait(self):
+        self.proc.communicate()
+        if self.timer:
+            self.timer.cancel()
+        self.report.end(self.returncode)
-  @property
-  def returncode(self):
-    return self.proc.returncode if self.proc else None
+    @property
+    def returncode(self):
+        return self.proc.returncode if self.proc else None
def exec_context(port, logdir, test, prog):
-  report = ExecReporter(logdir, test, prog)
-  prog.build_command(port)
-  return ExecutionContext(prog.command, prog.workdir, prog.env, report)
+    report = ExecReporter(logdir, test, prog)
+    prog.build_command(port)
+    return ExecutionContext(prog.command, prog.workdir, prog.env, report)
def run_test(testdir, logdir, test_dict, max_retry, async=True):
-  try:
-    logger = multiprocessing.get_logger()
-    max_bind_retry = 3
-    retry_count = 0
-    bind_retry_count = 0
-    test = TestEntry(testdir, **test_dict)
-    while True:
-      if stop.is_set():
-        logger.debug('Skipping because shutting down')
-        return (retry_count, None)
-      logger.debug('Start')
-      with PortAllocator.alloc_port_scoped(ports, test.socket) as port:
-        logger.debug('Start with port %d' % port)
-        sv = exec_context(port, logdir, test, test.server)
-        cl = exec_context(port, logdir, test, test.client)
+    try:
+        logger = multiprocessing.get_logger()
+        max_bind_retry = 3
+        retry_count = 0
+        bind_retry_count = 0
+        test = TestEntry(testdir, **test_dict)
+        while True:
+            if stop.is_set():
+                logger.debug('Skipping because shutting down')
+                return (retry_count, None)
+            logger.debug('Start')
+            with PortAllocator.alloc_port_scoped(ports, test.socket) as port:
+                logger.debug('Start with port %d' % port)
+                sv = exec_context(port, logdir, test, test.server)
+                cl = exec_context(port, logdir, test, test.client)
-        logger.debug('Starting server')
-        with sv.start():
-          if test.delay > 0:
-            logger.debug('Delaying client for %.2f seconds' % test.delay)
-            time.sleep(test.delay)
-          connect_retry_count = 0
-          max_connect_retry = 10
-          connect_retry_wait = 0.5
-          while True:
-            logger.debug('Starting client')
-            cl.start(test.timeout)
-            logger.debug('Waiting client')
-            cl.wait()
-            if not cl.report.maybe_false_positive() or connect_retry_count >= max_connect_retry:
-              if connect_retry_count > 0 and connect_retry_count < max_connect_retry:
-                logger.warn('[%s]: Connected after %d retry (%.2f sec each)' % (test.server.name, connect_retry_count, connect_retry_wait))
-              # Wait for 50ms to see if server does not die at the end.
-              time.sleep(0.05)
-              break
-            logger.debug('Server may not be ready, waiting %.2f second...' % connect_retry_wait)
-            time.sleep(connect_retry_wait)
-            connect_retry_count += 1
+                logger.debug('Starting server')
+                with sv.start():
+                    if test.delay > 0:
+                        logger.debug('Delaying client for %.2f seconds' % test.delay)
+                        time.sleep(test.delay)
+                    connect_retry_count = 0
+                    max_connect_retry = 10
+                    connect_retry_wait = 0.5
+                    while True:
+                        logger.debug('Starting client')
+                        cl.start(test.timeout)
+                        logger.debug('Waiting client')
+                        cl.wait()
+                        if not cl.report.maybe_false_positive() or connect_retry_count >= max_connect_retry:
+                            if connect_retry_count > 0 and connect_retry_count < max_connect_retry:
+                                logger.warn('[%s]: Connected after %d retry (%.2f sec each)' % (test.server.name, connect_retry_count, connect_retry_wait))
+                            # Wait for 50ms to see if server does not die at the end.
+                            time.sleep(0.05)
+                            break
+                        logger.debug('Server may not be ready, waiting %.2f second...' % connect_retry_wait)
+                        time.sleep(connect_retry_wait)
+                        connect_retry_count += 1
-        if sv.report.maybe_false_positive() and bind_retry_count < max_bind_retry:
-          logger.warn('[%s]: Detected socket bind failure, retrying...', test.server.name)
-          bind_retry_count += 1
-        else:
-          if cl.expired:
-            result = RESULT_TIMEOUT
-          elif not sv.killed and cl.proc.returncode == 0:
-            # Server should be alive at the end.
-            result = RESULT_ERROR
-          else:
-            result = cl.proc.returncode
+                if sv.report.maybe_false_positive() and bind_retry_count < max_bind_retry:
+                    logger.warn('[%s]: Detected socket bind failure, retrying...', test.server.name)
+                    bind_retry_count += 1
+                else:
+                    if cl.expired:
+                        result = RESULT_TIMEOUT
+                    elif not sv.killed and cl.proc.returncode == 0:
+                        # Server should be alive at the end.
+                        result = RESULT_ERROR
+                    else:
+                        result = cl.proc.returncode
-          if result == 0 or retry_count >= max_retry:
-            return (retry_count, result)
-          else:
-            logger.info('[%s-%s]: test failed, retrying...', test.server.name, test.client.name)
-            retry_count += 1
-  except (KeyboardInterrupt, SystemExit):
-    logger.info('Interrupted execution')
-    if not async:
-      raise
-    stop.set()
-    return None
-  except:
-    if not async:
-      raise
-    logger.warn('Error executing [%s]', test.name, exc_info=sys.exc_info())
-    return (retry_count, RESULT_ERROR)
+                    if result == 0 or retry_count >= max_retry:
+                        return (retry_count, result)
+                    else:
+                        logger.info('[%s-%s]: test failed, retrying...', test.server.name, test.client.name)
+                        retry_count += 1
+    except (KeyboardInterrupt, SystemExit):
+        logger.info('Interrupted execution')
+        if not async:
+            raise
+        stop.set()
+        return None
+    except:
+        if not async:
+            raise
+        logger.warn('Error executing [%s]', test.name, exc_info=sys.exc_info())
+        return (retry_count, RESULT_ERROR)
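The core control flow of `run_test` is a client retry loop: the client is run, and a failure that looks like "the server was not listening yet" is retried up to `max_connect_retry` times with a fixed wait, while a suspected server-side bind failure retries the whole test on a fresh port. A compact sketch of the client half, with hypothetical `run_client()` and `looks_like_connect_failure()` callables standing in for `cl.start()`/`cl.wait()` and `cl.report.maybe_false_positive()`:

```python
import time

def run_client_with_retry(run_client, looks_like_connect_failure,
                          max_connect_retry=10, connect_retry_wait=0.5):
    """Retry while the failure looks like the server simply was not ready."""
    attempt = 0
    while True:
        returncode = run_client()
        if not looks_like_connect_failure() or attempt >= max_connect_retry:
            return returncode, attempt
        attempt += 1
        time.sleep(connect_retry_wait)
```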
class PortAllocator(object):
-  def __init__(self):
-    self._log = multiprocessing.get_logger()
-    self._lock = multiprocessing.Lock()
-    self._ports = set()
-    self._dom_ports = set()
-    self._last_alloc = 0
+    def __init__(self):
+        self._log = multiprocessing.get_logger()
+        self._lock = multiprocessing.Lock()
+        self._ports = set()
+        self._dom_ports = set()
+        self._last_alloc = 0
-  def _get_tcp_port(self):
-    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-    sock.bind(('127.0.0.1', 0))
-    port = sock.getsockname()[1]
-    self._lock.acquire()
-    try:
-      ok = port not in self._ports
-      if ok:
-        self._ports.add(port)
-        self._last_alloc = time.time()
-    finally:
-      self._lock.release()
-    sock.close()
-    return port if ok else self._get_tcp_port()
+    def _get_tcp_port(self):
+        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        sock.bind(('127.0.0.1', 0))
+        port = sock.getsockname()[1]
+        self._lock.acquire()
+        try:
+            ok = port not in self._ports
+            if ok:
+                self._ports.add(port)
+                self._last_alloc = time.time()
+        finally:
+            self._lock.release()
+        sock.close()
+        return port if ok else self._get_tcp_port()
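`_get_tcp_port` uses the usual "bind to port 0" trick: the kernel hands out a currently free ephemeral port, the allocator records it, and the socket is closed again so the server under test can bind it itself. The window between closing and re-binding is what the bind-failure retry in `run_test` papers over. The core of the trick on its own:

```python
import socket

def pick_free_tcp_port():
    """Ask the OS for a free port by binding to port 0, then release it."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.bind(('127.0.0.1', 0))
        return sock.getsockname()[1]
    finally:
        sock.close()

print(pick_free_tcp_port())
```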
-  def _get_domain_port(self):
-    port = random.randint(1024, 65536)
-    self._lock.acquire()
-    try:
-      ok = port not in self._dom_ports
-      if ok:
-        self._dom_ports.add(port)
-    finally:
-      self._lock.release()
-    return port if ok else self._get_domain_port()
+    def _get_domain_port(self):
+        port = random.randint(1024, 65536)
+        self._lock.acquire()
+        try:
+            ok = port not in self._dom_ports
+            if ok:
+                self._dom_ports.add(port)
+        finally:
+            self._lock.release()
+        return port if ok else self._get_domain_port()
-  def alloc_port(self, socket_type):
-    if socket_type in ('domain', 'abstract'):
-      return self._get_domain_port()
-    else:
-      return self._get_tcp_port()
+    def alloc_port(self, socket_type):
+        if socket_type in ('domain', 'abstract'):
+            return self._get_domain_port()
+        else:
+            return self._get_tcp_port()
-  # static method for inter-process invokation
-  @staticmethod
-  @contextlib.contextmanager
-  def alloc_port_scoped(allocator, socket_type):
-    port = allocator.alloc_port(socket_type)
-    yield port
-    allocator.free_port(socket_type, port)
+    # static method for inter-process invokation
+    @staticmethod
+    @contextlib.contextmanager
+    def alloc_port_scoped(allocator, socket_type):
+        port = allocator.alloc_port(socket_type)
+        yield port
+        allocator.free_port(socket_type, port)
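`alloc_port_scoped` wraps allocation and release in a context manager so callers cannot forget `free_port`, and it takes the allocator explicitly as a static method because, in the concurrent case, `ports` is a manager proxy living in another process. Usage mirrors the call in `run_test`; the import path below is an assumption based on this patch's file location:

```python
from crossrunner.run import PortAllocator  # path assumed from test/crossrunner/run.py

allocator = PortAllocator()

# Any socket_type other than 'domain'/'abstract' yields a TCP port.
with PortAllocator.alloc_port_scoped(allocator, 'ip') as port:
    print('running the test pair against port', port)
# free_port() has run here, so the port (or socket file) is released
```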
-  def free_port(self, socket_type, port):
-    self._log.debug('free_port')
-    self._lock.acquire()
-    try:
-      if socket_type == 'domain':
-        self._dom_ports.remove(port)
-        path = domain_socket_path(port)
-        if os.path.exists(path):
-          os.remove(path)
-      elif socket_type == 'abstract':
-        self._dom_ports.remove(port)
-      else:
-        self._ports.remove(port)
-    except IOError:
-      self._log.info('Error while freeing port', exc_info=sys.exc_info())
-    finally:
-      self._lock.release()
+    def free_port(self, socket_type, port):
+        self._log.debug('free_port')
+        self._lock.acquire()
+        try:
+            if socket_type == 'domain':
+                self._dom_ports.remove(port)
+                path = domain_socket_path(port)
+                if os.path.exists(path):
+                    os.remove(path)
+            elif socket_type == 'abstract':
+                self._dom_ports.remove(port)
+            else:
+                self._ports.remove(port)
+        except IOError:
+            self._log.info('Error while freeing port', exc_info=sys.exc_info())
+        finally:
+            self._lock.release()
class NonAsyncResult(object):
-  def __init__(self, value):
-    self._value = value
+    def __init__(self, value):
+        self._value = value
-  def get(self, timeout=None):
-    return self._value
+    def get(self, timeout=None):
+        return self._value
-  def wait(self, timeout=None):
-    pass
+    def wait(self, timeout=None):
+        pass
-  def ready(self):
-    return True
+    def ready(self):
+        return True
-  def successful(self):
-    return self._value == 0
+    def successful(self):
+        return self._value == 0
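`NonAsyncResult` gives the synchronous dispatch path the same small surface (`get`, `wait`, `ready`, `successful`) as the `AsyncResult` returned by `Pool.apply_async`, so callers can treat both paths uniformly. A sketch of that duck typing, assuming `NonAsyncResult` is importable as defined above:

```python
def report_outcome(result):
    """Accepts either a multiprocessing AsyncResult or a NonAsyncResult."""
    result.wait()
    if result.ready():
        print('ok' if result.successful() else 'failed', result.get())

report_outcome(NonAsyncResult(0))              # synchronous path
# report_outcome(pool.apply_async(run, args))  # async path, same call site
```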
class TestDispatcher(object):
-  def __init__(self, testdir, basedir, logdir_rel, concurrency):
-    self._log = multiprocessing.get_logger()
-    self.testdir = testdir
-    self._report = SummaryReporter(basedir, logdir_rel, concurrency > 1)
-    self.logdir = self._report.testdir
-    # seems needed for python 2.x to handle keyboard interrupt
-    self._stop = multiprocessing.Event()
-    self._async = concurrency > 1
-    if not self._async:
-      self._pool = None
-      global stop
-      global ports
-      stop = self._stop
-      ports = PortAllocator()
-    else:
-      self._m = multiprocessing.managers.BaseManager()
-      self._m.register('ports', PortAllocator)
-      self._m.start()
-      self._pool = multiprocessing.Pool(concurrency, self._pool_init, (self._m.address,))
-    self._log.debug(
-      'TestDispatcher started with %d concurrent jobs' % concurrency)
+    def __init__(self, testdir, basedir, logdir_rel, concurrency):
+        self._log = multiprocessing.get_logger()
+        self.testdir = testdir
+        self._report = SummaryReporter(basedir, logdir_rel, concurrency > 1)
+        self.logdir = self._report.testdir
+        # seems needed for python 2.x to handle keyboard interrupt
+        self._stop = multiprocessing.Event()
+        self._async = concurrency > 1
+        if not self._async:
+            self._pool = None
+            global stop
+            global ports
+            stop = self._stop
+            ports = PortAllocator()
+        else:
+            self._m = multiprocessing.managers.BaseManager()
+            self._m.register('ports', PortAllocator)
+            self._m.start()
+            self._pool = multiprocessing.Pool(concurrency, self._pool_init, (self._m.address,))
+        self._log.debug(
+            'TestDispatcher started with %d concurrent jobs' % concurrency)
-  def _pool_init(self, address):
-    global stop
-    global m
-    global ports
-    stop = self._stop
-    m = multiprocessing.managers.BaseManager(address)
-    m.connect()
-    ports = m.ports()
+    def _pool_init(self, address):
+        global stop
+        global m
+        global ports
+        stop = self._stop
+        m = multiprocessing.managers.BaseManager(address)
+        m.connect()
+        ports = m.ports()
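`_pool_init` is where the single `PortAllocator` becomes visible to every worker: the parent registers the class on a `multiprocessing.managers.BaseManager` and starts it, and each forked pool worker connects back to the manager's address and obtains a proxy. A self-contained sketch of the same mechanism with a trivial counter in place of the allocator; it assumes the default fork start method (as the patched code effectively does), so the registration and authkey are inherited by the workers:

```python
import multiprocessing
import multiprocessing.managers


class Counter(object):
    """Stands in for PortAllocator: one instance shared by all workers."""
    def __init__(self):
        self._value = 0

    def bump(self):
        self._value += 1
        return self._value


def _worker_init(address):
    # Runs in each worker: connect to the parent's manager, fetch a proxy.
    global counter
    m = multiprocessing.managers.BaseManager(address)  # authkey inherited via fork
    m.connect()
    counter = m.counter()


def _task(_):
    return counter.bump()


if __name__ == '__main__':
    manager = multiprocessing.managers.BaseManager()
    manager.register('counter', Counter)
    manager.start()
    pool = multiprocessing.Pool(2, _worker_init, (manager.address,))
    print(sorted(pool.map(_task, range(4))))  # -> [1, 2, 3, 4]
    pool.close()
    pool.join()
    manager.shutdown()
```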
-  def _dispatch_sync(self, test, cont, max_retry):
-    r = run_test(self.testdir, self.logdir, test, max_retry, False)
-    cont(r)
-    return NonAsyncResult(r)
+    def _dispatch_sync(self, test, cont, max_retry):
+        r = run_test(self.testdir, self.logdir, test, max_retry, False)
+        cont(r)
+        return NonAsyncResult(r)
-  def _dispatch_async(self, test, cont, max_retry):
-    self._log.debug('_dispatch_async')
-    return self._pool.apply_async(func=run_test, args=(self.testdir, self.logdir, test, max_retry), callback=cont)
+    def _dispatch_async(self, test, cont, max_retry):
+        self._log.debug('_dispatch_async')
+        return self._pool.apply_async(func=run_test, args=(self.testdir, self.logdir, test, max_retry), callback=cont)
-  def dispatch(self, test, max_retry):
-    index = self._report.add_test(test)
+    def dispatch(self, test, max_retry):
+        index = self._report.add_test(test)
-    def cont(result):
-      if not self._stop.is_set():
-        retry_count, returncode = result
-        self._log.debug('freeing port')
-        self._log.debug('adding result')
-        self._report.add_result(index, returncode, returncode == RESULT_TIMEOUT, retry_count)
-      self._log.debug('finish continuation')
-    fn = self._dispatch_async if self._async else self._dispatch_sync
-    return fn(test, cont, max_retry)
+        def cont(result):
+            if not self._stop.is_set():
+                retry_count, returncode = result
+                self._log.debug('freeing port')
+                self._log.debug('adding result')
+                self._report.add_result(index, returncode, returncode == RESULT_TIMEOUT, retry_count)
+            self._log.debug('finish continuation')
+        fn = self._dispatch_async if self._async else self._dispatch_sync
+        return fn(test, cont, max_retry)
-  def wait(self):
-    if self._async:
-      self._pool.close()
-      self._pool.join()
-      self._m.shutdown()
-    return self._report.end()
+    def wait(self):
+        if self._async:
+            self._pool.close()
+            self._pool.join()
+            self._m.shutdown()
+        return self._report.end()
-  def terminate(self):
-    self._stop.set()
-    if self._async:
-      self._pool.terminate()
-      self._pool.join()
-      self._m.shutdown()
+    def terminate(self):
+        self._stop.set()
+        if self._async:
+            self._pool.terminate()
+            self._pool.join()
+            self._m.shutdown()
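`wait()` and `terminate()` show the two standard ways to bring a `multiprocessing.Pool` down: `close()` then `join()` to let queued work finish, versus `terminate()` then `join()` to stop immediately; in both cases the manager process is shut down afterwards. A minimal sketch of the two shutdown paths:

```python
import multiprocessing
import time


def work(n):
    time.sleep(0.1)
    return n * n


if __name__ == '__main__':
    pool = multiprocessing.Pool(2)
    results = [pool.apply_async(work, (i,)) for i in range(4)]

    graceful = True
    if graceful:
        pool.close()      # accept no new tasks; queued ones still run
        pool.join()
        print([r.get() for r in results])
    else:
        pool.terminate()  # stop workers immediately; queued tasks are dropped
        pool.join()
```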