Merge "Skip creation of network resources for disabled IPv6 tests"
diff --git a/etc/tempest.conf.sample b/etc/tempest.conf.sample
index 4a567e7..70950b7 100644
--- a/etc/tempest.conf.sample
+++ b/etc/tempest.conf.sample
@@ -350,6 +350,10 @@
# API key to use when authenticating as admin. (string value)
#password=<None>
+# Domain name for authentication as admin (Keystone V3).The
+# same domain applies to user and project (string value)
+#domain_name=<None>
+
[compute-feature-enabled]
@@ -516,6 +520,10 @@
# API key to use when authenticating. (string value)
#password=<None>
+# Domain name for authentication (Keystone V3).The same domain
+# applies to user and project (string value)
+#domain_name=<None>
+
# Username of alternate user to use for Nova API requests.
# (string value)
#alt_username=<None>
@@ -528,6 +536,10 @@
# (string value)
#alt_password=<None>
+# Alternate domain name for authentication (Keystone V3).The
+# same domain applies to user and project (string value)
+#alt_domain_name=<None>
+
# Administrative Username to use for Keystone API requests.
# (string value)
#admin_username=<None>
@@ -539,6 +551,10 @@
# API key to use when authenticating as admin. (string value)
#admin_password=<None>
+# Admin domain name for authentication (Keystone V3).The same
+# domain applies to user and project (string value)
+#admin_domain_name=<None>
+
[identity-feature-enabled]
diff --git a/tempest/auth.py b/tempest/auth.py
index ac8cbd1..9c51edb 100644
--- a/tempest/auth.py
+++ b/tempest/auth.py
@@ -80,6 +80,17 @@
def _get_auth(self):
raise NotImplementedError
+ def _fill_credentials(self, auth_data_body):
+ raise NotImplementedError
+
+ def fill_credentials(self):
+ """
+ Fill credentials object with data from auth
+ """
+ auth_data = self.get_auth()
+ self._fill_credentials(auth_data[1])
+ return self.credentials
+
@classmethod
def check_credentials(cls, credentials):
"""
@@ -89,20 +100,35 @@
@property
def auth_data(self):
- if self.cache is None or self.is_expired(self.cache):
- self.cache = self._get_auth()
- return self.cache
+ return self.get_auth()
@auth_data.deleter
def auth_data(self):
self.clear_auth()
+ def get_auth(self):
+ """
+ Returns auth from cache if available, else auth first
+ """
+ if self.cache is None or self.is_expired(self.cache):
+ self.set_auth()
+ return self.cache
+
+ def set_auth(self):
+ """
+ Forces setting auth, ignores cache if it exists.
+ Refills credentials
+ """
+ self.cache = self._get_auth()
+ self._fill_credentials(self.cache[1])
+
def clear_auth(self):
"""
Can be called to clear the access cache so that next request
will fetch a new token and base_url.
"""
self.cache = None
+ self.credentials.reset()
def is_expired(self, auth_data):
raise NotImplementedError
@@ -244,6 +270,18 @@
else:
raise NotImplementedError
+ def _fill_credentials(self, auth_data_body):
+ tenant = auth_data_body['token']['tenant']
+ user = auth_data_body['user']
+ if self.credentials.tenant_name is None:
+ self.credentials.tenant_name = tenant['name']
+ if self.credentials.tenant_id is None:
+ self.credentials.tenant_id = tenant['id']
+ if self.credentials.username is None:
+ self.credentials.username = user['name']
+ if self.credentials.user_id is None:
+ self.credentials.user_id = user['id']
+
def base_url(self, filters, auth_data=None):
"""
Filters can be:
@@ -300,19 +338,6 @@
EXPIRY_DATE_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'
- def _convert_credentials(self, credentials):
- # For V3 do not convert as V3 Credentials are not defined yet
- return credentials
-
- @classmethod
- def check_credentials(cls, credentials, scoped=True):
- # tenant_name is optional if not scoped
- valid = 'username' in credentials and 'password' in credentials \
- and 'domain_name' in credentials
- if scoped:
- valid = valid and 'tenant_name' in credentials
- return valid
-
def _auth_client(self):
if self.client_type == 'tempest':
if self.interface == 'json':
@@ -325,14 +350,47 @@
def _auth_params(self):
if self.client_type == 'tempest':
return dict(
- user=self.credentials['username'],
- password=self.credentials['password'],
- tenant=self.credentials['tenant_name'],
- domain=self.credentials['domain_name'],
+ user=self.credentials.username,
+ password=self.credentials.password,
+ tenant=self.credentials.tenant_name,
+ domain=self.credentials.user_domain_name,
auth_data=True)
else:
raise NotImplementedError
+ def _fill_credentials(self, auth_data_body):
+ # project or domain, depending on the scope
+ project = auth_data_body.get('project', None)
+ domain = auth_data_body.get('domain', None)
+ # user is always there
+ user = auth_data_body['user']
+ # Set project fields
+ if project is not None:
+ if self.credentials.project_name is None:
+ self.credentials.project_name = project['name']
+ if self.credentials.project_id is None:
+ self.credentials.project_id = project['id']
+ if self.credentials.project_domain_id is None:
+ self.credentials.project_domain_id = project['domain']['id']
+ if self.credentials.project_domain_name is None:
+ self.credentials.project_domain_name = \
+ project['domain']['name']
+ # Set domain fields
+ if domain is not None:
+ if self.credentials.domain_id is None:
+ self.credentials.domain_id = domain['id']
+ if self.credentials.domain_name is None:
+ self.credentials.domain_name = domain['name']
+ # Set user fields
+ if self.credentials.username is None:
+ self.credentials.username = user['name']
+ if self.credentials.user_id is None:
+ self.credentials.user_id = user['id']
+ if self.credentials.user_domain_id is None:
+ self.credentials.user_domain_id = user['domain']['id']
+ if self.credentials.user_domain_name is None:
+ self.credentials.user_domain_name = user['domain']['name']
+
def base_url(self, filters, auth_data=None):
"""
Filters can be:
@@ -400,7 +458,15 @@
datetime.datetime.utcnow()
-def get_credentials(credential_type=None, **kwargs):
+def get_default_credentials(credential_type, fill_in=True):
+ """
+ Returns configured credentials of the specified type
+ based on the configured auth_version
+ """
+ return get_credentials(fill_in=fill_in, credential_type=credential_type)
+
+
+def get_credentials(credential_type=None, fill_in=True, **kwargs):
"""
Builds a credentials object based on the configured auth_version
@@ -420,12 +486,20 @@
"""
if CONF.identity.auth_version == 'v2':
credential_class = KeystoneV2Credentials
+ auth_provider_class = KeystoneV2AuthProvider
+ elif CONF.identity.auth_version == 'v3':
+ credential_class = KeystoneV3Credentials
+ auth_provider_class = KeystoneV3AuthProvider
else:
raise exceptions.InvalidConfiguration('Unsupported auth version')
if credential_type is not None:
creds = credential_class.get_default(credential_type)
else:
creds = credential_class(**kwargs)
+ # Fill in the credentials fields that were not specified
+ if fill_in:
+ auth_provider = auth_provider_class(creds)
+ creds = auth_provider.fill_credentials()
return creds
@@ -454,6 +528,7 @@
Additional attributes can still be set afterwards if tests need
to do so.
"""
+ self._initial = kwargs
self._apply_credentials(kwargs)
def _apply_credentials(self, attr):
@@ -514,6 +589,14 @@
def is_valid(self):
raise NotImplementedError
+ def reset(self):
+ # First delete all known attributes
+ for key in self.ATTRIBUTES:
+ if getattr(self, key) is not None:
+ delattr(self, key)
+ # Then re-apply initial setup
+ self._apply_credentials(self._initial)
+
class KeystoneV2Credentials(Credentials):
@@ -531,7 +614,7 @@
params[attr] = getattr(_section, attr)
else:
params[attr] = getattr(_section, prefix + "_" + attr)
- return KeystoneV2Credentials(**params)
+ return cls(**params)
def is_valid(self):
"""
@@ -539,3 +622,82 @@
Tenant is optional.
"""
return None not in (self.username, self.password)
+
+
+class KeystoneV3Credentials(KeystoneV2Credentials):
+ """
+ Credentials suitable for the Keystone Identity V3 API
+ """
+
+ CONF_ATTRIBUTES = ['domain_name', 'password', 'tenant_name', 'username']
+ ATTRIBUTES = ['project_domain_id', 'project_domain_name', 'project_id',
+ 'project_name', 'tenant_id', 'tenant_name', 'user_domain_id',
+ 'user_domain_name', 'user_id']
+ ATTRIBUTES.extend(CONF_ATTRIBUTES)
+
+ def __init__(self, **kwargs):
+ """
+ If domain is not specified, load the one configured for the
+ identity manager.
+ """
+ domain_fields = set(x for x in self.ATTRIBUTES if 'domain' in x)
+ if not domain_fields.intersection(kwargs.keys()):
+ kwargs['user_domain_name'] = CONF.identity.admin_domain_name
+ super(KeystoneV3Credentials, self).__init__(**kwargs)
+
+ def __setattr__(self, key, value):
+ parent = super(KeystoneV3Credentials, self)
+ # for tenant_* set both project and tenant
+ if key == 'tenant_id':
+ parent.__setattr__('project_id', value)
+ elif key == 'tenant_name':
+ parent.__setattr__('project_name', value)
+ # for project_* set both project and tenant
+ if key == 'project_id':
+ parent.__setattr__('tenant_id', value)
+ elif key == 'project_name':
+ parent.__setattr__('tenant_name', value)
+ # for *_domain_* set both user and project if not set yet
+ if key == 'user_domain_id':
+ if self.project_domain_id is None:
+ parent.__setattr__('project_domain_id', value)
+ if key == 'project_domain_id':
+ if self.user_domain_id is None:
+ parent.__setattr__('user_domain_id', value)
+ if key == 'user_domain_name':
+ if self.project_domain_name is None:
+ parent.__setattr__('project_domain_name', value)
+ if key == 'project_domain_name':
+ if self.user_domain_name is None:
+ parent.__setattr__('user_domain_name', value)
+ # support domain_name coming from config
+ if key == 'domain_name':
+ parent.__setattr__('user_domain_name', value)
+ parent.__setattr__('project_domain_name', value)
+ # finally trigger default behaviour for all attributes
+ parent.__setattr__(key, value)
+
+ def is_valid(self):
+ """
+ Valid combinations of v3 credentials (excluding token, scope)
+ - User id, password (optional domain)
+ - User name, password and its domain id/name
+ For the scope, valid combinations are:
+ - None
+ - Project id (optional domain)
+ - Project name and its domain id/name
+ """
+ valid_user_domain = any(
+ [self.user_domain_id is not None,
+ self.user_domain_name is not None])
+ valid_project_domain = any(
+ [self.project_domain_id is not None,
+ self.project_domain_name is not None])
+ valid_user = any(
+ [self.user_id is not None,
+ self.username is not None and valid_user_domain])
+ valid_project = any(
+ [self.project_name is None and self.project_id is None,
+ self.project_id is not None,
+ self.project_name is not None and valid_project_domain])
+ return all([self.password is not None, valid_user, valid_project])
diff --git a/tempest/common/isolated_creds.py b/tempest/common/isolated_creds.py
index c54a8e8..9a50c0b 100644
--- a/tempest/common/isolated_creds.py
+++ b/tempest/common/isolated_creds.py
@@ -17,6 +17,7 @@
import keystoneclient.v2_0.client as keystoneclient
import neutronclient.v2_0.client as neutronclient
+from tempest import auth
from tempest import clients
from tempest.common.utils import data_utils
from tempest import config
@@ -33,6 +34,7 @@
password='pass', network_resources=None):
self.network_resources = network_resources
self.isolated_creds = {}
+ self.isolated_creds_old_style = {}
self.isolated_net_resources = {}
self.ports = []
self.name = name
@@ -185,22 +187,19 @@
self._assign_user_role(tenant['id'], user['id'], role['id'])
else:
self._assign_user_role(tenant.id, user.id, role.id)
- return user, tenant
+ return self._get_credentials(user, tenant), user, tenant
- def _get_cred_names(self, user, tenant):
+ def _get_credentials(self, user, tenant):
if self.tempest_client:
- username = user.get('name')
- tenant_name = tenant.get('name')
+ user_get = user.get
+ tenant_get = tenant.get
else:
- username = user.name
- tenant_name = tenant.name
- return username, tenant_name
-
- def _get_tenant_id(self, tenant):
- if self.tempest_client:
- return tenant.get('id')
- else:
- return tenant.id
+ user_get = user.__dict__.get
+ tenant_get = tenant.__dict__.get
+ return auth.get_credentials(
+ username=user_get('name'), user_id=user_get('id'),
+ tenant_name=tenant_get('name'), tenant_id=tenant_get('id'),
+ password=self.password)
def _create_network_resources(self, tenant_id):
network = None
@@ -315,22 +314,28 @@
self.network_admin_client.add_interface_router(router_id, body)
def get_primary_tenant(self):
- return self.isolated_creds.get('primary')[1]
+ # Deprecated. Maintained until all tests are ported
+ return self.isolated_creds_old_style.get('primary')[1]
def get_primary_user(self):
- return self.isolated_creds.get('primary')[0]
+ # Deprecated. Maintained until all tests are ported
+ return self.isolated_creds_old_style.get('primary')[0]
def get_alt_tenant(self):
- return self.isolated_creds.get('alt')[1]
+ # Deprecated. Maintained until all tests are ported
+ return self.isolated_creds_old_style.get('alt')[1]
def get_alt_user(self):
- return self.isolated_creds.get('alt')[0]
+ # Deprecated. Maintained until all tests are ported
+ return self.isolated_creds_old_style.get('alt')[0]
def get_admin_tenant(self):
- return self.isolated_creds.get('admin')[1]
+ # Deprecated. Maintained until all tests are ported
+ return self.isolated_creds_old_style.get('admin')[1]
def get_admin_user(self):
- return self.isolated_creds.get('admin')[0]
+ # Deprecated. Maintained until all tests are ported
+ return self.isolated_creds_old_style.get('admin')[0]
def get_primary_network(self):
return self.isolated_net_resources.get('primary')[0]
@@ -359,62 +364,38 @@
def get_alt_router(self):
return self.isolated_net_resources.get('alt')[2]
- def get_primary_creds(self):
- if self.isolated_creds.get('primary'):
- user, tenant = self.isolated_creds['primary']
- username, tenant_name = self._get_cred_names(user, tenant)
+ def get_credentials(self, credential_type, old_style):
+ if self.isolated_creds.get(credential_type):
+ credentials = self.isolated_creds[credential_type]
else:
- user, tenant = self._create_creds()
- username, tenant_name = self._get_cred_names(user, tenant)
- self.isolated_creds['primary'] = (user, tenant)
- LOG.info("Acquired isolated creds:\n user: %s, tenant: %s"
- % (username, tenant_name))
+ is_admin = (credential_type == 'admin')
+ credentials, user, tenant = self._create_creds(admin=is_admin)
+ self.isolated_creds[credential_type] = credentials
+ # Maintained until tests are ported
+ self.isolated_creds_old_style[credential_type] = (user, tenant)
+ LOG.info("Acquired isolated creds:\n credentials: %s"
+ % credentials)
if CONF.service_available.neutron:
network, subnet, router = self._create_network_resources(
- self._get_tenant_id(tenant))
- self.isolated_net_resources['primary'] = (
+ credentials.tenant_id)
+ self.isolated_net_resources[credential_type] = (
network, subnet, router,)
LOG.info("Created isolated network resources for : \n"
- + " user: %s, tenant: %s" % (username, tenant_name))
- return username, tenant_name, self.password
+ + " credentials: %s" % credentials)
+ if old_style:
+ return (credentials.username, credentials.tenant_name,
+ credentials.password)
+ else:
+ return credentials
- def get_admin_creds(self):
- if self.isolated_creds.get('admin'):
- user, tenant = self.isolated_creds['admin']
- username, tenant_name = self._get_cred_names(user, tenant)
- else:
- user, tenant = self._create_creds(admin=True)
- username, tenant_name = self._get_cred_names(user, tenant)
- self.isolated_creds['admin'] = (user, tenant)
- LOG.info("Acquired admin isolated creds:\n user: %s, tenant: %s"
- % (username, tenant_name))
- if CONF.service_available.neutron:
- network, subnet, router = self._create_network_resources(
- self._get_tenant_id(tenant))
- self.isolated_net_resources['admin'] = (
- network, subnet, router,)
- LOG.info("Created isolated network resources for : \n"
- + " user: %s, tenant: %s" % (username, tenant_name))
- return username, tenant_name, self.password
+ def get_primary_creds(self, old_style=True):
+ return self.get_credentials('primary', old_style)
- def get_alt_creds(self):
- if self.isolated_creds.get('alt'):
- user, tenant = self.isolated_creds['alt']
- username, tenant_name = self._get_cred_names(user, tenant)
- else:
- user, tenant = self._create_creds()
- username, tenant_name = self._get_cred_names(user, tenant)
- self.isolated_creds['alt'] = (user, tenant)
- LOG.info("Acquired alt isolated creds:\n user: %s, tenant: %s"
- % (username, tenant_name))
- if CONF.service_available.neutron:
- network, subnet, router = self._create_network_resources(
- self._get_tenant_id(tenant))
- self.isolated_net_resources['alt'] = (
- network, subnet, router,)
- LOG.info("Created isolated network resources for : \n"
- + " user: %s, tenant: %s" % (username, tenant_name))
- return username, tenant_name, self.password
+ def get_admin_creds(self, old_style=True):
+ return self.get_credentials('admin', old_style)
+
+ def get_alt_creds(self, old_style=True):
+ return self.get_credentials('alt', old_style)
def _clear_isolated_router(self, router_id, router_name):
net_client = self.network_admin_client
@@ -505,29 +486,16 @@
if not self.isolated_creds:
return
self._clear_isolated_net_resources()
- for cred in self.isolated_creds:
- user, tenant = self.isolated_creds.get(cred)
+ for creds in self.isolated_creds.itervalues():
try:
- if self.tempest_client:
- self._delete_user(user['id'])
- else:
- self._delete_user(user.id)
+ self._delete_user(creds.user_id)
except exceptions.NotFound:
- if self.tempest_client:
- name = user['name']
- else:
- name = user.name
- LOG.warn("user with name: %s not found for delete" % name)
+ LOG.warn("user with name: %s not found for delete" %
+ creds.username)
pass
try:
- if self.tempest_client:
- self._delete_tenant(tenant['id'])
- else:
- self._delete_tenant(tenant.id)
+ self._delete_tenant(creds.tenant_id)
except exceptions.NotFound:
- if self.tempest_client:
- name = tenant['name']
- else:
- name = tenant.name
- LOG.warn("tenant with name: %s not found for delete" % name)
+ LOG.warn("tenant with name: %s not found for delete" %
+ creds.tenant_name)
pass
diff --git a/tempest/config.py b/tempest/config.py
index 7084768..634634d 100644
--- a/tempest/config.py
+++ b/tempest/config.py
@@ -72,6 +72,10 @@
default=None,
help="API key to use when authenticating.",
secret=True),
+ cfg.StrOpt('domain_name',
+ default=None,
+ help="Domain name for authentication (Keystone V3)."
+ "The same domain applies to user and project"),
cfg.StrOpt('alt_username',
default=None,
help="Username of alternate user to use for Nova API "
@@ -84,6 +88,10 @@
default=None,
help="API key to use when authenticating as alternate user.",
secret=True),
+ cfg.StrOpt('alt_domain_name',
+ default=None,
+ help="Alternate domain name for authentication (Keystone V3)."
+ "The same domain applies to user and project"),
cfg.StrOpt('admin_username',
default=None,
help="Administrative Username to use for "
@@ -96,6 +104,10 @@
default=None,
help="API key to use when authenticating as admin.",
secret=True),
+ cfg.StrOpt('admin_domain_name',
+ default=None,
+ help="Admin domain name for authentication (Keystone V3)."
+ "The same domain applies to user and project"),
]
identity_feature_group = cfg.OptGroup(name='identity-feature-enabled',
@@ -302,6 +314,10 @@
default=None,
help="API key to use when authenticating as admin.",
secret=True),
+ cfg.StrOpt('domain_name',
+ default=None,
+ help="Domain name for authentication as admin (Keystone V3)."
+ "The same domain applies to user and project"),
]
image_group = cfg.OptGroup(name='image',
@@ -996,6 +1012,13 @@
self.compute_admin.username = self.identity.admin_username
self.compute_admin.password = self.identity.admin_password
self.compute_admin.tenant_name = self.identity.admin_tenant_name
+ cfg.CONF.set_default('domain_name', self.identity.admin_domain_name,
+ group='identity')
+ cfg.CONF.set_default('alt_domain_name',
+ self.identity.admin_domain_name,
+ group='identity')
+ cfg.CONF.set_default('domain_name', self.identity.admin_domain_name,
+ group='compute-admin')
def __init__(self, parse_conf=True):
"""Initialize a configuration from a conf directory and conf file."""
diff --git a/tempest/tests/fake_auth_provider.py b/tempest/tests/fake_auth_provider.py
index ddffb4a..44c331e 100644
--- a/tempest/tests/fake_auth_provider.py
+++ b/tempest/tests/fake_auth_provider.py
@@ -16,6 +16,10 @@
from tempest.tests import fake_credentials
+def get_default_credentials(credential_type, fill_in=True):
+ return fake_credentials.FakeCredentials()
+
+
def get_credentials(credential_type=None, fill_in=True, **kwargs):
return fake_credentials.FakeCredentials()
diff --git a/tempest/tests/fake_credentials.py b/tempest/tests/fake_credentials.py
index a372973..48f67d2 100644
--- a/tempest/tests/fake_credentials.py
+++ b/tempest/tests/fake_credentials.py
@@ -31,3 +31,32 @@
tenant_name='fake_tenant_name'
)
super(FakeKeystoneV2Credentials, self).__init__(**creds)
+
+
+class FakeKeystoneV3Credentials(auth.KeystoneV3Credentials):
+ """
+ Fake credentials suitable for the Keystone Identity V3 API
+ """
+
+ def __init__(self):
+ creds = dict(
+ username='fake_username',
+ password='fake_password',
+ user_domain_name='fake_domain_name',
+ project_name='fake_tenant_name'
+ )
+ super(FakeKeystoneV3Credentials, self).__init__(**creds)
+
+
+class FakeKeystoneV3DomainCredentials(auth.KeystoneV3Credentials):
+ """
+ Fake credentials suitable for the Keystone Identity V3 API, with no scope
+ """
+
+ def __init__(self):
+ creds = dict(
+ username='fake_username',
+ password='fake_password',
+ user_domain_name='fake_domain_name'
+ )
+ super(FakeKeystoneV3DomainCredentials, self).__init__(**creds)
diff --git a/tempest/tests/fake_identity.py b/tempest/tests/fake_identity.py
index 058c9c2..1900fc9 100644
--- a/tempest/tests/fake_identity.py
+++ b/tempest/tests/fake_identity.py
@@ -113,7 +113,7 @@
"expires_at": "2020-01-01T00:00:10.000123Z",
"project": {
"domain": {
- "id": "fake_id",
+ "id": "fake_domain_id",
"name": "fake"
},
"id": "project_id",
@@ -121,7 +121,7 @@
},
"user": {
"domain": {
- "id": "domain_id",
+ "id": "fake_domain_id",
"name": "domain_name"
},
"id": "fake_user_id",
diff --git a/tempest/tests/test_auth.py b/tempest/tests/test_auth.py
index 7b5b4d6..03333be 100644
--- a/tempest/tests/test_auth.py
+++ b/tempest/tests/test_auth.py
@@ -47,6 +47,8 @@
self.stubs.Set(http.ClosingHttp, 'request', self.fake_http.request)
self.stubs.Set(auth, 'get_credentials',
fake_auth_provider.get_credentials)
+ self.stubs.Set(auth, 'get_default_credentials',
+ fake_auth_provider.get_default_credentials)
self.auth_provider = self._auth(self.credentials)
@@ -107,6 +109,10 @@
self.assertIsNone(self.auth_provider.alt_part)
self.assertIsNone(self.auth_provider.alt_auth_data)
+ def test_fill_credentials(self):
+ self.assertRaises(NotImplementedError,
+ self.auth_provider.fill_credentials)
+
class TestKeystoneV2AuthProvider(BaseAuthTestsSetUp):
_endpoints = fake_identity.IDENTITY_V2_RESPONSE['access']['serviceCatalog']
@@ -131,6 +137,13 @@
def _get_token_from_fake_identity(self):
return fake_identity.TOKEN
+ def _get_from_fake_identity(self, attr):
+ access = fake_identity.IDENTITY_V2_RESPONSE['access']
+ if attr == 'user_id':
+ return access['user']['id']
+ elif attr == 'tenant_id':
+ return access['token']['tenant']['id']
+
def _test_request_helper(self, filters, expected):
url, headers, body = self.auth_provider.auth_request('GET',
self.target_url,
@@ -218,6 +231,13 @@
del cred[attr]
self.assertFalse(self.auth_provider.check_credentials(cred))
+ def test_fill_credentials(self):
+ self.auth_provider.fill_credentials()
+ creds = self.auth_provider.credentials
+ for attr in ['user_id', 'tenant_id']:
+ self.assertEqual(self._get_from_fake_identity(attr),
+ getattr(creds, attr))
+
def _test_base_url_helper(self, expected_url, filters,
auth_data=None):
@@ -318,12 +338,7 @@
class TestKeystoneV3AuthProvider(TestKeystoneV2AuthProvider):
_endpoints = fake_identity.IDENTITY_V3_RESPONSE['token']['catalog']
_auth_provider_class = auth.KeystoneV3AuthProvider
- credentials = {
- 'username': 'fake_user',
- 'password': 'fake_pwd',
- 'tenant_name': 'fake_tenant',
- 'domain_name': 'fake_domain_name',
- }
+ credentials = fake_credentials.FakeKeystoneV3Credentials()
def setUp(self):
super(TestKeystoneV3AuthProvider, self).setUp()
@@ -343,10 +358,44 @@
access['expires_at'] = date_as_string
return token, access
- def test_check_credentials_missing_tenant_name(self):
- cred = copy.copy(self.credentials)
- del cred['domain_name']
- self.assertFalse(self.auth_provider.check_credentials(cred))
+ def _get_from_fake_identity(self, attr):
+ token = fake_identity.IDENTITY_V3_RESPONSE['token']
+ if attr == 'user_id':
+ return token['user']['id']
+ elif attr == 'project_id':
+ return token['project']['id']
+ elif attr == 'user_domain_id':
+ return token['user']['domain']['id']
+ elif attr == 'project_domain_id':
+ return token['project']['domain']['id']
+
+ def test_check_credentials_missing_attribute(self):
+ # reset credentials to fresh ones
+ self.credentials.reset()
+ for attr in ['username', 'password', 'user_domain_name',
+ 'project_domain_name']:
+ cred = copy.copy(self.credentials)
+ del cred[attr]
+ self.assertFalse(self.auth_provider.check_credentials(cred),
+ "Credentials should be invalid without %s" % attr)
+
+ def test_check_domain_credentials_missing_attribute(self):
+ # reset credentials to fresh ones
+ self.credentials.reset()
+ domain_creds = fake_credentials.FakeKeystoneV3DomainCredentials()
+ for attr in ['username', 'password', 'user_domain_name']:
+ cred = copy.copy(domain_creds)
+ del cred[attr]
+ self.assertFalse(self.auth_provider.check_credentials(cred),
+ "Credentials should be invalid without %s" % attr)
+
+ def test_fill_credentials(self):
+ self.auth_provider.fill_credentials()
+ creds = self.auth_provider.credentials
+ for attr in ['user_id', 'project_id', 'user_domain_id',
+ 'project_domain_id']:
+ self.assertEqual(self._get_from_fake_identity(attr),
+ getattr(creds, attr))
# Overwrites v2 test
def test_base_url_to_get_admin_endpoint(self):
diff --git a/tempest/tests/test_credentials.py b/tempest/tests/test_credentials.py
index 86600fa..9da5f92 100644
--- a/tempest/tests/test_credentials.py
+++ b/tempest/tests/test_credentials.py
@@ -13,6 +13,10 @@
# License for the specific language governing permissions and limitations
# under the License.
+import copy
+
+from oslo.config import cfg
+
from tempest import auth
from tempest.common import http
from tempest.common import tempest_fixtures as fixtures
@@ -40,6 +44,10 @@
self.useFixture(fake_config.ConfigFixture())
self.stubs.Set(config, 'TempestConfigPrivate', fake_config.FakePrivate)
+ def test_create(self):
+ creds = self._get_credentials()
+ self.assertEqual(self.attributes, creds._initial)
+
def test_create_invalid_attr(self):
self.assertRaises(exceptions.InvalidCredentials,
self._get_credentials,
@@ -77,19 +85,44 @@
self.stubs.Set(http.ClosingHttp, 'request', self.identity_response)
self.stubs.Set(config, 'TempestConfigPrivate', fake_config.FakePrivate)
- def _verify_credentials(self, credentials_class, creds_dict):
- creds = auth.get_credentials(**creds_dict)
- # Check the right version of credentials has been returned
- self.assertIsInstance(creds, credentials_class)
- # Check the id attributes are filled in
- attributes = [x for x in creds.ATTRIBUTES if (
- '_id' in x and x != 'domain_id')]
- for attr in attributes:
- self.assertIsNone(getattr(creds, attr))
+ def _verify_credentials(self, credentials_class, filled=True,
+ creds_dict=None):
+
+ def _check(credentials):
+ # Check the right version of credentials has been returned
+ self.assertIsInstance(credentials, credentials_class)
+ # Check the id attributes are filled in
+ attributes = [x for x in credentials.ATTRIBUTES if (
+ '_id' in x and x != 'domain_id')]
+ for attr in attributes:
+ if filled:
+ self.assertIsNotNone(getattr(credentials, attr))
+ else:
+ self.assertIsNone(getattr(credentials, attr))
+
+ if creds_dict is None:
+ for ctype in auth.Credentials.TYPES:
+ creds = auth.get_default_credentials(credential_type=ctype,
+ fill_in=filled)
+ _check(creds)
+ else:
+ creds = auth.get_credentials(fill_in=filled, **creds_dict)
+ _check(creds)
+
+ def test_get_default_credentials(self):
+ self.useFixture(fixtures.LockFixture('auth_version'))
+ self._verify_credentials(credentials_class=self.credentials_class)
def test_get_credentials(self):
self.useFixture(fixtures.LockFixture('auth_version'))
- self._verify_credentials(self.credentials_class, self.attributes)
+ self._verify_credentials(credentials_class=self.credentials_class,
+ creds_dict=self.attributes)
+
+ def test_get_credentials_not_filled(self):
+ self.useFixture(fixtures.LockFixture('auth_version'))
+ self._verify_credentials(credentials_class=self.credentials_class,
+ filled=False,
+ creds_dict=self.attributes)
def test_is_valid(self):
creds = self._get_credentials()
@@ -110,3 +143,87 @@
# Default configuration values related to credentials
# are defined as fake_* in fake_config.py
self.assertEqual(getattr(creds, attr), 'fake_' + attr)
+
+ def test_reset_all_attributes(self):
+ creds = self._get_credentials()
+ initial_creds = copy.deepcopy(creds)
+ set_attr = creds.__dict__.keys()
+ missing_attr = set(creds.ATTRIBUTES).difference(set_attr)
+ # Set all unset attributes, then reset
+ for attr in missing_attr:
+ setattr(creds, attr, 'fake' + attr)
+ creds.reset()
+ # Check reset credentials are same as initial ones
+ self.assertEqual(creds, initial_creds)
+
+ def test_reset_single_attribute(self):
+ creds = self._get_credentials()
+ initial_creds = copy.deepcopy(creds)
+ set_attr = creds.__dict__.keys()
+ missing_attr = set(creds.ATTRIBUTES).difference(set_attr)
+ # Set one unset attributes, then reset
+ for attr in missing_attr:
+ setattr(creds, attr, 'fake' + attr)
+ creds.reset()
+ # Check reset credentials are same as initial ones
+ self.assertEqual(creds, initial_creds)
+
+
+class KeystoneV3CredentialsTests(KeystoneV2CredentialsTests):
+ attributes = {
+ 'username': 'fake_username',
+ 'password': 'fake_password',
+ 'project_name': 'fake_project_name',
+ 'user_domain_name': 'fake_domain_name'
+ }
+
+ credentials_class = auth.KeystoneV3Credentials
+ identity_response = fake_identity._fake_v3_response
+
+ def setUp(self):
+ super(KeystoneV3CredentialsTests, self).setUp()
+ # Additional config items reset by cfg fixture after each test
+ cfg.CONF.set_default('auth_version', 'v3', group='identity')
+ # Identity group items
+ for prefix in ['', 'alt_', 'admin_']:
+ cfg.CONF.set_default(prefix + 'domain_name', 'fake_domain_name',
+ group='identity')
+ # Compute Admin group items
+ cfg.CONF.set_default('domain_name', 'fake_domain_name',
+ group='compute-admin')
+
+ def test_default(self):
+ self.useFixture(fixtures.LockFixture('auth_version'))
+ for ctype in self.credentials_class.TYPES:
+ creds = self.credentials_class.get_default(credentials_type=ctype)
+ for attr in self.attributes.keys():
+ if attr == 'project_name':
+ config_value = 'fake_tenant_name'
+ elif attr == 'user_domain_name':
+ config_value = 'fake_domain_name'
+ else:
+ config_value = 'fake_' + attr
+ self.assertEqual(getattr(creds, attr), config_value)
+
+ def test_synced_attributes(self):
+ attributes = self.attributes
+ # Create V3 credentials with tenant instead of project, and user_domain
+ for attr in ['project_id', 'user_domain_id']:
+ attributes[attr] = 'fake_' + attr
+ creds = self._get_credentials(attributes)
+ self.assertEqual(creds.project_name, creds.tenant_name)
+ self.assertEqual(creds.project_id, creds.tenant_id)
+ self.assertEqual(creds.user_domain_name, creds.project_domain_name)
+ self.assertEqual(creds.user_domain_id, creds.project_domain_id)
+ # Replace user_domain with project_domain
+ del attributes['user_domain_name']
+ del attributes['user_domain_id']
+ del attributes['project_name']
+ del attributes['project_id']
+ for attr in ['project_domain_name', 'project_domain_id',
+ 'tenant_name', 'tenant_id']:
+ attributes[attr] = 'fake_' + attr
+ self.assertEqual(creds.tenant_name, creds.project_name)
+ self.assertEqual(creds.tenant_id, creds.project_id)
+ self.assertEqual(creds.project_domain_name, creds.user_domain_name)
+ self.assertEqual(creds.project_domain_id, creds.user_domain_id)
diff --git a/tempest/tests/test_tenant_isolation.py b/tempest/tests/test_tenant_isolation.py
index ae2e57d..084b6d2 100644
--- a/tempest/tests/test_tenant_isolation.py
+++ b/tempest/tests/test_tenant_isolation.py
@@ -17,6 +17,7 @@
import neutronclient.v2_0.client as neutronclient
from oslo.config import cfg
+from tempest.common import http
from tempest.common import isolated_creds
from tempest import config
from tempest import exceptions
@@ -27,6 +28,8 @@
from tempest.services.network.xml import network_client as xml_network_client
from tempest.tests import base
from tempest.tests import fake_config
+from tempest.tests import fake_http
+from tempest.tests import fake_identity
class TestTenantIsolation(base.TestCase):
@@ -35,6 +38,9 @@
super(TestTenantIsolation, self).setUp()
self.useFixture(fake_config.ConfigFixture())
self.stubs.Set(config, 'TempestConfigPrivate', fake_config.FakePrivate)
+ self.fake_http = fake_http.fake_httplib2(return_type=200)
+ self.stubs.Set(http.ClosingHttp, 'request',
+ fake_identity._fake_v2_response)
def test_tempest_client(self):
iso_creds = isolated_creds.IsolatedCreds('test class')
@@ -108,9 +114,9 @@
password='fake_password')
self._mock_tenant_create('1234', 'fake_prim_tenant')
self._mock_user_create('1234', 'fake_prim_user')
- username, tenant_name, password = iso_creds.get_primary_creds()
- self.assertEqual(username, 'fake_prim_user')
- self.assertEqual(tenant_name, 'fake_prim_tenant')
+ primary_creds = iso_creds.get_primary_creds(old_style=False)
+ self.assertEqual(primary_creds.username, 'fake_prim_user')
+ self.assertEqual(primary_creds.tenant_name, 'fake_prim_tenant')
# Verify helper methods
tenant = iso_creds.get_primary_tenant()
user = iso_creds.get_primary_user()
@@ -136,10 +142,10 @@
self.addCleanup(user_mock.stop)
with patch.object(json_iden_client.IdentityClientJSON,
'assign_user_role') as user_mock:
- username, tenant_name, password = iso_creds.get_admin_creds()
+ admin_creds = iso_creds.get_admin_creds(old_style=False)
user_mock.assert_called_once_with('1234', '1234', '1234')
- self.assertEqual(username, 'fake_admin_user')
- self.assertEqual(tenant_name, 'fake_admin_tenant')
+ self.assertEqual(admin_creds.username, 'fake_admin_user')
+ self.assertEqual(admin_creds.tenant_name, 'fake_admin_tenant')
# Verify helper methods
tenant = iso_creds.get_admin_tenant()
user = iso_creds.get_admin_user()
@@ -153,12 +159,12 @@
password='fake_password')
tenant_fix = self._mock_tenant_create('1234', 'fake_prim_tenant')
user_fix = self._mock_user_create('1234', 'fake_prim_user')
- username, tenant_name, password = iso_creds.get_primary_creds()
+ iso_creds.get_primary_creds(old_style=False)
tenant_fix.cleanUp()
user_fix.cleanUp()
tenant_fix = self._mock_tenant_create('12345', 'fake_alt_tenant')
user_fix = self._mock_user_create('12345', 'fake_alt_user')
- alt_username, alt_tenant, alt_password = iso_creds.get_alt_creds()
+ iso_creds.get_alt_creds(old_style=False)
tenant_fix.cleanUp()
user_fix.cleanUp()
tenant_fix = self._mock_tenant_create('123456', 'fake_admin_tenant')
@@ -170,8 +176,7 @@
[{'id': '123456', 'name': 'admin'}])))
with patch.object(json_iden_client.IdentityClientJSON,
'assign_user_role'):
- admin_username, admin_tenant, admin_pass = \
- iso_creds.get_admin_creds()
+ iso_creds.get_admin_creds(old_style=False)
user_mock = self.patch(
'tempest.services.identity.json.identity_client.'
'IdentityClientJSON.delete_user')
@@ -201,9 +206,9 @@
password='fake_password')
self._mock_user_create('1234', 'fake_alt_user')
self._mock_tenant_create('1234', 'fake_alt_tenant')
- username, tenant_name, password = iso_creds.get_alt_creds()
- self.assertEqual(username, 'fake_alt_user')
- self.assertEqual(tenant_name, 'fake_alt_tenant')
+ alt_creds = iso_creds.get_alt_creds(old_style=False)
+ self.assertEqual(alt_creds.username, 'fake_alt_user')
+ self.assertEqual(alt_creds.tenant_name, 'fake_alt_tenant')
# Verify helper methods
tenant = iso_creds.get_alt_tenant()
user = iso_creds.get_alt_user()
@@ -222,7 +227,7 @@
router_interface_mock = self.patch(
'tempest.services.network.json.network_client.NetworkClientJSON.'
'add_router_interface_with_subnet_id')
- username, tenant_name, password = iso_creds.get_primary_creds()
+ iso_creds.get_primary_creds(old_style=False)
router_interface_mock.called_once_with('1234', '1234')
network = iso_creds.get_primary_network()
subnet = iso_creds.get_primary_subnet()
@@ -247,7 +252,7 @@
router_interface_mock = self.patch(
'tempest.services.network.json.network_client.NetworkClientJSON.'
'add_router_interface_with_subnet_id')
- username, tenant_name, password = iso_creds.get_primary_creds()
+ iso_creds.get_primary_creds(old_style=False)
router_interface_mock.called_once_with('1234', '1234')
router_interface_mock.reset_mock()
tenant_fix.cleanUp()
@@ -262,7 +267,7 @@
subnet_fix = self._mock_subnet_create(iso_creds, '12345',
'fake_alt_subnet')
router_fix = self._mock_router_create('12345', 'fake_alt_router')
- alt_username, alt_tenant_name, password = iso_creds.get_alt_creds()
+ iso_creds.get_alt_creds(old_style=False)
router_interface_mock.called_once_with('12345', '12345')
router_interface_mock.reset_mock()
tenant_fix.cleanUp()
@@ -285,7 +290,7 @@
[{'id': '123456', 'name': 'admin'}])))
with patch.object(json_iden_client.IdentityClientJSON,
'assign_user_role'):
- admin_user, admin_tenant, password = iso_creds.get_admin_creds()
+ iso_creds.get_admin_creds(old_style=False)
self.patch('tempest.services.identity.json.identity_client.'
'IdentityClientJSON.delete_user')
self.patch('tempest.services.identity.json.identity_client.'