diff --git a/releasenotes/notes/remove-xenapi_apis-86720c0c399460ab.yaml b/releasenotes/notes/remove-xenapi_apis-86720c0c399460ab.yaml
new file mode 100644
index 0000000..26da18c
--- /dev/null
+++ b/releasenotes/notes/remove-xenapi_apis-86720c0c399460ab.yaml
@@ -0,0 +1,5 @@
+---
+upgrade:
+  - |
+    The deprecated ``[compute-feature-enabled] xenapi_apis`` option has been
+    removed.
diff --git a/releasenotes/notes/resource-list-cbf9779e8b434654.yaml b/releasenotes/notes/resource-list-cbf9779e8b434654.yaml
new file mode 100644
index 0000000..bbd2f16
--- /dev/null
+++ b/releasenotes/notes/resource-list-cbf9779e8b434654.yaml
@@ -0,0 +1,11 @@
+---
+features:
+  - |
+    A new interface ``--resource-list`` has been introduced in the
+    ``tempest cleanup`` command to remove the resources created by
+    Tempest. A new config option in the default section, ``record_resources``,
+    is added to allow the recording of all resources created by Tempest.
+    A list of these resources will be saved in ``resource_list.json`` file,
+    which will be appended in case of multiple Tempest runs. This file
+    is intended to be used with the ``tempest cleanup`` command if it is
+    used with the newly added option ``--resource-list``.
diff --git a/roles/tempest-cleanup/README.rst b/roles/tempest-cleanup/README.rst
index d43319c..255ca2d 100644
--- a/roles/tempest-cleanup/README.rst
+++ b/roles/tempest-cleanup/README.rst
@@ -47,6 +47,15 @@
    only resources with names that match the prefix. This option can be used
    together with dry_run.
 
+.. zuul:rolevar:: run_tempest_cleanup_resource_list
+   :default: false
+
+   When true, tempest cleanup will be called with '--resource-list' to delete
+   only resources listed in ./resource_list.json that is created if
+   record_resources config option in the default section of tempest.conf file
+   is enabled (set to True). The resource_list.json contains all resources
+   created by Tempest during a Tempest run.
+
 Role usage
 ----------
 
diff --git a/roles/tempest-cleanup/defaults/main.yaml b/roles/tempest-cleanup/defaults/main.yaml
index 8060b29..1ec2f8c 100644
--- a/roles/tempest-cleanup/defaults/main.yaml
+++ b/roles/tempest-cleanup/defaults/main.yaml
@@ -3,3 +3,4 @@
 dry_run: false
 run_tempest_fail_if_leaked_resources: false
 run_tempest_cleanup_prefix: false
+run_tempest_cleanup_resource_list: false
diff --git a/roles/tempest-cleanup/tasks/dry_run.yaml b/roles/tempest-cleanup/tasks/dry_run.yaml
index 07e1b63..8ae5183 100644
--- a/roles/tempest-cleanup/tasks/dry_run.yaml
+++ b/roles/tempest-cleanup/tasks/dry_run.yaml
@@ -5,7 +5,9 @@
   command: tox -evenv-tempest -- tempest cleanup --dry-run --debug
   args:
     chdir: "{{ devstack_base_dir }}/tempest"
-  when: not run_tempest_cleanup_prefix
+  when:
+    - not run_tempest_cleanup_prefix
+    - run_tempest_cleanup_resource_list is not defined or not run_tempest_cleanup_resource_list
 
 - name: Run tempest cleanup dry-run with tempest prefix
   become: yes
@@ -13,4 +15,12 @@
   command: tox -evenv-tempest -- tempest cleanup --dry-run --debug --prefix tempest
   args:
     chdir: "{{ devstack_base_dir }}/tempest"
-  when: run_tempest_cleanup_prefix
\ No newline at end of file
+  when: run_tempest_cleanup_prefix
+
+- name: Run tempest cleanup with tempest resource list
+  become: yes
+  become_user: tempest
+  command: tox -evenv-tempest -- tempest cleanup --dry-run --debug --resource-list
+  args:
+    chdir: "{{ devstack_base_dir }}/tempest"
+  when: run_tempest_cleanup_resource_list
diff --git a/roles/tempest-cleanup/tasks/main.yaml b/roles/tempest-cleanup/tasks/main.yaml
index 7ef4928..1e1c1a7 100644
--- a/roles/tempest-cleanup/tasks/main.yaml
+++ b/roles/tempest-cleanup/tasks/main.yaml
@@ -27,7 +27,9 @@
       command: tox -evenv-tempest -- tempest cleanup --debug
       args:
         chdir: "{{ devstack_base_dir }}/tempest"
-      when: not run_tempest_cleanup_prefix
+      when:
+        - not run_tempest_cleanup_prefix
+        - run_tempest_cleanup_resource_list is not defined or not run_tempest_cleanup_resource_list
 
     - name: Run tempest cleanup with tempest prefix
       become: yes
@@ -37,6 +39,18 @@
         chdir: "{{ devstack_base_dir }}/tempest"
       when: run_tempest_cleanup_prefix
 
+    - name: Cat resource_list.json
+      command: cat "{{ devstack_base_dir }}/tempest/resource_list.json"
+      when: run_tempest_cleanup_resource_list
+
+    - name: Run tempest cleanup with tempest resource list
+      become: yes
+      become_user: tempest
+      command: tox -evenv-tempest -- tempest cleanup --debug --resource-list
+      args:
+        chdir: "{{ devstack_base_dir }}/tempest"
+      when: run_tempest_cleanup_resource_list
+
 - when:
     - run_tempest_fail_if_leaked_resources
     - not init_saved_state
diff --git a/tempest/api/compute/admin/test_agents.py b/tempest/api/compute/admin/test_agents.py
deleted file mode 100644
index 8fc155b..0000000
--- a/tempest/api/compute/admin/test_agents.py
+++ /dev/null
@@ -1,125 +0,0 @@
-# Copyright 2014 NEC Corporation.  All rights reserved.
-#
-#    Licensed under the Apache License, Version 2.0 (the "License"); you may
-#    not use this file except in compliance with the License. You may obtain
-#    a copy of the License at
-#
-#         http://www.apache.org/licenses/LICENSE-2.0
-#
-#    Unless required by applicable law or agreed to in writing, software
-#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-#    License for the specific language governing permissions and limitations
-#    under the License.
-
-from tempest.api.compute import base
-from tempest import config
-from tempest.lib.common.utils import data_utils
-from tempest.lib import decorators
-
-CONF = config.CONF
-
-
-# TODO(stephenfin): Remove these tests once the nova Ussuri branch goes EOL
-class AgentsAdminTestJSON(base.BaseV2ComputeAdminTest):
-    """Tests Compute Agents API"""
-
-    @classmethod
-    def skip_checks(cls):
-        super(AgentsAdminTestJSON, cls).skip_checks()
-        if not CONF.compute_feature_enabled.xenapi_apis:
-            raise cls.skipException('The os-agents API is not supported.')
-
-    @classmethod
-    def setup_clients(cls):
-        super(AgentsAdminTestJSON, cls).setup_clients()
-        cls.client = cls.os_admin.agents_client
-
-    @classmethod
-    def resource_setup(cls):
-        super(AgentsAdminTestJSON, cls).resource_setup()
-        cls.params_agent = cls._param_helper(
-            hypervisor='common', os='linux', architecture='x86_64',
-            version='7.0', url='xxx://xxxx/xxx/xxx',
-            md5hash='add6bb58e139be103324d04d82d8f545')
-
-    @staticmethod
-    def _param_helper(**kwargs):
-        rand_key = 'architecture'
-        if rand_key in kwargs:
-            # NOTE: The rand_name is for avoiding agent conflicts.
-            # If you try to create an agent with the same hypervisor,
-            # os and architecture as an existing agent, Nova will return
-            # an HTTPConflict or HTTPServerError.
-            kwargs[rand_key] = data_utils.rand_name(
-                prefix=CONF.resource_name_prefix,
-                name=kwargs[rand_key])
-        return kwargs
-
-    @decorators.idempotent_id('1fc6bdc8-0b6d-4cc7-9f30-9b04fabe5b90')
-    def test_create_agent(self):
-        """Test creating a compute agent"""
-        params = self._param_helper(
-            hypervisor='kvm', os='win', architecture='x86',
-            version='7.0', url='xxx://xxxx/xxx/xxx',
-            md5hash='add6bb58e139be103324d04d82d8f545')
-        body = self.client.create_agent(**params)['agent']
-        self.addCleanup(self.client.delete_agent, body['agent_id'])
-        for expected_item, value in params.items():
-            self.assertEqual(value, body[expected_item])
-
-    @decorators.idempotent_id('dc9ffd51-1c50-4f0e-a820-ae6d2a568a9e')
-    def test_update_agent(self):
-        """Test updating a compute agent"""
-        # Create and update an agent.
-        body = self.client.create_agent(**self.params_agent)['agent']
-        self.addCleanup(self.client.delete_agent, body['agent_id'])
-        agent_id = body['agent_id']
-        params = self._param_helper(
-            version='8.0', url='xxx://xxxx/xxx/xxx2',
-            md5hash='add6bb58e139be103324d04d82d8f547')
-        body = self.client.update_agent(agent_id, **params)['agent']
-        for expected_item, value in params.items():
-            self.assertEqual(value, body[expected_item])
-
-    @decorators.idempotent_id('470e0b89-386f-407b-91fd-819737d0b335')
-    def test_delete_agent(self):
-        """Test deleting a compute agent"""
-        body = self.client.create_agent(**self.params_agent)['agent']
-        self.client.delete_agent(body['agent_id'])
-
-        # Verify the list doesn't contain the deleted agent.
-        agents = self.client.list_agents()['agents']
-        self.assertNotIn(body['agent_id'], map(lambda x: x['agent_id'],
-                                               agents))
-
-    @decorators.idempotent_id('6a326c69-654b-438a-80a3-34bcc454e138')
-    def test_list_agents(self):
-        """Test listing compute agents"""
-        body = self.client.create_agent(**self.params_agent)['agent']
-        self.addCleanup(self.client.delete_agent, body['agent_id'])
-        agents = self.client.list_agents()['agents']
-        self.assertNotEmpty(agents, 'Cannot get any agents.')
-        self.assertIn(body['agent_id'], map(lambda x: x['agent_id'], agents))
-
-    @decorators.idempotent_id('eabadde4-3cd7-4ec4-a4b5-5a936d2d4408')
-    def test_list_agents_with_filter(self):
-        """Test listing compute agents by the filter"""
-        body = self.client.create_agent(**self.params_agent)['agent']
-        self.addCleanup(self.client.delete_agent, body['agent_id'])
-        params = self._param_helper(
-            hypervisor='xen', os='linux', architecture='x86',
-            version='7.0', url='xxx://xxxx/xxx/xxx1',
-            md5hash='add6bb58e139be103324d04d82d8f546')
-        agent_xen = self.client.create_agent(**params)['agent']
-        self.addCleanup(self.client.delete_agent, agent_xen['agent_id'])
-
-        agent_id_xen = agent_xen['agent_id']
-        agents = (self.client.list_agents(hypervisor=agent_xen['hypervisor'])
-                  ['agents'])
-        self.assertNotEmpty(agents, 'Cannot get any agents.')
-        self.assertIn(agent_id_xen, map(lambda x: x['agent_id'], agents))
-        self.assertNotIn(body['agent_id'], map(lambda x: x['agent_id'],
-                                               agents))
-        for agent in agents:
-            self.assertEqual(agent_xen['hypervisor'], agent['hypervisor'])
diff --git a/tempest/api/compute/admin/test_servers.py b/tempest/api/compute/admin/test_servers.py
index be838fc..6c9aafb 100644
--- a/tempest/api/compute/admin/test_servers.py
+++ b/tempest/api/compute/admin/test_servers.py
@@ -207,15 +207,10 @@
         self.assertEqual(self.image_ref_alt, rebuilt_image_id)
 
     @decorators.idempotent_id('7a1323b4-a6a2-497a-96cb-76c07b945c71')
-    def test_reset_network_inject_network_info(self):
-        """Test resetting and injecting network info of a server"""
-        if not CONF.compute_feature_enabled.xenapi_apis:
-            raise self.skipException(
-                'The resetNetwork server action is not supported.')
-
-        # Reset Network of a Server
+    def test_inject_network_info(self):
+        """Test injecting network info of a server"""
+        # Create a server
         server = self.create_test_server(wait_until='ACTIVE')
-        self.client.reset_network(server['id'])
         # Inject the Network Info into Server
         self.client.inject_network_info(server['id'])
 
diff --git a/tempest/api/compute/servers/test_server_metadata.py b/tempest/api/compute/servers/test_server_metadata.py
index 9f93e76..5f35b15 100644
--- a/tempest/api/compute/servers/test_server_metadata.py
+++ b/tempest/api/compute/servers/test_server_metadata.py
@@ -27,13 +27,6 @@
     create_default_network = True
 
     @classmethod
-    def skip_checks(cls):
-        super(ServerMetadataTestJSON, cls).skip_checks()
-        if not CONF.compute_feature_enabled.xenapi_apis:
-            raise cls.skipException(
-                'Metadata is read-only on non-Xen-based deployments.')
-
-    @classmethod
     def setup_clients(cls):
         super(ServerMetadataTestJSON, cls).setup_clients()
         cls.client = cls.servers_client
diff --git a/tempest/api/compute/servers/test_server_metadata_negative.py b/tempest/api/compute/servers/test_server_metadata_negative.py
index 655909c..2059dfa 100644
--- a/tempest/api/compute/servers/test_server_metadata_negative.py
+++ b/tempest/api/compute/servers/test_server_metadata_negative.py
@@ -14,13 +14,10 @@
 #    under the License.
 
 from tempest.api.compute import base
-from tempest import config
 from tempest.lib.common.utils import data_utils
 from tempest.lib import decorators
 from tempest.lib import exceptions as lib_exc
 
-CONF = config.CONF
-
 
 class ServerMetadataNegativeTestJSON(base.BaseV2ComputeTest):
     """Negative tests of server metadata"""
@@ -91,10 +88,6 @@
 
         Raise BadRequest if key in uri does not match the key passed in body.
         """
-        if not CONF.compute_feature_enabled.xenapi_apis:
-            raise self.skipException(
-                'Metadata is read-only on non-Xen-based deployments.')
-
         meta = {'testkey': 'testvalue'}
         self.assertRaises(lib_exc.BadRequest,
                           self.client.set_server_metadata_item,
@@ -104,10 +97,6 @@
     @decorators.idempotent_id('0df38c2a-3d4e-4db5-98d8-d4d9fa843a12')
     def test_set_metadata_non_existent_server(self):
         """Test setting metadata for a non existent server should fail"""
-        if not CONF.compute_feature_enabled.xenapi_apis:
-            raise self.skipException(
-                'Metadata is read-only on non-Xen-based deployments.')
-
         non_existent_server_id = data_utils.rand_uuid()
         meta = {'meta1': 'data1'}
         self.assertRaises(lib_exc.NotFound,
@@ -119,10 +108,6 @@
     @decorators.idempotent_id('904b13dc-0ef2-4e4c-91cd-3b4a0f2f49d8')
     def test_update_metadata_non_existent_server(self):
         """Test updating metadata for a non existent server should fail"""
-        if not CONF.compute_feature_enabled.xenapi_apis:
-            raise self.skipException(
-                'Metadata is read-only on non-Xen-based deployments.')
-
         non_existent_server_id = data_utils.rand_uuid()
         meta = {'key1': 'value1', 'key2': 'value2'}
         self.assertRaises(lib_exc.NotFound,
@@ -134,10 +119,6 @@
     @decorators.idempotent_id('a452f38c-05c2-4b47-bd44-a4f0bf5a5e48')
     def test_update_metadata_with_blank_key(self):
         """Test updating server metadata to blank key should fail"""
-        if not CONF.compute_feature_enabled.xenapi_apis:
-            raise self.skipException(
-                'Metadata is read-only on non-Xen-based deployments.')
-
         meta = {'': 'data1'}
         self.assertRaises(lib_exc.BadRequest,
                           self.client.update_server_metadata,
@@ -150,10 +131,6 @@
 
         Should not be able to delete metadata item from a non-existent server.
         """
-        if not CONF.compute_feature_enabled.xenapi_apis:
-            raise self.skipException(
-                'Metadata is read-only on non-Xen-based deployments.')
-
         non_existent_server_id = data_utils.rand_uuid()
         self.assertRaises(lib_exc.NotFound,
                           self.client.delete_server_metadata_item,
@@ -168,10 +145,6 @@
         A 403 Forbidden or 413 Overlimit (old behaviour) exception
         will be raised while exceeding metadata items limit for project.
         """
-        if not CONF.compute_feature_enabled.xenapi_apis:
-            raise self.skipException(
-                'Metadata is read-only on non-Xen-based deployments.')
-
         quota_set = self.quotas_client.show_quota_set(
             self.tenant_id)['quota_set']
         quota_metadata = quota_set['metadata_items']
@@ -196,10 +169,6 @@
     @decorators.idempotent_id('96100343-7fa9-40d8-80fa-d29ef588ce1c')
     def test_set_server_metadata_blank_key(self):
         """Test setting server metadata with blank key should fail"""
-        if not CONF.compute_feature_enabled.xenapi_apis:
-            raise self.skipException(
-                'Metadata is read-only on non-Xen-based deployments.')
-
         meta = {'': 'data1'}
         self.assertRaises(lib_exc.BadRequest,
                           self.client.set_server_metadata,
@@ -209,10 +178,6 @@
     @decorators.idempotent_id('64a91aee-9723-4863-be44-4c9d9f1e7d0e')
     def test_set_server_metadata_missing_metadata(self):
         """Test setting server metadata without metadata field should fail"""
-        if not CONF.compute_feature_enabled.xenapi_apis:
-            raise self.skipException(
-                'Metadata is read-only on non-Xen-based deployments.')
-
         meta = {'meta1': 'data1'}
         self.assertRaises(lib_exc.BadRequest,
                           self.client.set_server_metadata,
diff --git a/tempest/api/compute/volumes/test_attach_volume.py b/tempest/api/compute/volumes/test_attach_volume.py
index 7ea8f09..e267b0f 100644
--- a/tempest/api/compute/volumes/test_attach_volume.py
+++ b/tempest/api/compute/volumes/test_attach_volume.py
@@ -465,6 +465,73 @@
         self._boot_from_multiattach_volume()
 
     @utils.services('image')
+    @decorators.idempotent_id('07eb6686-571c-45f0-9d96-446b120f1121')
+    def test_boot_with_multiattach_volume_direct_lun(self, boot=False):
+        image = self.images_client.show_image(CONF.compute.image_ref)
+        if image.get('hw_scsi_model') != 'virtio-scsi':
+            # NOTE(danms): Technically we don't need this to be virtio-scsi,
+            # but cirros (and other) test images won't see the device unless
+            # they have lsilogic drivers (which is the default). So use this
+            # as sort of the indication that the test should be enabled.
+            self.skip('hw_scsi_model=virtio-scsi not set on image')
+        if not CONF.validation.run_validation:
+            self.skip('validation is required for this test')
+
+        validation_resources = self.get_test_validation_resources(
+            self.os_primary)
+
+        volume = self._create_multiattach_volume(bootable=boot)
+        # Create an image-backed instance with the multi-attach volume as a
+        # block device with device_type=lun
+        bdm = [{'source_type': 'image',
+                'destination_type': 'local',
+                'uuid': CONF.compute.image_ref,
+                'boot_index': 0},
+               {'uuid': volume['id'],
+                'source_type': 'volume',
+                'destination_type': 'volume',
+                'device_type': 'lun',
+                'disk_bus': 'scsi'}]
+
+        if boot:
+            # If we're booting from it, we don't need the local-from-image
+            # disk, but we need the volume to have a boot_index
+            bdm.pop(0)
+            bdm[0]['boot_index'] = 0
+
+        server = self.create_test_server(
+            validatable=True,
+            validation_resources=validation_resources,
+            block_device_mapping_v2=bdm, wait_until='SSHABLE')
+
+        # Assert the volume is attached to the server.
+        attachments = self.servers_client.list_volume_attachments(
+            server['id'])['volumeAttachments']
+        self.assertEqual(1, len(attachments))
+        self.assertEqual(volume['id'], attachments[0]['volumeId'])
+
+        linux_client = remote_client.RemoteClient(
+            self.get_server_ip(server, validation_resources),
+            self.image_ssh_user,
+            self.image_ssh_password,
+            validation_resources['keypair']['private_key'],
+            server=server,
+            servers_client=self.servers_client)
+
+        # Assert the volume appears as a SCSI device
+        command = 'lsblk -S'
+        blks = linux_client.exec_command(command).strip()
+        self.assertIn('\nsda ', blks)
+
+        self.servers_client.delete_server(server['id'])
+        waiters.wait_for_server_termination(self.servers_client, server['id'])
+
+    @utils.services('image')
+    @decorators.idempotent_id('bfe61d6e-767a-4f93-9de8-054355536475')
+    def test_boot_from_multiattach_volume_direct_lun(self, boot=False):
+        self.test_boot_with_multiattach_volume_direct_lun(boot=True)
+
+    @utils.services('image')
     @decorators.idempotent_id('885ac48a-2d7a-40c5-ae8b-1993882d724c')
     @testtools.skipUnless(CONF.compute_feature_enabled.snapshot,
                           'Snapshotting is not available.')
diff --git a/tempest/api/volume/test_volumes_actions.py b/tempest/api/volume/test_volumes_actions.py
index 150677d..8cf44be 100644
--- a/tempest/api/volume/test_volumes_actions.py
+++ b/tempest/api/volume/test_volumes_actions.py
@@ -119,6 +119,13 @@
                         self.images_client.delete_image,
                         image_id)
         waiters.wait_for_image_status(self.images_client, image_id, 'active')
+        # This is required for the optimized upload volume path.
+        # New location APIs are async so we need to wait for the location
+        # import task to complete.
+        # This should work with old location API since we don't fail if there
+        # are no tasks for the image
+        waiters.wait_for_image_tasks_status(self.images_client,
+                                            image_id, 'success')
         waiters.wait_for_volume_resource_status(self.volumes_client,
                                                 self.volume['id'], 'available')
 
diff --git a/tempest/clients.py b/tempest/clients.py
index 5338ed4..e432120 100644
--- a/tempest/clients.py
+++ b/tempest/clients.py
@@ -13,8 +13,13 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
+import os
+
+from oslo_concurrency import lockutils
+
 from tempest import config
 from tempest.lib import auth
+from tempest.lib.common.rest_client import RestClient
 from tempest.lib import exceptions as lib_exc
 from tempest.lib.services import clients
 
@@ -35,6 +40,11 @@
         super(Manager, self).__init__(
             credentials=credentials, identity_uri=identity_uri, scope=scope,
             region=CONF.identity.region)
+        if CONF.record_resources:
+            RestClient.lock_dir = os.path.join(
+                lockutils.get_lock_path(CONF),
+                'tempest-rec-rw-lock')
+            RestClient.record_resources = True
         # TODO(andreaf) When clients are initialised without the right
         # parameters available, the calls below will trigger a KeyError.
         # We should catch that and raise a better error.
diff --git a/tempest/cmd/cleanup.py b/tempest/cmd/cleanup.py
index 2a406de..8d06f93 100644
--- a/tempest/cmd/cleanup.py
+++ b/tempest/cmd/cleanup.py
@@ -87,6 +87,23 @@
   ``saved_state.json`` file will be ignored and cleanup will be done based on
   the passed prefix only.
 
+* ``--resource-list``: Allows the use of file ``./resource_list.json``, which
+  contains all resources created by Tempest during all Tempest runs, to
+  create another method for removing only resources created by Tempest.
+  List of these resources is created when config option ``record_resources``
+  in default section is set to true. After using this option for cleanup,
+  the existing ``./resource_list.json`` is cleared from deleted resources.
+
+  When this option is used, ``saved_state.json`` file is not needed (no
+  need to run with ``--init-saved-state`` first). If there is any
+  ``saved_state.json`` file present and you run the tempest cleanup with
+  ``--resource-list``, the ``saved_state.json`` file will be ignored and
+  cleanup will be done based on the ``resource_list.json`` only.
+
+  If you run tempest cleanup with both ``--prefix`` and ``--resource-list``,
+  the ``--resource-list`` option will be ignored and cleanup will be done
+  based on the ``--prefix`` option only.
+
 * ``--help``: Print the help text for the command and parameters.
 
 .. [1] The ``_projects_to_clean`` dictionary in ``dry_run.json`` lists the
@@ -122,6 +139,7 @@
 
 SAVED_STATE_JSON = "saved_state.json"
 DRY_RUN_JSON = "dry_run.json"
+RESOURCE_LIST_JSON = "resource_list.json"
 LOG = logging.getLogger(__name__)
 CONF = config.CONF
 
@@ -164,6 +182,7 @@
         self.admin_mgr = clients.Manager(
             credentials.get_configured_admin_credentials())
         self.dry_run_data = {}
+        self.resource_data = {}
         self.json_data = {}
 
         # available services
@@ -177,12 +196,20 @@
             self._init_state()
             return
 
-        self._load_json()
+        if parsed_args.prefix:
+            return
+
+        if parsed_args.resource_list:
+            self._load_resource_list()
+            return
+
+        self._load_saved_state()
 
     def _cleanup(self):
         LOG.info("Begin cleanup")
         is_dry_run = self.options.dry_run
         is_preserve = not self.options.delete_tempest_conf_objects
+        is_resource_list = self.options.resource_list
         is_save_state = False
         cleanup_prefix = self.options.prefix
 
@@ -194,8 +221,10 @@
         # they are in saved state json. Therefore is_preserve is False
         kwargs = {'data': self.dry_run_data,
                   'is_dry_run': is_dry_run,
+                  'resource_list_json': self.resource_data,
                   'saved_state_json': self.json_data,
                   'is_preserve': False,
+                  'is_resource_list': is_resource_list,
                   'is_save_state': is_save_state,
                   'prefix': cleanup_prefix}
         project_service = cleanup_service.ProjectService(admin_mgr, **kwargs)
@@ -208,8 +237,10 @@
 
         kwargs = {'data': self.dry_run_data,
                   'is_dry_run': is_dry_run,
+                  'resource_list_json': self.resource_data,
                   'saved_state_json': self.json_data,
                   'is_preserve': is_preserve,
+                  'is_resource_list': is_resource_list,
                   'is_save_state': is_save_state,
                   'prefix': cleanup_prefix,
                   'got_exceptions': self.GOT_EXCEPTIONS}
@@ -228,11 +259,17 @@
                 f.write(json.dumps(self.dry_run_data, sort_keys=True,
                                    indent=2, separators=(',', ': ')))
 
+        if is_resource_list:
+            LOG.info("Clearing 'resource_list.json' file.")
+            with open(RESOURCE_LIST_JSON, 'w') as f:
+                f.write('{}')
+
     def _clean_project(self, project):
         LOG.debug("Cleaning project:  %s ", project['name'])
         is_dry_run = self.options.dry_run
         dry_run_data = self.dry_run_data
         is_preserve = not self.options.delete_tempest_conf_objects
+        is_resource_list = self.options.resource_list
         project_id = project['id']
         project_name = project['name']
         project_data = None
@@ -244,7 +281,9 @@
         kwargs = {'data': project_data,
                   'is_dry_run': is_dry_run,
                   'saved_state_json': self.json_data,
+                  'resource_list_json': self.resource_data,
                   'is_preserve': is_preserve,
+                  'is_resource_list': is_resource_list,
                   'is_save_state': False,
                   'project_id': project_id,
                   'prefix': cleanup_prefix,
@@ -287,6 +326,19 @@
                             "ignored when --init-saved-state is used so that "
                             "it can capture the true init state - all "
                             "resources present at that moment.")
+        parser.add_argument('--resource-list', action="store_true",
+                            dest='resource_list', default=False,
+                            help="Runs tempest cleanup with generated "
+                            "JSON file: " + RESOURCE_LIST_JSON + " to "
+                            "erase resources created during Tempest run. "
+                            "NOTE: To create " + RESOURCE_LIST_JSON + " "
+                            "set config option record_resources under default "
+                            "section in tempest.conf file to true. This "
+                            "option will be ignored when --init-saved-state "
+                            "is used so that it can capture the true init "
+                            "state - all resources present at that moment. "
+                            "This option will be ignored if passed with "
+                            "--prefix.")
         return parser
 
     def get_description(self):
@@ -304,6 +356,7 @@
                   'is_dry_run': False,
                   'saved_state_json': data,
                   'is_preserve': False,
+                  'is_resource_list': False,
                   'is_save_state': True,
                   # must be None as we want to capture true init state
                   # (all resources present) thus no filtering based
@@ -326,15 +379,31 @@
             f.write(json.dumps(data, sort_keys=True,
                                indent=2, separators=(',', ': ')))
 
-    def _load_json(self, saved_state_json=SAVED_STATE_JSON):
+    def _load_resource_list(self, resource_list_json=RESOURCE_LIST_JSON):
+        try:
+            with open(resource_list_json, 'rb') as json_file:
+                self.resource_data = json.load(json_file)
+        except IOError as ex:
+            LOG.exception(
+                "Failed loading 'resource_list.json', please "
+                "be sure you created this file by setting config "
+                "option record_resources in default section to true "
+                "prior to running tempest. Exception: %s", ex)
+            sys.exit(ex)
+        except Exception as ex:
+            LOG.exception(
+                "Exception parsing 'resource_list.json' : %s", ex)
+            sys.exit(ex)
+
+    def _load_saved_state(self, saved_state_json=SAVED_STATE_JSON):
         try:
             with open(saved_state_json, 'rb') as json_file:
                 self.json_data = json.load(json_file)
-
         except IOError as ex:
-            LOG.exception("Failed loading saved state, please be sure you"
-                          " have first run cleanup with --init-saved-state "
-                          "flag prior to running tempest. Exception: %s", ex)
+            LOG.exception(
+                "Failed loading saved state, please be sure you"
+                " have first run cleanup with --init-saved-state "
+                "flag prior to running tempest. Exception: %s", ex)
             sys.exit(ex)
         except Exception as ex:
             LOG.exception("Exception parsing saved state json : %s", ex)
diff --git a/tempest/cmd/cleanup_service.py b/tempest/cmd/cleanup_service.py
index 8651ab0..b202940 100644
--- a/tempest/cmd/cleanup_service.py
+++ b/tempest/cmd/cleanup_service.py
@@ -120,6 +120,13 @@
                  if item['name'].startswith(self.prefix)]
         return items
 
+    def _filter_by_resource_list(self, item_list, attr):
+        if attr not in self.resource_list_json:
+            return []
+        items = [item for item in item_list if item['id']
+                 in self.resource_list_json[attr].keys()]
+        return items
+
     def _filter_out_ids_from_saved(self, item_list, attr):
         items = [item for item in item_list if item['id']
                  not in self.saved_state_json[attr].keys()]
@@ -166,8 +173,11 @@
     def list(self):
         client = self.client
         snaps = client.list_snapshots()['snapshots']
+
         if self.prefix:
             snaps = self._filter_by_prefix(snaps)
+        elif self.is_resource_list:
+            snaps = self._filter_by_resource_list(snaps, 'snapshots')
         elif not self.is_save_state:
             # recreate list removing saved snapshots
             snaps = self._filter_out_ids_from_saved(snaps, 'snapshots')
@@ -205,8 +215,11 @@
         client = self.client
         servers_body = client.list_servers()
         servers = servers_body['servers']
+
         if self.prefix:
             servers = self._filter_by_prefix(servers)
+        elif self.is_resource_list:
+            servers = self._filter_by_resource_list(servers, 'servers')
         elif not self.is_save_state:
             # recreate list removing saved servers
             servers = self._filter_out_ids_from_saved(servers, 'servers')
@@ -238,9 +251,12 @@
 
     def list(self):
         client = self.server_groups_client
-        sgs = client.list_server_groups()['server_groups']
+        sgs = client.list_server_groups(all_projects=True)['server_groups']
+
         if self.prefix:
             sgs = self._filter_by_prefix(sgs)
+        elif self.is_resource_list:
+            sgs = self._filter_by_resource_list(sgs, 'server_groups')
         elif not self.is_save_state:
             # recreate list removing saved server_groups
             sgs = self._filter_out_ids_from_saved(sgs, 'server_groups')
@@ -276,8 +292,13 @@
     def list(self):
         client = self.client
         keypairs = client.list_keypairs()['keypairs']
+
         if self.prefix:
             keypairs = self._filter_by_prefix(keypairs)
+        elif self.is_resource_list:
+            keypairs = [keypair for keypair in keypairs
+                        if keypair['keypair']['name']
+                        in self.resource_list_json['keypairs'].keys()]
         elif not self.is_save_state:
             # recreate list removing saved keypairs
             keypairs = [keypair for keypair in keypairs
@@ -317,8 +338,11 @@
     def list(self):
         client = self.client
         vols = client.list_volumes()['volumes']
+
         if self.prefix:
             vols = self._filter_by_prefix(vols)
+        elif self.is_resource_list:
+            vols = self._filter_by_resource_list(vols, 'volumes')
         elif not self.is_save_state:
             # recreate list removing saved volumes
             vols = self._filter_out_ids_from_saved(vols, 'volumes')
@@ -462,8 +486,11 @@
         client = self.networks_client
         networks = client.list_networks(**self.tenant_filter)
         networks = networks['networks']
+
         if self.prefix:
             networks = self._filter_by_prefix(networks)
+        elif self.is_resource_list:
+            networks = self._filter_by_resource_list(networks, 'networks')
         else:
             if not self.is_save_state:
                 # recreate list removing saved networks
@@ -500,15 +527,17 @@
 class NetworkFloatingIpService(BaseNetworkService):
 
     def list(self):
-        if self.prefix:
-            # this means we're cleaning resources based on a certain prefix,
-            # this resource doesn't have a name, therefore return empty list
-            return []
         client = self.floating_ips_client
         flips = client.list_floatingips(**self.tenant_filter)
         flips = flips['floatingips']
 
-        if not self.is_save_state:
+        if self.prefix:
+            # this means we're cleaning resources based on a certain prefix,
+            # this resource doesn't have a name, therefore return empty list
+            return []
+        elif self.is_resource_list:
+            flips = self._filter_by_resource_list(flips, 'floatingips')
+        elif not self.is_save_state:
             # recreate list removing saved flips
             flips = self._filter_out_ids_from_saved(flips, 'floatingips')
         LOG.debug("List count, %s Network Floating IPs", len(flips))
@@ -543,8 +572,11 @@
         client = self.routers_client
         routers = client.list_routers(**self.tenant_filter)
         routers = routers['routers']
+
         if self.prefix:
             routers = self._filter_by_prefix(routers)
+        elif self.is_resource_list:
+            routers = self._filter_by_resource_list(routers, 'routers')
         else:
             if not self.is_save_state:
                 # recreate list removing saved routers
@@ -592,16 +624,19 @@
 class NetworkMeteringLabelRuleService(NetworkService):
 
     def list(self):
-        if self.prefix:
-            # this means we're cleaning resources based on a certain prefix,
-            # this resource doesn't have a name, therefore return empty list
-            return []
         client = self.metering_label_rules_client
         rules = client.list_metering_label_rules()
         rules = rules['metering_label_rules']
         rules = self._filter_by_tenant_id(rules)
 
-        if not self.is_save_state:
+        if self.prefix:
+            # this means we're cleaning resources based on a certain prefix,
+            # this resource doesn't have a name, therefore return empty list
+            return []
+        elif self.is_resource_list:
+            rules = self._filter_by_resource_list(
+                rules, 'metering_label_rules')
+        elif not self.is_save_state:
             rules = self._filter_out_ids_from_saved(
                 rules, 'metering_label_rules')
             # recreate list removing saved rules
@@ -638,8 +673,12 @@
         labels = client.list_metering_labels()
         labels = labels['metering_labels']
         labels = self._filter_by_tenant_id(labels)
+
         if self.prefix:
             labels = self._filter_by_prefix(labels)
+        elif self.is_resource_list:
+            labels = self._filter_by_resource_list(
+                labels, 'metering_labels')
         elif not self.is_save_state:
             # recreate list removing saved labels
             labels = self._filter_out_ids_from_saved(
@@ -677,8 +716,11 @@
                  client.list_ports(**self.tenant_filter)['ports']
                  if port["device_owner"] == "" or
                  port["device_owner"].startswith("compute:")]
+
         if self.prefix:
             ports = self._filter_by_prefix(ports)
+        elif self.is_resource_list:
+            ports = self._filter_by_resource_list(ports, 'ports')
         else:
             if not self.is_save_state:
                 # recreate list removing saved ports
@@ -717,8 +759,12 @@
         secgroups = [secgroup for secgroup in
                      client.list_security_groups(**filter)['security_groups']
                      if secgroup['name'] != 'default']
+
         if self.prefix:
             secgroups = self._filter_by_prefix(secgroups)
+        elif self.is_resource_list:
+            secgroups = self._filter_by_resource_list(
+                secgroups, 'security_groups')
         else:
             if not self.is_save_state:
                 # recreate list removing saved security_groups
@@ -760,8 +806,11 @@
         client = self.subnets_client
         subnets = client.list_subnets(**self.tenant_filter)
         subnets = subnets['subnets']
+
         if self.prefix:
             subnets = self._filter_by_prefix(subnets)
+        elif self.is_resource_list:
+            subnets = self._filter_by_resource_list(subnets, 'subnets')
         else:
             if not self.is_save_state:
                 # recreate list removing saved subnets
@@ -797,8 +846,11 @@
     def list(self):
         client = self.subnetpools_client
         pools = client.list_subnetpools(**self.tenant_filter)['subnetpools']
+
         if self.prefix:
             pools = self._filter_by_prefix(pools)
+        elif self.is_resource_list:
+            pools = self._filter_by_resource_list(pools, 'subnetpools')
         else:
             if not self.is_save_state:
                 # recreate list removing saved subnet pools
@@ -838,13 +890,18 @@
         self.client = manager.regions_client
 
     def list(self):
+        client = self.client
+        regions = client.list_regions()
+
         if self.prefix:
             # this means we're cleaning resources based on a certain prefix,
             # this resource doesn't have a name, therefore return empty list
             return []
-        client = self.client
-        regions = client.list_regions()
-        if not self.is_save_state:
+        elif self.is_resource_list:
+            regions = self._filter_by_resource_list(
+                regions['regions'], 'regions')
+            return regions
+        elif not self.is_save_state:
             regions = self._filter_out_ids_from_saved(
                 regions['regions'], 'regions')
             LOG.debug("List count, %s Regions", len(regions))
@@ -884,8 +941,11 @@
     def list(self):
         client = self.client
         flavors = client.list_flavors({"is_public": None})['flavors']
+
         if self.prefix:
             flavors = self._filter_by_prefix(flavors)
+        elif self.is_resource_list:
+            flavors = self._filter_by_resource_list(flavors, 'flavors')
         else:
             if not self.is_save_state:
                 # recreate list removing saved flavors
@@ -932,8 +992,11 @@
             marker = urllib.parse_qs(parsed.query)['marker'][0]
             response = client.list_images(params={"marker": marker})
             images.extend(response['images'])
+
         if self.prefix:
             images = self._filter_by_prefix(images)
+        elif self.is_resource_list:
+            images = self._filter_by_resource_list(images, 'images')
         else:
             if not self.is_save_state:
                 images = self._filter_out_ids_from_saved(images, 'images')
@@ -974,6 +1037,8 @@
         users = self.client.list_users()['users']
         if self.prefix:
             users = self._filter_by_prefix(users)
+        elif self.is_resource_list:
+            users = self._filter_by_resource_list(users, 'users')
         else:
             if not self.is_save_state:
                 users = self._filter_out_ids_from_saved(users, 'users')
@@ -1015,8 +1080,11 @@
     def list(self):
         try:
             roles = self.client.list_roles()['roles']
+
             if self.prefix:
                 roles = self._filter_by_prefix(roles)
+            elif self.is_resource_list:
+                roles = self._filter_by_resource_list(roles, 'roles')
             elif not self.is_save_state:
                 # reconcile roles with saved state and never list admin role
                 roles = self._filter_out_ids_from_saved(roles, 'roles')
@@ -1056,8 +1124,11 @@
 
     def list(self):
         projects = self.client.list_projects()['projects']
+
         if self.prefix:
             projects = self._filter_by_prefix(projects)
+        elif self.is_resource_list:
+            projects = self._filter_by_resource_list(projects, 'projects')
         else:
             if not self.is_save_state:
                 projects = self._filter_out_ids_from_saved(
@@ -1099,8 +1170,11 @@
     def list(self):
         client = self.client
         domains = client.list_domains()['domains']
+
         if self.prefix:
             domains = self._filter_by_prefix(domains)
+        elif self.is_resource_list:
+            domains = self._filter_by_resource_list(domains, 'domains')
         elif not self.is_save_state:
             domains = self._filter_out_ids_from_saved(domains, 'domains')
         LOG.debug("List count, %s Domains after reconcile", len(domains))
diff --git a/tempest/config.py b/tempest/config.py
index 60a3aa9..6eee88a 100644
--- a/tempest/config.py
+++ b/tempest/config.py
@@ -595,18 +595,6 @@
                 help='Does the test environment support attaching a volume to '
                      'more than one instance? This depends on hypervisor and '
                      'volume backend/type and compute API version 2.60.'),
-    cfg.BoolOpt('xenapi_apis',
-                default=False,
-                help='Does the test environment support the XenAPI-specific '
-                     'APIs: os-agents, writeable server metadata and the '
-                     'resetNetwork server action? '
-                     'These were removed in Victoria alongside the XenAPI '
-                     'virt driver.',
-                deprecated_for_removal=True,
-                deprecated_reason="On Nova side, XenAPI virt driver and the "
-                                  "APIs that only worked with that driver "
-                                  "have been removed and there's nothing to "
-                                  "test after Ussuri."),
     cfg.BoolOpt('ide_bus',
                 default=True,
                 help='Does the test environment support attaching devices '
@@ -1321,6 +1309,15 @@
                     "to cleanup only the resources that match the prefix. "
                     "Make sure this prefix does not match with the resource "
                     "name you do not want Tempest cleanup CLI to delete."),
+    cfg.BoolOpt('record_resources',
+                default=False,
+                help="Allows to record all resources created by Tempest. "
+                     "These resources are stored in file resource_list.json, "
+                     "which can be later used for resource deletion by "
+                     "command tempest cleanup. The resource_list.json file "
+                     "will be appended in case of multiple Tempest runs, "
+                     "so the file will contain a list of resources created "
+                     "during all Tempest runs."),
 ]
 
 _opts = [
diff --git a/tempest/lib/api_schema/response/compute/v2_80/__init__.py b/tempest/lib/api_schema/response/compute/v2_80/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tempest/lib/api_schema/response/compute/v2_80/__init__.py
diff --git a/tempest/lib/api_schema/response/compute/v2_80/migrations.py b/tempest/lib/api_schema/response/compute/v2_80/migrations.py
new file mode 100644
index 0000000..f2fa008
--- /dev/null
+++ b/tempest/lib/api_schema/response/compute/v2_80/migrations.py
@@ -0,0 +1,40 @@
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import copy
+
+from tempest.lib.api_schema.response.compute.v2_59 import migrations
+
+###########################################################################
+#
+# 2.80:
+#
+# The user_id and project_id value is now returned in the response body in
+# addition to the migration id for the following API responses:
+#
+# - GET /os-migrations
+#
+###########################################################################
+
+user_id = {'type': 'string'}
+project_id = {'type': 'string'}
+
+list_migrations = copy.deepcopy(migrations.list_migrations)
+
+list_migrations['response_body']['properties']['migrations']['items'][
+    'properties'].update({
+        'user_id': user_id,
+        'project_id': project_id
+    })
+
+list_migrations['response_body']['properties']['migrations']['items'][
+    'required'].extend(['user_id', 'project_id'])
diff --git a/tempest/lib/api_schema/response/compute/v2_89/__init__.py b/tempest/lib/api_schema/response/compute/v2_89/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tempest/lib/api_schema/response/compute/v2_89/__init__.py
diff --git a/tempest/lib/api_schema/response/compute/v2_89/servers.py b/tempest/lib/api_schema/response/compute/v2_89/servers.py
new file mode 100644
index 0000000..debf0dc
--- /dev/null
+++ b/tempest/lib/api_schema/response/compute/v2_89/servers.py
@@ -0,0 +1,84 @@
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import copy
+
+from tempest.lib.api_schema.response.compute.v2_79 import servers as servers279
+
+
+###########################################################################
+#
+# 2.89:
+#
+# The attachment_id and bdm_uuid parameter is now returned in the response body
+# of the following calls:
+#
+# - GET /servers/{server_id}/os-volume_attachments
+# - GET /servers/{server_id}/os-volume_attachments/{volume_id}
+# - POST /servers/{server_id}/os-volume_attachments
+###########################################################################
+
+attach_volume = copy.deepcopy(servers279.attach_volume)
+
+show_volume_attachment = copy.deepcopy(servers279.show_volume_attachment)
+
+list_volume_attachments = copy.deepcopy(servers279.list_volume_attachments)
+
+# Remove properties
+# 'id' is available unti v2.88
+show_volume_attachment['response_body']['properties'][
+    'volumeAttachment']['properties'].pop('id')
+show_volume_attachment['response_body']['properties'][
+    'volumeAttachment']['required'].remove('id')
+list_volume_attachments['response_body']['properties'][
+    'volumeAttachments']['items']['properties'].pop('id')
+list_volume_attachments['response_body']['properties'][
+    'volumeAttachments']['items']['required'].remove('id')
+
+
+# Add new properties
+new_properties = {
+    'attachment_id': {'type': 'string', 'format': 'uuid'},
+    'bdm_uuid': {'type': 'string', 'format': 'uuid'}
+}
+
+show_volume_attachment['response_body']['properties'][
+    'volumeAttachment']['properties'].update(new_properties)
+show_volume_attachment['response_body']['properties'][
+    'volumeAttachment']['required'].extend(new_properties.keys())
+list_volume_attachments['response_body']['properties'][
+    'volumeAttachments']['items']['properties'].update(new_properties)
+list_volume_attachments['response_body']['properties'][
+    'volumeAttachments']['items']['required'].extend(new_properties.keys())
+
+
+# NOTE(zhufl): Below are the unchanged schema in this microversion. We
+# need to keep this schema in this file to have the generic way to select the
+# right schema based on self.schema_versions_info mapping in service client.
+# ****** Schemas unchanged since microversion 2.75 ***
+rebuild_server = copy.deepcopy(servers279.rebuild_server)
+rebuild_server_with_admin_pass = copy.deepcopy(
+    servers279.rebuild_server_with_admin_pass)
+update_server = copy.deepcopy(servers279.update_server)
+get_server = copy.deepcopy(servers279.get_server)
+list_servers_detail = copy.deepcopy(servers279.list_servers_detail)
+list_servers = copy.deepcopy(servers279.list_servers)
+show_server_diagnostics = copy.deepcopy(servers279.show_server_diagnostics)
+get_remote_consoles = copy.deepcopy(servers279.get_remote_consoles)
+list_tags = copy.deepcopy(servers279.list_tags)
+update_all_tags = copy.deepcopy(servers279.update_all_tags)
+delete_all_tags = copy.deepcopy(servers279.delete_all_tags)
+check_tag_existence = copy.deepcopy(servers279.check_tag_existence)
+update_tag = copy.deepcopy(servers279.update_tag)
+delete_tag = copy.deepcopy(servers279.delete_tag)
+show_instance_action = copy.deepcopy(servers279.show_instance_action)
+create_backup = copy.deepcopy(servers279.create_backup)
diff --git a/tempest/lib/common/rest_client.py b/tempest/lib/common/rest_client.py
index 6cf5b73..4f9e9ba 100644
--- a/tempest/lib/common/rest_client.py
+++ b/tempest/lib/common/rest_client.py
@@ -21,6 +21,7 @@
 import urllib
 import urllib3
 
+from fasteners import process_lock
 import jsonschema
 from oslo_log import log as logging
 from oslo_log import versionutils
@@ -78,6 +79,17 @@
     # The version of the API this client implements
     api_version = None
 
+    # Directory for storing read-write lock
+    lock_dir = None
+
+    # An interprocess lock used when the recording of all resources created by
+    # Tempest is allowed.
+    rec_rw_lock = None
+
+    # Variable mirrors value in config option 'record_resources' that allows
+    # the recording of all resources created by Tempest.
+    record_resources = False
+
     LOG = logging.getLogger(__name__)
 
     def __init__(self, auth_provider, service, region,
@@ -297,7 +309,13 @@
                  and the second the response body
         :rtype: tuple
         """
-        return self.request('POST', url, extra_headers, headers, body, chunked)
+        resp_header, resp_body = self.request(
+            'POST', url, extra_headers, headers, body, chunked)
+
+        if self.record_resources:
+            self.resource_record(resp_body)
+
+        return resp_header, resp_body
 
     def get(self, url, headers=None, extra_headers=False, chunked=False):
         """Send a HTTP GET request using keystone service catalog and auth
@@ -1006,6 +1024,66 @@
         """Returns the primary type of resource this client works with."""
         return 'resource'
 
+    def resource_update(self, data, res_type, res_dict):
+        """Updates resource_list.json file with current resource."""
+        if not isinstance(res_dict, dict):
+            return
+
+        if not res_type.endswith('s'):
+            res_type += 's'
+
+        if res_type not in data:
+            data[res_type] = {}
+
+        if 'uuid' in res_dict:
+            data[res_type].update(
+                {res_dict.get('uuid'): res_dict.get('name')})
+        elif 'id' in res_dict:
+            data[res_type].update(
+                {res_dict.get('id'): res_dict.get('name')})
+        elif 'name' in res_dict:
+            data[res_type].update({res_dict.get('name'): ""})
+
+        self.rec_rw_lock.acquire_write_lock()
+        with open("resource_list.json", 'w+') as f:
+            f.write(json.dumps(data, indent=2, separators=(',', ': ')))
+        self.rec_rw_lock.release_write_lock()
+
+    def resource_record(self, resp_dict):
+        """Records resources into resource_list.json file."""
+        if self.rec_rw_lock is None:
+            path = self.lock_dir
+            self.rec_rw_lock = (
+                process_lock.InterProcessReaderWriterLock(path)
+            )
+
+        self.rec_rw_lock.acquire_read_lock()
+        try:
+            with open('resource_list.json', 'rb') as f:
+                data = json.load(f)
+        except IOError:
+            data = {}
+        self.rec_rw_lock.release_read_lock()
+
+        try:
+            resp_dict = json.loads(resp_dict.decode('utf-8'))
+        except (AttributeError, TypeError, ValueError):
+            return
+
+        # check if response has any keys
+        if not resp_dict.keys():
+            return
+
+        resource_type = list(resp_dict.keys())[0]
+
+        resource_dict = resp_dict[resource_type]
+
+        if isinstance(resource_dict, list):
+            for resource in resource_dict:
+                self.resource_update(data, resource_type, resource)
+        else:
+            self.resource_update(data, resource_type, resource_dict)
+
     @classmethod
     def validate_response(cls, schema, resp, body):
         # Only check the response if the status code is a success code
diff --git a/tempest/lib/services/compute/migrations_client.py b/tempest/lib/services/compute/migrations_client.py
index 8a6e62a..d43fe83 100644
--- a/tempest/lib/services/compute/migrations_client.py
+++ b/tempest/lib/services/compute/migrations_client.py
@@ -21,6 +21,8 @@
     as schemav223
 from tempest.lib.api_schema.response.compute.v2_59 import migrations \
     as schemav259
+from tempest.lib.api_schema.response.compute.v2_80 import migrations \
+    as schemav280
 from tempest.lib.common import rest_client
 from tempest.lib.services.compute import base_compute_client
 
@@ -29,7 +31,8 @@
     schema_versions_info = [
         {'min': None, 'max': '2.22', 'schema': schema},
         {'min': '2.23', 'max': '2.58', 'schema': schemav223},
-        {'min': '2.59', 'max': None, 'schema': schemav259}]
+        {'min': '2.59', 'max': '2.79', 'schema': schemav259},
+        {'min': '2.80', 'max': None, 'schema': schemav280}]
 
     def list_migrations(self, **params):
         """List all migrations.
diff --git a/tempest/lib/services/compute/server_groups_client.py b/tempest/lib/services/compute/server_groups_client.py
index 9895653..5c1e623 100644
--- a/tempest/lib/services/compute/server_groups_client.py
+++ b/tempest/lib/services/compute/server_groups_client.py
@@ -14,6 +14,8 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
+from urllib import parse as urllib
+
 from oslo_serialization import jsonutils as json
 
 from tempest.lib.api_schema.response.compute.v2_1 import server_groups \
@@ -55,9 +57,14 @@
         self.validate_response(schema.delete_server_group, resp, body)
         return rest_client.ResponseBody(resp, body)
 
-    def list_server_groups(self):
+    def list_server_groups(self, **params):
         """List the server-groups."""
-        resp, body = self.get("os-server-groups")
+
+        url = 'os-server-groups'
+        if params:
+            url += '?%s' % urllib.urlencode(params)
+
+        resp, body = self.get(url)
         body = json.loads(body)
         schema = self.get_schema(self.schema_versions_info)
         self.validate_response(schema.list_server_groups, resp, body)
diff --git a/tempest/lib/services/compute/servers_client.py b/tempest/lib/services/compute/servers_client.py
index 7e3b99f..1b93f91 100644
--- a/tempest/lib/services/compute/servers_client.py
+++ b/tempest/lib/services/compute/servers_client.py
@@ -43,6 +43,7 @@
 from tempest.lib.api_schema.response.compute.v2_75 import servers as schemav275
 from tempest.lib.api_schema.response.compute.v2_79 import servers as schemav279
 from tempest.lib.api_schema.response.compute.v2_8 import servers as schemav28
+from tempest.lib.api_schema.response.compute.v2_89 import servers as schemav289
 from tempest.lib.api_schema.response.compute.v2_9 import servers as schemav29
 from tempest.lib.common import rest_client
 from tempest.lib.services.compute import base_compute_client
@@ -73,7 +74,8 @@
         {'min': '2.71', 'max': '2.72', 'schema': schemav271},
         {'min': '2.73', 'max': '2.74', 'schema': schemav273},
         {'min': '2.75', 'max': '2.78', 'schema': schemav275},
-        {'min': '2.79', 'max': None, 'schema': schemav279}]
+        {'min': '2.79', 'max': '2.88', 'schema': schemav279},
+        {'min': '2.89', 'max': None, 'schema': schemav289}]
 
     def __init__(self, auth_provider, service, region,
                  enable_instance_password=True, **kwargs):
@@ -896,7 +898,11 @@
         API reference:
         https://docs.openstack.org/api-ref/compute/#evacuate-server-evacuate-action
         """
-        if self.enable_instance_password:
+        api_version = self.get_headers().get(self.api_microversion_header_name)
+
+        if not api_version and self.enable_instance_password:
+            evacuate_schema = schema.evacuate_server_with_admin_pass
+        elif api_version < '2.14':
             evacuate_schema = schema.evacuate_server_with_admin_pass
         else:
             evacuate_schema = schema.evacuate_server
diff --git a/tempest/tests/cmd/test_cleanup.py b/tempest/tests/cmd/test_cleanup.py
index 69e735b..3efc9bd 100644
--- a/tempest/tests/cmd/test_cleanup.py
+++ b/tempest/tests/cmd/test_cleanup.py
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import json
 from unittest import mock
 
 from tempest.cmd import cleanup
@@ -20,12 +21,30 @@
 
 class TestTempestCleanup(base.TestCase):
 
-    def test_load_json(self):
+    def test_load_json_saved_state(self):
         # instantiate "empty" TempestCleanup
         c = cleanup.TempestCleanup(None, None, 'test')
         test_saved_json = 'tempest/tests/cmd/test_saved_state_json.json'
+        with open(test_saved_json, 'r') as f:
+            test_saved_json_content = json.load(f)
         # test if the file is loaded without any issues/exceptions
-        c._load_json(test_saved_json)
+        c.options = mock.Mock()
+        c.options.init_saved_state = True
+        c._load_saved_state(test_saved_json)
+        self.assertEqual(c.json_data, test_saved_json_content)
+
+    def test_load_json_resource_list(self):
+        # instantiate "empty" TempestCleanup
+        c = cleanup.TempestCleanup(None, None, 'test')
+        test_resource_list = 'tempest/tests/cmd/test_resource_list.json'
+        with open(test_resource_list, 'r') as f:
+            test_resource_list_content = json.load(f)
+        # test if the file is loaded without any issues/exceptions
+        c.options = mock.Mock()
+        c.options.init_saved_state = False
+        c.options.resource_list = True
+        c._load_resource_list(test_resource_list)
+        self.assertEqual(c.resource_data, test_resource_list_content)
 
     @mock.patch('tempest.cmd.cleanup.TempestCleanup.init')
     @mock.patch('tempest.cmd.cleanup.TempestCleanup._cleanup')
diff --git a/tempest/tests/cmd/test_cleanup_services.py b/tempest/tests/cmd/test_cleanup_services.py
index 6b3b4b7..2557145 100644
--- a/tempest/tests/cmd/test_cleanup_services.py
+++ b/tempest/tests/cmd/test_cleanup_services.py
@@ -41,8 +41,10 @@
     def test_base_service_init(self):
         kwargs = {'data': {'data': 'test'},
                   'is_dry_run': False,
+                  'resource_list_json': {'resp': 'data'},
                   'saved_state_json': {'saved': 'data'},
                   'is_preserve': False,
+                  'is_resource_list': False,
                   'is_save_state': True,
                   'prefix': 'tempest',
                   'tenant_id': 'project_id',
@@ -50,8 +52,10 @@
         base = cleanup_service.BaseService(kwargs)
         self.assertEqual(base.data, kwargs['data'])
         self.assertFalse(base.is_dry_run)
+        self.assertEqual(base.resource_list_json, kwargs['resource_list_json'])
         self.assertEqual(base.saved_state_json, kwargs['saved_state_json'])
         self.assertFalse(base.is_preserve)
+        self.assertFalse(base.is_resource_list)
         self.assertTrue(base.is_save_state)
         self.assertEqual(base.tenant_filter['project_id'], kwargs['tenant_id'])
         self.assertEqual(base.got_exceptions, kwargs['got_exceptions'])
@@ -60,8 +64,10 @@
     def test_not_implemented_ex(self):
         kwargs = {'data': {'data': 'test'},
                   'is_dry_run': False,
+                  'resource_list_json': {'resp': 'data'},
                   'saved_state_json': {'saved': 'data'},
                   'is_preserve': False,
+                  'is_resource_list': False,
                   'is_save_state': False,
                   'prefix': 'tempest',
                   'tenant_id': 'project_id',
@@ -181,10 +187,20 @@
         "subnetpools": {'8acf64c1-43fc': 'saved-subnet-pool'},
         "regions": {'RegionOne': {}}
     }
+
+    resource_list = {
+        "keypairs": {'saved-key-pair': ""}
+    }
+
     # Mocked methods
     get_method = 'tempest.lib.common.rest_client.RestClient.get'
     delete_method = 'tempest.lib.common.rest_client.RestClient.delete'
     log_method = 'tempest.cmd.cleanup_service.LOG.exception'
+    filter_saved_state = 'tempest.cmd.cleanup_service.' \
+                         'BaseService._filter_out_ids_from_saved'
+    filter_resource_list = 'tempest.cmd.cleanup_service.' \
+                           'BaseService._filter_by_resource_list'
+    filter_prefix = 'tempest.cmd.cleanup_service.BaseService._filter_by_prefix'
     # Override parameters
     service_class = 'BaseService'
     response = None
@@ -192,17 +208,19 @@
 
     def _create_cmd_service(self, service_type, is_save_state=False,
                             is_preserve=False, is_dry_run=False,
-                            prefix=''):
+                            prefix='', is_resource_list=False):
         creds = fake_credentials.FakeKeystoneV3Credentials()
         os = clients.Manager(creds)
         return getattr(cleanup_service, service_type)(
             os,
+            is_resource_list=is_resource_list,
             is_save_state=is_save_state,
             is_preserve=is_preserve,
             is_dry_run=is_dry_run,
             prefix=prefix,
             project_id='b8e3ece07bb049138d224436756e3b57',
             data={},
+            resource_list_json=self.resource_list,
             saved_state_json=self.saved_state
             )
 
@@ -266,6 +284,38 @@
             self.assertNotIn(rsp['id'], self.conf_values.values())
             self.assertNotIn(rsp['name'], self.conf_values.values())
 
+    def _test_prefix_opt_precedence(self, delete_mock):
+        serv = self._create_cmd_service(
+            self.service_class, is_resource_list=True, prefix='tempest')
+        _, fixtures = self.run_function_with_mocks(
+            serv.run,
+            delete_mock
+        )
+
+        # Check that prefix was used for filtering
+        fixtures[2].mock.assert_called_once()
+
+        # Check that neither saved_state.json nor resource list was
+        # used for filtering
+        fixtures[0].mock.assert_not_called()
+        fixtures[1].mock.assert_not_called()
+
+    def _test_resource_list_opt_precedence(self, delete_mock):
+        serv = self._create_cmd_service(
+            self.service_class, is_resource_list=True)
+        _, fixtures = self.run_function_with_mocks(
+            serv.run,
+            delete_mock
+        )
+
+        # Check that resource list was used for filtering
+        fixtures[1].mock.assert_called_once()
+
+        # Check that neither saved_state.json nor prefix was
+        # used for filtering
+        fixtures[0].mock.assert_not_called()
+        fixtures[2].mock.assert_not_called()
+
 
 class TestSnapshotService(BaseCmdServiceTests):
 
@@ -320,6 +370,24 @@
     def test_save_state(self):
         self._test_saved_state_true([(self.get_method, self.response, 200)])
 
+    def test_prefix_opt_precedence(self):
+        delete_mock = [(self.filter_saved_state, [], None),
+                       (self.filter_resource_list, [], None),
+                       (self.filter_prefix, [], None),
+                       (self.get_method, self.response, 200),
+                       (self.delete_method, 'error', None),
+                       (self.log_method, 'exception', None)]
+        self._test_prefix_opt_precedence(delete_mock)
+
+    def test_resource_list_opt_precedence(self):
+        delete_mock = [(self.filter_saved_state, [], None),
+                       (self.filter_resource_list, [], None),
+                       (self.filter_prefix, [], None),
+                       (self.get_method, self.response, 200),
+                       (self.delete_method, 'error', None),
+                       (self.log_method, 'exception', None)]
+        self._test_resource_list_opt_precedence(delete_mock)
+
 
 class TestServerService(BaseCmdServiceTests):
 
@@ -378,6 +446,24 @@
     def test_save_state(self):
         self._test_saved_state_true([(self.get_method, self.response, 200)])
 
+    def test_prefix_opt_precedence(self):
+        delete_mock = [(self.filter_saved_state, [], None),
+                       (self.filter_resource_list, [], None),
+                       (self.filter_prefix, [], None),
+                       (self.get_method, self.response, 200),
+                       (self.delete_method, 'error', None),
+                       (self.log_method, 'exception', None)]
+        self._test_prefix_opt_precedence(delete_mock)
+
+    def test_resource_list_opt_precedence(self):
+        delete_mock = [(self.filter_saved_state, [], None),
+                       (self.filter_resource_list, [], None),
+                       (self.filter_prefix, [], None),
+                       (self.get_method, self.response, 200),
+                       (self.delete_method, 'error', None),
+                       (self.log_method, 'exception', None)]
+        self._test_resource_list_opt_precedence(delete_mock)
+
 
 class TestServerGroupService(BaseCmdServiceTests):
 
@@ -429,6 +515,26 @@
                                      (self.validate_response, 'validate', None)
                                      ])
 
+    def test_prefix_opt_precedence(self):
+        delete_mock = [(self.filter_saved_state, [], None),
+                       (self.filter_resource_list, [], None),
+                       (self.filter_prefix, [], None),
+                       (self.get_method, self.response, 200),
+                       (self.validate_response, 'validate', None),
+                       (self.delete_method, 'error', None),
+                       (self.log_method, 'exception', None)]
+        self._test_prefix_opt_precedence(delete_mock)
+
+    def test_resource_list_opt_precedence(self):
+        delete_mock = [(self.filter_saved_state, [], None),
+                       (self.filter_resource_list, [], None),
+                       (self.filter_prefix, [], None),
+                       (self.get_method, self.response, 200),
+                       (self.validate_response, 'validate', None),
+                       (self.delete_method, 'error', None),
+                       (self.log_method, 'exception', None)]
+        self._test_resource_list_opt_precedence(delete_mock)
+
 
 class TestKeyPairService(BaseCmdServiceTests):
 
@@ -493,6 +599,33 @@
             (self.validate_response, 'validate', None)
         ])
 
+    def test_prefix_opt_precedence(self):
+        delete_mock = [(self.filter_saved_state, [], None),
+                       (self.filter_resource_list, [], None),
+                       (self.filter_prefix, [], None),
+                       (self.get_method, self.response, 200),
+                       (self.validate_response, 'validate', None),
+                       (self.delete_method, 'error', None),
+                       (self.log_method, 'exception', None)]
+        self._test_prefix_opt_precedence(delete_mock)
+
+    def test_resource_list_opt_precedence(self):
+        delete_mock = [(self.filter_prefix, [], None),
+                       (self.get_method, self.response, 200),
+                       (self.validate_response, 'validate', None),
+                       (self.delete_method, 'error', None),
+                       (self.log_method, 'exception', None)]
+        serv = self._create_cmd_service(
+            self.service_class, is_resource_list=True)
+
+        _, fixtures = self.run_function_with_mocks(
+            serv.delete,
+            delete_mock
+        )
+
+        # Check that prefix was not used for filtering
+        fixtures[0].mock.assert_not_called()
+
 
 class TestVolumeService(BaseCmdServiceTests):
 
@@ -542,6 +675,24 @@
     def test_save_state(self):
         self._test_saved_state_true([(self.get_method, self.response, 200)])
 
+    def test_prefix_opt_precedence(self):
+        delete_mock = [(self.filter_saved_state, [], None),
+                       (self.filter_resource_list, [], None),
+                       (self.filter_prefix, [], None),
+                       (self.get_method, self.response, 200),
+                       (self.delete_method, 'error', None),
+                       (self.log_method, 'exception', None)]
+        self._test_prefix_opt_precedence(delete_mock)
+
+    def test_resource_list_opt_precedence(self):
+        delete_mock = [(self.filter_saved_state, [], None),
+                       (self.filter_resource_list, [], None),
+                       (self.filter_prefix, [], None),
+                       (self.get_method, self.response, 200),
+                       (self.delete_method, 'error', None),
+                       (self.log_method, 'exception', None)]
+        self._test_resource_list_opt_precedence(delete_mock)
+
 
 class TestVolumeQuotaService(BaseCmdServiceTests):
 
@@ -761,6 +912,24 @@
             })
         self._test_is_preserve_true([(self.get_method, self.response, 200)])
 
+    def test_prefix_opt_precedence(self):
+        delete_mock = [(self.filter_saved_state, [], None),
+                       (self.filter_resource_list, [], None),
+                       (self.filter_prefix, [], None),
+                       (self.get_method, self.response, 200),
+                       (self.delete_method, 'error', None),
+                       (self.log_method, 'exception', None)]
+        self._test_prefix_opt_precedence(delete_mock)
+
+    def test_resource_list_opt_precedence(self):
+        delete_mock = [(self.filter_saved_state, [], None),
+                       (self.filter_resource_list, [], None),
+                       (self.filter_prefix, [], None),
+                       (self.get_method, self.response, 200),
+                       (self.delete_method, 'error', None),
+                       (self.log_method, 'exception', None)]
+        self._test_resource_list_opt_precedence(delete_mock)
+
 
 class TestNetworkFloatingIpService(BaseCmdServiceTests):
 
@@ -823,6 +992,34 @@
     def test_save_state(self):
         self._test_saved_state_true([(self.get_method, self.response, 200)])
 
+    def test_prefix_opt_precedence(self):
+        delete_mock = [(self.filter_saved_state, [], None),
+                       (self.filter_resource_list, [], None),
+                       (self.filter_prefix, [], None),
+                       (self.get_method, self.response, 200),
+                       (self.delete_method, 'error', None),
+                       (self.log_method, 'exception', None)]
+        serv = self._create_cmd_service(
+            self.service_class, is_resource_list=True, prefix='tempest')
+        _, fixtures = self.run_function_with_mocks(
+            serv.run,
+            delete_mock
+        )
+
+        # cleanup returns []
+        fixtures[0].mock.assert_not_called()
+        fixtures[1].mock.assert_not_called()
+        fixtures[2].mock.assert_not_called()
+
+    def test_resource_list_opt_precedence(self):
+        delete_mock = [(self.filter_saved_state, [], None),
+                       (self.filter_resource_list, [], None),
+                       (self.filter_prefix, [], None),
+                       (self.get_method, self.response, 200),
+                       (self.delete_method, 'error', None),
+                       (self.log_method, 'exception', None)]
+        self._test_resource_list_opt_precedence(delete_mock)
+
 
 class TestNetworkRouterService(BaseCmdServiceTests):
 
@@ -937,6 +1134,24 @@
             })
         self._test_is_preserve_true([(self.get_method, self.response, 200)])
 
+    def test_prefix_opt_precedence(self):
+        delete_mock = [(self.filter_saved_state, [], None),
+                       (self.filter_resource_list, [], None),
+                       (self.filter_prefix, [], None),
+                       (self.get_method, self.response, 200),
+                       (self.delete_method, 'error', None),
+                       (self.log_method, 'exception', None)]
+        self._test_prefix_opt_precedence(delete_mock)
+
+    def test_resource_list_opt_precedence(self):
+        delete_mock = [(self.filter_saved_state, [], None),
+                       (self.filter_resource_list, [], None),
+                       (self.filter_prefix, [], None),
+                       (self.get_method, self.response, 200),
+                       (self.delete_method, 'error', None),
+                       (self.log_method, 'exception', None)]
+        self._test_resource_list_opt_precedence(delete_mock)
+
 
 class TestNetworkMeteringLabelRuleService(BaseCmdServiceTests):
 
@@ -978,6 +1193,34 @@
     def test_save_state(self):
         self._test_saved_state_true([(self.get_method, self.response, 200)])
 
+    def test_prefix_opt_precedence(self):
+        delete_mock = [(self.filter_saved_state, [], None),
+                       (self.filter_resource_list, [], None),
+                       (self.filter_prefix, [], None),
+                       (self.get_method, self.response, 200),
+                       (self.delete_method, 'error', None),
+                       (self.log_method, 'exception', None)]
+        serv = self._create_cmd_service(
+            self.service_class, is_resource_list=True, prefix='tempest')
+        _, fixtures = self.run_function_with_mocks(
+            serv.run,
+            delete_mock
+        )
+
+        # cleanup returns []
+        fixtures[0].mock.assert_not_called()
+        fixtures[1].mock.assert_not_called()
+        fixtures[2].mock.assert_not_called()
+
+    def test_resource_list_opt_precedence(self):
+        delete_mock = [(self.filter_saved_state, [], None),
+                       (self.filter_resource_list, [], None),
+                       (self.filter_prefix, [], None),
+                       (self.get_method, self.response, 200),
+                       (self.delete_method, 'error', None),
+                       (self.log_method, 'exception', None)]
+        self._test_resource_list_opt_precedence(delete_mock)
+
 
 class TestNetworkMeteringLabelService(BaseCmdServiceTests):
 
@@ -1020,6 +1263,24 @@
     def test_save_state(self):
         self._test_saved_state_true([(self.get_method, self.response, 200)])
 
+    def test_prefix_opt_precedence(self):
+        delete_mock = [(self.filter_saved_state, [], None),
+                       (self.filter_resource_list, [], None),
+                       (self.filter_prefix, [], None),
+                       (self.get_method, self.response, 200),
+                       (self.delete_method, 'error', None),
+                       (self.log_method, 'exception', None)]
+        self._test_prefix_opt_precedence(delete_mock)
+
+    def test_resource_list_opt_precedence(self):
+        delete_mock = [(self.filter_saved_state, [], None),
+                       (self.filter_resource_list, [], None),
+                       (self.filter_prefix, [], None),
+                       (self.get_method, self.response, 200),
+                       (self.delete_method, 'error', None),
+                       (self.log_method, 'exception', None)]
+        self._test_resource_list_opt_precedence(delete_mock)
+
 
 class TestNetworkPortService(BaseCmdServiceTests):
 
@@ -1118,6 +1379,24 @@
             })
         self._test_is_preserve_true([(self.get_method, self.response, 200)])
 
+    def test_prefix_opt_precedence(self):
+        delete_mock = [(self.filter_saved_state, [], None),
+                       (self.filter_resource_list, [], None),
+                       (self.filter_prefix, [], None),
+                       (self.get_method, self.response, 200),
+                       (self.delete_method, 'error', None),
+                       (self.log_method, 'exception', None)]
+        self._test_prefix_opt_precedence(delete_mock)
+
+    def test_resource_list_opt_precedence(self):
+        delete_mock = [(self.filter_saved_state, [], None),
+                       (self.filter_resource_list, [], None),
+                       (self.filter_prefix, [], None),
+                       (self.get_method, self.response, 200),
+                       (self.delete_method, 'error', None),
+                       (self.log_method, 'exception', None)]
+        self._test_resource_list_opt_precedence(delete_mock)
+
 
 class TestNetworkSecGroupService(BaseCmdServiceTests):
 
@@ -1196,6 +1475,24 @@
             })
         self._test_is_preserve_true([(self.get_method, self.response, 200)])
 
+    def test_prefix_opt_precedence(self):
+        delete_mock = [(self.filter_saved_state, [], None),
+                       (self.filter_resource_list, [], None),
+                       (self.filter_prefix, [], None),
+                       (self.get_method, self.response, 200),
+                       (self.delete_method, 'error', None),
+                       (self.log_method, 'exception', None)]
+        self._test_prefix_opt_precedence(delete_mock)
+
+    def test_resource_list_opt_precedence(self):
+        delete_mock = [(self.filter_saved_state, [], None),
+                       (self.filter_resource_list, [], None),
+                       (self.filter_prefix, [], None),
+                       (self.get_method, self.response, 200),
+                       (self.delete_method, 'error', None),
+                       (self.log_method, 'exception', None)]
+        self._test_resource_list_opt_precedence(delete_mock)
+
 
 class TestNetworkSubnetService(BaseCmdServiceTests):
 
@@ -1272,6 +1569,24 @@
             })
         self._test_is_preserve_true([(self.get_method, self.response, 200)])
 
+    def test_prefix_opt_precedence(self):
+        delete_mock = [(self.filter_saved_state, [], None),
+                       (self.filter_resource_list, [], None),
+                       (self.filter_prefix, [], None),
+                       (self.get_method, self.response, 200),
+                       (self.delete_method, 'error', None),
+                       (self.log_method, 'exception', None)]
+        self._test_prefix_opt_precedence(delete_mock)
+
+    def test_resource_list_opt_precedence(self):
+        delete_mock = [(self.filter_saved_state, [], None),
+                       (self.filter_resource_list, [], None),
+                       (self.filter_prefix, [], None),
+                       (self.get_method, self.response, 200),
+                       (self.delete_method, 'error', None),
+                       (self.log_method, 'exception', None)]
+        self._test_resource_list_opt_precedence(delete_mock)
+
 
 class TestNetworkSubnetPoolsService(BaseCmdServiceTests):
 
@@ -1340,6 +1655,24 @@
             })
         self._test_is_preserve_true([(self.get_method, self.response, 200)])
 
+    def test_prefix_opt_precedence(self):
+        delete_mock = [(self.filter_saved_state, [], None),
+                       (self.filter_resource_list, [], None),
+                       (self.filter_prefix, [], None),
+                       (self.get_method, self.response, 200),
+                       (self.delete_method, 'error', None),
+                       (self.log_method, 'exception', None)]
+        self._test_prefix_opt_precedence(delete_mock)
+
+    def test_resource_list_opt_precedence(self):
+        delete_mock = [(self.filter_saved_state, [], None),
+                       (self.filter_resource_list, [], None),
+                       (self.filter_prefix, [], None),
+                       (self.get_method, self.response, 200),
+                       (self.delete_method, 'error', None),
+                       (self.log_method, 'exception', None)]
+        self._test_resource_list_opt_precedence(delete_mock)
+
 
 # begin global services
 class TestRegionService(BaseCmdServiceTests):
@@ -1392,6 +1725,34 @@
     def test_save_state(self):
         self._test_saved_state_true([(self.get_method, self.response, 200)])
 
+    def test_prefix_opt_precedence(self):
+        delete_mock = [(self.filter_saved_state, [], None),
+                       (self.filter_resource_list, [], None),
+                       (self.filter_prefix, [], None),
+                       (self.get_method, self.response, 200),
+                       (self.delete_method, 'error', None),
+                       (self.log_method, 'exception', None)]
+        serv = self._create_cmd_service(
+            self.service_class, is_resource_list=True, prefix='tempest')
+        _, fixtures = self.run_function_with_mocks(
+            serv.run,
+            delete_mock
+        )
+
+        # cleanup returns []
+        fixtures[0].mock.assert_not_called()
+        fixtures[1].mock.assert_not_called()
+        fixtures[2].mock.assert_not_called()
+
+    def test_resource_list_opt_precedence(self):
+        delete_mock = [(self.filter_saved_state, [], None),
+                       (self.filter_resource_list, [], None),
+                       (self.filter_prefix, [], None),
+                       (self.get_method, self.response, 200),
+                       (self.delete_method, 'error', None),
+                       (self.log_method, 'exception', None)]
+        self._test_resource_list_opt_precedence(delete_mock)
+
 
 class TestDomainService(BaseCmdServiceTests):
 
@@ -1445,6 +1806,26 @@
     def test_save_state(self):
         self._test_saved_state_true([(self.get_method, self.response, 200)])
 
+    def test_prefix_opt_precedence(self):
+        delete_mock = [(self.filter_saved_state, [], None),
+                       (self.filter_resource_list, [], None),
+                       (self.filter_prefix, [], None),
+                       (self.get_method, self.response, 200),
+                       (self.delete_method, 'error', None),
+                       (self.log_method, 'exception', None),
+                       (self.mock_update, 'update', None)]
+        self._test_prefix_opt_precedence(delete_mock)
+
+    def test_resource_list_opt_precedence(self):
+        delete_mock = [(self.filter_saved_state, [], None),
+                       (self.filter_resource_list, [], None),
+                       (self.filter_prefix, [], None),
+                       (self.get_method, self.response, 200),
+                       (self.delete_method, 'error', None),
+                       (self.log_method, 'exception', None),
+                       (self.mock_update, 'update', None)]
+        self._test_resource_list_opt_precedence(delete_mock)
+
 
 class TestProjectsService(BaseCmdServiceTests):
 
@@ -1518,6 +1899,24 @@
             })
         self._test_is_preserve_true([(self.get_method, self.response, 200)])
 
+    def test_prefix_opt_precedence(self):
+        delete_mock = [(self.filter_saved_state, [], None),
+                       (self.filter_resource_list, [], None),
+                       (self.filter_prefix, [], None),
+                       (self.get_method, self.response, 200),
+                       (self.delete_method, 'error', None),
+                       (self.log_method, 'exception', None)]
+        self._test_prefix_opt_precedence(delete_mock)
+
+    def test_resource_list_opt_precedence(self):
+        delete_mock = [(self.filter_saved_state, [], None),
+                       (self.filter_resource_list, [], None),
+                       (self.filter_prefix, [], None),
+                       (self.get_method, self.response, 200),
+                       (self.delete_method, 'error', None),
+                       (self.log_method, 'exception', None)]
+        self._test_resource_list_opt_precedence(delete_mock)
+
 
 class TestImagesService(BaseCmdServiceTests):
 
@@ -1597,6 +1996,24 @@
             })
         self._test_is_preserve_true([(self.get_method, self.response, 200)])
 
+    def test_prefix_opt_precedence(self):
+        delete_mock = [(self.filter_saved_state, [], None),
+                       (self.filter_resource_list, [], None),
+                       (self.filter_prefix, [], None),
+                       (self.get_method, self.response, 200),
+                       (self.delete_method, 'error', None),
+                       (self.log_method, 'exception', None)]
+        self._test_prefix_opt_precedence(delete_mock)
+
+    def test_resource_list_opt_precedence(self):
+        delete_mock = [(self.filter_saved_state, [], None),
+                       (self.filter_resource_list, [], None),
+                       (self.filter_prefix, [], None),
+                       (self.get_method, self.response, 200),
+                       (self.delete_method, 'error', None),
+                       (self.log_method, 'exception', None)]
+        self._test_resource_list_opt_precedence(delete_mock)
+
 
 class TestFlavorService(BaseCmdServiceTests):
 
@@ -1670,6 +2087,24 @@
             })
         self._test_is_preserve_true([(self.get_method, self.response, 200)])
 
+    def test_prefix_opt_precedence(self):
+        delete_mock = [(self.filter_saved_state, [], None),
+                       (self.filter_resource_list, [], None),
+                       (self.filter_prefix, [], None),
+                       (self.get_method, self.response, 200),
+                       (self.delete_method, 'error', None),
+                       (self.log_method, 'exception', None)]
+        self._test_prefix_opt_precedence(delete_mock)
+
+    def test_resource_list_opt_precedence(self):
+        delete_mock = [(self.filter_saved_state, [], None),
+                       (self.filter_resource_list, [], None),
+                       (self.filter_prefix, [], None),
+                       (self.get_method, self.response, 200),
+                       (self.delete_method, 'error', None),
+                       (self.log_method, 'exception', None)]
+        self._test_resource_list_opt_precedence(delete_mock)
+
 
 class TestRoleService(BaseCmdServiceTests):
 
@@ -1716,6 +2151,24 @@
     def test_save_state(self):
         self._test_saved_state_true([(self.get_method, self.response, 200)])
 
+    def test_prefix_opt_precedence(self):
+        delete_mock = [(self.filter_saved_state, [], None),
+                       (self.filter_resource_list, [], None),
+                       (self.filter_prefix, [], None),
+                       (self.get_method, self.response, 200),
+                       (self.delete_method, 'error', None),
+                       (self.log_method, 'exception', None)]
+        self._test_prefix_opt_precedence(delete_mock)
+
+    def test_resource_list_opt_precedence(self):
+        delete_mock = [(self.filter_saved_state, [], None),
+                       (self.filter_resource_list, [], None),
+                       (self.filter_prefix, [], None),
+                       (self.get_method, self.response, 200),
+                       (self.delete_method, 'error', None),
+                       (self.log_method, 'exception', None)]
+        self._test_resource_list_opt_precedence(delete_mock)
+
 
 class TestUserService(BaseCmdServiceTests):
 
@@ -1782,3 +2235,21 @@
                 "password_expires_at": "1893-11-06T15:32:17.000000",
             })
         self._test_is_preserve_true([(self.get_method, self.response, 200)])
+
+    def test_prefix_opt_precedence(self):
+        delete_mock = [(self.filter_saved_state, [], None),
+                       (self.filter_resource_list, [], None),
+                       (self.filter_prefix, [], None),
+                       (self.get_method, self.response, 200),
+                       (self.delete_method, 'error', None),
+                       (self.log_method, 'exception', None)]
+        self._test_prefix_opt_precedence(delete_mock)
+
+    def test_resource_list_opt_precedence(self):
+        delete_mock = [(self.filter_saved_state, [], None),
+                       (self.filter_resource_list, [], None),
+                       (self.filter_prefix, [], None),
+                       (self.get_method, self.response, 200),
+                       (self.delete_method, 'error', None),
+                       (self.log_method, 'exception', None)]
+        self._test_resource_list_opt_precedence(delete_mock)
diff --git a/tempest/tests/cmd/test_resource_list.json b/tempest/tests/cmd/test_resource_list.json
new file mode 100644
index 0000000..dfbc790
--- /dev/null
+++ b/tempest/tests/cmd/test_resource_list.json
@@ -0,0 +1,11 @@
+{
+  "project": {
+    "ce4e7edf051c439d8b81c4bfe581c5ef": "test"
+  },
+  "keypairs": {
+    "tempest-keypair-1215039183": ""
+  },
+  "users": {
+    "74463c83f9d640fe84c4376527ceff26": "test"
+  }
+}
diff --git a/tempest/tests/lib/common/test_rest_client.py b/tempest/tests/lib/common/test_rest_client.py
index 81a76e0..9bc6f60 100644
--- a/tempest/tests/lib/common/test_rest_client.py
+++ b/tempest/tests/lib/common/test_rest_client.py
@@ -13,6 +13,7 @@
 #    under the License.
 
 import copy
+from unittest import mock
 
 import fixtures
 import jsonschema
@@ -749,6 +750,110 @@
                           expected_code, read_code)
 
 
+class TestRecordResources(BaseRestClientTestClass):
+
+    def setUp(self):
+        self.fake_http = fake_http.fake_httplib2()
+        super(TestRecordResources, self).setUp()
+
+    def _cleanup_test_resource_record(self):
+        # clear resource_list.json file
+        with open('resource_list.json', 'w') as f:
+            f.write('{}')
+
+    def test_post_record_resources(self):
+        self.rest_client.record_resources = True
+        __, return_dict = self.rest_client.post(self.url, {}, {})
+        self.assertEqual({}, return_dict['headers'])
+        self.assertEqual({}, return_dict['body'])
+
+    def test_resource_record_no_top_key(self):
+        test_body_no_key = b'{}'
+        self.rest_client.resource_record(test_body_no_key)
+
+    def test_resource_record_dict(self):
+        test_dict_body = b'{"project": {"id": "test-id", "name": ""}}\n'
+        self.rest_client.resource_record(test_dict_body)
+
+        with open('resource_list.json', 'r') as f:
+            content = f.read()
+            resource_list_content = json.loads(content)
+
+        test_resource_list = {
+            "projects": {"test-id": ""}
+        }
+        self.assertEqual(resource_list_content, test_resource_list)
+
+        # cleanup
+        self._cleanup_test_resource_record()
+
+    def test_resource_record_list(self):
+        test_list_body = '''{
+            "user": [
+                {
+                    "id": "test-uuid",
+                    "name": "test-name"
+                },
+                {
+                    "id": "test-uuid2",
+                    "name": "test-name2"
+                }
+            ]
+        }'''
+        test_list_body = test_list_body.encode('utf-8')
+        self.rest_client.resource_record(test_list_body)
+
+        with open('resource_list.json', 'r') as f:
+            content = f.read()
+            resource_list_content = json.loads(content)
+
+        test_resource_list = {
+            "users": {
+                "test-uuid": "test-name",
+                "test-uuid2": "test-name2"
+            }
+        }
+        self.assertEqual(resource_list_content, test_resource_list)
+
+        # cleanup
+        self._cleanup_test_resource_record()
+
+    def test_resource_update_id(self):
+        data = {}
+        res_dict = {'id': 'test-uuid', 'name': 'test-name'}
+
+        self.rest_client.rec_rw_lock = mock.MagicMock()
+        self.rest_client.resource_update(data, 'user', res_dict)
+        result = {'users': {'test-uuid': 'test-name'}}
+        self.assertEqual(data, result)
+
+    def test_resource_update_name(self):
+        data = {'keypairs': {}}
+        res_dict = {'name': 'test-keypair'}
+
+        self.rest_client.rec_rw_lock = mock.MagicMock()
+        self.rest_client.resource_update(data, 'keypair', res_dict)
+        result = {'keypairs': {'test-keypair': ""}}
+        self.assertEqual(data, result)
+
+    def test_resource_update_no_id(self):
+        data = {}
+        res_dict = {'type': 'test', 'description': 'example'}
+
+        self.rest_client.rec_rw_lock = mock.MagicMock()
+        self.rest_client.resource_update(data, 'projects', res_dict)
+        result = {'projects': {}}
+        self.assertEqual(data, result)
+
+    def test_resource_update_not_dict(self):
+        data = {}
+        res_dict = 'test-string'
+
+        self.rest_client.rec_rw_lock = mock.MagicMock()
+        self.rest_client.resource_update(data, 'user', res_dict)
+        self.assertEqual(data, {})
+
+
 class TestResponseBody(base.TestCase):
 
     def test_str(self):
diff --git a/zuul.d/integrated-gate.yaml b/zuul.d/integrated-gate.yaml
index 1abf5e7..2fd6e36 100644
--- a/zuul.d/integrated-gate.yaml
+++ b/zuul.d/integrated-gate.yaml
@@ -17,6 +17,13 @@
         # TODO(gmann): Enable File injection tests once nova bug is fixed
         # https://bugs.launchpad.net/nova/+bug/1882421
         #   ENABLE_FILE_INJECTION: true
+      run_tempest_cleanup: true
+      run_tempest_cleanup_resource_list: true
+      devstack_local_conf:
+        test-config:
+          $TEMPEST_CONFIG:
+            DEFAULT:
+              record_resources: true
 
 - job:
     name: tempest-ipv6-only
@@ -37,6 +44,14 @@
       tools/tempest-extra-tests-list.txt.
     vars:
       tox_envlist: extra-tests
+      run_tempest_cleanup: true
+      run_tempest_cleanup_resource_list: true
+      run_tempest_dry_cleanup: true
+      devstack_local_conf:
+        test-config:
+          $TEMPEST_CONFIG:
+            DEFAULT:
+              record_resources: true
 
 - job:
     name: tempest-full-py3
