Add test to verify DHCP port IP address modification

When the IP address of a subnet DHCP port is changed, the route to the
metadata provisioned for that port should be modified accordingly.
If this route is not correctly updated, the server will not be able to
obtain metadata information and, hence, will not get the ssh authorized
keys.

This new test only applies to OVN.

Related-Bug: #1942794

Change-Id: I76e75db469e2518ed90561430aa9c8c68846dae5
diff --git a/neutron_tempest_plugin/scenario/test_dhcp.py b/neutron_tempest_plugin/scenario/test_dhcp.py
index b95eaa2..d0545e2 100644
--- a/neutron_tempest_plugin/scenario/test_dhcp.py
+++ b/neutron_tempest_plugin/scenario/test_dhcp.py
@@ -11,14 +11,18 @@
 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 #    License for the specific language governing permissions and limitations
 #    under the License.
+import netaddr
+from neutron_lib import constants
 from oslo_log import log
 from paramiko import ssh_exception as ssh_exc
 from tempest.common import utils
 from tempest.lib.common.utils import data_utils
 from tempest.lib import decorators
 from tempest.lib import exceptions as lib_exc
+import testtools
 from neutron_tempest_plugin.common import ssh
+from neutron_tempest_plugin.common import utils as neutron_utils
 from neutron_tempest_plugin import config
 from neutron_tempest_plugin.scenario import base
@@ -92,3 +96,96 @@
+class DHCPPortUpdateTest(base.BaseTempestTestCase):
+    """Scenario test for changing the IP address of a subnet DHCP port.
+
+    Checks that a server booted after the change is still reachable over
+    ssh with the generated keypair (see LP#1942794).
+    """
+
+    credentials = ['primary', 'admin']
+
+    @classmethod
+    def resource_setup(cls):
+        """Create the resources shared by the tests of this class.
+
+        A network, a router, a keypair and a security group (with the
+        loginable and pingable rules) are created.
+        """
+        super(DHCPPortUpdateTest, cls).resource_setup()
+        cls.rand_name = data_utils.rand_name(
+            cls.__name__.rsplit('.', 1)[-1])
+        cls.network = cls.create_network(name=cls.rand_name)
+        cls.router = cls.create_router_by_client()
+        cls.keypair = cls.create_keypair(name=cls.rand_name)
+        cls.security_group = cls.create_security_group(name=cls.rand_name)
+        cls.create_loginable_secgroup_rule(cls.security_group['id'])
+        cls.create_pingable_secgroup_rule(cls.security_group['id'])
+
+    @testtools.skipUnless(
+        CONF.neutron_plugin_options.firewall_driver == 'ovn',
+        "OVN driver is required to run this test - "
+        "LP#1942794 solution only applied to OVN")
+    @decorators.idempotent_id('8171cc68-9dbb-46ca-b065-17b5b2e26094')
+    def test_modify_dhcp_port_ip_address(self):
+        """Test Scenario
+
+        1) Create a network and a subnet with DHCP enabled
+        2) Modify the default IP address from the subnet DHCP port
+        3) Create a server in this network and check ssh connectivity
+
+        For the step 3), the server needs to obtain ssh keys from the metadata
+
+        Related bug: LP#1942794
+        """
+        # create subnet (dhcp is enabled by default)
+        subnet = self.create_subnet(self.network, name=self.rand_name)
+
+        def _get_dhcp_ports():
+            """Refresh self.dhcp_ports; return True once one port exists.
+
+            Fails the test right away if more than one port is found.
+            """
+            # in some cases, like ML2/OVS, the subnet port associated to DHCP
+            # is created with device_owner='network:dhcp'
+            dhcp_ports = self.client.list_ports(
+                network_id=self.network['id'],
+                device_owner=constants.DEVICE_OWNER_DHCP)['ports']
+            # in other cases, like ML2/OVN, the subnet port used for metadata
+            # is created with device_owner='network:distributed'
+            distributed_ports = self.client.list_ports(
+                network_id=self.network['id'],
+                device_owner=constants.DEVICE_OWNER_DISTRIBUTED)['ports']
+            self.dhcp_ports = dhcp_ports + distributed_ports
+            self.assertLessEqual(
+                len(self.dhcp_ports), 1, msg='Only one port was expected')
+            return len(self.dhcp_ports) == 1
+
+        # obtain the dhcp port
+        # in some cases this port is not created together with the subnet, but
+        # immediately after it, so some delay may be needed and that is the
+        # reason why a waiter function is used here
+        self.dhcp_ports = []
+        neutron_utils.wait_until_true(_get_dhcp_ports, timeout=10)
+        dhcp_port = self.dhcp_ports[0]
+
+        # modify DHCP port IP address: move one address up, unless the port
+        # already uses the last address of the first allocation pool, in
+        # which case move one address down
+        old_dhcp_port_ip = netaddr.IPAddress(
+            dhcp_port['fixed_ips'][0]['ip_address'])
+        if str(old_dhcp_port_ip) != subnet['allocation_pools'][0]['end']:
+            new_dhcp_port_ip = str(old_dhcp_port_ip + 1)
+        else:
+            new_dhcp_port_ip = str(old_dhcp_port_ip - 1)
+        self.update_port(port=dhcp_port,
+                         fixed_ips=[{'subnet_id': subnet['id'],
+                                     'ip_address': new_dhcp_port_ip}])
+
+        # create server
+        server = self.create_server(
+            flavor_ref=CONF.compute.flavor_ref,
+            image_ref=CONF.compute.image_ref,
+            key_name=self.keypair['name'],
+            security_groups=[{'name': self.security_group['name']}],
+            networks=[{'uuid': self.network['id']}])
+
+        # attach fip to the server
+        self.create_router_interface(self.router['id'], subnet['id'])
+        server_port = self.client.list_ports(
+            network_id=self.network['id'],
+            device_id=server['server']['id'])['ports'][0]
+        fip = self.create_floatingip(port_id=server_port['id'])
+
+        # check connectivity
+        self.check_connectivity(fip['floating_ip_address'],
+                                CONF.validation.image_ssh_user,
+                                self.keypair['private_key'])