Merge "Run security group scenario tests against stateless security groups too"
diff --git a/neutron_tempest_plugin/api/base.py b/neutron_tempest_plugin/api/base.py
index e080d42..10821c1 100644
--- a/neutron_tempest_plugin/api/base.py
+++ b/neutron_tempest_plugin/api/base.py
@@ -994,8 +994,9 @@
ip_version = ip_version or cls._ip_version
default_params = (
constants.DEFAULT_SECURITY_GROUP_RULE_PARAMS[ip_version])
- if ('remote_address_group_id' in kwargs and 'remote_ip_prefix' in
- default_params):
+ if (('remote_address_group_id' in kwargs or
+ 'remote_group_id' in kwargs) and
+ 'remote_ip_prefix' in default_params):
default_params.pop('remote_ip_prefix')
for key, value in default_params.items():
kwargs.setdefault(key, value)
diff --git a/neutron_tempest_plugin/scenario/base.py b/neutron_tempest_plugin/scenario/base.py
index b9bf36f..4d9165f 100644
--- a/neutron_tempest_plugin/scenario/base.py
+++ b/neutron_tempest_plugin/scenario/base.py
@@ -190,6 +190,27 @@
port_range_max=22)
@classmethod
+    def create_ingress_metadata_secgroup_rule(cls, secgroup_id=None):
+        """Permit inbound TCP traffic from the metadata server.
+
+        No-op unless the test class sets ``stateless_sg`` to a true value;
+        ``secgroup_id`` is the security group that receives the rule.
+        """
+        if getattr(cls, 'stateless_sg', False):
+            # NOTE(slaweq): in case of stateless security groups, there is no
+            # "related" or "established" traffic matching at all so even if
+            # egress traffic to 169.254.169.254 is allowed by default SG, we
+            # need to explicitly allow ingress traffic from the metadata server
+            # to be able to receive responses in the guest vm
+            cls.create_security_group_rule(
+                security_group_id=secgroup_id,
+                direction=neutron_lib_constants.INGRESS_DIRECTION,
+                protocol=neutron_lib_constants.PROTO_NAME_TCP,
+                remote_ip_prefix='169.254.169.254/32',
+                description='metadata out'
+            )
+
+ @classmethod
def create_pingable_secgroup_rule(cls, secgroup_id=None,
client=None):
"""This rule is intended to permit inbound ping
diff --git a/neutron_tempest_plugin/scenario/test_security_groups.py b/neutron_tempest_plugin/scenario/test_security_groups.py
index 5af84db..7eae2eb 100644
--- a/neutron_tempest_plugin/scenario/test_security_groups.py
+++ b/neutron_tempest_plugin/scenario/test_security_groups.py
@@ -32,7 +32,7 @@
CONF = config.CONF
-class NetworkSecGroupTest(base.BaseTempestTestCase):
+class BaseNetworkSecGroupTest(base.BaseTempestTestCase):
credentials = ['primary', 'admin']
required_extensions = ['router', 'security-group']
@@ -70,17 +70,17 @@
@classmethod
def setup_credentials(cls):
- super(NetworkSecGroupTest, cls).setup_credentials()
+ super(BaseNetworkSecGroupTest, cls).setup_credentials()
cls.network_client = cls.os_admin.network_client
@classmethod
def setup_clients(cls):
- super(NetworkSecGroupTest, cls).setup_clients()
+ super(BaseNetworkSecGroupTest, cls).setup_clients()
cls.project_id = cls.os_primary.credentials.tenant_id
@classmethod
def resource_setup(cls):
- super(NetworkSecGroupTest, cls).resource_setup()
+ super(BaseNetworkSecGroupTest, cls).resource_setup()
# setup basic topology for servers we can log into it
cls.network = cls.create_network()
cls.subnet = cls.create_subnet(cls.network)
@@ -89,7 +89,7 @@
cls.keypair = cls.create_keypair()
def setUp(self):
- super(NetworkSecGroupTest, self).setUp()
+ super(BaseNetworkSecGroupTest, self).setUp()
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
self.network_client.reset_quotas, self.project_id)
self.network_client.update_quotas(self.project_id, security_group=-1)
@@ -132,12 +132,19 @@
# Add specific remote prefix to VMs and check connectivity
ssh_secgrp_name = data_utils.rand_name('ssh_secgrp')
icmp_secgrp_name = data_utils.rand_name('icmp_secgrp_with_cidr')
+ sg_kwargs = {}
+ if self.stateless_sg:
+ sg_kwargs['stateful'] = False
ssh_secgrp = self.os_primary.network_client.create_security_group(
- name=ssh_secgrp_name)
+ name=ssh_secgrp_name,
+ **sg_kwargs)
self.create_loginable_secgroup_rule(
secgroup_id=ssh_secgrp['security_group']['id'])
+ self.create_ingress_metadata_secgroup_rule(
+ secgroup_id=ssh_secgrp['security_group']['id'])
icmp_secgrp = self.os_primary.network_client.create_security_group(
- name=icmp_secgrp_name)
+ name=icmp_secgrp_name,
+ **sg_kwargs)
self.create_secgroup_rules(
rule_list, secgroup_id=icmp_secgrp['security_group']['id'])
for sec_grp in (ssh_secgrp, icmp_secgrp):
@@ -157,8 +164,7 @@
'fixed_ip_address'], should_succeed=should_succeed,
servers=servers)
- @decorators.idempotent_id('3d73ec1a-2ec6-45a9-b0f8-04a283d9d764')
- def test_default_sec_grp_scenarios(self):
+ def _test_default_sec_grp_scenarios(self):
server_ssh_clients, fips, servers = self.create_vm_testing_sec_grp()
# Check ssh connectivity when you add sec group rule, enabling ssh
self.create_loginable_secgroup_rule(
@@ -191,11 +197,15 @@
self.check_remote_connectivity(server_ssh_clients[0], ext_net_ip,
servers=servers)
- @decorators.idempotent_id('3d73ec1a-2ec6-45a9-b0f8-04a283d9d864')
- def test_protocol_number_rule(self):
+ def _test_protocol_number_rule(self):
# protocol number is added instead of str in security rule creation
name = data_utils.rand_name("test_protocol_number_rule")
- security_group = self.create_security_group(name=name)
+ sg_kwargs = {
+ 'name': name
+ }
+ if self.stateless_sg:
+ sg_kwargs['stateful'] = False
+ security_group = self.create_security_group(**sg_kwargs)
port = self.create_port(network=self.network, name=name,
security_groups=[security_group['id']])
_, fips, _ = self.create_vm_testing_sec_grp(num_servers=1,
@@ -208,19 +218,25 @@
self.create_secgroup_rules(rule_list, secgroup_id=security_group['id'])
self.ping_ip_address(fips[0]['floating_ip_address'])
- @decorators.idempotent_id('3d73ec1a-2ec6-45a9-b0f8-04a283d9d964')
- def test_two_sec_groups(self):
+ def _test_two_sec_groups(self):
# add 2 sec groups to VM and test rules of both are working
ssh_secgrp_name = data_utils.rand_name('ssh_secgrp')
icmp_secgrp_name = data_utils.rand_name('icmp_secgrp')
+ sg_kwargs = {}
+ if self.stateless_sg:
+ sg_kwargs['stateful'] = False
ssh_secgrp = self.os_primary.network_client.create_security_group(
- name=ssh_secgrp_name)
+ name=ssh_secgrp_name,
+ **sg_kwargs)
self.create_loginable_secgroup_rule(
secgroup_id=ssh_secgrp['security_group']['id'])
icmp_secgrp = self.os_primary.network_client.create_security_group(
- name=icmp_secgrp_name)
+ name=icmp_secgrp_name,
+ **sg_kwargs)
self.create_pingable_secgroup_rule(
secgroup_id=icmp_secgrp['security_group']['id'])
+ self.create_ingress_metadata_secgroup_rule(
+ secgroup_id=ssh_secgrp['security_group']['id'])
for sec_grp in (ssh_secgrp, icmp_secgrp):
self.security_groups.append(sec_grp['security_group'])
security_groups_list = [{'name': ssh_secgrp_name},
@@ -264,79 +280,22 @@
# make sure ICMP connectivity works after update
self.ping_ip_address(fips[0]['floating_ip_address'])
- @decorators.idempotent_id('3d73ec1a-2ec6-45a9-b0f8-04a283d9d664')
- def test_ip_prefix(self):
- cidr = self.subnet['cidr']
- rule_list = [{'protocol': constants.PROTO_NUM_ICMP,
- 'direction': constants.INGRESS_DIRECTION,
- 'remote_ip_prefix': cidr}]
- self._test_ip_prefix(rule_list, should_succeed=True)
-
- @decorators.attr(type='negative')
- @decorators.idempotent_id('a01cd2ef-3cfc-4614-8aac-9d1333ea21dd')
- def test_ip_prefix_negative(self):
- # define bad CIDR
- cidr = '10.100.0.254/32'
- rule_list = [{'protocol': constants.PROTO_NUM_ICMP,
- 'direction': constants.INGRESS_DIRECTION,
- 'remote_ip_prefix': cidr}]
- self._test_ip_prefix(rule_list, should_succeed=False)
-
- @decorators.idempotent_id('01f0ddca-b049-47eb-befd-82acb502c9ec')
- def test_established_tcp_session_after_re_attachinging_sg(self):
- """Test existing connection remain open after sg has been re-attached
-
- Verifies that new packets can pass over the existing connection when
- the security group has been removed from the server and then added
- back
- """
-
- ssh_sg = self.create_security_group()
- self.create_loginable_secgroup_rule(secgroup_id=ssh_sg['id'])
- vm_ssh, fips, vms = self.create_vm_testing_sec_grp(
- security_groups=[{'name': ssh_sg['name']}])
- sg = self.create_security_group()
- nc_rule = [{'protocol': constants.PROTO_NUM_TCP,
- 'direction': constants.INGRESS_DIRECTION,
- 'port_range_min': 6666,
- 'port_range_max': 6666}]
- self.create_secgroup_rules(nc_rule, secgroup_id=sg['id'])
- srv_port = self.client.list_ports(network_id=self.network['id'],
- device_id=vms[1]['server']['id'])['ports'][0]
- srv_ip = srv_port['fixed_ips'][0]['ip_address']
- with utils.StatefulConnection(
- vm_ssh[0], vm_ssh[1], srv_ip, 6666) as con:
- self.client.update_port(srv_port['id'],
- security_groups=[ssh_sg['id'], sg['id']])
- con.test_connection()
- with utils.StatefulConnection(
- vm_ssh[0], vm_ssh[1], srv_ip, 6666) as con:
- self.client.update_port(
- srv_port['id'], security_groups=[ssh_sg['id']])
- con.test_connection(should_pass=False)
- with utils.StatefulConnection(
- vm_ssh[0], vm_ssh[1], srv_ip, 6666) as con:
- self.client.update_port(srv_port['id'],
- security_groups=[ssh_sg['id'], sg['id']])
- con.test_connection()
- self.client.update_port(srv_port['id'],
- security_groups=[ssh_sg['id']])
- con.test_connection(should_pass=False)
- self.client.update_port(srv_port['id'],
- security_groups=[ssh_sg['id'], sg['id']])
- con.test_connection()
-
- @decorators.idempotent_id('7ed39b86-006d-40fb-887a-ae46693dabc9')
- def test_remote_group(self):
+ def _test_remote_group(self):
+ sg_kwargs = {}
+ if self.stateless_sg:
+ sg_kwargs['stateful'] = False
# create a new sec group
ssh_secgrp_name = data_utils.rand_name('ssh_secgrp')
ssh_secgrp = self.os_primary.network_client.create_security_group(
- name=ssh_secgrp_name)
+ name=ssh_secgrp_name,
+ **sg_kwargs)
# add cleanup
self.security_groups.append(ssh_secgrp['security_group'])
# configure sec group to support SSH connectivity
self.create_loginable_secgroup_rule(
secgroup_id=ssh_secgrp['security_group']['id'])
+ self.create_ingress_metadata_secgroup_rule(
+ secgroup_id=ssh_secgrp['security_group']['id'])
# spawn two instances with the sec group created
server_ssh_clients, fips, servers = self.create_vm_testing_sec_grp(
security_groups=[{'name': ssh_secgrp_name}])
@@ -363,14 +322,7 @@
self.ping_ip_address(fips[0]['floating_ip_address'],
should_succeed=False)
- @testtools.skipUnless(
- CONF.neutron_plugin_options.firewall_driver == 'openvswitch',
- "Openvswitch agent is required to run this test")
- @decorators.idempotent_id('678dd4c0-2953-4626-b89c-8e7e4110ec4b')
- @tempest_utils.requires_ext(extension="address-group", service="network")
- @tempest_utils.requires_ext(
- extension="security-groups-remote-address-group", service="network")
- def test_remote_group_and_remote_address_group(self):
+ def _test_remote_group_and_remote_address_group(self):
"""Test SG rules with remote group and remote address group
This test checks the ICMP connection among two servers using a security
@@ -379,10 +331,14 @@
is applied. When both rules are applied (overlapped), removing one of
them should not disable the connection.
"""
+ sg_kwargs = {}
+ if self.stateless_sg:
+ sg_kwargs['stateful'] = False
# create a new sec group
ssh_secgrp_name = data_utils.rand_name('ssh_secgrp')
ssh_secgrp = self.os_primary.network_client.create_security_group(
- name=ssh_secgrp_name)
+ name=ssh_secgrp_name,
+ **sg_kwargs)
# add cleanup
self.security_groups.append(ssh_secgrp['security_group'])
# configure sec group to support SSH connectivity
@@ -452,8 +408,7 @@
self.ping_ip_address(fips[0]['floating_ip_address'],
should_succeed=False)
- @decorators.idempotent_id('f07d0159-8f9e-4faa-87f5-a869ab0ad488')
- def test_multiple_ports_secgroup_inheritance(self):
+ def _test_multiple_ports_secgroup_inheritance(self):
"""Test multiple port security group inheritance
This test creates two ports with security groups, then
@@ -461,12 +416,18 @@
inherited properly and enforced in these instances.
"""
# create a security group and make it loginable and pingable
+ sg_kwargs = {}
+ if self.stateless_sg:
+ sg_kwargs['stateful'] = False
secgrp = self.os_primary.network_client.create_security_group(
- name=data_utils.rand_name('secgrp'))
+ name=data_utils.rand_name('secgrp'),
+ **sg_kwargs)
self.create_loginable_secgroup_rule(
secgroup_id=secgrp['security_group']['id'])
self.create_pingable_secgroup_rule(
secgroup_id=secgrp['security_group']['id'])
+ self.create_ingress_metadata_secgroup_rule(
+ secgroup_id=secgrp['security_group']['id'])
# add security group to cleanup
self.security_groups.append(secgrp['security_group'])
# create two ports with fixed IPs and the security group created
@@ -485,17 +446,30 @@
CONF.validation.image_ssh_user,
self.keypair['private_key'])
- @decorators.idempotent_id('f07d0159-8f9e-4faa-87f5-a869ab0ad489')
- def test_multiple_ports_portrange_remote(self):
+ def _test_multiple_ports_portrange_remote(self):
+ sg_kwargs = {}
+ initial_security_groups = []
+ if self.stateless_sg:
+ sg_kwargs['stateful'] = False
+ md_secgrp = self.os_primary.network_client.create_security_group(
+ name=data_utils.rand_name('metadata_secgrp'),
+ **sg_kwargs)
+ self.create_ingress_metadata_secgroup_rule(
+ secgroup_id=md_secgrp['security_group']['id'])
+ initial_security_groups.append(
+ {'name': md_secgrp['security_group']['name']})
+
ssh_clients, fips, servers = self.create_vm_testing_sec_grp(
- num_servers=3)
+ num_servers=3, security_groups=initial_security_groups)
secgroups = []
ports = []
# Create remote and test security groups
for i in range(0, 2):
secgroups.append(
- self.create_security_group(name='secgrp-%d' % i))
+ self.create_security_group(
+ name='secgrp-%d' % i,
+ **sg_kwargs))
# configure sec groups to support SSH connectivity
self.create_loginable_secgroup_rule(
secgroup_id=secgroups[-1]['id'])
@@ -535,6 +509,21 @@
'port_range_min': '82',
'port_range_max': '83',
'remote_group_id': secgroups[0]['id']}]
+ if self.stateless_sg:
+ rule_list.append({
+ 'protocol': constants.PROTO_NUM_TCP,
+ 'direction': constants.EGRESS_DIRECTION,
+ 'remote_group_id': secgroups[0]['id']})
+ # NOTE(slaweq): in case of stateless SG, the client also needs a
+ # rule which explicitly accepts ingress connections from
+ # secgroups[1]
+
+ self.create_security_group_rule(
+ security_group_id=secgroups[0]['id'],
+ protocol=constants.PROTO_NAME_TCP,
+ direction=constants.INGRESS_DIRECTION,
+ remote_group_id=secgroups[1]['id'])
+
self.create_secgroup_rules(
rule_list, secgroup_id=secgroups[1]['id'])
@@ -558,6 +547,170 @@
ssh_clients[0], ssh_clients[2], test_ip, port) as con:
con.test_connection(should_pass=False)
+ def _test_overlapping_sec_grp_rules(self):
+ """Test security group rules with overlapping port ranges"""
+ sg_kwargs = {}
+ initial_security_groups = []
+ if self.stateless_sg:
+ sg_kwargs['stateful'] = False
+ md_secgrp = self.os_primary.network_client.create_security_group(
+ name=data_utils.rand_name('metadata_secgrp'),
+ **sg_kwargs)
+ self.create_ingress_metadata_secgroup_rule(
+ secgroup_id=md_secgrp['security_group']['id'])
+ initial_security_groups.append(
+ {'name': md_secgrp['security_group']['name']})
+ client_ssh, _, vms = self.create_vm_testing_sec_grp(
+ num_servers=2, security_groups=initial_security_groups)
+ tmp_ssh, _, tmp_vm = self.create_vm_testing_sec_grp(
+ num_servers=1, security_groups=initial_security_groups)
+ srv_ssh = tmp_ssh[0]
+ srv_vm = tmp_vm[0]
+ srv_port = self.client.list_ports(network_id=self.network['id'],
+ device_id=srv_vm['server']['id'])['ports'][0]
+ srv_ip = srv_port['fixed_ips'][0]['ip_address']
+ secgrps = []
+ for i, vm in enumerate(vms):
+ sg = self.create_security_group(
+ name='secgrp-%d' % i,
+ **sg_kwargs)
+ self.create_loginable_secgroup_rule(secgroup_id=sg['id'])
+ port = self.client.list_ports(network_id=self.network['id'],
+ device_id=vm['server']['id'])['ports'][0]
+ self.client.update_port(port['id'], security_groups=[sg['id']])
+ secgrps.append(sg)
+ tcp_port = 3000
+ rule_list = [{'protocol': constants.PROTO_NUM_TCP,
+ 'direction': constants.INGRESS_DIRECTION,
+ 'port_range_min': tcp_port,
+ 'port_range_max': tcp_port,
+ 'remote_group_id': secgrps[0]['id']},
+ {'protocol': constants.PROTO_NUM_TCP,
+ 'direction': constants.INGRESS_DIRECTION,
+ 'port_range_min': tcp_port,
+ 'port_range_max': tcp_port + 2,
+ 'remote_group_id': secgrps[1]['id']}]
+ self.client.update_port(srv_port['id'],
+ security_groups=[secgrps[0]['id'], secgrps[1]['id']])
+ self.create_secgroup_rules(rule_list, secgroup_id=secgrps[0]['id'])
+
+ if self.stateless_sg:
+ # NOTE(slaweq): in case of stateless SG, the client also needs a
+ # rule which explicitly accepts ingress TCP connections: replies
+ # from the TCP server arrive on a random destination port (it
+ # depends on the source port chosen by the client while
+ # establishing the connection)
+ self.create_security_group_rule(
+ security_group_id=secgrps[0]['id'],
+ protocol=constants.PROTO_NAME_TCP,
+ direction=constants.INGRESS_DIRECTION)
+ self.create_security_group_rule(
+ security_group_id=secgrps[1]['id'],
+ protocol=constants.PROTO_NAME_TCP,
+ direction=constants.INGRESS_DIRECTION)
+
+ # The conntrack entries are ruled by the OF definitions but conntrack
+ # status can change the datapath. Let's check the rules in two
+ # attempts
+ for _ in range(2):
+ with utils.StatefulConnection(
+ client_ssh[0], srv_ssh, srv_ip, tcp_port) as con:
+ con.test_connection()
+ for port in range(tcp_port, tcp_port + 3):
+ with utils.StatefulConnection(
+ client_ssh[1], srv_ssh, srv_ip, port) as con:
+ con.test_connection()
+
+ def _test_remove_sec_grp_from_active_vm(self):
+ """Tests the following:
+
+ 1. Create SG associated with ICMP rule
+ 2. Create Port (associated with SG #1) and use it to create the VM
+ 3. Ping the VM, expected should be PASS
+ 4. Remove the security group from VM by Port update
+ 5. Ping the VM, expected should be FAIL
+ """
+ sec_grp_name = data_utils.rand_name('test_sg')
+ sg_kwargs = {
+ 'name': sec_grp_name
+ }
+ if self.stateless_sg:
+ sg_kwargs['stateful'] = False
+ secgrp = self.os_primary.network_client.create_security_group(
+ **sg_kwargs)
+ self.security_groups.append(secgrp['security_group'])
+ sec_grp_id = secgrp['security_group']['id']
+ self.create_pingable_secgroup_rule(sec_grp_id)
+
+ ex_port = self.create_port(
+ self.network, fixed_ips=[{'subnet_id': self.subnet['id']}],
+ security_groups=[sec_grp_id])
+ fip = self.create_vm_testing_sec_grp(
+ num_servers=1, security_groups=[{'name': sec_grp_name}],
+ ports=[ex_port])[1][0]
+
+ self.ping_ip_address(fip['floating_ip_address'])
+ self.client.update_port(ex_port['id'],
+ security_groups=[])
+ self.ping_ip_address(fip['floating_ip_address'],
+ should_succeed=False)
+
+
+class StatefulNetworkSecGroupTest(BaseNetworkSecGroupTest):
+ stateless_sg = False
+
+ @decorators.idempotent_id('3d73ec1a-2ec6-45a9-b0f8-04a283d9d764')
+ def test_default_sec_grp_scenarios(self):
+ self._test_default_sec_grp_scenarios()
+
+ @decorators.idempotent_id('3d73ec1a-2ec6-45a9-b0f8-04a283d9d864')
+ def test_protocol_number_rule(self):
+ self._test_protocol_number_rule()
+
+ @decorators.idempotent_id('3d73ec1a-2ec6-45a9-b0f8-04a283d9d964')
+ def test_two_sec_groups(self):
+ self._test_two_sec_groups()
+
+ @decorators.idempotent_id('3d73ec1a-2ec6-45a9-b0f8-04a283d9d664')
+ def test_ip_prefix(self):
+ cidr = self.subnet['cidr']
+ rule_list = [{'protocol': constants.PROTO_NUM_ICMP,
+ 'direction': constants.INGRESS_DIRECTION,
+ 'remote_ip_prefix': cidr}]
+ self._test_ip_prefix(rule_list, should_succeed=True)
+
+ @decorators.attr(type='negative')
+ @decorators.idempotent_id('a01cd2ef-3cfc-4614-8aac-9d1333ea21dd')
+ def test_ip_prefix_negative(self):
+ # define bad CIDR
+ cidr = '10.100.0.254/32'
+ rule_list = [{'protocol': constants.PROTO_NUM_ICMP,
+ 'direction': constants.INGRESS_DIRECTION,
+ 'remote_ip_prefix': cidr}]
+ self._test_ip_prefix(rule_list, should_succeed=False)
+
+ @decorators.idempotent_id('7ed39b86-006d-40fb-887a-ae46693dabc9')
+ def test_remote_group(self):
+ self._test_remote_group()
+
+ @testtools.skipUnless(
+ CONF.neutron_plugin_options.firewall_driver == 'openvswitch',
+ "Openvswitch agent is required to run this test")
+ @decorators.idempotent_id('678dd4c0-2953-4626-b89c-8e7e4110ec4b')
+ @tempest_utils.requires_ext(extension="address-group", service="network")
+ @tempest_utils.requires_ext(
+ extension="security-groups-remote-address-group", service="network")
+ def test_remote_group_and_remote_address_group(self):
+ self._test_remote_group_and_remote_address_group()
+
+ @decorators.idempotent_id('f07d0159-8f9e-4faa-87f5-a869ab0ad488')
+ def test_multiple_ports_secgroup_inheritance(self):
+ self._test_multiple_ports_secgroup_inheritance()
+
+ @decorators.idempotent_id('f07d0159-8f9e-4faa-87f5-a869ab0ad489')
+ def test_multiple_ports_portrange_remote(self):
+ self._test_multiple_ports_portrange_remote()
+
@decorators.idempotent_id('f07d0159-8f9e-4faa-87f5-a869ab0ad490')
def test_intra_sg_isolation(self):
"""Test intra security group isolation
@@ -570,8 +723,11 @@
"""
# create a security group and make it loginable
secgrp_name = data_utils.rand_name('secgrp')
+ sg_kwargs = {
+ 'name': secgrp_name
+ }
secgrp = self.os_primary.network_client.create_security_group(
- name=secgrp_name)
+ **sg_kwargs)
secgrp_id = secgrp['security_group']['id']
# add security group to cleanup
self.security_groups.append(secgrp['security_group'])
@@ -633,74 +789,120 @@
@decorators.idempotent_id('cd66b826-d86c-4fb4-ab37-17c8391753cb')
def test_overlapping_sec_grp_rules(self):
- """Test security group rules with overlapping port ranges"""
- client_ssh, _, vms = self.create_vm_testing_sec_grp(num_servers=2)
- tmp_ssh, _, tmp_vm = self.create_vm_testing_sec_grp(num_servers=1)
- srv_ssh = tmp_ssh[0]
- srv_vm = tmp_vm[0]
- srv_port = self.client.list_ports(network_id=self.network['id'],
- device_id=srv_vm['server']['id'])['ports'][0]
- srv_ip = srv_port['fixed_ips'][0]['ip_address']
- secgrps = []
- for i, vm in enumerate(vms):
- sg = self.create_security_group(name='secgrp-%d' % i)
- self.create_loginable_secgroup_rule(secgroup_id=sg['id'])
- port = self.client.list_ports(network_id=self.network['id'],
- device_id=vm['server']['id'])['ports'][0]
- self.client.update_port(port['id'], security_groups=[sg['id']])
- secgrps.append(sg)
- tcp_port = 3000
- rule_list = [{'protocol': constants.PROTO_NUM_TCP,
- 'direction': constants.INGRESS_DIRECTION,
- 'port_range_min': tcp_port,
- 'port_range_max': tcp_port,
- 'remote_group_id': secgrps[0]['id']},
- {'protocol': constants.PROTO_NUM_TCP,
- 'direction': constants.INGRESS_DIRECTION,
- 'port_range_min': tcp_port,
- 'port_range_max': tcp_port + 2,
- 'remote_group_id': secgrps[1]['id']}]
- self.client.update_port(srv_port['id'],
- security_groups=[secgrps[0]['id'], secgrps[1]['id']])
- self.create_secgroup_rules(rule_list, secgroup_id=secgrps[0]['id'])
- # The conntrack entries are ruled by the OF definitions but conntrack
- # status can change the datapath. Let's check the rules in two
- # attempts
- for _ in range(2):
- with utils.StatefulConnection(
- client_ssh[0], srv_ssh, srv_ip, tcp_port) as con:
- con.test_connection()
- for port in range(tcp_port, tcp_port + 3):
- with utils.StatefulConnection(
- client_ssh[1], srv_ssh, srv_ip, port) as con:
- con.test_connection()
+ self._test_overlapping_sec_grp_rules()
@decorators.idempotent_id('96dcd5ff-9d45-4e0d-bea0-0b438cbd388f')
def test_remove_sec_grp_from_active_vm(self):
- """Tests the following:
+ self._test_remove_sec_grp_from_active_vm()
- 1. Create SG associated with ICMP rule
- 2. Create Port (assoiated to SG #1) and use it to create the VM
- 3. Ping the VM, expected should be PASS
- 4. Remove the security group from VM by Port update
- 5. Ping the VM, expected should be FAIL
+ @decorators.idempotent_id('01f0ddca-b049-47eb-befd-82acb502c9ec')
+ def test_established_tcp_session_after_re_attachinging_sg(self):
+ """Test existing connection remain open after sg has been re-attached
+
+ Verifies that new packets can pass over the existing connection when
+ the security group has been removed from the server and then added
+ back
"""
- sec_grp_name = data_utils.rand_name('test_sg')
- secgrp = self.os_primary.network_client.create_security_group(
- name=sec_grp_name)
- self.security_groups.append(secgrp['security_group'])
- sec_grp_id = secgrp['security_group']['id']
- self.create_pingable_secgroup_rule(sec_grp_id)
- ex_port = self.create_port(
- self.network, fixed_ips=[{'subnet_id': self.subnet['id']}],
- security_groups=[sec_grp_id])
- fip = self.create_vm_testing_sec_grp(
- num_servers=1, security_groups=[{'name': sec_grp_name}],
- ports=[ex_port])[1][0]
+ sg_kwargs = {}
+ if self.stateless_sg:
+ sg_kwargs['stateful'] = False
+ ssh_sg = self.create_security_group(**sg_kwargs)
+ self.create_loginable_secgroup_rule(secgroup_id=ssh_sg['id'])
+ vm_ssh, fips, vms = self.create_vm_testing_sec_grp(
+ security_groups=[{'name': ssh_sg['name']}])
+ sg = self.create_security_group(**sg_kwargs)
+ nc_rule = [{'protocol': constants.PROTO_NUM_TCP,
+ 'direction': constants.INGRESS_DIRECTION,
+ 'port_range_min': 6666,
+ 'port_range_max': 6666}]
+ self.create_secgroup_rules(nc_rule, secgroup_id=sg['id'])
+ srv_port = self.client.list_ports(network_id=self.network['id'],
+ device_id=vms[1]['server']['id'])['ports'][0]
+ srv_ip = srv_port['fixed_ips'][0]['ip_address']
+ with utils.StatefulConnection(
+ vm_ssh[0], vm_ssh[1], srv_ip, 6666) as con:
+ self.client.update_port(srv_port['id'],
+ security_groups=[ssh_sg['id'], sg['id']])
+ con.test_connection()
+ with utils.StatefulConnection(
+ vm_ssh[0], vm_ssh[1], srv_ip, 6666) as con:
+ self.client.update_port(
+ srv_port['id'], security_groups=[ssh_sg['id']])
+ con.test_connection(should_pass=False)
+ with utils.StatefulConnection(
+ vm_ssh[0], vm_ssh[1], srv_ip, 6666) as con:
+ self.client.update_port(srv_port['id'],
+ security_groups=[ssh_sg['id'], sg['id']])
+ con.test_connection()
+ self.client.update_port(srv_port['id'],
+ security_groups=[ssh_sg['id']])
+ con.test_connection(should_pass=False)
+ self.client.update_port(srv_port['id'],
+ security_groups=[ssh_sg['id'], sg['id']])
+ con.test_connection()
- self.ping_ip_address(fip['floating_ip_address'])
- self.client.update_port(ex_port['id'],
- security_groups=[])
- self.ping_ip_address(fip['floating_ip_address'],
- should_succeed=False)
+
+class StatelessNetworkSecGroupTest(BaseNetworkSecGroupTest):
+ required_extensions = ['security-group', 'stateful-security-group']
+ stateless_sg = True
+
+ @decorators.idempotent_id('9e193e3f-56f2-4f4e-886c-988a147958ef')
+ def test_default_sec_grp_scenarios(self):
+ self._test_default_sec_grp_scenarios()
+
+ @decorators.idempotent_id('afae8654-a389-4887-b21d-7f07ec350177')
+ def test_protocol_number_rule(self):
+ self._test_protocol_number_rule()
+
+ @decorators.idempotent_id('b51cc0eb-8f9a-49e7-96ab-61cd31243b67')
+ def test_two_sec_groups(self):
+ self._test_two_sec_groups()
+
+ @decorators.idempotent_id('07985496-58da-4c1f-a6ef-2fdd88128a81')
+ def test_ip_prefix(self):
+ cidr = self.subnet['cidr']
+ rule_list = [{'protocol': constants.PROTO_NUM_ICMP,
+ 'direction': constants.INGRESS_DIRECTION,
+ 'remote_ip_prefix': cidr}]
+ self._test_ip_prefix(rule_list, should_succeed=True)
+
+ @decorators.attr(type='negative')
+ @decorators.idempotent_id('1ad469c4-0d8f-42ae-8ec3-46cc424565c4')
+ def test_ip_prefix_negative(self):
+ # define bad CIDR
+ cidr = '10.100.0.254/32'
+ rule_list = [{'protocol': constants.PROTO_NUM_ICMP,
+ 'direction': constants.INGRESS_DIRECTION,
+ 'remote_ip_prefix': cidr}]
+ self._test_ip_prefix(rule_list, should_succeed=False)
+
+ @decorators.idempotent_id('fa1e93bf-67c5-4590-9962-38ee1f43a46a')
+ def test_remote_group(self):
+ self._test_remote_group()
+
+ @testtools.skipUnless(
+ CONF.neutron_plugin_options.firewall_driver == 'openvswitch',
+ "Openvswitch agent is required to run this test")
+ @decorators.idempotent_id('9fae530d-2711-4c61-a4a5-8efe6e58ab14')
+ @tempest_utils.requires_ext(extension="address-group", service="network")
+ @tempest_utils.requires_ext(
+ extension="security-groups-remote-address-group", service="network")
+ def test_remote_group_and_remote_address_group(self):
+ self._test_remote_group_and_remote_address_group()
+
+ @decorators.idempotent_id('4f1eb6db-ae7f-4f26-b371-cbd8363f9b0b')
+ def test_multiple_ports_secgroup_inheritance(self):
+ self._test_multiple_ports_secgroup_inheritance()
+
+ @decorators.idempotent_id('4043ca0a-eabb-4198-be53-3d3051cc0804')
+ def test_multiple_ports_portrange_remote(self):
+ self._test_multiple_ports_portrange_remote()
+
+ @decorators.idempotent_id('bfe25138-ceac-4944-849a-b9b90aff100f')
+ def test_overlapping_sec_grp_rules(self):
+ self._test_overlapping_sec_grp_rules()
+
+ @decorators.idempotent_id('e4340e47-39cd-49ed-967c-fc2c40b47c5a')
+ def test_remove_sec_grp_from_active_vm(self):
+ self._test_remove_sec_grp_from_active_vm()
diff --git a/zuul.d/master_jobs.yaml b/zuul.d/master_jobs.yaml
index fba4610..93c8fe2 100644
--- a/zuul.d/master_jobs.yaml
+++ b/zuul.d/master_jobs.yaml
@@ -338,7 +338,8 @@
# bug https://bugzilla.redhat.com/show_bug.cgi?id=1965036 will be fixed
tempest_exclude_regex: "\
(^neutron_tempest_plugin.scenario.test_trunk.TrunkTest.test_subport_connectivity)|\
- (^neutron_tempest_plugin.scenario.test_security_groups.NetworkSecGroupTest.test_established_tcp_session_after_re_attachinging_sg)"
+ (^neutron_tempest_plugin.scenario.test_security_groups.StatefulNetworkSecGroupTest.test_established_tcp_session_after_re_attachinging_sg)|\
+ (^neutron_tempest_plugin.scenario.test_security_groups.StatelessNetworkSecGroupTest.test_established_tcp_session_after_re_attachinging_sg)"
devstack_localrc:
Q_AGENT: openvswitch
Q_ML2_TENANT_NETWORK_TYPE: vxlan
@@ -487,7 +488,8 @@
# fixed
tempest_exclude_regex: "\
(^neutron_tempest_plugin.scenario.test_vlan_transparency.VlanTransparencyTest)|\
- (^neutron_tempest_plugin.scenario.test_security_groups.NetworkSecGroupTest.test_established_tcp_session_after_re_attachinging_sg)|\
+ (^neutron_tempest_plugin.scenario.test_security_groups.StatefulNetworkSecGroupTest.test_established_tcp_session_after_re_attachinging_sg)|\
+ (^neutron_tempest_plugin.scenario.test_security_groups.StatelessNetworkSecGroupTest.test_established_tcp_session_after_re_attachinging_sg)|\
(^neutron_tempest_plugin.scenario.test_floatingip.FloatingIPPortDetailsTest.test_floatingip_port_details)"
devstack_localrc:
Q_AGENT: linuxbridge
diff --git a/zuul.d/zed_jobs.yaml b/zuul.d/zed_jobs.yaml
index acbd234..f6e143f 100644
--- a/zuul.d/zed_jobs.yaml
+++ b/zuul.d/zed_jobs.yaml
@@ -122,7 +122,8 @@
# bug https://bugzilla.redhat.com/show_bug.cgi?id=1965036 will be fixed
tempest_exclude_regex: "\
(^neutron_tempest_plugin.scenario.test_trunk.TrunkTest.test_subport_connectivity)|\
- (^neutron_tempest_plugin.scenario.test_security_groups.NetworkSecGroupTest.test_established_tcp_session_after_re_attachinging_sg)"
+ (^neutron_tempest_plugin.scenario.test_security_groups.StatefulNetworkSecGroupTest.test_established_tcp_session_after_re_attachinging_sg)|\
+ (^neutron_tempest_plugin.scenario.test_security_groups.StatelessNetworkSecGroupTest.test_established_tcp_session_after_re_attachinging_sg)"
network_available_features: *available_features
devstack_localrc:
NETWORK_API_EXTENSIONS: "{{ (network_api_extensions_common + network_api_extensions_openvswitch) | join(',') }}"
@@ -146,7 +147,8 @@
(^tempest.api.compute.servers.test_multiple_create)"
tempest_exclude_regex: "\
(^neutron_tempest_plugin.scenario.test_vlan_transparency.VlanTransparencyTest)|\
- (^neutron_tempest_plugin.scenario.test_security_groups.NetworkSecGroupTest.test_established_tcp_session_after_re_attachinging_sg)|\
+ (^neutron_tempest_plugin.scenario.test_security_groups.StatefulNetworkSecGroupTest.test_established_tcp_session_after_re_attachinging_sg)|\
+ (^neutron_tempest_plugin.scenario.test_security_groups.StatelessNetworkSecGroupTest.test_established_tcp_session_after_re_attachinging_sg)|\
(^neutron_tempest_plugin.scenario.test_floatingip.FloatingIPPortDetailsTest.test_floatingip_port_details)"
network_available_features: *available_features
devstack_localrc: