Identity trust rbac tests

Add identity trust rbac tests corresponding to the policy actions
in [0].

Most of the policy actions in [0] have a rule of '' (empty string),
meaning any role can perform the action. However, the
"identity:create_trust" policy action has a rule of
base.RULE_TRUST_OWNER which translates to:

  user_id:%(trust.trustor_user_id)s [1].

This is a rather unique rule, one that is not dependent on the
current user's user_id, project_id or even role. Rather, this
rule translates to: "Does the current user's user_id match
the user_id of the trustor creating a trust with a trustee?"

As should be expected, "trust.trustor_user_id" can only be
dynamically calculated at runtime, rather than immediately
retrieved from a tempest credential variable, as is the case
with user_id and project_id.

Hence, this patch not only 1) creates trust rbac tests but 2)
enhances the framework, particularly the rbac_rule_validation
decorator, as well as the rbac_policy_parser framework, to
handle additional target data that must be passed to
oslo-policy-checker in order for proper authorization
determination.

The "target" parameter in oslo.policy is a dictionary that
contains "As much information about the object being
operated on as possible" [2]. Accordingly, the
rbac_rule_validation decorator has been enhanced with a new
param called `extra_target_data` that is a dictionary
containing key-value pairs of dynamically calculated
data needed by oslo.policy to correctly determine whether
the "target" has authorization to perform a policy action.

For example,

    extra_target_data={
        "trust.trustor_user_id": "os.auth_provider.credentials.user_id"
    })

means that trust.trustor_user_id equals the primary credential's
user_id. So, if a trustor's user_id equals the primary credential's
user_id, then the policy parser will return True for `is_allowed`.
However, if a trustor's user_id doesn'tequal to the primary credential's
user_id, but rather the alt credential's user_id, say, then `is_allowed`
returns False.

Thus, the only way to do negative testing with test_trusts_rbac is
to explicitly create a negative test, as described in the above
paragraph, which is what this patch does. Normally, negative testing
is baked in, dependent on the `rbac_test_role`, but with more
complicated policy enforcement as described above, this is not
possible.

While it is possible to create a new CONF value called trustor_user_id,
it would require using pre-provisioned credentials, which is something
Patrole doesn't explicitly use.

[0] https://github.com/openstack/keystone/blob/master/keystone/common/policies/trust.py
[1] https://github.com/openstack/keystone/blob/master/keystone/common/policies/base.py
[2] https://docs.openstack.org/developer/oslo.policy/api/oslo_policy.html

Depends-On: Ib82e8b8a0d6c8587fb0b1ce415e751c3ebc3c2f9
Change-Id: I5c00fdb345556066343bdaeb5f008d639a94bc4b
diff --git a/patrole_tempest_plugin/rbac_auth.py b/patrole_tempest_plugin/rbac_auth.py
index 7281969..ae497f3 100644
--- a/patrole_tempest_plugin/rbac_auth.py
+++ b/patrole_tempest_plugin/rbac_auth.py
@@ -27,9 +27,9 @@
 
 
 class RbacAuthority(object):
-    def __init__(self, tenant_id, user_id, service=None):
+    def __init__(self, project_id, user_id, service, extra_target_data):
         self.policy_parser = rbac_policy_parser.RbacPolicyParser(
-            tenant_id, user_id, service)
+            project_id, user_id, service, extra_target_data=extra_target_data)
 
     def get_permission(self, rule_name, role):
         try:
diff --git a/patrole_tempest_plugin/rbac_policy_parser.py b/patrole_tempest_plugin/rbac_policy_parser.py
index 38bed7c..94aa2c7 100644
--- a/patrole_tempest_plugin/rbac_policy_parser.py
+++ b/patrole_tempest_plugin/rbac_policy_parser.py
@@ -40,7 +40,8 @@
     each role, whether a given rule is allowed using oslo policy.
     """
 
-    def __init__(self, tenant_id, user_id, service=None, path=None):
+    def __init__(self, project_id, user_id, service=None, path=None,
+                 extra_target_data={}):
         """Initialization of Rbac Policy Parser.
 
         Parses a policy file to create a dictionary, mapping policy actions to
@@ -57,7 +58,7 @@
         the custom policy file over the default policy implementation is
         prioritized.
 
-        :param tenant_id: type uuid
+        :param project_id: type uuid
         :param user_id: type uuid
         :param service: type string
         :param path: type string
@@ -78,8 +79,9 @@
         self.path = path or os.path.join('/etc', service, 'policy.json')
         self.rules = policy.Rules.load(self._get_policy_data(service),
                                        'default')
-        self.tenant_id = tenant_id
+        self.project_id = project_id
         self.user_id = user_id
+        self.extra_target_data = extra_target_data
 
     def allowed(self, rule_name, role):
         is_admin_context = self._is_admin_context(role)
@@ -165,8 +167,8 @@
                         "name": role
                     }
                 ],
-                "project_id": self.tenant_id,
-                "tenant_id": self.tenant_id,
+                "project_id": self.project_id,
+                "tenant_id": self.project_id,
                 "user_id": self.user_id
             }
         }
@@ -200,6 +202,8 @@
                   "tenant_id": access_data['project_id'],
                   "network:tenant_id": access_data['project_id'],
                   "user_id": access_data['user_id']}
+        if self.extra_target_data:
+            target.update(self.extra_target_data)
 
         result = self._try_rule(apply_rule, target, access_data, o)
         return result
diff --git a/patrole_tempest_plugin/rbac_rule_validation.py b/patrole_tempest_plugin/rbac_rule_validation.py
index ec63119..6a5ed5e 100644
--- a/patrole_tempest_plugin/rbac_rule_validation.py
+++ b/patrole_tempest_plugin/rbac_rule_validation.py
@@ -29,7 +29,8 @@
 LOG = logging.getLogger(__name__)
 
 
-def action(service, rule, admin_only=False, expected_error_code=403):
+def action(service, rule, admin_only=False, expected_error_code=403,
+           extra_target_data={}):
     """A decorator which does a policy check and matches it against test run.
 
     A decorator which allows for positive and negative RBAC testing. Given
@@ -66,10 +67,10 @@
                 caller_ref = None
                 if args and isinstance(args[0], test.BaseTestCase):
                     caller_ref = args[0]
-                tenant_id = caller_ref.auth_provider.credentials.tenant_id
+                project_id = caller_ref.auth_provider.credentials.project_id
                 user_id = caller_ref.auth_provider.credentials.user_id
             except AttributeError as e:
-                msg = ("{0}: tenant_id/user_id not found in "
+                msg = ("{0}: project_id/user_id not found in "
                        "cls.auth_provider.credentials".format(e))
                 LOG.error(msg)
                 raise rbac_exceptions.RbacResourceSetupFailed(msg)
@@ -80,10 +81,11 @@
                          "check for policy action {0}.".format(rule))
                 allowed = CONF.rbac.rbac_test_role == 'admin'
             else:
-                authority = rbac_auth.RbacAuthority(tenant_id, user_id,
-                                                    service)
-                allowed = authority.get_permission(rule,
-                                                   CONF.rbac.rbac_test_role)
+                authority = rbac_auth.RbacAuthority(
+                    project_id, user_id, service,
+                    _format_extra_target_data(caller_ref, extra_target_data))
+                allowed = authority.get_permission(
+                    rule, CONF.rbac.rbac_test_role)
 
             expected_exception, irregular_msg = _get_exception_type(
                 expected_error_code)
@@ -146,3 +148,34 @@
         raise rbac_exceptions.RbacInvalidErrorCode()
 
     return expected_exception, irregular_msg
+
+
+def _format_extra_target_data(test_obj, extra_target_data):
+    """Formats the "extra_target_data" dictionary with correct test data.
+
+    Before being formatted, "extra_target_data" is a dictionary that maps a
+    policy string like "trust.trustor_user_id" to a nested list of BaseTestCase
+    attributes. For example, the attribute list in:
+
+        "trust.trustor_user_id": "os.auth_provider.credentials.user_id"
+
+    is parsed by iteratively calling `getattr` until the value of "user_id"
+    is resolved. The resulting dictionary returns:
+
+        "trust.trustor_user_id": "the user_id of the `primary` credential"
+
+    :param test_obj: type BaseTestCase (tempest base test class)
+    :param extra_target_data: dictionary with unresolved string literals that
+                              reference nested BaseTestCase attributes
+    :returns: dictionary with resolved BaseTestCase attributes
+    """
+    attr_value = test_obj
+    formatted_target_data = {}
+
+    for user_attribute, attr_string in extra_target_data.items():
+        attrs = attr_string.split('.')
+        for attr in attrs:
+            attr_value = getattr(attr_value, attr)
+        formatted_target_data[user_attribute] = attr_value
+
+    return formatted_target_data
diff --git a/patrole_tempest_plugin/tests/api/identity/v3/rbac_base.py b/patrole_tempest_plugin/tests/api/identity/v3/rbac_base.py
index a1cdf4c..31533a3 100644
--- a/patrole_tempest_plugin/tests/api/identity/v3/rbac_base.py
+++ b/patrole_tempest_plugin/tests/api/identity/v3/rbac_base.py
@@ -53,6 +53,7 @@
         cls.role_assignments_client = cls.os.role_assignments_client
         cls.roles_client = cls.os.roles_v3_client
         cls.services_client = cls.os.identity_services_v3_client
+        cls.trusts_client = cls.os.trusts_client
         cls.users_client = cls.os.users_v3_client
 
     @classmethod
@@ -67,6 +68,7 @@
         cls.regions = []
         cls.roles = []
         cls.services = []
+        cls.trusts = []
         cls.users = []
 
     @classmethod
@@ -111,6 +113,10 @@
             test_utils.call_and_ignore_notfound_exc(
                 cls.services_client.delete_service, service['id'])
 
+        for trust in cls.trusts:
+            test_utils.call_and_ignore_notfound_exc(
+                cls.trusts_client.delete_trust, trust['id'])
+
         for user in cls.users:
             test_utils.call_and_ignore_notfound_exc(
                 cls.users_client.delete_user, user['id'])
@@ -229,6 +235,16 @@
         return service
 
     @classmethod
+    def setup_test_trust(cls, trustee_user_id, trustor_user_id, **kwargs):
+        """Setup a test trust."""
+        trust = cls.trusts_client.create_trust(
+            trustee_user_id=trustee_user_id, trustor_user_id=trustor_user_id,
+            impersonation=False, **kwargs)['trust']
+        cls.trusts.append(trust)
+
+        return trust
+
+    @classmethod
     def setup_test_user(cls, password=None, **kwargs):
         """Set up a test user."""
         username = data_utils.rand_name('test_user')
diff --git a/patrole_tempest_plugin/tests/api/identity/v3/test_trusts_rbac.py b/patrole_tempest_plugin/tests/api/identity/v3/test_trusts_rbac.py
new file mode 100644
index 0000000..622b330
--- /dev/null
+++ b/patrole_tempest_plugin/tests/api/identity/v3/test_trusts_rbac.py
@@ -0,0 +1,130 @@
+# Copyright 2017 AT&T Corporation.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+from tempest import config
+from tempest.lib import decorators
+from tempest.lib import exceptions as lib_exc
+from tempest import test
+
+from patrole_tempest_plugin import rbac_rule_validation
+from patrole_tempest_plugin.tests.api.identity.v3 import rbac_base
+
+CONF = config.CONF
+
+
+class IdentityTrustV3RbacTest(rbac_base.BaseIdentityV3RbacTest):
+
+    credentials = ['primary', 'admin', 'alt']
+
+    @classmethod
+    def skip_checks(cls):
+        super(IdentityTrustV3RbacTest, cls).skip_checks()
+        if not CONF.identity_feature_enabled.trust:
+            raise cls.skipException(
+                "%s skipped as trust feature isn't enabled" % cls.__name__)
+
+    @classmethod
+    def resource_setup(cls):
+        super(IdentityTrustV3RbacTest, cls).resource_setup()
+        # Use the primary user's credentials for the "trustor_user_id", since
+        # user_id:%(trust.trustor_user_id)s will thereby evaluate to
+        # "primary user's user_id:primary user's user_id" which evaluates to
+        # true.
+        cls.trustor_user_id = cls.auth_provider.credentials.user_id
+        cls.trustor_project_id = cls.auth_provider.credentials.project_id
+        cls.trustee_user_id = cls.setup_test_user()['id']
+
+        # The "unauthorized_user_id" does not have permissions to create a
+        # trust because the user_id in "user_id:%(trust.trustor_user_id)s" (the
+        # policy rule for creating a trust) corresponds to the primary user_id
+        # not the alt user_id.
+        cls.unauthorized_user_id = cls.os_alt.auth_provider.credentials.user_id
+
+        # A role is guaranteed to exist (namely the admin role), because
+        # "trustor_user_id" and "trustor_project_id" are the primary tempest
+        # user and project, respectively.
+        cls.delegated_role_id = cls.roles_client.list_user_roles_on_project(
+            cls.trustor_project_id, cls.trustor_user_id)['roles'][0]['id']
+
+        cls.trust = cls.setup_test_trust(trustor_user_id=cls.trustor_user_id,
+                                         trustee_user_id=cls.trustee_user_id,
+                                         project_id=cls.trustor_project_id,
+                                         roles=[{'id': cls.delegated_role_id}])
+
+    @decorators.idempotent_id('7ab595a7-9b71-45fe-91d8-2793b0292f72')
+    @rbac_rule_validation.action(
+        service="keystone",
+        rule="identity:create_trust",
+        extra_target_data={
+            "trust.trustor_user_id": "os.auth_provider.credentials.user_id"
+        })
+    def test_create_trust(self):
+        self.rbac_utils.switch_role(self, toggle_rbac_role=True)
+        self.setup_test_trust(trustor_user_id=self.trustor_user_id,
+                              trustee_user_id=self.trustee_user_id)
+
+    @decorators.idempotent_id('bd72d22a-6e11-4840-bd93-17b382e7f0e0')
+    @test.attr(type=['negative'])
+    @rbac_rule_validation.action(
+        service="keystone",
+        rule="identity:create_trust",
+        extra_target_data={
+            "trust.trustor_user_id": "os_alt.auth_provider.credentials.user_id"
+        })
+    def test_create_trust_negative(self):
+        # Explicit negative test for identity:create_trust policy action.
+        # Assert expected exception is Forbidden and then reraise it.
+        self.rbac_utils.switch_role(self, toggle_rbac_role=True)
+        e = self.assertRaises(lib_exc.Forbidden, self.setup_test_trust,
+                              trustor_user_id=self.unauthorized_user_id,
+                              trustee_user_id=self.trustee_user_id)
+        raise e
+
+    @decorators.idempotent_id('d9a6fd06-08f6-462c-a86c-ce009adf1230')
+    @rbac_rule_validation.action(
+        service="keystone",
+        rule="identity:delete_trust")
+    def test_delete_trust(self):
+        trust = self.setup_test_trust(trustor_user_id=self.trustor_user_id,
+                                      trustee_user_id=self.trustee_user_id)
+
+        self.rbac_utils.switch_role(self, toggle_rbac_role=True)
+        self.trusts_client.delete_trust(trust['id'])
+
+    @decorators.idempotent_id('f2e32896-bf66-4f4e-89cf-e7fba0ef1f38')
+    @rbac_rule_validation.action(
+        service="keystone",
+        rule="identity:list_trusts")
+    def test_list_trusts(self):
+        self.rbac_utils.switch_role(self, toggle_rbac_role=True)
+        self.trusts_client.list_trusts(
+            trustor_user_id=self.trustor_user_id)['trusts']
+
+    @decorators.idempotent_id('3c9ff92f-a73e-4f9b-8865-e017f38c70f5')
+    @rbac_rule_validation.action(
+        service="keystone",
+        rule="identity:list_roles_for_trust")
+    def test_list_roles_for_trust(self):
+        self.rbac_utils.switch_role(self, toggle_rbac_role=True)
+        self.trusts_client.list_trust_roles(self.trust['id'])['roles']
+
+    @decorators.idempotent_id('3bb4f97b-cecd-4c7d-ad10-b88ee6c5d573')
+    @rbac_rule_validation.action(
+        service="keystone",
+        rule="identity:get_role_for_trust")
+    def test_show_trust_role(self):
+        self.rbac_utils.switch_role(self, toggle_rbac_role=True)
+        self.trusts_client.show_trust_role(
+            self.trust['id'], self.delegated_role_id)['role']
diff --git a/patrole_tempest_plugin/tests/unit/test_rbac_rule_validation.py b/patrole_tempest_plugin/tests/unit/test_rbac_rule_validation.py
index 78d8e66..174945e 100644
--- a/patrole_tempest_plugin/tests/unit/test_rbac_rule_validation.py
+++ b/patrole_tempest_plugin/tests/unit/test_rbac_rule_validation.py
@@ -33,8 +33,8 @@
         self.mock_args = mock.Mock(spec=test.BaseTestCase)
         self.mock_args.auth_provider = mock.Mock()
         self.mock_args.rbac_utils = mock.Mock()
-        self.mock_args.auth_provider.credentials.tenant_id = \
-            mock.sentinel.tenant_id
+        self.mock_args.auth_provider.credentials.project_id = \
+            mock.sentinel.project_id
         self.mock_args.auth_provider.credentials.user_id = \
             mock.sentinel.user_id
 
@@ -292,8 +292,8 @@
                          str(e))
 
         mock_rbac_policy_parser.RbacPolicyParser.assert_called_once_with(
-            mock.sentinel.tenant_id, mock.sentinel.user_id,
-            mock.sentinel.service)
+            mock.sentinel.project_id, mock.sentinel.user_id,
+            mock.sentinel.service, extra_target_data={})
 
     @mock.patch.object(rbac_auth, 'RbacAuthority', autospec=True)
     def test_get_exception_type_404(self, mock_auth):