Reduce complexity during import phase
In the current state of the negative testing framework the import phase
consists in many complex operation during import phase. This fix reduces
the complexity during that phase to improve the performance,
debugability and robustness.
In this fix we only determine the test scenarios in the import phase. The
payloads are created while test execution. This also improves the
flexibility since the test executor has full access to the test variables.
Change-Id: I8deed33d37c5bf07c26118d6d66ada105d259c89
Partially-implements: bp autogen-negative-tests
diff --git a/tempest/common/generator/base_generator.py b/tempest/common/generator/base_generator.py
index 0398af1..ae4e743 100644
--- a/tempest/common/generator/base_generator.py
+++ b/tempest/common/generator/base_generator.py
@@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
+import copy
import functools
import jsonschema
@@ -30,9 +31,11 @@
return expected_result
-def generator_type(*args):
+def generator_type(*args, **kwargs):
def wrapper(func):
func.types = args
+ for key in kwargs:
+ setattr(func, key, kwargs[key])
return func
return wrapper
@@ -106,37 +109,71 @@
jsonschema.Draft4Validator.check_schema(schema['json-schema'])
jsonschema.validate(schema, self.schema)
- def generate(self, schema):
+ def generate_scenarios(self, schema, path=None):
"""
- Generate an json dictionary based on a schema.
- Only one value is mis-generated for each dictionary created.
+ Generates the scenario (all possible test cases) out of the given
+ schema.
- Any generator must return a list of tuples or a single tuple.
- The values of this tuple are:
- result[0]: Name of the test
- result[1]: json schema for the test
- result[2]: expected result of the test (can be None)
+ :param schema: a dict style schema (see ``BasicGeneratorSet.schema``)
+ :param path: the schema path if the given schema is a subschema
"""
- LOG.debug("generate_invalid: %s" % schema)
- schema_type = schema["type"]
- if isinstance(schema_type, list):
+ schema_type = schema['type']
+ scenarios = []
+
+ if schema_type == 'object':
+ properties = schema["properties"]
+ for attribute, definition in properties.iteritems():
+ current_path = copy.copy(path)
+ if path is not None:
+ current_path.append(attribute)
+ else:
+ current_path = [attribute]
+ scenarios.extend(
+ self.generate_scenarios(definition, current_path))
+ elif isinstance(schema_type, list):
if "integer" in schema_type:
schema_type = "integer"
else:
raise Exception("non-integer list types not supported")
- result = []
- if schema_type not in self.types_dict:
- raise TypeError("generator (%s) doesn't support type: %s"
- % (self.__class__.__name__, schema_type))
for generator in self.types_dict[schema_type]:
- ret = generator(schema)
- if ret is not None:
- if isinstance(ret, list):
- result.extend(ret)
- elif isinstance(ret, tuple):
- result.append(ret)
- else:
- raise Exception("generator (%s) returns invalid result: %s"
- % (generator, ret))
- LOG.debug("result: %s" % result)
- return result
+ if hasattr(generator, "needed_property"):
+ prop = generator.needed_property
+ if (prop not in schema or
+ schema[prop] is None or
+ schema[prop] is False):
+ continue
+
+ name = generator.__name__
+ if path is not None:
+ name = "%s_%s" % ("_".join(path), name)
+ scenarios.append({
+ "_negtest_name": name,
+ "_negtest_generator": generator,
+ "_negtest_schema": schema,
+ "_negtest_path": path})
+ return scenarios
+
+ def generate_payload(self, test, schema):
+ """
+ Generates one jsonschema out of the given test. It's mandatory to use
+ generate_scenarios before to register all needed variables to the test.
+
+ :param test: A test object (scenario) with all _negtest variables on it
+ :param schema: schema for the test
+ """
+ generator = test._negtest_generator
+ ret = generator(test._negtest_schema)
+ path = copy.copy(test._negtest_path)
+ expected_result = None
+
+ if ret is not None:
+ generator_result = generator(test._negtest_schema)
+ invalid_snippet = generator_result[1]
+ expected_result = generator_result[2]
+ element = path.pop()
+ if len(path) > 0:
+ schema_snip = reduce(dict.get, path, schema)
+ schema_snip[element] = invalid_snippet
+ else:
+ schema[element] = invalid_snippet
+ return expected_result
diff --git a/tempest/common/generator/negative_generator.py b/tempest/common/generator/negative_generator.py
index 4f3d2cd..1d5ed43 100644
--- a/tempest/common/generator/negative_generator.py
+++ b/tempest/common/generator/negative_generator.py
@@ -47,65 +47,32 @@
if min_length > 0:
return "x" * (min_length - 1)
- @base.generator_type("string")
+ @base.generator_type("string", needed_property="maxLength")
@base.simple_generator
def gen_str_max_length(self, schema):
max_length = schema.get("maxLength", -1)
- if max_length > -1:
- return "x" * (max_length + 1)
+ return "x" * (max_length + 1)
- @base.generator_type("integer")
+ @base.generator_type("integer", needed_property="minimum")
@base.simple_generator
def gen_int_min(self, schema):
- if "minimum" in schema:
- minimum = schema["minimum"]
- if "exclusiveMinimum" not in schema:
- minimum -= 1
- return minimum
+ minimum = schema["minimum"]
+ if "exclusiveMinimum" not in schema:
+ minimum -= 1
+ return minimum
- @base.generator_type("integer")
+ @base.generator_type("integer", needed_property="maximum")
@base.simple_generator
def gen_int_max(self, schema):
- if "maximum" in schema:
- maximum = schema["maximum"]
- if "exclusiveMaximum" not in schema:
- maximum += 1
- return maximum
+ maximum = schema["maximum"]
+ if "exclusiveMaximum" not in schema:
+ maximum += 1
+ return maximum
- @base.generator_type("object")
- def gen_obj_remove_attr(self, schema):
- invalids = []
- valid_schema = valid.ValidTestGenerator().generate_valid(schema)
- required = schema.get("required", [])
- for r in required:
- new_valid = copy.deepcopy(valid_schema)
- del new_valid[r]
- invalids.append(("gen_obj_remove_attr", new_valid, None))
- return invalids
-
- @base.generator_type("object")
+ @base.generator_type("object", needed_property="additionalProperties")
@base.simple_generator
def gen_obj_add_attr(self, schema):
valid_schema = valid.ValidTestGenerator().generate_valid(schema)
- if not schema.get("additionalProperties", True):
- new_valid = copy.deepcopy(valid_schema)
- new_valid["$$$$$$$$$$"] = "xxx"
- return new_valid
-
- @base.generator_type("object")
- def gen_inv_prop_obj(self, schema):
- LOG.debug("generate_invalid_object: %s" % schema)
- valid_schema = valid.ValidTestGenerator().generate_valid(schema)
- invalids = []
- properties = schema["properties"]
-
- for k, v in properties.iteritems():
- for invalid in self.generate(v):
- LOG.debug(v)
- new_valid = copy.deepcopy(valid_schema)
- new_valid[k] = invalid[1]
- name = "prop_%s_%s" % (k, invalid[0])
- invalids.append((name, new_valid, invalid[2]))
-
- LOG.debug("generate_invalid_object return: %s" % invalids)
- return invalids
+ new_valid = copy.deepcopy(valid_schema)
+ new_valid["$$$$$$$$$$"] = "xxx"
+ return new_valid
diff --git a/tempest/common/generator/valid_generator.py b/tempest/common/generator/valid_generator.py
index 0d7b398..7b80afc 100644
--- a/tempest/common/generator/valid_generator.py
+++ b/tempest/common/generator/valid_generator.py
@@ -54,5 +54,28 @@
obj[k] = self.generate_valid(v)
return obj
+ def generate(self, schema):
+ schema_type = schema["type"]
+ if isinstance(schema_type, list):
+ if "integer" in schema_type:
+ schema_type = "integer"
+ else:
+ raise Exception("non-integer list types not supported")
+ result = []
+ if schema_type not in self.types_dict:
+ raise TypeError("generator (%s) doesn't support type: %s"
+ % (self.__class__.__name__, schema_type))
+ for generator in self.types_dict[schema_type]:
+ ret = generator(schema)
+ if ret is not None:
+ if isinstance(ret, list):
+ result.extend(ret)
+ elif isinstance(ret, tuple):
+ result.append(ret)
+ else:
+ raise Exception("generator (%s) returns invalid result: %s"
+ % (generator, ret))
+ return result
+
def generate_valid(self, schema):
return self.generate(schema)[0][1]
diff --git a/tempest/test.py b/tempest/test.py
index 4a22b1b..5a71b7c 100644
--- a/tempest/test.py
+++ b/tempest/test.py
@@ -510,13 +510,9 @@
"expected_result": expected_result
}))
if schema is not None:
- for name, schema, expected_result in generator.generate(schema):
- if (expected_result is None and
- "default_result_code" in description):
- expected_result = description["default_result_code"]
- scenario_list.append((name,
- {"schema": schema,
- "expected_result": expected_result}))
+ for scenario in generator.generate_scenarios(schema):
+ scenario_list.append((scenario['_negtest_name'],
+ scenario))
LOG.debug(scenario_list)
return scenario_list
@@ -546,8 +542,14 @@
"""
LOG.info("Executing %s" % description["name"])
LOG.debug(description)
+ generator = importutils.import_class(
+ CONF.negative.test_generator)()
+ schema = description.get("json-schema", None)
method = description["http-method"]
url = description["url"]
+ expected_result = None
+ if "default_result_code" in description:
+ expected_result = description["default_result_code"]
resources = [self.get_resource(r) for
r in description.get("resources", [])]
@@ -557,13 +559,19 @@
# entry (see get_resource).
# We just send a valid json-schema with it
valid_schema = None
- schema = description.get("json-schema", None)
if schema:
valid_schema = \
valid.ValidTestGenerator().generate_valid(schema)
new_url, body = self._http_arguments(valid_schema, url, method)
- elif hasattr(self, "schema"):
- new_url, body = self._http_arguments(self.schema, url, method)
+ elif hasattr(self, "_negtest_name"):
+ schema_under_test = \
+ valid.ValidTestGenerator().generate_valid(schema)
+ local_expected_result = \
+ generator.generate_payload(self, schema_under_test)
+ if local_expected_result is not None:
+ expected_result = local_expected_result
+ new_url, body = \
+ self._http_arguments(schema_under_test, url, method)
else:
raise Exception("testscenarios are not active. Please make sure "
"that your test runner supports the load_tests "
@@ -575,7 +583,7 @@
client = self.client
resp, resp_body = client.send_request(method, new_url,
resources, body=body)
- self._check_negative_response(resp.status, resp_body)
+ self._check_negative_response(expected_result, resp.status, resp_body)
def _http_arguments(self, json_dict, url, method):
LOG.debug("dict: %s url: %s method: %s" % (json_dict, url, method))
@@ -586,8 +594,7 @@
else:
return url, json.dumps(json_dict)
- def _check_negative_response(self, result, body):
- expected_result = getattr(self, "expected_result", None)
+ def _check_negative_response(self, expected_result, result, body):
self.assertTrue(result >= 400 and result < 500 and result != 413,
"Expected client error, got %s:%s" %
(result, body))
diff --git a/tempest/tests/negative/test_negative_auto_test.py b/tempest/tests/negative/test_negative_auto_test.py
index dddd083..fb1da43 100644
--- a/tempest/tests/negative/test_negative_auto_test.py
+++ b/tempest/tests/negative/test_negative_auto_test.py
@@ -43,9 +43,9 @@
def _check_prop_entries(self, result, entry):
entries = [a for a in result if entry in a[0]]
self.assertIsNotNone(entries)
- self.assertIs(len(entries), 2)
+ self.assertGreater(len(entries), 1)
for entry in entries:
- self.assertIsNotNone(entry[1]['schema'])
+ self.assertIsNotNone(entry[1]['_negtest_name'])
def _check_resource_entries(self, result, entry):
entries = [a for a in result if entry in a[0]]
@@ -57,12 +57,11 @@
def test_generate_scenario(self):
scenarios = test.NegativeAutoTest.\
generate_scenario(self.fake_input_desc)
-
self.assertIsInstance(scenarios, list)
for scenario in scenarios:
self.assertIsInstance(scenario, tuple)
self.assertIsInstance(scenario[0], str)
self.assertIsInstance(scenario[1], dict)
- self._check_prop_entries(scenarios, "prop_minRam")
- self._check_prop_entries(scenarios, "prop_minDisk")
+ self._check_prop_entries(scenarios, "minRam")
+ self._check_prop_entries(scenarios, "minDisk")
self._check_resource_entries(scenarios, "inv_res")
diff --git a/tempest/tests/negative/test_negative_generators.py b/tempest/tests/negative/test_negative_generators.py
index a7af619..2fa6933 100644
--- a/tempest/tests/negative/test_negative_generators.py
+++ b/tempest/tests/negative/test_negative_generators.py
@@ -13,6 +13,8 @@
# License for the specific language governing permissions and limitations
# under the License.
+import copy
+
import jsonschema
import mock
@@ -86,15 +88,6 @@
class BaseNegativeGenerator(object):
types = ['string', 'integer', 'object']
- fake_input_str = {"type": "string",
- "minLength": 2,
- "maxLength": 8,
- 'results': {'gen_int': 404}}
-
- fake_input_int = {"type": "integer",
- "maximum": 255,
- "minimum": 1}
-
fake_input_obj = {"type": "object",
"properties": {"minRam": {"type": "integer"},
"diskName": {"type": "string"},
@@ -106,31 +99,21 @@
"type": "not_defined"
}
- def _validate_result(self, data):
- self.assertTrue(isinstance(data, list))
- for t in data:
- self.assertIsInstance(t, tuple)
- self.assertEqual(3, len(t))
- self.assertIsInstance(t[0], str)
+ class fake_test_class(object):
+ def __init__(self, scenario):
+ for k, v in scenario.iteritems():
+ setattr(self, k, v)
- def test_generate_string(self):
- result = self.generator.generate(self.fake_input_str)
- self._validate_result(result)
-
- def test_generate_integer(self):
- result = self.generator.generate(self.fake_input_int)
- self._validate_result(result)
-
- def test_generate_obj(self):
- result = self.generator.generate(self.fake_input_obj)
- self._validate_result(result)
+ def _validate_result(self, valid_schema, invalid_schema):
+ for k, v in valid_schema.iteritems():
+ self.assertTrue(k in invalid_schema)
def test_generator_mandatory_functions(self):
for data_type in self.types:
self.assertIn(data_type, self.generator.types_dict)
def test_generate_with_unknown_type(self):
- self.assertRaises(TypeError, self.generator.generate,
+ self.assertRaises(TypeError, self.generator.generate_payload,
self.unknown_type_schema)
@@ -151,3 +134,16 @@
def setUp(self):
super(TestNegativeNegativeGenerator, self).setUp()
self.generator = negative_generator.NegativeTestGenerator()
+
+ def test_generate_obj(self):
+ schema = self.fake_input_obj
+ scenarios = self.generator.generate_scenarios(schema)
+ for scenario in scenarios:
+ test = self.fake_test_class(scenario)
+ valid_schema = \
+ valid_generator.ValidTestGenerator().generate_valid(schema)
+ schema_under_test = copy.copy(valid_schema)
+ expected_result = \
+ self.generator.generate_payload(test, schema_under_test)
+ self.assertEqual(expected_result, None)
+ self._validate_result(valid_schema, schema_under_test)