blob: adb1cb6cd3b93ff7aee34efed8daab435b392b49 [file] [log] [blame]
#
# -*- coding: utf-8 -*-
#
# This file is part of reclass
#
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals
import copy
import itertools as it
import operator
import pyparsing as pp
from six import iteritems
from six import string_types
from reclass.values import item
from reclass.values import parser_funcs
from reclass.settings import Settings
from reclass.utils.dictpath import DictPath
from reclass.errors import ExpressionError, ParseError, ResolveError
# TODO: generalize expression handling.
class BaseTestExpression(object):
known_operators = {}
def __init__(self, delimiter):
self._delimiter = delimiter
self.refs = []
self.inv_refs = []
class EqualityTest(BaseTestExpression):
known_operators = { parser_funcs.EQUAL: operator.eq,
parser_funcs.NOT_EQUAL: operator.ne}
def __init__(self, expression, delimiter):
# expression is a list of at least three tuples, of which first element
# is a string tag, second is subelement value; other tuples apparently
# are not used.
# expression[0][1] effectively contains export path and apparently must
# be treated as such, also left hand operand in comparison
# expression[1][1] appa holds commparison operator == or !=
# expression[2][1] is the righhand operand
super(EqualityTest, self).__init__(delimiter)
# TODO: this double sommersault must be cleaned
_ = self._get_vars(expression[2][1], *self._get_vars(expression[0][1]))
self._export_path, self._parameter_path, self._parameter_value = _
try:
self._export_path.drop_first()
except AttributeError:
raise ExpressionError('No export')
try:
self._compare = self.known_operators[expression[1][1]]
except KeyError as e:
msg = 'Unknown test {0}'.format(expression[1][1])
raise ExpressionError(msg, tbFlag=False)
self.inv_refs = [self._export_path]
if self._parameter_path is not None:
self._parameter_path.drop_first()
self.refs = [str(self._parameter_path)]
def value(self, context, items):
if self._parameter_path is not None:
self._parameter_value = self._resolve(self._parameter_path,
context)
if self._parameter_value is None:
raise ExpressionError('Failed to render %s' % str(self),
tbFlag=False)
if self._export_path.exists_in(items):
export_value = self._resolve(self._export_path, items)
return self._compare(export_value, self._parameter_value)
return False
def _resolve(self, path, dictionary):
try:
return path.get_value(dictionary)
except KeyError as e:
raise ResolveError(str(path))
def _get_vars(self, var, export=None, parameter=None, value=None):
if isinstance(var, string_types):
path = DictPath(self._delimiter, var)
if path.path[0].lower() == 'exports':
export = path
elif path.path[0].lower() == 'self':
parameter = path
elif path.path[0].lower() == 'true':
value = True
elif path.path[0].lower() == 'false':
value = False
else:
value = var
else:
value = var
return export, parameter, value
class LogicTest(BaseTestExpression):
known_operators = { parser_funcs.AND: operator.and_,
parser_funcs.OR: operator.or_}
def __init__(self, expr, delimiter):
super(LogicTest, self).__init__(delimiter)
subtests = list(it.compress(expr, it.cycle([1, 1, 1, 0])))
self._els = [EqualityTest(subtests[j:j+3], self._delimiter)
for j in range(0, len(subtests), 3)]
for x in self._els:
self.refs.extend(x.refs)
self.inv_refs.extend(x.inv_refs)
try:
self._ops = [self.known_operators[x[1]] for x in expr[3::4]]
except KeyError as e:
msg = 'Unknown operator {0} {1}'.format(e.messsage, self._els)
raise ExpressionError(msg, tbFlag=False)
def value(self, context, items):
if len(self._els) == 0: # NOTE: possible logic error
return True
result = self._els[0].value(context, items)
for op, next_el in zip(self._ops, self._els[1:]):
result = op(result, next_el.value(context, items))
return result
class InvItem(item.Item):
type = item.ItemTypes.INV_QUERY
def __init__(self, newitem, settings):
super(InvItem, self).__init__(newitem.render(None, None), settings)
self.needs_all_envs = False
self.has_inv_query = True
self.ignore_failed_render = (
self._settings.inventory_ignore_failed_render)
self._parse_expression(self.contents)
def _parse_expression(self, expr):
parser = parser_funcs.get_expression_parser()
try:
tokens = parser.parseString(expr).asList()
except pp.ParseException as e:
raise ParseError(e.msg, e.line, e.col, e.lineno)
if len(tokens) == 2: # options are set
passed_opts = [x[1] for x in tokens.pop(0)]
self.ignore_failed_render = parser_funcs.IGNORE_ERRORS in passed_opts
self.needs_all_envs = parser_funcs.ALL_ENVS in passed_opts
elif len(tokens) > 2:
raise ExpressionError('Failed to parse %s' % str(tokens),
tbFlag=False)
self._expr_type = tokens[0][0]
self._expr = list(tokens[0][1])
if self._expr_type == parser_funcs.VALUE:
self._value_path = DictPath(self._settings.delimiter,
self._expr[0][1]).drop_first()
self._question = LogicTest([], self._settings.delimiter)
self.refs = []
self.inv_refs = [self._value_path]
elif self._expr_type == parser_funcs.TEST:
self._value_path = DictPath(self._settings.delimiter,
self._expr[0][1]).drop_first()
self._question = LogicTest(self._expr[2:], self._settings.delimiter)
self.refs = self._question.refs
self.inv_refs = self._question.inv_refs
self.inv_refs.append(self._value_path)
elif self._expr_type == parser_funcs.LIST_TEST:
self._value_path = None
self._question = LogicTest(self._expr[1:], self._settings.delimiter)
self.refs = self._question.refs
self.inv_refs = self._question.inv_refs
else:
msg = 'Unknown expression type: %s'
raise ExpressionError(msg % self._expr_type, tbFlag=False)
@property
def has_references(self):
return len(self._question.refs) > 0
def get_references(self):
return self._question.refs
def assembleRefs(self, context):
return
def get_inv_references(self):
return self.inv_refs
def _resolve(self, path, dictionary):
try:
return path.get_value(dictionary)
except KeyError as e:
raise ResolveError(str(path))
def _value_expression(self, inventory):
results = {}
for (node, items) in iteritems(inventory):
if self._value_path.exists_in(items):
results[node] = copy.deepcopy(self._resolve(self._value_path,
items))
return results
def _test_expression(self, context, inventory):
if self._value_path is None:
msg = 'Failed to render %s'
raise ExpressionError(msg % str(self), tbFlag=False)
results = {}
for node, items in iteritems(inventory):
if (self._question.value(context, items) and
self._value_path.exists_in(items)):
results[node] = copy.deepcopy(
self._resolve(self._value_path, items))
return results
def _list_test_expression(self, context, inventory):
results = []
for (node, items) in iteritems(inventory):
if self._question.value(context, items):
results.append(node)
return results
def render(self, context, inventory):
if self._expr_type == parser_funcs.VALUE:
return self._value_expression(inventory)
elif self._expr_type == parser_funcs.TEST:
return self._test_expression(context, inventory)
elif self._expr_type == parser_funcs.LIST_TEST:
return self._list_test_expression(context, inventory)
raise ExpressionError('Failed to render %s' % str(self), tbFlag=False)
def __str__(self):
return ' '.join(str(j) for i,j in self._expr)
def __repr__(self):
# had to leave it here for now as the behaviour differs from basic
return 'InvItem(%r)' % self._expr