Add protocols integration tests
This patch adds the tests related to protocols/mappings
in the Identity Provider API (part of the Federated
Identity API)
Change-Id: I5e2573a175edbaf6f7a1bb73f3e0a86deeb94f1d
diff --git a/keystone_tempest_plugin/services/identity/v3/identity_providers_client.py b/keystone_tempest_plugin/services/identity/v3/identity_providers_client.py
index 3f2544f..f5c2e44 100644
--- a/keystone_tempest_plugin/services/identity/v3/identity_providers_client.py
+++ b/keystone_tempest_plugin/services/identity/v3/identity_providers_client.py
@@ -68,3 +68,48 @@
self.expected_success(200, resp.status)
body = json.loads(body)
return rest_client.ResponseBody(resp, body)
+
+ def add_protocol_and_mapping(self, idp_id, protocol_id, mapping_id):
+ """Add a protocol and mapping to an identity provider."""
+ put_body = json.dumps({'protocol': {'mapping_id': mapping_id}})
+ url = '%s/%s/%s' % (
+ self._build_path(entity_id=idp_id), 'protocols', protocol_id)
+ resp, body = self.put(url, put_body)
+ self.expected_success(201, resp.status)
+ body = json.loads(body)
+ return rest_client.ResponseBody(resp, body)
+
+ def delete_protocol_and_mapping(self, idp_id, protocol_id):
+ """Delete a protocol and mapping from an identity provider."""
+ url = '%s/%s/%s' % (
+ self._build_path(entity_id=idp_id), 'protocols', protocol_id)
+ resp, body = self.delete(url)
+ self.expected_success(204, resp.status)
+ return rest_client.ResponseBody(resp, body)
+
+ def get_protocol_and_mapping(self, idp_id, protocol_id):
+ """Get a protocol and mapping from an identity provider."""
+ url = '%s/%s/%s' % (
+ self._build_path(entity_id=idp_id), 'protocols', protocol_id)
+ resp, body = self.get(url)
+ self.expected_success(200, resp.status)
+ body = json.loads(body)
+ return rest_client.ResponseBody(resp, body)
+
+ def list_protocols_and_mappings(self, idp_id):
+ """List the protocols and mappings from an identity provider."""
+ url = '%s/%s' % (self._build_path(entity_id=idp_id), 'protocols')
+ resp, body = self.get(url)
+ self.expected_success(200, resp.status)
+ body = json.loads(body)
+ return rest_client.ResponseBody(resp, body)
+
+ def update_protocol_mapping(self, idp_id, protocol_id, mapping_id):
+ """Update the identity provider protocol with a new mapping."""
+ patch_body = json.dumps({'protocol': {'mapping_id': mapping_id}})
+ url = '%s/%s/%s' % (
+ self._build_path(entity_id=idp_id), 'protocols', protocol_id)
+ resp, body = self.patch(url, patch_body)
+ self.expected_success(200, resp.status)
+ body = json.loads(body)
+ return rest_client.ResponseBody(resp, body)
diff --git a/keystone_tempest_plugin/tests/api/identity/v3/test_identity_providers.py b/keystone_tempest_plugin/tests/api/identity/v3/test_identity_providers.py
index 5e1e2cf..fbc14c6 100644
--- a/keystone_tempest_plugin/tests/api/identity/v3/test_identity_providers.py
+++ b/keystone_tempest_plugin/tests/api/identity/v3/test_identity_providers.py
@@ -114,3 +114,121 @@
# The identity provider should be disabled
self.assertFalse(idp['enabled'])
+
+ def _assert_protocol_attributes(self, protocol, protocol_id,
+ mapping_id=None):
+ self.assertIn('id', protocol)
+ self.assertEqual(protocol_id, protocol['id'])
+
+ self.assertIn('mapping_id', protocol)
+ if mapping_id:
+ self.assertEqual(mapping_id, protocol['mapping_id'])
+
+ def _create_identity_provider_and_mapping(self):
+ # Create an identity provider
+ idp_id = data_utils.rand_uuid_hex()
+ self._create_idp(idp_id, fixtures.idp_ref(enabled=True))
+
+ # Create a mapping rule
+ mapping_id = data_utils.rand_uuid_hex()
+ self.mappings_client.create_mapping_rule(
+ mapping_id, fixtures.mapping_ref())
+ self.addCleanup(self.mappings_client.delete_mapping_rule, mapping_id)
+
+ return idp_id, mapping_id
+
+ def _create_protocol(self, idp_id, protocol_id, mapping_id):
+ protocol = self.idps_client.add_protocol_and_mapping(
+ idp_id, protocol_id, mapping_id)['protocol']
+ self.addCleanup(
+ self.idps_client.delete_protocol_and_mapping, idp_id, protocol_id)
+ return protocol
+
+ @decorators.idempotent_id('f5bdf482-1ad3-4aad-b52e-8fe7c1361104')
+ def test_add_protocol_to_identity_provider(self):
+ idp_id, mapping_id = self._create_identity_provider_and_mapping()
+
+ # Now we try to add a protocol to the identity provider
+ protocol_id = data_utils.rand_uuid_hex()
+ protocol = self._create_protocol(idp_id, protocol_id, mapping_id)
+
+ self._assert_protocol_attributes(protocol, protocol_id, mapping_id)
+
+ @decorators.idempotent_id('613d073b-0db7-41aa-b4df-5a1848cde0b3')
+ def test_get_protocol_from_identity_provider(self):
+ idp_id, mapping_id = self._create_identity_provider_and_mapping()
+
+ # Add a protocol to the identity provider
+ protocol_id = data_utils.rand_uuid_hex()
+ self._create_protocol(idp_id, protocol_id, mapping_id)
+
+ # Try to get the protocol
+ protocol = self.idps_client.get_protocol_and_mapping(
+ idp_id, protocol_id)['protocol']
+ self._assert_protocol_attributes(protocol, protocol_id, mapping_id)
+
+ @decorators.idempotent_id('6e6501ac-edae-4dc2-bb68-7a2b58602383')
+ def test_list_protocols_from_identity_provider(self):
+ idp_id, mapping_id = self._create_identity_provider_and_mapping()
+
+ protocol_ids = []
+ for _ in range(3):
+ protocol_id = data_utils.rand_uuid_hex()
+ self._create_protocol(idp_id, protocol_id, mapping_id)
+ protocol_ids.append(protocol_id)
+
+ protocols_list = self.idps_client.list_protocols_and_mappings(idp_id)[
+ 'protocols']
+ fetched_ids = [fetched['id'] for fetched in protocols_list]
+
+ for protocol_id in protocol_ids:
+ self.assertIn(protocol_id, fetched_ids)
+
+ @decorators.idempotent_id('680d36df-d6b9-4695-be29-6fc2065c5dde')
+ def test_update_mapping_from_identity_provider_protocol(self):
+ idp_id, mapping_id = self._create_identity_provider_and_mapping()
+
+ # Add a protocol to the identity provider
+ protocol_id = data_utils.rand_uuid_hex()
+ protocol = self._create_protocol(idp_id, protocol_id, mapping_id)
+
+ # Create another mapping
+ new_mapping_id = data_utils.rand_uuid_hex()
+ self.mappings_client.create_mapping_rule(
+ new_mapping_id, fixtures.mapping_ref())
+
+ # Update the identity provider protocol
+ protocol = self.idps_client.update_protocol_mapping(
+ idp_id, protocol_id, new_mapping_id)['protocol']
+
+ self._assert_protocol_attributes(protocol, protocol_id, new_mapping_id)
+
+ @decorators.idempotent_id('04fdf262-af91-4a68-a1cf-794c6d2f2eeb')
+ def test_add_protocol_to_identity_provider_unknown_mapping(self):
+ # Create an identity provider
+ idp_id = data_utils.rand_uuid_hex()
+ self._create_idp(idp_id, fixtures.idp_ref(enabled=True))
+
+ # Now we try to add a protocol to the identity provider using
+ # a non existent mapping ID
+ mapping_id = data_utils.rand_uuid_hex()
+ protocol_id = data_utils.rand_uuid_hex()
+ protocol = self._create_protocol(idp_id, protocol_id, mapping_id)
+
+ self._assert_protocol_attributes(protocol, protocol_id, mapping_id)
+
+ @decorators.idempotent_id('c73311e7-c207-4c11-998f-532a91f1b0d1')
+ def test_update_protocol_from_identity_provider_unknown_mapping(self):
+ idp_id, mapping_id = self._create_identity_provider_and_mapping()
+
+ # Add a protocol to the identity provider
+ protocol_id = data_utils.rand_uuid_hex()
+ self._create_protocol(idp_id, protocol_id, mapping_id)
+
+ # Update the identity provider protocol using a non existent
+ # mapping_id
+ new_mapping_id = data_utils.rand_uuid_hex()
+ protocol = self.idps_client.update_protocol_mapping(
+ idp_id, protocol_id, new_mapping_id)['protocol']
+
+ self._assert_protocol_attributes(protocol, protocol_id, new_mapping_id)