Improve IPV6 parity in Security Group testcases

This patch implements positive and negative IPV6 testcases
for Security Groups and adds few additional test cases to
validate icmp protocol, remote_group_id and remote_ip_prefix

Change-Id: I3dd272f15a282b2ac58beee10f36c8af66377727
Partially implements: bp ipv6-api-testing-parity
diff --git a/tempest/api/network/base.py b/tempest/api/network/base.py
index 834c010..d9b2848 100644
--- a/tempest/api/network/base.py
+++ b/tempest/api/network/base.py
@@ -83,6 +83,7 @@
         cls.fw_rules = []
         cls.fw_policies = []
         cls.ipsecpolicies = []
+        cls.ethertype = "IPv" + str(cls._ip_version)
 
     @classmethod
     def resource_cleanup(cls):
diff --git a/tempest/api/network/test_security_groups.py b/tempest/api/network/test_security_groups.py
index 9764b4d..58ad39c 100644
--- a/tempest/api/network/test_security_groups.py
+++ b/tempest/api/network/test_security_groups.py
@@ -17,11 +17,15 @@
 
 from tempest.api.network import base_security_groups as base
 from tempest.common.utils import data_utils
+from tempest import config
 from tempest import test
 
+CONF = config.CONF
+
 
 class SecGroupTest(base.BaseSecGroupTest):
     _interface = 'json'
+    _tenant_network_cidr = CONF.network.tenant_network_cidr
 
     @classmethod
     def resource_setup(cls):
@@ -30,6 +34,40 @@
             msg = "security-group extension not enabled."
             raise cls.skipException(msg)
 
+    def _create_verify_security_group_rule(self, sg_id, direction,
+                                           ethertype, protocol,
+                                           port_range_min,
+                                           port_range_max,
+                                           remote_group_id=None,
+                                           remote_ip_prefix=None):
+        # Create Security Group rule with the input params and validate
+        # that SG rule is created with the same parameters.
+        resp, rule_create_body = self.client.create_security_group_rule(
+            security_group_id=sg_id,
+            direction=direction,
+            ethertype=ethertype,
+            protocol=protocol,
+            port_range_min=port_range_min,
+            port_range_max=port_range_max,
+            remote_group_id=remote_group_id,
+            remote_ip_prefix=remote_ip_prefix
+        )
+
+        sec_group_rule = rule_create_body['security_group_rule']
+        self.addCleanup(self._delete_security_group_rule,
+                        sec_group_rule['id'])
+
+        expected = {'direction': direction, 'protocol': protocol,
+                    'ethertype': ethertype, 'port_range_min': port_range_min,
+                    'port_range_max': port_range_max,
+                    'remote_group_id': remote_group_id,
+                    'remote_ip_prefix': remote_ip_prefix}
+        for key, value in six.iteritems(expected):
+            self.assertEqual(value, sec_group_rule[key],
+                             "Field %s of the created security group "
+                             "rule does not match with %s." %
+                             (key, value))
+
     @test.attr(type='smoke')
     def test_list_security_groups(self):
         # Verify the that security group belonging to tenant exist in list
@@ -80,7 +118,8 @@
             _, rule_create_body = self.client.create_security_group_rule(
                 security_group_id=group_create_body['security_group']['id'],
                 protocol=protocol,
-                direction='ingress'
+                direction='ingress',
+                ethertype=self.ethertype
             )
 
             # Show details of the created security rule
@@ -102,30 +141,93 @@
 
     @test.attr(type='smoke')
     def test_create_security_group_rule_with_additional_args(self):
-        # Verify creating security group rule with the following
-        # arguments works: "protocol": "tcp", "port_range_max": 77,
-        # "port_range_min": 77, "direction":"ingress".
-        group_create_body, _ = self._create_security_group()
+        """Verify security group rule with additional arguments works.
 
+        direction:ingress, ethertype:[IPv4/IPv6],
+        protocol:tcp, port_range_min:77, port_range_max:77
+        """
+        group_create_body, _ = self._create_security_group()
+        sg_id = group_create_body['security_group']['id']
         direction = 'ingress'
         protocol = 'tcp'
         port_range_min = 77
         port_range_max = 77
-        _, rule_create_body = self.client.create_security_group_rule(
-            security_group_id=group_create_body['security_group']['id'],
-            direction=direction,
-            protocol=protocol,
-            port_range_min=port_range_min,
-            port_range_max=port_range_max
-        )
+        self._create_verify_security_group_rule(sg_id, direction,
+                                                self.ethertype, protocol,
+                                                port_range_min,
+                                                port_range_max)
 
-        sec_group_rule = rule_create_body['security_group_rule']
+    @test.attr(type='smoke')
+    def test_create_security_group_rule_with_icmp_type_code(self):
+        """Verify security group rule for icmp protocol works.
 
-        self.assertEqual(sec_group_rule['direction'], direction)
-        self.assertEqual(sec_group_rule['protocol'], protocol)
-        self.assertEqual(int(sec_group_rule['port_range_min']), port_range_min)
-        self.assertEqual(int(sec_group_rule['port_range_max']), port_range_max)
+        Specify icmp type (port_range_min) and icmp code
+        (port_range_max) with different values. A seperate testcase
+        is added for icmp protocol as icmp validation would be
+        different from tcp/udp.
+        """
+        group_create_body, _ = self._create_security_group()
+
+        sg_id = group_create_body['security_group']['id']
+        direction = 'ingress'
+        protocol = 'icmp'
+        icmp_type_codes = [(3, 2), (2, 3), (3, 0), (2, None)]
+        for icmp_type, icmp_code in icmp_type_codes:
+            self._create_verify_security_group_rule(sg_id, direction,
+                                                    self.ethertype, protocol,
+                                                    icmp_type, icmp_code)
+
+    @test.attr(type='smoke')
+    def test_create_security_group_rule_with_remote_group_id(self):
+        # Verify creating security group rule with remote_group_id works
+        sg1_body, _ = self._create_security_group()
+        sg2_body, _ = self._create_security_group()
+
+        sg_id = sg1_body['security_group']['id']
+        direction = 'ingress'
+        protocol = 'udp'
+        port_range_min = 50
+        port_range_max = 55
+        remote_id = sg2_body['security_group']['id']
+        self._create_verify_security_group_rule(sg_id, direction,
+                                                self.ethertype, protocol,
+                                                port_range_min,
+                                                port_range_max,
+                                                remote_group_id=remote_id)
+
+    @test.attr(type='smoke')
+    def test_create_security_group_rule_with_remote_ip_prefix(self):
+        # Verify creating security group rule with remote_ip_prefix works
+        sg1_body, _ = self._create_security_group()
+
+        sg_id = sg1_body['security_group']['id']
+        direction = 'ingress'
+        protocol = 'tcp'
+        port_range_min = 76
+        port_range_max = 77
+        ip_prefix = self._tenant_network_cidr
+        self._create_verify_security_group_rule(sg_id, direction,
+                                                self.ethertype, protocol,
+                                                port_range_min,
+                                                port_range_max,
+                                                remote_ip_prefix=ip_prefix)
 
 
 class SecGroupTestXML(SecGroupTest):
     _interface = 'xml'
+
+
+class SecGroupIPv6Test(SecGroupTest):
+    _ip_version = 6
+    _tenant_network_cidr = CONF.network.tenant_network_v6_cidr
+
+    @classmethod
+    def setUpClass(cls):
+        if not CONF.network_feature_enabled.ipv6:
+            skip_msg = "IPv6 Tests are disabled."
+            raise cls.skipException(skip_msg)
+        super(SecGroupIPv6Test, cls).setUpClass()
+
+
+class SecGroupIPv6TestXML(SecGroupIPv6Test):
+    _interface = 'xml'
diff --git a/tempest/api/network/test_security_groups_negative.py b/tempest/api/network/test_security_groups_negative.py
index 9c6c267..2e3091e 100644
--- a/tempest/api/network/test_security_groups_negative.py
+++ b/tempest/api/network/test_security_groups_negative.py
@@ -16,12 +16,16 @@
 import uuid
 
 from tempest.api.network import base_security_groups as base
+from tempest import config
 from tempest import exceptions
 from tempest import test
 
+CONF = config.CONF
+
 
 class NegativeSecGroupTest(base.BaseSecGroupTest):
     _interface = 'json'
+    _tenant_network_cidr = CONF.network.tenant_network_cidr
 
     @classmethod
     def resource_setup(cls):
@@ -60,23 +64,87 @@
         self.assertRaises(
             exceptions.BadRequest, self.client.create_security_group_rule,
             security_group_id=group_create_body['security_group']['id'],
-            protocol=pname, direction='ingress')
+            protocol=pname, direction='ingress', ethertype=self.ethertype)
+
+    @test.attr(type=['negative', 'gate'])
+    def test_create_security_group_rule_with_bad_remote_ip_prefix(self):
+        group_create_body, _ = self._create_security_group()
+
+        # Create rule with bad remote_ip_prefix
+        prefix = ['192.168.1./24', '192.168.1.1/33', 'bad_prefix', '256']
+        for remote_ip_prefix in prefix:
+            self.assertRaises(
+                exceptions.BadRequest, self.client.create_security_group_rule,
+                security_group_id=group_create_body['security_group']['id'],
+                protocol='tcp', direction='ingress', ethertype=self.ethertype,
+                remote_ip_prefix=remote_ip_prefix)
+
+    @test.attr(type=['negative', 'gate'])
+    def test_create_security_group_rule_with_non_existent_remote_groupid(self):
+        group_create_body, _ = self._create_security_group()
+        non_exist_id = str(uuid.uuid4())
+
+        # Create rule with non existent remote_group_id
+        group_ids = ['bad_group_id', non_exist_id]
+        for remote_group_id in group_ids:
+            self.assertRaises(
+                exceptions.NotFound, self.client.create_security_group_rule,
+                security_group_id=group_create_body['security_group']['id'],
+                protocol='tcp', direction='ingress', ethertype=self.ethertype,
+                remote_group_id=remote_group_id)
+
+    @test.attr(type=['negative', 'gate'])
+    def test_create_security_group_rule_with_remote_ip_and_group(self):
+        sg1_body, _ = self._create_security_group()
+        sg2_body, _ = self._create_security_group()
+
+        # Create rule specifying both remote_ip_prefix and remote_group_id
+        prefix = self._tenant_network_cidr
+        self.assertRaises(
+            exceptions.BadRequest, self.client.create_security_group_rule,
+            security_group_id=sg1_body['security_group']['id'],
+            protocol='tcp', direction='ingress',
+            ethertype=self.ethertype, remote_ip_prefix=prefix,
+            remote_group_id=sg2_body['security_group']['id'])
+
+    @test.attr(type=['negative', 'gate'])
+    def test_create_security_group_rule_with_bad_ethertype(self):
+        group_create_body, _ = self._create_security_group()
+
+        # Create rule with bad ethertype
+        ethertype = 'bad_ethertype'
+        self.assertRaises(
+            exceptions.BadRequest, self.client.create_security_group_rule,
+            security_group_id=group_create_body['security_group']['id'],
+            protocol='udp', direction='ingress', ethertype=ethertype)
 
     @test.attr(type=['negative', 'gate'])
     def test_create_security_group_rule_with_invalid_ports(self):
         group_create_body, _ = self._create_security_group()
 
-        # Create rule with invalid ports
+        # Create rule for tcp protocol with invalid ports
         states = [(-16, 80, 'Invalid value for port -16'),
                   (80, 79, 'port_range_min must be <= port_range_max'),
                   (80, 65536, 'Invalid value for port 65536'),
+                  (None, 6, 'port_range_min must be <= port_range_max'),
                   (-16, 65536, 'Invalid value for port')]
         for pmin, pmax, msg in states:
             ex = self.assertRaises(
                 exceptions.BadRequest, self.client.create_security_group_rule,
                 security_group_id=group_create_body['security_group']['id'],
                 protocol='tcp', port_range_min=pmin, port_range_max=pmax,
-                direction='ingress')
+                direction='ingress', ethertype=self.ethertype)
+            self.assertIn(msg, str(ex))
+
+        # Create rule for icmp protocol with invalid ports
+        states = [(1, 256, 'Invalid value for ICMP code'),
+                  (300, 1, 'Invalid value for ICMP type')]
+        for pmin, pmax, msg in states:
+            ex = self.assertRaises(
+                exceptions.BadRequest, self.client.create_security_group_rule,
+                security_group_id=group_create_body['security_group']['id'],
+                protocol='icmp', port_range_min=pmin, port_range_max=pmax,
+                direction='ingress', ethertype=self.ethertype)
             self.assertIn(msg, str(ex))
 
     @test.attr(type=['negative', 'smoke'])
@@ -88,14 +156,54 @@
                           name=name)
 
     @test.attr(type=['negative', 'smoke'])
+    def test_create_duplicate_security_group_rule_fails(self):
+        # Create duplicate security group rule, it should fail.
+        body, _ = self._create_security_group()
+
+        min_port = 66
+        max_port = 67
+        # Create a rule with valid params
+        resp, _ = self.client.create_security_group_rule(
+            security_group_id=body['security_group']['id'],
+            direction='ingress',
+            ethertype=self.ethertype,
+            protocol='tcp',
+            port_range_min=min_port,
+            port_range_max=max_port
+        )
+
+        # Try creating the same security group rule, it should fail
+        self.assertRaises(
+            exceptions.Conflict, self.client.create_security_group_rule,
+            security_group_id=body['security_group']['id'],
+            protocol='tcp', direction='ingress', ethertype=self.ethertype,
+            port_range_min=min_port, port_range_max=max_port)
+
+    @test.attr(type=['negative', 'smoke'])
     def test_create_security_group_rule_with_non_existent_security_group(self):
         # Create security group rules with not existing security group.
         non_existent_sg = str(uuid.uuid4())
         self.assertRaises(exceptions.NotFound,
                           self.client.create_security_group_rule,
                           security_group_id=non_existent_sg,
-                          direction='ingress')
+                          direction='ingress', ethertype=self.ethertype)
 
 
 class NegativeSecGroupTestXML(NegativeSecGroupTest):
     _interface = 'xml'
+
+
+class NegativeSecGroupIPv6Test(NegativeSecGroupTest):
+    _ip_version = 6
+    _tenant_network_cidr = CONF.network.tenant_network_v6_cidr
+
+    @classmethod
+    def setUpClass(cls):
+        if not CONF.network_feature_enabled.ipv6:
+            skip_msg = "IPv6 Tests are disabled."
+            raise cls.skipException(skip_msg)
+        super(NegativeSecGroupIPv6Test, cls).setUpClass()
+
+
+class NegativeSecGroupIPv6TestXML(NegativeSecGroupIPv6Test):
+    _interface = 'xml'