Create TLD for DNSIntegrationTests
This is required to make sure concurrent tests that adds tlds
will not conflict.
Related-Prod: PRODX-29414
Change-Id: I3208363bb6997db28f31325634c5414c6a9bcd5e
(cherry picked from commit b24337af2bd3e891b8cdb94e7f9885c346636d7b)
diff --git a/neutron_tempest_plugin/scenario/test_dns_integration.py b/neutron_tempest_plugin/scenario/test_dns_integration.py
index d520aa2..f7ab6ef 100644
--- a/neutron_tempest_plugin/scenario/test_dns_integration.py
+++ b/neutron_tempest_plugin/scenario/test_dns_integration.py
@@ -17,11 +17,9 @@
import testtools
-from oslo_log import log
from tempest.common import utils
from tempest.common import waiters
from tempest.lib.common.utils import data_utils
-from tempest.lib.common.utils import test_utils
from tempest.lib import decorators
from tempest.lib import exceptions as lib_exc
@@ -32,8 +30,6 @@
CONF = config.CONF
-LOG = log.getLogger(__name__)
-
# Note(jh): Need to do a bit of juggling here in order to avoid failures
# when designate_tempest_plugin is not available
@@ -45,15 +41,36 @@
DNSMixin = object
+def rand_zone_name(name='', prefix='rand', suffix=None):
+ """Generate a random zone name
+
+ :param str name: The name that you want to include
+ :param prefix: the exact text to start the string. Defaults to "rand"
+ :param suffix: the exact text to end the string
+ :return: a random zone name e.g. example.org.
+ :rtype: string
+ """
+
+ if suffix is None:
+ suffix = '.{}.'.format(CONF.dns.tld_suffix)
+ name = data_utils.rand_name(name=name, prefix=prefix)
+ return name + suffix
+
+
class BaseDNSIntegrationTests(base.BaseTempestTestCase, DNSMixin):
credentials = ['primary', 'admin']
@classmethod
def setup_clients(cls):
super(BaseDNSIntegrationTests, cls).setup_clients()
- cls.zone_client = cls.os_tempest.dns_v2.ZonesClient()
- cls.recordset_client = cls.os_tempest.dns_v2.RecordsetClient()
+ cls.dns_client = cls.os_tempest.dns_v2.ZonesClient()
cls.query_client.build_timeout = 30
+ if CONF.enforce_scope.designate:
+ cls.admin_tld_client = cls.os_system_admin.dns_v2.TldClient()
+ cls.rec_client = cls.os_system_admin.dns_v2.RecordsetClient()
+ else:
+ cls.admin_tld_client = cls.os_admin.dns_v2.TldClient()
+ cls.rec_client = cls.os_admin.dns_v2.RecordsetClient()
@classmethod
def skip_checks(cls):
@@ -63,16 +80,26 @@
raise cls.skipException("Designate support is required")
if not (dns_base and dns_waiters):
raise cls.skipException("Designate tempest plugin is missing")
+ # Neutron creates zones for reverse lookups and designate requires
+ # related TLD if there is any
+ if CONF.production:
+ if CONF.dns.existing_tlds and not set([CONF.dns.tld_suffix,
+ "arpa", "in-addr.arpa"]).issubset(CONF.dns.existing_tlds):
+ raise cls.skipException("Skip on production environment "
+ "because it doesn't match TLD configuration.")
@classmethod
@utils.requires_ext(extension="dns-integration", service="network")
def resource_setup(cls):
super(BaseDNSIntegrationTests, cls).resource_setup()
- cls.zone = cls.zone_client.create_zone()[1]
- cls.addClassResourceCleanup(cls.zone_client.delete_zone,
+
+ zone_name = rand_zone_name(
+ name="dnsinttest", prefix='')
+ _, cls.zone = cls.dns_client.create_zone(name=zone_name)
+ cls.addClassResourceCleanup(cls.dns_client.delete_zone,
cls.zone['id'], ignore_errors=lib_exc.NotFound)
dns_waiters.wait_for_zone_status(
- cls.zone_client, cls.zone['id'], 'ACTIVE')
+ cls.dns_client, cls.zone['id'], 'ACTIVE')
cls.network = cls.create_network(dns_domain=cls.zone['name'])
cls.subnet = cls.create_subnet(cls.network)
@@ -81,6 +108,10 @@
cls.create_router_interface(cls.router['id'], cls.subnet['id'])
cls.keypair = cls.create_keypair()
+ @classmethod
+ def resource_cleanup(cls):
+ super(BaseDNSIntegrationTests, cls).resource_cleanup()
+
def _create_floatingip_with_dns(self, dns_name):
return self.create_floatingip(client=self.os_primary.network_client,
dns_name=dns_name,
@@ -99,79 +130,12 @@
fip = self.create_floatingip(port=port)
return {'port': port, 'fip': fip, 'server': server}
- def _check_type_in_recordsets(self, zone_id, rec_type):
- types = [rec['type'] for rec in self.recordset_client.list_recordset(
- zone_id)[1]['recordsets']]
- if rec_type in types:
- return True
- return False
-
- def _wait_for_type_in_recordsets(self, zone_id, type):
- test_utils.call_until_true(
- func=self._check_type_in_recordsets, zone_id=zone_id,
- rec_type=type, duration=self.query_client.build_timeout,
- sleep_for=5)
-
- def _check_recordset_deleted(
- self, recordset_client, zone_id, recordset_id):
- return test_utils.call_and_ignore_notfound_exc(
- recordset_client.show_recordset, zone_id, recordset_id) is None
-
- def _verify_designate_recordset(
- self, address, found=True, record_type='A'):
- if found:
- self._wait_for_type_in_recordsets(self.zone['id'], record_type)
- recordsets = self.recordset_client.list_recordset(
- self.zone['id'])[1]['recordsets']
- relevant_type = [rec for rec in recordsets if
- rec['type'] == record_type]
- self.assertTrue(
- relevant_type,
- 'Failed no {} type recordset has been detected in the '
- 'Designate DNS DB'.format(record_type))
- rec_id = [rec['id'] for rec in relevant_type if address in
- str(rec['records'])][0]
- self.assertTrue(
- rec_id, 'Record of type:{} with IP:{} was not detected in '
- 'the Designate DNS DB'.format(record_type, address))
- dns_waiters.wait_for_recordset_status(
- self.recordset_client, self.zone['id'], rec_id, 'ACTIVE')
- else:
- rec_id = None
- recordsets = self.recordset_client.list_recordset(
- self.zone['id'])[1]['recordsets']
- relevant_type = [rec for rec in recordsets if
- rec['type'] == record_type]
- if relevant_type:
- rec_id = [rec['id'] for rec in relevant_type if
- address in str(rec['records'])][0]
- if rec_id:
- recordset_exists = test_utils.call_until_true(
- func=self._check_recordset_deleted,
- recordset_client=self.recordset_client,
- zone_id=self.zone['id'], recordset_id=rec_id,
- duration=self.query_client.build_timeout, sleep_for=5)
- self.assertTrue(
- recordset_exists,
- 'Failed, recordset type:{} and ID:{} is still exist in '
- 'the Designate DNS DB'.format(record_type, rec_id))
-
def _verify_dns_records(self, address, name, found=True, record_type='A'):
client = self.query_client
forward = name + '.' + self.zone['name']
reverse = ipaddress.ip_address(address).reverse_pointer
- record_types_to_check = [record_type, 'PTR']
- for rec_type in record_types_to_check:
- try:
- if rec_type == 'PTR':
- dns_waiters.wait_for_query(
- client, reverse, rec_type, found)
- else:
- dns_waiters.wait_for_query(
- client, forward, rec_type, found)
- except Exception as e:
- LOG.error(e)
- self._verify_designate_recordset(address, found, rec_type)
+ dns_waiters.wait_for_query(client, forward, record_type, found)
+ dns_waiters.wait_for_query(client, reverse, 'PTR', found)
if not found:
return
fwd_response = client.query(forward, record_type)
@@ -212,10 +176,11 @@
@classmethod
def resource_setup(cls):
super(DNSIntegrationAdminTests, cls).resource_setup()
- segmentation_id = CONF.designate_feature_enabled.segmentation_id
- cls.network2 = cls.create_network(
- dns_domain=cls.zone['name'], provider_network_type='vxlan',
- provider_segmentation_id=segmentation_id)
+ # TODO(jh): We should add the segmentation_id as tempest option
+ # so that it can be changed to match the deployment if needed
+ cls.network2 = cls.create_network(dns_domain=cls.zone['name'],
+ provider_network_type='vxlan',
+ provider_segmentation_id=12345)
cls.subnet2 = cls.create_subnet(cls.network2)
def _verify_dns_assignment(self, port):
@@ -289,15 +254,15 @@
name = data_utils.rand_name('test-domain')
zone_name = "%s.%s.%s.zone." % (cls.client.user_id,
- cls.client.project_id,
+ cls.client.tenant_id,
name)
dns_domain_template = "<user_id>.<project_id>.%s.zone." % name
- cls.zone = cls.zone_client.create_zone(name=zone_name)[1]
- cls.addClassResourceCleanup(cls.zone_client.delete_zone,
+ _, cls.zone = cls.dns_client.create_zone(name=zone_name)
+ cls.addClassResourceCleanup(cls.dns_client.delete_zone,
cls.zone['id'], ignore_errors=lib_exc.NotFound)
dns_waiters.wait_for_zone_status(
- cls.zone_client, cls.zone['id'], 'ACTIVE')
+ cls.dns_client, cls.zone['id'], 'ACTIVE')
cls.network = cls.create_network(dns_domain=dns_domain_template)
cls.subnet = cls.create_subnet(cls.network,