blob: ee5cd9cb05f46c0a6d41758d477b43b365ff872b [file] [log] [blame]
Yulia Portnova3556a062015-03-17 16:30:11 +02001import re
2import json
3import time
4import urllib2
5
6from functools import partial, wraps
7import urlparse
8
9import netaddr
10
11from keystoneclient.v2_0 import Client as keystoneclient
12from keystoneclient import exceptions
13
14
# Module-wide logger shared by every helper below. It stays ``None`` (which
# disables all logging in this module) until set_logger() is called.
logger = None


def set_logger(log):
    """Install the logger used by this module.

    :param log: object exposing ``debug()`` / ``warning()``, or None to
        switch logging off again
    """
    global logger
    logger = log
21
22
class Urllib2HTTP(object):
    """Small JSON-over-HTTP client built on urllib2.

    Every name listed in ``allowed_methods`` is available as an attribute
    (``conn.get(path)``, ``conn.put(path, params)``, ...) that forwards to
    :meth:`do` with the HTTP method already bound.
    """

    allowed_methods = ('get', 'put', 'post', 'delete', 'patch', 'head')

    def __init__(self, root_url, headers=None, echo=False):
        """Remember connection settings.

        :param root_url: base URL; one trailing '/' is stripped
        :param headers: dict of headers sent with every request
            (a fresh empty dict when None)
        :param echo: when true, requests and response codes are written
            to the module logger at debug level
        """
        self.root_url = root_url[:-1] if root_url.endswith('/') else root_url
        self.headers = headers if headers is not None else {}
        self.echo = echo

    def host(self):
        """Return the ``host[:port]`` part of ``root_url``."""
        # "scheme://host:port/..." split on '/' -> [scheme:, '', host:port]
        return self.root_url.split('/')[2]

    def do(self, method, path, params=None):
        """Send a request and return the decoded JSON response.

        :param method: lower-case HTTP method name
        :param path: path relative to ``root_url`` (leading '/' optional)
        :param params: JSON-serialisable request body; must be empty or
            None for 'get'
        :returns: decoded JSON body, or None when the body is empty
        :raises IndexError: when the response code is outside 200..209
        """
        if path.startswith('/'):
            url = self.root_url + path
        else:
            url = self.root_url + '/' + path

        if method == 'get':
            assert params == {} or params is None, \
                "GET requests can not carry a body"
            data_json = None
        else:
            data_json = json.dumps(params)

        if self.echo and logger is not None:
            logger.debug("HTTP: {} {}".format(method.upper(), url))

        request = urllib2.Request(url,
                                  data=data_json,
                                  headers=self.headers)
        if data_json is not None:
            request.add_header('Content-Type', 'application/json')

        # urllib2 only knows GET/POST natively; force the requested verb.
        request.get_method = lambda: method.upper()
        response = urllib2.urlopen(request)

        # Always release the underlying connection, even on early exit.
        try:
            if self.echo and logger is not None:
                logger.debug("HTTP Responce: {}".format(response.code))

            if response.code < 200 or response.code > 209:
                raise IndexError(url)

            content = response.read()
        finally:
            response.close()

        if not content:
            return None

        return json.loads(content)

    def __getattr__(self, name):
        """Expose each allowed HTTP method as ``self.<method>(path, ...)``."""
        if name in self.allowed_methods:
            return partial(self.do, name)
        raise AttributeError(name)
85
86
class KeystoneAuth(Urllib2HTTP):
    """HTTP client that authenticates its requests through keystone.

    A token is requested on construction and stored in the
    ``X-Auth-Token`` header; it is renewed once whenever a request is
    answered with HTTP 401.
    """

    def __init__(self, root_url, creds, headers=None, echo=False):
        """Create the client and fetch the first token.

        :param root_url: base URL of the API; its host name is also used
            to build the keystone URL (port 5000, API v2.0)
        :param creds: dict of keyword arguments passed to the keystone
            client
        :param headers: extra headers sent with every request
        :param echo: log requests at debug level when true
        """
        super(KeystoneAuth, self).__init__(root_url, headers, echo)
        admin_node_ip = urlparse.urlparse(root_url).hostname
        self.keystone_url = "http://{0}:5000/v2.0".format(admin_node_ip)
        self.keystone = keystoneclient(
            auth_url=self.keystone_url, **creds)
        self.refresh_token()

    def refresh_token(self):
        """Get new token from keystone and update headers"""
        try:
            self.keystone.authenticate()
            self.headers['X-Auth-Token'] = self.keystone.auth_token
        except exceptions.AuthorizationFailure:
            # Best effort: the old token (if any) stays in the headers
            # and the failure is only reported.
            if logger is not None:
                logger.warning(
                    'Cant establish connection to keystone with url %s',
                    self.keystone_url)

    def do(self, method, path, params=None):
        """Do request. If gets 401 refresh token and retry once"""
        try:
            return super(KeystoneAuth, self).do(method, path, params)
        except urllib2.HTTPError as e:
            if e.code != 401:
                raise

            if logger is not None:
                logger.warning(
                    'Authorization failure: {0}'.format(e.read()))
            self.refresh_token()
            return super(KeystoneAuth, self).do(method, path, params)
120
121
def get_inline_param_list(url):
    """Yield the names of ``{placeholder}`` fields found in a URL template.

    Only placeholders made of ASCII letters and underscores are
    recognised; they are yielded in order of appearance.

    :param url: URL template such as ``'api/clusters/{id}'``
    """
    for match in re.finditer(r"\{([a-zA-Z_]+)\}", url):
        yield match.group(1)
126
127
class RestObj(object):
    """Base class for objects backed by a REST resource.

    Keyword arguments given to the constructor become instance
    attributes; the connection is kept in ``__connection__``.
    """

    # Class-level defaults so the attributes exist even when the
    # constructor was not given them.
    name = None
    id = None

    def __init__(self, conn, **kwargs):
        """Store ``kwargs`` as attributes and remember the connection.

        :param conn: connection object used for the REST calls
        """
        self.__dict__.update(kwargs)
        self.__connection__ = conn

    def __str__(self):
        """Return ``ClassName(name):`` followed by one line per attribute.

        Attributes whose names start or end with a double underscore and
        the ``name`` attribute itself are left out of the listing.
        """
        res = ["{}({}):".format(self.__class__.__name__, self.name)]
        for k, v in sorted(self.__dict__.items()):
            if k.startswith('__') or k.endswith('__'):
                continue
            if k != 'name':
                res.append(" {}={!r}".format(k, v))
        return "\n".join(res)

    def __getitem__(self, item):
        """Allow ``obj['attr']`` as an alias for ``obj.attr``."""
        return getattr(self, item)
147
148
def make_call(method, url):
    """Build a method that performs ``method`` on the ``url`` template.

    The returned function is meant to be assigned as a class attribute.
    When called, each ``{placeholder}`` of ``url`` is filled from the
    keyword argument of the same name or, failing that, from the
    attribute of that name on the object.

    :param method: lower-case HTTP method name
    :param url: URL template, may contain ``{placeholder}`` fields
    :returns: function ``(obj, entire_obj=None, **data)`` which sends
        either ``entire_obj`` or the remaining ``data`` as request body
        through ``obj.__connection__.do`` and returns its result
    """
    def closure(obj, entire_obj=None, **data):
        inline_params_vals = {}
        for name in get_inline_param_list(url):
            if name in data:
                # Consumed by the URL, so it must not reach the body.
                inline_params_vals[name] = data.pop(name)
            else:
                inline_params_vals[name] = getattr(obj, name)
        result_url = url.format(**inline_params_vals)

        if entire_obj is not None:
            if data:
                raise ValueError("Both entire_obj and data provided")
            data = entire_obj
        return obj.__connection__.do(method, result_url, params=data)
    return closure


# Shortcuts for declaring REST calls as class attributes.
PUT = partial(make_call, 'put')
GET = partial(make_call, 'get')
DELETE = partial(make_call, 'delete')
171
172
def with_timeout(tout, message, interval=1):
    """Decorator factory: poll a function until it returns a true value.

    The wrapped function is called repeatedly, at most once per
    ``interval`` seconds, until it returns something truthy; the wrapper
    then returns None. The function's own return value is discarded.

    :param tout: total time budget in seconds; with a budget <= 0 the
        function is never called
    :param message: text appended to the timeout error message
    :param interval: minimum delay in seconds between two polls
    :raises RuntimeError: when the budget is used up without success
    """
    def closure(func):
        @wraps(func)
        def closure2(*dt, **mp):
            ctime = time.time()
            etime = ctime + tout

            while ctime < etime:
                if func(*dt, **mp):
                    return

                now = time.time()
                # Wait for the rest of the interval, but never past the
                # deadline: no further poll would happen there anyway.
                sleep_time = min(ctime + interval, etime) - now
                if sleep_time > 0:
                    time.sleep(sleep_time)
                ctime = time.time()
            raise RuntimeError("Timeout during " + message)
        return closure2
    return closure
190
191
# ------------------------------- ORM ----------------------------------------
193
194
def get_fuel_info(url):
    """Return a FuelInfo bound to a plain (unauthenticated) connection.

    :param url: root URL of the API
    """
    return FuelInfo(Urllib2HTTP(url))
198
199
class FuelInfo(RestObj):
    """Class represents Fuel installation info"""

    get_nodes = GET('api/nodes')
    get_clusters = GET('api/clusters')
    get_cluster = GET('api/clusters/{id}')

    @property
    def nodes(self):
        """Get all fuel nodes as a NodeList (one request per access)"""
        return NodeList([Node(self.__connection__, **node) for node
                         in self.get_nodes()])

    @property
    def free_nodes(self):
        """Get unallocated nodes: those whose 'cluster' field is empty"""
        return NodeList([Node(self.__connection__, **node) for node in
                         self.get_nodes() if not node['cluster']])

    @property
    def clusters(self):
        """List clusters in fuel as Cluster objects"""
        return [Cluster(self.__connection__, **cluster) for cluster
                in self.get_clusters()]
225
226
class Node(RestObj):
    """Represents node in Fuel"""

    get_info = GET('/api/nodes/{id}')
    get_interfaces = GET('/api/nodes/{id}/interfaces')
    update_interfaces = PUT('/api/nodes/{id}/interfaces')

    def set_network_assigment(self, mapping):
        """Assign networks to interfaces.

        :param mapping: dict mapping interface name to a list of network
            names that should be assigned to it
        :raises KeyError: when a network name is not assigned to any
            interface of the node yet (its id can not be looked up)
        """
        curr_interfaces = self.get_interfaces()

        # Network ids are only known through the current assignment.
        network_ids = {}
        for interface in curr_interfaces:
            for net in interface['assigned_networks']:
                network_ids[net['name']] = net['id']

        # transform mappings
        new_assigned_networks = {}
        for dev_name, networks in mapping.items():
            new_assigned_networks[dev_name] = [
                {'name': net_name, 'id': network_ids[net_name]}
                for net_name in networks]

        # update by ref: interfaces absent from the mapping stay untouched
        for dev_descr in curr_interfaces:
            if dev_descr['name'] in new_assigned_networks:
                nass = new_assigned_networks[dev_descr['name']]
                dev_descr['assigned_networks'] = nass

        self.update_interfaces(curr_interfaces, id=self.id)

    def set_node_name(self, name):
        """Update node name"""
        self.__connection__.put('api/nodes', [{'id': self.id, 'name': name}])

    def get_network_data(self):
        """Returns node network data (None when the node reports none)"""
        node_info = self.get_info()
        return node_info.get('network_data')

    def get_roles(self, pending=False):
        """Get node roles

        :param pending: also return the pending roles
        Returns: roles, or (roles, pending_roles) when pending is true
        """
        node_info = self.get_info()
        if pending:
            return node_info.get('roles'), node_info.get('pending_roles')
        else:
            return node_info.get('roles')

    def get_ip(self, network='public'):
        """Get node ip

        :param network: network to pick
        :returns: the 'ip' field of the interface carrying the network
            or, when the interface has none, the address part of the
            network's own 'ip' as a netaddr address object
        :raises Exception: when the network, or the interface it is
            bound to, is not found on the node
        """
        # One request serves both the network list and the interfaces.
        node_info = self.get_info()
        nets = node_info.get('network_data') or []

        for net in nets:
            if net['name'] != network:
                continue

            iface_name = net['dev']
            for iface in node_info['meta']['interfaces']:
                if iface['name'] == iface_name:
                    try:
                        return iface['ip']
                    except KeyError:
                        return netaddr.IPNetwork(net['ip']).ip
        raise Exception('Network %s not found' % network)
299
300
class NodeList(list):
    """Class for filtering nodes through attributes

    ``nodes.controller`` (or ``getattr(nodes, 'ceph-osd')``) returns a
    plain list of the contained nodes whose ``roles`` include that role.
    """
    allowed_roles = ['controller', 'compute', 'cinder', 'ceph-osd', 'mongo',
                     'zabbix-server']

    def __getattr__(self, name):
        """Return the nodes having role ``name``.

        :raises AttributeError: for any name that is not an allowed role,
            so typos fail loudly instead of evaluating to None
        """
        if name in self.allowed_roles:
            return [node for node in self if name in node.roles]
        raise AttributeError(name)
309
310
class Cluster(RestObj):
    """Class represents Cluster in Fuel"""

    add_node_call = PUT('api/nodes')
    start_deploy = PUT('api/clusters/{id}/changes')
    get_status = GET('api/clusters/{id}')
    delete = DELETE('api/clusters/{id}')
    get_tasks_status = GET("api/tasks?cluster_id={id}")
    get_networks = GET(
        'api/clusters/{id}/network_configuration/{net_provider}')

    get_attributes = GET(
        'api/clusters/{id}/attributes')

    set_attributes = PUT(
        'api/clusters/{id}/attributes')

    configure_networks = PUT(
        'api/clusters/{id}/network_configuration/{net_provider}')

    _get_nodes = GET('api/nodes?cluster_id={id}')

    def __init__(self, *dt, **mp):
        """Create the object and load the cluster's nodes.

        Accepts the same arguments as RestObj. Note that construction
        performs a request to list the nodes of the cluster.
        """
        super(Cluster, self).__init__(*dt, **mp)
        self.nodes = NodeList([Node(self.__connection__, **node) for node in
                               self._get_nodes()])
        self.network_roles = {}

    def check_exists(self):
        """Check if cluster exists

        :returns: True when the status request succeeds, False on 404
        :raises urllib2.HTTPError: for any other HTTP error
        """
        try:
            self.get_status()
            return True
        except urllib2.HTTPError as err:
            if err.code == 404:
                return False
            raise

    def get_openrc(self):
        """Return credentials taken from the cluster's access attributes.

        :returns: dict with 'username', 'password', 'tenant_name' and
            'os_auth_url'; the URL points at port 5000 of the first
            controller's public address and is empty when the cluster
            has no controller
        """
        access = self.get_attributes()['editable']['access']
        creds = {}
        creds['username'] = access['user']['value']
        creds['password'] = access['password']['value']
        creds['tenant_name'] = access['tenant']['value']

        controllers = self.nodes.controller
        if controllers:
            contr = controllers[0]
            creds['os_auth_url'] = "http://%s:5000/v2.0" \
                % contr.get_ip(network="public")
        else:
            creds['os_auth_url'] = ""
        return creds

    def get_nodes(self):
        """Yield a fresh Node for every node currently in the cluster"""
        for node_descr in self._get_nodes():
            yield Node(self.__connection__, **node_descr)

    def add_node(self, node, roles, interfaces=None):
        """Add node to cluster

        :param node: Node object
        :param roles: roles to assign
        :param interfaces: mapping iface name to a dict whose 'networks'
            entry lists the networks for that interface
        """
        data = {}
        data['pending_roles'] = roles
        data['cluster_id'] = self.id
        data['id'] = node.id
        data['pending_addition'] = True

        if logger is not None:
            logger.debug("Adding node %s to cluster...", node.id)

        self.add_node_call([data])
        self.nodes.append(node)

        if interfaces is not None:
            networks = {}
            for iface_name, params in interfaces.items():
                networks[iface_name] = params['networks']

            node.set_network_assigment(networks)

    def wait_operational(self, timeout):
        """Wait until cluster status operational

        :param timeout: time budget in seconds
        :raises Exception: when the cluster status becomes 'error'
        :raises RuntimeError: when the budget is used up
        """
        def wo():
            # Judge both outcomes on the same snapshot of the status.
            status = self.get_status()['status']
            if status == "error":
                raise Exception("Cluster deploy failed")
            return status == 'operational'
        with_timeout(timeout, "deploy cluster")(wo)()

    def deploy(self, timeout):
        """Start deploy and wait until all tasks finished

        ``timeout`` is applied twice: once while waiting for the cluster
        to become operational and once while waiting for its tasks.

        :raises Exception: when the cluster or one of its tasks reports
            an error
        :raises RuntimeError: when a wait runs out of time
        """
        if logger is not None:
            logger.debug("Starting deploy...")
        self.start_deploy()

        self.wait_operational(timeout)

        def all_tasks_finished_ok(obj):
            ok = True
            for task in obj.get_tasks_status():
                if task['status'] == 'error':
                    raise Exception('Task execution error')
                elif task['status'] != 'ready':
                    ok = False
            return ok

        wto = with_timeout(timeout, "wait deployment finished")
        wto(all_tasks_finished_ok)(self)

    def set_networks(self, net_descriptions):
        """Update cluster networking parameters

        :param net_descriptions: dict with optional 'networks' (mapping
            of network name to the fields to overwrite) and optional
            'networking_parameters' (fields to overwrite)
        """
        configuration = self.get_networks()
        current_networks = configuration['networks']
        parameters = configuration['networking_parameters']

        if net_descriptions.get('networks'):
            net_mapping = net_descriptions['networks']

            for net in current_networks:
                net_desc = net_mapping.get(net['name'])
                if net_desc:
                    net.update(net_desc)

        if net_descriptions.get('networking_parameters'):
            parameters.update(net_descriptions['networking_parameters'])

        # The dicts above were modified in place; send everything back.
        self.configure_networks(**configuration)
439
440
def reflect_cluster(conn, cluster_id):
    """Returns cluster object by id

    The Cluster constructor already loads the node list, so no second
    request is made here.
    """
    return Cluster(conn, id=cluster_id)
446
447
def get_all_nodes(conn):
    """Get all nodes from Fuel

    Generator: the request is sent when iteration starts.
    """
    for node_desc in conn.get('api/nodes'):
        yield Node(conn, **node_desc)
452
453
def get_all_clusters(conn):
    """Get all clusters

    Generator: the request is sent when iteration starts.
    """
    for cluster_desc in conn.get('api/clusters'):
        yield Cluster(conn, **cluster_desc)
458
459
def get_cluster_id(conn, name):
    """Get cluster id by name

    :returns: id of the first cluster whose name equals ``name``
    :raises ValueError: when no cluster has that name
    """
    for cluster in get_all_clusters(conn):
        if cluster.name == name:
            if logger is not None:
                logger.debug('cluster name is %s', name)
                logger.debug('cluster id is %s', cluster.id)
            return cluster.id

    raise ValueError("Cluster {0} not found".format(name))
470
Yulia Portnova3556a062015-03-17 16:30:11 +0200471
# Setting name -> name of the group it belongs to.
# NOTE(review): the groups look like sections of the cluster's editable
# attributes -- confirm against the code that consumes this table.
sections = {
    'sahara': 'additional_components',
    'murano': 'additional_components',
    'ceilometer': 'additional_components',
    'volumes_ceph': 'storage',
    'images_ceph': 'storage',
    'ephemeral_ceph': 'storage',
    'objects_ceph': 'storage',
    'osd_pool_size': 'storage',
    'volumes_lvm': 'storage',
    'volumes_vmdk': 'storage',
    'tenant': 'access',
    'password': 'access',
    'user': 'access',
    'vc_password': 'vcenter',
    'cluster': 'vcenter',
    'host_ip': 'vcenter',
    'vc_user': 'vcenter',
    'use_vcenter': 'vcenter',
}
492
493
def create_empty_cluster(conn, cluster_desc, debug_mode=False):
    """Create new cluster with configuration provided

    :param conn: connection used for the REST calls
    :param cluster_desc: dict describing the cluster. 'name' and
        'release' are required; 'deployment_mode', 'net_provider',
        'libvirt_type' (default 'kvm'), 'nodes' and 'storage_type' are
        optional
    :param debug_mode: not used by this function
    :returns: Cluster object for the newly created cluster
    """
    data = {}
    data['nodes'] = []
    data['tasks'] = []
    data['name'] = cluster_desc['name']
    data['release'] = cluster_desc['release']
    data['mode'] = cluster_desc.get('deployment_mode')
    data['net_provider'] = cluster_desc.get('net_provider')

    params = conn.post(path='/api/clusters', params=data)
    cluster = Cluster(conn, **params)

    attributes = cluster.get_attributes()

    ed_attrs = attributes['editable']

    ed_attrs['common']['libvirt_type']['value'] = \
        cluster_desc.get('libvirt_type', 'kvm')

    # Ceph is implied by a 'ceph_osd' entry in the node description...
    if 'nodes' in cluster_desc:
        use_ceph = cluster_desc['nodes'].get('ceph_osd') is not None
    else:
        use_ceph = False

    # ...but an explicit storage type always has the last word.
    if 'storage_type' in cluster_desc:
        use_ceph = cluster_desc['storage_type'] == 'ceph'

    if use_ceph:
        opts = ['ephemeral_ceph', 'images_ceph', 'images_vcenter']
        opts += ['iser', 'objects_ceph', 'volumes_ceph']
        opts += ['volumes_lvm', 'volumes_vmdk']

        # Of all storage checkboxes only ceph images and volumes stay on.
        for name in opts:
            val = ed_attrs['storage'][name]
            if val['type'] == 'checkbox':
                val['value'] = name in ('images_ceph', 'volumes_ceph')

    cluster.set_attributes(attributes)

    return cluster
547 return cluster