Merge "Add access-rules tests to improve the coverage"
diff --git a/manila_tempest_tests/tests/api/base.py b/manila_tempest_tests/tests/api/base.py
index 7837195..9cf3e69 100644
--- a/manila_tempest_tests/tests/api/base.py
+++ b/manila_tempest_tests/tests/api/base.py
@@ -495,6 +495,7 @@
         data = []
         for d in share_data_list:
             client = d["kwargs"].pop("client", cls.shares_v2_client)
+            wait_for_status = d["kwargs"].pop("wait_for_status", True)
             local_d = {
                 "args": d["args"],
                 "kwargs": copy.deepcopy(d["kwargs"]),
@@ -504,10 +505,13 @@
                 *local_d["args"], **local_d["kwargs"])
             local_d["cnt"] = 0
             local_d["available"] = False
+            local_d["wait_for_status"] = wait_for_status
             data.append(local_d)
 
         while not all(d["available"] for d in data):
             for d in data:
+                if not d["wait_for_status"]:
+                    d["available"] = True
                 if d["available"]:
                     continue
                 client = d["kwargs"]["client"]
@@ -704,6 +708,33 @@
             status_attr="replica_state")
         return replica
 
+    def _get_access_rule_data_from_config(self):
+        """Get the first available access type/to combination from config.
+
+        This method opportunistically picks the first configured protocol
+        to create the share. Do not use this method in tests where you need
+        to test depth and breadth in the access types and access recipients.
+        """
+        protocol = self.shares_v2_client.share_protocol
+
+        if protocol in CONF.share.enable_ip_rules_for_protocols:
+            access_type = "ip"
+            access_to = utils.rand_ip()
+        elif protocol in CONF.share.enable_user_rules_for_protocols:
+            access_type = "user"
+            access_to = CONF.share.username_for_user_rules
+        elif protocol in CONF.share.enable_cert_rules_for_protocols:
+            access_type = "cert"
+            access_to = "client3.com"
+        elif protocol in CONF.share.enable_cephx_rules_for_protocols:
+            access_type = "cephx"
+            access_to = "eve"
+        else:
+            message = "Unrecognized protocol and access rules configuration."
+            raise self.skipException(message)
+
+        return access_type, access_to
+
     @classmethod
     def create_share_network(cls, client=None,
                              cleanup_in_class=False, **kwargs):
diff --git a/manila_tempest_tests/tests/api/test_replication.py b/manila_tempest_tests/tests/api/test_replication.py
index 1738e87..1a1a7c9 100644
--- a/manila_tempest_tests/tests/api/test_replication.py
+++ b/manila_tempest_tests/tests/api/test_replication.py
@@ -21,7 +21,6 @@
 from manila_tempest_tests.common import constants
 from manila_tempest_tests import share_exceptions
 from manila_tempest_tests.tests.api import base
-from manila_tempest_tests import utils
 
 CONF = config.CONF
 _MIN_SUPPORTED_MICROVERSION = '2.11'
@@ -72,9 +71,6 @@
         cls.instance_id1 = cls._get_instance(cls.shares[0])
         cls.instance_id2 = cls._get_instance(cls.shares[1])
 
-        cls.access_type = "ip"
-        cls.access_to = utils.rand_ip()
-
     @classmethod
     def _get_instance(cls, share):
         share_instances = cls.admin_client.get_instances_of_share(share["id"])
@@ -107,24 +103,36 @@
         return [replica for replica in replica_list
                 if replica['replica_state'] == r_state]
 
-    def _verify_config_and_set_access_rule_data(self):
-        """Verify the access rule configuration is enabled for NFS.
+    def _verify_in_sync_replica_promotion(self, share, original_replica):
+        # Verify that 'in-sync' replica has been promoted successfully
 
-        Set the data after verification.
-        """
-        protocol = self.shares_v2_client.share_protocol
+        # NOTE(Yogi1): Cleanup needs to be disabled for replica that is
+        # being promoted since it will become the 'primary'/'active' replica.
+        replica = self.create_share_replica(share["id"], self.replica_zone,
+                                            cleanup=False)
+        # Wait for replica state to update after creation
+        self.shares_v2_client.wait_for_share_replica_status(
+            replica['id'], constants.REPLICATION_STATE_IN_SYNC,
+            status_attr='replica_state')
+        # Promote the first in_sync replica to active state
+        promoted_replica = self.promote_share_replica(replica['id'])
+        # Delete the demoted replica so promoted replica can be cleaned
+        # during the cleanup of the share.
+        self.addCleanup(self.delete_share_replica, original_replica['id'])
+        self._verify_active_replica_count(share["id"])
+        # Verify the replica_state for promoted replica
+        promoted_replica = self.shares_v2_client.get_share_replica(
+            promoted_replica["id"])
+        self.assertEqual(constants.REPLICATION_STATE_ACTIVE,
+                         promoted_replica["replica_state"])
 
-        # TODO(Yogi1): Add access rules for other protocols.
-        if not ((protocol.lower() == 'nfs') and
-                (protocol in CONF.share.enable_ip_rules_for_protocols) and
-                CONF.share.enable_ip_rules_for_protocols):
-            message = "IP access rules are not supported for this protocol."
-            raise self.skipException(message)
-
-        access_type = "ip"
-        access_to = utils.rand_ip()
-
-        return access_type, access_to
+    def _check_skip_promotion_tests(self):
+        # Check if the replication type is right for replica promotion tests
+        if (self.replication_type
+                not in constants.REPLICATION_PROMOTION_CHOICES):
+            msg = "Option backend_replication_type should be one of (%s)!"
+            raise self.skipException(
+                msg % ','.join(constants.REPLICATION_PROMOTION_CHOICES))
 
     @tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
     def test_add_delete_share_replica(self):
@@ -137,7 +145,7 @@
     @tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
     def test_add_access_rule_create_replica_delete_rule(self):
         # Add access rule to the share
-        access_type, access_to = self._verify_config_and_set_access_rule_data()
+        access_type, access_to = self._get_access_rule_data_from_config()
         rule = self.shares_v2_client.create_access_rule(
             self.shares[0]["id"], access_type, access_to, 'ro')
         self.shares_v2_client.wait_for_access_rule_status(
@@ -159,7 +167,7 @@
 
     @tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
     def test_create_replica_add_access_rule_delete_replica(self):
-        access_type, access_to = self._verify_config_and_set_access_rule_data()
+        access_type, access_to = self._get_access_rule_data_from_config()
         # Create the replica
         share_replica = self._verify_create_replica()
 
@@ -208,42 +216,40 @@
     @tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
     def test_promote_in_sync_share_replica(self):
         # Test promote 'in_sync' share_replica to 'active' state
-        if (self.replication_type
-                not in constants.REPLICATION_PROMOTION_CHOICES):
-            msg = "Option backend_replication_type should be one of (%s)!"
-            raise self.skipException(
-                msg % ','.join(constants.REPLICATION_PROMOTION_CHOICES))
+        self._check_skip_promotion_tests()
         share = self.create_shares([self.creation_data])[0]
         original_replica = self.shares_v2_client.list_share_replicas(
             share["id"])[0]
-        # NOTE(Yogi1): Cleanup needs to be disabled for replica that is
-        # being promoted since it will become the 'primary'/'active' replica.
-        replica = self.create_share_replica(share["id"], self.replica_zone,
-                                            cleanup=False)
-        # Wait for replica state to update after creation
-        self.shares_v2_client.wait_for_share_replica_status(
-            replica['id'], constants.REPLICATION_STATE_IN_SYNC,
-            status_attr='replica_state')
-        # Promote the first in_sync replica to active state
-        promoted_replica = self.promote_share_replica(replica['id'])
-        # Delete the demoted replica so promoted replica can be cleaned
-        # during the cleanup of the share.
-        self.addCleanup(self.delete_share_replica, original_replica['id'])
-        self._verify_active_replica_count(share["id"])
-        # Verify the replica_state for promoted replica
-        promoted_replica = self.shares_v2_client.get_share_replica(
-            promoted_replica["id"])
-        self.assertEqual(constants.REPLICATION_STATE_ACTIVE,
-                         promoted_replica["replica_state"])
+        self._verify_in_sync_replica_promotion(share, original_replica)
+
+    @tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
+    def test_add_rule_promote_share_replica_verify_rule(self):
+        # Verify the access rule stays intact after share replica promotion
+        self._check_skip_promotion_tests()
+
+        share = self.create_shares([self.creation_data])[0]
+        # Add access rule
+        access_type, access_to = self._get_access_rule_data_from_config()
+        rule = self.shares_v2_client.create_access_rule(
+            share["id"], access_type, access_to, 'ro')
+        self.shares_v2_client.wait_for_access_rule_status(
+            share["id"], rule["id"], constants.RULE_STATE_ACTIVE)
+
+        original_replica = self.shares_v2_client.list_share_replicas(
+            share["id"])[0]
+        self._verify_in_sync_replica_promotion(share, original_replica)
+
+        # verify rule's values
+        rules_list = self.shares_v2_client.list_access_rules(share["id"])
+        self.assertEqual(1, len(rules_list))
+        self.assertEqual(access_type, rules_list[0]["access_type"])
+        self.assertEqual(access_to, rules_list[0]["access_to"])
+        self.assertEqual('ro', rules_list[0]["access_level"])
 
     @tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
     def test_promote_and_promote_back(self):
         # Test promote back and forth between 2 share replicas
-        if (self.replication_type
-                not in constants.REPLICATION_PROMOTION_CHOICES):
-            msg = "Option backend_replication_type should be one of (%s)!"
-            raise self.skipException(
-                msg % ','.join(constants.REPLICATION_PROMOTION_CHOICES))
+        self._check_skip_promotion_tests()
 
         # Create a new share
         share = self.create_shares([self.creation_data])[0]
diff --git a/manila_tempest_tests/tests/api/test_replication_negative.py b/manila_tempest_tests/tests/api/test_replication_negative.py
index 48b8d5c..c4d08aa 100644
--- a/manila_tempest_tests/tests/api/test_replication_negative.py
+++ b/manila_tempest_tests/tests/api/test_replication_negative.py
@@ -159,6 +159,22 @@
         # Try promoting the replica
         self.shares_v2_client.promote_share_replica(replica['id'])
 
+    @tc.attr(base.TAG_NEGATIVE, base.TAG_API_WITH_BACKEND)
+    def test_add_access_rule_share_replica_error_status(self):
+        access_type, access_to = self._get_access_rule_data_from_config()
+        # Create the replica
+        share_replica = self.create_share_replica(self.share1["id"],
+                                                  self.replica_zone,
+                                                  cleanup_in_class=False)
+        # Reset the replica status to error
+        self.admin_client.reset_share_replica_status(
+            share_replica['id'], constants.STATUS_ERROR)
+
+        # Verify access rule cannot be added
+        self.assertRaises(lib_exc.BadRequest,
+                          self.admin_client.create_access_rule,
+                          self.share1["id"], access_type, access_to, 'ro')
+
 
 @testtools.skipUnless(CONF.share.run_replication_tests,
                       'Replication tests are disabled.')
diff --git a/manila_tempest_tests/tests/api/test_rules.py b/manila_tempest_tests/tests/api/test_rules.py
index 180ed79..024590d 100644
--- a/manila_tempest_tests/tests/api/test_rules.py
+++ b/manila_tempest_tests/tests/api/test_rules.py
@@ -13,6 +13,8 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
+import itertools
+
 import ddt
 from tempest import config
 from tempest.lib import exceptions as lib_exc
@@ -44,6 +46,12 @@
     for key in ('deleted', 'deleted_at', 'instance_mappings'):
         self.assertNotIn(key, rule.keys())
 
+    # rules must start out in 'new' until 2.28 & 'queued_to_apply' after 2.28
+    if utils.is_microversion_le(version, "2.27"):
+        self.assertEqual("new", rule['state'])
+    else:
+        self.assertEqual("queued_to_apply", rule['state'])
+
     if utils.is_microversion_le(version, '2.9'):
         self.shares_client.wait_for_access_rule_status(
             self.share["id"], rule["id"], "active")
@@ -51,6 +59,11 @@
         self.shares_v2_client.wait_for_share_status(
             self.share["id"], "active", status_attr='access_rules_status',
             version=version)
+        # If the 'access_rules_status' transitions to 'active',
+        # rule state must too
+        rules = self.shares_v2_client.list_access_rules(self.share['id'])
+        rule = [r for r in rules if r['id'] == rule['id']][0]
+        self.assertEqual("active", rule['state'])
 
     if utils.is_microversion_eq(version, '1.0'):
         self.shares_client.delete_access_rule(self.share["id"], rule["id"])
@@ -79,7 +92,7 @@
         cls.access_to = "2.2.2.2"
 
     @tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
-    @ddt.data('1.0', '2.9', LATEST_MICROVERSION)
+    @ddt.data(*set(['1.0', '2.9', '2.27', '2.28', LATEST_MICROVERSION]))
     def test_create_delete_access_rules_with_one_ip(self, version):
 
         # test data
@@ -98,6 +111,12 @@
         for key in ('deleted', 'deleted_at', 'instance_mappings'):
             self.assertNotIn(key, rule.keys())
 
+        # rules must start out in 'new' until 2.28 & 'queued_to_apply' after
+        if utils.is_microversion_le(version, "2.27"):
+            self.assertEqual("new", rule['state'])
+        else:
+            self.assertEqual("queued_to_apply", rule['state'])
+
         if utils.is_microversion_eq(version, '1.0'):
             self.shares_client.wait_for_access_rule_status(
                 self.share["id"], rule["id"], "active")
@@ -121,7 +140,7 @@
                 rule_id=rule["id"], share_id=self.share['id'], version=version)
 
     @tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
-    @ddt.data('1.0', '2.9', LATEST_MICROVERSION)
+    @ddt.data(*set(['1.0', '2.9', '2.27', '2.28', LATEST_MICROVERSION]))
     def test_create_delete_access_rule_with_cidr(self, version):
 
         # test data
@@ -140,6 +159,12 @@
             self.assertNotIn(key, rule.keys())
         self.assertEqual('rw', rule['access_level'])
 
+        # rules must start out in 'new' until 2.28 & 'queued_to_apply' after
+        if utils.is_microversion_le(version, "2.27"):
+            self.assertEqual("new", rule['state'])
+        else:
+            self.assertEqual("queued_to_apply", rule['state'])
+
         if utils.is_microversion_eq(version, '1.0'):
             self.shares_client.wait_for_access_rule_status(
                 self.share["id"], rule["id"], "active")
@@ -166,7 +191,7 @@
     @testtools.skipIf(
         "nfs" not in CONF.share.enable_ro_access_level_for_protocols,
         "RO access rule tests are disabled for NFS protocol.")
-    @ddt.data('1.0', '2.9', LATEST_MICROVERSION)
+    @ddt.data(*set(['1.0', '2.9', '2.27', '2.28', LATEST_MICROVERSION]))
     def test_create_delete_ro_access_rule(self, client_name):
         _create_delete_ro_access_rule(self, client_name)
 
@@ -179,7 +204,7 @@
     @testtools.skipIf(
         "cifs" not in CONF.share.enable_ro_access_level_for_protocols,
         "RO access rule tests are disabled for CIFS protocol.")
-    @ddt.data('1.0', '2.9', LATEST_MICROVERSION)
+    @ddt.data(*set(['1.0', '2.9', '2.27', '2.28', LATEST_MICROVERSION]))
     def test_create_delete_ro_access_rule(self, version):
         _create_delete_ro_access_rule(self, version)
 
@@ -201,7 +226,7 @@
         cls.access_to = CONF.share.username_for_user_rules
 
     @tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
-    @ddt.data('1.0', '2.9', LATEST_MICROVERSION)
+    @ddt.data(*set(['1.0', '2.9', '2.27', '2.28', LATEST_MICROVERSION]))
     def test_create_delete_user_rule(self, version):
 
         # create rule
@@ -217,6 +242,12 @@
         for key in ('deleted', 'deleted_at', 'instance_mappings'):
             self.assertNotIn(key, rule.keys())
 
+        # rules must start out in 'new' until 2.28 & 'queued_to_apply' after
+        if utils.is_microversion_le(version, "2.27"):
+            self.assertEqual("new", rule['state'])
+        else:
+            self.assertEqual("queued_to_apply", rule['state'])
+
         if utils.is_microversion_eq(version, '1.0'):
             self.shares_client.wait_for_access_rule_status(
                 self.share["id"], rule["id"], "active")
@@ -243,7 +274,7 @@
     @testtools.skipIf(
         "nfs" not in CONF.share.enable_ro_access_level_for_protocols,
         "RO access rule tests are disabled for NFS protocol.")
-    @ddt.data('1.0', '2.9', LATEST_MICROVERSION)
+    @ddt.data(*set(['1.0', '2.9', '2.27', '2.28', LATEST_MICROVERSION]))
     def test_create_delete_ro_access_rule(self, version):
         _create_delete_ro_access_rule(self, version)
 
@@ -256,7 +287,7 @@
     @testtools.skipIf(
         "cifs" not in CONF.share.enable_ro_access_level_for_protocols,
         "RO access rule tests are disabled for CIFS protocol.")
-    @ddt.data('1.0', '2.9', LATEST_MICROVERSION)
+    @ddt.data(*set(['1.0', '2.9', '2.27', '2.28', LATEST_MICROVERSION]))
     def test_create_delete_ro_access_rule(self, version):
         _create_delete_ro_access_rule(self, version)
 
@@ -280,7 +311,7 @@
         cls.access_to = "client1.com"
 
     @tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
-    @ddt.data('1.0', '2.9', LATEST_MICROVERSION)
+    @ddt.data(*set(['1.0', '2.9', '2.27', '2.28', LATEST_MICROVERSION]))
     def test_create_delete_cert_rule(self, version):
 
         # create rule
@@ -296,6 +327,12 @@
         for key in ('deleted', 'deleted_at', 'instance_mappings'):
             self.assertNotIn(key, rule.keys())
 
+        # rules must start out in 'new' until 2.28 & 'queued_to_apply' after
+        if utils.is_microversion_le(version, "2.27"):
+            self.assertEqual("new", rule['state'])
+        else:
+            self.assertEqual("queued_to_apply", rule['state'])
+
         if utils.is_microversion_eq(version, '1.0'):
             self.shares_client.wait_for_access_rule_status(
                 self.share["id"], rule["id"], "active")
@@ -322,7 +359,7 @@
     @testtools.skipIf(
         "glusterfs" not in CONF.share.enable_ro_access_level_for_protocols,
         "RO access rule tests are disabled for GLUSTERFS protocol.")
-    @ddt.data('1.0', '2.9', LATEST_MICROVERSION)
+    @ddt.data(*set(['1.0', '2.9', '2.27', '2.28', LATEST_MICROVERSION]))
     def test_create_delete_cert_ro_access_rule(self, version):
         if utils.is_microversion_eq(version, '1.0'):
             rule = self.shares_client.create_access_rule(
@@ -336,6 +373,12 @@
         for key in ('deleted', 'deleted_at', 'instance_mappings'):
             self.assertNotIn(key, rule.keys())
 
+        # rules must start out in 'new' until 2.28 & 'queued_to_apply' after
+        if utils.is_microversion_le(version, "2.27"):
+            self.assertEqual("new", rule['state'])
+        else:
+            self.assertEqual("queued_to_apply", rule['state'])
+
         if utils.is_microversion_eq(version, '1.0'):
             self.shares_client.wait_for_access_rule_status(
                 self.share["id"], rule["id"], "active")
@@ -377,10 +420,13 @@
         cls.access_to = "bob"
 
     @tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
-    @ddt.data("alice", "alice_bob", "alice bob")
-    def test_create_delete_cephx_rule(self, access_to):
+    @ddt.data(*itertools.product(
+        set(['2.13', '2.27', '2.28', LATEST_MICROVERSION]),
+        ("alice", "alice_bob", "alice bob")))
+    @ddt.unpack
+    def test_create_delete_cephx_rule(self, version, access_to):
         rule = self.shares_v2_client.create_access_rule(
-            self.share["id"], self.access_type, access_to)
+            self.share["id"], self.access_type, access_to, version=version)
 
         self.assertEqual('rw', rule['access_level'])
         for key in ('deleted', 'deleted_at', 'instance_mappings'):
@@ -388,7 +434,8 @@
         self.shares_v2_client.wait_for_access_rule_status(
             self.share["id"], rule["id"], "active")
 
-        self.shares_v2_client.delete_access_rule(self.share["id"], rule["id"])
+        self.shares_v2_client.delete_access_rule(
+            self.share["id"], rule["id"], version=version)
         self.shares_v2_client.wait_for_resource_deletion(
             rule_id=rule["id"], share_id=self.share['id'])
 
@@ -429,7 +476,7 @@
         cls.share = cls.create_share()
 
     @tc.attr(base.TAG_POSITIVE, base.TAG_API_WITH_BACKEND)
-    @ddt.data('1.0', '2.9', LATEST_MICROVERSION)
+    @ddt.data(*set(['1.0', '2.9', '2.27', '2.28', LATEST_MICROVERSION]))
     def test_list_access_rules(self, version):
         if (utils.is_microversion_lt(version, '2.13') and
                 CONF.share.enable_cephx_rules_for_protocols):
@@ -445,6 +492,11 @@
             rule = self.shares_v2_client.create_access_rule(
                 self.share["id"], self.access_type, self.access_to,
                 version=version)
+        # rules must start out in 'new' until 2.28 & 'queued_to_apply' after
+        if utils.is_microversion_le(version, "2.27"):
+            self.assertEqual("new", rule['state'])
+        else:
+            self.assertEqual("queued_to_apply", rule['state'])
 
         if utils.is_microversion_eq(version, '1.0'):
             self.shares_client.wait_for_access_rule_status(
@@ -499,7 +551,7 @@
                 rule_id=rule["id"], share_id=self.share['id'], version=version)
 
     @tc.attr(base.TAG_POSITIVE, base.TAG_API_WITH_BACKEND)
-    @ddt.data('1.0', '2.9', LATEST_MICROVERSION)
+    @ddt.data(*set(['1.0', '2.9', '2.27', '2.28', LATEST_MICROVERSION]))
     def test_access_rules_deleted_if_share_deleted(self, version):
         if (utils.is_microversion_lt(version, '2.13') and
                 CONF.share.enable_cephx_rules_for_protocols):
@@ -519,6 +571,12 @@
                 share["id"], self.access_type, self.access_to,
                 version=version)
 
+        # rules must start out in 'new' until 2.28 & 'queued_to_apply' after
+        if utils.is_microversion_le(version, "2.27"):
+            self.assertEqual("new", rule['state'])
+        else:
+            self.assertEqual("queued_to_apply", rule['state'])
+
         if utils.is_microversion_eq(version, '1.0'):
             self.shares_client.wait_for_access_rule_status(
                 share["id"], rule["id"], "active")
diff --git a/manila_tempest_tests/tests/api/test_rules_negative.py b/manila_tempest_tests/tests/api/test_rules_negative.py
index 9cd4708..20db4eb 100644
--- a/manila_tempest_tests/tests/api/test_rules_negative.py
+++ b/manila_tempest_tests/tests/api/test_rules_negative.py
@@ -27,12 +27,13 @@
 
 
 @ddt.ddt
-class ShareIpRulesForNFSNegativeTest(base.BaseSharesTest):
+class ShareIpRulesForNFSNegativeTest(base.BaseSharesMixedTest):
     protocol = "nfs"
 
     @classmethod
     def resource_setup(cls):
         super(ShareIpRulesForNFSNegativeTest, cls).resource_setup()
+        cls.admin_client = cls.admin_shares_v2_client
         if not (cls.protocol in CONF.share.enable_protocols and
                 cls.protocol in CONF.share.enable_ip_rules_for_protocols):
             msg = "IP rule tests for %s protocol are disabled" % cls.protocol
@@ -158,6 +159,23 @@
             self.shares_v2_client.wait_for_resource_deletion(
                 rule_id=rule["id"], share_id=self.share["id"], version=version)
 
+    @tc.attr(base.TAG_NEGATIVE, base.TAG_API_WITH_BACKEND)
+    def test_add_access_rule_on_share_with_no_host(self):
+        access_type, access_to = self._get_access_rule_data_from_config()
+        extra_specs = self.add_extra_specs_to_dict(
+            {"share_backend_name": 'invalid_backend'})
+        share_type = self.create_share_type('invalid_backend',
+                                            extra_specs=extra_specs,
+                                            client=self.admin_client)
+        share_type = share_type['share_type']
+        share = self.create_share(share_type_id=share_type['id'],
+                                  client=self.admin_client,
+                                  cleanup_in_class=False,
+                                  wait_for_status=False)
+        self.assertRaises(lib_exc.BadRequest,
+                          self.admin_client.create_access_rule,
+                          share["id"], access_type, access_to)
+
 
 @ddt.ddt
 class ShareIpRulesForCIFSNegativeTest(ShareIpRulesForNFSNegativeTest):