Merge "Add mountable snapshots support"
diff --git a/manila_tempest_tests/config.py b/manila_tempest_tests/config.py
index d25cb8d..e3a8936 100644
--- a/manila_tempest_tests/config.py
+++ b/manila_tempest_tests/config.py
@@ -30,7 +30,7 @@
                help="The minimum api microversion is configured to be the "
                     "value of the minimum microversion supported by Manila."),
     cfg.StrOpt("max_api_microversion",
-               default="2.31",
+               default="2.32",
                help="The maximum api microversion is configured to be the "
                     "value of the latest microversion supported by Manila."),
     cfg.StrOpt("region",
@@ -203,6 +203,9 @@
                 help="Defines whether to run manage/unmanage snapshot tests "
                      "or not. These tests may leave orphaned resources, so be "
                      "careful enabling this opt."),
+    cfg.BoolOpt("run_mount_snapshot_tests",
+                default=False,
+                help="Enable or disable mountable snapshot tests."),
 
     cfg.StrOpt("image_with_share_tools",
                default="manila-service-image-master",
diff --git a/manila_tempest_tests/services/share/v2/json/shares_client.py b/manila_tempest_tests/services/share/v2/json/shares_client.py
index aa176cb..1190dfe 100644
--- a/manila_tempest_tests/services/share/v2/json/shares_client.py
+++ b/manila_tempest_tests/services/share/v2/json/shares_client.py
@@ -674,6 +674,26 @@
                            })
                 raise exceptions.TimeoutException(message)
 
+    def get_snapshot_instance_export_location(
+            self, instance_id, export_location_uuid,
+            version=LATEST_MICROVERSION):
+        """GET one export location of a snapshot instance; expects 200."""
+        url = (
+            "snapshot-instances/%(instance_id)s/export-locations/%(el_uuid)s"
+            % {"instance_id": instance_id, "el_uuid": export_location_uuid})
+        resp, body = self.get(url, version=version)
+        self.expected_success(200, resp.status)
+        # Return whatever _parse_resp extracts from the response body.
+        return self._parse_resp(body)
+
+    def list_snapshot_instance_export_locations(
+            self, instance_id, version=LATEST_MICROVERSION):
+        """GET the export locations of a snapshot instance; expects 200."""
+        url = "snapshot-instances/%s/export-locations" % instance_id
+        resp, body = self.get(url, version=version)
+        self.expected_success(200, resp.status)
+        return self._parse_resp(body)
+
 ###############
 
     def _get_access_action_name(self, version, action):
@@ -1379,3 +1399,102 @@
                               version=version)
         self.expected_success(200, resp.status)
         return self._parse_resp(body)
+
+################
+
+    def create_snapshot_access_rule(self, snapshot_id, access_type="ip",
+                                    access_to="0.0.0.0/0"):
+        """POST an ``allow_access`` action on a snapshot; expects 202.
+
+        The request always uses LATEST_MICROVERSION.
+        """
+        rule = {"access_type": access_type, "access_to": access_to}
+        payload = json.dumps({"allow_access": rule})
+        url = "snapshots/%s/action" % snapshot_id
+        resp, body = self.post(url, payload, version=LATEST_MICROVERSION)
+        self.expected_success(202, resp.status)
+        return self._parse_resp(body)
+
+    def get_snapshot_access_rule(self, snapshot_id, rule_id):
+        """Return the snapshot's access rule with id rule_id, else None."""
+        resp, body = self.get("snapshots/%s/access-list" % snapshot_id,
+                              version=LATEST_MICROVERSION)
+        # Build a real list: Python 3's filter() object has no len()/[0].
+        rules = [r for r in self._parse_resp(body) if r['id'] == rule_id]
+        return rules[0] if rules else None
+
+    def wait_for_snapshot_access_rule_status(self, snapshot_id, rule_id,
+                                             expected_state='active'):
+        """Poll a snapshot access rule until it reaches expected_state."""
+        rule = self.get_snapshot_access_rule(snapshot_id, rule_id)
+        state = rule['state']
+        start = int(time.time())
+        while state != expected_state:
+            time.sleep(self.build_interval)
+            rule = self.get_snapshot_access_rule(snapshot_id, rule_id)
+            state = rule['state']
+            if state == expected_state:
+                return
+            if 'error' in state:
+                # Name the failed rule, matching the id the timeout reports.
+                raise share_exceptions.AccessRuleBuildErrorException(
+                    rule_id=rule_id)
+            if int(time.time()) - start >= self.build_timeout:
+                message = ('The status of snapshot access rule %(id)s failed '
+                           'to reach %(expected_state)s state within the '
+                           'required time (%(time)ss). Current '
+                           'state: %(current_state)s.' %
+                           {
+                               'expected_state': expected_state,
+                               'time': self.build_timeout,
+                               'id': rule_id,
+                               'current_state': state,
+                           })
+                raise exceptions.TimeoutException(message)
+
+    def delete_snapshot_access_rule(self, snapshot_id, rule_id):
+        """POST a ``deny_access`` action on a snapshot; expects 202.
+
+        The request always uses LATEST_MICROVERSION.
+        """
+        payload = json.dumps({"deny_access": {"access_id": rule_id}})
+        url = "snapshots/%s/action" % snapshot_id
+        resp, body = self.post(url, payload, version=LATEST_MICROVERSION)
+        self.expected_success(202, resp.status)
+        return self._parse_resp(body)
+
+    def wait_for_snapshot_access_rule_deletion(self, snapshot_id, rule_id):
+        """Poll until the snapshot access rule is no longer listed.
+
+        Raises TimeoutException if it is still listed after build_timeout.
+        """
+        rule = self.get_snapshot_access_rule(snapshot_id, rule_id)
+        start = int(time.time())
+        while rule is not None:
+            time.sleep(self.build_interval)
+            rule = self.get_snapshot_access_rule(snapshot_id, rule_id)
+            # Absent from the access list: the deletion has completed.
+            if rule is None:
+                return
+            if int(time.time()) - start < self.build_timeout:
+                continue
+            message = ('The snapshot access rule %(id)s failed to delete '
+                       'within the required time (%(time)ss).' %
+                       {'time': self.build_timeout, 'id': rule_id})
+            raise exceptions.TimeoutException(message)
+
+    def get_snapshot_export_location(self, snapshot_id, export_location_uuid,
+                                     version=LATEST_MICROVERSION):
+        """GET one export location of a snapshot; expects 200."""
+        ids = {"snapshot_id": snapshot_id, "el_uuid": export_location_uuid}
+        url = "snapshots/%(snapshot_id)s/export-locations/%(el_uuid)s" % ids
+        resp, body = self.get(url, version=version)
+        self.expected_success(200, resp.status)
+        return self._parse_resp(body)
+
+    def list_snapshot_export_locations(
+            self, snapshot_id, version=LATEST_MICROVERSION):
+        url = "snapshots/%s/export-locations" % snapshot_id
+        resp, body = self.get(url, version=version)
+        self.expected_success(200, resp.status)  # the listing must answer 200
+        return self._parse_resp(body)
diff --git a/manila_tempest_tests/tests/api/admin/test_share_types_extra_specs_negative.py b/manila_tempest_tests/tests/api/admin/test_share_types_extra_specs_negative.py
index 63fdf23..f888969 100644
--- a/manila_tempest_tests/tests/api/admin/test_share_types_extra_specs_negative.py
+++ b/manila_tempest_tests/tests/api/admin/test_share_types_extra_specs_negative.py
@@ -80,6 +80,8 @@
         if utils.is_microversion_ge(CONF.share.max_api_microversion,
                                     constants.REVERT_TO_SNAPSHOT_MICROVERSION):
             expected_keys.append('revert_to_snapshot_support')
+        if utils.is_microversion_ge(CONF.share.max_api_microversion, '2.32'):
+            expected_keys.append('mount_snapshot_support')
         actual_keys = share_type['share_type']['extra_specs'].keys()
         self.assertEqual(sorted(expected_keys), sorted(actual_keys),
                          'Incorrect extra specs visible to non-admin user; '
diff --git a/manila_tempest_tests/tests/api/admin/test_snapshot_export_locations.py b/manila_tempest_tests/tests/api/admin/test_snapshot_export_locations.py
new file mode 100644
index 0000000..dd48df4
--- /dev/null
+++ b/manila_tempest_tests/tests/api/admin/test_snapshot_export_locations.py
@@ -0,0 +1,140 @@
+# Copyright (c) 2017 Hitachi Data Systems, Inc.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import ddt
+from oslo_utils import uuidutils
+import six
+from tempest import config
+import testtools
+from testtools import testcase as tc
+
+from manila_tempest_tests.tests.api import base
+
+CONF = config.CONF
+LATEST_MICROVERSION = CONF.share.max_api_microversion
+
+
+@base.skip_if_microversion_lt("2.32")
+@testtools.skipUnless(CONF.share.run_mount_snapshot_tests and
+                      CONF.share.run_snapshot_tests,
+                      "Mountable snapshots tests are disabled.")
+@ddt.ddt
+class SnapshotExportLocationsTest(base.BaseSharesMixedTest):
+
+    @classmethod
+    def setup_clients(cls):
+        super(SnapshotExportLocationsTest, cls).setup_clients()
+        cls.admin_client = cls.admin_shares_v2_client
+
+    @classmethod
+    def resource_setup(cls):
+        super(SnapshotExportLocationsTest, cls).resource_setup()
+        cls.share = cls.create_share(client=cls.admin_client)
+        cls.snapshot = cls.create_snapshot_wait_for_active(
+            cls.share['id'], client=cls.admin_client)
+        cls.snapshot = cls.admin_client.get_snapshot(cls.snapshot['id'])
+        cls.snapshot_instances = cls.admin_client.list_snapshot_instances(
+            snapshot_id=cls.snapshot['id'])
+
+    def _verify_export_location_structure(
+            self, export_locations, role='admin', detail=False):
+        """Assert that export location(s) expose exactly the expected keys.
+
+        :param export_locations: one export location dict, or a list,
+            tuple or set of them.
+        :param role: 'admin' additionally expects the admin-only keys.
+        :param detail: True additionally expects the timestamp keys.
+        """
+        expected_keys = ['id', 'path', 'links']
+        if detail:
+            expected_keys.extend(['created_at', 'updated_at'])
+        is_admin = role == 'admin'
+        if is_admin:
+            expected_keys = expected_keys + [
+                'share_snapshot_instance_id', 'is_admin_only']
+
+        # A lone export location is wrapped so both shapes share the loop.
+        if not isinstance(export_locations, (list, tuple, set)):
+            export_locations = (export_locations, )
+
+        for export_location in export_locations:
+            # Same size plus every expected key present: no extras, no gaps.
+            self.assertEqual(len(expected_keys), len(export_location))
+            for key in expected_keys:
+                self.assertIn(key, export_location)
+
+            # 'id' and 'path' are expected for every role.
+            self.assertTrue(uuidutils.is_uuid_like(export_location['id']))
+            self.assertIsInstance(export_location['path'], six.string_types)
+
+            if is_admin:
+                self.assertIn(export_location['is_admin_only'], (True, False))
+                self.assertTrue(uuidutils.is_uuid_like(
+                    export_location['share_snapshot_instance_id']))
+
+    @tc.attr(base.TAG_POSITIVE, base.TAG_API_WITH_BACKEND)
+    def test_list_snapshot_export_location(self):
+        """Every listed export location has the admin summary keys."""
+        listed = self.admin_client.list_snapshot_export_locations(
+            self.snapshot['id'])
+
+        for el in listed:
+            self._verify_export_location_structure(el)
+
+    @tc.attr(base.TAG_POSITIVE, base.TAG_API_WITH_BACKEND)
+    def test_get_snapshot_export_location(self):
+        """Each listed export location can be fetched with detail keys."""
+        snapshot_id = self.snapshot['id']
+        listed = self.admin_client.list_snapshot_export_locations(snapshot_id)
+
+        for summary in listed:
+            el = self.admin_client.get_snapshot_export_location(
+                snapshot_id, summary['id'])
+            self._verify_export_location_structure(el, detail=True)
+
+    @tc.attr(base.TAG_POSITIVE, base.TAG_API_WITH_BACKEND)
+    def test_get_snapshot_instance_export_location(self):
+        client = self.admin_client
+        for instance in self.snapshot_instances:
+            listed = client.list_snapshot_instance_export_locations(
+                instance['id'])
+            for summary in listed:
+                el = client.get_snapshot_instance_export_location(
+                    instance['id'], summary['id'])
+                self._verify_export_location_structure(el, detail=True)
+
+    @tc.attr(base.TAG_POSITIVE, base.TAG_API_WITH_BACKEND)
+    def test_snapshot_contains_all_export_locations_of_all_snapshot_instances(
+            self):
+        """Snapshot export locations match those of its instances combined.
+
+        Compared by count, then element-wise after sorting both by 'id'.
+        """
+        client = self.admin_client
+        from_snapshot = client.list_snapshot_export_locations(
+            self.snapshot['id'])
+        from_instances = []
+        for instance in self.snapshot_instances:
+            from_instances.extend(
+                client.list_snapshot_instance_export_locations(
+                    instance['id']))
+
+        def by_id(el):
+            return el['id']
+
+        self.assertEqual(len(from_snapshot), len(from_instances))
+        self.assertEqual(
+            sorted(from_snapshot, key=by_id),
+            sorted(from_instances, key=by_id))
diff --git a/manila_tempest_tests/tests/api/admin/test_snapshot_export_locations_negative.py b/manila_tempest_tests/tests/api/admin/test_snapshot_export_locations_negative.py
new file mode 100644
index 0000000..6fccc4d
--- /dev/null
+++ b/manila_tempest_tests/tests/api/admin/test_snapshot_export_locations_negative.py
@@ -0,0 +1,140 @@
+# Copyright (c) 2017 Hitachi Data Systems, Inc.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+from tempest import config
+from tempest.lib import exceptions as lib_exc
+import testtools
+from testtools import testcase as tc
+
+from manila_tempest_tests.tests.api import base
+
+CONF = config.CONF
+
+
+@base.skip_if_microversion_lt("2.32")
+@testtools.skipUnless(CONF.share.run_mount_snapshot_tests and
+                      CONF.share.run_snapshot_tests,
+                      "Mountable snapshots tests are disabled.")
+class SnapshotExportLocationsNegativeTest(base.BaseSharesMixedTest):
+
+    @classmethod
+    def setup_clients(cls):
+        super(SnapshotExportLocationsNegativeTest, cls).setup_clients()
+        cls.admin_client = cls.admin_shares_v2_client
+        cls.isolated_client = cls.alt_shares_v2_client
+
+    @classmethod
+    def resource_setup(cls):
+        super(SnapshotExportLocationsNegativeTest, cls).resource_setup()
+        cls.share = cls.create_share(client=cls.admin_client)
+        cls.snapshot = cls.create_snapshot_wait_for_active(
+            cls.share['id'], client=cls.admin_client)
+        cls.snapshot = cls.admin_client.get_snapshot(cls.snapshot['id'])
+        cls.snapshot_instances = cls.admin_client.list_snapshot_instances(
+            snapshot_id=cls.snapshot['id'])
+
+    @tc.attr(base.TAG_NEGATIVE, base.TAG_API_WITH_BACKEND)
+    def test_get_inexistent_snapshot_export_location(self):
+        """An unknown export location id on a real snapshot is NotFound."""
+        fake_id = "fake-inexistent-snapshot-export-location-id"
+        self.assertRaises(
+            lib_exc.NotFound,
+            self.admin_client.get_snapshot_export_location,
+            self.snapshot['id'], fake_id)
+
+    @tc.attr(base.TAG_NEGATIVE, base.TAG_API_WITH_BACKEND)
+    def test_list_snapshot_export_locations_by_member(self):
+        """The alt client gets NotFound for the admin-created snapshot."""
+        self.assertRaises(
+            lib_exc.NotFound,
+            self.isolated_client.list_snapshot_export_locations,
+            self.snapshot['id'])
+
+    @tc.attr(base.TAG_NEGATIVE, base.TAG_API_WITH_BACKEND)
+    def test_get_snapshot_export_location_by_member(self):
+        """The alt client gets NotFound for each non-admin-only location."""
+        snapshot_id = self.snapshot['id']
+        listed = self.admin_client.list_snapshot_export_locations(snapshot_id)
+
+        for export_location in listed:
+            # Admin-only export locations are skipped; only the rest are
+            # requested through the alt client.
+            if not export_location['is_admin_only']:
+                self.assertRaises(
+                    lib_exc.NotFound,
+                    self.isolated_client.get_snapshot_export_location,
+                    snapshot_id,
+                    export_location['id'])
+
+    @tc.attr(base.TAG_NEGATIVE, base.TAG_API_WITH_BACKEND)
+    def test_get_inexistent_snapshot_instance_export_location(self):
+        """An unknown export location id is NotFound on every instance."""
+        fake_id = "fake-inexistent-snapshot-export-location-id"
+        get_location = self.admin_client.get_snapshot_instance_export_location
+        for instance in self.snapshot_instances:
+            self.assertRaises(
+                lib_exc.NotFound,
+                get_location, instance['id'], fake_id)
+
+    @tc.attr(base.TAG_NEGATIVE, base.TAG_API_WITH_BACKEND)
+    def test_get_snapshot_instance_export_location_by_member(self):
+        """The alt client is Forbidden from instance export locations."""
+        get_el = self.isolated_client.get_snapshot_instance_export_location
+        for instance in self.snapshot_instances:
+            instance_id = instance['id']
+            listed = (
+                self.admin_client.list_snapshot_instance_export_locations(
+                    instance_id))
+            for el in listed:
+                self.assertRaises(
+                    lib_exc.Forbidden, get_el, instance_id, el['id'])
+
+
+@testtools.skipUnless(CONF.share.run_mount_snapshot_tests and
+                      CONF.share.run_snapshot_tests,
+                      "Mountable snapshots tests are disabled.")
+@base.skip_if_microversion_lt("2.32")
+class SnapshotExportLocationsAPIOnlyNegativeTest(base.BaseSharesMixedTest):
+
+    @classmethod
+    def setup_clients(cls):
+        super(SnapshotExportLocationsAPIOnlyNegativeTest, cls).setup_clients()
+        cls.admin_client = cls.admin_shares_v2_client
+        cls.isolated_client = cls.alt_shares_v2_client
+
+    @tc.attr(base.TAG_NEGATIVE, base.TAG_API)
+    def test_list_export_locations_by_nonexistent_snapshot(self):
+        """Listing export locations of an unknown snapshot is NotFound."""
+        self.assertRaises(
+            lib_exc.NotFound,
+            self.admin_client.list_snapshot_export_locations,
+            "fake-inexistent-snapshot-id")
+
+    @tc.attr(base.TAG_NEGATIVE, base.TAG_API)
+    def test_list_export_locations_by_nonexistent_snapshot_instance(self):
+        """Listing for an unknown snapshot instance is NotFound."""
+        self.assertRaises(
+            lib_exc.NotFound,
+            self.admin_client.list_snapshot_instance_export_locations,
+            "fake-inexistent-snapshot-instance-id")
+
+    @tc.attr(base.TAG_NEGATIVE, base.TAG_API)
+    def test_list_inexistent_snapshot_instance_export_locations_by_member(
+            self):
+        """The alt client is Forbidden even for an unknown instance id."""
+        self.assertRaises(
+            lib_exc.Forbidden,
+            self.isolated_client.list_snapshot_instance_export_locations,
+            "fake-inexistent-snapshot-instance-id")
diff --git a/manila_tempest_tests/tests/api/test_snapshot_rules.py b/manila_tempest_tests/tests/api/test_snapshot_rules.py
new file mode 100644
index 0000000..af80f4d
--- /dev/null
+++ b/manila_tempest_tests/tests/api/test_snapshot_rules.py
@@ -0,0 +1,101 @@
+# Copyright 2016 Hitachi Data Systems
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import six
+
+import ddt
+from tempest import config
+import testtools
+from testtools import testcase as tc
+
+from manila_tempest_tests.tests.api import base
+
+CONF = config.CONF
+
+
+class BaseShareSnapshotRulesTest(base.BaseSharesTest):
+    """Shared setup and create/delete flow for snapshot access rule tests.
+
+    Subclasses set ``protocol`` and ``access_type``.
+    """
+
+    protocol = ""
+
+    @classmethod
+    def resource_setup(cls):
+        super(BaseShareSnapshotRulesTest, cls).resource_setup()
+        cls.share = cls.create_share(cls.protocol)
+        cls.snapshot = cls.create_snapshot_wait_for_active(cls.share['id'])
+
+    def _test_create_delete_access_rules(self, access_to):
+        client = self.shares_v2_client
+        snapshot_id = self.snapshot['id']
+        rule = client.create_snapshot_access_rule(
+            snapshot_id, self.access_type, access_to)
+        for key in ('deleted', 'deleted_at', 'instance_mappings'):
+            self.assertNotIn(key, list(six.iterkeys(rule)))
+        client.wait_for_snapshot_access_rule_status(snapshot_id, rule['id'])
+
+        # Remove the rule again and wait until it is gone.
+        client.delete_snapshot_access_rule(snapshot_id, rule['id'])
+        client.wait_for_snapshot_access_rule_deletion(snapshot_id, rule['id'])
+
+
+@base.skip_if_microversion_lt("2.32")
+@testtools.skipUnless(CONF.share.run_mount_snapshot_tests and
+                      CONF.share.run_snapshot_tests,
+                      'Mountable snapshots tests are disabled.')
+@ddt.ddt
+class ShareSnapshotIpRulesForNFSTest(BaseShareSnapshotRulesTest):
+    protocol = "nfs"
+
+    @classmethod
+    def resource_setup(cls):
+        enabled = (cls.protocol in CONF.share.enable_protocols and
+                   cls.protocol in CONF.share.enable_ip_rules_for_protocols)
+        if not enabled:
+            msg = "IP rule tests for %s protocol are disabled." % cls.protocol
+            raise cls.skipException(msg)
+        super(ShareSnapshotIpRulesForNFSTest, cls).resource_setup()
+        cls.access_type = "ip"
+
+    @tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
+    @ddt.data("1.1.1.1", "1.2.3.4/32")
+    def test_create_delete_access_rules(self, access_to):
+        self._test_create_delete_access_rules(access_to)
+
+
+@base.skip_if_microversion_lt("2.32")
+@testtools.skipUnless(CONF.share.run_mount_snapshot_tests and
+                      CONF.share.run_snapshot_tests,  # setup takes a snapshot
+                      'Mountable snapshots tests are disabled.')
+@ddt.ddt
+class ShareSnapshotUserRulesForCIFSTest(BaseShareSnapshotRulesTest):
+    protocol = "cifs"
+
+    @classmethod
+    def resource_setup(cls):
+        if not (cls.protocol in CONF.share.enable_protocols and
+                cls.protocol in CONF.share.enable_user_rules_for_protocols):
+            msg = ("User rule tests for %s protocol are "
+                   "disabled." % cls.protocol)
+            raise cls.skipException(msg)
+        super(ShareSnapshotUserRulesForCIFSTest, cls).resource_setup()
+        cls.access_type = "user"
+
+    @tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
+    def test_create_delete_access_rules(self):
+        access_to = CONF.share.username_for_user_rules
+        self._test_create_delete_access_rules(access_to)
diff --git a/manila_tempest_tests/tests/api/test_snapshot_rules_negative.py b/manila_tempest_tests/tests/api/test_snapshot_rules_negative.py
new file mode 100644
index 0000000..9f48b73
--- /dev/null
+++ b/manila_tempest_tests/tests/api/test_snapshot_rules_negative.py
@@ -0,0 +1,90 @@
+# Copyright 2016 Hitachi Data Systems
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import ddt
+from tempest import config
+from tempest.lib import exceptions as lib_exc
+import testtools
+from testtools import testcase as tc
+
+from manila_tempest_tests.tests.api import base
+from manila_tempest_tests.tests.api import test_snapshot_rules
+
+CONF = config.CONF
+
+
+@base.skip_if_microversion_lt("2.32")
+@testtools.skipUnless(CONF.share.run_mount_snapshot_tests and
+                      CONF.share.run_snapshot_tests,
+                      'Mountable snapshots tests are disabled.')
+@ddt.ddt
+class SnapshotIpRulesForNFSNegativeTest(
+        test_snapshot_rules.BaseShareSnapshotRulesTest):
+    protocol = "nfs"
+
+    @classmethod
+    def resource_setup(cls):
+        if not (cls.protocol in CONF.share.enable_protocols and
+                cls.protocol in CONF.share.enable_ip_rules_for_protocols):
+            msg = "IP rule tests for %s protocol are disabled." % cls.protocol
+            raise cls.skipException(msg)
+        super(SnapshotIpRulesForNFSNegativeTest, cls).resource_setup()
+
+        # The base class setup already created a share and an active
+        # snapshot of it; reuse them rather than provisioning a second pair.
+        cls.snap = cls.snapshot
+
+    @tc.attr(base.TAG_NEGATIVE, base.TAG_API_WITH_BACKEND)
+    @ddt.data("1.2.3.256", "1.1.1.-", "1.2.3.4/33", "1.2.3.*", "1.2.3.*/23",
+              "1.2.3.1|23", "1.2.3.1/", "1.2.3.1/-1", "fe00::1",
+              "fe80::217:f2ff:fe07:ed62", "2001:db8::/48", "::1/128",
+              "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff",
+              "2001:0db8:0000:85a3:0000:0000:ac1f:8001")
+    def test_create_access_rule_ip_with_wrong_target(self, target):
+        self.assertRaises(lib_exc.BadRequest,
+                          self.shares_v2_client.create_snapshot_access_rule,
+                          self.snap["id"], "ip", target)
+
+    @tc.attr(base.TAG_NEGATIVE, base.TAG_API_WITH_BACKEND)
+    def test_create_duplicate_of_ip_rule(self):
+        self._test_duplicate_rules()
+        self._test_duplicate_rules()  # second pass re-adds the deleted rule
+
+    def _test_duplicate_rules(self):
+        """Create a rule, reject its duplicate, delete it, reject re-delete.
+
+        Expects BadRequest for the duplicate create and NotFound for the
+        delete issued after the rule is gone.
+        """
+        client = self.shares_v2_client
+        snap_id = self.snap['id']
+        access_type = "ip"
+        access_to = "1.2.3.4"
+
+        rule = client.create_snapshot_access_rule(
+            snap_id, access_type, access_to)
+        client.wait_for_snapshot_access_rule_status(snap_id, rule['id'])
+
+        # An identical second rule must be refused.
+        self.assertRaises(lib_exc.BadRequest,
+                          client.create_snapshot_access_rule,
+                          snap_id, access_type, access_to)
+
+        # Remove the rule, wait until it is gone, then delete it once more.
+        client.delete_snapshot_access_rule(snap_id, rule['id'])
+        client.wait_for_snapshot_access_rule_deletion(snap_id, rule['id'])
+        self.assertRaises(lib_exc.NotFound,
+                          client.delete_snapshot_access_rule,
+                          snap_id, rule['id'])