Add tests for keystone OS-TRUST v3 API
Add tests providing coverage of the keystone v3 OS-TRUSTS extension,
as documented at:
https://github.com/openstack/identity-api/blob/master/\
openstack-identity-api/v3/src/markdown/identity-api-v3-os-trust-ext.md
This covers all the documented interfaces to create/delete/list/get
trusts and roles delegated by trusts, except those impacted by bug
1246383 and 1245590, which will be submitted separately.
A subsequent patch will add coverage for consuming trusts via the tokens API.
Note only the JSON API is currently tested due to bug #1246941
Partially-Implements: blueprint keystone-trust-api
Change-Id: I310951f070107f3af33b52fcd2c8e54d82654088
diff --git a/tempest/api/identity/admin/v3/test_trusts.py b/tempest/api/identity/admin/v3/test_trusts.py
new file mode 100644
index 0000000..5e13a5a
--- /dev/null
+++ b/tempest/api/identity/admin/v3/test_trusts.py
@@ -0,0 +1,222 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from tempest.api.identity import base
+from tempest import clients
+from tempest.common.utils.data_utils import rand_name
+from tempest import exceptions
+from tempest.test import attr
+
+
+class BaseTrustsV3Test(base.BaseIdentityAdminTest):
+
+ def setUp(self):
+ super(BaseTrustsV3Test, self).setUp()
+ # Use alt_username as the trustee
+ self.trustee_username = self.config.identity.alt_username
+
+ self.trust_id = None
+ self.create_trustor_and_roles()
+ self.addCleanup(self.cleanup_trust_user_and_roles)
+
+ def create_trustor_and_roles(self):
+ # Get trustor project ID, use the admin project
+ self.trustor_project_name = self.v3_client.tenant_name
+ self.trustor_project_id = self.get_tenant_by_name(
+ self.trustor_project_name)['id']
+ self.assertIsNotNone(self.trustor_project_id)
+
+ # Create a trustor User
+ self.trustor_username = rand_name('user-')
+ u_desc = self.trustor_username + 'description'
+ u_email = self.trustor_username + '@testmail.tm'
+ self.trustor_password = rand_name('pass-')
+ resp, user = self.v3_client.create_user(
+ self.trustor_username,
+ description=u_desc,
+ password=self.trustor_password,
+ email=u_email,
+ project_id=self.trustor_project_id)
+ self.assertEqual(resp['status'], '201')
+ self.trustor_user_id = user['id']
+
+ # And two roles, one we'll delegate and one we won't
+ self.delegated_role = rand_name('DelegatedRole-')
+ self.not_delegated_role = rand_name('NotDelegatedRole-')
+
+ resp, role = self.v3_client.create_role(self.delegated_role)
+ self.assertEqual(resp['status'], '201')
+ self.delegated_role_id = role['id']
+
+ resp, role = self.v3_client.create_role(self.not_delegated_role)
+ self.assertEqual(resp['status'], '201')
+ self.not_delegated_role_id = role['id']
+
+ # Assign roles to trustor
+ self.v3_client.assign_user_role(self.trustor_project_id,
+ self.trustor_user_id,
+ self.delegated_role_id)
+ self.v3_client.assign_user_role(self.trustor_project_id,
+ self.trustor_user_id,
+ self.not_delegated_role_id)
+
+ # Get trustee user ID, use the demo user
+ trustee_username = self.v3_non_admin_client.user
+ self.trustee_user_id = self.get_user_by_name(trustee_username)['id']
+ self.assertIsNotNone(self.trustee_user_id)
+
+ # Initialize a new client with the trustor credentials
+ os = clients.Manager(username=self.trustor_username,
+ password=self.trustor_password,
+ tenant_name=self.trustor_project_name,
+ interface=self._interface)
+ self.trustor_v3_client = os.identity_v3_client
+
+ def cleanup_trust_user_and_roles(self):
+ if self.trust_id:
+ try:
+ self.trustor_v3_client.delete_trust(self.trust_id)
+ except exceptions.NotFound:
+ pass
+ self.trust_id = None
+
+ if self.trustor_user_id:
+ self.v3_client.delete_user(self.trustor_user_id)
+ if self.delegated_role_id:
+ self.v3_client.delete_role(self.delegated_role_id)
+ if self.not_delegated_role_id:
+ self.v3_client.delete_role(self.not_delegated_role_id)
+
+ def create_trust(self, impersonate=True, expires=None):
+
+ resp, trust_create = self.trustor_v3_client.create_trust(
+ trustor_user_id=self.trustor_user_id,
+ trustee_user_id=self.trustee_user_id,
+ project_id=self.trustor_project_id,
+ role_names=[self.delegated_role],
+ impersonation=impersonate,
+ expires_at=expires)
+ self.assertEqual('201', resp['status'])
+ self.trust_id = trust_create['id']
+ return trust_create
+
+ def validate_trust(self, trust, impersonate=True, expires=None,
+ summary=False):
+ self.assertIsNotNone(trust['id'])
+ self.assertEqual(impersonate, trust['impersonation'])
+ self.assertEqual(expires, trust['expires_at'])
+ self.assertEqual(self.trustor_user_id, trust['trustor_user_id'])
+ self.assertEqual(self.trustee_user_id, trust['trustee_user_id'])
+ self.assertIn('v3/OS-TRUST/trusts', trust['links']['self'])
+ self.assertEqual(self.trustor_project_id, trust['project_id'])
+ if not summary:
+ self.assertEqual(self.delegated_role, trust['roles'][0]['name'])
+ self.assertEqual(1, len(trust['roles']))
+
+ def get_trust(self):
+ resp, trust_get = self.trustor_v3_client.get_trust(self.trust_id)
+ self.assertEqual('200', resp['status'])
+ return trust_get
+
+ def validate_role(self, role):
+ self.assertEqual(self.delegated_role_id, role['id'])
+ self.assertEqual(self.delegated_role, role['name'])
+ self.assertIn('v3/roles/%s' % self.delegated_role_id,
+ role['links']['self'])
+ self.assertNotEqual(self.not_delegated_role_id, role['id'])
+ self.assertNotEqual(self.not_delegated_role, role['name'])
+ self.assertNotIn('v3/roles/%s' % self.not_delegated_role_id,
+ role['links']['self'])
+
+ def check_trust_roles(self):
+ # Check we find the delegated role
+ resp, roles_get = self.trustor_v3_client.get_trust_roles(
+ self.trust_id)
+ self.assertEqual('200', resp['status'])
+ self.assertEqual(1, len(roles_get))
+ self.validate_role(roles_get[0])
+
+ resp, role_get = self.trustor_v3_client.get_trust_role(
+ self.trust_id, self.delegated_role_id)
+ self.assertEqual('200', resp['status'])
+ self.validate_role(role_get)
+
+ resp, role_get = self.trustor_v3_client.check_trust_role(
+ self.trust_id, self.delegated_role_id)
+ self.assertEqual('204', resp['status'])
+
+ # And that we don't find not_delegated_role
+ self.assertRaises(exceptions.NotFound,
+ self.trustor_v3_client.get_trust_role,
+ self.trust_id,
+ self.not_delegated_role_id)
+
+ self.assertRaises(exceptions.NotFound,
+ self.trustor_v3_client.check_trust_role,
+ self.trust_id,
+ self.not_delegated_role_id)
+
+ def delete_trust(self):
+ resp, trust_delete = self.trustor_v3_client.delete_trust(self.trust_id)
+ self.assertEqual('204', resp['status'])
+ self.assertRaises(exceptions.NotFound,
+ self.trustor_v3_client.get_trust,
+ self.trust_id)
+ self.trust_id = None
+
+
+class TrustsV3TestJSON(BaseTrustsV3Test):
+ _interface = 'json'
+
+ def setUp(self):
+ super(TrustsV3TestJSON, self).setUp()
+ self.create_trustor_and_roles()
+
+ @attr(type='smoke')
+ def test_trust_impersonate(self):
+ # Test case to check we can create, get and delete a trust
+ # updates are not supported for trusts
+ trust = self.create_trust()
+ self.validate_trust(trust)
+
+ trust_get = self.get_trust()
+ self.validate_trust(trust_get)
+
+ self.check_trust_roles()
+
+ self.delete_trust()
+
+ @attr(type='smoke')
+ def test_trust_noimpersonate(self):
+ # Test case to check we can create, get and delete a trust
+ # with impersonation=False
+ trust = self.create_trust(impersonate=False)
+ self.validate_trust(trust, impersonate=False)
+
+ trust_get = self.get_trust()
+ self.validate_trust(trust_get, impersonate=False)
+
+ self.check_trust_roles()
+
+ self.delete_trust()
+
+ @attr(type='smoke')
+ def test_trust_expire_invalid(self):
+ # Test case to check we can check an invlaid expiry time
+ # is rejected with the correct error
+ # with an expiry specified
+ expires_str = 'bad.123Z'
+ self.assertRaises(exceptions.BadRequest,
+ self.create_trust,
+ expires=expires_str)
diff --git a/tempest/services/identity/v3/json/identity_client.py b/tempest/services/identity/v3/json/identity_client.py
index ec99d37..e457c1f 100644
--- a/tempest/services/identity/v3/json/identity_client.py
+++ b/tempest/services/identity/v3/json/identity_client.py
@@ -359,6 +359,66 @@
(domain_id, group_id, role_id))
return resp, body
+ def create_trust(self, trustor_user_id, trustee_user_id, project_id,
+ role_names, impersonation, expires_at):
+ """Creates a trust."""
+ roles = [{'name': n} for n in role_names]
+ post_body = {
+ 'trustor_user_id': trustor_user_id,
+ 'trustee_user_id': trustee_user_id,
+ 'project_id': project_id,
+ 'impersonation': impersonation,
+ 'roles': roles,
+ 'expires_at': expires_at
+ }
+ post_body = json.dumps({'trust': post_body})
+ resp, body = self.post('OS-TRUST/trusts', post_body, self.headers)
+ body = json.loads(body)
+ return resp, body['trust']
+
+ def delete_trust(self, trust_id):
+ """Deletes a trust."""
+ resp, body = self.delete("OS-TRUST/trusts/%s" % trust_id)
+ return resp, body
+
+ def get_trusts(self, trustor_user_id=None, trustee_user_id=None):
+ """GET trusts."""
+ if trustor_user_id:
+ resp, body = self.get("OS-TRUST/trusts?trustor_user_id=%s"
+ % trustor_user_id)
+ elif trustee_user_id:
+ resp, body = self.get("OS-TRUST/trusts?trustee_user_id=%s"
+ % trustee_user_id)
+ else:
+ resp, body = self.get("OS-TRUST/trusts")
+ body = json.loads(body)
+ return resp, body['trusts']
+
+ def get_trust(self, trust_id):
+ """GET trust."""
+ resp, body = self.get("OS-TRUST/trusts/%s" % trust_id)
+ body = json.loads(body)
+ return resp, body['trust']
+
+ def get_trust_roles(self, trust_id):
+ """GET roles delegated by a trust."""
+ resp, body = self.get("OS-TRUST/trusts/%s/roles" % trust_id)
+ body = json.loads(body)
+ return resp, body['roles']
+
+ def get_trust_role(self, trust_id, role_id):
+ """GET role delegated by a trust."""
+ resp, body = self.get("OS-TRUST/trusts/%s/roles/%s"
+ % (trust_id, role_id))
+ body = json.loads(body)
+ return resp, body['role']
+
+ def check_trust_role(self, trust_id, role_id):
+ """HEAD Check if role is delegated by a trust."""
+ resp, body = self.head("OS-TRUST/trusts/%s/roles/%s"
+ % (trust_id, role_id))
+ return resp, body
+
class V3TokenClientJSON(RestClient):