Merge "Add the requirements.txt file to the tox deps"
diff --git a/neutron_tempest_plugin/api/base.py b/neutron_tempest_plugin/api/base.py
index e080d42..10821c1 100644
--- a/neutron_tempest_plugin/api/base.py
+++ b/neutron_tempest_plugin/api/base.py
@@ -994,8 +994,9 @@
         ip_version = ip_version or cls._ip_version
         default_params = (
             constants.DEFAULT_SECURITY_GROUP_RULE_PARAMS[ip_version])
-        if ('remote_address_group_id' in kwargs and 'remote_ip_prefix' in
-                default_params):
+        if (('remote_address_group_id' in kwargs or
+             'remote_group_id' in kwargs) and
+                'remote_ip_prefix' in default_params):
             default_params.pop('remote_ip_prefix')
         for key, value in default_params.items():
             kwargs.setdefault(key, value)
diff --git a/neutron_tempest_plugin/api/test_security_groups.py b/neutron_tempest_plugin/api/test_security_groups.py
index d251f8c..e7a9eae 100644
--- a/neutron_tempest_plugin/api/test_security_groups.py
+++ b/neutron_tempest_plugin/api/test_security_groups.py
@@ -29,13 +29,15 @@
 LOG = log.getLogger(__name__)
 
 
-class SecGroupTest(base.BaseAdminNetworkTest):
+class BaseSecGroupTest(base.BaseAdminNetworkTest):
 
     required_extensions = ['security-group']
 
-    @decorators.idempotent_id('bfd128e5-3c92-44b6-9d66-7fe29d22c802')
-    def test_create_list_update_show_delete_security_group(self):
-        security_group = self.create_security_group()
+    def _test_create_list_update_show_delete_security_group(self):
+        sg_kwargs = {}
+        if self.stateless_sg:
+            sg_kwargs['stateful'] = False
+        security_group = self.create_security_group(**sg_kwargs)
 
         # List security groups and verify if created group is there in response
         security_groups = self.client.list_security_groups()['security_groups']
@@ -61,9 +63,11 @@
         self.assertEqual(observed_security_group['description'],
                          new_description)
 
-    @decorators.idempotent_id('1fff0d57-bb6c-4528-9c1d-2326dce1c087')
-    def test_show_security_group_contains_all_rules(self):
-        security_group = self.create_security_group()
+    def _test_show_security_group_contains_all_rules(self):
+        sg_kwargs = {}
+        if self.stateless_sg:
+            sg_kwargs['stateful'] = False
+        security_group = self.create_security_group(**sg_kwargs)
         protocol = random.choice(list(base_security_groups.V4_PROTOCOL_NAMES))
         security_group_rule = self.create_security_group_rule(
             security_group=security_group,
@@ -80,14 +84,16 @@
         self.assertIn(
             security_group_rule['id'], observerd_security_group_rules_ids)
 
-    @decorators.idempotent_id('b5923b1a-4d33-44e1-af25-088dcb55b02b')
-    def test_list_security_group_rules_contains_all_rules(self):
+    def _test_list_security_group_rules_contains_all_rules(self):
         """Test list security group rules.
 
         This test checks if all SG rules which belongs to the tenant OR
         which belongs to the tenant's security group are listed.
         """
-        security_group = self.create_security_group()
+        sg_kwargs = {}
+        if self.stateless_sg:
+            sg_kwargs['stateful'] = False
+        security_group = self.create_security_group(**sg_kwargs)
         protocol = random.choice(list(base_security_groups.V4_PROTOCOL_NAMES))
         security_group_rule = self.create_security_group_rule(
             security_group=security_group,
@@ -98,9 +104,13 @@
 
         # Create also other SG with some custom rule to check that regular user
         # can't see this rule
-        admin_security_group = self.create_security_group(
-            project={'id': self.admin_client.tenant_id},
-            client=self.admin_client)
+        sg_kwargs = {
+            'project': {'id': self.admin_client.tenant_id},
+            'client': self.admin_client
+        }
+        if self.stateless_sg:
+            sg_kwargs['stateful'] = False
+        admin_security_group = self.create_security_group(**sg_kwargs)
         admin_security_group_rule = self.create_security_group_rule(
             security_group=admin_security_group,
             project={'id': self.admin_client.tenant_id},
@@ -113,12 +123,12 @@
         self.assertIn(security_group_rule['id'], rules_ids)
         self.assertNotIn(admin_security_group_rule['id'], rules_ids)
 
-    @decorators.idempotent_id('7c0ecb10-b2db-11e6-9b14-000c29248b0d')
-    def test_create_bulk_sec_groups(self):
+    def _test_create_bulk_sec_groups(self):
         # Creates 2 sec-groups in one request
         sec_nm = [data_utils.rand_name('secgroup'),
                   data_utils.rand_name('secgroup')]
-        body = self.client.create_bulk_security_groups(sec_nm)
+        body = self.client.create_bulk_security_groups(
+            sec_nm, stateless=self.stateless_sg)
         created_sec_grps = body['security_groups']
         self.assertEqual(2, len(created_sec_grps))
         for secgrp in created_sec_grps:
@@ -127,13 +137,16 @@
             self.assertIn(secgrp['name'], sec_nm)
             self.assertIsNotNone(secgrp['id'])
 
-    @decorators.idempotent_id('e93f33d8-57ea-11eb-b69b-74e5f9e2a801')
-    def test_create_sec_groups_with_the_same_name(self):
+    def _test_create_sec_groups_with_the_same_name(self):
         same_name_sg_number = 5
         sg_name = 'sg_zahlabut'
         sg_names = [sg_name] * same_name_sg_number
+        sg_kwargs = {}
+        if self.stateless_sg:
+            sg_kwargs['stateful'] = False
         for name in sg_names:
-            self.create_security_group(name=name)
+            sg_kwargs['name'] = name
+            self.create_security_group(**sg_kwargs)
         sec_groups = [item['id'] for item in
                       self.client.list_security_groups(
                           name=sg_name)['security_groups']]
@@ -143,9 +156,55 @@
             ' is: {}'.format(same_name_sg_number))
 
 
-class StatelessSecGroupTest(base.BaseAdminNetworkTest):
+class StatefulSecGroupTest(BaseSecGroupTest):
+
+    stateless_sg = False  # inherited helpers do not request stateless SGs
+
+    @decorators.idempotent_id('bfd128e5-3c92-44b6-9d66-7fe29d22c802')
+    def test_create_list_update_show_delete_security_group(self):
+        self._test_create_list_update_show_delete_security_group()
+
+    @decorators.idempotent_id('1fff0d57-bb6c-4528-9c1d-2326dce1c087')
+    def test_show_security_group_contains_all_rules(self):
+        self._test_show_security_group_contains_all_rules()
+
+    @decorators.idempotent_id('b5923b1a-4d33-44e1-af25-088dcb55b02b')
+    def test_list_security_group_rules_contains_all_rules(self):
+        self._test_list_security_group_rules_contains_all_rules()
+
+    @decorators.idempotent_id('7c0ecb10-b2db-11e6-9b14-000c29248b0d')
+    def test_create_bulk_sec_groups(self):
+        self._test_create_bulk_sec_groups()
+
+    @decorators.idempotent_id('e93f33d8-57ea-11eb-b69b-74e5f9e2a801')
+    def test_create_sec_groups_with_the_same_name(self):
+        self._test_create_sec_groups_with_the_same_name()
+
+
+class StatelessSecGroupTest(BaseSecGroupTest):
 
     required_extensions = ['security-group', 'stateful-security-group']
+    stateless_sg = True
+
+    @decorators.idempotent_id('0214d58a-2177-47e1-af83-dcd45c024829')
+    def test_create_list_update_show_delete_security_group(self):
+        self._test_create_list_update_show_delete_security_group()
+
+    @decorators.idempotent_id('ddbc0e4c-840f-44ab-8718-0b95b7c7b575')
+    def test_show_security_group_contains_all_rules(self):
+        self._test_show_security_group_contains_all_rules()
+
+    @decorators.idempotent_id('cdf3a63a-08fe-4091-bab4-62180847990f')
+    def test_list_security_group_rules_contains_all_rules(self):
+        self._test_list_security_group_rules_contains_all_rules()
+
+    @decorators.idempotent_id('b33e612e-65f0-467b-9bf2-b5b2ce67f72f')
+    def test_create_bulk_sec_groups(self):
+        self._test_create_bulk_sec_groups()
+
+    @decorators.idempotent_id('a6896935-db18-413d-95f5-4f465e0e2209')
+    def test_create_sec_groups_with_the_same_name(self):
+        self._test_create_sec_groups_with_the_same_name()
 
     @decorators.idempotent_id('0a6c1476-3d1a-11ec-b0ec-0800277ac3d9')
     def test_stateless_security_group_update(self):
@@ -380,21 +439,16 @@
         self.assertEqual(self._get_sg_rules_quota(), new_quota)
 
 
-class SecGroupProtocolTest(base.BaseNetworkTest):
+class BaseSecGroupProtocolTest(base.BaseNetworkTest):
 
     protocol_names = base_security_groups.V4_PROTOCOL_NAMES
     protocol_ints = base_security_groups.V4_PROTOCOL_INTS
 
-    @decorators.idempotent_id('282e3681-aa6e-42a7-b05c-c341aa1e3cdf')
-    def test_security_group_rule_protocol_names(self):
-        self._test_security_group_rule_protocols(protocols=self.protocol_names)
-
-    @decorators.idempotent_id('66e47f1f-20b6-4417-8839-3cc671c7afa3')
-    def test_security_group_rule_protocol_ints(self):
-        self._test_security_group_rule_protocols(protocols=self.protocol_ints)
-
     def _test_security_group_rule_protocols(self, protocols):
-        security_group = self.create_security_group()
+        sg_kwargs = {}
+        if self.stateless_sg:
+            sg_kwargs['stateful'] = False
+        security_group = self.create_security_group(**sg_kwargs)
         for protocol in protocols:
             self._test_security_group_rule(
                 security_group=security_group,
@@ -414,14 +468,38 @@
                              "{!r} does not match.".format(key))
 
 
-class SecGroupProtocolIPv6Test(SecGroupProtocolTest):
+class StatefulSecGroupProtocolTest(BaseSecGroupProtocolTest):
+    stateless_sg = False  # inherited helpers do not request stateless SGs
+
+    @decorators.idempotent_id('282e3681-aa6e-42a7-b05c-c341aa1e3cdf')
+    def test_security_group_rule_protocol_names(self):
+        self._test_security_group_rule_protocols(protocols=self.protocol_names)
+
+    @decorators.idempotent_id('66e47f1f-20b6-4417-8839-3cc671c7afa3')
+    def test_security_group_rule_protocol_ints(self):
+        self._test_security_group_rule_protocols(protocols=self.protocol_ints)
+
+
+class StatelessSecGroupProtocolTest(BaseSecGroupProtocolTest):
+    required_extensions = ['security-group', 'stateful-security-group']
+    stateless_sg = True  # inherited helpers request stateful=False SGs
+
+    @decorators.idempotent_id('3a065cdd-99bd-409f-a08e-385c6674bec2')
+    def test_security_group_rule_protocol_names(self):
+        self._test_security_group_rule_protocols(protocols=self.protocol_names)
+
+    @decorators.idempotent_id('b0332b5d-6fac-49d5-a79d-ae4fe62600f7')
+    def test_security_group_rule_protocol_ints(self):
+        self._test_security_group_rule_protocols(protocols=self.protocol_ints)
+
+
+class BaseSecGroupProtocolIPv6Test(BaseSecGroupProtocolTest):
 
     _ip_version = constants.IP_VERSION_6
     protocol_names = base_security_groups.V6_PROTOCOL_NAMES
     protocol_ints = base_security_groups.V6_PROTOCOL_INTS
 
-    @decorators.idempotent_id('c7d17b41-3b4e-4add-bb3b-6af59baaaffa')
-    def test_security_group_rule_protocol_legacy_icmpv6(self):
+    def _test_security_group_rule_protocol_legacy_icmpv6(self):
         # These legacy protocols can be used to create security groups,
         # but they could be shown either with their passed protocol name,
         # or a canonical-ized version, depending on the neutron version.
@@ -439,7 +517,10 @@
                 ethertype=self.ethertype)
 
     def _test_security_group_rule_legacy(self, protocol_list, **kwargs):
-        security_group = self.create_security_group()
+        sg_kwargs = {}
+        if self.stateless_sg:
+            sg_kwargs['stateful'] = False
+        security_group = self.create_security_group(**sg_kwargs)
         security_group_rule = self.create_security_group_rule(
             security_group=security_group, **kwargs)
         observed_security_group_rule = self.client.show_security_group_rule(
@@ -457,6 +538,23 @@
                                  "{!r} does not match.".format(key))
 
 
+class StatefulSecGroupProtocolIPv6Test(BaseSecGroupProtocolIPv6Test):
+    stateless_sg = False  # inherited helpers do not request stateless SGs
+
+    @decorators.idempotent_id('c7d17b41-3b4e-4add-bb3b-6af59baaaffa')
+    def test_security_group_rule_protocol_legacy_icmpv6(self):
+        self._test_security_group_rule_protocol_legacy_icmpv6()
+
+
+class StatelessSecGroupProtocolIPv6Test(BaseSecGroupProtocolIPv6Test):
+    required_extensions = ['security-group', 'stateful-security-group']
+    stateless_sg = True  # inherited helpers request stateful=False SGs
+
+    @decorators.idempotent_id('a034814e-0fa5-4437-8e6f-0d2eebd668b3')
+    def test_security_group_rule_protocol_legacy_icmpv6(self):
+        self._test_security_group_rule_protocol_legacy_icmpv6()
+
+
 class RbacSharedSecurityGroupTest(base.BaseAdminNetworkTest):
 
     force_tenant_isolation = True
diff --git a/neutron_tempest_plugin/scenario/base.py b/neutron_tempest_plugin/scenario/base.py
index b9bf36f..4d9165f 100644
--- a/neutron_tempest_plugin/scenario/base.py
+++ b/neutron_tempest_plugin/scenario/base.py
@@ -190,6 +190,27 @@
             port_range_max=22)
 
     @classmethod
+    def create_ingress_metadata_secgroup_rule(cls, secgroup_id=None):
+        """Permit inbound metadata traffic for stateless security groups
+
+        Adds an ingress TCP rule from 169.254.169.254/32 to ``secgroup_id``.
+        No-op when ``stateless_sg`` is false or not defined on the class.
+        """
+        if getattr(cls, 'stateless_sg', False):
+            # NOTE(slaweq): in case of stateless security groups, there is no
+            # "related" or "established" traffic matching at all so even if
+            # egress traffic to 169.254.169.254 is allowed by default SG, we
+            # need to explicitly allow ingress traffic from the metadata server
+            # to be able to receive responses in the guest vm
+            cls.create_security_group_rule(
+                security_group_id=secgroup_id,
+                direction=neutron_lib_constants.INGRESS_DIRECTION,
+                protocol=neutron_lib_constants.PROTO_NAME_TCP,
+                remote_ip_prefix='169.254.169.254/32',
+                description='metadata out'
+            )
+
+    @classmethod
     def create_pingable_secgroup_rule(cls, secgroup_id=None,
                                       client=None):
         """This rule is intended to permit inbound ping
diff --git a/neutron_tempest_plugin/scenario/test_security_groups.py b/neutron_tempest_plugin/scenario/test_security_groups.py
index 5af84db..7eae2eb 100644
--- a/neutron_tempest_plugin/scenario/test_security_groups.py
+++ b/neutron_tempest_plugin/scenario/test_security_groups.py
@@ -32,7 +32,7 @@
 CONF = config.CONF
 
 
-class NetworkSecGroupTest(base.BaseTempestTestCase):
+class BaseNetworkSecGroupTest(base.BaseTempestTestCase):
     credentials = ['primary', 'admin']
     required_extensions = ['router', 'security-group']
 
@@ -70,17 +70,17 @@
 
     @classmethod
     def setup_credentials(cls):
-        super(NetworkSecGroupTest, cls).setup_credentials()
+        super(BaseNetworkSecGroupTest, cls).setup_credentials()
         cls.network_client = cls.os_admin.network_client
 
     @classmethod
     def setup_clients(cls):
-        super(NetworkSecGroupTest, cls).setup_clients()
+        super(BaseNetworkSecGroupTest, cls).setup_clients()
         cls.project_id = cls.os_primary.credentials.tenant_id
 
     @classmethod
     def resource_setup(cls):
-        super(NetworkSecGroupTest, cls).resource_setup()
+        super(BaseNetworkSecGroupTest, cls).resource_setup()
         # setup basic topology for servers we can log into it
         cls.network = cls.create_network()
         cls.subnet = cls.create_subnet(cls.network)
@@ -89,7 +89,7 @@
         cls.keypair = cls.create_keypair()
 
     def setUp(self):
-        super(NetworkSecGroupTest, self).setUp()
+        super(BaseNetworkSecGroupTest, self).setUp()
         self.addCleanup(test_utils.call_and_ignore_notfound_exc,
                         self.network_client.reset_quotas, self.project_id)
         self.network_client.update_quotas(self.project_id, security_group=-1)
@@ -132,12 +132,19 @@
         # Add specific remote prefix to VMs and check connectivity
         ssh_secgrp_name = data_utils.rand_name('ssh_secgrp')
         icmp_secgrp_name = data_utils.rand_name('icmp_secgrp_with_cidr')
+        sg_kwargs = {}
+        if self.stateless_sg:
+            sg_kwargs['stateful'] = False
         ssh_secgrp = self.os_primary.network_client.create_security_group(
-            name=ssh_secgrp_name)
+            name=ssh_secgrp_name,
+            **sg_kwargs)
         self.create_loginable_secgroup_rule(
             secgroup_id=ssh_secgrp['security_group']['id'])
+        self.create_ingress_metadata_secgroup_rule(
+            secgroup_id=ssh_secgrp['security_group']['id'])
         icmp_secgrp = self.os_primary.network_client.create_security_group(
-            name=icmp_secgrp_name)
+            name=icmp_secgrp_name,
+            **sg_kwargs)
         self.create_secgroup_rules(
             rule_list, secgroup_id=icmp_secgrp['security_group']['id'])
         for sec_grp in (ssh_secgrp, icmp_secgrp):
@@ -157,8 +164,7 @@
             'fixed_ip_address'], should_succeed=should_succeed,
             servers=servers)
 
-    @decorators.idempotent_id('3d73ec1a-2ec6-45a9-b0f8-04a283d9d764')
-    def test_default_sec_grp_scenarios(self):
+    def _test_default_sec_grp_scenarios(self):
         server_ssh_clients, fips, servers = self.create_vm_testing_sec_grp()
         # Check ssh connectivity when you add sec group rule, enabling ssh
         self.create_loginable_secgroup_rule(
@@ -191,11 +197,15 @@
         self.check_remote_connectivity(server_ssh_clients[0], ext_net_ip,
                                        servers=servers)
 
-    @decorators.idempotent_id('3d73ec1a-2ec6-45a9-b0f8-04a283d9d864')
-    def test_protocol_number_rule(self):
+    def _test_protocol_number_rule(self):
         # protocol number is added instead of str in security rule creation
         name = data_utils.rand_name("test_protocol_number_rule")
-        security_group = self.create_security_group(name=name)
+        sg_kwargs = {
+            'name': name
+        }
+        if self.stateless_sg:
+            sg_kwargs['stateful'] = False
+        security_group = self.create_security_group(**sg_kwargs)
         port = self.create_port(network=self.network, name=name,
                                 security_groups=[security_group['id']])
         _, fips, _ = self.create_vm_testing_sec_grp(num_servers=1,
@@ -208,19 +218,25 @@
         self.create_secgroup_rules(rule_list, secgroup_id=security_group['id'])
         self.ping_ip_address(fips[0]['floating_ip_address'])
 
-    @decorators.idempotent_id('3d73ec1a-2ec6-45a9-b0f8-04a283d9d964')
-    def test_two_sec_groups(self):
+    def _test_two_sec_groups(self):
         # add 2 sec groups to VM and test rules of both are working
         ssh_secgrp_name = data_utils.rand_name('ssh_secgrp')
         icmp_secgrp_name = data_utils.rand_name('icmp_secgrp')
+        sg_kwargs = {}
+        if self.stateless_sg:
+            sg_kwargs['stateful'] = False
         ssh_secgrp = self.os_primary.network_client.create_security_group(
-            name=ssh_secgrp_name)
+            name=ssh_secgrp_name,
+            **sg_kwargs)
         self.create_loginable_secgroup_rule(
             secgroup_id=ssh_secgrp['security_group']['id'])
         icmp_secgrp = self.os_primary.network_client.create_security_group(
-            name=icmp_secgrp_name)
+            name=icmp_secgrp_name,
+            **sg_kwargs)
         self.create_pingable_secgroup_rule(
             secgroup_id=icmp_secgrp['security_group']['id'])
+        self.create_ingress_metadata_secgroup_rule(
+            secgroup_id=ssh_secgrp['security_group']['id'])
         for sec_grp in (ssh_secgrp, icmp_secgrp):
             self.security_groups.append(sec_grp['security_group'])
         security_groups_list = [{'name': ssh_secgrp_name},
@@ -264,79 +280,22 @@
         # make sure ICMP connectivity works after update
         self.ping_ip_address(fips[0]['floating_ip_address'])
 
-    @decorators.idempotent_id('3d73ec1a-2ec6-45a9-b0f8-04a283d9d664')
-    def test_ip_prefix(self):
-        cidr = self.subnet['cidr']
-        rule_list = [{'protocol': constants.PROTO_NUM_ICMP,
-                      'direction': constants.INGRESS_DIRECTION,
-                      'remote_ip_prefix': cidr}]
-        self._test_ip_prefix(rule_list, should_succeed=True)
-
-    @decorators.attr(type='negative')
-    @decorators.idempotent_id('a01cd2ef-3cfc-4614-8aac-9d1333ea21dd')
-    def test_ip_prefix_negative(self):
-        # define bad CIDR
-        cidr = '10.100.0.254/32'
-        rule_list = [{'protocol': constants.PROTO_NUM_ICMP,
-                      'direction': constants.INGRESS_DIRECTION,
-                      'remote_ip_prefix': cidr}]
-        self._test_ip_prefix(rule_list, should_succeed=False)
-
-    @decorators.idempotent_id('01f0ddca-b049-47eb-befd-82acb502c9ec')
-    def test_established_tcp_session_after_re_attachinging_sg(self):
-        """Test existing connection remain open after sg has been re-attached
-
-        Verifies that new packets can pass over the existing connection when
-        the security group has been removed from the server and then added
-        back
-        """
-
-        ssh_sg = self.create_security_group()
-        self.create_loginable_secgroup_rule(secgroup_id=ssh_sg['id'])
-        vm_ssh, fips, vms = self.create_vm_testing_sec_grp(
-                security_groups=[{'name': ssh_sg['name']}])
-        sg = self.create_security_group()
-        nc_rule = [{'protocol': constants.PROTO_NUM_TCP,
-                    'direction': constants.INGRESS_DIRECTION,
-                    'port_range_min': 6666,
-                    'port_range_max': 6666}]
-        self.create_secgroup_rules(nc_rule, secgroup_id=sg['id'])
-        srv_port = self.client.list_ports(network_id=self.network['id'],
-                device_id=vms[1]['server']['id'])['ports'][0]
-        srv_ip = srv_port['fixed_ips'][0]['ip_address']
-        with utils.StatefulConnection(
-                vm_ssh[0], vm_ssh[1], srv_ip, 6666) as con:
-            self.client.update_port(srv_port['id'],
-                    security_groups=[ssh_sg['id'], sg['id']])
-            con.test_connection()
-        with utils.StatefulConnection(
-                vm_ssh[0], vm_ssh[1], srv_ip, 6666) as con:
-            self.client.update_port(
-                    srv_port['id'], security_groups=[ssh_sg['id']])
-            con.test_connection(should_pass=False)
-        with utils.StatefulConnection(
-                vm_ssh[0], vm_ssh[1], srv_ip, 6666) as con:
-            self.client.update_port(srv_port['id'],
-                    security_groups=[ssh_sg['id'], sg['id']])
-            con.test_connection()
-            self.client.update_port(srv_port['id'],
-                    security_groups=[ssh_sg['id']])
-            con.test_connection(should_pass=False)
-            self.client.update_port(srv_port['id'],
-                    security_groups=[ssh_sg['id'], sg['id']])
-            con.test_connection()
-
-    @decorators.idempotent_id('7ed39b86-006d-40fb-887a-ae46693dabc9')
-    def test_remote_group(self):
+    def _test_remote_group(self):
+        sg_kwargs = {}
+        if self.stateless_sg:
+            sg_kwargs['stateful'] = False
         # create a new sec group
         ssh_secgrp_name = data_utils.rand_name('ssh_secgrp')
         ssh_secgrp = self.os_primary.network_client.create_security_group(
-            name=ssh_secgrp_name)
+            name=ssh_secgrp_name,
+            **sg_kwargs)
         # add cleanup
         self.security_groups.append(ssh_secgrp['security_group'])
         # configure sec group to support SSH connectivity
         self.create_loginable_secgroup_rule(
             secgroup_id=ssh_secgrp['security_group']['id'])
+        self.create_ingress_metadata_secgroup_rule(
+            secgroup_id=ssh_secgrp['security_group']['id'])
         # spawn two instances with the sec group created
         server_ssh_clients, fips, servers = self.create_vm_testing_sec_grp(
             security_groups=[{'name': ssh_secgrp_name}])
@@ -363,14 +322,7 @@
         self.ping_ip_address(fips[0]['floating_ip_address'],
                              should_succeed=False)
 
-    @testtools.skipUnless(
-        CONF.neutron_plugin_options.firewall_driver == 'openvswitch',
-        "Openvswitch agent is required to run this test")
-    @decorators.idempotent_id('678dd4c0-2953-4626-b89c-8e7e4110ec4b')
-    @tempest_utils.requires_ext(extension="address-group", service="network")
-    @tempest_utils.requires_ext(
-        extension="security-groups-remote-address-group", service="network")
-    def test_remote_group_and_remote_address_group(self):
+    def _test_remote_group_and_remote_address_group(self):
         """Test SG rules with remote group and remote address group
 
         This test checks the ICMP connection among two servers using a security
@@ -379,10 +331,14 @@
         is applied. When both rules are applied (overlapped), removing one of
         them should not disable the connection.
         """
+        sg_kwargs = {}
+        if self.stateless_sg:
+            sg_kwargs['stateful'] = False
         # create a new sec group
         ssh_secgrp_name = data_utils.rand_name('ssh_secgrp')
         ssh_secgrp = self.os_primary.network_client.create_security_group(
-            name=ssh_secgrp_name)
+            name=ssh_secgrp_name,
+            **sg_kwargs)
         # add cleanup
         self.security_groups.append(ssh_secgrp['security_group'])
         # configure sec group to support SSH connectivity
@@ -452,8 +408,7 @@
         self.ping_ip_address(fips[0]['floating_ip_address'],
                              should_succeed=False)
 
-    @decorators.idempotent_id('f07d0159-8f9e-4faa-87f5-a869ab0ad488')
-    def test_multiple_ports_secgroup_inheritance(self):
+    def _test_multiple_ports_secgroup_inheritance(self):
         """Test multiple port security group inheritance
 
         This test creates two ports with security groups, then
@@ -461,12 +416,18 @@
         inherited properly and enforced in these instances.
         """
         # create a security group and make it loginable and pingable
+        sg_kwargs = {}
+        if self.stateless_sg:
+            sg_kwargs['stateful'] = False
         secgrp = self.os_primary.network_client.create_security_group(
-            name=data_utils.rand_name('secgrp'))
+            name=data_utils.rand_name('secgrp'),
+            **sg_kwargs)
         self.create_loginable_secgroup_rule(
             secgroup_id=secgrp['security_group']['id'])
         self.create_pingable_secgroup_rule(
             secgroup_id=secgrp['security_group']['id'])
+        self.create_ingress_metadata_secgroup_rule(
+            secgroup_id=secgrp['security_group']['id'])
         # add security group to cleanup
         self.security_groups.append(secgrp['security_group'])
         # create two ports with fixed IPs and the security group created
@@ -485,17 +446,30 @@
                                     CONF.validation.image_ssh_user,
                                     self.keypair['private_key'])
 
-    @decorators.idempotent_id('f07d0159-8f9e-4faa-87f5-a869ab0ad489')
-    def test_multiple_ports_portrange_remote(self):
+    def _test_multiple_ports_portrange_remote(self):
+        sg_kwargs = {}
+        initial_security_groups = []
+        if self.stateless_sg:
+            sg_kwargs['stateful'] = False
+            md_secgrp = self.os_primary.network_client.create_security_group(
+                name=data_utils.rand_name('metadata_secgrp'),
+                **sg_kwargs)
+            self.create_ingress_metadata_secgroup_rule(
+                secgroup_id=md_secgrp['security_group']['id'])
+            initial_security_groups.append(
+                {'name': md_secgrp['security_group']['name']})
+
         ssh_clients, fips, servers = self.create_vm_testing_sec_grp(
-            num_servers=3)
+            num_servers=3, security_groups=initial_security_groups)
         secgroups = []
         ports = []
 
         # Create remote and test security groups
         for i in range(0, 2):
             secgroups.append(
-                self.create_security_group(name='secgrp-%d' % i))
+                self.create_security_group(
+                    name='secgrp-%d' % i,
+                    **sg_kwargs))
             # configure sec groups to support SSH connectivity
             self.create_loginable_secgroup_rule(
                 secgroup_id=secgroups[-1]['id'])
@@ -535,6 +509,21 @@
                       'port_range_min': '82',
                       'port_range_max': '83',
                       'remote_group_id': secgroups[0]['id']}]
+        if self.stateless_sg:
+            rule_list.append({
+                'protocol': constants.PROTO_NUM_TCP,
+                'direction': constants.EGRESS_DIRECTION,
+                'remote_group_id': secgroups[0]['id']})
+            # NOTE(slaweq): in case of a stateless SG, the client also needs
+            # a rule which explicitly accepts ingress connections from
+            # secgroups[1]
+
+            self.create_security_group_rule(
+                security_group_id=secgroups[0]['id'],
+                protocol=constants.PROTO_NAME_TCP,
+                direction=constants.INGRESS_DIRECTION,
+                remote_group_id=secgroups[1]['id'])
+
         self.create_secgroup_rules(
             rule_list, secgroup_id=secgroups[1]['id'])
 
@@ -558,6 +547,170 @@
                     ssh_clients[0], ssh_clients[2], test_ip, port) as con:
                 con.test_connection(should_pass=False)
 
+    def _test_overlapping_sec_grp_rules(self):
+        """Test security group rules with overlapping port ranges"""
+        sg_kwargs = {}
+        initial_security_groups = []
+        if self.stateless_sg:
+            sg_kwargs['stateful'] = False
+            md_secgrp = self.os_primary.network_client.create_security_group(
+                name=data_utils.rand_name('metadata_secgrp'),
+                **sg_kwargs)
+            self.create_ingress_metadata_secgroup_rule(
+                secgroup_id=md_secgrp['security_group']['id'])
+            initial_security_groups.append(
+                {'name': md_secgrp['security_group']['name']})
+        client_ssh, _, vms = self.create_vm_testing_sec_grp(
+            num_servers=2, security_groups=initial_security_groups)
+        tmp_ssh, _, tmp_vm = self.create_vm_testing_sec_grp(
+            num_servers=1, security_groups=initial_security_groups)
+        srv_ssh = tmp_ssh[0]
+        srv_vm = tmp_vm[0]
+        srv_port = self.client.list_ports(network_id=self.network['id'],
+                device_id=srv_vm['server']['id'])['ports'][0]
+        srv_ip = srv_port['fixed_ips'][0]['ip_address']
+        secgrps = []
+        for i, vm in enumerate(vms):
+            sg = self.create_security_group(
+                name='secgrp-%d' % i,
+                **sg_kwargs)
+            self.create_loginable_secgroup_rule(secgroup_id=sg['id'])
+            port = self.client.list_ports(network_id=self.network['id'],
+                    device_id=vm['server']['id'])['ports'][0]
+            self.client.update_port(port['id'], security_groups=[sg['id']])
+            secgrps.append(sg)
+        tcp_port = 3000
+        rule_list = [{'protocol': constants.PROTO_NUM_TCP,
+                      'direction': constants.INGRESS_DIRECTION,
+                      'port_range_min': tcp_port,
+                      'port_range_max': tcp_port,
+                      'remote_group_id': secgrps[0]['id']},
+                     {'protocol': constants.PROTO_NUM_TCP,
+                      'direction': constants.INGRESS_DIRECTION,
+                      'port_range_min': tcp_port,
+                      'port_range_max': tcp_port + 2,
+                      'remote_group_id': secgrps[1]['id']}]
+        self.client.update_port(srv_port['id'],
+                security_groups=[secgrps[0]['id'], secgrps[1]['id']])
+        self.create_secgroup_rules(rule_list, secgroup_id=secgrps[0]['id'])
+
+        if self.stateless_sg:
+            # NOTE(slaweq): in case of a stateless SG, the client also needs a
+            # rule which explicitly accepts ingress TCP packets that are
+            # replies from the TCP server; such replies use a random
+            # destination port (it depends on the source port chosen by the
+            # client while establishing the connection)
+            self.create_security_group_rule(
+                security_group_id=secgrps[0]['id'],
+                protocol=constants.PROTO_NAME_TCP,
+                direction=constants.INGRESS_DIRECTION)
+            self.create_security_group_rule(
+                security_group_id=secgrps[1]['id'],
+                protocol=constants.PROTO_NAME_TCP,
+                direction=constants.INGRESS_DIRECTION)
+
+        # The conntrack entries are ruled by the OF definitions but conntrack
+        # status can change the datapath. Let's check the rules in two
+        # attempts
+        for _ in range(2):
+            with utils.StatefulConnection(
+                    client_ssh[0], srv_ssh, srv_ip, tcp_port) as con:
+                con.test_connection()
+            for port in range(tcp_port, tcp_port + 3):
+                with utils.StatefulConnection(
+                        client_ssh[1], srv_ssh, srv_ip, port) as con:
+                    con.test_connection()
+
+    def _test_remove_sec_grp_from_active_vm(self):
+        """Tests the following:
+
+        1. Create SG associated with ICMP rule
+        2. Create Port (associated to SG #1) and use it to create the VM
+        3. Ping the VM, expected should be PASS
+        4. Remove the security group from VM by Port update
+        5. Ping the VM, expected should be FAIL
+        """
+        sec_grp_name = data_utils.rand_name('test_sg')
+        sg_kwargs = {
+            'name': sec_grp_name
+        }
+        if self.stateless_sg:
+            sg_kwargs['stateful'] = False
+        secgrp = self.os_primary.network_client.create_security_group(
+            **sg_kwargs)
+        self.security_groups.append(secgrp['security_group'])
+        sec_grp_id = secgrp['security_group']['id']
+        self.create_pingable_secgroup_rule(sec_grp_id)
+
+        ex_port = self.create_port(
+            self.network, fixed_ips=[{'subnet_id': self.subnet['id']}],
+            security_groups=[sec_grp_id])
+        fip = self.create_vm_testing_sec_grp(
+            num_servers=1, security_groups=[{'name': sec_grp_name}],
+            ports=[ex_port])[1][0]
+
+        self.ping_ip_address(fip['floating_ip_address'])
+        self.client.update_port(ex_port['id'],
+                                security_groups=[])
+        self.ping_ip_address(fip['floating_ip_address'],
+                             should_succeed=False)
+
+
+class StatefulNetworkSecGroupTest(BaseNetworkSecGroupTest):
+    stateless_sg = False
+
+    @decorators.idempotent_id('3d73ec1a-2ec6-45a9-b0f8-04a283d9d764')
+    def test_default_sec_grp_scenarios(self):
+        self._test_default_sec_grp_scenarios()
+
+    @decorators.idempotent_id('3d73ec1a-2ec6-45a9-b0f8-04a283d9d864')
+    def test_protocol_number_rule(self):
+        self._test_protocol_number_rule()
+
+    @decorators.idempotent_id('3d73ec1a-2ec6-45a9-b0f8-04a283d9d964')
+    def test_two_sec_groups(self):
+        self._test_two_sec_groups()
+
+    @decorators.idempotent_id('3d73ec1a-2ec6-45a9-b0f8-04a283d9d664')
+    def test_ip_prefix(self):
+        cidr = self.subnet['cidr']
+        rule_list = [{'protocol': constants.PROTO_NUM_ICMP,
+                      'direction': constants.INGRESS_DIRECTION,
+                      'remote_ip_prefix': cidr}]
+        self._test_ip_prefix(rule_list, should_succeed=True)
+
+    @decorators.attr(type='negative')
+    @decorators.idempotent_id('a01cd2ef-3cfc-4614-8aac-9d1333ea21dd')
+    def test_ip_prefix_negative(self):
+        # define bad CIDR
+        cidr = '10.100.0.254/32'
+        rule_list = [{'protocol': constants.PROTO_NUM_ICMP,
+                      'direction': constants.INGRESS_DIRECTION,
+                      'remote_ip_prefix': cidr}]
+        self._test_ip_prefix(rule_list, should_succeed=False)
+
+    @decorators.idempotent_id('7ed39b86-006d-40fb-887a-ae46693dabc9')
+    def test_remote_group(self):
+        self._test_remote_group()
+
+    @testtools.skipUnless(
+        CONF.neutron_plugin_options.firewall_driver == 'openvswitch',
+        "Openvswitch agent is required to run this test")
+    @decorators.idempotent_id('678dd4c0-2953-4626-b89c-8e7e4110ec4b')
+    @tempest_utils.requires_ext(extension="address-group", service="network")
+    @tempest_utils.requires_ext(
+        extension="security-groups-remote-address-group", service="network")
+    def test_remote_group_and_remote_address_group(self):
+        self._test_remote_group_and_remote_address_group()
+
+    @decorators.idempotent_id('f07d0159-8f9e-4faa-87f5-a869ab0ad488')
+    def test_multiple_ports_secgroup_inheritance(self):
+        self._test_multiple_ports_secgroup_inheritance()
+
+    @decorators.idempotent_id('f07d0159-8f9e-4faa-87f5-a869ab0ad489')
+    def test_multiple_ports_portrange_remote(self):
+        self._test_multiple_ports_portrange_remote()
+
     @decorators.idempotent_id('f07d0159-8f9e-4faa-87f5-a869ab0ad490')
     def test_intra_sg_isolation(self):
         """Test intra security group isolation
@@ -570,8 +723,11 @@
         """
         # create a security group and make it loginable
         secgrp_name = data_utils.rand_name('secgrp')
+        sg_kwargs = {
+            'name': secgrp_name
+        }
         secgrp = self.os_primary.network_client.create_security_group(
-            name=secgrp_name)
+            **sg_kwargs)
         secgrp_id = secgrp['security_group']['id']
         # add security group to cleanup
         self.security_groups.append(secgrp['security_group'])
@@ -633,74 +789,120 @@
 
     @decorators.idempotent_id('cd66b826-d86c-4fb4-ab37-17c8391753cb')
     def test_overlapping_sec_grp_rules(self):
-        """Test security group rules with overlapping port ranges"""
-        client_ssh, _, vms = self.create_vm_testing_sec_grp(num_servers=2)
-        tmp_ssh, _, tmp_vm = self.create_vm_testing_sec_grp(num_servers=1)
-        srv_ssh = tmp_ssh[0]
-        srv_vm = tmp_vm[0]
-        srv_port = self.client.list_ports(network_id=self.network['id'],
-                device_id=srv_vm['server']['id'])['ports'][0]
-        srv_ip = srv_port['fixed_ips'][0]['ip_address']
-        secgrps = []
-        for i, vm in enumerate(vms):
-            sg = self.create_security_group(name='secgrp-%d' % i)
-            self.create_loginable_secgroup_rule(secgroup_id=sg['id'])
-            port = self.client.list_ports(network_id=self.network['id'],
-                    device_id=vm['server']['id'])['ports'][0]
-            self.client.update_port(port['id'], security_groups=[sg['id']])
-            secgrps.append(sg)
-        tcp_port = 3000
-        rule_list = [{'protocol': constants.PROTO_NUM_TCP,
-                      'direction': constants.INGRESS_DIRECTION,
-                      'port_range_min': tcp_port,
-                      'port_range_max': tcp_port,
-                      'remote_group_id': secgrps[0]['id']},
-                     {'protocol': constants.PROTO_NUM_TCP,
-                      'direction': constants.INGRESS_DIRECTION,
-                      'port_range_min': tcp_port,
-                      'port_range_max': tcp_port + 2,
-                      'remote_group_id': secgrps[1]['id']}]
-        self.client.update_port(srv_port['id'],
-                security_groups=[secgrps[0]['id'], secgrps[1]['id']])
-        self.create_secgroup_rules(rule_list, secgroup_id=secgrps[0]['id'])
-        # The conntrack entries are ruled by the OF definitions but conntrack
-        # status can change the datapath. Let's check the rules in two
-        # attempts
-        for _ in range(2):
-            with utils.StatefulConnection(
-                    client_ssh[0], srv_ssh, srv_ip, tcp_port) as con:
-                con.test_connection()
-            for port in range(tcp_port, tcp_port + 3):
-                with utils.StatefulConnection(
-                        client_ssh[1], srv_ssh, srv_ip, port) as con:
-                    con.test_connection()
+        self._test_overlapping_sec_grp_rules()
 
     @decorators.idempotent_id('96dcd5ff-9d45-4e0d-bea0-0b438cbd388f')
     def test_remove_sec_grp_from_active_vm(self):
-        """Tests the following:
+        self._test_remove_sec_grp_from_active_vm()
 
-        1. Create SG associated with ICMP rule
-        2. Create Port (assoiated to SG #1) and use it to create the VM
-        3. Ping the VM, expected should be PASS
-        4. Remove the security group from VM by Port update
-        5. Ping the VM, expected should be FAIL
+    @decorators.idempotent_id('01f0ddca-b049-47eb-befd-82acb502c9ec')
+    def test_established_tcp_session_after_re_attachinging_sg(self):
+        """Test existing connection remains open after sg has been re-attached
+
+        Verifies that new packets can pass over the existing connection when
+        the security group has been removed from the server and then added
+        back
         """
-        sec_grp_name = data_utils.rand_name('test_sg')
-        secgrp = self.os_primary.network_client.create_security_group(
-            name=sec_grp_name)
-        self.security_groups.append(secgrp['security_group'])
-        sec_grp_id = secgrp['security_group']['id']
-        self.create_pingable_secgroup_rule(sec_grp_id)
 
-        ex_port = self.create_port(
-            self.network, fixed_ips=[{'subnet_id': self.subnet['id']}],
-            security_groups=[sec_grp_id])
-        fip = self.create_vm_testing_sec_grp(
-            num_servers=1, security_groups=[{'name': sec_grp_name}],
-            ports=[ex_port])[1][0]
+        sg_kwargs = {}
+        if self.stateless_sg:
+            sg_kwargs['stateful'] = False
+        ssh_sg = self.create_security_group(**sg_kwargs)
+        self.create_loginable_secgroup_rule(secgroup_id=ssh_sg['id'])
+        vm_ssh, fips, vms = self.create_vm_testing_sec_grp(
+                security_groups=[{'name': ssh_sg['name']}])
+        sg = self.create_security_group(**sg_kwargs)
+        nc_rule = [{'protocol': constants.PROTO_NUM_TCP,
+                    'direction': constants.INGRESS_DIRECTION,
+                    'port_range_min': 6666,
+                    'port_range_max': 6666}]
+        self.create_secgroup_rules(nc_rule, secgroup_id=sg['id'])
+        srv_port = self.client.list_ports(network_id=self.network['id'],
+                device_id=vms[1]['server']['id'])['ports'][0]
+        srv_ip = srv_port['fixed_ips'][0]['ip_address']
+        with utils.StatefulConnection(
+                vm_ssh[0], vm_ssh[1], srv_ip, 6666) as con:
+            self.client.update_port(srv_port['id'],
+                    security_groups=[ssh_sg['id'], sg['id']])
+            con.test_connection()
+        with utils.StatefulConnection(
+                vm_ssh[0], vm_ssh[1], srv_ip, 6666) as con:
+            self.client.update_port(
+                    srv_port['id'], security_groups=[ssh_sg['id']])
+            con.test_connection(should_pass=False)
+        with utils.StatefulConnection(
+                vm_ssh[0], vm_ssh[1], srv_ip, 6666) as con:
+            self.client.update_port(srv_port['id'],
+                    security_groups=[ssh_sg['id'], sg['id']])
+            con.test_connection()
+            self.client.update_port(srv_port['id'],
+                    security_groups=[ssh_sg['id']])
+            con.test_connection(should_pass=False)
+            self.client.update_port(srv_port['id'],
+                    security_groups=[ssh_sg['id'], sg['id']])
+            con.test_connection()
 
-        self.ping_ip_address(fip['floating_ip_address'])
-        self.client.update_port(ex_port['id'],
-                                security_groups=[])
-        self.ping_ip_address(fip['floating_ip_address'],
-                             should_succeed=False)
+
+class StatelessNetworkSecGroupTest(BaseNetworkSecGroupTest):
+    required_extensions = ['security-group', 'stateful-security-group']
+    stateless_sg = True
+
+    @decorators.idempotent_id('9e193e3f-56f2-4f4e-886c-988a147958ef')
+    def test_default_sec_grp_scenarios(self):
+        self._test_default_sec_grp_scenarios()
+
+    @decorators.idempotent_id('afae8654-a389-4887-b21d-7f07ec350177')
+    def test_protocol_number_rule(self):
+        self._test_protocol_number_rule()
+
+    @decorators.idempotent_id('b51cc0eb-8f9a-49e7-96ab-61cd31243b67')
+    def test_two_sec_groups(self):
+        self._test_two_sec_groups()
+
+    @decorators.idempotent_id('07985496-58da-4c1f-a6ef-2fdd88128a81')
+    def test_ip_prefix(self):
+        cidr = self.subnet['cidr']
+        rule_list = [{'protocol': constants.PROTO_NUM_ICMP,
+                      'direction': constants.INGRESS_DIRECTION,
+                      'remote_ip_prefix': cidr}]
+        self._test_ip_prefix(rule_list, should_succeed=True)
+
+    @decorators.attr(type='negative')
+    @decorators.idempotent_id('1ad469c4-0d8f-42ae-8ec3-46cc424565c4')
+    def test_ip_prefix_negative(self):
+        # define bad CIDR
+        cidr = '10.100.0.254/32'
+        rule_list = [{'protocol': constants.PROTO_NUM_ICMP,
+                      'direction': constants.INGRESS_DIRECTION,
+                      'remote_ip_prefix': cidr}]
+        self._test_ip_prefix(rule_list, should_succeed=False)
+
+    @decorators.idempotent_id('fa1e93bf-67c5-4590-9962-38ee1f43a46a')
+    def test_remote_group(self):
+        self._test_remote_group()
+
+    @testtools.skipUnless(
+        CONF.neutron_plugin_options.firewall_driver == 'openvswitch',
+        "Openvswitch agent is required to run this test")
+    @decorators.idempotent_id('9fae530d-2711-4c61-a4a5-8efe6e58ab14')
+    @tempest_utils.requires_ext(extension="address-group", service="network")
+    @tempest_utils.requires_ext(
+        extension="security-groups-remote-address-group", service="network")
+    def test_remote_group_and_remote_address_group(self):
+        self._test_remote_group_and_remote_address_group()
+
+    @decorators.idempotent_id('4f1eb6db-ae7f-4f26-b371-cbd8363f9b0b')
+    def test_multiple_ports_secgroup_inheritance(self):
+        self._test_multiple_ports_secgroup_inheritance()
+
+    @decorators.idempotent_id('4043ca0a-eabb-4198-be53-3d3051cc0804')
+    def test_multiple_ports_portrange_remote(self):
+        self._test_multiple_ports_portrange_remote()
+
+    @decorators.idempotent_id('bfe25138-ceac-4944-849a-b9b90aff100f')
+    def test_overlapping_sec_grp_rules(self):
+        self._test_overlapping_sec_grp_rules()
+
+    @decorators.idempotent_id('e4340e47-39cd-49ed-967c-fc2c40b47c5a')
+    def test_remove_sec_grp_from_active_vm(self):
+        self._test_remove_sec_grp_from_active_vm()
diff --git a/neutron_tempest_plugin/services/network/json/network_client.py b/neutron_tempest_plugin/services/network/json/network_client.py
index a917b4f..0666297 100644
--- a/neutron_tempest_plugin/services/network/json/network_client.py
+++ b/neutron_tempest_plugin/services/network/json/network_client.py
@@ -273,9 +273,13 @@
         self.expected_success(201, resp.status)
         return service_client.ResponseBody(resp, body)
 
-    def create_bulk_security_groups(self, security_group_list):
+    def create_bulk_security_groups(self, security_group_list,
+                                    stateless=False):
         group_list = [{'security_group': {'name': name}}
                       for name in security_group_list]
+        if stateless:
+            for group in group_list:
+                group['security_group']['stateful'] = False
         post_data = {'security_groups': group_list}
         body = self.serialize_list(post_data, 'security_groups',
                                    'security_group')
diff --git a/zuul.d/base-nested-switch.yaml b/zuul.d/base-nested-switch.yaml
index dcc0175..9689d82 100644
--- a/zuul.d/base-nested-switch.yaml
+++ b/zuul.d/base-nested-switch.yaml
@@ -13,7 +13,7 @@
     name: neutron-tempest-plugin-base-nested-switch
     parent: neutron-tempest-plugin-base
     abstract: true
-    branches: ^(?!stable/(queens|rocky|stein|train|ussuri)).*$
+    branches: ^(?!stable/(queens|rocky|stein|train|ussuri|victoria)).*$
     # Comment nodeset and vars to switch back to non nested nodes
     nodeset: neutron-nested-virt-ubuntu-focal
     vars:
@@ -26,7 +26,7 @@
 
 # Base nested switch job for EM releases
 - job:
-    name: neutron-tempest-plugin-scenario-nested-switch
+    name: neutron-tempest-plugin-base-nested-switch
     parent: neutron-tempest-plugin-base
     abstract: true
-    branches: ^(stable/(queens|rocky|stein|train|ussuri)).*$
+    branches: ^(stable/(queens|rocky|stein|train|ussuri|victoria)).*$
diff --git a/zuul.d/master_jobs.yaml b/zuul.d/master_jobs.yaml
index 48b146d..93c8fe2 100644
--- a/zuul.d/master_jobs.yaml
+++ b/zuul.d/master_jobs.yaml
@@ -164,9 +164,6 @@
               quota_floatingip: 500
               quota_security_group: 150
               quota_security_group_rule: 1000
-          # NOTE(slaweq): We can get rid of this hardcoded absolute path when
-          # devstack-tempest job will be switched to use lib/neutron instead of
-          # lib/neutron-legacy
           /$NEUTRON_CORE_PLUGIN_CONF:
             ml2:
               type_drivers: flat,geneve,vlan,gre,local,vxlan
@@ -256,9 +253,6 @@
             DEFAULT:
               enable_dvr: false
               l3_ha: true
-          # NOTE(slaweq): We can get rid of this hardcoded absolute path when
-          # devstack-tempest job will be switched to use lib/neutron instead of
-          # lib/neutron-legacy
           /$NEUTRON_CORE_PLUGIN_CONF:
             agent:
               tunnel_types: vxlan,gre
@@ -344,7 +338,8 @@
       # bug https://bugzilla.redhat.com/show_bug.cgi?id=1965036 will be fixed
       tempest_exclude_regex: "\
           (^neutron_tempest_plugin.scenario.test_trunk.TrunkTest.test_subport_connectivity)|\
-          (^neutron_tempest_plugin.scenario.test_security_groups.NetworkSecGroupTest.test_established_tcp_session_after_re_attachinging_sg)"
+          (^neutron_tempest_plugin.scenario.test_security_groups.StatefulNetworkSecGroupTest.test_established_tcp_session_after_re_attachinging_sg)|\
+          (^neutron_tempest_plugin.scenario.test_security_groups.StatelessNetworkSecGroupTest.test_established_tcp_session_after_re_attachinging_sg)"
       devstack_localrc:
         Q_AGENT: openvswitch
         Q_ML2_TENANT_NETWORK_TYPE: vxlan
@@ -356,9 +351,6 @@
             DEFAULT:
               enable_dvr: false
               l3_ha: true
-          # NOTE(slaweq): We can get rid of this hardcoded absolute path when
-          # devstack-tempest job will be switched to use lib/neutron instead of
-          # lib/neutron-legacy
           /$NEUTRON_CORE_PLUGIN_CONF:
             agent:
               tunnel_types: vxlan,gre
@@ -496,7 +488,8 @@
       # fixed
       tempest_exclude_regex: "\
           (^neutron_tempest_plugin.scenario.test_vlan_transparency.VlanTransparencyTest)|\
-          (^neutron_tempest_plugin.scenario.test_security_groups.NetworkSecGroupTest.test_established_tcp_session_after_re_attachinging_sg)|\
+          (^neutron_tempest_plugin.scenario.test_security_groups.StatefulNetworkSecGroupTest.test_established_tcp_session_after_re_attachinging_sg)|\
+          (^neutron_tempest_plugin.scenario.test_security_groups.StatelessNetworkSecGroupTest.test_established_tcp_session_after_re_attachinging_sg)|\
           (^neutron_tempest_plugin.scenario.test_floatingip.FloatingIPPortDetailsTest.test_floatingip_port_details)"
       devstack_localrc:
         Q_AGENT: linuxbridge
@@ -514,9 +507,6 @@
               debug_iptables_rules: true
             EXPERIMENTAL:
               linuxbridge: true
-          # NOTE(slaweq): We can get rid of this hardcoded absolute path when
-          # devstack-tempest job will be switched to use lib/neutron instead of
-          # lib/neutron-legacy
           /$NEUTRON_CORE_PLUGIN_CONF:
             ml2:
               type_drivers: flat,vlan,local,vxlan
@@ -594,14 +584,6 @@
         Q_ML2_PLUGIN_TYPE_DRIVERS: local,flat,vlan,geneve
         Q_ML2_TENANT_NETWORK_TYPE: geneve
         Q_USE_PROVIDERNET_FOR_PUBLIC: true
-        # NOTE(slaweq): In the job with OVN backend we can't use Ubuntu minimal
-        # image because kernel in that image don't supports MULTICAST traffic
-        # thus multicast scenario test with IGMP snooping enabled would fail
-        IMAGE_URLS: https://cloud-images.ubuntu.com/releases/bionic/release/ubuntu-18.04-server-cloudimg-amd64.img
-        ADVANCED_IMAGE_NAME: ubuntu-18.04-server-cloudimg-amd64
-        ADVANCED_INSTANCE_TYPE: ntp_image_384M
-        ADVANCED_INSTANCE_USER: ubuntu
-        CUSTOMIZE_IMAGE: false
         ENABLE_CHASSIS_AS_GW: true
         OVN_L3_CREATE_PUBLIC_NETWORK: true
         OVN_DBS_LOG_LEVEL: dbg
@@ -803,9 +785,6 @@
               quota_security_group_rule: 1000
             DEFAULT:
               router_distributed: True
-          # NOTE(slaweq): We can get rid of this hardcoded absolute path when
-          # devstack-tempest job will be switched to use lib/neutron instead of
-          # lib/neutron-legacy
           "/$NEUTRON_CORE_PLUGIN_CONF":
             ml2:
               type_drivers: flat,geneve,vlan,gre,local,vxlan
@@ -885,9 +864,6 @@
             $NEUTRON_CONF:
               DEFAULT:
                 router_distributed: True
-            # NOTE(slaweq): We can get rid of this hardcoded absolute path when
-            # devstack-tempest job will be switched to use lib/neutron instead of
-            # lib/neutron-legacy
             "/$NEUTRON_CORE_PLUGIN_CONF":
               agent:
                 enable_distributed_routing: True
@@ -1297,11 +1273,6 @@
         - taas-vlan-filter
       devstack_localrc:
         NETWORK_API_EXTENSIONS: "{{ (network_api_extensions_common + network_api_extensions_tempest) | join(',') }}"
-        IMAGE_URLS: https://cloud-images.ubuntu.com/releases/bionic/release/ubuntu-18.04-server-cloudimg-amd64.img
-        ADVANCED_IMAGE_NAME: ubuntu-18.04-server-cloudimg-amd64
-        ADVANCED_INSTANCE_TYPE: ntp_image_384M
-        ADVANCED_INSTANCE_USER: ubuntu
-        CUSTOMIZE_IMAGE: false
         BUILD_TIMEOUT: 784
         Q_AGENT: openvswitch
         Q_ML2_TENANT_NETWORK_TYPE: vxlan,vlan
diff --git a/zuul.d/queens_jobs.yaml b/zuul.d/queens_jobs.yaml
index 483d11d..66d4ff3 100644
--- a/zuul.d/queens_jobs.yaml
+++ b/zuul.d/queens_jobs.yaml
@@ -97,9 +97,6 @@
         ML2_L3_PLUGIN: router
       devstack_local_conf:
         post-config:
-          # NOTE(slaweq): We can get rid of this hardcoded absolute path when
-          # devstack-tempest job will be switched to use lib/neutron instead of
-          # lib/neutron-legacy
           /$NEUTRON_CORE_PLUGIN_CONF:
             AGENT:
               tunnel_types: gre,vxlan
@@ -203,9 +200,6 @@
               enable_dvr: false
             AGENT:
               debug_iptables_rules: true
-          # NOTE(slaweq): We can get rid of this hardcoded absolute path when
-          # devstack-tempest job will be switched to use lib/neutron instead of
-          # lib/neutron-legacy
           /$NEUTRON_CORE_PLUGIN_CONF:
             ml2:
               type_drivers: flat,vlan,local,vxlan
diff --git a/zuul.d/rocky_jobs.yaml b/zuul.d/rocky_jobs.yaml
index c6bbca8..39db503 100644
--- a/zuul.d/rocky_jobs.yaml
+++ b/zuul.d/rocky_jobs.yaml
@@ -101,9 +101,6 @@
         ML2_L3_PLUGIN: router
       devstack_local_conf:
         post-config:
-          # NOTE(slaweq): We can get rid of this hardcoded absolute path when
-          # devstack-tempest job will be switched to use lib/neutron instead of
-          # lib/neutron-legacy
           /$NEUTRON_CORE_PLUGIN_CONF:
             AGENT:
               tunnel_types: gre,vxlan
@@ -184,9 +181,6 @@
             DEFAULT:
               enable_dvr: false
               l3_ha: true
-          # NOTE(slaweq): We can get rid of this hardcoded absolute path when
-          # devstack-tempest job will be switched to use lib/neutron instead of
-          # lib/neutron-legacy
           /$NEUTRON_CORE_PLUGIN_CONF:
             agent:
               tunnel_types: vxlan,gre
@@ -302,9 +296,6 @@
             DEFAULT:
               enable_dvr: false
               l3_ha: true
-          # NOTE(slaweq): We can get rid of this hardcoded absolute path when
-          # devstack-tempest job will be switched to use lib/neutron instead of
-          # lib/neutron-legacy
           /$NEUTRON_CORE_PLUGIN_CONF:
             agent:
               tunnel_types: vxlan,gre
@@ -399,9 +390,6 @@
               enable_dvr: false
             AGENT:
               debug_iptables_rules: true
-          # NOTE(slaweq): We can get rid of this hardcoded absolute path when
-          # devstack-tempest job will be switched to use lib/neutron instead of
-          # lib/neutron-legacy
           /$NEUTRON_CORE_PLUGIN_CONF:
             ml2:
               type_drivers: flat,vlan,local,vxlan
@@ -514,9 +502,6 @@
               quota_security_group_rule: 1000
             DEFAULT:
               router_distributed: True
-          # NOTE(slaweq): We can get rid of this hardcoded absolute path when
-          # devstack-tempest job will be switched to use lib/neutron instead of
-          # lib/neutron-legacy
           "/$NEUTRON_CORE_PLUGIN_CONF":
             ml2:
               type_drivers: flat,geneve,vlan,gre,local,vxlan
@@ -587,9 +572,6 @@
             $NEUTRON_CONF:
               DEFAULT:
                 router_distributed: True
-            # NOTE(slaweq): We can get rid of this hardcoded absolute path when
-            # devstack-tempest job will be switched to use lib/neutron instead of
-            # lib/neutron-legacy
             "/$NEUTRON_CORE_PLUGIN_CONF":
               agent:
                 enable_distributed_routing: True
diff --git a/zuul.d/stein_jobs.yaml b/zuul.d/stein_jobs.yaml
index dc77ad3..491642c 100644
--- a/zuul.d/stein_jobs.yaml
+++ b/zuul.d/stein_jobs.yaml
@@ -116,9 +116,6 @@
         ADVANCED_INSTANCE_USER: ubuntu
       devstack_local_conf:
         post-config:
-          # NOTE(slaweq): We can get rid of this hardcoded absolute path when
-          # devstack-tempest job will be switched to use lib/neutron instead of
-          # lib/neutron-legacy
           /$NEUTRON_CORE_PLUGIN_CONF:
             AGENT:
               tunnel_types: gre,vxlan
@@ -211,9 +208,6 @@
             DEFAULT:
               enable_dvr: false
               l3_ha: true
-          # NOTE(slaweq): We can get rid of this hardcoded absolute path when
-          # devstack-tempest job will be switched to use lib/neutron instead of
-          # lib/neutron-legacy
           /$NEUTRON_CORE_PLUGIN_CONF:
             agent:
               tunnel_types: vxlan,gre
@@ -311,9 +305,6 @@
               l3_ha: true
             AGENT:
               debug_iptables_rules: true
-          # NOTE(slaweq): We can get rid of this hardcoded absolute path when
-          # devstack-tempest job will be switched to use lib/neutron instead of
-          # lib/neutron-legacy
           /$NEUTRON_CORE_PLUGIN_CONF:
             ml2:
               type_drivers: flat,vlan,local,vxlan
diff --git a/zuul.d/train_jobs.yaml b/zuul.d/train_jobs.yaml
index 1cb5801..b9a9921 100644
--- a/zuul.d/train_jobs.yaml
+++ b/zuul.d/train_jobs.yaml
@@ -117,9 +117,6 @@
         ML2_L3_PLUGIN: router
       devstack_local_conf:
         post-config:
-          # NOTE(slaweq): We can get rid of this hardcoded absolute path when
-          # devstack-tempest job will be switched to use lib/neutron instead of
-          # lib/neutron-legacy
           /$NEUTRON_CORE_PLUGIN_CONF:
             AGENT:
               tunnel_types: gre,vxlan
diff --git a/zuul.d/ussuri_jobs.yaml b/zuul.d/ussuri_jobs.yaml
index 8614fa9..d918182 100644
--- a/zuul.d/ussuri_jobs.yaml
+++ b/zuul.d/ussuri_jobs.yaml
@@ -121,9 +121,6 @@
         CUSTOMIZE_IMAGE: false
       devstack_local_conf:
         post-config:
-          # NOTE(slaweq): We can get rid of this hardcoded absolute path when
-          # devstack-tempest job will be switched to use lib/neutron instead of
-          # lib/neutron-legacy
           /$NEUTRON_CORE_PLUGIN_CONF:
             AGENT:
               tunnel_types: gre,vxlan
@@ -254,6 +251,12 @@
         OVN_BRANCH: "v20.03.0"
         # NOTE(slaweq): IGMP Snooping requires OVN 20.12
         OVN_IGMP_SNOOPING_ENABLE: False
+        # NOTE(bcafarel) guestmount binary not available on host OS
+        IMAGE_URLS: https://cloud-images.ubuntu.com/releases/bionic/release/ubuntu-18.04-server-cloudimg-amd64.img
+        ADVANCED_IMAGE_NAME: ubuntu-18.04-server-cloudimg-amd64
+        ADVANCED_INSTANCE_TYPE: ds512M
+        ADVANCED_INSTANCE_USER: ubuntu
+        CUSTOMIZE_IMAGE: false
       devstack_local_conf:
         test-config:
           $TEMPEST_CONFIG:
diff --git a/zuul.d/victoria_jobs.yaml b/zuul.d/victoria_jobs.yaml
index f7cbc3f..d7b2f90 100644
--- a/zuul.d/victoria_jobs.yaml
+++ b/zuul.d/victoria_jobs.yaml
@@ -120,9 +120,6 @@
         CUSTOMIZE_IMAGE: false
       devstack_local_conf:
         post-config:
-          # NOTE(slaweq): We can get rid of this hardcoded absolute path when
-          # devstack-tempest job will be switched to use lib/neutron instead of
-          # lib/neutron-legacy
           /$NEUTRON_CORE_PLUGIN_CONF:
             AGENT:
               tunnel_types: gre,vxlan
diff --git a/zuul.d/zed_jobs.yaml b/zuul.d/zed_jobs.yaml
index acbd234..f6e143f 100644
--- a/zuul.d/zed_jobs.yaml
+++ b/zuul.d/zed_jobs.yaml
@@ -122,7 +122,8 @@
       # bug https://bugzilla.redhat.com/show_bug.cgi?id=1965036 will be fixed
       tempest_exclude_regex: "\
           (^neutron_tempest_plugin.scenario.test_trunk.TrunkTest.test_subport_connectivity)|\
-          (^neutron_tempest_plugin.scenario.test_security_groups.NetworkSecGroupTest.test_established_tcp_session_after_re_attachinging_sg)"
+          (^neutron_tempest_plugin.scenario.test_security_groups.StatefulNetworkSecGroupTest.test_established_tcp_session_after_re_attachinging_sg)|\
+          (^neutron_tempest_plugin.scenario.test_security_groups.StatelessNetworkSecGroupTest.test_established_tcp_session_after_re_attachinging_sg)"
       network_available_features: *available_features
       devstack_localrc:
         NETWORK_API_EXTENSIONS: "{{ (network_api_extensions_common + network_api_extensions_openvswitch) | join(',') }}"
@@ -146,7 +147,8 @@
           (^tempest.api.compute.servers.test_multiple_create)"
       tempest_exclude_regex: "\
           (^neutron_tempest_plugin.scenario.test_vlan_transparency.VlanTransparencyTest)|\
-          (^neutron_tempest_plugin.scenario.test_security_groups.NetworkSecGroupTest.test_established_tcp_session_after_re_attachinging_sg)|\
+          (^neutron_tempest_plugin.scenario.test_security_groups.StatefulNetworkSecGroupTest.test_established_tcp_session_after_re_attachinging_sg)|\
+          (^neutron_tempest_plugin.scenario.test_security_groups.StatelessNetworkSecGroupTest.test_established_tcp_session_after_re_attachinging_sg)|\
           (^neutron_tempest_plugin.scenario.test_floatingip.FloatingIPPortDetailsTest.test_floatingip_port_details)"
       network_available_features: *available_features
       devstack_localrc: