[Tempest] Add scenario test creating share from snapshot
Add tempest scenario test where we create share from snapshot and
test its relations to source share. Design for this test is available
at 'Add spec for Scenario tests' spec [1].
[1] I224a52521033b47574ff5fd5a94b096c91593aa7
Change-Id: I9863ea70977453b3e7492164002b983f3d9944ab
diff --git a/manila_tempest_tests/tests/scenario/manager_share.py b/manila_tempest_tests/tests/scenario/manager_share.py
index 43ada92..01be412 100644
--- a/manila_tempest_tests/tests/scenario/manager_share.py
+++ b/manila_tempest_tests/tests/scenario/manager_share.py
@@ -97,6 +97,15 @@
client.wait_for_share_status(share['id'], 'available')
return share
+ def _create_snapshot(self, share_id, client=None, **kwargs):
+ client = client or self.shares_v2_client
+ snapshot = client.create_snapshot(share_id, **kwargs)
+ self.addCleanup(
+ client.wait_for_resource_deletion, snapshot_id=snapshot['id'])
+ self.addCleanup(client.delete_snapshot, snapshot['id'])
+ client.wait_for_snapshot_status(snapshot["id"], "available")
+ return snapshot
+
def _wait_for_share_server_deletion(self, sn_id, client=None):
"""Wait for a share server to be deleted
diff --git a/manila_tempest_tests/tests/scenario/test_share_basic_ops.py b/manila_tempest_tests/tests/scenario/test_share_basic_ops.py
index 10218a5..e83f78d 100644
--- a/manila_tempest_tests/tests/scenario/test_share_basic_ops.py
+++ b/manila_tempest_tests/tests/scenario/test_share_basic_ops.py
@@ -112,11 +112,12 @@
ssh_client.exec_command("ping -c 1 %s" % server_ip)
return ssh_client
- def mount_share(self, location, ssh_client):
+ def mount_share(self, location, ssh_client, target_dir=None):
raise NotImplementedError
- def umount_share(self, ssh_client):
- ssh_client.exec_command("sudo umount /mnt")
+ def umount_share(self, ssh_client, target_dir=None):
+ target_dir = target_dir or "/mnt"
+ ssh_client.exec_command("sudo umount %s" % target_dir)
def write_data(self, data, ssh_client):
ssh_client.exec_command("echo \"%s\" | sudo tee /mnt/t1 && sudo sync" %
@@ -154,14 +155,16 @@
'driver_handles_share_servers': CONF.share.multitenancy_enabled
},)['share_type']
- def create_share(self):
- kwargs = {
+ def create_share(self, **kwargs):
+ kwargs.update({
'share_protocol': self.protocol,
- 'share_type_id': self._get_share_type()['id'],
- }
+ })
+ if not ('share_type_id' in kwargs or 'snapshot_id' in kwargs):
+ kwargs.update({'share_type_id': self._get_share_type()['id']})
if CONF.share.multitenancy_enabled:
kwargs.update({'share_network_id': self.share_net['id']})
self.share = self._create_share(**kwargs)
+ return self.share
def allow_access_ip(self, share_id, ip=None, instance=None, cleanup=True):
if instance and not ip:
@@ -343,21 +346,116 @@
self.assertIn('1m4.bin', output)
self.assertIn('1m5.bin', output)
+ def _get_user_export_location(self, share):
+ if utils.is_microversion_lt(CONF.share.max_api_microversion, "2.9"):
+ user_export_location = share['export_locations'][0]
+ else:
+ exports = self.shares_v2_client.list_share_export_locations(
+ share['id'])
+ locations = [x['path'] for x in exports]
+ user_export_location = locations[0]
+ return user_export_location
+
+ @tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
+ @testtools.skipUnless(
+ CONF.share.run_snapshot_tests, "Snapshot tests are disabled.")
+ def test_write_data_to_share_created_from_snapshot(self):
+ if self.protocol.upper() == 'CIFS':
+ msg = "Skipped for CIFS protocol because of bug/1649573"
+ raise self.skipException(msg)
+
+ # 1 - Create UVM, ok, created
+ instance = self.boot_instance(wait_until="BUILD")
+
+ # 2 - Create share S1, ok, created
+ parent_share = self.create_share()
+ instance = self.wait_for_active_instance(instance["id"])
+ self.addCleanup(self.servers_client.delete_server, instance['id'])
+
+ # 3 - Provide RW access to S1, ok, provided
+ self.allow_access_ip(
+ parent_share['id'], instance=instance, cleanup=False)
+
+ # 4 - SSH to UVM, ok, connected
+ ssh_client = self.init_ssh(instance)
+
+ # 5 - Try mount S1 to UVM, ok, mounted
+ user_export_location = self._get_user_export_location(parent_share)
+ parent_share_dir = "/mnt/parent"
+ ssh_client.exec_command("sudo mkdir -p %s" % parent_share_dir)
+ self.mount_share(user_export_location, ssh_client, parent_share_dir)
+ self.addCleanup(self.umount_share, ssh_client, parent_share_dir)
+
+ # 6 - Create "file1", ok, created
+ ssh_client.exec_command("sudo touch %s/file1" % parent_share_dir)
+
+ # 7 - Create snapshot SS1 from S1, ok, created
+ snapshot = self._create_snapshot(parent_share['id'])
+
+ # 8 - Create "file2" in share S1 - ok, created. We expect that
+ # snapshot will not contain any data created after snapshot creation.
+ ssh_client.exec_command("sudo touch %s/file2" % parent_share_dir)
+
+ # 9 - Create share S2 from SS1, ok, created
+ child_share = self.create_share(snapshot_id=snapshot["id"])
+
+ # 10 - Try mount S2 - fail, access denied. We test that child share
+ # did not get access rules from parent share.
+ user_export_location = self._get_user_export_location(child_share)
+ child_share_dir = "/mnt/child"
+ ssh_client.exec_command("sudo mkdir -p %s" % child_share_dir)
+ self.assertRaises(
+ exceptions.SSHExecCommandFailed,
+ self.mount_share,
+ user_export_location, ssh_client, child_share_dir,
+ )
+
+ # 11 - Provide RW access to S2, ok, provided
+ self.allow_access_ip(
+ child_share['id'], instance=instance, cleanup=False)
+
+ # 12 - Try mount S2, ok, mounted
+ self.mount_share(user_export_location, ssh_client, child_share_dir)
+ self.addCleanup(self.umount_share, ssh_client, child_share_dir)
+
+ # 13 - List files on S2, only "file1" exists
+ output = ssh_client.exec_command("sudo ls -lRA %s" % child_share_dir)
+ self.assertIn('file1', output)
+ self.assertNotIn('file2', output)
+
+ # 14 - Create file3 on S2, ok, file created
+ ssh_client.exec_command("sudo touch %s/file3" % child_share_dir)
+
+ # 15 - List files on S1, two files exist - "file1" and "file2"
+ output = ssh_client.exec_command("sudo ls -lRA %s" % parent_share_dir)
+ self.assertIn('file1', output)
+ self.assertIn('file2', output)
+ self.assertNotIn('file3', output)
+
+ # 16 - List files on S2, two files exist - "file1" and "file3"
+ output = ssh_client.exec_command("sudo ls -lRA %s" % child_share_dir)
+ self.assertIn('file1', output)
+ self.assertNotIn('file2', output)
+ self.assertIn('file3', output)
+
class TestShareBasicOpsNFS(ShareBasicOpsBase):
protocol = "NFS"
- def mount_share(self, location, ssh_client):
- ssh_client.exec_command("sudo mount -vt nfs \"%s\" /mnt" % location)
+ def mount_share(self, location, ssh_client, target_dir=None):
+ target_dir = target_dir or "/mnt"
+ ssh_client.exec_command(
+ "sudo mount -vt nfs \"%s\" %s" % (location, target_dir))
class TestShareBasicOpsCIFS(ShareBasicOpsBase):
protocol = "CIFS"
- def mount_share(self, location, ssh_client):
+ def mount_share(self, location, ssh_client, target_dir=None):
location = location.replace("\\", "/")
+ target_dir = target_dir or "/mnt"
ssh_client.exec_command(
- "sudo mount.cifs \"%s\" /mnt -o guest" % location
+ "sudo mount.cifs \"%s\" %s -o guest" % (location, target_dir)
)