THRIFT-847 Test Framework harmonization across all languages
THRIFT-2946 Enhance usability of cross test framework

Patch: Nobuaki Sukegawa

This closes: #358
diff --git a/test/crossrunner/__init__.py b/test/crossrunner/__init__.py
new file mode 100644
index 0000000..06de2d0
--- /dev/null
+++ b/test/crossrunner/__init__.py
@@ -0,0 +1,25 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from crossrunner.test import test_name
+from crossrunner.collect import collect_tests
+from crossrunner.run import TestDispatcher
+from crossrunner.report import generate_known_failures
+from crossrunner.report import load_known_failures
+from crossrunner.prepare import prepare
diff --git a/test/crossrunner/collect.py b/test/crossrunner/collect.py
new file mode 100644
index 0000000..80a82e7
--- /dev/null
+++ b/test/crossrunner/collect.py
@@ -0,0 +1,136 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import platform
+from itertools import product
+
+from crossrunner.util import merge_dict
+
+# These keys are passed through to execution as-is.
+# Note that the config may contain keys other than these, namely:
+# delay: after the server is started, the client start is delayed by this
+# value (seconds).
+# timeout: test timeout measured from client start (seconds).
+# platforms: supported platforms; entries should match platform.system().
+# protocols: list of supported protocols
+# transports: list of supported transports
+# sockets: list of supported sockets
+# An illustrative config entry is sketched after the key list below.
+VALID_JSON_KEYS = [
+  'name',  # name of the library, typically a language name
+  'workdir',  # work directory where the command is executed
+  'command',  # test command
+  'extra_args',  # args appended to the command after the generated args
+  'join_args',  # whether args should be passed as a single concatenated string
+  'env',  # additional environment variables
+]
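+
+# A hypothetical library entry, for illustration only (real entries live in
+# the JSON test configuration handed to collect_tests):
+#   {
+#     "name": "cpp",
+#     "workdir": "cpp",
+#     "command": ["TestServer"],
+#     "protocols": ["binary", "compact"],
+#     "transports": ["buffered", "framed"],
+#     "sockets": ["ip", "ip-ssl", "domain"],
+#     "server": {"delay": 1},
+#     "client": {"timeout": 5}
+#   }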
+
+DEFAULT_DELAY = 1
+DEFAULT_TIMEOUT = 5
+
+
+def collect_testlibs(config, server_match, client_match):
+  """Collects server/client configurations from library configurations"""
+  def expand_libs(config):
+    for lib in config:
+      sv = lib.pop('server', None)
+      cl = lib.pop('client', None)
+      yield lib, sv, cl
+
+  def yield_testlibs(base_configs, configs, match):
+    for base, conf in zip(base_configs, configs):
+      if conf:
+        if not match or base['name'] in match:
+          platforms = conf.get('platforms') or base.get('platforms')
+          if not platforms or platform.system() in platforms:
+            yield merge_dict(base, conf)
+
+  libs, svs, cls = zip(*expand_libs(config))
+  servers = list(yield_testlibs(libs, svs, server_match))
+  clients = list(yield_testlibs(libs, cls, client_match))
+  return servers, clients
+
+
+def do_collect_tests(servers, clients):
+  def intersection(key, o1, o2):
+    """intersection of two collections.
+    collections are replaced with sets the first time"""
+    def cached_set(o, key):
+      v = o[key]
+      if not isinstance(v, set):
+        v = set(v)
+        o[key] = v
+      return v
+    return cached_set(o1, key) & cached_set(o2, key)
+
+  # each entry can be spec:impl (e.g. binary:accel)
+  def intersect_with_spec(key, o1, o2):
+    # store as set of (spec, impl) tuple
+    def cached_set(o):
+      def to_spec_impl_tuples(values):
+        for v in values:
+          spec, _, impl = v.partition(':')
+          yield spec, impl or spec
+      v = o[key]
+      if not isinstance(v, set):
+        v = set(to_spec_impl_tuples(set(v)))
+        o[key] = v
+      return v
+    for spec1, impl1 in cached_set(o1):
+      for spec2, impl2 in cached_set(o2):
+        if spec1 == spec2:
+          name = impl1 if impl1 == impl2 else '%s-%s' % (impl1, impl2)
+          yield name, impl1, impl2
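+
+  # Illustrative (hypothetical) intersection: a server advertising
+  # ['binary', 'compact'] against a client advertising ['binary:accel']
+  # shares the 'binary' spec, so the generator yields
+  # ('binary-accel', 'binary', 'accel') -- the combined name plus the
+  # per-side implementations each program is launched with.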
+
+  def maybe_max(key, o1, o2, default):
+    """maximum of two if present, otherwise defult value"""
+    v1 = o1.get(key)
+    v2 = o2.get(key)
+    return max(v1, v2) if v1 and v2 else v1 or v2 or default
+
+  def filter_with_validkeys(o):
+    ret = {}
+    for key in VALID_JSON_KEYS:
+      if key in o:
+        ret[key] = o[key]
+    return ret
+
+  def merge_metadata(o, **ret):
+    for key in VALID_JSON_KEYS:
+      if key in o:
+        ret[key] = o[key]
+    return ret
+
+  for sv, cl in product(servers, clients):
+    for proto, proto1, proto2 in intersect_with_spec('protocols', sv, cl):
+      for trans, trans1, trans2 in intersect_with_spec('transports', sv, cl):
+        for sock in intersection('sockets', sv, cl):
+          yield {
+            'server': merge_metadata(sv, **{'protocol': proto1, 'transport': trans1}),
+            'client': merge_metadata(cl, **{'protocol': proto2, 'transport': trans2}),
+            'delay': maybe_max('delay', sv, cl, DEFAULT_DELAY),
+            'timeout': maybe_max('timeout', sv, cl, DEFAULT_TIMEOUT),
+            'protocol': proto,
+            'transport': trans,
+            'socket': sock
+          }
+
+
+def collect_tests(tests_dict, server_match, client_match):
+  sv, cl = collect_testlibs(tests_dict, server_match, client_match)
+  return list(do_collect_tests(sv, cl))
diff --git a/test/crossrunner/prepare.py b/test/crossrunner/prepare.py
new file mode 100644
index 0000000..6e4f6ee
--- /dev/null
+++ b/test/crossrunner/prepare.py
@@ -0,0 +1,55 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import os
+import subprocess
+
+from crossrunner.collect import collect_testlibs
+
+
+def prepare(config_dict, testdir, server_match, client_match):
+  libs, libs2 = collect_testlibs(config_dict, server_match, client_match)
+  libs.extend(libs2)
+
+  def prepares():
+    for lib in libs:
+      pre = lib.get('prepare')
+      if pre:
+        yield pre, lib['workdir']
+
+  def files():
+    for lib in libs:
+      workdir = os.path.join(testdir, lib['workdir'])
+      for c in lib['command']:
+        if not c.startswith('-'):
+          p = os.path.join(workdir, c)
+          if not os.path.exists(p):
+            yield os.path.split(p)
+
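+  # build each missing test executable by running `make <name>` in its directory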
+  def make(p):
+    d, f = p
+    with open(os.devnull, 'w') as devnull:
+      return subprocess.Popen(['make', f], cwd=d, stderr=devnull)
+
+  for pre, d in prepares():
+    subprocess.Popen(pre, cwd=d).wait()
+
+  for p in list(map(make, set(files()))):
+    p.wait()
+  return True
diff --git a/test/crossrunner/report.py b/test/crossrunner/report.py
new file mode 100644
index 0000000..da478fa
--- /dev/null
+++ b/test/crossrunner/report.py
@@ -0,0 +1,395 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import datetime
+import json
+import multiprocessing
+import os
+import platform
+import re
+import subprocess
+import sys
+import time
+import traceback
+
+from crossrunner.test import TestEntry
+
+LOG_DIR = 'log'
+RESULT_HTML = 'result.html'
+RESULT_JSON = 'results.json'
+FAIL_JSON = 'known_failures_%s.json'
+
+
+def generate_known_failures(testdir, overwrite, save, out):
+  def collect_failures(results):
+    success_index = 5
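+    # index 5 is the 'success' flag in the result rows produced by
+    # SummaryReporter._render_result below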
+    for r in results:
+      if not r[success_index]:
+        yield TestEntry.get_name(*r)
+  try:
+    with open(os.path.join(testdir, RESULT_JSON), 'r') as fp:
+      results = json.load(fp)
+  except IOError:
+    sys.stderr.write('Unable to load the last result. Did you run the tests?\n')
+    return False
+  fails = collect_failures(results['results'])
+  if not overwrite:
+    known = load_known_failures(testdir)
+    known.extend(fails)
+    fails = known
+  fails_json = json.dumps(sorted(set(fails)), indent=2)
+  if save:
+    with open(os.path.join(testdir, FAIL_JSON % platform.system()), 'w+') as fp:
+      fp.write(fails_json)
+    sys.stdout.write('Successfully updated known failures.\n')
+  if out:
+    sys.stdout.write(fails_json)
+    sys.stdout.write('\n')
+  return True
+
+
+def load_known_failures(testdir):
+  try:
+    with open(os.path.join(testdir, FAIL_JSON % platform.system()), 'r') as fp:
+      return json.load(fp)
+  except IOError:
+    return []
+
+
+class TestReporter(object):
+  # Unfortunately, standard library doesn't handle timezone well
+  # DATETIME_FORMAT = '%a %b %d %H:%M:%S %Z %Y'
+  DATETIME_FORMAT = '%a %b %d %H:%M:%S %Y'
+
+  def __init__(self):
+    self._log = multiprocessing.get_logger()
+    self._lock = multiprocessing.Lock()
+
+  @classmethod
+  def test_logfile(cls, dir, test_name, prog_kind):
+    return os.path.realpath(os.path.join(
+      dir, 'log', '%s_%s.log' % (test_name, prog_kind)))
+
+  def _start(self):
+    self._start_time = time.time()
+
+  @property
+  def _elapsed(self):
+    return time.time() - self._start_time
+
+  @classmethod
+  def _format_date(cls):
+    return '%s' % datetime.datetime.now().strftime(cls.DATETIME_FORMAT)
+
+  def _print_date(self):
+    self.out.write('%s\n' % self._format_date())
+
+  def _print_bar(self, out=None):
+    (out or self.out).write(
+      '======================================================================\n')
+
+  def _print_exec_time(self):
+    self.out.write('Test execution took {:.1f} seconds.\n'.format(self._elapsed))
+
+
+class ExecReporter(TestReporter):
+  def __init__(self, testdir, test, prog):
+    super(ExecReporter, self).__init__()
+    self._test = test
+    self._prog = prog
+    self.logpath = self.test_logfile(testdir, test.name, prog.kind)
+    self.out = None
+
+  def begin(self):
+    self._start()
+    self._open()
+    if self.out and not self.out.closed:
+      self._print_header()
+    else:
+      self._log.debug('Output stream is not available.')
+
+  def end(self, returncode):
+    self._lock.acquire()
+    try:
+      if self.out and not self.out.closed:
+        self._print_footer(returncode)
+        self._close()
+        self.out = None
+      else:
+        self._log.debug('Output stream is not available.')
+    finally:
+      self._lock.release()
+
+  def killed(self):
+    self._lock.acquire()
+    try:
+      if self.out and not self.out.closed:
+        self._print_footer()
+        self._close()
+        self.out = None
+      else:
+        self._log.debug('Output stream is not available.')
+    finally:
+      self._lock.release()
+
+  _init_failure_exprs = {
+    'server': list(map(re.compile, [
+      '[Aa]ddress already in use',
+      'Could not bind',
+      'EADDRINUSE',
+    ])),
+    'client': list(map(re.compile, [
+      '[Cc]onnection refused',
+      'Could not connect to localhost',
+      'ECONNREFUSED',
+      'No such file or directory',  # domain socket
+    ])),
+  }
+
+  def maybe_false_positive(self):
+    """Searches through log file for socket bind error.
+    Returns True if suspicious expression is found, otherwise False"""
+    def match(line):
+      for expr in exprs:
+        if expr.search(line):
+          return True
+    try:
+      if self.out and not self.out.closed:
+        self.out.flush()
+      # the patterns in _init_failure_exprs are already compiled above
+      exprs = self._init_failure_exprs[self._prog.kind]
+
+      logfile = self.logpath
+      # need to handle unicode errors on Python 3
+      kwargs = {} if sys.version_info[0] < 3 else {'errors': 'replace'}
+      with open(logfile, 'r', **kwargs) as fp:
+        if any(map(match, fp)):
+          return True
+    except (KeyboardInterrupt, SystemExit):
+      raise
+    except Exception as ex:
+      self._log.warn('[%s]: Error while detecting false positive: %s' % (self._test.name, str(ex)))
+      self._log.info(traceback.format_exc())
+    return False
+
+  def _open(self):
+    self.out = open(self.logpath, 'w+')
+
+  def _close(self):
+    self.out.close()
+
+  def _print_header(self):
+    self._print_date()
+    self.out.write('Executing: %s\n' % ' '.join(self._prog.command))
+    self.out.write('Directory: %s\n' % self._prog.workdir)
+    self.out.write('config:delay: %s\n' % self._test.delay)
+    self.out.write('config:timeout: %s\n' % self._test.timeout)
+    self._print_bar()
+    self.out.flush()
+
+  def _print_footer(self, returncode=None):
+    self._print_bar()
+    if returncode is not None:
+      self.out.write('Return code: %d\n' % returncode)
+    else:
+      self.out.write('Process is killed.\n')
+    self._print_exec_time()
+    self._print_date()
+
+
+class SummaryReporter(TestReporter):
+  def __init__(self, testdir, concurrent=True):
+    super(SummaryReporter, self).__init__()
+    self.testdir = testdir
+    self.logdir = os.path.join(testdir, LOG_DIR)
+    self.out_path = os.path.join(testdir, RESULT_JSON)
+    self.concurrent = concurrent
+    self.out = sys.stdout
+    self._platform = platform.system()
+    self._revision = self._get_revision()
+    self._tests = []
+    if not os.path.exists(self.logdir):
+      os.mkdir(self.logdir)
+    self._known_failures = load_known_failures(testdir)
+    self._unexpected_success = []
+    self._unexpected_failure = []
+    self._expected_failure = []
+    self._print_header()
+
+  def _get_revision(self):
+    p = subprocess.Popen(['git', 'rev-parse', '--short', 'HEAD'],
+                         cwd=self.testdir, stdout=subprocess.PIPE)
+    out, _ = p.communicate()
+    # decode so the revision is a plain str (not bytes) on Python 3
+    return out.strip().decode('ascii', 'replace')
+
+  def _format_test(self, test, with_result=True):
+    name = '%s-%s' % (test.server.name, test.client.name)
+    trans = '%s-%s' % (test.transport, test.socket)
+    if not with_result:
+      return '{:19s}{:13s}{:25s}'.format(name[:18], test.protocol[:12], trans[:24])
+    else:
+      result = 'success' if test.success else (
+          'timeout' if test.expired else 'failure')
+      result_string = '%s(%d)' % (result, test.returncode)
+      return '{:19s}{:13s}{:25s}{:s}\n'.format(name[:18], test.protocol[:12], trans[:24], result_string)
+
+  def _print_test_header(self):
+    self._print_bar()
+    self.out.write(
+      '{:19s}{:13s}{:25s}{:s}\n'.format('server-client:', 'protocol:', 'transport:', 'result:'))
+
+  def _print_header(self):
+    self._start()
+    self.out.writelines([
+      'Apache Thrift - Integration Test Suite\n',
+    ])
+    self._print_date()
+    self._print_test_header()
+
+  def _print_unexpected_failure(self):
+    if len(self._unexpected_failure) > 0:
+      self.out.writelines([
+        '*** The following %d failures were unexpected ***:\n' % len(self._unexpected_failure),
+        'If they were introduced by your change, please fix them before submitting the code.\n',
+        # 'If not, please report at https://issues.apache.org/jira/browse/THRIFT\n',
+      ])
+      self._print_test_header()
+      for i in self._unexpected_failure:
+        self.out.write(self._format_test(self._tests[i]))
+      self._print_bar()
+    else:
+      self.out.write('No unexpected failures.\n')
+
+  def _print_unexpected_success(self):
+    if len(self._unexpected_success) > 0:
+      self.out.write(
+        'The following %d tests were known to fail but succeeded (this is fine):\n' % len(self._unexpected_success))
+      self._print_test_header()
+      for i in self._unexpected_success:
+        self.out.write(self._format_test(self._tests[i]))
+      self._print_bar()
+
+  def _print_footer(self):
+    fail_count = len(self._expected_failure) + len(self._unexpected_failure)
+    self._print_bar()
+    self._print_unexpected_success()
+    self._print_unexpected_failure()
+    self._write_html_data()
+    self._assemble_log('unexpected failures', self._unexpected_failure)
+    self._assemble_log('known failures', self._expected_failure)
+    self.out.writelines([
+      'You can browse results at:\n',
+      '\tfile://%s/%s\n' % (self.testdir, RESULT_HTML),
+      'Full logs for each test are named following this pattern:\n',
+      '\ttest/log/client_server_protocol_transport_client.log\n',
+      '\ttest/log/client_server_protocol_transport_server.log\n',
+      '%d failed of %d tests in total.\n' % (fail_count, len(self._tests)),
+    ])
+    self._print_exec_time()
+    self._print_date()
+
+  def _render_result(self, test):
+    return [
+      test.server.name,
+      test.client.name,
+      test.protocol,
+      test.transport,
+      test.socket,
+      test.success,
+      test.as_expected,
+      test.returncode,
+      {
+        'server': self.test_logfile(test.testdir, test.name, test.server.kind),
+        'client': self.test_logfile(test.testdir, test.name, test.client.kind),
+      },
+    ]
+
+  def _write_html_data(self):
+    """Writes JSON data to be read by result html"""
+    results = [self._render_result(r) for r in self._tests]
+    with open(self.out_path, 'w+') as fp:
+      fp.write(json.dumps({
+        'date': self._format_date(),
+        'revision': str(self._revision),
+        'platform': self._platform,
+        'duration': '{:.1f}'.format(self._elapsed),
+        'results': results,
+      }, indent=2))
+
+  def _assemble_log(self, title, indexes):
+    if len(indexes) > 0:
+      def add_prog_log(fp, test, prog_kind):
+        fp.write('*************************** %s message ***************************\n'
+                 % prog_kind)
+        path = self.test_logfile(self.testdir, test.name, prog_kind)
+        kwargs = {} if sys.version_info[0] < 3 else {'errors': 'replace'}
+        with open(path, 'r', **kwargs) as prog_fp:
+          fp.write(prog_fp.read())
+      filename = title.replace(' ', '_') + '.log'
+      with open(os.path.join(self.logdir, filename), 'w+') as fp:
+        for test in map(self._tests.__getitem__, indexes):
+          fp.write('TEST: [%s]\n' % test.name)
+          add_prog_log(fp, test, test.server.kind)
+          add_prog_log(fp, test, test.client.kind)
+          fp.write('**********************************************************************\n\n')
+      self.out.write('%s are logged to test/%s/%s\n' % (title.capitalize(), LOG_DIR, filename))
+
+  def end(self):
+    self._print_footer()
+    return len(self._unexpected_failure) == 0
+
+  def add_test(self, test_dict):
+    test = TestEntry(self.testdir, **test_dict)
+    self._lock.acquire()
+    try:
+      if not self.concurrent:
+        self.out.write(self._format_test(test, False))
+        self.out.flush()
+      self._tests.append(test)
+      return len(self._tests) - 1
+    finally:
+      self._lock.release()
+
+  def add_result(self, index, returncode, expired):
+    self._lock.acquire()
+    try:
+      failed = returncode is None or returncode != 0
+      test = self._tests[index]
+      known = test.name in self._known_failures
+      if failed:
+        if known:
+          self._log.debug('%s failed as expected' % test.name)
+          self._expected_failure.append(index)
+        else:
+          self._log.info('unexpected failure: %s' % test.name)
+          self._unexpected_failure.append(index)
+      elif known:
+        self._log.info('unexpected success: %s' % test.name)
+        self._unexpected_success.append(index)
+      test.success = not failed
+      test.returncode = returncode
+      test.expired = expired
+      test.as_expected = known == failed
+      if not self.concurrent:
+        result = 'success' if not failed else 'failure'
+        result_string = '%s(%d)' % (result, returncode)
+        self.out.write(result_string + '\n')
+      else:
+        self.out.write(self._format_test(test))
+    finally:
+      self._lock.release()
diff --git a/test/crossrunner/run.py b/test/crossrunner/run.py
new file mode 100644
index 0000000..e3300ba
--- /dev/null
+++ b/test/crossrunner/run.py
@@ -0,0 +1,317 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import contextlib
+import multiprocessing
+import multiprocessing.managers
+import os
+import platform
+import random
+import socket
+import signal
+import subprocess
+import threading
+import time
+import traceback
+
+from crossrunner.test import TestEntry, domain_socket_path
+from crossrunner.report import ExecReporter, SummaryReporter
+
+RESULT_TIMEOUT = 128
+RESULT_ERROR = 64
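+# synthetic return codes recorded when a test times out or the runner itself
+# fails, as opposed to a real client exit status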
+
+
+class ExecutionContext(object):
+  def __init__(self, cmd, cwd, env, report):
+    self._log = multiprocessing.get_logger()
+    self.report = report
+    self.cmd = cmd
+    self.cwd = cwd
+    self.env = env
+    self.timer = None
+    self.expired = False
+
+  def _expire(self):
+    self._log.info('Timeout')
+    self.expired = True
+    self.kill()
+
+  def kill(self):
+    self._log.debug('Killing process : %d' % self.proc.pid)
+    if platform.system() != 'Windows':
+      try:
+        os.killpg(self.proc.pid, signal.SIGKILL)
+      except Exception as err:
+        self._log.info('Failed to kill process group : %s' % str(err))
+    try:
+      self.proc.kill()
+    except Exception as err:
+      self._log.info('Failed to kill process : %s' % str(err))
+    self.report.killed()
+
+  def _popen_args(self):
+    args = {
+      'cwd': self.cwd,
+      'env': self.env,
+      'stdout': self.report.out,
+      'stderr': subprocess.STDOUT,
+    }
+    # make sure child processes don't remain after killing
+    if platform.system() == 'Windows':
+      DETACHED_PROCESS = 0x00000008
+      args.update(creationflags=DETACHED_PROCESS | subprocess.CREATE_NEW_PROCESS_GROUP)
+    else:
+      args.update(preexec_fn=os.setsid)
+    return args
+
+  def start(self, timeout=0):
+    self._log.debug('COMMAND: %s', ' '.join(self.cmd))
+    self._log.debug('WORKDIR: %s', self.cwd)
+    self._log.debug('LOGFILE: %s', self.report.logpath)
+    self.report.begin()
+    self.proc = subprocess.Popen(self.cmd, **self._popen_args())
+    if timeout > 0:
+      self.timer = threading.Timer(timeout, self._expire)
+      self.timer.start()
+    return self._scoped()
+
+  @contextlib.contextmanager
+  def _scoped(self):
+    # ensure the process is killed even if the body of the 'with' block raises
+    try:
+      yield self
+    finally:
+      self._log.debug('Killing scoped process')
+      self.kill()
+
+  def wait(self):
+    self.proc.communicate()
+    if self.timer:
+      self.timer.cancel()
+    self.report.end(self.returncode)
+
+  @property
+  def returncode(self):
+    return self.proc.returncode if self.proc else None
+
+
+def exec_context(port, testdir, test, prog):
+  report = ExecReporter(testdir, test, prog)
+  prog.build_command(port)
+  return ExecutionContext(prog.command, prog.workdir, prog.env, report)
+
+
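+# Note: 'stop' (a multiprocessing Event) and 'ports' (a PortAllocator, possibly
+# a manager proxy) are module-level globals assigned by TestDispatcher (or its
+# pool initializer) before run_test is invoked.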
+def run_test(testdir, test_dict, run_async=True, max_retry=3):
+  try:
+    logger = multiprocessing.get_logger()
+    retry_count = 0
+    test = TestEntry(testdir, **test_dict)
+    while True:
+      if stop.is_set():
+        logger.debug('Skipping because shutting down')
+        return None
+      logger.debug('Start')
+      with PortAllocator.alloc_port_scoped(ports, test.socket) as port:
+        logger.debug('Start with port %d' % port)
+        sv = exec_context(port, testdir, test, test.server)
+        cl = exec_context(port, testdir, test, test.client)
+
+        logger.debug('Starting server')
+        with sv.start():
+          if test.delay > 0:
+            logger.debug('Delaying client for %.2f seconds' % test.delay)
+            time.sleep(test.delay)
+          cl_retry_count = 0
+          cl_max_retry = 10
+          cl_retry_wait = 0.5
+          while True:
+            logger.debug('Starting client')
+            cl.start(test.timeout)
+            logger.debug('Waiting client')
+            cl.wait()
+            if not cl.report.maybe_false_positive() or cl_retry_count >= cl_max_retry:
+              if cl_retry_count > 0 and cl_retry_count < cl_max_retry:
+                logger.warn('[%s]: Connected after %d retries (%.2f sec each)' % (test.server.name, cl_retry_count, cl_retry_wait))
+              break
+            logger.debug('Server may not be ready, waiting %.2f second...' % cl_retry_wait)
+            time.sleep(cl_retry_wait)
+            cl_retry_count += 1
+
+      if not sv.report.maybe_false_positive() or retry_count >= max_retry:
+        logger.debug('Finish')
+        return RESULT_TIMEOUT if cl.expired else cl.proc.returncode
+      logger.warn('[%s]: Detected socket bind failure, retrying...' % test.server.name)
+      retry_count += 1
+  except (KeyboardInterrupt, SystemExit):
+    logger.info('Interrupted execution')
+    if not run_async:
+      raise
+    stop.set()
+    return None
+  except Exception as ex:
+    logger.warn('Error while executing test : %s' % str(ex))
+    if not run_async:
+      raise
+    logger.info(traceback.format_exc())
+    return RESULT_ERROR
+
+
+class PortAllocator(object):
+  def __init__(self):
+    self._log = multiprocessing.get_logger()
+    self._lock = multiprocessing.Lock()
+    self._ports = set()
+    self._dom_ports = set()
+    self._last_alloc = 0
+
+  def _get_tcp_port(self):
+    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    sock.bind(('127.0.0.1', 0))
+    port = sock.getsockname()[1]
+    self._lock.acquire()
+    try:
+      ok = port not in self._ports
+      if ok:
+        self._ports.add(port)
+        self._last_alloc = time.time()
+    finally:
+      self._lock.release()
+      sock.close()
+    return port if ok else self._get_tcp_port()
+
+  def _get_domain_port(self):
+    port = random.randint(1024, 65536)
+    self._lock.acquire()
+    try:
+      ok = port not in self._dom_ports
+      if ok:
+        self._dom_ports.add(port)
+    finally:
+      self._lock.release()
+    return port if ok else self._get_domain_port()
+
+  def alloc_port(self, socket_type):
+    if socket_type == 'domain':
+      return self._get_domain_port()
+    else:
+      return self._get_tcp_port()
+
+  # static method for inter-process invocation
+  @staticmethod
+  @contextlib.contextmanager
+  def alloc_port_scoped(allocator, socket_type):
+    port = allocator.alloc_port(socket_type)
+    yield port
+    allocator.free_port(socket_type, port)
+
+  def free_port(self, socket_type, port):
+    self._log.debug('free_port')
+    self._lock.acquire()
+    try:
+      if socket_type == 'domain':
+        self._dom_ports.remove(port)
+        path = domain_socket_path(port)
+        if os.path.exists(path):
+          os.remove(path)
+      else:
+        self._ports.remove(port)
+    except IOError as err:
+      self._log.info('Error while freeing port : %s' % str(err))
+    finally:
+      self._lock.release()
+
+
+class NonAsyncResult(object):
+  def __init__(self, value):
+    self._value = value
+
+  def get(self, timeout=None):
+    return self._value
+
+  def wait(self, timeout=None):
+    pass
+
+  def ready(self):
+    return True
+
+  def successful(self):
+    return self._value == 0
+
+
+class TestDispatcher(object):
+  def __init__(self, testdir, concurrency):
+    self._log = multiprocessing.get_logger()
+    self.testdir = testdir
+    # seems needed for python 2.x to handle keyboard interrupt
+    self._stop = multiprocessing.Event()
+    self._async = concurrency > 1
+    if not self._async:
+      self._pool = None
+      global stop
+      global ports
+      stop = self._stop
+      ports = PortAllocator()
+    else:
+      self._m = multiprocessing.managers.BaseManager()
+      self._m.register('ports', PortAllocator)
+      self._m.start()
+      self._pool = multiprocessing.Pool(concurrency, self._pool_init, (self._m.address,))
+    self._report = SummaryReporter(testdir, concurrency > 1)
+    self._log.debug(
+        'TestDispatcher started with %d concurrent jobs' % concurrency)
+
+  def _pool_init(self, address):
+    global stop
+    global m
+    global ports
+    stop = self._stop
+    m = multiprocessing.managers.BaseManager(address)
+    m.connect()
+    ports = m.ports()
+
+  def _dispatch_sync(self, test, cont):
+    r = run_test(self.testdir, test, False)
+    cont(r)
+    return NonAsyncResult(r)
+
+  def _dispatch_async(self, test, cont):
+    return self._pool.apply_async(func=run_test, args=(self.testdir, test,), callback=cont)
+
+  def dispatch(self, test):
+    index = self._report.add_test(test)
+
+    def cont(r):
+      if not self._stop.is_set():
+        self._log.debug('freeing port')
+        self._log.debug('adding result')
+        self._report.add_result(index, r, r == RESULT_TIMEOUT)
+        self._log.debug('finish continuation')
+    fn = self._dispatch_async if self._async else self._dispatch_sync
+    return fn(test, cont)
+
+  def wait(self):
+    if self._async:
+      self._pool.close()
+      self._pool.join()
+      self._m.shutdown()
+    return self._report.end()
+
+  def terminate(self):
+    self._stop.set()
+    if self._async:
+      self._pool.terminate()
+      self._pool.join()
+      self._m.shutdown()
diff --git a/test/crossrunner/test.py b/test/crossrunner/test.py
new file mode 100644
index 0000000..512e664
--- /dev/null
+++ b/test/crossrunner/test.py
@@ -0,0 +1,136 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import copy
+import multiprocessing
+import os
+import sys
+
+from crossrunner.util import merge_dict
+
+
+def domain_socket_path(port):
+  return '/tmp/ThriftTest.thrift.%d' % port
+
+
+class TestProgram(object):
+  def __init__(self, kind, name, protocol, transport, socket, workdir, command, env=None,
+               extra_args=[], join_args=False, **kwargs):
+    self.kind = kind
+    self.name = name
+    self.protocol = protocol
+    self.transport = transport
+    self.socket = socket
+    self.workdir = workdir
+    self.command = None
+    self._base_command = self._fix_cmd_path(command)
+    if env:
+      self.env = copy.copy(os.environ)
+      self.env.update(env)
+    else:
+      self.env = os.environ
+    self._extra_args = extra_args
+    self._join_args = join_args
+
+  def _fix_cmd_path(self, cmd):
+    # if the arg is a file in the work directory, make it an absolute path
+    def abs_if_exists(arg):
+      p = os.path.join(self.workdir, arg)
+      return p if os.path.exists(p) else arg
+
+    if cmd[0] == 'python':
+      cmd[0] = sys.executable
+    else:
+      cmd[0] = abs_if_exists(cmd[0])
+    return cmd
+
+  def _socket_arg(self, socket, port):
+    return {
+      'ip-ssl': '--ssl',
+      'domain': '--domain-socket=%s' % domain_socket_path(port),
+    }.get(socket, None)
+
+  def build_command(self, port):
+    cmd = copy.copy(self._base_command)
+    args = []
+    args.append('--protocol=' + self.protocol)
+    args.append('--transport=' + self.transport)
+    socket_arg = self._socket_arg(self.socket, port)
+    if socket_arg:
+      args.append(socket_arg)
+    args.append('--port=%d' % port)
+    if self._join_args:
+      cmd.append('%s' % " ".join(args))
+    else:
+      cmd.extend(args)
+    if self._extra_args:
+      cmd.extend(self._extra_args)
+    self.command = cmd
+    return self.command
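+
+  # For illustration (hypothetical values), build_command(9090) might yield:
+  #   ['./TestServer', '--protocol=binary', '--transport=buffered', '--port=9090']
+  # with a socket argument (--ssl or --domain-socket=...) inserted before the
+  # port when applicable, plus any configured extra_args.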
+
+
+class TestEntry(object):
+  def __init__(self, testdir, server, client, delay, timeout, **kwargs):
+    self.testdir = testdir
+    self._log = multiprocessing.get_logger()
+    self._config = kwargs
+    self.protocol = kwargs['protocol']
+    self.transport = kwargs['transport']
+    self.socket = kwargs['socket']
+    self.server = TestProgram('server', **self._fix_workdir(merge_dict(self._config, server)))
+    self.client = TestProgram('client', **self._fix_workdir(merge_dict(self._config, client)))
+    self.delay = delay
+    self.timeout = timeout
+    self._name = None
+    # results
+    self.success = None
+    self.as_expected = None
+    self.returncode = None
+    self.expired = False
+
+  def _fix_workdir(self, config):
+    key = 'workdir'
+    path = config.get(key, None)
+    if not path:
+      path = self.testdir
+    if os.path.isabs(path):
+      path = os.path.realpath(path)
+    else:
+      path = os.path.realpath(os.path.join(self.testdir, path))
+    config.update({key: path})
+    return config
+
+  @classmethod
+  def get_name(cls, server, client, proto, trans, sock, *args):
+    return '%s-%s_%s_%s-%s' % (server, client, proto, trans, sock)
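+
+  # e.g. get_name('cpp', 'java', 'binary', 'buffered', 'ip') returns
+  # 'cpp-java_binary_buffered-ip' (illustrative values); these names are what
+  # the known-failures files and test reports refer to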
+
+  @property
+  def name(self):
+    if not self._name:
+      self._name = self.get_name(
+          self.server.name, self.client.name, self.protocol, self.transport, self.socket)
+    return self._name
+
+  @property
+  def transport_name(self):
+    return '%s-%s' % (self.transport, self.socket)
+
+
+def test_name(server, client, protocol, transport, socket, **kwargs):
+  return TestEntry.get_name(server['name'], client['name'], protocol, transport, socket)
diff --git a/test/crossrunner/util.py b/test/crossrunner/util.py
new file mode 100644
index 0000000..750ed47
--- /dev/null
+++ b/test/crossrunner/util.py
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+import copy
+
+
+def merge_dict(base, update):
+  """Update dict concatenating list values"""
+  res = copy.deepcopy(base)
+  for k, v in update.items():
+    if k in res and isinstance(v, list):
+      res[k].extend(v)
+    else:
+      res[k] = v
+  return res
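+
+# Illustrative usage (hypothetical values):
+#   merge_dict({'command': ['TestServer'], 'timeout': 5},
+#              {'command': ['--ssl'], 'timeout': 10})
+#   == {'command': ['TestServer', '--ssl'], 'timeout': 10}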