blob: dac917e1fa575fa767e562d4636f3c29c786b586 [file] [log] [blame]
import os
import json
import pwd
import sys
import yaml
from cfg_checker.common.const import ENV_TYPE_GLOB, ENV_TYPE_SALT
from cfg_checker.common.const import ENV_TYPE_KUBE, ENV_TYPE_LINUX, ENV_LOCAL
from cfg_checker.common.const import supported_envs
from cfg_checker.common.exception import ConfigException
from cfg_checker.common.log import logger_cli
from cfg_checker.common.other import utils, shell
from cfg_checker.common.ssh_utils import ssh_shell_p
from cfg_checker.clients import get_kube_remote
pkg_dir = os.path.dirname(__file__)
pkg_dir = os.path.join(pkg_dir, os.pardir, os.pardir)
pkg_dir = os.path.normpath(pkg_dir)
pkg_dir = os.path.abspath(pkg_dir)
_default_work_folder = os.path.normpath(pkg_dir)
def _extract_salt_return(_raw):
if not isinstance(_raw, str):
_json = _raw
logger_cli.debug("... ambigious return detected")
else:
try:
_json = json.loads(_raw)
except ValueError:
_json = _raw
logger_cli.debug(
"... return value is not a json: '{}'".format(_raw)
)
return _json
class CheckerConfiguration(object):
@staticmethod
def load_nodes_list():
_file = os.environ.get('SALT_NODE_LIST_FILE', None)
if _file:
_v, _ = utils.get_nodes_list(
os.path.join(pkg_dir, _file),
env_sting=os.environ.get('CFG_ALL_NODES', None)
)
return _v
else:
return None
def _detect(self, _type):
logger_cli.debug("... detecting '{}'".format(_type))
if _type is None:
raise ConfigException("# Unexpected supported env type")
elif _type == ENV_TYPE_SALT:
# Detect salt env
_detect_cmd = ["curl", "-s"]
_detect_cmd.append(
"http://" + self.mcp_host + ':' + self.salt_port
)
# Try to call salt API on target host
_r = None
logger_cli.debug("... detecting env type '{}'".format(_type))
if self.env_name == ENV_LOCAL:
_r = shell(" ".join(_detect_cmd))
else:
_r = ssh_shell_p(
" ".join(_detect_cmd),
self.ssh_host,
username=self.ssh_user,
keypath=self.ssh_key,
piped=False,
use_sudo=self.ssh_uses_sudo,
silent=True
)
# Parse return
_r = _extract_salt_return(_r)
if len(_r) < 1:
return False
elif _r["return"] == "Welcome":
return True
else:
return False
elif _type == ENV_TYPE_KUBE:
_kube = get_kube_remote(self)
if not _kube.initialized:
logger_cli.debug(
"... failed to init kube using '{}'".format(
_kube.kConfigPath
)
)
return False
else:
logger_cli.debug(
"... config loaded from '{}'".format(
_kube.kConfigPath
)
)
try:
_vApi = _kube.get_versions_api()
_v = _vApi.get_code()
if hasattr(_v, "platform") and \
hasattr(_v, "major") and \
hasattr(_v, "minor"):
logger_cli.info(
"# Kube server found: {}:{} on '{}'".format(
_v.major,
_v.minor,
_kube.kConfigPath
)
)
return True
else:
return False
except Exception as e:
logger_cli.debug(
"... kube env error: '{}' ".format(
str(e)
)
)
return False
elif _type == ENV_TYPE_LINUX:
# Detect Linux env
from platform import system, release
_s = system()
_r = release()
logger_cli.debug("... running on {} {}".format(_s, _r))
if _s in ['Linux', 'Darwin']:
return True
else:
return False
else:
raise ConfigException(
"# Env type of '{}' is not supported".format(
_type
)
)
def _detect_types(self):
"""Try to detect env type based on the name
"""
self.detected_envs = []
logger_cli.info('# Detecting env types')
for _env in supported_envs:
if self._detect(_env):
logger_cli.info("# '{}' found".format(_env))
self.detected_envs.append(_env)
else:
logger_cli.info("# '{}' not found".format(_env))
return
def _init_mcp_values(self):
"""Load values from environment variables or put default ones
"""
# filter vars and preload if needed
self.salt_vars = []
self.kube_vars = []
for _key, _value in self.vars:
if _key in os.environ:
logger_cli.info(
"-> Using env var '{}={}'".format(_key, os.environ[_key])
)
if _key.startswith(ENV_TYPE_GLOB):
os.environ[_key] = _value
elif _key.startswith(ENV_TYPE_SALT):
self.salt_vars.append([_key, _value])
elif _key.startswith(ENV_TYPE_KUBE):
self.kube_vars.append([_key, _value])
else:
logger_cli.warn(
"Unsupported config variable: '{}={}'".format(
_key,
_value
)
)
self.name = "CheckerConfig"
self.working_folder = os.environ.get(
'CFG_TESTS_WORK_DIR',
_default_work_folder
)
self.date_format = "%Y-%m-%d %H:%M:%S.%f%z"
self.default_tz = "UTC"
self.pkg_versions_map = 'versions_map.csv'
# self.ssh_uses_sudo = False
self.ssh_key = os.environ.get('MCP_SSH_KEY', None)
self.ssh_user = os.environ.get('MCP_SSH_USER', None)
self.ssh_host = os.environ.get('MCP_SSH_HOST', None)
self.mcp_host = os.environ.get('MCP_ENV_HOST', None)
self.salt_port = os.environ.get('MCP_SALT_PORT', '6969')
self.threads = int(os.environ.get('MCP_THREADS', "5"))
self.skip_nodes = utils.node_string_to_list(os.environ.get(
'CFG_SKIP_NODES',
None
))
# prebuild user data and folder path
self.pw_user = pwd.getpwuid(os.getuid())
if self.env_name == "local":
pass
else:
if not self.ssh_key and not self.force_no_key:
raise ConfigException(
"Please, supply a key for the cluster's master node. "
"Use MCP_SSH_KEY, see 'etc/example.env'"
)
def _init_env_values(self):
if ENV_TYPE_SALT in self.detected_envs:
for _key, _value in self.salt_vars:
if _key not in os.environ:
os.environ[_key] = _value
self.salt_user = os.environ.get('SALT_USER', 'salt')
self.salt_timeout = os.environ.get('SALT_TIMEOUT', 30)
self.salt_file_root = os.environ.get(
'SALT_FILE_ROOT',
"/usr/share/salt-formulas/env/"
)
self.salt_scripts_folder = os.environ.get(
'SALT_SCRIPTS_FOLDER',
'cfg_checker_scripts'
)
elif ENV_TYPE_KUBE in self.detected_envs:
for _key, _value in self.kube_vars:
if _key not in os.environ:
os.environ[_key] = _value
self.kube_config_root = os.environ.get('KUBE_CONFIG_ROOT', "/root")
self.kube_scripts_folder = os.environ.get(
'KUBE_SCRIPTS_FOLDER',
"cfg-checker-scripts"
)
self.kube_node_user = os.environ.get(
'KUBE_NODE_USER',
'ubuntu'
)
self.kube_node_keypath = os.environ.get(
'KUBE_NODE_KEYPATH',
None
)
# Warn user only if Kube env is detected locally
if self.env_name == ENV_LOCAL:
if not os.path.exists(self.kube_config_path):
logger_cli.warn(
"Kube config path not found on local env: '{}'".format(
self.kube_config_path
)
)
# On local envs, KUBE_NODE_KEYPATH is mandatory and is
# provided to give cfg-checker access to kube nodes
if not self.kube_node_keypath and not self.force_no_key:
raise ConfigException(
"Please, supply a key for the cluster nodes. "
"Use KUBE_NODE_KEYPATH, see 'etc/example.env'. "
"Consider checking KUBE_NODE_USER as well"
)
self.kube_node_homepath = os.path.join(
'/home',
self.kube_node_user
)
else:
# Init key for nodes
# KUBE_NODE_KEYPATH is provided in case of node keys would be
# different to master node key, which is supplied
# using MCP_SSH_KEY (mandatory) and, for the most cases,
# should be the same for remote envs
if not self.kube_node_keypath and not self.force_no_key:
logger_cli.debug(
"... using MCP_SSH_KEY as node keys. "
"Supply KUBE_NODE_KEYPATH to update."
)
self.kube_node_keypath = self.ssh_key
self.kube_node_homepath = self.homepath
def _init_env(self, config_path, env_name=None):
"""Inits the environment vars from the env file
Uses simple validation for the values and names
Keyword Arguments:
env_name {str} -- environment name to search configuration
files in etc/<env_name>.env (default: {None})
env_type {str} -- environment type to use: salt/kube
Raises:
ConfigException -- on IO error when loading env file
ConfigException -- on env file failed validation
"""
# detect kubeconfig placement
_env_kubeconf_path = os.environ.get('KUBECONFIG', None)
if not os.path.exists(self.kube_config_path):
logger_cli.debug(
"... kubeconfig not detected at '{}'".format(
self.kube_config_path
)
)
# not exists, get KUBECONFIG var
if _env_kubeconf_path:
# get the env var path
self.kube_config_path = _env_kubeconf_path
logger_cli.debug(
"... KUBECONFIG var points to '{}'".format(
self.kube_config_path
)
)
self.kube_config_detected = True
else:
logger_cli.debug("... KUBECONFIG env var not found")
self.kube_config_path = None
self.kube_config_detected = False
else:
logger_cli.debug(
"... kubeconfig detected at '{}'".format(
self.kube_config_path
)
)
self.kube_config_detected = True
# try to load values from KUBECONF
_kube_conf = None
if self.kube_config_path:
with open(self.kube_config_path) as kF:
_kube_conf = yaml.load(kF, Loader=yaml.SafeLoader)
# extract host ip
try:
_server = _kube_conf["clusters"][0]["cluster"]["server"]
except KeyError as e:
logger_cli.debug(
"... failed to extract server ip: "
"no '{}' key in 'clusters/[0]/cluster/server".format(e)
)
except IndexError:
logger_cli.debug(
"... failed to extract server ip: empty cluster list"
)
_ip = _server.split(':')
self.remote_ip = _ip[1].replace('/', '')
logger_cli.debug("... detected ip: '{}'".format(self.remote_ip))
# load env file as init os.environment with its values
if os.path.isfile(config_path):
with open(config_path) as _f:
_list = _f.read().splitlines()
logger_cli.info(
"# Loading env vars from '{}'".format(
config_path
)
)
else:
raise ConfigException(
"# Failed to load enviroment vars from '{}'".format(
config_path
)
)
self.vars = []
for index in range(len(_list)):
_line = _list[index]
# skip comments
if _line.strip().startswith('#'):
continue
# validate
_errors = []
if len(_line) < 1:
_errors.append("Line {}: empty".format(index))
elif _line.find('=') < 0 or _line.count('=') > 1:
_errors.append("Line {}: {}".format(index, _line))
else:
# save values
_t = _line.split('=')
self.vars.append([_t[0], _t[1]])
# if there was errors, report them
if _errors:
raise ConfigException(
"# Environment file failed validation in lines: {}".format(
"\n".join(_errors)
)
)
else:
logger_cli.debug(
"... loaded total of '{}' vars".format(
len(_list)
)
)
self.env_name = env_name
def __init__(self, args):
"""Base configuration class. Only values that are common for all scripts
"""
self.ssh_uses_sudo = args.sudo
self.ssh_direct = args.ssh_direct
self.kube_config_path = args.kube_config
self.debug = args.debug
self.insecure = args.insecure
self.force_no_key = args.force_no_key
# Make sure we running on Python 3
if sys.version_info[0] < 3 and sys.version_info[1] < 5:
logger_cli.error("# ERROR: Python 3.5+ is required")
sys.exit(1)
else:
logger_cli.debug("### Python version is {}.{}".format(
sys.version_info[0],
sys.version_info[1]
))
# if env name is default, check var too
if args.env_name == ENV_LOCAL:
_env = os.getenv('MCP_ENV', None)
_env = _env if _env else args.env_name
_env_config_path = os.path.join(pkg_dir, 'etc', _env + '.env')
else:
_env = args.env_name
_env_config_path = args.env_config
# Init environment variables from file, validate
self._init_env(_env_config_path, env_name=_env)
# Load Common vars for any type of the env
self._init_mcp_values()
# Detect env types present
self._detect_types()
# handle forced env type var
_forced_type = os.getenv('MCP_TYPE_FORCE', None)
if _forced_type in supported_envs:
self.detected_envs.append(_forced_type)
elif _forced_type is not None:
logger_cli.warn(
"Unsupported forced type of '{}'".format(
_forced_type
)
)
# Check if any of the envs detected
if len(self.detected_envs) < 1:
if _env is None:
raise ConfigException("No environment types detected locally")
else:
raise ConfigException(
"No environment types detected at '{}'".format(
self.mcp_host
)
)
# initialize path to folders
if self.env_name == ENV_LOCAL:
# names and folders
self.user = self.pw_user.pw_name
self.homepath = self.pw_user.pw_dir
else:
# names and folders in case of remote env
self.user = self.ssh_user
self.homepath = os.path.join('/home', self.ssh_user)
# Init vars that is specific to detected envs only
self._init_env_values()