Merge "Add image task client and image tests task APIs."
diff --git a/releasenotes/notes/add-volume-detach-libs-2cbb3ca924aed0ac.yaml b/releasenotes/notes/add-volume-detach-libs-2cbb3ca924aed0ac.yaml
new file mode 100644
index 0000000..30127b3
--- /dev/null
+++ b/releasenotes/notes/add-volume-detach-libs-2cbb3ca924aed0ac.yaml
@@ -0,0 +1,5 @@
+---
+features:
+  - |
+    Add delete_attachment to the v3 AttachmentsClient and terminate_connection
+    to the v3 VolumesClient.
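
A brief usage sketch for the two new calls named in this note. The method
names come from the note itself; the argument names and the connector
payload are assumptions about the v3 client signatures, not confirmed API:

    # Sketch only: argument names are assumed, not confirmed.
    # Delete a Cinder v3 attachment record.
    self.attachments_client.delete_attachment(attachment_id)

    # Tear down the export for a volume; the connector dict describing
    # the attached host is illustrative.
    self.volumes_client.terminate_connection(
        volume_id, connector={'host': 'compute-1'})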
diff --git a/releasenotes/notes/end-of-support-of-xena-2e747cff7f8bc48a.yaml b/releasenotes/notes/end-of-support-of-xena-2e747cff7f8bc48a.yaml
new file mode 100644
index 0000000..39f6866
--- /dev/null
+++ b/releasenotes/notes/end-of-support-of-xena-2e747cff7f8bc48a.yaml
@@ -0,0 +1,12 @@
+---
+prelude: >
+    This is an intermediate release during the 2023.2 development cycle to
+    mark the end of support for the EM Xena release in Tempest. After this
+    release, Tempest will support the following OpenStack releases:
+
+    * 2023.1
+    * Zed
+    * Yoga
+
+    Current development of Tempest is for the OpenStack 2023.2 development
+    cycle.
diff --git a/releasenotes/source/index.rst b/releasenotes/source/index.rst
index 882413f..4c1edd5 100644
--- a/releasenotes/source/index.rst
+++ b/releasenotes/source/index.rst
@@ -6,6 +6,7 @@
    :maxdepth: 1
 
    unreleased
+   v34.2.0
    v34.0.0
    v33.0.0
    v32.0.0
diff --git a/releasenotes/source/v34.2.0.rst b/releasenotes/source/v34.2.0.rst
new file mode 100644
index 0000000..386cf71
--- /dev/null
+++ b/releasenotes/source/v34.2.0.rst
@@ -0,0 +1,6 @@
+=====================
+v34.2.0 Release Notes
+=====================
+
+.. release-notes:: 34.2.0 Release Notes
+   :version: 34.2.0
diff --git a/roles/run-tempest-26/tasks/main.yaml b/roles/run-tempest-26/tasks/main.yaml
index 7423bfb..7ad5c99 100644
--- a/roles/run-tempest-26/tasks/main.yaml
+++ b/roles/run-tempest-26/tasks/main.yaml
@@ -17,7 +17,7 @@
 
 - name: Limit max concurrency when more than 3 vcpus are available
   set_fact:
-    default_concurrency: "{{ num_cores|int // 2 }}"
+    default_concurrency: "{{ num_cores|int - 2 }}"
   when: num_cores|int > 3
 
 - name: Override target branch
diff --git a/roles/run-tempest/tasks/main.yaml b/roles/run-tempest/tasks/main.yaml
index 3fb494f..3d78557 100644
--- a/roles/run-tempest/tasks/main.yaml
+++ b/roles/run-tempest/tasks/main.yaml
@@ -17,7 +17,7 @@
 
 - name: Limit max concurrency when more than 3 vcpus are available
   set_fact:
-    default_concurrency: "{{ num_cores|int // 2 }}"
+    default_concurrency: "{{ num_cores|int - 2 }}"
   when: num_cores|int > 3
 
 - name: Override target branch
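
The old and new expressions only agree at four cores; a quick Python sketch
of the same arithmetic as the Jinja template above:

    # default_concurrency when num_cores > 3: old (// 2) vs. new (- 2)
    for num_cores in (4, 8, 16):
        print(num_cores, num_cores // 2, num_cores - 2)
    # 4 -> 2 vs. 2; 8 -> 4 vs. 6; 16 -> 8 vs. 14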
diff --git a/tempest/api/compute/admin/test_live_migration.py b/tempest/api/compute/admin/test_live_migration.py
index f7c0dd9..a02820a 100644
--- a/tempest/api/compute/admin/test_live_migration.py
+++ b/tempest/api/compute/admin/test_live_migration.py
@@ -258,6 +258,7 @@
         port = self.ports_client.show_port(port_id)['port']
         return port['status'] == 'ACTIVE'
 
+    @decorators.unstable_test(bug='2027605')
     @decorators.attr(type='multinode')
     @decorators.idempotent_id('0022c12e-a482-42b0-be2d-396b5f0cffe3')
     @utils.requires_ext(service='network', extension='trunk')
@@ -303,13 +304,17 @@
     min_microversion = '2.6'
     max_microversion = 'latest'
 
+    @classmethod
+    def skip_checks(cls):
+        super(LiveMigrationRemoteConsolesV26Test, cls).skip_checks()
+        if not CONF.compute_feature_enabled.serial_console:
+            skip_msg = ("Serial console not supported.")
+            raise cls.skipException(skip_msg)
+        if not compute.is_scheduler_filter_enabled("DifferentHostFilter"):
+            raise cls.skipException("DifferentHostFilter is not available.")
+
     @decorators.attr(type='multinode')
     @decorators.idempotent_id('6190af80-513e-4f0f-90f2-9714e84955d7')
-    @testtools.skipUnless(CONF.compute_feature_enabled.serial_console,
-                          'Serial console not supported.')
-    @testtools.skipUnless(
-        compute.is_scheduler_filter_enabled("DifferentHostFilter"),
-        'DifferentHostFilter is not available.')
     def test_live_migration_serial_console(self):
         """Test the live-migration of an instance which has a serial console
 
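
The change above, repeated in several files below, moves per-test
@testtools.skipUnless decorators into a class-level skip_checks(), so the
CONF lookups run at class setup rather than at module import. A minimal
sketch of the shape, with an illustrative class name:

    class MyConsoleTest(base.BaseV2ComputeTest):

        @classmethod
        def skip_checks(cls):
            super(MyConsoleTest, cls).skip_checks()
            # CONF is consulted at class setup time, not at import time.
            if not CONF.compute_feature_enabled.serial_console:
                raise cls.skipException("Serial console not supported.")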
diff --git a/tempest/api/compute/admin/test_networks.py b/tempest/api/compute/admin/test_networks.py
index fb6376e..d7fb62d 100644
--- a/tempest/api/compute/admin/test_networks.py
+++ b/tempest/api/compute/admin/test_networks.py
@@ -64,5 +64,5 @@
             configured_network = CONF.compute.fixed_network_name
             self.assertIn(configured_network, [x['label'] for x in networks])
         else:
-            network_labels = [x['label'] for x in networks]
-            self.assertNotEmpty(network_labels)
+            raise self.skipException(
+                "Environment has no known-for-sure existing network.")
diff --git a/tempest/api/compute/admin/test_servers.py b/tempest/api/compute/admin/test_servers.py
index bc00f8c..321078c 100644
--- a/tempest/api/compute/admin/test_servers.py
+++ b/tempest/api/compute/admin/test_servers.py
@@ -25,6 +25,8 @@
 class ServersAdminTestJSON(base.BaseV2ComputeAdminTest):
     """Tests Servers API using admin privileges"""
 
+    create_default_network = True
+
     @classmethod
     def setup_clients(cls):
         super(ServersAdminTestJSON, cls).setup_clients()
diff --git a/tempest/api/compute/admin/test_volume_swap.py b/tempest/api/compute/admin/test_volume_swap.py
index 36148c5..9576b74 100644
--- a/tempest/api/compute/admin/test_volume_swap.py
+++ b/tempest/api/compute/admin/test_volume_swap.py
@@ -13,7 +13,6 @@
 import time
 
 from tempest.api.compute import base
-from tempest.common import utils
 from tempest.common import waiters
 from tempest import config
 from tempest.lib import decorators
@@ -33,6 +32,8 @@
     @classmethod
     def skip_checks(cls):
         super(TestVolumeSwapBase, cls).skip_checks()
+        if not CONF.service_available.cinder:
+            raise cls.skipException("Cinder is not available")
         if not CONF.compute_feature_enabled.swap_volume:
             raise cls.skipException("Swapping volumes is not supported.")
 
@@ -81,7 +82,6 @@
     # so it's marked as such.
     @decorators.attr(type='slow')
     @decorators.idempotent_id('1769f00d-a693-4d67-a631-6a3496773813')
-    @utils.services('volume')
     def test_volume_swap(self):
         """Test swapping of volume attached to server with admin user
 
@@ -183,7 +183,6 @@
     # multiple computes but that would just side-step the underlying bug.
     @decorators.skip_because(bug='1807723',
                              condition=CONF.compute.min_compute_nodes > 1)
-    @utils.services('volume')
     def test_volume_swap_with_multiattach(self):
         """Test swapping volume attached to multiple servers
 
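
This file also drops the per-test @utils.services('volume') decorator in
favor of an explicit service check. The two forms below have the same
effect for a single test, but the skip_checks() version covers every test
in the class (sketch):

    # Before: each test opts in to the volume service dependency.
    @utils.services('volume')
    def test_volume_swap(self):
        ...

    # After: a single class-level guard.
    @classmethod
    def skip_checks(cls):
        super().skip_checks()
        if not CONF.service_available.cinder:
            raise cls.skipException("Cinder is not available")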
diff --git a/tempest/api/compute/base.py b/tempest/api/compute/base.py
index b1bfac7..9aa5d35 100644
--- a/tempest/api/compute/base.py
+++ b/tempest/api/compute/base.py
@@ -51,6 +51,9 @@
         super(BaseV2ComputeTest, cls).skip_checks()
         if not CONF.service_available.nova:
             raise cls.skipException("Nova is not available")
+        if cls.create_default_network and not CONF.service_available.neutron:
+            raise cls.skipException("Neutron is not available")
+
         api_version_utils.check_skip_with_microversion(
             cls.min_microversion, cls.max_microversion,
             CONF.compute.min_microversion, CONF.compute.max_microversion)
@@ -326,18 +329,18 @@
             body['id'])
         return body
 
-    def wait_for(self, condition):
+    def wait_for(self, condition, *args):
         """Repeatedly calls condition() until a timeout."""
         start_time = int(time.time())
         while True:
             try:
-                condition()
+                condition(*args)
             except Exception:
                 pass
             else:
                 return
             if int(time.time()) - start_time >= self.build_timeout:
-                condition()
+                condition(*args)
                 return
             time.sleep(self.build_interval)
 
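
With the *args extension, one bound helper can serve several servers
instead of needing a closure per server; the call sites later in this diff
look like:

    # _get_output now takes the server id as an explicit argument.
    self.wait_for(self._get_output, self.server_id)
    self.wait_for(self._get_output, temp_server_id)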
@@ -524,6 +527,8 @@
         """Create a volume and wait for it to become 'available'.
 
         :param image_ref: Specify an image id to create a bootable volume.
+        :param wait_for_available: Wait until the volume becomes available
+               before returning.
         :param kwargs: other parameters to create volume.
         :returns: The available volume.
         """
@@ -534,6 +539,7 @@
             kwargs['display_name'] = vol_name
         if image_ref is not None:
             kwargs['imageRef'] = image_ref
+        wait = kwargs.pop('wait_for_available', True)
         if CONF.volume.volume_type and 'volume_type' not in kwargs:
             # If volume_type is not provided in config then no need to
             # add a volume type and
@@ -549,8 +555,9 @@
         cls.addClassResourceCleanup(test_utils.call_and_ignore_notfound_exc,
                                     cls.volumes_client.delete_volume,
                                     volume['id'])
-        waiters.wait_for_volume_resource_status(cls.volumes_client,
-                                                volume['id'], 'available')
+        if wait:
+            waiters.wait_for_volume_resource_status(cls.volumes_client,
+                                                    volume['id'], 'available')
         return volume
 
     def _detach_volume(self, server, volume):
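
The new wait_for_available flag lets a test kick off the volume build,
overlap other setup, and block only when the volume is actually needed;
test_server_rescue.py later in this diff uses exactly this shape:

    volume = self.create_volume(wait_for_available=False)
    # ... create and rescue a server while Cinder builds the volume ...
    waiters.wait_for_volume_resource_status(self.volumes_client,
                                            volume['id'], 'available')
    self.attach_volume(server, volume)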
diff --git a/tempest/api/compute/images/test_image_metadata_negative.py b/tempest/api/compute/images/test_image_metadata_negative.py
index b9806c7..33a59ae 100644
--- a/tempest/api/compute/images/test_image_metadata_negative.py
+++ b/tempest/api/compute/images/test_image_metadata_negative.py
@@ -14,10 +14,13 @@
 #    under the License.
 
 from tempest.api.compute import base
+from tempest import config
 from tempest.lib.common.utils import data_utils
 from tempest.lib import decorators
 from tempest.lib import exceptions as lib_exc
 
+CONF = config.CONF
+
 
 class ImagesMetadataNegativeTestJSON(base.BaseV2ComputeTest):
     """Negative tests of image metadata
@@ -28,6 +31,13 @@
     max_microversion = '2.38'
 
     @classmethod
+    def skip_checks(cls):
+        super(ImagesMetadataNegativeTestJSON, cls).skip_checks()
+        if not CONF.service_available.glance:
+            skip_msg = ("%s skipped as glance is not available" % cls.__name__)
+            raise cls.skipException(skip_msg)
+
+    @classmethod
     def setup_clients(cls):
         super(ImagesMetadataNegativeTestJSON, cls).setup_clients()
         cls.client = cls.compute_images_client
diff --git a/tempest/api/compute/images/test_images_oneserver.py b/tempest/api/compute/images/test_images_oneserver.py
index 23f8326..2b859da 100644
--- a/tempest/api/compute/images/test_images_oneserver.py
+++ b/tempest/api/compute/images/test_images_oneserver.py
@@ -24,6 +24,8 @@
 class ImagesOneServerTestJSON(base.BaseV2ComputeTest):
     """Test server images API"""
 
+    create_default_network = True
+
     @classmethod
     def resource_setup(cls):
         super(ImagesOneServerTestJSON, cls).resource_setup()
diff --git a/tempest/api/compute/servers/test_attach_interfaces.py b/tempest/api/compute/servers/test_attach_interfaces.py
index efecd6c..9b6bf84 100644
--- a/tempest/api/compute/servers/test_attach_interfaces.py
+++ b/tempest/api/compute/servers/test_attach_interfaces.py
@@ -295,8 +295,8 @@
     def test_reassign_port_between_servers(self):
         """Tests reassigning port between servers
 
-        1. Create a port in Neutron.
-        2. Create two servers in Nova.
+        1. Create two servers in Nova.
+        2. Create a port in Neutron.
         3. Attach the port to the first server.
         4. Detach the port from the first server.
         5. Attach the port to the second server.
@@ -304,11 +304,6 @@
         """
         network = self.get_tenant_network()
         network_id = network['id']
-        port = self.ports_client.create_port(
-            network_id=network_id,
-            name=data_utils.rand_name(self.__class__.__name__))
-        port_id = port['port']['id']
-        self.addCleanup(self.ports_client.delete_port, port_id)
 
         # NOTE(artom) We create two servers one at a time because
         # create_test_server doesn't support multiple validatable servers.
@@ -318,12 +313,21 @@
         def _create_validatable_server():
             _, servers = compute.create_test_server(
                 self.os_primary, tenant_network=network,
-                wait_until='ACTIVE', validatable=True,
+                validatable=True,
                 validation_resources=validation_resources)
             return servers[0]
 
+        # NOTE(danms): We create these with no waiters because we will wait
+        # for them to be validatable (i.e. SSHABLE) below. That way the
+        # server creations overlap with each other and with create_port.
         servers = [_create_validatable_server(), _create_validatable_server()]
 
+        port = self.ports_client.create_port(
+            network_id=network_id,
+            name=data_utils.rand_name(self.__class__.__name__))
+        port_id = port['port']['id']
+        self.addCleanup(self.ports_client.delete_port, port_id)
+
         # add our cleanups for the servers since we bypassed the base class
         for server in servers:
             self.addCleanup(self.delete_server, server['id'])
@@ -332,7 +336,9 @@
             # NOTE(mgoddard): Get detailed server to ensure addresses are
             # present in fixed IP case.
             server = self.servers_client.show_server(server['id'])['server']
-            self._wait_for_validation(server, validation_resources)
+            compute.wait_for_ssh_or_ping(server, self.os_primary, network,
+                                         True, validation_resources,
+                                         'SSHABLE', True)
             # attach the port to the server
             iface = self.interfaces_client.create_interface(
                 server['id'], port_id=port_id)['interfaceAttachment']
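
The reordering above overlaps the two server boots with the port creation:
nothing waits until just before the port is attached. Condensed, the new
flow is (names as in the test above):

    servers = [_create_validatable_server(),  # no wait_until here
               _create_validatable_server()]
    # Port creation overlaps the server boots.
    port = self.ports_client.create_port(
        network_id=network_id,
        name=data_utils.rand_name(self.__class__.__name__))
    for server in servers:
        # Block only now, right before the server is actually used.
        compute.wait_for_ssh_or_ping(server, self.os_primary, network,
                                     True, validation_resources,
                                     'SSHABLE', True)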
diff --git a/tempest/api/compute/servers/test_server_actions.py b/tempest/api/compute/servers/test_server_actions.py
index e1e7fda..81f9e55 100644
--- a/tempest/api/compute/servers/test_server_actions.py
+++ b/tempest/api/compute/servers/test_server_actions.py
@@ -34,13 +34,13 @@
 LOG = logging.getLogger(__name__)
 
 
-class ServerActionsTestJSON(base.BaseV2ComputeTest):
+class ServerActionsBase(base.BaseV2ComputeTest):
     """Test server actions"""
 
     def setUp(self):
         # NOTE(afazekas): Normally we use the same server with all test cases,
         # but if it has an issue, we build a new one
-        super(ServerActionsTestJSON, self).setUp()
+        super().setUp()
         # Check if the server is in a clean state after test
         try:
             self.validation_resources = self.get_class_validation_resources(
@@ -73,7 +73,7 @@
                 self.server_id, validatable=True, wait_until='SSHABLE')
 
     def tearDown(self):
-        super(ServerActionsTestJSON, self).tearDown()
+        super(ServerActionsBase, self).tearDown()
         # NOTE(zhufl): server_check_teardown will raise an Exception,
         # which would prevent other cleanup steps from being executed,
         # so it should be called after super's tearDown.
@@ -82,51 +82,19 @@
     @classmethod
     def setup_credentials(cls):
         cls.prepare_instance_network()
-        super(ServerActionsTestJSON, cls).setup_credentials()
+        super(ServerActionsBase, cls).setup_credentials()
 
     @classmethod
     def setup_clients(cls):
-        super(ServerActionsTestJSON, cls).setup_clients()
+        super(ServerActionsBase, cls).setup_clients()
         cls.client = cls.servers_client
 
     @classmethod
     def resource_setup(cls):
-        super(ServerActionsTestJSON, cls).resource_setup()
+        super(ServerActionsBase, cls).resource_setup()
         cls.server_id = cls.recreate_server(None, validatable=True,
                                             wait_until='SSHABLE')
 
-    @decorators.idempotent_id('6158df09-4b82-4ab3-af6d-29cf36af858d')
-    @testtools.skipUnless(CONF.compute_feature_enabled.change_password,
-                          'Change password not available.')
-    def test_change_server_password(self):
-        """Test changing server's password
-
-        The server's password should be set to the provided password and
-        the user can authenticate with the new password.
-        """
-        # Since this test messes with the password and makes the
-        # server unreachable, it should create its own server
-        newserver = self.create_test_server(
-            validatable=True,
-            validation_resources=self.validation_resources,
-            wait_until='ACTIVE')
-        self.addCleanup(self.delete_server, newserver['id'])
-        # The server's password should be set to the provided password
-        new_password = 'Newpass1234'
-        self.client.change_password(newserver['id'], adminPass=new_password)
-        waiters.wait_for_server_status(self.client, newserver['id'], 'ACTIVE')
-
-        if CONF.validation.run_validation:
-            # Verify that the user can authenticate with the new password
-            server = self.client.show_server(newserver['id'])['server']
-            linux_client = remote_client.RemoteClient(
-                self.get_server_ip(server, self.validation_resources),
-                self.ssh_user,
-                new_password,
-                server=server,
-                servers_client=self.client)
-            linux_client.validate_authentication()
-
     def _test_reboot_server(self, reboot_type):
         if CONF.validation.run_validation:
             # Get the time the server was last rebooted,
@@ -159,45 +127,6 @@
             self.assertGreater(new_boot_time, boot_time,
                                '%s > %s' % (new_boot_time, boot_time))
 
-    @decorators.attr(type='smoke')
-    @decorators.idempotent_id('2cb1baf6-ac8d-4429-bf0d-ba8a0ba53e32')
-    def test_reboot_server_hard(self):
-        """Test hard rebooting server
-
-        The server should be power cycled.
-        """
-        self._test_reboot_server('HARD')
-
-    @decorators.idempotent_id('1d1c9104-1b0a-11e7-a3d4-fa163e65f5ce')
-    def test_remove_server_all_security_groups(self):
-        """Test removing all security groups from server"""
-        server = self.create_test_server(wait_until='ACTIVE')
-
-        # Remove all Security group
-        self.client.remove_security_group(
-            server['id'], name=server['security_groups'][0]['name'])
-
-        # Verify all Security group
-        server = self.client.show_server(server['id'])['server']
-        self.assertNotIn('security_groups', server)
-
-    def _rebuild_server_and_check(self, image_ref, server):
-        rebuilt_server = (self.client.rebuild_server(server['id'], image_ref)
-                          ['server'])
-        if CONF.validation.run_validation:
-            tenant_network = self.get_tenant_network()
-            compute.wait_for_ssh_or_ping(
-                server, self.os_primary, tenant_network,
-                True, self.validation_resources, "SSHABLE", True)
-        else:
-            waiters.wait_for_server_status(self.client, server['id'],
-                                           'ACTIVE')
-
-        msg = ('Server was not rebuilt to the original image. '
-               'The original image: {0}. The current image: {1}'
-               .format(image_ref, rebuilt_server['image']['id']))
-        self.assertEqual(image_ref, rebuilt_server['image']['id'], msg)
-
     def _test_rebuild_server(self):
         # Get the IPs the server has before rebuilding it
         original_addresses = (self.client.show_server(self.server_id)['server']
@@ -250,6 +179,108 @@
                 servers_client=self.client)
             linux_client.validate_authentication()
 
+    def _test_resize_server_confirm(self, server_id, stop=False):
+        # The server's RAM and disk space should be modified to that of
+        # the provided flavor
+
+        if stop:
+            self.client.stop_server(server_id)
+            waiters.wait_for_server_status(self.client, server_id,
+                                           'SHUTOFF')
+
+        self.client.resize_server(server_id, self.flavor_ref_alt)
+        # NOTE(jlk): Explicitly delete the server to get a new one for later
+        # tests. Avoids resize down race issues.
+        self.addCleanup(self.delete_server, server_id)
+        waiters.wait_for_server_status(self.client, server_id,
+                                       'VERIFY_RESIZE')
+
+        self.client.confirm_resize_server(server_id)
+        expected_status = 'SHUTOFF' if stop else 'ACTIVE'
+        waiters.wait_for_server_status(self.client, server_id,
+                                       expected_status)
+
+        server = self.client.show_server(server_id)['server']
+        self.assert_flavor_equal(self.flavor_ref_alt, server['flavor'])
+
+        if stop:
+            # NOTE(mriedem): tearDown requires the server to be started.
+            self.client.start_server(server_id)
+
+    def _get_output(self, server_id):
+        output = self.client.get_console_output(
+            server_id, length=3)['output']
+        self.assertTrue(output, "Console output was empty.")
+        lines = len(output.split('\n'))
+        self.assertEqual(lines, 3)
+
+    def _validate_url(self, url):
+        valid_scheme = ['http', 'https']
+        parsed_url = urlparse.urlparse(url)
+        self.assertIsNotNone(parsed_url.port)
+        self.assertIsNotNone(parsed_url.hostname)
+        self.assertIn(parsed_url.scheme, valid_scheme)
+
+    def _rebuild_server_and_check(self, image_ref, server):
+        rebuilt_server = (self.client.rebuild_server(server['id'], image_ref)
+                          ['server'])
+        if CONF.validation.run_validation:
+            tenant_network = self.get_tenant_network()
+            compute.wait_for_ssh_or_ping(
+                server, self.os_primary, tenant_network,
+                True, self.validation_resources, "SSHABLE", True)
+        else:
+            waiters.wait_for_server_status(self.client, server['id'],
+                                           'ACTIVE')
+
+        msg = ('Server was not rebuilt to the original image. '
+               'The original image: {0}. The current image: {1}'
+               .format(image_ref, rebuilt_server['image']['id']))
+        self.assertEqual(image_ref, rebuilt_server['image']['id'], msg)
+
+
+class ServerActionsTestJSON(ServerActionsBase):
+    @decorators.idempotent_id('6158df09-4b82-4ab3-af6d-29cf36af858d')
+    @testtools.skipUnless(CONF.compute_feature_enabled.change_password,
+                          'Change password not available.')
+    def test_change_server_password(self):
+        """Test changing server's password
+
+        The server's password should be set to the provided password and
+        the user can authenticate with the new password.
+        """
+        # Since this test messes with the password and makes the
+        # server unreachable, it should create its own server
+        newserver = self.create_test_server(
+            validatable=True,
+            validation_resources=self.validation_resources,
+            wait_until='ACTIVE')
+        self.addCleanup(self.delete_server, newserver['id'])
+        # The server's password should be set to the provided password
+        new_password = 'Newpass1234'
+        self.client.change_password(newserver['id'], adminPass=new_password)
+        waiters.wait_for_server_status(self.client, newserver['id'], 'ACTIVE')
+
+        if CONF.validation.run_validation:
+            # Verify that the user can authenticate with the new password
+            server = self.client.show_server(newserver['id'])['server']
+            linux_client = remote_client.RemoteClient(
+                self.get_server_ip(server, self.validation_resources),
+                self.ssh_user,
+                new_password,
+                server=server,
+                servers_client=self.client)
+            linux_client.validate_authentication()
+
+    @decorators.attr(type='smoke')
+    @decorators.idempotent_id('2cb1baf6-ac8d-4429-bf0d-ba8a0ba53e32')
+    def test_reboot_server_hard(self):
+        """Test hard rebooting server
+
+        The server should be power cycled.
+        """
+        self._test_reboot_server('HARD')
+
     @decorators.idempotent_id('aaa6cdf3-55a7-461a-add9-1c8596b9a07c')
     def test_rebuild_server(self):
         """Test rebuilding server
@@ -258,6 +289,120 @@
         """
         self._test_rebuild_server()
 
+    @decorators.idempotent_id('1499262a-9328-4eda-9068-db1ac57498d2')
+    @testtools.skipUnless(CONF.compute_feature_enabled.resize,
+                          'Resize not available.')
+    def test_resize_server_confirm(self):
+        """Test resizing server and then confirming"""
+        self._test_resize_server_confirm(self.server_id, stop=False)
+
+    @decorators.idempotent_id('c03aab19-adb1-44f5-917d-c419577e9e68')
+    @testtools.skipUnless(CONF.compute_feature_enabled.resize,
+                          'Resize not available.')
+    def test_resize_server_revert(self):
+        """Test resizing server and then reverting
+
+        The server's RAM and disk space should return to its original
+        values after a resize is reverted.
+        """
+
+        self.client.resize_server(self.server_id, self.flavor_ref_alt)
+        # NOTE(zhufl): Explicitly delete the server to get a new one for later
+        # tests. Avoids resize down race issues.
+        self.addCleanup(self.delete_server, self.server_id)
+        waiters.wait_for_server_status(self.client, self.server_id,
+                                       'VERIFY_RESIZE')
+
+        self.client.revert_resize_server(self.server_id)
+        waiters.wait_for_server_status(self.client, self.server_id, 'ACTIVE')
+
+        server = self.client.show_server(self.server_id)['server']
+        self.assert_flavor_equal(self.flavor_ref, server['flavor'])
+
+    @decorators.idempotent_id('4b8867e6-fffa-4d54-b1d1-6fdda57be2f3')
+    @testtools.skipUnless(CONF.compute_feature_enabled.console_output,
+                          'Console output not supported.')
+    def test_get_console_output(self):
+        """Test getting console output for a server
+
+        Should be able to GET the console output for a given server_id and
+        number of lines.
+        """
+
+        # This reboot is necessary to produce some console log after
+        # creating an instance backup. After an instance backup, the
+        # console log file is truncated and we cannot get any console
+        # log through the "console-log" API.
+        # Details: https://bugs.launchpad.net/nova/+bug/1251920
+        self.reboot_server(self.server_id, type='HARD')
+        self.wait_for(self._get_output, self.server_id)
+
+    @decorators.idempotent_id('bd61a9fd-062f-4670-972b-2d6c3e3b9e73')
+    @testtools.skipUnless(CONF.compute_feature_enabled.pause,
+                          'Pause is not available.')
+    def test_pause_unpause_server(self):
+        """Test pausing and unpausing server"""
+        self.client.pause_server(self.server_id)
+        waiters.wait_for_server_status(self.client, self.server_id, 'PAUSED')
+        self.client.unpause_server(self.server_id)
+        waiters.wait_for_server_status(self.client, self.server_id, 'ACTIVE')
+
+    @decorators.idempotent_id('0d8ee21e-b749-462d-83da-b85b41c86c7f')
+    @testtools.skipUnless(CONF.compute_feature_enabled.suspend,
+                          'Suspend is not available.')
+    def test_suspend_resume_server(self):
+        """Test suspending and resuming server"""
+        self.client.suspend_server(self.server_id)
+        waiters.wait_for_server_status(self.client, self.server_id,
+                                       'SUSPENDED')
+        self.client.resume_server(self.server_id)
+        waiters.wait_for_server_status(self.client, self.server_id, 'ACTIVE')
+
+    @decorators.idempotent_id('af8eafd4-38a7-4a4b-bdbc-75145a580560')
+    def test_stop_start_server(self):
+        """Test stopping and starting server"""
+        self.client.stop_server(self.server_id)
+        waiters.wait_for_server_status(self.client, self.server_id, 'SHUTOFF')
+        self.client.start_server(self.server_id)
+        waiters.wait_for_server_status(self.client, self.server_id, 'ACTIVE')
+
+    @decorators.idempotent_id('80a8094c-211e-440a-ab88-9e59d556c7ee')
+    def test_lock_unlock_server(self):
+        """Test locking and unlocking server
+
+        Lock the server, and trying to stop it will fail because a locked
+        server is not allowed to be stopped by a non-admin user.
+        Then unlock the server; now the server can be stopped and started.
+        """
+        # Lock the server, try to stop it (expect Conflict), unlock, retry
+        self.client.lock_server(self.server_id)
+        self.addCleanup(self.client.unlock_server, self.server_id)
+        server = self.client.show_server(self.server_id)['server']
+        self.assertEqual(server['status'], 'ACTIVE')
+        # Locked server is not allowed to be stopped by non-admin user
+        self.assertRaises(lib_exc.Conflict,
+                          self.client.stop_server, self.server_id)
+        self.client.unlock_server(self.server_id)
+        self.client.stop_server(self.server_id)
+        waiters.wait_for_server_status(self.client, self.server_id, 'SHUTOFF')
+        self.client.start_server(self.server_id)
+        waiters.wait_for_server_status(self.client, self.server_id, 'ACTIVE')
+
+
+class ServerActionsTestOtherA(ServerActionsBase):
+    @decorators.idempotent_id('1d1c9104-1b0a-11e7-a3d4-fa163e65f5ce')
+    def test_remove_server_all_security_groups(self):
+        """Test removing all security groups from server"""
+        server = self.create_test_server(wait_until='ACTIVE')
+
+        # Remove all Security group
+        self.client.remove_security_group(
+            server['id'], name=server['security_groups'][0]['name'])
+
+        # Verify all Security group
+        server = self.client.show_server(server['id'])['server']
+        self.assertNotIn('security_groups', server)
+
     @decorators.idempotent_id('30449a88-5aff-4f9b-9866-6ee9b17f906d')
     def test_rebuild_server_in_stop_state(self):
         """Test rebuilding server in stop state
@@ -330,41 +475,6 @@
                 servers_client=self.client)
             linux_client.validate_authentication()
 
-    def _test_resize_server_confirm(self, server_id, stop=False):
-        # The server's RAM and disk space should be modified to that of
-        # the provided flavor
-
-        if stop:
-            self.client.stop_server(server_id)
-            waiters.wait_for_server_status(self.client, server_id,
-                                           'SHUTOFF')
-
-        self.client.resize_server(server_id, self.flavor_ref_alt)
-        # NOTE(jlk): Explicitly delete the server to get a new one for later
-        # tests. Avoids resize down race issues.
-        self.addCleanup(self.delete_server, server_id)
-        waiters.wait_for_server_status(self.client, server_id,
-                                       'VERIFY_RESIZE')
-
-        self.client.confirm_resize_server(server_id)
-        expected_status = 'SHUTOFF' if stop else 'ACTIVE'
-        waiters.wait_for_server_status(self.client, server_id,
-                                       expected_status)
-
-        server = self.client.show_server(server_id)['server']
-        self.assert_flavor_equal(self.flavor_ref_alt, server['flavor'])
-
-        if stop:
-            # NOTE(mriedem): tearDown requires the server to be started.
-            self.client.start_server(server_id)
-
-    @decorators.idempotent_id('1499262a-9328-4eda-9068-db1ac57498d2')
-    @testtools.skipUnless(CONF.compute_feature_enabled.resize,
-                          'Resize not available.')
-    def test_resize_server_confirm(self):
-        """Test resizing server and then confirming"""
-        self._test_resize_server_confirm(self.server_id, stop=False)
-
     @decorators.idempotent_id('e6c28180-7454-4b59-b188-0257af08a63b')
     @decorators.related_bug('1728603')
     @testtools.skipUnless(CONF.compute_feature_enabled.resize,
@@ -402,6 +512,8 @@
                 servers_client=self.client)
             linux_client.validate_authentication()
 
+
+class ServerActionsTestOtherB(ServerActionsBase):
     @decorators.idempotent_id('138b131d-66df-48c9-a171-64f45eb92962')
     @testtools.skipUnless(CONF.compute_feature_enabled.resize,
                           'Resize not available.')
@@ -409,29 +521,6 @@
         """Test resizing a stopped server and then confirming"""
         self._test_resize_server_confirm(self.server_id, stop=True)
 
-    @decorators.idempotent_id('c03aab19-adb1-44f5-917d-c419577e9e68')
-    @testtools.skipUnless(CONF.compute_feature_enabled.resize,
-                          'Resize not available.')
-    def test_resize_server_revert(self):
-        """Test resizing server and then reverting
-
-        The server's RAM and disk space should return to its original
-        values after a resize is reverted.
-        """
-
-        self.client.resize_server(self.server_id, self.flavor_ref_alt)
-        # NOTE(zhufl): Explicitly delete the server to get a new one for later
-        # tests. Avoids resize down race issues.
-        self.addCleanup(self.delete_server, self.server_id)
-        waiters.wait_for_server_status(self.client, self.server_id,
-                                       'VERIFY_RESIZE')
-
-        self.client.revert_resize_server(self.server_id)
-        waiters.wait_for_server_status(self.client, self.server_id, 'ACTIVE')
-
-        server = self.client.show_server(self.server_id)['server']
-        self.assert_flavor_equal(self.flavor_ref, server['flavor'])
-
     @decorators.idempotent_id('fbbf075f-a812-4022-bc5c-ccb8047eef12')
     @decorators.related_bug('1737599')
     @testtools.skipUnless(CONF.compute_feature_enabled.resize,
@@ -595,31 +684,6 @@
         self.assertEqual((backup2, backup3),
                          (image_list[0]['name'], image_list[1]['name']))
 
-    def _get_output(self):
-        output = self.client.get_console_output(
-            self.server_id, length=3)['output']
-        self.assertTrue(output, "Console output was empty.")
-        lines = len(output.split('\n'))
-        self.assertEqual(lines, 3)
-
-    @decorators.idempotent_id('4b8867e6-fffa-4d54-b1d1-6fdda57be2f3')
-    @testtools.skipUnless(CONF.compute_feature_enabled.console_output,
-                          'Console output not supported.')
-    def test_get_console_output(self):
-        """Test getting console output for a server
-
-        Should be able to GET the console output for a given server_id and
-        number of lines.
-        """
-
-        # This reboot is necessary for outputting some console log after
-        # creating an instance backup. If an instance backup, the console
-        # log file is truncated and we cannot get any console log through
-        # "console-log" API.
-        # The detail is https://bugs.launchpad.net/nova/+bug/1251920
-        self.reboot_server(self.server_id, type='HARD')
-        self.wait_for(self._get_output)
-
     @decorators.idempotent_id('89104062-69d8-4b19-a71b-f47b7af093d7')
     @testtools.skipUnless(CONF.compute_feature_enabled.console_output,
                           'Console output not supported.')
@@ -643,6 +707,7 @@
 
         self.wait_for(_check_full_length_console_log)
 
+    @decorators.skip_because(bug='2028851')
     @decorators.idempotent_id('5b65d4e7-4ecd-437c-83c0-d6b79d927568')
     @testtools.skipUnless(CONF.compute_feature_enabled.console_output,
                           'Console output not supported.')
@@ -661,28 +726,7 @@
 
         self.client.stop_server(temp_server_id)
         waiters.wait_for_server_status(self.client, temp_server_id, 'SHUTOFF')
-        self.wait_for(self._get_output)
-
-    @decorators.idempotent_id('bd61a9fd-062f-4670-972b-2d6c3e3b9e73')
-    @testtools.skipUnless(CONF.compute_feature_enabled.pause,
-                          'Pause is not available.')
-    def test_pause_unpause_server(self):
-        """Test pausing and unpausing server"""
-        self.client.pause_server(self.server_id)
-        waiters.wait_for_server_status(self.client, self.server_id, 'PAUSED')
-        self.client.unpause_server(self.server_id)
-        waiters.wait_for_server_status(self.client, self.server_id, 'ACTIVE')
-
-    @decorators.idempotent_id('0d8ee21e-b749-462d-83da-b85b41c86c7f')
-    @testtools.skipUnless(CONF.compute_feature_enabled.suspend,
-                          'Suspend is not available.')
-    def test_suspend_resume_server(self):
-        """Test suspending and resuming server"""
-        self.client.suspend_server(self.server_id)
-        waiters.wait_for_server_status(self.client, self.server_id,
-                                       'SUSPENDED')
-        self.client.resume_server(self.server_id)
-        waiters.wait_for_server_status(self.client, self.server_id, 'ACTIVE')
+        self.wait_for(self._get_output, temp_server_id)
 
     @decorators.idempotent_id('77eba8e0-036e-4635-944b-f7a8f3b78dc9')
     @testtools.skipUnless(CONF.compute_feature_enabled.shelve,
@@ -701,12 +745,6 @@
         compute.shelve_server(self.client, self.server_id,
                               force_shelve_offload=True)
 
-        def _unshelve_server():
-            server_info = self.client.show_server(self.server_id)['server']
-            if 'SHELVED' in server_info['status']:
-                self.client.unshelve_server(self.server_id)
-        self.addCleanup(_unshelve_server)
-
         server = self.client.show_server(self.server_id)['server']
         image_name = server['name'] + '-shelved'
         params = {'name': image_name}
@@ -718,8 +756,13 @@
         self.assertEqual(1, len(images))
         self.assertEqual(image_name, images[0]['name'])
 
-        self.client.unshelve_server(self.server_id)
-        waiters.wait_for_server_status(self.client, self.server_id, 'ACTIVE')
+        body = self.client.unshelve_server(self.server_id)
+        waiters.wait_for_server_status(
+            self.client,
+            self.server_id,
+            "ACTIVE",
+            request_id=body.response["x-openstack-request-id"],
+        )
         glance_client.wait_for_resource_deletion(images[0]['id'])
 
     @decorators.idempotent_id('8cf9f450-a871-42cf-9bef-77eba189c0b0')
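
Passing the unshelve request id into the waiter above ties the wait to the
specific instance action, so a failed unshelve surfaces as an error on that
action rather than as a generic timeout. The request_id keyword is taken
from this diff; condensed:

    body = self.client.unshelve_server(self.server_id)
    waiters.wait_for_server_status(
        self.client, self.server_id, 'ACTIVE',
        request_id=body.response['x-openstack-request-id'])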
@@ -737,43 +780,6 @@
         compute.shelve_server(self.client, server['id'],
                               force_shelve_offload=True)
 
-    @decorators.idempotent_id('af8eafd4-38a7-4a4b-bdbc-75145a580560')
-    def test_stop_start_server(self):
-        """Test stopping and starting server"""
-        self.client.stop_server(self.server_id)
-        waiters.wait_for_server_status(self.client, self.server_id, 'SHUTOFF')
-        self.client.start_server(self.server_id)
-        waiters.wait_for_server_status(self.client, self.server_id, 'ACTIVE')
-
-    @decorators.idempotent_id('80a8094c-211e-440a-ab88-9e59d556c7ee')
-    def test_lock_unlock_server(self):
-        """Test locking and unlocking server
-
-        Lock the server, and trying to stop it will fail because locked
-        server is not allowed to be stopped by non-admin user.
-        Then unlock the server, now the server can be stopped and started.
-        """
-        # Lock the server,try server stop(exceptions throw),unlock it and retry
-        self.client.lock_server(self.server_id)
-        self.addCleanup(self.client.unlock_server, self.server_id)
-        server = self.client.show_server(self.server_id)['server']
-        self.assertEqual(server['status'], 'ACTIVE')
-        # Locked server is not allowed to be stopped by non-admin user
-        self.assertRaises(lib_exc.Conflict,
-                          self.client.stop_server, self.server_id)
-        self.client.unlock_server(self.server_id)
-        self.client.stop_server(self.server_id)
-        waiters.wait_for_server_status(self.client, self.server_id, 'SHUTOFF')
-        self.client.start_server(self.server_id)
-        waiters.wait_for_server_status(self.client, self.server_id, 'ACTIVE')
-
-    def _validate_url(self, url):
-        valid_scheme = ['http', 'https']
-        parsed_url = urlparse.urlparse(url)
-        self.assertNotEqual('None', parsed_url.port)
-        self.assertNotEqual('None', parsed_url.hostname)
-        self.assertIn(parsed_url.scheme, valid_scheme)
-
     @decorators.idempotent_id('c6bc11bf-592e-4015-9319-1c98dc64daf5')
     @testtools.skipUnless(CONF.compute_feature_enabled.vnc_console,
                           'VNC Console feature is disabled.')
@@ -804,9 +810,15 @@
 
     min_microversion = '2.47'
 
+    @classmethod
+    def skip_checks(cls):
+        if not CONF.service_available.glance:
+            skip_msg = ("%s skipped as glance is not available" % cls.__name__)
+            raise cls.skipException(skip_msg)
+        super(ServersAaction247Test, cls).skip_checks()
+
     @testtools.skipUnless(CONF.compute_feature_enabled.snapshot,
                           'Snapshotting not available, backup not possible.')
-    @utils.services('image')
     @decorators.idempotent_id('252a4bdd-6366-4dae-9994-8c30aa660f23')
     def test_create_backup(self):
         server = self.create_test_server(wait_until='ACTIVE')
@@ -817,3 +829,114 @@
                                           backup_type='daily',
                                           rotation=2,
                                           name=backup1)
+
+
+class ServerActionsV293TestJSON(base.BaseV2ComputeTest):
+
+    min_microversion = '2.93'
+    volume_min_microversion = '3.68'
+
+    @classmethod
+    def skip_checks(cls):
+        if not CONF.service_available.cinder:
+            raise cls.skipException("Cinder is not available")
+        return super().skip_checks()
+
+    @classmethod
+    def setup_credentials(cls):
+        cls.prepare_instance_network()
+        super(ServerActionsV293TestJSON, cls).setup_credentials()
+
+    @classmethod
+    def resource_setup(cls):
+        super(ServerActionsV293TestJSON, cls).resource_setup()
+        cls.server_id = cls.recreate_server(None, volume_backed=True,
+                                            validatable=True)
+
+    @decorators.idempotent_id('6652dab9-ea24-4c93-ab5a-93d79c3041cf')
+    def test_rebuild_volume_backed_server(self):
+        """Test rebuilding a volume backed server"""
+        self.validation_resources = self.get_class_validation_resources(
+            self.os_primary)
+        server = self.servers_client.show_server(self.server_id)['server']
+        volume_id = server['os-extended-volumes:volumes_attached'][0]['id']
+        volume_before_rebuild = self.volumes_client.show_volume(volume_id)
+        image_before_rebuild = (
+            volume_before_rebuild['volume']
+            ['volume_image_metadata']['image_id'])
+        # Verify that image inside volume is our initial image before rebuild
+        self.assertEqual(self.image_ref, image_before_rebuild)
+
+        # Authentication is attempted in the following order of priority:
+        # 1.The key passed in, if one was passed in.
+        # 2.Any key we can find through an SSH agent (if allowed).
+        # 3.Any "id_rsa", "id_dsa" or "id_ecdsa" key discoverable in
+        #   ~/.ssh/ (if allowed).
+        # 4.Plain username/password auth, if a password was given.
+        linux_client = remote_client.RemoteClient(
+            self.get_server_ip(server, self.validation_resources),
+            self.ssh_user,
+            password=None,
+            pkey=self.validation_resources['keypair']['private_key'],
+            server=server,
+            servers_client=self.servers_client)
+        output = linux_client.exec_command('touch test_file')
+        # No output means success
+        self.assertEqual('', output.strip())
+
+        # The server should be rebuilt using the provided image and data
+        meta = {'rebuild': 'server'}
+        new_name = data_utils.rand_name(self.__class__.__name__ + '-server')
+        password = 'rebuildPassw0rd'
+        rebuilt_server = self.servers_client.rebuild_server(
+            server['id'],
+            self.image_ref_alt,
+            name=new_name,
+            metadata=meta,
+            adminPass=password)['server']
+
+        # Verify the properties in the initial response are correct
+        self.assertEqual(server['id'], rebuilt_server['id'])
+        rebuilt_image_id = rebuilt_server['image']
+        # Since it is a volume-backed server, the image id will remain empty
+        self.assertEqual('', rebuilt_image_id)
+        self.assert_flavor_equal(self.flavor_ref, rebuilt_server['flavor'])
+
+        # Verify the server properties after the rebuild completes
+        waiters.wait_for_server_status(self.servers_client,
+                                       rebuilt_server['id'], 'ACTIVE')
+        server = self.servers_client.show_server(
+            rebuilt_server['id'])['server']
+        volume_id = server['os-extended-volumes:volumes_attached'][0]['id']
+        volume_after_rebuild = self.volumes_client.show_volume(volume_id)
+        image_after_rebuild = (
+            volume_after_rebuild['volume']
+            ['volume_image_metadata']['image_id'])
+
+        self.assertEqual(new_name, server['name'])
+        # Verify that volume ID remains same before and after rebuild
+        self.assertEqual(volume_before_rebuild['volume']['id'],
+                         volume_after_rebuild['volume']['id'])
+        # Verify that image inside volume is our final image after rebuild
+        self.assertEqual(self.image_ref_alt, image_after_rebuild)
+
+        # Authentication is attempted in the following order of priority:
+        # 1.The key passed in, if one was passed in.
+        # 2.Any key we can find through an SSH agent (if allowed).
+        # 3.Any "id_rsa", "id_dsa" or "id_ecdsa" key discoverable in
+        #   ~/.ssh/ (if allowed).
+        # 4.Plain username/password auth, if a password was given.
+        linux_client = remote_client.RemoteClient(
+            self.get_server_ip(rebuilt_server, self.validation_resources),
+            self.ssh_alt_user,
+            password,
+            self.validation_resources['keypair']['private_key'],
+            server=rebuilt_server,
+            servers_client=self.servers_client)
+        linux_client.validate_authentication()
+        e = self.assertRaises(lib_exc.SSHExecCommandFailed,
+                              linux_client.exec_command,
+                              'cat test_file')
+        # If we rebuilt the boot volume, then we should not find
+        # the file we touched.
+        self.assertIn('No such file or directory', str(e))
diff --git a/tempest/api/compute/servers/test_server_addresses.py b/tempest/api/compute/servers/test_server_addresses.py
index 5a3f5d0..978a9da 100644
--- a/tempest/api/compute/servers/test_server_addresses.py
+++ b/tempest/api/compute/servers/test_server_addresses.py
@@ -14,7 +14,6 @@
 #    under the License.
 
 from tempest.api.compute import base
-from tempest.common import utils
 from tempest.lib import decorators
 
 
@@ -35,7 +34,6 @@
 
     @decorators.attr(type='smoke')
     @decorators.idempotent_id('6eb718c0-02d9-4d5e-acd1-4e0c269cef39')
-    @utils.services('network')
     def test_list_server_addresses(self):
         """Test listing server address
 
@@ -52,7 +50,6 @@
 
     @decorators.attr(type='smoke')
     @decorators.idempotent_id('87bbc374-5538-4f64-b673-2b0e4443cc30')
-    @utils.services('network')
     def test_list_server_addresses_by_network(self):
         """Test listing server addresses filtered by network addresses
 
diff --git a/tempest/api/compute/servers/test_server_addresses_negative.py b/tempest/api/compute/servers/test_server_addresses_negative.py
index e7444d2..bb21594 100644
--- a/tempest/api/compute/servers/test_server_addresses_negative.py
+++ b/tempest/api/compute/servers/test_server_addresses_negative.py
@@ -14,7 +14,6 @@
 #    under the License.
 
 from tempest.api.compute import base
-from tempest.common import utils
 from tempest.lib import decorators
 from tempest.lib import exceptions as lib_exc
 
@@ -35,7 +34,6 @@
 
     @decorators.attr(type=['negative'])
     @decorators.idempotent_id('02c3f645-2d2e-4417-8525-68c0407d001b')
-    @utils.services('network')
     def test_list_server_addresses_invalid_server_id(self):
         """List addresses request should fail if server id not in system"""
         self.assertRaises(lib_exc.NotFound, self.client.list_addresses,
@@ -43,7 +41,6 @@
 
     @decorators.attr(type=['negative'])
     @decorators.idempotent_id('a2ab5144-78c0-4942-a0ed-cc8edccfd9ba')
-    @utils.services('network')
     def test_list_server_addresses_by_network_neg(self):
         """List addresses by network should fail if network name not valid"""
         self.assertRaises(lib_exc.NotFound,
diff --git a/tempest/api/compute/servers/test_server_rescue.py b/tempest/api/compute/servers/test_server_rescue.py
index 716ecda..97c2774 100644
--- a/tempest/api/compute/servers/test_server_rescue.py
+++ b/tempest/api/compute/servers/test_server_rescue.py
@@ -239,13 +239,15 @@
         # after unrescuing the server. Due to that we need to make
         # the server SSHable before it tries to detach; more details
         # are in bug #1960346
+        volume = self.create_volume(wait_for_available=False)
         validation_resources = self.get_class_validation_resources(
             self.os_primary)
         server, rescue_image_id = self._create_server_and_rescue_image(
             hw_rescue_device='disk', hw_rescue_bus='virtio', validatable=True,
             validation_resources=validation_resources, wait_until="SSHABLE")
         server = self.servers_client.show_server(server['id'])['server']
-        volume = self.create_volume()
+        waiters.wait_for_volume_resource_status(self.volumes_client,
+                                                volume['id'], 'available')
         self.attach_volume(server, volume)
         waiters.wait_for_volume_resource_status(self.volumes_client,
                                                 volume['id'], 'in-use')
diff --git a/tempest/api/compute/test_tenant_networks.py b/tempest/api/compute/test_tenant_networks.py
index 17f4b80..da28b9b 100644
--- a/tempest/api/compute/test_tenant_networks.py
+++ b/tempest/api/compute/test_tenant_networks.py
@@ -14,8 +14,11 @@
 
 from tempest.api.compute import base
 from tempest.common import utils
+from tempest import config
 from tempest.lib import decorators
 
+CONF = config.CONF
+
 
 class ComputeTenantNetworksTest(base.BaseV2ComputeTest):
     """Test compute tenant networks API with microversion less than 2.36"""
@@ -23,6 +26,14 @@
     max_microversion = '2.35'
 
     @classmethod
+    def skip_checks(cls):
+        super(ComputeTenantNetworksTest, cls).skip_checks()
+        if not CONF.service_available.neutron:
+            skip_msg = (
+                "%s skipped as Neutron is not available" % cls.__name__)
+            raise cls.skipException(skip_msg)
+
+    @classmethod
     def resource_setup(cls):
         super(ComputeTenantNetworksTest, cls).resource_setup()
         cls.client = cls.os_primary.tenant_networks_client
diff --git a/tempest/api/identity/v3/test_users.py b/tempest/api/identity/v3/test_users.py
index dc6dd4a..b95bd75 100644
--- a/tempest/api/identity/v3/test_users.py
+++ b/tempest/api/identity/v3/test_users.py
@@ -31,6 +31,12 @@
     """Test identity user password"""
 
     @classmethod
+    def skip_checks(cls):
+        super(IdentityV3UsersTest, cls).skip_checks()
+        if not CONF.identity_feature_enabled.security_compliance:
+            raise cls.skipException("Security compliance not available.")
+
+    @classmethod
     def resource_setup(cls):
         super(IdentityV3UsersTest, cls).resource_setup()
         cls.creds = cls.os_primary.credentials
@@ -77,8 +83,6 @@
         time.sleep(1)
         self.non_admin_users_client.auth_provider.set_auth()
 
-    @testtools.skipUnless(CONF.identity_feature_enabled.security_compliance,
-                          'Security compliance not available.')
     @decorators.idempotent_id('ad71bd23-12ad-426b-bb8b-195d2b635f27')
     @testtools.skipIf(CONF.identity_feature_enabled.immutable_user_source,
                       'Skipped because environment has an '
@@ -107,8 +111,6 @@
                           user_id=self.user_id,
                           password=old_pass)
 
-    @testtools.skipUnless(CONF.identity_feature_enabled.security_compliance,
-                          'Security compliance not available.')
     @decorators.idempotent_id('941784ee-5342-4571-959b-b80dd2cea516')
     @testtools.skipIf(CONF.identity_feature_enabled.immutable_user_source,
                       'Skipped because environment has an '
@@ -142,8 +144,6 @@
         # A different password can be set
         self._update_password(original_password=new_pass1, password=new_pass2)
 
-    @testtools.skipUnless(CONF.identity_feature_enabled.security_compliance,
-                          'Security compliance not available.')
     @decorators.idempotent_id('a7ad8bbf-2cff-4520-8c1d-96332e151658')
     def test_user_account_lockout(self):
         """Test locking out user account after failure attempts"""
diff --git a/tempest/api/image/base.py b/tempest/api/image/base.py
index 23e7fd8..11a1e6c 100644
--- a/tempest/api/image/base.py
+++ b/tempest/api/image/base.py
@@ -13,6 +13,7 @@
 #    under the License.
 
 import io
+import time
 
 from tempest.common import image as common_image
 from tempest import config
@@ -22,6 +23,7 @@
 import tempest.test
 
 CONF = config.CONF
+BAD_REQUEST_RETRIES = 3
 
 
 class BaseImageTest(tempest.test.BaseTestCase):
@@ -159,6 +161,82 @@
             pass
         return stores
 
+    def _update_image_with_retries(self, image, patch):
+        # NOTE(danms): If glance was unable to fetch the remote image via
+        # HTTP, it will return BadRequest. Because this can be transient in
+        # CI, we try this a few times before we agree that it has failed
+        # for a reason worthy of failing the test.
+        for i in range(BAD_REQUEST_RETRIES):
+            try:
+                self.client.update_image(image, patch)
+                break
+            except exceptions.BadRequest:
+                if i + 1 == BAD_REQUEST_RETRIES:
+                    raise
+                else:
+                    time.sleep(1)
+
+    def check_set_location(self):
+        image = self.client.create_image(container_format='bare',
+                                         disk_format='raw')
+
+        # Locations should be empty when there is no data
+        self.assertEqual('queued', image['status'])
+        self.assertEqual([], image['locations'])
+
+        # Add a new location
+        new_loc = {'metadata': {'foo': 'bar'},
+                   'url': CONF.image.http_image}
+        self._update_image_with_retries(image['id'], [
+            dict(add='/locations/-', value=new_loc)])
+
+        # The image should now be active, with one location that looks
+        # like we expect
+        image = self.client.show_image(image['id'])
+        self.assertEqual(1, len(image['locations']),
+                         'Image should have one location but has %i' % (
+                         len(image['locations'])))
+        self.assertEqual(new_loc['url'], image['locations'][0]['url'])
+        self.assertEqual('bar', image['locations'][0]['metadata'].get('foo'))
+        if 'direct_url' in image:
+            self.assertEqual(image['direct_url'], image['locations'][0]['url'])
+
+        # If we added the location directly, the image goes straight
+        # to active and no hashing is done
+        self.assertEqual('active', image['status'])
+        self.assertIsNone(image['os_hash_algo'])
+        self.assertIsNone(image['os_hash_value'])
+
+        return image
+
+    def check_set_multiple_locations(self):
+        image = self.check_set_location()
+
+        new_loc = {'metadata': {'speed': '88mph'},
+                   'url': '%s#new' % CONF.image.http_image}
+        self._update_image_with_retries(image['id'],
+                                        [dict(add='/locations/-',
+                                              value=new_loc)])
+
+        # The image should now have two locations and the last one
+        # (locations are ordered) should have the new URL.
+        image = self.client.show_image(image['id'])
+        self.assertEqual(2, len(image['locations']),
+                         'Image should have two locations but has %i' % (
+                         len(image['locations'])))
+        self.assertEqual(new_loc['url'], image['locations'][1]['url'])
+
+        # The image should still be active and still have no hashes
+        self.assertEqual('active', image['status'])
+        self.assertIsNone(image['os_hash_algo'])
+        self.assertIsNone(image['os_hash_value'])
+
+        # The direct_url should still match the first location
+        if 'direct_url' in image:
+            self.assertEqual(image['direct_url'], image['locations'][0]['url'])
+
+        return image
+
 
 class BaseV2MemberImageTest(BaseV2ImageTest):
 
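The retry helper moved into the image base class above is a small, reusable pattern: retry a transient BadRequest a fixed number of times and re-raise on the final attempt. A minimal standalone sketch of that pattern, with a stand-in exception class since tempest.lib is not imported here:

import time

BAD_REQUEST_RETRIES = 3


class BadRequest(Exception):
    """Stand-in for tempest.lib.exceptions.BadRequest (assumption)."""


def call_with_retries(func, *args, retries=BAD_REQUEST_RETRIES, delay=1):
    # Retry transient BadRequest failures; re-raise on the last attempt.
    for attempt in range(retries):
        try:
            return func(*args)
        except BadRequest:
            if attempt + 1 == retries:
                raise
            time.sleep(delay)
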
diff --git a/tempest/api/image/v2/admin/test_images.py b/tempest/api/image/v2/admin/test_images.py
index 733c778..ce50c5d 100644
--- a/tempest/api/image/v2/admin/test_images.py
+++ b/tempest/api/image/v2/admin/test_images.py
@@ -20,6 +20,7 @@
 from tempest import config
 from tempest.lib.common.utils import data_utils
 from tempest.lib import decorators
+from tempest.lib import exceptions as lib_exc
 
 CONF = config.CONF
 
@@ -59,6 +60,24 @@
         self.assertNotEqual(created_image_info['owner'],
                             updated_image_info['owner'])
 
+    @decorators.idempotent_id('f6ab4aa0-035e-4664-9f2d-c57c6df50605')
+    def test_list_public_image(self):
+        """Test create image as admin and list public image as none admin"""
+        name = data_utils.rand_name(self.__class__.__name__ + '-Image')
+        image = self.admin_client.create_image(
+            name=name,
+            container_format='bare',
+            visibility='public',
+            disk_format='raw')
+        waiters.wait_for_image_status(self.admin_client, image['id'], 'queued')
+        created_image = self.admin_client.show_image(image['id'])
+        self.assertEqual(image['id'], created_image['id'])
+        self.addCleanup(self.admin_client.delete_image, image['id'])
+
+        images_list = self.client.list_images()['images']
+        fetched_images_id = [img['id'] for img in images_list]
+        self.assertIn(image['id'], fetched_images_id)
+
 
 class ImportCopyImagesTest(base.BaseV2ImageAdminTest):
     """Test the import copy-image operations"""
@@ -120,3 +139,40 @@
         self.assertEqual(0, len(failed_stores),
                          "Failed to copy the following stores: %s" %
                          str(failed_stores))
+
+
+class ImageLocationsAdminTest(base.BaseV2ImageAdminTest):
+
+    @classmethod
+    def skip_checks(cls):
+        super(ImageLocationsAdminTest, cls).skip_checks()
+        if not CONF.image_feature_enabled.manage_locations:
+            skip_msg = (
+                "%s skipped as show_multiple_locations is not available" % (
+                    cls.__name__))
+            raise cls.skipException(skip_msg)
+
+    @decorators.idempotent_id('8a648de4-b745-4c28-a7b5-20de1c3da4d2')
+    def test_delete_locations(self):
+        image = self.check_set_multiple_locations()
+        expected_remaining_loc = image['locations'][1]
+
+        self.admin_client.update_image(image['id'], [
+            dict(remove='/locations/0')])
+
+        # The image should now have only the one location we did not delete
+        image = self.client.show_image(image['id'])
+        self.assertEqual(1, len(image['locations']),
+                         'Image should have one location but has %i' % (
+                         len(image['locations'])))
+        self.assertEqual(expected_remaining_loc['url'],
+                         image['locations'][0]['url'])
+
+        # The direct_url should now be the last remaining location
+        if 'direct_url' in image:
+            self.assertEqual(image['direct_url'], image['locations'][0]['url'])
+
+        # Removing the last location should be disallowed
+        self.assertRaises(lib_exc.Forbidden,
+                          self.admin_client.update_image, image['id'], [
+                              dict(remove='/locations/0')])
diff --git a/tempest/api/image/v2/test_images.py b/tempest/api/image/v2/test_images.py
index b723977..977ad82 100644
--- a/tempest/api/image/v2/test_images.py
+++ b/tempest/api/image/v2/test_images.py
@@ -16,19 +16,18 @@
 
 import io
 import random
-import time
 
 from oslo_log import log as logging
 from tempest.api.image import base
 from tempest.common import waiters
 from tempest import config
 from tempest.lib.common.utils import data_utils
+from tempest.lib.common.utils import test_utils
 from tempest.lib import decorators
 from tempest.lib import exceptions as lib_exc
 
 CONF = config.CONF
 LOG = logging.getLogger(__name__)
-BAD_REQUEST_RETRIES = 3
 
 
 class ImportImagesTest(base.BaseV2ImageTest):
@@ -735,6 +734,30 @@
         body = self.schemas_client.show_schema(schema)
         self.assertEqual("images", body['name'])
 
+    @decorators.idempotent_id('d43f3efc-da4c-4af9-b636-868f0c6acedb')
+    def test_list_hidden_image(self):
+        image = self.client.create_image(os_hidden=True)
+        image = image['image'] if 'image' in image else image
+        self.addCleanup(self.client.wait_for_resource_deletion, image['id'])
+        self.addCleanup(test_utils.call_and_ignore_notfound_exc,
+                        self.client.delete_image, image['id'])
+        images_list = self.client.list_images()['images']
+        fetched_images_id = [img['id'] for img in images_list]
+        self.assertNotIn(image['id'], fetched_images_id)
+
+    @decorators.idempotent_id('fdb96b81-257b-42ac-978b-ddeefa3760e4')
+    def test_list_update_hidden_image(self):
+        image = self.create_image()
+        images_list = self.client.list_images()['images']
+        fetched_images_id = [img['id'] for img in images_list]
+        self.assertIn(image['id'], fetched_images_id)
+
+        self.client.update_image(image['id'],
+                                 [dict(replace='/os_hidden', value=True)])
+        images_list = self.client.list_images()['images']
+        fetched_images_id = [img['id'] for img in images_list]
+        self.assertNotIn(image['id'], fetched_images_id)
+
 
 class ListSharedImagesTest(base.BaseV2ImageTest):
     """Here we test the listing of a shared image information"""
@@ -808,89 +831,13 @@
 
         return image
 
-    def _check_set_location(self):
-        image = self.client.create_image(container_format='bare',
-                                         disk_format='raw')
-
-        # Locations should be empty when there is no data
-        self.assertEqual('queued', image['status'])
-        self.assertEqual([], image['locations'])
-
-        # Add a new location
-        new_loc = {'metadata': {'foo': 'bar'},
-                   'url': CONF.image.http_image}
-        self._update_image_with_retries(image['id'], [
-            dict(add='/locations/-', value=new_loc)])
-
-        # The image should now be active, with one location that looks
-        # like we expect
-        image = self.client.show_image(image['id'])
-        self.assertEqual(1, len(image['locations']),
-                         'Image should have one location but has %i' % (
-                         len(image['locations'])))
-        self.assertEqual(new_loc['url'], image['locations'][0]['url'])
-        self.assertEqual('bar', image['locations'][0]['metadata'].get('foo'))
-        if 'direct_url' in image:
-            self.assertEqual(image['direct_url'], image['locations'][0]['url'])
-
-        # If we added the location directly, the image goes straight
-        # to active and no hashing is done
-        self.assertEqual('active', image['status'])
-        self.assertIsNone(None, image['os_hash_algo'])
-        self.assertIsNone(None, image['os_hash_value'])
-
-        return image
-
     @decorators.idempotent_id('37599b8a-d5c0-4590-aee5-73878502be15')
     def test_set_location(self):
-        self._check_set_location()
-
-    def _update_image_with_retries(self, image, patch):
-        # NOTE(danms): If glance was unable to fetch the remote image via
-        # HTTP, it will return BadRequest. Because this can be transient in
-        # CI, we try this a few times before we agree that it has failed
-        # for a reason worthy of failing the test.
-        for i in range(BAD_REQUEST_RETRIES):
-            try:
-                self.client.update_image(image, patch)
-                break
-            except lib_exc.BadRequest:
-                if i + 1 == BAD_REQUEST_RETRIES:
-                    raise
-                else:
-                    time.sleep(1)
-
-    def _check_set_multiple_locations(self):
-        image = self._check_set_location()
-
-        new_loc = {'metadata': {'speed': '88mph'},
-                   'url': '%s#new' % CONF.image.http_image}
-        self._update_image_with_retries(image['id'],
-                                        [dict(add='/locations/-',
-                                              value=new_loc)])
-
-        # The image should now have two locations and the last one
-        # (locations are ordered) should have the new URL.
-        image = self.client.show_image(image['id'])
-        self.assertEqual(2, len(image['locations']),
-                         'Image should have two locations but has %i' % (
-                         len(image['locations'])))
-        self.assertEqual(new_loc['url'], image['locations'][1]['url'])
-
-        # The image should still be active and still have no hashes
-        self.assertEqual('active', image['status'])
-        self.assertIsNone(None, image['os_hash_algo'])
-        self.assertIsNone(None, image['os_hash_value'])
-
-        # The direct_url should still match the first location
-        if 'direct_url' in image:
-            self.assertEqual(image['direct_url'], image['locations'][0]['url'])
-
-        return image
+        self.check_set_location()
 
     @decorators.idempotent_id('bf6e0009-c039-4884-b498-db074caadb10')
     def test_replace_location(self):
-        image = self._check_set_multiple_locations()
+        image = self.check_set_multiple_locations()
         original_locs = image['locations']
 
         # Replacing with the exact thing should work
@@ -927,31 +874,6 @@
                          len(image['locations'])))
         self.assertEqual(original_locs, image['locations'])
 
-    @decorators.idempotent_id('8a648de4-b745-4c28-a7b5-20de1c3da4d2')
-    def test_delete_locations(self):
-        image = self._check_set_multiple_locations()
-        expected_remaining_loc = image['locations'][1]
-
-        self.client.update_image(image['id'], [
-            dict(remove='/locations/0')])
-
-        # The image should now have only the one location we did not delete
-        image = self.client.show_image(image['id'])
-        self.assertEqual(1, len(image['locations']),
-                         'Image should have one location but has %i' % (
-                         len(image['locations'])))
-        self.assertEqual(expected_remaining_loc['url'],
-                         image['locations'][0]['url'])
-
-        # The direct_url should now be the last remaining location
-        if 'direct_url' in image:
-            self.assertEqual(image['direct_url'], image['locations'][0]['url'])
-
-        # Removing the last location should be disallowed
-        self.assertRaises(lib_exc.Forbidden,
-                          self.client.update_image, image['id'], [
-                              dict(remove='/locations/0')])
-
     @decorators.idempotent_id('a9a20396-8399-4b36-909d-564949be098f')
     def test_set_location_bad_scheme(self):
         image = self.client.create_image(container_format='bare',
diff --git a/tempest/api/network/test_service_providers.py b/tempest/api/network/test_service_providers.py
index 5af5244..e203a2c 100644
--- a/tempest/api/network/test_service_providers.py
+++ b/tempest/api/network/test_service_providers.py
@@ -10,8 +10,6 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
-import testtools
-
 from tempest.api.network import base
 from tempest.common import utils
 from tempest.lib import decorators
@@ -20,10 +18,14 @@
 class ServiceProvidersTest(base.BaseNetworkTest):
     """Test network service providers"""
 
+    @classmethod
+    def skip_checks(cls):
+        super(ServiceProvidersTest, cls).skip_checks()
+        if not utils.is_extension_enabled('service-type', 'network'):
+            skip_msg = ("service-type extension not enabled.")
+            raise cls.skipException(skip_msg)
+
     @decorators.idempotent_id('2cbbeea9-f010-40f6-8df5-4eaa0c918ea6')
-    @testtools.skipUnless(
-        utils.is_extension_enabled('service-type', 'network'),
-        'service-type extension not enabled.')
     def test_service_providers_list(self):
         """Test listing network service providers"""
         body = self.service_providers_client.list_service_providers()
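Several hunks in this change replace per-test skipUnless decorators with a class-level skip_checks(), so the skip condition is evaluated once for the whole class. A minimal unittest sketch of the same pattern (the feature flag is a hypothetical stand-in for a CONF option):

import unittest

FEATURE_ENABLED = False  # hypothetical stand-in for a CONF flag


class FeatureTest(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        # Equivalent of tempest's skip_checks(): skip the whole class
        # once instead of decorating every test method.
        super().setUpClass()
        if not FEATURE_ENABLED:
            raise unittest.SkipTest("feature not enabled")

    def test_something(self):
        self.assertTrue(FEATURE_ENABLED)
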
diff --git a/tempest/api/object_storage/base.py b/tempest/api/object_storage/base.py
index 7107dc4..58ad9d4 100644
--- a/tempest/api/object_storage/base.py
+++ b/tempest/api/object_storage/base.py
@@ -15,6 +15,8 @@
 
 import time
 
+from oslo_log import log
+
 from tempest.common import custom_matchers
 from tempest.common import waiters
 from tempest import config
@@ -23,6 +25,7 @@
 import tempest.test
 
 CONF = config.CONF
+LOG = log.getLogger(__name__)
 
 
 def delete_containers(containers, container_client, object_client):
@@ -41,17 +44,33 @@
 
     for cont in containers:
         try:
-            params = {'limit': 9999, 'format': 'json'}
-            _, objlist = container_client.list_container_objects(cont, params)
-            # delete every object in the container
-            for obj in objlist:
-                object_client.delete_object(cont, obj['name'])
-                object_client.wait_for_resource_deletion(obj['name'], cont)
-            # Verify resource deletion
+            delete_objects(cont, container_client, object_client)
             container_client.delete_container(cont)
             container_client.wait_for_resource_deletion(cont)
         except lib_exc.NotFound:
-            pass
+            LOG.warning(f"Container {cont} wasn't deleted as it wasn't found.")
+
+
+def delete_objects(container, container_client, object_client):
+    """Remove all objects from container.
+
+    Will not raise any error if the objects do not exist.
+
+    :param container: Name of the container that contains the objects to be
+                      deleted
+    :param container_client: Client to be used to list objects in
+                             the container
+    :param object_client: Client to be used to delete objects
+    """
+    params = {'limit': 9999, 'format': 'json'}
+    _, objlist = container_client.list_container_objects(container, params)
+
+    for obj in objlist:
+        try:
+            object_client.delete_object(container, obj['name'])
+            object_client.wait_for_resource_deletion(obj['name'], container)
+        except lib_exc.NotFound:
+            LOG.warning(f"Object {obj} wasn't deleted as it wasn't found.")
 
 
 class BaseObjectTest(tempest.test.BaseTestCase):
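The new delete_objects() helper treats a missing object as a warning rather than an error, which keeps container cleanup idempotent. A standalone sketch of that tolerate-NotFound pattern (the exception class is a stand-in for tempest.lib's):

import logging

LOG = logging.getLogger(__name__)


class NotFound(Exception):
    """Stand-in for tempest.lib.exceptions.NotFound (assumption)."""


def delete_quietly(delete_func, name):
    # Idempotent delete: a resource that is already gone is logged,
    # not treated as a failure.
    try:
        delete_func(name)
    except NotFound:
        LOG.warning("%s wasn't deleted as it wasn't found.", name)
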
diff --git a/tempest/api/object_storage/test_container_acl_negative.py b/tempest/api/object_storage/test_container_acl_negative.py
index 85e6ddb..347c79e 100644
--- a/tempest/api/object_storage/test_container_acl_negative.py
+++ b/tempest/api/object_storage/test_container_acl_negative.py
@@ -41,6 +41,7 @@
         super(ObjectACLsNegativeTest, self).setUp()
         self.container_name = data_utils.rand_name(name='TestContainer')
         self.container_client.update_container(self.container_name)
+        self.containers.append(self.container_name)
 
     @classmethod
     def resource_cleanup(cls):
diff --git a/tempest/api/object_storage/test_object_services.py b/tempest/api/object_storage/test_object_services.py
index 7d5bd26..61b9136 100644
--- a/tempest/api/object_storage/test_object_services.py
+++ b/tempest/api/object_storage/test_object_services.py
@@ -1016,9 +1016,10 @@
         super(PublicObjectTest, self).setUp()
         self.container_name = data_utils.rand_name(name='TestContainer')
         self.container_client.update_container(self.container_name)
+        self.containers.append(self.container_name)
 
     def tearDown(self):
-        self.delete_containers([self.container_name])
+        self.delete_containers()
         super(PublicObjectTest, self).tearDown()
 
     @decorators.idempotent_id('07c9cf95-c0d4-4b49-b9c8-0ef2c9b27193')
diff --git a/tempest/api/volume/admin/test_encrypted_volumes_extend.py b/tempest/api/volume/admin/test_encrypted_volumes_extend.py
index e85a00d..4506389 100644
--- a/tempest/api/volume/admin/test_encrypted_volumes_extend.py
+++ b/tempest/api/volume/admin/test_encrypted_volumes_extend.py
@@ -14,7 +14,6 @@
 
 from tempest.api.volume import base
 from tempest.api.volume import test_volumes_extend as extend
-from tempest.common import utils
 from tempest import config
 from tempest.lib import decorators
 
@@ -25,23 +24,25 @@
                                          base.BaseVolumeAdminTest):
     """Tests extending the size of an attached encrypted volume."""
 
+    @classmethod
+    def skip_checks(cls):
+        super(EncryptedVolumesExtendAttachedTest, cls).skip_checks()
+        if not CONF.service_available.nova:
+            skip_msg = ("%s skipped as Nova is not available" % cls.__name__)
+            raise cls.skipException(skip_msg)
+        if not CONF.volume_feature_enabled.extend_attached_encrypted_volume:
+            raise cls.skipException(
+                "Attached encrypted volume extend is disabled.")
+
     @decorators.idempotent_id('e93243ec-7c37-4b5b-a099-ebf052c13216')
-    @testtools.skipUnless(
-        CONF.volume_feature_enabled.extend_attached_encrypted_volume,
-        "Attached encrypted volume extend is disabled.")
-    @utils.services('compute')
     def test_extend_attached_encrypted_volume_luksv1(self):
         """LUKs v1 decrypts and extends through libvirt."""
         volume = self.create_encrypted_volume(encryption_provider="luks")
         self._test_extend_attached_volume(volume)
 
     @decorators.idempotent_id('381a2a3a-b2f4-4631-a910-720881f2cc2f')
-    @testtools.skipUnless(
-        CONF.volume_feature_enabled.extend_attached_encrypted_volume,
-        "Attached encrypted volume extend is disabled.")
     @testtools.skipIf(CONF.volume.storage_protocol == 'ceph',
                       'Ceph only supports LUKSv2 if doing host attach.')
-    @utils.services('compute')
     def test_extend_attached_encrypted_volume_luksv2(self):
         """LUKs v2 decrypts and extends through os-brick."""
         volume = self.create_encrypted_volume(encryption_provider="luks2")
diff --git a/tempest/api/volume/admin/test_snapshot_manage.py b/tempest/api/volume/admin/test_snapshot_manage.py
index ab0aa38..478bd16 100644
--- a/tempest/api/volume/admin/test_snapshot_manage.py
+++ b/tempest/api/volume/admin/test_snapshot_manage.py
@@ -14,6 +14,7 @@
 #    under the License.
 
 from tempest.api.volume import base
+from tempest.common import utils
 from tempest.common import waiters
 from tempest import config
 from tempest.lib.common.utils import data_utils
@@ -31,6 +32,8 @@
      managed by Cinder from a storage back end to Cinder
     """
 
+    create_default_network = True
+
     @classmethod
     def skip_checks(cls):
         super(SnapshotManageAdminTest, cls).skip_checks()
@@ -46,8 +49,7 @@
                    "it should be a list of two elements")
             raise exceptions.InvalidConfiguration(msg)
 
-    @decorators.idempotent_id('0132f42d-0147-4b45-8501-cc504bbf7810')
-    def test_unmanage_manage_snapshot(self):
+    def _test_unmanage_manage_snapshot(self, attached_volume=False):
         """Test unmanaging and managing volume snapshot"""
         # Create a volume
         volume = self.create_volume()
@@ -55,6 +57,13 @@
         # Create a snapshot
         snapshot = self.create_snapshot(volume_id=volume['id'])
 
+        if attached_volume:
+            # Create a server
+            server = self.create_server(wait_until='SSHABLE')
+            # Attach volume to instance
+            self.attach_volume(server['id'], volume['id'],
+                               wait_for_detach=False)
+
         # Unmanage the snapshot
         # Unmanage snapshot function works almost the same as delete snapshot,
         # but it does not delete the snapshot data
@@ -100,3 +109,17 @@
         self.assertEqual(snapshot['size'], new_snapshot_info['size'])
         for key in ['volume_id', 'name', 'description', 'metadata']:
             self.assertEqual(snapshot_ref[key], new_snapshot_info[key])
+
+    @decorators.idempotent_id('0132f42d-0147-4b45-8501-cc504bbf7810')
+    def test_unmanage_manage_snapshot(self):
+        self._test_unmanage_manage_snapshot()
+
+    @decorators.idempotent_id('7c735385-e953-4198-8534-68137f72dbdc')
+    @utils.services('compute')
+    def test_snapshot_manage_with_attached_volume(self):
+        """Test manage a snapshot with an attached volume.
+
+           The case validates manage snapshot operation while
+           the parent volume is attached to an instance.
+        """
+        self._test_unmanage_manage_snapshot(attached_volume=True)
diff --git a/tempest/api/volume/admin/test_volumes_actions.py b/tempest/api/volume/admin/test_volumes_actions.py
index ecddfba..b6e9f32 100644
--- a/tempest/api/volume/admin/test_volumes_actions.py
+++ b/tempest/api/volume/admin/test_volumes_actions.py
@@ -83,7 +83,7 @@
         server_id = self.create_server()['id']
         volume_id = self.create_volume()['id']
 
-        # Attach volume
+        # Request Cinder to map and export the volume (it is not
+        # attached to the instance).
         self.volumes_client.attach_volume(
             volume_id,
             instance_uuid=server_id,
@@ -101,7 +101,9 @@
         waiters.wait_for_volume_resource_status(self.volumes_client,
                                                 volume_id, 'error')
 
-        # Force detach volume
+        # The force detach volume call works because the volume is not really
+        # connected to the instance (it is safe); otherwise it would be
+        # rejected for security reasons (bug #2004555).
         self.admin_volume_client.force_detach_volume(
             volume_id, connector=None,
             attachment_id=attachment['attachment_id'])
diff --git a/tempest/api/volume/base.py b/tempest/api/volume/base.py
index a31390a..ad8f573 100644
--- a/tempest/api/volume/base.py
+++ b/tempest/api/volume/base.py
@@ -42,6 +42,10 @@
         if not CONF.service_available.cinder:
             skip_msg = ("%s skipped as Cinder is not available" % cls.__name__)
             raise cls.skipException(skip_msg)
+        if cls.create_default_network and not CONF.service_available.neutron:
+            skip_msg = (
+                "%s skipped as Neutron is not available" % cls.__name__)
+            raise cls.skipException(skip_msg)
 
         api_version_utils.check_skip_with_microversion(
             cls.volume_min_microversion, cls.volume_max_microversion,
diff --git a/tempest/api/volume/test_volumes_backup.py b/tempest/api/volume/test_volumes_backup.py
index 85e4bb2..89ff497 100644
--- a/tempest/api/volume/test_volumes_backup.py
+++ b/tempest/api/volume/test_volumes_backup.py
@@ -116,7 +116,7 @@
         is "available" or "in-use".
         """
         # Create a server
-        volume = self.create_volume()
+        volume = self.create_volume(wait_until=False)
         self.addCleanup(self.delete_volume, self.volumes_client, volume['id'])
         validation_resources = self.get_test_validation_resources(
             self.os_primary)
@@ -124,6 +124,8 @@
                                     validation_resources=validation_resources,
                                     validatable=True)
         # Attach volume to instance
+        waiters.wait_for_volume_resource_status(self.volumes_client,
+                                                volume['id'], 'available')
         self.attach_volume(server['id'], volume['id'])
         # Create backup using force flag
         backup_name = data_utils.rand_name(
diff --git a/tempest/api/volume/test_volumes_extend.py b/tempest/api/volume/test_volumes_extend.py
index 9066979..c766db8 100644
--- a/tempest/api/volume/test_volumes_extend.py
+++ b/tempest/api/volume/test_volumes_extend.py
@@ -18,7 +18,6 @@
 import testtools
 
 from tempest.api.volume import base
-from tempest.common import utils
 from tempest.common import waiters
 from tempest import config
 from tempest.lib import decorators
@@ -46,6 +45,9 @@
     @decorators.idempotent_id('86be1cba-2640-11e5-9c82-635fb964c912')
     @testtools.skipUnless(CONF.volume_feature_enabled.snapshot,
                           "Cinder volume snapshots are disabled")
+    @testtools.skipUnless(
+        CONF.volume_feature_enabled.extend_volume_with_snapshot,
+        "Extending volume with snapshot is disabled.")
     def test_volume_extend_when_volume_has_snapshot(self):
         """Test extending a volume which has a snapshot"""
         volume = self.create_volume()
@@ -178,10 +180,16 @@
 
 class VolumesExtendAttachedTest(BaseVolumesExtendAttachedTest):
 
+    @classmethod
+    def skip_checks(cls):
+        super(VolumesExtendAttachedTest, cls).skip_checks()
+        if not CONF.service_available.nova:
+            skip_msg = ("%s skipped as Nova is not available" % cls.__name__)
+            raise cls.skipException(skip_msg)
+        if not CONF.volume_feature_enabled.extend_attached_volume:
+            raise cls.skipException("Attached volume extend is disabled.")
+
     @decorators.idempotent_id('301f5a30-1c6f-4ea0-be1a-91fd28d44354')
-    @testtools.skipUnless(CONF.volume_feature_enabled.extend_attached_volume,
-                          "Attached volume extend is disabled.")
-    @utils.services('compute')
     def test_extend_attached_volume(self):
         volume = self.create_volume()
         self._test_extend_attached_volume(volume)
diff --git a/tempest/common/compute.py b/tempest/common/compute.py
index 7fa8247..53d44f1 100644
--- a/tempest/common/compute.py
+++ b/tempest/common/compute.py
@@ -299,16 +299,15 @@
 
     if wait_until:
 
-        if wait_until == 'SSHABLE' and not (
-                validatable and validation_resources is not None):
-            raise RuntimeError('SSHABLE requires validatable=True and '
-                               'validation_resources to be passed')
-
         # NOTE(lyarwood): PINGABLE and SSHABLE both require the instance to
         # go ACTIVE initially before we can setup the fip(s) etc so stash
         # this additional wait state for later use.
         wait_until_extra = None
         if wait_until in ['PINGABLE', 'SSHABLE']:
+            if not validatable and validation_resources is None:
+                raise RuntimeError(
+                    'SSHABLE/PINGABLE requires validatable=True '
+                    'and validation_resources to be passed')
             wait_until_extra = wait_until
             wait_until = 'ACTIVE'
 
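The guard above makes the two-phase wait explicit: PINGABLE/SSHABLE first require the server to reach ACTIVE, and both need validation resources. A self-contained sketch of that state resolution, assuming nothing beyond the arguments shown:

def resolve_wait_states(wait_until, validatable, validation_resources):
    # PINGABLE/SSHABLE are handled after ACTIVE: stash the extra state,
    # wait for ACTIVE first, and reject calls lacking validation setup.
    wait_until_extra = None
    if wait_until in ('PINGABLE', 'SSHABLE'):
        if not validatable and validation_resources is None:
            raise RuntimeError('SSHABLE/PINGABLE requires validatable=True '
                               'and validation_resources to be passed')
        wait_until_extra = wait_until
        wait_until = 'ACTIVE'
    return wait_until, wait_until_extra
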
diff --git a/tempest/common/waiters.py b/tempest/common/waiters.py
index 59c66b5..291f201 100644
--- a/tempest/common/waiters.py
+++ b/tempest/common/waiters.py
@@ -77,7 +77,8 @@
             if 'fault' in body:
                 details += 'Fault: %s.' % body['fault']
             if request_id:
-                details += ' Server boot request ID: %s.' % request_id
+                details += ' Request ID of server operation performed before'
+                details += ' checking the server status: %s.' % request_id
             raise exceptions.BuildErrorException(details, server_id=server_id)
 
         timed_out = int(time.time()) - start_time >= timeout
@@ -92,7 +93,8 @@
                         'expected_task_state': expected_task_state,
                         'timeout': timeout})
             if request_id:
-                message += ' Server boot request ID: %s.' % request_id
+                message += ' Request ID of server operation performed before'
+                message += ' checking the server status: %s.' % request_id
             message += ' Current status: %s.' % server_status
             message += ' Current task state: %s.' % task_state
             caller = test_utils.find_test_caller()
diff --git a/tempest/config.py b/tempest/config.py
index 89161dc..a174fdd 100644
--- a/tempest/config.py
+++ b/tempest/config.py
@@ -1107,7 +1107,13 @@
                      'server instance? This depends on the 3.42 volume API '
                      'microversion and the 2.51 compute API microversion. '
                      'Also, not all volume or compute backends support this '
+                     'operation.'),
+    cfg.BoolOpt('extend_volume_with_snapshot',
+                default=True,
+                help='Does the cloud support extending the size of a volume '
+                     'which has a snapshot? Some drivers do not support this '
                      'operation.')
+
 ]
 
 
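The new option follows the usual oslo.config shape. A minimal, self-contained sketch of registering and reading a BoolOpt like it (group name simplified to a plain identifier; tempest wires this up through its own config machinery):

from oslo_config import cfg

CONF = cfg.CONF
group = cfg.OptGroup(name='volume_feature_enabled')  # simplified name
CONF.register_group(group)
CONF.register_opts(
    [cfg.BoolOpt('extend_volume_with_snapshot',
                 default=True,
                 help='Does the cloud support extending the size of a '
                      'volume which has a snapshot?')],
    group=group)

CONF(args=[])  # parse with defaults only
print(CONF.volume_feature_enabled.extend_volume_with_snapshot)  # True
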
diff --git a/tempest/lib/api_schema/response/volume/volumes.py b/tempest/lib/api_schema/response/volume/volumes.py
index 4f44526..900e5ef 100644
--- a/tempest/lib/api_schema/response/volume/volumes.py
+++ b/tempest/lib/api_schema/response/volume/volumes.py
@@ -295,6 +295,7 @@
 attach_volume = {'status_code': [202]}
 set_bootable_volume = {'status_code': [200]}
 detach_volume = {'status_code': [202]}
+terminate_connection = {'status_code': [202]}
 reserve_volume = {'status_code': [202]}
 unreserve_volume = {'status_code': [202]}
 extend_volume = {'status_code': [202]}
diff --git a/tempest/lib/services/volume/v3/attachments_client.py b/tempest/lib/services/volume/v3/attachments_client.py
index 5e448f7..303341e 100644
--- a/tempest/lib/services/volume/v3/attachments_client.py
+++ b/tempest/lib/services/volume/v3/attachments_client.py
@@ -26,3 +26,10 @@
         body = json.loads(body)
         self.expected_success(200, resp.status)
         return rest_client.ResponseBody(resp, body)
+
+    def delete_attachment(self, attachment_id):
+        """Delete volume attachment."""
+        url = "attachments/%s" % (attachment_id)
+        resp, body = self.delete(url)
+        self.expected_success(200, resp.status)
+        return rest_client.ResponseBody(resp, body)
diff --git a/tempest/lib/services/volume/v3/volumes_client.py b/tempest/lib/services/volume/v3/volumes_client.py
index ad8bd71..c6f8973 100644
--- a/tempest/lib/services/volume/v3/volumes_client.py
+++ b/tempest/lib/services/volume/v3/volumes_client.py
@@ -205,14 +205,23 @@
         self.validate_response(schema.set_bootable_volume, resp, body)
         return rest_client.ResponseBody(resp, body)
 
-    def detach_volume(self, volume_id):
+    def detach_volume(self, volume_id, **kwargs):
         """Detaches a volume from an instance."""
-        post_body = json.dumps({'os-detach': {}})
+        post_body = json.dumps({'os-detach': kwargs})
         url = 'volumes/%s/action' % (volume_id)
         resp, body = self.post(url, post_body)
         self.validate_response(schema.detach_volume, resp, body)
         return rest_client.ResponseBody(resp, body)
 
+    def terminate_connection(self, volume_id, connector):
+        """Detaches a volume from an instance using terminate_connection."""
+        post_body = json.dumps(
+            {'os-terminate_connection': {'connector': connector}})
+        url = 'volumes/%s/action' % (volume_id)
+        resp, body = self.post(url, post_body)
+        self.validate_response(schema.terminate_connection, resp, body)
+        return rest_client.ResponseBody(resp, body)
+
     def reserve_volume(self, volume_id):
         """Reserves a volume."""
         post_body = json.dumps({'os-reserve': {}})
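terminate_connection is one of Cinder's volume "action" calls: everything is a POST to volumes/{id}/action whose body names the action. A sketch of just the request construction, with an illustrative connector (the initiator value is hypothetical):

import json


def terminate_connection_request(volume_id, connector):
    # Volume actions POST to volumes/{id}/action; the body's single key
    # selects the action to perform.
    url = 'volumes/%s/action' % volume_id
    body = json.dumps({'os-terminate_connection': {'connector': connector}})
    return url, body


url, body = terminate_connection_request(
    'volume-uuid', {'initiator': 'iqn.1994-05.com.example:client'})
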
diff --git a/tempest/scenario/manager.py b/tempest/scenario/manager.py
index 20495ee..4d35bbb 100644
--- a/tempest/scenario/manager.py
+++ b/tempest/scenario/manager.py
@@ -1140,14 +1140,19 @@
                                             server=server,
                                             username=username)
 
+        # Default the directory in which to write the timestamp file to /tmp
+        # and only use the mount_path as the target directory if we mounted
+        # dev_name to mount_path.
+        target_dir = '/tmp'
         if dev_name is not None:
             ssh_client.make_fs(dev_name, fs=fs)
             ssh_client.exec_command('sudo mount /dev/%s %s' % (dev_name,
                                                                mount_path))
-        cmd_timestamp = 'sudo sh -c "date > %s/timestamp; sync"' % mount_path
+            target_dir = mount_path
+        cmd_timestamp = 'sudo sh -c "date > %s/timestamp; sync"' % target_dir
         ssh_client.exec_command(cmd_timestamp)
         timestamp = ssh_client.exec_command('sudo cat %s/timestamp'
-                                            % mount_path)
+                                            % target_dir)
         if dev_name is not None:
             ssh_client.exec_command('sudo umount %s' % mount_path)
         return timestamp
@@ -1172,10 +1177,15 @@
                                             server=server,
                                             username=username)
 
+        # Default the directory from which to read the timestamp file to /tmp
+        # and only use the mount_path as the target directory if we mounted
+        # dev_name to mount_path.
+        target_dir = '/tmp'
         if dev_name is not None:
             ssh_client.mount(dev_name, mount_path)
+            target_dir = mount_path
         timestamp = ssh_client.exec_command('sudo cat %s/timestamp'
-                                            % mount_path)
+                                            % target_dir)
         if dev_name is not None:
             ssh_client.exec_command('sudo umount %s' % mount_path)
         return timestamp
@@ -1659,7 +1669,8 @@
 
     def create_encrypted_volume(self, encryption_provider, volume_type,
                                 key_size=256, cipher='aes-xts-plain64',
-                                control_location='front-end'):
+                                control_location='front-end',
+                                wait_until='available'):
         """Creates an encrypted volume"""
         volume_type = self.create_volume_type(name=volume_type)
         self.create_encryption_type(type_id=volume_type['id'],
@@ -1667,7 +1678,8 @@
                                     key_size=key_size,
                                     cipher=cipher,
                                     control_location=control_location)
-        return self.create_volume(volume_type=volume_type['name'])
+        return self.create_volume(volume_type=volume_type['name'],
+                                  wait_until=wait_until)
 
 
 class ObjectStorageScenarioTest(ScenarioTest):
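The timestamp fix above boils down to one decision: only use mount_path when a device was actually mounted there. The same choice as a tiny pure function:

def timestamp_dir(dev_name, mount_path):
    # Write/read the timestamp under mount_path only if a device was
    # mounted there; otherwise fall back to /tmp.
    return mount_path if dev_name is not None else '/tmp'


assert timestamp_dir(None, '/mnt') == '/tmp'
assert timestamp_dir('vdb', '/mnt') == '/mnt'
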
diff --git a/tempest/scenario/test_encrypted_cinder_volumes.py b/tempest/scenario/test_encrypted_cinder_volumes.py
index 60abc02..753e64f 100644
--- a/tempest/scenario/test_encrypted_cinder_volumes.py
+++ b/tempest/scenario/test_encrypted_cinder_volumes.py
@@ -16,6 +16,7 @@
 import testtools
 
 from tempest.common import utils
+from tempest.common import waiters
 from tempest import config
 from tempest.lib import decorators
 from tempest.scenario import manager
@@ -56,9 +57,16 @@
     @utils.services('compute', 'volume', 'image')
     def test_encrypted_cinder_volumes_luks(self):
         """LUKs v1 decrypts volume through libvirt."""
-        server = self.launch_instance()
         volume = self.create_encrypted_volume('luks',
-                                              volume_type='luks')
+                                              volume_type='luks',
+                                              wait_until=None)
+        server = self.launch_instance()
+        waiters.wait_for_volume_resource_status(self.volumes_client,
+                                                volume['id'], 'available')
+        # The volume body returned on creation can be stale. Re-fetching
+        # after it becomes available ensures up-to-date details.
+        volume = self.volumes_client.show_volume(volume['id'])['volume']
+
         self.attach_detach_volume(server, volume)
 
     @decorators.idempotent_id('7abec0a3-61a0-42a5-9e36-ad3138fb38b4')
@@ -68,16 +76,30 @@
     @utils.services('compute', 'volume', 'image')
     def test_encrypted_cinder_volumes_luksv2(self):
         """LUKs v2 decrypts volume through os-brick."""
-        server = self.launch_instance()
         volume = self.create_encrypted_volume('luks2',
-                                              volume_type='luksv2')
+                                              volume_type='luksv2',
+                                              wait_until=None)
+        server = self.launch_instance()
+        waiters.wait_for_volume_resource_status(self.volumes_client,
+                                                volume['id'], 'available')
+        # The volume body returned on creation can be stale. Re-fetching
+        # after it becomes available ensures up-to-date details.
+        volume = self.volumes_client.show_volume(volume['id'])['volume']
+
         self.attach_detach_volume(server, volume)
 
     @decorators.idempotent_id('cbc752ed-b716-4717-910f-956cce965722')
     @decorators.attr(type='slow')
     @utils.services('compute', 'volume', 'image')
     def test_encrypted_cinder_volumes_cryptsetup(self):
-        server = self.launch_instance()
         volume = self.create_encrypted_volume('plain',
-                                              volume_type='cryptsetup')
+                                              volume_type='cryptsetup',
+                                              wait_until=None)
+        server = self.launch_instance()
+        waiters.wait_for_volume_resource_status(self.volumes_client,
+                                                volume['id'], 'available')
+        # The volume body returned on creation can be stale. Re-fetching
+        # after it becomes available ensures up-to-date details.
+        volume = self.volumes_client.show_volume(volume['id'])['volume']
+
         self.attach_detach_volume(server, volume)
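These scenario tests now share a create/wait/refresh idiom: create the volume without waiting, boot the server in parallel, then wait and re-fetch the volume body. A hedged sketch of that idiom as a helper, assuming tempest-style clients are passed in:

def create_available_volume(volumes_client, waiters, **create_kwargs):
    """Create a volume, wait for 'available', return fresh details.

    A sketch of the pattern used above; volumes_client and waiters are
    assumed to be tempest-style objects supplied by the caller.
    """
    volume = volumes_client.create_volume(**create_kwargs)['volume']
    waiters.wait_for_volume_resource_status(volumes_client, volume['id'],
                                            'available')
    # The body returned by create can be stale; re-fetch for current
    # status and attachment details.
    return volumes_client.show_volume(volume['id'])['volume']
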
diff --git a/tempest/scenario/test_network_advanced_server_ops.py b/tempest/scenario/test_network_advanced_server_ops.py
index e6c6eb6..2c7c085 100644
--- a/tempest/scenario/test_network_advanced_server_ops.py
+++ b/tempest/scenario/test_network_advanced_server_ops.py
@@ -270,6 +270,11 @@
         new_host = self.get_host_for_server(server['id'])
         self.assertNotEqual(old_host, new_host, 'Server did not migrate')
 
+        # First wait until the VM replies to pings again, then check the
+        # measured network downtime.
+        self._wait_server_status_and_check_network_connectivity(
+            server, keypair, floating_ip)
+
         downtime = downtime_meter.get_downtime()
         self.assertIsNotNone(downtime)
         LOG.debug("Downtime seconds measured with downtime_meter = %r",
@@ -280,9 +285,6 @@
             "Downtime of {} seconds is higher than expected '{}'".format(
                 downtime, allowed_downtime))
 
-        self._wait_server_status_and_check_network_connectivity(
-            server, keypair, floating_ip)
-
     @decorators.idempotent_id('25b188d7-0183-4b1e-a11d-15840c8e2fd6')
     @testtools.skipUnless(CONF.compute_feature_enabled.cold_migration,
                           'Cold migration is not available.')
diff --git a/tempest/scenario/test_network_basic_ops.py b/tempest/scenario/test_network_basic_ops.py
index cbe4122..7b819e0 100644
--- a/tempest/scenario/test_network_basic_ops.py
+++ b/tempest/scenario/test_network_basic_ops.py
@@ -898,10 +898,13 @@
                                        nic=spoof_nic, should_succeed=True)
         # Set a mac address by making nic down temporary
         spoof_ip_addresses = ssh_client.get_nic_ip_addresses(spoof_nic)
-        cmd = ("sudo ip link set {nic} down;"
+        dhcp_cmd = ("sudo start-stop-daemon -K -x /sbin/dhcpcd -p "
+                    "/var/run/dhcpcd/pid -o || true")
+        cmd = ("{dhcp_cmd}; sudo ip link set {nic} down;"
                "sudo ip link set dev {nic} address {mac};"
                "sudo ip link set {nic} up;"
                "sudo ip address flush dev {nic};").format(nic=spoof_nic,
+                                                          dhcp_cmd=dhcp_cmd,
                                                           mac=spoof_mac)
         for ip_address in spoof_ip_addresses:
             cmd += (
diff --git a/tempest/scenario/test_server_advanced_ops.py b/tempest/scenario/test_server_advanced_ops.py
index 990b325..1c2246d 100644
--- a/tempest/scenario/test_server_advanced_ops.py
+++ b/tempest/scenario/test_server_advanced_ops.py
@@ -14,7 +14,6 @@
 #    under the License.
 
 from oslo_log import log as logging
-import testtools
 
 from tempest.common import utils
 from tempest.common import waiters
@@ -36,14 +35,21 @@
     """
 
     @classmethod
+    def skip_checks(cls):
+        super(TestServerAdvancedOps, cls).skip_checks()
+        if not CONF.service_available.nova:
+            skip_msg = ("%s skipped as Nova is not available" % cls.__name__)
+            raise cls.skipException(skip_msg)
+        if not CONF.compute_feature_enabled.suspend:
+            raise cls.skipException("Suspend is not available.")
+
+    @classmethod
     def setup_credentials(cls):
         cls.set_network_resources(network=True, subnet=True)
         super(TestServerAdvancedOps, cls).setup_credentials()
 
     @decorators.attr(type='slow')
     @decorators.idempotent_id('949da7d5-72c8-4808-8802-e3d70df98e2c')
-    @testtools.skipUnless(CONF.compute_feature_enabled.suspend,
-                          'Suspend is not available.')
     @utils.services('compute')
     def test_server_sequence_suspend_resume(self):
         # We create an instance for use in this test
diff --git a/tempest/scenario/test_server_basic_ops.py b/tempest/scenario/test_server_basic_ops.py
index 5e10ebf..3830fbc 100644
--- a/tempest/scenario/test_server_basic_ops.py
+++ b/tempest/scenario/test_server_basic_ops.py
@@ -49,16 +49,8 @@
 
     def verify_ssh(self, keypair):
         if self.run_ssh:
-            # Obtain a floating IP if floating_ips is enabled
-            if (CONF.network_feature_enabled.floating_ips and
-                CONF.network.floating_network_name):
-                fip = self.create_floating_ip(self.instance)
-                self.ip = self.associate_floating_ip(
-                    fip, self.instance)['floating_ip_address']
-            else:
-                server = self.servers_client.show_server(
-                    self.instance['id'])['server']
-                self.ip = self.get_server_ip(server)
+            # Obtain server IP
+            self.ip = self.get_server_ip(self.instance)
             # Check ssh
             self.ssh_client = self.get_remote_client(
                 ip_address=self.ip,
diff --git a/tempest/scenario/test_server_multinode.py b/tempest/scenario/test_server_multinode.py
index 9285da2..fe85234 100644
--- a/tempest/scenario/test_server_multinode.py
+++ b/tempest/scenario/test_server_multinode.py
@@ -14,6 +14,7 @@
 #    under the License.
 
 from tempest.common import utils
+from tempest.common import waiters
 from tempest import config
 from tempest.lib import decorators
 from tempest.lib import exceptions
@@ -46,7 +47,8 @@
             if zone['zoneState']['available']:
                 for host in zone['hosts']:
                     if 'nova-compute' in zone['hosts'][host] and \
-                        zone['hosts'][host]['nova-compute']['available']:
+                        zone['hosts'][host]['nova-compute']['available'] and \
+                        not host.endswith('-ironic'):
                         hosts.append({'zone': zone['zoneName'],
                                       'host_name': host})
 
@@ -60,6 +62,7 @@
         # threshold (so that things don't get crazy if you have 1000
         # compute nodes but set min to 3).
         servers = []
+        host_server_ids = {}
 
         for host in hosts[:CONF.compute.min_compute_nodes]:
             # by getting to active state here, this means this has
@@ -67,12 +70,18 @@
             # in order to use the availability_zone:host scheduler hint,
             # admin client is need here.
             inst = self.create_server(
+                wait_until=None,
                 clients=self.os_admin,
                 availability_zone='%(zone)s:%(host_name)s' % host)
+            host_server_ids[host['host_name']] = inst['id']
+
+        for host_name, server_id in host_server_ids.items():
+            waiters.wait_for_server_status(self.os_admin.servers_client,
+                                           server_id, 'ACTIVE')
             server = self.os_admin.servers_client.show_server(
-                inst['id'])['server']
+                server_id)['server']
             # ensure server is located on the requested host
-            self.assertEqual(host['host_name'], server['OS-EXT-SRV-ATTR:host'])
+            self.assertEqual(host_name, server['OS-EXT-SRV-ATTR:host'])
             servers.append(server)
 
         # make sure we really have the number of servers we think we should
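The multinode change switches from boot-and-wait per host to boot-everything-then-wait, so the builds overlap. A stripped-down sketch of the fan-out/fan-in shape (create and wait_active are assumed callables):

def boot_on_hosts(create, wait_active, hosts):
    # Fan out: start every boot first so the builds run concurrently.
    server_ids = {host: create(host) for host in hosts}
    # Fan in: only then wait for each server to reach ACTIVE.
    for host, server_id in server_ids.items():
        wait_active(server_id)
    return server_ids
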
diff --git a/tempest/scenario/test_server_volume_attachment.py b/tempest/scenario/test_server_volume_attachment.py
new file mode 100644
index 0000000..1d0d0d0
--- /dev/null
+++ b/tempest/scenario/test_server_volume_attachment.py
@@ -0,0 +1,201 @@
+# Copyright 2023 Red Hat
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+from unittest import mock
+
+from tempest.common import utils
+from tempest.common import waiters
+from tempest import config
+from tempest.lib import decorators
+from tempest.lib import exceptions
+from tempest.scenario import manager
+
+CONF = config.CONF
+
+
+class BaseAttachmentTest(manager.ScenarioTest):
+    @classmethod
+    def setup_clients(cls):
+        super().setup_clients()
+        cls.attachments_client = cls.os_primary.attachments_client_latest
+        cls.admin_volume_client = cls.os_admin.volumes_client_latest
+
+    def _call_with_fake_service_token(self, valid_token,
+                                      client, method_name, *args, **kwargs):
+        """Call client method with non-service service token
+
+        Add a service token header that can be a valid normal user token (which
+        won't have the service role) or an invalid token altogether.
+        """
+        original_raw_request = client.raw_request
+
+        def raw_request(url, method, headers=None, body=None, chunked=False,
+                        log_req_body=None):
+            token = headers['X-Auth-Token']
+            if not valid_token:
+                token = token[:-1] + ('a' if token[-1] != 'a' else 'b')
+            headers['X-Service-Token'] = token
+            return original_raw_request(url, method, headers=headers,
+                                        body=body, chunked=chunked,
+                                        log_req_body=log_req_body)
+
+        client_method = getattr(client, method_name)
+        with mock.patch.object(client, 'raw_request', raw_request):
+            return client_method(*args, **kwargs)
+
+
+class TestServerVolumeAttachmentScenario(BaseAttachmentTest):
+
+    """Test server attachment behaviors
+
+    This tests that volume attachments to servers may not be removed
+    directly and that detaching is only allowed through the compute
+    service (bug #2004555).
+    """
+
+    @decorators.attr(type='slow')
+    @decorators.idempotent_id('be615530-f105-437a-8afe-ce998c9535d9')
+    @utils.services('compute', 'volume', 'image', 'network')
+    def test_server_detach_rules(self):
+        """Test that various methods of detaching a volume honors the rules"""
+        volume = self.create_volume(wait_until=None)
+        volume2 = self.create_volume(wait_until=None)
+
+        server = self.create_server(wait_until='SSHABLE')
+        servers = self.servers_client.list_servers()['servers']
+        self.assertIn(server['id'], [x['id'] for x in servers])
+
+        waiters.wait_for_volume_resource_status(self.volumes_client,
+                                                volume['id'], 'available')
+        # The volume body returned on creation can be stale. Re-fetching
+        # after it becomes available ensures up-to-date details.
+        volume = self.volumes_client.show_volume(volume['id'])['volume']
+
+        volume = self.nova_volume_attach(server, volume)
+        self.addCleanup(self.nova_volume_detach, server, volume)
+        att_id = volume['attachments'][0]['attachment_id']
+
+        # Test user call to detach volume is rejected
+        self.assertRaises((exceptions.Forbidden, exceptions.Conflict),
+                          self.volumes_client.detach_volume, volume['id'])
+
+        # Test user call to terminate connection is rejected
+        self.assertRaises((exceptions.Forbidden, exceptions.Conflict),
+                          self.volumes_client.terminate_connection,
+                          volume['id'], connector={})
+
+        # Test faking of service token on call to detach, force detach,
+        # terminate_connection
+        for valid_token in (True, False):
+            valid_exceptions = [exceptions.Forbidden, exceptions.Conflict]
+            if not valid_token:
+                valid_exceptions.append(exceptions.Unauthorized)
+            self.assertRaises(
+                tuple(valid_exceptions),
+                self._call_with_fake_service_token,
+                valid_token,
+                self.volumes_client,
+                'detach_volume',
+                volume['id'])
+            self.assertRaises(
+                tuple(valid_exceptions),
+                self._call_with_fake_service_token,
+                valid_token,
+                self.volumes_client,
+                'terminate_connection',
+                volume['id'], connector={})
+
+        # Reset volume's status to error
+        self.admin_volume_client.reset_volume_status(volume['id'],
+                                                     status='error')
+        waiters.wait_for_volume_resource_status(self.volumes_client,
+                                                volume['id'], 'error')
+
+        # For the cleanup, we need to reset the volume status to in-use before
+        # the other cleanup steps try to detach it.
+        self.addCleanup(waiters.wait_for_volume_resource_status,
+                        self.volumes_client, volume['id'], 'in-use')
+        self.addCleanup(self.admin_volume_client.reset_volume_status,
+                        volume['id'], status='in-use')
+
+        # Test user call to force detach volume is rejected
+        self.assertRaises(
+            (exceptions.Forbidden, exceptions.Conflict),
+            self.admin_volume_client.force_detach_volume,
+            volume['id'], connector=None,
+            attachment_id=att_id)
+
+        # Test trying to override detach with force and service token
+        for valid_token in (True, False):
+            valid_exceptions = [exceptions.Forbidden, exceptions.Conflict]
+            if not valid_token:
+                valid_exceptions.append(exceptions.Unauthorized)
+            self.assertRaises(
+                tuple(valid_exceptions),
+                self._call_with_fake_service_token,
+                valid_token,
+                self.admin_volume_client,
+                'force_detach_volume',
+                volume['id'], connector=None, attachment_id=att_id)
+
+        # Test user call to detach with mismatch is rejected
+        waiters.wait_for_volume_resource_status(self.volumes_client,
+                                                volume2['id'], 'available')
+        # The volume body returned on creation can be stale. Re-fetching
+        # after it becomes available ensures up-to-date details.
+        volume2 = self.volumes_client.show_volume(volume2['id'])['volume']
+
+        volume2 = self.nova_volume_attach(server, volume2)
+        att_id2 = volume2['attachments'][0]['attachment_id']
+        self.assertRaises(
+            (exceptions.Forbidden, exceptions.BadRequest),
+            self.volumes_client.detach_volume,
+            volume['id'], attachment_id=att_id2)
+
+
+class TestServerVolumeAttachScenarioOldVersion(BaseAttachmentTest):
+    volume_min_microversion = '3.27'
+    volume_max_microversion = 'latest'
+
+    @decorators.attr(type='slow')
+    @decorators.idempotent_id('6f4d2144-99f4-495c-8b0b-c6a537971418')
+    @utils.services('compute', 'volume', 'image', 'network')
+    def test_old_versions_reject(self):
+        server = self.create_server(wait_until='SSHABLE')
+        servers = self.servers_client.list_servers()['servers']
+        self.assertIn(server['id'], [x['id'] for x in servers])
+
+        volume = self.create_volume()
+
+        volume = self.nova_volume_attach(server, volume)
+        self.addCleanup(self.nova_volume_detach, server, volume)
+        att_id = volume['attachments'][0]['attachment_id']
+
+        for valid_token in (True, False):
+            valid_exceptions = [exceptions.Forbidden,
+                                exceptions.Conflict]
+            if not valid_token:
+                valid_exceptions.append(exceptions.Unauthorized)
+            self.assertRaises(
+                tuple(valid_exceptions),
+                self._call_with_fake_service_token,
+                valid_token,
+                self.attachments_client,
+                'delete_attachment',
+                att_id)
+
+        self.assertRaises(
+            (exceptions.Forbidden, exceptions.Conflict),
+            self.attachments_client.delete_attachment,
+            att_id)
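The service-token trick in BaseAttachmentTest works by wrapping the client's raw_request and injecting an X-Service-Token header. A self-contained sketch of the same mock-based header injection against a dummy client:

from unittest import mock


class DummyClient:
    """Minimal stand-in for a tempest REST client (assumption)."""

    def raw_request(self, url, method, headers=None, body=None):
        return headers


def with_fake_service_token(client, valid_token, url):
    original = client.raw_request

    def raw_request(url, method, headers=None, body=None):
        headers = dict(headers or {})
        token = headers.get('X-Auth-Token', 'user-token')
        if not valid_token:
            # Corrupt the last character to simulate an invalid token.
            token = token[:-1] + ('a' if token[-1] != 'a' else 'b')
        headers['X-Service-Token'] = token
        return original(url, method, headers=headers, body=body)

    with mock.patch.object(client, 'raw_request', raw_request):
        return client.raw_request(url, 'GET',
                                  headers={'X-Auth-Token': 'user-token'})


print(with_fake_service_token(DummyClient(), valid_token=False, url='/v3'))
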
diff --git a/tempest/scenario/test_stamp_pattern.py b/tempest/scenario/test_stamp_pattern.py
index 4b81b9e..82f0341 100644
--- a/tempest/scenario/test_stamp_pattern.py
+++ b/tempest/scenario/test_stamp_pattern.py
@@ -16,6 +16,7 @@
 import testtools
 
 from tempest.common import utils
+from tempest.common import waiters
 from tempest import config
 from tempest.lib.common.utils import test_utils
 from tempest.lib import decorators
@@ -84,7 +85,7 @@
         security_group = self.create_security_group()
 
         # boot an instance and create a timestamp file in it
-        volume = self.create_volume()
+        volume = self.create_volume(wait_until=None)
         server = self.create_server(
             key_name=keypair['name'],
             security_groups=[{'name': security_group['name']}])
@@ -97,6 +98,12 @@
             ip_for_server, private_key=keypair['private_key'],
             server=server)
         disks_list_before_attach = linux_client.list_disks()
+        waiters.wait_for_volume_resource_status(self.volumes_client,
+                                                volume['id'], 'available')
+        # The volume body returned on creation can be stale. Re-fetching
+        # after it becomes available ensures up-to-date details.
+        volume = self.volumes_client.show_volume(volume['id'])['volume']
+
         self.nova_volume_attach(server, volume)
         volume_device_name = self._attached_volume_name(
             disks_list_before_attach, ip_for_server, keypair['private_key'])
@@ -115,7 +122,7 @@
 
         # create second volume from the snapshot(volume2)
         volume_from_snapshot = self.create_volume(
-            snapshot_id=volume_snapshot['id'])
+            snapshot_id=volume_snapshot['id'], wait_until=None)
 
         # boot second instance from the snapshot(instance2)
         server_from_snapshot = self.create_server(
@@ -135,6 +142,14 @@
         disks_list_before_attach = linux_client.list_disks()
 
         # attach volume2 to instance2
+        waiters.wait_for_volume_resource_status(self.volumes_client,
+                                                volume_from_snapshot['id'],
+                                                'available')
+        # The volume returned on creation may carry a stale status;
+        # re-fetch it once it is 'available' to get up-to-date details.
+        volume_from_snapshot = self.volumes_client.show_volume(
+            volume_from_snapshot['id'])['volume']
+
         self.nova_volume_attach(server_from_snapshot, volume_from_snapshot)
         volume_device_name = self._attached_volume_name(
             disks_list_before_attach, ip_for_snapshot, keypair['private_key'])
diff --git a/zuul.d/base.yaml b/zuul.d/base.yaml
index 2d978c0..0ac893a 100644
--- a/zuul.d/base.yaml
+++ b/zuul.d/base.yaml
@@ -13,6 +13,10 @@
     roles: &base_roles
       - zuul: opendev.org/openstack/devstack
     vars: &base_vars
+      devstack_localrc:
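+        # Pre-seed two cirros versions so jobs have more than one image
+        # available out of the box (presumably for image API tests that
+        # need multiple images).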
+        IMAGE_URLS: http://download.cirros-cloud.net/0.6.2/cirros-0.6.2-x86_64-disk.img, http://download.cirros-cloud.net/0.6.1/cirros-0.6.1-x86_64-disk.img
       devstack_services:
         tempest: true
       devstack_local_conf:
diff --git a/zuul.d/integrated-gate.yaml b/zuul.d/integrated-gate.yaml
index 233cb6c..8ac0b42 100644
--- a/zuul.d/integrated-gate.yaml
+++ b/zuul.d/integrated-gate.yaml
@@ -100,15 +100,6 @@
         # Enable horizon so that we can run horizon tests.
         horizon: true
 
-# TODO(gmann): As per the 2023.1 testing runtime, we need to run at least
-# one job on Focal. This job can be removed as per the future testing
-# runtime (whenever we drop the Ubuntu Focal testing).
-- job:
-    name: tempest-full-ubuntu-focal
-    description: This is tempest-full python3 job on Ubuntu Focal(20.04)
-    parent: tempest-full-py3
-    nodeset: openstack-single-node-focal
-
 - job:
     name: tempest-full-centos-9-stream
     parent: tempest-full-py3
@@ -285,7 +276,7 @@
     timeout: 10800
     # This job runs on stable/stein onwards.
     branches: ^(?!stable/(ocata|pike|queens|rocky)).*$
-    vars: &tempest_slow_vars
+    vars:
       tox_envlist: slow-serial
       devstack_localrc:
         CINDER_ENABLED_BACKENDS: lvm:lvmdriver-1,lvm:lvmdriver-2
@@ -295,7 +286,6 @@
       devstack_services:
         neutron-placement: true
         neutron-qos: true
-      tempest_concurrency: 2
     group-vars:
       # NOTE(mriedem): The ENABLE_VOLUME_MULTIATTACH variable is used on both
       # the controller and subnode prior to Rocky so we have to make sure the
@@ -310,17 +300,27 @@
     # This job version is with swift enabled on py3
     # as swift is ready on py3 from stable/ussuri onwards.
     timeout: 10800
-    branches: ^(?!stable/(ocata|pike|queens|rocky|stein|train)).*$
-    vars: *tempest_slow_vars
-
-- job:
-    name: tempest-slow-parallel
-    parent: tempest-slow-py3
-    # This job run slow tests in parallel.
+    # The 'slow' tox env is not available in old tempest (used up to
+    # stable/wallaby), so this job definition applies only from stable/xena
+    # onwards; a separate job definition covers stable/wallaby and earlier.
+    branches: ^(?!stable/(ocata|pike|queens|rocky|stein|train|ussuri|victoria|wallaby)).*$
     vars:
       tox_envlist: slow
       devstack_localrc:
-        MYSQL_REDUCE_MEMORY: true
+        CINDER_ENABLED_BACKENDS: lvm:lvmdriver-1,lvm:lvmdriver-2
+        ENABLE_VOLUME_MULTIATTACH: true
+      devstack_plugins:
+        neutron: https://opendev.org/openstack/neutron
+      devstack_services:
+        neutron-placement: true
+        neutron-qos: true
+    group-vars:
+      # NOTE(mriedem): The ENABLE_VOLUME_MULTIATTACH variable is used on both
+      # the controller and subnode prior to Rocky so we have to make sure the
+      # variable is set in both locations.
+      subnode:
+        devstack_localrc:
+          ENABLE_VOLUME_MULTIATTACH: true
 
 - job:
     name: tempest-cinder-v2-api
@@ -393,12 +393,25 @@
         # Keystone policies are changed to work for both system as well as
         # project scoped tokens, so we need to keep the scope check
         # disabled for keystone.
-        NOVA_ENFORCE_SCOPE: true
+        # Nova and Glance have enabled the new defaults and scope by default
+        # in devstack.
         CINDER_ENFORCE_SCOPE: true
-        GLANCE_ENFORCE_SCOPE: true
         NEUTRON_ENFORCE_SCOPE: true
         PLACEMENT_ENFORCE_SCOPE: true
 
+- job:
+    name: tempest-all-rbac-old-defaults
+    parent: tempest-all
+    description: |
+      Integration test that runs all tests with RBAC old defaults.
+    vars:
+      devstack_localrc:
+        # NOTE(gmann): Nova and Glance have enabled the new defaults and scope
+        # by default in devstack, so we need some jobs to keep testing the old
+        # defaults until they are removed from the service side.
+        NOVA_ENFORCE_SCOPE: false
+        GLANCE_ENFORCE_SCOPE: false
+
 - project-template:
     name: integrated-gate-networking
     description: |
diff --git a/zuul.d/project.yaml b/zuul.d/project.yaml
index be8442a..9787526 100644
--- a/zuul.d/project.yaml
+++ b/zuul.d/project.yaml
@@ -28,8 +28,6 @@
               - ^.mailmap$
         - tempest-extra-tests:
             irrelevant-files: *tempest-irrelevant-files
-        - tempest-full-ubuntu-focal:
-            irrelevant-files: *tempest-irrelevant-files
         - glance-multistore-cinder-import:
             voting: false
             irrelevant-files: *tempest-irrelevant-files
@@ -40,7 +38,7 @@
         # those in respective stable branch gate.
         - tempest-full-2023-1:
             irrelevant-files: *tempest-irrelevant-files
-        - tempest-full-xena:
+        - tempest-full-yoga:
             irrelevant-files: *tempest-irrelevant-files
         - tempest-multinode-full-py3:
             irrelevant-files: *tempest-irrelevant-files
@@ -120,6 +118,8 @@
         - tempest-full-test-account-py3:
             voting: false
             irrelevant-files: *tempest-irrelevant-files
+        - ironic-tempest-bios-ipmi-direct-tinyipa:
+            irrelevant-files: *tempest-irrelevant-files
         - openstack-tox-bashate:
             irrelevant-files: *tempest-irrelevant-files-2
     gate:
@@ -130,8 +130,6 @@
         - openstack-tox-py310
         - tempest-slow-py3:
             irrelevant-files: *tempest-irrelevant-files
-        - tempest-full-ubuntu-focal:
-            irrelevant-files: *tempest-irrelevant-files
         - neutron-ovs-grenade-multinode:
             irrelevant-files: *tempest-irrelevant-files
         - tempest-full-py3:
@@ -150,6 +148,8 @@
         #    irrelevant-files: *tempest-irrelevant-files
         - nova-live-migration:
             irrelevant-files: *tempest-irrelevant-files
+        - ironic-tempest-bios-ipmi-direct-tinyipa:
+            irrelevant-files: *tempest-irrelevant-files
     experimental:
       jobs:
         - nova-multi-cell
@@ -161,11 +161,10 @@
             irrelevant-files: *tempest-irrelevant-files
         - tempest-all:
             irrelevant-files: *tempest-irrelevant-files
-        - tempest-slow-parallel
+        - tempest-all-rbac-old-defaults
         - tempest-full-parallel
         - tempest-full-zed-extra-tests
         - tempest-full-yoga-extra-tests
-        - tempest-full-xena-extra-tests
         - tempest-full-enforce-scope-new-defaults-zed
         - neutron-ovs-tempest-dvr-ha-multinode-full:
             irrelevant-files: *tempest-irrelevant-files
@@ -188,19 +187,16 @@
         - tempest-full-2023-1
         - tempest-full-zed
         - tempest-full-yoga
-        - tempest-full-xena
         - tempest-slow-2023-1
         - tempest-slow-zed
         - tempest-slow-yoga
-        - tempest-slow-xena
         - tempest-full-2023-1-extra-tests
         - tempest-full-zed-extra-tests
         - tempest-full-yoga-extra-tests
-        - tempest-full-xena-extra-tests
     periodic:
       jobs:
         - tempest-all
-        - tempest-slow-parallel
+        - tempest-all-rbac-old-defaults
         - tempest-full-parallel
         - tempest-full-oslo-master
         - tempest-stestr-master
diff --git a/zuul.d/stable-jobs.yaml b/zuul.d/stable-jobs.yaml
index c5fc063..d399556 100644
--- a/zuul.d/stable-jobs.yaml
+++ b/zuul.d/stable-jobs.yaml
@@ -18,12 +18,6 @@
     override-checkout: stable/yoga
 
 - job:
-    name: tempest-full-xena
-    parent: tempest-full-py3
-    nodeset: openstack-single-node-focal
-    override-checkout: stable/xena
-
-- job:
     name: tempest-full-2023-1-extra-tests
     parent: tempest-extra-tests
     nodeset: openstack-single-node-jammy
@@ -42,12 +36,6 @@
     override-checkout: stable/yoga
 
 - job:
-    name: tempest-full-xena-extra-tests
-    parent: tempest-extra-tests
-    nodeset: openstack-single-node-focal
-    override-checkout: stable/xena
-
-- job:
     name: tempest-slow-2023-1
     parent: tempest-slow-py3
     nodeset: openstack-two-node-jammy
@@ -72,12 +60,6 @@
     override-checkout: stable/yoga
 
 - job:
-    name: tempest-slow-xena
-    parent: tempest-slow-py3
-    nodeset: openstack-two-node-focal
-    override-checkout: stable/xena
-
-- job:
     name: tempest-full-py3
     parent: devstack-tempest
     # This job version is to use the 'full' tox env which
@@ -294,7 +276,6 @@
       devstack_services:
         neutron-placement: true
         neutron-qos: true
-      tempest_concurrency: 2
     group-vars:
       # NOTE(mriedem): The ENABLE_VOLUME_MULTIATTACH variable is used on both
       # the controller and subnode prior to Rocky so we have to make sure the
@@ -330,6 +311,18 @@
           USE_PYTHON3: true
 
 - job:
+    name: tempest-slow-py3
+    parent: tempest-slow
+    # This job version uses the 'slow-serial' tox env for the
+    # stable/ussuri through stable/wallaby branches.
+    branches:
+      - stable/ussuri
+      - stable/victoria
+      - stable/wallaby
+    vars:
+      tox_envlist: slow-serial
+
+- job:
     name: tempest-full-py3-opensuse15
     parent: tempest-full-py3
     nodeset: devstack-single-node-opensuse-15
diff --git a/zuul.d/tempest-specific.yaml b/zuul.d/tempest-specific.yaml
index a8c29af..ca63fcc 100644
--- a/zuul.d/tempest-specific.yaml
+++ b/zuul.d/tempest-specific.yaml
@@ -93,7 +93,13 @@
     vars:
       devstack_localrc:
         TEMPEST_USE_TEST_ACCOUNTS: True
-
+        # FIXME(gmann): Nova and Glance have enabled the new defaults and scope
+        # by default in devstack, and the pre-provisioned account code and
+        # testing need to move to the new RBAC design testing. Until we do
+        # that, let's run these jobs with the old defaults.
+        NOVA_ENFORCE_SCOPE: false
+        GLANCE_ENFORCE_SCOPE: false
+
 - job:
     name: tempest-full-test-account-no-admin-py3
     parent: tempest-full-test-account-py3