# blob: 4f1e02cb137975f1e08162818921200ba2c0e9db [file] [log] [blame]
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 Hewlett-Packard, Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest.common.utils import misc
from tempest import config
from tempest.scenario import manager
import json
import re
import string
import unicodedata
# Global tempest configuration object; the classes below read their
# credentials (CONF.identity) and input-scenario options
# (CONF.input_scenario) from it.
CONF = config.CONF
@misc.singleton
class ImageUtils(object):
    """Answer simple questions about compute images and flavors.

    The class is wrapped by ``misc.singleton``. Image and flavor records
    are fetched through the compute client of an ``OfficialClientManager``
    created with the configured identity credentials.
    """

    # User name handed back by ssh_user() when no configured regex matches.
    default_ssh_user = 'root'

    def __init__(self):
        """Read the input-scenario options and set up the compute client."""
        # The option holds JSON text; ssh_user() iterates the decoded value
        # as (regex, user) pairs.
        self.ssh_users = json.loads(CONF.input_scenario.ssh_user_regex)
        self.non_ssh_image_pattern = CONF.input_scenario.non_ssh_image_regex

        identity = CONF.identity
        clients = manager.OfficialClientManager(identity.username,
                                                identity.password,
                                                identity.tenant_name)
        self.client = clients.compute_client

    def ssh_user(self, image_id):
        """Return the ssh user name to use for the image *image_id*.

        The image name is tried against each configured regex in order,
        anchored at the start of the name (``re.match``); the user paired
        with the first matching regex is returned. ``default_ssh_user`` is
        returned when nothing matches.
        """
        image = self.client.images.get(image_id)
        for pattern, username in self.ssh_users:
            if re.match(pattern, image.name):
                return username
        return self.default_ssh_user

    def _is_sshable_image(self, image):
        """Return True unless the image name matches the non-ssh pattern."""
        found = re.search(self.non_ssh_image_pattern, str(image.name))
        return found is None

    def is_sshable_image(self, image_id):
        """Fetch the image *image_id* and apply ``_is_sshable_image``."""
        return self._is_sshable_image(self.client.images.get(image_id))

    def _is_flavor_enough(self, flavor, image):
        """Return True when ``image.minDisk`` does not exceed ``flavor.disk``.
        """
        required = image.minDisk
        available = flavor.disk
        return required <= available

    def is_flavor_enough(self, flavor_id, image_id):
        """Fetch the image and the flavor by id, then compare their disks.

        :return: the result of ``_is_flavor_enough`` for the two records
        """
        image = self.client.images.get(image_id)
        flavor = self.client.flavors.get(flavor_id)
        return self._is_flavor_enough(flavor, image)
@misc.singleton
class InputScenarioUtils(object):
    """Build scenario lists out of the available images and flavors.

    The class is wrapped by ``misc.singleton``. Images and flavors are
    listed through the compute client and filtered by the configured
    ``image_regex`` / ``flavor_regex`` patterns.

    Example usage:

    import testscenarios
    (...)
    load_tests = testscenarios.load_tests_apply_scenarios


    class TestInputScenario(manager.OfficialClientTest):

        scenario_utils = test_utils.InputScenarioUtils()
        scenario_flavor = scenario_utils.scenario_flavors
        scenario_image = scenario_utils.scenario_images
        scenarios = testscenarios.multiply_scenarios(scenario_image,
                                                     scenario_flavor)

        def test_create_server_metadata(self):
            name = rand_name('instance')
            _ = self.compute_client.servers.create(name=name,
                                                   flavor=self.flavor_ref,
                                                   image=self.image_ref)
    """

    # Characters allowed to survive _normalize_name(): '-', '_', '.',
    # ASCII letters and digits.
    validchars = "-_.{ascii}{digit}".format(ascii=string.ascii_letters,
                                            digit=string.digits)

    def __init__(self):
        """Create the compute client and read the filtering patterns."""
        ocm = manager.OfficialClientManager(CONF.identity.username,
                                            CONF.identity.password,
                                            CONF.identity.tenant_name)
        self.client = ocm.compute_client
        self.image_pattern = CONF.input_scenario.image_regex
        self.flavor_pattern = CONF.input_scenario.flavor_regex

    def _normalize_name(self, name):
        """Reduce *name* to the characters listed in ``validchars``.

        The name is NFKD-normalized so that accented characters decompose
        into a base character plus combining marks, everything outside
        ASCII is dropped, and any remaining character that is not in
        ``validchars`` is removed.

        :param name: text (unicode) name of an image or flavor
        :return: the filtered name as text
        """
        # encode() produces bytes; on Python 3 iterating bytes yields ints,
        # and testing an int for membership in a str raises TypeError.
        # Decode back to text first -- the bytes are pure ASCII by
        # construction, so this cannot fail.
        ascii_name = (unicodedata.normalize('NFKD', name)
                      .encode('ASCII', 'ignore')
                      .decode('ASCII'))
        return ''.join(c for c in ascii_name if c in self.validchars)

    @property
    def scenario_images(self):
        """
        :return: a scenario with name and uuid of images
        """
        # Computed on first access and cached on the instance afterwards.
        if not hasattr(self, '_scenario_images'):
            images = self.client.images.list(detailed=False)
            self._scenario_images = [
                (self._normalize_name(i.name), dict(image_ref=i.id))
                for i in images if re.search(self.image_pattern, str(i.name))
            ]
        return self._scenario_images

    @property
    def scenario_flavors(self):
        """
        :return: a scenario with name and uuid of flavors
        """
        # Computed on first access and cached on the instance afterwards.
        if not hasattr(self, '_scenario_flavors'):
            flavors = self.client.flavors.list(detailed=False)
            self._scenario_flavors = [
                (self._normalize_name(f.name), dict(flavor_ref=f.id))
                for f in flavors if re.search(self.flavor_pattern, str(f.name))
            ]
        return self._scenario_flavors