Test new HSTS feature

Test HTTP Strict Transport Security with Octavia.

Partial-Bug: #2017972
Change-Id: Ie54714015e943fd1cb75ca95f8cf241fbc99268c
diff --git a/octavia_tempest_plugin/common/constants.py b/octavia_tempest_plugin/common/constants.py
index e3bd83e..733f0f4 100644
--- a/octavia_tempest_plugin/common/constants.py
+++ b/octavia_tempest_plugin/common/constants.py
@@ -72,6 +72,9 @@
 DEFAULT_POOL_ID = 'default_pool_id'
 L7_POLICIES = 'l7_policies'
 ALPN_PROTOCOLS = 'alpn_protocols'
+HSTS_MAX_AGE = 'hsts_max_age'
+HSTS_INCLUDE_SUBDOMAINS = 'hsts_include_subdomains'
+HSTS_PRELOAD = 'hsts_preload'
 
 LB_ALGORITHM = 'lb_algorithm'
 LB_ALGORITHM_ROUND_ROBIN = 'ROUND_ROBIN'
diff --git a/octavia_tempest_plugin/services/load_balancer/v2/listener_client.py b/octavia_tempest_plugin/services/load_balancer/v2/listener_client.py
index 1ee70f7..1c8e6e5 100644
--- a/octavia_tempest_plugin/services/load_balancer/v2/listener_client.py
+++ b/octavia_tempest_plugin/services/load_balancer/v2/listener_client.py
@@ -41,7 +41,8 @@
                         sni_container_refs=Unset, client_authentication=Unset,
                         client_ca_tls_container_ref=Unset,
                         client_crl_container_ref=Unset, allowed_cidrs=Unset,
-                        alpn_protocols=Unset,
+                        alpn_protocols=Unset, hsts_max_age=Unset,
+                        hsts_include_subdomains=Unset, hsts_preload=Unset,
                         return_object_only=True):
         """Create a listener.
 
@@ -92,6 +93,12 @@
         :param allowed_cidrs: A list of IPv4 or IPv6 CIDRs.
         :param alpn_protocols: A list of ALPN protocols for TERMINATED_HTTPS
                                listeners.
+        :param hsts_include_subdomains: Defines whether the
+            `include_subdomains` directive is used for HSTS or not
+        :param hsts_max_age: Enables HTTP Strict Transport Security (HSTS)
+            and sets the `max_age` directive to given value
+        :param hsts_preload: Defines whether the `hsts_preload` directive
+            is used for HSTS or not
         :param return_object_only: If True, the response returns the object
                                    inside the root tag. False returns the full
                                    response from the API.
@@ -218,7 +225,8 @@
                         sni_container_refs=Unset, client_authentication=Unset,
                         client_ca_tls_container_ref=Unset,
                         client_crl_container_ref=Unset, allowed_cidrs=Unset,
-                        alpn_protocols=Unset,
+                        alpn_protocols=Unset, hsts_max_age=Unset,
+                        hsts_include_subdomains=Unset, hsts_preload=Unset,
                         return_object_only=True):
         """Update a listener.
 
@@ -267,6 +275,12 @@
         :param allowed_cidrs: A list of IPv4 or IPv6 CIDRs.
         :param alpn_protocols: A list of ALPN protocols for TERMINATED_HTTPS
                                listeners.
+        :param hsts_include_subdomains: Defines whether the
+            `include_subdomains` directive is used for HSTS or not
+        :param hsts_max_age: Enables HTTP Strict Transport Security (HSTS)
+            and sets the `max_age` directive to given value
+        :param hsts_preload: Defines whether the `hsts_preload` directive
+            is used for HSTS or not
         :param return_object_only: If True, the response returns the object
                                    inside the root tag. False returns the full
                                    response from the API.
diff --git a/octavia_tempest_plugin/tests/api/v2/test_listener.py b/octavia_tempest_plugin/tests/api/v2/test_listener.py
index 8961d78..cd320f4 100644
--- a/octavia_tempest_plugin/tests/api/v2/test_listener.py
+++ b/octavia_tempest_plugin/tests/api/v2/test_listener.py
@@ -297,6 +297,8 @@
 
         listener_name = data_utils.rand_name("lb_member_listener1-create")
         listener_description = data_utils.arbitrary_string(size=255)
+        hsts_supported = self.mem_listener_client.is_version_supported(
+            self.api_version, '2.27') and protocol == const.TERMINATED_HTTPS
 
         listener_kwargs = {
             const.NAME: listener_name,
@@ -351,9 +353,13 @@
                 exceptions.BadRequest,
                 self.mem_listener_client.create_listener,
                 **listener_kwargs)
-
             listener_kwargs.update({const.ALLOWED_CIDRS: self.allowed_cidrs})
 
+        if hsts_supported:
+            listener_kwargs[const.HSTS_PRELOAD] = True
+            listener_kwargs[const.HSTS_MAX_AGE] = 10000
+            listener_kwargs[const.HSTS_INCLUDE_SUBDOMAINS] = True
+
         # Test that a user without the loadbalancer role cannot
         # create a listener.
         expected_allowed = []
@@ -411,6 +417,11 @@
             equal_items.append(const.TIMEOUT_MEMBER_DATA)
             equal_items.append(const.TIMEOUT_TCP_INSPECT)
 
+        if hsts_supported:
+            equal_items.append(const.HSTS_PRELOAD)
+            equal_items.append(const.HSTS_MAX_AGE)
+            equal_items.append(const.HSTS_INCLUDE_SUBDOMAINS)
+
         for item in equal_items:
             self.assertEqual(listener_kwargs[item], listener[item])
 
@@ -1010,6 +1021,11 @@
         if self.mem_listener_client.is_version_supported(
                 self.api_version, '2.12'):
             show_listener_response_fields.append('allowed_cidrs')
+        if self.mem_listener_client.is_version_supported(
+                self.api_version, '2.27'):
+            show_listener_response_fields.append(const.HSTS_PRELOAD)
+            show_listener_response_fields.append(const.HSTS_MAX_AGE)
+            show_listener_response_fields.append(const.HSTS_INCLUDE_SUBDOMAINS)
         for field in show_listener_response_fields:
             if field in (const.DEFAULT_POOL_ID, const.L7_POLICIES):
                 continue
@@ -1142,6 +1158,8 @@
 
         listener_name = data_utils.rand_name("lb_member_listener1-show")
         listener_description = data_utils.arbitrary_string(size=255)
+        hsts_supported = self.mem_listener_client.is_version_supported(
+            self.api_version, '2.27') and protocol == const.TERMINATED_HTTPS
 
         listener_kwargs = {
             const.NAME: listener_name,
@@ -1168,6 +1186,11 @@
                                            self.SNI2_secret_ref],
             })
 
+        if hsts_supported:
+            listener_kwargs[const.HSTS_PRELOAD] = True
+            listener_kwargs[const.HSTS_MAX_AGE] = 10000
+            listener_kwargs[const.HSTS_INCLUDE_SUBDOMAINS] = True
+
         if self.mem_listener_client.is_version_supported(
                 self.api_version, '2.1'):
             listener_kwargs.update({
@@ -1263,6 +1286,11 @@
                 self.api_version, '2.12'):
             self.assertEqual(self.allowed_cidrs, listener[const.ALLOWED_CIDRS])
 
+        if hsts_supported:
+            self.assertTrue(listener[const.HSTS_PRELOAD])
+            self.assertEqual(10000, listener[const.HSTS_MAX_AGE])
+            self.assertTrue(listener[const.HSTS_INCLUDE_SUBDOMAINS])
+
         # Test that the appropriate users can see or not see the listener
         # based on the API RBAC.
         expected_allowed = []
@@ -1340,6 +1368,8 @@
 
         listener_name = data_utils.rand_name("lb_member_listener1-update")
         listener_description = data_utils.arbitrary_string(size=255)
+        hsts_supported = self.mem_listener_client.is_version_supported(
+            self.api_version, '2.27') and protocol == const.TERMINATED_HTTPS
 
         listener_kwargs = {
             const.NAME: listener_name,
@@ -1522,6 +1552,11 @@
                 new_cidrs = ['2001:db8::/64']
             listener_update_kwargs.update({const.ALLOWED_CIDRS: new_cidrs})
 
+        if hsts_supported:
+            listener_update_kwargs[const.HSTS_PRELOAD] = False
+            listener_update_kwargs[const.HSTS_MAX_AGE] = 0
+            listener_update_kwargs[const.HSTS_INCLUDE_SUBDOMAINS] = False
+
         listener = self.mem_listener_client.update_listener(
             listener[const.ID], **listener_update_kwargs)
 
@@ -1587,6 +1622,11 @@
                 expected_cidrs = ['2001:db8::/64']
             self.assertEqual(expected_cidrs, listener[const.ALLOWED_CIDRS])
 
+        if hsts_supported:
+            self.assertFalse(listener[const.HSTS_PRELOAD])
+            self.assertEqual(0, listener[const.HSTS_MAX_AGE])
+            self.assertFalse(listener[const.HSTS_INCLUDE_SUBDOMAINS])
+
     @decorators.idempotent_id('16f11c82-f069-4592-8954-81b35a98e3b7')
     def test_http_listener_delete(self):
         self._test_listener_delete(const.HTTP, 8070)
diff --git a/octavia_tempest_plugin/tests/barbican_scenario/v2/test_tls_barbican.py b/octavia_tempest_plugin/tests/barbican_scenario/v2/test_tls_barbican.py
index 511c724..2e1464c 100644
--- a/octavia_tempest_plugin/tests/barbican_scenario/v2/test_tls_barbican.py
+++ b/octavia_tempest_plugin/tests/barbican_scenario/v2/test_tls_barbican.py
@@ -1222,7 +1222,8 @@
 
         self.assertEqual(expected_proto, selected_proto)
 
-    def _test_http_versions_tls_traffic(self, http_version, alpn_protos):
+    def _test_http_versions_tls_traffic(self, http_version, alpn_protos,
+                                        hsts: bool = False):
         if not self.mem_listener_client.is_version_supported(
                 self.api_version, '2.20'):
             raise self.skipException('ALPN protocols are only available on '
@@ -1237,6 +1238,12 @@
             const.DEFAULT_TLS_CONTAINER_REF: self.server_secret_ref,
             const.ALPN_PROTOCOLS: alpn_protos,
         }
+        if self.mem_listener_client.is_version_supported(
+                self.api_version, '2.27'):
+            listener_kwargs[const.HSTS_MAX_AGE] = 100 if hsts else None
+            listener_kwargs[const.HSTS_INCLUDE_SUBDOMAINS] = hsts
+            listener_kwargs[const.HSTS_PRELOAD] = hsts
+
         listener = self.mem_listener_client.create_listener(**listener_kwargs)
         self.listener_id = listener[const.ID]
         self.addCleanup(
@@ -1258,6 +1265,12 @@
         client = httpx.Client(http2=(http_version == 'HTTP/2'), verify=context)
         r = client.get(url)
         self.assertEqual(http_version, r.http_version)
+        if hsts:
+            self.assertIn('strict-transport-security', r.headers)
+            self.assertEqual('max-age=100; includeSubDomains; preload;',
+                             r.headers['strict-transport-security'])
+        else:
+            self.assertNotIn('strict-transport-security', r.headers)
 
     @decorators.idempotent_id('9965828d-24af-4fa0-91ae-21c6bc47ab4c')
     def test_http_2_tls_traffic(self):
@@ -1268,6 +1281,15 @@
         self._test_http_versions_tls_traffic(
             'HTTP/1.1', ['http/1.1', 'http/1.0'])
 
+    @decorators.idempotent_id('7436c6b7-44be-4544-a40b-31d2b7b2ad0b')
+    def test_http_1_1_tls_hsts_traffic(self):
+        if not self.mem_listener_client.is_version_supported(
+                self.api_version, '2.27'):
+            raise self.skipException('HSTS is only available on '
+                                     'Octavia API version 2.27 or newer.')
+        self._test_http_versions_tls_traffic(
+            'HTTP/1.1', ['http/1.1', 'http/1.0'], hsts=True)
+
     @decorators.idempotent_id('ee0faf71-d11e-4323-8673-e5e15779749b')
     def test_pool_reencryption(self):
         if not self.mem_listener_client.is_version_supported(
diff --git a/octavia_tempest_plugin/tests/test_base.py b/octavia_tempest_plugin/tests/test_base.py
index f08cec9..51834fe 100644
--- a/octavia_tempest_plugin/tests/test_base.py
+++ b/octavia_tempest_plugin/tests/test_base.py
@@ -34,6 +34,7 @@
 
 from octavia_tempest_plugin.common import cert_utils
 from octavia_tempest_plugin.common import constants as const
+import octavia_tempest_plugin.services.load_balancer.v2 as lbv2
 from octavia_tempest_plugin.tests import RBAC_tests
 from octavia_tempest_plugin.tests import validators
 from octavia_tempest_plugin.tests import waiters
@@ -182,27 +183,29 @@
             cls.os_roles_lb_member.security_group_rules_client)
         cls.lb_mem_servers_client = cls.os_roles_lb_member.servers_client
         cls.lb_mem_subnet_client = cls.os_roles_lb_member.subnets_client
-        cls.mem_lb_client = (
+        cls.mem_lb_client: lbv2.LoadbalancerClient = (
             cls.os_roles_lb_member.load_balancer_v2.LoadbalancerClient())
-        cls.mem_listener_client = (
+        cls.mem_listener_client: lbv2.ListenerClient = (
             cls.os_roles_lb_member.load_balancer_v2.ListenerClient())
-        cls.mem_pool_client = (
+        cls.mem_pool_client: lbv2.PoolClient = (
             cls.os_roles_lb_member.load_balancer_v2.PoolClient())
-        cls.mem_member_client = (
+        cls.mem_member_client: lbv2.MemberClient = (
             cls.os_roles_lb_member.load_balancer_v2.MemberClient())
-        cls.mem_healthmonitor_client = (
+        cls.mem_healthmonitor_client: lbv2.HealthMonitorClient = (
             cls.os_roles_lb_member.load_balancer_v2.HealthMonitorClient())
-        cls.mem_l7policy_client = (
+        cls.mem_l7policy_client: lbv2.L7PolicyClient = (
             cls.os_roles_lb_member.load_balancer_v2.L7PolicyClient())
-        cls.mem_l7rule_client = (
+        cls.mem_l7rule_client: lbv2.L7RuleClient = (
             cls.os_roles_lb_member.load_balancer_v2.L7RuleClient())
-        cls.lb_admin_amphora_client = lb_admin_prefix.AmphoraClient()
-        cls.lb_admin_flavor_profile_client = (
+        cls.lb_admin_amphora_client: lbv2.AmphoraClient = (
+            lb_admin_prefix.AmphoraClient())
+        cls.lb_admin_flavor_profile_client: lbv2.FlavorProfileClient = (
             lb_admin_prefix.FlavorProfileClient())
-        cls.lb_admin_flavor_client = lb_admin_prefix.FlavorClient()
-        cls.mem_flavor_client = (
+        cls.lb_admin_flavor_client: lbv2.FlavorClient = (
+            lb_admin_prefix.FlavorClient())
+        cls.mem_flavor_client: lbv2.FlavorClient = (
             cls.os_roles_lb_member.load_balancer_v2.FlavorClient())
-        cls.mem_provider_client = (
+        cls.mem_provider_client: lbv2.ProviderClient = (
             cls.os_roles_lb_member.load_balancer_v2.ProviderClient())
         cls.os_admin_servers_client = cls.os_admin.servers_client
         cls.os_admin_routers_client = cls.os_admin.routers_client