blob: b1eba9bae8fb743e401c447e29bd64e5b5dd0637 [file] [log] [blame]
lkuchlan7b2566a2021-04-14 10:31:31 +03001# Copyright 2022 Red Hat, Inc.
2# All Rights Reserved.
3#
4# Licensed under the Apache License, Version 2.0 (the "License"); you may
5# not use this file except in compliance with the License. You may obtain
6# a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13# License for the specific language governing permissions and limitations
14# under the License.
15
16
17from tempest import clients
18from tempest import config
19from tempest.lib import auth
20from tempest.lib.common.utils import data_utils
21from tempest.lib.common.utils import test_utils
22
23from manila_tempest_tests.common import waiters
24
25CONF = config.CONF
26
27
28class ShareRbacBaseTests(object):
29
30 identity_version = 'v3'
31 protocols = ['nfs', 'cifs', 'glusterfs', 'hdfs', 'cephfs', 'maprfs']
32
33 @classmethod
34 def skip_checks(cls):
35 super(ShareRbacBaseTests, cls).skip_checks()
36 if not CONF.enforce_scope.manila:
37 raise cls.skipException(
38 "Tempest is not configured to enforce_scope for manila, "
39 "skipping RBAC tests. To enable these tests set "
40 "`tempest.conf [enforce_scope] manila=True`."
41 )
42 if not CONF.share.default_share_type_name:
43 raise cls.skipException("Secure rbac tests require a default "
44 "share type")
45 if not any(p in CONF.share.enable_protocols for p in cls.protocols):
46 message = "%s tests are disabled" % cls.protocol
47 raise cls.skipException(message)
48
49 @classmethod
50 def delete_resource(cls, client, **kwargs):
51 key_names = {
52 'st': 'share_type',
53 'sn': 'share_network',
54 }
55 key, resource_id = list(kwargs.items())[0]
56 key = key.split('_')[0]
57 resource_name = key_names[key] if key in key_names else key
58
59 del_action = getattr(client, 'delete_{}'.format(resource_name))
60 test_utils.call_and_ignore_notfound_exc(
61 del_action, resource_id)
62 test_utils.call_and_ignore_notfound_exc(
63 client.wait_for_resource_deletion, **kwargs)
64
65 @classmethod
66 def create_share(cls, client, share_type_id, size=None, name=None,
67 metadata=None):
68 kwargs = {}
69 name = name or data_utils.rand_name('share')
70 metadata = metadata or {}
71 kwargs.update({
72 'share_protocol': cls.protocol,
73 'size': size or CONF.share.share_size,
74 'name': name or data_utils.rand_name('share'),
75 'share_type_id': share_type_id,
76 'metadata': metadata,
77 })
78 share = client.create_share(**kwargs)['share']
79 waiters.wait_for_resource_status(client, share['id'], 'available')
80 cls.addClassResourceCleanup(
81 cls.delete_resource, client,
82 share_id=share['id'])
83 return share
84
85 @classmethod
lkuchlan5bd5f542022-11-02 12:22:56 +020086 def create_snapshot(cls, client, share_id, name=None, metadata=None):
lkuchlan7b2566a2021-04-14 10:31:31 +030087 name = name or data_utils.rand_name('snapshot')
lkuchlan5bd5f542022-11-02 12:22:56 +020088 snapshot = client.create_snapshot(
89 share_id, name=name, metadata=metadata)['snapshot']
lkuchlan7b2566a2021-04-14 10:31:31 +030090 waiters.wait_for_resource_status(
91 client, snapshot['id'], 'available', resource_name='snapshot')
92 cls.addClassResourceCleanup(
93 cls.delete_resource, client, snapshot_id=snapshot['id'])
94 return snapshot
95
96 @classmethod
97 def create_share_network(cls, client, name=None):
98 name = name or data_utils.rand_name('share_network')
99 share_network = client.create_share_network(name=name)['share_network']
100
101 cls.addClassResourceCleanup(
102 cls.delete_resource, client, sn_id=share_network['id'])
103 return share_network
104
105 @classmethod
lkuchlan4db10d42022-11-22 12:05:47 +0200106 def create_share_type(cls):
107 name = data_utils.rand_name('share-type')
108 extra_specs = {
109 'driver_handles_share_servers': CONF.share.multitenancy_enabled,
110 }
111 share_type = cls.admin_shares_v2_client.create_share_type(
112 name=name, extra_specs=extra_specs)['share_type']
113 cls.addClassResourceCleanup(
114 cls.delete_resource, cls.admin_shares_v2_client,
115 st_id=share_type['id'])
116 return share_type
117
118 @classmethod
lkuchlan7b2566a2021-04-14 10:31:31 +0300119 def get_share_type(cls):
120 return cls.shares_v2_client.get_default_share_type()['share_type']
121
122 def do_request(self, method, expected_status=200, client=None, **payload):
123 if not client:
124 client = self.client
125 if isinstance(expected_status, type(Exception)):
126 self.assertRaises(expected_status,
127 getattr(client, method),
128 **payload)
129 else:
130 response = getattr(client, method)(**payload)
131 self.assertEqual(response.response.status, expected_status)
132 return response
133
134 @classmethod
135 def setup_user_client(cls, client, project_id=None):
136 """Set up project user with its own client.
137
138 This is useful for testing protection of resources in separate
139 projects.
140 NOTE(lkuchlan): Tempest creates 'project_member' and 'project_reader'
141 dynamic credentials in different projects. So this method is also
142 necessary for testing protection of resources in a specific project.
143
144 Returns a client object and the user's ID.
145 """
146
147 projects_client = client.identity_v3.ProjectsClient()
148 users_client = client.identity_v3.UsersClient()
149 roles_client = client.identity_v3.RolesClient()
150
151 user_dict = {
152 'name': data_utils.rand_name('user'),
153 'password': data_utils.rand_password(),
154 }
155 user_id = users_client.create_user(
156 **user_dict)['user']['id']
157 cls.addClassResourceCleanup(users_client.delete_user, user_id)
158
159 if not project_id:
160 project_id = projects_client.create_project(
161 data_utils.rand_name())['project']['id']
162 cls.addClassResourceCleanup(
163 projects_client.delete_project,
164 project_id)
165
166 member_role_id = roles_client.list_roles(
167 name='member')['roles'][0]['id']
168 roles_client.create_user_role_on_project(
169 project_id, user_id, member_role_id)
170 creds = auth.KeystoneV3Credentials(
171 user_id=user_id,
172 password=user_dict['password'],
173 project_id=project_id)
174 auth_provider = clients.get_auth_provider(creds)
175 creds = auth_provider.fill_credentials()
176 client = clients.Manager(credentials=creds)
177 return client