Merge "Enhance rbac policy parser to correctly interpret user_id policy actions."
diff --git a/patrole_tempest_plugin/rbac_auth.py b/patrole_tempest_plugin/rbac_auth.py
index 40a46a7..e4e35b1 100644
--- a/patrole_tempest_plugin/rbac_auth.py
+++ b/patrole_tempest_plugin/rbac_auth.py
@@ -21,9 +21,9 @@
class RbacAuthority(object):
- def __init__(self, tenant_id, service=None):
- self.converter = rbac_policy_parser.RbacPolicyParser(tenant_id,
- service)
+ def __init__(self, tenant_id, user_id, service=None):
+ self.converter = rbac_policy_parser.RbacPolicyParser(
+ tenant_id, user_id, service)
def get_permission(self, rule_name, role):
try:
diff --git a/patrole_tempest_plugin/rbac_policy_parser.py b/patrole_tempest_plugin/rbac_policy_parser.py
index 860a53d..045a9f8 100644
--- a/patrole_tempest_plugin/rbac_policy_parser.py
+++ b/patrole_tempest_plugin/rbac_policy_parser.py
@@ -35,7 +35,7 @@
each role, whether a given rule is allowed using oslo policy.
"""
- def __init__(self, tenant_id, service, path=None):
+ def __init__(self, tenant_id, user_id, service=None, path=None):
"""Initialization of Rbac Policy Parser.
Parses a policy file to create a dictionary, mapping policy actions to
@@ -53,12 +53,13 @@
prioritized.
:param tenant_id: type uuid
+ :param user_id: type uuid
:param service: type string
:param path: type string
"""
service = service.lower().strip()
if path is None:
- self.path = '/etc/{0}/policy.json'.format(service)
+ self.path = os.path.join('/etc', service, 'policy.json')
else:
self.path = path
@@ -83,6 +84,7 @@
self.rules = policy.Rules.load(policy_data, "default")
self.tenant_id = tenant_id
+ self.user_id = user_id
def allowed(self, rule_name, role):
is_admin_context = self._is_admin_context(role)
@@ -115,7 +117,8 @@
}
],
"project_id": self.tenant_id,
- "tenant_id": self.tenant_id
+ "tenant_id": self.tenant_id,
+ "user_id": self.user_id
}
}
return access_token
@@ -146,7 +149,8 @@
target = {"project_id": access_data['project_id'],
"tenant_id": access_data['project_id'],
- "network:tenant_id": access_data['project_id']}
+ "network:tenant_id": access_data['project_id'],
+ "user_id": access_data['user_id']}
result = self._try_rule(apply_rule, target, access_data, o)
return result
diff --git a/patrole_tempest_plugin/rbac_rule_validation.py b/patrole_tempest_plugin/rbac_rule_validation.py
index 4b85187..36784b7 100644
--- a/patrole_tempest_plugin/rbac_rule_validation.py
+++ b/patrole_tempest_plugin/rbac_rule_validation.py
@@ -30,12 +30,13 @@
def wrapper(*args, **kwargs):
try:
tenant_id = args[0].auth_provider.credentials.tenant_id
+ user_id = args[0].auth_provider.credentials.user_id
except (IndexError, AttributeError) as e:
- msg = ("{0}: tenant_id not found in "
+ msg = ("{0}: tenant_id/user_id not found in "
"cls.auth_provider.credentials".format(e))
LOG.error(msg)
raise rbac_exceptions.RbacResourceSetupFailed(msg)
- authority = rbac_auth.RbacAuthority(tenant_id, service)
+ authority = rbac_auth.RbacAuthority(tenant_id, user_id, service)
allowed = authority.get_permission(rule, CONF.rbac.rbac_test_role)
try:
diff --git a/patrole_tempest_plugin/tests/unit/resources/tenant_rbac_policy.json b/patrole_tempest_plugin/tests/unit/resources/tenant_rbac_policy.json
index 2647e4d..ea65c88 100644
--- a/patrole_tempest_plugin/tests/unit/resources/tenant_rbac_policy.json
+++ b/patrole_tempest_plugin/tests/unit/resources/tenant_rbac_policy.json
@@ -2,5 +2,7 @@
"rule1": "tenant_id:%(network:tenant_id)s",
"rule2": "tenant_id:%(tenant_id)s",
"rule3": "project_id:%(project_id)s",
- "admin_rule": "role:admin and tenant_id:%(tenant_id)s"
+ "rule4": "user_id:%(user_id)s",
+ "admin_tenant_rule": "role:admin and tenant_id:%(tenant_id)s",
+ "admin_user_rule": "role:admin and user_id:%(user_id)s"
}
\ No newline at end of file
diff --git a/patrole_tempest_plugin/tests/unit/test_rbac_policy_parser.py b/patrole_tempest_plugin/tests/unit/test_rbac_policy_parser.py
index cc0fc4f..35aaa82 100644
--- a/patrole_tempest_plugin/tests/unit/test_rbac_policy_parser.py
+++ b/patrole_tempest_plugin/tests/unit/test_rbac_policy_parser.py
@@ -48,8 +48,10 @@
default_roles = ['zero', 'one', 'two', 'three', 'four',
'five', 'six', 'seven', 'eight', 'nine']
+ test_tenant_id = mock.sentinel.tenant_id
+ test_user_id = mock.sentinel.user_id
converter = rbac_policy_parser.RbacPolicyParser(
- None, "test", self.custom_policy_file)
+ test_tenant_id, test_user_id, "test", self.custom_policy_file)
expected = {
'policy_action_1': ['two', 'four', 'six', 'eight'],
@@ -76,8 +78,10 @@
self.assertFalse(converter.allowed(rule, role))
def test_admin_policy_file_with_admin_role(self):
+ test_tenant_id = mock.sentinel.tenant_id
+ test_user_id = mock.sentinel.user_id
converter = rbac_policy_parser.RbacPolicyParser(
- None, "test", self.admin_policy_file)
+ test_tenant_id, test_user_id, "test", self.admin_policy_file)
role = 'admin'
allowed_rules = [
@@ -94,8 +98,10 @@
self.assertFalse(allowed)
def test_admin_policy_file_with_member_role(self):
+ test_tenant_id = mock.sentinel.tenant_id
+ test_user_id = mock.sentinel.user_id
converter = rbac_policy_parser.RbacPolicyParser(
- None, "test", self.admin_policy_file)
+ test_tenant_id, test_user_id, "test", self.admin_policy_file)
role = 'Member'
allowed_rules = [
@@ -113,8 +119,10 @@
self.assertFalse(allowed)
def test_admin_policy_file_with_context_is_admin(self):
+ test_tenant_id = mock.sentinel.tenant_id
+ test_user_id = mock.sentinel.user_id
converter = rbac_policy_parser.RbacPolicyParser(
- None, "test", self.alt_admin_policy_file)
+ test_tenant_id, test_user_id, "test", self.alt_admin_policy_file)
role = 'fake_admin'
allowed_rules = ['non_admin_rule']
@@ -140,43 +148,58 @@
allowed = converter.allowed(rule, role)
self.assertFalse(allowed)
- def test_tenant_policy(self):
- """Test whether rules with format tenant_id:%(tenant_id)s work.
+ def test_tenant_user_policy(self):
+ """Test whether rules with format tenant_id/user_id formatting work.
Test whether Neutron rules that contain project_id, tenant_id, and
- network:tenant_id pass.
+ network:tenant_id pass. And test whether Nova rules that contain
+ user_id pass.
"""
test_tenant_id = mock.sentinel.tenant_id
+ test_user_id = mock.sentinel.user_id
converter = rbac_policy_parser.RbacPolicyParser(
- test_tenant_id, "test", self.tenant_policy_file)
+ test_tenant_id, test_user_id, "test", self.tenant_policy_file)
# Check whether Member role can perform expected actions.
- allowed_rules = ['rule1', 'rule2', 'rule3']
+ allowed_rules = ['rule1', 'rule2', 'rule3', 'rule4']
for rule in allowed_rules:
allowed = converter.allowed(rule, 'Member')
self.assertTrue(allowed)
- self.assertFalse(converter.allowed('admin_rule', 'Member'))
+
+ disallowed_rules = ['admin_tenant_rule', 'admin_user_rule']
+ for disallowed_rule in disallowed_rules:
+ self.assertFalse(converter.allowed(disallowed_rule, 'Member'))
# Check whether admin role can perform expected actions.
- allowed_rules.append('admin_rule')
+ allowed_rules.extend(disallowed_rules)
for rule in allowed_rules:
allowed = converter.allowed(rule, 'admin')
self.assertTrue(allowed)
# Check whether _try_rule is called with the correct target dictionary.
- with mock.patch.object(converter, '_try_rule', autospec=True) \
+ with mock.patch.object(
+ converter, '_try_rule', return_value=True, autospec=True) \
as mock_try_rule:
- mock_try_rule.return_value = True
expected_target = {
- "project_id": test_tenant_id,
- "tenant_id": test_tenant_id,
- "network:tenant_id": test_tenant_id
+ "project_id": mock.sentinel.tenant_id,
+ "tenant_id": mock.sentinel.tenant_id,
+ "network:tenant_id": mock.sentinel.tenant_id,
+ "user_id": mock.sentinel.user_id
+ }
+
+ expected_access_data = {
+ "roles": ['Member'],
+ "is_admin": False,
+ "is_admin_project": True,
+ "user_id": mock.sentinel.user_id,
+ "tenant_id": mock.sentinel.tenant_id,
+ "project_id": mock.sentinel.tenant_id
}
for rule in allowed_rules:
allowed = converter.allowed(rule, 'Member')
self.assertTrue(allowed)
mock_try_rule.assert_called_once_with(
- rule, expected_target, mock.ANY, mock.ANY)
+ rule, expected_target, expected_access_data, mock.ANY)
mock_try_rule.reset_mock()