| # Author: Alex Savatieiev (osavatieiev@mirantis.com; a.savex@gmail.com) |
| # Copyright 2019-2022 Mirantis, Inc. |
| import argparse |
| import os |
| import sys |
| |
| from cfg_checker.common.exception import ConfigException, \ |
| CommandTypeNotSupportedException, CheckerException |
| |
| |
| class MyParser(argparse.ArgumentParser): |
| def error(self, message): |
| sys.stderr.write('Error: {0}\n\n'.format(message)) |
| self.print_help() |
| |
| |
| def get_skip_args(args): |
| if hasattr(args, "skip_nodes"): |
| _skip = getattr(args, "skip_nodes") |
| if _skip: |
| _skip = _skip.split(',') |
| else: |
| _skip = None |
| if hasattr(args, "skip_nodes_file"): |
| _skip_file = getattr(args, "skip_nodes_file") |
| else: |
| _skip_file = None |
| return _skip, _skip_file |
| |
| |
| def get_arg(args, str_arg, nofail=False): |
| if hasattr(args, str_arg): |
| return getattr(args, str_arg) |
| else: |
| if nofail: |
| return None |
| _c = args.command if hasattr(args, 'command') else '' |
| _t = args.type if hasattr(args, 'type') else '' |
| raise ConfigException( |
| "Argument '{}' not found executing: mcp-check {} {}".format( |
| str_arg, |
| _c, |
| _t |
| ) |
| ) |
| |
| |
| def get_path_arg(path): |
| if os.path.exists(path): |
| return path |
| else: |
| raise ConfigException("'{}' not exists".format(path)) |
| |
| |
| def get_network_map_type_and_filename(args): |
| if args.html or args.text: |
| if args.html and args.text: |
| raise ConfigException("Multuple report types not supported") |
| if args.html is not None: |
| return 'html', args.html |
| if args.text is not None: |
| return 'text', args.text |
| else: |
| return 'console', None |
| |
| |
| def get_package_report_type_and_filename(args): |
| if args.html or args.csv: |
| if args.html and args.csv: |
| raise ConfigException("Multuple report types not supported") |
| if args.html is not None: |
| return 'html', args.html |
| if args.csv is not None: |
| return 'csv', args.csv |
| else: |
| raise ConfigException("Report type and filename not set") |
| |
| |
| def check_supported_env(target_env, args, config): |
| def _raise_type_not_supported(): |
| raise CommandTypeNotSupportedException( |
| "'{}' -> '{}' is not supported on any of " |
| "the currently detected environments: " |
| "{}".format( |
| args.command, |
| args.type, |
| ",".join(config.detected_envs)) |
| ) |
| |
| def _check_target_vs_used(_tenv): |
| if not hasattr(args, 'force_env_type'): |
| return _tenv |
| elif args.force_env_type == _tenv or not args.force_env_type: |
| return _tenv |
| else: |
| raise CommandTypeNotSupportedException( |
| "'{}' -> '{}' is not supported " |
| "for selected env of '{}'".format( |
| args.command, |
| args.type, |
| args.force_env_type |
| ) |
| ) |
| |
| if isinstance(target_env, list): |
| _set = set(config.detected_envs).intersection(set(target_env)) |
| if len(_set) < 1: |
| _raise_type_not_supported() |
| if len(_set) > 1: |
| if not hasattr(args, 'force_env_type'): |
| raise CheckerException( |
| "Multiple envs detected: {}, use --use-env option".format( |
| ",".join(list(_set)) |
| ) |
| ) |
| elif args.force_env_type and args.force_env_type in _set: |
| return args.force_env_type |
| else: |
| _raise_type_not_supported() |
| else: |
| return _check_target_vs_used(list(_set)[0]) |
| elif isinstance(target_env, str): |
| if target_env not in config.detected_envs: |
| _raise_type_not_supported() |
| else: |
| return _check_target_vs_used(target_env) |
| else: |
| raise CheckerException( |
| "Unexpected target env type '{}' in '{}' -> '{}'".format( |
| target_env, |
| args.command, |
| args.type, |
| ) |
| ) |