Merge "[Tempest] Add functional tests for share groups feature"
diff --git a/manila_tempest_tests/config.py b/manila_tempest_tests/config.py
index 9af95eb..ee3dd42 100644
--- a/manila_tempest_tests/config.py
+++ b/manila_tempest_tests/config.py
@@ -30,7 +30,7 @@
help="The minimum api microversion is configured to be the "
"value of the minimum microversion supported by Manila."),
cfg.StrOpt("max_api_microversion",
- default="2.31",
+ default="2.32",
help="The maximum api microversion is configured to be the "
"value of the latest microversion supported by Manila."),
cfg.StrOpt("region",
@@ -202,6 +202,9 @@
help="Defines whether to run manage/unmanage snapshot tests "
"or not. These tests may leave orphaned resources, so be "
"careful enabling this opt."),
+ cfg.BoolOpt("run_mount_snapshot_tests",
+ default=False,
+ help="Enable or disable mountable snapshot tests."),
cfg.StrOpt("image_with_share_tools",
default="manila-service-image-master",
diff --git a/manila_tempest_tests/services/share/v2/json/shares_client.py b/manila_tempest_tests/services/share/v2/json/shares_client.py
index a524051..d58af79 100644
--- a/manila_tempest_tests/services/share/v2/json/shares_client.py
+++ b/manila_tempest_tests/services/share/v2/json/shares_client.py
@@ -687,6 +687,26 @@
})
raise exceptions.TimeoutException(message)
+    def get_snapshot_instance_export_location(
+            self, instance_id, export_location_uuid,
+            version=LATEST_MICROVERSION):
+        """Fetch a single export location of a snapshot instance.
+
+        :param instance_id: ID of the snapshot instance.
+        :param export_location_uuid: ID of the export location to fetch.
+        :param version: API microversion passed to the GET request.
+        :returns: the response body as parsed by ``_parse_resp``.
+        """
+        url_template = (
+            "snapshot-instances/%(instance_id)s/export-locations/%(el_uuid)s")
+        url = url_template % {
+            "instance_id": instance_id,
+            "el_uuid": export_location_uuid,
+        }
+        resp, body = self.get(url, version=version)
+        # The request is expected to answer with HTTP 200.
+        self.expected_success(200, resp.status)
+        return self._parse_resp(body)
+
+    def list_snapshot_instance_export_locations(
+            self, instance_id, version=LATEST_MICROVERSION):
+        """List the export locations of a snapshot instance.
+
+        :param instance_id: ID of the snapshot instance.
+        :param version: API microversion passed to the GET request.
+        :returns: the response body as parsed by ``_parse_resp``.
+        """
+        url = "snapshot-instances/%s/export-locations" % instance_id
+        resp, body = self.get(url, version=version)
+        # The request is expected to answer with HTTP 200.
+        self.expected_success(200, resp.status)
+        return self._parse_resp(body)
+
###############
def _get_access_action_name(self, version, action):
@@ -1542,3 +1562,102 @@
version=version)
self.expected_success(200, resp.status)
return self._parse_resp(body)
+
+################
+
+    def create_snapshot_access_rule(self, snapshot_id, access_type="ip",
+                                    access_to="0.0.0.0/0"):
+        """Request a new access rule for a snapshot.
+
+        Posts an ``allow_access`` action to the snapshot at
+        LATEST_MICROVERSION.
+
+        :param snapshot_id: ID of the snapshot the rule applies to.
+        :param access_type: value sent as "access_type".
+        :param access_to: value sent as "access_to".
+        :returns: the response body as parsed by ``_parse_resp``.
+        """
+        allow_access = {"access_type": access_type, "access_to": access_to}
+        request_body = json.dumps({"allow_access": allow_access})
+        url = "snapshots/%s/action" % snapshot_id
+        resp, body = self.post(url, request_body,
+                               version=LATEST_MICROVERSION)
+        # The action is expected to answer with HTTP 202.
+        self.expected_success(202, resp.status)
+        return self._parse_resp(body)
+
+    def get_snapshot_access_rule(self, snapshot_id, rule_id):
+        """Return one access rule of a snapshot, or None if it is absent.
+
+        Lists the snapshot's access rules at LATEST_MICROVERSION and picks
+        the first entry whose 'id' equals ``rule_id``.
+
+        :param snapshot_id: ID of the snapshot whose rules are listed.
+        :param rule_id: ID of the access rule to look for.
+        :returns: the matching rule, or None when no listed rule matches.
+        """
+        # NOTE(review): unlike the sibling client methods, the response
+        # status is not verified here -- confirm whether that is intended.
+        resp, body = self.get("snapshots/%s/access-list" % snapshot_id,
+                              version=LATEST_MICROVERSION)
+        rules = self._parse_resp(body)
+        # filter() returns a lazy iterator on Python 3, which supports
+        # neither len() nor indexing; next() with a default behaves the same
+        # on Python 2 and Python 3.
+        return next((rule for rule in rules if rule['id'] == rule_id), None)
+
+    def wait_for_snapshot_access_rule_status(self, snapshot_id, rule_id,
+                                             expected_state='active'):
+        """Poll a snapshot access rule until it reaches ``expected_state``.
+
+        The rule is re-read every ``self.build_interval`` seconds.
+
+        :param snapshot_id: ID of the snapshot owning the rule.
+        :param rule_id: ID of the access rule to watch.
+        :param expected_state: value of the rule's 'state' to wait for.
+        :raises share_exceptions.AccessRuleBuildErrorException: when a
+            re-read state contains 'error'.
+        :raises exceptions.TimeoutException: when ``self.build_timeout``
+            seconds have passed without reaching the expected state.
+        """
+        # NOTE(review): get_snapshot_access_rule() may return None when the
+        # rule is not listed; subscripting it below would then raise
+        # TypeError -- confirm whether callers can hit that case.
+        current = self.get_snapshot_access_rule(snapshot_id, rule_id)['state']
+        started_at = int(time.time())
+
+        if current == expected_state:
+            return
+
+        while True:
+            time.sleep(self.build_interval)
+            current = self.get_snapshot_access_rule(
+                snapshot_id, rule_id)['state']
+            if current == expected_state:
+                return
+            if 'error' in current:
+                raise share_exceptions.AccessRuleBuildErrorException(
+                    snapshot_id)
+
+            elapsed = int(time.time()) - started_at
+            if elapsed >= self.build_timeout:
+                details = {
+                    'expected_state': expected_state,
+                    'time': self.build_timeout,
+                    'id': rule_id,
+                    'current_state': current,
+                }
+                message = ('The status of snapshot access rule %(id)s failed '
+                           'to reach %(expected_state)s state within the '
+                           'required time (%(time)ss). Current '
+                           'state: %(current_state)s.' % details)
+                raise exceptions.TimeoutException(message)
+
+    def delete_snapshot_access_rule(self, snapshot_id, rule_id):
+        """Request removal of an access rule from a snapshot.
+
+        Posts a ``deny_access`` action to the snapshot at
+        LATEST_MICROVERSION.
+
+        :param snapshot_id: ID of the snapshot owning the rule.
+        :param rule_id: ID of the access rule, sent as "access_id".
+        :returns: the response body as parsed by ``_parse_resp``.
+        """
+        request_body = json.dumps({"deny_access": {"access_id": rule_id}})
+        url = "snapshots/%s/action" % snapshot_id
+        resp, body = self.post(url, request_body,
+                               version=LATEST_MICROVERSION)
+        # The action is expected to answer with HTTP 202.
+        self.expected_success(202, resp.status)
+        return self._parse_resp(body)
+
+    def wait_for_snapshot_access_rule_deletion(self, snapshot_id, rule_id):
+        """Poll until an access rule is no longer listed for a snapshot.
+
+        The rule is looked up every ``self.build_interval`` seconds.
+
+        :param snapshot_id: ID of the snapshot owning the rule.
+        :param rule_id: ID of the access rule expected to disappear.
+        :raises exceptions.TimeoutException: when the rule is still listed
+            after ``self.build_timeout`` seconds.
+        """
+        found = self.get_snapshot_access_rule(snapshot_id, rule_id)
+        started_at = int(time.time())
+
+        if found is None:
+            return
+
+        while True:
+            time.sleep(self.build_interval)
+
+            found = self.get_snapshot_access_rule(snapshot_id, rule_id)
+            if found is None:
+                return
+
+            elapsed = int(time.time()) - started_at
+            if elapsed >= self.build_timeout:
+                details = {
+                    'time': self.build_timeout,
+                    'id': rule_id,
+                }
+                message = ('The snapshot access rule %(id)s failed to delete '
+                           'within the required time (%(time)ss).' % details)
+                raise exceptions.TimeoutException(message)
+
+    def get_snapshot_export_location(self, snapshot_id, export_location_uuid,
+                                     version=LATEST_MICROVERSION):
+        """Fetch a single export location of a snapshot.
+
+        :param snapshot_id: ID of the snapshot.
+        :param export_location_uuid: ID of the export location to fetch.
+        :param version: API microversion passed to the GET request.
+        :returns: the response body as parsed by ``_parse_resp``.
+        """
+        url_template = "snapshots/%(snapshot_id)s/export-locations/%(el_uuid)s"
+        url = url_template % {
+            "snapshot_id": snapshot_id,
+            "el_uuid": export_location_uuid,
+        }
+        resp, body = self.get(url, version=version)
+        # The request is expected to answer with HTTP 200.
+        self.expected_success(200, resp.status)
+        return self._parse_resp(body)
+
+    def list_snapshot_export_locations(
+            self, snapshot_id, version=LATEST_MICROVERSION):
+        """List the export locations of a snapshot.
+
+        :param snapshot_id: ID of the snapshot.
+        :param version: API microversion passed to the GET request.
+        :returns: the response body as parsed by ``_parse_resp``.
+        """
+        url = "snapshots/%s/export-locations" % snapshot_id
+        resp, body = self.get(url, version=version)
+        # The request is expected to answer with HTTP 200.
+        self.expected_success(200, resp.status)
+        return self._parse_resp(body)
diff --git a/manila_tempest_tests/tests/api/admin/test_share_types_extra_specs_negative.py b/manila_tempest_tests/tests/api/admin/test_share_types_extra_specs_negative.py
index 63fdf23..f888969 100644
--- a/manila_tempest_tests/tests/api/admin/test_share_types_extra_specs_negative.py
+++ b/manila_tempest_tests/tests/api/admin/test_share_types_extra_specs_negative.py
@@ -80,6 +80,8 @@
if utils.is_microversion_ge(CONF.share.max_api_microversion,
constants.REVERT_TO_SNAPSHOT_MICROVERSION):
expected_keys.append('revert_to_snapshot_support')
+ if utils.is_microversion_ge(CONF.share.max_api_microversion, '2.32'):
+ expected_keys.append('mount_snapshot_support')
actual_keys = share_type['share_type']['extra_specs'].keys()
self.assertEqual(sorted(expected_keys), sorted(actual_keys),
'Incorrect extra specs visible to non-admin user; '
diff --git a/manila_tempest_tests/tests/api/admin/test_snapshot_export_locations.py b/manila_tempest_tests/tests/api/admin/test_snapshot_export_locations.py
new file mode 100644
index 0000000..dd48df4
--- /dev/null
+++ b/manila_tempest_tests/tests/api/admin/test_snapshot_export_locations.py
@@ -0,0 +1,140 @@
+# Copyright (c) 2017 Hitachi Data Systems, Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import ddt
+from oslo_utils import uuidutils
+import six
+from tempest import config
+import testtools
+from testtools import testcase as tc
+
+from manila_tempest_tests.tests.api import base
+
+CONF = config.CONF
+LATEST_MICROVERSION = CONF.share.max_api_microversion
+
+
+@base.skip_if_microversion_lt("2.32")
+@testtools.skipUnless(CONF.share.run_mount_snapshot_tests and
+                      CONF.share.run_snapshot_tests,
+                      "Mountable snapshots tests are disabled.")
+@ddt.ddt
+class SnapshotExportLocationsTest(base.BaseSharesMixedTest):
+    """Admin API tests for snapshot and snapshot-instance export locations."""
+
+    @classmethod
+    def setup_clients(cls):
+        """Expose the admin v2 shares client as ``cls.admin_client``."""
+        super(SnapshotExportLocationsTest, cls).setup_clients()
+        cls.admin_client = cls.admin_shares_v2_client
+
+    @classmethod
+    def resource_setup(cls):
+        """Create a share and an active snapshot, then list its instances."""
+        super(SnapshotExportLocationsTest, cls).resource_setup()
+        admin = cls.admin_client
+        cls.share = cls.create_share(client=admin)
+        created = cls.create_snapshot_wait_for_active(
+            cls.share['id'], client=admin)
+        cls.snapshot = created
+        # Re-read the snapshot once it has become active.
+        cls.snapshot = admin.get_snapshot(created['id'])
+        cls.snapshot_instances = admin.list_snapshot_instances(
+            snapshot_id=cls.snapshot['id'])
+
+    def _verify_export_location_structure(
+            self, export_locations, role='admin', detail=False):
+        """Assert that export location(s) carry exactly the expected keys.
+
+        :param export_locations: one export location, or a list/tuple/set
+            of them.
+        :param role: 'admin' adds the admin-only keys to the expectation.
+        :param detail: when True, the timestamp keys are expected as well.
+        """
+        as_admin = role == 'admin'
+
+        # Build the expected key set from the role and the detail level.
+        expected_keys = ['id', 'path', 'links']
+        if detail:
+            expected_keys += ['created_at', 'updated_at']
+        if as_admin:
+            expected_keys += ['share_snapshot_instance_id', 'is_admin_only']
+
+        # Accept a single export location as well as a collection.
+        if isinstance(export_locations, (list, tuple, set)):
+            locations = export_locations
+        else:
+            locations = (export_locations, )
+
+        for location in locations:
+            # Same number of keys plus membership means an exact match.
+            self.assertEqual(len(expected_keys), len(location))
+            for expected_key in expected_keys:
+                self.assertIn(expected_key, location)
+
+            # Format of the keys that are always present.
+            self.assertTrue(uuidutils.is_uuid_like(location['id']))
+            self.assertIsInstance(location['path'], six.string_types)
+
+            if as_admin:
+                self.assertIn(location['is_admin_only'], (True, False))
+                instance_id = location['share_snapshot_instance_id']
+                self.assertTrue(uuidutils.is_uuid_like(instance_id))
+
+    @tc.attr(base.TAG_POSITIVE, base.TAG_API_WITH_BACKEND)
+    def test_list_snapshot_export_location(self):
+        """Every listed snapshot export location has the summary layout."""
+        listed = self.admin_client.list_snapshot_export_locations(
+            self.snapshot['id'])
+
+        for location in listed:
+            self._verify_export_location_structure(location)
+
+    @tc.attr(base.TAG_POSITIVE, base.TAG_API_WITH_BACKEND)
+    def test_get_snapshot_export_location(self):
+        """Every snapshot export location can be fetched with details."""
+        snapshot_id = self.snapshot['id']
+        listed = self.admin_client.list_snapshot_export_locations(snapshot_id)
+
+        for summary in listed:
+            detailed = self.admin_client.get_snapshot_export_location(
+                snapshot_id, summary['id'])
+            self._verify_export_location_structure(detailed, detail=True)
+
+    @tc.attr(base.TAG_POSITIVE, base.TAG_API_WITH_BACKEND)
+    def test_get_snapshot_instance_export_location(self):
+        """Every instance export location can be fetched with details."""
+        admin = self.admin_client
+        for instance in self.snapshot_instances:
+            listed = admin.list_snapshot_instance_export_locations(
+                instance['id'])
+            for summary in listed:
+                detailed = admin.get_snapshot_instance_export_location(
+                    instance['id'], summary['id'])
+                self._verify_export_location_structure(detailed, detail=True)
+
+    @tc.attr(base.TAG_POSITIVE, base.TAG_API_WITH_BACKEND)
+    def test_snapshot_contains_all_export_locations_of_all_snapshot_instances(
+            self):
+        """Snapshot export locations equal the union over its instances."""
+
+        def by_id(location):
+            return location['id']
+
+        of_snapshot = self.admin_client.list_snapshot_export_locations(
+            self.snapshot['id'])
+
+        of_instances = []
+        for instance in self.snapshot_instances:
+            of_instances += (
+                self.admin_client.list_snapshot_instance_export_locations(
+                    instance['id']))
+
+        self.assertEqual(len(of_snapshot), len(of_instances))
+        self.assertEqual(
+            sorted(of_snapshot, key=by_id),
+            sorted(of_instances, key=by_id))
diff --git a/manila_tempest_tests/tests/api/admin/test_snapshot_export_locations_negative.py b/manila_tempest_tests/tests/api/admin/test_snapshot_export_locations_negative.py
new file mode 100644
index 0000000..6fccc4d
--- /dev/null
+++ b/manila_tempest_tests/tests/api/admin/test_snapshot_export_locations_negative.py
@@ -0,0 +1,140 @@
+# Copyright (c) 2017 Hitachi Data Systems, Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from tempest import config
+from tempest.lib import exceptions as lib_exc
+import testtools
+from testtools import testcase as tc
+
+from manila_tempest_tests.tests.api import base
+
+CONF = config.CONF
+
+
+@base.skip_if_microversion_lt("2.32")
+@testtools.skipUnless(CONF.share.run_mount_snapshot_tests and
+                      CONF.share.run_snapshot_tests,
+                      "Mountable snapshots tests are disabled.")
+class SnapshotExportLocationsNegativeTest(base.BaseSharesMixedTest):
+    """Negative tests for snapshot export locations backed by a snapshot."""
+
+    @classmethod
+    def setup_clients(cls):
+        """Expose the admin client and an alternate (isolated) client."""
+        super(SnapshotExportLocationsNegativeTest, cls).setup_clients()
+        cls.admin_client = cls.admin_shares_v2_client
+        cls.isolated_client = cls.alt_shares_v2_client
+
+    @classmethod
+    def resource_setup(cls):
+        """Create a share and an active snapshot, then list its instances."""
+        super(SnapshotExportLocationsNegativeTest, cls).resource_setup()
+        admin = cls.admin_client
+        cls.share = cls.create_share(client=admin)
+        created = cls.create_snapshot_wait_for_active(
+            cls.share['id'], client=admin)
+        cls.snapshot = created
+        # Re-read the snapshot once it has become active.
+        cls.snapshot = admin.get_snapshot(created['id'])
+        cls.snapshot_instances = admin.list_snapshot_instances(
+            snapshot_id=cls.snapshot['id'])
+
+    @tc.attr(base.TAG_NEGATIVE, base.TAG_API_WITH_BACKEND)
+    def test_get_inexistent_snapshot_export_location(self):
+        """An unknown export location ID yields NotFound."""
+        fake_id = "fake-inexistent-snapshot-export-location-id"
+        self.assertRaises(
+            lib_exc.NotFound,
+            self.admin_client.get_snapshot_export_location,
+            self.snapshot['id'], fake_id)
+
+    @tc.attr(base.TAG_NEGATIVE, base.TAG_API_WITH_BACKEND)
+    def test_list_snapshot_export_locations_by_member(self):
+        """The isolated client gets NotFound when listing."""
+        self.assertRaises(
+            lib_exc.NotFound,
+            self.isolated_client.list_snapshot_export_locations,
+            self.snapshot['id'])
+
+    @tc.attr(base.TAG_NEGATIVE, base.TAG_API_WITH_BACKEND)
+    def test_get_snapshot_export_location_by_member(self):
+        """The isolated client gets NotFound for non-admin-only locations."""
+        snapshot_id = self.snapshot['id']
+        listed = self.admin_client.list_snapshot_export_locations(snapshot_id)
+
+        for location in listed:
+            # Admin-only export locations are not exercised here.
+            if not location['is_admin_only']:
+                self.assertRaises(
+                    lib_exc.NotFound,
+                    self.isolated_client.get_snapshot_export_location,
+                    snapshot_id, location['id'])
+
+    @tc.attr(base.TAG_NEGATIVE, base.TAG_API_WITH_BACKEND)
+    def test_get_inexistent_snapshot_instance_export_location(self):
+        """An unknown export location ID yields NotFound per instance."""
+        fake_id = "fake-inexistent-snapshot-export-location-id"
+        for instance in self.snapshot_instances:
+            self.assertRaises(
+                lib_exc.NotFound,
+                self.admin_client.get_snapshot_instance_export_location,
+                instance['id'], fake_id)
+
+    @tc.attr(base.TAG_NEGATIVE, base.TAG_API_WITH_BACKEND)
+    def test_get_snapshot_instance_export_location_by_member(self):
+        """The isolated client gets Forbidden on instance export locations."""
+        admin = self.admin_client
+        for instance in self.snapshot_instances:
+            listed = admin.list_snapshot_instance_export_locations(
+                instance['id'])
+            for location in listed:
+                self.assertRaises(
+                    lib_exc.Forbidden,
+                    self.isolated_client.get_snapshot_instance_export_location,
+                    instance['id'], location['id'])
+
+
+@testtools.skipUnless(CONF.share.run_mount_snapshot_tests and
+                      CONF.share.run_snapshot_tests,
+                      "Mountable snapshots tests are disabled.")
+@base.skip_if_microversion_lt("2.32")
+class SnapshotExportLocationsAPIOnlyNegativeTest(base.BaseSharesMixedTest):
+    """Negative export location tests that create no share or snapshot."""
+
+    @classmethod
+    def setup_clients(cls):
+        """Expose the admin client and an alternate (isolated) client."""
+        super(SnapshotExportLocationsAPIOnlyNegativeTest, cls).setup_clients()
+        cls.admin_client = cls.admin_shares_v2_client
+        cls.isolated_client = cls.alt_shares_v2_client
+
+    @tc.attr(base.TAG_NEGATIVE, base.TAG_API)
+    def test_list_export_locations_by_nonexistent_snapshot(self):
+        """Listing for an unknown snapshot ID yields NotFound."""
+        fake_snapshot_id = "fake-inexistent-snapshot-id"
+        self.assertRaises(
+            lib_exc.NotFound,
+            self.admin_client.list_snapshot_export_locations,
+            fake_snapshot_id)
+
+    @tc.attr(base.TAG_NEGATIVE, base.TAG_API)
+    def test_list_export_locations_by_nonexistent_snapshot_instance(self):
+        """Listing for an unknown snapshot instance ID yields NotFound."""
+        fake_instance_id = "fake-inexistent-snapshot-instance-id"
+        self.assertRaises(
+            lib_exc.NotFound,
+            self.admin_client.list_snapshot_instance_export_locations,
+            fake_instance_id)
+
+    @tc.attr(base.TAG_NEGATIVE, base.TAG_API)
+    def test_list_inexistent_snapshot_instance_export_locations_by_member(
+            self):
+        """The isolated client gets Forbidden even for an unknown ID."""
+        fake_instance_id = "fake-inexistent-snapshot-instance-id"
+        self.assertRaises(
+            lib_exc.Forbidden,
+            self.isolated_client.list_snapshot_instance_export_locations,
+            fake_instance_id)
diff --git a/manila_tempest_tests/tests/api/test_revert_to_snapshot.py b/manila_tempest_tests/tests/api/test_revert_to_snapshot.py
index 686b185..91eca02 100644
--- a/manila_tempest_tests/tests/api/test_revert_to_snapshot.py
+++ b/manila_tempest_tests/tests/api/test_revert_to_snapshot.py
@@ -20,6 +20,7 @@
from testtools import testcase as tc
from manila_tempest_tests.common import constants
+from manila_tempest_tests import share_exceptions
from manila_tempest_tests.tests.api import base
CONF = config.CONF
@@ -72,6 +73,26 @@
cls.share = cls.create_share(share_type_id=cls.st_id)
+ if CONF.share.run_replication_tests:
+ # Create replicated share type
+ cls.replicated_share_type_name = data_utils.rand_name("share-type")
+ cls.replication_type = CONF.share.backend_replication_type
+ if cls.replication_type not in constants.REPLICATION_TYPE_CHOICES:
+ raise share_exceptions.ShareReplicationTypeException(
+ replication_type=cls.replication_type
+ )
+ cls.zones = cls.get_availability_zones(client=cls.admin_client)
+ cls.share_zone = cls.zones[0]
+ cls.replica_zone = cls.zones[-1]
+
+ extra_specs = cls.add_extra_specs_to_dict(
+ {"replication_type": cls.replication_type})
+ share_type = cls.create_share_type(
+ cls.replicated_share_type_name,
+ extra_specs=extra_specs,
+ client=cls.admin_client)
+ cls.replicated_share_type = share_type["share_type"]
+
@tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
@ddt.data(
*{constants.REVERT_TO_SNAPSHOT_MICROVERSION,
@@ -107,3 +128,35 @@
version=version)
self.shares_v2_client.wait_for_share_status(self.share['id'],
constants.STATUS_AVAILABLE)
+
+    @tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
+    @tc.skipUnless(CONF.share.run_replication_tests,
+                   'Replication tests are disabled.')
+    @ddt.data(
+        *{constants.REVERT_TO_SNAPSHOT_MICROVERSION,
+          CONF.share.max_api_microversion}
+    )
+    def test_revert_to_replicated_snapshot(self, version):
+        """Revert a replicated share to a snapshot and wait for re-sync.
+
+        :param version: API microversion used for the revert call.
+        """
+        client = self.shares_v2_client
+        in_sync = constants.REPLICATION_STATE_IN_SYNC
+
+        # Replicated share plus one replica that has caught up.
+        source_share = self.create_share(
+            share_type_id=self.replicated_share_type['id'],
+            availability_zone=self.share_zone)
+        replica = self.create_share_replica(
+            source_share["id"], self.replica_zone)
+        client.wait_for_share_replica_status(
+            replica['id'], in_sync, status_attr='replica_state')
+
+        snapshot = self.create_snapshot_wait_for_active(source_share["id"])
+
+        # Revert, then wait for both the share and its replica to settle.
+        client.revert_to_snapshot(
+            source_share['id'], snapshot['id'], version=version)
+        client.wait_for_share_status(
+            source_share['id'], constants.STATUS_AVAILABLE)
+        client.wait_for_share_replica_status(
+            replica['id'], in_sync, status_attr='replica_state')
diff --git a/manila_tempest_tests/tests/api/test_snapshot_rules.py b/manila_tempest_tests/tests/api/test_snapshot_rules.py
new file mode 100644
index 0000000..af80f4d
--- /dev/null
+++ b/manila_tempest_tests/tests/api/test_snapshot_rules.py
@@ -0,0 +1,101 @@
+# Copyright 2016 Hitachi Data Systems
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import six
+
+import ddt
+from tempest import config
+import testtools
+from testtools import testcase as tc
+
+from manila_tempest_tests.tests.api import base
+
+CONF = config.CONF
+
+
+class BaseShareSnapshotRulesTest(base.BaseSharesTest):
+    """Shared fixture and helper for snapshot access rule tests.
+
+    Subclasses set ``protocol`` and, before using the helper,
+    ``access_type``.
+    """
+
+    protocol = ""
+
+    @classmethod
+    def resource_setup(cls):
+        """Create a share of ``cls.protocol`` and an active snapshot."""
+        super(BaseShareSnapshotRulesTest, cls).resource_setup()
+        cls.share = cls.create_share(cls.protocol)
+        cls.snapshot = cls.create_snapshot_wait_for_active(cls.share['id'])
+
+    def _test_create_delete_access_rules(self, access_to):
+        """Create a rule for ``access_to``, wait for it, then delete it."""
+        client = self.shares_v2_client
+        snapshot_id = self.snapshot['id']
+
+        # Create the rule.
+        rule = client.create_snapshot_access_rule(
+            snapshot_id, self.access_type, access_to)
+
+        # These keys must not be present in the returned rule.
+        for hidden_key in ('deleted', 'deleted_at', 'instance_mappings'):
+            self.assertNotIn(hidden_key, list(six.iterkeys(rule)))
+
+        client.wait_for_snapshot_access_rule_status(snapshot_id, rule['id'])
+
+        # Delete the rule and wait until it is gone.
+        client.delete_snapshot_access_rule(snapshot_id, rule['id'])
+        client.wait_for_snapshot_access_rule_deletion(
+            snapshot_id, rule['id'])
+
+
+@base.skip_if_microversion_lt("2.32")
+@testtools.skipUnless(CONF.share.run_mount_snapshot_tests and
+                      CONF.share.run_snapshot_tests,
+                      'Mountable snapshots tests are disabled.')
+@ddt.ddt
+class ShareSnapshotIpRulesForNFSTest(BaseShareSnapshotRulesTest):
+    """IP access rule tests for snapshots of NFS shares."""
+
+    protocol = "nfs"
+
+    @classmethod
+    def resource_setup(cls):
+        """Skip unless NFS and its IP rules are enabled, then set up."""
+        protocol_enabled = cls.protocol in CONF.share.enable_protocols
+        ip_rules_enabled = (
+            cls.protocol in CONF.share.enable_ip_rules_for_protocols)
+        if not (protocol_enabled and ip_rules_enabled):
+            raise cls.skipException(
+                "IP rule tests for %s protocol are disabled." % cls.protocol)
+        super(ShareSnapshotIpRulesForNFSTest, cls).resource_setup()
+
+        cls.access_type = "ip"
+
+    @tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
+    @ddt.data("1.1.1.1", "1.2.3.4/32")
+    def test_create_delete_access_rules(self, access_to):
+        """Create and delete an IP rule for each data-driven target."""
+        self._test_create_delete_access_rules(access_to)
+
+
+@base.skip_if_microversion_lt("2.32")
+@testtools.skipUnless(CONF.share.run_mount_snapshot_tests and
+                      CONF.share.run_snapshot_tests,
+                      'Mountable snapshots tests are disabled.')
+@ddt.ddt
+class ShareSnapshotUserRulesForCIFSTest(BaseShareSnapshotRulesTest):
+    """User access rule tests for snapshots of CIFS shares.
+
+    The class is skipped unless both mountable-snapshot tests and snapshot
+    tests are enabled: the inherited ``resource_setup`` creates a snapshot,
+    so running with snapshot tests disabled cannot work.
+    """
+
+    protocol = "cifs"
+
+    @classmethod
+    def resource_setup(cls):
+        """Skip unless CIFS and its user rules are enabled, then set up."""
+        if not (cls.protocol in CONF.share.enable_protocols and
+                cls.protocol in CONF.share.enable_user_rules_for_protocols):
+            msg = ("User rule tests for %s protocol are "
+                   "disabled." % cls.protocol)
+            raise cls.skipException(msg)
+        super(ShareSnapshotUserRulesForCIFSTest, cls).resource_setup()
+
+        cls.access_type = "user"
+
+    @tc.attr(base.TAG_POSITIVE, base.TAG_BACKEND)
+    def test_create_delete_access_rules(self):
+        """Create and delete a user rule for the configured user name."""
+        access_to = CONF.share.username_for_user_rules
+        self._test_create_delete_access_rules(access_to)
diff --git a/manila_tempest_tests/tests/api/test_snapshot_rules_negative.py b/manila_tempest_tests/tests/api/test_snapshot_rules_negative.py
new file mode 100644
index 0000000..9f48b73
--- /dev/null
+++ b/manila_tempest_tests/tests/api/test_snapshot_rules_negative.py
@@ -0,0 +1,90 @@
+# Copyright 2016 Hitachi Data Systems
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import ddt
+from tempest import config
+from tempest.lib import exceptions as lib_exc
+import testtools
+from testtools import testcase as tc
+
+from manila_tempest_tests.tests.api import base
+from manila_tempest_tests.tests.api import test_snapshot_rules
+
+CONF = config.CONF
+
+
+@base.skip_if_microversion_lt("2.32")
+@testtools.skipUnless(CONF.share.run_mount_snapshot_tests and
+                      CONF.share.run_snapshot_tests,
+                      'Mountable snapshots tests are disabled.')
+@ddt.ddt
+class SnapshotIpRulesForNFSNegativeTest(
+        test_snapshot_rules.BaseShareSnapshotRulesTest):
+    """Negative tests for IP access rules on snapshots of NFS shares."""
+
+    protocol = "nfs"
+
+    @classmethod
+    def resource_setup(cls):
+        """Skip unless NFS and its IP rules are enabled, then set up."""
+        if not (cls.protocol in CONF.share.enable_protocols and
+                cls.protocol in CONF.share.enable_ip_rules_for_protocols):
+            msg = "IP rule tests for %s protocol are disabled." % cls.protocol
+            raise cls.skipException(msg)
+        super(SnapshotIpRulesForNFSNegativeTest, cls).resource_setup()
+
+        # BaseShareSnapshotRulesTest.resource_setup() has already created a
+        # share of this protocol and an active snapshot of it, so reuse that
+        # snapshot instead of provisioning a second share/snapshot pair.
+        cls.snap = cls.snapshot
+
+    @tc.attr(base.TAG_NEGATIVE, base.TAG_API_WITH_BACKEND)
+    @ddt.data("1.2.3.256", "1.1.1.-", "1.2.3.4/33", "1.2.3.*", "1.2.3.*/23",
+              "1.2.3.1|23", "1.2.3.1/", "1.2.3.1/-1", "fe00::1",
+              "fe80::217:f2ff:fe07:ed62", "2001:db8::/48", "::1/128",
+              "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff",
+              "2001:0db8:0000:85a3:0000:0000:ac1f:8001")
+    def test_create_access_rule_ip_with_wrong_target(self, target):
+        """Each data-driven target must be rejected with BadRequest."""
+        self.assertRaises(lib_exc.BadRequest,
+                          self.shares_v2_client.create_snapshot_access_rule,
+                          self.snap["id"], "ip", target)
+
+    @tc.attr(base.TAG_NEGATIVE, base.TAG_API_WITH_BACKEND)
+    def test_create_duplicate_of_ip_rule(self):
+        """Run the duplicate-rule scenario twice in a row."""
+        # NOTE(review): the second pass presumably checks that the same rule
+        # can be created again after it was deleted -- confirm the intent.
+        self._test_duplicate_rules()
+        self._test_duplicate_rules()
+
+    def _test_duplicate_rules(self):
+        """Create a rule, fail to duplicate it, delete it, fail to re-delete.
+        """
+        # test data
+        access_type = "ip"
+        access_to = "1.2.3.4"
+
+        # create rule
+        rule = self.shares_v2_client.create_snapshot_access_rule(
+            self.snap['id'], access_type, access_to)
+
+        self.shares_v2_client.wait_for_snapshot_access_rule_status(
+            self.snap['id'], rule['id'])
+
+        # try create duplicate of rule
+        self.assertRaises(lib_exc.BadRequest,
+                          self.shares_v2_client.create_snapshot_access_rule,
+                          self.snap["id"], access_type, access_to)
+
+        # delete rule and wait for deletion
+        self.shares_v2_client.delete_snapshot_access_rule(self.snap['id'],
+                                                          rule['id'])
+        self.shares_v2_client.wait_for_snapshot_access_rule_deletion(
+            self.snap['id'], rule['id'])
+
+        # deleting the already removed rule must fail
+        self.assertRaises(lib_exc.NotFound,
+                          self.shares_v2_client.delete_snapshot_access_rule,
+                          self.snap['id'], rule['id'])