Merge "Drop OfficialClientManager and references to it"
diff --git a/tempest/clients.py b/tempest/clients.py
index 89cffba..1dda66a 100644
--- a/tempest/clients.py
+++ b/tempest/clients.py
@@ -13,9 +13,6 @@
# License for the specific language governing permissions and limitations
# under the License.
-import keystoneclient.exceptions
-import keystoneclient.v2_0.client
-
from tempest import auth
from tempest.common import rest_client
from tempest import config
@@ -488,290 +485,3 @@
credentials=auth.get_default_credentials('compute_admin'),
interface=interface,
service=service)
-
-
-class OfficialClientManager(manager.Manager):
- """
- Manager that provides access to the official python clients for
- calling various OpenStack APIs.
- """
-
- NOVACLIENT_VERSION = '2'
- CINDERCLIENT_VERSION = '1'
- HEATCLIENT_VERSION = '1'
- IRONICCLIENT_VERSION = '1'
- SAHARACLIENT_VERSION = '1.1'
- CEILOMETERCLIENT_VERSION = '2'
-
- def __init__(self, credentials):
- # FIXME(andreaf) Auth provider for client_type 'official' is
- # not implemented yet, setting to 'tempest' for now.
- self.client_type = 'tempest'
- self.interface = None
- # super cares for credentials validation
- super(OfficialClientManager, self).__init__(credentials=credentials)
- self.baremetal_client = self._get_baremetal_client()
- self.compute_client = self._get_compute_client(credentials)
- self.identity_client = self._get_identity_client(credentials)
- self.image_client = self._get_image_client()
- self.network_client = self._get_network_client()
- self.volume_client = self._get_volume_client(credentials)
- self.object_storage_client = self._get_object_storage_client(
- credentials)
- self.orchestration_client = self._get_orchestration_client(
- credentials)
- self.data_processing_client = self._get_data_processing_client(
- credentials)
- self.ceilometer_client = self._get_ceilometer_client(
- credentials)
-
- def _get_roles(self):
- admin_credentials = auth.get_default_credentials('identity_admin')
- keystone_admin = self._get_identity_client(admin_credentials)
-
- username = self.credentials.username
- tenant_name = self.credentials.tenant_name
- user_id = keystone_admin.users.find(name=username).id
- tenant_id = keystone_admin.tenants.find(name=tenant_name).id
-
- roles = keystone_admin.roles.roles_for_user(
- user=user_id, tenant=tenant_id)
-
- return [r.name for r in roles]
-
- def _get_compute_client(self, credentials):
- # Novaclient will not execute operations for anyone but the
- # identified user, so a new client needs to be created for
- # each user that operations need to be performed for.
- if not CONF.service_available.nova:
- return None
- import novaclient.client
-
- auth_url = CONF.identity.uri
- dscv = CONF.identity.disable_ssl_certificate_validation
- region = CONF.identity.region
-
- client_args = (credentials.username, credentials.password,
- credentials.tenant_name, auth_url)
-
- # Create our default Nova client to use in testing
- service_type = CONF.compute.catalog_type
- endpoint_type = CONF.compute.endpoint_type
- return novaclient.client.Client(self.NOVACLIENT_VERSION,
- *client_args,
- service_type=service_type,
- endpoint_type=endpoint_type,
- region_name=region,
- no_cache=True,
- insecure=dscv,
- http_log_debug=True)
-
- def _get_image_client(self):
- if not CONF.service_available.glance:
- return None
- import glanceclient
- token = self.identity_client.auth_token
- region = CONF.identity.region
- endpoint_type = CONF.image.endpoint_type
- endpoint = self.identity_client.service_catalog.url_for(
- attr='region', filter_value=region,
- service_type=CONF.image.catalog_type, endpoint_type=endpoint_type)
- dscv = CONF.identity.disable_ssl_certificate_validation
- return glanceclient.Client('1', endpoint=endpoint, token=token,
- insecure=dscv)
-
- def _get_volume_client(self, credentials):
- if not CONF.service_available.cinder:
- return None
- import cinderclient.client
- auth_url = CONF.identity.uri
- region = CONF.identity.region
- endpoint_type = CONF.volume.endpoint_type
- dscv = CONF.identity.disable_ssl_certificate_validation
- return cinderclient.client.Client(self.CINDERCLIENT_VERSION,
- credentials.username,
- credentials.password,
- credentials.tenant_name,
- auth_url,
- region_name=region,
- endpoint_type=endpoint_type,
- insecure=dscv,
- http_log_debug=True)
-
- def _get_object_storage_client(self, credentials):
- if not CONF.service_available.swift:
- return None
- import swiftclient
- auth_url = CONF.identity.uri
- # add current tenant to swift operator role group.
- admin_credentials = auth.get_default_credentials('identity_admin')
- keystone_admin = self._get_identity_client(admin_credentials)
-
- # enable test user to operate swift by adding operator role to him.
- roles = keystone_admin.roles.list()
- operator_role = CONF.object_storage.operator_role
- member_role = [role for role in roles if role.name == operator_role][0]
- # NOTE(maurosr): This is surrounded in the try-except block cause
- # neutron tests doesn't have tenant isolation.
- try:
- keystone_admin.roles.add_user_role(self.identity_client.user_id,
- member_role.id,
- self.identity_client.tenant_id)
- except keystoneclient.exceptions.Conflict:
- pass
-
- endpoint_type = CONF.object_storage.endpoint_type
- os_options = {'endpoint_type': endpoint_type}
- return swiftclient.Connection(auth_url, credentials.username,
- credentials.password,
- tenant_name=credentials.tenant_name,
- auth_version='2',
- os_options=os_options)
-
- def _get_orchestration_client(self, credentials):
- if not CONF.service_available.heat:
- return None
- import heatclient.client
-
- keystone = self._get_identity_client(credentials)
- region = CONF.identity.region
- endpoint_type = CONF.orchestration.endpoint_type
- token = keystone.auth_token
- service_type = CONF.orchestration.catalog_type
- try:
- endpoint = keystone.service_catalog.url_for(
- attr='region',
- filter_value=region,
- service_type=service_type,
- endpoint_type=endpoint_type)
- except keystoneclient.exceptions.EndpointNotFound:
- return None
- else:
- return heatclient.client.Client(self.HEATCLIENT_VERSION,
- endpoint,
- token=token,
- username=credentials.username,
- password=credentials.password)
-
- def _get_identity_client(self, credentials):
- # This identity client is not intended to check the security
- # of the identity service, so use admin credentials by default.
-
- auth_url = CONF.identity.uri
- dscv = CONF.identity.disable_ssl_certificate_validation
-
- return keystoneclient.v2_0.client.Client(
- username=credentials.username,
- password=credentials.password,
- tenant_name=credentials.tenant_name,
- auth_url=auth_url,
- insecure=dscv)
-
- def _get_baremetal_client(self):
- # ironic client is currently intended to by used by admin users
- if not CONF.service_available.ironic:
- return None
- import ironicclient.client
- roles = self._get_roles()
- if CONF.identity.admin_role not in roles:
- return None
-
- auth_url = CONF.identity.uri
- api_version = self.IRONICCLIENT_VERSION
- insecure = CONF.identity.disable_ssl_certificate_validation
- service_type = CONF.baremetal.catalog_type
- endpoint_type = CONF.baremetal.endpoint_type
- creds = {
- 'os_username': self.credentials.username,
- 'os_password': self.credentials.password,
- 'os_tenant_name': self.credentials.tenant_name
- }
-
- try:
- return ironicclient.client.get_client(
- api_version=api_version,
- os_auth_url=auth_url,
- insecure=insecure,
- os_service_type=service_type,
- os_endpoint_type=endpoint_type,
- **creds)
- except keystoneclient.exceptions.EndpointNotFound:
- return None
-
- def _get_network_client(self):
- # The intended configuration is for the network client to have
- # admin privileges and indicate for whom resources are being
- # created via a 'tenant_id' parameter. This will often be
- # preferable to authenticating as a specific user because
- # working with certain resources (public routers and networks)
- # often requires admin privileges anyway.
- if not CONF.service_available.neutron:
- return None
- import neutronclient.v2_0.client
-
- credentials = auth.get_default_credentials('identity_admin')
-
- auth_url = CONF.identity.uri
- dscv = CONF.identity.disable_ssl_certificate_validation
- endpoint_type = CONF.network.endpoint_type
-
- return neutronclient.v2_0.client.Client(
- username=credentials.username,
- password=credentials.password,
- tenant_name=credentials.tenant_name,
- endpoint_type=endpoint_type,
- auth_url=auth_url,
- insecure=dscv)
-
- def _get_data_processing_client(self, credentials):
- if not CONF.service_available.sahara:
- # Sahara isn't available
- return None
-
- import saharaclient.client
-
- endpoint_type = CONF.data_processing.endpoint_type
- catalog_type = CONF.data_processing.catalog_type
- auth_url = CONF.identity.uri
-
- client = saharaclient.client.Client(
- self.SAHARACLIENT_VERSION,
- credentials.username,
- credentials.password,
- project_name=credentials.tenant_name,
- endpoint_type=endpoint_type,
- service_type=catalog_type,
- auth_url=auth_url)
-
- return client
-
- def _get_ceilometer_client(self, credentials):
- if not CONF.service_available.ceilometer:
- return None
-
- import ceilometerclient.client
-
- keystone = self._get_identity_client(credentials)
- region = CONF.identity.region
-
- endpoint_type = CONF.telemetry.endpoint_type
- service_type = CONF.telemetry.catalog_type
- auth_url = CONF.identity.uri
-
- try:
- keystone.service_catalog.url_for(
- attr='region',
- filter_value=region,
- service_type=service_type,
- endpoint_type=endpoint_type)
- except keystoneclient.exceptions.EndpointNotFound:
- return None
- else:
- return ceilometerclient.client.get_client(
- self.CEILOMETERCLIENT_VERSION,
- os_username=credentials.username,
- os_password=credentials.password,
- os_tenant_name=credentials.tenant_name,
- os_auth_url=auth_url,
- os_service_type=service_type,
- os_endpoint_type=endpoint_type)
diff --git a/tempest/hacking/checks.py b/tempest/hacking/checks.py
index abc60cb..55cc89b 100644
--- a/tempest/hacking/checks.py
+++ b/tempest/hacking/checks.py
@@ -106,20 +106,6 @@
"T107: service tag should not be in path")
-def no_official_client_manager_in_api_tests(physical_line, filename):
- """Check that the OfficialClientManager isn't used in the api tests
-
- The api tests should not use the official clients.
-
- T108: Can not use OfficialClientManager in the API tests
- """
- if 'tempest/api' in filename:
- if 'OfficialClientManager' in physical_line:
- return (physical_line.find('OfficialClientManager'),
- 'T108: OfficialClientManager can not be used in the api '
- 'tests')
-
-
def no_mutable_default_args(logical_line):
"""Check that mutable object isn't used as default argument
@@ -136,5 +122,4 @@
register(no_setupclass_for_unit_tests)
register(no_vi_headers)
register(service_tags_not_in_module_path)
- register(no_official_client_manager_in_api_tests)
register(no_mutable_default_args)
diff --git a/tempest/scenario/manager.py b/tempest/scenario/manager.py
index 559f3d5..93712fc 100644
--- a/tempest/scenario/manager.py
+++ b/tempest/scenario/manager.py
@@ -47,15 +47,7 @@
class ScenarioTest(tempest.test.BaseTestCase):
- """Replaces the OfficialClientTest base class.
-
- Uses tempest own clients as opposed to OfficialClients.
-
- Common differences:
- - replace resource.attribute with resource['attribute']
- - replace resouce.delete with delete_callable(resource['id'])
- - replace local waiters with common / rest_client waiters
- """
+ """Base class for scenario tests. Uses tempest own clients. """
@classmethod
def setUpClass(cls):
diff --git a/tempest/tests/test_hacking.py b/tempest/tests/test_hacking.py
index 9c13013..37ad18e 100644
--- a/tempest/tests/test_hacking.py
+++ b/tempest/tests/test_hacking.py
@@ -100,14 +100,6 @@
self.assertFalse(checks.service_tags_not_in_module_path(
"@test.services('compute')", './tempest/api/image/fake_test.py'))
- def test_no_official_client_manager_in_api_tests(self):
- self.assertTrue(checks.no_official_client_manager_in_api_tests(
- "cls.official_client = clients.OfficialClientManager(credentials)",
- "tempest/api/compute/base.py"))
- self.assertFalse(checks.no_official_client_manager_in_api_tests(
- "cls.official_client = clients.OfficialClientManager(credentials)",
- "tempest/scenario/fake_test.py"))
-
def test_no_mutable_default_args(self):
self.assertEqual(1, len(list(checks.no_mutable_default_args(
" def function1(para={}):"))))