Testing "Lists all recordsets owned by a project in Designate" API
1) test_list_all_recordsets_for_project
"x-auth-token" - Token used to identify the user from keystone
2) test_list_all_projects_recordsets.
"x-auth-all-projects" - If enabled this will show results from
all projects in Designate.
3) test_list_recordsets_impersonate_project
"x-auth-sudo-project-id" - This allows a user to impersonate
another project.
Change-Id: If7e7cd20ad50e94195ae12a75a2186ceb4b38425
diff --git a/designate_tempest_plugin/services/dns/json/base.py b/designate_tempest_plugin/services/dns/json/base.py
index d7bf614..c478654 100644
--- a/designate_tempest_plugin/services/dns/json/base.py
+++ b/designate_tempest_plugin/services/dns/json/base.py
@@ -152,16 +152,17 @@
return resp, self.deserialize(resp, body)
- def _list_request(self, resource, params=None):
+ def _list_request(self, resource, params=None, headers=None):
"""Gets a list of objects.
:param resource: The name of the REST resource, e.g., 'zones'.
:param params: A Python dict that represents the query paramaters to
include in the request URI.
+ :param headers (dict): The headers to use for the request.
:returns: Serialized object as a dictionary.
"""
uri = self.get_uri(resource, params=params)
- resp, body = self.get(uri)
+ resp, body = self.get(uri, headers=headers)
self.expected_success(self.LIST_STATUS_CODES, resp.status)
diff --git a/designate_tempest_plugin/services/dns/v2/json/recordset_client.py b/designate_tempest_plugin/services/dns/v2/json/recordset_client.py
index 1cf281c..19f58d3 100644
--- a/designate_tempest_plugin/services/dns/v2/json/recordset_client.py
+++ b/designate_tempest_plugin/services/dns/v2/json/recordset_client.py
@@ -150,3 +150,14 @@
"""
return self._list_request(
'recordsets', params=params)
+
+ @base.handle_errors
+ def list_owned_recordsets(self, params=None, headers=None):
+ """Lists recordsets for all projects in Designate.
+ :param params: A Python dict that represents the query paramaters to
+ include in the request URI.
+ :param headers: (dict): The headers to use for the request.
+ :return: Serialized recordset as a list.
+ """
+ return self._list_request(
+ 'recordsets', params=params, headers=headers)[1]['recordsets']
diff --git a/designate_tempest_plugin/tests/api/v2/test_recordset.py b/designate_tempest_plugin/tests/api/v2/test_recordset.py
index aadb823..53d148d 100644
--- a/designate_tempest_plugin/tests/api/v2/test_recordset.py
+++ b/designate_tempest_plugin/tests/api/v2/test_recordset.py
@@ -19,6 +19,7 @@
import ddt
from designate_tempest_plugin.tests import base
+from designate_tempest_plugin.common import waiters
from designate_tempest_plugin import data_utils
LOG = logging.getLogger(__name__)
@@ -407,7 +408,7 @@
class RecordsetOwnershipTest(BaseRecordsetsTest):
- credentials = ['primary', 'alt']
+ credentials = ['primary', 'alt', 'admin']
@classmethod
def setup_credentials(cls):
@@ -421,8 +422,49 @@
cls.client = cls.os_primary.recordset_client
cls.zone_client = cls.os_primary.zones_client
- cls.alt_zone_client = cls.os_alt.zones_client
cls.alt_client = cls.os_alt.recordset_client
+ cls.alt_zone_client = cls.os_alt.zones_client
+ cls.admin_client = cls.os_admin.recordset_client
+
+ def _create_client_recordset(self, clients_list=None):
+ """Create a zone and asoociated recordset using given credentials
+
+ :param clients_list: supported credentials are: 'primary' and 'alt'.
+ :return: dictionary of created recordsets.
+ """
+ recordsets_created = {}
+ for client in clients_list:
+ if client == 'primary':
+ zone = self.zone_client.create_zone()[1]
+ self.addCleanup(self.wait_zone_delete,
+ self.zone_client,
+ zone['id'])
+ recordset_data = data_utils.rand_recordset_data(
+ record_type='A', zone_name=zone['name'])
+ resp, body = self.client.create_recordset(
+ zone['id'], recordset_data)
+ self.assertEqual('PENDING', body['status'],
+ 'Failed, expected status is PENDING')
+ waiters.wait_for_zone_status(
+ self.zone_client, zone['id'], 'ACTIVE')
+ recordset_data['project_id'] = zone['project_id']
+ recordsets_created['primary'] = recordset_data
+ if client == 'alt':
+ alt_zone = self.alt_zone_client.create_zone()[1]
+ self.addCleanup(self.wait_zone_delete,
+ self.alt_zone_client,
+ alt_zone['id'])
+ recordset_data = data_utils.rand_recordset_data(
+ record_type='A', zone_name=alt_zone['name'])
+ resp, body = self.alt_client.create_recordset(
+ alt_zone['id'], recordset_data)
+ self.assertEqual('PENDING', body['status'],
+ 'Failed, expected status is PENDING')
+ waiters.wait_for_zone_status(
+ self.alt_zone_client, alt_zone['id'], 'ACTIVE')
+ recordset_data['project_id'] = alt_zone['project_id']
+ recordsets_created['alt'] = recordset_data
+ return recordsets_created
@decorators.idempotent_id('9c0f58ad-1b31-4899-b184-5380720604e5')
def test_no_create_recordset_by_alt_tenant(self):
@@ -483,3 +525,56 @@
recordset_data
)
)
+
+ @decorators.idempotent_id('4d0ff972-7c19-11eb-b331-74e5f9e2a801')
+ def test_list_all_recordsets_for_project(self):
+ # Create recordsets using "primary" and "alt" credentials.
+ # Execute "list_owned_recordsets" API to list "primary" recordsets.
+ # Validate that the only "project_id" retrieved within the API is
+ # a "primary" project.
+ primary_project_id = self._create_client_recordset(
+ ['primary', 'alt'])['primary']['project_id']
+ recordsets = self.client.list_owned_recordsets()
+ LOG.info('Received by API recordsets are {} '.format(recordsets))
+ project_ids_api = set([item['project_id'] for item in recordsets])
+ self.assertEqual(
+ {primary_project_id}, project_ids_api,
+ 'Failed, unique project_ids {} are not as expected {}'.format(
+ project_ids_api, primary_project_id))
+
+ @decorators.idempotent_id('bc0af248-7b4f-11eb-98a5-74e5f9e2a801')
+ def test_list_all_projects_recordsets(self):
+ # Create recordsets using "primary" and "alt" credentials.
+ # Execute "list_owned_recordsets" API using admin client to list
+ # recordsets for all projects.
+ # Validate that project_ids of: "primary" and "alt" projects
+ # are both listed in received API response.
+ project_ids_used = [
+ item['project_id'] for item in self._create_client_recordset(
+ ['primary', 'alt']).values()]
+ recordsets = self.admin_client.list_owned_recordsets(
+ headers={'x-auth-all-projects': True})
+ LOG.info('Received by API recordsets are {} '.format(recordsets))
+ project_ids_api = set([item['project_id'] for item in recordsets])
+ for prj_id in project_ids_used:
+ self.assertIn(
+ prj_id, project_ids_api,
+ 'Failed, project_id:{} is missing in received recordsets'
+ ' for all projects {} '.format(prj_id, project_ids_api))
+
+ @decorators.idempotent_id('910eb17e-7c3a-11eb-a40b-74e5f9e2a801')
+ def test_list_recordsets_impersonate_project(self):
+ # Create recordsets using "primary" and "alt" credentials.
+ # Use admin client to impersonate "primary" project.
+ # Validate that received recordsets are all associated with
+ # expected("primary") project only.
+ primary_project_id = self._create_client_recordset(
+ ['primary', 'alt'])['primary']['project_id']
+ recordsets = self.admin_client.list_owned_recordsets(
+ headers={'x-auth-sudo-project-id': primary_project_id})
+ LOG.info('Received by API recordsets are {} '.format(recordsets))
+ project_ids_api = set([item['project_id'] for item in recordsets])
+ self.assertEqual(
+ {primary_project_id}, project_ids_api,
+ 'Failed, unique project_ids {} are not as expected {}'.format(
+ project_ids_api, primary_project_id))