blob: 0b336f2ad95cbfefdffcca82c059e71e219a1676 [file] [log] [blame]
from cfg_checker.common import logger_cli
from cfg_checker.modules.network.mapper import SaltNetworkMapper, \
KubeNetworkMapper
from cfg_checker.modules.network.network_errors import NetworkErrors
from cfg_checker.reports import reporter
class NetworkChecker(object):
def __init__(self):
logger_cli.debug("... init error logs folder")
self.errors = NetworkErrors()
def _check_duplicate_ips(self):
# shortcuts
logger_cli.debug("... checking for duplicate ips")
_map = self.mapper.map
for _net in _map.keys():
_ips = set()
for _node_name, _interfaces in _map[_net].items():
for _if in _interfaces:
if _if["ip_address"] not in _ips:
# there was no such ip yet
_ips.add(_if["ip_address"])
else:
# this ip already used
logger_cli.warning(
"Warning: Duplicate ip address: "
"'{}: {}' at {} -> {}".format(
_if["interface"],
_if["ip_address"],
_net,
_node_name
)
)
self.errors.add_error(
self.errors.NET_DUPLICATE_IP,
network=_net,
node_name=_node_name,
if_name=_if["interface"],
ip_address=_if["ip_address"]
)
return
def _check_non_uniform_mtu(self):
# shortcuts
logger_cli.debug("... checking for duplicate ips")
_map = self.mapper.map
for _net in _map.keys():
_mtus = set()
for _node_name, _interfaces in _map[_net].items():
for _if in _interfaces:
if len(_mtus) < 1:
# this is the 1st iteration
_mtus.add(_if["rt_mtu"])
elif _if["rt_mtu"] not in _mtus:
# this ip already used
logger_cli.warning(
"Non-uniform MTU value of '{}' in '{}': "
"{}:{}:{}".format(
_net,
_node_name,
_if["interface"],
_if["ip_address"],
_if["rt_mtu"]
)
)
self.errors.add_error(
self.errors.NET_DUPLICATE_IP,
network=_net,
node_name=_node_name,
if_name=_if["interface"],
ip_address=_if["ip_address"],
mtu=_if["rt_mtu"]
)
return
def check_networks(self, map=True, skip_keywords=None):
# Load map
self.mapper.map_networks()
self.mapper.create_map(skip_keywords=skip_keywords)
# Check for errors that is not detectable during mapping
self._check_duplicate_ips()
self._check_non_uniform_mtu()
# print map if requested
if map:
self.mapper.print_map()
def print_summary(self):
logger_cli.info(self.errors.get_summary(print_zeros=False))
def print_error_details(self):
# Detailed errors
logger_cli.info(
"\n{}\n".format(
self.errors.get_errors()
)
)
def create_html_report(self, filename):
"""
Create static html showing network schema-like report
:return: none
"""
logger_cli.info("### Generating report to '{}'".format(filename))
_report = reporter.ReportToFile(
reporter.HTMLNetworkReport(self.mapper.master),
filename
)
_report(
{
"domain": self.mapper.domain,
"nodes": self.mapper.nodes,
"map": self.mapper.map,
"mcp_release": self.mapper.cluster['mcp_release'],
"openstack_release": self.mapper.cluster['openstack_release']
}
)
logger_cli.info("-> Done")
class SaltNetworkChecker(NetworkChecker):
def __init__(
self,
config,
skip_list=None,
skip_list_file=None
):
super(SaltNetworkChecker, self).__init__()
self.mapper = SaltNetworkMapper(
config,
errors_class=self.errors,
skip_list=skip_list,
skip_list_file=skip_list_file
)
class KubeNetworkChecker(NetworkChecker):
def __init__(
self,
config,
skip_list=None,
skip_list_file=None
):
super(KubeNetworkChecker, self).__init__()
self.mapper = KubeNetworkMapper(
config,
errors_class=self.errors,
skip_list=skip_list,
skip_list_file=skip_list_file
)