import subprocess
from tempest.lib import decorators
+from tempest.lib import exceptions as lib_exc
from designate_tempest_plugin.tests import base
+from designate_tempest_plugin.common import constants as const
+from designate_tempest_plugin.common import waiters
+from designate_tempest_plugin import data_utils as dns_data_utils
from designate_tempest_plugin.tests import resources
+from designate_tempest_plugin.services.dns.query.query_client import (
+ SingleQueryClient)
CONF = config.CONF
LOG = logging.getLogger(__name__)
)
if os.path.exists(temp_pools_yaml_conf_path):
os.remove(temp_pools_yaml_conf_path)
+
+
+@testtools.skipUnless(CONF.dns_feature_enabled.test_multipool_with_delete_opt,
+ 'Multipools feature is being tested with --delete '
+ 'option. It might delete pools that were created in '
+ 'other tests.')
+class DesignateMultiPoolTest(DesignateManagePoolTest):
+
+ _pool1_id = None
+ _pool2_id = None
+
+ @property
+ def pool1_id(self):
+ if not self._pool1_id:
+ self._set_pool_ids()
+ return self._pool1_id
+
+ @property
+ def pool2_id(self):
+ if not self._pool2_id:
+ self._set_pool_ids()
+ return self._pool2_id
+
+ def _set_pool_ids(self):
+ pool_config = self._run_designate_manage_pool_command(
+ 'show_config', '--all').split('\n\n')[0]
+ pool_config_list = pool_config.split('\n')
+ pool_ids = list()
+ for item in pool_config_list:
+ if 'id:' in item:
+ id = item.split()[1]
+ pool_ids.append(id)
+ if len(pool_ids) < 2:
+ self.skipException('Multipool tests require at least 2 pools.')
+ self._pool1_id = pool_ids[0]
+ self._pool2_id = pool_ids[1]
+
+ @classmethod
+ def setup_clients(cls):
+ super(DesignateMultiPoolTest, cls).setup_clients()
+ cls.admin_tld_client = cls.os_admin.dns_v2.TldClient()
+ cls.rec_client = cls.os_admin.dns_v2.RecordsetClient()
+ cls.admin_zones_client = cls.os_admin.dns_v2.ZonesClient()
+
+ @classmethod
+ def resource_setup(cls):
+ super().resource_setup()
+ super()._update_pools_file(cls.MULTIPOOLS_FILE_PATH)
+ # Make sure we have an allowed TLD available
+ tld_name = dns_data_utils.rand_zone_name(name="ZonesTest")
+ cls.tld_name = f".{tld_name}"
+ cls.class_tld = cls.admin_tld_client.create_tld(tld_name=tld_name[:-1])
+
+ def tearDown(self):
+ super().tearDown()
+ super()._update_pools_file(self.MULTIPOOLS_FILE_PATH)
+
+ @classmethod
+ def resource_cleanup(cls):
+ super().resource_cleanup()
+ cls.admin_tld_client.delete_tld(cls.class_tld[1]['id'])
+
+ @decorators.attr(type='smoke')
+ @decorators.attr(type='slow')
+ @decorators.idempotent_id('c0648f53-4114-45bd-8792-462a82f69d32')
+ def test_create_zones(self):
+ LOG.info('Create 2 zones, one per pool')
+
+ zone1_name = dns_data_utils.rand_zone_name(
+ name="create_zones_multipool-",
+ suffix=self.tld_name)
+
+ zone1 = self.admin_zones_client.create_zone(
+ name=f"{zone1_name}",
+ attributes={'pool_id': f'{self.pool1_id}'})[1]
+ self.addCleanup(self.wait_zone_delete,
+ self.admin_zones_client, zone1['id'],
+ ignore_errors=lib_exc.NotFound)
+
+ zone2_name = dns_data_utils.rand_zone_name(
+ name="create_zones_multipool-",
+ suffix=self.tld_name)
+
+ zone2 = self.admin_zones_client.create_zone(
+ name=f"{zone2_name}",
+ attributes={'pool_id': f'{self.pool2_id}'})[1]
+ self.addCleanup(self.wait_zone_delete,
+ self.admin_zones_client,
+ zone2['id'],
+ ignore_errors=lib_exc.NotFound)
+
+ self.assertNotEqual(zone1['pool_id'], zone2['pool_id'])
+
+ # wait for both of them to be active
+ for zone in [zone1, zone2]:
+ waiters.wait_for_zone_status(
+ self.admin_zones_client, zone['id'], const.ACTIVE)
+
+ # Create a type A recordset for each zone
+ recordset_data = dns_data_utils.rand_recordset_data(
+ record_type='A', zone_name=zone['name'])
+ body = self.rec_client.create_recordset(zone['id'],
+ recordset_data)[1]
+ self.addCleanup(
+ self.wait_recordset_delete, self.rec_client,
+ zone['id'], body['id'])
+ self.assertEqual(const.PENDING, body['status'],
+ 'Failed, expected status is PENDING')
+ waiters.wait_for_recordset_status(
+ self.rec_client, zone['id'],
+ body['id'], const.ACTIVE)
+
+ for ns in config.CONF.dns.nameservers:
+ ns_obj = SingleQueryClient(ns, config.CONF.dns.query_timeout)
+ a_record = str(ns_obj.query(zone['name'], rdatatype='A'))
+ self.assertNotEmpty(a_record)