Use auth data to fill credentials
Provide the ability to automatically fill in credentials
details e.g. IDs if names where provided.
Verified via unit test.
Partially implements: bp multi-keystone-api-version-tests
Change-Id: I505e5024754fe1b912104ce4d5d3206f4cedd6d8
diff --git a/tempest/auth.py b/tempest/auth.py
index c95f7c7..9c51edb 100644
--- a/tempest/auth.py
+++ b/tempest/auth.py
@@ -80,6 +80,17 @@
def _get_auth(self):
raise NotImplementedError
+ def _fill_credentials(self, auth_data_body):
+ raise NotImplementedError
+
+ def fill_credentials(self):
+ """
+ Fill credentials object with data from auth
+ """
+ auth_data = self.get_auth()
+ self._fill_credentials(auth_data[1])
+ return self.credentials
+
@classmethod
def check_credentials(cls, credentials):
"""
@@ -89,20 +100,35 @@
@property
def auth_data(self):
- if self.cache is None or self.is_expired(self.cache):
- self.cache = self._get_auth()
- return self.cache
+ return self.get_auth()
@auth_data.deleter
def auth_data(self):
self.clear_auth()
+ def get_auth(self):
+ """
+ Returns auth from cache if available, else auth first
+ """
+ if self.cache is None or self.is_expired(self.cache):
+ self.set_auth()
+ return self.cache
+
+ def set_auth(self):
+ """
+ Forces setting auth, ignores cache if it exists.
+ Refills credentials
+ """
+ self.cache = self._get_auth()
+ self._fill_credentials(self.cache[1])
+
def clear_auth(self):
"""
Can be called to clear the access cache so that next request
will fetch a new token and base_url.
"""
self.cache = None
+ self.credentials.reset()
def is_expired(self, auth_data):
raise NotImplementedError
@@ -244,6 +270,18 @@
else:
raise NotImplementedError
+ def _fill_credentials(self, auth_data_body):
+ tenant = auth_data_body['token']['tenant']
+ user = auth_data_body['user']
+ if self.credentials.tenant_name is None:
+ self.credentials.tenant_name = tenant['name']
+ if self.credentials.tenant_id is None:
+ self.credentials.tenant_id = tenant['id']
+ if self.credentials.username is None:
+ self.credentials.username = user['name']
+ if self.credentials.user_id is None:
+ self.credentials.user_id = user['id']
+
def base_url(self, filters, auth_data=None):
"""
Filters can be:
@@ -320,6 +358,39 @@
else:
raise NotImplementedError
+ def _fill_credentials(self, auth_data_body):
+ # project or domain, depending on the scope
+ project = auth_data_body.get('project', None)
+ domain = auth_data_body.get('domain', None)
+ # user is always there
+ user = auth_data_body['user']
+ # Set project fields
+ if project is not None:
+ if self.credentials.project_name is None:
+ self.credentials.project_name = project['name']
+ if self.credentials.project_id is None:
+ self.credentials.project_id = project['id']
+ if self.credentials.project_domain_id is None:
+ self.credentials.project_domain_id = project['domain']['id']
+ if self.credentials.project_domain_name is None:
+ self.credentials.project_domain_name = \
+ project['domain']['name']
+ # Set domain fields
+ if domain is not None:
+ if self.credentials.domain_id is None:
+ self.credentials.domain_id = domain['id']
+ if self.credentials.domain_name is None:
+ self.credentials.domain_name = domain['name']
+ # Set user fields
+ if self.credentials.username is None:
+ self.credentials.username = user['name']
+ if self.credentials.user_id is None:
+ self.credentials.user_id = user['id']
+ if self.credentials.user_domain_id is None:
+ self.credentials.user_domain_id = user['domain']['id']
+ if self.credentials.user_domain_name is None:
+ self.credentials.user_domain_name = user['domain']['name']
+
def base_url(self, filters, auth_data=None):
"""
Filters can be:
@@ -387,15 +458,15 @@
datetime.datetime.utcnow()
-def get_default_credentials(credential_type):
+def get_default_credentials(credential_type, fill_in=True):
"""
Returns configured credentials of the specified type
based on the configured auth_version
"""
- return get_credentials(credential_type=credential_type)
+ return get_credentials(fill_in=fill_in, credential_type=credential_type)
-def get_credentials(credential_type=None, **kwargs):
+def get_credentials(credential_type=None, fill_in=True, **kwargs):
"""
Builds a credentials object based on the configured auth_version
@@ -415,14 +486,20 @@
"""
if CONF.identity.auth_version == 'v2':
credential_class = KeystoneV2Credentials
+ auth_provider_class = KeystoneV2AuthProvider
elif CONF.identity.auth_version == 'v3':
credential_class = KeystoneV3Credentials
+ auth_provider_class = KeystoneV3AuthProvider
else:
raise exceptions.InvalidConfiguration('Unsupported auth version')
if credential_type is not None:
creds = credential_class.get_default(credential_type)
else:
creds = credential_class(**kwargs)
+ # Fill in the credentials fields that were not specified
+ if fill_in:
+ auth_provider = auth_provider_class(creds)
+ creds = auth_provider.fill_credentials()
return creds
@@ -451,6 +528,7 @@
Additional attributes can still be set afterwards if tests need
to do so.
"""
+ self._initial = kwargs
self._apply_credentials(kwargs)
def _apply_credentials(self, attr):
@@ -511,6 +589,14 @@
def is_valid(self):
raise NotImplementedError
+ def reset(self):
+ # First delete all known attributes
+ for key in self.ATTRIBUTES:
+ if getattr(self, key) is not None:
+ delattr(self, key)
+ # Then re-apply initial setup
+ self._apply_credentials(self._initial)
+
class KeystoneV2Credentials(Credentials):
diff --git a/tempest/tests/test_auth.py b/tempest/tests/test_auth.py
index fb27e02..03333be 100644
--- a/tempest/tests/test_auth.py
+++ b/tempest/tests/test_auth.py
@@ -109,6 +109,10 @@
self.assertIsNone(self.auth_provider.alt_part)
self.assertIsNone(self.auth_provider.alt_auth_data)
+ def test_fill_credentials(self):
+ self.assertRaises(NotImplementedError,
+ self.auth_provider.fill_credentials)
+
class TestKeystoneV2AuthProvider(BaseAuthTestsSetUp):
_endpoints = fake_identity.IDENTITY_V2_RESPONSE['access']['serviceCatalog']
@@ -133,6 +137,13 @@
def _get_token_from_fake_identity(self):
return fake_identity.TOKEN
+ def _get_from_fake_identity(self, attr):
+ access = fake_identity.IDENTITY_V2_RESPONSE['access']
+ if attr == 'user_id':
+ return access['user']['id']
+ elif attr == 'tenant_id':
+ return access['token']['tenant']['id']
+
def _test_request_helper(self, filters, expected):
url, headers, body = self.auth_provider.auth_request('GET',
self.target_url,
@@ -220,6 +231,13 @@
del cred[attr]
self.assertFalse(self.auth_provider.check_credentials(cred))
+ def test_fill_credentials(self):
+ self.auth_provider.fill_credentials()
+ creds = self.auth_provider.credentials
+ for attr in ['user_id', 'tenant_id']:
+ self.assertEqual(self._get_from_fake_identity(attr),
+ getattr(creds, attr))
+
def _test_base_url_helper(self, expected_url, filters,
auth_data=None):
@@ -340,9 +358,20 @@
access['expires_at'] = date_as_string
return token, access
+ def _get_from_fake_identity(self, attr):
+ token = fake_identity.IDENTITY_V3_RESPONSE['token']
+ if attr == 'user_id':
+ return token['user']['id']
+ elif attr == 'project_id':
+ return token['project']['id']
+ elif attr == 'user_domain_id':
+ return token['user']['domain']['id']
+ elif attr == 'project_domain_id':
+ return token['project']['domain']['id']
+
def test_check_credentials_missing_attribute(self):
# reset credentials to fresh ones
- self.credentials = fake_credentials.FakeKeystoneV3Credentials()
+ self.credentials.reset()
for attr in ['username', 'password', 'user_domain_name',
'project_domain_name']:
cred = copy.copy(self.credentials)
@@ -352,7 +381,7 @@
def test_check_domain_credentials_missing_attribute(self):
# reset credentials to fresh ones
- self.credentials = fake_credentials.FakeKeystoneV3Credentials()
+ self.credentials.reset()
domain_creds = fake_credentials.FakeKeystoneV3DomainCredentials()
for attr in ['username', 'password', 'user_domain_name']:
cred = copy.copy(domain_creds)
@@ -360,6 +389,14 @@
self.assertFalse(self.auth_provider.check_credentials(cred),
"Credentials should be invalid without %s" % attr)
+ def test_fill_credentials(self):
+ self.auth_provider.fill_credentials()
+ creds = self.auth_provider.credentials
+ for attr in ['user_id', 'project_id', 'user_domain_id',
+ 'project_domain_id']:
+ self.assertEqual(self._get_from_fake_identity(attr),
+ getattr(creds, attr))
+
# Overwrites v2 test
def test_base_url_to_get_admin_endpoint(self):
self.filters = {
diff --git a/tempest/tests/test_credentials.py b/tempest/tests/test_credentials.py
index 40aa218..9da5f92 100644
--- a/tempest/tests/test_credentials.py
+++ b/tempest/tests/test_credentials.py
@@ -13,6 +13,8 @@
# License for the specific language governing permissions and limitations
# under the License.
+import copy
+
from oslo.config import cfg
from tempest import auth
@@ -42,6 +44,10 @@
self.useFixture(fake_config.ConfigFixture())
self.stubs.Set(config, 'TempestConfigPrivate', fake_config.FakePrivate)
+ def test_create(self):
+ creds = self._get_credentials()
+ self.assertEqual(self.attributes, creds._initial)
+
def test_create_invalid_attr(self):
self.assertRaises(exceptions.InvalidCredentials,
self._get_credentials,
@@ -79,19 +85,44 @@
self.stubs.Set(http.ClosingHttp, 'request', self.identity_response)
self.stubs.Set(config, 'TempestConfigPrivate', fake_config.FakePrivate)
- def _verify_credentials(self, credentials_class, creds_dict):
- creds = auth.get_credentials(**creds_dict)
- # Check the right version of credentials has been returned
- self.assertIsInstance(creds, credentials_class)
- # Check the id attributes are filled in
- attributes = [x for x in creds.ATTRIBUTES if (
- '_id' in x and x != 'domain_id')]
- for attr in attributes:
- self.assertIsNone(getattr(creds, attr))
+ def _verify_credentials(self, credentials_class, filled=True,
+ creds_dict=None):
+
+ def _check(credentials):
+ # Check the right version of credentials has been returned
+ self.assertIsInstance(credentials, credentials_class)
+ # Check the id attributes are filled in
+ attributes = [x for x in credentials.ATTRIBUTES if (
+ '_id' in x and x != 'domain_id')]
+ for attr in attributes:
+ if filled:
+ self.assertIsNotNone(getattr(credentials, attr))
+ else:
+ self.assertIsNone(getattr(credentials, attr))
+
+ if creds_dict is None:
+ for ctype in auth.Credentials.TYPES:
+ creds = auth.get_default_credentials(credential_type=ctype,
+ fill_in=filled)
+ _check(creds)
+ else:
+ creds = auth.get_credentials(fill_in=filled, **creds_dict)
+ _check(creds)
+
+ def test_get_default_credentials(self):
+ self.useFixture(fixtures.LockFixture('auth_version'))
+ self._verify_credentials(credentials_class=self.credentials_class)
def test_get_credentials(self):
self.useFixture(fixtures.LockFixture('auth_version'))
- self._verify_credentials(self.credentials_class, self.attributes)
+ self._verify_credentials(credentials_class=self.credentials_class,
+ creds_dict=self.attributes)
+
+ def test_get_credentials_not_filled(self):
+ self.useFixture(fixtures.LockFixture('auth_version'))
+ self._verify_credentials(credentials_class=self.credentials_class,
+ filled=False,
+ creds_dict=self.attributes)
def test_is_valid(self):
creds = self._get_credentials()
@@ -113,6 +144,30 @@
# are defined as fake_* in fake_config.py
self.assertEqual(getattr(creds, attr), 'fake_' + attr)
+ def test_reset_all_attributes(self):
+ creds = self._get_credentials()
+ initial_creds = copy.deepcopy(creds)
+ set_attr = creds.__dict__.keys()
+ missing_attr = set(creds.ATTRIBUTES).difference(set_attr)
+ # Set all unset attributes, then reset
+ for attr in missing_attr:
+ setattr(creds, attr, 'fake' + attr)
+ creds.reset()
+ # Check reset credentials are same as initial ones
+ self.assertEqual(creds, initial_creds)
+
+ def test_reset_single_attribute(self):
+ creds = self._get_credentials()
+ initial_creds = copy.deepcopy(creds)
+ set_attr = creds.__dict__.keys()
+ missing_attr = set(creds.ATTRIBUTES).difference(set_attr)
+ # Set one unset attributes, then reset
+ for attr in missing_attr:
+ setattr(creds, attr, 'fake' + attr)
+ creds.reset()
+ # Check reset credentials are same as initial ones
+ self.assertEqual(creds, initial_creds)
+
class KeystoneV3CredentialsTests(KeystoneV2CredentialsTests):
attributes = {
diff --git a/tempest/tests/test_tenant_isolation.py b/tempest/tests/test_tenant_isolation.py
index ae2e57d..28adc45 100644
--- a/tempest/tests/test_tenant_isolation.py
+++ b/tempest/tests/test_tenant_isolation.py
@@ -17,6 +17,7 @@
import neutronclient.v2_0.client as neutronclient
from oslo.config import cfg
+from tempest.common import http
from tempest.common import isolated_creds
from tempest import config
from tempest import exceptions
@@ -27,6 +28,8 @@
from tempest.services.network.xml import network_client as xml_network_client
from tempest.tests import base
from tempest.tests import fake_config
+from tempest.tests import fake_http
+from tempest.tests import fake_identity
class TestTenantIsolation(base.TestCase):
@@ -35,6 +38,9 @@
super(TestTenantIsolation, self).setUp()
self.useFixture(fake_config.ConfigFixture())
self.stubs.Set(config, 'TempestConfigPrivate', fake_config.FakePrivate)
+ self.fake_http = fake_http.fake_httplib2(return_type=200)
+ self.stubs.Set(http.ClosingHttp, 'request',
+ fake_identity._fake_v2_response)
def test_tempest_client(self):
iso_creds = isolated_creds.IsolatedCreds('test class')