blob: 9cbb4d8989d6473c6022a0dd207291fa494903c7 [file] [log] [blame]
# Copyright 2016 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
15import netaddr
16
17from neutron_lib.api.definitions import provider_net
18from tempest.common import utils
19from tempest.common import waiters
20from tempest.lib import decorators
21
22from neutron_tempest_plugin.common import ssh
23from neutron_tempest_plugin import config
24from neutron_tempest_plugin.scenario import base
25from neutron_tempest_plugin.scenario import constants
26
27CONF = config.CONF
28
29
class NetworkMtuBaseTest(base.BaseTempestTestCase):
    """Scenario test for connectivity between networks with different MTUs.

    A VXLAN and a GRE provider network are attached to a common router, one
    server is booted on each, and pings of several sizes are sent from the
    server on the lower-MTU network to the fixed IP of the other server.
    """

    credentials = ['primary', 'admin']
    # NOTE(review): these lists live on the class, so appends made through
    # ``self`` are visible to every instance unless a base class rebinds
    # them during setup -- confirm against the base test case.
    servers = []
    networks = []

    @classmethod
    def skip_checks(cls):
        """Skip the class unless both the VXLAN and GRE drivers are listed.

        Raises:
            cls.skipException: if "vxlan" or "gre" is missing from
                ``neutron_plugin_options.available_type_drivers``.
        """
        super(NetworkMtuBaseTest, cls).skip_checks()
        type_drivers = (
            config.CONF.neutron_plugin_options.available_type_drivers)
        # The test needs one network of each type, so either one missing
        # is enough to skip.
        if "vxlan" not in type_drivers or "gre" not in type_drivers:
            raise cls.skipException("GRE or VXLAN type_driver is not enabled")

    @classmethod
    @utils.requires_ext(extension=provider_net.ALIAS, service="network")
    def resource_setup(cls):
        """Create the router, keypair and security group used by the test.

        The security group is registered in ``cls.security_groups`` and is
        passed to ``create_loginable_secgroup_rule`` and
        ``create_pingable_secgroup_rule`` (presumably opening SSH and ping
        access -- the helpers are defined outside this file).
        """
        super(NetworkMtuBaseTest, cls).resource_setup()
        # setup basic topology for servers we can log into it
        cls.router = cls.create_router_by_client()
        cls.keypair = cls.create_keypair()
        cls.secgroup = cls.os_primary.network_client.create_security_group(
            name='secgroup_mtu')
        secgroup = cls.secgroup['security_group']
        cls.security_groups.append(secgroup)
        cls.create_loginable_secgroup_rule(secgroup_id=secgroup['id'])
        cls.create_pingable_secgroup_rule(secgroup_id=secgroup['id'])

    def _create_setup(self):
        """Build the two-network topology and boot one server per network.

        A VXLAN and a GRE network are created with the admin client; each
        gets a subnet that is plugged into ``self.router``.  The networks
        are then sorted by ascending MTU and a server with a floating IP is
        booted on each of the first two entries of ``self.networks``.

        Returns:
            A tuple ``(ssh_client1, fip1, ssh_client2, fip2)``.  The first
            pair belongs to the server on ``self.networks[0]`` (lowest MTU),
            the second pair to the server on ``self.networks[1]``.
        """
        self.admin_client = self.os_admin.network_client
        net_kwargs = {'tenant_id': self.client.tenant_id}
        for sub, net_type in (
                ('10.100.0.0/16', 'vxlan'), ('10.200.0.0/16', 'gre')):
            net_kwargs['name'] = '-'.join([net_type, 'net'])
            net_kwargs['provider:network_type'] = net_type
            network = self.admin_client.create_network(**net_kwargs)[
                'network']
            self.networks.append(network)
            self.addCleanup(self.admin_client.delete_network, network['id'])
            cidr = netaddr.IPNetwork(sub)
            subnet = self.create_subnet(network, cidr=cidr)
            self.create_router_interface(self.router['id'], subnet['id'])
            self.addCleanup(self.client.remove_router_interface_with_subnet_id,
                            self.router['id'], subnet['id'])
        # check that MTUs are different for 2 networks
        self.assertNotEqual(self.networks[0]['mtu'], self.networks[1]['mtu'])
        self.networks.sort(key=lambda net: net['mtu'])

        # Each SSH client must target the floating IP of its own server.
        # Indexing ``self.floating_ips[0]`` for both clients pointed the
        # second client at the first server.
        _, fip1 = self.create_pingable_vm(self.networks[0])
        server_ssh_client1 = ssh.Client(
            fip1['floating_ip_address'],
            CONF.validation.image_ssh_user,
            pkey=self.keypair['private_key'])
        _, fip2 = self.create_pingable_vm(self.networks[1])
        server_ssh_client2 = ssh.Client(
            fip2['floating_ip_address'],
            CONF.validation.image_ssh_user,
            pkey=self.keypair['private_key'])
        for fip in (fip1, fip2):
            self.check_connectivity(fip['floating_ip_address'],
                                    CONF.validation.image_ssh_user,
                                    self.keypair['private_key'])
        return server_ssh_client1, fip1, server_ssh_client2, fip2

    def create_pingable_vm(self, net):
        """Boot a server on ``net`` and associate a floating IP with it.

        Args:
            net: network dict; only its ``'id'`` key is read here.

        Returns:
            A tuple ``(server, fip)``: the ``create_server`` response and the
            floating IP returned by ``create_and_associate_floatingip``.
        """
        server = self.create_server(
            flavor_ref=CONF.compute.flavor_ref,
            image_ref=CONF.compute.image_ref,
            key_name=self.keypair['name'],
            networks=[{'uuid': net['id']}],
            security_groups=[{'name': self.secgroup[
                'security_group']['name']}])
        server_id = server['server']['id']
        # The port lookup below is done only once the server is ACTIVE.
        waiters.wait_for_server_status(
            self.os_primary.servers_client, server_id,
            constants.SERVER_STATUS_ACTIVE)
        port = self.client.list_ports(
            network_id=net['id'], device_id=server_id)['ports'][0]
        fip = self.create_and_associate_floatingip(port['id'])
        return server, fip

    @decorators.idempotent_id('3d73ec1a-2ec6-45a9-b0f8-04a273d9d344')
    def test_connectivity_min_max_mtu(self):
        """Ping across the two networks at and above the smaller MTU."""
        server_ssh_client, _, _, fip2 = self._create_setup()
        target_ip = fip2['fixed_ip_address']
        # ``self.networks`` was sorted by MTU in ``_create_setup``.
        min_mtu = self.networks[0]['mtu']
        max_mtu = self.networks[1]['mtu']

        # ping with min mtu of 2 networks succeeds even when
        # fragmentation is disabled
        self.check_remote_connectivity(
            server_ssh_client, target_ip,
            mtu=min_mtu, fragmentation=False)

        # ping with the size above min mtu of 2 networks
        # fails when fragmentation is disabled
        self.check_remote_connectivity(
            server_ssh_client, target_ip, should_succeed=False,
            mtu=min_mtu + 1, fragmentation=False)

        # ping with max mtu of 2 networks succeeds when
        # fragmentation is enabled
        self.check_remote_connectivity(
            server_ssh_client, target_ip,
            mtu=max_mtu)

        # ping with max mtu of 2 networks fails when fragmentation is disabled
        self.check_remote_connectivity(
            server_ssh_client, target_ip, should_succeed=False,
            mtu=max_mtu, fragmentation=False)