Merge "Add test: Editing and deleting port forwarding TCP rule"
diff --git a/neutron_tempest_plugin/scenario/base.py b/neutron_tempest_plugin/scenario/base.py
index 35e5c31..b441209 100644
--- a/neutron_tempest_plugin/scenario/base.py
+++ b/neutron_tempest_plugin/scenario/base.py
@@ -446,11 +446,13 @@
self.wait_for_server_status(
server, constants.SERVER_STATUS_ACTIVE, client)
- def check_servers_hostnames(self, servers, log_errors=True):
+ def check_servers_hostnames(self, servers, timeout=None, log_errors=True):
"""Compare hostnames of given servers with their names."""
try:
for server in servers:
kwargs = {}
+ if timeout:
+ kwargs['timeout'] = timeout
try:
kwargs['port'] = (
server['port_forwarding_tcp']['external_port'])
diff --git a/neutron_tempest_plugin/scenario/test_port_forwardings.py b/neutron_tempest_plugin/scenario/test_port_forwardings.py
index ab04050..da1db1b 100644
--- a/neutron_tempest_plugin/scenario/test_port_forwardings.py
+++ b/neutron_tempest_plugin/scenario/test_port_forwardings.py
@@ -17,6 +17,7 @@
from oslo_log import log
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
+from tempest.lib import exceptions as lib_exc
from neutron_tempest_plugin.common import ssh
from neutron_tempest_plugin.common import utils
@@ -45,9 +46,9 @@
cls.create_loginable_secgroup_rule(secgroup_id=cls.secgroup['id'])
cls.keypair = cls.create_keypair()
- def _prepare_resources(self, num_servers, internal_tcp_port, protocol):
+ def _prepare_resources(self, num_servers, internal_tcp_port, protocol,
+ external_port_base=1025):
servers = []
- external_port_base = 1025
for i in range(1, num_servers + 1):
internal_udp_port = internal_tcp_port + 10
external_tcp_port = external_port_base + i
@@ -121,3 +122,65 @@
self.check_servers_hostnames(servers)
# And now test UDP port forwarding using nc
self._test_udp_port_forwarding(servers)
+
+ @decorators.idempotent_id('aa19d46c-a4a6-11ea-bb37-0242ac130002')
+ def test_port_forwarding_editing_and_deleting_tcp_rule(self):
+ server = self._prepare_resources(
+ num_servers=1, internal_tcp_port=22,
+ protocol=constants.PROTO_NAME_TCP,
+ external_port_base=1035)
+ fip_id = server[0]['port_forwarding_tcp']['floatingip_id']
+ pf_id = server[0]['port_forwarding_tcp']['id']
+
+ # Check connectivity with the original parameters
+ self.check_servers_hostnames(server)
+
+ # Use a short timeout when verifying that connections do not
+ # succeed. The default would be 196 seconds, which is overkill.
+ test_ssh_connect_timeout = 6
+
+ # Update the external port and check connectivity with the original
+ # parameters: server[0]['port_forwarding_tcp']['external_port'] should
+ # not answer at this point.
+
+ def fip_pf_connectivity():
+ try:
+ self.check_servers_hostnames(
+ server, timeout=test_ssh_connect_timeout)
+ return True
+ except (AssertionError, lib_exc.SSHTimeout):
+ return False
+
+ def no_fip_pf_connectivity():
+ return not fip_pf_connectivity()
+
+ self.client.update_port_forwarding(fip_id, pf_id, external_port=3333)
+ utils.wait_until_true(
+ no_fip_pf_connectivity,
+ exception=RuntimeError(
+ "Connection to the server {!r} through "
+ "port {!r} is still possible.".format(
+ server[0]['id'],
+ server[0]['port_forwarding_tcp']['external_port'])))
+
+ # Check connectivity with the new parameters
+ server[0]['port_forwarding_tcp']['external_port'] = 3333
+ utils.wait_until_true(
+ fip_pf_connectivity,
+ exception=RuntimeError(
+ "Connection to the server {!r} through "
+ "port {!r} is not possible.".format(
+ server[0]['id'],
+ server[0]['port_forwarding_tcp']['external_port'])))
+
+ # Remove the port forwarding and ensure the connection stops working.
+ self.client.delete_port_forwarding(fip_id, pf_id)
+ self.assertRaises(lib_exc.NotFound, self.client.get_port_forwarding,
+ fip_id, pf_id)
+ utils.wait_until_true(
+ no_fip_pf_connectivity,
+ exception=RuntimeError(
+ "Connection to the server {!r} through "
+ "port {!r} is still possible.".format(
+ server[0]['id'],
+ server[0]['port_forwarding_tcp']['external_port'])))