blob: dc1f391c66f5dbc51f6c52c5564ae066090bcd75 [file] [log] [blame]
# Author: Alex Savatieiev (osavatieiev@mirantis.com; a.savex@gmail.com)
# Copyright 2019-2022 Mirantis, Inc.
import argparse
import os
import sys
from cfg_checker.common.exception import ConfigException, \
CommandTypeNotSupportedException, CheckerException
class MyParser(argparse.ArgumentParser):
def error(self, message):
sys.stderr.write('Error: {0}\n\n'.format(message))
self.print_help()
def get_skip_args(args):
if hasattr(args, "skip_nodes"):
_skip = getattr(args, "skip_nodes")
if _skip:
_skip = _skip.split(',')
else:
_skip = None
if hasattr(args, "skip_nodes_file"):
_skip_file = getattr(args, "skip_nodes_file")
else:
_skip_file = None
return _skip, _skip_file
def get_arg(args, str_arg, nofail=False):
if hasattr(args, str_arg):
return getattr(args, str_arg)
else:
if nofail:
return None
_c = args.command if hasattr(args, 'command') else ''
_t = args.type if hasattr(args, 'type') else ''
raise ConfigException(
"Argument '{}' not found executing: mcp-check {} {}".format(
str_arg,
_c,
_t
)
)
def get_path_arg(path):
if os.path.exists(path):
return path
else:
raise ConfigException("'{}' not exists".format(path))
def get_network_map_type_and_filename(args):
if args.html or args.text:
if args.html and args.text:
raise ConfigException("Multuple report types not supported")
if args.html is not None:
return 'html', args.html
if args.text is not None:
return 'text', args.text
else:
return 'console', None
def get_package_report_type_and_filename(args):
if args.html or args.csv:
if args.html and args.csv:
raise ConfigException("Multuple report types not supported")
if args.html is not None:
return 'html', args.html
if args.csv is not None:
return 'csv', args.csv
else:
raise ConfigException("Report type and filename not set")
def check_supported_env(target_env, args, config):
def _raise_type_not_supported():
raise CommandTypeNotSupportedException(
"'{}' -> '{}' is not supported on any of "
"the currently detected environments: "
"{}".format(
args.command,
args.type,
",".join(config.detected_envs))
)
def _check_target_vs_used(_tenv):
if not hasattr(args, 'force_env_type'):
return _tenv
elif args.force_env_type == _tenv or not args.force_env_type:
return _tenv
else:
raise CommandTypeNotSupportedException(
"'{}' -> '{}' is not supported "
"for selected env of '{}'".format(
args.command,
args.type,
args.force_env_type
)
)
if isinstance(target_env, list):
_set = set(config.detected_envs).intersection(set(target_env))
if len(_set) < 1:
_raise_type_not_supported()
if len(_set) > 1:
if not hasattr(args, 'force_env_type'):
raise CheckerException(
"Multiple envs detected: {}, use --use-env option".format(
",".join(list(_set))
)
)
elif args.force_env_type and args.force_env_type in _set:
return args.force_env_type
else:
_raise_type_not_supported()
else:
return _check_target_vs_used(list(_set)[0])
elif isinstance(target_env, str):
if target_env not in config.detected_envs:
_raise_type_not_supported()
else:
return _check_target_vs_used(target_env)
else:
raise CheckerException(
"Unexpected target env type '{}' in '{}' -> '{}'".format(
target_env,
args.command,
args.type,
)
)