Merge "Replace http with https for doc links"
diff --git a/manila_tempest_tests/tests/scenario/manager_share.py b/manila_tempest_tests/tests/scenario/manager_share.py
index dac801e..5c8cbbf 100644
--- a/manila_tempest_tests/tests/scenario/manager_share.py
+++ b/manila_tempest_tests/tests/scenario/manager_share.py
@@ -136,18 +136,20 @@
sn['id'])
return sn
- def _allow_access(self, share_id, client=None,
- access_type="ip", access_to="0.0.0.0", cleanup=True):
+ def _allow_access(self, share_id, client=None, access_type="ip",
+ access_level="rw", access_to="0.0.0.0", cleanup=True):
"""Allow share access
:param share_id: id of the share
:param client: client object
:param access_type: "ip", "user" or "cert"
+ :param access_level: "rw" or "ro"
:param access_to
:returns: access object
"""
client = client or self.shares_client
- access = client.create_access_rule(share_id, access_type, access_to)
+ access = client.create_access_rule(share_id, access_type, access_to,
+ access_level)
# NOTE(u_glide): Ignore provided client, because we always need v2
# client to make this call
@@ -158,6 +160,17 @@
self.addCleanup(client.delete_access_rule, share_id, access['id'])
return access
+ def _deny_access(self, share_id, rule_id, client=None):
+ """Deny share access
+
+ Deletes the given access rule and then waits for the share's
+ 'access_rules_status' to become "active". Returns nothing.
+
+ :param share_id: id of the share
+ :param rule_id: id of the rule that will be deleted
+ :param client: client object used to delete the rule; falls back to
+ self.shares_client when not provided
+ """
+ client = client or self.shares_client
+ client.delete_access_rule(share_id, rule_id)
+ # The wait always goes through self.shares_v2_client, regardless of
+ # which client performed the delete above.
+ self.shares_v2_client.wait_for_share_status(
+ share_id, "active", status_attr='access_rules_status')
+
def _allow_access_snapshot(self, snapshot_id, access_type="ip",
access_to="0.0.0.0/0", cleanup=True):
"""Allow snapshot access
diff --git a/manila_tempest_tests/tests/scenario/test_share_basic_ops.py b/manila_tempest_tests/tests/scenario/test_share_basic_ops.py
index 8c44b11..a72d538 100644
--- a/manila_tempest_tests/tests/scenario/test_share_basic_ops.py
+++ b/manila_tempest_tests/tests/scenario/test_share_basic_ops.py
@@ -206,8 +206,8 @@
self.share = self._create_share(**kwargs)
return self.share
- def allow_access_ip(self, share_id, ip=None, instance=None, cleanup=True,
- snapshot=None):
+ def allow_access_ip(self, share_id, ip=None, instance=None,
+ access_level="rw", cleanup=True, snapshot=None):
if instance and not ip:
try:
net_addresses = instance['addresses']
@@ -225,16 +225,21 @@
self._allow_access_snapshot(snapshot['id'], access_type='ip',
access_to=ip, cleanup=cleanup)
else:
- self._allow_access(share_id, access_type='ip', access_to=ip,
- cleanup=cleanup, client=self.shares_v2_client)
+ return self._allow_access(share_id, access_type='ip',
+ access_level=access_level, access_to=ip,
+ cleanup=cleanup,
+ client=self.shares_v2_client)
+
+ def deny_access(self, share_id, access_rule_id):
+ """Remove an access rule from a share by calling self._deny_access.
+
+ :param share_id: id of the share
+ :param access_rule_id: id of the access rule to remove
+ """
+ self._deny_access(share_id, access_rule_id)
def provide_access_to_auxiliary_instance(self, instance, share=None,
- snapshot=None):
+ snapshot=None, access_level='rw'):
share = share or self.share
if self.protocol.lower() == 'cifs':
- self.allow_access_ip(
+ return self.allow_access_ip(
share['id'], instance=instance, cleanup=False,
- snapshot=snapshot)
+ snapshot=snapshot, access_level=access_level)
elif not CONF.share.multitenancy_enabled:
if self.use_ipv6:
server_ip = self._get_ipv6_server_ip(instance)
@@ -242,14 +247,15 @@
server_ip = (CONF.share.override_ip_for_nfs_access or
self.floatings[instance['id']]['ip'])
self.assertIsNotNone(server_ip)
- self.allow_access_ip(
+ return self.allow_access_ip(
share['id'], ip=server_ip,
- instance=instance, cleanup=False, snapshot=snapshot)
+ instance=instance, cleanup=False, snapshot=snapshot,
+ access_level=access_level)
elif (CONF.share.multitenancy_enabled and
self.protocol.lower() == 'nfs'):
- self.allow_access_ip(
+ return self.allow_access_ip(
share['id'], instance=instance, cleanup=False,
- snapshot=snapshot)
+ snapshot=snapshot, access_level=access_level)
def wait_for_active_instance(self, instance_id):
waiters.wait_for_server_status(
@@ -340,6 +346,31 @@
return locations
+ @tc.attr(base.TAG_NEGATIVE, base.TAG_BACKEND)
+ def test_write_with_ro_access(self):
+ '''Test if an instance with ro access can write on the share.'''
+ test_data = "Some test data to write"
+
+ instance = self.boot_instance(wait_until="BUILD")
+ self.create_share()
+ location = self._get_user_export_locations(self.share)[0]
+ instance = self.wait_for_active_instance(instance["id"])
+
+ ssh_client_inst = self.init_ssh(instance)
+
+ # First, check if write works RW access.
+ acc_rule_id = self.provide_access_to_auxiliary_instance(instance)['id']
+ self.mount_share(location, ssh_client_inst)
+ self.write_data(test_data, ssh_client_inst)
+ self.deny_access(self.share['id'], acc_rule_id)
+
+ self.provide_access_to_auxiliary_instance(instance, access_level='ro')
+ self.addCleanup(self.umount_share, ssh_client_inst)
+
+ # Test if write with RO access fails.
+ self.assertRaises(exceptions.SSHExecCommandFailed,
+ self.write_data, test_data, ssh_client_inst)
+
@tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
def test_read_write_two_vms(self):
"""Boots two vms and writes/reads data on it."""