Merge "Drop auth and corresponding unit tests"
diff --git a/requirements.txt b/requirements.txt
index 0d7fc0d..174c7c8 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -23,4 +23,4 @@
iso8601>=0.1.9
fixtures>=0.3.14
testscenarios>=0.4
-tempest-lib>=0.4.0
+tempest-lib>=0.5.0
diff --git a/tempest/api/identity/admin/v3/test_default_project_id.py b/tempest/api/identity/admin/v3/test_default_project_id.py
index 95ba885..0b9d60e 100644
--- a/tempest/api/identity/admin/v3/test_default_project_id.py
+++ b/tempest/api/identity/admin/v3/test_default_project_id.py
@@ -10,10 +10,10 @@
# License for the specific language governing permissions and limitations
# under the License.
+from tempest_lib import auth
from tempest_lib.common.utils import data_utils
from tempest.api.identity import base
-from tempest import auth
from tempest import clients
from tempest import config
from tempest import manager
diff --git a/tempest/auth.py b/tempest/auth.py
deleted file mode 100644
index 113ad69..0000000
--- a/tempest/auth.py
+++ /dev/null
@@ -1,659 +0,0 @@
-# Copyright 2014 Hewlett-Packard Development Company, L.P.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import abc
-import copy
-import datetime
-import exceptions
-import re
-import urlparse
-
-from oslo_log import log as logging
-import six
-
-from tempest.services.identity.v2.json import token_client as json_v2id
-from tempest.services.identity.v3.json import token_client as json_v3id
-
-
-LOG = logging.getLogger(__name__)
-
-
-@six.add_metaclass(abc.ABCMeta)
-class AuthProvider(object):
- """
- Provide authentication
- """
-
- def __init__(self, credentials):
- """
- :param credentials: credentials for authentication
- """
- if self.check_credentials(credentials):
- self.credentials = credentials
- else:
- raise TypeError("Invalid credentials")
- self.cache = None
- self.alt_auth_data = None
- self.alt_part = None
-
- def __str__(self):
- return "Creds :{creds}, cached auth data: {cache}".format(
- creds=self.credentials, cache=self.cache)
-
- @abc.abstractmethod
- def _decorate_request(self, filters, method, url, headers=None, body=None,
- auth_data=None):
- """
- Decorate request with authentication data
- """
- return
-
- @abc.abstractmethod
- def _get_auth(self):
- return
-
- @abc.abstractmethod
- def _fill_credentials(self, auth_data_body):
- return
-
- def fill_credentials(self):
- """
- Fill credentials object with data from auth
- """
- auth_data = self.get_auth()
- self._fill_credentials(auth_data[1])
- return self.credentials
-
- @classmethod
- def check_credentials(cls, credentials):
- """
- Verify credentials are valid.
- """
- return isinstance(credentials, Credentials) and credentials.is_valid()
-
- @property
- def auth_data(self):
- return self.get_auth()
-
- @auth_data.deleter
- def auth_data(self):
- self.clear_auth()
-
- def get_auth(self):
- """
- Returns auth from cache if available, else auth first
- """
- if self.cache is None or self.is_expired(self.cache):
- self.set_auth()
- return self.cache
-
- def set_auth(self):
- """
- Forces setting auth, ignores cache if it exists.
- Refills credentials
- """
- self.cache = self._get_auth()
- self._fill_credentials(self.cache[1])
-
- def clear_auth(self):
- """
- Can be called to clear the access cache so that next request
- will fetch a new token and base_url.
- """
- self.cache = None
- self.credentials.reset()
-
- @abc.abstractmethod
- def is_expired(self, auth_data):
- return
-
- def auth_request(self, method, url, headers=None, body=None, filters=None):
- """
- Obtains auth data and decorates a request with that.
- :param method: HTTP method of the request
- :param url: relative URL of the request (path)
- :param headers: HTTP headers of the request
- :param body: HTTP body in case of POST / PUT
- :param filters: select a base URL out of the catalog
- :returns a Tuple (url, headers, body)
- """
- orig_req = dict(url=url, headers=headers, body=body)
-
- auth_url, auth_headers, auth_body = self._decorate_request(
- filters, method, url, headers, body)
- auth_req = dict(url=auth_url, headers=auth_headers, body=auth_body)
-
- # Overwrite part if the request if it has been requested
- if self.alt_part is not None:
- if self.alt_auth_data is not None:
- alt_url, alt_headers, alt_body = self._decorate_request(
- filters, method, url, headers, body,
- auth_data=self.alt_auth_data)
- alt_auth_req = dict(url=alt_url, headers=alt_headers,
- body=alt_body)
- auth_req[self.alt_part] = alt_auth_req[self.alt_part]
-
- else:
- # If alt auth data is None, skip auth in the requested part
- auth_req[self.alt_part] = orig_req[self.alt_part]
-
- # Next auth request will be normal, unless otherwise requested
- self.reset_alt_auth_data()
-
- return auth_req['url'], auth_req['headers'], auth_req['body']
-
- def reset_alt_auth_data(self):
- """
- Configure auth provider to provide valid authentication data
- """
- self.alt_part = None
- self.alt_auth_data = None
-
- def set_alt_auth_data(self, request_part, auth_data):
- """
- Configure auth provider to provide alt authentication data
- on a part of the *next* auth_request. If credentials are None,
- set invalid data.
- :param request_part: request part to contain invalid auth: url,
- headers, body
- :param auth_data: alternative auth_data from which to get the
- invalid data to be injected
- """
- self.alt_part = request_part
- self.alt_auth_data = auth_data
-
- @abc.abstractmethod
- def base_url(self, filters, auth_data=None):
- """
- Extracts the base_url based on provided filters
- """
- return
-
-
-class KeystoneAuthProvider(AuthProvider):
-
- token_expiry_threshold = datetime.timedelta(seconds=60)
-
- def __init__(self, credentials, auth_url,
- disable_ssl_certificate_validation=None,
- ca_certs=None, trace_requests=None):
- super(KeystoneAuthProvider, self).__init__(credentials)
- self.dsvm = disable_ssl_certificate_validation
- self.ca_certs = ca_certs
- self.trace_requests = trace_requests
- self.auth_client = self._auth_client(auth_url)
-
- def _decorate_request(self, filters, method, url, headers=None, body=None,
- auth_data=None):
- if auth_data is None:
- auth_data = self.auth_data
- token, _ = auth_data
- base_url = self.base_url(filters=filters, auth_data=auth_data)
- # build authenticated request
- # returns new request, it does not touch the original values
- _headers = copy.deepcopy(headers) if headers is not None else {}
- _headers['X-Auth-Token'] = str(token)
- if url is None or url == "":
- _url = base_url
- else:
- # Join base URL and url, and remove multiple contiguous slashes
- _url = "/".join([base_url, url])
- parts = [x for x in urlparse.urlparse(_url)]
- parts[2] = re.sub("/{2,}", "/", parts[2])
- _url = urlparse.urlunparse(parts)
- # no change to method or body
- return str(_url), _headers, body
-
- @abc.abstractmethod
- def _auth_client(self):
- return
-
- @abc.abstractmethod
- def _auth_params(self):
- return
-
- def _get_auth(self):
- # Bypasses the cache
- auth_func = getattr(self.auth_client, 'get_token')
- auth_params = self._auth_params()
-
- # returns token, auth_data
- token, auth_data = auth_func(**auth_params)
- return token, auth_data
-
- def get_token(self):
- return self.auth_data[0]
-
-
-class KeystoneV2AuthProvider(KeystoneAuthProvider):
-
- EXPIRY_DATE_FORMAT = '%Y-%m-%dT%H:%M:%SZ'
-
- def _auth_client(self, auth_url):
- return json_v2id.TokenClientJSON(
- auth_url, disable_ssl_certificate_validation=self.dsvm,
- ca_certs=self.ca_certs, trace_requests=self.trace_requests)
-
- def _auth_params(self):
- return dict(
- user=self.credentials.username,
- password=self.credentials.password,
- tenant=self.credentials.tenant_name,
- auth_data=True)
-
- def _fill_credentials(self, auth_data_body):
- tenant = auth_data_body['token']['tenant']
- user = auth_data_body['user']
- if self.credentials.tenant_name is None:
- self.credentials.tenant_name = tenant['name']
- if self.credentials.tenant_id is None:
- self.credentials.tenant_id = tenant['id']
- if self.credentials.username is None:
- self.credentials.username = user['name']
- if self.credentials.user_id is None:
- self.credentials.user_id = user['id']
-
- def base_url(self, filters, auth_data=None):
- """
- Filters can be:
- - service: compute, image, etc
- - region: the service region
- - endpoint_type: adminURL, publicURL, internalURL
- - api_version: replace catalog version with this
- - skip_path: take just the base URL
- """
- if auth_data is None:
- auth_data = self.auth_data
- token, _auth_data = auth_data
- service = filters.get('service')
- region = filters.get('region')
- endpoint_type = filters.get('endpoint_type', 'publicURL')
-
- if service is None:
- raise exceptions.EndpointNotFound("No service provided")
-
- _base_url = None
- for ep in _auth_data['serviceCatalog']:
- if ep["type"] == service:
- for _ep in ep['endpoints']:
- if region is not None and _ep['region'] == region:
- _base_url = _ep.get(endpoint_type)
- if not _base_url:
- # No region matching, use the first
- _base_url = ep['endpoints'][0].get(endpoint_type)
- break
- if _base_url is None:
- raise exceptions.EndpointNotFound(service)
-
- parts = urlparse.urlparse(_base_url)
- if filters.get('api_version', None) is not None:
- path = "/" + filters['api_version']
- noversion_path = "/".join(parts.path.split("/")[2:])
- if noversion_path != "":
- path += "/" + noversion_path
- _base_url = _base_url.replace(parts.path, path)
- if filters.get('skip_path', None) is not None and parts.path != '':
- _base_url = _base_url.replace(parts.path, "/")
-
- return _base_url
-
- def is_expired(self, auth_data):
- _, access = auth_data
- expiry = datetime.datetime.strptime(access['token']['expires'],
- self.EXPIRY_DATE_FORMAT)
- return expiry - self.token_expiry_threshold <= \
- datetime.datetime.utcnow()
-
-
-class KeystoneV3AuthProvider(KeystoneAuthProvider):
-
- EXPIRY_DATE_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'
-
- def _auth_client(self, auth_url):
- return json_v3id.V3TokenClientJSON(
- auth_url, disable_ssl_certificate_validation=self.dsvm,
- ca_certs=self.ca_certs, trace_requests=self.trace_requests)
-
- def _auth_params(self):
- return dict(
- user_id=self.credentials.user_id,
- username=self.credentials.username,
- password=self.credentials.password,
- project_id=self.credentials.project_id,
- project_name=self.credentials.project_name,
- user_domain_id=self.credentials.user_domain_id,
- user_domain_name=self.credentials.user_domain_name,
- project_domain_id=self.credentials.project_domain_id,
- project_domain_name=self.credentials.project_domain_name,
- domain_id=self.credentials.domain_id,
- domain_name=self.credentials.domain_name,
- auth_data=True)
-
- def _fill_credentials(self, auth_data_body):
- # project or domain, depending on the scope
- project = auth_data_body.get('project', None)
- domain = auth_data_body.get('domain', None)
- # user is always there
- user = auth_data_body['user']
- # Set project fields
- if project is not None:
- if self.credentials.project_name is None:
- self.credentials.project_name = project['name']
- if self.credentials.project_id is None:
- self.credentials.project_id = project['id']
- if self.credentials.project_domain_id is None:
- self.credentials.project_domain_id = project['domain']['id']
- if self.credentials.project_domain_name is None:
- self.credentials.project_domain_name = \
- project['domain']['name']
- # Set domain fields
- if domain is not None:
- if self.credentials.domain_id is None:
- self.credentials.domain_id = domain['id']
- if self.credentials.domain_name is None:
- self.credentials.domain_name = domain['name']
- # Set user fields
- if self.credentials.username is None:
- self.credentials.username = user['name']
- if self.credentials.user_id is None:
- self.credentials.user_id = user['id']
- if self.credentials.user_domain_id is None:
- self.credentials.user_domain_id = user['domain']['id']
- if self.credentials.user_domain_name is None:
- self.credentials.user_domain_name = user['domain']['name']
-
- def base_url(self, filters, auth_data=None):
- """
- Filters can be:
- - service: compute, image, etc
- - region: the service region
- - endpoint_type: adminURL, publicURL, internalURL
- - api_version: replace catalog version with this
- - skip_path: take just the base URL
- """
- if auth_data is None:
- auth_data = self.auth_data
- token, _auth_data = auth_data
- service = filters.get('service')
- region = filters.get('region')
- endpoint_type = filters.get('endpoint_type', 'public')
-
- if service is None:
- raise exceptions.EndpointNotFound("No service provided")
-
- if 'URL' in endpoint_type:
- endpoint_type = endpoint_type.replace('URL', '')
- _base_url = None
- catalog = _auth_data['catalog']
- # Select entries with matching service type
- service_catalog = [ep for ep in catalog if ep['type'] == service]
- if len(service_catalog) > 0:
- service_catalog = service_catalog[0]['endpoints']
- else:
- # No matching service
- raise exceptions.EndpointNotFound(service)
- # Filter by endpoint type (interface)
- filtered_catalog = [ep for ep in service_catalog if
- ep['interface'] == endpoint_type]
- if len(filtered_catalog) == 0:
- # No matching type, keep all and try matching by region at least
- filtered_catalog = service_catalog
- # Filter by region
- filtered_catalog = [ep for ep in filtered_catalog if
- ep['region'] == region]
- if len(filtered_catalog) == 0:
- # No matching region, take the first endpoint
- filtered_catalog = [service_catalog[0]]
- # There should be only one match. If not take the first.
- _base_url = filtered_catalog[0].get('url', None)
- if _base_url is None:
- raise exceptions.EndpointNotFound(service)
-
- parts = urlparse.urlparse(_base_url)
- if filters.get('api_version', None) is not None:
- path = "/" + filters['api_version']
- noversion_path = "/".join(parts.path.split("/")[2:])
- if noversion_path != "":
- path += "/" + noversion_path
- _base_url = _base_url.replace(parts.path, path)
- if filters.get('skip_path', None) is not None:
- _base_url = _base_url.replace(parts.path, "/")
-
- return _base_url
-
- def is_expired(self, auth_data):
- _, access = auth_data
- expiry = datetime.datetime.strptime(access['expires_at'],
- self.EXPIRY_DATE_FORMAT)
- return expiry - self.token_expiry_threshold <= \
- datetime.datetime.utcnow()
-
-
-def is_identity_version_supported(identity_version):
- return identity_version in IDENTITY_VERSION
-
-
-def get_credentials(auth_url, fill_in=True, identity_version='v2',
- disable_ssl_certificate_validation=None, ca_certs=None,
- trace_requests=None, **kwargs):
- """
- Builds a credentials object based on the configured auth_version
-
- :param auth_url (string): Full URI of the OpenStack Identity API(Keystone)
- which is used to fetch the token from Identity service.
- :param fill_in (boolean): obtain a token and fill in all credential
- details provided by the identity service. When fill_in is not
- specified, credentials are not validated. Validation can be invoked
- by invoking ``is_valid()``
- :param identity_version (string): identity API version is used to
- select the matching auth provider and credentials class
- :param disable_ssl_certificate_validation: whether to enforce SSL
- certificate validation in SSL API requests to the auth system
- :param ca_certs: CA certificate bundle for validation of certificates
- in SSL API requests to the auth system
- :param trace_requests: trace in log API requests to the auth system
- :param kwargs (dict): Dict of credential key/value pairs
-
- Examples:
-
- Returns credentials from the provided parameters:
- >>> get_credentials(username='foo', password='bar')
-
- Returns credentials including IDs:
- >>> get_credentials(username='foo', password='bar', fill_in=True)
- """
- if not is_identity_version_supported(identity_version):
- raise exceptions.InvalidIdentityVersion(
- identity_version=identity_version)
-
- credential_class, auth_provider_class = IDENTITY_VERSION.get(
- identity_version)
-
- creds = credential_class(**kwargs)
- # Fill in the credentials fields that were not specified
- if fill_in:
- dsvm = disable_ssl_certificate_validation
- auth_provider = auth_provider_class(
- creds, auth_url, disable_ssl_certificate_validation=dsvm,
- ca_certs=ca_certs, trace_requests=trace_requests)
- creds = auth_provider.fill_credentials()
- return creds
-
-
-class Credentials(object):
- """
- Set of credentials for accessing OpenStack services
-
- ATTRIBUTES: list of valid class attributes representing credentials.
- """
-
- ATTRIBUTES = []
-
- def __init__(self, **kwargs):
- """
- Enforce the available attributes at init time (only).
- Additional attributes can still be set afterwards if tests need
- to do so.
- """
- self._initial = kwargs
- self._apply_credentials(kwargs)
-
- def _apply_credentials(self, attr):
- for key in attr.keys():
- if key in self.ATTRIBUTES:
- setattr(self, key, attr[key])
- else:
- raise exceptions.InvalidCredentials
-
- def __str__(self):
- """
- Represent only attributes included in self.ATTRIBUTES
- """
- _repr = dict((k, getattr(self, k)) for k in self.ATTRIBUTES)
- return str(_repr)
-
- def __eq__(self, other):
- """
- Credentials are equal if attributes in self.ATTRIBUTES are equal
- """
- return str(self) == str(other)
-
- def __getattr__(self, key):
- # If an attribute is set, __getattr__ is not invoked
- # If an attribute is not set, and it is a known one, return None
- if key in self.ATTRIBUTES:
- return None
- else:
- raise AttributeError
-
- def __delitem__(self, key):
- # For backwards compatibility, support dict behaviour
- if key in self.ATTRIBUTES:
- delattr(self, key)
- else:
- raise AttributeError
-
- def get(self, item, default):
- # In this patch act as dict for backward compatibility
- try:
- return getattr(self, item)
- except AttributeError:
- return default
-
- def get_init_attributes(self):
- return self._initial.keys()
-
- def is_valid(self):
- raise NotImplementedError
-
- def reset(self):
- # First delete all known attributes
- for key in self.ATTRIBUTES:
- if getattr(self, key) is not None:
- delattr(self, key)
- # Then re-apply initial setup
- self._apply_credentials(self._initial)
-
-
-class KeystoneV2Credentials(Credentials):
-
- ATTRIBUTES = ['username', 'password', 'tenant_name', 'user_id',
- 'tenant_id']
-
- def is_valid(self):
- """
- Minimum set of valid credentials, are username and password.
- Tenant is optional.
- """
- return None not in (self.username, self.password)
-
-
-class KeystoneV3Credentials(Credentials):
- """
- Credentials suitable for the Keystone Identity V3 API
- """
-
- ATTRIBUTES = ['domain_id', 'domain_name', 'password', 'username',
- 'project_domain_id', 'project_domain_name', 'project_id',
- 'project_name', 'tenant_id', 'tenant_name', 'user_domain_id',
- 'user_domain_name', 'user_id']
-
- def __setattr__(self, key, value):
- parent = super(KeystoneV3Credentials, self)
- # for tenant_* set both project and tenant
- if key == 'tenant_id':
- parent.__setattr__('project_id', value)
- elif key == 'tenant_name':
- parent.__setattr__('project_name', value)
- # for project_* set both project and tenant
- if key == 'project_id':
- parent.__setattr__('tenant_id', value)
- elif key == 'project_name':
- parent.__setattr__('tenant_name', value)
- # for *_domain_* set both user and project if not set yet
- if key == 'user_domain_id':
- if self.project_domain_id is None:
- parent.__setattr__('project_domain_id', value)
- if key == 'project_domain_id':
- if self.user_domain_id is None:
- parent.__setattr__('user_domain_id', value)
- if key == 'user_domain_name':
- if self.project_domain_name is None:
- parent.__setattr__('project_domain_name', value)
- if key == 'project_domain_name':
- if self.user_domain_name is None:
- parent.__setattr__('user_domain_name', value)
- # support domain_name coming from config
- if key == 'domain_name':
- parent.__setattr__('user_domain_name', value)
- parent.__setattr__('project_domain_name', value)
- # finally trigger default behaviour for all attributes
- parent.__setattr__(key, value)
-
- def is_valid(self):
- """
- Valid combinations of v3 credentials (excluding token, scope)
- - User id, password (optional domain)
- - User name, password and its domain id/name
- For the scope, valid combinations are:
- - None
- - Project id (optional domain)
- - Project name and its domain id/name
- - Domain id
- - Domain name
- """
- valid_user_domain = any(
- [self.user_domain_id is not None,
- self.user_domain_name is not None])
- valid_project_domain = any(
- [self.project_domain_id is not None,
- self.project_domain_name is not None])
- valid_user = any(
- [self.user_id is not None,
- self.username is not None and valid_user_domain])
- valid_project_scope = any(
- [self.project_name is None and self.project_id is None,
- self.project_id is not None,
- self.project_name is not None and valid_project_domain])
- valid_domain_scope = any(
- [self.domain_id is None and self.domain_name is None,
- self.domain_id or self.domain_name])
- return all([self.password is not None,
- valid_user,
- valid_project_scope and valid_domain_scope])
-
-
-IDENTITY_VERSION = {'v2': (KeystoneV2Credentials, KeystoneV2AuthProvider),
- 'v3': (KeystoneV3Credentials, KeystoneV3AuthProvider)}
diff --git a/tempest/clients.py b/tempest/clients.py
index e1b6eab..a0c9471 100644
--- a/tempest/clients.py
+++ b/tempest/clients.py
@@ -16,6 +16,8 @@
import copy
from oslo_log import log as logging
+from tempest_lib.services.identity.v2.token_client import TokenClientJSON
+from tempest_lib.services.identity.v3.token_client import V3TokenClientJSON
from tempest.common import cred_provider
from tempest.common import negative_rest_client
@@ -77,7 +79,6 @@
DatabaseVersionsClientJSON
from tempest.services.identity.v2.json.identity_client import \
IdentityClientJSON
-from tempest.services.identity.v2.json.token_client import TokenClientJSON
from tempest.services.identity.v3.json.credentials_client import \
CredentialsClientJSON
from tempest.services.identity.v3.json.endpoints_client import \
@@ -88,7 +89,6 @@
from tempest.services.identity.v3.json.region_client import RegionClientJSON
from tempest.services.identity.v3.json.service_client import \
ServiceClientJSON
-from tempest.services.identity.v3.json.token_client import V3TokenClientJSON
from tempest.services.image.v1.json.image_client import ImageClientJSON
from tempest.services.image.v2.json.image_client import ImageClientV2JSON
from tempest.services.messaging.json.messaging_client import \
diff --git a/tempest/cmd/javelin.py b/tempest/cmd/javelin.py
index f84771f..b2311b0 100755
--- a/tempest/cmd/javelin.py
+++ b/tempest/cmd/javelin.py
@@ -114,10 +114,10 @@
import netaddr
from oslo_log import log as logging
from oslo_utils import timeutils
+from tempest_lib import auth
from tempest_lib import exceptions as lib_exc
import yaml
-import tempest.auth
from tempest import config
from tempest.services.compute.json import flavors_client
from tempest.services.compute.json import floating_ips_client
@@ -175,7 +175,7 @@
}
object_storage_params.update(default_params)
- _creds = tempest.auth.KeystoneV2Credentials(
+ _creds = auth.KeystoneV2Credentials(
username=user,
password=pw,
tenant_name=tenant)
@@ -185,7 +185,7 @@
'ca_certs': CONF.identity.ca_certificates_file,
'trace_requests': CONF.debug.trace_requests
}
- _auth = tempest.auth.KeystoneV2AuthProvider(
+ _auth = auth.KeystoneV2AuthProvider(
_creds, CONF.identity.uri, **auth_provider_params)
self.identity = identity_client.IdentityClientJSON(
_auth,
diff --git a/tempest/common/cred_provider.py b/tempest/common/cred_provider.py
index 3223027..461097f 100644
--- a/tempest/common/cred_provider.py
+++ b/tempest/common/cred_provider.py
@@ -16,8 +16,8 @@
from oslo_log import log as logging
import six
+from tempest_lib import auth
-from tempest import auth
from tempest import config
from tempest import exceptions
diff --git a/tempest/manager.py b/tempest/manager.py
index 025ce65..c39d3e5 100644
--- a/tempest/manager.py
+++ b/tempest/manager.py
@@ -13,7 +13,8 @@
# License for the specific language governing permissions and limitations
# under the License.
-from tempest import auth
+from tempest_lib import auth
+
from tempest.common import cred_provider
from tempest import config
from tempest import exceptions
diff --git a/tempest/tests/common/test_accounts.py b/tempest/tests/common/test_accounts.py
index b4048ba..f357bf4 100644
--- a/tempest/tests/common/test_accounts.py
+++ b/tempest/tests/common/test_accounts.py
@@ -20,13 +20,13 @@
from oslo_concurrency import lockutils
from oslo_config import cfg
from oslotest import mockpatch
+from tempest_lib import auth
+from tempest_lib.services.identity.v2 import token_client
-from tempest import auth
from tempest.common import accounts
from tempest.common import cred_provider
from tempest import config
from tempest import exceptions
-from tempest.services.identity.v2.json import token_client
from tempest.tests import base
from tempest.tests import fake_config
from tempest.tests import fake_http
diff --git a/tempest/tests/common/test_cred_provider.py b/tempest/tests/common/test_cred_provider.py
index 76430ac..ed3f931 100644
--- a/tempest/tests/common/test_cred_provider.py
+++ b/tempest/tests/common/test_cred_provider.py
@@ -13,21 +13,21 @@
# under the License.
from oslo_config import cfg
+from tempest_lib import auth
+from tempest_lib import exceptions as lib_exc
+from tempest_lib.services.identity.v2 import token_client as v2_client
+from tempest_lib.services.identity.v3 import token_client as v3_client
-from tempest import auth
+
from tempest.common import cred_provider
from tempest.common import tempest_fixtures as fixtures
from tempest import config
-from tempest.services.identity.v2.json import token_client as v2_client
-from tempest.services.identity.v3.json import token_client as v3_client
+from tempest.tests import base
from tempest.tests import fake_config
from tempest.tests import fake_identity
-# Note(andreaf): once credentials tests move to tempest-lib, I will copy the
-# parts of them required by these here.
-from tempest.tests import test_credentials as test_creds
-class ConfiguredV2CredentialsTests(test_creds.CredentialsTests):
+class ConfiguredV2CredentialsTests(base.TestCase):
attributes = {
'username': 'fake_username',
'password': 'fake_password',
@@ -46,6 +46,23 @@
self.stubs.Set(self.tokenclient_class, 'raw_request',
self.identity_response)
+ def _get_credentials(self, attributes=None):
+ if attributes is None:
+ attributes = self.attributes
+ return self.credentials_class(**attributes)
+
+ def _check(self, credentials, credentials_class, filled):
+ # Check the right version of credentials has been returned
+ self.assertIsInstance(credentials, credentials_class)
+ # Check the id attributes are filled in
+ attributes = [x for x in credentials.ATTRIBUTES if (
+ '_id' in x and x != 'domain_id')]
+ for attr in attributes:
+ if filled:
+ self.assertIsNotNone(getattr(credentials, attr))
+ else:
+ self.assertIsNone(getattr(credentials, attr))
+
def _verify_credentials(self, credentials_class, filled=True,
identity_version=None):
for ctype in cred_provider.CREDENTIAL_TYPES:
@@ -58,6 +75,15 @@
identity_version=identity_version)
self._check(creds, credentials_class, filled)
+ def test_create(self):
+ creds = self._get_credentials()
+ self.assertEqual(self.attributes, creds._initial)
+
+ def test_create_invalid_attr(self):
+ self.assertRaises(lib_exc.InvalidCredentials,
+ self._get_credentials,
+ attributes=dict(invalid='fake'))
+
def test_get_configured_credentials(self):
self.useFixture(fixtures.LockFixture('auth_version'))
self._verify_credentials(credentials_class=self.credentials_class)
diff --git a/tempest/tests/fake_credentials.py b/tempest/tests/fake_credentials.py
deleted file mode 100644
index 649d51d..0000000
--- a/tempest/tests/fake_credentials.py
+++ /dev/null
@@ -1,63 +0,0 @@
-# Copyright 2014 Hewlett-Packard Development Company, L.P.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from tempest import auth
-
-
-class FakeCredentials(auth.Credentials):
-
- def is_valid(self):
- return True
-
-
-class FakeKeystoneV2Credentials(auth.KeystoneV2Credentials):
-
- def __init__(self):
- creds = dict(
- username='fake_username',
- password='fake_password',
- tenant_name='fake_tenant_name'
- )
- super(FakeKeystoneV2Credentials, self).__init__(**creds)
-
-
-class FakeKeystoneV3Credentials(auth.KeystoneV3Credentials):
- """
- Fake credentials suitable for the Keystone Identity V3 API
- """
-
- def __init__(self):
- creds = dict(
- username='fake_username',
- password='fake_password',
- user_domain_name='fake_domain_name',
- project_name='fake_tenant_name',
- project_domain_name='fake_domain_name'
- )
- super(FakeKeystoneV3Credentials, self).__init__(**creds)
-
-
-class FakeKeystoneV3DomainCredentials(auth.KeystoneV3Credentials):
- """
- Fake credentials suitable for the Keystone Identity V3 API, with no scope
- """
-
- def __init__(self):
- creds = dict(
- username='fake_username',
- password='fake_password',
- user_domain_name='fake_domain_name'
- )
- super(FakeKeystoneV3DomainCredentials, self).__init__(**creds)
diff --git a/tempest/tests/test_auth.py b/tempest/tests/test_auth.py
deleted file mode 100644
index eb63b30..0000000
--- a/tempest/tests/test_auth.py
+++ /dev/null
@@ -1,411 +0,0 @@
-# Copyright 2014 IBM Corp.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import copy
-import datetime
-
-from oslotest import mockpatch
-
-from tempest import auth
-from tempest import exceptions
-from tempest.services.identity.v2.json import token_client as v2_client
-from tempest.services.identity.v3.json import token_client as v3_client
-from tempest.tests import base
-from tempest.tests import fake_credentials
-from tempest.tests import fake_http
-from tempest.tests import fake_identity
-
-
-def fake_get_credentials(fill_in=True, identity_version='v2', **kwargs):
- return fake_credentials.FakeCredentials()
-
-
-class BaseAuthTestsSetUp(base.TestCase):
- _auth_provider_class = None
- credentials = fake_credentials.FakeCredentials()
-
- def _auth(self, credentials, auth_url, **params):
- """
- returns auth method according to keystone
- """
- return self._auth_provider_class(credentials, auth_url, **params)
-
- def setUp(self):
- super(BaseAuthTestsSetUp, self).setUp()
- self.fake_http = fake_http.fake_httplib2(return_type=200)
- self.stubs.Set(auth, 'get_credentials', fake_get_credentials)
- self.auth_provider = self._auth(self.credentials,
- fake_identity.FAKE_AUTH_URL)
-
-
-class TestBaseAuthProvider(BaseAuthTestsSetUp):
- """
- This tests auth.AuthProvider class which is base for the other so we
- obviously don't test not implemented method or the ones which strongly
- depends on them.
- """
-
- class FakeAuthProviderImpl(auth.AuthProvider):
- def _decorate_request():
- pass
-
- def _fill_credentials():
- pass
-
- def _get_auth():
- pass
-
- def base_url():
- pass
-
- def is_expired():
- pass
-
- _auth_provider_class = FakeAuthProviderImpl
-
- def _auth(self, credentials, auth_url, **params):
- """
- returns auth method according to keystone
- """
- return self._auth_provider_class(credentials, **params)
-
- def test_check_credentials_bad_type(self):
- self.assertFalse(self.auth_provider.check_credentials([]))
-
- def test_auth_data_property_when_cache_exists(self):
- self.auth_provider.cache = 'foo'
- self.useFixture(mockpatch.PatchObject(self.auth_provider,
- 'is_expired',
- return_value=False))
- self.assertEqual('foo', getattr(self.auth_provider, 'auth_data'))
-
- def test_delete_auth_data_property_through_deleter(self):
- self.auth_provider.cache = 'foo'
- del self.auth_provider.auth_data
- self.assertIsNone(self.auth_provider.cache)
-
- def test_delete_auth_data_property_through_clear_auth(self):
- self.auth_provider.cache = 'foo'
- self.auth_provider.clear_auth()
- self.assertIsNone(self.auth_provider.cache)
-
- def test_set_and_reset_alt_auth_data(self):
- self.auth_provider.set_alt_auth_data('foo', 'bar')
- self.assertEqual(self.auth_provider.alt_part, 'foo')
- self.assertEqual(self.auth_provider.alt_auth_data, 'bar')
-
- self.auth_provider.reset_alt_auth_data()
- self.assertIsNone(self.auth_provider.alt_part)
- self.assertIsNone(self.auth_provider.alt_auth_data)
-
- def test_auth_class(self):
- self.assertRaises(TypeError,
- auth.AuthProvider,
- fake_credentials.FakeCredentials)
-
-
-class TestKeystoneV2AuthProvider(BaseAuthTestsSetUp):
- _endpoints = fake_identity.IDENTITY_V2_RESPONSE['access']['serviceCatalog']
- _auth_provider_class = auth.KeystoneV2AuthProvider
- credentials = fake_credentials.FakeKeystoneV2Credentials()
-
- def setUp(self):
- super(TestKeystoneV2AuthProvider, self).setUp()
- self.stubs.Set(v2_client.TokenClientJSON, 'raw_request',
- fake_identity._fake_v2_response)
- self.target_url = 'test_api'
-
- def _get_fake_alt_identity(self):
- return fake_identity.ALT_IDENTITY_V2_RESPONSE['access']
-
- def _get_result_url_from_endpoint(self, ep, endpoint_type='publicURL',
- replacement=None):
- if replacement:
- return ep[endpoint_type].replace('v2', replacement)
- return ep[endpoint_type]
-
- def _get_token_from_fake_identity(self):
- return fake_identity.TOKEN
-
- def _get_from_fake_identity(self, attr):
- access = fake_identity.IDENTITY_V2_RESPONSE['access']
- if attr == 'user_id':
- return access['user']['id']
- elif attr == 'tenant_id':
- return access['token']['tenant']['id']
-
- def _test_request_helper(self, filters, expected):
- url, headers, body = self.auth_provider.auth_request('GET',
- self.target_url,
- filters=filters)
-
- self.assertEqual(expected['url'], url)
- self.assertEqual(expected['token'], headers['X-Auth-Token'])
- self.assertEqual(expected['body'], body)
-
- def _auth_data_with_expiry(self, date_as_string):
- token, access = self.auth_provider.auth_data
- access['token']['expires'] = date_as_string
- return token, access
-
- def test_request(self):
- filters = {
- 'service': 'compute',
- 'endpoint_type': 'publicURL',
- 'region': 'FakeRegion'
- }
-
- url = self._get_result_url_from_endpoint(
- self._endpoints[0]['endpoints'][1]) + '/' + self.target_url
-
- expected = {
- 'body': None,
- 'url': url,
- 'token': self._get_token_from_fake_identity(),
- }
- self._test_request_helper(filters, expected)
-
- def test_request_with_alt_auth_cleans_alt(self):
- self.auth_provider.set_alt_auth_data(
- 'body',
- (fake_identity.ALT_TOKEN, self._get_fake_alt_identity()))
- self.test_request()
- # Assert alt auth data is clear after it
- self.assertIsNone(self.auth_provider.alt_part)
- self.assertIsNone(self.auth_provider.alt_auth_data)
-
- def test_request_with_alt_part_without_alt_data(self):
- """
- Assert that when alt_part is defined, the corresponding original
- request element is kept the same.
- """
- filters = {
- 'service': 'compute',
- 'endpoint_type': 'publicURL',
- 'region': 'fakeRegion'
- }
- self.auth_provider.set_alt_auth_data('url', None)
-
- url, headers, body = self.auth_provider.auth_request('GET',
- self.target_url,
- filters=filters)
-
- self.assertEqual(url, self.target_url)
- self.assertEqual(self._get_token_from_fake_identity(),
- headers['X-Auth-Token'])
- self.assertEqual(body, None)
-
- def test_request_with_bad_service(self):
- filters = {
- 'service': 'BAD_SERVICE',
- 'endpoint_type': 'publicURL',
- 'region': 'fakeRegion'
- }
- self.assertRaises(exceptions.EndpointNotFound,
- self.auth_provider.auth_request, 'GET',
- self.target_url, filters=filters)
-
- def test_request_without_service(self):
- filters = {
- 'service': None,
- 'endpoint_type': 'publicURL',
- 'region': 'fakeRegion'
- }
- self.assertRaises(exceptions.EndpointNotFound,
- self.auth_provider.auth_request, 'GET',
- self.target_url, filters=filters)
-
- def test_check_credentials_missing_attribute(self):
- for attr in ['username', 'password']:
- cred = copy.copy(self.credentials)
- del cred[attr]
- self.assertFalse(self.auth_provider.check_credentials(cred))
-
- def test_fill_credentials(self):
- self.auth_provider.fill_credentials()
- creds = self.auth_provider.credentials
- for attr in ['user_id', 'tenant_id']:
- self.assertEqual(self._get_from_fake_identity(attr),
- getattr(creds, attr))
-
- def _test_base_url_helper(self, expected_url, filters,
- auth_data=None):
-
- url = self.auth_provider.base_url(filters, auth_data)
- self.assertEqual(url, expected_url)
-
- def test_base_url(self):
- self.filters = {
- 'service': 'compute',
- 'endpoint_type': 'publicURL',
- 'region': 'FakeRegion'
- }
- expected = self._get_result_url_from_endpoint(
- self._endpoints[0]['endpoints'][1])
- self._test_base_url_helper(expected, self.filters)
-
- def test_base_url_to_get_admin_endpoint(self):
- self.filters = {
- 'service': 'compute',
- 'endpoint_type': 'adminURL',
- 'region': 'FakeRegion'
- }
- expected = self._get_result_url_from_endpoint(
- self._endpoints[0]['endpoints'][1], endpoint_type='adminURL')
- self._test_base_url_helper(expected, self.filters)
-
- def test_base_url_unknown_region(self):
- """
- Assure that if the region is unknow the first endpoint is returned.
- """
- self.filters = {
- 'service': 'compute',
- 'endpoint_type': 'publicURL',
- 'region': 'AintNoBodyKnowThisRegion'
- }
- expected = self._get_result_url_from_endpoint(
- self._endpoints[0]['endpoints'][0])
- self._test_base_url_helper(expected, self.filters)
-
- def test_base_url_with_non_existent_service(self):
- self.filters = {
- 'service': 'BAD_SERVICE',
- 'endpoint_type': 'publicURL',
- 'region': 'FakeRegion'
- }
- self.assertRaises(exceptions.EndpointNotFound,
- self._test_base_url_helper, None, self.filters)
-
- def test_base_url_without_service(self):
- self.filters = {
- 'endpoint_type': 'publicURL',
- 'region': 'FakeRegion'
- }
- self.assertRaises(exceptions.EndpointNotFound,
- self._test_base_url_helper, None, self.filters)
-
- def test_base_url_with_api_version_filter(self):
- self.filters = {
- 'service': 'compute',
- 'endpoint_type': 'publicURL',
- 'region': 'FakeRegion',
- 'api_version': 'v12'
- }
- expected = self._get_result_url_from_endpoint(
- self._endpoints[0]['endpoints'][1], replacement='v12')
- self._test_base_url_helper(expected, self.filters)
-
- def test_base_url_with_skip_path_filter(self):
- self.filters = {
- 'service': 'compute',
- 'endpoint_type': 'publicURL',
- 'region': 'FakeRegion',
- 'skip_path': True
- }
- expected = 'http://fake_url/'
- self._test_base_url_helper(expected, self.filters)
-
- def test_token_not_expired(self):
- expiry_data = datetime.datetime.utcnow() + datetime.timedelta(days=1)
- auth_data = self._auth_data_with_expiry(
- expiry_data.strftime(self.auth_provider.EXPIRY_DATE_FORMAT))
- self.assertFalse(self.auth_provider.is_expired(auth_data))
-
- def test_token_expired(self):
- expiry_data = datetime.datetime.utcnow() - datetime.timedelta(hours=1)
- auth_data = self._auth_data_with_expiry(
- expiry_data.strftime(self.auth_provider.EXPIRY_DATE_FORMAT))
- self.assertTrue(self.auth_provider.is_expired(auth_data))
-
- def test_token_not_expired_to_be_renewed(self):
- expiry_data = datetime.datetime.utcnow() + \
- self.auth_provider.token_expiry_threshold / 2
- auth_data = self._auth_data_with_expiry(
- expiry_data.strftime(self.auth_provider.EXPIRY_DATE_FORMAT))
- self.assertTrue(self.auth_provider.is_expired(auth_data))
-
-
-class TestKeystoneV3AuthProvider(TestKeystoneV2AuthProvider):
- _endpoints = fake_identity.IDENTITY_V3_RESPONSE['token']['catalog']
- _auth_provider_class = auth.KeystoneV3AuthProvider
- credentials = fake_credentials.FakeKeystoneV3Credentials()
-
- def setUp(self):
- super(TestKeystoneV3AuthProvider, self).setUp()
- self.stubs.Set(v3_client.V3TokenClientJSON, 'raw_request',
- fake_identity._fake_v3_response)
-
- def _get_fake_alt_identity(self):
- return fake_identity.ALT_IDENTITY_V3['token']
-
- def _get_result_url_from_endpoint(self, ep, replacement=None):
- if replacement:
- return ep['url'].replace('v3', replacement)
- return ep['url']
-
- def _auth_data_with_expiry(self, date_as_string):
- token, access = self.auth_provider.auth_data
- access['expires_at'] = date_as_string
- return token, access
-
- def _get_from_fake_identity(self, attr):
- token = fake_identity.IDENTITY_V3_RESPONSE['token']
- if attr == 'user_id':
- return token['user']['id']
- elif attr == 'project_id':
- return token['project']['id']
- elif attr == 'user_domain_id':
- return token['user']['domain']['id']
- elif attr == 'project_domain_id':
- return token['project']['domain']['id']
-
- def test_check_credentials_missing_attribute(self):
- # reset credentials to fresh ones
- self.credentials.reset()
- for attr in ['username', 'password', 'user_domain_name',
- 'project_domain_name']:
- cred = copy.copy(self.credentials)
- del cred[attr]
- self.assertFalse(self.auth_provider.check_credentials(cred),
- "Credentials should be invalid without %s" % attr)
-
- def test_check_domain_credentials_missing_attribute(self):
- # reset credentials to fresh ones
- self.credentials.reset()
- domain_creds = fake_credentials.FakeKeystoneV3DomainCredentials()
- for attr in ['username', 'password', 'user_domain_name']:
- cred = copy.copy(domain_creds)
- del cred[attr]
- self.assertFalse(self.auth_provider.check_credentials(cred),
- "Credentials should be invalid without %s" % attr)
-
- def test_fill_credentials(self):
- self.auth_provider.fill_credentials()
- creds = self.auth_provider.credentials
- for attr in ['user_id', 'project_id', 'user_domain_id',
- 'project_domain_id']:
- self.assertEqual(self._get_from_fake_identity(attr),
- getattr(creds, attr))
-
- # Overwrites v2 test
- def test_base_url_to_get_admin_endpoint(self):
- self.filters = {
- 'service': 'compute',
- 'endpoint_type': 'admin',
- 'region': 'MiddleEarthRegion'
- }
- expected = self._get_result_url_from_endpoint(
- self._endpoints[0]['endpoints'][2])
- self._test_base_url_helper(expected, self.filters)
diff --git a/tempest/tests/test_credentials.py b/tempest/tests/test_credentials.py
deleted file mode 100644
index bf44d11..0000000
--- a/tempest/tests/test_credentials.py
+++ /dev/null
@@ -1,180 +0,0 @@
-# Copyright 2014 Hewlett-Packard Development Company, L.P.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import copy
-
-from tempest import auth
-from tempest import exceptions
-from tempest.services.identity.v2.json import token_client as v2_client
-from tempest.services.identity.v3.json import token_client as v3_client
-from tempest.tests import base
-from tempest.tests import fake_identity
-
-
-class CredentialsTests(base.TestCase):
- attributes = {}
- credentials_class = auth.Credentials
-
- def _get_credentials(self, attributes=None):
- if attributes is None:
- attributes = self.attributes
- return self.credentials_class(**attributes)
-
- def _check(self, credentials, credentials_class, filled):
- # Check the right version of credentials has been returned
- self.assertIsInstance(credentials, credentials_class)
- # Check the id attributes are filled in
- attributes = [x for x in credentials.ATTRIBUTES if (
- '_id' in x and x != 'domain_id')]
- for attr in attributes:
- if filled:
- self.assertIsNotNone(getattr(credentials, attr))
- else:
- self.assertIsNone(getattr(credentials, attr))
-
- def test_create(self):
- creds = self._get_credentials()
- self.assertEqual(self.attributes, creds._initial)
-
- def test_create_invalid_attr(self):
- self.assertRaises(exceptions.InvalidCredentials,
- self._get_credentials,
- attributes=dict(invalid='fake'))
-
- def test_is_valid(self):
- creds = self._get_credentials()
- self.assertRaises(NotImplementedError, creds.is_valid)
-
-
-class KeystoneV2CredentialsTests(CredentialsTests):
- attributes = {
- 'username': 'fake_username',
- 'password': 'fake_password',
- 'tenant_name': 'fake_tenant_name'
- }
-
- identity_response = fake_identity._fake_v2_response
- credentials_class = auth.KeystoneV2Credentials
- tokenclient_class = v2_client.TokenClientJSON
- identity_version = 'v2'
-
- def setUp(self):
- super(KeystoneV2CredentialsTests, self).setUp()
- self.stubs.Set(self.tokenclient_class, 'raw_request',
- self.identity_response)
-
- def _verify_credentials(self, credentials_class, creds_dict, filled=True):
- creds = auth.get_credentials(fake_identity.FAKE_AUTH_URL,
- fill_in=filled,
- identity_version=self.identity_version,
- **creds_dict)
- self._check(creds, credentials_class, filled)
-
- def test_get_credentials(self):
- self._verify_credentials(credentials_class=self.credentials_class,
- creds_dict=self.attributes)
-
- def test_get_credentials_not_filled(self):
- self._verify_credentials(credentials_class=self.credentials_class,
- creds_dict=self.attributes,
- filled=False)
-
- def test_is_valid(self):
- creds = self._get_credentials()
- self.assertTrue(creds.is_valid())
-
- def _test_is_not_valid(self, ignore_key):
- creds = self._get_credentials()
- for attr in self.attributes.keys():
- if attr == ignore_key:
- continue
- temp_attr = getattr(creds, attr)
- delattr(creds, attr)
- self.assertFalse(creds.is_valid(),
- "Credentials should be invalid without %s" % attr)
- setattr(creds, attr, temp_attr)
-
- def test_is_not_valid(self):
- # NOTE(mtreinish): A KeystoneV2 credential object is valid without
- # a tenant_name. So skip that check. See tempest.auth for the valid
- # credential requirements
- self._test_is_not_valid('tenant_name')
-
- def test_reset_all_attributes(self):
- creds = self._get_credentials()
- initial_creds = copy.deepcopy(creds)
- set_attr = creds.__dict__.keys()
- missing_attr = set(creds.ATTRIBUTES).difference(set_attr)
- # Set all unset attributes, then reset
- for attr in missing_attr:
- setattr(creds, attr, 'fake' + attr)
- creds.reset()
- # Check reset credentials are same as initial ones
- self.assertEqual(creds, initial_creds)
-
- def test_reset_single_attribute(self):
- creds = self._get_credentials()
- initial_creds = copy.deepcopy(creds)
- set_attr = creds.__dict__.keys()
- missing_attr = set(creds.ATTRIBUTES).difference(set_attr)
- # Set one unset attributes, then reset
- for attr in missing_attr:
- setattr(creds, attr, 'fake' + attr)
- creds.reset()
- # Check reset credentials are same as initial ones
- self.assertEqual(creds, initial_creds)
-
-
-class KeystoneV3CredentialsTests(KeystoneV2CredentialsTests):
- attributes = {
- 'username': 'fake_username',
- 'password': 'fake_password',
- 'project_name': 'fake_project_name',
- 'user_domain_name': 'fake_domain_name'
- }
-
- credentials_class = auth.KeystoneV3Credentials
- identity_response = fake_identity._fake_v3_response
- tokenclient_class = v3_client.V3TokenClientJSON
- identity_version = 'v3'
-
- def test_is_not_valid(self):
- # NOTE(mtreinish) For a Keystone V3 credential object a project name
- # is not required to be valid, so we skip that check. See tempest.auth
- # for the valid credential requirements
- self._test_is_not_valid('project_name')
-
- def test_synced_attributes(self):
- attributes = self.attributes
- # Create V3 credentials with tenant instead of project, and user_domain
- for attr in ['project_id', 'user_domain_id']:
- attributes[attr] = 'fake_' + attr
- creds = self._get_credentials(attributes)
- self.assertEqual(creds.project_name, creds.tenant_name)
- self.assertEqual(creds.project_id, creds.tenant_id)
- self.assertEqual(creds.user_domain_name, creds.project_domain_name)
- self.assertEqual(creds.user_domain_id, creds.project_domain_id)
- # Replace user_domain with project_domain
- del attributes['user_domain_name']
- del attributes['user_domain_id']
- del attributes['project_name']
- del attributes['project_id']
- for attr in ['project_domain_name', 'project_domain_id',
- 'tenant_name', 'tenant_id']:
- attributes[attr] = 'fake_' + attr
- self.assertEqual(creds.tenant_name, creds.project_name)
- self.assertEqual(creds.tenant_id, creds.project_id)
- self.assertEqual(creds.project_domain_name, creds.user_domain_name)
- self.assertEqual(creds.project_domain_id, creds.user_domain_id)
diff --git a/tempest/tests/test_tenant_isolation.py b/tempest/tests/test_tenant_isolation.py
index fd8718f..1eb33a4 100644
--- a/tempest/tests/test_tenant_isolation.py
+++ b/tempest/tests/test_tenant_isolation.py
@@ -15,6 +15,7 @@
import mock
from oslo_config import cfg
from oslotest import mockpatch
+from tempest_lib.services.identity.v2 import token_client as json_token_client
from tempest.common import isolated_creds
from tempest.common import service_client
@@ -22,7 +23,6 @@
from tempest import exceptions
from tempest.services.identity.v2.json import identity_client as \
json_iden_client
-from tempest.services.identity.v2.json import token_client as json_token_client
from tempest.services.network.json import network_client as json_network_client
from tempest.tests import base
from tempest.tests import fake_config