| # Copyright 2012 OpenStack Foundation |
| # All Rights Reserved. |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); you may |
| # not use this file except in compliance with the License. You may obtain |
| # a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| # License for the specific language governing permissions and limitations |
| # under the License. |
| |
| import six |
| |
| from oslo_log import log as logging |
| from tempest_lib import exceptions as lib_exc |
| |
| from tempest.api.compute import base |
| from tempest.common.utils import data_utils |
| from tempest import config |
| from tempest import test |
| |
| CONF = config.CONF |
| |
| LOG = logging.getLogger(__name__) |
| |
| |
| class AuthorizationTestJSON(base.BaseV2ComputeTest): |
| |
| credentials = ['primary', 'alt'] |
| |
| @classmethod |
| def skip_checks(cls): |
| super(AuthorizationTestJSON, cls).skip_checks() |
| if not CONF.service_available.glance: |
| raise cls.skipException('Glance is not available.') |
| |
| @classmethod |
| def setup_credentials(cls): |
| # No network resources required for this test |
| cls.set_network_resources() |
| super(AuthorizationTestJSON, cls).setup_credentials() |
| |
| @classmethod |
| def setup_clients(cls): |
| super(AuthorizationTestJSON, cls).setup_clients() |
| cls.client = cls.os.servers_client |
| cls.images_client = cls.os.images_client |
| cls.glance_client = cls.os.image_client |
| cls.keypairs_client = cls.os.keypairs_client |
| cls.security_client = cls.os.security_groups_client |
| cls.rule_client = cls.os.security_group_rules_client |
| |
| cls.alt_client = cls.alt_manager.servers_client |
| cls.alt_images_client = cls.alt_manager.images_client |
| cls.alt_keypairs_client = cls.alt_manager.keypairs_client |
| cls.alt_security_client = cls.alt_manager.security_groups_client |
| cls.alt_rule_client = cls.alt_manager.security_group_rules_client |
| |
| @classmethod |
| def resource_setup(cls): |
| super(AuthorizationTestJSON, cls).resource_setup() |
| server = cls.create_test_server(wait_until='ACTIVE') |
| cls.server = cls.client.show_server(server['id'])['server'] |
| |
| name = data_utils.rand_name('image') |
| body = cls.glance_client.create_image(name=name, |
| container_format='bare', |
| disk_format='raw', |
| is_public=False)['image'] |
| image_id = body['id'] |
| image_file = six.StringIO(('*' * 1024)) |
| body = cls.glance_client.update_image(image_id, |
| data=image_file)['image'] |
| cls.glance_client.wait_for_image_status(image_id, 'active') |
| cls.image = cls.images_client.show_image(image_id)['image'] |
| |
| cls.keypairname = data_utils.rand_name('keypair') |
| cls.keypairs_client.create_keypair(name=cls.keypairname) |
| |
| name = data_utils.rand_name('security') |
| description = data_utils.rand_name('description') |
| cls.security_group = cls.security_client.create_security_group( |
| name=name, description=description)['security_group'] |
| |
| parent_group_id = cls.security_group['id'] |
| ip_protocol = 'tcp' |
| from_port = 22 |
| to_port = 22 |
| cls.rule = cls.rule_client.create_security_group_rule( |
| parent_group_id=parent_group_id, ip_protocol=ip_protocol, |
| from_port=from_port, to_port=to_port)['security_group_rule'] |
| |
| @classmethod |
| def resource_cleanup(cls): |
| if hasattr(cls, 'image'): |
| cls.images_client.delete_image(cls.image['id']) |
| if hasattr(cls, 'keypairname'): |
| cls.keypairs_client.delete_keypair(cls.keypairname) |
| if hasattr(cls, 'security_group'): |
| cls.security_client.delete_security_group(cls.security_group['id']) |
| super(AuthorizationTestJSON, cls).resource_cleanup() |
| |
| @test.idempotent_id('56816e4a-bd34-47b5-aee9-268c3efeb5d4') |
| def test_get_server_for_alt_account_fails(self): |
| # A GET request for a server on another user's account should fail |
| self.assertRaises(lib_exc.NotFound, self.alt_client.show_server, |
| self.server['id']) |
| |
| @test.idempotent_id('fb8a4870-6d9d-44ad-8375-95d52e98d9f6') |
| def test_delete_server_for_alt_account_fails(self): |
| # A DELETE request for another user's server should fail |
| self.assertRaises(lib_exc.NotFound, self.alt_client.delete_server, |
| self.server['id']) |
| |
| @test.idempotent_id('d792f91f-1d49-4eb5-b1ff-b229c4b9dc64') |
| def test_update_server_for_alt_account_fails(self): |
| # An update server request for another user's server should fail |
| self.assertRaises(lib_exc.NotFound, self.alt_client.update_server, |
| self.server['id'], name='test') |
| |
| @test.idempotent_id('488f24df-d7f7-4207-949a-f17fcb8e8769') |
| def test_list_server_addresses_for_alt_account_fails(self): |
| # A list addresses request for another user's server should fail |
| self.assertRaises(lib_exc.NotFound, self.alt_client.list_addresses, |
| self.server['id']) |
| |
| @test.idempotent_id('00b442d0-2e72-40e7-9b1f-31772e36da01') |
| def test_list_server_addresses_by_network_for_alt_account_fails(self): |
| # A list address/network request for another user's server should fail |
| server_id = self.server['id'] |
| self.assertRaises(lib_exc.NotFound, |
| self.alt_client.list_addresses_by_network, server_id, |
| 'public') |
| |
| @test.idempotent_id('cc90b35a-19f0-45d2-b680-2aabf934aa22') |
| def test_list_servers_with_alternate_tenant(self): |
| # A list on servers from one tenant should not |
| # show on alternate tenant |
| # Listing servers from alternate tenant |
| alt_server_ids = [] |
| body = self.alt_client.list_servers() |
| alt_server_ids = [s['id'] for s in body['servers']] |
| self.assertNotIn(self.server['id'], alt_server_ids) |
| |
| @test.idempotent_id('376dbc16-0779-4384-a723-752774799641') |
| def test_change_password_for_alt_account_fails(self): |
| # A change password request for another user's server should fail |
| self.assertRaises(lib_exc.NotFound, self.alt_client.change_password, |
| self.server['id'], 'newpass') |
| |
| @test.idempotent_id('14cb5ff5-f646-45ca-8f51-09081d6c0c24') |
| def test_reboot_server_for_alt_account_fails(self): |
| # A reboot request for another user's server should fail |
| self.assertRaises(lib_exc.NotFound, self.alt_client.reboot_server, |
| self.server['id'], 'HARD') |
| |
| @test.idempotent_id('8a0bce51-cd00-480b-88ba-dbc7d8408a37') |
| def test_rebuild_server_for_alt_account_fails(self): |
| # A rebuild request for another user's server should fail |
| self.assertRaises(lib_exc.NotFound, self.alt_client.rebuild_server, |
| self.server['id'], self.image_ref_alt) |
| |
| @test.idempotent_id('e4da647e-f982-4e61-9dad-1d1abebfb933') |
| def test_resize_server_for_alt_account_fails(self): |
| # A resize request for another user's server should fail |
| self.assertRaises(lib_exc.NotFound, self.alt_client.resize_server, |
| self.server['id'], self.flavor_ref_alt) |
| |
| @test.idempotent_id('a9fe8112-0ffa-4902-b061-f892bd5fe0d3') |
| def test_create_image_for_alt_account_fails(self): |
| # A create image request for another user's server should fail |
| self.assertRaises(lib_exc.NotFound, |
| self.alt_images_client.create_image, |
| self.server['id'], name='testImage') |
| |
| @test.idempotent_id('95d445f6-babc-4f2e-aea3-aa24ec5e7f0d') |
| def test_create_server_with_unauthorized_image(self): |
| # Server creation with another user's image should fail |
| self.assertRaises(lib_exc.BadRequest, self.alt_client.create_server, |
| name='test', imageRef=self.image['id'], |
| flavorRef=self.flavor_ref) |
| |
| @test.idempotent_id('acf8724b-142b-4044-82c3-78d31a533f24') |
| def test_create_server_fails_when_tenant_incorrect(self): |
| # BUG(sdague): this test should fail because of bad auth url, |
| # which means that when we run with a service catalog without |
| # project_id in the urls, it should fail to fail, and thus |
| # fail the test. It does not. |
| # |
| # The 400 BadRequest is clearly ambiguous, and something else |
| # is wrong about this request. This should be fixed. |
| # |
| # A create server request should fail if the tenant id does not match |
| # the current user |
| # Change the base URL to impersonate another user |
| self.alt_client.auth_provider.set_alt_auth_data( |
| request_part='url', |
| auth_data=self.client.auth_provider.auth_data |
| ) |
| self.assertRaises(lib_exc.BadRequest, |
| self.alt_client.create_server, name='test', |
| imageRef=self.image['id'], flavorRef=self.flavor_ref) |
| |
| @test.idempotent_id('f03d1ded-7fd4-4d29-bc13-e2391f29c625') |
| def test_create_keypair_in_analt_user_tenant(self): |
| """create keypair should not function for alternate tenant |
| |
| POST {alt_service_url}/os-keypairs |
| |
| Attempt to create a keypair against an alternate tenant by |
| changing using a different tenant's service url. This should |
| return a BadRequest. This tests basic tenant isolation protections. |
| |
| NOTE(sdague): if the environment does not use project_id in |
| the service urls, this test is not valid. Skip under these |
| conditions. |
| |
| """ |
| if self.alt_keypairs_client.base_url == self.keypairs_client.base_url: |
| raise self.skipException("Service urls don't include project_id") |
| |
| k_name = data_utils.rand_name('keypair') |
| try: |
| # Change the base URL to impersonate another user |
| self.alt_keypairs_client.auth_provider.set_alt_auth_data( |
| request_part='url', |
| auth_data=self.keypairs_client.auth_provider.auth_data |
| ) |
| resp = {} |
| resp['status'] = None |
| self.assertRaises(lib_exc.BadRequest, |
| self.alt_keypairs_client.create_keypair, |
| name=k_name) |
| finally: |
| # Next request the base_url is back to normal |
| if (resp['status'] is not None): |
| self.alt_keypairs_client.delete_keypair(k_name) |
| LOG.error("Create keypair request should not happen " |
| "if the tenant id does not match the current user") |
| |
| @test.idempotent_id('85bcdd8f-56b4-4868-ae56-63fbf6f7e405') |
| def test_get_keypair_of_alt_account_fails(self): |
| # A GET request for another user's keypair should fail |
| self.assertRaises(lib_exc.NotFound, |
| self.alt_keypairs_client.show_keypair, |
| self.keypairname) |
| |
| @test.idempotent_id('6d841683-a8e0-43da-a1b8-b339f7692b61') |
| def test_delete_keypair_of_alt_account_fails(self): |
| # A DELETE request for another user's keypair should fail |
| self.assertRaises(lib_exc.NotFound, |
| self.alt_keypairs_client.delete_keypair, |
| self.keypairname) |
| |
| @test.idempotent_id('fcb2e144-36e3-4dfb-9f9f-e72fcdec5656') |
| def test_get_image_for_alt_account_fails(self): |
| # A GET request for an image on another user's account should fail |
| self.assertRaises(lib_exc.NotFound, |
| self.alt_images_client.show_image, self.image['id']) |
| |
| @test.idempotent_id('9facb962-f043-4a9d-b9ee-166a32dea098') |
| def test_delete_image_for_alt_account_fails(self): |
| # A DELETE request for another user's image should fail |
| self.assertRaises(lib_exc.NotFound, |
| self.alt_images_client.delete_image, |
| self.image['id']) |
| |
| @test.idempotent_id('752c917e-83be-499d-a422-3559127f7d3c') |
| def test_create_security_group_in_analt_user_tenant(self): |
| """create security group should not function for alternate tenant |
| |
| POST {alt_service_url}/os-security-groups |
| |
| Attempt to create a security group against an alternate tenant |
| by changing using a different tenant's service url. This |
| should return a BadRequest. This tests basic tenant isolation |
| protections. |
| |
| NOTE(sdague): if the environment does not use project_id in |
| the service urls, this test is not valid. Skip under these |
| conditions. |
| |
| """ |
| if self.alt_security_client.base_url == self.security_client.base_url: |
| raise self.skipException("Service urls don't include project_id") |
| |
| s_name = data_utils.rand_name('security') |
| s_description = data_utils.rand_name('security') |
| try: |
| # Change the base URL to impersonate another user |
| self.alt_security_client.auth_provider.set_alt_auth_data( |
| request_part='url', |
| auth_data=self.security_client.auth_provider.auth_data |
| ) |
| resp = {} |
| resp['status'] = None |
| self.assertRaises(lib_exc.BadRequest, |
| self.alt_security_client.create_security_group, |
| name=s_name, description=s_description) |
| finally: |
| # Next request the base_url is back to normal |
| if resp['status'] is not None: |
| self.alt_security_client.delete_security_group(resp['id']) |
| LOG.error("Create Security Group request should not happen if" |
| "the tenant id does not match the current user") |
| |
| @test.idempotent_id('9db3590f-4d15-4e5f-985e-b28514919a6f') |
| def test_get_security_group_of_alt_account_fails(self): |
| # A GET request for another user's security group should fail |
| self.assertRaises(lib_exc.NotFound, |
| self.alt_security_client.show_security_group, |
| self.security_group['id']) |
| |
| @test.idempotent_id('155387a5-2bbc-4acf-ab06-698dae537ea5') |
| def test_delete_security_group_of_alt_account_fails(self): |
| # A DELETE request for another user's security group should fail |
| self.assertRaises(lib_exc.NotFound, |
| self.alt_security_client.delete_security_group, |
| self.security_group['id']) |
| |
| @test.idempotent_id('b2b76de0-210a-4089-b921-591c9ec552f6') |
| def test_create_security_group_rule_in_analt_user_tenant(self): |
| """create security group rule should not function for alternate tenant |
| |
| POST {alt_service_url}/os-security-group-rules |
| |
| Attempt to create a security group rule against an alternate |
| tenant by changing using a different tenant's service |
| url. This should return a BadRequest. This tests basic tenant |
| isolation protections. |
| |
| NOTE(sdague): if the environment does not use project_id in |
| the service urls, this test is not valid. Skip under these |
| conditions. |
| |
| """ |
| if self.alt_security_client.base_url == self.security_client.base_url: |
| raise self.skipException("Service urls don't include project_id") |
| |
| parent_group_id = self.security_group['id'] |
| ip_protocol = 'icmp' |
| from_port = -1 |
| to_port = -1 |
| try: |
| # Change the base URL to impersonate another user |
| self.alt_rule_client.auth_provider.set_alt_auth_data( |
| request_part='url', |
| auth_data=self.rule_client.auth_provider.auth_data |
| ) |
| resp = {} |
| resp['status'] = None |
| self.assertRaises(lib_exc.BadRequest, |
| self.alt_rule_client. |
| create_security_group_rule, |
| parent_group_id=parent_group_id, |
| ip_protocol=ip_protocol, |
| from_port=from_port, to_port=to_port) |
| finally: |
| # Next request the base_url is back to normal |
| if resp['status'] is not None: |
| self.alt_rule_client.delete_security_group_rule(resp['id']) |
| LOG.error("Create security group rule request should not " |
| "happen if the tenant id does not match the" |
| " current user") |
| |
| @test.idempotent_id('c6044177-37ef-4ce4-b12c-270ddf26d7da') |
| def test_delete_security_group_rule_of_alt_account_fails(self): |
| # A DELETE request for another user's security group rule |
| # should fail |
| self.assertRaises(lib_exc.NotFound, |
| self.alt_rule_client.delete_security_group_rule, |
| self.rule['id']) |
| |
| @test.idempotent_id('c5f52351-53d9-4fc9-83e5-917f7f5e3d71') |
| def test_set_metadata_of_alt_account_server_fails(self): |
| # A set metadata for another user's server should fail |
| req_metadata = {'meta1': 'data1', 'meta2': 'data2'} |
| self.assertRaises(lib_exc.NotFound, |
| self.alt_client.set_server_metadata, |
| self.server['id'], |
| req_metadata) |
| |
| @test.idempotent_id('fb6f51e9-df15-4939-898d-1aca38c258f0') |
| def test_set_metadata_of_alt_account_image_fails(self): |
| # A set metadata for another user's image should fail |
| req_metadata = {'meta1': 'value1', 'meta2': 'value2'} |
| self.assertRaises(lib_exc.NotFound, |
| self.alt_images_client.set_image_metadata, |
| self.image['id'], req_metadata) |
| |
| @test.idempotent_id('dea1936a-473d-49f2-92ad-97bb7aded22e') |
| def test_get_metadata_of_alt_account_server_fails(self): |
| # A get metadata for another user's server should fail |
| req_metadata = {'meta1': 'data1'} |
| self.client.set_server_metadata(self.server['id'], req_metadata) |
| self.addCleanup(self.client.delete_server_metadata_item, |
| self.server['id'], 'meta1') |
| self.assertRaises(lib_exc.NotFound, |
| self.alt_client.get_server_metadata_item, |
| self.server['id'], 'meta1') |
| |
| @test.idempotent_id('16b2d724-0d3b-4216-a9fa-97bd4d9cf670') |
| def test_get_metadata_of_alt_account_image_fails(self): |
| # A get metadata for another user's image should fail |
| req_metadata = {'meta1': 'value1'} |
| self.addCleanup(self.images_client.delete_image_metadata_item, |
| self.image['id'], 'meta1') |
| self.images_client.set_image_metadata(self.image['id'], |
| req_metadata) |
| self.assertRaises(lib_exc.NotFound, |
| self.alt_images_client.show_image_metadata_item, |
| self.image['id'], 'meta1') |
| |
| @test.idempotent_id('79531e2e-e721-493c-8b30-a35db36fdaa6') |
| def test_delete_metadata_of_alt_account_server_fails(self): |
| # A delete metadata for another user's server should fail |
| req_metadata = {'meta1': 'data1'} |
| self.addCleanup(self.client.delete_server_metadata_item, |
| self.server['id'], 'meta1') |
| self.client.set_server_metadata(self.server['id'], req_metadata) |
| self.assertRaises(lib_exc.NotFound, |
| self.alt_client.delete_server_metadata_item, |
| self.server['id'], 'meta1') |
| |
| @test.idempotent_id('a5175dcf-cef8-43d6-9b77-3cb707d62e94') |
| def test_delete_metadata_of_alt_account_image_fails(self): |
| # A delete metadata for another user's image should fail |
| req_metadata = {'meta1': 'data1'} |
| self.addCleanup(self.images_client.delete_image_metadata_item, |
| self.image['id'], 'meta1') |
| self.images_client.set_image_metadata(self.image['id'], |
| req_metadata) |
| self.assertRaises(lib_exc.NotFound, |
| self.alt_images_client.delete_image_metadata_item, |
| self.image['id'], 'meta1') |
| |
| @test.idempotent_id('b0c1e7a0-8853-40fd-8384-01f93d116cae') |
| def test_get_console_output_of_alt_account_server_fails(self): |
| # A Get Console Output for another user's server should fail |
| self.assertRaises(lib_exc.NotFound, |
| self.alt_client.get_console_output, |
| self.server['id'], 10) |