Merge "added an api test for security_groups"
diff --git a/tempest/api/network/test_security_groups.py b/tempest/api/network/test_security_groups.py
new file mode 100644
index 0000000..24f8286
--- /dev/null
+++ b/tempest/api/network/test_security_groups.py
@@ -0,0 +1,127 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2013 OpenStack Foundation
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from tempest.api.network import base
+from tempest.common.utils.data_utils import rand_name
+from tempest import exceptions
+from tempest.test import attr
+
+
+class SecGroupTest(base.BaseNetworkTest):
+ _interface = 'json'
+
+ @classmethod
+ def setUpClass(cls):
+ super(SecGroupTest, cls).setUpClass()
+
+ def _delete_security_group(self, secgroup_id):
+ resp, _ = self.client.delete_security_group(secgroup_id)
+ self.assertEqual(204, resp.status)
+ # Asserting that the secgroup is not found in the list
+ # after deletion
+ resp, list_body = self.client.list_security_groups()
+ self.assertEqual('200', resp['status'])
+ secgroup_list = list()
+ for secgroup in list_body['security_groups']:
+ secgroup_list.append(secgroup['id'])
+ self.assertNotIn(secgroup_id, secgroup_list)
+
+ def _delete_security_group_rule(self, rule_id):
+ resp, _ = self.client.delete_security_group_rule(rule_id)
+ self.assertEqual(204, resp.status)
+ # Asserting that the secgroup is not found in the list
+ # after deletion
+ resp, list_body = self.client.list_security_group_rules()
+ self.assertEqual('200', resp['status'])
+ rules_list = list()
+ for rule in list_body['security_group_rules']:
+ rules_list.append(rule['id'])
+ self.assertNotIn(rule_id, rules_list)
+
+ @attr(type='smoke')
+ def test_list_security_groups(self):
+ # Verify the that security group belonging to tenant exist in list
+ resp, body = self.client.list_security_groups()
+ self.assertEqual('200', resp['status'])
+ security_groups = body['security_groups']
+ found = None
+ for n in security_groups:
+ if (n['name'] == 'default'):
+ found = n['id']
+ msg = "Security-group list doesn't contain default security-group"
+ self.assertIsNotNone(found, msg)
+
+ @attr(type='smoke')
+ def test_create_show_delete_security_group_and_rule(self):
+ # Create a security group
+ name = rand_name('secgroup-')
+ resp, group_create_body = self.client.create_security_group(name)
+ self.assertEqual('201', resp['status'])
+ self.addCleanup(self._delete_security_group,
+ group_create_body['security_group']['id'])
+ self.assertEqual(group_create_body['security_group']['name'], name)
+
+ # Show details of the created security group
+ resp, show_body = self.client.show_security_group(
+ group_create_body['security_group']['id'])
+ self.assertEqual('200', resp['status'])
+ self.assertEqual(show_body['security_group']['name'], name)
+
+ # List security groups and verify if created group is there in response
+ resp, list_body = self.client.list_security_groups()
+ self.assertEqual('200', resp['status'])
+ secgroup_list = list()
+ for secgroup in list_body['security_groups']:
+ secgroup_list.append(secgroup['id'])
+ self.assertIn(group_create_body['security_group']['id'], secgroup_list)
+ # No Udpate in security group
+ # Create rule
+ resp, rule_create_body = self.client.create_security_group_rule(
+ group_create_body['security_group']['id']
+ )
+ self.assertEqual('201', resp['status'])
+ self.addCleanup(self._delete_security_group_rule,
+ rule_create_body['security_group_rule']['id'])
+ # Show details of the created security rule
+ resp, show_rule_body = self.client.show_security_group_rule(
+ rule_create_body['security_group_rule']['id']
+ )
+ self.assertEqual('200', resp['status'])
+
+ # List rules and verify created rule is in response
+ resp, rule_list_body = self.client.list_security_group_rules()
+ self.assertEqual('200', resp['status'])
+ rule_list = [rule['id']
+ for rule in rule_list_body['security_group_rules']]
+ self.assertIn(rule_create_body['security_group_rule']['id'], rule_list)
+
+ @attr(type=['negative', 'smoke'])
+ def test_show_non_existent_security_group(self):
+ non_exist_id = rand_name('secgroup-')
+ self.assertRaises(exceptions.NotFound, self.client.show_security_group,
+ non_exist_id)
+
+ @attr(type=['negative', 'smoke'])
+ def test_show_non_existent_security_group_rule(self):
+ non_exist_id = rand_name('rule-')
+ self.assertRaises(exceptions.NotFound,
+ self.client.show_security_group_rule,
+ non_exist_id)
+
+
+class SecGroupTestXML(SecGroupTest):
+ _interface = 'xml'
diff --git a/tempest/services/network/json/network_client.py b/tempest/services/network/json/network_client.py
index ef12a00..3550878 100644
--- a/tempest/services/network/json/network_client.py
+++ b/tempest/services/network/json/network_client.py
@@ -242,7 +242,7 @@
update_body['admin_state_up'] = kwargs.get(
'admin_state_up', body['router']['admin_state_up'])
# Must uncomment/modify these lines once LP question#233187 is solved
- #update_body['external_gateway_info'] = kwargs.get(
+ # update_body['external_gateway_info'] = kwargs.get(
# 'external_gateway_info', body['router']['external_gateway_info'])
update_body = dict(router=update_body)
update_body = json.dumps(update_body)
@@ -296,18 +296,55 @@
body = json.loads(body)
return resp, body
+ def list_security_groups(self):
+ uri = '%s/security-groups' % (self.uri_prefix)
+ resp, body = self.get(uri, self.headers)
+ body = json.loads(body)
+ return resp, body
+
+ def delete_security_group(self, secgroup_id):
+ uri = '%s/security-groups/%s' % (self.uri_prefix, secgroup_id)
+ resp, body = self.delete(uri, self.headers)
+ return resp, body
+
+ def create_security_group(self, name, **kwargs):
+ post_body = {
+ 'security_group': {
+ 'name': name,
+ }
+ }
+ for key, value in kwargs.iteritems():
+ post_body['security_group'][str(key)] = value
+ body = json.dumps(post_body)
+ uri = '%s/security-groups' % (self.uri_prefix)
+ resp, body = self.post(uri, headers=self.headers, body=body)
+ body = json.loads(body)
+ return resp, body
+
def show_floating_ip(self, floating_ip_id):
uri = '%s/floatingips/%s' % (self.uri_prefix, floating_ip_id)
resp, body = self.get(uri, self.headers)
body = json.loads(body)
return resp, body
+ def show_security_group(self, secgroup_id):
+ uri = '%s/security-groups/%s' % (self.uri_prefix, secgroup_id)
+ resp, body = self.get(uri, self.headers)
+ body = json.loads(body)
+ return resp, body
+
def list_floating_ips(self):
uri = '%s/floatingips' % (self.uri_prefix)
resp, body = self.get(uri, self.headers)
body = json.loads(body)
return resp, body
+ def list_security_group_rules(self):
+ uri = '%s/security-group-rules' % (self.uri_prefix)
+ resp, body = self.get(uri, self.headers)
+ body = json.loads(body)
+ return resp, body
+
def delete_floating_ip(self, floating_ip_id):
uri = '%s/floatingips/%s' % (self.uri_prefix, floating_ip_id)
resp, body = self.delete(uri, self.headers)
@@ -321,3 +358,30 @@
resp, body = self.put(uri, headers=self.headers, body=body)
body = json.loads(body)
return resp, body
+
+ def create_security_group_rule(self, secgroup_id,
+ direction='ingress', **kwargs):
+ post_body = {
+ 'security_group_rule': {
+ 'direction': direction,
+ 'security_group_id': secgroup_id
+ }
+ }
+ for key, value in kwargs.iteritems():
+ post_body['security_group_rule'][str(key)] = value
+ body = json.dumps(post_body)
+ uri = '%s/security-group-rules' % (self.uri_prefix)
+ resp, body = self.post(uri, headers=self.headers, body=body)
+ body = json.loads(body)
+ return resp, body
+
+ def delete_security_group_rule(self, rule_id):
+ uri = '%s/security-group-rules/%s' % (self.uri_prefix, rule_id)
+ resp, body = self.delete(uri, self.headers)
+ return resp, body
+
+ def show_security_group_rule(self, rule_id):
+ uri = '%s/security-group-rules/%s' % (self.uri_prefix, rule_id)
+ resp, body = self.get(uri, self.headers)
+ body = json.loads(body)
+ return resp, body
diff --git a/tempest/services/network/xml/network_client.py b/tempest/services/network/xml/network_client.py
index d4fb656..0aef92f 100755
--- a/tempest/services/network/xml/network_client.py
+++ b/tempest/services/network/xml/network_client.py
@@ -161,6 +161,64 @@
body = _root_tag_fetcher_and_xml_to_json_parse(body)
return resp, body
+ def create_security_group(self, name):
+ uri = '%s/security-groups' % (self.uri_prefix)
+ post_body = Element("security_group")
+ p2 = Element("name", name)
+ post_body.append(p2)
+ resp, body = self.post(uri, str(Document(post_body)), self.headers)
+ body = _root_tag_fetcher_and_xml_to_json_parse(body)
+ return resp, body
+
+ def list_security_groups(self):
+ url = '%s/security-groups' % (self.uri_prefix)
+ resp, body = self.get(url, self.headers)
+ secgroups = self._parse_array(etree.fromstring(body))
+ secgroups = {"security_groups": secgroups}
+ return resp, secgroups
+
+ def delete_security_group(self, secgroup_id):
+ uri = '%s/security-groups/%s' % (self.uri_prefix, str(secgroup_id))
+ return self.delete(uri, self.headers)
+
+ def show_security_group(self, secgroup_id):
+ uri = '%s/security-groups/%s' % (self.uri_prefix, str(secgroup_id))
+ resp, body = self.get(uri, self.headers)
+ body = _root_tag_fetcher_and_xml_to_json_parse(body)
+ return resp, body
+
+ def list_security_group_rules(self):
+ url = '%s/security-group-rules' % (self.uri_prefix)
+ resp, body = self.get(url, self.headers)
+ rules = self._parse_array(etree.fromstring(body))
+ rules = {"security_group_rules": rules}
+ return resp, rules
+
+ def create_security_group_rule(self, secgroup_id,
+ direction='ingress', **kwargs):
+ uri = '%s/security-group-rules' % (self.uri_prefix)
+ rule = Element("security_group_rule")
+ p1 = Element('security_group_id', secgroup_id)
+ p2 = Element('direction', direction)
+ rule.append(p1)
+ rule.append(p2)
+ for key, val in kwargs.items():
+ key = Element(key, val)
+ rule.append(key)
+ resp, body = self.post(uri, str(Document(rule)), self.headers)
+ body = _root_tag_fetcher_and_xml_to_json_parse(body)
+ return resp, body
+
+ def delete_security_group_rule(self, rule_id):
+ uri = '%s/security-group-rules/%s' % (self.uri_prefix, str(rule_id))
+ return self.delete(uri, self.headers)
+
+ def show_security_group_rule(self, rule_id):
+ uri = '%s/security-group-rules/%s' % (self.uri_prefix, str(rule_id))
+ resp, body = self.get(uri, self.headers)
+ body = _root_tag_fetcher_and_xml_to_json_parse(body)
+ return resp, body
+
def _root_tag_fetcher_and_xml_to_json_parse(xml_returned_body):
body = ET.fromstring(xml_returned_body)