blob: 5aaf35e254a44b12eff4f3d44c342998bd16b4ee [file] [log] [blame]
# Copyright 2022 Red Hat, Inc.
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.
15
16
from tempest import clients
from tempest import config
from tempest.lib import auth
from tempest.lib.common.utils import data_utils
from tempest.lib.common.utils import test_utils

from manila_tempest_tests.common import waiters

# Module-wide handle on the tempest configuration object.
CONF = config.CONF
26
27
class ShareRbacBaseTests(object):
    """Shared helpers for the manila secure-RBAC test classes.

    This class does not define a test case base of its own.  Its methods
    call ``skip_checks``, ``skipException``, ``addClassResourceCleanup``,
    ``assertRaises`` and ``assertEqual`` and read ``client``,
    ``shares_v2_client`` and ``admin_shares_v2_client``, all of which have
    to be provided by the class it is combined with.  ``create_share``
    additionally reads ``cls.protocol``.
    """

    identity_version = 'v3'
    # Candidate share protocols checked by skip_checks(): at least one of
    # them has to appear in CONF.share.enable_protocols.
    protocols = ['nfs', 'cifs', 'glusterfs', 'hdfs', 'cephfs', 'maprfs']

    @classmethod
    def skip_checks(cls):
        """Skip the class when the configuration cannot run these tests.

        Raises ``cls.skipException`` when ``[enforce_scope] manila`` is not
        set, when no default share type name is configured, or when none of
        ``cls.protocols`` is listed in ``CONF.share.enable_protocols``.
        """
        super(ShareRbacBaseTests, cls).skip_checks()
        if not CONF.enforce_scope.manila:
            raise cls.skipException(
                "Tempest is not configured to enforce_scope for manila, "
                "skipping RBAC tests. To enable these tests set "
                "`tempest.conf [enforce_scope] manila=True`."
            )
        if not CONF.share.default_share_type_name:
            raise cls.skipException("Secure rbac tests require a default "
                                    "share type")
        if not any(p in CONF.share.enable_protocols for p in cls.protocols):
            # ``protocol`` is not defined on this class, only ``protocols``.
            # Fall back to the candidate list so that the skip is not
            # replaced by an AttributeError when a subclass omits it.
            disabled = getattr(cls, 'protocol', None) or ', '.join(
                cls.protocols)
            message = "%s tests are disabled" % disabled
            raise cls.skipException(message)

    @classmethod
    def delete_resource(cls, client, **kwargs):
        """Delete a single resource and wait for the deletion to finish.

        The resource is identified by one ``<kind>_id=<id>`` keyword
        argument (for example ``share_id``, ``snapshot_id``, ``st_id`` or
        ``sn_id``).  The text before the first underscore selects the
        ``client.delete_<kind>`` method; the short forms ``st`` and ``sn``
        are expanded to ``share_type`` and ``share_network``.  Only the
        first keyword argument is used to pick the delete method, while all
        keyword arguments are forwarded to
        ``client.wait_for_resource_deletion``.

        Both calls go through ``test_utils.call_and_ignore_notfound_exc``.
        """
        key_names = {
            'st': 'share_type',
            'sn': 'share_network',
        }
        key, resource_id = list(kwargs.items())[0]
        prefix = key.split('_')[0]
        resource_name = key_names.get(prefix, prefix)

        del_action = getattr(client, 'delete_{}'.format(resource_name))
        test_utils.call_and_ignore_notfound_exc(
            del_action, resource_id)
        test_utils.call_and_ignore_notfound_exc(
            client.wait_for_resource_deletion, **kwargs)

    @classmethod
    def create_share(cls, client, share_type_id, size=None, name=None,
                     metadata=None):
        """Create a share, wait for it to become available and return it.

        :param client: shares client used to create (and later delete) the
            share.
        :param share_type_id: ID of the share type to create the share with.
        :param size: share size; defaults to ``CONF.share.share_size``.
        :param name: share name; a random name is generated when omitted.
        :param metadata: metadata dict; defaults to an empty dict.
        :returns: the ``share`` body returned by ``client.create_share``.
        """
        share = client.create_share(
            share_protocol=cls.protocol,
            size=size or CONF.share.share_size,
            name=name or data_utils.rand_name('share'),
            share_type_id=share_type_id,
            metadata=metadata or {},
        )['share']
        # Register the cleanup before waiting: if the share never reaches
        # 'available' the waiter raises, and the share must still be
        # deleted at class teardown.
        cls.addClassResourceCleanup(
            cls.delete_resource, client,
            share_id=share['id'])
        waiters.wait_for_resource_status(client, share['id'], 'available')
        return share

    @classmethod
    def create_snapshot(cls, client, share_id, name=None):
        """Create a snapshot of a share and wait until it is available.

        :param client: shares client used to create (and later delete) the
            snapshot.
        :param share_id: ID of the share to snapshot.
        :param name: snapshot name; a random name is generated when omitted.
        :returns: the ``snapshot`` body returned by
            ``client.create_snapshot``.
        """
        name = name or data_utils.rand_name('snapshot')
        snapshot = client.create_snapshot(share_id, name=name)['snapshot']
        # Cleanup is registered before the wait so that a snapshot which
        # fails to become available is not leaked.
        cls.addClassResourceCleanup(
            cls.delete_resource, client, snapshot_id=snapshot['id'])
        waiters.wait_for_resource_status(
            client, snapshot['id'], 'available', resource_name='snapshot')
        return snapshot

    @classmethod
    def create_share_network(cls, client, name=None):
        """Create a share network and schedule its deletion.

        :param client: shares client used to create (and later delete) the
            share network.
        :param name: share network name; a random name is generated when
            omitted.
        :returns: the ``share_network`` body returned by the client.
        """
        name = name or data_utils.rand_name('share_network')
        share_network = client.create_share_network(name=name)['share_network']

        cls.addClassResourceCleanup(
            cls.delete_resource, client, sn_id=share_network['id'])
        return share_network

    @classmethod
    def create_share_type(cls):
        """Create a randomly named share type with the admin client.

        ``driver_handles_share_servers`` is taken from
        ``CONF.share.multitenancy_enabled``.  Deletion of the share type is
        scheduled as a class resource cleanup.

        :returns: the ``share_type`` body returned by the client.
        """
        name = data_utils.rand_name('share-type')
        extra_specs = {
            'driver_handles_share_servers': CONF.share.multitenancy_enabled,
        }
        share_type = cls.admin_shares_v2_client.create_share_type(
            name=name, extra_specs=extra_specs)['share_type']
        cls.addClassResourceCleanup(
            cls.delete_resource, cls.admin_shares_v2_client,
            st_id=share_type['id'])
        return share_type

    @classmethod
    def get_share_type(cls):
        """Return the default share type as seen by ``shares_v2_client``."""
        return cls.shares_v2_client.get_default_share_type()['share_type']

    def do_request(self, method, expected_status=200, client=None, **payload):
        """Call a client method and verify its outcome.

        :param method: name of the method to call on the client.
        :param expected_status: either the expected response status (the
            default is 200) or a class, in which case the call is expected
            to raise it.
        :param client: client to call; defaults to ``self.client``.
        :param payload: keyword arguments forwarded to the client method.
        :returns: the client's response when a status was checked, ``None``
            when an exception was expected.
        """
        client = client or self.client
        call = getattr(client, method)
        # Any class (as opposed to a status code) means "this call must
        # raise that class"; assertRaises validates it is an exception type.
        if isinstance(expected_status, type):
            self.assertRaises(expected_status, call, **payload)
            return None
        response = call(**payload)
        self.assertEqual(response.response.status, expected_status)
        return response

    @classmethod
    def setup_user_client(cls, client, project_id=None):
        """Set up project user with its own client.

        This is useful for testing protection of resources in separate
        projects.
        NOTE(lkuchlan): Tempest creates 'project_member' and 'project_reader'
        dynamic credentials in different projects. So this method is also
        necessary for testing protection of resources in a specific project.

        A new user is always created and granted the ``member`` role on the
        project.  When ``project_id`` is not given, a new project is created
        as well.  Everything created here is deleted through class resource
        cleanups.

        :param client: client manager whose ``identity_v3`` clients are used
            to create the user, the project and the role assignment.
        :param project_id: ID of an existing project to add the user to.
        :returns: a ``clients.Manager`` authenticated as the new user.
        """

        projects_client = client.identity_v3.ProjectsClient()
        users_client = client.identity_v3.UsersClient()
        roles_client = client.identity_v3.RolesClient()

        user_dict = {
            'name': data_utils.rand_name('user'),
            'password': data_utils.rand_password(),
        }
        user_id = users_client.create_user(
            **user_dict)['user']['id']
        cls.addClassResourceCleanup(users_client.delete_user, user_id)

        if not project_id:
            project_id = projects_client.create_project(
                data_utils.rand_name())['project']['id']
            cls.addClassResourceCleanup(
                projects_client.delete_project,
                project_id)

        member_role_id = roles_client.list_roles(
            name='member')['roles'][0]['id']
        roles_client.create_user_role_on_project(
            project_id, user_id, member_role_id)

        creds = auth.KeystoneV3Credentials(
            user_id=user_id,
            password=user_dict['password'],
            project_id=project_id)
        auth_provider = clients.get_auth_provider(creds)
        creds = auth_provider.fill_credentials()
        user_manager = clients.Manager(credentials=creds)
        return user_manager