|  | # Copyright 2012 OpenStack Foundation | 
|  | # All Rights Reserved. | 
|  | # | 
|  | #    Licensed under the Apache License, Version 2.0 (the "License"); you may | 
|  | #    not use this file except in compliance with the License. You may obtain | 
|  | #    a copy of the License at | 
|  | # | 
|  | #         http://www.apache.org/licenses/LICENSE-2.0 | 
|  | # | 
|  | #    Unless required by applicable law or agreed to in writing, software | 
|  | #    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT | 
|  | #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the | 
|  | #    License for the specific language governing permissions and limitations | 
|  | #    under the License. | 
|  |  | 
|  | import atexit | 
|  | import functools | 
|  | import os | 
|  | import re | 
|  | import sys | 
|  |  | 
|  | import debtcollector.moves | 
|  | import fixtures | 
|  | from oslo_log import log as logging | 
|  | from oslo_serialization import jsonutils as json | 
|  | from oslo_utils import importutils | 
|  | import six | 
|  | from six.moves import urllib | 
|  | import testscenarios | 
|  | import testtools | 
|  |  | 
|  | from tempest import clients | 
|  | from tempest.common import cred_client | 
|  | from tempest.common import credentials_factory as credentials | 
|  | from tempest.common import fixed_network | 
|  | import tempest.common.generator.valid_generator as valid | 
|  | import tempest.common.validation_resources as vresources | 
|  | from tempest import config | 
|  | from tempest import exceptions | 
|  | from tempest.lib.common.utils import data_utils | 
|  | from tempest.lib.common.utils import test_utils | 
|  | from tempest.lib import decorators | 
|  | from tempest.lib import exceptions as lib_exc | 
|  |  | 
# Module-level logger, named after this module.
LOG = logging.getLogger(__name__)

# Shorthand for tempest.config.CONF, read by the helpers and classes below.
CONF = config.CONF

# Alias of tempest.lib.decorators.idempotent_id exposed under this module's
# namespace.
idempotent_id = decorators.idempotent_id
|  |  | 
|  |  | 
def attr(**kwargs):
    """A decorator which applies the testtools attr decorator

    This decorator applies the testtools.testcase.attr if it is in the list of
    attributes to testtools we want to apply.

    :param type: (keyword) a single attribute name given as a string, or a
        list of attribute names. When ``type`` is omitted, or is neither a
        string nor a list, the decorated function is returned untouched.
    :returns: a decorator that returns the function with the attributes
        applied
    """

    def decorator(f):
        if 'type' not in kwargs:
            return f
        attr_type = kwargs['type']
        # six.string_types (rather than str) so that unicode attribute names
        # are honoured on Python 2 too, matching the string checks used by
        # BaseTestCase in this module.
        if isinstance(attr_type, six.string_types):
            f = testtools.testcase.attr(attr_type)(f)
        elif isinstance(attr_type, list):
            # The loop variable is deliberately not named "attr" so that it
            # does not shadow this function.
            for name in attr_type:
                f = testtools.testcase.attr(name)(f)
        return f

    return decorator
|  |  | 
|  |  | 
def get_service_list():
    """Map every known service tag to its availability.

    The values for compute, image, baremetal, volume and object_storage are
    read from the ``service_available`` configuration group each time this
    function is called; network and identity are always reported as True.

    :returns: dict of service tag -> configured availability value
    """
    available = CONF.service_available
    return dict(
        compute=available.nova,
        image=available.glance,
        baremetal=available.ironic,
        volume=available.cinder,
        network=True,
        identity=True,
        object_storage=available.swift,
    )
|  |  | 
|  |  | 
def services(*args):
    """Tag a test with the services it exercises; skip if one is unavailable

    Every name in ``args`` is applied to the test as a testtools attr. When
    the decorated test is run, it is skipped if get_service_list() reports
    any of the named services as unavailable.

    :param args: service tags, each one of compute, image, baremetal,
        volume, network, identity or object_storage
    :raises InvalidServiceTag: when the decorator is applied, for the first
        tag that is not in the list above
    """
    def decorator(f):
        known_services = ['compute', 'image', 'baremetal', 'volume',
                          'network', 'identity', 'object_storage']
        unknown = [tag for tag in args if tag not in known_services]
        if unknown:
            raise exceptions.InvalidServiceTag(
                '%s is not a valid service' % unknown[0])
        # The return value is discarded, so this relies on attr() tagging f
        # in place. It runs before functools.wraps so that whatever was set
        # on f is copied onto the wrapper as well.
        attr(type=list(args))(f)

        @functools.wraps(f)
        def wrapper(self, *func_args, **func_kwargs):
            availability = get_service_list()
            for tag in args:
                if availability[tag]:
                    continue
                raise testtools.TestCase.skipException(
                    'Skipped because the %s service is not available' % tag)
            return f(self, *func_args, **func_kwargs)
        return wrapper
    return decorator
|  |  | 
|  |  | 
def stresstest(**kwargs):
    """Mark a function as a stress test

    The decorated function gets the ``stress`` attr plus two attributes,
    ``st_class_setup_per`` and ``st_allow_inheritance``, taken from the
    keyword arguments below.

    @param class_setup_per: allowed values are application, process, action
         ``application``: once in the stress job lifetime
         ``process``: once in the worker process lifetime
         ``action``: on each action
         Defaults to ``process`` when not given.
    @param allow_inheritance: allows inheritance of this attribute.
         Defaults to False when not given.
    """
    def decorator(f):
        f.st_class_setup_per = kwargs.get('class_setup_per', 'process')
        f.st_allow_inheritance = kwargs.get('allow_inheritance', False)
        attr(type='stress')(f)
        return f
    return decorator
|  |  | 
|  |  | 
def requires_ext(**kwargs):
    """Skip the decorated test unless an API extension is enabled

    The check is made through is_extension_enabled() every time the
    decorated function is called, not when the decorator is applied.

    @param extension: name of the extension that must be enabled
    @param service: service the extension belongs to
    """
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*func_args, **func_kwargs):
            # Looked up at call time, extension first, as a missing keyword
            # only surfaces as KeyError when the test actually runs.
            extension = kwargs['extension']
            service = kwargs['service']
            if is_extension_enabled(extension, service):
                return func(*func_args, **func_kwargs)
            raise testtools.TestCase.skipException(
                "Skipped because %s extension: %s is not enabled" % (
                    service, extension))
        return wrapper
    return decorator
|  |  | 
|  |  | 
def is_extension_enabled(extension_name, service):
    """Check the configured list of enabled extensions for a service

    :param extension_name: name of the extension to look for
    :param service: one of compute, volume, network, object or identity
    :returns: False when the configured list for the service is empty; True
        when its first entry is 'all' or when it contains extension_name;
        False otherwise
    :raises KeyError: if service is not one of the names above
    """
    enabled_by_service = {
        'compute': CONF.compute_feature_enabled.api_extensions,
        'volume': CONF.volume_feature_enabled.api_extensions,
        'network': CONF.network_feature_enabled.api_extensions,
        'object': CONF.object_storage_feature_enabled.discoverable_apis,
        'identity': CONF.identity_feature_enabled.api_extensions
    }
    enabled = enabled_by_service[service]
    if len(enabled) == 0:
        return False
    # Only a leading 'all' entry acts as the wildcard here.
    return enabled[0] == 'all' or extension_name in enabled
|  |  | 
|  |  | 
def is_scheduler_filter_enabled(filter_name):
    """Check the configured list of enabled compute scheduler filters

    :param filter_name: name of the scheduler filter to look for
    :returns: False when the configured list is empty; True when it contains
        'all' or filter_name; False otherwise
    """
    enabled = CONF.compute_feature_enabled.scheduler_available_filters
    if len(enabled) == 0:
        return False
    return 'all' in enabled or filter_name in enabled
|  |  | 
|  |  | 
# Test classes that have run setUp() and whose BaseTestCase.tearDownClass has
# not been reached yet.
at_exit_set = set()


def validate_tearDownClass():
    """Log an error at interpreter exit for classes still in at_exit_set"""
    if not at_exit_set:
        return
    LOG.error(
        "tearDownClass does not call the super's "
        "tearDownClass in these classes: \n"
        + str(at_exit_set))


atexit.register(validate_tearDownClass)
|  |  | 
|  |  | 
class BaseTestCase(testtools.testcase.WithAttributes,
                   testtools.TestCase):
    """The test base class defines Tempest framework for class level fixtures.

    `setUpClass` and `tearDownClass` are defined here and cannot be overwritten
    by subclasses (enforced via hacking rule T105).

    Set-up is split in a series of steps (setup stages), which can be
    overwritten by test classes. Set-up stages are:
    - skip_checks
    - setup_credentials
    - setup_clients
    - resource_setup

    Tear-down is also split in a series of steps (teardown stages), which are
    stacked for execution only if the corresponding setup stage had been
    reached during the setup phase. Tear-down stages are:
    - clear_credentials (defined in the base test class)
    - resource_cleanup
    """

    # Set to True by setUpClass; setUp refuses to run when it is still False
    setUpClassCalled = False
    # Passed as ``service`` to the client manager in get_client_manager
    _service = None

    # NOTE(andreaf) credentials holds a list of the credentials to be allocated
    # at class setup time. Credential types can be 'primary', 'alt', 'admin' or
    # a list of roles - the first element of the list being a label, and the
    # rest the actual roles
    credentials = []
    # Resources required to validate a server using ssh
    validation_resources = {}
    # Network resources handed to the credentials provider; filled in by
    # set_network_resources
    network_resources = {}

    # NOTE(sdague): log_format is defined inline here instead of using the oslo
    # default because going through the config path recouples config to the
    # stress tests too early, and depending on testr order will fail unit tests
    log_format = ('%(asctime)s %(process)d %(levelname)-8s '
                  '[%(name)s] %(message)s')

    # Client manager class to use in this test case.
    client_manager = clients.Manager

    # A way to adjust slow test classes
    TIMEOUT_SCALING_FACTOR = 1

    @classmethod
    def setUpClass(cls):
        """Run the setup stages, tearing down what was set up on failure.

        A failure in skip_checks propagates directly. A failure in any later
        stage triggers tearDownClass and is then re-raised with its original
        traceback.
        """
        # It should never be overridden by descendants
        if hasattr(super(BaseTestCase, cls), 'setUpClass'):
            super(BaseTestCase, cls).setUpClass()
        cls.setUpClassCalled = True
        # Stack of (name, callable) to be invoked in reverse order at teardown
        cls.teardowns = []
        # All the configuration checks that may generate a skip
        cls.skip_checks()
        try:
            # Allocation of all required credentials and client managers.
            # The teardown is registered first so that a partial allocation
            # is still cleaned up.
            cls.teardowns.append(('credentials', cls.clear_credentials))
            cls.setup_credentials()
            # Shortcuts to clients
            cls.setup_clients()
            # Additional class-wide test resources
            cls.teardowns.append(('resources', cls.resource_cleanup))
            cls.resource_setup()
        except Exception:
            etype, value, trace = sys.exc_info()
            LOG.info("%s raised in %s.setUpClass. Invoking tearDownClass.",
                     etype, cls.__name__)
            # Called from inside the except block on purpose: tearDownClass
            # inspects sys.exc_info() to tell a setup failure from a normal
            # teardown.
            cls.tearDownClass()
            try:
                six.reraise(etype, value, trace)
            finally:
                del trace  # to avoid circular refs

    @classmethod
    def tearDownClass(cls):
        """Run the stacked teardown stages in reverse order.

        Every stage is attempted even if an earlier one fails. When no
        exception was already being handled on entry, the first exception
        raised by a teardown stage is re-raised at the end; otherwise
        teardown failures are only logged.
        """
        at_exit_set.discard(cls)
        # It should never be overridden by descendants
        if hasattr(super(BaseTestCase, cls), 'tearDownClass'):
            super(BaseTestCase, cls).tearDownClass()
        # Save any existing exception, we always want to re-raise the original
        # exception only
        etype, value, trace = sys.exc_info()
        # If there was no exception during setup we shall re-raise the first
        # exception in teardown
        re_raise = (etype is None)
        while cls.teardowns:
            name, teardown = cls.teardowns.pop()
            # Catch any exception in tearDown so we can re-raise the original
            # exception at the end
            try:
                teardown()
            except Exception as te:
                sys_exec_info = sys.exc_info()
                tetype = sys_exec_info[0]
                # TODO(andreaf): Till we have the ability to cleanup only
                # resources that were successfully setup in resource_cleanup,
                # log AttributeError as info instead of exception.
                if tetype is AttributeError and name == 'resources':
                    LOG.info("tearDownClass of %s failed: %s", name, te)
                else:
                    LOG.exception("teardown of %s failed: %s", name, te)
                # Remember only the first failure
                if not etype:
                    etype, value, trace = sys_exec_info
        # If exceptions were raised during teardown, and not before, re-raise
        # the first one
        if re_raise and etype is not None:
            try:
                six.reraise(etype, value, trace)
            finally:
                del trace  # to avoid circular refs

    @classmethod
    def skip_checks(cls):
        """Class level skip checks.

        Subclasses verify in here all conditions that might prevent the
        execution of the entire test class.
        Checks implemented here may not make use API calls, and should rely on
        configuration alone.
        In general skip checks that require an API call are discouraged.
        If one is really needed it may be implemented either in the
        resource_setup or at test level.

        :raises skipException: if 'admin' or 'alt' credentials are requested
            but not available, or if the class declares an identity_version
            ('v2' or 'v3') whose API is not enabled in configuration
        """
        identity_version = cls.get_identity_version()
        if 'admin' in cls.credentials and not credentials.is_admin_available(
                identity_version=identity_version):
            msg = "Missing Identity Admin API credentials in configuration."
            raise cls.skipException(msg)
        if 'alt' in cls.credentials and not credentials.is_alt_available(
                identity_version=identity_version):
            msg = "Missing a 2nd set of API credentials in configuration."
            raise cls.skipException(msg)
        if hasattr(cls, 'identity_version'):
            if cls.identity_version == 'v2':
                if not CONF.identity_feature_enabled.api_v2:
                    raise cls.skipException("Identity api v2 is not enabled")
            elif cls.identity_version == 'v3':
                if not CONF.identity_feature_enabled.api_v3:
                    raise cls.skipException("Identity api v3 is not enabled")

    @classmethod
    def setup_credentials(cls):
        """Allocate credentials and create the client managers from them.

        For every element of credentials param function creates tenant/user,
        Then it creates client manager for that credential.

        A string entry ``<type>`` is stored as ``cls.os_<type>``; a list entry
        ``[label, role, ...]`` is stored as ``cls.os_roles_<label>``.

        Network related tests must override this function with
        set_network_resources() method, otherwise it will create
        network resources(network resources are created in a later step).
        """
        for credentials_type in cls.credentials:
            # This may raise an exception in case credentials are not available
            # In that case we want to let the exception through and the test
            # fail accordingly
            if isinstance(credentials_type, six.string_types):
                manager = cls.get_client_manager(
                    credential_type=credentials_type)
                setattr(cls, 'os_%s' % credentials_type, manager)
                # Setup some common aliases
                # TODO(andreaf) The aliases below are a temporary hack
                # to avoid changing too much code in one patch. They should
                # be removed eventually
                if credentials_type == 'primary':
                    cls.os = cls.manager = cls.os_primary
                if credentials_type == 'admin':
                    cls.os_adm = cls.admin_manager = cls.os_admin
                if credentials_type == 'alt':
                    cls.alt_manager = cls.os_alt
            elif isinstance(credentials_type, list):
                # First element is the label, the rest are the roles
                manager = cls.get_client_manager(roles=credentials_type[1:],
                                                 force_new=True)
                setattr(cls, 'os_roles_%s' % credentials_type[0], manager)

    @classmethod
    def setup_clients(cls):
        """Create links to the clients into the test object."""
        # TODO(andreaf) There is a fair amount of code that could me moved from
        # base / test classes in here. Ideally tests should be able to only
        # specify which client is `client` and nothing else.
        pass

    @classmethod
    def resource_setup(cls):
        """Class level resource setup for test cases.

        Creates the validation resources through the primary client manager
        (``cls.os``) when one has been set up; otherwise only logs a warning.
        """
        if hasattr(cls, "os"):
            cls.validation_resources = vresources.create_validation_resources(
                cls.os, cls.validation_resources)
        else:
            LOG.warning("Client manager not found, validation resources not"
                        " created")

    @classmethod
    def resource_cleanup(cls):
        """Class level resource cleanup for test cases.

        Resource cleanup must be able to handle the case of partially setup
        resources, in case a failure during `resource_setup` should happen.
        """
        if cls.validation_resources:
            if hasattr(cls, "os"):
                vresources.clear_validation_resources(cls.os,
                                                      cls.validation_resources)
                cls.validation_resources = {}
            else:
                LOG.warning("Client manager not found, validation resources "
                            "not deleted")

    def setUp(self):
        """Per-test setup: timeout, stdout/stderr capture and log capture.

        Driven by the OS_TEST_TIMEOUT, OS_STDOUT_CAPTURE, OS_STDERR_CAPTURE
        and OS_LOG_CAPTURE environment variables.

        :raises RuntimeError: if setUpClass of this base class was not run
        """
        super(BaseTestCase, self).setUp()
        if not self.setUpClassCalled:
            raise RuntimeError("setUpClass does not call the super's "
                               "setUpClass in the "
                               + self.__class__.__name__)
        at_exit_set.add(self.__class__)
        test_timeout = os.environ.get('OS_TEST_TIMEOUT', 0)
        try:
            test_timeout = int(test_timeout) * self.TIMEOUT_SCALING_FACTOR
        except ValueError:
            # A non-numeric value disables the timeout
            test_timeout = 0
        if test_timeout > 0:
            self.useFixture(fixtures.Timeout(test_timeout, gentle=True))

        # Output capture is opt-in ...
        if os.environ.get('OS_STDOUT_CAPTURE') in ('True', '1'):
            stdout = self.useFixture(fixtures.StringStream('stdout')).stream
            self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout))
        if os.environ.get('OS_STDERR_CAPTURE') in ('True', '1'):
            stderr = self.useFixture(fixtures.StringStream('stderr')).stream
            self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))
        # ... while log capture is on unless explicitly disabled
        if os.environ.get('OS_LOG_CAPTURE') not in ('False', '0'):
            self.useFixture(fixtures.LoggerFixture(nuke_handlers=False,
                                                   format=self.log_format,
                                                   level=None))

    @property
    def credentials_provider(self):
        """The credentials provider of this test class."""
        return self._get_credentials_provider()

    @property
    def identity_utils(self):
        """A client that abstracts v2 and v3 identity operations.

        This can be used for creating and tearing down projects in tests. It
        should not be used for testing identity features.

        The clients are taken from ``self.os_admin``, picking the v2 or v3
        flavour according to ``CONF.identity.auth_version``.
        """
        if CONF.identity.auth_version == 'v2':
            client = self.os_admin.identity_client
            users_client = self.os_admin.users_client
            project_client = self.os_admin.tenants_client
            roles_client = self.os_admin.roles_client
            domains_client = None
        else:
            client = self.os_admin.identity_v3_client
            users_client = self.os_admin.users_v3_client
            project_client = self.os_admin.projects_client
            roles_client = self.os_admin.roles_v3_client
            domains_client = self.os_admin.domains_client

        try:
            domain = client.auth_provider.credentials.project_domain_name
        except AttributeError:
            # Credentials without a project domain fall back to 'Default'
            domain = 'Default'

        return cred_client.get_creds_client(client, project_client,
                                            users_client,
                                            roles_client,
                                            domains_client,
                                            project_domain_name=domain)

    @classmethod
    def get_identity_version(cls):
        """Returns the identity version used by the test class"""
        identity_version = getattr(cls, 'identity_version', None)
        return identity_version or CONF.identity.auth_version

    @classmethod
    def _get_credentials_provider(cls):
        """Returns a credentials provider

        If no credential provider exists yet creates one.
        It uses self.identity_version if defined, or the configuration value

        A provider whose name differs from the class name (for instance one
        inherited from a parent test class) is replaced by a new one.
        """
        provider = getattr(cls, '_creds_provider', None)
        if not provider or provider.name != cls.__name__:
            force_tenant_isolation = getattr(cls, 'force_tenant_isolation',
                                             False)

            cls._creds_provider = credentials.get_credentials_provider(
                name=cls.__name__, network_resources=cls.network_resources,
                force_tenant_isolation=force_tenant_isolation,
                identity_version=cls.get_identity_version())
        return cls._creds_provider

    @classmethod
    def get_client_manager(cls, credential_type=None, roles=None,
                           force_new=None):
        """Returns an OpenStack client manager

        Returns an OpenStack client manager based on either credential_type
        or a list of roles. If neither is specified, it defaults to
        credential_type 'primary'
        :param credential_type: string - primary, alt or admin
        :param roles: list of roles
        :param force_new: only used together with roles; when not None it is
            passed through to the provider's get_creds_by_roles

        :returns: the created client manager
        :raises skipException: if the requested credentials are not available
        :raises ValueError: if both credential_type and roles are given
        :raises InvalidCredentials: if the provider has no
            ``get_<credential_type>_creds`` method
        """
        if roles and credential_type:
            msg = "Cannot get credentials by type and roles at the same time"
            raise ValueError(msg)
        if not roles and not credential_type:
            credential_type = 'primary'
        cred_provider = cls._get_credentials_provider()
        if roles:
            for role in roles:
                if not cred_provider.is_role_available(role):
                    skip_msg = (
                        "%s skipped because the configured credential provider"
                        " is not able to provide credentials with the %s role "
                        "assigned." % (cls.__name__, role))
                    raise cls.skipException(skip_msg)
            params = dict(roles=roles)
            if force_new is not None:
                params.update(force_new=force_new)
            creds = cred_provider.get_creds_by_roles(**params)
        else:
            credentials_method = 'get_%s_creds' % credential_type
            if hasattr(cred_provider, credentials_method):
                creds = getattr(cred_provider, credentials_method)()
            else:
                raise lib_exc.InvalidCredentials(
                    "Invalid credentials type %s" % credential_type)
        return cls.client_manager(credentials=creds.credentials,
                                  service=cls._service)

    @classmethod
    def clear_credentials(cls):
        """Clears creds if set"""
        if hasattr(cls, '_creds_provider'):
            cls._creds_provider.clear_creds()

    @classmethod
    def set_validation_resources(cls, keypair=None, floating_ip=None,
                                 security_group=None,
                                 security_group_rules=None):
        """Specify which ssh server validation resources should be created.

        Each of the argument must be set to either None, True or False, with
        None - use default from config (security groups and security group
               rules get created when set to None)
        False - Do not create the validation resource
        True - create the validation resource

        Nothing is recorded when ``CONF.validation.run_validation`` is false,
        or when validation resources have already been set on the class.

        @param keypair
        @param security_group
        @param security_group_rules
        @param floating_ip
        """
        if not CONF.validation.run_validation:
            return
        if keypair is None:
            keypair = CONF.validation.auth_method.lower() == "keypair"
        if floating_ip is None:
            floating_ip = CONF.validation.connect_method.lower() == "floating"
        if security_group is None:
            security_group = CONF.validation.security_group
        if security_group_rules is None:
            security_group_rules = CONF.validation.security_group_rules

        if not cls.validation_resources:
            cls.validation_resources = {
                'keypair': keypair,
                'security_group': security_group,
                'security_group_rules': security_group_rules,
                'floating_ip': floating_ip}

    @classmethod
    def set_network_resources(cls, network=False, router=False, subnet=False,
                              dhcp=False):
        """Specify which network resources should be created

        @param network
        @param router
        @param subnet
        @param dhcp
        """
        # network resources should be set only once from callers
        # in order to ensure that even if it's called multiple times in
        # a chain of overloaded methods, the attribute is set only
        # in the leaf class
        if not cls.network_resources:
            cls.network_resources = {
                'network': network,
                'router': router,
                'subnet': subnet,
                'dhcp': dhcp}

    @classmethod
    def get_tenant_network(cls, credentials_type='primary'):
        """Get the network to be used in testing

        :param credentials_type: The type of credentials for which to get the
                                 tenant network

        :return: network dict including 'id' and 'name'
        """
        # Get a manager for the given credentials_type, but at least
        # always fall back on getting the manager for primary credentials
        if isinstance(credentials_type, six.string_types):
            manager = cls.get_client_manager(credential_type=credentials_type)
        elif isinstance(credentials_type, list):
            manager = cls.get_client_manager(roles=credentials_type[1:])
        else:
            manager = cls.get_client_manager()

        # Make sure cred_provider exists and get a network client
        networks_client = manager.compute_networks_client
        cred_provider = cls._get_credentials_provider()
        # In case of nova network, isolated tenants are not able to list the
        # network configured in fixed_network_name, even if they can use it
        # for their servers, so using an admin network client to validate
        # the network name
        if (not CONF.service_available.neutron and
                credentials.is_admin_available(
                    identity_version=cls.get_identity_version())):
            admin_creds = cred_provider.get_admin_creds()
            admin_manager = clients.Manager(admin_creds.credentials)
            networks_client = admin_manager.compute_networks_client
        return fixed_network.get_tenant_network(
            cred_provider, networks_client, CONF.compute.fixed_network_name)

    def assertEmpty(self, list, msg=None):
        """Assert that the given sized container has no elements."""
        self.assertTrue(len(list) == 0, msg)

    def assertNotEmpty(self, list, msg=None):
        """Assert that the given sized container has at least one element."""
        self.assertTrue(len(list) > 0, msg)
|  |  | 
|  |  | 
class NegativeAutoTest(BaseTestCase):
    """Base class for negative tests generated from an API description.

    Scenarios are produced by generate_scenario() from the ``_schema``
    attribute of the test and run through execute().
    """

    # Registry of valid resource ids, keyed by resource name.
    # NOTE: this dict is defined on NegativeAutoTest itself, so set_resource()
    # on any subclass that does not define its own _resources writes into
    # this shared mapping.
    _resources = {}

    @classmethod
    def setUpClass(cls):
        super(NegativeAutoTest, cls).setUpClass()
        # Not named "os" so that the imported os module is not shadowed
        manager = cls.get_client_manager(credential_type='primary')
        cls.client = manager.negative_client

    @staticmethod
    def load_tests(*args):
        """Wrapper for testscenarios

        To set the mandatory scenarios variable only in case a real test
        loader is in place. Will be automatically called in case the variable
        "load_tests" is set.
        """
        # Two argument orders are supported; they are told apart by whether
        # the first argument has a suiteClass attribute (i.e. is the loader).
        if getattr(args[0], 'suiteClass', None) is not None:
            loader, standard_tests, pattern = args
        else:
            standard_tests, module, loader = args
        for test in testtools.iterate_tests(standard_tests):
            schema = getattr(test, '_schema', None)
            if schema is not None:
                setattr(test, 'scenarios',
                        NegativeAutoTest.generate_scenario(schema))
        return testscenarios.load_tests_apply_scenarios(*args)

    @staticmethod
    def generate_scenario(description):
        """Generates the test scenario list for a given description.

        :param description: A file or dictionary with the following entries:
            name (required) name for the api
            http-method (required) one of HEAD,GET,PUT,POST,PATCH,DELETE
            url (required) the url to be appended to the catalog url with '%s'
                for each resource mentioned
            resources: (optional) A list of resource names such as "server",
                "flavor", etc. with an element for each '%s' in the url. This
                method will call self.get_resource for each element when
                constructing the positive test case template so negative
                subclasses are expected to return valid resource ids when
                appropriate. An element may also be a dict with the keys
                'name' and 'expected_result'.
            json-schema (optional) A valid json schema that will be used to
                create invalid data for the api calls. For "GET" and "HEAD",
                the data is used to generate query strings appended to the url,
                otherwise for the body of the http call.
        :returns: list of (scenario name, scenario attributes dict) tuples:
            one "inv_res_<resource>" scenario per resource, followed by the
            scenarios the configured generator derives from the json-schema
        """
        LOG.debug(description)
        generator = importutils.import_class(
            CONF.negative.test_generator)()
        generator.validate_schema(description)
        schema = description.get("json-schema", None)
        resources = description.get("resources", [])
        scenario_list = []
        for resource in resources:
            # Reset for every resource so that an expected_result declared by
            # one dict entry does not leak into the plain entries after it.
            expected_result = None
            if isinstance(resource, dict):
                expected_result = resource['expected_result']
                resource = resource['name']
            LOG.debug("Add resource to test %s", resource)
            scn_name = "inv_res_%s" % (resource)
            # The scenario pairs the resource name with a random uuid, which
            # get_resource() later returns in place of a valid id.
            scenario_list.append((scn_name, {
                "resource": (resource, data_utils.rand_uuid()),
                "expected_result": expected_result
            }))
        if schema is not None:
            for scenario in generator.generate_scenarios(schema):
                scenario_list.append((scenario['_negtest_name'],
                                      scenario))
        LOG.debug(scenario_list)
        return scenario_list

    def execute(self, description):
        """Execute a http call

        Execute a http call on an api that are expected to
        result in client errors. First it uses invalid resources that are part
        of the url, and then invalid data for queries and http request bodies.

        :param description: A json file or dictionary with the following
        entries:
            name (required) name for the api
            http-method (required) one of HEAD,GET,PUT,POST,PATCH,DELETE
            url (required) the url to be appended to the catalog url with '%s'
                for each resource mentioned
            resources: (optional) A list of resource names such as "server",
                "flavor", etc. with an element for each '%s' in the url. This
                method will call self.get_resource for each element when
                constructing the positive test case template so negative
                subclasses are expected to return valid resource ids when
                appropriate.
            json-schema (optional) A valid json schema that will be used to
                create invalid data for the api calls. For "GET" and "HEAD",
                the data is used to generate query strings appended to the url,
                otherwise for the body of the http call.
            default_result_code (optional) status code expected unless the
                generator supplies a more specific one
            admin_client (optional) when true the request is sent with admin
                credentials

        :raises skipException: if admin_client is requested but admin
            credentials are not available
        :raises Exception: if the test instance carries neither a ``resource``
            nor a ``_negtest_name`` attribute, i.e. scenarios were not applied
        """
        LOG.info("Executing %s", description["name"])
        LOG.debug(description)
        generator = importutils.import_class(
            CONF.negative.test_generator)()
        schema = description.get("json-schema", None)
        method = description["http-method"]
        url = description["url"]
        expected_result = None
        if "default_result_code" in description:
            expected_result = description["default_result_code"]

        resources = [self.get_resource(r) for
                     r in description.get("resources", [])]

        if hasattr(self, "resource"):
            # Note(mkoderer): The resources list already contains an invalid
            # entry (see get_resource).
            # We just send a valid json-schema with it
            valid_schema = None
            if schema:
                valid_schema = \
                    valid.ValidTestGenerator().generate_valid(schema)
            new_url, body = self._http_arguments(valid_schema, url, method)
        elif hasattr(self, "_negtest_name"):
            schema_under_test = \
                valid.ValidTestGenerator().generate_valid(schema)
            local_expected_result = \
                generator.generate_payload(self, schema_under_test)
            if local_expected_result is not None:
                expected_result = local_expected_result
            new_url, body = \
                self._http_arguments(schema_under_test, url, method)
        else:
            raise Exception("testscenarios are not active. Please make sure "
                            "that your test runner supports the load_tests "
                            "mechanism")

        if "admin_client" in description and description["admin_client"]:
            if not credentials.is_admin_available(
                    identity_version=self.get_identity_version()):
                msg = ("Missing Identity Admin API credentials in "
                       "configuration.")
                raise self.skipException(msg)
            creds = self.credentials_provider.get_admin_creds()
            # Unwrap .credentials before building the manager, the same way
            # BaseTestCase.get_client_manager and get_tenant_network do.
            os_adm = clients.Manager(credentials=creds.credentials)
            client = os_adm.negative_client
        else:
            client = self.client
        resp, resp_body = client.send_request(method, new_url,
                                              resources, body=body)
        self._check_negative_response(expected_result, resp.status, resp_body)

    def _http_arguments(self, json_dict, url, method):
        """Turn the generated data into a (url, body) pair for the request.

        :param json_dict: data to send, or a false value for none
        :param url: url template of the api
        :param method: http method name
        :returns: ``(url, None)`` when there is no data; the url with the
            data urlencoded as a query string and no body for GET, HEAD, PUT
            and DELETE; otherwise the unchanged url and the data serialized
            as JSON
        """
        LOG.debug("dict: %s url: %s method: %s", json_dict, url, method)
        if not json_dict:
            return url, None
        elif method in ["GET", "HEAD", "PUT", "DELETE"]:
            return "%s?%s" % (url, urllib.parse.urlencode(json_dict)), None
        else:
            return url, json.dumps(json_dict)

    def _check_negative_response(self, expected_result, result, body):
        """Assert that the response status is the expected client error.

        :param expected_result: exact status code expected, or None to accept
            any 4xx status other than 413
        :param result: status code of the response
        :param body: response body, only used in the failure messages
        """
        self.assertTrue(400 <= result < 500 and result != 413,
                        "Expected client error, got %s:%s" %
                        (result, body))
        self.assertTrue(expected_result is None or expected_result == result,
                        "Expected %s, got %s:%s" %
                        (expected_result, result, body))

    @classmethod
    def set_resource(cls, name, resource):
        """Register a resource for a test

        This function can be used in setUpClass context to register a resource
        for a test.

        :param name: The name of the kind of resource such as "flavor", "role",
            etc.
        :resource: The id of the resource
        """
        cls._resources[name] = resource

    def get_resource(self, name):
        """Return a valid uuid for a type of resource.

        If a real resource is needed as part of a url then this method should
        return one. Otherwise it can return None.

        When the running scenario targets this resource (``self.resource``
        names it), the invalid id stored in the scenario is returned instead.

        :param name: The name of the kind of resource such as "flavor", "role",
            etc. A dict with a 'name' key is accepted as well.
        """
        if isinstance(name, dict):
            name = name['name']
        if hasattr(self, "resource") and self.resource[0] == name:
            LOG.debug("Return invalid resource (%s) value: %s",
                      self.resource[0], self.resource[1])
            return self.resource[1]
        if name in self._resources:
            return self._resources[name]
        return None
|  |  | 
|  |  | 
def SimpleNegativeAutoTest(klass):
    """Class decorator adding a generic negative test named after the class.

    The test method name is derived from the class name: 'JSON' and 'Test'
    are removed, the remainder is converted to lower case with underscores,
    and 'test_' is prepended. The added test runs execute() on the
    instance's ``_schema``, and does nothing when there is none.
    """
    @attr(type=['negative'])
    def generic_test(self):
        if not hasattr(self, '_schema'):
            return
        self.execute(self._schema)

    stripped = klass.__name__.replace('JSON', '').replace('Test', '')
    # NOTE(mkoderer): inserts '_' before every uppercase char inside the
    # class name, except a leading one
    snake_name = re.sub('(?<!^)(?=[A-Z])', '_', stripped).lower()
    setattr(klass, 'test_%s' % snake_name, generic_test)
    return klass
|  |  | 
|  |  | 
# Deprecated alias kept under this module's name: the implementation is
# tempest.lib.common.utils.test_utils.call_until_true. It is wrapped with
# debtcollector's moved_function, with 'Newton' passed as the deprecation
# version and 'Ocata' as the removal version.
call_until_true = debtcollector.moves.moved_function(
    test_utils.call_until_true, 'call_until_true', __name__,
    version='Newton', removal_version='Ocata')