Test updating FIP with a different port
The test creates two servers, first one with a floating ip. Then the
floating ip is updated and associated with the port from the second
server and connectivity is checked.
Related-bug: #1835029
Change-Id: I7f48a67af172911d52f62322d2d9fd0222c4c0a1
diff --git a/neutron_tempest_plugin/scenario/base.py b/neutron_tempest_plugin/scenario/base.py
index f24c82b..d87a365 100644
--- a/neutron_tempest_plugin/scenario/base.py
+++ b/neutron_tempest_plugin/scenario/base.py
@@ -378,3 +378,30 @@
"""
self.wait_for_server_status(
server, constants.SERVER_STATUS_ACTIVE, client)
+
+ def check_servers_hostnames(self, servers, log_errors=True):
+ """Compare hostnames of given servers with their names."""
+ try:
+ for server in servers:
+ kwargs = {}
+ try:
+ kwargs['port'] = server['port_forwarding']['external_port']
+ except KeyError:
+ pass
+ ssh_client = ssh.Client(
+ self.fip['floating_ip_address'],
+ CONF.validation.image_ssh_user,
+ pkey=self.keypair['private_key'],
+ **kwargs)
+ self.assertIn(server['name'],
+ ssh_client.exec_command('hostname'))
+ except lib_exc.SSHTimeout as ssh_e:
+ LOG.debug(ssh_e)
+ if log_errors:
+ self._log_console_output(servers)
+ raise
+ except AssertionError as assert_e:
+ LOG.debug(assert_e)
+ if log_errors:
+ self._log_console_output(servers)
+ raise
diff --git a/neutron_tempest_plugin/scenario/test_floatingip.py b/neutron_tempest_plugin/scenario/test_floatingip.py
index 15a2837..39aa09d 100644
--- a/neutron_tempest_plugin/scenario/test_floatingip.py
+++ b/neutron_tempest_plugin/scenario/test_floatingip.py
@@ -381,3 +381,58 @@
port=self.NC_PORT),
timeout=120,
sleep=1)
+
+
+class TestFloatingIPUpdate(FloatingIpTestCasesMixin,
+ base.BaseTempestTestCase):
+
+ same_network = None
+
+ @decorators.idempotent_id('1bdd849b-03dd-4b8f-994f-457cf8a36f93')
+ def test_floating_ip_update(self):
+ """Test updating FIP with another port.
+
+ The test creates two servers and attaches floating ip to first server.
+ Then it checks server is accesible using the FIP. FIP is then
+ associated with the second server and connectivity is checked again.
+ """
+ ports = [self.create_port(
+ self.network, security_groups=[self.secgroup['id']])
+ for i in range(2)]
+
+ servers = []
+ for port in ports:
+ name = data_utils.rand_name("server-%s" % port['id'][:8])
+ server = self.create_server(
+ name=name,
+ flavor_ref=CONF.compute.flavor_ref,
+ key_name=self.keypair['name'],
+ image_ref=CONF.compute.image_ref,
+ networks=[{'port': port['id']}])['server']
+ server['name'] = name
+ servers.append(server)
+ for server in servers:
+ self.wait_for_server_active(server)
+
+ self.fip = self.create_floatingip(port=ports[0])
+ self.check_connectivity(self.fip['floating_ip_address'],
+ CONF.validation.image_ssh_user,
+ self.keypair['private_key'])
+ self.client.update_floatingip(self.fip['id'], port_id=ports[1]['id'])
+
+ def _wait_for_fip_associated():
+ try:
+ self.check_servers_hostnames(servers[-1:], log_errors=False)
+ except (AssertionError, exceptions.SSHTimeout):
+ return False
+ return True
+
+ # The FIP is now associated with the port of the second server.
+ try:
+ common_utils.wait_until_true(_wait_for_fip_associated,
+ timeout=15, sleep=3)
+ except common_utils.WaitTimeout:
+ self._log_console_output(servers[-1:])
+ self.fail(
+ "Server %s is not accessible via its floating ip %s" % (
+ servers[-1]['id'], self.fip['id']))
diff --git a/neutron_tempest_plugin/scenario/test_port_forwardings.py b/neutron_tempest_plugin/scenario/test_port_forwardings.py
index 1d1fe94..3715fad 100644
--- a/neutron_tempest_plugin/scenario/test_port_forwardings.py
+++ b/neutron_tempest_plugin/scenario/test_port_forwardings.py
@@ -16,9 +16,7 @@
from oslo_log import log
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
-from tempest.lib import exceptions as lib_exc
-from neutron_tempest_plugin.common import ssh
from neutron_tempest_plugin import config
from neutron_tempest_plugin.scenario import base
@@ -70,19 +68,4 @@
protocol="tcp")
servers.append(server)
- try:
- for server in servers:
- ssh_client = ssh.Client(
- self.fip['floating_ip_address'],
- CONF.validation.image_ssh_user,
- pkey=self.keypair['private_key'],
- port=server['port_forwarding']['external_port'])
- self.assertIn(server['name'],
- ssh_client.exec_command('hostname'))
- except lib_exc.SSHTimeout as ssh_e:
- LOG.debug(ssh_e)
- self._log_console_output(servers)
- raise
- except AssertionError:
- self._log_console_output(servers)
- raise
+ self.check_servers_hostnames(servers)