diff --git a/manila_tempest_tests/common/__init__.py b/manila_tempest_tests/common/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/manila_tempest_tests/common/__init__.py
diff --git a/manila_tempest_tests/common/constants.py b/manila_tempest_tests/common/constants.py
new file mode 100644
index 0000000..bef35a5
--- /dev/null
+++ b/manila_tempest_tests/common/constants.py
@@ -0,0 +1,36 @@
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+STATUS_ERROR = 'error'
+STATUS_AVAILABLE = 'available'
+STATUS_ERROR_DELETING = 'error_deleting'
+
+TEMPEST_MANILA_PREFIX = 'tempest-manila'
+REPLICATION_STYLE_READABLE = 'readable'
+REPLICATION_STYLE_WRITABLE = 'writable'
+REPLICATION_STYLE_DR = 'dr'
+REPLICATION_TYPE_CHOICES = (
+    REPLICATION_STYLE_READABLE,
+    REPLICATION_STYLE_WRITABLE,
+    REPLICATION_STYLE_DR,
+)
+REPLICATION_PROMOTION_CHOICES = (
+    REPLICATION_STYLE_READABLE,
+    REPLICATION_STYLE_DR,
+)
+REPLICATION_STATE_ACTIVE = 'active'
+REPLICATION_STATE_IN_SYNC = 'in_sync'
+REPLICATION_STATE_OUT_OF_SYNC = 'out_of_sync'
+
+RULE_STATE_ACTIVE = 'active'
+RULE_STATE_OUT_OF_SYNC = 'out_of_sync'
+RULE_STATE_ERROR = 'error'
diff --git a/manila_tempest_tests/config.py b/manila_tempest_tests/config.py
index 5d935c2..94ffb5f 100644
--- a/manila_tempest_tests/config.py
+++ b/manila_tempest_tests/config.py
@@ -156,6 +156,11 @@
                 help="Defines whether to run consistency group tests or not. "
                      "Disable this feature if used driver doesn't support "
                      "it."),
+    cfg.BoolOpt("run_replication_tests",
+                default=False,
+                help="Defines whether to run replication tests or not. "
+                     "Enable this feature if the driver is configured "
+                     "for replication."),
     cfg.BoolOpt("run_migration_tests",
                 default=False,
                 help="Enable or disable migration tests."),
@@ -188,4 +193,8 @@
                     "timing out (seconds)."),
     cfg.StrOpt("default_share_type_name",
                help="Default share type name to use in tempest tests."),
+    cfg.StrOpt("backend_replication_type",
+               default='none',
+               choices=['none', 'writable', 'readable', 'dr'],
+               help="Specify the replication type supported by the backend."),
 ]
diff --git a/manila_tempest_tests/services/share/v2/json/shares_client.py b/manila_tempest_tests/services/share/v2/json/shares_client.py
index 4ce74eb..3b45bf3 100644
--- a/manila_tempest_tests/services/share/v2/json/shares_client.py
+++ b/manila_tempest_tests/services/share/v2/json/shares_client.py
@@ -21,6 +21,7 @@
 from tempest.lib.common.utils import data_utils
 from tempest.lib import exceptions
 
+from manila_tempest_tests.common import constants
 from manila_tempest_tests.services.share.json import shares_client
 from manila_tempest_tests import share_exceptions
 from manila_tempest_tests import utils
@@ -177,6 +178,9 @@
         elif "cgsnapshot_id" in kwargs:
             return self._is_resource_deleted(
                 self.get_cgsnapshot, kwargs.get("cgsnapshot_id"))
+        elif "replica_id" in kwargs:
+            return self._is_resource_deleted(
+                self.get_share_replica, kwargs.get("replica_id"))
         else:
             return super(SharesV2Client, self).is_resource_deleted(
                 *args, **kwargs)
@@ -1034,3 +1038,173 @@
                                'status': status,
                            })
                 raise exceptions.TimeoutException(message)
+
+################
+
+    def create_share_replica(self, share_id, availability_zone=None,
+                             version=LATEST_MICROVERSION):
+        """Add a share replica of an existing share."""
+        uri = "share-replicas"
+        post_body = {
+            'share_id': share_id,
+            'availability_zone': availability_zone,
+        }
+
+        body = json.dumps({'share_replica': post_body})
+        resp, body = self.post(uri, body,
+                               headers=EXPERIMENTAL,
+                               extra_headers=True,
+                               version=version)
+        self.expected_success(202, resp.status)
+        return self._parse_resp(body)
+
+    def get_share_replica(self, replica_id, version=LATEST_MICROVERSION):
+        """Get the details of share_replica."""
+        resp, body = self.get("share-replicas/%s" % replica_id,
+                              headers=EXPERIMENTAL,
+                              extra_headers=True,
+                              version=version)
+        self.expected_success(200, resp.status)
+        return self._parse_resp(body)
+
+    def list_share_replicas(self, share_id=None, version=LATEST_MICROVERSION):
+        """Get list of replicas."""
+        uri = "share-replicas/detail"
+        uri += ("?share_id=%s" % share_id) if share_id is not None else ''
+        resp, body = self.get(uri, headers=EXPERIMENTAL,
+                              extra_headers=True, version=version)
+        self.expected_success(200, resp.status)
+        return self._parse_resp(body)
+
+    def list_share_replicas_summary(self, share_id=None,
+                                    version=LATEST_MICROVERSION):
+        """Get summary list of replicas."""
+        uri = "share-replicas"
+        uri += ("?share_id=%s" % share_id) if share_id is not None else ''
+        resp, body = self.get(uri, headers=EXPERIMENTAL,
+                              extra_headers=True, version=version)
+        self.expected_success(200, resp.status)
+        return self._parse_resp(body)
+
+    def delete_share_replica(self, replica_id, version=LATEST_MICROVERSION):
+        """Delete share_replica."""
+        uri = "share-replicas/%s" % replica_id
+        resp, body = self.delete(uri,
+                                 headers=EXPERIMENTAL,
+                                 extra_headers=True,
+                                 version=version)
+        self.expected_success(202, resp.status)
+        return body
+
+    def promote_share_replica(self, replica_id, expected_status=202,
+                              version=LATEST_MICROVERSION):
+        """Promote a share replica to active state."""
+        uri = "share-replicas/%s/action" % replica_id
+        post_body = {
+            'promote': None,
+        }
+        body = json.dumps(post_body)
+        resp, body = self.post(uri, body,
+                               headers=EXPERIMENTAL,
+                               extra_headers=True,
+                               version=version)
+        self.expected_success(expected_status, resp.status)
+        return self._parse_resp(body)
+
+    def wait_for_share_replica_status(self, replica_id, expected_status,
+                                      status_attr='status'):
+        """Waits for a replica's status_attr to reach a given status."""
+        body = self.get_share_replica(replica_id)
+        replica_status = body[status_attr]
+        start = int(time.time())
+
+        while replica_status != expected_status:
+            time.sleep(self.build_interval)
+            body = self.get_share_replica(replica_id)
+            replica_status = body[status_attr]
+            if replica_status == expected_status:
+                return
+            if ('error' in replica_status
+                    and expected_status != constants.STATUS_ERROR):
+                raise share_exceptions.ShareInstanceBuildErrorException(
+                    id=replica_id)
+
+            if int(time.time()) - start >= self.build_timeout:
+                message = ('The %(status_attr)s of Replica %(id)s failed to '
+                           'reach %(expected_status)s status within the '
+                           'required time (%(time)ss). Current '
+                           '%(status_attr)s: %(current_status)s.' %
+                           {
+                               'status_attr': status_attr,
+                               'expected_status': expected_status,
+                               'time': self.build_timeout,
+                               'id': replica_id,
+                               'current_status': replica_status,
+                           })
+                raise exceptions.TimeoutException(message)
+
+    def reset_share_replica_status(self, replica_id,
+                                   status=constants.STATUS_AVAILABLE,
+                                   version=LATEST_MICROVERSION):
+        """Reset the status."""
+        uri = 'share-replicas/%s/action' % replica_id
+        post_body = {
+            'reset_status': {
+                'status': status
+            }
+        }
+        body = json.dumps(post_body)
+        resp, body = self.post(uri, body,
+                               headers=EXPERIMENTAL,
+                               extra_headers=True,
+                               version=version)
+        self.expected_success(202, resp.status)
+        return self._parse_resp(body)
+
+    def reset_share_replica_state(self, replica_id,
+                                  state=constants.REPLICATION_STATE_ACTIVE,
+                                  version=LATEST_MICROVERSION):
+        """Reset the replication state of a replica."""
+        uri = 'share-replicas/%s/action' % replica_id
+        post_body = {
+            'reset_replica_state': {
+                'replica_state': state
+            }
+        }
+        body = json.dumps(post_body)
+        resp, body = self.post(uri, body,
+                               headers=EXPERIMENTAL,
+                               extra_headers=True,
+                               version=version)
+        self.expected_success(202, resp.status)
+        return self._parse_resp(body)
+
+    def resync_share_replica(self, replica_id, expected_result=202,
+                             version=LATEST_MICROVERSION):
+        """Force an immediate resync of the replica."""
+        uri = 'share-replicas/%s/action' % replica_id
+        post_body = {
+            'resync': None
+        }
+        body = json.dumps(post_body)
+        resp, body = self.post(uri, body,
+                               headers=EXPERIMENTAL,
+                               extra_headers=True,
+                               version=version)
+        self.expected_success(expected_result, resp.status)
+        return self._parse_resp(body)
+
+    def force_delete_share_replica(self, replica_id,
+                                   version=LATEST_MICROVERSION):
+        """Force delete a replica."""
+        uri = 'share-replicas/%s/action' % replica_id
+        post_body = {
+            'force_delete': None
+        }
+        body = json.dumps(post_body)
+        resp, body = self.post(uri, body,
+                               headers=EXPERIMENTAL,
+                               extra_headers=True,
+                               version=version)
+        self.expected_success(202, resp.status)
+        return self._parse_resp(body)
diff --git a/manila_tempest_tests/share_exceptions.py b/manila_tempest_tests/share_exceptions.py
index 505b385..3a11531 100644
--- a/manila_tempest_tests/share_exceptions.py
+++ b/manila_tempest_tests/share_exceptions.py
@@ -65,3 +65,8 @@
 
 class ResourceReleaseFailed(exceptions.TempestException):
     message = "Failed to release resource '%(res_type)s' with id '%(res_id)s'."
+
+
+class ShareReplicationTypeException(exceptions.TempestException):
+    message = ("Option backend_replication_type is set to incorrect value: "
+               "%(replication_type)s")
diff --git a/manila_tempest_tests/tests/api/admin/test_replication.py b/manila_tempest_tests/tests/api/admin/test_replication.py
new file mode 100644
index 0000000..605656c
--- /dev/null
+++ b/manila_tempest_tests/tests/api/admin/test_replication.py
@@ -0,0 +1,173 @@
+# Copyright 2015 Yogesh Kshirsagar
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+from tempest import config
+from tempest.lib.common.utils import data_utils
+from tempest import test
+import testtools
+
+from manila_tempest_tests import clients_share as clients
+from manila_tempest_tests.common import constants
+from manila_tempest_tests import share_exceptions
+from manila_tempest_tests.tests.api import base
+
+CONF = config.CONF
+_MIN_SUPPORTED_MICROVERSION = '2.11'
+
+
+@testtools.skipUnless(CONF.share.run_replication_tests,
+                      'Replication tests are disabled.')
+@base.skip_if_microversion_lt(_MIN_SUPPORTED_MICROVERSION)
+class ReplicationAdminTest(base.BaseSharesAdminTest):
+
+    @classmethod
+    def resource_setup(cls):
+        super(ReplicationAdminTest, cls).resource_setup()
+        # Create share_type
+        name = data_utils.rand_name(constants.TEMPEST_MANILA_PREFIX)
+        cls.admin_client = clients.AdminManager().shares_v2_client
+        cls.replication_type = CONF.share.backend_replication_type
+
+        if cls.replication_type not in constants.REPLICATION_TYPE_CHOICES:
+            raise share_exceptions.ShareReplicationTypeException(
+                replication_type=cls.replication_type
+            )
+        cls.zones = cls.get_availability_zones(client=cls.admin_client)
+        cls.share_zone = cls.zones[0]
+        cls.replica_zone = cls.zones[-1]
+
+        cls.extra_specs = cls.add_required_extra_specs_to_dict(
+            {"replication_type": cls.replication_type})
+        share_type = cls.create_share_type(
+            name,
+            extra_specs=cls.extra_specs,
+            client=cls.admin_client)
+        cls.share_type = share_type["share_type"]
+        # Create share with above share_type
+        cls.share = cls.create_share(share_type_id=cls.share_type["id"],
+                                     availability_zone=cls.share_zone,)
+        cls.replica = cls.shares_v2_client.list_share_replicas(
+            share_id=cls.share['id'])[0]
+
+    @staticmethod
+    def _filter_share_replica_list(replica_list, r_state):
+        # Iterate through replica list to filter based on replica_state
+        return [replica['id'] for replica in replica_list
+                if replica['replica_state'] == r_state]
+
+    @test.attr(type=["gate", ])
+    def test_promote_out_of_sync_share_replica(self):
+        """Test promote 'out_of_sync' share replica to active state."""
+        if (self.replication_type
+                not in constants.REPLICATION_PROMOTION_CHOICES):
+            msg = "Option backend_replication_type should be one of (%s)!"
+            raise self.skipException(
+                msg % ','.join(constants.REPLICATION_PROMOTION_CHOICES))
+        share = self.create_share(share_type_id=self.share_type['id'])
+        original_replica = self.shares_v2_client.list_share_replicas(
+            share_id=share['id'])[0]
+
+        # NOTE(Yogi1): Cleanup needs to be disabled for replica that is
+        # being promoted since it will become the 'primary'/'active' replica.
+        replica = self.create_share_replica(share["id"], self.replica_zone,
+                                            cleanup=False)
+
+        # List replicas
+        replica_list = self.admin_client.list_share_replicas(
+            share_id=share['id'])
+
+        # Check if there is only 1 'active' replica before promotion.
+        active_replicas = self._filter_share_replica_list(
+            replica_list, constants.REPLICATION_STATE_ACTIVE)
+        self.assertEqual(1, len(active_replicas))
+
+        # Set replica_state to 'out_of_sync'
+        self.admin_client.reset_share_replica_state(
+            replica['id'], constants.REPLICATION_STATE_OUT_OF_SYNC)
+        self.shares_v2_client.wait_for_share_replica_status(
+            replica['id'], constants.REPLICATION_STATE_OUT_OF_SYNC,
+            status_attr='replica_state')
+
+        # Promote 'out_of_sync' replica to 'active' state.
+        self.promote_share_replica(replica['id'], self.admin_client)
+        # Original replica will need to be cleaned up before the promoted
+        # replica can be deleted.
+        self.addCleanup(self.delete_share_replica, original_replica['id'])
+
+        # Check if there is still only 1 'active' replica after promotion.
+        replica_list = self.shares_v2_client.list_share_replicas(
+            share_id=self.share["id"])
+        new_active_replicas = self._filter_share_replica_list(
+            replica_list, constants.REPLICATION_STATE_ACTIVE)
+        self.assertEqual(1, len(new_active_replicas))
+
+    @test.attr(type=["gate", ])
+    def test_force_delete_share_replica(self):
+        """Test force deleting a replica that is in 'error_deleting' status."""
+        replica = self.create_share_replica(self.share['id'],
+                                            self.replica_zone,
+                                            cleanup_in_class=False)
+        self.admin_client.reset_share_replica_status(
+            replica['id'], constants.STATUS_ERROR_DELETING)
+        self.shares_v2_client.wait_for_share_replica_status(
+            replica['id'], constants.STATUS_ERROR_DELETING)
+        self.admin_client.force_delete_share_replica(replica['id'])
+        self.shares_v2_client.wait_for_resource_deletion(
+            replica_id=replica['id'])
+
+    @test.attr(type=["gate", ])
+    def test_reset_share_replica_status(self):
+        """Test resetting a replica's 'status' attribute."""
+        replica = self.create_share_replica(self.share['id'],
+                                            self.replica_zone,
+                                            cleanup_in_class=False)
+        self.admin_client.reset_share_replica_status(replica['id'],
+                                                     constants.STATUS_ERROR)
+        self.shares_v2_client.wait_for_share_replica_status(
+            replica['id'], constants.STATUS_ERROR)
+
+    @test.attr(type=["gate", ])
+    def test_reset_share_replica_state(self):
+        """Test resetting a replica's 'replica_state' attribute."""
+        replica = self.create_share_replica(self.share['id'],
+                                            self.replica_zone,
+                                            cleanup_in_class=False)
+        self.admin_client.reset_share_replica_state(replica['id'],
+                                                    constants.STATUS_ERROR)
+        self.shares_v2_client.wait_for_share_replica_status(
+            replica['id'], constants.STATUS_ERROR, status_attr='replica_state')
+
+    @test.attr(type=["gate", ])
+    def test_resync_share_replica(self):
+        """Test resyncing a replica."""
+        replica = self.create_share_replica(self.share['id'],
+                                            self.replica_zone,
+                                            cleanup_in_class=False)
+        self.shares_v2_client.wait_for_share_replica_status(
+            replica['id'], constants.REPLICATION_STATE_IN_SYNC,
+            status_attr='replica_state')
+
+        # Set replica_state to 'out_of_sync'.
+        self.admin_client.reset_share_replica_state(
+            replica['id'], constants.REPLICATION_STATE_OUT_OF_SYNC)
+        self.shares_v2_client.wait_for_share_replica_status(
+            replica['id'], constants.REPLICATION_STATE_OUT_OF_SYNC,
+            status_attr='replica_state')
+
+        # Attempt resync
+        self.admin_client.resync_share_replica(replica['id'])
+        self.shares_v2_client.wait_for_share_replica_status(
+            replica['id'], constants.REPLICATION_STATE_IN_SYNC,
+            status_attr='replica_state')
diff --git a/manila_tempest_tests/tests/api/base.py b/manila_tempest_tests/tests/api/base.py
index 0eae2ad..0395251 100644
--- a/manila_tempest_tests/tests/api/base.py
+++ b/manila_tempest_tests/tests/api/base.py
@@ -28,6 +28,7 @@
 from tempest import test
 
 from manila_tempest_tests import clients_share as clients
+from manila_tempest_tests.common import constants
 from manila_tempest_tests import share_exceptions
 from manila_tempest_tests import utils
 
@@ -508,6 +509,56 @@
         return cgsnapshot
 
     @classmethod
+    def get_availability_zones(cls, client=None):
+        """List the availability zones for "manila-share" services
+
+         that are currently in "up" state.
+         """
+        client = client or cls.shares_v2_client
+        cls.services = client.list_services()
+        zones = [service['zone'] for service in cls.services if
+                 service['binary'] == "manila-share" and
+                 service['state'] == 'up']
+        return zones
+
+    @classmethod
+    def create_share_replica(cls, share_id, availability_zone, client=None,
+                             cleanup_in_class=False, cleanup=True):
+        client = client or cls.shares_v2_client
+        replica = client.create_share_replica(share_id, availability_zone)
+        resource = {
+            "type": "share_replica",
+            "id": replica["id"],
+            "client": client,
+            "share_id": share_id,
+        }
+        # NOTE(Yogi1): Cleanup needs to be disabled during promotion tests.
+        if cleanup:
+            if cleanup_in_class:
+                cls.class_resources.insert(0, resource)
+            else:
+                cls.method_resources.insert(0, resource)
+        client.wait_for_share_replica_status(
+            replica["id"], constants.STATUS_AVAILABLE)
+        return replica
+
+    @classmethod
+    def delete_share_replica(cls, replica_id, client=None):
+        client = client or cls.shares_v2_client
+        client.delete_share_replica(replica_id)
+        client.wait_for_resource_deletion(replica_id=replica_id)
+
+    @classmethod
+    def promote_share_replica(cls, replica_id, client=None):
+        client = client or cls.shares_v2_client
+        replica = client.promote_share_replica(replica_id)
+        client.wait_for_share_replica_status(
+            replica["id"],
+            constants.REPLICATION_STATE_ACTIVE,
+            status_attr="replica_state")
+        return replica
+
+    @classmethod
     def create_share_network(cls, client=None,
                              cleanup_in_class=False, **kwargs):
         if client is None:
@@ -632,6 +683,9 @@
                     elif res["type"] is "cgsnapshot":
                         client.delete_cgsnapshot(res_id)
                         client.wait_for_resource_deletion(cgsnapshot_id=res_id)
+                    elif res["type"] is "share_replica":
+                        client.delete_share_replica(res_id)
+                        client.wait_for_resource_deletion(replica_id=res_id)
                     else:
                         LOG.warning("Provided unsupported resource type for "
                                     "cleanup '%s'. Skipping." % res["type"])
diff --git a/manila_tempest_tests/tests/api/test_replication.py b/manila_tempest_tests/tests/api/test_replication.py
new file mode 100644
index 0000000..4f4268c
--- /dev/null
+++ b/manila_tempest_tests/tests/api/test_replication.py
@@ -0,0 +1,406 @@
+# Copyright 2015 Yogesh Kshirsagar
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+from tempest import config
+from tempest.lib.common.utils import data_utils
+from tempest import test
+import testtools
+
+from manila_tempest_tests import clients_share as clients
+from manila_tempest_tests.common import constants
+from manila_tempest_tests import share_exceptions
+from manila_tempest_tests.tests.api import base
+from manila_tempest_tests import utils
+
+CONF = config.CONF
+_MIN_SUPPORTED_MICROVERSION = '2.11'
+SUMMARY_KEYS = ['share_id', 'id', 'replica_state', 'status']
+DETAIL_KEYS = SUMMARY_KEYS + ['availability_zone', 'host', 'updated_at',
+                              'share_network_id', 'created_at']
+
+
+@testtools.skipUnless(CONF.share.run_replication_tests,
+                      'Replication tests are disabled.')
+@base.skip_if_microversion_lt(_MIN_SUPPORTED_MICROVERSION)
+class ReplicationTest(base.BaseSharesTest):
+
+    @classmethod
+    def resource_setup(cls):
+        super(ReplicationTest, cls).resource_setup()
+        # Create share_type
+        name = data_utils.rand_name(constants.TEMPEST_MANILA_PREFIX)
+        cls.admin_client = clients.AdminManager().shares_v2_client
+        cls.replication_type = CONF.share.backend_replication_type
+
+        if cls.replication_type not in constants.REPLICATION_TYPE_CHOICES:
+            raise share_exceptions.ShareReplicationTypeException(
+                replication_type=cls.replication_type
+            )
+        cls.zones = cls.get_availability_zones(client=cls.admin_client)
+        cls.share_zone = cls.zones[0]
+        cls.replica_zone = cls.zones[-1]
+
+        cls.extra_specs = cls.add_required_extra_specs_to_dict(
+            {"replication_type": cls.replication_type})
+        share_type = cls.create_share_type(
+            name,
+            extra_specs=cls.extra_specs,
+            client=cls.admin_client)
+        cls.share_type = share_type["share_type"]
+        # Create share with above share_type
+        cls.creation_data = {'kwargs': {
+            'share_type_id': cls.share_type['id'],
+            'availability_zone': cls.share_zone,
+        }}
+
+        # Data for creating shares in parallel
+        data = [cls.creation_data, cls.creation_data]
+        cls.shares = cls.create_shares(data)
+        cls.shares = [cls.shares_v2_client.get_share(s['id']) for s in
+                      cls.shares]
+        cls.instance_id1 = cls._get_instance(cls.shares[0])
+        cls.instance_id2 = cls._get_instance(cls.shares[1])
+
+        cls.access_type = "ip"
+        cls.access_to = utils.rand_ip()
+
+    @classmethod
+    def _get_instance(cls, share):
+        share_instances = cls.admin_client.get_instances_of_share(share["id"])
+        return share_instances[0]["id"]
+
+    def _verify_create_replica(self):
+        # Create the replica
+        share_replica = self.create_share_replica(self.shares[0]["id"],
+                                                  self.replica_zone,
+                                                  cleanup_in_class=False)
+        share_replicas = self.shares_v2_client.list_share_replicas(
+            share_id=self.shares[0]["id"])
+        # Ensure replica is created successfully.
+        replica_ids = [replica["id"] for replica in share_replicas]
+        self.assertIn(share_replica["id"], replica_ids)
+        return share_replica
+
+    def _verify_active_replica_count(self, share_id):
+        # List replicas
+        replica_list = self.shares_v2_client.list_share_replicas(
+            share_id=share_id)
+
+        # Check if there is only 1 'active' replica before promotion.
+        active_replicas = self._filter_replica_list(
+            replica_list, constants.REPLICATION_STATE_ACTIVE)
+        self.assertEqual(1, len(active_replicas))
+
+    def _filter_replica_list(self, replica_list, r_state):
+        # Iterate through replica list to filter based on replica_state
+        return [replica for replica in replica_list
+                if replica['replica_state'] == r_state]
+
+    def _get_pools_for_replication_domain(self):
+        # Get the list of pools for the replication domain
+        pools = self.admin_client.list_pools(detail=True)['pools']
+        instance_host = self.shares[0]['host']
+        host_pool = [p for p in pools if p['name'] == instance_host][0]
+        rep_domain = host_pool['capabilities']['replication_domain']
+        pools_in_rep_domain = [p for p in pools if p['capabilities'][
+            'replication_domain'] == rep_domain]
+        return rep_domain, pools_in_rep_domain
+
+    def _verify_config_and_set_access_rule_data(self):
+        """Verify the access rule configuration is enabled for NFS.
+
+        Set the data after verification.
+        """
+        protocol = self.shares_v2_client.share_protocol
+
+        # TODO(Yogi1): Add access rules for other protocols.
+        if not ((protocol.lower() == 'nfs') and
+                (protocol in CONF.share.enable_ip_rules_for_protocols) and
+                CONF.share.enable_ip_rules_for_protocols):
+            message = "IP access rules are not supported for this protocol."
+            raise self.skipException(message)
+
+        access_type = "ip"
+        access_to = utils.rand_ip()
+
+        return access_type, access_to
+
+    @test.attr(type=["gate", ])
+    def test_add_delete_share_replica(self):
+        # Create the replica
+        share_replica = self._verify_create_replica()
+
+        # Delete the replica
+        self.delete_share_replica(share_replica["id"])
+
+    @test.attr(type=["gate", ])
+    def test_add_access_rule_create_replica_delete_rule(self):
+        # Add access rule to the share
+        access_type, access_to = self._verify_config_and_set_access_rule_data()
+        rule = self.shares_v2_client.create_access_rule(
+            self.shares[0]["id"], access_type, access_to, 'ro')
+        self.shares_v2_client.wait_for_access_rule_status(
+            self.shares[0]["id"], rule["id"], constants.RULE_STATE_ACTIVE)
+
+        # Create the replica
+        self._verify_create_replica()
+
+        # Verify access rule transitions to 'active' state.
+        self.shares_v2_client.wait_for_access_rule_status(
+            self.shares[0]["id"], rule["id"], constants.RULE_STATE_ACTIVE)
+
+        # Delete rule and wait for deletion
+        self.shares_v2_client.delete_access_rule(self.shares[0]["id"],
+                                                 rule["id"])
+        self.shares_v2_client.wait_for_resource_deletion(
+            rule_id=rule["id"], share_id=self.shares[0]['id'])
+
+    @test.attr(type=["gate", ])
+    def test_create_replica_add_access_rule_delete_replica(self):
+        access_type, access_to = self._verify_config_and_set_access_rule_data()
+        # Create the replica
+        share_replica = self._verify_create_replica()
+
+        # Add access rule
+        rule = self.shares_v2_client.create_access_rule(
+            self.shares[0]["id"], access_type, access_to, 'ro')
+        self.shares_v2_client.wait_for_access_rule_status(
+            self.shares[0]["id"], rule["id"], constants.RULE_STATE_ACTIVE)
+
+        # Delete the replica
+        self.delete_share_replica(share_replica["id"])
+
+    @test.attr(type=["gate", ])
+    def test_add_multiple_share_replicas(self):
+        rep_domain, pools = self._get_pools_for_replication_domain()
+        if len(pools) < 3:
+            msg = ("Replication domain %(domain)s has only %(count)s pools. "
+                   "Need at least 3 pools to run this test." %
+                   {"domain": rep_domain, "count": len(pools)})
+            raise self.skipException(msg)
+        # Add the replicas
+        share_replica1 = self.create_share_replica(self.shares[0]["id"],
+                                                   self.replica_zone,
+                                                   cleanup_in_class=False)
+        share_replica2 = self.create_share_replica(self.shares[0]["id"],
+                                                   self.replica_zone,
+                                                   cleanup_in_class=False)
+        self.shares_v2_client.get_share_replica(share_replica2['id'])
+
+        share_replicas = self.shares_v2_client.list_share_replicas(
+            share_id=self.shares[0]["id"])
+        replica_host_set = {r['host'] for r in share_replicas}
+
+        # Assert that replicas are created on different pools.
+        msg = "More than one replica is created on the same pool."
+        self.assertEqual(3, len(replica_host_set), msg)
+        # Verify replicas are in the replica list
+        replica_ids = [replica["id"] for replica in share_replicas]
+        self.assertIn(share_replica1["id"], replica_ids)
+        self.assertIn(share_replica2["id"], replica_ids)
+
+    @test.attr(type=["gate", ])
+    def test_promote_in_sync_share_replica(self):
+        # Test promote 'in_sync' share_replica to 'active' state
+        if (self.replication_type
+                not in constants.REPLICATION_PROMOTION_CHOICES):
+            msg = "Option backend_replication_type should be one of (%s)!"
+            raise self.skipException(
+                msg % ','.join(constants.REPLICATION_PROMOTION_CHOICES))
+        share = self.create_shares([self.creation_data])[0]
+        original_replica = self.shares_v2_client.list_share_replicas(
+            share["id"])[0]
+        # NOTE(Yogi1): Cleanup needs to be disabled for replica that is
+        # being promoted since it will become the 'primary'/'active' replica.
+        replica = self.create_share_replica(share["id"], self.replica_zone,
+                                            cleanup=False)
+        # Wait for replica state to update after creation
+        self.shares_v2_client.wait_for_share_replica_status(
+            replica['id'], constants.REPLICATION_STATE_IN_SYNC,
+            status_attr='replica_state')
+        # Promote the first in_sync replica to active state
+        promoted_replica = self.promote_share_replica(replica['id'])
+        # Delete the demoted replica so promoted replica can be cleaned
+        # during the cleanup of the share.
+        self.addCleanup(self.delete_share_replica, original_replica['id'])
+        self._verify_active_replica_count(share["id"])
+        # Verify the replica_state for promoted replica
+        promoted_replica = self.shares_v2_client.get_share_replica(
+            promoted_replica["id"])
+        self.assertEqual(constants.REPLICATION_STATE_ACTIVE,
+                         promoted_replica["replica_state"])
+
+    @test.attr(type=["gate", ])
+    def test_promote_and_promote_back(self):
+        # Test promote back and forth between 2 share replicas
+        if (self.replication_type
+                not in constants.REPLICATION_PROMOTION_CHOICES):
+            msg = "Option backend_replication_type should be one of (%s)!"
+            raise self.skipException(
+                msg % ','.join(constants.REPLICATION_PROMOTION_CHOICES))
+
+        # Create a new share
+        share = self.create_shares([self.creation_data])[0]
+
+        # Discover the original replica
+        initial_replicas = self.shares_v2_client.list_share_replicas(
+            share_id=share['id'])
+        self.assertEqual(1, len(initial_replicas),
+                         '%s replicas initially created for share %s' %
+                         (len(initial_replicas), share['id']))
+        original_replica = initial_replicas[0]
+
+        # Create a new replica
+        new_replica = self.create_share_replica(share["id"],
+                                                self.replica_zone,
+                                                cleanup_in_class=False)
+        self.shares_v2_client.wait_for_share_replica_status(
+            new_replica['id'], constants.REPLICATION_STATE_IN_SYNC,
+            status_attr='replica_state')
+
+        # Promote the new replica to active and verify the replica states
+        self.promote_share_replica(new_replica['id'])
+        self._verify_active_replica_count(share["id"])
+        self.shares_v2_client.wait_for_share_replica_status(
+            original_replica['id'], constants.REPLICATION_STATE_IN_SYNC,
+            status_attr='replica_state')
+
+        # Promote the original replica back to active
+        self.promote_share_replica(original_replica['id'])
+        self._verify_active_replica_count(share["id"])
+        self.shares_v2_client.wait_for_share_replica_status(
+            new_replica['id'], constants.REPLICATION_STATE_IN_SYNC,
+            status_attr='replica_state')
+
+    @test.attr(type=["gate", ])
+    def test_active_replication_state(self):
+        # Verify the replica_state of first instance is set to active.
+        replica = self.shares_v2_client.get_share_replica(self.instance_id1)
+        self.assertEqual(
+            constants.REPLICATION_STATE_ACTIVE, replica['replica_state'])
+
+
+@testtools.skipUnless(CONF.share.run_replication_tests,
+                      'Replication tests are disabled.')
+@base.skip_if_microversion_lt(_MIN_SUPPORTED_MICROVERSION)
+class ReplicationActionsTest(base.BaseSharesTest):
+
+    @classmethod
+    def resource_setup(cls):
+        super(ReplicationActionsTest, cls).resource_setup()
+        # Create share_type
+        name = data_utils.rand_name(constants.TEMPEST_MANILA_PREFIX)
+        cls.admin_client = clients.AdminManager().shares_v2_client
+        cls.replication_type = CONF.share.backend_replication_type
+
+        if cls.replication_type not in constants.REPLICATION_TYPE_CHOICES:
+            raise share_exceptions.ShareReplicationTypeException(
+                replication_type=cls.replication_type
+            )
+        cls.zones = cls.get_availability_zones(client=cls.admin_client)
+        cls.share_zone = cls.zones[0]
+        cls.replica_zone = cls.zones[-1]
+
+        cls.extra_specs = cls.add_required_extra_specs_to_dict(
+            {"replication_type": cls.replication_type})
+        share_type = cls.create_share_type(
+            name,
+            extra_specs=cls.extra_specs,
+            client=cls.admin_client)
+        cls.share_type = share_type["share_type"]
+        # Create share with above share_type
+        cls.creation_data = {'kwargs': {
+            'share_type_id': cls.share_type['id'],
+            'availability_zone': cls.share_zone,
+        }}
+
+        # Data for creating shares in parallel
+        data = [cls.creation_data, cls.creation_data]
+        cls.shares = cls.create_shares(data)
+        cls.shares = [cls.shares_v2_client.get_share(s['id']) for s in
+                      cls.shares]
+        cls.instance_id1 = cls._get_instance(cls.shares[0])
+        cls.instance_id2 = cls._get_instance(cls.shares[1])
+
+        # Create replicas to 2 shares
+        cls.replica1 = cls.create_share_replica(cls.shares[0]["id"],
+                                                cls.replica_zone,
+                                                cleanup_in_class=True)
+        cls.replica2 = cls.create_share_replica(cls.shares[1]["id"],
+                                                cls.replica_zone,
+                                                cleanup_in_class=True)
+
+    @classmethod
+    def _get_instance(cls, share):
+        share_instances = cls.admin_client.get_instances_of_share(share["id"])
+        return share_instances[0]["id"]
+
+    def _validate_replica_list(self, replica_list, detail=True):
+        # Verify keys
+        if detail:
+            keys = DETAIL_KEYS
+        else:
+            keys = SUMMARY_KEYS
+        for replica in replica_list:
+            self.assertEqual(sorted(keys), sorted(replica.keys()))
+            # Check for duplicates
+            replica_id_list = [sr["id"] for sr in replica_list
+                               if sr["id"] == replica["id"]]
+            msg = "Replica %s appears %s times in replica list." % (
+                replica['id'], len(replica_id_list))
+            self.assertEqual(1, len(replica_id_list), msg)
+
+    @test.attr(type=["gate", ])
+    def test_show_share_replica(self):
+        replica = self.shares_v2_client.get_share_replica(self.replica1["id"])
+
+        actual_keys = sorted(list(replica.keys()))
+        detail_keys = sorted(DETAIL_KEYS)
+        self.assertEqual(detail_keys, actual_keys,
+                         'Share Replica %s has incorrect keys; '
+                         'expected %s, got %s.' % (replica["id"],
+                                                   detail_keys, actual_keys))
+
+    @test.attr(type=["gate", ])
+    def test_detail_list_share_replicas_for_share(self):
+        # List replicas for share
+        replica_list = self.shares_v2_client.list_share_replicas(
+            share_id=self.shares[0]["id"])
+        replica_ids_list = [rep['id'] for rep in replica_list]
+        self.assertIn(self.replica1['id'], replica_ids_list,
+                      'Replica %s was not returned in the list of replicas: %s'
+                      % (self.replica1['id'], replica_list))
+        # Verify keys
+        self._validate_replica_list(replica_list)
+
+    @test.attr(type=["gate", ])
+    def test_detail_list_share_replicas_for_all_shares(self):
+        # List replicas for all available shares
+        replica_list = self.shares_v2_client.list_share_replicas()
+        replica_ids_list = [rep['id'] for rep in replica_list]
+        for replica in [self.replica1, self.replica2]:
+            self.assertIn(replica['id'], replica_ids_list,
+                          'Replica %s was not returned in the list of '
+                          'replicas: %s' % (replica['id'], replica_list))
+        # Verify keys
+        self._validate_replica_list(replica_list)
+
+    @test.attr(type=["gate", ])
+    def test_summary_list_share_replicas_for_all_shares(self):
+        # List replicas
+        replica_list = self.shares_v2_client.list_share_replicas_summary()
+
+        # Verify keys
+        self._validate_replica_list(replica_list, detail=False)
diff --git a/manila_tempest_tests/tests/api/test_replication_negative.py b/manila_tempest_tests/tests/api/test_replication_negative.py
new file mode 100644
index 0000000..2587dc0
--- /dev/null
+++ b/manila_tempest_tests/tests/api/test_replication_negative.py
@@ -0,0 +1,169 @@
+# Copyright 2015 Yogesh Kshirsagar
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+from tempest import config
+from tempest.lib.common.utils import data_utils
+from tempest.lib import exceptions as lib_exc
+from tempest import test
+import testtools
+
+from manila_tempest_tests import clients_share as clients
+from manila_tempest_tests.common import constants
+from manila_tempest_tests import share_exceptions
+from manila_tempest_tests.tests.api import base
+
+CONF = config.CONF
+_MIN_SUPPORTED_MICROVERSION = '2.11'
+
+
+@testtools.skipUnless(CONF.share.run_replication_tests,
+                      'Replication tests are disabled.')
+@base.skip_if_microversion_lt(_MIN_SUPPORTED_MICROVERSION)
+class ReplicationNegativeTest(base.BaseSharesTest):
+
+    @classmethod
+    def resource_setup(cls):
+        super(ReplicationNegativeTest, cls).resource_setup()
+        # Create share_type
+        name = data_utils.rand_name(constants.TEMPEST_MANILA_PREFIX)
+        cls.admin_client = clients.AdminManager().shares_v2_client
+        cls.replication_type = CONF.share.backend_replication_type
+
+        if cls.replication_type not in constants.REPLICATION_TYPE_CHOICES:
+            raise share_exceptions.ShareReplicationTypeException(
+                replication_type=cls.replication_type
+            )
+        cls.zones = cls.get_availability_zones(client=cls.admin_client)
+        cls.share_zone = cls.zones[0]
+        cls.replica_zone = cls.zones[-1]
+
+        cls.extra_specs = cls.add_required_extra_specs_to_dict(
+            {"replication_type": cls.replication_type})
+        share_type = cls.create_share_type(
+            name,
+            extra_specs=cls.extra_specs,
+            client=cls.admin_client)
+        cls.share_type = share_type["share_type"]
+        # Create share with above share_type
+        cls.share1, cls.instance_id1 = cls._create_share_get_instance()
+
+    @classmethod
+    def _create_share_get_instance(cls):
+        share = cls.create_share(share_type_id=cls.share_type["id"],
+                                 availability_zone=cls.share_zone,)
+        share_instances = cls.admin_client.get_instances_of_share(
+            share["id"], version=_MIN_SUPPORTED_MICROVERSION
+        )
+        instance_id = share_instances[0]["id"]
+        return share, instance_id
+
+    def _is_replication_type_promotable(self):
+        if (self.replication_type
+                not in constants.REPLICATION_PROMOTION_CHOICES):
+            msg = "Option backend_replication_type should be one of (%s)!"
+            raise self.skipException(
+                msg % ','.join(constants.REPLICATION_PROMOTION_CHOICES))
+
+    @test.attr(type=["gate", "negative", ])
+    def test_try_add_replica_to_share_with_no_replication_share_type(self):
+        # Create share without replication type
+        share = self.create_share()
+        self.assertRaises(lib_exc.BadRequest,
+                          self.create_share_replica,
+                          share['id'],
+                          self.replica_zone)
+
+    @test.attr(type=["gate", "negative", ])
+    def test_add_replica_to_share_with_error_state(self):
+        # Set "error" state
+        self.admin_client.reset_state(
+            self.share1['id'], constants.STATUS_ERROR)
+        self.addCleanup(self.admin_client.reset_state,
+                        self.share1['id'],
+                        constants.STATUS_AVAILABLE)
+        self.assertRaises(lib_exc.BadRequest,
+                          self.create_share_replica,
+                          self.share1['id'],
+                          self.replica_zone)
+
+    @test.attr(type=["gate", "negative", ])
+    def test_get_replica_by_nonexistent_id(self):
+        self.assertRaises(lib_exc.NotFound,
+                          self.shares_v2_client.get_share_replica,
+                          data_utils.rand_uuid())
+
+    @test.attr(type=["gate", "negative", ])
+    def test_try_delete_replica_by_nonexistent_id(self):
+        self.assertRaises(lib_exc.NotFound,
+                          self.shares_v2_client.delete_share_replica,
+                          data_utils.rand_uuid())
+
+    @test.attr(type=["gate", "negative", ])
+    def test_try_delete_last_active_replica(self):
+        self.assertRaises(lib_exc.BadRequest,
+                          self.shares_v2_client.delete_share_replica,
+                          self.instance_id1)
+
+    @test.attr(type=["gate", "negative", ])
+    def test_try_delete_share_having_replica(self):
+        self.create_share_replica(self.share1["id"], self.replica_zone,
+                                  cleanup_in_class=False)
+        self.assertRaises(lib_exc.Conflict,
+                          self.shares_v2_client.delete_share,
+                          self.share1["id"])
+
+    @test.attr(type=["negative", "gate", ])
+    def test_promote_out_of_sync_share_replica(self):
+        # Test promoting an out_of_sync share_replica to active state
+        self._is_replication_type_promotable()
+        share, instance_id = self._create_share_get_instance()
+        replica = self.create_share_replica(share["id"], self.replica_zone,
+                                            cleanup_in_class=False)
+        # Set replica state to out of sync
+        self.admin_client.reset_share_replica_state(
+            replica['id'], constants.REPLICATION_STATE_OUT_OF_SYNC)
+        self.shares_v2_client.wait_for_share_replica_status(
+            replica['id'], constants.REPLICATION_STATE_OUT_OF_SYNC,
+            status_attr='replica_state')
+        # Try promoting the first out_of_sync replica to active state
+        self.assertRaises(lib_exc.Forbidden,
+                          self.shares_v2_client.promote_share_replica,
+                          replica['id'])
+
+    @test.attr(type=["negative", "gate", ])
+    def test_promote_active_share_replica(self):
+        # Test promote active share_replica
+        self._is_replication_type_promotable()
+
+        # Try promoting the active replica
+        self.shares_v2_client.promote_share_replica(self.instance_id1,
+                                                    expected_status=200)
+
+    @test.attr(type=["negative", "gate", ])
+    def test_promote_share_replica_for_writable_share_type(self):
+        # Test promote active share_replica for writable share
+        if self.replication_type != "writable":
+            raise self.skipException("Option backend_replication_type "
+                                     "should be writable!")
+        share, instance_id = self._create_share_get_instance()
+        replica = self.create_share_replica(share["id"], self.replica_zone,
+                                            cleanup_in_class=False)
+        # By default, 'writable' replica is expected to be in active state
+        self.shares_v2_client.wait_for_share_replica_status(
+            replica["id"], constants.REPLICATION_STATE_ACTIVE,
+            status_attr='replica_state')
+
+        # Try promoting the replica
+        self.shares_v2_client.promote_share_replica(replica['id'])
