Identity trust rbac tests
Add identity trust rbac tests corresponding to the policy actions
in [0].
Most of the policy actions in [0] have a rule of '' (empty string),
meaning any role can perform the action. However, the
"identity:create_trust" policy action has a rule of
base.RULE_TRUST_OWNER which translates to:
user_id:%(trust.trustor_user_id)s [1].
This is a rather unique rule, one that is not dependent on the
current user's user_id, project_id or even role. Rather, this
rule translates to: "Does the current user's user_id match
the user_id of the trustor creating a trust with a trustee?"
As should be expected, "trust.trustor_user_id" can only be
dynamically calculated at runtime, rather than immediately
retrieved from a tempest credential variable, as is the case
with user_id and project_id.
Hence, this patch not only 1) creates trust rbac tests but 2)
enhances the framework, particularly the rbac_rule_validation
decorator, as well as the rbac_policy_parser framework, to
handle additional target data that must be passed to
oslo-policy-checker in order for proper authorization
determination.
The "target" parameter in oslo.policy is a dictionary that
contains "As much information about the object being
operated on as possible" [2]. Accordingly, the
rbac_rule_validation decorator has been enhanced with a new
param called `extra_target_data` that is a dictionary
containing key-value pairs of dynamically calculated
data needed by oslo.policy to correctly determine whether
the "target" has authorization to perform a policy action.
For example,
extra_target_data={
"trust.trustor_user_id": "os.auth_provider.credentials.user_id"
})
means that trust.trustor_user_id equals the primary credential's
user_id. So, if a trustor's user_id equals the primary credential's
user_id, then the policy parser will return True for `is_allowed`.
However, if a trustor's user_id doesn'tequal to the primary credential's
user_id, but rather the alt credential's user_id, say, then `is_allowed`
returns False.
Thus, the only way to do negative testing with test_trusts_rbac is
to explicitly create a negative test, as described in the above
paragraph, which is what this patch does. Normally, negative testing
is baked in, dependent on the `rbac_test_role`, but with more
complicated policy enforcement as described above, this is not
possible.
While it is possible to create a new CONF value called trustor_user_id,
it would require using pre-provisioned credentials, which is something
Patrole doesn't explicitly use.
[0] https://github.com/openstack/keystone/blob/master/keystone/common/policies/trust.py
[1] https://github.com/openstack/keystone/blob/master/keystone/common/policies/base.py
[2] https://docs.openstack.org/developer/oslo.policy/api/oslo_policy.html
Depends-On: Ib82e8b8a0d6c8587fb0b1ce415e751c3ebc3c2f9
Change-Id: I5c00fdb345556066343bdaeb5f008d639a94bc4b
diff --git a/patrole_tempest_plugin/rbac_auth.py b/patrole_tempest_plugin/rbac_auth.py
index 7281969..ae497f3 100644
--- a/patrole_tempest_plugin/rbac_auth.py
+++ b/patrole_tempest_plugin/rbac_auth.py
@@ -27,9 +27,9 @@
class RbacAuthority(object):
- def __init__(self, tenant_id, user_id, service=None):
+ def __init__(self, project_id, user_id, service, extra_target_data):
self.policy_parser = rbac_policy_parser.RbacPolicyParser(
- tenant_id, user_id, service)
+ project_id, user_id, service, extra_target_data=extra_target_data)
def get_permission(self, rule_name, role):
try:
diff --git a/patrole_tempest_plugin/rbac_policy_parser.py b/patrole_tempest_plugin/rbac_policy_parser.py
index 38bed7c..94aa2c7 100644
--- a/patrole_tempest_plugin/rbac_policy_parser.py
+++ b/patrole_tempest_plugin/rbac_policy_parser.py
@@ -40,7 +40,8 @@
each role, whether a given rule is allowed using oslo policy.
"""
- def __init__(self, tenant_id, user_id, service=None, path=None):
+ def __init__(self, project_id, user_id, service=None, path=None,
+ extra_target_data={}):
"""Initialization of Rbac Policy Parser.
Parses a policy file to create a dictionary, mapping policy actions to
@@ -57,7 +58,7 @@
the custom policy file over the default policy implementation is
prioritized.
- :param tenant_id: type uuid
+ :param project_id: type uuid
:param user_id: type uuid
:param service: type string
:param path: type string
@@ -78,8 +79,9 @@
self.path = path or os.path.join('/etc', service, 'policy.json')
self.rules = policy.Rules.load(self._get_policy_data(service),
'default')
- self.tenant_id = tenant_id
+ self.project_id = project_id
self.user_id = user_id
+ self.extra_target_data = extra_target_data
def allowed(self, rule_name, role):
is_admin_context = self._is_admin_context(role)
@@ -165,8 +167,8 @@
"name": role
}
],
- "project_id": self.tenant_id,
- "tenant_id": self.tenant_id,
+ "project_id": self.project_id,
+ "tenant_id": self.project_id,
"user_id": self.user_id
}
}
@@ -200,6 +202,8 @@
"tenant_id": access_data['project_id'],
"network:tenant_id": access_data['project_id'],
"user_id": access_data['user_id']}
+ if self.extra_target_data:
+ target.update(self.extra_target_data)
result = self._try_rule(apply_rule, target, access_data, o)
return result
diff --git a/patrole_tempest_plugin/rbac_rule_validation.py b/patrole_tempest_plugin/rbac_rule_validation.py
index ec63119..6a5ed5e 100644
--- a/patrole_tempest_plugin/rbac_rule_validation.py
+++ b/patrole_tempest_plugin/rbac_rule_validation.py
@@ -29,7 +29,8 @@
LOG = logging.getLogger(__name__)
-def action(service, rule, admin_only=False, expected_error_code=403):
+def action(service, rule, admin_only=False, expected_error_code=403,
+ extra_target_data={}):
"""A decorator which does a policy check and matches it against test run.
A decorator which allows for positive and negative RBAC testing. Given
@@ -66,10 +67,10 @@
caller_ref = None
if args and isinstance(args[0], test.BaseTestCase):
caller_ref = args[0]
- tenant_id = caller_ref.auth_provider.credentials.tenant_id
+ project_id = caller_ref.auth_provider.credentials.project_id
user_id = caller_ref.auth_provider.credentials.user_id
except AttributeError as e:
- msg = ("{0}: tenant_id/user_id not found in "
+ msg = ("{0}: project_id/user_id not found in "
"cls.auth_provider.credentials".format(e))
LOG.error(msg)
raise rbac_exceptions.RbacResourceSetupFailed(msg)
@@ -80,10 +81,11 @@
"check for policy action {0}.".format(rule))
allowed = CONF.rbac.rbac_test_role == 'admin'
else:
- authority = rbac_auth.RbacAuthority(tenant_id, user_id,
- service)
- allowed = authority.get_permission(rule,
- CONF.rbac.rbac_test_role)
+ authority = rbac_auth.RbacAuthority(
+ project_id, user_id, service,
+ _format_extra_target_data(caller_ref, extra_target_data))
+ allowed = authority.get_permission(
+ rule, CONF.rbac.rbac_test_role)
expected_exception, irregular_msg = _get_exception_type(
expected_error_code)
@@ -146,3 +148,34 @@
raise rbac_exceptions.RbacInvalidErrorCode()
return expected_exception, irregular_msg
+
+
+def _format_extra_target_data(test_obj, extra_target_data):
+ """Formats the "extra_target_data" dictionary with correct test data.
+
+ Before being formatted, "extra_target_data" is a dictionary that maps a
+ policy string like "trust.trustor_user_id" to a nested list of BaseTestCase
+ attributes. For example, the attribute list in:
+
+ "trust.trustor_user_id": "os.auth_provider.credentials.user_id"
+
+ is parsed by iteratively calling `getattr` until the value of "user_id"
+ is resolved. The resulting dictionary returns:
+
+ "trust.trustor_user_id": "the user_id of the `primary` credential"
+
+ :param test_obj: type BaseTestCase (tempest base test class)
+ :param extra_target_data: dictionary with unresolved string literals that
+ reference nested BaseTestCase attributes
+ :returns: dictionary with resolved BaseTestCase attributes
+ """
+ attr_value = test_obj
+ formatted_target_data = {}
+
+ for user_attribute, attr_string in extra_target_data.items():
+ attrs = attr_string.split('.')
+ for attr in attrs:
+ attr_value = getattr(attr_value, attr)
+ formatted_target_data[user_attribute] = attr_value
+
+ return formatted_target_data
diff --git a/patrole_tempest_plugin/tests/api/identity/v3/rbac_base.py b/patrole_tempest_plugin/tests/api/identity/v3/rbac_base.py
index a1cdf4c..31533a3 100644
--- a/patrole_tempest_plugin/tests/api/identity/v3/rbac_base.py
+++ b/patrole_tempest_plugin/tests/api/identity/v3/rbac_base.py
@@ -53,6 +53,7 @@
cls.role_assignments_client = cls.os.role_assignments_client
cls.roles_client = cls.os.roles_v3_client
cls.services_client = cls.os.identity_services_v3_client
+ cls.trusts_client = cls.os.trusts_client
cls.users_client = cls.os.users_v3_client
@classmethod
@@ -67,6 +68,7 @@
cls.regions = []
cls.roles = []
cls.services = []
+ cls.trusts = []
cls.users = []
@classmethod
@@ -111,6 +113,10 @@
test_utils.call_and_ignore_notfound_exc(
cls.services_client.delete_service, service['id'])
+ for trust in cls.trusts:
+ test_utils.call_and_ignore_notfound_exc(
+ cls.trusts_client.delete_trust, trust['id'])
+
for user in cls.users:
test_utils.call_and_ignore_notfound_exc(
cls.users_client.delete_user, user['id'])
@@ -229,6 +235,16 @@
return service
@classmethod
+ def setup_test_trust(cls, trustee_user_id, trustor_user_id, **kwargs):
+ """Setup a test trust."""
+ trust = cls.trusts_client.create_trust(
+ trustee_user_id=trustee_user_id, trustor_user_id=trustor_user_id,
+ impersonation=False, **kwargs)['trust']
+ cls.trusts.append(trust)
+
+ return trust
+
+ @classmethod
def setup_test_user(cls, password=None, **kwargs):
"""Set up a test user."""
username = data_utils.rand_name('test_user')
diff --git a/patrole_tempest_plugin/tests/api/identity/v3/test_trusts_rbac.py b/patrole_tempest_plugin/tests/api/identity/v3/test_trusts_rbac.py
new file mode 100644
index 0000000..622b330
--- /dev/null
+++ b/patrole_tempest_plugin/tests/api/identity/v3/test_trusts_rbac.py
@@ -0,0 +1,130 @@
+# Copyright 2017 AT&T Corporation.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from tempest import config
+from tempest.lib import decorators
+from tempest.lib import exceptions as lib_exc
+from tempest import test
+
+from patrole_tempest_plugin import rbac_rule_validation
+from patrole_tempest_plugin.tests.api.identity.v3 import rbac_base
+
+CONF = config.CONF
+
+
+class IdentityTrustV3RbacTest(rbac_base.BaseIdentityV3RbacTest):
+
+ credentials = ['primary', 'admin', 'alt']
+
+ @classmethod
+ def skip_checks(cls):
+ super(IdentityTrustV3RbacTest, cls).skip_checks()
+ if not CONF.identity_feature_enabled.trust:
+ raise cls.skipException(
+ "%s skipped as trust feature isn't enabled" % cls.__name__)
+
+ @classmethod
+ def resource_setup(cls):
+ super(IdentityTrustV3RbacTest, cls).resource_setup()
+ # Use the primary user's credentials for the "trustor_user_id", since
+ # user_id:%(trust.trustor_user_id)s will thereby evaluate to
+ # "primary user's user_id:primary user's user_id" which evaluates to
+ # true.
+ cls.trustor_user_id = cls.auth_provider.credentials.user_id
+ cls.trustor_project_id = cls.auth_provider.credentials.project_id
+ cls.trustee_user_id = cls.setup_test_user()['id']
+
+ # The "unauthorized_user_id" does not have permissions to create a
+ # trust because the user_id in "user_id:%(trust.trustor_user_id)s" (the
+ # policy rule for creating a trust) corresponds to the primary user_id
+ # not the alt user_id.
+ cls.unauthorized_user_id = cls.os_alt.auth_provider.credentials.user_id
+
+ # A role is guaranteed to exist (namely the admin role), because
+ # "trustor_user_id" and "trustor_project_id" are the primary tempest
+ # user and project, respectively.
+ cls.delegated_role_id = cls.roles_client.list_user_roles_on_project(
+ cls.trustor_project_id, cls.trustor_user_id)['roles'][0]['id']
+
+ cls.trust = cls.setup_test_trust(trustor_user_id=cls.trustor_user_id,
+ trustee_user_id=cls.trustee_user_id,
+ project_id=cls.trustor_project_id,
+ roles=[{'id': cls.delegated_role_id}])
+
+ @decorators.idempotent_id('7ab595a7-9b71-45fe-91d8-2793b0292f72')
+ @rbac_rule_validation.action(
+ service="keystone",
+ rule="identity:create_trust",
+ extra_target_data={
+ "trust.trustor_user_id": "os.auth_provider.credentials.user_id"
+ })
+ def test_create_trust(self):
+ self.rbac_utils.switch_role(self, toggle_rbac_role=True)
+ self.setup_test_trust(trustor_user_id=self.trustor_user_id,
+ trustee_user_id=self.trustee_user_id)
+
+ @decorators.idempotent_id('bd72d22a-6e11-4840-bd93-17b382e7f0e0')
+ @test.attr(type=['negative'])
+ @rbac_rule_validation.action(
+ service="keystone",
+ rule="identity:create_trust",
+ extra_target_data={
+ "trust.trustor_user_id": "os_alt.auth_provider.credentials.user_id"
+ })
+ def test_create_trust_negative(self):
+ # Explicit negative test for identity:create_trust policy action.
+ # Assert expected exception is Forbidden and then reraise it.
+ self.rbac_utils.switch_role(self, toggle_rbac_role=True)
+ e = self.assertRaises(lib_exc.Forbidden, self.setup_test_trust,
+ trustor_user_id=self.unauthorized_user_id,
+ trustee_user_id=self.trustee_user_id)
+ raise e
+
+ @decorators.idempotent_id('d9a6fd06-08f6-462c-a86c-ce009adf1230')
+ @rbac_rule_validation.action(
+ service="keystone",
+ rule="identity:delete_trust")
+ def test_delete_trust(self):
+ trust = self.setup_test_trust(trustor_user_id=self.trustor_user_id,
+ trustee_user_id=self.trustee_user_id)
+
+ self.rbac_utils.switch_role(self, toggle_rbac_role=True)
+ self.trusts_client.delete_trust(trust['id'])
+
+ @decorators.idempotent_id('f2e32896-bf66-4f4e-89cf-e7fba0ef1f38')
+ @rbac_rule_validation.action(
+ service="keystone",
+ rule="identity:list_trusts")
+ def test_list_trusts(self):
+ self.rbac_utils.switch_role(self, toggle_rbac_role=True)
+ self.trusts_client.list_trusts(
+ trustor_user_id=self.trustor_user_id)['trusts']
+
+ @decorators.idempotent_id('3c9ff92f-a73e-4f9b-8865-e017f38c70f5')
+ @rbac_rule_validation.action(
+ service="keystone",
+ rule="identity:list_roles_for_trust")
+ def test_list_roles_for_trust(self):
+ self.rbac_utils.switch_role(self, toggle_rbac_role=True)
+ self.trusts_client.list_trust_roles(self.trust['id'])['roles']
+
+ @decorators.idempotent_id('3bb4f97b-cecd-4c7d-ad10-b88ee6c5d573')
+ @rbac_rule_validation.action(
+ service="keystone",
+ rule="identity:get_role_for_trust")
+ def test_show_trust_role(self):
+ self.rbac_utils.switch_role(self, toggle_rbac_role=True)
+ self.trusts_client.show_trust_role(
+ self.trust['id'], self.delegated_role_id)['role']
diff --git a/patrole_tempest_plugin/tests/unit/test_rbac_rule_validation.py b/patrole_tempest_plugin/tests/unit/test_rbac_rule_validation.py
index 78d8e66..174945e 100644
--- a/patrole_tempest_plugin/tests/unit/test_rbac_rule_validation.py
+++ b/patrole_tempest_plugin/tests/unit/test_rbac_rule_validation.py
@@ -33,8 +33,8 @@
self.mock_args = mock.Mock(spec=test.BaseTestCase)
self.mock_args.auth_provider = mock.Mock()
self.mock_args.rbac_utils = mock.Mock()
- self.mock_args.auth_provider.credentials.tenant_id = \
- mock.sentinel.tenant_id
+ self.mock_args.auth_provider.credentials.project_id = \
+ mock.sentinel.project_id
self.mock_args.auth_provider.credentials.user_id = \
mock.sentinel.user_id
@@ -292,8 +292,8 @@
str(e))
mock_rbac_policy_parser.RbacPolicyParser.assert_called_once_with(
- mock.sentinel.tenant_id, mock.sentinel.user_id,
- mock.sentinel.service)
+ mock.sentinel.project_id, mock.sentinel.user_id,
+ mock.sentinel.service, extra_target_data={})
@mock.patch.object(rbac_auth, 'RbacAuthority', autospec=True)
def test_get_exception_type_404(self, mock_auth):