Merge "Add test to verify DHCP port IP address modification"
diff --git a/neutron_tempest_plugin/scenario/test_dhcp.py b/neutron_tempest_plugin/scenario/test_dhcp.py
index b95eaa2..d0545e2 100644
--- a/neutron_tempest_plugin/scenario/test_dhcp.py
+++ b/neutron_tempest_plugin/scenario/test_dhcp.py
@@ -11,14 +11,18 @@
 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 #    License for the specific language governing permissions and limitations
 #    under the License.
+import netaddr
+from neutron_lib import constants
 from oslo_log import log
 from paramiko import ssh_exception as ssh_exc
 from tempest.common import utils
 from tempest.lib.common.utils import data_utils
 from tempest.lib import decorators
 from tempest.lib import exceptions as lib_exc
+import testtools
 
 from neutron_tempest_plugin.common import ssh
+from neutron_tempest_plugin.common import utils as neutron_utils
 from neutron_tempest_plugin import config
 from neutron_tempest_plugin.scenario import base
 
@@ -92,3 +96,96 @@
             self._log_console_output([server])
             self._log_local_network_status()
             raise
+
+
+class DHCPPortUpdateTest(base.BaseTempestTestCase):
+
+    credentials = ['primary', 'admin']
+
+    @classmethod
+    def resource_setup(cls):
+        super(DHCPPortUpdateTest, cls).resource_setup()
+        cls.rand_name = data_utils.rand_name(
+            cls.__name__.rsplit('.', 1)[-1])
+        cls.network = cls.create_network(name=cls.rand_name)
+        cls.router = cls.create_router_by_client()
+        cls.keypair = cls.create_keypair(name=cls.rand_name)
+        cls.security_group = cls.create_security_group(name=cls.rand_name)
+        cls.create_loginable_secgroup_rule(cls.security_group['id'])
+        cls.create_pingable_secgroup_rule(cls.security_group['id'])
+
+    @testtools.skipUnless(
+        CONF.neutron_plugin_options.firewall_driver == 'ovn',
+        "OVN driver is required to run this test - "
+        "LP#1942794 solution only applied to OVN")
+    @decorators.idempotent_id('8171cc68-9dbb-46ca-b065-17b5b2e26094')
+    def test_modify_dhcp_port_ip_address(self):
+        """Test Scenario
+
+        1) Create a network and a subnet with DHCP enabled
+        2) Modify the default IP address from the subnet DHCP port
+        3) Create a server in this network and check ssh connectivity
+
+        For the step 3), the server needs to obtain ssh keys from the metadata
+
+        Related bug: LP#1942794
+        """
+        # create subnet (dhcp is enabled by default)
+        subnet = self.create_subnet(network=self.network, name=self.rand_name)
+
+        def _get_dhcp_ports():
+            # in some cases, like ML2/OVS, the subnet port associated to DHCP
+            # is created with device_owner='network:dhcp'
+            dhcp_ports = self.client.list_ports(
+                network_id=self.network['id'],
+                device_owner=constants.DEVICE_OWNER_DHCP)['ports']
+            # in other cases, like ML2/OVN, the subnet port used for metadata
+            # is created with device_owner='network:distributed'
+            distributed_ports = self.client.list_ports(
+                network_id=self.network['id'],
+                device_owner=constants.DEVICE_OWNER_DISTRIBUTED)['ports']
+            self.dhcp_ports = dhcp_ports + distributed_ports
+            self.assertLessEqual(
+                len(self.dhcp_ports), 1, msg='Only one port was expected')
+            return len(self.dhcp_ports) == 1
+
+        # obtain the dhcp port
+        # in some cases this port is not created together with the subnet, but
+        # immediately after it, so some delay may be needed and that is the
+        # reason why a waiter function is used here
+        self.dhcp_ports = []
+        neutron_utils.wait_until_true(
+            lambda: _get_dhcp_ports(),
+            timeout=10)
+        dhcp_port = self.dhcp_ports[0]
+
+        # modify DHCP port IP address
+        old_dhcp_port_ip = netaddr.IPAddress(
+            dhcp_port['fixed_ips'][0]['ip_address'])
+        if str(old_dhcp_port_ip) != subnet['allocation_pools'][0]['end']:
+            new_dhcp_port_ip = str(old_dhcp_port_ip + 1)
+        else:
+            new_dhcp_port_ip = str(old_dhcp_port_ip - 1)
+        self.update_port(port=dhcp_port,
+                         fixed_ips=[{'subnet_id': subnet['id'],
+                                     'ip_address': new_dhcp_port_ip}])
+
+        # create server
+        server = self.create_server(
+            flavor_ref=CONF.compute.flavor_ref,
+            image_ref=CONF.compute.image_ref,
+            key_name=self.keypair['name'],
+            security_groups=[{'name': self.security_group['name']}],
+            networks=[{'uuid': self.network['id']}])
+
+        # attach fip to the server
+        self.create_router_interface(self.router['id'], subnet['id'])
+        server_port = self.client.list_ports(
+            network_id=self.network['id'],
+            device_id=server['server']['id'])['ports'][0]
+        fip = self.create_floatingip(port_id=server_port['id'])
+
+        # check connectivity
+        self.check_connectivity(fip['floating_ip_address'],
+                                CONF.validation.image_ssh_user,
+                                self.keypair['private_key'])