Multi env support and Kube client integration
Kube friendly Beta
Package versions supports Kube env
Added:
- Env type detection
- New option: --use-env, for selecting env
when function supports multiple detected envs
- Updated config loading
- Each module and command type has supported env check
and stops execution if it is on unsupported env
- Functions can support multiple envs
- Kubernetes dependency
- Kubernetes API detection: local and remote
- Package checking class hierarchy for using Salt or Kube
- Remote pod execution routine
- Flexible SSH/SSH Forwarder classes: with, ssh.do(), etc
- Multithreaded SSH script execution
- Number of workers parameter, default 5
Fixed:
- Config dependency
- Command loading with supported envs list
- Unittests structure and execution flow updated
- Unittests fixes
- Fixed debug mode handling
- Unified command type/support routine
- Nested attrs getter/setter
Change-Id: I3ade693ac21536e2b5dcee4b24d511749dc72759
Related-PROD: PROD-35811
diff --git a/cfg_checker/common/__init__.py b/cfg_checker/common/__init__.py
index 752373f..6eecf92 100644
--- a/cfg_checker/common/__init__.py
+++ b/cfg_checker/common/__init__.py
@@ -2,8 +2,6 @@
from cfg_checker.common.other import Utils
-from cfg_checker.common.settings import config
-
def nested_set(_d, _keys, _value):
# # import and deepcopy for safety
@@ -18,4 +16,3 @@
utils = Utils()
logger = logger
logger_cli = logger_cli
-config = config
diff --git a/cfg_checker/common/config_file.py b/cfg_checker/common/config_file.py
index c70e5a6..513e0ec 100644
--- a/cfg_checker/common/config_file.py
+++ b/cfg_checker/common/config_file.py
@@ -2,11 +2,10 @@
import os
from . import logger_cli
+from .const import truth
class ConfigFile(object):
- _truth = ['true', '1', 't', 'y', 'yes', 'yeah', 'yup',
- 'certainly', 'uh-huh']
_config = None
_section_name = None
_config_filepath = None
@@ -45,7 +44,7 @@
return path
def _ensure_boolean(self, _value):
- if _value.lower() in self._truth:
+ if _value.lower() in truth:
return True
else:
return False
diff --git a/cfg_checker/common/const.py b/cfg_checker/common/const.py
index 3b17099..2ae41f6 100644
--- a/cfg_checker/common/const.py
+++ b/cfg_checker/common/const.py
@@ -54,7 +54,10 @@
uknown_code = "unk"
-all_roles_map = {
+ENV_TYPE_SALT = "salt"
+ENV_TYPE_KUBE = "kube"
+
+all_salt_roles_map = {
"apt": "repository",
"bmk": "validation",
"cfg": "master",
@@ -98,3 +101,38 @@
_mainteiners_index_filename = "mainteiners.json"
_mirantis_versions_filename = "mirantis_v.json"
_other_versions_filename = "other_v.json"
+
+all_kube_roles_map = {
+ 'node-role.kubernetes.io/master': "k8s-master",
+ 'openstack-compute-node': "os-cmp",
+ 'openstack-control-plane': "os-ctl",
+ 'openstack-gateway': "os-gtw",
+ 'openvswitch': "ovs",
+ 'local-volume-provisioner': "",
+ 'ceph_role_mgr': "ceph-mgr",
+ 'ceph_role_mon': "ceph-mon",
+ 'com.docker.ucp.collection.shared': "ucp-shared",
+ 'com.docker.ucp.collection.system': "ucp-system",
+ 'com.docker.ucp.collection.swarm': "ucp-swarm",
+ 'com.docker.ucp.collection.root': "ucp-root",
+}
+
+truth = ['true', '1', 't', 'y', 'yes', 'yeah', 'yup', 'certainly', 'uh-huh']
+
+ubuntu_versions = {
+ "20.10": "Groovy Gorilla",
+ "20.04": "Focal Fossa",
+ "18.04": "Bionic Beaver",
+ "16.04": "Xenial Xerus",
+ "14.04": "Trusty Tahr",
+}
+
+nova_openstack_versions = {
+ "23": "wallaby",
+ "22": "victoria",
+ "21": "ussuri",
+ "20": "train",
+ "19": "stein",
+ "18": "rocky",
+ "17": "queens"
+}
diff --git a/cfg_checker/common/exception.py b/cfg_checker/common/exception.py
index 2536099..4ee3a99 100644
--- a/cfg_checker/common/exception.py
+++ b/cfg_checker/common/exception.py
@@ -19,18 +19,50 @@
self.message = "# Configuration error: {}".format(message)
+class CommandNotSupportedException(CheckerException):
+ def __init__(self, message, *args, **kwargs):
+ super(CommandNotSupportedException, self).__init__(
+ message,
+ *args,
+ **kwargs
+ )
+ self.message = "# Command not supported: {}".format(message)
+
+
+class CommandTypeNotSupportedException(CheckerException):
+ def __init__(self, message, *args, **kwargs):
+ super(CommandTypeNotSupportedException, self).__init__(
+ message,
+ *args,
+ **kwargs
+ )
+ self.message = "# Command type not supported: {}".format(message)
+
+
class SaltException(CheckerException):
def __init__(self, message, *args, **kwargs):
super(SaltException, self).__init__(message, *args, **kwargs)
self.message = "# Salt error: {}".format(message)
+class KubeException(CheckerException):
+ def __init__(self, message, *args, **kwargs):
+ super(KubeException, self).__init__(message, *args, **kwargs)
+ self.message = "# Kube client error: {}".format(message)
+
+
class InvalidReturnException(CheckerException):
def __init__(self, message, *args, **kwargs):
super(InvalidReturnException, self).__init__(message, *args, **kwargs)
self.message = "# Unexpected return value: {}".format(message)
+class TimeoutException(CheckerException):
+ def __init__(self, message, *args, **kwargs):
+ super(TimeoutException, self).__init__(message, *args, **kwargs)
+ self.message = "# Timed out waiting: {}".format(message)
+
+
class ErrorMappingException(CheckerException):
def __init__(self, message, *args, **kwargs):
super(ErrorMappingException, self).__init__(message, *args, **kwargs)
diff --git a/cfg_checker/common/file_utils.py b/cfg_checker/common/file_utils.py
index 398ea66..6fbb675 100644
--- a/cfg_checker/common/file_utils.py
+++ b/cfg_checker/common/file_utils.py
@@ -1,11 +1,12 @@
+import atexit
import grp
import os
import pwd
import time
+import tempfile
-from cfg_checker.common import config
-
-_default_time_format = config.date_format
+_default_time_format = "%Y-%m-%d %H:%M:%S.%f%z"
+_temp_files = {}
def remove_file(filename):
@@ -110,3 +111,35 @@
return "... folder '{}' removed".format(_folder)
else:
return "... folder '{}' not exists".format(_folder)
+
+
+def _cleanup_temp_files():
+ global _temp_files
+ for temp_file in _temp_files.values():
+ try:
+ os.remove(temp_file)
+ except OSError:
+ pass
+ _temp_files = {}
+
+
+def create_temp_file_with_content(content, mode=None):
+ if len(_temp_files) == 0:
+ atexit.register(_cleanup_temp_files)
+ # Because we may change context several times, try to remember files we
+ # created and reuse them at a small memory cost.
+ content_key = hash(content)
+ if content_key in _temp_files:
+ return _temp_files[content_key]
+
+ # new file, create it
+ _, name = tempfile.mkstemp()
+ _temp_files[content_key] = name
+ with open(name, 'wb') as fd:
+ fd.write(content.encode() if isinstance(content, str) else content)
+
+ # set mode for the file
+ if mode:
+ os.chmod(name, mode)
+
+ return name
diff --git a/cfg_checker/common/kube_utils.py b/cfg_checker/common/kube_utils.py
new file mode 100644
index 0000000..3f56f0e
--- /dev/null
+++ b/cfg_checker/common/kube_utils.py
@@ -0,0 +1,282 @@
+"""
+Module to handle interaction with Kube
+"""
+import base64
+import os
+import urllib3
+import yaml
+
+from kubernetes import client as kclient, config as kconfig
+from kubernetes.stream import stream
+
+from cfg_checker.common import logger, logger_cli
+from cfg_checker.common.exception import InvalidReturnException, KubeException
+from cfg_checker.common.file_utils import create_temp_file_with_content
+from cfg_checker.common.other import utils, shell
+from cfg_checker.common.ssh_utils import ssh_shell_p
+
+urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
+
+
+def _init_kube_conf_local(config):
+ # Init kube library locally
+ try:
+ kconfig.load_kube_config()
+ logger_cli.debug(
+ "...found Kube env: core, {}". format(
+ ",".join(
+ kclient.CoreApi().get_api_versions().versions
+ )
+ )
+ )
+ return kconfig, kclient.ApiClient()
+ except Exception as e:
+ logger.warn("Failed to init local Kube client: {}".format(
+ str(e)
+ )
+ )
+ return None, None
+
+
+def _init_kube_conf_remote(config):
+ # init remote client
+ # Preload Kube token
+ """
+ APISERVER=$(kubectl config view --minify |
+ grep server | cut -f 2- -d ":" | tr -d " ")
+ SECRET_NAME=$(kubectl get secrets |
+ grep ^default | cut -f1 -d ' ')
+ TOKEN=$(kubectl describe secret $SECRET_NAME |
+ grep -E '^token' | cut -f2 -d':' | tr -d " ")
+
+ echo "Detected API Server at: '${APISERVER}'"
+ echo "Got secret: '${SECRET_NAME}'"
+ echo "Loaded token: '${TOKEN}'"
+
+ curl $APISERVER/api
+ --header "Authorization: Bearer $TOKEN" --insecure
+ """
+ import yaml
+
+ _c_data = ssh_shell_p(
+ "sudo cat " + config.kube_config_path,
+ config.ssh_host,
+ username=config.ssh_user,
+ keypath=config.ssh_key,
+ piped=False,
+ use_sudo=config.ssh_uses_sudo,
+ )
+
+ _conf = yaml.load(_c_data, Loader=yaml.SafeLoader)
+
+ _kube_conf = kclient.Configuration()
+ # A remote host configuration
+
+ # To work with remote cluster, we need to extract these
+ # keys = ['host', 'ssl_ca_cert', 'cert_file', 'key_file', 'verify_ssl']
+ # When v12 of the client will be release, we will use load_from_dict
+
+ _kube_conf.ssl_ca_cert = create_temp_file_with_content(
+ base64.standard_b64decode(
+ _conf['clusters'][0]['cluster']['certificate-authority-data']
+ )
+ )
+ _host = _conf['clusters'][0]['cluster']['server']
+ _kube_conf.cert_file = create_temp_file_with_content(
+ base64.standard_b64decode(
+ _conf['users'][0]['user']['client-certificate-data']
+ )
+ )
+ _kube_conf.key_file = create_temp_file_with_content(
+ base64.standard_b64decode(
+ _conf['users'][0]['user']['client-key-data']
+ )
+ )
+ if "http" not in _host or "443" not in _host:
+ logger_cli.error(
+ "Failed to extract Kube host: '{}'".format(_host)
+ )
+ else:
+ logger_cli.debug(
+ "...'context' host extracted: '{}' via SSH@{}".format(
+ _host,
+ config.ssh_host
+ )
+ )
+
+ # Substitute context host to ours
+ _tmp = _host.split(':')
+ _kube_conf.host = \
+ _tmp[0] + "://" + config.mcp_host + ":" + _tmp[2]
+ config.kube_port = _tmp[2]
+ logger_cli.debug(
+ "...kube remote host updated to {}".format(
+ _kube_conf.host
+ )
+ )
+ _kube_conf.verify_ssl = False
+ _kube_conf.debug = config.debug
+ # Nevertheless if you want to do it
+ # you can with these 2 parameters
+ # configuration.verify_ssl=True
+ # ssl_ca_cert is the filepath
+ # to the file that contains the certificate.
+ # configuration.ssl_ca_cert="certificate"
+
+ # _kube_conf.api_key = {
+ # "authorization": "Bearer " + config.kube_token
+ # }
+
+ # Create a ApiClient with our config
+ _kube_api = kclient.ApiClient(_kube_conf)
+
+ return _kube_conf, _kube_api
+
+
+class KubeApi(object):
+ def __init__(self, config):
+ self.config = config
+ self._init_kclient()
+ self.last_response = None
+
+ def _init_kclient(self):
+ # if there is no password - try to get local, if this available
+        logger_cli.debug("# Initializing Kube config...")
+ if self.config.env_name == "local":
+ self.kConf, self.kApi = _init_kube_conf_local(self.config)
+ self.is_local = True
+ # Load local config data
+ if os.path.exists(self.config.kube_config_path):
+ _c_data = shell("sudo cat " + self.config.kube_config_path)
+ _conf = yaml.load(_c_data, Loader=yaml.SafeLoader)
+ self.user_keypath = create_temp_file_with_content(
+ base64.standard_b64decode(
+ _conf['users'][0]['user']['client-key-data']
+ )
+ )
+ self.yaml_conf = _c_data
+ else:
+ self.kConf, self.kApi = _init_kube_conf_remote(self.config)
+ self.is_local = False
+
+ def get_versions_api(self):
+ # client.CoreApi().get_api_versions().versions
+ return kclient.VersionApi(self.kApi)
+
+
+class KubeRemote(KubeApi):
+ def __init__(self, config):
+ super(KubeRemote, self).__init__(config)
+ self._coreV1 = None
+
+ @property
+ def CoreV1(self):
+ if not self._coreV1:
+ self._coreV1 = kclient.CoreV1Api(self.kApi)
+ return self._coreV1
+
+ @staticmethod
+ def _typed_list_to_dict(i_list):
+ _dict = {}
+ for _item in i_list:
+ _d = _item.to_dict()
+ _type = _d.pop("type")
+ _dict[_type.lower()] = _d
+
+ return _dict
+
+ @staticmethod
+ def _get_listed_attrs(items, _path):
+ _list = []
+ for _n in items:
+ _list.append(utils.rgetattr(_n, _path))
+
+ return _list
+
+ def get_node_info(self, http=False):
+ # Query API for the nodes and do some presorting
+ _nodes = {}
+ if http:
+ _raw_nodes = self.CoreV1.list_node_with_http_info()
+ else:
+ _raw_nodes = self.CoreV1.list_node()
+
+ if not isinstance(_raw_nodes, kclient.models.v1_node_list.V1NodeList):
+ raise InvalidReturnException(
+ "Invalid return type: '{}'".format(type(_raw_nodes))
+ )
+
+ for _n in _raw_nodes.items:
+ _name = _n.metadata.name
+ _d = _n.to_dict()
+ # parse inner data classes as dicts
+ _d['addresses'] = self._typed_list_to_dict(_n.status.addresses)
+ _d['conditions'] = self._typed_list_to_dict(_n.status.conditions)
+ # Update 'status' type
+ if isinstance(_d['conditions']['ready']['status'], str):
+ _d['conditions']['ready']['status'] = utils.to_bool(
+ _d['conditions']['ready']['status']
+ )
+ # Parse image names?
+ # TODO: Here is the place where we can parse each node image names
+
+ # Parse roles
+ _d['labels'] = {}
+ for _label, _data in _d["metadata"]["labels"].items():
+ if _data.lower() in ["true", "false"]:
+ _d['labels'][_label] = utils.to_bool(_data)
+ else:
+ _d['labels'][_label] = _data
+
+ # Save
+ _nodes[_name] = _d
+
+ # debug report on how many nodes detected
+ logger_cli.debug("...node items returned '{}'".format(len(_nodes)))
+
+ return _nodes
+
+ def exec_on_target_pod(
+ self,
+ cmd,
+ pod_name,
+ namespace,
+ strict=False,
+ _request_timeout=120,
+ **kwargs
+ ):
+ _pods = {}
+        _pods = self.CoreV1.list_namespaced_pod(namespace)
+ _names = self._get_listed_attrs(_pods.items, "metadata.name")
+
+ _pname = ""
+ if not strict:
+ _pname = [n for n in _names if n.startswith(pod_name)]
+ if len(_pname) > 1:
+ logger_cli.debug(
+ "...more than one pod found for '{}': {}\n"
+ "...using first one".format(
+ pod_name,
+ ", ".join(_pname)
+ )
+ )
+ _pname = _pname[0]
+ elif len(_pname) < 1:
+ raise KubeException("No pods found for '{}'".format(pod_name))
+ else:
+ _pname = pod_name
+
+ _r = stream(
+ self.CoreV1.connect_get_namespaced_pod_exec,
+ _pname,
+ namespace,
+ command=cmd.split(),
+ stderr=True,
+ stdin=False,
+ stdout=True,
+ tty=False,
+ _request_timeout=_request_timeout,
+ **kwargs
+ )
+
+ return _r
diff --git a/cfg_checker/common/log.py b/cfg_checker/common/log.py
index 6edac2f..4c1c02c 100644
--- a/cfg_checker/common/log.py
+++ b/cfg_checker/common/log.py
@@ -91,6 +91,6 @@
'cfg_checker',
log_fname=os.path.join(
pkg_dir,
- os.getenv('LOGFILE', 'cfg_checker.log')
+ os.getenv('MCP_LOGFILE', 'cfg_checker.log')
)
)
diff --git a/cfg_checker/common/other.py b/cfg_checker/common/other.py
index 5a4c552..e3a3271 100644
--- a/cfg_checker/common/other.py
+++ b/cfg_checker/common/other.py
@@ -1,8 +1,11 @@
+import functools
import os
import re
import subprocess
-from cfg_checker.common.const import all_roles_map, uknown_code
+from cfg_checker.common import logger_cli
+from cfg_checker.common.const import all_salt_roles_map, uknown_code, \
+ truth
from cfg_checker.common.exception import ConfigException
pkg_dir = os.path.dirname(__file__)
@@ -11,10 +14,22 @@
pkg_dir = os.path.abspath(pkg_dir)
+# 'Dirty' and simple way to execute piped cmds
+def piped_shell(command):
+ logger_cli.debug("...cmd:'{}'".format(command))
+ _code, _out = subprocess.getstatusoutput(command)
+ if _code:
+ logger_cli.error("Non-zero return code: {}, '{}'".format(_code, _out))
+ return _out
+
+
+# Proper way to execute shell
def shell(command):
+ logger_cli.debug("...cmd:'{}'".format(command))
_ps = subprocess.Popen(
command.split(),
- stdout=subprocess.PIPE
+ stdout=subprocess.PIPE,
+ universal_newlines=False
).communicate()[0].decode()
return _ps
@@ -69,7 +84,7 @@
# node role code checks
_code = re.findall(r"[a-zA-Z]+", fqdn.split('.')[0])
if len(_code) > 0:
- if _code[0] in all_roles_map:
+ if _code[0] in all_salt_roles_map:
return _result()
else:
# log warning here
@@ -107,11 +122,11 @@
if _isvalid:
# try to match it with ones in map
_c = _code[0]
- match = any([r in _c for r in all_roles_map.keys()])
+ match = any([r in _c for r in all_salt_roles_map.keys()])
if match:
# no match, try to find it
match = False
- for r in all_roles_map.keys():
+ for r in all_salt_roles_map.keys():
_idx = _c.find(r)
if _idx > -1:
_c = _c[_idx:]
@@ -153,5 +168,25 @@
return _valid, _invalid
+ @staticmethod
+ def to_bool(value):
+ if value.lower() in truth:
+ return True
+ else:
+ return False
+
+ # helper functions to get nested attrs
+ # https://stackoverflow.com/questions/31174295/getattr-and-setattr-on-nested-subobjects-chained-properties
+ # using wonder's beautiful simplification:
+ # https://stackoverflow.com/questions/31174295/getattr-and-setattr-on-nested-objects/31174427?noredirect=1#comment86638618_31174427
+ def rsetattr(self, obj, attr, val):
+ pre, _, post = attr.rpartition('.')
+ return setattr(self.rgetattr(obj, pre) if pre else obj, post, val)
+
+ def rgetattr(self, obj, attr, *args):
+ def _getattr(obj, attr):
+ return getattr(obj, attr, *args)
+ return functools.reduce(_getattr, [obj] + attr.split('.'))
+
utils = Utils()
diff --git a/cfg_checker/common/salt_utils.py b/cfg_checker/common/salt_utils.py
index b3f5cae..f7ea50b 100644
--- a/cfg_checker/common/salt_utils.py
+++ b/cfg_checker/common/salt_utils.py
@@ -7,12 +7,13 @@
import requests
-from cfg_checker.common import config, logger, logger_cli
+from cfg_checker.common import logger, logger_cli
from cfg_checker.common.exception import InvalidReturnException, SaltException
from cfg_checker.common.other import shell
+from cfg_checker.common.ssh_utils import ssh_shell_p
-def _extract_password(_raw):
+def _extract_salt_password(_raw):
if not isinstance(_raw, str):
raise InvalidReturnException(_raw)
else:
@@ -26,45 +27,40 @@
return _json["local"]
-def get_remote_env_password():
+def get_remote_salt_env_password(config):
"""Uses ssh call with configured options to get password from salt master
:return: password string
"""
_salt_cmd = "salt-call --out=json pillar.get _param:salt_api_password"
- _ssh_cmd = ["ssh"]
- # Build SSH cmd
- if config.ssh_key:
- _ssh_cmd.append("-i " + config.ssh_key)
- if config.ssh_user:
- _ssh_cmd.append(config.ssh_user+'@'+config.ssh_host)
- else:
- _ssh_cmd.append(config.ssh_host)
- if config.ssh_uses_sudo:
- _ssh_cmd.append("sudo")
-
- _ssh_cmd.append(_salt_cmd)
- _ssh_cmd = " ".join(_ssh_cmd)
- logger_cli.debug("... calling salt: '{}'".format(_ssh_cmd))
+ logger_cli.debug("... calling salt using ssh: '{}'".format(_salt_cmd))
try:
- _result = shell(_ssh_cmd)
+ _result = ssh_shell_p(
+ _salt_cmd,
+ config.ssh_host,
+ username=config.ssh_user,
+ keypath=config.ssh_key,
+ piped=False,
+ use_sudo=config.ssh_uses_sudo,
+ silent=True
+ )
if len(_result) < 1:
raise InvalidReturnException(
"# Empty value returned for '{}".format(
- _ssh_cmd
+ _salt_cmd
)
)
else:
- return _extract_password(_result)
+ return _extract_salt_password(_result)
except OSError as e:
raise SaltException(
"Salt error calling '{}': '{}'\n"
- "\nConsider checking 'SALT_ENV' "
- "and '<pkg>/etc/<env>.env' files".format(_ssh_cmd, e.strerror)
+ "\nConsider checking 'MCP_ENV' "
+ "and '<pkg>/etc/<env>.env' files".format(_salt_cmd, e.strerror)
)
-def get_local_password():
+def get_salt_local_password(config):
"""Calls salt locally to get password from the pillar
:return: password string
@@ -80,10 +76,10 @@
except OSError as e:
raise SaltException(
"Salt error calling '{}': '{}'\n"
- "\nConsider checking 'SALT_ENV' "
+ "\nConsider checking 'MCP_ENV' "
"and '<pkg>/etc/<env>.env' files".format(_cmd, e.strerror)
)
- return _extract_password(_result)
+ return _extract_salt_password(_result)
def list_to_target_string(node_list, separator):
@@ -94,9 +90,6 @@
class SaltRest(object):
- _host = config.salt_host
- _port = config.salt_port
- uri = "http://" + config.salt_host + ":" + config.salt_port
_auth = {}
default_headers = {
@@ -105,7 +98,13 @@
'X-Auth-Token': None
}
- def __init__(self):
+ def __init__(self, config):
+ self.config = config
+
+ self._host = config.mcp_host
+ self._port = config.salt_port
+ self.uri = "http://" + config.mcp_host + ":" + config.salt_port
+
self._token = self._login()
self.last_response = None
@@ -154,12 +153,12 @@
def _login(self):
# if there is no password - try to get local, if this available
- if config.salt_env == "local":
- _pass = get_local_password()
+ if self.config.env_name == "local":
+ _pass = get_salt_local_password(self.config)
else:
- _pass = get_remote_env_password()
+ _pass = get_remote_salt_env_password(self.config)
login_payload = {
- 'username': config.salt_user,
+ 'username': self.config.salt_user,
'password': _pass,
'eauth': 'pam'
}
@@ -212,8 +211,8 @@
class SaltRemote(SaltRest):
master_node = ""
- def __init__(self):
- super(SaltRemote, self).__init__()
+ def __init__(self, config):
+ super(SaltRemote, self).__init__(config)
def cmd(
self,
@@ -226,7 +225,7 @@
tgt_type=None,
timeout=None
):
- _timeout = timeout if timeout is not None else config.salt_timeout
+ _timeout = timeout if timeout is not None else self.config.salt_timeout
_payload = {
'fun': fun,
'tgt': tgt,
@@ -256,7 +255,7 @@
_payload = {
'client': 'runner',
'fun': fun,
- 'timeout': config.salt_timeout
+ 'timeout': self.config.salt_timeout
}
if kwarg:
@@ -275,7 +274,7 @@
_payload = {
'client': 'wheel',
'fun': fun,
- 'timeout': config.salt_timeout
+ 'timeout': self.config.salt_timeout
}
if arg:
@@ -358,11 +357,13 @@
:return: json result from salt test.ping
"""
- if config.skip_nodes:
- logger.info("# Nodes to be skipped: {0}".format(config.skip_nodes))
+ if self.config.skip_nodes:
+ logger.info(
+ "# Nodes to be skipped: {0}".format(self.config.skip_nodes)
+ )
_r = self.cmd(
'* and not ' + list_to_target_string(
- config.skip_nodes,
+ self.config.skip_nodes,
'and not'
),
'test.ping',
diff --git a/cfg_checker/common/settings.py b/cfg_checker/common/settings.py
index cca5142..deebbc0 100644
--- a/cfg_checker/common/settings.py
+++ b/cfg_checker/common/settings.py
@@ -1,11 +1,15 @@
import os
+import json
+import pwd
import sys
from cfg_checker.common.exception import ConfigException
-
from cfg_checker.common.log import logger_cli
-from cfg_checker.common.other import utils
+from cfg_checker.common.other import utils, shell
+from cfg_checker.common.ssh_utils import ssh_shell_p
+
+from cfg_checker.clients import get_kube_remote
pkg_dir = os.path.dirname(__file__)
pkg_dir = os.path.join(pkg_dir, os.pardir, os.pardir)
@@ -14,6 +18,31 @@
_default_work_folder = os.path.normpath(pkg_dir)
+ENV_TYPE_GLOB = "MCP"
+ENV_TYPE_SALT = "SALT"
+ENV_TYPE_KUBE = "KUBE"
+ENV_TYPE_LINUX = "LINUX"
+
+ENV_LOCAL = "local"
+
+supported_envs = [ENV_TYPE_LINUX, ENV_TYPE_SALT, ENV_TYPE_KUBE]
+
+
+def _extract_salt_return(_raw):
+ if not isinstance(_raw, str):
+ _json = _raw
+        logger_cli.debug("...ambiguous return detected")
+ else:
+ try:
+ _json = json.loads(_raw)
+ except ValueError:
+ _json = _raw
+ logger_cli.debug(
+ "...return value is not a json: '{}'".format(_raw)
+ )
+
+ return _json
+
class CheckerConfiguration(object):
@staticmethod
@@ -28,10 +57,117 @@
else:
return None
- def _init_values(self):
+ def _detect(self, _type):
+ logger_cli.debug("...detecting '{}'".format(_type))
+ if _type is None:
+ raise ConfigException("# Unexpected supported env type")
+ elif _type == ENV_TYPE_SALT:
+ # Detect salt env
+ _detect_cmd = ["curl", "-s"]
+ _detect_cmd.append(
+ "http://" + self.mcp_host + ':' + self.salt_port
+ )
+ # Try to call salt API on target host
+ _r = None
+ logger_cli.debug("...trying to detect env type '{}'".format(_type))
+ if self.env_name == ENV_LOCAL:
+ _r = shell(" ".join(_detect_cmd))
+ else:
+ _r = ssh_shell_p(
+ " ".join(_detect_cmd),
+ self.ssh_host,
+ username=self.ssh_user,
+ keypath=self.ssh_key,
+ piped=False,
+ use_sudo=self.ssh_uses_sudo,
+ silent=True
+ )
+ # Parse return
+ _r = _extract_salt_return(_r)
+
+ if len(_r) < 1:
+ return False
+ elif _r["return"] == "Welcome":
+ return True
+ else:
+ return False
+ elif _type == ENV_TYPE_KUBE:
+ _kube = get_kube_remote(self)
+ try:
+ _vApi = _kube.get_versions_api()
+ _v = _vApi.get_code()
+ if hasattr(_v, "platform") and \
+ hasattr(_v, "major") and \
+ hasattr(_v, "minor"):
+ _host = "localhost" if _kube.is_local else _kube.kConf.host
+ logger_cli.info(
+ "# Kube server found: {}:{} on '{}'".format(
+ _v.major,
+ _v.minor,
+ _host
+ )
+ )
+ return True
+ else:
+ return False
+ except Exception as e:
+ logger_cli.warn(
+ "# Unexpected error finding Kube env: '{}' ".format(
+ str(e)
+ )
+ )
+ return False
+ elif _type == ENV_TYPE_LINUX:
+ # Detect Linux env
+ from platform import system, release
+ _s = system()
+ _r = release()
+ logger_cli.debug("...running on {} {}".format(_s, _r))
+ if _s in ['Linux', 'Darwin']:
+ return True
+ else:
+ return False
+ else:
+ raise ConfigException(
+ "# Env type of '{}' is not supported".format(
+ _type
+ )
+ )
+
+ def _detect_types(self):
+ """Try to detect env type based on the name
+ """
+ self.detected_envs = []
+ logger_cli.info('# Detecting env types')
+ for _env in supported_envs:
+ if self._detect(_env):
+ logger_cli.info("# '{}' found".format(_env))
+ self.detected_envs.append(_env)
+ else:
+ logger_cli.info("# '{}' not found".format(_env))
+
+ return
+
+ def _init_mcp_values(self):
"""Load values from environment variables or put default ones
"""
-
+ # filter vars and preload if needed
+ self.salt_vars = []
+ self.kube_vars = []
+ for _key, _value in self.vars:
+ if _key.startswith(ENV_TYPE_GLOB):
+ os.environ[_key] = _value
+ elif _key.startswith(ENV_TYPE_SALT):
+ self.salt_vars.append([_key, _value])
+ elif _key.startswith(ENV_TYPE_KUBE):
+ self.kube_vars.append([_key, _value])
+ else:
+ logger_cli.warn(
+ "Unsupported config variable: '{}={}'".format(
+ _key,
+ _value
+ )
+ )
self.name = "CheckerConfig"
self.working_folder = os.environ.get(
'CFG_TESTS_WORK_DIR',
@@ -43,24 +179,86 @@
self.pkg_versions_map = 'versions_map.csv'
self.ssh_uses_sudo = False
- self.ssh_key = os.environ.get('SSH_KEY', None)
- self.ssh_user = os.environ.get('SSH_USER', None)
- self.ssh_host = os.environ.get('SSH_HOST', None)
+ self.ssh_key = os.environ.get('MCP_SSH_KEY', None)
+ self.ssh_user = os.environ.get('MCP_SSH_USER', None)
+ self.ssh_host = os.environ.get('MCP_SSH_HOST', None)
- self.salt_host = os.environ.get('SALT_URL', None)
- self.salt_port = os.environ.get('SALT_PORT', '6969')
- self.salt_user = os.environ.get('SALT_USER', 'salt')
- self.salt_timeout = os.environ.get('SALT_TIMEOUT', 30)
- self.salt_file_root = os.environ.get('SALT_FILE_ROOT', None)
- self.salt_scripts_folder = os.environ.get(
- 'SALT_SCRIPTS_FOLDER',
- 'cfg_checker_scripts'
- )
+ self.mcp_host = os.environ.get('MCP_ENV_HOST', None)
+ self.salt_port = os.environ.get('MCP_SALT_PORT', '6969')
+ self.threads = int(os.environ.get('MCP_THREADS', "5"))
self.skip_nodes = utils.node_string_to_list(os.environ.get(
'CFG_SKIP_NODES',
None
))
+ # prebuild user data and folder path
+ self.pw_user = pwd.getpwuid(os.getuid())
+ if self.env_name == "local":
+ pass
+ else:
+ if not self.ssh_key and not self.force_no_key:
+ raise ConfigException(
+ "Please, supply a key for the cluster's master node. "
+ "Use MCP_SSH_KEY, see 'etc/example.env'"
+ )
+
+ def _init_env_values(self):
+ if ENV_TYPE_SALT in self.detected_envs:
+ for _key, _value in self.salt_vars:
+ os.environ[_key] = _value
+
+ self.salt_user = os.environ.get('SALT_USER', 'salt')
+ self.salt_timeout = os.environ.get('SALT_TIMEOUT', 30)
+ self.salt_file_root = os.environ.get('SALT_FILE_ROOT', None)
+ self.salt_scripts_folder = os.environ.get(
+ 'SALT_SCRIPTS_FOLDER',
+ 'cfg_checker_scripts'
+ )
+ elif ENV_TYPE_KUBE in self.detected_envs:
+ for _key, _value in self.kube_vars:
+ os.environ[_key] = _value
+
+ self.kube_config_root = os.environ.get('KUBE_CONFIG_ROOT', None)
+ self.kube_scripts_folder = os.environ.get(
+ 'KUBE_SCRIPTS_FOLDER',
+ None
+ )
+ self.kube_node_user = os.environ.get(
+ 'KUBE_NODE_USER',
+ 'ubuntu'
+ )
+ self.kube_node_keypath = os.environ.get(
+ 'KUBE_NODE_KEYPATH',
+ None
+ )
+ # Warn user only if Kube env is detected locally
+ if self.env_name == "local":
+ if not os.path.exists(self.kube_config_path):
+ logger_cli.warn(
+ "Kube config path not found on local env: '{}'".format(
+ self.kube_config_path
+ )
+ )
+ # On local envs, KUBE_NODE_KEYPATH is mandatory and is
+ # provided to give cfg-checker access to kube nodes
+ if not self.kube_node_keypath and not self.force_no_key:
+ raise ConfigException(
+ "Please, supply a key for the cluster nodes. "
+ "Use KUBE_NODE_KEYPATH, see 'etc/example.env'. "
+ "Consider checking KUBE_NODE_USER as well"
+ )
+ else:
+ # Init keys for nodes in case of remote env
+ # KUBE_NODE_KEYPATH is provided in case of nodes key would be
+ # different to master nodes key, which is supplied
+ # using MCP_SSH_KEY (mandatory) and, for the most cases,
+ # should be the same for remote envs
+ if not self.kube_node_keypath and not self.force_no_key:
+ logger_cli.debug(
+ "... using MCP_SSH_KEY as node keys. "
+ "Supply KUBE_NODE_KEYPATH to update."
+ )
+ self.kube_node_keypath = self.ssh_key
def _init_env(self, env_name=None):
"""Inits the environment vars from the env file
@@ -69,6 +267,7 @@
Keyword Arguments:
env_name {str} -- environment name to search configuration
files in etc/<env_name>.env (default: {None})
+ env_type {str} -- environment type to use: salt/kube
Raises:
ConfigException -- on IO error when loading env file
@@ -76,7 +275,7 @@
"""
# load env file as init os.environment with its values
if env_name is None:
- _env_name = 'local'
+ _env_name = ENV_LOCAL
else:
_env_name = env_name
_config_path = os.path.join(pkg_dir, 'etc', _env_name + '.env')
@@ -94,6 +293,7 @@
_config_path
)
)
+ self.vars = []
for index in range(len(_list)):
_line = _list[index]
# skip comments
@@ -101,13 +301,14 @@
continue
# validate
_errors = []
- if _line.find('=') < 0 or _line.count('=') > 1:
+ if len(_line) < 1:
+ _errors.append("Line {}: empty".format(index))
+ elif _line.find('=') < 0 or _line.count('=') > 1:
_errors.append("Line {}: {}".format(index, _line))
else:
# save values
_t = _line.split('=')
- _key, _value = _t[0], _t[1]
- os.environ[_key] = _value
+ self.vars.append([_t[0], _t[1]])
# if there was errors, report them
if _errors:
raise ConfigException(
@@ -121,11 +322,15 @@
len(_list)
)
)
- self.salt_env = _env_name
+ self.env_name = _env_name
- def __init__(self):
+ def __init__(self, args):
"""Base configuration class. Only values that are common for all scripts
"""
+ self.ssh_uses_sudo = args.sudo
+ self.kube_config_path = args.kube_config_path
+ self.debug = args.debug
+ self.force_no_key = args.force_no_key
# Make sure we running on Python 3
if sys.version_info[0] < 3 and sys.version_info[1] < 5:
logger_cli.error("# ERROR: Python 3.5+ is required")
@@ -136,9 +341,45 @@
sys.version_info[1]
))
- _env = os.getenv('SALT_ENV', None)
+ _env = os.getenv('MCP_ENV', None)
+
+ # Init environment variables from file, validate
self._init_env(_env)
- self._init_values()
+ # Load Common vars for any type of the env
+ self._init_mcp_values()
+ # Detect env types present
+ self._detect_types()
+ # handle forced env type var
+ _forced_type = os.getenv('MCP_TYPE_FORCE', None)
+ if _forced_type in supported_envs:
+ self.detected_envs.append(_forced_type)
+ elif _forced_type is not None:
+ logger_cli.warn(
+ "Unsupported forced type of '{}'".format(
+ _forced_type
+ )
+ )
+ # Check if any of the envs detected
+ if len(self.detected_envs) < 1:
+ if _env is None:
+ raise ConfigException("No environment types detected locally")
+ else:
+ raise ConfigException(
+ "No environment types detected at '{}'".format(
+ self.mcp_host
+ )
+ )
+ # Init vars that is specific to detected envs only
+ self._init_env_values()
-
-config = CheckerConfiguration()
+ # initialize path to folders
+ if self.env_name == "local":
+ # names and folders
+ self.user = self.pw_user.pw_name
+ self.homepath = self.pw_user.pw_dir
+ self.node_homepath = os.path.join('/home', self.kube_node_user)
+ else:
+ # names and folders in case of remote env
+ self.user = self.ssh_user
+ self.homepath = os.path.join('/home', self.ssh_user)
+ self.node_homepath = self.homepath
diff --git a/cfg_checker/common/ssh_utils.py b/cfg_checker/common/ssh_utils.py
new file mode 100644
index 0000000..fdf4c91
--- /dev/null
+++ b/cfg_checker/common/ssh_utils.py
@@ -0,0 +1,403 @@
+import queue
+import subprocess
+import traceback
+import threading
+
+from time import sleep
+from .exception import TimeoutException, CheckerException
+from .other import shell, piped_shell
+from .log import logger, logger_cli
+
+
# We do not use paramiko here to preserve system level ssh config
def ssh_shell_p(
    command,
    host,
    username=None,
    keypath=None,
    port=None,
    silent=False,
    piped=False,
    use_sudo=False
):
    """Run a single command on `host` through the system ssh client.

    Builds a one-shot `ssh` command line and hands it to the shell
    helpers; `piped` selects which helper executes it.
    """
    parts = ["ssh"]
    if silent:
        parts.append("-q")
    # Build SSH cmd
    if keypath:
        parts.append("-i " + keypath)
    if port:
        parts.append("-p " + str(port))
    # destination: user@host when a username was given
    target = username + '@' + host if username else host
    parts.append(target)

    # `sudo` goes right before the remote command itself
    if use_sudo:
        parts.append("sudo")
    parts.append(command)

    full_cmd = " ".join(parts)
    runner = piped_shell if piped else shell
    return runner(full_cmd)
+
+
def scp_p(
    source,
    target,
    port=None,
    keypath=None,
    silent=False,
    piped=False
):
    """Copy `source` to `target` with the system scp client.

    `source`/`target` follow scp URI conventions (`user@host:path`);
    `piped` selects which shell helper executes the command.
    """
    parts = ["scp"]
    if port:
        parts.append("-P " + str(port))
    if silent:
        parts.append("-q")
    # Build SSH cmd
    if keypath:
        parts.append("-i " + keypath)
    parts.extend([source, target])
    full_cmd = " ".join(parts)
    runner = piped_shell if piped else shell
    return runner(full_cmd)
+
+
def output_reader(_stdout, outq):
    """Read byte lines from `_stdout` until EOF, decode them as UTF-8
    and push them into `outq`. Intended as a reader-thread target.
    """
    while True:
        _raw = _stdout.readline()
        if _raw == b'':
            # EOF on the stream: the producing process closed stdout
            break
        outq.put(_raw.decode('utf-8'))
+
+
# Base class for all SSH related actions
class SshBase(object):
    """Interactive wrapper around the system `ssh` client.

    Spawns `ssh` as a subprocess and keeps the session open; output is
    collected by a reader thread into a queue and drained into
    `self.output` on demand.
    """
    def __init__(
        self,
        tgt_host,
        user=None,
        keypath=None,
        port=None,
        timeout=15,
        silent=False,
        piped=False
    ):
        """
        :param tgt_host: host to connect to
        :param user: remote username; ssh default user when None
        :param keypath: path to an identity file (-i)
        :param port: remote port, defaults to 22
        :param timeout: polling budget (seconds) for connection waits
        :param silent: stored for subclass use
        :param piped: stored for subclass use
        """
        self._cmd = ["ssh"]
        self.timeout = timeout
        self.port = port if port else 22
        self.host = tgt_host
        self.username = user
        self.keypath = keypath
        self.silent = silent
        self.piped = piped
        self.output = []

        # -tt forces tty allocation so the remote side echoes a prompt
        self._options = ["-tt"]
        if self.keypath:
            self._options += ["-i", self.keypath]
        if self.port:
            self._options += ["-p", str(self.port)]
        # NOTE(review): host key checks are disabled on purpose here —
        # target hosts are treated as disposable; confirm acceptable
        self._extra_options = [
            "-o", "UserKnownHostsFile=/dev/null",
            "-o", "StrictHostKeyChecking=no"
        ]

        self._host_uri = ""
        if self.username:
            self._host_uri = self.username + "@" + self.host
        else:
            self._host_uri = self.host

    def _poll_output(self, match):
        """Poll the output queue until a line satisfies `match`.

        Shared waiting loop for `_connect` and `_wait_for_string`
        (previously duplicated). Consumes `self.timeout` one second per
        empty poll; returns True on a matching line, False on timeout.
        """
        _budget = self.timeout
        while True:
            try:
                line = self.outq.get(block=False)
                line = line.decode() if isinstance(line, bytes) else line
                self.output.append(line)
                if match(line):
                    return True
            except queue.Empty:
                logger.debug("... {} sec".format(self.timeout))
                sleep(1)
                self.timeout -= 1
                if not self.timeout:
                    # report the original budget, not the exhausted
                    # counter (which is always 0 at this point)
                    logger.debug(
                        "...timed out after {} sec".format(_budget)
                    )
                    return False

    def _connect(self, banner="Welcome"):
        """Wait for the login banner line; True on success."""
        if not isinstance(banner, str):
            raise CheckerException(
                "Invalid SSH banner type: '{}'".format(type(banner))
            )
        logger.debug("...connecting")
        if self._poll_output(lambda _l: _l.startswith(banner)):
            logger.debug("...connected")
            return True
        return False

    def _wait_for_string(self, string):
        """Wait until a line starting with `string` arrives."""
        logger.debug("...waiting for '{}'".format(string))
        if self._poll_output(lambda _l: _l.startswith(string)):
            logger.debug("...found")
            return True
        return False

    def _init_connection(self, cmd):
        """Start the ssh subprocess and the output reader thread."""
        self._proc = subprocess.Popen(
            cmd,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=False,
            bufsize=0
        )
        # Create thread safe output getter
        self.outq = queue.Queue()
        self._t = threading.Thread(
            target=output_reader,
            args=(self._proc.stdout, self.outq)
        )
        self._t.start()

        # Wait for the banner; no banner means the handshake hung
        if not self._connect():
            raise TimeoutException(
                "SSH connection to '{}'".format(self.host)
            )

        self.input = self._proc.stdin
        self.get_output()
        logger.debug(
            "Connected. Banners:\n{}".format(
                "".join(self.flush_output())
            )
        )

    def _end_connection(self):
        """Kill the ssh process if it is still alive, drain output."""
        if self._proc.poll() is None:
            self._proc.kill()
        self.get_output()

        return

    def do(self, cmd, timeout=30, sudo=False, strip_cmd=True):
        """Run `cmd` in the remote shell and return its output.

        :param cmd: command as str or bytes
        :param timeout: seconds to wait for the command echo
        :param sudo: prefix the command with `sudo`
        :param strip_cmd: drop the echoed command line from the output
        """
        # encode() already returns bytes; the previous extra bytes()
        # wrapping was redundant
        cmd = cmd if isinstance(cmd, bytes) else cmd.encode('utf-8')
        logger.debug("...ssh: '{}'".format(cmd))
        if sudo:
            _cmd = b"sudo " + cmd
        else:
            _cmd = cmd
        # run command
        self.input.write(_cmd + b'\n')
        # wait for completion
        self.wait_ready(_cmd, timeout=timeout)
        self.get_output()
        _output = self.flush_output().replace('\r', '')
        if strip_cmd:
            return "\n".join(_output.splitlines()[1:])
        else:
            return _output

    def get_output(self):
        """Drain the reader queue into self.output and return it."""
        while True:
            try:
                line = self.outq.get(block=False)
                # decode bytes properly; the old str(line) produced the
                # "b'...'" repr instead of the text
                line = line.decode() if isinstance(line, bytes) else line
                self.output.append(line)
            except queue.Empty:
                return self.output

    def flush_output(self, as_string=True):
        """Return the accumulated output and reset the buffer.

        :param as_string: join lines into one string when True,
                          return the raw list otherwise
        """
        _out = self.output
        self.output = []
        if as_string:
            return "".join(_out)
        else:
            return _out

    def wait_ready(self, cmd, timeout=60):
        """Wait until the shell echoes `cmd` back and emits a line.

        Skips the echoed command itself (a prompt line containing '$');
        returns False when `timeout` seconds pass with no output.
        """
        def _strip_cmd_carrets(_str, carret='\r', skip_chars=1):
            # remove each '\r' plus `skip_chars` following characters
            # (tty echo artifacts in the reflected command line)
            _cnt = _str.count(carret)
            while _cnt > 0:
                _idx = _str.index(carret)
                _str = _str[:_idx] + _str[_idx+1+skip_chars:]
                _cnt -= 1
            return _str
        while True:
            try:
                _line = self.outq.get(block=False)
                line = _line.decode() if isinstance(_line, bytes) else _line
                self.output.append(line)
                # check if this is the command itself and skip
                if '$' in line:
                    _cmd = line.split('$', 1)[1].strip()
                    _cmd = _strip_cmd_carrets(_cmd)
                    if _cmd == cmd.decode():
                        continue
                break
            except queue.Empty:
                logger.debug("... {} sec".format(timeout))
                sleep(1)
                timeout -= 1
                if not timeout:
                    logger.debug("...timed out")
                    return False
        return True

    def wait_for_string(self, string, timeout=60):
        """Block until `string` appears in the output.

        Fixed: the `timeout` argument was previously ignored; it now
        bounds the wait (self.timeout is restored afterwards).
        :raises TimeoutException: when the string never shows up
        """
        _saved = self.timeout
        self.timeout = timeout
        try:
            if not self._wait_for_string(string):
                raise TimeoutException(
                    "Time out waiting for string '{}'".format(string)
                )
            return True
        finally:
            self.timeout = _saved
+
+
class SshShell(SshBase):
    """Interactive SSH shell session, usable as a context manager."""

    def __enter__(self):
        # assemble: ssh <options> <extra options> <user@host>
        shell_cmd = ["ssh"]
        shell_cmd.extend(self._options)
        shell_cmd.extend(self._extra_options)
        shell_cmd.append(self._host_uri)
        self._cmd = shell_cmd

        logger.debug("...shell to: '{}'".format(" ".join(self._cmd)))
        self._init_connection(self._cmd)
        return self

    def __exit__(self, _type, _value, _traceback):
        self._end_connection()
        if _value:
            _details = "".join(
                traceback.format_exception(_type, _value, _traceback)
            )
            logger.warn("Error running SSH:\r\n{}".format(_details))

        # NOTE: returning True suppresses any exception raised inside
        # the with-block; it is only logged above
        return True

    def connect(self):
        """Open the session outside of a with-statement."""
        return self.__enter__()

    def kill(self):
        """Terminate the underlying ssh process."""
        self._end_connection()

    def get_host_path(self, path):
        """Return an scp-style '<user>@<host>:<path>' URI."""
        _remote = self.host + ":" + path
        if self.username:
            return self.username + "@" + _remote
        return _remote

    def scp(self, _src, _dst):
        """Copy a file via scp; returns (returncode, stdout, stderr)."""
        copy_opts = []
        if self.keypath:
            copy_opts += ["-i", self.keypath]
        if self.port:
            copy_opts += ["-P", str(self.port)]
        self._scp_options = copy_opts

        full_cmd = ["scp"] + copy_opts + self._extra_options + [_src, _dst]

        logger.debug("...scp: '{}'".format(" ".join(full_cmd)))
        _proc = subprocess.Popen(
            full_cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE
        )
        _out, _err = _proc.communicate()
        return (
            _proc.returncode,
            _out.decode(),
            _err.decode() if _err else ""
        )
+
+
class PortForward(SshBase):
    """SSH session forwarding a local port to `fwd_host:fwd_port`."""

    def __init__(
        self,
        host,
        fwd_host,
        user=None,
        keypath=None,
        port=None,
        loc_port=10022,
        fwd_port=22,
        timeout=15
    ):
        super(PortForward, self).__init__(
            host,
            user=user,
            keypath=keypath,
            port=port,
            timeout=timeout,
            silent=True,
            piped=False
        )
        self.f_host = fwd_host
        self.l_port = loc_port
        self.f_port = fwd_port

        # -L <local port>:<target host>:<target port>
        _spec = ":".join(
            [str(self.l_port), self.f_host, str(self.f_port)]
        )
        self._forward_options = ["-L", _spec]

    def __enter__(self):
        fwd_cmd = (
            ["ssh"]
            + self._forward_options
            + self._options
            + self._extra_options
            + [self._host_uri]
        )
        self._cmd = fwd_cmd

        logger.debug(
            "...port forwarding: '{}'".format(" ".join(self._cmd))
        )
        self._init_connection(self._cmd)
        return self

    def __exit__(self, _type, _value, _traceback):
        self._end_connection()
        if _value:
            _details = "".join(
                traceback.format_exception(_type, _value, _traceback)
            )
            logger_cli.warn("Error running SSH:\r\n{}".format(_details))

        # NOTE: returning True suppresses any exception raised inside
        # the with-block; it is only logged above
        return True

    def connect(self):
        """Open the forwarding session outside of a with-statement."""
        return self.__enter__()

    def kill(self):
        """Terminate the underlying ssh process."""
        self._end_connection()