Code Sync from neutron project to newly created neutron-tempest-plugin
* The following commit sync the code from following hash:
start_hash: 7279aa35851110a4933a10b58b2758a2bc3933a3
end_hash: 6e911a49a9e630878f4c46f61fde3964be550880
Change-Id: I371aa4d5f043f695df04b98b0f485c8f0548f2b3
diff --git a/neutron_tempest_plugin/scenario/base.py b/neutron_tempest_plugin/scenario/base.py
index e810490..5cc085f 100644
--- a/neutron_tempest_plugin/scenario/base.py
+++ b/neutron_tempest_plugin/scenario/base.py
@@ -12,6 +12,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
+import subprocess
import netaddr
from oslo_log import log
@@ -173,7 +174,7 @@
LOG.debug("Created subnet %s", self.subnet['id'])
secgroup = self.os_primary.network_client.create_security_group(
- name=data_utils.rand_name('secgroup-'))
+ name=data_utils.rand_name('secgroup'))
LOG.debug("Created security group %s",
secgroup['security_group']['name'])
self.security_groups.append(secgroup['security_group'])
@@ -276,3 +277,41 @@
nic=None, mtu=None, fragmentation=True):
self.assertTrue(self._check_remote_connectivity(
source, dest, should_succeed, nic, mtu, fragmentation))
+
+ def ping_ip_address(self, ip_address, should_succeed=True,
+ ping_timeout=None, mtu=None):
+ # the code is taken from tempest/scenario/manager.py in tempest git
+ timeout = ping_timeout or CONF.validation.ping_timeout
+ cmd = ['ping', '-c1', '-w1']
+
+ if mtu:
+ cmd += [
+ # don't fragment
+ '-M', 'do',
+ # ping receives just the size of ICMP payload
+ '-s', str(net_utils.get_ping_payload_size(mtu, 4))
+ ]
+ cmd.append(ip_address)
+
+ def ping():
+ proc = subprocess.Popen(cmd,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ proc.communicate()
+
+ return (proc.returncode == 0) == should_succeed
+
+ caller = test_utils.find_test_caller()
+ LOG.debug('%(caller)s begins to ping %(ip)s in %(timeout)s sec and the'
+ ' expected result is %(should_succeed)s', {
+ 'caller': caller, 'ip': ip_address, 'timeout': timeout,
+ 'should_succeed':
+ 'reachable' if should_succeed else 'unreachable'
+ })
+ result = test_utils.call_until_true(ping, timeout, 1)
+ LOG.debug('%(caller)s finishes ping %(ip)s in %(timeout)s sec and the '
+ 'ping result is %(result)s', {
+ 'caller': caller, 'ip': ip_address, 'timeout': timeout,
+ 'result': 'expected' if result else 'unexpected'
+ })
+ return result
diff --git a/neutron_tempest_plugin/scenario/test_dvr.py b/neutron_tempest_plugin/scenario/test_dvr.py
index 3da0694..b1cba5a 100644
--- a/neutron_tempest_plugin/scenario/test_dvr.py
+++ b/neutron_tempest_plugin/scenario/test_dvr.py
@@ -12,8 +12,8 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
+from tempest.common import utils
from tempest.lib import decorators
-from tempest import test
from neutron_lib import constants
from neutron_tempest_plugin import config
@@ -49,7 +49,7 @@
force_tenant_isolation = False
@classmethod
- @test.requires_ext(extension="dvr", service="network")
+ @utils.requires_ext(extension="dvr", service="network")
def skip_checks(cls):
super(NetworkDvrTest, cls).skip_checks()
diff --git a/neutron_tempest_plugin/scenario/test_floatingip.py b/neutron_tempest_plugin/scenario/test_floatingip.py
index 97bfcc5..5fcbdc0 100644
--- a/neutron_tempest_plugin/scenario/test_floatingip.py
+++ b/neutron_tempest_plugin/scenario/test_floatingip.py
@@ -14,10 +14,10 @@
# under the License.
import netaddr
+from tempest.common import utils
from tempest.common import waiters
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
-from tempest import test
import testscenarios
from testscenarios.scenarios import multiply_scenarios
@@ -37,7 +37,7 @@
credentials = ['primary', 'admin']
@classmethod
- @test.requires_ext(extension="router", service="network")
+ @utils.requires_ext(extension="router", service="network")
def resource_setup(cls):
super(FloatingIpTestCasesMixin, cls).resource_setup()
cls.network = cls.create_network()
@@ -47,7 +47,7 @@
cls.keypair = cls.create_keypair()
cls.secgroup = cls.os_primary.network_client.create_security_group(
- name=data_utils.rand_name('secgroup-'))['security_group']
+ name=data_utils.rand_name('secgroup'))['security_group']
cls.security_groups.append(cls.secgroup)
cls.create_loginable_secgroup_rule(secgroup_id=cls.secgroup['id'])
cls.create_pingable_secgroup_rule(secgroup_id=cls.secgroup['id'])
diff --git a/neutron_tempest_plugin/scenario/test_migration.py b/neutron_tempest_plugin/scenario/test_migration.py
index 291611c..62c3642 100644
--- a/neutron_tempest_plugin/scenario/test_migration.py
+++ b/neutron_tempest_plugin/scenario/test_migration.py
@@ -13,9 +13,14 @@
# License for the specific language governing permissions and limitations
# under the License.
-from tempest.lib import decorators
-from tempest import test
+import functools
+from neutron_lib.api.definitions import portbindings as pb
+from neutron_lib import constants as const
+from tempest.common import utils
+from tempest.lib import decorators
+
+from neutron_tempest_plugin.common import utils as common_utils
from neutron_tempest_plugin.scenario import base
from neutron_tempest_plugin.scenario import test_dvr
@@ -26,8 +31,8 @@
force_tenant_isolation = False
@classmethod
- @test.requires_ext(extension="dvr", service="network")
- @test.requires_ext(extension="l3-ha", service="network")
+ @utils.requires_ext(extension="dvr", service="network")
+ @utils.requires_ext(extension="l3-ha", service="network")
def skip_checks(cls):
super(NetworkMigrationTestBase, cls).skip_checks()
@@ -36,12 +41,77 @@
self.assertEqual(is_dvr, router['router']['distributed'])
self.assertEqual(is_ha, router['router']['ha'])
+ def _wait_until_port_deleted(self, router_id, device_owner):
+ common_utils.wait_until_true(
+ functools.partial(
+ self._is_port_deleted,
+ router_id,
+ device_owner),
+ timeout=300, sleep=5)
+
+ def _is_port_deleted(self, router_id, device_owner):
+ ports = self.os_admin.network_client.list_ports(
+ device_id=router_id,
+ device_owner=device_owner)
+ return not ports.get('ports')
+
+ def _wait_until_port_ready(self, router_id, device_owner):
+ common_utils.wait_until_true(
+ functools.partial(
+ self._is_port_active,
+ router_id,
+ device_owner),
+ timeout=300, sleep=5)
+
+ def _is_port_active(self, router_id, device_owner):
+ ports = self.os_admin.network_client.list_ports(
+ device_id=router_id,
+ device_owner=device_owner,
+ status=const.ACTIVE).get('ports')
+ if ports:
+ if ports[0][pb.VIF_TYPE] not in [pb.VIF_TYPE_UNBOUND,
+ pb.VIF_TYPE_BINDING_FAILED]:
+ return True
+ return False
+
+ def _wait_until_router_ports_ready(self, router_id, dvr, ha):
+ if dvr:
+ self._wait_until_port_ready(
+ router_id, const.DEVICE_OWNER_DVR_INTERFACE)
+ if ha:
+ self._wait_until_port_ready(
+ router_id, const.DEVICE_OWNER_ROUTER_HA_INTF)
+ if dvr:
+ self._wait_until_port_ready(
+ router_id, const.DEVICE_OWNER_ROUTER_SNAT)
+ else:
+ self._wait_until_port_ready(
+ router_id, const.DEVICE_OWNER_HA_REPLICATED_INT)
+ self._wait_until_port_ready(
+ router_id, const.DEVICE_OWNER_ROUTER_GW)
+
+ def _wait_until_router_ports_migrated(
+ self, router_id, before_dvr, before_ha, after_dvr, after_ha):
+ if before_ha and not after_ha:
+ self._wait_until_port_deleted(
+ router_id, const.DEVICE_OWNER_ROUTER_HA_INTF)
+ self._wait_until_port_deleted(
+ router_id, const.DEVICE_OWNER_HA_REPLICATED_INT)
+ if before_dvr and not after_dvr:
+ self._wait_until_port_deleted(
+ router_id, const.DEVICE_OWNER_DVR_INTERFACE)
+ self._wait_until_port_deleted(
+ router_id, const.DEVICE_OWNER_ROUTER_SNAT)
+ self._wait_until_router_ports_ready(router_id, after_dvr, after_ha)
+
def _test_migration(self, before_dvr, before_ha, after_dvr, after_ha):
router = self.create_router_by_client(
distributed=before_dvr, ha=before_ha,
tenant_id=self.client.tenant_id, is_admin=True)
self.setup_network_and_server(router=router)
+ self._wait_until_router_ports_ready(
+ router['id'], before_dvr, before_ha)
self._check_connectivity()
self.os_admin.network_client.update_router(
@@ -52,6 +122,9 @@
self.os_admin.network_client.update_router(
router_id=router['id'], admin_state_up=True)
+
+ self._wait_until_router_ports_migrated(
+ router['id'], before_dvr, before_ha, after_dvr, after_ha)
self._check_connectivity()
diff --git a/neutron_tempest_plugin/scenario/test_mtu.py b/neutron_tempest_plugin/scenario/test_mtu.py
new file mode 100644
index 0000000..9cbb4d8
--- /dev/null
+++ b/neutron_tempest_plugin/scenario/test_mtu.py
@@ -0,0 +1,134 @@
+# Copyright 2016 Red Hat, Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+import netaddr
+
+from neutron_lib.api.definitions import provider_net
+from tempest.common import utils
+from tempest.common import waiters
+from tempest.lib import decorators
+
+from neutron_tempest_plugin.common import ssh
+from neutron_tempest_plugin import config
+from neutron_tempest_plugin.scenario import base
+from neutron_tempest_plugin.scenario import constants
+
+CONF = config.CONF
+
+
+class NetworkMtuBaseTest(base.BaseTempestTestCase):
+ credentials = ['primary', 'admin']
+ servers = []
+ networks = []
+
+ @classmethod
+ def skip_checks(cls):
+ super(NetworkMtuBaseTest, cls).skip_checks()
+ if ("vxlan" not in
+ config.CONF.neutron_plugin_options.available_type_drivers
+ or "gre" not in
+ config.CONF.neutron_plugin_options.available_type_drivers):
+ raise cls.skipException("GRE or VXLAN type_driver is not enabled")
+
+ @classmethod
+ @utils.requires_ext(extension=provider_net.ALIAS, service="network")
+ def resource_setup(cls):
+ super(NetworkMtuBaseTest, cls).resource_setup()
+ # setup basic topology for servers we can log into it
+ cls.router = cls.create_router_by_client()
+ cls.keypair = cls.create_keypair()
+ cls.secgroup = cls.os_primary.network_client.create_security_group(
+ name='secgroup_mtu')
+ cls.security_groups.append(cls.secgroup['security_group'])
+ cls.create_loginable_secgroup_rule(
+ secgroup_id=cls.secgroup['security_group']['id'])
+ cls.create_pingable_secgroup_rule(
+ secgroup_id=cls.secgroup['security_group']['id'])
+
+ def _create_setup(self):
+ self.admin_client = self.os_admin.network_client
+ net_kwargs = {'tenant_id': self.client.tenant_id}
+ for sub, net_type in (
+ ('10.100.0.0/16', 'vxlan'), ('10.200.0.0/16', 'gre')):
+ net_kwargs['name'] = '-'.join([net_type, 'net'])
+ net_kwargs['provider:network_type'] = net_type
+ network = self.admin_client.create_network(**net_kwargs)[
+ 'network']
+ self.networks.append(network)
+ self.addCleanup(self.admin_client.delete_network, network['id'])
+ cidr = netaddr.IPNetwork(sub)
+ subnet = self.create_subnet(network, cidr=cidr)
+ self.create_router_interface(self.router['id'], subnet['id'])
+ self.addCleanup(self.client.remove_router_interface_with_subnet_id,
+ self.router['id'], subnet['id'])
+ # check that MTUs are different for 2 networks
+ self.assertNotEqual(self.networks[0]['mtu'], self.networks[1]['mtu'])
+ self.networks.sort(key=lambda net: net['mtu'])
+ server1, fip1 = self.create_pingable_vm(self.networks[0])
+ server_ssh_client1 = ssh.Client(
+ self.floating_ips[0]['floating_ip_address'],
+ CONF.validation.image_ssh_user,
+ pkey=self.keypair['private_key'])
+ server2, fip2 = self.create_pingable_vm(self.networks[1])
+ server_ssh_client2 = ssh.Client(
+ self.floating_ips[0]['floating_ip_address'],
+ CONF.validation.image_ssh_user,
+ pkey=self.keypair['private_key'])
+ for fip in (fip1, fip2):
+ self.check_connectivity(fip['floating_ip_address'],
+ CONF.validation.image_ssh_user,
+ self.keypair['private_key'])
+ return server_ssh_client1, fip1, server_ssh_client2, fip2
+
+ def create_pingable_vm(self, net):
+ server = self.create_server(
+ flavor_ref=CONF.compute.flavor_ref,
+ image_ref=CONF.compute.image_ref,
+ key_name=self.keypair['name'],
+ networks=[{'uuid': net['id']}],
+ security_groups=[{'name': self.secgroup[
+ 'security_group']['name']}])
+ waiters.wait_for_server_status(
+ self.os_primary.servers_client, server['server']['id'],
+ constants.SERVER_STATUS_ACTIVE)
+ port = self.client.list_ports(
+ network_id=net['id'], device_id=server['server']['id'])['ports'][0]
+ fip = self.create_and_associate_floatingip(port['id'])
+ return server, fip
+
+ @decorators.idempotent_id('3d73ec1a-2ec6-45a9-b0f8-04a273d9d344')
+ def test_connectivity_min_max_mtu(self):
+ server_ssh_client, _, _, fip2 = self._create_setup()
+ # ping with min mtu of 2 networks succeeds even when
+ # fragmentation is disabled
+ self.check_remote_connectivity(
+ server_ssh_client, fip2['fixed_ip_address'],
+ mtu=self.networks[0]['mtu'], fragmentation=False)
+
+ # ping with the size above min mtu of 2 networks
+ # fails when fragmentation is disabled
+ self.check_remote_connectivity(
+ server_ssh_client, fip2['fixed_ip_address'], should_succeed=False,
+ mtu=self.networks[0]['mtu'] + 1, fragmentation=False)
+
+ # ping with max mtu of 2 networks succeeds when
+ # fragmentation is enabled
+ self.check_remote_connectivity(
+ server_ssh_client, fip2['fixed_ip_address'],
+ mtu=self.networks[1]['mtu'])
+
+ # ping with max mtu of 2 networks fails when fragmentation is disabled
+ self.check_remote_connectivity(
+ server_ssh_client, fip2['fixed_ip_address'], should_succeed=False,
+ mtu=self.networks[1]['mtu'], fragmentation=False)
diff --git a/neutron_tempest_plugin/scenario/test_qos.py b/neutron_tempest_plugin/scenario/test_qos.py
index d93f57f..67c00c2 100644
--- a/neutron_tempest_plugin/scenario/test_qos.py
+++ b/neutron_tempest_plugin/scenario/test_qos.py
@@ -16,13 +16,13 @@
import socket
import time
+from neutron_lib.services.qos import constants as qos_consts
from oslo_log import log as logging
+from tempest.common import utils as tutils
from tempest.lib import decorators
from tempest.lib import exceptions
-from tempest import test
from neutron_tempest_plugin.api import base as base_api
-from neutron_tempest_plugin.common import qos_consts
from neutron_tempest_plugin.common import ssh
from neutron_tempest_plugin.common import utils
from neutron_tempest_plugin import config
@@ -80,7 +80,7 @@
FILE_PATH = "/tmp/img"
@classmethod
- @test.requires_ext(extension="qos", service="network")
+ @tutils.requires_ext(extension="qos", service="network")
@base_api.require_qos_rule_type(qos_consts.RULE_TYPE_BANDWIDTH_LIMIT)
def resource_setup(cls):
super(QoSTest, cls).resource_setup()
@@ -154,9 +154,9 @@
CONF.validation.image_ssh_user,
pkey=self.keypair['private_key'])
policy = self.os_admin.network_client.create_qos_policy(
- name='test-policy',
- description='test-qos-policy',
- shared=True)
+ name='test-policy',
+ description='test-qos-policy',
+ shared=True)
policy_id = policy['policy']['id']
self.os_admin.network_client.create_bandwidth_limit_rule(
policy_id, max_kbps=constants.LIMIT_KILO_BITS_PER_SECOND,
diff --git a/neutron_tempest_plugin/scenario/test_security_groups.py b/neutron_tempest_plugin/scenario/test_security_groups.py
new file mode 100644
index 0000000..faaeb84
--- /dev/null
+++ b/neutron_tempest_plugin/scenario/test_security_groups.py
@@ -0,0 +1,200 @@
+# Copyright 2016 Red Hat, Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+from neutron_lib import constants
+
+from tempest.common import waiters
+from tempest.lib.common.utils import data_utils
+from tempest.lib import decorators
+
+from neutron_tempest_plugin.common import ssh
+from neutron_tempest_plugin import config
+from neutron_tempest_plugin.scenario import base
+from neutron_tempest_plugin.scenario import constants as const
+
+CONF = config.CONF
+
+
+class NetworkDefaultSecGroupTest(base.BaseTempestTestCase):
+ credentials = ['primary', 'admin']
+ required_extensions = ['router', 'security-group']
+
+ @classmethod
+ def resource_setup(cls):
+ super(NetworkDefaultSecGroupTest, cls).resource_setup()
+ # setup basic topology for servers we can log into it
+ cls.network = cls.create_network()
+ cls.subnet = cls.create_subnet(cls.network)
+ router = cls.create_router_by_client()
+ cls.create_router_interface(router['id'], cls.subnet['id'])
+ cls.keypair = cls.create_keypair()
+
+ def create_vm_testing_sec_grp(self, num_servers=2, security_groups=None):
+ servers, fips, server_ssh_clients = ([], [], [])
+ for i in range(num_servers):
+ servers.append(self.create_server(
+ flavor_ref=CONF.compute.flavor_ref,
+ image_ref=CONF.compute.image_ref,
+ key_name=self.keypair['name'],
+ networks=[{'uuid': self.network['id']}],
+ security_groups=security_groups))
+ for i, server in enumerate(servers):
+ waiters.wait_for_server_status(
+ self.os_primary.servers_client, server['server']['id'],
+ const.SERVER_STATUS_ACTIVE)
+ port = self.client.list_ports(
+ network_id=self.network['id'], device_id=server['server'][
+ 'id'])['ports'][0]
+ fips.append(self.create_and_associate_floatingip(port['id']))
+ server_ssh_clients.append(ssh.Client(
+ fips[i]['floating_ip_address'], CONF.validation.image_ssh_user,
+ pkey=self.keypair['private_key']))
+ return server_ssh_clients, fips, servers
+
+ @decorators.idempotent_id('3d73ec1a-2ec6-45a9-b0f8-04a283d9d764')
+ def test_default_sec_grp_scenarios(self):
+ server_ssh_clients, fips, _ = self.create_vm_testing_sec_grp()
+ # Check ssh connectivity when you add sec group rule, enabling ssh
+ self.create_loginable_secgroup_rule(
+ self.os_primary.network_client.list_security_groups()[
+ 'security_groups'][0]['id']
+ )
+ self.check_connectivity(fips[0]['floating_ip_address'],
+ CONF.validation.image_ssh_user,
+ self.keypair['private_key'])
+
+ # make sure ICMP connectivity still fails as only ssh rule was added
+ self.ping_ip_address(fips[0]['floating_ip_address'],
+ should_succeed=False)
+
+ # Check ICMP connectivity between VMs without specific rule for that
+ # It should work though the rule is not configured
+ self.check_remote_connectivity(
+ server_ssh_clients[0], fips[1]['fixed_ip_address'])
+
+ # Check ICMP connectivity from VM to external network
+ subnets = self.os_admin.network_client.list_subnets(
+ network_id=CONF.network.public_network_id)['subnets']
+ ext_net_ip = None
+ for subnet in subnets:
+ if subnet['ip_version'] == 4:
+ ext_net_ip = subnet['gateway_ip']
+ break
+ self.assertTrue(ext_net_ip)
+ self.check_remote_connectivity(server_ssh_clients[0], ext_net_ip)
+
+ @decorators.idempotent_id('3d73ec1a-2ec6-45a9-b0f8-04a283d9d864')
+ def test_protocol_number_rule(self):
+ # protocol number is added instead of str in security rule creation
+ server_ssh_clients, fips, _ = self.create_vm_testing_sec_grp(
+ num_servers=1)
+ self.ping_ip_address(fips[0]['floating_ip_address'],
+ should_succeed=False)
+ rule_list = [{'protocol': constants.PROTO_NUM_ICMP,
+ 'direction': constants.INGRESS_DIRECTION,
+ 'remote_ip_prefix': '0.0.0.0/0'}]
+ secgroup_id = self.os_primary.network_client.list_security_groups()[
+ 'security_groups'][0]['id']
+ self.create_secgroup_rules(rule_list, secgroup_id=secgroup_id)
+ self.ping_ip_address(fips[0]['floating_ip_address'])
+
+ @decorators.idempotent_id('3d73ec1a-2ec6-45a9-b0f8-04a283d9d964')
+ def test_two_sec_groups(self):
+ # add 2 sec groups to VM and test rules of both are working
+ ssh_secgrp_name = data_utils.rand_name('ssh_secgrp')
+ icmp_secgrp_name = data_utils.rand_name('icmp_secgrp')
+ ssh_secgrp = self.os_primary.network_client.create_security_group(
+ name=ssh_secgrp_name)
+ self.create_loginable_secgroup_rule(
+ secgroup_id=ssh_secgrp['security_group']['id'])
+ icmp_secgrp = self.os_primary.network_client.create_security_group(
+ name=icmp_secgrp_name)
+ self.create_pingable_secgroup_rule(
+ secgroup_id=icmp_secgrp['security_group']['id'])
+ for sec_grp in (ssh_secgrp, icmp_secgrp):
+ self.security_groups.append(sec_grp['security_group'])
+ security_groups_list = [{'name': ssh_secgrp_name},
+ {'name': icmp_secgrp_name}]
+ server_ssh_clients, fips, servers = self.create_vm_testing_sec_grp(
+ num_servers=1, security_groups=security_groups_list)
+ # make sure ssh connectivity works
+ self.check_connectivity(fips[0]['floating_ip_address'],
+ CONF.validation.image_ssh_user,
+ self.keypair['private_key'])
+ # make sure ICMP connectivity works
+ self.ping_ip_address(fips[0]['floating_ip_address'],
+ should_succeed=True)
+ ports = self.client.list_ports(device_id=servers[0]['server']['id'])
+ port_id = ports['ports'][0]['id']
+
+ # update port with ssh security group only
+ self.os_primary.network_client.update_port(
+ port_id, security_groups=[ssh_secgrp['security_group']['id']])
+
+ # make sure ssh connectivity works
+ self.check_connectivity(fips[0]['floating_ip_address'],
+ CONF.validation.image_ssh_user,
+ self.keypair['private_key'])
+
+ # make sure ICMP connectivity doesn't work
+ self.ping_ip_address(fips[0]['floating_ip_address'],
+ should_succeed=False)
+
+ # update port with ssh and ICMP security groups
+ self.os_primary.network_client.update_port(
+ port_id, security_groups=[
+ icmp_secgrp['security_group']['id'],
+ ssh_secgrp['security_group']['id']])
+
+ # make sure ssh connectivity works after update
+ self.check_connectivity(fips[0]['floating_ip_address'],
+ CONF.validation.image_ssh_user,
+ self.keypair['private_key'])
+
+ # make sure ICMP connectivity works after update
+ self.ping_ip_address(fips[0]['floating_ip_address'])
+
+ @decorators.idempotent_id('3d73ec1a-2ec6-45a9-b0f8-04a283d9d664')
+ def test_ip_prefix(self):
+ # Add specific remote prefix to VMs and check connectivity
+ ssh_secgrp_name = data_utils.rand_name('ssh_secgrp')
+ icmp_secgrp_name = data_utils.rand_name('icmp_secgrp_with_cidr')
+ cidr = self.subnet['cidr']
+ ssh_secgrp = self.os_primary.network_client.create_security_group(
+ name=ssh_secgrp_name)
+ self.create_loginable_secgroup_rule(
+ secgroup_id=ssh_secgrp['security_group']['id'])
+
+ rule_list = [{'protocol': constants.PROTO_NUM_ICMP,
+ 'direction': constants.INGRESS_DIRECTION,
+ 'remote_ip_prefix': cidr}]
+ icmp_secgrp = self.os_primary.network_client.create_security_group(
+ name=icmp_secgrp_name)
+ self.create_secgroup_rules(
+ rule_list, secgroup_id=icmp_secgrp['security_group']['id'])
+ for sec_grp in (ssh_secgrp, icmp_secgrp):
+ self.security_groups.append(sec_grp['security_group'])
+ security_groups_list = [{'name': ssh_secgrp_name},
+ {'name': icmp_secgrp_name}]
+ server_ssh_clients, fips, servers = self.create_vm_testing_sec_grp(
+ security_groups=security_groups_list)
+
+ # make sure ssh connectivity works
+ self.check_connectivity(fips[0]['floating_ip_address'],
+ CONF.validation.image_ssh_user,
+ self.keypair['private_key'])
+
+ # make sure ICMP connectivity works
+ self.check_remote_connectivity(server_ssh_clients[0], fips[1][
+ 'fixed_ip_address'])
diff --git a/neutron_tempest_plugin/scenario/test_trunk.py b/neutron_tempest_plugin/scenario/test_trunk.py
index 95906a0..0008b0a 100644
--- a/neutron_tempest_plugin/scenario/test_trunk.py
+++ b/neutron_tempest_plugin/scenario/test_trunk.py
@@ -14,10 +14,10 @@
import netaddr
from oslo_log import log as logging
+from tempest.common import utils as tutils
from tempest.common import waiters
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
-from tempest import test
import testtools
from neutron_tempest_plugin.common import ssh
@@ -47,7 +47,7 @@
force_tenant_isolation = False
@classmethod
- @test.requires_ext(extension="trunk", service="network")
+ @tutils.requires_ext(extension="trunk", service="network")
def resource_setup(cls):
super(TrunkTest, cls).resource_setup()
# setup basic topology for servers we can log into
@@ -57,7 +57,7 @@
cls.create_router_interface(router['id'], cls.subnet['id'])
cls.keypair = cls.create_keypair()
cls.secgroup = cls.os_primary.network_client.create_security_group(
- name=data_utils.rand_name('secgroup-'))
+ name=data_utils.rand_name('secgroup'))
cls.security_groups.append(cls.secgroup['security_group'])
cls.create_loginable_secgroup_rule(
secgroup_id=cls.secgroup['security_group']['id'])