THRIFT-3579 Introduce retry to make cross

This closes #817
diff --git a/Makefile.am b/Makefile.am
index 8ed400d..46e7d4f 100755
--- a/Makefile.am
+++ b/Makefile.am
@@ -57,8 +57,8 @@
 
 if WITH_PYTHON
 cross: precross
-	$(CROSS_PY) test/test.py -F.* -s --server $(CROSS_LANGS_COMMA_SEPARATED)
-	$(CROSS_PY) test/test.py -s --server $(CROSS_LANGS_COMMA_SEPARATED) --client $(CROSS_LANGS_COMMA_SEPARATED)
+	$(CROSS_PY) test/test.py --retry-count 3 --features .* --skip-known-failures --server $(CROSS_LANGS_COMMA_SEPARATED)
+	$(CROSS_PY) test/test.py --retry-count 3 --skip-known-failures --server $(CROSS_LANGS_COMMA_SEPARATED) --client $(CROSS_LANGS_COMMA_SEPARATED)
 else
 # feature test needs python build
 cross: precross
diff --git a/test/crossrunner/report.py b/test/crossrunner/report.py
index a84e891..be7271c 100644
--- a/test/crossrunner/report.py
+++ b/test/crossrunner/report.py
@@ -100,14 +100,15 @@
     return '%s' % datetime.datetime.now().strftime(cls.DATETIME_FORMAT)
 
   def _print_date(self):
-    self.out.write('%s\n' % self._format_date())
+    print(self._format_date(), file=self.out)
 
   def _print_bar(self, out=None):
-    (out or self.out).write(
-      '======================================================================\n')
+    print(
+      '==========================================================================',
+      file=(out or self.out))
 
   def _print_exec_time(self):
-    self.out.write('Test execution took {:.1f} seconds.\n'.format(self._elapsed))
+    print('Test execution took {:.1f} seconds.'.format(self._elapsed), file=self.out)
 
 
 class ExecReporter(TestReporter):
@@ -139,11 +140,13 @@
       self._lock.release()
 
   def killed(self):
-    self.out.write('Process is killed.\n')
+    print(file=self.out)
+    print('Server process is successfully killed.', file=self.out)
     self.end(None)
 
   def died(self):
-    self.out.write('Process is died unexpectedly.\n')
+    print(file=self.out)
+    print('*** Server process has died unexpectedly ***', file=self.out)
     self.end(None)
 
   _init_failure_exprs = {
@@ -191,19 +194,19 @@
 
   def _print_header(self):
     self._print_date()
-    self.out.write('Executing: %s\n' % str_join(' ', self._prog.command))
-    self.out.write('Directory: %s\n' % self._prog.workdir)
-    self.out.write('config:delay: %s\n' % self._test.delay)
-    self.out.write('config:timeout: %s\n' % self._test.timeout)
+    print('Executing: %s' % str_join(' ', self._prog.command), file=self.out)
+    print('Directory: %s' % self._prog.workdir, file=self.out)
+    print('config:delay: %s' % self._test.delay, file=self.out)
+    print('config:timeout: %s' % self._test.timeout, file=self.out)
     self._print_bar()
     self.out.flush()
 
   def _print_footer(self, returncode=None):
     self._print_bar()
     if returncode is not None:
-      self.out.write('Return code: %d\n' % returncode)
+      print('Return code: %d' % returncode, file=self.out)
     else:
-      self.out.write('Process is killed.\n')
+      print('Process is killed.', file=self.out)
     self._print_exec_time()
     self._print_date()
 
@@ -224,6 +227,7 @@
       os.mkdir(self.logdir)
     self._known_failures = load_known_failures(self.testdir)
     self._unexpected_success = []
+    self._flaky_success = []
     self._unexpected_failure = []
     self._expected_failure = []
     self._print_header()
@@ -232,6 +236,19 @@
   def testdir(self):
     return path_join(self._basedir, self._testdir_rel)
 
+  def _result_string(self, test):
+    if test.success:
+      if test.retry_count == 0:
+        return 'success'
+      elif test.retry_count == 1:
+        return 'flaky(1 retry)'
+      else:
+        return 'flaky(%d retries)' % test.retry_count
+    elif test.expired:
+      return 'failure(timeout)'
+    else:
+      return 'failure(%d)' % test.returncode
+
   def _get_revision(self):
     p = subprocess.Popen(['git', 'rev-parse', '--short', 'HEAD'],
                          cwd=self.testdir, stdout=subprocess.PIPE)
@@ -242,23 +259,19 @@
     name = '%s-%s' % (test.server.name, test.client.name)
     trans = '%s-%s' % (test.transport, test.socket)
     if not with_result:
-      return '{:19s}{:13s}{:25s}'.format(name[:18], test.protocol[:12], trans[:24])
+      return '{:24s}{:13s}{:25s}'.format(name[:23], test.protocol[:12], trans[:24])
     else:
-      result = 'success' if test.success else (
-          'timeout' if test.expired else 'failure')
-      result_string = '%s(%d)' % (result, test.returncode)
-      return '{:19s}{:13s}{:25s}{:s}\n'.format(name[:18], test.protocol[:12], trans[:24], result_string)
+      return '{:24s}{:13s}{:25s}{:s}\n'.format(name[:23], test.protocol[:12], trans[:24], self._result_string(test))
 
   def _print_test_header(self):
     self._print_bar()
-    self.out.write(
-      '{:19s}{:13s}{:25s}{:s}\n'.format('server-client:', 'protocol:', 'transport:', 'result:'))
+    print(
+      '{:24s}{:13s}{:25s}{:s}'.format('server-client:', 'protocol:', 'transport:', 'result:'),
+      file=self.out)
 
   def _print_header(self):
     self._start()
-    self.out.writelines([
-      'Apache Thrift - Integration Test Suite\n',
-    ])
+    print('Apache Thrift - Integration Test Suite', file=self.out)
     self._print_date()
     self._print_test_header()
 
@@ -274,12 +287,23 @@
         self.out.write(self._format_test(self._tests[i]))
       self._print_bar()
     else:
-      self.out.write('No unexpected failures.\n')
+      print('No unexpected failures.', file=self.out)
+
+  def _print_flaky_success(self):
+    if len(self._flaky_success) > 0:
+      print(
+          'Following %d tests were expected to cleanly succeed but needed retry:' % len(self._flaky_success),
+          file=self.out)
+      self._print_test_header()
+      for i in self._flaky_success:
+        self.out.write(self._format_test(self._tests[i]))
+      self._print_bar()
 
   def _print_unexpected_success(self):
     if len(self._unexpected_success) > 0:
-      self.out.write(
-        'Following %d tests were known to fail but succeeded (it\'s normal):\n' % len(self._unexpected_success))
+      print(
+        'Following %d tests were known to fail but succeeded (maybe flaky):' % len(self._unexpected_success),
+        file=self.out)
       self._print_test_header()
       for i in self._unexpected_success:
         self.out.write(self._format_test(self._tests[i]))
@@ -295,13 +319,14 @@
     fail_count = len(self._expected_failure) + len(self._unexpected_failure)
     self._print_bar()
     self._print_unexpected_success()
+    self._print_flaky_success()
     self._print_unexpected_failure()
     self._write_html_data()
     self._assemble_log('unexpected failures', self._unexpected_failure)
     self._assemble_log('known failures', self._expected_failure)
     self.out.writelines([
       'You can browse results at:\n',
-      '\tfile://%s/%s\n' % (self._basedir, RESULT_HTML),
+      '\tfile://%s/%s\n' % (self.testdir, RESULT_HTML),
       '# If you use Chrome, run:\n',
       '# \tcd %s\n#\t%s\n' % (self._basedir, self._http_server_command(8001)),
       '# then browse:\n',
@@ -358,7 +383,7 @@
           add_prog_log(fp, test, test.server.kind)
           add_prog_log(fp, test, test.client.kind)
           fp.write('**********************************************************************\n\n')
-      print('%s are logged to test/%s/%s' % (title.capitalize(), LOG_DIR, filename))
+      print('%s are logged to %s/%s/%s' % (title.capitalize(), self._testdir_rel, LOG_DIR, filename))
 
   def end(self):
     self._print_footer()
@@ -376,10 +401,11 @@
     finally:
       self._lock.release()
 
-  def add_result(self, index, returncode, expired):
+  def add_result(self, index, returncode, expired, retry_count):
     self._lock.acquire()
     try:
       failed = returncode is None or returncode != 0
+      flaky = not failed and retry_count != 0
       test = self._tests[index]
       known = test.name in self._known_failures
       if failed:
@@ -389,17 +415,19 @@
         else:
           self._log.info('unexpected failure: %s' % test.name)
           self._unexpected_failure.append(index)
-      elif known:
+      elif flaky and not known:
+        self._log.info('unexpected flaky success: %s' % test.name)
+        self._flaky_success.append(index)
+      elif not flaky and known:
         self._log.info('unexpected success: %s' % test.name)
         self._unexpected_success.append(index)
       test.success = not failed
       test.returncode = returncode
+      test.retry_count = retry_count
       test.expired = expired
       test.as_expected = known == failed
       if not self.concurrent:
-        result = 'success' if not failed else 'failure'
-        result_string = '%s(%d)' % (result, returncode)
-        self.out.write(result_string + '\n')
+        self.out.write(self._result_string(test) + '\n')
       else:
         self.out.write(self._format_test(test))
     finally:
diff --git a/test/crossrunner/run.py b/test/crossrunner/run.py
index 0d617c0..68bd928 100644
--- a/test/crossrunner/run.py
+++ b/test/crossrunner/run.py
@@ -60,12 +60,12 @@
     if platform.system() != 'Windows':
       try:
         os.killpg(self.proc.pid, signal.SIGKILL)
-      except Exception as err:
-        self._log.info('Failed to kill process group : %s' % str(err))
+      except Exception:
+        self._log.info('Failed to kill process group', exc_info=sys.exc_info())
     try:
       self.proc.kill()
-    except Exception as err:
-      self._log.info('Failed to kill process : %s' % str(err))
+    except Exception:
+      self._log.info('Failed to kill process', exc_info=sys.exc_info())
 
   def _popen_args(self):
     args = {
@@ -122,15 +122,17 @@
   return ExecutionContext(prog.command, prog.workdir, prog.env, report)
 
 
-def run_test(testdir, logdir, test_dict, async=True, max_retry=3):
+def run_test(testdir, logdir, test_dict, max_retry, async=True):
   try:
     logger = multiprocessing.get_logger()
+    max_bind_retry = 3
     retry_count = 0
+    bind_retry_count = 0
     test = TestEntry(testdir, **test_dict)
     while True:
       if stop.is_set():
         logger.debug('Skipping because shutting down')
-        return None
+        return (retry_count, None)
       logger.debug('Start')
       with PortAllocator.alloc_port_scoped(ports, test.socket) as port:
         logger.debug('Start with port %d' % port)
@@ -142,35 +144,41 @@
           if test.delay > 0:
             logger.debug('Delaying client for %.2f seconds' % test.delay)
             time.sleep(test.delay)
-          cl_retry_count = 0
-          cl_max_retry = 10
-          cl_retry_wait = 0.5
+          connect_retry_count = 0
+          max_connect_retry = 10
+          connect_retry_wait = 0.5
           while True:
             logger.debug('Starting client')
             cl.start(test.timeout)
             logger.debug('Waiting client')
             cl.wait()
-            if not cl.report.maybe_false_positive() or cl_retry_count >= cl_max_retry:
-              if cl_retry_count > 0 and cl_retry_count < cl_max_retry:
-                logger.warn('[%s]: Connected after %d retry (%.2f sec each)' % (test.server.name, cl_retry_count, cl_retry_wait))
-              # Wait for 50 ms to see if server does not die at the end.
+            if not cl.report.maybe_false_positive() or connect_retry_count >= max_connect_retry:
+              if connect_retry_count > 0 and connect_retry_count < max_connect_retry:
+                logger.warn('[%s]: Connected after %d retry (%.2f sec each)' % (test.server.name, connect_retry_count, connect_retry_wait))
+              # Wait for 50ms to see if server does not die at the end.
               time.sleep(0.05)
               break
-            logger.debug('Server may not be ready, waiting %.2f second...' % cl_retry_wait)
-            time.sleep(cl_retry_wait)
-            cl_retry_count += 1
+            logger.debug('Server may not be ready, waiting %.2f second...' % connect_retry_wait)
+            time.sleep(connect_retry_wait)
+            connect_retry_count += 1
 
-      if not sv.report.maybe_false_positive() or retry_count >= max_retry:
-        logger.debug('Finish')
+      if sv.report.maybe_false_positive() and bind_retry_count < max_bind_retry:
+        logger.warn('[%s]: Detected socket bind failure, retrying...', test.server.name)
+        bind_retry_count += 1
+      else:
         if cl.expired:
-          return RESULT_TIMEOUT
+          result = RESULT_TIMEOUT
         elif not sv.killed and cl.proc.returncode == 0:
           # Server should be alive at the end.
-          return RESULT_ERROR
+          result = RESULT_ERROR
         else:
-          return cl.proc.returncode
-      logger.warn('[%s]: Detected socket bind failure, retrying...' % test.server.name)
-      retry_count += 1
+          result = cl.proc.returncode
+
+        if result == 0 or retry_count >= max_retry:
+          return (retry_count, result)
+        else:
+          logger.info('[%s-%s]: test failed, retrying...', test.server.name, test.client.name)
+          retry_count += 1
   except (KeyboardInterrupt, SystemExit):
     logger.info('Interrupted execution')
     if not async:
@@ -181,7 +189,7 @@
     if not async:
       raise
     logger.warn('Error executing [%s]', test.name, exc_info=sys.exc_info())
-    return RESULT_ERROR
+    return (retry_count, RESULT_ERROR)
 
 
 class PortAllocator(object):
@@ -245,8 +253,8 @@
         self._dom_ports.remove(port)
       else:
         self._ports.remove(port)
-    except IOError as err:
-      self._log.info('Error while freeing port : %s' % str(err))
+    except IOError:
+      self._log.info('Error while freeing port', exc_info=sys.exc_info())
     finally:
       self._lock.release()
 
@@ -300,26 +308,27 @@
     m.connect()
     ports = m.ports()
 
-  def _dispatch_sync(self, test, cont):
-    r = run_test(self.testdir, self.logdir, test, False)
+  def _dispatch_sync(self, test, cont, max_retry):
+    r = run_test(self.testdir, self.logdir, test, max_retry, False)
     cont(r)
     return NonAsyncResult(r)
 
-  def _dispatch_async(self, test, cont):
+  def _dispatch_async(self, test, cont, max_retry):
     self._log.debug('_dispatch_async')
-    return self._pool.apply_async(func=run_test, args=(self.testdir, self.logdir, test,), callback=cont)
+    return self._pool.apply_async(func=run_test, args=(self.testdir, self.logdir, test, max_retry), callback=cont)
 
-  def dispatch(self, test):
+  def dispatch(self, test, max_retry):
     index = self._report.add_test(test)
 
-    def cont(r):
+    def cont(result):
       if not self._stop.is_set():
+        retry_count, returncode = result
         self._log.debug('freeing port')
         self._log.debug('adding result')
-        self._report.add_result(index, r, r == RESULT_TIMEOUT)
+        self._report.add_result(index, returncode, returncode == RESULT_TIMEOUT, retry_count)
         self._log.debug('finish continuation')
     fn = self._dispatch_async if self._async else self._dispatch_sync
-    return fn(test, cont)
+    return fn(test, cont, max_retry)
 
   def wait(self):
     if self._async:
diff --git a/test/crossrunner/test.py b/test/crossrunner/test.py
index bb81c4f..fc90f7f 100644
--- a/test/crossrunner/test.py
+++ b/test/crossrunner/test.py
@@ -70,7 +70,7 @@
 
   def build_command(self, port):
     cmd = copy.copy(self._base_command)
-    args = self._extra_args2
+    args = copy.copy(self._extra_args2)
     args.append('--protocol=' + self.protocol)
     args.append('--transport=' + self.transport)
     socket_args = self._socket_args(self.socket, port)
@@ -109,6 +109,7 @@
     self.as_expected = None
     self.returncode = None
     self.expired = False
+    self.retry_count = 0
 
   def _fix_workdir(self, config):
     key = 'workdir'
diff --git a/test/test.py b/test/test.py
index df4c72e..a5bcd9b 100755
--- a/test/test.py
+++ b/test/test.py
@@ -45,7 +45,7 @@
 CONFIG_FILE = 'tests.json'
 
 
-def run_cross_tests(server_match, client_match, jobs, skip_known_failures):
+def run_cross_tests(server_match, client_match, jobs, skip_known_failures, retry_count):
   logger = multiprocessing.get_logger()
   logger.debug('Collecting tests')
   with open(path_join(TEST_DIR, CONFIG_FILE), 'r') as fp:
@@ -64,7 +64,7 @@
   dispatcher = crossrunner.TestDispatcher(TEST_DIR, ROOT_DIR, TEST_DIR_RELATIVE, jobs)
   logger.debug('Executing %d tests' % len(tests))
   try:
-    for r in [dispatcher.dispatch(test) for test in tests]:
+    for r in [dispatcher.dispatch(test, retry_count) for test in tests]:
       r.wait()
     logger.debug('Waiting for completion')
     return dispatcher.wait()
@@ -74,7 +74,7 @@
     return False
 
 
-def run_feature_tests(server_match, feature_match, jobs, skip_known_failures):
+def run_feature_tests(server_match, feature_match, jobs, skip_known_failures, retry_count):
   basedir = path_join(ROOT_DIR, FEATURE_DIR_RELATIVE)
   logger = multiprocessing.get_logger()
   logger.debug('Collecting tests')
@@ -96,7 +96,7 @@
   dispatcher = crossrunner.TestDispatcher(TEST_DIR, ROOT_DIR, FEATURE_DIR_RELATIVE, jobs)
   logger.debug('Executing %d tests' % len(tests))
   try:
-    for r in [dispatcher.dispatch(test) for test in tests]:
+    for r in [dispatcher.dispatch(test, retry_count) for test in tests]:
       r.wait()
     logger.debug('Waiting for completion')
     return dispatcher.wait()
@@ -120,13 +120,15 @@
                       help='list of servers to test')
   parser.add_argument('--client', default='', nargs='*',
                       help='list of clients to test')
+  parser.add_argument('-F', '--features', nargs='*', default=None,
+                      help='run server feature tests instead of cross language tests')
   parser.add_argument('-s', '--skip-known-failures', action='store_true', dest='skip_known_failures',
                       help='do not execute tests that are known to fail')
+  parser.add_argument('-r', '--retry-count', type=int,
+                      default=0, help='maximum retry on failure')
   parser.add_argument('-j', '--jobs', type=int,
                       default=default_concurrenty(),
                       help='number of concurrent test executions')
-  parser.add_argument('-F', '--features', nargs='*', default=None,
-                      help='run server feature tests instead of cross language tests')
 
   g = parser.add_argument_group(title='Advanced')
   g.add_argument('-v', '--verbose', action='store_const',
@@ -158,9 +160,9 @@
         options.update_failures, options.print_failures)
   elif options.features is not None:
     features = options.features or ['.*']
-    res = run_feature_tests(server_match, features, options.jobs, options.skip_known_failures)
+    res = run_feature_tests(server_match, features, options.jobs, options.skip_known_failures, options.retry_count)
   else:
-    res = run_cross_tests(server_match, client_match, options.jobs, options.skip_known_failures)
+    res = run_cross_tests(server_match, client_match, options.jobs, options.skip_known_failures, options.retry_count)
   return 0 if res else 1
 
 if __name__ == '__main__':