[Default SG rules] Test to check if SG rules are created from template

This patch adds new test which checks if SG rules actually created
automatically for new default and non-default SG are matching template
rules from neutron DB.

Depends-On: https://review.opendev.org/c/openstack/neutron/+/884474

Related-bug: #1983053
Change-Id: Ica0810413bef7f0e3e6dff21f6c9e4cda1945a43
diff --git a/neutron_tempest_plugin/api/admin/test_default_security_group_rules.py b/neutron_tempest_plugin/api/admin/test_default_security_group_rules.py
index 826e2ac..604dbea 100644
--- a/neutron_tempest_plugin/api/admin/test_default_security_group_rules.py
+++ b/neutron_tempest_plugin/api/admin/test_default_security_group_rules.py
@@ -28,7 +28,7 @@
 ]
 
 
-class DefaultSecurityGroupRuleTest(base.BaseNetworkTest):
+class DefaultSecurityGroupRuleTest(base.BaseAdminNetworkTest):
     required_extensions = ['security-groups-default-rules']
 
     credentials = ['primary', 'admin']
@@ -38,19 +38,19 @@
         super(DefaultSecurityGroupRuleTest, cls).setup_clients()
         cls.admin_client = cls.os_admin.network_client
 
-    def _filter_not_relevant_rule_keys(self, rule):
+    def _filter_not_relevant_rule_keys(self, rule, expected_keys=None):
+        expected_keys = expected_keys or RULE_KEYWORDS_TO_CHECK
         new_rule = {}
-        rule_keys = list(rule.keys())
-        for k in rule_keys:
-            if k in RULE_KEYWORDS_TO_CHECK:
+        for k in rule.keys():
+            if k in expected_keys:
                 new_rule[k] = rule[k]
         return new_rule
 
-    def _filter_not_relevant_rules_keys(self, rules):
-        return [self._filter_not_relevant_rule_keys(r) for r in rules]
+    def _filter_not_relevant_rules_keys(self, rules, keys=None):
+        keys = keys or RULE_KEYWORDS_TO_CHECK
+        return [self._filter_not_relevant_rule_keys(r, keys) for r in rules]
 
     def _assert_rules_exists(self, expected_rules, actual_rules):
-        actual_rules = self._filter_not_relevant_rules_keys(actual_rules)
         for expected_rule in expected_rules:
             self.assertIn(expected_rule, actual_rules)
 
@@ -111,8 +111,9 @@
             self.admin_client.list_default_security_group_rules()[
                 'default_security_group_rules'
             ])
-        self._assert_rules_exists(expected_legacy_template_rules,
-                                  sg_rules_template)
+        self._assert_rules_exists(
+            expected_legacy_template_rules,
+            self._filter_not_relevant_rules_keys(sg_rules_template))
 
     @decorators.idempotent_id('df98f969-ff2d-4597-9765-f5d4f81f775f')
     def test_default_security_group_rule_lifecycle(self):
@@ -256,5 +257,57 @@
             self.admin_client.list_default_security_group_rules()[
                 'default_security_group_rules'
             ])
-        self._assert_rules_exists(expected_rules,
-                                  sg_rules_template)
+        self._assert_rules_exists(
+            expected_rules,
+            self._filter_not_relevant_rules_keys(sg_rules_template))
+
+    def _validate_security_group_rules(self, sg, is_default_sg):
+        keys_to_check = [
+            'remote_group_id', 'direction', 'ethertype', 'protocol',
+            'remote_ip_prefix', 'remote_address_group_id', 'port_range_min',
+            'port_range_max']
+
+        if is_default_sg:
+            sg_rules_template = (
+                self.admin_client.list_default_security_group_rules(
+                    used_in_default_sg=True)['default_security_group_rules'])
+        else:
+            sg_rules_template = (
+                self.admin_client.list_default_security_group_rules(
+                    used_in_non_default_sg=True
+                )['default_security_group_rules'])
+        # NOTE(slaweq): We need to replace "PARENT" keyword in
+        # the "remote_group_id" attribute of every default sg rule template
+        # with actual SG ID
+        for rule in sg_rules_template:
+            if rule['remote_group_id'] == 'PARENT':
+                rule['remote_group_id'] = sg['id']
+
+        self._assert_rules_exists(
+            self._filter_not_relevant_rules_keys(
+                sg_rules_template, keys_to_check),
+            self._filter_not_relevant_rules_keys(
+                sg['security_group_rules'], keys_to_check))
+
+    @decorators.idempotent_id('29feedb1-6f04-4a1f-a778-2fae2c7b7dc8')
+    def test_security_group_rules_created_from_default_sg_rules_template(
+            self):
+        """Test if default SG and custom new SG have got proper SG rules.
+
+        This test creates new project and checks if its default SG has SG
+        rules matching default SG rules for that kind of SG.
+        Next it creates new SG for the same project and checks if that SG also
+        have proper SG rules based on the default SG rules template.
+        """
+
+        project = self.create_project()
+        # First check rules for default SG created automatically for each
+        # project
+        default_sg = self.admin_client.list_security_groups(
+            tenant_id=project['id'], name='default')['security_groups'][0]
+        self._validate_security_group_rules(default_sg, is_default_sg=True)
+
+        # And now create different SG for same project and check SG rules for
+        # such additional SG
+        sg = self.create_security_group(project=project)
+        self._validate_security_group_rules(sg, is_default_sg=False)
diff --git a/neutron_tempest_plugin/api/test_security_groups.py b/neutron_tempest_plugin/api/test_security_groups.py
index 14e0c66..85e4763 100644
--- a/neutron_tempest_plugin/api/test_security_groups.py
+++ b/neutron_tempest_plugin/api/test_security_groups.py
@@ -245,6 +245,14 @@
 
 class BaseSecGroupQuota(base.BaseAdminNetworkTest):
 
+    def setUp(self):
+        super().setUp()
+        # NOTE(slaweq): we don't know exactly how many rule templates may be
+        # created in the neutron db and used for every SG so, as in this test
+        # class we are checking quotas of SG, not SG rules, lets set quota for
+        # SG rules to -1
+        self._set_sg_rules_quota(-1)
+
     def _create_max_allowed_sg_amount(self):
         sg_amount = self._get_sg_amount()
         sg_quota = self._get_sg_quota()
@@ -270,17 +278,23 @@
         self.assertEqual(self._get_sg_quota(), new_sg_quota,
                          "Security group quota wasn't changed correctly")
 
-    def _set_sg_quota(self, val):
-        sg_quota = self._get_sg_quota()
+    def _set_quota(self, val, resource):
+        res_quota = self._get_quota(resource)
         project_id = self.client.project_id
-        self.admin_client.update_quotas(project_id, **{'security_group': val})
+        self.admin_client.update_quotas(project_id, **{resource: val})
         self.addCleanup(self.admin_client.update_quotas,
-                        project_id, **{'security_group': sg_quota})
+                        project_id, **{resource: res_quota})
 
-    def _get_sg_quota(self):
+    def _get_quota(self, resource):
         project_id = self.client.project_id
         quotas = self.admin_client.show_quotas(project_id)
-        return quotas['quota']['security_group']
+        return quotas['quota'][resource]
+
+    def _set_sg_quota(self, val):
+        return self._set_quota(val, 'security_group')
+
+    def _get_sg_quota(self):
+        return self._get_quota('security_group')
 
     def _get_sg_amount(self):
         project_id = self.client.project_id
@@ -288,6 +302,9 @@
         security_groups = self.client.list_security_groups(**filter_query)
         return len(security_groups['security_groups'])
 
+    def _set_sg_rules_quota(self, val):
+        return self._set_quota(val, 'security_group_rule')
+
 
 class SecGroupQuotaTest(BaseSecGroupQuota):