blob: 4a2df030dacf8f00f2c2f248e3ecb3702d45bae6 [file] [log] [blame]
# Copyright 2012 OpenStack Foundation
# Copyright 2013 IBM Corp.
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.
16
Slawek Kaplonski8dd49aa2019-04-16 14:47:07 +020017from oslo_log import log
Slawek Kaplonski8dd49aa2019-04-16 14:47:07 +020018
Slawek Kaplonski8dd49aa2019-04-16 14:47:07 +020019from tempest.common import utils
20from tempest.common.utils.linux import remote_client
Slawek Kaplonski8dd49aa2019-04-16 14:47:07 +020021from tempest import config
22from tempest.lib.common.utils import data_utils
23from tempest.lib.common.utils import test_utils
24from tempest.lib import exceptions as lib_exc
Roman Popelkafd509eb2022-04-07 15:10:08 +020025from tempest.scenario import manager
Slawek Kaplonski8dd49aa2019-04-16 14:47:07 +020026
# Shared tempest configuration object; the scenario helpers in this module
# read their validation and service-availability settings from it.
CONF = config.CONF

# Module-level logger, named after this module.
LOG = log.getLogger(__name__)
30
31
class ScenarioTest(manager.NetworkScenarioTest):
    """Common base for scenario tests, built on tempest's own clients."""

    credentials = ['primary']

    @classmethod
    def setup_clients(cls):
        """Expose the primary credentials' clients as class attributes."""
        super(ScenarioTest, cls).setup_clients()
        primary = cls.os_primary
        # Compute clients
        cls.keypairs_client = primary.keypairs_client
        cls.servers_client = primary.servers_client
        # Neutron clients
        cls.networks_client = primary.networks_client
        cls.ports_client = primary.ports_client
        cls.routers_client = primary.routers_client
        cls.subnets_client = primary.subnets_client
        cls.floating_ips_client = primary.floating_ips_client
        cls.security_groups_client = primary.security_groups_client
        cls.security_group_rules_client = (
            primary.security_group_rules_client)

    # ## Test functions library
    #
    # The create_[resource] helpers hand back only the response body; the
    # resp part is dropped because scenario tests never look at it.

    def get_remote_client(self, ip_address, username=None, private_key=None):
        """Return an authenticated SSH client for a remote server.

        :param ip_address: the server floating or fixed IP address to use
                           for ssh validation
        :param username: name of the Linux account on the remote server;
                         ``CONF.validation.image_ssh_user`` is used when
                         None
        :param private_key: the SSH private key to use. With the 'keypair'
                            auth method, ``self.keypair['private_key']`` is
                            used when None; with any other auth method the
                            key is ignored and the configured image
                            password is used instead
        :returns: a RemoteClient object
        :raises Exception: re-raises whatever ``validate_authentication``
                           raised, after logging it and calling
                           ``self.log_console_output()``
        """
        ssh_user = (CONF.validation.image_ssh_user
                    if username is None else username)

        # CONF.validation.auth_method picks the login style: 'keypair'
        # means key-based login, anything else means username/password.
        if CONF.validation.auth_method != 'keypair':
            ssh_password = CONF.validation.image_ssh_password
            ssh_key = None
        elif private_key is None:
            ssh_password = None
            ssh_key = self.keypair['private_key']
        else:
            ssh_password = None
            ssh_key = private_key

        ssh_client = remote_client.RemoteClient(
            ip_address, ssh_user, pkey=ssh_key, password=ssh_password)
        try:
            ssh_client.validate_authentication()
        except Exception as exc:
            details = {'ip': ip_address, 'error': exc}
            message = ('Initializing SSH connection to %(ip)s failed. '
                       'Error: %(error)s' % details)
            caller = test_utils.find_test_caller()
            if caller:
                message = '(%s) %s' % (caller, message)
            LOG.exception(message)
            self.log_console_output()
            raise

        return ssh_client
96
Slawek Kaplonski8dd49aa2019-04-16 14:47:07 +020097
class NetworkScenarioTest(ScenarioTest):
    """Base class for network scenario tests.

    This class provides helpers for network scenario tests, using the
    neutron API. Helpers from ancestors which use the nova network API are
    overridden with the neutron API.

    This class also enforces using Neutron instead of novanetwork.
    Subclassed tests will be skipped if Neutron or its ``bgpvpn``
    extension is not enabled.
    """

    credentials = ['primary', 'admin']

    @classmethod
    def skip_checks(cls):
        """Skip the test class unless Neutron and bgpvpn are available.

        :raises cls.skipException: if ``CONF.service_available.neutron`` is
            false or the ``bgpvpn`` network extension is not enabled
        """
        super(NetworkScenarioTest, cls).skip_checks()
        if not CONF.service_available.neutron:
            raise cls.skipException('Neutron not available')
        if not utils.is_extension_enabled('bgpvpn', 'network'):
            msg = "Bgpvpn extension not enabled."
            raise cls.skipException(msg)

    def _check_remote_connectivity(self, source, dest, should_succeed=True,
                                   nic=None):
        """Check ping to a server via the source ssh connection.

        The ping is repeated through ``test_utils.call_until_true``, bounded
        by ``CONF.validation.ping_timeout``, until its outcome matches
        ``should_succeed``.

        :param source: RemoteClient: an ssh connection from which to ping
        :param dest: an IP to ping against
        :param should_succeed: boolean, should ping succeed or not
        :param nic: specific network interface to ping from
        :returns: boolean -- True if the ping outcome matched
            ``should_succeed`` before the timeout, False otherwise
        """
        def ping_remote():
            # A failed ping is reported as an SSHExecCommandFailed error.
            try:
                source.ping_host(dest, nic=nic)
            except lib_exc.SSHExecCommandFailed:
                LOG.warning('Failed to ping IP: %s via a ssh connection '
                            'from: %s.', dest, source.ssh_client.host)
                return not should_succeed
            return should_succeed

        return test_utils.call_until_true(ping_remote,
                                          CONF.validation.ping_timeout,
                                          1)

    def _create_security_group(self, security_group_rules_client=None,
                               tenant_id=None,
                               namestart='secgroup-smoke',
                               security_groups_client=None):
        """Create a security group populated with the loginable rules.

        :param security_group_rules_client: client used to create the rules;
            defaults to ``self.security_group_rules_client``
        :param tenant_id: tenant owning the group; defaults to the
            ``tenant_id`` of the security groups client
        :param namestart: prefix of the randomly generated group name
        :param security_groups_client: client used to create the group;
            defaults to ``self.security_groups_client``
        :returns: the created security group
        """
        if security_group_rules_client is None:
            security_group_rules_client = self.security_group_rules_client
        if security_groups_client is None:
            security_groups_client = self.security_groups_client
        if tenant_id is None:
            tenant_id = security_groups_client.tenant_id
        secgroup = self._create_empty_security_group(
            namestart=namestart, client=security_groups_client,
            tenant_id=tenant_id)

        # Add rules to the security group
        rules = self._create_loginable_secgroup_rule(
            security_group_rules_client=security_group_rules_client,
            secgroup=secgroup,
            security_groups_client=security_groups_client)
        for rule in rules:
            self.assertEqual(tenant_id, rule['tenant_id'])
            self.assertEqual(secgroup['id'], rule['security_group_id'])
        return secgroup

    def _create_empty_security_group(self, client=None, tenant_id=None,
                                     namestart='secgroup-smoke'):
        """Create a security group without rules.

        Default rules will be created:
         - IPv4 egress to any
         - IPv6 egress to any

        Deletion of the group is registered as a test cleanup.

        :param client: security groups client to use; defaults to
            ``self.security_groups_client``
        :param tenant_id: secgroup will be created in this tenant; defaults
            to the client's ``tenant_id``
        :param namestart: prefix of the randomly generated group name
        :returns: the created security group
        """
        if client is None:
            client = self.security_groups_client
        if not tenant_id:
            tenant_id = client.tenant_id
        sg_name = data_utils.rand_name(namestart)
        sg_desc = sg_name + " description"
        sg_dict = dict(name=sg_name,
                       description=sg_desc)
        sg_dict['tenant_id'] = tenant_id
        result = client.create_security_group(**sg_dict)

        secgroup = result['security_group']
        self.assertEqual(secgroup['name'], sg_name)
        self.assertEqual(tenant_id, secgroup['tenant_id'])
        self.assertEqual(secgroup['description'], sg_desc)

        self.addCleanup(test_utils.call_and_ignore_notfound_exc,
                        client.delete_security_group, secgroup['id'])
        return secgroup

    def _default_security_group(self, client=None, tenant_id=None):
        """Get default secgroup for given tenant_id.

        :param client: security groups client to use; defaults to
            ``self.security_groups_client``
        :param tenant_id: tenant to search; defaults to the client's
            ``tenant_id``
        :returns: default secgroup for given tenant
        """
        if client is None:
            client = self.security_groups_client
        if not tenant_id:
            tenant_id = client.tenant_id
        # Read the documented 'security_groups' key rather than "the first
        # value of the body", which silently depends on how many keys the
        # response carries and on their order.
        sgs = [
            sg for sg in client.list_security_groups()['security_groups']
            if sg['tenant_id'] == tenant_id and sg['name'] == 'default'
        ]
        msg = "No default security group for tenant %s." % (tenant_id)
        self.assertGreater(len(sgs), 0, msg)
        return sgs[0]

    def _create_security_group_rule(self, secgroup=None,
                                    sec_group_rules_client=None,
                                    tenant_id=None,
                                    security_groups_client=None, **kwargs):
        """Create a rule from a dictionary of rule parameters.

        Create a rule in a secgroup. If secgroup is not defined, the
        default secgroup of tenant_id is looked up and used.

        :param secgroup: the security group.
        :param sec_group_rules_client: client used to create the rule;
            defaults to ``self.security_group_rules_client``
        :param tenant_id: if secgroup not passed -- the tenant in which to
            search for default secgroup
        :param security_groups_client: client used for the default secgroup
            lookup; defaults to ``self.security_groups_client``
        :param kwargs: a dictionary containing rule parameters:
            for example, to allow incoming ssh:
            rule = {
                    'direction': 'ingress',
                    'protocol': 'tcp',
                    'port_range_min': 22,
                    'port_range_max': 22
                    }
        :returns: the created security group rule
        """
        if sec_group_rules_client is None:
            sec_group_rules_client = self.security_group_rules_client
        if security_groups_client is None:
            security_groups_client = self.security_groups_client
        if not tenant_id:
            tenant_id = security_groups_client.tenant_id
        if secgroup is None:
            secgroup = self._default_security_group(
                client=security_groups_client, tenant_id=tenant_id)

        # The rule is always created in the tenant owning the secgroup;
        # caller-supplied kwargs are applied last and may override it.
        ruleset = dict(security_group_id=secgroup['id'],
                       tenant_id=secgroup['tenant_id'])
        ruleset.update(kwargs)

        sg_rule = sec_group_rules_client.create_security_group_rule(**ruleset)
        sg_rule = sg_rule['security_group_rule']

        self.assertEqual(secgroup['tenant_id'], sg_rule['tenant_id'])
        self.assertEqual(secgroup['id'], sg_rule['security_group_id'])

        return sg_rule

    def _create_loginable_secgroup_rule(self, security_group_rules_client=None,
                                        secgroup=None,
                                        security_groups_client=None):
        """Create loginable security group rule

        This function will create:
        1. egress and ingress tcp port 22 allow rule in order to allow ssh
        access for ipv4.
        2. egress and ingress tcp port 80 allow rule in order to allow http
        access for ipv4.
        3. egress and ingress ipv6 icmp allow rule, in order to allow icmpv6.
        4. egress and ingress ipv4 icmp allow rule, in order to allow icmpv4.

        Rules which already exist are skipped.

        :param security_group_rules_client: client used to create the rules;
            defaults to ``self.security_group_rules_client``
        :param secgroup: the security group to add the rules to
        :param security_groups_client: security groups client forwarded to
            ``_create_security_group_rule``; defaults to
            ``self.security_groups_client``
        :returns: list of the security group rules that were created
        """

        if security_group_rules_client is None:
            security_group_rules_client = self.security_group_rules_client
        if security_groups_client is None:
            security_groups_client = self.security_groups_client
        rules = []
        rulesets = [
            dict(
                # ssh
                protocol='tcp',
                port_range_min=22,
                port_range_max=22,
            ),
            dict(
                # http
                protocol='tcp',
                port_range_min=80,
                port_range_max=80,
            ),
            dict(
                # ping
                protocol='icmp',
            ),
            dict(
                # ipv6-icmp for ping6
                protocol='icmp',
                ethertype='IPv6',
            )
        ]
        already_exists_msg = 'Security group rule already exists'
        for ruleset in rulesets:
            for r_direction in ['ingress', 'egress']:
                # Build a per-direction copy so the templates above are
                # never mutated.
                rule_args = dict(ruleset, direction=r_direction)
                try:
                    sg_rule = self._create_security_group_rule(
                        sec_group_rules_client=security_group_rules_client,
                        secgroup=secgroup,
                        security_groups_client=security_groups_client,
                        **rule_args)
                except lib_exc.Conflict as ex:
                    # if rule already exist - skip rule and continue;
                    # any other conflict is re-raised with its original
                    # traceback.
                    if already_exists_msg not in str(ex):
                        raise
                else:
                    self.assertEqual(r_direction, sg_rule['direction'])
                    rules.append(sg_rule)

        return rules

    def _create_router(self, client=None, tenant_id=None,
                       namestart='router-smoke'):
        """Create an administratively-up router and schedule its deletion.

        :param client: routers client to use; defaults to
            ``self.routers_client``
        :param tenant_id: tenant owning the router; defaults to the
            client's ``tenant_id``
        :param namestart: prefix of the randomly generated router name
        :returns: the created router
        """
        if client is None:
            client = self.routers_client
        if not tenant_id:
            tenant_id = client.tenant_id
        name = data_utils.rand_name(namestart)
        result = client.create_router(name=name,
                                      admin_state_up=True,
                                      tenant_id=tenant_id)
        router = result['router']
        self.assertEqual(router['name'], name)
        self.addCleanup(test_utils.call_and_ignore_notfound_exc,
                        client.delete_router,
                        router['id'])
        return router
338 return router