Initial copy of api tests from tempest

This change is the result of running
tools/copy_api_tests_from_tempest.sh.

Change-Id: Ica02dbe1ed26f1bc9526ea9682756ebc5877cf4a
diff --git a/neutron/tests/tempest/test.py b/neutron/tests/tempest/test.py
new file mode 100644
index 0000000..27975b3
--- /dev/null
+++ b/neutron/tests/tempest/test.py
@@ -0,0 +1,674 @@
+# Copyright 2012 OpenStack Foundation
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import atexit
+import functools
+import json
+import os
+import re
+import sys
+import time
+import urllib
+import uuid
+
+import fixtures
+import six
+import testscenarios
+import testtools
+
+from neutron.tests.api.contrib import clients
+from neutron.tests.tempest.common import credentials
+import neutron.tests.tempest.common.generator.valid_generator as valid
+from neutron.tests.tempest import config
+from neutron.tests.tempest import exceptions
+from oslo_utils import importutils
+from neutron.openstack.common import log as logging
+
+LOG = logging.getLogger(__name__)
+
+CONF = config.CONF
+
+
+def attr(*args, **kwargs):
+    """A decorator which applies the  testtools attr decorator
+
+    This decorator applies the testtools.testcase.attr if it is in the list of
+    attributes to testtools we want to apply.
+    """
+
+    def decorator(f):
+        if 'type' in kwargs and isinstance(kwargs['type'], str):
+            f = testtools.testcase.attr(kwargs['type'])(f)
+            if kwargs['type'] == 'smoke':
+                f = testtools.testcase.attr('gate')(f)
+        elif 'type' in kwargs and isinstance(kwargs['type'], list):
+            for attr in kwargs['type']:
+                f = testtools.testcase.attr(attr)(f)
+                if attr == 'smoke':
+                    f = testtools.testcase.attr('gate')(f)
+        return f
+
+    return decorator
+
+
+def idempotent_id(id):
+    """Stub for metadata decorator"""
+    if not isinstance(id, six.string_types):
+        raise TypeError('Test idempotent_id must be string not %s'
+                        '' % type(id).__name__)
+    uuid.UUID(id)
+
+    def decorator(f):
+        f = testtools.testcase.attr('id-%s' % id)(f)
+        if f.__doc__:
+            f.__doc__ = 'Test idempotent id: %s\n%s' % (id, f.__doc__)
+        else:
+            f.__doc__ = 'Test idempotent id: %s' % id
+        return f
+    return decorator
+
+
+def get_service_list():
+    service_list = {
+        'compute': CONF.service_available.nova,
+        'image': CONF.service_available.glance,
+        'baremetal': CONF.service_available.ironic,
+        'volume': CONF.service_available.cinder,
+        'orchestration': CONF.service_available.heat,
+        # NOTE(mtreinish) nova-network will provide networking functionality
+        # if neutron isn't available, so always set to True.
+        'network': True,
+        'identity': True,
+        'object_storage': CONF.service_available.swift,
+        'dashboard': CONF.service_available.horizon,
+        'telemetry': CONF.service_available.ceilometer,
+        'data_processing': CONF.service_available.sahara
+    }
+    return service_list
+
+
+def services(*args, **kwargs):
+    """A decorator used to set an attr for each service used in a test case
+
+    This decorator applies a testtools attr for each service that gets
+    exercised by a test case.
+    """
+    def decorator(f):
+        services = ['compute', 'image', 'baremetal', 'volume', 'orchestration',
+                    'network', 'identity', 'object_storage', 'dashboard',
+                    'telemetry', 'data_processing']
+        for service in args:
+            if service not in services:
+                raise exceptions.InvalidServiceTag('%s is not a valid '
+                                                   'service' % service)
+        attr(type=list(args))(f)
+
+        @functools.wraps(f)
+        def wrapper(self, *func_args, **func_kwargs):
+            service_list = get_service_list()
+
+            for service in args:
+                if not service_list[service]:
+                    msg = 'Skipped because the %s service is not available' % (
+                        service)
+                    raise testtools.TestCase.skipException(msg)
+            return f(self, *func_args, **func_kwargs)
+        return wrapper
+    return decorator
+
+
+def stresstest(*args, **kwargs):
+    """Add stress test decorator
+
+    For all functions with this decorator a attr stress will be
+    set automatically.
+
+    @param class_setup_per: allowed values are application, process, action
+           ``application``: once in the stress job lifetime
+           ``process``: once in the worker process lifetime
+           ``action``: on each action
+    @param allow_inheritance: allows inheritance of this attribute
+    """
+    def decorator(f):
+        if 'class_setup_per' in kwargs:
+            setattr(f, "st_class_setup_per", kwargs['class_setup_per'])
+        else:
+            setattr(f, "st_class_setup_per", 'process')
+        if 'allow_inheritance' in kwargs:
+            setattr(f, "st_allow_inheritance", kwargs['allow_inheritance'])
+        else:
+            setattr(f, "st_allow_inheritance", False)
+        attr(type='stress')(f)
+        return f
+    return decorator
+
+
+def requires_ext(*args, **kwargs):
+    """A decorator to skip tests if an extension is not enabled
+
+    @param extension
+    @param service
+    """
+    def decorator(func):
+        @functools.wraps(func)
+        def wrapper(*func_args, **func_kwargs):
+            if not is_extension_enabled(kwargs['extension'],
+                                        kwargs['service']):
+                msg = "Skipped because %s extension: %s is not enabled" % (
+                    kwargs['service'], kwargs['extension'])
+                raise testtools.TestCase.skipException(msg)
+            return func(*func_args, **func_kwargs)
+        return wrapper
+    return decorator
+
+
+def is_extension_enabled(extension_name, service):
+    """A function that will check the list of enabled extensions from config
+
+    """
+    config_dict = {
+        'compute': CONF.compute_feature_enabled.api_extensions,
+        'volume': CONF.volume_feature_enabled.api_extensions,
+        'network': CONF.network_feature_enabled.api_extensions,
+        'object': CONF.object_storage_feature_enabled.discoverable_apis,
+    }
+    if len(config_dict[service]) == 0:
+        return False
+    if config_dict[service][0] == 'all':
+        return True
+    if extension_name in config_dict[service]:
+        return True
+    return False
+
+
+at_exit_set = set()
+
+
+def validate_tearDownClass():
+    if at_exit_set:
+        LOG.error(
+            "tearDownClass does not call the super's "
+            "tearDownClass in these classes: \n"
+            + str(at_exit_set))
+
+
+atexit.register(validate_tearDownClass)
+
+
+class BaseTestCase(testtools.testcase.WithAttributes,
+                   testtools.TestCase):
+    """The test base class defines Tempest framework for class level fixtures.
+    `setUpClass` and `tearDownClass` are defined here and cannot be overwritten
+    by subclasses (enforced via hacking rule T105).
+
+    Set-up is split in a series of steps (setup stages), which can be
+    overwritten by test classes. Set-up stages are:
+    - skip_checks
+    - setup_credentials
+    - setup_clients
+    - resource_setup
+
+    Tear-down is also split in a series of steps (teardown stages), which are
+    stacked for execution only if the corresponding setup stage had been
+    reached during the setup phase. Tear-down stages are:
+    - clear_isolated_creds (defined in the base test class)
+    - resource_cleanup
+    """
+
+    setUpClassCalled = False
+    _service = None
+
+    network_resources = {}
+
+    # NOTE(sdague): log_format is defined inline here instead of using the oslo
+    # default because going through the config path recouples config to the
+    # stress tests too early, and depending on testr order will fail unit tests
+    log_format = ('%(asctime)s %(process)d %(levelname)-8s '
+                  '[%(name)s] %(message)s')
+
+    @classmethod
+    def setUpClass(cls):
+        # It should never be overridden by descendants
+        if hasattr(super(BaseTestCase, cls), 'setUpClass'):
+            super(BaseTestCase, cls).setUpClass()
+        cls.setUpClassCalled = True
+        # Stack of (name, callable) to be invoked in reverse order at teardown
+        cls.teardowns = []
+        # All the configuration checks that may generate a skip
+        cls.skip_checks()
+        try:
+            # Allocation of all required credentials and client managers
+            cls.teardowns.append(('credentials', cls.clear_isolated_creds))
+            cls.setup_credentials()
+            # Shortcuts to clients
+            cls.setup_clients()
+            # Additional class-wide test resources
+            cls.teardowns.append(('resources', cls.resource_cleanup))
+            cls.resource_setup()
+        except Exception:
+            etype, value, trace = sys.exc_info()
+            LOG.info("%s raised in %s.setUpClass. Invoking tearDownClass." % (
+                     etype, cls.__name__))
+            cls.tearDownClass()
+            try:
+                raise etype, value, trace
+            finally:
+                del trace  # to avoid circular refs
+
+    @classmethod
+    def tearDownClass(cls):
+        at_exit_set.discard(cls)
+        # It should never be overridden by descendants
+        if hasattr(super(BaseTestCase, cls), 'tearDownClass'):
+            super(BaseTestCase, cls).tearDownClass()
+        # Save any existing exception, we always want to re-raise the original
+        # exception only
+        etype, value, trace = sys.exc_info()
+        # If there was no exception during setup we shall re-raise the first
+        # exception in teardown
+        re_raise = (etype is None)
+        while cls.teardowns:
+            name, teardown = cls.teardowns.pop()
+            # Catch any exception in tearDown so we can re-raise the original
+            # exception at the end
+            try:
+                teardown()
+            except Exception as te:
+                sys_exec_info = sys.exc_info()
+                tetype = sys_exec_info[0]
+                # TODO(andreaf): Till we have the ability to cleanup only
+                # resources that were successfully setup in resource_cleanup,
+                # log AttributeError as info instead of exception.
+                if tetype is AttributeError and name == 'resources':
+                    LOG.info("tearDownClass of %s failed: %s" % (name, te))
+                else:
+                    LOG.exception("teardown of %s failed: %s" % (name, te))
+                if not etype:
+                    etype, value, trace = sys_exec_info
+        # If exceptions were raised during teardown, an not before, re-raise
+        # the first one
+        if re_raise and etype is not None:
+            try:
+                raise etype, value, trace
+            finally:
+                del trace  # to avoid circular refs
+
+    @classmethod
+    def skip_checks(cls):
+        """Class level skip checks. Subclasses verify in here all
+        conditions that might prevent the execution of the entire test class.
+        Checks implemented here may not make use API calls, and should rely on
+        configuration alone.
+        In general skip checks that require an API call are discouraged.
+        If one is really needed it may be implemented either in the
+        resource_setup or at test level.
+        """
+        pass
+
+    @classmethod
+    def setup_credentials(cls):
+        """Allocate credentials and the client managers from them."""
+        # TODO(andreaf) There is a fair amount of code that could me moved from
+        # base / test classes in here. Ideally tests should be able to only
+        # specify a list of (additional) credentials the need to use.
+        pass
+
+    @classmethod
+    def setup_clients(cls):
+        """Create links to the clients into the test object."""
+        # TODO(andreaf) There is a fair amount of code that could me moved from
+        # base / test classes in here. Ideally tests should be able to only
+        # specify which client is `client` and nothing else.
+        pass
+
+    @classmethod
+    def resource_setup(cls):
+        """Class level resource setup for test cases.
+        """
+        pass
+
+    @classmethod
+    def resource_cleanup(cls):
+        """Class level resource cleanup for test cases.
+        Resource cleanup must be able to handle the case of partially setup
+        resources, in case a failure during `resource_setup` should happen.
+        """
+        pass
+
+    def setUp(self):
+        super(BaseTestCase, self).setUp()
+        if not self.setUpClassCalled:
+            raise RuntimeError("setUpClass does not calls the super's"
+                               "setUpClass in the "
+                               + self.__class__.__name__)
+        at_exit_set.add(self.__class__)
+        test_timeout = os.environ.get('OS_TEST_TIMEOUT', 0)
+        try:
+            test_timeout = int(test_timeout)
+        except ValueError:
+            test_timeout = 0
+        if test_timeout > 0:
+            self.useFixture(fixtures.Timeout(test_timeout, gentle=True))
+
+        if (os.environ.get('OS_STDOUT_CAPTURE') == 'True' or
+                os.environ.get('OS_STDOUT_CAPTURE') == '1'):
+            stdout = self.useFixture(fixtures.StringStream('stdout')).stream
+            self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout))
+        if (os.environ.get('OS_STDERR_CAPTURE') == 'True' or
+                os.environ.get('OS_STDERR_CAPTURE') == '1'):
+            stderr = self.useFixture(fixtures.StringStream('stderr')).stream
+            self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))
+        if (os.environ.get('OS_LOG_CAPTURE') != 'False' and
+            os.environ.get('OS_LOG_CAPTURE') != '0'):
+            self.useFixture(fixtures.LoggerFixture(nuke_handlers=False,
+                                                   format=self.log_format,
+                                                   level=None))
+
+    @classmethod
+    def get_client_manager(cls):
+        """
+        Returns an OpenStack client manager
+        """
+        force_tenant_isolation = getattr(cls, 'force_tenant_isolation', None)
+
+        if (not hasattr(cls, 'isolated_creds') or
+            not cls.isolated_creds.name == cls.__name__):
+            cls.isolated_creds = credentials.get_isolated_credentials(
+                name=cls.__name__, network_resources=cls.network_resources,
+                force_tenant_isolation=force_tenant_isolation,
+            )
+
+        creds = cls.isolated_creds.get_primary_creds()
+        os = clients.Manager(credentials=creds, service=cls._service)
+        return os
+
+    @classmethod
+    def clear_isolated_creds(cls):
+        """
+        Clears isolated creds if set
+        """
+        if hasattr(cls, 'isolated_creds'):
+            cls.isolated_creds.clear_isolated_creds()
+
+    @classmethod
+    def _get_identity_admin_client(cls):
+        """
+        Returns an instance of the Identity Admin API client
+        """
+        os = clients.AdminManager(service=cls._service)
+        admin_client = os.identity_client
+        return admin_client
+
+    @classmethod
+    def set_network_resources(cls, network=False, router=False, subnet=False,
+                              dhcp=False):
+        """Specify which network resources should be created
+
+        @param network
+        @param router
+        @param subnet
+        @param dhcp
+        """
+        # network resources should be set only once from callers
+        # in order to ensure that even if it's called multiple times in
+        # a chain of overloaded methods, the attribute is set only
+        # in the leaf class
+        if not cls.network_resources:
+            cls.network_resources = {
+                'network': network,
+                'router': router,
+                'subnet': subnet,
+                'dhcp': dhcp}
+
+    def assertEmpty(self, list, msg=None):
+        self.assertTrue(len(list) == 0, msg)
+
+    def assertNotEmpty(self, list, msg=None):
+        self.assertTrue(len(list) > 0, msg)
+
+
+class NegativeAutoTest(BaseTestCase):
+
+    _resources = {}
+
+    @classmethod
+    def setUpClass(cls):
+        super(NegativeAutoTest, cls).setUpClass()
+        os = cls.get_client_manager()
+        cls.client = os.negative_client
+        os_admin = clients.AdminManager(service=cls._service)
+        cls.admin_client = os_admin.negative_client
+
+    @staticmethod
+    def load_tests(*args):
+        """
+        Wrapper for testscenarios to set the mandatory scenarios variable
+        only in case a real test loader is in place. Will be automatically
+        called in case the variable "load_tests" is set.
+        """
+        if getattr(args[0], 'suiteClass', None) is not None:
+            loader, standard_tests, pattern = args
+        else:
+            standard_tests, module, loader = args
+        for test in testtools.iterate_tests(standard_tests):
+            schema = getattr(test, '_schema', None)
+            if schema is not None:
+                setattr(test, 'scenarios',
+                        NegativeAutoTest.generate_scenario(schema))
+        return testscenarios.load_tests_apply_scenarios(*args)
+
+    @staticmethod
+    def generate_scenario(description):
+        """
+        Generates the test scenario list for a given description.
+
+        :param description: A file or dictionary with the following entries:
+            name (required) name for the api
+            http-method (required) one of HEAD,GET,PUT,POST,PATCH,DELETE
+            url (required) the url to be appended to the catalog url with '%s'
+                for each resource mentioned
+            resources: (optional) A list of resource names such as "server",
+                "flavor", etc. with an element for each '%s' in the url. This
+                method will call self.get_resource for each element when
+                constructing the positive test case template so negative
+                subclasses are expected to return valid resource ids when
+                appropriate.
+            json-schema (optional) A valid json schema that will be used to
+                create invalid data for the api calls. For "GET" and "HEAD",
+                the data is used to generate query strings appended to the url,
+                otherwise for the body of the http call.
+        """
+        LOG.debug(description)
+        generator = importutils.import_class(
+            CONF.negative.test_generator)()
+        generator.validate_schema(description)
+        schema = description.get("json-schema", None)
+        resources = description.get("resources", [])
+        scenario_list = []
+        expected_result = None
+        for resource in resources:
+            if isinstance(resource, dict):
+                expected_result = resource['expected_result']
+                resource = resource['name']
+            LOG.debug("Add resource to test %s" % resource)
+            scn_name = "inv_res_%s" % (resource)
+            scenario_list.append((scn_name, {"resource": (resource,
+                                                          str(uuid.uuid4())),
+                                             "expected_result": expected_result
+                                             }))
+        if schema is not None:
+            for scenario in generator.generate_scenarios(schema):
+                scenario_list.append((scenario['_negtest_name'],
+                                      scenario))
+        LOG.debug(scenario_list)
+        return scenario_list
+
+    def execute(self, description):
+        """
+        Execute a http call on an api that are expected to
+        result in client errors. First it uses invalid resources that are part
+        of the url, and then invalid data for queries and http request bodies.
+
+        :param description: A json file or dictionary with the following
+        entries:
+            name (required) name for the api
+            http-method (required) one of HEAD,GET,PUT,POST,PATCH,DELETE
+            url (required) the url to be appended to the catalog url with '%s'
+                for each resource mentioned
+            resources: (optional) A list of resource names such as "server",
+                "flavor", etc. with an element for each '%s' in the url. This
+                method will call self.get_resource for each element when
+                constructing the positive test case template so negative
+                subclasses are expected to return valid resource ids when
+                appropriate.
+            json-schema (optional) A valid json schema that will be used to
+                create invalid data for the api calls. For "GET" and "HEAD",
+                the data is used to generate query strings appended to the url,
+                otherwise for the body of the http call.
+
+        """
+        LOG.info("Executing %s" % description["name"])
+        LOG.debug(description)
+        generator = importutils.import_class(
+            CONF.negative.test_generator)()
+        schema = description.get("json-schema", None)
+        method = description["http-method"]
+        url = description["url"]
+        expected_result = None
+        if "default_result_code" in description:
+            expected_result = description["default_result_code"]
+
+        resources = [self.get_resource(r) for
+                     r in description.get("resources", [])]
+
+        if hasattr(self, "resource"):
+            # Note(mkoderer): The resources list already contains an invalid
+            # entry (see get_resource).
+            # We just send a valid json-schema with it
+            valid_schema = None
+            if schema:
+                valid_schema = \
+                    valid.ValidTestGenerator().generate_valid(schema)
+            new_url, body = self._http_arguments(valid_schema, url, method)
+        elif hasattr(self, "_negtest_name"):
+            schema_under_test = \
+                valid.ValidTestGenerator().generate_valid(schema)
+            local_expected_result = \
+                generator.generate_payload(self, schema_under_test)
+            if local_expected_result is not None:
+                expected_result = local_expected_result
+            new_url, body = \
+                self._http_arguments(schema_under_test, url, method)
+        else:
+            raise Exception("testscenarios are not active. Please make sure "
+                            "that your test runner supports the load_tests "
+                            "mechanism")
+
+        if "admin_client" in description and description["admin_client"]:
+            client = self.admin_client
+        else:
+            client = self.client
+        resp, resp_body = client.send_request(method, new_url,
+                                              resources, body=body)
+        self._check_negative_response(expected_result, resp.status, resp_body)
+
+    def _http_arguments(self, json_dict, url, method):
+        LOG.debug("dict: %s url: %s method: %s" % (json_dict, url, method))
+        if not json_dict:
+            return url, None
+        elif method in ["GET", "HEAD", "PUT", "DELETE"]:
+            return "%s?%s" % (url, urllib.urlencode(json_dict)), None
+        else:
+            return url, json.dumps(json_dict)
+
+    def _check_negative_response(self, expected_result, result, body):
+        self.assertTrue(result >= 400 and result < 500 and result != 413,
+                        "Expected client error, got %s:%s" %
+                        (result, body))
+        self.assertTrue(expected_result is None or expected_result == result,
+                        "Expected %s, got %s:%s" %
+                        (expected_result, result, body))
+
+    @classmethod
+    def set_resource(cls, name, resource):
+        """
+        This function can be used in setUpClass context to register a resoruce
+        for a test.
+
+        :param name: The name of the kind of resource such as "flavor", "role",
+            etc.
+        :resource: The id of the resource
+        """
+        cls._resources[name] = resource
+
+    def get_resource(self, name):
+        """
+        Return a valid uuid for a type of resource. If a real resource is
+        needed as part of a url then this method should return one. Otherwise
+        it can return None.
+
+        :param name: The name of the kind of resource such as "flavor", "role",
+            etc.
+        """
+        if isinstance(name, dict):
+            name = name['name']
+        if hasattr(self, "resource") and self.resource[0] == name:
+            LOG.debug("Return invalid resource (%s) value: %s" %
+                      (self.resource[0], self.resource[1]))
+            return self.resource[1]
+        if name in self._resources:
+            return self._resources[name]
+        return None
+
+
+def SimpleNegativeAutoTest(klass):
+    """
+    This decorator registers a test function on basis of the class name.
+    """
+    @attr(type=['negative', 'gate'])
+    def generic_test(self):
+        if hasattr(self, '_schema'):
+            self.execute(self._schema)
+
+    cn = klass.__name__
+    cn = cn.replace('JSON', '')
+    cn = cn.replace('Test', '')
+    # NOTE(mkoderer): replaces uppercase chars inside the class name with '_'
+    lower_cn = re.sub('(?<!^)(?=[A-Z])', '_', cn).lower()
+    func_name = 'test_%s' % lower_cn
+    setattr(klass, func_name, generic_test)
+    return klass
+
+
+def call_until_true(func, duration, sleep_for):
+    """
+    Call the given function until it returns True (and return True) or
+    until the specified duration (in seconds) elapses (and return
+    False).
+
+    :param func: A zero argument callable that returns True on success.
+    :param duration: The number of seconds for which to attempt a
+        successful call of the function.
+    :param sleep_for: The number of seconds to sleep after an unsuccessful
+                      invocation of the function.
+    """
+    now = time.time()
+    timeout = now + duration
+    while now < timeout:
+        if func():
+            return True
+        time.sleep(sleep_for)
+        now = time.time()
+    return False