Parse XML results to JSON files
After running XCCDF or OVAL scans, parse the results.xml file into a
results.json file in the format accepted by our `worp` API.
The `untangle` library v1.1.0 is vendored in
(https://github.com/stchris/untangle, MIT License).
Change-Id: I87f106c4b8b678e1b125ffab832f80ee261a4781
Related-Issue: https://mirantis.jira.com/browse/PROD-23159
Related-Issue: https://mirantis.jira.com/browse/PROD-23160
diff --git a/_modules/oscap/utils.py b/_modules/oscap/utils.py
index 164949d..f026fdc 100644
--- a/_modules/oscap/utils.py
+++ b/_modules/oscap/utils.py
@@ -1,11 +1,17 @@
+import collections
+import datetime
+import json
from lxml.etree import Element, SubElement, tostring
+import os.path
+import re
from subprocess import Popen, PIPE
import shlex
-import re
-import datetime
import salt.ext.six as six
+from oscap import untangle
+
+
def normalize_id(id,
xccdf_version='1.2',
typeof='profile',
@@ -16,6 +22,7 @@
return 'xccdf_org.{0}.content_{1}_{2}'.format(vendor, typeof, id)
return id
+
def build_tailoring(data, id):
xccdf_version = data.get('xccdf_version', '1.2')
ns = {None: 'http://checklists.nist.gov/xccdf/{}'.format(xccdf_version)}
@@ -26,12 +33,11 @@
tailoring.append(Element('benchmark', {'href': ext}))
now = datetime.datetime.now().isoformat()
- version = SubElement(tailoring, 'version', time=now).text = '1'
+ SubElement(tailoring, 'version', time=now).text = '1'
profile = SubElement(tailoring, 'Profile', id=pid, extends=ext)
- title = SubElement(profile, 'title').text = \
- 'Extends {}'.format(ext)
+ SubElement(profile, 'title').text = 'Extends {}'.format(ext)
for key, value in six.iteritems(data.get('values', {})):
idref = normalize_id(key, xccdf_version, typeof='value')
@@ -39,8 +45,122 @@
elem.text = str(value)
return tostring(tailoring, pretty_print=True)
+
def run(cmd, cwd=None):
# Popen is used here because __salt__['cmd.run'] returns only stdout
proc = Popen(shlex.split(cmd), stdout=PIPE, stderr=PIPE, cwd=cwd)
(stdout, stderr) = proc.communicate()
return stdout, stderr, proc.returncode
+
+
+# Recursively collect the `Group` elements found under `document` and
+# return them as one flat list.
+# `groups` is the accumulator passed along by the recursive calls; a falsy
+# value (None or an empty list) starts a fresh list.
+def _get_flatten_groups(document, groups=None):
+ groups = groups if groups else []
+ if hasattr(document, 'Group'):
+ for group in document.Group:
+ groups.append(group)
+ groups = _get_flatten_groups(group, groups)
+ return groups
+
+
+# Build a dict keyed by rule id from the `Rule` children of the given groups.
+# Each value is a dict with the rule's 'title', 'severity' and 'description'.
+# Groups that have no `Rule` attribute are skipped.
+def _get_rules(groups):
+ rules = {}
+ for group in groups:
+ if hasattr(group, 'Rule'):
+ for rule in group.Rule:
+ rules[rule['id']] = {
+ 'title': rule.title.cdata,
+ 'severity': rule['severity'],
+ 'description': rule.description.cdata}
+ return rules
+
+
+def _parse_xccdf_doc(document):
+    """Collect per-rule scan results from a parsed XCCDF results document.
+
+    Rule metadata (title and description) is looked up among the rules
+    declared under the Benchmark's groups and merged into the entry built
+    for each `rule_result` of the Benchmark's `TestResult`.
+
+    :param document: parsed document exposing a `Benchmark` element
+    :returns: list of dicts with the keys 'rule', 'result', 'severity',
+        'weight', 'title' and 'description'
+    :raises KeyError: if a result refers to a rule id that was not
+        collected from the groups
+    """
+    groups = _get_flatten_groups(document.Benchmark)
+    rules = _get_rules(groups)
+
+    results = []
+    for result in document.Benchmark.TestResult.rule_result:
+        rule = rules[result['idref']]
+        results.append({
+            'rule': result['idref'],
+            'result': result.result.cdata,
+            'severity': result['severity'],
+            'weight': result['weight'],
+            'title': rule['title'],
+            # Use the rule description here; this field used to be filled
+            # with the rule title by mistake.
+            'description': rule['description'],
+        })
+
+    return results
+
+
+# Strip a fixed set of literal `html:` tags (code, pre, li and br, in the
+# exact spellings listed below) from the raw XCCDF XML text and return the
+# resulting string. Only the tags are removed; text between them is kept.
+# NOTE(review): presumably done so that title/description text is not split
+# into child elements by the XML parser -- confirm.
+def _sanitize_xccdf_xml(data):
+ data = data.replace(
+ '<html:code xmlns:html="http://www.w3.org/1999/xhtml">', '')
+ data = data.replace('</html:code>', '')
+ data = data.replace(
+ '<html:pre xmlns:html="http://www.w3.org/1999/xhtml">', '')
+ data = data.replace('<html:pre>', '')
+ data = data.replace('</html:pre>', '')
+ data = data.replace('<html:code>', '')
+ data = data.replace('<html:li>', '')
+ data = data.replace('</html:li>', '')
+ data = data.replace(
+ '<html:pre xmlns:html="http://www.w3.org/1999/xhtml" '
+ 'xmlns:ns0="http://checklists.nist.gov/xccdf/1.1">',
+ '')
+ data = data.replace(
+ '<html:br xmlns:html="http://www.w3.org/1999/xhtml"/>', '')
+ data = data.replace(
+ '<html:code xmlns:html="http://www.w3.org/1999/xhtml" '
+ 'xmlns:ns0="http://checklists.nist.gov/xccdf/1.1">',
+ '')
+ return data
+
+
+# Read the XCCDF results file `xml_file`, sanitize and parse it, and write
+# the parsed results as JSON next to it (same path with the extension
+# replaced by '.json'). The list is wrapped in a {"results": ...} dict.
+def xccdf_xml_to_json(xml_file):
+ with open(xml_file) as in_file:
+ raw_xml = in_file.read()
+ doc = untangle.parse(_sanitize_xccdf_xml(raw_xml))
+ results = _parse_xccdf_doc(doc)
+ with open(os.path.splitext(xml_file)[0] + '.json', 'w') as json_file:
+ # NOTE(pas-ha) the src/com/mirantis/mk.Common.parseJSON method
+ # from mk/pipeline-library that is used in our Jenkins pipelines
+ # can not parse the string representation of a list!
+ # only dict structure is supported as a top-level one there
+ json.dump({"results": results}, json_file)
+
+
+# Map each OVAL definition id to a dict of its metadata: 'class', 'title',
+# 'description', 'ref_id', 'link' and 'severity'.
+# The fields are filled in order inside one try block; if an AttributeError
+# is raised part-way, the entry keeps only the fields set so far.
+def _parse_oval_definitions(document):
+ definitions = {}
+ def_list = document.oval_results.oval_definitions.definitions.definition
+ for definition in def_list:
+ try:
+ definitions[definition['id']] = {'class': definition['class']}
+ def_dict = definitions[definition['id']]
+ def_dict['title'] = definition.metadata.title.cdata
+ def_dict['description'] = definition.metadata.description.cdata
+ def_dict['ref_id'] = definition.metadata.reference['ref_id']
+ def_dict['link'] = definition.metadata.reference['ref_url']
+ def_dict['severity'] = definition.metadata.advisory.severity.cdata
+ except AttributeError:
+ # NOTE(e0ne): inventory doesn't have definition.metadata.reference
+ pass
+
+ return definitions
+
+
+# Parse the OVAL results file `xml_file` and write one entry per system
+# definition result (its 'id' and 'result' merged with the metadata parsed
+# for the same definition id) as JSON next to it (same path with the
+# extension replaced by '.json').
+def oval_xml_to_json(xml_file):
+ document = untangle.parse(xml_file)
+ definitions = _parse_oval_definitions(document)
+ results = []
+ for defn in document.oval_results.results.system.definitions.definition:
+ res = collections.defaultdict(lambda: None)
+ res['id'] = defn['definition_id']
+ res['result'] = defn['result']
+ res.update(definitions[defn['definition_id']])
+ results.append(res)
+ with open(os.path.splitext(xml_file)[0] + '.json', 'w') as json_file:
+ # NOTE(pas-ha) the src/com/mirantis/mk.Common.parseJSON method
+ # from mk/pipeline-library that is used in our Jenkins pipelines
+ # can not parse the string representation of a list!
+ # only dict structure is supported as a top-level one there
+ json.dump({"results": results}, json_file)