blob: e78d1d83fb523277150ef9a10fe398b6d7558f50 [file] [log] [blame]
Slawek Kaplonski8dd49aa2019-04-16 14:47:07 +02001# Copyright 2012 OpenStack Foundation
2# Copyright 2013 IBM Corp.
3# All Rights Reserved.
4#
5# Licensed under the Apache License, Version 2.0 (the "License"); you may
6# not use this file except in compliance with the License. You may obtain
7# a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14# License for the specific language governing permissions and limitations
15# under the License.
16
Slawek Kaplonski8dd49aa2019-04-16 14:47:07 +020017from oslo_log import log
Slawek Kaplonski8dd49aa2019-04-16 14:47:07 +020018
Slawek Kaplonski8dd49aa2019-04-16 14:47:07 +020019from tempest.common import utils
20from tempest.common.utils.linux import remote_client
Slawek Kaplonski8dd49aa2019-04-16 14:47:07 +020021from tempest import config
22from tempest.lib.common.utils import data_utils
23from tempest.lib.common.utils import test_utils
24from tempest.lib import exceptions as lib_exc
Roman Popelkafd509eb2022-04-07 15:10:08 +020025from tempest.scenario import manager
Slawek Kaplonski8dd49aa2019-04-16 14:47:07 +020026
27CONF = config.CONF
28
29LOG = log.getLogger(__name__)
30
31
class ScenarioTest(manager.NetworkScenarioTest):
    """Common base for the scenario tests, built on tempest's own clients."""

    credentials = ['primary']

    # ## Test functions library
    #
    # The create_[resource] helpers hand back only the response body; the
    # resp part is dropped because scenario tests never look at it.

    def get_remote_client(self, ip_address, username=None, private_key=None):
        """Open and validate an SSH client to a remote server.

        @param ip_address the server floating or fixed IP address to use
                          for ssh validation
        @param username name of the Linux account on the remote server;
                        CONF.validation.image_ssh_user when omitted
        @param private_key the SSH private key to use; when omitted in
                           keypair mode, self.keypair['private_key'] is used
        @return a RemoteClient object whose authentication was validated

        Any exception raised while validating the connection is logged,
        self.log_console_output() is called, and the exception is re-raised.
        """
        validation_conf = CONF.validation
        if username is None:
            username = validation_conf.image_ssh_user

        # auth_method selects between key-based login ('keypair') and
        # username/password login (any other value). In password mode a
        # caller-supplied private key is deliberately ignored.
        if validation_conf.auth_method != 'keypair':
            password = validation_conf.image_ssh_password
            ssh_key = None
        elif private_key is None:
            password = None
            ssh_key = self.keypair['private_key']
        else:
            password = None
            ssh_key = private_key

        linux_client = remote_client.RemoteClient(
            ip_address, username, pkey=ssh_key, password=password)
        try:
            linux_client.validate_authentication()
        except Exception as e:
            details = {'ip': ip_address, 'error': e}
            message = ('Initializing SSH connection to %(ip)s failed. '
                       'Error: %(error)s' % details)
            # Prefix the message with the calling test, when it can be found.
            caller = test_utils.find_test_caller()
            if caller:
                message = '(%s) %s' % (caller, message)
            LOG.exception(message)
            # Inherited helper (not defined in this file); called before
            # propagating the original error.
            self.log_console_output()
            raise

        return linux_client
80
Slawek Kaplonski8dd49aa2019-04-16 14:47:07 +020081
82class NetworkScenarioTest(ScenarioTest):
83 """Base class for network scenario tests.
84
    This class provides helpers for network scenario tests, using the neutron
    API. Helpers from ancestor which use the nova network API are overridden
    with the neutron API.

    This class also enforces using Neutron instead of novanetwork.
    Subclassed tests will be skipped if Neutron is not enabled or if the
    bgpvpn network extension is not enabled.
91
92 """
93
94 credentials = ['primary', 'admin']
95
96 @classmethod
97 def skip_checks(cls):
98 super(NetworkScenarioTest, cls).skip_checks()
99 if not CONF.service_available.neutron:
100 raise cls.skipException('Neutron not available')
101 if not utils.is_extension_enabled('bgpvpn', 'network'):
102 msg = "Bgpvpn extension not enabled."
103 raise cls.skipException(msg)
104
Slawek Kaplonski8dd49aa2019-04-16 14:47:07 +0200105 def _check_remote_connectivity(self, source, dest, should_succeed=True,
106 nic=None):
107 """check ping server via source ssh connection
108
109 :param source: RemoteClient: an ssh connection from which to ping
110 :param dest: and IP to ping against
111 :param should_succeed: boolean should ping succeed or not
112 :param nic: specific network interface to ping from
113 :returns: boolean -- should_succeed == ping
114 :returns: ping is false if ping failed
115 """
116 def ping_remote():
117 try:
118 source.ping_host(dest, nic=nic)
119 except lib_exc.SSHExecCommandFailed:
120 LOG.warning('Failed to ping IP: %s via a ssh connection '
121 'from: %s.', dest, source.ssh_client.host)
122 return not should_succeed
123 return should_succeed
124
125 return test_utils.call_until_true(ping_remote,
126 CONF.validation.ping_timeout,
127 1)
128
Roman Popelka8235e3b2022-04-12 11:29:17 +0200129 def create_loginable_secgroup_rule(self, security_group_rules_client=None,
130 secgroup=None,
131 security_groups_client=None):
Slawek Kaplonski8dd49aa2019-04-16 14:47:07 +0200132 """Create loginable security group rule
133
134 This function will create:
135 1. egress and ingress tcp port 22 allow rule in order to allow ssh
136 access for ipv4.
137 2. egress and ingress tcp port 80 allow rule in order to allow http
138 access for ipv4.
139 3. egress and ingress ipv6 icmp allow rule, in order to allow icmpv6.
140 4. egress and ingress ipv4 icmp allow rule, in order to allow icmpv4.
141 """
142
143 if security_group_rules_client is None:
144 security_group_rules_client = self.security_group_rules_client
145 if security_groups_client is None:
146 security_groups_client = self.security_groups_client
147 rules = []
148 rulesets = [
149 dict(
150 # ssh
151 protocol='tcp',
152 port_range_min=22,
153 port_range_max=22,
154 ),
155 dict(
156 # http
157 protocol='tcp',
158 port_range_min=80,
159 port_range_max=80,
160 ),
161 dict(
162 # ping
163 protocol='icmp',
164 ),
165 dict(
166 # ipv6-icmp for ping6
167 protocol='icmp',
168 ethertype='IPv6',
169 )
170 ]
171 sec_group_rules_client = security_group_rules_client
172 for ruleset in rulesets:
173 for r_direction in ['ingress', 'egress']:
174 ruleset['direction'] = r_direction
175 try:
Roman Popelka46d4dda2022-04-12 11:36:39 +0200176 sg_rule = self.create_security_group_rule(
Slawek Kaplonski8dd49aa2019-04-16 14:47:07 +0200177 sec_group_rules_client=sec_group_rules_client,
178 secgroup=secgroup,
179 security_groups_client=security_groups_client,
180 **ruleset)
181 except lib_exc.Conflict as ex:
182 # if rule already exist - skip rule and continue
183 msg = 'Security group rule already exists'
184 if msg not in ex._error_string:
185 raise ex
186 else:
187 self.assertEqual(r_direction, sg_rule['direction'])
188 rules.append(sg_rule)
189
190 return rules
191
Slawek Kaplonski8dd49aa2019-04-16 14:47:07 +0200192 def _create_router(self, client=None, tenant_id=None,
193 namestart='router-smoke'):
194 if not client:
195 client = self.routers_client
196 if not tenant_id:
197 tenant_id = client.tenant_id
198 name = data_utils.rand_name(namestart)
199 result = client.create_router(name=name,
200 admin_state_up=True,
201 tenant_id=tenant_id)
202 router = result['router']
203 self.assertEqual(router['name'], name)
204 self.addCleanup(test_utils.call_and_ignore_notfound_exc,
205 client.delete_router,
206 router['id'])
207 return router