Merge "Test "NOTIFY" message is sent to Nameservers"
diff --git a/designate_tempest_plugin/services/dns/json/base.py b/designate_tempest_plugin/services/dns/json/base.py
index e1107ab..6ac9a85 100644
--- a/designate_tempest_plugin/services/dns/json/base.py
+++ b/designate_tempest_plugin/services/dns/json/base.py
@@ -17,8 +17,7 @@
from oslo_serialization import jsonutils as json
from tempest.lib.common import rest_client
from tempest.lib import exceptions as lib_exc
-from six.moves.urllib import parse as urllib
-import six
+from urllib import parse as urllib_parse
from designate_tempest_plugin.common import models
@@ -58,7 +57,7 @@
DELETE_STATUS_CODES = []
def serialize(self, data):
- if isinstance(data, six.string_types):
+ if isinstance(data, str):
return data
return json.dumps(data)
@@ -106,7 +105,7 @@
else:
uuid = '/%s' % uuid if uuid else ''
- params = '?%s' % urllib.urlencode(params) if params else ''
+ params = '?%s' % urllib_parse.urlencode(params) if params else ''
return uri_pattern.format(pref=self.uri_prefix,
res=resource_name,
@@ -199,6 +198,7 @@
:param data: A Python dict that represents an object of the
specified type (to be serialized) or a plain string which
is sent as-is.
+ :param headers (dict): The headers to use for the request.
:param params: A Python dict that represents the query paramaters to
include in the request URI.
:param headers (dict): The headers to use for the request.
diff --git a/designate_tempest_plugin/services/dns/query/query_client.py b/designate_tempest_plugin/services/dns/query/query_client.py
index da1d1b0..ce9c7c1 100644
--- a/designate_tempest_plugin/services/dns/query/query_client.py
+++ b/designate_tempest_plugin/services/dns/query/query_client.py
@@ -14,7 +14,6 @@
import dns
import dns.exception
import dns.query
-import six
from tempest import config
CONF = config.CONF
@@ -51,7 +50,7 @@
@classmethod
def _prepare_query(cls, zone_name, rdatatype):
# support plain strings: "SOA", "A"
- if isinstance(rdatatype, six.string_types):
+ if isinstance(rdatatype, str):
rdatatype = dns.rdatatype.from_text(rdatatype)
dns_message = dns.message.make_query(zone_name, rdatatype)
dns_message.set_opcode(dns.opcode.QUERY)
diff --git a/designate_tempest_plugin/services/dns/v2/json/recordset_client.py b/designate_tempest_plugin/services/dns/v2/json/recordset_client.py
index 20b6fcf..d07ca18 100644
--- a/designate_tempest_plugin/services/dns/v2/json/recordset_client.py
+++ b/designate_tempest_plugin/services/dns/v2/json/recordset_client.py
@@ -106,17 +106,20 @@
params=params, headers=headers)
@base.handle_errors
- def delete_recordset(self, zone_uuid, recordset_uuid, params=None):
+ def delete_recordset(self, zone_uuid, recordset_uuid, params=None,
+ headers=None):
"""Deletes a recordset related to the specified zone UUID.
:param zone_uuid: The unique identifier of the zone.
:param recordset_uuid: The unique identifier of the record in
uuid format.
:param params: A Python dict that represents the query paramaters to
include in the request URI.
+ :param headers (dict): The headers to use for the request.
:return: A tuple with the server response and the response body.
"""
resp, body = self._delete_request(
- 'zones/{0}/recordsets'.format(zone_uuid), recordset_uuid)
+ 'zones/{0}/recordsets'.format(zone_uuid), recordset_uuid,
+ params=params, headers=headers)
# Delete Recordset should Return a HTTP 202
self.expected_success(202, resp.status)
diff --git a/designate_tempest_plugin/services/dns/v2/json/transfer_accepts_client.py b/designate_tempest_plugin/services/dns/v2/json/transfer_accepts_client.py
index 753ffc2..9ee060d 100644
--- a/designate_tempest_plugin/services/dns/v2/json/transfer_accepts_client.py
+++ b/designate_tempest_plugin/services/dns/v2/json/transfer_accepts_client.py
@@ -66,5 +66,4 @@
:return: List of accepted zone transfers
"""
return self._list_request(
- 'zones/tasks/transfer_accepts', params=params,
- headers=headers)[1]['transfer_accepts']
+ 'zones/tasks/transfer_accepts', params=params, headers=headers)
diff --git a/designate_tempest_plugin/services/dns/v2/json/transfer_request_client.py b/designate_tempest_plugin/services/dns/v2/json/transfer_request_client.py
index 9523175..e2d35e2 100644
--- a/designate_tempest_plugin/services/dns/v2/json/transfer_request_client.py
+++ b/designate_tempest_plugin/services/dns/v2/json/transfer_request_client.py
@@ -103,7 +103,7 @@
@base.handle_errors
def update_transfer_request(self, uuid, transfer_request_data=None,
- params=None):
+ params=None, headers=None):
"""Update a zone transfer_requests.
:param uuid: Unique identifier of the zone transfer request in UUID
format.
@@ -111,13 +111,15 @@
data for zone transfer request
:param params: A Python dict that represents the query paramaters to
include in the request URI.
+ :param headers (dict): The headers to use for the request.
:return: Serialized imported zone as a dictionary.
"""
transfer_request_uri = 'zones/tasks/transfer_requests'
transfer_request_data = (transfer_request_data or
dns_data_utils.rand_transfer_request_data())
resp, body = self._update_request(
- transfer_request_uri, uuid, transfer_request_data, params=params)
+ transfer_request_uri, uuid, transfer_request_data, params=params,
+ headers=headers)
# Create Transfer request should Return a HTTP 200
self.expected_success(200, resp.status)
diff --git a/designate_tempest_plugin/services/dns/v2/json/zone_exports_client.py b/designate_tempest_plugin/services/dns/v2/json/zone_exports_client.py
index 4057b7e..1f469ec 100644
--- a/designate_tempest_plugin/services/dns/v2/json/zone_exports_client.py
+++ b/designate_tempest_plugin/services/dns/v2/json/zone_exports_client.py
@@ -100,16 +100,17 @@
'zones/tasks/exports', params=params, headers=headers)
@base.handle_errors
- def delete_zone_export(self, uuid, params=None):
+ def delete_zone_export(self, uuid, params=None, headers=None):
"""Deletes the zone export task with the specified UUID.
:param uuid: The unique identifier of the exported zone.
:param params: A Python dict that represents the query parameters to
include in the request URI.
+ :param headers (dict): The headers to use for the request.
:return: A tuple with the server response and the response body.
"""
resp, body = self._delete_request(
- 'zones/tasks/exports', uuid, params=params)
+ 'zones/tasks/exports', uuid, params=params, headers=headers)
# Delete Zone export should Return a HTTP 204
self.expected_success(204, resp.status)
diff --git a/designate_tempest_plugin/services/dns/v2/json/zone_imports_client.py b/designate_tempest_plugin/services/dns/v2/json/zone_imports_client.py
index 86d9fb1..4fbba2a 100644
--- a/designate_tempest_plugin/services/dns/v2/json/zone_imports_client.py
+++ b/designate_tempest_plugin/services/dns/v2/json/zone_imports_client.py
@@ -66,15 +66,16 @@
'zones/tasks/imports', params=params, headers=headers)
@base.handle_errors
- def delete_zone_import(self, uuid, params=None):
+ def delete_zone_import(self, uuid, params=None, headers=None):
"""Deletes a imported zone having the specified UUID.
:param uuid: The unique identifier of the imported zone.
:param params: A Python dict that represents the query paramaters to
include in the request URI.
+ :param headers (dict): The headers to use for the request.
:return: A tuple with the server response and the response body.
"""
resp, body = self._delete_request(
- 'zones/tasks/imports', uuid, params=params)
+ 'zones/tasks/imports', uuid, params=params, headers=headers)
# Delete Zone should Return a HTTP 204
self.expected_success(204, resp.status)
diff --git a/designate_tempest_plugin/services/dns/v2/json/zones_client.py b/designate_tempest_plugin/services/dns/v2/json/zones_client.py
index c30d24e..4cb6016 100644
--- a/designate_tempest_plugin/services/dns/v2/json/zones_client.py
+++ b/designate_tempest_plugin/services/dns/v2/json/zones_client.py
@@ -111,16 +111,17 @@
'zones', uuid, params=params, headers=headers)
@base.handle_errors
- def show_zone_nameservers(self, zone_uuid, params=None):
+ def show_zone_nameservers(self, zone_uuid, params=None, headers=None):
"""Gets list of Zone Name Servers
:param zone_uuid: Unique identifier of the zone in UUID format.
:param params: A Python dict that represents the query paramaters to
include in the request URI.
+ :param headers (dict): The headers to use for the request.
:return: Serialized nameservers as a list.
"""
return self._show_request(
'zones/{0}/nameservers'.format(zone_uuid), uuid=None,
- params=params)
+ params=params, headers=headers)
@base.handle_errors
def list_zones(self, params=None, headers=None):
@@ -151,7 +152,8 @@
@base.handle_errors
def update_zone(self, uuid, email=None, ttl=None,
- description=None, wait_until=False, params=None):
+ description=None, wait_until=False, params=None,
+ headers=None):
"""Update a zone with the specified parameters.
:param uuid: The unique identifier of the zone.
:param email: The email for the zone.
@@ -163,6 +165,7 @@
:param wait_until: Block until the zone reaches the desiered status
:param params: A Python dict that represents the query paramaters to
include in the request URI.
+ :param headers (dict): The headers to use for the request.
:return: A tuple with the server response and the updated zone.
"""
zone = {
@@ -171,7 +174,8 @@
'description': description or data_utils.rand_name('test-zone'),
}
- resp, body = self._update_request('zones', uuid, zone, params=params)
+ resp, body = self._update_request('zones', uuid, zone, params=params,
+ headers=headers)
# Update Zone should Return a HTTP 202
self.expected_success(202, resp.status)
diff --git a/designate_tempest_plugin/tests/api/v2/test_blacklists.py b/designate_tempest_plugin/tests/api/v2/test_blacklists.py
index 95688b3..ecb5ce5 100644
--- a/designate_tempest_plugin/tests/api/v2/test_blacklists.py
+++ b/designate_tempest_plugin/tests/api/v2/test_blacklists.py
@@ -30,7 +30,6 @@
class BlacklistsAdminTest(BaseBlacklistsTest):
- credentials = ["admin", "system_admin", "primary"]
@classmethod
def setup_credentials(cls):
# Do not create network resources for these test.
@@ -59,8 +58,14 @@
self.assertExpected(blacklist, body, self.excluded_keys)
+ expected_allowed = ['os_admin']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin']
+
+ self.check_CUD_RBAC_enforcement('BlacklistsClient', 'create_blacklist',
+ expected_allowed, False)
+
@decorators.idempotent_id('ea608152-da3c-11eb-b8b8-74e5f9e2a801')
- @decorators.skip_because(bug="1934252")
def test_create_blacklist_invalid_pattern(self):
patterns = ['', '#(*&^%$%$#@$', 'a' * 1000]
for pattern in patterns:
@@ -95,6 +100,14 @@
LOG.info('Ensure the fetched response matches the created blacklist')
self.assertExpected(blacklist, body, self.excluded_keys)
+ expected_allowed = ['os_admin']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin', 'os_system_reader']
+
+ self.check_list_show_RBAC_enforcement(
+ 'BlacklistsClient', 'show_blacklist', expected_allowed, False,
+ blacklist['id'])
+
@decorators.idempotent_id('dcea40d9-8d36-43cb-8440-4a842faaef0d')
def test_delete_blacklist(self):
LOG.info('Create a blacklist')
@@ -108,6 +121,14 @@
# A blacklist delete returns an empty body
self.assertEqual(body.strip(), b"")
+ expected_allowed = ['os_admin']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin']
+
+ self.check_CUD_RBAC_enforcement(
+ 'BlacklistsClient', 'delete_blacklist', expected_allowed, False,
+ blacklist['id'])
+
@decorators.idempotent_id('3a2a1e6c-8176-428c-b5dd-d85217c0209d')
def test_list_blacklists(self):
LOG.info('Create a blacklist')
@@ -120,6 +141,14 @@
# TODO(pglass): Assert that the created blacklist is in the response
self.assertGreater(len(body['blacklists']), 0)
+ expected_allowed = ['os_admin']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin', 'os_system_reader']
+
+ self.check_list_IDs_RBAC_enforcement(
+ 'BlacklistsClient', 'list_blacklists',
+ expected_allowed, [blacklist['id']])
+
@decorators.idempotent_id('0063d6ad-9557-49c7-b521-e64a14d4d0d0')
def test_update_blacklist(self):
LOG.info('Create a blacklist')
@@ -139,6 +168,14 @@
self.assertEqual(pattern, body['pattern'])
self.assertEqual(description, body['description'])
+ expected_allowed = ['os_admin']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin']
+
+ self.check_CUD_RBAC_enforcement(
+ 'BlacklistsClient', 'update_blacklist', expected_allowed, False,
+ uuid=blacklist['id'], pattern=pattern, description=description)
+
class TestBlacklistNotFoundAdmin(BaseBlacklistsTest):
diff --git a/designate_tempest_plugin/tests/api/v2/test_designate_limits.py b/designate_tempest_plugin/tests/api/v2/test_designate_limits.py
index 2cb9d9e..102f168 100644
--- a/designate_tempest_plugin/tests/api/v2/test_designate_limits.py
+++ b/designate_tempest_plugin/tests/api/v2/test_designate_limits.py
@@ -24,7 +24,8 @@
class DesignateLimit(base.BaseDnsV2Test):
- credentials = ["admin", "system_admin", "primary", "alt"]
+ credentials = ["admin", "system_admin", "system_reader", "primary", "alt",
+ "project_member", "project_reader"]
@classmethod
def setup_credentials(cls):
@@ -102,3 +103,14 @@
project_id, received_project_ids,
'Failed, expected project_id:{} is missing in:{} '.format(
project_id, received_project_ids))
+
+ @decorators.idempotent_id('fc57fa6b-5280-4186-9be9-ff4da0961db0')
+ def test_list_designate_limits_RBAC(self):
+ expected_allowed = ['os_admin', 'os_primary', 'os_alt']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.extend(['os_system_admin', 'os_system_reader',
+ 'os_project_member', 'os_project_reader'])
+
+ self.check_list_show_RBAC_enforcement(
+ 'DesignateLimitClient', 'list_designate_limits',
+ expected_allowed, False)
diff --git a/designate_tempest_plugin/tests/api/v2/test_pool.py b/designate_tempest_plugin/tests/api/v2/test_pool.py
index 60af204..144f0d6 100644
--- a/designate_tempest_plugin/tests/api/v2/test_pool.py
+++ b/designate_tempest_plugin/tests/api/v2/test_pool.py
@@ -36,7 +36,8 @@
class PoolAdminTest(BasePoolTest):
- credentials = ["admin", "system_admin"]
+ credentials = ["admin", "primary", "system_admin", "system_reader",
+ "project_member", "project_reader", "alt"]
@classmethod
def setup_credentials(cls):
@@ -72,6 +73,16 @@
self.assertEqual(pool_data["name"], pool['name'])
self.assertExpected(pool_data, pool, self.excluded_keys)
+ # Test RBAC
+ expected_allowed = ['os_admin']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.append('os_system_admin')
+
+ self.check_CUD_RBAC_enforcement(
+ 'PoolClient', 'create_pool', expected_allowed, False,
+ pool_name=pool_data["name"], ns_records=pool_data["ns_records"],
+ project_id=pool_data["project_id"])
+
@decorators.idempotent_id('e80eb70a-8ee5-40eb-b06e-599597a8ab7e')
def test_show_pool(self):
LOG.info('Create a pool')
@@ -88,6 +99,20 @@
self._assertExpectedNSRecords(pool["ns_records"], body["ns_records"],
expected_key="priority")
+ # TODO(johnsom) Test reader roles once this bug is fixed.
+ # https://bugs.launchpad.net/tempest/+bug/1964509
+ # Test RBAC
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin']
+ else:
+ expected_allowed = ['os_admin']
+
+ # TODO(johnsom) The pools API seems inconsistent with the requirement
+ # of the all-projects header.
+ self.check_list_show_RBAC_enforcement(
+ 'PoolClient', 'show_pool', expected_allowed, True, pool['id'],
+ headers=self.all_projects_header)
+
@decorators.idempotent_id('d8c4c377-5d88-452d-a4d2-c004d72e1abe')
def test_delete_pool(self):
LOG.info('Create a pool')
@@ -104,6 +129,14 @@
lambda: self.admin_client.show_pool(
pool['id'], headers=self.all_projects_header))
+ # Test RBAC
+ expected_allowed = ['os_admin']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.append('os_system_admin')
+
+ self.check_CUD_RBAC_enforcement(
+ 'PoolClient', 'delete_pool', expected_allowed, False, pool['id'])
+
@decorators.idempotent_id('77c85b40-83b2-4c17-9fbf-e6d516cfce90')
def test_list_pools(self):
LOG.info('Create a pool')
@@ -117,6 +150,18 @@
self.assertGreater(len(body['pools']), 0)
+ # TODO(johnsom) Test reader roles once this bug is fixed.
+ # https://bugs.launchpad.net/tempest/+bug/1964509
+ # Test RBAC
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin']
+ else:
+ expected_allowed = ['os_admin']
+
+ self.check_list_IDs_RBAC_enforcement(
+ 'PoolClient', 'list_pools', expected_allowed, [pool['id']],
+ headers=self.all_projects_header)
+
@decorators.idempotent_id('fdcc84ce-af65-4af6-a5fc-6c50acbea0f0')
def test_update_pool(self):
LOG.info('Create a pool')
@@ -131,6 +176,15 @@
self.assertEqual("foo", patch_pool["name"])
+ # Test RBAC
+ expected_allowed = ['os_admin']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.append('os_system_admin')
+
+ self.check_CUD_RBAC_enforcement(
+ 'PoolClient', 'update_pool', expected_allowed, True,
+ pool['id'], pool_name="test-name")
+
@decorators.idempotent_id('41ad6a84-00ce-4a04-9fd5-b7c15c31e2db')
def test_list_pools_dot_json_fails(self):
uri = self.admin_client.get_uri('pools.json')
diff --git a/designate_tempest_plugin/tests/api/v2/test_quotas.py b/designate_tempest_plugin/tests/api/v2/test_quotas.py
index 31a5c7e..37e07e3 100644
--- a/designate_tempest_plugin/tests/api/v2/test_quotas.py
+++ b/designate_tempest_plugin/tests/api/v2/test_quotas.py
@@ -29,7 +29,8 @@
class QuotasV2Test(base.BaseDnsV2Test):
- credentials = ["primary", "admin", "system_admin", "alt"]
+ credentials = ["primary", "admin", "system_admin", "system_reader", "alt",
+ "project_member", "project_reader"]
@classmethod
def setup_credentials(cls):
@@ -90,10 +91,28 @@
'Failed, the value of:{} is:{}, expected integer'.format(
quota_type, quota_value))
+ expected_allowed = ['os_admin', 'os_primary', 'os_alt']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.extend(['os_system_admin', 'os_system_reader',
+ 'os_project_member', 'os_project_reader'])
+
+ self.check_list_show_with_ID_RBAC_enforcement(
+ 'QuotasClient', 'show_quotas', expected_allowed, False)
+
@decorators.idempotent_id('0448b089-5803-4ce3-8a6c-5c15ff75a2cc')
def test_reset_quotas(self):
self._store_quotas(project_id=self.quotas_client.project_id)
+
LOG.info("Deleting (reset) quotas")
+
+ expected_allowed = ['os_admin']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.extend(['os_system_admin'])
+
+ self.check_CUD_RBAC_enforcement(
+ 'QuotasClient', 'delete_quotas', expected_allowed, False,
+ project_id=self.quotas_client.project_id)
+
body = self.admin_client.delete_quotas(
project_id=self.quotas_client.project_id,
headers=self.all_projects_header)[1]
@@ -103,16 +122,21 @@
@decorators.idempotent_id('76d24c87-1b39-4e19-947c-c08e1380dc61')
def test_update_quotas(self):
- if CONF.enforce_scope.designate:
- raise self.skipException(
- "System scoped tokens do not have a project_id.")
-
- self._store_quotas(project_id=self.admin_client.project_id)
+ self._store_quotas(project_id=self.quotas_client.project_id)
LOG.info("Updating quotas")
quotas = dns_data_utils.rand_quotas()
body = self.admin_client.update_quotas(
- project_id=self.admin_client.project_id,
- **quotas)[1]
+ project_id=self.quotas_client.project_id,
+ **quotas, headers=self.all_projects_header)[1]
+
+ expected_allowed = ['os_admin']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.extend(['os_system_admin'])
+
+ self.check_CUD_RBAC_enforcement(
+ 'QuotasClient', 'update_quotas', expected_allowed, False,
+ project_id=self.quotas_client.project_id,
+ **quotas, headers=self.all_projects_header)
LOG.info("Ensuring the response has all quota types")
self.assertExpected(quotas, body, [])
diff --git a/designate_tempest_plugin/tests/api/v2/test_recordset.py b/designate_tempest_plugin/tests/api/v2/test_recordset.py
index 079782f..72495d3 100644
--- a/designate_tempest_plugin/tests/api/v2/test_recordset.py
+++ b/designate_tempest_plugin/tests/api/v2/test_recordset.py
@@ -66,7 +66,8 @@
@ddt.ddt
class RecordsetsTest(BaseRecordsetsTest):
- credentials = ["admin", "system_admin", "primary", "alt"]
+ credentials = ["admin", "system_admin", "system_reader", "primary", "alt",
+ "project_member", "project_reader"]
@classmethod
def setup_credentials(cls):
@@ -94,6 +95,16 @@
recordset_data = dns_data_utils.rand_recordset_data(
record_type='A', zone_name=self.zone['name'])
+ # Test RBAC
+ expected_allowed = ['os_admin', 'os_primary', 'os_alt']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.append('os_system_admin')
+ expected_allowed.append('os_project_member')
+
+ self.check_CUD_RBAC_enforcement(
+ 'RecordsetClient', 'create_recordset', expected_allowed, True,
+ self.zone['id'], recordset_data)
+
LOG.info('Create a Recordset')
resp, body = self.client.create_recordset(
self.zone['id'], recordset_data)
@@ -213,15 +224,46 @@
LOG.info('Create a Recordset')
resp, body = self.client.create_recordset(
self.zone['id'], recordset_data)
+ recordset_id = body['id']
self.addCleanup(
self.wait_recordset_delete, self.client,
- self.zone['id'], body['id'])
+ self.zone['id'], recordset_id)
LOG.info('List zone recordsets')
body = self.client.list_recordset(self.zone['id'])[1]
self.assertGreater(len(body), 0)
+ # TODO(johnsom) Test reader role once this bug is fixed:
+ # https://bugs.launchpad.net/tempest/+bug/1964509
+ # Test RBAC
+ expected_allowed = ['os_primary']
+
+ self.check_list_show_RBAC_enforcement(
+ 'RecordsetClient', 'list_recordset', expected_allowed, True,
+ self.zone['id'])
+
+ # Test that users who should see the recordset can see it.
+ expected_allowed = ['os_primary']
+
+ self.check_list_IDs_RBAC_enforcement(
+ 'RecordsetClient', 'list_recordset',
+ expected_allowed, [recordset_id], self.zone['id'])
+
+ # Test RBAC with x-auth-all-projects and x-auth-sudo-project-id headers
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin']
+ else:
+ expected_allowed = ['os_admin']
+
+ self.check_list_IDs_RBAC_enforcement(
+ 'RecordsetClient', 'list_recordset', expected_allowed,
+ [recordset_id], self.zone['id'], headers=self.all_projects_header)
+ self.check_list_IDs_RBAC_enforcement(
+ 'RecordsetClient', 'list_recordset',
+ expected_allowed, [recordset_id], self.zone['id'],
+ headers={'x-auth-sudo-project-id': self.client.project_id})
+
@decorators.idempotent_id('84c13cb2-9020-4c1e-aeb0-c348d9a70caa')
def test_show_recordsets(self):
recordset_data = dns_data_utils.rand_recordset_data(
@@ -230,16 +272,40 @@
LOG.info('Create a Recordset')
resp, body = self.client.create_recordset(
self.zone['id'], recordset_data)
+ recordset_id = body['id']
self.addCleanup(
self.wait_recordset_delete, self.client,
- self.zone['id'], body['id'])
+ self.zone['id'], recordset_id)
LOG.info('Re-Fetch the Recordset')
- record = self.client.show_recordset(self.zone['id'], body['id'])[1]
+ record = self.client.show_recordset(self.zone['id'], recordset_id)[1]
LOG.info('Ensure the fetched response matches the expected one')
self.assertExpected(body, record, self.excluded_keys)
+ # TODO(johnsom) Test reader role once this bug is fixed:
+ # https://bugs.launchpad.net/tempest/+bug/1964509
+ # Test RBAC
+ expected_allowed = ['os_primary']
+
+ self.check_list_show_RBAC_enforcement(
+ 'RecordsetClient', 'show_recordset', expected_allowed, True,
+ self.zone['id'], recordset_id)
+
+ # Test RBAC with x-auth-all-projects and x-auth-sudo-project-id headers
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin']
+ else:
+ expected_allowed = ['os_admin']
+
+ self.check_list_show_RBAC_enforcement(
+ 'RecordsetClient', 'show_recordset', expected_allowed, True,
+ self.zone['id'], recordset_id, headers=self.all_projects_header)
+ self.check_list_show_RBAC_enforcement(
+ 'RecordsetClient', 'show_recordset', expected_allowed, True,
+ self.zone['id'], recordset_id,
+ headers={'x-auth-sudo-project-id': self.client.project_id})
+
@decorators.idempotent_id('855399c1-8806-4ae5-aa31-cb8a6f35e218')
def test_delete_recordset(self):
recordset_data = dns_data_utils.rand_recordset_data(
@@ -248,16 +314,40 @@
LOG.info('Create a Recordset')
record = self.client.create_recordset(
self.zone['id'], recordset_data)[1]
+ recordset_id = record['id']
self.addCleanup(
self.wait_recordset_delete, self.client,
- self.zone['id'], record['id'])
+ self.zone['id'], recordset_id)
+
+ # Test RBAC
+ expected_allowed = ['os_admin', 'os_primary']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.append('os_system_admin')
+
+ self.check_CUD_RBAC_enforcement(
+ 'RecordsetClient', 'delete_recordset', expected_allowed, True,
+ self.zone['id'], recordset_id)
+
+ # Test RBAC with x-auth-all-projects and x-auth-sudo-project-id headers
+ expected_allowed = ['os_admin', 'os_primary']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.append('os_system_admin')
+
+ self.check_CUD_RBAC_enforcement(
+ 'RecordsetClient', 'delete_recordset', expected_allowed, False,
+ self.zone['id'], recordset_id, headers=self.all_projects_header)
+
+ self.check_CUD_RBAC_enforcement(
+ 'RecordsetClient', 'delete_recordset', expected_allowed, False,
+ self.zone['id'], recordset_id,
+ headers={'x-auth-sudo-project-id': self.client.project_id})
LOG.info('Delete a Recordset')
- self.client.delete_recordset(self.zone['id'], record['id'])
+ self.client.delete_recordset(self.zone['id'], recordset_id)
LOG.info('Ensure successful deletion of Recordset')
self.assertRaises(lib_exc.NotFound,
- lambda: self.client.show_recordset(self.zone['id'], record['id']))
+ lambda: self.client.show_recordset(self.zone['id'], recordset_id))
@decorators.idempotent_id('8d41c85f-09f9-48be-a202-92d1bdf5c796')
def test_update_recordset(self):
@@ -267,20 +357,44 @@
LOG.info('Create a recordset')
record = self.client.create_recordset(
self.zone['id'], recordset_data)[1]
+ recordset_id = record['id']
self.addCleanup(
self.wait_recordset_delete, self.client,
- self.zone['id'], record['id'])
+ self.zone['id'], recordset_id)
recordset_data = dns_data_utils.rand_recordset_data(
record_type='A', zone_name=self.zone['name'], name=record['name'])
LOG.info('Update the recordset')
update = self.client.update_recordset(self.zone['id'],
- record['id'], recordset_data)[1]
+ recordset_id, recordset_data)[1]
self.assertEqual(record['name'], update['name'])
self.assertNotEqual(record['records'], update['records'])
+ # Test RBAC
+ expected_allowed = ['os_admin', 'os_primary']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.append('os_system_admin')
+
+ self.check_CUD_RBAC_enforcement(
+ 'RecordsetClient', 'update_recordset', expected_allowed, True,
+ self.zone['id'], recordset_id, recordset_data)
+
+ # Test RBAC with x-auth-all-projects and x-auth-sudo-project-id headers
+ expected_allowed = ['os_admin', 'os_primary']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.append('os_system_admin')
+
+ self.check_CUD_RBAC_enforcement(
+ 'RecordsetClient', 'update_recordset', expected_allowed, False,
+ self.zone['id'], recordset_id, recordset_data,
+ headers=self.all_projects_header)
+ self.check_CUD_RBAC_enforcement(
+ 'RecordsetClient', 'update_recordset', expected_allowed, False,
+ self.zone['id'], recordset_id, recordset_data,
+ headers={'x-auth-sudo-project-id': self.client.project_id})
+
@decorators.idempotent_id('60904cc5-148b-4e3b-a0c6-35656dc8d44c')
def test_update_recordset_one_field(self):
recordset_data = dns_data_utils.rand_recordset_data(
diff --git a/designate_tempest_plugin/tests/api/v2/test_service_statuses.py b/designate_tempest_plugin/tests/api/v2/test_service_statuses.py
index bee364e..c1f634b 100644
--- a/designate_tempest_plugin/tests/api/v2/test_service_statuses.py
+++ b/designate_tempest_plugin/tests/api/v2/test_service_statuses.py
@@ -26,7 +26,8 @@
class ServiceStatusAdmin(base.BaseDnsV2Test):
- credentials = ["admin", "system_admin"]
+ credentials = ["primary", "admin", "system_admin", "system_reader", "alt",
+ "project_reader", "project_member"]
mandatory_services = ['central', 'mdns', 'worker', 'producer']
service_status_fields = [
@@ -71,6 +72,15 @@
"Failed, not all listed services are in UP status, "
"services: {}".format(services_statuses_tup))
+ # Test RBAC
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin', 'os_system_reader']
+ else:
+ expected_allowed = ['os_admin']
+
+ self.check_list_show_RBAC_enforcement(
+ 'ServiceClient', 'list_statuses', expected_allowed, False)
+
@decorators.idempotent_id('fce0f704-c0ae-11ec-8213-201e8823901f')
def test_admin_show_service_status(self):
diff --git a/designate_tempest_plugin/tests/api/v2/test_tld.py b/designate_tempest_plugin/tests/api/v2/test_tld.py
index d5d584c..16711e4 100644
--- a/designate_tempest_plugin/tests/api/v2/test_tld.py
+++ b/designate_tempest_plugin/tests/api/v2/test_tld.py
@@ -29,7 +29,8 @@
class TldAdminTest(BaseTldTest):
- credentials = ["admin", "system_admin", "primary"]
+ credentials = ["admin", "system_admin", "system_reader",
+ "primary", "alt", "project_reader", "project_member"]
# Use a TLD suffix unique to this test class.
local_tld_suffix = '.'.join(["tldadmintest", CONF.dns.tld_suffix])
@@ -67,6 +68,14 @@
self.assertEqual(tld_name, tld['name'])
+ # Test RBAC
+ expected_allowed = ['os_admin']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.append('os_system_admin')
+
+ self.check_CUD_RBAC_enforcement('TldClient', 'create_tld',
+ expected_allowed, False)
+
@decorators.idempotent_id('961bd2e8-d4d0-11eb-b8ee-74e5f9e2a801')
def test_create_duplicated_tlds(self):
tld_name = self._generate_tld_name("test_create_duplicated_tlds")
@@ -139,6 +148,15 @@
LOG.info('Ensure the fetched response matches the created tld')
self.assertExpected(tld, body, self.excluded_keys)
+ # Test RBAC
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin', 'os_system_reader']
+ else:
+ expected_allowed = ['os_admin']
+
+ self.check_list_show_RBAC_enforcement(
+ 'TldClient', 'show_tld', expected_allowed, False, tld['id'])
+
@decorators.idempotent_id('26708cb8-7126-48a7-9424-1c225e56e609')
def test_delete_tld(self):
LOG.info('Create a tld')
@@ -150,8 +168,16 @@
LOG.info('Delete the tld')
self.admin_client.delete_tld(tld['id'])
- self.assertRaises(lib_exc.NotFound,
- lambda: self.admin_client.show_tld(tld['id']))
+ self.assertRaises(lib_exc.NotFound, self.admin_client.show_tld,
+ tld['id'])
+
+ # Test RBAC
+ expected_allowed = ['os_admin']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.append('os_system_admin')
+
+ self.check_CUD_RBAC_enforcement('TldClient', 'delete_tld',
+ expected_allowed, False, tld['id'])
@decorators.idempotent_id('95b13759-c85c-4791-829b-9591ca15779d')
def test_list_tlds(self):
@@ -165,6 +191,16 @@
self.assertGreater(len(body['tlds']), 0)
+ # Test RBAC
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin', 'os_system_reader']
+ else:
+ expected_allowed = ['os_admin']
+
+ self.check_list_IDs_RBAC_enforcement(
+ 'TldClient', 'list_tlds', expected_allowed, [tld['id']],
+ params={'limit': 1000})
+
@decorators.idempotent_id('1a233812-48d9-4d15-af5e-9961744286ff')
def test_update_tld(self):
tld_name = self._generate_tld_name("test_update_tld")
@@ -185,6 +221,15 @@
self.assertEqual(tld_name_2, patch_tld["name"])
self.assertEqual(tld_data["description"], patch_tld["description"])
+ # Test RBAC
+ expected_allowed = ['os_admin']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.append('os_system_admin')
+
+ self.check_CUD_RBAC_enforcement(
+ 'TldClient', 'update_tld', expected_allowed, False, tld['id'],
+ tld_data['name'], tld_data['description'])
+
@decorators.idempotent_id('8116dcf5-a329-47d1-90be-5ff32f299c53')
def test_list_tlds_dot_json_fails(self):
uri = self.admin_client.get_uri('tlds.json')
diff --git a/designate_tempest_plugin/tests/api/v2/test_transfer_accepts.py b/designate_tempest_plugin/tests/api/v2/test_transfer_accepts.py
index 87f359d..7696291 100644
--- a/designate_tempest_plugin/tests/api/v2/test_transfer_accepts.py
+++ b/designate_tempest_plugin/tests/api/v2/test_transfer_accepts.py
@@ -53,7 +53,8 @@
class TransferAcceptTest(BaseTransferAcceptTest):
- credentials = ["primary", "alt", "admin", "system_admin"]
+ credentials = ["primary", "alt", "admin", "system_admin", "system_reader",
+ "project_member", "project_reader"]
@classmethod
def setup_credentials(cls):
@@ -114,6 +115,21 @@
"key": transfer_request['key'],
"zone_transfer_request_id": transfer_request['id']
}
+
+ # Test RBAC
+ # Note: Everyone can call this API and succeed if they know the
+ # transfer key.
+ expected_allowed = ['os_admin', 'os_primary', 'os_alt']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.append('os_system_admin')
+ expected_allowed.append('os_system_reader')
+ expected_allowed.append('os_project_member')
+ expected_allowed.append('os_project_reader')
+
+ self.check_CUD_RBAC_enforcement(
+ 'TransferAcceptClient', 'create_transfer_accept',
+ expected_allowed, True, data)
+
LOG.info('Create a zone transfer_accept')
_, transfer_accept = self.prm_accept_client.create_transfer_accept(
data)
@@ -159,6 +175,25 @@
'created transfer_accept')
self.assertExpected(transfer_accept, body, self.excluded_keys)
+ # TODO(johnsom) Test reader role once this bug is fixed:
+ # https://bugs.launchpad.net/tempest/+bug/1964509
+ # Test RBAC
+ expected_allowed = ['os_primary']
+
+ self.check_list_show_RBAC_enforcement(
+ 'TransferAcceptClient', 'show_transfer_accept', expected_allowed,
+ True, transfer_accept['id'])
+
+ # Test RBAC with x-auth-all-projects
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin']
+ else:
+ expected_allowed = ['os_admin']
+
+ self.check_list_show_RBAC_enforcement(
+ 'TransferAcceptClient', 'show_transfer_accept', expected_allowed,
+ True, transfer_accept['id'], headers=self.all_projects_header)
+
@decorators.idempotent_id('89b516f0-8c9f-11eb-a322-74e5f9e2a801')
def test_ownership_transferred_zone(self):
@@ -241,6 +276,30 @@
self.assertEqual('COMPLETE', transfer_accept['status'])
transfer_request_ids.append(transfer_accept['id'])
+ # TODO(johnsom) Test reader role once this bug is fixed:
+ # https://bugs.launchpad.net/tempest/+bug/1964509
+ # Test RBAC - Users that are allowed to call list, but should get
+ # zero transfer accepts.
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin', 'os_system_reader']
+ else:
+ expected_allowed = ['os_admin']
+
+ self.check_list_RBAC_enforcement_count(
+ 'TransferAcceptClient', 'list_transfer_accept',
+ expected_allowed, 0)
+
+ # Test that users who should see the transfer accepts, can see them.
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin']
+ else:
+ expected_allowed = ['os_admin']
+
+ self.check_list_IDs_RBAC_enforcement(
+ 'TransferAcceptClient', 'list_transfer_accept',
+ expected_allowed, transfer_request_ids,
+ headers=self.all_projects_header)
+
# As Admin list all accepted zone transfers, expected:
# each previously transferred zone is listed.
# Note: This is an all-projects list call, so other tests running
@@ -250,7 +309,8 @@
admin_client_accept_ids = [
item['id'] for item in
self.admin_accept_client.list_transfer_accept(
- headers=self.all_projects_header, params={'limit': 1000})]
+ headers=self.all_projects_header,
+ params={'limit': 1000})[1]['transfer_accepts']]
for tr_id in transfer_request_ids:
self.assertIn(
tr_id, admin_client_accept_ids,
@@ -265,7 +325,7 @@
item['id'] for item in
self.admin_accept_client.list_transfer_accept(
headers=self.all_projects_header,
- params={'status': 'COMPLETE'})]
+ params={'status': 'COMPLETE'})[1]['transfer_accepts']]
for tr_id in transfer_request_ids:
self.assertIn(
tr_id, admin_client_accept_ids,
@@ -282,7 +342,7 @@
item['id'] for item in
self.admin_accept_client.list_transfer_accept(
headers=self.all_projects_header,
- params={'status': not_existing_status})]
+ params={'status': not_existing_status})[1]['transfer_accepts']]
self.assertEmpty(
admin_client_accept_ids,
"Failed, filtered list should be empty, but actually it's not, "
@@ -341,6 +401,18 @@
self.addCleanup(
self.wait_zone_delete, self.alt_zone_client, zone['id'])
+ # Test RBAC with x-auth-sudo-project-id header
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin']
+ else:
+ expected_allowed = ['os_admin']
+
+ self.check_list_show_RBAC_enforcement(
+ 'TransferAcceptClient', 'show_transfer_accept', expected_allowed,
+ True, transfer_accept['id'],
+ headers={'x-auth-sudo-project-id':
+ self.os_alt.credentials.project_id})
+
class TransferAcceptTestNegative(BaseTransferAcceptTest):
diff --git a/designate_tempest_plugin/tests/api/v2/test_transfer_request.py b/designate_tempest_plugin/tests/api/v2/test_transfer_request.py
index c53d6d9..f548f47 100644
--- a/designate_tempest_plugin/tests/api/v2/test_transfer_request.py
+++ b/designate_tempest_plugin/tests/api/v2/test_transfer_request.py
@@ -53,7 +53,8 @@
class TransferRequestTest(BaseTransferRequestTest):
- credentials = ["primary", "alt", "admin", "system_admin"]
+ credentials = ["primary", "alt", "admin", "system_admin", "system_reader",
+ "project_member", "project_reader"]
@classmethod
def setup_credentials(cls):
@@ -83,6 +84,16 @@
zone = self.zone_client.create_zone(name=zone_name)[1]
self.addCleanup(self.wait_zone_delete, self.zone_client, zone['id'])
+ # Test RBAC
+ expected_allowed = ['os_admin', 'os_primary', 'os_alt']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.append('os_system_admin')
+ expected_allowed.append('os_project_member')
+
+ self.check_CUD_RBAC_enforcement(
+ 'TransferRequestClient', 'create_transfer_request',
+ expected_allowed, True, zone['id'])
+
LOG.info('Create a zone transfer_request')
transfer_request = self.client.create_transfer_request(zone['id'])[1]
self.addCleanup(self.client.delete_transfer_request,
@@ -147,6 +158,36 @@
'created transfer_request')
self.assertExpected(transfer_request, body, self.excluded_keys)
+ # TODO(johnsom) Test reader role once this bug is fixed:
+ # https://bugs.launchpad.net/tempest/+bug/1964509
+ # Test RBAC
+ # Note: The create service client does not define a target project
+ # ID, so everyone should be able to see it.
+ expected_allowed = ['os_admin', 'os_primary', 'os_alt']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.extend(['os_system_admin', 'os_system_reader',
+ 'os_project_member', 'os_project_reader'])
+
+ self.check_list_show_RBAC_enforcement(
+ 'TransferRequestClient', 'show_transfer_request', expected_allowed,
+ True, transfer_request['id'])
+
+ # Test RBAC with x-auth-all-projects and x-auth-sudo-project-id header
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin']
+ else:
+ expected_allowed = ['os_admin']
+
+ self.check_list_show_RBAC_enforcement(
+ 'TransferRequestClient', 'show_transfer_request', expected_allowed,
+ True, transfer_request['id'], headers=self.all_projects_header)
+ # TODO(johnsom) Move this down to the impersonate test below when the
+ # bug is resolved and the test is not skipped.
+ self.check_list_show_RBAC_enforcement(
+ 'TransferRequestClient', 'show_transfer_request', expected_allowed,
+ True, transfer_request['id'],
+ headers={'x-auth-sudo-project-id': self.client.project_id})
+
@decorators.idempotent_id('5bed4582-9cfb-11eb-a160-74e5f9e2a801')
@decorators.skip_because(bug="1926572")
def test_show_transfer_request_impersonate_another_project(self):
@@ -205,6 +246,19 @@
"project_id"]
self.assertExpected(transfer_request, body, excluded_keys)
+ # TODO(johnsom) Test reader role once this bug is fixed:
+ # https://bugs.launchpad.net/tempest/+bug/1964509
+ # Test RBAC when a transfer target project is specified.
+ expected_allowed = ['os_primary', 'os_alt']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.append('os_system_admin')
+ else:
+ expected_allowed.append('os_admin')
+
+ self.check_list_show_RBAC_enforcement(
+ 'TransferRequestClient', 'show_transfer_request', expected_allowed,
+ True, transfer_request['id'])
+
@decorators.idempotent_id('7d81c487-aa15-44c4-b3e5-424ab9e6a3e5')
def test_delete_transfer_request(self):
LOG.info('Create a zone')
@@ -219,6 +273,16 @@
transfer_request['id'],
ignore_errors=lib_exc.NotFound)
+ # Test RBAC
+ expected_allowed = ['os_admin', 'os_primary', 'os_alt']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.append('os_system_admin')
+ expected_allowed.append('os_project_member')
+
+ self.check_CUD_RBAC_enforcement(
+ 'TransferRequestClient', 'delete_transfer_request',
+ expected_allowed, True, transfer_request['id'])
+
LOG.info('Delete the transfer_request')
self.client.delete_transfer_request(transfer_request['id'])
self.assertRaises(lib_exc.NotFound,
@@ -242,6 +306,28 @@
self.assertGreater(len(body['transfer_requests']), 0)
+ # TODO(johnsom) Test reader role once this bug is fixed:
+ # https://bugs.launchpad.net/tempest/+bug/1964509
+ # Test RBAC - Users that are allowed to call list, but should get
+ # zero transfer requests.
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_admin', 'os_project_member',
+ 'os_project_reader']
+ else:
+ expected_allowed = ['os_alt']
+
+ self.check_list_RBAC_enforcement_count(
+ 'TransferRequestClient', 'list_transfer_requests',
+ expected_allowed, 0)
+
+ # Test that users who should see the transfer request, can see it.
+ expected_allowed = ['os_primary']
+
+ self.check_list_IDs_RBAC_enforcement(
+ 'TransferRequestClient', 'list_transfer_requests',
+ expected_allowed, [transfer_request['id']])
+
@decorators.idempotent_id('db985892-9d02-11eb-a160-74e5f9e2a801')
def test_list_transfer_requests_all_projects(self):
LOG.info('Create a Primary zone')
@@ -293,6 +379,17 @@
"Failed, transfer request ID:{} wasn't found in "
"listed IDs{}".format(request_id, request_ids))
+ # Test RBAC with x-auth-all-projects
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin']
+ else:
+ expected_allowed = ['os_admin']
+
+ self.check_list_IDs_RBAC_enforcement(
+ 'TransferRequestClient', 'list_transfer_requests',
+ expected_allowed, [primary_transfer_request['id']],
+ headers=self.all_projects_header)
+
@decorators.idempotent_id('bee42f38-e666-4b85-a710-01f40ea1e56a')
def test_list_transfer_requests_impersonate_another_project(self):
LOG.info('Create a Primary zone')
@@ -328,6 +425,17 @@
self.assertEqual([alt_transfer_request['id']], request_ids)
+ # Test RBAC with x-auth-sudo-project-id header
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin']
+ else:
+ expected_allowed = ['os_admin']
+
+ self.check_list_IDs_RBAC_enforcement(
+ 'TransferRequestClient', 'list_transfer_requests',
+ expected_allowed, [primary_transfer_request['id']],
+ headers={'x-auth-sudo-project-id': self.client.project_id})
+
@decorators.idempotent_id('de5e9d32-c723-4518-84e5-58da9722cc13')
def test_update_transfer_request(self):
LOG.info('Create a zone')
@@ -351,6 +459,32 @@
self.assertEqual(data['description'],
transfer_request_patch['description'])
+ # Test RBAC
+ expected_allowed = ['os_admin', 'os_primary']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.append('os_system_admin')
+
+ self.check_CUD_RBAC_enforcement(
+ 'TransferRequestClient', 'update_transfer_request',
+ expected_allowed, True,
+ transfer_request['id'], transfer_request_data=data)
+
+ # Test RBAC with x-auth-all-projects and x-auth-sudo-project-id header
+ expected_allowed = ['os_admin', 'os_primary']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.append('os_system_admin')
+
+ self.check_CUD_RBAC_enforcement(
+ 'TransferRequestClient', 'update_transfer_request',
+ expected_allowed, False,
+ transfer_request['id'], transfer_request_data=data,
+ headers=self.all_projects_header)
+ self.check_CUD_RBAC_enforcement(
+ 'TransferRequestClient', 'update_transfer_request',
+ expected_allowed, False,
+ transfer_request['id'], transfer_request_data=data,
+ headers={'x-auth-sudo-project-id': self.client.project_id})
+
@decorators.idempotent_id('73b754a9-e856-4fd6-80ba-e8d1b80f5dfa')
def test_list_transfer_requests_dot_json_fails(self):
uri = self.client.get_uri('transfer_requests.json')
diff --git a/designate_tempest_plugin/tests/api/v2/test_tsigkey.py b/designate_tempest_plugin/tests/api/v2/test_tsigkey.py
index ceba12e..95a4e21 100644
--- a/designate_tempest_plugin/tests/api/v2/test_tsigkey.py
+++ b/designate_tempest_plugin/tests/api/v2/test_tsigkey.py
@@ -53,7 +53,8 @@
class TsigkeyAdminTest(BaseTsigkeyTest):
- credentials = ["primary", "admin", "system_admin"]
+ credentials = ["primary", "admin", "system_admin", "system_reader",
+ "project_member", "project_reader", "alt"]
@classmethod
def setup_credentials(cls):
@@ -120,6 +121,17 @@
self.assertEqual(tsigkey_data["name"], tsigkey['name'])
self.assertEqual(tsigkey_data["scope"], 'POOL')
+ # Test RBAC
+ expected_allowed = ['os_admin']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.append('os_system_admin')
+
+ self.check_CUD_RBAC_enforcement(
+ 'TsigkeyClient', 'create_tsigkey', expected_allowed, False,
+ tsigkey_data['resource_id'],
+ tsigkey_data['name'], tsigkey_data['algorithm'],
+ tsigkey_data['secret'], tsigkey_data['scope'])
+
@decorators.idempotent_id('d46e5e86-a18c-4315-aa0c-95a00e816fbf')
def test_list_tsigkey(self):
LOG.info('Create a resource')
@@ -133,6 +145,14 @@
body = self.admin_client.list_tsigkeys()[1]
self.assertGreater(len(body['tsigkeys']), 0)
+ # Test RBAC
+ expected_allowed = ['os_admin']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin', 'os_system_reader']
+ self.check_list_IDs_RBAC_enforcement(
+ 'TsigkeyClient', 'list_tsigkeys', expected_allowed,
+ [tsigkey['id']])
+
@decorators.idempotent_id('d46e5e86-a18c-4315-aa0c-95a00e816fbf')
def test_list_tsigkeys_limit_results(self):
for i in range(3):
@@ -389,6 +409,15 @@
LOG.info('Ensure the fetched response matches the created tsigkey')
self.assertExpected(tsigkey, body, self.excluded_keys)
+ # Test RBAC
+ expected_allowed = ['os_admin']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin', 'os_system_reader']
+
+ self.check_list_show_RBAC_enforcement(
+ 'TsigkeyClient', 'show_tsigkey', expected_allowed, True,
+ tsigkey['id'])
+
@decorators.idempotent_id('d09dc0dd-dd72-41ee-9085-2afb2bf35459')
def test_update_tsigkey(self):
LOG.info('Create a resource')
@@ -413,6 +442,16 @@
self.assertEqual(tsigkey_data['name'], patch_tsigkey['name'])
self.assertEqual(tsigkey_data['secret'], patch_tsigkey['secret'])
+ # Test RBAC
+ expected_allowed = ['os_admin']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.append('os_system_admin')
+
+ self.check_CUD_RBAC_enforcement(
+ 'TsigkeyClient', 'update_tsigkey', expected_allowed, False,
+ tsigkey['id'], name=tsigkey_data['name'],
+ secret=tsigkey_data['secret'])
+
@decorators.idempotent_id('9cdffbd2-bc67-4a25-8eb7-4be8635c88a3')
def test_delete_tsigkey(self):
LOG.info('Create a resource')
@@ -424,6 +463,15 @@
LOG.info('Create a tsigkey')
tsigkey = self.admin_client.create_tsigkey(resource_id=zone['id'])[1]
+ # Test RBAC
+ expected_allowed = ['os_admin']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.append('os_system_admin')
+
+ self.check_CUD_RBAC_enforcement(
+ 'TsigkeyClient', 'delete_tsigkey', expected_allowed, False,
+ tsigkey['id'])
+
LOG.info('Delete the tsigkey')
self.admin_client.delete_tsigkey(tsigkey['id'])
diff --git a/designate_tempest_plugin/tests/api/v2/test_zone_tasks.py b/designate_tempest_plugin/tests/api/v2/test_zone_tasks.py
index db6541e..70a2e77 100644
--- a/designate_tempest_plugin/tests/api/v2/test_zone_tasks.py
+++ b/designate_tempest_plugin/tests/api/v2/test_zone_tasks.py
@@ -61,7 +61,8 @@
class ZoneTasks(BaseZonesTest):
- credentials = ["primary", "alt", "admin", "system_admin"]
+ credentials = ["primary", "alt", "admin", "system_admin", "system_reader",
+ "project_member", "project_reader"]
@classmethod
def setup_credentials(cls):
@@ -98,6 +99,17 @@
LOG.info('Check that the zone was created on Nameserver/BIND')
waiters.wait_for_query(self.query_client, pr_zone['name'], "SOA")
+ # Test RBAC
+ expected_allowed = ['os_admin']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.append('os_system_admin')
+
+ self.check_CUD_RBAC_enforcement(
+ 'ZonesClient', 'abandon_zone', expected_allowed, False,
+ pr_zone['id'],
+ headers={'x-auth-sudo-project-id': pr_zone['project_id']})
+
+ # Test abandoning the zone
LOG.info('Abandon a zone')
self.admin_client.abandon_zone(
pr_zone['id'],
diff --git a/designate_tempest_plugin/tests/api/v2/test_zones.py b/designate_tempest_plugin/tests/api/v2/test_zones.py
index aac3bda..3555c51 100644
--- a/designate_tempest_plugin/tests/api/v2/test_zones.py
+++ b/designate_tempest_plugin/tests/api/v2/test_zones.py
@@ -57,7 +57,6 @@
class ZonesTest(BaseZonesTest):
- credentials = ["admin", "system_admin", "primary"]
@classmethod
def setup_credentials(cls):
@@ -105,6 +104,24 @@
self.assertEqual(const.CREATE, zone['action'])
self.assertEqual(const.PENDING, zone['status'])
+ # Test with no extra header overrides (sudo-project-id)
+ expected_allowed = ['os_admin', 'os_primary', 'os_alt']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.append('os_system_admin')
+ expected_allowed.append('os_project_member')
+
+ self.check_CUD_RBAC_enforcement('ZonesClient', 'create_zone',
+ expected_allowed, False)
+
+ # Test with x-auth-sudo-project-id header
+ expected_allowed = ['os_admin']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.append('os_system_admin')
+
+ self.check_CUD_RBAC_enforcement(
+ 'ZonesClient', 'create_zone', expected_allowed, False,
+ project_id=self.client.project_id)
+
@decorators.idempotent_id('ec150c22-f52e-11eb-b09b-74e5f9e2a801')
def test_create_zone_validate_recordsets_created(self):
# Create a PRIMARY zone and wait till it's Active
@@ -144,6 +161,27 @@
LOG.info('Ensure the fetched response matches the created zone')
self.assertExpected(zone, body, self.excluded_keys)
+ # TODO(johnsom) Test reader roles once this bug is fixed.
+ # https://bugs.launchpad.net/tempest/+bug/1964509
+ # Test with no extra header overrides (all_projects, sudo-project-id)
+ expected_allowed = ['os_primary']
+
+ self.check_list_show_RBAC_enforcement(
+ 'ZonesClient', 'show_zone', expected_allowed, True, zone['id'])
+
+ # Test with x-auth-all-projects and x-auth-sudo-project-id header
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin']
+ else:
+ expected_allowed = ['os_admin']
+
+ self.check_list_show_RBAC_enforcement(
+ 'ZonesClient', 'show_zone', expected_allowed, False, zone['id'],
+ headers=self.all_projects_header)
+ self.check_list_show_RBAC_enforcement(
+ 'ZonesClient', 'show_zone', expected_allowed, False, zone['id'],
+ headers={'x-auth-sudo-project-id': self.client.project_id})
+
@decorators.idempotent_id('49268b24-92de-11eb-9d02-74e5f9e2a801')
def test_show_not_existing_zone(self):
LOG.info('Fetch non existing zone')
@@ -166,6 +204,26 @@
self.addCleanup(self.wait_zone_delete, self.client, zone['id'],
ignore_errors=lib_exc.NotFound)
+ # Test RBAC
+ expected_allowed = ['os_admin', 'os_primary']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.append('os_system_admin')
+
+ self.check_CUD_RBAC_enforcement('ZonesClient', 'delete_zone',
+ expected_allowed, True, zone['id'])
+
+ # Test RBAC with x-auth-all-projects and x-auth-sudo-project-id header
+ expected_allowed = ['os_admin', 'os_primary']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.append('os_system_admin')
+
+ self.check_CUD_RBAC_enforcement('ZonesClient', 'delete_zone',
+ expected_allowed, False, zone['id'],
+ headers=self.all_projects_header)
+ self.check_CUD_RBAC_enforcement(
+ 'ZonesClient', 'delete_zone', expected_allowed, False, zone['id'],
+ headers={'x-auth-sudo-project-id': self.client.project_id})
+
LOG.info('Delete the zone')
body = self.client.delete_zone(zone['id'])[1]
@@ -194,6 +252,39 @@
# present in the response.
self.assertGreater(len(body['zones']), 0)
+ # TODO(johnsom) Test reader role once this bug is fixed:
+ # https://bugs.launchpad.net/tempest/+bug/1964509
+ # Test RBAC - Users that are allowed to call list, but should get
+ # zero zones.
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_admin', 'os_project_member',
+ 'os_project_reader']
+ else:
+ expected_allowed = ['os_alt']
+
+ self.check_list_RBAC_enforcement_count(
+ 'ZonesClient', 'list_zones', expected_allowed, 0)
+
+ # Test that users who should see the zone, can see it.
+ expected_allowed = ['os_primary']
+
+ self.check_list_IDs_RBAC_enforcement(
+ 'ZonesClient', 'list_zones', expected_allowed, [zone['id']])
+
+ # Test RBAC with x-auth-all-projects and x-auth-sudo-project-id header
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin']
+ else:
+ expected_allowed = ['os_admin']
+
+ self.check_list_IDs_RBAC_enforcement(
+ 'ZonesClient', 'list_zones', expected_allowed, [zone['id']],
+ headers=self.all_projects_header)
+ self.check_list_IDs_RBAC_enforcement(
+ 'ZonesClient', 'list_zones', expected_allowed, [zone['id']],
+ headers={'x-auth-sudo-project-id': self.client.project_id})
+
@decorators.idempotent_id('123f51cb-19d5-48a9-aacc-476742c02141')
def test_update_zone(self):
LOG.info('Create a zone')
@@ -216,6 +307,29 @@
LOG.info('Ensure we respond with updated values')
self.assertEqual(description, zone['description'])
+ # Test RBAC
+ expected_allowed = ['os_admin', 'os_primary']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.append('os_system_admin')
+
+ self.check_CUD_RBAC_enforcement(
+ 'ZonesClient', 'update_zone', expected_allowed, True,
+ zone['id'], description=description)
+
+ # Test RBAC with x-auth-all-projects and x-auth-sudo-project-id header
+ expected_allowed = ['os_admin', 'os_primary']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.append('os_system_admin')
+
+ self.check_CUD_RBAC_enforcement(
+ 'ZonesClient', 'update_zone', expected_allowed, False,
+ zone['id'], description=description,
+ headers=self.all_projects_header)
+ self.check_CUD_RBAC_enforcement(
+ 'ZonesClient', 'update_zone', expected_allowed, False,
+ zone['id'], description=description,
+ headers={'x-auth-sudo-project-id': self.client.project_id})
+
@decorators.idempotent_id('3acddc86-62cc-4bfa-8589-b99e5d239bf2')
@decorators.skip_because(bug="1960487")
def test_serial_changes_on_update(self):
@@ -302,6 +416,29 @@
pool_nameservers, zone_nameservers,
'Failed - Pool and Zone nameservers should be the same')
+ # TODO(johnsom) Test reader role once this bug is fixed:
+ # https://bugs.launchpad.net/tempest/+bug/1964509
+ # Test RBAC
+ expected_allowed = ['os_primary']
+
+ self.check_list_show_RBAC_enforcement(
+ 'ZonesClient', 'show_zone_nameservers', expected_allowed,
+ True, zone['id'])
+
+ # Test with x-auth-all-projects and x-auth-sudo-project-id header
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin']
+ else:
+ expected_allowed = ['os_admin']
+
+ self.check_list_show_RBAC_enforcement(
+ 'ZonesClient', 'show_zone_nameservers', expected_allowed,
+ False, zone['id'], headers=self.all_projects_header)
+ self.check_list_show_RBAC_enforcement(
+ 'ZonesClient', 'show_zone_nameservers', expected_allowed,
+ False, zone['id'],
+ headers={'x-auth-sudo-project-id': self.client.project_id})
+
class ZonesAdminTest(BaseZonesTest):
credentials = ["primary", "admin", "system_admin", "alt"]
diff --git a/designate_tempest_plugin/tests/api/v2/test_zones_exports.py b/designate_tempest_plugin/tests/api/v2/test_zones_exports.py
index 4dab500..55bd38c 100644
--- a/designate_tempest_plugin/tests/api/v2/test_zones_exports.py
+++ b/designate_tempest_plugin/tests/api/v2/test_zones_exports.py
@@ -56,7 +56,8 @@
class ZonesExportTest(BaseZoneExportsTest):
- credentials = ["primary", "admin", "system_admin", "alt"]
+ credentials = ["primary", "admin", "system_admin", "system_reader", "alt",
+ "project_member", "project_reader"]
@classmethod
def setup_credentials(cls):
@@ -92,11 +93,21 @@
@decorators.idempotent_id('2dd8a9a0-98a2-4bf6-bb51-286583b30f40')
def test_create_zone_export(self):
- zone_export = self._create_zone_export('create_zone_export')[1]
+ zone, zone_export = self._create_zone_export('create_zone_export')
LOG.info('Ensure we respond with PENDING')
self.assertEqual(const.PENDING, zone_export['status'])
+ # Test RBAC
+ expected_allowed = ['os_admin', 'os_primary', 'os_alt']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.append('os_system_admin')
+ expected_allowed.append('os_project_member')
+
+ self.check_CUD_RBAC_enforcement(
+ 'ZoneExportsClient', 'create_zone_export', expected_allowed, True,
+ zone['id'])
+
@decorators.attr(type='smoke')
@decorators.idempotent_id('2d29a2a9-1941-4b7e-9d8a-ad6c2140ea68')
def test_show_zone_export(self):
@@ -108,6 +119,25 @@
LOG.info('Ensure the fetched response matches the zone export')
self.assertExpected(zone_export, body, self.excluded_keys)
+ # TODO(johnsom) Test reader role once this bug is fixed:
+ # https://bugs.launchpad.net/tempest/+bug/1964509
+ # Test RBAC
+ expected_allowed = ['os_primary']
+
+ self.check_list_show_RBAC_enforcement(
+ 'ZoneExportsClient', 'show_zone_export', expected_allowed, True,
+ zone_export['id'])
+
+ # Test RBAC with x-auth-all-projects header
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin']
+ else:
+ expected_allowed = ['os_admin']
+
+ self.check_list_show_RBAC_enforcement(
+ 'ZoneExportsClient', 'show_zone_export', expected_allowed, True,
+ zone_export['id'], headers=self.all_projects_header)
+
@decorators.idempotent_id('fb04507c-9600-11eb-b1cd-74e5f9e2a801')
def test_show_zone_export_impersonate_another_project(self):
LOG.info('Create a zone')
@@ -133,6 +163,17 @@
'for a primary client: {}'.format(
zone_export['id'], listed_export_ids))
+ # Test RBAC with x-auth-sudo-project-id header
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin']
+ else:
+ expected_allowed = ['os_admin']
+
+ self.check_list_show_RBAC_enforcement(
+ 'ZoneExportsClient', 'show_zone_export', expected_allowed, True,
+ zone_export['id'],
+ headers={'x-auth-sudo-project-id': self.client.project_id})
+
@decorators.idempotent_id('97234f00-8bcb-43f8-84dd-874f8bc4a80e')
def test_delete_zone_export(self):
LOG.info('Create a zone')
@@ -145,6 +186,29 @@
LOG.info('Create a zone export')
_, zone_export = self.client.create_zone_export(zone['id'])
+ # Test RBAC
+ expected_allowed = ['os_admin', 'os_primary']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.append('os_system_admin')
+
+ self.check_CUD_RBAC_enforcement(
+ 'ZoneExportsClient', 'delete_zone_export', expected_allowed, True,
+ zone_export['id'])
+
+ # Test RBAC with x-auth-all-projects and x-auth-sudo-project-id header
+ expected_allowed = ['os_admin', 'os_primary']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.append('os_system_admin')
+
+ self.check_CUD_RBAC_enforcement(
+ 'ZoneExportsClient', 'delete_zone_export', expected_allowed, False,
+ zone_export['id'], headers=self.all_projects_header)
+
+ self.check_CUD_RBAC_enforcement(
+ 'ZoneExportsClient', 'delete_zone_export', expected_allowed, False,
+ zone_export['id'],
+ headers={'x-auth-sudo-project-id': self.client.project_id})
+
LOG.info('Delete the zone export')
_, body = self.client.delete_zone_export(zone_export['id'])
@@ -155,13 +219,45 @@
@decorators.idempotent_id('476bfdfe-58c8-46e2-b376-8403c0fff440')
def test_list_zone_exports(self):
- self._create_zone_export('list_zone_exports')[1]
+ export = self._create_zone_export('list_zone_exports')[1]
LOG.info('List zone exports')
body = self.client.list_zone_exports()[1]
self.assertGreater(len(body['exports']), 0)
+ # TODO(johnsom) Test reader role once this bug is fixed:
+ # https://bugs.launchpad.net/tempest/+bug/1964509
+ # Test RBAC - Users that are allowed to call list, but should get
+ # zero zone exports.
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_admin', 'os_project_member',
+ 'os_project_reader']
+ else:
+ expected_allowed = ['os_alt']
+
+ self.check_list_RBAC_enforcement_count(
+ 'ZoneExportsClient', 'list_zone_exports', expected_allowed, 0)
+
+ # Test that users who should see the zone export, can see it.
+ expected_allowed = ['os_primary']
+
+ self.check_list_IDs_RBAC_enforcement(
+ 'ZoneExportsClient', 'list_zone_exports',
+ expected_allowed, [export['id']])
+
+ # Test RBAC with x-auth-sudo-project-id header
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin']
+ else:
+ expected_allowed = ['os_admin']
+
+ self.check_list_IDs_RBAC_enforcement(
+ 'ZoneExportsClient', 'list_zone_exports',
+ expected_allowed, [export['id']],
+ headers={'x-auth-sudo-project-id': self.client.project_id})
+
@decorators.idempotent_id('f34e7f34-9613-11eb-b1cd-74e5f9e2a801')
def test_list_zone_exports_all_projects(self):
LOG.info('Create a primary zone and its export')
@@ -199,6 +295,16 @@
'Failed, expected ID:{} was not found in '
'listed IDs:{}'.format(id, listed_exports_ids))
+ # Test RBAC with x-auth-all-projects
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin']
+ else:
+ expected_allowed = ['os_admin']
+
+ self.check_list_IDs_RBAC_enforcement(
+ 'ZoneExportsClient', 'list_zone_exports', expected_allowed,
+ [alt_export['id']], headers=self.all_projects_header)
+
@decorators.idempotent_id('e4a11a14-9aaa-11eb-be59-74e5f9e2a801')
def test_list_zone_exports_filter_results(self):
diff --git a/designate_tempest_plugin/tests/api/v2/test_zones_imports.py b/designate_tempest_plugin/tests/api/v2/test_zones_imports.py
index 09ff4af..34f2538 100644
--- a/designate_tempest_plugin/tests/api/v2/test_zones_imports.py
+++ b/designate_tempest_plugin/tests/api/v2/test_zones_imports.py
@@ -55,7 +55,8 @@
class ZonesImportTest(BaseZonesImportTest):
- credentials = ["primary", "admin", "system_admin", "alt"]
+ credentials = ["primary", "admin", "system_admin", "system_reader", "alt",
+ "project_member", "project_reader"]
@classmethod
def setup_credentials(cls):
@@ -97,6 +98,15 @@
waiters.wait_for_zone_import_status(
self.client, zone_import['id'], const.COMPLETE)
+ # Test with no extra header overrides (sudo-project-id)
+ expected_allowed = ['os_admin', 'os_primary', 'os_alt']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.append('os_system_admin')
+ expected_allowed.append('os_project_member')
+
+ self.check_CUD_RBAC_enforcement(
+ 'ZoneImportsClient', 'create_zone_import', expected_allowed, False)
+
@decorators.idempotent_id('31eaf25a-9532-11eb-a55d-74e5f9e2a801')
def test_create_zone_import_invalid_ttl(self):
LOG.info('Try to create a zone import using invalid TTL value')
@@ -139,6 +149,25 @@
LOG.info('Ensure the fetched response matches the expected one')
self.assertExpected(zone_import, body, self.excluded_keys)
+ # TODO(johnsom) Test reader roles once this bug is fixed.
+ # https://bugs.launchpad.net/tempest/+bug/1964509
+ # Test with no extra header overrides (all_projects, sudo-project-id)
+ expected_allowed = ['os_primary']
+
+ self.check_list_show_RBAC_enforcement(
+ 'ZoneImportsClient', 'show_zone_import', expected_allowed, True,
+ zone_import['id'])
+
+ # Test with x-auth-all-projects
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin']
+ else:
+ expected_allowed = ['os_admin']
+
+ self.check_list_show_RBAC_enforcement(
+ 'ZoneImportsClient', 'show_zone_import', expected_allowed, False,
+ zone_import['id'], headers=self.all_projects_header)
+
@decorators.idempotent_id('56a16e68-b241-4e41-bc5c-c40747fa68e3')
def test_delete_zone_import(self):
LOG.info('Create a zone import')
@@ -154,6 +183,28 @@
self.zone_client,
zone_import['zone_id'])
+ # Test RBAC
+ expected_allowed = ['os_admin', 'os_primary']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.append('os_system_admin')
+
+ self.check_CUD_RBAC_enforcement(
+ 'ZoneImportsClient', 'delete_zone_import', expected_allowed, True,
+ zone_import['id'])
+
+ # Test RBAC with x-auth-all-projects and x-auth-sudo-project-id header
+ expected_allowed = ['os_admin', 'os_primary']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.append('os_system_admin')
+
+ self.check_CUD_RBAC_enforcement(
+ 'ZoneImportsClient', 'delete_zone_import', expected_allowed, False,
+ zone_import['id'], headers=self.all_projects_header)
+ self.check_CUD_RBAC_enforcement(
+ 'ZoneImportsClient', 'delete_zone_import', expected_allowed, False,
+ zone_import['id'],
+ headers={'x-auth-sudo-project-id': self.client.project_id})
+
LOG.info('Delete the zone')
resp, body = self.client.delete_zone_import(zone_import['id'])
@@ -179,6 +230,38 @@
self.assertGreater(len(body['imports']), 0)
+ # TODO(johnsom) Test reader role once this bug is fixed:
+ # https://bugs.launchpad.net/tempest/+bug/1964509
+ # Test RBAC - Users that are allowed to call list, but should get
+ # zero zone imports.
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_admin', 'os_project_member',
+ 'os_project_reader']
+ else:
+ expected_allowed = ['os_alt']
+
+ self.check_list_RBAC_enforcement_count(
+ 'ZoneImportsClient', 'list_zone_imports', expected_allowed, 0)
+
+ # Test that users who should see the zone import can see it.
+ expected_allowed = ['os_primary']
+
+ self.check_list_IDs_RBAC_enforcement(
+ 'ZoneImportsClient', 'list_zone_imports', expected_allowed,
+ [zone_import['id']])
+
+ # Test RBAC with x-auth-sudo-project-id header
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin']
+ else:
+ expected_allowed = ['os_admin']
+
+ self.check_list_IDs_RBAC_enforcement(
+ 'ZoneImportsClient', 'list_zone_imports', expected_allowed,
+ [zone_import['id']],
+ headers={'x-auth-sudo-project-id': self.client.project_id})
+
@decorators.idempotent_id('2c1fa20e-9554-11eb-a55d-74e5f9e2a801')
def test_show_import_impersonate_another_project(self):
@@ -226,6 +309,17 @@
self.assertExpected(
zone_import, resp_body['imports'][0], self.excluded_keys)
+ # Test with x-auth-sudo-project-id header
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin']
+ else:
+ expected_allowed = ['os_admin']
+
+ self.check_list_show_RBAC_enforcement(
+ 'ZoneImportsClient', 'show_zone_import', expected_allowed, False,
+ zone_import['id'],
+ headers={'x-auth-sudo-project-id': self.client.project_id})
+
@decorators.idempotent_id('7bd06ec6-9556-11eb-a55d-74e5f9e2a801')
def test_list_import_zones_all_projects(self):
LOG.info('Create import zone "A" using primary client')
@@ -269,3 +363,13 @@
"Failed, expected import ID:{} wasn't found in "
"listed import IDs".format(
zone_import['id'], listed_zone_import_ids))
+
+ # Test RBAC with x-auth-all-projects
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin']
+ else:
+ expected_allowed = ['os_admin']
+
+ self.check_list_IDs_RBAC_enforcement(
+ 'ZoneImportsClient', 'list_zone_imports', expected_allowed,
+ [zone_import['id']], headers=self.all_projects_header)
diff --git a/designate_tempest_plugin/tests/base.py b/designate_tempest_plugin/tests/base.py
index e251ac8..091c3ca 100644
--- a/designate_tempest_plugin/tests/base.py
+++ b/designate_tempest_plugin/tests/base.py
@@ -11,13 +11,13 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
-import six
from tempest import test
from tempest import config
from tempest.lib.common.utils import test_utils as utils
from designate_tempest_plugin.services.dns.query.query_client import \
QueryClient
+from designate_tempest_plugin.tests import rbac_utils
CONF = config.CONF
@@ -55,7 +55,7 @@
return False
-class BaseDnsTest(test.BaseTestCase):
+class BaseDnsTest(rbac_utils.RBACTestsMixin, test.BaseTestCase):
"""Base class for DNS tests."""
# NOTE(andreaf) credentials holds a list of the credentials to be allocated
@@ -64,9 +64,23 @@
# rest the actual roles.
# NOTE(kiall) primary will result in a manager @ cls.os_primary, alt will
# have cls.os_alt, and admin will have cls.os_admin.
- # NOTE(kiall) We should default to only primary, and request additional
- # credentials in the tests that require them.
- credentials = ['primary']
+ # NOTE(johnsom) We will allocate most credentials here so that each test
+ # can test for allowed and disallowed RBAC policies.
+ credentials = ['admin', 'primary', 'alt']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ credentials.extend(['system_admin', 'system_reader',
+ 'project_member', 'project_reader'])
+
+ # A tuple of credentials that will be allocated by tempest using the
+ # 'credentials' list above. These are used to build RBAC test lists.
+ allocated_creds = []
+ for cred in credentials:
+ if isinstance(cred, list):
+ allocated_creds.append('os_roles_' + cred[0])
+ else:
+ allocated_creds.append('os_' + cred)
+ # Tests shall not mess with the list of allocated credentials
+ allocated_credentials = tuple(allocated_creds)
@classmethod
def skip_checks(cls):
@@ -91,7 +105,7 @@
)
def assertExpected(self, expected, actual, excluded_keys):
- for key, value in six.iteritems(expected):
+ for key, value in expected.items():
if key not in excluded_keys:
self.assertIn(key, actual)
self.assertEqual(value, actual[key], key)
diff --git a/designate_tempest_plugin/tests/rbac_utils.py b/designate_tempest_plugin/tests/rbac_utils.py
new file mode 100644
index 0000000..fa66410
--- /dev/null
+++ b/designate_tempest_plugin/tests/rbac_utils.py
@@ -0,0 +1,368 @@
+# Copyright 2021 Red Hat, Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import copy
+
+from oslo_log import log as logging
+from tempest import config
+from tempest.lib import exceptions
+from tempest import test
+
+CONF = config.CONF
+LOG = logging.getLogger(__name__)
+
+
+class RBACTestsMixin(test.BaseTestCase):
+
+ def _get_client_method(self, cred_obj, client_str, method_str):
+     """Return the ``method_str`` attribute of a new ``client_str`` client.
+
+     Instantiates ``client_str`` from the credential's ``dns_v2`` clients.
+     """
+     client_factory = getattr(cred_obj.dns_v2, client_str)
+     return getattr(client_factory(), method_str)
+
+ def _get_client_project_id(self, cred_obj, client_str):
+     """Return the ``project_id`` of a new ``client_str`` client.
+
+     Instantiates ``client_str`` from the credential's ``dns_v2`` clients.
+     """
+     return getattr(cred_obj.dns_v2, client_str)().project_id
+
+ def _check_allowed(self, client_str, method_str, allowed_list,
+                    with_project, *args, **kwargs):
+     """Verify that every expected credential may call an API method.
+
+     Each name in ``allowed_list`` is resolved as an attribute of this
+     test class and used to invoke ``client_str.method_str``. The two
+     system scoped credentials are skipped when they were not allocated
+     and ``CONF.enforce_scope.designate`` is false.
+
+     :param client_str: Name of the service client class, without the
+                        credential. Example: 'ZonesClient'
+     :param method_str: Name of the client method to invoke.
+                        Example: 'list_zones'
+     :param allowed_list: Names of the credential attributes that must
+                          be granted access.
+     :param with_project: When true, the credential's project ID is
+                          passed as the first positional argument.
+     :param args: Positional arguments forwarded to the method.
+     :param kwargs: Keyword arguments forwarded to the method.
+     :raises AssertionError: Raised if an expected credential is
+                             missing, or if its call ends in Forbidden
+                             or NotFound.
+     :returns: None on success
+     """
+     for cred in allowed_list:
+         try:
+             cred_obj = getattr(self, cred)
+         except AttributeError:
+             # TODO(johnsom) Remove once scoped tokens is the default.
+             is_system_cred = cred in ('os_system_admin',
+                                       'os_system_reader')
+             if is_system_cred and not CONF.enforce_scope.designate:
+                 LOG.info('Skipping %s allowed RBAC test because '
+                          'enforce_scope.designate is not True', cred)
+                 continue
+             self.fail('Credential {} "expected_allowed" for RBAC '
+                       'testing was not created by tempest '
+                       'credentials setup. This is likely a bug in the '
+                       'test.'.format(cred))
+         method = self._get_client_method(cred_obj, client_str, method_str)
+         project_id = self._get_client_project_id(cred_obj, client_str)
+         # Prepend the project ID only when the caller asked for it.
+         call_args = (project_id,) + args if with_project else args
+         try:
+             method(*call_args, **kwargs)
+         except (exceptions.Forbidden, exceptions.NotFound) as e:
+             # Forbidden and NotFound are both reported as a refusal of
+             # the credential under test.
+             self.fail('Method {}.{} failed to allow access via RBAC using '
+                       'credential {}. Error: {}'.format(
+                           client_str, method_str, cred, str(e)))
+
+ def _check_disallowed(self, client_str, method_str, allowed_list,
+                       expect_404, with_project, *args, **kwargs):
+     """Verify that every other allocated credential is refused.
+
+     Credentials in ``self.allocated_credentials`` but not in
+     ``allowed_list`` must be answered with Forbidden.
+
+     :param client_str: Name of the service client class, without the
+                        credential. Example: 'ZonesClient'
+     :param method_str: Name of the client method to invoke.
+                        Example: 'list_zones'
+     :param allowed_list: Credential attribute names expected to be
+                          allowed; these are not exercised here.
+     :param expect_404: When True, 404 responses are considered ok.
+     :param with_project: When true, pass the project ID to the call.
+     :param args: Any positional parameters needed by the method.
+     :param kwargs: Any named parameters needed by the method.
+     :raises AssertionError: Raised if a call is not denied.
+     :raises NotFound: Re-raised when ``expect_404`` is false.
+     :returns: None on success
+     """
+     denied_creds = (set(self.allocated_credentials) -
+                     set(allowed_list))
+     for cred in denied_creds:
+         cred_obj = getattr(self, cred)
+         method = self._get_client_method(cred_obj, client_str, method_str)
+         project_id = self._get_client_project_id(cred_obj, client_str)
+         # Prepend the project ID only when the caller asked for it.
+         call_args = (project_id,) + args if with_project else args
+
+         # tempest uses testtools assertRaises[1], so the unittest
+         # assertRaises context manager[2] with msg= is not available to
+         # give a useful error.
+         # testtools also does not work with subTest[3], so that cannot be
+         # used to expose the failing credential either.
+         # The exception raised by testtools assertRaises would therefore
+         # be less than useful, hence the explicit try block below.
+         # TODO(johnsom) Remove this try block once testtools is useful.
+         # [1] https://testtools.readthedocs.io/en/latest/
+         #     api.html#testtools.TestCase.assertRaises
+         # [2] https://docs.python.org/3/library/
+         #     unittest.html#unittest.TestCase.assertRaises
+         # [3] https://github.com/testing-cabal/testtools/issues/235
+         try:
+             method(*call_args, **kwargs)
+         except exceptions.Forbidden:
+             continue
+         except exceptions.NotFound:
+             # Some APIs hide that the resource exists by returning 404
+             # on permission denied.
+             if expect_404:
+                 continue
+             raise
+         else:
+             self.fail('Method {}.{} failed to deny access via RBAC using '
+                       'credential {}.'.format(client_str, method_str, cred))
+
+ def check_list_show_RBAC_enforcement(self, client_str, method_str,
+                                      expected_allowed, expect_404,
+                                      *args, **kwargs):
+     """Check RBAC for a list or show call made without a project ID.
+
+     Credentials outside ``expected_allowed`` are checked for denial
+     first, then the expected credentials are checked for access.
+
+     :param client_str: Name of the service client class, without the
+                        credential. Example: 'ZonesClient'
+     :param method_str: Name of the client method to invoke.
+                        Example: 'list_zones'
+     :param expected_allowed: Names of the credential attributes that
+                              must be granted access.
+     :param expect_404: When True, 404 responses are considered ok.
+     :param args: Any positional parameters needed by the method.
+     :param kwargs: Any named parameters needed by the method.
+     :raises AssertionError: Raised if the RBAC tests fail.
+     :returns: None on success
+     """
+     # Deep copy so the helpers never receive the caller's own list.
+     allowed = copy.deepcopy(expected_allowed)
+     with_project = False
+
+     # Credentials that must be refused.
+     self._check_disallowed(client_str, method_str, allowed, expect_404,
+                            with_project, *args, **kwargs)
+
+     # Credentials that must be granted access.
+     self._check_allowed(client_str, method_str, allowed, with_project,
+                         *args, **kwargs)
+
+ def check_list_show_with_ID_RBAC_enforcement(self, client_str, method_str,
+                                              expected_allowed, expect_404,
+                                              *args, **kwargs):
+     """Check RBAC for a list or show call that is passed a project ID.
+
+     Credentials outside ``expected_allowed`` are checked for denial
+     first, then the expected credentials are checked for access.
+
+     :param client_str: Name of the service client class, without the
+                        credential. Example: 'ZonesClient'
+     :param method_str: Name of the client method to invoke.
+                        Example: 'list_zones'
+     :param expected_allowed: Names of the credential attributes that
+                              must be granted access.
+     :param expect_404: When True, 404 responses are considered ok.
+     :param args: Any positional parameters needed by the method.
+     :param kwargs: Any named parameters needed by the method.
+     :raises AssertionError: Raised if the RBAC tests fail.
+     :returns: None on success
+     """
+     # Deep copy so the helpers never receive the caller's own list.
+     allowed = copy.deepcopy(expected_allowed)
+     with_project = True
+
+     # Credentials that must be refused.
+     self._check_disallowed(client_str, method_str, allowed, expect_404,
+                            with_project, *args, **kwargs)
+
+     # Credentials that must be granted access.
+     self._check_allowed(client_str, method_str, allowed, with_project,
+                         *args, **kwargs)
+
+ def check_CUD_RBAC_enforcement(self, client_str, method_str,
+                                expected_allowed, expect_404,
+                                *args, **kwargs):
+     """Check that a create/update/delete call is denied where required.
+
+     Only the denial side is exercised: credentials outside
+     ``expected_allowed`` are checked, the allowed ones are not invoked.
+
+     :param client_str: Name of the service client class, without the
+                        credential. Example: 'ZonesClient'
+     :param method_str: Name of the client method to invoke.
+                        Example: 'list_zones'
+     :param expected_allowed: Names of the credential attributes that
+                              are expected to be allowed.
+     :param expect_404: When True, 404 responses are considered ok.
+     :param args: Any positional parameters needed by the method.
+     :param kwargs: Any named parameters needed by the method.
+     :raises AssertionError: Raised if the RBAC tests fail.
+     :returns: None on success
+     """
+     # Deep copy so the helper never receives the caller's own list.
+     allowed = copy.deepcopy(expected_allowed)
+     with_project = False
+
+     # Credentials that must be refused.
+     self._check_disallowed(client_str, method_str, allowed, expect_404,
+                            with_project, *args, **kwargs)
+
+ def check_list_RBAC_enforcement_count(
+         self, client_str, method_str, expected_allowed, expected_count,
+         *args, **kwargs):
+     """Test an API list call RBAC enforcement result count.
+
+     List APIs will return the object list for the project associated
+     with the token used to access the API. This means most credentials
+     will have access, but will get differing results.
+
+     This test will query the list API using a list of credentials and
+     will validate that only the expected count of results is returned
+     for each of them.
+
+     :param client_str: The service client to use for the test, without the
+                        credential. Example: 'ZonesClient'
+     :param method_str: The method on the client to call for the test.
+                        Example: 'list_zones'
+     :param expected_allowed: The list of credential attribute names
+                              expected to be allowed.
+                              Example: ['os_primary'].
+     :param expected_count: The number of results expected in the list
+                            returned from the API.
+     :param args: Any positional parameters needed by the method.
+     :param kwargs: Any named parameters needed by the method.
+     :raises AssertionError: Raised if an expected credential is missing,
+                             is denied with Forbidden, or sees a number
+                             of objects other than ``expected_count``.
+     :returns: None on success
+     """
+
+     allowed_list = copy.deepcopy(expected_allowed)
+
+     for cred in allowed_list:
+         try:
+             cred_obj = getattr(self, cred)
+         except AttributeError:
+             # TODO(johnsom) Remove once scoped tokens is the default.
+             if ((cred == 'os_system_admin' or
+                  cred == 'os_system_reader') and
+                     not CONF.enforce_scope.designate):
+                 LOG.info('Skipping %s allowed RBAC test because '
+                          'enforce_scope.designate is not True', cred)
+                 continue
+             self.fail('Credential {} "expected_allowed" for RBAC '
+                       'testing was not created by tempest '
+                       'credentials setup. This is likely a bug in the '
+                       'test.'.format(cred))
+         method = self._get_client_method(cred_obj, client_str, method_str)
+         try:
+             # Get the result body
+             result = method(*args, **kwargs)[1]
+         except exceptions.Forbidden as e:
+             self.fail('Method {}.{} failed to allow access via RBAC using '
+                       'credential {}. Error: {}'.format(
+                           client_str, method_str, cred, str(e)))
+         # Remove the root element: the first value of the body is used.
+         result_objs = next(iter(result.values()))
+
+         # Report the same count that is compared, not len(result).
+         seen_count = len(result_objs)
+         self.assertEqual(expected_count, seen_count,
+                          message='Credential {} saw {} objects when {} '
+                          'was expected.'.format(cred, seen_count,
+                                                 expected_count))
+
+ def check_list_IDs_RBAC_enforcement(
+         self, client_str, method_str, expected_allowed, expected_ids,
+         *args, **kwargs):
+     """Test an API list call RBAC enforcement result contains IDs.
+
+     List APIs will return the object list for the project associated
+     with the token used to access the API. This means most credentials
+     will have access, but will get differing results.
+
+     This test will query the list API using a list of credentials and
+     will validate that the expected object IDs are included in the
+     results seen by each of them.
+
+     :param client_str: The service client to use for the test, without the
+                        credential. Example: 'ZonesClient'
+     :param method_str: The method on the client to call for the test.
+                        Example: 'list_zones'
+     :param expected_allowed: The list of credential attribute names
+                              expected to be allowed.
+                              Example: ['os_primary'].
+     :param expected_ids: The list of object IDs to validate are included
+                          in the returned list from the API.
+     :param args: Any positional parameters needed by the method.
+     :param kwargs: Any named parameters needed by the method.
+     :raises AssertionError: Raised if an expected credential is missing,
+                             is denied with Forbidden, or does not see
+                             every ID in ``expected_ids``.
+     :returns: None on success
+     """
+
+     allowed_list = copy.deepcopy(expected_allowed)
+
+     for cred in allowed_list:
+         try:
+             cred_obj = getattr(self, cred)
+         except AttributeError:
+             # TODO(johnsom) Remove once scoped tokens is the default.
+             if ((cred == 'os_system_admin' or
+                  cred == 'os_system_reader') and
+                     not CONF.enforce_scope.designate):
+                 LOG.info('Skipping %s allowed RBAC test because '
+                          'enforce_scope.designate is not True', cred)
+                 continue
+             self.fail('Credential {} "expected_allowed" for RBAC '
+                       'testing was not created by tempest '
+                       'credentials setup. This is likely a bug in the '
+                       'test.'.format(cred))
+         method = self._get_client_method(cred_obj, client_str, method_str)
+         try:
+             # Get the result body
+             result = method(*args, **kwargs)[1]
+         except exceptions.Forbidden as e:
+             self.fail('Method {}.{} failed to allow access via RBAC using '
+                       'credential {}. Error: {}'.format(
+                           client_str, method_str, cred, str(e)))
+         # Remove the root element: the first value of the body is used.
+         result_objs = next(iter(result.values()))
+         result_ids = {result_obj["id"] for result_obj in result_objs}
+         missing_ids = set(expected_ids) - result_ids
+         self.assertFalse(missing_ids, 'Credential {} did not see the '
+                          'expected IDs: {}'.format(cred, missing_ids))