refactoring and typing in progress
diff --git a/wally/start_vms.py b/wally/start_vms.py
index 075f348..af81463 100644
--- a/wally/start_vms.py
+++ b/wally/start_vms.py
@@ -2,22 +2,28 @@
import os
import stat
import time
-import urllib
import os.path
import logging
-
-from typing import Dict, Any, Iterable, Generator, NamedTuple
+import tempfile
+import subprocess
+import urllib.request
+from typing import Dict, Any, Iterable, Iterator, NamedTuple, Optional, List, Tuple
from concurrent.futures import ThreadPoolExecutor
+from keystoneauth1 import loading, session
from novaclient.exceptions import NotFound
-from novaclient.client import Client as n_client
-from cinderclient.v1.client import Client as c_client
+from novaclient.client import Client as NovaClient
+from cinderclient.client import Client as CinderClient
+from glanceclient import Client as GlanceClient
-from .inode import NodeInfo
+
+from .utils import Timeout
+from .node_interfaces import NodeInfo
+
__doc__ = """
Module used to reliably spawn set of VM's, evenly distributed across
-openstack cluster. Main functions:
+compute servers in openstack cluster. Main functions:
get_OS_credentials - extract openstack credentials from different sources
nova_connect - connect to nova api
@@ -32,15 +38,6 @@
logger = logging.getLogger("wally.vms")
-STORED_OPENSTACK_CREDS = None
-NOVA_CONNECTION = None
-CINDER_CONNECTION = None
-
-
-def is_connected() -> bool:
- return NOVA_CONNECTION is not None
-
-
OSCreds = NamedTuple("OSCreds",
[("name", str),
("passwd", str),
@@ -49,57 +46,41 @@
("insecure", bool)])
-def ostack_get_creds() -> OSCreds:
- if STORED_OPENSTACK_CREDS is None:
- is_insecure = \
- os.environ.get('OS_INSECURE', 'False').lower() in ('true', 'yes')
- return OSCreds(os.environ.get('OS_USERNAME'),
- os.environ.get('OS_PASSWORD'),
- os.environ.get('OS_TENANT_NAME'),
- os.environ.get('OS_AUTH_URL'),
- is_insecure)
- else:
- return STORED_OPENSTACK_CREDS
+# TODO(koder): should correctly load credentials from different sources, not only from environment variables
+def get_openstack_credentials() -> OSCreds:
+ is_insecure = os.environ.get('OS_INSECURE', 'false').lower() in ('true', 'yes')
+
+ return OSCreds(os.environ.get('OS_USERNAME'),
+ os.environ.get('OS_PASSWORD'),
+ os.environ.get('OS_TENANT_NAME'),
+ os.environ.get('OS_AUTH_URL'),
+ is_insecure)
-def nova_connect(os_creds: OSCreds=None) -> n_client:
- global NOVA_CONNECTION
- global STORED_OPENSTACK_CREDS
-
- if NOVA_CONNECTION is None:
- if os_creds is None:
- os_creds = ostack_get_creds()
- else:
- STORED_OPENSTACK_CREDS = os_creds
-
- NOVA_CONNECTION = n_client('1.1',
- os_creds.name,
- os_creds.passwd,
- os_creds.tenant,
- os_creds.auth_url,
- insecure=os_creds.insecure)
- return NOVA_CONNECTION
+class OSConnection:
+ def __init__(self, nova: NovaClient, cinder: CinderClient, glance: GlanceClient) -> None:
+ self.nova = nova
+ self.cinder = cinder
+ self.glance = glance
-def cinder_connect(os_creds: OSCreds=None) -> c_client:
- global CINDER_CONNECTION
- global STORED_OPENSTACK_CREDS
+def os_connect(os_creds: OSCreds, version: str = "2") -> OSConnection:
+ loader = loading.get_plugin_loader('password')
+ auth = loader.load_from_options(auth_url=os_creds.auth_url,
+ username=os_creds.name,
+ password=os_creds.passwd,
+ project_id=os_creds.tenant)
+ auth_sess = session.Session(auth=auth)
- if CINDER_CONNECTION is None:
- if os_creds is None:
- os_creds = ostack_get_creds()
- else:
- STORED_OPENSTACK_CREDS = os_creds
- CINDER_CONNECTION = c_client(os_creds.name,
- os_creds.passwd,
- os_creds.tenant,
- os_creds.auth_url,
- insecure=os_creds.insecure)
- return CINDER_CONNECTION
+ glance = GlanceClient(version, session=auth_sess)
+ nova = NovaClient(version, session=auth_sess)
+ cinder = CinderClient(os_creds.name, os_creds.passwd, os_creds.tenant, os_creds.auth_url,
+ insecure=os_creds.insecure, api_version=version)
+ return OSConnection(nova, cinder, glance)
-def find_vms(nova: n_client, name_prefix: str) -> Iterable[str, int]:
- for srv in nova.servers.list():
+def find_vms(conn: OSConnection, name_prefix: str) -> Iterable[str, int]:
+ for srv in conn.nova.servers.list():
if srv.name.startswith(name_prefix):
for ips in srv.addresses.values():
for ip in ips:
@@ -108,43 +89,33 @@
break
-def pause(ids: Iterable[str]) -> None:
- def pause_vm(conn: n_client, vm_id: str) -> None:
- vm = conn.servers.get(vm_id)
+def pause(conn: OSConnection, ids: Iterable[int], executor: ThreadPoolExecutor) -> None:
+ def pause_vm(vm_id: str) -> None:
+ vm = conn.nova.servers.get(vm_id)
if vm.status == 'ACTIVE':
vm.pause()
- conn = nova_connect()
- with ThreadPoolExecutor(max_workers=16) as executor:
- futures = [executor.submit(pause_vm, conn, vm_id)
- for vm_id in ids]
- for future in futures:
- future.result()
+ for future in executor.map(pause_vm, ids):
+ future.result()
-def unpause(ids: Iterable[str], max_resume_time=10) -> None:
- def unpause(conn: n_client, vm_id: str) -> None:
- vm = conn.servers.get(vm_id)
+def unpause(conn: OSConnection, ids: Iterable[int], executor: ThreadPoolExecutor, max_resume_time=10) -> None:
+ def unpause(vm_id: str) -> None:
+ vm = conn.nova.servers.get(vm_id)
if vm.status == 'PAUSED':
vm.unpause()
- for i in range(max_resume_time * 10):
- vm = conn.servers.get(vm_id)
+ for _ in Timeout(max_resume_time):
+ vm = conn.nova.servers.get(vm_id)
if vm.status != 'PAUSED':
return
- time.sleep(0.1)
raise RuntimeError("Can't unpause vm {0}".format(vm_id))
- conn = nova_connect()
- with ThreadPoolExecutor(max_workers=16) as executor:
- futures = [executor.submit(unpause, conn, vm_id)
- for vm_id in ids]
-
- for future in futures:
- future.result()
+ for future in executor.map(unpause, ids):
+ future.result()
-def prepare_os(nova: n_client, params: Dict[str, Any], os_creds: OSCreds) -> None:
+def prepare_os(conn: OSConnection, params: Dict[str, Any], max_vm_per_node: int = 8) -> None:
"""prepare openstack for futher usage
Creates server groups, security rules, keypair, flavor
@@ -153,7 +124,7 @@
Don't check, that existing object has required attributes
params:
- nova: novaclient connection
+ conn: OSConnection
params: dict {
security_group:str - security group name with allowed ssh and ping
aa_group_name:str - template for anti-affinity group names. Should
@@ -174,30 +145,18 @@
max_vm_per_compute: int=8 maximum expected amount of VM, per
compute host. Used to create appropriate
count of server groups for even placement
-
- returns: None
"""
- allow_ssh(nova, params['security_group'])
+ allow_ssh_and_ping(conn, params['security_group'])
- MAX_VM_PER_NODE = 8
- serv_groups = map(params['aa_group_name'].format,
- range(MAX_VM_PER_NODE))
+ for idx in range(max_vm_per_node):
+ get_or_create_aa_group(conn, params['aa_group_name'].format(idx))
- for serv_groups in serv_groups:
- get_or_create_aa_group(nova, serv_groups)
-
- create_keypair(nova,
- params['keypair_name'],
- params['keypair_file_public'],
- params['keypair_file_private'])
-
- create_image(os_creds, nova, params['image']['name'],
- params['image']['url'])
-
- create_flavor(nova, **params['flavor'])
+ create_keypair(conn, params['keypair_name'], params['keypair_file_public'], params['keypair_file_private'])
+ create_image(conn, params['image']['name'], params['image']['url'])
+ create_flavor(conn, **params['flavor'])
-def create_keypair(nova: n_client, name: str, pub_key_path: str, priv_key_path: str):
+def create_keypair(conn: OSConnection, name: str, pub_key_path: str, priv_key_path: str):
"""create and upload keypair into nova, if doesn't exists yet
Create and upload keypair into nova, if keypair with given bane
@@ -205,19 +164,17 @@
create new keys, and store'em into files.
parameters:
- nova: nova connection
+ conn: OSConnection
name: str - ketpair name
pub_key_path: str - path for public key
priv_key_path: str - path for private key
-
- returns: None
"""
pub_key_exists = os.path.exists(pub_key_path)
priv_key_exists = os.path.exists(priv_key_path)
try:
- kpair = nova.keypairs.find(name=name)
+ kpair = conn.nova.keypairs.find(name=name)
# if file not found- delete and recreate
except NotFound:
kpair = None
@@ -231,9 +188,9 @@
if kpair is None:
if pub_key_exists:
with open(pub_key_path) as pub_key_fd:
- return nova.keypairs.create(name, pub_key_fd.read())
+ return conn.nova.keypairs.create(name, pub_key_fd.read())
else:
- key = nova.keypairs.create(name)
+ key = conn.nova.keypairs.create(name)
with open(priv_key_path, "w") as priv_key_fd:
priv_key_fd.write(key.private_key)
@@ -248,54 +205,50 @@
" or remove key from openstack")
-def get_or_create_aa_group(nova: n_client, name: str) -> int:
+def get_or_create_aa_group(conn: OSConnection, name: str) -> int:
"""create anti-affinity server group, if doesn't exists yet
parameters:
- nova: nova connection
+ conn: OSConnection
name: str - group name
returns: str - group id
"""
try:
- group = nova.server_groups.find(name=name)
+ return conn.nova.server_groups.find(name=name).id
except NotFound:
- group = nova.server_groups.create(name=name,
- policies=['anti-affinity'])
-
- return group.id
+ return conn.nova.server_groups.create(name=name, policies=['anti-affinity']).id
-def allow_ssh(nova: n_client, group_name: str) -> int:
+def allow_ssh_and_ping(conn: OSConnection, group_name: str) -> int:
"""create sequrity group for ping and ssh
parameters:
- nova: nova connection
+ conn: OSConnection
group_name: str - group name
returns: str - group id
"""
try:
- secgroup = nova.security_groups.find(name=group_name)
+ secgroup = conn.nova.security_groups.find(name=group_name)
except NotFound:
- secgroup = nova.security_groups.create(group_name,
- "allow ssh/ping to node")
+ secgroup = conn.nova.security_groups.create(group_name, "allow ssh/ping to node")
- nova.security_group_rules.create(secgroup.id,
- ip_protocol="tcp",
- from_port="22",
- to_port="22",
- cidr="0.0.0.0/0")
+ conn.nova.security_group_rules.create(secgroup.id,
+ ip_protocol="tcp",
+ from_port="22",
+ to_port="22",
+ cidr="0.0.0.0/0")
- nova.security_group_rules.create(secgroup.id,
- ip_protocol="icmp",
- from_port=-1,
- cidr="0.0.0.0/0",
- to_port=-1)
+ conn.nova.security_group_rules.create(secgroup.id,
+ ip_protocol="icmp",
+ from_port=-1,
+ cidr="0.0.0.0/0",
+ to_port=-1)
return secgroup.id
-def create_image(nova: n_client, os_creds: OSCreds, name: str, url: str):
+def create_image(conn: OSConnection, name: str, url: str) -> None:
"""upload image into glance from given URL, if given image doesn't exisis yet
parameters:
@@ -308,33 +261,31 @@
returns: None
"""
try:
- nova.images.find(name=name)
+ conn.nova.images.find(name=name)
return
except NotFound:
pass
- tempnam = os.tempnam()
+ ok = False
+ with tempfile.NamedTemporaryFile() as temp_fd:
+ try:
+ cmd = "wget --dns-timeout=30 --connect-timeout=30 --read-timeout=30 -o {} {}"
+ subprocess.check_call(cmd.format(temp_fd.name, url))
+ ok = True
- try:
- urllib.urlretrieve(url, tempnam)
+ # TODO(koder): add proper error handling
+ except Exception:
+ pass
- cmd = "OS_USERNAME={0.name}"
- cmd += " OS_PASSWORD={0.passwd}"
- cmd += " OS_TENANT_NAME={0.tenant}"
- cmd += " OS_AUTH_URL={0.auth_url}"
- cmd += " glance {1} image-create --name {2} $opts --file {3}"
- cmd += " --disk-format qcow2 --container-format bare --is-public true"
+ if not ok:
+ urllib.request.urlretrieve(url, temp_fd.name)
- cmd = cmd.format(os_creds,
- '--insecure' if os_creds.insecure else "",
- name,
- tempnam)
- finally:
- if os.path.exists(tempnam):
- os.unlink(tempnam)
+ image = conn.glance.images.create(name=name)
+ with open(temp_fd.name, 'rb') as fd:
+ conn.glance.images.upload(image.id, fd)
-def create_flavor(nova: n_client, name: str, ram_size: int, hdd_size: int, cpu_count: int):
+def create_flavor(conn: OSConnection, name: str, ram_size: int, hdd_size: int, cpu_count: int) -> None:
"""create flavor, if doesn't exisis yet
parameters:
@@ -347,17 +298,16 @@
returns: None
"""
try:
- nova.flavors.find(name)
+ conn.nova.flavors.find(name)
return
except NotFound:
pass
- nova.flavors.create(name, cpu_count, ram_size, hdd_size)
+ conn.nova.flavors.create(name, cpu_count, ram_size, hdd_size)
-def create_volume(size: int, name: str):
- cinder = cinder_connect()
- vol = cinder.volumes.create(size=size, display_name=name)
+def create_volume(conn: OSConnection, size: int, name: str) -> Any:
+ vol = conn.cinder.volumes.create(size=size, display_name=name)
err_count = 0
while vol.status != 'available':
@@ -367,16 +317,16 @@
raise RuntimeError("Fail to create volume")
else:
err_count += 1
- cinder.volumes.delete(vol)
+ conn.cinder.volumes.delete(vol)
time.sleep(1)
- vol = cinder.volumes.create(size=size, display_name=name)
+ vol = conn.cinder.volumes.create(size=size, display_name=name)
continue
time.sleep(1)
- vol = cinder.volumes.get(vol.id)
+ vol = conn.cinder.volumes.get(vol.id)
return vol
-def wait_for_server_active(nova: n_client, server, timeout: int=300)-> None:
+def wait_for_server_active(conn: OSConnection, server: Any, timeout: int = 300)-> None:
"""waiting till server became active
parameters:
@@ -387,29 +337,25 @@
returns: None
"""
- t = time.time()
- while True:
- time.sleep(1)
- sstate = getattr(server, 'OS-EXT-STS:vm_state').lower()
+ for _ in Timeout(timeout, no_exc=True):
+ server_state = getattr(server, 'OS-EXT-STS:vm_state').lower()
- if sstate == 'active':
+ if server_state == 'active':
return True
- if sstate == 'error':
+ if server_state == 'error':
return False
- if time.time() - t > timeout:
- return False
-
- server = nova.servers.get(server)
+ server = conn.nova.servers.get(server)
+ return False
class Allocate(object):
pass
-def get_floating_ips(nova, pool, amount):
- """allocate flationg ips
+def get_floating_ips(conn: OSConnection, pool: Optional[str], amount: int) -> List[str]:
+ """allocate floating ips
parameters:
nova: nova connection
@@ -418,7 +364,7 @@
returns: [ip object]
"""
- ip_list = nova.floating_ips.list()
+ ip_list = conn.nova.floating_ips.list()
if pool is not None:
ip_list = [ip for ip in ip_list if ip.pool == pool]
@@ -426,7 +372,10 @@
return [ip for ip in ip_list if ip.instance_id is None][:amount]
-def launch_vms(nova, params, already_has_count=0) -> Iterator[NodeInfo]:
+def launch_vms(conn: OSConnection,
+ params: Dict[str, Any],
+ executor: ThreadPoolExecutor,
+ already_has_count: int = 0) -> Iterator[NodeInfo]:
"""launch virtual servers
Parameters:
@@ -454,13 +403,12 @@
already_has_count: int=0 - how many servers already exists. Used to distribute
new servers evenly across all compute nodes, taking
old server in accout
- returns: generator of str - server credentials, in format USER@IP:KEY_PATH
+ returns: generator of NodeInfo - server credentials, in format USER@IP:KEY_PATH
"""
logger.debug("Calculating new vm count")
- count = params['count']
- nova = nova_connect()
- lst = nova.services.list(binary='nova-compute')
+ count = params['count'] # type: int
+ lst = conn.nova.services.list(binary='nova-compute')
srv_count = len([srv for srv in lst if srv.status == 'enabled'])
if isinstance(count, str):
@@ -500,12 +448,14 @@
private_key_path = params['keypair_file_private']
creds = params['image']['creds']
- for ip, os_node in create_vms_mt(NOVA_CONNECTION, count, **vm_params):
+ for ip, os_node in create_vms_mt(conn, count, executor, **vm_params):
conn_uri = creds.format(ip=ip, private_key_path=private_key_path)
- yield NodeInfo(conn_uri, []), os_node.id
+ info = NodeInfo(conn_uri, set())
+ info.os_vm_id = os_node.id
+ yield info
-def get_free_server_grpoups(nova, template):
+def get_free_server_groups(conn: OSConnection, template: str) -> Iterator[str]:
"""get fre server groups, that match given name template
parameters:
@@ -515,113 +465,117 @@
returns: generator or str - server group names
"""
- for g in nova.server_groups.list():
- if g.members == []:
- if re.match(template, g.name):
- yield str(g.id)
+ for server_group in conn.nova.server_groups.list():
+ if not server_group.members:
+ if re.match(template, server_group.name):
+ yield str(server_group.id)
-def create_vms_mt(nova, amount, group_name, keypair_name, img_name,
- flavor_name, vol_sz=None, network_zone_name=None,
- flt_ip_pool=None, name_templ='wally-{id}',
- scheduler_hints=None, security_group=None,
- sec_group_size=None):
+def create_vms_mt(conn: OSConnection,
+ amount: int,
+ executor: ThreadPoolExecutor,
+ group_name: str,
+ keypair_name: str,
+ img_name: str,
+ flavor_name: str,
+ vol_sz: int = None,
+ network_zone_name: str = None,
+ flt_ip_pool: str = None,
+ name_templ: str ='wally-{id}',
+ scheduler_hints: Dict = None,
+ security_group: str = None,
+ sec_group_size: int = None) -> List[Tuple[str, Any]]:
- with ThreadPoolExecutor(max_workers=16) as executor:
- if network_zone_name is not None:
- network_future = executor.submit(nova.networks.find,
- label=network_zone_name)
- else:
- network_future = None
+ if network_zone_name is not None:
+ network_future = executor.submit(conn.nova.networks.find,
+ label=network_zone_name)
+ else:
+ network_future = None
- fl_future = executor.submit(nova.flavors.find, name=flavor_name)
- img_future = executor.submit(nova.images.find, name=img_name)
+ fl_future = executor.submit(conn.nova.flavors.find, name=flavor_name)
+ img_future = executor.submit(conn.nova.images.find, name=img_name)
- if flt_ip_pool is not None:
- ips_future = executor.submit(get_floating_ips,
- nova, flt_ip_pool, amount)
- logger.debug("Wait for floating ip")
- ips = ips_future.result()
- ips += [Allocate] * (amount - len(ips))
- else:
- ips = [None] * amount
+ if flt_ip_pool is not None:
+ ips_future = executor.submit(get_floating_ips,
+ conn, flt_ip_pool, amount)
+ logger.debug("Wait for floating ip")
+ ips = ips_future.result()
+ ips += [Allocate] * (amount - len(ips))
+ else:
+ ips = [None] * amount
- logger.debug("Getting flavor object")
- fl = fl_future.result()
- logger.debug("Getting image object")
- img = img_future.result()
+ logger.debug("Getting flavor object")
+ fl = fl_future.result()
+ logger.debug("Getting image object")
+ img = img_future.result()
- if network_future is not None:
- logger.debug("Waiting for network results")
- nics = [{'net-id': network_future.result().id}]
- else:
- nics = None
+ if network_future is not None:
+ logger.debug("Waiting for network results")
+ nics = [{'net-id': network_future.result().id}]
+ else:
+ nics = None
- names = []
- for i in range(amount):
- names.append(name_templ.format(group=group_name, id=i))
+ names = [] # type: List[str]
+ for i in range(amount):
+ names.append(name_templ.format(group=group_name, id=i))
- futures = []
- logger.debug("Requesting new vm's")
+ futures = []
+ logger.debug("Requesting new vm's")
- orig_scheduler_hints = scheduler_hints.copy()
+ orig_scheduler_hints = scheduler_hints.copy()
+ group_name_template = scheduler_hints['group'].format("\\d+")
+ groups = list(get_free_server_groups(conn, group_name_template + "$"))
+ groups.sort()
- MAX_SHED_GROUPS = 32
- for start_idx in range(MAX_SHED_GROUPS):
- pass
+ for idx, (name, flt_ip) in enumerate(zip(names, ips), 2):
- group_name_template = scheduler_hints['group'].format("\\d+")
- groups = list(get_free_server_grpoups(nova, group_name_template + "$"))
- groups.sort()
-
- for idx, (name, flt_ip) in enumerate(zip(names, ips), 2):
-
- scheduler_hints = None
- if orig_scheduler_hints is not None and sec_group_size is not None:
- if "group" in orig_scheduler_hints:
- scheduler_hints = orig_scheduler_hints.copy()
- scheduler_hints['group'] = groups[idx // sec_group_size]
-
- if scheduler_hints is None:
+ scheduler_hints = None
+ if orig_scheduler_hints is not None and sec_group_size is not None:
+ if "group" in orig_scheduler_hints:
scheduler_hints = orig_scheduler_hints.copy()
+ scheduler_hints['group'] = groups[idx // sec_group_size]
- params = (nova, name, keypair_name, img, fl,
- nics, vol_sz, flt_ip, scheduler_hints,
- flt_ip_pool, [security_group])
+ if scheduler_hints is None:
+ scheduler_hints = orig_scheduler_hints.copy()
- futures.append(executor.submit(create_vm, *params))
- res = [future.result() for future in futures]
- logger.debug("Done spawning")
- return res
+ params = (conn, name, keypair_name, img, fl,
+ nics, vol_sz, flt_ip, scheduler_hints,
+ flt_ip_pool, [security_group])
+
+ futures.append(executor.submit(create_vm, *params))
+ res = [future.result() for future in futures]
+ logger.debug("Done spawning")
+ return res
-def create_vm(nova, name, keypair_name, img,
- fl, nics, vol_sz=None,
- flt_ip=False,
- scheduler_hints=None,
- pool=None,
- security_groups=None):
- for i in range(3):
- srv = nova.servers.create(name,
- flavor=fl,
- image=img,
- nics=nics,
- key_name=keypair_name,
- scheduler_hints=scheduler_hints,
- security_groups=security_groups)
+def create_vm(conn: OSConnection,
+ name: str,
+ keypair_name: str,
+ img: Any,
+ flavor: Any,
+ nics: List,
+ vol_sz: int = None,
+ flt_ip: Any = False,
+ scheduler_hints: Dict = None,
+ pool: str = None,
+ security_groups=None,
+ max_retry: int = 3,
+ delete_timeout: int = 120) -> Tuple[str, Any]:
- if not wait_for_server_active(nova, srv):
+ # make mypy/pylint happy
+ srv = None # type: Any
+ for i in range(max_retry):
+ srv = conn.nova.servers.create(name, flavor=flavor, image=img, nics=nics, key_name=keypair_name,
+ scheduler_hints=scheduler_hints, security_groups=security_groups)
+
+ if not wait_for_server_active(conn, srv):
msg = "Server {0} fails to start. Kill it and try again"
logger.debug(msg.format(srv))
- nova.servers.delete(srv)
+ conn.nova.servers.delete(srv)
try:
- for j in range(120):
- srv = nova.servers.get(srv.id)
- time.sleep(1)
- else:
- msg = "Server {0} delete timeout".format(srv.id)
- raise RuntimeError(msg)
+ for _ in Timeout(delete_timeout, "Server {0} delete timeout".format(srv.id)):
+ srv = conn.nova.servers.get(srv.id)
except NotFound:
pass
else:
@@ -630,27 +584,22 @@
raise RuntimeError("Failed to start server".format(srv.id))
if vol_sz is not None:
- vol = create_volume(vol_sz, name)
- nova.volumes.create_server_volume(srv.id, vol.id, None)
+ vol = create_volume(conn, vol_sz, name)
+ conn.nova.volumes.create_server_volume(srv.id, vol.id, None)
if flt_ip is Allocate:
- flt_ip = nova.floating_ips.create(pool)
+ flt_ip = conn.nova.floating_ips.create(pool)
if flt_ip is not None:
srv.add_floating_ip(flt_ip)
- return flt_ip.ip, nova.servers.get(srv.id)
+ return flt_ip.ip, conn.nova.servers.get(srv.id)
-def clear_nodes(nodes_ids):
- clear_all(NOVA_CONNECTION, nodes_ids, None)
-
-
-MAX_SERVER_DELETE_TIME = 120
-
-
-def clear_all(nova, ids=None, name_templ=None,
- max_server_delete_time=MAX_SERVER_DELETE_TIME):
+def clear_nodes(conn: OSConnection,
+ ids: List[int] = None,
+ name_templ: str = None,
+ max_server_delete_time: int = 120):
try:
def need_delete(srv):
if name_templ is not None:
@@ -659,43 +608,42 @@
return srv.id in ids
volumes_to_delete = []
- cinder = cinder_connect()
- for vol in cinder.volumes.list():
+ for vol in conn.cinder.volumes.list():
for attachment in vol.attachments:
if attachment['server_id'] in ids:
volumes_to_delete.append(vol)
break
- deleted_srvs = set()
- for srv in nova.servers.list():
+ still_alive = set()
+ for srv in conn.nova.servers.list():
if need_delete(srv):
logger.debug("Deleting server {0}".format(srv.name))
- nova.servers.delete(srv)
- deleted_srvs.add(srv.id)
+ conn.nova.servers.delete(srv)
+ still_alive.add(srv.id)
- count = 0
- while count < max_server_delete_time:
- if count % 60 == 0:
- logger.debug("Waiting till all servers are actually deleted")
- all_id = set(srv.id for srv in nova.servers.list())
- if len(all_id.intersection(deleted_srvs)) == 0:
- break
- count += 1
- time.sleep(1)
- else:
- logger.warning("Failed to remove servers. " +
- "You, probably, need to remove them manually")
- return
- logger.debug("Done, deleting volumes")
+ if still_alive:
+ logger.debug("Waiting till all servers are actually deleted")
+ tout = Timeout(max_server_delete_time, no_exc=True)
+ while tout.tick() and still_alive:
+ all_id = set(srv.id for srv in conn.nova.servers.list())
+ still_alive = still_alive.intersection(all_id)
- # wait till vm actually deleted
+ if still_alive:
+ logger.warning("Failed to remove servers {}. ".format(",".join(still_alive)) +
+ "You, probably, need to remove them manually (and volumes as well)")
+ return
- # logger.warning("Volume deletion commented out")
- for vol in volumes_to_delete:
- logger.debug("Deleting volume " + vol.display_name)
- cinder.volumes.delete(vol)
+ if volumes_to_delete:
+ logger.debug("Deleting volumes")
- logger.debug("Clearing done (yet some volumes may still deleting)")
- except:
+ # wait till vm actually deleted
+
+ # logger.warning("Volume deletion commented out")
+ for vol in volumes_to_delete:
+ logger.debug("Deleting volume " + vol.display_name)
+ conn.cinder.volumes.delete(vol)
+
+ logger.debug("Clearing complete (yet some volumes may still be deleting)")
+ except Exception:
logger.exception("During removing servers. " +
"You, probably, need to remove them manually")