Handle manage/unmanage for replicated shares

Managing a share with a share type that has replication_type
extra_spec must be allowed. Drivers are expected to fail
this operation if the share was part of a replication relationship
that Manila does not know about.

Unmanaging a share with replicas must not be permitted
until all replicas are removed.

Managing and unmanaging of snapshots must not
be permitted for a share that has replicas.

Modify the NetApp driver for manage_existing to check
for existing replicas.

Also fix issue with manage retry where the share
data was being altered inappropriately by a DB API.

Closes-Bug: #1561641
Closes-Bug: #1565903

Co-Authored-By: Goutham Pacha Ravi <gouthamr@netapp.com>

Change-Id: I82f1fef1e30114e017efd00fa7da70aceecab94c
diff --git a/manila_tempest_tests/tests/api/admin/test_replication_actions.py b/manila_tempest_tests/tests/api/admin/test_replication_actions.py
new file mode 100644
index 0000000..6ca2f0c
--- /dev/null
+++ b/manila_tempest_tests/tests/api/admin/test_replication_actions.py
@@ -0,0 +1,189 @@
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+from tempest import config
+from tempest.lib.common.utils import data_utils
+from tempest.lib import exceptions as lib_exc
+from tempest import test
+import testtools
+
+from manila_tempest_tests import clients_share as clients
+from manila_tempest_tests.common import constants
+from manila_tempest_tests import share_exceptions
+from manila_tempest_tests.tests.api import base
+
+CONF = config.CONF
+_MIN_SUPPORTED_MICROVERSION = '2.11'
+
+
+@testtools.skipUnless(CONF.share.run_replication_tests,
+                      'Replication tests are disabled.')
+@testtools.skipIf(
+    CONF.share.multitenancy_enabled,
+    "Only for driver_handles_share_servers = False driver mode.")
+@base.skip_if_microversion_lt(_MIN_SUPPORTED_MICROVERSION)
+class ReplicationAdminTest(base.BaseSharesAdminTest):
+
+    @classmethod
+    def resource_setup(cls):
+        super(ReplicationAdminTest, cls).resource_setup()
+        # Create share_type
+        name = data_utils.rand_name(constants.TEMPEST_MANILA_PREFIX)
+        cls.admin_client = clients.AdminManager().shares_v2_client
+        cls.replication_type = CONF.share.backend_replication_type
+
+        if cls.replication_type not in constants.REPLICATION_TYPE_CHOICES:
+            raise share_exceptions.ShareReplicationTypeException(
+                replication_type=cls.replication_type
+            )
+        cls.zones = cls.get_availability_zones(client=cls.admin_client)
+        cls.share_zone = cls.zones[0]
+        cls.replica_zone = cls.zones[-1]
+
+        cls.extra_specs = cls.add_required_extra_specs_to_dict(
+            {"replication_type": cls.replication_type})
+        share_type = cls.create_share_type(
+            name,
+            cleanup_in_class=True,
+            extra_specs=cls.extra_specs,
+            client=cls.admin_client)
+        cls.share_type = share_type["share_type"]
+        # Create share with above share_type
+        cls.share = cls.create_share(size=2,
+                                     share_type_id=cls.share_type["id"],
+                                     availability_zone=cls.share_zone,)
+        cls.replica = cls.shares_v2_client.list_share_replicas(
+            share_id=cls.share['id'])[0]
+
+    @test.attr(type=["gate"])
+    @testtools.skipUnless(CONF.share.run_extend_tests,
+                          'Extend share tests are disabled.')
+    def test_extend_replicated_share(self):
+        # Test extend share
+        new_size = self.share["size"] + 1
+        self.shares_v2_client.extend_share(self.share["id"], new_size)
+        self.shares_v2_client.wait_for_share_status(self.share["id"],
+                                                    "available")
+        share = self.shares_v2_client.get_share(self.share["id"])
+        self.assertEqual(new_size, int(share["size"]))
+
+    @test.attr(type=["gate"])
+    @testtools.skipUnless(CONF.share.run_shrink_tests,
+                          'Shrink share tests are disabled.')
+    def test_shrink_replicated_share(self):
+        share = self.shares_v2_client.get_share(self.share["id"])
+        new_size = self.share["size"] - 1
+        self.shares_v2_client.shrink_share(self.share["id"], new_size)
+        self.shares_v2_client.wait_for_share_status(share["id"], "available")
+        shrink_share = self.shares_v2_client.get_share(self.share["id"])
+        self.assertEqual(new_size, int(shrink_share["size"]))
+
+    @test.attr(type=["gate", "positive"])
+    @testtools.skipUnless(CONF.share.run_manage_unmanage_tests,
+                          'Manage/Unmanage Tests are disabled.')
+    def test_manage_share_for_replication_type(self):
+        """Manage a share with replication share type."""
+        # Create a share and unmanage it
+        share = self.create_share(size=2,
+                                  share_type_id=self.share_type["id"],
+                                  availability_zone=self.share_zone,
+                                  cleanup_in_class=True)
+        share = self.shares_v2_client.get_share(share["id"])
+        export_locations = self.shares_v2_client.list_share_export_locations(
+            share["id"])
+        export_path = export_locations[0]['path']
+
+        self.shares_v2_client.unmanage_share(share['id'])
+        self.shares_v2_client.wait_for_resource_deletion(share_id=share['id'])
+
+        # Manage the previously unmanaged share
+        managed_share = self.shares_v2_client.manage_share(
+            share['host'], share['share_proto'],
+            export_path, self.share_type['id'])
+        self.shares_v2_client.wait_for_share_status(
+            managed_share['id'], 'available')
+
+        # Add managed share to cleanup queue
+        self.method_resources.insert(
+            0, {'type': 'share', 'id': managed_share['id'],
+                'client': self.shares_v2_client})
+
+        # Make sure a replica can be added to newly managed share
+        self.create_share_replica(managed_share['id'], self.replica_zone,
+                                  cleanup=True)
+
+    @test.attr(type=["gate", "negative"])
+    @testtools.skipUnless(CONF.share.run_manage_unmanage_tests,
+                          'Manage/Unmanage Tests are disabled.')
+    def test_unmanage_replicated_share_with_replica(self):
+        """Try to unmanage a share having replica."""
+        # Create a share replica before unmanaging the share
+        self.create_share_replica(self.share["id"], self.replica_zone,
+                                  cleanup=True)
+        self.assertRaises(
+            lib_exc.Conflict,
+            self.shares_v2_client.unmanage_share,
+            share_id=self.share['id'])
+
+    @test.attr(type=["gate", "positive"])
+    @testtools.skipUnless(CONF.share.run_manage_unmanage_tests,
+                          'Manage/Unmanage Tests are disabled.')
+    def test_unmanage_replicated_share_with_no_replica(self):
+        """Unmanage a replication type share that does not have replica."""
+        share = self.create_share(size=2,
+                                  share_type_id=self.share_type["id"],
+                                  availability_zone=self.share_zone,)
+        self.shares_v2_client.unmanage_share(share['id'])
+        self.shares_v2_client.wait_for_resource_deletion(share_id=share['id'])
+
+    @test.attr(type=["gate", "negative"])
+    @testtools.skipUnless(CONF.share.run_manage_unmanage_snapshot_tests,
+                          'Manage/Unmanage Snapshot Tests are disabled.')
+    def test_manage_replicated_share_snapshot(self):
+        """Try to manage a snapshot of the replicated."""
+        # Create a share replica before managing the snapshot
+        self.create_share_replica(self.share["id"], self.replica_zone,
+                                  cleanup=True)
+        self.assertRaises(
+            lib_exc.Conflict,
+            self.shares_v2_client.manage_snapshot,
+            share_id=self.share['id'],
+            provider_location="127.0.0.1:/fake_provider_location/"
+                              "manila_share_9dc61f49_fbc8_48d7_9337_2f9593d9")
+
+    @test.attr(type=["gate", "negative"])
+    @testtools.skipUnless(CONF.share.run_manage_unmanage_snapshot_tests,
+                          'Manage/Unmanage Snapshot Tests are disabled.')
+    def test_unmanage_replicated_share_snapshot(self):
+        """Try to unmanage a snapshot of the replicated share with replica."""
+        # Create a share replica before unmanaging the snapshot
+        self.create_share_replica(self.share["id"], self.replica_zone,
+                                  cleanup=True)
+        snapshot = self.create_snapshot_wait_for_active(self.share["id"])
+        self.assertRaises(
+            lib_exc.Conflict,
+            self.shares_v2_client.unmanage_snapshot,
+            snapshot_id=snapshot['id'])
+
+    @test.attr(type=["gate", "positive"])
+    @testtools.skipUnless(CONF.share.run_manage_unmanage_snapshot_tests,
+                          'Manage/Unmanage Snapshot Tests are disabled.')
+    def test_unmanage_replicated_share_snapshot_with_no_replica(self):
+        """Unmanage a snapshot of the replicated share with no replica."""
+        share = self.create_share(size=2,
+                                  share_type_id=self.share_type["id"],
+                                  availability_zone=self.share_zone,)
+
+        snapshot = self.create_snapshot_wait_for_active(share["id"])
+        self.shares_v2_client.unmanage_snapshot(snapshot_id=snapshot['id'])
+        self.shares_v2_client.wait_for_resource_deletion(
+            snapshot_id=snapshot['id'])