| import collections |
| import datetime |
| import json |
| from lxml.etree import Element, SubElement, tostring |
| import os.path |
| import re |
| from subprocess import Popen, PIPE |
| import shlex |
| |
| import salt.ext.six as six |
| |
| from oscap import untangle |
| |
| |
| def normalize_id(id, |
| xccdf_version='1.2', |
| typeof='profile', |
| vendor='mirantis'): |
| |
| if xccdf_version == '1.2': |
| if not re.match('^xccdf_[^_]+_{}_.+'.format(typeof), id): |
| return 'xccdf_org.{0}.content_{1}_{2}'.format(vendor, typeof, id) |
| return id |
| |
| |
| def build_tailoring(data, id): |
| xccdf_version = data.get('xccdf_version', '1.2') |
| ns = {None: 'http://checklists.nist.gov/xccdf/{}'.format(xccdf_version)} |
| tid = normalize_id(id, xccdf_version, typeof='tailoring') |
| pid = normalize_id(data['profile'], xccdf_version, vendor='customer') |
| ext = normalize_id(data['extends'], xccdf_version) |
| tailoring = Element('Tailoring', nsmap=ns, id=tid) |
| tailoring.append(Element('benchmark', {'href': ext})) |
| |
| now = datetime.datetime.now().isoformat() |
| SubElement(tailoring, 'version', time=now).text = '1' |
| |
| profile = SubElement(tailoring, 'Profile', id=pid, extends=ext) |
| |
| SubElement(profile, 'title').text = 'Extends {}'.format(ext) |
| |
| for key, value in six.iteritems(data.get('values', {})): |
| idref = normalize_id(key, xccdf_version, typeof='value') |
| elem = SubElement(profile, 'set-value', idref=idref) |
| elem.text = str(value) |
| return tostring(tailoring, pretty_print=True) |
| |
| |
| def run(cmd, cwd=None): |
| # The Popen used here because the __salt__['cmd.run'] returns only stdout |
| proc = Popen(shlex.split(cmd), stdout=PIPE, stderr=PIPE, cwd=cwd) |
| (stdout, stderr) = proc.communicate() |
| return stdout, stderr, proc.returncode |
| |
| |
| def _get_flatten_groups(document, groups=None): |
| groups = groups if groups else [] |
| if hasattr(document, 'Group'): |
| for group in document.Group: |
| groups.append(group) |
| groups = _get_flatten_groups(group, groups) |
| return groups |
| |
| |
| def _get_rules(groups): |
| rules = {} |
| for group in groups: |
| if hasattr(group, 'Rule'): |
| for rule in group.Rule: |
| rules[rule['id']] = { |
| 'title': rule.title.cdata, |
| 'severity': rule['severity'], |
| 'ident': rule.ident.cdata, |
| 'description': rule.description.cdata} |
| return rules |
| |
| |
| def _parse_xccdf_doc(document): |
| groups = _get_flatten_groups(document.Benchmark) |
| rules = _get_rules(groups) |
| |
| results = [] |
| for result in document.Benchmark.TestResult.rule_result: |
| results.append({ |
| 'rule': result['idref'], |
| 'result': result.result.cdata, |
| 'severity': result['severity'], |
| 'weight': result['weight'], |
| 'title': rules[result['idref']]['title'], |
| 'cis_code': rules[result['idref']]['ident'], |
| 'description': rules[result['idref']]['title'] |
| }) |
| |
| return results |
| |
| |
| def _sanitize_xccdf_xml(data): |
| data = data.replace( |
| '<html:code xmlns:html="http://www.w3.org/1999/xhtml">', '') |
| data = data.replace('</html:code>', '') |
| data = data.replace( |
| '<html:pre xmlns:html="http://www.w3.org/1999/xhtml">', '') |
| data = data.replace('<html:pre>', '') |
| data = data.replace('</html:pre>', '') |
| data = data.replace('<html:code>', '') |
| data = data.replace('<html:li>', '') |
| data = data.replace('</html:li>', '') |
| data = data.replace( |
| '<html:pre xmlns:html="http://www.w3.org/1999/xhtml" ' |
| 'xmlns:ns0="http://checklists.nist.gov/xccdf/1.1">', |
| '') |
| data = data.replace( |
| '<html:br xmlns:html="http://www.w3.org/1999/xhtml"/>', '') |
| data = data.replace( |
| '<html:code xmlns:html="http://www.w3.org/1999/xhtml" ' |
| 'xmlns:ns0="http://checklists.nist.gov/xccdf/1.1">', |
| '') |
| return data |
| |
| |
| def xccdf_xml_to_json(xml_file): |
| with open(xml_file) as in_file: |
| raw_xml = in_file.read() |
| doc = untangle.parse(_sanitize_xccdf_xml(raw_xml)) |
| results = _parse_xccdf_doc(doc) |
| with open(os.path.splitext(xml_file)[0] + '.json', 'w') as json_file: |
| # NOTE(pas-ha) the src/com/mirantis/mk.Common.parseJSON method |
| # from mk/pipeline-library that is used in our Jenkins pipelines |
| # can not parse the string representation of a list! |
| # only dict structure is supported as a top-level one there |
| json.dump({"results": results}, json_file) |
| |
| |
| def _parse_oval_definitions(document): |
| definitions = {} |
| def_list = document.oval_results.oval_definitions.definitions.definition |
| for definition in def_list: |
| try: |
| definitions[definition['id']] = {'class': definition['class']} |
| def_dict = definitions[definition['id']] |
| def_dict['title'] = definition.metadata.title.cdata |
| def_dict['description'] = definition.metadata.description.cdata |
| def_dict['ref_id'] = definition.metadata.reference['ref_id'] |
| def_dict['link'] = definition.metadata.reference['ref_url'] |
| def_dict['severity'] = definition.metadata.advisory.severity.cdata |
| except (AttributeError, IndexError): |
| # NOTE(e0ne): inventory does't have some element |
| pass |
| |
| return definitions |
| |
| |
| def oval_xml_to_json(xml_file): |
| document = untangle.parse(xml_file) |
| definitions = _parse_oval_definitions(document) |
| results = [] |
| for defn in document.oval_results.results.system.definitions.definition: |
| res = collections.defaultdict(lambda: None) |
| res['id'] = defn['definition_id'] |
| res['result'] = defn['result'] |
| res.update(definitions[defn['definition_id']]) |
| results.append(res) |
| with open(os.path.splitext(xml_file)[0] + '.json', 'w') as json_file: |
| # NOTE(pas-ha) the src/com/mirantis/mk.Common.parseJSON method |
| # from mk/pipeline-library that is used in our Jenkins pipelines |
| # can not parse the string representation of a list! |
| # only dict structure is supported as a top-level one there |
| json.dump({"results": results}, json_file) |