[Default SG rules] Test to check if SG rules are created from template
This patch adds new test which checks if SG rules actually created
automatically for new default and non-default SG are matching template
rules from neutron DB.
Depends-On: https://review.opendev.org/c/openstack/neutron/+/884474
Related-bug: #1983053
Change-Id: Ica0810413bef7f0e3e6dff21f6c9e4cda1945a43
diff --git a/neutron_tempest_plugin/api/admin/test_default_security_group_rules.py b/neutron_tempest_plugin/api/admin/test_default_security_group_rules.py
index 826e2ac..604dbea 100644
--- a/neutron_tempest_plugin/api/admin/test_default_security_group_rules.py
+++ b/neutron_tempest_plugin/api/admin/test_default_security_group_rules.py
@@ -28,7 +28,7 @@
]
-class DefaultSecurityGroupRuleTest(base.BaseNetworkTest):
+class DefaultSecurityGroupRuleTest(base.BaseAdminNetworkTest):
required_extensions = ['security-groups-default-rules']
credentials = ['primary', 'admin']
@@ -38,19 +38,19 @@
super(DefaultSecurityGroupRuleTest, cls).setup_clients()
cls.admin_client = cls.os_admin.network_client
- def _filter_not_relevant_rule_keys(self, rule):
+ def _filter_not_relevant_rule_keys(self, rule, expected_keys=None):
+ expected_keys = expected_keys or RULE_KEYWORDS_TO_CHECK
new_rule = {}
- rule_keys = list(rule.keys())
- for k in rule_keys:
- if k in RULE_KEYWORDS_TO_CHECK:
+ for k in rule.keys():
+ if k in expected_keys:
new_rule[k] = rule[k]
return new_rule
- def _filter_not_relevant_rules_keys(self, rules):
- return [self._filter_not_relevant_rule_keys(r) for r in rules]
+ def _filter_not_relevant_rules_keys(self, rules, keys=None):
+ keys = keys or RULE_KEYWORDS_TO_CHECK
+ return [self._filter_not_relevant_rule_keys(r, keys) for r in rules]
def _assert_rules_exists(self, expected_rules, actual_rules):
- actual_rules = self._filter_not_relevant_rules_keys(actual_rules)
for expected_rule in expected_rules:
self.assertIn(expected_rule, actual_rules)
@@ -111,8 +111,9 @@
self.admin_client.list_default_security_group_rules()[
'default_security_group_rules'
])
- self._assert_rules_exists(expected_legacy_template_rules,
- sg_rules_template)
+ self._assert_rules_exists(
+ expected_legacy_template_rules,
+ self._filter_not_relevant_rules_keys(sg_rules_template))
@decorators.idempotent_id('df98f969-ff2d-4597-9765-f5d4f81f775f')
def test_default_security_group_rule_lifecycle(self):
@@ -256,5 +257,57 @@
self.admin_client.list_default_security_group_rules()[
'default_security_group_rules'
])
- self._assert_rules_exists(expected_rules,
- sg_rules_template)
+ self._assert_rules_exists(
+ expected_rules,
+ self._filter_not_relevant_rules_keys(sg_rules_template))
+
+ def _validate_security_group_rules(self, sg, is_default_sg):
+ keys_to_check = [
+ 'remote_group_id', 'direction', 'ethertype', 'protocol',
+ 'remote_ip_prefix', 'remote_address_group_id', 'port_range_min',
+ 'port_range_max']
+
+ if is_default_sg:
+ sg_rules_template = (
+ self.admin_client.list_default_security_group_rules(
+ used_in_default_sg=True)['default_security_group_rules'])
+ else:
+ sg_rules_template = (
+ self.admin_client.list_default_security_group_rules(
+ used_in_non_default_sg=True
+ )['default_security_group_rules'])
+ # NOTE(slaweq): We need to replace "PARENT" keyword in
+ # the "remote_group_id" attribute of every default sg rule template
+ # with actual SG ID
+ for rule in sg_rules_template:
+ if rule['remote_group_id'] == 'PARENT':
+ rule['remote_group_id'] = sg['id']
+
+ self._assert_rules_exists(
+ self._filter_not_relevant_rules_keys(
+ sg_rules_template, keys_to_check),
+ self._filter_not_relevant_rules_keys(
+ sg['security_group_rules'], keys_to_check))
+
+ @decorators.idempotent_id('29feedb1-6f04-4a1f-a778-2fae2c7b7dc8')
+ def test_security_group_rules_created_from_default_sg_rules_template(
+ self):
+ """Test if default SG and custom new SG have got proper SG rules.
+
+ This test creates new project and checks if its default SG has SG
+ rules matching default SG rules for that kind of SG.
+ Next it creates new SG for the same project and checks if that SG also
+ have proper SG rules based on the default SG rules template.
+ """
+
+ project = self.create_project()
+ # First check rules for default SG created automatically for each
+ # project
+ default_sg = self.admin_client.list_security_groups(
+ tenant_id=project['id'], name='default')['security_groups'][0]
+ self._validate_security_group_rules(default_sg, is_default_sg=True)
+
+ # And now create different SG for same project and check SG rules for
+ # such additional SG
+ sg = self.create_security_group(project=project)
+ self._validate_security_group_rules(sg, is_default_sg=False)
diff --git a/neutron_tempest_plugin/api/test_security_groups.py b/neutron_tempest_plugin/api/test_security_groups.py
index 14e0c66..85e4763 100644
--- a/neutron_tempest_plugin/api/test_security_groups.py
+++ b/neutron_tempest_plugin/api/test_security_groups.py
@@ -245,6 +245,14 @@
class BaseSecGroupQuota(base.BaseAdminNetworkTest):
+ def setUp(self):
+ super().setUp()
+ # NOTE(slaweq): we don't know exactly how many rule templates may be
+ # created in the neutron db and used for every SG so, as in this test
+ # class we are checking quotas of SG, not SG rules, lets set quota for
+ # SG rules to -1
+ self._set_sg_rules_quota(-1)
+
def _create_max_allowed_sg_amount(self):
sg_amount = self._get_sg_amount()
sg_quota = self._get_sg_quota()
@@ -270,17 +278,23 @@
self.assertEqual(self._get_sg_quota(), new_sg_quota,
"Security group quota wasn't changed correctly")
- def _set_sg_quota(self, val):
- sg_quota = self._get_sg_quota()
+ def _set_quota(self, val, resource):
+ res_quota = self._get_quota(resource)
project_id = self.client.project_id
- self.admin_client.update_quotas(project_id, **{'security_group': val})
+ self.admin_client.update_quotas(project_id, **{resource: val})
self.addCleanup(self.admin_client.update_quotas,
- project_id, **{'security_group': sg_quota})
+ project_id, **{resource: res_quota})
- def _get_sg_quota(self):
+ def _get_quota(self, resource):
project_id = self.client.project_id
quotas = self.admin_client.show_quotas(project_id)
- return quotas['quota']['security_group']
+ return quotas['quota'][resource]
+
+ def _set_sg_quota(self, val):
+ return self._set_quota(val, 'security_group')
+
+ def _get_sg_quota(self):
+ return self._get_quota('security_group')
def _get_sg_amount(self):
project_id = self.client.project_id
@@ -288,6 +302,9 @@
security_groups = self.client.list_security_groups(**filter_query)
return len(security_groups['security_groups'])
+ def _set_sg_rules_quota(self, val):
+ return self._set_quota(val, 'security_group_rule')
+
class SecGroupQuotaTest(BaseSecGroupQuota):