Keystone to Keystone tests

blueprint devstack-plugin

Depends-On: I55b4e727404d910aa9b5a07b49b783799bc5f098
Change-Id: I6d46b18c75f344b626848adf255b3d459b6b238d
diff --git a/keystone_tempest_plugin/services/identity/v3/saml2_client.py b/keystone_tempest_plugin/services/identity/v3/saml2_client.py
index b70a389..ed58892 100644
--- a/keystone_tempest_plugin/services/identity/v3/saml2_client.py
+++ b/keystone_tempest_plugin/services/identity/v3/saml2_client.py
@@ -44,8 +44,8 @@
             headers=self.ECP_SP_EMPTY_REQUEST_HEADERS
         )
 
-    def _prepare_sp_saml2_authn_response(self, saml2_idp_authn_response,
-                                         relay_state):
+    def prepare_sp_saml2_authn_response(self, saml2_idp_authn_response,
+                                        relay_state):
         # Replace the header contents of the Identity Provider response with
         # the relay state initially sent by the Service Provider. The response
         # is a SOAP envelope with the following structure:
@@ -72,10 +72,7 @@
         )
 
     def send_service_provider_saml2_authn_response(
-            self, saml2_idp_authn_response, relay_state, idp_consumer_url):
-
-        self._prepare_sp_saml2_authn_response(
-            saml2_idp_authn_response, relay_state)
+            self, saml2_idp_authn_response, idp_consumer_url):
 
         return self.session.post(
             idp_consumer_url,
diff --git a/keystone_tempest_plugin/tests/scenario/test_federated_authentication.py b/keystone_tempest_plugin/tests/scenario/test_federated_authentication.py
index 7814c0a..89af4ce 100644
--- a/keystone_tempest_plugin/tests/scenario/test_federated_authentication.py
+++ b/keystone_tempest_plugin/tests/scenario/test_federated_authentication.py
@@ -12,8 +12,10 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
+import json
 from lxml import etree
 from six.moves import http_client
+from six.moves import urllib
 from tempest import config
 from tempest.lib.common.utils import data_utils
 import testtools
@@ -42,16 +44,26 @@
 
     def _setup_settings(self):
         self.idp_id = CONF.fed_scenario.idp_id
+        self.idp_remote_ids = CONF.fed_scenario.idp_remote_ids
         self.idp_url = CONF.fed_scenario.idp_ecp_url
         self.keystone_v3_endpoint = CONF.identity.uri_v3
         self.password = CONF.fed_scenario.idp_password
         self.protocol_id = CONF.fed_scenario.protocol_id
         self.username = CONF.fed_scenario.idp_username
 
+        self.mapping_remote_type = CONF.fed_scenario.mapping_remote_type
+        self.mapping_user_name = CONF.fed_scenario.mapping_user_name
+        self.mapping_group_name = CONF.fed_scenario.mapping_group_name
+        self.mapping_group_domain_name = \
+            CONF.fed_scenario.mapping_group_domain_name
+
+        # NOTE(knikolla): Authentication endpoint for keystone. If not set,
+        # will be autodetected.
+        self.auth_url = None
+
     def _setup_idp(self):
-        remote_ids = CONF.fed_scenario.idp_remote_ids
         idp = self.idps_client.create_identity_provider(
-            self.idp_id, remote_ids=remote_ids, enabled=True)
+            self.idp_id, remote_ids=self.idp_remote_ids, enabled=True)
         self.addCleanup(
             self.keystone_manager.domains_client.delete_domain,
             idp['identity_provider']['domain_id'])
@@ -63,26 +75,21 @@
 
     def _setup_mapping(self):
         self.mapping_id = data_utils.rand_uuid_hex()
-        mapping_remote_type = CONF.fed_scenario.mapping_remote_type
-        mapping_user_name = CONF.fed_scenario.mapping_user_name
-        mapping_group_name = CONF.fed_scenario.mapping_group_name
-        mapping_group_domain_name = CONF.fed_scenario.mapping_group_domain_name
-
         rules = [{
             'local': [
                 {
-                    'user': {'name': mapping_user_name}
+                    'user': {'name': self.mapping_user_name}
                 },
                 {
                     'group': {
-                        'domain': {'name': mapping_group_domain_name},
-                        'name': mapping_group_name
+                        'domain': {'name': self.mapping_group_domain_name},
+                        'name': self.mapping_group_name
                     }
                 }
             ],
             'remote': [
                 {
-                    'type': mapping_remote_type
+                    'type': self.mapping_remote_type
                 }
             ]
         }]
@@ -116,7 +123,7 @@
         self.assertEqual(1, len(l))
         return l[0]
 
-    def _request_unscoped_token(self):
+    def _get_sp_authn_request(self):
         resp = self.saml2_client.send_service_provider_request(
             self.keystone_v3_endpoint, self.idp_id, self.protocol_id)
         self.assertEqual(http_client.OK, resp.status_code)
@@ -140,19 +147,33 @@
         # have the same consumer URL.
         self.assertEqual(sp_consumer_url, idp_consumer_url)
 
-        # Present the identity provider authn response to the service provider.
+        self.saml2_client.prepare_sp_saml2_authn_response(
+            saml2_idp_authn_response, relay_state)
+
+        return saml2_idp_authn_response, sp_consumer_url
+
+    def _request_unscoped_token(self):
+        assertion, sp_url = self._get_sp_authn_request()
+
+        # Present the identity provider authn response to the service provider
         resp = self.saml2_client.send_service_provider_saml2_authn_response(
-            saml2_idp_authn_response, relay_state, idp_consumer_url)
+            assertion, sp_url)
         # Must receive a redirect from service provider to the URL where the
         # unscoped token can be retrieved.
         self.assertIn(resp.status_code,
                       [http_client.FOUND, http_client.SEE_OTHER])
 
+        # If this is K2K, don't follow HTTP specs - after the HTTP 302/303
+        # response don't repeat the call directed to the Location URL. In this
+        # case, this is an indication that SAML2 session is now active and
+        # protected resource can be accessed.
+        # https://opendev.org/openstack/keystoneauth/src/tag/3.17.1/keystoneauth1/identity/v3/k2k.py#L152
+        sp_url = self.auth_url or resp.headers['location']
+
         # We can receive multiple types of errors here, the response depends on
         # the mapping and the username used to authenticate in the Identity
         # Provider and also in the Identity Provider remote ID validation.
         # If everything works well, we receive an unscoped token.
-        sp_url = resp.headers['location']
         resp = (
             self.saml2_client.send_service_provider_unscoped_token_request(
                 sp_url))
@@ -180,3 +201,55 @@
         # Get a scoped token to one of the listed projects
         self.tokens_client.auth(
             project_id=projects[0]['id'], token=token_id)
+
+
+class TestK2KFederatedAuthentication(TestSaml2EcpFederatedAuthentication):
+
+    def setUp(self):
+        super(TestK2KFederatedAuthentication, self).setUp()
+        self._setup_sp()
+
+    def _setup_settings(self):
+        super(TestK2KFederatedAuthentication, self)._setup_settings()
+        self.idp_id = 'keystone'
+        self.idp_remote_ids = [
+            '%s/OS-FEDERATION/saml2/idp' % self.keystone_v3_endpoint]
+
+        self.mapping_remote_type = 'openstack_user'
+
+        self.sp_id = 'keystone'
+        self.auth_url = (
+            '%s/OS-FEDERATION/identity_providers/%s/protocols/%s/auth'
+            ) % (self.keystone_v3_endpoint, self.sp_id, self.protocol_id)
+        url = urllib.parse.urlparse(self.keystone_v3_endpoint)
+        self.sp_url = '%s://%s/Shibboleth.sso/SAML2/ECP' % (url.scheme,
+                                                            url.netloc)
+
+    def _setup_sp(self):
+        self.sps_client.create_service_provider(self.sp_id,
+                                                sp_url=self.sp_url,
+                                                auth_url=self.auth_url,
+                                                enabled=True)
+        self.addCleanup(self.sps_client.delete_service_provider, self.sp_id)
+
+    def _get_sp_authn_request(self):
+        body = {
+            'auth': {
+                'identity': {
+                    'methods': ['token'],
+                    'token': {
+                        'id': self.auth_client.token
+                    }
+                },
+                'scope': {
+                    'service_provider': {
+                        'id': self.sp_id
+                    }
+                }
+            }
+        }
+        resp, saml = self.auth_client.post('auth/OS-FEDERATION/saml2/ecp',
+                                           json.dumps(body))
+        self.auth_client.expected_success(200, resp.status)
+
+        return etree.XML(saml), self.sp_url