blob: 398c7648a35cf4b96d73efc40084a53f58d7463e [file] [log] [blame]
Slawek Kaplonski8dd49aa2019-04-16 14:47:07 +02001# Copyright 2012 OpenStack Foundation
2# Copyright 2013 IBM Corp.
3# All Rights Reserved.
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may
6# not use this file except in compliance with the License. You may obtain
7# a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations
15# under the License.
16
Slawek Kaplonski8dd49aa2019-04-16 14:47:07 +020017from oslo_log import log
Slawek Kaplonski8dd49aa2019-04-16 14:47:07 +020018
Slawek Kaplonski8dd49aa2019-04-16 14:47:07 +020019from tempest.common import utils
Slawek Kaplonski8dd49aa2019-04-16 14:47:07 +020020from tempest import config
21from tempest.lib.common.utils import data_utils
22from tempest.lib.common.utils import test_utils
23from tempest.lib import exceptions as lib_exc
Roman Popelkafd509eb2022-04-07 15:10:08 +020024from tempest.scenario import manager
Slawek Kaplonski8dd49aa2019-04-16 14:47:07 +020025
26CONF = config.CONF
27
28LOG = log.getLogger(__name__)
29
30
Roman Popelkafd509eb2022-04-07 15:10:08 +020031class ScenarioTest(manager.NetworkScenarioTest):
Slawek Kaplonski8dd49aa2019-04-16 14:47:07 +020032 """Base class for scenario tests. Uses tempest own clients. """
33
34 credentials = ['primary']
35
Slawek Kaplonski8dd49aa2019-04-16 14:47:07 +020036
37class NetworkScenarioTest(ScenarioTest):
38 """Base class for network scenario tests.
39
40 This class provide helpers for network scenario tests, using the neutron
41 API. Helpers from ancestor which use the nova network API are overridden
42 with the neutron API.
43
44 This Class also enforces using Neutron instead of novanetwork.
45 Subclassed tests will be skipped if Neutron is not enabled
46
47 """
48
49 credentials = ['primary', 'admin']
50
51 @classmethod
52 def skip_checks(cls):
53 super(NetworkScenarioTest, cls).skip_checks()
54 if not CONF.service_available.neutron:
55 raise cls.skipException('Neutron not available')
56 if not utils.is_extension_enabled('bgpvpn', 'network'):
57 msg = "Bgpvpn extension not enabled."
58 raise cls.skipException(msg)
59
Slawek Kaplonski8dd49aa2019-04-16 14:47:07 +020060 def _check_remote_connectivity(self, source, dest, should_succeed=True,
61 nic=None):
62 """check ping server via source ssh connection
63
64 :param source: RemoteClient: an ssh connection from which to ping
65 :param dest: and IP to ping against
66 :param should_succeed: boolean should ping succeed or not
67 :param nic: specific network interface to ping from
68 :returns: boolean -- should_succeed == ping
69 :returns: ping is false if ping failed
70 """
71 def ping_remote():
72 try:
73 source.ping_host(dest, nic=nic)
74 except lib_exc.SSHExecCommandFailed:
75 LOG.warning('Failed to ping IP: %s via a ssh connection '
76 'from: %s.', dest, source.ssh_client.host)
77 return not should_succeed
78 return should_succeed
79
80 return test_utils.call_until_true(ping_remote,
81 CONF.validation.ping_timeout,
82 1)
83
Roman Popelka8235e3b2022-04-12 11:29:17 +020084 def create_loginable_secgroup_rule(self, security_group_rules_client=None,
85 secgroup=None,
86 security_groups_client=None):
Slawek Kaplonski8dd49aa2019-04-16 14:47:07 +020087 """Create loginable security group rule
88
89 This function will create:
90 1. egress and ingress tcp port 22 allow rule in order to allow ssh
91 access for ipv4.
92 2. egress and ingress tcp port 80 allow rule in order to allow http
93 access for ipv4.
94 3. egress and ingress ipv6 icmp allow rule, in order to allow icmpv6.
95 4. egress and ingress ipv4 icmp allow rule, in order to allow icmpv4.
96 """
97
98 if security_group_rules_client is None:
99 security_group_rules_client = self.security_group_rules_client
100 if security_groups_client is None:
101 security_groups_client = self.security_groups_client
102 rules = []
103 rulesets = [
104 dict(
105 # ssh
106 protocol='tcp',
107 port_range_min=22,
108 port_range_max=22,
109 ),
110 dict(
111 # http
112 protocol='tcp',
113 port_range_min=80,
114 port_range_max=80,
115 ),
116 dict(
117 # ping
118 protocol='icmp',
119 ),
120 dict(
121 # ipv6-icmp for ping6
122 protocol='icmp',
123 ethertype='IPv6',
124 )
125 ]
126 sec_group_rules_client = security_group_rules_client
127 for ruleset in rulesets:
128 for r_direction in ['ingress', 'egress']:
129 ruleset['direction'] = r_direction
130 try:
Roman Popelka46d4dda2022-04-12 11:36:39 +0200131 sg_rule = self.create_security_group_rule(
Slawek Kaplonski8dd49aa2019-04-16 14:47:07 +0200132 sec_group_rules_client=sec_group_rules_client,
133 secgroup=secgroup,
134 security_groups_client=security_groups_client,
135 **ruleset)
136 except lib_exc.Conflict as ex:
137 # if rule already exist - skip rule and continue
138 msg = 'Security group rule already exists'
139 if msg not in ex._error_string:
140 raise ex
141 else:
142 self.assertEqual(r_direction, sg_rule['direction'])
143 rules.append(sg_rule)
144
145 return rules
146
Slawek Kaplonski8dd49aa2019-04-16 14:47:07 +0200147 def _create_router(self, client=None, tenant_id=None,
148 namestart='router-smoke'):
149 if not client:
elajkat8c7c5e32023-05-22 15:05:36 +0200150 client = self.admin_routers_client
Slawek Kaplonski8dd49aa2019-04-16 14:47:07 +0200151 if not tenant_id:
Takashi Kajinamida451772023-03-22 00:19:39 +0900152 tenant_id = client.project_id
Slawek Kaplonski8dd49aa2019-04-16 14:47:07 +0200153 name = data_utils.rand_name(namestart)
154 result = client.create_router(name=name,
155 admin_state_up=True,
156 tenant_id=tenant_id)
157 router = result['router']
158 self.assertEqual(router['name'], name)
159 self.addCleanup(test_utils.call_and_ignore_notfound_exc,
160 client.delete_router,
161 router['id'])
162 return router