2.0 refactoring:
* Add type annotations to most functions
* Remove old fio run code, move to RPC/pluggable
* Remove most of the sensors code; it will move to RPC
* Other refactoring
diff --git a/wally/fuel_rest_api.py b/wally/fuel_rest_api.py
index 728ac94..8799ed2 100644
--- a/wally/fuel_rest_api.py
+++ b/wally/fuel_rest_api.py
@@ -2,8 +2,9 @@
import json
import time
import logging
-import urllib2
-import urlparse
+import urllib.request
+import urllib.parse
+from typing import Dict, Any
from functools import partial, wraps
import netaddr
@@ -15,14 +16,14 @@
logger = logging.getLogger("wally.fuel_api")
-class Urllib2HTTP(object):
+class Urllib2HTTP:
"""
class for making HTTP requests
"""
allowed_methods = ('get', 'put', 'post', 'delete', 'patch', 'head')
- def __init__(self, root_url, headers=None):
+ def __init__(self, root_url: str, headers: Dict[str, str]=None):
"""
"""
if root_url.endswith('/'):
@@ -32,10 +33,10 @@
self.headers = headers if headers is not None else {}
- def host(self):
+ def host(self) -> str:
return self.root_url.split('/')[2]
- def do(self, method, path, params=None):
+ def do(self, method: str, path: str, params: Dict[Any, Any]=None):
if path.startswith('/'):
url = self.root_url + path
else:
@@ -49,14 +50,14 @@
logger.debug("HTTP: {0} {1}".format(method.upper(), url))
- request = urllib2.Request(url,
- data=data_json,
- headers=self.headers)
+ request = urllib.request.Request(url,
+ data=data_json,
+ headers=self.headers)
if data_json is not None:
request.add_header('Content-Type', 'application/json')
request.get_method = lambda: method.upper()
- response = urllib2.urlopen(request)
+ response = urllib.request.urlopen(request)
logger.debug("HTTP Responce: {0}".format(response.code))
@@ -70,16 +71,16 @@
return json.loads(content)
- def __getattr__(self, name):
+ def __getattr__(self, name: str):
if name in self.allowed_methods:
return partial(self.do, name)
raise AttributeError(name)
class KeystoneAuth(Urllib2HTTP):
- def __init__(self, root_url, creds, headers=None):
+ def __init__(self, root_url: str, creds: Dict[str, str], headers: Dict[str, str]=None):
super(KeystoneAuth, self).__init__(root_url, headers)
- admin_node_ip = urlparse.urlparse(root_url).hostname
+ admin_node_ip = urllib.parse.urlparse(root_url).hostname
self.keystone_url = "http://{0}:5000/v2.0".format(admin_node_ip)
self.keystone = keystoneclient(
auth_url=self.keystone_url, **creds)
@@ -95,11 +96,11 @@
'Cant establish connection to keystone with url %s',
self.keystone_url)
- def do(self, method, path, params=None):
+ def do(self, method: str, path: str, params: Dict[str, str]=None):
"""Do request. If gets 401 refresh token"""
try:
return super(KeystoneAuth, self).do(method, path, params)
- except urllib2.HTTPError as e:
+ except urllib.request.HTTPError as e:
if e.code == 401:
logger.warning(
'Authorization failure: {0}'.format(e.read()))
@@ -109,13 +110,13 @@
raise
-def get_inline_param_list(url):
+def get_inline_param_list(url: str):
format_param_rr = re.compile(r"\{([a-zA-Z_]+)\}")
for match in format_param_rr.finditer(url):
yield match.group(1)
-class RestObj(object):
+class RestObj:
name = None
id = None
@@ -136,7 +137,7 @@
return getattr(self, item)
-def make_call(method, url):
+def make_call(method: str, url: str):
def closure(obj, entire_obj=None, **data):
inline_params_vals = {}
for name in get_inline_param_list(url):
@@ -226,40 +227,6 @@
get_info = GET('/api/nodes/{id}')
get_interfaces = GET('/api/nodes/{id}/interfaces')
- update_interfaces = PUT('/api/nodes/{id}/interfaces')
-
- def set_network_assigment(self, mapping):
- """Assings networks to interfaces
- :param mapping: list (dict) interfaces info
- """
-
- curr_interfaces = self.get_interfaces()
-
- network_ids = {}
- for interface in curr_interfaces:
- for net in interface['assigned_networks']:
- network_ids[net['name']] = net['id']
-
- # transform mappings
- new_assigned_networks = {}
-
- for dev_name, networks in mapping.items():
- new_assigned_networks[dev_name] = []
- for net_name in networks:
- nnet = {'name': net_name, 'id': network_ids[net_name]}
- new_assigned_networks[dev_name].append(nnet)
-
- # update by ref
- for dev_descr in curr_interfaces:
- if dev_descr['name'] in new_assigned_networks:
- nass = new_assigned_networks[dev_descr['name']]
- dev_descr['assigned_networks'] = nass
-
- self.update_interfaces(curr_interfaces, id=self.id)
-
- def set_node_name(self, name):
- """Update node name"""
- self.__connection__.put('api/nodes', [{'id': self.id, 'name': name}])
def get_network_data(self):
"""Returns node network data"""
@@ -308,23 +275,9 @@
class Cluster(RestObj):
"""Class represents Cluster in Fuel"""
- add_node_call = PUT('api/nodes')
- start_deploy = PUT('api/clusters/{id}/changes')
get_status = GET('api/clusters/{id}')
- delete = DELETE('api/clusters/{id}')
- get_tasks_status = GET("api/tasks?cluster_id={id}")
- get_networks = GET(
- 'api/clusters/{id}/network_configuration/neutron')
-
- get_attributes = GET(
- 'api/clusters/{id}/attributes')
-
- set_attributes = PUT(
- 'api/clusters/{id}/attributes')
-
- configure_networks = PUT(
- 'api/clusters/{id}/network_configuration/{net_provider}')
-
+ get_networks = GET('api/clusters/{id}/network_configuration/neutron')
+ get_attributes = GET('api/clusters/{id}/attributes')
_get_nodes = GET('api/nodes?cluster_id={id}')
def __init__(self, *dt, **mp):
@@ -338,17 +291,16 @@
try:
self.get_status()
return True
- except urllib2.HTTPError as err:
+ except urllib.request.HTTPError as err:
if err.code == 404:
return False
raise
def get_openrc(self):
access = self.get_attributes()['editable']['access']
- creds = {}
- creds['username'] = access['user']['value']
- creds['password'] = access['password']['value']
- creds['tenant_name'] = access['tenant']['value']
+ creds = {'username': access['user']['value'],
+ 'password': access['password']['value'],
+ 'tenant_name': access['tenant']['value']}
version = FuelInfo(self.__connection__).get_version()
# only HTTPS since 7.0
@@ -365,78 +317,6 @@
for node_descr in self._get_nodes():
yield Node(self.__connection__, **node_descr)
- def add_node(self, node, roles, interfaces=None):
- """Add node to cluster
-
- :param node: Node object
- :param roles: roles to assign
- :param interfaces: mapping iface name to networks
- """
- data = {}
- data['pending_roles'] = roles
- data['cluster_id'] = self.id
- data['id'] = node.id
- data['pending_addition'] = True
-
- logger.debug("Adding node %s to cluster..." % node.id)
-
- self.add_node_call([data])
- self.nodes.append(node)
-
- if interfaces is not None:
- networks = {}
- for iface_name, params in interfaces.items():
- networks[iface_name] = params['networks']
-
- node.set_network_assigment(networks)
-
- def wait_operational(self, timeout):
- """Wait until cluster status operational"""
- def wo():
- status = self.get_status()['status']
- if status == "error":
- raise Exception("Cluster deploy failed")
- return self.get_status()['status'] == 'operational'
- with_timeout(timeout, "deploy cluster")(wo)()
-
- def deploy(self, timeout):
- """Start deploy and wait until all tasks finished"""
- logger.debug("Starting deploy...")
- self.start_deploy()
-
- self.wait_operational(timeout)
-
- def all_tasks_finished_ok(obj):
- ok = True
- for task in obj.get_tasks_status():
- if task['status'] == 'error':
- raise Exception('Task execution error')
- elif task['status'] != 'ready':
- ok = False
- return ok
-
- wto = with_timeout(timeout, "wait deployment finished")
- wto(all_tasks_finished_ok)(self)
-
- def set_networks(self, net_descriptions):
- """Update cluster networking parameters"""
- configuration = self.get_networks()
- current_networks = configuration['networks']
- parameters = configuration['networking_parameters']
-
- if net_descriptions.get('networks'):
- net_mapping = net_descriptions['networks']
-
- for net in current_networks:
- net_desc = net_mapping.get(net['name'])
- if net_desc:
- net.update(net_desc)
-
- if net_descriptions.get('networking_parameters'):
- parameters.update(net_descriptions['networking_parameters'])
-
- self.configure_networks(**configuration)
-
def reflect_cluster(conn, cluster_id):
"""Returns cluster object by id"""
@@ -465,80 +345,3 @@
raise ValueError("Cluster {0} not found".format(name))
-
-sections = {
- 'sahara': 'additional_components',
- 'murano': 'additional_components',
- 'ceilometer': 'additional_components',
- 'volumes_ceph': 'storage',
- 'images_ceph': 'storage',
- 'ephemeral_ceph': 'storage',
- 'objects_ceph': 'storage',
- 'osd_pool_size': 'storage',
- 'volumes_lvm': 'storage',
- 'volumes_vmdk': 'storage',
- 'tenant': 'access',
- 'password': 'access',
- 'user': 'access',
- 'vc_password': 'vcenter',
- 'cluster': 'vcenter',
- 'host_ip': 'vcenter',
- 'vc_user': 'vcenter',
- 'use_vcenter': 'vcenter',
-}
-
-
-def create_empty_cluster(conn, cluster_desc, debug_mode=False):
- """Create new cluster with configuration provided"""
-
- data = {}
- data['nodes'] = []
- data['tasks'] = []
- data['name'] = cluster_desc['name']
- data['release'] = cluster_desc['release']
- data['mode'] = cluster_desc.get('deployment_mode')
- data['net_provider'] = cluster_desc.get('net_provider')
-
- params = conn.post(path='/api/clusters', params=data)
- cluster = Cluster(conn, **params)
-
- attributes = cluster.get_attributes()
-
- ed_attrs = attributes['editable']
-
- ed_attrs['common']['libvirt_type']['value'] = \
- cluster_desc.get('libvirt_type', 'kvm')
-
- if 'nodes' in cluster_desc:
- use_ceph = cluster_desc['nodes'].get('ceph_osd', None) is not None
- else:
- use_ceph = False
-
- if 'storage_type' in cluster_desc:
- st = cluster_desc['storage_type']
- if st == 'ceph':
- use_ceph = True
- else:
- use_ceph = False
-
- if use_ceph:
- opts = ['ephemeral_ceph', 'images_ceph', 'images_vcenter']
- opts += ['iser', 'objects_ceph', 'volumes_ceph']
- opts += ['volumes_lvm', 'volumes_vmdk']
-
- for name in opts:
- val = ed_attrs['storage'][name]
- if val['type'] == 'checkbox':
- is_ceph = ('images_ceph' == name)
- is_ceph = is_ceph or ('volumes_ceph' == name)
-
- if is_ceph:
- val['value'] = True
- else:
- val['value'] = False
- # else:
- # raise NotImplementedError("Non-ceph storages are not implemented")
-
- cluster.set_attributes(attributes)
-
- return cluster