blob: f8d9dd498fbc7c0a46cc9f8da98fc0d415e881f6 [file] [log] [blame]
# Copyright 2013 Hewlett-Packard, Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import re
import string
import unicodedata
from tempest_lib.common.utils import misc
import testscenarios
import testtools
from tempest import clients
from tempest.common import credentials
from tempest import config
from tempest import exceptions
CONF = config.CONF
class ImageUtils(object):
default_ssh_user = 'root'
def __init__(self, os):
# Load configuration items
self.ssh_users = json.loads(CONF.input_scenario.ssh_user_regex)
self.non_ssh_image_pattern = \
CONF.input_scenario.non_ssh_image_regex
# Setup clients
self.images_client = os.images_client
self.flavors_client = os.flavors_client
def ssh_user(self, image_id):
_image = self.images_client.show_image(image_id)
for regex, user in self.ssh_users:
# First match wins
if re.match(regex, _image['name']) is not None:
return user
else:
return self.default_ssh_user
def _is_sshable_image(self, image):
return not re.search(pattern=self.non_ssh_image_pattern,
string=str(image['name']))
def is_sshable_image(self, image_id):
_image = self.images_client.show_image(image_id)
return self._is_sshable_image(_image)
def _is_flavor_enough(self, flavor, image):
return image['minDisk'] <= flavor['disk']
def is_flavor_enough(self, flavor_id, image_id):
_image = self.images_client.show_image(image_id)
_flavor = self.flavors_client.get_flavor_details(flavor_id)
return self._is_flavor_enough(_flavor, _image)
@misc.singleton
class InputScenarioUtils(object):
"""
Example usage:
import testscenarios
(...)
load_tests = testscenarios.load_tests_apply_scenarios
class TestInputScenario(manager.ScenarioTest):
scenario_utils = utils.InputScenarioUtils()
scenario_flavor = scenario_utils.scenario_flavors
scenario_image = scenario_utils.scenario_images
scenarios = testscenarios.multiply_scenarios(scenario_image,
scenario_flavor)
def test_create_server_metadata(self):
name = rand_name('instance')
self.servers_client.create_server(name=name,
flavor_ref=self.flavor_ref,
image_ref=self.image_ref)
"""
validchars = "-_.{ascii}{digit}".format(ascii=string.ascii_letters,
digit=string.digits)
def __init__(self):
network_resources = {
'network': False,
'router': False,
'subnet': False,
'dhcp': False,
}
self.isolated_creds = credentials.get_isolated_credentials(
name='InputScenarioUtils',
identity_version=CONF.identity.auth_version,
network_resources=network_resources)
os = clients.Manager(self.isolated_creds.get_primary_creds())
self.images_client = os.images_client
self.flavors_client = os.flavors_client
self.image_pattern = CONF.input_scenario.image_regex
self.flavor_pattern = CONF.input_scenario.flavor_regex
def _normalize_name(self, name):
nname = unicodedata.normalize('NFKD', name).encode('ASCII', 'ignore')
nname = ''.join(c for c in nname if c in self.validchars)
return nname
def clear_creds(self):
self.isolated_creds.clear_isolated_creds()
@property
def scenario_images(self):
"""
:return: a scenario with name and uuid of images
"""
if not CONF.service_available.glance:
return []
if not hasattr(self, '_scenario_images'):
try:
images = self.images_client.list_images()
self._scenario_images = [
(self._normalize_name(i['name']), dict(image_ref=i['id']))
for i in images if re.search(self.image_pattern,
str(i['name']))
]
except Exception:
self._scenario_images = []
return self._scenario_images
@property
def scenario_flavors(self):
"""
:return: a scenario with name and uuid of flavors
"""
if not hasattr(self, '_scenario_flavors'):
try:
flavors = self.flavors_client.list_flavors()
self._scenario_flavors = [
(self._normalize_name(f['name']), dict(flavor_ref=f['id']))
for f in flavors if re.search(self.flavor_pattern,
str(f['name']))
]
except Exception:
self._scenario_flavors = []
return self._scenario_flavors
def load_tests_input_scenario_utils(*args):
"""
Wrapper for testscenarios to set the scenarios to avoid running a getattr
on the CONF object at import.
"""
if getattr(args[0], 'suiteClass', None) is not None:
loader, standard_tests, pattern = args
else:
standard_tests, module, loader = args
output = None
scenario_utils = None
try:
scenario_utils = InputScenarioUtils()
scenario_flavor = scenario_utils.scenario_flavors
scenario_image = scenario_utils.scenario_images
except (exceptions.InvalidConfiguration, TypeError):
output = standard_tests
finally:
if scenario_utils:
scenario_utils.clear_creds()
if output is not None:
return output
for test in testtools.iterate_tests(standard_tests):
setattr(test, 'scenarios', testscenarios.multiply_scenarios(
scenario_image,
scenario_flavor))
return testscenarios.load_tests_apply_scenarios(*args)