Add a scenario test for internal dns_name
Change-Id: Ie46ee889d75b64292c1d9268624c4600a186ee41
diff --git a/neutron_tempest_plugin/scenario/base.py b/neutron_tempest_plugin/scenario/base.py
index b76a81a..8815945 100644
--- a/neutron_tempest_plugin/scenario/base.py
+++ b/neutron_tempest_plugin/scenario/base.py
@@ -15,6 +15,8 @@
import subprocess
import netaddr
+from neutron_lib.api import validators
+from neutron_lib import constants as neutron_lib_constants
from oslo_log import log
from tempest.common.utils import net_utils
from tempest.common import waiters
@@ -190,7 +192,8 @@
self.floating_ips.append(fip)
return fip
- def setup_network_and_server(self, router=None, **kwargs):
+ def setup_network_and_server(
+ self, router=None, server_name=None, **kwargs):
"""Create network resources and a server.
Creating a network, subnet, router, keypair, security group
@@ -212,12 +215,18 @@
self.keypair = self.create_keypair()
self.create_loginable_secgroup_rule(
secgroup_id=secgroup['security_group']['id'])
- self.server = self.create_server(
- flavor_ref=CONF.compute.flavor_ref,
- image_ref=CONF.compute.image_ref,
- key_name=self.keypair['name'],
- networks=[{'uuid': self.network['id']}],
- security_groups=[{'name': secgroup['security_group']['name']}])
+
+ server_kwargs = {
+ 'flavor_ref': CONF.compute.flavor_ref,
+ 'image_ref': CONF.compute.image_ref,
+ 'key_name': self.keypair['name'],
+ 'networks': [{'uuid': self.network['id']}],
+ 'security_groups': [{'name': secgroup['security_group']['name']}],
+ }
+ if server_name is not None:
+ server_kwargs['name'] = server_name
+
+ self.server = self.create_server(**server_kwargs)
self.wait_for_server_active(self.server['server'])
self.port = self.client.list_ports(network_id=self.network['id'],
device_id=self.server[
@@ -252,7 +261,8 @@
"for the console log", server['id'])
def _check_remote_connectivity(self, source, dest, should_succeed=True,
- nic=None, mtu=None, fragmentation=True):
+ nic=None, mtu=None, fragmentation=True,
+ timeout=None):
"""check ping server via source ssh connection
:param source: RemoteClient: an ssh connection from which to ping
@@ -267,15 +277,21 @@
def ping_host(source, host, count=CONF.validation.ping_count,
size=CONF.validation.ping_size, nic=None, mtu=None,
fragmentation=True):
- addr = netaddr.IPAddress(host)
- cmd = 'ping6' if addr.version == 6 else 'ping'
+ IP_VERSION_4 = neutron_lib_constants.IP_VERSION_4
+ IP_VERSION_6 = neutron_lib_constants.IP_VERSION_6
+
+ # Use 'ping6' for IPv6 addresses, 'ping' for IPv4 and hostnames
+ ip_version = (
+ IP_VERSION_6 if netaddr.valid_ipv6(host) else IP_VERSION_4)
+ cmd = (
+ 'ping6' if ip_version == IP_VERSION_6 else 'ping')
if nic:
cmd = 'sudo {cmd} -I {nic}'.format(cmd=cmd, nic=nic)
if mtu:
if not fragmentation:
cmd += ' -M do'
size = str(net_utils.get_ping_payload_size(
- mtu=mtu, ip_version=addr.version))
+ mtu=mtu, ip_version=ip_version))
cmd += ' -c{0} -w{0} -s{1} {2}'.format(count, size, host)
return source.exec_command(cmd)
@@ -289,22 +305,24 @@
'from: %s.', dest, source.host)
return not should_succeed
LOG.debug('ping result: %s', result)
- # Assert that the return traffic was from the correct
- # source address.
- from_source = 'from %s' % dest
- self.assertIn(from_source, result)
+
+ if validators.validate_ip_address(dest) is None:
+ # Assert that the return traffic was from the correct
+ # source address.
+ from_source = 'from %s' % dest
+ self.assertIn(from_source, result)
return should_succeed
- return test_utils.call_until_true(ping_remote,
- CONF.validation.ping_timeout,
- 1)
+ return test_utils.call_until_true(
+ ping_remote, timeout or CONF.validation.ping_timeout, 1)
def check_remote_connectivity(self, source, dest, should_succeed=True,
nic=None, mtu=None, fragmentation=True,
- servers=None):
+ servers=None, timeout=None):
try:
self.assertTrue(self._check_remote_connectivity(
- source, dest, should_succeed, nic, mtu, fragmentation))
+ source, dest, should_succeed, nic, mtu, fragmentation,
+ timeout=timeout))
except lib_exc.SSHTimeout as ssh_e:
LOG.debug(ssh_e)
self._log_console_output(servers)