Merge remote-tracking branch 'origin/master' into myoga
Change-Id: Ic2337bdac4086d39ca5d1725d8e8b70cd9ac0e1f
diff --git a/.zuul.yaml b/.zuul.yaml
index cf8323b..d40d786 100644
--- a/.zuul.yaml
+++ b/.zuul.yaml
@@ -1,4 +1,9 @@
- job:
+ name: designate-bind9-stable-yoga
+ parent: designate-bind9
+ override-checkout: stable/yoga
+
+- job:
name: designate-bind9-stable-xena
parent: designate-bind9
override-checkout: stable/xena
@@ -8,16 +13,6 @@
parent: designate-bind9
override-checkout: stable/wallaby
-- job:
- name: designate-bind9-stable-victoria
- parent: designate-bind9
- override-checkout: stable/victoria
-
-- job:
- name: designate-bind9-stable-ussuri
- parent: designate-bind9
- override-checkout: stable/ussuri
-
- project:
templates:
- designate-devstack-jobs
@@ -27,10 +22,8 @@
- release-notes-jobs-python3
check:
jobs:
+ - designate-bind9-stable-yoga
- designate-bind9-stable-xena
- designate-bind9-stable-wallaby
- - designate-bind9-stable-victoria
- - designate-bind9-stable-ussuri
- neutron-tempest-plugin-designate-scenario
- gate:
- queue: designate
+ queue: designate
diff --git a/designate_tempest_plugin/common/constants.py b/designate_tempest_plugin/common/constants.py
index 4f6c72b..57aeab9 100644
--- a/designate_tempest_plugin/common/constants.py
+++ b/designate_tempest_plugin/common/constants.py
@@ -12,15 +12,31 @@
# License for the specific language governing permissions and limitations
# under the License.
-# Designate statuses strings
+# Designate statuses/actions strings
PENDING = 'PENDING'
COMPLETE = 'COMPLETE'
ERROR = 'ERROR'
DELETED = 'DELETED'
+DELETE = 'DELETE'
ACTIVE = 'ACTIVE'
+INACTIVE = 'INACTIVE'
UP = 'UP'
CREATE = 'CREATE'
+UPDATE = 'UPDATE'
+NONE = 'NONE'
# Zone types
PRIMARY_ZONE_TYPE = 'PRIMARY'
SECONDARY_ZONE_TYPE = 'SECONDARY'
+
+# DNS Record types
+A = 'A'
+AAAA = 'AAAA'
+ALIAS = 'ALIAS'
+CNAME = 'CNAME'
+MX = 'MX'
+NS = 'NS'
+PTR = 'PTR'
+SOA = 'SOA'
+SRV = 'SRV'
+TXT = 'TXT'
diff --git a/designate_tempest_plugin/common/waiters.py b/designate_tempest_plugin/common/waiters.py
index a606fda..affe332 100644
--- a/designate_tempest_plugin/common/waiters.py
+++ b/designate_tempest_plugin/common/waiters.py
@@ -33,7 +33,7 @@
time.sleep(client.build_interval)
try:
- _, zone = client.show_zone(zone_id)
+ zone = client.show_zone(zone_id)[1]
except lib_exc.NotFound:
LOG.info('Zone %s is 404ing', zone_id)
return
@@ -62,12 +62,12 @@
"""Waits for a zone to reach given status."""
LOG.info('Waiting for zone %s to reach %s', zone_id, status)
- _, zone = client.show_zone(zone_id, headers=headers)
+ zone = client.show_zone(zone_id, headers=headers)[1]
start = int(time.time())
while zone['status'] != status:
time.sleep(client.build_interval)
- _, zone = client.show_zone(zone_id, headers=headers)
+ zone = client.show_zone(zone_id, headers=headers)[1]
status_curr = zone['status']
if status_curr == status:
LOG.info('Zone %s reached %s', zone_id, status)
@@ -98,12 +98,12 @@
"""Waits for an imported zone to reach the given status."""
LOG.info('Waiting for zone import %s to reach %s', zone_import_id, status)
- _, zone_import = client.show_zone_import(zone_import_id)
+ zone_import = client.show_zone_import(zone_import_id)[1]
start = int(time.time())
while zone_import['status'] != status:
time.sleep(client.build_interval)
- _, zone_import = client.show_zone_import(zone_import_id)
+ zone_import = client.show_zone_import(zone_import_id)[1]
status_curr = zone_import['status']
if status_curr == status:
LOG.info('Zone import %s reached %s', zone_import_id, status)
@@ -131,16 +131,17 @@
raise lib_exc.TimeoutException(message)
-def wait_for_zone_export_status(client, zone_export_id, status):
+def wait_for_zone_export_status(client, zone_export_id, status, headers=None):
"""Waits for an exported zone to reach the given status."""
LOG.info('Waiting for zone export %s to reach %s', zone_export_id, status)
- _, zone_export = client.show_zone_export(zone_export_id)
+ zone_export = client.show_zone_export(zone_export_id, headers=headers)[1]
start = int(time.time())
while zone_export['status'] != status:
time.sleep(client.build_interval)
- _, zone_export = client.show_zone_export(zone_export_id)
+ zone_export = client.show_zone_export(
+ zone_export_id, headers=headers)[1]
status_curr = zone_export['status']
if status_curr == status:
LOG.info('Zone export %s reached %s', zone_export_id, status)
@@ -168,17 +169,20 @@
raise lib_exc.TimeoutException(message)
-def wait_for_recordset_status(client, zone_id, recordset_id, status):
+def wait_for_recordset_status(
+ client, zone_id, recordset_id, status, headers=None):
"""Waits for a recordset to reach the given status."""
LOG.info('Waiting for recordset %s to reach %s',
recordset_id, status)
- _, recordset = client.show_recordset(zone_id, recordset_id)
+ recordset = client.show_recordset(
+ zone_id, recordset_id, headers=headers)[1]
start = int(time.time())
while recordset['status'] != status:
time.sleep(client.build_interval)
- _, recordset = client.show_recordset(zone_id, recordset_id)
+ recordset = client.show_recordset(
+ zone_id, recordset_id, headers=headers)[1]
status_curr = recordset['status']
if status_curr == status:
LOG.info('Recordset %s reached %s', recordset_id, status)
diff --git a/designate_tempest_plugin/config.py b/designate_tempest_plugin/config.py
index 99c9a04..f4ca708 100644
--- a/designate_tempest_plugin/config.py
+++ b/designate_tempest_plugin/config.py
@@ -41,7 +41,7 @@
default=360,
help="Timeout in seconds to wait for an resource to build."),
cfg.IntOpt('min_ttl',
- default=1,
+ default=0,
help="The minimum value to respect when generating ttls"),
cfg.ListOpt('nameservers',
default=[],
@@ -51,8 +51,10 @@
help="The timeout on a single dns query to a nameserver"),
cfg.StrOpt('zone_id',
help="The target zone to test the dns recordsets "
- "If it is not specified, a new zone will be created ")
-
+ "If it is not specified, a new zone will be created "),
+ cfg.StrOpt('tld_suffix',
+ default='test',
+ help="TLD suffix that used in all tests (if not overridden).")
]
dns_feature_group = cfg.OptGroup(name='dns_feature_enabled',
@@ -79,6 +81,10 @@
default=True,
help="Is https://bugs.launchpad.net/designate/+bug/1573141 "
"fixed"),
+ cfg.BoolOpt('bug_1932026_fixed',
+ default=False,
+ help="Is https://bugs.launchpad.net/designate/+bug/1932026 "
+ "fixed"),
# Note: Also see the enforce_scope section (from tempest) for Designate API
# scope checking setting.
cfg.BoolOpt('enforce_new_defaults',
diff --git a/designate_tempest_plugin/data_utils.py b/designate_tempest_plugin/data_utils.py
index 4b5d24d..d148685 100644
--- a/designate_tempest_plugin/data_utils.py
+++ b/designate_tempest_plugin/data_utils.py
@@ -35,7 +35,7 @@
return an.format(netaddr.ipv6_compact)
-def rand_zone_name(name='', prefix='rand', suffix='.com.'):
+def rand_zone_name(name='', prefix='rand', suffix=None):
"""Generate a random zone name
:param str name: The name that you want to include
:param prefix: the exact text to start the string. Defaults to "rand"
@@ -43,6 +43,8 @@
:return: a random zone name e.g. example.org.
:rtype: string
"""
+ if suffix is None:
+ suffix = '.{}.'.format(CONF.dns.tld_suffix)
name = data_utils.rand_name(name=name, prefix=prefix)
return name + suffix
@@ -56,7 +58,7 @@
return 'example@%s' % domain.rstrip('.')
-def rand_ttl(start=1, end=86400):
+def rand_ttl(start=0, end=86400):
"""Generate a random TTL value
:return: a random ttl e.g. 165
:rtype: string
@@ -67,7 +69,6 @@
def rand_zonefile_data(name=None, ttl=None):
"""Generate random zone data, with optional overrides
-
:return: A ZoneModel
"""
zone_base = ('$ORIGIN &\n& # IN SOA ns.& nsadmin.& # # # # #\n'
@@ -105,11 +106,11 @@
def rand_zone_data(name=None, email=None, ttl=None, description=None):
"""Generate random zone data, with optional overrides
-
:return: A ZoneModel
"""
if name is None:
- name = rand_zone_name(prefix='testdomain', suffix='.com.')
+ name = rand_zone_name(
+ prefix='testdomain', suffix='.{}.'.format(CONF.dns.tld_suffix))
if email is None:
email = ("admin@" + name).strip('.')
if description is None:
@@ -124,15 +125,16 @@
def rand_recordset_data(record_type, zone_name, name=None, records=None,
- ttl=None):
+ ttl=None, number_of_records=None):
"""Generate random recordset data, with optional overrides
-
:return: A RecordsetModel
"""
if name is None:
name = rand_zone_name(prefix=record_type, suffix='.' + zone_name)
if records is None:
records = [rand_ip()]
+ if number_of_records:
+ records = [rand_ip() for r in range(number_of_records)]
if ttl is None:
ttl = rand_ttl()
return {
@@ -202,7 +204,7 @@
def wildcard_ns_recordset(zone_name):
name = "*.{0}".format(zone_name)
- records = ["ns.example.com."]
+ records = ["ns.example.{}.".format(CONF.dns.tld_suffix)]
return rand_recordset_data('NS', zone_name, name, records)
@@ -216,6 +218,19 @@
return ns_records
+def rand_soa_records(number_of_records=2):
+ return ['{} {} {} {} {} {}.'.format(
+ '{}.{}.{}'.format(rand_string(3), rand_string(7), rand_string(3)),
+ random.randint(1000000000, 2020080302), random.randint(3000, 7200),
+ random.randint(1000, 3600), random.randint(1000000, 1209600),
+ random.randint(1000, 3600)) for i in range(0, number_of_records)]
+
+
+def rand_soa_recordset(zone_name, **kwargs):
+ return rand_recordset_data(
+ 'SOA', zone_name, records=rand_soa_records(), **kwargs)
+
+
def rand_tld():
data = {
"name": rand_zone_name(prefix='tld', suffix='')
@@ -225,7 +240,6 @@
def rand_transfer_request_data(description=None, target_project_id=None):
"""Generate random transfer request data, with optional overrides
-
:return: A TransferRequest data
"""
@@ -255,7 +269,6 @@
"""Create a rand recordset by type
This essentially just dispatches to the relevant random recordset
creation functions.
-
:param str zone_name: The zone name the recordset applies to
:param str record_type: The type of recordset (ie A, MX, NS, etc...)
"""
@@ -266,7 +279,6 @@
def rand_string(size):
"""Create random string of ASCII chars by size
-
:param int size - length os the string to be create
:return - random creates string of ASCII lover characters
"""
@@ -275,7 +287,6 @@
def rand_domain_name(tld=None):
"""Create random valid domain name
-
:param tld (optional) - TLD that will be used to random domain name
:return - valid domain name, for example: paka.zbabun.iuh
"""
diff --git a/designate_tempest_plugin/services/dns/json/base.py b/designate_tempest_plugin/services/dns/json/base.py
index d484ac9..6ac9a85 100644
--- a/designate_tempest_plugin/services/dns/json/base.py
+++ b/designate_tempest_plugin/services/dns/json/base.py
@@ -17,8 +17,7 @@
from oslo_serialization import jsonutils as json
from tempest.lib.common import rest_client
from tempest.lib import exceptions as lib_exc
-from six.moves.urllib import parse as urllib
-import six
+from urllib import parse as urllib_parse
from designate_tempest_plugin.common import models
@@ -58,7 +57,7 @@
DELETE_STATUS_CODES = []
def serialize(self, data):
- if isinstance(data, six.string_types):
+ if isinstance(data, str):
return data
return json.dumps(data)
@@ -106,7 +105,7 @@
else:
uuid = '/%s' % uuid if uuid else ''
- params = '?%s' % urllib.urlencode(params) if params else ''
+ params = '?%s' % urllib_parse.urlencode(params) if params else ''
return uri_pattern.format(pref=self.uri_prefix,
res=resource_name,
@@ -191,20 +190,29 @@
return resp, self.deserialize(resp, body)
- def _put_request(self, resource, uuid, data, params=None):
+ def _put_request(self, resource, uuid, data, params=None,
+ headers=None, extra_headers=False):
"""Updates the specified object using PUT request.
:param resource: The name of the REST resource, e.g., 'zones'.
:param uuid: Unique identifier of the object in UUID format.
:param data: A Python dict that represents an object of the
specified type (to be serialized) or a plain string which
is sent as-is.
+ :param headers (dict): The headers to use for the request.
:param params: A Python dict that represents the query paramaters to
include in the request URI.
+ :param headers (dict): The headers to use for the request.
+ :param extra_headers (bool): Boolean value that indicates if the
+ headers returned by the get_headers()
+ method are to be used but additional
+ headers are needed in the request
+ pass them in as a dict.
:returns: Serialized object as a dictionary.
"""
body = self.serialize(data)
uri = self.get_uri(resource, uuid=uuid, params=params)
- resp, body = self.put(uri, body=body)
+ resp, body = self.put(
+ uri, body=body, headers=headers, extra_headers=extra_headers)
self.expected_success(self.PUT_STATUS_CODES, resp.status)
diff --git a/designate_tempest_plugin/services/dns/query/query_client.py b/designate_tempest_plugin/services/dns/query/query_client.py
index da1d1b0..ce9c7c1 100644
--- a/designate_tempest_plugin/services/dns/query/query_client.py
+++ b/designate_tempest_plugin/services/dns/query/query_client.py
@@ -14,7 +14,6 @@
import dns
import dns.exception
import dns.query
-import six
from tempest import config
CONF = config.CONF
@@ -51,7 +50,7 @@
@classmethod
def _prepare_query(cls, zone_name, rdatatype):
# support plain strings: "SOA", "A"
- if isinstance(rdatatype, six.string_types):
+ if isinstance(rdatatype, str):
rdatatype = dns.rdatatype.from_text(rdatatype)
dns_message = dns.message.make_query(zone_name, rdatatype)
dns_message.set_opcode(dns.opcode.QUERY)
diff --git a/designate_tempest_plugin/services/dns/v2/json/quotas_client.py b/designate_tempest_plugin/services/dns/v2/json/quotas_client.py
index 97398c0..1b1d005 100644
--- a/designate_tempest_plugin/services/dns/v2/json/quotas_client.py
+++ b/designate_tempest_plugin/services/dns/v2/json/quotas_client.py
@@ -58,17 +58,29 @@
return resp, body
@base.handle_errors
- def show_quotas(self, project_id, params=None, headers=None):
+ def show_quotas(self, project_id=None, params=None, headers=None):
"""Gets a specific quota.
- :param project_id: Show the quotas of this project id
+ :param project_id: if provided - show the quotas of this project id.
+ See:
+ https://docs.openstack.org/api-ref/dns/#view-quotas
+ If not provided - show the quotas of the current
+ project.
+ See:
+ https://docs.openstack.org/api-ref/dns/
+ #view-current-project-s-quotas
+
:param params: A Python dict that represents the query paramaters to
include in the request URI.
:param headers (dict): The headers to use for the request.
:return: Serialized quota as a dictionary.
"""
- return self._show_request('quotas', project_id, params=params,
- headers=headers, extra_headers=True)
+ if project_id is None:
+ return self._show_request(
+ 'quotas', uuid=None, params=params, headers=headers)
+ else:
+ return self._show_request(
+ 'quotas', project_id, params=params, headers=headers)
@base.handle_errors
def delete_quotas(self, project_id, params=None, headers=None):
@@ -100,13 +112,10 @@
:param headers (dict): The headers to use for the request.
:return: Serialized quota as a dictionary.
"""
- if headers is None:
- headers = {'content-type': 'application/json'}
- if 'content-type' not in [header.lower() for header in headers]:
- headers['content-type'] = 'application/json'
resp, body = self._update_request(
"quotas", project_id,
- data=quotas, params=params, headers=headers)
+ data=quotas, params=params, headers=headers,
+ extra_headers=True)
self.expected_success(200, resp.status)
return resp, body
diff --git a/designate_tempest_plugin/services/dns/v2/json/recordset_client.py b/designate_tempest_plugin/services/dns/v2/json/recordset_client.py
index 33e9ee2..4440705 100644
--- a/designate_tempest_plugin/services/dns/v2/json/recordset_client.py
+++ b/designate_tempest_plugin/services/dns/v2/json/recordset_client.py
@@ -35,31 +35,30 @@
@base.handle_errors
def create_recordset(self, zone_uuid, recordset_data,
- params=None, wait_until=False):
+ params=None, wait_until=False, headers=None):
"""Create a recordset for the specified zone.
-
:param zone_uuid: Unique identifier of the zone in UUID format..
:param recordset_data: A dictionary that represents the recordset
data.
:param params: A Python dict that represents the query paramaters to
include in the request URI.
+ :param headers (dict): The headers to use for the request.
:return: A tuple with the server response and the created zone.
"""
resp, body = self._create_request(
"/zones/{0}/recordsets".format(zone_uuid), params=params,
- data=recordset_data)
-
+ data=recordset_data, headers=headers)
# Create Recordset should Return a HTTP 202
self.expected_success(202, resp.status)
-
if wait_until:
- waiters.wait_for_recordset_status(self, body['id'], wait_until)
-
+ waiters.wait_for_recordset_status(
+ self, zone_uuid, body['id'], wait_until, headers=headers)
return resp, body
@base.handle_errors
def update_recordset(self, zone_uuid, recordset_uuid,
- recordet_data, params=None):
+ recordet_data, params=None,
+ headers=None, extra_headers=None):
"""Update the recordset related to the specified zone.
:param zone_uuid: Unique identifier of the zone in UUID format.
:param recordset_uuid: Unique identifier of the recordset in UUID
@@ -68,11 +67,18 @@
data.
:param params: A Python dict that represents the query paramaters to
include in the request URI.
+ :param headers (dict): The headers to use for the request.
+ :param extra_headers (bool): Boolean value that indicates if the
+ headers returned by the get_headers()
+ method are to be used but additional
+ headers are needed in the request
+ pass them in as a dict.
:return: A tuple with the server response and the created zone.
"""
resp, body = self._put_request(
'zones/{0}/recordsets'.format(zone_uuid), recordset_uuid,
- data=recordet_data, params=params)
+ data=recordet_data, params=params,
+ headers=headers, extra_headers=extra_headers)
# Update Recordset should Return a HTTP 202, or a 200 if the recordset
# is already active
@@ -97,17 +103,20 @@
params=params, headers=headers)
@base.handle_errors
- def delete_recordset(self, zone_uuid, recordset_uuid, params=None):
+ def delete_recordset(self, zone_uuid, recordset_uuid, params=None,
+ headers=None):
"""Deletes a recordset related to the specified zone UUID.
:param zone_uuid: The unique identifier of the zone.
:param recordset_uuid: The unique identifier of the record in
uuid format.
:param params: A Python dict that represents the query paramaters to
include in the request URI.
+ :param headers (dict): The headers to use for the request.
:return: A tuple with the server response and the response body.
"""
resp, body = self._delete_request(
- 'zones/{0}/recordsets'.format(zone_uuid), recordset_uuid)
+ 'zones/{0}/recordsets'.format(zone_uuid), recordset_uuid,
+ params=params, headers=headers)
# Delete Recordset should Return a HTTP 202
self.expected_success(202, resp.status)
diff --git a/designate_tempest_plugin/services/dns/v2/json/service_client.py b/designate_tempest_plugin/services/dns/v2/json/service_client.py
index 267e7a5..87da51a 100644
--- a/designate_tempest_plugin/services/dns/v2/json/service_client.py
+++ b/designate_tempest_plugin/services/dns/v2/json/service_client.py
@@ -25,3 +25,14 @@
"""
return self._list_request(
'service_statuses', headers=headers)[1]['service_statuses']
+
+ @base.handle_errors
+ def show_statuses(self, uuid, headers=None):
+ """Show Service status
+
+ :param headers: (dict): The headers to use for the request.
+ :param uuid: service ID
+ :return: Service status dictionary
+ """
+ return self._show_request(
+ 'service_statuses', uuid, headers=headers)[1]
diff --git a/designate_tempest_plugin/services/dns/v2/json/transfer_accepts_client.py b/designate_tempest_plugin/services/dns/v2/json/transfer_accepts_client.py
index e09f775..9ee060d 100644
--- a/designate_tempest_plugin/services/dns/v2/json/transfer_accepts_client.py
+++ b/designate_tempest_plugin/services/dns/v2/json/transfer_accepts_client.py
@@ -19,20 +19,25 @@
@base.handle_errors
def create_transfer_accept(self, transfer_accept_data,
- params=None, headers=None):
+ params=None, headers=None, extra_headers=None):
"""Create a zone transfer_accept.
:param transfer_accept_data: A python dictionary representing
data for the zone transfer accept.
:param params: A Python dict that represents the query paramaters to
include in the accept URI.
:param headers (dict): The headers to use for the request.
+ :param extra_headers (bool): Boolean value that indicates if the
+ headers returned by the get_headers()
+ method are to be used but additional
+ headers are needed in the request
+ pass them in as a dict.
:return: Serialized accepted zone transfer as a dictionary.
"""
transfer_accept_uri = 'zones/tasks/transfer_accepts'
resp, body = self._create_request(
transfer_accept_uri, transfer_accept_data,
- params=params, headers=headers)
+ params=params, headers=headers, extra_headers=extra_headers)
# Create Transfer accept should Return a HTTP 201
self.expected_success(201, resp.status)
@@ -61,5 +66,4 @@
:return: List of accepted zone transfers
"""
return self._list_request(
- 'zones/tasks/transfer_accepts', params=params,
- headers=headers)[1]['transfer_accepts']
+ 'zones/tasks/transfer_accepts', params=params, headers=headers)
diff --git a/designate_tempest_plugin/services/dns/v2/json/transfer_request_client.py b/designate_tempest_plugin/services/dns/v2/json/transfer_request_client.py
index 9523175..e2d35e2 100644
--- a/designate_tempest_plugin/services/dns/v2/json/transfer_request_client.py
+++ b/designate_tempest_plugin/services/dns/v2/json/transfer_request_client.py
@@ -103,7 +103,7 @@
@base.handle_errors
def update_transfer_request(self, uuid, transfer_request_data=None,
- params=None):
+ params=None, headers=None):
"""Update a zone transfer_requests.
:param uuid: Unique identifier of the zone transfer request in UUID
format.
@@ -111,13 +111,15 @@
data for zone transfer request
:param params: A Python dict that represents the query paramaters to
include in the request URI.
+ :param headers (dict): The headers to use for the request.
:return: Serialized imported zone as a dictionary.
"""
transfer_request_uri = 'zones/tasks/transfer_requests'
transfer_request_data = (transfer_request_data or
dns_data_utils.rand_transfer_request_data())
resp, body = self._update_request(
- transfer_request_uri, uuid, transfer_request_data, params=params)
+ transfer_request_uri, uuid, transfer_request_data, params=params,
+ headers=headers)
# Create Transfer request should Return a HTTP 200
self.expected_success(200, resp.status)
diff --git a/designate_tempest_plugin/services/dns/v2/json/tsigkey_client.py b/designate_tempest_plugin/services/dns/v2/json/tsigkey_client.py
index 683c1bb..61c632a 100644
--- a/designate_tempest_plugin/services/dns/v2/json/tsigkey_client.py
+++ b/designate_tempest_plugin/services/dns/v2/json/tsigkey_client.py
@@ -28,8 +28,9 @@
:param resource_id: Pool id or Zone id.
:param name: name of the tsigkey.
:param algorithm: TSIG algorithm e.g hmac-md5, hmac-sha256 etc.
- :param secret: represents TSIG secret.
- :param scope: represents TSIG scope.
+ :param secret: represents TSIG secret. If an empty string is passed,
+ it is sent as-is (needed for negative testing).
+ :param scope: represents TSIG scope. Default is ZONE
:param params: A Python dict that represents the query paramaters to
include in the request URI.
:return: A tuple with the server response and the created tsigkey.
@@ -38,9 +39,12 @@
"name": name or data_utils.rand_name('test-tsig'),
"algorithm": algorithm or utils.rand_tsig_algorithm(),
"secret": secret or data_utils.rand_name("secret"),
- "scope": scope or utils.rand_tsig_scope(),
+ "scope": scope or 'ZONE',
"resource_id": resource_id}
+ if secret == '':
+ tsig['secret'] = ''
+
resp, body = self._create_request('tsigkeys', data=tsig,
params=params)
@@ -49,23 +53,28 @@
return resp, body
@base.handle_errors
- def list_tsigkeys(self, params=None):
+ def list_tsigkeys(self, params=None, headers=None):
"""Gets a list of tsigkeys.
:param params: A Python dict that represents the query paramaters to
include in the request URI.
+ :param headers (dict): The headers to use for the request.
:return: Serialized tsigkeys as a list.
"""
- return self._list_request('tsigkeys', params=params)
+ return self._list_request('tsigkeys', params=params, headers=headers)
@base.handle_errors
- def show_tsigkey(self, uuid, params=None):
+ def show_tsigkey(self, uuid=None, params=None, headers=None):
"""Gets a specific tsigkey.
:param uuid: Unique identifier of the tsigkey in UUID format.
+ Default value is None (it's possible to use "marker" in
+ URL query instead of UUID)
:param params: A Python dict that represents the query paramaters to
include in the request URI.
+ :param headers (dict): The headers to use for the request.
:return: Serialized tsigkey as a dictionary.
"""
- return self._show_request('tsigkeys', uuid, params=params)
+ return self._show_request(
+ 'tsigkeys', uuid, params=params, headers=headers)
@base.handle_errors
def update_tsigkey(self, uuid, name=None, algorithm=None,
diff --git a/designate_tempest_plugin/services/dns/v2/json/zone_exports_client.py b/designate_tempest_plugin/services/dns/v2/json/zone_exports_client.py
index 4057b7e..5089d36 100644
--- a/designate_tempest_plugin/services/dns/v2/json/zone_exports_client.py
+++ b/designate_tempest_plugin/services/dns/v2/json/zone_exports_client.py
@@ -19,7 +19,8 @@
class ZoneExportsClient(base.DnsClientV2Base):
@base.handle_errors
- def create_zone_export(self, uuid, params=None, wait_until=False):
+ def create_zone_export(self, uuid, params=None,
+ wait_until=False, headers=None):
"""Create a zone export.
:param uuid: Unique identifier of the zone in UUID format.
@@ -27,18 +28,20 @@
include in the request URI.
:param wait_until: Block until the exported zone reaches the
desired status
+ :param headers (dict): The headers to use for the request.
:return: Serialized imported zone as a dictionary.
"""
export_uri = 'zones/{0}/tasks/export'.format(uuid)
resp, body = self._create_request(
- export_uri, params=params)
+ export_uri, params=params, headers=headers)
# Create Zone Export should Return a HTTP 202
self.expected_success(202, resp.status)
if wait_until:
- waiters.wait_for_zone_export_status(self, body['id'], wait_until)
+ waiters.wait_for_zone_export_status(
+ self, body['id'], wait_until, headers=headers)
return resp, body
@@ -100,16 +103,17 @@
'zones/tasks/exports', params=params, headers=headers)
@base.handle_errors
- def delete_zone_export(self, uuid, params=None):
+ def delete_zone_export(self, uuid, params=None, headers=None):
"""Deletes the zone export task with the specified UUID.
:param uuid: The unique identifier of the exported zone.
:param params: A Python dict that represents the query parameters to
include in the request URI.
+ :param headers (dict): The headers to use for the request.
:return: A tuple with the server response and the response body.
"""
resp, body = self._delete_request(
- 'zones/tasks/exports', uuid, params=params)
+ 'zones/tasks/exports', uuid, params=params, headers=headers)
# Delete Zone export should Return a HTTP 204
self.expected_success(204, resp.status)
diff --git a/designate_tempest_plugin/services/dns/v2/json/zone_imports_client.py b/designate_tempest_plugin/services/dns/v2/json/zone_imports_client.py
index 86d9fb1..236e737 100644
--- a/designate_tempest_plugin/services/dns/v2/json/zone_imports_client.py
+++ b/designate_tempest_plugin/services/dns/v2/json/zone_imports_client.py
@@ -21,15 +21,16 @@
@base.handle_errors
def create_zone_import(self, zonefile_data=None,
- params=None, wait_until=None):
+ wait_until=None, headers=None):
"""Create a zone import.
:param zonefile_data: A tuple that represents zone data.
- :param params: A Python dict that represents the query paramaters to
- include in the request URI.
+ :param wait_until: If not None, a waiter for appropriate status
+ will be activated.
+ :param headers (dict): The headers to use for the request.
:return: Serialized imported zone as a dictionary.
"""
-
- headers = {'Content-Type': 'text/dns'}
+ if not headers:
+ headers = {'Content-Type': 'text/dns'}
zone_data = zonefile_data or dns_data_utils.rand_zonefile_data()
resp, body = self._create_request(
'zones/tasks/imports', zone_data, headers=headers)
@@ -66,15 +67,16 @@
'zones/tasks/imports', params=params, headers=headers)
@base.handle_errors
- def delete_zone_import(self, uuid, params=None):
+ def delete_zone_import(self, uuid, params=None, headers=None):
"""Deletes a imported zone having the specified UUID.
:param uuid: The unique identifier of the imported zone.
:param params: A Python dict that represents the query paramaters to
include in the request URI.
+ :param headers (dict): The headers to use for the request.
:return: A tuple with the server response and the response body.
"""
resp, body = self._delete_request(
- 'zones/tasks/imports', uuid, params=params)
+ 'zones/tasks/imports', uuid, params=params, headers=headers)
# Delete Zone should Return a HTTP 204
self.expected_success(204, resp.status)
diff --git a/designate_tempest_plugin/services/dns/v2/json/zones_client.py b/designate_tempest_plugin/services/dns/v2/json/zones_client.py
index c30d24e..4cb6016 100644
--- a/designate_tempest_plugin/services/dns/v2/json/zones_client.py
+++ b/designate_tempest_plugin/services/dns/v2/json/zones_client.py
@@ -111,16 +111,17 @@
'zones', uuid, params=params, headers=headers)
@base.handle_errors
- def show_zone_nameservers(self, zone_uuid, params=None):
+ def show_zone_nameservers(self, zone_uuid, params=None, headers=None):
"""Gets list of Zone Name Servers
:param zone_uuid: Unique identifier of the zone in UUID format.
:param params: A Python dict that represents the query paramaters to
include in the request URI.
+ :param headers (dict): The headers to use for the request.
:return: Serialized nameservers as a list.
"""
return self._show_request(
'zones/{0}/nameservers'.format(zone_uuid), uuid=None,
- params=params)
+ params=params, headers=headers)
@base.handle_errors
def list_zones(self, params=None, headers=None):
@@ -151,7 +152,8 @@
@base.handle_errors
def update_zone(self, uuid, email=None, ttl=None,
- description=None, wait_until=False, params=None):
+ description=None, wait_until=False, params=None,
+ headers=None):
"""Update a zone with the specified parameters.
:param uuid: The unique identifier of the zone.
:param email: The email for the zone.
@@ -163,6 +165,7 @@
:param wait_until: Block until the zone reaches the desiered status
:param params: A Python dict that represents the query paramaters to
include in the request URI.
+ :param headers (dict): The headers to use for the request.
:return: A tuple with the server response and the updated zone.
"""
zone = {
@@ -171,7 +174,8 @@
'description': description or data_utils.rand_name('test-zone'),
}
- resp, body = self._update_request('zones', uuid, zone, params=params)
+ resp, body = self._update_request('zones', uuid, zone, params=params,
+ headers=headers)
# Update Zone should Return a HTTP 202
self.expected_success(202, resp.status)
diff --git a/designate_tempest_plugin/tests/api/v2/test_blacklists.py b/designate_tempest_plugin/tests/api/v2/test_blacklists.py
index 95688b3..ecb5ce5 100644
--- a/designate_tempest_plugin/tests/api/v2/test_blacklists.py
+++ b/designate_tempest_plugin/tests/api/v2/test_blacklists.py
@@ -30,7 +30,6 @@
class BlacklistsAdminTest(BaseBlacklistsTest):
- credentials = ["admin", "system_admin", "primary"]
@classmethod
def setup_credentials(cls):
# Do not create network resources for these test.
@@ -59,8 +58,14 @@
self.assertExpected(blacklist, body, self.excluded_keys)
+ expected_allowed = ['os_admin']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin']
+
+ self.check_CUD_RBAC_enforcement('BlacklistsClient', 'create_blacklist',
+ expected_allowed, False)
+
@decorators.idempotent_id('ea608152-da3c-11eb-b8b8-74e5f9e2a801')
- @decorators.skip_because(bug="1934252")
def test_create_blacklist_invalid_pattern(self):
patterns = ['', '#(*&^%$%$#@$', 'a' * 1000]
for pattern in patterns:
@@ -95,6 +100,14 @@
LOG.info('Ensure the fetched response matches the created blacklist')
self.assertExpected(blacklist, body, self.excluded_keys)
+ expected_allowed = ['os_admin']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin', 'os_system_reader']
+
+ self.check_list_show_RBAC_enforcement(
+ 'BlacklistsClient', 'show_blacklist', expected_allowed, False,
+ blacklist['id'])
+
@decorators.idempotent_id('dcea40d9-8d36-43cb-8440-4a842faaef0d')
def test_delete_blacklist(self):
LOG.info('Create a blacklist')
@@ -108,6 +121,14 @@
# A blacklist delete returns an empty body
self.assertEqual(body.strip(), b"")
+ expected_allowed = ['os_admin']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin']
+
+ self.check_CUD_RBAC_enforcement(
+ 'BlacklistsClient', 'delete_blacklist', expected_allowed, False,
+ blacklist['id'])
+
@decorators.idempotent_id('3a2a1e6c-8176-428c-b5dd-d85217c0209d')
def test_list_blacklists(self):
LOG.info('Create a blacklist')
@@ -120,6 +141,14 @@
# TODO(pglass): Assert that the created blacklist is in the response
self.assertGreater(len(body['blacklists']), 0)
+ expected_allowed = ['os_admin']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin', 'os_system_reader']
+
+ self.check_list_IDs_RBAC_enforcement(
+ 'BlacklistsClient', 'list_blacklists',
+ expected_allowed, [blacklist['id']])
+
@decorators.idempotent_id('0063d6ad-9557-49c7-b521-e64a14d4d0d0')
def test_update_blacklist(self):
LOG.info('Create a blacklist')
@@ -139,6 +168,14 @@
self.assertEqual(pattern, body['pattern'])
self.assertEqual(description, body['description'])
+ expected_allowed = ['os_admin']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin']
+
+ self.check_CUD_RBAC_enforcement(
+ 'BlacklistsClient', 'update_blacklist', expected_allowed, False,
+ uuid=blacklist['id'], pattern=pattern, description=description)
+
class TestBlacklistNotFoundAdmin(BaseBlacklistsTest):
diff --git a/designate_tempest_plugin/tests/api/v2/test_designate_limits.py b/designate_tempest_plugin/tests/api/v2/test_designate_limits.py
index 2cb9d9e..102f168 100644
--- a/designate_tempest_plugin/tests/api/v2/test_designate_limits.py
+++ b/designate_tempest_plugin/tests/api/v2/test_designate_limits.py
@@ -24,7 +24,8 @@
class DesignateLimit(base.BaseDnsV2Test):
- credentials = ["admin", "system_admin", "primary", "alt"]
+ credentials = ["admin", "system_admin", "system_reader", "primary", "alt",
+ "project_member", "project_reader"]
@classmethod
def setup_credentials(cls):
@@ -102,3 +103,14 @@
project_id, received_project_ids,
'Failed, expected project_id:{} is missing in:{} '.format(
project_id, received_project_ids))
+
+ @decorators.idempotent_id('fc57fa6b-5280-4186-9be9-ff4da0961db0')
+ def test_list_designate_limits_RBAC(self):
+ expected_allowed = ['os_admin', 'os_primary', 'os_alt']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.extend(['os_system_admin', 'os_system_reader',
+ 'os_project_member', 'os_project_reader'])
+
+ self.check_list_show_RBAC_enforcement(
+ 'DesignateLimitClient', 'list_designate_limits',
+ expected_allowed, False)
diff --git a/designate_tempest_plugin/tests/api/v2/test_pool.py b/designate_tempest_plugin/tests/api/v2/test_pool.py
index 60af204..144f0d6 100644
--- a/designate_tempest_plugin/tests/api/v2/test_pool.py
+++ b/designate_tempest_plugin/tests/api/v2/test_pool.py
@@ -36,7 +36,8 @@
class PoolAdminTest(BasePoolTest):
- credentials = ["admin", "system_admin"]
+ credentials = ["admin", "primary", "system_admin", "system_reader",
+ "project_member", "project_reader", "alt"]
@classmethod
def setup_credentials(cls):
@@ -72,6 +73,16 @@
self.assertEqual(pool_data["name"], pool['name'])
self.assertExpected(pool_data, pool, self.excluded_keys)
+ # Test RBAC
+ expected_allowed = ['os_admin']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.append('os_system_admin')
+
+ self.check_CUD_RBAC_enforcement(
+ 'PoolClient', 'create_pool', expected_allowed, False,
+ pool_name=pool_data["name"], ns_records=pool_data["ns_records"],
+ project_id=pool_data["project_id"])
+
@decorators.idempotent_id('e80eb70a-8ee5-40eb-b06e-599597a8ab7e')
def test_show_pool(self):
LOG.info('Create a pool')
@@ -88,6 +99,20 @@
self._assertExpectedNSRecords(pool["ns_records"], body["ns_records"],
expected_key="priority")
+ # TODO(johnsom) Test reader roles once this bug is fixed.
+ # https://bugs.launchpad.net/tempest/+bug/1964509
+ # Test RBAC
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin']
+ else:
+ expected_allowed = ['os_admin']
+
+ # TODO(johnsom) The pools API seems inconsistent with the requirement
+ # of the all-projects header.
+ self.check_list_show_RBAC_enforcement(
+ 'PoolClient', 'show_pool', expected_allowed, True, pool['id'],
+ headers=self.all_projects_header)
+
@decorators.idempotent_id('d8c4c377-5d88-452d-a4d2-c004d72e1abe')
def test_delete_pool(self):
LOG.info('Create a pool')
@@ -104,6 +129,14 @@
lambda: self.admin_client.show_pool(
pool['id'], headers=self.all_projects_header))
+ # Test RBAC
+ expected_allowed = ['os_admin']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.append('os_system_admin')
+
+ self.check_CUD_RBAC_enforcement(
+ 'PoolClient', 'delete_pool', expected_allowed, False, pool['id'])
+
@decorators.idempotent_id('77c85b40-83b2-4c17-9fbf-e6d516cfce90')
def test_list_pools(self):
LOG.info('Create a pool')
@@ -117,6 +150,18 @@
self.assertGreater(len(body['pools']), 0)
+ # TODO(johnsom) Test reader roles once this bug is fixed.
+ # https://bugs.launchpad.net/tempest/+bug/1964509
+ # Test RBAC
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin']
+ else:
+ expected_allowed = ['os_admin']
+
+ self.check_list_IDs_RBAC_enforcement(
+ 'PoolClient', 'list_pools', expected_allowed, [pool['id']],
+ headers=self.all_projects_header)
+
@decorators.idempotent_id('fdcc84ce-af65-4af6-a5fc-6c50acbea0f0')
def test_update_pool(self):
LOG.info('Create a pool')
@@ -131,6 +176,15 @@
self.assertEqual("foo", patch_pool["name"])
+ # Test RBAC
+ expected_allowed = ['os_admin']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.append('os_system_admin')
+
+ self.check_CUD_RBAC_enforcement(
+ 'PoolClient', 'update_pool', expected_allowed, True,
+ pool['id'], pool_name="test-name")
+
@decorators.idempotent_id('41ad6a84-00ce-4a04-9fd5-b7c15c31e2db')
def test_list_pools_dot_json_fails(self):
uri = self.admin_client.get_uri('pools.json')
diff --git a/designate_tempest_plugin/tests/api/v2/test_ptrs.py b/designate_tempest_plugin/tests/api/v2/test_ptrs.py
index b094a5d..a72b61a 100644
--- a/designate_tempest_plugin/tests/api/v2/test_ptrs.py
+++ b/designate_tempest_plugin/tests/api/v2/test_ptrs.py
@@ -16,6 +16,7 @@
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
from tempest.lib import exceptions as lib_exc
+import testtools
from designate_tempest_plugin.tests import base
from designate_tempest_plugin.common import constants as const
@@ -27,13 +28,34 @@
CONF = config.CONF
LOG = logging.getLogger(__name__)
-TLD = dns_data_utils.rand_string(3)
-
class BasePtrTest(base.BaseDnsV2Test):
excluded_keys = ['created_at', 'updated_at', 'version', 'links',
'status', 'action']
+ @classmethod
+ def setup_clients(cls):
+ super(BasePtrTest, cls).setup_clients()
+
+ if CONF.enforce_scope.designate:
+ cls.admin_tld_client = cls.os_system_admin.dns_v2.TldClient()
+ else:
+ cls.admin_tld_client = cls.os_admin.dns_v2.TldClient()
+
+ @classmethod
+ def resource_setup(cls):
+ super(BasePtrTest, cls).resource_setup()
+
+ # Make sure we have an allowed TLD available
+ tld_name = dns_data_utils.rand_zone_name(name='BasePtrTest')
+ cls.tld_name = tld_name[:-1]
+ cls.class_tld = cls.admin_tld_client.create_tld(tld_name=tld_name[:-1])
+
+ @classmethod
+ def resource_cleanup(cls):
+ cls.admin_tld_client.delete_tld(cls.class_tld[1]['id'])
+ super(BasePtrTest, cls).resource_cleanup()
+
class DesignatePtrRecord(BasePtrTest, tempest.test.BaseTestCase):
@@ -55,8 +77,24 @@
cls.primary_ptr_client = cls.os_primary.dns_v2.PtrClient()
cls.primary_floating_ip_client = cls.os_primary.floating_ips_client
+ @classmethod
+ def resource_setup(cls):
+ super(DesignatePtrRecord, cls).resource_setup()
+
+ # The 'arpa' TLD is a special case as the negative test class also
+ # needs to use this space. To stop test class concurrency conflicts,
+ # let each class manage different TLDs for the reverse namespace.
+ cls.arpa_tld = cls.admin_tld_client.create_tld(tld_name='arpa')
+
+ @classmethod
+ def resource_cleanup(cls):
+ cls.admin_tld_client.delete_tld(cls.arpa_tld[1]['id'])
+ super(DesignatePtrRecord, cls).resource_cleanup()
+
def _set_ptr(self, ptr_name=None, ttl=None, description=None,
- headers=None, tld=TLD, fip_id=None):
+ headers=None, tld=None, fip_id=None):
+ if not tld:
+ tld = self.tld_name
if not fip_id:
fip = self.primary_floating_ip_client.create_floatingip(
floating_network_id=CONF.network.public_network_id)[
@@ -67,7 +105,8 @@
ptr = self.primary_ptr_client.set_ptr_record(
fip_id, ptr_name=ptr_name, ttl=ttl, description=description,
headers=headers, tld=tld)
- self.addCleanup(self.primary_ptr_client.unset_ptr_record, fip_id)
+ self.addCleanup(self.unset_ptr, self.primary_ptr_client, fip_id)
+
self.assertEqual('CREATE', ptr['action'])
self.assertEqual('PENDING', ptr['status'])
waiters.wait_for_ptr_status(
@@ -77,7 +116,7 @@
def _unset_ptr(self, fip_id):
self.primary_ptr_client.unset_ptr_record(fip_id)
waiters.wait_for_ptr_status(
- self.primary_ptr_client, fip_id=fip_id, status=const.DELETED)
+ self.primary_ptr_client, fip_id=fip_id, status=const.INACTIVE)
@decorators.idempotent_id('2fb9d6ea-871d-11eb-9f9a-74e5f9e2a801')
def test_set_floatingip_ptr(self):
@@ -137,7 +176,8 @@
'Failed, expected ID was not found in "received_ptr_ids" list.')
@decorators.idempotent_id('499b5a7e-87e1-11eb-b412-74e5f9e2a801')
- @decorators.skip_because(bug="1932026")
+ @testtools.skipUnless(config.CONF.dns_feature_enabled.bug_1932026_fixed,
+ 'Skip unless bug 1932026 has been fixed.')
def test_unset_floatingip_ptr(self):
fip_id, ptr = self._set_ptr()
self._unset_ptr(fip_id)
@@ -145,7 +185,7 @@
class DesignatePtrRecordNegative(BasePtrTest, tempest.test.BaseTestCase):
- credentials = ['primary', 'admin']
+ credentials = ['primary', 'admin', 'system_admin']
@classmethod
def setup_credentials(cls):
@@ -160,8 +200,25 @@
cls.primary_floating_ip_client = cls.os_primary.floating_ips_client
cls.admin_ptr_client = cls.os_admin.dns_v2.PtrClient()
+ @classmethod
+ def resource_setup(cls):
+ super(DesignatePtrRecordNegative, cls).resource_setup()
+
+ # The 'arpa' TLD is a special case as the positive test class also
+ # needs to use this space. To stop test class concurrency conflicts,
+ # let each class manage different TLDs for the reverse namespace.
+ cls.in_addr_arpa_tld = cls.admin_tld_client.create_tld(
+ tld_name='in-addr.arpa')
+
+ @classmethod
+ def resource_cleanup(cls):
+ cls.admin_tld_client.delete_tld(cls.in_addr_arpa_tld[1]['id'])
+ super(DesignatePtrRecordNegative, cls).resource_cleanup()
+
def _set_ptr(self, ptr_name=None, ttl=None, description=None,
- headers=None, tld=TLD, fip_id=None):
+ headers=None, tld=None, fip_id=None):
+ if not tld:
+ tld = self.tld_name
if not fip_id:
fip = self.primary_floating_ip_client.create_floatingip(
floating_network_id=CONF.network.public_network_id)[
@@ -172,7 +229,7 @@
ptr = self.primary_ptr_client.set_ptr_record(
fip_id, ptr_name=ptr_name, ttl=ttl, description=description,
headers=headers, tld=tld)
- self.addCleanup(self.primary_ptr_client.unset_ptr_record, fip_id)
+ self.addCleanup(self.unset_ptr, self.primary_ptr_client, fip_id)
self.assertEqual('CREATE', ptr['action'])
self.assertEqual('PENDING', ptr['status'])
waiters.wait_for_ptr_status(
diff --git a/designate_tempest_plugin/tests/api/v2/test_quotas.py b/designate_tempest_plugin/tests/api/v2/test_quotas.py
index db870af..37e07e3 100644
--- a/designate_tempest_plugin/tests/api/v2/test_quotas.py
+++ b/designate_tempest_plugin/tests/api/v2/test_quotas.py
@@ -15,7 +15,7 @@
from tempest import config
from tempest.lib import decorators
from tempest.lib import exceptions as lib_exc
-from tempest.lib.common.utils import data_utils as tempest_data_utils
+from tempest.lib.common.utils import data_utils
from designate_tempest_plugin.tests import base
from designate_tempest_plugin import data_utils as dns_data_utils
@@ -23,10 +23,14 @@
CONF = config.CONF
LOG = logging.getLogger(__name__)
+quotas_types = ["api_export_size", "recordset_records",
+ "zone_records", "zone_recordsets", "zones"]
+
class QuotasV2Test(base.BaseDnsV2Test):
- credentials = ["primary", "admin", "system_admin", "alt"]
+ credentials = ["primary", "admin", "system_admin", "system_reader", "alt",
+ "project_member", "project_reader"]
@classmethod
def setup_credentials(cls):
@@ -53,7 +57,6 @@
cls.admin_client = cls.os_admin.dns_v2.QuotasClient()
cls.quotas_client = cls.os_primary.dns_v2.QuotasClient()
cls.alt_client = cls.os_alt.dns_v2.QuotasClient()
- cls.alt_zone_client = cls.os_alt.dns_v2.ZonesClient()
def _store_quotas(self, project_id, cleanup=True):
"""Remember current quotas and reset them after the test"""
@@ -68,45 +71,72 @@
@decorators.idempotent_id('1dac991a-9e2e-452c-a47a-26ac37381ec5')
def test_show_quotas(self):
- self._store_quotas(project_id=self.quotas_client.project_id)
- LOG.info("Updating quotas")
- quotas = dns_data_utils.rand_quotas()
- _, body = self.admin_client.update_quotas(
- project_id=self.quotas_client.project_id,
- headers=self.all_projects_header,
- **quotas)
+ LOG.info("Show default quotas, validate all quota types exists and "
+ "their values are integers.")
+ for user in ['primary', 'admin']:
+ if user == 'primary':
+ body = self.quotas_client.show_quotas()[1]
+ if user == 'admin':
+ body = self.admin_client.show_quotas(
+ project_id=self.quotas_client.project_id,
+ headers=self.all_projects_header)[1]
+ for quota_type in quotas_types:
+ self.assertIn(
+ quota_type, body.keys(),
+ 'Failed, expected quota type:{} was not found '
+ 'in received quota body'.format(quota_type))
+ for quota_type, quota_value in body.items():
+ self.assertTrue(
+ isinstance(quota_value, int),
+ 'Failed, the value of:{} is:{}, expected integer'.format(
+ quota_type, quota_value))
- LOG.info("Fetching quotas")
- _, body = self.admin_client.show_quotas(
- project_id=self.quotas_client.project_id,
- headers=self.all_projects_header)
+ expected_allowed = ['os_admin', 'os_primary', 'os_alt']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.extend(['os_system_admin', 'os_system_reader',
+ 'os_project_member', 'os_project_reader'])
- LOG.info("Ensuring the response has all quota types")
- self.assertExpected(quotas, body, [])
+ self.check_list_show_with_ID_RBAC_enforcement(
+ 'QuotasClient', 'show_quotas', expected_allowed, False)
@decorators.idempotent_id('0448b089-5803-4ce3-8a6c-5c15ff75a2cc')
- def test_delete_quotas(self):
+ def test_reset_quotas(self):
self._store_quotas(project_id=self.quotas_client.project_id)
- LOG.info("Deleting quotas")
- _, body = self.admin_client.delete_quotas(
+
+ LOG.info("Deleting (reset) quotas")
+
+ expected_allowed = ['os_admin']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.extend(['os_system_admin'])
+
+ self.check_CUD_RBAC_enforcement(
+ 'QuotasClient', 'delete_quotas', expected_allowed, False,
+ project_id=self.quotas_client.project_id)
+
+ body = self.admin_client.delete_quotas(
project_id=self.quotas_client.project_id,
- headers=self.all_projects_header)
+ headers=self.all_projects_header)[1]
LOG.info("Ensuring an empty response body")
self.assertEqual(body.strip(), b"")
@decorators.idempotent_id('76d24c87-1b39-4e19-947c-c08e1380dc61')
def test_update_quotas(self):
- if CONF.enforce_scope.designate:
- raise self.skipException(
- "System scoped tokens do not have a project_id.")
-
- self._store_quotas(project_id=self.admin_client.project_id)
+ self._store_quotas(project_id=self.quotas_client.project_id)
LOG.info("Updating quotas")
quotas = dns_data_utils.rand_quotas()
- _, body = self.admin_client.update_quotas(
- project_id=self.admin_client.project_id,
- **quotas)
+ body = self.admin_client.update_quotas(
+ project_id=self.quotas_client.project_id,
+ **quotas, headers=self.all_projects_header)[1]
+
+ expected_allowed = ['os_admin']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.extend(['os_system_admin'])
+
+ self.check_CUD_RBAC_enforcement(
+ 'QuotasClient', 'update_quotas', expected_allowed, False,
+ project_id=self.quotas_client.project_id,
+ **quotas, headers=self.all_projects_header)
LOG.info("Ensuring the response has all quota types")
self.assertExpected(quotas, body, [])
@@ -121,15 +151,15 @@
quotas = dns_data_utils.rand_quotas()
request = quotas.copy()
- _, body = self.admin_client.update_quotas(
+ body = self.admin_client.update_quotas(
project_id=project_id,
headers=self.all_projects_header,
- **request)
+ **request)[1]
LOG.info("Ensuring the response has all quota types")
self.assertExpected(quotas, body, [])
- _, client_body = self.quotas_client.show_quotas(project_id=project_id)
+ client_body = self.quotas_client.show_quotas(project_id=project_id)[1]
self.assertExpected(quotas, client_body, [])
@@ -145,15 +175,15 @@
project_id=project_id,
headers=self.all_projects_header)
- _, default_quotas = self.admin_client.show_quotas(
+ default_quotas = self.admin_client.show_quotas(
project_id=project_id,
- headers=self.all_projects_header)
+ headers=self.all_projects_header)[1]
LOG.info("Updating quotas for %s ", project_id)
quotas = dns_data_utils.rand_quotas()
request = quotas.copy()
- _, body = self.admin_client.update_quotas(
+ self.admin_client.update_quotas(
project_id=project_id,
headers=self.all_projects_header,
**request)
@@ -162,9 +192,9 @@
project_id=project_id,
headers=self.all_projects_header)
- _, final_quotas = self.admin_client.show_quotas(
+ final_quotas = self.admin_client.show_quotas(
project_id=project_id,
- headers=self.all_projects_header)
+ headers=self.all_projects_header)[1]
self.assertExpected(default_quotas, final_quotas, [])
@@ -188,16 +218,45 @@
**request)
LOG.info("Make sure that the quotas weren't changed")
- _, client_body = self.quotas_client.show_quotas(
- project_id=self.quotas_client.project_id)
+ client_body = self.quotas_client.show_quotas(
+ project_id=self.quotas_client.project_id)[1]
self.assertExpected(original_quotas, client_body, [])
+
+class QuotasV2TestNegative(base.BaseDnsV2Test):
+
+ credentials = ["primary", "admin", "system_admin"]
+
+ @classmethod
+ def setup_credentials(cls):
+ # Do not create network resources for these tests.
+ cls.set_network_resources()
+ super(QuotasV2TestNegative, cls).setup_credentials()
+
+ @classmethod
+ def skip_checks(cls):
+ super(QuotasV2TestNegative, cls).skip_checks()
+
+ if not CONF.dns_feature_enabled.api_v2_quotas:
+ skip_msg = ("%s skipped as designate V2 Quotas API is not "
+ "available" % cls.__name__)
+ raise cls.skipException(skip_msg)
+
+ @classmethod
+ def setup_clients(cls):
+ super(QuotasV2TestNegative, cls).setup_clients()
+
+ if CONF.enforce_scope.designate:
+ cls.admin_client = cls.os_system_admin.dns_v2.QuotasClient()
+ else:
+ cls.admin_client = cls.os_admin.dns_v2.QuotasClient()
+ cls.quotas_client = cls.os_primary.dns_v2.QuotasClient()
+
@decorators.idempotent_id('ae82a0ba-da60-11eb-bf12-74e5f9e2a801')
def test_admin_sets_quota_for_a_project(self):
primary_project_id = self.quotas_client.project_id
- http_headers_to_use = [
- {'X-Auth-All-Projects': True},
+ http_headers_to_use = [self.all_projects_header,
{'x-auth-sudo-project-id': primary_project_id}]
for http_header in http_headers_to_use:
@@ -240,21 +299,32 @@
lib_exc.Forbidden, self.quotas_client.set_quotas,
project_id=self.quotas_client.project_id,
quotas=dns_data_utils.rand_quotas(),
- headers={'x-auth-all-projects': True})
+ headers=self.all_projects_header)
@decorators.idempotent_id('a6ce5b46-dcce-11eb-903e-74e5f9e2a801')
- @decorators.skip_because(bug="1934596")
def test_admin_sets_invalid_quota_values(self):
primary_project_id = self.quotas_client.project_id
- http_header = {'X-Auth-All-Projects': True}
- for item in ['zones', 'zone_records',
- 'zone_recordsets', 'recordset_records']:
+ for item in quotas_types:
quota = dns_data_utils.rand_quotas()
- quota[item] = tempest_data_utils.rand_name()
+ quota[item] = data_utils.rand_name()
self.assertRaises(
lib_exc.BadRequest, self.admin_client.set_quotas,
project_id=primary_project_id,
quotas=quota,
- headers=http_header)
+ headers=self.all_projects_header)
+
+ @decorators.idempotent_id('ac212fd8-c602-11ec-b042-201e8823901f')
+ def test_admin_sets_not_existing_quota_type(self):
+
+ LOG.info('Try to set quota using not existing quota type in its body')
+ primary_project_id = self.quotas_client.project_id
+ quota = dns_data_utils.rand_quotas()
+ quota[data_utils.rand_name()] = 777
+
+ with self.assertRaisesDns(
+ lib_exc.BadRequest, 'invalid_object', 400):
+ self.admin_client.set_quotas(
+ project_id=primary_project_id,
+ quotas=quota, headers=self.all_projects_header)
diff --git a/designate_tempest_plugin/tests/api/v2/test_recordset.py b/designate_tempest_plugin/tests/api/v2/test_recordset.py
index 38a2dfd..72495d3 100644
--- a/designate_tempest_plugin/tests/api/v2/test_recordset.py
+++ b/designate_tempest_plugin/tests/api/v2/test_recordset.py
@@ -15,14 +15,14 @@
from tempest import config
from tempest.lib import decorators
from tempest.lib import exceptions as lib_exc
-from tempest.lib.common.utils import data_utils as lib_data_utils
+from tempest.lib.common.utils import data_utils
import ddt
from designate_tempest_plugin.tests import base
from designate_tempest_plugin.common import constants as const
from designate_tempest_plugin.common import waiters
-from designate_tempest_plugin import data_utils
+from designate_tempest_plugin import data_utils as dns_data_utils
CONF = config.CONF
LOG = logging.getLogger(__name__)
@@ -33,25 +33,41 @@
'type']
@classmethod
+ def setup_clients(cls):
+ super(BaseRecordsetsTest, cls).setup_clients()
+ if CONF.enforce_scope.designate:
+ cls.admin_tld_client = cls.os_system_admin.dns_v2.TldClient()
+ else:
+ cls.admin_tld_client = cls.os_admin.dns_v2.TldClient()
+
+ @classmethod
def resource_setup(cls):
super(BaseRecordsetsTest, cls).resource_setup()
+ # Make sure we have an allowed TLD available
+ tld_name = dns_data_utils.rand_zone_name(name="BaseRecordsetsTest")
+ cls.tld_name = f".{tld_name}"
+ cls.class_tld = cls.admin_tld_client.create_tld(tld_name=tld_name[:-1])
+
# All the recordset tests need a zone, create one to share
- LOG.info('Create a zone')
- _, cls.zone = cls.zone_client.create_zone()
+ zone_name = dns_data_utils.rand_zone_name(name="TestZone",
+ suffix=cls.tld_name)
+ LOG.info('Create a zone: %s', zone_name)
+ cls.zone = cls.zone_client.create_zone(name=zone_name)[1]
@classmethod
def resource_cleanup(cls):
cls.zone_client.delete_zone(
cls.zone['id'], ignore_errors=lib_exc.NotFound)
-
+ cls.admin_tld_client.delete_tld(cls.class_tld[1]['id'])
super(BaseRecordsetsTest, cls).resource_cleanup()
@ddt.ddt
class RecordsetsTest(BaseRecordsetsTest):
- credentials = ["admin", "system_admin", "primary", "alt"]
+ credentials = ["admin", "system_admin", "system_reader", "primary", "alt",
+ "project_member", "project_reader"]
@classmethod
def setup_credentials(cls):
@@ -76,9 +92,19 @@
@decorators.attr(type='smoke')
@decorators.idempotent_id('631d74fd-6909-4684-a61b-5c4d2f92c3e7')
def test_create_recordset(self):
- recordset_data = data_utils.rand_recordset_data(
+ recordset_data = dns_data_utils.rand_recordset_data(
record_type='A', zone_name=self.zone['name'])
+ # Test RBAC
+ expected_allowed = ['os_admin', 'os_primary', 'os_alt']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.append('os_system_admin')
+ expected_allowed.append('os_project_member')
+
+ self.check_CUD_RBAC_enforcement(
+ 'RecordsetClient', 'create_recordset', expected_allowed, True,
+ self.zone['id'], recordset_data)
+
LOG.info('Create a Recordset')
resp, body = self.client.create_recordset(
self.zone['id'], recordset_data)
@@ -127,16 +153,18 @@
@decorators.idempotent_id('6c22a3f9-3f4d-4b32-bdf2-5237851ed25e')
def test_create_recordset_type_SRV_TCP(self):
self._test_create_recordset_type(
- "_sip._tcp", "SRV", ["10 60 5060 server1.example.com.",
- "20 60 5060 server2.example.com.",
- "20 30 5060 server3.example.com."])
+ "_sip._tcp", "SRV", [
+ "10 60 5060 server1.example{}".format(self.tld_name),
+ "20 60 5060 server2.example{}".format(self.tld_name),
+ "20 30 5060 server3.example{}".format(self.tld_name)])
@decorators.idempotent_id('59c1aa42-278e-4f7b-a6a1-4320d5daf1fd')
def test_create_recordset_type_SRV_UDP(self):
self._test_create_recordset_type(
- "_sip._udp", "SRV", ["10 60 5060 server1.example.com.",
- "10 60 5060 server2.example.com.",
- "20 30 5060 server3.example.com."])
+ "_sip._udp", "SRV", [
+ "10 60 5060 server1.example{}".format(self.tld_name),
+ "10 60 5060 server2.example{}".format(self.tld_name),
+ "20 30 5060 server3.example{}".format(self.tld_name)])
@decorators.idempotent_id('1ac46f94-f03a-4f85-b84f-826a2660b927')
def test_create_recordset_type_CNAME(self):
@@ -190,99 +218,202 @@
@decorators.idempotent_id('5964f730-5546-46e6-9105-5030e9c492b2')
def test_list_recordsets(self):
- recordset_data = data_utils.rand_recordset_data(
+ recordset_data = dns_data_utils.rand_recordset_data(
record_type='A', zone_name=self.zone['name'])
LOG.info('Create a Recordset')
resp, body = self.client.create_recordset(
self.zone['id'], recordset_data)
+ recordset_id = body['id']
self.addCleanup(
self.wait_recordset_delete, self.client,
- self.zone['id'], body['id'])
+ self.zone['id'], recordset_id)
LOG.info('List zone recordsets')
- _, body = self.client.list_recordset(self.zone['id'])
+ body = self.client.list_recordset(self.zone['id'])[1]
self.assertGreater(len(body), 0)
+ # TODO(johnsom) Test reader role once this bug is fixed:
+ # https://bugs.launchpad.net/tempest/+bug/1964509
+ # Test RBAC
+ expected_allowed = ['os_primary']
+
+ self.check_list_show_RBAC_enforcement(
+ 'RecordsetClient', 'list_recordset', expected_allowed, True,
+ self.zone['id'])
+
+ # Test that users who should see the recordset can see it.
+ expected_allowed = ['os_primary']
+
+ self.check_list_IDs_RBAC_enforcement(
+ 'RecordsetClient', 'list_recordset',
+ expected_allowed, [recordset_id], self.zone['id'])
+
+ # Test RBAC with x-auth-all-projects and x-auth-sudo-project-id headers
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin']
+ else:
+ expected_allowed = ['os_admin']
+
+ self.check_list_IDs_RBAC_enforcement(
+ 'RecordsetClient', 'list_recordset', expected_allowed,
+ [recordset_id], self.zone['id'], headers=self.all_projects_header)
+ self.check_list_IDs_RBAC_enforcement(
+ 'RecordsetClient', 'list_recordset',
+ expected_allowed, [recordset_id], self.zone['id'],
+ headers={'x-auth-sudo-project-id': self.client.project_id})
+
@decorators.idempotent_id('84c13cb2-9020-4c1e-aeb0-c348d9a70caa')
def test_show_recordsets(self):
- recordset_data = data_utils.rand_recordset_data(
+ recordset_data = dns_data_utils.rand_recordset_data(
record_type='A', zone_name=self.zone['name'])
LOG.info('Create a Recordset')
resp, body = self.client.create_recordset(
self.zone['id'], recordset_data)
+ recordset_id = body['id']
self.addCleanup(
self.wait_recordset_delete, self.client,
- self.zone['id'], body['id'])
+ self.zone['id'], recordset_id)
LOG.info('Re-Fetch the Recordset')
- _, record = self.client.show_recordset(self.zone['id'], body['id'])
+ record = self.client.show_recordset(self.zone['id'], recordset_id)[1]
LOG.info('Ensure the fetched response matches the expected one')
self.assertExpected(body, record, self.excluded_keys)
+ # TODO(johnsom) Test reader role once this bug is fixed:
+ # https://bugs.launchpad.net/tempest/+bug/1964509
+ # Test RBAC
+ expected_allowed = ['os_primary']
+
+ self.check_list_show_RBAC_enforcement(
+ 'RecordsetClient', 'show_recordset', expected_allowed, True,
+ self.zone['id'], recordset_id)
+
+ # Test RBAC with x-auth-all-projects and x-auth-sudo-project-id headers
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin']
+ else:
+ expected_allowed = ['os_admin']
+
+ self.check_list_show_RBAC_enforcement(
+ 'RecordsetClient', 'show_recordset', expected_allowed, True,
+ self.zone['id'], recordset_id, headers=self.all_projects_header)
+ self.check_list_show_RBAC_enforcement(
+ 'RecordsetClient', 'show_recordset', expected_allowed, True,
+ self.zone['id'], recordset_id,
+ headers={'x-auth-sudo-project-id': self.client.project_id})
+
@decorators.idempotent_id('855399c1-8806-4ae5-aa31-cb8a6f35e218')
def test_delete_recordset(self):
- recordset_data = data_utils.rand_recordset_data(
+ recordset_data = dns_data_utils.rand_recordset_data(
record_type='A', zone_name=self.zone['name'])
LOG.info('Create a Recordset')
- _, record = self.client.create_recordset(
- self.zone['id'], recordset_data)
+ record = self.client.create_recordset(
+ self.zone['id'], recordset_data)[1]
+ recordset_id = record['id']
self.addCleanup(
self.wait_recordset_delete, self.client,
- self.zone['id'], record['id'])
+ self.zone['id'], recordset_id)
+
+ # Test RBAC
+ expected_allowed = ['os_admin', 'os_primary']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.append('os_system_admin')
+
+ self.check_CUD_RBAC_enforcement(
+ 'RecordsetClient', 'delete_recordset', expected_allowed, True,
+ self.zone['id'], recordset_id)
+
+ # Test RBAC with x-auth-all-projects and x-auth-sudo-project-id headers
+ expected_allowed = ['os_admin', 'os_primary']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.append('os_system_admin')
+
+ self.check_CUD_RBAC_enforcement(
+ 'RecordsetClient', 'delete_recordset', expected_allowed, False,
+ self.zone['id'], recordset_id, headers=self.all_projects_header)
+
+ self.check_CUD_RBAC_enforcement(
+ 'RecordsetClient', 'delete_recordset', expected_allowed, False,
+ self.zone['id'], recordset_id,
+ headers={'x-auth-sudo-project-id': self.client.project_id})
LOG.info('Delete a Recordset')
- _, body = self.client.delete_recordset(self.zone['id'], record['id'])
+ self.client.delete_recordset(self.zone['id'], recordset_id)
LOG.info('Ensure successful deletion of Recordset')
self.assertRaises(lib_exc.NotFound,
- lambda: self.client.show_recordset(self.zone['id'], record['id']))
+ lambda: self.client.show_recordset(self.zone['id'], recordset_id))
@decorators.idempotent_id('8d41c85f-09f9-48be-a202-92d1bdf5c796')
def test_update_recordset(self):
- recordset_data = data_utils.rand_recordset_data(
+ recordset_data = dns_data_utils.rand_recordset_data(
record_type='A', zone_name=self.zone['name'])
LOG.info('Create a recordset')
- _, record = self.client.create_recordset(
- self.zone['id'], recordset_data)
+ record = self.client.create_recordset(
+ self.zone['id'], recordset_data)[1]
+ recordset_id = record['id']
self.addCleanup(
self.wait_recordset_delete, self.client,
- self.zone['id'], record['id'])
+ self.zone['id'], recordset_id)
- recordset_data = data_utils.rand_recordset_data(
+ recordset_data = dns_data_utils.rand_recordset_data(
record_type='A', zone_name=self.zone['name'], name=record['name'])
LOG.info('Update the recordset')
- _, update = self.client.update_recordset(self.zone['id'],
- record['id'], recordset_data)
+ update = self.client.update_recordset(self.zone['id'],
+ recordset_id, recordset_data)[1]
self.assertEqual(record['name'], update['name'])
self.assertNotEqual(record['records'], update['records'])
+ # Test RBAC
+ expected_allowed = ['os_admin', 'os_primary']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.append('os_system_admin')
+
+ self.check_CUD_RBAC_enforcement(
+ 'RecordsetClient', 'update_recordset', expected_allowed, True,
+ self.zone['id'], recordset_id, recordset_data)
+
+ # Test RBAC with x-auth-all-projects and x-auth-sudo-project-id headers
+ expected_allowed = ['os_admin', 'os_primary']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.append('os_system_admin')
+
+ self.check_CUD_RBAC_enforcement(
+ 'RecordsetClient', 'update_recordset', expected_allowed, False,
+ self.zone['id'], recordset_id, recordset_data,
+ headers=self.all_projects_header)
+ self.check_CUD_RBAC_enforcement(
+ 'RecordsetClient', 'update_recordset', expected_allowed, False,
+ self.zone['id'], recordset_id, recordset_data,
+ headers={'x-auth-sudo-project-id': self.client.project_id})
+
@decorators.idempotent_id('60904cc5-148b-4e3b-a0c6-35656dc8d44c')
def test_update_recordset_one_field(self):
- recordset_data = data_utils.rand_recordset_data(
+ recordset_data = dns_data_utils.rand_recordset_data(
record_type='A', zone_name=self.zone['name'])
LOG.info('Create a recordset')
- _, record = self.client.create_recordset(
- self.zone['id'], recordset_data)
+ record = self.client.create_recordset(
+ self.zone['id'], recordset_data)[1]
self.addCleanup(
self.wait_recordset_delete, self.client,
self.zone['id'], record['id'])
recordset_data = {
- 'ttl': data_utils.rand_ttl(start=record['ttl'] + 1)
+ 'ttl': dns_data_utils.rand_ttl(start=record['ttl'] + 1)
}
LOG.info('Update the recordset')
- _, update = self.client.update_recordset(self.zone['id'],
- record['id'], recordset_data)
+ update = self.client.update_recordset(self.zone['id'],
+ record['id'], recordset_data)[1]
self.assertEqual(record['name'], update['name'])
self.assertEqual(record['records'], update['records'])
@@ -293,7 +424,7 @@
def test_show_recordsets_impersonate_another_project(self):
LOG.info('Create a Recordset')
- recordset_data = data_utils.rand_recordset_data(
+ recordset_data = dns_data_utils.rand_recordset_data(
record_type='A', zone_name=self.zone['name'])
resp, body = self.client.create_recordset(
self.zone['id'], recordset_data)
@@ -333,7 +464,7 @@
def test_admin_list_all_recordsets_for_a_project(self):
LOG.info('Create a Recordset as Primary tenant')
- recordset_data_primary_1 = data_utils.rand_recordset_data(
+ recordset_data_primary_1 = dns_data_utils.rand_recordset_data(
record_type='A', zone_name=self.zone['name'])
body_pr_1 = self.client.create_recordset(
self.zone['id'], recordset_data_primary_1)[1]
@@ -342,7 +473,7 @@
self.zone['id'], body_pr_1['id'])
self.assertEqual(const.PENDING, body_pr_1['status'],
'Failed, expected status is PENDING')
- recordset_data_primary_2 = data_utils.rand_recordset_data(
+ recordset_data_primary_2 = dns_data_utils.rand_recordset_data(
record_type='A', zone_name=self.zone['name'])
body_pr_2 = self.client.create_recordset(
self.zone['id'], recordset_data_primary_2)[1]
@@ -385,9 +516,9 @@
@decorators.idempotent_id('48013b7c-f526-11eb-b04f-74e5f9e2a801')
def test_create_A_recordset_multiply_ips(self):
LOG.info('Create A type Recordset using a list of random IPs')
- recordset_data = data_utils.rand_a_recordset(
+ recordset_data = dns_data_utils.rand_a_recordset(
zone_name=self.zone['name'],
- ips=[data_utils.rand_ip() for _ in range(10)])
+ ips=[dns_data_utils.rand_ip() for _ in range(10)])
resp, body = self.client.create_recordset(
self.zone['id'], recordset_data)
self.addCleanup(
@@ -400,11 +531,40 @@
self.client, self.zone['id'],
body['id'], const.ACTIVE)
+ @decorators.idempotent_id('f15e583e-e479-11eb-8e5a-74e5f9e2a801')
+ def test_delete_zone_with_existing_recordset(self):
+
+ LOG.info('Create a Zone')
+ zone_name = dns_data_utils.rand_zone_name(name="TestZone",
+ suffix=self.tld_name)
+ zone = self.zone_client.create_zone(name=zone_name,
+ wait_until=const.ACTIVE)[1]
+ self.addCleanup(self.wait_zone_delete, self.zone_client, zone['id'])
+
+ LOG.info('Create a Recordset')
+ recordset_data = dns_data_utils.rand_recordset_data(
+ record_type='A', zone_name=zone['name'])
+ record = self.client.create_recordset(
+ zone['id'], recordset_data, wait_until=const.ACTIVE)[1]
+
+ LOG.info("Delete a Zone and wait till it's done")
+ body = self.zone_client.delete_zone(zone['id'])[1]
+ LOG.info('Ensure we respond with DELETE+PENDING')
+ self.assertEqual(const.DELETE, body['action'])
+ self.assertEqual(const.PENDING, body['status'])
+
+ LOG.info('Ensure successful deletion of Zone')
+ waiters.wait_for_zone_404(self.zone_client, zone['id'])
+
+ LOG.info('Ensure successful deletion of Recordset')
+ self.assertRaises(lib_exc.NotFound,
+ lambda: self.client.show_recordset(zone['id'], record['id']))
+
@ddt.ddt
class RecordsetsNegativeTest(BaseRecordsetsTest):
- credentials = ["primary", "alt"]
+ credentials = ["admin", "system_admin", "primary", "alt"]
@classmethod
def setup_credentials(cls):
@@ -441,79 +601,58 @@
@decorators.idempotent_id('b6dad57e-5ce9-4fa5-8d66-aebbcd23b4ad')
def test_get_nonexistent_recordset(self):
- LOG.info('Create a zone')
- _, zone = self.zone_client.create_zone()
- self.addCleanup(self.wait_zone_delete, self.zone_client, zone['id'])
-
LOG.info('Attempt to get an invalid Recordset')
with self.assertRaisesDns(
lib_exc.NotFound, 'recordset_not_found', 404):
- self.client.show_recordset(zone['id'], lib_data_utils.rand_uuid())
+ self.client.show_recordset(self.zone['id'],
+ data_utils.rand_uuid())
@decorators.idempotent_id('93d744a8-0dfd-4650-bcef-1e6ad632ad72')
def test_get_nonexistent_recordset_invalid_id(self):
- LOG.info('Create a zone')
- _, zone = self.zone_client.create_zone()
- self.addCleanup(self.wait_zone_delete, self.zone_client, zone['id'])
-
LOG.info('Attempt to get an invalid Recordset')
with self.assertRaisesDns(lib_exc.BadRequest, 'invalid_uuid', 400):
- self.client.show_recordset(zone['id'], 'invalid')
+ self.client.show_recordset(self.zone['id'], 'invalid')
@decorators.idempotent_id('da08f19a-7f10-47cc-8b41-994507190812')
def test_update_nonexistent_recordset(self):
- LOG.info('Create a zone')
- _, zone = self.zone_client.create_zone()
- self.addCleanup(self.wait_zone_delete, self.zone_client, zone['id'])
-
- recordset_data = data_utils.rand_recordset_data('A', zone['name'])
+ recordset_data = dns_data_utils.rand_recordset_data(
+ 'A', self.zone['name'])
LOG.info('Attempt to update an invalid Recordset')
with self.assertRaisesDns(
lib_exc.NotFound, 'recordset_not_found', 404):
self.client.update_recordset(
- zone['id'], lib_data_utils.rand_uuid(), recordset_data)
+ self.zone['id'], data_utils.rand_uuid(), recordset_data)
@decorators.idempotent_id('158340a1-3f69-4aaa-9968-956190563768')
def test_update_nonexistent_recordset_invalid_id(self):
- LOG.info('Create a zone')
- _, zone = self.zone_client.create_zone()
- self.addCleanup(self.wait_zone_delete, self.zone_client, zone['id'])
-
- recordset_data = data_utils.rand_recordset_data('A', zone['name'])
+ recordset_data = dns_data_utils.rand_recordset_data(
+ 'A', self.zone['name'])
LOG.info('Attempt to update an invalid Recordset')
with self.assertRaisesDns(lib_exc.BadRequest, 'invalid_uuid', 400):
self.client.update_recordset(
- zone['id'], 'invalid', recordset_data)
+ self.zone['id'], 'invalid', recordset_data)
@decorators.idempotent_id('64bd94d4-54bd-4bee-b6fd-92ede063234e')
def test_delete_nonexistent_recordset(self):
- LOG.info('Create a zone')
- _, zone = self.zone_client.create_zone()
- self.addCleanup(self.wait_zone_delete, self.zone_client, zone['id'])
-
LOG.info('Attempt to delete an invalid Recordset')
with self.assertRaisesDns(
lib_exc.NotFound, 'recordset_not_found', 404):
self.client.delete_recordset(
- zone['id'], lib_data_utils.rand_uuid())
+ self.zone['id'], data_utils.rand_uuid())
@decorators.idempotent_id('5948b599-a332-4dcb-840b-afc825075ba3')
def test_delete_nonexistent_recordset_invalid_id(self):
- LOG.info('Create a zone')
- _, zone = self.zone_client.create_zone()
- self.addCleanup(self.wait_zone_delete, self.zone_client, zone['id'])
-
LOG.info('Attempt to get an invalid Recordset')
with self.assertRaisesDns(lib_exc.BadRequest, 'invalid_uuid', 400):
- self.client.delete_recordset(zone['id'], 'invalid')
+ self.client.delete_recordset(self.zone['id'], 'invalid')
@decorators.idempotent_id('64e01dc4-a2a8-11eb-aad4-74e5f9e2a801')
def test_show_recordsets_invalid_ids(self):
LOG.info('Create a Recordset')
- recordset_data = data_utils.rand_recordset_data(
+ recordset_data = dns_data_utils.rand_recordset_data(
record_type='A', zone_name=self.zone['name'])
resp, body = self.client.create_recordset(
self.zone['id'], recordset_data)
@@ -532,18 +671,18 @@
self.assertRaises(
lib_exc.NotFound, lambda: self.client.show_recordset(
zone_uuid=self.zone['id'],
- recordset_uuid=lib_data_utils.rand_uuid()))
+ recordset_uuid=data_utils.rand_uuid()))
LOG.info('Ensure 404 NotFound status code is received if '
'zone ID is invalid.')
self.assertRaises(
lib_exc.NotFound, lambda: self.client.show_recordset(
- zone_uuid=lib_data_utils.rand_uuid(),
+ zone_uuid=data_utils.rand_uuid(),
recordset_uuid=body['id']))
@decorators.idempotent_id('c1d9f046-a2b1-11eb-aad4-74e5f9e2a801')
def test_create_recordset_for_other_tenant(self):
- recordset_data = data_utils.rand_recordset_data(
+ recordset_data = dns_data_utils.rand_recordset_data(
record_type='A', zone_name=self.zone['name'])
LOG.info('Create a Recordset as Alt tenant for a zone created by '
@@ -554,6 +693,8 @@
class RootRecordsetsTests(BaseRecordsetsTest):
+ credentials = ["admin", "primary", "system_admin", "alt"]
+
@classmethod
def setup_credentials(cls):
# Do not create network resources for these test.
@@ -578,13 +719,13 @@
@decorators.idempotent_id('48a081b9-4474-4da0-9b1a-6359a80456ce')
def test_list_zones_recordsets(self):
LOG.info('List recordsets')
- _, body = self.client.list_zones_recordsets()
+ body = self.client.list_zones_recordsets()[1]
self.assertGreater(len(body['recordsets']), 0)
@decorators.idempotent_id('65ec0495-81d9-4cfb-8007-9d93b32ae883')
def test_get_single_zones_recordsets(self):
- recordset_data = data_utils.rand_recordset_data(
+ recordset_data = dns_data_utils.rand_recordset_data(
record_type='A', zone_name=self.zone['name'], records=['10.1.0.2'])
LOG.info('Create a Recordset')
@@ -598,7 +739,7 @@
@decorators.idempotent_id('a8e41020-65be-453b-a8c1-2497d539c345')
def test_list_filter_zones_recordsets(self):
- recordset_data = data_utils.rand_recordset_data(
+ recordset_data = dns_data_utils.rand_recordset_data(
record_type='A', zone_name=self.zone['name'], records=['10.0.1.2'])
LOG.info('Create a Recordset')
@@ -609,11 +750,13 @@
self.zone['id'], zone_recordset['id'])
LOG.info('Create another zone')
- _, zone2 = self.zone_client.create_zone()
+ zone_name = dns_data_utils.rand_zone_name(name="list-filter",
+ suffix=self.tld_name)
+ zone2 = self.zone_client.create_zone(name=zone_name)[1]
self.addCleanup(self.wait_zone_delete, self.zone_client, zone2['id'])
LOG.info('Create another Recordset')
- recordset_data = data_utils.rand_recordset_data(
+ recordset_data = dns_data_utils.rand_recordset_data(
record_type='A', zone_name=zone2['name'],
records=['10.0.1.3'])
resp, zone_recordset2 = self.client.create_recordset(
@@ -623,7 +766,7 @@
self.zone['id'], zone_recordset2['id'])
LOG.info('List recordsets')
- _, body = self.client.list_zones_recordsets(params={"data": "10.0.*"})
+ body = self.client.list_zones_recordsets(params={"data": "10.0.*"})[1]
recordsets = body['recordsets']
@@ -640,12 +783,16 @@
@decorators.idempotent_id('7f4970bf-9aeb-4a3c-9afd-02f5a7178d35')
def test_list_zones_recordsets_zone_names(self):
- LOG.info('Create another zone')
- _, zone2 = self.zone_client.create_zone()
- self.addCleanup(self.wait_zone_delete, self.zone_client, zone2['id'])
-
LOG.info('List recordsets')
- _, body = self.client.list_zones_recordsets()
+ zone_name = dns_data_utils.rand_zone_name(name="zone_names",
+ suffix=self.tld_name)
+ alt_zone = self.zone_client.create_zone(
+ name=zone_name, wait_until=const.ACTIVE)[1]
+ self.addCleanup(self.wait_zone_delete,
+ self.zone_client,
+ alt_zone['id'])
+
+ body = self.client.list_zones_recordsets()[1]
recordsets = body['recordsets']
zone_names = set()
@@ -686,7 +833,9 @@
for client in clients_list:
if client == 'primary':
# Create a zone and wait till it's ACTIVE
- zone = self.zone_client.create_zone()[1]
+ zone_name = dns_data_utils.rand_zone_name(name="primary",
+ suffix=self.tld_name)
+ zone = self.zone_client.create_zone(name=zone_name)[1]
self.addCleanup(self.wait_zone_delete,
self.zone_client,
zone['id'])
@@ -694,15 +843,17 @@
self.zone_client, zone['id'], const.ACTIVE)
# Create a recordset and wait till it's ACTIVE
- recordset_data = data_utils.rand_recordset_data(
+ recordset_data = dns_data_utils.rand_recordset_data(
record_type='A', zone_name=zone['name'])
resp, body = self.client.create_recordset(
zone['id'], recordset_data)
+
self.addCleanup(
self.wait_recordset_delete, self.client,
self.zone['id'], body['id'])
self.assertEqual(const.PENDING, body['status'],
'Failed, expected status is PENDING')
+
LOG.info('Wait until the recordset is active')
waiters.wait_for_recordset_status(
self.client, zone['id'],
@@ -714,7 +865,9 @@
if client == 'alt':
# Create a zone and wait till it's ACTIVE
- alt_zone = self.alt_zone_client.create_zone()[1]
+ zone_name = dns_data_utils.rand_zone_name(name="alt",
+ suffix=self.tld_name)
+ alt_zone = self.alt_zone_client.create_zone(name=zone_name)[1]
self.addCleanup(self.wait_zone_delete,
self.alt_zone_client,
alt_zone['id'])
@@ -722,15 +875,17 @@
self.alt_zone_client, alt_zone['id'], const.ACTIVE)
# Create a recordset and wait till it's ACTIVE
- recordset_data = data_utils.rand_recordset_data(
+ recordset_data = dns_data_utils.rand_recordset_data(
record_type='A', zone_name=alt_zone['name'])
resp, body = self.alt_client.create_recordset(
alt_zone['id'], recordset_data)
+
self.addCleanup(
self.wait_recordset_delete, self.client,
self.zone['id'], body['id'])
self.assertEqual(const.PENDING, body['status'],
'Failed, expected status is PENDING')
+
LOG.info('Wait until the recordset is active')
waiters.wait_for_recordset_status(
self.alt_client, alt_zone['id'],
@@ -740,12 +895,13 @@
recordset_data['project_id'] = alt_zone['project_id']
recordsets_created['alt'] = recordset_data
+ LOG.info('Created resordsets are {}:'.format(recordsets_created))
return recordsets_created
@decorators.idempotent_id('9c0f58ad-1b31-4899-b184-5380720604e5')
def test_no_create_recordset_by_alt_tenant(self):
# try with name=A123456.zone.com.
- recordset_data = data_utils.rand_recordset_data(
+ recordset_data = dns_data_utils.rand_recordset_data(
record_type='A', zone_name=self.zone['name'])
resp, rrset = self.client.create_recordset(
self.zone['id'], recordset_data)
@@ -760,13 +916,13 @@
@decorators.idempotent_id('d4a9aad9-c778-429b-9a0c-4cd2b61a0a01')
def test_no_create_super_recordsets(self):
- zone_name = data_utils.rand_zone_name()
+ zone_name = dns_data_utils.rand_zone_name(suffix=self.tld_name)
LOG.info('Create a zone as a default user')
- _, zone = self.zone_client.create_zone(name='a.b.' + zone_name)
+ zone = self.zone_client.create_zone(name='a.b.' + zone_name)[1]
self.addCleanup(self.wait_zone_delete, self.zone_client, zone['id'])
- rrset_data = data_utils.rand_recordset_data(
+ rrset_data = dns_data_utils.rand_recordset_data(
record_type='A', zone_name=zone_name)
LOG.info('Create a zone as an alt user with existing superdomain')
@@ -777,23 +933,21 @@
@decorators.idempotent_id('3dbe244d-fa85-4afc-869b-0306388d8746')
def test_no_create_recordset_via_alt_domain(self):
- _, zone = self.zone_client.create_zone()
- _, alt_zone = self.alt_zone_client.create_zone()
- self.addCleanup(self.wait_zone_delete,
- self.zone_client,
- zone['id'])
+ zone_name = dns_data_utils.rand_zone_name(name="alt-domain",
+ suffix=self.tld_name)
+ alt_zone = self.alt_zone_client.create_zone(name=zone_name)[1]
self.addCleanup(self.wait_zone_delete,
self.alt_zone_client,
alt_zone['id'])
# alt attempts to create record with name A12345.{zone}
- recordset_data = data_utils.rand_recordset_data(
- record_type='A', zone_name=zone['name'])
+ recordset_data = dns_data_utils.rand_recordset_data(
+ record_type='A', zone_name=self.zone['name'])
self.assertRaises(
lib_exc.RestClientException,
lambda: self.alt_client.create_recordset(
- zone['id'],
+ self.zone['id'],
recordset_data
)
)
@@ -860,3 +1014,63 @@
{primary_project_id}, project_ids_api,
'Failed, unique project_ids {} are not as expected {}'.format(
project_ids_api, primary_project_id))
+
+
+class AdminManagedRecordsetTest(BaseRecordsetsTest):
+
+ credentials = ["primary", "admin", "system_admin"]
+
+ @classmethod
+ def setup_credentials(cls):
+ # Do not create network resources for these tests.
+ cls.set_network_resources()
+ super(AdminManagedRecordsetTest, cls).setup_credentials()
+
+ @classmethod
+ def setup_clients(cls):
+ super(AdminManagedRecordsetTest, cls).setup_clients()
+ if CONF.enforce_scope.designate:
+ cls.admin_client = cls.os_system_admin.dns_v2.RecordsetClient()
+ else:
+ cls.admin_client = cls.os_admin.dns_v2.RecordsetClient()
+ cls.client = cls.os_primary.dns_v2.RecordsetClient()
+ cls.zone_client = cls.os_primary.dns_v2.ZonesClient()
+
+ @decorators.idempotent_id('84164ff4-8e68-11ec-983f-201e8823901f')
+ def test_admin_updates_soa_and_ns_recordsets(self):
+ # HTTP headers to be used in the test
+ sudo_header = {'X-Auth-All-Projects': True}
+ managed_records_header = {'X-Designate-Edit-Managed-Records': True}
+ sudo_managed_headers = sudo_header.copy()
+ sudo_managed_headers.update(managed_records_header)
+
+ LOG.info('Primary user creates a Zone')
+ zone_name = dns_data_utils.rand_zone_name(name="update_soa_ns",
+ suffix=self.tld_name)
+ zone = self.zone_client.create_zone(
+ name=zone_name,
+ description='Zone for "managed recordsets update" test',
+ wait_until=const.ACTIVE)[1]
+ self.addCleanup(self.wait_zone_delete, self.zone_client, zone['id'])
+ recordsets = self.admin_client.list_recordset(
+ zone['id'], headers=sudo_header)[1]['recordsets']
+
+ LOG.info('As Admin try to update SOA and NS recordsets,'
+ ' Expected not allowed')
+ for recordset in recordsets:
+ if recordset['type'] == 'NS':
+ self.assertRaisesDns(
+ lib_exc.BadRequest, 'bad_request', 400,
+ self.admin_client.update_recordset,
+ zone['id'], recordset['id'],
+ recordet_data=dns_data_utils.rand_ns_records(),
+ headers=sudo_managed_headers, extra_headers=True)
+
+ if recordset['type'] == 'SOA':
+ self.assertRaisesDns(
+ lib_exc.BadRequest, 'bad_request', 400,
+ self.admin_client.update_recordset,
+ zone['id'], recordset['id'],
+ recordet_data=dns_data_utils.rand_soa_recordset(
+ zone['name']),
+ headers=sudo_managed_headers, extra_headers=True)
diff --git a/designate_tempest_plugin/tests/api/v2/test_recordset_validation.py b/designate_tempest_plugin/tests/api/v2/test_recordset_validation.py
index a7efe31..a708f6d 100644
--- a/designate_tempest_plugin/tests/api/v2/test_recordset_validation.py
+++ b/designate_tempest_plugin/tests/api/v2/test_recordset_validation.py
@@ -14,14 +14,17 @@
limitations under the License.
"""
import ddt
-from tempest.lib import exceptions
+
+from tempest import config
from tempest.lib import decorators
+from tempest.lib import exceptions
from designate_tempest_plugin.tests import base
from designate_tempest_plugin.common import waiters
-from designate_tempest_plugin import data_utils
+from designate_tempest_plugin import data_utils as dns_data_utils
+CONF = config.CONF
RECORDSETS_DATASET = [
'A',
'AAAA',
@@ -37,6 +40,8 @@
@ddt.ddt
class RecordsetValidationTest(base.BaseDnsV2Test):
+ credentials = ["admin", "primary", "system_admin"]
+
def setUp(self):
super(RecordsetValidationTest, self).setUp()
self._zone = None
@@ -51,13 +56,23 @@
def setup_clients(cls):
super(RecordsetValidationTest, cls).setup_clients()
+ if CONF.enforce_scope.designate:
+ cls.admin_tld_client = cls.os_system_admin.dns_v2.TldClient()
+ else:
+ cls.admin_tld_client = cls.os_admin.dns_v2.TldClient()
cls.recordset_client = cls.os_primary.dns_v2.RecordsetClient()
cls.zones_client = cls.os_primary.dns_v2.ZonesClient()
@property
def zone(self):
if self._zone is None:
- zone_data = data_utils.rand_zone_data()
+ tld_name = dns_data_utils.rand_zone_name(
+ name="recordsetvalidation")
+ self.class_tld = self.admin_tld_client.create_tld(
+ tld_name=tld_name[:-1])
+ zone_name = dns_data_utils.rand_zone_name(name="TestZone",
+ suffix=f'.{tld_name}')
+ zone_data = dns_data_utils.rand_zone_data(name=zone_name)
resp, body = self.zones_client.create_zone(**zone_data)
self._zone = body
self.addCleanup(self.wait_zone_delete,
@@ -77,7 +92,8 @@
data = ["b0rk"]
for i in data:
- model = data_utils.make_rand_recordset(self.zone['name'], rtype)
+ model = dns_data_utils.make_rand_recordset(
+ self.zone['name'], rtype)
model['data'] = i
self.assertRaisesDns(
@@ -91,11 +107,13 @@
def test_update_invalid(self, rtype):
data = ["b0rk"]
- post_model = data_utils.make_rand_recordset(self.zone['name'], rtype)
+ post_model = dns_data_utils.make_rand_recordset(
+ self.zone['name'], rtype)
recordset = self.create_recordset(post_model)
for i in data:
- model = data_utils.make_rand_recordset(self.zone['name'], rtype)
+ model = dns_data_utils.make_rand_recordset(
+ self.zone['name'], rtype)
model['data'] = i
self.assertRaisesDns(
exceptions.BadRequest, 'invalid_object', 400,
@@ -105,7 +123,7 @@
@decorators.idempotent_id('61da1015-291f-43d1-a1a8-345cff12d201')
def test_cannot_create_wildcard_NS_recordset(self):
- model = data_utils.wildcard_ns_recordset(self.zone['name'])
+ model = dns_data_utils.wildcard_ns_recordset(self.zone['name'])
self.assertRaisesDns(
exceptions.BadRequest, 'invalid_object', 400,
self.recordset_client.create_recordset, self.zone['id'], model
@@ -113,7 +131,7 @@
@decorators.idempotent_id('92f681aa-d953-4d18-b12e-81a9149ccfd9')
def test_cname_recordsets_cannot_have_more_than_one_record(self):
- post_model = data_utils.rand_cname_recordset(
+ post_model = dns_data_utils.rand_cname_recordset(
zone_name=self.zone['name'])
post_model['records'] = [
@@ -130,7 +148,7 @@
@decorators.idempotent_id('22a9544b-2382-4ed2-ba12-4dbaedb8e880')
@ddt.file_data("invalid_txt_dataset.json")
def test_cannot_create_TXT_with(self, data):
- post_model = data_utils.rand_txt_recordset(self.zone['name'], data)
+ post_model = dns_data_utils.rand_txt_recordset(self.zone['name'], data)
self.assertRaisesDns(
exceptions.BadRequest, 'invalid_object', 400,
self.recordset_client.create_recordset,
@@ -140,7 +158,7 @@
@decorators.idempotent_id('03e4f811-0c37-4ce2-8b16-662c824f8f18')
@ddt.file_data("valid_txt_dataset.json")
def test_create_TXT_with(self, data):
- post_model = data_utils.rand_txt_recordset(self.zone['name'], data)
+ post_model = dns_data_utils.rand_txt_recordset(self.zone['name'], data)
recordset = self.create_recordset(post_model)
waiters.wait_for_recordset_status(
@@ -149,7 +167,7 @@
@decorators.idempotent_id('775b3db5-ec60-4dd7-85d2-f05a9c544978')
@ddt.file_data("valid_txt_dataset.json")
def test_create_SPF_with(self, data):
- post_model = data_utils.rand_spf_recordset(self.zone['name'], data)
+ post_model = dns_data_utils.rand_spf_recordset(self.zone['name'], data)
recordset = self.create_recordset(post_model)
waiters.wait_for_recordset_status(
@@ -158,7 +176,7 @@
@decorators.idempotent_id('7fa7783f-1624-4122-bfb2-6cfbf7a5b49b')
@ddt.file_data("invalid_mx_dataset.json")
def test_cannot_create_MX_with(self, pref):
- post_model = data_utils.rand_mx_recordset(
+ post_model = dns_data_utils.rand_mx_recordset(
self.zone['name'], pref=pref
)
@@ -171,7 +189,7 @@
@decorators.idempotent_id('3016f998-4e4a-4712-b15a-4e8dfbc5a60b')
@ddt.data("invalid_sshfp_dataset.json")
def test_cannot_create_SSHFP_with(self, algo=None, finger=None):
- post_model = data_utils.rand_sshfp_recordset(
+ post_model = dns_data_utils.rand_sshfp_recordset(
zone_name=self.zone['name'],
algorithm_number=algo,
fingerprint_type=finger,
diff --git a/designate_tempest_plugin/tests/api/v2/test_service_statuses.py b/designate_tempest_plugin/tests/api/v2/test_service_statuses.py
index 0deeb74..c1f634b 100644
--- a/designate_tempest_plugin/tests/api/v2/test_service_statuses.py
+++ b/designate_tempest_plugin/tests/api/v2/test_service_statuses.py
@@ -24,27 +24,29 @@
LOG = logging.getLogger(__name__)
-class ServiceStatus(base.BaseDnsV2Test):
+class ServiceStatusAdmin(base.BaseDnsV2Test):
- credentials = ["primary", "admin", "system_admin", "alt"]
+ credentials = ["primary", "admin", "system_admin", "system_reader", "alt",
+ "project_reader", "project_member"]
+
+ mandatory_services = ['central', 'mdns', 'worker', 'producer']
+ service_status_fields = [
+ 'id', 'hostname', 'service_name', 'status', 'stats', 'capabilities',
+ 'heartbeated_at', 'created_at', 'updated_at', 'links']
@classmethod
def setup_credentials(cls):
# Do not create network resources for these test.
cls.set_network_resources()
- super(ServiceStatus, cls).setup_credentials()
+ super(ServiceStatusAdmin, cls).setup_credentials()
@classmethod
def setup_clients(cls):
- super(ServiceStatus, cls).setup_clients()
+ super(ServiceStatusAdmin, cls).setup_clients()
if CONF.enforce_scope.designate:
cls.admin_client = cls.os_system_admin.dns_v2.ServiceClient()
else:
cls.admin_client = cls.os_admin.dns_v2.ServiceClient()
- cls.client = cls.os_primary.dns_v2.ServiceClient()
-
- cls.primary_client = cls.os_primary.dns_v2.ServiceClient()
- cls.alt_client = cls.os_alt.dns_v2.ServiceClient()
@decorators.idempotent_id('bf277a76-8583-11eb-a557-74e5f9e2a801')
def test_admin_list_service_statuses(self):
@@ -57,8 +59,8 @@
LOG.info('Make sure that all expected/mandatory services are '
'listed in API response.')
- expected_services = ['central', 'mdns', 'worker', 'producer']
- for service in expected_services:
+
+ for service in self.mandatory_services:
self.assertIn(
service, [item[0] for item in services_statuses_tup],
"Failed, expected service: {} wasn't detected in API "
@@ -70,6 +72,46 @@
"Failed, not all listed services are in UP status, "
"services: {}".format(services_statuses_tup))
+ # Test RBAC
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin', 'os_system_reader']
+ else:
+ expected_allowed = ['os_admin']
+
+ self.check_list_show_RBAC_enforcement(
+ 'ServiceClient', 'list_statuses', expected_allowed, False)
+
+ @decorators.idempotent_id('fce0f704-c0ae-11ec-8213-201e8823901f')
+ def test_admin_show_service_status(self):
+
+ LOG.info('List services and get the IDs of mandatory services only')
+ services_ids = [
+ service['id'] for service in self.admin_client.list_statuses()
+ if service['service_name'] in self.mandatory_services]
+
+ LOG.info('Ensure all service status fields presents in response')
+ for id in services_ids:
+ service_show = self.admin_client.show_statuses(id)
+ self.assertEqual(
+ sorted(self.service_status_fields), sorted(service_show))
+
+
+class ServiceStatusNegative(base.BaseDnsV2Test):
+
+ credentials = ["primary", "alt"]
+
+ @classmethod
+ def setup_credentials(cls):
+ # Do not create network resources for these tests.
+ cls.set_network_resources()
+ super(ServiceStatusNegative, cls).setup_credentials()
+
+ @classmethod
+ def setup_clients(cls):
+ super(ServiceStatusNegative, cls).setup_clients()
+ cls.primary_client = cls.os_primary.dns_v2.ServiceClient()
+ cls.alt_client = cls.os_alt.dns_v2.ServiceClient()
+
@decorators.idempotent_id('d4753f76-de43-11eb-91d1-74e5f9e2a801')
def test_primary_is_forbidden_to_list_service_statuses(self):
diff --git a/designate_tempest_plugin/tests/api/v2/test_tld.py b/designate_tempest_plugin/tests/api/v2/test_tld.py
index 4e68e1c..e2894fe 100644
--- a/designate_tempest_plugin/tests/api/v2/test_tld.py
+++ b/designate_tempest_plugin/tests/api/v2/test_tld.py
@@ -29,7 +29,12 @@
class TldAdminTest(BaseTldTest):
- credentials = ["admin", "system_admin", "primary"]
+ credentials = ["admin", "system_admin", "system_reader",
+ "primary", "alt", "project_reader", "project_member"]
+
+ # Use a TLD suffix unique to this test class.
+ local_tld_suffix = '.'.join(["tldadmintest", 'in-addr.arpa',
+ CONF.dns.tld_suffix])
@classmethod
def setup_credentials(cls):
@@ -48,45 +53,41 @@
cls.primary_zone_client = cls.os_primary.dns_v2.ZonesClient()
@classmethod
- def resource_setup(cls):
- super(TldAdminTest, cls).resource_setup()
- # Create in-addr.arpa as well so that if there are any tests that run
- # in parallel to this one and try to create a floating ip, they can
- # still create it fine
- cls.admin_client.create_tld(
- tld_name='in-addr.arpa', ignore_errors=lib_exc.Conflict)
- cls.tld = cls.admin_client.create_tld(
- tld_name='com', ignore_errors=lib_exc.Conflict
- )
-
- @classmethod
- def resource_cleanup(cls):
- cls.admin_client.delete_tld(cls.tld[1]['id'])
- super(TldAdminTest, cls).resource_cleanup()
+ def _generate_tld_name(cls, test_name):
+ return '.'.join([data_utils.rand_name(name=test_name),
+ cls.local_tld_suffix])
@decorators.idempotent_id('52a4bb4b-4eff-4591-9dd3-ad98316806c3')
def test_create_tld(self):
- tld_data = {
- "name": "org",
- "description": "sample tld"}
+ tld_name = self._generate_tld_name("test_create_tld")
+ tld_data = {"name": tld_name, "description": "sample tld"}
LOG.info('Create a tld')
- _, tld = self.admin_client.create_tld(tld_data['name'],
- tld_data['description'])
+ tld = self.admin_client.create_tld(tld_data['name'],
+ tld_data['description'])[1]
self.addCleanup(self.admin_client.delete_tld, tld['id'])
- self.assertEqual(tld_data["name"], tld['name'])
+ self.assertEqual(tld_name, tld['name'])
+
+ # Test RBAC
+ expected_allowed = ['os_admin']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.append('os_system_admin')
+
+ self.check_CUD_RBAC_enforcement('TldClient', 'create_tld',
+ expected_allowed, False)
@decorators.idempotent_id('961bd2e8-d4d0-11eb-b8ee-74e5f9e2a801')
def test_create_duplicated_tlds(self):
- tld_data = {
- "name": "org", "description": "test_create_duplicated_tlds"}
+ tld_name = self._generate_tld_name("test_create_duplicated_tlds")
+ tld_data = {"name": tld_name,
+ "description": "test_create_duplicated_tlds"}
LOG.info('Create a first "org" TLD')
tld = self.admin_client.create_tld(
tld_data['name'], tld_data['description'])[1]
self.addCleanup(self.admin_client.delete_tld, tld['id'])
- self.assertEqual(tld_data["name"], tld['name'])
+ self.assertEqual(tld_name, tld['name'])
LOG.info('Try to create a second "org" TLD')
self.assertRaises(
@@ -96,15 +97,15 @@
@decorators.idempotent_id('0c0ab92e-d4db-11eb-b8ee-74e5f9e2a801')
def test_create_multiply_tlds(self):
- tlds = ['abc', 'def', 'gih']
- for tld_name in tlds:
- tld_data = {
- "name": tld_name, "description": "test_create_multiply_tlds"}
+ for _dummy in range(0, 2):
+ tld_name = self._generate_tld_name("test_create_multiply_tlds")
+ tld_data = {"name": tld_name,
+ "description": "test_create_multiply_tlds"}
LOG.info('Create a "{}" TLD'.format(tld_name))
tld = self.admin_client.create_tld(
tld_data['name'], tld_data['description'])[1]
self.addCleanup(self.admin_client.delete_tld, tld['id'])
- self.assertEqual(tld_data["name"], tld['name'])
+ self.assertEqual(tld_name, tld['name'])
@decorators.idempotent_id('52a4bb4b-4eff-4591-9dd3-ad98316806c3')
def test_create_invalid_tld(self):
@@ -121,21 +122,6 @@
lib_exc.BadRequest, self.admin_client.create_tld,
tld_name='org', description='test_create_invalid_tld' * 1000)
- @decorators.idempotent_id('06deced8-d4de-11eb-b8ee-74e5f9e2a801')
- def test_create_zone_for_not_existing_tld(self):
- LOG.info('Create an "org" TLD')
- tld_data = {"name": "org",
- "description": "test_create_zone_for_not_existing_tld"}
- tld = self.admin_client.create_tld(
- tld_data['name'], tld_data['description'])[1]
- self.addCleanup(self.admin_client.delete_tld, tld['id'])
- self.assertEqual(tld_data["name"], tld['name'])
-
- LOG.info('Try to create a Primary zone with "zzz" (not existing) TLD.')
- self.assertRaises(
- lib_exc.BadRequest, self.primary_zone_client.create_zone,
- name='example.zzz.')
-
@decorators.idempotent_id('757019c0-d4e2-11eb-b8ee-74e5f9e2a801')
def test_create_tld_as_primary_user(self):
tld_data = {
@@ -147,58 +133,104 @@
@decorators.idempotent_id('271af08c-2603-4f61-8eb1-05887b74e25a')
def test_show_tld(self):
+ tld_name = self._generate_tld_name("test_show_tld")
tld_data = {
- "name": "org",
+ "name": tld_name,
"description": "sample tld"}
LOG.info('Create a tld')
- _, tld = self.admin_client.create_tld(tld_data['name'],
- tld_data['description'])
+ tld = self.admin_client.create_tld(tld_data['name'],
+ tld_data['description'])[1]
self.addCleanup(self.admin_client.delete_tld, tld['id'])
LOG.info('Fetch the tld')
- _, body = self.admin_client.show_tld(tld['id'])
+ body = self.admin_client.show_tld(tld['id'])[1]
LOG.info('Ensure the fetched response matches the created tld')
self.assertExpected(tld, body, self.excluded_keys)
+ # Test RBAC
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin', 'os_system_reader']
+ else:
+ expected_allowed = ['os_admin']
+
+ self.check_list_show_RBAC_enforcement(
+ 'TldClient', 'show_tld', expected_allowed, False, tld['id'])
+
@decorators.idempotent_id('26708cb8-7126-48a7-9424-1c225e56e609')
def test_delete_tld(self):
LOG.info('Create a tld')
- _, tld = self.admin_client.create_tld()
+ tld_name = self._generate_tld_name("test_delete_tld")
+ tld = self.admin_client.create_tld(tld_name)[1]
self.addCleanup(self.admin_client.delete_tld, tld['id'],
ignore_errors=lib_exc.NotFound)
LOG.info('Delete the tld')
- _, body = self.admin_client.delete_tld(tld['id'])
+ self.admin_client.delete_tld(tld['id'])
- self.assertRaises(lib_exc.NotFound,
- lambda: self.admin_client.show_tld(tld['id']))
+ self.assertRaises(lib_exc.NotFound, self.admin_client.show_tld,
+ tld['id'])
+
+ # Test RBAC
+ expected_allowed = ['os_admin']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.append('os_system_admin')
+
+ self.check_CUD_RBAC_enforcement('TldClient', 'delete_tld',
+ expected_allowed, False, tld['id'])
@decorators.idempotent_id('95b13759-c85c-4791-829b-9591ca15779d')
def test_list_tlds(self):
LOG.info('List tlds')
- _, body = self.admin_client.list_tlds()
+ tld_name = self._generate_tld_name("test_list_tlds")
+ tld = self.admin_client.create_tld(tld_name)[1]
+ self.addCleanup(self.admin_client.delete_tld, tld['id'],
+ ignore_errors=lib_exc.NotFound)
+
+ body = self.admin_client.list_tlds()[1]
self.assertGreater(len(body['tlds']), 0)
+ # Test RBAC
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin', 'os_system_reader']
+ else:
+ expected_allowed = ['os_admin']
+
+ self.check_list_IDs_RBAC_enforcement(
+ 'TldClient', 'list_tlds', expected_allowed, [tld['id']],
+ params={'limit': 1000})
+
@decorators.idempotent_id('1a233812-48d9-4d15-af5e-9961744286ff')
def test_update_tld(self):
- _, tld = self.admin_client.create_tld()
+ tld_name = self._generate_tld_name("test_update_tld")
+ tld = self.admin_client.create_tld(tld_name)[1]
self.addCleanup(self.admin_client.delete_tld, tld['id'])
+ tld_name_2 = self._generate_tld_name("test_update_tld")
+
tld_data = {
- "name": "org",
+ "name": tld_name_2,
"description": "Updated description"
}
LOG.info('Update the tld')
- _, patch_tld = self.admin_client.update_tld(tld['id'],
- tld_data['name'], tld_data['description'])
+ patch_tld = self.admin_client.update_tld(tld['id'],
+ tld_data['name'], tld_data['description'])[1]
- self.assertEqual(tld_data["name"], patch_tld["name"])
+ self.assertEqual(tld_name_2, patch_tld["name"])
self.assertEqual(tld_data["description"], patch_tld["description"])
+ # Test RBAC
+ expected_allowed = ['os_admin']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.append('os_system_admin')
+
+ self.check_CUD_RBAC_enforcement(
+ 'TldClient', 'update_tld', expected_allowed, False, tld['id'],
+ tld_data['name'], tld_data['description'])
+
@decorators.idempotent_id('8116dcf5-a329-47d1-90be-5ff32f299c53')
def test_list_tlds_dot_json_fails(self):
uri = self.admin_client.get_uri('tlds.json')
diff --git a/designate_tempest_plugin/tests/api/v2/test_transfer_accepts.py b/designate_tempest_plugin/tests/api/v2/test_transfer_accepts.py
index 616431c..7696291 100644
--- a/designate_tempest_plugin/tests/api/v2/test_transfer_accepts.py
+++ b/designate_tempest_plugin/tests/api/v2/test_transfer_accepts.py
@@ -18,6 +18,7 @@
from tempest.lib import exceptions as lib_exc
from designate_tempest_plugin.tests import base
+from designate_tempest_plugin import data_utils as dns_data_utils
CONF = config.CONF
LOG = logging.getLogger(__name__)
@@ -27,9 +28,33 @@
excluded_keys = ['created_at', 'updated_at', 'key', 'links',
'zone_name']
+ @classmethod
+ def setup_clients(cls):
+ super(BaseTransferAcceptTest, cls).setup_clients()
+
+ if CONF.enforce_scope.designate:
+ cls.admin_tld_client = cls.os_system_admin.dns_v2.TldClient()
+ else:
+ cls.admin_tld_client = cls.os_admin.dns_v2.TldClient()
+
+ @classmethod
+ def resource_setup(cls):
+ super(BaseTransferAcceptTest, cls).resource_setup()
+
+ # Make sure we have an allowed TLD available
+ tld_name = dns_data_utils.rand_zone_name(name="BaseTransferAcceptTest")
+ cls.tld_name = f".{tld_name}"
+ cls.class_tld = cls.admin_tld_client.create_tld(tld_name=tld_name[:-1])
+
+ @classmethod
+ def resource_cleanup(cls):
+ cls.admin_tld_client.delete_tld(cls.class_tld[1]['id'])
+ super(BaseTransferAcceptTest, cls).resource_cleanup()
+
class TransferAcceptTest(BaseTransferAcceptTest):
- credentials = ["primary", "alt", "admin", "system_admin"]
+ credentials = ["primary", "alt", "admin", "system_admin", "system_reader",
+ "project_member", "project_reader"]
@classmethod
def setup_credentials(cls):
@@ -68,7 +93,10 @@
@decorators.idempotent_id('1c6baf97-a83e-4d2e-a5d8-9d37fb7808f3')
def test_create_transfer_accept(self):
LOG.info('Create a zone')
- _, zone = self.prm_zone_client.create_zone(wait_until='ACTIVE')
+ zone_name = dns_data_utils.rand_zone_name(
+ name="create_transfer_accept", suffix=self.tld_name)
+ zone = self.prm_zone_client.create_zone(name=zone_name,
+ wait_until='ACTIVE')[1]
self.addCleanup(
self.wait_zone_delete, self.admin_zone_client, zone['id'],
headers=self.all_projects_header,
@@ -87,6 +115,21 @@
"key": transfer_request['key'],
"zone_transfer_request_id": transfer_request['id']
}
+
+ # Test RBAC
+ # Note: Everyone can call this API and succeed if they know the
+ # transfer key.
+ expected_allowed = ['os_admin', 'os_primary', 'os_alt']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.append('os_system_admin')
+ expected_allowed.append('os_system_reader')
+ expected_allowed.append('os_project_member')
+ expected_allowed.append('os_project_reader')
+
+ self.check_CUD_RBAC_enforcement(
+ 'TransferAcceptClient', 'create_transfer_accept',
+ expected_allowed, True, data)
+
LOG.info('Create a zone transfer_accept')
_, transfer_accept = self.prm_accept_client.create_transfer_accept(
data)
@@ -97,7 +140,10 @@
@decorators.idempotent_id('37c6afbb-3ea3-4fd8-94ea-a426244f019a')
def test_show_transfer_accept(self):
LOG.info('Create a zone')
- _, zone = self.prm_zone_client.create_zone(wait_until='ACTIVE')
+ zone_name = dns_data_utils.rand_zone_name(name="show_transfer_accept",
+ suffix=self.tld_name)
+ zone = self.prm_zone_client.create_zone(name=zone_name,
+ wait_until='ACTIVE')[1]
self.addCleanup(
self.wait_zone_delete, self.admin_zone_client, zone['id'],
headers=self.all_projects_header,
@@ -129,11 +175,33 @@
'created transfer_accept')
self.assertExpected(transfer_accept, body, self.excluded_keys)
+ # TODO(johnsom) Test reader role once this bug is fixed:
+ # https://bugs.launchpad.net/tempest/+bug/1964509
+ # Test RBAC
+ expected_allowed = ['os_primary']
+
+ self.check_list_show_RBAC_enforcement(
+ 'TransferAcceptClient', 'show_transfer_accept', expected_allowed,
+ True, transfer_accept['id'])
+
+ # Test RBAC with x-auth-all-projects
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin']
+ else:
+ expected_allowed = ['os_admin']
+
+ self.check_list_show_RBAC_enforcement(
+ 'TransferAcceptClient', 'show_transfer_accept', expected_allowed,
+ True, transfer_accept['id'], headers=self.all_projects_header)
+
@decorators.idempotent_id('89b516f0-8c9f-11eb-a322-74e5f9e2a801')
def test_ownership_transferred_zone(self):
LOG.info('Create a Primary zone')
- zone = self.prm_zone_client.create_zone(wait_until='ACTIVE')[1]
+ zone_name = dns_data_utils.rand_zone_name(
+ name="ownership_transferred_zone", suffix=self.tld_name)
+ zone = self.prm_zone_client.create_zone(name=zone_name,
+ wait_until='ACTIVE')[1]
self.addCleanup(
self.wait_zone_delete, self.admin_zone_client, zone['id'],
headers=self.all_projects_header,
@@ -178,7 +246,10 @@
for _ in range(number_of_zones_to_transfer):
LOG.info('Create a Primary zone')
- zone = self.prm_zone_client.create_zone(wait_until='ACTIVE')[1]
+ zone_name = dns_data_utils.rand_zone_name(
+ name="list_transfer_accepts", suffix=self.tld_name)
+ zone = self.prm_zone_client.create_zone(name=zone_name,
+ wait_until='ACTIVE')[1]
self.addCleanup(
self.wait_zone_delete, self.admin_zone_client, zone['id'],
headers=self.all_projects_header,
@@ -205,6 +276,30 @@
self.assertEqual('COMPLETE', transfer_accept['status'])
transfer_request_ids.append(transfer_accept['id'])
+ # TODO(johnsom) Test reader role once this bug is fixed:
+ # https://bugs.launchpad.net/tempest/+bug/1964509
+ # Test RBAC - Users that are allowed to call list, but should get
+ # zero zones.
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin', 'os_system_reader']
+ else:
+ expected_allowed = ['os_admin']
+
+ self.check_list_RBAC_enforcement_count(
+ 'TransferAcceptClient', 'list_transfer_accept',
+ expected_allowed, 0)
+
+ # Test that users who should see the zone, can see it.
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin']
+ else:
+ expected_allowed = ['os_admin']
+
+ self.check_list_IDs_RBAC_enforcement(
+ 'TransferAcceptClient', 'list_transfer_accept',
+ expected_allowed, transfer_request_ids,
+ headers=self.all_projects_header)
+
# As Admin list all accepted zone transfers, expected:
# each previously transferred zone is listed.
# Note: This is an all-projects list call, so other tests running
@@ -214,7 +309,8 @@
admin_client_accept_ids = [
item['id'] for item in
self.admin_accept_client.list_transfer_accept(
- headers=self.all_projects_header, params={'limit': 1000})]
+ headers=self.all_projects_header,
+ params={'limit': 1000})[1]['transfer_accepts']]
for tr_id in transfer_request_ids:
self.assertIn(
tr_id, admin_client_accept_ids,
@@ -229,7 +325,7 @@
item['id'] for item in
self.admin_accept_client.list_transfer_accept(
headers=self.all_projects_header,
- params={'status': 'COMPLETE'})]
+ params={'status': 'COMPLETE'})[1]['transfer_accepts']]
for tr_id in transfer_request_ids:
self.assertIn(
tr_id, admin_client_accept_ids,
@@ -246,7 +342,7 @@
item['id'] for item in
self.admin_accept_client.list_transfer_accept(
headers=self.all_projects_header,
- params={'status': not_existing_status})]
+ params={'status': not_existing_status})[1]['transfer_accepts']]
self.assertEmpty(
admin_client_accept_ids,
"Failed, filtered list should be empty, but actually it's not, "
@@ -255,7 +351,10 @@
@decorators.idempotent_id('b6ac770e-a1d3-11eb-b534-74e5f9e2a801')
def test_show_transfer_accept_impersonate_another_project(self):
LOG.info('Create a zone as primary tenant')
- zone = self.prm_zone_client.create_zone(wait_until='ACTIVE')[1]
+ zone_name = dns_data_utils.rand_zone_name(
+ name="show_transfer_accept_impersonate", suffix=self.tld_name)
+ zone = self.prm_zone_client.create_zone(name=zone_name,
+ wait_until='ACTIVE')[1]
# In case when something goes wrong with the test and E2E
# scenario fails for some reason, we'll use Admin tenant
@@ -302,6 +401,18 @@
self.addCleanup(
self.wait_zone_delete, self.alt_zone_client, zone['id'])
+ # Test RBAC with x-auth-sudo-project-id header
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin']
+ else:
+ expected_allowed = ['os_admin']
+
+ self.check_list_show_RBAC_enforcement(
+ 'TransferAcceptClient', 'show_transfer_accept', expected_allowed,
+ True, transfer_accept['id'],
+ headers={'x-auth-sudo-project-id':
+ self.os_alt.credentials.project_id})
+
class TransferAcceptTestNegative(BaseTransferAcceptTest):
@@ -323,7 +434,10 @@
@decorators.idempotent_id('324a3e80-a1cc-11eb-b534-74e5f9e2a801')
def test_create_transfer_accept_using_invalid_key(self):
LOG.info('Create a zone')
- zone = self.zone_client.create_zone(wait_until='ACTIVE')[1]
+ zone_name = dns_data_utils.rand_zone_name(
+ name="create_transfer_accept_invalid_key", suffix=self.tld_name)
+ zone = self.zone_client.create_zone(name=zone_name,
+ wait_until='ACTIVE')[1]
self.addCleanup(self.wait_zone_delete, self.zone_client, zone['id'])
LOG.info('Create a zone transfer_request')
@@ -335,7 +449,8 @@
transfer_request['id']
)
- data = {"key": data_utils.rand_password(len(transfer_request['key'])),
+ data = {"key": data_utils.rand_password(
+ len(transfer_request['key'])),
"zone_transfer_request_id": transfer_request['id']}
LOG.info('Create a zone transfer_accept using invalid key')
@@ -346,7 +461,10 @@
@decorators.idempotent_id('23afb948-a1ce-11eb-b534-74e5f9e2a801')
def test_create_transfer_accept_using_deleted_transfer_request_id(self):
LOG.info('Create a zone')
- zone = self.zone_client.create_zone(wait_until='ACTIVE')[1]
+ zone_name = dns_data_utils.rand_zone_name(
+ name="create_transfer_accept_deleted_id", suffix=self.tld_name)
+ zone = self.zone_client.create_zone(name=zone_name,
+ wait_until='ACTIVE')[1]
self.addCleanup(self.wait_zone_delete, self.zone_client, zone['id'])
LOG.info('Create a zone transfer_request')
diff --git a/designate_tempest_plugin/tests/api/v2/test_transfer_request.py b/designate_tempest_plugin/tests/api/v2/test_transfer_request.py
index 6c8ed07..caf1876 100644
--- a/designate_tempest_plugin/tests/api/v2/test_transfer_request.py
+++ b/designate_tempest_plugin/tests/api/v2/test_transfer_request.py
@@ -27,9 +27,34 @@
class BaseTransferRequestTest(base.BaseDnsV2Test):
excluded_keys = ['created_at', 'updated_at', 'key', 'links']
+ @classmethod
+ def setup_clients(cls):
+ super(BaseTransferRequestTest, cls).setup_clients()
+
+ if CONF.enforce_scope.designate:
+ cls.admin_tld_client = cls.os_system_admin.dns_v2.TldClient()
+ else:
+ cls.admin_tld_client = cls.os_admin.dns_v2.TldClient()
+
+ @classmethod
+ def resource_setup(cls):
+ super(BaseTransferRequestTest, cls).resource_setup()
+
+ # Make sure we have an allowed TLD available
+ tld_name = dns_data_utils.rand_zone_name(
+ name="BaseTransferRequestTest")
+ cls.tld_name = f".{tld_name}"
+ cls.class_tld = cls.admin_tld_client.create_tld(tld_name=tld_name[:-1])
+
+ @classmethod
+ def resource_cleanup(cls):
+ cls.admin_tld_client.delete_tld(cls.class_tld[1]['id'])
+ super(BaseTransferRequestTest, cls).resource_cleanup()
+
class TransferRequestTest(BaseTransferRequestTest):
- credentials = ["primary", "alt", "admin", "system_admin"]
+ credentials = ["primary", "alt", "admin", "system_admin", "system_reader",
+ "project_member", "project_reader"]
@classmethod
def setup_credentials(cls):
@@ -54,11 +79,23 @@
@decorators.idempotent_id('2381d489-ad84-403d-b0a2-8b77e4e966bf')
def test_create_transfer_request(self):
LOG.info('Create a zone')
- _, zone = self.zone_client.create_zone()
+ zone_name = dns_data_utils.rand_zone_name(
+ name="create_transfer_request", suffix=self.tld_name)
+ zone = self.zone_client.create_zone(name=zone_name)[1]
self.addCleanup(self.wait_zone_delete, self.zone_client, zone['id'])
+ # Test RBAC
+ expected_allowed = ['os_admin', 'os_primary', 'os_alt']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.append('os_system_admin')
+ expected_allowed.append('os_project_member')
+
+ self.check_CUD_RBAC_enforcement(
+ 'TransferRequestClient', 'create_transfer_request',
+ expected_allowed, True, zone['id'])
+
LOG.info('Create a zone transfer_request')
- _, transfer_request = self.client.create_transfer_request(zone['id'])
+ transfer_request = self.client.create_transfer_request(zone['id'])[1]
self.addCleanup(self.client.delete_transfer_request,
transfer_request['id'])
@@ -68,15 +105,17 @@
@decorators.idempotent_id('5deae1ac-7c14-42dc-b14e-4e4b2725beb7')
def test_create_transfer_request_scoped(self):
LOG.info('Create a zone')
- _, zone = self.zone_client.create_zone()
+ zone_name = dns_data_utils.rand_zone_name(
+ name="create_transfer_request_scoped", suffix=self.tld_name)
+ zone = self.zone_client.create_zone(name=zone_name)[1]
self.addCleanup(self.wait_zone_delete, self.zone_client, zone['id'])
transfer_request_data = dns_data_utils.rand_transfer_request_data(
target_project_id=self.os_alt.credentials.project_id)
LOG.info('Create a scoped zone transfer_request')
- _, transfer_request = self.client.create_transfer_request(
- zone['id'], transfer_request_data)
+ transfer_request = self.client.create_transfer_request(
+ zone['id'], transfer_request_data)[1]
self.addCleanup(self.client.delete_transfer_request,
transfer_request['id'])
@@ -86,11 +125,13 @@
@decorators.idempotent_id('4505152f-0a9c-4f02-b385-2216c914a0be')
def test_create_transfer_request_empty_body(self):
LOG.info('Create a zone')
- _, zone = self.zone_client.create_zone()
+ zone_name = dns_data_utils.rand_zone_name(
+ name="create_transfer_request_empty", suffix=self.tld_name)
+ zone = self.zone_client.create_zone(name=zone_name)[1]
self.addCleanup(self.wait_zone_delete, self.zone_client, zone['id'])
LOG.info('Create a zone transfer_request')
- _, transfer_request = self.client.create_transfer_request_empty_body(
- zone['id'])
+ transfer_request = self.client.create_transfer_request_empty_body(
+ zone['id'])[1]
self.addCleanup(self.client.delete_transfer_request,
transfer_request['id'])
@@ -100,26 +141,60 @@
@decorators.idempotent_id('64a7be9f-8371-4ce1-a242-c1190de7c985')
def test_show_transfer_request(self):
LOG.info('Create a zone')
- _, zone = self.zone_client.create_zone()
+ zone_name = dns_data_utils.rand_zone_name(
+ name="show_transfer_request", suffix=self.tld_name)
+ zone = self.zone_client.create_zone(name=zone_name)[1]
self.addCleanup(self.wait_zone_delete, self.zone_client, zone['id'])
LOG.info('Create a zone transfer_request')
- _, transfer_request = self.client.create_transfer_request(zone['id'])
+ transfer_request = self.client.create_transfer_request(zone['id'])[1]
self.addCleanup(self.client.delete_transfer_request,
transfer_request['id'])
LOG.info('Fetch the transfer_request')
- _, body = self.client.show_transfer_request(transfer_request['id'])
+ body = self.client.show_transfer_request(transfer_request['id'])[1]
LOG.info('Ensure the fetched response matches the '
'created transfer_request')
self.assertExpected(transfer_request, body, self.excluded_keys)
+ # TODO(johnsom) Test reader role once this bug is fixed:
+ # https://bugs.launchpad.net/tempest/+bug/1964509
+ # Test RBAC
+ # Note: The create service client does not define a target project
+ # ID, so everyone should be able to see it.
+ expected_allowed = ['os_admin', 'os_primary', 'os_alt']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.extend(['os_system_admin', 'os_system_reader',
+ 'os_project_member', 'os_project_reader'])
+
+ self.check_list_show_RBAC_enforcement(
+ 'TransferRequestClient', 'show_transfer_request', expected_allowed,
+ True, transfer_request['id'])
+
+ # Test RBAC with x-auth-all-projects and x-auth-sudo-project-id header
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin']
+ else:
+ expected_allowed = ['os_admin']
+
+ self.check_list_show_RBAC_enforcement(
+ 'TransferRequestClient', 'show_transfer_request', expected_allowed,
+ True, transfer_request['id'], headers=self.all_projects_header)
+ # TODO(johnsom) Move this down to the impersonate test below when the
+ # bug is resolved and the test is not skipped.
+ self.check_list_show_RBAC_enforcement(
+ 'TransferRequestClient', 'show_transfer_request', expected_allowed,
+ True, transfer_request['id'],
+ headers={'x-auth-sudo-project-id': self.client.project_id})
+
@decorators.idempotent_id('5bed4582-9cfb-11eb-a160-74e5f9e2a801')
@decorators.skip_because(bug="1926572")
def test_show_transfer_request_impersonate_another_project(self):
LOG.info('Create a zone')
- zone = self.zone_client.create_zone()[1]
+ zone_name = dns_data_utils.rand_zone_name(
+ name="show_transfer_request_impersonate", suffix=self.tld_name)
+ zone = self.zone_client.create_zone(name=zone_name)[1]
self.addCleanup(self.wait_zone_delete, self.zone_client, zone['id'])
LOG.info('Create a zone transfer_request')
@@ -148,20 +223,22 @@
# Checks the target of a scoped transfer request can see
# the request.
LOG.info('Create a zone')
- _, zone = self.zone_client.create_zone()
+ zone_name = dns_data_utils.rand_zone_name(
+ name="create_transfer_request_as_target", suffix=self.tld_name)
+ zone = self.zone_client.create_zone(name=zone_name)[1]
self.addCleanup(self.wait_zone_delete, self.zone_client, zone['id'])
transfer_request_data = dns_data_utils.rand_transfer_request_data(
target_project_id=self.os_alt.credentials.project_id)
LOG.info('Create a scoped zone transfer_request')
- _, transfer_request = self.client.create_transfer_request(
- zone['id'], transfer_request_data)
+ transfer_request = self.client.create_transfer_request(
+ zone['id'], transfer_request_data)[1]
self.addCleanup(self.client.delete_transfer_request,
transfer_request['id'])
LOG.info('Fetch the transfer_request as the target')
- _, body = self.alt_client.show_transfer_request(transfer_request['id'])
+ body = self.alt_client.show_transfer_request(transfer_request['id'])[1]
LOG.info('Ensure the fetched response matches the '
'created transfer_request')
@@ -169,48 +246,102 @@
"project_id"]
self.assertExpected(transfer_request, body, excluded_keys)
+ # TODO(johnsom) Test reader role once this bug is fixed:
+ # https://bugs.launchpad.net/tempest/+bug/1964509
+ # Test RBAC when a transfer target project is specified.
+ expected_allowed = ['os_primary', 'os_alt']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.append('os_system_admin')
+ else:
+ expected_allowed.append('os_admin')
+
+ self.check_list_show_RBAC_enforcement(
+ 'TransferRequestClient', 'show_transfer_request', expected_allowed,
+ True, transfer_request['id'])
+
@decorators.idempotent_id('7d81c487-aa15-44c4-b3e5-424ab9e6a3e5')
def test_delete_transfer_request(self):
LOG.info('Create a zone')
- _, zone = self.zone_client.create_zone()
+ zone_name = dns_data_utils.rand_zone_name(
+ name="delete_transfer_request", suffix=self.tld_name)
+ zone = self.zone_client.create_zone(name=zone_name)[1]
self.addCleanup(self.wait_zone_delete, self.zone_client, zone['id'])
LOG.info('Create a transfer_request')
- _, transfer_request = self.client.create_transfer_request(zone['id'])
+ transfer_request = self.client.create_transfer_request(zone['id'])[1]
self.addCleanup(self.client.delete_transfer_request,
transfer_request['id'],
ignore_errors=lib_exc.NotFound)
+ # Test RBAC
+ expected_allowed = ['os_admin', 'os_primary', 'os_alt']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.append('os_system_admin')
+ expected_allowed.append('os_project_member')
+
+ self.check_CUD_RBAC_enforcement(
+ 'TransferRequestClient', 'delete_transfer_request',
+ expected_allowed, True, transfer_request['id'])
+
LOG.info('Delete the transfer_request')
- _, body = self.client.delete_transfer_request(transfer_request['id'])
+ self.client.delete_transfer_request(transfer_request['id'])
self.assertRaises(lib_exc.NotFound,
lambda: self.client.show_transfer_request(transfer_request['id']))
@decorators.idempotent_id('ddd42a19-1768-428c-846e-32f9d6493011')
def test_list_transfer_requests(self):
LOG.info('Create a zone')
- _, zone = self.zone_client.create_zone()
+ zone_name = dns_data_utils.rand_zone_name(
+ name="list_transfer_request", suffix=self.tld_name)
+ zone = self.zone_client.create_zone(name=zone_name)[1]
self.addCleanup(self.wait_zone_delete, self.zone_client, zone['id'])
LOG.info('Create a zone transfer_request')
- _, transfer_request = self.client.create_transfer_request(zone['id'])
+ transfer_request = self.client.create_transfer_request(zone['id'])[1]
self.addCleanup(self.client.delete_transfer_request,
transfer_request['id'])
LOG.info('List transfer_requests')
- _, body = self.client.list_transfer_requests()
+ body = self.client.list_transfer_requests()[1]
self.assertGreater(len(body['transfer_requests']), 0)
+ # TODO(johnsom) Test reader role once this bug is fixed:
+ # https://bugs.launchpad.net/tempest/+bug/1964509
+ # Test RBAC - Users that are allowed to call list, but should get
+ # zero zones.
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_admin', 'os_project_member',
+ 'os_project_reader']
+ else:
+ expected_allowed = ['os_alt']
+
+ self.check_list_RBAC_enforcement_count(
+ 'TransferRequestClient', 'list_transfer_requests',
+ expected_allowed, 0)
+
+ # Test that users who should see the zone, can see it.
+ expected_allowed = ['os_primary']
+
+ self.check_list_IDs_RBAC_enforcement(
+ 'TransferRequestClient', 'list_transfer_requests',
+ expected_allowed, [transfer_request['id']])
+
@decorators.idempotent_id('db985892-9d02-11eb-a160-74e5f9e2a801')
def test_list_transfer_requests_all_projects(self):
LOG.info('Create a Primary zone')
- primary_zone = self.zone_client.create_zone()[1]
+ zone_name = dns_data_utils.rand_zone_name(
+ name="list_transfer_request_all_projects", suffix=self.tld_name)
+ primary_zone = self.zone_client.create_zone(name=zone_name)[1]
self.addCleanup(self.wait_zone_delete,
self.zone_client, primary_zone['id'])
LOG.info('Create an Alt zone')
- alt_zone = self.alt_zone_client.create_zone()[1]
+ alt_zone_name = dns_data_utils.rand_zone_name(
+ name="list_transfer_request_all_projects_alt",
+ suffix=self.tld_name)
+ alt_zone = self.alt_zone_client.create_zone(name=alt_zone_name)[1]
self.addCleanup(self.wait_zone_delete,
self.alt_zone_client, alt_zone['id'])
@@ -248,15 +379,30 @@
"Failed, transfer request ID:{} wasn't found in "
"listed IDs{}".format(request_id, request_ids))
+ # Test RBAC with x-auth-all-projects
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin']
+ else:
+ expected_allowed = ['os_admin']
+
+ self.check_list_IDs_RBAC_enforcement(
+ 'TransferRequestClient', 'list_transfer_requests',
+ expected_allowed, [primary_transfer_request['id']],
+ headers=self.all_projects_header)
+
@decorators.idempotent_id('bee42f38-e666-4b85-a710-01f40ea1e56a')
def test_list_transfer_requests_impersonate_another_project(self):
LOG.info('Create a Primary zone')
- primary_zone = self.zone_client.create_zone()[1]
+ zone_name = dns_data_utils.rand_zone_name(
+ name="list_transfer_request_impersonate", suffix=self.tld_name)
+ primary_zone = self.zone_client.create_zone(name=zone_name)[1]
self.addCleanup(self.wait_zone_delete,
self.zone_client, primary_zone['id'])
LOG.info('Create an Alt zone')
- alt_zone = self.alt_zone_client.create_zone()[1]
+ alt_zone_name = dns_data_utils.rand_zone_name(
+ name="list_transfer_request_impersonate_alt", suffix=self.tld_name)
+ alt_zone = self.alt_zone_client.create_zone(name=alt_zone_name)[1]
self.addCleanup(self.wait_zone_delete,
self.alt_zone_client, alt_zone['id'])
@@ -279,27 +425,66 @@
self.assertEqual([alt_transfer_request['id']], request_ids)
+ # Test RBAC with x-auth-all-projects and x-auth-sudo-project-id header
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin']
+ else:
+ expected_allowed = ['os_admin']
+
+ self.check_list_IDs_RBAC_enforcement(
+ 'TransferRequestClient', 'list_transfer_requests',
+ expected_allowed, [primary_transfer_request['id']],
+ headers={'x-auth-sudo-project-id': self.client.project_id})
+
@decorators.idempotent_id('de5e9d32-c723-4518-84e5-58da9722cc13')
def test_update_transfer_request(self):
LOG.info('Create a zone')
- _, zone = self.zone_client.create_zone()
+ zone_name = dns_data_utils.rand_zone_name(
+ name="update_transfer_request", suffix=self.tld_name)
+ zone = self.zone_client.create_zone(name=zone_name)[1]
self.addCleanup(self.wait_zone_delete, self.zone_client, zone['id'])
LOG.info('Create a zone transfer_request')
- _, transfer_request = self.client.create_transfer_request(zone['id'])
+ transfer_request = self.client.create_transfer_request(zone['id'])[1]
self.addCleanup(self.client.delete_transfer_request,
transfer_request['id'])
LOG.info('Update the transfer_request')
data = {
- "description": "demo descripion"
+ "description": "demo description"
}
- _, transfer_request_patch = self.client.update_transfer_request(
- transfer_request['id'], transfer_request_data=data)
+ transfer_request_patch = self.client.update_transfer_request(
+ transfer_request['id'], transfer_request_data=data)[1]
self.assertEqual(data['description'],
transfer_request_patch['description'])
+ # Test RBAC
+ expected_allowed = ['os_admin', 'os_primary']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.append('os_system_admin')
+
+ self.check_CUD_RBAC_enforcement(
+ 'TransferRequestClient', 'update_transfer_request',
+ expected_allowed, True,
+ transfer_request['id'], transfer_request_data=data)
+
+ # Test RBAC with x-auth-all-projects and x-auth-sudo-project-id header
+ expected_allowed = ['os_admin', 'os_primary']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.append('os_system_admin')
+
+ self.check_CUD_RBAC_enforcement(
+ 'TransferRequestClient', 'update_transfer_request',
+ expected_allowed, False,
+ transfer_request['id'], transfer_request_data=data,
+ headers=self.all_projects_header)
+ self.check_CUD_RBAC_enforcement(
+ 'TransferRequestClient', 'update_transfer_request',
+ expected_allowed, False,
+ transfer_request['id'], transfer_request_data=data,
+ headers={'x-auth-sudo-project-id': self.client.project_id})
+
@decorators.idempotent_id('73b754a9-e856-4fd6-80ba-e8d1b80f5dfa')
def test_list_transfer_requests_dot_json_fails(self):
uri = self.client.get_uri('transfer_requests.json')
@@ -309,6 +494,7 @@
class TestTransferRequestNotFound(BaseTransferRequestTest):
+ credentials = ["admin", "primary", "system_admin"]
@classmethod
def setup_credentials(cls):
@@ -321,6 +507,15 @@
super(TestTransferRequestNotFound, cls).setup_clients()
cls.client = cls.os_primary.dns_v2.TransferRequestClient()
+ @decorators.idempotent_id('39131f7c-e9bb-4f92-a325-444a675e1b3d')
+ def test_create_transfer_request_404(self):
+ e = self.assertRaises(lib_exc.NotFound,
+ self.client.create_transfer_request,
+ data_utils.rand_uuid())
+ self.assertEqual(404, e.resp.status)
+ self.assertEqual(404, e.resp_body['code'])
+ self.assertEqual("zone_not_found", e.resp_body['type'])
+
@decorators.idempotent_id('d255f72f-ba24-43df-9dba-011ed7f4625d')
def test_show_transfer_request_404(self):
e = self.assertRaises(lib_exc.NotFound,
@@ -346,11 +541,10 @@
self.assertEqual(404, resp.status)
self.assertEqual(404, resp_body['code'])
self.assertEqual("zone_transfer_request_not_found", resp_body['type'])
- self.assertEqual("Could not find ZoneTransferRequest",
- resp_body['message'])
class TestTransferRequestInvalidId(BaseTransferRequestTest):
+ credentials = ["admin", "primary", "system_admin"]
@classmethod
def setup_credentials(cls):
diff --git a/designate_tempest_plugin/tests/api/v2/test_tsigkey.py b/designate_tempest_plugin/tests/api/v2/test_tsigkey.py
index 292b821..95a4e21 100644
--- a/designate_tempest_plugin/tests/api/v2/test_tsigkey.py
+++ b/designate_tempest_plugin/tests/api/v2/test_tsigkey.py
@@ -19,6 +19,7 @@
from tempest.lib import exceptions as lib_exc
from designate_tempest_plugin.tests import base
+from designate_tempest_plugin import data_utils as dns_data_utils
CONF = config.CONF
LOG = logging.getLogger(__name__)
@@ -27,9 +28,33 @@
class BaseTsigkeyTest(base.BaseDnsV2Test):
excluded_keys = ['created_at', 'updated_at', 'links']
+ @classmethod
+ def setup_clients(cls):
+ super(BaseTsigkeyTest, cls).setup_clients()
+
+ if CONF.enforce_scope.designate:
+ cls.admin_tld_client = cls.os_system_admin.dns_v2.TldClient()
+ else:
+ cls.admin_tld_client = cls.os_admin.dns_v2.TldClient()
+
+ @classmethod
+ def resource_setup(cls):
+ super(BaseTsigkeyTest, cls).resource_setup()
+
+ # Make sure we have an allowed TLD available
+ tld_name = dns_data_utils.rand_zone_name(name="BaseTsigkeyTest")
+ cls.tld_name = f".{tld_name}"
+ cls.class_tld = cls.admin_tld_client.create_tld(tld_name=tld_name[:-1])
+
+ @classmethod
+ def resource_cleanup(cls):
+ cls.admin_tld_client.delete_tld(cls.class_tld[1]['id'])
+ super(BaseTsigkeyTest, cls).resource_cleanup()
+
class TsigkeyAdminTest(BaseTsigkeyTest):
- credentials = ["primary", "admin", "system_admin"]
+ credentials = ["primary", "admin", "system_admin", "system_reader",
+ "project_member", "project_reader", "alt"]
@classmethod
def setup_credentials(cls):
@@ -42,95 +67,413 @@
super(TsigkeyAdminTest, cls).setup_clients()
if CONF.enforce_scope.designate:
cls.admin_client = cls.os_system_admin.dns_v2.TsigkeyClient()
+ cls.pool_admin_client = cls.os_system_admin.dns_v2.PoolClient()
else:
cls.admin_client = cls.os_admin.dns_v2.TsigkeyClient()
+ cls.pool_admin_client = cls.os_admin.dns_v2.PoolClient()
+
cls.zone_client = cls.os_primary.dns_v2.ZonesClient()
+ cls.primary_client = cls.os_primary.dns_v2.TsigkeyClient()
@decorators.idempotent_id('e7b484e3-7ed5-4840-89d7-1e696986f8e4')
- def test_create_tsigkey(self):
+ def test_create_tsigkey_for_zone(self):
LOG.info('Create a resource')
- _, zone = self.zone_client.create_zone()
+ zone_name = dns_data_utils.rand_zone_name(
+ name="create_tsigkey_for_zone", suffix=self.tld_name)
+ zone = self.zone_client.create_zone(name=zone_name)[1]
self.addCleanup(self.wait_zone_delete, self.zone_client, zone['id'])
tsigkey_data = {
- "name": "Example tsigkey",
+ "name": dns_data_utils.rand_zone_name(
+ 'test_create_tsigkey_for_zone'),
"algorithm": "hmac-sha256",
"secret": "SomeSecretKey",
- "scope": "POOL",
+ "scope": "ZONE",
"resource_id": zone['id']}
LOG.info('Create a tsigkey')
- _, tsigkey = self.admin_client.create_tsigkey(
+ tsigkey = self.admin_client.create_tsigkey(
tsigkey_data['resource_id'],
tsigkey_data['name'], tsigkey_data['algorithm'],
- tsigkey_data['secret'], tsigkey_data['scope'])
+ tsigkey_data['secret'], tsigkey_data['scope'])[1]
self.addCleanup(self.admin_client.delete_tsigkey, tsigkey['id'])
self.assertEqual(tsigkey_data["name"], tsigkey['name'])
+ self.assertEqual(tsigkey_data["scope"], tsigkey['scope'])
+
+ @decorators.idempotent_id('45975fa6-d726-11eb-beba-74e5f9e2a801')
+ def test_create_tsigkey_for_pool(self):
+ LOG.info('Get the valid pool ID from list of pools')
+ pool = self.pool_admin_client.list_pools()[1]['pools'][0]
+
+ LOG.info('Create a tsigkey')
+ tsigkey_data = {
+ "name": dns_data_utils.rand_zone_name('Example_Key'),
+ "algorithm": "hmac-sha256",
+ "secret": "SomeSecretKey",
+ "scope": "POOL",
+ "resource_id": pool['id']}
+ tsigkey = self.admin_client.create_tsigkey(
+ tsigkey_data['resource_id'],
+ tsigkey_data['name'], tsigkey_data['algorithm'],
+ tsigkey_data['secret'], tsigkey_data['scope'])[1]
+ self.addCleanup(self.admin_client.delete_tsigkey, tsigkey['id'])
+ self.assertEqual(tsigkey_data["name"], tsigkey['name'])
+ self.assertEqual(tsigkey_data["scope"], tsigkey['scope'])
+
+ # Test RBAC: repeat the same create call for each credential
+ expected_allowed = ['os_admin']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.append('os_system_admin')
+
+ self.check_CUD_RBAC_enforcement(
+ 'TsigkeyClient', 'create_tsigkey', expected_allowed, False,
+ tsigkey_data['resource_id'],
+ tsigkey_data['name'], tsigkey_data['algorithm'],
+ tsigkey_data['secret'], tsigkey_data['scope'])
@decorators.idempotent_id('d46e5e86-a18c-4315-aa0c-95a00e816fbf')
def test_list_tsigkey(self):
LOG.info('Create a resource')
- _, zone = self.zone_client.create_zone()
+ zone_name = dns_data_utils.rand_zone_name(
+ name="list_tsigkey", suffix=self.tld_name)
+ zone = self.zone_client.create_zone(name=zone_name)[1]
self.addCleanup(self.wait_zone_delete, self.zone_client, zone['id'])
LOG.info('Create a tsigkey')
- _, tsigkey = self.admin_client.create_tsigkey(resource_id=zone['id'])
+ tsigkey = self.admin_client.create_tsigkey(resource_id=zone['id'])[1]
self.addCleanup(self.admin_client.delete_tsigkey, tsigkey['id'])
- _, body = self.admin_client.list_tsigkeys()
+ body = self.admin_client.list_tsigkeys()[1]
self.assertGreater(len(body['tsigkeys']), 0)
+ # Test RBAC
+ expected_allowed = ['os_admin']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin', 'os_system_reader']
+ self.check_list_IDs_RBAC_enforcement(
+ 'TsigkeyClient', 'list_tsigkeys', expected_allowed,
+ [tsigkey['id']])
+
+ @decorators.idempotent_id('3a7f2b1c-9e4d-4c6a-8f21-5d0b7e9c1a42')
+ def test_list_tsigkeys_limit_results(self):
+ for i in range(3):
+ LOG.info('As Primary user create a zone: {} '.format(i))
+ zone_name = dns_data_utils.rand_zone_name(
+ name="list_tsigkey_limit", suffix=self.tld_name)
+ zone = self.zone_client.create_zone(name=zone_name)[1]
+ self.addCleanup(
+ self.wait_zone_delete, self.zone_client, zone['id'])
+ LOG.info('As Admin user create a tsigkey: {} '.format(i))
+ tsigkey = self.admin_client.create_tsigkey(
+ resource_id=zone['id'])[1]
+ self.addCleanup(self.admin_client.delete_tsigkey, tsigkey['id'])
+ LOG.info('As Admin client, list all tsigkey using '
+ 'URL query: "limit=2"')
+ body = self.admin_client.list_tsigkeys(params={'limit': 2})[1]
+ self.assertEqual(len(body['tsigkeys']), 2)
+
+ @decorators.idempotent_id('f31447b0-d817-11eb-b95a-74e5f9e2a801')
+ def test_list_tsigkeys_using_marker(self):
+ test_tsigkeys_name = 'marker_tsigkey_'
+ test_tsigkeys_names = [test_tsigkeys_name + str(i) for i in range(4)]
+
+ LOG.info('Create tsigkeys named: {}'.format(test_tsigkeys_names))
+ created_tsigkeys = []
+ for name in test_tsigkeys_names:
+ LOG.info('As Primary user create a zone to be used '
+ 'for {}'.format(name))
+ zone_name = dns_data_utils.rand_zone_name(
+ name="list_tsigkey_marker", suffix=self.tld_name)
+ zone = self.zone_client.create_zone(name=zone_name)[1]
+ self.addCleanup(
+ self.wait_zone_delete, self.zone_client, zone['id'])
+ LOG.info('As Admin user create "{}" tsigkey'.format(name))
+ tsigkey = self.admin_client.create_tsigkey(
+ resource_id=zone['id'], name=name)[1]
+ self.addCleanup(self.admin_client.delete_tsigkey, tsigkey['id'])
+ created_tsigkeys.append(tsigkey['id'])
+
+ LOG.info('As Admin, list all tsigkeys using url queries:"limit=2" '
+ 'and "name={}*"'.format(test_tsigkeys_name))
+ body = self.admin_client.list_tsigkeys(
+ params={
+ 'limit': 2, 'name': test_tsigkeys_name + '*'})[1]
+ tsigkeys = body['tsigkeys']
+ self.assertEqual(2, len(tsigkeys),
+ 'Failed, response is not limited as expected')
+
+ LOG.info('Get the marker to be used for subsequent request')
+ first_set_of_ids = [item['id'] for item in tsigkeys]
+ links = body['links']
+ marker = links['next'].split('marker=')[-1]
+
+ LOG.info('Use marker for subsequent request to get the rest of '
+ 'tsigkeys that contains "{}*" in their '
+ 'names'.format(test_tsigkeys_name))
+ tsigkeys = self.admin_client.list_tsigkeys(
+ params={'marker': marker, 'limit': 2,
+ 'name': test_tsigkeys_name + '*'})[1]['tsigkeys']
+ self.assertEqual(2, len(tsigkeys),
+ 'Failed, response is not limited as expected')
+ second_set_of_ids = [item['id'] for item in tsigkeys]
+
+ LOG.info('Make sure that the merge of tsigkeys IDs received in two '
+ 'phases using "marker" url query, contains all the IDs '
+ 'created within the test')
+ self.assertEqual(
+ sorted(first_set_of_ids + second_set_of_ids),
+ sorted(created_tsigkeys),
+ 'Failed, tsigkeys IDs received in two phases are not as expected')
+
+ @decorators.idempotent_id('d5c6dfcc-d8af-11eb-b95a-74e5f9e2a801')
+ def test_list_tsigkey_sort_key_with_sort_direction(self):
+ names_to_create = [data_utils.rand_name(name) for name in
+ ['bbb_tsgikey', 'aaa_tsgikey', 'ccc_tsgikey']]
+ created_tsigkey_ids = []
+ for name in names_to_create:
+ LOG.info('As Primary user create a zone for: {} '.format(name))
+ zone_name = dns_data_utils.rand_zone_name(
+ name="list_tsigkey_sort", suffix=self.tld_name)
+ zone = self.zone_client.create_zone(name=zone_name)[1]
+ self.addCleanup(
+ self.wait_zone_delete, self.zone_client, zone['id'])
+ LOG.info('As Admin user create a tsigkey: {} '.format(name))
+ tsigkey = self.admin_client.create_tsigkey(
+ resource_id=zone['id'], name=name)[1]
+ self.addCleanup(self.admin_client.delete_tsigkey, tsigkey['id'])
+ created_tsigkey_ids.append(tsigkey['id'])
+
+ LOG.info('As Admin, list all tsigkeys using "asc" to sort by names')
+ sorted_tsigkeys = self.admin_client.list_tsigkeys(
+ params={'sort_dir': 'asc', 'sort_key': 'name'})[1]['tsigkeys']
+ sorted_by_names = [item['name'] for item in sorted_tsigkeys]
+ self.assertEqual(
+ sorted(sorted_by_names),
+ sorted_by_names,
+ 'Failed, tsgikeys names are not sorted in "asc" as expected')
+
+ LOG.info('As Admin, list all tsigkey using "desc" to sort by names')
+ sorted_tsigkeys = self.admin_client.list_tsigkeys(
+ params={'sort_dir': 'desc', 'sort_key': 'name'})[1]['tsigkeys']
+ sorted_by_names = [item['name'] for item in sorted_tsigkeys]
+ self.assertEqual(
+ sorted(sorted_by_names, reverse=True),
+ sorted_by_names,
+ 'Failed, tsgikeys names are not sorted in "desc" as expected')
+
+ LOG.info('As Admin, list all tsigkeys using "asc" to sort by ID')
+ sorted_tsigkeys = self.admin_client.list_tsigkeys(
+ params={'sort_dir': 'asc', 'sort_key': 'id'})[1]['tsigkeys']
+ sorted_by_ids = [item['id'] for item in sorted_tsigkeys]
+ self.assertEqual(
+ sorted(sorted_by_ids),
+ sorted_by_ids,
+ 'Failed, tsgikeys IDs are not sorted in "asc" as expected')
+
+ LOG.info('As Admin, list all tsigkeys using "zababun" direction '
+ 'to sort by names, expected: "invalid_sort_dir"')
+ self.assertRaisesDns(
+ lib_exc.BadRequest, 'invalid_sort_dir', 400,
+ self.admin_client.list_tsigkeys,
+ params={'sort_dir': 'zababun', 'sort_key': 'name'})
+
+ LOG.info('As Admin, list all tsigkeys using "zababun" as a key value,'
+ 'expected: "invalid_sort_key"')
+ self.assertRaisesDns(
+ lib_exc.BadRequest, 'invalid_sort_key', 400,
+ self.admin_client.list_tsigkeys,
+ params={'sort_dir': 'asc', 'sort_key': 'zababun'})
+
+ @decorators.idempotent_id('4162a840-d8b2-11eb-b95a-74e5f9e2a801')
+ def test_list_tsigkey_filter_by_name(self):
+ tsigkey_name = data_utils.rand_name('ddd_tsgikey')
+ LOG.info('As Primary user create a zone for: {} '.format(tsigkey_name))
+ zone_name = dns_data_utils.rand_zone_name(
+ name="list_tsigkey_filter_name", suffix=self.tld_name)
+ zone = self.zone_client.create_zone(name=zone_name)[1]
+ self.addCleanup(self.wait_zone_delete, self.zone_client, zone['id'])
+ LOG.info('As Admin user create a tsigkey: {} '.format(tsigkey_name))
+ tsigkey = self.admin_client.create_tsigkey(
+ resource_id=zone['id'], name=tsigkey_name)[1]
+ self.addCleanup(self.admin_client.delete_tsigkey, tsigkey['id'])
+
+ LOG.info('As Admin, list all tsigkeys named:{}'.format(tsigkey_name))
+ listed_tsigkeys = self.admin_client.list_tsigkeys(
+ params={'name': tsigkey_name})[1]['tsigkeys']
+ self.assertEqual(
+ 1, len(listed_tsigkeys),
+ 'Failed, only a single tsigkey, named: {} should be '
+ 'listed.'.format(tsigkey_name))
+
+ LOG.info('As Admin, list all tsigkeys named:"zababun"')
+ listed_tsigkeys = self.admin_client.list_tsigkeys(
+ params={'name': 'zababun'})[1]['tsigkeys']
+ self.assertEqual(
+ 0, len(listed_tsigkeys), 'Failed, no tsigkey should be listed')
+
+ @decorators.idempotent_id('e8bcf80a-d8b4-11eb-b95a-74e5f9e2a801')
+ def test_list_tsigkey_filter_by_scope(self):
+
+ LOG.info('Create tsigkey for a pool')
+ pool = self.pool_admin_client.create_pool(
+ project_id=self.os_admin.credentials.project_id)[1]
+ self.addCleanup(
+ self.pool_admin_client.delete_pool, pool['id'],
+ headers={
+ 'x-auth-sudo-project-id':
+ self.os_admin.credentials.project_id})
+ pool_tsigkey = self.admin_client.create_tsigkey(
+ resource_id=pool['id'], scope='POOL')[1]
+ self.addCleanup(self.admin_client.delete_tsigkey, pool_tsigkey['id'])
+
+ LOG.info('Create tsigkey for a zone')
+ zone_name = dns_data_utils.rand_zone_name(
+ name="list_tsigkey_filter_scope", suffix=self.tld_name)
+ zone = self.zone_client.create_zone(name=zone_name)[1]
+ self.addCleanup(self.wait_zone_delete, self.zone_client, zone['id'])
+ zone_tsigkey = self.admin_client.create_tsigkey(
+ resource_id=zone['id'], scope='ZONE')[1]
+ self.addCleanup(self.admin_client.delete_tsigkey, zone_tsigkey['id'])
+
+ LOG.info('List all "scope=POOL" tsigkeys')
+ listed_pool_scopes = [
+ item['scope'] for item in self.admin_client.list_tsigkeys(
+ params={'scope': 'POOL'})[1]['tsigkeys']]
+ self.assertEqual(
+ {'POOL'}, set(listed_pool_scopes),
+ 'Failed, the only scopes expected to be listed are: "POOL"')
+
+ LOG.info('List all "scope=ZONE" tsigkeys')
+ listed_zone_scopes = [
+ item['scope'] for item in self.admin_client.list_tsigkeys(
+ params={'scope': 'ZONE'})[1]['tsigkeys']]
+ self.assertEqual(
+ {'ZONE'}, set(listed_zone_scopes),
+ 'Failed, the only scopes expected to be listed are: "ZONE"')
+
+ LOG.info('List all "scope=zababun" tsigkeys')
+ listed_zone_scopes = [
+ item['scope'] for item in self.admin_client.list_tsigkeys(
+ params={'scope': 'zababun'})[1]['tsigkeys']]
+ self.assertEqual(
+ 0, len(listed_zone_scopes),
+ 'Failed, no tsigkey is expected to be listed')
+
+ @decorators.idempotent_id('794554f0-d8b8-11eb-b95a-74e5f9e2a801')
+ def test_list_tsigkey_filter_by_algorithm(self):
+
+ LOG.info('Create tsigkey for a pool')
+ algorithm = 'hmac-sha256'
+ pool = self.pool_admin_client.create_pool(
+ project_id=self.os_admin.credentials.project_id)[1]
+ self.addCleanup(
+ self.pool_admin_client.delete_pool, pool['id'],
+ headers={
+ 'x-auth-sudo-project-id':
+ self.os_admin.credentials.project_id})
+ pool_tsigkey = self.admin_client.create_tsigkey(
+ resource_id=pool['id'], algorithm=algorithm)[1]
+ self.addCleanup(self.admin_client.delete_tsigkey, pool_tsigkey['id'])
+
+ LOG.info('List all "algorithm={}" tsigkeys '.format(algorithm))
+ listed_tsigkeys = [
+ item['algorithm'] for item in self.admin_client.list_tsigkeys(
+ params={'algorithm': algorithm})[1]['tsigkeys']]
+ self.assertEqual(
+ {algorithm}, set(listed_tsigkeys),
+ 'Failed, the only tsigkeys expected to be listed must '
+ 'have algorithm:{} '.format(algorithm))
+
+ LOG.info('List all "algorithm=zababun" tsigkeys')
+ listed_tsigkeys = [
+ item['algorithm'] for item in self.admin_client.list_tsigkeys(
+ params={'algorithm': 'zababun'})[1]['tsigkeys']]
+ self.assertEqual(
+ 0, len(listed_tsigkeys),
+ "Failed, no tsigkey is expected to be listed")
+
@decorators.idempotent_id('c5d7facf-0f05-47a2-a4fb-87f203860880')
def test_show_tsigkey(self):
LOG.info('Create a resource')
- _, zone = self.zone_client.create_zone()
+ zone_name = dns_data_utils.rand_zone_name(
+ name="show_tsigkey", suffix=self.tld_name)
+ zone = self.zone_client.create_zone(name=zone_name)[1]
self.addCleanup(self.wait_zone_delete, self.zone_client, zone['id'])
LOG.info('Create a tsigkey')
- _, tsigkey = self.admin_client.create_tsigkey(resource_id=zone['id'])
+ tsigkey = self.admin_client.create_tsigkey(resource_id=zone['id'])[1]
self.addCleanup(self.admin_client.delete_tsigkey, tsigkey['id'])
LOG.info('Fetch the tsigkey')
- _, body = self.admin_client.show_tsigkey(tsigkey['id'])
+ body = self.admin_client.show_tsigkey(tsigkey['id'])[1]
LOG.info('Ensure the fetched response matches the created tsigkey')
self.assertExpected(tsigkey, body, self.excluded_keys)
+ # Test RBAC
+ expected_allowed = ['os_admin']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin', 'os_system_reader']
+
+ self.check_list_show_RBAC_enforcement(
+ 'TsigkeyClient', 'show_tsigkey', expected_allowed, True,
+ tsigkey['id'])
+
@decorators.idempotent_id('d09dc0dd-dd72-41ee-9085-2afb2bf35459')
def test_update_tsigkey(self):
LOG.info('Create a resource')
- _, zone = self.zone_client.create_zone()
+ zone_name = dns_data_utils.rand_zone_name(
+ name="update_tsigkey", suffix=self.tld_name)
+ zone = self.zone_client.create_zone(name=zone_name)[1]
self.addCleanup(self.wait_zone_delete, self.zone_client, zone['id'])
LOG.info('Create a tsigkey')
- _, tsigkey = self.admin_client.create_tsigkey(resource_id=zone['id'])
+ tsigkey = self.admin_client.create_tsigkey(resource_id=zone['id'])[1]
self.addCleanup(self.admin_client.delete_tsigkey, tsigkey['id'])
tsigkey_data = {
"name": "Patch tsigkey",
- "secret": "NewSecretKey",
- "scope": "POOL"}
+ "secret": "NewSecretKey"}
LOG.info('Update the tsigkey')
- _, patch_tsigkey = self.admin_client.update_tsigkey(tsigkey['id'],
+ patch_tsigkey = self.admin_client.update_tsigkey(tsigkey['id'],
name=tsigkey_data['name'],
- secret=tsigkey_data['secret'],
- scope=tsigkey_data['scope'])
+ secret=tsigkey_data['secret'])[1]
self.assertEqual(tsigkey_data['name'], patch_tsigkey['name'])
self.assertEqual(tsigkey_data['secret'], patch_tsigkey['secret'])
- self.assertEqual(tsigkey_data['scope'], patch_tsigkey['scope'])
+
+ # Test RBAC
+ expected_allowed = ['os_admin']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.append('os_system_admin')
+
+ self.check_CUD_RBAC_enforcement(
+ 'TsigkeyClient', 'update_tsigkey', expected_allowed, False,
+ tsigkey['id'], name=tsigkey_data['name'],
+ secret=tsigkey_data['secret'])
@decorators.idempotent_id('9cdffbd2-bc67-4a25-8eb7-4be8635c88a3')
def test_delete_tsigkey(self):
LOG.info('Create a resource')
- _, zone = self.zone_client.create_zone()
+ zone_name = dns_data_utils.rand_zone_name(
+ name="delete_tsigkey", suffix=self.tld_name)
+ zone = self.zone_client.create_zone(name=zone_name)[1]
self.addCleanup(self.wait_zone_delete, self.zone_client, zone['id'])
LOG.info('Create a tsigkey')
- _, tsigkey = self.admin_client.create_tsigkey(resource_id=zone['id'])
+ tsigkey = self.admin_client.create_tsigkey(resource_id=zone['id'])[1]
+
+ # Test RBAC
+ expected_allowed = ['os_admin']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.append('os_system_admin')
+
+ self.check_CUD_RBAC_enforcement(
+ 'TsigkeyClient', 'delete_tsigkey', expected_allowed, False,
+ tsigkey['id'])
LOG.info('Delete the tsigkey')
- _, body = self.admin_client.delete_tsigkey(tsigkey['id'])
+ self.admin_client.delete_tsigkey(tsigkey['id'])
self.assertRaises(lib_exc.NotFound,
lambda: self.admin_client.show_tsigkey(tsigkey['id']))
@@ -191,7 +534,7 @@
class TestTsigkeyInvalidIdAdmin(BaseTsigkeyTest):
- credentials = ["admin", "system_admin"]
+ credentials = ["admin", "primary", "system_admin"]
@classmethod
def setup_credentials(cls):
@@ -204,8 +547,11 @@
super(TestTsigkeyInvalidIdAdmin, cls).setup_clients()
if CONF.enforce_scope.designate:
cls.admin_client = cls.os_system_admin.dns_v2.TsigkeyClient()
+ cls.pool_admin_client = cls.os_system_admin.dns_v2.PoolClient()
else:
cls.admin_client = cls.os_admin.dns_v2.TsigkeyClient()
+ cls.pool_admin_client = cls.os_admin.dns_v2.PoolClient()
+ cls.zone_client = cls.os_primary.dns_v2.ZonesClient()
@decorators.idempotent_id('2a8dfc75-9884-4b1c-8f1f-ed835d96f2fe')
def test_show_tsigkey_invalid_uuid(self):
@@ -234,3 +580,117 @@
self.assertEqual("invalid_uuid", resp_body['type'])
self.assertEqual("Invalid UUID tsigkey_id: foo",
resp_body['message'])
+
+ @decorators.idempotent_id('f94af13a-d743-11eb-beba-74e5f9e2a801')
+ def test_create_tsigkey_for_zone_invalid_algorithm(self):
+ zone_name = dns_data_utils.rand_zone_name(
+ name="create_tsigkey_invalid_algo", suffix=self.tld_name)
+ zone = self.zone_client.create_zone(name=zone_name)[1]
+ self.addCleanup(self.wait_zone_delete, self.zone_client, zone['id'])
+ tsigkey_data = {
+ "name": dns_data_utils.rand_zone_name('Example_Key'),
+ "algorithm": "zababun",
+ "secret": "SomeSecretKey",
+ "scope": "ZONE",
+ "resource_id": zone['id']}
+ self.assertRaisesDns(
+ lib_exc.BadRequest, 'invalid_object', 400,
+ self.admin_client.create_tsigkey,
+ tsigkey_data['resource_id'],
+ tsigkey_data['name'], tsigkey_data['algorithm'],
+ tsigkey_data['secret'], tsigkey_data['scope'])
+
+ @decorators.idempotent_id('4df903d8-d745-11eb-beba-74e5f9e2a801')
+ def test_create_tsigkey_for_zone_invalid_name(self):
+ LOG.info('Create a zone resource')
+ zone_name = dns_data_utils.rand_zone_name(
+ name="create_tsigkey_invalid_name", suffix=self.tld_name)
+ zone = self.zone_client.create_zone(name=zone_name)[1]
+ self.addCleanup(self.wait_zone_delete, self.zone_client, zone['id'])
+ tsigkey_data = {
+ "name": dns_data_utils.rand_zone_name(
+ 'Example_Key') * 1000,
+ "algorithm": "hmac-sha256",
+ "secret": "SomeSecretKey",
+ "scope": "ZONE",
+ "resource_id": zone['id']}
+ self.assertRaisesDns(
+ lib_exc.BadRequest, 'invalid_object', 400,
+ self.admin_client.create_tsigkey,
+ tsigkey_data['resource_id'],
+ tsigkey_data['name'], tsigkey_data['algorithm'],
+ tsigkey_data['secret'], tsigkey_data['scope'])
+
+ @decorators.idempotent_id('5d6b8a84-d745-11eb-beba-74e5f9e2a801')
+ @decorators.skip_because(bug="1933760")
+ def test_create_tsigkey_for_zone_empty_secret(self):
+ LOG.info('Create a zone resource')
+ zone_name = dns_data_utils.rand_zone_name(
+ name="create_tsigkey_empty_secret", suffix=self.tld_name)
+ zone = self.zone_client.create_zone(name=zone_name)[1]
+ self.addCleanup(self.wait_zone_delete, self.zone_client, zone['id'])
+ tsigkey_data = {
+ "name": dns_data_utils.rand_zone_name('Example_Key'),
+ "algorithm": "hmac-sha256",
+ "secret": '',
+ "scope": "ZONE",
+ "resource_id": zone['id']}
+ self.assertRaisesDns(
+ lib_exc.BadRequest, 'invalid_object', 400,
+ self.admin_client.create_tsigkey,
+ tsigkey_data['resource_id'],
+ tsigkey_data['name'], tsigkey_data['algorithm'],
+ tsigkey_data['secret'], tsigkey_data['scope'])
+
+ @decorators.idempotent_id('dfca9268-d745-11eb-beba-74e5f9e2a801')
+ def test_create_tsigkey_for_zone_invalid_scope(self):
+ LOG.info('Create a zone resource')
+ zone_name = dns_data_utils.rand_zone_name(
+ name="create_tsigkey_invalid_scope", suffix=self.tld_name)
+ zone = self.zone_client.create_zone(name=zone_name)[1]
+ self.addCleanup(self.wait_zone_delete, self.zone_client, zone['id'])
+ tsigkey_data = {
+ "name": dns_data_utils.rand_zone_name('Example_Key'),
+ "algorithm": "hmac-sha256",
+ "secret": "SomeSecretKey",
+ "scope": "zababun",
+ "resource_id": zone['id']}
+ self.assertRaisesDns(
+ lib_exc.BadRequest, 'invalid_object', 400,
+ self.admin_client.create_tsigkey,
+ tsigkey_data['resource_id'],
+ tsigkey_data['name'], tsigkey_data['algorithm'],
+ tsigkey_data['secret'], tsigkey_data['scope'])
+
+ @decorators.idempotent_id('57255858-d74a-11eb-beba-74e5f9e2a801')
+ def test_create_tsigkey_for_zone_invalid_zone_id(self):
+ LOG.info('Create a resource')
+ zone_name = dns_data_utils.rand_zone_name(
+ name="create_tsigkey_invalide_zone_id", suffix=self.tld_name)
+ zone = self.zone_client.create_zone(name=zone_name)[1]
+ self.addCleanup(self.wait_zone_delete, self.zone_client, zone['id'])
+ tsigkey_data = {
+ "name": dns_data_utils.rand_zone_name('Example_Key'),
+ "algorithm": "hmac-sha256",
+ "secret": "SomeSecretKey",
+ "scope": "ZONE",
+ "resource_id": "zababun"}  # deliberately not a UUID
+ self.assertRaisesDns(
+ lib_exc.BadRequest, 'invalid_object', 400,
+ self.admin_client.create_tsigkey,
+ tsigkey_data['resource_id'],
+ tsigkey_data['name'], tsigkey_data['algorithm'],
+ tsigkey_data['secret'], tsigkey_data['scope'])
+
+ @decorators.idempotent_id('0dfbc2f8-d8bb-11eb-b95a-74e5f9e2a801')
+ @decorators.skip_because(bug="1934120")
+ def test_create_tsigkey_for_pool_with_scope_zone(self):
+ pool = self.pool_admin_client.create_pool()[1]
+ self.addCleanup(self.pool_admin_client.delete_pool, pool['id'])
+
+ LOG.info('Try to create a tsigkey using pool ID and "scope:ZONE", '
+ 'should fail because ID is for pool, but scope is ZONE')
+ self.assertRaisesDns(
+ lib_exc.BadRequest, 'invalid_object', 400,
+ self.admin_client.create_tsigkey,
+ resource_id=pool['id'], scope='ZONE')
diff --git a/designate_tempest_plugin/tests/api/v2/test_zone_tasks.py b/designate_tempest_plugin/tests/api/v2/test_zone_tasks.py
index bbeea75..70a2e77 100644
--- a/designate_tempest_plugin/tests/api/v2/test_zone_tasks.py
+++ b/designate_tempest_plugin/tests/api/v2/test_zone_tasks.py
@@ -22,6 +22,7 @@
from designate_tempest_plugin.common import constants as const
from designate_tempest_plugin.common import waiters
+from designate_tempest_plugin import data_utils as dns_data_utils
from designate_tempest_plugin.tests import base
from designate_tempest_plugin.services.dns.query.query_client \
@@ -35,9 +36,33 @@
excluded_keys = ['created_at', 'updated_at', 'version', 'links',
'status', 'action']
+ @classmethod
+ def setup_clients(cls):
+ super(BaseZonesTest, cls).setup_clients()
+
+ if CONF.enforce_scope.designate:
+ cls.admin_tld_client = cls.os_system_admin.dns_v2.TldClient()
+ else:
+ cls.admin_tld_client = cls.os_admin.dns_v2.TldClient()
+
+ @classmethod
+ def resource_setup(cls):
+ super(BaseZonesTest, cls).resource_setup()
+
+ # Make sure we have an allowed TLD available
+ tld_name = dns_data_utils.rand_zone_name(name="BaseZonesTest")
+ cls.tld_name = f".{tld_name}"
+ cls.class_tld = cls.admin_tld_client.create_tld(tld_name=tld_name[:-1])
+
+ @classmethod
+ def resource_cleanup(cls):
+ cls.admin_tld_client.delete_tld(cls.class_tld[1]['id'])
+ super(BaseZonesTest, cls).resource_cleanup()
+
class ZoneTasks(BaseZonesTest):
- credentials = ["primary", "alt", "admin", "system_admin"]
+ credentials = ["primary", "alt", "admin", "system_admin", "system_reader",
+ "project_member", "project_reader"]
@classmethod
def setup_credentials(cls):
@@ -58,7 +83,9 @@
@decorators.idempotent_id('287e2cd0-a0e7-11eb-b962-74e5f9e2a801')
def test_zone_abandon(self):
LOG.info('Create a PRIMARY zone')
- pr_zone = self.client.create_zone()[1]
+ zone_name = dns_data_utils.rand_zone_name(
+ name="zone_abandon", suffix=self.tld_name)
+ pr_zone = self.client.create_zone(name=zone_name)[1]
self.addCleanup(self.wait_zone_delete, self.client, pr_zone['id'])
waiters.wait_for_zone_status(self.client, pr_zone['id'], 'ACTIVE')
@@ -72,6 +99,17 @@
LOG.info('Check that the zone was created on Nameserver/BIND')
waiters.wait_for_query(self.query_client, pr_zone['name'], "SOA")
+ # Test RBAC
+ expected_allowed = ['os_admin']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.append('os_system_admin')
+
+ self.check_CUD_RBAC_enforcement(
+ 'ZonesClient', 'abandon_zone', expected_allowed, False,
+ pr_zone['id'],
+ headers={'x-auth-sudo-project-id': pr_zone['project_id']})
+
+ # Test abandoning the zone
LOG.info('Abandon a zone')
self.admin_client.abandon_zone(
pr_zone['id'],
@@ -88,7 +126,9 @@
def test_zone_abandon_forbidden(self):
LOG.info('Create a PRIMARY zone and add to the cleanup')
- pr_zone = self.client.create_zone()[1]
+ zone_name = dns_data_utils.rand_zone_name(
+ name="zone_abandon_forbidden", suffix=self.tld_name)
+ pr_zone = self.client.create_zone(name=zone_name)[1]
self.addCleanup(self.wait_zone_delete, self.client, pr_zone['id'])
waiters.wait_for_zone_status(self.client, pr_zone['id'], 'ACTIVE')
@@ -144,7 +184,9 @@
def test_manually_trigger_update_secondary_zone_negative(self):
# Create a PRIMARY zone
LOG.info('Create a PRIMARY zone')
- pr_zone = self.client.create_zone()[1]
+ zone_name = dns_data_utils.rand_zone_name(
+ name="manually_trigger_update_primary", suffix=self.tld_name)
+ pr_zone = self.client.create_zone(name=zone_name)[1]
self.addCleanup(self.wait_zone_delete, self.client, pr_zone['id'])
waiters.wait_for_zone_status(self.client, pr_zone['id'], 'ACTIVE')
@@ -170,7 +212,9 @@
# Create a SECONDARY zone
LOG.info('Create a SECONDARY zone')
- sec_zone = self.client.create_zone(
+ zone_name = dns_data_utils.rand_zone_name(
+ name="manually_trigger_update_secondary", suffix=self.tld_name)
+ sec_zone = self.client.create_zone(name=zone_name,
zone_type=const.SECONDARY_ZONE_TYPE, primaries=nameservers)[1]
self.addCleanup(self.wait_zone_delete, self.client, sec_zone['id'])
LOG.info('Ensure we respond with CREATE+PENDING')
diff --git a/designate_tempest_plugin/tests/api/v2/test_zones.py b/designate_tempest_plugin/tests/api/v2/test_zones.py
index dec5028..3ab2601 100644
--- a/designate_tempest_plugin/tests/api/v2/test_zones.py
+++ b/designate_tempest_plugin/tests/api/v2/test_zones.py
@@ -24,8 +24,6 @@
from designate_tempest_plugin import data_utils as dns_data_utils
from designate_tempest_plugin.tests import base
-from designate_tempest_plugin.common import waiters
-
CONF = config.CONF
LOG = logging.getLogger(__name__)
@@ -34,9 +32,31 @@
excluded_keys = ['created_at', 'updated_at', 'version', 'links',
'status', 'action']
+ @classmethod
+ def setup_clients(cls):
+ super(BaseZonesTest, cls).setup_clients()
+
+ if CONF.enforce_scope.designate:
+ cls.admin_tld_client = cls.os_system_admin.dns_v2.TldClient()
+ else:
+ cls.admin_tld_client = cls.os_admin.dns_v2.TldClient()
+
+ @classmethod
+ def resource_setup(cls):
+ super(BaseZonesTest, cls).resource_setup()
+
+ # Make sure we have an allowed TLD available
+ tld_name = dns_data_utils.rand_zone_name(name="BaseZonesTest")
+ cls.tld_name = f".{tld_name}"
+ cls.class_tld = cls.admin_tld_client.create_tld(tld_name=tld_name[:-1])
+
+ @classmethod
+ def resource_cleanup(cls):
+ cls.admin_tld_client.delete_tld(cls.class_tld[1]['id'])
+ super(BaseZonesTest, cls).resource_cleanup()
+
class ZonesTest(BaseZonesTest):
- credentials = ["admin", "system_admin", "primary"]
@classmethod
def setup_credentials(cls):
@@ -58,12 +78,14 @@
def test_create_zones(self):
# Create a PRIMARY zone
LOG.info('Create a PRIMARY zone')
- zone = self.client.create_zone()[1]
+ zone_name = dns_data_utils.rand_zone_name(
+ name="create_zones_primary", suffix=self.tld_name)
+ zone = self.client.create_zone(name=zone_name)[1]
self.addCleanup(self.wait_zone_delete, self.client, zone['id'])
LOG.info('Ensure we respond with CREATE+PENDING')
- self.assertEqual('CREATE', zone['action'])
- self.assertEqual('PENDING', zone['status'])
+ self.assertEqual(const.CREATE, zone['action'])
+ self.assertEqual(const.PENDING, zone['status'])
# Get the Name Servers (hosts) created in PRIMARY zone
nameservers = self.client.show_zone_nameservers(zone['id'])[1]
@@ -71,27 +93,48 @@
# Create a SECONDARY zone
LOG.info('Create a SECONDARY zone')
+ zone_name = dns_data_utils.rand_zone_name(
+ name="create_zones_secondary", suffix=self.tld_name)
zone = self.client.create_zone(
- zone_type=const.SECONDARY_ZONE_TYPE, primaries=nameservers)[1]
+ name=zone_name, zone_type=const.SECONDARY_ZONE_TYPE,
+ primaries=nameservers)[1]
self.addCleanup(self.wait_zone_delete, self.client, zone['id'])
LOG.info('Ensure we respond with CREATE+PENDING')
- self.assertEqual('CREATE', zone['action'])
- self.assertEqual('PENDING', zone['status'])
+ self.assertEqual(const.CREATE, zone['action'])
+ self.assertEqual(const.PENDING, zone['status'])
+
+ # Test with no extra header overrides (sudo-project-id)
+ expected_allowed = ['os_admin', 'os_primary', 'os_alt']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.append('os_system_admin')
+ expected_allowed.append('os_project_member')
+
+ self.check_CUD_RBAC_enforcement('ZonesClient', 'create_zone',
+ expected_allowed, False)
+
+ # Test with x-auth-sudo-project-id header
+ expected_allowed = ['os_admin']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.append('os_system_admin')
+
+ self.check_CUD_RBAC_enforcement(
+ 'ZonesClient', 'create_zone', expected_allowed, False,
+ project_id=self.client.project_id)
@decorators.idempotent_id('ec150c22-f52e-11eb-b09b-74e5f9e2a801')
def test_create_zone_validate_recordsets_created(self):
# Create a PRIMARY zone and wait till it's Active
LOG.info('Create a PRIMARY zone')
- zone = self.client.create_zone()[1]
+ zone_name = dns_data_utils.rand_zone_name(
+ name="create_zone_validate_recordsets", suffix=self.tld_name)
+ zone = self.client.create_zone(name=zone_name,
+ wait_until=const.ACTIVE)[1]
self.addCleanup(self.wait_zone_delete, self.client, zone['id'])
LOG.info('Ensure we respond with CREATE+PENDING')
- self.assertEqual('CREATE', zone['action'])
- self.assertEqual('PENDING', zone['status'])
-
- LOG.info('Wait till the zone is Active')
- waiters.wait_for_zone_status(self.client, zone['id'], 'ACTIVE')
+ self.assertEqual(const.CREATE, zone['action'])
+ self.assertEqual(const.PENDING, zone['status'])
LOG.info('Ensure that SOA and NS recordsets types has been created.')
recordsets = self.recordset_client.list_recordset(
@@ -107,100 +150,219 @@
@decorators.idempotent_id('02ca5d6a-86ce-4f02-9d94-9e5db55c3055')
def test_show_zone(self):
LOG.info('Create a zone')
- _, zone = self.client.create_zone()
+ zone_name = dns_data_utils.rand_zone_name(
+ name="show_zones", suffix=self.tld_name)
+ zone = self.client.create_zone(name=zone_name)[1]
self.addCleanup(self.wait_zone_delete, self.client, zone['id'])
LOG.info('Fetch the zone')
- _, body = self.client.show_zone(zone['id'])
+ body = self.client.show_zone(zone['id'])[1]
LOG.info('Ensure the fetched response matches the created zone')
self.assertExpected(zone, body, self.excluded_keys)
- @decorators.idempotent_id('49268b24-92de-11eb-9d02-74e5f9e2a801')
- def test_show_not_existing_zone(self):
- LOG.info('Fetch non existing zone')
- self.assertRaises(lib_exc.NotFound,
- lambda: self.client.show_zone(uuid.uuid1()))
+ # TODO(johnsom) Test reader roles once this bug is fixed.
+ # https://bugs.launchpad.net/tempest/+bug/1964509
+ # Test with no extra header overrides (all_projects, sudo-project-id)
+ expected_allowed = ['os_primary']
- @decorators.idempotent_id('736e3b50-92e0-11eb-9d02-74e5f9e2a801')
- def test_use_invalid_id_to_show_zone(self):
- LOG.info('Fetch the zone using invalid zone ID')
- with self.assertRaisesDns(lib_exc.BadRequest, 'invalid_uuid', 400):
- self.client.show_zone(uuid='zahlabut')
+ self.check_list_show_RBAC_enforcement(
+ 'ZonesClient', 'show_zone', expected_allowed, True, zone['id'])
+
+ # Test with x-auth-all-projects and x-auth-sudo-project-id header
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin']
+ else:
+ expected_allowed = ['os_admin']
+
+ self.check_list_show_RBAC_enforcement(
+ 'ZonesClient', 'show_zone', expected_allowed, False, zone['id'],
+ headers=self.all_projects_header)
+ self.check_list_show_RBAC_enforcement(
+ 'ZonesClient', 'show_zone', expected_allowed, False, zone['id'],
+ headers={'x-auth-sudo-project-id': self.client.project_id})
@decorators.attr(type='smoke')
@decorators.idempotent_id('a4791906-6cd6-4d27-9f15-32273db8bb3d')
def test_delete_zone(self):
LOG.info('Create a zone')
- _, zone = self.client.create_zone()
+ zone_name = dns_data_utils.rand_zone_name(
+ name="delete_zones", suffix=self.tld_name)
+ zone = self.client.create_zone(name=zone_name)[1]
self.addCleanup(self.wait_zone_delete, self.client, zone['id'],
ignore_errors=lib_exc.NotFound)
+ # Test RBAC
+ expected_allowed = ['os_admin', 'os_primary']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.append('os_system_admin')
+
+ self.check_CUD_RBAC_enforcement('ZonesClient', 'delete_zone',
+ expected_allowed, True, zone['id'])
+
+ # Test RBAC with x-auth-all-projects and x-auth-sudo-project-id header
+ expected_allowed = ['os_admin', 'os_primary']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.append('os_system_admin')
+
+ self.check_CUD_RBAC_enforcement('ZonesClient', 'delete_zone',
+ expected_allowed, False, zone['id'],
+ headers=self.all_projects_header)
+ self.check_CUD_RBAC_enforcement(
+ 'ZonesClient', 'delete_zone', expected_allowed, False, zone['id'],
+ headers={'x-auth-sudo-project-id': self.client.project_id})
+
LOG.info('Delete the zone')
- _, body = self.client.delete_zone(zone['id'])
+ body = self.client.delete_zone(zone['id'])[1]
LOG.info('Ensure we respond with DELETE+PENDING')
- self.assertEqual('DELETE', body['action'])
- self.assertEqual('PENDING', body['status'])
-
- @decorators.idempotent_id('79921370-92e1-11eb-9d02-74e5f9e2a801')
- def test_delete_non_existing_zone(self):
- LOG.info('Delete non existing zone')
- self.assertRaises(lib_exc.NotFound,
- lambda: self.client.delete_zone(uuid.uuid1()))
+ self.assertEqual(const.DELETE, body['action'])
+ self.assertEqual(const.PENDING, body['status'])
@decorators.idempotent_id('5bfa3cfe-5bc8-443b-bf48-cfba44cbb247')
def test_list_zones(self):
LOG.info('Create a zone')
- _, zone = self.client.create_zone()
+ zone_name = dns_data_utils.rand_zone_name(
+ name="list_zones", suffix=self.tld_name)
+ zone = self.client.create_zone(name=zone_name)[1]
self.addCleanup(self.wait_zone_delete, self.client, zone['id'])
LOG.info('List zones')
- _, body = self.client.list_zones()
+ body = self.client.list_zones()[1]
# TODO(kiall): We really want to assert that out newly created zone is
# present in the response.
self.assertGreater(len(body['zones']), 0)
+ # TODO(johnsom) Test reader role once this bug is fixed:
+ # https://bugs.launchpad.net/tempest/+bug/1964509
+ # Test RBAC - Users that are allowed to call list, but should get
+ # zero zones.
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_admin', 'os_project_member',
+ 'os_project_reader']
+ else:
+ expected_allowed = ['os_alt']
+
+ self.check_list_RBAC_enforcement_count(
+ 'ZonesClient', 'list_zones', expected_allowed, 0)
+
+ # Test that users who should see the zone, can see it.
+ expected_allowed = ['os_primary']
+
+ self.check_list_IDs_RBAC_enforcement(
+ 'ZonesClient', 'list_zones', expected_allowed, [zone['id']])
+
+ # Test RBAC with x-auth-all-projects and x-auth-sudo-project-id header
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin']
+ else:
+ expected_allowed = ['os_admin']
+
+ self.check_list_IDs_RBAC_enforcement(
+ 'ZonesClient', 'list_zones', expected_allowed, [zone['id']],
+ headers=self.all_projects_header)
+ self.check_list_IDs_RBAC_enforcement(
+ 'ZonesClient', 'list_zones', expected_allowed, [zone['id']],
+ headers={'x-auth-sudo-project-id': self.client.project_id})
+
@decorators.idempotent_id('123f51cb-19d5-48a9-aacc-476742c02141')
def test_update_zone(self):
LOG.info('Create a zone')
- _, zone = self.client.create_zone()
+ zone_name = dns_data_utils.rand_zone_name(
+ name="update_zone", suffix=self.tld_name)
+ zone = self.client.create_zone(name=zone_name)[1]
self.addCleanup(self.wait_zone_delete, self.client, zone['id'])
# Generate a random description
description = data_utils.rand_name()
LOG.info('Update the zone')
- _, zone = self.client.update_zone(
- zone['id'], description=description)
+ zone = self.client.update_zone(
+ zone['id'], description=description)[1]
LOG.info('Ensure we respond with UPDATE+PENDING')
- self.assertEqual('UPDATE', zone['action'])
- self.assertEqual('PENDING', zone['status'])
+ self.assertEqual(const.UPDATE, zone['action'])
+ self.assertEqual(const.PENDING, zone['status'])
LOG.info('Ensure we respond with updated values')
self.assertEqual(description, zone['description'])
- @decorators.idempotent_id('e391e30a-92e0-11eb-9d02-74e5f9e2a801')
- def test_update_non_existing_zone(self):
- LOG.info('Update non existing zone')
- self.assertRaises(lib_exc.NotFound,
- lambda: self.client.update_zone(
- uuid.uuid1(), description=data_utils.rand_name()))
+ # Test RBAC
+ expected_allowed = ['os_admin', 'os_primary']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.append('os_system_admin')
- @decorators.idempotent_id('925192f2-0ed8-4591-8fe7-a9fa028f90a0')
- def test_list_zones_dot_json_fails(self):
- uri = self.client.get_uri('zones.json')
+ self.check_CUD_RBAC_enforcement(
+ 'ZonesClient', 'update_zone', expected_allowed, True,
+ zone['id'], description=description)
- self.assertRaises(lib_exc.NotFound,
- lambda: self.client.get(uri))
+ # Test RBAC with x-auth-all-projects and x-auth-sudo-project-id header
+ expected_allowed = ['os_admin', 'os_primary']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.append('os_system_admin')
+
+ self.check_CUD_RBAC_enforcement(
+ 'ZonesClient', 'update_zone', expected_allowed, False,
+ zone['id'], description=description,
+ headers=self.all_projects_header)
+ self.check_CUD_RBAC_enforcement(
+ 'ZonesClient', 'update_zone', expected_allowed, False,
+ zone['id'], description=description,
+ headers={'x-auth-sudo-project-id': self.client.project_id})
+
+ @decorators.idempotent_id('3acddc86-62cc-4bfa-8589-b99e5d239bf2')
+ @decorators.skip_because(bug="1960487")
+ def test_serial_changes_on_update(self):
+ LOG.info('Create a zone')
+ zone_name = dns_data_utils.rand_zone_name(
+ name="serial_changes_on_update", suffix=self.tld_name)
+ zone = self.client.create_zone(name=zone_name,
+ wait_until=const.ACTIVE)[1]
+ self.addCleanup(self.wait_zone_delete, self.client, zone['id'])
+
+ LOG.info("Update Zone's email")
+ update_email = self.client.update_zone(
+ zone['id'], email=dns_data_utils.rand_email())[1]
+ self.assertNotEqual(
+ zone['serial'], update_email['serial'],
+ "Failed, expected: 'Serial' is supposed to be changed "
+ "on Email update.")
+
+ LOG.info("Update Zone's TTL")
+ update_ttl = self.client.update_zone(
+ zone['id'], ttl=dns_data_utils.rand_ttl())[1]
+ self.assertNotEqual(
+ update_email['serial'], update_ttl['serial'],
+ "Failed, expected: 'Serial' is supposed to be changed "
+ "on TTL update.")
+
+ LOG.info("Update Zone's email and description")
+ update_email_description = self.client.update_zone(
+ zone['id'],
+ email=dns_data_utils.rand_email(),
+ description=data_utils.rand_name())[1]
+ self.assertNotEqual(
+ update_ttl['serial'], update_email_description['serial'],
+ "Failed, expect the Serial to change "
+ "when the Email and Description are updated")
+
+ LOG.info("Update Zone's description")
+ update_description = self.client.update_zone(
+ zone['id'], description=data_utils.rand_name())[1]
+ self.assertEqual(
+ update_email_description['serial'], update_description['serial'],
+ "Failed, expect the Serial to not change "
+ "when the Description is updated")
@decorators.idempotent_id('d4ce813e-64a5-11eb-9f43-74e5f9e2a801')
def test_get_primary_zone_nameservers(self):
# Create a zone and get the associated "pool_id"
LOG.info('Create a zone')
- zone = self.client.create_zone()[1]
+ zone_name = dns_data_utils.rand_zone_name(
+ name="get_primary_nameservers", suffix=self.tld_name)
+ zone = self.client.create_zone(name=zone_name)[1]
self.addCleanup(self.wait_zone_delete, self.client, zone['id'])
zone_pool_id = zone['pool_id']
@@ -222,6 +384,48 @@
pool_nameservers, zone_nameservers,
'Failed - Pool and Zone nameservers should be the same')
+ # TODO(johnsom) Test reader role once this bug is fixed:
+ # https://bugs.launchpad.net/tempest/+bug/1964509
+ # Test RBAC
+ expected_allowed = ['os_primary']
+
+ self.check_list_show_RBAC_enforcement(
+ 'ZonesClient', 'show_zone_nameservers', expected_allowed,
+ True, zone['id'])
+
+ # Test with x-auth-all-projects and x-auth-sudo-project-id header
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin']
+ else:
+ expected_allowed = ['os_admin']
+
+ self.check_list_show_RBAC_enforcement(
+ 'ZonesClient', 'show_zone_nameservers', expected_allowed,
+ False, zone['id'], headers=self.all_projects_header)
+ self.check_list_show_RBAC_enforcement(
+ 'ZonesClient', 'show_zone_nameservers', expected_allowed,
+ False, zone['id'],
+ headers={'x-auth-sudo-project-id': self.client.project_id})
+
+ @decorators.idempotent_id('9970b632-f2db-11ec-a757-201e8823901f')
+ def test_create_zone_ttl_zero(self):
+ LOG.info('Create a PRIMARY zone')
+ zone_name = dns_data_utils.rand_zone_name(
+ name="test_create_zone_ttl_zero", suffix=self.tld_name)
+ zone = self.client.create_zone(name=zone_name, ttl=0)[1]
+ self.addCleanup(self.wait_zone_delete, self.client, zone['id'])
+
+ LOG.info('Ensure we respond with CREATE+PENDING')
+ self.assertEqual(const.CREATE, zone['action'])
+ self.assertEqual(const.PENDING, zone['status'])
+
+ LOG.info('Fetch the zone, ensure TTL is Zero')
+ body = self.client.show_zone(zone['id'])[1]
+ self.assertEqual(
+ 0, body['ttl'],
+ "Failed, actual Zone's TTL:{} "
+ "is not Zero".format(body['ttl']))
+
class ZonesAdminTest(BaseZonesTest):
credentials = ["primary", "admin", "system_admin", "alt"]
@@ -245,7 +449,9 @@
@decorators.idempotent_id('f6fe8cce-8b04-11eb-a861-74e5f9e2a801')
def test_show_zone_impersonate_another_project(self):
LOG.info('Create zone "A" using primary client')
- zone = self.client.create_zone()[1]
+ zone_name = dns_data_utils.rand_zone_name(
+ name="show_zone_impersonate", suffix=self.tld_name)
+ zone = self.client.create_zone(name=zone_name)[1]
self.addCleanup(self.wait_zone_delete, self.client, zone['id'])
LOG.info('As Alt tenant show zone created by Primary tenant. '
@@ -280,31 +486,30 @@
def test_list_all_projects_zones(self):
LOG.info('Create zone "A" using Primary client')
- primary_zone = self.client.create_zone()[1]
+ zone_name = dns_data_utils.rand_zone_name(
+ name="list_zone_all_projects_A", suffix=self.tld_name)
+ primary_zone = self.client.create_zone(name=zone_name,
+ wait_until=const.ACTIVE)[1]
self.addCleanup(
self.wait_zone_delete, self.client, primary_zone['id'])
- LOG.info('Wait till the zone is ACTIVE')
- waiters.wait_for_zone_status(
- self.client, primary_zone['id'], 'ACTIVE')
LOG.info('Create zone "B" using Alt client')
- alt_zone = self.alt_client.create_zone()[1]
+ zone_name = dns_data_utils.rand_zone_name(
+ name="list_zone_all_projects_B", suffix=self.tld_name)
+ alt_zone = self.alt_client.create_zone(name=zone_name,
+ wait_until=const.ACTIVE)[1]
self.addCleanup(
self.wait_zone_delete, self.alt_client, alt_zone['id'])
- LOG.info('Wait till the zone is ACTIVE')
- waiters.wait_for_zone_status(
- self.alt_client, alt_zone['id'], 'ACTIVE')
LOG.info('Create zone "C" using Admin client')
+ zone_name = dns_data_utils.rand_zone_name(
+ name="list_zone_all_projects_C", suffix=self.tld_name)
admin_zone = self.admin_client.create_zone(
- project_id="FakeProjectID")[1]
+ name=zone_name, project_id="FakeProjectID",
+ wait_until=const.ACTIVE)[1]
self.addCleanup(
self.wait_zone_delete, self.admin_client, admin_zone['id'],
headers=self.all_projects_header)
- LOG.info('Wait till the zone is ACTIVE')
- waiters.wait_for_zone_status(
- self.admin_client, admin_zone['id'], 'ACTIVE',
- headers=self.all_projects_header)
LOG.info('As admin user list all projects zones')
# Note: This is an all-projects list call, so other tests running
@@ -326,7 +531,7 @@
class ZoneOwnershipTest(BaseZonesTest):
- credentials = ["primary", "alt"]
+ credentials = ["primary", "alt", "admin", "system_admin"]
@classmethod
def setup_credentials(cls):
@@ -343,7 +548,9 @@
@decorators.idempotent_id('5d28580a-a012-4b57-b211-e077b1a01340')
def test_no_create_duplicate_domain(self):
LOG.info('Create a zone as a default user')
- _, zone = self.client.create_zone()
+ zone_name = dns_data_utils.rand_zone_name(
+ name="no_create_duplicate", suffix=self.tld_name)
+ zone = self.client.create_zone(name=zone_name)[1]
self.addCleanup(self.wait_zone_delete, self.client, zone['id'])
LOG.info('Create a zone as an default with existing domain')
@@ -357,7 +564,9 @@
@decorators.idempotent_id('a48776fd-b1aa-4a25-9f09-d1d34cfbb175')
def test_no_create_subdomain_by_alt_user(self):
LOG.info('Create a zone as a default user')
- _, zone = self.client.create_zone()
+ zone_name = dns_data_utils.rand_zone_name(
+ name="no_create_subdomain_by_alt", suffix=self.tld_name)
+ zone = self.client.create_zone(name=zone_name)[1]
self.addCleanup(self.wait_zone_delete, self.client, zone['id'])
LOG.info('Create a zone as an alt user with existing subdomain')
@@ -368,10 +577,11 @@
@decorators.idempotent_id('f1723d48-c082-43cd-94bf-ebeb5b8c9458')
def test_no_create_superdomain_by_alt_user(self):
- zone_name = dns_data_utils.rand_zone_name()
+ zone_name = dns_data_utils.rand_zone_name(
+ name="no_create_superdomain_by_alt", suffix=self.tld_name)
LOG.info('Create a zone as a default user')
- _, zone = self.client.create_zone(name='a.b.' + zone_name)
+ zone = self.client.create_zone(name='a.b.' + zone_name)[1]
self.addCleanup(self.wait_zone_delete, self.client, zone['id'])
LOG.info('Create a zone as an alt user with existing superdomain')
@@ -380,6 +590,8 @@
class ZonesNegativeTest(BaseZonesTest):
+ credentials = ["admin", "primary", "system_admin"]
+
@classmethod
def setup_credentials(cls):
# Do not create network resources for these test.
@@ -432,3 +644,35 @@
lib_exc.BadRequest, 'invalid_object', 400,
self.client.create_zone,
description=dns_data_utils.rand_zone_name() * 10000)
+
+ @decorators.idempotent_id('49268b24-92de-11eb-9d02-74e5f9e2a801')
+ def test_show_not_existing_zone(self):
+ LOG.info('Fetch non existing zone')
+ self.assertRaises(lib_exc.NotFound,
+ lambda: self.client.show_zone(uuid.uuid1()))
+
+ @decorators.idempotent_id('736e3b50-92e0-11eb-9d02-74e5f9e2a801')
+ def test_use_invalid_id_to_show_zone(self):
+ LOG.info('Fetch the zone using invalid zone ID')
+ with self.assertRaisesDns(lib_exc.BadRequest, 'invalid_uuid', 400):
+ self.client.show_zone(uuid='zahlabut')
+
+ @decorators.idempotent_id('79921370-92e1-11eb-9d02-74e5f9e2a801')
+ def test_delete_non_existing_zone(self):
+ LOG.info('Delete non existing zone')
+ self.assertRaises(lib_exc.NotFound,
+ lambda: self.client.delete_zone(uuid.uuid1()))
+
+ @decorators.idempotent_id('e391e30a-92e0-11eb-9d02-74e5f9e2a801')
+ def test_update_non_existing_zone(self):
+ LOG.info('Update non existing zone')
+ self.assertRaises(lib_exc.NotFound,
+ lambda: self.client.update_zone(
+ uuid.uuid1(), description=data_utils.rand_name()))
+
+ @decorators.idempotent_id('925192f2-0ed8-4591-8fe7-a9fa028f90a0')
+ def test_list_zones_dot_json_fails(self):
+ uri = self.client.get_uri('zones.json')
+
+ self.assertRaises(lib_exc.NotFound,
+ lambda: self.client.get(uri))
diff --git a/designate_tempest_plugin/tests/api/v2/test_zones_exports.py b/designate_tempest_plugin/tests/api/v2/test_zones_exports.py
index 31da506..55bd38c 100644
--- a/designate_tempest_plugin/tests/api/v2/test_zones_exports.py
+++ b/designate_tempest_plugin/tests/api/v2/test_zones_exports.py
@@ -21,6 +21,7 @@
from designate_tempest_plugin.tests import base
from designate_tempest_plugin.common import waiters
from designate_tempest_plugin.common import constants as const
+from designate_tempest_plugin import data_utils as dns_data_utils
CONF = config.CONF
LOG = logging.getLogger(__name__)
@@ -30,9 +31,33 @@
excluded_keys = ['created_at', 'updated_at', 'version', 'links',
'status', 'location']
+ @classmethod
+ def setup_clients(cls):
+ super(BaseZoneExportsTest, cls).setup_clients()
+
+ if CONF.enforce_scope.designate:
+ cls.admin_tld_client = cls.os_system_admin.dns_v2.TldClient()
+ else:
+ cls.admin_tld_client = cls.os_admin.dns_v2.TldClient()
+
+ @classmethod
+ def resource_setup(cls):
+ super(BaseZoneExportsTest, cls).resource_setup()
+
+ # Make sure we have an allowed TLD available
+ tld_name = dns_data_utils.rand_zone_name(name="BaseZoneExportsTest")
+ cls.tld_name = f".{tld_name}"
+ cls.class_tld = cls.admin_tld_client.create_tld(tld_name=tld_name[:-1])
+
+ @classmethod
+ def resource_cleanup(cls):
+ cls.admin_tld_client.delete_tld(cls.class_tld[1]['id'])
+ super(BaseZoneExportsTest, cls).resource_cleanup()
+
class ZonesExportTest(BaseZoneExportsTest):
- credentials = ["primary", "admin", "system_admin", "alt"]
+ credentials = ["primary", "admin", "system_admin", "system_reader", "alt",
+ "project_member", "project_reader"]
@classmethod
def setup_credentials(cls):
@@ -52,9 +77,11 @@
cls.client = cls.os_primary.dns_v2.ZoneExportsClient()
cls.alt_client = cls.os_alt.dns_v2.ZoneExportsClient()
- def _create_zone_export(self):
+ def _create_zone_export(self, test_name):
LOG.info('Create a zone')
- zone = self.zone_client.create_zone()[1]
+ zone_name = dns_data_utils.rand_zone_name(
+ name=test_name, suffix=self.tld_name)
+ zone = self.zone_client.create_zone(name=zone_name)[1]
self.addCleanup(self.wait_zone_delete, self.zone_client, zone['id'])
LOG.info('Create a zone export')
@@ -66,15 +93,25 @@
@decorators.idempotent_id('2dd8a9a0-98a2-4bf6-bb51-286583b30f40')
def test_create_zone_export(self):
- zone_export = self._create_zone_export()[1]
+ zone, zone_export = self._create_zone_export('create_zone_export')
LOG.info('Ensure we respond with PENDING')
self.assertEqual(const.PENDING, zone_export['status'])
+ # Test RBAC
+ expected_allowed = ['os_admin', 'os_primary', 'os_alt']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.append('os_system_admin')
+ expected_allowed.append('os_project_member')
+
+ self.check_CUD_RBAC_enforcement(
+ 'ZoneExportsClient', 'create_zone_export', expected_allowed, True,
+ zone['id'])
+
@decorators.attr(type='smoke')
@decorators.idempotent_id('2d29a2a9-1941-4b7e-9d8a-ad6c2140ea68')
def test_show_zone_export(self):
- zone_export = self._create_zone_export()[1]
+ zone_export = self._create_zone_export('show_zone_export')[1]
LOG.info('Re-Fetch the zone export')
body = self.client.show_zone_export(zone_export['id'])[1]
@@ -82,10 +119,31 @@
LOG.info('Ensure the fetched response matches the zone export')
self.assertExpected(zone_export, body, self.excluded_keys)
+ # TODO(johnsom) Test reader role once this bug is fixed:
+ # https://bugs.launchpad.net/tempest/+bug/1964509
+ # Test RBAC
+ expected_allowed = ['os_primary']
+
+ self.check_list_show_RBAC_enforcement(
+ 'ZoneExportsClient', 'show_zone_export', expected_allowed, True,
+ zone_export['id'])
+
+ # Test RBAC with x-auth-all-projects header
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin']
+ else:
+ expected_allowed = ['os_admin']
+
+ self.check_list_show_RBAC_enforcement(
+ 'ZoneExportsClient', 'show_zone_export', expected_allowed, True,
+ zone_export['id'], headers=self.all_projects_header)
+
@decorators.idempotent_id('fb04507c-9600-11eb-b1cd-74e5f9e2a801')
def test_show_zone_export_impersonate_another_project(self):
LOG.info('Create a zone')
- zone = self.zone_client.create_zone()[1]
+ zone_name = dns_data_utils.rand_zone_name(
+ name='show_zone_export_impersonate', suffix=self.tld_name)
+ zone = self.zone_client.create_zone(name=zone_name)[1]
self.addCleanup(self.wait_zone_delete, self.zone_client, zone['id'])
LOG.info('Create a zone export using primary client')
@@ -105,16 +163,52 @@
'for a primary client: {}'.format(
zone_export['id'], listed_export_ids))
+ # Test RBAC with x-auth-sudo-project-id header
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin']
+ else:
+ expected_allowed = ['os_admin']
+
+ self.check_list_show_RBAC_enforcement(
+ 'ZoneExportsClient', 'show_zone_export', expected_allowed, True,
+ zone_export['id'],
+ headers={'x-auth-sudo-project-id': self.client.project_id})
+
@decorators.idempotent_id('97234f00-8bcb-43f8-84dd-874f8bc4a80e')
def test_delete_zone_export(self):
LOG.info('Create a zone')
- _, zone = self.zone_client.create_zone()
+ zone_name = dns_data_utils.rand_zone_name(
+ name='delete_zone_export', suffix=self.tld_name)
+ zone = self.zone_client.create_zone(name=zone_name)[1]
self.addCleanup(self.wait_zone_delete, self.zone_client, zone['id'],
ignore_errors=lib_exc.NotFound)
LOG.info('Create a zone export')
_, zone_export = self.client.create_zone_export(zone['id'])
+ # Test RBAC
+ expected_allowed = ['os_admin', 'os_primary']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.append('os_system_admin')
+
+ self.check_CUD_RBAC_enforcement(
+ 'ZoneExportsClient', 'delete_zone_export', expected_allowed, True,
+ zone_export['id'])
+
+ # Test RBAC with x-auth-all-projects and x-auth-sudo-project-id header
+ expected_allowed = ['os_admin', 'os_primary']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.append('os_system_admin')
+
+ self.check_CUD_RBAC_enforcement(
+ 'ZoneExportsClient', 'delete_zone_export', expected_allowed, False,
+ zone_export['id'], headers=self.all_projects_header)
+
+ self.check_CUD_RBAC_enforcement(
+ 'ZoneExportsClient', 'delete_zone_export', expected_allowed, False,
+ zone_export['id'],
+ headers={'x-auth-sudo-project-id': self.client.project_id})
+
LOG.info('Delete the zone export')
_, body = self.client.delete_zone_export(zone_export['id'])
@@ -125,24 +219,60 @@
@decorators.idempotent_id('476bfdfe-58c8-46e2-b376-8403c0fff440')
def test_list_zone_exports(self):
- self._create_zone_export()[1]
+ export = self._create_zone_export('list_zone_exports')[1]
LOG.info('List zone exports')
body = self.client.list_zone_exports()[1]
self.assertGreater(len(body['exports']), 0)
+ # TODO(johnsom) Test reader role once this bug is fixed:
+ # https://bugs.launchpad.net/tempest/+bug/1964509
+ # Test RBAC - Users that are allowed to call list, but should get
+ # zero zone exports.
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_admin', 'os_project_member',
+ 'os_project_reader']
+ else:
+ expected_allowed = ['os_alt']
+
+ self.check_list_RBAC_enforcement_count(
+ 'ZoneExportsClient', 'list_zone_exports', expected_allowed, 0)
+
+ # Test that users who should see the zone export can see it.
+ expected_allowed = ['os_primary']
+
+ self.check_list_IDs_RBAC_enforcement(
+ 'ZoneExportsClient', 'list_zone_exports',
+ expected_allowed, [export['id']])
+
+ # Test RBAC with x-auth-sudo-project-id header
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin']
+ else:
+ expected_allowed = ['os_admin']
+
+ self.check_list_IDs_RBAC_enforcement(
+ 'ZoneExportsClient', 'list_zone_exports',
+ expected_allowed, [export['id']],
+ headers={'x-auth-sudo-project-id': self.client.project_id})
+
@decorators.idempotent_id('f34e7f34-9613-11eb-b1cd-74e5f9e2a801')
def test_list_zone_exports_all_projects(self):
LOG.info('Create a primary zone and its export')
- primary_zone = self.zone_client.create_zone()[1]
+ zone_name = dns_data_utils.rand_zone_name(
+ name='list_zone_exports_all_projects', suffix=self.tld_name)
+ primary_zone = self.zone_client.create_zone(name=zone_name)[1]
self.addCleanup(
self.wait_zone_delete, self.zone_client, primary_zone['id'])
primary_export = self.client.create_zone_export(primary_zone['id'])[1]
self.addCleanup(self.client.delete_zone_export, primary_export['id'])
LOG.info('Create an alt zone and its export')
- alt_zone = self.alt_zone_client.create_zone()[1]
+ alt_zone_name = dns_data_utils.rand_zone_name(
+ name='list_zone_exports_all_projects_alt', suffix=self.tld_name)
+ alt_zone = self.alt_zone_client.create_zone(name=alt_zone_name)[1]
self.addCleanup(
self.wait_zone_delete, self.alt_zone_client, alt_zone['id'])
alt_export = self.alt_client.create_zone_export(alt_zone['id'])[1]
@@ -165,18 +295,32 @@
'Failed, expected ID:{} was not found in '
'listed IDs:{}'.format(id, listed_exports_ids))
+ # Test RBAC with x-auth-all-projects
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin']
+ else:
+ expected_allowed = ['os_admin']
+
+ self.check_list_IDs_RBAC_enforcement(
+ 'ZoneExportsClient', 'list_zone_exports', expected_allowed,
+ [alt_export['id']], headers=self.all_projects_header)
+
@decorators.idempotent_id('e4a11a14-9aaa-11eb-be59-74e5f9e2a801')
def test_list_zone_exports_filter_results(self):
LOG.info('Create a primary zone and its export')
- primary_zone = self.zone_client.create_zone()[1]
+ zone_name = dns_data_utils.rand_zone_name(
+ name='list_zone_exports_filter', suffix=self.tld_name)
+ primary_zone = self.zone_client.create_zone(name=zone_name)[1]
self.addCleanup(
self.wait_zone_delete, self.zone_client, primary_zone['id'])
primary_export = self.client.create_zone_export(primary_zone['id'])[1]
self.addCleanup(self.client.delete_zone_export, primary_export['id'])
LOG.info('Create an alt zone, its export and delete it')
- alt_zone = self.alt_zone_client.create_zone()[1]
+ zone_name = dns_data_utils.rand_zone_name(
+ name='list_zone_exports_filter_alt', suffix=self.tld_name)
+ alt_zone = self.alt_zone_client.create_zone(name=zone_name)[1]
self.addCleanup(
self.wait_zone_delete, self.alt_zone_client, alt_zone['id'])
alt_export = self.alt_client.create_zone_export(alt_zone['id'])[1]
@@ -222,7 +366,7 @@
class ZonesExportTestNegative(BaseZoneExportsTest):
- credentials = ["primary", "alt"]
+ credentials = ["primary", "alt", "admin", "system_admin"]
@classmethod
def setup_credentials(cls):
@@ -237,9 +381,11 @@
cls.client = cls.os_primary.dns_v2.ZoneExportsClient()
cls.alt_client = cls.os_alt.dns_v2.ZoneExportsClient()
- def _create_zone_export(self):
+ def _create_zone_export(self, test_name):
LOG.info('Create a zone')
- zone = self.zone_client.create_zone()[1]
+ zone_name = dns_data_utils.rand_zone_name(name=test_name,
+ suffix=self.tld_name)
+ zone = self.zone_client.create_zone(name=zone_name)[1]
self.addCleanup(self.wait_zone_delete, self.zone_client, zone['id'])
LOG.info('Create a zone export')
@@ -258,7 +404,9 @@
@decorators.idempotent_id('943dad4a-9617-11eb-b1cd-74e5f9e2a801')
def test_export_not_your_zone(self):
LOG.info('Create a primary zone.')
- primary_zone = self.zone_client.create_zone()[1]
+ zone_name = dns_data_utils.rand_zone_name(name='export_not_your_zone',
+ suffix=self.tld_name)
+ primary_zone = self.zone_client.create_zone(name=zone_name)[1]
self.addCleanup(
self.wait_zone_delete, self.zone_client, primary_zone['id'])
@@ -270,7 +418,9 @@
@decorators.idempotent_id('518dc308-9604-11eb-b1cd-74e5f9e2a801')
def test_create_zone_export_using_deleted_zone(self):
LOG.info('Create a zone')
- zone = self.zone_client.create_zone()[1]
+ zone_name = dns_data_utils.rand_zone_name(name='export_deleted_zone',
+ suffix=self.tld_name)
+ zone = self.zone_client.create_zone(name=zone_name)[1]
self.addCleanup(self.wait_zone_delete, self.zone_client, zone['id'],
ignore_errors=lib_exc.NotFound)
LOG.info("Delete the zone and wait till it's done.")
@@ -290,7 +440,8 @@
@decorators.idempotent_id('52a1fee0-c338-4ed9-b9f9-41ee7fd73375')
def test_show_zonefile_not_supported_accept_value(self):
- zone, zone_export = self._create_zone_export()
+ zone, zone_export = self._create_zone_export(
+ 'show_zonefile_bad_accept')
# Tempest-lib _error_checker will raise UnexpectedResponseCode
e = self.assertRaises(
lib_exc.UnexpectedResponseCode, self.client.show_exported_zonefile,
diff --git a/designate_tempest_plugin/tests/api/v2/test_zones_imports.py b/designate_tempest_plugin/tests/api/v2/test_zones_imports.py
index 510708d..8e03845 100644
--- a/designate_tempest_plugin/tests/api/v2/test_zones_imports.py
+++ b/designate_tempest_plugin/tests/api/v2/test_zones_imports.py
@@ -30,9 +30,33 @@
excluded_keys = ['created_at', 'updated_at', 'version', 'links',
'status', 'message', 'zone_id']
+ @classmethod
+ def setup_clients(cls):
+ super(BaseZonesImportTest, cls).setup_clients()
+
+ if CONF.enforce_scope.designate:
+ cls.admin_tld_client = cls.os_system_admin.dns_v2.TldClient()
+ else:
+ cls.admin_tld_client = cls.os_admin.dns_v2.TldClient()
+
+ @classmethod
+ def resource_setup(cls):
+ super(BaseZonesImportTest, cls).resource_setup()
+
+ # Make sure we have an allowed TLD available
+ tld_name = dns_data_utils.rand_zone_name(name="BaseZonesImportTest")
+ cls.tld_name = f".{tld_name}"
+ cls.class_tld = cls.admin_tld_client.create_tld(tld_name=tld_name[:-1])
+
+ @classmethod
+ def resource_cleanup(cls):
+ cls.admin_tld_client.delete_tld(cls.class_tld[1]['id'])
+ super(BaseZonesImportTest, cls).resource_cleanup()
+
class ZonesImportTest(BaseZonesImportTest):
- credentials = ["primary", "admin", "system_admin", "alt"]
+ credentials = ["primary", "admin", "system_admin", "system_reader", "alt",
+ "project_member", "project_reader"]
@classmethod
def setup_credentials(cls):
@@ -64,17 +88,34 @@
@decorators.idempotent_id('2e2d907d-0609-405b-9c96-3cb2b87e3dce')
def test_create_zone_import(self):
LOG.info('Create a zone import')
- _, zone_import = self.client.create_zone_import()
+ zone_name = dns_data_utils.rand_zone_name(
+ name="create_zone_import", suffix=self.tld_name)
+ zone_data = dns_data_utils.rand_zonefile_data(name=zone_name)
+ zone_import = self.client.create_zone_import(
+ zonefile_data=zone_data)[1]
self.addCleanup(self.clean_up_resources, zone_import['id'])
# Make sure we complete the import and have the zone_id for cleanup
waiters.wait_for_zone_import_status(
self.client, zone_import['id'], const.COMPLETE)
+ # Test with no extra header overrides (sudo-project-id)
+ expected_allowed = ['os_admin', 'os_primary', 'os_alt']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.append('os_system_admin')
+ expected_allowed.append('os_project_member')
+
+ self.check_CUD_RBAC_enforcement(
+ 'ZoneImportsClient', 'create_zone_import', expected_allowed, False)
+
@decorators.idempotent_id('31eaf25a-9532-11eb-a55d-74e5f9e2a801')
def test_create_zone_import_invalid_ttl(self):
LOG.info('Try to create a zone import using invalid TTL value')
+ zone_name = dns_data_utils.rand_zone_name(
+ name="create_zone_import_invalid_ttl", suffix=self.tld_name)
+ zone_data = dns_data_utils.rand_zonefile_data(name=zone_name,
+ ttl='zahlabut')
zone_import = self.client.create_zone_import(
- zonefile_data=dns_data_utils.rand_zonefile_data(ttl='zahlabut'))[1]
+ zonefile_data=zone_data)[1]
self.addCleanup(self.clean_up_resources, zone_import['id'])
waiters.wait_for_zone_import_status(
self.client, zone_import['id'], "ERROR")
@@ -92,7 +133,11 @@
@decorators.idempotent_id('c8909558-0dc6-478a-9e91-eb97b52e59e0')
def test_show_zone_import(self):
LOG.info('Create a zone import')
- _, zone_import = self.client.create_zone_import()
+ zone_name = dns_data_utils.rand_zone_name(
+ name="show_zone_import", suffix=self.tld_name)
+ zone_data = dns_data_utils.rand_zonefile_data(name=zone_name)
+ zone_import = self.client.create_zone_import(
+ zonefile_data=zone_data)[1]
self.addCleanup(self.clean_up_resources, zone_import['id'])
# Make sure we complete the import and have the zone_id for cleanup
waiters.wait_for_zone_import_status(
@@ -104,17 +149,62 @@
LOG.info('Ensure the fetched response matches the expected one')
self.assertExpected(zone_import, body, self.excluded_keys)
+ # TODO(johnsom) Test reader roles once this bug is fixed.
+ # https://bugs.launchpad.net/tempest/+bug/1964509
+ # Test with no extra header overrides (all_projects, sudo-project-id)
+ expected_allowed = ['os_primary']
+
+ self.check_list_show_RBAC_enforcement(
+ 'ZoneImportsClient', 'show_zone_import', expected_allowed, True,
+ zone_import['id'])
+
+ # Test with x-auth-all-projects
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin']
+ else:
+ expected_allowed = ['os_admin']
+
+ self.check_list_show_RBAC_enforcement(
+ 'ZoneImportsClient', 'show_zone_import', expected_allowed, False,
+ zone_import['id'], headers=self.all_projects_header)
+
@decorators.idempotent_id('56a16e68-b241-4e41-bc5c-c40747fa68e3')
def test_delete_zone_import(self):
LOG.info('Create a zone import')
- _, zone_import = self.client.create_zone_import()
+ zone_name = dns_data_utils.rand_zone_name(
+ name="delete_zone_import", suffix=self.tld_name)
+ zone_data = dns_data_utils.rand_zonefile_data(name=zone_name)
+ zone_import = self.client.create_zone_import(
+ zonefile_data=zone_data)[1]
waiters.wait_for_zone_import_status(self.client, zone_import['id'],
const.COMPLETE)
- _, zone_import = self.client.show_zone_import(zone_import['id'])
+ zone_import = self.client.show_zone_import(zone_import['id'])[1]
self.addCleanup(self.wait_zone_delete,
self.zone_client,
zone_import['zone_id'])
+ # Test RBAC
+ expected_allowed = ['os_admin', 'os_primary']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.append('os_system_admin')
+
+ self.check_CUD_RBAC_enforcement(
+ 'ZoneImportsClient', 'delete_zone_import', expected_allowed, True,
+ zone_import['id'])
+
+ # Test RBAC with x-auth-all-projects and x-auth-sudo-project-id header
+ expected_allowed = ['os_admin', 'os_primary']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed.append('os_system_admin')
+
+ self.check_CUD_RBAC_enforcement(
+ 'ZoneImportsClient', 'delete_zone_import', expected_allowed, False,
+ zone_import['id'], headers=self.all_projects_header)
+ self.check_CUD_RBAC_enforcement(
+ 'ZoneImportsClient', 'delete_zone_import', expected_allowed, False,
+ zone_import['id'],
+ headers={'x-auth-sudo-project-id': self.client.project_id})
+
LOG.info('Delete the zone')
resp, body = self.client.delete_zone_import(zone_import['id'])
@@ -125,22 +215,62 @@
@decorators.idempotent_id('9eab76af-1995-485f-a2ef-8290c1863aba')
def test_list_zones_imports(self):
LOG.info('Create a zone import')
- _, zone_import = self.client.create_zone_import()
+ zone_name = dns_data_utils.rand_zone_name(
+ name="list_zone_imports", suffix=self.tld_name)
+ zone_data = dns_data_utils.rand_zonefile_data(name=zone_name)
+ zone_import = self.client.create_zone_import(
+ zonefile_data=zone_data)[1]
self.addCleanup(self.clean_up_resources, zone_import['id'])
# Make sure we complete the import and have the zone_id for cleanup
waiters.wait_for_zone_import_status(
self.client, zone_import['id'], const.COMPLETE)
LOG.info('List zones imports')
- _, body = self.client.list_zone_imports()
+ body = self.client.list_zone_imports()[1]
self.assertGreater(len(body['imports']), 0)
+ # TODO(johnsom) Test reader role once this bug is fixed:
+ # https://bugs.launchpad.net/tempest/+bug/1964509
+ # Test RBAC - Users that are allowed to call list, but should get
+ # zero zones.
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin', 'os_system_reader',
+ 'os_admin', 'os_project_member',
+ 'os_project_reader']
+ else:
+ expected_allowed = ['os_alt']
+
+ self.check_list_RBAC_enforcement_count(
+ 'ZoneImportsClient', 'list_zone_imports', expected_allowed, 0)
+
+ # Test that users who should see the zone, can see it.
+ expected_allowed = ['os_primary']
+
+ self.check_list_IDs_RBAC_enforcement(
+ 'ZoneImportsClient', 'list_zone_imports', expected_allowed,
+ [zone_import['id']])
+
+ # Test RBAC with x-auth-sudo-project-id header
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin']
+ else:
+ expected_allowed = ['os_admin']
+
+ self.check_list_IDs_RBAC_enforcement(
+ 'ZoneImportsClient', 'list_zone_imports', expected_allowed,
+ [zone_import['id']],
+ headers={'x-auth-sudo-project-id': self.client.project_id})
+
@decorators.idempotent_id('2c1fa20e-9554-11eb-a55d-74e5f9e2a801')
def test_show_import_impersonate_another_project(self):
LOG.info('Import zone "A" using primary client')
- zone_import = self.client.create_zone_import()[1]
+ zone_name = dns_data_utils.rand_zone_name(
+ name="show_zone_import_impersonate", suffix=self.tld_name)
+ zone_data = dns_data_utils.rand_zonefile_data(name=zone_name)
+ zone_import = self.client.create_zone_import(
+ zonefile_data=zone_data)[1]
self.addCleanup(self.clean_up_resources, zone_import['id'])
# Make sure we complete the import and have the zone_id for cleanup
@@ -179,10 +309,25 @@
self.assertExpected(
zone_import, resp_body['imports'][0], self.excluded_keys)
+ # Test with x-auth-sudo-project-id header
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin']
+ else:
+ expected_allowed = ['os_admin']
+
+ self.check_list_show_RBAC_enforcement(
+ 'ZoneImportsClient', 'show_zone_import', expected_allowed, False,
+ zone_import['id'],
+ headers={'x-auth-sudo-project-id': self.client.project_id})
+
@decorators.idempotent_id('7bd06ec6-9556-11eb-a55d-74e5f9e2a801')
def test_list_import_zones_all_projects(self):
LOG.info('Create import zone "A" using primary client')
- zone_import = self.client.create_zone_import()[1]
+ zone_name = dns_data_utils.rand_zone_name(
+ name="_zone_imports_all_projects", suffix=self.tld_name)
+ zone_data = dns_data_utils.rand_zonefile_data(name=zone_name)
+ zone_import = self.client.create_zone_import(
+ zonefile_data=zone_data)[1]
self.addCleanup(self.clean_up_resources, zone_import['id'])
# Make sure we complete the import and have the zone_id for cleanup
waiters.wait_for_zone_import_status(
@@ -218,3 +363,93 @@
"Failed, expected import ID:{} wasn't found in "
"listed import IDs".format(
zone_import['id'], listed_zone_import_ids))
+
+ # Test RBAC with x-auth-all-projects
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ expected_allowed = ['os_system_admin']
+ else:
+ expected_allowed = ['os_admin']
+
+ self.check_list_IDs_RBAC_enforcement(
+ 'ZoneImportsClient', 'list_zone_imports', expected_allowed,
+ [zone_import['id']], headers=self.all_projects_header)
+
+
+class ZonesImportTestNegative(BaseZonesImportTest):
+    credentials = ["primary", "admin", "system_admin"]
+
+    @classmethod
+    def setup_credentials(cls):
+        # Do not create network resources for these tests.
+        cls.set_network_resources()
+        super(ZonesImportTestNegative, cls).setup_credentials()
+
+    @classmethod
+    def setup_clients(cls):
+        super(ZonesImportTestNegative, cls).setup_clients()
+        cls.zone_client = cls.os_primary.dns_v2.ZonesClient()
+        cls.client = cls.os_primary.dns_v2.ZoneImportsClient()
+
+    def _clean_up_resources(self, zone_import_id):
+        zone_import = self.client.show_zone_import(zone_import_id)[1]
+        if zone_import['zone_id']:  # A zone was actually created.
+            waiters.wait_for_zone_import_status(
+                self.client, zone_import_id, const.COMPLETE)
+            self.client.delete_zone_import(zone_import['id'])
+            self.wait_zone_delete(self.zone_client, zone_import['zone_id'])
+        else:  # Import has failed and zone wasn't created.
+            self.client.delete_zone_import(zone_import['id'])
+
+    @decorators.idempotent_id('31eaf25a-9532-11eb-a55d-74e5f9e2a801')
+    def test_create_zone_import_invalid_ttl(self):
+        LOG.info('Try to create a zone import using invalid TTL value')
+        zone_name = dns_data_utils.rand_zone_name(
+            name="create_zone_import_invalid_ttl", suffix=self.tld_name)
+        zone_data = dns_data_utils.rand_zonefile_data(name=zone_name,
+                                                      ttl='zahlabut')
+        zone_import = self.client.create_zone_import(
+            zonefile_data=zone_data, wait_until=const.ERROR)[1]
+        self.addCleanup(self._clean_up_resources, zone_import['id'])
+
+    @decorators.idempotent_id('f3a1c0de-5b7e-4c2a-9d41-6e8b2a7c915f')
+    def test_create_zone_import_invalid_name(self):
+        LOG.info('Try to create a zone import using invalid name')
+        zone_import = self.client.create_zone_import(
+            zonefile_data=dns_data_utils.rand_zonefile_data(
+                name='@@@'), wait_until=const.ERROR)[1]
+        self.addCleanup(self._clean_up_resources, zone_import['id'])
+
+    @decorators.idempotent_id('8fd744d2-9dff-11ec-9fb6-201e8823901f')
+    def test_create_zone_import_invalid_file_data(self):
+        LOG.info('Try to create a zone import using random generated'
+                 ' import file data')
+        zone_file_data = dns_data_utils.rand_string(size=100)
+        zone_import = self.client.create_zone_import(zone_file_data)[1]
+        self.addCleanup(self.client.delete_zone_import, zone_import['id'])
+        waiters.wait_for_zone_import_status(
+            self.client, zone_import['id'], const.ERROR)
+
+    @decorators.idempotent_id('4fb9494e-9e23-11ec-8378-201e8823901f')
+    def test_zone_cannot_be_update_by_import(self):
+        LOG.info('Create a Zone named: "...zone_to_update..."')
+        zone_name = dns_data_utils.rand_zone_name(
+            name='zone_to_update', suffix=self.tld_name)
+        zone = self.zone_client.create_zone(
+            name=zone_name, wait_until=const.ACTIVE)[1]
+        self.addCleanup(self.wait_zone_delete, self.zone_client, zone['id'])
+        LOG.info('Use zone import to update an existing zone, expected: zone'
+                 ' import gets into the ERROR status ')
+        zone_import_data = dns_data_utils.rand_zonefile_data(name=zone_name)
+        zone_import = self.client.create_zone_import(zone_import_data)[1]
+        self.addCleanup(self._clean_up_resources, zone_import['id'])
+        waiters.wait_for_zone_import_status(
+            self.client, zone_import['id'], const.ERROR)
+
+    @decorators.idempotent_id('5fa8016e-6ed1-11ec-9bd7-201e8823901f')
+    def test_create_zone_import_invalid_content_type(self):
+        LOG.info('Try to create a zone import using: "Content-Type:Zahlabut"'
+                 ' HTTP header in POST request')
+        with self.assertRaisesDns(
+                lib_exc.InvalidContentType, 'unsupported_content_type', 415):
+            self.client.create_zone_import(
+                headers={'Content-Type': 'Zahlabut'})
diff --git a/designate_tempest_plugin/tests/base.py b/designate_tempest_plugin/tests/base.py
index 2e02a8c..091c3ca 100644
--- a/designate_tempest_plugin/tests/base.py
+++ b/designate_tempest_plugin/tests/base.py
@@ -11,13 +11,13 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
-import six
from tempest import test
from tempest import config
from tempest.lib.common.utils import test_utils as utils
from designate_tempest_plugin.services.dns.query.query_client import \
QueryClient
+from designate_tempest_plugin.tests import rbac_utils
CONF = config.CONF
@@ -55,7 +55,7 @@
return False
-class BaseDnsTest(test.BaseTestCase):
+class BaseDnsTest(rbac_utils.RBACTestsMixin, test.BaseTestCase):
"""Base class for DNS tests."""
# NOTE(andreaf) credentials holds a list of the credentials to be allocated
@@ -64,9 +64,23 @@
# rest the actual roles.
# NOTE(kiall) primary will result in a manager @ cls.os_primary, alt will
# have cls.os_alt, and admin will have cls.os_admin.
- # NOTE(kiall) We should default to only primary, and request additional
- # credentials in the tests that require them.
- credentials = ['primary']
+ # NOTE(johnsom) We will allocate most credentials here so that each test
+ # can test for allowed and disallowed RBAC policies.
+ credentials = ['admin', 'primary', 'alt']
+ if CONF.dns_feature_enabled.enforce_new_defaults:
+ credentials.extend(['system_admin', 'system_reader',
+ 'project_member', 'project_reader'])
+
+ # A tuple of credentials that will be allocated by tempest using the
+ # 'credentials' list above. These are used to build RBAC test lists.
+ allocated_creds = []
+ for cred in credentials:
+ if isinstance(cred, list):
+ allocated_creds.append('os_roles_' + cred[0])
+ else:
+ allocated_creds.append('os_' + cred)
+ # Tests shall not mess with the list of allocated credentials
+ allocated_credentials = tuple(allocated_creds)
@classmethod
def skip_checks(cls):
@@ -91,7 +105,7 @@
)
def assertExpected(self, expected, actual, excluded_keys):
- for key, value in six.iteritems(expected):
+ for key, value in expected.items():
if key not in excluded_keys:
self.assertIn(key, actual)
self.assertEqual(value, actual[key], key)
@@ -135,6 +149,10 @@
zone_id,
recordset_id)
+ def unset_ptr(self, ptr_client, fip_id, **kwargs):
+ return utils.call_and_ignore_notfound_exc(
+ ptr_client.unset_ptr_record, fip_id, **kwargs)
+
def _delete_zone(self, zone_client, zone_id, **kwargs):
return utils.call_and_ignore_notfound_exc(zone_client.delete_zone,
zone_id, **kwargs)
diff --git a/designate_tempest_plugin/tests/rbac_utils.py b/designate_tempest_plugin/tests/rbac_utils.py
new file mode 100644
index 0000000..fa66410
--- /dev/null
+++ b/designate_tempest_plugin/tests/rbac_utils.py
@@ -0,0 +1,368 @@
+# Copyright 2021 Red Hat, Inc. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import copy
+
+from oslo_log import log as logging
+from tempest import config
+from tempest.lib import exceptions
+from tempest import test
+
+CONF = config.CONF
+LOG = logging.getLogger(__name__)
+
+
+class RBACTestsMixin(test.BaseTestCase):
+
+    def _get_client_method(self, cred_obj, client_str, method_str):
+        """Instantiate dns_v2.<client_str> and return its <method_str>."""
+        dns_clients = getattr(cred_obj, 'dns_v2')
+        client = getattr(dns_clients, client_str)
+        client_obj = client()
+        method = getattr(client_obj, method_str)
+        return method
+
+    def _get_client_project_id(self, cred_obj, client_str):
+        """Instantiate dns_v2.<client_str> and return its project_id."""
+        dns_clients = getattr(cred_obj, 'dns_v2')
+        client = getattr(dns_clients, client_str)
+        client_obj = client()
+        return client_obj.project_id
+
+    def _check_allowed(self, client_str, method_str, allowed_list,
+                       with_project, *args, **kwargs):
+        """Test an API call allowed RBAC enforcement.
+
+        :param client_str: The service client to use for the test, without the
+                           credential. Example: 'ZonesClient'
+        :param method_str: The method on the client to call for the test.
+                           Example: 'list_zones'
+        :param allowed_list: The list of credentials expected to be
+                             allowed. Example: ['os_primary'].
+        :param with_project: When true, pass the project ID to the call.
+        :param args: Any positional parameters needed by the method.
+        :param kwargs: Any named parameters needed by the method.
+        :raises AssertionError: Raised (via self.fail) if an allowed
+                                credential gets a Forbidden or NotFound
+                                response, or is unexpectedly not allocated.
+        :raises InvalidScope: Raised if a credential that should have the
+                              correct scope for access is denied.
+        :returns: None on success
+        """
+        for cred in allowed_list:
+            try:
+                cred_obj = getattr(self, cred)
+            except AttributeError:
+                # TODO(johnsom) Remove once scoped tokens is the default.
+                if ((cred == 'os_system_admin' or
+                     cred == 'os_system_reader') and
+                        not CONF.enforce_scope.designate):
+                    LOG.info('Skipping %s allowed RBAC test because '
+                             'enforce_scope.designate is not True', cred)
+                    continue
+                else:
+                    self.fail('Credential {} "expected_allowed" for RBAC '
+                              'testing was not created by tempest '
+                              'credentials setup. This is likely a bug in the '
+                              'test.'.format(cred))
+            method = self._get_client_method(cred_obj, client_str, method_str)
+            project_id = self._get_client_project_id(cred_obj, client_str)
+            try:
+                if with_project:
+                    method(project_id, *args, **kwargs)
+                else:
+                    method(*args, **kwargs)
+            except exceptions.Forbidden as e:
+                self.fail('Method {}.{} failed to allow access via RBAC using '
+                          'credential {}. Error: {}'.format(
+                              client_str, method_str, cred, str(e)))
+            except exceptions.NotFound as e:
+                self.fail('Method {}.{} failed to allow access via RBAC using '
+                          'credential {}. Error: {}'.format(
+                              client_str, method_str, cred, str(e)))
+
+    def _check_disallowed(self, client_str, method_str, allowed_list,
+                          expect_404, with_project, *args, **kwargs):
+        """Test an API call disallowed RBAC enforcement.
+
+        :param client_str: The service client to use for the test, without the
+                           credential. Example: 'ZonesClient'
+        :param method_str: The method on the client to call for the test.
+                           Example: 'list_zones'
+        :param allowed_list: The list of credentials expected to be
+                             allowed. Example: ['os_primary'].
+        :param expect_404: When True, 404 responses are considered ok.
+        :param with_project: When true, pass the project ID to the call.
+        :param args: Any positional parameters needed by the method.
+        :param kwargs: Any named parameters needed by the method.
+        :raises AssertionError: Raised if the RBAC tests fail.
+        :raises NotFound: Raised if a disallowed credential gets a 404 and
+                          expect_404 is False.
+        :raises InvalidScope: Raised if a credential that should have the
+                              correct scope for access is denied.
+        :returns: None on success
+        """
+        expected_disallowed = (set(self.allocated_credentials) -
+                               set(allowed_list))
+        for cred in expected_disallowed:
+            cred_obj = getattr(self, cred)
+            method = self._get_client_method(cred_obj, client_str, method_str)
+            project_id = self._get_client_project_id(cred_obj, client_str)
+
+            # Unfortunately tempest uses testtools assertRaises[1] which means
+            # we cannot use the unittest assertRaises context[2] with msg= to
+            # give a useful error.
+            # Also, testtools doesn't work with subTest[3], so we can't use
+            # that to expose the failing credential.
+            # This all means the exception raised by testtools assertRaises
+            # is less than useful.
+            # TODO(johnsom) Remove this try block once testtools is useful.
+            # [1] https://testtools.readthedocs.io/en/latest/
+            #     api.html#testtools.TestCase.assertRaises
+            # [2] https://docs.python.org/3/library/
+            #     unittest.html#unittest.TestCase.assertRaises
+            # [3] https://github.com/testing-cabal/testtools/issues/235
+            try:
+                if with_project:
+                    method(project_id, *args, **kwargs)
+                else:
+                    method(*args, **kwargs)
+            except exceptions.Forbidden:
+                continue
+            except exceptions.NotFound:
+                # Some APIs hide that the resource exists by returning 404
+                # on permission denied.
+                if expect_404:
+                    continue
+                raise
+            self.fail('Method {}.{} failed to deny access via RBAC using '
+                      'credential {}.'.format(client_str, method_str, cred))
+
+    def check_list_show_RBAC_enforcement(self, client_str, method_str,
+                                         expected_allowed, expect_404,
+                                         *args, **kwargs):
+        """Test list or show API call RBAC enforcement.
+
+        :param client_str: The service client to use for the test, without the
+                           credential. Example: 'ZonesClient'
+        :param method_str: The method on the client to call for the test.
+                           Example: 'list_zones'
+        :param expected_allowed: The list of credentials expected to be
+                                 allowed. Example: ['os_primary'].
+        :param expect_404: When True, 404 responses are considered ok.
+        :param args: Any positional parameters needed by the method.
+        :param kwargs: Any named parameters needed by the method.
+        :raises AssertionError: Raised if the RBAC tests fail.
+        :raises NotFound: Raised if a disallowed credential gets a 404 and
+                          expect_404 is False.
+        :raises InvalidScope: Raised if a credential that should have the
+                              correct scope for access is denied.
+        :returns: None on success
+        """
+
+        allowed_list = copy.deepcopy(expected_allowed)
+
+        # #### Test that disallowed credentials cannot access the API.
+        self._check_disallowed(client_str, method_str, allowed_list,
+                               expect_404, False, *args, **kwargs)
+
+        # #### Test that allowed credentials can access the API.
+        self._check_allowed(client_str, method_str, allowed_list, False,
+                            *args, **kwargs)
+
+    def check_list_show_with_ID_RBAC_enforcement(self, client_str, method_str,
+                                                 expected_allowed, expect_404,
+                                                 *args, **kwargs):
+        """Test list or show API call passing the project ID RBAC enforcement.
+
+        :param client_str: The service client to use for the test, without the
+                           credential. Example: 'ZonesClient'
+        :param method_str: The method on the client to call for the test.
+                           Example: 'list_zones'
+        :param expected_allowed: The list of credentials expected to be
+                                 allowed. Example: ['os_primary'].
+        :param expect_404: When True, 404 responses are considered ok.
+        :param args: Any positional parameters needed by the method.
+        :param kwargs: Any named parameters needed by the method.
+        :raises AssertionError: Raised if the RBAC tests fail.
+        :raises NotFound: Raised if a disallowed credential gets a 404 and
+                          expect_404 is False.
+        :raises InvalidScope: Raised if a credential that should have the
+                              correct scope for access is denied.
+        :returns: None on success
+        """
+
+        allowed_list = copy.deepcopy(expected_allowed)
+
+        # #### Test that disallowed credentials cannot access the API.
+        self._check_disallowed(client_str, method_str, allowed_list,
+                               expect_404, True, *args, **kwargs)
+
+        # #### Test that allowed credentials can access the API.
+        self._check_allowed(client_str, method_str, allowed_list, True,
+                            *args, **kwargs)
+
+    def check_CUD_RBAC_enforcement(self, client_str, method_str,
+                                   expected_allowed, expect_404,
+                                   *args, **kwargs):
+        """Test create/update/delete RBAC denial for disallowed credentials.
+
+        :param client_str: The service client to use for the test, without the
+                           credential. Example: 'ZonesClient'
+        :param method_str: The method on the client to call for the test.
+                           Example: 'create_zone'
+        :param expected_allowed: The list of credentials expected to be
+                                 allowed. Example: ['os_primary'].
+        :param expect_404: When True, 404 responses are considered ok.
+        :param args: Any positional parameters needed by the method.
+        :param kwargs: Any named parameters needed by the method.
+        :raises AssertionError: Raised if the RBAC tests fail.
+        :raises NotFound: Raised if a disallowed credential gets a 404 and
+                          expect_404 is False.
+        :raises InvalidScope: Raised if a credential that should have the
+                              correct scope for access is denied.
+        :returns: None on success
+        """
+
+        allowed_list = copy.deepcopy(expected_allowed)
+
+        # #### Test that disallowed credentials cannot access the API.
+        self._check_disallowed(client_str, method_str, allowed_list,
+                               expect_404, False, *args, **kwargs)
+
+    def check_list_RBAC_enforcement_count(
+            self, client_str, method_str, expected_allowed, expected_count,
+            *args, **kwargs):
+        """Test an API list call RBAC enforcement result count.
+
+        List APIs will return the object list for the project associated
+        with the token used to access the API. This means most credentials
+        will have access, but will get differing results.
+
+        This test will query the list API using a list of credentials and
+        will validate that only the expected count of results are returned.
+
+        :param client_str: The service client to use for the test, without the
+                           credential. Example: 'ZonesClient'
+        :param method_str: The method on the client to call for the test.
+                           Example: 'list_zones'
+        :param expected_allowed: The list of credentials expected to be
+                                 allowed. Example: ['os_primary'].
+        :param expected_count: The number of results expected in the list
+                               returned from the API.
+        :param args: Any positional parameters needed by the method.
+        :param kwargs: Any named parameters needed by the method.
+        :raises AssertionError: Raised if a credential is denied (Forbidden),
+                                is unexpectedly not allocated, or sees a
+                                different number of objects than expected.
+        :raises InvalidScope: Raised if a credential that should have the
+                              correct scope for access is denied.
+        :returns: None on success
+        """
+
+        allowed_list = copy.deepcopy(expected_allowed)
+
+        for cred in allowed_list:
+            try:
+                cred_obj = getattr(self, cred)
+            except AttributeError:
+                # TODO(johnsom) Remove once scoped tokens is the default.
+                if ((cred == 'os_system_admin' or
+                     cred == 'os_system_reader') and
+                        not CONF.enforce_scope.designate):
+                    LOG.info('Skipping %s allowed RBAC test because '
+                             'enforce_scope.designate is not True', cred)
+                    continue
+                else:
+                    self.fail('Credential {} "expected_allowed" for RBAC '
+                              'testing was not created by tempest '
+                              'credentials setup. This is likely a bug in the '
+                              'test.'.format(cred))
+            method = self._get_client_method(cred_obj, client_str, method_str)
+            try:
+                # Get the result body
+                result = method(*args, **kwargs)[1]
+            except exceptions.Forbidden as e:
+                self.fail('Method {}.{} failed to allow access via RBAC using '
+                          'credential {}. Error: {}'.format(
+                              client_str, method_str, cred, str(e)))
+            # Remove the root element
+            result_objs = next(iter(result.values()))
+
+            self.assertEqual(expected_count, len(result_objs),
+                             message='Credential {} saw {} objects when {} '
+                             'was expected.'.format(cred, len(result_objs),
+                                                    expected_count))
+
+    def check_list_IDs_RBAC_enforcement(
+            self, client_str, method_str, expected_allowed, expected_ids,
+            *args, **kwargs):
+        """Test an API list call RBAC enforcement result contains IDs.
+
+        List APIs will return the object list for the project associated
+        with the token used to access the API. This means most credentials
+        will have access, but will get differing results.
+
+        This test will query the list API using a list of credentials and
+        will validate that the expected object IDs are included in the results.
+
+        :param client_str: The service client to use for the test, without the
+                           credential. Example: 'ZonesClient'
+        :param method_str: The method on the client to call for the test.
+                           Example: 'list_zones'
+        :param expected_allowed: The list of credentials expected to be
+                                 allowed. Example: ['os_primary'].
+        :param expected_ids: The list of object IDs to validate are included
+                             in the returned list from the API.
+        :param args: Any positional parameters needed by the method.
+        :param kwargs: Any named parameters needed by the method.
+        :raises AssertionError: Raised if a credential is denied (Forbidden),
+                                is unexpectedly not allocated, or the result
+                                is missing any of the expected IDs.
+        :raises InvalidScope: Raised if a credential that should have the
+                              correct scope for access is denied.
+        :returns: None on success
+        """
+
+        allowed_list = copy.deepcopy(expected_allowed)
+
+        for cred in allowed_list:
+            try:
+                cred_obj = getattr(self, cred)
+            except AttributeError:
+                # TODO(johnsom) Remove once scoped tokens is the default.
+                if ((cred == 'os_system_admin' or
+                     cred == 'os_system_reader') and
+                        not CONF.enforce_scope.designate):
+                    LOG.info('Skipping %s allowed RBAC test because '
+                             'enforce_scope.designate is not True', cred)
+                    continue
+                else:
+                    self.fail('Credential {} "expected_allowed" for RBAC '
+                              'testing was not created by tempest '
+                              'credentials setup. This is likely a bug in the '
+                              'test.'.format(cred))
+            method = self._get_client_method(cred_obj, client_str, method_str)
+            try:
+                # Get the result body
+                result = method(*args, **kwargs)[1]
+            except exceptions.Forbidden as e:
+                self.fail('Method {}.{} failed to allow access via RBAC using '
+                          'credential {}. Error: {}'.format(
+                              client_str, method_str, cred, str(e)))
+            # Remove the root element
+            result_objs = next(iter(result.values()))
+
+            result_ids = [result_obj["id"] for result_obj in result_objs]
+            self.assertTrue(set(expected_ids).issubset(set(result_ids)))
diff --git a/designate_tempest_plugin/tests/scenario/v2/test_blacklists.py b/designate_tempest_plugin/tests/scenario/v2/test_blacklists.py
index 27b3f7f..a7455de 100644
--- a/designate_tempest_plugin/tests/scenario/v2/test_blacklists.py
+++ b/designate_tempest_plugin/tests/scenario/v2/test_blacklists.py
@@ -27,6 +27,29 @@
class BaseBlacklistsTest(base.BaseDnsV2Test):
excluded_keys = ['created_at', 'updated_at', 'links']
+ @classmethod
+ def setup_clients(cls):
+ super(BaseBlacklistsTest, cls).setup_clients()
+
+ if CONF.enforce_scope.designate:
+ cls.admin_tld_client = cls.os_system_admin.dns_v2.TldClient()
+ else:
+ cls.admin_tld_client = cls.os_admin.dns_v2.TldClient()
+
+ @classmethod
+ def resource_setup(cls):
+ super(BaseBlacklistsTest, cls).resource_setup()
+
+ # Make sure we have an allowed TLD available
+ tld_name = dns_data_utils.rand_zone_name(name="BaseBlacklistsTest")
+ cls.tld_name = f".{tld_name}"
+ cls.class_tld = cls.admin_tld_client.create_tld(tld_name=tld_name[:-1])
+
+ @classmethod
+ def resource_cleanup(cls):
+ cls.admin_tld_client.delete_tld(cls.class_tld[1]['id'])
+ super(BaseBlacklistsTest, cls).resource_cleanup()
+
class BlacklistE2E(BaseBlacklistsTest):
@@ -86,7 +109,8 @@
@decorators.idempotent_id('de030088-d97e-11eb-8ab8-74e5f9e2a801')
def test_admin_creates_zone_matches_blacklist_name_or_regex(self):
LOG.info('Create a blacklists using: regex and exact string(name)')
- zone_name = 'blacklistnameregextest1' + dns_data_utils.rand_zone_name()
+ zone_name = dns_data_utils.rand_zone_name(
+ name="admin_creates_zone_matches_blacklist1", suffix=self.tld_name)
blacklists = [
{'pattern': '^blacklistnameregextest2.*',
'description': 'Zone starts with "a" char'},
@@ -99,9 +123,10 @@
LOG.info('As Admin user try to create zones that are '
'supposed to be blocked')
+ zone_name2 = dns_data_utils.rand_zone_name(
+ name="admin_creates_zone_matches_blacklist2", suffix=self.tld_name)
zone = self.admin_zone_client.create_zone(
- name='blacklistnameregextest2' +
- dns_data_utils.rand_zone_name(),
+ name=zone_name2,
project_id=self.primary_zone_client.project_id)[1]
self.addCleanup(
self.wait_zone_delete, self.admin_zone_client, zone['id'])
diff --git a/designate_tempest_plugin/tests/scenario/v2/test_quotas.py b/designate_tempest_plugin/tests/scenario/v2/test_quotas.py
index 0c20b3d..17fd16f 100644
--- a/designate_tempest_plugin/tests/scenario/v2/test_quotas.py
+++ b/designate_tempest_plugin/tests/scenario/v2/test_quotas.py
@@ -11,13 +11,16 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
-
+import random
from oslo_log import log as logging
from tempest import config
from tempest.lib import decorators
+from tempest.lib import exceptions as lib_exc
from designate_tempest_plugin.tests import base
from designate_tempest_plugin import data_utils as dns_data_utils
+from designate_tempest_plugin.common import constants as const
+
LOG = logging.getLogger(__name__)
@@ -28,6 +31,7 @@
class QuotasV2Test(base.BaseDnsV2Test):
credentials = ['primary', 'admin', 'system_admin', 'alt']
+ test_quota_limit = 3
@classmethod
def setup_credentials(cls):
@@ -49,44 +53,227 @@
super(QuotasV2Test, cls).setup_clients()
if CONF.enforce_scope.designate:
cls.admin_client = cls.os_system_admin.dns_v2.QuotasClient()
+ cls.admin_tld_client = cls.os_system_admin.dns_v2.TldClient()
else:
cls.admin_client = cls.os_admin.dns_v2.QuotasClient()
+ cls.admin_tld_client = cls.os_admin.dns_v2.TldClient()
cls.quotas_client = cls.os_primary.dns_v2.QuotasClient()
cls.alt_client = cls.os_alt.dns_v2.QuotasClient()
+ cls.zone_client = cls.os_primary.dns_v2.ZonesClient()
cls.alt_zone_client = cls.os_alt.dns_v2.ZonesClient()
+ cls.recordset_client = cls.os_primary.dns_v2.RecordsetClient()
- @decorators.idempotent_id('6987953a-dccf-11eb-903e-74e5f9e2a801')
- def test_alt_reaches_zones_quota(self):
+ @classmethod
+ def resource_setup(cls):
+ super(QuotasV2Test, cls).resource_setup()
- alt_project_id = self.alt_client.project_id
- http_header = {'x-auth-sudo-project-id': alt_project_id}
- limit_zones_quota = 3
+ # Make sure we have an allowed TLD available
+ tld_name = dns_data_utils.rand_zone_name(name="QuotasV2Test")
+ cls.tld_name = f".{tld_name}"
+ cls.class_tld = cls.admin_tld_client.create_tld(tld_name=tld_name[:-1])
- LOG.info('As Admin user set Zones quota for Alt user '
- 'to:{} '.format(limit_zones_quota))
- quotas = dns_data_utils.rand_quotas()
- quotas['zones'] = limit_zones_quota
+    @classmethod
+    def resource_cleanup(cls):
+        """Delete the class-level TLD, then run the parent cleanup."""
+        # class_tld holds the create_tld() result; element 1 carries the
+        # 'id' of the TLD.
+        tld_id = cls.class_tld[1]['id']
+        cls.admin_tld_client.delete_tld(tld_id)
+        super(QuotasV2Test, cls).resource_cleanup()
+
+ def _set_quota_for_project(self, project_id, quotas):
+ http_header = {'x-auth-sudo-project-id': project_id}
self.admin_client.set_quotas(
- project_id=alt_project_id, quotas=quotas, headers=http_header)
+ project_id=project_id, quotas=quotas, headers=http_header)
self.addCleanup(
- self.admin_client.delete_quotas, project_id=alt_project_id)
+ self.admin_client.delete_quotas,
+ project_id=project_id, headers=http_header)
- LOG.info('As Alt user try to create zones, up untill'
- ' "zones" quota (status code 413) is reached')
+ def _reach_quota_limit(
+ self, limit_threshold, quota_type, zone=None):
attempt_number = 0
- while attempt_number <= limit_zones_quota + 1:
- attempt_number += 1
- LOG.info('Attempt No:{} '.format(attempt_number))
+ not_raised_msg = "Failed, expected '413 over_quota' response of " \
+ "type:{} wasn't received.".format(quota_type)
+ while attempt_number <= limit_threshold + 1:
try:
- zone = self.alt_zone_client.create_zone()[1]
- self.addCleanup(
- self.wait_zone_delete, self.alt_zone_client, zone['id'])
- except Exception as err:
- raised_error = str(err).replace(' ', '')
- if not "'code':413" and "'type':'over_quota'" in raised_error \
- and attempt_number == limit_zones_quota + 1:
- raise (
- "Failed, expected status code 413 (type:over_quota) "
- "was not raised or maybe it has been raised mistakenly"
- "(bug) before the quota was actually reached."
- " Test failed with: {} ".format(err))
+ attempt_number += 1
+ LOG.info('Attempt No:{} '.format(attempt_number))
+ if quota_type == 'zones_quota':
+ zone_name = dns_data_utils.rand_zone_name(
+ name="_reach_quota_limit", suffix=self.tld_name)
+ zone = self.zone_client.create_zone(
+ name=zone_name,
+ description='Test zone for:{}'.format(quota_type))[1]
+ self.addCleanup(
+ self.wait_zone_delete,
+ self.zone_client, zone['id'])
+ else:
+ if quota_type == 'zone_recordsets':
+ max_number_of_records = 10
+ prj_quota = self.admin_client.show_quotas(
+ project_id=self.zone_client.project_id,
+ headers=self.all_projects_header)[1][
+ 'zone_records']
+ if max_number_of_records > prj_quota:
+ max_number_of_records = prj_quota
+ recordset_data = dns_data_utils.rand_recordset_data(
+ record_type='A', zone_name=zone['name'],
+ number_of_records=random.randint(
+ 1, max_number_of_records))
+ else:
+ recordset_data = dns_data_utils.rand_recordset_data(
+ record_type='A', zone_name=zone['name'])
+ recordset = self.recordset_client.create_recordset(
+ zone['id'], recordset_data=recordset_data,
+ wait_until=const.ACTIVE)[1]
+ self.addCleanup(
+ self.wait_recordset_delete,
+ self.recordset_client,
+ zone['id'], recordset['id'])
+ self.assertLess(
+ attempt_number, limit_threshold + 1, not_raised_msg)
+ except Exception as e:
+ raised_err = str(e).replace(' ', '')
+ if not_raised_msg in str(e):
+ raise AssertionError(not_raised_msg)
+ elif "'code':413" in raised_err and \
+ "'type':'over_quota'" in raised_err:
+ LOG.info("OK, type':'over_quota' was raised")
+ break
+ else:
+ raise
+
+    @decorators.attr(type='slow')
+    @decorators.idempotent_id('41d9cf2c-866a-11ec-8ccb-201e8823901f')
+    @decorators.skip_because(bug="1960495")
+    def test_api_export_size_quota(self):
+        """Exporting a zone that exceeds "api_export_size" must be refused.
+
+        Sets the "api_export_size" quota to ``test_quota_limit`` for the
+        primary project, fills a zone with one recordset more than that
+        and expects the zone export request to raise a
+        "413 over_quota" error.
+        """
+        LOG.info('Admin sets "api_export_size:{}" quota for Primary'
+                 ' user'.format(self.test_quota_limit))
+        quotas = dns_data_utils.rand_quotas()
+        quotas['api_export_size'] = self.test_quota_limit
+        self._set_quota_for_project(
+            self.zone_client.project_id, quotas)
+        LOG.info('Create a Zone, wait until ACTIVE and add:{}'
+                 ' Recordsets'.format(self.test_quota_limit + 1))
+        # Name the zone under the class TLD, as the sibling quota tests do.
+        zone_name = dns_data_utils.rand_zone_name(
+            name="test_api_export_size_quota", suffix=self.tld_name)
+        zone = self.zone_client.create_zone(
+            name=zone_name,
+            description='Zone for test_api_export_size_quota',
+            wait_until=const.ACTIVE)[1]
+        self.addCleanup(
+            self.wait_zone_delete,
+            self.zone_client, zone['id'])
+        for i in range(self.test_quota_limit + 1):
+            recordset_data = dns_data_utils.rand_recordset_data(
+                record_type='A', zone_name=zone['name'])
+            LOG.info('Try to create a recordset No:{}'.format(i))
+            recordset = self.recordset_client.create_recordset(
+                zone['id'], recordset_data=recordset_data,
+                wait_until=const.ACTIVE)[1]
+            self.addCleanup(
+                self.wait_recordset_delete,
+                self.recordset_client,
+                zone['id'], recordset['id'])
+        LOG.info(
+            'Ensure that the Number of Recordsets is bigger than configured'
+            ' api_export_size:{}'.format(self.test_quota_limit))
+        number_of_recordsets = len(self.recordset_client.list_recordset(
+            zone['id'])[1]['recordsets'])
+        # Precondition: without enough recordsets the export could succeed
+        # for the wrong reason.
+        self.assertGreater(
+            number_of_recordsets, self.test_quota_limit,
+            'Failed, the number of recordsets within a Zone is not enough to'
+            ' trigger "413 over quota" on Zone Export')
+        LOG.info('Try to export Zone. Expected:"413 over_quota"')
+        with self.assertRaisesDns(
+                lib_exc.OverLimit, 'over_quota', 413):
+            self.export_zone_client.create_zone_export(zone['id'])
+
+    @decorators.attr(type='slow')
+    @decorators.idempotent_id('2513cb6e-85ec-11ec-bf7f-201e8823901f')
+    def test_recordset_records_quota(self):
+        """A recordset holding more records than the quota is refused.
+
+        Sets "recordset_records" to ``test_quota_limit`` for the primary
+        project and expects "413 over_quota" when a single recordset with
+        one record more than that is created.
+        """
+        quota_limit = self.test_quota_limit
+        over_limit = quota_limit + 1
+
+        LOG.info('Admin sets "recordset_records:{}" quota for Primary'
+                 ' user'.format(quota_limit))
+        project_quotas = dns_data_utils.rand_quotas()
+        project_quotas['recordset_records'] = quota_limit
+        self._set_quota_for_project(
+            self.zone_client.project_id, project_quotas)
+
+        LOG.info('Create a Zone and wait until ACTIVE')
+        new_zone_name = dns_data_utils.rand_zone_name(
+            name="test_recordset_records_quota", suffix=self.tld_name)
+        create_response = self.zone_client.create_zone(
+            name=new_zone_name,
+            description='Zone for test_recordset_records_quota',
+            wait_until=const.ACTIVE)
+        quota_zone = create_response[1]
+        self.addCleanup(
+            self.wait_zone_delete, self.zone_client, quota_zone['id'])
+
+        LOG.info(
+            'Create recordset data with:{} records and try to create'
+            ' a recordset. Expected:"413 over_quota"'.format(over_limit))
+        oversized_data = dns_data_utils.rand_recordset_data(
+            record_type='A', zone_name=quota_zone['name'],
+            number_of_records=over_limit)
+
+        LOG.info('Try to create a recordset. Expected:"413 over_quota"')
+        with self.assertRaisesDns(lib_exc.OverLimit, 'over_quota', 413):
+            self.recordset_client.create_recordset(
+                quota_zone['id'], recordset_data=oversized_data)
+
+    @decorators.attr(type='slow')
+    @decorators.idempotent_id('893dc648-868d-11ec-8ccb-201e8823901f')
+    def test_zone_records_quota(self):
+        """Adding recordsets to a zone eventually hits "zone_records".
+
+        Creates a zone, sets "zone_records" to ``test_quota_limit`` for
+        the primary project and lets ``_reach_quota_limit`` add recordsets
+        until "413 over_quota" is seen.
+        """
+        quota_limit = self.test_quota_limit
+        over_limit = quota_limit + 1
+
+        LOG.info('Create a Zone and wait until ACTIVE')
+        new_zone_name = dns_data_utils.rand_zone_name(
+            name="test_zone_records_quota", suffix=self.tld_name)
+        create_response = self.zone_client.create_zone(
+            name=new_zone_name,
+            description='Zone for test_zone_records_quota',
+            wait_until=const.ACTIVE)
+        quota_zone = create_response[1]
+        self.addCleanup(
+            self.wait_zone_delete, self.zone_client, quota_zone['id'])
+
+        LOG.info('Admin sets "zone_records:{}" quota for Primary '
+                 'user'.format(quota_limit))
+        project_quotas = dns_data_utils.rand_quotas()
+        project_quotas['zone_records'] = quota_limit
+        self._set_quota_for_project(
+            self.zone_client.project_id, project_quotas)
+
+        LOG.info(
+            'Try to add:{} recordsets (with a single record) to the Zone in'
+            ' loop. Expected:"413 over_quota"'.format(over_limit))
+        self._reach_quota_limit(over_limit, 'zone_records', quota_zone)
+
+    @decorators.attr(type='slow')
+    @decorators.idempotent_id('f567bdda-86b3-11ec-8ccb-201e8823901f')
+    def test_zone_recordsets_quota(self):
+        """Adding recordsets to a zone eventually hits "zone_recordsets".
+
+        Creates a zone, sets "zone_recordsets" to ``test_quota_limit`` for
+        the primary project and lets ``_reach_quota_limit`` add recordsets
+        until "413 over_quota" is seen.
+        """
+        quota_limit = self.test_quota_limit
+        over_limit = quota_limit + 1
+
+        LOG.info('Create a Zone and wait until ACTIVE')
+        new_zone_name = dns_data_utils.rand_zone_name(
+            name="test_zone_recordsets_quota", suffix=self.tld_name)
+        create_response = self.zone_client.create_zone(
+            name=new_zone_name,
+            description='Zone for test_zone_recordsets_quota',
+            wait_until=const.ACTIVE)
+        quota_zone = create_response[1]
+        self.addCleanup(
+            self.wait_zone_delete, self.zone_client, quota_zone['id'])
+
+        LOG.info('Admin sets "zone_recordsets:{}" quota for Primary '
+                 'user'.format(quota_limit))
+        project_quotas = dns_data_utils.rand_quotas()
+        project_quotas['zone_recordsets'] = quota_limit
+        self._set_quota_for_project(
+            self.zone_client.project_id, project_quotas)
+
+        LOG.info(
+            'Try to add:{} recordsets (with a random number of records) to a'
+            ' Zone in loop. Expected:"413 over_quota"'.format(over_limit))
+        self._reach_quota_limit(over_limit, 'zone_recordsets', quota_zone)
+
+    @decorators.attr(type='slow')
+    @decorators.idempotent_id('6987953a-dccf-11eb-903e-74e5f9e2a801')
+    def test_zones_quota(self):
+        """Creating zones eventually hits the "zones" quota.
+
+        Sets "zones" to ``test_quota_limit`` for the primary project and
+        lets ``_reach_quota_limit`` create zones until "413 over_quota"
+        is seen.
+        """
+        quota_limit = self.test_quota_limit
+
+        LOG.info('Admin sets "zones" quota for Primary user')
+        project_quotas = dns_data_utils.rand_quotas()
+        project_quotas['zones'] = quota_limit
+        self._set_quota_for_project(
+            self.zone_client.project_id, project_quotas)
+
+        LOG.info('Try to create Zones. Expected:"413 over_quota"')
+        self._reach_quota_limit(quota_limit, 'zones_quota')
diff --git a/designate_tempest_plugin/tests/scenario/v2/test_recordsets.py b/designate_tempest_plugin/tests/scenario/v2/test_recordsets.py
index 4c40c28..e58a54c 100644
--- a/designate_tempest_plugin/tests/scenario/v2/test_recordsets.py
+++ b/designate_tempest_plugin/tests/scenario/v2/test_recordsets.py
@@ -17,6 +17,7 @@
import ddt
from designate_tempest_plugin.tests import base
+from designate_tempest_plugin import data_utils as dns_data_utils
from designate_tempest_plugin.common import waiters
@@ -35,8 +36,10 @@
super(RecordsetsTest, cls).setup_clients()
if CONF.enforce_scope.designate:
cls.admin_client = cls.os_system_admin.dns_v2.RecordsetClient()
+ cls.admin_tld_client = cls.os_system_admin.dns_v2.TldClient()
else:
cls.admin_client = cls.os_admin.dns_v2.RecordsetClient()
+ cls.admin_tld_client = cls.os_admin.dns_v2.TldClient()
cls.client = cls.os_primary.dns_v2.ZonesClient()
cls.recordset_client = cls.os_primary.dns_v2.RecordsetClient()
@@ -49,8 +52,15 @@
LOG.info('Retrieve info from a zone')
_, zone = cls.client.show_zone(zone_id)
else:
+ # Make sure we have an allowed TLD available
+ tld_name = dns_data_utils.rand_zone_name(name="RecordsetsTest")
+ cls.tld_name = f".{tld_name}"
+ cls.class_tld = cls.admin_tld_client.create_tld(
+ tld_name=tld_name[:-1])
LOG.info('Create a new zone')
- _, zone = cls.client.create_zone()
+ zone_name = dns_data_utils.rand_zone_name(
+ name="recordsets_test_setup", suffix=cls.tld_name)
+ zone = cls.client.create_zone(name=zone_name)[1]
cls.addClassResourceCleanup(
test_utils.call_and_ignore_notfound_exc,
cls.client.delete_zone, zone['id'])
@@ -60,6 +70,11 @@
cls.zone = zone
+    @classmethod
+    def resource_cleanup(cls):
+        """Delete the class-level TLD (if one was created), then clean up.
+
+        The class setup code only creates ``class_tld`` on the branch
+        that creates a new zone; when an already existing zone is looked
+        up instead, the attribute is never assigned. The TLD deletion is
+        therefore skipped when the attribute is missing, so that cleanup
+        does not fail with AttributeError.
+        """
+        class_tld = getattr(cls, 'class_tld', None)
+        if class_tld is not None:
+            # Element 1 of the create_tld() result carries the TLD 'id'.
+            cls.admin_tld_client.delete_tld(class_tld[1]['id'])
+        super(RecordsetsTest, cls).resource_cleanup()
+
@decorators.attr(type='slow')
@decorators.idempotent_id('4664ed66-9ff1-45f2-9e60-d4913195c505')
@ddt.file_data("recordset_data.json")
diff --git a/designate_tempest_plugin/tests/scenario/v2/test_tld.py b/designate_tempest_plugin/tests/scenario/v2/test_tld.py
new file mode 100644
index 0000000..8dc6d70
--- /dev/null
+++ b/designate_tempest_plugin/tests/scenario/v2/test_tld.py
@@ -0,0 +1,80 @@
+# Copyright 2021 Red Hat.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from oslo_log import log as logging
+from tempest import config
+from tempest.lib import decorators
+from tempest.lib import exceptions as lib_exc
+
+from designate_tempest_plugin.common import constants as const
+from designate_tempest_plugin.tests import base
+from designate_tempest_plugin import data_utils as dns_data_utils
+
+CONF = config.CONF
+LOG = logging.getLogger(__name__)
+
+
+class TldZoneTest(base.BaseDnsV2Test):
+    """Scenario tests for creating zones inside and outside a known TLD.
+
+    A TLD named ``tld_suffix`` is created once for the class and deleted
+    again in ``resource_cleanup``.
+    """
+
+    credentials = ["admin", "system_admin", "primary"]
+    # "TldZoneTest" joined to the configured suffix with a dot.
+    tld_suffix = "TldZoneTest" + '.' + CONF.dns.tld_suffix
+
+    @classmethod
+    def setup_credentials(cls):
+        # Do not create network resources for these test.
+        cls.set_network_resources()
+        super(TldZoneTest, cls).setup_credentials()
+
+    @classmethod
+    def setup_clients(cls):
+        """Build the admin TLD client and the primary TLD/zone clients."""
+        super(TldZoneTest, cls).setup_clients()
+        # With scope enforcement the system admin credentials are used
+        # for TLD management, otherwise the regular admin credentials.
+        admin_manager = (
+            cls.os_system_admin if CONF.enforce_scope.designate
+            else cls.os_admin)
+        cls.admin_tld_client = admin_manager.dns_v2.TldClient()
+        cls.primary_tld_client = cls.os_primary.dns_v2.TldClient()
+        cls.primary_zone_client = cls.os_primary.dns_v2.ZonesClient()
+
+    @classmethod
+    def resource_setup(cls):
+        """Create the TLD shared by the tests of this class."""
+        super(TldZoneTest, cls).resource_setup()
+        cls.class_tld = cls.admin_tld_client.create_tld(
+            tld_name=cls.tld_suffix)
+
+    @classmethod
+    def resource_cleanup(cls):
+        """Delete the class-level TLD, then run the parent cleanup."""
+        tld_id = cls.class_tld[1]['id']
+        cls.admin_tld_client.delete_tld(tld_id)
+        super(TldZoneTest, cls).resource_cleanup()
+
+    @decorators.idempotent_id('68b3e7cc-bf0e-11ec-b803-201e8823901f')
+    def test_create_zone_using_existing_tld(self):
+        """A zone under the class TLD can be created and becomes ACTIVE."""
+        LOG.info('Creates a zone using existing TLD:"{}"'.format(
+            self.tld_suffix))
+        existing_suffix = '.{}.'.format(self.tld_suffix)
+        zone_name = dns_data_utils.rand_zone_name(
+            name='existing_tld_zone', prefix='rand', suffix=existing_suffix)
+        created = self.primary_zone_client.create_zone(
+            name=zone_name, wait_until=const.ACTIVE)
+        zone = created[1]
+        self.addCleanup(
+            self.wait_zone_delete, self.primary_zone_client, zone['id'])
+
+    @decorators.idempotent_id('06deced8-d4de-11eb-b8ee-74e5f9e2a801')
+    def test_create_zone_using_not_existing_tld(self):
+        """A zone under a TLD that was never created is rejected."""
+        LOG.info('Try to create a Zone using not existing TLD:"{}"'.format(
+            self.tld_suffix[::-1]))
+        # Reversing the dotted suffix yields a name for which this class
+        # created no TLD.
+        reversed_suffix = '.{}.'.format(self.tld_suffix)[::-1]
+        zone_name = dns_data_utils.rand_zone_name(
+            name='not_existing_tld_zone', prefix='rand',
+            suffix=reversed_suffix)
+        with self.assertRaises(lib_exc.BadRequest):
+            self.primary_zone_client.create_zone(name=zone_name)
diff --git a/designate_tempest_plugin/tests/scenario/v2/test_zones.py b/designate_tempest_plugin/tests/scenario/v2/test_zones.py
index a7529a1..74f9563 100644
--- a/designate_tempest_plugin/tests/scenario/v2/test_zones.py
+++ b/designate_tempest_plugin/tests/scenario/v2/test_zones.py
@@ -11,63 +11,134 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
+
+import time
+import math
+
from oslo_log import log as logging
from tempest import config
from tempest.lib import decorators
from tempest.lib import exceptions as lib_exc
import testtools
+from designate_tempest_plugin import data_utils as dns_data_utils
from designate_tempest_plugin.tests import base
+from designate_tempest_plugin.common import constants as const
from designate_tempest_plugin.common import waiters
+from designate_tempest_plugin.services.dns.query.query_client \
+ import SingleQueryClient
+CONF = config.CONF
LOG = logging.getLogger(__name__)
class ZonesTest(base.BaseDnsV2Test):
+ credentials = ["primary", "admin", "system_admin"]
+
@classmethod
def setup_clients(cls):
super(ZonesTest, cls).setup_clients()
-
+ if CONF.enforce_scope.designate:
+ cls.admin_tld_client = cls.os_system_admin.dns_v2.TldClient()
+ cls.rec_client = cls.os_system_admin.dns_v2.RecordsetClient()
+ else:
+ cls.admin_tld_client = cls.os_admin.dns_v2.TldClient()
+ cls.rec_client = cls.os_admin.dns_v2.RecordsetClient()
cls.client = cls.os_primary.dns_v2.ZonesClient()
+ cls.primary_client = cls.os_primary.dns_v2.BlacklistsClient()
+
+    @classmethod
+    def resource_setup(cls):
+        """Create a TLD under which the tests of this class create zones."""
+        super(ZonesTest, cls).resource_setup()
+        # Make sure we have an allowed TLD available
+        generated_name = dns_data_utils.rand_zone_name(name="ZonesTest")
+        # Zone names are built by appending this dotted suffix.
+        cls.tld_name = "." + generated_name
+        # The last character is dropped for the TLD name -- presumably the
+        # trailing dot of the generated name; confirm against
+        # rand_zone_name.
+        cls.class_tld = cls.admin_tld_client.create_tld(
+            tld_name=generated_name[:-1])
+
+    @classmethod
+    def resource_cleanup(cls):
+        """Delete the class-level TLD, then run the parent cleanup."""
+        # class_tld holds the create_tld() result; element 1 carries the
+        # 'id' of the TLD.
+        tld_id = cls.class_tld[1]['id']
+        cls.admin_tld_client.delete_tld(tld_id)
+        super(ZonesTest, cls).resource_cleanup()
@decorators.attr(type='smoke')
@decorators.attr(type='slow')
@decorators.idempotent_id('d0648f53-4114-45bd-8792-462a82f69d32')
def test_create_and_delete_zone(self):
LOG.info('Create a zone')
- _, zone = self.client.create_zone()
+ zone_name = dns_data_utils.rand_zone_name(
+ name="create_and_delete_zone", suffix=self.tld_name)
+ zone = self.client.create_zone(name=zone_name)[1]
self.addCleanup(self.wait_zone_delete, self.client, zone['id'],
ignore_errors=lib_exc.NotFound)
LOG.info('Ensure we respond with CREATE+PENDING')
- self.assertEqual('CREATE', zone['action'])
- self.assertEqual('PENDING', zone['status'])
+ self.assertEqual(const.CREATE, zone['action'])
+ self.assertEqual(const.PENDING, zone['status'])
waiters.wait_for_zone_status(
- self.client, zone['id'], 'ACTIVE')
+ self.client, zone['id'], const.ACTIVE)
LOG.info('Re-Fetch the zone')
- _, zone = self.client.show_zone(zone['id'])
+ zone = self.client.show_zone(zone['id'])[1]
- LOG.info('Ensure we respond with NONE+PENDING')
- self.assertEqual('NONE', zone['action'])
- self.assertEqual('ACTIVE', zone['status'])
+ LOG.info('Ensure we respond with NONE+ACTIVE')
+ self.assertEqual(const.NONE, zone['action'])
+ self.assertEqual(const.ACTIVE, zone['status'])
LOG.info('Delete the zone')
- _, zone = self.client.delete_zone(zone['id'])
+ zone = self.client.delete_zone(zone['id'])[1]
LOG.info('Ensure we respond with DELETE+PENDING')
- self.assertEqual('DELETE', zone['action'])
- self.assertEqual('PENDING', zone['status'])
+ self.assertEqual(const.DELETE, zone['action'])
+ self.assertEqual(const.PENDING, zone['status'])
waiters.wait_for_zone_404(self.client, zone['id'])
@decorators.attr(type='slow')
+ @decorators.idempotent_id('cabd6334-ba37-11ec-9d8c-201e8823901f')
+    def test_create_and_update_zone(self):
+        """Update a zone's TTL and description and verify both changed.
+
+        Creates a zone with known TTL/description, updates both, waits
+        until the zone is ACTIVE again and checks the values returned by
+        a fresh "show zone" call.
+        """
+
+        LOG.info('Create a zone and wait until it becomes ACTIVE')
+        orig_ttl = 666
+        orig_description = 'test_create_and_update_zone: org description'
+        zone_name = dns_data_utils.rand_zone_name(
+            name="create_and_update_zone", suffix=self.tld_name)
+        zone = self.client.create_zone(
+            name=zone_name,
+            ttl=orig_ttl, description=orig_description,
+            wait_until=const.ACTIVE)[1]
+        self.addCleanup(self.wait_zone_delete, self.client, zone['id'],
+                        ignore_errors=lib_exc.NotFound)
+
+        LOG.info("Update zone's: TTL and Description, wait until ACTIVE")
+        updated_ttl = 777
+        updated_description = dns_data_utils.rand_string(20)
+        self.client.update_zone(
+            zone['id'], ttl=updated_ttl, description=updated_description,
+            wait_until=const.ACTIVE)
+
+        LOG.info('Re-Fetch/Show the zone')
+        show_zone = self.client.show_zone(zone['id'])[1]
+
+        LOG.info('Ensure that the Description and TLL has been updated')
+        self.assertEqual(
+            updated_ttl, show_zone['ttl'],
+            'Failed, actual TTL value:{} is not as expected:{} after '
+            'the update)'.format(show_zone['ttl'], updated_ttl))
+        # The expected value reported on failure is the updated
+        # description (the value being asserted), not the original one.
+        self.assertEqual(
+            updated_description, show_zone['description'],
+            'Failed, actual Description:{} is not as expected:{} after '
+            'the update)'.format(
+                show_zone['description'], updated_description))
+
+ @decorators.attr(type='slow')
@decorators.idempotent_id('c9838adf-14dc-4097-9130-e5cea3727abb')
def test_delete_zone_pending_create(self):
LOG.info('Create a zone')
- _, zone = self.client.create_zone()
+ zone_name = dns_data_utils.rand_zone_name(
+ name="delete_zone_pending_create", suffix=self.tld_name)
+ zone = self.client.create_zone(name=zone_name)[1]
self.addCleanup(self.wait_zone_delete, self.client, zone['id'],
ignore_errors=lib_exc.NotFound)
@@ -77,11 +148,11 @@
# Theres not a huge amount we can do, given this is
# black-box testing.
LOG.info('Delete the zone while it is still pending')
- _, zone = self.client.delete_zone(zone['id'])
+ zone = self.client.delete_zone(zone['id'])[1]
LOG.info('Ensure we respond with DELETE+PENDING')
- self.assertEqual('DELETE', zone['action'])
- self.assertEqual('PENDING', zone['status'])
+ self.assertEqual(const.DELETE, zone['action'])
+ self.assertEqual(const.PENDING, zone['status'])
waiters.wait_for_zone_404(self.client, zone['id'])
@@ -92,11 +163,13 @@
"Config option dns.nameservers is missing or empty")
def test_zone_create_propagates_to_nameservers(self):
LOG.info('Create a zone')
- _, zone = self.client.create_zone()
+ zone_name = dns_data_utils.rand_zone_name(
+ name="zone_create_propagates", suffix=self.tld_name)
+ zone = self.client.create_zone(name=zone_name)[1]
self.addCleanup(self.wait_zone_delete, self.client, zone['id'])
- waiters.wait_for_zone_status(self.client, zone['id'], "ACTIVE")
- waiters.wait_for_query(self.query_client, zone['name'], "SOA")
+ waiters.wait_for_zone_status(self.client, zone['id'], const.ACTIVE)
+ waiters.wait_for_query(self.query_client, zone['name'], const.SOA)
@decorators.attr(type='slow')
@decorators.idempotent_id('d13d3095-c78f-4aae-8fe3-a74ccc335c84')
@@ -105,16 +178,81 @@
"Config option dns.nameservers is missing or empty")
def test_zone_delete_propagates_to_nameservers(self):
LOG.info('Create a zone')
- _, zone = self.client.create_zone()
+ zone_name = dns_data_utils.rand_zone_name(
+ name="zone_delete_propagates", suffix=self.tld_name)
+ zone = self.client.create_zone(name=zone_name)[1]
self.addCleanup(self.wait_zone_delete, self.client, zone['id'],
ignore_errors=lib_exc.NotFound)
- waiters.wait_for_zone_status(self.client, zone['id'], "ACTIVE")
- waiters.wait_for_query(self.query_client, zone['name'], "SOA")
+ waiters.wait_for_zone_status(self.client, zone['id'], const.ACTIVE)
+ waiters.wait_for_query(self.query_client, zone['name'], const.SOA)
LOG.info('Delete the zone')
self.client.delete_zone(zone['id'])
waiters.wait_for_zone_404(self.client, zone['id'])
- waiters.wait_for_query(self.query_client, zone['name'], "SOA",
+ waiters.wait_for_query(self.query_client, zone['name'], const.SOA,
found=False)
+
+    @decorators.attr(type='slow')
+    @decorators.idempotent_id('ff9b9fc4-85b4-11ec-bcf5-201e8823901f')
+    @testtools.skipUnless(
+        config.CONF.dns.nameservers,
+        "Config option dns.nameservers is missing or empty")
+    def test_notify_msg_sent_to_nameservers(self):
+        """An updated zone Serial must show up on every nameserver.
+
+        Updates the zone's TTL, checks that the Serial changed and then
+        queries each configured nameserver for the SOA record until the
+        new Serial is seen or ``dns.build_timeout`` seconds have passed.
+
+        Raises:
+            lib_exc.TimeoutException: a nameserver did not report the new
+                Serial within the timeout.
+        """
+
+        # Test will only run when the SOA record Refresh is close to one hour,
+        # otherwise skipped.
+        # This implies that the only reason "A" record was propagated is as a
+        # result of successfully sent NOTIFY message.
+
+        LOG.info('Create a zone, wait until ACTIVE and get the Serial'
+                 ' and SOA Refresh values')
+        zone_name = dns_data_utils.rand_zone_name(
+            name="test_notify_msg_sent_to_nameservers", suffix=self.tld_name)
+        zone = self.client.create_zone(
+            name=zone_name, wait_until=const.ACTIVE)[1]
+
+        org_serial = zone['serial']
+        self.addCleanup(self.wait_zone_delete, self.client, zone['id'])
+        # Only the lookup/parsing of the SOA record is guarded: a failure
+        # there means the precondition cannot be evaluated at all.
+        try:
+            soa = [
+                rec['records'] for rec in self.rec_client.list_recordset(
+                    zone['id'], headers=self.all_projects_header)[1][
+                    'recordsets'] if rec['type'] == const.SOA][0]
+            # Fourth space-separated field of the SOA record data.
+            refresh = int(soa[0].split(' ')[3])
+        except Exception as e:
+            raise self.skipException(
+                'Test is skipped, something went wrong on getting SOA REFRESH'
+                ' value, the error was:{}'.format(e))
+        # Checked outside the try block so this skip keeps its own message
+        # instead of being re-wrapped as "something went wrong".
+        if not math.isclose(3600, refresh, rel_tol=0.1):
+            raise self.skipException(
+                'Test is skipped, actual SOA REFRESH is:{} unlike test'
+                ' prerequisites that requires a value close to 3600'
+                ' (one hour)'.format(refresh))
+
+        LOG.info("Update Zone's TTL, wait until ACTIVE and"
+                 " ensure Zone's Serial has changed")
+        updated_zone = self.client.update_zone(
+            zone['id'], ttl=dns_data_utils.rand_ttl(),
+            wait_until=const.ACTIVE)[1]
+        new_serial = updated_zone['serial']
+        self.assertNotEqual(
+            new_serial, org_serial,
+            "Failed, expected behaviour is that the Designate DNS changes the"
+            " Serial after updating Zone's TTL value")
+        waiters.wait_for_query(self.query_client, zone['name'], const.SOA)
+
+        LOG.info('Per Nameserver "dig" for a SOA record until either:'
+                 ' updated Serial is detected or build timeout has reached')
+        for ns in config.CONF.dns.nameservers:
+            start = time.time()
+            while True:
+                ns_obj = SingleQueryClient(ns, config.CONF.dns.query_timeout)
+                ns_soa_record = ns_obj.query(zone['name'], rdatatype='SOA')
+                if str(new_serial) in str(ns_soa_record):
+                    # This nameserver is up to date; go on with the next
+                    # one instead of ending the test after the first match.
+                    break
+                if time.time() - start >= config.CONF.dns.build_timeout:
+                    raise lib_exc.TimeoutException(
+                        'Failed, expected Serial:{} for a Zone was not'
+                        ' detected on Nameserver:{} within a timeout of:{}'
+                        ' seconds.'.format(
+                            new_serial, ns, config.CONF.dns.build_timeout))
diff --git a/designate_tempest_plugin/tests/scenario/v2/test_zones_export.py b/designate_tempest_plugin/tests/scenario/v2/test_zones_export.py
index 5d0de56..565461d 100644
--- a/designate_tempest_plugin/tests/scenario/v2/test_zones_export.py
+++ b/designate_tempest_plugin/tests/scenario/v2/test_zones_export.py
@@ -12,13 +12,18 @@
# License for the specific language governing permissions and limitations
# under the License.
+import json
+import os
+
from oslo_log import log as logging
from tempest import config
from tempest.lib import decorators
+
+from designate_tempest_plugin.common import constants as const
from designate_tempest_plugin.common import waiters
+from designate_tempest_plugin import data_utils as dns_data_utils
from designate_tempest_plugin.tests.api.v2.test_zones_exports import \
BaseZoneExportsTest
-from designate_tempest_plugin.common import constants as const
CONF = config.CONF
LOG = logging.getLogger(__name__)
@@ -42,22 +47,26 @@
cls.admin_client = cls.os_admin.dns_v2.ZoneExportsClient()
cls.client = cls.os_primary.dns_v2.ZoneExportsClient()
cls.zones_client = cls.os_primary.dns_v2.ZonesClient()
+ cls.recordset_client = cls.os_primary.dns_v2.RecordsetClient()
- def _create_zone_export(self):
+ def _create_zone_export(self, test_name):
LOG.info('Create a zone')
- zone = self.zones_client.create_zone()[1]
+ zone_name = dns_data_utils.rand_zone_name(
+ name=test_name, suffix=self.tld_name)
+ zone = self.zones_client.create_zone(name=zone_name)[1]
self.addCleanup(self.wait_zone_delete, self.zones_client, zone['id'])
LOG.info('Create a zone export')
- zone_export = self.client.create_zone_export(zone['id'])[1]
+ zone_export = self.client.create_zone_export(
+ zone['id'], wait_until=const.COMPLETE)[1]
self.addCleanup(self.client.delete_zone_export, zone_export['id'])
- waiters.wait_for_zone_export_status(
- self.client, zone_export['id'], const.COMPLETE)
+
return zone, zone_export
@decorators.idempotent_id('0484c3c4-df57-458e-a6e5-6eb63e0475e0')
def test_create_zone_export_and_show_exported_zonefile(self):
- zone, zone_export = self._create_zone_export()
+ zone, zone_export = self._create_zone_export(
+ 'create_zone_export_and_show_exported_zonefile')
self.assertEqual(const.PENDING, zone_export['status'])
self.assertEqual(zone['id'], zone_export['zone_id'])
@@ -79,7 +88,8 @@
@decorators.idempotent_id('56b8f30e-cd45-4c7a-bc0c-bbf92d7dc697')
def test_show_exported_zonefile_impersonate_another_project(self):
- zone, zone_export = self._create_zone_export()
+ zone, zone_export = self._create_zone_export(
+ 'show_exported_zonefile_impersonate')
LOG.info('As Admin impersonate "primary" client,'
' to show exported zone file')
@@ -91,7 +101,8 @@
@decorators.idempotent_id('c2e55514-ff2e-41d9-a3cc-9e78873254c9')
def test_show_exported_zonefile_all_projects(self):
- zone, zone_export = self._create_zone_export()
+ zone, zone_export = self._create_zone_export(
+ 'show_exported_zonefile_all_projects')
resp_headers, resp_data = self.admin_client.show_exported_zonefile(
zone_export['id'], headers={
'x-auth-all-projects': True
@@ -101,7 +112,8 @@
@decorators.idempotent_id('9746b7f2-2df4-448c-8a85-5ab6bf74f1fe')
def test_show_exported_zonefile_any_mime_type(self):
- zone, zone_export = self._create_zone_export()
+ zone, zone_export = self._create_zone_export(
+ 'show_exported_zonefile_any_mime_type')
resp_headers, resp_data = self.client.show_exported_zonefile(
zone_export['id'], headers={'Accept': '*/*'})
@@ -116,7 +128,8 @@
@decorators.idempotent_id('dc7a9dde-d287-4e22-9788-26578f0d3bf0')
def test_missing_accept_headers(self):
- zone, zone_export = self._create_zone_export()
+ zone, zone_export = self._create_zone_export(
+ 'missing_accept_headers')
resp_headers, resp_data = self.client.show_exported_zonefile(
zone_export['id'], headers={})
LOG.info('Ensure Content-Type: text/dns')
@@ -127,3 +140,61 @@
LOG.info('Ensure exported data ia as expected')
self.assertEqual(zone['name'], resp_data.origin)
self.assertEqual(zone['ttl'], resp_data.ttl)
+
+    @decorators.attr(type='slow')
+    @decorators.idempotent_id('d8f444aa-a645-4a03-b366-46836f57dc69')
+    def test_all_recordset_types_exist_in_show_zonefile(self):
+        """Every created record must appear in the exported zone file.
+
+        Creates one recordset per entry of ``recordset_data.json`` (which
+        is expected next to this module), exports the zone and checks that
+        each record of the successfully created recordsets is present in
+        the exported zone file. The test is skipped when the data file is
+        missing; recordsets that fail to be created are logged and left
+        out of the comparison.
+        """
+        recordsets_data_file = os.path.join(
+            os.path.dirname(__file__), 'recordset_data.json')
+
+        if not os.path.exists(recordsets_data_file):
+            raise self.skipException(
+                f"Could not find {recordsets_data_file}")
+
+        # The context manager closes the file even if parsing fails.
+        with open(recordsets_data_file, "r") as data_file:
+            load_file = json.load(data_file)
+
+        LOG.info('Create a zone')
+        zone_name = dns_data_utils.rand_zone_name(
+            name="all_recordset_types_exist", suffix=self.tld_name)
+        zone = self.zones_client.create_zone(name=zone_name,
+                                             wait_until=const.ACTIVE)[1]
+        self.addCleanup(self.wait_zone_delete, self.zones_client, zone['id'])
+
+        created_records = []
+        for record_data in load_file.values():
+            recordset_data = {
+                'name': f"{record_data['name']}.{zone['name']}",
+                'type': record_data['type'],
+                'records': record_data['records'],
+            }
+            try:
+                LOG.info('Create a Recordset')
+                recordset = self.recordset_client.create_recordset(
+                    zone['id'], recordset_data)[1]
+                self.addCleanup(self.wait_recordset_delete,
+                                self.recordset_client, zone['id'],
+                                recordset['id'])
+                created_records.append(recordset['records'])
+                waiters.wait_for_recordset_status(self.recordset_client,
+                                                  zone['id'], recordset['id'],
+                                                  const.ACTIVE)
+            except Exception as err:
+                # Report the type from the request data: "recordset" is
+                # unbound (or left over from a previous iteration) when
+                # create_recordset itself failed.
+                LOG.warning(f"Record of type {record_data['type']} could not"
+                            f" be created and failed with error: {err}")
+
+        LOG.info('Create a zone export')
+        zone_export = self.client.create_zone_export(
+            zone['id'], wait_until=const.COMPLETE)[1]
+        self.addCleanup(self.client.delete_zone_export, zone_export['id'])
+
+        LOG.info('Show exported zonefile')
+        created_zonefile = self.client.show_exported_zonefile(
+            zone_export['id'])[1]
+
+        file_records = [item.data for item in created_zonefile.records]
+        for record in created_records:
+            for r in record:
+                self.assertIn(r, file_records,
+                              f"Failed, missing record: {r} in zone file")
diff --git a/designate_tempest_plugin/tests/scenario/v2/test_zones_import.py b/designate_tempest_plugin/tests/scenario/v2/test_zones_import.py
index e92feee..7286f23 100644
--- a/designate_tempest_plugin/tests/scenario/v2/test_zones_import.py
+++ b/designate_tempest_plugin/tests/scenario/v2/test_zones_import.py
@@ -14,6 +14,7 @@
from oslo_log import log as logging
from tempest.lib import decorators
+from designate_tempest_plugin.common import constants as const
from designate_tempest_plugin.common import waiters
from designate_tempest_plugin import data_utils as dns_data_utils
from designate_tempest_plugin.tests.api.v2.test_zones_imports import \
@@ -24,6 +25,8 @@
class ZonesImportTest(BaseZonesImportTest):
+ credentials = ["primary", "admin", "system_admin"]
+
@classmethod
def setup_clients(cls):
super(ZonesImportTest, cls).setup_clients()
@@ -34,33 +37,31 @@
@decorators.attr(type='slow')
@decorators.idempotent_id('679f38d0-2f2f-49c5-934e-8fe0c452f56e')
def test_create_zone_import_and_wait_for_zone(self):
- name = dns_data_utils.rand_zone_name('testimport')
- zonefile = dns_data_utils.rand_zonefile_data(name=name)
+ zone_name = dns_data_utils.rand_zone_name(
+ name="create_zone_import_and_wait_for_zone", suffix=self.tld_name)
+ zonefile = dns_data_utils.rand_zonefile_data(name=zone_name)
- LOG.info('Import zone %r', name)
- _, zone_import = self.client.create_zone_import(zonefile)
+ LOG.info('Import zone %r', zone_name)
+ zone_import = self.client.create_zone_import(
+ zonefile, wait_until=const.COMPLETE)[1]
self.addCleanup(self.client.delete_zone_import, zone_import['id'])
- LOG.info('Wait for the zone import to COMPLETE')
- waiters.wait_for_zone_import_status(self.client, zone_import['id'],
- "COMPLETE")
-
LOG.info('Check the zone import looks good')
- _, zone_import = self.client.show_zone_import(zone_import['id'])
+ zone_import = self.client.show_zone_import(zone_import['id'])[1]
self.addCleanup(self.wait_zone_delete,
self.zones_client,
zone_import['zone_id'])
- self.assertEqual('COMPLETE', zone_import['status'])
+ self.assertEqual(const.COMPLETE, zone_import['status'])
self.assertIsNotNone(zone_import['zone_id'])
self.assertIsNotNone(zone_import['links'].get('zone'))
LOG.info('Wait for the imported zone to go to ACTIVE')
- waiters.wait_for_zone_status(self.zones_client, zone_import['zone_id'],
- "ACTIVE")
+ waiters.wait_for_zone_status(
+ self.zones_client, zone_import['zone_id'], const.ACTIVE)
LOG.info('Check the imported zone looks good')
- _, zone = self.zones_client.show_zone(zone_import['zone_id'])
- self.assertEqual('NONE', zone['action'])
- self.assertEqual('ACTIVE', zone['status'])
- self.assertEqual(name, zone['name'])
+ zone = self.zones_client.show_zone(zone_import['zone_id'])[1]
+ self.assertEqual(const.NONE, zone['action'])
+ self.assertEqual(const.ACTIVE, zone['status'])
+ self.assertEqual(zone_name, zone['name'])
diff --git a/designate_tempest_plugin/tests/scenario/v2/test_zones_transfer.py b/designate_tempest_plugin/tests/scenario/v2/test_zones_transfer.py
index 3437c33..78aaabc 100644
--- a/designate_tempest_plugin/tests/scenario/v2/test_zones_transfer.py
+++ b/designate_tempest_plugin/tests/scenario/v2/test_zones_transfer.py
@@ -12,42 +12,71 @@
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
+from tempest import config
from tempest.lib import decorators
from tempest.lib import exceptions as lib_exc
+from designate_tempest_plugin.common import constants as const
from designate_tempest_plugin.tests import base
from designate_tempest_plugin import data_utils as dns_data_utils
+CONF = config.CONF
LOG = logging.getLogger(__name__)
class ZonesTransferTest(base.BaseDnsV2Test):
- credentials = ['primary', 'alt', 'admin']
+ credentials = ['primary', 'alt', 'admin', 'system_admin']
@classmethod
def setup_clients(cls):
super(ZonesTransferTest, cls).setup_clients()
+ if CONF.enforce_scope.designate:
+ cls.admin_zones_client = cls.os_system_admin.dns_v2.ZonesClient()
+ cls.admin_accept_client = (
+ cls.os_system_admin.dns_v2.TransferAcceptClient())
+ cls.admin_tld_client = cls.os_system_admin.dns_v2.TldClient()
+ else:
+ cls.admin_zones_client = cls.os_admin.dns_v2.ZonesClient()
+ cls.admin_accept_client = (
+ cls.os_admin.dns_v2.TransferAcceptClient())
+ cls.admin_tld_client = cls.os_admin.dns_v2.TldClient()
cls.zones_client = cls.os_primary.dns_v2.ZonesClient()
cls.alt_zones_client = cls.os_alt.dns_v2.ZonesClient()
- cls.admin_zones_client = cls.os_admin.dns_v2.ZonesClient()
cls.request_client = cls.os_primary.dns_v2.TransferRequestClient()
cls.alt_request_client = cls.os_alt.dns_v2.TransferRequestClient()
cls.accept_client = cls.os_primary.dns_v2.TransferAcceptClient()
cls.alt_accept_client = cls.os_alt.dns_v2.TransferAcceptClient()
- cls.admin_accept_client = cls.os_admin.dns_v2.TransferAcceptClient()
+
+ @classmethod
+ def resource_setup(cls):
+ super(ZonesTransferTest, cls).resource_setup()
+
+ # Make sure we have an allowed TLD available
+ tld_name = dns_data_utils.rand_zone_name(name="ZonesTransferTest")
+ cls.tld_name = f".{tld_name}"
+ cls.class_tld = cls.admin_tld_client.create_tld(tld_name=tld_name[:-1])
+
+ @classmethod
+ def resource_cleanup(cls):
+ cls.admin_tld_client.delete_tld(cls.class_tld[1]['id'])
+ super(ZonesTransferTest, cls).resource_cleanup()
@decorators.idempotent_id('60bd80ac-c979-4686-9a03-f2f775f272ab')
def test_zone_transfer(self):
LOG.info('Create a zone as primary tenant')
- _, zone = self.zones_client.create_zone()
+ zone_name = dns_data_utils.rand_zone_name(
+ name="zone_transfer", suffix=self.tld_name)
+ zone = self.zones_client.create_zone(name=zone_name)[1]
self.addCleanup(self.wait_zone_delete, self.zones_client, zone['id'],
ignore_errors=lib_exc.NotFound)
LOG.info('Create a zone transfer_request for zone as primary tenant')
- _, transfer_request = \
- self.request_client.create_transfer_request_empty_body(zone['id'])
+ transfer_request = (
+ self.request_client.create_transfer_request_empty_body(
+ zone['id'])[1])
self.addCleanup(self.request_client.delete_transfer_request,
- transfer_request['id'])
+ transfer_request['id'],
+ ignore_errors=lib_exc.NotFound)
accept_data = {
"key": transfer_request['key'],
@@ -55,10 +84,16 @@
}
LOG.info('Accept the request as alt tenant')
- self.alt_accept_client.create_transfer_accept(accept_data)
+ transfer_accept = self.alt_accept_client.create_transfer_accept(
+ accept_data)[1]
+
+ LOG.info('Ensure we respond with COMPLETE status')
+ show_request = self.request_client.show_transfer_request(
+ transfer_request['id'])[1]
+ self.assertEqual(const.COMPLETE, show_request['status'])
LOG.info('Fetch the zone as alt tenant')
- _, alt_zone = self.alt_zones_client.show_zone(zone['id'])
+ alt_zone = self.alt_zones_client.show_zone(zone['id'])[1]
self.addCleanup(self.wait_zone_delete,
self.alt_zones_client,
alt_zone['id'])
@@ -73,10 +108,21 @@
lib_exc.BadRequest, 'invalid_zone_transfer_request', 400):
self.admin_accept_client.create_transfer_accept(accept_data)
+        LOG.info('Delete the transfer_request')
+        self.request_client.delete_transfer_request(transfer_request['id'])
+
+        LOG.info('Validation that transfer_accept deleted'
+                 ' after the transfer_request delete')
+        self.assertRaises(lib_exc.NotFound,
+                          self.accept_client.show_transfer_accept,
+                          transfer_accept['id'])
+
@decorators.idempotent_id('5855b772-a036-11eb-9973-74e5f9e2a801')
def test_zone_transfer_target_project(self):
LOG.info('Create a zone as "primary" tenant')
- zone = self.zones_client.create_zone()[1]
+ zone_name = dns_data_utils.rand_zone_name(
+ name="zone_transfer_target_project", suffix=self.tld_name)
+ zone = self.zones_client.create_zone(name=zone_name)[1]
LOG.info('Create transfer_request with target project set to '
'"Admin" tenant')
@@ -87,7 +133,7 @@
self.addCleanup(self.request_client.delete_transfer_request,
transfer_request['id'])
LOG.info('Ensure we respond with ACTIVE status')
- self.assertEqual('ACTIVE', transfer_request['status'])
+ self.assertEqual(const.ACTIVE, transfer_request['status'])
LOG.info('Accept the request as "alt" tenant, Expected: should fail '
'as "admin" was set as a target project.')
@@ -100,9 +146,14 @@
transfer_accept_data=accept_data)
LOG.info('Accept the request as "Admin" tenant, Expected: should work')
- self.admin_accept_client.create_transfer_accept(accept_data)
+ self.admin_accept_client.create_transfer_accept(
+ accept_data, headers={'x-auth-sudo-project-id':
+ self.os_admin.credentials.project_id},
+ extra_headers=True)
LOG.info('Fetch the zone as "Admin" tenant')
- admin_zone = self.admin_zones_client.show_zone(zone['id'])[1]
+ admin_zone = self.admin_zones_client.show_zone(
+ zone['id'], headers={'x-auth-sudo-project-id':
+ self.os_admin.credentials.project_id})[1]
self.addCleanup(self.wait_zone_delete,
self.admin_zones_client,
admin_zone['id'])
diff --git a/tox.ini b/tox.ini
index a413aeb..545cdca 100644
--- a/tox.ini
+++ b/tox.ini
@@ -15,7 +15,7 @@
setenv =
VIRTUAL_ENV={envdir}
PYTHONDONTWRITEBYTECODE=1
-whitelist_externals = sh
+allowlist_externals = sh
find
rm
commands =
@@ -43,14 +43,14 @@
[testenv:releasenotes]
deps = {[testenv:docs]deps}
-whitelist_externals = rm
+allowlist_externals = rm
commands =
rm -rf releasenotes/build
sphinx-build -a -E -W -d releasenotes/build/doctrees -b html releasenotes/source releasenotes/build/html
[testenv:bashate]
deps = bashate
-whitelist_externals = bash
+allowlist_externals = bash
commands = bash -c "find {toxinidir}/devstack \
-not \( -type d -name .?\* -prune \) \
-not \( -type d -name doc -prune \) \