blob: 3a60478bedce2e437fea01f101b411dece0bf127 [file] [log] [blame]
Andrea Frittolif4510a12017-03-07 19:17:11 +00001# Copyright 2012 OpenStack Foundation
2# Copyright 2013 IBM Corp.
3# All Rights Reserved.
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may
6# not use this file except in compliance with the License. You may obtain
7# a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations
15# under the License.
16
Andrea Frittolif4510a12017-03-07 19:17:11 +000017from oslo_log import log
Goutham Pacha Ravi37ee6772019-10-18 12:53:22 -070018from oslo_utils import uuidutils
Andrea Frittolif4510a12017-03-07 19:17:11 +000019from tempest.common import image as common_image
Andrea Frittolif4510a12017-03-07 19:17:11 +000020from tempest import config
Ken'ichi Ohmichi02d1f242017-03-12 18:56:27 -070021from tempest.lib.common.utils import data_utils
Andrea Frittolif4510a12017-03-07 19:17:11 +000022from tempest.lib.common.utils import test_utils
23from tempest.lib import exceptions as lib_exc
Roman Popelka290ef292022-02-28 10:41:04 +010024from tempest.scenario import manager
Andrea Frittolif4510a12017-03-07 19:17:11 +000025
# Tempest configuration object read by the helpers below (service
# availability, image feature flags, scenario image settings, ...).
CONF = config.CONF

# Module-level logger, named after this module.
LOG = log.getLogger(__name__)
29
30
class ScenarioTest(manager.NetworkScenarioTest):
    """Common base for scenario tests, built on tempest's own clients."""

    credentials = ['primary']

    @classmethod
    def setup_clients(cls):
        """Expose the primary credentials' service clients on the class.

        The image client is only bound when glance is marked available;
        the v1 client is chosen when ``api_v1`` is enabled, otherwise the
        v2 client when ``api_v2`` is enabled.

        :raises lib_exc.InvalidConfiguration: glance is available but
            neither image API version is enabled.
        """
        super(ScenarioTest, cls).setup_clients()
        primary = cls.os_primary

        cls.flavors_client = primary.flavors_client
        cls.compute_floating_ips_client = primary.compute_floating_ips_client

        if CONF.service_available.glance:
            image_features = CONF.image_feature_enabled
            # Glance v1 wins over v2 when both are enabled.
            if image_features.api_v1:
                cls.image_client = primary.image_client
            elif image_features.api_v2:
                cls.image_client = primary.image_client_v2
            else:
                raise lib_exc.InvalidConfiguration(
                    'Either api_v1 or api_v2 must be True in '
                    '[image-feature-enabled].')

        # (attribute set on the class, attribute read from the primary
        # manager), bound in this exact order.
        client_bindings = (
            # Compute image client
            ('compute_images_client', 'compute_images_client'),
            ('keypairs_client', 'keypairs_client'),
            # Nova security groups clients
            ('compute_security_groups_client',
             'compute_security_groups_client'),
            ('compute_security_group_rules_client',
             'compute_security_group_rules_client'),
            ('servers_client', 'servers_client'),
            ('interface_client', 'interfaces_client'),
            # Neutron network clients
            ('networks_client', 'networks_client'),
            ('ports_client', 'ports_client'),
            ('routers_client', 'routers_client'),
            ('subnets_client', 'subnets_client'),
            ('floating_ips_client', 'floating_ips_client'),
            ('security_groups_client', 'security_groups_client'),
            ('security_group_rules_client', 'security_group_rules_client'),
        )
        for class_attr, manager_attr in client_bindings:
            setattr(cls, class_attr, getattr(primary, manager_attr))

    # ## Test functions library
    #
    # The create_[resource] helpers return only the response body; the
    # resp part is discarded because scenario tests do not use it.

    def _image_create(self, name, fmt, path,
                      disk_format=None, properties=None):
        """Register an image, upload its data and return the image id.

        :param name: prefix for the randomized image name.
        :param fmt: container format; also used as the disk format when
            ``disk_format`` is not given.
        :param path: path of the local file holding the image data.
        :param disk_format: optional disk format overriding ``fmt``.
        :param properties: optional dict of extra image properties.
        :returns: id of the created image.

        Deletion of the image is registered as a cleanup, and the freshly
        created image is asserted to be in ``queued`` status before the
        data is uploaded.
        """
        extra_props = {} if properties is None else properties
        image_meta = dict(
            name=data_utils.rand_name('%s-' % name),
            container_format=fmt,
            disk_format=disk_format or fmt,
        )

        use_v1 = CONF.image_feature_enabled.api_v1
        if use_v1:
            image_meta.update(is_public='False', properties=extra_props)
            create_kwargs = {
                'headers': common_image.image_meta_to_headers(**image_meta)}
        else:
            image_meta['visibility'] = 'private'
            # Additional properties are flattened out in the v2 API.
            image_meta.update(extra_props)
            create_kwargs = image_meta

        body = self.image_client.create_image(**create_kwargs)
        # Some responses wrap the image under an 'image' key, others are
        # the image itself.
        image = body
        if 'image' in body:
            image = body['image']
        image_id = image['id']

        self.addCleanup(self.image_client.delete_image, image_id)
        self.assertEqual("queued", image['status'])

        with open(path, 'rb') as image_file:
            if use_v1:
                self.image_client.update_image(image_id, data=image_file)
            else:
                self.image_client.store_image_file(image_id, image_file)
        return image_id

    def glance_image_create(self):
        """Create the scenario image described by ``[scenario]`` options.

        :returns: id of the created image.
        """
        scenario_conf = CONF.scenario
        image_path = scenario_conf.img_file
        container_format = scenario_conf.img_container_format
        disk_format = scenario_conf.img_disk_format
        image_properties = scenario_conf.img_properties

        LOG.debug("paths: img: %s, container_format: %s, disk_format: %s, "
                  "properties: %s",
                  image_path, container_format, disk_format,
                  image_properties)
        image_id = self._image_create(
            'scenario-img', container_format, image_path,
            disk_format=disk_format, properties=image_properties)
        LOG.debug("image:%s", image_id)

        return image_id

    def _log_net_info(self, exc):
        """Emit a network debug marker unless ``exc`` is an SSH timeout."""
        # network debug is called as part of ssh init
        if isinstance(exc, lib_exc.SSHTimeout):
            return
        LOG.debug('Network information on a devstack host')
129
Andrea Frittolif4510a12017-03-07 19:17:11 +0000130
class NetworkScenarioTest(ScenarioTest):
    """Base class for network scenario tests.

    This class provides helpers for network scenario tests, using the
    neutron API. Helpers from ancestors which use the nova network API are
    overridden with the neutron API.

    This class also enforces using Neutron instead of novanetwork:
    subclassed tests will be skipped if Neutron is not enabled.
    """

    credentials = ['primary', 'admin']

    @classmethod
    def skip_checks(cls):
        """Skip the test class when Neutron is not marked as available.

        :raises cls.skipException: ``CONF.service_available.neutron`` is
            false.
        """
        super(NetworkScenarioTest, cls).skip_checks()
        if not CONF.service_available.neutron:
            raise cls.skipException('Neutron not available')

    def _get_network_by_name_or_id(self, identifier):
        """Look a network up with the admin client, by UUID or by name.

        :param identifier: network UUID or network name.
        :returns: the network dict. For a name lookup matching several
            networks, the first one listed is returned.

        The test fails (assertion) when no network has the given name.
        """
        networks_client = self.os_admin.networks_client

        if uuidutils.is_uuid_like(identifier):
            return networks_client.show_network(identifier)['network']

        networks = networks_client.list_networks(
            name=identifier)['networks']
        self.assertNotEqual(len(networks), 0,
                            "Unable to get network by name: %s" % identifier)
        return networks[0]

    def get_networks(self):
        """Return the networks listed by the admin networks client."""
        return self.os_admin.networks_client.list_networks()['networks']

    def create_floating_ip(self, thing, external_network_id=None,
                           port_id=None, ip_addr=None, client=None):
        """Create a floating IP and associate it with a port on Neutron.

        :param thing: resource dict; its ``tenant_id`` becomes the owner
            of the floating IP. When ``port_id`` is not given it is also
            handed to ``get_server_port_id_and_ip4`` (a helper not defined
            in this part of the file, presumably inherited) to find the
            port and fixed IPv4 address to associate.
        :param external_network_id: external network to allocate from;
            defaults to ``CONF.network.public_network_id``.
        :param port_id: port to associate. When given, no fixed IP address
            is requested and ``ip_addr`` is not used.
        :param ip_addr: forwarded to ``get_server_port_id_and_ip4`` when
            the port has to be looked up.
        :param client: floating IPs client; defaults to
            ``self.floating_ips_client``.
        :returns: the created floating IP dict.

        Deletion of the floating IP is registered as a cleanup through
        ``test_utils.call_and_ignore_notfound_exc``.
        """
        if not external_network_id:
            external_network_id = CONF.network.public_network_id
        if not client:
            client = self.floating_ips_client
        if not port_id:
            port_id, ip4 = self.get_server_port_id_and_ip4(thing,
                                                           ip_addr=ip_addr)
        else:
            ip4 = None
        result = client.create_floatingip(
            floating_network_id=external_network_id,
            port_id=port_id,
            tenant_id=thing['tenant_id'],
            fixed_ip_address=ip4
        )
        floating_ip = result['floatingip']
        self.addCleanup(test_utils.call_and_ignore_notfound_exc,
                        client.delete_floatingip,
                        floating_ip['id'])
        return floating_ip

    def _default_security_group(self, client=None, tenant_id=None):
        """Get default secgroup for given tenant_id.

        :param client: security groups client; defaults to
            ``self.security_groups_client``.
        :param tenant_id: tenant to search in; defaults to the client's
            own ``tenant_id``.
        :returns: default secgroup for given tenant

        The test fails (assertion) when the tenant has no security group
        named ``default``.
        """
        if client is None:
            client = self.security_groups_client
        if not tenant_id:
            tenant_id = client.tenant_id

        body = client.list_security_groups()
        # Read the list by its key rather than by position: the response
        # may carry other top-level entries (e.g. pagination links), in
        # which case "first value of the dict" is not the group list.
        if 'security_groups' in body:
            all_groups = body['security_groups']
        else:
            # Backward-compatible fallback for bodies using another key.
            all_groups = list(body.values())[0]

        sgs = [
            sg for sg in all_groups
            if sg['tenant_id'] == tenant_id and sg['name'] == 'default'
        ]
        msg = "No default security group for tenant %s." % (tenant_id)
        self.assertGreater(len(sgs), 0, msg)
        return sgs[0]

    def _create_security_group_rule(self, secgroup=None,
                                    sec_group_rules_client=None,
                                    tenant_id=None,
                                    security_groups_client=None, **kwargs):
        """Create a rule from a dictionary of rule parameters.

        Create a rule in a secgroup. If secgroup is not defined, the
        default secgroup of tenant_id is looked up and used.

        :param secgroup: the security group.
        :param sec_group_rules_client: client used to create the rule;
            defaults to ``self.security_group_rules_client``.
        :param tenant_id: if secgroup not passed -- the tenant in which to
            search for default secgroup; defaults to the tenant of
            ``security_groups_client``.
        :param security_groups_client: client used to look up the default
            secgroup; defaults to ``self.security_groups_client``.
        :param kwargs: a dictionary containing rule parameters:
            for example, to allow incoming ssh:
            rule = {
                    direction: 'ingress'
                    protocol:'tcp',
                    port_range_min: 22,
                    port_range_max: 22
                    }
        :returns: the created security group rule dict.
        """
        if sec_group_rules_client is None:
            sec_group_rules_client = self.security_group_rules_client
        if security_groups_client is None:
            security_groups_client = self.security_groups_client
        if not tenant_id:
            tenant_id = security_groups_client.tenant_id
        if secgroup is None:
            secgroup = self._default_security_group(
                client=security_groups_client, tenant_id=tenant_id)

        # The rule always belongs to the secgroup's own tenant; explicit
        # kwargs may still override either field.
        ruleset = dict(security_group_id=secgroup['id'],
                       tenant_id=secgroup['tenant_id'])
        ruleset.update(kwargs)

        sg_rule = sec_group_rules_client.create_security_group_rule(**ruleset)
        sg_rule = sg_rule['security_group_rule']

        self.assertEqual(secgroup['tenant_id'], sg_rule['tenant_id'])
        self.assertEqual(secgroup['id'], sg_rule['security_group_id'])

        return sg_rule

    def create_loginable_secgroup_rule(self, security_group_rules_client=None,
                                       secgroup=None,
                                       security_groups_client=None):
        """Create loginable security group rules.

        This function will create, for both egress and ingress:
        1. a tcp port 22 allow rule, in order to allow ssh access
           (no explicit ethertype).
        2. a tcp port 22 allow rule with IPv6 ethertype, in order to allow
           ssh access over ipv6.
        3. an icmp allow rule (no explicit ethertype), in order to allow
           ping.
        4. an icmp allow rule with IPv6 ethertype, in order to allow
           ping6.

        :param security_group_rules_client: client used to create the
            rules; defaults to ``self.security_group_rules_client``.
        :param secgroup: security group to add the rules to; when None the
            default security group is used.
        :param security_groups_client: client used to look up the default
            security group; defaults to ``self.security_groups_client``.
        :returns: list of the rules that were created. Rules that already
            existed are skipped and not included.
        """

        if security_group_rules_client is None:
            security_group_rules_client = self.security_group_rules_client
        if security_groups_client is None:
            security_groups_client = self.security_groups_client
        rules = []
        rulesets = [
            dict(
                # ssh
                protocol='tcp',
                port_range_min=22,
                port_range_max=22,
            ),
            dict(
                # ipv6-ssh
                protocol='tcp',
                port_range_min=22,
                port_range_max=22,
                ethertype='IPv6',
            ),
            dict(
                # ping
                protocol='icmp',
            ),
            dict(
                # ipv6-icmp for ping6
                protocol='icmp',
                ethertype='IPv6',
            )
        ]
        for ruleset in rulesets:
            for r_direction in ['ingress', 'egress']:
                # Build the per-direction parameters without mutating the
                # template above.
                rule_params = dict(ruleset, direction=r_direction)
                try:
                    sg_rule = self._create_security_group_rule(
                        sec_group_rules_client=security_group_rules_client,
                        secgroup=secgroup,
                        security_groups_client=security_groups_client,
                        **rule_params)
                except lib_exc.Conflict as ex:
                    # if rule already exist - skip rule and continue
                    msg = 'Security group rule already exists'
                    if msg not in ex._error_string:
                        raise
                else:
                    self.assertEqual(r_direction, sg_rule['direction'])
                    rules.append(sg_rule)

        return rules