blob: db537958d059e76ec4bb88af18a47197c8cb920b [file] [log] [blame]
# Copyright 2022 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
15
16
17from tempest import clients
18from tempest import config
19from tempest.lib import auth
20from tempest.lib.common.utils import data_utils
21from tempest.lib.common.utils import test_utils
22
23from manila_tempest_tests.common import waiters
24
25CONF = config.CONF
26
27
class ShareRbacBaseTests(object):
    """Shared helpers for Manila secure-RBAC API tests.

    This class calls ``skipException``, ``addClassResourceCleanup``,
    ``assertRaises`` and ``assertEqual`` and reads ``cls.protocol``,
    ``cls.shares_v2_client`` and ``self.client`` without defining them, so
    it has to be combined with a test class that provides those members.
    """

    identity_version = 'v3'
    # At least one of these protocols must be enabled for the tests to run.
    protocols = ['nfs', 'cifs', 'glusterfs', 'hdfs', 'cephfs', 'maprfs']

    @classmethod
    def skip_checks(cls):
        """Skip the tests unless the configuration supports RBAC testing.

        The tests are skipped when scope enforcement for manila is off, when
        no default share type is configured, or when none of
        ``cls.protocols`` is listed in ``CONF.share.enable_protocols``.
        """
        super(ShareRbacBaseTests, cls).skip_checks()
        if not CONF.enforce_scope.manila:
            raise cls.skipException(
                "Tempest is not configured to enforce_scope for manila, "
                "skipping RBAC tests. To enable these tests set "
                "`tempest.conf [enforce_scope] manila=True`."
            )
        if not CONF.share.default_share_type_name:
            raise cls.skipException("Secure rbac tests require a default "
                                    "share type")
        if not any(p in CONF.share.enable_protocols for p in cls.protocols):
            # ``protocol`` is not defined by this class; fall back to the
            # candidate list so the skip is never masked by an
            # AttributeError or rendered as "None tests are disabled".
            protocol = (getattr(cls, 'protocol', None) or
                        ', '.join(cls.protocols))
            message = "%s tests are disabled" % protocol
            raise cls.skipException(message)

    @classmethod
    def delete_resource(cls, client, **kwargs):
        """Delete a resource and wait until it is gone.

        :param client: client exposing ``delete_<resource>`` and
            ``wait_for_resource_deletion``.
        :param kwargs: ``<prefix>_id=<resource id>``. The text before the
            first underscore of the first keyword selects the delete call;
            the short prefixes ``st`` and ``sn`` map to ``share_type`` and
            ``share_network``. All keywords are forwarded unchanged to
            ``wait_for_resource_deletion``.

        Both calls go through ``call_and_ignore_notfound_exc``.
        """
        key_names = {
            'st': 'share_type',
            'sn': 'share_network',
        }
        key, resource_id = list(kwargs.items())[0]
        key = key.split('_')[0]
        resource_name = key_names.get(key, key)

        del_action = getattr(client, 'delete_{}'.format(resource_name))
        test_utils.call_and_ignore_notfound_exc(
            del_action, resource_id)
        test_utils.call_and_ignore_notfound_exc(
            client.wait_for_resource_deletion, **kwargs)

    @classmethod
    def create_share(cls, client, share_type_id, size=None, name=None,
                     metadata=None):
        """Create a share, wait for it to become available and return it.

        :param client: shares client used for the create, wait and cleanup.
        :param share_type_id: ID of the share type to use.
        :param size: share size; defaults to ``CONF.share.share_size``.
        :param name: share name; a random one is generated when omitted.
        :param metadata: share metadata; defaults to an empty dict.
        :returns: the ``share`` body of the create response.
        """
        kwargs = {
            'share_protocol': cls.protocol,
            'size': size or CONF.share.share_size,
            'name': name or data_utils.rand_name('share'),
            'share_type_id': share_type_id,
            'metadata': metadata or {},
        }
        share = client.create_share(**kwargs)['share']
        # Register the cleanup before waiting so that a share which ends up
        # in an error state (or times out) is still deleted.
        cls.addClassResourceCleanup(
            cls.delete_resource, client,
            share_id=share['id'])
        waiters.wait_for_resource_status(client, share['id'], 'available')
        return share

    @classmethod
    def create_snapshot(cls, client, share_id, name=None):
        """Create a snapshot, wait for it to become available, return it.

        :param client: shares client used for the create, wait and cleanup.
        :param share_id: ID of the share to snapshot.
        :param name: snapshot name; a random one is generated when omitted.
        :returns: the ``snapshot`` body of the create response.
        """
        name = name or data_utils.rand_name('snapshot')
        snapshot = client.create_snapshot(share_id, name=name)['snapshot']
        # Register the cleanup before waiting so that a snapshot which never
        # becomes available is still deleted.
        cls.addClassResourceCleanup(
            cls.delete_resource, client, snapshot_id=snapshot['id'])
        waiters.wait_for_resource_status(
            client, snapshot['id'], 'available', resource_name='snapshot')
        return snapshot

    @classmethod
    def create_share_network(cls, client, name=None):
        """Create a share network, schedule its cleanup and return it.

        :param client: shares client used for the create and cleanup.
        :param name: network name; a random one is generated when omitted.
        :returns: the ``share_network`` body of the create response.
        """
        name = name or data_utils.rand_name('share_network')
        share_network = client.create_share_network(name=name)['share_network']

        cls.addClassResourceCleanup(
            cls.delete_resource, client, sn_id=share_network['id'])
        return share_network

    @classmethod
    def get_share_type(cls):
        """Return the default share type as seen by ``shares_v2_client``."""
        return cls.shares_v2_client.get_default_share_type()['share_type']

    def do_request(self, method, expected_status=200, client=None, **payload):
        """Call a client method and check its outcome.

        :param method: name of the method to call on the client.
        :param expected_status: either the expected HTTP status code or an
            exception class the call is expected to raise.
        :param client: client to call; defaults to ``self.client``.
        :param payload: keyword arguments forwarded to the client method.
        :returns: the client response when a status code is expected,
            ``None`` when an exception is expected.
        """
        if not client:
            client = self.client
        action = getattr(client, method)
        expects_exception = (isinstance(expected_status, type) and
                             issubclass(expected_status, BaseException))
        if expects_exception:
            self.assertRaises(expected_status, action, **payload)
            return None
        response = action(**payload)
        self.assertEqual(response.response.status, expected_status)
        return response

    @classmethod
    def setup_user_client(cls, client, project_id=None):
        """Set up project user with its own client.

        This is useful for testing protection of resources in separate
        projects.
        NOTE(lkuchlan): Tempest creates 'project_member' and 'project_reader'
        dynamic credentials in different projects. So this method is also
        necessary for testing protection of resources in a specific project.

        A new user is always created and granted the 'member' role on the
        project. When ``project_id`` is not given a new project is created
        as well. Everything created here is scheduled for class-level
        cleanup.

        :param client: manager whose ``identity_v3`` clients are used to
            create the user, the project and the role assignment.
        :param project_id: existing project to put the user in (optional).
        :returns: a ``clients.Manager`` authenticated as the new user.
        """

        projects_client = client.identity_v3.ProjectsClient()
        users_client = client.identity_v3.UsersClient()
        roles_client = client.identity_v3.RolesClient()

        user_dict = {
            'name': data_utils.rand_name('user'),
            'password': data_utils.rand_password(),
        }
        user_id = users_client.create_user(
            **user_dict)['user']['id']
        cls.addClassResourceCleanup(users_client.delete_user, user_id)

        if not project_id:
            project_id = projects_client.create_project(
                data_utils.rand_name())['project']['id']
            cls.addClassResourceCleanup(
                projects_client.delete_project,
                project_id)

        member_role_id = roles_client.list_roles(
            name='member')['roles'][0]['id']
        roles_client.create_user_role_on_project(
            project_id, user_id, member_role_id)
        creds = auth.KeystoneV3Credentials(
            user_id=user_id,
            password=user_dict['password'],
            project_id=project_id)
        auth_provider = clients.get_auth_provider(creds)
        creds = auth_provider.fill_credentials()
        return clients.Manager(credentials=creds)