Test new HSTS feature
Add API and scenario tests for HTTP Strict Transport Security (HSTS) on
Octavia listeners. The tests are gated on Octavia API version 2.27 or newer.
Partial-Bug: #2017972
Change-Id: Ie54714015e943fd1cb75ca95f8cf241fbc99268c
diff --git a/octavia_tempest_plugin/common/constants.py b/octavia_tempest_plugin/common/constants.py
index e3bd83e..733f0f4 100644
--- a/octavia_tempest_plugin/common/constants.py
+++ b/octavia_tempest_plugin/common/constants.py
@@ -72,6 +72,9 @@
DEFAULT_POOL_ID = 'default_pool_id'
L7_POLICIES = 'l7_policies'
ALPN_PROTOCOLS = 'alpn_protocols'
+HSTS_MAX_AGE = 'hsts_max_age'
+HSTS_INCLUDE_SUBDOMAINS = 'hsts_include_subdomains'
+HSTS_PRELOAD = 'hsts_preload'
LB_ALGORITHM = 'lb_algorithm'
LB_ALGORITHM_ROUND_ROBIN = 'ROUND_ROBIN'
diff --git a/octavia_tempest_plugin/services/load_balancer/v2/listener_client.py b/octavia_tempest_plugin/services/load_balancer/v2/listener_client.py
index 1ee70f7..1c8e6e5 100644
--- a/octavia_tempest_plugin/services/load_balancer/v2/listener_client.py
+++ b/octavia_tempest_plugin/services/load_balancer/v2/listener_client.py
@@ -41,7 +41,8 @@
sni_container_refs=Unset, client_authentication=Unset,
client_ca_tls_container_ref=Unset,
client_crl_container_ref=Unset, allowed_cidrs=Unset,
- alpn_protocols=Unset,
+ alpn_protocols=Unset, hsts_max_age=Unset,
+ hsts_include_subdomains=Unset, hsts_preload=Unset,
return_object_only=True):
"""Create a listener.
@@ -92,6 +93,12 @@
:param allowed_cidrs: A list of IPv4 or IPv6 CIDRs.
:param alpn_protocols: A list of ALPN protocols for TERMINATED_HTTPS
listeners.
+ :param hsts_include_subdomains: Defines whether the
+ `includeSubDomains` directive is used for HSTS or not
+ :param hsts_max_age: Enables HTTP Strict Transport Security (HSTS)
+ and sets the `max-age` directive to the given value
+ :param hsts_preload: Defines whether the `preload` directive
+ is used for HSTS or not
:param return_object_only: If True, the response returns the object
inside the root tag. False returns the full
response from the API.
@@ -218,7 +225,8 @@
sni_container_refs=Unset, client_authentication=Unset,
client_ca_tls_container_ref=Unset,
client_crl_container_ref=Unset, allowed_cidrs=Unset,
- alpn_protocols=Unset,
+ alpn_protocols=Unset, hsts_max_age=Unset,
+ hsts_include_subdomains=Unset, hsts_preload=Unset,
return_object_only=True):
"""Update a listener.
@@ -267,6 +275,12 @@
:param allowed_cidrs: A list of IPv4 or IPv6 CIDRs.
:param alpn_protocols: A list of ALPN protocols for TERMINATED_HTTPS
listeners.
+ :param hsts_include_subdomains: Defines whether the
+ `includeSubDomains` directive is used for HSTS or not
+ :param hsts_max_age: Enables HTTP Strict Transport Security (HSTS)
+ and sets the `max-age` directive to the given value
+ :param hsts_preload: Defines whether the `preload` directive
+ is used for HSTS or not
:param return_object_only: If True, the response returns the object
inside the root tag. False returns the full
response from the API.
diff --git a/octavia_tempest_plugin/tests/api/v2/test_listener.py b/octavia_tempest_plugin/tests/api/v2/test_listener.py
index 8961d78..cd320f4 100644
--- a/octavia_tempest_plugin/tests/api/v2/test_listener.py
+++ b/octavia_tempest_plugin/tests/api/v2/test_listener.py
@@ -297,6 +297,8 @@
listener_name = data_utils.rand_name("lb_member_listener1-create")
listener_description = data_utils.arbitrary_string(size=255)
+ hsts_supported = self.mem_listener_client.is_version_supported(
+ self.api_version, '2.27') and protocol == const.TERMINATED_HTTPS
listener_kwargs = {
const.NAME: listener_name,
@@ -351,9 +353,13 @@
exceptions.BadRequest,
self.mem_listener_client.create_listener,
**listener_kwargs)
-
listener_kwargs.update({const.ALLOWED_CIDRS: self.allowed_cidrs})
+ if hsts_supported:
+ listener_kwargs[const.HSTS_PRELOAD] = True
+ listener_kwargs[const.HSTS_MAX_AGE] = 10000
+ listener_kwargs[const.HSTS_INCLUDE_SUBDOMAINS] = True
+
# Test that a user without the loadbalancer role cannot
# create a listener.
expected_allowed = []
@@ -411,6 +417,11 @@
equal_items.append(const.TIMEOUT_MEMBER_DATA)
equal_items.append(const.TIMEOUT_TCP_INSPECT)
+ if hsts_supported:
+ equal_items.append(const.HSTS_PRELOAD)
+ equal_items.append(const.HSTS_MAX_AGE)
+ equal_items.append(const.HSTS_INCLUDE_SUBDOMAINS)
+
for item in equal_items:
self.assertEqual(listener_kwargs[item], listener[item])
@@ -1010,6 +1021,11 @@
if self.mem_listener_client.is_version_supported(
self.api_version, '2.12'):
show_listener_response_fields.append('allowed_cidrs')
+ if self.mem_listener_client.is_version_supported(
+ self.api_version, '2.27'):
+ show_listener_response_fields.append(const.HSTS_PRELOAD)
+ show_listener_response_fields.append(const.HSTS_MAX_AGE)
+ show_listener_response_fields.append(const.HSTS_INCLUDE_SUBDOMAINS)
for field in show_listener_response_fields:
if field in (const.DEFAULT_POOL_ID, const.L7_POLICIES):
continue
@@ -1142,6 +1158,8 @@
listener_name = data_utils.rand_name("lb_member_listener1-show")
listener_description = data_utils.arbitrary_string(size=255)
+ hsts_supported = self.mem_listener_client.is_version_supported(
+ self.api_version, '2.27') and protocol == const.TERMINATED_HTTPS
listener_kwargs = {
const.NAME: listener_name,
@@ -1168,6 +1186,11 @@
self.SNI2_secret_ref],
})
+ if hsts_supported:
+ listener_kwargs[const.HSTS_PRELOAD] = True
+ listener_kwargs[const.HSTS_MAX_AGE] = 10000
+ listener_kwargs[const.HSTS_INCLUDE_SUBDOMAINS] = True
+
if self.mem_listener_client.is_version_supported(
self.api_version, '2.1'):
listener_kwargs.update({
@@ -1263,6 +1286,11 @@
self.api_version, '2.12'):
self.assertEqual(self.allowed_cidrs, listener[const.ALLOWED_CIDRS])
+ if hsts_supported:
+ self.assertTrue(listener[const.HSTS_PRELOAD])
+ self.assertEqual(10000, listener[const.HSTS_MAX_AGE])
+ self.assertTrue(listener[const.HSTS_INCLUDE_SUBDOMAINS])
+
# Test that the appropriate users can see or not see the listener
# based on the API RBAC.
expected_allowed = []
@@ -1340,6 +1368,8 @@
listener_name = data_utils.rand_name("lb_member_listener1-update")
listener_description = data_utils.arbitrary_string(size=255)
+ hsts_supported = self.mem_listener_client.is_version_supported(
+ self.api_version, '2.27') and protocol == const.TERMINATED_HTTPS
listener_kwargs = {
const.NAME: listener_name,
@@ -1522,6 +1552,11 @@
new_cidrs = ['2001:db8::/64']
listener_update_kwargs.update({const.ALLOWED_CIDRS: new_cidrs})
+ if hsts_supported:
+ listener_update_kwargs[const.HSTS_PRELOAD] = False
+ listener_update_kwargs[const.HSTS_MAX_AGE] = 0
+ listener_update_kwargs[const.HSTS_INCLUDE_SUBDOMAINS] = False
+
listener = self.mem_listener_client.update_listener(
listener[const.ID], **listener_update_kwargs)
@@ -1587,6 +1622,11 @@
expected_cidrs = ['2001:db8::/64']
self.assertEqual(expected_cidrs, listener[const.ALLOWED_CIDRS])
+ if hsts_supported:
+ self.assertFalse(listener[const.HSTS_PRELOAD])
+ self.assertEqual(0, listener[const.HSTS_MAX_AGE])
+ self.assertFalse(listener[const.HSTS_INCLUDE_SUBDOMAINS])
+
@decorators.idempotent_id('16f11c82-f069-4592-8954-81b35a98e3b7')
def test_http_listener_delete(self):
self._test_listener_delete(const.HTTP, 8070)
diff --git a/octavia_tempest_plugin/tests/barbican_scenario/v2/test_tls_barbican.py b/octavia_tempest_plugin/tests/barbican_scenario/v2/test_tls_barbican.py
index 511c724..2e1464c 100644
--- a/octavia_tempest_plugin/tests/barbican_scenario/v2/test_tls_barbican.py
+++ b/octavia_tempest_plugin/tests/barbican_scenario/v2/test_tls_barbican.py
@@ -1222,7 +1222,8 @@
self.assertEqual(expected_proto, selected_proto)
- def _test_http_versions_tls_traffic(self, http_version, alpn_protos):
+ def _test_http_versions_tls_traffic(self, http_version, alpn_protos,
+ hsts: bool = False):
if not self.mem_listener_client.is_version_supported(
self.api_version, '2.20'):
raise self.skipException('ALPN protocols are only available on '
@@ -1237,6 +1238,12 @@
const.DEFAULT_TLS_CONTAINER_REF: self.server_secret_ref,
const.ALPN_PROTOCOLS: alpn_protos,
}
+ if self.mem_listener_client.is_version_supported(
+ self.api_version, '2.27'):
+ listener_kwargs[const.HSTS_MAX_AGE] = 100 if hsts else None
+ listener_kwargs[const.HSTS_INCLUDE_SUBDOMAINS] = hsts
+ listener_kwargs[const.HSTS_PRELOAD] = hsts
+
listener = self.mem_listener_client.create_listener(**listener_kwargs)
self.listener_id = listener[const.ID]
self.addCleanup(
@@ -1258,6 +1265,12 @@
client = httpx.Client(http2=(http_version == 'HTTP/2'), verify=context)
r = client.get(url)
self.assertEqual(http_version, r.http_version)
+ if hsts:
+ self.assertIn('strict-transport-security', r.headers)
+ self.assertEqual('max-age=100; includeSubDomains; preload;',
+ r.headers['strict-transport-security'])
+ else:
+ self.assertNotIn('strict-transport-security', r.headers)
@decorators.idempotent_id('9965828d-24af-4fa0-91ae-21c6bc47ab4c')
def test_http_2_tls_traffic(self):
@@ -1268,6 +1281,15 @@
self._test_http_versions_tls_traffic(
'HTTP/1.1', ['http/1.1', 'http/1.0'])
+ @decorators.idempotent_id('7436c6b7-44be-4544-a40b-31d2b7b2ad0b')
+ def test_http_1_1_tls_hsts_traffic(self):
+ if not self.mem_listener_client.is_version_supported(
+ self.api_version, '2.27'):
+ raise self.skipException('HSTS is only available on '
+ 'Octavia API version 2.27 or newer.')
+ self._test_http_versions_tls_traffic(
+ 'HTTP/1.1', ['http/1.1', 'http/1.0'], hsts=True)
+
@decorators.idempotent_id('ee0faf71-d11e-4323-8673-e5e15779749b')
def test_pool_reencryption(self):
if not self.mem_listener_client.is_version_supported(
diff --git a/octavia_tempest_plugin/tests/test_base.py b/octavia_tempest_plugin/tests/test_base.py
index f08cec9..51834fe 100644
--- a/octavia_tempest_plugin/tests/test_base.py
+++ b/octavia_tempest_plugin/tests/test_base.py
@@ -34,6 +34,7 @@
from octavia_tempest_plugin.common import cert_utils
from octavia_tempest_plugin.common import constants as const
+import octavia_tempest_plugin.services.load_balancer.v2 as lbv2
from octavia_tempest_plugin.tests import RBAC_tests
from octavia_tempest_plugin.tests import validators
from octavia_tempest_plugin.tests import waiters
@@ -182,27 +183,29 @@
cls.os_roles_lb_member.security_group_rules_client)
cls.lb_mem_servers_client = cls.os_roles_lb_member.servers_client
cls.lb_mem_subnet_client = cls.os_roles_lb_member.subnets_client
- cls.mem_lb_client = (
+ cls.mem_lb_client: lbv2.LoadbalancerClient = (
cls.os_roles_lb_member.load_balancer_v2.LoadbalancerClient())
- cls.mem_listener_client = (
+ cls.mem_listener_client: lbv2.ListenerClient = (
cls.os_roles_lb_member.load_balancer_v2.ListenerClient())
- cls.mem_pool_client = (
+ cls.mem_pool_client: lbv2.PoolClient = (
cls.os_roles_lb_member.load_balancer_v2.PoolClient())
- cls.mem_member_client = (
+ cls.mem_member_client: lbv2.MemberClient = (
cls.os_roles_lb_member.load_balancer_v2.MemberClient())
- cls.mem_healthmonitor_client = (
+ cls.mem_healthmonitor_client: lbv2.HealthMonitorClient = (
cls.os_roles_lb_member.load_balancer_v2.HealthMonitorClient())
- cls.mem_l7policy_client = (
+ cls.mem_l7policy_client: lbv2.L7PolicyClient = (
cls.os_roles_lb_member.load_balancer_v2.L7PolicyClient())
- cls.mem_l7rule_client = (
+ cls.mem_l7rule_client: lbv2.L7RuleClient = (
cls.os_roles_lb_member.load_balancer_v2.L7RuleClient())
- cls.lb_admin_amphora_client = lb_admin_prefix.AmphoraClient()
- cls.lb_admin_flavor_profile_client = (
+ cls.lb_admin_amphora_client: lbv2.AmphoraClient = (
+ lb_admin_prefix.AmphoraClient())
+ cls.lb_admin_flavor_profile_client: lbv2.FlavorProfileClient = (
lb_admin_prefix.FlavorProfileClient())
- cls.lb_admin_flavor_client = lb_admin_prefix.FlavorClient()
- cls.mem_flavor_client = (
+ cls.lb_admin_flavor_client: lbv2.FlavorClient = (
+ lb_admin_prefix.FlavorClient())
+ cls.mem_flavor_client: lbv2.FlavorClient = (
cls.os_roles_lb_member.load_balancer_v2.FlavorClient())
- cls.mem_provider_client = (
+ cls.mem_provider_client: lbv2.ProviderClient = (
cls.os_roles_lb_member.load_balancer_v2.ProviderClient())
cls.os_admin_servers_client = cls.os_admin.servers_client
cls.os_admin_routers_client = cls.os_admin.routers_client