Add TLS SNI scenario tests
This patch adds scenario tests that exercise the SNI capabilities
of the Octavia TLS offloading.
Depends-On: https://review.opendev.org/690444
Change-Id: I4bbd103e34997dd6b1bb64cb5d69b5135c6e26ea
diff --git a/octavia_tempest_plugin/tests/barbican_scenario/v2/test_tls_barbican.py b/octavia_tempest_plugin/tests/barbican_scenario/v2/test_tls_barbican.py
index 25f741a..0fe1d81 100644
--- a/octavia_tempest_plugin/tests/barbican_scenario/v2/test_tls_barbican.py
+++ b/octavia_tempest_plugin/tests/barbican_scenario/v2/test_tls_barbican.py
@@ -53,6 +53,42 @@
'barbican service.')
@classmethod
+ def _generate_load_certificate(cls, barbican_mgr, ca_cert, ca_key, name):
+ new_cert, new_key = cert_utils.generate_server_cert_and_key(
+ ca_cert, ca_key, name)
+
+ LOG.debug('%s Cert: %s', name, new_cert.public_bytes(
+ serialization.Encoding.PEM))
+ LOG.debug('%s private Key: %s', name, new_key.private_bytes(
+ encoding=serialization.Encoding.PEM,
+ format=serialization.PrivateFormat.TraditionalOpenSSL,
+ encryption_algorithm=serialization.NoEncryption()))
+ new_public_key = new_key.public_key()
+ LOG.debug('%s public Key: %s', name, new_public_key.public_bytes(
+ encoding=serialization.Encoding.PEM,
+ format=serialization.PublicFormat.SubjectPublicKeyInfo))
+
+ # Create the pkcs12 bundle
+ pkcs12 = cert_utils.generate_pkcs12_bundle(new_cert, new_key)
+ LOG.debug('%s PKCS12 bundle: %s', name, base64.b64encode(pkcs12))
+
+ new_secret_ref = barbican_mgr.store_secret(pkcs12)
+ cls.addClassResourceCleanup(barbican_mgr.delete_secret, new_secret_ref)
+
+ # Set the barbican ACL if the Octavia API version doesn't do it
+ # automatically.
+ if not cls.mem_lb_client.is_version_supported(
+ cls.api_version, '2.1'):
+ user_list = cls.os_admin.users_v3_client.list_users(
+ name=CONF.load_balancer.octavia_svc_username)
+ msg = 'Only one user named "{0}" should exist, {1} found.'.format(
+ CONF.load_balancer.octavia_svc_username,
+ len(user_list['users']))
+ assert 1 == len(user_list['users']), msg
+ barbican_mgr.add_acl(new_secret_ref, user_list['users'][0]['id'])
+ return new_cert, new_key, new_secret_ref
+
+ @classmethod
def resource_setup(cls):
"""Setup resources needed by the tests."""
super(TLSWithBarbicanTest, cls).resource_setup()
@@ -70,45 +106,35 @@
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo))
- # Create a server cert and key
- cls.server_uuid = uuidutils.generate_uuid()
- server_cert, server_key = cert_utils.generate_server_cert_and_key(
- cls.ca_cert, ca_key, cls.server_uuid)
-
- LOG.debug('Server Cert: %s' % server_cert.public_bytes(
- serialization.Encoding.PEM))
- LOG.debug('Server private Key: %s' % server_key.private_bytes(
- encoding=serialization.Encoding.PEM,
- format=serialization.PrivateFormat.TraditionalOpenSSL,
- encryption_algorithm=serialization.NoEncryption()))
- server_public_key = server_key.public_key()
- LOG.debug('Server public Key: %s' % server_public_key.public_bytes(
- encoding=serialization.Encoding.PEM,
- format=serialization.PublicFormat.SubjectPublicKeyInfo))
-
- # Create the pkcs12 bundle
- pkcs12 = cert_utils.generate_pkcs12_bundle(server_cert, server_key)
- LOG.debug('Server PKCS12 bundle: %s' % base64.b64encode(pkcs12))
-
# Load the secret into the barbican service under the
# os_roles_lb_member tenant
barbican_mgr = barbican_client_mgr.BarbicanClientManager(
cls.os_roles_lb_member)
- cls.secret_ref = barbican_mgr.store_secret(pkcs12)
- cls.addClassResourceCleanup(barbican_mgr.delete_secret, cls.secret_ref)
+ # Create a server cert and key
+ # This will be used as the "default certificate" in SNI tests.
+ cls.server_uuid = uuidutils.generate_uuid()
+ LOG.debug('Server (default) UUID: %s' % cls.server_uuid)
- # Set the barbican ACL if the Octavia API version doesn't do it
- # automatically.
- if not cls.mem_lb_client.is_version_supported(
- cls.api_version, '2.1'):
- user_list = cls.os_admin.users_v3_client.list_users(
- name=CONF.load_balancer.octavia_svc_username)
- msg = 'Only one user named "{0}" should exist, {1} found.'.format(
- CONF.load_balancer.octavia_svc_username,
- len(user_list['users']))
- assert 1 == len(user_list['users']), msg
- barbican_mgr.add_acl(cls.secret_ref, user_list['users'][0]['id'])
+ server_cert, server_key, cls.server_secret_ref = (
+ cls._generate_load_certificate(barbican_mgr, cls.ca_cert,
+ ca_key, cls.server_uuid))
+
+ # Create the SNI1 cert and key
+ cls.SNI1_uuid = uuidutils.generate_uuid()
+ LOG.debug('SNI1 UUID: %s' % cls.SNI1_uuid)
+
+ SNI1_cert, SNI1_key, cls.SNI1_secret_ref = (
+ cls._generate_load_certificate(barbican_mgr, cls.ca_cert,
+ ca_key, cls.SNI1_uuid))
+
+ # Create the SNI2 cert and key
+ cls.SNI2_uuid = uuidutils.generate_uuid()
+ LOG.debug('SNI2 UUID: %s' % cls.SNI2_uuid)
+
+ SNI2_cert, SNI2_key, cls.SNI2_secret_ref = (
+ cls._generate_load_certificate(barbican_mgr, cls.ca_cert,
+ ca_key, cls.SNI2_uuid))
# Setup a load balancer for the tests to use
lb_name = data_utils.rand_name("lb_member_lb1-tls")
@@ -224,7 +250,7 @@
const.PROTOCOL_PORT: '443',
const.LOADBALANCER_ID: self.lb_id,
const.DEFAULT_POOL_ID: self.pool_id,
- const.DEFAULT_TLS_CONTAINER_REF: self.secret_ref,
+ const.DEFAULT_TLS_CONTAINER_REF: self.server_secret_ref,
}
listener = self.mem_listener_client.create_listener(**listener_kwargs)
self.listener_id = listener[const.ID]
@@ -242,7 +268,7 @@
# Test HTTPS listener load balancing.
# Note: certificate validation tests will follow this test
self.check_members_balanced(self.lb_vip_address, protocol='https',
- verify=False)
+ verify=False, protocol_port=443)
def _verify_cb(connection, x509, errno, errdepth, retcode):
"""Callback for certificate validation."""
@@ -250,9 +276,17 @@
if errdepth != 0:
return True
if errno == 0:
+ received_cn = x509.get_subject().commonName
+ received_name = self._get_cert_name(received_cn)
+ expected_cn = '{}.example.com'.format(self.server_uuid)
+ msg = ('ERROR: Received certificate "{received_name}" with CN '
+ '{received_cn} is not the expected certificate '
+ '"default" with CN {expected_cn}.'.format(
+ received_name=received_name,
+ received_cn=received_cn,
+ expected_cn=expected_cn))
# Make sure the certificate is the one we generated
- self.assertEqual('{}.example.com'.format(self.server_uuid),
- x509.get_subject().commonName)
+ self.assertEqual(expected_cn, received_cn, message=msg)
else:
LOG.error('Certificate with CN: {0} failed validation with '
'OpenSSL verify errno {1}'.format(
@@ -270,3 +304,317 @@
sock.connect((self.lb_vip_address, 443))
# Validate the certificate is signed by the ca_cert we created
sock.do_handshake()
+
+ @decorators.idempotent_id('08405802-4411-4454-b008-8607408f424a')
+ def test_basic_tls_SNI_traffic(self):
+
+ listener_name = data_utils.rand_name("lb_member_listener1-tls-sni")
+ listener_kwargs = {
+ const.NAME: listener_name,
+ const.PROTOCOL: const.TERMINATED_HTTPS,
+ const.PROTOCOL_PORT: '443',
+ const.LOADBALANCER_ID: self.lb_id,
+ const.DEFAULT_POOL_ID: self.pool_id,
+ const.DEFAULT_TLS_CONTAINER_REF: self.server_secret_ref,
+ const.SNI_CONTAINER_REFS: [self.SNI1_secret_ref,
+ self.SNI2_secret_ref],
+ }
+ listener = self.mem_listener_client.create_listener(**listener_kwargs)
+ self.listener_id = listener[const.ID]
+ self.addCleanup(
+ self.mem_listener_client.cleanup_listener,
+ self.listener_id,
+ lb_client=self.mem_lb_client, lb_id=self.lb_id)
+
+ waiters.wait_for_status(self.mem_lb_client.show_loadbalancer,
+ self.lb_id, const.PROVISIONING_STATUS,
+ const.ACTIVE,
+ CONF.load_balancer.build_interval,
+ CONF.load_balancer.build_timeout)
+
+ # Test HTTPS listener load balancing.
+ # Note: certificate validation tests will follow this test
+ self.check_members_balanced(self.lb_vip_address, protocol='https',
+ verify=False, protocol_port=443)
+
+ def _verify_server_cb(connection, x509, errno, errdepth, retcode):
+ return _verify_cb(connection, x509, errno, errdepth, retcode,
+ name=self.server_uuid)
+
+ def _verify_SNI1_cb(connection, x509, errno, errdepth, retcode):
+ return _verify_cb(connection, x509, errno, errdepth, retcode,
+ name=self.SNI1_uuid)
+
+ def _verify_SNI2_cb(connection, x509, errno, errdepth, retcode):
+ return _verify_cb(connection, x509, errno, errdepth, retcode,
+ name=self.SNI2_uuid)
+
+ def _verify_cb(connection, x509, errno, errdepth, retcode, name):
+ """Callback for certificate validation."""
+ # don't validate names of root certificates
+ if errdepth != 0:
+ return True
+ if errno == 0:
+ received_cn = x509.get_subject().commonName
+ received_name = self._get_cert_name(received_cn)
+ expected_cn = '{}.example.com'.format(name)
+ expected_name = self._get_cert_name(name)
+ msg = ('ERROR: Received certificate "{received_name}" with CN '
+ '{received_cn} is not the expected certificate '
+ '"{expected_name}" with CN {expected_cn}.'.format(
+ received_name=received_name,
+ received_cn=received_cn,
+ expected_name=expected_name,
+ expected_cn=expected_cn))
+ # Make sure the certificate is the one we generated
+ self.assertEqual(expected_cn, received_cn, message=msg)
+ else:
+ LOG.error('Certificate with CN: {0} failed validation with '
+ 'OpenSSL verify errno {1}'.format(
+ x509.get_subject().commonName, errno))
+ return False
+ return True
+
+ # Test that the default certificate is used with no SNI host request
+ context = SSL.Context(SSL.SSLv23_METHOD)
+ context.set_verify(SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
+ _verify_server_cb)
+ ca_store = context.get_cert_store()
+ ca_store.add_cert(X509.from_cryptography(self.ca_cert))
+ sock = socket.socket()
+ sock = SSL.Connection(context, sock)
+ sock.connect((self.lb_vip_address, 443))
+ # Validate the certificate is signed by the ca_cert we created
+ sock.do_handshake()
+
+ # Test that the default certificate is used with bogus SNI host request
+ context = SSL.Context(SSL.TLSv1_METHOD)
+ context.set_verify(SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
+ _verify_server_cb)
+ ca_store = context.get_cert_store()
+ ca_store.add_cert(X509.from_cryptography(self.ca_cert))
+ sock = socket.socket()
+ sock = SSL.Connection(context, sock)
+ sock.set_tlsext_host_name('bogus.example.com'.encode())
+ sock.connect((self.lb_vip_address, 443))
+ # Validate the certificate is signed by the ca_cert we created
+ sock.do_handshake()
+
+ # Test that the SNI1 certificate is used when SNI1 host is specified
+ context = SSL.Context(SSL.TLSv1_METHOD)
+ context.set_verify(SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
+ _verify_SNI1_cb)
+ ca_store = context.get_cert_store()
+ ca_store.add_cert(X509.from_cryptography(self.ca_cert))
+ sock = socket.socket()
+ sock = SSL.Connection(context, sock)
+ sock.set_tlsext_host_name(
+ '{}.example.com'.format(self.SNI1_uuid).encode())
+ sock.connect((self.lb_vip_address, 443))
+ # Validate the certificate is signed by the ca_cert we created
+ sock.do_handshake()
+
+ # Test that the SNI2 certificate is used when SNI2 host is specified
+ context = SSL.Context(SSL.SSLv23_METHOD)
+ context.set_verify(SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
+ _verify_SNI2_cb)
+ ca_store = context.get_cert_store()
+ ca_store.add_cert(X509.from_cryptography(self.ca_cert))
+ sock = socket.socket()
+ sock = SSL.Connection(context, sock)
+ sock.set_tlsext_host_name(
+ '{}.example.com'.format(self.SNI2_uuid).encode())
+ sock.connect((self.lb_vip_address, 443))
+ # Validate the certificate is signed by the ca_cert we created
+ sock.do_handshake()
+
+ def _get_cert_name(self, lookup_string):
+ if self.server_uuid in lookup_string:
+ return 'default'
+ elif self.SNI1_uuid in lookup_string:
+ return 'SNI1'
+ elif self.SNI2_uuid in lookup_string:
+ return 'SNI2'
+ else:
+ return 'Unknown'
+
+ @decorators.idempotent_id('bfac9bf4-8cd0-4519-8d99-5ad0c75abf5c')
+ def test_basic_tls_SNI_multi_listener_traffic(self):
+ """Make sure certificates are only used on the correct listeners."""
+
+ listener_name = data_utils.rand_name("lb_member_listener1-tls-sni")
+ listener_kwargs = {
+ const.NAME: listener_name,
+ const.PROTOCOL: const.TERMINATED_HTTPS,
+ const.PROTOCOL_PORT: '443',
+ const.LOADBALANCER_ID: self.lb_id,
+ const.DEFAULT_POOL_ID: self.pool_id,
+ const.DEFAULT_TLS_CONTAINER_REF: self.server_secret_ref,
+ const.SNI_CONTAINER_REFS: [self.SNI1_secret_ref],
+ }
+ listener = self.mem_listener_client.create_listener(**listener_kwargs)
+ self.listener_id = listener[const.ID]
+ self.addCleanup(
+ self.mem_listener_client.cleanup_listener,
+ self.listener_id,
+ lb_client=self.mem_lb_client, lb_id=self.lb_id)
+
+ waiters.wait_for_status(self.mem_lb_client.show_loadbalancer,
+ self.lb_id, const.PROVISIONING_STATUS,
+ const.ACTIVE,
+ CONF.load_balancer.build_interval,
+ CONF.load_balancer.build_timeout)
+
+ # Test HTTPS listener load balancing.
+ # Note: certificate validation tests will follow this test
+ self.check_members_balanced(self.lb_vip_address, protocol='https',
+ verify=False, protocol_port=443)
+
+ listener2_name = data_utils.rand_name("lb_member_listener2-tls-sni")
+ listener2_kwargs = {
+ const.NAME: listener2_name,
+ const.PROTOCOL: const.TERMINATED_HTTPS,
+ const.PROTOCOL_PORT: '8443',
+ const.LOADBALANCER_ID: self.lb_id,
+ const.DEFAULT_POOL_ID: self.pool_id,
+ const.DEFAULT_TLS_CONTAINER_REF: self.SNI2_secret_ref,
+ }
+ listener2 = self.mem_listener_client.create_listener(
+ **listener2_kwargs)
+ self.listener2_id = listener2[const.ID]
+ self.addCleanup(
+ self.mem_listener_client.cleanup_listener,
+ self.listener2_id,
+ lb_client=self.mem_lb_client, lb_id=self.lb_id)
+
+ waiters.wait_for_status(self.mem_lb_client.show_loadbalancer,
+ self.lb_id, const.PROVISIONING_STATUS,
+ const.ACTIVE,
+ CONF.load_balancer.build_interval,
+ CONF.load_balancer.build_timeout)
+
+ # Test HTTPS listener load balancing.
+ # Note: certificate validation tests will follow this test
+ self.check_members_balanced(self.lb_vip_address, protocol='https',
+ verify=False, protocol_port=8443)
+
+ def _verify_server_cb(connection, x509, errno, errdepth, retcode):
+ return _verify_cb(connection, x509, errno, errdepth, retcode,
+ name=self.server_uuid)
+
+ def _verify_SNI1_cb(connection, x509, errno, errdepth, retcode):
+ return _verify_cb(connection, x509, errno, errdepth, retcode,
+ name=self.SNI1_uuid)
+
+ def _verify_SNI2_cb(connection, x509, errno, errdepth, retcode):
+ return _verify_cb(connection, x509, errno, errdepth, retcode,
+ name=self.SNI2_uuid)
+
+ def _verify_cb(connection, x509, errno, errdepth, retcode, name):
+ """Callback for certificate validation."""
+ # don't validate names of root certificates
+ if errdepth != 0:
+ return True
+ if errno == 0:
+ received_cn = x509.get_subject().commonName
+ received_name = self._get_cert_name(received_cn)
+ expected_cn = '{}.example.com'.format(name)
+ expected_name = self._get_cert_name(name)
+ msg = ('ERROR: Received certificate "{received_name}" with CN '
+ '{received_cn} is not the expected certificate '
+ '"{expected_name}" with CN {expected_cn}.'.format(
+ received_name=received_name,
+ received_cn=received_cn,
+ expected_name=expected_name,
+ expected_cn=expected_cn))
+ # Make sure the certificate is the one we generated
+ self.assertEqual(expected_cn, received_cn, message=msg)
+ else:
+ LOG.error('Certificate with CN: {0} failed validation with '
+ 'OpenSSL verify errno {1}'.format(
+ x509.get_subject().commonName, errno))
+ return False
+ return True
+
+ # Test that the default certificate is used with no SNI host request
+ context = SSL.Context(SSL.SSLv23_METHOD)
+ context.set_verify(SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
+ _verify_server_cb)
+ ca_store = context.get_cert_store()
+ ca_store.add_cert(X509.from_cryptography(self.ca_cert))
+ sock = socket.socket()
+ sock = SSL.Connection(context, sock)
+ sock.connect((self.lb_vip_address, 443))
+ # Validate the certificate is signed by the ca_cert we created
+ sock.do_handshake()
+
+ # Test that the SNI1 certificate is used when SNI1 host is specified
+ context = SSL.Context(SSL.TLSv1_METHOD)
+ context.set_verify(SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
+ _verify_SNI1_cb)
+ ca_store = context.get_cert_store()
+ ca_store.add_cert(X509.from_cryptography(self.ca_cert))
+ sock = socket.socket()
+ sock = SSL.Connection(context, sock)
+ sock.set_tlsext_host_name(
+ '{}.example.com'.format(self.SNI1_uuid).encode())
+ sock.connect((self.lb_vip_address, 443))
+ # Validate the certificate is signed by the ca_cert we created
+ sock.do_handshake()
+
+ # Test that the default certificate is used when SNI2 host is specified
+ context = SSL.Context(SSL.SSLv23_METHOD)
+ context.set_verify(SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
+ _verify_server_cb)
+ ca_store = context.get_cert_store()
+ ca_store.add_cert(X509.from_cryptography(self.ca_cert))
+ sock = socket.socket()
+ sock = SSL.Connection(context, sock)
+ sock.set_tlsext_host_name(
+ '{}.example.com'.format(self.SNI2_uuid).encode())
+ sock.connect((self.lb_vip_address, 443))
+ # Validate the certificate is signed by the ca_cert we created
+ sock.do_handshake()
+
+ # Test that the SNI2 certificate is used with no SNI host request
+ # on listener 2, SNI2 is the default cert for listener 2
+ context = SSL.Context(SSL.SSLv23_METHOD)
+ context.set_verify(SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
+ _verify_SNI2_cb)
+ ca_store = context.get_cert_store()
+ ca_store.add_cert(X509.from_cryptography(self.ca_cert))
+ sock = socket.socket()
+ sock = SSL.Connection(context, sock)
+ sock.connect((self.lb_vip_address, 8443))
+ # Validate the certificate is signed by the ca_cert we created
+ sock.do_handshake()
+
+ # Test that the SNI2 certificate is used with listener 1 host request
+ # on listener 2, SNI2 is the default cert for listener 2
+ context = SSL.Context(SSL.SSLv23_METHOD)
+ context.set_verify(SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
+ _verify_SNI2_cb)
+ ca_store = context.get_cert_store()
+ ca_store.add_cert(X509.from_cryptography(self.ca_cert))
+ sock = socket.socket()
+ sock = SSL.Connection(context, sock)
+ sock.set_tlsext_host_name(
+ '{}.example.com'.format(self.server_uuid).encode())
+ sock.connect((self.lb_vip_address, 8443))
+ # Validate the certificate is signed by the ca_cert we created
+ sock.do_handshake()
+
+ # Test that the SNI2 certificate is used with SNI1 host request
+ # on listener 2, SNI2 is the default cert for listener 2
+ context = SSL.Context(SSL.SSLv23_METHOD)
+ context.set_verify(SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
+ _verify_SNI2_cb)
+ ca_store = context.get_cert_store()
+ ca_store.add_cert(X509.from_cryptography(self.ca_cert))
+ sock = socket.socket()
+ sock = SSL.Connection(context, sock)
+ sock.set_tlsext_host_name(
+ '{}.example.com'.format(self.SNI1_uuid).encode())
+ sock.connect((self.lb_vip_address, 8443))
+ # Validate the certificate is signed by the ca_cert we created
+ sock.do_handshake()
diff --git a/octavia_tempest_plugin/tests/test_base.py b/octavia_tempest_plugin/tests/test_base.py
index c8f7954..5033ade 100644
--- a/octavia_tempest_plugin/tests/test_base.py
+++ b/octavia_tempest_plugin/tests/test_base.py
@@ -861,7 +861,7 @@
raise Exception()
def check_members_balanced(self, vip_address, traffic_member_count=2,
- protocol='http', verify=True):
+ protocol='http', verify=True, protocol_port=80):
handler = requests
if CONF.load_balancer.test_reuse_connection:
handler = requests.Session()
@@ -875,7 +875,8 @@
# Send a number requests to lb vip
for i in range(20):
try:
- r = handler.get('{0}://{1}'.format(protocol, vip_address),
+ r = handler.get('{0}://{1}:{2}'.format(protocol, vip_address,
+ protocol_port),
timeout=2, verify=verify)
if r.content in response_counts: