| # Copyright 2022 Red Hat, Inc. |
| # All Rights Reserved. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may |
| # not use this file except in compliance with the License. You may obtain |
| # a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations |
| # under the License. |
| |
| |
| from tempest import clients |
| from tempest import config |
| from tempest.lib import auth |
| from tempest.lib.common.utils import data_utils |
| from tempest.lib.common.utils import test_utils |
| |
| from manila_tempest_tests.common import waiters |
| |
| CONF = config.CONF |
| |
| |
| class ShareRbacBaseTests(object): |
| |
| identity_version = 'v3' |
| protocols = ['nfs', 'cifs', 'glusterfs', 'hdfs', 'cephfs', 'maprfs'] |
| |
| @classmethod |
| def skip_checks(cls): |
| super(ShareRbacBaseTests, cls).skip_checks() |
| if not CONF.enforce_scope.manila: |
| raise cls.skipException( |
| "Tempest is not configured to enforce_scope for manila, " |
| "skipping RBAC tests. To enable these tests set " |
| "`tempest.conf [enforce_scope] manila=True`." |
| ) |
| if not CONF.share.default_share_type_name: |
| raise cls.skipException("Secure rbac tests require a default " |
| "share type") |
| if not any(p in CONF.share.enable_protocols for p in cls.protocols): |
| message = "%s tests are disabled" % cls.protocol |
| raise cls.skipException(message) |
| |
| @classmethod |
| def delete_resource(cls, client, **kwargs): |
| key_names = { |
| 'st': 'share_type', |
| 'sn': 'share_network', |
| } |
| key, resource_id = list(kwargs.items())[0] |
| key = key.rsplit('_', 1)[0] |
| resource_name = key_names[key] if key in key_names else key |
| |
| del_action = getattr(client, 'delete_{}'.format(resource_name)) |
| test_utils.call_and_ignore_notfound_exc( |
| del_action, resource_id) |
| test_utils.call_and_ignore_notfound_exc( |
| client.wait_for_resource_deletion, **kwargs) |
| |
| @classmethod |
| def create_share(cls, client, share_type_id, size=None, name=None, |
| metadata=None): |
| kwargs = {} |
| name = name or data_utils.rand_name('share') |
| metadata = metadata or {} |
| kwargs.update({ |
| 'share_protocol': cls.protocol, |
| 'size': size or CONF.share.share_size, |
| 'name': name or data_utils.rand_name('share'), |
| 'share_type_id': share_type_id, |
| 'metadata': metadata, |
| }) |
| share = client.create_share(**kwargs)['share'] |
| waiters.wait_for_resource_status(client, share['id'], 'available') |
| cls.addClassResourceCleanup( |
| cls.delete_resource, client, |
| share_id=share['id']) |
| return share |
| |
| @classmethod |
| def create_snapshot(cls, client, share_id, name=None, metadata=None): |
| name = name or data_utils.rand_name('snapshot') |
| snapshot = client.create_snapshot( |
| share_id, name=name, metadata=metadata)['snapshot'] |
| waiters.wait_for_resource_status( |
| client, snapshot['id'], 'available', resource_name='snapshot') |
| cls.addClassResourceCleanup( |
| cls.delete_resource, client, snapshot_id=snapshot['id']) |
| return snapshot |
| |
| @classmethod |
| def create_share_network(cls, client, name=None): |
| name = name or data_utils.rand_name('share_network') |
| share_network = client.create_share_network(name=name)['share_network'] |
| |
| cls.addClassResourceCleanup( |
| cls.delete_resource, client, sn_id=share_network['id']) |
| return share_network |
| |
| @classmethod |
| def create_share_type(cls): |
| name = data_utils.rand_name('share-type') |
| extra_specs = { |
| 'driver_handles_share_servers': CONF.share.multitenancy_enabled, |
| } |
| share_type = cls.admin_shares_v2_client.create_share_type( |
| name=name, extra_specs=extra_specs)['share_type'] |
| cls.addClassResourceCleanup( |
| cls.delete_resource, cls.admin_shares_v2_client, |
| st_id=share_type['id']) |
| return share_type |
| |
| @classmethod |
| def create_share_group_type(cls, share_types, is_public=True, |
| group_specs=None): |
| name = data_utils.rand_name('share-group-type') |
| share_group_type = ( |
| cls.admin_shares_v2_client.create_share_group_type( |
| name=name, share_types=share_types, is_public=is_public, |
| group_specs=group_specs))['share_group_type'] |
| cls.addClassResourceCleanup( |
| cls.delete_resource, cls.admin_shares_v2_client, |
| share_group_type_id=share_group_type['id']) |
| return share_group_type |
| |
| @classmethod |
| def create_share_group(cls, client, share_group_type_id, share_type_ids): |
| name = data_utils.rand_name('share-group') |
| share_group = client.create_share_group( |
| name=name, share_group_type_id=share_group_type_id, |
| share_type_ids=share_type_ids)['share_group'] |
| waiters.wait_for_resource_status( |
| client, share_group['id'], 'available', |
| resource_name='share_group') |
| cls.addClassResourceCleanup( |
| cls.delete_resource, client, |
| share_group_id=share_group['id']) |
| return share_group |
| |
| @classmethod |
| def get_share_type(cls): |
| return cls.shares_v2_client.get_default_share_type()['share_type'] |
| |
| def do_request(self, method, expected_status=200, client=None, **payload): |
| if not client: |
| client = self.client |
| if isinstance(expected_status, type(Exception)): |
| self.assertRaises(expected_status, |
| getattr(client, method), |
| **payload) |
| else: |
| response = getattr(client, method)(**payload) |
| self.assertEqual(response.response.status, expected_status) |
| return response |
| |
| @classmethod |
| def setup_user_client(cls, client, project_id=None): |
| """Set up project user with its own client. |
| |
| This is useful for testing protection of resources in separate |
| projects. |
| NOTE(lkuchlan): Tempest creates 'project_member' and 'project_reader' |
| dynamic credentials in different projects. So this method is also |
| necessary for testing protection of resources in a specific project. |
| |
| Returns a client object and the user's ID. |
| """ |
| |
| projects_client = client.identity_v3.ProjectsClient() |
| users_client = client.identity_v3.UsersClient() |
| roles_client = client.identity_v3.RolesClient() |
| |
| user_dict = { |
| 'name': data_utils.rand_name('user'), |
| 'password': data_utils.rand_password(), |
| } |
| user_id = users_client.create_user( |
| **user_dict)['user']['id'] |
| cls.addClassResourceCleanup(users_client.delete_user, user_id) |
| |
| if not project_id: |
| project_id = projects_client.create_project( |
| data_utils.rand_name())['project']['id'] |
| cls.addClassResourceCleanup( |
| projects_client.delete_project, |
| project_id) |
| |
| member_role_id = roles_client.list_roles( |
| name='member')['roles'][0]['id'] |
| roles_client.create_user_role_on_project( |
| project_id, user_id, member_role_id) |
| creds = auth.KeystoneV3Credentials( |
| user_id=user_id, |
| password=user_dict['password'], |
| project_id=project_id) |
| auth_provider = clients.get_auth_provider(creds) |
| creds = auth_provider.fill_credentials() |
| client = clients.Manager(credentials=creds) |
| return client |