allow logical and/or in inv queries
diff --git a/reclass/datatypes/tests/test_exports.py b/reclass/datatypes/tests/test_exports.py
index e8c1c7c..68fba6c 100644
--- a/reclass/datatypes/tests/test_exports.py
+++ b/reclass/datatypes/tests/test_exports.py
@@ -5,13 +5,10 @@
#
from reclass.datatypes import Exports, Parameters
+from reclass.errors import ParseError
import unittest
-try:
- import unittest.mock as mock
-except ImportError:
- import mock
-class TestExportsNoMock(unittest.TestCase):
+class TestInvQuery(unittest.TestCase):
def test_overwrite_method(self):
e = Exports({'alpha': { 'one': 1, 'two': 2}})
@@ -20,33 +17,82 @@
e.initialise_interpolation()
self.assertEqual(e.as_dict(), d)
- def test_value_expr_exports(self):
+ def test_malformed_invquery(self):
+ with self.assertRaises(ParseError):
+ p = Parameters({'exp': '$[ exports:a exports:b == self:test_value ]'})
+ with self.assertRaises(ParseError):
+ p = Parameters({'exp': '$[ exports:a if exports:b self:test_value ]'})
+ with self.assertRaises(ParseError):
+ p = Parameters({'exp': '$[ exports:a if exports:b == ]'})
+ with self.assertRaises(ParseError):
+ p = Parameters({'exp': '$[ exports:a if exports:b == self:test_value and exports:c = self:test_value2 ]'})
+ with self.assertRaises(ParseError):
+ p = Parameters({'exp': '$[ exports:a if exports:b == self:test_value or exports:c == ]'})
+ with self.assertRaises(ParseError):
+ p = Parameters({'exp': '$[ exports:a if exports:b == self:test_value anddd exports:c == self:test_value2 ]'})
+
+ def test_value_expr_invquery(self):
e = {'node1': {'a': 1, 'b': 2}, 'node2': {'a': 3, 'b': 4}}
p = Parameters({'exp': '$[ exports:a ]'})
r = {'exp': {'node1': 1, 'node2': 3}}
p.interpolate(e)
self.assertEqual(p.as_dict(), r)
- def test_if_expr_exports(self):
+ def test_if_expr_invquery(self):
e = {'node1': {'a': 1, 'b': 2}, 'node2': {'a': 3, 'b': 4}}
p = Parameters({'exp': '$[ exports:a if exports:b == 4 ]'})
r = {'exp': {'node2': 3}}
p.interpolate(e)
self.assertEqual(p.as_dict(), r)
- def test_if_expr_exports_with_refs(self):
+ def test_if_expr_invquery_with_refs(self):
e = {'node1': {'a': 1, 'b': 2}, 'node2': {'a': 3, 'b': 4}}
p = Parameters({'exp': '$[ exports:a if exports:b == self:test_value ]', 'test_value': 2})
r = {'exp': {'node1': 1}, 'test_value': 2}
p.interpolate(e)
self.assertEqual(p.as_dict(), r)
- def test_list_if_expr_exports(self):
+ def test_list_if_expr_invquery(self):
e = {'node1': {'a': 1, 'b': 2}, 'node2': {'a': 3, 'b': 3}, 'node3': {'a': 3, 'b': 2}}
p = Parameters({'exp': '$[ if exports:b == 2 ]'})
r = {'exp': ['node1', 'node3']}
p.interpolate(e)
self.assertEqual(p.as_dict(), r)
+ def test_if_expr_invquery_wth_and(self):
+ e = {'node1': {'a': 1, 'b': 4, 'c': False}, 'node2': {'a': 3, 'b': 4, 'c': True}}
+ p = Parameters({'exp': '$[ exports:a if exports:b == 4 and exports:c == True ]'})
+ r = {'exp': {'node2': 3}}
+ p.interpolate(e)
+ self.assertEqual(p.as_dict(), r)
+
+ def test_if_expr_invquery_wth_or(self):
+ e = {'node1': {'a': 1, 'b': 4}, 'node2': {'a': 3, 'b': 3}}
+ p = Parameters({'exp': '$[ exports:a if exports:b == 4 or exports:b == 3 ]'})
+ r = {'exp': {'node1': 1, 'node2': 3}}
+ p.interpolate(e)
+ self.assertEqual(p.as_dict(), r)
+
+ def test_list_if_expr_invquery_with_and(self):
+ e = {'node1': {'a': 1, 'b': 2, 'c': 'green'}, 'node2': {'a': 3, 'b': 3}, 'node3': {'a': 3, 'b': 2, 'c': 'red'}}
+ p = Parameters({'exp': '$[ if exports:b == 2 and exports:c == green ]'})
+ r = {'exp': ['node1']}
+ p.interpolate(e)
+ self.assertEqual(p.as_dict(), r)
+
+ def test_list_if_expr_invquery_with_and_missing(self):
+ e = {'node1': {'a': 1, 'b': 2, 'c': 'green'}, 'node2': {'a': 3, 'b': 3}, 'node3': {'a': 3, 'b': 2}}
+ p = Parameters({'exp': '$[ if exports:b == 2 and exports:c == green ]'})
+ r = {'exp': ['node1']}
+ p.interpolate(e)
+ self.assertEqual(p.as_dict(), r)
+
+ def test_list_if_expr_invquery_with_and(self):
+ e = {'node1': {'a': 1, 'b': 2}, 'node2': {'a': 3, 'b': 3}, 'node3': {'a': 3, 'b': 4}}
+ p = Parameters({'exp': '$[ if exports:b == 2 or exports:b == 4 ]'})
+ r = {'exp': ['node1', 'node3']}
+ p.interpolate(e)
+ self.assertEqual(p.as_dict(), r)
+
if __name__ == '__main__':
unittest.main()
diff --git a/reclass/values/invitem.py b/reclass/values/invitem.py
index bd887b7..c3af2f8 100644
--- a/reclass/values/invitem.py
+++ b/reclass/values/invitem.py
@@ -14,13 +14,122 @@
_OBJ = 'OBJ'
_TEST = 'TEST'
_LIST_TEST = 'LIST_TEST'
+_LOGICAL = 'LOGICAL'
_VALUE = 'VALUE'
_IF = 'IF'
+_AND = 'AND'
+_OR = 'OR'
_EQUAL = '=='
_NOT_EQUAL = '!='
+
+class Element(object):
+
+ def __init__(self, expression, delimiter):
+ self._delimiter = delimiter
+ self._export_path = None
+ self._parameter_path = None
+ self._parameter_value = None
+ self._export_path, self._parameter_path, self._parameter_value = self._get_vars(expression[0][1], self._export_path, self._parameter_path, self._parameter_value)
+ self._export_path, self._parameter_path, self._parameter_value = self._get_vars(expression[2][1], self._export_path, self._parameter_path, self._parameter_value)
+ self._export_path.drop_first()
+ self._test = expression[1][1]
+
+ if self._parameter_path is not None:
+ self._parameter_path.drop_first()
+ self._refs = [ str(self._parameter_path) ]
+ else:
+ self._refs = []
+
+ def refs(self):
+ return self._refs
+
+ def value(self, context, items):
+ if self._parameter_path is not None:
+ self._parameter_value = self._resolve(self._parameter_path, context)
+
+ if self._export_path is None or self._parameter_value is None or self._test is None:
+ ExpressionError('Failed to render %s' % str(self))
+
+ if self._export_path.exists_in(items):
+ result = False
+ export_value = self._resolve(self._export_path, items)
+ if self._test == _EQUAL:
+ if export_value == self._parameter_value:
+ result = True
+ elif self._test == _NOT_EQUAL:
+ if export_value != self._parameter_value:
+ result = True
+ else:
+ raise ExpressionError('Unknown test {0}'.format(self._test))
+ return result
+ else:
+ return False
+
+ def _resolve(self, path, dictionary):
+ try:
+ return path.get_value(dictionary)
+ except KeyError as e:
+ raise UndefinedVariableError(str(path))
+
+ def _get_vars(self, var, export, parameter, value):
+ if isinstance(var, str):
+ path = DictPath(self._delimiter, var)
+ if path.path[0].lower() == 'exports':
+ export = path
+ elif path.path[0].lower() == 'self':
+ parameter = path
+ elif path.path[0].lower() == 'true':
+ value = True
+ elif path.path[0].lower() == 'false':
+ value = False
+ else:
+ value = var
+ else:
+ value = var
+ return export, parameter, value
+
+
+class Question(object):
+
+ def __init__(self, expression, delimiter):
+ self._elements = []
+ self._operators = []
+ self._delimiter = delimiter
+ self._refs = []
+ i = 0
+ while i < len(expression):
+ e = Element(expression[i:], self._delimiter)
+ self._elements.append(e)
+ self._refs.extend(e.refs())
+ i += 3
+ if i < len(expression):
+ self._operators.append(expression[i][1])
+ i += 1
+
+ def refs(self):
+ return self._refs
+
+ def value(self, context, items):
+ if len(self._elements) == 0:
+ return True
+ elif len(self._elements) == 1:
+ return self._elements[0].value(context, items)
+ else:
+ result = self._elements[0].value(context, items)
+ for i in range(0, len(self._elements)-1):
+ next_result = self._elements[i+1].value(context, items)
+ if self._operators[i] == _AND:
+ result = result and next_result
+ elif self._operators[i] == _OR:
+ result = result or next_result
+ else:
+ raise ExpressionError('Unknown operator {0} {1}'.format(self._operators[i], self.elements))
+ return result
+
+
class InvItem(Item):
def _get_parser():
@@ -47,6 +156,10 @@
token = tokens[0]
tokens[0] = (_TEST, token)
+ def _logical(string, location, tokens):
+ token = tokens[0]
+ tokens[0] = (_LOGICAL, token)
+
def _if(string, location, tokens):
token = tokens[0]
tokens[0] = (_IF, token)
@@ -65,15 +178,18 @@
white_space = pp.White().suppress()
end = pp.StringEnd()
- operator = (pp.Literal(_EQUAL) | pp.Literal(_NOT_EQUAL)).setParseAction(_test)
+ operator_test = (pp.Literal(_EQUAL) | pp.Literal(_NOT_EQUAL)).setParseAction(_test)
+ operator_logical = (pp.CaselessLiteral(_AND) | pp.CaselessLiteral(_OR)).setParseAction(_logical)
begin_if = pp.CaselessLiteral(_IF, ).setParseAction(_if)
obj = pp.Word(pp.printables).setParseAction(_object)
integer = pp.Word('0123456789-').setParseAction(_integer)
number = pp.Word('0123456789-.').setParseAction(_number)
item = integer | number | obj
+ single_test = white_space + item + white_space + operator_test + white_space + item
+ additional_test = white_space + operator_logical + single_test
expr_var = pp.Group(obj + pp.Optional(white_space) + end).setParseAction(_expr_var)
- expr_test = pp.Group(obj + white_space + begin_if + white_space + item + white_space + operator + white_space + item).setParseAction(_expr_test)
- expr_list_test = pp.Group(begin_if + white_space + item + white_space + operator + white_space + item).setParseAction(_expr_list_test)
+ expr_test = pp.Group(obj + white_space + begin_if + single_test + pp.ZeroOrMore(additional_test) + end).setParseAction(_expr_test)
+ expr_list_test = pp.Group(begin_if + single_test + pp.ZeroOrMore(additional_test) + end).setParseAction(_expr_list_test)
expr = pp.Optional(white_space) + (expr_test | expr_var | expr_list_test)
return expr
@@ -82,9 +198,6 @@
def __init__(self, item, delimiter):
self.type = Item.INV_QUERY
self._delimiter = delimiter
- self._expr_type = None
- self._refs = []
- self._expr = []
self._parse_expression(item.render(None, None))
def _parse_expression(self, expr):
@@ -97,22 +210,22 @@
self._expr_type = tokens[0][0]
self._expr = list(tokens[0][1])
else:
- raise ExpressionError('Failed to parse %s' % str(expr))
+ raise ExpressionError('Failed to parse %s' % str(self._expr))
- if self._expr_type == _TEST:
- export, parameter, value = self._get_vars(self._expr[2][1], None, None, None)
- export, parameter, value = self._get_vars(self._expr[4][1], export, parameter, value)
- if parameter is not None:
- path = parameter
- path.drop_first()
- self._refs.append(str(path))
+ if self._expr_type == _VALUE:
+ self._value_path = DictPath(self._delimiter, self._expr[0][1]).drop_first()
+ self._question = Question([], self._delimiter)
+ self._refs = []
+ elif self._expr_type == _TEST:
+ self._value_path = DictPath(self._delimiter, self._expr[0][1]).drop_first()
+ self._question = Question(self._expr[2:], self._delimiter)
+ self._refs = self._question.refs()
elif self._expr_type == _LIST_TEST:
- export, parameter, value = self._get_vars(self._expr[1][1], None, None, None)
- export, parameter, value = self._get_vars(self._expr[3][1], export, parameter, value)
- if parameter is not None:
- path = parameter
- path.drop_first()
- self._refs.append(str(path))
+ self._value_path = None
+ self._question = Question(self._expr[1:], self._delimiter)
+ self._refs = self._question.refs()
+ else:
+ raise ExpressionError('Unknown expression type: %s' % self._expr_type)
def assembleRefs(self, context):
return
@@ -124,10 +237,10 @@
return True
def has_references(self):
- return len(self._refs) > 0
+ return len(self._question.refs()) > 0
def get_references(self):
- return self._refs
+ return self._question.refs()
def _resolve(self, path, dictionary):
try:
@@ -137,99 +250,28 @@
def _value_expression(self, inventory):
results = {}
- path = DictPath(self._delimiter, self._expr[0][1]).drop_first()
for node, items in inventory.iteritems():
- if path.exists_in(items):
- results[node] = copy.deepcopy(self._resolve(path, items))
+ if self._value_path.exists_in(items):
+ results[node] = copy.deepcopy(self._resolve(self._value_path, items))
return results
def _test_expression(self, context, inventory):
- export_path = None
- parameter_path = None
- parameter_value = None
- test = None
- value_path = DictPath(self._delimiter, self._expr[0][1])
-
- if self._expr[3][1] == _EQUAL:
- test = _EQUAL
- elif self._expr[3][1] == _NOT_EQUAL:
- test = _NOT_EQUAL
-
- export_path, parameter_path, parameter_value = self._get_vars(self._expr[2][1], export_path, parameter_path, parameter_value)
- export_path, parameter_path, parameter_value = self._get_vars(self._expr[4][1], export_path, parameter_path, parameter_value)
-
- if parameter_path is not None:
- parameter_path.drop_first()
- parameter_value = self._resolve(parameter_path, context)
-
- if export_path is None or parameter_value is None or test is None or value_path is None:
+ if self._value_path is None:
ExpressionError('Failed to render %s' % str(self))
- export_path.drop_first()
- value_path.drop_first()
-
results = {}
for node, items in inventory.iteritems():
- if export_path.exists_in(items):
- export_value = self._resolve(export_path, items)
- test_passed = False
- if test == _EQUAL and export_value == parameter_value:
- test_passed = True
- elif test == _NOT_EQUAL and export_value != parameter_value:
- test_passed = True
- if test_passed:
- results[node] = copy.deepcopy(self._resolve(value_path, items))
+ if self._question.value(context, items):
+ results[node] = copy.deepcopy(self._resolve(self._value_path, items))
return results
def _list_test_expression(self, context, inventory):
- export_path = None
- parameter_path = None
- parameter_value = None
- test = None
-
- if self._expr[2][1] == _EQUAL:
- test = _EQUAL
- elif self._expr[2][1] == _NOT_EQUAL:
- test = _NOT_EQUAL
-
- export_path, parameter_path, parameter_value = self._get_vars(self._expr[1][1], export_path, parameter_path, parameter_value)
- export_path, parameter_path, parameter_value = self._get_vars(self._expr[3][1], export_path, parameter_path, parameter_value)
-
- if parameter_path is not None:
- parameter_path.drop_first()
- parameter_value = self._resolve(parameter_path, context)
-
- if export_path is None or parameter_value is None or test is None:
- ExpressionError('Failed to render %s' % str(self))
-
- export_path.drop_first()
-
results = []
for node, items in inventory.iteritems():
- if export_path.exists_in(items):
- export_value = self._resolve(export_path, items)
- test_passed = False
- if test == _EQUAL and export_value == parameter_value:
- test_passed = True
- elif test == _NOT_EQUAL and export_value != parameter_value:
- test_passed = True
- if test_passed:
- results.append(node)
+ if self._question.value(context, items):
+ results.append(node)
return results
- def _get_vars(self, var, export, parameter, value):
- if isinstance(var, str):
- path = DictPath(self._delimiter, var)
- if path.path[0].lower() == 'exports':
- export = path
- elif path.path[0].lower() == 'self':
- parameter = path
- else:
- value = var
- else:
- value = var
- return export, parameter, value
-
def render(self, context, inventory):
if self._expr_type == _VALUE:
return self._value_expression(inventory)