[scenario] Add test case to check for RO access
Test if a compute instance with RO access granted to it via floating
IP address fails to write on the share. Also, add the capability to
pass the access level to the methods that allow access to shares.
Change-Id: I1aef8f1ed1b93c0847a6aa982f62c8e539d04337
Signed-off-by: Rishabh Dave <ridave@redhat.com>
diff --git a/manila_tempest_tests/tests/scenario/manager_share.py b/manila_tempest_tests/tests/scenario/manager_share.py
index dac801e..5c8cbbf 100644
--- a/manila_tempest_tests/tests/scenario/manager_share.py
+++ b/manila_tempest_tests/tests/scenario/manager_share.py
@@ -136,18 +136,20 @@
sn['id'])
return sn
- def _allow_access(self, share_id, client=None,
- access_type="ip", access_to="0.0.0.0", cleanup=True):
+ def _allow_access(self, share_id, client=None, access_type="ip",
+ access_level="rw", access_to="0.0.0.0", cleanup=True):
"""Allow share access
:param share_id: id of the share
:param client: client object
:param access_type: "ip", "user" or "cert"
+ :param access_level: "rw" or "ro"
:param access_to
:returns: access object
"""
client = client or self.shares_client
- access = client.create_access_rule(share_id, access_type, access_to)
+ access = client.create_access_rule(share_id, access_type, access_to,
+ access_level)
# NOTE(u_glide): Ignore provided client, because we always need v2
# client to make this call
@@ -158,6 +160,17 @@
self.addCleanup(client.delete_access_rule, share_id, access['id'])
return access
+ def _deny_access(self, share_id, rule_id, client=None):
+ """Deny share access
+
+ :param share_id: id of the share
+ :param rule_id: id of the rule that will be deleted
+ """
+ client = client or self.shares_client
+ client.delete_access_rule(share_id, rule_id)
+ self.shares_v2_client.wait_for_share_status(
+ share_id, "active", status_attr='access_rules_status')
+
def _allow_access_snapshot(self, snapshot_id, access_type="ip",
access_to="0.0.0.0/0", cleanup=True):
"""Allow snapshot access
diff --git a/manila_tempest_tests/tests/scenario/test_share_basic_ops.py b/manila_tempest_tests/tests/scenario/test_share_basic_ops.py
index 8c44b11..a72d538 100644
--- a/manila_tempest_tests/tests/scenario/test_share_basic_ops.py
+++ b/manila_tempest_tests/tests/scenario/test_share_basic_ops.py
@@ -206,8 +206,8 @@
self.share = self._create_share(**kwargs)
return self.share
- def allow_access_ip(self, share_id, ip=None, instance=None, cleanup=True,
- snapshot=None):
+ def allow_access_ip(self, share_id, ip=None, instance=None,
+ access_level="rw", cleanup=True, snapshot=None):
if instance and not ip:
try:
net_addresses = instance['addresses']
@@ -225,16 +225,21 @@
self._allow_access_snapshot(snapshot['id'], access_type='ip',
access_to=ip, cleanup=cleanup)
else:
- self._allow_access(share_id, access_type='ip', access_to=ip,
- cleanup=cleanup, client=self.shares_v2_client)
+ return self._allow_access(share_id, access_type='ip',
+ access_level=access_level, access_to=ip,
+ cleanup=cleanup,
+ client=self.shares_v2_client)
+
+ def deny_access(self, share_id, access_rule_id):
+ self._deny_access(share_id, access_rule_id)
def provide_access_to_auxiliary_instance(self, instance, share=None,
- snapshot=None):
+ snapshot=None, access_level='rw'):
share = share or self.share
if self.protocol.lower() == 'cifs':
- self.allow_access_ip(
+ return self.allow_access_ip(
share['id'], instance=instance, cleanup=False,
- snapshot=snapshot)
+ snapshot=snapshot, access_level=access_level)
elif not CONF.share.multitenancy_enabled:
if self.use_ipv6:
server_ip = self._get_ipv6_server_ip(instance)
@@ -242,14 +247,15 @@
server_ip = (CONF.share.override_ip_for_nfs_access or
self.floatings[instance['id']]['ip'])
self.assertIsNotNone(server_ip)
- self.allow_access_ip(
+ return self.allow_access_ip(
share['id'], ip=server_ip,
- instance=instance, cleanup=False, snapshot=snapshot)
+ instance=instance, cleanup=False, snapshot=snapshot,
+ access_level=access_level)
elif (CONF.share.multitenancy_enabled and
self.protocol.lower() == 'nfs'):
- self.allow_access_ip(
+ return self.allow_access_ip(
share['id'], instance=instance, cleanup=False,
- snapshot=snapshot)
+ snapshot=snapshot, access_level=access_level)
def wait_for_active_instance(self, instance_id):
waiters.wait_for_server_status(
@@ -340,6 +346,31 @@
return locations
+ @tc.attr(base.TAG_NEGATIVE, base.TAG_BACKEND)
+ def test_write_with_ro_access(self):
+ '''Test if an instance with ro access can write on the share.'''
+ test_data = "Some test data to write"
+
+ instance = self.boot_instance(wait_until="BUILD")
+ self.create_share()
+ location = self._get_user_export_locations(self.share)[0]
+ instance = self.wait_for_active_instance(instance["id"])
+
+ ssh_client_inst = self.init_ssh(instance)
+
+ # First, check if write works RW access.
+ acc_rule_id = self.provide_access_to_auxiliary_instance(instance)['id']
+ self.mount_share(location, ssh_client_inst)
+ self.write_data(test_data, ssh_client_inst)
+ self.deny_access(self.share['id'], acc_rule_id)
+
+ self.provide_access_to_auxiliary_instance(instance, access_level='ro')
+ self.addCleanup(self.umount_share, ssh_client_inst)
+
+ # Test if write with RO access fails.
+ self.assertRaises(exceptions.SSHExecCommandFailed,
+ self.write_data, test_data, ssh_client_inst)
+
@tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
def test_read_write_two_vms(self):
"""Boots two vms and writes/reads data on it."""