Fwaas API Test Enhancement
Add Test to insert and remove firewall rule to firewall policy
-Create a firewall rule
-Create a firewall policy
-Insert a firewall rule to policy
-Validate the insertion of rule to policy
-Remove the firewall rule from policy
-Validate the removal of firewall rule from policy
Change-Id: I3c9711294e1e6323bdad4ff783de1ed19e133c1b
diff --git a/tempest/api/network/test_fwaas_extensions.py b/tempest/api/network/test_fwaas_extensions.py
index 6eec79e..f89af62 100644
--- a/tempest/api/network/test_fwaas_extensions.py
+++ b/tempest/api/network/test_fwaas_extensions.py
@@ -36,6 +36,8 @@
List firewall policies
Create firewall policy
Update firewall policy
+ Insert firewall rule to policy
+ Remove firewall rule from policy
Delete firewall policy
Show firewall policy
List firewall
@@ -62,6 +64,14 @@
except exceptions.NotFound:
pass
+ def _try_delete_rule(self, rule_id):
+ # delete rule, if it exists
+ try:
+ self.client.delete_firewall_rule(rule_id)
+ # if rule is not found, this means it was deleted in the test
+ except exceptions.NotFound:
+ pass
+
def _try_delete_firewall(self, fw_id):
# delete firewall, if it exists
try:
@@ -214,6 +224,40 @@
# Delete firewall
self.client.delete_firewall(firewall_id)
+ @test.attr(type='smoke')
+ def test_insert_remove_firewall_rule_from_policy(self):
+ # Create firewall rule
+ resp, body = self.client.create_firewall_rule(
+ name=data_utils.rand_name("fw-rule"),
+ action="allow",
+ protocol="tcp")
+ fw_rule_id = body['firewall_rule']['id']
+ self.addCleanup(self._try_delete_rule, fw_rule_id)
+ # Create firewall policy
+ _, body = self.client.create_firewall_policy(
+ name=data_utils.rand_name("fw-policy"))
+ fw_policy_id = body['firewall_policy']['id']
+ self.addCleanup(self._try_delete_policy, fw_policy_id)
+
+ # Insert rule to firewall policy
+ self.client.insert_firewall_rule_in_policy(
+ fw_policy_id, fw_rule_id, '', '')
+
+ # Verify insertion of rule in policy
+ self.assertIn(fw_rule_id, self._get_list_fw_rule_ids(fw_policy_id))
+ # Remove rule from the firewall policy
+ self.client.remove_firewall_rule_from_policy(
+ fw_policy_id, fw_rule_id)
+
+ # Verify removal of rule from firewall policy
+ self.assertNotIn(fw_rule_id, self._get_list_fw_rule_ids(fw_policy_id))
+
+ def _get_list_fw_rule_ids(self, fw_policy_id):
+ _, fw_policy = self.client.show_firewall_policy(
+ fw_policy_id)
+ return [ruleid for ruleid in fw_policy['firewall_policy']
+ ['firewall_rules']]
+
class FWaaSExtensionTestXML(FWaaSExtensionTestJSON):
_interface = 'xml'
diff --git a/tempest/services/network/json/network_client.py b/tempest/services/network/json/network_client.py
index 16a4f5c..78ed56f 100644
--- a/tempest/services/network/json/network_client.py
+++ b/tempest/services/network/json/network_client.py
@@ -320,3 +320,30 @@
self.rest_client.expected_success(201, resp.status)
body = json.loads(body)
return resp, body
+
+ def insert_firewall_rule_in_policy(self, firewall_policy_id,
+ firewall_rule_id, insert_after="",
+ insert_before=""):
+ uri = '%s/fw/firewall_policies/%s/insert_rule' % (self.uri_prefix,
+ firewall_policy_id)
+ body = {
+ "firewall_rule_id": firewall_rule_id,
+ "insert_after": insert_after,
+ "insert_before": insert_before
+ }
+ body = json.dumps(body)
+ resp, body = self.put(uri, body)
+ self.rest_client.expected_success(200, resp.status)
+ body = json.loads(body)
+ return resp, body
+
+ def remove_firewall_rule_from_policy(self, firewall_policy_id,
+ firewall_rule_id):
+ uri = '%s/fw/firewall_policies/%s/remove_rule' % (self.uri_prefix,
+ firewall_policy_id)
+ update_body = {"firewall_rule_id": firewall_rule_id}
+ update_body = json.dumps(update_body)
+ resp, body = self.put(uri, update_body)
+ self.rest_client.expected_success(200, resp.status)
+ body = json.loads(body)
+ return resp, body
diff --git a/tempest/services/network/xml/network_client.py b/tempest/services/network/xml/network_client.py
index 17b1f8e..c65390e 100644
--- a/tempest/services/network/xml/network_client.py
+++ b/tempest/services/network/xml/network_client.py
@@ -25,7 +25,8 @@
# list of plurals used for xml serialization
PLURALS = ['dns_nameservers', 'host_routes', 'allocation_pools',
'fixed_ips', 'extensions', 'extra_dhcp_opts', 'pools',
- 'health_monitors', 'vips', 'members', 'allowed_address_pairs']
+ 'health_monitors', 'vips', 'members', 'allowed_address_pairs',
+ 'firewall_rules']
def get_rest_client(self, auth_provider):
rc = rest_client.RestClient(auth_provider)
@@ -281,6 +282,27 @@
body = _root_tag_fetcher_and_xml_to_json_parse(body)
return resp, body
+ def insert_firewall_rule_in_policy(self, firewall_policy_id,
+ firewall_rule_id, insert_after="",
+ insert_before=""):
+ uri = '%s/fw/firewall_policies/%s/insert_rule' % (self.uri_prefix,
+ firewall_policy_id)
+ rule = common.Element("firewall_rule_id", firewall_rule_id)
+ resp, body = self.put(uri, str(common.Document(rule)))
+ self.rest_client.expected_success(200, resp.status)
+ body = _root_tag_fetcher_and_xml_to_json_parse(body)
+ return resp, body
+
+ def remove_firewall_rule_from_policy(self, firewall_policy_id,
+ firewall_rule_id):
+ uri = '%s/fw/firewall_policies/%s/remove_rule' % (self.uri_prefix,
+ firewall_policy_id)
+ rule = common.Element("firewall_rule_id", firewall_rule_id)
+ resp, body = self.put(uri, str(common.Document(rule)))
+ self.rest_client.expected_success(200, resp.status)
+ body = _root_tag_fetcher_and_xml_to_json_parse(body)
+ return resp, body
+
def _root_tag_fetcher_and_xml_to_json_parse(xml_returned_body):
body = ET.fromstring(xml_returned_body)