Merge "Add listener client authentication scenario tests"
diff --git a/octavia_tempest_plugin/common/barbican_client_mgr.py b/octavia_tempest_plugin/common/barbican_client_mgr.py
index e93f903..eba1715 100644
--- a/octavia_tempest_plugin/common/barbican_client_mgr.py
+++ b/octavia_tempest_plugin/common/barbican_client_mgr.py
@@ -63,15 +63,15 @@
# Setup the barbican client
self.barbican = client.Client(session=id_session)
- def store_secret(self, pkcs12_secret):
+ def store_secret(self, secret):
"""Store a secret in barbican.
- :param pkcs12_secret: A pkcs12 secret.
+ :param secret: A pkcs12 secret.
:returns: The barbican secret_ref.
"""
p12_secret = self.barbican.secrets.create()
- p12_secret.name = data_utils.rand_name("lb_member_barbican_pkcs12")
- p12_secret.payload = pkcs12_secret
+ p12_secret.name = data_utils.rand_name("lb_member_barbican")
+ p12_secret.payload = secret
secret_ref = p12_secret.store()
LOG.debug('Secret {0} has ref {1}'.format(p12_secret.name, secret_ref))
return secret_ref
diff --git a/octavia_tempest_plugin/common/cert_utils.py b/octavia_tempest_plugin/common/cert_utils.py
index dcdd6f0..bb8cdb3 100644
--- a/octavia_tempest_plugin/common/cert_utils.py
+++ b/octavia_tempest_plugin/common/cert_utils.py
@@ -58,6 +58,13 @@
).add_extension(
x509.BasicConstraints(ca=True, path_length=None),
critical=True,
+ ).add_extension(
+ # KeyUsage(digital_signature, content_commitment, key_encipherment,
+ # data_encipherment, key_agreement, key_cert_sign, crl_sign,
+ # encipher_only, decipher_only)
+ x509.KeyUsage(True, False, False, False, False,
+ True, True, False, False),
+ critical=True,
).sign(ca_key, hashes.SHA256(), default_backend())
return ca_cert, ca_key
@@ -104,11 +111,66 @@
).add_extension(
x509.BasicConstraints(ca=False, path_length=None),
critical=True,
+ ).add_extension(
+ # KeyUsage(digital_signature, content_commitment, key_encipherment,
+ # data_encipherment, key_agreement, key_cert_sign, crl_sign,
+ # encipher_only, decipher_only)
+ x509.KeyUsage(True, False, True, False, False,
+ False, False, False, False),
+ critical=True,
).sign(ca_key, hashes.SHA256(), default_backend())
return server_cert, server_key
+def generate_client_cert_and_key(ca_cert, ca_key, client_uuid):
+ """Creates a client cert and key for testing.
+
+ :param ca_cert: A cryptography CA certificate (x509) object.
+ :param ca_key: A cryptography CA key (x509) object.
+ :param client_uuid: A UUID identifying the client.
+ :returns: The cryptography server cert and key objects.
+ """
+
+ client_key = rsa.generate_private_key(
+ public_exponent=65537, key_size=2048, backend=default_backend())
+
+ subject = x509.Name([
+ x509.NameAttribute(NameOID.COUNTRY_NAME, u"US"),
+ x509.NameAttribute(NameOID.STATE_OR_PROVINCE_NAME, u"Denial"),
+ x509.NameAttribute(NameOID.LOCALITY_NAME, u"Corvallis"),
+ x509.NameAttribute(NameOID.ORGANIZATION_NAME, u"OpenStack"),
+ x509.NameAttribute(NameOID.ORGANIZATIONAL_UNIT_NAME, u"Octavia"),
+ x509.NameAttribute(NameOID.COMMON_NAME, u"{}".format(client_uuid)),
+ ])
+
+ client_cert = x509.CertificateBuilder().subject_name(
+ subject
+ ).issuer_name(
+ ca_cert.subject
+ ).public_key(
+ client_key.public_key()
+ ).serial_number(
+ x509.random_serial_number()
+ ).not_valid_before(
+ datetime.datetime.utcnow()
+ ).not_valid_after(
+ datetime.datetime.utcnow() + datetime.timedelta(days=10)
+ ).add_extension(
+ x509.BasicConstraints(ca=False, path_length=None),
+ critical=True,
+ ).add_extension(
+ # KeyUsage(digital_signature, content_commitment, key_encipherment,
+ # data_encipherment, key_agreement, key_cert_sign, crl_sign,
+ # encipher_only, decipher_only)
+ x509.KeyUsage(True, True, True, False, False, False,
+ False, False, False),
+ critical=True,
+ ).sign(ca_key, hashes.SHA256(), default_backend())
+
+ return client_cert, client_key
+
+
def generate_pkcs12_bundle(server_cert, server_key):
"""Creates a pkcs12 formated bundle.
@@ -128,3 +190,28 @@
OpenSSL.crypto.PKey.from_cryptography_key(server_key))
pkcs12.set_certificate(OpenSSL.crypto.X509.from_cryptography(server_cert))
return pkcs12.export()
+
+
+def generate_certificate_revocation_list(ca_cert, ca_key, cert_to_revoke):
+ """Create a certificate revocation list with a revoked certificate.
+
+ :param ca_cert: A cryptography CA certificate (x509) object.
+ :param ca_key: A cryptography CA key (x509) object.
+ :param cert_to_revoke: A cryptography CA certificate (x509) object.
+ :returns: A signed certificate revocation list.
+ """
+ crl_builder = x509.CertificateRevocationListBuilder()
+ crl_builder = crl_builder.issuer_name(ca_cert.subject)
+ crl_builder = crl_builder.last_update(datetime.datetime.today())
+ crl_builder = crl_builder.next_update(datetime.datetime.today() +
+ datetime.timedelta(1, 0, 0))
+
+ revoked_cert = x509.RevokedCertificateBuilder().serial_number(
+ cert_to_revoke.serial_number
+ ).revocation_date(
+ datetime.datetime.today()
+ ).build(default_backend())
+
+ crl_builder = crl_builder.add_revoked_certificate(revoked_cert)
+ return crl_builder.sign(private_key=ca_key, algorithm=hashes.SHA256(),
+ backend=default_backend())
diff --git a/octavia_tempest_plugin/common/constants.py b/octavia_tempest_plugin/common/constants.py
index d4e1755..6c65245 100644
--- a/octavia_tempest_plugin/common/constants.py
+++ b/octavia_tempest_plugin/common/constants.py
@@ -20,6 +20,12 @@
ADMIN_STATE_UP = 'admin_state_up'
BYTES_IN = 'bytes_in'
BYTES_OUT = 'bytes_out'
+CLIENT_AUTHENTICATION = 'client_authentication'
+CLIENT_AUTH_NONE = 'NONE'
+CLIENT_AUTH_OPTIONAL = 'OPTIONAL'
+CLIENT_AUTH_MANDATORY = 'MANDATORY'
+CLIENT_CA_TLS_CONTAINER_REF = 'client_ca_tls_container_ref'
+CLIENT_CRL_CONTAINER_REF = 'client_crl_container_ref'
CREATED_AT = 'created_at'
DESCRIPTION = 'description'
FLAVOR_DATA = 'flavor_data'
diff --git a/octavia_tempest_plugin/services/load_balancer/v2/listener_client.py b/octavia_tempest_plugin/services/load_balancer/v2/listener_client.py
index de84bd1..1cc17ff 100644
--- a/octavia_tempest_plugin/services/load_balancer/v2/listener_client.py
+++ b/octavia_tempest_plugin/services/load_balancer/v2/listener_client.py
@@ -35,7 +35,10 @@
timeout_member_data=Unset, timeout_tcp_inspect=Unset,
insert_headers=Unset, default_pool_id=Unset,
default_tls_container_ref=Unset,
- sni_container_refs=Unset, return_object_only=True):
+ sni_container_refs=Unset, client_authentication=Unset,
+ client_ca_tls_container_ref=Unset,
+ client_crl_container_ref=Unset,
+ return_object_only=True):
"""Create a listener.
:param protocol: The protocol for the resource.
@@ -70,6 +73,17 @@
secrets containing PKCS12 format
certificate/key bundles for TERMINATED_TLS
listeners.
+ :param client_authentication: The TLS client authentication mode. One
+ of the options NONE, OPTIONAL or
+ MANDATORY.
+ :param client_ca_tls_container_ref: The ref of the key manager service
+ secret containing a PEM format
+ client CA certificate bundle for
+ TERMINATED_HTTPS listeners.
+ :param client_crl_container_ref: The URI of the key manager service
+ secret containing a PEM format CA
+ revocation list file for
+ TERMINATED_HTTPS listeners.
:param return_object_only: If True, the response returns the object
inside the root tag. False returns the full
response from the API.
@@ -190,7 +204,10 @@
timeout_member_data=Unset, timeout_tcp_inspect=Unset,
insert_headers=Unset, default_pool_id=Unset,
default_tls_container_ref=Unset,
- sni_container_refs=Unset, return_object_only=True):
+ sni_container_refs=Unset, client_authentication=Unset,
+ client_ca_tls_container_ref=Unset,
+ client_crl_container_ref=Unset,
+ return_object_only=True):
"""Update a listener.
:param listener_id: The listener ID to update.
@@ -223,6 +240,17 @@
secrets containing PKCS12 format
certificate/key bundles for TERMINATED_TLS
listeners.
+ :param client_authentication: The TLS client authentication mode. One
+ of the options NONE, OPTIONAL or
+ MANDATORY.
+ :param client_ca_tls_container_ref: The ref of the key manager service
+ secret containing a PEM format
+ client CA certificate bundle for
+ TERMINATED_HTTPS listeners.
+ :param client_crl_container_ref: The URI of the key manager service
+ secret containing a PEM format CA
+ revocation list file for
+ TERMINATED_HTTPS listeners.
:param return_object_only: If True, the response returns the object
inside the root tag. False returns the full
response from the API.
diff --git a/octavia_tempest_plugin/tests/barbican_scenario/v2/test_tls_barbican.py b/octavia_tempest_plugin/tests/barbican_scenario/v2/test_tls_barbican.py
index 0fe1d81..5f7ad51 100644
--- a/octavia_tempest_plugin/tests/barbican_scenario/v2/test_tls_barbican.py
+++ b/octavia_tempest_plugin/tests/barbican_scenario/v2/test_tls_barbican.py
@@ -13,7 +13,9 @@
# under the License.
import base64
+import requests
import socket
+import tempfile
from cryptography.hazmat.primitives import serialization
from OpenSSL.crypto import X509
@@ -53,6 +55,25 @@
'barbican service.')
@classmethod
+ def _store_secret(cls, barbican_mgr, secret):
+ new_secret_ref = barbican_mgr.store_secret(secret)
+ cls.addClassResourceCleanup(barbican_mgr.delete_secret,
+ new_secret_ref)
+
+ # Set the barbican ACL if the Octavia API version doesn't do it
+ # automatically.
+ if not cls.mem_lb_client.is_version_supported(
+ cls.api_version, '2.1'):
+ user_list = cls.os_admin.users_v3_client.list_users(
+ name=CONF.load_balancer.octavia_svc_username)
+ msg = 'Only one user named "{0}" should exist, {1} found.'.format(
+ CONF.load_balancer.octavia_svc_username,
+ len(user_list['users']))
+ assert 1 == len(user_list['users']), msg
+ barbican_mgr.add_acl(new_secret_ref, user_list['users'][0]['id'])
+ return new_secret_ref
+
+ @classmethod
def _generate_load_certificate(cls, barbican_mgr, ca_cert, ca_key, name):
new_cert, new_key = cert_utils.generate_server_cert_and_key(
ca_cert, ca_key, name)
@@ -72,20 +93,8 @@
pkcs12 = cert_utils.generate_pkcs12_bundle(new_cert, new_key)
LOG.debug('%s PKCS12 bundle: %s', name, base64.b64encode(pkcs12))
- new_secret_ref = barbican_mgr.store_secret(pkcs12)
- cls.addClassResourceCleanup(barbican_mgr.delete_secret, new_secret_ref)
+ new_secret_ref = cls._store_secret(barbican_mgr, pkcs12)
- # Set the barbican ACL if the Octavia API version doesn't do it
- # automatically.
- if not cls.mem_lb_client.is_version_supported(
- cls.api_version, '2.1'):
- user_list = cls.os_admin.users_v3_client.list_users(
- name=CONF.load_balancer.octavia_svc_username)
- msg = 'Only one user named "{0}" should exist, {1} found.'.format(
- CONF.load_balancer.octavia_svc_username,
- len(user_list['users']))
- assert 1 == len(user_list['users']), msg
- barbican_mgr.add_acl(new_secret_ref, user_list['users'][0]['id'])
return new_cert, new_key, new_secret_ref
@classmethod
@@ -108,7 +117,7 @@
# Load the secret into the barbican service under the
# os_roles_lb_member tenant
- barbican_mgr = barbican_client_mgr.BarbicanClientManager(
+ cls.barbican_mgr = barbican_client_mgr.BarbicanClientManager(
cls.os_roles_lb_member)
# Create a server cert and key
@@ -117,7 +126,7 @@
LOG.debug('Server (default) UUID: %s' % cls.server_uuid)
server_cert, server_key, cls.server_secret_ref = (
- cls._generate_load_certificate(barbican_mgr, cls.ca_cert,
+ cls._generate_load_certificate(cls.barbican_mgr, cls.ca_cert,
ca_key, cls.server_uuid))
# Create the SNI1 cert and key
@@ -125,7 +134,7 @@
LOG.debug('SNI1 UUID: %s' % cls.SNI1_uuid)
SNI1_cert, SNI1_key, cls.SNI1_secret_ref = (
- cls._generate_load_certificate(barbican_mgr, cls.ca_cert,
+ cls._generate_load_certificate(cls.barbican_mgr, cls.ca_cert,
ca_key, cls.SNI1_uuid))
# Create the SNI2 cert and key
@@ -133,9 +142,37 @@
LOG.debug('SNI2 UUID: %s' % cls.SNI2_uuid)
SNI2_cert, SNI2_key, cls.SNI2_secret_ref = (
- cls._generate_load_certificate(barbican_mgr, cls.ca_cert,
+ cls._generate_load_certificate(cls.barbican_mgr, cls.ca_cert,
ca_key, cls.SNI2_uuid))
+ # Create the client authentication CA
+ cls.client_ca_cert, client_ca_key = (
+ cert_utils.generate_ca_cert_and_key())
+
+ cls.client_ca_cert_ref = cls._store_secret(
+ cls.barbican_mgr,
+ cls.client_ca_cert.public_bytes(serialization.Encoding.PEM))
+
+ # Create client cert and key
+ cls.client_cn = uuidutils.generate_uuid()
+ cls.client_cert, cls.client_key = (
+ cert_utils.generate_client_cert_and_key(
+ cls.client_ca_cert, client_ca_key, cls.client_cn))
+
+ # Create revoked client cert and key
+ cls.revoked_client_cn = uuidutils.generate_uuid()
+ cls.revoked_client_cert, cls.revoked_client_key = (
+ cert_utils.generate_client_cert_and_key(
+ cls.client_ca_cert, client_ca_key, cls.revoked_client_cn))
+
+ # Create certificate revocation list and revoke cert
+ cls.client_crl = cert_utils.generate_certificate_revocation_list(
+ cls.client_ca_cert, client_ca_key, cls.revoked_client_cert)
+
+ cls.client_crl_ref = cls._store_secret(
+ cls.barbican_mgr,
+ cls.client_crl.public_bytes(serialization.Encoding.PEM))
+
# Setup a load balancer for the tests to use
lb_name = data_utils.rand_name("lb_member_lb1-tls")
lb_kwargs = {const.PROVIDER: CONF.load_balancer.provider,
@@ -618,3 +655,384 @@
sock.connect((self.lb_vip_address, 8443))
# Validate the certificate is signed by the ca_cert we created
sock.do_handshake()
+
+ @decorators.idempotent_id('af6bb7d2-acbb-4f6e-861f-39a2a3f02331')
+ def test_tls_client_auth_mandatory(self):
+ if not self.mem_listener_client.is_version_supported(
+ self.api_version, '2.8'):
+ raise self.skipException('TLS client authentication '
+ 'is only available on Octavia API '
+ 'version 2.8 or newer.')
+ LISTENER1_TCP_PORT = '443'
+ listener_name = data_utils.rand_name(
+ "lb_member_listener1-client-auth-mand")
+ listener_kwargs = {
+ const.NAME: listener_name,
+ const.PROTOCOL: const.TERMINATED_HTTPS,
+ const.PROTOCOL_PORT: LISTENER1_TCP_PORT,
+ const.LOADBALANCER_ID: self.lb_id,
+ const.DEFAULT_POOL_ID: self.pool_id,
+ const.DEFAULT_TLS_CONTAINER_REF: self.server_secret_ref,
+ const.CLIENT_AUTHENTICATION: const.CLIENT_AUTH_MANDATORY,
+ const.CLIENT_CA_TLS_CONTAINER_REF: self.client_ca_cert_ref,
+ const.CLIENT_CRL_CONTAINER_REF: self.client_crl_ref,
+ }
+ listener = self.mem_listener_client.create_listener(**listener_kwargs)
+ self.listener_id = listener[const.ID]
+ self.addCleanup(
+ self.mem_listener_client.cleanup_listener,
+ self.listener_id,
+ lb_client=self.mem_lb_client, lb_id=self.lb_id)
+
+ waiters.wait_for_status(self.mem_lb_client.show_loadbalancer,
+ self.lb_id, const.PROVISIONING_STATUS,
+ const.ACTIVE,
+ CONF.load_balancer.build_interval,
+ CONF.load_balancer.build_timeout)
+
+ # Test that no client certificate fails to connect
+ self.assertRaisesRegex(
+ requests.exceptions.SSLError, ".*certificate required.*",
+ requests.get,
+ 'https://{0}:{1}'.format(self.lb_vip_address, LISTENER1_TCP_PORT),
+ timeout=12, verify=False)
+
+ # Test that a revoked client certificate fails to connect
+ with tempfile.NamedTemporaryFile(buffering=0) as cert_file:
+ cert_file.write(self.revoked_client_cert.public_bytes(
+ serialization.Encoding.PEM))
+ with tempfile.NamedTemporaryFile(buffering=0) as key_file:
+ key_file.write(self.revoked_client_key.private_bytes(
+ serialization.Encoding.PEM,
+ serialization.PrivateFormat.TraditionalOpenSSL,
+ serialization.NoEncryption()))
+ self.assertRaisesRegex(
+ requests.exceptions.SSLError, ".*revoked.*", requests.get,
+ 'https://{0}:{1}'.format(self.lb_vip_address,
+ LISTENER1_TCP_PORT),
+ timeout=12, verify=False, cert=(cert_file.name,
+ key_file.name))
+
+ # Test that a valid client certificate can connect
+ with tempfile.NamedTemporaryFile(buffering=0) as cert_file:
+ cert_file.write(self.client_cert.public_bytes(
+ serialization.Encoding.PEM))
+ with tempfile.NamedTemporaryFile(buffering=0) as key_file:
+ key_file.write(self.client_key.private_bytes(
+ serialization.Encoding.PEM,
+ serialization.PrivateFormat.TraditionalOpenSSL,
+ serialization.NoEncryption()))
+ response = requests.get(
+ 'https://{0}:{1}'.format(self.lb_vip_address,
+ LISTENER1_TCP_PORT),
+ timeout=12, verify=False, cert=(cert_file.name,
+ key_file.name))
+ self.assertEqual(200, response.status_code)
+
+ @decorators.idempotent_id('42d696bf-e7f5-44f0-9331-4a5e01d69ef3')
+ def test_tls_client_auth_optional(self):
+ if not self.mem_listener_client.is_version_supported(
+ self.api_version, '2.8'):
+ raise self.skipException('TLS client authentication '
+ 'is only available on Octavia API '
+ 'version 2.8 or newer.')
+ LISTENER1_TCP_PORT = '443'
+ listener_name = data_utils.rand_name(
+ "lb_member_listener1-client-auth-optional")
+ listener_kwargs = {
+ const.NAME: listener_name,
+ const.PROTOCOL: const.TERMINATED_HTTPS,
+ const.PROTOCOL_PORT: LISTENER1_TCP_PORT,
+ const.LOADBALANCER_ID: self.lb_id,
+ const.DEFAULT_POOL_ID: self.pool_id,
+ const.DEFAULT_TLS_CONTAINER_REF: self.server_secret_ref,
+ const.CLIENT_AUTHENTICATION: const.CLIENT_AUTH_OPTIONAL,
+ const.CLIENT_CA_TLS_CONTAINER_REF: self.client_ca_cert_ref,
+ const.CLIENT_CRL_CONTAINER_REF: self.client_crl_ref,
+ }
+ listener = self.mem_listener_client.create_listener(**listener_kwargs)
+ self.listener_id = listener[const.ID]
+ self.addCleanup(
+ self.mem_listener_client.cleanup_listener,
+ self.listener_id,
+ lb_client=self.mem_lb_client, lb_id=self.lb_id)
+
+ waiters.wait_for_status(self.mem_lb_client.show_loadbalancer,
+ self.lb_id, const.PROVISIONING_STATUS,
+ const.ACTIVE,
+ CONF.load_balancer.build_interval,
+ CONF.load_balancer.build_timeout)
+
+ # Test that no client certificate connects
+ response = requests.get(
+ 'https://{0}:{1}'.format(self.lb_vip_address, LISTENER1_TCP_PORT),
+ timeout=12, verify=False)
+ self.assertEqual(200, response.status_code)
+
+ # Test that a revoked client certificate fails to connect
+ with tempfile.NamedTemporaryFile(buffering=0) as cert_file:
+ cert_file.write(self.revoked_client_cert.public_bytes(
+ serialization.Encoding.PEM))
+ with tempfile.NamedTemporaryFile(buffering=0) as key_file:
+ key_file.write(self.revoked_client_key.private_bytes(
+ serialization.Encoding.PEM,
+ serialization.PrivateFormat.TraditionalOpenSSL,
+ serialization.NoEncryption()))
+ self.assertRaisesRegex(
+ requests.exceptions.SSLError, ".*revoked.*", requests.get,
+ 'https://{0}:{1}'.format(self.lb_vip_address,
+ LISTENER1_TCP_PORT),
+ timeout=12, verify=False, cert=(cert_file.name,
+ key_file.name))
+
+ # Test that a valid client certificate can connect
+ with tempfile.NamedTemporaryFile(buffering=0) as cert_file:
+ cert_file.write(self.client_cert.public_bytes(
+ serialization.Encoding.PEM))
+ with tempfile.NamedTemporaryFile(buffering=0) as key_file:
+ key_file.write(self.client_key.private_bytes(
+ serialization.Encoding.PEM,
+ serialization.PrivateFormat.TraditionalOpenSSL,
+ serialization.NoEncryption()))
+ response = requests.get(
+ 'https://{0}:{1}'.format(self.lb_vip_address,
+ LISTENER1_TCP_PORT),
+ timeout=12, verify=False, cert=(cert_file.name,
+ key_file.name))
+ self.assertEqual(200, response.status_code)
+
+ @decorators.idempotent_id('13271ce6-f9f7-4017-a017-c2fc390b9438')
+ def test_tls_multi_listener_client_auth(self):
+ """Test client authentication in a multi-listener LB.
+
+ Validates that certificates and CRLs don't get cross configured
+ between multiple listeners on the same load balancer.
+ """
+ if not self.mem_listener_client.is_version_supported(
+ self.api_version, '2.8'):
+ raise self.skipException('TLS client authentication '
+ 'is only available on Octavia API '
+ 'version 2.8 or newer.')
+ # Create the client2 authentication CA
+ client2_ca_cert, client2_ca_key = (
+ cert_utils.generate_ca_cert_and_key())
+
+ client2_ca_cert_ref = self._store_secret(
+ self.barbican_mgr,
+ client2_ca_cert.public_bytes(serialization.Encoding.PEM))
+
+ # Create client2 cert and key
+ client2_cn = uuidutils.generate_uuid()
+ client2_cert, client2_key = (
+ cert_utils.generate_client_cert_and_key(
+ client2_ca_cert, client2_ca_key, client2_cn))
+
+ # Create revoked client2 cert and key
+ revoked_client2_cn = uuidutils.generate_uuid()
+ revoked_client2_cert, revoked_client2_key = (
+ cert_utils.generate_client_cert_and_key(
+ client2_ca_cert, client2_ca_key, revoked_client2_cn))
+
+ # Create certificate revocation list and revoke cert
+ client2_crl = cert_utils.generate_certificate_revocation_list(
+ client2_ca_cert, client2_ca_key, revoked_client2_cert)
+
+ client2_crl_ref = self._store_secret(
+ self.barbican_mgr,
+ client2_crl.public_bytes(serialization.Encoding.PEM))
+
+ LISTENER1_TCP_PORT = '443'
+ listener_name = data_utils.rand_name(
+ "lb_member_listener1-multi-list-client-auth")
+ listener_kwargs = {
+ const.NAME: listener_name,
+ const.PROTOCOL: const.TERMINATED_HTTPS,
+ const.PROTOCOL_PORT: LISTENER1_TCP_PORT,
+ const.LOADBALANCER_ID: self.lb_id,
+ const.DEFAULT_POOL_ID: self.pool_id,
+ const.DEFAULT_TLS_CONTAINER_REF: self.server_secret_ref,
+ const.CLIENT_AUTHENTICATION: const.CLIENT_AUTH_MANDATORY,
+ const.CLIENT_CA_TLS_CONTAINER_REF: self.client_ca_cert_ref,
+ const.CLIENT_CRL_CONTAINER_REF: self.client_crl_ref,
+ }
+ listener = self.mem_listener_client.create_listener(**listener_kwargs)
+ self.listener_id = listener[const.ID]
+ self.addCleanup(
+ self.mem_listener_client.cleanup_listener,
+ self.listener_id,
+ lb_client=self.mem_lb_client, lb_id=self.lb_id)
+
+ waiters.wait_for_status(self.mem_lb_client.show_loadbalancer,
+ self.lb_id, const.PROVISIONING_STATUS,
+ const.ACTIVE,
+ CONF.load_balancer.build_interval,
+ CONF.load_balancer.build_timeout)
+
+ LISTENER2_TCP_PORT = '8443'
+ listener_name = data_utils.rand_name(
+ "lb_member_listener2-multi-list-client-auth")
+ listener_kwargs = {
+ const.NAME: listener_name,
+ const.PROTOCOL: const.TERMINATED_HTTPS,
+ const.PROTOCOL_PORT: LISTENER2_TCP_PORT,
+ const.LOADBALANCER_ID: self.lb_id,
+ const.DEFAULT_POOL_ID: self.pool_id,
+ const.DEFAULT_TLS_CONTAINER_REF: self.server_secret_ref,
+ const.CLIENT_AUTHENTICATION: const.CLIENT_AUTH_MANDATORY,
+ const.CLIENT_CA_TLS_CONTAINER_REF: client2_ca_cert_ref,
+ const.CLIENT_CRL_CONTAINER_REF: client2_crl_ref,
+ }
+ listener2 = self.mem_listener_client.create_listener(**listener_kwargs)
+ self.listener2_id = listener2[const.ID]
+ self.addCleanup(
+ self.mem_listener_client.cleanup_listener,
+ self.listener2_id,
+ lb_client=self.mem_lb_client, lb_id=self.lb_id)
+
+ waiters.wait_for_status(self.mem_lb_client.show_loadbalancer,
+ self.lb_id, const.PROVISIONING_STATUS,
+ const.ACTIVE,
+ CONF.load_balancer.build_interval,
+ CONF.load_balancer.build_timeout)
+
+ # Test that no client certificate fails to connect to listener1
+ self.assertRaisesRegex(
+ requests.exceptions.SSLError, ".*certificate required.*",
+ requests.get,
+ 'https://{0}:{1}'.format(self.lb_vip_address, LISTENER1_TCP_PORT),
+ timeout=12, verify=False)
+
+ # Test that no client certificate fails to connect to listener2
+ self.assertRaisesRegex(
+ requests.exceptions.SSLError, ".*certificate required.*",
+ requests.get,
+ 'https://{0}:{1}'.format(self.lb_vip_address, LISTENER2_TCP_PORT),
+ timeout=12, verify=False)
+
+ # Test that a revoked client certificate fails to connect
+ with tempfile.NamedTemporaryFile(buffering=0) as cert_file:
+ cert_file.write(self.revoked_client_cert.public_bytes(
+ serialization.Encoding.PEM))
+ with tempfile.NamedTemporaryFile(buffering=0) as key_file:
+ key_file.write(self.revoked_client_key.private_bytes(
+ serialization.Encoding.PEM,
+ serialization.PrivateFormat.TraditionalOpenSSL,
+ serialization.NoEncryption()))
+ self.assertRaisesRegex(
+ requests.exceptions.SSLError, ".*revoked.*", requests.get,
+ 'https://{0}:{1}'.format(self.lb_vip_address,
+ LISTENER1_TCP_PORT),
+ timeout=12, verify=False, cert=(cert_file.name,
+ key_file.name))
+
+ # Test that a revoked client2 certificate fails to connect
+ with tempfile.NamedTemporaryFile(buffering=0) as cert_file:
+ cert_file.write(revoked_client2_cert.public_bytes(
+ serialization.Encoding.PEM))
+ with tempfile.NamedTemporaryFile(buffering=0) as key_file:
+ key_file.write(revoked_client2_key.private_bytes(
+ serialization.Encoding.PEM,
+ serialization.PrivateFormat.TraditionalOpenSSL,
+ serialization.NoEncryption()))
+ self.assertRaisesRegex(
+ requests.exceptions.SSLError, ".*revoked.*", requests.get,
+ 'https://{0}:{1}'.format(self.lb_vip_address,
+ LISTENER2_TCP_PORT),
+ timeout=12, verify=False, cert=(cert_file.name,
+ key_file.name))
+
+ # Test that a valid client certificate can connect to listener1
+ with tempfile.NamedTemporaryFile(buffering=0) as cert_file:
+ cert_file.write(self.client_cert.public_bytes(
+ serialization.Encoding.PEM))
+ with tempfile.NamedTemporaryFile(buffering=0) as key_file:
+ key_file.write(self.client_key.private_bytes(
+ serialization.Encoding.PEM,
+ serialization.PrivateFormat.TraditionalOpenSSL,
+ serialization.NoEncryption()))
+ response = requests.get(
+ 'https://{0}:{1}'.format(self.lb_vip_address,
+ LISTENER1_TCP_PORT),
+ timeout=12, verify=False, cert=(cert_file.name,
+ key_file.name))
+ self.assertEqual(200, response.status_code)
+
+ # Test that a valid client2 certificate can connect to listener2
+ with tempfile.NamedTemporaryFile(buffering=0) as cert_file:
+ cert_file.write(client2_cert.public_bytes(
+ serialization.Encoding.PEM))
+ with tempfile.NamedTemporaryFile(buffering=0) as key_file:
+ key_file.write(client2_key.private_bytes(
+ serialization.Encoding.PEM,
+ serialization.PrivateFormat.TraditionalOpenSSL,
+ serialization.NoEncryption()))
+ response = requests.get(
+ 'https://{0}:{1}'.format(self.lb_vip_address,
+ LISTENER2_TCP_PORT),
+ timeout=12, verify=False, cert=(cert_file.name,
+ key_file.name))
+ self.assertEqual(200, response.status_code)
+
+ # Test that a valid client1 certificate can not connect to listener2
+ with tempfile.NamedTemporaryFile(buffering=0) as cert_file:
+ cert_file.write(self.client_cert.public_bytes(
+ serialization.Encoding.PEM))
+ with tempfile.NamedTemporaryFile(buffering=0) as key_file:
+ key_file.write(self.client_key.private_bytes(
+ serialization.Encoding.PEM,
+ serialization.PrivateFormat.TraditionalOpenSSL,
+ serialization.NoEncryption()))
+ self.assertRaisesRegex(
+ requests.exceptions.SSLError, ".*decrypt error.*",
+ requests.get, 'https://{0}:{1}'.format(self.lb_vip_address,
+ LISTENER2_TCP_PORT),
+ timeout=12, verify=False, cert=(cert_file.name,
+ key_file.name))
+
+ # Test that a valid client2 certificate can not connect to listener1
+ with tempfile.NamedTemporaryFile(buffering=0) as cert_file:
+ cert_file.write(client2_cert.public_bytes(
+ serialization.Encoding.PEM))
+ with tempfile.NamedTemporaryFile(buffering=0) as key_file:
+ key_file.write(client2_key.private_bytes(
+ serialization.Encoding.PEM,
+ serialization.PrivateFormat.TraditionalOpenSSL,
+ serialization.NoEncryption()))
+ self.assertRaisesRegex(
+ requests.exceptions.SSLError, ".*decrypt error.*",
+ requests.get, 'https://{0}:{1}'.format(self.lb_vip_address,
+ LISTENER1_TCP_PORT),
+ timeout=12, verify=False, cert=(cert_file.name,
+ key_file.name))
+
+ # Test that a revoked client1 certificate can not connect to listener2
+ with tempfile.NamedTemporaryFile(buffering=0) as cert_file:
+ cert_file.write(self.revoked_client_cert.public_bytes(
+ serialization.Encoding.PEM))
+ with tempfile.NamedTemporaryFile(buffering=0) as key_file:
+ key_file.write(self.revoked_client_key.private_bytes(
+ serialization.Encoding.PEM,
+ serialization.PrivateFormat.TraditionalOpenSSL,
+ serialization.NoEncryption()))
+ self.assertRaisesRegex(
+ requests.exceptions.SSLError, ".*decrypt error.*",
+ requests.get, 'https://{0}:{1}'.format(self.lb_vip_address,
+ LISTENER2_TCP_PORT),
+ timeout=12, verify=False, cert=(cert_file.name,
+ key_file.name))
+
+ # Test that a revoked client2 certificate can not connect to listener1
+ with tempfile.NamedTemporaryFile(buffering=0) as cert_file:
+ cert_file.write(revoked_client2_cert.public_bytes(
+ serialization.Encoding.PEM))
+ with tempfile.NamedTemporaryFile(buffering=0) as key_file:
+ key_file.write(revoked_client2_key.private_bytes(
+ serialization.Encoding.PEM,
+ serialization.PrivateFormat.TraditionalOpenSSL,
+ serialization.NoEncryption()))
+ self.assertRaisesRegex(
+ requests.exceptions.SSLError, ".*decrypt error.*",
+ requests.get, 'https://{0}:{1}'.format(self.lb_vip_address,
+ LISTENER1_TCP_PORT),
+ timeout=12, verify=False, cert=(cert_file.name,
+ key_file.name))
diff --git a/releasenotes/notes/client-auth-scenario-bffa420a2fd38159.yaml b/releasenotes/notes/client-auth-scenario-bffa420a2fd38159.yaml
new file mode 100644
index 0000000..3e44be4
--- /dev/null
+++ b/releasenotes/notes/client-auth-scenario-bffa420a2fd38159.yaml
@@ -0,0 +1,4 @@
+---
+features:
+ - |
+ Adds scenario tests for listener client authentication.