| # Copyright 2012 OpenStack Foundation |
| # All Rights Reserved. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may |
| # not use this file except in compliance with the License. You may obtain |
| # a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations |
| # under the License. |
| |
| from tempest.common import compute |
| from tempest.common import object_storage |
| from tempest.common import waiters |
| from tempest import config |
| from tempest.lib.common import api_version_utils |
| from tempest.lib.common.utils import data_utils |
| from tempest.lib.common.utils import test_utils |
| from tempest.lib.decorators import cleanup_order |
| from tempest.lib import exceptions as lib_exc |
| import tempest.test |
| |
| CONF = config.CONF |
| |
| |
| class BaseVolumeTest(api_version_utils.BaseMicroversionTest, |
| tempest.test.BaseTestCase): |
| """Base test case class for all Cinder API tests.""" |
| |
| # Set this to True in subclasses to create a default network. See |
| # https://bugs.launchpad.net/tempest/+bug/1844568 |
| create_default_network = False |
| credentials = ['primary'] |
| |
| @classmethod |
| def skip_checks(cls): |
| super(BaseVolumeTest, cls).skip_checks() |
| |
| if not CONF.service_available.cinder: |
| skip_msg = ("%s skipped as Cinder is not available" % cls.__name__) |
| raise cls.skipException(skip_msg) |
| if cls.create_default_network and not CONF.service_available.neutron: |
| skip_msg = ( |
| "%s skipped as Neutron is not available" % cls.__name__) |
| raise cls.skipException(skip_msg) |
| |
| api_version_utils.check_skip_with_microversion( |
| cls.volume_min_microversion, cls.volume_max_microversion, |
| CONF.volume.min_microversion, CONF.volume.max_microversion) |
| |
| @classmethod |
| def setup_credentials(cls): |
| cls.set_network_resources( |
| network=cls.create_default_network, |
| router=cls.create_default_network, |
| dhcp=cls.create_default_network, |
| subnet=cls.create_default_network) |
| super(BaseVolumeTest, cls).setup_credentials() |
| |
| @classmethod |
| def setup_clients(cls): |
| super(BaseVolumeTest, cls).setup_clients() |
| cls.servers_client = cls.os_primary.servers_client |
| |
| if CONF.service_available.glance: |
| cls.images_client = cls.os_primary.image_client_v2 |
| |
| cls.container_client = cls.os_primary.container_client |
| cls.object_client = cls.os_primary.object_client |
| cls.backups_client = cls.os_primary.backups_client_latest |
| cls.volumes_client = cls.os_primary.volumes_client_latest |
| cls.messages_client = cls.os_primary.volume_messages_client_latest |
| cls.versions_client = cls.os_primary.volume_versions_client_latest |
| cls.groups_client = cls.os_primary.groups_client_latest |
| cls.group_snapshots_client = ( |
| cls.os_primary.group_snapshots_client_latest) |
| cls.snapshots_client = cls.os_primary.snapshots_client_latest |
| cls.volumes_extension_client =\ |
| cls.os_primary.volumes_extension_client_latest |
| cls.availability_zone_client = ( |
| cls.os_primary.volume_availability_zone_client_latest) |
| cls.volume_limits_client = cls.os_primary.volume_limits_client_latest |
| |
| @classmethod |
| def resource_setup(cls): |
| super(BaseVolumeTest, cls).resource_setup() |
| cls.volume_request_microversion = ( |
| api_version_utils.select_request_microversion( |
| cls.volume_min_microversion, |
| CONF.volume.min_microversion)) |
| cls.compute_request_microversion = ( |
| api_version_utils.select_request_microversion( |
| cls.min_microversion, |
| CONF.compute.min_microversion)) |
| cls.setup_api_microversion_fixture( |
| compute_microversion=cls.compute_request_microversion, |
| volume_microversion=cls.volume_request_microversion) |
| |
| cls.image_ref = CONF.compute.image_ref |
| cls.flavor_ref = CONF.compute.flavor_ref |
| cls.build_interval = CONF.volume.build_interval |
| cls.build_timeout = CONF.volume.build_timeout |
| |
| @cleanup_order |
| def create_volume(self, wait_until='available', **kwargs): |
| """Wrapper utility that returns a test volume. |
| |
| :param wait_until: wait till volume status, None means no wait. |
| """ |
| if 'size' not in kwargs: |
| kwargs['size'] = CONF.volume.volume_size |
| |
| if 'imageRef' in kwargs: |
| image = self.images_client.show_image(kwargs['imageRef']) |
| min_disk = image['min_disk'] |
| kwargs['size'] = max(kwargs['size'], min_disk) |
| |
| if 'name' not in kwargs: |
| name = data_utils.rand_name(self.__name__ + '-Volume') |
| kwargs['name'] = name |
| |
| if CONF.volume.volume_type and 'volume_type' not in kwargs: |
| # If volume_type is not provided in config then no need to |
| # add a volume type and |
| # if volume_type has already been added by child class then |
| # no need to override. |
| kwargs['volume_type'] = CONF.volume.volume_type |
| |
| if CONF.compute.compute_volume_common_az: |
| kwargs.setdefault('availability_zone', |
| CONF.compute.compute_volume_common_az) |
| |
| volume = self.volumes_client.create_volume(**kwargs)['volume'] |
| self.cleanup(test_utils.call_and_ignore_notfound_exc, |
| self._delete_volume_for_cleanup, |
| self.volumes_client, volume['id']) |
| if wait_until: |
| waiters.wait_for_volume_resource_status(self.volumes_client, |
| volume['id'], wait_until) |
| return volume |
| |
| @staticmethod |
| def _delete_volume_for_cleanup(volumes_client, volume_id): |
| """Delete a volume (only) for cleanup. |
| |
| If it is attached to a server, wait for it to become available, |
| assuming we have already deleted the server and just need nova to |
| complete the delete operation before it is available to be deleted. |
| Otherwise proceed to the regular delete_volume(). |
| """ |
| try: |
| vol = volumes_client.show_volume(volume_id)['volume'] |
| if vol['status'] == 'in-use': |
| waiters.wait_for_volume_resource_status(volumes_client, |
| volume_id, |
| 'available') |
| except lib_exc.NotFound: |
| pass |
| BaseVolumeTest.delete_volume(volumes_client, volume_id) |
| |
| @cleanup_order |
| def create_snapshot(self, volume_id=1, **kwargs): |
| """Wrapper utility that returns a test snapshot.""" |
| if 'name' not in kwargs: |
| name = data_utils.rand_name(self.__name__ + '-Snapshot') |
| kwargs['name'] = name |
| |
| snapshot = self.snapshots_client.create_snapshot( |
| volume_id=volume_id, **kwargs)['snapshot'] |
| self.cleanup(test_utils.call_and_ignore_notfound_exc, |
| self.delete_snapshot, snapshot['id']) |
| waiters.wait_for_volume_resource_status(self.snapshots_client, |
| snapshot['id'], 'available') |
| return snapshot |
| |
| def create_backup(self, volume_id, backup_client=None, object_client=None, |
| container_client=None, **kwargs): |
| """Wrapper utility that returns a test backup.""" |
| if backup_client is None: |
| backup_client = self.backups_client |
| if container_client is None: |
| container_client = self.container_client |
| if object_client is None: |
| object_client = self.object_client |
| if 'name' not in kwargs: |
| name = data_utils.rand_name(self.__class__.__name__ + '-Backup') |
| kwargs['name'] = name |
| if 'container' not in kwargs: |
| cont_name = self.__class__.__name__ + '-backup-container' |
| cont = data_utils.rand_name(cont_name) |
| kwargs['container'] = cont |
| |
| backup = backup_client.create_backup( |
| volume_id=volume_id, **kwargs)['backup'] |
| |
| if CONF.service_available.swift: |
| self.addCleanup(object_storage.delete_containers, |
| kwargs['container'], container_client, |
| object_client) |
| |
| # addCleanup uses list pop to cleanup. Wait should be added before |
| # the backup is deleted |
| self.addCleanup(backup_client.wait_for_resource_deletion, |
| backup['id']) |
| self.addCleanup(backup_client.delete_backup, backup['id']) |
| waiters.wait_for_volume_resource_status(backup_client, backup['id'], |
| 'available') |
| return backup |
| |
| # NOTE(afazekas): these create_* and clean_* could be defined |
| # only in a single location in the source, and could be more general. |
| |
| @staticmethod |
| def delete_volume(client, volume_id): |
| """Delete volume by the given client""" |
| client.delete_volume(volume_id) |
| client.wait_for_resource_deletion(volume_id) |
| |
| @cleanup_order |
| def delete_snapshot(self, snapshot_id, snapshots_client=None): |
| """Delete snapshot by the given client""" |
| if snapshots_client is None: |
| snapshots_client = self.snapshots_client |
| snapshots_client.delete_snapshot(snapshot_id) |
| snapshots_client.wait_for_resource_deletion(snapshot_id) |
| |
| def attach_volume(self, server_id, volume_id, wait_for_detach=True): |
| """Attach a volume to a server""" |
| self.servers_client.attach_volume( |
| server_id, volumeId=volume_id, |
| device='/dev/%s' % CONF.compute.volume_device_name) |
| waiters.wait_for_volume_resource_status(self.volumes_client, |
| volume_id, 'in-use') |
| if wait_for_detach: |
| self.addCleanup(waiters.wait_for_volume_resource_status, |
| self.volumes_client, volume_id, 'available', |
| server_id, self.servers_client) |
| self.addCleanup(self.servers_client.detach_volume, server_id, |
| volume_id) |
| |
| def create_server(self, wait_until='ACTIVE', **kwargs): |
| name = kwargs.pop( |
| 'name', |
| data_utils.rand_name(self.__class__.__name__ + '-instance')) |
| |
| if wait_until == 'SSHABLE' and not kwargs.get('validation_resources'): |
| # If we were asked for SSHABLE but were not provided with the |
| # required validation_resources and validatable flag, ensure we |
| # pass them to create_test_server() so that it will actually wait. |
| kwargs['validation_resources'] = ( |
| self.get_test_validation_resources(self.os_primary)) |
| kwargs['validatable'] = True |
| |
| tenant_network = self.get_tenant_network() |
| body, _ = compute.create_test_server( |
| self.os_primary, |
| tenant_network=tenant_network, |
| name=name, |
| wait_until=wait_until, |
| **kwargs) |
| |
| self.addCleanup(test_utils.call_and_ignore_notfound_exc, |
| waiters.wait_for_server_termination, |
| self.servers_client, body['id']) |
| self.addCleanup(test_utils.call_and_ignore_notfound_exc, |
| self.servers_client.delete_server, body['id']) |
| return body |
| |
| def create_group(self, **kwargs): |
| if 'name' not in kwargs: |
| kwargs['name'] = data_utils.rand_name( |
| self.__class__.__name__ + '-Group') |
| |
| group = self.groups_client.create_group(**kwargs)['group'] |
| self.addCleanup(test_utils.call_and_ignore_notfound_exc, |
| self.delete_group, group['id']) |
| waiters.wait_for_volume_resource_status( |
| self.groups_client, group['id'], 'available') |
| return group |
| |
| def delete_group(self, group_id, delete_volumes=True): |
| group_vols = [] |
| if delete_volumes: |
| vols = self.volumes_client.list_volumes(detail=True)['volumes'] |
| for vol in vols: |
| if vol['group_id'] == group_id: |
| group_vols.append(vol['id']) |
| self.groups_client.delete_group(group_id, delete_volumes) |
| for vol in group_vols: |
| self.volumes_client.wait_for_resource_deletion(vol) |
| self.groups_client.wait_for_resource_deletion(group_id) |
| |
| |
| class BaseVolumeAdminTest(BaseVolumeTest): |
| """Base test case class for all Volume Admin API tests.""" |
| |
| credentials = ['primary', 'admin'] |
| |
| @classmethod |
| def setup_clients(cls): |
| super(BaseVolumeAdminTest, cls).setup_clients() |
| |
| cls.admin_volume_qos_client = cls.os_admin.volume_qos_client_latest |
| cls.admin_volume_services_client = \ |
| cls.os_admin.volume_services_client_latest |
| cls.admin_volume_types_client = cls.os_admin.volume_types_client_latest |
| cls.admin_volume_manage_client = ( |
| cls.os_admin.volume_manage_client_latest) |
| cls.admin_volume_client = cls.os_admin.volumes_client_latest |
| cls.admin_groups_client = cls.os_admin.groups_client_latest |
| cls.admin_messages_client = cls.os_admin.volume_messages_client_latest |
| cls.admin_group_snapshots_client = \ |
| cls.os_admin.group_snapshots_client_latest |
| cls.admin_group_types_client = cls.os_admin.group_types_client_latest |
| cls.admin_hosts_client = cls.os_admin.volume_hosts_client_latest |
| cls.admin_snapshot_manage_client = \ |
| cls.os_admin.snapshot_manage_client_latest |
| cls.admin_snapshots_client = cls.os_admin.snapshots_client_latest |
| cls.admin_backups_client = cls.os_admin.backups_client_latest |
| cls.admin_encryption_types_client = \ |
| cls.os_admin.encryption_types_client_latest |
| cls.admin_quota_classes_client = \ |
| cls.os_admin.volume_quota_classes_client_latest |
| cls.admin_quotas_client = cls.os_admin.volume_quotas_client_latest |
| cls.admin_volume_limits_client = ( |
| cls.os_admin.volume_limits_client_latest) |
| cls.admin_capabilities_client = \ |
| cls.os_admin.volume_capabilities_client_latest |
| cls.admin_scheduler_stats_client = \ |
| cls.os_admin.volume_scheduler_stats_client_latest |
| |
| @cleanup_order |
| def create_test_qos_specs(self, name=None, consumer=None, **kwargs): |
| """create a test Qos-Specs.""" |
| name = name or data_utils.rand_name(self.__name__ + '-QoS') |
| consumer = consumer or 'front-end' |
| qos_specs = self.admin_volume_qos_client.create_qos( |
| name=name, consumer=consumer, **kwargs)['qos_specs'] |
| self.cleanup(self.clear_qos_spec, qos_specs['id']) |
| return qos_specs |
| |
| @cleanup_order |
| def create_volume_type(self, name=None, **kwargs): |
| """Create a test volume-type""" |
| name = name or data_utils.rand_name(self.__name__ + '-volume-type') |
| volume_type = self.admin_volume_types_client.create_volume_type( |
| name=name, **kwargs)['volume_type'] |
| self.cleanup(self.clear_volume_type, volume_type['id']) |
| return volume_type |
| |
| def create_encryption_type(self, type_id=None, provider=None, |
| key_size=None, cipher=None, |
| control_location=None): |
| if not type_id: |
| volume_type = self.create_volume_type() |
| type_id = volume_type['id'] |
| self.admin_encryption_types_client.create_encryption_type( |
| type_id, provider=provider, key_size=key_size, cipher=cipher, |
| control_location=control_location) |
| |
| def create_encrypted_volume(self, encryption_provider, key_size=256, |
| cipher='aes-xts-plain64', |
| control_location='front-end'): |
| volume_type = self.create_volume_type() |
| self.create_encryption_type(type_id=volume_type['id'], |
| provider=encryption_provider, |
| key_size=key_size, |
| cipher=cipher, |
| control_location=control_location) |
| return self.create_volume(volume_type=volume_type['name']) |
| |
| def create_group_type(self, name=None, **kwargs): |
| """Create a test group-type""" |
| name = name or data_utils.rand_name( |
| self.__class__.__name__ + '-group-type') |
| group_type = self.admin_group_types_client.create_group_type( |
| name=name, **kwargs)['group_type'] |
| self.addCleanup(self.admin_group_types_client.delete_group_type, |
| group_type['id']) |
| return group_type |
| |
| @cleanup_order |
| def clear_qos_spec(self, qos_id): |
| test_utils.call_and_ignore_notfound_exc( |
| self.admin_volume_qos_client.delete_qos, qos_id) |
| |
| test_utils.call_and_ignore_notfound_exc( |
| self.admin_volume_qos_client.wait_for_resource_deletion, qos_id) |
| |
| @cleanup_order |
| def clear_volume_type(self, vol_type_id): |
| test_utils.call_and_ignore_notfound_exc( |
| self.admin_volume_types_client.delete_volume_type, vol_type_id) |
| |
| test_utils.call_and_ignore_notfound_exc( |
| self.admin_volume_types_client.wait_for_resource_deletion, |
| vol_type_id) |