blob: 669481309db832913ff73a32e66fdf569fb2423e [file] [log] [blame]
import logging
import os
import random
import sys
import time
from cinderclient import client as cinder_client
from glanceclient import client as glance_client
from keystoneauth1 import identity as keystone_identity
from keystoneauth1 import session as keystone_session
from keystoneclient.v3 import client as keystone_client
from neutronclient.v2_0 import client as neutron_client
from neutronclient import common as neutron_common
from novaclient import client as novaclient
import utils
logger = logging.getLogger(__name__)
class OfficialClientManager(object):
"""Manager that provides access to the official python clients for
calling various OpenStack APIs.
"""
CINDERCLIENT_VERSION = 3
GLANCECLIENT_VERSION = 2
KEYSTONECLIENT_VERSION = 3
NEUTRONCLIENT_VERSION = 2
NOVACLIENT_VERSION = 2
INTERFACE = 'admin'
if "OS_ENDPOINT_TYPE" in list(os.environ.keys()):
INTERFACE = os.environ["OS_ENDPOINT_TYPE"]
def __init__(self, username=None, password=None,
project_name=None, auth_url=None, endpoint_type="internalURL",
cert=False, domain="Default", **kwargs):
self.traceback = ""
self.client_attr_names = [
"auth",
"compute",
"network",
"volume",
"image",
]
self.username = username
self.password = password
self.project_name = project_name
self.auth_url = auth_url
self.endpoint_type = endpoint_type
self.cert = cert
self.domain = domain
self.kwargs = kwargs
# Lazy clients
self._auth = None
self._compute = None
self._network = None
self._volume = None
self._image = None
@classmethod
def _get_auth_session(cls, username=None, password=None,
project_name=None, auth_url=None, cert=None,
domain='Default'):
if None in (username, password, project_name):
sys.stdout.write((username, password, project_name))
msg = ("Missing required credentials for identity client. "
"username: {username}, password: {password}, "
"project_name: {project_name}").format(
username=username,
password=password,
project_name=project_name
)
raise msg
if cert and "https" not in auth_url:
auth_url = auth_url.replace("http", "https")
if "v2" in auth_url:
raise BaseException("Keystone v2 is deprecated since OpenStack"
"Queens release. So current OS_AUTH_URL {} "
"is not valid. Please use Keystone v3."
"".format(auth_url))
else:
auth_url = auth_url if ("v3" in auth_url) else "{}{}".format(
auth_url, "/v3")
auth = keystone_identity.v3.Password(
auth_url=auth_url,
user_domain_name=domain,
username=username,
password=password,
project_domain_name=domain,
project_name=project_name)
auth_session = keystone_session.Session(auth=auth, verify=cert)
# auth_session.get_auth_headers()
return auth_session
@classmethod
def get_auth_client(cls, username=None, password=None,
project_name=None, auth_url=None, cert=None,
domain='Default', **kwargs):
session = cls._get_auth_session(
username=username,
password=password,
project_name=project_name,
auth_url=auth_url,
cert=cert,
domain=domain)
keystone = keystone_client.Client(version=cls.KEYSTONECLIENT_VERSION,
session=session, **kwargs)
keystone.management_url = auth_url
return keystone
@classmethod
def get_compute_client(cls, username=None, password=None,
project_name=None, auth_url=None, cert=None,
domain='Default', **kwargs):
session = cls._get_auth_session(
username=username, password=password, project_name=project_name,
auth_url=auth_url, cert=cert, domain=domain)
service_type = 'compute'
compute_client = novaclient.Client(
version=cls.NOVACLIENT_VERSION, session=session,
service_type=service_type, os_cache=False, **kwargs)
return compute_client
@classmethod
def get_network_client(cls, username=None, password=None,
project_name=None, auth_url=None, cert=None,
domain='Default', **kwargs):
session = cls._get_auth_session(
username=username, password=password, project_name=project_name,
auth_url=auth_url, cert=cert, domain=domain)
service_type = 'network'
return neutron_client.Client(
service_type=service_type, session=session,
interface=cls.INTERFACE, **kwargs)
@classmethod
def get_volume_client(cls, username=None, password=None,
project_name=None, auth_url=None, cert=None,
domain='Default', **kwargs):
session = cls._get_auth_session(
username=username, password=password, project_name=project_name,
auth_url=auth_url, cert=cert, domain=domain)
service_type = 'volume'
return cinder_client.Client(
version=cls.CINDERCLIENT_VERSION,
service_type=service_type,
interface=cls.INTERFACE,
session=session, **kwargs)
@classmethod
def get_image_client(cls, username=None, password=None,
project_name=None, auth_url=None, cert=None,
domain='Default', **kwargs):
session = cls._get_auth_session(
username=username, password=password, project_name=project_name,
auth_url=auth_url, cert=cert, domain=domain)
service_type = 'image'
return glance_client.Client(
version=cls.GLANCECLIENT_VERSION,
service_type=service_type,
session=session, interface=cls.INTERFACE,
**kwargs)
@property
def auth(self):
if self._auth is None:
self._auth = self.get_auth_client(
self.username, self.password, self.project_name, self.auth_url,
self.cert, self.domain, endpoint_type=self.endpoint_type
)
return self._auth
@property
def compute(self):
if self._compute is None:
self._compute = self.get_compute_client(
self.username, self.password, self.project_name, self.auth_url,
self.cert, self.domain, endpoint_type=self.endpoint_type
)
return self._compute
@property
def network(self):
if self._network is None:
self._network = self.get_network_client(
self.username, self.password, self.project_name, self.auth_url,
self.cert, self.domain, endpoint_type=self.endpoint_type
)
return self._network
@property
def volume(self):
if self._volume is None:
self._volume = self.get_volume_client(
self.username, self.password, self.project_name, self.auth_url,
self.cert, self.domain, endpoint_type=self.endpoint_type
)
return self._volume
@property
def image(self):
if self._image is None:
self._image = self.get_image_client(
self.username, self.password, self.project_name, self.auth_url,
self.cert, self.domain
)
return self._image
class OSCliActions(object):
def __init__(self, os_clients):
self.os_clients = os_clients
self.create_fake_ext_net = False
def get_project_by_name(self, name):
return self.os_clients.auth.projects.find(name=name)
def get_internal_network(self):
networks = [
net for net in self.os_clients.network.list_networks()["networks"]
if net["admin_state_up"] and not net["router:external"] and
len(net["subnets"])
]
if networks:
net = networks[0]
else:
net = self.create_network_resources()
return net
def create_fake_external_network(self):
logger.info(
"Could not find any external network, creating a fake one...")
net_name = "spt-ext-net-{}".format(random.randrange(100, 999))
net_body = {"network": {"name": net_name,
"router:external": True,
"provider:network_type": "local"}}
try:
ext_net = \
self.os_clients.network.create_network(net_body)['network']
logger.info("Created a fake external net {}".format(net_name))
except Exception as e:
# in case 'local' net type is absent, create with default type
net_body["network"].pop('provider:network_type', None)
ext_net = \
self.os_clients.network.create_network(net_body)['network']
subnet_name = "spt-ext-subnet-{}".format(random.randrange(100, 999))
subnet_body = {
"subnet": {
"name": subnet_name,
"network_id": ext_net["id"],
"ip_version": 4,
"cidr": "10.255.255.0/24",
"allocation_pools": [{"start": "10.255.255.100",
"end": "10.255.255.200"}]
}
}
self.os_clients.network.create_subnet(subnet_body)
self.create_fake_ext_net = True
return ext_net
def get_external_network(self):
config = utils.get_configuration()
ext_net = config.get('external_network') or ''
if not ext_net:
networks = [
net for net in
self.os_clients.network.list_networks()["networks"]
if net["admin_state_up"] and net["router:external"] and
len(net["subnets"])
]
else:
networks = [net for net in
self.os_clients.network.list_networks()["networks"]
if net["name"] == ext_net]
if networks:
ext_net = networks[0]
logger.info("Using external net '{}'".format(ext_net["name"]))
else:
ext_net = self.create_fake_external_network()
return ext_net
def create_flavor(self, name, ram=256, vcpus=1, disk=2):
logger.info("Creating a flavor {}".format(name))
return self.os_clients.compute.flavors.create(name, ram, vcpus, disk)
def create_sec_group(self, rulesets=None):
if rulesets is None:
rulesets = [
{
# ssh
'protocol': 'tcp',
'port_range_max': 22,
'port_range_min': 22,
'remote_ip_prefix': '0.0.0.0/0',
'direction': 'ingress'
},
{
# iperf3
'protocol': 'tcp',
'port_range_max': 5201,
'port_range_min': 5201,
'remote_ip_prefix': '0.0.0.0/0',
'direction': 'ingress'
},
{
# iperf
'protocol': 'tcp',
'port_range_max': 5001,
'port_range_min': 5001,
'remote_ip_prefix': '0.0.0.0/0',
'direction': 'ingress'
},
{
# ping
'protocol': 'icmp',
'remote_ip_prefix': '0.0.0.0/0',
'direction': 'ingress'
}
]
sg_name = "spt-test-secgroup-{}".format(random.randrange(100, 999))
sg_desc = sg_name + " SPT"
body = {"security_group": {"name": sg_name, "description": sg_desc}}
secgroup = self.os_clients.network.create_security_group(body=body)
rule_body_teplate = {"security_group_rule": {}}
for ruleset in rulesets:
rule_body_teplate["security_group_rule"] = ruleset
rule_body_teplate["security_group_rule"]["security_group_id"] = \
secgroup['security_group']['id']
self.os_clients.network.create_security_group_rule(
body=rule_body_teplate)
logger.info("Created a security group {}".format(sg_name))
return secgroup['security_group']
def create_basic_server(self, image=None, flavor=None, net=None,
availability_zone=None, sec_groups=(),
keypair=None):
os_conn = self.os_clients
net = net or self.get_internal_network()
kwargs = {}
if sec_groups:
kwargs['security_groups'] = sec_groups
server = os_conn.compute.servers.create(
"spt-test-server-{}".format(random.randrange(100, 999)),
image, flavor, nics=[{"net-id": net["id"]}],
availability_zone=availability_zone, key_name=keypair, **kwargs)
return server
def get_vm(self, vm_id):
os_conn = self.os_clients
try:
vm = os_conn.compute.servers.find(id=vm_id)
except Exception as e:
raise Exception(
"Could not get the VM \"{}\": {}".format(
vm_id, e))
return vm
def check_vm_is_active(self, vm_uuid, retry_delay=5, timeout=500):
vm = None
timeout_reached = False
start_time = time.time()
expected_state = 'ACTIVE'
while not timeout_reached:
vm = self.get_vm(vm_uuid)
if vm.status == expected_state:
logger.info(
"VM {} is in {} status.".format(vm_uuid, vm.status))
break
if vm.status == 'ERROR':
break
time.sleep(retry_delay)
timeout_reached = (time.time() - start_time) > timeout
if vm.status != expected_state:
logger.info("VM {} is in {} status.".format(vm_uuid, vm.status))
raise TimeoutError(
"VM {vm_uuid} on is expected to be in '{expected_state}' "
"state, but is in '{actual}' state instead.".format(
vm_uuid=vm_uuid, expected_state=expected_state,
actual=vm.status))
def create_network(self, project_id):
net_name = "spt-test-net-{}".format(random.randrange(100, 999))
config = utils.get_configuration()
mtu = config.get('custom_mtu') or 'default'
net_body = {
'network': {
'name': net_name,
'project_id': project_id
}
}
# in TF2011 we cannot set MTU while creating the net: 400 Bad request
# error can happen with 'mtu' field, so we update mtu of ports later
if (mtu != 'default') and (not self.is_cloud_tf()):
try:
net_body['network']['mtu'] = int(mtu)
except ValueError as e:
raise ValueError("MTU value '{}' is not correct. "
"Must be an integer at 'custom_mtu' in "
"global_config.yaml.\n{}".format(mtu, e))
net = self.os_clients.network.create_network(net_body)['network']
# WA for TF because the network object does not have 'mtu' field by
# default, so this blocked running tests at TF envs with default MTU
if 'mtu' not in net:
net['mtu'] = None
logger.info("Created internal network {} in {} project".format(
net_name, project_id))
return net
def create_subnet(self, net, project_id, cidr=None):
subnet_name = "spt-test-subnet-{}".format(random.randrange(100, 999))
subnet_body = {
'subnet': {
"name": subnet_name,
'network_id': net['id'],
'ip_version': 4,
'cidr': cidr if cidr else '10.1.7.0/24',
'project_id': project_id
}
}
subnet = self.os_clients.network.create_subnet(subnet_body)['subnet']
logger.info("Created subnet {} in {} project".format(
subnet_name, project_id))
return subnet
def create_router(self, ext_net, project_id):
name = 'spt-test-router-{}'.format(random.randrange(100, 999))
router_body = {
'router': {
'name': name,
'external_gateway_info': {
'network_id': ext_net['id']
},
'project_id': project_id
}
}
logger.info("Created a router {} in {} project".format(
name, project_id))
router = self.os_clients.network.create_router(router_body)['router']
return router
def create_network_resources(self, project="admin", cidr=None):
project_id = self.get_project_by_name(project).id
self.get_external_network()
net = self.create_network(project_id)
self.create_subnet(net, project_id, cidr)
return net
def list_nova_computes(self):
nova_services = self.os_clients.compute.hosts.list()
computes_list = [h for h in nova_services if h.service == "compute"]
return computes_list
def create_floating_ip(self, floating_net_id):
fip = self.os_clients.network.create_floatingip({"floatingip": {
"floating_network_id": floating_net_id}})
return fip['floatingip']
def delete_floating_ip(self, floatingip_id):
try:
return self.os_clients.network.delete_floatingip(floatingip_id)
except neutron_common.exceptions.NotFound as e:
msg = "Could not delete a Floating IP, UUID {}. Error: {}" \
"".format(floatingip_id, e)
logger.info(msg)
def create_project(self):
project_name = "spt-test-project-{}".format(random.randrange(100, 999))
project = self.os_clients.auth.projects.create(
name=project_name, domain=self.os_clients.domain,
description="Mirantis SPT test project")
logger.info("Created a project {}, uuid: {}".format(
project.name, project.id))
return project
def add_roles_to_user_in_project(self, project_id, username='admin',
domain='default', roles=None):
user_id = [
user.id for user in self.os_clients.auth.users.list()
if (user.name == username) and (user.domain_id == domain)][0]
if roles is None:
roles = ["admin", "member", "creator"]
for role in roles:
try:
role_id = self.os_clients.auth.roles.list(name=role)[0].id
self.os_clients.auth.roles.grant(
role=role_id, user=user_id, project=project_id)
except Exception as e:
continue
logger.info("Added admin user to {} project".format(project_id))
def is_project_empty(self, project_id):
sec_groups = [i for i in self.os_clients.network.list_security_groups(
tenant_id=project_id)['security_groups'] if i['name'] != 'default']
servers = self.os_clients.compute.servers.list(
search_opts={'project_id': project_id})
nets = self.os_clients.network.list_networks(
project_id=project_id)["networks"]
subnets = self.os_clients.network.list_subnets(
project_id=project_id)["subnets"]
ports = self.os_clients.network.list_ports(
project_id=project_id)["ports"]
routers = self.os_clients.network.list_routers(
project_id=project_id)["routers"]
resources = [*sec_groups, *servers, *nets, *subnets, *ports, *routers]
return not bool(resources)
def is_cloud_tf(self):
# Detect the TF cloud by assuming it does not have any neutron
# agents (404 in response)
try:
self.os_clients.network.list_agents()
except neutron_common.exceptions.NotFound:
logger.info("MOS TF cloud is detected.")
return True
return False
def update_network_port_with_custom_mtu(self, vm_uuid, custom_mtu):
port_uuid = self.os_clients.network.list_ports(
device_id=vm_uuid).get("ports")[0]["id"]
body = {"port": {"extra_dhcp_opts": [
{"opt_name": "interface-mtu", "opt_value": str(custom_mtu)}]}}
try:
self.os_clients.network.update_port(port_uuid, body)
except Exception as e:
raise Exception("Could not set custom MTU by updating the port. "
"See detailed error: {}".format(e))
logger.info("The port {} is updated with custom MTU {}."
"".format(port_uuid, custom_mtu))
def get_flavor_id_by_name(self, name):
flavors = [flavor for flavor in self.os_clients.compute.flavors.list()]
flavor_id = [f.id for f in flavors if f.name == name]
if not flavor_id:
return None
return str(flavor_id[0])