THRIFT-3516 Add feature test for THeader TBinaryProtocol

This closes #767
diff --git a/Makefile.am b/Makefile.am
index 8eb2d95..3eaa94e 100755
--- a/Makefile.am
+++ b/Makefile.am
@@ -53,6 +53,7 @@
 CROSS_LANGS_COMMA_SEPARATED = $(subst $(space),$(comma),$(CROSS_LANGS))
 
 cross: precross
+	$(PYTHON) test/test.py -F.* -s --server $(CROSS_LANGS_COMMA_SEPARATED)
 	$(PYTHON) test/test.py -s --server $(CROSS_LANGS_COMMA_SEPARATED) --client $(CROSS_LANGS_COMMA_SEPARATED)
 
 TIMES = 1 2 3
diff --git a/test/crossrunner/__init__.py b/test/crossrunner/__init__.py
index 584cc07..49025ed 100644
--- a/test/crossrunner/__init__.py
+++ b/test/crossrunner/__init__.py
@@ -18,7 +18,6 @@
 #
 
 from .test import test_name
-from .collect import collect_tests
+from .collect import collect_cross_tests, collect_feature_tests
 from .run import TestDispatcher
 from .report import generate_known_failures, load_known_failures
-from .prepare import prepare
diff --git a/test/crossrunner/collect.py b/test/crossrunner/collect.py
index c6e33e9..455189c 100644
--- a/test/crossrunner/collect.py
+++ b/test/crossrunner/collect.py
@@ -18,6 +18,7 @@
 #
 
 import platform
+import re
 from itertools import product
 
 from .util import merge_dict
@@ -51,7 +52,7 @@
 DEFAULT_TIMEOUT = 5
 
 
-def collect_testlibs(config, server_match, client_match):
+def _collect_testlibs(config, server_match, client_match=[None]):
   """Collects server/client configurations from library configurations"""
   def expand_libs(config):
     for lib in config:
@@ -73,7 +74,12 @@
   return servers, clients
 
 
-def do_collect_tests(servers, clients):
+def collect_features(config, match):  # -> list of config entries whose 'name' is hit by any regex in match
+  patterns = [re.compile(m) for m in match]  # compile every regex once, up front
+  return [c for c in config if any(p.search(c['name']) for p in patterns)]
+
+
+def _do_collect_tests(servers, clients):
   def intersection(key, o1, o2):
     """intersection of two collections.
     collections are replaced with sets the first time"""
@@ -137,6 +143,12 @@
           }
 
 
-def collect_tests(tests_dict, server_match, client_match):
-  sv, cl = collect_testlibs(tests_dict, server_match, client_match)
-  return list(do_collect_tests(sv, cl))
+def collect_cross_tests(tests_dict, server_match, client_match):  # -> list of tests for the matching servers/clients
+  servers, clients = _collect_testlibs(tests_dict, server_match, client_match)
+  return list(_do_collect_tests(servers, clients))
+
+
+def collect_feature_tests(tests_dict, features_dict, server_match, feature_match):  # features take the client role
+  servers = _collect_testlibs(tests_dict, server_match)[0]  # the client half of the result is not needed here
+  features = collect_features(features_dict, feature_match)
+  return list(_do_collect_tests(servers, features))
diff --git a/test/crossrunner/prepare.py b/test/crossrunner/prepare.py
deleted file mode 100644
index c6784af..0000000
--- a/test/crossrunner/prepare.py
+++ /dev/null
@@ -1,55 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-import os
-import subprocess
-
-from .collect import collect_testlibs
-
-
-def prepare(config_dict, testdir, server_match, client_match):
-  libs, libs2 = collect_testlibs(config_dict, server_match, client_match)
-  libs.extend(libs2)
-
-  def prepares():
-    for lib in libs:
-      pre = lib.get('prepare')
-      if pre:
-        yield pre, lib['workdir']
-
-  def files():
-    for lib in libs:
-      workdir = os.path.join(testdir, lib['workdir'])
-      for c in lib['command']:
-        if not c.startswith('-'):
-          p = os.path.join(workdir, c)
-          if not os.path.exists(p):
-            yield os.path.split(p)
-
-  def make(p):
-    d, f = p
-    with open(os.devnull, 'w') as devnull:
-      return subprocess.Popen(['make', f], cwd=d, stderr=devnull)
-
-  for pre, d in prepares():
-    subprocess.Popen(pre, cwd=d).wait()
-
-  for p in list(map(make, set(files()))):
-    p.wait()
-  return True
diff --git a/test/crossrunner/report.py b/test/crossrunner/report.py
index 6d843d9..7840724 100644
--- a/test/crossrunner/report.py
+++ b/test/crossrunner/report.py
@@ -45,7 +45,7 @@
       if not r[success_index]:
         yield TestEntry.get_name(*r)
   try:
-    with logfile_open(os.path.join(testdir, RESULT_JSON), 'r') as fp:
+    with logfile_open(path_join(testdir, RESULT_JSON), 'r') as fp:
       results = json.load(fp)
   except IOError:
     sys.stderr.write('Unable to load last result. Did you run tests ?\n')
@@ -68,7 +68,7 @@
 
 def load_known_failures(testdir):
   try:
-    with logfile_open(os.path.join(testdir, FAIL_JSON % platform.system()), 'r') as fp:
+    with logfile_open(path_join(testdir, FAIL_JSON % platform.system()), 'r') as fp:
       return json.load(fp)
   except IOError:
     return []
@@ -85,7 +85,7 @@
 
   @classmethod
   def test_logfile(cls, test_name, prog_kind, dir=None):
-    relpath = os.path.join('log', '%s_%s.log' % (test_name, prog_kind))
+    relpath = path_join('log', '%s_%s.log' % (test_name, prog_kind))
     return relpath if not dir else os.path.realpath(path_join(dir, relpath))
 
   def _start(self):
@@ -207,8 +207,8 @@
   def __init__(self, testdir, concurrent=True):
     super(SummaryReporter, self).__init__()
     self.testdir = testdir
-    self.logdir = os.path.join(testdir, LOG_DIR)
-    self.out_path = os.path.join(testdir, RESULT_JSON)
+    self.logdir = path_join(testdir, LOG_DIR)
+    self.out_path = path_join(testdir, RESULT_JSON)
     self.concurrent = concurrent
     self.out = sys.stdout
     self._platform = platform.system()
diff --git a/test/crossrunner/run.py b/test/crossrunner/run.py
index acba335..abbd70b 100644
--- a/test/crossrunner/run.py
+++ b/test/crossrunner/run.py
@@ -110,13 +110,13 @@
     return self.proc.returncode if self.proc else None
 
 
-def exec_context(port, testdir, test, prog):
-  report = ExecReporter(testdir, test, prog)
+def exec_context(port, logdir, test, prog):
+  report = ExecReporter(logdir, test, prog)
   prog.build_command(port)
   return ExecutionContext(prog.command, prog.workdir, prog.env, report)
 
 
-def run_test(testdir, test_dict, async=True, max_retry=3):
+def run_test(testdir, logdir, test_dict, async=True, max_retry=3):
   try:
     logger = multiprocessing.get_logger()
     retry_count = 0
@@ -128,8 +128,8 @@
       logger.debug('Start')
       with PortAllocator.alloc_port_scoped(ports, test.socket) as port:
         logger.debug('Start with port %d' % port)
-        sv = exec_context(port, testdir, test, test.server)
-        cl = exec_context(port, testdir, test, test.client)
+        sv = exec_context(port, logdir, test, test.server)
+        cl = exec_context(port, logdir, test, test.client)
 
         logger.debug('Starting server')
         with sv.start():
@@ -256,9 +256,10 @@
 
 
 class TestDispatcher(object):
-  def __init__(self, testdir, concurrency):
+  def __init__(self, testdir, logdir, concurrency):
     self._log = multiprocessing.get_logger()
     self.testdir = testdir
+    self.logdir = logdir
     # seems needed for python 2.x to handle keyboard interrupt
     self._stop = multiprocessing.Event()
     self._async = concurrency > 1
@@ -273,7 +274,7 @@
       self._m.register('ports', PortAllocator)
       self._m.start()
       self._pool = multiprocessing.Pool(concurrency, self._pool_init, (self._m.address,))
-    self._report = SummaryReporter(testdir, concurrency > 1)
+    self._report = SummaryReporter(logdir, concurrency > 1)
     self._log.debug(
         'TestDispatcher started with %d concurrent jobs' % concurrency)
 
@@ -287,12 +288,13 @@
     ports = m.ports()
 
   def _dispatch_sync(self, test, cont):
-    r = run_test(self.testdir, test, False)
+    r = run_test(self.testdir, self.logdir, test, False)
     cont(r)
     return NonAsyncResult(r)
 
   def _dispatch_async(self, test, cont):
-    return self._pool.apply_async(func=run_test, args=(self.testdir, test,), callback=cont)
+    self._log.debug('_dispatch_async')
+    return self._pool.apply_async(func=run_test, args=(self.testdir, self.logdir, test,), callback=cont)
 
   def dispatch(self, test):
     index = self._report.add_test(test)
diff --git a/test/features/local_thrift/__init__.py b/test/features/local_thrift/__init__.py
new file mode 100644
index 0000000..383ee5f
--- /dev/null
+++ b/test/features/local_thrift/__init__.py
@@ -0,0 +1,28 @@
+"""Puts the thrift Python library built under lib/py/build on sys.path and imports it."""
+
+import glob
+import os
+import sys
+
+SCRIPT_DIR = os.path.realpath(os.path.dirname(__file__))
+ROOT_DIR = os.path.dirname(os.path.dirname(os.path.dirname(SCRIPT_DIR)))
+
+
+def _find_libdir():
+  """Returns the build output directory that should contain the thrift package.
+
+  Raises ImportError when the Python 2 lookup finds no 'lib.*' directory.
+  """
+  build_dir = os.path.join(ROOT_DIR, 'lib', 'py', 'build')
+  if sys.version_info[0] != 2:
+    return os.path.join(build_dir, 'lib')
+  # Under Python 2 the first directory matching 'lib.*' is used.
+  candidates = glob.glob(os.path.join(build_dir, 'lib.*'))
+  if not candidates:
+    raise ImportError('no built thrift library found under %s' % build_dir)
+  return candidates[0]
+
+
+libdir = _find_libdir()
+sys.path.insert(0, libdir)
+thrift = __import__('thrift')
diff --git a/test/features/tests.json b/test/features/tests.json
new file mode 100644
index 0000000..0836309
--- /dev/null
+++ b/test/features/tests.json
@@ -0,0 +1,27 @@
+[
+  {
+    "description": "THeader detects unframed binary wire format",
+    "name": "theader_unframed_binary",
+    "command": [
+      "python",
+      "theader_binary.py"
+    ],
+    "protocols": ["header"],
+    "transports": ["buffered"],
+    "sockets": ["ip"],
+    "workdir": "features"
+  },
+  {
+    "description": "THeader detects framed binary wire format",
+    "name": "theader_framed_binary",
+    "command": [
+      "python",
+      "theader_binary.py",
+      "--override-transport=framed"
+    ],
+    "protocols": ["header"],
+    "transports": ["buffered"],
+    "sockets": ["ip"],
+    "workdir": "features"
+  }
+]
diff --git a/test/features/theader_binary.py b/test/features/theader_binary.py
new file mode 100644
index 0000000..0316741
--- /dev/null
+++ b/test/features/theader_binary.py
@@ -0,0 +1,78 @@
+#!/usr/bin/env python
+"""Feature test client that speaks plain TBinaryProtocol to a 'header' protocol server."""
+
+import argparse
+import socket
+import sys
+
+from util import add_common_args
+# Imported first on purpose: local_thrift adjusts sys.path so that the
+# thrift.* imports below resolve to the locally built library.
+from local_thrift import thrift
+from thrift.Thrift import TMessageType, TType
+from thrift.transport.TSocket import TSocket
+from thrift.transport.TTransport import TBufferedTransport, TFramedTransport
+from thrift.protocol.TBinaryProtocol import TBinaryProtocol
+
+
+def _check(cond, message):
+  """Raises AssertionError(message) unless cond holds; not stripped by python -O."""
+  if not cond:
+    raise AssertionError(message)
+
+
+def _make_transport(sock, kind):
+  """Wraps sock in the transport named by kind ('buffered' when kind is empty)."""
+  if not kind or kind == 'buffered':
+    return TBufferedTransport(sock)
+  if kind == 'framed':
+    return TFramedTransport(sock)
+  raise ValueError('invalid transport')
+
+
+# THeader stack should accept binary protocol with optionally framed transport
+def main(argv):
+  """Calls testVoid over TBinaryProtocol and verifies that a void reply comes back.
+
+  argv holds the command line arguments without the program name.  Any
+  failure propagates as an exception, which makes the process exit non-zero.
+  """
+  p = argparse.ArgumentParser()
+  add_common_args(p)
+  # THeaderTransport acts as a framed transport once it detects a frame, so
+  # --transport=framed cannot be used here: it would result in 2 layered
+  # frames. The client side framing is selected with this option instead.
+  p.add_argument('--override-transport')
+  # Parse the arguments we were given rather than implicitly reading sys.argv.
+  args = p.parse_args(argv)
+  _check(args.protocol == 'header', 'expected --protocol=header')
+  _check(args.transport == 'buffered', 'expected --transport=buffered')
+  _check(not args.ssl, '--ssl is not supported')
+
+  sock = TSocket(args.host, args.port, socket_family=socket.AF_INET)
+  trans = _make_transport(sock, args.override_transport)
+  trans.open()
+  try:
+    proto = TBinaryProtocol(trans)
+    proto.writeMessageBegin('testVoid', TMessageType.CALL, 3)
+    proto.writeStructBegin('testVoid_args')
+    proto.writeFieldStop()
+    proto.writeStructEnd()
+    proto.writeMessageEnd()
+    trans.flush()
+
+    _, mtype, _ = proto.readMessageBegin()
+    _check(mtype == TMessageType.REPLY, 'expected a REPLY message, got type %r' % (mtype,))
+    proto.readStructBegin()
+    _, ftype, _ = proto.readFieldBegin()
+    _check(ftype == TType.STOP, 'expected an empty result struct, got field type %r' % (ftype,))
+    proto.readFieldEnd()
+    proto.readStructEnd()
+    proto.readMessageEnd()
+  finally:
+    # Release the socket even when the exchange fails half way.
+    trans.close()
+
+
+if __name__ == '__main__':
+  sys.exit(main(sys.argv[1:]))
diff --git a/test/features/util.py b/test/features/util.py
new file mode 100644
index 0000000..cff7ff8
--- /dev/null
+++ b/test/features/util.py
@@ -0,0 +1,20 @@
+"""Command line argument helpers for the feature test scripts."""
+
+import argparse
+
+
+def add_common_args(p):
+  """Registers --host, --port, --protocol, --transport and --ssl on parser p."""
+  add = p.add_argument
+  add('--host', default='localhost')
+  add('--port', type=int)
+  add('--protocol')
+  add('--transport')
+  add('--ssl', action='store_true')
+
+
+def parse_common_args(argv):
+  """Parses argv with a parser that knows only the common options; returns the namespace."""
+  parser = argparse.ArgumentParser()
+  add_common_args(parser)
+  return parser.parse_args(argv)
diff --git a/test/test.py b/test/test.py
index 1176369..0c799b9 100755
--- a/test/test.py
+++ b/test/test.py
@@ -25,39 +25,102 @@
 # This script supports python 2.7 and later.
 # python 3.x is recommended for better stability.
 #
-# TODO: eliminate a few 2.7 occurrences to support 2.6 ?
-#
 
+from __future__ import print_function
+from itertools import chain
 import json
 import logging
 import multiprocessing
-import optparse
+import argparse
 import os
 import sys
 
 import crossrunner
+from crossrunner.compat import path_join
 
 TEST_DIR = os.path.realpath(os.path.dirname(__file__))
-CONFIG_PATH = os.path.join(TEST_DIR, 'tests.json')
+CONFIG_FILE = 'tests.json'
 
 
-def prepare(server_match, client_match):
-  with open(CONFIG_PATH, 'r') as fp:
-    j = json.load(fp)
-  return crossrunner.prepare(j, TEST_DIR, server_match, client_match)
-
-
-def run_tests(server_match, client_match, jobs, skip_known_failures):
+def run_tests(collect_func, basedir, server_match, client_match, jobs, skip):
   logger = multiprocessing.get_logger()
   logger.debug('Collecting tests')
-  with open(CONFIG_PATH, 'r') as fp:
+  with open(path_join(basedir, CONFIG_FILE), 'r') as fp:
     j = json.load(fp)
-  tests = list(crossrunner.collect_tests(j, server_match, client_match))
+  tests = collect_func(j, server_match, client_match)
+  if not tests:
+    print('No test found that matches the criteria', file=sys.stderr)
+    # print('  servers: %s' % server_match, file=sys.stderr)
+    # print('  clients: %s' % client_match, file=sys.stderr)
+    return False
+  if skip:
+    logger.debug('Skipping known failures')
+    known = crossrunner.load_known_failures(basedir)
+    tests = list(filter(lambda t: crossrunner.test_name(**t) not in known, tests))
+
+  dispatcher = crossrunner.TestDispatcher(TEST_DIR, basedir, jobs)
+  logger.debug('Executing %d tests' % len(tests))
+  try:
+    for r in [dispatcher.dispatch(test) for test in tests]:
+      r.wait()
+    logger.debug('Waiting for completion')
+    return dispatcher.wait()
+  except (KeyboardInterrupt, SystemExit):
+    logger.debug('Interrupted, shutting down')
+    dispatcher.terminate()
+    return False
+
+
+def run_cross_tests(server_match, client_match, jobs, skip_known_failures):
+  logger = multiprocessing.get_logger()
+  logger.debug('Collecting tests')
+  with open(path_join(TEST_DIR, CONFIG_FILE), 'r') as fp:
+    j = json.load(fp)
+  tests = crossrunner.collect_cross_tests(j, server_match, client_match)
+  if not tests:
+    print('No test found that matches the criteria', file=sys.stderr)
+    print('  servers: %s' % server_match, file=sys.stderr)
+    print('  clients: %s' % client_match, file=sys.stderr)
+    return False
   if skip_known_failures:
+    logger.debug('Skipping known failures')
     known = crossrunner.load_known_failures(TEST_DIR)
     tests = list(filter(lambda t: crossrunner.test_name(**t) not in known, tests))
 
-  dispatcher = crossrunner.TestDispatcher(TEST_DIR, jobs)
+  dispatcher = crossrunner.TestDispatcher(TEST_DIR, TEST_DIR, jobs)
+  logger.debug('Executing %d tests' % len(tests))
+  try:
+    for r in [dispatcher.dispatch(test) for test in tests]:
+      r.wait()
+    logger.debug('Waiting for completion')
+    return dispatcher.wait()
+  except (KeyboardInterrupt, SystemExit):
+    logger.debug('Interrupted, shutting down')
+    dispatcher.terminate()
+    return False
+
+
+def run_feature_tests(server_match, feature_match, jobs, skip_known_failures):
+  basedir = path_join(TEST_DIR, 'features')
+  # run_tests(crossrunner.collect_feature_tests, basedir, server_match, feature_match, jobs, skip_known_failures)
+  logger = multiprocessing.get_logger()
+  logger.debug('Collecting tests')
+  with open(path_join(TEST_DIR, CONFIG_FILE), 'r') as fp:
+    j = json.load(fp)
+  with open(path_join(basedir, CONFIG_FILE), 'r') as fp:
+    j2 = json.load(fp)
+  tests = crossrunner.collect_feature_tests(j, j2, server_match, feature_match)
+  if not tests:
+    print('No test found that matches the criteria', file=sys.stderr)
+    print('  servers: %s' % server_match, file=sys.stderr)
+    print('  features: %s' % feature_match, file=sys.stderr)
+    return False
+  if skip_known_failures:
+    logger.debug('Skipping known failures')
+    known = crossrunner.load_known_failures(basedir)
+    tests = list(filter(lambda t: crossrunner.test_name(**t) not in known, tests))
+
+  dispatcher = crossrunner.TestDispatcher(TEST_DIR, basedir, jobs)
   logger.debug('Executing %d tests' % len(tests))
   try:
     for r in [dispatcher.dispatch(test) for test in tests]:
@@ -79,44 +142,47 @@
 
 
 def main(argv):
-  parser = optparse.OptionParser()
-  parser.add_option('--server', type='string', dest='servers', default='',
-                    help='list of servers to test separated by commas, eg:- --server=cpp,java')
-  parser.add_option('--client', type='string', dest='clients', default='',
-                    help='list of clients to test separated by commas, eg:- --client=cpp,java')
-  parser.add_option('-s', '--skip-known-failures', action='store_true', dest='skip_known_failures',
-                    help='do not execute tests that are known to fail')
-  parser.add_option('-j', '--jobs', type='int', dest='jobs',
-                    default=default_concurrenty(),
-                    help='number of concurrent test executions')
-  g = optparse.OptionGroup(parser, 'Advanced')
-  g.add_option('-v', '--verbose', action='store_const',
-               dest='log_level', const=logging.DEBUG, default=logging.WARNING,
-               help='show debug output for test runner')
-  g.add_option('-P', '--print-expected-failures', choices=['merge', 'overwrite'],
-               dest='print_failures', default=None,
-               help="generate expected failures based on last result and print to stdout")
-  g.add_option('-U', '--update-expected-failures', choices=['merge', 'overwrite'],
-               dest='update_failures', default=None,
-               help="generate expected failures based on last result and save to default file location")
-  g.add_option('--prepare', action='store_true',
-               dest='prepare',
-               help="try to prepare files needed for cross test (experimental)")
-  parser.add_option_group(g)
+  parser = argparse.ArgumentParser()
+  parser.add_argument('--server', default='', nargs='*',
+                      help='list of servers to test')
+  parser.add_argument('--client', default='', nargs='*',
+                      help='list of clients to test')
+  parser.add_argument('-s', '--skip-known-failures', action='store_true', dest='skip_known_failures',
+                      help='do not execute tests that are known to fail')
+  parser.add_argument('-j', '--jobs', type=int,
+                      default=default_concurrenty(),
+                      help='number of concurrent test executions')
+  parser.add_argument('-F', '--features', nargs='*', default=None,
+                      help='run feature tests instead of cross language tests')
+
+  g = parser.add_argument_group(title='Advanced')
+  g.add_argument('-v', '--verbose', action='store_const',
+                 dest='log_level', const=logging.DEBUG, default=logging.WARNING,
+                 help='show debug output for test runner')
+  g.add_argument('-P', '--print-expected-failures', choices=['merge', 'overwrite'],
+                 dest='print_failures',
+                 help="generate expected failures based on last result and print to stdout")
+  g.add_argument('-U', '--update-expected-failures', choices=['merge', 'overwrite'],
+                 dest='update_failures',
+                 help="generate expected failures based on last result and save to default file location")
+  options = parser.parse_args(argv)
+
   logger = multiprocessing.log_to_stderr()
-  options, _ = parser.parse_args(argv)
-  server_match = options.servers.split(',') if options.servers else []
-  client_match = options.clients.split(',') if options.clients else []
   logger.setLevel(options.log_level)
 
-  if options.prepare:
-    res = prepare(server_match, client_match)
-  elif options.update_failures or options.print_failures:
+  # Allow multiple args separated with ',' for backward compatibility
+  server_match = list(chain(*[x.split(',') for x in options.server]))
+  client_match = list(chain(*[x.split(',') for x in options.client]))
+
+  if options.update_failures or options.print_failures:
     res = crossrunner.generate_known_failures(
         TEST_DIR, options.update_failures == 'overwrite',
         options.update_failures, options.print_failures)
+  elif options.features is not None:
+    features = options.features or ['.*']
+    res = run_feature_tests(server_match, features, options.jobs, options.skip_known_failures)
   else:
-    res = run_tests(server_match, client_match, options.jobs, options.skip_known_failures)
+    res = run_cross_tests(server_match, client_match, options.jobs, options.skip_known_failures)
   return 0 if res else 1
 
 if __name__ == '__main__':