Add TLS SNI scenario tests

This patch adds scenario tests that exercise the SNI capabilities
of the Octavia TLS offloading.

Depends-On: https://review.opendev.org/690444
Change-Id: I4bbd103e34997dd6b1bb64cb5d69b5135c6e26ea
diff --git a/octavia_tempest_plugin/tests/barbican_scenario/v2/test_tls_barbican.py b/octavia_tempest_plugin/tests/barbican_scenario/v2/test_tls_barbican.py
index 25f741a..0fe1d81 100644
--- a/octavia_tempest_plugin/tests/barbican_scenario/v2/test_tls_barbican.py
+++ b/octavia_tempest_plugin/tests/barbican_scenario/v2/test_tls_barbican.py
@@ -53,6 +53,42 @@
                                     'barbican service.')
 
     @classmethod
+    def _generate_load_certificate(cls, barbican_mgr, ca_cert, ca_key, name):
+        new_cert, new_key = cert_utils.generate_server_cert_and_key(
+            ca_cert, ca_key, name)
+
+        LOG.debug('%s Cert: %s', name, new_cert.public_bytes(
+            serialization.Encoding.PEM))
+        LOG.debug('%s private Key: %s', name, new_key.private_bytes(
+            encoding=serialization.Encoding.PEM,
+            format=serialization.PrivateFormat.TraditionalOpenSSL,
+            encryption_algorithm=serialization.NoEncryption()))
+        new_public_key = new_key.public_key()
+        LOG.debug('%s public Key: %s', name, new_public_key.public_bytes(
+            encoding=serialization.Encoding.PEM,
+            format=serialization.PublicFormat.SubjectPublicKeyInfo))
+
+        # Create the pkcs12 bundle
+        pkcs12 = cert_utils.generate_pkcs12_bundle(new_cert, new_key)
+        LOG.debug('%s PKCS12 bundle: %s', name, base64.b64encode(pkcs12))
+
+        new_secret_ref = barbican_mgr.store_secret(pkcs12)
+        cls.addClassResourceCleanup(barbican_mgr.delete_secret, new_secret_ref)
+
+        # Set the barbican ACL if the Octavia API version doesn't do it
+        # automatically.
+        if not cls.mem_lb_client.is_version_supported(
+                cls.api_version, '2.1'):
+            user_list = cls.os_admin.users_v3_client.list_users(
+                name=CONF.load_balancer.octavia_svc_username)
+            msg = 'Only one user named "{0}" should exist, {1} found.'.format(
+                CONF.load_balancer.octavia_svc_username,
+                len(user_list['users']))
+            assert 1 == len(user_list['users']), msg
+            barbican_mgr.add_acl(new_secret_ref, user_list['users'][0]['id'])
+        return new_cert, new_key, new_secret_ref
+
+    @classmethod
     def resource_setup(cls):
         """Setup resources needed by the tests."""
         super(TLSWithBarbicanTest, cls).resource_setup()
@@ -70,45 +106,35 @@
             encoding=serialization.Encoding.PEM,
             format=serialization.PublicFormat.SubjectPublicKeyInfo))
 
-        # Create a server cert and key
-        cls.server_uuid = uuidutils.generate_uuid()
-        server_cert, server_key = cert_utils.generate_server_cert_and_key(
-            cls.ca_cert, ca_key, cls.server_uuid)
-
-        LOG.debug('Server Cert: %s' % server_cert.public_bytes(
-            serialization.Encoding.PEM))
-        LOG.debug('Server private Key: %s' % server_key.private_bytes(
-            encoding=serialization.Encoding.PEM,
-            format=serialization.PrivateFormat.TraditionalOpenSSL,
-            encryption_algorithm=serialization.NoEncryption()))
-        server_public_key = server_key.public_key()
-        LOG.debug('Server public Key: %s' % server_public_key.public_bytes(
-            encoding=serialization.Encoding.PEM,
-            format=serialization.PublicFormat.SubjectPublicKeyInfo))
-
-        # Create the pkcs12 bundle
-        pkcs12 = cert_utils.generate_pkcs12_bundle(server_cert, server_key)
-        LOG.debug('Server PKCS12 bundle: %s' % base64.b64encode(pkcs12))
-
         # Load the secret into the barbican service under the
         # os_roles_lb_member tenant
         barbican_mgr = barbican_client_mgr.BarbicanClientManager(
             cls.os_roles_lb_member)
 
-        cls.secret_ref = barbican_mgr.store_secret(pkcs12)
-        cls.addClassResourceCleanup(barbican_mgr.delete_secret, cls.secret_ref)
+        # Create a server cert and key
+        # This will be used as the "default certificate" in SNI tests.
+        cls.server_uuid = uuidutils.generate_uuid()
+        LOG.debug('Server (default) UUID: %s' % cls.server_uuid)
 
-        # Set the barbican ACL if the Octavia API version doesn't do it
-        # automatically.
-        if not cls.mem_lb_client.is_version_supported(
-                cls.api_version, '2.1'):
-            user_list = cls.os_admin.users_v3_client.list_users(
-                name=CONF.load_balancer.octavia_svc_username)
-            msg = 'Only one user named "{0}" should exist, {1} found.'.format(
-                CONF.load_balancer.octavia_svc_username,
-                len(user_list['users']))
-            assert 1 == len(user_list['users']), msg
-            barbican_mgr.add_acl(cls.secret_ref, user_list['users'][0]['id'])
+        server_cert, server_key, cls.server_secret_ref = (
+            cls._generate_load_certificate(barbican_mgr, cls.ca_cert,
+                                           ca_key, cls.server_uuid))
+
+        # Create the SNI1 cert and key
+        cls.SNI1_uuid = uuidutils.generate_uuid()
+        LOG.debug('SNI1 UUID: %s' % cls.SNI1_uuid)
+
+        SNI1_cert, SNI1_key, cls.SNI1_secret_ref = (
+            cls._generate_load_certificate(barbican_mgr, cls.ca_cert,
+                                           ca_key, cls.SNI1_uuid))
+
+        # Create the SNI2 cert and key
+        cls.SNI2_uuid = uuidutils.generate_uuid()
+        LOG.debug('SNI2 UUID: %s' % cls.SNI2_uuid)
+
+        SNI2_cert, SNI2_key, cls.SNI2_secret_ref = (
+            cls._generate_load_certificate(barbican_mgr, cls.ca_cert,
+                                           ca_key, cls.SNI2_uuid))
 
         # Setup a load balancer for the tests to use
         lb_name = data_utils.rand_name("lb_member_lb1-tls")
@@ -224,7 +250,7 @@
             const.PROTOCOL_PORT: '443',
             const.LOADBALANCER_ID: self.lb_id,
             const.DEFAULT_POOL_ID: self.pool_id,
-            const.DEFAULT_TLS_CONTAINER_REF: self.secret_ref,
+            const.DEFAULT_TLS_CONTAINER_REF: self.server_secret_ref,
         }
         listener = self.mem_listener_client.create_listener(**listener_kwargs)
         self.listener_id = listener[const.ID]
@@ -242,7 +268,7 @@
         # Test HTTPS listener load balancing.
         # Note: certificate validation tests will follow this test
         self.check_members_balanced(self.lb_vip_address, protocol='https',
-                                    verify=False)
+                                    verify=False, protocol_port=443)
 
         def _verify_cb(connection, x509, errno, errdepth, retcode):
             """Callback for certificate validation."""
@@ -250,9 +276,17 @@
             if errdepth != 0:
                 return True
             if errno == 0:
+                received_cn = x509.get_subject().commonName
+                received_name = self._get_cert_name(received_cn)
+                expected_cn = '{}.example.com'.format(self.server_uuid)
+                msg = ('ERROR: Received certificate "{received_name}" with CN '
+                       '{received_cn} is not the expected certificate '
+                       '"default" with CN {expected_cn}.'.format(
+                           received_name=received_name,
+                           received_cn=received_cn,
+                           expected_cn=expected_cn))
                 # Make sure the certificate is the one we generated
-                self.assertEqual('{}.example.com'.format(self.server_uuid),
-                                 x509.get_subject().commonName)
+                self.assertEqual(expected_cn, received_cn, message=msg)
             else:
                 LOG.error('Certificate with CN: {0} failed validation with '
                           'OpenSSL verify errno {1}'.format(
@@ -270,3 +304,317 @@
         sock.connect((self.lb_vip_address, 443))
         # Validate the certificate is signed by the ca_cert we created
         sock.do_handshake()
+
+    @decorators.idempotent_id('08405802-4411-4454-b008-8607408f424a')
+    def test_basic_tls_SNI_traffic(self):
+
+        listener_name = data_utils.rand_name("lb_member_listener1-tls-sni")
+        listener_kwargs = {
+            const.NAME: listener_name,
+            const.PROTOCOL: const.TERMINATED_HTTPS,
+            const.PROTOCOL_PORT: '443',
+            const.LOADBALANCER_ID: self.lb_id,
+            const.DEFAULT_POOL_ID: self.pool_id,
+            const.DEFAULT_TLS_CONTAINER_REF: self.server_secret_ref,
+            const.SNI_CONTAINER_REFS: [self.SNI1_secret_ref,
+                                       self.SNI2_secret_ref],
+        }
+        listener = self.mem_listener_client.create_listener(**listener_kwargs)
+        self.listener_id = listener[const.ID]
+        self.addCleanup(
+            self.mem_listener_client.cleanup_listener,
+            self.listener_id,
+            lb_client=self.mem_lb_client, lb_id=self.lb_id)
+
+        waiters.wait_for_status(self.mem_lb_client.show_loadbalancer,
+                                self.lb_id, const.PROVISIONING_STATUS,
+                                const.ACTIVE,
+                                CONF.load_balancer.build_interval,
+                                CONF.load_balancer.build_timeout)
+
+        # Test HTTPS listener load balancing.
+        # Note: certificate validation tests will follow this test
+        self.check_members_balanced(self.lb_vip_address, protocol='https',
+                                    verify=False, protocol_port=443)
+
+        def _verify_server_cb(connection, x509, errno, errdepth, retcode):
+            return _verify_cb(connection, x509, errno, errdepth, retcode,
+                              name=self.server_uuid)
+
+        def _verify_SNI1_cb(connection, x509, errno, errdepth, retcode):
+            return _verify_cb(connection, x509, errno, errdepth, retcode,
+                              name=self.SNI1_uuid)
+
+        def _verify_SNI2_cb(connection, x509, errno, errdepth, retcode):
+            return _verify_cb(connection, x509, errno, errdepth, retcode,
+                              name=self.SNI2_uuid)
+
+        def _verify_cb(connection, x509, errno, errdepth, retcode, name):
+            """Callback for certificate validation."""
+            # don't validate names of root certificates
+            if errdepth != 0:
+                return True
+            if errno == 0:
+                received_cn = x509.get_subject().commonName
+                received_name = self._get_cert_name(received_cn)
+                expected_cn = '{}.example.com'.format(name)
+                expected_name = self._get_cert_name(name)
+                msg = ('ERROR: Received certificate "{received_name}" with CN '
+                       '{received_cn} is not the expected certificate '
+                       '"{expected_name}" with CN {expected_cn}.'.format(
+                           received_name=received_name,
+                           received_cn=received_cn,
+                           expected_name=expected_name,
+                           expected_cn=expected_cn))
+                # Make sure the certificate is the one we generated
+                self.assertEqual(expected_cn, received_cn, message=msg)
+            else:
+                LOG.error('Certificate with CN: {0} failed validation with '
+                          'OpenSSL verify errno {1}'.format(
+                              x509.get_subject().commonName, errno))
+                return False
+            return True
+
+        # Test that the default certificate is used with no SNI host request
+        context = SSL.Context(SSL.SSLv23_METHOD)
+        context.set_verify(SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
+                           _verify_server_cb)
+        ca_store = context.get_cert_store()
+        ca_store.add_cert(X509.from_cryptography(self.ca_cert))
+        sock = socket.socket()
+        sock = SSL.Connection(context, sock)
+        sock.connect((self.lb_vip_address, 443))
+        # Validate the certificate is signed by the ca_cert we created
+        sock.do_handshake()
+
+        # Test that the default certificate is used with bogus SNI host request
+        context = SSL.Context(SSL.TLSv1_METHOD)
+        context.set_verify(SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
+                           _verify_server_cb)
+        ca_store = context.get_cert_store()
+        ca_store.add_cert(X509.from_cryptography(self.ca_cert))
+        sock = socket.socket()
+        sock = SSL.Connection(context, sock)
+        sock.set_tlsext_host_name('bogus.example.com'.encode())
+        sock.connect((self.lb_vip_address, 443))
+        # Validate the certificate is signed by the ca_cert we created
+        sock.do_handshake()
+
+        # Test that the SNI1 certificate is used when SNI1 host is specified
+        context = SSL.Context(SSL.TLSv1_METHOD)
+        context.set_verify(SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
+                           _verify_SNI1_cb)
+        ca_store = context.get_cert_store()
+        ca_store.add_cert(X509.from_cryptography(self.ca_cert))
+        sock = socket.socket()
+        sock = SSL.Connection(context, sock)
+        sock.set_tlsext_host_name(
+            '{}.example.com'.format(self.SNI1_uuid).encode())
+        sock.connect((self.lb_vip_address, 443))
+        # Validate the certificate is signed by the ca_cert we created
+        sock.do_handshake()
+
+        # Test that the SNI2 certificate is used when SNI2 host is specified
+        context = SSL.Context(SSL.SSLv23_METHOD)
+        context.set_verify(SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
+                           _verify_SNI2_cb)
+        ca_store = context.get_cert_store()
+        ca_store.add_cert(X509.from_cryptography(self.ca_cert))
+        sock = socket.socket()
+        sock = SSL.Connection(context, sock)
+        sock.set_tlsext_host_name(
+            '{}.example.com'.format(self.SNI2_uuid).encode())
+        sock.connect((self.lb_vip_address, 443))
+        # Validate the certificate is signed by the ca_cert we created
+        sock.do_handshake()
+
+    def _get_cert_name(self, lookup_string):
+        if self.server_uuid in lookup_string:
+            return 'default'
+        elif self.SNI1_uuid in lookup_string:
+            return 'SNI1'
+        elif self.SNI2_uuid in lookup_string:
+            return 'SNI2'
+        else:
+            return 'Unknown'
+
+    @decorators.idempotent_id('bfac9bf4-8cd0-4519-8d99-5ad0c75abf5c')
+    def test_basic_tls_SNI_multi_listener_traffic(self):
+        """Make sure certificates are only used on the correct listeners."""
+
+        listener_name = data_utils.rand_name("lb_member_listener1-tls-sni")
+        listener_kwargs = {
+            const.NAME: listener_name,
+            const.PROTOCOL: const.TERMINATED_HTTPS,
+            const.PROTOCOL_PORT: '443',
+            const.LOADBALANCER_ID: self.lb_id,
+            const.DEFAULT_POOL_ID: self.pool_id,
+            const.DEFAULT_TLS_CONTAINER_REF: self.server_secret_ref,
+            const.SNI_CONTAINER_REFS: [self.SNI1_secret_ref],
+        }
+        listener = self.mem_listener_client.create_listener(**listener_kwargs)
+        self.listener_id = listener[const.ID]
+        self.addCleanup(
+            self.mem_listener_client.cleanup_listener,
+            self.listener_id,
+            lb_client=self.mem_lb_client, lb_id=self.lb_id)
+
+        waiters.wait_for_status(self.mem_lb_client.show_loadbalancer,
+                                self.lb_id, const.PROVISIONING_STATUS,
+                                const.ACTIVE,
+                                CONF.load_balancer.build_interval,
+                                CONF.load_balancer.build_timeout)
+
+        # Test HTTPS listener load balancing.
+        # Note: certificate validation tests will follow this test
+        self.check_members_balanced(self.lb_vip_address, protocol='https',
+                                    verify=False, protocol_port=443)
+
+        listener2_name = data_utils.rand_name("lb_member_listener2-tls-sni")
+        listener2_kwargs = {
+            const.NAME: listener2_name,
+            const.PROTOCOL: const.TERMINATED_HTTPS,
+            const.PROTOCOL_PORT: '8443',
+            const.LOADBALANCER_ID: self.lb_id,
+            const.DEFAULT_POOL_ID: self.pool_id,
+            const.DEFAULT_TLS_CONTAINER_REF: self.SNI2_secret_ref,
+        }
+        listener2 = self.mem_listener_client.create_listener(
+            **listener2_kwargs)
+        self.listener2_id = listener2[const.ID]
+        self.addCleanup(
+            self.mem_listener_client.cleanup_listener,
+            self.listener2_id,
+            lb_client=self.mem_lb_client, lb_id=self.lb_id)
+
+        waiters.wait_for_status(self.mem_lb_client.show_loadbalancer,
+                                self.lb_id, const.PROVISIONING_STATUS,
+                                const.ACTIVE,
+                                CONF.load_balancer.build_interval,
+                                CONF.load_balancer.build_timeout)
+
+        # Test HTTPS listener load balancing.
+        # Note: certificate validation tests will follow this test
+        self.check_members_balanced(self.lb_vip_address, protocol='https',
+                                    verify=False, protocol_port=8443)
+
+        def _verify_server_cb(connection, x509, errno, errdepth, retcode):
+            return _verify_cb(connection, x509, errno, errdepth, retcode,
+                              name=self.server_uuid)
+
+        def _verify_SNI1_cb(connection, x509, errno, errdepth, retcode):
+            return _verify_cb(connection, x509, errno, errdepth, retcode,
+                              name=self.SNI1_uuid)
+
+        def _verify_SNI2_cb(connection, x509, errno, errdepth, retcode):
+            return _verify_cb(connection, x509, errno, errdepth, retcode,
+                              name=self.SNI2_uuid)
+
+        def _verify_cb(connection, x509, errno, errdepth, retcode, name):
+            """Callback for certificate validation."""
+            # don't validate names of root certificates
+            if errdepth != 0:
+                return True
+            if errno == 0:
+                received_cn = x509.get_subject().commonName
+                received_name = self._get_cert_name(received_cn)
+                expected_cn = '{}.example.com'.format(name)
+                expected_name = self._get_cert_name(name)
+                msg = ('ERROR: Received certificate "{received_name}" with CN '
+                       '{received_cn} is not the expected certificate '
+                       '"{expected_name}" with CN {expected_cn}.'.format(
+                           received_name=received_name,
+                           received_cn=received_cn,
+                           expected_name=expected_name,
+                           expected_cn=expected_cn))
+                # Make sure the certificate is the one we generated
+                self.assertEqual(expected_cn, received_cn, message=msg)
+            else:
+                LOG.error('Certificate with CN: {0} failed validation with '
+                          'OpenSSL verify errno {1}'.format(
+                              x509.get_subject().commonName, errno))
+                return False
+            return True
+
+        # Test that the default certificate is used with no SNI host request
+        context = SSL.Context(SSL.SSLv23_METHOD)
+        context.set_verify(SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
+                           _verify_server_cb)
+        ca_store = context.get_cert_store()
+        ca_store.add_cert(X509.from_cryptography(self.ca_cert))
+        sock = socket.socket()
+        sock = SSL.Connection(context, sock)
+        sock.connect((self.lb_vip_address, 443))
+        # Validate the certificate is signed by the ca_cert we created
+        sock.do_handshake()
+
+        # Test that the SNI1 certificate is used when SNI1 host is specified
+        context = SSL.Context(SSL.TLSv1_METHOD)
+        context.set_verify(SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
+                           _verify_SNI1_cb)
+        ca_store = context.get_cert_store()
+        ca_store.add_cert(X509.from_cryptography(self.ca_cert))
+        sock = socket.socket()
+        sock = SSL.Connection(context, sock)
+        sock.set_tlsext_host_name(
+            '{}.example.com'.format(self.SNI1_uuid).encode())
+        sock.connect((self.lb_vip_address, 443))
+        # Validate the certificate is signed by the ca_cert we created
+        sock.do_handshake()
+
+        # Test that the default certificate is used when SNI2 host is specified
+        context = SSL.Context(SSL.SSLv23_METHOD)
+        context.set_verify(SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
+                           _verify_server_cb)
+        ca_store = context.get_cert_store()
+        ca_store.add_cert(X509.from_cryptography(self.ca_cert))
+        sock = socket.socket()
+        sock = SSL.Connection(context, sock)
+        sock.set_tlsext_host_name(
+            '{}.example.com'.format(self.SNI2_uuid).encode())
+        sock.connect((self.lb_vip_address, 443))
+        # Validate the certificate is signed by the ca_cert we created
+        sock.do_handshake()
+
+        # Test that the SNI2 certificate is used with no SNI host request
+        # on listener 2, SNI2 is the default cert for listener 2
+        context = SSL.Context(SSL.SSLv23_METHOD)
+        context.set_verify(SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
+                           _verify_SNI2_cb)
+        ca_store = context.get_cert_store()
+        ca_store.add_cert(X509.from_cryptography(self.ca_cert))
+        sock = socket.socket()
+        sock = SSL.Connection(context, sock)
+        sock.connect((self.lb_vip_address, 8443))
+        # Validate the certificate is signed by the ca_cert we created
+        sock.do_handshake()
+
+        # Test that the SNI2 certificate is used with listener 1 host request
+        # on listener 2, SNI2 is the default cert for listener 2
+        context = SSL.Context(SSL.SSLv23_METHOD)
+        context.set_verify(SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
+                           _verify_SNI2_cb)
+        ca_store = context.get_cert_store()
+        ca_store.add_cert(X509.from_cryptography(self.ca_cert))
+        sock = socket.socket()
+        sock = SSL.Connection(context, sock)
+        sock.set_tlsext_host_name(
+            '{}.example.com'.format(self.server_uuid).encode())
+        sock.connect((self.lb_vip_address, 8443))
+        # Validate the certificate is signed by the ca_cert we created
+        sock.do_handshake()
+
+        # Test that the SNI2 certificate is used with SNI1 host request
+        # on listener 2, SNI2 is the default cert for listener 2
+        context = SSL.Context(SSL.SSLv23_METHOD)
+        context.set_verify(SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
+                           _verify_SNI2_cb)
+        ca_store = context.get_cert_store()
+        ca_store.add_cert(X509.from_cryptography(self.ca_cert))
+        sock = socket.socket()
+        sock = SSL.Connection(context, sock)
+        sock.set_tlsext_host_name(
+            '{}.example.com'.format(self.SNI1_uuid).encode())
+        sock.connect((self.lb_vip_address, 8443))
+        # Validate the certificate is signed by the ca_cert we created
+        sock.do_handshake()
diff --git a/octavia_tempest_plugin/tests/test_base.py b/octavia_tempest_plugin/tests/test_base.py
index c8f7954..5033ade 100644
--- a/octavia_tempest_plugin/tests/test_base.py
+++ b/octavia_tempest_plugin/tests/test_base.py
@@ -861,7 +861,7 @@
         raise Exception()
 
     def check_members_balanced(self, vip_address, traffic_member_count=2,
-                               protocol='http', verify=True):
+                               protocol='http', verify=True, protocol_port=80):
         handler = requests
         if CONF.load_balancer.test_reuse_connection:
             handler = requests.Session()
@@ -875,7 +875,8 @@
         # Send a number requests to lb vip
         for i in range(20):
             try:
-                r = handler.get('{0}://{1}'.format(protocol, vip_address),
+                r = handler.get('{0}://{1}:{2}'.format(protocol, vip_address,
+                                                       protocol_port),
                                 timeout=2, verify=verify)
 
                 if r.content in response_counts: