[Stateless SG] Add test to check connectivity between vms
This patch adds a new test (for both stateful and stateless security
groups) which creates 2 VMs connected to 2 different networks and using
2 different SGs. Both networks are connected to the same router.
Finally, it checks TCP connectivity between the VMs.
In case of a stateless SG, an additional security group rule needs to be
added to the client's SG to allow ingress traffic on the ephemeral port
range.
Change-Id: Icf318d8e7a76286ee2c2b8626d80e554aa5289f6
diff --git a/neutron_tempest_plugin/scenario/base.py b/neutron_tempest_plugin/scenario/base.py
index eb03c93..4d769dc 100644
--- a/neutron_tempest_plugin/scenario/base.py
+++ b/neutron_tempest_plugin/scenario/base.py
@@ -630,13 +630,14 @@
self._log_local_network_status()
raise
- def nc_client(self, ip_address, port, protocol):
+ def nc_client(self, ip_address, port, protocol, ssh_client=None):
"""Check connectivity to TCP/UDP port at host via nc.
- Client is always executed locally on host where tests are executed.
+ If ssh_client is not given, it is executed locally on host where tests
+ are executed. Otherwise ssh_client object is used to execute it.
"""
cmd = get_ncat_client_cmd(ip_address, port, protocol)
- result = shell.execute_local_command(cmd)
+ result = shell.execute(cmd, ssh_client=ssh_client)
self.assertEqual(0, result.exit_status)
return result.stdout
diff --git a/neutron_tempest_plugin/scenario/test_security_groups.py b/neutron_tempest_plugin/scenario/test_security_groups.py
index f7dee5e..a8faae5 100644
--- a/neutron_tempest_plugin/scenario/test_security_groups.py
+++ b/neutron_tempest_plugin/scenario/test_security_groups.py
@@ -31,6 +31,8 @@
CONF = config.CONF
+EPHEMERAL_PORT_RANGE = {'min': 32768, 'max': 65535}
+
class BaseNetworkSecGroupTest(base.BaseTempestTestCase):
credentials = ['primary', 'admin']
@@ -84,8 +86,8 @@
# setup basic topology for servers we can log into it
cls.network = cls.create_network()
cls.subnet = cls.create_subnet(cls.network)
- router = cls.create_router_by_client()
- cls.create_router_interface(router['id'], cls.subnet['id'])
+ cls.router = cls.create_router_by_client()
+ cls.create_router_interface(cls.router['id'], cls.subnet['id'])
cls.keypair = cls.create_keypair()
def setUp(self):
@@ -95,7 +97,7 @@
self.network_client.update_quotas(self.project_id, security_group=-1)
def create_vm_testing_sec_grp(self, num_servers=2, security_groups=None,
- ports=None):
+ ports=None, network_id=None):
"""Create instance for security group testing
:param num_servers (int): number of servers to spawn
@@ -103,13 +105,14 @@
:param ports* (list): list of ports
*Needs to be the same length as num_servers
"""
+ network_id = network_id or self.network['id']
servers, fips, server_ssh_clients = ([], [], [])
for i in range(num_servers):
server_args = {
'flavor_ref': CONF.compute.flavor_ref,
'image_ref': CONF.compute.image_ref,
'key_name': self.keypair['name'],
- 'networks': [{'uuid': self.network['id']}],
+ 'networks': [{'uuid': network_id}],
'security_groups': security_groups
}
if ports is not None:
@@ -120,7 +123,7 @@
self.os_primary.servers_client, server['server']['id'],
const.SERVER_STATUS_ACTIVE)
port = self.client.list_ports(
- network_id=self.network['id'], device_id=server['server'][
+ network_id=network_id, device_id=server['server'][
'id'])['ports'][0]
fips.append(self.create_floatingip(port=port))
server_ssh_clients.append(ssh.Client(
@@ -135,30 +138,103 @@
if sg['name'] == 'default':
return sg
+ def _create_security_group(self, name_prefix, **kwargs):
+ if self.stateless_sg:
+ kwargs['stateful'] = False
+ return super(BaseNetworkSecGroupTest, self).create_security_group(
+ name=data_utils.rand_name(name_prefix), **kwargs)
+
+ def _test_connectivity_between_vms_using_different_sec_groups(self):
+ TEST_TCP_PORT = 1022
+ networks = {
+ 'server': self.network,
+ 'client': self.create_network()}
+ subnet = self.create_subnet(networks['client'])
+ self.create_router_interface(self.router['id'], subnet['id'])
+
+ security_groups = {}
+ for sg_name in ["server", "client"]:
+ sg = self._create_security_group('vm_%s_secgrp' % sg_name)
+ self.create_loginable_secgroup_rule(
+ secgroup_id=sg['id'])
+ self.create_security_group_rule(
+ security_group_id=sg['id'],
+ protocol=constants.PROTO_NAME_TCP,
+ direction=constants.INGRESS_DIRECTION,
+ port_range_min=TEST_TCP_PORT,
+ port_range_max=TEST_TCP_PORT)
+ if self.stateless_sg:
+ self.create_ingress_metadata_secgroup_rule(
+ secgroup_id=sg['id'])
+ if sg_name == 'client':
+ # NOTE(slaweq): In case of stateless SG we also need an SG
+ # rule to accept responses from the server to the client.
+ self.create_security_group_rule(
+ security_group_id=sg['id'],
+ protocol=constants.PROTO_NAME_TCP,
+ direction=constants.INGRESS_DIRECTION,
+ port_range_min=EPHEMERAL_PORT_RANGE['min'],
+ port_range_max=EPHEMERAL_PORT_RANGE['max'])
+ security_groups[sg_name] = sg
+
+ # NOTE(slaweq): create_vm_testing_sec_grp is called once per VM
+ # because it plugs all given SGs into every VM it creates, while here
+ # each VM needs to use a different SG
+ ssh_clients = {}
+ fips = {}
+ servers = {}
+ for server_name, sg in security_groups.items():
+ _ssh_clients, _fips, _servers = self.create_vm_testing_sec_grp(
+ num_servers=1,
+ security_groups=[{'name': sg['name']}],
+ network_id=networks[server_name]['id'])
+ ssh_clients[server_name] = _ssh_clients[0]
+ fips[server_name] = _fips[0]
+ servers[server_name] = _servers[0]
+
+ # make sure ssh connectivity to both vms works fine
+ for fip in fips.values():
+ self.check_connectivity(fip['floating_ip_address'],
+ CONF.validation.image_ssh_user,
+ self.keypair['private_key'])
+
+ # Check connectivity between servers
+ def _message_received(server_ssh_client, client_ssh_client,
+ dest_fip, servers):
+ expected_msg = "Test_msg"
+ self.nc_listen(server_ssh_client,
+ TEST_TCP_PORT,
+ constants.PROTO_NAME_TCP,
+ expected_msg,
+ list(servers.values()))
+ received_msg = self.nc_client(
+ dest_fip,
+ TEST_TCP_PORT,
+ constants.PROTO_NAME_TCP,
+ ssh_client=client_ssh_client)
+ return expected_msg in received_msg
+
+ utils.wait_until_true(
+ lambda: _message_received(
+ ssh_clients['server'], ssh_clients['client'],
+ fips['server']['fixed_ip_address'], servers))
+
def _test_ip_prefix(self, rule_list, should_succeed):
# Add specific remote prefix to VMs and check connectivity
- ssh_secgrp_name = data_utils.rand_name('ssh_secgrp')
- icmp_secgrp_name = data_utils.rand_name('icmp_secgrp_with_cidr')
- sg_kwargs = {}
- if self.stateless_sg:
- sg_kwargs['stateful'] = False
- ssh_secgrp = self.os_primary.network_client.create_security_group(
- name=ssh_secgrp_name,
- **sg_kwargs)
+ ssh_secgrp = self._create_security_group('ssh_secgrp')
self.create_loginable_secgroup_rule(
- secgroup_id=ssh_secgrp['security_group']['id'])
+ secgroup_id=ssh_secgrp['id'])
if self.stateless_sg:
self.create_ingress_metadata_secgroup_rule(
- secgroup_id=ssh_secgrp['security_group']['id'])
- icmp_secgrp = self.os_primary.network_client.create_security_group(
- name=icmp_secgrp_name,
- **sg_kwargs)
+ secgroup_id=ssh_secgrp['id'])
+ icmp_secgrp = self._create_security_group('icmp_secgrp')
self.create_secgroup_rules(
- rule_list, secgroup_id=icmp_secgrp['security_group']['id'])
+ rule_list, secgroup_id=icmp_secgrp['id'])
for sec_grp in (ssh_secgrp, icmp_secgrp):
- self.security_groups.append(sec_grp['security_group'])
- security_groups_list = [{'name': ssh_secgrp_name},
- {'name': icmp_secgrp_name}]
+ self.security_groups.append(sec_grp)
+ security_groups_list = [
+ {'name': ssh_secgrp['name']},
+ {'name': icmp_secgrp['name']}]
server_ssh_clients, fips, servers = self.create_vm_testing_sec_grp(
security_groups=security_groups_list)
@@ -217,12 +293,7 @@
def _test_protocol_number_rule(self):
# protocol number is added instead of str in security rule creation
name = data_utils.rand_name("test_protocol_number_rule")
- sg_kwargs = {
- 'name': name
- }
- if self.stateless_sg:
- sg_kwargs['stateful'] = False
- security_group = self.create_security_group(**sg_kwargs)
+ security_group = self._create_security_group(name)
port = self.create_port(network=self.network, name=name,
security_groups=[security_group['id']])
_, fips, _ = self.create_vm_testing_sec_grp(num_servers=1,
@@ -237,28 +308,20 @@
def _test_two_sec_groups(self):
# add 2 sec groups to VM and test rules of both are working
- ssh_secgrp_name = data_utils.rand_name('ssh_secgrp')
- icmp_secgrp_name = data_utils.rand_name('icmp_secgrp')
- sg_kwargs = {}
- if self.stateless_sg:
- sg_kwargs['stateful'] = False
- ssh_secgrp = self.os_primary.network_client.create_security_group(
- name=ssh_secgrp_name,
- **sg_kwargs)
+ ssh_secgrp = self._create_security_group('ssh_secgrp')
self.create_loginable_secgroup_rule(
- secgroup_id=ssh_secgrp['security_group']['id'])
- icmp_secgrp = self.os_primary.network_client.create_security_group(
- name=icmp_secgrp_name,
- **sg_kwargs)
+ secgroup_id=ssh_secgrp['id'])
+ icmp_secgrp = self._create_security_group('icmp_secgrp')
self.create_pingable_secgroup_rule(
- secgroup_id=icmp_secgrp['security_group']['id'])
+ secgroup_id=icmp_secgrp['id'])
if self.stateless_sg:
self.create_ingress_metadata_secgroup_rule(
- secgroup_id=ssh_secgrp['security_group']['id'])
+ secgroup_id=ssh_secgrp['id'])
for sec_grp in (ssh_secgrp, icmp_secgrp):
- self.security_groups.append(sec_grp['security_group'])
- security_groups_list = [{'name': ssh_secgrp_name},
- {'name': icmp_secgrp_name}]
+ self.security_groups.append(sec_grp)
+ security_groups_list = [
+ {'name': ssh_secgrp['name']},
+ {'name': icmp_secgrp['name']}]
server_ssh_clients, fips, servers = self.create_vm_testing_sec_grp(
num_servers=1, security_groups=security_groups_list)
# make sure ssh connectivity works
@@ -273,7 +336,7 @@
# update port with ssh security group only
self.os_primary.network_client.update_port(
- port_id, security_groups=[ssh_secgrp['security_group']['id']])
+ port_id, security_groups=[ssh_secgrp['id']])
# make sure ssh connectivity works
self.check_connectivity(fips[0]['floating_ip_address'],
@@ -286,9 +349,7 @@
# update port with ssh and ICMP security groups
self.os_primary.network_client.update_port(
- port_id, security_groups=[
- icmp_secgrp['security_group']['id'],
- ssh_secgrp['security_group']['id']])
+ port_id, security_groups=[icmp_secgrp['id'], ssh_secgrp['id']])
# make sure ssh connectivity works after update
self.check_connectivity(fips[0]['floating_ip_address'],
@@ -299,25 +360,17 @@
self.ping_ip_address(fips[0]['floating_ip_address'])
def _test_remote_group(self):
- sg_kwargs = {}
- if self.stateless_sg:
- sg_kwargs['stateful'] = False
# create a new sec group
- ssh_secgrp_name = data_utils.rand_name('ssh_secgrp')
- ssh_secgrp = self.os_primary.network_client.create_security_group(
- name=ssh_secgrp_name,
- **sg_kwargs)
- # add cleanup
- self.security_groups.append(ssh_secgrp['security_group'])
+ ssh_secgrp = self._create_security_group('ssh_secgrp')
# configure sec group to support SSH connectivity
self.create_loginable_secgroup_rule(
- secgroup_id=ssh_secgrp['security_group']['id'])
+ secgroup_id=ssh_secgrp['id'])
if self.stateless_sg:
self.create_ingress_metadata_secgroup_rule(
- secgroup_id=ssh_secgrp['security_group']['id'])
+ secgroup_id=ssh_secgrp['id'])
# spawn two instances with the sec group created
server_ssh_clients, fips, servers = self.create_vm_testing_sec_grp(
- security_groups=[{'name': ssh_secgrp_name}])
+ security_groups=[{'name': ssh_secgrp['name']}])
# verify SSH functionality
for i in range(2):
self.check_connectivity(fips[i]['floating_ip_address'],
@@ -330,9 +383,9 @@
# add ICMP support to the remote group
rule_list = [{'protocol': constants.PROTO_NUM_ICMP,
'direction': constants.INGRESS_DIRECTION,
- 'remote_group_id': ssh_secgrp['security_group']['id']}]
+ 'remote_group_id': ssh_secgrp['id']}]
self.create_secgroup_rules(
- rule_list, secgroup_id=ssh_secgrp['security_group']['id'])
+ rule_list, secgroup_id=ssh_secgrp['id'])
# verify ICMP connectivity between instances works
self.check_remote_connectivity(
server_ssh_clients[0], fips[1]['fixed_ip_address'],
@@ -350,22 +403,14 @@
is applied. When both rules are applied (overlapped), removing one of
them should not disable the connection.
"""
- sg_kwargs = {}
- if self.stateless_sg:
- sg_kwargs['stateful'] = False
# create a new sec group
- ssh_secgrp_name = data_utils.rand_name('ssh_secgrp')
- ssh_secgrp = self.os_primary.network_client.create_security_group(
- name=ssh_secgrp_name,
- **sg_kwargs)
- # add cleanup
- self.security_groups.append(ssh_secgrp['security_group'])
+ ssh_secgrp = self._create_security_group('ssh_secgrp')
# configure sec group to support SSH connectivity
self.create_loginable_secgroup_rule(
- secgroup_id=ssh_secgrp['security_group']['id'])
+ secgroup_id=ssh_secgrp['id'])
# spawn two instances with the sec group created
server_ssh_clients, fips, servers = self.create_vm_testing_sec_grp(
- security_groups=[{'name': ssh_secgrp_name}])
+ security_groups=[{'name': ssh_secgrp['name']}])
# verify SSH functionality
for i in range(2):
self.check_connectivity(fips[i]['floating_ip_address'],
@@ -378,9 +423,9 @@
# add ICMP support to the remote group
rule_list = [{'protocol': constants.PROTO_NUM_ICMP,
'direction': constants.INGRESS_DIRECTION,
- 'remote_group_id': ssh_secgrp['security_group']['id']}]
+ 'remote_group_id': ssh_secgrp['id']}]
remote_sg_rid = self.create_secgroup_rules(
- rule_list, secgroup_id=ssh_secgrp['security_group']['id'])[0]['id']
+ rule_list, secgroup_id=ssh_secgrp['id'])[0]['id']
# verify ICMP connectivity between instances works
self.check_remote_connectivity(
server_ssh_clients[0], fips[1]['fixed_ip_address'],
@@ -397,7 +442,7 @@
'direction': constants.INGRESS_DIRECTION,
'remote_address_group_id': test_ag['id']}]
remote_ag_rid = self.create_secgroup_rules(
- rule_list, secgroup_id=ssh_secgrp['security_group']['id'])[0]['id']
+ rule_list, secgroup_id=ssh_secgrp['id'])[0]['id']
# verify ICMP connectivity between instances still works
self.check_remote_connectivity(
server_ssh_clients[0], fips[1]['fixed_ip_address'],
@@ -435,27 +480,20 @@
inherited properly and enforced in these instances.
"""
# create a security group and make it loginable and pingable
- sg_kwargs = {}
- if self.stateless_sg:
- sg_kwargs['stateful'] = False
- secgrp = self.os_primary.network_client.create_security_group(
- name=data_utils.rand_name('secgrp'),
- **sg_kwargs)
+ secgrp = self._create_security_group('secgrp')
self.create_loginable_secgroup_rule(
- secgroup_id=secgrp['security_group']['id'])
+ secgroup_id=secgrp['id'])
self.create_pingable_secgroup_rule(
- secgroup_id=secgrp['security_group']['id'])
+ secgroup_id=secgrp['id'])
if self.stateless_sg:
self.create_ingress_metadata_secgroup_rule(
- secgroup_id=secgrp['security_group']['id'])
- # add security group to cleanup
- self.security_groups.append(secgrp['security_group'])
+ secgroup_id=secgrp['id'])
# create two ports with fixed IPs and the security group created
ports = []
for i in range(2):
ports.append(self.create_port(
self.network, fixed_ips=[{'subnet_id': self.subnets[0]['id']}],
- security_groups=[secgrp['security_group']['id']]))
+ security_groups=[secgrp['id']]))
# spawn instances with the ports created
server_ssh_clients, fips, servers = self.create_vm_testing_sec_grp(
ports=ports)
@@ -467,17 +505,13 @@
self.keypair['private_key'])
def _test_multiple_ports_portrange_remote(self):
- sg_kwargs = {}
initial_security_groups = []
if self.stateless_sg:
- sg_kwargs['stateful'] = False
- md_secgrp = self.os_primary.network_client.create_security_group(
- name=data_utils.rand_name('metadata_secgrp'),
- **sg_kwargs)
+ md_secgrp = self._create_security_group('metadata_secgrp')
self.create_ingress_metadata_secgroup_rule(
- secgroup_id=md_secgrp['security_group']['id'])
+ secgroup_id=md_secgrp['id'])
initial_security_groups.append(
- {'name': md_secgrp['security_group']['name']})
+ {'name': md_secgrp['name']})
ssh_clients, fips, servers = self.create_vm_testing_sec_grp(
num_servers=3, security_groups=initial_security_groups)
@@ -487,9 +521,7 @@
# Create remote and test security groups
for i in range(0, 2):
secgroups.append(
- self.create_security_group(
- name='secgrp-%d' % i,
- **sg_kwargs))
+ self._create_security_group('secgrp-%d' % i))
# configure sec groups to support SSH connectivity
self.create_loginable_secgroup_rule(
secgroup_id=secgroups[-1]['id'])
@@ -569,17 +601,13 @@
def _test_overlapping_sec_grp_rules(self):
"""Test security group rules with overlapping port ranges"""
- sg_kwargs = {}
initial_security_groups = []
if self.stateless_sg:
- sg_kwargs['stateful'] = False
- md_secgrp = self.os_primary.network_client.create_security_group(
- name=data_utils.rand_name('metadata_secgrp'),
- **sg_kwargs)
+ md_secgrp = self._create_security_group('metadata_secgrp')
self.create_ingress_metadata_secgroup_rule(
- secgroup_id=md_secgrp['security_group']['id'])
+ secgroup_id=md_secgrp['id'])
initial_security_groups.append(
- {'name': md_secgrp['security_group']['name']})
+ {'name': md_secgrp['name']})
client_ssh, _, vms = self.create_vm_testing_sec_grp(
num_servers=2, security_groups=initial_security_groups)
tmp_ssh, _, tmp_vm = self.create_vm_testing_sec_grp(
@@ -591,9 +619,7 @@
srv_ip = srv_port['fixed_ips'][0]['ip_address']
secgrps = []
for i, vm in enumerate(vms):
- sg = self.create_security_group(
- name='secgrp-%d' % i,
- **sg_kwargs)
+ sg = self._create_security_group('secgrp-%d' % i)
self.create_loginable_secgroup_rule(secgroup_id=sg['id'])
port = self.client.list_ports(network_id=self.network['id'],
device_id=vm['server']['id'])['ports'][0]
@@ -650,23 +676,15 @@
4. Remove the security group from VM by Port update
5. Ping the VM, expected should be FAIL
"""
- sec_grp_name = data_utils.rand_name('test_sg')
- sg_kwargs = {
- 'name': sec_grp_name
- }
- if self.stateless_sg:
- sg_kwargs['stateful'] = False
- secgrp = self.os_primary.network_client.create_security_group(
- **sg_kwargs)
- self.security_groups.append(secgrp['security_group'])
- sec_grp_id = secgrp['security_group']['id']
- self.create_pingable_secgroup_rule(sec_grp_id)
+ secgrp = self._create_security_group('test_sg')
+ self.security_groups.append(secgrp)
+ self.create_pingable_secgroup_rule(secgrp['id'])
ex_port = self.create_port(
self.network, fixed_ips=[{'subnet_id': self.subnet['id']}],
- security_groups=[sec_grp_id])
+ security_groups=[secgrp['id']])
fip = self.create_vm_testing_sec_grp(
- num_servers=1, security_groups=[{'name': sec_grp_name}],
+ num_servers=1, security_groups=[{'name': secgrp['name']}],
ports=[ex_port])[1][0]
self.ping_ip_address(fip['floating_ip_address'])
@@ -742,22 +760,14 @@
allow-related behavior of the egress rules (added via default).
"""
# create a security group and make it loginable
- secgrp_name = data_utils.rand_name('secgrp')
- sg_kwargs = {
- 'name': secgrp_name
- }
- secgrp = self.os_primary.network_client.create_security_group(
- **sg_kwargs)
- secgrp_id = secgrp['security_group']['id']
- # add security group to cleanup
- self.security_groups.append(secgrp['security_group'])
+ secgrp = self._create_security_group('secgrp')
# remove all rules and add ICMP, DHCP and metadata as egress,
# and ssh as ingress.
- for sgr in secgrp['security_group']['security_group_rules']:
+ for sgr in secgrp['security_group_rules']:
self.client.delete_security_group_rule(sgr['id'])
- self.create_loginable_secgroup_rule(secgroup_id=secgrp_id)
+ self.create_loginable_secgroup_rule(secgroup_id=secgrp['id'])
rule_list = [{'direction': constants.EGRESS_DIRECTION,
'protocol': constants.PROTO_NAME_TCP,
'remote_ip_prefix': '169.254.169.254/32',
@@ -774,11 +784,12 @@
'description': 'ping out',
},
]
- self.create_secgroup_rules(rule_list, secgroup_id=secgrp_id)
+ self.create_secgroup_rules(rule_list, secgroup_id=secgrp['id'])
# go vms, go!
ssh_clients, fips, servers = self.create_vm_testing_sec_grp(
- num_servers=2, security_groups=[{'name': secgrp_name}])
+ num_servers=2,
+ security_groups=[{'name': secgrp['name']}])
# verify SSH functionality. This will ensure that servers were
# able to reach dhcp + metadata servers
@@ -798,8 +809,8 @@
# add intra sg rule. This will allow packets from servers that
# are in the same sg
rule_list = [{'direction': constants.INGRESS_DIRECTION,
- 'remote_group_id': secgrp_id}]
- self.create_secgroup_rules(rule_list, secgroup_id=secgrp_id)
+ 'remote_group_id': secgrp['id']}]
+ self.create_secgroup_rules(rule_list, secgroup_id=secgrp['id'])
# try to ping instances with intra SG permission
self.check_remote_connectivity(
@@ -824,14 +835,11 @@
back
"""
- sg_kwargs = {}
- if self.stateless_sg:
- sg_kwargs['stateful'] = False
- ssh_sg = self.create_security_group(**sg_kwargs)
+ ssh_sg = self._create_security_group('ssh_sg')
self.create_loginable_secgroup_rule(secgroup_id=ssh_sg['id'])
vm_ssh, fips, vms = self.create_vm_testing_sec_grp(
security_groups=[{'name': ssh_sg['name']}])
- sg = self.create_security_group(**sg_kwargs)
+ sg = self._create_security_group('sg')
nc_rule = [{'protocol': constants.PROTO_NUM_TCP,
'direction': constants.INGRESS_DIRECTION,
'port_range_min': 6666,
@@ -862,6 +870,10 @@
security_groups=[ssh_sg['id'], sg['id']])
con.test_connection()
+ @decorators.idempotent_id('4a724164-bbc0-4029-a844-644ece66c026')
+ def test_connectivity_between_vms_using_different_sec_groups(self):
+ self._test_connectivity_between_vms_using_different_sec_groups()
+
@testtools.skipIf(
CONF.neutron_plugin_options.firewall_driver in ['openvswitch', 'None'],
@@ -968,3 +980,7 @@
# Make sure connectivity works fine again
self.ping_ip_address(fips[0]['floating_ip_address'],
should_succeed=True)
+
+ @decorators.idempotent_id('7ede9ab5-a615-46c5-9dea-cf2aa1ea43cb')
+ def test_connectivity_between_vms_using_different_sec_groups(self):
+ self._test_connectivity_between_vms_using_different_sec_groups()