blob: 675d0bc92ba5cdf99c9e3447c4bff98515c2965e [file] [log] [blame]
# Copyright 2017 NEC Corporation.
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.
15
16from tempest.api.volume import api_microversion_fixture
17from tempest.common import compute
18from tempest.common import waiters
19from tempest import config
20from tempest.lib.common import api_version_utils
21from tempest.lib.common.utils import data_utils
22from tempest.lib.common.utils import test_utils
ghanshyamdae4b6d2018-01-12 07:49:15 +000023from tempest import test
24
25CONF = config.CONF
26
27
class BaseVolumeTest(api_version_utils.BaseMicroversionTest,
                     test.BaseTestCase):
    """Shared base class for the Cinder (volume) API test cases.

    Wires up the primary user's backup, volume and snapshot clients,
    selects the volume microversion to request, and offers helpers that
    create volumes, snapshots, backups and servers with their cleanup
    already registered.
    """

    credentials = ['primary']

    @classmethod
    def skip_checks(cls):
        """Skip when Cinder is unavailable or microversions do not fit."""
        super(BaseVolumeTest, cls).skip_checks()

        if not CONF.service_available.cinder:
            raise cls.skipException(
                "%s skipped as Cinder is not available" % cls.__name__)

        # Compare the range declared by the test class with the range
        # configured for the volume service.
        test_min = cls.min_microversion
        test_max = cls.max_microversion
        api_version_utils.check_skip_with_microversion(
            test_min, test_max,
            CONF.volume.min_microversion, CONF.volume.max_microversion)

    @classmethod
    def setup_clients(cls):
        """Expose the primary user's "latest" volume service clients."""
        super(BaseVolumeTest, cls).setup_clients()
        primary = cls.os_primary
        cls.backups_client = primary.backups_client_latest
        cls.volumes_client = primary.volumes_client_latest
        cls.snapshots_client = primary.snapshots_client_latest

    @classmethod
    def setup_credentials(cls):
        """Call set_network_resources() before credentials are set up."""
        cls.set_network_resources()
        super(BaseVolumeTest, cls).setup_credentials()

    def setUp(self):
        """Install the microversion fixture for the test about to run."""
        super(BaseVolumeTest, self).setUp()
        microversion_fixture = (
            api_microversion_fixture.APIMicroversionFixture(
                self.request_microversion))
        self.useFixture(microversion_fixture)

    @classmethod
    def resource_setup(cls):
        """Store the microversion to request on the class."""
        super(BaseVolumeTest, cls).resource_setup()
        selected = api_version_utils.select_request_microversion(
            cls.min_microversion, CONF.volume.min_microversion)
        cls.request_microversion = selected

    @classmethod
    def create_volume(cls, wait_until='available', **kwargs):
        """Create a test volume, register its cleanup and return it.

        :param wait_until: volume status to wait for before returning.
        :param kwargs: passed through to ``volumes_client.create_volume``;
            ``size`` and ``name`` are filled in when missing.
        :returns: the ``volume`` entry of the create response.
        """
        kwargs.setdefault('size', CONF.volume.volume_size)

        if 'imageRef' in kwargs:
            # The volume must be at least as large as the image's min_disk.
            image_info = cls.os_primary.image_client_v2.show_image(
                kwargs['imageRef'])
            kwargs['size'] = max(kwargs['size'], image_info['min_disk'])

        if 'name' not in kwargs:
            kwargs['name'] = data_utils.rand_name(cls.__name__ + '-Volume')

        client = cls.volumes_client
        volume = client.create_volume(**kwargs)['volume']
        volume_id = volume['id']
        # NOTE(review): the wait is registered before the delete;
        # presumably cleanups run in reverse registration order so the
        # delete is issued first -- confirm against the base class.
        cls.addClassResourceCleanup(
            client.wait_for_resource_deletion, volume_id)
        cls.addClassResourceCleanup(
            test_utils.call_and_ignore_notfound_exc,
            client.delete_volume, volume_id)
        waiters.wait_for_volume_resource_status(
            client, volume_id, wait_until)
        return volume

    @classmethod
    def create_snapshot(cls, volume_id=1, **kwargs):
        """Create a test snapshot, wait for 'available' and return it.

        :param volume_id: ID of the volume to snapshot.
            NOTE(review): the default of 1 looks like a placeholder;
            callers presumably always pass a real volume ID -- confirm.
        :param kwargs: passed through to
            ``snapshots_client.create_snapshot``; ``name`` is generated
            when missing.
        :returns: the ``snapshot`` entry of the create response.
        """
        if 'name' not in kwargs:
            kwargs['name'] = data_utils.rand_name(
                cls.__name__ + '-Snapshot')

        client = cls.snapshots_client
        snapshot = client.create_snapshot(
            volume_id=volume_id, **kwargs)['snapshot']
        snapshot_id = snapshot['id']
        cls.addClassResourceCleanup(
            client.wait_for_resource_deletion, snapshot_id)
        cls.addClassResourceCleanup(
            test_utils.call_and_ignore_notfound_exc,
            client.delete_snapshot, snapshot_id)
        waiters.wait_for_volume_resource_status(
            client, snapshot_id, 'available')
        return snapshot

    def create_backup(self, volume_id, backup_client=None, **kwargs):
        """Create a test backup, wait for 'available' and return it.

        :param volume_id: ID of the volume to back up.
        :param backup_client: client to use; ``self.backups_client`` when
            None.
        :param kwargs: passed through to ``create_backup``; ``name`` is
            generated when missing.
        :returns: the ``backup`` entry of the create response.
        """
        if backup_client is None:
            backup_client = self.backups_client
        if 'name' not in kwargs:
            kwargs['name'] = data_utils.rand_name(
                self.__class__.__name__ + '-Backup')

        backup = backup_client.create_backup(
            volume_id=volume_id, **kwargs)['backup']
        backup_id = backup['id']
        # Per-test cleanup (addCleanup), unlike the class-level cleanup
        # used by the volume and snapshot helpers.
        self.addCleanup(backup_client.wait_for_resource_deletion, backup_id)
        self.addCleanup(test_utils.call_and_ignore_notfound_exc,
                        backup_client.delete_backup, backup_id)
        waiters.wait_for_volume_resource_status(
            backup_client, backup_id, 'available')
        return backup

    def create_server(self, wait_until='ACTIVE', **kwargs):
        """Create a test server, register its cleanup and return it.

        :param wait_until: forwarded to ``compute.create_test_server``.
        :param kwargs: forwarded to ``compute.create_test_server``;
            ``name`` is taken out of kwargs, or generated when missing.
        :returns: the first element of the ``create_test_server`` result.
        """
        default_name = data_utils.rand_name(
            self.__class__.__name__ + '-instance')
        server_name = kwargs.pop('name', default_name)

        network = self.get_tenant_network()
        server, _ = compute.create_test_server(
            self.os_primary,
            tenant_network=network,
            name=server_name,
            wait_until=wait_until,
            **kwargs)
        server_id = server['id']

        self.addCleanup(test_utils.call_and_ignore_notfound_exc,
                        waiters.wait_for_server_termination,
                        self.os_primary.servers_client, server_id)
        self.addCleanup(test_utils.call_and_ignore_notfound_exc,
                        self.os_primary.servers_client.delete_server,
                        server_id)
        return server
Miriam Yumifa8791e2018-09-04 14:42:14 -0300155
156
class BaseVolumeAdminTest(BaseVolumeTest):
    """Base class for Volume Admin API tests.

    Requests admin credentials in addition to the primary ones.
    """

    credentials = ['primary', 'admin']

    @classmethod
    def setup_clients(cls):
        """Add the admin volume-types client to the inherited clients."""
        super(BaseVolumeAdminTest, cls).setup_clients()

        admin = cls.os_admin
        cls.admin_volume_types_client = admin.volume_types_client_latest
166 cls.admin_volume_types_client = cls.os_admin.volume_types_client_latest