Merge "Add listener client authentication scenario tests"
diff --git a/octavia_tempest_plugin/common/barbican_client_mgr.py b/octavia_tempest_plugin/common/barbican_client_mgr.py
index e93f903..eba1715 100644
--- a/octavia_tempest_plugin/common/barbican_client_mgr.py
+++ b/octavia_tempest_plugin/common/barbican_client_mgr.py
@@ -63,15 +63,15 @@
         # Setup the barbican client
         self.barbican = client.Client(session=id_session)
 
-    def store_secret(self, pkcs12_secret):
+    def store_secret(self, secret):
         """Store a secret in barbican.
 
-        :param pkcs12_secret: A pkcs12 secret.
+        :param secret: The secret payload to store (e.g. PKCS12 or PEM).
         :returns: The barbican secret_ref.
         """
         p12_secret = self.barbican.secrets.create()
-        p12_secret.name = data_utils.rand_name("lb_member_barbican_pkcs12")
-        p12_secret.payload = pkcs12_secret
+        p12_secret.name = data_utils.rand_name("lb_member_barbican")
+        p12_secret.payload = secret
         secret_ref = p12_secret.store()
         LOG.debug('Secret {0} has ref {1}'.format(p12_secret.name, secret_ref))
         return secret_ref
diff --git a/octavia_tempest_plugin/common/cert_utils.py b/octavia_tempest_plugin/common/cert_utils.py
index dcdd6f0..bb8cdb3 100644
--- a/octavia_tempest_plugin/common/cert_utils.py
+++ b/octavia_tempest_plugin/common/cert_utils.py
@@ -58,6 +58,13 @@
     ).add_extension(
         x509.BasicConstraints(ca=True, path_length=None),
         critical=True,
+    ).add_extension(
+        # KeyUsage(digital_signature, content_commitment, key_encipherment,
+        #          data_encipherment, key_agreement, key_cert_sign, crl_sign,
+        #          encipher_only, decipher_only)
+        x509.KeyUsage(True, False, False, False, False,
+                      True, True, False, False),
+        critical=True,
     ).sign(ca_key, hashes.SHA256(), default_backend())
 
     return ca_cert, ca_key
@@ -104,11 +111,66 @@
     ).add_extension(
         x509.BasicConstraints(ca=False, path_length=None),
         critical=True,
+    ).add_extension(
+        # KeyUsage(digital_signature, content_commitment, key_encipherment,
+        #          data_encipherment, key_agreement, key_cert_sign, crl_sign,
+        #          encipher_only, decipher_only)
+        x509.KeyUsage(True, False, True, False, False,
+                      False, False, False, False),
+        critical=True,
     ).sign(ca_key, hashes.SHA256(), default_backend())
 
     return server_cert, server_key
 
 
+def generate_client_cert_and_key(ca_cert, ca_key, client_uuid):
+    """Creates a client cert and key for testing.
+
+    :param ca_cert: A cryptography CA certificate (x509) object.
+    :param ca_key: The CA private key object used to sign the cert.
+    :param client_uuid: A UUID for the client, used as the subject CN.
+    :returns: The cryptography client cert and key objects.
+    """
+
+    client_key = rsa.generate_private_key(
+        public_exponent=65537, key_size=2048, backend=default_backend())
+
+    subject = x509.Name([
+        x509.NameAttribute(NameOID.COUNTRY_NAME, u"US"),
+        x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"Denial"),
+        x509.NameAttribute(NameOID.LOCALITY_NAME, u"Corvallis"),
+        x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"OpenStack"),
+        x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, u"Octavia"),
+        x509.NameAttribute(NameOID.COMMON_NAME, u"{}".format(client_uuid)),
+    ])
+
+    client_cert = x509.CertificateBuilder().subject_name(
+        subject
+    ).issuer_name(
+        ca_cert.subject
+    ).public_key(
+        client_key.public_key()
+    ).serial_number(
+        x509.random_serial_number()
+    ).not_valid_before(
+        datetime.datetime.utcnow()
+    ).not_valid_after(
+        datetime.datetime.utcnow() + datetime.timedelta(days=10)
+    ).add_extension(
+        x509.BasicConstraints(ca=False, path_length=None),
+        critical=True,
+    ).add_extension(
+        # KeyUsage(digital_signature, content_commitment, key_encipherment,
+        #          data_encipherment, key_agreement, key_cert_sign, crl_sign,
+        #          encipher_only, decipher_only)
+        x509.KeyUsage(True, True, True, False, False, False,
+                      False, False, False),
+        critical=True,
+    ).sign(ca_key, hashes.SHA256(), default_backend())
+
+    return client_cert, client_key
+
+
 def generate_pkcs12_bundle(server_cert, server_key):
     """Creates a pkcs12 formated bundle.
 
@@ -128,3 +190,28 @@
         OpenSSL.crypto.PKey.from_cryptography_key(server_key))
     pkcs12.set_certificate(OpenSSL.crypto.X509.from_cryptography(server_cert))
     return pkcs12.export()
+
+
+def generate_certificate_revocation_list(ca_cert, ca_key, cert_to_revoke):
+    """Create a certificate revocation list with a revoked certificate.
+
+    :param ca_cert: A cryptography CA certificate (x509) object.
+    :param ca_key: A cryptography CA key (x509) object.
+    :param cert_to_revoke: The cryptography (x509) certificate to revoke.
+    :returns: A signed certificate revocation list.
+    """
+    # cryptography treats naive datetimes as UTC, so use utcnow() like the
+    # cert builders in this module; local time could future-date the CRL.
+    now = datetime.datetime.utcnow()
+    crl_builder = x509.CertificateRevocationListBuilder()
+    crl_builder = crl_builder.issuer_name(ca_cert.subject)
+    crl_builder = crl_builder.last_update(now)
+    crl_builder = crl_builder.next_update(now + datetime.timedelta(days=1))
+
+    revoked_cert = x509.RevokedCertificateBuilder().serial_number(
+        cert_to_revoke.serial_number
+    ).revocation_date(now).build(default_backend())
+
+    crl_builder = crl_builder.add_revoked_certificate(revoked_cert)
+    return crl_builder.sign(private_key=ca_key, algorithm=hashes.SHA256(),
+                            backend=default_backend())
diff --git a/octavia_tempest_plugin/common/constants.py b/octavia_tempest_plugin/common/constants.py
index d4e1755..6c65245 100644
--- a/octavia_tempest_plugin/common/constants.py
+++ b/octavia_tempest_plugin/common/constants.py
@@ -20,6 +20,12 @@
 ADMIN_STATE_UP = 'admin_state_up'
 BYTES_IN = 'bytes_in'
 BYTES_OUT = 'bytes_out'
+CLIENT_AUTHENTICATION = 'client_authentication'
+CLIENT_AUTH_NONE = 'NONE'
+CLIENT_AUTH_OPTIONAL = 'OPTIONAL'
+CLIENT_AUTH_MANDATORY = 'MANDATORY'
+CLIENT_CA_TLS_CONTAINER_REF = 'client_ca_tls_container_ref'
+CLIENT_CRL_CONTAINER_REF = 'client_crl_container_ref'
 CREATED_AT = 'created_at'
 DESCRIPTION = 'description'
 FLAVOR_DATA = 'flavor_data'
diff --git a/octavia_tempest_plugin/services/load_balancer/v2/listener_client.py b/octavia_tempest_plugin/services/load_balancer/v2/listener_client.py
index de84bd1..1cc17ff 100644
--- a/octavia_tempest_plugin/services/load_balancer/v2/listener_client.py
+++ b/octavia_tempest_plugin/services/load_balancer/v2/listener_client.py
@@ -35,7 +35,10 @@
                         timeout_member_data=Unset, timeout_tcp_inspect=Unset,
                         insert_headers=Unset, default_pool_id=Unset,
                         default_tls_container_ref=Unset,
-                        sni_container_refs=Unset, return_object_only=True):
+                        sni_container_refs=Unset, client_authentication=Unset,
+                        client_ca_tls_container_ref=Unset,
+                        client_crl_container_ref=Unset,
+                        return_object_only=True):
         """Create a listener.
 
         :param protocol: The protocol for the resource.
@@ -70,6 +73,17 @@
                                    secrets containing PKCS12 format
                                    certificate/key bundles for TERMINATED_TLS
                                    listeners.
+        :param client_authentication: The TLS client authentication mode. One
+                                      of the options NONE, OPTIONAL or
+                                      MANDATORY.
+        :param client_ca_tls_container_ref: The ref of the key manager service
+                                            secret containing a PEM format
+                                            client CA certificate bundle for
+                                            TERMINATED_HTTPS listeners.
+        :param client_crl_container_ref: The URI of the key manager service
+                                         secret containing a PEM format CA
+                                         revocation list file for
+                                         TERMINATED_HTTPS listeners.
         :param return_object_only: If True, the response returns the object
                                    inside the root tag. False returns the full
                                    response from the API.
@@ -190,7 +204,10 @@
                         timeout_member_data=Unset, timeout_tcp_inspect=Unset,
                         insert_headers=Unset, default_pool_id=Unset,
                         default_tls_container_ref=Unset,
-                        sni_container_refs=Unset, return_object_only=True):
+                        sni_container_refs=Unset, client_authentication=Unset,
+                        client_ca_tls_container_ref=Unset,
+                        client_crl_container_ref=Unset,
+                        return_object_only=True):
         """Update a listener.
 
         :param listener_id: The listener ID to update.
@@ -223,6 +240,17 @@
                                    secrets containing PKCS12 format
                                    certificate/key bundles for TERMINATED_TLS
                                    listeners.
+        :param client_authentication: The TLS client authentication mode. One
+                                      of the options NONE, OPTIONAL or
+                                      MANDATORY.
+        :param client_ca_tls_container_ref: The ref of the key manager service
+                                            secret containing a PEM format
+                                            client CA certificate bundle for
+                                            TERMINATED_HTTPS listeners.
+        :param client_crl_container_ref: The URI of the key manager service
+                                         secret containing a PEM format CA
+                                         revocation list file for
+                                         TERMINATED_HTTPS listeners.
         :param return_object_only: If True, the response returns the object
                                    inside the root tag. False returns the full
                                    response from the API.
diff --git a/octavia_tempest_plugin/tests/barbican_scenario/v2/test_tls_barbican.py b/octavia_tempest_plugin/tests/barbican_scenario/v2/test_tls_barbican.py
index 0fe1d81..5f7ad51 100644
--- a/octavia_tempest_plugin/tests/barbican_scenario/v2/test_tls_barbican.py
+++ b/octavia_tempest_plugin/tests/barbican_scenario/v2/test_tls_barbican.py
@@ -13,7 +13,9 @@
 #    under the License.
 
 import base64
+import requests
 import socket
+import tempfile
 
 from cryptography.hazmat.primitives import serialization
 from OpenSSL.crypto import X509
@@ -53,6 +55,25 @@
                                     'barbican service.')
 
     @classmethod
+    def _store_secret(cls, barbican_mgr, secret):
+        new_secret_ref = barbican_mgr.store_secret(secret)
+        cls.addClassResourceCleanup(barbican_mgr.delete_secret,
+                                    new_secret_ref)
+
+        # Set the barbican ACL if the Octavia API version doesn't do it
+        # automatically.
+        if not cls.mem_lb_client.is_version_supported(
+                cls.api_version, '2.1'):
+            user_list = cls.os_admin.users_v3_client.list_users(
+                name=CONF.load_balancer.octavia_svc_username)
+            msg = 'Only one user named "{0}" should exist, {1} found.'.format(
+                CONF.load_balancer.octavia_svc_username,
+                len(user_list['users']))
+            assert 1 == len(user_list['users']), msg
+            barbican_mgr.add_acl(new_secret_ref, user_list['users'][0]['id'])
+        return new_secret_ref
+
+    @classmethod
     def _generate_load_certificate(cls, barbican_mgr, ca_cert, ca_key, name):
         new_cert, new_key = cert_utils.generate_server_cert_and_key(
             ca_cert, ca_key, name)
@@ -72,20 +93,8 @@
         pkcs12 = cert_utils.generate_pkcs12_bundle(new_cert, new_key)
         LOG.debug('%s PKCS12 bundle: %s', name, base64.b64encode(pkcs12))
 
-        new_secret_ref = barbican_mgr.store_secret(pkcs12)
-        cls.addClassResourceCleanup(barbican_mgr.delete_secret, new_secret_ref)
+        new_secret_ref = cls._store_secret(barbican_mgr, pkcs12)
 
-        # Set the barbican ACL if the Octavia API version doesn't do it
-        # automatically.
-        if not cls.mem_lb_client.is_version_supported(
-                cls.api_version, '2.1'):
-            user_list = cls.os_admin.users_v3_client.list_users(
-                name=CONF.load_balancer.octavia_svc_username)
-            msg = 'Only one user named "{0}" should exist, {1} found.'.format(
-                CONF.load_balancer.octavia_svc_username,
-                len(user_list['users']))
-            assert 1 == len(user_list['users']), msg
-            barbican_mgr.add_acl(new_secret_ref, user_list['users'][0]['id'])
         return new_cert, new_key, new_secret_ref
 
     @classmethod
@@ -108,7 +117,7 @@
 
         # Load the secret into the barbican service under the
         # os_roles_lb_member tenant
-        barbican_mgr = barbican_client_mgr.BarbicanClientManager(
+        cls.barbican_mgr = barbican_client_mgr.BarbicanClientManager(
             cls.os_roles_lb_member)
 
         # Create a server cert and key
@@ -117,7 +126,7 @@
         LOG.debug('Server (default) UUID: %s' % cls.server_uuid)
 
         server_cert, server_key, cls.server_secret_ref = (
-            cls._generate_load_certificate(barbican_mgr, cls.ca_cert,
+            cls._generate_load_certificate(cls.barbican_mgr, cls.ca_cert,
                                            ca_key, cls.server_uuid))
 
         # Create the SNI1 cert and key
@@ -125,7 +134,7 @@
         LOG.debug('SNI1 UUID: %s' % cls.SNI1_uuid)
 
         SNI1_cert, SNI1_key, cls.SNI1_secret_ref = (
-            cls._generate_load_certificate(barbican_mgr, cls.ca_cert,
+            cls._generate_load_certificate(cls.barbican_mgr, cls.ca_cert,
                                            ca_key, cls.SNI1_uuid))
 
         # Create the SNI2 cert and key
@@ -133,9 +142,37 @@
         LOG.debug('SNI2 UUID: %s' % cls.SNI2_uuid)
 
         SNI2_cert, SNI2_key, cls.SNI2_secret_ref = (
-            cls._generate_load_certificate(barbican_mgr, cls.ca_cert,
+            cls._generate_load_certificate(cls.barbican_mgr, cls.ca_cert,
                                            ca_key, cls.SNI2_uuid))
 
+        # Create the client authentication CA
+        cls.client_ca_cert, client_ca_key = (
+            cert_utils.generate_ca_cert_and_key())
+
+        cls.client_ca_cert_ref = cls._store_secret(
+            cls.barbican_mgr,
+            cls.client_ca_cert.public_bytes(serialization.Encoding.PEM))
+
+        # Create client cert and key
+        cls.client_cn = uuidutils.generate_uuid()
+        cls.client_cert, cls.client_key = (
+            cert_utils.generate_client_cert_and_key(
+                cls.client_ca_cert, client_ca_key, cls.client_cn))
+
+        # Create revoked client cert and key
+        cls.revoked_client_cn = uuidutils.generate_uuid()
+        cls.revoked_client_cert, cls.revoked_client_key = (
+            cert_utils.generate_client_cert_and_key(
+                cls.client_ca_cert, client_ca_key, cls.revoked_client_cn))
+
+        # Create certificate revocation list and revoke cert
+        cls.client_crl = cert_utils.generate_certificate_revocation_list(
+            cls.client_ca_cert, client_ca_key, cls.revoked_client_cert)
+
+        cls.client_crl_ref = cls._store_secret(
+            cls.barbican_mgr,
+            cls.client_crl.public_bytes(serialization.Encoding.PEM))
+
         # Setup a load balancer for the tests to use
         lb_name = data_utils.rand_name("lb_member_lb1-tls")
         lb_kwargs = {const.PROVIDER: CONF.load_balancer.provider,
@@ -618,3 +655,384 @@
         sock.connect((self.lb_vip_address, 8443))
         # Validate the certificate is signed by the ca_cert we created
         sock.do_handshake()
+
+    @decorators.idempotent_id('af6bb7d2-acbb-4f6e-861f-39a2a3f02331')
+    def test_tls_client_auth_mandatory(self):
+        if not self.mem_listener_client.is_version_supported(
+                self.api_version, '2.8'):
+            raise self.skipException('TLS client authentication '
+                                     'is only available on Octavia API '
+                                     'version 2.8 or newer.')
+        LISTENER1_TCP_PORT = '443'
+        listener_name = data_utils.rand_name(
+            "lb_member_listener1-client-auth-mand")
+        listener_kwargs = {
+            const.NAME: listener_name,
+            const.PROTOCOL: const.TERMINATED_HTTPS,
+            const.PROTOCOL_PORT: LISTENER1_TCP_PORT,
+            const.LOADBALANCER_ID: self.lb_id,
+            const.DEFAULT_POOL_ID: self.pool_id,
+            const.DEFAULT_TLS_CONTAINER_REF: self.server_secret_ref,
+            const.CLIENT_AUTHENTICATION: const.CLIENT_AUTH_MANDATORY,
+            const.CLIENT_CA_TLS_CONTAINER_REF: self.client_ca_cert_ref,
+            const.CLIENT_CRL_CONTAINER_REF: self.client_crl_ref,
+        }
+        listener = self.mem_listener_client.create_listener(**listener_kwargs)
+        self.listener_id = listener[const.ID]
+        self.addCleanup(
+            self.mem_listener_client.cleanup_listener,
+            self.listener_id,
+            lb_client=self.mem_lb_client, lb_id=self.lb_id)
+
+        waiters.wait_for_status(self.mem_lb_client.show_loadbalancer,
+                                self.lb_id, const.PROVISIONING_STATUS,
+                                const.ACTIVE,
+                                CONF.load_balancer.build_interval,
+                                CONF.load_balancer.build_timeout)
+
+        # Test that no client certificate fails to connect
+        self.assertRaisesRegex(
+            requests.exceptions.SSLError, ".*certificate required.*",
+            requests.get,
+            'https://{0}:{1}'.format(self.lb_vip_address, LISTENER1_TCP_PORT),
+            timeout=12, verify=False)
+
+        # Test that a revoked client certificate fails to connect
+        with tempfile.NamedTemporaryFile(buffering=0) as cert_file:
+            cert_file.write(self.revoked_client_cert.public_bytes(
+                serialization.Encoding.PEM))
+            with tempfile.NamedTemporaryFile(buffering=0) as key_file:
+                key_file.write(self.revoked_client_key.private_bytes(
+                    serialization.Encoding.PEM,
+                    serialization.PrivateFormat.TraditionalOpenSSL,
+                    serialization.NoEncryption()))
+                self.assertRaisesRegex(
+                    requests.exceptions.SSLError, ".*revoked.*", requests.get,
+                    'https://{0}:{1}'.format(self.lb_vip_address,
+                                             LISTENER1_TCP_PORT),
+                    timeout=12, verify=False, cert=(cert_file.name,
+                                                    key_file.name))
+
+        # Test that a valid client certificate can connect
+        with tempfile.NamedTemporaryFile(buffering=0) as cert_file:
+            cert_file.write(self.client_cert.public_bytes(
+                serialization.Encoding.PEM))
+            with tempfile.NamedTemporaryFile(buffering=0) as key_file:
+                key_file.write(self.client_key.private_bytes(
+                    serialization.Encoding.PEM,
+                    serialization.PrivateFormat.TraditionalOpenSSL,
+                    serialization.NoEncryption()))
+                response = requests.get(
+                    'https://{0}:{1}'.format(self.lb_vip_address,
+                                             LISTENER1_TCP_PORT),
+                    timeout=12, verify=False, cert=(cert_file.name,
+                                                    key_file.name))
+                self.assertEqual(200, response.status_code)
+
+    @decorators.idempotent_id('42d696bf-e7f5-44f0-9331-4a5e01d69ef3')
+    def test_tls_client_auth_optional(self):
+        if not self.mem_listener_client.is_version_supported(
+                self.api_version, '2.8'):
+            raise self.skipException('TLS client authentication '
+                                     'is only available on Octavia API '
+                                     'version 2.8 or newer.')
+        LISTENER1_TCP_PORT = '443'
+        listener_name = data_utils.rand_name(
+            "lb_member_listener1-client-auth-optional")
+        listener_kwargs = {
+            const.NAME: listener_name,
+            const.PROTOCOL: const.TERMINATED_HTTPS,
+            const.PROTOCOL_PORT: LISTENER1_TCP_PORT,
+            const.LOADBALANCER_ID: self.lb_id,
+            const.DEFAULT_POOL_ID: self.pool_id,
+            const.DEFAULT_TLS_CONTAINER_REF: self.server_secret_ref,
+            const.CLIENT_AUTHENTICATION: const.CLIENT_AUTH_OPTIONAL,
+            const.CLIENT_CA_TLS_CONTAINER_REF: self.client_ca_cert_ref,
+            const.CLIENT_CRL_CONTAINER_REF: self.client_crl_ref,
+        }
+        listener = self.mem_listener_client.create_listener(**listener_kwargs)
+        self.listener_id = listener[const.ID]
+        self.addCleanup(
+            self.mem_listener_client.cleanup_listener,
+            self.listener_id,
+            lb_client=self.mem_lb_client, lb_id=self.lb_id)
+
+        waiters.wait_for_status(self.mem_lb_client.show_loadbalancer,
+                                self.lb_id, const.PROVISIONING_STATUS,
+                                const.ACTIVE,
+                                CONF.load_balancer.build_interval,
+                                CONF.load_balancer.build_timeout)
+
+        # Test that no client certificate connects
+        response = requests.get(
+            'https://{0}:{1}'.format(self.lb_vip_address, LISTENER1_TCP_PORT),
+            timeout=12, verify=False)
+        self.assertEqual(200, response.status_code)
+
+        # Test that a revoked client certificate fails to connect
+        with tempfile.NamedTemporaryFile(buffering=0) as cert_file:
+            cert_file.write(self.revoked_client_cert.public_bytes(
+                serialization.Encoding.PEM))
+            with tempfile.NamedTemporaryFile(buffering=0) as key_file:
+                key_file.write(self.revoked_client_key.private_bytes(
+                    serialization.Encoding.PEM,
+                    serialization.PrivateFormat.TraditionalOpenSSL,
+                    serialization.NoEncryption()))
+                self.assertRaisesRegex(
+                    requests.exceptions.SSLError, ".*revoked.*", requests.get,
+                    'https://{0}:{1}'.format(self.lb_vip_address,
+                                             LISTENER1_TCP_PORT),
+                    timeout=12, verify=False, cert=(cert_file.name,
+                                                    key_file.name))
+
+        # Test that a valid client certificate can connect
+        with tempfile.NamedTemporaryFile(buffering=0) as cert_file:
+            cert_file.write(self.client_cert.public_bytes(
+                serialization.Encoding.PEM))
+            with tempfile.NamedTemporaryFile(buffering=0) as key_file:
+                key_file.write(self.client_key.private_bytes(
+                    serialization.Encoding.PEM,
+                    serialization.PrivateFormat.TraditionalOpenSSL,
+                    serialization.NoEncryption()))
+                response = requests.get(
+                    'https://{0}:{1}'.format(self.lb_vip_address,
+                                             LISTENER1_TCP_PORT),
+                    timeout=12, verify=False, cert=(cert_file.name,
+                                                    key_file.name))
+                self.assertEqual(200, response.status_code)
+
+    @decorators.idempotent_id('13271ce6-f9f7-4017-a017-c2fc390b9438')
+    def test_tls_multi_listener_client_auth(self):
+        """Test client authentication in a multi-listener LB.
+
+        Validates that certificates and CRLs don't get cross configured
+        between multiple listeners on the same load balancer.
+        """
+        if not self.mem_listener_client.is_version_supported(
+                self.api_version, '2.8'):
+            raise self.skipException('TLS client authentication '
+                                     'is only available on Octavia API '
+                                     'version 2.8 or newer.')
+        # Create the client2 authentication CA
+        client2_ca_cert, client2_ca_key = (
+            cert_utils.generate_ca_cert_and_key())
+
+        client2_ca_cert_ref = self._store_secret(
+            self.barbican_mgr,
+            client2_ca_cert.public_bytes(serialization.Encoding.PEM))
+
+        # Create client2 cert and key
+        client2_cn = uuidutils.generate_uuid()
+        client2_cert, client2_key = (
+            cert_utils.generate_client_cert_and_key(
+                client2_ca_cert, client2_ca_key, client2_cn))
+
+        # Create revoked client2 cert and key
+        revoked_client2_cn = uuidutils.generate_uuid()
+        revoked_client2_cert, revoked_client2_key = (
+            cert_utils.generate_client_cert_and_key(
+                client2_ca_cert, client2_ca_key, revoked_client2_cn))
+
+        # Create certificate revocation list and revoke cert
+        client2_crl = cert_utils.generate_certificate_revocation_list(
+            client2_ca_cert, client2_ca_key, revoked_client2_cert)
+
+        client2_crl_ref = self._store_secret(
+            self.barbican_mgr,
+            client2_crl.public_bytes(serialization.Encoding.PEM))
+
+        LISTENER1_TCP_PORT = '443'
+        listener_name = data_utils.rand_name(
+            "lb_member_listener1-multi-list-client-auth")
+        listener_kwargs = {
+            const.NAME: listener_name,
+            const.PROTOCOL: const.TERMINATED_HTTPS,
+            const.PROTOCOL_PORT: LISTENER1_TCP_PORT,
+            const.LOADBALANCER_ID: self.lb_id,
+            const.DEFAULT_POOL_ID: self.pool_id,
+            const.DEFAULT_TLS_CONTAINER_REF: self.server_secret_ref,
+            const.CLIENT_AUTHENTICATION: const.CLIENT_AUTH_MANDATORY,
+            const.CLIENT_CA_TLS_CONTAINER_REF: self.client_ca_cert_ref,
+            const.CLIENT_CRL_CONTAINER_REF: self.client_crl_ref,
+        }
+        listener = self.mem_listener_client.create_listener(**listener_kwargs)
+        self.listener_id = listener[const.ID]
+        self.addCleanup(
+            self.mem_listener_client.cleanup_listener,
+            self.listener_id,
+            lb_client=self.mem_lb_client, lb_id=self.lb_id)
+
+        waiters.wait_for_status(self.mem_lb_client.show_loadbalancer,
+                                self.lb_id, const.PROVISIONING_STATUS,
+                                const.ACTIVE,
+                                CONF.load_balancer.build_interval,
+                                CONF.load_balancer.build_timeout)
+
+        LISTENER2_TCP_PORT = '8443'
+        listener_name = data_utils.rand_name(
+            "lb_member_listener2-multi-list-client-auth")
+        listener_kwargs = {
+            const.NAME: listener_name,
+            const.PROTOCOL: const.TERMINATED_HTTPS,
+            const.PROTOCOL_PORT: LISTENER2_TCP_PORT,
+            const.LOADBALANCER_ID: self.lb_id,
+            const.DEFAULT_POOL_ID: self.pool_id,
+            const.DEFAULT_TLS_CONTAINER_REF: self.server_secret_ref,
+            const.CLIENT_AUTHENTICATION: const.CLIENT_AUTH_MANDATORY,
+            const.CLIENT_CA_TLS_CONTAINER_REF: client2_ca_cert_ref,
+            const.CLIENT_CRL_CONTAINER_REF: client2_crl_ref,
+        }
+        listener2 = self.mem_listener_client.create_listener(**listener_kwargs)
+        self.listener2_id = listener2[const.ID]
+        self.addCleanup(
+            self.mem_listener_client.cleanup_listener,
+            self.listener2_id,
+            lb_client=self.mem_lb_client, lb_id=self.lb_id)
+
+        waiters.wait_for_status(self.mem_lb_client.show_loadbalancer,
+                                self.lb_id, const.PROVISIONING_STATUS,
+                                const.ACTIVE,
+                                CONF.load_balancer.build_interval,
+                                CONF.load_balancer.build_timeout)
+
+        # Test that no client certificate fails to connect to listener1
+        self.assertRaisesRegex(
+            requests.exceptions.SSLError, ".*certificate required.*",
+            requests.get,
+            'https://{0}:{1}'.format(self.lb_vip_address, LISTENER1_TCP_PORT),
+            timeout=12, verify=False)
+
+        # Test that no client certificate fails to connect to listener2
+        self.assertRaisesRegex(
+            requests.exceptions.SSLError, ".*certificate required.*",
+            requests.get,
+            'https://{0}:{1}'.format(self.lb_vip_address, LISTENER2_TCP_PORT),
+            timeout=12, verify=False)
+
+        # Test that a revoked client certificate fails to connect
+        with tempfile.NamedTemporaryFile(buffering=0) as cert_file:
+            cert_file.write(self.revoked_client_cert.public_bytes(
+                serialization.Encoding.PEM))
+            with tempfile.NamedTemporaryFile(buffering=0) as key_file:
+                key_file.write(self.revoked_client_key.private_bytes(
+                    serialization.Encoding.PEM,
+                    serialization.PrivateFormat.TraditionalOpenSSL,
+                    serialization.NoEncryption()))
+                self.assertRaisesRegex(
+                    requests.exceptions.SSLError, ".*revoked.*", requests.get,
+                    'https://{0}:{1}'.format(self.lb_vip_address,
+                                             LISTENER1_TCP_PORT),
+                    timeout=12, verify=False, cert=(cert_file.name,
+                                                    key_file.name))
+
+        # Test that a revoked client2 certificate fails to connect
+        with tempfile.NamedTemporaryFile(buffering=0) as cert_file:
+            cert_file.write(revoked_client2_cert.public_bytes(
+                serialization.Encoding.PEM))
+            with tempfile.NamedTemporaryFile(buffering=0) as key_file:
+                key_file.write(revoked_client2_key.private_bytes(
+                    serialization.Encoding.PEM,
+                    serialization.PrivateFormat.TraditionalOpenSSL,
+                    serialization.NoEncryption()))
+                self.assertRaisesRegex(
+                    requests.exceptions.SSLError, ".*revoked.*", requests.get,
+                    'https://{0}:{1}'.format(self.lb_vip_address,
+                                             LISTENER2_TCP_PORT),
+                    timeout=12, verify=False, cert=(cert_file.name,
+                                                    key_file.name))
+
+        # Test that a valid client certificate can connect to listener1
+        with tempfile.NamedTemporaryFile(buffering=0) as cert_file:
+            cert_file.write(self.client_cert.public_bytes(
+                serialization.Encoding.PEM))
+            with tempfile.NamedTemporaryFile(buffering=0) as key_file:
+                key_file.write(self.client_key.private_bytes(
+                    serialization.Encoding.PEM,
+                    serialization.PrivateFormat.TraditionalOpenSSL,
+                    serialization.NoEncryption()))
+                response = requests.get(
+                    'https://{0}:{1}'.format(self.lb_vip_address,
+                                             LISTENER1_TCP_PORT),
+                    timeout=12, verify=False, cert=(cert_file.name,
+                                                    key_file.name))
+                self.assertEqual(200, response.status_code)
+
+        # Test that a valid client2 certificate can connect to listener2
+        with tempfile.NamedTemporaryFile(buffering=0) as cert_file:
+            cert_file.write(client2_cert.public_bytes(
+                serialization.Encoding.PEM))
+            with tempfile.NamedTemporaryFile(buffering=0) as key_file:
+                key_file.write(client2_key.private_bytes(
+                    serialization.Encoding.PEM,
+                    serialization.PrivateFormat.TraditionalOpenSSL,
+                    serialization.NoEncryption()))
+                response = requests.get(
+                    'https://{0}:{1}'.format(self.lb_vip_address,
+                                             LISTENER2_TCP_PORT),
+                    timeout=12, verify=False, cert=(cert_file.name,
+                                                    key_file.name))
+                self.assertEqual(200, response.status_code)
+
+        # Test that a valid client1 certificate can not connect to listener2
+        with tempfile.NamedTemporaryFile(buffering=0) as cert_file:
+            cert_file.write(self.client_cert.public_bytes(
+                serialization.Encoding.PEM))
+            with tempfile.NamedTemporaryFile(buffering=0) as key_file:
+                key_file.write(self.client_key.private_bytes(
+                    serialization.Encoding.PEM,
+                    serialization.PrivateFormat.TraditionalOpenSSL,
+                    serialization.NoEncryption()))
+                self.assertRaisesRegex(
+                    requests.exceptions.SSLError, ".*decrypt error.*",
+                    requests.get, 'https://{0}:{1}'.format(self.lb_vip_address,
+                                                           LISTENER2_TCP_PORT),
+                    timeout=12, verify=False, cert=(cert_file.name,
+                                                    key_file.name))
+
+        # Test that a valid client2 certificate can not connect to listener1
+        with tempfile.NamedTemporaryFile(buffering=0) as cert_file:
+            cert_file.write(client2_cert.public_bytes(
+                serialization.Encoding.PEM))
+            with tempfile.NamedTemporaryFile(buffering=0) as key_file:
+                key_file.write(client2_key.private_bytes(
+                    serialization.Encoding.PEM,
+                    serialization.PrivateFormat.TraditionalOpenSSL,
+                    serialization.NoEncryption()))
+                self.assertRaisesRegex(
+                    requests.exceptions.SSLError, ".*decrypt error.*",
+                    requests.get, 'https://{0}:{1}'.format(self.lb_vip_address,
+                                                           LISTENER1_TCP_PORT),
+                    timeout=12, verify=False, cert=(cert_file.name,
+                                                    key_file.name))
+
+        # Test that a revoked client1 certificate can not connect to listener2
+        with tempfile.NamedTemporaryFile(buffering=0) as cert_file:
+            cert_file.write(self.revoked_client_cert.public_bytes(
+                serialization.Encoding.PEM))
+            with tempfile.NamedTemporaryFile(buffering=0) as key_file:
+                key_file.write(self.revoked_client_key.private_bytes(
+                    serialization.Encoding.PEM,
+                    serialization.PrivateFormat.TraditionalOpenSSL,
+                    serialization.NoEncryption()))
+                self.assertRaisesRegex(
+                    requests.exceptions.SSLError, ".*decrypt error.*",
+                    requests.get, 'https://{0}:{1}'.format(self.lb_vip_address,
+                                                           LISTENER2_TCP_PORT),
+                    timeout=12, verify=False, cert=(cert_file.name,
+                                                    key_file.name))
+
+        # Test that a revoked client2 certificate can not connect to listener1
+        with tempfile.NamedTemporaryFile(buffering=0) as cert_file:
+            cert_file.write(revoked_client2_cert.public_bytes(
+                serialization.Encoding.PEM))
+            with tempfile.NamedTemporaryFile(buffering=0) as key_file:
+                key_file.write(revoked_client2_key.private_bytes(
+                    serialization.Encoding.PEM,
+                    serialization.PrivateFormat.TraditionalOpenSSL,
+                    serialization.NoEncryption()))
+                self.assertRaisesRegex(
+                    requests.exceptions.SSLError, ".*decrypt error.*",
+                    requests.get, 'https://{0}:{1}'.format(self.lb_vip_address,
+                                                           LISTENER1_TCP_PORT),
+                    timeout=12, verify=False, cert=(cert_file.name,
+                                                    key_file.name))
diff --git a/releasenotes/notes/client-auth-scenario-bffa420a2fd38159.yaml b/releasenotes/notes/client-auth-scenario-bffa420a2fd38159.yaml
new file mode 100644
index 0000000..3e44be4
--- /dev/null
+++ b/releasenotes/notes/client-auth-scenario-bffa420a2fd38159.yaml
@@ -0,0 +1,4 @@
+---
+features:
+  - |
+    Adds scenario tests for listener client authentication.