| # Copyright 2012 OpenStack Foundation |
| # All Rights Reserved. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may |
| # not use this file except in compliance with the License. You may obtain |
| # a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations |
| # under the License. |
| |
| import atexit |
| import functools |
| import json |
| import os |
| import re |
| import sys |
| import time |
| import uuid |
| |
| import fixtures |
| from oslo_log import log as logging |
| from oslo_utils import importutils |
| import six |
| from six.moves.urllib import parse |
| import testscenarios |
| import testtools |
| |
| from neutron.tests.api import clients |
| from neutron.tests.tempest.common import credentials |
| import neutron.tests.tempest.common.generator.valid_generator as valid |
| from neutron.tests.tempest import config |
| from neutron.tests.tempest import exceptions |
| |
| LOG = logging.getLogger(__name__) |
| |
| CONF = config.CONF |
| |
| |
| def attr(*args, **kwargs): |
| """A decorator which applies the testtools attr decorator |
| |
| This decorator applies the testtools.testcase.attr if it is in the list of |
| attributes to testtools we want to apply. |
| """ |
| |
| def decorator(f): |
| if 'type' in kwargs and isinstance(kwargs['type'], str): |
| f = testtools.testcase.attr(kwargs['type'])(f) |
| if kwargs['type'] == 'smoke': |
| f = testtools.testcase.attr('gate')(f) |
| elif 'type' in kwargs and isinstance(kwargs['type'], list): |
| for attr in kwargs['type']: |
| f = testtools.testcase.attr(attr)(f) |
| if attr == 'smoke': |
| f = testtools.testcase.attr('gate')(f) |
| return f |
| |
| return decorator |
| |
| |
| def idempotent_id(id): |
| """Stub for metadata decorator""" |
| if not isinstance(id, six.string_types): |
| raise TypeError('Test idempotent_id must be string not %s' |
| '' % type(id).__name__) |
| uuid.UUID(id) |
| |
| def decorator(f): |
| f = testtools.testcase.attr('id-%s' % id)(f) |
| if f.__doc__: |
| f.__doc__ = 'Test idempotent id: %s\n%s' % (id, f.__doc__) |
| else: |
| f.__doc__ = 'Test idempotent id: %s' % id |
| return f |
| return decorator |
| |
| |
| def get_service_list(): |
| service_list = { |
| 'compute': CONF.service_available.nova, |
| 'image': CONF.service_available.glance, |
| 'baremetal': CONF.service_available.ironic, |
| 'volume': CONF.service_available.cinder, |
| 'orchestration': CONF.service_available.heat, |
| # NOTE(mtreinish) nova-network will provide networking functionality |
| # if neutron isn't available, so always set to True. |
| 'network': True, |
| 'identity': True, |
| 'object_storage': CONF.service_available.swift, |
| 'dashboard': CONF.service_available.horizon, |
| 'telemetry': CONF.service_available.ceilometer, |
| 'data_processing': CONF.service_available.sahara, |
| 'database': CONF.service_available.trove |
| } |
| return service_list |
| |
| |
| def services(*args, **kwargs): |
| """A decorator used to set an attr for each service used in a test case |
| |
| This decorator applies a testtools attr for each service that gets |
| exercised by a test case. |
| """ |
| def decorator(f): |
| services = ['compute', 'image', 'baremetal', 'volume', 'orchestration', |
| 'network', 'identity', 'object_storage', 'dashboard', |
| 'telemetry', 'data_processing', 'database'] |
| for service in args: |
| if service not in services: |
| raise exceptions.InvalidServiceTag('%s is not a valid ' |
| 'service' % service) |
| attr(type=list(args))(f) |
| |
| @functools.wraps(f) |
| def wrapper(self, *func_args, **func_kwargs): |
| service_list = get_service_list() |
| |
| for service in args: |
| if not service_list[service]: |
| msg = 'Skipped because the %s service is not available' % ( |
| service) |
| raise testtools.TestCase.skipException(msg) |
| return f(self, *func_args, **func_kwargs) |
| return wrapper |
| return decorator |
| |
| |
| def stresstest(*args, **kwargs): |
| """Add stress test decorator |
| |
| For all functions with this decorator a attr stress will be |
| set automatically. |
| |
| @param class_setup_per: allowed values are application, process, action |
| ``application``: once in the stress job lifetime |
| ``process``: once in the worker process lifetime |
| ``action``: on each action |
| @param allow_inheritance: allows inheritance of this attribute |
| """ |
| def decorator(f): |
| if 'class_setup_per' in kwargs: |
| setattr(f, "st_class_setup_per", kwargs['class_setup_per']) |
| else: |
| setattr(f, "st_class_setup_per", 'process') |
| if 'allow_inheritance' in kwargs: |
| setattr(f, "st_allow_inheritance", kwargs['allow_inheritance']) |
| else: |
| setattr(f, "st_allow_inheritance", False) |
| attr(type='stress')(f) |
| return f |
| return decorator |
| |
| |
| def requires_ext(*args, **kwargs): |
| """A decorator to skip tests if an extension is not enabled |
| |
| @param extension |
| @param service |
| """ |
| def decorator(func): |
| @functools.wraps(func) |
| def wrapper(*func_args, **func_kwargs): |
| if not is_extension_enabled(kwargs['extension'], |
| kwargs['service']): |
| msg = "Skipped because %s extension: %s is not enabled" % ( |
| kwargs['service'], kwargs['extension']) |
| raise testtools.TestCase.skipException(msg) |
| return func(*func_args, **func_kwargs) |
| return wrapper |
| return decorator |
| |
| |
| def is_extension_enabled(extension_name, service): |
| """A function that will check the list of enabled extensions from config |
| |
| """ |
| config_dict = { |
| 'compute': CONF.compute_feature_enabled.api_extensions, |
| 'volume': CONF.volume_feature_enabled.api_extensions, |
| 'network': CONF.network_feature_enabled.api_extensions, |
| 'object': CONF.object_storage_feature_enabled.discoverable_apis, |
| } |
| if len(config_dict[service]) == 0: |
| return False |
| if config_dict[service][0] == 'all': |
| return True |
| if extension_name in config_dict[service]: |
| return True |
| return False |
| |
| |
| at_exit_set = set() |
| |
| |
| def validate_tearDownClass(): |
| if at_exit_set: |
| LOG.error( |
| "tearDownClass does not call the super's " |
| "tearDownClass in these classes: \n" |
| + str(at_exit_set)) |
| |
| |
| atexit.register(validate_tearDownClass) |
| |
| |
| class BaseTestCase(testtools.testcase.WithAttributes, |
| testtools.TestCase): |
| """The test base class defines Tempest framework for class level fixtures. |
| `setUpClass` and `tearDownClass` are defined here and cannot be overwritten |
| by subclasses (enforced via hacking rule T105). |
| |
| Set-up is split in a series of steps (setup stages), which can be |
| overwritten by test classes. Set-up stages are: |
| - skip_checks |
| - setup_credentials |
| - setup_clients |
| - resource_setup |
| |
| Tear-down is also split in a series of steps (teardown stages), which are |
| stacked for execution only if the corresponding setup stage had been |
| reached during the setup phase. Tear-down stages are: |
| - clear_isolated_creds (defined in the base test class) |
| - resource_cleanup |
| """ |
| |
| setUpClassCalled = False |
| _service = None |
| |
| network_resources = {} |
| |
| # NOTE(sdague): log_format is defined inline here instead of using the oslo |
| # default because going through the config path recouples config to the |
| # stress tests too early, and depending on testr order will fail unit tests |
| log_format = ('%(asctime)s %(process)d %(levelname)-8s ' |
| '[%(name)s] %(message)s') |
| |
| @classmethod |
| def setUpClass(cls): |
| # It should never be overridden by descendants |
| if hasattr(super(BaseTestCase, cls), 'setUpClass'): |
| super(BaseTestCase, cls).setUpClass() |
| cls.setUpClassCalled = True |
| # Stack of (name, callable) to be invoked in reverse order at teardown |
| cls.teardowns = [] |
| # All the configuration checks that may generate a skip |
| cls.skip_checks() |
| try: |
| # Allocation of all required credentials and client managers |
| cls.teardowns.append(('credentials', cls.clear_isolated_creds)) |
| cls.setup_credentials() |
| # Shortcuts to clients |
| cls.setup_clients() |
| # Additional class-wide test resources |
| cls.teardowns.append(('resources', cls.resource_cleanup)) |
| cls.resource_setup() |
| except Exception: |
| etype, value, trace = sys.exc_info() |
| LOG.info("%s raised in %s.setUpClass. Invoking tearDownClass." % ( |
| etype, cls.__name__)) |
| cls.tearDownClass() |
| try: |
| raise etype, value, trace |
| finally: |
| del trace # to avoid circular refs |
| |
| @classmethod |
| def tearDownClass(cls): |
| at_exit_set.discard(cls) |
| # It should never be overridden by descendants |
| if hasattr(super(BaseTestCase, cls), 'tearDownClass'): |
| super(BaseTestCase, cls).tearDownClass() |
| # Save any existing exception, we always want to re-raise the original |
| # exception only |
| etype, value, trace = sys.exc_info() |
| # If there was no exception during setup we shall re-raise the first |
| # exception in teardown |
| re_raise = (etype is None) |
| while cls.teardowns: |
| name, teardown = cls.teardowns.pop() |
| # Catch any exception in tearDown so we can re-raise the original |
| # exception at the end |
| try: |
| teardown() |
| except Exception as te: |
| sys_exec_info = sys.exc_info() |
| tetype = sys_exec_info[0] |
| # TODO(andreaf): Till we have the ability to cleanup only |
| # resources that were successfully setup in resource_cleanup, |
| # log AttributeError as info instead of exception. |
| if tetype is AttributeError and name == 'resources': |
| LOG.info("tearDownClass of %s failed: %s" % (name, te)) |
| else: |
| LOG.exception("teardown of %s failed: %s" % (name, te)) |
| if not etype: |
| etype, value, trace = sys_exec_info |
| # If exceptions were raised during teardown, an not before, re-raise |
| # the first one |
| if re_raise and etype is not None: |
| try: |
| raise etype, value, trace |
| finally: |
| del trace # to avoid circular refs |
| |
| @classmethod |
| def skip_checks(cls): |
| """Class level skip checks. Subclasses verify in here all |
| conditions that might prevent the execution of the entire test class. |
| Checks implemented here may not make use API calls, and should rely on |
| configuration alone. |
| In general skip checks that require an API call are discouraged. |
| If one is really needed it may be implemented either in the |
| resource_setup or at test level. |
| """ |
| pass |
| |
| @classmethod |
| def setup_credentials(cls): |
| """Allocate credentials and the client managers from them.""" |
| # TODO(andreaf) There is a fair amount of code that could me moved from |
| # base / test classes in here. Ideally tests should be able to only |
| # specify a list of (additional) credentials the need to use. |
| pass |
| |
| @classmethod |
| def setup_clients(cls): |
| """Create links to the clients into the test object.""" |
| # TODO(andreaf) There is a fair amount of code that could me moved from |
| # base / test classes in here. Ideally tests should be able to only |
| # specify which client is `client` and nothing else. |
| pass |
| |
| @classmethod |
| def resource_setup(cls): |
| """Class level resource setup for test cases. |
| """ |
| pass |
| |
| @classmethod |
| def resource_cleanup(cls): |
| """Class level resource cleanup for test cases. |
| Resource cleanup must be able to handle the case of partially setup |
| resources, in case a failure during `resource_setup` should happen. |
| """ |
| pass |
| |
| def setUp(self): |
| super(BaseTestCase, self).setUp() |
| if not self.setUpClassCalled: |
| raise RuntimeError("setUpClass does not calls the super's" |
| "setUpClass in the " |
| + self.__class__.__name__) |
| at_exit_set.add(self.__class__) |
| test_timeout = os.environ.get('OS_TEST_TIMEOUT', 0) |
| try: |
| test_timeout = int(test_timeout) |
| except ValueError: |
| test_timeout = 0 |
| if test_timeout > 0: |
| self.useFixture(fixtures.Timeout(test_timeout, gentle=True)) |
| |
| if (os.environ.get('OS_STDOUT_CAPTURE') == 'True' or |
| os.environ.get('OS_STDOUT_CAPTURE') == '1'): |
| stdout = self.useFixture(fixtures.StringStream('stdout')).stream |
| self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout)) |
| if (os.environ.get('OS_STDERR_CAPTURE') == 'True' or |
| os.environ.get('OS_STDERR_CAPTURE') == '1'): |
| stderr = self.useFixture(fixtures.StringStream('stderr')).stream |
| self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr)) |
| if (os.environ.get('OS_LOG_CAPTURE') != 'False' and |
| os.environ.get('OS_LOG_CAPTURE') != '0'): |
| self.useFixture(fixtures.LoggerFixture(nuke_handlers=False, |
| format=self.log_format, |
| level=None)) |
| |
| @classmethod |
| def get_client_manager(cls): |
| """ |
| Returns an OpenStack client manager |
| """ |
| force_tenant_isolation = getattr(cls, 'force_tenant_isolation', None) |
| |
| if (not hasattr(cls, 'isolated_creds') or |
| not cls.isolated_creds.name == cls.__name__): |
| cls.isolated_creds = credentials.get_isolated_credentials( |
| name=cls.__name__, network_resources=cls.network_resources, |
| force_tenant_isolation=force_tenant_isolation, |
| ) |
| |
| creds = cls.isolated_creds.get_primary_creds() |
| os = clients.Manager(credentials=creds, service=cls._service) |
| return os |
| |
| @classmethod |
| def clear_isolated_creds(cls): |
| """ |
| Clears isolated creds if set |
| """ |
| if hasattr(cls, 'isolated_creds'): |
| cls.isolated_creds.clear_isolated_creds() |
| |
| @classmethod |
| def _get_identity_admin_client(cls): |
| """ |
| Returns an instance of the Identity Admin API client |
| """ |
| os = clients.AdminManager(service=cls._service) |
| admin_client = os.identity_client |
| return admin_client |
| |
| @classmethod |
| def set_network_resources(cls, network=False, router=False, subnet=False, |
| dhcp=False): |
| """Specify which network resources should be created |
| |
| @param network |
| @param router |
| @param subnet |
| @param dhcp |
| """ |
| # network resources should be set only once from callers |
| # in order to ensure that even if it's called multiple times in |
| # a chain of overloaded methods, the attribute is set only |
| # in the leaf class |
| if not cls.network_resources: |
| cls.network_resources = { |
| 'network': network, |
| 'router': router, |
| 'subnet': subnet, |
| 'dhcp': dhcp} |
| |
| def assertEmpty(self, list, msg=None): |
| self.assertTrue(len(list) == 0, msg) |
| |
| def assertNotEmpty(self, list, msg=None): |
| self.assertTrue(len(list) > 0, msg) |
| |
| |
| class NegativeAutoTest(BaseTestCase): |
| |
| _resources = {} |
| |
| @classmethod |
| def setUpClass(cls): |
| super(NegativeAutoTest, cls).setUpClass() |
| os = cls.get_client_manager() |
| cls.client = os.negative_client |
| os_admin = clients.AdminManager(service=cls._service) |
| cls.admin_client = os_admin.negative_client |
| |
| @staticmethod |
| def load_tests(*args): |
| """ |
| Wrapper for testscenarios to set the mandatory scenarios variable |
| only in case a real test loader is in place. Will be automatically |
| called in case the variable "load_tests" is set. |
| """ |
| if getattr(args[0], 'suiteClass', None) is not None: |
| loader, standard_tests, pattern = args |
| else: |
| standard_tests, module, loader = args |
| for test in testtools.iterate_tests(standard_tests): |
| schema = getattr(test, '_schema', None) |
| if schema is not None: |
| setattr(test, 'scenarios', |
| NegativeAutoTest.generate_scenario(schema)) |
| return testscenarios.load_tests_apply_scenarios(*args) |
| |
| @staticmethod |
| def generate_scenario(description): |
| """ |
| Generates the test scenario list for a given description. |
| |
| :param description: A file or dictionary with the following entries: |
| name (required) name for the api |
| http-method (required) one of HEAD,GET,PUT,POST,PATCH,DELETE |
| url (required) the url to be appended to the catalog url with '%s' |
| for each resource mentioned |
| resources: (optional) A list of resource names such as "server", |
| "flavor", etc. with an element for each '%s' in the url. This |
| method will call self.get_resource for each element when |
| constructing the positive test case template so negative |
| subclasses are expected to return valid resource ids when |
| appropriate. |
| json-schema (optional) A valid json schema that will be used to |
| create invalid data for the api calls. For "GET" and "HEAD", |
| the data is used to generate query strings appended to the url, |
| otherwise for the body of the http call. |
| """ |
| LOG.debug(description) |
| generator = importutils.import_class( |
| CONF.negative.test_generator)() |
| generator.validate_schema(description) |
| schema = description.get("json-schema", None) |
| resources = description.get("resources", []) |
| scenario_list = [] |
| expected_result = None |
| for resource in resources: |
| if isinstance(resource, dict): |
| expected_result = resource['expected_result'] |
| resource = resource['name'] |
| LOG.debug("Add resource to test %s" % resource) |
| scn_name = "inv_res_%s" % (resource) |
| scenario_list.append((scn_name, {"resource": (resource, |
| str(uuid.uuid4())), |
| "expected_result": expected_result |
| })) |
| if schema is not None: |
| for scenario in generator.generate_scenarios(schema): |
| scenario_list.append((scenario['_negtest_name'], |
| scenario)) |
| LOG.debug(scenario_list) |
| return scenario_list |
| |
| def execute(self, description): |
| """ |
| Execute a http call on an api that are expected to |
| result in client errors. First it uses invalid resources that are part |
| of the url, and then invalid data for queries and http request bodies. |
| |
| :param description: A json file or dictionary with the following |
| entries: |
| name (required) name for the api |
| http-method (required) one of HEAD,GET,PUT,POST,PATCH,DELETE |
| url (required) the url to be appended to the catalog url with '%s' |
| for each resource mentioned |
| resources: (optional) A list of resource names such as "server", |
| "flavor", etc. with an element for each '%s' in the url. This |
| method will call self.get_resource for each element when |
| constructing the positive test case template so negative |
| subclasses are expected to return valid resource ids when |
| appropriate. |
| json-schema (optional) A valid json schema that will be used to |
| create invalid data for the api calls. For "GET" and "HEAD", |
| the data is used to generate query strings appended to the url, |
| otherwise for the body of the http call. |
| |
| """ |
| LOG.info("Executing %s" % description["name"]) |
| LOG.debug(description) |
| generator = importutils.import_class( |
| CONF.negative.test_generator)() |
| schema = description.get("json-schema", None) |
| method = description["http-method"] |
| url = description["url"] |
| expected_result = None |
| if "default_result_code" in description: |
| expected_result = description["default_result_code"] |
| |
| resources = [self.get_resource(r) for |
| r in description.get("resources", [])] |
| |
| if hasattr(self, "resource"): |
| # Note(mkoderer): The resources list already contains an invalid |
| # entry (see get_resource). |
| # We just send a valid json-schema with it |
| valid_schema = None |
| if schema: |
| valid_schema = \ |
| valid.ValidTestGenerator().generate_valid(schema) |
| new_url, body = self._http_arguments(valid_schema, url, method) |
| elif hasattr(self, "_negtest_name"): |
| schema_under_test = \ |
| valid.ValidTestGenerator().generate_valid(schema) |
| local_expected_result = \ |
| generator.generate_payload(self, schema_under_test) |
| if local_expected_result is not None: |
| expected_result = local_expected_result |
| new_url, body = \ |
| self._http_arguments(schema_under_test, url, method) |
| else: |
| raise Exception("testscenarios are not active. Please make sure " |
| "that your test runner supports the load_tests " |
| "mechanism") |
| |
| if "admin_client" in description and description["admin_client"]: |
| client = self.admin_client |
| else: |
| client = self.client |
| resp, resp_body = client.send_request(method, new_url, |
| resources, body=body) |
| self._check_negative_response(expected_result, resp.status, resp_body) |
| |
| def _http_arguments(self, json_dict, url, method): |
| LOG.debug("dict: %s url: %s method: %s" % (json_dict, url, method)) |
| if not json_dict: |
| return url, None |
| elif method in ["GET", "HEAD", "PUT", "DELETE"]: |
| return "%s?%s" % (url, parse.urlencode(json_dict)), None |
| else: |
| return url, json.dumps(json_dict) |
| |
| def _check_negative_response(self, expected_result, result, body): |
| self.assertTrue(result >= 400 and result < 500 and result != 413, |
| "Expected client error, got %s:%s" % |
| (result, body)) |
| self.assertTrue(expected_result is None or expected_result == result, |
| "Expected %s, got %s:%s" % |
| (expected_result, result, body)) |
| |
| @classmethod |
| def set_resource(cls, name, resource): |
| """ |
| This function can be used in setUpClass context to register a resoruce |
| for a test. |
| |
| :param name: The name of the kind of resource such as "flavor", "role", |
| etc. |
| :resource: The id of the resource |
| """ |
| cls._resources[name] = resource |
| |
| def get_resource(self, name): |
| """ |
| Return a valid uuid for a type of resource. If a real resource is |
| needed as part of a url then this method should return one. Otherwise |
| it can return None. |
| |
| :param name: The name of the kind of resource such as "flavor", "role", |
| etc. |
| """ |
| if isinstance(name, dict): |
| name = name['name'] |
| if hasattr(self, "resource") and self.resource[0] == name: |
| LOG.debug("Return invalid resource (%s) value: %s" % |
| (self.resource[0], self.resource[1])) |
| return self.resource[1] |
| if name in self._resources: |
| return self._resources[name] |
| return None |
| |
| |
| def SimpleNegativeAutoTest(klass): |
| """ |
| This decorator registers a test function on basis of the class name. |
| """ |
| @attr(type=['negative', 'gate']) |
| def generic_test(self): |
| if hasattr(self, '_schema'): |
| self.execute(self._schema) |
| |
| cn = klass.__name__ |
| cn = cn.replace('JSON', '') |
| cn = cn.replace('Test', '') |
| # NOTE(mkoderer): replaces uppercase chars inside the class name with '_' |
| lower_cn = re.sub('(?<!^)(?=[A-Z])', '_', cn).lower() |
| func_name = 'test_%s' % lower_cn |
| setattr(klass, func_name, generic_test) |
| return klass |
| |
| |
| def call_until_true(func, duration, sleep_for): |
| """ |
| Call the given function until it returns True (and return True) or |
| until the specified duration (in seconds) elapses (and return |
| False). |
| |
| :param func: A zero argument callable that returns True on success. |
| :param duration: The number of seconds for which to attempt a |
| successful call of the function. |
| :param sleep_for: The number of seconds to sleep after an unsuccessful |
| invocation of the function. |
| """ |
| now = time.time() |
| timeout = now + duration |
| while now < timeout: |
| if func(): |
| return True |
| time.sleep(sleep_for) |
| now = time.time() |
| return False |