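Migrate the "unauthed" tests
This migrates the tests that verify a request is rejected with a 401
(Unauthorized) when no token is provided in the request headers.
To support them, this adds a ManagerV2Unauthed client manager whose
auth provider (KeystoneV2UnauthedProvider) removes the X-Auth-Token
header from outgoing requests.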
Change-Id: I9752685afccee5dcd009f76b5242fd6f12f0fc37
diff --git a/designate_tempest_plugin/clients.py b/designate_tempest_plugin/clients.py
index 66db3de..5f4f728 100644
--- a/designate_tempest_plugin/clients.py
+++ b/designate_tempest_plugin/clients.py
@@ -13,6 +13,7 @@
# under the License.
from tempest import clients
from tempest import config
+from tempest.lib.auth import KeystoneV2AuthProvider
from designate_tempest_plugin.services.dns.v1.json.domains_client import \
DomainsClient
@@ -49,58 +50,93 @@
class ManagerV1(clients.Manager):
+
def __init__(self, credentials=None, service=None):
super(ManagerV1, self).__init__(credentials, service)
+ self._init_clients(self._get_params())
- params = {
+ def _init_clients(self, params):
+ self.domains_client = DomainsClient(**params)
+ self.records_client = RecordsClient(**params)
+ self.servers_client = ServersClient(**params)
+
+ def _get_params(self):
+ params = dict(self.default_params)
+ params.update({
+ 'auth_provider': self.auth_provider,
'service': CONF.dns.catalog_type,
'region': CONF.identity.region,
'endpoint_type': CONF.dns.endpoint_type,
'build_interval': CONF.dns.build_interval,
'build_timeout': CONF.dns.build_timeout
- }
- params.update(self.default_params)
-
- self.domains_client = DomainsClient(self.auth_provider, **params)
- self.records_client = RecordsClient(self.auth_provider, **params)
- self.servers_client = ServersClient(self.auth_provider, **params)
+ })
+ return params
class ManagerV2(clients.Manager):
+
def __init__(self, credentials=None, service=None):
super(ManagerV2, self).__init__(credentials, service)
+ self._init_clients(self._get_params())
- params = {
- 'service': CONF.dns.catalog_type,
- 'region': CONF.identity.region,
- 'endpoint_type': CONF.dns.endpoint_type,
- 'build_interval': CONF.dns.build_interval,
- 'build_timeout': CONF.dns.build_timeout
- }
- params.update(self.default_params)
+ def _init_clients(self, params):
+ self.zones_client = ZonesClient(**params)
+ self.zone_imports_client = ZoneImportsClient(**params)
+ self.blacklists_client = BlacklistsClient(**params)
+ self.quotas_client = QuotasClient(**params)
+ self.zone_exports_client = ZoneExportsClient(**params)
+ self.recordset_client = RecordsetClient(**params)
+ self.pool_client = PoolClient(**params)
+ self.tld_client = TldClient(**params)
+ self.transfer_request_client = TransferRequestClient(**params)
+ self.transfer_accept_client = TransferAcceptClient(**params)
+ self.tsigkey_client = TsigkeyClient(**params)
- self.zones_client = ZonesClient(self.auth_provider, **params)
- self.zone_imports_client = ZoneImportsClient(self.auth_provider,
- **params)
- self.blacklists_client = BlacklistsClient(self.auth_provider, **params)
- self.quotas_client = QuotasClient(self.auth_provider, **params)
- self.zone_exports_client = ZoneExportsClient(self.auth_provider,
- **params)
- self.recordset_client = RecordsetClient(self.auth_provider,
- **params)
- self.pool_client = PoolClient(self.auth_provider,
- **params)
- self.tld_client = TldClient(self.auth_provider,
- **params)
self.query_client = QueryClient(
nameservers=CONF.dns.nameservers,
query_timeout=CONF.dns.query_timeout,
build_interval=CONF.dns.build_interval,
build_timeout=CONF.dns.build_timeout,
)
- self.transfer_request_client = TransferRequestClient(
- self.auth_provider, **params)
- self.transfer_accept_client = TransferAcceptClient(
- self.auth_provider, **params)
- self.tsigkey_client = TsigkeyClient(
- self.auth_provider, **params)
+
+ def _get_params(self):
+ params = dict(self.default_params)
+ params.update({
+ 'auth_provider': self.auth_provider,
+ 'service': CONF.dns.catalog_type,
+ 'region': CONF.identity.region,
+ 'endpoint_type': CONF.dns.endpoint_type,
+ 'build_interval': CONF.dns.build_interval,
+ 'build_timeout': CONF.dns.build_timeout
+ })
+ return params
+
+
+class ManagerV2Unauthed(ManagerV2):
+
+ def __init__(self, credentials=None, service=None):
+ super(ManagerV2Unauthed, self).__init__(credentials, service)
+ self.auth_provider = KeystoneV2UnauthedProvider(
+ credentials=self.auth_provider.credentials,
+ auth_url=self.auth_provider.auth_client.auth_url,
+ disable_ssl_certificate_validation=self.auth_provider.dscv,
+ ca_certs=self.auth_provider.ca_certs,
+ trace_requests=self.auth_provider.trace_requests,
+ )
+ self._init_clients(self._get_params())
+
+
+class KeystoneV2UnauthedProvider(KeystoneV2AuthProvider):
+ """This auth provider will ensure requests are made without a token"""
+
+ def _decorate_request(self, filters, method, url, headers=None, body=None,
+ auth_data=None):
+ result = super(KeystoneV2UnauthedProvider, self)._decorate_request(
+ filters, method, url, headers=headers, body=body,
+ auth_data=auth_data)
+ url, headers, body = result
+ try:
+ del headers['X-Auth-Token']
+ except KeyError:
+ pass
+ return url, headers, body