Add a scenario test for classless PTR delegation
This patch adds a scenario test for classless in-addr.arpa delegation
(RFC 2317).
Change-Id: I11157615a1273d4f087e725cc09de3ad64e07d29
diff --git a/designate_tempest_plugin/tests/rbac_utils.py b/designate_tempest_plugin/tests/rbac_utils.py
index fa66410..aa8bb6a 100644
--- a/designate_tempest_plugin/tests/rbac_utils.py
+++ b/designate_tempest_plugin/tests/rbac_utils.py
@@ -302,7 +302,7 @@
self.assertEqual(expected_count, len(result_objs),
message='Credential {} saw {} objects when {} '
- 'was expected.'.format(cred, len(result),
+ 'was expected.'.format(cred, len(result_objs),
expected_count))
def check_list_IDs_RBAC_enforcement(
diff --git a/designate_tempest_plugin/tests/scenario/v2/test_classless_ptr.py b/designate_tempest_plugin/tests/scenario/v2/test_classless_ptr.py
new file mode 100644
index 0000000..c149893
--- /dev/null
+++ b/designate_tempest_plugin/tests/scenario/v2/test_classless_ptr.py
@@ -0,0 +1,227 @@
+# Copyright 2022 Red Hat.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+import time
+
+from oslo_utils import versionutils
+from tempest import config
+from tempest.lib.common.utils import test_utils
+from tempest.lib import decorators
+from tempest.lib import exceptions as lib_exc
+
+from designate_tempest_plugin.tests import base
+from designate_tempest_plugin.services.dns.query.query_client import (
+ SingleQueryClient)
+
+CONF = config.CONF
+
+
+# This test suite is intended to test RFC 2317 classless in-addr.arpa
+# delegation scenarios.
+class ClasslessPTRTest(base.BaseDnsV2Test):
+
+ credentials = ['primary', 'admin', 'system_admin', 'alt']
+
+ @classmethod
+ def setup_credentials(cls):
+ # Do not create network resources for these tests.
+ cls.set_network_resources()
+ super(ClasslessPTRTest, cls).setup_credentials()
+
+ @classmethod
+ def setup_clients(cls):
+ super(ClasslessPTRTest, cls).setup_clients()
+ if CONF.enforce_scope.designate:
+ cls.admin_tld_client = cls.os_system_admin.dns_v2.TldClient()
+ else:
+ cls.admin_tld_client = cls.os_admin.dns_v2.TldClient()
+ cls.zone_client = cls.os_primary.dns_v2.ZonesClient()
+ cls.recordset_client = cls.os_primary.dns_v2.RecordsetClient()
+ cls.alt_rec_client = cls.os_alt.dns_v2.RecordsetClient()
+ cls.share_zone_client = cls.os_primary.dns_v2.SharedZonesClient()
+
+ @classmethod
+ def resource_setup(cls):
+ super(ClasslessPTRTest, cls).resource_setup()
+
+ # Make sure we have an allowed TLD available
+ cls.tld_name = '0.192.in-addr-arpa'
+ cls.class_tld = cls.admin_tld_client.create_tld(tld_name=cls.tld_name)
+
+ @classmethod
+ def resource_cleanup(cls):
+ cls.admin_tld_client.delete_tld(cls.class_tld[1]['id'])
+ super(ClasslessPTRTest, cls).resource_cleanup()
+
+ @decorators.attr(type='slow')
+ @decorators.idempotent_id('f2d9596c-ce87-4dfd-9bb4-ad2430fd3fe6')
+ def test_classless_ptr_delegation(self):
+ # Create full subnet zone
+ zone_name = f'2.{self.tld_name}.'
+ zone = self.zone_client.create_zone(name=zone_name,
+ wait_until='ACTIVE')[1]
+ self.addCleanup(self.wait_zone_delete, self.zone_client, zone['id'],
+ ignore_errors=lib_exc.NotFound)
+
+ # Create the delegated zone
+ delegated_zone_name = f'1-3.2.{self.tld_name}.'
+ delegated_zone = self.zone_client.create_zone(
+ name=delegated_zone_name, wait_until='ACTIVE')[1]
+ self.addCleanup(self.wait_zone_delete, self.zone_client,
+ delegated_zone['id'], ignore_errors=lib_exc.NotFound)
+
+ # Create the PTR record in the delegated zone
+ ptr_recordset_data = {
+ 'name': f'1.1-3.2.{self.tld_name}.',
+ 'type': 'PTR',
+ 'records': ['www.example.org.']
+ }
+ ptr_recordset = self.recordset_client.create_recordset(
+ delegated_zone['id'], ptr_recordset_data, wait_until='ACTIVE')[1]
+ self.addCleanup(test_utils.call_and_ignore_notfound_exc,
+ self.recordset_client.delete_recordset,
+ delegated_zone['id'], ptr_recordset['id'])
+
+ # Create the CNAME record
+ cname_recordset_data = {
+ 'name': f'1.2.{self.tld_name}.',
+ 'type': 'CNAME',
+ 'records': [f'1.1-3.2.{self.tld_name}.']
+ }
+ cname_recordset = self.recordset_client.create_recordset(
+ zone['id'], cname_recordset_data, wait_until='ACTIVE')[1]
+ self.addCleanup(test_utils.call_and_ignore_notfound_exc,
+ self.recordset_client.delete_recordset,
+ zone['id'], cname_recordset['id'])
+
+ # Check for a CNAME record
+ if config.CONF.dns.nameservers:
+ ns = config.CONF.dns.nameservers[0]
+ start = time.time()
+ while True:
+ ns_obj = SingleQueryClient(ns, config.CONF.dns.query_timeout)
+ ns_record = ns_obj.query(
+ cname_recordset['name'],
+ rdatatype=cname_recordset_data['type'])
+ if cname_recordset_data['records'][0] in str(ns_record):
+ break
+ if time.time() - start >= config.CONF.dns.build_timeout:
+ raise lib_exc.TimeoutException(
+ 'Failed, CNAME record was not detected on '
+ 'Nameserver:{} within a timeout of:{}'
+ ' seconds.'.format(ns, config.CONF.dns.build_timeout))
+
+ # Check for a PTR record
+ if config.CONF.dns.nameservers:
+ ns = config.CONF.dns.nameservers[0]
+ start = time.time()
+ while True:
+ ns_obj = SingleQueryClient(ns, config.CONF.dns.query_timeout)
+ ns_record = ns_obj.query(
+ ptr_recordset['name'],
+ rdatatype=ptr_recordset_data['type'])
+ if ptr_recordset_data['records'][0] in str(ns_record):
+ break
+ if time.time() - start >= config.CONF.dns.build_timeout:
+ raise lib_exc.TimeoutException(
+ 'Failed, PTR record was not detected on '
+ 'Nameserver:{} within a timeout of:{}'
+ ' seconds.'.format(ns, config.CONF.dns.build_timeout))
+
+ @decorators.attr(type='slow')
+ @decorators.idempotent_id('0110e7b1-9582-410e-b3d5-bd38a1265222')
+ def test_classless_ptr_delegation_shared_zone(self):
+
+ if not versionutils.is_compatible('2.1', self.api_version,
+ same_major=False):
+ raise self.skipException(
+ 'Zone share tests require Designate API version 2.1 or newer. '
+ 'Skipping test_classless_ptr_delegation_shared_zone test.')
+
+ # Create full subnet zone
+ zone_name = f'2.{self.tld_name}.'
+ zone = self.zone_client.create_zone(name=zone_name,
+ wait_until='ACTIVE')[1]
+ self.addCleanup(self.wait_zone_delete, self.zone_client, zone['id'],
+ ignore_errors=lib_exc.NotFound)
+
+ # Create the delegated zone
+ delegated_zone_name = f'1-3.2.{self.tld_name}.'
+ delegated_zone = self.zone_client.create_zone(
+ name=delegated_zone_name, wait_until='ACTIVE')[1]
+ self.addCleanup(self.wait_zone_delete, self.zone_client,
+ delegated_zone['id'], ignore_errors=lib_exc.NotFound)
+
+ # Create the CNAME record
+ cname_recordset_data = {
+ 'name': f'1.2.{self.tld_name}.',
+ 'type': 'CNAME',
+ 'records': [f'1.1-3.2.{self.tld_name}.']
+ }
+ cname_recordset = self.recordset_client.create_recordset(
+ zone['id'], cname_recordset_data, wait_until='ACTIVE')[1]
+ self.addCleanup(test_utils.call_and_ignore_notfound_exc,
+ self.recordset_client.delete_recordset,
+ zone['id'], cname_recordset['id'])
+
+ # Share the zone with the alt credential
+ shared_zone = self.share_zone_client.create_zone_share(
+ delegated_zone['id'], self.alt_rec_client.project_id)[1]
+ self.addCleanup(self.share_zone_client.delete_zone_share,
+ delegated_zone['id'], shared_zone['id'])
+
+ # Create the PTR record in the delegated zone as the alt project
+ ptr_recordset_data = {
+ 'name': f'1.1-3.2.{self.tld_name}.',
+ 'type': 'PTR',
+ 'records': ['www.example.org.']
+ }
+ ptr_recordset = self.alt_rec_client.create_recordset(
+ delegated_zone['id'], ptr_recordset_data, wait_until='ACTIVE')[1]
+ self.addCleanup(test_utils.call_and_ignore_notfound_exc,
+ self.alt_rec_client.delete_recordset,
+ delegated_zone['id'], ptr_recordset['id'])
+
+ # Check for a CNAME record
+ if config.CONF.dns.nameservers:
+ ns = config.CONF.dns.nameservers[0]
+ start = time.time()
+ while True:
+ ns_obj = SingleQueryClient(ns, config.CONF.dns.query_timeout)
+ ns_record = ns_obj.query(
+ cname_recordset['name'],
+ rdatatype=cname_recordset_data['type'])
+ if cname_recordset_data['records'][0] in str(ns_record):
+ break
+ if time.time() - start >= config.CONF.dns.build_timeout:
+ raise lib_exc.TimeoutException(
+ 'Failed, CNAME record was not detected on '
+ 'Nameserver:{} within a timeout of:{}'
+ ' seconds.'.format(ns, config.CONF.dns.build_timeout))
+
+ # Check for a PTR record
+ if config.CONF.dns.nameservers:
+ ns = config.CONF.dns.nameservers[0]
+ start = time.time()
+ while True:
+ ns_obj = SingleQueryClient(ns, config.CONF.dns.query_timeout)
+ ns_record = ns_obj.query(
+ ptr_recordset['name'],
+ rdatatype=ptr_recordset_data['type'])
+ if ptr_recordset_data['records'][0] in str(ns_record):
+ break
+ if time.time() - start >= config.CONF.dns.build_timeout:
+ raise lib_exc.TimeoutException(
+ 'Failed, PTR record was not detected on '
+ 'Nameserver:{} within a timeout of:{}'
+ ' seconds.'.format(ns, config.CONF.dns.build_timeout))