# blob: 778cdb14427fb095e58d10f70e530b5d309bb0bd [file] [log] [blame]
# Author: Alex Savatieiev (osavatieiev@mirantis.com; a.savex@gmail.com)
# Copyright 2019-2022 Mirantis, Inc.
import importlib
import pkgutil
import sys
import traceback

from cfg_checker.cli.arguments import add_global_arguments
from cfg_checker.common import logger, logger_cli
from cfg_checker.common.exception import CheckerException, \
    CommandNotSupportedException
from cfg_checker.common.settings import CheckerConfiguration
from cfg_checker.helpers.args_utils import MyParser
# Top-level package name: the first component of this module's dotted name
main_pkg_name = __name__.partition('.')[0]
# Name of the sub-package that is scanned for command modules
mods_package_name = "modules"
# Dotted import path of that sub-package and the prefix of its children
mods_import_path = "{}.{}".format(main_pkg_name, mods_package_name)
mods_prefix = "{}.".format(mods_import_path)

# Registries keyed by the short name of a command module
commands = dict()        # names of the 'do_*' handlers, without the prefix
supports = dict()        # module's 'supported_envs'
parsers_inits = dict()   # module's 'init_parser' callable
parsers = dict()         # parser object stored per command
helps = dict()           # module's 'command_help'
# Pure dynamic magic, loading all 'do_*' methods from available modules
_m = importlib.import_module(mods_import_path)
# NOTE: the third item yielded by pkgutil.iter_modules() is the 'is a package'
# flag, so only sub-packages of the modules package are registered
for _imp, modName, isMod in pkgutil.iter_modules(_m.__path__, mods_prefix):
    if not isMod:
        continue
    # load the package by its full dotted name; importlib is used because
    # the legacy finder.find_module()/load_module() API is no longer
    # available on the standard finders in Python 3.12+
    _p = importlib.import_module(modName)
    # create a shortname: last component of the dotted name
    mod_name = modName.split('.')[-1]
    # A package! Register its commands: every 'do_<name>' attribute
    # is exposed as command type '<name>'
    commands[mod_name] = \
        [_n[3:] for _n in dir(_p) if _n.startswith("do_")]
    # The attributes below are mandatory for a command package;
    # a missing one raises AttributeError at import time
    supports[mod_name] = _p.supported_envs
    parsers_inits[mod_name] = _p.init_parser
    # placeholder, replaced with the real parser in cli_command()
    parsers[mod_name] = {}
    helps[mod_name] = _p.command_help
def execute_command(args, command, config):
    """Validate the request and run the matching ``do_<type>`` handler.

    :param args: parsed arguments; ``args.type`` selects the handler
        (dashes are treated as underscores). ``args.command`` is set to
        ``command`` when the attribute is missing.
    :param command: short name of the command module, a key of ``commands``
    :param config: configuration object; its ``detected_envs`` is matched
        against the module's supported environments and the object is
        passed on to the handler
    :return: 0 when the handler finished; 1 when validation failed or the
        handler raised CheckerException/CommandNotSupportedException
    :raises CommandNotSupportedException: when none of the detected
        environments is supported by the command module (raised before
        the handler is called, so it propagates to the caller)
    """
    # Validate the commands
    # check commands
    if command not in commands:
        logger_cli.info("\n# Please, type a command listed above")
        return 1
    if not getattr(args, 'type', None):
        parsers[command].print_help()
        logger_cli.info("\n# Please, type a command listed above")
        return 1

    # handlers are python functions, so 'some-type' maps to 'do_some_type'
    _type = args.type.replace("-", "_")
    if _type not in commands[command]:
        # check type
        logger_cli.info(
            "\n# Please, select '{}' command type listed above".format(
                command
            )
        )
        return 1

    # check if command is supported: at least one detected environment
    # must be among the ones the module declares
    if not set(supports[command]).intersection(config.detected_envs):
        raise CommandNotSupportedException(
            "'{}' is not supported on any of the currently "
            "detected environments: {}".format(
                command,
                ",".join(config.detected_envs)
            )
        )

    # form function name to call
    _method_name = "do_" + _type
    _target_module = __import__(
        mods_prefix + command,
        fromlist=[""]
    )
    _method = getattr(_target_module, _method_name)

    # Compatibility between entrypoints
    if not hasattr(args, 'command'):
        args.command = command

    # Execute the command
    try:
        _method(args, config)
    except CommandNotSupportedException as e:
        logger_cli.error("\n{}".format(e.message))
        return 1
    except CheckerException as e:
        logger_cli.error("\n{}".format(e.message))
        # full traceback goes to debug level only
        logger_cli.debug("\n{}".format(traceback.format_exc()))
        return 1
    return 0
def cli_command(_title, _name):
    """Parse the command line for a single command, run it and exit.

    Every path through this function ends in ``sys.exit()``: the exit code
    is 1 on argument errors or when the command is not supported, otherwise
    it is the value returned by ``execute_command``.

    :param _title: title passed to ``MyParser``
    :param _name: short name of the command module; used as the key in
        ``parsers_inits``/``parsers`` and passed to ``execute_command``
    """
    my_parser = MyParser(_title)
    # let the command module populate the parser and keep the result,
    # execute_command() uses it to print help
    parsers[_name] = parsers_inits[_name](my_parser)

    # parse arguments
    try:
        add_global_arguments(my_parser)
        args, unknown = my_parser.parse_known_args()
    except TypeError:
        logger_cli.info("\n# Please, check arguments")
        sys.exit(1)

    if unknown:
        logger_cli.error(
            "# Unexpected arguments: {}".format(
                ", ".join(["'{}'".format(a) for a in unknown])
            )
        )
        sys.exit(1)

    # Init the config
    config = CheckerConfiguration(args)
    # force use of sudo
    config.ssh_uses_sudo = True

    # Execute the command
    try:
        result = execute_command(args, _name, config)
        logger.debug(result)
    except CommandNotSupportedException as e:
        logger_cli.error("\nERROR: {}".format(
            e.message
        ))
        # execute_command() raised before producing an exit code;
        # without this 'result' would be unbound at sys.exit() below
        result = 1

    sys.exit(result)