Improve security groups management.
- Add create_security_group_rule method to base class
to make it easier to create rules for IPv6 test cases.
- Add delete_security_group method.
- Make sure security groups are deleted using the
client that has been used to create them.
- Improve security group client handling.
Change-Id: I50858d5198d8a70a2bb9fb278786c433d7cb12ca
diff --git a/neutron_tempest_plugin/api/base.py b/neutron_tempest_plugin/api/base.py
index 1b8239b..bcafe03 100644
--- a/neutron_tempest_plugin/api/base.py
+++ b/neutron_tempest_plugin/api/base.py
@@ -189,15 +189,15 @@
network['id'])
# Clean up security groups
- for secgroup in cls.security_groups:
- cls._try_delete_resource(cls.client.delete_security_group,
- secgroup['id'])
+ for security_group in cls.security_groups:
+ cls._try_delete_resource(cls.delete_security_group,
+ security_group)
# Clean up admin security groups
- for secgroup in cls.admin_security_groups:
- cls._try_delete_resource(
- cls.admin_client.delete_security_group,
- secgroup['id'])
+ for security_group in cls.admin_security_groups:
+ cls._try_delete_resource(cls.delete_security_group,
+ security_group,
+ client=cls.admin_client)
for subnetpool in cls.subnetpools:
cls._try_delete_resource(cls.client.delete_subnetpool,
@@ -717,18 +717,78 @@
description=test_description)['project']
cls.projects.append(project)
# Create a project will create a default security group.
- # We make these security groups into admin_security_groups.
sgs_list = cls.admin_client.list_security_groups(
tenant_id=project['id'])['security_groups']
- for sg in sgs_list:
- cls.admin_security_groups.append(sg)
+ for security_group in sgs_list:
+ # Make sure delete_security_group method will use
+ # the admin client for this group
+ security_group['client'] = cls.admin_client
+ cls.security_groups.append(security_group)
return project
@classmethod
- def create_security_group(cls, name, **kwargs):
- body = cls.client.create_security_group(name=name, **kwargs)
- cls.security_groups.append(body['security_group'])
- return body['security_group']
+ def create_security_group(cls, name=None, project=None, client=None,
+ **kwargs):
+ if project:
+ client = client or cls.admin_client
+ project_id = kwargs.setdefault('project_id', project['id'])
+ tenant_id = kwargs.setdefault('tenant_id', project['id'])
+ if project_id != project['id'] or tenant_id != project['id']:
+ raise ValueError('Project ID specified multiple times')
+ else:
+ client = client or cls.client
+
+ name = name or data_utils.rand_name(cls.__name__)
+ security_group = client.create_security_group(name=name, **kwargs)[
+ 'security_group']
+ security_group['client'] = client
+ cls.security_groups.append(security_group)
+ return security_group
+
+ @classmethod
+ def delete_security_group(cls, security_group, client=None):
+ client = client or security_group.get('client') or cls.client
+ client.delete_security_group(security_group['id'])
+
+ @classmethod
+ def create_security_group_rule(cls, security_group=None, project=None,
+ client=None, ip_version=None, **kwargs):
+ if project:
+ client = client or cls.admin_client
+ project_id = kwargs.setdefault('project_id', project['id'])
+ tenant_id = kwargs.setdefault('tenant_id', project['id'])
+ if project_id != project['id'] or tenant_id != project['id']:
+ raise ValueError('Project ID specified multiple times')
+
+ if 'security_group_id' not in kwargs:
+ security_group = (security_group or
+ cls.get_security_group(client=client))
+
+ if security_group:
+ client = client or security_group.get('client')
+ security_group_id = kwargs.setdefault('security_group_id',
+ security_group['id'])
+ if security_group_id != security_group['id']:
+ raise ValueError('Security group ID specified multiple times.')
+
+ ip_version = ip_version or cls._ip_version
+ default_params = (
+ constants.DEFAULT_SECURITY_GROUP_RULE_PARAMS[ip_version])
+ for key, value in default_params.items():
+ kwargs.setdefault(key, value)
+
+ client = client or cls.client
+ return client.create_security_group_rule(**kwargs)[
+ 'security_group_rule']
+
+ @classmethod
+ def get_security_group(cls, name='default', client=None):
+ client = client or cls.client
+ security_groups = client.list_security_groups()['security_groups']
+ for security_group in security_groups:
+ if security_group['name'] == name:
+ return security_group
+ raise ValueError("No such security group named {!r}".format(name))
@classmethod
def create_keypair(cls, client=None, name=None, **kwargs):
diff --git a/neutron_tempest_plugin/common/constants.py b/neutron_tempest_plugin/common/constants.py
index 4dc7844..f695f6c 100644
--- a/neutron_tempest_plugin/common/constants.py
+++ b/neutron_tempest_plugin/common/constants.py
@@ -171,3 +171,11 @@
# Possible types of values (e.g. in QoS rule types)
VALUES_TYPE_CHOICES = "choices"
VALUES_TYPE_RANGE = "range"
+
+# Security group parameters values mapped by IP version
+DEFAULT_SECURITY_GROUP_RULE_PARAMS = {
+ lib_constants.IP_VERSION_4: {'ethertype': lib_constants.IPv4,
+ 'remote_ip_prefix': lib_constants.IPv4_ANY},
+ lib_constants.IP_VERSION_6: {'ethertype': lib_constants.IPv6,
+ 'remote_ip_prefix': lib_constants.IPv6_ANY},
+}
diff --git a/neutron_tempest_plugin/scenario/base.py b/neutron_tempest_plugin/scenario/base.py
index 32c5db8..cc1ca4c 100644
--- a/neutron_tempest_plugin/scenario/base.py
+++ b/neutron_tempest_plugin/scenario/base.py
@@ -122,29 +122,24 @@
Setting a group_id would only permit traffic from ports
belonging to the same security group.
"""
-
- rule_list = [{'protocol': 'tcp',
- 'direction': 'ingress',
- 'port_range_min': 22,
- 'port_range_max': 22,
- 'remote_ip_prefix': '0.0.0.0/0'}]
- client = client or cls.os_primary.network_client
- cls.create_secgroup_rules(rule_list, client=client,
- secgroup_id=secgroup_id)
+ return cls.create_security_group_rule(
+ security_group_id=secgroup_id,
+ client=client,
+ protocol=neutron_lib_constants.PROTO_NAME_TCP,
+ direction=neutron_lib_constants.INGRESS_DIRECTION,
+ port_range_min=22,
+ port_range_max=22)
@classmethod
def create_pingable_secgroup_rule(cls, secgroup_id=None,
client=None):
- """This rule is intended to permit inbound ping"""
+ """This rule is intended to permit inbound ping
- rule_list = [{'protocol': 'icmp',
- 'direction': 'ingress',
- 'port_range_min': 8, # type
- 'port_range_max': 0, # code
- 'remote_ip_prefix': '0.0.0.0/0'}]
- client = client or cls.os_primary.network_client
- cls.create_secgroup_rules(rule_list, client=client,
- secgroup_id=secgroup_id)
+ """
+ return cls.create_security_group_rule(
+ security_group_id=secgroup_id, client=client,
+ protocol=neutron_lib_constants.PROTO_NAME_ICMP,
+ direction=neutron_lib_constants.INGRESS_DIRECTION)
@classmethod
def create_router_by_client(cls, is_admin=False, **kwargs):