Add access-rules tests to improve the coverage

Add test to make sure access rule stays intact
after share replica is promoted.
Add test to make sure access rule can't be added
when the share replica status is error.
Add test to make sure access rule can't be added
to the share which failed to build successfully.
Add asserts to assert the state of a new access rule
in existing access rule tests.

Partially-implements: bp fix-and-improve-access-rules

Change-Id: Ic702c0374c4e220553d833dfea167ed8eb38e45a
diff --git a/manila_tempest_tests/tests/api/base.py b/manila_tempest_tests/tests/api/base.py
index 7837195..9cf3e69 100644
--- a/manila_tempest_tests/tests/api/base.py
+++ b/manila_tempest_tests/tests/api/base.py
@@ -495,6 +495,7 @@
         data = []
         for d in share_data_list:
             client = d["kwargs"].pop("client", cls.shares_v2_client)
+            wait_for_status = d["kwargs"].pop("wait_for_status", True)
             local_d = {
                 "args": d["args"],
                 "kwargs": copy.deepcopy(d["kwargs"]),
@@ -504,10 +505,13 @@
                 *local_d["args"], **local_d["kwargs"])
             local_d["cnt"] = 0
             local_d["available"] = False
+            local_d["wait_for_status"] = wait_for_status
             data.append(local_d)
 
         while not all(d["available"] for d in data):
             for d in data:
+                if not d["wait_for_status"]:
+                    d["available"] = True
                 if d["available"]:
                     continue
                 client = d["kwargs"]["client"]
@@ -704,6 +708,33 @@
             status_attr="replica_state")
         return replica
 
+    def _get_access_rule_data_from_config(self):
+        """Get the first available access type/to combination from config.
+
+        This method opportunistically picks the first configured protocol
+        to create the share. Do not use this method in tests where you need
+        to test depth and breadth in the access types and access recipients.
+        """
+        protocol = self.shares_v2_client.share_protocol
+
+        if protocol in CONF.share.enable_ip_rules_for_protocols:
+            access_type = "ip"
+            access_to = utils.rand_ip()
+        elif protocol in CONF.share.enable_user_rules_for_protocols:
+            access_type = "user"
+            access_to = CONF.share.username_for_user_rules
+        elif protocol in CONF.share.enable_cert_rules_for_protocols:
+            access_type = "cert"
+            access_to = "client3.com"
+        elif protocol in CONF.share.enable_cephx_rules_for_protocols:
+            access_type = "cephx"
+            access_to = "eve"
+        else:
+            message = "Unrecognized protocol and access rules configuration."
+            raise self.skipException(message)
+
+        return access_type, access_to
+
     @classmethod
     def create_share_network(cls, client=None,
                              cleanup_in_class=False, **kwargs):
diff --git a/manila_tempest_tests/tests/api/test_replication.py b/manila_tempest_tests/tests/api/test_replication.py
index 1738e87..1a1a7c9 100644
--- a/manila_tempest_tests/tests/api/test_replication.py
+++ b/manila_tempest_tests/tests/api/test_replication.py
@@ -21,7 +21,6 @@
 from manila_tempest_tests.common import constants
 from manila_tempest_tests import share_exceptions
 from manila_tempest_tests.tests.api import base
-from manila_tempest_tests import utils
 
 CONF = config.CONF
 _MIN_SUPPORTED_MICROVERSION = '2.11'
@@ -72,9 +71,6 @@
         cls.instance_id1 = cls._get_instance(cls.shares[0])
         cls.instance_id2 = cls._get_instance(cls.shares[1])
 
-        cls.access_type = "ip"
-        cls.access_to = utils.rand_ip()
-
     @classmethod
     def _get_instance(cls, share):
         share_instances = cls.admin_client.get_instances_of_share(share["id"])
@@ -107,24 +103,36 @@
         return [replica for replica in replica_list
                 if replica['replica_state'] == r_state]
 
-    def _verify_config_and_set_access_rule_data(self):
-        """Verify the access rule configuration is enabled for NFS.
+    def _verify_in_sync_replica_promotion(self, share, original_replica):
+        # Verify that 'in-sync' replica has been promoted successfully
 
-        Set the data after verification.
-        """
-        protocol = self.shares_v2_client.share_protocol
+        # NOTE(Yogi1): Cleanup needs to be disabled for replica that is
+        # being promoted since it will become the 'primary'/'active' replica.
+        replica = self.create_share_replica(share["id"], self.replica_zone,
+                                            cleanup=False)
+        # Wait for replica state to update after creation
+        self.shares_v2_client.wait_for_share_replica_status(
+            replica['id'], constants.REPLICATION_STATE_IN_SYNC,
+            status_attr='replica_state')
+        # Promote the first in_sync replica to active state
+        promoted_replica = self.promote_share_replica(replica['id'])
+        # Delete the demoted replica so promoted replica can be cleaned
+        # during the cleanup of the share.
+        self.addCleanup(self.delete_share_replica, original_replica['id'])
+        self._verify_active_replica_count(share["id"])
+        # Verify the replica_state for promoted replica
+        promoted_replica = self.shares_v2_client.get_share_replica(
+            promoted_replica["id"])
+        self.assertEqual(constants.REPLICATION_STATE_ACTIVE,
+                         promoted_replica["replica_state"])
 
-        # TODO(Yogi1): Add access rules for other protocols.
-        if not ((protocol.lower() == 'nfs') and
-                (protocol in CONF.share.enable_ip_rules_for_protocols) and
-                CONF.share.enable_ip_rules_for_protocols):
-            message = "IP access rules are not supported for this protocol."
-            raise self.skipException(message)
-
-        access_type = "ip"
-        access_to = utils.rand_ip()
-
-        return access_type, access_to
+    def _check_skip_promotion_tests(self):
+        # Check if the replication type is right for replica promotion tests
+        if (self.replication_type
+                not in constants.REPLICATION_PROMOTION_CHOICES):
+            msg = "Option backend_replication_type should be one of (%s)!"
+            raise self.skipException(
+                msg % ','.join(constants.REPLICATION_PROMOTION_CHOICES))
 
     @tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
     def test_add_delete_share_replica(self):
@@ -137,7 +145,7 @@
     @tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
     def test_add_access_rule_create_replica_delete_rule(self):
         # Add access rule to the share
-        access_type, access_to = self._verify_config_and_set_access_rule_data()
+        access_type, access_to = self._get_access_rule_data_from_config()
         rule = self.shares_v2_client.create_access_rule(
             self.shares[0]["id"], access_type, access_to, 'ro')
         self.shares_v2_client.wait_for_access_rule_status(
@@ -159,7 +167,7 @@
 
     @tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
     def test_create_replica_add_access_rule_delete_replica(self):
-        access_type, access_to = self._verify_config_and_set_access_rule_data()
+        access_type, access_to = self._get_access_rule_data_from_config()
         # Create the replica
         share_replica = self._verify_create_replica()
 
@@ -208,42 +216,40 @@
     @tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
     def test_promote_in_sync_share_replica(self):
         # Test promote 'in_sync' share_replica to 'active' state
-        if (self.replication_type
-                not in constants.REPLICATION_PROMOTION_CHOICES):
-            msg = "Option backend_replication_type should be one of (%s)!"
-            raise self.skipException(
-                msg % ','.join(constants.REPLICATION_PROMOTION_CHOICES))
+        self._check_skip_promotion_tests()
         share = self.create_shares([self.creation_data])[0]
         original_replica = self.shares_v2_client.list_share_replicas(
             share["id"])[0]
-        # NOTE(Yogi1): Cleanup needs to be disabled for replica that is
-        # being promoted since it will become the 'primary'/'active' replica.
-        replica = self.create_share_replica(share["id"], self.replica_zone,
-                                            cleanup=False)
-        # Wait for replica state to update after creation
-        self.shares_v2_client.wait_for_share_replica_status(
-            replica['id'], constants.REPLICATION_STATE_IN_SYNC,
-            status_attr='replica_state')
-        # Promote the first in_sync replica to active state
-        promoted_replica = self.promote_share_replica(replica['id'])
-        # Delete the demoted replica so promoted replica can be cleaned
-        # during the cleanup of the share.
-        self.addCleanup(self.delete_share_replica, original_replica['id'])
-        self._verify_active_replica_count(share["id"])
-        # Verify the replica_state for promoted replica
-        promoted_replica = self.shares_v2_client.get_share_replica(
-            promoted_replica["id"])
-        self.assertEqual(constants.REPLICATION_STATE_ACTIVE,
-                         promoted_replica["replica_state"])
+        self._verify_in_sync_replica_promotion(share, original_replica)
+
+    @tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
+    def test_add_rule_promote_share_replica_verify_rule(self):
+        # Verify the access rule stays intact after share replica promotion
+        self._check_skip_promotion_tests()
+
+        share = self.create_shares([self.creation_data])[0]
+        # Add access rule
+        access_type, access_to = self._get_access_rule_data_from_config()
+        rule = self.shares_v2_client.create_access_rule(
+            share["id"], access_type, access_to, 'ro')
+        self.shares_v2_client.wait_for_access_rule_status(
+            share["id"], rule["id"], constants.RULE_STATE_ACTIVE)
+
+        original_replica = self.shares_v2_client.list_share_replicas(
+            share["id"])[0]
+        self._verify_in_sync_replica_promotion(share, original_replica)
+
+        # verify rule's values
+        rules_list = self.shares_v2_client.list_access_rules(share["id"])
+        self.assertEqual(1, len(rules_list))
+        self.assertEqual(access_type, rules_list[0]["access_type"])
+        self.assertEqual(access_to, rules_list[0]["access_to"])
+        self.assertEqual('ro', rules_list[0]["access_level"])
 
     @tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
     def test_promote_and_promote_back(self):
         # Test promote back and forth between 2 share replicas
-        if (self.replication_type
-                not in constants.REPLICATION_PROMOTION_CHOICES):
-            msg = "Option backend_replication_type should be one of (%s)!"
-            raise self.skipException(
-                msg % ','.join(constants.REPLICATION_PROMOTION_CHOICES))
+        self._check_skip_promotion_tests()
 
         # Create a new share
         share = self.create_shares([self.creation_data])[0]
diff --git a/manila_tempest_tests/tests/api/test_replication_negative.py b/manila_tempest_tests/tests/api/test_replication_negative.py
index 48b8d5c..c4d08aa 100644
--- a/manila_tempest_tests/tests/api/test_replication_negative.py
+++ b/manila_tempest_tests/tests/api/test_replication_negative.py
@@ -159,6 +159,22 @@
         # Try promoting the replica
         self.shares_v2_client.promote_share_replica(replica['id'])
 
+    @tc.attr(base.TAG_NEGATIVE, base.TAG_API_WITH_BACKEND)
+    def test_add_access_rule_share_replica_error_status(self):
+        access_type, access_to = self._get_access_rule_data_from_config()
+        # Create the replica
+        share_replica = self.create_share_replica(self.share1["id"],
+                                                  self.replica_zone,
+                                                  cleanup_in_class=False)
+        # Reset the replica status to error
+        self.admin_client.reset_share_replica_status(
+            share_replica['id'], constants.STATUS_ERROR)
+
+        # Verify access rule cannot be added
+        self.assertRaises(lib_exc.BadRequest,
+                          self.admin_client.create_access_rule,
+                          self.share1["id"], access_type, access_to, 'ro')
+
 
 @testtools.skipUnless(CONF.share.run_replication_tests,
                       'Replication tests are disabled.')
diff --git a/manila_tempest_tests/tests/api/test_rules.py b/manila_tempest_tests/tests/api/test_rules.py
index 180ed79..024590d 100644
--- a/manila_tempest_tests/tests/api/test_rules.py
+++ b/manila_tempest_tests/tests/api/test_rules.py
@@ -13,6 +13,8 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
+import itertools
+
 import ddt
 from tempest import config
 from tempest.lib import exceptions as lib_exc
@@ -44,6 +46,12 @@
     for key in ('deleted', 'deleted_at', 'instance_mappings'):
         self.assertNotIn(key, rule.keys())
 
+    # rules must start out in 'new' until 2.28 & 'queued_to_apply' after 2.28
+    if utils.is_microversion_le(version, "2.27"):
+        self.assertEqual("new", rule['state'])
+    else:
+        self.assertEqual("queued_to_apply", rule['state'])
+
     if utils.is_microversion_le(version, '2.9'):
         self.shares_client.wait_for_access_rule_status(
             self.share["id"], rule["id"], "active")
@@ -51,6 +59,11 @@
         self.shares_v2_client.wait_for_share_status(
             self.share["id"], "active", status_attr='access_rules_status',
             version=version)
+        # If the 'access_rules_status' transitions to 'active',
+        # rule state must too
+        rules = self.shares_v2_client.list_access_rules(self.share['id'])
+        rule = [r for r in rules if r['id'] == rule['id']][0]
+        self.assertEqual("active", rule['state'])
 
     if utils.is_microversion_eq(version, '1.0'):
         self.shares_client.delete_access_rule(self.share["id"], rule["id"])
@@ -79,7 +92,7 @@
         cls.access_to = "2.2.2.2"
 
     @tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
-    @ddt.data('1.0', '2.9', LATEST_MICROVERSION)
+    @ddt.data(*set(['1.0', '2.9', '2.27', '2.28', LATEST_MICROVERSION]))
     def test_create_delete_access_rules_with_one_ip(self, version):
 
         # test data
@@ -98,6 +111,12 @@
         for key in ('deleted', 'deleted_at', 'instance_mappings'):
             self.assertNotIn(key, rule.keys())
 
+        # rules must start out in 'new' until 2.28 & 'queued_to_apply' after
+        if utils.is_microversion_le(version, "2.27"):
+            self.assertEqual("new", rule['state'])
+        else:
+            self.assertEqual("queued_to_apply", rule['state'])
+
         if utils.is_microversion_eq(version, '1.0'):
             self.shares_client.wait_for_access_rule_status(
                 self.share["id"], rule["id"], "active")
@@ -121,7 +140,7 @@
                 rule_id=rule["id"], share_id=self.share['id'], version=version)
 
     @tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
-    @ddt.data('1.0', '2.9', LATEST_MICROVERSION)
+    @ddt.data(*set(['1.0', '2.9', '2.27', '2.28', LATEST_MICROVERSION]))
     def test_create_delete_access_rule_with_cidr(self, version):
 
         # test data
@@ -140,6 +159,12 @@
             self.assertNotIn(key, rule.keys())
         self.assertEqual('rw', rule['access_level'])
 
+        # rules must start out in 'new' until 2.28 & 'queued_to_apply' after
+        if utils.is_microversion_le(version, "2.27"):
+            self.assertEqual("new", rule['state'])
+        else:
+            self.assertEqual("queued_to_apply", rule['state'])
+
         if utils.is_microversion_eq(version, '1.0'):
             self.shares_client.wait_for_access_rule_status(
                 self.share["id"], rule["id"], "active")
@@ -166,7 +191,7 @@
     @testtools.skipIf(
         "nfs" not in CONF.share.enable_ro_access_level_for_protocols,
         "RO access rule tests are disabled for NFS protocol.")
-    @ddt.data('1.0', '2.9', LATEST_MICROVERSION)
+    @ddt.data(*set(['1.0', '2.9', '2.27', '2.28', LATEST_MICROVERSION]))
     def test_create_delete_ro_access_rule(self, client_name):
         _create_delete_ro_access_rule(self, client_name)
 
@@ -179,7 +204,7 @@
     @testtools.skipIf(
         "cifs" not in CONF.share.enable_ro_access_level_for_protocols,
         "RO access rule tests are disabled for CIFS protocol.")
-    @ddt.data('1.0', '2.9', LATEST_MICROVERSION)
+    @ddt.data(*set(['1.0', '2.9', '2.27', '2.28', LATEST_MICROVERSION]))
     def test_create_delete_ro_access_rule(self, version):
         _create_delete_ro_access_rule(self, version)
 
@@ -201,7 +226,7 @@
         cls.access_to = CONF.share.username_for_user_rules
 
     @tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
-    @ddt.data('1.0', '2.9', LATEST_MICROVERSION)
+    @ddt.data(*set(['1.0', '2.9', '2.27', '2.28', LATEST_MICROVERSION]))
     def test_create_delete_user_rule(self, version):
 
         # create rule
@@ -217,6 +242,12 @@
         for key in ('deleted', 'deleted_at', 'instance_mappings'):
             self.assertNotIn(key, rule.keys())
 
+        # rules must start out in 'new' until 2.28 & 'queued_to_apply' after
+        if utils.is_microversion_le(version, "2.27"):
+            self.assertEqual("new", rule['state'])
+        else:
+            self.assertEqual("queued_to_apply", rule['state'])
+
         if utils.is_microversion_eq(version, '1.0'):
             self.shares_client.wait_for_access_rule_status(
                 self.share["id"], rule["id"], "active")
@@ -243,7 +274,7 @@
     @testtools.skipIf(
         "nfs" not in CONF.share.enable_ro_access_level_for_protocols,
         "RO access rule tests are disabled for NFS protocol.")
-    @ddt.data('1.0', '2.9', LATEST_MICROVERSION)
+    @ddt.data(*set(['1.0', '2.9', '2.27', '2.28', LATEST_MICROVERSION]))
     def test_create_delete_ro_access_rule(self, version):
         _create_delete_ro_access_rule(self, version)
 
@@ -256,7 +287,7 @@
     @testtools.skipIf(
         "cifs" not in CONF.share.enable_ro_access_level_for_protocols,
         "RO access rule tests are disabled for CIFS protocol.")
-    @ddt.data('1.0', '2.9', LATEST_MICROVERSION)
+    @ddt.data(*set(['1.0', '2.9', '2.27', '2.28', LATEST_MICROVERSION]))
     def test_create_delete_ro_access_rule(self, version):
         _create_delete_ro_access_rule(self, version)
 
@@ -280,7 +311,7 @@
         cls.access_to = "client1.com"
 
     @tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
-    @ddt.data('1.0', '2.9', LATEST_MICROVERSION)
+    @ddt.data(*set(['1.0', '2.9', '2.27', '2.28', LATEST_MICROVERSION]))
     def test_create_delete_cert_rule(self, version):
 
         # create rule
@@ -296,6 +327,12 @@
         for key in ('deleted', 'deleted_at', 'instance_mappings'):
             self.assertNotIn(key, rule.keys())
 
+        # rules must start out in 'new' until 2.28 & 'queued_to_apply' after
+        if utils.is_microversion_le(version, "2.27"):
+            self.assertEqual("new", rule['state'])
+        else:
+            self.assertEqual("queued_to_apply", rule['state'])
+
         if utils.is_microversion_eq(version, '1.0'):
             self.shares_client.wait_for_access_rule_status(
                 self.share["id"], rule["id"], "active")
@@ -322,7 +359,7 @@
     @testtools.skipIf(
         "glusterfs" not in CONF.share.enable_ro_access_level_for_protocols,
         "RO access rule tests are disabled for GLUSTERFS protocol.")
-    @ddt.data('1.0', '2.9', LATEST_MICROVERSION)
+    @ddt.data(*set(['1.0', '2.9', '2.27', '2.28', LATEST_MICROVERSION]))
     def test_create_delete_cert_ro_access_rule(self, version):
         if utils.is_microversion_eq(version, '1.0'):
             rule = self.shares_client.create_access_rule(
@@ -336,6 +373,12 @@
         for key in ('deleted', 'deleted_at', 'instance_mappings'):
             self.assertNotIn(key, rule.keys())
 
+        # rules must start out in 'new' until 2.28 & 'queued_to_apply' after
+        if utils.is_microversion_le(version, "2.27"):
+            self.assertEqual("new", rule['state'])
+        else:
+            self.assertEqual("queued_to_apply", rule['state'])
+
         if utils.is_microversion_eq(version, '1.0'):
             self.shares_client.wait_for_access_rule_status(
                 self.share["id"], rule["id"], "active")
@@ -377,10 +420,13 @@
         cls.access_to = "bob"
 
     @tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
-    @ddt.data("alice", "alice_bob", "alice bob")
-    def test_create_delete_cephx_rule(self, access_to):
+    @ddt.data(*itertools.product(
+        set(['2.13', '2.27', '2.28', LATEST_MICROVERSION]),
+        ("alice", "alice_bob", "alice bob")))
+    @ddt.unpack
+    def test_create_delete_cephx_rule(self, version, access_to):
         rule = self.shares_v2_client.create_access_rule(
-            self.share["id"], self.access_type, access_to)
+            self.share["id"], self.access_type, access_to, version=version)
 
         self.assertEqual('rw', rule['access_level'])
         for key in ('deleted', 'deleted_at', 'instance_mappings'):
@@ -388,7 +434,8 @@
         self.shares_v2_client.wait_for_access_rule_status(
             self.share["id"], rule["id"], "active")
 
-        self.shares_v2_client.delete_access_rule(self.share["id"], rule["id"])
+        self.shares_v2_client.delete_access_rule(
+            self.share["id"], rule["id"], version=version)
         self.shares_v2_client.wait_for_resource_deletion(
             rule_id=rule["id"], share_id=self.share['id'])
 
@@ -429,7 +476,7 @@
         cls.share = cls.create_share()
 
     @tc.attr(base.TAG_POSITIVE, base.TAG_API_WITH_BACKEND)
-    @ddt.data('1.0', '2.9', LATEST_MICROVERSION)
+    @ddt.data(*set(['1.0', '2.9', '2.27', '2.28', LATEST_MICROVERSION]))
     def test_list_access_rules(self, version):
         if (utils.is_microversion_lt(version, '2.13') and
                 CONF.share.enable_cephx_rules_for_protocols):
@@ -445,6 +492,11 @@
             rule = self.shares_v2_client.create_access_rule(
                 self.share["id"], self.access_type, self.access_to,
                 version=version)
+        # rules must start out in 'new' until 2.28 & 'queued_to_apply' after
+        if utils.is_microversion_le(version, "2.27"):
+            self.assertEqual("new", rule['state'])
+        else:
+            self.assertEqual("queued_to_apply", rule['state'])
 
         if utils.is_microversion_eq(version, '1.0'):
             self.shares_client.wait_for_access_rule_status(
@@ -499,7 +551,7 @@
                 rule_id=rule["id"], share_id=self.share['id'], version=version)
 
     @tc.attr(base.TAG_POSITIVE, base.TAG_API_WITH_BACKEND)
-    @ddt.data('1.0', '2.9', LATEST_MICROVERSION)
+    @ddt.data(*set(['1.0', '2.9', '2.27', '2.28', LATEST_MICROVERSION]))
     def test_access_rules_deleted_if_share_deleted(self, version):
         if (utils.is_microversion_lt(version, '2.13') and
                 CONF.share.enable_cephx_rules_for_protocols):
@@ -519,6 +571,12 @@
                 share["id"], self.access_type, self.access_to,
                 version=version)
 
+        # rules must start out in 'new' until 2.28 & 'queued_to_apply' after
+        if utils.is_microversion_le(version, "2.27"):
+            self.assertEqual("new", rule['state'])
+        else:
+            self.assertEqual("queued_to_apply", rule['state'])
+
         if utils.is_microversion_eq(version, '1.0'):
             self.shares_client.wait_for_access_rule_status(
                 share["id"], rule["id"], "active")
diff --git a/manila_tempest_tests/tests/api/test_rules_negative.py b/manila_tempest_tests/tests/api/test_rules_negative.py
index 9cd4708..20db4eb 100644
--- a/manila_tempest_tests/tests/api/test_rules_negative.py
+++ b/manila_tempest_tests/tests/api/test_rules_negative.py
@@ -27,12 +27,13 @@
 
 
 @ddt.ddt
-class ShareIpRulesForNFSNegativeTest(base.BaseSharesTest):
+class ShareIpRulesForNFSNegativeTest(base.BaseSharesMixedTest):
     protocol = "nfs"
 
     @classmethod
     def resource_setup(cls):
         super(ShareIpRulesForNFSNegativeTest, cls).resource_setup()
+        cls.admin_client = cls.admin_shares_v2_client
         if not (cls.protocol in CONF.share.enable_protocols and
                 cls.protocol in CONF.share.enable_ip_rules_for_protocols):
             msg = "IP rule tests for %s protocol are disabled" % cls.protocol
@@ -158,6 +159,23 @@
             self.shares_v2_client.wait_for_resource_deletion(
                 rule_id=rule["id"], share_id=self.share["id"], version=version)
 
+    @tc.attr(base.TAG_NEGATIVE, base.TAG_API_WITH_BACKEND)
+    def test_add_access_rule_on_share_with_no_host(self):
+        access_type, access_to = self._get_access_rule_data_from_config()
+        extra_specs = self.add_extra_specs_to_dict(
+            {"share_backend_name": 'invalid_backend'})
+        share_type = self.create_share_type('invalid_backend',
+                                            extra_specs=extra_specs,
+                                            client=self.admin_client)
+        share_type = share_type['share_type']
+        share = self.create_share(share_type_id=share_type['id'],
+                                  client=self.admin_client,
+                                  cleanup_in_class=False,
+                                  wait_for_status=False)
+        self.assertRaises(lib_exc.BadRequest,
+                          self.admin_client.create_access_rule,
+                          share["id"], access_type, access_to)
+
 
 @ddt.ddt
 class ShareIpRulesForCIFSNegativeTest(ShareIpRulesForNFSNegativeTest):