diff --git a/octavia_tempest_plugin/clients.py b/octavia_tempest_plugin/clients.py
index f2a7767..7fe3606 100644
--- a/octavia_tempest_plugin/clients.py
+++ b/octavia_tempest_plugin/clients.py
@@ -18,6 +18,12 @@
 from octavia_tempest_plugin.services.load_balancer.v2 import (
     amphora_client)
 from octavia_tempest_plugin.services.load_balancer.v2 import (
+    availability_zone_capabilities_client)
+from octavia_tempest_plugin.services.load_balancer.v2 import (
+    availability_zone_client)
+from octavia_tempest_plugin.services.load_balancer.v2 import (
+    availability_zone_profile_client)
+from octavia_tempest_plugin.services.load_balancer.v2 import (
     flavor_capabilities_client)
 from octavia_tempest_plugin.services.load_balancer.v2 import (
     flavor_client)
@@ -74,3 +80,11 @@
         self.provider_client = provider_client.ProviderClient(**params)
         self.flavor_capabilities_client = (
             flavor_capabilities_client.FlavorCapabilitiesClient(**params))
+        self.availability_zone_capabilities_client = (
+            availability_zone_capabilities_client
+            .AvailabilityZoneCapabilitiesClient(**params))
+        self.availability_zone_profile_client = (
+            availability_zone_profile_client.AvailabilityZoneProfileClient(
+                **params))
+        self.availability_zone_client = (
+            availability_zone_client.AvailabilityZoneClient(**params))
diff --git a/octavia_tempest_plugin/common/barbican_client_mgr.py b/octavia_tempest_plugin/common/barbican_client_mgr.py
index e93f903..eba1715 100644
--- a/octavia_tempest_plugin/common/barbican_client_mgr.py
+++ b/octavia_tempest_plugin/common/barbican_client_mgr.py
@@ -63,15 +63,15 @@
         # Setup the barbican client
         self.barbican = client.Client(session=id_session)
 
-    def store_secret(self, pkcs12_secret):
+    def store_secret(self, secret):
         """Store a secret in barbican.
 
-        :param pkcs12_secret: A pkcs12 secret.
+        :param secret: The secret payload to store.
         :returns: The barbican secret_ref.
         """
         p12_secret = self.barbican.secrets.create()
-        p12_secret.name = data_utils.rand_name("lb_member_barbican_pkcs12")
-        p12_secret.payload = pkcs12_secret
+        p12_secret.name = data_utils.rand_name("lb_member_barbican")
+        p12_secret.payload = secret
         secret_ref = p12_secret.store()
         LOG.debug('Secret {0} has ref {1}'.format(p12_secret.name, secret_ref))
         return secret_ref
diff --git a/octavia_tempest_plugin/common/cert_utils.py b/octavia_tempest_plugin/common/cert_utils.py
index dcdd6f0..f99ce88 100644
--- a/octavia_tempest_plugin/common/cert_utils.py
+++ b/octavia_tempest_plugin/common/cert_utils.py
@@ -58,6 +58,13 @@
     ).add_extension(
         x509.BasicConstraints(ca=True, path_length=None),
         critical=True,
+    ).add_extension(
+        # KeyUsage(digital_signature, content_commitment, key_encipherment,
+        #          data_encipherment, key_agreement, key_cert_sign, crl_sign,
+        #          encipher_only, decipher_only)
+        x509.KeyUsage(True, False, False, False, False,
+                      True, True, False, False),
+        critical=True,
     ).sign(ca_key, hashes.SHA256(), default_backend())
 
     return ca_cert, ca_key
@@ -104,11 +111,66 @@
     ).add_extension(
         x509.BasicConstraints(ca=False, path_length=None),
         critical=True,
+    ).add_extension(
+        # KeyUsage(digital_signature, content_commitment, key_encipherment,
+        #          data_encipherment, key_agreement, key_cert_sign, crl_sign,
+        #          encipher_only, decipher_only)
+        x509.KeyUsage(True, False, True, False, False,
+                      False, False, False, False),
+        critical=True,
     ).sign(ca_key, hashes.SHA256(), default_backend())
 
     return server_cert, server_key
 
 
+def generate_client_cert_and_key(ca_cert, ca_key, client_uuid):
+    """Creates a client cert and key for testing.
+
+    :param ca_cert: A cryptography CA certificate (x509) object.
+    :param ca_key: A cryptography CA key (x509) object.
+    :param client_uuid: A UUID identifying the client.
+    :returns: The cryptography client cert and key objects.
+    """
+
+    client_key = rsa.generate_private_key(
+        public_exponent=65537, key_size=2048, backend=default_backend())
+
+    subject = x509.Name([
+        x509.NameAttribute(NameOID.COUNTRY_NAME, u"US"),
+        x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"Denial"),
+        x509.NameAttribute(NameOID.LOCALITY_NAME, u"Corvallis"),
+        x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"OpenStack"),
+        x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, u"Octavia"),
+        x509.NameAttribute(NameOID.COMMON_NAME, u"{}".format(client_uuid)),
+    ])
+
+    client_cert = x509.CertificateBuilder().subject_name(
+        subject
+    ).issuer_name(
+        ca_cert.subject
+    ).public_key(
+        client_key.public_key()
+    ).serial_number(
+        x509.random_serial_number()
+    ).not_valid_before(
+        datetime.datetime.utcnow()
+    ).not_valid_after(
+        datetime.datetime.utcnow() + datetime.timedelta(days=10)
+    ).add_extension(
+        x509.BasicConstraints(ca=False, path_length=None),
+        critical=True,
+    ).add_extension(
+        # KeyUsage(digital_signature, content_commitment, key_encipherment,
+        #          data_encipherment, key_agreement, key_cert_sign, crl_sign,
+        #          encipher_only, decipher_only)
+        x509.KeyUsage(True, True, True, False, False, False,
+                      False, False, False),
+        critical=True,
+    ).sign(ca_key, hashes.SHA256(), default_backend())
+
+    return client_cert, client_key
+
+
 def generate_pkcs12_bundle(server_cert, server_key):
     """Creates a pkcs12 formated bundle.
 
@@ -128,3 +190,28 @@
         OpenSSL.crypto.PKey.from_cryptography_key(server_key))
     pkcs12.set_certificate(OpenSSL.crypto.X509.from_cryptography(server_cert))
     return pkcs12.export()
+
+
+def generate_certificate_revocation_list(ca_cert, ca_key, cert_to_revoke):
+    """Create a certificate revocation list with a revoked certificate.
+
+    :param ca_cert: A cryptography CA certificate (x509) object.
+    :param ca_key: A cryptography CA key (x509) object.
+    :param cert_to_revoke: A cryptography certificate (x509) to revoke.
+    :returns: A signed certificate revocation list.
+    """
+    crl_builder = x509.CertificateRevocationListBuilder()
+    crl_builder = crl_builder.issuer_name(ca_cert.subject)
+    crl_builder = crl_builder.last_update(datetime.datetime.utcnow())
+    crl_builder = crl_builder.next_update(datetime.datetime.utcnow() +
+                                          datetime.timedelta(1, 0, 0))
+
+    revoked_cert = x509.RevokedCertificateBuilder().serial_number(
+        cert_to_revoke.serial_number
+    ).revocation_date(
+        datetime.datetime.utcnow()
+    ).build(default_backend())
+
+    crl_builder = crl_builder.add_revoked_certificate(revoked_cert)
+    return crl_builder.sign(private_key=ca_key, algorithm=hashes.SHA256(),
+                            backend=default_backend())
diff --git a/octavia_tempest_plugin/common/constants.py b/octavia_tempest_plugin/common/constants.py
index d0d6a18..e31c1a5 100644
--- a/octavia_tempest_plugin/common/constants.py
+++ b/octavia_tempest_plugin/common/constants.py
@@ -14,12 +14,23 @@
 
 # API field names
 ACTIVE_CONNECTIONS = 'active_connections'
+AVAILABILITY_ZONE = 'availability_zone'
+AVAILABILITY_ZONE_DATA = 'availability_zone_data'
+AVAILABILITY_ZONE_PROFILE_ID = 'availability_zone_profile_id'
 ADMIN_STATE_UP = 'admin_state_up'
 BYTES_IN = 'bytes_in'
 BYTES_OUT = 'bytes_out'
+CLIENT_AUTHENTICATION = 'client_authentication'
+CLIENT_AUTH_NONE = 'NONE'
+CLIENT_AUTH_OPTIONAL = 'OPTIONAL'
+CLIENT_AUTH_MANDATORY = 'MANDATORY'
+CLIENT_CA_TLS_CONTAINER_REF = 'client_ca_tls_container_ref'
+CLIENT_CRL_CONTAINER_REF = 'client_crl_container_ref'
 CREATED_AT = 'created_at'
 DESCRIPTION = 'description'
+FLAVOR_DATA = 'flavor_data'
 FLAVOR_ID = 'flavor_id'
+FLAVOR_PROFILE_ID = 'flavor_profile_id'
 ID = 'id'
 LISTENERS = 'listeners'
 LOADBALANCER = 'loadbalancer'
@@ -89,9 +100,7 @@
 URL_PATH = 'url_path'
 EXPECTED_CODES = 'expected_codes'
 
-FLAVOR_DATA = 'flavor_data'
 ENABLED = 'enabled'
-FLAVOR_PROFILE_ID = 'flavor_profile_id'
 
 # Other constants
 ACTIVE = 'ACTIVE'
@@ -213,6 +222,10 @@
 # Flavor capabilities
 LOADBALANCER_TOPOLOGY = 'loadbalancer_topology'
 
+# Availability zone capabilities
+COMPUTE_ZONE = 'compute_zone'
+MANAGEMENT_NETWORK = 'management_network'
+
 # API valid fields
 SHOW_LOAD_BALANCER_RESPONSE_FIELDS = (
     ADMIN_STATE_UP, CREATED_AT, DESCRIPTION, FLAVOR_ID, ID, LISTENERS, NAME,
@@ -264,3 +277,9 @@
 SHOW_FLAVOR_PROFILE_FIELDS = [ID, NAME, PROVIDER_NAME, FLAVOR_DATA]
 
 SHOW_FLAVOR_FIELDS = [ID, NAME, DESCRIPTION, ENABLED, FLAVOR_PROFILE_ID]
+
+SHOW_AVAILABILITY_ZONE_PROFILE_FIELDS = [
+    ID, NAME, PROVIDER_NAME, AVAILABILITY_ZONE_DATA]
+
+SHOW_AVAILABILITY_ZONE_FIELDS = [
+    NAME, DESCRIPTION, ENABLED, AVAILABILITY_ZONE_PROFILE_ID]
diff --git a/octavia_tempest_plugin/config.py b/octavia_tempest_plugin/config.py
index 29dc1da..8573d89 100644
--- a/octavia_tempest_plugin/config.py
+++ b/octavia_tempest_plugin/config.py
@@ -132,6 +132,13 @@
                          'topology. One of: SINGLE - One amphora per load '
                          'balancer. ACTIVE_STANDBY - Two amphora per load '
                          'balancer.'}),
+    cfg.DictOpt('expected_availability_zone_capability',
+                help=('Defines a provider availability zone capability that '
+                      'is expected to be present in the selected provider '
+                      'under test. It is specified in a "name": "description" '
+                      'dict. Example: {"compute_zone": "The compute '
+                      'availability zone."}'),
+                default={'compute_zone': 'The compute availability zone.'}),
     # Networking
     cfg.BoolOpt('test_with_ipv6',
                 default=True,
diff --git a/octavia_tempest_plugin/services/load_balancer/v2/availability_zone_capabilities_client.py b/octavia_tempest_plugin/services/load_balancer/v2/availability_zone_capabilities_client.py
new file mode 100644
index 0000000..92696a7
--- /dev/null
+++ b/octavia_tempest_plugin/services/load_balancer/v2/availability_zone_capabilities_client.py
@@ -0,0 +1,79 @@
+#   Copyright 2019 Rackspace US Inc.  All rights reserved.
+#   Copyright 2019 Verizon Media
+#
+#   Licensed under the Apache License, Version 2.0 (the "License"); you may
+#   not use this file except in compliance with the License. You may obtain
+#   a copy of the License at
+#
+#        http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#   WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#   License for the specific language governing permissions and limitations
+#   under the License.
+#
+
+from octavia_tempest_plugin.services.load_balancer.v2 import base_client
+from octavia_tempest_plugin.services.load_balancer.v2 import provider_client
+
+Unset = base_client.Unset
+
+
+class AvailabilityZoneCapabilitiesClient(base_client.BaseLBaaSClient):
+
+    list_root_tag = 'availability_zone_capabilities'
+
+    def __init__(self, *args, **kwargs):
+        super(AvailabilityZoneCapabilitiesClient, self).__init__(
+            *args, **kwargs)
+        providers_list_root_tag = provider_client.ProviderClient.list_root_tag
+        # /v2.0/lbaas/providers/<PROVIDER_UUID>/availability_zone_capabilities
+        self.uri = "{provider_base_uri}/{parent}/{object}".format(
+            provider_base_uri=self.base_uri.format(
+                object=providers_list_root_tag),
+            parent="{parent}",
+            object=self.list_root_tag
+        )
+
+    def list_availability_zone_capabilities(self, provider, query_params=None,
+                                            return_object_only=True):
+        """Get a list of provider availability zone capability objects.
+
+        :param provider: The provider to query for availability zone
+                         capabilities.
+        :param query_params: The optional query parameters to append to the
+                             request. Ex. fields=id&fields=name
+        :param return_object_only: If True, the response returns the object
+                                   inside the root tag. False returns the full
+                                   response from the API.
+        :raises AssertionError: if the expected_code isn't a valid http success
+                                response code
+        :raises BadRequest: If a 400 response code is received
+        :raises Conflict: If a 409 response code is received
+        :raises Forbidden: If a 403 response code is received
+        :raises Gone: If a 410 response code is received
+        :raises InvalidContentType: If a 415 response code is received
+        :raises InvalidHTTPResponseBody: The response body wasn't valid JSON
+        :raises InvalidHttpSuccessCode: if the read code isn't an expected
+                                        http success code
+        :raises NotFound: If a 404 response code is received
+        :raises NotImplemented: If a 501 response code is received
+        :raises OverLimit: If a 413 response code is received and over_limit is
+                           not in the response body
+        :raises RateLimitExceeded: If a 413 response code is received and
+                                   over_limit is in the response body
+        :raises ServerFault: If a 500 response code is received
+        :raises Unauthorized: If a 401 response code is received
+        :raises UnexpectedContentType: If the content-type of the response
+                                       isn't an expected type
+        :raises UnexpectedResponseCode: If a response code above 400 is
+                                        received and it doesn't fall into any
+                                        of the handled checks
+        :raises UnprocessableEntity: If a 422 response code is received and
+                                     couldn't be parsed
+        :returns: A list of availability zone capability objects.
+        """
+        return self._list_objects(parent_id=provider,
+                                  query_params=query_params,
+                                  return_object_only=return_object_only)
diff --git a/octavia_tempest_plugin/services/load_balancer/v2/availability_zone_client.py b/octavia_tempest_plugin/services/load_balancer/v2/availability_zone_client.py
new file mode 100644
index 0000000..c729f21
--- /dev/null
+++ b/octavia_tempest_plugin/services/load_balancer/v2/availability_zone_client.py
@@ -0,0 +1,274 @@
+#   Copyright 2019 Rackspace US Inc.  All rights reserved.
+#
+#   Licensed under the Apache License, Version 2.0 (the "License"); you may
+#   not use this file except in compliance with the License. You may obtain
+#   a copy of the License at
+#
+#        http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#   WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#   License for the specific language governing permissions and limitations
+#   under the License.
+#
+
+from oslo_log import log as logging
+from tempest.lib import exceptions
+
+from octavia_tempest_plugin.services.load_balancer.v2 import base_client
+
+LOG = logging.getLogger(__name__)
+Unset = base_client.Unset
+
+
+class AvailabilityZoneClient(base_client.BaseLBaaSClient):
+
+    root_tag = 'availability_zone'
+    list_root_tag = 'availability_zones'
+
+    resource_path = 'availabilityzones'
+
+    def __init__(self, *args, **kwargs):
+        super(AvailabilityZoneClient, self).__init__(*args, **kwargs)
+        self.uri = self.base_uri.format(object=self.resource_path)
+
+    def create_availability_zone(self, name, availability_zone_profile_id,
+                                 description=Unset, enabled=Unset,
+                                 return_object_only=True):
+        """Create an availability zone.
+
+        :param name: Human-readable name of the resource.
+        :param availability_zone_profile_id: The ID of the associated
+                                             availability zone profile.
+        :param description: A human-readable description for the resource.
+        :param enabled: If the resource is available for use.
+                        The default is True.
+        :raises AssertionError: if the expected_code isn't a valid http success
+                                response code
+        :raises BadRequest: If a 400 response code is received
+        :raises Conflict: If a 409 response code is received
+        :raises Forbidden: If a 403 response code is received
+        :raises Gone: If a 410 response code is received
+        :raises InvalidContentType: If a 415 response code is received
+        :raises InvalidHTTPResponseBody: The response body wasn't valid JSON
+        :raises InvalidHttpSuccessCode: if the read code isn't an expected
+                                        http success code
+        :raises NotFound: If a 404 response code is received
+        :raises NotImplemented: If a 501 response code is received
+        :raises OverLimit: If a 413 response code is received and over_limit is
+                           not in the response body
+        :raises RateLimitExceeded: If a 413 response code is received and
+                                   over_limit is in the response body
+        :raises ServerFault: If a 500 response code is received
+        :raises Unauthorized: If a 401 response code is received
+        :raises UnexpectedContentType: If the content-type of the response
+                                       isn't an expected type
+        :raises UnexpectedResponseCode: If a response code above 400 is
+                                        received and it doesn't fall into any
+                                        of the handled checks
+        :raises UnprocessableEntity: If a 422 response code is received and
+                                     couldn't be parsed
+        :returns: An availability zone object.
+        """
+        kwargs = {arg: value for arg, value in locals().items()
+                  if arg != 'self' and value is not Unset}
+        return self._create_object(**kwargs)
+
+    def show_availability_zone(self, availability_zone_name, query_params=None,
+                               return_object_only=True):
+        """Get the availability zone details.
+
+        :param availability_zone_name: The availability zone name to query.
+        :param query_params: The optional query parameters to append to the
+                             request. Ex. fields=id&fields=name
+        :param return_object_only: If True, the response returns the object
+                                   inside the root tag. False returns the full
+                                   response from the API.
+        :raises AssertionError: if the expected_code isn't a valid http success
+                                response code
+        :raises BadRequest: If a 400 response code is received
+        :raises Conflict: If a 409 response code is received
+        :raises Forbidden: If a 403 response code is received
+        :raises Gone: If a 410 response code is received
+        :raises InvalidContentType: If a 415 response code is received
+        :raises InvalidHTTPResponseBody: The response body wasn't valid JSON
+        :raises InvalidHttpSuccessCode: if the read code isn't an expected
+                                        http success code
+        :raises NotFound: If a 404 response code is received
+        :raises NotImplemented: If a 501 response code is received
+        :raises OverLimit: If a 413 response code is received and over_limit is
+                           not in the response body
+        :raises RateLimitExceeded: If a 413 response code is received and
+                                   over_limit is in the response body
+        :raises ServerFault: If a 500 response code is received
+        :raises Unauthorized: If a 401 response code is received
+        :raises UnexpectedContentType: If the content-type of the response
+                                       isn't an expected type
+        :raises UnexpectedResponseCode: If a response code above 400 is
+                                        received and it doesn't fall into any
+                                        of the handled checks
+        :raises UnprocessableEntity: If a 422 response code is received and
+                                     couldn't be parsed
+        :returns: An availability zone object.
+        """
+        return self._show_object(obj_id=availability_zone_name,
+                                 query_params=query_params,
+                                 return_object_only=return_object_only)
+
+    def list_availability_zones(self, query_params=None,
+                                return_object_only=True):
+        """Get a list of availability zone objects.
+
+        :param query_params: The optional query parameters to append to the
+                             request. Ex. fields=id&fields=name
+        :param return_object_only: If True, the response returns the object
+                                   inside the root tag. False returns the full
+                                   response from the API.
+        :raises AssertionError: if the expected_code isn't a valid http success
+                                response code
+        :raises BadRequest: If a 400 response code is received
+        :raises Conflict: If a 409 response code is received
+        :raises Forbidden: If a 403 response code is received
+        :raises Gone: If a 410 response code is received
+        :raises InvalidContentType: If a 415 response code is received
+        :raises InvalidHTTPResponseBody: The response body wasn't valid JSON
+        :raises InvalidHttpSuccessCode: if the read code isn't an expected
+                                        http success code
+        :raises NotFound: If a 404 response code is received
+        :raises NotImplemented: If a 501 response code is received
+        :raises OverLimit: If a 413 response code is received and over_limit is
+                           not in the response body
+        :raises RateLimitExceeded: If a 413 response code is received and
+                                   over_limit is in the response body
+        :raises ServerFault: If a 500 response code is received
+        :raises Unauthorized: If a 401 response code is received
+        :raises UnexpectedContentType: If the content-type of the response
+                                       isn't an expected type
+        :raises UnexpectedResponseCode: If a response code above 400 is
+                                        received and it doesn't fall into any
+                                        of the handled checks
+        :raises UnprocessableEntity: If a 422 response code is received and
+                                     couldn't be parsed
+        :returns: A list of availability zone objects.
+        """
+        return self._list_objects(query_params=query_params,
+                                  return_object_only=return_object_only)
+
+    def update_availability_zone(self, availability_zone_name,
+                                 description=Unset, enabled=Unset,
+                                 return_object_only=True):
+        """Update an availability zone.
+
+        :param availability_zone_name: The availability zone name to update.
+        :param description: A human-readable description for the resource.
+        :param enabled: If the resource is available for use.
+        :raises AssertionError: if the expected_code isn't a valid http success
+                                response code
+        :raises BadRequest: If a 400 response code is received
+        :raises Conflict: If a 409 response code is received
+        :raises Forbidden: If a 403 response code is received
+        :raises Gone: If a 410 response code is received
+        :raises InvalidContentType: If a 415 response code is received
+        :raises InvalidHTTPResponseBody: The response body wasn't valid JSON
+        :raises InvalidHttpSuccessCode: if the read code isn't an expected
+                                        http success code
+        :raises NotFound: If a 404 response code is received
+        :raises NotImplemented: If a 501 response code is received
+        :raises OverLimit: If a 413 response code is received and over_limit is
+                           not in the response body
+        :raises RateLimitExceeded: If a 413 response code is received and
+                                   over_limit is in the response body
+        :raises ServerFault: If a 500 response code is received
+        :raises Unauthorized: If a 401 response code is received
+        :raises UnexpectedContentType: If the content-type of the response
+                                       isn't an expected type
+        :raises UnexpectedResponseCode: If a response code above 400 is
+                                        received and it doesn't fall into any
+                                        of the handled checks
+        :raises UnprocessableEntity: If a 422 response code is received and
+                                     couldn't be parsed
+        :returns: An availability zone object.
+        """
+        kwargs = {arg: value for arg, value in locals().items()
+                  if arg != 'self' and value is not Unset}
+        kwargs['obj_id'] = kwargs.pop('availability_zone_name')
+        return self._update_object(**kwargs)
+
+    def delete_availability_zone(self, availability_zone_name,
+                                 ignore_errors=False):
+        """Delete an availability zone.
+
+        :param availability_zone_name: The availability zone name to delete.
+        :param ignore_errors: True if errors should be ignored.
+        :raises AssertionError: if the expected_code isn't a valid http success
+                                response code
+        :raises BadRequest: If a 400 response code is received
+        :raises Conflict: If a 409 response code is received
+        :raises Forbidden: If a 403 response code is received
+        :raises Gone: If a 410 response code is received
+        :raises InvalidContentType: If a 415 response code is received
+        :raises InvalidHTTPResponseBody: The response body wasn't valid JSON
+        :raises InvalidHttpSuccessCode: if the read code isn't an expected
+                                        http success code
+        :raises NotFound: If a 404 response code is received
+        :raises NotImplemented: If a 501 response code is received
+        :raises OverLimit: If a 413 response code is received and over_limit is
+                           not in the response body
+        :raises RateLimitExceeded: If a 413 response code is received and
+                                   over_limit is in the response body
+        :raises ServerFault: If a 500 response code is received
+        :raises Unauthorized: If a 401 response code is received
+        :raises UnexpectedContentType: If the content-type of the response
+                                       isn't an expected type
+        :raises UnexpectedResponseCode: If a response code above 400 is
+                                        received and it doesn't fall into any
+                                        of the handled checks
+        :raises UnprocessableEntity: If a 422 response code is received and
+                                     couldn't be parsed
+        :returns: None if ignore_errors is True, the response status code
+                  if not.
+        """
+        return self._delete_obj(obj_id=availability_zone_name,
+                                ignore_errors=ignore_errors)
+
+    def cleanup_an_availability_zone(self, availability_zone_name):
+        """Delete an availability zone for tempest cleanup.
+
+           We cannot use the cleanup_availability_zone method as availability
+           zones do not have a provisioning_status.
+
+        :param availability_zone_name: The availability zone name to delete.
+        :raises AssertionError: if the expected_code isn't a valid http success
+                                response code
+        :raises BadRequest: If a 400 response code is received
+        :raises Conflict: If a 409 response code is received
+        :raises Forbidden: If a 403 response code is received
+        :raises Gone: If a 410 response code is received
+        :raises InvalidContentType: If a 415 response code is received
+        :raises InvalidHTTPResponseBody: The response body wasn't valid JSON
+        :raises InvalidHttpSuccessCode: if the read code isn't an expected
+                                        http success code
+        :raises NotImplemented: If a 501 response code is received
+        :raises OverLimit: If a 413 response code is received and over_limit is
+                           not in the response body
+        :raises RateLimitExceeded: If a 413 response code is received and
+                                   over_limit is in the response body
+        :raises ServerFault: If a 500 response code is received
+        :raises Unauthorized: If a 401 response code is received
+        :raises UnexpectedContentType: If the content-type of the response
+                                       isn't an expected type
+        :raises UnexpectedResponseCode: If a response code above 400 is
+                                        received and it doesn't fall into any
+                                        of the handled checks
+        :raises UnprocessableEntity: If a 422 response code is received and
+                                     couldn't be parsed
+        :returns: None. A NotFound (404) response is logged and treated as
+                  the availability zone already being cleaned up.
+        """
+        try:
+            self._delete_obj(obj_id=availability_zone_name)
+        except exceptions.NotFound:
+            # Already gone, cleanup complete
+            LOG.info("Availability zone %s is already gone. "
+                     "Cleanup considered complete.", availability_zone_name)
diff --git a/octavia_tempest_plugin/services/load_balancer/v2/availability_zone_profile_client.py b/octavia_tempest_plugin/services/load_balancer/v2/availability_zone_profile_client.py
new file mode 100644
index 0000000..631162d
--- /dev/null
+++ b/octavia_tempest_plugin/services/load_balancer/v2/availability_zone_profile_client.py
@@ -0,0 +1,280 @@
+#   Copyright 2019 Rackspace US Inc.  All rights reserved.
+#
+#   Licensed under the Apache License, Version 2.0 (the "License"); you may
+#   not use this file except in compliance with the License. You may obtain
+#   a copy of the License at
+#
+#        http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#   WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#   License for the specific language governing permissions and limitations
+#   under the License.
+#
+
+from oslo_log import log as logging
+from tempest.lib import exceptions
+
+from octavia_tempest_plugin.services.load_balancer.v2 import base_client
+
+LOG = logging.getLogger(__name__)
+Unset = base_client.Unset
+
+
+class AvailabilityZoneProfileClient(base_client.BaseLBaaSClient):
+
+    root_tag = 'availability_zone_profile'
+    list_root_tag = 'availability_zone_profiles'
+
+    resource_path = 'availabilityzoneprofiles'
+
+    def __init__(self, *args, **kwargs):
+        super(AvailabilityZoneProfileClient, self).__init__(*args, **kwargs)
+        self.uri = self.base_uri.format(object=self.resource_path)
+
+    def create_availability_zone_profile(self, name, provider_name,
+                                         availability_zone_data,
+                                         return_object_only=True):
+        """Create an availability zone profile.
+
+        :param name: Human-readable name of the resource.
+        :param provider_name: The octavia provider name.
+        :param availability_zone_data: The JSON string containing the
+                                       availability zone metadata.
+        :raises AssertionError: if the expected_code isn't a valid http success
+                                response code
+        :raises BadRequest: If a 400 response code is received
+        :raises Conflict: If a 409 response code is received
+        :raises Forbidden: If a 403 response code is received
+        :raises Gone: If a 410 response code is received
+        :raises InvalidContentType: If a 415 response code is received
+        :raises InvalidHTTPResponseBody: The response body wasn't valid JSON
+        :raises InvalidHttpSuccessCode: if the read code isn't an expected
+                                        http success code
+        :raises NotFound: If a 404 response code is received
+        :raises NotImplemented: If a 501 response code is received
+        :raises OverLimit: If a 413 response code is received and over_limit is
+                           not in the response body
+        :raises RateLimitExceeded: If a 413 response code is received and
+                                   over_limit is in the response body
+        :raises ServerFault: If a 500 response code is received
+        :raises Unauthorized: If a 401 response code is received
+        :raises UnexpectedContentType: If the content-type of the response
+                                       isn't an expected type
+        :raises UnexpectedResponseCode: If a response code above 400 is
+                                        received and it doesn't fall into any
+                                        of the handled checks
+        :raises UnprocessableEntity: If a 422 response code is received and
+                                     couldn't be parsed
+        :returns: An availability zone profile object.
+        """
+        kwargs = {arg: value for arg, value in locals().items()
+                  if arg != 'self' and value is not Unset}
+        return self._create_object(**kwargs)
+
+    def show_availability_zone_profile(self, availability_zone_profile_id,
+                                       query_params=None,
+                                       return_object_only=True):
+        """Get the availability zone profile details.
+
+        :param availability_zone_profile_id: The availability zone profile ID
+                                             to query.
+        :param query_params: The optional query parameters to append to the
+                             request. Ex. fields=id&fields=name
+        :param return_object_only: If True, the response returns the object
+                                   inside the root tag. False returns the full
+                                   response from the API.
+        :raises AssertionError: if the expected_code isn't a valid http success
+                                response code
+        :raises BadRequest: If a 400 response code is received
+        :raises Conflict: If a 409 response code is received
+        :raises Forbidden: If a 403 response code is received
+        :raises Gone: If a 410 response code is received
+        :raises InvalidContentType: If a 415 response code is received
+        :raises InvalidHTTPResponseBody: The response body wasn't valid JSON
+        :raises InvalidHttpSuccessCode: if the read code isn't an expected
+                                        http success code
+        :raises NotFound: If a 404 response code is received
+        :raises NotImplemented: If a 501 response code is received
+        :raises OverLimit: If a 413 response code is received and over_limit is
+                           not in the response body
+        :raises RateLimitExceeded: If a 413 response code is received and
+                                   over_limit is in the response body
+        :raises ServerFault: If a 500 response code is received
+        :raises Unauthorized: If a 401 response code is received
+        :raises UnexpectedContentType: If the content-type of the response
+                                       isn't an expected type
+        :raises UnexpectedResponseCode: If a response code above 400 is
+                                        received and it doesn't fall into any
+                                        of the handled checks
+        :raises UnprocessableEntity: If a 422 response code is received and
+                                     couldn't be parsed
+        :returns: An availability zone profile object.
+        """
+        return self._show_object(obj_id=availability_zone_profile_id,
+                                 query_params=query_params,
+                                 return_object_only=return_object_only)
+
+    def list_availability_zone_profiles(self, query_params=None,
+                                        return_object_only=True):
+        """Get a list of availability zone profile objects.
+
+        :param query_params: The optional query parameters to append to the
+                             request. Ex. fields=id&fields=name
+        :param return_object_only: If True, the response returns the object
+                                   inside the root tag. False returns the full
+                                   response from the API.
+        :raises AssertionError: if the expected_code isn't a valid http success
+                                response code
+        :raises BadRequest: If a 400 response code is received
+        :raises Conflict: If a 409 response code is received
+        :raises Forbidden: If a 403 response code is received
+        :raises Gone: If a 410 response code is received
+        :raises InvalidContentType: If a 415 response code is received
+        :raises InvalidHTTPResponseBody: The response body wasn't valid JSON
+        :raises InvalidHttpSuccessCode: if the read code isn't an expected
+                                        http success code
+        :raises NotFound: If a 404 response code is received
+        :raises NotImplemented: If a 501 response code is received
+        :raises OverLimit: If a 413 response code is received and over_limit is
+                           not in the response body
+        :raises RateLimitExceeded: If a 413 response code is received and
+                                   over_limit is in the response body
+        :raises ServerFault: If a 500 response code is received
+        :raises Unauthorized: If a 401 response code is received
+        :raises UnexpectedContentType: If the content-type of the response
+                                       isn't an expected type
+        :raises UnexpectedResponseCode: If a response code above 400 is
+                                        received and it doesn't fall into any
+                                        of the handled checks
+        :raises UnprocessableEntity: If a 422 response code is received and
+                                     couldn't be parsed
+        :returns: A list of availability zone profile objects.
+        """
+        return self._list_objects(query_params=query_params,
+                                  return_object_only=return_object_only)
+
+    def update_availability_zone_profile(
+        self, availability_zone_profile_id, name=Unset, provider_name=Unset,
+        availability_zone_data=Unset, return_object_only=True):
+        """Update an availability zone profile.
+
+        :param availability_zone_profile_id: The availability zone profile ID
+                                             to update.
+        :param name: Human-readable name of the resource.
+        :param provider_name: The octavia provider name.
+        :param availability_zone_data: The JSON string containing the
+                                       availability zone metadata.
+        :raises AssertionError: if the expected_code isn't a valid http success
+                                response code
+        :raises BadRequest: If a 400 response code is received
+        :raises Conflict: If a 409 response code is received
+        :raises Forbidden: If a 403 response code is received
+        :raises Gone: If a 410 response code is received
+        :raises InvalidContentType: If a 415 response code is received
+        :raises InvalidHTTPResponseBody: The response body wasn't valid JSON
+        :raises InvalidHttpSuccessCode: if the read code isn't an expected
+                                        http success code
+        :raises NotFound: If a 404 response code is received
+        :raises NotImplemented: If a 501 response code is received
+        :raises OverLimit: If a 413 response code is received and over_limit is
+                           not in the response body
+        :raises RateLimitExceeded: If a 413 response code is received and
+                                   over_limit is in the response body
+        :raises ServerFault: If a 500 response code is received
+        :raises Unauthorized: If a 401 response code is received
+        :raises UnexpectedContentType: If the content-type of the response
+                                       isn't an expected type
+        :raises UnexpectedResponseCode: If a response code above 400 is
+                                        received and it doesn't fall into any
+                                        of the handled checks
+        :raises UnprocessableEntity: If a 422 response code is received and
+                                     couldn't be parsed
+        :returns: An availability zone profile object.
+        """
+        kwargs = {arg: value for arg, value in locals().items()
+                  if arg != 'self' and value is not Unset}
+        kwargs['obj_id'] = kwargs.pop('availability_zone_profile_id')
+        return self._update_object(**kwargs)
+
+    def delete_availability_zone_profile(self, availability_zone_profile_id,
+                                         ignore_errors=False):
+        """Delete an availability zone profile.
+
+        :param availability_zone_profile_id: The availability zone profile ID
+                                             to delete.
+        :param ignore_errors: True if errors should be ignored.
+        :raises AssertionError: if the expected_code isn't a valid http success
+                                response code
+        :raises BadRequest: If a 400 response code is received
+        :raises Conflict: If a 409 response code is received
+        :raises Forbidden: If a 403 response code is received
+        :raises Gone: If a 410 response code is received
+        :raises InvalidContentType: If a 415 response code is received
+        :raises InvalidHTTPResponseBody: The response body wasn't valid JSON
+        :raises InvalidHttpSuccessCode: if the read code isn't an expected
+                                        http success code
+        :raises NotFound: If a 404 response code is received
+        :raises NotImplemented: If a 501 response code is received
+        :raises OverLimit: If a 413 response code is received and over_limit is
+                           not in the response body
+        :raises RateLimitExceeded: If a 413 response code is received and
+                                   over_limit is in the response body
+        :raises ServerFault: If a 500 response code is received
+        :raises Unauthorized: If a 401 response code is received
+        :raises UnexpectedContentType: If the content-type of the response
+                                       isn't an expected type
+        :raises UnexpectedResponseCode: If a response code above 400 is
+                                        received and it doesn't fall into any
+                                        of the handled checks
+        :raises UnprocessableEntity: If a 422 response code is received and
+                                     couldn't be parsed
+        :returns: None if ignore_errors is True, the response status code
+                  if not.
+        """
+        return self._delete_obj(obj_id=availability_zone_profile_id,
+                                ignore_errors=ignore_errors)
+
+    def cleanup_availability_zone_profile(self, availability_zone_profile_id):
+        """Delete an availability zone profile for tempest cleanup.
+
+           We cannot use a cleanup that waits on provisioning_status as
+           availability zone profiles do not have a provisioning_status.
+
+        :param availability_zone_profile_id: The availability zone profile ID
+                                             to delete.
+        :raises AssertionError: if the expected_code isn't a valid http success
+                                response code
+        :raises BadRequest: If a 400 response code is received
+        :raises Conflict: If a 409 response code is received
+        :raises Forbidden: If a 403 response code is received
+        :raises Gone: If a 410 response code is received
+        :raises InvalidContentType: If a 415 response code is received
+        :raises InvalidHTTPResponseBody: The response body wasn't valid JSON
+        :raises InvalidHttpSuccessCode: if the read code isn't an expected
+                                        http success code
+        :raises NotImplemented: If a 501 response code is received
+        :raises OverLimit: If a 413 response code is received and over_limit is
+                           not in the response body
+        :raises RateLimitExceeded: If a 413 response code is received and
+                                   over_limit is in the response body
+        :raises ServerFault: If a 500 response code is received
+        :raises Unauthorized: If a 401 response code is received
+        :raises UnexpectedContentType: If the content-type of the response
+                                       isn't an expected type
+        :raises UnexpectedResponseCode: If a response code above 400 is
+                                        received and it doesn't fall into any
+                                        of the handled checks
+        :raises UnprocessableEntity: If a 422 response code is received and
+                                     couldn't be parsed
+        :returns: None. A NotFound error from the delete is logged and
+                  treated as a completed cleanup.
+        """
+        try:
+            self._delete_obj(obj_id=availability_zone_profile_id)
+        except exceptions.NotFound:
+            # Already gone, cleanup complete
+            LOG.info("Availability zone profile %s is already gone. "
+                     "Cleanup considered complete.",
+                     availability_zone_profile_id)
diff --git a/octavia_tempest_plugin/services/load_balancer/v2/listener_client.py b/octavia_tempest_plugin/services/load_balancer/v2/listener_client.py
index de84bd1..1cc17ff 100644
--- a/octavia_tempest_plugin/services/load_balancer/v2/listener_client.py
+++ b/octavia_tempest_plugin/services/load_balancer/v2/listener_client.py
@@ -35,7 +35,10 @@
                         timeout_member_data=Unset, timeout_tcp_inspect=Unset,
                         insert_headers=Unset, default_pool_id=Unset,
                         default_tls_container_ref=Unset,
-                        sni_container_refs=Unset, return_object_only=True):
+                        sni_container_refs=Unset, client_authentication=Unset,
+                        client_ca_tls_container_ref=Unset,
+                        client_crl_container_ref=Unset,
+                        return_object_only=True):
         """Create a listener.
 
         :param protocol: The protocol for the resource.
@@ -70,6 +73,17 @@
                                    secrets containing PKCS12 format
                                    certificate/key bundles for TERMINATED_TLS
                                    listeners.
+        :param client_authentication: The TLS client authentication mode. One
+                                      of the options NONE, OPTIONAL or
+                                      MANDATORY.
+        :param client_ca_tls_container_ref: The ref of the key manager service
+                                            secret containing a PEM format
+                                            client CA certificate bundle for
+                                            TERMINATED_HTTPS listeners.
+        :param client_crl_container_ref: The URI of the key manager service
+                                         secret containing a PEM format CA
+                                         revocation list file for
+                                         TERMINATED_HTTPS listeners.
         :param return_object_only: If True, the response returns the object
                                    inside the root tag. False returns the full
                                    response from the API.
@@ -190,7 +204,10 @@
                         timeout_member_data=Unset, timeout_tcp_inspect=Unset,
                         insert_headers=Unset, default_pool_id=Unset,
                         default_tls_container_ref=Unset,
-                        sni_container_refs=Unset, return_object_only=True):
+                        sni_container_refs=Unset, client_authentication=Unset,
+                        client_ca_tls_container_ref=Unset,
+                        client_crl_container_ref=Unset,
+                        return_object_only=True):
         """Update a listener.
 
         :param listener_id: The listener ID to update.
@@ -223,6 +240,17 @@
                                    secrets containing PKCS12 format
                                    certificate/key bundles for TERMINATED_TLS
                                    listeners.
+        :param client_authentication: The TLS client authentication mode. One
+                                      of the options NONE, OPTIONAL or
+                                      MANDATORY.
+        :param client_ca_tls_container_ref: The ref of the key manager service
+                                            secret containing a PEM format
+                                            client CA certificate bundle for
+                                            TERMINATED_HTTPS listeners.
+        :param client_crl_container_ref: The URI of the key manager service
+                                         secret containing a PEM format CA
+                                         revocation list file for
+                                         TERMINATED_HTTPS listeners.
         :param return_object_only: If True, the response returns the object
                                    inside the root tag. False returns the full
                                    response from the API.
diff --git a/octavia_tempest_plugin/tests/api/v2/test_availability_zone.py b/octavia_tempest_plugin/tests/api/v2/test_availability_zone.py
new file mode 100644
index 0000000..023d5f5
--- /dev/null
+++ b/octavia_tempest_plugin/tests/api/v2/test_availability_zone.py
@@ -0,0 +1,492 @@
+#    Copyright 2019 Rackspace US Inc.  All rights reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import copy
+from operator import itemgetter
+
+from oslo_serialization import jsonutils
+from oslo_utils import uuidutils
+from tempest import config
+from tempest.lib.common.utils import data_utils
+from tempest.lib import decorators
+from tempest.lib import exceptions
+
+from octavia_tempest_plugin.common import constants as const
+from octavia_tempest_plugin.tests import test_base
+
+CONF = config.CONF
+
+
+class AvailabilityZoneAPITest(test_base.LoadBalancerBaseTest):
+    """Test the availability zone object API."""
+
+    @classmethod
+    def resource_setup(cls):
+        """Setup resources needed by the tests."""
+        super(AvailabilityZoneAPITest, cls).resource_setup()
+
+        # We have to do this here as the api_version and clients are not
+        # setup in time to use a decorator or the skip_checks mixin
+        if not (cls.lb_admin_availability_zone_profile_client
+                .is_version_supported(cls.api_version, '2.14')):
+            return
+
+        # Create a shared availability zone profile
+        availability_zone_profile_name = data_utils.rand_name(
+            "lb_admin_availabilityzoneprofile-setup")
+        availability_zone_data = {
+            const.COMPUTE_ZONE: 'my_compute_zone',
+            const.MANAGEMENT_NETWORK: uuidutils.generate_uuid(),
+        }
+        availability_zone_data_json = jsonutils.dumps(availability_zone_data)
+
+        availability_zone_profile_kwargs = {
+            const.NAME: availability_zone_profile_name,
+            const.PROVIDER_NAME: CONF.load_balancer.provider,
+            const.AVAILABILITY_ZONE_DATA: availability_zone_data_json
+        }
+
+        cls.availability_zone_profile = (
+            cls.lb_admin_availability_zone_profile_client
+            .create_availability_zone_profile(
+                **availability_zone_profile_kwargs))
+        cls.addClassResourceCleanup(
+            cls.lb_admin_availability_zone_profile_client
+            .cleanup_availability_zone_profile,
+            cls.availability_zone_profile[const.ID])
+        cls.availability_zone_profile_id = (
+            cls.availability_zone_profile[const.ID])
+
+    @decorators.idempotent_id('3899ef15-37c3-48a3-807f-8bb10bd295f0')
+    def test_availability_zone_create(self):
+        """Tests availability zone create and basic show APIs.
+
+        * Tests that users without the loadbalancer admin role cannot
+          create an availability zone.
+        * Create a fully populated availability zone.
+        * Validate the response reflects the requested values.
+        """
+        # We have to do this here as the api_version and clients are not
+        # setup in time to use a decorator or the skip_checks mixin
+        if not self.lb_admin_availability_zone_client.is_version_supported(
+                self.api_version, '2.14'):
+            raise self.skipException('Availability zones are only available '
+                                     'on Octavia API version 2.14 or newer.')
+        availability_zone_name = data_utils.rand_name(
+            "lb_admin_availability_zone-create")
+        availability_zone_description = data_utils.arbitrary_string(size=255)
+
+        availability_zone_kwargs = {
+            const.NAME: availability_zone_name,
+            const.DESCRIPTION: availability_zone_description,
+            const.ENABLED: True,
+            const.AVAILABILITY_ZONE_PROFILE_ID:
+                self.availability_zone_profile_id}
+
+        # Test that a user without the load balancer admin role cannot
+        # create an availability zone
+        if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
+            self.assertRaises(exceptions.Forbidden,
+                              self.os_primary.availability_zone_client
+                              .create_availability_zone,
+                              **availability_zone_kwargs)
+
+        # Happy path
+        availability_zone = (
+            self.lb_admin_availability_zone_client
+                .create_availability_zone(**availability_zone_kwargs))
+        self.addCleanup(
+            self.lb_admin_availability_zone_client
+                .cleanup_an_availability_zone,
+            availability_zone[const.NAME])
+
+        self.assertEqual(availability_zone_name, availability_zone[const.NAME])
+        self.assertEqual(availability_zone_description,
+                         availability_zone[const.DESCRIPTION])
+        self.assertTrue(availability_zone[const.ENABLED])
+        self.assertEqual(self.availability_zone_profile_id,
+                         availability_zone[const.AVAILABILITY_ZONE_PROFILE_ID])
+
+    @decorators.idempotent_id('bba84c0c-2832-4c4c-90ff-d28acfe4ae36')
+    def test_availability_zone_list(self):
+        """Tests availability zone list API and field filtering.
+
+        * Create three availability zones.
+        * Validates that non-admin accounts cannot list the availability zones.
+        * List the availability zones using the default sort order.
+        * List the availability zones using descending sort order.
+        * List the availability zones using ascending sort order.
+        * List the availability zones returning one field at a time.
+        * List the availability zones returning two fields.
+        * List the availability zones filtering to one of the three.
+        * List the availability zones filtered, one field, and sorted.
+        """
+        # We have to do this here as the api_version and clients are not
+        # setup in time to use a decorator or the skip_checks mixin
+        if not self.lb_admin_availability_zone_client.is_version_supported(
+                self.api_version, '2.14'):
+            raise self.skipException('Availability zones are only available '
+                                     'on Octavia API version 2.14 or newer.')
+
+        # Create availability zone 1
+        az1_name = data_utils.rand_name("lb_admin_availability_zone-list-1")
+        az1_description = 'A'
+
+        az1_kwargs = {
+            const.NAME: az1_name,
+            const.DESCRIPTION: az1_description,
+            const.ENABLED: True,
+            const.AVAILABILITY_ZONE_PROFILE_ID:
+                self.availability_zone_profile_id}
+
+        az1 = (self.lb_admin_availability_zone_client
+               .create_availability_zone(**az1_kwargs))
+        self.addCleanup(
+            self.lb_admin_availability_zone_client
+                .cleanup_an_availability_zone,
+            az1[const.NAME])
+
+        # Create availability zone 2
+        az2_name = data_utils.rand_name("lb_admin_availability_zone-list-2")
+        az2_description = 'B'
+
+        az2_kwargs = {
+            const.NAME: az2_name,
+            const.DESCRIPTION: az2_description,
+            const.ENABLED: False,
+            const.AVAILABILITY_ZONE_PROFILE_ID:
+                self.availability_zone_profile_id}
+
+        az2 = (self.lb_admin_availability_zone_client
+               .create_availability_zone(**az2_kwargs))
+        self.addCleanup(
+            self.lb_admin_availability_zone_client
+                .cleanup_an_availability_zone,
+            az2[const.NAME])
+
+        # Create availability zone 3
+        az3_name = data_utils.rand_name("lb_admin_availability_zone-list-3")
+        az3_description = 'C'
+
+        az3_kwargs = {
+            const.NAME: az3_name,
+            const.DESCRIPTION: az3_description,
+            const.ENABLED: True,
+            const.AVAILABILITY_ZONE_PROFILE_ID:
+                self.availability_zone_profile_id}
+
+        az3 = (self.lb_admin_availability_zone_client
+               .create_availability_zone(**az3_kwargs))
+        self.addCleanup(
+            self.lb_admin_availability_zone_client
+                .cleanup_an_availability_zone,
+            az3[const.NAME])
+
+        # default sort order (by Name) reference list
+        ref_id_list_asc = [az1[const.NAME], az2[const.NAME],
+                           az3[const.NAME]]
+        ref_id_list_dsc = copy.deepcopy(ref_id_list_asc)
+        ref_id_list_asc.sort()
+        ref_id_list_dsc.sort(reverse=True)
+
+        # Test that a user without the load balancer role cannot
+        # list availability zones.
+        if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
+            self.assertRaises(
+                exceptions.Forbidden,
+                self.os_primary.availability_zone_client
+                    .list_availability_zones)
+
+        # Check the default sort order (by name)
+        availability_zones = (
+            self.mem_availability_zone_client.list_availability_zones())
+        # Remove availability zones not used in this test
+        availability_zones = [
+            az for az in availability_zones
+            if 'lb_admin_availability_zone-list' in az[const.NAME]]
+        self.assertEqual(3, len(availability_zones))
+        self.assertEqual(ref_id_list_asc[0], availability_zones[0][const.NAME])
+        self.assertEqual(ref_id_list_asc[1], availability_zones[1][const.NAME])
+        self.assertEqual(ref_id_list_asc[2], availability_zones[2][const.NAME])
+
+        # Check the descending sort order by name
+        availability_zones = (
+            self.lb_admin_availability_zone_client.list_availability_zones(
+                query_params='{sort}={name}:{order}'.format(
+                    sort=const.SORT, name=const.NAME, order=const.DESC)))
+        # Remove availability zones not used in this test
+        availability_zones = [
+            az for az in availability_zones
+            if 'lb_admin_availability_zone-list' in az[const.NAME]]
+        self.assertEqual(3, len(availability_zones))
+        self.assertEqual(az3_name, availability_zones[0][const.NAME])
+        self.assertEqual(az2_name, availability_zones[1][const.NAME])
+        self.assertEqual(az1_name, availability_zones[2][const.NAME])
+
+        # Check the ascending sort order by name
+        availability_zones = (
+            self.mem_availability_zone_client.list_availability_zones(
+                query_params='{sort}={name}:{order}'.format(
+                    sort=const.SORT, name=const.NAME, order=const.ASC)))
+        # Remove availability zones not used in this test
+        availability_zones = [
+            az for az in availability_zones
+            if 'lb_admin_availability_zone-list' in az[const.NAME]]
+        self.assertEqual(3, len(availability_zones))
+        self.assertEqual(az1_name, availability_zones[0][const.NAME])
+        self.assertEqual(az2_name, availability_zones[1][const.NAME])
+        self.assertEqual(az3_name, availability_zones[2][const.NAME])
+
+        ref_availability_zones = [az1, az2, az3]
+        sorted_availability_zones = sorted(ref_availability_zones,
+                                           key=itemgetter(const.NAME))
+        sorted_enabled_availability_zones = [
+            az for az in sorted_availability_zones
+            if az[const.ENABLED]]
+
+        # Test fields
+        for field in const.SHOW_AVAILABILITY_ZONE_FIELDS:
+            availability_zones = (
+                self.mem_availability_zone_client
+                    .list_availability_zones(
+                        query_params='{fields}={field}&{fields}={name}'.format(
+                            fields=const.FIELDS, field=field, name=const.NAME))
+            )
+            # Remove availability zones not used in this test
+            availability_zones = [
+                az for az in availability_zones
+                if 'lb_admin_availability_zone-list' in az[const.NAME]]
+            self.assertEqual(3, len(availability_zones))
+            self.assertEqual(sorted_availability_zones[0][field],
+                             availability_zones[0][field])
+            self.assertEqual(sorted_availability_zones[1][field],
+                             availability_zones[1][field])
+            self.assertEqual(sorted_availability_zones[2][field],
+                             availability_zones[2][field])
+
+        # Test filtering
+        availability_zone = (
+            self.mem_availability_zone_client.list_availability_zones(
+                query_params='{name}={az_name}'.format(
+                    name=const.NAME, az_name=az2[const.NAME])))
+        self.assertEqual(1, len(availability_zone))
+        self.assertEqual(az2[const.NAME], availability_zone[0][const.NAME])
+
+        # Test combined params
+        availability_zones = (
+            self.mem_availability_zone_client.list_availability_zones(
+                query_params='{enabled}={enable}&{fields}={name}&'
+                             '{sort}={ID}:{desc}'.format(
+                                 enabled=const.ENABLED,
+                                 enable=True,
+                                 fields=const.FIELDS, name=const.NAME,
+                                 sort=const.SORT, ID=const.NAME,
+                                 desc=const.DESC)))
+        # Remove availability zones not used in this test
+        availability_zones = [
+            az for az in availability_zones
+            if 'lb_admin_availability_zone-list' in az[const.NAME]]
+        self.assertEqual(2, len(availability_zones))
+        self.assertEqual(1, len(availability_zones[0]))
+        self.assertEqual(sorted_enabled_availability_zones[1][const.NAME],
+                         availability_zones[0][const.NAME])
+        self.assertEqual(sorted_enabled_availability_zones[0][const.NAME],
+                         availability_zones[1][const.NAME])
+
+    @decorators.idempotent_id('4fa77f96-ba75-4255-bef8-6710cd7cb762')
+    def test_availability_zone_show(self):
+        """Tests availability zone show API.
+
+        * Create a fully populated availability zone.
+        * Validate that non-lb-admin accounts cannot see the availability zone.
+        * Show availability zone details.
+        * Validate the show reflects the requested values.
+        """
+        # We have to do this here as the api_version and clients are not
+        # setup in time to use a decorator or the skip_checks mixin
+        if not self.lb_admin_availability_zone_client.is_version_supported(
+                self.api_version, '2.14'):
+            raise self.skipException('Availability zones are only available '
+                                     'on Octavia API version 2.14 or newer.')
+        availability_zone_name = data_utils.rand_name(
+            "lb_admin_availability_zone-show")
+        availability_zone_description = data_utils.arbitrary_string(size=255)
+
+        availability_zone_kwargs = {
+            const.NAME: availability_zone_name,
+            const.DESCRIPTION: availability_zone_description,
+            const.ENABLED: True,
+            const.AVAILABILITY_ZONE_PROFILE_ID:
+                self.availability_zone_profile_id}
+
+        # Happy path
+        availability_zone = (
+            self.lb_admin_availability_zone_client
+                .create_availability_zone(**availability_zone_kwargs))
+        self.addCleanup(
+            self.lb_admin_availability_zone_client
+                .cleanup_an_availability_zone,
+            availability_zone[const.NAME])
+
+        # Test that a user without the load balancer role cannot
+        # show availability zone details.
+        if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
+            self.assertRaises(
+                exceptions.Forbidden,
+                self.os_primary.availability_zone_client
+                    .show_availability_zone,
+                availability_zone[const.NAME])
+
+        result = self.mem_availability_zone_client.show_availability_zone(
+            availability_zone[const.NAME])
+
+        self.assertEqual(availability_zone_name, result[const.NAME])
+        self.assertEqual(availability_zone_description,
+                         result[const.DESCRIPTION])
+        self.assertTrue(result[const.ENABLED])
+        self.assertEqual(self.availability_zone_profile_id,
+                         result[const.AVAILABILITY_ZONE_PROFILE_ID])
+
+    @decorators.idempotent_id('9c466b9f-b70a-456d-9172-eb79b7820c7f')
+    def test_availability_zone_update(self):
+        """Tests availability zone update API.
+
+        * Create a fully populated availability zone.
+        * Show availability zone details.
+        * Validate the show reflects the initial values.
+        * Validate that non-admin accounts cannot update the availability zone.
+        * Update the availability zone details.
+        * Show availability zone details.
+        * Validate the show reflects the updated values.
+        """
+        # We have to do this here as the api_version and clients are not
+        # setup in time to use a decorator or the skip_checks mixin
+        if not self.lb_admin_availability_zone_client.is_version_supported(
+                self.api_version, '2.14'):
+            raise self.skipException('Availability zones are only available '
+                                     'on Octavia API version 2.14 or newer.')
+        availability_zone_name = data_utils.rand_name(
+            "lb_admin_availability_zone-update")
+        availability_zone_description = data_utils.arbitrary_string(size=255)
+
+        availability_zone_kwargs = {
+            const.NAME: availability_zone_name,
+            const.DESCRIPTION: availability_zone_description,
+            const.ENABLED: True,
+            const.AVAILABILITY_ZONE_PROFILE_ID:
+                self.availability_zone_profile_id}
+
+        # Happy path
+        availability_zone = (
+            self.lb_admin_availability_zone_client
+                .create_availability_zone(**availability_zone_kwargs))
+        self.addCleanup(
+            self.lb_admin_availability_zone_client
+                .cleanup_an_availability_zone,
+            availability_zone[const.NAME])
+
+        availability_zone_description2 = data_utils.arbitrary_string(size=255)
+        availability_zone_updated_kwargs = {
+            const.DESCRIPTION: availability_zone_description2,
+            const.ENABLED: False}
+
+        # Test that a user without the load balancer admin role cannot
+        # update availability zone details.
+        if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
+            self.assertRaises(
+                exceptions.Forbidden,
+                self.os_primary.availability_zone_client
+                    .update_availability_zone,
+                availability_zone[const.NAME],
+                **availability_zone_updated_kwargs)
+
+        updated_availability_zone = (
+            self.lb_admin_availability_zone_client.update_availability_zone(
+                availability_zone[const.NAME],
+                **availability_zone_updated_kwargs))
+
+        self.assertEqual(
+            availability_zone[const.NAME],
+            updated_availability_zone[const.NAME])
+        self.assertEqual(
+            availability_zone_description2,
+            updated_availability_zone[const.DESCRIPTION])
+        self.assertEqual(
+            availability_zone[const.AVAILABILITY_ZONE_PROFILE_ID],
+            updated_availability_zone[const.AVAILABILITY_ZONE_PROFILE_ID])
+        self.assertFalse(updated_availability_zone[const.ENABLED])
+
+        result = (
+            self.mem_availability_zone_client
+                .show_availability_zone(availability_zone[const.NAME]))
+
+        self.assertEqual(availability_zone[const.NAME], result[const.NAME])
+        self.assertEqual(availability_zone_description2,
+                         result[const.DESCRIPTION])
+        self.assertEqual(availability_zone[const.AVAILABILITY_ZONE_PROFILE_ID],
+                         result[const.AVAILABILITY_ZONE_PROFILE_ID])
+        self.assertFalse(result[const.ENABLED])
+
+    @decorators.idempotent_id('11585b33-2689-4693-be3b-26b210bb7fc5')
+    def test_availability_zone_delete(self):
+        """Tests availability zone create and delete APIs.
+
+        * Creates an availability zone.
+        * Validates that other accounts cannot delete the availability zone.
+        * Deletes the availability zone.
+        * Validates the availability zone no longer exists.
+        """
+        # We have to do this here as the api_version and clients are not
+        # setup in time to use a decorator or the skip_checks mixin
+        if not self.lb_admin_availability_zone_client.is_version_supported(
+                self.api_version, '2.14'):
+            raise self.skipException('Availability zones are only available '
+                                     'on Octavia API version 2.14 or newer.')
+        availability_zone_name = data_utils.rand_name(
+            "lb_admin_availability_zone-delete")
+        availability_zone_description = data_utils.arbitrary_string(size=255)
+
+        availability_zone_kwargs = {
+            const.NAME: availability_zone_name,
+            const.DESCRIPTION: availability_zone_description,
+            const.ENABLED: True,
+            const.AVAILABILITY_ZONE_PROFILE_ID:
+                self.availability_zone_profile_id}
+
+        # Happy path
+        availability_zone = (
+            self.lb_admin_availability_zone_client
+                .create_availability_zone(**availability_zone_kwargs))
+        self.addCleanup(
+            self.lb_admin_availability_zone_client
+                .cleanup_an_availability_zone,
+            availability_zone[const.NAME])
+
+        # Test that a user without the load balancer admin role cannot
+        # delete an availability zone.
+        if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
+            self.assertRaises(
+                exceptions.Forbidden,
+                self.os_primary.availability_zone_client
+                    .delete_availability_zone,
+                availability_zone[const.NAME])
+
+        # Happy path
+        self.lb_admin_availability_zone_client.delete_availability_zone(
+            availability_zone[const.NAME])
+
+        self.assertRaises(
+            exceptions.NotFound,
+            self.lb_admin_availability_zone_client.show_availability_zone,
+            availability_zone[const.NAME])
diff --git a/octavia_tempest_plugin/tests/api/v2/test_availability_zone_capabilities.py b/octavia_tempest_plugin/tests/api/v2/test_availability_zone_capabilities.py
new file mode 100644
index 0000000..4a12057
--- /dev/null
+++ b/octavia_tempest_plugin/tests/api/v2/test_availability_zone_capabilities.py
@@ -0,0 +1,93 @@
+#    Copyright 2019 Rackspace US Inc.  All rights reserved.
+#    Copyright 2019 Verizon Media
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+from tempest import config
+from tempest.lib import decorators
+from tempest.lib import exceptions
+
+from octavia_tempest_plugin.common import constants as const
+from octavia_tempest_plugin.tests import test_base
+
+CONF = config.CONF
+
+
+class AvailabilityZoneCapabilitiesAPITest(test_base.LoadBalancerBaseTest):
+    """Test the provider availability zone capabilities API."""
+
+    @decorators.idempotent_id('cb3e4c59-4114-420b-9837-2666d4d5fef4')
+    def test_availability_zone_capabilities_list(self):
+        """Tests provider availability zone capabilities list API/filtering.
+
+        * Validates that non-lb admin accounts cannot list the capabilities.
+        * List the availability zone capabilities.
+        * Validate that the "loadbalancer_topology" capability is present.
+        * List the capabilities returning one field at a time.
+        """
+        # We have to do this here as the api_version and clients are not
+        # setup in time to use a decorator or the skip_checks mixin
+        if not self.mem_provider_client.is_version_supported(
+                self.api_version, '2.14'):
+            raise self.skipException(
+                'Availability zone capabilities are only available '
+                'on Octavia API version 2.14 or newer.')
+
+        # Test that a user without the load balancer admin role cannot
+        # list provider availability zone capabilities.
+        if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
+            os_primary_capabilities_client = (
+                self.os_primary.availability_zone_capabilities_client)
+            self.assertRaises(
+                exceptions.Forbidden,
+                (os_primary_capabilities_client
+                 .list_availability_zone_capabilities),
+                CONF.load_balancer.provider)
+
+        # Check for an expected availability zone capability for the
+        # configured provider
+        admin_capabilities_client = (
+            self.lb_admin_availability_zone_capabilities_client)
+        capabilities = (
+            admin_capabilities_client.list_availability_zone_capabilities(
+                CONF.load_balancer.provider))
+
+        expected_name = list(
+            CONF.load_balancer.expected_availability_zone_capability)[0]
+        expected_description = (
+            CONF.load_balancer.expected_availability_zone_capability[
+                expected_name])
+        for capability in capabilities:
+            if capability[const.NAME] == expected_name:
+                self.assertEqual(expected_description,
+                                 capability[const.DESCRIPTION])
+
+        # Test fields
+        capabilities = (
+            admin_capabilities_client.list_availability_zone_capabilities(
+                CONF.load_balancer.provider,
+                query_params='{fields}={field}&{field}={exp_name}'.format(
+                    fields=const.FIELDS, field=const.NAME,
+                    exp_name=expected_name)))
+        self.assertEqual(1, len(capabilities[0]))
+        self.assertEqual(expected_name, capabilities[0][const.NAME])
+
+        capabilities = (
+            admin_capabilities_client.list_availability_zone_capabilities(
+                CONF.load_balancer.provider,
+                query_params='{fields}={field}&{name}={exp_name}'.format(
+                    fields=const.FIELDS, field=const.DESCRIPTION,
+                    name=const.NAME, exp_name=expected_name)))
+        self.assertEqual(1, len(capabilities[0]))
+        self.assertEqual(expected_description,
+                         capabilities[0][const.DESCRIPTION])
diff --git a/octavia_tempest_plugin/tests/api/v2/test_availability_zone_profile.py b/octavia_tempest_plugin/tests/api/v2/test_availability_zone_profile.py
new file mode 100644
index 0000000..86ae066
--- /dev/null
+++ b/octavia_tempest_plugin/tests/api/v2/test_availability_zone_profile.py
@@ -0,0 +1,536 @@
+#    Copyright 2019 Rackspace US Inc.  All rights reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import copy
+from operator import itemgetter
+from uuid import UUID
+
+from oslo_serialization import jsonutils
+from oslo_utils import uuidutils
+from tempest import config
+from tempest.lib.common.utils import data_utils
+from tempest.lib import decorators
+from tempest.lib import exceptions
+
+from octavia_tempest_plugin.common import constants as const
+from octavia_tempest_plugin.tests import test_base
+
+CONF = config.CONF
+
+
+class AvailabilityZoneProfileAPITest(test_base.LoadBalancerBaseTest):
+    """Test the availability zone profile object API."""
+
+    @decorators.idempotent_id('e512b580-ef32-44c3-bbd2-efdc27ba2ea6')
+    def test_availability_zone_profile_create(self):
+        """Tests availability zone profile create and basic show APIs.
+
+        * Tests that users without the loadbalancer admin role cannot
+          create availability zone profiles.
+        * Create a fully populated availability zone profile.
+        * Validate the response reflects the requested values.
+        """
+        # We have to do this here as the api_version and clients are not
+        # setup in time to use a decorator or the skip_checks mixin
+        if not (
+                self.lb_admin_availability_zone_profile_client
+                    .is_version_supported(self.api_version, '2.14')):
+            raise self.skipException(
+                'Availability zone profiles are only available on '
+                'Octavia API version 2.14 or newer.')
+
+        availability_zone_profile_name = data_utils.rand_name(
+            "lb_admin_availabilityzoneprofile1-create")
+        availability_zone_data = {
+            const.COMPUTE_ZONE: 'my_compute_zone',
+            const.MANAGEMENT_NETWORK: uuidutils.generate_uuid(),
+        }
+        availability_zone_data_json = jsonutils.dumps(availability_zone_data)
+
+        availability_zone_profile_kwargs = {
+            const.NAME: availability_zone_profile_name,
+            const.PROVIDER_NAME: CONF.load_balancer.provider,
+            const.AVAILABILITY_ZONE_DATA: availability_zone_data_json
+        }
+
+        # Test that a user without the load balancer admin role cannot
+        # create an availability zone profile
+        if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
+            self.assertRaises(
+                exceptions.Forbidden,
+                self.os_primary.availability_zone_profile_client
+                    .create_availability_zone_profile,
+                **availability_zone_profile_kwargs)
+
+        # Happy path
+        availability_zone_profile = (
+            self.lb_admin_availability_zone_profile_client
+                .create_availability_zone_profile(
+                    **availability_zone_profile_kwargs))
+        self.addCleanup(
+            self.lb_admin_availability_zone_profile_client
+                .cleanup_availability_zone_profile,
+            availability_zone_profile[const.ID])
+
+        UUID(availability_zone_profile[const.ID])
+        self.assertEqual(
+            availability_zone_profile_name,
+            availability_zone_profile[const.NAME])
+        self.assertEqual(
+            CONF.load_balancer.provider,
+            availability_zone_profile[const.PROVIDER_NAME])
+        self.assertEqual(
+            availability_zone_data_json,
+            availability_zone_profile[const.AVAILABILITY_ZONE_DATA])
+
+    @decorators.idempotent_id('ef7d1c45-e312-46ce-8dcb-f2fe26295658')
+    def test_availability_zone_profile_list(self):
+        """Tests availability zone profile list API and field filtering.
+
+        * Create three availability zone profiles.
+        * Validates that non-admin accounts cannot list the availability zone
+          profiles.
+        * List the availability zone profiles using the default sort order.
+        * List the availability zone profiles using descending sort order.
+        * List the availability zone profiles using ascending sort order.
+        * List the availability zone profiles returning one field at a time.
+        * List the availability zone profiles returning two fields.
+        * List the availability zone profiles filtering to one of the three.
+        * List the availability zone profiles filtered, one field, and sorted.
+        """
+        # We have to do this here as the api_version and clients are not
+        # setup in time to use a decorator or the skip_checks mixin
+        if not (self.lb_admin_availability_zone_profile_client
+                .is_version_supported(self.api_version, '2.14')):
+            raise self.skipException(
+                'Availability zone profiles are only available on '
+                'Octavia API version 2.14 or newer.')
+
+        # Create availability zone profile 1
+        availability_zone_profile1_name = data_utils.rand_name(
+            "lb_admin_availabilityzoneprofile-list-1")
+        availability_zone_data1 = {
+            const.COMPUTE_ZONE: 'my_compute_zone1',
+            const.MANAGEMENT_NETWORK: uuidutils.generate_uuid(),
+        }
+        availability_zone_data1_json = jsonutils.dumps(availability_zone_data1)
+
+        availability_zone_profile1_kwargs = {
+            const.NAME: availability_zone_profile1_name,
+            const.PROVIDER_NAME: CONF.load_balancer.provider,
+            const.AVAILABILITY_ZONE_DATA: availability_zone_data1_json
+        }
+        availability_zone_profile1 = (
+            self.lb_admin_availability_zone_profile_client
+                .create_availability_zone_profile(
+                    **availability_zone_profile1_kwargs))
+        self.addCleanup(
+            self.lb_admin_availability_zone_profile_client
+                .cleanup_availability_zone_profile,
+            availability_zone_profile1[const.ID])
+
+        # Create availability zone profile 2
+        availability_zone_profile2_name = data_utils.rand_name(
+            "lb_admin_availabilityzoneprofile-list-2")
+        availability_zone_data2 = {
+            const.COMPUTE_ZONE: 'my_compute_zone2',
+            const.MANAGEMENT_NETWORK: uuidutils.generate_uuid(),
+        }
+        availability_zone_data2_json = jsonutils.dumps(availability_zone_data2)
+
+        availability_zone_profile2_kwargs = {
+            const.NAME: availability_zone_profile2_name,
+            const.PROVIDER_NAME: CONF.load_balancer.provider,
+            const.AVAILABILITY_ZONE_DATA: availability_zone_data2_json
+        }
+        availability_zone_profile2 = (
+            self.lb_admin_availability_zone_profile_client
+                .create_availability_zone_profile(
+                    **availability_zone_profile2_kwargs))
+        self.addCleanup(
+            self.lb_admin_availability_zone_profile_client
+                .cleanup_availability_zone_profile,
+            availability_zone_profile2[const.ID])
+
+        # Create availability zone profile 3
+        availability_zone_profile3_name = data_utils.rand_name(
+            "lb_admin_availabilityzoneprofile-list-3")
+        availability_zone_data3 = {
+            const.COMPUTE_ZONE: 'my_compute_zone3',
+            const.MANAGEMENT_NETWORK: uuidutils.generate_uuid(),
+        }
+        availability_zone_data3_json = jsonutils.dumps(availability_zone_data3)
+
+        availability_zone_profile3_kwargs = {
+            const.NAME: availability_zone_profile3_name,
+            const.PROVIDER_NAME: CONF.load_balancer.provider,
+            const.AVAILABILITY_ZONE_DATA: availability_zone_data3_json
+        }
+        availability_zone_profile3 = (
+            self.lb_admin_availability_zone_profile_client
+                .create_availability_zone_profile(
+                    **availability_zone_profile3_kwargs))
+        self.addCleanup(
+            self.lb_admin_availability_zone_profile_client
+                .cleanup_availability_zone_profile,
+            availability_zone_profile3[const.ID])
+
+        # default sort order (by ID) reference list
+        ref_id_list_asc = [availability_zone_profile1[const.ID],
+                           availability_zone_profile2[const.ID],
+                           availability_zone_profile3[const.ID]]
+        ref_id_list_dsc = copy.deepcopy(ref_id_list_asc)
+        ref_id_list_asc.sort()
+        ref_id_list_dsc.sort(reverse=True)
+
+        # Test that a user without the load balancer admin role cannot
+        # list availability zone profiles.
+        if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
+            self.assertRaises(
+                exceptions.Forbidden,
+                self.os_primary.availability_zone_profile_client
+                    .list_availability_zone_profiles)
+
+        # Check the default sort order (by ID)
+        profiles = (self.lb_admin_availability_zone_profile_client
+                    .list_availability_zone_profiles())
+        # Remove availability zone profiles not used in this test
+        profiles = [
+            prof for prof in profiles
+            if 'lb_admin_availabilityzoneprofile-list' in prof[const.NAME]]
+        self.assertEqual(3, len(profiles))
+        self.assertEqual(ref_id_list_asc[0], profiles[0][const.ID])
+        self.assertEqual(ref_id_list_asc[1], profiles[1][const.ID])
+        self.assertEqual(ref_id_list_asc[2], profiles[2][const.ID])
+
+        # Check the descending sort order by name
+        profiles = (
+            self.lb_admin_availability_zone_profile_client
+            .list_availability_zone_profiles(
+                query_params='{sort}={name}:{order}'.format(
+                    sort=const.SORT, name=const.NAME, order=const.DESC)))
+        # Remove availability zone profiles not used in this test
+        profiles = [
+            prof for prof in profiles
+            if 'lb_admin_availabilityzoneprofile-list' in prof[const.NAME]]
+        self.assertEqual(3, len(profiles))
+        self.assertEqual(availability_zone_profile3_name,
+                         profiles[0][const.NAME])
+        self.assertEqual(availability_zone_profile2_name,
+                         profiles[1][const.NAME])
+        self.assertEqual(availability_zone_profile1_name,
+                         profiles[2][const.NAME])
+
+        # Check the ascending sort order by name
+        profiles = (
+            self.lb_admin_availability_zone_profile_client
+            .list_availability_zone_profiles(
+                query_params='{sort}={name}:{order}'.format(
+                    sort=const.SORT, name=const.NAME, order=const.ASC)))
+        # Remove availability zone profiles not used in this test
+        profiles = [
+            prof for prof in profiles
+            if 'lb_admin_availabilityzoneprofile-list' in prof[const.NAME]]
+        self.assertEqual(3, len(profiles))
+        self.assertEqual(availability_zone_profile1_name,
+                         profiles[0][const.NAME])
+        self.assertEqual(availability_zone_profile2_name,
+                         profiles[1][const.NAME])
+        self.assertEqual(availability_zone_profile3_name,
+                         profiles[2][const.NAME])
+
+        ref_profiles = [availability_zone_profile1, availability_zone_profile2,
+                        availability_zone_profile3]
+        sorted_profiles = sorted(ref_profiles, key=itemgetter(const.ID))
+
+        # Test fields
+        availability_zone_profile_client = (
+            self.lb_admin_availability_zone_profile_client)
+        for field in const.SHOW_AVAILABILITY_ZONE_PROFILE_FIELDS:
+            profiles = (
+                availability_zone_profile_client
+                .list_availability_zone_profiles(
+                    query_params='{fields}={field}&{fields}={name}'.format(
+                        fields=const.FIELDS, field=field, name=const.NAME)))
+            # Remove availability zone profiles not used in this test
+            profiles = [
+                prof for prof in profiles
+                if 'lb_admin_availabilityzoneprofile-list' in prof[const.NAME]]
+
+            self.assertEqual(3, len(profiles))
+            self.assertEqual(sorted_profiles[0][field], profiles[0][field])
+            self.assertEqual(sorted_profiles[1][field], profiles[1][field])
+            self.assertEqual(sorted_profiles[2][field], profiles[2][field])
+
+        # Test filtering
+        profile = (
+            self.lb_admin_availability_zone_profile_client
+            .list_availability_zone_profiles(
+                query_params='{name}={prof_name}'.format(
+                    name=const.NAME,
+                    prof_name=availability_zone_profile2[const.NAME])))
+        self.assertEqual(1, len(profile))
+        self.assertEqual(availability_zone_profile2[const.ID],
+                         profile[0][const.ID])
+
+        # Test combined params
+        profiles = (
+            self.lb_admin_availability_zone_profile_client
+            .list_availability_zone_profiles(
+                query_params='{provider_name}={provider}&{fields}={name}&'
+                             '{sort}={ID}:{desc}'.format(
+                                 provider_name=const.PROVIDER_NAME,
+                                 provider=CONF.load_balancer.provider,
+                                 fields=const.FIELDS, name=const.NAME,
+                                 sort=const.SORT, ID=const.ID,
+                                 desc=const.DESC)))
+        # Remove availability zone profiles not used in this test
+        profiles = [
+            prof for prof in profiles
+            if 'lb_admin_availabilityzoneprofile-list' in prof[const.NAME]]
+        self.assertEqual(3, len(profiles))
+        self.assertEqual(1, len(profiles[0]))
+        self.assertEqual(sorted_profiles[2][const.NAME],
+                         profiles[0][const.NAME])
+        self.assertEqual(sorted_profiles[1][const.NAME],
+                         profiles[1][const.NAME])
+        self.assertEqual(sorted_profiles[0][const.NAME],
+                         profiles[2][const.NAME])
+
+    @decorators.idempotent_id('379d92dc-7f6d-4674-ae6f-b3aa2120c677')
+    def test_availability_zone_profile_show(self):
+        """Tests availability zone profile show API.
+
+        * Create a fully populated availability zone profile.
+        * Validates that non-lb-admin accounts cannot see the availability zone
+          profile.
+        * Show availability zone profile details.
+        * Validate the show reflects the requested values.
+        """
+        # We have to do this here as the api_version and clients are not
+        # set up in time to use a decorator or the skip_checks mixin
+        if not (self.lb_admin_availability_zone_profile_client
+                .is_version_supported(self.api_version, '2.14')):
+            raise self.skipException(
+                'Availability zone profiles are only available on '
+                'Octavia API version 2.14 or newer.')
+
+        availability_zone_profile_name = data_utils.rand_name(
+            "lb_admin_availabilityzoneprofile1-show")
+        availability_zone_data = {
+            const.COMPUTE_ZONE: 'my_compute_zone',
+            const.MANAGEMENT_NETWORK: uuidutils.generate_uuid(),
+        }
+        availability_zone_data_json = jsonutils.dumps(availability_zone_data)
+
+        availability_zone_profile_kwargs = {
+            const.NAME: availability_zone_profile_name,
+            const.PROVIDER_NAME: CONF.load_balancer.provider,
+            const.AVAILABILITY_ZONE_DATA: availability_zone_data_json
+        }
+
+        availability_zone_profile = (
+            self.lb_admin_availability_zone_profile_client
+                .create_availability_zone_profile(
+                    **availability_zone_profile_kwargs))
+        self.addCleanup(
+            self.lb_admin_availability_zone_profile_client
+                .cleanup_availability_zone_profile,
+            availability_zone_profile[const.ID])
+
+        # Test that a user without the load balancer admin role cannot
+        # show an availability zone profile
+        if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
+            self.assertRaises(
+                exceptions.Forbidden,
+                self.os_primary.availability_zone_profile_client
+                    .show_availability_zone_profile,
+                availability_zone_profile[const.ID])
+
+        result = (
+            self.lb_admin_availability_zone_profile_client
+                .show_availability_zone_profile(
+                    availability_zone_profile[const.ID]))
+
+        self.assertEqual(availability_zone_profile_name, result[const.NAME])
+        self.assertEqual(CONF.load_balancer.provider,
+                         result[const.PROVIDER_NAME])
+        self.assertEqual(availability_zone_data_json,
+                         result[const.AVAILABILITY_ZONE_DATA])
+
+    @decorators.idempotent_id('7121d4c0-f751-4b4e-a4c1-ab06c27a54a4')
+    def test_availability_zone_profile_update(self):
+        """Tests availability zone profile update API.
+
+        * Create a fully populated availability zone profile.
+        * Validate the create response reflects the initial values.
+        * Validates that non-admin accounts cannot update the availability zone
+          profile.
+        * Update the availability zone profile details.
+        * Validate the update response reflects the updated values.
+        * Show availability zone profile details.
+        * Validate the show reflects the updated values.
+        """
+
+        # We have to do this here as the api_version and clients are not
+        # set up in time to use a decorator or the skip_checks mixin
+        if not (self.lb_admin_availability_zone_profile_client
+                .is_version_supported(self.api_version, '2.14')):
+            raise self.skipException(
+                'Availability zone profiles are only available on '
+                'Octavia API version 2.14 or newer.')
+
+        availability_zone_profile_name = data_utils.rand_name(
+            "lb_admin_availabilityzoneprofile1-update")
+        availability_zone_data = {
+            const.COMPUTE_ZONE: 'my_compute_zone1',
+            const.MANAGEMENT_NETWORK: uuidutils.generate_uuid(),
+        }
+        availability_zone_data_json = jsonutils.dumps(availability_zone_data)
+
+        availability_zone_profile_kwargs = {
+            const.NAME: availability_zone_profile_name,
+            const.PROVIDER_NAME: CONF.load_balancer.provider,
+            const.AVAILABILITY_ZONE_DATA: availability_zone_data_json
+        }
+
+        availability_zone_profile = (
+            self.lb_admin_availability_zone_profile_client
+                .create_availability_zone_profile(
+                    **availability_zone_profile_kwargs))
+        self.addCleanup(
+            self.lb_admin_availability_zone_profile_client
+                .cleanup_availability_zone_profile,
+            availability_zone_profile[const.ID])
+
+        self.assertEqual(
+            availability_zone_profile_name,
+            availability_zone_profile[const.NAME])
+        self.assertEqual(
+            CONF.load_balancer.provider,
+            availability_zone_profile[const.PROVIDER_NAME])
+        self.assertEqual(
+            availability_zone_data_json,
+            availability_zone_profile[const.AVAILABILITY_ZONE_DATA])
+
+        availability_zone_profile_name2 = data_utils.rand_name(
+            "lb_admin_availabilityzoneprofile1-update2")
+        availability_zone_data2 = {
+            const.COMPUTE_ZONE: 'my_compute_zone2',
+            const.MANAGEMENT_NETWORK: uuidutils.generate_uuid(),
+        }
+        availability_zone_data2_json = jsonutils.dumps(availability_zone_data2)
+
+        # TODO(johnsom) Figure out a reliable second provider
+        availability_zone_profile_updated_kwargs = {
+            const.NAME: availability_zone_profile_name2,
+            const.PROVIDER_NAME: CONF.load_balancer.provider,
+            const.AVAILABILITY_ZONE_DATA: availability_zone_data2_json
+        }
+
+        # Test that a user without the load balancer admin role cannot
+        # update an availability zone profile
+        if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
+            self.assertRaises(
+                exceptions.Forbidden,
+                self.os_primary.availability_zone_profile_client
+                    .update_availability_zone_profile,
+                availability_zone_profile[const.ID],
+                **availability_zone_profile_updated_kwargs)
+
+        result = (
+            self.lb_admin_availability_zone_profile_client
+            .update_availability_zone_profile(
+                availability_zone_profile[const.ID],
+                **availability_zone_profile_updated_kwargs))
+
+        self.assertEqual(availability_zone_profile_name2, result[const.NAME])
+        self.assertEqual(CONF.load_balancer.provider,
+                         result[const.PROVIDER_NAME])
+        self.assertEqual(availability_zone_data2_json,
+                         result[const.AVAILABILITY_ZONE_DATA])
+
+        # Check that a show reflects the new values
+        get_result = (
+            self.lb_admin_availability_zone_profile_client
+                .show_availability_zone_profile(
+                    availability_zone_profile[const.ID]))
+
+        self.assertEqual(availability_zone_profile_name2,
+                         get_result[const.NAME])
+        self.assertEqual(CONF.load_balancer.provider,
+                         get_result[const.PROVIDER_NAME])
+        self.assertEqual(availability_zone_data2_json,
+                         get_result[const.AVAILABILITY_ZONE_DATA])
+
+    @decorators.idempotent_id('371cee1d-3404-4744-b5c5-8a3d37aa8425')
+    def test_availability_zone_profile_delete(self):
+        """Tests availability zone profile create and delete APIs.
+
+        * Creates an availability zone profile.
+        * Validates that other accounts cannot delete the availability zone
+          profile.
+        * Deletes the availability zone profile.
+        * Validates that showing the deleted profile raises NotFound.
+        """
+        # We have to do this here as the api_version and clients are not
+        # set up in time to use a decorator or the skip_checks mixin
+        if not (self.lb_admin_availability_zone_profile_client
+                .is_version_supported(self.api_version, '2.14')):
+            raise self.skipException(
+                'Availability zone profiles are only available on '
+                'Octavia API version 2.14 or newer.')
+
+        availability_zone_profile_name = data_utils.rand_name(
+            "lb_admin_availabilityzoneprofile1-delete")
+        availability_zone_data = {
+            const.COMPUTE_ZONE: 'my_compute_zone',
+            const.MANAGEMENT_NETWORK: uuidutils.generate_uuid(),
+        }
+        availability_zone_data_json = jsonutils.dumps(availability_zone_data)
+
+        availability_zone_profile_kwargs = {
+            const.NAME: availability_zone_profile_name,
+            const.PROVIDER_NAME: CONF.load_balancer.provider,
+            const.AVAILABILITY_ZONE_DATA: availability_zone_data_json
+        }
+
+        availability_zone_profile = (
+            self.lb_admin_availability_zone_profile_client
+                .create_availability_zone_profile(
+                    **availability_zone_profile_kwargs))
+        self.addCleanup(
+            self.lb_admin_availability_zone_profile_client
+                .cleanup_availability_zone_profile,
+            availability_zone_profile[const.ID])
+
+        # Test that a user without the load balancer admin role cannot
+        # delete an availability zone profile
+        if CONF.load_balancer.RBAC_test_type == const.ADVANCED:
+            self.assertRaises(
+                exceptions.Forbidden,
+                self.os_primary.availability_zone_profile_client
+                    .delete_availability_zone_profile,
+                availability_zone_profile[const.ID])
+
+        # Happy path
+        (self.lb_admin_availability_zone_profile_client
+            .delete_availability_zone_profile(
+                availability_zone_profile[const.ID]))
+
+        self.assertRaises(
+            exceptions.NotFound,
+            self.lb_admin_availability_zone_profile_client
+                .show_availability_zone_profile,
+            availability_zone_profile[const.ID])
diff --git a/octavia_tempest_plugin/tests/api/v2/test_flavor_capabilities.py b/octavia_tempest_plugin/tests/api/v2/test_flavor_capabilities.py
index 924c044..7f9da51 100644
--- a/octavia_tempest_plugin/tests/api/v2/test_flavor_capabilities.py
+++ b/octavia_tempest_plugin/tests/api/v2/test_flavor_capabilities.py
@@ -52,7 +52,7 @@
                 CONF.load_balancer.provider)
 
         # Check for an expected flavor capability for the configured provider
-        admin_capabilities_client = self.lb_admin_capabilities_client
+        admin_capabilities_client = self.lb_admin_flavor_capabilities_client
         capabilities = admin_capabilities_client.list_flavor_capabilities(
             CONF.load_balancer.provider)
 
diff --git a/octavia_tempest_plugin/tests/barbican_scenario/v2/test_tls_barbican.py b/octavia_tempest_plugin/tests/barbican_scenario/v2/test_tls_barbican.py
index 8db166e..96e7483 100644
--- a/octavia_tempest_plugin/tests/barbican_scenario/v2/test_tls_barbican.py
+++ b/octavia_tempest_plugin/tests/barbican_scenario/v2/test_tls_barbican.py
@@ -13,7 +13,9 @@
 #    under the License.
 
 import base64
+import requests
 import socket
+import tempfile
 
 from cryptography.hazmat.primitives import serialization
 from OpenSSL.crypto import X509
@@ -36,15 +38,19 @@
 
 
 class TLSWithBarbicanTest(test_base.LoadBalancerBaseTestWithCompute):
-
     @classmethod
     def skip_checks(cls):
         super(TLSWithBarbicanTest, cls).skip_checks()
+        if not CONF.loadbalancer_feature_enabled.l7_protocol_enabled:
+            raise cls.skipException(
+                '[loadbalancer_feature_enabled] "l7_protocol_enabled" is '
+                'False in the tempest configuration. TLS tests will be '
+                'skipped.')
         if not CONF.loadbalancer_feature_enabled.terminated_tls_enabled:
-            raise cls.skipException('[loadbalancer-feature-enabled] '
-                                    '"terminated_tls_enabled" is False in '
-                                    'the tempest configuration. TLS tests '
-                                    'will be skipped.')
+            raise cls.skipException(
+                '[loadbalancer-feature-enabled] "terminated_tls_enabled" is '
+                'False in the tempest configuration. TLS tests will be '
+                'skipped.')
         if not CONF.validation.run_validation:
             raise cls.skipException('Traffic tests will not work without '
                                     'run_validation enabled.')
@@ -53,6 +59,25 @@
                                     'barbican service.')
 
     @classmethod
+    def _store_secret(cls, barbican_mgr, secret):
+        new_secret_ref = barbican_mgr.store_secret(secret)
+        cls.addClassResourceCleanup(barbican_mgr.delete_secret,
+                                    new_secret_ref)
+
+        # Octavia API versions older than 2.1 do not set the barbican ACL
+        # automatically, so add the Octavia service user to the ACL here.
+        if not cls.mem_lb_client.is_version_supported(
+                cls.api_version, '2.1'):
+            user_list = cls.os_admin.users_v3_client.list_users(
+                name=CONF.load_balancer.octavia_svc_username)
+            msg = 'Only one user named "{0}" should exist, {1} found.'.format(
+                CONF.load_balancer.octavia_svc_username,
+                len(user_list['users']))
+            assert 1 == len(user_list['users']), msg
+            barbican_mgr.add_acl(new_secret_ref, user_list['users'][0]['id'])
+        return new_secret_ref
+
+    @classmethod
     def _generate_load_certificate(cls, barbican_mgr, ca_cert, ca_key, name):
         new_cert, new_key = cert_utils.generate_server_cert_and_key(
             ca_cert, ca_key, name)
@@ -72,20 +97,8 @@
         pkcs12 = cert_utils.generate_pkcs12_bundle(new_cert, new_key)
         LOG.debug('%s PKCS12 bundle: %s', name, base64.b64encode(pkcs12))
 
-        new_secret_ref = barbican_mgr.store_secret(pkcs12)
-        cls.addClassResourceCleanup(barbican_mgr.delete_secret, new_secret_ref)
+        new_secret_ref = cls._store_secret(barbican_mgr, pkcs12)
 
-        # Set the barbican ACL if the Octavia API version doesn't do it
-        # automatically.
-        if not cls.mem_lb_client.is_version_supported(
-                cls.api_version, '2.1'):
-            user_list = cls.os_admin.users_v3_client.list_users(
-                name=CONF.load_balancer.octavia_svc_username)
-            msg = 'Only one user named "{0}" should exist, {1} found.'.format(
-                CONF.load_balancer.octavia_svc_username,
-                len(user_list['users']))
-            assert 1 == len(user_list['users']), msg
-            barbican_mgr.add_acl(new_secret_ref, user_list['users'][0]['id'])
         return new_cert, new_key, new_secret_ref
 
     @classmethod
@@ -108,7 +121,7 @@
 
         # Load the secret into the barbican service under the
         # os_roles_lb_member tenant
-        barbican_mgr = barbican_client_mgr.BarbicanClientManager(
+        cls.barbican_mgr = barbican_client_mgr.BarbicanClientManager(
             cls.os_roles_lb_member)
 
         # Create a server cert and key
@@ -117,7 +130,7 @@
         LOG.debug('Server (default) UUID: %s' % cls.server_uuid)
 
         server_cert, server_key, cls.server_secret_ref = (
-            cls._generate_load_certificate(barbican_mgr, cls.ca_cert,
+            cls._generate_load_certificate(cls.barbican_mgr, cls.ca_cert,
                                            ca_key, cls.server_uuid))
 
         # Create the SNI1 cert and key
@@ -125,7 +138,7 @@
         LOG.debug('SNI1 UUID: %s' % cls.SNI1_uuid)
 
         SNI1_cert, SNI1_key, cls.SNI1_secret_ref = (
-            cls._generate_load_certificate(barbican_mgr, cls.ca_cert,
+            cls._generate_load_certificate(cls.barbican_mgr, cls.ca_cert,
                                            ca_key, cls.SNI1_uuid))
 
         # Create the SNI2 cert and key
@@ -133,9 +146,37 @@
         LOG.debug('SNI2 UUID: %s' % cls.SNI2_uuid)
 
         SNI2_cert, SNI2_key, cls.SNI2_secret_ref = (
-            cls._generate_load_certificate(barbican_mgr, cls.ca_cert,
+            cls._generate_load_certificate(cls.barbican_mgr, cls.ca_cert,
                                            ca_key, cls.SNI2_uuid))
 
+        # Create the client authentication CA
+        cls.client_ca_cert, client_ca_key = (
+            cert_utils.generate_ca_cert_and_key())
+
+        cls.client_ca_cert_ref = cls._store_secret(
+            cls.barbican_mgr,
+            cls.client_ca_cert.public_bytes(serialization.Encoding.PEM))
+
+        # Create client cert and key
+        cls.client_cn = uuidutils.generate_uuid()
+        cls.client_cert, cls.client_key = (
+            cert_utils.generate_client_cert_and_key(
+                cls.client_ca_cert, client_ca_key, cls.client_cn))
+
+        # Create revoked client cert and key
+        cls.revoked_client_cn = uuidutils.generate_uuid()
+        cls.revoked_client_cert, cls.revoked_client_key = (
+            cert_utils.generate_client_cert_and_key(
+                cls.client_ca_cert, client_ca_key, cls.revoked_client_cn))
+
+        # Create certificate revocation list and revoke cert
+        cls.client_crl = cert_utils.generate_certificate_revocation_list(
+            cls.client_ca_cert, client_ca_key, cls.revoked_client_cert)
+
+        cls.client_crl_ref = cls._store_secret(
+            cls.barbican_mgr,
+            cls.client_crl.public_bytes(serialization.Encoding.PEM))
+
         # Setup a load balancer for the tests to use
         lb_name = data_utils.rand_name("lb_member_lb1-tls")
         lb_kwargs = {const.PROVIDER: CONF.load_balancer.provider,
@@ -305,6 +346,60 @@
         # Validate the certificate is signed by the ca_cert we created
         sock.do_handshake()
 
+    @decorators.idempotent_id('dcf11f78-7af3-4832-b716-9a01648f439c')
+    def test_mixed_http_https_traffic(self):
+        """Tests HTTP and TERMINATED_HTTPS listeners sharing one pool."""
+        listener_name = data_utils.rand_name("lb_member_listener1-tls")
+        listener_kwargs = {
+            const.NAME: listener_name,
+            const.PROTOCOL: const.TERMINATED_HTTPS,
+            const.PROTOCOL_PORT: '443',
+            const.LOADBALANCER_ID: self.lb_id,
+            const.DEFAULT_POOL_ID: self.pool_id,
+            const.DEFAULT_TLS_CONTAINER_REF: self.server_secret_ref,
+        }
+        listener = self.mem_listener_client.create_listener(**listener_kwargs)
+        self.listener_id = listener[const.ID]
+        self.addCleanup(
+            self.mem_listener_client.cleanup_listener,
+            self.listener_id,
+            lb_client=self.mem_lb_client, lb_id=self.lb_id)
+
+        waiters.wait_for_status(self.mem_lb_client.show_loadbalancer,
+                                self.lb_id, const.PROVISIONING_STATUS,
+                                const.ACTIVE,
+                                CONF.load_balancer.build_interval,
+                                CONF.load_balancer.build_timeout)
+
+        listener_name = data_utils.rand_name("lb_member_listener2-http-tls")
+        listener_kwargs = {
+            const.NAME: listener_name,
+            const.PROTOCOL: const.HTTP,
+            const.PROTOCOL_PORT: '80',
+            const.LOADBALANCER_ID: self.lb_id,
+            const.DEFAULT_POOL_ID: self.pool_id,
+        }
+        listener = self.mem_listener_client.create_listener(**listener_kwargs)
+        self.listener2_id = listener[const.ID]
+        self.addCleanup(
+            self.mem_listener_client.cleanup_listener,
+            self.listener2_id,
+            lb_client=self.mem_lb_client, lb_id=self.lb_id)
+
+        waiters.wait_for_status(self.mem_lb_client.show_loadbalancer,
+                                self.lb_id, const.PROVISIONING_STATUS,
+                                const.ACTIVE,
+                                CONF.load_balancer.build_interval,
+                                CONF.load_balancer.build_timeout)
+
+        # Test HTTPS listener load balancing.
+        # Note: certificate validation tests will follow this test
+        self.check_members_balanced(self.lb_vip_address, protocol='https',
+                                    verify=False, protocol_port=443)
+
+        # Test HTTP listener load balancing.
+        self.check_members_balanced(self.lb_vip_address)
+
     @decorators.idempotent_id('08405802-4411-4454-b008-8607408f424a')
     def test_basic_tls_SNI_traffic(self):
 
@@ -618,3 +713,384 @@
         sock.connect((self.lb_vip_address, 8443))
         # Validate the certificate is signed by the ca_cert we created
         sock.do_handshake()
+
+    @decorators.idempotent_id('af6bb7d2-acbb-4f6e-861f-39a2a3f02331')
+    def test_tls_client_auth_mandatory(self):
+        if not self.mem_listener_client.is_version_supported(
+                self.api_version, '2.8'):
+            raise self.skipException('TLS client authentication '
+                                     'is only available on Octavia API '
+                                     'version 2.8 or newer.')
+        LISTENER1_TCP_PORT = '443'
+        listener_name = data_utils.rand_name(
+            "lb_member_listener1-client-auth-mand")
+        listener_kwargs = {
+            const.NAME: listener_name,
+            const.PROTOCOL: const.TERMINATED_HTTPS,
+            const.PROTOCOL_PORT: LISTENER1_TCP_PORT,
+            const.LOADBALANCER_ID: self.lb_id,
+            const.DEFAULT_POOL_ID: self.pool_id,
+            const.DEFAULT_TLS_CONTAINER_REF: self.server_secret_ref,
+            const.CLIENT_AUTHENTICATION: const.CLIENT_AUTH_MANDATORY,
+            const.CLIENT_CA_TLS_CONTAINER_REF: self.client_ca_cert_ref,
+            const.CLIENT_CRL_CONTAINER_REF: self.client_crl_ref,
+        }
+        listener = self.mem_listener_client.create_listener(**listener_kwargs)
+        self.listener_id = listener[const.ID]
+        self.addCleanup(
+            self.mem_listener_client.cleanup_listener,
+            self.listener_id,
+            lb_client=self.mem_lb_client, lb_id=self.lb_id)
+
+        waiters.wait_for_status(self.mem_lb_client.show_loadbalancer,
+                                self.lb_id, const.PROVISIONING_STATUS,
+                                const.ACTIVE,
+                                CONF.load_balancer.build_interval,
+                                CONF.load_balancer.build_timeout)
+
+        # Test that a request without a client certificate fails to connect
+        self.assertRaisesRegex(
+            requests.exceptions.SSLError, ".*certificate required.*",
+            requests.get,
+            'https://{0}:{1}'.format(self.lb_vip_address, LISTENER1_TCP_PORT),
+            timeout=12, verify=False)
+
+        # Test that a revoked client certificate fails to connect
+        with tempfile.NamedTemporaryFile(buffering=0) as cert_file:
+            cert_file.write(self.revoked_client_cert.public_bytes(
+                serialization.Encoding.PEM))
+            with tempfile.NamedTemporaryFile(buffering=0) as key_file:
+                key_file.write(self.revoked_client_key.private_bytes(
+                    serialization.Encoding.PEM,
+                    serialization.PrivateFormat.TraditionalOpenSSL,
+                    serialization.NoEncryption()))
+                self.assertRaisesRegex(
+                    requests.exceptions.SSLError, ".*revoked.*", requests.get,
+                    'https://{0}:{1}'.format(self.lb_vip_address,
+                                             LISTENER1_TCP_PORT),
+                    timeout=12, verify=False, cert=(cert_file.name,
+                                                    key_file.name))
+
+        # Test that a valid client certificate can connect
+        with tempfile.NamedTemporaryFile(buffering=0) as cert_file:
+            cert_file.write(self.client_cert.public_bytes(
+                serialization.Encoding.PEM))
+            with tempfile.NamedTemporaryFile(buffering=0) as key_file:
+                key_file.write(self.client_key.private_bytes(
+                    serialization.Encoding.PEM,
+                    serialization.PrivateFormat.TraditionalOpenSSL,
+                    serialization.NoEncryption()))
+                response = requests.get(
+                    'https://{0}:{1}'.format(self.lb_vip_address,
+                                             LISTENER1_TCP_PORT),
+                    timeout=12, verify=False, cert=(cert_file.name,
+                                                    key_file.name))
+                self.assertEqual(200, response.status_code)
+
+    @decorators.idempotent_id('42d696bf-e7f5-44f0-9331-4a5e01d69ef3')
+    def test_tls_client_auth_optional(self):
+        if not self.mem_listener_client.is_version_supported(
+                self.api_version, '2.8'):
+            raise self.skipException('TLS client authentication '
+                                     'is only available on Octavia API '
+                                     'version 2.8 or newer.')
+        LISTENER1_TCP_PORT = '443'
+        listener_name = data_utils.rand_name(
+            "lb_member_listener1-client-auth-optional")
+        listener_kwargs = {
+            const.NAME: listener_name,
+            const.PROTOCOL: const.TERMINATED_HTTPS,
+            const.PROTOCOL_PORT: LISTENER1_TCP_PORT,
+            const.LOADBALANCER_ID: self.lb_id,
+            const.DEFAULT_POOL_ID: self.pool_id,
+            const.DEFAULT_TLS_CONTAINER_REF: self.server_secret_ref,
+            const.CLIENT_AUTHENTICATION: const.CLIENT_AUTH_OPTIONAL,
+            const.CLIENT_CA_TLS_CONTAINER_REF: self.client_ca_cert_ref,
+            const.CLIENT_CRL_CONTAINER_REF: self.client_crl_ref,
+        }
+        listener = self.mem_listener_client.create_listener(**listener_kwargs)
+        self.listener_id = listener[const.ID]
+        self.addCleanup(
+            self.mem_listener_client.cleanup_listener,
+            self.listener_id,
+            lb_client=self.mem_lb_client, lb_id=self.lb_id)
+
+        waiters.wait_for_status(self.mem_lb_client.show_loadbalancer,
+                                self.lb_id, const.PROVISIONING_STATUS,
+                                const.ACTIVE,
+                                CONF.load_balancer.build_interval,
+                                CONF.load_balancer.build_timeout)
+
+        # Test that a request without a client certificate can connect
+        response = requests.get(
+            'https://{0}:{1}'.format(self.lb_vip_address, LISTENER1_TCP_PORT),
+            timeout=12, verify=False)
+        self.assertEqual(200, response.status_code)
+
+        # Test that a revoked client certificate fails to connect
+        with tempfile.NamedTemporaryFile(buffering=0) as cert_file:
+            cert_file.write(self.revoked_client_cert.public_bytes(
+                serialization.Encoding.PEM))
+            with tempfile.NamedTemporaryFile(buffering=0) as key_file:
+                key_file.write(self.revoked_client_key.private_bytes(
+                    serialization.Encoding.PEM,
+                    serialization.PrivateFormat.TraditionalOpenSSL,
+                    serialization.NoEncryption()))
+                self.assertRaisesRegex(
+                    requests.exceptions.SSLError, ".*revoked.*", requests.get,
+                    'https://{0}:{1}'.format(self.lb_vip_address,
+                                             LISTENER1_TCP_PORT),
+                    timeout=12, verify=False, cert=(cert_file.name,
+                                                    key_file.name))
+
+        # Test that a valid client certificate can connect
+        with tempfile.NamedTemporaryFile(buffering=0) as cert_file:
+            cert_file.write(self.client_cert.public_bytes(
+                serialization.Encoding.PEM))
+            with tempfile.NamedTemporaryFile(buffering=0) as key_file:
+                key_file.write(self.client_key.private_bytes(
+                    serialization.Encoding.PEM,
+                    serialization.PrivateFormat.TraditionalOpenSSL,
+                    serialization.NoEncryption()))
+                response = requests.get(
+                    'https://{0}:{1}'.format(self.lb_vip_address,
+                                             LISTENER1_TCP_PORT),
+                    timeout=12, verify=False, cert=(cert_file.name,
+                                                    key_file.name))
+                self.assertEqual(200, response.status_code)
+
+    @decorators.idempotent_id('13271ce6-f9f7-4017-a017-c2fc390b9438')
+    def test_tls_multi_listener_client_auth(self):
+        """Test client authentication in a multi-listener LB.
+
+        Validates that certificates and CRLs don't get cross configured
+        between multiple listeners on the same load balancer.
+        """
+        if not self.mem_listener_client.is_version_supported(
+                self.api_version, '2.8'):
+            raise self.skipException('TLS client authentication '
+                                     'is only available on Octavia API '
+                                     'version 2.8 or newer.')
+        # Create the client2 authentication CA
+        client2_ca_cert, client2_ca_key = (
+            cert_utils.generate_ca_cert_and_key())
+
+        client2_ca_cert_ref = self._store_secret(
+            self.barbican_mgr,
+            client2_ca_cert.public_bytes(serialization.Encoding.PEM))
+
+        # Create client2 cert and key
+        client2_cn = uuidutils.generate_uuid()
+        client2_cert, client2_key = (
+            cert_utils.generate_client_cert_and_key(
+                client2_ca_cert, client2_ca_key, client2_cn))
+
+        # Create revoked client2 cert and key
+        revoked_client2_cn = uuidutils.generate_uuid()
+        revoked_client2_cert, revoked_client2_key = (
+            cert_utils.generate_client_cert_and_key(
+                client2_ca_cert, client2_ca_key, revoked_client2_cn))
+
+        # Create certificate revocation list and revoke cert
+        client2_crl = cert_utils.generate_certificate_revocation_list(
+            client2_ca_cert, client2_ca_key, revoked_client2_cert)
+
+        client2_crl_ref = self._store_secret(
+            self.barbican_mgr,
+            client2_crl.public_bytes(serialization.Encoding.PEM))
+
+        LISTENER1_TCP_PORT = '443'
+        listener_name = data_utils.rand_name(
+            "lb_member_listener1-multi-list-client-auth")
+        listener_kwargs = {
+            const.NAME: listener_name,
+            const.PROTOCOL: const.TERMINATED_HTTPS,
+            const.PROTOCOL_PORT: LISTENER1_TCP_PORT,
+            const.LOADBALANCER_ID: self.lb_id,
+            const.DEFAULT_POOL_ID: self.pool_id,
+            const.DEFAULT_TLS_CONTAINER_REF: self.server_secret_ref,
+            const.CLIENT_AUTHENTICATION: const.CLIENT_AUTH_MANDATORY,
+            const.CLIENT_CA_TLS_CONTAINER_REF: self.client_ca_cert_ref,
+            const.CLIENT_CRL_CONTAINER_REF: self.client_crl_ref,
+        }
+        listener = self.mem_listener_client.create_listener(**listener_kwargs)
+        self.listener_id = listener[const.ID]
+        self.addCleanup(
+            self.mem_listener_client.cleanup_listener,
+            self.listener_id,
+            lb_client=self.mem_lb_client, lb_id=self.lb_id)
+
+        waiters.wait_for_status(self.mem_lb_client.show_loadbalancer,
+                                self.lb_id, const.PROVISIONING_STATUS,
+                                const.ACTIVE,
+                                CONF.load_balancer.build_interval,
+                                CONF.load_balancer.build_timeout)
+
+        LISTENER2_TCP_PORT = '8443'
+        listener_name = data_utils.rand_name(
+            "lb_member_listener2-multi-list-client-auth")
+        listener_kwargs = {
+            const.NAME: listener_name,
+            const.PROTOCOL: const.TERMINATED_HTTPS,
+            const.PROTOCOL_PORT: LISTENER2_TCP_PORT,
+            const.LOADBALANCER_ID: self.lb_id,
+            const.DEFAULT_POOL_ID: self.pool_id,
+            const.DEFAULT_TLS_CONTAINER_REF: self.server_secret_ref,
+            const.CLIENT_AUTHENTICATION: const.CLIENT_AUTH_MANDATORY,
+            const.CLIENT_CA_TLS_CONTAINER_REF: client2_ca_cert_ref,
+            const.CLIENT_CRL_CONTAINER_REF: client2_crl_ref,
+        }
+        listener2 = self.mem_listener_client.create_listener(**listener_kwargs)
+        self.listener2_id = listener2[const.ID]
+        self.addCleanup(
+            self.mem_listener_client.cleanup_listener,
+            self.listener2_id,
+            lb_client=self.mem_lb_client, lb_id=self.lb_id)
+
+        waiters.wait_for_status(self.mem_lb_client.show_loadbalancer,
+                                self.lb_id, const.PROVISIONING_STATUS,
+                                const.ACTIVE,
+                                CONF.load_balancer.build_interval,
+                                CONF.load_balancer.build_timeout)
+
+        # Test that no client certificate fails to connect to listener1
+        self.assertRaisesRegex(
+            requests.exceptions.SSLError, ".*certificate required.*",
+            requests.get,
+            'https://{0}:{1}'.format(self.lb_vip_address, LISTENER1_TCP_PORT),
+            timeout=12, verify=False)
+
+        # Test that no client certificate fails to connect to listener2
+        self.assertRaisesRegex(
+            requests.exceptions.SSLError, ".*certificate required.*",
+            requests.get,
+            'https://{0}:{1}'.format(self.lb_vip_address, LISTENER2_TCP_PORT),
+            timeout=12, verify=False)
+
+        # Test that a revoked client certificate fails to connect to listener1
+        with tempfile.NamedTemporaryFile(buffering=0) as cert_file:
+            cert_file.write(self.revoked_client_cert.public_bytes(
+                serialization.Encoding.PEM))
+            with tempfile.NamedTemporaryFile(buffering=0) as key_file:
+                key_file.write(self.revoked_client_key.private_bytes(
+                    serialization.Encoding.PEM,
+                    serialization.PrivateFormat.TraditionalOpenSSL,
+                    serialization.NoEncryption()))
+                self.assertRaisesRegex(
+                    requests.exceptions.SSLError, ".*revoked.*", requests.get,
+                    'https://{0}:{1}'.format(self.lb_vip_address,
+                                             LISTENER1_TCP_PORT),
+                    timeout=12, verify=False, cert=(cert_file.name,
+                                                    key_file.name))
+
+        # Test that a revoked client2 certificate fails to connect to listener2
+        with tempfile.NamedTemporaryFile(buffering=0) as cert_file:
+            cert_file.write(revoked_client2_cert.public_bytes(
+                serialization.Encoding.PEM))
+            with tempfile.NamedTemporaryFile(buffering=0) as key_file:
+                key_file.write(revoked_client2_key.private_bytes(
+                    serialization.Encoding.PEM,
+                    serialization.PrivateFormat.TraditionalOpenSSL,
+                    serialization.NoEncryption()))
+                self.assertRaisesRegex(
+                    requests.exceptions.SSLError, ".*revoked.*", requests.get,
+                    'https://{0}:{1}'.format(self.lb_vip_address,
+                                             LISTENER2_TCP_PORT),
+                    timeout=12, verify=False, cert=(cert_file.name,
+                                                    key_file.name))
+
+        # Test that a valid client certificate can connect to listener1
+        with tempfile.NamedTemporaryFile(buffering=0) as cert_file:
+            cert_file.write(self.client_cert.public_bytes(
+                serialization.Encoding.PEM))
+            with tempfile.NamedTemporaryFile(buffering=0) as key_file:
+                key_file.write(self.client_key.private_bytes(
+                    serialization.Encoding.PEM,
+                    serialization.PrivateFormat.TraditionalOpenSSL,
+                    serialization.NoEncryption()))
+                response = requests.get(
+                    'https://{0}:{1}'.format(self.lb_vip_address,
+                                             LISTENER1_TCP_PORT),
+                    timeout=12, verify=False, cert=(cert_file.name,
+                                                    key_file.name))
+                self.assertEqual(200, response.status_code)
+
+        # Test that a valid client2 certificate can connect to listener2
+        with tempfile.NamedTemporaryFile(buffering=0) as cert_file:
+            cert_file.write(client2_cert.public_bytes(
+                serialization.Encoding.PEM))
+            with tempfile.NamedTemporaryFile(buffering=0) as key_file:
+                key_file.write(client2_key.private_bytes(
+                    serialization.Encoding.PEM,
+                    serialization.PrivateFormat.TraditionalOpenSSL,
+                    serialization.NoEncryption()))
+                response = requests.get(
+                    'https://{0}:{1}'.format(self.lb_vip_address,
+                                             LISTENER2_TCP_PORT),
+                    timeout=12, verify=False, cert=(cert_file.name,
+                                                    key_file.name))
+                self.assertEqual(200, response.status_code)
+
+        # Test that a valid client1 certificate cannot connect to listener2
+        with tempfile.NamedTemporaryFile(buffering=0) as cert_file:
+            cert_file.write(self.client_cert.public_bytes(
+                serialization.Encoding.PEM))
+            with tempfile.NamedTemporaryFile(buffering=0) as key_file:
+                key_file.write(self.client_key.private_bytes(
+                    serialization.Encoding.PEM,
+                    serialization.PrivateFormat.TraditionalOpenSSL,
+                    serialization.NoEncryption()))
+                self.assertRaisesRegex(
+                    requests.exceptions.SSLError, ".*decrypt error.*",
+                    requests.get, 'https://{0}:{1}'.format(self.lb_vip_address,
+                                                           LISTENER2_TCP_PORT),
+                    timeout=12, verify=False, cert=(cert_file.name,
+                                                    key_file.name))
+
+        # Test that a valid client2 certificate cannot connect to listener1
+        with tempfile.NamedTemporaryFile(buffering=0) as cert_file:
+            cert_file.write(client2_cert.public_bytes(
+                serialization.Encoding.PEM))
+            with tempfile.NamedTemporaryFile(buffering=0) as key_file:
+                key_file.write(client2_key.private_bytes(
+                    serialization.Encoding.PEM,
+                    serialization.PrivateFormat.TraditionalOpenSSL,
+                    serialization.NoEncryption()))
+                self.assertRaisesRegex(
+                    requests.exceptions.SSLError, ".*decrypt error.*",
+                    requests.get, 'https://{0}:{1}'.format(self.lb_vip_address,
+                                                           LISTENER1_TCP_PORT),
+                    timeout=12, verify=False, cert=(cert_file.name,
+                                                    key_file.name))
+
+        # Test that a revoked client1 certificate cannot connect to listener2
+        with tempfile.NamedTemporaryFile(buffering=0) as cert_file:
+            cert_file.write(self.revoked_client_cert.public_bytes(
+                serialization.Encoding.PEM))
+            with tempfile.NamedTemporaryFile(buffering=0) as key_file:
+                key_file.write(self.revoked_client_key.private_bytes(
+                    serialization.Encoding.PEM,
+                    serialization.PrivateFormat.TraditionalOpenSSL,
+                    serialization.NoEncryption()))
+                self.assertRaisesRegex(
+                    requests.exceptions.SSLError, ".*decrypt error.*",
+                    requests.get, 'https://{0}:{1}'.format(self.lb_vip_address,
+                                                           LISTENER2_TCP_PORT),
+                    timeout=12, verify=False, cert=(cert_file.name,
+                                                    key_file.name))
+
+        # Test that a revoked client2 certificate cannot connect to listener1
+        with tempfile.NamedTemporaryFile(buffering=0) as cert_file:
+            cert_file.write(revoked_client2_cert.public_bytes(
+                serialization.Encoding.PEM))
+            with tempfile.NamedTemporaryFile(buffering=0) as key_file:
+                key_file.write(revoked_client2_key.private_bytes(
+                    serialization.Encoding.PEM,
+                    serialization.PrivateFormat.TraditionalOpenSSL,
+                    serialization.NoEncryption()))
+                self.assertRaisesRegex(
+                    requests.exceptions.SSLError, ".*decrypt error.*",
+                    requests.get, 'https://{0}:{1}'.format(self.lb_vip_address,
+                                                           LISTENER1_TCP_PORT),
+                    timeout=12, verify=False, cert=(cert_file.name,
+                                                    key_file.name))
diff --git a/octavia_tempest_plugin/tests/test_base.py b/octavia_tempest_plugin/tests/test_base.py
index f913ea9..2f94bcc 100644
--- a/octavia_tempest_plugin/tests/test_base.py
+++ b/octavia_tempest_plugin/tests/test_base.py
@@ -128,8 +128,16 @@
         cls.mem_flavor_client = cls.os_roles_lb_member.flavor_client
         cls.mem_provider_client = cls.os_roles_lb_member.provider_client
         cls.os_admin_servers_client = cls.os_admin.servers_client
-        cls.lb_admin_capabilities_client = (
+        cls.lb_admin_flavor_capabilities_client = (
             cls.os_roles_lb_admin.flavor_capabilities_client)
+        cls.lb_admin_availability_zone_capabilities_client = (
+            cls.os_roles_lb_admin.availability_zone_capabilities_client)
+        cls.lb_admin_availability_zone_profile_client = (
+            cls.os_roles_lb_admin.availability_zone_profile_client)
+        cls.lb_admin_availability_zone_client = (
+            cls.os_roles_lb_admin.availability_zone_client)
+        cls.mem_availability_zone_client = (
+            cls.os_roles_lb_member.availability_zone_client)
 
     @classmethod
     def resource_setup(cls):
diff --git a/releasenotes/notes/client-auth-scenario-bffa420a2fd38159.yaml b/releasenotes/notes/client-auth-scenario-bffa420a2fd38159.yaml
new file mode 100644
index 0000000..3e44be4
--- /dev/null
+++ b/releasenotes/notes/client-auth-scenario-bffa420a2fd38159.yaml
@@ -0,0 +1,4 @@
+---
+features:
+  - |
+    Adds scenario tests for listener client authentication.
diff --git a/zuul.d/jobs.yaml b/zuul.d/jobs.yaml
index ccf2cb0..0ed5532 100644
--- a/zuul.d/jobs.yaml
+++ b/zuul.d/jobs.yaml
@@ -535,6 +535,15 @@
         OCTAVIA_AMP_IMAGE_SIZE: 3
 
 - job:
+    name: octavia-v2-dsvm-scenario-centos-8
+    parent: octavia-v2-dsvm-scenario
+    vars:
+      devstack_localrc:
+        OCTAVIA_AMP_BASE_OS: centos
+        OCTAVIA_AMP_DISTRIBUTION_RELEASE_ID: 8
+        OCTAVIA_AMP_IMAGE_SIZE: 3
+
+- job:
     name: octavia-v2-dsvm-scenario-ubuntu-bionic
     parent: octavia-v2-dsvm-scenario
     vars:
diff --git a/zuul.d/projects.yaml b/zuul.d/projects.yaml
index 965f961..4445910 100644
--- a/zuul.d/projects.yaml
+++ b/zuul.d/projects.yaml
@@ -20,6 +20,8 @@
         - octavia-v2-dsvm-py2-scenario-stable-queens
         - octavia-v2-dsvm-scenario-ipv6-only:
             voting: false
+        - octavia-v2-dsvm-scenario-centos-8:
+            voting: false
         - octavia-v2-dsvm-scenario-ubuntu-bionic:
             voting: false
         - octavia-v2-act-stdby-dsvm-scenario-two-node:
