Merge "Add scenario test for Prometheus over TLS"
diff --git a/octavia_tempest_plugin/tests/barbican_scenario/v2/test_tls_barbican.py b/octavia_tempest_plugin/tests/barbican_scenario/v2/test_tls_barbican.py
index 3fdada4..a983e13 100644
--- a/octavia_tempest_plugin/tests/barbican_scenario/v2/test_tls_barbican.py
+++ b/octavia_tempest_plugin/tests/barbican_scenario/v2/test_tls_barbican.py
@@ -28,6 +28,7 @@
 from tempest import config
 from tempest.lib.common.utils import data_utils
 from tempest.lib import decorators
+import testtools
 
 from octavia_tempest_plugin.common import barbican_client_mgr
 from octavia_tempest_plugin.common import cert_utils
@@ -1539,3 +1540,77 @@
 
         self.check_members_balanced(self.lb_vip_address, protocol=const.HTTP,
                                     protocol_port=85)
+
+    @decorators.idempotent_id('7d9dcae6-3e2c-4eae-9bfb-1ef0d00aa530')
+    @testtools.skipUnless(
+        CONF.loadbalancer_feature_enabled.prometheus_listener_enabled,
+        'PROMETHEUS listener tests are disabled in the tempest configuration.')
+    def test_tls_prometheus_client_auth_mandatory(self):
+        if not self.mem_listener_client.is_version_supported(
+                self.api_version, '2.25'):
+            raise self.skipException('Prometheus listeners are only available '
+                                     'on Octavia API version 2.25 or newer.')
+        LISTENER1_TCP_PORT = '9443'  # also used in the request URLs
+        listener_name = data_utils.rand_name(
+            "lb_member_listener1-prometheus-client-auth-mand")
+        listener_kwargs = {
+            const.NAME: listener_name,
+            const.PROTOCOL: const.PROMETHEUS,
+            const.PROTOCOL_PORT: LISTENER1_TCP_PORT,
+            const.LOADBALANCER_ID: self.lb_id,
+            const.DEFAULT_TLS_CONTAINER_REF: self.server_secret_ref,
+            const.CLIENT_AUTHENTICATION: const.CLIENT_AUTH_MANDATORY,
+            const.CLIENT_CA_TLS_CONTAINER_REF: self.client_ca_cert_ref,
+            const.CLIENT_CRL_CONTAINER_REF: self.client_crl_ref,
+        }
+        listener = self.mem_listener_client.create_listener(**listener_kwargs)
+        self.listener_id = listener[const.ID]
+        self.addCleanup(  # runs cleanup_listener after the test
+            self.mem_listener_client.cleanup_listener,
+            self.listener_id,
+            lb_client=self.mem_lb_client, lb_id=self.lb_id)
+
+        waiters.wait_for_status(self.mem_lb_client.show_loadbalancer,
+                                self.lb_id, const.PROVISIONING_STATUS,
+                                const.ACTIVE,  # wait until the LB is ACTIVE
+                                CONF.load_balancer.build_interval,
+                                CONF.load_balancer.build_timeout)
+
+        # A request without a client certificate must raise SSLError
+        self.assertRaises(
+            requests.exceptions.SSLError,
+            requests.get,
+            'https://{0}:{1}'.format(self.lb_vip_address, LISTENER1_TCP_PORT),
+            timeout=12, verify=False)  # server certificate is not verified
+
+        # A request with the revoked client certificate must raise SSLError
+        with tempfile.NamedTemporaryFile(buffering=0) as cert_file:
+            cert_file.write(self.revoked_client_cert.public_bytes(
+                serialization.Encoding.PEM))  # unbuffered: written at once
+            with tempfile.NamedTemporaryFile(buffering=0) as key_file:
+                key_file.write(self.revoked_client_key.private_bytes(
+                    serialization.Encoding.PEM,
+                    serialization.PrivateFormat.TraditionalOpenSSL,
+                    serialization.NoEncryption()))
+                self.assertRaises(
+                    requests.exceptions.SSLError, requests.get,
+                    'https://{0}:{1}'.format(self.lb_vip_address,
+                                             LISTENER1_TCP_PORT),
+                    timeout=12, verify=False, cert=(cert_file.name,
+                                                    key_file.name))
+
+        # A request with the valid client certificate must return HTTP 200
+        with tempfile.NamedTemporaryFile(buffering=0) as cert_file:
+            cert_file.write(self.client_cert.public_bytes(
+                serialization.Encoding.PEM))
+            with tempfile.NamedTemporaryFile(buffering=0) as key_file:
+                key_file.write(self.client_key.private_bytes(
+                    serialization.Encoding.PEM,
+                    serialization.PrivateFormat.TraditionalOpenSSL,
+                    serialization.NoEncryption()))
+                response = requests.get(
+                    'https://{0}:{1}'.format(self.lb_vip_address,
+                                             LISTENER1_TCP_PORT),
+                    timeout=12, verify=False, cert=(cert_file.name,
+                                                    key_file.name))
+                self.assertEqual(200, response.status_code)