Add stage base class, refactor discovery, etc
diff --git a/wally/openstack_api.py b/wally/openstack_api.py
new file mode 100644
index 0000000..2e9ab63
--- /dev/null
+++ b/wally/openstack_api.py
@@ -0,0 +1,653 @@
+import re
+import os
+import stat
+import time
+import os.path
+import logging
+import tempfile
+import subprocess
+import urllib.request
+from typing import Dict, Any, Iterable, Iterator, NamedTuple, Optional, List, Tuple, Set
+from concurrent.futures import ThreadPoolExecutor
+
+from keystoneauth1 import loading, session
+from novaclient.exceptions import NotFound
+from novaclient.client import Client as NovaClient
+from cinderclient.client import Client as CinderClient
+from glanceclient import Client as GlanceClient
+
+from .utils import Timeout
+from .node_interfaces import NodeInfo
+from .storage import IStorable
+from .ssh_utils import ConnCreds
+
+
+__doc__ = """
+Module used to reliably spawn a set of VMs, evenly distributed across
+compute servers in an OpenStack cluster. Main functions:
+
+    get_openstack_credentials - extract OpenStack credentials from different sources
+    os_connect - connect to the nova, cinder and glance APIs
+    find_vms - find VMs whose names start with a given prefix
+    prepare_os - prepare tenant for usage
+    launch_vms - reliably start a set of VMs in parallel, with volumes and floating IPs
+    clear_nodes - remove VMs and volumes
+"""
+
+
+logger = logging.getLogger("wally.vms")
+
+
+OSCreds = NamedTuple("OSCreds",
+ [("name", str),
+ ("passwd", str),
+ ("tenant", str),
+ ("auth_url", str),
+ ("insecure", bool)])
+
+
+# TODO(koder): should correctly process different sources, not only env????
+def get_openstack_credentials() -> OSCreds:
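+    # Example environment this function expects (illustrative values only):
+    #   OS_USERNAME=admin OS_PASSWORD=secret OS_TENANT_NAME=admin
+    #   OS_AUTH_URL=http://keystone.example.com:5000/v2.0 OS_INSECURE=false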
+ is_insecure = os.environ.get('OS_INSECURE', 'false').lower() in ('true', 'yes')
+
+ return OSCreds(os.environ.get('OS_USERNAME'),
+ os.environ.get('OS_PASSWORD'),
+ os.environ.get('OS_TENANT_NAME'),
+ os.environ.get('OS_AUTH_URL'),
+ is_insecure)
+
+
+class OSConnection:
+ def __init__(self, nova: NovaClient, cinder: CinderClient, glance: GlanceClient) -> None:
+ self.nova = nova
+ self.cinder = cinder
+ self.glance = glance
+
+
+def os_connect(os_creds: OSCreds, version: str = "2") -> OSConnection:
+ loader = loading.get_plugin_loader('password')
+ auth = loader.load_from_options(auth_url=os_creds.auth_url,
+ username=os_creds.name,
+ password=os_creds.passwd,
+ project_id=os_creds.tenant)
+ auth_sess = session.Session(auth=auth)
+
+ glance = GlanceClient(version, session=auth_sess)
+ nova = NovaClient(version, session=auth_sess)
+ cinder = CinderClient(os_creds.name, os_creds.passwd, os_creds.tenant, os_creds.auth_url,
+ insecure=os_creds.insecure, api_version=version)
+ return OSConnection(nova, cinder, glance)
+
+
+def find_vms(conn: OSConnection, name_prefix: str) -> Iterable[Tuple[str, int]]:
+ for srv in conn.nova.servers.list():
+ if srv.name.startswith(name_prefix):
+            # only the first floating IP of each server is needed, so flatten
+            # all addresses into one list and break out of a single loop
+            # instead of relying on exceptions
+ all_ip = [] # type: List[Any]
+ for ips in srv.addresses.values():
+ all_ip.extend(ips)
+
+ for ip in all_ip:
+ if ip.get("OS-EXT-IPS:type", None) == 'floating':
+ yield ip['addr'], srv.id
+ break
+
+
+def pause(conn: OSConnection, ids: Iterable[int], executor: ThreadPoolExecutor) -> None:
+ def pause_vm(vm_id: str) -> None:
+ vm = conn.nova.servers.get(vm_id)
+ if vm.status == 'ACTIVE':
+ vm.pause()
+
+    # executor.map returns results, not futures; iterating it waits for
+    # completion and re-raises any worker exception
+    for _ in executor.map(pause_vm, ids):
+        pass
+
+
+def unpause(conn: OSConnection, ids: Iterable[int], executor: ThreadPoolExecutor, max_resume_time: int = 10) -> None:
+ def unpause(vm_id: str) -> None:
+ vm = conn.nova.servers.get(vm_id)
+ if vm.status == 'PAUSED':
+ vm.unpause()
+
+ for _ in Timeout(max_resume_time):
+ vm = conn.nova.servers.get(vm_id)
+ if vm.status != 'PAUSED':
+ return
+ raise RuntimeError("Can't unpause vm {0}".format(vm_id))
+
+    # executor.map returns results, not futures; iterating it waits for completion
+    for _ in executor.map(unpause, ids):
+        pass
+
+
+def prepare_os(conn: OSConnection, params: Dict[str, Any], max_vm_per_node: int = 8) -> None:
+ """prepare openstack for futher usage
+
+ Creates server groups, security rules, keypair, flavor
+ and upload VM image from web. In case if object with
+ given name already exists, skip preparation part.
+ Don't check, that existing object has required attributes
+
+ params:
+ nova: OSConnection
+ params: dict {
+ security_group:str - security group name with allowed ssh and ping
+ aa_group_name:str - template for anti-affinity group names. Should
+ receive one integer parameter, like "cbt_aa_{0}"
+ keypair_name: str - OS keypair name
+ keypair_file_public: str - path to public key file
+ keypair_file_private: str - path to private key file
+
+ flavor:dict - flavor params
+ name, ram_size, hdd_size, cpu_count
+ as for novaclient.Client.flavor.create call
+
+ image:dict - image params
+ 'name': image name
+ 'url': image url
+ }
+ os_creds: OSCreds
+ max_vm_per_compute: int=8 maximum expected amount of VM, per
+ compute host. Used to create appropriate
+ count of server groups for even placement
+ """
+ allow_ssh_and_ping(conn, params['security_group'])
+
+ for idx in range(max_vm_per_node):
+ get_or_create_aa_group(conn, params['aa_group_name'].format(idx))
+
+ create_keypair(conn, params['keypair_name'], params['keypair_file_public'], params['keypair_file_private'])
+ create_image(conn, params['image']['name'], params['image']['url'])
+ create_flavor(conn, **params['flavor'])
+
+
+def create_keypair(conn: OSConnection, name: str, pub_key_path: str, priv_key_path: str):
+ """create and upload keypair into nova, if doesn't exists yet
+
+ Create and upload keypair into nova, if keypair with given bane
+ doesn't exists yet. Uses key from files, if file doesn't exists -
+ create new keys, and store'em into files.
+
+ parameters:
+ conn: OSConnection
+ name: str - ketpair name
+ pub_key_path: str - path for public key
+ priv_key_path: str - path for private key
+ """
+
+ pub_key_exists = os.path.exists(pub_key_path)
+ priv_key_exists = os.path.exists(priv_key_path)
+
+ try:
+ kpair = conn.nova.keypairs.find(name=name)
+    except NotFound:
+        # keypair is not registered in nova yet
+        kpair = None
+
+    if pub_key_exists and not priv_key_exists:
+        raise EnvironmentError("Private key file doesn't exist")
+
+    if not pub_key_exists and priv_key_exists:
+        raise EnvironmentError("Public key file doesn't exist")
+
+ if kpair is None:
+ if pub_key_exists:
+ with open(pub_key_path) as pub_key_fd:
+ return conn.nova.keypairs.create(name, pub_key_fd.read())
+ else:
+ key = conn.nova.keypairs.create(name)
+
+ with open(priv_key_path, "w") as priv_key_fd:
+ priv_key_fd.write(key.private_key)
+ os.chmod(priv_key_path, stat.S_IREAD | stat.S_IWRITE)
+
+ with open(pub_key_path, "w") as pub_key_fd:
+ pub_key_fd.write(key.public_key)
+    elif not priv_key_exists:
+        raise EnvironmentError("Private key file doesn't exist, but the key is already" +
+                               " uploaded to OpenStack. Either set the correct path to" +
+                               " the private key or remove the key from OpenStack")
+
+
+def get_or_create_aa_group(conn: OSConnection, name: str) -> str:
+    """create anti-affinity server group, if it doesn't exist yet
+
+    parameters:
+        conn: OSConnection
+        name: str - group name
+
+    returns: str - group id
+    """
+ try:
+ return conn.nova.server_groups.find(name=name).id
+ except NotFound:
+ return conn.nova.server_groups.create(name=name, policies=['anti-affinity']).id
+
+
+def allow_ssh_and_ping(conn: OSConnection, group_name: str) -> int:
+ """create sequrity group for ping and ssh
+
+ parameters:
+ conn:
+ group_name: str - group name
+
+ returns: str - group id
+ """
+ try:
+ secgroup = conn.nova.security_groups.find(name=group_name)
+ except NotFound:
+ secgroup = conn.nova.security_groups.create(group_name, "allow ssh/ping to node")
+
+ conn.nova.security_group_rules.create(secgroup.id,
+ ip_protocol="tcp",
+ from_port="22",
+ to_port="22",
+ cidr="0.0.0.0/0")
+
+ conn.nova.security_group_rules.create(secgroup.id,
+ ip_protocol="icmp",
+ from_port=-1,
+ cidr="0.0.0.0/0",
+ to_port=-1)
+ return secgroup.id
+
+
+def create_image(conn: OSConnection, name: str, url: str) -> None:
+ """upload image into glance from given URL, if given image doesn't exisis yet
+
+ parameters:
+ nova: nova connection
+ os_creds: OSCreds object - openstack credentials, should be same,
+ as used when connectiong given novaclient
+ name: str - image name
+ url: str - image download url
+
+ returns: None
+ """
+ try:
+ conn.nova.images.find(name=name)
+ return
+ except NotFound:
+ pass
+
+ ok = False
+ with tempfile.NamedTemporaryFile() as temp_fd:
+ try:
+ cmd = "wget --dns-timeout=30 --connect-timeout=30 --read-timeout=30 -o {} {}"
+ subprocess.check_call(cmd.format(temp_fd.name, url))
+ ok = True
+
+ # TODO(koder): add proper error handling
+ except Exception:
+ pass
+
+ if not ok:
+ urllib.request.urlretrieve(url, temp_fd.name)
+
+ image = conn.glance.images.create(name=name)
+ with open(temp_fd.name, 'rb') as fd:
+ conn.glance.images.upload(image.id, fd)
+
+
+def create_flavor(conn: OSConnection, name: str, ram_size: int, hdd_size: int, cpu_count: int) -> None:
+ """create flavor, if doesn't exisis yet
+
+ parameters:
+ nova: nova connection
+ name: str - flavor name
+ ram_size: int - ram size (UNIT?)
+ hdd_size: int - root hdd size (UNIT?)
+ cpu_count: int - cpu cores
+
+ returns: None
+ """
+ try:
+        conn.nova.flavors.find(name=name)
+ return
+ except NotFound:
+ pass
+
+    # novaclient expects (name, ram, vcpus, disk)
+    conn.nova.flavors.create(name, ram_size, cpu_count, hdd_size)
+
+
+def create_volume(conn: OSConnection, size: int, name: str) -> Any:
+ vol = conn.cinder.volumes.create(size=size, display_name=name)
+ err_count = 0
+
+ while vol.status != 'available':
+ if vol.status == 'error':
+ if err_count == 3:
+                logger.critical("Failed to create volume")
+                raise RuntimeError("Failed to create volume")
+ else:
+ err_count += 1
+ conn.cinder.volumes.delete(vol)
+ time.sleep(1)
+ vol = conn.cinder.volumes.create(size=size, display_name=name)
+ continue
+ time.sleep(1)
+ vol = conn.cinder.volumes.get(vol.id)
+ return vol
+
+
+def wait_for_server_active(conn: OSConnection, server: Any, timeout: int = 300) -> bool:
+ """waiting till server became active
+
+ parameters:
+ nova: nova connection
+ server: server object
+ timeout: int - seconds to wait till raise an exception
+
+ returns: None
+ """
+
+ for _ in Timeout(timeout, no_exc=True):
+ server_state = getattr(server, 'OS-EXT-STS:vm_state').lower()
+
+ if server_state == 'active':
+ return True
+
+ if server_state == 'error':
+ return False
+
+ server = conn.nova.servers.get(server)
+ return False
+
+
+class Allocate:
+    """sentinel: allocate a new floating IP for a VM instead of reusing a free one"""
+
+
+def get_floating_ips(conn: OSConnection, pool: Optional[str], amount: int) -> List[str]:
+ """allocate floating ips
+
+ parameters:
+ nova: nova connection
+ pool:str floating ip pool name
+ amount:int - ip count
+
+ returns: [ip object]
+ """
+ ip_list = conn.nova.floating_ips.list()
+
+ if pool is not None:
+ ip_list = [ip for ip in ip_list if ip.pool == pool]
+
+ return [ip for ip in ip_list if ip.instance_id is None][:amount]
+
+
+def launch_vms(conn: OSConnection,
+ params: Dict[str, Any],
+ executor: ThreadPoolExecutor,
+ already_has_count: int = 0) -> Iterator[NodeInfo]:
+ """launch virtual servers
+
+ Parameters:
+ nova: nova client
+ params: dict {
+ count: str or int - server count. If count is string it should be in
+ one of bext forms: "=INT" or "xINT". First mean
+ to spawn (INT - already_has_count) servers, and
+ all should be evenly distributed across all compute
+ nodes. xINT mean spawn COMPUTE_COUNT * INT servers.
+ image: dict {'name': str - image name}
+ flavor: dict {'name': str - flavor name}
+ group_name: str - group name, used to create uniq server name
+ keypair_name: str - ssh keypais name
+ keypair_file_private: str - path to private key
+ user: str - vm user name
+ vol_sz: int or None - volume size, or None, if no volume
+ network_zone_name: str - network zone name
+ flt_ip_pool: str - floating ip pool
+ name_templ: str - server name template, should receive two parameters
+ 'group and id, like 'cbt-{group}-{id}'
+ aa_group_name: str scheduler group name
+ security_group: str - security group name
+ }
+ already_has_count: int=0 - how many servers already exists. Used to distribute
+ new servers evenly across all compute nodes, taking
+ old server in accout
+ returns: generator of NodeInfo - server credentials, in format USER@IP:KEY_PATH
+
+ """
+ logger.debug("Calculating new vm count")
+ count = params['count'] # type: int
+ lst = conn.nova.services.list(binary='nova-compute')
+ srv_count = len([srv for srv in lst if srv.status == 'enabled'])
+
+ if isinstance(count, str):
+ if count.startswith("x"):
+ count = srv_count * int(count[1:])
+ else:
+ assert count.startswith('=')
+ count = int(count[1:]) - already_has_count
+
+ if count <= 0:
+ logger.debug("Not need new vms")
+ return
+
+ logger.debug("Starting new nodes on openstack")
+
+ assert isinstance(count, int)
+
+ srv_params = "img: {image[name]}, flavor: {flavor[name]}".format(**params)
+ msg_templ = "Will start {0} servers with next params: {1}"
+ logger.info(msg_templ.format(count, srv_params))
+
+ vm_params = dict(
+ img_name=params['image']['name'],
+ flavor_name=params['flavor']['name'],
+ group_name=params['group_name'],
+ keypair_name=params['keypair_name'],
+ vol_sz=params.get('vol_sz'),
+ network_zone_name=params.get("network_zone_name"),
+ flt_ip_pool=params.get('flt_ip_pool'),
+ name_templ=params.get('name_templ'),
+ scheduler_hints={"group": params['aa_group_name']},
+ security_group=params['security_group'],
+ sec_group_size=srv_count
+ )
+
+    # access all required params up front so that a missing key fails before any VM is created
+ private_key_path = params['keypair_file_private']
+ user = params['image']['user']
+
+ for ip, os_node in create_vms_mt(conn, count, executor, **vm_params):
+ info = NodeInfo(ConnCreds(ip, user, key_file=private_key_path), set())
+ info.os_vm_id = os_node.id
+ yield info
+
+
+def get_free_server_groups(conn: OSConnection, template: str) -> Iterator[str]:
+ """get fre server groups, that match given name template
+
+ parameters:
+ nova: nova connection
+ template:str - name template
+ amount:int - ip count
+
+ returns: generator or str - server group names
+ """
+ for server_group in conn.nova.server_groups.list():
+ if not server_group.members:
+ if re.match(template, server_group.name):
+ yield str(server_group.id)
+
+
+def create_vms_mt(conn: OSConnection,
+ amount: int,
+ executor: ThreadPoolExecutor,
+ group_name: str,
+ keypair_name: str,
+ img_name: str,
+ flavor_name: str,
+ vol_sz: int = None,
+ network_zone_name: str = None,
+ flt_ip_pool: str = None,
+ name_templ: str ='wally-{id}',
+ scheduler_hints: Dict = None,
+ security_group: str = None,
+ sec_group_size: int = None) -> List[Tuple[str, Any]]:
+
+ if network_zone_name is not None:
+ network_future = executor.submit(conn.nova.networks.find,
+ label=network_zone_name)
+ else:
+ network_future = None
+
+ fl_future = executor.submit(conn.nova.flavors.find, name=flavor_name)
+ img_future = executor.submit(conn.nova.images.find, name=img_name)
+
+ if flt_ip_pool is not None:
+ ips_future = executor.submit(get_floating_ips,
+ conn, flt_ip_pool, amount)
+ logger.debug("Wait for floating ip")
+ ips = ips_future.result()
+ ips += [Allocate] * (amount - len(ips))
+ else:
+ ips = [None] * amount
+
+ logger.debug("Getting flavor object")
+ fl = fl_future.result()
+ logger.debug("Getting image object")
+ img = img_future.result()
+
+ if network_future is not None:
+ logger.debug("Waiting for network results")
+ nics = [{'net-id': network_future.result().id}]
+ else:
+ nics = None
+
+ names = [] # type: List[str]
+ for i in range(amount):
+ names.append(name_templ.format(group=group_name, id=i))
+
+ futures = []
+ logger.debug("Requesting new vm's")
+
+ orig_scheduler_hints = scheduler_hints.copy()
+ group_name_template = scheduler_hints['group'].format("\\d+")
+ groups = list(get_free_server_groups(conn, group_name_template + "$"))
+ groups.sort()
+
+ for idx, (name, flt_ip) in enumerate(zip(names, ips), 2):
+
+ scheduler_hints = None
+ if orig_scheduler_hints is not None and sec_group_size is not None:
+ if "group" in orig_scheduler_hints:
+ scheduler_hints = orig_scheduler_hints.copy()
+ scheduler_hints['group'] = groups[idx // sec_group_size]
+
+ if scheduler_hints is None:
+ scheduler_hints = orig_scheduler_hints.copy()
+
+ params = (conn, name, keypair_name, img, fl,
+ nics, vol_sz, flt_ip, scheduler_hints,
+ flt_ip_pool, [security_group])
+
+ futures.append(executor.submit(create_vm, *params))
+ res = [future.result() for future in futures]
+ logger.debug("Done spawning")
+ return res
+
+
+def create_vm(conn: OSConnection,
+ name: str,
+ keypair_name: str,
+ img: Any,
+ flavor: Any,
+ nics: List,
+ vol_sz: int = None,
+ flt_ip: Any = False,
+ scheduler_hints: Dict = None,
+ pool: str = None,
+ security_groups=None,
+ max_retry: int = 3,
+ delete_timeout: int = 120) -> Tuple[str, Any]:
+
+ # make mypy/pylint happy
+ srv = None # type: Any
+ for i in range(max_retry):
+ srv = conn.nova.servers.create(name, flavor=flavor, image=img, nics=nics, key_name=keypair_name,
+ scheduler_hints=scheduler_hints, security_groups=security_groups)
+
+ if not wait_for_server_active(conn, srv):
+ logger.debug("Server {} fails to start. Kill it and try again".format(srv))
+ conn.nova.servers.delete(srv)
+
+ try:
+ for _ in Timeout(delete_timeout, "Server {} delete timeout".format(srv.id)):
+ srv = conn.nova.servers.get(srv.id)
+ except NotFound:
+ pass
+ else:
+ break
+ else:
+ raise RuntimeError("Failed to start server {}".format(srv.id))
+
+ if vol_sz is not None:
+ vol = create_volume(conn, vol_sz, name)
+ conn.nova.volumes.create_server_volume(srv.id, vol.id, None)
+
+ if flt_ip is Allocate:
+ flt_ip = conn.nova.floating_ips.create(pool)
+
+ if flt_ip is not None:
+ srv.add_floating_ip(flt_ip)
+
+ # pylint: disable=E1101
+ return flt_ip.ip, conn.nova.servers.get(srv.id)
+
+
+def clear_nodes(conn: OSConnection,
+ ids: List[int] = None,
+ name_templ: str = None,
+ max_server_delete_time: int = 120):
+    # ids may be None when only name_templ is given; the volume lookup below needs a list
+    ids = ids if ids is not None else []
+
+    try:
+ def need_delete(srv):
+ if name_templ is not None:
+ return re.match(name_templ.format("\\d+"), srv.name) is not None
+ else:
+ return srv.id in ids
+
+ volumes_to_delete = []
+ for vol in conn.cinder.volumes.list():
+ for attachment in vol.attachments:
+ if attachment['server_id'] in ids:
+ volumes_to_delete.append(vol)
+ break
+
+ still_alive = set()
+ for srv in conn.nova.servers.list():
+ if need_delete(srv):
+ logger.debug("Deleting server {0}".format(srv.name))
+ conn.nova.servers.delete(srv)
+ still_alive.add(srv.id)
+
+ if still_alive:
+ logger.debug("Waiting till all servers are actually deleted")
+ tout = Timeout(max_server_delete_time, no_exc=True)
+ while tout.tick() and still_alive:
+ all_id = set(srv.id for srv in conn.nova.servers.list())
+ still_alive = still_alive.intersection(all_id)
+
+ if still_alive:
+ logger.warning("Failed to remove servers {}. ".format(",".join(still_alive)) +
+ "You, probably, need to remove them manually (and volumes as well)")
+ return
+
+ if volumes_to_delete:
+ logger.debug("Deleting volumes")
+
+        # servers are gone at this point, so their volumes can be removed
+ for vol in volumes_to_delete:
+ logger.debug("Deleting volume " + vol.display_name)
+ conn.cinder.volumes.delete(vol)
+
+ logger.debug("Clearing complete (yet some volumes may still be deleting)")
+ except Exception:
+        logger.exception("Failed during server removal. " +
+                         "You probably need to remove them manually")