blob: 213e7acbbb4cc3b4e579552b2dba6ad4ad2e66c9 [file] [log] [blame]
# Copyright 2012 OpenStack Foundation
# Copyright 2013 IBM Corp.
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.
16
Andrea Frittolif4510a12017-03-07 19:17:11 +000017from oslo_log import log
Goutham Pacha Ravi37ee6772019-10-18 12:53:22 -070018from oslo_utils import uuidutils
Andrea Frittolif4510a12017-03-07 19:17:11 +000019from tempest.common import image as common_image
Andrea Frittolif4510a12017-03-07 19:17:11 +000020from tempest import config
Ken'ichi Ohmichi02d1f242017-03-12 18:56:27 -070021from tempest.lib.common.utils import data_utils
Andrea Frittolif4510a12017-03-07 19:17:11 +000022from tempest.lib.common.utils import test_utils
23from tempest.lib import exceptions as lib_exc
Roman Popelka290ef292022-02-28 10:41:04 +010024from tempest.scenario import manager
Andrea Frittolif4510a12017-03-07 19:17:11 +000025
26CONF = config.CONF
27
28LOG = log.getLogger(__name__)
29
30
class ScenarioTest(manager.NetworkScenarioTest):
    """Common base for scenario tests, built on tempest's own clients."""

    credentials = ['primary']

    @classmethod
    def setup_clients(cls):
        """Expose the primary credential's service clients on the class.

        :raises lib_exc.InvalidConfiguration: if glance is flagged as
            available but neither ``api_v1`` nor ``api_v2`` is enabled in
            ``[image-feature-enabled]``.
        """
        super(ScenarioTest, cls).setup_clients()
        primary = cls.os_primary

        cls.flavors_client = primary.flavors_client
        cls.compute_floating_ips_client = primary.compute_floating_ips_client

        if CONF.service_available.glance:
            # Glance v1 wins when enabled; otherwise v2 must be enabled.
            v1_enabled = CONF.image_feature_enabled.api_v1
            if not v1_enabled and not CONF.image_feature_enabled.api_v2:
                raise lib_exc.InvalidConfiguration(
                    'Either api_v1 or api_v2 must be True in '
                    '[image-feature-enabled].')
            cls.image_client = (primary.image_client if v1_enabled
                                else primary.image_client_v2)

        # Compute (nova) clients, including nova security groups.
        cls.compute_images_client = primary.compute_images_client
        cls.keypairs_client = primary.keypairs_client
        cls.compute_security_groups_client = (
            primary.compute_security_groups_client)
        cls.compute_security_group_rules_client = (
            primary.compute_security_group_rules_client)
        cls.servers_client = primary.servers_client
        cls.interface_client = primary.interfaces_client

        # Networking (neutron) clients.
        cls.networks_client = primary.networks_client
        cls.ports_client = primary.ports_client
        cls.routers_client = primary.routers_client
        cls.subnets_client = primary.subnets_client
        cls.floating_ips_client = primary.floating_ips_client
        cls.security_groups_client = primary.security_groups_client
        cls.security_group_rules_client = primary.security_group_rules_client

    # ## Test functions library
    #
    # The helpers below hand back only the response body; the ``resp`` part
    # is not used by scenario tests and is dropped.

    def _image_create(self, name, fmt, path,
                      disk_format=None, properties=None):
        """Create an image, upload its data from ``path`` and return its id.

        A cleanup that deletes the image is registered, and the freshly
        created image is asserted to be in ``queued`` status before the
        upload starts.

        :param name: prefix for the randomized image name
        :param fmt: container format; also the disk format when
            ``disk_format`` is not given
        :param path: path of the file holding the image data
        :param disk_format: optional disk format overriding ``fmt``
        :param properties: optional dict of extra image properties
        :returns: the id of the created image
        """
        extra_props = {} if properties is None else properties
        create_kwargs = dict(
            name=data_utils.rand_name('%s-' % name),
            container_format=fmt,
            disk_format=disk_format or fmt,
        )
        use_v1 = CONF.image_feature_enabled.api_v1
        if use_v1:
            create_kwargs.update(is_public='False', properties=extra_props)
            create_kwargs = {
                'headers': common_image.image_meta_to_headers(**create_kwargs)
            }
        else:
            create_kwargs['visibility'] = 'private'
            # The v2 API takes additional properties as flat attributes.
            create_kwargs.update(extra_props)

        response = self.image_client.create_image(**create_kwargs)
        # The image may or may not be wrapped in an 'image' key.
        image = response.get('image', response)
        image_id = image['id']
        self.addCleanup(self.image_client.delete_image, image_id)
        self.assertEqual("queued", image['status'])

        with open(path, 'rb') as image_data:
            if use_v1:
                self.image_client.update_image(image_id, data=image_data)
            else:
                self.image_client.store_image_file(image_id, image_data)
        return image_id

    def glance_image_create(self):
        """Create the image described by the ``[scenario]`` options.

        :returns: the id of the created image
        """
        scenario_conf = CONF.scenario
        path = scenario_conf.img_file
        container_format = scenario_conf.img_container_format
        disk_format = scenario_conf.img_disk_format
        image_props = scenario_conf.img_properties
        LOG.debug("paths: img: %s, container_format: %s, disk_format: %s, "
                  "properties: %s",
                  path, container_format, disk_format, image_props)

        image_id = self._image_create(
            'scenario-img', container_format, path,
            disk_format=disk_format, properties=image_props)
        LOG.debug("image:%s", image_id)
        return image_id

    def _log_net_info(self, exc):
        """Log a debug line unless ``exc`` is an SSH timeout."""
        # network debug is called as part of ssh init
        if isinstance(exc, lib_exc.SSHTimeout):
            return
        LOG.debug('Network information on a devstack host')
129
Andrea Frittolif4510a12017-03-07 19:17:11 +0000130
class NetworkScenarioTest(ScenarioTest):
    """Base class for network scenario tests.

    This class provides helpers for network scenario tests, using the
    neutron API. Helpers from ancestors which use the nova network API are
    overridden with the neutron API.

    This class also enforces using Neutron instead of novanetwork.
    Subclassed tests will be skipped if Neutron is not enabled.
    """

    credentials = ['primary', 'admin']

    @classmethod
    def skip_checks(cls):
        """Skip the test class when neutron is not marked as available."""
        super(NetworkScenarioTest, cls).skip_checks()
        if not CONF.service_available.neutron:
            raise cls.skipException('Neutron not available')

    def _get_network_by_name_or_id(self, identifier):
        """Fetch a network with the admin networks client.

        :param identifier: a network UUID or a network name
        :returns: the network dict; for a name lookup, the first match.
            The test fails if no network carries the given name.
        """
        if uuidutils.is_uuid_like(identifier):
            return self.os_admin.networks_client.show_network(
                identifier)['network']

        networks = self.os_admin.networks_client.list_networks(
            name=identifier)['networks']
        self.assertNotEqual(len(networks), 0,
                            "Unable to get network by name: %s" % identifier)
        return networks[0]

    def get_networks(self):
        """Return the networks listed by the admin networks client."""
        return self.os_admin.networks_client.list_networks()['networks']

    def create_floating_ip(self, thing, external_network_id=None,
                           port_id=None, ip_addr=None, client=None):
        """Create a floating IP and associates to a resource/port on Neutron

        :param thing: resource dict; its ``tenant_id`` owns the floating IP
            and, when ``port_id`` is not given, it is used to look up the
            port and fixed IPv4 address to associate
        :param external_network_id: external network to allocate from;
            defaults to ``CONF.network.public_network_id``
        :param port_id: port to associate; when given, no fixed IP address
            is sent with the request
        :param ip_addr: forwarded to the port lookup when ``port_id`` is
            not given
        :param client: floating IPs client; defaults to
            ``self.floating_ips_client``
        :returns: the created floating IP dict
        """
        if not external_network_id:
            external_network_id = CONF.network.public_network_id
        if not client:
            client = self.floating_ips_client
        if not port_id:
            port_id, ip4 = self.get_server_port_id_and_ip4(thing,
                                                           ip_addr=ip_addr)
        else:
            ip4 = None
        result = client.create_floatingip(
            floating_network_id=external_network_id,
            port_id=port_id,
            tenant_id=thing['tenant_id'],
            fixed_ip_address=ip4
        )
        floating_ip = result['floatingip']
        # The test itself may already have deleted the floating IP.
        self.addCleanup(test_utils.call_and_ignore_notfound_exc,
                        client.delete_floatingip,
                        floating_ip['id'])
        return floating_ip

    def _create_security_group(self, security_group_rules_client=None,
                               tenant_id=None,
                               namestart='secgroup-smoke',
                               security_groups_client=None):
        """Create a security group carrying the "loginable" rules.

        :param security_group_rules_client: rules client; defaults to
            ``self.security_group_rules_client``
        :param tenant_id: owner of the group; defaults to the tenant of the
            security groups client
        :param namestart: prefix for the randomized group name
        :param security_groups_client: groups client; defaults to
            ``self.security_groups_client``
        :returns: the created security group dict
        """
        if security_group_rules_client is None:
            security_group_rules_client = self.security_group_rules_client
        if security_groups_client is None:
            security_groups_client = self.security_groups_client
        if tenant_id is None:
            tenant_id = security_groups_client.tenant_id
        secgroup = self._create_empty_security_group(
            namestart=namestart, client=security_groups_client,
            tenant_id=tenant_id)

        # Add rules to the security group
        rules = self._create_loginable_secgroup_rule(
            security_group_rules_client=security_group_rules_client,
            secgroup=secgroup,
            security_groups_client=security_groups_client)
        for rule in rules:
            self.assertEqual(tenant_id, rule['tenant_id'])
            self.assertEqual(secgroup['id'], rule['security_group_id'])
        return secgroup

    def _create_empty_security_group(self, client=None, tenant_id=None,
                                     namestart='secgroup-smoke'):
        """Create a security group without rules.

        Default rules will be created:
         - IPv4 egress to any
         - IPv6 egress to any

        :param client: security groups client; defaults to
            ``self.security_groups_client``
        :param tenant_id: secgroup will be created in this tenant; defaults
            to the tenant of ``client``
        :param namestart: prefix for the randomized group name
        :returns: the created security group
        """
        if client is None:
            client = self.security_groups_client
        if not tenant_id:
            tenant_id = client.tenant_id
        sg_name = data_utils.rand_name(namestart)
        sg_desc = sg_name + " description"
        sg_dict = dict(name=sg_name,
                       description=sg_desc)
        sg_dict['tenant_id'] = tenant_id
        result = client.create_security_group(**sg_dict)

        secgroup = result['security_group']
        self.assertEqual(secgroup['name'], sg_name)
        self.assertEqual(tenant_id, secgroup['tenant_id'])
        self.assertEqual(secgroup['description'], sg_desc)

        self.addCleanup(test_utils.call_and_ignore_notfound_exc,
                        client.delete_security_group, secgroup['id'])
        return secgroup

    def _default_security_group(self, client=None, tenant_id=None):
        """Get default secgroup for given tenant_id.

        :param client: security groups client; defaults to
            ``self.security_groups_client``
        :param tenant_id: tenant to search; defaults to the tenant of
            ``client``
        :returns: default secgroup for given tenant. The test fails if the
            tenant has no group named ``default``.
        """
        if client is None:
            client = self.security_groups_client
        if not tenant_id:
            tenant_id = client.tenant_id
        # Read the documented response key instead of "the first value of
        # the response dict", which depends on the order of response keys.
        security_groups = client.list_security_groups()['security_groups']
        sgs = [
            sg for sg in security_groups
            if sg['tenant_id'] == tenant_id and sg['name'] == 'default'
        ]
        msg = "No default security group for tenant %s." % (tenant_id)
        self.assertGreater(len(sgs), 0, msg)
        return sgs[0]

    def _create_security_group_rule(self, secgroup=None,
                                    sec_group_rules_client=None,
                                    tenant_id=None,
                                    security_groups_client=None, **kwargs):
        """Create a rule from a dictionary of rule parameters.

        Create a rule in a secgroup. if secgroup not defined will search for
        default secgroup in tenant_id.

        The rule is always created with the tenant of the security group it
        is added to; ``tenant_id`` only drives the default group lookup.

        :param secgroup: the security group.
        :param sec_group_rules_client: rules client; defaults to
            ``self.security_group_rules_client``
        :param tenant_id: if secgroup not passed -- the tenant in which to
            search for default secgroup
        :param security_groups_client: groups client used for the default
            group lookup; defaults to ``self.security_groups_client``
        :param kwargs: a dictionary containing rule parameters:
            for example, to allow incoming ssh:
            rule = {
                    direction: 'ingress'
                    protocol:'tcp',
                    port_range_min: 22,
                    port_range_max: 22
                    }
        :returns: the created security group rule dict
        """
        if sec_group_rules_client is None:
            sec_group_rules_client = self.security_group_rules_client
        if security_groups_client is None:
            security_groups_client = self.security_groups_client
        if not tenant_id:
            tenant_id = security_groups_client.tenant_id
        if secgroup is None:
            secgroup = self._default_security_group(
                client=security_groups_client, tenant_id=tenant_id)

        ruleset = dict(security_group_id=secgroup['id'],
                       tenant_id=secgroup['tenant_id'])
        ruleset.update(kwargs)

        sg_rule = sec_group_rules_client.create_security_group_rule(**ruleset)
        sg_rule = sg_rule['security_group_rule']

        self.assertEqual(secgroup['tenant_id'], sg_rule['tenant_id'])
        self.assertEqual(secgroup['id'], sg_rule['security_group_id'])

        return sg_rule

    def _create_loginable_secgroup_rule(self, security_group_rules_client=None,
                                        secgroup=None,
                                        security_groups_client=None):
        """Create loginable security group rules

        For each of the following, one ingress and one egress rule is
        requested:
        1. tcp port 22 with the default ethertype, in order to allow ssh
           access.
        2. tcp port 22 with the IPv6 ethertype, in order to allow ssh
           access over ipv6.
        3. icmp with the default ethertype, in order to allow ping.
        4. icmp with the IPv6 ethertype, in order to allow ping6.

        Rules that already exist are skipped rather than treated as errors.

        :param security_group_rules_client: rules client; defaults to
            ``self.security_group_rules_client``
        :param secgroup: group to add the rules to; when omitted the
            tenant's default group is used
        :param security_groups_client: groups client; defaults to
            ``self.security_groups_client``
        :returns: list of the rules that were actually created
        :raises lib_exc.Conflict: for any conflict other than "rule already
            exists"
        """
        if security_group_rules_client is None:
            security_group_rules_client = self.security_group_rules_client
        if security_groups_client is None:
            security_groups_client = self.security_groups_client
        rules = []
        rulesets = [
            dict(
                # ssh
                protocol='tcp',
                port_range_min=22,
                port_range_max=22,
            ),
            dict(
                # ipv6-ssh
                protocol='tcp',
                port_range_min=22,
                port_range_max=22,
                ethertype='IPv6',
            ),
            dict(
                # ping
                protocol='icmp',
            ),
            dict(
                # ipv6-icmp for ping6
                protocol='icmp',
                ethertype='IPv6',
            )
        ]
        for ruleset in rulesets:
            for r_direction in ['ingress', 'egress']:
                # Build a per-request copy so the templates stay untouched.
                rule_kwargs = dict(ruleset, direction=r_direction)
                try:
                    sg_rule = self._create_security_group_rule(
                        sec_group_rules_client=security_group_rules_client,
                        secgroup=secgroup,
                        security_groups_client=security_groups_client,
                        **rule_kwargs)
                except lib_exc.Conflict as ex:
                    # if rule already exist - skip rule and continue
                    msg = 'Security group rule already exists'
                    if msg not in ex._error_string:
                        # Bare raise keeps the original traceback intact.
                        raise
                else:
                    self.assertEqual(r_direction, sg_rule['direction'])
                    rules.append(sg_rule)

        return rules