blob: a73ab44d7c130b006d38d255ad7b71e15d0790c5 [file] [log] [blame]
Andrea Frittolif4510a12017-03-07 19:17:11 +00001# Copyright 2012 OpenStack Foundation
2# Copyright 2013 IBM Corp.
3# All Rights Reserved.
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may
6# not use this file except in compliance with the License. You may obtain
7# a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations
15# under the License.
16
Andrea Frittolif4510a12017-03-07 19:17:11 +000017from oslo_log import log
Goutham Pacha Ravi37ee6772019-10-18 12:53:22 -070018from oslo_utils import uuidutils
Andrea Frittolif4510a12017-03-07 19:17:11 +000019from tempest.common import image as common_image
Andrea Frittolif4510a12017-03-07 19:17:11 +000020from tempest import config
Ken'ichi Ohmichi02d1f242017-03-12 18:56:27 -070021from tempest.lib.common.utils import data_utils
Andrea Frittolif4510a12017-03-07 19:17:11 +000022from tempest.lib.common.utils import test_utils
23from tempest.lib import exceptions as lib_exc
Roman Popelka290ef292022-02-28 10:41:04 +010024from tempest.scenario import manager
Andrea Frittolif4510a12017-03-07 19:17:11 +000025
26CONF = config.CONF
27
28LOG = log.getLogger(__name__)
29
30
class ScenarioTest(manager.NetworkScenarioTest):
    """Common base for scenario tests, built on tempest's own clients."""

    # ## Test functions library
    #
    # Helpers that create resources hand back only the response body; the
    # "resp" part is dropped because scenario tests never look at it.

    def _image_create(self, name, fmt, path,
                      disk_format=None, properties=None):
        """Register an image, upload its data from ``path``, return its ID.

        :param name: base name; a random suffix is appended to it.
        :param fmt: container format, also used as the disk format when
            ``disk_format`` is not given.
        :param path: path of the local file holding the image data.
        :param disk_format: optional disk format overriding ``fmt``.
        :param properties: optional dict of extra image properties.
        :returns: the ID of the created image.
        """
        extra_props = {} if properties is None else properties
        image_meta = {
            'name': data_utils.rand_name('%s-' % name),
            'container_format': fmt,
            'disk_format': disk_format or fmt,
        }
        use_v1 = CONF.image_feature_enabled.api_v1
        if use_v1:
            # The v1 API receives the image metadata as request headers.
            image_meta.update(is_public='False', properties=extra_props)
            create_kwargs = {
                'headers': common_image.image_meta_to_headers(**image_meta),
            }
        else:
            # Additional properties are flattened out in the v2 API.
            image_meta['visibility'] = 'private'
            image_meta.update(extra_props)
            create_kwargs = image_meta

        client = self.image_client
        response = client.create_image(**create_kwargs)
        # The image may or may not be wrapped in an 'image' key.
        if 'image' in response:
            image = response['image']
        else:
            image = response
        image_id = image['id']
        # Register the deletion before uploading so a failed upload still
        # gets cleaned up.
        self.addCleanup(client.delete_image, image_id)
        self.assertEqual("queued", image['status'])

        with open(path, 'rb') as image_data:
            if use_v1:
                client.update_image(image_id, data=image_data)
            else:
                client.store_image_file(image_id, image_data)
        return image_id

    def glance_image_create(self):
        """Create the image described by the ``scenario`` config group.

        :returns: the ID of the created image.
        """
        scenario_conf = CONF.scenario
        path = scenario_conf.img_file
        container_format = scenario_conf.img_container_format
        disk_format = scenario_conf.img_disk_format
        properties = scenario_conf.img_properties
        LOG.debug(
            "paths: img: %s, container_format: %s, disk_format: %s, "
            "properties: %s",
            path, container_format, disk_format, properties)

        image_id = self._image_create(
            'scenario-img', container_format, path,
            disk_format=disk_format, properties=properties)
        LOG.debug("image:%s", image_id)
        return image_id

    def _log_net_info(self, exc):
        """Emit a network debug message unless ``exc`` is an SSH timeout."""
        # network debug is called as part of ssh init
        if isinstance(exc, lib_exc.SSHTimeout):
            return
        LOG.debug('Network information on a devstack host')
90
Andrea Frittolif4510a12017-03-07 19:17:11 +000091
class NetworkScenarioTest(ScenarioTest):
    """Base class for network scenario tests.

    This class provides helpers for network scenario tests, using the neutron
    API. Helpers from ancestor which use the nova network API are overridden
    with the neutron API.

    This class also enforces using Neutron instead of novanetwork.
    Subclassed tests will be skipped if Neutron is not enabled.

    """

    @classmethod
    def skip_checks(cls):
        """Skip the test class when the Neutron service is not available."""
        super(NetworkScenarioTest, cls).skip_checks()
        if not CONF.service_available.neutron:
            raise cls.skipException('Neutron not available')

    def _get_network_by_name_or_id(self, identifier):
        """Look up a network by UUID or by name using the admin client.

        :param identifier: a network UUID, or a network name.
        :returns: the network body. When looking up by name, the first
            network of the listing is returned; the test fails if the
            listing is empty.
        """
        if uuidutils.is_uuid_like(identifier):
            return self.os_admin.networks_client.show_network(
                identifier)['network']

        networks = self.os_admin.networks_client.list_networks(
            name=identifier)['networks']
        self.assertNotEqual(len(networks), 0,
                            "Unable to get network by name: %s" % identifier)
        return networks[0]

    def create_floating_ip(self, thing, external_network_id=None,
                           port_id=None, ip_addr=None, client=None):
        """Create a floating IP and associate it to a resource/port on Neutron

        :param thing: resource the floating IP is created for. Its
            'tenant_id' key is always read, and it is handed to
            get_server_port_id_and_ip4() when ``port_id`` is not given.
        :param external_network_id: ID of the external network; defaults to
            CONF.network.public_network_id.
        :param port_id: port to associate with. When omitted, the port and
            its fixed address are resolved from ``thing``.
        :param ip_addr: forwarded to get_server_port_id_and_ip4(); not used
            when ``port_id`` is given.
        :param client: floating IPs client; defaults to
            self.floating_ips_client.
        :returns: the created floating IP body.
        """
        if not external_network_id:
            external_network_id = CONF.network.public_network_id
        if not client:
            client = self.floating_ips_client
        if not port_id:
            port_id, ip4 = self.get_server_port_id_and_ip4(thing,
                                                           ip_addr=ip_addr)
        else:
            # With an explicit port no fixed address is requested.
            ip4 = None
        result = client.create_floatingip(
            floating_network_id=external_network_id,
            port_id=port_id,
            tenant_id=thing['tenant_id'],
            fixed_ip_address=ip4
        )
        floating_ip = result['floatingip']
        # The floating IP may already be gone by cleanup time.
        self.addCleanup(test_utils.call_and_ignore_notfound_exc,
                        client.delete_floatingip,
                        floating_ip['id'])
        return floating_ip

    def create_loginable_secgroup_rule(self, security_group_rules_client=None,
                                       secgroup=None,
                                       security_groups_client=None):
        """Create loginable security group rules

        For both the ingress and the egress direction this function
        requests:
            1. a tcp port 22 allow rule with no explicit ethertype, in order
               to allow ssh access.
            2. a tcp port 22 allow rule for IPv6, in order to allow ssh
               access over ipv6.
            3. an icmp allow rule with no explicit ethertype, in order to
               allow ping.
            4. an icmp allow rule for IPv6, in order to allow ping6.

        A rule reported as already existing is skipped.

        :param security_group_rules_client: client used to create the rules;
            defaults to self.security_group_rules_client.
        :param secgroup: security group passed through to
            create_security_group_rule().
        :param security_groups_client: defaults to
            self.security_groups_client.
        :returns: list of the rules that were actually created.
        :raises lib_exc.Conflict: on a conflict that is not an "already
            exists" report.
        """

        if security_group_rules_client is None:
            security_group_rules_client = self.security_group_rules_client
        if security_groups_client is None:
            security_groups_client = self.security_groups_client
        rules = []
        rulesets = [
            dict(
                # ssh
                protocol='tcp',
                port_range_min=22,
                port_range_max=22,
            ),
            dict(
                # ipv6-ssh
                protocol='tcp',
                port_range_min=22,
                port_range_max=22,
                ethertype='IPv6',
            ),
            dict(
                # ping
                protocol='icmp',
            ),
            dict(
                # ipv6-icmp for ping6
                protocol='icmp',
                ethertype='IPv6',
            )
        ]
        for ruleset in rulesets:
            for r_direction in ['ingress', 'egress']:
                try:
                    # The direction is passed as its own keyword so the
                    # ruleset templates are never modified.
                    sg_rule = self.create_security_group_rule(
                        sec_group_rules_client=security_group_rules_client,
                        secgroup=secgroup,
                        security_groups_client=security_groups_client,
                        direction=r_direction,
                        **ruleset)
                except lib_exc.Conflict as ex:
                    # if rule already exist - skip rule and continue
                    msg = 'Security group rule already exists'
                    # str() is the public way to read the exception text.
                    if msg not in str(ex):
                        raise
                else:
                    self.assertEqual(r_direction, sg_rule['direction'])
                    rules.append(sg_rule)

        return rules