Merge "[fwaas] Remove secgroup creation related methods"
diff --git a/neutron_tempest_plugin/fwaas/scenario/fwaas_v2_manager.py b/neutron_tempest_plugin/fwaas/scenario/fwaas_v2_manager.py
index 07d0060..decab10 100644
--- a/neutron_tempest_plugin/fwaas/scenario/fwaas_v2_manager.py
+++ b/neutron_tempest_plugin/fwaas/scenario/fwaas_v2_manager.py
@@ -455,176 +455,6 @@
floating_ip['id'])
return floating_ip
- def _create_security_group(self, security_group_rules_client=None,
- tenant_id=None,
- namestart='secgroup-smoke',
- security_groups_client=None):
- if security_group_rules_client is None:
- security_group_rules_client = self.security_group_rules_client
- if security_groups_client is None:
- security_groups_client = self.security_groups_client
- if tenant_id is None:
- tenant_id = security_groups_client.tenant_id
- secgroup = self._create_empty_security_group(
- namestart=namestart, client=security_groups_client,
- tenant_id=tenant_id)
-
- # Add rules to the security group
- rules = self._create_loginable_secgroup_rule(
- security_group_rules_client=security_group_rules_client,
- secgroup=secgroup,
- security_groups_client=security_groups_client)
- for rule in rules:
- self.assertEqual(tenant_id, rule['tenant_id'])
- self.assertEqual(secgroup['id'], rule['security_group_id'])
- return secgroup
-
- def _create_empty_security_group(self, client=None, tenant_id=None,
- namestart='secgroup-smoke'):
- """Create a security group without rules.
-
- Default rules will be created:
- - IPv4 egress to any
- - IPv6 egress to any
-
- :param tenant_id: secgroup will be created in this tenant
- :returns: the created security group
- """
- if client is None:
- client = self.security_groups_client
- if not tenant_id:
- tenant_id = client.tenant_id
- sg_name = data_utils.rand_name(namestart)
- sg_desc = sg_name + " description"
- sg_dict = dict(name=sg_name,
- description=sg_desc)
- sg_dict['tenant_id'] = tenant_id
- result = client.create_security_group(**sg_dict)
-
- secgroup = result['security_group']
- self.assertEqual(secgroup['name'], sg_name)
- self.assertEqual(tenant_id, secgroup['tenant_id'])
- self.assertEqual(secgroup['description'], sg_desc)
-
- self.addCleanup(test_utils.call_and_ignore_notfound_exc,
- client.delete_security_group, secgroup['id'])
- return secgroup
-
- def _default_security_group(self, client=None, tenant_id=None):
- """Get default secgroup for given tenant_id.
-
- :returns: default secgroup for given tenant
- """
- if client is None:
- client = self.security_groups_client
- if not tenant_id:
- tenant_id = client.tenant_id
- sgs = [
- sg for sg in list(client.list_security_groups().values())[0]
- if sg['tenant_id'] == tenant_id and sg['name'] == 'default'
- ]
- msg = "No default security group for tenant %s." % (tenant_id)
- self.assertGreater(len(sgs), 0, msg)
- return sgs[0]
-
- def _create_security_group_rule(self, secgroup=None,
- sec_group_rules_client=None,
- tenant_id=None,
- security_groups_client=None, **kwargs):
- """Create a rule from a dictionary of rule parameters.
-
- Create a rule in a secgroup. if secgroup not defined will search for
- default secgroup in tenant_id.
-
- :param secgroup: the security group.
- :param tenant_id: if secgroup not passed -- the tenant in which to
- search for default secgroup
- :param kwargs: a dictionary containing rule parameters:
- for example, to allow incoming ssh:
- rule = {
- direction: 'ingress'
- protocol:'tcp',
- port_range_min: 22,
- port_range_max: 22
- }
- """
- if sec_group_rules_client is None:
- sec_group_rules_client = self.security_group_rules_client
- if security_groups_client is None:
- security_groups_client = self.security_groups_client
- if not tenant_id:
- tenant_id = security_groups_client.tenant_id
- if secgroup is None:
- secgroup = self._default_security_group(
- client=security_groups_client, tenant_id=tenant_id)
-
- ruleset = dict(security_group_id=secgroup['id'],
- tenant_id=secgroup['tenant_id'])
- ruleset.update(kwargs)
-
- sg_rule = sec_group_rules_client.create_security_group_rule(**ruleset)
- sg_rule = sg_rule['security_group_rule']
-
- self.assertEqual(secgroup['tenant_id'], sg_rule['tenant_id'])
- self.assertEqual(secgroup['id'], sg_rule['security_group_id'])
-
- return sg_rule
-
- def _create_loginable_secgroup_rule(self, security_group_rules_client=None,
- secgroup=None,
- security_groups_client=None):
- """Create loginable security group rule
-
- This function will create:
- 1. egress and ingress tcp port 22 allow rule in order to allow ssh
- access for ipv4.
- 2. egress and ingress ipv6 icmp allow rule, in order to allow icmpv6.
- 3. egress and ingress ipv4 icmp allow rule, in order to allow icmpv4.
- """
-
- if security_group_rules_client is None:
- security_group_rules_client = self.security_group_rules_client
- if security_groups_client is None:
- security_groups_client = self.security_groups_client
- rules = []
- rulesets = [
- dict(
- # ssh
- protocol='tcp',
- port_range_min=22,
- port_range_max=22,
- ),
- dict(
- # ping
- protocol='icmp',
- ),
- dict(
- # ipv6-icmp for ping6
- protocol='icmp',
- ethertype='IPv6',
- )
- ]
- sec_group_rules_client = security_group_rules_client
- for ruleset in rulesets:
- for r_direction in ['ingress', 'egress']:
- ruleset['direction'] = r_direction
- try:
- sg_rule = self._create_security_group_rule(
- sec_group_rules_client=sec_group_rules_client,
- secgroup=secgroup,
- security_groups_client=security_groups_client,
- **ruleset)
- except lib_exc.Conflict as ex:
- # if rule already exist - skip rule and continue
- msg = 'Security group rule already exists'
- if msg not in ex._error_string:
- raise ex
- else:
- self.assertEqual(r_direction, sg_rule['direction'])
- rules.append(sg_rule)
-
- return rules
-
def _create_router(self, client=None, tenant_id=None,
namestart='router-smoke'):
if not client:
diff --git a/neutron_tempest_plugin/fwaas/scenario/test_fwaas_v2.py b/neutron_tempest_plugin/fwaas/scenario/test_fwaas_v2.py
index 4681b88..8f0c0d5 100644
--- a/neutron_tempest_plugin/fwaas/scenario/test_fwaas_v2.py
+++ b/neutron_tempest_plugin/fwaas/scenario/test_fwaas_v2.py
@@ -196,7 +196,7 @@
resp['router_portid_2'] = router_portid_2
# Create a VM on each of the network and assign it a floating IP.
- security_group = self._create_security_group()
+ security_group = self.create_security_group()
server1, private_key1, server_fixed_ip_1, server_floating_ip_1 = (
self._create_test_server(network1, security_group))
server2, private_key2, server_fixed_ip_2, server_floating_ip_2 = (