Enforces the use of Credentials (part1)
Multiversion auth part5
Refactor tenant isolation to use Credentials instead of username,
password and tenant_name, but keeps the same interface for now,
so test don't have to be changed.
Tenant isolation is still using v2 identity clients to generate
credentials, even when auth_version is set to v3.
This changes prepares the migration to v3.
Partially implements: bp multi-keystone-api-version-tests
Change-Id: If2da9d869af417b6cd8f6a4c48d710e817c76402
diff --git a/tempest/common/isolated_creds.py b/tempest/common/isolated_creds.py
index c54a8e8..9a50c0b 100644
--- a/tempest/common/isolated_creds.py
+++ b/tempest/common/isolated_creds.py
@@ -17,6 +17,7 @@
import keystoneclient.v2_0.client as keystoneclient
import neutronclient.v2_0.client as neutronclient
+from tempest import auth
from tempest import clients
from tempest.common.utils import data_utils
from tempest import config
@@ -33,6 +34,7 @@
password='pass', network_resources=None):
self.network_resources = network_resources
self.isolated_creds = {}
+ self.isolated_creds_old_style = {}
self.isolated_net_resources = {}
self.ports = []
self.name = name
@@ -185,22 +187,19 @@
self._assign_user_role(tenant['id'], user['id'], role['id'])
else:
self._assign_user_role(tenant.id, user.id, role.id)
- return user, tenant
+ return self._get_credentials(user, tenant), user, tenant
- def _get_cred_names(self, user, tenant):
+ def _get_credentials(self, user, tenant):
if self.tempest_client:
- username = user.get('name')
- tenant_name = tenant.get('name')
+ user_get = user.get
+ tenant_get = tenant.get
else:
- username = user.name
- tenant_name = tenant.name
- return username, tenant_name
-
- def _get_tenant_id(self, tenant):
- if self.tempest_client:
- return tenant.get('id')
- else:
- return tenant.id
+ user_get = user.__dict__.get
+ tenant_get = tenant.__dict__.get
+ return auth.get_credentials(
+ username=user_get('name'), user_id=user_get('id'),
+ tenant_name=tenant_get('name'), tenant_id=tenant_get('id'),
+ password=self.password)
def _create_network_resources(self, tenant_id):
network = None
@@ -315,22 +314,28 @@
self.network_admin_client.add_interface_router(router_id, body)
def get_primary_tenant(self):
- return self.isolated_creds.get('primary')[1]
+ # Deprecated. Maintained until all tests are ported
+ return self.isolated_creds_old_style.get('primary')[1]
def get_primary_user(self):
- return self.isolated_creds.get('primary')[0]
+ # Deprecated. Maintained until all tests are ported
+ return self.isolated_creds_old_style.get('primary')[0]
def get_alt_tenant(self):
- return self.isolated_creds.get('alt')[1]
+ # Deprecated. Maintained until all tests are ported
+ return self.isolated_creds_old_style.get('alt')[1]
def get_alt_user(self):
- return self.isolated_creds.get('alt')[0]
+ # Deprecated. Maintained until all tests are ported
+ return self.isolated_creds_old_style.get('alt')[0]
def get_admin_tenant(self):
- return self.isolated_creds.get('admin')[1]
+ # Deprecated. Maintained until all tests are ported
+ return self.isolated_creds_old_style.get('admin')[1]
def get_admin_user(self):
- return self.isolated_creds.get('admin')[0]
+ # Deprecated. Maintained until all tests are ported
+ return self.isolated_creds_old_style.get('admin')[0]
def get_primary_network(self):
return self.isolated_net_resources.get('primary')[0]
@@ -359,62 +364,38 @@
def get_alt_router(self):
return self.isolated_net_resources.get('alt')[2]
- def get_primary_creds(self):
- if self.isolated_creds.get('primary'):
- user, tenant = self.isolated_creds['primary']
- username, tenant_name = self._get_cred_names(user, tenant)
+ def get_credentials(self, credential_type, old_style):
+ if self.isolated_creds.get(credential_type):
+ credentials = self.isolated_creds[credential_type]
else:
- user, tenant = self._create_creds()
- username, tenant_name = self._get_cred_names(user, tenant)
- self.isolated_creds['primary'] = (user, tenant)
- LOG.info("Acquired isolated creds:\n user: %s, tenant: %s"
- % (username, tenant_name))
+ is_admin = (credential_type == 'admin')
+ credentials, user, tenant = self._create_creds(admin=is_admin)
+ self.isolated_creds[credential_type] = credentials
+ # Maintained until tests are ported
+ self.isolated_creds_old_style[credential_type] = (user, tenant)
+ LOG.info("Acquired isolated creds:\n credentials: %s"
+ % credentials)
if CONF.service_available.neutron:
network, subnet, router = self._create_network_resources(
- self._get_tenant_id(tenant))
- self.isolated_net_resources['primary'] = (
+ credentials.tenant_id)
+ self.isolated_net_resources[credential_type] = (
network, subnet, router,)
LOG.info("Created isolated network resources for : \n"
- + " user: %s, tenant: %s" % (username, tenant_name))
- return username, tenant_name, self.password
+ + " credentials: %s" % credentials)
+ if old_style:
+ return (credentials.username, credentials.tenant_name,
+ credentials.password)
+ else:
+ return credentials
- def get_admin_creds(self):
- if self.isolated_creds.get('admin'):
- user, tenant = self.isolated_creds['admin']
- username, tenant_name = self._get_cred_names(user, tenant)
- else:
- user, tenant = self._create_creds(admin=True)
- username, tenant_name = self._get_cred_names(user, tenant)
- self.isolated_creds['admin'] = (user, tenant)
- LOG.info("Acquired admin isolated creds:\n user: %s, tenant: %s"
- % (username, tenant_name))
- if CONF.service_available.neutron:
- network, subnet, router = self._create_network_resources(
- self._get_tenant_id(tenant))
- self.isolated_net_resources['admin'] = (
- network, subnet, router,)
- LOG.info("Created isolated network resources for : \n"
- + " user: %s, tenant: %s" % (username, tenant_name))
- return username, tenant_name, self.password
+ def get_primary_creds(self, old_style=True):
+ return self.get_credentials('primary', old_style)
- def get_alt_creds(self):
- if self.isolated_creds.get('alt'):
- user, tenant = self.isolated_creds['alt']
- username, tenant_name = self._get_cred_names(user, tenant)
- else:
- user, tenant = self._create_creds()
- username, tenant_name = self._get_cred_names(user, tenant)
- self.isolated_creds['alt'] = (user, tenant)
- LOG.info("Acquired alt isolated creds:\n user: %s, tenant: %s"
- % (username, tenant_name))
- if CONF.service_available.neutron:
- network, subnet, router = self._create_network_resources(
- self._get_tenant_id(tenant))
- self.isolated_net_resources['alt'] = (
- network, subnet, router,)
- LOG.info("Created isolated network resources for : \n"
- + " user: %s, tenant: %s" % (username, tenant_name))
- return username, tenant_name, self.password
+ def get_admin_creds(self, old_style=True):
+ return self.get_credentials('admin', old_style)
+
+ def get_alt_creds(self, old_style=True):
+ return self.get_credentials('alt', old_style)
def _clear_isolated_router(self, router_id, router_name):
net_client = self.network_admin_client
@@ -505,29 +486,16 @@
if not self.isolated_creds:
return
self._clear_isolated_net_resources()
- for cred in self.isolated_creds:
- user, tenant = self.isolated_creds.get(cred)
+ for creds in self.isolated_creds.itervalues():
try:
- if self.tempest_client:
- self._delete_user(user['id'])
- else:
- self._delete_user(user.id)
+ self._delete_user(creds.user_id)
except exceptions.NotFound:
- if self.tempest_client:
- name = user['name']
- else:
- name = user.name
- LOG.warn("user with name: %s not found for delete" % name)
+ LOG.warn("user with name: %s not found for delete" %
+ creds.username)
pass
try:
- if self.tempest_client:
- self._delete_tenant(tenant['id'])
- else:
- self._delete_tenant(tenant.id)
+ self._delete_tenant(creds.tenant_id)
except exceptions.NotFound:
- if self.tempest_client:
- name = tenant['name']
- else:
- name = tenant.name
- LOG.warn("tenant with name: %s not found for delete" % name)
+ LOG.warn("tenant with name: %s not found for delete" %
+ creds.tenant_name)
pass
diff --git a/tempest/tests/test_tenant_isolation.py b/tempest/tests/test_tenant_isolation.py
index 28adc45..084b6d2 100644
--- a/tempest/tests/test_tenant_isolation.py
+++ b/tempest/tests/test_tenant_isolation.py
@@ -114,9 +114,9 @@
password='fake_password')
self._mock_tenant_create('1234', 'fake_prim_tenant')
self._mock_user_create('1234', 'fake_prim_user')
- username, tenant_name, password = iso_creds.get_primary_creds()
- self.assertEqual(username, 'fake_prim_user')
- self.assertEqual(tenant_name, 'fake_prim_tenant')
+ primary_creds = iso_creds.get_primary_creds(old_style=False)
+ self.assertEqual(primary_creds.username, 'fake_prim_user')
+ self.assertEqual(primary_creds.tenant_name, 'fake_prim_tenant')
# Verify helper methods
tenant = iso_creds.get_primary_tenant()
user = iso_creds.get_primary_user()
@@ -142,10 +142,10 @@
self.addCleanup(user_mock.stop)
with patch.object(json_iden_client.IdentityClientJSON,
'assign_user_role') as user_mock:
- username, tenant_name, password = iso_creds.get_admin_creds()
+ admin_creds = iso_creds.get_admin_creds(old_style=False)
user_mock.assert_called_once_with('1234', '1234', '1234')
- self.assertEqual(username, 'fake_admin_user')
- self.assertEqual(tenant_name, 'fake_admin_tenant')
+ self.assertEqual(admin_creds.username, 'fake_admin_user')
+ self.assertEqual(admin_creds.tenant_name, 'fake_admin_tenant')
# Verify helper methods
tenant = iso_creds.get_admin_tenant()
user = iso_creds.get_admin_user()
@@ -159,12 +159,12 @@
password='fake_password')
tenant_fix = self._mock_tenant_create('1234', 'fake_prim_tenant')
user_fix = self._mock_user_create('1234', 'fake_prim_user')
- username, tenant_name, password = iso_creds.get_primary_creds()
+ iso_creds.get_primary_creds(old_style=False)
tenant_fix.cleanUp()
user_fix.cleanUp()
tenant_fix = self._mock_tenant_create('12345', 'fake_alt_tenant')
user_fix = self._mock_user_create('12345', 'fake_alt_user')
- alt_username, alt_tenant, alt_password = iso_creds.get_alt_creds()
+ iso_creds.get_alt_creds(old_style=False)
tenant_fix.cleanUp()
user_fix.cleanUp()
tenant_fix = self._mock_tenant_create('123456', 'fake_admin_tenant')
@@ -176,8 +176,7 @@
[{'id': '123456', 'name': 'admin'}])))
with patch.object(json_iden_client.IdentityClientJSON,
'assign_user_role'):
- admin_username, admin_tenant, admin_pass = \
- iso_creds.get_admin_creds()
+ iso_creds.get_admin_creds(old_style=False)
user_mock = self.patch(
'tempest.services.identity.json.identity_client.'
'IdentityClientJSON.delete_user')
@@ -207,9 +206,9 @@
password='fake_password')
self._mock_user_create('1234', 'fake_alt_user')
self._mock_tenant_create('1234', 'fake_alt_tenant')
- username, tenant_name, password = iso_creds.get_alt_creds()
- self.assertEqual(username, 'fake_alt_user')
- self.assertEqual(tenant_name, 'fake_alt_tenant')
+ alt_creds = iso_creds.get_alt_creds(old_style=False)
+ self.assertEqual(alt_creds.username, 'fake_alt_user')
+ self.assertEqual(alt_creds.tenant_name, 'fake_alt_tenant')
# Verify helper methods
tenant = iso_creds.get_alt_tenant()
user = iso_creds.get_alt_user()
@@ -228,7 +227,7 @@
router_interface_mock = self.patch(
'tempest.services.network.json.network_client.NetworkClientJSON.'
'add_router_interface_with_subnet_id')
- username, tenant_name, password = iso_creds.get_primary_creds()
+ iso_creds.get_primary_creds(old_style=False)
router_interface_mock.called_once_with('1234', '1234')
network = iso_creds.get_primary_network()
subnet = iso_creds.get_primary_subnet()
@@ -253,7 +252,7 @@
router_interface_mock = self.patch(
'tempest.services.network.json.network_client.NetworkClientJSON.'
'add_router_interface_with_subnet_id')
- username, tenant_name, password = iso_creds.get_primary_creds()
+ iso_creds.get_primary_creds(old_style=False)
router_interface_mock.called_once_with('1234', '1234')
router_interface_mock.reset_mock()
tenant_fix.cleanUp()
@@ -268,7 +267,7 @@
subnet_fix = self._mock_subnet_create(iso_creds, '12345',
'fake_alt_subnet')
router_fix = self._mock_router_create('12345', 'fake_alt_router')
- alt_username, alt_tenant_name, password = iso_creds.get_alt_creds()
+ iso_creds.get_alt_creds(old_style=False)
router_interface_mock.called_once_with('12345', '12345')
router_interface_mock.reset_mock()
tenant_fix.cleanUp()
@@ -291,7 +290,7 @@
[{'id': '123456', 'name': 'admin'}])))
with patch.object(json_iden_client.IdentityClientJSON,
'assign_user_role'):
- admin_user, admin_tenant, password = iso_creds.get_admin_creds()
+ iso_creds.get_admin_creds(old_style=False)
self.patch('tempest.services.identity.json.identity_client.'
'IdentityClientJSON.delete_user')
self.patch('tempest.services.identity.json.identity_client.'