Merge "Mountable snapshots scenario tests"
diff --git a/manila_tempest_tests/tests/scenario/manager_share.py b/manila_tempest_tests/tests/scenario/manager_share.py
index 279a924..b29449a 100644
--- a/manila_tempest_tests/tests/scenario/manager_share.py
+++ b/manila_tempest_tests/tests/scenario/manager_share.py
@@ -166,6 +166,27 @@
             self.addCleanup(client.delete_access_rule, share_id, access['id'])
         return access
 
+    def _allow_access_snapshot(self, snapshot_id, access_type="ip",
+                               access_to="0.0.0.0/0", cleanup=True):
+        """Allow snapshot access
+
+        :param snapshot_id: id of the snapshot
+        :param access_type: "ip", "user" or "cert"
+        :param access_to
+        :returns: access object
+        """
+        access = self.shares_v2_client.create_snapshot_access_rule(
+            snapshot_id, access_type, access_to)
+
+        if cleanup:
+            self.addCleanup(self.shares_v2_client.delete_snapshot_access_rule,
+                            snapshot_id, access['id'])
+
+        self.shares_v2_client.wait_for_snapshot_access_rule_status(
+            snapshot_id, access['id'])
+
+        return access
+
     def _create_router_interface(self, subnet_id, client=None, router_id=None):
         """Create a router interface
 
diff --git a/manila_tempest_tests/tests/scenario/test_share_basic_ops.py b/manila_tempest_tests/tests/scenario/test_share_basic_ops.py
index ef9a3d8..4542de6 100644
--- a/manila_tempest_tests/tests/scenario/test_share_basic_ops.py
+++ b/manila_tempest_tests/tests/scenario/test_share_basic_ops.py
@@ -181,7 +181,8 @@
         self.share = self._create_share(**kwargs)
         return self.share
 
-    def allow_access_ip(self, share_id, ip=None, instance=None, cleanup=True):
+    def allow_access_ip(self, share_id, ip=None, instance=None, cleanup=True,
+                        snapshot=None):
         if instance and not ip:
             try:
                 net_addresses = instance['addresses']
@@ -194,20 +195,30 @@
                               "Falling back to default")
         if not ip:
             ip = '0.0.0.0/0'
-        self._allow_access(share_id, access_type='ip', access_to=ip,
-                           cleanup=cleanup)
 
-    def provide_access_to_auxiliary_instance(self, instance, share=None):
+        if snapshot:
+            self._allow_access_snapshot(snapshot['id'], access_type='ip',
+                                        access_to=ip, cleanup=cleanup)
+        else:
+            self._allow_access(share_id, access_type='ip', access_to=ip,
+                               cleanup=cleanup)
+
+    def provide_access_to_auxiliary_instance(self, instance, share=None,
+                                             snapshot=None):
         share = share or self.share
         if self.protocol.lower() == 'cifs':
-            self.allow_access_ip(share['id'], instance=instance, cleanup=False)
+            self.allow_access_ip(
+                share['id'], instance=instance, cleanup=False,
+                snapshot=snapshot)
         elif not CONF.share.multitenancy_enabled:
             self.allow_access_ip(
                 share['id'], ip=self.floatings[instance['id']]['ip'],
-                instance=instance, cleanup=False)
+                instance=instance, cleanup=False, snapshot=snapshot)
         elif (CONF.share.multitenancy_enabled and
               self.protocol.lower() == 'nfs'):
-            self.allow_access_ip(share['id'], instance=instance, cleanup=False)
+            self.allow_access_ip(
+                share['id'], instance=instance, cleanup=False,
+                snapshot=snapshot)
 
     def wait_for_active_instance(self, instance_id):
         waiters.wait_for_server_status(
@@ -386,14 +397,23 @@
         self.assertIn('1m4.bin', output)
         self.assertIn('1m5.bin', output)
 
-    def _get_user_export_location(self, share):
-        if utils.is_microversion_lt(CONF.share.max_api_microversion, "2.9"):
-            user_export_location = share['export_locations'][0]
-        else:
-            exports = self.shares_v2_client.list_share_export_locations(
-                share['id'])
+    def _get_user_export_location(self, share=None, snapshot=None):
+        user_export_location = None
+        if share:
+            if utils.is_microversion_lt(
+                    CONF.share.max_api_microversion, "2.9"):
+                user_export_location = share['export_locations'][0]
+            else:
+                exports = self.shares_v2_client.list_share_export_locations(
+                    share['id'])
+                locations = [x['path'] for x in exports]
+                user_export_location = locations[0]
+        elif snapshot:
+            exports = (self.shares_v2_client.
+                       list_snapshot_export_locations(snapshot['id']))
             locations = [x['path'] for x in exports]
             user_export_location = locations[0]
+        self.assertIsNotNone(user_export_location)
         return user_export_location
 
     @tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
@@ -476,6 +496,72 @@
         self.assertNotIn('file2', output)
         self.assertIn('file3', output)
 
+    @tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
+    @base.skip_if_microversion_lt("2.32")
+    @testtools.skipUnless(CONF.share.run_mount_snapshot_tests,
+                          'Mountable snapshots tests are disabled.')
+    @testtools.skipUnless(CONF.share.run_snapshot_tests,
+                          "Snapshot tests are disabled.")
+    def test_read_mountable_snapshot(self):
+        if self.protocol.upper() == 'CIFS':
+            msg = "Skipped for CIFS protocol because of bug/1649573"
+            raise self.skipException(msg)
+
+        # 1 - Create UVM, ok, created
+        instance = self.boot_instance(wait_until="BUILD")
+
+        # 2 - Create share S1, ok, created
+        parent_share = self.create_share()
+        instance = self.wait_for_active_instance(instance["id"])
+        self.addCleanup(self.servers_client.delete_server, instance['id'])
+
+        # 3 - SSH to UVM, ok, connected
+        ssh_client = self.init_ssh(instance)
+
+        # 4 - Provide RW access to S1, ok, provided
+        self.provide_access_to_auxiliary_instance(instance, parent_share)
+
+        # 5 - Try mount S1 to UVM, ok, mounted
+        user_export_location = self._get_user_export_location(parent_share)
+        parent_share_dir = "/mnt/parent"
+        snapshot_dir = "/mnt/snapshot_dir"
+        ssh_client.exec_command("sudo mkdir -p %s" % parent_share_dir)
+        ssh_client.exec_command("sudo mkdir -p %s" % snapshot_dir)
+        self.mount_share(user_export_location, ssh_client, parent_share_dir)
+        self.addCleanup(self.umount_share, ssh_client, parent_share_dir)
+
+        # 6 - Create "file1", ok, created
+        ssh_client.exec_command("sudo touch %s/file1" % parent_share_dir)
+
+        # 7 - Create snapshot SS1 from S1, ok, created
+        snapshot = self._create_snapshot(parent_share['id'])
+
+        # 8 - Create "file2" in share S1 - ok, created. We expect that
+        # snapshot will not contain any data created after snapshot creation.
+        ssh_client.exec_command("sudo touch %s/file2" % parent_share_dir)
+
+        # 9 - Allow access to SS1
+        self.provide_access_to_auxiliary_instance(instance, snapshot=snapshot)
+
+        # 10 - Mount SS1
+        user_export_location = self._get_user_export_location(
+            snapshot=snapshot)
+        self.mount_share(user_export_location, ssh_client, snapshot_dir)
+        self.addCleanup(self.umount_share, ssh_client, snapshot_dir)
+
+        # 11 - List files on SS1, only "file1" exists
+        # NOTE(lseki): using ls without recursion to avoid permission denied
+        #              error while listing lost+found directory on LVM volumes
+        output = ssh_client.exec_command("sudo ls -lA %s" % snapshot_dir)
+        self.assertIn('file1', output)
+        self.assertNotIn('file2', output)
+
+        # 12 - Try to create a file on SS1, should fail
+        self.assertRaises(
+            exceptions.SSHExecCommandFailed,
+            ssh_client.exec_command,
+            "sudo touch %s/file3" % snapshot_dir)
+
 
 class TestShareBasicOpsNFS(ShareBasicOpsBase):
     protocol = "nfs"