Merge "Add scenario test for Prometheus over TLS"
diff --git a/octavia_tempest_plugin/tests/barbican_scenario/v2/test_tls_barbican.py b/octavia_tempest_plugin/tests/barbican_scenario/v2/test_tls_barbican.py
index 3fdada4..a983e13 100644
--- a/octavia_tempest_plugin/tests/barbican_scenario/v2/test_tls_barbican.py
+++ b/octavia_tempest_plugin/tests/barbican_scenario/v2/test_tls_barbican.py
@@ -28,6 +28,7 @@
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
+import testtools
from octavia_tempest_plugin.common import barbican_client_mgr
from octavia_tempest_plugin.common import cert_utils
@@ -1539,3 +1540,77 @@
self.check_members_balanced(self.lb_vip_address, protocol=const.HTTP,
protocol_port=85)
+
+ @decorators.idempotent_id('7d9dcae6-3e2c-4eae-9bfb-1ef0d00aa530')
+ @testtools.skipUnless(
+ CONF.loadbalancer_feature_enabled.prometheus_listener_enabled,
+ 'PROMETHEUS listener tests are disabled in the tempest configuration.')
+ def test_tls_prometheus_client_auth_mandatory(self):
+ # Scenario: create a TLS-terminated PROMETHEUS listener that requires a
+ # client certificate, then check three HTTPS requests against the VIP:
+ #   1. no client certificate      -> requests raises SSLError
+ #   2. revoked client certificate -> requests raises SSLError
+ #   3. valid client certificate   -> HTTP 200
+ #
+ # Skipped by the decorator when prometheus_listener_enabled is false in
+ # the tempest configuration, and at runtime (below) when the API version
+ # check for 2.25 fails.
+ #
+ # NOTE(review): the secret references and the client cert/key objects
+ # read from self are not created in this method; presumably they come
+ # from class-level setup -- confirm there.
+ if not self.mem_listener_client.is_version_supported(
+ self.api_version, '2.25'):
+ raise self.skipException('Prometheus listeners are only available '
+ 'on Octavia API version 2.25 or newer.')
+ # The same value is sent as the listener's protocol port and formatted
+ # into the request URLs below.
+ # NOTE(review): this is a string, not an int -- confirm the API accepts
+ # a string-typed protocol port.
+ LISTENER1_TCP_PORT = '9443'
+ listener_name = data_utils.rand_name(
+ "lb_member_listener1-prometheus-client-auth-mand")
+ # Listener definition: server certificate for TLS, plus the client CA
+ # and CRL references used together with mandatory client authentication.
+ listener_kwargs = {
+ const.NAME: listener_name,
+ const.PROTOCOL: const.PROMETHEUS,
+ const.PROTOCOL_PORT: LISTENER1_TCP_PORT,
+ const.LOADBALANCER_ID: self.lb_id,
+ const.DEFAULT_TLS_CONTAINER_REF: self.server_secret_ref,
+ const.CLIENT_AUTHENTICATION: const.CLIENT_AUTH_MANDATORY,
+ const.CLIENT_CA_TLS_CONTAINER_REF: self.client_ca_cert_ref,
+ const.CLIENT_CRL_CONTAINER_REF: self.client_crl_ref,
+ }
+ listener = self.mem_listener_client.create_listener(**listener_kwargs)
+ self.listener_id = listener[const.ID]
+ # Register the listener cleanup right after creation so it is scheduled
+ # even if a later step of this test fails.
+ self.addCleanup(
+ self.mem_listener_client.cleanup_listener,
+ self.listener_id,
+ lb_client=self.mem_lb_client, lb_id=self.lb_id)
+
+ # Wait until the load balancer reports provisioning status ACTIVE
+ # before sending any traffic to the new listener.
+ waiters.wait_for_status(self.mem_lb_client.show_loadbalancer,
+ self.lb_id, const.PROVISIONING_STATUS,
+ const.ACTIVE,
+ CONF.load_balancer.build_interval,
+ CONF.load_balancer.build_timeout)
+
+ # Test that a request without a client certificate fails to connect.
+ # verify=False turns off validation of the server certificate in
+ # requests, so that validation is not what raises the SSLError here.
+ self.assertRaises(
+ requests.exceptions.SSLError,
+ requests.get,
+ 'https://{0}:{1}'.format(self.lb_vip_address, LISTENER1_TCP_PORT),
+ timeout=12, verify=False)
+
+ # Test that a revoked client certificate fails to connect.
+ # requests takes the client cert/key as file paths, so both are written
+ # as PEM to named temporary files. buffering=0 makes the files
+ # unbuffered, so the bytes are written through before requests opens
+ # them by name. The key is written unencrypted (NoEncryption).
+ with tempfile.NamedTemporaryFile(buffering=0) as cert_file:
+ cert_file.write(self.revoked_client_cert.public_bytes(
+ serialization.Encoding.PEM))
+ with tempfile.NamedTemporaryFile(buffering=0) as key_file:
+ key_file.write(self.revoked_client_key.private_bytes(
+ serialization.Encoding.PEM,
+ serialization.PrivateFormat.TraditionalOpenSSL,
+ serialization.NoEncryption()))
+ self.assertRaises(
+ requests.exceptions.SSLError, requests.get,
+ 'https://{0}:{1}'.format(self.lb_vip_address,
+ LISTENER1_TCP_PORT),
+ timeout=12, verify=False, cert=(cert_file.name,
+ key_file.name))
+
+ # Test that a valid client certificate can connect: same temporary-file
+ # handling as above, but the request must complete with HTTP 200.
+ with tempfile.NamedTemporaryFile(buffering=0) as cert_file:
+ cert_file.write(self.client_cert.public_bytes(
+ serialization.Encoding.PEM))
+ with tempfile.NamedTemporaryFile(buffering=0) as key_file:
+ key_file.write(self.client_key.private_bytes(
+ serialization.Encoding.PEM,
+ serialization.PrivateFormat.TraditionalOpenSSL,
+ serialization.NoEncryption()))
+ response = requests.get(
+ 'https://{0}:{1}'.format(self.lb_vip_address,
+ LISTENER1_TCP_PORT),
+ timeout=12, verify=False, cert=(cert_file.name,
+ key_file.name))
+ self.assertEqual(200, response.status_code)