Merge "Remove redundant revert-to-snapshot test option"
diff --git a/manila_tempest_tests/common/constants.py b/manila_tempest_tests/common/constants.py
index 4ca87d2..25cf4ee 100644
--- a/manila_tempest_tests/common/constants.py
+++ b/manila_tempest_tests/common/constants.py
@@ -14,6 +14,8 @@
 STATUS_ERROR = 'error'
 STATUS_AVAILABLE = 'available'
 STATUS_ERROR_DELETING = 'error_deleting'
+STATUS_MIGRATING = 'migrating'
+
 TEMPEST_MANILA_PREFIX = 'tempest-manila'
 
 # Replication
diff --git a/manila_tempest_tests/config.py b/manila_tempest_tests/config.py
index 2ba9ae5..ab50c34 100644
--- a/manila_tempest_tests/config.py
+++ b/manila_tempest_tests/config.py
@@ -196,6 +196,10 @@
                 deprecated_name="run_migration_tests",
                 default=False,
                 help="Enable or disable driver-assisted migration tests."),
+    cfg.BoolOpt("run_migration_with_preserve_snapshots_tests",
+                default=False,
+                help="Enable or disable migration with "
+                     "preserve_snapshots tests set to True."),
     cfg.BoolOpt("run_manage_unmanage_tests",
                 default=False,
                 help="Defines whether to run manage/unmanage tests or not. "
diff --git a/manila_tempest_tests/services/share/v2/json/shares_client.py b/manila_tempest_tests/services/share/v2/json/shares_client.py
index d58af79..15e538c 100644
--- a/manila_tempest_tests/services/share/v2/json/shares_client.py
+++ b/manila_tempest_tests/services/share/v2/json/shares_client.py
@@ -493,6 +493,15 @@
         self.expected_success(200, resp.status)
         return self._parse_resp(body)
 
+    def list_snapshots_for_share(self, share_id, detailed=False,
+                                 version=LATEST_MICROVERSION):
+        """Get list of snapshots for given share."""
+        uri = ('snapshots/detail?share_id=%s' % share_id
+               if detailed else 'snapshots?share_id=%s' % share_id)
+        resp, body = self.get(uri, version=version)
+        self.expected_success(200, resp.status)
+        return self._parse_resp(body)
+
     def list_snapshots_with_detail(self, params=None,
                                    version=LATEST_MICROVERSION):
         """Get detailed list of share snapshots w/o filters."""
diff --git a/manila_tempest_tests/tests/api/admin/test_migration.py b/manila_tempest_tests/tests/api/admin/test_migration.py
index 5a30bae..bdc6248 100644
--- a/manila_tempest_tests/tests/api/admin/test_migration.py
+++ b/manila_tempest_tests/tests/api/admin/test_migration.py
@@ -17,6 +17,7 @@
 import ddt
 from tempest import config
 from tempest.lib.common.utils import data_utils
+import testtools
 from testtools import testcase as tc
 
 from manila_tempest_tests.common import constants
@@ -26,9 +27,8 @@
 CONF = config.CONF
 
 
-@ddt.ddt
-class MigrationNFSTest(base.BaseSharesAdminTest):
-    """Tests Share Migration for NFS shares.
+class MigrationBase(base.BaseSharesAdminTest):
+    """Base test class for Share Migration.
 
     Tests share migration in multi-backend environment.
 
@@ -52,17 +52,22 @@
     configuration flag to be tested.
     """
 
-    protocol = "nfs"
+    protocol = None
 
     @classmethod
     def resource_setup(cls):
-        super(MigrationNFSTest, cls).resource_setup()
+        super(MigrationBase, cls).resource_setup()
         if cls.protocol not in CONF.share.enable_protocols:
             message = "%s tests are disabled." % cls.protocol
             raise cls.skipException(message)
         if not (CONF.share.run_host_assisted_migration_tests or
                 CONF.share.run_driver_assisted_migration_tests):
             raise cls.skipException("Share migration tests are disabled.")
+        cls.pools = cls.shares_v2_client.list_pools(detail=True)['pools']
+
+        if len(cls.pools) < 2:
+            raise cls.skipException("At least two different pool entries are "
+                                    "needed to run share migration tests.")
 
         cls.new_type = cls.create_share_type(
             name=data_utils.rand_name('new_share_type_for_migration'),
@@ -75,166 +80,23 @@
             extra_specs=utils.get_configured_extra_specs(
                 variation='opposite_driver_modes'))
 
-    @tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
-    @base.skip_if_microversion_lt("2.29")
-    @ddt.data(True, False)
-    def test_migration_cancel(self, force_host_assisted):
+    def _setup_migration(self, share, opposite=False):
 
-        self._check_migration_enabled(force_host_assisted)
-
-        share, dest_pool = self._setup_migration()
-
-        task_state = (constants.TASK_STATE_DATA_COPYING_COMPLETED
-                      if force_host_assisted
-                      else constants.TASK_STATE_MIGRATION_DRIVER_PHASE1_DONE)
-
-        share = self.migrate_share(
-            share['id'], dest_pool, wait_for_status=task_state,
-            force_host_assisted_migration=force_host_assisted)
-
-        self._validate_migration_successful(
-            dest_pool, share, task_state, complete=False)
-
-        progress = self.shares_v2_client.migration_get_progress(share['id'])
-
-        self.assertEqual(task_state, progress['task_state'])
-        self.assertEqual(100, progress['total_progress'])
-
-        share = self.migration_cancel(share['id'], dest_pool)
-
-        progress = self.shares_v2_client.migration_get_progress(share['id'])
-
-        self.assertEqual(
-            constants.TASK_STATE_MIGRATION_CANCELLED, progress['task_state'])
-        self.assertEqual(100, progress['total_progress'])
-
-        self._validate_migration_successful(
-            dest_pool, share, constants.TASK_STATE_MIGRATION_CANCELLED,
-            complete=False)
-
-    @tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
-    @base.skip_if_microversion_lt("2.29")
-    @ddt.data(True, False)
-    def test_migration_opposite_driver_modes(self, force_host_assisted):
-
-        self._check_migration_enabled(force_host_assisted)
-
-        share, dest_pool = self._setup_migration(opposite=True)
-
-        old_share_network_id = share['share_network_id']
-
-        # If currently configured is DHSS=False,
-        # then we need it for DHSS=True
-        if not CONF.share.multitenancy_enabled:
-
-            new_share_network_id = self.provide_share_network(
-                self.shares_v2_client, self.os_admin.networks_client,
-                isolated_creds_client=None, ignore_multitenancy_config=True)
-
-        # If currently configured is DHSS=True,
-        # then we must pass None for DHSS=False
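+        # Pick the destination share type: 'opposite' selects the type
+        # created with the opposite driver_handles_share_servers mode.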
+        if opposite:
+            dest_type = self.new_type_opposite['share_type']
         else:
-            new_share_network_id = None
+            dest_type = self.new_type['share_type']
 
-        old_share_type_id = share['share_type']
-        new_share_type_id = self.new_type_opposite['share_type']['id']
+        dest_pool = utils.choose_matching_backend(
+            share, self.pools, dest_type)
 
-        task_state = (constants.TASK_STATE_DATA_COPYING_COMPLETED
-                      if force_host_assisted
-                      else constants.TASK_STATE_MIGRATION_DRIVER_PHASE1_DONE)
-
-        share = self.migrate_share(
-            share['id'], dest_pool,
-            force_host_assisted_migration=force_host_assisted,
-            wait_for_status=task_state, new_share_type_id=new_share_type_id,
-            new_share_network_id=new_share_network_id)
-
-        self._validate_migration_successful(
-            dest_pool, share, task_state, complete=False,
-            share_network_id=old_share_network_id,
-            share_type_id=old_share_type_id)
-
-        progress = self.shares_v2_client.migration_get_progress(share['id'])
-
-        self.assertEqual(task_state, progress['task_state'])
-        self.assertEqual(100, progress['total_progress'])
-
-        share = self.migration_complete(share['id'], dest_pool)
-
-        progress = self.shares_v2_client.migration_get_progress(share['id'])
-
-        self.assertEqual(
-            constants.TASK_STATE_MIGRATION_SUCCESS, progress['task_state'])
-        self.assertEqual(100, progress['total_progress'])
-
-        self._validate_migration_successful(
-            dest_pool, share, constants.TASK_STATE_MIGRATION_SUCCESS,
-            complete=True, share_network_id=new_share_network_id,
-            share_type_id=new_share_type_id)
-
-    @tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
-    @base.skip_if_microversion_lt("2.29")
-    @ddt.data(True, False)
-    def test_migration_2phase(self, force_host_assisted):
-
-        self._check_migration_enabled(force_host_assisted)
-
-        share, dest_pool = self._setup_migration()
-
-        task_state = (constants.TASK_STATE_DATA_COPYING_COMPLETED
-                      if force_host_assisted
-                      else constants.TASK_STATE_MIGRATION_DRIVER_PHASE1_DONE)
-
-        old_share_network_id = share['share_network_id']
-
-        if CONF.share.multitenancy_enabled:
-            new_share_network_id = self._create_secondary_share_network(
-                old_share_network_id)
+        if opposite:
+            if not dest_pool:
+                raise self.skipException(
+                    "This test requires two pools enabled with different "
+                    "driver modes.")
         else:
-            new_share_network_id = None
-
-        old_share_type_id = share['share_type']
-        new_share_type_id = self.new_type['share_type']['id']
-
-        share = self.migrate_share(
-            share['id'], dest_pool,
-            force_host_assisted_migration=force_host_assisted,
-            wait_for_status=task_state, new_share_type_id=new_share_type_id,
-            new_share_network_id=new_share_network_id)
-
-        self._validate_migration_successful(
-            dest_pool, share, task_state, complete=False,
-            share_network_id=old_share_network_id,
-            share_type_id=old_share_type_id)
-
-        progress = self.shares_v2_client.migration_get_progress(share['id'])
-
-        self.assertEqual(task_state, progress['task_state'])
-        self.assertEqual(100, progress['total_progress'])
-
-        share = self.migration_complete(share['id'], dest_pool)
-
-        progress = self.shares_v2_client.migration_get_progress(share['id'])
-
-        self.assertEqual(
-            constants.TASK_STATE_MIGRATION_SUCCESS, progress['task_state'])
-        self.assertEqual(100, progress['total_progress'])
-
-        self._validate_migration_successful(
-            dest_pool, share, constants.TASK_STATE_MIGRATION_SUCCESS,
-            complete=True, share_network_id=new_share_network_id,
-            share_type_id=new_share_type_id)
-
-    def _setup_migration(self, opposite=False):
-
-        pools = self.shares_v2_client.list_pools(detail=True)['pools']
-
-        if len(pools) < 2:
-            raise self.skipException("At least two different pool entries are "
-                                     "needed to run share migration tests.")
-
-        share = self.create_share(self.protocol)
-        share = self.shares_v2_client.get_share(share['id'])
+            self.assertIsNotNone(dest_pool)
+            self.assertIsNotNone(dest_pool.get('name'))
 
         old_exports = self.shares_v2_client.list_share_export_locations(
             share['id'])
@@ -257,23 +119,8 @@
             share['id'], constants.RULE_STATE_ACTIVE,
             status_attr='access_rules_status')
 
-        if opposite:
-            dest_type = self.new_type_opposite['share_type']
-        else:
-            dest_type = self.new_type['share_type']
-
-        dest_pool = utils.choose_matching_backend(share, pools, dest_type)
-
-        if opposite:
-            if not dest_pool:
-                raise self.skipException(
-                    "This test requires two pools enabled with different "
-                    "driver modes.")
-        else:
-            self.assertIsNotNone(dest_pool)
-            self.assertIsNotNone(dest_pool.get('name'))
-
         dest_pool = dest_pool['name']
+        share = self.shares_v2_client.get_share(share['id'])
 
         return share, dest_pool
 
@@ -325,10 +172,6 @@
                 self.assertIn(r, filtered_rules)
             self.assertEqual(len(expected_rules), len(filtered_rules))
 
-            self.shares_v2_client.delete_share(share['id'])
-            self.shares_v2_client.wait_for_resource_deletion(
-                share_id=share['id'])
-
         # Share not migrated yet
         else:
             self.assertNotEqual(dest_pool, share['host'])
@@ -355,3 +198,437 @@
             neutron_subnet_id=old_share_network['neutron_subnet_id'])
 
         return new_share_network['id']
+
+    def _test_resize_post_migration(self, force_host_assisted, resize):
+        self._check_migration_enabled(force_host_assisted)
+        new_size = CONF.share.share_size + 1
+        share = self.create_share(self.protocol, size=new_size)
+        share = self.shares_v2_client.get_share(share['id'])
+
+        share, dest_pool = self._setup_migration(share)
+
+        task_state, new_share_network_id, new_share_type_id = (
+            self._get_migration_data(share, force_host_assisted))
+
+        share = self.migrate_share(
+            share['id'], dest_pool,
+            force_host_assisted_migration=force_host_assisted,
+            wait_for_status=task_state, new_share_type_id=new_share_type_id,
+            new_share_network_id=new_share_network_id)
+
+        share = self.migration_complete(share['id'], dest_pool)
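+        # Resize the migrated share and wait for it to become available
+        # again before asserting the new size.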
+        if resize == 'extend':
+            new_size = CONF.share.share_size + 2
+            self.shares_v2_client.extend_share(share['id'], new_size)
+        else:
+            new_size = CONF.share.share_size
+            self.shares_v2_client.shrink_share(share['id'], new_size)
+        self.shares_v2_client.wait_for_share_status(
+            share['id'], constants.STATUS_AVAILABLE)
+        share = self.shares_v2_client.get_share(share["id"])
+        self.assertEqual(new_size, int(share["size"]))
+
+        self._cleanup_share(share)
+
+    def _get_migration_data(self, share, force_host_assisted=False):
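+        # Host-assisted migration pauses once data copying has completed;
+        # driver-assisted migration pauses after the driver's first phase.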
+        task_state = (constants.TASK_STATE_DATA_COPYING_COMPLETED
+                      if force_host_assisted
+                      else constants.TASK_STATE_MIGRATION_DRIVER_PHASE1_DONE)
+
+        old_share_network_id = share['share_network_id']
+
+        if CONF.share.multitenancy_enabled:
+            new_share_network_id = self._create_secondary_share_network(
+                old_share_network_id)
+
+        else:
+            new_share_network_id = None
+
+        new_share_type_id = self.new_type['share_type']['id']
+        return task_state, new_share_network_id, new_share_type_id
+
+    def _validate_snapshot(self, share, snapshot1, snapshot2):
+        snapshot_list = self.shares_v2_client.list_snapshots_for_share(
+            share['id'])
+        msg = "Share %s has no snapshot." % share['id']
+        # Verify that snapshot list is not empty
+        self.assertNotEmpty(snapshot_list, msg)
+        snapshot_id_list = [snap['id'] for snap in snapshot_list]
+
+        # Verify that the original snapshots are retained after migration
+        self.assertIn(snapshot1['id'], snapshot_id_list)
+        self.assertIn(snapshot2['id'], snapshot_id_list)
+        # Verify that a share can be created from a snapshot after migration
+        snapshot1_share = self.create_share(
+            self.protocol, size=share['size'], snapshot_id=snapshot1['id'],
+            share_network_id=share['share_network_id'])
+        self.assertEqual(snapshot1['id'], snapshot1_share['snapshot_id'])
+        self._cleanup_share(share)
+
+    def _validate_share_migration_with_different_snapshot_capability_type(
+            self, force_host_assisted, snapshot_capable):
+
+        self._check_migration_enabled(force_host_assisted)
+        ss_type, no_ss_type = (
+            self._create_share_type_for_snapshot_capability())
+
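+        # When migrating to a snapshot-capable type, the share starts on
+        # the non-capable type, and vice versa.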
+        if snapshot_capable:
+            share_type = ss_type['share_type']
+            share_type_id = no_ss_type['share_type']['id']
+            new_share_type_id = ss_type['share_type']['id']
+        else:
+            share_type = no_ss_type['share_type']
+            share_type_id = ss_type['share_type']['id']
+            new_share_type_id = no_ss_type['share_type']['id']
+
+        share = self.create_share(
+            self.protocol, share_type_id=share_type_id)
+        share = self.shares_v2_client.get_share(share['id'])
+
+        if snapshot_capable:
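+            # Verify that the share does not yet have snapshot support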
+            self.assertEqual(False, share['snapshot_support'])
+        else:
+            # Verify that the share has snapshot support capability
+            self.assertTrue(share['snapshot_support'])
+
+        dest_pool = utils.choose_matching_backend(share, self.pools,
+                                                  share_type)
+        task_state, new_share_network_id, __ = (
+            self._get_migration_data(share, force_host_assisted))
+        share = self.migrate_share(
+            share['id'], dest_pool['name'],
+            force_host_assisted_migration=force_host_assisted,
+            wait_for_status=task_state,
+            new_share_type_id=new_share_type_id,
+            new_share_network_id=new_share_network_id)
+        share = self.migration_complete(share['id'], dest_pool)
+
+        if snapshot_capable:
+            # Verify that the migrated share has snapshot support capability
+            self.assertTrue(share['snapshot_support'])
+        else:
+            # Verify that the migrated share does not have snapshot support
+            # capability
+            self.assertEqual(False, share['snapshot_support'])
+
+        self._cleanup_share(share)
+
+    def _create_share_type_for_snapshot_capability(self):
+        # Share type with snapshot support
+        st_name = data_utils.rand_name(
+            'snapshot_capable_share_type_for_migration')
+        extra_specs = self.add_extra_specs_to_dict({"snapshot_support": True})
+        ss_type = self.create_share_type(st_name, extra_specs=extra_specs)
+
+        # New share type with no snapshot support capability
+        # to which a share will be migrated
+        new_st_name = data_utils.rand_name(
+            'snapshot_noncapable_share_type_for_migration')
+        extra_specs = {
+            "driver_handles_share_servers": CONF.share.multitenancy_enabled
+        }
+        no_ss_type = self.create_share_type(new_st_name,
+                                            extra_specs=extra_specs)
+        return ss_type, no_ss_type
+
+    def _cleanup_share(self, share):
+        resource = {"type": "share", "id": share["id"],
+                    "client": self.shares_v2_client}
+        # NOTE(Yogi1): The share needs to be cleaned up explicitly at the
+        # end of the test; otherwise, the newly created share_network will
+        # not get cleaned up.
+        self.method_resources.insert(0, resource)
+
+
+@ddt.ddt
+class MigrationCancelNFSTest(MigrationBase):
+    protocol = "nfs"
+
+    @tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
+    @base.skip_if_microversion_lt("2.29")
+    @ddt.data(True, False)
+    def test_migration_cancel(self, force_host_assisted):
+        self._check_migration_enabled(force_host_assisted)
+
+        share = self.create_share(self.protocol)
+        share = self.shares_v2_client.get_share(share['id'])
+        share, dest_pool = self._setup_migration(share)
+        task_state = (constants.TASK_STATE_DATA_COPYING_COMPLETED
+                      if force_host_assisted
+                      else constants.TASK_STATE_MIGRATION_DRIVER_PHASE1_DONE)
+
+        share = self.migrate_share(
+            share['id'], dest_pool, wait_for_status=task_state,
+            force_host_assisted_migration=force_host_assisted)
+
+        self._validate_migration_successful(
+            dest_pool, share, task_state, complete=False)
+
+        progress = self.shares_v2_client.migration_get_progress(share['id'])
+
+        self.assertEqual(task_state, progress['task_state'])
+        self.assertEqual(100, progress['total_progress'])
+
+        share = self.migration_cancel(share['id'], dest_pool)
+        progress = self.shares_v2_client.migration_get_progress(share['id'])
+
+        self.assertEqual(
+            constants.TASK_STATE_MIGRATION_CANCELLED, progress['task_state'])
+        self.assertEqual(100, progress['total_progress'])
+
+        self._validate_migration_successful(
+            dest_pool, share, constants.TASK_STATE_MIGRATION_CANCELLED,
+            complete=False)
+
+    @tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
+    @base.skip_if_microversion_lt("2.29")
+    @testtools.skipUnless(
+        CONF.share.run_snapshot_tests, 'Snapshot tests are disabled.')
+    @testtools.skipUnless(
+        CONF.share.run_driver_assisted_migration_tests,
+        'Driver-assisted migration tests are disabled.')
+    @testtools.skipUnless(
+        CONF.share.run_migration_with_preserve_snapshots_tests,
+        'Migration with preserve snapshots tests are disabled.')
+    def test_migration_cancel_share_with_snapshot(self):
+        share = self.create_share(self.protocol)
+        share = self.shares_v2_client.get_share(share['id'])
+
+        share, dest_pool = self._setup_migration(share)
+        snapshot1 = self.create_snapshot_wait_for_active(share['id'])
+        snapshot2 = self.create_snapshot_wait_for_active(share['id'])
+
+        task_state, new_share_network_id, new_share_type_id = (
+            self._get_migration_data(share))
+
+        share = self.migrate_share(
+            share['id'], dest_pool,
+            wait_for_status=task_state, new_share_type_id=new_share_type_id,
+            new_share_network_id=new_share_network_id, preserve_snapshots=True)
+
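+        # Cancelling after the first phase should leave the share and its
+        # snapshots intact at the source back end.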
+        share = self.migration_cancel(share['id'], dest_pool)
+        self._validate_snapshot(share, snapshot1, snapshot2)
+
+
+@ddt.ddt
+class MigrationOppositeDriverModesNFSTest(MigrationBase):
+    protocol = "nfs"
+
+    @tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
+    @base.skip_if_microversion_lt("2.29")
+    @ddt.data(True, False)
+    def test_migration_opposite_driver_modes(self, force_host_assisted):
+        self._check_migration_enabled(force_host_assisted)
+
+        share = self.create_share(self.protocol)
+        share = self.shares_v2_client.get_share(share['id'])
+        share, dest_pool = self._setup_migration(share, opposite=True)
+
+        if not CONF.share.multitenancy_enabled:
+            # If the current configuration is DHSS=False, a share network
+            # must be provided for the DHSS=True destination
+            new_share_network_id = self.provide_share_network(
+                self.shares_v2_client,
+                self.os_admin.networks_client,
+                isolated_creds_client=None,
+                ignore_multitenancy_config=True,
+            )
+        else:
+            # If the current configuration is DHSS=True, None must be
+            # passed for the DHSS=False destination
+            new_share_network_id = None
+
+        old_share_network_id = share['share_network_id']
+        old_share_type_id = share['share_type']
+        new_share_type_id = self.new_type_opposite['share_type']['id']
+
+        task_state = (constants.TASK_STATE_DATA_COPYING_COMPLETED
+                      if force_host_assisted
+                      else constants.TASK_STATE_MIGRATION_DRIVER_PHASE1_DONE)
+
+        share = self.migrate_share(
+            share['id'], dest_pool,
+            force_host_assisted_migration=force_host_assisted,
+            wait_for_status=task_state, new_share_type_id=new_share_type_id,
+            new_share_network_id=new_share_network_id)
+
+        self._validate_migration_successful(
+            dest_pool, share, task_state, complete=False,
+            share_network_id=old_share_network_id,
+            share_type_id=old_share_type_id)
+
+        progress = self.shares_v2_client.migration_get_progress(share['id'])
+
+        self.assertEqual(task_state, progress['task_state'])
+        self.assertEqual(100, progress['total_progress'])
+
+        share = self.migration_complete(share['id'], dest_pool)
+
+        progress = self.shares_v2_client.migration_get_progress(share['id'])
+
+        self.assertEqual(
+            constants.TASK_STATE_MIGRATION_SUCCESS, progress['task_state'])
+        self.assertEqual(100, progress['total_progress'])
+
+        self._validate_migration_successful(
+            dest_pool, share, constants.TASK_STATE_MIGRATION_SUCCESS,
+            complete=True, share_network_id=new_share_network_id,
+            share_type_id=new_share_type_id)
+
+
+@ddt.ddt
+class MigrationTwoPhaseNFSTest(MigrationBase):
+    protocol = "nfs"
+
+    @tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
+    @base.skip_if_microversion_lt("2.29")
+    @ddt.data(True, False)
+    def test_migration_2phase(self, force_host_assisted):
+        self._check_migration_enabled(force_host_assisted)
+
+        share = self.create_share(self.protocol)
+        share = self.shares_v2_client.get_share(share['id'])
+        share, dest_pool = self._setup_migration(share)
+
+        old_share_network_id = share['share_network_id']
+        old_share_type_id = share['share_type']
+        task_state, new_share_network_id, new_share_type_id = (
+            self._get_migration_data(share, force_host_assisted))
+
+        share = self.migrate_share(
+            share['id'], dest_pool,
+            force_host_assisted_migration=force_host_assisted,
+            wait_for_status=task_state, new_share_type_id=new_share_type_id,
+            new_share_network_id=new_share_network_id)
+
+        self._validate_migration_successful(
+            dest_pool, share, task_state, complete=False,
+            share_network_id=old_share_network_id,
+            share_type_id=old_share_type_id)
+
+        progress = self.shares_v2_client.migration_get_progress(share['id'])
+
+        self.assertEqual(task_state, progress['task_state'])
+        self.assertEqual(100, progress['total_progress'])
+
+        share = self.migration_complete(share['id'], dest_pool)
+
+        progress = self.shares_v2_client.migration_get_progress(share['id'])
+
+        self.assertEqual(
+            constants.TASK_STATE_MIGRATION_SUCCESS, progress['task_state'])
+        self.assertEqual(100, progress['total_progress'])
+
+        self._validate_migration_successful(
+            dest_pool, share, constants.TASK_STATE_MIGRATION_SUCCESS,
+            complete=True, share_network_id=new_share_network_id,
+            share_type_id=new_share_type_id)
+        self._cleanup_share(share)
+
+
+@ddt.ddt
+class MigrationWithShareExtendingNFSTest(MigrationBase):
+    protocol = "nfs"
+
+    @tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
+    @base.skip_if_microversion_lt("2.29")
+    @testtools.skipUnless(
+        CONF.share.run_extend_tests, 'Extend share tests are disabled.')
+    @ddt.data(True, False)
+    def test_extend_on_migrated_share(self, force_host_assisted):
+        self._test_resize_post_migration(force_host_assisted, resize='extend')
+
+
+@ddt.ddt
+class MigrationWithShareShrinkingNFSTest(MigrationBase):
+    protocol = "nfs"
+
+    @tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
+    @base.skip_if_microversion_lt("2.29")
+    @testtools.skipUnless(
+        CONF.share.run_shrink_tests, 'Shrink share tests are disabled.')
+    @ddt.data(True, False)
+    def test_shrink_on_migrated_share(self, force_host_assisted):
+        self._test_resize_post_migration(force_host_assisted, resize='shrink')
+
+
+@ddt.ddt
+class MigrationOfShareWithSnapshotNFSTest(MigrationBase):
+    protocol = "nfs"
+
+    @tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
+    @base.skip_if_microversion_lt("2.29")
+    @testtools.skipUnless(
+        CONF.share.run_snapshot_tests, 'Snapshot tests are disabled.')
+    @testtools.skipUnless(
+        CONF.share.run_driver_assisted_migration_tests,
+        'Driver-assisted migration tests are disabled.')
+    @testtools.skipUnless(
+        CONF.share.run_migration_with_preserve_snapshots_tests,
+        'Migration with preserve snapshots tests are disabled.')
+    def test_migrating_share_with_snapshot(self):
+        ss_type, __ = self._create_share_type_for_snapshot_capability()
+
+        share = self.create_share(self.protocol, cleanup_in_class=False)
+        share = self.shares_v2_client.get_share(share['id'])
+
+        share, dest_pool = self._setup_migration(share)
+        snapshot1 = self.create_snapshot_wait_for_active(
+            share['id'], cleanup_in_class=False)
+        snapshot2 = self.create_snapshot_wait_for_active(
+            share['id'], cleanup_in_class=False)
+
+        task_state, new_share_network_id, __ = self._get_migration_data(share)
+
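+        # preserve_snapshots=True requires driver-assisted migration; both
+        # snapshots are expected to follow the share to the destination.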
+        share = self.migrate_share(
+            share['id'], dest_pool,
+            wait_for_status=task_state,
+            new_share_type_id=ss_type['share_type']['id'],
+            new_share_network_id=new_share_network_id, preserve_snapshots=True)
+
+        share = self.migration_complete(share['id'], dest_pool)
+
+        self._validate_snapshot(share, snapshot1, snapshot2)
+
+
+@ddt.ddt
+class MigrationWithDifferentSnapshotSupportNFSTest(MigrationBase):
+    protocol = "nfs"
+
+    @tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
+    @base.skip_if_microversion_lt("2.29")
+    @testtools.skipUnless(CONF.share.run_snapshot_tests,
+                          'Snapshot tests are disabled.')
+    @ddt.data(True, False)
+    def test_migrate_share_to_snapshot_capability_share_type(
+            self, force_host_assisted):
+        # Verify that a share with a non-snapshot-capable share type can
+        # be migrated to a new share type that supports snapshots
+        self._validate_share_migration_with_different_snapshot_capability_type(
+            force_host_assisted, True)
+
+    @tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
+    @base.skip_if_microversion_lt("2.29")
+    @testtools.skipUnless(CONF.share.run_snapshot_tests,
+                          'Snapshot tests are disabled.')
+    @ddt.data(True, False)
+    def test_migrate_share_to_no_snapshot_capability_share_type(
+            self, force_host_assisted):
+        # Verify that a share with a snapshot-capable share type can be
+        # migrated to a new share type that does not support snapshots
+        self._validate_share_migration_with_different_snapshot_capability_type(
+            force_host_assisted, False)
+
+
+# NOTE(u_glide): this function is required to exclude MigrationBase from
+# executed test cases.
+# See: https://docs.python.org/2/library/unittest.html#load-tests-protocol
+# for details.
+def load_tests(loader, tests, _):
+    result = []
+    for test_case in tests:
+        if not test_case._tests or type(test_case._tests[0]) is MigrationBase:
+            continue
+        result.append(test_case)
+    return loader.suiteClass(result)
diff --git a/manila_tempest_tests/tests/api/admin/test_migration_negative.py b/manila_tempest_tests/tests/api/admin/test_migration_negative.py
index 9d25fff..87ded63 100644
--- a/manila_tempest_tests/tests/api/admin/test_migration_negative.py
+++ b/manila_tempest_tests/tests/api/admin/test_migration_negative.py
@@ -54,7 +54,8 @@
             raise cls.skipException("At least two different pool entries "
                                     "are needed to run share migration tests.")
 
-        cls.share = cls.create_share(cls.protocol)
+        cls.share = cls.create_share(cls.protocol,
+                                     size=CONF.share.share_size + 1)
         cls.share = cls.shares_client.get_share(cls.share['id'])
 
         cls.default_type = cls.shares_v2_client.list_share_types(
@@ -130,8 +131,9 @@
             lib_exc.Conflict, self.shares_v2_client.migrate_share,
             self.share['id'], self.dest_pool,
             force_host_assisted_migration=True)
-        self.shares_client.delete_snapshot(snap['id'])
-        self.shares_client.wait_for_resource_deletion(snapshot_id=snap["id"])
+        self.shares_v2_client.delete_snapshot(snap['id'])
+        self.shares_v2_client.wait_for_resource_deletion(
+            snapshot_id=snap["id"])
 
     @tc.attr(base.TAG_NEGATIVE, base.TAG_API_WITH_BACKEND)
     @base.skip_if_microversion_lt("2.29")
@@ -258,3 +260,67 @@
             self.share['id'], self.dest_pool,
             new_share_type_id=new_type_opposite['share_type']['id'],
             new_share_network_id=new_share_network_id)
+
+    @testtools.skipUnless(CONF.share.run_driver_assisted_migration_tests,
+                          "Driver-assisted migration tests are disabled.")
+    @tc.attr(base.TAG_NEGATIVE, base.TAG_API_WITH_BACKEND)
+    @base.skip_if_microversion_lt("2.29")
+    def test_create_snapshot_during_share_migration(self):
+        self._test_share_actions_during_share_migration('create_snapshot', [])
+
+    @tc.attr(base.TAG_NEGATIVE, base.TAG_API_WITH_BACKEND)
+    @base.skip_if_microversion_lt("2.29")
+    @ddt.data(('extend_share', [CONF.share.share_size + 2]),
+              ('shrink_share', [CONF.share.share_size]))
+    @ddt.unpack
+    def test_share_resize_during_share_migration(self, method_name, *args):
+        self._test_share_actions_during_share_migration(method_name, *args)
+
+    def skip_if_tests_are_disabled(self, method_name):
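+        # Map each share action to the config flag that gates its tests.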
+        property_to_evaluate = {
+            'extend_share': CONF.share.run_extend_tests,
+            'shrink_share': CONF.share.run_shrink_tests,
+            'create_snapshot': CONF.share.run_snapshot_tests,
+        }
+        if not property_to_evaluate[method_name]:
+            raise self.skipException(method_name + ' tests are disabled.')
+
+    @tc.attr(base.TAG_NEGATIVE, base.TAG_API_WITH_BACKEND)
+    @base.skip_if_microversion_lt("2.29")
+    def test_add_access_rule_during_migration(self):
+        access_type = "ip"
+        access_to = "50.50.50.50"
+        self.shares_v2_client.reset_state(self.share['id'],
+                                          constants.STATUS_MIGRATING)
+        self.shares_v2_client.reset_task_state(
+            self.share['id'],
+            constants.TASK_STATE_MIGRATION_DRIVER_PHASE1_DONE)
+        self.assertRaises(
+            lib_exc.BadRequest,
+            self.shares_v2_client.create_access_rule,
+            self.share['id'], access_type, access_to)
+        # Revert the migration state by cancelling the migration
+        self.shares_v2_client.reset_state(self.share['id'],
+                                          constants.STATUS_AVAILABLE)
+        self.shares_v2_client.reset_task_state(
+            self.share['id'],
+            constants.TASK_STATE_MIGRATION_CANCELLED)
+
+    def _test_share_actions_during_share_migration(self, method_name, *args):
+        self.skip_if_tests_are_disabled(method_name)
+        # Verify various share operations during share migration
+        self.shares_v2_client.reset_state(self.share['id'],
+                                          constants.STATUS_MIGRATING)
+        self.shares_v2_client.reset_task_state(
+            self.share['id'],
+            constants.TASK_STATE_MIGRATION_DRIVER_PHASE1_DONE)
+
+        self.assertRaises(
+            lib_exc.BadRequest, getattr(self.shares_v2_client, method_name),
+            self.share['id'], *args)
+        # Revert the migration state by cancelling the migration
+        self.shares_v2_client.reset_state(self.share['id'],
+                                          constants.STATUS_AVAILABLE)
+        self.shares_v2_client.reset_task_state(
+            self.share['id'],
+            constants.TASK_STATE_MIGRATION_CANCELLED)
diff --git a/manila_tempest_tests/tests/api/admin/test_multi_backend.py b/manila_tempest_tests/tests/api/admin/test_multi_backend.py
index 8aeabc3..79c9c19 100644
--- a/manila_tempest_tests/tests/api/admin/test_multi_backend.py
+++ b/manila_tempest_tests/tests/api/admin/test_multi_backend.py
@@ -61,7 +61,7 @@
         # Share's 'host' should be like "hostname@backend_name"
         for share in self.shares:
             get = self.shares_client.get_share(share['id'])
-            self.assertTrue(len(get["host"].split("@")) == 2)
+            self.assertEqual(2, len(get["host"].split("@")))
 
     @tc.attr(base.TAG_POSITIVE, base.TAG_API_WITH_BACKEND)
     def test_share_share_type(self):
diff --git a/manila_tempest_tests/tests/api/admin/test_share_servers.py b/manila_tempest_tests/tests/api/admin/test_share_servers.py
index 25cc406..0c784e9 100644
--- a/manila_tempest_tests/tests/api/admin/test_share_servers.py
+++ b/manila_tempest_tests/tests/api/admin/test_share_servers.py
@@ -15,9 +15,11 @@
 
 import re
 
+import ddt
 import six
 from tempest import config
 from tempest.lib import exceptions as lib_exc
+import testtools
 from testtools import testcase as tc
 
 from manila_tempest_tests.tests.api import base
@@ -25,21 +27,21 @@
 CONF = config.CONF
 
 
+@testtools.skipUnless(
+    CONF.share.multitenancy_enabled,
+    'Share servers can be tested only with multitenant drivers.')
+@ddt.ddt
 class ShareServersAdminTest(base.BaseSharesAdminTest):
 
     @classmethod
     def resource_setup(cls):
         super(ShareServersAdminTest, cls).resource_setup()
-        if not CONF.share.multitenancy_enabled:
-            msg = ("Share servers can be tested only with multitenant drivers."
-                   " Skipping.")
-            raise cls.skipException(msg)
         cls.share = cls.create_share()
-        cls.share_network = cls.shares_client.get_share_network(
-            cls.shares_client.share_network_id)
+        cls.share_network = cls.shares_v2_client.get_share_network(
+            cls.shares_v2_client.share_network_id)
         if not cls.share_network["name"]:
             sn_id = cls.share_network["id"]
-            cls.share_network = cls.shares_client.update_share_network(
+            cls.share_network = cls.shares_v2_client.update_share_network(
                 sn_id, name="sn_%s" % sn_id)
         cls.sn_name_and_id = [
             cls.share_network["name"],
@@ -52,7 +54,7 @@
 
     @tc.attr(base.TAG_POSITIVE, base.TAG_API_WITH_BACKEND)
     def test_list_share_servers_without_filters(self):
-        servers = self.shares_client.list_share_servers()
+        servers = self.shares_v2_client.list_share_servers()
         self.assertGreater(len(servers), 0)
         keys = [
             "id",
@@ -84,10 +86,9 @@
     @tc.attr(base.TAG_POSITIVE, base.TAG_API_WITH_BACKEND)
     def test_list_share_servers_with_host_filter(self):
         # Get list of share servers and remember 'host' name
-        servers = self.shares_client.list_share_servers()
+        servers = self.shares_v2_client.list_share_servers()
         # Remember name of server that was used by this test suite
         # to be sure it will be still existing.
-        host = ""
         for server in servers:
             if server["share_network_name"] in self.sn_name_and_id:
                 if not server["host"]:
@@ -96,47 +97,32 @@
                     raise lib_exc.InvalidContentType(message=msg)
                 host = server["host"]
                 break
-        if not host:
+        else:
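+            # The for loop's else clause runs only when no matching server
+            # was found (i.e. the loop completed without a break).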
             msg = ("Appropriate server was not found. Its share_network_data"
                    ": '%s'. List of servers: '%s'.") % (self.sn_name_and_id,
-                                                        str(servers))
+                                                        six.text_type(servers))
             raise lib_exc.NotFound(message=msg)
         search_opts = {"host": host}
-        servers = self.shares_client.list_share_servers(search_opts)
+        servers = self.shares_v2_client.list_share_servers(search_opts)
         self.assertGreater(len(servers), 0)
         for server in servers:
             self.assertEqual(server["host"], host)
 
     @tc.attr(base.TAG_POSITIVE, base.TAG_API_WITH_BACKEND)
     def test_list_share_servers_with_status_filter(self):
-        # Get list of share servers
-        servers = self.shares_client.list_share_servers()
-        # Remember status of server that was used by this test suite
-        # to be sure it will be still existing.
-        status = ""
-        for server in servers:
-            if server["share_network_name"] in self.sn_name_and_id:
-                if not server["status"]:
-                    msg = ("Server '%s' has wrong value for status - "
-                           "'%s'.") % (server["id"], server["host"])
-                    raise lib_exc.InvalidContentType(message=msg)
-                status = server["status"]
-                break
-        if not status:
-            msg = ("Appropriate server was not found. Its share_network_data"
-                   ": '%s'. List of servers: '%s'.") % (self.sn_name_and_id,
-                                                        str(servers))
-            raise lib_exc.NotFound(message=msg)
-        search_opts = {"status": status}
-        servers = self.shares_client.list_share_servers(search_opts)
+        search_opts = {"status": "active"}
+        servers = self.shares_v2_client.list_share_servers(search_opts)
+
+        # At least 1 share server should exist always - the one created
+        # for this class.
         self.assertGreater(len(servers), 0)
         for server in servers:
-            self.assertEqual(server["status"], status)
+            self.assertEqual(server["status"], "active")
 
     @tc.attr(base.TAG_POSITIVE, base.TAG_API_WITH_BACKEND)
     def test_list_share_servers_with_project_id_filter(self):
         search_opts = {"project_id": self.share_network["project_id"]}
-        servers = self.shares_client.list_share_servers(search_opts)
+        servers = self.shares_v2_client.list_share_servers(search_opts)
         # Should exist, at least, one share server, used by this test suite.
         self.assertGreater(len(servers), 0)
         for server in servers:
@@ -146,7 +132,7 @@
     @tc.attr(base.TAG_POSITIVE, base.TAG_API_WITH_BACKEND)
     def test_list_share_servers_with_share_network_name_filter(self):
         search_opts = {"share_network": self.share_network["name"]}
-        servers = self.shares_client.list_share_servers(search_opts)
+        servers = self.shares_v2_client.list_share_servers(search_opts)
         # Should exist, at least, one share server, used by this test suite.
         self.assertGreater(len(servers), 0)
         for server in servers:
@@ -156,7 +142,7 @@
     @tc.attr(base.TAG_POSITIVE, base.TAG_API_WITH_BACKEND)
     def test_list_share_servers_with_share_network_id_filter(self):
         search_opts = {"share_network": self.share_network["id"]}
-        servers = self.shares_client.list_share_servers(search_opts)
+        servers = self.shares_v2_client.list_share_servers(search_opts)
         # Should exist, at least, one share server, used by this test suite.
         self.assertGreater(len(servers), 0)
         for server in servers:
@@ -165,8 +151,9 @@
 
     @tc.attr(base.TAG_POSITIVE, base.TAG_API_WITH_BACKEND)
     def test_show_share_server(self):
-        share = self.shares_client.get_share(self.share["id"])
-        server = self.shares_client.show_share_server(share["share_server_id"])
+        share = self.shares_v2_client.get_share(self.share["id"])
+        server = self.shares_v2_client.show_share_server(
+            share["share_server_id"])
         keys = [
             "id",
             "host",
@@ -180,35 +167,35 @@
         # all expected keys are present
         for key in keys:
             self.assertIn(key, server.keys())
+
         # 'created_at' is valid date
         self.assertTrue(self.date_re.match(server["created_at"]))
+
         # 'updated_at' is valid date if set
         if server["updated_at"]:
             self.assertTrue(self.date_re.match(server["updated_at"]))
-        # Host is not empty
-        self.assertGreater(len(server["host"]), 0)
-        # Id is not empty
-        self.assertGreater(len(server["id"]), 0)
-        # Project id is not empty
-        self.assertGreater(len(server["project_id"]), 0)
-        # Status is not empty
-        self.assertGreater(len(server["status"]), 0)
-        # share_network_name is not empty
-        self.assertGreater(len(server["share_network_name"]), 0)
-        # backend_details should be a dict
+
+        # Verify that values for the following keys are not empty
+        for k in ('host', 'id', 'project_id', 'status', 'share_network_name'):
+            self.assertGreater(len(server[k]), 0)
+
+        # 'backend_details' should be a dict
         self.assertIsInstance(server["backend_details"], dict)
 
     @tc.attr(base.TAG_POSITIVE, base.TAG_API_WITH_BACKEND)
     def test_show_share_server_details(self):
-        servers = self.shares_client.list_share_servers()
-        details = self.shares_client.show_share_server_details(
-            servers[0]["id"])
+        share = self.shares_v2_client.get_share(self.share['id'])
+        details = self.shares_v2_client.show_share_server_details(
+            share['share_server_id'])
+
         # If details are present they and their values should be only strings
-        for k, v in details.iteritems():
+        for k, v in details.items():
             self.assertIsInstance(k, six.string_types)
             self.assertIsInstance(v, six.string_types)
 
-    def _delete_share_server(self, delete_share_network):
+    @tc.attr(base.TAG_POSITIVE, base.TAG_API_WITH_BACKEND)
+    @ddt.data(True, False)
+    def test_delete_share_server(self, delete_share_network):
         # Get network and subnet from existing share_network and reuse it
         # to be able to delete share_server after test ends.
         # TODO(vponomaryov): attach security-services too. If any exist from
@@ -221,8 +208,8 @@
         self.create_share(share_network_id=new_sn['id'])
 
         # List share servers, filtered by share_network_id
-        search_opts = {"share_network": new_sn["id"]}
-        servers = self.shares_client.list_share_servers(search_opts)
+        servers = self.shares_v2_client.list_share_servers(
+            {"share_network": new_sn["id"]})
 
         # There can be more than one share server for share network when retry
         # was used and share was created successfully not from first time.
@@ -233,42 +220,36 @@
             self.assertEqual(new_sn["id"], serv["share_network_id"])
 
             # List shares by share server id
-            params = {"share_server_id": serv["id"]}
-            shares = self.shares_client.list_shares_with_detail(params)
+            shares = self.shares_v2_client.list_shares_with_detail(
+                {"share_server_id": serv["id"]})
             for s in shares:
                 self.assertEqual(new_sn["id"], s["share_network_id"])
 
             # Delete shares, so we will have share server without shares
             for s in shares:
-                self.shares_client.delete_share(s["id"])
+                self.shares_v2_client.delete_share(s["id"])
 
             # Wait for shares deletion
             for s in shares:
-                self.shares_client.wait_for_resource_deletion(share_id=s["id"])
+                self.shares_v2_client.wait_for_resource_deletion(
+                    share_id=s["id"])
 
             # List shares by share server id, we expect empty list
-            params = {"share_server_id": serv["id"]}
-            empty = self.shares_client.list_shares_with_detail(params)
+            empty = self.shares_v2_client.list_shares_with_detail(
+                {"share_server_id": serv["id"]})
             self.assertEqual(0, len(empty))
 
             if delete_share_network:
                 # Delete share network, it should trigger share server deletion
-                self.shares_client.delete_share_network(new_sn["id"])
+                self.shares_v2_client.delete_share_network(new_sn["id"])
             else:
                 # Delete share server
-                self.shares_client.delete_share_server(serv["id"])
+                self.shares_v2_client.delete_share_server(serv["id"])
 
             # Wait for share server deletion
-            self.shares_client.wait_for_resource_deletion(server_id=serv["id"])
+            self.shares_v2_client.wait_for_resource_deletion(
+                server_id=serv["id"])
 
             if delete_share_network:
-                self.shares_client.wait_for_resource_deletion(
+                self.shares_v2_client.wait_for_resource_deletion(
                     sn_id=new_sn["id"])
-
-    @tc.attr(base.TAG_POSITIVE, base.TAG_API_WITH_BACKEND)
-    def test_delete_share_server(self):
-        self._delete_share_server(False)
-
-    @tc.attr(base.TAG_POSITIVE, base.TAG_API_WITH_BACKEND)
-    def test_delete_share_server_by_deletion_of_share_network(self):
-        self._delete_share_server(True)
diff --git a/manila_tempest_tests/tests/api/base.py b/manila_tempest_tests/tests/api/base.py
index 2c26c78..de298da 100644
--- a/manila_tempest_tests/tests/api/base.py
+++ b/manila_tempest_tests/tests/api/base.py
@@ -150,6 +150,26 @@
                 microversion)
 
     @classmethod
+    def _get_dynamic_creds(cls, name, network_resources=None):
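+        # Build a DynamicCredentialProvider wired to tempest's identity,
+        # auth and network settings so callers need not repeat this setup.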
+        return dynamic_creds.DynamicCredentialProvider(
+            identity_version=CONF.identity.auth_version,
+            name=name,
+            network_resources=network_resources,
+            credentials_domain=CONF.auth.default_credentials_domain_name,
+            admin_role=CONF.identity.admin_role,
+            admin_creds=common_creds.get_configured_admin_credentials(),
+            identity_admin_domain_scope=CONF.identity.admin_domain_scope,
+            identity_admin_role=CONF.identity.admin_role,
+            extra_roles=None,
+            neutron_available=CONF.service_available.neutron,
+            create_networks=(
+                CONF.share.create_networks_when_multitenancy_enabled),
+            project_network_cidr=CONF.network.project_network_cidr,
+            project_network_mask_bits=CONF.network.project_network_mask_bits,
+            public_network_id=CONF.network.public_network_id,
+            resource_prefix=CONF.resources_prefix)
+
+    @classmethod
     def get_client_with_isolated_creds(cls,
                                        name=None,
                                        type_of_creds="admin",
@@ -171,11 +191,7 @@
                 name = name[0:32]
 
         # Choose type of isolated creds
-        ic = dynamic_creds.DynamicCredentialProvider(
-            identity_version=CONF.identity.auth_version,
-            name=name,
-            admin_role=CONF.identity.admin_role,
-            admin_creds=common_creds.get_configured_admin_credentials())
+        ic = cls._get_dynamic_creds(name)
         if "admin" in type_of_creds:
             creds = ic.get_admin_creds().credentials
         elif "alt" in type_of_creds:
@@ -349,13 +365,7 @@
 
                     # Create suitable network
                     if net_id is None or subnet_id is None:
-                        ic = dynamic_creds.DynamicCredentialProvider(
-                            identity_version=CONF.identity.auth_version,
-                            name=service_net_name,
-                            admin_role=CONF.identity.admin_role,
-                            admin_creds=(
-                                common_creds.
-                                get_configured_admin_credentials()))
+                        ic = cls._get_dynamic_creds(service_net_name)
                         net_data = ic._create_network_resources(sc.tenant_id)
                         network, subnet, router = net_data
                         net_id = network["id"]
@@ -495,6 +505,7 @@
         data = []
         for d in share_data_list:
             client = d["kwargs"].pop("client", cls.shares_v2_client)
+            wait_for_status = d["kwargs"].pop("wait_for_status", True)
             local_d = {
                 "args": d["args"],
                 "kwargs": copy.deepcopy(d["kwargs"]),
@@ -504,10 +515,13 @@
                 *local_d["args"], **local_d["kwargs"])
             local_d["cnt"] = 0
             local_d["available"] = False
+            local_d["wait_for_status"] = wait_for_status
             data.append(local_d)
 
         while not all(d["available"] for d in data):
             for d in data:
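+                # Shares created with wait_for_status=False are treated as
+                # immediately available and skipped by the status poll.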
+                if not d["wait_for_status"]:
+                    d["available"] = True
                 if d["available"]:
                     continue
                 client = d["kwargs"]["client"]
@@ -657,7 +671,8 @@
     def get_pools_for_replication_domain(self):
         # Get the list of pools for the replication domain
         pools = self.admin_client.list_pools(detail=True)['pools']
-        instance_host = self.shares[0]['host']
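+        # Fetch the share via the admin client to get its current host;
+        # the record cached at setup time may be stale by this point.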
+        instance_host = self.admin_client.get_share(
+            self.shares[0]['id'])['host']
         host_pool = [p for p in pools if p['name'] == instance_host][0]
         rep_domain = host_pool['capabilities']['replication_domain']
         pools_in_rep_domain = [p for p in pools if p['capabilities'][
@@ -704,6 +719,33 @@
             status_attr="replica_state")
         return replica
 
+    def _get_access_rule_data_from_config(self):
+        """Get the first available access type/to combination from config.
+
+        This method opportunistically picks the first configured protocol
+        to create the share. Do not use this method in tests where you need
+        to test depth and breadth in the access types and access recipients.
+        """
+        protocol = self.shares_v2_client.share_protocol
+
+        if protocol in CONF.share.enable_ip_rules_for_protocols:
+            access_type = "ip"
+            access_to = utils.rand_ip()
+        elif protocol in CONF.share.enable_user_rules_for_protocols:
+            access_type = "user"
+            access_to = CONF.share.username_for_user_rules
+        elif protocol in CONF.share.enable_cert_rules_for_protocols:
+            access_type = "cert"
+            access_to = "client3.com"
+        elif protocol in CONF.share.enable_cephx_rules_for_protocols:
+            access_type = "cephx"
+            access_to = "eve"
+        else:
+            message = "Unrecognized protocol and access rules configuration."
+            raise self.skipException(message)
+
+        return access_type, access_to
+
     @classmethod
     def create_share_network(cls, client=None,
                              cleanup_in_class=False, **kwargs):
diff --git a/manila_tempest_tests/tests/api/test_replication.py b/manila_tempest_tests/tests/api/test_replication.py
index 1738e87..589f47c 100644
--- a/manila_tempest_tests/tests/api/test_replication.py
+++ b/manila_tempest_tests/tests/api/test_replication.py
@@ -21,12 +21,11 @@
 from manila_tempest_tests.common import constants
 from manila_tempest_tests import share_exceptions
 from manila_tempest_tests.tests.api import base
-from manila_tempest_tests import utils
 
 CONF = config.CONF
 _MIN_SUPPORTED_MICROVERSION = '2.11'
 SUMMARY_KEYS = ['share_id', 'id', 'replica_state', 'status']
-DETAIL_KEYS = SUMMARY_KEYS + ['availability_zone', 'host', 'updated_at',
+DETAIL_KEYS = SUMMARY_KEYS + ['availability_zone', 'updated_at',
                               'share_network_id', 'created_at']
 
 
@@ -72,9 +71,6 @@
         cls.instance_id1 = cls._get_instance(cls.shares[0])
         cls.instance_id2 = cls._get_instance(cls.shares[1])
 
-        cls.access_type = "ip"
-        cls.access_to = utils.rand_ip()
-
     @classmethod
     def _get_instance(cls, share):
         share_instances = cls.admin_client.get_instances_of_share(share["id"])
@@ -107,24 +103,36 @@
         return [replica for replica in replica_list
                 if replica['replica_state'] == r_state]
 
-    def _verify_config_and_set_access_rule_data(self):
-        """Verify the access rule configuration is enabled for NFS.
+    def _verify_in_sync_replica_promotion(self, share, original_replica):
+        # Verify that 'in-sync' replica has been promoted successfully
 
-        Set the data after verification.
-        """
-        protocol = self.shares_v2_client.share_protocol
+        # NOTE(Yogi1): Cleanup needs to be disabled for the replica that is
+        # being promoted since it will become the 'primary'/'active' replica.
+        replica = self.create_share_replica(share["id"], self.replica_zone,
+                                            cleanup=False)
+        # Wait for replica state to update after creation
+        self.shares_v2_client.wait_for_share_replica_status(
+            replica['id'], constants.REPLICATION_STATE_IN_SYNC,
+            status_attr='replica_state')
+        # Promote the first in_sync replica to active state
+        promoted_replica = self.promote_share_replica(replica['id'])
+        # Delete the demoted replica so the promoted replica can be
+        # cleaned up during the cleanup of the share.
+        self.addCleanup(self.delete_share_replica, original_replica['id'])
+        self._verify_active_replica_count(share["id"])
+        # Verify the replica_state for promoted replica
+        promoted_replica = self.shares_v2_client.get_share_replica(
+            promoted_replica["id"])
+        self.assertEqual(constants.REPLICATION_STATE_ACTIVE,
+                         promoted_replica["replica_state"])
 
-        # TODO(Yogi1): Add access rules for other protocols.
-        if not ((protocol.lower() == 'nfs') and
-                (protocol in CONF.share.enable_ip_rules_for_protocols) and
-                CONF.share.enable_ip_rules_for_protocols):
-            message = "IP access rules are not supported for this protocol."
-            raise self.skipException(message)
-
-        access_type = "ip"
-        access_to = utils.rand_ip()
-
-        return access_type, access_to
+    def _check_skip_promotion_tests(self):
+        # Check if the replication type is right for replica promotion tests
+        if (self.replication_type
+                not in constants.REPLICATION_PROMOTION_CHOICES):
+            msg = "Option backend_replication_type should be one of (%s)!"
+            raise self.skipException(
+                msg % ','.join(constants.REPLICATION_PROMOTION_CHOICES))
 
     @tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
     def test_add_delete_share_replica(self):
@@ -137,7 +145,7 @@
     @tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
     def test_add_access_rule_create_replica_delete_rule(self):
         # Add access rule to the share
-        access_type, access_to = self._verify_config_and_set_access_rule_data()
+        access_type, access_to = self._get_access_rule_data_from_config()
         rule = self.shares_v2_client.create_access_rule(
             self.shares[0]["id"], access_type, access_to, 'ro')
         self.shares_v2_client.wait_for_access_rule_status(
@@ -159,7 +167,7 @@
 
     @tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
     def test_create_replica_add_access_rule_delete_replica(self):
-        access_type, access_to = self._verify_config_and_set_access_rule_data()
+        access_type, access_to = self._get_access_rule_data_from_config()
         # Create the replica
         share_replica = self._verify_create_replica()
 
@@ -193,7 +201,7 @@
                                                    cleanup_in_class=False)
         self.shares_v2_client.get_share_replica(share_replica2['id'])
 
-        share_replicas = self.shares_v2_client.list_share_replicas(
+        share_replicas = self.admin_client.list_share_replicas(
             share_id=self.shares[0]["id"])
         replica_host_set = {r['host'] for r in share_replicas}
 
@@ -208,42 +216,40 @@
     @tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
     def test_promote_in_sync_share_replica(self):
         # Test promote 'in_sync' share_replica to 'active' state
-        if (self.replication_type
-                not in constants.REPLICATION_PROMOTION_CHOICES):
-            msg = "Option backend_replication_type should be one of (%s)!"
-            raise self.skipException(
-                msg % ','.join(constants.REPLICATION_PROMOTION_CHOICES))
+        self._check_skip_promotion_tests()
         share = self.create_shares([self.creation_data])[0]
         original_replica = self.shares_v2_client.list_share_replicas(
             share["id"])[0]
-        # NOTE(Yogi1): Cleanup needs to be disabled for replica that is
-        # being promoted since it will become the 'primary'/'active' replica.
-        replica = self.create_share_replica(share["id"], self.replica_zone,
-                                            cleanup=False)
-        # Wait for replica state to update after creation
-        self.shares_v2_client.wait_for_share_replica_status(
-            replica['id'], constants.REPLICATION_STATE_IN_SYNC,
-            status_attr='replica_state')
-        # Promote the first in_sync replica to active state
-        promoted_replica = self.promote_share_replica(replica['id'])
-        # Delete the demoted replica so promoted replica can be cleaned
-        # during the cleanup of the share.
-        self.addCleanup(self.delete_share_replica, original_replica['id'])
-        self._verify_active_replica_count(share["id"])
-        # Verify the replica_state for promoted replica
-        promoted_replica = self.shares_v2_client.get_share_replica(
-            promoted_replica["id"])
-        self.assertEqual(constants.REPLICATION_STATE_ACTIVE,
-                         promoted_replica["replica_state"])
+        self._verify_in_sync_replica_promotion(share, original_replica)
+
+    @tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
+    def test_add_rule_promote_share_replica_verify_rule(self):
+        # Verify the access rule stays intact after share replica promotion
+        self._check_skip_promotion_tests()
+
+        share = self.create_shares([self.creation_data])[0]
+        # Add access rule
+        access_type, access_to = self._get_access_rule_data_from_config()
+        rule = self.shares_v2_client.create_access_rule(
+            share["id"], access_type, access_to, 'ro')
+        self.shares_v2_client.wait_for_access_rule_status(
+            share["id"], rule["id"], constants.RULE_STATE_ACTIVE)
+
+        original_replica = self.shares_v2_client.list_share_replicas(
+            share["id"])[0]
+        self._verify_in_sync_replica_promotion(share, original_replica)
+
+        # Verify the rule's values
+        rules_list = self.shares_v2_client.list_access_rules(share["id"])
+        self.assertEqual(1, len(rules_list))
+        self.assertEqual(access_type, rules_list[0]["access_type"])
+        self.assertEqual(access_to, rules_list[0]["access_to"])
+        self.assertEqual('ro', rules_list[0]["access_level"])
 
     @tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
     def test_promote_and_promote_back(self):
         # Test promote back and forth between 2 share replicas
-        if (self.replication_type
-                not in constants.REPLICATION_PROMOTION_CHOICES):
-            msg = "Option backend_replication_type should be one of (%s)!"
-            raise self.skipException(
-                msg % ','.join(constants.REPLICATION_PROMOTION_CHOICES))
+        self._check_skip_promotion_tests()
 
         # Create a new share
         share = self.create_shares([self.creation_data])[0]
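
With the guard factored into _check_skip_promotion_tests() and the
promotion flow into _verify_in_sync_replica_promotion(), every promotion
test now reduces to the same skeleton (condensed from the tests above):

    self._check_skip_promotion_tests()
    share = self.create_shares([self.creation_data])[0]
    original_replica = self.shares_v2_client.list_share_replicas(
        share["id"])[0]
    self._verify_in_sync_replica_promotion(share, original_replica)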
diff --git a/manila_tempest_tests/tests/api/test_replication_negative.py b/manila_tempest_tests/tests/api/test_replication_negative.py
index 48b8d5c..21a5ef2 100644
--- a/manila_tempest_tests/tests/api/test_replication_negative.py
+++ b/manila_tempest_tests/tests/api/test_replication_negative.py
@@ -159,6 +159,43 @@
         # Try promoting the replica
         self.shares_v2_client.promote_share_replica(replica['id'])
 
+    @tc.attr(base.TAG_NEGATIVE, base.TAG_API_WITH_BACKEND)
+    def test_add_access_rule_share_replica_error_status(self):
+        access_type, access_to = self._get_access_rule_data_from_config()
+        # Create the replica
+        share_replica = self.create_share_replica(self.share1["id"],
+                                                  self.replica_zone,
+                                                  cleanup_in_class=False)
+        # Reset the replica status to error
+        self.admin_client.reset_share_replica_status(
+            share_replica['id'], constants.STATUS_ERROR)
+
+        # Verify access rule cannot be added
+        self.assertRaises(lib_exc.BadRequest,
+                          self.admin_client.create_access_rule,
+                          self.share1["id"], access_type, access_to, 'ro')
+
+    @testtools.skipUnless(CONF.share.run_host_assisted_migration_tests or
+                          CONF.share.run_driver_assisted_migration_tests,
+                          "Share migration tests are disabled.")
+    @tc.attr(base.TAG_NEGATIVE, base.TAG_API_WITH_BACKEND)
+    @base.skip_if_microversion_lt("2.29")
+    def test_migration_of_replicated_share(self):
+        pools = self.admin_client.list_pools(detail=True)['pools']
+        hosts = [p['name'] for p in pools]
+        self.create_share_replica(self.share1["id"], self.replica_zone,
+                                  cleanup_in_class=False)
+        share_host = self.admin_client.get_share(self.share1['id'])['host']
+
+        for host in hosts:
+            if host != share_host:
+                dest_host = host
+                break
+
+        self.assertRaises(
+            lib_exc.Conflict, self.admin_client.migrate_share,
+            self.share1['id'], dest_host)
+
 
 @testtools.skipUnless(CONF.share.run_replication_tests,
                       'Replication tests are disabled.')
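
Note that the host-selection loop in test_migration_of_replicated_share
leaves dest_host unbound if every pool reports the share's own host. A more
defensive variant (an editor's sketch, not part of this change) could be:

    dest_host = next((h for h in hosts if h != share_host), None)
    if dest_host is None:
        raise self.skipException(
            "No second pool available as a migration destination.")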
diff --git a/manila_tempest_tests/tests/api/test_rules.py b/manila_tempest_tests/tests/api/test_rules.py
index 180ed79..024590d 100644
--- a/manila_tempest_tests/tests/api/test_rules.py
+++ b/manila_tempest_tests/tests/api/test_rules.py
@@ -13,6 +13,8 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
+import itertools
+
 import ddt
 from tempest import config
 from tempest.lib import exceptions as lib_exc
@@ -44,6 +46,12 @@
     for key in ('deleted', 'deleted_at', 'instance_mappings'):
         self.assertNotIn(key, rule.keys())
 
+    # Rules must start out 'new' before microversion 2.28 and
+    # 'queued_to_apply' from 2.28 onward
+    if utils.is_microversion_le(version, "2.27"):
+        self.assertEqual("new", rule['state'])
+    else:
+        self.assertEqual("queued_to_apply", rule['state'])
+
     if utils.is_microversion_le(version, '2.9'):
         self.shares_client.wait_for_access_rule_status(
             self.share["id"], rule["id"], "active")
@@ -51,6 +59,11 @@
         self.shares_v2_client.wait_for_share_status(
             self.share["id"], "active", status_attr='access_rules_status',
             version=version)
+        # If the 'access_rules_status' transitions to 'active',
+        # the rule's state must too
+        rules = self.shares_v2_client.list_access_rules(self.share['id'])
+        rule = [r for r in rules if r['id'] == rule['id']][0]
+        self.assertEqual("active", rule['state'])
 
     if utils.is_microversion_eq(version, '1.0'):
         self.shares_client.delete_access_rule(self.share["id"], rule["id"])
@@ -79,7 +92,7 @@
         cls.access_to = "2.2.2.2"
 
     @tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
-    @ddt.data('1.0', '2.9', LATEST_MICROVERSION)
+    @ddt.data(*set(['1.0', '2.9', '2.27', '2.28', LATEST_MICROVERSION]))
     def test_create_delete_access_rules_with_one_ip(self, version):
 
         # test data
@@ -98,6 +111,12 @@
         for key in ('deleted', 'deleted_at', 'instance_mappings'):
             self.assertNotIn(key, rule.keys())
 
+        # Rules must start out 'new' before microversion 2.28 and
+        # 'queued_to_apply' from 2.28 onward
+        if utils.is_microversion_le(version, "2.27"):
+            self.assertEqual("new", rule['state'])
+        else:
+            self.assertEqual("queued_to_apply", rule['state'])
+
         if utils.is_microversion_eq(version, '1.0'):
             self.shares_client.wait_for_access_rule_status(
                 self.share["id"], rule["id"], "active")
@@ -121,7 +140,7 @@
                 rule_id=rule["id"], share_id=self.share['id'], version=version)
 
     @tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
-    @ddt.data('1.0', '2.9', LATEST_MICROVERSION)
+    @ddt.data(*set(['1.0', '2.9', '2.27', '2.28', LATEST_MICROVERSION]))
     def test_create_delete_access_rule_with_cidr(self, version):
 
         # test data
@@ -140,6 +159,12 @@
             self.assertNotIn(key, rule.keys())
         self.assertEqual('rw', rule['access_level'])
 
+        # Rules must start out 'new' before microversion 2.28 and
+        # 'queued_to_apply' from 2.28 onward
+        if utils.is_microversion_le(version, "2.27"):
+            self.assertEqual("new", rule['state'])
+        else:
+            self.assertEqual("queued_to_apply", rule['state'])
+
         if utils.is_microversion_eq(version, '1.0'):
             self.shares_client.wait_for_access_rule_status(
                 self.share["id"], rule["id"], "active")
@@ -166,7 +191,7 @@
     @testtools.skipIf(
         "nfs" not in CONF.share.enable_ro_access_level_for_protocols,
         "RO access rule tests are disabled for NFS protocol.")
-    @ddt.data('1.0', '2.9', LATEST_MICROVERSION)
+    @ddt.data(*set(['1.0', '2.9', '2.27', '2.28', LATEST_MICROVERSION]))
     def test_create_delete_ro_access_rule(self, client_name):
         _create_delete_ro_access_rule(self, client_name)
 
@@ -179,7 +204,7 @@
     @testtools.skipIf(
         "cifs" not in CONF.share.enable_ro_access_level_for_protocols,
         "RO access rule tests are disabled for CIFS protocol.")
-    @ddt.data('1.0', '2.9', LATEST_MICROVERSION)
+    @ddt.data(*set(['1.0', '2.9', '2.27', '2.28', LATEST_MICROVERSION]))
     def test_create_delete_ro_access_rule(self, version):
         _create_delete_ro_access_rule(self, version)
 
@@ -201,7 +226,7 @@
         cls.access_to = CONF.share.username_for_user_rules
 
     @tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
-    @ddt.data('1.0', '2.9', LATEST_MICROVERSION)
+    @ddt.data(*set(['1.0', '2.9', '2.27', '2.28', LATEST_MICROVERSION]))
     def test_create_delete_user_rule(self, version):
 
         # create rule
@@ -217,6 +242,12 @@
         for key in ('deleted', 'deleted_at', 'instance_mappings'):
             self.assertNotIn(key, rule.keys())
 
+        # Rules must start out 'new' before microversion 2.28 and
+        # 'queued_to_apply' from 2.28 onward
+        if utils.is_microversion_le(version, "2.27"):
+            self.assertEqual("new", rule['state'])
+        else:
+            self.assertEqual("queued_to_apply", rule['state'])
+
         if utils.is_microversion_eq(version, '1.0'):
             self.shares_client.wait_for_access_rule_status(
                 self.share["id"], rule["id"], "active")
@@ -243,7 +274,7 @@
     @testtools.skipIf(
         "nfs" not in CONF.share.enable_ro_access_level_for_protocols,
         "RO access rule tests are disabled for NFS protocol.")
-    @ddt.data('1.0', '2.9', LATEST_MICROVERSION)
+    @ddt.data(*set(['1.0', '2.9', '2.27', '2.28', LATEST_MICROVERSION]))
     def test_create_delete_ro_access_rule(self, version):
         _create_delete_ro_access_rule(self, version)
 
@@ -256,7 +287,7 @@
     @testtools.skipIf(
         "cifs" not in CONF.share.enable_ro_access_level_for_protocols,
         "RO access rule tests are disabled for CIFS protocol.")
-    @ddt.data('1.0', '2.9', LATEST_MICROVERSION)
+    @ddt.data(*set(['1.0', '2.9', '2.27', '2.28', LATEST_MICROVERSION]))
     def test_create_delete_ro_access_rule(self, version):
         _create_delete_ro_access_rule(self, version)
 
@@ -280,7 +311,7 @@
         cls.access_to = "client1.com"
 
     @tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
-    @ddt.data('1.0', '2.9', LATEST_MICROVERSION)
+    @ddt.data(*set(['1.0', '2.9', '2.27', '2.28', LATEST_MICROVERSION]))
     def test_create_delete_cert_rule(self, version):
 
         # create rule
@@ -296,6 +327,12 @@
         for key in ('deleted', 'deleted_at', 'instance_mappings'):
             self.assertNotIn(key, rule.keys())
 
+        # Rules must start out 'new' before microversion 2.28 and
+        # 'queued_to_apply' from 2.28 onward
+        if utils.is_microversion_le(version, "2.27"):
+            self.assertEqual("new", rule['state'])
+        else:
+            self.assertEqual("queued_to_apply", rule['state'])
+
         if utils.is_microversion_eq(version, '1.0'):
             self.shares_client.wait_for_access_rule_status(
                 self.share["id"], rule["id"], "active")
@@ -322,7 +359,7 @@
     @testtools.skipIf(
         "glusterfs" not in CONF.share.enable_ro_access_level_for_protocols,
         "RO access rule tests are disabled for GLUSTERFS protocol.")
-    @ddt.data('1.0', '2.9', LATEST_MICROVERSION)
+    @ddt.data(*set(['1.0', '2.9', '2.27', '2.28', LATEST_MICROVERSION]))
     def test_create_delete_cert_ro_access_rule(self, version):
         if utils.is_microversion_eq(version, '1.0'):
             rule = self.shares_client.create_access_rule(
@@ -336,6 +373,12 @@
         for key in ('deleted', 'deleted_at', 'instance_mappings'):
             self.assertNotIn(key, rule.keys())
 
+        # Rules must start out 'new' before microversion 2.28 and
+        # 'queued_to_apply' from 2.28 onward
+        if utils.is_microversion_le(version, "2.27"):
+            self.assertEqual("new", rule['state'])
+        else:
+            self.assertEqual("queued_to_apply", rule['state'])
+
         if utils.is_microversion_eq(version, '1.0'):
             self.shares_client.wait_for_access_rule_status(
                 self.share["id"], rule["id"], "active")
@@ -377,10 +420,13 @@
         cls.access_to = "bob"
 
     @tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
-    @ddt.data("alice", "alice_bob", "alice bob")
-    def test_create_delete_cephx_rule(self, access_to):
+    @ddt.data(*itertools.product(
+        set(['2.13', '2.27', '2.28', LATEST_MICROVERSION]),
+        ("alice", "alice_bob", "alice bob")))
+    @ddt.unpack
+    def test_create_delete_cephx_rule(self, version, access_to):
         rule = self.shares_v2_client.create_access_rule(
-            self.share["id"], self.access_type, access_to)
+            self.share["id"], self.access_type, access_to, version=version)
 
         self.assertEqual('rw', rule['access_level'])
         for key in ('deleted', 'deleted_at', 'instance_mappings'):
@@ -388,7 +434,8 @@
         self.shares_v2_client.wait_for_access_rule_status(
             self.share["id"], rule["id"], "active")
 
-        self.shares_v2_client.delete_access_rule(self.share["id"], rule["id"])
+        self.shares_v2_client.delete_access_rule(
+            self.share["id"], rule["id"], version=version)
         self.shares_v2_client.wait_for_resource_deletion(
             rule_id=rule["id"], share_id=self.share['id'])
 
@@ -429,7 +476,7 @@
         cls.share = cls.create_share()
 
     @tc.attr(base.TAG_POSITIVE, base.TAG_API_WITH_BACKEND)
-    @ddt.data('1.0', '2.9', LATEST_MICROVERSION)
+    @ddt.data(*set(['1.0', '2.9', '2.27', '2.28', LATEST_MICROVERSION]))
     def test_list_access_rules(self, version):
         if (utils.is_microversion_lt(version, '2.13') and
                 CONF.share.enable_cephx_rules_for_protocols):
@@ -445,6 +492,11 @@
             rule = self.shares_v2_client.create_access_rule(
                 self.share["id"], self.access_type, self.access_to,
                 version=version)
+        # Rules must start out 'new' before microversion 2.28 and
+        # 'queued_to_apply' from 2.28 onward
+        if utils.is_microversion_le(version, "2.27"):
+            self.assertEqual("new", rule['state'])
+        else:
+            self.assertEqual("queued_to_apply", rule['state'])
 
         if utils.is_microversion_eq(version, '1.0'):
             self.shares_client.wait_for_access_rule_status(
@@ -499,7 +551,7 @@
                 rule_id=rule["id"], share_id=self.share['id'], version=version)
 
     @tc.attr(base.TAG_POSITIVE, base.TAG_API_WITH_BACKEND)
-    @ddt.data('1.0', '2.9', LATEST_MICROVERSION)
+    @ddt.data(*set(['1.0', '2.9', '2.27', '2.28', LATEST_MICROVERSION]))
     def test_access_rules_deleted_if_share_deleted(self, version):
         if (utils.is_microversion_lt(version, '2.13') and
                 CONF.share.enable_cephx_rules_for_protocols):
@@ -519,6 +571,12 @@
                 share["id"], self.access_type, self.access_to,
                 version=version)
 
+        # Rules must start out 'new' before microversion 2.28 and
+        # 'queued_to_apply' from 2.28 onward
+        if utils.is_microversion_le(version, "2.27"):
+            self.assertEqual("new", rule['state'])
+        else:
+            self.assertEqual("queued_to_apply", rule['state'])
+
         if utils.is_microversion_eq(version, '1.0'):
             self.shares_client.wait_for_access_rule_status(
                 share["id"], rule["id"], "active")
diff --git a/manila_tempest_tests/tests/api/test_rules_negative.py b/manila_tempest_tests/tests/api/test_rules_negative.py
index 9cd4708..8049f14 100644
--- a/manila_tempest_tests/tests/api/test_rules_negative.py
+++ b/manila_tempest_tests/tests/api/test_rules_negative.py
@@ -19,6 +19,7 @@
 import testtools
 from testtools import testcase as tc
 
+from manila_tempest_tests.common import constants
 from manila_tempest_tests.tests.api import base
 from manila_tempest_tests import utils
 
@@ -27,12 +28,13 @@
 
 
 @ddt.ddt
-class ShareIpRulesForNFSNegativeTest(base.BaseSharesTest):
+class ShareIpRulesForNFSNegativeTest(base.BaseSharesMixedTest):
     protocol = "nfs"
 
     @classmethod
     def resource_setup(cls):
         super(ShareIpRulesForNFSNegativeTest, cls).resource_setup()
+        cls.admin_client = cls.admin_shares_v2_client
         if not (cls.protocol in CONF.share.enable_protocols and
                 cls.protocol in CONF.share.enable_ip_rules_for_protocols):
             msg = "IP rule tests for %s protocol are disabled" % cls.protocol
@@ -158,6 +160,25 @@
             self.shares_v2_client.wait_for_resource_deletion(
                 rule_id=rule["id"], share_id=self.share["id"], version=version)
 
+    @tc.attr(base.TAG_NEGATIVE, base.TAG_API_WITH_BACKEND)
+    def test_add_access_rule_on_share_with_no_host(self):
+        access_type, access_to = self._get_access_rule_data_from_config()
+        extra_specs = self.add_extra_specs_to_dict(
+            {"share_backend_name": 'invalid_backend'})
+        share_type = self.create_share_type('invalid_backend',
+                                            extra_specs=extra_specs,
+                                            client=self.admin_client,
+                                            cleanup_in_class=False)
+        share_type = share_type['share_type']
+        share = self.create_share(share_type_id=share_type['id'],
+                                  cleanup_in_class=False,
+                                  wait_for_status=False)
+        self.shares_v2_client.wait_for_share_status(
+            share['id'], constants.STATUS_ERROR)
+        self.assertRaises(lib_exc.BadRequest,
+                          self.admin_client.create_access_rule,
+                          share["id"], access_type, access_to)
+
 
 @ddt.ddt
 class ShareIpRulesForCIFSNegativeTest(ShareIpRulesForNFSNegativeTest):
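
The no-host negative test works by pointing a share type at a backend name
that nothing advertises, so the scheduler never places the share and it
lands in 'error' with no host; creating an access rule against it is then
rejected with 400. The trigger is the extra spec (as used above;
'invalid_backend' is simply a name no real backend reports):

    extra_specs = self.add_extra_specs_to_dict(
        {"share_backend_name": 'invalid_backend'})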
diff --git a/manila_tempest_tests/tests/api/test_shares.py b/manila_tempest_tests/tests/api/test_shares.py
index 277f76e..a02b5f8 100644
--- a/manila_tempest_tests/tests/api/test_shares.py
+++ b/manila_tempest_tests/tests/api/test_shares.py
@@ -42,7 +42,7 @@
         share = self.create_share(self.protocol)
         detailed_elements = {'name', 'id', 'availability_zone',
                              'description', 'project_id',
-                             'host', 'created_at', 'share_proto', 'metadata',
+                             'created_at', 'share_proto', 'metadata',
                              'size', 'snapshot_id', 'share_network_id',
                              'status', 'share_type', 'volume_type', 'links',
                              'is_public'}
diff --git a/manila_tempest_tests/tests/api/test_shares_actions.py b/manila_tempest_tests/tests/api/test_shares_actions.py
index cfebdd3..0f59d7a 100644
--- a/manila_tempest_tests/tests/api/test_shares_actions.py
+++ b/manila_tempest_tests/tests/api/test_shares_actions.py
@@ -88,7 +88,7 @@
             "status", "description", "links", "availability_zone",
             "created_at", "project_id", "volume_type", "share_proto", "name",
             "snapshot_id", "id", "size", "share_network_id", "metadata",
-            "host", "snapshot_id", "is_public",
+            "snapshot_id", "is_public",
         ]
         if utils.is_microversion_lt(version, '2.9'):
             expected_keys.extend(["export_location", "export_locations"])
@@ -196,7 +196,7 @@
             "status", "description", "links", "availability_zone",
             "created_at", "project_id", "volume_type", "share_proto", "name",
             "snapshot_id", "id", "size", "share_network_id", "metadata",
-            "host", "snapshot_id", "is_public", "share_type",
+            "snapshot_id", "is_public", "share_type",
         ]
         if utils.is_microversion_lt(version, '2.9'):
             keys.extend(["export_location", "export_locations"])
@@ -284,19 +284,6 @@
             self.assertFalse(self.shares[1]['id'] in [s['id'] for s in shares])
 
     @tc.attr(base.TAG_POSITIVE, base.TAG_API_WITH_BACKEND)
-    def test_list_shares_with_detail_filter_by_host(self):
-        base_share = self.shares_client.get_share(self.shares[0]['id'])
-        filters = {'host': base_share['host']}
-
-        # list shares
-        shares = self.shares_client.list_shares_with_detail(params=filters)
-
-        # verify response
-        self.assertGreater(len(shares), 0)
-        for share in shares:
-            self.assertEqual(filters['host'], share['host'])
-
-    @tc.attr(base.TAG_POSITIVE, base.TAG_API_WITH_BACKEND)
     @testtools.skipIf(
         not CONF.share.multitenancy_enabled, "Only for multitenancy.")
     def test_list_shares_with_detail_filter_by_share_network_id(self):
@@ -407,7 +394,7 @@
 
         keys = [
             "status", "description", "links", "availability_zone",
-            "created_at", "export_location", "share_proto", "host",
+            "created_at", "export_location", "share_proto",
             "name", "snapshot_id", "id", "size", "project_id", "is_public",
         ]
         [self.assertIn(key, sh.keys()) for sh in shares for key in keys]
diff --git a/manila_tempest_tests/tests/api/test_shares_negative.py b/manila_tempest_tests/tests/api/test_shares_negative.py
index 42204e1..3229f2f 100644
--- a/manila_tempest_tests/tests/api/test_shares_negative.py
+++ b/manila_tempest_tests/tests/api/test_shares_negative.py
@@ -194,6 +194,12 @@
                           'fake-type')
 
     @tc.attr(base.TAG_NEGATIVE, base.TAG_API)
+    def test_list_by_user_with_host_filter(self):
+        self.assertRaises(lib_exc.Forbidden,
+                          self.shares_v2_client.list_shares,
+                          params={'host': 'fake_host'})
+
+    @tc.attr(base.TAG_NEGATIVE, base.TAG_API)
     def test_list_by_share_server_by_user(self):
         self.assertRaises(lib_exc.Forbidden,
                           self.shares_client.list_shares,
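
Together with dropping 'host' from the user-visible keys in test_shares.py
and test_shares_actions.py, this pins down the intent of the change: 'host'
is admin-only data. Both sides of that contract, sketched (the admin client
attribute name follows the mixed-test base class used elsewhere in this
change; 'share_host' is a placeholder for a real backend host value):

    # Non-admin: filtering by host is rejected.
    self.assertRaises(lib_exc.Forbidden,
                      self.shares_v2_client.list_shares,
                      params={'host': 'fake_host'})
    # Admin: the same filter is allowed.
    shares = self.admin_shares_v2_client.list_shares(
        params={'host': share_host})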
diff --git a/manila_tempest_tests/tests/scenario/manager.py b/manila_tempest_tests/tests/scenario/manager.py
new file mode 100644
index 0000000..64c5e8a
--- /dev/null
+++ b/manila_tempest_tests/tests/scenario/manager.py
@@ -0,0 +1,1241 @@
+# Copyright 2012 OpenStack Foundation
+# Copyright 2013 IBM Corp.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import subprocess
+
+import netaddr
+from oslo_log import log
+from oslo_serialization import jsonutils as json
+from oslo_utils import netutils
+import six
+
+from tempest.common import compute
+from tempest.common import image as common_image
+from tempest.common.utils.linux import remote_client
+from tempest.common.utils import net_utils
+from tempest.common import waiters
+from tempest import config
+from tempest import exceptions
+from tempest.lib.common.utils import data_utils
+from tempest.lib.common.utils import test_utils
+from tempest.lib import exceptions as lib_exc
+import tempest.test
+
+CONF = config.CONF
+
+LOG = log.getLogger(__name__)
+
+
+class ScenarioTest(tempest.test.BaseTestCase):
+    """Base class for scenario tests. Uses tempest own clients. """
+
+    credentials = ['primary']
+
+    @classmethod
+    def setup_clients(cls):
+        super(ScenarioTest, cls).setup_clients()
+        # Clients (in alphabetical order)
+        cls.flavors_client = cls.manager.flavors_client
+        cls.compute_floating_ips_client = (
+            cls.manager.compute_floating_ips_client)
+        if CONF.service_available.glance:
+            # Check if glance v1 is available to determine which client to use.
+            if CONF.image_feature_enabled.api_v1:
+                cls.image_client = cls.manager.image_client
+            elif CONF.image_feature_enabled.api_v2:
+                cls.image_client = cls.manager.image_client_v2
+            else:
+                raise lib_exc.InvalidConfiguration(
+                    'Either api_v1 or api_v2 must be True in '
+                    '[image-feature-enabled].')
+        # Compute image client
+        cls.compute_images_client = cls.manager.compute_images_client
+        cls.keypairs_client = cls.manager.keypairs_client
+        # Nova security groups client
+        cls.compute_security_groups_client = (
+            cls.manager.compute_security_groups_client)
+        cls.compute_security_group_rules_client = (
+            cls.manager.compute_security_group_rules_client)
+        cls.servers_client = cls.manager.servers_client
+        cls.interface_client = cls.manager.interfaces_client
+        # Neutron network client
+        cls.networks_client = cls.manager.networks_client
+        cls.ports_client = cls.manager.ports_client
+        cls.routers_client = cls.manager.routers_client
+        cls.subnets_client = cls.manager.subnets_client
+        cls.floating_ips_client = cls.manager.floating_ips_client
+        cls.security_groups_client = cls.manager.security_groups_client
+        cls.security_group_rules_client = (
+            cls.manager.security_group_rules_client)
+
+        if CONF.volume_feature_enabled.api_v2:
+            cls.volumes_client = cls.manager.volumes_v2_client
+            cls.snapshots_client = cls.manager.snapshots_v2_client
+        else:
+            cls.volumes_client = cls.manager.volumes_client
+            cls.snapshots_client = cls.manager.snapshots_client
+
+    # ## Test functions library
+    #
+    # The create_[resource] functions only return body and discard the
+    # resp part which is not used in scenario tests
+
+    def _create_port(self, network_id, client=None, namestart='port-quotatest',
+                     **kwargs):
+        if not client:
+            client = self.ports_client
+        name = data_utils.rand_name(namestart)
+        result = client.create_port(
+            name=name,
+            network_id=network_id,
+            **kwargs)
+        self.assertIsNotNone(result, 'Unable to allocate port')
+        port = result['port']
+        self.addCleanup(test_utils.call_and_ignore_notfound_exc,
+                        client.delete_port, port['id'])
+        return port
+
+    def create_keypair(self, client=None):
+        if not client:
+            client = self.keypairs_client
+        name = data_utils.rand_name(self.__class__.__name__)
+        # We don't need to create a keypair from an existing public key
+        # in the scenario tests.
+        body = client.create_keypair(name=name)
+        self.addCleanup(client.delete_keypair, name)
+        return body['keypair']
+
+    def create_server(self, name=None, image_id=None, flavor=None,
+                      validatable=False, wait_until='ACTIVE',
+                      clients=None, **kwargs):
+        """Wrapper utility that returns a test server.
+
+        This wrapper utility calls the common create test server and
+        returns a test server. The purpose of this wrapper is to minimize
+        the impact on the code of the tests already using this
+        function.
+        """
+
+        # NOTE(jlanoux): As a first step, ssh checks in the scenario
+        # tests need to be run regardless of the run_validation and
+        # validatable parameters and thus until the ssh validation job
+        # becomes voting in CI. The test resources management and IP
+        # association are taken care of in the scenario tests.
+        # Therefore, the validatable parameter is set to false in all
+        # those tests. In this way create_server just returns a standard
+        # server and the scenario tests always perform ssh checks.
+
+        # Needed for the cross_tenant_traffic test:
+        if clients is None:
+            clients = self.manager
+
+        if name is None:
+            name = data_utils.rand_name(self.__class__.__name__ + "-server")
+
+        vnic_type = CONF.network.port_vnic_type
+
+        # If vnic_type is configured, create a port for
+        # every network
+        if vnic_type:
+            ports = []
+
+            create_port_body = {'binding:vnic_type': vnic_type,
+                                'namestart': 'port-smoke'}
+            if kwargs:
+                # Convert security group names to security group ids
+                # to pass to create_port
+                if 'security_groups' in kwargs:
+                    security_groups = (
+                        clients.security_groups_client.list_security_groups(
+                        ).get('security_groups'))
+                    sec_dict = {s['name']: s['id'] for s in security_groups}
+
+                    sec_groups_names = [s['name'] for s in kwargs.pop(
+                        'security_groups')]
+                    security_groups_ids = [sec_dict[s]
+                                           for s in sec_groups_names]
+
+                    if security_groups_ids:
+                        create_port_body[
+                            'security_groups'] = security_groups_ids
+                networks = kwargs.pop('networks', [])
+            else:
+                networks = []
+
+            # If there are no networks passed to us, we look up the
+            # project's private networks and create a port. This matches
+            # the behaviour we would expect when passing the call to the
+            # clients with no networks.
+            if not networks:
+                networks = clients.networks_client.list_networks(
+                    **{'router:external': False, 'fields': 'id'})['networks']
+
+            # It's net['uuid'] if networks come from kwargs
+            # and net['id'] if they come from
+            # clients.networks_client.list_networks
+            for net in networks:
+                net_id = net.get('uuid', net.get('id'))
+                if 'port' not in net:
+                    port = self._create_port(network_id=net_id,
+                                             client=clients.ports_client,
+                                             **create_port_body)
+                    ports.append({'port': port['id']})
+                else:
+                    ports.append({'port': net['port']})
+            if ports:
+                kwargs['networks'] = ports
+            self.ports = ports
+
+        tenant_network = self.get_tenant_network()
+
+        body, servers = compute.create_test_server(
+            clients,
+            tenant_network=tenant_network,
+            wait_until=wait_until,
+            name=name, flavor=flavor,
+            image_id=image_id, **kwargs)
+
+        self.addCleanup(waiters.wait_for_server_termination,
+                        clients.servers_client, body['id'])
+        self.addCleanup(test_utils.call_and_ignore_notfound_exc,
+                        clients.servers_client.delete_server, body['id'])
+        server = clients.servers_client.show_server(body['id'])['server']
+        return server
+
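    # Usage sketch (editor's note, not part of this file): remaining
    # kwargs pass through to compute.create_test_server and on to Nova,
    # so a typical scenario call looks like:
    #
    #     server = self.create_server(
    #         image_id=CONF.compute.image_ref,
    #         flavor=CONF.compute.flavor_ref,
    #         key_name=self.keypair['name'])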
+    def create_volume(self, size=None, name=None, snapshot_id=None,
+                      imageRef=None, volume_type=None):
+        if size is None:
+            size = CONF.volume.volume_size
+        if imageRef:
+            image = self.compute_images_client.show_image(imageRef)['image']
+            min_disk = image.get('minDisk')
+            size = max(size, min_disk)
+        if name is None:
+            name = data_utils.rand_name(self.__class__.__name__ + "-volume")
+        kwargs = {'display_name': name,
+                  'snapshot_id': snapshot_id,
+                  'imageRef': imageRef,
+                  'volume_type': volume_type,
+                  'size': size}
+        volume = self.volumes_client.create_volume(**kwargs)['volume']
+
+        self.addCleanup(self.volumes_client.wait_for_resource_deletion,
+                        volume['id'])
+        self.addCleanup(test_utils.call_and_ignore_notfound_exc,
+                        self.volumes_client.delete_volume, volume['id'])
+
+        # NOTE(e0ne): Cinder API v2 uses name instead of display_name
+        if 'display_name' in volume:
+            self.assertEqual(name, volume['display_name'])
+        else:
+            self.assertEqual(name, volume['name'])
+        waiters.wait_for_volume_resource_status(self.volumes_client,
+                                                volume['id'], 'available')
+        # The volume retrieved on creation has a non-up-to-date status.
+        # Retrieval after it becomes active ensures correct details.
+        volume = self.volumes_client.show_volume(volume['id'])['volume']
+        return volume
+
+    def create_volume_type(self, client=None, name=None, backend_name=None):
+        if not client:
+            client = self.admin_volume_types_client
+        if not name:
+            class_name = self.__class__.__name__
+            name = data_utils.rand_name(class_name + '-volume-type')
+        randomized_name = data_utils.rand_name('scenario-type-' + name)
+
+        LOG.debug("Creating a volume type: %s on backend %s",
+                  randomized_name, backend_name)
+        extra_specs = {}
+        if backend_name:
+            extra_specs = {"volume_backend_name": backend_name}
+
+        body = client.create_volume_type(name=randomized_name,
+                                         extra_specs=extra_specs)
+        volume_type = body['volume_type']
+        self.assertIn('id', volume_type)
+        self.addCleanup(client.delete_volume_type, volume_type['id'])
+        return volume_type
+
+    def _create_loginable_secgroup_rule(self, secgroup_id=None):
+        _client = self.compute_security_groups_client
+        _client_rules = self.compute_security_group_rules_client
+        if secgroup_id is None:
+            sgs = _client.list_security_groups()['security_groups']
+            for sg in sgs:
+                if sg['name'] == 'default':
+                    secgroup_id = sg['id']
+
+        # These rules are intended to permit inbound ssh and icmp
+        # traffic from all sources, so no group_id is provided.
+        # Setting a group_id would only permit traffic from ports
+        # belonging to the same security group.
+        rulesets = [
+            {
+                # ssh
+                'ip_protocol': 'tcp',
+                'from_port': 22,
+                'to_port': 22,
+                'cidr': '0.0.0.0/0',
+            },
+            {
+                # ping
+                'ip_protocol': 'icmp',
+                'from_port': -1,
+                'to_port': -1,
+                'cidr': '0.0.0.0/0',
+            }
+        ]
+        rules = list()
+        for ruleset in rulesets:
+            sg_rule = _client_rules.create_security_group_rule(
+                parent_group_id=secgroup_id, **ruleset)['security_group_rule']
+            rules.append(sg_rule)
+        return rules
+
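    # Editor's note: the two rulesets above open TCP/22 (ssh) and all
    # ICMP from 0.0.0.0/0 on the 'default' group; the legacy novaclient
    # CLI equivalent would be roughly:
    #     nova secgroup-add-rule default tcp 22 22 0.0.0.0/0
    #     nova secgroup-add-rule default icmp -1 -1 0.0.0.0/0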
+    def _create_security_group(self):
+        # Create security group
+        sg_name = data_utils.rand_name(self.__class__.__name__)
+        sg_desc = sg_name + " description"
+        secgroup = self.compute_security_groups_client.create_security_group(
+            name=sg_name, description=sg_desc)['security_group']
+        self.assertEqual(secgroup['name'], sg_name)
+        self.assertEqual(secgroup['description'], sg_desc)
+        self.addCleanup(
+            test_utils.call_and_ignore_notfound_exc,
+            self.compute_security_groups_client.delete_security_group,
+            secgroup['id'])
+
+        # Add rules to the security group
+        self._create_loginable_secgroup_rule(secgroup['id'])
+
+        return secgroup
+
+    def get_remote_client(self, ip_address, username=None, private_key=None):
+        """Get a SSH client to a remote server
+
+        @param ip_address the server floating or fixed IP address to use
+                          for ssh validation
+        @param username name of the Linux account on the remote server
+        @param private_key the SSH private key to use
+        @return a RemoteClient object
+        """
+
+        if username is None:
+            username = CONF.validation.image_ssh_user
+        # Set this with 'keypair' or others to log in with keypair or
+        # username/password.
+        if CONF.validation.auth_method == 'keypair':
+            password = None
+            if private_key is None:
+                private_key = self.keypair['private_key']
+        else:
+            password = CONF.validation.image_ssh_password
+            private_key = None
+        linux_client = remote_client.RemoteClient(ip_address, username,
+                                                  pkey=private_key,
+                                                  password=password)
+        try:
+            linux_client.validate_authentication()
+        except Exception as e:
+            message = ('Initializing SSH connection to %(ip)s failed. '
+                       'Error: %(error)s' % {'ip': ip_address,
+                                             'error': e})
+            caller = test_utils.find_test_caller()
+            if caller:
+                message = '(%s) %s' % (caller, message)
+            LOG.exception(message)
+            self._log_console_output()
+            raise
+
+        return linux_client
+
+    def _image_create(self, name, fmt, path,
+                      disk_format=None, properties=None):
+        if properties is None:
+            properties = {}
+        name = data_utils.rand_name('%s-' % name)
+        params = {
+            'name': name,
+            'container_format': fmt,
+            'disk_format': disk_format or fmt,
+        }
+        if CONF.image_feature_enabled.api_v1:
+            params['is_public'] = 'False'
+            params['properties'] = properties
+            params = {'headers': common_image.image_meta_to_headers(**params)}
+        else:
+            params['visibility'] = 'private'
+            # Additional properties are flattened out in the v2 API.
+            params.update(properties)
+        body = self.image_client.create_image(**params)
+        image = body['image'] if 'image' in body else body
+        self.addCleanup(self.image_client.delete_image, image['id'])
+        self.assertEqual("queued", image['status'])
+        with open(path, 'rb') as image_file:
+            if CONF.image_feature_enabled.api_v1:
+                self.image_client.update_image(image['id'], data=image_file)
+            else:
+                self.image_client.store_image_file(image['id'], image_file)
+        return image['id']
+
+    def glance_image_create(self):
+        img_path = CONF.scenario.img_dir + "/" + CONF.scenario.img_file
+        aki_img_path = CONF.scenario.img_dir + "/" + CONF.scenario.aki_img_file
+        ari_img_path = CONF.scenario.img_dir + "/" + CONF.scenario.ari_img_file
+        ami_img_path = CONF.scenario.img_dir + "/" + CONF.scenario.ami_img_file
+        img_container_format = CONF.scenario.img_container_format
+        img_disk_format = CONF.scenario.img_disk_format
+        img_properties = CONF.scenario.img_properties
+        LOG.debug("paths: img: %s, container_format: %s, disk_format: %s, "
+                  "properties: %s, ami: %s, ari: %s, aki: %s",
+                  img_path, img_container_format, img_disk_format,
+                  img_properties, ami_img_path, ari_img_path, aki_img_path)
+        try:
+            image = self._image_create('scenario-img',
+                                       img_container_format,
+                                       img_path,
+                                       disk_format=img_disk_format,
+                                       properties=img_properties)
+        except IOError:
+            LOG.debug("A qcow2 image was not found. Try to get a uec image.")
+            kernel = self._image_create('scenario-aki', 'aki', aki_img_path)
+            ramdisk = self._image_create('scenario-ari', 'ari', ari_img_path)
+            properties = {'kernel_id': kernel, 'ramdisk_id': ramdisk}
+            image = self._image_create('scenario-ami', 'ami',
+                                       path=ami_img_path,
+                                       properties=properties)
+        LOG.debug("image:%s", image)
+
+        return image
+
+    def _log_console_output(self, servers=None):
+        if not CONF.compute_feature_enabled.console_output:
+            LOG.debug('Console output not supported, cannot log')
+            return
+        if not servers:
+            servers = self.servers_client.list_servers()
+            servers = servers['servers']
+        for server in servers:
+            try:
+                console_output = self.servers_client.get_console_output(
+                    server['id'])['output']
+                LOG.debug('Console output for %s\nbody=\n%s',
+                          server['id'], console_output)
+            except lib_exc.NotFound:
+                LOG.debug("Server %s disappeared(deleted) while looking "
+                          "for the console log", server['id'])
+
+    def _log_net_info(self, exc):
+        # network debug is called as part of ssh init
+        if not isinstance(exc, lib_exc.SSHTimeout):
+            LOG.debug('Network information on a devstack host')
+
+    def create_server_snapshot(self, server, name=None):
+        # Glance client
+        _image_client = self.image_client
+        # Compute client
+        _images_client = self.compute_images_client
+        if name is None:
+            name = data_utils.rand_name(self.__class__.__name__ + 'snapshot')
+        LOG.debug("Creating a snapshot image for server: %s", server['name'])
+        image = _images_client.create_image(server['id'], name=name)
+        image_id = image.response['location'].split('images/')[1]
+        waiters.wait_for_image_status(_image_client, image_id, 'active')
+
+        self.addCleanup(_image_client.wait_for_resource_deletion,
+                        image_id)
+        self.addCleanup(test_utils.call_and_ignore_notfound_exc,
+                        _image_client.delete_image, image_id)
+
+        if CONF.image_feature_enabled.api_v1:
+            # In glance v1 the additional properties are stored in the headers.
+            resp = _image_client.check_image(image_id)
+            snapshot_image = common_image.get_image_meta_from_headers(resp)
+            image_props = snapshot_image.get('properties', {})
+        else:
+            # In glance v2 the additional properties are flattened.
+            snapshot_image = _image_client.show_image(image_id)
+            image_props = snapshot_image
+
+        bdm = image_props.get('block_device_mapping')
+        if bdm:
+            bdm = json.loads(bdm)
+            if bdm and 'snapshot_id' in bdm[0]:
+                snapshot_id = bdm[0]['snapshot_id']
+                self.addCleanup(
+                    self.snapshots_client.wait_for_resource_deletion,
+                    snapshot_id)
+                self.addCleanup(test_utils.call_and_ignore_notfound_exc,
+                                self.snapshots_client.delete_snapshot,
+                                snapshot_id)
+                waiters.wait_for_volume_resource_status(self.snapshots_client,
+                                                        snapshot_id,
+                                                        'available')
+        image_name = snapshot_image['name']
+        self.assertEqual(name, image_name)
+        LOG.debug("Created snapshot image %s for server %s",
+                  image_name, server['name'])
+        return snapshot_image
+
+    def nova_volume_attach(self, server, volume_to_attach):
+        volume = self.servers_client.attach_volume(
+            server['id'], volumeId=volume_to_attach['id'], device='/dev/%s'
+            % CONF.compute.volume_device_name)['volumeAttachment']
+        self.assertEqual(volume_to_attach['id'], volume['id'])
+        waiters.wait_for_volume_resource_status(self.volumes_client,
+                                                volume['id'], 'in-use')
+
+        # Return the updated volume after the attachment
+        return self.volumes_client.show_volume(volume['id'])['volume']
+
+    def nova_volume_detach(self, server, volume):
+        self.servers_client.detach_volume(server['id'], volume['id'])
+        waiters.wait_for_volume_resource_status(self.volumes_client,
+                                                volume['id'], 'available')
+
+        volume = self.volumes_client.show_volume(volume['id'])['volume']
+        self.assertEqual('available', volume['status'])
+
+    def rebuild_server(self, server_id, image=None,
+                       preserve_ephemeral=False, wait=True,
+                       rebuild_kwargs=None):
+        if image is None:
+            image = CONF.compute.image_ref
+
+        rebuild_kwargs = rebuild_kwargs or {}
+
+        LOG.debug("Rebuilding server (id: %s, image: %s, preserve eph: %s)",
+                  server_id, image, preserve_ephemeral)
+        self.servers_client.rebuild_server(
+            server_id=server_id, image_ref=image,
+            preserve_ephemeral=preserve_ephemeral,
+            **rebuild_kwargs)
+        if wait:
+            waiters.wait_for_server_status(self.servers_client,
+                                           server_id, 'ACTIVE')
+
+    def ping_ip_address(self, ip_address, should_succeed=True,
+                        ping_timeout=None, mtu=None):
+        timeout = ping_timeout or CONF.validation.ping_timeout
+        cmd = ['ping', '-c1', '-w1']
+
+        if mtu:
+            cmd += [
+                # don't fragment
+                '-M', 'do',
+                # ping receives just the size of ICMP payload
+                '-s', str(net_utils.get_ping_payload_size(mtu, 4))
+            ]
+        cmd.append(ip_address)
+
+        def ping():
+            proc = subprocess.Popen(cmd,
+                                    stdout=subprocess.PIPE,
+                                    stderr=subprocess.PIPE)
+            proc.communicate()
+
+            return (proc.returncode == 0) == should_succeed
+
+        caller = test_utils.find_test_caller()
+        LOG.debug('%(caller)s begins to ping %(ip)s in %(timeout)s sec and the'
+                  ' expected result is %(should_succeed)s', {
+                      'caller': caller, 'ip': ip_address, 'timeout': timeout,
+                      'should_succeed':
+                      'reachable' if should_succeed else 'unreachable'
+                  })
+        result = test_utils.call_until_true(ping, timeout, 1)
+        LOG.debug('%(caller)s finishes ping %(ip)s in %(timeout)s sec and the '
+                  'ping result is %(result)s', {
+                      'caller': caller, 'ip': ip_address, 'timeout': timeout,
+                      'result': 'expected' if result else 'unexpected'
+                  })
+        return result
+
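    # MTU arithmetic (editor's note): for IPv4,
    # net_utils.get_ping_payload_size(mtu, 4) returns mtu - 28
    # (20-byte IP header + 8-byte ICMP header), so with mtu=1500 the
    # command is 'ping -c1 -w1 -M do -s 1472 <ip>', which succeeds only
    # if an unfragmented full-MTU frame gets through.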
+    def check_vm_connectivity(self, ip_address,
+                              username=None,
+                              private_key=None,
+                              should_connect=True,
+                              mtu=None):
+        """Check server connectivity
+
+        :param ip_address: server to test against
+        :param username: server's ssh username
+        :param private_key: server's ssh private key to be used
+        :param should_connect: True/False indicates positive/negative test
+            positive - attempt ping and ssh
+            negative - attempt ping and fail if succeed
+        :param mtu: network MTU to use for connectivity validation
+
+        :raises: AssertError if the result of the connectivity check does
+            not match the value of the should_connect param
+        """
+        if should_connect:
+            msg = "Timed out waiting for %s to become reachable" % ip_address
+        else:
+            msg = "ip address %s is reachable" % ip_address
+        self.assertTrue(self.ping_ip_address(ip_address,
+                                             should_succeed=should_connect,
+                                             mtu=mtu),
+                        msg=msg)
+        if should_connect:
+            # no need to check ssh for negative connectivity
+            self.get_remote_client(ip_address, username, private_key)
+
+    def check_public_network_connectivity(self, ip_address, username,
+                                          private_key, should_connect=True,
+                                          msg=None, servers=None, mtu=None):
+        # The target login is assumed to have been configured for
+        # key-based authentication by cloud-init.
+        LOG.debug('checking network connections to IP %s with user: %s',
+                  ip_address, username)
+        try:
+            self.check_vm_connectivity(ip_address,
+                                       username,
+                                       private_key,
+                                       should_connect=should_connect,
+                                       mtu=mtu)
+        except Exception:
+            ex_msg = 'Public network connectivity check failed'
+            if msg:
+                ex_msg += ": " + msg
+            LOG.exception(ex_msg)
+            self._log_console_output(servers)
+            raise
+
+    def create_floating_ip(self, thing, pool_name=None):
+        """Create a floating IP and associates to a server on Nova"""
+
+        if not pool_name:
+            pool_name = CONF.network.floating_network_name
+        floating_ip = (self.compute_floating_ips_client.
+                       create_floating_ip(pool=pool_name)['floating_ip'])
+        self.addCleanup(test_utils.call_and_ignore_notfound_exc,
+                        self.compute_floating_ips_client.delete_floating_ip,
+                        floating_ip['id'])
+        self.compute_floating_ips_client.associate_floating_ip_to_server(
+            floating_ip['ip'], thing['id'])
+        return floating_ip
+
+    def create_timestamp(self, ip_address, dev_name=None, mount_path='/mnt',
+                         private_key=None):
+        ssh_client = self.get_remote_client(ip_address,
+                                            private_key=private_key)
+        if dev_name is not None:
+            ssh_client.make_fs(dev_name)
+            ssh_client.mount(dev_name, mount_path)
+        cmd_timestamp = 'sudo sh -c "date > %s/timestamp; sync"' % mount_path
+        ssh_client.exec_command(cmd_timestamp)
+        timestamp = ssh_client.exec_command('sudo cat %s/timestamp'
+                                            % mount_path)
+        if dev_name is not None:
+            ssh_client.umount(mount_path)
+        return timestamp
+
+    def get_timestamp(self, ip_address, dev_name=None, mount_path='/mnt',
+                      private_key=None):
+        ssh_client = self.get_remote_client(ip_address,
+                                            private_key=private_key)
+        if dev_name is not None:
+            ssh_client.mount(dev_name, mount_path)
+        timestamp = ssh_client.exec_command('sudo cat %s/timestamp'
+                                            % mount_path)
+        if dev_name is not None:
+            ssh_client.umount(mount_path)
+        return timestamp
+
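    # Typical pairing (editor's sketch): create_timestamp() writes a
    # marker before a disruptive operation; get_timestamp() re-reads it
    # afterwards to prove the data survived, e.g.:
    #
    #     ts = self.create_timestamp(ip, private_key=pkey)
    #     ...  # migrate / reboot / failover the server
    #     self.assertEqual(ts, self.get_timestamp(ip, private_key=pkey))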
+    def get_server_ip(self, server):
+        """Get the server fixed or floating IP.
+
+        Based on the configuration we are in, return the correct IP
+        address for validating that a guest is up.
+        """
+        if CONF.validation.connect_method == 'floating':
+            # The tests calling this method don't have a floating IP
+            # and can't make use of the validation resources, so this
+            # method creates the floating IP here.
+            return self.create_floating_ip(server)['ip']
+        elif CONF.validation.connect_method == 'fixed':
+            # Determine the network name to look for based on config or creds
+            # provider network resources.
+            if CONF.validation.network_for_ssh:
+                addresses = server['addresses'][
+                    CONF.validation.network_for_ssh]
+            else:
+                creds_provider = self._get_credentials_provider()
+                net_creds = creds_provider.get_primary_creds()
+                network = getattr(net_creds, 'network', None)
+                addresses = (server['addresses'][network['name']]
+                             if network else [])
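+            # Return the first fixed address that matches the configured
+            # IP version for ssh validation.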
+            for address in addresses:
+                if (address['version'] == CONF.validation.ip_version_for_ssh
+                        and address['OS-EXT-IPS:type'] == 'fixed'):
+                    return address['addr']
+            raise exceptions.ServerUnreachable(server_id=server['id'])
+        else:
+            raise lib_exc.InvalidConfiguration()
+
+
+class NetworkScenarioTest(ScenarioTest):
+    """Base class for network scenario tests.
+
+    This class provides helpers for network scenario tests using the
+    neutron API. Helpers from ancestors which use the nova network API
+    are overridden with the neutron API.
+
+    This class also enforces the use of Neutron instead of nova-network.
+    Subclassed tests will be skipped if Neutron is not enabled.
+
+    """
+
+    credentials = ['primary', 'admin']
+
+    @classmethod
+    def skip_checks(cls):
+        super(NetworkScenarioTest, cls).skip_checks()
+        if not CONF.service_available.neutron:
+            raise cls.skipException('Neutron not available')
+
+    def _create_network(self, networks_client=None,
+                        tenant_id=None,
+                        namestart='network-smoke-',
+                        port_security_enabled=True):
+        if not networks_client:
+            networks_client = self.networks_client
+        if not tenant_id:
+            tenant_id = networks_client.tenant_id
+        name = data_utils.rand_name(namestart)
+        network_kwargs = dict(name=name, tenant_id=tenant_id)
+        # The port security extension may not be enabled in the
+        # deployment, so check the config before trying to create the
+        # network with port_security_enabled set.
+        if CONF.network_feature_enabled.port_security:
+            network_kwargs['port_security_enabled'] = port_security_enabled
+        result = networks_client.create_network(**network_kwargs)
+        network = result['network']
+
+        self.assertEqual(network['name'], name)
+        self.addCleanup(test_utils.call_and_ignore_notfound_exc,
+                        networks_client.delete_network,
+                        network['id'])
+        return network
+
+    def _create_subnet(self, network, subnets_client=None,
+                       routers_client=None, namestart='subnet-smoke',
+                       **kwargs):
+        """Create a subnet for the given network
+
+        within the cidr block configured for tenant networks.
+        """
+        if not subnets_client:
+            subnets_client = self.subnets_client
+        if not routers_client:
+            routers_client = self.routers_client
+
+        def cidr_in_use(cidr, tenant_id):
+            """Check cidr existence
+
+            :returns: True if subnet with cidr already exist in tenant
+                  False else
+            """
+            cidr_in_use = self.admin_manager.subnets_client.list_subnets(
+                tenant_id=tenant_id, cidr=cidr)['subnets']
+            return len(cidr_in_use) != 0
+
+        ip_version = kwargs.pop('ip_version', 4)
+
+        if ip_version == 6:
+            tenant_cidr = netaddr.IPNetwork(
+                CONF.network.project_network_v6_cidr)
+            num_bits = CONF.network.project_network_v6_mask_bits
+        else:
+            tenant_cidr = netaddr.IPNetwork(CONF.network.project_network_cidr)
+            num_bits = CONF.network.project_network_mask_bits
+
+        result = None
+        str_cidr = None
+        # Repeatedly attempt subnet creation with sequential cidr
+        # blocks until an unallocated block is found.
+        for subnet_cidr in tenant_cidr.subnet(num_bits):
+            str_cidr = str(subnet_cidr)
+            if cidr_in_use(str_cidr, tenant_id=network['tenant_id']):
+                continue
+
+            subnet = dict(
+                name=data_utils.rand_name(namestart),
+                network_id=network['id'],
+                tenant_id=network['tenant_id'],
+                cidr=str_cidr,
+                ip_version=ip_version,
+                **kwargs
+            )
+            try:
+                result = subnets_client.create_subnet(**subnet)
+                break
+            except lib_exc.Conflict as e:
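+                # A concurrent test may have claimed this cidr after our
+                # in-use check; if so, try the next block.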
+                if 'overlaps with another subnet' not in six.text_type(e):
+                    raise
+        self.assertIsNotNone(result, 'Unable to allocate tenant network')
+
+        subnet = result['subnet']
+        self.assertEqual(subnet['cidr'], str_cidr)
+
+        self.addCleanup(test_utils.call_and_ignore_notfound_exc,
+                        subnets_client.delete_subnet, subnet['id'])
+
+        return subnet
+
+    def _get_server_port_id_and_ip4(self, server, ip_addr=None):
+        ports = self.admin_manager.ports_client.list_ports(
+            device_id=server['id'], fixed_ip=ip_addr)['ports']
+        # A port can have more than one IP address in some cases.
+        # If the network is dual-stack (IPv4 + IPv6), this port is associated
+        # with two subnets.
+        p_status = ['ACTIVE']
+        # NOTE(vsaienko) With Ironic, instances live on separate hardware
+        # servers. Neutron does not bind ports for Ironic instances, as a
+        # result the port remains in the DOWN state.
+        # TODO(vsaienko) remove once bug: #1599836 is resolved.
+        if getattr(CONF.service_available, 'ironic', False):
+            p_status.append('DOWN')
+        port_map = [(p["id"], fxip["ip_address"])
+                    for p in ports
+                    for fxip in p["fixed_ips"]
+                    if netutils.is_valid_ipv4(fxip["ip_address"])
+                    and p['status'] in p_status]
+        inactive = [p for p in ports if p['status'] != 'ACTIVE']
+        if inactive:
+            LOG.warning("Instance has ports that are not ACTIVE: %s", inactive)
+
+        self.assertNotEqual(0, len(port_map),
+                            "No IPv4 addresses found in: %s" % ports)
+        self.assertEqual(len(port_map), 1,
+                         "Found multiple IPv4 addresses: %s. "
+                         "Unable to determine which port to target."
+                         % port_map)
+        return port_map[0]
+
+    def _get_network_by_name(self, network_name):
+        net = self.admin_manager.networks_client.list_networks(
+            name=network_name)['networks']
+        self.assertNotEqual(len(net), 0,
+                            "Unable to get network by name: %s" % network_name)
+        return net[0]
+
+    def create_floating_ip(self, thing, external_network_id=None,
+                           port_id=None, client=None):
+        """Create a floating IP and associates to a resource/port on Neutron"""
+        if not external_network_id:
+            external_network_id = CONF.network.public_network_id
+        if not client:
+            client = self.floating_ips_client
+        if not port_id:
+            port_id, ip4 = self._get_server_port_id_and_ip4(thing)
+        else:
+            ip4 = None
+        result = client.create_floatingip(
+            floating_network_id=external_network_id,
+            port_id=port_id,
+            tenant_id=thing['tenant_id'],
+            fixed_ip_address=ip4
+        )
+        floating_ip = result['floatingip']
+        self.addCleanup(test_utils.call_and_ignore_notfound_exc,
+                        client.delete_floatingip,
+                        floating_ip['id'])
+        return floating_ip
+
+    def _associate_floating_ip(self, floating_ip, server):
+        port_id, _ = self._get_server_port_id_and_ip4(server)
+        kwargs = dict(port_id=port_id)
+        floating_ip = self.floating_ips_client.update_floatingip(
+            floating_ip['id'], **kwargs)['floatingip']
+        self.assertEqual(port_id, floating_ip['port_id'])
+        return floating_ip
+
+    def _disassociate_floating_ip(self, floating_ip):
+        """:param floating_ip: floating_ips_client.create_floatingip"""
+        kwargs = dict(port_id=None)
+        floating_ip = self.floating_ips_client.update_floatingip(
+            floating_ip['id'], **kwargs)['floatingip']
+        self.assertIsNone(floating_ip['port_id'])
+        return floating_ip
+
+    def check_floating_ip_status(self, floating_ip, status):
+        """Verifies floatingip reaches the given status
+
+        :param dict floating_ip: floating IP dict to check status
+        :param status: target status
+        :raises: AssertionError if status doesn't match
+        """
+        floatingip_id = floating_ip['id']
+
+        def refresh():
+            result = (self.floating_ips_client.
+                      show_floatingip(floatingip_id)['floatingip'])
+            return status == result['status']
+
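+        # The polling result is ignored here; the assertion below
+        # re-fetches the floating IP for a precise failure message.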
+        test_utils.call_until_true(refresh,
+                                   CONF.network.build_timeout,
+                                   CONF.network.build_interval)
+        floating_ip = self.floating_ips_client.show_floatingip(
+            floatingip_id)['floatingip']
+        self.assertEqual(status, floating_ip['status'],
+                         message="FloatingIP: {fp} is at status: {cst}. "
+                                 "failed  to reach status: {st}"
+                         .format(fp=floating_ip, cst=floating_ip['status'],
+                                 st=status))
+        LOG.info("FloatingIP: {fp} is at status: {st}"
+                 .format(fp=floating_ip, st=status))
+
+    def _check_tenant_network_connectivity(self, server,
+                                           username,
+                                           private_key,
+                                           should_connect=True,
+                                           servers_for_debug=None):
+        if not CONF.network.project_networks_reachable:
+            msg = 'Tenant networks not configured to be reachable.'
+            LOG.info(msg)
+            return
+        # The target login is assumed to have been configured for
+        # key-based authentication by cloud-init.
+        try:
+            for ip_addresses in server['addresses'].values():
+                for ip_address in ip_addresses:
+                    self.check_vm_connectivity(ip_address['addr'],
+                                               username,
+                                               private_key,
+                                               should_connect=should_connect)
+        except Exception as e:
+            LOG.exception('Tenant network connectivity check failed')
+            self._log_console_output(servers_for_debug)
+            self._log_net_info(e)
+            raise
+
+    def _check_remote_connectivity(self, source, dest, should_succeed=True,
+                                   nic=None):
+        """assert ping server via source ssh connection
+
+        Note: This is an internal method.  Use check_remote_connectivity
+        instead.
+
+        :param source: RemoteClient: an ssh connection from which to ping
+        :param dest: and IP to ping against
+        :param should_succeed: boolean should ping succeed or not
+        :param nic: specific network interface to ping from
+        """
+        def ping_remote():
+            try:
+                source.ping_host(dest, nic=nic)
+            except lib_exc.SSHExecCommandFailed:
+                LOG.warning('Failed to ping IP: %s via an ssh connection '
+                            'from: %s.', dest, source.ssh_client.host)
+                return not should_succeed
+            return should_succeed
+
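+        # Retry the ping once per second until ping_timeout expires.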
+        return test_utils.call_until_true(ping_remote,
+                                          CONF.validation.ping_timeout,
+                                          1)
+
+    def check_remote_connectivity(self, source, dest, should_succeed=True,
+                                  nic=None):
+        """assert ping server via source ssh connection
+
+        :param source: RemoteClient: an ssh connection from which to ping
+        :param dest: and IP to ping against
+        :param should_succeed: boolean should ping succeed or not
+        :param nic: specific network interface to ping from
+        """
+        result = self._check_remote_connectivity(source, dest, should_succeed,
+                                                 nic)
+        source_host = source.ssh_client.host
+        if should_succeed:
+            msg = "Timed out waiting for %s to become reachable from %s" \
+                % (dest, source_host)
+        else:
+            msg = "%s is reachable from %s" % (dest, source_host)
+        self.assertTrue(result, msg)
+
+    def _create_security_group(self, security_group_rules_client=None,
+                               tenant_id=None,
+                               namestart='secgroup-smoke',
+                               security_groups_client=None):
+        if security_group_rules_client is None:
+            security_group_rules_client = self.security_group_rules_client
+        if security_groups_client is None:
+            security_groups_client = self.security_groups_client
+        if tenant_id is None:
+            tenant_id = security_groups_client.tenant_id
+        secgroup = self._create_empty_security_group(
+            namestart=namestart, client=security_groups_client,
+            tenant_id=tenant_id)
+
+        # Add rules to the security group
+        rules = self._create_loginable_secgroup_rule(
+            security_group_rules_client=security_group_rules_client,
+            secgroup=secgroup,
+            security_groups_client=security_groups_client)
+        for rule in rules:
+            self.assertEqual(tenant_id, rule['tenant_id'])
+            self.assertEqual(secgroup['id'], rule['security_group_id'])
+        return secgroup
+
+    def _create_empty_security_group(self, client=None, tenant_id=None,
+                                     namestart='secgroup-smoke'):
+        """Create a security group without rules.
+
+        Default rules will be created:
+         - IPv4 egress to any
+         - IPv6 egress to any
+
+        :param tenant_id: secgroup will be created in this tenant
+        :returns: the created security group
+        """
+        if client is None:
+            client = self.security_groups_client
+        if not tenant_id:
+            tenant_id = client.tenant_id
+        sg_name = data_utils.rand_name(namestart)
+        sg_desc = sg_name + " description"
+        sg_dict = dict(name=sg_name,
+                       description=sg_desc)
+        sg_dict['tenant_id'] = tenant_id
+        result = client.create_security_group(**sg_dict)
+
+        secgroup = result['security_group']
+        self.assertEqual(secgroup['name'], sg_name)
+        self.assertEqual(tenant_id, secgroup['tenant_id'])
+        self.assertEqual(secgroup['description'], sg_desc)
+
+        self.addCleanup(test_utils.call_and_ignore_notfound_exc,
+                        client.delete_security_group, secgroup['id'])
+        return secgroup
+
+    def _default_security_group(self, client=None, tenant_id=None):
+        """Get default secgroup for given tenant_id.
+
+        :returns: default secgroup for given tenant
+        """
+        if client is None:
+            client = self.security_groups_client
+        if not tenant_id:
+            tenant_id = client.tenant_id
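+        # The list response has a single 'security_groups' key; take its
+        # value without hard-coding the key name.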
+        sgs = [
+            sg for sg in list(client.list_security_groups().values())[0]
+            if sg['tenant_id'] == tenant_id and sg['name'] == 'default'
+        ]
+        msg = "No default security group for tenant %s." % (tenant_id)
+        self.assertGreater(len(sgs), 0, msg)
+        return sgs[0]
+
+    def _create_security_group_rule(self, secgroup=None,
+                                    sec_group_rules_client=None,
+                                    tenant_id=None,
+                                    security_groups_client=None, **kwargs):
+        """Create a rule from a dictionary of rule parameters.
+
+        Create a rule in a secgroup. If secgroup is not defined, the
+        default secgroup for tenant_id will be used.
+
+        :param secgroup: the security group.
+        :param tenant_id: if secgroup not passed -- the tenant in which to
+            search for default secgroup
+        :param kwargs: a dictionary containing rule parameters:
+            for example, to allow incoming ssh:
+            rule = {
+                    direction: 'ingress',
+                    protocol: 'tcp',
+                    port_range_min: 22,
+                    port_range_max: 22
+                    }
+        """
+        if sec_group_rules_client is None:
+            sec_group_rules_client = self.security_group_rules_client
+        if security_groups_client is None:
+            security_groups_client = self.security_groups_client
+        if not tenant_id:
+            tenant_id = security_groups_client.tenant_id
+        if secgroup is None:
+            secgroup = self._default_security_group(
+                client=security_groups_client, tenant_id=tenant_id)
+
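+        # Start from the group's identifiers and overlay the
+        # caller-supplied rule parameters.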
+        ruleset = dict(security_group_id=secgroup['id'],
+                       tenant_id=secgroup['tenant_id'])
+        ruleset.update(kwargs)
+
+        sg_rule = sec_group_rules_client.create_security_group_rule(**ruleset)
+        sg_rule = sg_rule['security_group_rule']
+
+        self.assertEqual(secgroup['tenant_id'], sg_rule['tenant_id'])
+        self.assertEqual(secgroup['id'], sg_rule['security_group_id'])
+
+        return sg_rule
+
+    def _create_loginable_secgroup_rule(self, security_group_rules_client=None,
+                                        secgroup=None,
+                                        security_groups_client=None):
+        """Create loginable security group rule
+
+        This function will create:
+        1. egress and ingress tcp port 22 allow rule in order to allow ssh
+        access for ipv4.
+        2. egress and ingress ipv6 icmp allow rule, in order to allow icmpv6.
+        3. egress and ingress ipv4 icmp allow rule, in order to allow icmpv4.
+        """
+
+        if security_group_rules_client is None:
+            security_group_rules_client = self.security_group_rules_client
+        if security_groups_client is None:
+            security_groups_client = self.security_groups_client
+        rules = []
+        rulesets = [
+            dict(
+                # ssh
+                protocol='tcp',
+                port_range_min=22,
+                port_range_max=22,
+            ),
+            dict(
+                # ping
+                protocol='icmp',
+            ),
+            dict(
+                # ipv6-icmp for ping6
+                protocol='icmp',
+                ethertype='IPv6',
+            )
+        ]
+        sec_group_rules_client = security_group_rules_client
+        for ruleset in rulesets:
+            for r_direction in ['ingress', 'egress']:
+                ruleset['direction'] = r_direction
+                try:
+                    sg_rule = self._create_security_group_rule(
+                        sec_group_rules_client=sec_group_rules_client,
+                        secgroup=secgroup,
+                        security_groups_client=security_groups_client,
+                        **ruleset)
+                except lib_exc.Conflict as ex:
+                    # if the rule already exists, skip it and continue
+                    msg = 'Security group rule already exists'
+                    if msg not in ex._error_string:
+                        raise ex
+                else:
+                    self.assertEqual(r_direction, sg_rule['direction'])
+                    rules.append(sg_rule)
+
+        return rules
+
+    def _get_router(self, client=None, tenant_id=None):
+        """Retrieve a router for the given tenant id.
+
+        If a public router has been configured, it will be returned.
+
+        If a public router has not been configured, but a public
+        network has, a tenant router will be created and returned that
+        routes traffic to the public network.
+        """
+        if not client:
+            client = self.routers_client
+        if not tenant_id:
+            tenant_id = client.tenant_id
+        router_id = CONF.network.public_router_id
+        network_id = CONF.network.public_network_id
+        if router_id:
+            body = client.show_router(router_id)
+            return body['router']
+        elif network_id:
+            router = self._create_router(client, tenant_id)
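+            # Point the new tenant router at the public network as its
+            # external gateway.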
+            kwargs = {'external_gateway_info': dict(network_id=network_id)}
+            router = client.update_router(router['id'], **kwargs)['router']
+            return router
+        else:
+            raise Exception("Neither of 'public_router_id' or "
+                            "'public_network_id' has been defined.")
+
+    def _create_router(self, client=None, tenant_id=None,
+                       namestart='router-smoke'):
+        if not client:
+            client = self.routers_client
+        if not tenant_id:
+            tenant_id = client.tenant_id
+        name = data_utils.rand_name(namestart)
+        result = client.create_router(name=name,
+                                      admin_state_up=True,
+                                      tenant_id=tenant_id)
+        router = result['router']
+        self.assertEqual(router['name'], name)
+        self.addCleanup(test_utils.call_and_ignore_notfound_exc,
+                        client.delete_router,
+                        router['id'])
+        return router
+
+    def _update_router_admin_state(self, router, admin_state_up):
+        kwargs = dict(admin_state_up=admin_state_up)
+        router = self.routers_client.update_router(
+            router['id'], **kwargs)['router']
+        self.assertEqual(admin_state_up, router['admin_state_up'])
+
+    def create_networks(self, networks_client=None,
+                        routers_client=None, subnets_client=None,
+                        tenant_id=None, dns_nameservers=None,
+                        port_security_enabled=True):
+        """Create a network with a subnet connected to a router.
+
+        The baremetal driver is a special case since all nodes are
+        on the same shared network.
+
+        :param tenant_id: id of tenant to create resources in.
+        :param dns_nameservers: list of dns servers to send to subnet.
+        :returns: network, subnet, router
+        """
+        if CONF.network.shared_physical_network:
+            # NOTE(Shrews): This exception is for environments where tenant
+            # credential isolation is available, but network separation is
+            # not (the current baremetal case). Likely can be removed when
+            # test account mgmt is reworked:
+            # https://blueprints.launchpad.net/tempest/+spec/test-accounts
+            if not CONF.compute.fixed_network_name:
+                m = 'fixed_network_name must be specified in config'
+                raise lib_exc.InvalidConfiguration(m)
+            network = self._get_network_by_name(
+                CONF.compute.fixed_network_name)
+            router = None
+            subnet = None
+        else:
+            network = self._create_network(
+                networks_client=networks_client,
+                tenant_id=tenant_id,
+                port_security_enabled=port_security_enabled)
+            router = self._get_router(client=routers_client,
+                                      tenant_id=tenant_id)
+            subnet_kwargs = dict(network=network,
+                                 subnets_client=subnets_client,
+                                 routers_client=routers_client)
+            # use explicit check because empty list is a valid option
+            if dns_nameservers is not None:
+                subnet_kwargs['dns_nameservers'] = dns_nameservers
+            subnet = self._create_subnet(**subnet_kwargs)
+            if not routers_client:
+                routers_client = self.routers_client
+            router_id = router['id']
+            routers_client.add_router_interface(router_id,
+                                                subnet_id=subnet['id'])
+
+            # save a cleanup job to remove this association between
+            # router and subnet
+            self.addCleanup(test_utils.call_and_ignore_notfound_exc,
+                            routers_client.remove_router_interface, router_id,
+                            subnet_id=subnet['id'])
+        return network, subnet, router
diff --git a/manila_tempest_tests/tests/scenario/manager_share.py b/manila_tempest_tests/tests/scenario/manager_share.py
index b29449a..6c25785 100644
--- a/manila_tempest_tests/tests/scenario/manager_share.py
+++ b/manila_tempest_tests/tests/scenario/manager_share.py
@@ -18,13 +18,13 @@
 
 from tempest import config
 from tempest.lib.common.utils import data_utils
-from tempest.scenario import manager
 
 from manila_tempest_tests.common import constants
 from manila_tempest_tests.common import remote_client
 from manila_tempest_tests.services.share.json import shares_client
 from manila_tempest_tests.services.share.v2.json import (
     shares_client as shares_v2_client)
+from manila_tempest_tests.tests.scenario import manager
 
 CONF = config.CONF
 LOG = log.getLogger(__name__)
@@ -212,7 +212,8 @@
         if isinstance(server_or_ip, six.string_types):
             ip = server_or_ip
         else:
-            addr = server_or_ip['addresses'][CONF.compute.network_for_ssh][0]
+            addr = server_or_ip['addresses'][
+                CONF.validation.network_for_ssh][0]
             ip = addr['addr']
 
         # NOTE(u_glide): Both options (pkey and password) are required here to
diff --git a/manila_tempest_tests/tests/scenario/test_share_basic_ops.py b/manila_tempest_tests/tests/scenario/test_share_basic_ops.py
index cd77294..835e325 100644
--- a/manila_tempest_tests/tests/scenario/test_share_basic_ops.py
+++ b/manila_tempest_tests/tests/scenario/test_share_basic_ops.py
@@ -312,7 +312,7 @@
         instance = self.boot_instance(wait_until="BUILD")
         self.create_share()
         instance = self.wait_for_active_instance(instance["id"])
-        self.share = self.shares_client.get_share(self.share['id'])
+        self.share = self.shares_admin_v2_client.get_share(self.share['id'])
 
         default_type = self.shares_v2_client.list_share_types(
             default=True)['share_type']