Negative test autogeneration framework
Generates and executes negative tests based on a json schema for the API.
It's using testscenarios lib to generate a test case for each variation.
It contains resource handling and result proofing of invalid requests.
Not covered in this patch (will be done later):
- Documentation
- Unit tests
- Actual json schemas for all APIs
Partially-implements: bp negative-tests
Co-author: David Kranz <dkranz@redhat.com>
Change-Id: I828f9b74c31f2e25c91e8149ecd7cd0b189ce99a
diff --git a/etc/schemas/compute/flavors/flavor_details.json b/etc/schemas/compute/flavors/flavor_details.json
new file mode 100644
index 0000000..d1c1077
--- /dev/null
+++ b/etc/schemas/compute/flavors/flavor_details.json
@@ -0,0 +1,6 @@
+{
+ "name": "get-flavor-details",
+ "http-method": "GET",
+ "url": "flavors/%s",
+ "resources": ["flavor"]
+}
diff --git a/etc/schemas/compute/flavors/flavors_list.json b/etc/schemas/compute/flavors/flavors_list.json
new file mode 100644
index 0000000..eb8383b
--- /dev/null
+++ b/etc/schemas/compute/flavors/flavors_list.json
@@ -0,0 +1,24 @@
+{
+ "name": "list-flavors-with-detail",
+ "http-method": "GET",
+ "url": "flavors/detail",
+ "json-schema": {
+ "type": "object",
+ "properties": {
+ "minRam": {
+ "type": "integer",
+ "results": {
+ "gen_none": 400,
+ "gen_string": 400
+ }
+ },
+ "minDisk": {
+ "type": "integer",
+ "results": {
+ "gen_none": 400,
+ "gen_string": 400
+ }
+ }
+ }
+ }
+}
diff --git a/etc/schemas/compute/servers/get_console_output.json b/etc/schemas/compute/servers/get_console_output.json
new file mode 100644
index 0000000..7c3860f
--- /dev/null
+++ b/etc/schemas/compute/servers/get_console_output.json
@@ -0,0 +1,21 @@
+{
+ "name": "get-console-output",
+ "http-method": "POST",
+ "url": "servers/%s/action",
+ "resources": ["server"],
+ "json-schema": {
+ "type": "object",
+ "properties": {
+ "os-getConsoleOutput": {
+ "type": "object",
+ "properties": {
+ "length": {
+ "type": ["integer", "string"],
+ "minimum": 0
+ }
+ }
+ }
+ },
+ "additionalProperties": false
+ }
+}
diff --git a/tempest/api/compute/flavors/test_flavors_negative.py b/tempest/api/compute/flavors/test_flavors_negative.py
index 7474996..8ac6182 100644
--- a/tempest/api/compute/flavors/test_flavors_negative.py
+++ b/tempest/api/compute/flavors/test_flavors_negative.py
@@ -13,40 +13,42 @@
# License for the specific language governing permissions and limitations
# under the License.
-import uuid
+import testscenarios
from tempest.api.compute import base
-from tempest import exceptions
-from tempest.test import attr
+from tempest import test
-class FlavorsNegativeTestJSON(base.BaseV2ComputeTest):
+load_tests = testscenarios.load_tests_apply_scenarios
+
+
+class FlavorsListNegativeTestJSON(base.BaseV2ComputeTest,
+ test.NegativeAutoTest):
_interface = 'json'
+ _service = 'compute'
+ _schema_file = 'compute/flavors/flavors_list.json'
+
+ scenarios = test.NegativeAutoTest.generate_scenario(_schema_file)
+
+ @test.attr(type=['negative', 'gate'])
+ def test_list_flavors_with_detail(self):
+ self.execute(self._schema_file)
+
+
+class FlavorDetailsNegativeTestJSON(base.BaseV2ComputeTest,
+ test.NegativeAutoTest):
+ _interface = 'json'
+ _service = 'compute'
+ _schema_file = 'compute/flavors/flavor_details.json'
+
+ scenarios = test.NegativeAutoTest.generate_scenario(_schema_file)
@classmethod
def setUpClass(cls):
- super(FlavorsNegativeTestJSON, cls).setUpClass()
- cls.client = cls.flavors_client
+ super(FlavorDetailsNegativeTestJSON, cls).setUpClass()
+ cls.set_resource("flavor", cls.flavor_ref)
- @attr(type=['negative', 'gate'])
- def test_invalid_minRam_filter(self):
- self.assertRaises(exceptions.BadRequest,
- self.client.list_flavors_with_detail,
- {'minRam': 'invalid'})
-
- @attr(type=['negative', 'gate'])
- def test_invalid_minDisk_filter(self):
- self.assertRaises(exceptions.BadRequest,
- self.client.list_flavors_with_detail,
- {'minDisk': 'invalid'})
-
- @attr(type=['negative', 'gate'])
- def test_non_existent_flavor_id(self):
+ @test.attr(type=['negative', 'gate'])
+ def test_get_flavor_details(self):
# flavor details are not returned for non-existent flavors
- nonexistent_flavor_id = str(uuid.uuid4())
- self.assertRaises(exceptions.NotFound, self.client.get_flavor_details,
- nonexistent_flavor_id)
-
-
-class FlavorsNegativeTestXML(FlavorsNegativeTestJSON):
- _interface = 'xml'
+ self.execute(self._schema_file)
diff --git a/tempest/api/compute/flavors/test_flavors_negative_xml.py b/tempest/api/compute/flavors/test_flavors_negative_xml.py
new file mode 100644
index 0000000..c93c7c9
--- /dev/null
+++ b/tempest/api/compute/flavors/test_flavors_negative_xml.py
@@ -0,0 +1,48 @@
+# Copyright 2013 OpenStack Foundation
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import uuid
+
+from tempest.api.compute import base
+from tempest import exceptions
+from tempest.test import attr
+
+
+class FlavorsNegativeTestXML(base.BaseV2ComputeTest):
+ _interface = 'xml'
+
+ @classmethod
+ def setUpClass(cls):
+ super(FlavorsNegativeTestXML, cls).setUpClass()
+ cls.client = cls.flavors_client
+
+ @attr(type=['negative', 'gate'])
+ def test_invalid_minRam_filter(self):
+ self.assertRaises(exceptions.BadRequest,
+ self.client.list_flavors_with_detail,
+ {'minRam': 'invalid'})
+
+ @attr(type=['negative', 'gate'])
+ def test_invalid_minDisk_filter(self):
+ self.assertRaises(exceptions.BadRequest,
+ self.client.list_flavors_with_detail,
+ {'minDisk': 'invalid'})
+
+ @attr(type=['negative', 'gate'])
+ def test_non_existent_flavor_id(self):
+ # flavor details are not returned for non-existent flavors
+ nonexistent_flavor_id = str(uuid.uuid4())
+ self.assertRaises(exceptions.NotFound, self.client.get_flavor_details,
+ nonexistent_flavor_id)
diff --git a/tempest/api/compute/servers/test_servers_negative_new.py b/tempest/api/compute/servers/test_servers_negative_new.py
new file mode 100644
index 0000000..2b2fcf1
--- /dev/null
+++ b/tempest/api/compute/servers/test_servers_negative_new.py
@@ -0,0 +1,41 @@
+# Copyright 2014 Red Hat, Inc & Deutsche Telekom AG
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import testscenarios
+
+from tempest.api.compute import base
+from tempest import test
+
+
+load_tests = testscenarios.load_tests_apply_scenarios
+
+
+class GetConsoleOutputNegativeTestJSON(base.BaseV2ComputeTest,
+ test.NegativeAutoTest):
+ _interface = 'json'
+ _service = 'compute'
+ _schema_file = 'compute/servers/get_console_output.json'
+
+ scenarios = test.NegativeAutoTest.generate_scenario(_schema_file)
+
+ @classmethod
+ def setUpClass(cls):
+ super(GetConsoleOutputNegativeTestJSON, cls).setUpClass()
+ _resp, server = cls.create_test_server()
+ cls.set_resource("server", server['id'])
+
+ @test.attr(type=['negative', 'gate'])
+ def test_get_console_output(self):
+ self.execute(self._schema_file)
diff --git a/tempest/clients.py b/tempest/clients.py
index 797185a..f8361af 100644
--- a/tempest/clients.py
+++ b/tempest/clients.py
@@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
+from tempest.common.rest_client import NegativeRestClient
from tempest import config
from tempest import exceptions
from tempest.openstack.common import log as logging
@@ -174,7 +175,7 @@
"""
def __init__(self, username=None, password=None, tenant_name=None,
- interface='json'):
+ interface='json', service=None):
"""
We allow overriding of the credentials used within the various
client classes managed by the Manager object. Left as None, the
@@ -323,10 +324,15 @@
self.hosts_v3_client = HostsV3ClientJSON(*client_args)
if CONF.service_available.ceilometer:
self.telemetry_client = TelemetryClientJSON(*client_args)
+ self.negative_client = NegativeRestClient(*client_args)
+ self.negative_client.service = service
if client_args_v3_auth:
self.servers_client_v3_auth = ServersClientJSON(
*client_args_v3_auth)
+ self.negative_v3_client = NegativeRestClient(
+ *client_args_v3_auth)
+ self.negative_v3_client.service = service
else:
msg = "Unsupported interface type `%s'" % interface
raise exceptions.InvalidConfiguration(msg)
@@ -354,11 +360,12 @@
managed client objects
"""
- def __init__(self, interface='json'):
+ def __init__(self, interface='json', service=None):
super(AltManager, self).__init__(CONF.identity.alt_username,
CONF.identity.alt_password,
CONF.identity.alt_tenant_name,
- interface=interface)
+ interface=interface,
+ service=service)
class AdminManager(Manager):
@@ -368,11 +375,12 @@
managed client objects
"""
- def __init__(self, interface='json'):
+ def __init__(self, interface='json', service=None):
super(AdminManager, self).__init__(CONF.identity.admin_username,
CONF.identity.admin_password,
CONF.identity.admin_tenant_name,
- interface=interface)
+ interface=interface,
+ service=service)
class ComputeAdminManager(Manager):
@@ -382,12 +390,13 @@
managed client objects
"""
- def __init__(self, interface='json'):
+ def __init__(self, interface='json', service=None):
base = super(ComputeAdminManager, self)
base.__init__(CONF.compute_admin.username,
CONF.compute_admin.password,
CONF.compute_admin.tenant_name,
- interface=interface)
+ interface=interface,
+ service=service)
class OrchestrationManager(Manager):
@@ -395,9 +404,10 @@
Manager object that uses the admin credentials for its
so that heat templates can create users
"""
- def __init__(self, interface='json'):
+ def __init__(self, interface='json', service=None):
base = super(OrchestrationManager, self)
base.__init__(CONF.identity.admin_username,
CONF.identity.admin_password,
CONF.identity.tenant_name,
- interface=interface)
+ interface=interface,
+ service=service)
diff --git a/tempest/common/generate_json.py b/tempest/common/generate_json.py
new file mode 100644
index 0000000..0a0afe4
--- /dev/null
+++ b/tempest/common/generate_json.py
@@ -0,0 +1,239 @@
+# Copyright 2014 Red Hat, Inc. & Deutsche Telekom AG
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import copy
+import jsonschema
+
+from tempest.openstack.common import log as logging
+
+LOG = logging.getLogger(__name__)
+
+
+def generate_valid(schema):
+ """
+ Create a valid dictionary based on the types in a json schema.
+ """
+ LOG.debug("generate_valid: %s" % schema)
+ schema_type = schema["type"]
+ if isinstance(schema_type, list):
+ # Just choose the first one since all are valid.
+ schema_type = schema_type[0]
+ return type_map_valid[schema_type](schema)
+
+
+def generate_valid_string(schema):
+ size = schema.get("minLength", 0)
+ # TODO(dkr mko): handle format and pattern
+ return "x" * size
+
+
+def generate_valid_integer(schema):
+ # TODO(dkr mko): handle multipleOf
+ if "minimum" in schema:
+ minimum = schema["minimum"]
+ if "exclusiveMinimum" not in schema:
+ return minimum
+ else:
+ return minimum + 1
+ if "maximum" in schema:
+ maximum = schema["maximum"]
+ if "exclusiveMaximum" not in schema:
+ return maximum
+ else:
+ return maximum - 1
+ return 0
+
+
+def generate_valid_object(schema):
+ obj = {}
+ for k, v in schema["properties"].iteritems():
+ obj[k] = generate_valid(v)
+ return obj
+
+
+def generate_invalid(schema):
+ """
+ Generate an invalid json dictionary based on a schema.
+ Only one value is mis-generated for each dictionary created.
+
+ Any generator must return a list of tuples or a single tuple.
+ The values of this tuple are:
+ result[0]: Name of the test
+ result[1]: json schema for the test
+ result[2]: expected result of the test (can be None)
+ """
+ LOG.debug("generate_invalid: %s" % schema)
+ schema_type = schema["type"]
+ if isinstance(schema_type, list):
+ if "integer" in schema_type:
+ schema_type = "integer"
+ else:
+ raise Exception("non-integer list types not supported")
+ result = []
+ for generator in type_map_invalid[schema_type]:
+ ret = generator(schema)
+ if ret is not None:
+ if isinstance(ret, list):
+ result.extend(ret)
+ elif isinstance(ret, tuple):
+ result.append(ret)
+ else:
+ raise Exception("generator (%s) returns invalid result"
+ % generator)
+ LOG.debug("result: %s" % result)
+ return result
+
+
+def _check_for_expected_result(name, schema):
+ expected_result = None
+ if "results" in schema:
+ if name in schema["results"]:
+ expected_result = schema["results"][name]
+ return expected_result
+
+
+def generator(fn):
+ """
+ Decorator for simple generators that simply return one value
+ """
+ def wrapped(schema):
+ result = fn(schema)
+ if result is not None:
+ expected_result = _check_for_expected_result(fn.__name__, schema)
+ return (fn.__name__, result, expected_result)
+ return
+ return wrapped
+
+
+@generator
+def gen_int(_):
+ return 4
+
+
+@generator
+def gen_string(_):
+ return "XXXXXX"
+
+
+def gen_none(schema):
+ # Note(mkoderer): it's not using the decorator otherwise it'd be filtered
+ expected_result = _check_for_expected_result('gen_none', schema)
+ return ('gen_none', None, expected_result)
+
+
+@generator
+def gen_str_min_length(schema):
+ min_length = schema.get("minLength", 0)
+ if min_length > 0:
+ return "x" * (min_length - 1)
+
+
+@generator
+def gen_str_max_length(schema):
+ max_length = schema.get("maxLength", -1)
+ if max_length > -1:
+ return "x" * (max_length + 1)
+
+
+@generator
+def gen_int_min(schema):
+ if "minimum" in schema:
+ minimum = schema["minimum"]
+ if "exclusiveMinimum" not in schema:
+ minimum -= 1
+ return minimum
+
+
+@generator
+def gen_int_max(schema):
+ if "maximum" in schema:
+ maximum = schema["maximum"]
+ if "exclusiveMaximum" not in schema:
+ maximum += 1
+ return maximum
+
+
+def gen_obj_remove_attr(schema):
+ invalids = []
+ valid = generate_valid(schema)
+ required = schema.get("required", [])
+ for r in required:
+ new_valid = copy.deepcopy(valid)
+ del new_valid[r]
+ invalids.append(("gen_obj_remove_attr", new_valid, None))
+ return invalids
+
+
+@generator
+def gen_obj_add_attr(schema):
+ valid = generate_valid(schema)
+ if not schema.get("additionalProperties", True):
+ new_valid = copy.deepcopy(valid)
+ new_valid["$$$$$$$$$$"] = "xxx"
+ return new_valid
+
+
+def gen_inv_prop_obj(schema):
+ LOG.debug("generate_invalid_object: %s" % schema)
+ valid = generate_valid(schema)
+ invalids = []
+ properties = schema["properties"]
+
+ for k, v in properties.iteritems():
+ for invalid in generate_invalid(v):
+ LOG.debug(v)
+ new_valid = copy.deepcopy(valid)
+ new_valid[k] = invalid[1]
+ name = "prop_%s_%s" % (k, invalid[0])
+ invalids.append((name, new_valid, invalid[2]))
+
+ LOG.debug("generate_invalid_object return: %s" % invalids)
+ return invalids
+
+
+type_map_valid = {"string": generate_valid_string,
+ "integer": generate_valid_integer,
+ "object": generate_valid_object}
+
+type_map_invalid = {"string": [gen_int,
+ gen_none,
+ gen_str_min_length,
+ gen_str_max_length],
+ "integer": [gen_string,
+ gen_none,
+ gen_int_min,
+ gen_int_max],
+ "object": [gen_obj_remove_attr,
+ gen_obj_add_attr,
+ gen_inv_prop_obj]}
+
+schema = {"type": "object",
+ "properties":
+ {"name": {"type": "string"},
+ "http-method": {"enum": ["GET", "PUT", "HEAD",
+ "POST", "PATCH", "DELETE", 'COPY']},
+ "url": {"type": "string"},
+ "json-schema": jsonschema._utils.load_schema("draft4"),
+ "resources": {"type": "array", "items": {"type": "string"}},
+ "results": {"type": "object",
+ "properties": {}}
+ },
+ "required": ["name", "http-method", "url"],
+ "additionalProperties": False,
+ }
+
+
+def validate_negative_test_schema(nts):
+ jsonschema.validate(nts, schema)
diff --git a/tempest/common/rest_client.py b/tempest/common/rest_client.py
index 8d07545..dd6e730 100644
--- a/tempest/common/rest_client.py
+++ b/tempest/common/rest_client.py
@@ -559,3 +559,33 @@
'retry-after' not in resp):
return True
return 'exceed' in resp_body.get('message', 'blabla')
+
+
+class NegativeRestClient(RestClient):
+ """
+ Version of RestClient that does not raise exceptions.
+ """
+ def _error_checker(self, method, url,
+ headers, body, resp, resp_body):
+ pass
+
+ def send_request(self, method, url_template, resources, body=None):
+ url = url_template % tuple(resources)
+ if method == "GET":
+ resp, body = self.get(url)
+ elif method == "POST":
+ resp, body = self.post(url, body, self.headers)
+ elif method == "PUT":
+ resp, body = self.put(url, body, self.headers)
+ elif method == "PATCH":
+ resp, body = self.patch(url, body, self.headers)
+ elif method == "HEAD":
+ resp, body = self.head(url)
+ elif method == "DELETE":
+ resp, body = self.delete(url)
+ elif method == "COPY":
+ resp, body = self.copy(url)
+ else:
+ assert False
+
+ return resp, body
diff --git a/tempest/test.py b/tempest/test.py
index 3052f55..5af9ebf 100644
--- a/tempest/test.py
+++ b/tempest/test.py
@@ -15,8 +15,11 @@
import atexit
import functools
+import json
import os
import time
+import urllib
+import uuid
import fixtures
import nose.plugins.attrib
@@ -24,6 +27,7 @@
import testtools
from tempest import clients
+from tempest.common import generate_json
from tempest.common import isolated_creds
from tempest import config
from tempest import exceptions
@@ -224,6 +228,7 @@
testresources.ResourcedTestCase):
setUpClassCalled = False
+ _service = None
network_resources = {}
@@ -286,23 +291,27 @@
os = clients.Manager(username=username,
password=password,
tenant_name=tenant_name,
- interface=cls._interface)
+ interface=cls._interface,
+ service=cls._service)
elif interface:
os = clients.Manager(username=username,
password=password,
tenant_name=tenant_name,
- interface=interface)
+ interface=interface,
+ service=cls._service)
else:
os = clients.Manager(username=username,
password=password,
- tenant_name=tenant_name)
+ tenant_name=tenant_name,
+ service=cls._service)
else:
if getattr(cls, '_interface', None):
- os = clients.Manager(interface=cls._interface)
+ os = clients.Manager(interface=cls._interface,
+ service=cls._service)
elif interface:
- os = clients.Manager(interface=interface)
+ os = clients.Manager(interface=interface, service=cls._service)
else:
- os = clients.Manager()
+ os = clients.Manager(service=cls._service)
return os
@classmethod
@@ -318,7 +327,8 @@
"""
Returns an instance of the Identity Admin API client
"""
- os = clients.AdminManager(interface=cls._interface)
+ os = clients.AdminManager(interface=cls._interface,
+ service=cls._service)
admin_client = os.identity_client
return admin_client
@@ -344,6 +354,171 @@
'dhcp': dhcp}
+class NegativeAutoTest(BaseTestCase):
+
+ _resources = {}
+
+ @classmethod
+ def setUpClass(cls):
+ super(NegativeAutoTest, cls).setUpClass()
+ os = cls.get_client_manager()
+ cls.client = os.negative_client
+
+ @staticmethod
+ def load_schema(file):
+ """
+ Loads a schema from a file on a specified location.
+
+ :param file: the file name
+ """
+ #NOTE(mkoderer): must be extended for xml support
+ fn = os.path.join(
+ os.path.abspath(os.path.dirname(os.path.dirname(__file__))),
+ "etc", "schemas", file)
+ LOG.debug("Open schema file: %s" % (fn))
+ return json.load(open(fn))
+
+ @staticmethod
+ def generate_scenario(description_file):
+ """
+ Generates the test scenario list for a given description.
+
+ :param description: A dictionary with the following entries:
+ name (required) name for the api
+ http-method (required) one of HEAD,GET,PUT,POST,PATCH,DELETE
+ url (required) the url to be appended to the catalog url with '%s'
+ for each resource mentioned
+ resources: (optional) A list of resource names such as "server",
+ "flavor", etc. with an element for each '%s' in the url. This
+ method will call self.get_resource for each element when
+ constructing the positive test case template so negative
+ subclasses are expected to return valid resource ids when
+ appropriate.
+ json-schema (optional) A valid json schema that will be used to
+ create invalid data for the api calls. For "GET" and "HEAD",
+ the data is used to generate query strings appended to the url,
+ otherwise for the body of the http call.
+ """
+ description = NegativeAutoTest.load_schema(description_file)
+ LOG.debug(description)
+ generate_json.validate_negative_test_schema(description)
+ schema = description.get("json-schema", None)
+ resources = description.get("resources", [])
+ scenario_list = []
+ for resource in resources:
+ LOG.debug("Add resource to test %s" % resource)
+ scn_name = "inv_res_%s" % (resource)
+ scenario_list.append((scn_name, {"resource": (resource,
+ str(uuid.uuid4()))
+ }))
+ if schema is not None:
+ for invalid in generate_json.generate_invalid(schema):
+ scenario_list.append((invalid[0],
+ {"schema": invalid[1],
+ "expected_result": invalid[2]}))
+ LOG.debug(scenario_list)
+ return scenario_list
+
+ def execute(self, description_file):
+ """
+ Execute a http call on an api that are expected to
+ result in client errors. First it uses invalid resources that are part
+ of the url, and then invalid data for queries and http request bodies.
+
+ :param description: A dictionary with the following entries:
+ name (required) name for the api
+ http-method (required) one of HEAD,GET,PUT,POST,PATCH,DELETE
+ url (required) the url to be appended to the catalog url with '%s'
+ for each resource mentioned
+ resources: (optional) A list of resource names such as "server",
+ "flavor", etc. with an element for each '%s' in the url. This
+ method will call self.get_resource for each element when
+ constructing the positive test case template so negative
+ subclasses are expected to return valid resource ids when
+ appropriate.
+ json-schema (optional) A valid json schema that will be used to
+ create invalid data for the api calls. For "GET" and "HEAD",
+ the data is used to generate query strings appended to the url,
+ otherwise for the body of the http call.
+
+ """
+ description = NegativeAutoTest.load_schema(description_file)
+ LOG.info("Executing %s" % description["name"])
+ LOG.debug(description)
+ method = description["http-method"]
+ url = description["url"]
+
+ resources = [self.get_resource(r) for
+ r in description.get("resources", [])]
+
+ if hasattr(self, "resource"):
+ # Note(mkoderer): The resources list already contains an invalid
+ # entry (see get_resource).
+ # We just send a valid json-schema with it
+ valid = None
+ schema = description.get("json-schema", None)
+ if schema:
+ valid = generate_json.generate_valid(schema)
+ new_url, body = self._http_arguments(valid, url, method)
+ resp, resp_body = self.client.send_request(method, new_url,
+ resources, body=body)
+ self._check_negative_response(resp.status, resp_body)
+ return
+
+ if hasattr(self, "schema"):
+ new_url, body = self._http_arguments(self.schema, url, method)
+ resp, resp_body = self.client.send_request(method, new_url,
+ resources, body=body)
+ self._check_negative_response(resp.status, resp_body)
+
+ def _http_arguments(self, json_dict, url, method):
+ LOG.debug("dict: %s url: %s method: %s" % (json_dict, url, method))
+ if not json_dict:
+ return url, None
+ elif method in ["GET", "HEAD", "PUT", "DELETE"]:
+ return "%s?%s" % (url, urllib.urlencode(json_dict)), None
+ else:
+ return url, json.dumps(json_dict)
+
+ def _check_negative_response(self, result, body):
+ expected_result = getattr(self, "expected_result", None)
+ self.assertTrue(result >= 400 and result < 500 and result != 413,
+ "Expected client error, got %s:%s" %
+ (result, body))
+ self.assertTrue(expected_result is None or expected_result == result,
+ "Expected %s, got %s:%s" %
+ (expected_result, result, body))
+
+ @classmethod
+ def set_resource(cls, name, resource):
+ """
+ This function can be used in setUpClass context to register a resoruce
+ for a test.
+
+ :param name: The name of the kind of resource such as "flavor", "role",
+ etc.
+ :resource: The id of the resource
+ """
+ cls._resources[name] = resource
+
+ def get_resource(self, name):
+ """
+ Return a valid uuid for a type of resource. If a real resource is
+ needed as part of a url then this method should return one. Otherwise
+ it can return None.
+
+ :param name: The name of the kind of resource such as "flavor", "role",
+ etc.
+ """
+ if hasattr(self, "resource") and self.resource[0] == name:
+ LOG.debug("Return invalid resource (%s) value: %s" %
+ (self.resource[0], self.resource[1]))
+ return self.resource[1]
+ if name in self._resources:
+ return self._resources[name]
+ return None
+
+
def call_until_true(func, duration, sleep_for):
"""
Call the given function until it returns True (and return True) or