Add test for checking intra security group isolation

Default security group allows ingress packets of the same
security group (aka intra-sg). However, this may not be
what users want and these rules can be removed at any time.
This new test ensures that even in such cases, servers are
able to receive traffic like metadata and DHCP.

Related-Bug: #1881316
Change-Id: Iceb2abf908fa3c7bb59dec2c0400c8b2ba6fc1a8
Signed-off-by: Flavio Fernandes <flaviof@redhat.com>
diff --git a/neutron_tempest_plugin/scenario/test_security_groups.py b/neutron_tempest_plugin/scenario/test_security_groups.py
index dc14857..23a5224 100644
--- a/neutron_tempest_plugin/scenario/test_security_groups.py
+++ b/neutron_tempest_plugin/scenario/test_security_groups.py
@@ -400,3 +400,76 @@
                 ssh_clients[0],
                 ssh_clients[2],
                 test_ip, port)
+
+    @decorators.idempotent_id('f07d0159-8f9e-4faa-87f5-a869ab0ad490')
+    def test_intra_sg_isolation(self):
+        """Test intra security group isolation
+
+        This test creates a security group that does not allow ingress
+        packets from vms of the same security group. The purpose of this
+        test is to verify that intra SG traffic is properly blocked, while
+        traffic like metadata and DHCP remains working due to the
+        allow-related behavior of the egress rules (added via default).
+        """
+        # create a security group and make it loginable
+        secgrp_name = data_utils.rand_name('secgrp')
+        secgrp = self.os_primary.network_client.create_security_group(
+            name=secgrp_name)
+        secgrp_id = secgrp['security_group']['id']
+        # add security group to cleanup
+        self.security_groups.append(secgrp['security_group'])
+
+        # remove all rules and add ICMP, DHCP and metadata as egress,
+        # and ssh as ingress.
+        for sgr in secgrp['security_group']['security_group_rules']:
+            self.client.delete_security_group_rule(sgr['id'])
+
+        self.create_loginable_secgroup_rule(secgroup_id=secgrp_id)
+        rule_list = [{'direction': constants.EGRESS_DIRECTION,
+                      'protocol': constants.PROTO_NAME_TCP,
+                      'remote_ip_prefix': '169.254.169.254/32',
+                      'description': 'metadata out',
+                      },
+                     {'direction': constants.EGRESS_DIRECTION,
+                      'protocol': constants.PROTO_NAME_UDP,
+                      'port_range_min': '67',
+                      'port_range_max': '67',
+                      'description': 'dhcpv4 out',
+                      },
+                     {'direction': constants.EGRESS_DIRECTION,
+                      'protocol': constants.PROTO_NAME_ICMP,
+                      'description': 'ping out',
+                      },
+                     ]
+        self.create_secgroup_rules(rule_list, secgroup_id=secgrp_id)
+
+        # go vms, go!
+        ssh_clients, fips, servers = self.create_vm_testing_sec_grp(
+            num_servers=2, security_groups=[{'name': secgrp_name}])
+
+        # verify SSH functionality. This will ensure that servers were
+        # able to reach dhcp + metadata servers
+        for fip in fips:
+            self.check_connectivity(fip['floating_ip_address'],
+                                    CONF.validation.image_ssh_user,
+                                    self.keypair['private_key'])
+
+        # try to ping instances without intra SG permission (should fail)
+        self.check_remote_connectivity(
+            ssh_clients[0], fips[1]['fixed_ip_address'],
+            should_succeed=False)
+        self.check_remote_connectivity(
+            ssh_clients[1], fips[0]['fixed_ip_address'],
+            should_succeed=False)
+
+        # add intra sg rule. This will allow packets from servers that
+        # are in the same sg
+        rule_list = [{'direction': constants.INGRESS_DIRECTION,
+                      'remote_group_id': secgrp_id}]
+        self.create_secgroup_rules(rule_list, secgroup_id=secgrp_id)
+
+        # try to ping instances with intra SG permission
+        self.check_remote_connectivity(
+            ssh_clients[0], fips[1]['fixed_ip_address'])
+        self.check_remote_connectivity(
+            ssh_clients[1], fips[0]['fixed_ip_address'])