Delete Tempest fork, import from tempest and tempest_lib
A while ago we copied Tempest networking API tests in to the
Neutron repo, and along came thousands of lines of code of Tempest
testing infrastructure (neutron.tests.tempest). For a while we
periodically refreshed our fork via:
tools/copy_api_tests_from_tempest.sh
I think it's time we move away from that model by eliminating
the fork. We do this by deleting unused code and importing the
rest from tempest_lib. There's some Tempest code still not
moved from Tempest to tempest_lib in tempest.common. I think
it's preferable to import that code than to copy it, and Tempest
cores mostly agree. Manila and Ironic also do the same.
To be able to import from tempest I added it as a requirement:
Since Tempest is not on PyPi, I had to get it from git. Only the api
tests environment needs Tempest, so instead of adding it to
test-requirements, I added it specifically to the api and
api-constraints venvs.
neutron.tests.tempest.test and neutron.tests.tempest.common.*
still remain. These are tighly coupled with one another, and
sadly since Neutron forked Tempest code, Tempest has made significant
changes to those files that also require changes to the test files.
I aim to get rid of the Neutron fork of these files in a follow up
change.
Also fixed import grouping in test files so that it's std libs,
3rd party libs, and then Neutron code.
* Removed neutron.tests.tempest.config:
- We only added one option after the fork. I created a new group
called 'neutron_plugin_options' and moved the new option to that
group. This is in preperation for the Tempest plugin architecture,
where you're supposed to add new config options to a new group
and not to existing configuration groups. Note that this is
obviously a backward incompatible change, but it's to an option
added in the same cycle.
* Removed neutron.tests.tempest.test and neutron.tests.tempest.common.
- This introduced an API change to the way we access Keystone,
which required mechanical changes to a few tests (create_tenant
calls need a different client now).
- The way Tempest manages primary, admin and alternative tenant
credentials was changed after we forked, which required another
mechanical change to a few tests.
* Cut all of the Keystone clients we don't need. We only need
to create/delete tenants, the other clients were used in Tempest by
actual Keystone tests.
* Changed neutron.tests.api.base.BaseNetworkTest:
- Re-implemented get_client_manager so that it returns the Neutron
clients manager and not the one in the Tempest repo.
- Updated it from the Tempest repo so that it uses the new way
to manage credentials (Since it now uses the Tempest test base
class and not our out of date forked copy).
Change-Id: I4f9193dfe26f2d36985cb480a98709ec182a2f7b
diff --git a/neutron/tests/tempest/auth.py b/neutron/tests/tempest/auth.py
deleted file mode 100644
index a9fdb03..0000000
--- a/neutron/tests/tempest/auth.py
+++ /dev/null
@@ -1,655 +0,0 @@
-# Copyright 2014 Hewlett-Packard Development Company, L.P.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import abc
-import copy
-import datetime
-import exceptions
-import re
-import urlparse
-
-import six
-
-from tempest_lib.services.identity.v2 import token_client as json_v2id
-from tempest_lib.services.identity.v3 import token_client as json_v3id
-
-
-@six.add_metaclass(abc.ABCMeta)
-class AuthProvider(object):
- """
- Provide authentication
- """
-
- def __init__(self, credentials):
- """
- :param credentials: credentials for authentication
- """
- if self.check_credentials(credentials):
- self.credentials = credentials
- else:
- raise TypeError("Invalid credentials")
- self.cache = None
- self.alt_auth_data = None
- self.alt_part = None
-
- def __str__(self):
- return "Creds :{creds}, cached auth data: {cache}".format(
- creds=self.credentials, cache=self.cache)
-
- @abc.abstractmethod
- def _decorate_request(self, filters, method, url, headers=None, body=None,
- auth_data=None):
- """
- Decorate request with authentication data
- """
- return
-
- @abc.abstractmethod
- def _get_auth(self):
- return
-
- @abc.abstractmethod
- def _fill_credentials(self, auth_data_body):
- return
-
- def fill_credentials(self):
- """
- Fill credentials object with data from auth
- """
- auth_data = self.get_auth()
- self._fill_credentials(auth_data[1])
- return self.credentials
-
- @classmethod
- def check_credentials(cls, credentials):
- """
- Verify credentials are valid.
- """
- return isinstance(credentials, Credentials) and credentials.is_valid()
-
- @property
- def auth_data(self):
- return self.get_auth()
-
- @auth_data.deleter
- def auth_data(self):
- self.clear_auth()
-
- def get_auth(self):
- """
- Returns auth from cache if available, else auth first
- """
- if self.cache is None or self.is_expired(self.cache):
- self.set_auth()
- return self.cache
-
- def set_auth(self):
- """
- Forces setting auth, ignores cache if it exists.
- Refills credentials
- """
- self.cache = self._get_auth()
- self._fill_credentials(self.cache[1])
-
- def clear_auth(self):
- """
- Can be called to clear the access cache so that next request
- will fetch a new token and base_url.
- """
- self.cache = None
- self.credentials.reset()
-
- @abc.abstractmethod
- def is_expired(self, auth_data):
- return
-
- def auth_request(self, method, url, headers=None, body=None, filters=None):
- """
- Obtains auth data and decorates a request with that.
- :param method: HTTP method of the request
- :param url: relative URL of the request (path)
- :param headers: HTTP headers of the request
- :param body: HTTP body in case of POST / PUT
- :param filters: select a base URL out of the catalog
- :returns a Tuple (url, headers, body)
- """
- orig_req = dict(url=url, headers=headers, body=body)
-
- auth_url, auth_headers, auth_body = self._decorate_request(
- filters, method, url, headers, body)
- auth_req = dict(url=auth_url, headers=auth_headers, body=auth_body)
-
- # Overwrite part if the request if it has been requested
- if self.alt_part is not None:
- if self.alt_auth_data is not None:
- alt_url, alt_headers, alt_body = self._decorate_request(
- filters, method, url, headers, body,
- auth_data=self.alt_auth_data)
- alt_auth_req = dict(url=alt_url, headers=alt_headers,
- body=alt_body)
- auth_req[self.alt_part] = alt_auth_req[self.alt_part]
-
- else:
- # If alt auth data is None, skip auth in the requested part
- auth_req[self.alt_part] = orig_req[self.alt_part]
-
- # Next auth request will be normal, unless otherwise requested
- self.reset_alt_auth_data()
-
- return auth_req['url'], auth_req['headers'], auth_req['body']
-
- def reset_alt_auth_data(self):
- """
- Configure auth provider to provide valid authentication data
- """
- self.alt_part = None
- self.alt_auth_data = None
-
- def set_alt_auth_data(self, request_part, auth_data):
- """
- Configure auth provider to provide alt authentication data
- on a part of the *next* auth_request. If credentials are None,
- set invalid data.
- :param request_part: request part to contain invalid auth: url,
- headers, body
- :param auth_data: alternative auth_data from which to get the
- invalid data to be injected
- """
- self.alt_part = request_part
- self.alt_auth_data = auth_data
-
- @abc.abstractmethod
- def base_url(self, filters, auth_data=None):
- """
- Extracts the base_url based on provided filters
- """
- return
-
-
-class KeystoneAuthProvider(AuthProvider):
-
- token_expiry_threshold = datetime.timedelta(seconds=60)
-
- def __init__(self, credentials, auth_url,
- disable_ssl_certificate_validation=None,
- ca_certs=None, trace_requests=None):
- super(KeystoneAuthProvider, self).__init__(credentials)
- self.dsvm = disable_ssl_certificate_validation
- self.ca_certs = ca_certs
- self.trace_requests = trace_requests
- self.auth_client = self._auth_client(auth_url)
-
- def _decorate_request(self, filters, method, url, headers=None, body=None,
- auth_data=None):
- if auth_data is None:
- auth_data = self.auth_data
- token, _ = auth_data
- base_url = self.base_url(filters=filters, auth_data=auth_data)
- # build authenticated request
- # returns new request, it does not touch the original values
- _headers = copy.deepcopy(headers) if headers is not None else {}
- _headers['X-Auth-Token'] = str(token)
- if url is None or url == "":
- _url = base_url
- else:
- # Join base URL and url, and remove multiple contiguous slashes
- _url = "/".join([base_url, url])
- parts = [x for x in urlparse.urlparse(_url)]
- parts[2] = re.sub("/{2,}", "/", parts[2])
- _url = urlparse.urlunparse(parts)
- # no change to method or body
- return str(_url), _headers, body
-
- @abc.abstractmethod
- def _auth_client(self):
- return
-
- @abc.abstractmethod
- def _auth_params(self):
- return
-
- def _get_auth(self):
- # Bypasses the cache
- auth_func = getattr(self.auth_client, 'get_token')
- auth_params = self._auth_params()
-
- # returns token, auth_data
- token, auth_data = auth_func(**auth_params)
- return token, auth_data
-
- def get_token(self):
- return self.auth_data[0]
-
-
-class KeystoneV2AuthProvider(KeystoneAuthProvider):
-
- EXPIRY_DATE_FORMAT = '%Y-%m-%dT%H:%M:%SZ'
-
- def _auth_client(self, auth_url):
- return json_v2id.TokenClientJSON(
- auth_url, disable_ssl_certificate_validation=self.dsvm,
- ca_certs=self.ca_certs, trace_requests=self.trace_requests)
-
- def _auth_params(self):
- return dict(
- user=self.credentials.username,
- password=self.credentials.password,
- tenant=self.credentials.tenant_name,
- auth_data=True)
-
- def _fill_credentials(self, auth_data_body):
- tenant = auth_data_body['token']['tenant']
- user = auth_data_body['user']
- if self.credentials.tenant_name is None:
- self.credentials.tenant_name = tenant['name']
- if self.credentials.tenant_id is None:
- self.credentials.tenant_id = tenant['id']
- if self.credentials.username is None:
- self.credentials.username = user['name']
- if self.credentials.user_id is None:
- self.credentials.user_id = user['id']
-
- def base_url(self, filters, auth_data=None):
- """
- Filters can be:
- - service: compute, image, etc
- - region: the service region
- - endpoint_type: adminURL, publicURL, internalURL
- - api_version: replace catalog version with this
- - skip_path: take just the base URL
- """
- if auth_data is None:
- auth_data = self.auth_data
- token, _auth_data = auth_data
- service = filters.get('service')
- region = filters.get('region')
- endpoint_type = filters.get('endpoint_type', 'publicURL')
-
- if service is None:
- raise exceptions.EndpointNotFound("No service provided")
-
- _base_url = None
- for ep in _auth_data['serviceCatalog']:
- if ep["type"] == service:
- for _ep in ep['endpoints']:
- if region is not None and _ep['region'] == region:
- _base_url = _ep.get(endpoint_type)
- if not _base_url:
- # No region matching, use the first
- _base_url = ep['endpoints'][0].get(endpoint_type)
- break
- if _base_url is None:
- raise exceptions.EndpointNotFound(service)
-
- parts = urlparse.urlparse(_base_url)
- if filters.get('api_version', None) is not None:
- path = "/" + filters['api_version']
- noversion_path = "/".join(parts.path.split("/")[2:])
- if noversion_path != "":
- path += "/" + noversion_path
- _base_url = _base_url.replace(parts.path, path)
- if filters.get('skip_path', None) is not None and parts.path != '':
- _base_url = _base_url.replace(parts.path, "/")
-
- return _base_url
-
- def is_expired(self, auth_data):
- _, access = auth_data
- expiry = datetime.datetime.strptime(access['token']['expires'],
- self.EXPIRY_DATE_FORMAT)
- return expiry - self.token_expiry_threshold <= \
- datetime.datetime.utcnow()
-
-
-class KeystoneV3AuthProvider(KeystoneAuthProvider):
-
- EXPIRY_DATE_FORMAT = '%Y-%m-%dT%H:%M:%S.%fZ'
-
- def _auth_client(self, auth_url):
- return json_v3id.V3TokenClientJSON(
- auth_url, disable_ssl_certificate_validation=self.dsvm,
- ca_certs=self.ca_certs, trace_requests=self.trace_requests)
-
- def _auth_params(self):
- return dict(
- user_id=self.credentials.user_id,
- username=self.credentials.username,
- password=self.credentials.password,
- project_id=self.credentials.project_id,
- project_name=self.credentials.project_name,
- user_domain_id=self.credentials.user_domain_id,
- user_domain_name=self.credentials.user_domain_name,
- project_domain_id=self.credentials.project_domain_id,
- project_domain_name=self.credentials.project_domain_name,
- domain_id=self.credentials.domain_id,
- domain_name=self.credentials.domain_name,
- auth_data=True)
-
- def _fill_credentials(self, auth_data_body):
- # project or domain, depending on the scope
- project = auth_data_body.get('project', None)
- domain = auth_data_body.get('domain', None)
- # user is always there
- user = auth_data_body['user']
- # Set project fields
- if project is not None:
- if self.credentials.project_name is None:
- self.credentials.project_name = project['name']
- if self.credentials.project_id is None:
- self.credentials.project_id = project['id']
- if self.credentials.project_domain_id is None:
- self.credentials.project_domain_id = project['domain']['id']
- if self.credentials.project_domain_name is None:
- self.credentials.project_domain_name = \
- project['domain']['name']
- # Set domain fields
- if domain is not None:
- if self.credentials.domain_id is None:
- self.credentials.domain_id = domain['id']
- if self.credentials.domain_name is None:
- self.credentials.domain_name = domain['name']
- # Set user fields
- if self.credentials.username is None:
- self.credentials.username = user['name']
- if self.credentials.user_id is None:
- self.credentials.user_id = user['id']
- if self.credentials.user_domain_id is None:
- self.credentials.user_domain_id = user['domain']['id']
- if self.credentials.user_domain_name is None:
- self.credentials.user_domain_name = user['domain']['name']
-
- def base_url(self, filters, auth_data=None):
- """
- Filters can be:
- - service: compute, image, etc
- - region: the service region
- - endpoint_type: adminURL, publicURL, internalURL
- - api_version: replace catalog version with this
- - skip_path: take just the base URL
- """
- if auth_data is None:
- auth_data = self.auth_data
- token, _auth_data = auth_data
- service = filters.get('service')
- region = filters.get('region')
- endpoint_type = filters.get('endpoint_type', 'public')
-
- if service is None:
- raise exceptions.EndpointNotFound("No service provided")
-
- if 'URL' in endpoint_type:
- endpoint_type = endpoint_type.replace('URL', '')
- _base_url = None
- catalog = _auth_data['catalog']
- # Select entries with matching service type
- service_catalog = [ep for ep in catalog if ep['type'] == service]
- if len(service_catalog) > 0:
- service_catalog = service_catalog[0]['endpoints']
- else:
- # No matching service
- raise exceptions.EndpointNotFound(service)
- # Filter by endpoint type (interface)
- filtered_catalog = [ep for ep in service_catalog if
- ep['interface'] == endpoint_type]
- if len(filtered_catalog) == 0:
- # No matching type, keep all and try matching by region at least
- filtered_catalog = service_catalog
- # Filter by region
- filtered_catalog = [ep for ep in filtered_catalog if
- ep['region'] == region]
- if len(filtered_catalog) == 0:
- # No matching region, take the first endpoint
- filtered_catalog = [service_catalog[0]]
- # There should be only one match. If not take the first.
- _base_url = filtered_catalog[0].get('url', None)
- if _base_url is None:
- raise exceptions.EndpointNotFound(service)
-
- parts = urlparse.urlparse(_base_url)
- if filters.get('api_version', None) is not None:
- path = "/" + filters['api_version']
- noversion_path = "/".join(parts.path.split("/")[2:])
- if noversion_path != "":
- path += "/" + noversion_path
- _base_url = _base_url.replace(parts.path, path)
- if filters.get('skip_path', None) is not None:
- _base_url = _base_url.replace(parts.path, "/")
-
- return _base_url
-
- def is_expired(self, auth_data):
- _, access = auth_data
- expiry = datetime.datetime.strptime(access['expires_at'],
- self.EXPIRY_DATE_FORMAT)
- return expiry - self.token_expiry_threshold <= \
- datetime.datetime.utcnow()
-
-
-def is_identity_version_supported(identity_version):
- return identity_version in IDENTITY_VERSION
-
-
-def get_credentials(auth_url, fill_in=True, identity_version='v2',
- disable_ssl_certificate_validation=None, ca_certs=None,
- trace_requests=None, **kwargs):
- """
- Builds a credentials object based on the configured auth_version
-
- :param auth_url (string): Full URI of the OpenStack Identity API(Keystone)
- which is used to fetch the token from Identity service.
- :param fill_in (boolean): obtain a token and fill in all credential
- details provided by the identity service. When fill_in is not
- specified, credentials are not validated. Validation can be invoked
- by invoking ``is_valid()``
- :param identity_version (string): identity API version is used to
- select the matching auth provider and credentials class
- :param disable_ssl_certificate_validation: whether to enforce SSL
- certificate validation in SSL API requests to the auth system
- :param ca_certs: CA certificate bundle for validation of certificates
- in SSL API requests to the auth system
- :param trace_requests: trace in log API requests to the auth system
- :param kwargs (dict): Dict of credential key/value pairs
-
- Examples:
-
- Returns credentials from the provided parameters:
- >>> get_credentials(username='foo', password='bar')
-
- Returns credentials including IDs:
- >>> get_credentials(username='foo', password='bar', fill_in=True)
- """
- if not is_identity_version_supported(identity_version):
- raise exceptions.InvalidIdentityVersion(
- identity_version=identity_version)
-
- credential_class, auth_provider_class = IDENTITY_VERSION.get(
- identity_version)
-
- creds = credential_class(**kwargs)
- # Fill in the credentials fields that were not specified
- if fill_in:
- dsvm = disable_ssl_certificate_validation
- auth_provider = auth_provider_class(
- creds, auth_url, disable_ssl_certificate_validation=dsvm,
- ca_certs=ca_certs, trace_requests=trace_requests)
- creds = auth_provider.fill_credentials()
- return creds
-
-
-class Credentials(object):
- """
- Set of credentials for accessing OpenStack services
-
- ATTRIBUTES: list of valid class attributes representing credentials.
- """
-
- ATTRIBUTES = []
-
- def __init__(self, **kwargs):
- """
- Enforce the available attributes at init time (only).
- Additional attributes can still be set afterwards if tests need
- to do so.
- """
- self._initial = kwargs
- self._apply_credentials(kwargs)
-
- def _apply_credentials(self, attr):
- for key in attr.keys():
- if key in self.ATTRIBUTES:
- setattr(self, key, attr[key])
- else:
- raise exceptions.InvalidCredentials
-
- def __str__(self):
- """
- Represent only attributes included in self.ATTRIBUTES
- """
- _repr = dict((k, getattr(self, k)) for k in self.ATTRIBUTES)
- return str(_repr)
-
- def __eq__(self, other):
- """
- Credentials are equal if attributes in self.ATTRIBUTES are equal
- """
- return str(self) == str(other)
-
- def __getattr__(self, key):
- # If an attribute is set, __getattr__ is not invoked
- # If an attribute is not set, and it is a known one, return None
- if key in self.ATTRIBUTES:
- return None
- else:
- raise AttributeError
-
- def __delitem__(self, key):
- # For backwards compatibility, support dict behaviour
- if key in self.ATTRIBUTES:
- delattr(self, key)
- else:
- raise AttributeError
-
- def get(self, item, default):
- # In this patch act as dict for backward compatibility
- try:
- return getattr(self, item)
- except AttributeError:
- return default
-
- def get_init_attributes(self):
- return self._initial.keys()
-
- def is_valid(self):
- raise NotImplementedError
-
- def reset(self):
- # First delete all known attributes
- for key in self.ATTRIBUTES:
- if getattr(self, key) is not None:
- delattr(self, key)
- # Then re-apply initial setup
- self._apply_credentials(self._initial)
-
-
-class KeystoneV2Credentials(Credentials):
-
- ATTRIBUTES = ['username', 'password', 'tenant_name', 'user_id',
- 'tenant_id']
-
- def is_valid(self):
- """
- Minimum set of valid credentials, are username and password.
- Tenant is optional.
- """
- return None not in (self.username, self.password)
-
-
-class KeystoneV3Credentials(Credentials):
- """
- Credentials suitable for the Keystone Identity V3 API
- """
-
- ATTRIBUTES = ['domain_id', 'domain_name', 'password', 'username',
- 'project_domain_id', 'project_domain_name', 'project_id',
- 'project_name', 'tenant_id', 'tenant_name', 'user_domain_id',
- 'user_domain_name', 'user_id']
-
- def __setattr__(self, key, value):
- parent = super(KeystoneV3Credentials, self)
- # for tenant_* set both project and tenant
- if key == 'tenant_id':
- parent.__setattr__('project_id', value)
- elif key == 'tenant_name':
- parent.__setattr__('project_name', value)
- # for project_* set both project and tenant
- if key == 'project_id':
- parent.__setattr__('tenant_id', value)
- elif key == 'project_name':
- parent.__setattr__('tenant_name', value)
- # for *_domain_* set both user and project if not set yet
- if key == 'user_domain_id':
- if self.project_domain_id is None:
- parent.__setattr__('project_domain_id', value)
- if key == 'project_domain_id':
- if self.user_domain_id is None:
- parent.__setattr__('user_domain_id', value)
- if key == 'user_domain_name':
- if self.project_domain_name is None:
- parent.__setattr__('project_domain_name', value)
- if key == 'project_domain_name':
- if self.user_domain_name is None:
- parent.__setattr__('user_domain_name', value)
- # support domain_name coming from config
- if key == 'domain_name':
- parent.__setattr__('user_domain_name', value)
- parent.__setattr__('project_domain_name', value)
- # finally trigger default behaviour for all attributes
- parent.__setattr__(key, value)
-
- def is_valid(self):
- """
- Valid combinations of v3 credentials (excluding token, scope)
- - User id, password (optional domain)
- - User name, password and its domain id/name
- For the scope, valid combinations are:
- - None
- - Project id (optional domain)
- - Project name and its domain id/name
- - Domain id
- - Domain name
- """
- valid_user_domain = any(
- [self.user_domain_id is not None,
- self.user_domain_name is not None])
- valid_project_domain = any(
- [self.project_domain_id is not None,
- self.project_domain_name is not None])
- valid_user = any(
- [self.user_id is not None,
- self.username is not None and valid_user_domain])
- valid_project_scope = any(
- [self.project_name is None and self.project_id is None,
- self.project_id is not None,
- self.project_name is not None and valid_project_domain])
- valid_domain_scope = any(
- [self.domain_id is None and self.domain_name is None,
- self.domain_id or self.domain_name])
- return all([self.password is not None,
- valid_user,
- valid_project_scope and valid_domain_scope])
-
-
-IDENTITY_VERSION = {'v2': (KeystoneV2Credentials, KeystoneV2AuthProvider),
- 'v3': (KeystoneV3Credentials, KeystoneV3AuthProvider)}
diff --git a/neutron/tests/tempest/common/__init__.py b/neutron/tests/tempest/common/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/neutron/tests/tempest/common/__init__.py
+++ /dev/null
diff --git a/neutron/tests/tempest/common/accounts.py b/neutron/tests/tempest/common/accounts.py
deleted file mode 100644
index 6440739..0000000
--- a/neutron/tests/tempest/common/accounts.py
+++ /dev/null
@@ -1,357 +0,0 @@
-# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import hashlib
-import os
-
-from oslo_concurrency import lockutils
-from oslo_log import log as logging
-import yaml
-
-from neutron.tests.tempest.common import cred_provider
-from neutron.tests.tempest import config
-from neutron.tests.tempest import exceptions
-
-CONF = config.CONF
-LOG = logging.getLogger(__name__)
-
-
-def read_accounts_yaml(path):
- with open(path, 'r') as yaml_file:
- accounts = yaml.load(yaml_file)
- return accounts
-
-
-class Accounts(cred_provider.CredentialProvider):
-
- def __init__(self, name):
- super(Accounts, self).__init__(name)
- self.name = name
- if os.path.isfile(CONF.auth.test_accounts_file):
- accounts = read_accounts_yaml(CONF.auth.test_accounts_file)
- self.use_default_creds = False
- else:
- accounts = {}
- self.use_default_creds = True
- self.hash_dict = self.get_hash_dict(accounts)
- # FIXME(dhellmann): The configuration option is not part of
- # the API of the library, because if we change the option name
- # or group it will break this use. Tempest needs to set this
- # value somewhere that it owns, and then use
- # lockutils.set_defaults() to tell oslo.concurrency what value
- # to use.
- self.accounts_dir = os.path.join(CONF.oslo_concurrency.lock_path,
- 'test_accounts')
- self.isolated_creds = {}
-
- @classmethod
- def _append_role(cls, role, account_hash, hash_dict):
- if role in hash_dict['roles']:
- hash_dict['roles'][role].append(account_hash)
- else:
- hash_dict['roles'][role] = [account_hash]
- return hash_dict
-
- @classmethod
- def get_hash_dict(cls, accounts):
- hash_dict = {'roles': {}, 'creds': {}}
- # Loop over the accounts read from the yaml file
- for account in accounts:
- roles = []
- types = []
- if 'roles' in account:
- roles = account.pop('roles')
- if 'types' in account:
- types = account.pop('types')
- temp_hash = hashlib.md5()
- temp_hash.update(str(account))
- temp_hash_key = temp_hash.hexdigest()
- hash_dict['creds'][temp_hash_key] = account
- for role in roles:
- hash_dict = cls._append_role(role, temp_hash_key,
- hash_dict)
- # If types are set for the account append the matching role
- # subdict with the hash
- for type in types:
- if type == 'admin':
- hash_dict = cls._append_role(CONF.identity.admin_role,
- temp_hash_key, hash_dict)
- elif type == 'operator':
- hash_dict = cls._append_role(
- CONF.object_storage.operator_role, temp_hash_key,
- hash_dict)
- elif type == 'reseller_admin':
- hash_dict = cls._append_role(
- CONF.object_storage.reseller_admin_role,
- temp_hash_key,
- hash_dict)
- return hash_dict
-
- def is_multi_user(self):
- # Default credentials is not a valid option with locking Account
- if self.use_default_creds:
- raise exceptions.InvalidConfiguration(
- "Account file %s doesn't exist" % CONF.auth.test_accounts_file)
- else:
- return len(self.hash_dict['creds']) > 1
-
- def is_multi_tenant(self):
- return self.is_multi_user()
-
- def _create_hash_file(self, hash_string):
- path = os.path.join(os.path.join(self.accounts_dir, hash_string))
- if not os.path.isfile(path):
- with open(path, 'w') as fd:
- fd.write(self.name)
- return True
- return False
-
- @lockutils.synchronized('test_accounts_io', external=True)
- def _get_free_hash(self, hashes):
- # Cast as a list because in some edge cases a set will be passed in
- hashes = list(hashes)
- if not os.path.isdir(self.accounts_dir):
- os.mkdir(self.accounts_dir)
- # Create File from first hash (since none are in use)
- self._create_hash_file(hashes[0])
- return hashes[0]
- names = []
- for _hash in hashes:
- res = self._create_hash_file(_hash)
- if res:
- return _hash
- else:
- path = os.path.join(os.path.join(self.accounts_dir,
- _hash))
- with open(path, 'r') as fd:
- names.append(fd.read())
- msg = ('Insufficient number of users provided. %s have allocated all '
- 'the credentials for this allocation request' % ','.join(names))
- raise exceptions.InvalidConfiguration(msg)
-
- def _get_match_hash_list(self, roles=None):
- hashes = []
- if roles:
- # Loop over all the creds for each role in the subdict and generate
- # a list of cred lists for each role
- for role in roles:
- temp_hashes = self.hash_dict['roles'].get(role, None)
- if not temp_hashes:
- raise exceptions.InvalidConfiguration(
- "No credentials with role: %s specified in the "
- "accounts ""file" % role)
- hashes.append(temp_hashes)
- # Take the list of lists and do a boolean and between each list to
- # find the creds which fall under all the specified roles
- temp_list = set(hashes[0])
- for hash_list in hashes[1:]:
- temp_list = temp_list & set(hash_list)
- hashes = temp_list
- else:
- hashes = self.hash_dict['creds'].keys()
- # NOTE(mtreinish): admin is a special case because of the increased
- # privlege set which could potentially cause issues on tests where that
- # is not expected. So unless the admin role isn't specified do not
- # allocate admin.
- admin_hashes = self.hash_dict['roles'].get(CONF.identity.admin_role,
- None)
- if ((not roles or CONF.identity.admin_role not in roles) and
- admin_hashes):
- useable_hashes = [x for x in hashes if x not in admin_hashes]
- else:
- useable_hashes = hashes
- return useable_hashes
-
- def _get_creds(self, roles=None):
- if self.use_default_creds:
- raise exceptions.InvalidConfiguration(
- "Account file %s doesn't exist" % CONF.auth.test_accounts_file)
- useable_hashes = self._get_match_hash_list(roles)
- free_hash = self._get_free_hash(useable_hashes)
- return self.hash_dict['creds'][free_hash]
-
- @lockutils.synchronized('test_accounts_io', external=True)
- def remove_hash(self, hash_string):
- hash_path = os.path.join(self.accounts_dir, hash_string)
- if not os.path.isfile(hash_path):
- LOG.warning('Expected an account lock file %s to remove, but '
- 'one did not exist' % hash_path)
- else:
- os.remove(hash_path)
- if not os.listdir(self.accounts_dir):
- os.rmdir(self.accounts_dir)
-
- def get_hash(self, creds):
- for _hash in self.hash_dict['creds']:
- # Comparing on the attributes that are expected in the YAML
- if all([getattr(creds, k) == self.hash_dict['creds'][_hash][k] for
- k in creds.get_init_attributes()]):
- return _hash
- raise AttributeError('Invalid credentials %s' % creds)
-
- def remove_credentials(self, creds):
- _hash = self.get_hash(creds)
- self.remove_hash(_hash)
-
- def get_primary_creds(self):
- if self.isolated_creds.get('primary'):
- return self.isolated_creds.get('primary')
- creds = self._get_creds()
- primary_credential = cred_provider.get_credentials(**creds)
- self.isolated_creds['primary'] = primary_credential
- return primary_credential
-
- def get_alt_creds(self):
- if self.isolated_creds.get('alt'):
- return self.isolated_creds.get('alt')
- creds = self._get_creds()
- alt_credential = cred_provider.get_credentials(**creds)
- self.isolated_creds['alt'] = alt_credential
- return alt_credential
-
- def get_creds_by_roles(self, roles, force_new=False):
- roles = list(set(roles))
- exist_creds = self.isolated_creds.get(str(roles), None)
- # The force kwarg is used to allocate an additional set of creds with
- # the same role list. The index used for the previously allocation
- # in the isolated_creds dict will be moved.
- if exist_creds and not force_new:
- return exist_creds
- elif exist_creds and force_new:
- new_index = str(roles) + '-' + str(len(self.isolated_creds))
- self.isolated_creds[new_index] = exist_creds
- creds = self._get_creds(roles=roles)
- role_credential = cred_provider.get_credentials(**creds)
- self.isolated_creds[str(roles)] = role_credential
- return role_credential
-
- def clear_isolated_creds(self):
- for creds in self.isolated_creds.values():
- self.remove_credentials(creds)
-
- def get_admin_creds(self):
- return self.get_creds_by_roles([CONF.identity.admin_role])
-
- def is_role_available(self, role):
- if self.use_default_creds:
- return False
- else:
- if self.hash_dict['roles'].get(role):
- return True
- return False
-
- def admin_available(self):
- return self.is_role_available(CONF.identity.admin_role)
-
-
-class NotLockingAccounts(Accounts):
- """Credentials provider which always returns the first and second
- configured accounts as primary and alt users.
- This credential provider can be used in case of serial test execution
- to preserve the current behaviour of the serial tempest run.
- """
-
- def _unique_creds(self, cred_arg=None):
- """Verify that the configured credentials are valid and distinct """
- if self.use_default_creds:
- try:
- user = self.get_primary_creds()
- alt_user = self.get_alt_creds()
- return getattr(user, cred_arg) != getattr(alt_user, cred_arg)
- except exceptions.InvalidCredentials as ic:
- msg = "At least one of the configured credentials is " \
- "not valid: %s" % ic
- raise exceptions.InvalidConfiguration(msg)
- else:
- # TODO(andreaf) Add a uniqueness check here
- return len(self.hash_dict['creds']) > 1
-
- def is_multi_user(self):
- return self._unique_creds('username')
-
- def is_multi_tenant(self):
- return self._unique_creds('tenant_id')
-
- def get_creds(self, id, roles=None):
- try:
- hashes = self._get_match_hash_list(roles)
- # No need to sort the dict as within the same python process
- # the HASH seed won't change, so subsequent calls to keys()
- # will return the same result
- _hash = hashes[id]
- except IndexError:
- msg = 'Insufficient number of users provided'
- raise exceptions.InvalidConfiguration(msg)
- return self.hash_dict['creds'][_hash]
-
- def get_primary_creds(self):
- if self.isolated_creds.get('primary'):
- return self.isolated_creds.get('primary')
- if not self.use_default_creds:
- creds = self.get_creds(0)
- primary_credential = cred_provider.get_credentials(**creds)
- else:
- primary_credential = cred_provider.get_configured_credentials(
- 'user')
- self.isolated_creds['primary'] = primary_credential
- return primary_credential
-
- def get_alt_creds(self):
- if self.isolated_creds.get('alt'):
- return self.isolated_creds.get('alt')
- if not self.use_default_creds:
- creds = self.get_creds(1)
- alt_credential = cred_provider.get_credentials(**creds)
- else:
- alt_credential = cred_provider.get_configured_credentials(
- 'alt_user')
- self.isolated_creds['alt'] = alt_credential
- return alt_credential
-
- def clear_isolated_creds(self):
- self.isolated_creds = {}
-
- def get_admin_creds(self):
- if not self.use_default_creds:
- return self.get_creds_by_roles([CONF.identity.admin_role])
- else:
- creds = cred_provider.get_configured_credentials(
- "identity_admin", fill_in=False)
- self.isolated_creds['admin'] = creds
- return creds
-
- def get_creds_by_roles(self, roles, force_new=False):
- roles = list(set(roles))
- exist_creds = self.isolated_creds.get(str(roles), None)
- index = 0
- if exist_creds and not force_new:
- return exist_creds
- elif exist_creds and force_new:
- new_index = str(roles) + '-' + str(len(self.isolated_creds))
- self.isolated_creds[new_index] = exist_creds
- # Figure out how many existing creds for this roles set are present
- # use this as the index the returning hash list to ensure separate
- # creds are returned with force_new being True
- for creds_names in self.isolated_creds:
- if str(roles) in creds_names:
- index = index + 1
- if not self.use_default_creds:
- creds = self.get_creds(index, roles=roles)
- role_credential = cred_provider.get_credentials(**creds)
- self.isolated_creds[str(roles)] = role_credential
- else:
- msg = "Default credentials can not be used with specifying "\
- "credentials by roles"
- raise exceptions.InvalidConfiguration(msg)
- return role_credential
diff --git a/neutron/tests/tempest/common/commands.py b/neutron/tests/tempest/common/commands.py
deleted file mode 100644
index 392c9d0..0000000
--- a/neutron/tests/tempest/common/commands.py
+++ /dev/null
@@ -1,39 +0,0 @@
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import shlex
-import subprocess
-
-from oslo_log import log as logging
-
-LOG = logging.getLogger(__name__)
-
-
-def copy_file_to_host(file_from, dest, host, username, pkey):
- dest = "%s@%s:%s" % (username, host, dest)
- cmd = "scp -v -o UserKnownHostsFile=/dev/null " \
- "-o StrictHostKeyChecking=no " \
- "-i %(pkey)s %(file1)s %(dest)s" % {'pkey': pkey,
- 'file1': file_from,
- 'dest': dest}
- args = shlex.split(cmd.encode('utf-8'))
- subprocess_args = {'stdout': subprocess.PIPE,
- 'stderr': subprocess.STDOUT}
- proc = subprocess.Popen(args, **subprocess_args)
- stdout, stderr = proc.communicate()
- if proc.returncode != 0:
- LOG.error(("Command {0} returned with exit status {1},"
- "output {2}, error {3}").format(cmd, proc.returncode,
- stdout, stderr))
- return stdout
diff --git a/neutron/tests/tempest/common/cred_provider.py b/neutron/tests/tempest/common/cred_provider.py
deleted file mode 100644
index b90d09d..0000000
--- a/neutron/tests/tempest/common/cred_provider.py
+++ /dev/null
@@ -1,129 +0,0 @@
-# Copyright (c) 2014 Deutsche Telekom AG
-# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import abc
-
-import six
-
-from neutron.tests.tempest import auth
-from neutron.tests.tempest import config
-from neutron.tests.tempest import exceptions
-
-CONF = config.CONF
-
-# Type of credentials available from configuration
-CREDENTIAL_TYPES = {
- 'identity_admin': ('identity', 'admin'),
- 'user': ('identity', None),
- 'alt_user': ('identity', 'alt')
-}
-
-DEFAULT_PARAMS = {
- 'disable_ssl_certificate_validation':
- CONF.identity.disable_ssl_certificate_validation,
- 'ca_certs': CONF.identity.ca_certificates_file,
- 'trace_requests': CONF.debug.trace_requests
-}
-
-
-# Read credentials from configuration, builds a Credentials object
-# based on the specified or configured version
-def get_configured_credentials(credential_type, fill_in=True,
- identity_version=None):
- identity_version = identity_version or CONF.identity.auth_version
- if identity_version not in ('v2', 'v3'):
- raise exceptions.InvalidConfiguration(
- 'Unsupported auth version: %s' % identity_version)
- if credential_type not in CREDENTIAL_TYPES:
- raise exceptions.InvalidCredentials()
- conf_attributes = ['username', 'password', 'tenant_name']
- if identity_version == 'v3':
- conf_attributes.append('domain_name')
- # Read the parts of credentials from config
- params = DEFAULT_PARAMS.copy()
- section, prefix = CREDENTIAL_TYPES[credential_type]
- for attr in conf_attributes:
- _section = getattr(CONF, section)
- if prefix is None:
- params[attr] = getattr(_section, attr)
- else:
- params[attr] = getattr(_section, prefix + "_" + attr)
- # Build and validate credentials. We are reading configured credentials,
- # so validate them even if fill_in is False
- credentials = get_credentials(fill_in=fill_in, **params)
- if not fill_in:
- if not credentials.is_valid():
- msg = ("The %s credentials are incorrectly set in the config file."
- " Double check that all required values are assigned" %
- credential_type)
- raise exceptions.InvalidConfiguration(msg)
- return credentials
-
-
-# Wrapper around auth.get_credentials to use the configured identity version
-# is none is specified
-def get_credentials(fill_in=True, identity_version=None, **kwargs):
- params = dict(DEFAULT_PARAMS, **kwargs)
- identity_version = identity_version or CONF.identity.auth_version
- # In case of "v3" add the domain from config if not specified
- if identity_version == 'v3':
- domain_fields = set(x for x in auth.KeystoneV3Credentials.ATTRIBUTES
- if 'domain' in x)
- if not domain_fields.intersection(kwargs.keys()):
- kwargs['user_domain_name'] = CONF.identity.admin_domain_name
- auth_url = CONF.identity.uri_v3
- else:
- auth_url = CONF.identity.uri
- return auth.get_credentials(auth_url,
- fill_in=fill_in,
- identity_version=identity_version,
- **params)
-
-
-@six.add_metaclass(abc.ABCMeta)
-class CredentialProvider(object):
- def __init__(self, name, password='pass', network_resources=None):
- self.name = name
-
- @abc.abstractmethod
- def get_primary_creds(self):
- return
-
- @abc.abstractmethod
- def get_admin_creds(self):
- return
-
- @abc.abstractmethod
- def get_alt_creds(self):
- return
-
- @abc.abstractmethod
- def clear_isolated_creds(self):
- return
-
- @abc.abstractmethod
- def is_multi_user(self):
- return
-
- @abc.abstractmethod
- def is_multi_tenant(self):
- return
-
- @abc.abstractmethod
- def get_creds_by_roles(self, roles, force_new=False):
- return
-
- @abc.abstractmethod
- def is_role_available(self, role):
- return
diff --git a/neutron/tests/tempest/common/credentials.py b/neutron/tests/tempest/common/credentials.py
deleted file mode 100644
index 9dfdff0..0000000
--- a/neutron/tests/tempest/common/credentials.py
+++ /dev/null
@@ -1,65 +0,0 @@
-# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import os
-
-from neutron.tests.tempest.common import accounts
-from neutron.tests.tempest.common import cred_provider
-from neutron.tests.tempest.common import isolated_creds
-from neutron.tests.tempest import config
-from neutron.tests.tempest import exceptions
-
-CONF = config.CONF
-
-
-# Return the right implementation of CredentialProvider based on config
-# Dropping interface and password, as they are never used anyways
-# TODO(andreaf) Drop them from the CredentialsProvider interface completely
-def get_isolated_credentials(name, network_resources=None,
- force_tenant_isolation=False):
- # If a test requires a new account to work, it can have it via forcing
- # tenant isolation. A new account will be produced only for that test.
- # In case admin credentials are not available for the account creation,
- # the test should be skipped else it would fail.
- if CONF.auth.allow_tenant_isolation or force_tenant_isolation:
- return isolated_creds.IsolatedCreds(
- name=name,
- network_resources=network_resources)
- else:
- if CONF.auth.locking_credentials_provider:
- # Most params are not relevant for pre-created accounts
- return accounts.Accounts(name=name)
- else:
- return accounts.NotLockingAccounts(name=name)
-
-
-# We want a helper function here to check and see if admin credentials
-# are available so we can do a single call from skip_checks if admin
-# creds are available.
-def is_admin_available():
- is_admin = True
- # If tenant isolation is enabled admin will be available
- if CONF.auth.allow_tenant_isolation:
- return is_admin
- # Check whether test accounts file has the admin specified or not
- elif os.path.isfile(CONF.auth.test_accounts_file):
- check_accounts = accounts.Accounts(name='check_admin')
- if not check_accounts.admin_available():
- is_admin = False
- else:
- try:
- cred_provider.get_configured_credentials('identity_admin',
- fill_in=False)
- except exceptions.InvalidConfiguration:
- is_admin = False
- return is_admin
diff --git a/neutron/tests/tempest/common/custom_matchers.py b/neutron/tests/tempest/common/custom_matchers.py
deleted file mode 100644
index 839088c..0000000
--- a/neutron/tests/tempest/common/custom_matchers.py
+++ /dev/null
@@ -1,227 +0,0 @@
-# Copyright 2013 NTT Corporation
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import re
-
-import six
-from testtools import helpers
-
-
-class ExistsAllResponseHeaders(object):
- """
- Specific matcher to check the existence of Swift's response headers
-
- This matcher checks the existence of common headers for each HTTP method
- or the target, which means account, container or object.
- When checking the existence of 'specific' headers such as
- X-Account-Meta-* or X-Object-Manifest for example, those headers must be
- checked in each test code.
- """
-
- def __init__(self, target, method):
- """
- param: target Account/Container/Object
- param: method PUT/GET/HEAD/DELETE/COPY/POST
- """
- self.target = target
- self.method = method
-
- def match(self, actual):
- """
- param: actual HTTP response headers
- """
- # Check common headers for all HTTP methods
- if 'content-length' not in actual:
- return NonExistentHeader('content-length')
- if 'content-type' not in actual:
- return NonExistentHeader('content-type')
- if 'x-trans-id' not in actual:
- return NonExistentHeader('x-trans-id')
- if 'date' not in actual:
- return NonExistentHeader('date')
-
- # Check headers for a specific method or target
- if self.method == 'GET' or self.method == 'HEAD':
- if 'x-timestamp' not in actual:
- return NonExistentHeader('x-timestamp')
- if 'accept-ranges' not in actual:
- return NonExistentHeader('accept-ranges')
- if self.target == 'Account':
- if 'x-account-bytes-used' not in actual:
- return NonExistentHeader('x-account-bytes-used')
- if 'x-account-container-count' not in actual:
- return NonExistentHeader('x-account-container-count')
- if 'x-account-object-count' not in actual:
- return NonExistentHeader('x-account-object-count')
- elif self.target == 'Container':
- if 'x-container-bytes-used' not in actual:
- return NonExistentHeader('x-container-bytes-used')
- if 'x-container-object-count' not in actual:
- return NonExistentHeader('x-container-object-count')
- elif self.target == 'Object':
- if 'etag' not in actual:
- return NonExistentHeader('etag')
- if 'last-modified' not in actual:
- return NonExistentHeader('last-modified')
- elif self.method == 'PUT':
- if self.target == 'Object':
- if 'etag' not in actual:
- return NonExistentHeader('etag')
- if 'last-modified' not in actual:
- return NonExistentHeader('last-modified')
- elif self.method == 'COPY':
- if self.target == 'Object':
- if 'etag' not in actual:
- return NonExistentHeader('etag')
- if 'last-modified' not in actual:
- return NonExistentHeader('last-modified')
- if 'x-copied-from' not in actual:
- return NonExistentHeader('x-copied-from')
- if 'x-copied-from-last-modified' not in actual:
- return NonExistentHeader('x-copied-from-last-modified')
-
- return None
-
-
-class NonExistentHeader(object):
- """
- Informs an error message for end users in the case of missing a
- certain header in Swift's responses
- """
-
- def __init__(self, header):
- self.header = header
-
- def describe(self):
- return "%s header does not exist" % self.header
-
- def get_details(self):
- return {}
-
-
-class AreAllWellFormatted(object):
- """
- Specific matcher to check the correctness of formats of values of Swift's
- response headers
-
- This matcher checks the format of values of response headers.
- When checking the format of values of 'specific' headers such as
- X-Account-Meta-* or X-Object-Manifest for example, those values must be
- checked in each test code.
- """
-
- def match(self, actual):
- for key, value in six.iteritems(actual):
- if key in ('content-length', 'x-account-bytes-used',
- 'x-account-container-count', 'x-account-object-count',
- 'x-container-bytes-used', 'x-container-object-count')\
- and not value.isdigit():
- return InvalidFormat(key, value)
- elif key in ('content-type', 'date', 'last-modified',
- 'x-copied-from-last-modified') and not value:
- return InvalidFormat(key, value)
- elif key == 'x-timestamp' and not re.match("^\d+\.?\d*\Z", value):
- return InvalidFormat(key, value)
- elif key == 'x-copied-from' and not re.match("\S+/\S+", value):
- return InvalidFormat(key, value)
- elif key == 'x-trans-id' and \
- not re.match("^tx[0-9a-f]{21}-[0-9a-f]{10}.*", value):
- return InvalidFormat(key, value)
- elif key == 'accept-ranges' and not value == 'bytes':
- return InvalidFormat(key, value)
- elif key == 'etag' and not value.isalnum():
- return InvalidFormat(key, value)
- elif key == 'transfer-encoding' and not value == 'chunked':
- return InvalidFormat(key, value)
-
- return None
-
-
-class InvalidFormat(object):
- """
- Informs an error message for end users if a format of a certain header
- is invalid
- """
-
- def __init__(self, key, value):
- self.key = key
- self.value = value
-
- def describe(self):
- return "InvalidFormat (%s, %s)" % (self.key, self.value)
-
- def get_details(self):
- return {}
-
-
-class MatchesDictExceptForKeys(object):
- """Matches two dictionaries. Verifies all items are equals except for those
- identified by a list of keys.
- """
-
- def __init__(self, expected, excluded_keys=None):
- self.expected = expected
- self.excluded_keys = excluded_keys if excluded_keys is not None else []
-
- def match(self, actual):
- filtered_expected = helpers.dict_subtract(self.expected,
- self.excluded_keys)
- filtered_actual = helpers.dict_subtract(actual,
- self.excluded_keys)
- if filtered_actual != filtered_expected:
- return DictMismatch(filtered_expected, filtered_actual)
-
-
-class DictMismatch(object):
- """Mismatch between two dicts describes deltas"""
-
- def __init__(self, expected, actual):
- self.expected = expected
- self.actual = actual
- self.intersect = set(self.expected) & set(self.actual)
- self.symmetric_diff = set(self.expected) ^ set(self.actual)
-
- def _format_dict(self, dict_to_format):
- # Ensure the error string dict is printed in a set order
- # NOTE(mtreinish): needed to ensure a deterministic error msg for
- # testing. Otherwise the error message will be dependent on the
- # dict ordering.
- dict_string = "{"
- for key in sorted(dict_to_format):
- dict_string += "'%s': %s, " % (key, dict_to_format[key])
- dict_string = dict_string[:-2] + '}'
- return dict_string
-
- def describe(self):
- msg = ""
- if self.symmetric_diff:
- only_expected = helpers.dict_subtract(self.expected, self.actual)
- only_actual = helpers.dict_subtract(self.actual, self.expected)
- if only_expected:
- msg += "Only in expected:\n %s\n" % self._format_dict(
- only_expected)
- if only_actual:
- msg += "Only in actual:\n %s\n" % self._format_dict(
- only_actual)
- diff_set = set(o for o in self.intersect if
- self.expected[o] != self.actual[o])
- if diff_set:
- msg += "Differences:\n"
- for o in diff_set:
- msg += " %s: expected %s, actual %s\n" % (
- o, self.expected[o], self.actual[o])
- return msg
-
- def get_details(self):
- return {}
diff --git a/neutron/tests/tempest/common/generator/__init__.py b/neutron/tests/tempest/common/generator/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/neutron/tests/tempest/common/generator/__init__.py
+++ /dev/null
diff --git a/neutron/tests/tempest/common/generator/base_generator.py b/neutron/tests/tempest/common/generator/base_generator.py
deleted file mode 100644
index 41ed48c..0000000
--- a/neutron/tests/tempest/common/generator/base_generator.py
+++ /dev/null
@@ -1,179 +0,0 @@
-# Copyright 2014 Deutsche Telekom AG
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import copy
-import functools
-
-import jsonschema
-import six
-
-
-def _check_for_expected_result(name, schema):
- expected_result = None
- if "results" in schema:
- if name in schema["results"]:
- expected_result = schema["results"][name]
- return expected_result
-
-
-def generator_type(*args, **kwargs):
- def wrapper(func):
- func.types = args
- for key in kwargs:
- setattr(func, key, kwargs[key])
- return func
- return wrapper
-
-
-def simple_generator(fn):
- """
- Decorator for simple generators that return one value
- """
- @functools.wraps(fn)
- def wrapped(self, schema):
- result = fn(self, schema)
- if result is not None:
- expected_result = _check_for_expected_result(fn.__name__, schema)
- return (fn.__name__, result, expected_result)
- return
- return wrapped
-
-
-class BasicGeneratorSet(object):
- _instance = None
-
- schema = {
- "type": "object",
- "properties": {
- "name": {"type": "string"},
- "http-method": {
- "enum": ["GET", "PUT", "HEAD",
- "POST", "PATCH", "DELETE", 'COPY']
- },
- "admin_client": {"type": "boolean"},
- "url": {"type": "string"},
- "default_result_code": {"type": "integer"},
- "json-schema": {},
- "resources": {
- "type": "array",
- "items": {
- "oneOf": [
- {"type": "string"},
- {
- "type": "object",
- "properties": {
- "name": {"type": "string"},
- "expected_result": {"type": "integer"}
- }
- }
- ]
- }
- },
- "results": {
- "type": "object",
- "properties": {}
- }
- },
- "required": ["name", "http-method", "url"],
- "additionalProperties": False,
- }
-
- def __init__(self):
- self.types_dict = {}
- for m in dir(self):
- if callable(getattr(self, m)) and not'__' in m:
- method = getattr(self, m)
- if hasattr(method, "types"):
- for type in method.types:
- if type not in self.types_dict:
- self.types_dict[type] = []
- self.types_dict[type].append(method)
-
- def validate_schema(self, schema):
- if "json-schema" in schema:
- jsonschema.Draft4Validator.check_schema(schema['json-schema'])
- jsonschema.validate(schema, self.schema)
-
- def generate_scenarios(self, schema, path=None):
- """
- Generates the scenario (all possible test cases) out of the given
- schema.
-
- :param schema: a dict style schema (see ``BasicGeneratorSet.schema``)
- :param path: the schema path if the given schema is a subschema
- """
- schema_type = schema['type']
- scenarios = []
-
- if schema_type == 'object':
- properties = schema["properties"]
- for attribute, definition in six.iteritems(properties):
- current_path = copy.copy(path)
- if path is not None:
- current_path.append(attribute)
- else:
- current_path = [attribute]
- scenarios.extend(
- self.generate_scenarios(definition, current_path))
- elif isinstance(schema_type, list):
- if "integer" in schema_type:
- schema_type = "integer"
- else:
- raise Exception("non-integer list types not supported")
- for generator in self.types_dict[schema_type]:
- if hasattr(generator, "needed_property"):
- prop = generator.needed_property
- if (prop not in schema or
- schema[prop] is None or
- schema[prop] is False):
- continue
-
- name = generator.__name__
- if ("exclude_tests" in schema and
- name in schema["exclude_tests"]):
- continue
- if path is not None:
- name = "%s_%s" % ("_".join(path), name)
- scenarios.append({
- "_negtest_name": name,
- "_negtest_generator": generator,
- "_negtest_schema": schema,
- "_negtest_path": path})
- return scenarios
-
- def generate_payload(self, test, schema):
- """
- Generates one jsonschema out of the given test. It's mandatory to use
- generate_scenarios before to register all needed variables to the test.
-
- :param test: A test object (scenario) with all _negtest variables on it
- :param schema: schema for the test
- """
- generator = test._negtest_generator
- ret = generator(test._negtest_schema)
- path = copy.copy(test._negtest_path)
- expected_result = None
-
- if ret is not None:
- generator_result = generator(test._negtest_schema)
- invalid_snippet = generator_result[1]
- expected_result = generator_result[2]
- element = path.pop()
- if len(path) > 0:
- schema_snip = six.moves.reduce(dict.get, path, schema)
- schema_snip[element] = invalid_snippet
- else:
- schema[element] = invalid_snippet
- return expected_result
diff --git a/neutron/tests/tempest/common/generator/negative_generator.py b/neutron/tests/tempest/common/generator/negative_generator.py
deleted file mode 100644
index 44cd305..0000000
--- a/neutron/tests/tempest/common/generator/negative_generator.py
+++ /dev/null
@@ -1,75 +0,0 @@
-# Copyright 2014 Deutsche Telekom AG
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import copy
-
-import neutron.tests.tempest.common.generator.base_generator as base
-import neutron.tests.tempest.common.generator.valid_generator as valid
-
-
-class NegativeTestGenerator(base.BasicGeneratorSet):
- @base.generator_type("string")
- @base.simple_generator
- def gen_int(self, _):
- return 4
-
- @base.generator_type("integer")
- @base.simple_generator
- def gen_string(self, _):
- return "XXXXXX"
-
- @base.generator_type("integer", "string")
- def gen_none(self, schema):
- # Note(mkoderer): it's not using the decorator otherwise it'd be
- # filtered
- expected_result = base._check_for_expected_result('gen_none', schema)
- return ('gen_none', None, expected_result)
-
- @base.generator_type("string")
- @base.simple_generator
- def gen_str_min_length(self, schema):
- min_length = schema.get("minLength", 0)
- if min_length > 0:
- return "x" * (min_length - 1)
-
- @base.generator_type("string", needed_property="maxLength")
- @base.simple_generator
- def gen_str_max_length(self, schema):
- max_length = schema.get("maxLength", -1)
- return "x" * (max_length + 1)
-
- @base.generator_type("integer", needed_property="minimum")
- @base.simple_generator
- def gen_int_min(self, schema):
- minimum = schema["minimum"]
- if "exclusiveMinimum" not in schema:
- minimum -= 1
- return minimum
-
- @base.generator_type("integer", needed_property="maximum")
- @base.simple_generator
- def gen_int_max(self, schema):
- maximum = schema["maximum"]
- if "exclusiveMaximum" not in schema:
- maximum += 1
- return maximum
-
- @base.generator_type("object", needed_property="additionalProperties")
- @base.simple_generator
- def gen_obj_add_attr(self, schema):
- valid_schema = valid.ValidTestGenerator().generate_valid(schema)
- new_valid = copy.deepcopy(valid_schema)
- new_valid["$$$$$$$$$$"] = "xxx"
- return new_valid
diff --git a/neutron/tests/tempest/common/generator/valid_generator.py b/neutron/tests/tempest/common/generator/valid_generator.py
deleted file mode 100644
index bc7014c..0000000
--- a/neutron/tests/tempest/common/generator/valid_generator.py
+++ /dev/null
@@ -1,79 +0,0 @@
-# Copyright 2014 Deutsche Telekom AG
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import six
-
-import neutron.tests.tempest.common.generator.base_generator as base
-
-
-class ValidTestGenerator(base.BasicGeneratorSet):
- @base.generator_type("string")
- @base.simple_generator
- def generate_valid_string(self, schema):
- size = schema.get("minLength", 1)
- # TODO(dkr mko): handle format and pattern
- return "x" * size
-
- @base.generator_type("integer")
- @base.simple_generator
- def generate_valid_integer(self, schema):
- # TODO(dkr mko): handle multipleOf
- if "minimum" in schema:
- minimum = schema["minimum"]
- if "exclusiveMinimum" not in schema:
- return minimum
- else:
- return minimum + 1
- if "maximum" in schema:
- maximum = schema["maximum"]
- if "exclusiveMaximum" not in schema:
- return maximum
- else:
- return maximum - 1
- return 0
-
- @base.generator_type("object")
- @base.simple_generator
- def generate_valid_object(self, schema):
- obj = {}
- for k, v in six.iteritems(schema["properties"]):
- obj[k] = self.generate_valid(v)
- return obj
-
- def generate(self, schema):
- schema_type = schema["type"]
- if isinstance(schema_type, list):
- if "integer" in schema_type:
- schema_type = "integer"
- else:
- raise Exception("non-integer list types not supported")
- result = []
- if schema_type not in self.types_dict:
- raise TypeError("generator (%s) doesn't support type: %s"
- % (self.__class__.__name__, schema_type))
- for generator in self.types_dict[schema_type]:
- ret = generator(schema)
- if ret is not None:
- if isinstance(ret, list):
- result.extend(ret)
- elif isinstance(ret, tuple):
- result.append(ret)
- else:
- raise Exception("generator (%s) returns invalid result: %s"
- % (generator, ret))
- return result
-
- def generate_valid(self, schema):
- return self.generate(schema)[0][1]
diff --git a/neutron/tests/tempest/common/glance_http.py b/neutron/tests/tempest/common/glance_http.py
deleted file mode 100644
index 3d8c8aa..0000000
--- a/neutron/tests/tempest/common/glance_http.py
+++ /dev/null
@@ -1,379 +0,0 @@
-# Copyright 2012 OpenStack Foundation
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-# Originally copied from python-glanceclient
-
-import copy
-import hashlib
-import posixpath
-import re
-import socket
-import StringIO
-import struct
-import urlparse
-
-
-import OpenSSL
-from oslo_log import log as logging
-from oslo_serialization import jsonutils as json
-from six import moves
-from six.moves import http_client as httplib
-from tempest_lib import exceptions as lib_exc
-
-from neutron.tests.tempest import exceptions as exc
-
-LOG = logging.getLogger(__name__)
-USER_AGENT = 'tempest'
-CHUNKSIZE = 1024 * 64 # 64kB
-TOKEN_CHARS_RE = re.compile('^[-A-Za-z0-9+/=]*$')
-
-
-class HTTPClient(object):
-
- def __init__(self, auth_provider, filters, **kwargs):
- self.auth_provider = auth_provider
- self.filters = filters
- self.endpoint = auth_provider.base_url(filters)
- endpoint_parts = urlparse.urlparse(self.endpoint)
- self.endpoint_scheme = endpoint_parts.scheme
- self.endpoint_hostname = endpoint_parts.hostname
- self.endpoint_port = endpoint_parts.port
- self.endpoint_path = endpoint_parts.path
-
- self.connection_class = self.get_connection_class(self.endpoint_scheme)
- self.connection_kwargs = self.get_connection_kwargs(
- self.endpoint_scheme, **kwargs)
-
- @staticmethod
- def get_connection_class(scheme):
- if scheme == 'https':
- return VerifiedHTTPSConnection
- else:
- return httplib.HTTPConnection
-
- @staticmethod
- def get_connection_kwargs(scheme, **kwargs):
- _kwargs = {'timeout': float(kwargs.get('timeout', 600))}
-
- if scheme == 'https':
- _kwargs['ca_certs'] = kwargs.get('ca_certs', None)
- _kwargs['cert_file'] = kwargs.get('cert_file', None)
- _kwargs['key_file'] = kwargs.get('key_file', None)
- _kwargs['insecure'] = kwargs.get('insecure', False)
- _kwargs['ssl_compression'] = kwargs.get('ssl_compression', True)
-
- return _kwargs
-
- def get_connection(self):
- _class = self.connection_class
- try:
- return _class(self.endpoint_hostname, self.endpoint_port,
- **self.connection_kwargs)
- except httplib.InvalidURL:
- raise exc.EndpointNotFound
-
- def _http_request(self, url, method, **kwargs):
- """Send an http request with the specified characteristics.
-
- Wrapper around httplib.HTTP(S)Connection.request to handle tasks such
- as setting headers and error handling.
- """
- # Copy the kwargs so we can reuse the original in case of redirects
- kwargs['headers'] = copy.deepcopy(kwargs.get('headers', {}))
- kwargs['headers'].setdefault('User-Agent', USER_AGENT)
-
- self._log_request(method, url, kwargs['headers'])
-
- conn = self.get_connection()
-
- try:
- url_parts = urlparse.urlparse(url)
- conn_url = posixpath.normpath(url_parts.path)
- LOG.debug('Actual Path: {path}'.format(path=conn_url))
- if kwargs['headers'].get('Transfer-Encoding') == 'chunked':
- conn.putrequest(method, conn_url)
- for header, value in kwargs['headers'].items():
- conn.putheader(header, value)
- conn.endheaders()
- chunk = kwargs['body'].read(CHUNKSIZE)
- # Chunk it, baby...
- while chunk:
- conn.send('%x\r\n%s\r\n' % (len(chunk), chunk))
- chunk = kwargs['body'].read(CHUNKSIZE)
- conn.send('0\r\n\r\n')
- else:
- conn.request(method, conn_url, **kwargs)
- resp = conn.getresponse()
- except socket.gaierror as e:
- message = ("Error finding address for %(url)s: %(e)s" %
- {'url': url, 'e': e})
- raise exc.EndpointNotFound(message)
- except (socket.error, socket.timeout) as e:
- message = ("Error communicating with %(endpoint)s %(e)s" %
- {'endpoint': self.endpoint, 'e': e})
- raise exc.TimeoutException(message)
-
- body_iter = ResponseBodyIterator(resp)
- # Read body into string if it isn't obviously image data
- if resp.getheader('content-type', None) != 'application/octet-stream':
- body_str = ''.join([body_chunk for body_chunk in body_iter])
- body_iter = StringIO.StringIO(body_str)
- self._log_response(resp, None)
- else:
- self._log_response(resp, body_iter)
-
- return resp, body_iter
-
- def _log_request(self, method, url, headers):
- LOG.info('Request: ' + method + ' ' + url)
- if headers:
- headers_out = headers
- if 'X-Auth-Token' in headers and headers['X-Auth-Token']:
- token = headers['X-Auth-Token']
- if len(token) > 64 and TOKEN_CHARS_RE.match(token):
- headers_out = headers.copy()
- headers_out['X-Auth-Token'] = "<Token omitted>"
- LOG.info('Request Headers: ' + str(headers_out))
-
- def _log_response(self, resp, body):
- status = str(resp.status)
- LOG.info("Response Status: " + status)
- if resp.getheaders():
- LOG.info('Response Headers: ' + str(resp.getheaders()))
- if body:
- str_body = str(body)
- length = len(body)
- LOG.info('Response Body: ' + str_body[:2048])
- if length >= 2048:
- self.LOG.debug("Large body (%d) md5 summary: %s", length,
- hashlib.md5(str_body).hexdigest())
-
- def json_request(self, method, url, **kwargs):
- kwargs.setdefault('headers', {})
- kwargs['headers'].setdefault('Content-Type', 'application/json')
- if kwargs['headers']['Content-Type'] != 'application/json':
- msg = "Only application/json content-type is supported."
- raise lib_exc.InvalidContentType(msg)
-
- if 'body' in kwargs:
- kwargs['body'] = json.dumps(kwargs['body'])
-
- resp, body_iter = self._http_request(url, method, **kwargs)
-
- if 'application/json' in resp.getheader('content-type', ''):
- body = ''.join([chunk for chunk in body_iter])
- try:
- body = json.loads(body)
- except ValueError:
- LOG.error('Could not decode response body as JSON')
- else:
- msg = "Only json/application content-type is supported."
- raise lib_exc.InvalidContentType(msg)
-
- return resp, body
-
- def raw_request(self, method, url, **kwargs):
- kwargs.setdefault('headers', {})
- kwargs['headers'].setdefault('Content-Type',
- 'application/octet-stream')
- if 'body' in kwargs:
- if (hasattr(kwargs['body'], 'read')
- and method.lower() in ('post', 'put')):
- # We use 'Transfer-Encoding: chunked' because
- # body size may not always be known in advance.
- kwargs['headers']['Transfer-Encoding'] = 'chunked'
-
- # Decorate the request with auth
- req_url, kwargs['headers'], kwargs['body'] = \
- self.auth_provider.auth_request(
- method=method, url=url, headers=kwargs['headers'],
- body=kwargs.get('body', None), filters=self.filters)
- return self._http_request(req_url, method, **kwargs)
-
-
-class OpenSSLConnectionDelegator(object):
- """
- An OpenSSL.SSL.Connection delegator.
-
- Supplies an additional 'makefile' method which httplib requires
- and is not present in OpenSSL.SSL.Connection.
-
- Note: Since it is not possible to inherit from OpenSSL.SSL.Connection
- a delegator must be used.
- """
- def __init__(self, *args, **kwargs):
- self.connection = OpenSSL.SSL.Connection(*args, **kwargs)
-
- def __getattr__(self, name):
- return getattr(self.connection, name)
-
- def makefile(self, *args, **kwargs):
- # Ensure the socket is closed when this file is closed
- kwargs['close'] = True
- return socket._fileobject(self.connection, *args, **kwargs)
-
-
-class VerifiedHTTPSConnection(httplib.HTTPSConnection):
- """
- Extended HTTPSConnection which uses the OpenSSL library
- for enhanced SSL support.
- Note: Much of this functionality can eventually be replaced
- with native Python 3.3 code.
- """
- def __init__(self, host, port=None, key_file=None, cert_file=None,
- ca_certs=None, timeout=None, insecure=False,
- ssl_compression=True):
- httplib.HTTPSConnection.__init__(self, host, port,
- key_file=key_file,
- cert_file=cert_file)
- self.key_file = key_file
- self.cert_file = cert_file
- self.timeout = timeout
- self.insecure = insecure
- self.ssl_compression = ssl_compression
- self.ca_certs = ca_certs
- self.setcontext()
-
- @staticmethod
- def host_matches_cert(host, x509):
- """
- Verify that the the x509 certificate we have received
- from 'host' correctly identifies the server we are
- connecting to, ie that the certificate's Common Name
- or a Subject Alternative Name matches 'host'.
- """
- # First see if we can match the CN
- if x509.get_subject().commonName == host:
- return True
-
- # Also try Subject Alternative Names for a match
- san_list = None
- for i in moves.range(x509.get_extension_count()):
- ext = x509.get_extension(i)
- if ext.get_short_name() == 'subjectAltName':
- san_list = str(ext)
- for san in ''.join(san_list.split()).split(','):
- if san == "DNS:%s" % host:
- return True
-
- # Server certificate does not match host
- msg = ('Host "%s" does not match x509 certificate contents: '
- 'CommonName "%s"' % (host, x509.get_subject().commonName))
- if san_list is not None:
- msg = msg + ', subjectAltName "%s"' % san_list
- raise exc.SSLCertificateError(msg)
-
- def verify_callback(self, connection, x509, errnum,
- depth, preverify_ok):
- if x509.has_expired():
- msg = "SSL Certificate expired on '%s'" % x509.get_notAfter()
- raise exc.SSLCertificateError(msg)
-
- if depth == 0 and preverify_ok is True:
- # We verify that the host matches against the last
- # certificate in the chain
- return self.host_matches_cert(self.host, x509)
- else:
- # Pass through OpenSSL's default result
- return preverify_ok
-
- def setcontext(self):
- """
- Set up the OpenSSL context.
- """
- self.context = OpenSSL.SSL.Context(OpenSSL.SSL.SSLv23_METHOD)
-
- if self.ssl_compression is False:
- self.context.set_options(0x20000) # SSL_OP_NO_COMPRESSION
-
- if self.insecure is not True:
- self.context.set_verify(OpenSSL.SSL.VERIFY_PEER,
- self.verify_callback)
- else:
- self.context.set_verify(OpenSSL.SSL.VERIFY_NONE,
- self.verify_callback)
-
- if self.cert_file:
- try:
- self.context.use_certificate_file(self.cert_file)
- except Exception as e:
- msg = 'Unable to load cert from "%s" %s' % (self.cert_file, e)
- raise exc.SSLConfigurationError(msg)
- if self.key_file is None:
- # We support having key and cert in same file
- try:
- self.context.use_privatekey_file(self.cert_file)
- except Exception as e:
- msg = ('No key file specified and unable to load key '
- 'from "%s" %s' % (self.cert_file, e))
- raise exc.SSLConfigurationError(msg)
-
- if self.key_file:
- try:
- self.context.use_privatekey_file(self.key_file)
- except Exception as e:
- msg = 'Unable to load key from "%s" %s' % (self.key_file, e)
- raise exc.SSLConfigurationError(msg)
-
- if self.ca_certs:
- try:
- self.context.load_verify_locations(self.ca_certs)
- except Exception as e:
- msg = 'Unable to load CA from "%s"' % (self.ca_certs, e)
- raise exc.SSLConfigurationError(msg)
- else:
- self.context.set_default_verify_paths()
-
- def connect(self):
- """
- Connect to an SSL port using the OpenSSL library and apply
- per-connection parameters.
- """
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- if self.timeout is not None:
- # '0' microseconds
- sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVTIMEO,
- struct.pack('LL', self.timeout, 0))
- self.sock = OpenSSLConnectionDelegator(self.context, sock)
- self.sock.connect((self.host, self.port))
-
- def close(self):
- if self.sock:
- # Remove the reference to the socket but don't close it yet.
- # Response close will close both socket and associated
- # file. Closing socket too soon will cause response
- # reads to fail with socket IO error 'Bad file descriptor'.
- self.sock = None
- httplib.HTTPSConnection.close(self)
-
-
-class ResponseBodyIterator(object):
- """A class that acts as an iterator over an HTTP response."""
-
- def __init__(self, resp):
- self.resp = resp
-
- def __iter__(self):
- while True:
- yield next(self)
-
- def next(self):
- chunk = self.resp.read(CHUNKSIZE)
- if chunk:
- return chunk
- else:
- raise StopIteration()
-
- __next__ = next
diff --git a/neutron/tests/tempest/common/isolated_creds.py b/neutron/tests/tempest/common/isolated_creds.py
deleted file mode 100644
index 163ce8a..0000000
--- a/neutron/tests/tempest/common/isolated_creds.py
+++ /dev/null
@@ -1,392 +0,0 @@
-# Copyright 2013 IBM Corp.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import netaddr
-from oslo_log import log as logging
-from tempest_lib.common.utils import data_utils
-from tempest_lib import exceptions as lib_exc
-
-from neutron.tests.api import clients
-from neutron.tests.tempest.common import cred_provider
-from neutron.tests.tempest import config
-from neutron.tests.tempest import exceptions
-
-CONF = config.CONF
-LOG = logging.getLogger(__name__)
-
-
-class IsolatedCreds(cred_provider.CredentialProvider):
-
- def __init__(self, name, password='pass', network_resources=None):
- super(IsolatedCreds, self).__init__(name, password, network_resources)
- self.network_resources = network_resources
- self.isolated_creds = {}
- self.isolated_net_resources = {}
- self.ports = []
- self.password = password
- self.identity_admin_client, self.network_admin_client = (
- self._get_admin_clients())
-
- def _get_admin_clients(self):
- """
- Returns a tuple with instances of the following admin clients (in this
- order):
- identity
- network
- """
- os = clients.AdminManager()
- return os.identity_client, os.network_client
-
- def _create_tenant(self, name, description):
- tenant = self.identity_admin_client.create_tenant(
- name=name, description=description)
- return tenant
-
- def _get_tenant_by_name(self, name):
- tenant = self.identity_admin_client.get_tenant_by_name(name)
- return tenant
-
- def _create_user(self, username, password, tenant, email):
- user = self.identity_admin_client.create_user(
- username, password, tenant['id'], email)
- return user
-
- def _get_user(self, tenant, username):
- user = self.identity_admin_client.get_user_by_username(
- tenant['id'], username)
- return user
-
- def _list_roles(self):
- roles = self.identity_admin_client.list_roles()
- return roles
-
- def _assign_user_role(self, tenant, user, role_name):
- role = None
- try:
- roles = self._list_roles()
- role = next(r for r in roles if r['name'] == role_name)
- except StopIteration:
- msg = 'No "%s" role found' % role_name
- raise lib_exc.NotFound(msg)
- try:
- self.identity_admin_client.assign_user_role(tenant['id'],
- user['id'],
- role['id'])
- except lib_exc.Conflict:
- LOG.warning('Trying to add %s for user %s in tenant %s but they '
- ' were already granted that role' % (role_name,
- user['name'],
- tenant['name']))
-
- def _delete_user(self, user):
- self.identity_admin_client.delete_user(user)
-
- def _delete_tenant(self, tenant):
- if CONF.service_available.neutron:
- self._cleanup_default_secgroup(tenant)
- self.identity_admin_client.delete_tenant(tenant)
-
- def _create_creds(self, suffix="", admin=False, roles=None):
- """Create random credentials under the following schema.
-
- If the name contains a '.' is the full class path of something, and
- we don't really care. If it isn't, it's probably a meaningful name,
- so use it.
-
- For logging purposes, -user and -tenant are long and redundant,
- don't use them. The user# will be sufficient to figure it out.
- """
- if '.' in self.name:
- root = ""
- else:
- root = self.name
-
- tenant_name = data_utils.rand_name(root) + suffix
- tenant_desc = tenant_name + "-desc"
- tenant = self._create_tenant(name=tenant_name,
- description=tenant_desc)
-
- username = data_utils.rand_name(root) + suffix
- email = data_utils.rand_name(root) + suffix + "@example.com"
- user = self._create_user(username, self.password,
- tenant, email)
- if admin:
- self._assign_user_role(tenant, user, CONF.identity.admin_role)
- # Add roles specified in config file
- for conf_role in CONF.auth.tempest_roles:
- self._assign_user_role(tenant, user, conf_role)
- # Add roles requested by caller
- if roles:
- for role in roles:
- self._assign_user_role(tenant, user, role)
- return self._get_credentials(user, tenant)
-
- def _get_credentials(self, user, tenant):
- return cred_provider.get_credentials(
- username=user['name'], user_id=user['id'],
- tenant_name=tenant['name'], tenant_id=tenant['id'],
- password=self.password)
-
- def _create_network_resources(self, tenant_id):
- network = None
- subnet = None
- router = None
- # Make sure settings
- if self.network_resources:
- if self.network_resources['router']:
- if (not self.network_resources['subnet'] or
- not self.network_resources['network']):
- raise exceptions.InvalidConfiguration(
- 'A router requires a subnet and network')
- elif self.network_resources['subnet']:
- if not self.network_resources['network']:
- raise exceptions.InvalidConfiguration(
- 'A subnet requires a network')
- elif self.network_resources['dhcp']:
- raise exceptions.InvalidConfiguration('DHCP requires a subnet')
-
- data_utils.rand_name_root = data_utils.rand_name(self.name)
- if not self.network_resources or self.network_resources['network']:
- network_name = data_utils.rand_name_root + "-network"
- network = self._create_network(network_name, tenant_id)
- try:
- if not self.network_resources or self.network_resources['subnet']:
- subnet_name = data_utils.rand_name_root + "-subnet"
- subnet = self._create_subnet(subnet_name, tenant_id,
- network['id'])
- if not self.network_resources or self.network_resources['router']:
- router_name = data_utils.rand_name_root + "-router"
- router = self._create_router(router_name, tenant_id)
- self._add_router_interface(router['id'], subnet['id'])
- except Exception:
- if router:
- self._clear_isolated_router(router['id'], router['name'])
- if subnet:
- self._clear_isolated_subnet(subnet['id'], subnet['name'])
- if network:
- self._clear_isolated_network(network['id'], network['name'])
- raise
- return network, subnet, router
-
- def _create_network(self, name, tenant_id):
- resp_body = self.network_admin_client.create_network(
- name=name, tenant_id=tenant_id)
- return resp_body['network']
-
- def _create_subnet(self, subnet_name, tenant_id, network_id):
- base_cidr = netaddr.IPNetwork(CONF.network.tenant_network_cidr)
- mask_bits = CONF.network.tenant_network_mask_bits
- for subnet_cidr in base_cidr.subnet(mask_bits):
- try:
- if self.network_resources:
- resp_body = self.network_admin_client.\
- create_subnet(
- network_id=network_id, cidr=str(subnet_cidr),
- name=subnet_name,
- tenant_id=tenant_id,
- enable_dhcp=self.network_resources['dhcp'],
- ip_version=4)
- else:
- resp_body = self.network_admin_client.\
- create_subnet(network_id=network_id,
- cidr=str(subnet_cidr),
- name=subnet_name,
- tenant_id=tenant_id,
- ip_version=4)
- break
- except lib_exc.BadRequest as e:
- if 'overlaps with another subnet' not in str(e):
- raise
- else:
- message = 'Available CIDR for subnet creation could not be found'
- raise Exception(message)
- return resp_body['subnet']
-
- def _create_router(self, router_name, tenant_id):
- external_net_id = dict(
- network_id=CONF.network.public_network_id)
- resp_body = self.network_admin_client.create_router(
- router_name,
- external_gateway_info=external_net_id,
- tenant_id=tenant_id)
- return resp_body['router']
-
- def _add_router_interface(self, router_id, subnet_id):
- self.network_admin_client.add_router_interface_with_subnet_id(
- router_id, subnet_id)
-
- def get_primary_network(self):
- return self.isolated_net_resources.get('primary')[0]
-
- def get_primary_subnet(self):
- return self.isolated_net_resources.get('primary')[1]
-
- def get_primary_router(self):
- return self.isolated_net_resources.get('primary')[2]
-
- def get_admin_network(self):
- return self.isolated_net_resources.get('admin')[0]
-
- def get_admin_subnet(self):
- return self.isolated_net_resources.get('admin')[1]
-
- def get_admin_router(self):
- return self.isolated_net_resources.get('admin')[2]
-
- def get_alt_network(self):
- return self.isolated_net_resources.get('alt')[0]
-
- def get_alt_subnet(self):
- return self.isolated_net_resources.get('alt')[1]
-
- def get_alt_router(self):
- return self.isolated_net_resources.get('alt')[2]
-
- def get_credentials(self, credential_type):
- if self.isolated_creds.get(str(credential_type)):
- credentials = self.isolated_creds[str(credential_type)]
- else:
- if credential_type in ['primary', 'alt', 'admin']:
- is_admin = (credential_type == 'admin')
- credentials = self._create_creds(admin=is_admin)
- else:
- credentials = self._create_creds(roles=credential_type)
- self.isolated_creds[str(credential_type)] = credentials
- # Maintained until tests are ported
- LOG.info("Acquired isolated creds:\n credentials: %s"
- % credentials)
- if (CONF.service_available.neutron and
- not CONF.baremetal.driver_enabled):
- network, subnet, router = self._create_network_resources(
- credentials.tenant_id)
- self.isolated_net_resources[str(credential_type)] = (
- network, subnet, router,)
- LOG.info("Created isolated network resources for : \n"
- + " credentials: %s" % credentials)
- return credentials
-
- def get_primary_creds(self):
- return self.get_credentials('primary')
-
- def get_admin_creds(self):
- return self.get_credentials('admin')
-
- def get_alt_creds(self):
- return self.get_credentials('alt')
-
- def get_creds_by_roles(self, roles, force_new=False):
- roles = list(set(roles))
- # The roles list as a str will become the index as the dict key for
- # the created credentials set in the isolated_creds dict.
- exist_creds = self.isolated_creds.get(str(roles))
- # If force_new flag is True 2 cred sets with the same roles are needed
- # handle this by creating a separate index for old one to store it
- # separately for cleanup
- if exist_creds and force_new:
- new_index = str(roles) + '-' + str(len(self.isolated_creds))
- self.isolated_creds[new_index] = exist_creds
- del self.isolated_creds[str(roles)]
- # Handle isolated neutron resources if they exist too
- if CONF.service_available.neutron:
- exist_net = self.isolated_net_resources.get(str(roles))
- if exist_net:
- self.isolated_net_resources[new_index] = exist_net
- del self.isolated_net_resources[str(roles)]
- return self.get_credentials(roles)
-
- def _clear_isolated_router(self, router_id, router_name):
- net_client = self.network_admin_client
- try:
- net_client.delete_router(router_id)
- except lib_exc.NotFound:
- LOG.warn('router with name: %s not found for delete' %
- router_name)
-
- def _clear_isolated_subnet(self, subnet_id, subnet_name):
- net_client = self.network_admin_client
- try:
- net_client.delete_subnet(subnet_id)
- except lib_exc.NotFound:
- LOG.warn('subnet with name: %s not found for delete' %
- subnet_name)
-
- def _clear_isolated_network(self, network_id, network_name):
- net_client = self.network_admin_client
- try:
- net_client.delete_network(network_id)
- except lib_exc.NotFound:
- LOG.warn('network with name: %s not found for delete' %
- network_name)
-
- def _cleanup_default_secgroup(self, tenant):
- net_client = self.network_admin_client
- resp_body = net_client.list_security_groups(tenant_id=tenant,
- name="default")
- secgroups_to_delete = resp_body['security_groups']
- for secgroup in secgroups_to_delete:
- try:
- net_client.delete_security_group(secgroup['id'])
- except lib_exc.NotFound:
- LOG.warn('Security group %s, id %s not found for clean-up' %
- (secgroup['name'], secgroup['id']))
-
- def _clear_isolated_net_resources(self):
- net_client = self.network_admin_client
- for cred in self.isolated_net_resources:
- network, subnet, router = self.isolated_net_resources.get(cred)
- LOG.debug("Clearing network: %(network)s, "
- "subnet: %(subnet)s, router: %(router)s",
- {'network': network, 'subnet': subnet, 'router': router})
- if (not self.network_resources or
- self.network_resources.get('router')):
- try:
- net_client.remove_router_interface_with_subnet_id(
- router['id'], subnet['id'])
- except lib_exc.NotFound:
- LOG.warn('router with name: %s not found for delete' %
- router['name'])
- self._clear_isolated_router(router['id'], router['name'])
- if (not self.network_resources or
- self.network_resources.get('subnet')):
- self._clear_isolated_subnet(subnet['id'], subnet['name'])
- if (not self.network_resources or
- self.network_resources.get('network')):
- self._clear_isolated_network(network['id'], network['name'])
- self.isolated_net_resources = {}
-
- def clear_isolated_creds(self):
- if not self.isolated_creds:
- return
- self._clear_isolated_net_resources()
- for creds in self.isolated_creds.values():
- try:
- self._delete_user(creds.user_id)
- except lib_exc.NotFound:
- LOG.warn("user with name: %s not found for delete" %
- creds.username)
- try:
- self._delete_tenant(creds.tenant_id)
- except lib_exc.NotFound:
- LOG.warn("tenant with name: %s not found for delete" %
- creds.tenant_name)
- self.isolated_creds = {}
-
- def is_multi_user(self):
- return True
-
- def is_multi_tenant(self):
- return True
-
- def is_role_available(self, role):
- return True
diff --git a/neutron/tests/tempest/common/negative_rest_client.py b/neutron/tests/tempest/common/negative_rest_client.py
deleted file mode 100644
index 9058516..0000000
--- a/neutron/tests/tempest/common/negative_rest_client.py
+++ /dev/null
@@ -1,71 +0,0 @@
-# (c) 2014 Deutsche Telekom AG
-# Copyright 2014 Red Hat, Inc.
-# Copyright 2014 NEC Corporation
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from neutron.tests.tempest.common import service_client
-from neutron.tests.tempest import config
-
-CONF = config.CONF
-
-
-class NegativeRestClient(service_client.ServiceClient):
- """
- Version of RestClient that does not raise exceptions.
- """
- def __init__(self, auth_provider, service):
- region = self._get_region(service)
- super(NegativeRestClient, self).__init__(auth_provider,
- service, region)
-
- def _get_region(self, service):
- """
- Returns the region for a specific service
- """
- service_region = None
- for cfgname in dir(CONF._config):
- # Find all config.FOO.catalog_type and assume FOO is a service.
- cfg = getattr(CONF, cfgname)
- catalog_type = getattr(cfg, 'catalog_type', None)
- if catalog_type == service:
- service_region = getattr(cfg, 'region', None)
- if not service_region:
- service_region = CONF.identity.region
- return service_region
-
- def _error_checker(self, method, url,
- headers, body, resp, resp_body):
- pass
-
- def send_request(self, method, url_template, resources, body=None):
- url = url_template % tuple(resources)
- if method == "GET":
- resp, body = self.get(url)
- elif method == "POST":
- resp, body = self.post(url, body)
- elif method == "PUT":
- resp, body = self.put(url, body)
- elif method == "PATCH":
- resp, body = self.patch(url, body)
- elif method == "HEAD":
- resp, body = self.head(url)
- elif method == "DELETE":
- resp, body = self.delete(url)
- elif method == "COPY":
- resp, body = self.copy(url)
- else:
- assert False
-
- return resp, body
diff --git a/neutron/tests/tempest/common/service_client.py b/neutron/tests/tempest/common/service_client.py
deleted file mode 100644
index ed19e89..0000000
--- a/neutron/tests/tempest/common/service_client.py
+++ /dev/null
@@ -1,93 +0,0 @@
-# Copyright 2015 NEC Corporation. All rights reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from tempest_lib.common import rest_client
-
-from neutron.tests.tempest import config
-
-CONF = config.CONF
-
-
-class ServiceClient(rest_client.RestClient):
-
- def __init__(self, auth_provider, service, region,
- endpoint_type=None, build_interval=None, build_timeout=None,
- disable_ssl_certificate_validation=None, ca_certs=None,
- trace_requests=None):
-
- # TODO(oomichi): This params setting should be removed after all
- # service clients pass these values, and we can make ServiceClient
- # free from CONF values.
- dscv = (disable_ssl_certificate_validation or
- CONF.identity.disable_ssl_certificate_validation)
- params = {
- 'disable_ssl_certificate_validation': dscv,
- 'ca_certs': ca_certs or CONF.identity.ca_certificates_file,
- 'trace_requests': trace_requests or CONF.debug.trace_requests
- }
-
- if endpoint_type is not None:
- params.update({'endpoint_type': endpoint_type})
- if build_interval is not None:
- params.update({'build_interval': build_interval})
- if build_timeout is not None:
- params.update({'build_timeout': build_timeout})
- super(ServiceClient, self).__init__(auth_provider, service, region,
- **params)
-
-
-class ResponseBody(dict):
- """Class that wraps an http response and dict body into a single value.
-
- Callers that receive this object will normally use it as a dict but
- can extract the response if needed.
- """
-
- def __init__(self, response, body=None):
- body_data = body or {}
- self.update(body_data)
- self.response = response
-
- def __str__(self):
- body = super(ResponseBody, self).__str__()
- return "response: %s\nBody: %s" % (self.response, body)
-
-
-class ResponseBodyData(object):
- """Class that wraps an http response and string data into a single value.
- """
-
- def __init__(self, response, data):
- self.response = response
- self.data = data
-
- def __str__(self):
- return "response: %s\nBody: %s" % (self.response, self.data)
-
-
-class ResponseBodyList(list):
- """Class that wraps an http response and list body into a single value.
-
- Callers that receive this object will normally use it as a list but
- can extract the response if needed.
- """
-
- def __init__(self, response, body=None):
- body_data = body or []
- self.extend(body_data)
- self.response = response
-
- def __str__(self):
- body = super(ResponseBodyList, self).__str__()
- return "response: %s\nBody: %s" % (self.response, body)
diff --git a/neutron/tests/tempest/common/ssh.py b/neutron/tests/tempest/common/ssh.py
deleted file mode 100644
index 00febc6..0000000
--- a/neutron/tests/tempest/common/ssh.py
+++ /dev/null
@@ -1,152 +0,0 @@
-# Copyright 2012 OpenStack Foundation
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-
-import cStringIO
-import select
-import socket
-import time
-import warnings
-
-from oslo_log import log as logging
-import six
-
-from neutron.tests.tempest import exceptions
-
-
-with warnings.catch_warnings():
- warnings.simplefilter("ignore")
- import paramiko
-
-
-LOG = logging.getLogger(__name__)
-
-
-class Client(object):
-
- def __init__(self, host, username, password=None, timeout=300, pkey=None,
- channel_timeout=10, look_for_keys=False, key_filename=None):
- self.host = host
- self.username = username
- self.password = password
- if isinstance(pkey, six.string_types):
- pkey = paramiko.RSAKey.from_private_key(
- cStringIO.StringIO(str(pkey)))
- self.pkey = pkey
- self.look_for_keys = look_for_keys
- self.key_filename = key_filename
- self.timeout = int(timeout)
- self.channel_timeout = float(channel_timeout)
- self.buf_size = 1024
-
- def _get_ssh_connection(self, sleep=1.5, backoff=1):
- """Returns an ssh connection to the specified host."""
- bsleep = sleep
- ssh = paramiko.SSHClient()
- ssh.set_missing_host_key_policy(
- paramiko.AutoAddPolicy())
- _start_time = time.time()
- if self.pkey is not None:
- LOG.info("Creating ssh connection to '%s' as '%s'"
- " with public key authentication",
- self.host, self.username)
- else:
- LOG.info("Creating ssh connection to '%s' as '%s'"
- " with password %s",
- self.host, self.username, str(self.password))
- attempts = 0
- while True:
- try:
- ssh.connect(self.host, username=self.username,
- password=self.password,
- look_for_keys=self.look_for_keys,
- key_filename=self.key_filename,
- timeout=self.channel_timeout, pkey=self.pkey)
- LOG.info("ssh connection to %s@%s successfuly created",
- self.username, self.host)
- return ssh
- except (socket.error,
- paramiko.SSHException) as e:
- if self._is_timed_out(_start_time):
- LOG.exception("Failed to establish authenticated ssh"
- " connection to %s@%s after %d attempts",
- self.username, self.host, attempts)
- raise exceptions.SSHTimeout(host=self.host,
- user=self.username,
- password=self.password)
- bsleep += backoff
- attempts += 1
- LOG.warning("Failed to establish authenticated ssh"
- " connection to %s@%s (%s). Number attempts: %s."
- " Retry after %d seconds.",
- self.username, self.host, e, attempts, bsleep)
- time.sleep(bsleep)
-
- def _is_timed_out(self, start_time):
- return (time.time() - self.timeout) > start_time
-
- def exec_command(self, cmd):
- """
- Execute the specified command on the server.
-
- Note that this method is reading whole command outputs to memory, thus
- shouldn't be used for large outputs.
-
- :returns: data read from standard output of the command.
- :raises: SSHExecCommandFailed if command returns nonzero
- status. The exception contains command status stderr content.
- """
- ssh = self._get_ssh_connection()
- transport = ssh.get_transport()
- channel = transport.open_session()
- channel.fileno() # Register event pipe
- channel.exec_command(cmd)
- channel.shutdown_write()
- out_data = []
- err_data = []
- poll = select.poll()
- poll.register(channel, select.POLLIN)
- start_time = time.time()
-
- while True:
- ready = poll.poll(self.channel_timeout)
- if not any(ready):
- if not self._is_timed_out(start_time):
- continue
- raise exceptions.TimeoutException(
- "Command: '{0}' executed on host '{1}'.".format(
- cmd, self.host))
- if not ready[0]: # If there is nothing to read.
- continue
- out_chunk = err_chunk = None
- if channel.recv_ready():
- out_chunk = channel.recv(self.buf_size)
- out_data += out_chunk,
- if channel.recv_stderr_ready():
- err_chunk = channel.recv_stderr(self.buf_size)
- err_data += err_chunk,
- if channel.closed and not err_chunk and not out_chunk:
- break
- exit_status = channel.recv_exit_status()
- if 0 != exit_status:
- raise exceptions.SSHExecCommandFailed(
- command=cmd, exit_status=exit_status,
- strerror=''.join(err_data))
- return ''.join(out_data)
-
- def test_connection_auth(self):
- """Raises an exception when we can not connect to server via ssh."""
- connection = self._get_ssh_connection()
- connection.close()
diff --git a/neutron/tests/tempest/common/tempest_fixtures.py b/neutron/tests/tempest/common/tempest_fixtures.py
deleted file mode 100644
index d416857..0000000
--- a/neutron/tests/tempest/common/tempest_fixtures.py
+++ /dev/null
@@ -1,21 +0,0 @@
-# Copyright 2013 IBM Corp.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from oslo_concurrency.fixture import lockutils
-
-
-class LockFixture(lockutils.LockFixture):
- def __init__(self, name):
- super(LockFixture, self).__init__(name, 'tempest-')
diff --git a/neutron/tests/tempest/common/utils/__init__.py b/neutron/tests/tempest/common/utils/__init__.py
deleted file mode 100644
index 04d898d..0000000
--- a/neutron/tests/tempest/common/utils/__init__.py
+++ /dev/null
@@ -1,3 +0,0 @@
-PING_IPV4_COMMAND = 'ping -c 3 '
-PING_IPV6_COMMAND = 'ping6 -c 3 '
-PING_PACKET_LOSS_REGEX = '(\d{1,3})\.?\d*\% packet loss'
diff --git a/neutron/tests/tempest/common/utils/data_utils.py b/neutron/tests/tempest/common/utils/data_utils.py
deleted file mode 100644
index d441778..0000000
--- a/neutron/tests/tempest/common/utils/data_utils.py
+++ /dev/null
@@ -1,101 +0,0 @@
-# Copyright 2012 OpenStack Foundation
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import itertools
-import netaddr
-import random
-import uuid
-
-
-def rand_uuid():
- return str(uuid.uuid4())
-
-
-def rand_uuid_hex():
- return uuid.uuid4().hex
-
-
-def rand_name(name=''):
- randbits = str(random.randint(1, 0x7fffffff))
- if name:
- return name + '-' + randbits
- else:
- return randbits
-
-
-def rand_url():
- randbits = str(random.randint(1, 0x7fffffff))
- return 'https://url-' + randbits + '.com'
-
-
-def rand_int_id(start=0, end=0x7fffffff):
- return random.randint(start, end)
-
-
-def rand_mac_address():
- """Generate an Ethernet MAC address."""
- # NOTE(vish): We would prefer to use 0xfe here to ensure that linux
- # bridge mac addresses don't change, but it appears to
- # conflict with libvirt, so we use the next highest octet
- # that has the unicast and locally administered bits set
- # properly: 0xfa.
- # Discussion: https://bugs.launchpad.net/nova/+bug/921838
- mac = [0xfa, 0x16, 0x3e,
- random.randint(0x00, 0xff),
- random.randint(0x00, 0xff),
- random.randint(0x00, 0xff)]
- return ':'.join(["%02x" % x for x in mac])
-
-
-def parse_image_id(image_ref):
- """Return the image id from a given image ref."""
- return image_ref.rsplit('/')[-1]
-
-
-def arbitrary_string(size=4, base_text=None):
- """
- Return size characters from base_text, repeating the base_text infinitely
- if needed.
- """
- if not base_text:
- base_text = 'test'
- return ''.join(itertools.islice(itertools.cycle(base_text), size))
-
-
-def random_bytes(size=1024):
- """
- Return size randomly selected bytes as a string.
- """
- return ''.join([chr(random.randint(0, 255))
- for i in range(size)])
-
-
-def get_ipv6_addr_by_EUI64(cidr, mac):
- # Check if the prefix is IPv4 address
- is_ipv4 = netaddr.valid_ipv4(cidr)
- if is_ipv4:
- msg = "Unable to generate IP address by EUI64 for IPv4 prefix"
- raise TypeError(msg)
- try:
- eui64 = int(netaddr.EUI(mac).eui64())
- prefix = netaddr.IPNetwork(cidr)
- return netaddr.IPAddress(prefix.first + eui64 ^ (1 << 57))
- except (ValueError, netaddr.AddrFormatError):
- raise TypeError('Bad prefix or mac format for generating IPv6 '
- 'address by EUI-64: %(prefix)s, %(mac)s:'
- % {'prefix': cidr, 'mac': mac})
- except TypeError:
- raise TypeError('Bad prefix type for generate IPv6 address by '
- 'EUI-64: %s' % cidr)
diff --git a/neutron/tests/tempest/common/utils/file_utils.py b/neutron/tests/tempest/common/utils/file_utils.py
deleted file mode 100644
index 43083f4..0000000
--- a/neutron/tests/tempest/common/utils/file_utils.py
+++ /dev/null
@@ -1,23 +0,0 @@
-# Copyright 2012 OpenStack Foundation
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-
-def have_effective_read_access(path):
- try:
- fh = open(path, "rb")
- except IOError:
- return False
- fh.close()
- return True
diff --git a/neutron/tests/tempest/common/utils/misc.py b/neutron/tests/tempest/common/utils/misc.py
deleted file mode 100644
index b97dd86..0000000
--- a/neutron/tests/tempest/common/utils/misc.py
+++ /dev/null
@@ -1,87 +0,0 @@
-# Copyright 2012 OpenStack Foundation
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import inspect
-import re
-
-from oslo_log import log as logging
-
-LOG = logging.getLogger(__name__)
-
-
-def singleton(cls):
- """Simple wrapper for classes that should only have a single instance."""
- instances = {}
-
- def getinstance():
- if cls not in instances:
- instances[cls] = cls()
- return instances[cls]
- return getinstance
-
-
-def find_test_caller():
- """Find the caller class and test name.
-
- Because we know that the interesting things that call us are
- test_* methods, and various kinds of setUp / tearDown, we
- can look through the call stack to find appropriate methods,
- and the class we were in when those were called.
- """
- caller_name = None
- names = []
- frame = inspect.currentframe()
- is_cleanup = False
- # Start climbing the ladder until we hit a good method
- while True:
- try:
- frame = frame.f_back
- name = frame.f_code.co_name
- names.append(name)
- if re.search("^(test_|setUp|tearDown)", name):
- cname = ""
- if 'self' in frame.f_locals:
- cname = frame.f_locals['self'].__class__.__name__
- if 'cls' in frame.f_locals:
- cname = frame.f_locals['cls'].__name__
- caller_name = cname + ":" + name
- break
- elif re.search("^_run_cleanup", name):
- is_cleanup = True
- elif name == 'main':
- caller_name = 'main'
- break
- else:
- cname = ""
- if 'self' in frame.f_locals:
- cname = frame.f_locals['self'].__class__.__name__
- if 'cls' in frame.f_locals:
- cname = frame.f_locals['cls'].__name__
-
- # the fact that we are running cleanups is indicated pretty
- # deep in the stack, so if we see that we want to just
- # start looking for a real class name, and declare victory
- # once we do.
- if is_cleanup and cname:
- if not re.search("^RunTest", cname):
- caller_name = cname + ":_run_cleanups"
- break
- except Exception:
- break
- # prevents frame leaks
- del frame
- if caller_name is None:
- LOG.debug("Sane call name not found in %s" % names)
- return caller_name
diff --git a/neutron/tests/tempest/common/waiters.py b/neutron/tests/tempest/common/waiters.py
deleted file mode 100644
index caa9b37..0000000
--- a/neutron/tests/tempest/common/waiters.py
+++ /dev/null
@@ -1,161 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-
-import time
-
-from oslo_log import log as logging
-from tempest_lib.common.utils import misc as misc_utils
-
-from neutron.tests.tempest import config
-from neutron.tests.tempest import exceptions
-
-CONF = config.CONF
-LOG = logging.getLogger(__name__)
-
-
-# NOTE(afazekas): This function needs to know a token and a subject.
-def wait_for_server_status(client, server_id, status, ready_wait=True,
- extra_timeout=0, raise_on_error=True):
- """Waits for a server to reach a given status."""
-
- def _get_task_state(body):
- return body.get('OS-EXT-STS:task_state', None)
-
- # NOTE(afazekas): UNKNOWN status possible on ERROR
- # or in a very early stage.
- body = client.get_server(server_id)
- old_status = server_status = body['status']
- old_task_state = task_state = _get_task_state(body)
- start_time = int(time.time())
- timeout = client.build_timeout + extra_timeout
- while True:
- # NOTE(afazekas): Now the BUILD status only reached
- # between the UNKNOWN->ACTIVE transition.
- # TODO(afazekas): enumerate and validate the stable status set
- if status == 'BUILD' and server_status != 'UNKNOWN':
- return
- if server_status == status:
- if ready_wait:
- if status == 'BUILD':
- return
- # NOTE(afazekas): The instance is in "ready for action state"
- # when no task in progress
- # NOTE(afazekas): Converted to string because of the XML
- # responses
- if str(task_state) == "None":
- # without state api extension 3 sec usually enough
- time.sleep(CONF.compute.ready_wait)
- return
- else:
- return
-
- time.sleep(client.build_interval)
- body = client.get_server(server_id)
- server_status = body['status']
- task_state = _get_task_state(body)
- if (server_status != old_status) or (task_state != old_task_state):
- LOG.info('State transition "%s" ==> "%s" after %d second wait',
- '/'.join((old_status, str(old_task_state))),
- '/'.join((server_status, str(task_state))),
- time.time() - start_time)
- if (server_status == 'ERROR') and raise_on_error:
- if 'fault' in body:
- raise exceptions.BuildErrorException(body['fault'],
- server_id=server_id)
- else:
- raise exceptions.BuildErrorException(server_id=server_id)
-
- timed_out = int(time.time()) - start_time >= timeout
-
- if timed_out:
- expected_task_state = 'None' if ready_wait else 'n/a'
- message = ('Server %(server_id)s failed to reach %(status)s '
- 'status and task state "%(expected_task_state)s" '
- 'within the required time (%(timeout)s s).' %
- {'server_id': server_id,
- 'status': status,
- 'expected_task_state': expected_task_state,
- 'timeout': timeout})
- message += ' Current status: %s.' % server_status
- message += ' Current task state: %s.' % task_state
- caller = misc_utils.find_test_caller()
- if caller:
- message = '(%s) %s' % (caller, message)
- raise exceptions.TimeoutException(message)
- old_status = server_status
- old_task_state = task_state
-
-
-def wait_for_image_status(client, image_id, status):
- """Waits for an image to reach a given status.
-
- The client should have a get_image(image_id) method to get the image.
- The client should also have build_interval and build_timeout attributes.
- """
- image = client.get_image(image_id)
- start = int(time.time())
-
- while image['status'] != status:
- time.sleep(client.build_interval)
- image = client.get_image(image_id)
- status_curr = image['status']
- if status_curr == 'ERROR':
- raise exceptions.AddImageException(image_id=image_id)
-
- # check the status again to avoid a false negative where we hit
- # the timeout at the same time that the image reached the expected
- # status
- if status_curr == status:
- return
-
- if int(time.time()) - start >= client.build_timeout:
- message = ('Image %(image_id)s failed to reach %(status)s state'
- '(current state %(status_curr)s) '
- 'within the required time (%(timeout)s s).' %
- {'image_id': image_id,
- 'status': status,
- 'status_curr': status_curr,
- 'timeout': client.build_timeout})
- caller = misc_utils.find_test_caller()
- if caller:
- message = '(%s) %s' % (caller, message)
- raise exceptions.TimeoutException(message)
-
-
-def wait_for_bm_node_status(client, node_id, attr, status):
- """Waits for a baremetal node attribute to reach given status.
-
- The client should have a show_node(node_uuid) method to get the node.
- """
- _, node = client.show_node(node_id)
- start = int(time.time())
-
- while node[attr] != status:
- time.sleep(client.build_interval)
- _, node = client.show_node(node_id)
- status_curr = node[attr]
- if status_curr == status:
- return
-
- if int(time.time()) - start >= client.build_timeout:
- message = ('Node %(node_id)s failed to reach %(attr)s=%(status)s '
- 'within the required time (%(timeout)s s).' %
- {'node_id': node_id,
- 'attr': attr,
- 'status': status,
- 'timeout': client.build_timeout})
- message += ' Current state of %s: %s.' % (attr, status_curr)
- caller = misc_utils.find_test_caller()
- if caller:
- message = '(%s) %s' % (caller, message)
- raise exceptions.TimeoutException(message)
diff --git a/neutron/tests/tempest/config.py b/neutron/tests/tempest/config.py
index 06fd541..08b235a 100644
--- a/neutron/tests/tempest/config.py
+++ b/neutron/tests/tempest/config.py
@@ -1,6 +1,3 @@
-# Copyright 2012 OpenStack Foundation
-# All Rights Reserved.
-#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
@@ -13,1225 +10,20 @@
# License for the specific language governing permissions and limitations
# under the License.
-from __future__ import print_function
-
-import os
-
from oslo_config import cfg
-from oslo_log import log as logging
+from tempest import config
+
+CONF = config.CONF
-def register_opt_group(conf, opt_group, options):
- conf.register_group(opt_group)
- for opt in options:
- conf.register_opt(opt, group=opt_group.name)
-
-
-auth_group = cfg.OptGroup(name='auth',
- title="Options for authentication and credentials")
-
-
-AuthGroup = [
- cfg.StrOpt('test_accounts_file',
- default='etc/accounts.yaml',
- help="Path to the yaml file that contains the list of "
- "credentials to use for running tests"),
- cfg.BoolOpt('allow_tenant_isolation',
- default=True,
- help="Allows test cases to create/destroy tenants and "
- "users. This option requires that OpenStack Identity "
- "API admin credentials are known. If false, isolated "
- "test cases and parallel execution, can still be "
- "achieved configuring a list of test accounts",
- deprecated_opts=[cfg.DeprecatedOpt('allow_tenant_isolation',
- group='compute'),
- cfg.DeprecatedOpt('allow_tenant_isolation',
- group='orchestration')]),
- cfg.BoolOpt('locking_credentials_provider',
- default=False,
- help="If set to True it enables the Accounts provider, "
- "which locks credentials to allow for parallel execution "
- "with pre-provisioned accounts. It can only be used to "
- "run tests that ensure credentials cleanup happens. "
- "It requires at least `2 * CONC` distinct accounts "
- "configured in `test_accounts_file`, with CONC == the "
- "number of concurrent test processes."),
- cfg.ListOpt('tempest_roles',
- help="Roles to assign to all users created by tempest",
- default=[]),
- cfg.StrOpt('admin_username',
- help="Administrative Username to use for "
- "Keystone API requests."),
- cfg.StrOpt('admin_tenant_name',
- help="Administrative Tenant name to use for Keystone API "
- "requests."),
- cfg.StrOpt('admin_password',
- help="API key to use when authenticating as admin.",
- secret=True),
- cfg.StrOpt('admin_domain_name',
- help="Admin domain name for authentication (Keystone V3)."
- "The same domain applies to user and project"),
-]
-
-identity_group = cfg.OptGroup(name='identity',
- title="Keystone Configuration Options")
-
-IdentityGroup = [
- cfg.StrOpt('catalog_type',
- default='identity',
- help="Catalog type of the Identity service."),
- cfg.BoolOpt('disable_ssl_certificate_validation',
- default=False,
- help="Set to True if using self-signed SSL certificates."),
- cfg.StrOpt('ca_certificates_file',
- help='Specify a CA bundle file to use in verifying a '
- 'TLS (https) server certificate.'),
- cfg.StrOpt('uri',
- help="Full URI of the OpenStack Identity API (Keystone), v2"),
- cfg.StrOpt('uri_v3',
- help='Full URI of the OpenStack Identity API (Keystone), v3'),
- cfg.StrOpt('auth_version',
- default='v2',
- help="Identity API version to be used for authentication "
- "for API tests."),
- cfg.StrOpt('region',
- default='RegionOne',
- help="The identity region name to use. Also used as the other "
- "services' region name unless they are set explicitly. "
- "If no such region is found in the service catalog, the "
- "first found one is used."),
- cfg.StrOpt('endpoint_type',
- default='publicURL',
- choices=['public', 'admin', 'internal',
- 'publicURL', 'adminURL', 'internalURL'],
- help="The endpoint type to use for the identity service."),
- cfg.StrOpt('username',
- help="Username to use for Nova API requests."),
- cfg.StrOpt('tenant_name',
- help="Tenant name to use for Nova API requests."),
- cfg.StrOpt('admin_role',
- default='admin',
- help="Role required to administrate keystone."),
- cfg.StrOpt('password',
- help="API key to use when authenticating.",
- secret=True),
- cfg.StrOpt('domain_name',
- help="Domain name for authentication (Keystone V3)."
- "The same domain applies to user and project"),
- cfg.StrOpt('alt_username',
- help="Username of alternate user to use for Nova API "
- "requests."),
- cfg.StrOpt('alt_tenant_name',
- help="Alternate user's Tenant name to use for Nova API "
- "requests."),
- cfg.StrOpt('alt_password',
- help="API key to use when authenticating as alternate user.",
- secret=True),
- cfg.StrOpt('alt_domain_name',
- help="Alternate domain name for authentication (Keystone V3)."
- "The same domain applies to user and project"),
-]
-
-identity_feature_group = cfg.OptGroup(name='identity-feature-enabled',
- title='Enabled Identity Features')
-
-IdentityFeatureGroup = [
- cfg.BoolOpt('trust',
- default=True,
- help='Does the identity service have delegation and '
- 'impersonation enabled'),
- cfg.BoolOpt('api_v2',
- default=True,
- help='Is the v2 identity API enabled'),
- cfg.BoolOpt('api_v3',
- default=True,
- help='Is the v3 identity API enabled'),
-]
-
-compute_group = cfg.OptGroup(name='compute',
- title='Compute Service Options')
-
-ComputeGroup = [
- cfg.StrOpt('image_ref',
- help="Valid primary image reference to be used in tests. "
- "This is a required option"),
- cfg.StrOpt('image_ref_alt',
- help="Valid secondary image reference to be used in tests. "
- "This is a required option, but if only one image is "
- "available duplicate the value of image_ref above"),
- cfg.StrOpt('flavor_ref',
- default="1",
- help="Valid primary flavor to use in tests."),
- cfg.StrOpt('flavor_ref_alt',
- default="2",
- help='Valid secondary flavor to be used in tests.'),
- cfg.StrOpt('image_ssh_user',
- default="root",
- help="User name used to authenticate to an instance."),
- cfg.StrOpt('image_ssh_password',
- default="password",
- help="Password used to authenticate to an instance."),
- cfg.StrOpt('image_alt_ssh_user',
- default="root",
- help="User name used to authenticate to an instance using "
- "the alternate image."),
- cfg.StrOpt('image_alt_ssh_password',
- default="password",
- help="Password used to authenticate to an instance using "
- "the alternate image."),
- cfg.IntOpt('build_interval',
- default=1,
- help="Time in seconds between build status checks."),
- cfg.IntOpt('build_timeout',
- default=300,
- help="Timeout in seconds to wait for an instance to build. "
- "Other services that do not define build_timeout will "
- "inherit this value."),
- cfg.BoolOpt('run_ssh',
- default=False,
- help="Should the tests ssh to instances?"),
- cfg.StrOpt('ssh_auth_method',
- default='keypair',
- help="Auth method used for authenticate to the instance. "
- "Valid choices are: keypair, configured, adminpass. "
- "keypair: start the servers with an ssh keypair. "
- "configured: use the configured user and password. "
- "adminpass: use the injected adminPass. "
- "disabled: avoid using ssh when it is an option."),
- cfg.StrOpt('ssh_connect_method',
- default='fixed',
- help="How to connect to the instance? "
- "fixed: using the first ip belongs the fixed network "
- "floating: creating and using a floating ip"),
- cfg.StrOpt('ssh_user',
- default='root',
- help="User name used to authenticate to an instance."),
- cfg.IntOpt('ping_timeout',
- default=120,
- help="Timeout in seconds to wait for ping to "
- "succeed."),
- cfg.IntOpt('ssh_timeout',
- default=300,
- help="Timeout in seconds to wait for authentication to "
- "succeed."),
- cfg.IntOpt('ready_wait',
- default=0,
- help="Additional wait time for clean state, when there is "
- "no OS-EXT-STS extension available"),
- cfg.IntOpt('ssh_channel_timeout',
- default=60,
- help="Timeout in seconds to wait for output from ssh "
- "channel."),
- cfg.StrOpt('fixed_network_name',
- default='private',
- help="Name of the fixed network that is visible to all test "
- "tenants."),
- cfg.StrOpt('network_for_ssh',
- default='public',
- help="Network used for SSH connections. Ignored if "
- "use_floatingip_for_ssh=true or run_ssh=false."),
- cfg.IntOpt('ip_version_for_ssh',
- default=4,
- help="IP version used for SSH connections."),
- cfg.BoolOpt('use_floatingip_for_ssh',
- default=True,
- help="Does SSH use Floating IPs?"),
- cfg.StrOpt('catalog_type',
- default='compute',
- help="Catalog type of the Compute service."),
- cfg.StrOpt('region',
- default='',
- help="The compute region name to use. If empty, the value "
- "of identity.region is used instead. If no such region "
- "is found in the service catalog, the first found one is "
- "used."),
- cfg.StrOpt('endpoint_type',
- default='publicURL',
- choices=['public', 'admin', 'internal',
- 'publicURL', 'adminURL', 'internalURL'],
- help="The endpoint type to use for the compute service."),
- cfg.StrOpt('path_to_private_key',
- help="Path to a private key file for SSH access to remote "
- "hosts"),
- cfg.StrOpt('volume_device_name',
- default='vdb',
- help="Expected device name when a volume is attached to "
- "an instance"),
- cfg.IntOpt('shelved_offload_time',
- default=0,
- help='Time in seconds before a shelved instance is eligible '
- 'for removing from a host. -1 never offload, 0 offload '
- 'when shelved. This time should be the same as the time '
- 'of nova.conf, and some tests will run for as long as the '
- 'time.'),
- cfg.StrOpt('floating_ip_range',
- default='10.0.0.0/29',
- help='Unallocated floating IP range, which will be used to '
- 'test the floating IP bulk feature for CRUD operation. '
- 'This block must not overlap an existing floating IP '
- 'pool.')
-]
-
-compute_features_group = cfg.OptGroup(name='compute-feature-enabled',
- title="Enabled Compute Service Features")
-
-ComputeFeaturesGroup = [
- cfg.BoolOpt('disk_config',
- default=True,
- help="If false, skip disk config tests"),
- cfg.ListOpt('api_extensions',
- default=['all'],
- help='A list of enabled compute extensions with a special '
- 'entry all which indicates every extension is enabled. '
- 'Each extension should be specified with alias name. '
- 'Empty list indicates all extensions are disabled'),
- cfg.BoolOpt('change_password',
- default=False,
- help="Does the test environment support changing the admin "
- "password?"),
- cfg.BoolOpt('console_output',
- default=True,
- help="Does the test environment support obtaining instance "
- "serial console output?"),
- cfg.BoolOpt('resize',
- default=False,
- help="Does the test environment support resizing?"),
- cfg.BoolOpt('pause',
- default=True,
- help="Does the test environment support pausing?"),
- cfg.BoolOpt('shelve',
- default=True,
- help="Does the test environment support shelving/unshelving?"),
- cfg.BoolOpt('suspend',
- default=True,
- help="Does the test environment support suspend/resume?"),
- cfg.BoolOpt('live_migration',
- default=True,
- help="Does the test environment support live migration "
- "available?"),
- cfg.BoolOpt('block_migration_for_live_migration',
- default=False,
- help="Does the test environment use block devices for live "
- "migration"),
- cfg.BoolOpt('block_migrate_cinder_iscsi',
- default=False,
- help="Does the test environment block migration support "
- "cinder iSCSI volumes"),
- cfg.BoolOpt('vnc_console',
- default=False,
- help='Enable VNC console. This configuration value should '
- 'be same as [nova.vnc]->vnc_enabled in nova.conf'),
- cfg.BoolOpt('spice_console',
- default=False,
- help='Enable Spice console. This configuration value should '
- 'be same as [nova.spice]->enabled in nova.conf'),
- cfg.BoolOpt('rdp_console',
- default=False,
- help='Enable RDP console. This configuration value should '
- 'be same as [nova.rdp]->enabled in nova.conf'),
- cfg.BoolOpt('rescue',
- default=True,
- help='Does the test environment support instance rescue '
- 'mode?'),
- cfg.BoolOpt('enable_instance_password',
- default=True,
- help='Enables returning of the instance password by the '
- 'relevant server API calls such as create, rebuild '
- 'or rescue.'),
- cfg.BoolOpt('interface_attach',
- default=True,
- help='Does the test environment support dynamic network '
- 'interface attachment?'),
- cfg.BoolOpt('snapshot',
- default=True,
- help='Does the test environment support creating snapshot '
- 'images of running instances?'),
- cfg.BoolOpt('ec2_api',
- default=True,
- help='Does the test environment have the ec2 api running?')
-]
-
-
-image_group = cfg.OptGroup(name='image',
- title="Image Service Options")
-
-ImageGroup = [
- cfg.StrOpt('catalog_type',
- default='image',
- help='Catalog type of the Image service.'),
- cfg.StrOpt('region',
- default='',
- help="The image region name to use. If empty, the value "
- "of identity.region is used instead. If no such region "
- "is found in the service catalog, the first found one is "
- "used."),
- cfg.StrOpt('endpoint_type',
- default='publicURL',
- choices=['public', 'admin', 'internal',
- 'publicURL', 'adminURL', 'internalURL'],
- help="The endpoint type to use for the image service."),
- cfg.StrOpt('http_image',
- default='http://download.cirros-cloud.net/0.3.1/'
- 'cirros-0.3.1-x86_64-uec.tar.gz',
- help='http accessible image'),
- cfg.IntOpt('build_timeout',
- default=300,
- help="Timeout in seconds to wait for an image to "
- "become available."),
- cfg.IntOpt('build_interval',
- default=1,
- help="Time in seconds between image operation status "
- "checks.")
-]
-
-image_feature_group = cfg.OptGroup(name='image-feature-enabled',
- title='Enabled image service features')
-
-ImageFeaturesGroup = [
- cfg.BoolOpt('api_v2',
- default=True,
- help="Is the v2 image API enabled"),
- cfg.BoolOpt('api_v1',
- default=True,
- help="Is the v1 image API enabled"),
-]
-
-network_group = cfg.OptGroup(name='network',
- title='Network Service Options')
-
-NetworkGroup = [
- cfg.StrOpt('catalog_type',
- default='network',
- help='Catalog type of the Neutron service.'),
- cfg.StrOpt('region',
- default='',
- help="The network region name to use. If empty, the value "
- "of identity.region is used instead. If no such region "
- "is found in the service catalog, the first found one is "
- "used."),
- cfg.StrOpt('endpoint_type',
- default='publicURL',
- choices=['public', 'admin', 'internal',
- 'publicURL', 'adminURL', 'internalURL'],
- help="The endpoint type to use for the network service."),
- cfg.StrOpt('tenant_network_cidr',
- default="10.100.0.0/16",
- help="The cidr block to allocate tenant ipv4 subnets from"),
- cfg.IntOpt('tenant_network_mask_bits',
- default=28,
- help="The mask bits for tenant ipv4 subnets"),
- cfg.StrOpt('tenant_network_v6_cidr',
- default="2003::/48",
- help="The cidr block to allocate tenant ipv6 subnets from"),
- cfg.IntOpt('tenant_network_v6_mask_bits',
- default=64,
- help="The mask bits for tenant ipv6 subnets"),
- cfg.BoolOpt('tenant_networks_reachable',
- default=False,
- help="Whether tenant network connectivity should be "
- "evaluated directly"),
- cfg.StrOpt('public_network_id',
- default="",
- help="Id of the public network that provides external "
- "connectivity"),
- cfg.StrOpt('public_router_id',
- default="",
- help="Id of the public router that provides external "
- "connectivity. This should only be used when Neutron's "
- "'allow_overlapping_ips' is set to 'False' in "
- "neutron.conf. usually not needed past 'Grizzly' release"),
- cfg.IntOpt('build_timeout',
- default=300,
- help="Timeout in seconds to wait for network operation to "
- "complete."),
- cfg.IntOpt('build_interval',
- default=1,
- help="Time in seconds between network operation status "
- "checks."),
- cfg.ListOpt('dns_servers',
- default=["8.8.8.8", "8.8.4.4"],
- help="List of dns servers which should be used"
- " for subnet creation"),
- cfg.StrOpt('port_vnic_type',
- choices=[None, 'normal', 'direct', 'macvtap'],
- help="vnic_type to use when Launching instances"
- " with pre-configured ports."
- " Supported ports are:"
- " ['normal','direct','macvtap']"),
-]
-
-network_feature_group = cfg.OptGroup(name='network-feature-enabled',
- title='Enabled network service features')
-
-NetworkFeaturesGroup = [
- cfg.BoolOpt('ipv6',
- default=True,
- help="Allow the execution of IPv6 tests"),
- cfg.ListOpt('api_extensions',
- default=['all'],
- help='A list of enabled network extensions with a special '
- 'entry all which indicates every extension is enabled. '
- 'Empty list indicates all extensions are disabled'),
- cfg.BoolOpt('ipv6_subnet_attributes',
- default=False,
- help="Allow the execution of IPv6 subnet tests that use "
- "the extended IPv6 attributes ipv6_ra_mode "
- "and ipv6_address_mode"
- ),
+NeutronPluginOptions = [
cfg.BoolOpt('specify_floating_ip_address_available',
default=True,
help='Allow passing an IP Address of the floating ip when '
- 'creating the floating ip'),
-]
+ 'creating the floating ip')]
-messaging_group = cfg.OptGroup(name='messaging',
- title='Messaging Service')
-
-MessagingGroup = [
- cfg.StrOpt('catalog_type',
- default='messaging',
- help='Catalog type of the Messaging service.'),
- cfg.IntOpt('max_queues_per_page',
- default=20,
- help='The maximum number of queue records per page when '
- 'listing queues'),
- cfg.IntOpt('max_queue_metadata',
- default=65536,
- help='The maximum metadata size for a queue'),
- cfg.IntOpt('max_messages_per_page',
- default=20,
- help='The maximum number of queue message per page when '
- 'listing (or) posting messages'),
- cfg.IntOpt('max_message_size',
- default=262144,
- help='The maximum size of a message body'),
- cfg.IntOpt('max_messages_per_claim',
- default=20,
- help='The maximum number of messages per claim'),
- cfg.IntOpt('max_message_ttl',
- default=1209600,
- help='The maximum ttl for a message'),
- cfg.IntOpt('max_claim_ttl',
- default=43200,
- help='The maximum ttl for a claim'),
- cfg.IntOpt('max_claim_grace',
- default=43200,
- help='The maximum grace period for a claim'),
-]
-
-volume_group = cfg.OptGroup(name='volume',
- title='Block Storage Options')
-
-VolumeGroup = [
- cfg.IntOpt('build_interval',
- default=1,
- help='Time in seconds between volume availability checks.'),
- cfg.IntOpt('build_timeout',
- default=300,
- help='Timeout in seconds to wait for a volume to become '
- 'available.'),
- cfg.StrOpt('catalog_type',
- default='volume',
- help="Catalog type of the Volume Service"),
- cfg.StrOpt('region',
- default='',
- help="The volume region name to use. If empty, the value "
- "of identity.region is used instead. If no such region "
- "is found in the service catalog, the first found one is "
- "used."),
- cfg.StrOpt('endpoint_type',
- default='publicURL',
- choices=['public', 'admin', 'internal',
- 'publicURL', 'adminURL', 'internalURL'],
- help="The endpoint type to use for the volume service."),
- cfg.StrOpt('backend1_name',
- default='BACKEND_1',
- help="Name of the backend1 (must be declared in cinder.conf)"),
- cfg.StrOpt('backend2_name',
- default='BACKEND_2',
- help="Name of the backend2 (must be declared in cinder.conf)"),
- cfg.StrOpt('storage_protocol',
- default='iSCSI',
- help='Backend protocol to target when creating volume types'),
- cfg.StrOpt('vendor_name',
- default='Open Source',
- help='Backend vendor to target when creating volume types'),
- cfg.StrOpt('disk_format',
- default='raw',
- help='Disk format to use when copying a volume to image'),
- cfg.IntOpt('volume_size',
- default=1,
- help='Default size in GB for volumes created by volumes tests'),
-]
-
-volume_feature_group = cfg.OptGroup(name='volume-feature-enabled',
- title='Enabled Cinder Features')
-
-VolumeFeaturesGroup = [
- cfg.BoolOpt('multi_backend',
- default=False,
- help="Runs Cinder multi-backend test (requires 2 backends)"),
- cfg.BoolOpt('backup',
- default=True,
- help='Runs Cinder volumes backup test'),
- cfg.BoolOpt('snapshot',
- default=True,
- help='Runs Cinder volume snapshot test'),
- cfg.ListOpt('api_extensions',
- default=['all'],
- help='A list of enabled volume extensions with a special '
- 'entry all which indicates every extension is enabled. '
- 'Empty list indicates all extensions are disabled'),
- cfg.BoolOpt('api_v1',
- default=True,
- help="Is the v1 volume API enabled"),
- cfg.BoolOpt('api_v2',
- default=True,
- help="Is the v2 volume API enabled"),
-]
-
-
-object_storage_group = cfg.OptGroup(name='object-storage',
- title='Object Storage Service Options')
-
-ObjectStoreGroup = [
- cfg.StrOpt('catalog_type',
- default='object-store',
- help="Catalog type of the Object-Storage service."),
- cfg.StrOpt('region',
- default='',
- help="The object-storage region name to use. If empty, the "
- "value of identity.region is used instead. If no such "
- "region is found in the service catalog, the first found "
- "one is used."),
- cfg.StrOpt('endpoint_type',
- default='publicURL',
- choices=['public', 'admin', 'internal',
- 'publicURL', 'adminURL', 'internalURL'],
- help="The endpoint type to use for the object-store service."),
- cfg.IntOpt('container_sync_timeout',
- default=600,
- help="Number of seconds to time on waiting for a container "
- "to container synchronization complete."),
- cfg.IntOpt('container_sync_interval',
- default=5,
- help="Number of seconds to wait while looping to check the "
- "status of a container to container synchronization"),
- cfg.StrOpt('operator_role',
- default='Member',
- help="Role to add to users created for swift tests to "
- "enable creating containers"),
- cfg.StrOpt('reseller_admin_role',
- default='ResellerAdmin',
- help="User role that has reseller admin"),
- cfg.StrOpt('realm_name',
- default='realm1',
- help="Name of sync realm. A sync realm is a set of clusters "
- "that have agreed to allow container syncing with each "
- "other. Set the same realm name as Swift's "
- "container-sync-realms.conf"),
- cfg.StrOpt('cluster_name',
- default='name1',
- help="One name of cluster which is set in the realm whose name "
- "is set in 'realm_name' item in this file. Set the "
- "same cluster name as Swift's container-sync-realms.conf"),
-]
-
-object_storage_feature_group = cfg.OptGroup(
- name='object-storage-feature-enabled',
- title='Enabled object-storage features')
-
-ObjectStoreFeaturesGroup = [
- cfg.ListOpt('discoverable_apis',
- default=['all'],
- help="A list of the enabled optional discoverable apis. "
- "A single entry, all, indicates that all of these "
- "features are expected to be enabled"),
- cfg.BoolOpt('container_sync',
- default=True,
- help="Execute (old style) container-sync tests"),
- cfg.BoolOpt('object_versioning',
- default=True,
- help="Execute object-versioning tests"),
- cfg.BoolOpt('discoverability',
- default=True,
- help="Execute discoverability tests"),
-]
-
-database_group = cfg.OptGroup(name='database',
- title='Database Service Options')
-
-DatabaseGroup = [
- cfg.StrOpt('catalog_type',
- default='database',
- help="Catalog type of the Database service."),
- cfg.StrOpt('db_flavor_ref',
- default="1",
- help="Valid primary flavor to use in database tests."),
- cfg.StrOpt('db_current_version',
- default="v1.0",
- help="Current database version to use in database tests."),
-]
-
-orchestration_group = cfg.OptGroup(name='orchestration',
- title='Orchestration Service Options')
-
-OrchestrationGroup = [
- cfg.StrOpt('catalog_type',
- default='orchestration',
- help="Catalog type of the Orchestration service."),
- cfg.StrOpt('region',
- default='',
- help="The orchestration region name to use. If empty, the "
- "value of identity.region is used instead. If no such "
- "region is found in the service catalog, the first found "
- "one is used."),
- cfg.StrOpt('endpoint_type',
- default='publicURL',
- choices=['public', 'admin', 'internal',
- 'publicURL', 'adminURL', 'internalURL'],
- help="The endpoint type to use for the orchestration service."),
- cfg.IntOpt('build_interval',
- default=1,
- help="Time in seconds between build status checks."),
- cfg.IntOpt('build_timeout',
- default=1200,
- help="Timeout in seconds to wait for a stack to build."),
- cfg.StrOpt('instance_type',
- default='m1.micro',
- help="Instance type for tests. Needs to be big enough for a "
- "full OS plus the test workload"),
- cfg.StrOpt('keypair_name',
- help="Name of existing keypair to launch servers with."),
- cfg.IntOpt('max_template_size',
- default=524288,
- help="Value must match heat configuration of the same name."),
- cfg.IntOpt('max_resources_per_stack',
- default=1000,
- help="Value must match heat configuration of the same name."),
-]
-
-
-telemetry_group = cfg.OptGroup(name='telemetry',
- title='Telemetry Service Options')
-
-TelemetryGroup = [
- cfg.StrOpt('catalog_type',
- default='metering',
- help="Catalog type of the Telemetry service."),
- cfg.StrOpt('endpoint_type',
- default='publicURL',
- choices=['public', 'admin', 'internal',
- 'publicURL', 'adminURL', 'internalURL'],
- help="The endpoint type to use for the telemetry service."),
- cfg.BoolOpt('too_slow_to_test',
- default=True,
- help="This variable is used as flag to enable "
- "notification tests")
-]
-
-
-dashboard_group = cfg.OptGroup(name="dashboard",
- title="Dashboard options")
-
-DashboardGroup = [
- cfg.StrOpt('dashboard_url',
- default='http://localhost/',
- help="Where the dashboard can be found"),
- cfg.StrOpt('login_url',
- default='http://localhost/auth/login/',
- help="Login page for the dashboard"),
-]
-
-
-data_processing_group = cfg.OptGroup(name="data_processing",
- title="Data Processing options")
-
-DataProcessingGroup = [
- cfg.StrOpt('catalog_type',
- default='data_processing',
- help="Catalog type of the data processing service."),
- cfg.StrOpt('endpoint_type',
- default='publicURL',
- choices=['public', 'admin', 'internal',
- 'publicURL', 'adminURL', 'internalURL'],
- help="The endpoint type to use for the data processing "
- "service."),
-]
-
-
-data_processing_feature_group = cfg.OptGroup(
- name="data_processing-feature-enabled",
- title="Enabled Data Processing features")
-
-DataProcessingFeaturesGroup = [
- cfg.ListOpt('plugins',
- default=["vanilla", "hdp"],
- help="List of enabled data processing plugins")
-]
-
-
-boto_group = cfg.OptGroup(name='boto',
- title='EC2/S3 options')
-BotoGroup = [
- cfg.StrOpt('ec2_url',
- default="http://localhost:8773/services/Cloud",
- help="EC2 URL"),
- cfg.StrOpt('s3_url',
- default="http://localhost:8080",
- help="S3 URL"),
- cfg.StrOpt('aws_secret',
- help="AWS Secret Key",
- secret=True),
- cfg.StrOpt('aws_access',
- help="AWS Access Key"),
- cfg.StrOpt('aws_zone',
- default="nova",
- help="AWS Zone for EC2 tests"),
- cfg.StrOpt('s3_materials_path',
- default="/opt/stack/devstack/files/images/"
- "s3-materials/cirros-0.3.0",
- help="S3 Materials Path"),
- cfg.StrOpt('ari_manifest',
- default="cirros-0.3.0-x86_64-initrd.manifest.xml",
- help="ARI Ramdisk Image manifest"),
- cfg.StrOpt('ami_manifest',
- default="cirros-0.3.0-x86_64-blank.img.manifest.xml",
- help="AMI Machine Image manifest"),
- cfg.StrOpt('aki_manifest',
- default="cirros-0.3.0-x86_64-vmlinuz.manifest.xml",
- help="AKI Kernel Image manifest"),
- cfg.StrOpt('instance_type',
- default="m1.tiny",
- help="Instance type"),
- cfg.IntOpt('http_socket_timeout',
- default=3,
- help="boto Http socket timeout"),
- cfg.IntOpt('num_retries',
- default=1,
- help="boto num_retries on error"),
- cfg.IntOpt('build_timeout',
- default=60,
- help="Status Change Timeout"),
- cfg.IntOpt('build_interval',
- default=1,
- help="Status Change Test Interval"),
-]
-
-stress_group = cfg.OptGroup(name='stress', title='Stress Test Options')
-
-StressGroup = [
- cfg.StrOpt('nova_logdir',
- help='Directory containing log files on the compute nodes'),
- cfg.IntOpt('max_instances',
- default=16,
- help='Maximum number of instances to create during test.'),
- cfg.StrOpt('controller',
- help='Controller host.'),
- # new stress options
- cfg.StrOpt('target_controller',
- help='Controller host.'),
- cfg.StrOpt('target_ssh_user',
- help='ssh user.'),
- cfg.StrOpt('target_private_key_path',
- help='Path to private key.'),
- cfg.StrOpt('target_logfiles',
- help='regexp for list of log files.'),
- cfg.IntOpt('log_check_interval',
- default=60,
- help='time (in seconds) between log file error checks.'),
- cfg.IntOpt('default_thread_number_per_action',
- default=4,
- help='The number of threads created while stress test.'),
- cfg.BoolOpt('leave_dirty_stack',
- default=False,
- help='Prevent the cleaning (tearDownClass()) between'
- ' each stress test run if an exception occurs'
- ' during this run.'),
- cfg.BoolOpt('full_clean_stack',
- default=False,
- help='Allows a full cleaning process after a stress test.'
- ' Caution : this cleanup will remove every objects of'
- ' every tenant.')
-]
-
-
-scenario_group = cfg.OptGroup(name='scenario', title='Scenario Test Options')
-
-ScenarioGroup = [
- cfg.StrOpt('img_dir',
- default='/opt/stack/new/devstack/files/images/'
- 'cirros-0.3.1-x86_64-uec',
- help='Directory containing image files'),
- cfg.StrOpt('img_file', deprecated_name='qcow2_img_file',
- default='cirros-0.3.1-x86_64-disk.img',
- help='Image file name'),
- cfg.StrOpt('img_disk_format',
- default='qcow2',
- help='Image disk format'),
- cfg.StrOpt('img_container_format',
- default='bare',
- help='Image container format'),
- cfg.StrOpt('ami_img_file',
- default='cirros-0.3.1-x86_64-blank.img',
- help='AMI image file name'),
- cfg.StrOpt('ari_img_file',
- default='cirros-0.3.1-x86_64-initrd',
- help='ARI image file name'),
- cfg.StrOpt('aki_img_file',
- default='cirros-0.3.1-x86_64-vmlinuz',
- help='AKI image file name'),
- cfg.StrOpt('ssh_user',
- default='cirros',
- help='ssh username for the image file'),
- cfg.IntOpt(
- 'large_ops_number',
- default=0,
- help="specifies how many resources to request at once. Used "
- "for large operations testing."),
- # TODO(yfried): add support for dhcpcd
- cfg.StrOpt('dhcp_client',
- default='udhcpc',
- choices=["udhcpc", "dhclient"],
- help='DHCP client used by images to renew DCHP lease. '
- 'If left empty, update operation will be skipped. '
- 'Supported clients: "udhcpc", "dhclient"')
-]
-
-
-service_available_group = cfg.OptGroup(name="service_available",
- title="Available OpenStack Services")
-
-ServiceAvailableGroup = [
- cfg.BoolOpt('cinder',
- default=True,
- help="Whether or not cinder is expected to be available"),
- cfg.BoolOpt('neutron',
- default=False,
- help="Whether or not neutron is expected to be available"),
- cfg.BoolOpt('glance',
- default=True,
- help="Whether or not glance is expected to be available"),
- cfg.BoolOpt('swift',
- default=True,
- help="Whether or not swift is expected to be available"),
- cfg.BoolOpt('nova',
- default=True,
- help="Whether or not nova is expected to be available"),
- cfg.BoolOpt('heat',
- default=False,
- help="Whether or not Heat is expected to be available"),
- cfg.BoolOpt('ceilometer',
- default=True,
- help="Whether or not Ceilometer is expected to be available"),
- cfg.BoolOpt('horizon',
- default=True,
- help="Whether or not Horizon is expected to be available"),
- cfg.BoolOpt('sahara',
- default=False,
- help="Whether or not Sahara is expected to be available"),
- cfg.BoolOpt('ironic',
- default=False,
- help="Whether or not Ironic is expected to be available"),
- cfg.BoolOpt('trove',
- default=False,
- help="Whether or not Trove is expected to be available"),
- cfg.BoolOpt('zaqar',
- default=False,
- help="Whether or not Zaqar is expected to be available"),
-]
-
-debug_group = cfg.OptGroup(name="debug",
- title="Debug System")
-
-DebugGroup = [
- cfg.StrOpt('trace_requests',
- default='',
- help="""A regex to determine which requests should be traced.
-
-This is a regex to match the caller for rest client requests to be able to
-selectively trace calls out of specific classes and methods. It largely
-exists for test development, and is not expected to be used in a real deploy
-of tempest. This will be matched against the discovered ClassName:method
-in the test environment.
-
-Expected values for this field are:
-
- * ClassName:test_method_name - traces one test_method
- * ClassName:setUp(Class) - traces specific setup functions
- * ClassName:tearDown(Class) - traces specific teardown functions
- * ClassName:_run_cleanups - traces the cleanup functions
-
-If nothing is specified, this feature is not enabled. To trace everything
-specify .* as the regex.
-""")
-]
-
-input_scenario_group = cfg.OptGroup(name="input-scenario",
- title="Filters and values for"
- " input scenarios")
-
-InputScenarioGroup = [
- cfg.StrOpt('image_regex',
- default='^cirros-0.3.1-x86_64-uec$',
- help="Matching images become parameters for scenario tests"),
- cfg.StrOpt('flavor_regex',
- default='^m1.nano$',
- help="Matching flavors become parameters for scenario tests"),
- cfg.StrOpt('non_ssh_image_regex',
- default='^.*[Ww]in.*$',
- help="SSH verification in tests is skipped"
- "for matching images"),
- cfg.StrOpt('ssh_user_regex',
- default="[[\"^.*[Cc]irros.*$\", \"root\"]]",
- help="List of user mapped to regex "
- "to matching image names."),
-]
-
-
-baremetal_group = cfg.OptGroup(name='baremetal',
- title='Baremetal provisioning service options',
- help='When enabling baremetal tests, Nova '
- 'must be configured to use the Ironic '
- 'driver. The following paremeters for the '
- '[compute] section must be disabled: '
- 'console_output, interface_attach, '
- 'live_migration, pause, rescue, resize '
- 'shelve, snapshot, and suspend')
-
-BaremetalGroup = [
- cfg.StrOpt('catalog_type',
- default='baremetal',
- help="Catalog type of the baremetal provisioning service"),
- cfg.BoolOpt('driver_enabled',
- default=False,
- help="Whether the Ironic nova-compute driver is enabled"),
- cfg.StrOpt('driver',
- default='fake',
- help="Driver name which Ironic uses"),
- cfg.StrOpt('endpoint_type',
- default='publicURL',
- choices=['public', 'admin', 'internal',
- 'publicURL', 'adminURL', 'internalURL'],
- help="The endpoint type to use for the baremetal provisioning "
- "service"),
- cfg.IntOpt('active_timeout',
- default=300,
- help="Timeout for Ironic node to completely provision"),
- cfg.IntOpt('association_timeout',
- default=30,
- help="Timeout for association of Nova instance and Ironic "
- "node"),
- cfg.IntOpt('power_timeout',
- default=60,
- help="Timeout for Ironic power transitions."),
- cfg.IntOpt('unprovision_timeout',
- default=60,
- help="Timeout for unprovisioning an Ironic node.")
-]
-
-cli_group = cfg.OptGroup(name='cli', title="cli Configuration Options")
-
-CLIGroup = [
- cfg.BoolOpt('enabled',
- default=True,
- help="enable cli tests"),
- cfg.StrOpt('cli_dir',
- default='/usr/local/bin',
- help="directory where python client binaries are located"),
- cfg.BoolOpt('has_manage',
- default=True,
- help=("Whether the tempest run location has access to the "
- "*-manage commands. In a pure blackbox environment "
- "it will not.")),
- cfg.IntOpt('timeout',
- default=15,
- help="Number of seconds to wait on a CLI timeout"),
-]
-
-negative_group = cfg.OptGroup(name='negative', title="Negative Test Options")
-
-NegativeGroup = [
- cfg.StrOpt('test_generator',
- default='tempest.common.' +
- 'generator.negative_generator.NegativeTestGenerator',
- help="Test generator class for all negative tests"),
-]
-
-_opts = [
- (auth_group, AuthGroup),
- (compute_group, ComputeGroup),
- (compute_features_group, ComputeFeaturesGroup),
- (identity_group, IdentityGroup),
- (identity_feature_group, IdentityFeatureGroup),
- (image_group, ImageGroup),
- (image_feature_group, ImageFeaturesGroup),
- (network_group, NetworkGroup),
- (network_feature_group, NetworkFeaturesGroup),
- (messaging_group, MessagingGroup),
- (volume_group, VolumeGroup),
- (volume_feature_group, VolumeFeaturesGroup),
- (object_storage_group, ObjectStoreGroup),
- (object_storage_feature_group, ObjectStoreFeaturesGroup),
- (database_group, DatabaseGroup),
- (orchestration_group, OrchestrationGroup),
- (telemetry_group, TelemetryGroup),
- (dashboard_group, DashboardGroup),
- (data_processing_group, DataProcessingGroup),
- (data_processing_feature_group, DataProcessingFeaturesGroup),
- (boto_group, BotoGroup),
- (stress_group, StressGroup),
- (scenario_group, ScenarioGroup),
- (service_available_group, ServiceAvailableGroup),
- (debug_group, DebugGroup),
- (baremetal_group, BaremetalGroup),
- (input_scenario_group, InputScenarioGroup),
- (cli_group, CLIGroup),
- (negative_group, NegativeGroup)
-]
-
-
-def register_opts():
- for g, o in _opts:
- register_opt_group(cfg.CONF, g, o)
-
-
-def list_opts():
- """Return a list of oslo.config options available.
-
- The purpose of this is to allow tools like the Oslo sample config file
- generator to discover the options exposed to users.
- """
- return [(g.name, o) for g, o in _opts]
-
-
-# this should never be called outside of this class
-class TempestConfigPrivate(object):
- """Provides OpenStack configuration information."""
-
- DEFAULT_CONFIG_DIR = os.path.join(
- os.path.abspath(os.path.dirname(os.path.dirname(__file__))),
- "etc")
-
- DEFAULT_CONFIG_FILE = "tempest.conf"
-
- def __getattr__(self, attr):
- # Handles config options from the default group
- return getattr(cfg.CONF, attr)
-
- def _set_attrs(self):
- self.auth = cfg.CONF.auth
- self.compute = cfg.CONF.compute
- self.compute_feature_enabled = cfg.CONF['compute-feature-enabled']
- self.identity = cfg.CONF.identity
- self.identity_feature_enabled = cfg.CONF['identity-feature-enabled']
- self.image = cfg.CONF.image
- self.image_feature_enabled = cfg.CONF['image-feature-enabled']
- self.network = cfg.CONF.network
- self.network_feature_enabled = cfg.CONF['network-feature-enabled']
- self.volume = cfg.CONF.volume
- self.volume_feature_enabled = cfg.CONF['volume-feature-enabled']
- self.object_storage = cfg.CONF['object-storage']
- self.object_storage_feature_enabled = cfg.CONF[
- 'object-storage-feature-enabled']
- self.database = cfg.CONF.database
- self.orchestration = cfg.CONF.orchestration
- self.messaging = cfg.CONF.messaging
- self.telemetry = cfg.CONF.telemetry
- self.dashboard = cfg.CONF.dashboard
- self.data_processing = cfg.CONF.data_processing
- self.data_processing_feature_enabled = cfg.CONF[
- 'data_processing-feature-enabled']
- self.boto = cfg.CONF.boto
- self.stress = cfg.CONF.stress
- self.scenario = cfg.CONF.scenario
- self.service_available = cfg.CONF.service_available
- self.debug = cfg.CONF.debug
- self.baremetal = cfg.CONF.baremetal
- self.input_scenario = cfg.CONF['input-scenario']
- self.cli = cfg.CONF.cli
- self.negative = cfg.CONF.negative
-
- self.identity.admin_username = self.auth.admin_username
- self.identity.admin_password = self.auth.admin_password
- self.identity.admin_tenant_name = self.auth.admin_tenant_name
- self.identity.admin_domain_name = self.auth.admin_domain_name
- self.identity.password = self.auth.admin_password
- self.identity.tenant_name = 'demo'
- self.identity.username = 'demo'
- self.identity.alt_username = 'alt_demo'
- self.identity.alt_tenant_name = 'alt_demo'
- self.identity.alt_password = self.auth.admin_password
-
- cfg.CONF.set_default('domain_name', self.identity.admin_domain_name,
- group='identity')
- cfg.CONF.set_default('alt_domain_name',
- self.identity.admin_domain_name,
- group='identity')
-
- def __init__(self, parse_conf=True, config_path=None):
- """Initialize a configuration from a conf directory and conf file."""
- super(TempestConfigPrivate, self).__init__()
- config_files = []
- failsafe_path = "/etc/tempest/" + self.DEFAULT_CONFIG_FILE
-
- if config_path:
- path = config_path
- else:
- # Environment variables override defaults...
- conf_dir = os.environ.get('TEMPEST_CONFIG_DIR',
- self.DEFAULT_CONFIG_DIR)
- conf_file = os.environ.get('TEMPEST_CONFIG',
- self.DEFAULT_CONFIG_FILE)
-
- path = os.path.join(conf_dir, conf_file)
-
- if not os.path.isfile(path):
- path = failsafe_path
-
- # only parse the config file if we expect one to exist. This is needed
- # to remove an issue with the config file up to date checker.
- if parse_conf:
- config_files.append(path)
- logging.register_options(cfg.CONF)
- if os.path.isfile(path):
- cfg.CONF([], project='tempest', default_config_files=config_files)
- else:
- cfg.CONF([], project='tempest')
- logging.setup(cfg.CONF, 'tempest')
- LOG = logging.getLogger('tempest')
- LOG.info("Using tempest config file %s" % path)
- register_opts()
- self._set_attrs()
- if parse_conf:
- cfg.CONF.log_opt_values(LOG, logging.DEBUG)
-
-
-class TempestConfigProxy(object):
- _config = None
- _path = None
-
- _extra_log_defaults = [
- ('keystoneclient.session', logging.INFO),
- ('paramiko.transport', logging.INFO),
- ('requests.packages.urllib3.connectionpool', logging.WARN),
- ]
-
- def _fix_log_levels(self):
- """Tweak the oslo log defaults."""
- for name, level in self._extra_log_defaults:
- logging.getLogger(name).logger.setLevel(level)
-
- def __getattr__(self, attr):
- if not self._config:
- self._fix_log_levels()
- self._config = TempestConfigPrivate(config_path=self._path)
-
- return getattr(self._config, attr)
-
- def set_config_path(self, path):
- self._path = path
-
-
-CONF = TempestConfigProxy()
+# TODO(amuller): Redo configuration options registration as part of the planned
+# transition to the Tempest plugin architecture
+for opt in NeutronPluginOptions:
+ CONF.register_opt(opt, 'neutron_plugin_options')
diff --git a/neutron/tests/tempest/exceptions.py b/neutron/tests/tempest/exceptions.py
index db66bba..bdc3b8d 100644
--- a/neutron/tests/tempest/exceptions.py
+++ b/neutron/tests/tempest/exceptions.py
@@ -13,42 +13,9 @@
# License for the specific language governing permissions and limitations
# under the License.
-import testtools
+from tempest_lib import exceptions
-
-class TempestException(Exception):
- """
- Base Tempest Exception
-
- To correctly use this class, inherit from it and define
- a 'message' property. That message will get printf'd
- with the keyword arguments provided to the constructor.
- """
- message = "An unknown exception occurred"
-
- def __init__(self, *args, **kwargs):
- super(TempestException, self).__init__()
- try:
- self._error_string = self.message % kwargs
- except Exception:
- # at least get the core message out if something happened
- self._error_string = self.message
- if len(args) > 0:
- # If there is a non-kwarg parameter, assume it's the error
- # message or reason description and tack it on to the end
- # of the exception message
- # Convert all arguments into their string representations...
- args = ["%s" % arg for arg in args]
- self._error_string = (self._error_string +
- "\nDetails: %s" % '\n'.join(args))
-
- def __str__(self):
- return self._error_string
-
-
-class RestClientException(TempestException,
- testtools.TestCase.failureException):
- pass
+TempestException = exceptions.TempestException
class InvalidConfiguration(TempestException):
@@ -61,134 +28,3 @@
class InvalidServiceTag(TempestException):
message = "Invalid service tag"
-
-
-class InvalidIdentityVersion(TempestException):
- message = "Invalid version %(identity_version)s of the identity service"
-
-
-class TimeoutException(TempestException):
- message = "Request timed out"
-
-
-class BuildErrorException(TempestException):
- message = "Server %(server_id)s failed to build and is in ERROR status"
-
-
-class ImageKilledException(TempestException):
- message = "Image %(image_id)s 'killed' while waiting for '%(status)s'"
-
-
-class AddImageException(TempestException):
- message = "Image %(image_id)s failed to become ACTIVE in the allotted time"
-
-
-class EC2RegisterImageException(TempestException):
- message = ("Image %(image_id)s failed to become 'available' "
- "in the allotted time")
-
-
-class VolumeBuildErrorException(TempestException):
- message = "Volume %(volume_id)s failed to build and is in ERROR status"
-
-
-class SnapshotBuildErrorException(TempestException):
- message = "Snapshot %(snapshot_id)s failed to build and is in ERROR status"
-
-
-class VolumeBackupException(TempestException):
- message = "Volume backup %(backup_id)s failed and is in ERROR status"
-
-
-class StackBuildErrorException(TempestException):
- message = ("Stack %(stack_identifier)s is in %(stack_status)s status "
- "due to '%(stack_status_reason)s'")
-
-
-class StackResourceBuildErrorException(TempestException):
- message = ("Resource %(resource_name)s in stack %(stack_identifier)s is "
- "in %(resource_status)s status due to "
- "'%(resource_status_reason)s'")
-
-
-class AuthenticationFailure(TempestException):
- message = ("Authentication with user %(user)s and password "
- "%(password)s failed auth using tenant %(tenant)s.")
-
-
-class EndpointNotFound(TempestException):
- message = "Endpoint not found"
-
-
-class ImageFault(TempestException):
- message = "Got image fault"
-
-
-class IdentityError(TempestException):
- message = "Got identity error"
-
-
-class SSHTimeout(TempestException):
- message = ("Connection to the %(host)s via SSH timed out.\n"
- "User: %(user)s, Password: %(password)s")
-
-
-class SSHExecCommandFailed(TempestException):
- """Raised when remotely executed command returns nonzero status."""
- message = ("Command '%(command)s', exit status: %(exit_status)d, "
- "Error:\n%(strerror)s")
-
-
-class ServerUnreachable(TempestException):
- message = "The server is not reachable via the configured network"
-
-
-class TearDownException(TempestException):
- message = "%(num)d cleanUp operation failed"
-
-
-class RFCViolation(RestClientException):
- message = "RFC Violation"
-
-
-class InvalidHttpSuccessCode(RestClientException):
- message = "The success code is different than the expected one"
-
-
-class BadRequest(RestClientException):
- message = "Bad request"
-
-
-class ResponseWithNonEmptyBody(RFCViolation):
- message = ("RFC Violation! Response with %(status)d HTTP Status Code "
- "MUST NOT have a body")
-
-
-class ResponseWithEntity(RFCViolation):
- message = ("RFC Violation! Response with 205 HTTP Status Code "
- "MUST NOT have an entity")
-
-
-class InvalidHTTPResponseHeader(RestClientException):
- message = "HTTP response header is invalid"
-
-
-class InvalidStructure(TempestException):
- message = "Invalid structure of table with details"
-
-
-class CommandFailed(Exception):
- def __init__(self, returncode, cmd, output, stderr):
- super(CommandFailed, self).__init__()
- self.returncode = returncode
- self.cmd = cmd
- self.stdout = output
- self.stderr = stderr
-
- def __str__(self):
- return ("Command '%s' returned non-zero exit status %d.\n"
- "stdout:\n%s\n"
- "stderr:\n%s" % (self.cmd,
- self.returncode,
- self.stdout,
- self.stderr))
diff --git a/neutron/tests/tempest/manager.py b/neutron/tests/tempest/manager.py
deleted file mode 100644
index 969bf98..0000000
--- a/neutron/tests/tempest/manager.py
+++ /dev/null
@@ -1,74 +0,0 @@
-# Copyright 2012 OpenStack Foundation
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from neutron.tests.tempest import auth
-from neutron.tests.tempest.common import cred_provider
-from neutron.tests.tempest import config
-from neutron.tests.tempest import exceptions
-
-CONF = config.CONF
-
-
-class Manager(object):
-
- """
- Base manager class
-
- Manager objects are responsible for providing a configuration object
- and a client object for a test case to use in performing actions.
- """
-
- def __init__(self, credentials=None):
- """
- We allow overriding of the credentials used within the various
- client classes managed by the Manager object. Left as None, the
- standard username/password/tenant_name[/domain_name] is used.
-
- :param credentials: Override of the credentials
- """
- self.auth_version = CONF.identity.auth_version
- if credentials is None:
- self.credentials = cred_provider.get_configured_credentials('user')
- else:
- self.credentials = credentials
- # Check if passed or default credentials are valid
- if not self.credentials.is_valid():
- raise exceptions.InvalidCredentials()
- # Creates an auth provider for the credentials
- self.auth_provider = get_auth_provider(self.credentials)
- # FIXME(andreaf) unused
- self.client_attr_names = []
-
-
-def get_auth_provider_class(credentials):
- if isinstance(credentials, auth.KeystoneV3Credentials):
- return auth.KeystoneV3AuthProvider, CONF.identity.uri_v3
- else:
- return auth.KeystoneV2AuthProvider, CONF.identity.uri
-
-
-def get_auth_provider(credentials):
- default_params = {
- 'disable_ssl_certificate_validation':
- CONF.identity.disable_ssl_certificate_validation,
- 'ca_certs': CONF.identity.ca_certificates_file,
- 'trace_requests': CONF.debug.trace_requests
- }
- if credentials is None:
- raise exceptions.InvalidCredentials(
- 'Credentials must be specified')
- auth_provider_class, auth_url = get_auth_provider_class(
- credentials)
- return auth_provider_class(credentials, auth_url, **default_params)
diff --git a/neutron/tests/tempest/services/botoclients.py b/neutron/tests/tempest/services/botoclients.py
deleted file mode 100644
index 87d5266..0000000
--- a/neutron/tests/tempest/services/botoclients.py
+++ /dev/null
@@ -1,216 +0,0 @@
-# Copyright 2012 OpenStack Foundation
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from six.moves import configparser as ConfigParser
-import contextlib
-from tempest_lib import exceptions as lib_exc
-import types
-import urlparse
-
-from neutron.tests.tempest import config
-
-import boto
-import boto.ec2
-import boto.s3.connection
-
-CONF = config.CONF
-
-
-class BotoClientBase(object):
-
- ALLOWED_METHODS = set()
-
- def __init__(self, identity_client):
- self.identity_client = identity_client
-
- self.ca_cert = CONF.identity.ca_certificates_file
- self.connection_timeout = str(CONF.boto.http_socket_timeout)
- self.num_retries = str(CONF.boto.num_retries)
- self.build_timeout = CONF.boto.build_timeout
-
- self.connection_data = {}
-
- def _config_boto_timeout(self, timeout, retries):
- try:
- boto.config.add_section("Boto")
- except ConfigParser.DuplicateSectionError:
- pass
- boto.config.set("Boto", "http_socket_timeout", timeout)
- boto.config.set("Boto", "num_retries", retries)
-
- def _config_boto_ca_certificates_file(self, ca_cert):
- if ca_cert is None:
- return
-
- try:
- boto.config.add_section("Boto")
- except ConfigParser.DuplicateSectionError:
- pass
- boto.config.set("Boto", "ca_certificates_file", ca_cert)
-
- def __getattr__(self, name):
- """Automatically creates methods for the allowed methods set."""
- if name in self.ALLOWED_METHODS:
- def func(self, *args, **kwargs):
- with contextlib.closing(self.get_connection()) as conn:
- return getattr(conn, name)(*args, **kwargs)
-
- func.__name__ = name
- setattr(self, name, types.MethodType(func, self, self.__class__))
- setattr(self.__class__, name,
- types.MethodType(func, None, self.__class__))
- return getattr(self, name)
- else:
- raise AttributeError(name)
-
- def get_connection(self):
- self._config_boto_timeout(self.connection_timeout, self.num_retries)
- self._config_boto_ca_certificates_file(self.ca_cert)
-
- ec2_client_args = {'aws_access_key_id': CONF.boto.aws_access,
- 'aws_secret_access_key': CONF.boto.aws_secret}
- if not all(ec2_client_args.values()):
- ec2_client_args = self.get_aws_credentials(self.identity_client)
-
- self.connection_data.update(ec2_client_args)
- return self.connect_method(**self.connection_data)
-
- def get_aws_credentials(self, identity_client):
- """
- Obtain existing, or create new AWS credentials
- :param identity_client: identity client with embedded credentials
- :return: EC2 credentials
- """
- ec2_cred_list = identity_client.list_user_ec2_credentials(
- identity_client.user_id)
- for cred in ec2_cred_list:
- if cred['tenant_id'] == identity_client.tenant_id:
- ec2_cred = cred
- break
- else:
- ec2_cred = identity_client.create_user_ec2_credentials(
- identity_client.user_id, identity_client.tenant_id)
- if not all((ec2_cred, ec2_cred['access'], ec2_cred['secret'])):
- raise lib_exc.NotFound("Unable to get access and secret keys")
- else:
- ec2_cred_aws = {}
- ec2_cred_aws['aws_access_key_id'] = ec2_cred['access']
- ec2_cred_aws['aws_secret_access_key'] = ec2_cred['secret']
- return ec2_cred_aws
-
-
-class APIClientEC2(BotoClientBase):
-
- def connect_method(self, *args, **kwargs):
- return boto.connect_ec2(*args, **kwargs)
-
- def __init__(self, identity_client):
- super(APIClientEC2, self).__init__(identity_client)
- insecure_ssl = CONF.identity.disable_ssl_certificate_validation
- purl = urlparse.urlparse(CONF.boto.ec2_url)
-
- region_name = CONF.compute.region
- if not region_name:
- region_name = CONF.identity.region
- region = boto.ec2.regioninfo.RegionInfo(name=region_name,
- endpoint=purl.hostname)
- port = purl.port
- if port is None:
- if purl.scheme is not "https":
- port = 80
- else:
- port = 443
- else:
- port = int(port)
- self.connection_data.update({"is_secure": purl.scheme == "https",
- "validate_certs": not insecure_ssl,
- "region": region,
- "host": purl.hostname,
- "port": port,
- "path": purl.path})
-
- ALLOWED_METHODS = set(('create_key_pair', 'get_key_pair',
- 'delete_key_pair', 'import_key_pair',
- 'get_all_key_pairs',
- 'get_all_tags',
- 'create_image', 'get_image',
- 'register_image', 'deregister_image',
- 'get_all_images', 'get_image_attribute',
- 'modify_image_attribute', 'reset_image_attribute',
- 'get_all_kernels',
- 'create_volume', 'delete_volume',
- 'get_all_volume_status', 'get_all_volumes',
- 'get_volume_attribute', 'modify_volume_attribute'
- 'bundle_instance', 'cancel_spot_instance_requests',
- 'confirm_product_instanc',
- 'get_all_instance_status', 'get_all_instances',
- 'get_all_reserved_instances',
- 'get_all_spot_instance_requests',
- 'get_instance_attribute', 'monitor_instance',
- 'monitor_instances', 'unmonitor_instance',
- 'unmonitor_instances',
- 'purchase_reserved_instance_offering',
- 'reboot_instances', 'request_spot_instances',
- 'reset_instance_attribute', 'run_instances',
- 'start_instances', 'stop_instances',
- 'terminate_instances',
- 'attach_network_interface', 'attach_volume',
- 'detach_network_interface', 'detach_volume',
- 'get_console_output',
- 'delete_network_interface', 'create_subnet',
- 'create_network_interface', 'delete_subnet',
- 'get_all_network_interfaces',
- 'allocate_address', 'associate_address',
- 'disassociate_address', 'get_all_addresses',
- 'release_address',
- 'create_snapshot', 'delete_snapshot',
- 'get_all_snapshots', 'get_snapshot_attribute',
- 'modify_snapshot_attribute',
- 'reset_snapshot_attribute', 'trim_snapshots',
- 'get_all_regions', 'get_all_zones',
- 'get_all_security_groups', 'create_security_group',
- 'delete_security_group', 'authorize_security_group',
- 'authorize_security_group_egress',
- 'revoke_security_group',
- 'revoke_security_group_egress'))
-
-
-class ObjectClientS3(BotoClientBase):
-
- def connect_method(self, *args, **kwargs):
- return boto.connect_s3(*args, **kwargs)
-
- def __init__(self, identity_client):
- super(ObjectClientS3, self).__init__(identity_client)
- insecure_ssl = CONF.identity.disable_ssl_certificate_validation
- purl = urlparse.urlparse(CONF.boto.s3_url)
- port = purl.port
- if port is None:
- if purl.scheme is not "https":
- port = 80
- else:
- port = 443
- else:
- port = int(port)
- self.connection_data.update({"is_secure": purl.scheme == "https",
- "validate_certs": not insecure_ssl,
- "host": purl.hostname,
- "port": port,
- "calling_format": boto.s3.connection.
- OrdinaryCallingFormat()})
-
- ALLOWED_METHODS = set(('create_bucket', 'delete_bucket', 'generate_url',
- 'get_all_buckets', 'get_bucket', 'delete_key',
- 'lookup'))
diff --git a/neutron/tests/tempest/services/identity/__init__.py b/neutron/tests/tempest/services/identity/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/neutron/tests/tempest/services/identity/__init__.py
+++ /dev/null
diff --git a/neutron/tests/tempest/services/identity/v2/__init__.py b/neutron/tests/tempest/services/identity/v2/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/neutron/tests/tempest/services/identity/v2/__init__.py
+++ /dev/null
diff --git a/neutron/tests/tempest/services/identity/v2/json/__init__.py b/neutron/tests/tempest/services/identity/v2/json/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/neutron/tests/tempest/services/identity/v2/json/__init__.py
+++ /dev/null
diff --git a/neutron/tests/tempest/services/identity/v2/json/identity_client.py b/neutron/tests/tempest/services/identity/v2/json/identity_client.py
deleted file mode 100644
index 46e8f87..0000000
--- a/neutron/tests/tempest/services/identity/v2/json/identity_client.py
+++ /dev/null
@@ -1,283 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from oslo_serialization import jsonutils as json
-from tempest_lib import exceptions as lib_exc
-
-from neutron.tests.tempest.common import service_client
-
-
-class IdentityClientJSON(service_client.ServiceClient):
-
- def has_admin_extensions(self):
- """
- Returns True if the KSADM Admin Extensions are supported
- False otherwise
- """
- if hasattr(self, '_has_admin_extensions'):
- return self._has_admin_extensions
- # Try something that requires admin
- try:
- self.list_roles()
- self._has_admin_extensions = True
- except Exception:
- self._has_admin_extensions = False
- return self._has_admin_extensions
-
- def create_role(self, name):
- """Create a role."""
- post_body = {
- 'name': name,
- }
- post_body = json.dumps({'role': post_body})
- resp, body = self.post('OS-KSADM/roles', post_body)
- self.expected_success(200, resp.status)
- return service_client.ResponseBody(resp, self._parse_resp(body))
-
- def get_role(self, role_id):
- """Get a role by its id."""
- resp, body = self.get('OS-KSADM/roles/%s' % role_id)
- self.expected_success(200, resp.status)
- body = json.loads(body)
- return service_client.ResponseBody(resp, body['role'])
-
- def create_tenant(self, name, **kwargs):
- """
- Create a tenant
- name (required): New tenant name
- description: Description of new tenant (default is none)
- enabled <true|false>: Initial tenant status (default is true)
- """
- post_body = {
- 'name': name,
- 'description': kwargs.get('description', ''),
- 'enabled': kwargs.get('enabled', True),
- }
- post_body = json.dumps({'tenant': post_body})
- resp, body = self.post('tenants', post_body)
- self.expected_success(200, resp.status)
- return service_client.ResponseBody(resp, self._parse_resp(body))
-
- def delete_role(self, role_id):
- """Delete a role."""
- resp, body = self.delete('OS-KSADM/roles/%s' % str(role_id))
- self.expected_success(204, resp.status)
- return resp, body
-
- def list_user_roles(self, tenant_id, user_id):
- """Returns a list of roles assigned to a user for a tenant."""
- url = '/tenants/%s/users/%s/roles' % (tenant_id, user_id)
- resp, body = self.get(url)
- self.expected_success(200, resp.status)
- return service_client.ResponseBodyList(resp, self._parse_resp(body))
-
- def assign_user_role(self, tenant_id, user_id, role_id):
- """Add roles to a user on a tenant."""
- resp, body = self.put('/tenants/%s/users/%s/roles/OS-KSADM/%s' %
- (tenant_id, user_id, role_id), "")
- self.expected_success(200, resp.status)
- return service_client.ResponseBody(resp, self._parse_resp(body))
-
- def remove_user_role(self, tenant_id, user_id, role_id):
- """Removes a role assignment for a user on a tenant."""
- resp, body = self.delete('/tenants/%s/users/%s/roles/OS-KSADM/%s' %
- (tenant_id, user_id, role_id))
- self.expected_success(204, resp.status)
- return service_client.ResponseBody(resp, body)
-
- def delete_tenant(self, tenant_id):
- """Delete a tenant."""
- resp, body = self.delete('tenants/%s' % str(tenant_id))
- self.expected_success(204, resp.status)
- return service_client.ResponseBody(resp, body)
-
- def get_tenant(self, tenant_id):
- """Get tenant details."""
- resp, body = self.get('tenants/%s' % str(tenant_id))
- self.expected_success(200, resp.status)
- return service_client.ResponseBody(resp, self._parse_resp(body))
-
- def list_roles(self):
- """Returns roles."""
- resp, body = self.get('OS-KSADM/roles')
- self.expected_success(200, resp.status)
- return service_client.ResponseBodyList(resp, self._parse_resp(body))
-
- def list_tenants(self):
- """Returns tenants."""
- resp, body = self.get('tenants')
- self.expected_success(200, resp.status)
- body = json.loads(body)
- return service_client.ResponseBodyList(resp, body['tenants'])
-
- def get_tenant_by_name(self, tenant_name):
- tenants = self.list_tenants()
- for tenant in tenants:
- if tenant['name'] == tenant_name:
- return tenant
- raise lib_exc.NotFound('No such tenant')
-
- def update_tenant(self, tenant_id, **kwargs):
- """Updates a tenant."""
- body = self.get_tenant(tenant_id)
- name = kwargs.get('name', body['name'])
- desc = kwargs.get('description', body['description'])
- en = kwargs.get('enabled', body['enabled'])
- post_body = {
- 'id': tenant_id,
- 'name': name,
- 'description': desc,
- 'enabled': en,
- }
- post_body = json.dumps({'tenant': post_body})
- resp, body = self.post('tenants/%s' % tenant_id, post_body)
- self.expected_success(200, resp.status)
- return service_client.ResponseBody(resp, self._parse_resp(body))
-
- def create_user(self, name, password, tenant_id, email, **kwargs):
- """Create a user."""
- post_body = {
- 'name': name,
- 'password': password,
- 'email': email
- }
- if tenant_id is not None:
- post_body['tenantId'] = tenant_id
- if kwargs.get('enabled') is not None:
- post_body['enabled'] = kwargs.get('enabled')
- post_body = json.dumps({'user': post_body})
- resp, body = self.post('users', post_body)
- self.expected_success(200, resp.status)
- return service_client.ResponseBody(resp, self._parse_resp(body))
-
- def update_user(self, user_id, **kwargs):
- """Updates a user."""
- put_body = json.dumps({'user': kwargs})
- resp, body = self.put('users/%s' % user_id, put_body)
- self.expected_success(200, resp.status)
- return service_client.ResponseBody(resp, self._parse_resp(body))
-
- def get_user(self, user_id):
- """GET a user."""
- resp, body = self.get("users/%s" % user_id)
- self.expected_success(200, resp.status)
- return service_client.ResponseBody(resp, self._parse_resp(body))
-
- def delete_user(self, user_id):
- """Delete a user."""
- resp, body = self.delete("users/%s" % user_id)
- self.expected_success(204, resp.status)
- return service_client.ResponseBody(resp, body)
-
- def get_users(self):
- """Get the list of users."""
- resp, body = self.get("users")
- self.expected_success(200, resp.status)
- return service_client.ResponseBodyList(resp, self._parse_resp(body))
-
- def enable_disable_user(self, user_id, enabled):
- """Enables or disables a user."""
- put_body = {
- 'enabled': enabled
- }
- put_body = json.dumps({'user': put_body})
- resp, body = self.put('users/%s/enabled' % user_id, put_body)
- self.expected_success(200, resp.status)
- return service_client.ResponseBody(resp, self._parse_resp(body))
-
- def get_token(self, token_id):
- """Get token details."""
- resp, body = self.get("tokens/%s" % token_id)
- self.expected_success(200, resp.status)
- return service_client.ResponseBody(resp, self._parse_resp(body))
-
- def delete_token(self, token_id):
- """Delete a token."""
- resp, body = self.delete("tokens/%s" % token_id)
- self.expected_success(204, resp.status)
- return service_client.ResponseBody(resp, body)
-
- def list_users_for_tenant(self, tenant_id):
- """List users for a Tenant."""
- resp, body = self.get('/tenants/%s/users' % tenant_id)
- self.expected_success(200, resp.status)
- return service_client.ResponseBodyList(resp, self._parse_resp(body))
-
- def get_user_by_username(self, tenant_id, username):
- users = self.list_users_for_tenant(tenant_id)
- for user in users:
- if user['name'] == username:
- return user
- raise lib_exc.NotFound('No such user')
-
- def create_service(self, name, type, **kwargs):
- """Create a service."""
- post_body = {
- 'name': name,
- 'type': type,
- 'description': kwargs.get('description')
- }
- post_body = json.dumps({'OS-KSADM:service': post_body})
- resp, body = self.post('/OS-KSADM/services', post_body)
- self.expected_success(200, resp.status)
- return service_client.ResponseBody(resp, self._parse_resp(body))
-
- def get_service(self, service_id):
- """Get Service."""
- url = '/OS-KSADM/services/%s' % service_id
- resp, body = self.get(url)
- self.expected_success(200, resp.status)
- return service_client.ResponseBody(resp, self._parse_resp(body))
-
- def list_services(self):
- """List Service - Returns Services."""
- resp, body = self.get('/OS-KSADM/services')
- self.expected_success(200, resp.status)
- return service_client.ResponseBodyList(resp, self._parse_resp(body))
-
- def delete_service(self, service_id):
- """Delete Service."""
- url = '/OS-KSADM/services/%s' % service_id
- resp, body = self.delete(url)
- self.expected_success(204, resp.status)
- return service_client.ResponseBody(resp, body)
-
- def update_user_password(self, user_id, new_pass):
- """Update User Password."""
- put_body = {
- 'password': new_pass,
- 'id': user_id
- }
- put_body = json.dumps({'user': put_body})
- resp, body = self.put('users/%s/OS-KSADM/password' % user_id, put_body)
- self.expected_success(200, resp.status)
- return service_client.ResponseBody(resp, self._parse_resp(body))
-
- def list_extensions(self):
- """List all the extensions."""
- resp, body = self.get('/extensions')
- self.expected_success(200, resp.status)
- body = json.loads(body)
- return service_client.ResponseBodyList(resp,
- body['extensions']['values'])
-
- def create_user_ec2_credentials(self, user_id, tenant_id):
- post_body = json.dumps({'tenant_id': tenant_id})
- resp, body = self.post('/users/%s/credentials/OS-EC2' % user_id,
- post_body)
- self.expected_success(200, resp.status)
- return service_client.ResponseBody(resp, self._parse_resp(body))
-
- def list_user_ec2_credentials(self, user_id):
- resp, body = self.get('/users/%s/credentials/OS-EC2' % user_id)
- self.expected_success(200, resp.status)
- return service_client.ResponseBodyList(resp, self._parse_resp(body))
diff --git a/neutron/tests/tempest/services/identity/v3/__init__.py b/neutron/tests/tempest/services/identity/v3/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/neutron/tests/tempest/services/identity/v3/__init__.py
+++ /dev/null
diff --git a/neutron/tests/tempest/services/identity/v3/json/__init__.py b/neutron/tests/tempest/services/identity/v3/json/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/neutron/tests/tempest/services/identity/v3/json/__init__.py
+++ /dev/null
diff --git a/neutron/tests/tempest/services/identity/v3/json/credentials_client.py b/neutron/tests/tempest/services/identity/v3/json/credentials_client.py
deleted file mode 100644
index 07e230a..0000000
--- a/neutron/tests/tempest/services/identity/v3/json/credentials_client.py
+++ /dev/null
@@ -1,83 +0,0 @@
-# Copyright 2013 OpenStack Foundation
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from oslo_serialization import jsonutils as json
-
-from neutron.tests.tempest.common import service_client
-
-
-class CredentialsClientJSON(service_client.ServiceClient):
- api_version = "v3"
-
- def create_credential(self, access_key, secret_key, user_id, project_id):
- """Creates a credential."""
- blob = "{\"access\": \"%s\", \"secret\": \"%s\"}" % (
- access_key, secret_key)
- post_body = {
- "blob": blob,
- "project_id": project_id,
- "type": "ec2",
- "user_id": user_id
- }
- post_body = json.dumps({'credential': post_body})
- resp, body = self.post('credentials', post_body)
- self.expected_success(201, resp.status)
- body = json.loads(body)
- body['credential']['blob'] = json.loads(body['credential']['blob'])
- return service_client.ResponseBody(resp, body['credential'])
-
- def update_credential(self, credential_id, **kwargs):
- """Updates a credential."""
- body = self.get_credential(credential_id)
- cred_type = kwargs.get('type', body['type'])
- access_key = kwargs.get('access_key', body['blob']['access'])
- secret_key = kwargs.get('secret_key', body['blob']['secret'])
- project_id = kwargs.get('project_id', body['project_id'])
- user_id = kwargs.get('user_id', body['user_id'])
- blob = "{\"access\": \"%s\", \"secret\": \"%s\"}" % (
- access_key, secret_key)
- post_body = {
- "blob": blob,
- "project_id": project_id,
- "type": cred_type,
- "user_id": user_id
- }
- post_body = json.dumps({'credential': post_body})
- resp, body = self.patch('credentials/%s' % credential_id, post_body)
- self.expected_success(200, resp.status)
- body = json.loads(body)
- body['credential']['blob'] = json.loads(body['credential']['blob'])
- return service_client.ResponseBody(resp, body['credential'])
-
- def get_credential(self, credential_id):
- """To GET Details of a credential."""
- resp, body = self.get('credentials/%s' % credential_id)
- self.expected_success(200, resp.status)
- body = json.loads(body)
- body['credential']['blob'] = json.loads(body['credential']['blob'])
- return service_client.ResponseBody(resp, body['credential'])
-
- def list_credentials(self):
- """Lists out all the available credentials."""
- resp, body = self.get('credentials')
- self.expected_success(200, resp.status)
- body = json.loads(body)
- return service_client.ResponseBodyList(resp, body['credentials'])
-
- def delete_credential(self, credential_id):
- """Deletes a credential."""
- resp, body = self.delete('credentials/%s' % credential_id)
- self.expected_success(204, resp.status)
- return service_client.ResponseBody(resp, body)
diff --git a/neutron/tests/tempest/services/identity/v3/json/endpoints_client.py b/neutron/tests/tempest/services/identity/v3/json/endpoints_client.py
deleted file mode 100644
index 27ac3e5..0000000
--- a/neutron/tests/tempest/services/identity/v3/json/endpoints_client.py
+++ /dev/null
@@ -1,87 +0,0 @@
-# Copyright 2013 OpenStack Foundation
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from oslo_serialization import jsonutils as json
-
-from neutron.tests.tempest.common import service_client
-
-
-class EndPointClientJSON(service_client.ServiceClient):
- api_version = "v3"
-
- def list_endpoints(self):
- """GET endpoints."""
- resp, body = self.get('endpoints')
- self.expected_success(200, resp.status)
- body = json.loads(body)
- return service_client.ResponseBodyList(resp, body['endpoints'])
-
- def create_endpoint(self, service_id, interface, url, **kwargs):
- """Create endpoint.
-
- Normally this function wouldn't allow setting values that are not
- allowed for 'enabled'. Use `force_enabled` to set a non-boolean.
-
- """
- region = kwargs.get('region', None)
- if 'force_enabled' in kwargs:
- enabled = kwargs.get('force_enabled', None)
- else:
- enabled = kwargs.get('enabled', None)
- post_body = {
- 'service_id': service_id,
- 'interface': interface,
- 'url': url,
- 'region': region,
- 'enabled': enabled
- }
- post_body = json.dumps({'endpoint': post_body})
- resp, body = self.post('endpoints', post_body)
- self.expected_success(201, resp.status)
- body = json.loads(body)
- return service_client.ResponseBody(resp, body['endpoint'])
-
- def update_endpoint(self, endpoint_id, service_id=None, interface=None,
- url=None, region=None, enabled=None, **kwargs):
- """Updates an endpoint with given parameters.
-
- Normally this function wouldn't allow setting values that are not
- allowed for 'enabled'. Use `force_enabled` to set a non-boolean.
-
- """
- post_body = {}
- if service_id is not None:
- post_body['service_id'] = service_id
- if interface is not None:
- post_body['interface'] = interface
- if url is not None:
- post_body['url'] = url
- if region is not None:
- post_body['region'] = region
- if 'force_enabled' in kwargs:
- post_body['enabled'] = kwargs['force_enabled']
- elif enabled is not None:
- post_body['enabled'] = enabled
- post_body = json.dumps({'endpoint': post_body})
- resp, body = self.patch('endpoints/%s' % endpoint_id, post_body)
- self.expected_success(200, resp.status)
- body = json.loads(body)
- return service_client.ResponseBody(resp, body['endpoint'])
-
- def delete_endpoint(self, endpoint_id):
- """Delete endpoint."""
- resp_header, resp_body = self.delete('endpoints/%s' % endpoint_id)
- self.expected_success(204, resp_header.status)
- return service_client.ResponseBody(resp_header, resp_body)
diff --git a/neutron/tests/tempest/services/identity/v3/json/identity_client.py b/neutron/tests/tempest/services/identity/v3/json/identity_client.py
deleted file mode 100644
index a090acf..0000000
--- a/neutron/tests/tempest/services/identity/v3/json/identity_client.py
+++ /dev/null
@@ -1,523 +0,0 @@
-# Copyright 2013 OpenStack Foundation
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from oslo_serialization import jsonutils as json
-from six.moves.urllib import parse
-
-from neutron.tests.tempest.common import service_client
-
-
-class IdentityV3ClientJSON(service_client.ServiceClient):
- api_version = "v3"
-
- def create_user(self, user_name, password=None, project_id=None,
- email=None, domain_id='default', **kwargs):
- """Creates a user."""
- en = kwargs.get('enabled', True)
- description = kwargs.get('description', None)
- default_project_id = kwargs.get('default_project_id')
- post_body = {
- 'project_id': project_id,
- 'default_project_id': default_project_id,
- 'description': description,
- 'domain_id': domain_id,
- 'email': email,
- 'enabled': en,
- 'name': user_name,
- 'password': password
- }
- post_body = json.dumps({'user': post_body})
- resp, body = self.post('users', post_body)
- self.expected_success(201, resp.status)
- body = json.loads(body)
- return service_client.ResponseBody(resp, body['user'])
-
- def update_user(self, user_id, name, **kwargs):
- """Updates a user."""
- body = self.get_user(user_id)
- email = kwargs.get('email', body['email'])
- en = kwargs.get('enabled', body['enabled'])
- project_id = kwargs.get('project_id', body['project_id'])
- if 'default_project_id' in body.keys():
- default_project_id = kwargs.get('default_project_id',
- body['default_project_id'])
- else:
- default_project_id = kwargs.get('default_project_id')
- description = kwargs.get('description', body['description'])
- domain_id = kwargs.get('domain_id', body['domain_id'])
- post_body = {
- 'name': name,
- 'email': email,
- 'enabled': en,
- 'project_id': project_id,
- 'default_project_id': default_project_id,
- 'id': user_id,
- 'domain_id': domain_id,
- 'description': description
- }
- post_body = json.dumps({'user': post_body})
- resp, body = self.patch('users/%s' % user_id, post_body)
- self.expected_success(200, resp.status)
- body = json.loads(body)
- return service_client.ResponseBody(resp, body['user'])
-
- def update_user_password(self, user_id, password, original_password):
- """Updates a user password."""
- update_user = {
- 'password': password,
- 'original_password': original_password
- }
- update_user = json.dumps({'user': update_user})
- resp, _ = self.post('users/%s/password' % user_id, update_user)
- self.expected_success(204, resp.status)
- return service_client.ResponseBody(resp)
-
- def list_user_projects(self, user_id):
- """Lists the projects on which a user has roles assigned."""
- resp, body = self.get('users/%s/projects' % user_id)
- self.expected_success(200, resp.status)
- body = json.loads(body)
- return service_client.ResponseBodyList(resp, body['projects'])
-
- def get_users(self, params=None):
- """Get the list of users."""
- url = 'users'
- if params:
- url += '?%s' % parse.urlencode(params)
- resp, body = self.get(url)
- self.expected_success(200, resp.status)
- body = json.loads(body)
- return service_client.ResponseBodyList(resp, body['users'])
-
- def get_user(self, user_id):
- """GET a user."""
- resp, body = self.get("users/%s" % user_id)
- self.expected_success(200, resp.status)
- body = json.loads(body)
- return service_client.ResponseBody(resp, body['user'])
-
- def delete_user(self, user_id):
- """Deletes a User."""
- resp, body = self.delete("users/%s" % user_id)
- self.expected_success(204, resp.status)
- return service_client.ResponseBody(resp, body)
-
- def create_project(self, name, **kwargs):
- """Creates a project."""
- description = kwargs.get('description', None)
- en = kwargs.get('enabled', True)
- domain_id = kwargs.get('domain_id', 'default')
- post_body = {
- 'description': description,
- 'domain_id': domain_id,
- 'enabled': en,
- 'name': name
- }
- post_body = json.dumps({'project': post_body})
- resp, body = self.post('projects', post_body)
- self.expected_success(201, resp.status)
- body = json.loads(body)
- return service_client.ResponseBody(resp, body['project'])
-
- def list_projects(self, params=None):
- url = "projects"
- if params:
- url += '?%s' % parse.urlencode(params)
- resp, body = self.get(url)
- self.expected_success(200, resp.status)
- body = json.loads(body)
- return service_client.ResponseBodyList(resp, body['projects'])
-
- def update_project(self, project_id, **kwargs):
- body = self.get_project(project_id)
- name = kwargs.get('name', body['name'])
- desc = kwargs.get('description', body['description'])
- en = kwargs.get('enabled', body['enabled'])
- domain_id = kwargs.get('domain_id', body['domain_id'])
- post_body = {
- 'id': project_id,
- 'name': name,
- 'description': desc,
- 'enabled': en,
- 'domain_id': domain_id,
- }
- post_body = json.dumps({'project': post_body})
- resp, body = self.patch('projects/%s' % project_id, post_body)
- self.expected_success(200, resp.status)
- body = json.loads(body)
- return service_client.ResponseBody(resp, body['project'])
-
- def get_project(self, project_id):
- """GET a Project."""
- resp, body = self.get("projects/%s" % project_id)
- self.expected_success(200, resp.status)
- body = json.loads(body)
- return service_client.ResponseBody(resp, body['project'])
-
- def delete_project(self, project_id):
- """Delete a project."""
- resp, body = self.delete('projects/%s' % str(project_id))
- self.expected_success(204, resp.status)
- return service_client.ResponseBody(resp, body)
-
- def create_role(self, name):
- """Create a Role."""
- post_body = {
- 'name': name
- }
- post_body = json.dumps({'role': post_body})
- resp, body = self.post('roles', post_body)
- self.expected_success(201, resp.status)
- body = json.loads(body)
- return service_client.ResponseBody(resp, body['role'])
-
- def get_role(self, role_id):
- """GET a Role."""
- resp, body = self.get('roles/%s' % str(role_id))
- self.expected_success(200, resp.status)
- body = json.loads(body)
- return service_client.ResponseBody(resp, body['role'])
-
- def list_roles(self):
- """Get the list of Roles."""
- resp, body = self.get("roles")
- self.expected_success(200, resp.status)
- body = json.loads(body)
- return service_client.ResponseBodyList(resp, body['roles'])
-
- def update_role(self, name, role_id):
- """Create a Role."""
- post_body = {
- 'name': name
- }
- post_body = json.dumps({'role': post_body})
- resp, body = self.patch('roles/%s' % str(role_id), post_body)
- self.expected_success(200, resp.status)
- body = json.loads(body)
- return service_client.ResponseBody(resp, body['role'])
-
- def delete_role(self, role_id):
- """Delete a role."""
- resp, body = self.delete('roles/%s' % str(role_id))
- self.expected_success(204, resp.status)
- return service_client.ResponseBody(resp, body)
-
- def assign_user_role(self, project_id, user_id, role_id):
- """Add roles to a user on a project."""
- resp, body = self.put('projects/%s/users/%s/roles/%s' %
- (project_id, user_id, role_id), None)
- self.expected_success(204, resp.status)
- return service_client.ResponseBody(resp, body)
-
- def create_domain(self, name, **kwargs):
- """Creates a domain."""
- description = kwargs.get('description', None)
- en = kwargs.get('enabled', True)
- post_body = {
- 'description': description,
- 'enabled': en,
- 'name': name
- }
- post_body = json.dumps({'domain': post_body})
- resp, body = self.post('domains', post_body)
- self.expected_success(201, resp.status)
- body = json.loads(body)
- return service_client.ResponseBody(resp, body['domain'])
-
- def delete_domain(self, domain_id):
- """Delete a domain."""
- resp, body = self.delete('domains/%s' % str(domain_id))
- self.expected_success(204, resp.status)
- return service_client.ResponseBody(resp, body)
-
- def list_domains(self):
- """List Domains."""
- resp, body = self.get('domains')
- self.expected_success(200, resp.status)
- body = json.loads(body)
- return service_client.ResponseBodyList(resp, body['domains'])
-
- def update_domain(self, domain_id, **kwargs):
- """Updates a domain."""
- body = self.get_domain(domain_id)
- description = kwargs.get('description', body['description'])
- en = kwargs.get('enabled', body['enabled'])
- name = kwargs.get('name', body['name'])
- post_body = {
- 'description': description,
- 'enabled': en,
- 'name': name
- }
- post_body = json.dumps({'domain': post_body})
- resp, body = self.patch('domains/%s' % domain_id, post_body)
- self.expected_success(200, resp.status)
- body = json.loads(body)
- return service_client.ResponseBody(resp, body['domain'])
-
- def get_domain(self, domain_id):
- """Get Domain details."""
- resp, body = self.get('domains/%s' % domain_id)
- self.expected_success(200, resp.status)
- body = json.loads(body)
- return service_client.ResponseBody(resp, body['domain'])
-
- def get_token(self, resp_token):
- """Get token details."""
- headers = {'X-Subject-Token': resp_token}
- resp, body = self.get("auth/tokens", headers=headers)
- self.expected_success(200, resp.status)
- body = json.loads(body)
- return service_client.ResponseBody(resp, body['token'])
-
- def delete_token(self, resp_token):
- """Deletes token."""
- headers = {'X-Subject-Token': resp_token}
- resp, body = self.delete("auth/tokens", headers=headers)
- self.expected_success(204, resp.status)
- return service_client.ResponseBody(resp, body)
-
- def create_group(self, name, **kwargs):
- """Creates a group."""
- description = kwargs.get('description', None)
- domain_id = kwargs.get('domain_id', 'default')
- project_id = kwargs.get('project_id', None)
- post_body = {
- 'description': description,
- 'domain_id': domain_id,
- 'project_id': project_id,
- 'name': name
- }
- post_body = json.dumps({'group': post_body})
- resp, body = self.post('groups', post_body)
- self.expected_success(201, resp.status)
- body = json.loads(body)
- return service_client.ResponseBody(resp, body['group'])
-
- def get_group(self, group_id):
- """Get group details."""
- resp, body = self.get('groups/%s' % group_id)
- self.expected_success(200, resp.status)
- body = json.loads(body)
- return service_client.ResponseBody(resp, body['group'])
-
- def list_groups(self):
- """Lists the groups."""
- resp, body = self.get('groups')
- self.expected_success(200, resp.status)
- body = json.loads(body)
- return service_client.ResponseBodyList(resp, body['groups'])
-
- def update_group(self, group_id, **kwargs):
- """Updates a group."""
- body = self.get_group(group_id)
- name = kwargs.get('name', body['name'])
- description = kwargs.get('description', body['description'])
- post_body = {
- 'name': name,
- 'description': description
- }
- post_body = json.dumps({'group': post_body})
- resp, body = self.patch('groups/%s' % group_id, post_body)
- self.expected_success(200, resp.status)
- body = json.loads(body)
- return service_client.ResponseBody(resp, body['group'])
-
- def delete_group(self, group_id):
- """Delete a group."""
- resp, body = self.delete('groups/%s' % str(group_id))
- self.expected_success(204, resp.status)
- return service_client.ResponseBody(resp, body)
-
- def add_group_user(self, group_id, user_id):
- """Add user into group."""
- resp, body = self.put('groups/%s/users/%s' % (group_id, user_id),
- None)
- self.expected_success(204, resp.status)
- return service_client.ResponseBody(resp, body)
-
- def list_group_users(self, group_id):
- """List users in group."""
- resp, body = self.get('groups/%s/users' % group_id)
- self.expected_success(200, resp.status)
- body = json.loads(body)
- return service_client.ResponseBodyList(resp, body['users'])
-
- def list_user_groups(self, user_id):
- """Lists groups which a user belongs to."""
- resp, body = self.get('users/%s/groups' % user_id)
- self.expected_success(200, resp.status)
- body = json.loads(body)
- return service_client.ResponseBodyList(resp, body['groups'])
-
- def delete_group_user(self, group_id, user_id):
- """Delete user in group."""
- resp, body = self.delete('groups/%s/users/%s' % (group_id, user_id))
- self.expected_success(204, resp.status)
- return service_client.ResponseBody(resp, body)
-
- def assign_user_role_on_project(self, project_id, user_id, role_id):
- """Add roles to a user on a project."""
- resp, body = self.put('projects/%s/users/%s/roles/%s' %
- (project_id, user_id, role_id), None)
- self.expected_success(204, resp.status)
- return service_client.ResponseBody(resp, body)
-
- def assign_user_role_on_domain(self, domain_id, user_id, role_id):
- """Add roles to a user on a domain."""
- resp, body = self.put('domains/%s/users/%s/roles/%s' %
- (domain_id, user_id, role_id), None)
- self.expected_success(204, resp.status)
- return service_client.ResponseBody(resp, body)
-
- def list_user_roles_on_project(self, project_id, user_id):
- """list roles of a user on a project."""
- resp, body = self.get('projects/%s/users/%s/roles' %
- (project_id, user_id))
- self.expected_success(200, resp.status)
- body = json.loads(body)
- return service_client.ResponseBodyList(resp, body['roles'])
-
- def list_user_roles_on_domain(self, domain_id, user_id):
- """list roles of a user on a domain."""
- resp, body = self.get('domains/%s/users/%s/roles' %
- (domain_id, user_id))
- self.expected_success(200, resp.status)
- body = json.loads(body)
- return service_client.ResponseBodyList(resp, body['roles'])
-
- def revoke_role_from_user_on_project(self, project_id, user_id, role_id):
- """Delete role of a user on a project."""
- resp, body = self.delete('projects/%s/users/%s/roles/%s' %
- (project_id, user_id, role_id))
- self.expected_success(204, resp.status)
- return service_client.ResponseBody(resp, body)
-
- def revoke_role_from_user_on_domain(self, domain_id, user_id, role_id):
- """Delete role of a user on a domain."""
- resp, body = self.delete('domains/%s/users/%s/roles/%s' %
- (domain_id, user_id, role_id))
- self.expected_success(204, resp.status)
- return service_client.ResponseBody(resp, body)
-
- def assign_group_role_on_project(self, project_id, group_id, role_id):
- """Add roles to a user on a project."""
- resp, body = self.put('projects/%s/groups/%s/roles/%s' %
- (project_id, group_id, role_id), None)
- self.expected_success(204, resp.status)
- return service_client.ResponseBody(resp, body)
-
- def assign_group_role_on_domain(self, domain_id, group_id, role_id):
- """Add roles to a user on a domain."""
- resp, body = self.put('domains/%s/groups/%s/roles/%s' %
- (domain_id, group_id, role_id), None)
- self.expected_success(204, resp.status)
- return service_client.ResponseBody(resp, body)
-
- def list_group_roles_on_project(self, project_id, group_id):
- """list roles of a user on a project."""
- resp, body = self.get('projects/%s/groups/%s/roles' %
- (project_id, group_id))
- self.expected_success(200, resp.status)
- body = json.loads(body)
- return service_client.ResponseBodyList(resp, body['roles'])
-
- def list_group_roles_on_domain(self, domain_id, group_id):
- """list roles of a user on a domain."""
- resp, body = self.get('domains/%s/groups/%s/roles' %
- (domain_id, group_id))
- self.expected_success(200, resp.status)
- body = json.loads(body)
- return service_client.ResponseBodyList(resp, body['roles'])
-
- def revoke_role_from_group_on_project(self, project_id, group_id, role_id):
- """Delete role of a user on a project."""
- resp, body = self.delete('projects/%s/groups/%s/roles/%s' %
- (project_id, group_id, role_id))
- self.expected_success(204, resp.status)
- return service_client.ResponseBody(resp, body)
-
- def revoke_role_from_group_on_domain(self, domain_id, group_id, role_id):
- """Delete role of a user on a domain."""
- resp, body = self.delete('domains/%s/groups/%s/roles/%s' %
- (domain_id, group_id, role_id))
- self.expected_success(204, resp.status)
- return service_client.ResponseBody(resp, body)
-
- def create_trust(self, trustor_user_id, trustee_user_id, project_id,
- role_names, impersonation, expires_at):
- """Creates a trust."""
- roles = [{'name': n} for n in role_names]
- post_body = {
- 'trustor_user_id': trustor_user_id,
- 'trustee_user_id': trustee_user_id,
- 'project_id': project_id,
- 'impersonation': impersonation,
- 'roles': roles,
- 'expires_at': expires_at
- }
- post_body = json.dumps({'trust': post_body})
- resp, body = self.post('OS-TRUST/trusts', post_body)
- self.expected_success(201, resp.status)
- body = json.loads(body)
- return service_client.ResponseBody(resp, body['trust'])
-
- def delete_trust(self, trust_id):
- """Deletes a trust."""
- resp, body = self.delete("OS-TRUST/trusts/%s" % trust_id)
- self.expected_success(204, resp.status)
- return service_client.ResponseBody(resp, body)
-
- def get_trusts(self, trustor_user_id=None, trustee_user_id=None):
- """GET trusts."""
- if trustor_user_id:
- resp, body = self.get("OS-TRUST/trusts?trustor_user_id=%s"
- % trustor_user_id)
- elif trustee_user_id:
- resp, body = self.get("OS-TRUST/trusts?trustee_user_id=%s"
- % trustee_user_id)
- else:
- resp, body = self.get("OS-TRUST/trusts")
- self.expected_success(200, resp.status)
- body = json.loads(body)
- return service_client.ResponseBodyList(resp, body['trusts'])
-
- def get_trust(self, trust_id):
- """GET trust."""
- resp, body = self.get("OS-TRUST/trusts/%s" % trust_id)
- self.expected_success(200, resp.status)
- body = json.loads(body)
- return service_client.ResponseBody(resp, body['trust'])
-
- def get_trust_roles(self, trust_id):
- """GET roles delegated by a trust."""
- resp, body = self.get("OS-TRUST/trusts/%s/roles" % trust_id)
- self.expected_success(200, resp.status)
- body = json.loads(body)
- return service_client.ResponseBodyList(resp, body['roles'])
-
- def get_trust_role(self, trust_id, role_id):
- """GET role delegated by a trust."""
- resp, body = self.get("OS-TRUST/trusts/%s/roles/%s"
- % (trust_id, role_id))
- self.expected_success(200, resp.status)
- body = json.loads(body)
- return service_client.ResponseBody(resp, body['role'])
-
- def check_trust_role(self, trust_id, role_id):
- """HEAD Check if role is delegated by a trust."""
- resp, body = self.head("OS-TRUST/trusts/%s/roles/%s"
- % (trust_id, role_id))
- self.expected_success(200, resp.status)
- return service_client.ResponseBody(resp, body)
diff --git a/neutron/tests/tempest/services/identity/v3/json/policy_client.py b/neutron/tests/tempest/services/identity/v3/json/policy_client.py
deleted file mode 100644
index 2d247af..0000000
--- a/neutron/tests/tempest/services/identity/v3/json/policy_client.py
+++ /dev/null
@@ -1,69 +0,0 @@
-# Copyright 2013 OpenStack Foundation
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from oslo_serialization import jsonutils as json
-
-from neutron.tests.tempest.common import service_client
-
-
-class PolicyClientJSON(service_client.ServiceClient):
- api_version = "v3"
-
- def create_policy(self, blob, type):
- """Creates a Policy."""
- post_body = {
- "blob": blob,
- "type": type
- }
- post_body = json.dumps({'policy': post_body})
- resp, body = self.post('policies', post_body)
- self.expected_success(201, resp.status)
- body = json.loads(body)
- return service_client.ResponseBody(resp, body['policy'])
-
- def list_policies(self):
- """Lists the policies."""
- resp, body = self.get('policies')
- self.expected_success(200, resp.status)
- body = json.loads(body)
- return service_client.ResponseBodyList(resp, body['policies'])
-
- def get_policy(self, policy_id):
- """Lists out the given policy."""
- url = 'policies/%s' % policy_id
- resp, body = self.get(url)
- self.expected_success(200, resp.status)
- body = json.loads(body)
- return service_client.ResponseBody(resp, body['policy'])
-
- def update_policy(self, policy_id, **kwargs):
- """Updates a policy."""
- type = kwargs.get('type')
- post_body = {
- 'type': type
- }
- post_body = json.dumps({'policy': post_body})
- url = 'policies/%s' % policy_id
- resp, body = self.patch(url, post_body)
- self.expected_success(200, resp.status)
- body = json.loads(body)
- return service_client.ResponseBody(resp, body['policy'])
-
- def delete_policy(self, policy_id):
- """Deletes the policy."""
- url = "policies/%s" % policy_id
- resp, body = self.delete(url)
- self.expected_success(204, resp.status)
- return service_client.ResponseBody(resp, body)
diff --git a/neutron/tests/tempest/services/identity/v3/json/region_client.py b/neutron/tests/tempest/services/identity/v3/json/region_client.py
deleted file mode 100644
index 0effae8..0000000
--- a/neutron/tests/tempest/services/identity/v3/json/region_client.py
+++ /dev/null
@@ -1,77 +0,0 @@
-# Copyright 2014 Hewlett-Packard Development Company, L.P
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from oslo_serialization import jsonutils as json
-from six.moves.urllib import parse
-
-from neutron.tests.tempest.common import service_client
-
-
-class RegionClientJSON(service_client.ServiceClient):
- api_version = "v3"
-
- def create_region(self, description, **kwargs):
- """Create region."""
- req_body = {
- 'description': description,
- }
- if kwargs.get('parent_region_id'):
- req_body['parent_region_id'] = kwargs.get('parent_region_id')
- req_body = json.dumps({'region': req_body})
- if kwargs.get('unique_region_id'):
- resp, body = self.put(
- 'regions/%s' % kwargs.get('unique_region_id'), req_body)
- else:
- resp, body = self.post('regions', req_body)
- self.expected_success(201, resp.status)
- body = json.loads(body)
- return service_client.ResponseBody(resp, body['region'])
-
- def update_region(self, region_id, **kwargs):
- """Updates a region."""
- post_body = {}
- if 'description' in kwargs:
- post_body['description'] = kwargs.get('description')
- if 'parent_region_id' in kwargs:
- post_body['parent_region_id'] = kwargs.get('parent_region_id')
- post_body = json.dumps({'region': post_body})
- resp, body = self.patch('regions/%s' % region_id, post_body)
- self.expected_success(200, resp.status)
- body = json.loads(body)
- return service_client.ResponseBody(resp, body['region'])
-
- def get_region(self, region_id):
- """Get region."""
- url = 'regions/%s' % region_id
- resp, body = self.get(url)
- self.expected_success(200, resp.status)
- body = json.loads(body)
- return service_client.ResponseBody(resp, body['region'])
-
- def list_regions(self, params=None):
- """List regions."""
- url = 'regions'
- if params:
- url += '?%s' % parse.urlencode(params)
- resp, body = self.get(url)
- self.expected_success(200, resp.status)
- body = json.loads(body)
- return service_client.ResponseBodyList(resp, body['regions'])
-
- def delete_region(self, region_id):
- """Delete region."""
- resp, body = self.delete('regions/%s' % region_id)
- self.expected_success(204, resp.status)
- return service_client.ResponseBody(resp, body)
diff --git a/neutron/tests/tempest/services/identity/v3/json/service_client.py b/neutron/tests/tempest/services/identity/v3/json/service_client.py
deleted file mode 100644
index 75a5cf8..0000000
--- a/neutron/tests/tempest/services/identity/v3/json/service_client.py
+++ /dev/null
@@ -1,73 +0,0 @@
-# Copyright 2013 OpenStack Foundation
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from oslo_serialization import jsonutils as json
-
-from neutron.tests.tempest.common import service_client
-
-
-class ServiceClientJSON(service_client.ServiceClient):
- api_version = "v3"
-
- def update_service(self, service_id, **kwargs):
- """Updates a service."""
- body = self.get_service(service_id)
- name = kwargs.get('name', body['name'])
- type = kwargs.get('type', body['type'])
- desc = kwargs.get('description', body['description'])
- patch_body = {
- 'description': desc,
- 'type': type,
- 'name': name
- }
- patch_body = json.dumps({'service': patch_body})
- resp, body = self.patch('services/%s' % service_id, patch_body)
- self.expected_success(200, resp.status)
- body = json.loads(body)
- return service_client.ResponseBody(resp, body['service'])
-
- def get_service(self, service_id):
- """Get Service."""
- url = 'services/%s' % service_id
- resp, body = self.get(url)
- self.expected_success(200, resp.status)
- body = json.loads(body)
- return service_client.ResponseBody(resp, body['service'])
-
- def create_service(self, serv_type, name=None, description=None,
- enabled=True):
- body_dict = {
- 'name': name,
- 'type': serv_type,
- 'enabled': enabled,
- 'description': description,
- }
- body = json.dumps({'service': body_dict})
- resp, body = self.post("services", body)
- self.expected_success(201, resp.status)
- body = json.loads(body)
- return service_client.ResponseBody(resp, body["service"])
-
- def delete_service(self, serv_id):
- url = "services/" + serv_id
- resp, body = self.delete(url)
- self.expected_success(204, resp.status)
- return service_client.ResponseBody(resp, body)
-
- def list_services(self):
- resp, body = self.get('services')
- self.expected_success(200, resp.status)
- body = json.loads(body)
- return service_client.ResponseBodyList(resp, body['services'])
diff --git a/neutron/tests/tempest/services/network/json/network_client.py b/neutron/tests/tempest/services/network/json/network_client.py
index caefc6d..2cd1344 100644
--- a/neutron/tests/tempest/services/network/json/network_client.py
+++ b/neutron/tests/tempest/services/network/json/network_client.py
@@ -14,10 +14,9 @@
from oslo_serialization import jsonutils as json
from six.moves.urllib import parse as urlparse
-from tempest_lib.common.utils import misc
+from tempest.common import service_client
from tempest_lib import exceptions as lib_exc
-from neutron.tests.tempest.common import service_client
from neutron.tests.tempest import exceptions
@@ -220,43 +219,6 @@
return True
return False
- def wait_for_resource_status(self, fetch, status, interval=None,
- timeout=None):
- """
- @summary: Waits for a network resource to reach a status
- @param fetch: the callable to be used to query the resource status
- @type fecth: callable that takes no parameters and returns the resource
- @param status: the status that the resource has to reach
- @type status: String
- @param interval: the number of seconds to wait between each status
- query
- @type interval: Integer
- @param timeout: the maximum number of seconds to wait for the resource
- to reach the desired status
- @type timeout: Integer
- """
- if not interval:
- interval = self.build_interval
- if not timeout:
- timeout = self.build_timeout
- start_time = time.time()
-
- while time.time() - start_time <= timeout:
- resource = fetch()
- if resource['status'] == status:
- return
- time.sleep(interval)
-
- # At this point, the wait has timed out
- message = 'Resource %s' % (str(resource))
- message += ' failed to reach status %s' % status
- message += ' (current: %s)' % resource['status']
- message += ' within the required time %s' % timeout
- caller = misc.find_test_caller()
- if caller:
- message = '(%s) %s' % (caller, message)
- raise exceptions.TimeoutException(message)
-
def deserialize_single(self, body):
return json.loads(body)
diff --git a/neutron/tests/tempest/services/network/resources.py b/neutron/tests/tempest/services/network/resources.py
deleted file mode 100644
index 962dfc5..0000000
--- a/neutron/tests/tempest/services/network/resources.py
+++ /dev/null
@@ -1,167 +0,0 @@
-# Copyright 2013 Hewlett-Packard Development Company, L.P.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import abc
-
-import six
-
-
-class AttributeDict(dict):
-
- """
- Provide attribute access (dict.key) to dictionary values.
- """
-
- def __getattr__(self, name):
- """Allow attribute access for all keys in the dict."""
- if name in self:
- return self[name]
- return super(AttributeDict, self).__getattribute__(name)
-
-
-@six.add_metaclass(abc.ABCMeta)
-class DeletableResource(AttributeDict):
-
- """
- Support deletion of neutron resources (networks, subnets) via a
- delete() method, as is supported by keystone and nova resources.
- """
-
- def __init__(self, *args, **kwargs):
- self.client = kwargs.pop('client', None)
- super(DeletableResource, self).__init__(*args, **kwargs)
-
- def __str__(self):
- return '<%s id="%s" name="%s">' % (self.__class__.__name__,
- self.id, self.name)
-
- @abc.abstractmethod
- def delete(self):
- return
-
- @abc.abstractmethod
- def refresh(self):
- return
-
- def __hash__(self):
- return hash(self.id)
-
- def wait_for_status(self, status):
- if not hasattr(self, 'status'):
- return
-
- def helper_get():
- self.refresh()
- return self
-
- return self.client.wait_for_resource_status(helper_get, status)
-
-
-class DeletableNetwork(DeletableResource):
-
- def delete(self):
- self.client.delete_network(self.id)
-
-
-class DeletableSubnet(DeletableResource):
-
- def __init__(self, *args, **kwargs):
- super(DeletableSubnet, self).__init__(*args, **kwargs)
- self._router_ids = set()
-
- def update(self, *args, **kwargs):
- result = self.client.update_subnet(self.id,
- *args,
- **kwargs)
- return super(DeletableSubnet, self).update(**result['subnet'])
-
- def add_to_router(self, router_id):
- self._router_ids.add(router_id)
- self.client.add_router_interface_with_subnet_id(router_id,
- subnet_id=self.id)
-
- def delete(self):
- for router_id in self._router_ids.copy():
- self.client.remove_router_interface_with_subnet_id(
- router_id,
- subnet_id=self.id)
- self._router_ids.remove(router_id)
- self.client.delete_subnet(self.id)
-
-
-class DeletableRouter(DeletableResource):
-
- def set_gateway(self, network_id):
- return self.update(external_gateway_info=dict(network_id=network_id))
-
- def unset_gateway(self):
- return self.update(external_gateway_info=dict())
-
- def update(self, *args, **kwargs):
- result = self.client.update_router(self.id,
- *args,
- **kwargs)
- return super(DeletableRouter, self).update(**result['router'])
-
- def delete(self):
- self.unset_gateway()
- self.client.delete_router(self.id)
-
-
-class DeletableFloatingIp(DeletableResource):
-
- def refresh(self, *args, **kwargs):
- result = self.client.show_floatingip(self.id,
- *args,
- **kwargs)
- super(DeletableFloatingIp, self).update(**result['floatingip'])
-
- def update(self, *args, **kwargs):
- result = self.client.update_floatingip(self.id,
- *args,
- **kwargs)
- super(DeletableFloatingIp, self).update(**result['floatingip'])
-
- def __repr__(self):
- return '<%s addr="%s">' % (self.__class__.__name__,
- self.floating_ip_address)
-
- def __str__(self):
- return '<"FloatingIP" addr="%s" id="%s">' % (self.floating_ip_address,
- self.id)
-
- def delete(self):
- self.client.delete_floatingip(self.id)
-
-
-class DeletablePort(DeletableResource):
-
- def delete(self):
- self.client.delete_port(self.id)
-
-
-class DeletableSecurityGroup(DeletableResource):
-
- def delete(self):
- self.client.delete_security_group(self.id)
-
-
-class DeletableSecurityGroupRule(DeletableResource):
-
- def __repr__(self):
- return '<%s id="%s">' % (self.__class__.__name__, self.id)
-
- def delete(self):
- self.client.delete_security_group_rule(self.id)
diff --git a/neutron/tests/tempest/test.py b/neutron/tests/tempest/test.py
deleted file mode 100644
index 3abf826..0000000
--- a/neutron/tests/tempest/test.py
+++ /dev/null
@@ -1,675 +0,0 @@
-# Copyright 2012 OpenStack Foundation
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import atexit
-import functools
-import os
-import re
-import sys
-import time
-import uuid
-
-import fixtures
-from oslo_log import log as logging
-from oslo_serialization import jsonutils as json
-from oslo_utils import importutils
-import six
-from six.moves.urllib import parse
-import testscenarios
-import testtools
-
-from neutron.tests.api import clients
-from neutron.tests.tempest.common import credentials
-import neutron.tests.tempest.common.generator.valid_generator as valid
-from neutron.tests.tempest import config
-from neutron.tests.tempest import exceptions
-
-LOG = logging.getLogger(__name__)
-
-CONF = config.CONF
-
-
-def attr(*args, **kwargs):
- """A decorator which applies the testtools attr decorator
-
- This decorator applies the testtools.testcase.attr if it is in the list of
- attributes to testtools we want to apply.
- """
-
- def decorator(f):
- if 'type' in kwargs and isinstance(kwargs['type'], str):
- f = testtools.testcase.attr(kwargs['type'])(f)
- if kwargs['type'] == 'smoke':
- f = testtools.testcase.attr('gate')(f)
- elif 'type' in kwargs and isinstance(kwargs['type'], list):
- for attr in kwargs['type']:
- f = testtools.testcase.attr(attr)(f)
- if attr == 'smoke':
- f = testtools.testcase.attr('gate')(f)
- return f
-
- return decorator
-
-
-def idempotent_id(id):
- """Stub for metadata decorator"""
- if not isinstance(id, six.string_types):
- raise TypeError('Test idempotent_id must be string not %s'
- '' % type(id).__name__)
- uuid.UUID(id)
-
- def decorator(f):
- f = testtools.testcase.attr('id-%s' % id)(f)
- if f.__doc__:
- f.__doc__ = 'Test idempotent id: %s\n%s' % (id, f.__doc__)
- else:
- f.__doc__ = 'Test idempotent id: %s' % id
- return f
- return decorator
-
-
-def get_service_list():
- service_list = {
- 'compute': CONF.service_available.nova,
- 'image': CONF.service_available.glance,
- 'baremetal': CONF.service_available.ironic,
- 'volume': CONF.service_available.cinder,
- 'orchestration': CONF.service_available.heat,
- # NOTE(mtreinish) nova-network will provide networking functionality
- # if neutron isn't available, so always set to True.
- 'network': True,
- 'identity': True,
- 'object_storage': CONF.service_available.swift,
- 'dashboard': CONF.service_available.horizon,
- 'telemetry': CONF.service_available.ceilometer,
- 'data_processing': CONF.service_available.sahara,
- 'database': CONF.service_available.trove
- }
- return service_list
-
-
-def services(*args, **kwargs):
- """A decorator used to set an attr for each service used in a test case
-
- This decorator applies a testtools attr for each service that gets
- exercised by a test case.
- """
- def decorator(f):
- services = ['compute', 'image', 'baremetal', 'volume', 'orchestration',
- 'network', 'identity', 'object_storage', 'dashboard',
- 'telemetry', 'data_processing', 'database']
- for service in args:
- if service not in services:
- raise exceptions.InvalidServiceTag('%s is not a valid '
- 'service' % service)
- attr(type=list(args))(f)
-
- @functools.wraps(f)
- def wrapper(self, *func_args, **func_kwargs):
- service_list = get_service_list()
-
- for service in args:
- if not service_list[service]:
- msg = 'Skipped because the %s service is not available' % (
- service)
- raise testtools.TestCase.skipException(msg)
- return f(self, *func_args, **func_kwargs)
- return wrapper
- return decorator
-
-
-def stresstest(*args, **kwargs):
- """Add stress test decorator
-
- For all functions with this decorator a attr stress will be
- set automatically.
-
- @param class_setup_per: allowed values are application, process, action
- ``application``: once in the stress job lifetime
- ``process``: once in the worker process lifetime
- ``action``: on each action
- @param allow_inheritance: allows inheritance of this attribute
- """
- def decorator(f):
- if 'class_setup_per' in kwargs:
- setattr(f, "st_class_setup_per", kwargs['class_setup_per'])
- else:
- setattr(f, "st_class_setup_per", 'process')
- if 'allow_inheritance' in kwargs:
- setattr(f, "st_allow_inheritance", kwargs['allow_inheritance'])
- else:
- setattr(f, "st_allow_inheritance", False)
- attr(type='stress')(f)
- return f
- return decorator
-
-
-def requires_ext(*args, **kwargs):
- """A decorator to skip tests if an extension is not enabled
-
- @param extension
- @param service
- """
- def decorator(func):
- @functools.wraps(func)
- def wrapper(*func_args, **func_kwargs):
- if not is_extension_enabled(kwargs['extension'],
- kwargs['service']):
- msg = "Skipped because %s extension: %s is not enabled" % (
- kwargs['service'], kwargs['extension'])
- raise testtools.TestCase.skipException(msg)
- return func(*func_args, **func_kwargs)
- return wrapper
- return decorator
-
-
-def is_extension_enabled(extension_name, service):
- """A function that will check the list of enabled extensions from config
-
- """
- config_dict = {
- 'compute': CONF.compute_feature_enabled.api_extensions,
- 'volume': CONF.volume_feature_enabled.api_extensions,
- 'network': CONF.network_feature_enabled.api_extensions,
- 'object': CONF.object_storage_feature_enabled.discoverable_apis,
- }
- if len(config_dict[service]) == 0:
- return False
- if config_dict[service][0] == 'all':
- return True
- if extension_name in config_dict[service]:
- return True
- return False
-
-
-at_exit_set = set()
-
-
-def validate_tearDownClass():
- if at_exit_set:
- LOG.error(
- "tearDownClass does not call the super's "
- "tearDownClass in these classes: \n"
- + str(at_exit_set))
-
-
-atexit.register(validate_tearDownClass)
-
-
-class BaseTestCase(testtools.testcase.WithAttributes,
- testtools.TestCase):
- """The test base class defines Tempest framework for class level fixtures.
- `setUpClass` and `tearDownClass` are defined here and cannot be overwritten
- by subclasses (enforced via hacking rule T105).
-
- Set-up is split in a series of steps (setup stages), which can be
- overwritten by test classes. Set-up stages are:
- - skip_checks
- - setup_credentials
- - setup_clients
- - resource_setup
-
- Tear-down is also split in a series of steps (teardown stages), which are
- stacked for execution only if the corresponding setup stage had been
- reached during the setup phase. Tear-down stages are:
- - clear_isolated_creds (defined in the base test class)
- - resource_cleanup
- """
-
- setUpClassCalled = False
- _service = None
-
- network_resources = {}
-
- # NOTE(sdague): log_format is defined inline here instead of using the oslo
- # default because going through the config path recouples config to the
- # stress tests too early, and depending on testr order will fail unit tests
- log_format = ('%(asctime)s %(process)d %(levelname)-8s '
- '[%(name)s] %(message)s')
-
- @classmethod
- def setUpClass(cls):
- # It should never be overridden by descendants
- if hasattr(super(BaseTestCase, cls), 'setUpClass'):
- super(BaseTestCase, cls).setUpClass()
- cls.setUpClassCalled = True
- # Stack of (name, callable) to be invoked in reverse order at teardown
- cls.teardowns = []
- # All the configuration checks that may generate a skip
- cls.skip_checks()
- try:
- # Allocation of all required credentials and client managers
- cls.teardowns.append(('credentials', cls.clear_isolated_creds))
- cls.setup_credentials()
- # Shortcuts to clients
- cls.setup_clients()
- # Additional class-wide test resources
- cls.teardowns.append(('resources', cls.resource_cleanup))
- cls.resource_setup()
- except Exception:
- etype, value, trace = sys.exc_info()
- LOG.info("%s raised in %s.setUpClass. Invoking tearDownClass." % (
- etype, cls.__name__))
- cls.tearDownClass()
- try:
- raise etype, value, trace
- finally:
- del trace # to avoid circular refs
-
- @classmethod
- def tearDownClass(cls):
- at_exit_set.discard(cls)
- # It should never be overridden by descendants
- if hasattr(super(BaseTestCase, cls), 'tearDownClass'):
- super(BaseTestCase, cls).tearDownClass()
- # Save any existing exception, we always want to re-raise the original
- # exception only
- etype, value, trace = sys.exc_info()
- # If there was no exception during setup we shall re-raise the first
- # exception in teardown
- re_raise = (etype is None)
- while cls.teardowns:
- name, teardown = cls.teardowns.pop()
- # Catch any exception in tearDown so we can re-raise the original
- # exception at the end
- try:
- teardown()
- except Exception as te:
- sys_exec_info = sys.exc_info()
- tetype = sys_exec_info[0]
- # TODO(andreaf): Till we have the ability to cleanup only
- # resources that were successfully setup in resource_cleanup,
- # log AttributeError as info instead of exception.
- if tetype is AttributeError and name == 'resources':
- LOG.info("tearDownClass of %s failed: %s" % (name, te))
- else:
- LOG.exception("teardown of %s failed: %s" % (name, te))
- if not etype:
- etype, value, trace = sys_exec_info
- # If exceptions were raised during teardown, an not before, re-raise
- # the first one
- if re_raise and etype is not None:
- try:
- raise etype, value, trace
- finally:
- del trace # to avoid circular refs
-
- @classmethod
- def skip_checks(cls):
- """Class level skip checks. Subclasses verify in here all
- conditions that might prevent the execution of the entire test class.
- Checks implemented here may not make use API calls, and should rely on
- configuration alone.
- In general skip checks that require an API call are discouraged.
- If one is really needed it may be implemented either in the
- resource_setup or at test level.
- """
- pass
-
- @classmethod
- def setup_credentials(cls):
- """Allocate credentials and the client managers from them."""
- # TODO(andreaf) There is a fair amount of code that could me moved from
- # base / test classes in here. Ideally tests should be able to only
- # specify a list of (additional) credentials the need to use.
- pass
-
- @classmethod
- def setup_clients(cls):
- """Create links to the clients into the test object."""
- # TODO(andreaf) There is a fair amount of code that could me moved from
- # base / test classes in here. Ideally tests should be able to only
- # specify which client is `client` and nothing else.
- pass
-
- @classmethod
- def resource_setup(cls):
- """Class level resource setup for test cases.
- """
- pass
-
- @classmethod
- def resource_cleanup(cls):
- """Class level resource cleanup for test cases.
- Resource cleanup must be able to handle the case of partially setup
- resources, in case a failure during `resource_setup` should happen.
- """
- pass
-
- def setUp(self):
- super(BaseTestCase, self).setUp()
- if not self.setUpClassCalled:
- raise RuntimeError("setUpClass does not calls the super's"
- "setUpClass in the "
- + self.__class__.__name__)
- at_exit_set.add(self.__class__)
- test_timeout = os.environ.get('OS_TEST_TIMEOUT', 0)
- try:
- test_timeout = int(test_timeout)
- except ValueError:
- test_timeout = 0
- if test_timeout > 0:
- self.useFixture(fixtures.Timeout(test_timeout, gentle=True))
-
- if (os.environ.get('OS_STDOUT_CAPTURE') == 'True' or
- os.environ.get('OS_STDOUT_CAPTURE') == '1'):
- stdout = self.useFixture(fixtures.StringStream('stdout')).stream
- self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout))
- if (os.environ.get('OS_STDERR_CAPTURE') == 'True' or
- os.environ.get('OS_STDERR_CAPTURE') == '1'):
- stderr = self.useFixture(fixtures.StringStream('stderr')).stream
- self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))
- if (os.environ.get('OS_LOG_CAPTURE') != 'False' and
- os.environ.get('OS_LOG_CAPTURE') != '0'):
- self.useFixture(fixtures.LoggerFixture(nuke_handlers=False,
- format=self.log_format,
- level=None))
-
- @classmethod
- def get_client_manager(cls):
- """
- Returns an OpenStack client manager
- """
- force_tenant_isolation = getattr(cls, 'force_tenant_isolation', None)
-
- if (not hasattr(cls, 'isolated_creds') or
- not cls.isolated_creds.name == cls.__name__):
- cls.isolated_creds = credentials.get_isolated_credentials(
- name=cls.__name__, network_resources=cls.network_resources,
- force_tenant_isolation=force_tenant_isolation,
- )
-
- creds = cls.isolated_creds.get_primary_creds()
- os = clients.Manager(credentials=creds, service=cls._service)
- return os
-
- @classmethod
- def clear_isolated_creds(cls):
- """
- Clears isolated creds if set
- """
- if hasattr(cls, 'isolated_creds'):
- cls.isolated_creds.clear_isolated_creds()
-
- @classmethod
- def _get_identity_admin_client(cls):
- """
- Returns an instance of the Identity Admin API client
- """
- os = clients.AdminManager(service=cls._service)
- admin_client = os.identity_client
- return admin_client
-
- @classmethod
- def set_network_resources(cls, network=False, router=False, subnet=False,
- dhcp=False):
- """Specify which network resources should be created
-
- @param network
- @param router
- @param subnet
- @param dhcp
- """
- # network resources should be set only once from callers
- # in order to ensure that even if it's called multiple times in
- # a chain of overloaded methods, the attribute is set only
- # in the leaf class
- if not cls.network_resources:
- cls.network_resources = {
- 'network': network,
- 'router': router,
- 'subnet': subnet,
- 'dhcp': dhcp}
-
- def assertEmpty(self, list, msg=None):
- self.assertTrue(len(list) == 0, msg)
-
- def assertNotEmpty(self, list, msg=None):
- self.assertTrue(len(list) > 0, msg)
-
-
-class NegativeAutoTest(BaseTestCase):
-
- _resources = {}
-
- @classmethod
- def setUpClass(cls):
- super(NegativeAutoTest, cls).setUpClass()
- os = cls.get_client_manager()
- cls.client = os.negative_client
- os_admin = clients.AdminManager(service=cls._service)
- cls.admin_client = os_admin.negative_client
-
- @staticmethod
- def load_tests(*args):
- """
- Wrapper for testscenarios to set the mandatory scenarios variable
- only in case a real test loader is in place. Will be automatically
- called in case the variable "load_tests" is set.
- """
- if getattr(args[0], 'suiteClass', None) is not None:
- loader, standard_tests, pattern = args
- else:
- standard_tests, module, loader = args
- for test in testtools.iterate_tests(standard_tests):
- schema = getattr(test, '_schema', None)
- if schema is not None:
- setattr(test, 'scenarios',
- NegativeAutoTest.generate_scenario(schema))
- return testscenarios.load_tests_apply_scenarios(*args)
-
- @staticmethod
- def generate_scenario(description):
- """
- Generates the test scenario list for a given description.
-
- :param description: A file or dictionary with the following entries:
- name (required) name for the api
- http-method (required) one of HEAD,GET,PUT,POST,PATCH,DELETE
- url (required) the url to be appended to the catalog url with '%s'
- for each resource mentioned
- resources: (optional) A list of resource names such as "server",
- "flavor", etc. with an element for each '%s' in the url. This
- method will call self.get_resource for each element when
- constructing the positive test case template so negative
- subclasses are expected to return valid resource ids when
- appropriate.
- json-schema (optional) A valid json schema that will be used to
- create invalid data for the api calls. For "GET" and "HEAD",
- the data is used to generate query strings appended to the url,
- otherwise for the body of the http call.
- """
- LOG.debug(description)
- generator = importutils.import_class(
- CONF.negative.test_generator)()
- generator.validate_schema(description)
- schema = description.get("json-schema", None)
- resources = description.get("resources", [])
- scenario_list = []
- expected_result = None
- for resource in resources:
- if isinstance(resource, dict):
- expected_result = resource['expected_result']
- resource = resource['name']
- LOG.debug("Add resource to test %s" % resource)
- scn_name = "inv_res_%s" % (resource)
- scenario_list.append((scn_name, {"resource": (resource,
- str(uuid.uuid4())),
- "expected_result": expected_result
- }))
- if schema is not None:
- for scenario in generator.generate_scenarios(schema):
- scenario_list.append((scenario['_negtest_name'],
- scenario))
- LOG.debug(scenario_list)
- return scenario_list
-
- def execute(self, description):
- """
- Execute a http call on an api that are expected to
- result in client errors. First it uses invalid resources that are part
- of the url, and then invalid data for queries and http request bodies.
-
- :param description: A json file or dictionary with the following
- entries:
- name (required) name for the api
- http-method (required) one of HEAD,GET,PUT,POST,PATCH,DELETE
- url (required) the url to be appended to the catalog url with '%s'
- for each resource mentioned
- resources: (optional) A list of resource names such as "server",
- "flavor", etc. with an element for each '%s' in the url. This
- method will call self.get_resource for each element when
- constructing the positive test case template so negative
- subclasses are expected to return valid resource ids when
- appropriate.
- json-schema (optional) A valid json schema that will be used to
- create invalid data for the api calls. For "GET" and "HEAD",
- the data is used to generate query strings appended to the url,
- otherwise for the body of the http call.
-
- """
- LOG.info("Executing %s" % description["name"])
- LOG.debug(description)
- generator = importutils.import_class(
- CONF.negative.test_generator)()
- schema = description.get("json-schema", None)
- method = description["http-method"]
- url = description["url"]
- expected_result = None
- if "default_result_code" in description:
- expected_result = description["default_result_code"]
-
- resources = [self.get_resource(r) for
- r in description.get("resources", [])]
-
- if hasattr(self, "resource"):
- # Note(mkoderer): The resources list already contains an invalid
- # entry (see get_resource).
- # We just send a valid json-schema with it
- valid_schema = None
- if schema:
- valid_schema = \
- valid.ValidTestGenerator().generate_valid(schema)
- new_url, body = self._http_arguments(valid_schema, url, method)
- elif hasattr(self, "_negtest_name"):
- schema_under_test = \
- valid.ValidTestGenerator().generate_valid(schema)
- local_expected_result = \
- generator.generate_payload(self, schema_under_test)
- if local_expected_result is not None:
- expected_result = local_expected_result
- new_url, body = \
- self._http_arguments(schema_under_test, url, method)
- else:
- raise Exception("testscenarios are not active. Please make sure "
- "that your test runner supports the load_tests "
- "mechanism")
-
- if "admin_client" in description and description["admin_client"]:
- client = self.admin_client
- else:
- client = self.client
- resp, resp_body = client.send_request(method, new_url,
- resources, body=body)
- self._check_negative_response(expected_result, resp.status, resp_body)
-
- def _http_arguments(self, json_dict, url, method):
- LOG.debug("dict: %s url: %s method: %s" % (json_dict, url, method))
- if not json_dict:
- return url, None
- elif method in ["GET", "HEAD", "PUT", "DELETE"]:
- return "%s?%s" % (url, parse.urlencode(json_dict)), None
- else:
- return url, json.dumps(json_dict)
-
- def _check_negative_response(self, expected_result, result, body):
- self.assertTrue(result >= 400 and result < 500 and result != 413,
- "Expected client error, got %s:%s" %
- (result, body))
- self.assertTrue(expected_result is None or expected_result == result,
- "Expected %s, got %s:%s" %
- (expected_result, result, body))
-
- @classmethod
- def set_resource(cls, name, resource):
- """
- This function can be used in setUpClass context to register a resoruce
- for a test.
-
- :param name: The name of the kind of resource such as "flavor", "role",
- etc.
- :resource: The id of the resource
- """
- cls._resources[name] = resource
-
- def get_resource(self, name):
- """
- Return a valid uuid for a type of resource. If a real resource is
- needed as part of a url then this method should return one. Otherwise
- it can return None.
-
- :param name: The name of the kind of resource such as "flavor", "role",
- etc.
- """
- if isinstance(name, dict):
- name = name['name']
- if hasattr(self, "resource") and self.resource[0] == name:
- LOG.debug("Return invalid resource (%s) value: %s" %
- (self.resource[0], self.resource[1]))
- return self.resource[1]
- if name in self._resources:
- return self._resources[name]
- return None
-
-
-def SimpleNegativeAutoTest(klass):
- """
- This decorator registers a test function on basis of the class name.
- """
- @attr(type=['negative', 'gate'])
- def generic_test(self):
- if hasattr(self, '_schema'):
- self.execute(self._schema)
-
- cn = klass.__name__
- cn = cn.replace('JSON', '')
- cn = cn.replace('Test', '')
- # NOTE(mkoderer): replaces uppercase chars inside the class name with '_'
- lower_cn = re.sub('(?<!^)(?=[A-Z])', '_', cn).lower()
- func_name = 'test_%s' % lower_cn
- setattr(klass, func_name, generic_test)
- return klass
-
-
-def call_until_true(func, duration, sleep_for):
- """
- Call the given function until it returns True (and return True) or
- until the specified duration (in seconds) elapses (and return
- False).
-
- :param func: A zero argument callable that returns True on success.
- :param duration: The number of seconds for which to attempt a
- successful call of the function.
- :param sleep_for: The number of seconds to sleep after an unsuccessful
- invocation of the function.
- """
- now = time.time()
- timeout = now + duration
- while now < timeout:
- if func():
- return True
- time.sleep(sleep_for)
- now = time.time()
- return False