blob: 6ca6a88cc0c43689be42a564888e67f747482bd4 [file] [log] [blame]
# Copyright 2017 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
import yaml
import requests
import os
from tcp_tests import logger
LOG = logger.logger
class K8sBaseResource(object):
resource_type = None
def __init__(self, manager, name=None, namespace=None, data=None):
self._manager = manager
self._name = name
self._namespace = namespace
self._read_cache = None
if data is not None:
self._update_cache(data)
def __repr__(self):
uid = 'unknown-uid'
if self._read_cache is not None:
uid = self.uid
return "<{0} name='{1}' namespace='{2}' uuid='{3}'>".format(
self.__class__.__name__, self.name, self.namespace, uid)
@property
def name(self):
return self._name
@property
def namespace(self):
return self._namespace
@property
def uid(self):
return self.read(cached=True).metadata.uid
def _update_cache(self, data):
self._read_cache = data
self._namespace = data.metadata.namespace
self._name = data.metadata.name
def read(self, cached=False, **kwargs):
if not cached:
self._update_cache(self._read(**kwargs))
return self._read_cache
def create(self, body, **kwargs):
LOG.info("K8S API Creating {0} with body:\n{1}".format(
self.resource_type, body))
self._update_cache(self._create(body, **kwargs))
return self
def patch(self, body, **kwargs):
LOG.info("K8S API Patching {0} name={1} ns={2} with body:\n{3}".format(
self.resource_type, self.name, self.namespace, body))
self._update_cache(self._patch(body, **kwargs))
return self
def replace(self, body, **kwargs):
self._update_cache(self._replace(body, **kwargs))
return self
def delete(self, **kwargs):
self._delete(**kwargs)
return self
def __eq__(self, other):
if not isinstance(other, K8sBaseResource):
return NotImplemented
if not isinstance(other, self.__class__):
return False
return self.uid == other.uid
class K8sBaseManager(object):
resource_class = None
def __init__(self, cluster):
self._cluster = cluster
@property
def resource_type(self):
return self.resource_class.resource_type
def get(self, name=None, namespace=None, data=None):
namespace = namespace or self._cluster.default_namespace
return self.resource_class(self, name, namespace, data)
def create(self, name=None, namespace=None, body=None, **kwargs):
return self.get(name=name, namespace=namespace).create(body, **kwargs)
def __resource_from_data(self, data):
return self.resource_class(self, data=data)
def __list_filter(self, items, name_prefix=None):
if name_prefix is not None:
items = [item for item in items if
item.metadata.name.startswith(name_prefix)]
return items
def __list_to_resource(self, items):
return [self.__resource_from_data(item) for item in items]
def list(self, namespace=None, name_prefix=None, **kwargs):
namespace = namespace or self._cluster.default_namespace
items = self._list(namespace=namespace, **kwargs).items
items = self.__list_filter(items, name_prefix=name_prefix)
return self.__list_to_resource(items)
def list_all(self, name_prefix=None, **kwargs):
items = self._list_all(**kwargs).items
items = self.__list_filter(items, name_prefix=name_prefix)
return self.__list_to_resource(items)
def read_yaml_str(yaml_str):
""" load yaml from string helper """
return yaml.safe_load(yaml_str)
def read_yaml_file(file_path, *args):
""" load yaml from joined file_path and *args helper """
with open(os.path.join(file_path, *args)) as f:
return yaml.safe_load(f)
def read_yaml_url(yaml_file_url):
""" load yaml from url helper """
return yaml.safe_load(requests.get(yaml_file_url).text)