Merge "Unit tests for dynamic policy file discovery"
diff --git a/patrole_tempest_plugin/tests/unit/test_rbac_policy_parser.py b/patrole_tempest_plugin/tests/unit/test_rbac_policy_parser.py
index e3a4429..95e6bff 100644
--- a/patrole_tempest_plugin/tests/unit/test_rbac_policy_parser.py
+++ b/patrole_tempest_plugin/tests/unit/test_rbac_policy_parser.py
@@ -40,8 +40,9 @@
def setUp(self):
super(RbacPolicyTest, self).setUp()
- mock_admin_mgr = self.patchobject(rbac_policy_parser, 'credentials')
- mock_admin_mgr.AdminManager().identity_services_v3_client.\
+ self.mock_admin_mgr = self.patchobject(
+ rbac_policy_parser, 'credentials')
+ self.mock_admin_mgr.AdminManager().identity_services_v3_client.\
list_services.return_value = self.services
current_directory = os.path.dirname(os.path.realpath(__file__))
@@ -57,14 +58,19 @@
self.tenant_policy_file = os.path.join(current_directory,
'resources',
'tenant_rbac_policy.json')
+ self.conf_policy_path = os.path.join(
+ current_directory, 'resources', '%s.json')
CONF.set_override(
- 'custom_policy_files',
- [os.path.join(current_directory, 'resources', '%s.json')],
- group='rbac')
+ 'custom_policy_files', [self.conf_policy_path], group='rbac')
self.addCleanup(CONF.clear_override, 'custom_policy_files',
group='rbac')
+ # Guarantee a blank slate for each test.
+ for attr in ('available_services', 'policy_files'):
+ if attr in dir(rbac_policy_parser.RbacPolicyParser):
+ delattr(rbac_policy_parser.RbacPolicyParser, attr)
+
def _get_fake_policy_rule(self, name, rule):
fake_rule = mock.Mock(check=rule)
fake_rule.name = name
@@ -427,3 +433,60 @@
.format('tenant_rbac_policy', [CONF.rbac.custom_policy_files[0]
% 'tenant_rbac_policy']))
self.assertIn(expected_error, str(e))
+
+ def test_discover_policy_files(self):
+ policy_parser = rbac_policy_parser.RbacPolicyParser(
+ None, None, 'tenant_rbac_policy')
+
+ # "policy_files" must show up in dir() of both the class and the instance.
+ self.assertIn('policy_files',
+ dir(rbac_policy_parser.RbacPolicyParser))
+ self.assertIn('policy_files', dir(policy_parser))
+ self.assertIn('tenant_rbac_policy', policy_parser.policy_files)
+ self.assertEqual(self.conf_policy_path % 'tenant_rbac_policy',
+ policy_parser.policy_files['tenant_rbac_policy'])
+
+ @mock.patch.object(rbac_policy_parser, 'policy', autospec=True)
+ @mock.patch.object(rbac_policy_parser.RbacPolicyParser, '_get_policy_data',
+ autospec=True)
+ @mock.patch.object(rbac_policy_parser, 'os', autospec=True)
+ def test_discover_policy_files_with_many_invalid_one_valid(self, m_os, *a):
+ # isfile() answers in call order: only the 3rd lookup reports a file.
+ m_os.path.isfile.side_effect = [False, False, True, False]
+
+ # Ensure the outer for loop runs only once in `discover_policy_files`.
+ self.mock_admin_mgr.AdminManager().identity_services_v3_client.\
+ list_services.return_value = {
+ 'services': [{'name': 'test_service'}]}
+
+ # 'baz/%s' is the 3rd pattern, so 'baz/test_service' is expected.
+ CONF.set_override(
+ 'custom_policy_files', ['foo/%s', 'bar/%s', 'baz/%s'],
+ group='rbac')
+
+ policy_parser = rbac_policy_parser.RbacPolicyParser(
+ None, None, 'test_service')
+
+ # "policy_files" must show up in dir() of both the class and the instance.
+ self.assertIn('policy_files',
+ dir(rbac_policy_parser.RbacPolicyParser))
+ self.assertIn('policy_files', dir(policy_parser))
+ self.assertIn('test_service', policy_parser.policy_files)
+ self.assertEqual('baz/test_service',
+ policy_parser.policy_files['test_service'])
+
+ def test_discover_policy_files_with_no_valid_files(self):
+ expected_error = ("Policy file for test_service service neither found "
+ "in code nor at %s." %
+ [self.conf_policy_path % 'test_service'])
+
+ e = self.assertRaises(rbac_exceptions.RbacParsingException,
+ rbac_policy_parser.RbacPolicyParser,
+ None, None, 'test_service')
+ self.assertIn(expected_error, str(e))
+
+ # "policy_files" stays on the class but has no 'test_service' key.
+ self.assertIn('policy_files',
+ dir(rbac_policy_parser.RbacPolicyParser))
+ self.assertNotIn(
+ 'test_service', rbac_policy_parser.RbacPolicyParser.policy_files)