Add support MTU tests in the same network type

Neutron now supports setting MTU when creating and updating
a network, so in tempest we can use same network type for
testing different mtu size.

Change-Id: I328ed3f196498ec597cb06d3684c94719666623f
Closes-Bug: #1742197
diff --git a/neutron_tempest_plugin/scenario/test_mtu.py b/neutron_tempest_plugin/scenario/test_mtu.py
index 9cbb4d8..932c645 100644
--- a/neutron_tempest_plugin/scenario/test_mtu.py
+++ b/neutron_tempest_plugin/scenario/test_mtu.py
@@ -17,6 +17,7 @@
 from neutron_lib.api.definitions import provider_net
 from tempest.common import utils
 from tempest.common import waiters
+from tempest.lib.common.utils import data_utils
 from tempest.lib import decorators
 
 from neutron_tempest_plugin.common import ssh
@@ -28,21 +29,8 @@
 
 
 class NetworkMtuBaseTest(base.BaseTempestTestCase):
-    credentials = ['primary', 'admin']
-    servers = []
-    networks = []
 
     @classmethod
-    def skip_checks(cls):
-        super(NetworkMtuBaseTest, cls).skip_checks()
-        if ("vxlan" not in
-                config.CONF.neutron_plugin_options.available_type_drivers
-            or "gre" not in
-                config.CONF.neutron_plugin_options.available_type_drivers):
-            raise cls.skipException("GRE or VXLAN type_driver is not enabled")
-
-    @classmethod
-    @utils.requires_ext(extension=provider_net.ALIAS, service="network")
     def resource_setup(cls):
         super(NetworkMtuBaseTest, cls).resource_setup()
         # setup basic topology for servers we can log into it
@@ -56,6 +44,42 @@
         cls.create_pingable_secgroup_rule(
             secgroup_id=cls.secgroup['security_group']['id'])
 
+    def create_pingable_vm(self, net, keypair, secgroup):
+        server = self.create_server(
+            flavor_ref=CONF.compute.flavor_ref,
+            image_ref=CONF.compute.image_ref,
+            key_name=keypair['name'],
+            networks=[{'uuid': net['id']}],
+            security_groups=[{'name': secgroup[
+                'security_group']['name']}])
+        waiters.wait_for_server_status(
+            self.os_primary.servers_client, server['server']['id'],
+            constants.SERVER_STATUS_ACTIVE)
+        port = self.client.list_ports(
+            network_id=net['id'], device_id=server['server']['id'])['ports'][0]
+        fip = self.create_and_associate_floatingip(port['id'])
+        return server, fip
+
+
+class NetworkMtuTest(NetworkMtuBaseTest):
+    credentials = ['primary', 'admin']
+    servers = []
+    networks = []
+
+    @classmethod
+    def skip_checks(cls):
+        super(NetworkMtuTest, cls).skip_checks()
+        if ("vxlan" not in
+                config.CONF.neutron_plugin_options.available_type_drivers
+            or "gre" not in
+                config.CONF.neutron_plugin_options.available_type_drivers):
+            raise cls.skipException("GRE or VXLAN type_driver is not enabled")
+
+    @classmethod
+    @utils.requires_ext(extension=provider_net.ALIAS, service="network")
+    def resource_setup(cls):
+        super(NetworkMtuTest, cls).resource_setup()
+
     def _create_setup(self):
         self.admin_client = self.os_admin.network_client
         net_kwargs = {'tenant_id': self.client.tenant_id}
@@ -75,12 +99,14 @@
         # check that MTUs are different for 2 networks
         self.assertNotEqual(self.networks[0]['mtu'], self.networks[1]['mtu'])
         self.networks.sort(key=lambda net: net['mtu'])
-        server1, fip1 = self.create_pingable_vm(self.networks[0])
+        server1, fip1 = self.create_pingable_vm(self.networks[0],
+            self.keypair, self.secgroup)
         server_ssh_client1 = ssh.Client(
             self.floating_ips[0]['floating_ip_address'],
             CONF.validation.image_ssh_user,
             pkey=self.keypair['private_key'])
-        server2, fip2 = self.create_pingable_vm(self.networks[1])
+        server2, fip2 = self.create_pingable_vm(self.networks[1],
+            self.keypair, self.secgroup)
         server_ssh_client2 = ssh.Client(
             self.floating_ips[0]['floating_ip_address'],
             CONF.validation.image_ssh_user,
@@ -91,22 +117,6 @@
                                     self.keypair['private_key'])
         return server_ssh_client1, fip1, server_ssh_client2, fip2
 
-    def create_pingable_vm(self, net):
-        server = self.create_server(
-            flavor_ref=CONF.compute.flavor_ref,
-            image_ref=CONF.compute.image_ref,
-            key_name=self.keypair['name'],
-            networks=[{'uuid': net['id']}],
-            security_groups=[{'name': self.secgroup[
-                'security_group']['name']}])
-        waiters.wait_for_server_status(
-            self.os_primary.servers_client, server['server']['id'],
-            constants.SERVER_STATUS_ACTIVE)
-        port = self.client.list_ports(
-            network_id=net['id'], device_id=server['server']['id'])['ports'][0]
-        fip = self.create_and_associate_floatingip(port['id'])
-        return server, fip
-
     @decorators.idempotent_id('3d73ec1a-2ec6-45a9-b0f8-04a273d9d344')
     def test_connectivity_min_max_mtu(self):
         server_ssh_client, _, _, fip2 = self._create_setup()
@@ -132,3 +142,93 @@
         self.check_remote_connectivity(
             server_ssh_client, fip2['fixed_ip_address'], should_succeed=False,
             mtu=self.networks[1]['mtu'], fragmentation=False)
+
+
+class NetworkWritableMtuTest(NetworkMtuBaseTest):
+    credentials = ['primary', 'admin']
+    servers = []
+    networks = []
+
+    @classmethod
+    def skip_checks(cls):
+        super(NetworkWritableMtuTest, cls).skip_checks()
+        if ("vxlan" not in
+            config.CONF.neutron_plugin_options.available_type_drivers):
+            raise cls.skipException("VXLAN type_driver is not enabled")
+
+    @classmethod
+    @utils.requires_ext(extension="net-mtu-writable", service="network")
+    def resource_setup(cls):
+        super(NetworkWritableMtuTest, cls).resource_setup()
+
+    def _create_setup(self):
+        self.admin_client = self.os_admin.network_client
+        net_kwargs = {'tenant_id': self.client.tenant_id,
+                      'provider:network_type': 'vxlan'}
+        for sub in ('10.100.0.0/16', '10.200.0.0/16'):
+            net_kwargs['name'] = data_utils.rand_name('net')
+            network = self.admin_client.create_network(**net_kwargs)[
+                'network']
+            self.networks.append(network)
+            self.addCleanup(self.admin_client.delete_network, network['id'])
+            cidr = netaddr.IPNetwork(sub)
+            subnet = self.create_subnet(network, cidr=cidr)
+            self.create_router_interface(self.router['id'], subnet['id'])
+            self.addCleanup(self.client.remove_router_interface_with_subnet_id,
+                            self.router['id'], subnet['id'])
+
+        # Update network mtu.
+        net_mtu = self.admin_client.show_network(
+            self.networks[0]['id'])['network']['mtu']
+        self.admin_client.update_network(self.networks[0]['id'],
+            mtu=(net_mtu - 1))
+        self.networks[0]['mtu'] = (
+            self.admin_client.show_network(
+                self.networks[0]['id'])['network']['mtu'])
+
+        # check that MTUs are different for 2 networks
+        self.assertNotEqual(self.networks[0]['mtu'], self.networks[1]['mtu'])
+        self.networks.sort(key=lambda net: net['mtu'])
+        server1, fip1 = self.create_pingable_vm(self.networks[0],
+            self.keypair, self.secgroup)
+        server_ssh_client1 = ssh.Client(
+            self.floating_ips[0]['floating_ip_address'],
+            CONF.validation.image_ssh_user,
+            pkey=self.keypair['private_key'])
+        server2, fip2 = self.create_pingable_vm(self.networks[1],
+            self.keypair, self.secgroup)
+        server_ssh_client2 = ssh.Client(
+            self.floating_ips[0]['floating_ip_address'],
+            CONF.validation.image_ssh_user,
+            pkey=self.keypair['private_key'])
+        for fip in (fip1, fip2):
+            self.check_connectivity(fip['floating_ip_address'],
+                                    CONF.validation.image_ssh_user,
+                                    self.keypair['private_key'])
+        return server_ssh_client1, fip1, server_ssh_client2, fip2
+
+    @decorators.idempotent_id('bc470200-d8f4-4f07-b294-1b4cbaaa35b9')
+    def test_connectivity_min_max_mtu(self):
+        server_ssh_client, _, _, fip2 = self._create_setup()
+        # ping with min mtu of 2 networks succeeds even when
+        # fragmentation is disabled
+        self.check_remote_connectivity(
+            server_ssh_client, fip2['fixed_ip_address'],
+            mtu=self.networks[0]['mtu'], fragmentation=False)
+
+        # ping with the size above min mtu of 2 networks
+        # fails when fragmentation is disabled
+        self.check_remote_connectivity(
+            server_ssh_client, fip2['fixed_ip_address'], should_succeed=False,
+            mtu=self.networks[0]['mtu'] + 2, fragmentation=False)
+
+        # ping with max mtu of 2 networks succeeds when
+        # fragmentation is enabled
+        self.check_remote_connectivity(
+            server_ssh_client, fip2['fixed_ip_address'],
+            mtu=self.networks[1]['mtu'])
+
+        # ping with max mtu of 2 networks fails when fragmentation is disabled
+        self.check_remote_connectivity(
+            server_ssh_client, fip2['fixed_ip_address'], should_succeed=False,
+            mtu=self.networks[1]['mtu'], fragmentation=False)