Merge "Fixes bug 902352 – New tests for security groups"
diff --git a/tempest/services/nova/json/security_groups_client.py b/tempest/services/nova/json/security_groups_client.py
index 34cf807..0e6b6cf 100644
--- a/tempest/services/nova/json/security_groups_client.py
+++ b/tempest/services/nova/json/security_groups_client.py
@@ -10,6 +10,8 @@
self.client = rest_client.RestClient(config, username, key,
auth_url, catalog_type,
tenant_name)
+ self.headers = {'Content-Type': 'application/json',
+ 'Accept': 'application/json'}
def list_security_groups(self, params=None):
"""List all security groups for a user"""
@@ -24,19 +26,63 @@
resp, body = self.client.get(url)
body = json.loads(body)
- return resp, body
+ return resp, body['security_groups']
- def list_security_groups_with_detail(self, params=None):
- """List security groups with detail"""
-
- url = 'os-security-groups/detail'
- if params != None:
- param_list = []
- for param, value in params.iteritems():
- param_list.append("%s=%s&" % (param, value))
-
- url += '?' + ' '.join(param_list)
-
+ def get_security_group(self, security_group_id):
+ """Get the details of a Security Group"""
+ url = "os-security-groups/%s" % str(security_group_id)
resp, body = self.client.get(url)
body = json.loads(body)
- return resp, body
+ return resp, body['security_group']
+
+ def create_security_group(self, name, description):
+ """
+ Creates a new security group.
+ name (Required): Name of security group.
+ description (Required): Description of security group.
+ """
+ post_body = {
+ 'name': name,
+ 'description': description,
+ }
+ post_body = json.dumps({'security_group': post_body})
+ resp, body = self.client.post('os-security-groups',
+ post_body, self.headers)
+ body = json.loads(body)
+ return resp, body['security_group']
+
+ def delete_security_group(self, security_group_id):
+ """Deletes the provided Security Group"""
+ return self.client.delete('os-security-groups/%s'
+ % str(security_group_id))
+
+ def create_security_group_rule(self, parent_group_id, ip_proto, from_port,
+ to_port, **kwargs):
+ """
+ Creating a new security group rules.
+ parent_group_id :ID of Security group
+ ip_protocol : ip_proto (icmp, tcp, udp).
+ from_port: Port at start of range.
+ to_port : Port at end of range.
+ Following optional keyword arguments are accepted:
+ cidr : CIDR for address range.
+ group_id : ID of the Source group
+ """
+ post_body = {
+ 'parent_group_id': parent_group_id,
+ 'ip_protocol': ip_proto,
+ 'from_port': from_port,
+ 'to_port': to_port,
+ 'cidr': kwargs.get('cidr'),
+ 'group_id': kwargs.get('group_id'),
+ }
+ post_body = json.dumps({'security_group_rule': post_body})
+ url = 'os-security-group-rules'
+ resp, body = self.client.post(url, post_body, self.headers)
+ body = json.loads(body)
+ return resp, body['security_group_rule']
+
+ def delete_security_group_rule(self, group_rule_id):
+ """Deletes the provided Security Group rule"""
+ return self.client.delete('os-security-group-rules/%s'
+ % str(group_rule_id))
diff --git a/tempest/tests/test_security_group_rules.py b/tempest/tests/test_security_group_rules.py
new file mode 100644
index 0000000..c07f869
--- /dev/null
+++ b/tempest/tests/test_security_group_rules.py
@@ -0,0 +1,238 @@
+from nose.plugins.attrib import attr
+from tempest import openstack
+import unittest2 as unittest
+from tempest import exceptions
+from tempest.common.utils.data_utils import rand_name
+import time
+
+
+class SecurityGroupsTest(unittest.TestCase):
+
+ @classmethod
+ def setUpClass(cls):
+ cls.os = openstack.Manager()
+ cls.client = cls.os.security_groups_client
+
+ @attr(type='positive')
+ def test_security_group_rules_create(self):
+ """
+ Positive test: Creation of Security Group rule
+ should be successfull
+ """
+ try:
+ #Creating a Security Group to add rules to it
+ s_name = rand_name('securitygroup-')
+ s_description = rand_name('description-')
+ resp, securitygroup =\
+ self.client.create_security_group(s_name, s_description)
+ securitygroup_id = securitygroup['id']
+ #Adding rules to the created Security Group
+ parent_group_id = securitygroup['id']
+ ip_protocol = 'tcp'
+ from_port = 22
+ to_port = 22
+ resp, rule =\
+ self.client.create_security_group_rule(parent_group_id,
+ ip_protocol, from_port,
+ to_port)
+ self.assertEqual(200, resp.status)
+ finally:
+ #Deleting the Security Group rule, created in this method
+ group_rule_id = rule['id']
+ self.client.delete_security_group_rule(group_rule_id)
+ #Deleting the Security Group created in this method
+ resp, _ = self.client.delete_security_group(securitygroup_id)
+
+ @attr(type='positive')
+ def test_security_group_rules_create_with_optional_arguments(self):
+ """
+ Positive test: Creation of Security Group rule
+ with optional arguments
+ should be successfull
+ """
+ try:
+ #Creating a Security Group to add rules to it
+ s_name = rand_name('securitygroup-')
+ s_description = rand_name('description-')
+ resp, securitygroup =\
+ self.client.create_security_group(s_name, s_description)
+ securitygroup_id1 = securitygroup['id']
+ #Creating a Security Group so as to assign group_id to the rule
+ s_name2 = rand_name('securitygroup-')
+ s_description2 = rand_name('description-')
+ resp, securitygroup =\
+ self.client.create_security_group(s_name2, s_description2)
+ securitygroup_id2 = securitygroup['id']
+ #Adding rules to the created Security Group with optional arguments
+ parent_group_id = securitygroup_id1
+ ip_protocol = 'tcp'
+ from_port = 22
+ to_port = 22
+ cidr = '10.2.3.124/24'
+ group_id = securitygroup_id2
+ resp, rule =\
+ self.client.create_security_group_rule(parent_group_id,
+ ip_protocol,
+ from_port, to_port,
+ cidr=cidr,
+ group_id=group_id)
+ self.assertEqual(200, resp.status)
+ finally:
+ #Deleting the Security Group rule, created in this method
+ group_rule_id = rule['id']
+ self.client.delete_security_group_rule(group_rule_id)
+ #Deleting the Security Groups created in this method
+ resp, _ = self.client.delete_security_group(securitygroup_id1)
+ resp, _ = self.client.delete_security_group(securitygroup_id2)
+
+ @attr(type='positive')
+ def test_security_group_rules_create_delete(self):
+ """
+ Positive test: Deletion of Security Group rule
+ should be successfull
+ """
+ try:
+ #Creating a Security Group to add rule to it
+ s_name = rand_name('securitygroup-')
+ s_description = rand_name('description-')
+ resp, securitygroup =\
+ self.client.create_security_group(s_name, s_description)
+ securitygroup_id = securitygroup['id']
+ #Adding rules to the created Security Group
+ parent_group_id = securitygroup['id']
+ ip_protocol = 'tcp'
+ from_port = 22
+ to_port = 22
+ resp, rule =\
+ self.client.create_security_group_rule(parent_group_id,
+ ip_protocol,
+ from_port, to_port)
+ finally:
+ #Deleting the Security Group rule, created in this method
+ group_rule_id = rule['id']
+ self.client.delete_security_group_rule(group_rule_id)
+ #Deleting the Security Group created in this method
+ resp, _ = self.client.delete_security_group(securitygroup_id)
+
+ @attr(type='negative')
+ def test_security_group_rules_create_with_invalid_id(self):
+ """
+ Negative test: Creation of Security Group rule should FAIL
+ with invalid Parent group id
+ """
+ #Adding rules to the invalid Security Group id
+ parent_group_id = rand_name('999')
+ ip_protocol = 'tcp'
+ from_port = 22
+ to_port = 22
+ try:
+ resp, rule =\
+ self.client.create_security_group_rule(parent_group_id,
+ ip_protocol,
+ from_port, to_port)
+ except exceptions.NotFound:
+ pass
+ else:
+ self.fail('Security Group rule should not be created '
+ 'with invalid parent group id')
+
+ @attr(type='negative')
+ def test_security_group_rules_create_with_invalid_ip_protocol(self):
+ """
+ Negative test: Creation of Security Group rule should FAIL
+ with invalid ip_protocol
+ """
+ #Creating a Security Group to add rule to it
+ s_name = rand_name('securitygroup-')
+ s_description = rand_name('description-')
+ resp, securitygroup = self.client.create_security_group(s_name,
+ s_description)
+ #Adding rules to the created Security Group
+ parent_group_id = securitygroup['id']
+ ip_protocol = rand_name('999')
+ from_port = 22
+ to_port = 22
+ try:
+ resp, rule =\
+ self.client.create_security_group_rule(parent_group_id,
+ ip_protocol,
+ from_port, to_port)
+ except exceptions.BadRequest:
+ pass
+ else:
+ self.fail('Security Group rule should not be created '
+ 'with invalid ip_protocol')
+ #Deleting the Security Group created in this method
+ resp, _ = self.client.delete_security_group(securitygroup['id'])
+
+ @attr(type='negative')
+ def test_security_group_rules_create_with_invalid_from_port(self):
+ """
+ Negative test: Creation of Security Group rule should FAIL
+ with invalid from_port
+ """
+ #Creating a Security Group to add rule to it
+ s_name = rand_name('securitygroup-')
+ s_description = rand_name('description-')
+ resp, securitygroup = self.client.create_security_group(s_name,
+ s_description)
+ #Adding rules to the created Security Group
+ parent_group_id = securitygroup['id']
+ ip_protocol = 'tcp'
+ from_port = rand_name('999')
+ to_port = 22
+ try:
+ resp, rule =\
+ self.client.create_security_group_rule(parent_group_id,
+ ip_protocol,
+ from_port, to_port)
+ except exceptions.BadRequest:
+ pass
+ else:
+ self.fail('Security Group rule should not be created'
+ 'with invalid from_port')
+ #Deleting the Security Group created in this method
+ resp, _ = self.client.delete_security_group(securitygroup['id'])
+
+ @attr(type='negative')
+ def test_security_group_rules_create_with_invalid_to_port(self):
+ """
+ Negative test: Creation of Security Group rule should FAIL
+ with invalid from_port
+ """
+ #Creating a Security Group to add rule to it
+ s_name = rand_name('securitygroup-')
+ s_description = rand_name('description-')
+ resp, securitygroup = self.client.create_security_group(s_name,
+ s_description)
+ #Adding rules to the created Security Group
+ parent_group_id = securitygroup['id']
+ ip_protocol = 'tcp'
+ from_port = 22
+ to_port = rand_name('999')
+ try:
+ resp, rule =\
+ self.client.create_security_group_rule(parent_group_id,
+ ip_protocol,
+ from_port, to_port)
+ except exceptions.BadRequest:
+ pass
+ else:
+ self.fail('Security Group rule should not be created'
+ 'with invalid from_port')
+ #Deleting the Security Group created in this method
+ resp, _ = self.client.delete_security_group(securitygroup['id'])
+
+ @attr(type='negative')
+ def test_security_group_rules_delete_with_invalid_id(self):
+ """
+ Negative test: Deletion of Security Group rule should be FAIL
+ with invalid rule id
+ """
+ try:
+ self.client.delete_security_group_rule(rand_name('999'))
+ except exceptions.NotFound:
+ pass
+ else:
+ self.fail('Security Group Rule should not be deleted '
+ 'with nonexistant rule id')
diff --git a/tempest/tests/test_security_groups.py b/tempest/tests/test_security_groups.py
new file mode 100644
index 0000000..bbbe777
--- /dev/null
+++ b/tempest/tests/test_security_groups.py
@@ -0,0 +1,226 @@
+from nose.plugins.attrib import attr
+from tempest import openstack
+import unittest2 as unittest
+from tempest import exceptions
+from tempest.common.utils.data_utils import rand_name
+import time
+
+
+class SecurityGroupsTest(unittest.TestCase):
+
+ @classmethod
+ def setUpClass(cls):
+ cls.os = openstack.Manager()
+ cls.client = cls.os.security_groups_client
+
+ @attr(type='positive')
+ def test_security_groups_create_list_delete(self):
+ """Positive test:Should return the list of Security Groups"""
+ try:
+ #Create 3 Security Groups
+ security_group_list = list()
+ for i in range(3):
+ s_name = rand_name('securitygroup-')
+ s_description = rand_name('description-')
+ resp, securitygroup =\
+ self.client.create_security_group(s_name, s_description)
+ self.assertEqual(200, resp.status)
+ security_group_list.append(securitygroup)
+ #Fetch all Security Groups and verify the list
+ #has all created Security Groups
+ resp, fetched_list = self.client.list_security_groups()
+ self.assertEqual(200, resp.status)
+ #Now check if all the created Security Groups are in fetched list
+ missing_sgs =\
+ [sg for sg in security_group_list if sg not in fetched_list]
+ self.assertFalse(missing_sgs,
+ "Failed to find Security Group %s in fetched list"
+ % ', '.join(m_group['name']
+ for m_group in missing_sgs))
+ finally:
+ #Delete all the Security Groups created in this method
+ for securitygroup in security_group_list:
+ resp, _ =\
+ self.client.delete_security_group(securitygroup['id'])
+ self.assertEqual(202, resp.status)
+
+ @attr(type='smoke')
+ def test_security_group_create_delete(self):
+ """Security Group should be created, verified and deleted"""
+ try:
+ s_name = rand_name('securitygroup-')
+ s_description = rand_name('description-')
+ resp, securitygroup = \
+ self.client.create_security_group(s_name, s_description)
+ self.assertEqual(200, resp.status)
+ self.assertTrue('id' in securitygroup)
+ securitygroup_id = securitygroup['id']
+ self.assertFalse(securitygroup_id is None)
+ self.assertTrue('name' in securitygroup)
+ securitygroup_name = securitygroup['name']
+ self.assertEqual(securitygroup_name, s_name,
+ "The created Security Group name is "
+ "not equal to the requested name")
+ finally:
+ #Delete Security Group created in this method
+ resp, _ = self.client.delete_security_group(securitygroup['id'])
+ self.assertEqual(202, resp.status)
+
+ @attr(type='smoke')
+ def test_security_group_create_get_delete(self):
+ """Security Group should be created, fetched and deleted"""
+ try:
+ s_name = rand_name('securitygroup-')
+ s_description = rand_name('description-')
+ resp, securitygroup =\
+ self.client.create_security_group(s_name, s_description)
+ self.assertEqual(200, resp.status)
+ #Now fetch the created Security Group by its 'id'
+ resp, fetched_group =\
+ self.client.get_security_group(securitygroup['id'])
+ self.assertEqual(200, resp.status)
+ self.assertEqual(securitygroup, fetched_group,
+ "The fetched Security Group is different "
+ "from the created Group")
+ finally:
+ #Delete the Security Group created in this method
+ resp, _ = self.client.delete_security_group(securitygroup['id'])
+ self.assertEqual(202, resp.status)
+
+ @attr(type='negative')
+ def test_security_group_get_nonexistant_group(self):
+ """
+ Negative test:Should not be able to GET the details
+ of nonexistant Security Group
+ """
+ security_group_id = []
+ resp, body = self.client.list_security_groups()
+ for i in range(len(body)):
+ security_group_id.append(body[i]['id'])
+ #Creating a nonexistant Security Group id
+ while True:
+ non_exist_id = rand_name('999')
+ if non_exist_id not in security_group_id:
+ break
+ try:
+ resp, body = \
+ self.client.get_security_group(non_exist_id)
+ except exceptions.NotFound:
+ pass
+ else:
+ self.fail('Should not be able to GET the details from a '
+ 'nonexistant Security Group')
+
+ @attr(type='negative')
+ def test_security_group_create_with_invalid_group_name(self):
+ """
+ Negative test: Security Group should not be created with group name as
+ an empty string/with white spaces/chars more than 255
+ """
+ s_description = rand_name('description-')
+ #Create Security Group with empty string as group name
+ try:
+ resp, _ = self.client.create_security_group("", s_description)
+ except exceptions.BadRequest:
+ pass
+ else:
+ self.fail('Security Group should not be created '
+ 'with EMPTY Name')
+ #Create Security Group with white space in group name
+ try:
+ resp, _ = self.client.create_security_group(" ", s_description)
+ except exceptions.BadRequest:
+ pass
+ else:
+ self.fail('Security Group should not be created '
+ 'with WHITE SPACE in Name')
+ #Create Security Group with group name longer than 255 chars
+ s_name = 'securitygroup-'.ljust(260, '0')
+ try:
+ resp, _ = self.client.create_security_group(s_name, s_description)
+ except exceptions.BadRequest:
+ pass
+ else:
+ self.fail('Security Group should not be created '
+ 'with more than 255 chars in Name')
+
+ @attr(type='negative')
+ def test_security_group_create_with_invalid_group_description(self):
+ """
+ Negative test:Security Group should not be created with description as
+ an empty string/with white spaces/chars more than 255
+ """
+ s_name = rand_name('securitygroup-')
+ #Create Security Group with empty string as description
+ try:
+ resp, _ = self.client.create_security_group(s_name, "")
+ except exceptions.BadRequest:
+ pass
+ else:
+ self.fail('Security Group should not be created '
+ 'with EMPTY Description')
+ #Create Security Group with white space in description
+ try:
+ resp, _ = self.client.create_security_group(s_name, " ")
+ except exceptions.BadRequest:
+ pass
+ else:
+ self.fail('Security Group should not be created '
+ 'with WHITE SPACE in Description')
+ #Create Security Group with group description longer than 255 chars
+ s_description = 'description-'.ljust(260, '0')
+ try:
+ resp, _ = self.client.create_security_group(s_name, s_description)
+ except exceptions.BadRequest:
+ pass
+ else:
+ self.fail('Security Group should not be created '
+ 'with more than 255 chars in Description')
+
+ @attr(type='negative')
+ def test_security_group_create_with_duplicate_name(self):
+ """
+ Negative test:Security Group with duplicate name should not
+ be created
+ """
+ try:
+ s_name = rand_name('securitygroup-')
+ s_description = rand_name('description-')
+ resp, security_group =\
+ self.client.create_security_group(s_name, s_description)
+ self.assertEqual(200, resp.status)
+ #Now try the Security Group with the same 'Name'
+ try:
+ resp, _ =\
+ self.client.create_security_group(s_name, s_description)
+ except exceptions.BadRequest:
+ pass
+ else:
+ self.fail('Security Group should not be created '
+ 'with duplicate Group Name')
+ finally:
+ #Delete the Security Group created in this method
+ resp, _ = self.client.delete_security_group(security_group['id'])
+ self.assertEqual(202, resp.status)
+
+ @attr(type='negative')
+ def test_delete_nonexistant_security_group(self):
+ """
+ Negative test:Deletion of a nonexistant Security Group should Fail
+ """
+ security_group_id = []
+ resp, body = self.client.list_security_groups()
+ for i in range(len(body)):
+ security_group_id.append(body[i]['id'])
+ #Creating Non Existant Security Group
+ while True:
+ non_exist_id = rand_name('999')
+ if non_exist_id not in security_group_id:
+ break
+ try:
+ resp, body = self.client.delete_security_group(non_exist_id)
+ except exceptions.NotFound:
+ pass
+ else:
+ self.fail('Should not be able to delete a nonexistant '
+ 'Security Group')