New basic API tests for the default SG rules templates CRUDs

This patch adds some basic API tests for the new API for default SG
rules templates. Those new tests are checking if by default SG rules are
set in the same way as legacy rules which were there since "forever".
Second test checks basic lifecycle of the SG rule template.

Depends-On: https://review.opendev.org/c/openstack/neutron/+/883246/

Related-Bug: #1983053
Change-Id: I458f54ff6b73e277fe9506e90fa6af44d9c51101
diff --git a/neutron_tempest_plugin/api/admin/test_default_security_group_rules.py b/neutron_tempest_plugin/api/admin/test_default_security_group_rules.py
new file mode 100644
index 0000000..826e2ac
--- /dev/null
+++ b/neutron_tempest_plugin/api/admin/test_default_security_group_rules.py
@@ -0,0 +1,260 @@
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import copy
+import random
+
+from neutron_lib import constants
+from tempest.lib import decorators
+from tempest.lib import exceptions as lib_exc
+
+from neutron_tempest_plugin.api import base
+
+RULE_KEYWORDS_TO_CHECK = [
+    'direction', 'remote_group_id', 'remote_address_group_id', 'description',
+    'protocol', 'port_range_min', 'port_range_max', 'ethertype',
+    'remote_ip_prefix', 'used_in_default_sg', 'used_in_non_default_sg'
+]
+
+
+class DefaultSecurityGroupRuleTest(base.BaseNetworkTest):
+    required_extensions = ['security-groups-default-rules']
+
+    credentials = ['primary', 'admin']
+
+    @classmethod
+    def setup_clients(cls):
+        super(DefaultSecurityGroupRuleTest, cls).setup_clients()
+        cls.admin_client = cls.os_admin.network_client
+
+    def _filter_not_relevant_rule_keys(self, rule):
+        new_rule = {}
+        rule_keys = list(rule.keys())
+        for k in rule_keys:
+            if k in RULE_KEYWORDS_TO_CHECK:
+                new_rule[k] = rule[k]
+        return new_rule
+
+    def _filter_not_relevant_rules_keys(self, rules):
+        return [self._filter_not_relevant_rule_keys(r) for r in rules]
+
+    def _assert_rules_exists(self, expected_rules, actual_rules):
+        actual_rules = self._filter_not_relevant_rules_keys(actual_rules)
+        for expected_rule in expected_rules:
+            self.assertIn(expected_rule, actual_rules)
+
+    @decorators.idempotent_id('2f3d3070-e9fa-4127-a33f-f1532fd89108')
+    def test_legacy_default_sg_rules_created_by_default(self):
+        expected_legacy_template_rules = [
+            {
+                'direction': 'egress',
+                'ethertype': 'IPv4',
+                'remote_group_id': None,
+                'protocol': None,
+                'remote_ip_prefix': None,
+                'remote_address_group_id': None,
+                'port_range_max': None,
+                'port_range_min': None,
+                'used_in_default_sg': True,
+                'used_in_non_default_sg': True,
+                'description': 'Legacy default SG rule for egress traffic'
+            }, {
+                'remote_group_id': 'PARENT',
+                'direction': 'ingress',
+                'ethertype': 'IPv6',
+                'protocol': None,
+                'remote_ip_prefix': None,
+                'remote_address_group_id': None,
+                'port_range_max': None,
+                'port_range_min': None,
+                'used_in_default_sg': True,
+                'used_in_non_default_sg': False,
+                'description': 'Legacy default SG rule for ingress traffic'
+            }, {
+                'remote_group_id': 'PARENT',
+                'direction': 'ingress',
+                'ethertype': 'IPv4',
+                'protocol': None,
+                'remote_ip_prefix': None,
+                'remote_address_group_id': None,
+                'port_range_max': None,
+                'port_range_min': None,
+                'used_in_default_sg': True,
+                'used_in_non_default_sg': False,
+                'description': 'Legacy default SG rule for ingress traffic'
+            }, {
+                'direction': 'egress',
+                'ethertype': 'IPv6',
+                'remote_group_id': None,
+                'protocol': None,
+                'remote_ip_prefix': None,
+                'remote_address_group_id': None,
+                'port_range_max': None,
+                'port_range_min': None,
+                'used_in_default_sg': True,
+                'used_in_non_default_sg': True,
+                'description': 'Legacy default SG rule for egress traffic'
+            }
+        ]
+        sg_rules_template = (
+            self.admin_client.list_default_security_group_rules()[
+                'default_security_group_rules'
+            ])
+        self._assert_rules_exists(expected_legacy_template_rules,
+                                  sg_rules_template)
+
+    @decorators.idempotent_id('df98f969-ff2d-4597-9765-f5d4f81f775f')
+    def test_default_security_group_rule_lifecycle(self):
+        tcp_port = random.randint(constants.PORT_RANGE_MIN,
+                                  constants.PORT_RANGE_MAX)
+        rule_args = {
+            'direction': 'ingress',
+            'ethertype': 'IPv4',
+            'protocol': 'tcp',
+            'port_range_max': tcp_port,
+            'port_range_min': tcp_port,
+            'used_in_default_sg': False,
+            'used_in_non_default_sg': True,
+            'description': (
+                'Allow tcp connections over IPv4 on port %s' % tcp_port)
+        }
+        expected_rule = {
+            'remote_group_id': None,
+            'direction': 'ingress',
+            'ethertype': 'IPv4',
+            'protocol': 'tcp',
+            'port_range_min': tcp_port,
+            'port_range_max': tcp_port,
+            'remote_ip_prefix': None,
+            'remote_address_group_id': None,
+            'used_in_default_sg': False,
+            'used_in_non_default_sg': True,
+            'description': (
+                'Allow tcp connections over IPv4 on port %s' % tcp_port)
+        }
+        created_rule_template = self.create_default_security_group_rule(
+            **rule_args)
+        self.assertDictEqual(
+            expected_rule,
+            self._filter_not_relevant_rule_keys(created_rule_template)
+        )
+        observed_rule_template = (
+            self.admin_client.get_default_security_group_rule(
+                created_rule_template['id'])
+        )['default_security_group_rule']
+        self.assertDictEqual(
+            expected_rule,
+            self._filter_not_relevant_rule_keys(observed_rule_template)
+        )
+
+        self.admin_client.delete_default_security_group_rule(
+            created_rule_template['id']
+        )
+        self.assertRaises(
+            lib_exc.NotFound,
+            self.admin_client.get_default_security_group_rule,
+            created_rule_template['id']
+        )
+
+    @decorators.idempotent_id('6c5a2f41-5899-47f4-9daf-4f8ddbbd3ad5')
+    def test_create_duplicate_default_security_group_rule_different_templates(
+            self):
+        tcp_port = random.randint(constants.PORT_RANGE_MIN,
+                                  constants.PORT_RANGE_MAX)
+        rule_args = {
+            'direction': 'ingress',
+            'ethertype': 'IPv4',
+            'protocol': 'tcp',
+            'port_range_max': tcp_port,
+            'port_range_min': tcp_port,
+            'used_in_default_sg': True,
+            'used_in_non_default_sg': True}
+        self.create_default_security_group_rule(**rule_args)
+
+        # Now, even if 'used_in_non_default_sg' will be different error should
+        # be returned as 'used_in_default_sg' is the same
+        new_rule_args = copy.copy(rule_args)
+        new_rule_args['used_in_non_default_sg'] = False
+        self.assertRaises(
+            lib_exc.Conflict,
+            self.admin_client.create_default_security_group_rule,
+            **new_rule_args)
+
+        # Same in the opposite way: even if 'used_in_default_sg' will be
+        # different error should be returned as 'used_in_non_default_sg'
+        # is the same
+        new_rule_args = copy.copy(rule_args)
+        new_rule_args['used_in_default_sg'] = False
+        self.assertRaises(
+            lib_exc.Conflict,
+            self.admin_client.create_default_security_group_rule,
+            **new_rule_args)
+
+    @decorators.idempotent_id('e4696607-1a13-48eb-8912-ee1e742d9471')
+    def test_create_same_default_security_group_rule_for_different_templates(
+            self):
+        tcp_port = random.randint(constants.PORT_RANGE_MIN,
+                                  constants.PORT_RANGE_MAX)
+        expected_rules = [{
+            'remote_group_id': None,
+            'direction': 'ingress',
+            'ethertype': 'IPv4',
+            'protocol': 'tcp',
+            'remote_ip_prefix': None,
+            'remote_address_group_id': None,
+            'port_range_max': tcp_port,
+            'port_range_min': tcp_port,
+            'used_in_default_sg': True,
+            'used_in_non_default_sg': False,
+            'description': ''
+        }, {
+            'remote_group_id': None,
+            'direction': 'ingress',
+            'ethertype': 'IPv4',
+            'protocol': 'tcp',
+            'remote_ip_prefix': None,
+            'remote_address_group_id': None,
+            'port_range_max': tcp_port,
+            'port_range_min': tcp_port,
+            'used_in_default_sg': False,
+            'used_in_non_default_sg': True,
+            'description': ''
+        }]
+
+        default_sg_rule_args = {
+            'direction': 'ingress',
+            'ethertype': 'IPv4',
+            'protocol': 'tcp',
+            'port_range_max': tcp_port,
+            'port_range_min': tcp_port,
+            'used_in_default_sg': True,
+            'used_in_non_default_sg': False}
+        self.create_default_security_group_rule(**default_sg_rule_args)
+
+        custom_sg_rule_args = {
+            'direction': 'ingress',
+            'ethertype': 'IPv4',
+            'protocol': 'tcp',
+            'port_range_max': tcp_port,
+            'port_range_min': tcp_port,
+            'used_in_default_sg': False,
+            'used_in_non_default_sg': True}
+        self.create_default_security_group_rule(**custom_sg_rule_args)
+
+        sg_rules_template = (
+            self.admin_client.list_default_security_group_rules()[
+                'default_security_group_rules'
+            ])
+        self._assert_rules_exists(expected_rules,
+                                  sg_rules_template)
diff --git a/neutron_tempest_plugin/api/base.py b/neutron_tempest_plugin/api/base.py
index b66fe0d..e3c9aad 100644
--- a/neutron_tempest_plugin/api/base.py
+++ b/neutron_tempest_plugin/api/base.py
@@ -135,6 +135,7 @@
         cls.admin_subnetpools = []
         cls.security_groups = []
         cls.admin_security_groups = []
+        cls.sg_rule_templates = []
         cls.projects = []
         cls.log_objects = []
         cls.reserved_subnet_cidrs = set()
@@ -243,6 +244,12 @@
                                          security_group,
                                          client=cls.admin_client)
 
+            # Clean up security group rule templates
+            for sg_rule_template in cls.sg_rule_templates:
+                cls._try_delete_resource(
+                    cls.admin_client.delete_default_security_group_rule,
+                    sg_rule_template['id'])
+
             for subnetpool in cls.subnetpools:
                 cls._try_delete_resource(cls.client.delete_subnetpool,
                                          subnetpool['id'])
@@ -971,6 +978,15 @@
         client.delete_security_group(security_group['id'])
 
     @classmethod
+    def get_security_group(cls, name='default', client=None):
+        client = client or cls.client
+        security_groups = client.list_security_groups()['security_groups']
+        for security_group in security_groups:
+            if security_group['name'] == name:
+                return security_group
+        raise ValueError("No such security group named {!r}".format(name))
+
+    @classmethod
     def create_security_group_rule(cls, security_group=None, project=None,
                                    client=None, ip_version=None, **kwargs):
         if project:
@@ -1006,13 +1022,11 @@
             'security_group_rule']
 
     @classmethod
-    def get_security_group(cls, name='default', client=None):
-        client = client or cls.client
-        security_groups = client.list_security_groups()['security_groups']
-        for security_group in security_groups:
-            if security_group['name'] == name:
-                return security_group
-        raise ValueError("No such security group named {!r}".format(name))
+    def create_default_security_group_rule(cls, **kwargs):
+        body = cls.admin_client.create_default_security_group_rule(**kwargs)
+        default_sg_rule = body['default_security_group_rule']
+        cls.sg_rule_templates.append(default_sg_rule)
+        return default_sg_rule
 
     @classmethod
     def create_keypair(cls, client=None, name=None, **kwargs):
diff --git a/neutron_tempest_plugin/services/network/json/network_client.py b/neutron_tempest_plugin/services/network/json/network_client.py
index 0666297..d5a827e 100644
--- a/neutron_tempest_plugin/services/network/json/network_client.py
+++ b/neutron_tempest_plugin/services/network/json/network_client.py
@@ -846,6 +846,38 @@
         self.expected_success(204, resp.status)
         return service_client.ResponseBody(resp, body)
 
+    def list_default_security_group_rules(self, **kwargs):
+        uri = '%s/default-security-group-rules' % self.uri_prefix
+        if kwargs:
+            uri += '?' + urlparse.urlencode(kwargs, doseq=1)
+        resp, body = self.get(uri)
+        self.expected_success(200, resp.status)
+        body = jsonutils.loads(body)
+        return service_client.ResponseBody(resp, body)
+
+    def get_default_security_group_rule(self, rule_id):
+        uri = '%s/default-security-group-rules/%s' % (self.uri_prefix,
+                                                      rule_id)
+        get_resp, get_resp_body = self.get(uri)
+        self.expected_success(200, get_resp.status)
+        body = jsonutils.loads(get_resp_body)
+        return service_client.ResponseBody(get_resp, body)
+
+    def create_default_security_group_rule(self, **kwargs):
+        post_body = {'default_security_group_rule': kwargs}
+        body = jsonutils.dumps(post_body)
+        uri = '%s/default-security-group-rules' % self.uri_prefix
+        resp, body = self.post(uri, body)
+        self.expected_success(201, resp.status)
+        body = jsonutils.loads(body)
+        return service_client.ResponseBody(resp, body)
+
+    def delete_default_security_group_rule(self, rule_id):
+        uri = '%s/default-security-group-rules/%s' % (self.uri_prefix, rule_id)
+        resp, body = self.delete(uri)
+        self.expected_success(204, resp.status)
+        return service_client.ResponseBody(resp, body)
+
     def list_ports(self, **kwargs):
         uri = '%s/ports' % self.uri_prefix
         if kwargs:
diff --git a/zuul.d/master_jobs.yaml b/zuul.d/master_jobs.yaml
index 49098dc..d3007a0 100644
--- a/zuul.d/master_jobs.yaml
+++ b/zuul.d/master_jobs.yaml
@@ -114,6 +114,7 @@
         - router
         - router_availability_zone
         - security-group
+        - security-groups-default-rules
         - security-groups-remote-address-group
         - segment
         - service-type