Merge "Replace http with https for doc links"
diff --git a/manila_tempest_tests/tests/scenario/manager_share.py b/manila_tempest_tests/tests/scenario/manager_share.py
index dac801e..5c8cbbf 100644
--- a/manila_tempest_tests/tests/scenario/manager_share.py
+++ b/manila_tempest_tests/tests/scenario/manager_share.py
@@ -136,18 +136,20 @@
                         sn['id'])
         return sn
 
-    def _allow_access(self, share_id, client=None,
-                      access_type="ip", access_to="0.0.0.0", cleanup=True):
+    def _allow_access(self, share_id, client=None, access_type="ip",
+                      access_level="rw", access_to="0.0.0.0", cleanup=True):
         """Allow share access
 
         :param share_id: id of the share
         :param client: client object
         :param access_type: "ip", "user" or "cert"
+        :param access_level: "rw" or "ro"
         :param access_to
         :returns: access object
         """
         client = client or self.shares_client
-        access = client.create_access_rule(share_id, access_type, access_to)
+        access = client.create_access_rule(share_id, access_type, access_to,
+                                           access_level)
 
         # NOTE(u_glide): Ignore provided client, because we always need v2
         # client to make this call
@@ -158,6 +160,17 @@
             self.addCleanup(client.delete_access_rule, share_id, access['id'])
         return access
 
+    def _deny_access(self, share_id, rule_id, client=None):
+        """Deny share access
+
+        :param share_id: id of the share
+        :param rule_id: id of the rule that will be deleted
+        """
+        client = client or self.shares_client
+        client.delete_access_rule(share_id, rule_id)
+        self.shares_v2_client.wait_for_share_status(
+            share_id, "active", status_attr='access_rules_status')
+
     def _allow_access_snapshot(self, snapshot_id, access_type="ip",
                                access_to="0.0.0.0/0", cleanup=True):
         """Allow snapshot access
diff --git a/manila_tempest_tests/tests/scenario/test_share_basic_ops.py b/manila_tempest_tests/tests/scenario/test_share_basic_ops.py
index 8c44b11..a72d538 100644
--- a/manila_tempest_tests/tests/scenario/test_share_basic_ops.py
+++ b/manila_tempest_tests/tests/scenario/test_share_basic_ops.py
@@ -206,8 +206,8 @@
         self.share = self._create_share(**kwargs)
         return self.share
 
-    def allow_access_ip(self, share_id, ip=None, instance=None, cleanup=True,
-                        snapshot=None):
+    def allow_access_ip(self, share_id, ip=None, instance=None,
+                        access_level="rw", cleanup=True, snapshot=None):
         if instance and not ip:
             try:
                 net_addresses = instance['addresses']
@@ -225,16 +225,21 @@
             self._allow_access_snapshot(snapshot['id'], access_type='ip',
                                         access_to=ip, cleanup=cleanup)
         else:
-            self._allow_access(share_id, access_type='ip', access_to=ip,
-                               cleanup=cleanup, client=self.shares_v2_client)
+            return self._allow_access(share_id, access_type='ip',
+                                      access_level=access_level, access_to=ip,
+                                      cleanup=cleanup,
+                                      client=self.shares_v2_client)
+
+    def deny_access(self, share_id, access_rule_id):
+        self._deny_access(share_id, access_rule_id)
 
     def provide_access_to_auxiliary_instance(self, instance, share=None,
-                                             snapshot=None):
+                                             snapshot=None, access_level='rw'):
         share = share or self.share
         if self.protocol.lower() == 'cifs':
-            self.allow_access_ip(
+            return self.allow_access_ip(
                 share['id'], instance=instance, cleanup=False,
-                snapshot=snapshot)
+                snapshot=snapshot, access_level=access_level)
         elif not CONF.share.multitenancy_enabled:
             if self.use_ipv6:
                 server_ip = self._get_ipv6_server_ip(instance)
@@ -242,14 +247,15 @@
                 server_ip = (CONF.share.override_ip_for_nfs_access or
                              self.floatings[instance['id']]['ip'])
             self.assertIsNotNone(server_ip)
-            self.allow_access_ip(
+            return self.allow_access_ip(
                 share['id'], ip=server_ip,
-                instance=instance, cleanup=False, snapshot=snapshot)
+                instance=instance, cleanup=False, snapshot=snapshot,
+                access_level=access_level)
         elif (CONF.share.multitenancy_enabled and
               self.protocol.lower() == 'nfs'):
-            self.allow_access_ip(
+            return self.allow_access_ip(
                 share['id'], instance=instance, cleanup=False,
-                snapshot=snapshot)
+                snapshot=snapshot, access_level=access_level)
 
     def wait_for_active_instance(self, instance_id):
         waiters.wait_for_server_status(
@@ -340,6 +346,31 @@
 
         return locations
 
+    @tc.attr(base.TAG_NEGATIVE, base.TAG_BACKEND)
+    def test_write_with_ro_access(self):
+        '''Test if an instance with ro access can write on the share.'''
+        test_data = "Some test data to write"
+
+        instance = self.boot_instance(wait_until="BUILD")
+        self.create_share()
+        location = self._get_user_export_locations(self.share)[0]
+        instance = self.wait_for_active_instance(instance["id"])
+
+        ssh_client_inst = self.init_ssh(instance)
+
+        # First, check if write works RW access.
+        acc_rule_id = self.provide_access_to_auxiliary_instance(instance)['id']
+        self.mount_share(location, ssh_client_inst)
+        self.write_data(test_data, ssh_client_inst)
+        self.deny_access(self.share['id'], acc_rule_id)
+
+        self.provide_access_to_auxiliary_instance(instance, access_level='ro')
+        self.addCleanup(self.umount_share, ssh_client_inst)
+
+        # Test if write with RO access fails.
+        self.assertRaises(exceptions.SSHExecCommandFailed,
+                          self.write_data, test_data, ssh_client_inst)
+
     @tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
     def test_read_write_two_vms(self):
         """Boots two vms and writes/reads data on it."""