Improve security groups management.
- Add create_security_group_rule method to base class
to make it easier to create rules for IPv6 test cases.
- Add delete_security_group method.
- Make sure security groups are deleted using the
client that was used to create them.
- Improve security group client handling.
Change-Id: I50858d5198d8a70a2bb9fb278786c433d7cb12ca
diff --git a/neutron_tempest_plugin/api/base.py b/neutron_tempest_plugin/api/base.py
index 1b8239b..bcafe03 100644
--- a/neutron_tempest_plugin/api/base.py
+++ b/neutron_tempest_plugin/api/base.py
@@ -189,15 +189,15 @@
network['id'])
# Clean up security groups
- for secgroup in cls.security_groups:
- cls._try_delete_resource(cls.client.delete_security_group,
- secgroup['id'])
+ for security_group in cls.security_groups:
+ cls._try_delete_resource(cls.delete_security_group,
+ security_group)
# Clean up admin security groups
- for secgroup in cls.admin_security_groups:
- cls._try_delete_resource(
- cls.admin_client.delete_security_group,
- secgroup['id'])
+ for security_group in cls.admin_security_groups:
+ cls._try_delete_resource(cls.delete_security_group,
+ security_group,
+ client=cls.admin_client)
for subnetpool in cls.subnetpools:
cls._try_delete_resource(cls.client.delete_subnetpool,
@@ -717,18 +717,78 @@
description=test_description)['project']
cls.projects.append(project)
# Create a project will create a default security group.
- # We make these security groups into admin_security_groups.
sgs_list = cls.admin_client.list_security_groups(
tenant_id=project['id'])['security_groups']
- for sg in sgs_list:
- cls.admin_security_groups.append(sg)
+ for security_group in sgs_list:
+ # Make sure delete_security_group method will use
+ # the admin client for this group
+ security_group['client'] = cls.admin_client
+ cls.security_groups.append(security_group)
return project
@classmethod
- def create_security_group(cls, name, **kwargs):
- body = cls.client.create_security_group(name=name, **kwargs)
- cls.security_groups.append(body['security_group'])
- return body['security_group']
+ def create_security_group(cls, name=None, project=None, client=None,
+ **kwargs):
+ if project:
+ client = client or cls.admin_client
+ project_id = kwargs.setdefault('project_id', project['id'])
+ tenant_id = kwargs.setdefault('tenant_id', project['id'])
+ if project_id != project['id'] or tenant_id != project['id']:
+ raise ValueError('Project ID specified multiple times')
+ else:
+ client = client or cls.client
+
+ name = name or data_utils.rand_name(cls.__name__)
+ security_group = client.create_security_group(name=name, **kwargs)[
+ 'security_group']
+ security_group['client'] = client
+ cls.security_groups.append(security_group)
+ return security_group
+
+ @classmethod
+ def delete_security_group(cls, security_group, client=None):
+ client = client or security_group.get('client') or cls.client
+ client.delete_security_group(security_group['id'])
+
+ @classmethod
+ def create_security_group_rule(cls, security_group=None, project=None,
+ client=None, ip_version=None, **kwargs):
+ if project:
+ client = client or cls.admin_client
+ project_id = kwargs.setdefault('project_id', project['id'])
+ tenant_id = kwargs.setdefault('tenant_id', project['id'])
+ if project_id != project['id'] or tenant_id != project['id']:
+ raise ValueError('Project ID specified multiple times')
+
+ if 'security_group_id' not in kwargs:
+ security_group = (security_group or
+ cls.get_security_group(client=client))
+
+ if security_group:
+ client = client or security_group.get('client')
+ security_group_id = kwargs.setdefault('security_group_id',
+ security_group['id'])
+ if security_group_id != security_group['id']:
+ raise ValueError('Security group ID specified multiple times.')
+
+ ip_version = ip_version or cls._ip_version
+ default_params = (
+ constants.DEFAULT_SECURITY_GROUP_RULE_PARAMS[ip_version])
+ for key, value in default_params.items():
+ kwargs.setdefault(key, value)
+
+ client = client or cls.client
+ return client.create_security_group_rule(**kwargs)[
+ 'security_group_rule']
+
+ @classmethod
+ def get_security_group(cls, name='default', client=None):
+ client = client or cls.client
+ security_groups = client.list_security_groups()['security_groups']
+ for security_group in security_groups:
+ if security_group['name'] == name:
+ return security_group
+ raise ValueError("No such security group named {!r}".format(name))
@classmethod
def create_keypair(cls, client=None, name=None, **kwargs):