Use tempest plugin interface
Make use of the Tempest plugin interface instead of copying Neutron
files into Tempest. This will remove the burden to port Neutron
tests onto Tempest master recurrently.
It uses neutron/tests/tempest/ as new top folder for all Tempest
tests. It follows the model of Heat [1].
[1]: https://github.com/openstack/heat/tree/master/heat_integrationtests
Partially implements bp external-plugin-interface
Change-Id: Ia233aa162746845f6ae08a8157dcd242dcd58eab
diff --git a/neutron/tests/tempest/api/__init__.py b/neutron/tests/tempest/api/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/neutron/tests/tempest/api/__init__.py
diff --git a/neutron/tests/tempest/api/admin/__init__.py b/neutron/tests/tempest/api/admin/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/neutron/tests/tempest/api/admin/__init__.py
diff --git a/neutron/tests/tempest/api/admin/test_agent_management.py b/neutron/tests/tempest/api/admin/test_agent_management.py
new file mode 100644
index 0000000..4e95391
--- /dev/null
+++ b/neutron/tests/tempest/api/admin/test_agent_management.py
@@ -0,0 +1,89 @@
+# Copyright 2013 IBM Corp.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from neutron.tests.tempest.common import tempest_fixtures
+from tempest import test
+
+from neutron.tests.tempest.api import base
+
+
+class AgentManagementTestJSON(base.BaseAdminNetworkTest):
+
+ @classmethod
+ @test.requires_ext(extension="agent", service="network")
+ def resource_setup(cls):
+ super(AgentManagementTestJSON, cls).resource_setup()
+ body = cls.admin_client.list_agents()
+ agents = body['agents']
+ cls.agent = agents[0] # don't modify this agent
+ cls.dyn_agent = agents[1]
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('9c80f04d-11f3-44a4-8738-ed2f879b0ff4')
+ def test_list_agent(self):
+ body = self.admin_client.list_agents()
+ agents = body['agents']
+ # Heartbeats must be excluded from comparison
+ self.agent.pop('heartbeat_timestamp', None)
+ self.agent.pop('configurations', None)
+ for agent in agents:
+ agent.pop('heartbeat_timestamp', None)
+ agent.pop('configurations', None)
+ self.assertIn(self.agent, agents)
+
+ @test.attr(type=['smoke'])
+ @test.idempotent_id('e335be47-b9a1-46fd-be30-0874c0b751e6')
+ def test_list_agents_non_admin(self):
+ body = self.client.list_agents()
+ self.assertEqual(len(body["agents"]), 0)
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('869bc8e8-0fda-4a30-9b71-f8a7cf58ca9f')
+ def test_show_agent(self):
+ body = self.admin_client.show_agent(self.agent['id'])
+ agent = body['agent']
+ self.assertEqual(agent['id'], self.agent['id'])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('371dfc5b-55b9-4cb5-ac82-c40eadaac941')
+ def test_update_agent_status(self):
+ origin_status = self.agent['admin_state_up']
+ # Try to update the 'admin_state_up' to the original
+ # one to avoid the negative effect.
+ agent_status = {'admin_state_up': origin_status}
+ body = self.admin_client.update_agent(agent_id=self.agent['id'],
+ agent_info=agent_status)
+ updated_status = body['agent']['admin_state_up']
+ self.assertEqual(origin_status, updated_status)
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('68a94a14-1243-46e6-83bf-157627e31556')
+ def test_update_agent_description(self):
+ self.useFixture(tempest_fixtures.LockFixture('agent_description'))
+ description = 'description for update agent.'
+ agent_description = {'description': description}
+ body = self.admin_client.update_agent(agent_id=self.dyn_agent['id'],
+ agent_info=agent_description)
+ self.addCleanup(self._restore_agent)
+ updated_description = body['agent']['description']
+ self.assertEqual(updated_description, description)
+
+ def _restore_agent(self):
+ """
+ Restore the agent description after update test.
+ """
+ description = self.dyn_agent['description']
+ origin_agent = {'description': description}
+ self.admin_client.update_agent(agent_id=self.dyn_agent['id'],
+ agent_info=origin_agent)
diff --git a/neutron/tests/tempest/api/admin/test_dhcp_agent_scheduler.py b/neutron/tests/tempest/api/admin/test_dhcp_agent_scheduler.py
new file mode 100644
index 0000000..049afe5
--- /dev/null
+++ b/neutron/tests/tempest/api/admin/test_dhcp_agent_scheduler.py
@@ -0,0 +1,96 @@
+# Copyright 2013 IBM Corp.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from tempest import test
+
+from neutron.tests.tempest.api import base
+
+
+class DHCPAgentSchedulersTestJSON(base.BaseAdminNetworkTest):
+
+ @classmethod
+ @test.requires_ext(extension="dhcp_agent_scheduler", service="network")
+ def resource_setup(cls):
+ super(DHCPAgentSchedulersTestJSON, cls).resource_setup()
+ # Create a network and make sure it will be hosted by a
+ # dhcp agent: this is done by creating a regular port
+ cls.network = cls.create_network()
+ cls.subnet = cls.create_subnet(cls.network)
+ cls.cidr = cls.subnet['cidr']
+ cls.port = cls.create_port(cls.network)
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('5032b1fe-eb42-4a64-8f3b-6e189d8b5c7d')
+ def test_list_dhcp_agent_hosting_network(self):
+ self.admin_client.list_dhcp_agent_hosting_network(
+ self.network['id'])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('30c48f98-e45d-4ffb-841c-b8aad57c7587')
+ def test_list_networks_hosted_by_one_dhcp(self):
+ body = self.admin_client.list_dhcp_agent_hosting_network(
+ self.network['id'])
+ agents = body['agents']
+ self.assertIsNotNone(agents)
+ agent = agents[0]
+ self.assertTrue(self._check_network_in_dhcp_agent(
+ self.network['id'], agent))
+
+ def _check_network_in_dhcp_agent(self, network_id, agent):
+ network_ids = []
+ body = self.admin_client.list_networks_hosted_by_one_dhcp_agent(
+ agent['id'])
+ networks = body['networks']
+ for network in networks:
+ network_ids.append(network['id'])
+ return network_id in network_ids
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('a0856713-6549-470c-a656-e97c8df9a14d')
+ def test_add_remove_network_from_dhcp_agent(self):
+ # The agent is now bound to the network, we can free the port
+ self.client.delete_port(self.port['id'])
+ self.ports.remove(self.port)
+ agent = dict()
+ agent['agent_type'] = None
+ body = self.admin_client.list_agents()
+ agents = body['agents']
+ for a in agents:
+ if a['agent_type'] == 'DHCP agent':
+ agent = a
+ break
+ self.assertEqual(agent['agent_type'], 'DHCP agent', 'Could not find '
+ 'DHCP agent in agent list though dhcp_agent_scheduler'
+ ' is enabled.')
+ network = self.create_network()
+ network_id = network['id']
+ if self._check_network_in_dhcp_agent(network_id, agent):
+ self._remove_network_from_dhcp_agent(network_id, agent)
+ self._add_dhcp_agent_to_network(network_id, agent)
+ else:
+ self._add_dhcp_agent_to_network(network_id, agent)
+ self._remove_network_from_dhcp_agent(network_id, agent)
+
+ def _remove_network_from_dhcp_agent(self, network_id, agent):
+ self.admin_client.remove_network_from_dhcp_agent(
+ agent_id=agent['id'],
+ network_id=network_id)
+ self.assertFalse(self._check_network_in_dhcp_agent(
+ network_id, agent))
+
+ def _add_dhcp_agent_to_network(self, network_id, agent):
+ self.admin_client.add_dhcp_agent_to_network(agent['id'],
+ network_id)
+ self.assertTrue(self._check_network_in_dhcp_agent(
+ network_id, agent))
diff --git a/neutron/tests/tempest/api/admin/test_extension_driver_port_security_admin.py b/neutron/tests/tempest/api/admin/test_extension_driver_port_security_admin.py
new file mode 100644
index 0000000..1f602ae
--- /dev/null
+++ b/neutron/tests/tempest/api/admin/test_extension_driver_port_security_admin.py
@@ -0,0 +1,34 @@
+# Copyright 2015 Cisco Systems, Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from tempest.lib import exceptions as lib_exc
+from tempest import test
+
+from neutron.tests.tempest.api import base
+from neutron.tests.tempest.api import base_security_groups as base_security
+
+
+class PortSecurityAdminTests(base_security.BaseSecGroupTest,
+ base.BaseAdminNetworkTest):
+
+ @test.attr(type=['negative', 'smoke'])
+ @test.idempotent_id('d39a96e2-2dea-4feb-8093-e7ac991ce6f8')
+ @test.requires_ext(extension='port-security', service='network')
+ def test_create_port_security_false_on_shared_network(self):
+ network = self.create_shared_network()
+ self.assertTrue(network['shared'])
+ self.create_subnet(network, client=self.admin_client)
+ self.assertRaises(lib_exc.Forbidden, self.create_port,
+ network, port_security_enabled=False)
diff --git a/neutron/tests/tempest/api/admin/test_external_network_extension.py b/neutron/tests/tempest/api/admin/test_external_network_extension.py
new file mode 100644
index 0000000..5403ad0
--- /dev/null
+++ b/neutron/tests/tempest/api/admin/test_external_network_extension.py
@@ -0,0 +1,183 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from oslo_config import cfg
+from tempest.lib.common.utils import data_utils
+from tempest.lib import exceptions as lib_exc
+from tempest import test
+import testtools
+
+from neutron.tests.tempest.api import base
+
+
+class ExternalNetworksRBACTestJSON(base.BaseAdminNetworkTest):
+
+ credentials = ['primary', 'alt', 'admin']
+
+ @classmethod
+ @test.requires_ext(extension="rbac-policies", service="network")
+ def resource_setup(cls):
+ super(ExternalNetworksRBACTestJSON, cls).resource_setup()
+ cls.client2 = cls.alt_manager.network_client
+
+ def _create_network(self, external=True):
+ post_body = {'name': data_utils.rand_name('network-')}
+ if external:
+ post_body['router:external'] = external
+ body = self.admin_client.create_network(**post_body)
+ network = body['network']
+ self.addCleanup(self.admin_client.delete_network, network['id'])
+ return network
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('afd8f1b7-a81e-4629-bca8-a367b3a144bb')
+ def test_regular_client_shares_with_another(self):
+ net = self.create_network()
+ self.client.create_rbac_policy(
+ object_type='network', object_id=net['id'],
+ action='access_as_external',
+ target_tenant=self.client2.tenant_id)
+ body = self.client2.list_networks()
+ networks_list = [n['id'] for n in body['networks']]
+ self.assertIn(net['id'], networks_list)
+ r = self.client2.create_router(
+ data_utils.rand_name('router-'),
+ external_gateway_info={'network_id': net['id']})['router']
+ self.addCleanup(self.admin_client.delete_router, r['id'])
+
+ @test.idempotent_id('afd8f1b7-a81e-4629-bca8-a367b3a144bb')
+ def test_regular_client_blocked_from_creating_external_wild_policies(self):
+ net = self.create_network()
+ with testtools.ExpectedException(lib_exc.Forbidden):
+ self.client.create_rbac_policy(
+ object_type='network', object_id=net['id'],
+ action='access_as_external',
+ target_tenant='*')
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('a2e19f06-48a9-4e4c-b717-08cb2008707d')
+ def test_wildcard_policy_created_from_external_network_api(self):
+ # create external makes wildcard
+ net_id = self._create_network(external=True)['id']
+ self.assertEqual(1, len(self.admin_client.list_rbac_policies(
+ object_id=net_id, action='access_as_external',
+ target_tenant='*')['rbac_policies']))
+ # update to non-external clears wildcard
+ self.admin_client.update_network(net_id, **{'router:external': False})
+ self.assertEqual(0, len(self.admin_client.list_rbac_policies(
+ object_id=net_id, action='access_as_external',
+ target_tenant='*')['rbac_policies']))
+ # create non-external has no wildcard
+ net_id = self._create_network(external=False)['id']
+ self.assertEqual(0, len(self.admin_client.list_rbac_policies(
+ object_id=net_id, action='access_as_external',
+ target_tenant='*')['rbac_policies']))
+ # update to external makes wildcard
+ self.admin_client.update_network(net_id, **{'router:external': True})
+ self.assertEqual(1, len(self.admin_client.list_rbac_policies(
+ object_id=net_id, action='access_as_external',
+ target_tenant='*')['rbac_policies']))
+
+ @test.idempotent_id('a5539002-5bdb-48b5-b124-e9eedd5975e6')
+ def test_external_conversion_on_policy_create(self):
+ net_id = self._create_network(external=False)['id']
+ self.admin_client.create_rbac_policy(
+ object_type='network', object_id=net_id,
+ action='access_as_external',
+ target_tenant=self.client2.tenant_id)
+ body = self.admin_client.show_network(net_id)['network']
+ self.assertTrue(body['router:external'])
+
+ @test.idempotent_id('01364c50-bfb6-46c4-b44c-edc4564d61cf')
+ def test_policy_allows_tenant_to_allocate_floatingip(self):
+ net = self._create_network(external=False)
+ # share to the admin client so it gets converted to external but
+ # not shared to everyone
+ self.admin_client.create_rbac_policy(
+ object_type='network', object_id=net['id'],
+ action='access_as_external',
+ target_tenant=self.admin_client.tenant_id)
+ self.create_subnet(net, client=self.admin_client, enable_dhcp=False)
+ with testtools.ExpectedException(lib_exc.NotFound):
+ self.client2.create_floatingip(
+ floating_network_id=net['id'])
+ self.admin_client.create_rbac_policy(
+ object_type='network', object_id=net['id'],
+ action='access_as_external',
+ target_tenant=self.client2.tenant_id)
+ self.client2.create_floatingip(
+ floating_network_id=net['id'])
+
+ @test.idempotent_id('476be1e0-f72e-47dc-9a14-4435926bbe82')
+ def test_policy_allows_tenant_to_attach_ext_gw(self):
+ net = self._create_network(external=False)
+ self.create_subnet(net, client=self.admin_client, enable_dhcp=False)
+ self.admin_client.create_rbac_policy(
+ object_type='network', object_id=net['id'],
+ action='access_as_external',
+ target_tenant=self.client2.tenant_id)
+ r = self.client2.create_router(
+ data_utils.rand_name('router-'),
+ external_gateway_info={'network_id': net['id']})['router']
+ self.addCleanup(self.admin_client.delete_router, r['id'])
+
+ @test.idempotent_id('d54decee-4203-4ced-91a2-ea42ca63e154')
+ def test_delete_policies_while_tenant_attached_to_net(self):
+ net = self._create_network(external=False)
+ self.create_subnet(net, client=self.admin_client, enable_dhcp=False)
+ wildcard = self.admin_client.create_rbac_policy(
+ object_type='network', object_id=net['id'],
+ action='access_as_external',
+ target_tenant='*')['rbac_policy']
+ r = self.client2.create_router(
+ data_utils.rand_name('router-'),
+ external_gateway_info={'network_id': net['id']})['router']
+ # delete should fail because the wildcard is required for the tenant's
+ # access
+ with testtools.ExpectedException(lib_exc.Conflict):
+ self.admin_client.delete_rbac_policy(wildcard['id'])
+ tenant = self.admin_client.create_rbac_policy(
+ object_type='network', object_id=net['id'],
+ action='access_as_external',
+ target_tenant=self.client2.tenant_id)['rbac_policy']
+ # now we can delete the policy because the tenant has its own policy
+ # to allow it access
+ self.admin_client.delete_rbac_policy(wildcard['id'])
+ # but now we can't delete the tenant's policy without the wildcard
+ with testtools.ExpectedException(lib_exc.Conflict):
+ self.admin_client.delete_rbac_policy(tenant['id'])
+ wildcard = self.admin_client.create_rbac_policy(
+ object_type='network', object_id=net['id'],
+ action='access_as_external',
+ target_tenant='*')['rbac_policy']
+ # with the wildcard added back we can delete the tenant's policy
+ self.admin_client.delete_rbac_policy(tenant['id'])
+ self.admin_client.delete_router(r['id'])
+ # now without the tenant attached, the wildcard can be deleted
+ self.admin_client.delete_rbac_policy(wildcard['id'])
+ # finally we ensure that the tenant can't attach to the network since
+ # there are no policies allowing it
+ with testtools.ExpectedException(lib_exc.NotFound):
+ self.client2.create_router(
+ data_utils.rand_name('router-'),
+ external_gateway_info={'network_id': net['id']})
+
+ @test.idempotent_id('7041cec7-d8fe-4c78-9b04-b51b2fd49dc9')
+ def test_wildcard_policy_delete_blocked_on_default_ext(self):
+ public_net_id = cfg.CONF.network.public_network_id
+ # ensure it is default before so we don't wipe out the policy
+ self.admin_client.update_network(public_net_id, is_default=True)
+ policy = self.admin_client.list_rbac_policies(
+ object_id=public_net_id, action='access_as_external',
+ target_tenant='*')['rbac_policies'][0]
+ with testtools.ExpectedException(lib_exc.Conflict):
+ self.admin_client.delete_rbac_policy(policy['id'])
diff --git a/neutron/tests/tempest/api/admin/test_floating_ips_admin_actions.py b/neutron/tests/tempest/api/admin/test_floating_ips_admin_actions.py
new file mode 100644
index 0000000..b8512ad
--- /dev/null
+++ b/neutron/tests/tempest/api/admin/test_floating_ips_admin_actions.py
@@ -0,0 +1,89 @@
+# Copyright 2014 OpenStack Foundation
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from tempest.lib.common.utils import data_utils
+from tempest.lib import exceptions as lib_exc
+from tempest import test
+import testtools
+
+from neutron.tests.tempest.api import base
+from neutron.tests.tempest import config
+
+CONF = config.CONF
+
+
+class FloatingIPAdminTestJSON(base.BaseAdminNetworkTest):
+ force_tenant_isolation = True
+ credentials = ['primary', 'alt', 'admin']
+
+ @classmethod
+ def resource_setup(cls):
+ super(FloatingIPAdminTestJSON, cls).resource_setup()
+ cls.ext_net_id = CONF.network.public_network_id
+ cls.floating_ip = cls.create_floatingip(cls.ext_net_id)
+ cls.alt_client = cls.alt_manager.network_client
+ cls.network = cls.create_network()
+ cls.subnet = cls.create_subnet(cls.network)
+ cls.router = cls.create_router(data_utils.rand_name('router-'),
+ external_network_id=cls.ext_net_id)
+ cls.create_router_interface(cls.router['id'], cls.subnet['id'])
+ cls.port = cls.create_port(cls.network)
+
+ @test.attr(type=['negative', 'smoke'])
+ @test.idempotent_id('11116ee9-4e99-5b15-b8e1-aa7df92ca589')
+ def test_associate_floating_ip_with_port_from_another_tenant(self):
+ body = self.admin_client.create_floatingip(
+ floating_network_id=self.ext_net_id)
+ floating_ip = body['floatingip']
+ test_tenant = data_utils.rand_name('test_tenant_')
+ test_description = data_utils.rand_name('desc_')
+ tenant = self.identity_admin_client.create_tenant(
+ name=test_tenant, description=test_description)['tenant']
+ tenant_id = tenant['id']
+ self.addCleanup(self.identity_admin_client.delete_tenant, tenant_id)
+
+ port = self.admin_client.create_port(network_id=self.network['id'],
+ tenant_id=tenant_id)
+ self.addCleanup(self.admin_client.delete_port, port['port']['id'])
+ self.assertRaises(lib_exc.BadRequest,
+ self.admin_client.update_floatingip,
+ floating_ip['id'], port_id=port['port']['id'])
+
+ @testtools.skipUnless(
+ CONF.neutron_plugin_options.specify_floating_ip_address_available,
+ "Feature for specifying floating IP address is disabled")
+ @test.attr(type='smoke')
+ @test.idempotent_id('332a8ae4-402e-4b98-bb6f-532e5a87b8e0')
+ def test_create_floatingip_with_specified_ip_address(self):
+ # other tests may end up stealing the IP before we can use it
+ # since it's on the external network so we need to retry if it's
+ # in use.
+ for i in range(100):
+ fip = self.get_unused_ip(self.ext_net_id, ip_version=4)
+ try:
+ body = self.admin_client.create_floatingip(
+ floating_network_id=self.ext_net_id,
+ floating_ip_address=fip)
+ break
+ except lib_exc.Conflict:
+ pass
+ else:
+ self.fail("Could not get an unused IP after 100 attempts")
+ created_floating_ip = body['floatingip']
+ self.addCleanup(self.admin_client.delete_floatingip,
+ created_floating_ip['id'])
+ self.assertIsNotNone(created_floating_ip['id'])
+ self.assertIsNotNone(created_floating_ip['tenant_id'])
+ self.assertEqual(created_floating_ip['floating_ip_address'], fip)
diff --git a/neutron/tests/tempest/api/admin/test_l3_agent_scheduler.py b/neutron/tests/tempest/api/admin/test_l3_agent_scheduler.py
new file mode 100644
index 0000000..de09ba1
--- /dev/null
+++ b/neutron/tests/tempest/api/admin/test_l3_agent_scheduler.py
@@ -0,0 +1,105 @@
+# Copyright 2013 IBM Corp.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from tempest.lib.common.utils import data_utils
+from tempest import test
+
+from neutron.tests.tempest.api import base
+from neutron.tests.tempest import exceptions
+
+AGENT_TYPE = 'L3 agent'
+AGENT_MODES = (
+ 'legacy',
+ 'dvr_snat'
+)
+
+
+class L3AgentSchedulerTestJSON(base.BaseAdminNetworkTest):
+ _agent_mode = 'legacy'
+
+ """
+ Tests the following operations in the Neutron API using the REST client for
+ Neutron:
+
+ List routers that the given L3 agent is hosting.
+ List L3 agents hosting the given router.
+ Add and Remove Router to L3 agent
+
+ v2.0 of the Neutron API is assumed.
+
+ The l3_agent_scheduler extension is required for these tests.
+ """
+
+ @classmethod
+ @test.requires_ext(extension="l3_agent_scheduler", service="network")
+ def skip_checks(cls):
+ super(L3AgentSchedulerTestJSON, cls).skip_checks()
+
+ @classmethod
+ def resource_setup(cls):
+ super(L3AgentSchedulerTestJSON, cls).resource_setup()
+ body = cls.admin_client.list_agents()
+ agents = body['agents']
+ for agent in agents:
+ # TODO(armax): falling back on default _agent_mode can be
+ # dropped as soon as Icehouse is dropped.
+ agent_mode = (
+ agent['configurations'].get('agent_mode', cls._agent_mode))
+ if agent['agent_type'] == AGENT_TYPE and agent_mode in AGENT_MODES:
+ cls.agent = agent
+ break
+ else:
+ msg = "L3 Agent Scheduler enabled in conf, but L3 Agent not found"
+ raise exceptions.InvalidConfiguration(msg)
+ cls.router = cls.create_router(data_utils.rand_name('router'))
+ # NOTE(armax): If DVR is an available extension, and the created router
+ # is indeed a distributed one, more resources need to be provisioned
+ # in order to bind the router to the L3 agent.
+ # That said, let's preserve the existing test logic, where the extra
+ # query and setup steps are only required if the extension is available
+ # and only if the router's default type is distributed.
+ if test.is_extension_enabled('dvr', 'network'):
+ is_dvr_router = cls.admin_client.show_router(
+ cls.router['id'])['router'].get('distributed', False)
+ if is_dvr_router:
+ cls.network = cls.create_network()
+ cls.create_subnet(cls.network)
+ cls.port = cls.create_port(cls.network)
+ cls.client.add_router_interface_with_port_id(
+ cls.router['id'], cls.port['id'])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('b7ce6e89-e837-4ded-9b78-9ed3c9c6a45a')
+ def test_list_routers_on_l3_agent(self):
+ self.admin_client.list_routers_on_l3_agent(self.agent['id'])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('9464e5e7-8625-49c3-8fd1-89c52be59d66')
+ def test_add_list_remove_router_on_l3_agent(self):
+ l3_agent_ids = list()
+ self.admin_client.add_router_to_l3_agent(
+ self.agent['id'],
+ self.router['id'])
+ body = (
+ self.admin_client.list_l3_agents_hosting_router(self.router['id']))
+ for agent in body['agents']:
+ l3_agent_ids.append(agent['id'])
+ self.assertIn('agent_type', agent)
+ self.assertEqual('L3 agent', agent['agent_type'])
+ self.assertIn(self.agent['id'], l3_agent_ids)
+ body = self.admin_client.remove_router_from_l3_agent(
+ self.agent['id'],
+ self.router['id'])
+ # NOTE(afazekas): The deletion not asserted, because neutron
+ # is not forbidden to reschedule the router to the same agent
diff --git a/neutron/tests/tempest/api/admin/test_quotas.py b/neutron/tests/tempest/api/admin/test_quotas.py
new file mode 100644
index 0000000..94471e1
--- /dev/null
+++ b/neutron/tests/tempest/api/admin/test_quotas.py
@@ -0,0 +1,85 @@
+# Copyright 2013 OpenStack Foundation
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import six
+from tempest.lib.common.utils import data_utils
+from tempest import test
+
+from neutron.tests.tempest.api import base
+
+
+class QuotasTest(base.BaseAdminNetworkTest):
+
+ """
+ Tests the following operations in the Neutron API using the REST client for
+ Neutron:
+
+ list quotas for tenants who have non-default quota values
+ show quotas for a specified tenant
+ update quotas for a specified tenant
+ reset quotas to default values for a specified tenant
+
+ v2.0 of the API is assumed.
+ It is also assumed that the per-tenant quota extension API is configured
+ in /etc/neutron/neutron.conf as follows:
+
+ quota_driver = neutron.db.driver.DbQuotaDriver
+ """
+
+ @classmethod
+ @test.requires_ext(extension="quotas", service="network")
+ def resource_setup(cls):
+ super(QuotasTest, cls).resource_setup()
+
+ @test.attr(type='gate')
+ @test.idempotent_id('2390f766-836d-40ef-9aeb-e810d78207fb')
+ def test_quotas(self):
+ # Add a tenant to conduct the test
+ test_tenant = data_utils.rand_name('test_tenant_')
+ test_description = data_utils.rand_name('desc_')
+ tenant = self.identity_admin_client.create_tenant(
+ name=test_tenant,
+ description=test_description)['tenant']
+ tenant_id = tenant['id']
+ self.addCleanup(self.identity_admin_client.delete_tenant, tenant_id)
+
+ new_quotas = {'network': 0, 'security_group': 0}
+
+ # Change quotas for tenant
+ quota_set = self.admin_client.update_quotas(tenant_id,
+ **new_quotas)
+ self.addCleanup(self.admin_client.reset_quotas, tenant_id)
+ for key, value in six.iteritems(new_quotas):
+ self.assertEqual(value, quota_set[key])
+
+ # Confirm our tenant is listed among tenants with non default quotas
+ non_default_quotas = self.admin_client.list_quotas()
+ found = False
+ for qs in non_default_quotas['quotas']:
+ if qs['tenant_id'] == tenant_id:
+ found = True
+ self.assertTrue(found)
+
+ # Confirm from API quotas were changed as requested for tenant
+ quota_set = self.admin_client.show_quotas(tenant_id)
+ quota_set = quota_set['quota']
+ for key, value in six.iteritems(new_quotas):
+ self.assertEqual(value, quota_set[key])
+
+ # Reset quotas to default and confirm
+ self.admin_client.reset_quotas(tenant_id)
+ non_default_quotas = self.admin_client.list_quotas()
+ for q in non_default_quotas['quotas']:
+ self.assertNotEqual(tenant_id, q['tenant_id'])
diff --git a/neutron/tests/tempest/api/admin/test_routers_dvr.py b/neutron/tests/tempest/api/admin/test_routers_dvr.py
new file mode 100644
index 0000000..ff429b8
--- /dev/null
+++ b/neutron/tests/tempest/api/admin/test_routers_dvr.py
@@ -0,0 +1,102 @@
+# Copyright 2015 OpenStack Foundation
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from tempest.lib.common.utils import data_utils
+from tempest import test
+
+from neutron.tests.tempest.api import base_routers as base
+
+
+class RoutersTestDVR(base.BaseRouterTest):
+
+ @classmethod
+ @test.requires_ext(extension="router", service="network")
+ @test.requires_ext(extension="dvr", service="network")
+ def resource_setup(cls):
+ # The check above will pass if api_extensions=all, which does
+ # not mean DVR extension itself is present.
+ # Instead, we have to check whether DVR is actually present by using
+ # admin credentials to create router with distributed=True attribute
+ # and checking for BadRequest exception and that the resulting router
+ # has a distributed attribute.
+ super(RoutersTestDVR, cls).resource_setup()
+ name = data_utils.rand_name('pretest-check')
+ router = cls.admin_client.create_router(name)
+ if 'distributed' not in router['router']:
+ msg = "'distributed' attribute not found. DVR Possibly not enabled"
+ raise cls.skipException(msg)
+ cls.admin_client.delete_router(router['router']['id'])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('08a2a0a8-f1e4-4b34-8e30-e522e836c44e')
+ def test_distributed_router_creation(self):
+ """
+ Test uses administrative credentials to creates a
+ DVR (Distributed Virtual Routing) router using the
+ distributed=True.
+
+ Acceptance
+ The router is created and the "distributed" attribute is
+ set to True
+ """
+ name = data_utils.rand_name('router')
+ router = self.admin_client.create_router(name, distributed=True)
+ self.addCleanup(self.admin_client.delete_router,
+ router['router']['id'])
+ self.assertTrue(router['router']['distributed'])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('8a0a72b4-7290-4677-afeb-b4ffe37bc352')
+ def test_centralized_router_creation(self):
+ """
+ Test uses administrative credentials to creates a
+ CVR (Centralized Virtual Routing) router using the
+ distributed=False.
+
+ Acceptance
+ The router is created and the "distributed" attribute is
+ set to False, thus making it a "Centralized Virtual Router"
+ as opposed to a "Distributed Virtual Router"
+ """
+ name = data_utils.rand_name('router')
+ router = self.admin_client.create_router(name, distributed=False)
+ self.addCleanup(self.admin_client.delete_router,
+ router['router']['id'])
+ self.assertFalse(router['router']['distributed'])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('acd43596-c1fb-439d-ada8-31ad48ae3c2e')
+ def test_centralized_router_update_to_dvr(self):
+ """
+ Test uses administrative credentials to creates a
+ CVR (Centralized Virtual Routing) router using the
+ distributed=False.Then it will "update" the router
+ distributed attribute to True
+
+ Acceptance
+ The router is created and the "distributed" attribute is
+ set to False. Once the router is updated, the distributed
+ attribute will be set to True
+ """
+ name = data_utils.rand_name('router')
+ # router needs to be in admin state down in order to be upgraded to DVR
+ router = self.admin_client.create_router(name, distributed=False,
+ admin_state_up=False)
+ self.addCleanup(self.admin_client.delete_router,
+ router['router']['id'])
+ self.assertFalse(router['router']['distributed'])
+ router = self.admin_client.update_router(router['router']['id'],
+ distributed=True)
+ self.assertTrue(router['router']['distributed'])
diff --git a/neutron/tests/tempest/api/admin/test_shared_network_extension.py b/neutron/tests/tempest/api/admin/test_shared_network_extension.py
new file mode 100644
index 0000000..4c7bf6a
--- /dev/null
+++ b/neutron/tests/tempest/api/admin/test_shared_network_extension.py
@@ -0,0 +1,421 @@
+# Copyright 2015 Hewlett-Packard Development Company, L.P.dsvsv
+# Copyright 2015 OpenStack Foundation
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import uuid
+
+from tempest.lib.common.utils import data_utils
+from tempest.lib import exceptions as lib_exc
+from tempest import test
+import testtools
+
+from neutron.tests.tempest.api import base
+
+
+class SharedNetworksTest(base.BaseAdminNetworkTest):
+
+ @classmethod
+ def resource_setup(cls):
+ super(SharedNetworksTest, cls).resource_setup()
+ cls.shared_network = cls.create_shared_network()
+
+ @test.idempotent_id('6661d219-b96d-4597-ad10-55766123421a')
+ def test_filtering_shared_networks(self):
+ # this test is necessary because the 'shared' column does not actually
+ # exist on networks so the filter function has to translate it into
+ # queries against the RBAC table
+ self.create_network()
+ self._check_shared_correct(
+ self.client.list_networks(shared=True)['networks'], True)
+ self._check_shared_correct(
+ self.admin_client.list_networks(shared=True)['networks'], True)
+ self._check_shared_correct(
+ self.client.list_networks(shared=False)['networks'], False)
+ self._check_shared_correct(
+ self.admin_client.list_networks(shared=False)['networks'], False)
+
+ def _check_shared_correct(self, items, shared):
+ self.assertNotEmpty(items)
+ self.assertTrue(all(n['shared'] == shared for n in items))
+
+ @test.idempotent_id('6661d219-b96d-4597-ad10-51672353421a')
+ def test_filtering_shared_subnets(self):
+ # shared subnets need to be tested because their shared status isn't
+ # visible as a regular API attribute and it's solely dependent on the
+ # parent network
+ reg = self.create_network()
+ priv = self.create_subnet(reg, client=self.client)
+ shared = self.create_subnet(self.shared_network,
+ client=self.admin_client)
+ self.assertIn(shared, self.client.list_subnets(shared=True)['subnets'])
+ self.assertIn(shared,
+ self.admin_client.list_subnets(shared=True)['subnets'])
+ self.assertNotIn(priv,
+ self.client.list_subnets(shared=True)['subnets'])
+ self.assertNotIn(priv,
+ self.admin_client.list_subnets(shared=True)['subnets'])
+ self.assertIn(priv, self.client.list_subnets(shared=False)['subnets'])
+ self.assertIn(priv,
+ self.admin_client.list_subnets(shared=False)['subnets'])
+ self.assertNotIn(shared,
+ self.client.list_subnets(shared=False)['subnets'])
+ self.assertNotIn(shared,
+ self.admin_client.list_subnets(shared=False)['subnets'])
+
+ @test.idempotent_id('6661d219-b96d-4597-ad10-55766ce4abf7')
+ def test_create_update_shared_network(self):
+ shared_network = self.create_shared_network()
+ net_id = shared_network['id']
+ self.assertEqual('ACTIVE', shared_network['status'])
+ self.assertIsNotNone(shared_network['id'])
+ self.assertTrue(self.shared_network['shared'])
+ new_name = "New_shared_network"
+ body = self.admin_client.update_network(net_id, name=new_name,
+ admin_state_up=False,
+ shared=False)
+ updated_net = body['network']
+ self.assertEqual(new_name, updated_net['name'])
+ self.assertFalse(updated_net['shared'])
+ self.assertFalse(updated_net['admin_state_up'])
+
+ @test.idempotent_id('9c31fabb-0181-464f-9ace-95144fe9ca77')
+ def test_create_port_shared_network_as_non_admin_tenant(self):
+ # create a port as non admin
+ body = self.client.create_port(network_id=self.shared_network['id'])
+ port = body['port']
+ self.addCleanup(self.admin_client.delete_port, port['id'])
+ # verify the tenant id of admin network and non admin port
+ self.assertNotEqual(self.shared_network['tenant_id'],
+ port['tenant_id'])
+
+ @test.idempotent_id('3e39c4a6-9caf-4710-88f1-d20073c6dd76')
+ def test_create_bulk_shared_network(self):
+ # Creates 2 networks in one request
+ net_nm = [data_utils.rand_name('network'),
+ data_utils.rand_name('network')]
+ body = self.admin_client.create_bulk_network(net_nm, shared=True)
+ created_networks = body['networks']
+ for net in created_networks:
+ self.addCleanup(self.admin_client.delete_network, net['id'])
+ self.assertIsNotNone(net['id'])
+ self.assertTrue(net['shared'])
+
+ def _list_shared_networks(self, user):
+ body = user.list_networks(shared=True)
+ networks_list = [net['id'] for net in body['networks']]
+ self.assertIn(self.shared_network['id'], networks_list)
+ self.assertTrue(self.shared_network['shared'])
+
+ @test.idempotent_id('a064a9fd-e02f-474a-8159-f828cd636a28')
+ def test_list_shared_networks(self):
+ # List the shared networks and confirm that
+ # shared network extension attribute is returned for those networks
+ # that are created as shared
+ self._list_shared_networks(self.admin_client)
+ self._list_shared_networks(self.client)
+
+ def _show_shared_network(self, user):
+ body = user.show_network(self.shared_network['id'])
+ show_shared_net = body['network']
+ self.assertEqual(self.shared_network['name'], show_shared_net['name'])
+ self.assertEqual(self.shared_network['id'], show_shared_net['id'])
+ self.assertTrue(show_shared_net['shared'])
+
+ @test.idempotent_id('e03c92a2-638d-4bfa-b50a-b1f66f087e58')
+ def test_show_shared_networks_attribute(self):
+ # Show a shared network and confirm that
+ # shared network extension attribute is returned.
+ self._show_shared_network(self.admin_client)
+ self._show_shared_network(self.client)
+
+
+class AllowedAddressPairSharedNetworkTest(base.BaseAdminNetworkTest):
+ allowed_address_pairs = [{'ip_address': '1.1.1.1'}]
+
+ @classmethod
+ @test.requires_ext(extension="allowed-address-pairs", service="network")
+ def skip_checks(cls):
+ super(AllowedAddressPairSharedNetworkTest, cls).skip_checks()
+
+ @classmethod
+ def resource_setup(cls):
+ super(AllowedAddressPairSharedNetworkTest, cls).resource_setup()
+ cls.network = cls.create_shared_network()
+ cls.create_subnet(cls.network, client=cls.admin_client)
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('86c3529b-1231-40de-803c-ffffffff1fff')
+ def test_create_with_address_pair_blocked_on_other_network(self):
+ with testtools.ExpectedException(lib_exc.Forbidden):
+ self.create_port(self.network,
+ allowed_address_pairs=self.allowed_address_pairs)
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('86c3529b-1231-40de-803c-ffffffff2fff')
+ def test_update_with_address_pair_blocked_on_other_network(self):
+ port = self.create_port(self.network)
+ with testtools.ExpectedException(lib_exc.Forbidden):
+ self.update_port(
+ port, allowed_address_pairs=self.allowed_address_pairs)
+
+
+class RBACSharedNetworksTest(base.BaseAdminNetworkTest):
+
+ force_tenant_isolation = True
+ credentials = ['primary', 'alt', 'admin']
+
+ @classmethod
+ @test.requires_ext(extension="rbac-policies", service="network")
+ def resource_setup(cls):
+ super(RBACSharedNetworksTest, cls).resource_setup()
+ cls.client2 = cls.alt_manager.network_client
+
+ def _make_admin_net_and_subnet_shared_to_tenant_id(self, tenant_id):
+ net = self.admin_client.create_network(
+ name=data_utils.rand_name('test-network-'))['network']
+ self.addCleanup(self.admin_client.delete_network, net['id'])
+ subnet = self.create_subnet(net, client=self.admin_client)
+ # network is shared to first unprivileged client by default
+ pol = self.admin_client.create_rbac_policy(
+ object_type='network', object_id=net['id'],
+ action='access_as_shared', target_tenant=tenant_id
+ )['rbac_policy']
+ return {'network': net, 'subnet': subnet, 'policy': pol}
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('86c3529b-1231-40de-803c-afffffff1fff')
+ def test_network_only_visible_to_policy_target(self):
+ net = self._make_admin_net_and_subnet_shared_to_tenant_id(
+ self.client.tenant_id)['network']
+ self.client.show_network(net['id'])
+ with testtools.ExpectedException(lib_exc.NotFound):
+ # client2 has not been granted access
+ self.client2.show_network(net['id'])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('86c3529b-1231-40de-803c-afffffff2fff')
+ def test_subnet_on_network_only_visible_to_policy_target(self):
+ sub = self._make_admin_net_and_subnet_shared_to_tenant_id(
+ self.client.tenant_id)['subnet']
+ self.client.show_subnet(sub['id'])
+ with testtools.ExpectedException(lib_exc.NotFound):
+ # client2 has not been granted access
+ self.client2.show_subnet(sub['id'])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('86c3529b-1231-40de-803c-afffffff2eee')
+ def test_policy_target_update(self):
+ res = self._make_admin_net_and_subnet_shared_to_tenant_id(
+ self.client.tenant_id)
+ # change to client2
+ update_res = self.admin_client.update_rbac_policy(
+ res['policy']['id'], target_tenant=self.client2.tenant_id)
+ self.assertEqual(self.client2.tenant_id,
+ update_res['rbac_policy']['target_tenant'])
+ # make sure everything else stayed the same
+ res['policy'].pop('target_tenant')
+ update_res['rbac_policy'].pop('target_tenant')
+ self.assertEqual(res['policy'], update_res['rbac_policy'])
+
+ @test.idempotent_id('86c3529b-1231-40de-803c-affefefef321')
+ def test_duplicate_policy_error(self):
+ res = self._make_admin_net_and_subnet_shared_to_tenant_id(
+ self.client.tenant_id)
+ with testtools.ExpectedException(lib_exc.Conflict):
+ self.admin_client.create_rbac_policy(
+ object_type='network', object_id=res['network']['id'],
+ action='access_as_shared', target_tenant=self.client.tenant_id)
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('86c3529b-1231-40de-803c-afffffff3fff')
+ def test_port_presence_prevents_network_rbac_policy_deletion(self):
+ res = self._make_admin_net_and_subnet_shared_to_tenant_id(
+ self.client.tenant_id)
+ port = self.client.create_port(network_id=res['network']['id'])['port']
+ # a port on the network should prevent the deletion of a policy
+ # required for it to exist
+ with testtools.ExpectedException(lib_exc.Conflict):
+ self.admin_client.delete_rbac_policy(res['policy']['id'])
+
+ # a wildcard policy should allow the specific policy to be deleted
+ # since it allows the remaining port
+ wild = self.admin_client.create_rbac_policy(
+ object_type='network', object_id=res['network']['id'],
+ action='access_as_shared', target_tenant='*')['rbac_policy']
+ self.admin_client.delete_rbac_policy(res['policy']['id'])
+
+ # now that wildcard is the only remaining, it should be subjected to
+ # to the same restriction
+ with testtools.ExpectedException(lib_exc.Conflict):
+ self.admin_client.delete_rbac_policy(wild['id'])
+ # similarly, we can't update the policy to a different tenant
+ with testtools.ExpectedException(lib_exc.Conflict):
+ self.admin_client.update_rbac_policy(
+ wild['id'], target_tenant=self.client2.tenant_id)
+
+ self.client.delete_port(port['id'])
+ # anchor is gone, delete should pass
+ self.admin_client.delete_rbac_policy(wild['id'])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('86c3529b-1231-40de-803c-beefbeefbeef')
+ def test_tenant_can_delete_port_on_own_network(self):
+ net = self.create_network() # owned by self.client
+ self.client.create_rbac_policy(
+ object_type='network', object_id=net['id'],
+ action='access_as_shared', target_tenant=self.client2.tenant_id)
+ port = self.client2.create_port(network_id=net['id'])['port']
+ self.client.delete_port(port['id'])
+
+ @test.idempotent_id('f7539232-389a-4e9c-9e37-e42a129eb541')
+ def test_tenant_cant_delete_other_tenants_ports(self):
+ net = self.create_network()
+ port = self.client.create_port(network_id=net['id'])['port']
+ self.addCleanup(self.client.delete_port, port['id'])
+ with testtools.ExpectedException(lib_exc.NotFound):
+ self.client2.delete_port(port['id'])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('86c3529b-1231-40de-803c-afffffff4fff')
+ def test_regular_client_shares_to_another_regular_client(self):
+ net = self.create_network() # owned by self.client
+ with testtools.ExpectedException(lib_exc.NotFound):
+ self.client2.show_network(net['id'])
+ pol = self.client.create_rbac_policy(
+ object_type='network', object_id=net['id'],
+ action='access_as_shared', target_tenant=self.client2.tenant_id)
+ self.client2.show_network(net['id'])
+
+ self.assertIn(pol['rbac_policy'],
+ self.client.list_rbac_policies()['rbac_policies'])
+ # ensure that 'client2' can't see the policy sharing the network to it
+ # because the policy belongs to 'client'
+ self.assertNotIn(pol['rbac_policy']['id'],
+ [p['id']
+ for p in self.client2.list_rbac_policies()['rbac_policies']])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('bf5052b8-b11e-407c-8e43-113447404d3e')
+ def test_filter_fields(self):
+ net = self.create_network()
+ self.client.create_rbac_policy(
+ object_type='network', object_id=net['id'],
+ action='access_as_shared', target_tenant=self.client2.tenant_id)
+ field_args = (('id',), ('id', 'action'), ('object_type', 'object_id'),
+ ('tenant_id', 'target_tenant'))
+ for fields in field_args:
+ res = self.client.list_rbac_policies(fields=fields)
+ self.assertEqual(set(fields), set(res['rbac_policies'][0].keys()))
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('86c3529b-1231-40de-803c-afffffff5fff')
+ def test_policy_show(self):
+ res = self._make_admin_net_and_subnet_shared_to_tenant_id(
+ self.client.tenant_id)
+ p1 = res['policy']
+ p2 = self.admin_client.create_rbac_policy(
+ object_type='network', object_id=res['network']['id'],
+ action='access_as_shared',
+ target_tenant='*')['rbac_policy']
+
+ self.assertEqual(
+ p1, self.admin_client.show_rbac_policy(p1['id'])['rbac_policy'])
+ self.assertEqual(
+ p2, self.admin_client.show_rbac_policy(p2['id'])['rbac_policy'])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('e7bcb1ea-4877-4266-87bb-76f68b421f31')
+ def test_filter_policies(self):
+ net = self.create_network()
+ pol1 = self.client.create_rbac_policy(
+ object_type='network', object_id=net['id'],
+ action='access_as_shared',
+ target_tenant=self.client2.tenant_id)['rbac_policy']
+ pol2 = self.client.create_rbac_policy(
+ object_type='network', object_id=net['id'],
+ action='access_as_shared',
+ target_tenant=self.client.tenant_id)['rbac_policy']
+ res1 = self.client.list_rbac_policies(id=pol1['id'])['rbac_policies']
+ res2 = self.client.list_rbac_policies(id=pol2['id'])['rbac_policies']
+ self.assertEqual(1, len(res1))
+ self.assertEqual(1, len(res2))
+ self.assertEqual(pol1['id'], res1[0]['id'])
+ self.assertEqual(pol2['id'], res2[0]['id'])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('86c3529b-1231-40de-803c-afffffff6fff')
+ def test_regular_client_blocked_from_sharing_anothers_network(self):
+ net = self._make_admin_net_and_subnet_shared_to_tenant_id(
+ self.client.tenant_id)['network']
+ with testtools.ExpectedException(lib_exc.BadRequest):
+ self.client.create_rbac_policy(
+ object_type='network', object_id=net['id'],
+ action='access_as_shared', target_tenant=self.client.tenant_id)
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('c5f8f785-ce8d-4430-af7e-a236205862fb')
+ @test.requires_ext(extension="quotas", service="network")
+ def test_rbac_policy_quota(self):
+ quota = self.client.show_quotas(self.client.tenant_id)['quota']
+ max_policies = quota['rbac_policy']
+ self.assertGreater(max_policies, 0)
+ net = self.client.create_network(
+ name=data_utils.rand_name('test-network-'))['network']
+ self.addCleanup(self.client.delete_network, net['id'])
+ with testtools.ExpectedException(lib_exc.Conflict):
+ for i in range(0, max_policies + 1):
+ self.admin_client.create_rbac_policy(
+ object_type='network', object_id=net['id'],
+ action='access_as_shared',
+ target_tenant=str(uuid.uuid4()).replace('-', ''))
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('86c3529b-1231-40de-803c-afffffff7fff')
+ def test_regular_client_blocked_from_sharing_with_wildcard(self):
+ net = self.create_network()
+ with testtools.ExpectedException(lib_exc.Forbidden):
+ self.client.create_rbac_policy(
+ object_type='network', object_id=net['id'],
+ action='access_as_shared', target_tenant='*')
+ # ensure it works on update as well
+ pol = self.client.create_rbac_policy(
+ object_type='network', object_id=net['id'],
+ action='access_as_shared', target_tenant=self.client2.tenant_id)
+ with testtools.ExpectedException(lib_exc.Forbidden):
+ self.client.update_rbac_policy(pol['rbac_policy']['id'],
+ target_tenant='*')
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('86c3529b-1231-40de-803c-aeeeeeee7fff')
+ def test_filtering_works_with_rbac_records_present(self):
+ resp = self._make_admin_net_and_subnet_shared_to_tenant_id(
+ self.client.tenant_id)
+ net = resp['network']['id']
+ sub = resp['subnet']['id']
+ self.admin_client.create_rbac_policy(
+ object_type='network', object_id=net,
+ action='access_as_shared', target_tenant='*')
+ self._assert_shared_object_id_listing_presence('subnets', False, sub)
+ self._assert_shared_object_id_listing_presence('subnets', True, sub)
+ self._assert_shared_object_id_listing_presence('networks', False, net)
+ self._assert_shared_object_id_listing_presence('networks', True, net)
+
+ def _assert_shared_object_id_listing_presence(self, resource, shared, oid):
+ lister = getattr(self.admin_client, 'list_%s' % resource)
+ objects = [o['id'] for o in lister(shared=shared)[resource]]
+ if shared:
+ self.assertIn(oid, objects)
+ else:
+ self.assertNotIn(oid, objects)
diff --git a/neutron/tests/tempest/api/base.py b/neutron/tests/tempest/api/base.py
new file mode 100644
index 0000000..c1bd622
--- /dev/null
+++ b/neutron/tests/tempest/api/base.py
@@ -0,0 +1,462 @@
+# Copyright 2012 OpenStack Foundation
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import netaddr
+from tempest.lib.common.utils import data_utils
+from tempest.lib import exceptions as lib_exc
+from tempest import test
+
+from neutron.tests.tempest.api import clients
+from neutron.tests.tempest import config
+from neutron.tests.tempest import exceptions
+
+CONF = config.CONF
+
+
+class BaseNetworkTest(test.BaseTestCase):
+
+ """
+ Base class for the Neutron tests that use the Tempest Neutron REST client
+
+ Per the Neutron API Guide, API v1.x was removed from the source code tree
+ (docs.openstack.org/api/openstack-network/2.0/content/Overview-d1e71.html)
+ Therefore, v2.x of the Neutron API is assumed. It is also assumed that the
+ following options are defined in the [network] section of etc/tempest.conf:
+
+ project_network_cidr with a block of cidr's from which smaller blocks
+ can be allocated for tenant networks
+
+ project_network_mask_bits with the mask bits to be used to partition
+ the block defined by tenant-network_cidr
+
+ Finally, it is assumed that the following option is defined in the
+ [service_available] section of etc/tempest.conf
+
+ neutron as True
+ """
+
+ force_tenant_isolation = False
+ credentials = ['primary']
+
+ # Default to ipv4.
+ _ip_version = 4
+
+ @classmethod
+ def get_client_manager(cls, credential_type=None, roles=None,
+ force_new=None):
+ manager = test.BaseTestCase.get_client_manager(
+ credential_type=credential_type,
+ roles=roles,
+ force_new=force_new)
+ # Neutron uses a different clients manager than the one in the Tempest
+ return clients.Manager(manager.credentials)
+
+ @classmethod
+ def skip_checks(cls):
+ super(BaseNetworkTest, cls).skip_checks()
+ if not CONF.service_available.neutron:
+ raise cls.skipException("Neutron support is required")
+ if cls._ip_version == 6 and not CONF.network_feature_enabled.ipv6:
+ raise cls.skipException("IPv6 Tests are disabled.")
+
+ @classmethod
+ def setup_credentials(cls):
+ # Create no network resources for these test.
+ cls.set_network_resources()
+ super(BaseNetworkTest, cls).setup_credentials()
+
+ @classmethod
+ def setup_clients(cls):
+ super(BaseNetworkTest, cls).setup_clients()
+ cls.client = cls.os.network_client
+
+ @classmethod
+ def resource_setup(cls):
+ super(BaseNetworkTest, cls).resource_setup()
+
+ cls.networks = []
+ cls.shared_networks = []
+ cls.subnets = []
+ cls.ports = []
+ cls.routers = []
+ cls.floating_ips = []
+ cls.metering_labels = []
+ cls.service_profiles = []
+ cls.flavors = []
+ cls.metering_label_rules = []
+ cls.qos_rules = []
+ cls.qos_policies = []
+ cls.ethertype = "IPv" + str(cls._ip_version)
+ cls.address_scopes = []
+ cls.admin_address_scopes = []
+ cls.subnetpools = []
+ cls.admin_subnetpools = []
+
+ @classmethod
+ def resource_cleanup(cls):
+ if CONF.service_available.neutron:
+ # Clean up QoS rules
+ for qos_rule in cls.qos_rules:
+ cls._try_delete_resource(cls.admin_client.delete_qos_rule,
+ qos_rule['id'])
+ # Clean up QoS policies
+ for qos_policy in cls.qos_policies:
+ cls._try_delete_resource(cls.admin_client.delete_qos_policy,
+ qos_policy['id'])
+ # Clean up floating IPs
+ for floating_ip in cls.floating_ips:
+ cls._try_delete_resource(cls.client.delete_floatingip,
+ floating_ip['id'])
+ # Clean up routers
+ for router in cls.routers:
+ cls._try_delete_resource(cls.delete_router,
+ router)
+ # Clean up metering label rules
+ for metering_label_rule in cls.metering_label_rules:
+ cls._try_delete_resource(
+ cls.admin_client.delete_metering_label_rule,
+ metering_label_rule['id'])
+ # Clean up metering labels
+ for metering_label in cls.metering_labels:
+ cls._try_delete_resource(
+ cls.admin_client.delete_metering_label,
+ metering_label['id'])
+ # Clean up flavors
+ for flavor in cls.flavors:
+ cls._try_delete_resource(
+ cls.admin_client.delete_flavor,
+ flavor['id'])
+ # Clean up service profiles
+ for service_profile in cls.service_profiles:
+ cls._try_delete_resource(
+ cls.admin_client.delete_service_profile,
+ service_profile['id'])
+ # Clean up ports
+ for port in cls.ports:
+ cls._try_delete_resource(cls.client.delete_port,
+ port['id'])
+ # Clean up subnets
+ for subnet in cls.subnets:
+ cls._try_delete_resource(cls.client.delete_subnet,
+ subnet['id'])
+ # Clean up networks
+ for network in cls.networks:
+ cls._try_delete_resource(cls.client.delete_network,
+ network['id'])
+
+ # Clean up shared networks
+ for network in cls.shared_networks:
+ cls._try_delete_resource(cls.admin_client.delete_network,
+ network['id'])
+
+ for subnetpool in cls.subnetpools:
+ cls._try_delete_resource(cls.client.delete_subnetpool,
+ subnetpool['id'])
+
+ for subnetpool in cls.admin_subnetpools:
+ cls._try_delete_resource(cls.admin_client.delete_subnetpool,
+ subnetpool['id'])
+
+ for address_scope in cls.address_scopes:
+ cls._try_delete_resource(cls.client.delete_address_scope,
+ address_scope['id'])
+
+ for address_scope in cls.admin_address_scopes:
+ cls._try_delete_resource(
+ cls.admin_client.delete_address_scope,
+ address_scope['id'])
+
+ super(BaseNetworkTest, cls).resource_cleanup()
+
+ @classmethod
+ def _try_delete_resource(cls, delete_callable, *args, **kwargs):
+ """Cleanup resources in case of test-failure
+
+ Some resources are explicitly deleted by the test.
+ If the test failed to delete a resource, this method will execute
+ the appropriate delete methods. Otherwise, the method ignores NotFound
+ exceptions thrown for resources that were correctly deleted by the
+ test.
+
+ :param delete_callable: delete method
+ :param args: arguments for delete method
+ :param kwargs: keyword arguments for delete method
+ """
+ try:
+ delete_callable(*args, **kwargs)
+ # if resource is not found, this means it was deleted in the test
+ except lib_exc.NotFound:
+ pass
+
+ @classmethod
+ def create_network(cls, network_name=None, **kwargs):
+ """Wrapper utility that returns a test network."""
+ network_name = network_name or data_utils.rand_name('test-network-')
+
+ body = cls.client.create_network(name=network_name, **kwargs)
+ network = body['network']
+ cls.networks.append(network)
+ return network
+
+ @classmethod
+ def create_shared_network(cls, network_name=None, **post_body):
+ network_name = network_name or data_utils.rand_name('sharednetwork-')
+ post_body.update({'name': network_name, 'shared': True})
+ body = cls.admin_client.create_network(**post_body)
+ network = body['network']
+ cls.shared_networks.append(network)
+ return network
+
+ @classmethod
+ def create_subnet(cls, network, gateway='', cidr=None, mask_bits=None,
+ ip_version=None, client=None, **kwargs):
+ """Wrapper utility that returns a test subnet."""
+
+ # allow tests to use admin client
+ if not client:
+ client = cls.client
+
+ # The cidr and mask_bits depend on the ip version.
+ ip_version = ip_version if ip_version is not None else cls._ip_version
+ gateway_not_set = gateway == ''
+ if ip_version == 4:
+ cidr = cidr or netaddr.IPNetwork(
+ config.safe_get_config_value(
+ 'network', 'project_network_cidr'))
+ mask_bits = (
+ mask_bits or config.safe_get_config_value(
+ 'network', 'project_network_mask_bits'))
+ elif ip_version == 6:
+ cidr = (
+ cidr or netaddr.IPNetwork(
+ config.safe_get_config_value(
+ 'network', 'project_network_v6_cidr')))
+ mask_bits = (
+ mask_bits or config.safe_get_config_value(
+ 'network', 'project_network_v6_mask_bits'))
+ # Find a cidr that is not in use yet and create a subnet with it
+ for subnet_cidr in cidr.subnet(mask_bits):
+ if gateway_not_set:
+ gateway_ip = str(netaddr.IPAddress(subnet_cidr) + 1)
+ else:
+ gateway_ip = gateway
+ try:
+ body = client.create_subnet(
+ network_id=network['id'],
+ cidr=str(subnet_cidr),
+ ip_version=ip_version,
+ gateway_ip=gateway_ip,
+ **kwargs)
+ break
+ except lib_exc.BadRequest as e:
+ is_overlapping_cidr = 'overlaps with another subnet' in str(e)
+ if not is_overlapping_cidr:
+ raise
+ else:
+ message = 'Available CIDR for subnet creation could not be found'
+ raise ValueError(message)
+ subnet = body['subnet']
+ cls.subnets.append(subnet)
+ return subnet
+
+ @classmethod
+ def create_port(cls, network, **kwargs):
+ """Wrapper utility that returns a test port."""
+ body = cls.client.create_port(network_id=network['id'],
+ **kwargs)
+ port = body['port']
+ cls.ports.append(port)
+ return port
+
+ @classmethod
+ def update_port(cls, port, **kwargs):
+ """Wrapper utility that updates a test port."""
+ body = cls.client.update_port(port['id'],
+ **kwargs)
+ return body['port']
+
+ @classmethod
+ def create_router(cls, router_name=None, admin_state_up=False,
+ external_network_id=None, enable_snat=None,
+ **kwargs):
+ ext_gw_info = {}
+ if external_network_id:
+ ext_gw_info['network_id'] = external_network_id
+ if enable_snat:
+ ext_gw_info['enable_snat'] = enable_snat
+ body = cls.client.create_router(
+ router_name, external_gateway_info=ext_gw_info,
+ admin_state_up=admin_state_up, **kwargs)
+ router = body['router']
+ cls.routers.append(router)
+ return router
+
+ @classmethod
+ def create_floatingip(cls, external_network_id):
+ """Wrapper utility that returns a test floating IP."""
+ body = cls.client.create_floatingip(
+ floating_network_id=external_network_id)
+ fip = body['floatingip']
+ cls.floating_ips.append(fip)
+ return fip
+
+ @classmethod
+ def create_router_interface(cls, router_id, subnet_id):
+ """Wrapper utility that returns a router interface."""
+ interface = cls.client.add_router_interface_with_subnet_id(
+ router_id, subnet_id)
+ return interface
+
+ @classmethod
+ def create_qos_policy(cls, name, description, shared, tenant_id=None):
+ """Wrapper utility that returns a test QoS policy."""
+ body = cls.admin_client.create_qos_policy(
+ name, description, shared, tenant_id)
+ qos_policy = body['policy']
+ cls.qos_policies.append(qos_policy)
+ return qos_policy
+
+ @classmethod
+ def create_qos_bandwidth_limit_rule(cls, policy_id,
+ max_kbps, max_burst_kbps):
+ """Wrapper utility that returns a test QoS bandwidth limit rule."""
+ body = cls.admin_client.create_bandwidth_limit_rule(
+ policy_id, max_kbps, max_burst_kbps)
+ qos_rule = body['bandwidth_limit_rule']
+ cls.qos_rules.append(qos_rule)
+ return qos_rule
+
+ @classmethod
+ def delete_router(cls, router):
+ body = cls.client.list_router_interfaces(router['id'])
+ interfaces = body['ports']
+ for i in interfaces:
+ try:
+ cls.client.remove_router_interface_with_subnet_id(
+ router['id'], i['fixed_ips'][0]['subnet_id'])
+ except lib_exc.NotFound:
+ pass
+ cls.client.delete_router(router['id'])
+
+ @classmethod
+ def create_address_scope(cls, name, is_admin=False, **kwargs):
+ if is_admin:
+ body = cls.admin_client.create_address_scope(name=name, **kwargs)
+ cls.admin_address_scopes.append(body['address_scope'])
+ else:
+ body = cls.client.create_address_scope(name=name, **kwargs)
+ cls.address_scopes.append(body['address_scope'])
+ return body['address_scope']
+
+ @classmethod
+ def create_subnetpool(cls, name, is_admin=False, **kwargs):
+ if is_admin:
+ body = cls.admin_client.create_subnetpool(name, **kwargs)
+ cls.admin_subnetpools.append(body['subnetpool'])
+ else:
+ body = cls.client.create_subnetpool(name, **kwargs)
+ cls.subnetpools.append(body['subnetpool'])
+ return body['subnetpool']
+
+
+class BaseAdminNetworkTest(BaseNetworkTest):
+
+ credentials = ['primary', 'admin']
+
+ @classmethod
+ def setup_clients(cls):
+ super(BaseAdminNetworkTest, cls).setup_clients()
+ cls.admin_client = cls.os_adm.network_client
+ cls.identity_admin_client = cls.os_adm.tenants_client
+
+ @classmethod
+ def create_metering_label(cls, name, description):
+ """Wrapper utility that returns a test metering label."""
+ body = cls.admin_client.create_metering_label(
+ description=description,
+ name=data_utils.rand_name("metering-label"))
+ metering_label = body['metering_label']
+ cls.metering_labels.append(metering_label)
+ return metering_label
+
+ @classmethod
+ def create_metering_label_rule(cls, remote_ip_prefix, direction,
+ metering_label_id):
+ """Wrapper utility that returns a test metering label rule."""
+ body = cls.admin_client.create_metering_label_rule(
+ remote_ip_prefix=remote_ip_prefix, direction=direction,
+ metering_label_id=metering_label_id)
+ metering_label_rule = body['metering_label_rule']
+ cls.metering_label_rules.append(metering_label_rule)
+ return metering_label_rule
+
+ @classmethod
+ def create_flavor(cls, name, description, service_type):
+ """Wrapper utility that returns a test flavor."""
+ body = cls.admin_client.create_flavor(
+ description=description, service_type=service_type,
+ name=name)
+ flavor = body['flavor']
+ cls.flavors.append(flavor)
+ return flavor
+
+ @classmethod
+ def create_service_profile(cls, description, metainfo, driver):
+ """Wrapper utility that returns a test service profile."""
+ body = cls.admin_client.create_service_profile(
+ driver=driver, metainfo=metainfo, description=description)
+ service_profile = body['service_profile']
+ cls.service_profiles.append(service_profile)
+ return service_profile
+
+ @classmethod
+ def get_unused_ip(cls, net_id, ip_version=None):
+ """Get an unused ip address in a allocaion pool of net"""
+ body = cls.admin_client.list_ports(network_id=net_id)
+ ports = body['ports']
+ used_ips = []
+ for port in ports:
+ used_ips.extend(
+ [fixed_ip['ip_address'] for fixed_ip in port['fixed_ips']])
+ body = cls.admin_client.list_subnets(network_id=net_id)
+ subnets = body['subnets']
+
+ for subnet in subnets:
+ if ip_version and subnet['ip_version'] != ip_version:
+ continue
+ cidr = subnet['cidr']
+ allocation_pools = subnet['allocation_pools']
+ iterators = []
+ if allocation_pools:
+ for allocation_pool in allocation_pools:
+ iterators.append(netaddr.iter_iprange(
+ allocation_pool['start'], allocation_pool['end']))
+ else:
+ net = netaddr.IPNetwork(cidr)
+
+ def _iterip():
+ for ip in net:
+ if ip not in (net.network, net.broadcast):
+ yield ip
+ iterators.append(iter(_iterip()))
+
+ for iterator in iterators:
+ for ip in iterator:
+ if str(ip) not in used_ips:
+ return str(ip)
+
+ message = (
+ "net(%s) has no usable IP address in allocation pools" % net_id)
+ raise exceptions.InvalidConfiguration(message)
diff --git a/neutron/tests/tempest/api/base_routers.py b/neutron/tests/tempest/api/base_routers.py
new file mode 100644
index 0000000..8b0b5a4
--- /dev/null
+++ b/neutron/tests/tempest/api/base_routers.py
@@ -0,0 +1,45 @@
+# Copyright 2013 OpenStack Foundation
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from neutron.tests.tempest.api import base
+
+
+class BaseRouterTest(base.BaseAdminNetworkTest):
+ # NOTE(salv-orlando): This class inherits from BaseAdminNetworkTest
+ # as some router operations, such as enabling or disabling SNAT
+ # require admin credentials by default
+
+ def _cleanup_router(self, router):
+ self.delete_router(router)
+ self.routers.remove(router)
+
+ def _create_router(self, name, admin_state_up=False,
+ external_network_id=None, enable_snat=None):
+ # associate a cleanup with created routers to avoid quota limits
+ router = self.create_router(name, admin_state_up,
+ external_network_id, enable_snat)
+ self.addCleanup(self._cleanup_router, router)
+ return router
+
+ def _delete_router(self, router_id, network_client=None):
+ client = network_client or self.client
+ client.delete_router(router_id)
+ # Asserting that the router is not found in the list
+ # after deletion
+ list_body = self.client.list_routers()
+ routers_list = list()
+ for router in list_body['routers']:
+ routers_list.append(router['id'])
+ self.assertNotIn(router_id, routers_list)
diff --git a/neutron/tests/tempest/api/base_security_groups.py b/neutron/tests/tempest/api/base_security_groups.py
new file mode 100644
index 0000000..8575ba9
--- /dev/null
+++ b/neutron/tests/tempest/api/base_security_groups.py
@@ -0,0 +1,45 @@
+# Copyright 2013 OpenStack Foundation
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from tempest.lib.common.utils import data_utils
+
+from neutron.tests.tempest.api import base
+
+
+class BaseSecGroupTest(base.BaseNetworkTest):
+
+ @classmethod
+ def resource_setup(cls):
+ super(BaseSecGroupTest, cls).resource_setup()
+
+ def _create_security_group(self, **kwargs):
+ # Create a security group
+ name = data_utils.rand_name('secgroup-')
+ group_create_body = self.client.create_security_group(name=name,
+ **kwargs)
+ self.addCleanup(self._delete_security_group,
+ group_create_body['security_group']['id'])
+ self.assertEqual(group_create_body['security_group']['name'], name)
+ return group_create_body, name
+
+ def _delete_security_group(self, secgroup_id):
+ self.client.delete_security_group(secgroup_id)
+ # Asserting that the security group is not found in the list
+ # after deletion
+ list_body = self.client.list_security_groups()
+ secgroup_list = list()
+ for secgroup in list_body['security_groups']:
+ secgroup_list.append(secgroup['id'])
+ self.assertNotIn(secgroup_id, secgroup_list)
diff --git a/neutron/tests/tempest/api/clients.py b/neutron/tests/tempest/api/clients.py
new file mode 100644
index 0000000..d9879b0
--- /dev/null
+++ b/neutron/tests/tempest/api/clients.py
@@ -0,0 +1,73 @@
+# Copyright 2012 OpenStack Foundation
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from tempest import manager
+from tempest.services.identity.v2.json.tenants_client import \
+ TenantsClient
+
+from neutron.tests.tempest import config
+from neutron.tests.tempest.services.network.json.network_client import \
+ NetworkClientJSON
+
+
+CONF = config.CONF
+
+
+class Manager(manager.Manager):
+
+ """
+ Top level manager for OpenStack tempest clients
+ """
+
+ default_params = {
+ 'disable_ssl_certificate_validation':
+ CONF.identity.disable_ssl_certificate_validation,
+ 'ca_certs': CONF.identity.ca_certificates_file,
+ 'trace_requests': CONF.debug.trace_requests
+ }
+
+ # NOTE: Tempest uses timeout values of compute API if project specific
+ # timeout values don't exist.
+ default_params_with_timeout_values = {
+ 'build_interval': CONF.compute.build_interval,
+ 'build_timeout': CONF.compute.build_timeout
+ }
+ default_params_with_timeout_values.update(default_params)
+
+ def __init__(self, credentials=None, service=None):
+ super(Manager, self).__init__(credentials=credentials)
+
+ self._set_identity_clients()
+
+ self.network_client = NetworkClientJSON(
+ self.auth_provider,
+ CONF.network.catalog_type,
+ CONF.network.region or CONF.identity.region,
+ endpoint_type=CONF.network.endpoint_type,
+ build_interval=CONF.network.build_interval,
+ build_timeout=CONF.network.build_timeout,
+ **self.default_params)
+
+ def _set_identity_clients(self):
+ params = {
+ 'service': CONF.identity.catalog_type,
+ 'region': CONF.identity.region
+ }
+ params.update(self.default_params_with_timeout_values)
+ params_v2_admin = params.copy()
+ params_v2_admin['endpoint_type'] = CONF.identity.v2_admin_endpoint_type
+ # Client uses admin endpoint type of Keystone API v2
+ self.tenants_client = TenantsClient(self.auth_provider,
+ **params_v2_admin)
diff --git a/neutron/tests/tempest/api/test_address_scopes.py b/neutron/tests/tempest/api/test_address_scopes.py
new file mode 100644
index 0000000..aedc552
--- /dev/null
+++ b/neutron/tests/tempest/api/test_address_scopes.py
@@ -0,0 +1,114 @@
+# Copyright (c) 2015 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from tempest.lib.common.utils import data_utils
+from tempest.lib import exceptions as lib_exc
+from tempest import test
+
+from neutron.tests.tempest.api import base
+
+
+ADDRESS_SCOPE_NAME = 'smoke-address-scope'
+
+
+class AddressScopeTestBase(base.BaseAdminNetworkTest):
+
+ @classmethod
+ @test.requires_ext(extension="address-scope", service="network")
+ def resource_setup(cls):
+ super(AddressScopeTestBase, cls).resource_setup()
+
+ def _create_address_scope(self, is_admin=False, **kwargs):
+ name = data_utils.rand_name(ADDRESS_SCOPE_NAME)
+ return self.create_address_scope(name=name, is_admin=is_admin,
+ **kwargs)
+
+ def _test_update_address_scope_helper(self, is_admin=False, shared=None):
+ address_scope = self._create_address_scope(is_admin=is_admin,
+ ip_version=4)
+
+ if is_admin:
+ client = self.admin_client
+ else:
+ client = self.client
+
+ kwargs = {'name': 'new_name'}
+ if shared is not None:
+ kwargs['shared'] = shared
+
+ client.update_address_scope(address_scope['id'], **kwargs)
+ body = client.show_address_scope(address_scope['id'])
+ address_scope = body['address_scope']
+ self.assertEqual('new_name', address_scope['name'])
+ return address_scope
+
+
+class AddressScopeTest(AddressScopeTestBase):
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('045f9294-8b1a-4848-b6a8-edf1b41e9d06')
+ def test_tenant_create_list_address_scope(self):
+ address_scope = self._create_address_scope(ip_version=4)
+ body = self.client.list_address_scopes()
+ returned_address_scopes = body['address_scopes']
+ self.assertIn(address_scope['id'],
+ [a_s['id'] for a_s in returned_address_scopes],
+ "Created address scope id should be in the list")
+ self.assertIn(address_scope['name'],
+ [a_s['name'] for a_s in returned_address_scopes],
+ "Created address scope name should be in the list")
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('85e0326b-4c75-4b92-bd6e-7c7de6aaf05c')
+ def test_show_address_scope(self):
+ address_scope = self._create_address_scope(ip_version=4)
+ body = self.client.show_address_scope(
+ address_scope['id'])
+ returned_address_scope = body['address_scope']
+ self.assertEqual(address_scope['id'], returned_address_scope['id'])
+ self.assertEqual(address_scope['name'],
+ returned_address_scope['name'])
+ self.assertFalse(returned_address_scope['shared'])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('85a259b2-ace6-4e32-9657-a9a392b452aa')
+ def test_tenant_update_address_scope(self):
+ self._test_update_address_scope_helper()
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('22b3b600-72a8-4b60-bc94-0f29dd6271df')
+ def test_delete_address_scope(self):
+ address_scope = self._create_address_scope(ip_version=4)
+ self.client.delete_address_scope(address_scope['id'])
+ self.assertRaises(lib_exc.NotFound, self.client.show_address_scope,
+ address_scope['id'])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('5a06c287-8036-4d04-9d78-def8e06d43df')
+ def test_admin_create_shared_address_scope(self):
+ address_scope = self._create_address_scope(is_admin=True, shared=True,
+ ip_version=4)
+ body = self.admin_client.show_address_scope(
+ address_scope['id'])
+ returned_address_scope = body['address_scope']
+ self.assertEqual(address_scope['name'],
+ returned_address_scope['name'])
+ self.assertTrue(returned_address_scope['shared'])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('e9e1ccdd-9ccd-4076-9503-71820529508b')
+ def test_admin_update_shared_address_scope(self):
+ address_scope = self._test_update_address_scope_helper(is_admin=True,
+ shared=True)
+ self.assertTrue(address_scope['shared'])
diff --git a/neutron/tests/tempest/api/test_address_scopes_negative.py b/neutron/tests/tempest/api/test_address_scopes_negative.py
new file mode 100644
index 0000000..9fc9e5b
--- /dev/null
+++ b/neutron/tests/tempest/api/test_address_scopes_negative.py
@@ -0,0 +1,92 @@
+# Copyright (c) 2015 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from tempest.lib.common.utils import data_utils
+from tempest.lib import exceptions as lib_exc
+from tempest import test
+
+from neutron.tests.tempest.api import test_address_scopes
+
+
+class AddressScopeTestNegative(test_address_scopes.AddressScopeTestBase):
+
+ @test.attr(type=['negative', 'smoke'])
+ @test.idempotent_id('9c92ec34-0c50-4104-aa47-9ce98d5088df')
+ def test_tenant_create_shared_address_scope(self):
+ self.assertRaises(lib_exc.Forbidden, self._create_address_scope,
+ shared=True, ip_version=4)
+
+ @test.attr(type=['negative', 'smoke'])
+ @test.idempotent_id('a857b61e-bf53-4fab-b21a-b0daaf81b5bd')
+ def test_tenant_update_address_scope_shared_true(self):
+ self.assertRaises(lib_exc.Forbidden,
+ self._test_update_address_scope_helper, shared=True)
+
+ @test.attr(type=['negative', 'smoke'])
+ @test.idempotent_id('a859ef2f-9c76-4e2e-ba0f-e0339a489e8c')
+ def test_tenant_update_address_scope_shared_false(self):
+ self.assertRaises(lib_exc.Forbidden,
+ self._test_update_address_scope_helper, shared=False)
+
+ @test.attr(type=['negative', 'smoke'])
+ @test.idempotent_id('9b6dd7ad-cabb-4f55-bd5e-e61176ef41f6')
+ def test_get_non_existent_address_scope(self):
+ non_exist_id = data_utils.rand_name('address_scope')
+ self.assertRaises(lib_exc.NotFound, self.client.show_address_scope,
+ non_exist_id)
+
+ @test.attr(type=['negative', 'smoke'])
+ @test.idempotent_id('ef213552-f2da-487d-bf4a-e1705d115ff1')
+ def test_tenant_get_not_shared_admin_address_scope(self):
+ address_scope = self._create_address_scope(is_admin=True,
+ ip_version=4)
+ # None-shared admin address scope cannot be retrieved by tenant user.
+ self.assertRaises(lib_exc.NotFound, self.client.show_address_scope,
+ address_scope['id'])
+
+ @test.attr(type=['negative', 'smoke'])
+ @test.idempotent_id('5c25dc6a-1e92-467a-9cc7-cda74b6003db')
+ def test_delete_non_existent_address_scope(self):
+ non_exist_id = data_utils.rand_name('address_scope')
+ self.assertRaises(lib_exc.NotFound, self.client.delete_address_scope,
+ non_exist_id)
+
+ @test.attr(type=['negative', 'smoke'])
+ @test.idempotent_id('47c25dc5-e886-4a84-88c3-ac5031969661')
+ def test_update_non_existent_address_scope(self):
+ non_exist_id = data_utils.rand_name('address_scope')
+ self.assertRaises(lib_exc.NotFound, self.client.update_address_scope,
+ non_exist_id, name='foo-name')
+
+ @test.attr(type=['negative', 'smoke'])
+ @test.idempotent_id('702d0515-82cb-4207-b0d9-703336e54665')
+ def test_update_shared_address_scope_to_unshare(self):
+ address_scope = self._create_address_scope(is_admin=True, shared=True,
+ ip_version=4)
+ self.assertRaises(lib_exc.BadRequest,
+ self.admin_client.update_address_scope,
+ address_scope['id'], name='new-name', shared=False)
+
+ @test.attr(type=['negative', 'smoke'])
+ @test.idempotent_id('1e471e5c-6f9c-437a-9257-fd9bc4b6f0fb')
+ def test_delete_address_scope_associated_with_subnetpool(self):
+ address_scope = self._create_address_scope(ip_version=4)
+ prefixes = [u'10.11.12.0/24']
+ subnetpool_data = {
+ 'name': 'foo-subnetpool',
+ 'min_prefixlen': '29', 'prefixes': prefixes,
+ 'address_scope_id': address_scope['id']}
+ self.create_subnetpool(**subnetpool_data)
+ self.assertRaises(lib_exc.Conflict, self.client.delete_address_scope,
+ address_scope['id'])
diff --git a/neutron/tests/tempest/api/test_allowed_address_pair.py b/neutron/tests/tempest/api/test_allowed_address_pair.py
new file mode 100644
index 0000000..f6db86e
--- /dev/null
+++ b/neutron/tests/tempest/api/test_allowed_address_pair.py
@@ -0,0 +1,134 @@
+# Copyright 2014 OpenStack Foundation
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import netaddr
+from tempest import test
+
+from neutron.tests.tempest.api import base
+from neutron.tests.tempest import config
+
+CONF = config.CONF
+
+
+class AllowedAddressPairTestJSON(base.BaseNetworkTest):
+
+ """
+ Tests the Neutron Allowed Address Pair API extension using the Tempest
+ REST client. The following API operations are tested with this extension:
+
+ create port
+ list ports
+ update port
+ show port
+
+ v2.0 of the Neutron API is assumed. It is also assumed that the following
+ options are defined in the [network-feature-enabled] section of
+ etc/tempest.conf
+
+ api_extensions
+ """
+
+ @classmethod
+ @test.requires_ext(extension="allowed-address-pairs", service="network")
+ def resource_setup(cls):
+ super(AllowedAddressPairTestJSON, cls).resource_setup()
+ cls.network = cls.create_network()
+ cls.create_subnet(cls.network)
+ port = cls.create_port(cls.network)
+ cls.ip_address = port['fixed_ips'][0]['ip_address']
+ cls.mac_address = port['mac_address']
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('86c3529b-1231-40de-803c-00e40882f043')
+ def test_create_list_port_with_address_pair(self):
+ # Create port with allowed address pair attribute
+ allowed_address_pairs = [{'ip_address': self.ip_address,
+ 'mac_address': self.mac_address}]
+ body = self.client.create_port(
+ network_id=self.network['id'],
+ allowed_address_pairs=allowed_address_pairs)
+ port_id = body['port']['id']
+ self.addCleanup(self.client.delete_port, port_id)
+
+ # Confirm port was created with allowed address pair attribute
+ body = self.client.list_ports()
+ ports = body['ports']
+ port = [p for p in ports if p['id'] == port_id]
+ msg = 'Created port not found in list of ports returned by Neutron'
+ self.assertTrue(port, msg)
+ self._confirm_allowed_address_pair(port[0], self.ip_address)
+
+ @test.attr(type='smoke')
+ def _update_port_with_address(self, address, mac_address=None, **kwargs):
+ # Create a port without allowed address pair
+ body = self.client.create_port(network_id=self.network['id'])
+ port_id = body['port']['id']
+ self.addCleanup(self.client.delete_port, port_id)
+ if mac_address is None:
+ mac_address = self.mac_address
+
+ # Update allowed address pair attribute of port
+ allowed_address_pairs = [{'ip_address': address,
+ 'mac_address': mac_address}]
+ if kwargs:
+ allowed_address_pairs.append(kwargs['allowed_address_pairs'])
+ body = self.client.update_port(
+ port_id, allowed_address_pairs=allowed_address_pairs)
+ allowed_address_pair = body['port']['allowed_address_pairs']
+ self.assertEqual(allowed_address_pair, allowed_address_pairs)
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('9599b337-272c-47fd-b3cf-509414414ac4')
+ def test_update_port_with_address_pair(self):
+ # Update port with allowed address pair
+ self._update_port_with_address(self.ip_address)
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('4d6d178f-34f6-4bff-a01c-0a2f8fe909e4')
+ def test_update_port_with_cidr_address_pair(self):
+ # Update allowed address pair with cidr
+ cidr = str(
+ netaddr.IPNetwork(config.safe_get_config_value(
+ 'network', 'project_network_cidr')))
+ self._update_port_with_address(cidr)
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('b3f20091-6cd5-472b-8487-3516137df933')
+ def test_update_port_with_multiple_ip_mac_address_pair(self):
+ # Create an ip _address and mac_address through port create
+ resp = self.client.create_port(network_id=self.network['id'])
+ newportid = resp['port']['id']
+ self.addCleanup(self.client.delete_port, newportid)
+ ipaddress = resp['port']['fixed_ips'][0]['ip_address']
+ macaddress = resp['port']['mac_address']
+
+ # Update allowed address pair port with multiple ip and mac
+ allowed_address_pairs = {'ip_address': ipaddress,
+ 'mac_address': macaddress}
+ self._update_port_with_address(
+ self.ip_address, self.mac_address,
+ allowed_address_pairs=allowed_address_pairs)
+
+ def _confirm_allowed_address_pair(self, port, ip):
+ msg = 'Port allowed address pairs should not be empty'
+ self.assertTrue(port['allowed_address_pairs'], msg)
+ ip_address = port['allowed_address_pairs'][0]['ip_address']
+ mac_address = port['allowed_address_pairs'][0]['mac_address']
+ self.assertEqual(ip_address, ip)
+ self.assertEqual(mac_address, self.mac_address)
+
+
+class AllowedAddressPairIpV6TestJSON(AllowedAddressPairTestJSON):
+ _ip_version = 6
diff --git a/neutron/tests/tempest/api/test_auto_allocated_topology.py b/neutron/tests/tempest/api/test_auto_allocated_topology.py
new file mode 100644
index 0000000..d9c59c3
--- /dev/null
+++ b/neutron/tests/tempest/api/test_auto_allocated_topology.py
@@ -0,0 +1,101 @@
+# Copyright 2016 IBM
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from oslo_config import cfg
+from tempest import test
+
+from neutron.tests.tempest.api import base
+
+
+class TestAutoAllocatedTopology(base.BaseAdminNetworkTest):
+
+ """
+ NOTE: This test may eventually migrate to Tempest.
+
+ Tests the Get-Me-A-Network operation in the Neutron API
+ using the REST client for Neutron.
+ """
+
+ @classmethod
+ @test.requires_ext(extension="auto-allocated-topology", service="network")
+ def skip_checks(cls):
+ super(TestAutoAllocatedTopology, cls).skip_checks()
+
+ @classmethod
+ def resource_setup(cls):
+ super(TestAutoAllocatedTopology, cls).resource_setup()
+
+ # The deployment must contain a default subnetpool
+ body = cls.client.list_subnetpools(is_default=True)
+ # The deployment may contain one or two default subnetpools:
+ # one ipv4 pool, or one ipv6 pool, or one of each.
+ # This run-time dependency should be revisited if the test is
+ # moved over to tempest.
+ cls.num_subnetpools = len(body['subnetpools'])
+ if cls.num_subnetpools == 0:
+ raise cls.skipException("No default subnetpool")
+
+ # Ensure the public external network is the default external network
+ public_net_id = cfg.CONF.network.public_network_id
+ cls.admin_client.update_network(public_net_id, is_default=True)
+
+ def _count_topology_resources(self):
+ '''Count the resources whose names begin with 'auto_allocated_'.'''
+
+ def _count(resources):
+ return len([resource['id'] for resource in resources
+ if resource['name'].startswith('auto_allocated_')])
+
+ networks = _count(self.client.list_networks()['networks'])
+ subnets = _count(self.client.list_subnets()['subnets'])
+ routers = _count(self.client.list_routers()['routers'])
+ return networks, subnets, routers
+
+ def _add_topology_cleanup(self, client):
+ '''Add the auto-allocated resources to the cleanup lists.'''
+
+ body = client.list_routers(name='auto_allocated_router')
+ self.routers.extend(body['routers'])
+ body = client.list_subnets(name='auto_allocated_subnet_v4')
+ self.subnets.extend(body['subnets'])
+ body = client.list_subnets(name='auto_allocated_subnet_v6')
+ self.subnets.extend(body['subnets'])
+ body = client.list_networks(name='auto_allocated_network')
+ self.networks.extend(body['networks'])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('64bc0b02-cee4-11e5-9f3c-080027605a2b')
+ def test_get_allocated_net_topology_as_tenant(self):
+ resources_before = self._count_topology_resources()
+ self.assertEqual((0, 0, 0), resources_before)
+
+ body = self.client.get_auto_allocated_topology()
+ topology = body['auto_allocated_topology']
+ self.assertIsNotNone(topology)
+ self._add_topology_cleanup(self.client)
+
+ network_id1 = topology['id']
+ self.assertIsNotNone(network_id1)
+ resources_after1 = self._count_topology_resources()
+ # One network, two subnets (v4 and v6) and one router
+ self.assertEqual((1, self.num_subnetpools, 1), resources_after1)
+
+ body = self.client.get_auto_allocated_topology()
+ topology = body['auto_allocated_topology']
+ network_id2 = topology['id']
+ resources_after2 = self._count_topology_resources()
+ # After the initial GET, the API should be idempotent
+ self.assertEqual(network_id1, network_id2)
+ self.assertEqual(resources_after1, resources_after2)
diff --git a/neutron/tests/tempest/api/test_bgp_speaker_extensions.py b/neutron/tests/tempest/api/test_bgp_speaker_extensions.py
new file mode 100644
index 0000000..123f43f
--- /dev/null
+++ b/neutron/tests/tempest/api/test_bgp_speaker_extensions.py
@@ -0,0 +1,286 @@
+# Copyright 2016 Hewlett Packard Enterprise Development Company LP
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import netaddr
+from tempest import config
+from tempest.lib import exceptions as lib_exc
+from tempest import test
+import testtools
+
+from neutron.tests.tempest.api import base
+from neutron.tests.tempest.common import tempest_fixtures as fixtures
+
+CONF = config.CONF
+
+
+class BgpSpeakerTestJSONBase(base.BaseAdminNetworkTest):
+
+ default_bgp_speaker_args = {'local_as': '1234',
+ 'ip_version': 4,
+ 'name': 'my-bgp-speaker',
+ 'advertise_floating_ip_host_routes': True,
+ 'advertise_tenant_networks': True}
+ default_bgp_peer_args = {'remote_as': '4321',
+ 'name': 'my-bgp-peer',
+ 'peer_ip': '192.168.1.1',
+ 'auth_type': 'md5', 'password': 'my-secret'}
+
+ @classmethod
+ @test.requires_ext(extension="bgp_speaker", service="network")
+ def resource_setup(cls):
+ super(BgpSpeakerTestJSONBase, cls).resource_setup()
+
+ cls.admin_routerports = []
+ cls.admin_floatingips = []
+ cls.admin_routers = []
+ cls.ext_net_id = CONF.network.public_network_id
+
+ @classmethod
+ def resource_cleanup(cls):
+ for floatingip in cls.admin_floatingips:
+ cls._try_delete_resource(cls.admin_client.delete_floatingip,
+ floatingip['id'])
+ for routerport in cls.admin_routerports:
+ cls._try_delete_resource(
+ cls.admin_client.remove_router_interface_with_subnet_id,
+ routerport['router_id'], routerport['subnet_id'])
+ for router in cls.admin_routers:
+ cls._try_delete_resource(cls.admin_client.delete_router,
+ router['id'])
+ super(BgpSpeakerTestJSONBase, cls).resource_cleanup()
+
+ def create_bgp_speaker(self, auto_delete=True, **args):
+ data = {'bgp_speaker': args}
+ bgp_speaker = self.admin_client.create_bgp_speaker(data)
+ bgp_speaker_id = bgp_speaker['bgp-speaker']['id']
+ if auto_delete:
+ self.addCleanup(self.delete_bgp_speaker, bgp_speaker_id)
+ return bgp_speaker
+
+ def create_bgp_peer(self, **args):
+ bgp_peer = self.admin_client.create_bgp_peer({'bgp_peer': args})
+ bgp_peer_id = bgp_peer['bgp-peer']['id']
+ self.addCleanup(self.delete_bgp_peer, bgp_peer_id)
+ return bgp_peer
+
+ def update_bgp_speaker(self, id, **args):
+ data = {'bgp_speaker': args}
+ return self.admin_client.update_bgp_speaker(id, data)
+
+ def delete_bgp_speaker(self, id):
+ return self.admin_client.delete_bgp_speaker(id)
+
+ def get_bgp_speaker(self, id):
+ return self.admin_client.get_bgp_speaker(id)
+
+ def create_bgp_speaker_and_peer(self):
+ bgp_speaker = self.create_bgp_speaker(**self.default_bgp_speaker_args)
+ bgp_peer = self.create_bgp_peer(**self.default_bgp_peer_args)
+ return (bgp_speaker, bgp_peer)
+
+ def delete_bgp_peer(self, id):
+ return self.admin_client.delete_bgp_peer(id)
+
+ def add_bgp_peer(self, bgp_speaker_id, bgp_peer_id):
+ return self.admin_client.add_bgp_peer_with_id(bgp_speaker_id,
+ bgp_peer_id)
+
+ def remove_bgp_peer(self, bgp_speaker_id, bgp_peer_id):
+ return self.admin_client.remove_bgp_peer_with_id(bgp_speaker_id,
+ bgp_peer_id)
+
+ def delete_address_scope(self, id):
+ return self.admin_client.delete_address_scope(id)
+
+
+class BgpSpeakerTestJSON(BgpSpeakerTestJSONBase):
+
+ """
+ Tests the following operations in the Neutron API using the REST client for
+ Neutron:
+
+ Create bgp-speaker
+ Delete bgp-speaker
+ Create bgp-peer
+ Update bgp-peer
+ Delete bgp-peer
+ """
+
+ @test.idempotent_id('df259771-7104-4ffa-b77f-bd183600d7f9')
+ def test_delete_bgp_speaker(self):
+ bgp_speaker = self.create_bgp_speaker(auto_delete=False,
+ **self.default_bgp_speaker_args)
+ bgp_speaker_id = bgp_speaker['bgp-speaker']['id']
+ self.delete_bgp_speaker(bgp_speaker_id)
+ self.assertRaises(lib_exc.NotFound,
+ self.get_bgp_speaker,
+ bgp_speaker_id)
+
+ @test.idempotent_id('81d9dc45-19f8-4c6e-88b8-401d965cd1b0')
+ def test_create_bgp_peer(self):
+ self.create_bgp_peer(**self.default_bgp_peer_args)
+
+ @test.idempotent_id('6ade0319-1ee2-493c-ac4b-5eb230ff3a77')
+ def test_add_bgp_peer(self):
+ bgp_speaker, bgp_peer = self.create_bgp_speaker_and_peer()
+ bgp_speaker_id = bgp_speaker['bgp-speaker']['id']
+ bgp_peer_id = bgp_peer['bgp-peer']['id']
+
+ self.add_bgp_peer(bgp_speaker_id, bgp_peer_id)
+ bgp_speaker = self.admin_client.get_bgp_speaker(bgp_speaker_id)
+ bgp_peers_list = bgp_speaker['bgp-speaker']['peers']
+ self.assertEqual(1, len(bgp_peers_list))
+ self.assertTrue(bgp_peer_id in bgp_peers_list)
+
+ @test.idempotent_id('f9737708-1d79-440b-8350-779f97d882ee')
+ def test_remove_bgp_peer(self):
+ bgp_peer = self.create_bgp_peer(**self.default_bgp_peer_args)
+ bgp_peer_id = bgp_peer['bgp-peer']['id']
+ bgp_speaker = self.create_bgp_speaker(**self.default_bgp_speaker_args)
+ bgp_speaker_id = bgp_speaker['bgp-speaker']['id']
+ self.add_bgp_peer(bgp_speaker_id, bgp_peer_id)
+ bgp_speaker = self.admin_client.get_bgp_speaker(bgp_speaker_id)
+ bgp_peers_list = bgp_speaker['bgp-speaker']['peers']
+ self.assertTrue(bgp_peer_id in bgp_peers_list)
+
+ bgp_speaker = self.remove_bgp_peer(bgp_speaker_id, bgp_peer_id)
+ bgp_speaker = self.admin_client.get_bgp_speaker(bgp_speaker_id)
+ bgp_peers_list = bgp_speaker['bgp-speaker']['peers']
+ self.assertTrue(not bgp_peers_list)
+
+ @testtools.skip('bug/1553374')
+ @test.idempotent_id('23c8eb37-d10d-4f43-b2e7-6542cb6a4405')
+ def test_add_gateway_network(self):
+ self.useFixture(fixtures.LockFixture('gateway_network_binding'))
+ bgp_speaker = self.create_bgp_speaker(**self.default_bgp_speaker_args)
+ bgp_speaker_id = bgp_speaker['bgp-speaker']['id']
+
+ self.admin_client.add_bgp_gateway_network(bgp_speaker_id,
+ self.ext_net_id)
+ bgp_speaker = self.admin_client.get_bgp_speaker(bgp_speaker_id)
+ network_list = bgp_speaker['bgp-speaker']['networks']
+ self.assertEqual(1, len(network_list))
+ self.assertTrue(self.ext_net_id in network_list)
+
+ @testtools.skip('bug/1553374')
+ @test.idempotent_id('6cfc7137-0d99-4a3d-826c-9d1a3a1767b0')
+ def test_remove_gateway_network(self):
+ self.useFixture(fixtures.LockFixture('gateway_network_binding'))
+ bgp_speaker = self.create_bgp_speaker(**self.default_bgp_speaker_args)
+ bgp_speaker_id = bgp_speaker['bgp-speaker']['id']
+ self.admin_client.add_bgp_gateway_network(bgp_speaker_id,
+ self.ext_net_id)
+ bgp_speaker = self.admin_client.get_bgp_speaker(bgp_speaker_id)
+ networks = bgp_speaker['bgp-speaker']['networks']
+
+ self.assertTrue(self.ext_net_id in networks)
+ self.admin_client.remove_bgp_gateway_network(bgp_speaker_id,
+ self.ext_net_id)
+ bgp_speaker = self.admin_client.get_bgp_speaker(bgp_speaker_id)
+ network_list = bgp_speaker['bgp-speaker']['networks']
+ self.assertTrue(not network_list)
+
+ @testtools.skip('bug/1553374')
+ @test.idempotent_id('5bef22ad-5e70-4f7b-937a-dc1944642996')
+ def test_get_advertised_routes_null_address_scope(self):
+ self.useFixture(fixtures.LockFixture('gateway_network_binding'))
+ bgp_speaker = self.create_bgp_speaker(**self.default_bgp_speaker_args)
+ bgp_speaker_id = bgp_speaker['bgp-speaker']['id']
+ self.admin_client.add_bgp_gateway_network(bgp_speaker_id,
+ self.ext_net_id)
+ routes = self.admin_client.get_bgp_advertised_routes(bgp_speaker_id)
+ self.assertEqual(0, len(routes['advertised_routes']))
+
+ @testtools.skip('bug/1553374')
+ @test.idempotent_id('cae9cdb1-ad65-423c-9604-d4cd0073616e')
+ def test_get_advertised_routes_floating_ips(self):
+ self.useFixture(fixtures.LockFixture('gateway_network_binding'))
+ bgp_speaker = self.create_bgp_speaker(**self.default_bgp_speaker_args)
+ bgp_speaker_id = bgp_speaker['bgp-speaker']['id']
+ self.admin_client.add_bgp_gateway_network(bgp_speaker_id,
+ self.ext_net_id)
+ tenant_net = self.create_network()
+ tenant_subnet = self.create_subnet(tenant_net)
+ ext_gw_info = {'network_id': self.ext_net_id}
+ router = self.admin_client.create_router(
+ 'my-router',
+ external_gateway_info=ext_gw_info,
+ admin_state_up=True,
+ distributed=False)
+ self.admin_routers.append(router['router'])
+ self.admin_client.add_router_interface_with_subnet_id(
+ router['router']['id'],
+ tenant_subnet['id'])
+ self.admin_routerports.append({'router_id': router['router']['id'],
+ 'subnet_id': tenant_subnet['id']})
+ tenant_port = self.create_port(tenant_net)
+ floatingip = self.create_floatingip(self.ext_net_id)
+ self.admin_floatingips.append(floatingip)
+ self.client.update_floatingip(floatingip['id'],
+ port_id=tenant_port['id'])
+ routes = self.admin_client.get_bgp_advertised_routes(bgp_speaker_id)
+ self.assertEqual(1, len(routes['advertised_routes']))
+ self.assertEqual(floatingip['floating_ip_address'] + '/32',
+ routes['advertised_routes'][0]['destination'])
+
+ @testtools.skip('bug/1553374')
+ @test.idempotent_id('c9ad566e-fe8f-4559-8303-bbad9062a30c')
+ def test_get_advertised_routes_tenant_networks(self):
+ self.useFixture(fixtures.LockFixture('gateway_network_binding'))
+ addr_scope = self.create_address_scope('my-scope', ip_version=4)
+ ext_net = self.create_shared_network(**{'router:external': True})
+ tenant_net = self.create_network()
+ ext_subnetpool = self.create_subnetpool(
+ 'test-pool-ext',
+ is_admin=True,
+ default_prefixlen=24,
+ address_scope_id=addr_scope['id'],
+ prefixes=['8.0.0.0/8'])
+ tenant_subnetpool = self.create_subnetpool(
+ 'tenant-test-pool',
+ default_prefixlen=25,
+ address_scope_id=addr_scope['id'],
+ prefixes=['10.10.0.0/16'])
+ self.create_subnet({'id': ext_net['id']},
+ cidr=netaddr.IPNetwork('8.0.0.0/24'),
+ ip_version=4,
+ client=self.admin_client,
+ subnetpool_id=ext_subnetpool['id'])
+ tenant_subnet = self.create_subnet(
+ {'id': tenant_net['id']},
+ cidr=netaddr.IPNetwork('10.10.0.0/24'),
+ ip_version=4,
+ subnetpool_id=tenant_subnetpool['id'])
+ ext_gw_info = {'network_id': ext_net['id']}
+ router = self.admin_client.create_router(
+ 'my-router',
+ external_gateway_info=ext_gw_info,
+ distributed=False)['router']
+ self.admin_routers.append(router)
+ self.admin_client.add_router_interface_with_subnet_id(
+ router['id'],
+ tenant_subnet['id'])
+ self.admin_routerports.append({'router_id': router['id'],
+ 'subnet_id': tenant_subnet['id']})
+ bgp_speaker = self.create_bgp_speaker(**self.default_bgp_speaker_args)
+ bgp_speaker_id = bgp_speaker['bgp-speaker']['id']
+ self.admin_client.add_bgp_gateway_network(bgp_speaker_id,
+ ext_net['id'])
+ routes = self.admin_client.get_bgp_advertised_routes(bgp_speaker_id)
+ self.assertEqual(1, len(routes['advertised_routes']))
+ self.assertEqual(tenant_subnet['cidr'],
+ routes['advertised_routes'][0]['destination'])
+ fixed_ip = router['external_gateway_info']['external_fixed_ips'][0]
+ self.assertEqual(fixed_ip['ip_address'],
+ routes['advertised_routes'][0]['next_hop'])
diff --git a/neutron/tests/tempest/api/test_bgp_speaker_extensions_negative.py b/neutron/tests/tempest/api/test_bgp_speaker_extensions_negative.py
new file mode 100644
index 0000000..d0b7d1e
--- /dev/null
+++ b/neutron/tests/tempest/api/test_bgp_speaker_extensions_negative.py
@@ -0,0 +1,120 @@
+# Copyright 2016 Hewlett Packard Enterprise Development Company LP
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import netaddr
+from tempest.lib import exceptions as lib_exc
+
+from neutron.tests.tempest.api import test_bgp_speaker_extensions as test_base
+from tempest import test
+
+
+class BgpSpeakerTestJSONNegative(test_base.BgpSpeakerTestJSONBase):
+
+ """Negative test cases asserting proper behavior of BGP API extension"""
+
+ @test.attr(type=['negative', 'smoke'])
+ @test.idempotent_id('75e9ee2f-6efd-4320-bff7-ae24741c8b06')
+ def test_create_bgp_speaker_illegal_local_asn(self):
+ self.assertRaises(lib_exc.BadRequest,
+ self.create_bgp_speaker,
+ local_as='65537')
+
+ @test.attr(type=['negative', 'smoke'])
+ @test.idempotent_id('6742ec2e-382a-4453-8791-13a19b47cd13')
+ def test_create_bgp_speaker_non_admin(self):
+ self.assertRaises(lib_exc.Forbidden,
+ self.client.create_bgp_speaker,
+ {'bgp_speaker': self.default_bgp_speaker_args})
+
+ @test.attr(type=['negative', 'smoke'])
+ @test.idempotent_id('33f7aaf0-9786-478b-b2d1-a51086a50eb4')
+ def test_create_bgp_peer_non_admin(self):
+ self.assertRaises(lib_exc.Forbidden,
+ self.client.create_bgp_peer,
+ {'bgp_peer': self.default_bgp_peer_args})
+
+ @test.attr(type=['negative', 'smoke'])
+ @test.idempotent_id('39435932-0266-4358-899b-0e9b1e53c3e9')
+ def test_update_bgp_speaker_local_asn(self):
+ bgp_speaker = self.create_bgp_speaker(**self.default_bgp_speaker_args)
+ bgp_speaker_id = bgp_speaker['bgp-speaker']['id']
+
+ self.assertRaises(lib_exc.BadRequest, self.update_bgp_speaker,
+ bgp_speaker_id, local_as='4321')
+
+ @test.idempotent_id('9cc33701-51e5-421f-a5d5-fd7b330e550f')
+ def test_get_advertised_routes_tenant_networks(self):
+ addr_scope1 = self.create_address_scope('my-scope1', ip_version=4)
+ addr_scope2 = self.create_address_scope('my-scope2', ip_version=4)
+ ext_net = self.create_shared_network(**{'router:external': True})
+ tenant_net1 = self.create_network()
+ tenant_net2 = self.create_network()
+ ext_subnetpool = self.create_subnetpool(
+ 'test-pool-ext',
+ is_admin=True,
+ default_prefixlen=24,
+ address_scope_id=addr_scope1['id'],
+ prefixes=['8.0.0.0/8'])
+ tenant_subnetpool1 = self.create_subnetpool(
+ 'tenant-test-pool',
+ default_prefixlen=25,
+ address_scope_id=addr_scope1['id'],
+ prefixes=['10.10.0.0/16'])
+ tenant_subnetpool2 = self.create_subnetpool(
+ 'tenant-test-pool',
+ default_prefixlen=25,
+ address_scope_id=addr_scope2['id'],
+ prefixes=['11.10.0.0/16'])
+ self.create_subnet({'id': ext_net['id']},
+ cidr=netaddr.IPNetwork('8.0.0.0/24'),
+ ip_version=4,
+ client=self.admin_client,
+ subnetpool_id=ext_subnetpool['id'])
+ tenant_subnet1 = self.create_subnet(
+ {'id': tenant_net1['id']},
+ cidr=netaddr.IPNetwork('10.10.0.0/24'),
+ ip_version=4,
+ subnetpool_id=tenant_subnetpool1['id'])
+ tenant_subnet2 = self.create_subnet(
+ {'id': tenant_net2['id']},
+ cidr=netaddr.IPNetwork('11.10.0.0/24'),
+ ip_version=4,
+ subnetpool_id=tenant_subnetpool2['id'])
+ ext_gw_info = {'network_id': ext_net['id']}
+ router = self.admin_client.create_router(
+ 'my-router',
+ distributed=False,
+ external_gateway_info=ext_gw_info)['router']
+ self.admin_routers.append(router)
+ self.admin_client.add_router_interface_with_subnet_id(
+ router['id'],
+ tenant_subnet1['id'])
+ self.admin_routerports.append({'router_id': router['id'],
+ 'subnet_id': tenant_subnet1['id']})
+ self.admin_client.add_router_interface_with_subnet_id(
+ router['id'],
+ tenant_subnet2['id'])
+ self.admin_routerports.append({'router_id': router['id'],
+ 'subnet_id': tenant_subnet2['id']})
+ bgp_speaker = self.create_bgp_speaker(**self.default_bgp_speaker_args)
+ bgp_speaker_id = bgp_speaker['bgp-speaker']['id']
+ self.admin_client.add_bgp_gateway_network(bgp_speaker_id,
+ ext_net['id'])
+ routes = self.admin_client.get_bgp_advertised_routes(bgp_speaker_id)
+ self.assertEqual(1, len(routes['advertised_routes']))
+ self.assertEqual(tenant_subnet1['cidr'],
+ routes['advertised_routes'][0]['destination'])
+ fixed_ip = router['external_gateway_info']['external_fixed_ips'][0]
+ self.assertEqual(fixed_ip['ip_address'],
+ routes['advertised_routes'][0]['next_hop'])
diff --git a/neutron/tests/tempest/api/test_dhcp_ipv6.py b/neutron/tests/tempest/api/test_dhcp_ipv6.py
new file mode 100644
index 0000000..e136efa
--- /dev/null
+++ b/neutron/tests/tempest/api/test_dhcp_ipv6.py
@@ -0,0 +1,98 @@
+# Copyright 2014 OpenStack Foundation
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import netaddr
+from tempest.lib import exceptions as lib_exc
+from tempest import test
+
+from neutron.common import constants
+from neutron.tests.tempest.api import base
+from neutron.tests.tempest import config
+
+CONF = config.CONF
+
+
+class NetworksTestDHCPv6(base.BaseNetworkTest):
+ _ip_version = 6
+
+ @classmethod
+ def skip_checks(cls):
+ msg = None
+ if not CONF.network_feature_enabled.ipv6:
+ msg = "IPv6 is not enabled"
+ elif not CONF.network_feature_enabled.ipv6_subnet_attributes:
+ msg = "DHCPv6 attributes are not enabled."
+ if msg:
+ raise cls.skipException(msg)
+
+ @classmethod
+ def resource_setup(cls):
+ super(NetworksTestDHCPv6, cls).resource_setup()
+ cls.network = cls.create_network()
+
+ def _remove_from_list_by_index(self, things_list, elem):
+ for index, i in enumerate(things_list):
+ if i['id'] == elem['id']:
+ break
+ del things_list[index]
+
+ def _clean_network(self):
+ body = self.client.list_ports()
+ ports = body['ports']
+ for port in ports:
+ if (port['device_owner'].startswith(
+ constants.DEVICE_OWNER_ROUTER_INTF)
+ and port['device_id'] in [r['id'] for r in self.routers]):
+ self.client.remove_router_interface_with_port_id(
+ port['device_id'], port['id']
+ )
+ else:
+ if port['id'] in [p['id'] for p in self.ports]:
+ self.client.delete_port(port['id'])
+ self._remove_from_list_by_index(self.ports, port)
+ body = self.client.list_subnets()
+ subnets = body['subnets']
+ for subnet in subnets:
+ if subnet['id'] in [s['id'] for s in self.subnets]:
+ self.client.delete_subnet(subnet['id'])
+ self._remove_from_list_by_index(self.subnets, subnet)
+ body = self.client.list_routers()
+ routers = body['routers']
+ for router in routers:
+ if router['id'] in [r['id'] for r in self.routers]:
+ self.client.delete_router(router['id'])
+ self._remove_from_list_by_index(self.routers, router)
+
+ @test.idempotent_id('98244d88-d990-4570-91d4-6b25d70d08af')
+ def test_dhcp_stateful_fixedips_outrange(self):
+ """When port gets IP address from fixed IP range it
+ shall be checked if it's from subnets range.
+ """
+ kwargs = {'ipv6_ra_mode': 'dhcpv6-stateful',
+ 'ipv6_address_mode': 'dhcpv6-stateful'}
+ subnet = self.create_subnet(self.network, **kwargs)
+ ip_range = netaddr.IPRange(subnet["allocation_pools"][0]["start"],
+ subnet["allocation_pools"][0]["end"])
+ for i in range(1, 3):
+ ip = netaddr.IPAddress(ip_range.last + i).format()
+ self.assertRaises(lib_exc.BadRequest,
+ self.create_port,
+ self.network,
+ fixed_ips=[{'subnet_id': subnet['id'],
+ 'ip_address': ip}])
+
+ def tearDown(self):
+ self._clean_network()
+ super(NetworksTestDHCPv6, self).tearDown()
diff --git a/neutron/tests/tempest/api/test_extension_driver_port_security.py b/neutron/tests/tempest/api/test_extension_driver_port_security.py
new file mode 100644
index 0000000..ce29260
--- /dev/null
+++ b/neutron/tests/tempest/api/test_extension_driver_port_security.py
@@ -0,0 +1,154 @@
+# Copyright 2015 OpenStack Foundation
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import ddt
+from tempest.lib import exceptions as lib_exc
+from tempest import test
+
+from neutron.tests.tempest.api import base
+from neutron.tests.tempest.api import base_security_groups as base_security
+
+FAKE_IP = '10.0.0.1'
+FAKE_MAC = '00:25:64:e8:19:dd'
+
+
+@ddt.ddt
+class PortSecTest(base_security.BaseSecGroupTest,
+ base.BaseNetworkTest):
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('7c338ddf-e64e-4118-bd33-e49a1f2f1495')
+ @test.requires_ext(extension='port-security', service='network')
+ def test_port_sec_default_value(self):
+ # Default port-sec value is True, and the attr of the port will inherit
+ # from the port-sec of the network when it not be specified in API
+ network = self.create_network()
+ self.assertTrue(network['port_security_enabled'])
+ self.create_subnet(network)
+ port = self.create_port(network)
+ self.assertTrue(port['port_security_enabled'])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('e60eafd2-31de-4c38-8106-55447d033b57')
+ @test.requires_ext(extension='port-security', service='network')
+ @ddt.unpack
+ @ddt.data({'port_sec_net': False, 'port_sec_port': True, 'expected': True},
+ {'port_sec_net': True, 'port_sec_port': False,
+ 'expected': False})
+ def test_port_sec_specific_value(self, port_sec_net, port_sec_port,
+ expected):
+ network = self.create_network(port_security_enabled=port_sec_net)
+ self.create_subnet(network)
+ port = self.create_port(network, port_security_enabled=port_sec_port)
+ self.assertEqual(network['port_security_enabled'], port_sec_net)
+ self.assertEqual(port['port_security_enabled'], expected)
+
+ @test.attr(type=['smoke'])
+ @test.idempotent_id('05642059-1bfc-4581-9bc9-aaa5db08dd60')
+ @test.requires_ext(extension='port-security', service='network')
+ def test_create_port_sec_with_security_group(self):
+ network = self.create_network(port_security_enabled=True)
+ self.create_subnet(network)
+
+ port = self.create_port(network, security_groups=[])
+ self.assertTrue(port['port_security_enabled'])
+ self.client.delete_port(port['id'])
+
+ port = self.create_port(network, security_groups=[],
+ port_security_enabled=False)
+ self.assertFalse(port['port_security_enabled'])
+ self.assertEmpty(port['security_groups'])
+
+ @test.attr(type=['negative', 'smoke'])
+ @test.idempotent_id('05642059-1bfc-4581-9bc9-aaa5db08dd60')
+ @test.requires_ext(extension='port-security', service='network')
+ def test_port_sec_update_port_failed(self):
+ network = self.create_network()
+ self.create_subnet(network)
+
+ sec_group_body, sec_group_name = self._create_security_group()
+ port = self.create_port(network)
+
+ # Exception when set port-sec to False with sec-group defined
+ self.assertRaises(lib_exc.Conflict, self.update_port, port,
+ port_security_enabled=False)
+
+ port = self.update_port(port, security_groups=[],
+ port_security_enabled=False)
+ self.assertEmpty(port['security_groups'])
+ self.assertFalse(port['port_security_enabled'])
+ port = self.update_port(
+ port, security_groups=[sec_group_body['security_group']['id']],
+ port_security_enabled=True)
+
+ self.assertNotEmpty(port['security_groups'])
+ self.assertTrue(port['port_security_enabled'])
+
+ # Remove security group from port before deletion on resource_cleanup
+ self.update_port(port, security_groups=[])
+
+ @test.attr(type=['smoke'])
+ @test.idempotent_id('05642059-1bfc-4581-9bc9-aaa5db08dd60')
+ @test.requires_ext(extension='port-security', service='network')
+ def test_port_sec_update_pass(self):
+ network = self.create_network()
+ self.create_subnet(network)
+ sec_group, _ = self._create_security_group()
+ sec_group_id = sec_group['security_group']['id']
+ port = self.create_port(network, security_groups=[sec_group_id],
+ port_security_enabled=True)
+
+ self.assertNotEmpty(port['security_groups'])
+ self.assertTrue(port['port_security_enabled'])
+
+ port = self.update_port(port, security_groups=[])
+ self.assertEmpty(port['security_groups'])
+ self.assertTrue(port['port_security_enabled'])
+
+ port = self.update_port(port, security_groups=[sec_group_id])
+ self.assertNotEmpty(port['security_groups'])
+ port = self.update_port(port, security_groups=[],
+ port_security_enabled=False)
+ self.assertEmpty(port['security_groups'])
+ self.assertFalse(port['port_security_enabled'])
+
+ @test.attr(type=['smoke'])
+ @test.idempotent_id('2df6114b-b8c3-48a1-96e8-47f08159d35c')
+ @test.requires_ext(extension='port-security', service='network')
+ def test_delete_with_port_sec(self):
+ network = self.create_network(port_security_enabled=True)
+ port = self.create_port(network=network,
+ port_security_enabled=True)
+ self.client.delete_port(port['id'])
+ self.assertTrue(self.client.is_resource_deleted('port', port['id']))
+ self.client.delete_network(network['id'])
+ self.assertTrue(
+ self.client.is_resource_deleted('network', network['id']))
+
+ @test.attr(type=['negative', 'smoke'])
+ @test.idempotent_id('ed93e453-3f8d-495e-8e7e-b0e268c2ebd9')
+ @test.requires_ext(extension='port-security', service='network')
+ @test.requires_ext(extension='allowed-address-pairs', service='network')
+ def test_allow_address_pairs(self):
+ network = self.create_network()
+ self.create_subnet(network)
+ port = self.create_port(network=network, port_security_enabled=False)
+ allowed_address_pairs = [{'ip_address': FAKE_IP,
+ 'mac_address': FAKE_MAC}]
+
+ # Exception when set address-pairs with port-sec is False
+ self.assertRaises(lib_exc.Conflict,
+ self.update_port, port,
+ allowed_address_pairs=allowed_address_pairs)
diff --git a/neutron/tests/tempest/api/test_extra_dhcp_options.py b/neutron/tests/tempest/api/test_extra_dhcp_options.py
new file mode 100644
index 0000000..a51ad27
--- /dev/null
+++ b/neutron/tests/tempest/api/test_extra_dhcp_options.py
@@ -0,0 +1,99 @@
+# Copyright 2013 OpenStack Foundation
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from tempest.lib.common.utils import data_utils
+from tempest import test
+
+from neutron.tests.tempest.api import base
+
+
+class ExtraDHCPOptionsTestJSON(base.BaseNetworkTest):
+
+ """
+ Tests the following operations with the Extra DHCP Options Neutron API
+ extension:
+
+ port create
+ port list
+ port show
+ port update
+
+ v2.0 of the Neutron API is assumed. It is also assumed that the Extra
+ DHCP Options extension is enabled in the [network-feature-enabled]
+ section of etc/tempest.conf
+ """
+
+ @classmethod
+ @test.requires_ext(extension="extra_dhcp_opt", service="network")
+ def resource_setup(cls):
+ super(ExtraDHCPOptionsTestJSON, cls).resource_setup()
+ cls.network = cls.create_network()
+ cls.subnet = cls.create_subnet(cls.network)
+ cls.port = cls.create_port(cls.network)
+ cls.ip_tftp = ('123.123.123.123' if cls._ip_version == 4
+ else '2015::dead')
+ cls.ip_server = ('123.123.123.45' if cls._ip_version == 4
+ else '2015::badd')
+ cls.extra_dhcp_opts = [
+ {'opt_value': 'pxelinux.0', 'opt_name': 'bootfile-name'},
+ {'opt_value': cls.ip_tftp, 'opt_name': 'tftp-server'},
+ {'opt_value': cls.ip_server, 'opt_name': 'server-ip-address'}
+ ]
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('d2c17063-3767-4a24-be4f-a23dbfa133c9')
+ def test_create_list_port_with_extra_dhcp_options(self):
+ # Create a port with Extra DHCP Options
+ body = self.client.create_port(
+ network_id=self.network['id'],
+ extra_dhcp_opts=self.extra_dhcp_opts)
+ port_id = body['port']['id']
+ self.addCleanup(self.client.delete_port, port_id)
+
+ # Confirm port created has Extra DHCP Options
+ body = self.client.list_ports()
+ ports = body['ports']
+ port = [p for p in ports if p['id'] == port_id]
+ self.assertTrue(port)
+ self._confirm_extra_dhcp_options(port[0], self.extra_dhcp_opts)
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('9a6aebf4-86ee-4f47-b07a-7f7232c55607')
+ def test_update_show_port_with_extra_dhcp_options(self):
+ # Update port with extra dhcp options
+ name = data_utils.rand_name('new-port-name')
+ body = self.client.update_port(
+ self.port['id'],
+ name=name,
+ extra_dhcp_opts=self.extra_dhcp_opts)
+ # Confirm extra dhcp options were added to the port
+ body = self.client.show_port(self.port['id'])
+ self._confirm_extra_dhcp_options(body['port'], self.extra_dhcp_opts)
+
+ def _confirm_extra_dhcp_options(self, port, extra_dhcp_opts):
+ retrieved = port['extra_dhcp_opts']
+ self.assertEqual(len(retrieved), len(extra_dhcp_opts))
+ for retrieved_option in retrieved:
+ for option in extra_dhcp_opts:
+ if (retrieved_option['opt_value'] == option['opt_value'] and
+ retrieved_option['opt_name'] == option['opt_name']):
+ break
+ else:
+ self.fail('Extra DHCP option not found in port %s' %
+ str(retrieved_option))
+
+
+class ExtraDHCPOptionsIpV6TestJSON(ExtraDHCPOptionsTestJSON):
+ _ip_version = 6
diff --git a/neutron/tests/tempest/api/test_flavors_extensions.py b/neutron/tests/tempest/api/test_flavors_extensions.py
new file mode 100644
index 0000000..063d503
--- /dev/null
+++ b/neutron/tests/tempest/api/test_flavors_extensions.py
@@ -0,0 +1,160 @@
+# Copyright 2015 Hewlett-Packard Development Company, L.P.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from tempest.lib import exceptions as lib_exc
+from tempest import test
+
+from neutron.tests.tempest.api import base
+
+
+class TestFlavorsJson(base.BaseAdminNetworkTest):
+
+ """
+ Tests the following operations in the Neutron API using the REST client for
+ Neutron:
+
+ List, Show, Create, Update, Delete Flavors
+ List, Show, Create, Update, Delete service profiles
+ """
+
+ @classmethod
+ @test.requires_ext(extension="flavors", service="network")
+ def resource_setup(cls):
+ super(TestFlavorsJson, cls).resource_setup()
+
+ # Use flavors service type as know this is loaded
+ service_type = "FLAVORS"
+ description_flavor = "flavor is created by tempest"
+ name_flavor = "Best flavor created by tempest"
+
+ # The check above will pass if api_extensions=all, which does
+ # not mean flavors extension itself is present.
+ try:
+ cls.flavor = cls.create_flavor(name_flavor, description_flavor,
+ service_type)
+ except lib_exc.NotFound:
+ msg = "flavors plugin not enabled."
+ raise cls.skipException(msg)
+
+ description_sp = "service profile created by tempest"
+ # Drivers are supported as is an empty driver field. Use an
+ # empty field for now since otherwise driver is validated against the
+ # servicetype configuration which may differ in test scenarios.
+ driver = ""
+ metainfo = '{"data": "value"}'
+ cls.service_profile = cls.create_service_profile(
+ description=description_sp, metainfo=metainfo, driver=driver)
+
+ def _delete_service_profile(self, service_profile_id):
+ # Deletes a service profile and verifies if it is deleted or not
+ self.admin_client.delete_service_profile(service_profile_id)
+ # Asserting that service profile is not found in list after deletion
+ labels = self.admin_client.list_service_profiles(id=service_profile_id)
+ self.assertEqual(len(labels['service_profiles']), 0)
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('ec8e15ff-95d0-433b-b8a6-b466bddb1e50')
+ def test_create_update_delete_service_profile(self):
+ # Creates a service profile
+ description = "service_profile created by tempest"
+ driver = ""
+ metainfo = '{"data": "value"}'
+ body = self.admin_client.create_service_profile(
+ description=description, driver=driver, metainfo=metainfo)
+ service_profile = body['service_profile']
+ # Updates a service profile
+ self.admin_client.update_service_profile(service_profile['id'],
+ enabled=False)
+ self.assertTrue(service_profile['enabled'])
+ # Deletes a service profile
+ self.addCleanup(self._delete_service_profile,
+ service_profile['id'])
+ # Assert whether created service profiles are found in service profile
+ # lists or fail if created service profiles are not found in service
+ # profiles list
+ labels = (self.admin_client.list_service_profiles(
+ id=service_profile['id']))
+ self.assertEqual(len(labels['service_profiles']), 1)
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('ec8e15ff-95d0-433b-b8a6-b466bddb1e50')
+ def test_create_update_delete_flavor(self):
+ # Creates a flavor
+ description = "flavor created by tempest"
+ service = "FLAVORS"
+ name = "Best flavor created by tempest"
+ body = self.admin_client.create_flavor(name=name, service_type=service,
+ description=description)
+ flavor = body['flavor']
+ # Updates a flavor
+ self.admin_client.update_flavor(flavor['id'], enabled=False)
+ self.assertTrue(flavor['enabled'])
+ # Deletes a flavor
+ self.addCleanup(self._delete_flavor, flavor['id'])
+ # Assert whether created flavors are found in flavor lists or fail
+ # if created flavors are not found in flavors list
+ labels = (self.admin_client.list_flavors(id=flavor['id']))
+ self.assertEqual(len(labels['flavors']), 1)
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('30abb445-0eea-472e-bd02-8649f54a5968')
+ def test_show_service_profile(self):
+ # Verifies the details of a service profile
+ body = self.admin_client.show_service_profile(
+ self.service_profile['id'])
+ service_profile = body['service_profile']
+ self.assertEqual(self.service_profile['id'], service_profile['id'])
+ self.assertEqual(self.service_profile['description'],
+ service_profile['description'])
+ self.assertEqual(self.service_profile['metainfo'],
+ service_profile['metainfo'])
+ self.assertTrue(service_profile['enabled'])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('30abb445-0eea-472e-bd02-8649f54a5968')
+ def test_show_flavor(self):
+ # Verifies the details of a flavor
+ body = self.admin_client.show_flavor(self.flavor['id'])
+ flavor = body['flavor']
+ self.assertEqual(self.flavor['id'], flavor['id'])
+ self.assertEqual(self.flavor['description'], flavor['description'])
+ self.assertEqual(self.flavor['name'], flavor['name'])
+ self.assertTrue(flavor['enabled'])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('e2fb2f8c-45bf-429a-9f17-171c70444612')
+ def test_list_flavors(self):
+ # Verify flavor lists
+ body = self.admin_client.list_flavors(id=33)
+ flavors = body['flavors']
+ self.assertEqual(0, len(flavors))
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('e2fb2f8c-45bf-429a-9f17-171c70444612')
+ def test_list_service_profiles(self):
+ # Verify service profiles lists
+ body = self.admin_client.list_service_profiles(id=33)
+ service_profiles = body['service_profiles']
+ self.assertEqual(0, len(service_profiles))
+
+ def _delete_flavor(self, flavor_id):
+ # Deletes a flavor and verifies if it is deleted or not
+ self.admin_client.delete_flavor(flavor_id)
+ # Asserting that the flavor is not found in list after deletion
+ labels = self.admin_client.list_flavors(id=flavor_id)
+ self.assertEqual(len(labels['flavors']), 0)
+
+
+class TestFlavorsIpV6TestJSON(TestFlavorsJson):
+ _ip_version = 6
diff --git a/neutron/tests/tempest/api/test_floating_ips.py b/neutron/tests/tempest/api/test_floating_ips.py
new file mode 100644
index 0000000..dd64448
--- /dev/null
+++ b/neutron/tests/tempest/api/test_floating_ips.py
@@ -0,0 +1,60 @@
+# Copyright 2013 OpenStack Foundation
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from tempest.lib.common.utils import data_utils
+from tempest import test
+
+from neutron.tests.tempest.api import base
+from neutron.tests.tempest import config
+
+CONF = config.CONF
+
+
+class FloatingIPTestJSON(base.BaseNetworkTest):
+
+ @classmethod
+ @test.requires_ext(extension="router", service="network")
+ def resource_setup(cls):
+ super(FloatingIPTestJSON, cls).resource_setup()
+ cls.ext_net_id = CONF.network.public_network_id
+
+ # Create network, subnet, router and add interface
+ cls.network = cls.create_network()
+ cls.subnet = cls.create_subnet(cls.network)
+ cls.router = cls.create_router(data_utils.rand_name('router-'),
+ external_network_id=cls.ext_net_id)
+ cls.create_router_interface(cls.router['id'], cls.subnet['id'])
+ cls.port = list()
+ # Create two ports one each for Creation and Updating of floatingIP
+ for i in range(2):
+ cls.create_port(cls.network)
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('c72c1c0c-2193-4aca-eeee-b1442641ffff')
+ @test.requires_ext(extension="standard-attr-description",
+ service="network")
+ def test_create_update_floatingip_description(self):
+ body = self.client.create_floatingip(
+ floating_network_id=self.ext_net_id,
+ port_id=self.ports[0]['id'],
+ description='d1'
+ )['floatingip']
+ self.assertEqual('d1', body['description'])
+ body = self.client.show_floatingip(body['id'])['floatingip']
+ self.assertEqual('d1', body['description'])
+ body = self.client.update_floatingip(body['id'], description='d2')
+ self.assertEqual('d2', body['floatingip']['description'])
+ body = self.client.show_floatingip(body['floatingip']['id'])
+ self.assertEqual('d2', body['floatingip']['description'])
diff --git a/neutron/tests/tempest/api/test_floating_ips_negative.py b/neutron/tests/tempest/api/test_floating_ips_negative.py
new file mode 100644
index 0000000..700fa10
--- /dev/null
+++ b/neutron/tests/tempest/api/test_floating_ips_negative.py
@@ -0,0 +1,66 @@
+# Copyright 2014 Hewlett-Packard Development Company, L.P.
+# Copyright 2014 OpenStack Foundation
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from tempest.lib.common.utils import data_utils
+from tempest.lib import exceptions as lib_exc
+from tempest import test
+
+from neutron.tests.tempest.api import base
+from neutron.tests.tempest import config
+
+CONF = config.CONF
+
+
+class FloatingIPNegativeTestJSON(base.BaseNetworkTest):
+
+ @classmethod
+ @test.requires_ext(extension="router", service="network")
+ def resource_setup(cls):
+ super(FloatingIPNegativeTestJSON, cls).resource_setup()
+ cls.ext_net_id = CONF.network.public_network_id
+ # Create a network with a subnet connected to a router.
+ cls.network = cls.create_network()
+ cls.subnet = cls.create_subnet(cls.network)
+ cls.router = cls.create_router(data_utils.rand_name('router'))
+ cls.create_router_interface(cls.router['id'], cls.subnet['id'])
+ cls.port = cls.create_port(cls.network)
+
+ @test.attr(type=['negative', 'smoke'])
+ @test.idempotent_id('0b5b8797-6de7-4191-905c-a48b888eb429')
+ def test_associate_floatingip_with_port_with_floatingip(self):
+ net = self.create_network()
+ subnet = self.create_subnet(net)
+ r = self.create_router('test')
+ self.create_router_interface(r['id'], subnet['id'])
+ self.client.update_router(
+ r['id'],
+ external_gateway_info={
+ 'network_id': self.ext_net_id})
+ self.addCleanup(self.client.update_router, self.router['id'],
+ external_gateway_info={})
+ port = self.create_port(net)
+ body1 = self.client.create_floatingip(
+ floating_network_id=self.ext_net_id)
+ floating_ip1 = body1['floatingip']
+ self.addCleanup(self.client.delete_floatingip, floating_ip1['id'])
+ body2 = self.client.create_floatingip(
+ floating_network_id=self.ext_net_id)
+ floating_ip2 = body2['floatingip']
+ self.addCleanup(self.client.delete_floatingip, floating_ip2['id'])
+ self.client.update_floatingip(floating_ip1['id'],
+ port_id=port['id'])
+ self.assertRaises(lib_exc.Conflict, self.client.update_floatingip,
+ floating_ip2['id'], port_id=port['id'])
diff --git a/neutron/tests/tempest/api/test_metering_extensions.py b/neutron/tests/tempest/api/test_metering_extensions.py
new file mode 100644
index 0000000..3ef55db
--- /dev/null
+++ b/neutron/tests/tempest/api/test_metering_extensions.py
@@ -0,0 +1,145 @@
+# Copyright (C) 2014 eNovance SAS <licensing@enovance.com>
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from tempest.lib.common.utils import data_utils
+from tempest import test
+
+from neutron.tests.tempest.api import base
+
+
+class MeteringTestJSON(base.BaseAdminNetworkTest):
+
+ """
+ Tests the following operations in the Neutron API using the REST client for
+ Neutron:
+
+ List, Show, Create, Delete Metering labels
+ List, Show, Create, Delete Metering labels rules
+ """
+
+ @classmethod
+ @test.requires_ext(extension="metering", service="network")
+ def resource_setup(cls):
+ super(MeteringTestJSON, cls).resource_setup()
+ description = "metering label created by tempest"
+ name = data_utils.rand_name("metering-label")
+ cls.metering_label = cls.create_metering_label(name, description)
+ remote_ip_prefix = ("10.0.0.0/24" if cls._ip_version == 4
+ else "fd02::/64")
+ direction = "ingress"
+ cls.metering_label_rule = cls.create_metering_label_rule(
+ remote_ip_prefix, direction,
+ metering_label_id=cls.metering_label['id'])
+
+ def _delete_metering_label(self, metering_label_id):
+ # Deletes a label and verifies if it is deleted or not
+ self.admin_client.delete_metering_label(metering_label_id)
+ # Asserting that the label is not found in list after deletion
+ labels = self.admin_client.list_metering_labels(id=metering_label_id)
+ self.assertEqual(len(labels['metering_labels']), 0)
+
+ def _delete_metering_label_rule(self, metering_label_rule_id):
+ # Deletes a rule and verifies if it is deleted or not
+ self.admin_client.delete_metering_label_rule(
+ metering_label_rule_id)
+ # Asserting that the rule is not found in list after deletion
+ rules = (self.admin_client.list_metering_label_rules(
+ id=metering_label_rule_id))
+ self.assertEqual(len(rules['metering_label_rules']), 0)
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('e2fb2f8c-45bf-429a-9f17-171c70444612')
+ def test_list_metering_labels(self):
+ # Verify label filtering
+ body = self.admin_client.list_metering_labels(id=33)
+ metering_labels = body['metering_labels']
+ self.assertEqual(0, len(metering_labels))
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('ec8e15ff-95d0-433b-b8a6-b466bddb1e50')
+ def test_create_delete_metering_label_with_filters(self):
+ # Creates a label
+ name = data_utils.rand_name('metering-label-')
+ description = "label created by tempest"
+ body = self.admin_client.create_metering_label(name=name,
+ description=description)
+ metering_label = body['metering_label']
+ self.addCleanup(self._delete_metering_label,
+ metering_label['id'])
+ # Assert whether created labels are found in labels list or fail
+ # if created labels are not found in labels list
+ labels = (self.admin_client.list_metering_labels(
+ id=metering_label['id']))
+ self.assertEqual(len(labels['metering_labels']), 1)
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('30abb445-0eea-472e-bd02-8649f54a5968')
+ def test_show_metering_label(self):
+ # Verifies the details of a label
+ body = self.admin_client.show_metering_label(self.metering_label['id'])
+ metering_label = body['metering_label']
+ self.assertEqual(self.metering_label['id'], metering_label['id'])
+ self.assertEqual(self.metering_label['tenant_id'],
+ metering_label['tenant_id'])
+ self.assertEqual(self.metering_label['name'], metering_label['name'])
+ self.assertEqual(self.metering_label['description'],
+ metering_label['description'])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('cc832399-6681-493b-9d79-0202831a1281')
+ def test_list_metering_label_rules(self):
+ # Verify rule filtering
+ body = self.admin_client.list_metering_label_rules(id=33)
+ metering_label_rules = body['metering_label_rules']
+ self.assertEqual(0, len(metering_label_rules))
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('f4d547cd-3aee-408f-bf36-454f8825e045')
+ def test_create_delete_metering_label_rule_with_filters(self):
+ # Creates a rule
+ remote_ip_prefix = ("10.0.1.0/24" if self._ip_version == 4
+ else "fd03::/64")
+ body = (self.admin_client.create_metering_label_rule(
+ remote_ip_prefix=remote_ip_prefix,
+ direction="ingress",
+ metering_label_id=self.metering_label['id']))
+ metering_label_rule = body['metering_label_rule']
+ self.addCleanup(self._delete_metering_label_rule,
+ metering_label_rule['id'])
+ # Assert whether created rules are found in rules list or fail
+ # if created rules are not found in rules list
+ rules = (self.admin_client.list_metering_label_rules(
+ id=metering_label_rule['id']))
+ self.assertEqual(len(rules['metering_label_rules']), 1)
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('b7354489-96ea-41f3-9452-bace120fb4a7')
+ def test_show_metering_label_rule(self):
+ # Verifies the details of a rule
+ body = (self.admin_client.show_metering_label_rule(
+ self.metering_label_rule['id']))
+ metering_label_rule = body['metering_label_rule']
+ self.assertEqual(self.metering_label_rule['id'],
+ metering_label_rule['id'])
+ self.assertEqual(self.metering_label_rule['remote_ip_prefix'],
+ metering_label_rule['remote_ip_prefix'])
+ self.assertEqual(self.metering_label_rule['direction'],
+ metering_label_rule['direction'])
+ self.assertEqual(self.metering_label_rule['metering_label_id'],
+ metering_label_rule['metering_label_id'])
+ self.assertFalse(metering_label_rule['excluded'])
+
+
+class MeteringIpV6TestJSON(MeteringTestJSON):
+ _ip_version = 6
diff --git a/neutron/tests/tempest/api/test_network_ip_availability.py b/neutron/tests/tempest/api/test_network_ip_availability.py
new file mode 100644
index 0000000..2976da0
--- /dev/null
+++ b/neutron/tests/tempest/api/test_network_ip_availability.py
@@ -0,0 +1,167 @@
+# Copyright 2016 OpenStack Foundation
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import netaddr
+
+from tempest.lib.common.utils import data_utils
+from tempest.lib import exceptions as lib_exc
+from tempest import test
+
+from neutron.tests.tempest.api import base
+from neutron.tests.tempest import config
+
+from neutron_lib import constants as lib_constants
+
+CONF = config.CONF
+
+# 3 IP addresses are taken from every total for IPv4 these are reserved
+DEFAULT_IP4_RESERVED = 3
+# 2 IP addresses are taken from every total for IPv6 these are reserved
+# I assume the reason for having one less than IPv4 is it does not have
+# broadcast address
+DEFAULT_IP6_RESERVED = 2
+
+DELETE_TIMEOUT = 10
+DELETE_SLEEP = 2
+
+
+class NetworksIpAvailabilityTest(base.BaseAdminNetworkTest):
+
+ """
+ Tests the following operations in the Neutron API using the REST client for
+ Neutron:
+
+ test total and used ips for net create
+ test total and ips for net after subnet create
+ test total and used ips for net after subnet and port create
+
+ """
+
+ @classmethod
+ @test.requires_ext(extension="network-ip-availability", service="network")
+ def skip_checks(cls):
+ super(NetworksIpAvailabilityTest, cls).skip_checks()
+
+ def _get_used_ips(self, network, net_availability):
+ if network:
+ for availability in net_availability['network_ip_availabilities']:
+ if availability['network_id'] == network['id']:
+ return availability['used_ips']
+
+ def _cleanUp_port(self, port_id):
+ # delete port, any way to avoid race
+ try:
+ self.client.delete_port(port_id)
+ # if port is not found, this means it was deleted in the test
+ except lib_exc.NotFound:
+ pass
+
+ def _assert_total_and_used_ips(self, expected_used, expected_total,
+ network, net_availability):
+ if network:
+ for availability in net_availability['network_ip_availabilities']:
+ if availability['network_id'] == network['id']:
+ self.assertEqual(expected_total, availability['total_ips'])
+ self.assertEqual(expected_used, availability['used_ips'])
+
+ def _create_subnet(self, network, ip_version):
+ if ip_version == lib_constants.IP_VERSION_4:
+ cidr = netaddr.IPNetwork('20.0.0.0/24')
+ mask_bits = config.safe_get_config_value(
+ 'network', 'project_network_mask_bits')
+ elif ip_version == lib_constants.IP_VERSION_6:
+ cidr = netaddr.IPNetwork('20:db8::/64')
+ mask_bits = config.safe_get_config_value(
+ 'network', 'project_network_v6_mask_bits')
+
+ subnet_cidr = cidr.subnet(mask_bits).next()
+ prefix_len = subnet_cidr.prefixlen
+ subnet = self.create_subnet(network,
+ cidr=subnet_cidr,
+ enable_dhcp=False,
+ mask_bits=mask_bits,
+ ip_version=ip_version)
+ return subnet, prefix_len
+
+
+def calc_total_ips(prefix, ip_version):
+ # will calculate total ips after removing reserved.
+ if ip_version == lib_constants.IP_VERSION_4:
+ total_ips = 2 ** (32 - prefix) - DEFAULT_IP4_RESERVED
+ elif ip_version == lib_constants.IP_VERSION_6:
+ total_ips = 2 ** (128 - prefix) - DEFAULT_IP6_RESERVED
+ return total_ips
+
+
+class NetworksIpAvailabilityIPv4Test(NetworksIpAvailabilityTest):
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('0f33cc8c-1bf6-47d1-9ce1-010618240599')
+ def test_admin_network_availability_before_subnet(self):
+ net_name = data_utils.rand_name('network-')
+ network = self.create_network(network_name=net_name)
+ self.addCleanup(self.client.delete_network, network['id'])
+ net_availability = self.admin_client.list_network_ip_availabilities()
+ self._assert_total_and_used_ips(0, 0, network, net_availability)
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('3aecd3b2-16ed-4b87-a54a-91d7b3c2986b')
+ def test_net_ip_availability_after_subnet_and_ports(self):
+ net_name = data_utils.rand_name('network-')
+ network = self.create_network(network_name=net_name)
+ self.addCleanup(self.client.delete_network, network['id'])
+ subnet, prefix = self._create_subnet(network, self._ip_version)
+ self.addCleanup(self.client.delete_subnet, subnet['id'])
+ body = self.admin_client.list_network_ip_availabilities()
+ used_ip = self._get_used_ips(network, body)
+ port1 = self.client.create_port(network_id=network['id'])
+ self.addCleanup(self.client.delete_port, port1['port']['id'])
+ port2 = self.client.create_port(network_id=network['id'])
+ self.addCleanup(self.client.delete_port, port2['port']['id'])
+ net_availability = self.admin_client.list_network_ip_availabilities()
+ self._assert_total_and_used_ips(
+ used_ip + 2,
+ calc_total_ips(prefix, self._ip_version),
+ network, net_availability)
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('9f11254d-757b-492e-b14b-f52144e4ee7b')
+ def test_net_ip_availability_after_port_delete(self):
+ net_name = data_utils.rand_name('network-')
+ network = self.create_network(network_name=net_name)
+ self.addCleanup(self.client.delete_network, network['id'])
+ subnet, prefix = self._create_subnet(network, self._ip_version)
+ self.addCleanup(self.client.delete_subnet, subnet['id'])
+ port = self.client.create_port(network_id=network['id'])
+ self.addCleanup(self._cleanUp_port, port['port']['id'])
+ net_availability = self.admin_client.list_network_ip_availabilities()
+ used_ip = self._get_used_ips(network, net_availability)
+ self.client.delete_port(port['port']['id'])
+
+ def get_net_availability():
+ availabilities = self.admin_client.list_network_ip_availabilities()
+ used_ip_after_port_delete = self._get_used_ips(network,
+ availabilities)
+ return used_ip - 1 == used_ip_after_port_delete
+
+ self.assertTrue(
+ test.call_until_true(
+ get_net_availability, DELETE_TIMEOUT, DELETE_SLEEP),
+ msg="IP address did not become available after port delete")
+
+
+class NetworksIpAvailabilityIPv6Test(NetworksIpAvailabilityIPv4Test):
+
+ _ip_version = lib_constants.IP_VERSION_6
diff --git a/neutron/tests/tempest/api/test_networks.py b/neutron/tests/tempest/api/test_networks.py
new file mode 100644
index 0000000..7a315db
--- /dev/null
+++ b/neutron/tests/tempest/api/test_networks.py
@@ -0,0 +1,95 @@
+# Copyright 2012 OpenStack Foundation
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from tempest import test
+
+from neutron.tests.tempest.api import base
+from neutron.tests.tempest import config
+
+CONF = config.CONF
+
+
+class NetworksTestJSON(base.BaseNetworkTest):
+
+ """
+ Tests the following operations in the Neutron API using the REST client for
+ Neutron:
+
+ list tenant's networks
+ show a network
+ show a tenant network details
+
+ v2.0 of the Neutron API is assumed.
+ """
+
+ @classmethod
+ def resource_setup(cls):
+ super(NetworksTestJSON, cls).resource_setup()
+ cls.network = cls.create_network()
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('2bf13842-c93f-4a69-83ed-717d2ec3b44e')
+ def test_show_network(self):
+ # Verify the details of a network
+ body = self.client.show_network(self.network['id'])
+ network = body['network']
+ fields = ['id', 'name']
+ if test.is_extension_enabled('net-mtu', 'network'):
+ fields.append('mtu')
+ for key in fields:
+ self.assertEqual(network[key], self.network[key])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('867819bb-c4b6-45f7-acf9-90edcf70aa5e')
+ def test_show_network_fields(self):
+ # Verify specific fields of a network
+ fields = ['id', 'name']
+ if test.is_extension_enabled('net-mtu', 'network'):
+ fields.append('mtu')
+ body = self.client.show_network(self.network['id'],
+ fields=fields)
+ network = body['network']
+ self.assertEqual(sorted(network.keys()), sorted(fields))
+ for field_name in fields:
+ self.assertEqual(network[field_name], self.network[field_name])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('c72c1c0c-2193-4aca-ccc4-b1442640bbbb')
+ @test.requires_ext(extension="standard-attr-description",
+ service="network")
+ def test_create_update_network_description(self):
+ body = self.create_network(description='d1')
+ self.assertEqual('d1', body['description'])
+ net_id = body['id']
+ body = self.client.list_networks(id=net_id)['networks'][0]
+ self.assertEqual('d1', body['description'])
+ body = self.client.update_network(body['id'],
+ description='d2')
+ self.assertEqual('d2', body['network']['description'])
+ body = self.client.list_networks(id=net_id)['networks'][0]
+ self.assertEqual('d2', body['description'])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('6ae6d24f-9194-4869-9c85-c313cb20e080')
+ def test_list_networks_fields(self):
+ # Verify specific fields of the networks
+ fields = ['id', 'name']
+ if test.is_extension_enabled('net-mtu', 'network'):
+ fields.append('mtu')
+ body = self.client.list_networks(fields=fields)
+ networks = body['networks']
+ self.assertNotEmpty(networks, "Network list returned is empty")
+ for network in networks:
+ self.assertEqual(sorted(network.keys()), sorted(fields))
diff --git a/neutron/tests/tempest/api/test_ports.py b/neutron/tests/tempest/api/test_ports.py
new file mode 100644
index 0000000..23db50b
--- /dev/null
+++ b/neutron/tests/tempest/api/test_ports.py
@@ -0,0 +1,42 @@
+# Copyright 2014 OpenStack Foundation
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from tempest import test
+
+from neutron.tests.tempest.api import base
+
+
+class PortsTestJSON(base.BaseNetworkTest):
+
+ @classmethod
+ def resource_setup(cls):
+ super(PortsTestJSON, cls).resource_setup()
+ cls.network = cls.create_network()
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('c72c1c0c-2193-4aca-bbb4-b1442640bbbb')
+ @test.requires_ext(extension="standard-attr-description",
+ service="network")
+ def test_create_update_port_description(self):
+ body = self.create_port(self.network,
+ description='d1')
+ self.assertEqual('d1', body['description'])
+ body = self.client.list_ports(id=body['id'])['ports'][0]
+ self.assertEqual('d1', body['description'])
+ body = self.client.update_port(body['id'],
+ description='d2')
+ self.assertEqual('d2', body['port']['description'])
+ body = self.client.list_ports(id=body['port']['id'])['ports'][0]
+ self.assertEqual('d2', body['description'])
diff --git a/neutron/tests/tempest/api/test_qos.py b/neutron/tests/tempest/api/test_qos.py
new file mode 100644
index 0000000..e8a8686
--- /dev/null
+++ b/neutron/tests/tempest/api/test_qos.py
@@ -0,0 +1,839 @@
+# Copyright 2015 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from tempest.lib.common.utils import data_utils
+from tempest.lib import exceptions
+from tempest import test
+
+import testtools
+
+from neutron.services.qos import qos_consts
+from neutron.tests.tempest.api import base
+
+
+class QosTestJSON(base.BaseAdminNetworkTest):
+ @classmethod
+ @test.requires_ext(extension="qos", service="network")
+ def resource_setup(cls):
+ super(QosTestJSON, cls).resource_setup()
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('108fbdf7-3463-4e47-9871-d07f3dcf5bbb')
+ def test_create_policy(self):
+ policy = self.create_qos_policy(name='test-policy',
+ description='test policy desc1',
+ shared=False)
+
+ # Test 'show policy'
+ retrieved_policy = self.admin_client.show_qos_policy(policy['id'])
+ retrieved_policy = retrieved_policy['policy']
+ self.assertEqual('test-policy', retrieved_policy['name'])
+ self.assertEqual('test policy desc1', retrieved_policy['description'])
+ self.assertFalse(retrieved_policy['shared'])
+
+ # Test 'list policies'
+ policies = self.admin_client.list_qos_policies()['policies']
+ policies_ids = [p['id'] for p in policies]
+ self.assertIn(policy['id'], policies_ids)
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('f8d20e92-f06d-4805-b54f-230f77715815')
+ def test_list_policy_filter_by_name(self):
+ self.create_qos_policy(name='test', description='test policy',
+ shared=False)
+ self.create_qos_policy(name='test2', description='test policy',
+ shared=False)
+
+ policies = (self.admin_client.
+ list_qos_policies(name='test')['policies'])
+ self.assertEqual(1, len(policies))
+
+ retrieved_policy = policies[0]
+ self.assertEqual('test', retrieved_policy['name'])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('8e88a54b-f0b2-4b7d-b061-a15d93c2c7d6')
+ def test_policy_update(self):
+ policy = self.create_qos_policy(name='test-policy',
+ description='',
+ shared=False)
+ self.admin_client.update_qos_policy(policy['id'],
+ description='test policy desc2',
+ shared=True)
+
+ retrieved_policy = self.admin_client.show_qos_policy(policy['id'])
+ retrieved_policy = retrieved_policy['policy']
+ self.assertEqual('test policy desc2', retrieved_policy['description'])
+ self.assertTrue(retrieved_policy['shared'])
+ self.assertEqual([], retrieved_policy['rules'])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('1cb42653-54bd-4a9a-b888-c55e18199201')
+ def test_delete_policy(self):
+ policy = self.admin_client.create_qos_policy(
+ 'test-policy', 'desc', True)['policy']
+
+ retrieved_policy = self.admin_client.show_qos_policy(policy['id'])
+ retrieved_policy = retrieved_policy['policy']
+ self.assertEqual('test-policy', retrieved_policy['name'])
+
+ self.admin_client.delete_qos_policy(policy['id'])
+ self.assertRaises(exceptions.NotFound,
+ self.admin_client.show_qos_policy, policy['id'])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('cf776f77-8d3d-49f2-8572-12d6a1557224')
+ def test_list_admin_rule_types(self):
+ self._test_list_rule_types(self.admin_client)
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('49c8ea35-83a9-453a-bd23-239cf3b13929')
+ def test_list_regular_rule_types(self):
+ self._test_list_rule_types(self.client)
+
+ def _test_list_rule_types(self, client):
+ # List supported rule types
+ # TODO(QoS): since in gate we run both ovs and linuxbridge ml2 drivers,
+ # and since Linux Bridge ml2 driver does not have QoS support yet, ml2
+ # plugin reports no rule types are supported. Once linuxbridge will
+ # receive support for QoS, the list of expected rule types will change.
+ #
+ # In theory, we could make the test conditional on which ml2 drivers
+ # are enabled in gate (or more specifically, on which supported qos
+ # rules are claimed by core plugin), but that option doesn't seem to be
+ # available thru tempest.lib framework
+ expected_rule_types = []
+ expected_rule_details = ['type']
+
+ rule_types = client.list_qos_rule_types()
+ actual_list_rule_types = rule_types['rule_types']
+ actual_rule_types = [rule['type'] for rule in actual_list_rule_types]
+
+ # Verify that only required fields present in rule details
+ for rule in actual_list_rule_types:
+ self.assertEqual(tuple(rule.keys()), tuple(expected_rule_details))
+
+ # Verify if expected rules are present in the actual rules list
+ for rule in expected_rule_types:
+ self.assertIn(rule, actual_rule_types)
+
+ def _disassociate_network(self, client, network_id):
+ client.update_network(network_id, qos_policy_id=None)
+ updated_network = self.admin_client.show_network(network_id)
+ self.assertIsNone(updated_network['network']['qos_policy_id'])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('65b9ef75-1911-406a-bbdb-ca1d68d528b0')
+ def test_policy_association_with_admin_network(self):
+ policy = self.create_qos_policy(name='test-policy',
+ description='test policy',
+ shared=False)
+ network = self.create_shared_network('test network',
+ qos_policy_id=policy['id'])
+
+ retrieved_network = self.admin_client.show_network(network['id'])
+ self.assertEqual(
+ policy['id'], retrieved_network['network']['qos_policy_id'])
+
+ self._disassociate_network(self.admin_client, network['id'])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('1738de5d-0476-4163-9022-5e1b548c208e')
+ def test_policy_association_with_tenant_network(self):
+ policy = self.create_qos_policy(name='test-policy',
+ description='test policy',
+ shared=True)
+ network = self.create_network('test network',
+ qos_policy_id=policy['id'])
+
+ retrieved_network = self.admin_client.show_network(network['id'])
+ self.assertEqual(
+ policy['id'], retrieved_network['network']['qos_policy_id'])
+
+ self._disassociate_network(self.client, network['id'])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('9efe63d0-836f-4cc2-b00c-468e63aa614e')
+ def test_policy_association_with_network_nonexistent_policy(self):
+ self.assertRaises(
+ exceptions.NotFound,
+ self.create_network,
+ 'test network',
+ qos_policy_id='9efe63d0-836f-4cc2-b00c-468e63aa614e')
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('1aa55a79-324f-47d9-a076-894a8fc2448b')
+ def test_policy_association_with_network_non_shared_policy(self):
+ policy = self.create_qos_policy(name='test-policy',
+ description='test policy',
+ shared=False)
+ self.assertRaises(
+ exceptions.NotFound,
+ self.create_network,
+ 'test network', qos_policy_id=policy['id'])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('09a9392c-1359-4cbb-989f-fb768e5834a8')
+ def test_policy_update_association_with_admin_network(self):
+ policy = self.create_qos_policy(name='test-policy',
+ description='test policy',
+ shared=False)
+ network = self.create_shared_network('test network')
+ retrieved_network = self.admin_client.show_network(network['id'])
+ self.assertIsNone(retrieved_network['network']['qos_policy_id'])
+
+ self.admin_client.update_network(network['id'],
+ qos_policy_id=policy['id'])
+ retrieved_network = self.admin_client.show_network(network['id'])
+ self.assertEqual(
+ policy['id'], retrieved_network['network']['qos_policy_id'])
+
+ self._disassociate_network(self.admin_client, network['id'])
+
+ def _disassociate_port(self, port_id):
+ self.client.update_port(port_id, qos_policy_id=None)
+ updated_port = self.admin_client.show_port(port_id)
+ self.assertIsNone(updated_port['port']['qos_policy_id'])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('98fcd95e-84cf-4746-860e-44692e674f2e')
+ def test_policy_association_with_port_shared_policy(self):
+ policy = self.create_qos_policy(name='test-policy',
+ description='test policy',
+ shared=True)
+ network = self.create_shared_network('test network')
+ port = self.create_port(network, qos_policy_id=policy['id'])
+
+ retrieved_port = self.admin_client.show_port(port['id'])
+ self.assertEqual(
+ policy['id'], retrieved_port['port']['qos_policy_id'])
+
+ self._disassociate_port(port['id'])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('49e02f5a-e1dd-41d5-9855-cfa37f2d195e')
+ def test_policy_association_with_port_nonexistent_policy(self):
+ network = self.create_shared_network('test network')
+ self.assertRaises(
+ exceptions.NotFound,
+ self.create_port,
+ network,
+ qos_policy_id='49e02f5a-e1dd-41d5-9855-cfa37f2d195e')
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('f53d961c-9fe5-4422-8b66-7add972c6031')
+ def test_policy_association_with_port_non_shared_policy(self):
+ policy = self.create_qos_policy(name='test-policy',
+ description='test policy',
+ shared=False)
+ network = self.create_shared_network('test network')
+ self.assertRaises(
+ exceptions.NotFound,
+ self.create_port,
+ network, qos_policy_id=policy['id'])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('f8163237-fba9-4db5-9526-bad6d2343c76')
+ def test_policy_update_association_with_port_shared_policy(self):
+ policy = self.create_qos_policy(name='test-policy',
+ description='test policy',
+ shared=True)
+ network = self.create_shared_network('test network')
+ port = self.create_port(network)
+ retrieved_port = self.admin_client.show_port(port['id'])
+ self.assertIsNone(retrieved_port['port']['qos_policy_id'])
+
+ self.client.update_port(port['id'], qos_policy_id=policy['id'])
+ retrieved_port = self.admin_client.show_port(port['id'])
+ self.assertEqual(
+ policy['id'], retrieved_port['port']['qos_policy_id'])
+
+ self._disassociate_port(port['id'])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('18163237-8ba9-4db5-9525-bad6d2343c75')
+ def test_delete_not_allowed_if_policy_in_use_by_network(self):
+ policy = self.create_qos_policy(name='test-policy',
+ description='test policy',
+ shared=True)
+ network = self.create_shared_network(
+ 'test network', qos_policy_id=policy['id'])
+ self.assertRaises(
+ exceptions.Conflict,
+ self.admin_client.delete_qos_policy, policy['id'])
+
+ self._disassociate_network(self.admin_client, network['id'])
+ self.admin_client.delete_qos_policy(policy['id'])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('24153230-84a9-4dd5-9525-bad6d2343c75')
+ def test_delete_not_allowed_if_policy_in_use_by_port(self):
+ policy = self.create_qos_policy(name='test-policy',
+ description='test policy',
+ shared=True)
+ network = self.create_shared_network('test network')
+ port = self.create_port(network, qos_policy_id=policy['id'])
+ self.assertRaises(
+ exceptions.Conflict,
+ self.admin_client.delete_qos_policy, policy['id'])
+
+ self._disassociate_port(port['id'])
+ self.admin_client.delete_qos_policy(policy['id'])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('a2a5849b-dd06-4b18-9664-0b6828a1fc27')
+ def test_qos_policy_delete_with_rules(self):
+ policy = self.create_qos_policy(name='test-policy',
+ description='test policy',
+ shared=False)
+ self.admin_client.create_bandwidth_limit_rule(
+ policy['id'], 200, 1337)['bandwidth_limit_rule']
+
+ self.admin_client.delete_qos_policy(policy['id'])
+
+ with testtools.ExpectedException(exceptions.NotFound):
+ self.admin_client.show_qos_policy(policy['id'])
+
+
+class QosBandwidthLimitRuleTestJSON(base.BaseAdminNetworkTest):
+ @classmethod
+ @test.requires_ext(extension="qos", service="network")
+ def resource_setup(cls):
+ super(QosBandwidthLimitRuleTestJSON, cls).resource_setup()
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('8a59b00b-3e9c-4787-92f8-93a5cdf5e378')
+ def test_rule_create(self):
+ policy = self.create_qos_policy(name='test-policy',
+ description='test policy',
+ shared=False)
+ rule = self.create_qos_bandwidth_limit_rule(policy_id=policy['id'],
+ max_kbps=200,
+ max_burst_kbps=1337)
+
+ # Test 'show rule'
+ retrieved_rule = self.admin_client.show_bandwidth_limit_rule(
+ policy['id'], rule['id'])
+ retrieved_rule = retrieved_rule['bandwidth_limit_rule']
+ self.assertEqual(rule['id'], retrieved_rule['id'])
+ self.assertEqual(200, retrieved_rule['max_kbps'])
+ self.assertEqual(1337, retrieved_rule['max_burst_kbps'])
+
+ # Test 'list rules'
+ rules = self.admin_client.list_bandwidth_limit_rules(policy['id'])
+ rules = rules['bandwidth_limit_rules']
+ rules_ids = [r['id'] for r in rules]
+ self.assertIn(rule['id'], rules_ids)
+
+ # Test 'show policy'
+ retrieved_policy = self.admin_client.show_qos_policy(policy['id'])
+ policy_rules = retrieved_policy['policy']['rules']
+ self.assertEqual(1, len(policy_rules))
+ self.assertEqual(rule['id'], policy_rules[0]['id'])
+ self.assertEqual(qos_consts.RULE_TYPE_BANDWIDTH_LIMIT,
+ policy_rules[0]['type'])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('8a59b00b-ab01-4787-92f8-93a5cdf5e378')
+ def test_rule_create_fail_for_the_same_type(self):
+ policy = self.create_qos_policy(name='test-policy',
+ description='test policy',
+ shared=False)
+ self.create_qos_bandwidth_limit_rule(policy_id=policy['id'],
+ max_kbps=200,
+ max_burst_kbps=1337)
+
+ self.assertRaises(exceptions.Conflict,
+ self.create_qos_bandwidth_limit_rule,
+ policy_id=policy['id'],
+ max_kbps=201, max_burst_kbps=1338)
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('149a6988-2568-47d2-931e-2dbc858943b3')
+ def test_rule_update(self):
+ policy = self.create_qos_policy(name='test-policy',
+ description='test policy',
+ shared=False)
+ rule = self.create_qos_bandwidth_limit_rule(policy_id=policy['id'],
+ max_kbps=1,
+ max_burst_kbps=1)
+
+ self.admin_client.update_bandwidth_limit_rule(policy['id'],
+ rule['id'],
+ max_kbps=200,
+ max_burst_kbps=1337)
+
+ retrieved_policy = self.admin_client.show_bandwidth_limit_rule(
+ policy['id'], rule['id'])
+ retrieved_policy = retrieved_policy['bandwidth_limit_rule']
+ self.assertEqual(200, retrieved_policy['max_kbps'])
+ self.assertEqual(1337, retrieved_policy['max_burst_kbps'])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('67ee6efd-7b33-4a68-927d-275b4f8ba958')
+ def test_rule_delete(self):
+ policy = self.create_qos_policy(name='test-policy',
+ description='test policy',
+ shared=False)
+ rule = self.admin_client.create_bandwidth_limit_rule(
+ policy['id'], 200, 1337)['bandwidth_limit_rule']
+
+ retrieved_policy = self.admin_client.show_bandwidth_limit_rule(
+ policy['id'], rule['id'])
+ retrieved_policy = retrieved_policy['bandwidth_limit_rule']
+ self.assertEqual(rule['id'], retrieved_policy['id'])
+
+ self.admin_client.delete_bandwidth_limit_rule(policy['id'], rule['id'])
+ self.assertRaises(exceptions.NotFound,
+ self.admin_client.show_bandwidth_limit_rule,
+ policy['id'], rule['id'])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('f211222c-5808-46cb-a961-983bbab6b852')
+ def test_rule_create_rule_nonexistent_policy(self):
+ self.assertRaises(
+ exceptions.NotFound,
+ self.create_qos_bandwidth_limit_rule,
+ 'policy', 200, 1337)
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('eed8e2a6-22da-421b-89b9-935a2c1a1b50')
+ def test_policy_create_forbidden_for_regular_tenants(self):
+ self.assertRaises(
+ exceptions.Forbidden,
+ self.client.create_qos_policy,
+ 'test-policy', 'test policy', False)
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('a4a2e7ad-786f-4927-a85a-e545a93bd274')
+ def test_rule_create_forbidden_for_regular_tenants(self):
+ self.assertRaises(
+ exceptions.Forbidden,
+ self.client.create_bandwidth_limit_rule,
+ 'policy', 1, 2)
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('ce0bd0c2-54d9-4e29-85f1-cfb36ac3ebe2')
+ def test_get_rules_by_policy(self):
+ policy1 = self.create_qos_policy(name='test-policy1',
+ description='test policy1',
+ shared=False)
+ rule1 = self.create_qos_bandwidth_limit_rule(policy_id=policy1['id'],
+ max_kbps=200,
+ max_burst_kbps=1337)
+
+ policy2 = self.create_qos_policy(name='test-policy2',
+ description='test policy2',
+ shared=False)
+ rule2 = self.create_qos_bandwidth_limit_rule(policy_id=policy2['id'],
+ max_kbps=5000,
+ max_burst_kbps=2523)
+
+ # Test 'list rules'
+ rules = self.admin_client.list_bandwidth_limit_rules(policy1['id'])
+ rules = rules['bandwidth_limit_rules']
+ rules_ids = [r['id'] for r in rules]
+ self.assertIn(rule1['id'], rules_ids)
+ self.assertNotIn(rule2['id'], rules_ids)
+
+
+class RbacSharedQosPoliciesTest(base.BaseAdminNetworkTest):
+
+ force_tenant_isolation = True
+ credentials = ['primary', 'alt', 'admin']
+
+ @classmethod
+ @test.requires_ext(extension="qos", service="network")
+ def resource_setup(cls):
+ super(RbacSharedQosPoliciesTest, cls).resource_setup()
+ cls.client2 = cls.alt_manager.network_client
+
+ def _create_qos_policy(self, tenant_id=None):
+ args = {'name': data_utils.rand_name('test-policy'),
+ 'description': 'test policy',
+ 'shared': False,
+ 'tenant_id': tenant_id}
+ qos_policy = self.admin_client.create_qos_policy(**args)['policy']
+ self.addCleanup(self.admin_client.delete_qos_policy, qos_policy['id'])
+
+ return qos_policy
+
+ def _make_admin_policy_shared_to_tenant_id(self, tenant_id):
+ policy = self._create_qos_policy()
+ rbac_policy = self.admin_client.create_rbac_policy(
+ object_type='qos_policy',
+ object_id=policy['id'],
+ action='access_as_shared',
+ target_tenant=tenant_id,
+ )['rbac_policy']
+
+ return {'policy': policy, 'rbac_policy': rbac_policy}
+
+ def _create_network(self, qos_policy_id, client, should_cleanup=True):
+ net = client.create_network(
+ name=data_utils.rand_name('test-network'),
+ qos_policy_id=qos_policy_id)['network']
+ if should_cleanup:
+ self.addCleanup(client.delete_network, net['id'])
+
+ return net
+
+ @test.idempotent_id('b9dcf582-d3b3-11e5-950a-54ee756c66df')
+ def test_policy_sharing_with_wildcard(self):
+ qos_pol = self.create_qos_policy(
+ name=data_utils.rand_name('test-policy'),
+ description='test-shared-policy', shared=False)
+ self.assertNotIn(qos_pol, self.client2.list_qos_policies()['policies'])
+
+ # test update shared False -> True
+ self.admin_client.update_qos_policy(qos_pol['id'], shared=True)
+ qos_pol['shared'] = True
+ self.client2.show_qos_policy(qos_pol['id'])
+ rbac_pol = {'target_tenant': '*',
+ 'tenant_id': self.admin_client.tenant_id,
+ 'object_type': 'qos_policy',
+ 'object_id': qos_pol['id'],
+ 'action': 'access_as_shared'}
+
+ rbac_policies = self.admin_client.list_rbac_policies()['rbac_policies']
+ rbac_policies = [r for r in rbac_policies if r.pop('id')]
+ self.assertIn(rbac_pol, rbac_policies)
+
+ # update shared True -> False should fail because the policy is bound
+ # to a network
+ net = self._create_network(qos_pol['id'], self.admin_client, False)
+ with testtools.ExpectedException(exceptions.Conflict):
+ self.admin_client.update_qos_policy(qos_pol['id'], shared=False)
+
+ # delete the network, and update shared True -> False should pass now
+ self.admin_client.delete_network(net['id'])
+ self.admin_client.update_qos_policy(qos_pol['id'], shared=False)
+ qos_pol['shared'] = False
+ self.assertNotIn(qos_pol, self.client2.list_qos_policies()['policies'])
+
+ def _create_net_bound_qos_rbacs(self):
+ res = self._make_admin_policy_shared_to_tenant_id(
+ self.client.tenant_id)
+ qos_policy, rbac_for_client_tenant = res['policy'], res['rbac_policy']
+
+ # add a wildcard rbac rule - now the policy globally shared
+ rbac_wildcard = self.admin_client.create_rbac_policy(
+ object_type='qos_policy',
+ object_id=qos_policy['id'],
+ action='access_as_shared',
+ target_tenant='*',
+ )['rbac_policy']
+
+ # tenant1 now uses qos policy for net
+ self._create_network(qos_policy['id'], self.client)
+
+ return rbac_for_client_tenant, rbac_wildcard
+
+ @test.idempotent_id('328b1f70-d424-11e5-a57f-54ee756c66df')
+ def test_net_bound_shared_policy_wildcard_and_tenant_id_wild_remove(self):
+ client_rbac, wildcard_rbac = self._create_net_bound_qos_rbacs()
+ # globally unshare the qos-policy, the specific share should remain
+ self.admin_client.delete_rbac_policy(wildcard_rbac['id'])
+ self.client.list_rbac_policies(id=client_rbac['id'])
+
+ @test.idempotent_id('328b1f70-d424-11e5-a57f-54ee756c66df')
+ def test_net_bound_shared_policy_wildcard_and_tenant_id_wild_remains(self):
+ client_rbac, wildcard_rbac = self._create_net_bound_qos_rbacs()
+ # remove client_rbac policy the wildcard share should remain
+ self.admin_client.delete_rbac_policy(client_rbac['id'])
+ self.client.list_rbac_policies(id=wildcard_rbac['id'])
+
+ @test.idempotent_id('2ace9adc-da6e-11e5-aafe-54ee756c66df')
+ def test_policy_sharing_with_wildcard_and_tenant_id(self):
+ res = self._make_admin_policy_shared_to_tenant_id(
+ self.client.tenant_id)
+ qos_policy, rbac = res['policy'], res['rbac_policy']
+ qos_pol = self.client.show_qos_policy(qos_policy['id'])['policy']
+ self.assertTrue(qos_pol['shared'])
+ with testtools.ExpectedException(exceptions.NotFound):
+ self.client2.show_qos_policy(qos_policy['id'])
+
+ # make the qos-policy globally shared
+ self.admin_client.update_qos_policy(qos_policy['id'], shared=True)
+ qos_pol = self.client2.show_qos_policy(qos_policy['id'])['policy']
+ self.assertTrue(qos_pol['shared'])
+
+ # globally unshare the qos-policy, the specific share should remain
+ self.admin_client.update_qos_policy(qos_policy['id'], shared=False)
+ self.client.show_qos_policy(qos_policy['id'])
+ with testtools.ExpectedException(exceptions.NotFound):
+ self.client2.show_qos_policy(qos_policy['id'])
+ self.assertIn(rbac,
+ self.admin_client.list_rbac_policies()['rbac_policies'])
+
+ @test.idempotent_id('9f85c76a-a350-11e5-8ae5-54ee756c66df')
+ def test_policy_target_update(self):
+ res = self._make_admin_policy_shared_to_tenant_id(
+ self.client.tenant_id)
+ # change to client2
+ update_res = self.admin_client.update_rbac_policy(
+ res['rbac_policy']['id'], target_tenant=self.client2.tenant_id)
+ self.assertEqual(self.client2.tenant_id,
+ update_res['rbac_policy']['target_tenant'])
+ # make sure everything else stayed the same
+ res['rbac_policy'].pop('target_tenant')
+ update_res['rbac_policy'].pop('target_tenant')
+ self.assertEqual(res['rbac_policy'], update_res['rbac_policy'])
+
+ @test.idempotent_id('a9b39f46-a350-11e5-97c7-54ee756c66df')
+ def test_network_presence_prevents_policy_rbac_policy_deletion(self):
+ res = self._make_admin_policy_shared_to_tenant_id(
+ self.client2.tenant_id)
+ qos_policy_id = res['policy']['id']
+ self._create_network(qos_policy_id, self.client2)
+ # a network with shared qos-policy should prevent the deletion of an
+ # rbac-policy required for it to be shared
+ with testtools.ExpectedException(exceptions.Conflict):
+ self.admin_client.delete_rbac_policy(res['rbac_policy']['id'])
+
+ # a wildcard policy should allow the specific policy to be deleted
+ # since it allows the remaining port
+ wild = self.admin_client.create_rbac_policy(
+ object_type='qos_policy', object_id=res['policy']['id'],
+ action='access_as_shared', target_tenant='*')['rbac_policy']
+ self.admin_client.delete_rbac_policy(res['rbac_policy']['id'])
+
+ # now that wildcard is the only remaining, it should be subjected to
+ # the same restriction
+ with testtools.ExpectedException(exceptions.Conflict):
+ self.admin_client.delete_rbac_policy(wild['id'])
+
+ # we can't update the policy to a different tenant
+ with testtools.ExpectedException(exceptions.Conflict):
+ self.admin_client.update_rbac_policy(
+ wild['id'], target_tenant=self.client2.tenant_id)
+
+ @test.idempotent_id('b0fe87e8-a350-11e5-9f08-54ee756c66df')
+ def test_regular_client_shares_to_another_regular_client(self):
+ # owned by self.admin_client
+ policy = self._create_qos_policy()
+ with testtools.ExpectedException(exceptions.NotFound):
+ self.client.show_qos_policy(policy['id'])
+ rbac_policy = self.admin_client.create_rbac_policy(
+ object_type='qos_policy', object_id=policy['id'],
+ action='access_as_shared',
+ target_tenant=self.client.tenant_id)['rbac_policy']
+ self.client.show_qos_policy(policy['id'])
+
+ self.assertIn(rbac_policy,
+ self.admin_client.list_rbac_policies()['rbac_policies'])
+ # ensure that 'client2' can't see the rbac-policy sharing the
+ # qos-policy to it because the rbac-policy belongs to 'client'
+ self.assertNotIn(rbac_policy['id'], [p['id'] for p in
+ self.client2.list_rbac_policies()['rbac_policies']])
+
+ @test.idempotent_id('ba88d0ca-a350-11e5-a06f-54ee756c66df')
+ def test_filter_fields(self):
+ policy = self._create_qos_policy()
+ self.admin_client.create_rbac_policy(
+ object_type='qos_policy', object_id=policy['id'],
+ action='access_as_shared', target_tenant=self.client2.tenant_id)
+ field_args = (('id',), ('id', 'action'), ('object_type', 'object_id'),
+ ('tenant_id', 'target_tenant'))
+ for fields in field_args:
+ res = self.admin_client.list_rbac_policies(fields=fields)
+ self.assertEqual(set(fields), set(res['rbac_policies'][0].keys()))
+
+ @test.idempotent_id('c10d993a-a350-11e5-9c7a-54ee756c66df')
+ def test_rbac_policy_show(self):
+ res = self._make_admin_policy_shared_to_tenant_id(
+ self.client.tenant_id)
+ p1 = res['rbac_policy']
+ p2 = self.admin_client.create_rbac_policy(
+ object_type='qos_policy', object_id=res['policy']['id'],
+ action='access_as_shared',
+ target_tenant='*')['rbac_policy']
+
+ self.assertEqual(
+ p1, self.admin_client.show_rbac_policy(p1['id'])['rbac_policy'])
+ self.assertEqual(
+ p2, self.admin_client.show_rbac_policy(p2['id'])['rbac_policy'])
+
+ @test.idempotent_id('c7496f86-a350-11e5-b380-54ee756c66df')
+ def test_filter_rbac_policies(self):
+ policy = self._create_qos_policy()
+ rbac_pol1 = self.admin_client.create_rbac_policy(
+ object_type='qos_policy', object_id=policy['id'],
+ action='access_as_shared',
+ target_tenant=self.client2.tenant_id)['rbac_policy']
+ rbac_pol2 = self.admin_client.create_rbac_policy(
+ object_type='qos_policy', object_id=policy['id'],
+ action='access_as_shared',
+ target_tenant=self.admin_client.tenant_id)['rbac_policy']
+ res1 = self.admin_client.list_rbac_policies(id=rbac_pol1['id'])[
+ 'rbac_policies']
+ res2 = self.admin_client.list_rbac_policies(id=rbac_pol2['id'])[
+ 'rbac_policies']
+ self.assertEqual(1, len(res1))
+ self.assertEqual(1, len(res2))
+ self.assertEqual(rbac_pol1['id'], res1[0]['id'])
+ self.assertEqual(rbac_pol2['id'], res2[0]['id'])
+
+ @test.idempotent_id('cd7d755a-a350-11e5-a344-54ee756c66df')
+ def test_regular_client_blocked_from_sharing_anothers_policy(self):
+ qos_policy = self._make_admin_policy_shared_to_tenant_id(
+ self.client.tenant_id)['policy']
+ with testtools.ExpectedException(exceptions.BadRequest):
+ self.client.create_rbac_policy(
+ object_type='qos_policy', object_id=qos_policy['id'],
+ action='access_as_shared',
+ target_tenant=self.client2.tenant_id)
+
+ # make sure the rbac-policy is invisible to the tenant for which it's
+ # being shared
+ self.assertFalse(self.client.list_rbac_policies()['rbac_policies'])
+
+
+class QosDscpMarkingRuleTestJSON(base.BaseAdminNetworkTest):
+ VALID_DSCP_MARK1 = 56
+ VALID_DSCP_MARK2 = 48
+
+ @classmethod
+ @test.requires_ext(extension="qos", service="network")
+ def resource_setup(cls):
+ super(QosDscpMarkingRuleTestJSON, cls).resource_setup()
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('8a59b00b-3e9c-4787-92f8-93a5cdf5e378')
+ def test_rule_create(self):
+ policy = self.create_qos_policy(name='test-policy',
+ description='test policy',
+ shared=False)
+ rule = self.admin_client.create_dscp_marking_rule(
+ policy['id'], self.VALID_DSCP_MARK1)['dscp_marking_rule']
+
+ # Test 'show rule'
+ retrieved_rule = self.admin_client.show_dscp_marking_rule(
+ policy['id'], rule['id'])
+ retrieved_rule = retrieved_rule['dscp_marking_rule']
+ self.assertEqual(rule['id'], retrieved_rule['id'])
+ self.assertEqual(self.VALID_DSCP_MARK1, retrieved_rule['dscp_mark'])
+
+ # Test 'list rules'
+ rules = self.admin_client.list_dscp_marking_rules(policy['id'])
+ rules = rules['dscp_marking_rules']
+ rules_ids = [r['id'] for r in rules]
+ self.assertIn(rule['id'], rules_ids)
+
+ # Test 'show policy'
+ retrieved_policy = self.admin_client.show_qos_policy(policy['id'])
+ policy_rules = retrieved_policy['policy']['rules']
+ self.assertEqual(1, len(policy_rules))
+ self.assertEqual(rule['id'], policy_rules[0]['id'])
+ self.assertEqual(qos_consts.RULE_TYPE_DSCP_MARK,
+ policy_rules[0]['type'])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('8a59b00b-ab01-4787-92f8-93a5cdf5e378')
+ def test_rule_create_fail_for_the_same_type(self):
+ policy = self.create_qos_policy(name='test-policy',
+ description='test policy',
+ shared=False)
+ self.admin_client.create_dscp_marking_rule(
+ policy['id'], self.VALID_DSCP_MARK1)['dscp_marking_rule']
+
+ self.assertRaises(exceptions.Conflict,
+ self.admin_client.create_dscp_marking_rule,
+ policy_id=policy['id'],
+ dscp_mark=self.VALID_DSCP_MARK2)
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('149a6988-2568-47d2-931e-2dbc858943b3')
+ def test_rule_update(self):
+ policy = self.create_qos_policy(name='test-policy',
+ description='test policy',
+ shared=False)
+ rule = self.admin_client.create_dscp_marking_rule(
+ policy['id'], self.VALID_DSCP_MARK1)['dscp_marking_rule']
+
+ self.admin_client.update_dscp_marking_rule(
+ policy['id'], rule['id'], dscp_mark=self.VALID_DSCP_MARK2)
+
+ retrieved_policy = self.admin_client.show_dscp_marking_rule(
+ policy['id'], rule['id'])
+ retrieved_policy = retrieved_policy['dscp_marking_rule']
+ self.assertEqual(self.VALID_DSCP_MARK2, retrieved_policy['dscp_mark'])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('67ee6efd-7b33-4a68-927d-275b4f8ba958')
+ def test_rule_delete(self):
+ policy = self.create_qos_policy(name='test-policy',
+ description='test policy',
+ shared=False)
+ rule = self.admin_client.create_dscp_marking_rule(
+ policy['id'], self.VALID_DSCP_MARK1)['dscp_marking_rule']
+
+ retrieved_policy = self.admin_client.show_dscp_marking_rule(
+ policy['id'], rule['id'])
+ retrieved_policy = retrieved_policy['dscp_marking_rule']
+ self.assertEqual(rule['id'], retrieved_policy['id'])
+
+ self.admin_client.delete_dscp_marking_rule(policy['id'], rule['id'])
+ self.assertRaises(exceptions.NotFound,
+ self.admin_client.show_dscp_marking_rule,
+ policy['id'], rule['id'])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('f211222c-5808-46cb-a961-983bbab6b852')
+ def test_rule_create_rule_nonexistent_policy(self):
+ self.assertRaises(
+ exceptions.NotFound,
+ self.admin_client.create_dscp_marking_rule,
+ 'policy', self.VALID_DSCP_MARK1)
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('a4a2e7ad-786f-4927-a85a-e545a93bd274')
+ def test_rule_create_forbidden_for_regular_tenants(self):
+ self.assertRaises(
+ exceptions.Forbidden,
+ self.client.create_dscp_marking_rule,
+ 'policy', self.VALID_DSCP_MARK1)
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('33646b08-4f05-4493-a48a-bde768a18533')
+ def test_invalid_rule_create(self):
+ policy = self.create_qos_policy(name='test-policy',
+ description='test policy',
+ shared=False)
+ self.assertRaises(
+ exceptions.BadRequest,
+ self.admin_client.create_dscp_marking_rule,
+ policy['id'], 58)
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('ce0bd0c2-54d9-4e29-85f1-cfb36ac3ebe2')
+ def test_get_rules_by_policy(self):
+ policy1 = self.create_qos_policy(name='test-policy1',
+ description='test policy1',
+ shared=False)
+ rule1 = self.admin_client.create_dscp_marking_rule(
+ policy1['id'], self.VALID_DSCP_MARK1)['dscp_marking_rule']
+
+ policy2 = self.create_qos_policy(name='test-policy2',
+ description='test policy2',
+ shared=False)
+ rule2 = self.admin_client.create_dscp_marking_rule(
+ policy2['id'], self.VALID_DSCP_MARK2)['dscp_marking_rule']
+
+ # Test 'list rules'
+ rules = self.admin_client.list_dscp_marking_rules(policy1['id'])
+ rules = rules['dscp_marking_rules']
+ rules_ids = [r['id'] for r in rules]
+ self.assertIn(rule1['id'], rules_ids)
+ self.assertNotIn(rule2['id'], rules_ids)
diff --git a/neutron/tests/tempest/api/test_routers.py b/neutron/tests/tempest/api/test_routers.py
new file mode 100644
index 0000000..6acf259
--- /dev/null
+++ b/neutron/tests/tempest/api/test_routers.py
@@ -0,0 +1,256 @@
+# Copyright 2013 OpenStack Foundation
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import netaddr
+import six
+from tempest.lib.common.utils import data_utils
+from tempest import test
+
+from neutron.tests.tempest.api import base_routers as base
+from neutron.tests.tempest import config
+
+CONF = config.CONF
+
+
+class RoutersTest(base.BaseRouterTest):
+
+ @classmethod
+ @test.requires_ext(extension="router", service="network")
+ def skip_checks(cls):
+ super(RoutersTest, cls).skip_checks()
+
+ @classmethod
+ def resource_setup(cls):
+ super(RoutersTest, cls).resource_setup()
+ cls.tenant_cidr = (
+ config.safe_get_config_value('network', 'project_network_cidr')
+ if cls._ip_version == 4 else
+ config.safe_get_config_value('network', 'project_network_v6_cidr'))
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('c72c1c0c-2193-4aca-eeee-b1442640eeee')
+ @test.requires_ext(extension="standard-attr-description",
+ service="network")
+ def test_create_update_router_description(self):
+ body = self.create_router(description='d1', router_name='test')
+ self.assertEqual('d1', body['description'])
+ body = self.client.show_router(body['id'])['router']
+ self.assertEqual('d1', body['description'])
+ body = self.client.update_router(body['id'], description='d2')
+ self.assertEqual('d2', body['router']['description'])
+ body = self.client.show_router(body['router']['id'])['router']
+ self.assertEqual('d2', body['description'])
+
+ @test.idempotent_id('847257cc-6afd-4154-b8fb-af49f5670ce8')
+ @test.requires_ext(extension='ext-gw-mode', service='network')
+ @test.attr(type='smoke')
+ def test_create_router_with_default_snat_value(self):
+ # Create a router with default snat rule
+ name = data_utils.rand_name('router')
+ router = self._create_router(
+ name, external_network_id=CONF.network.public_network_id)
+ self._verify_router_gateway(
+ router['id'], {'network_id': CONF.network.public_network_id,
+ 'enable_snat': True})
+
+ @test.idempotent_id('ea74068d-09e9-4fd7-8995-9b6a1ace920f')
+ @test.requires_ext(extension='ext-gw-mode', service='network')
+ @test.attr(type='smoke')
+ def test_create_router_with_snat_explicit(self):
+ name = data_utils.rand_name('snat-router')
+ # Create a router enabling snat attributes
+ enable_snat_states = [False, True]
+ for enable_snat in enable_snat_states:
+ external_gateway_info = {
+ 'network_id': CONF.network.public_network_id,
+ 'enable_snat': enable_snat}
+ create_body = self.admin_client.create_router(
+ name, external_gateway_info=external_gateway_info)
+ self.addCleanup(self.admin_client.delete_router,
+ create_body['router']['id'])
+ # Verify snat attributes after router creation
+ self._verify_router_gateway(create_body['router']['id'],
+ exp_ext_gw_info=external_gateway_info)
+
+ def _verify_router_gateway(self, router_id, exp_ext_gw_info=None):
+ show_body = self.admin_client.show_router(router_id)
+ actual_ext_gw_info = show_body['router']['external_gateway_info']
+ if exp_ext_gw_info is None:
+ self.assertIsNone(actual_ext_gw_info)
+ return
+ # Verify only keys passed in exp_ext_gw_info
+ for k, v in six.iteritems(exp_ext_gw_info):
+ self.assertEqual(v, actual_ext_gw_info[k])
+
+ def _verify_gateway_port(self, router_id):
+ list_body = self.admin_client.list_ports(
+ network_id=CONF.network.public_network_id,
+ device_id=router_id)
+ self.assertEqual(len(list_body['ports']), 1)
+ gw_port = list_body['ports'][0]
+ fixed_ips = gw_port['fixed_ips']
+ self.assertGreaterEqual(len(fixed_ips), 1)
+ public_net_body = self.admin_client.show_network(
+ CONF.network.public_network_id)
+ public_subnet_id = public_net_body['network']['subnets'][0]
+ self.assertIn(public_subnet_id,
+ [x['subnet_id'] for x in fixed_ips])
+
+ @test.idempotent_id('b386c111-3b21-466d-880c-5e72b01e1a33')
+ @test.requires_ext(extension='ext-gw-mode', service='network')
+ @test.attr(type='smoke')
+ def test_update_router_set_gateway_with_snat_explicit(self):
+ router = self._create_router(data_utils.rand_name('router-'))
+ self.admin_client.update_router_with_snat_gw_info(
+ router['id'],
+ external_gateway_info={
+ 'network_id': CONF.network.public_network_id,
+ 'enable_snat': True})
+ self._verify_router_gateway(
+ router['id'],
+ {'network_id': CONF.network.public_network_id,
+ 'enable_snat': True})
+ self._verify_gateway_port(router['id'])
+
+ @test.idempotent_id('96536bc7-8262-4fb2-9967-5c46940fa279')
+ @test.requires_ext(extension='ext-gw-mode', service='network')
+ @test.attr(type='smoke')
+ def test_update_router_set_gateway_without_snat(self):
+ router = self._create_router(data_utils.rand_name('router-'))
+ self.admin_client.update_router_with_snat_gw_info(
+ router['id'],
+ external_gateway_info={
+ 'network_id': CONF.network.public_network_id,
+ 'enable_snat': False})
+ self._verify_router_gateway(
+ router['id'],
+ {'network_id': CONF.network.public_network_id,
+ 'enable_snat': False})
+ self._verify_gateway_port(router['id'])
+
+ @test.idempotent_id('f2faf994-97f4-410b-a831-9bc977b64374')
+ @test.requires_ext(extension='ext-gw-mode', service='network')
+ @test.attr(type='smoke')
+ def test_update_router_reset_gateway_without_snat(self):
+ router = self._create_router(
+ data_utils.rand_name('router-'),
+ external_network_id=CONF.network.public_network_id)
+ self.admin_client.update_router_with_snat_gw_info(
+ router['id'],
+ external_gateway_info={
+ 'network_id': CONF.network.public_network_id,
+ 'enable_snat': False})
+ self._verify_router_gateway(
+ router['id'],
+ {'network_id': CONF.network.public_network_id,
+ 'enable_snat': False})
+ self._verify_gateway_port(router['id'])
+
+ @test.idempotent_id('c86ac3a8-50bd-4b00-a6b8-62af84a0765c')
+ @test.requires_ext(extension='extraroute', service='network')
+ @test.attr(type='smoke')
+ def test_update_extra_route(self):
+ self.network = self.create_network()
+ self.name = self.network['name']
+ self.subnet = self.create_subnet(self.network)
+ # Add router interface with subnet id
+ self.router = self._create_router(
+ data_utils.rand_name('router-'), True)
+ self.create_router_interface(self.router['id'], self.subnet['id'])
+ self.addCleanup(
+ self._delete_extra_routes,
+ self.router['id'])
+ # Update router extra route, second ip of the range is
+ # used as next hop
+ cidr = netaddr.IPNetwork(self.subnet['cidr'])
+ next_hop = str(cidr[2])
+ destination = str(self.subnet['cidr'])
+ extra_route = self.client.update_extra_routes(self.router['id'],
+ next_hop, destination)
+ self.assertEqual(1, len(extra_route['router']['routes']))
+ self.assertEqual(destination,
+ extra_route['router']['routes'][0]['destination'])
+ self.assertEqual(next_hop,
+ extra_route['router']['routes'][0]['nexthop'])
+ show_body = self.client.show_router(self.router['id'])
+ self.assertEqual(destination,
+ show_body['router']['routes'][0]['destination'])
+ self.assertEqual(next_hop,
+ show_body['router']['routes'][0]['nexthop'])
+
+ def _delete_extra_routes(self, router_id):
+ self.client.delete_extra_routes(router_id)
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('01f185d1-d1a6-4cf9-abf7-e0e1384c169c')
+ def test_network_attached_with_two_routers(self):
+ network = self.create_network(data_utils.rand_name('network1'))
+ self.create_subnet(network)
+ port1 = self.create_port(network)
+ port2 = self.create_port(network)
+ router1 = self._create_router(data_utils.rand_name('router1'))
+ router2 = self._create_router(data_utils.rand_name('router2'))
+ self.client.add_router_interface_with_port_id(
+ router1['id'], port1['id'])
+ self.client.add_router_interface_with_port_id(
+ router2['id'], port2['id'])
+ self.addCleanup(self.client.remove_router_interface_with_port_id,
+ router1['id'], port1['id'])
+ self.addCleanup(self.client.remove_router_interface_with_port_id,
+ router2['id'], port2['id'])
+ body = self.client.show_port(port1['id'])
+ port_show1 = body['port']
+ body = self.client.show_port(port2['id'])
+ port_show2 = body['port']
+ self.assertEqual(port_show1['network_id'], network['id'])
+ self.assertEqual(port_show2['network_id'], network['id'])
+ self.assertEqual(port_show1['device_id'], router1['id'])
+ self.assertEqual(port_show2['device_id'], router2['id'])
+
+
+class RoutersIpV6Test(RoutersTest):
+ _ip_version = 6
+
+
+class DvrRoutersTest(base.BaseRouterTest):
+
+ @classmethod
+ @test.requires_ext(extension="dvr", service="network")
+ def skip_checks(cls):
+ super(DvrRoutersTest, cls).skip_checks()
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('141297aa-3424-455d-aa8d-f2d95731e00a')
+ def test_create_distributed_router(self):
+ name = data_utils.rand_name('router')
+ create_body = self.admin_client.create_router(
+ name, distributed=True)
+ self.addCleanup(self._delete_router,
+ create_body['router']['id'],
+ self.admin_client)
+ self.assertTrue(create_body['router']['distributed'])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('644d7a4a-01a1-4b68-bb8d-0c0042cb1729')
+ def test_convert_centralized_router(self):
+ router = self._create_router(data_utils.rand_name('router'))
+ self.assertNotIn('distributed', router)
+ update_body = self.admin_client.update_router(router['id'],
+ distributed=True)
+ self.assertTrue(update_body['router']['distributed'])
+ show_body = self.admin_client.show_router(router['id'])
+ self.assertTrue(show_body['router']['distributed'])
+ show_body = self.client.show_router(router['id'])
+ self.assertNotIn('distributed', show_body['router'])
diff --git a/neutron/tests/tempest/api/test_routers_negative.py b/neutron/tests/tempest/api/test_routers_negative.py
new file mode 100644
index 0000000..c5617a0
--- /dev/null
+++ b/neutron/tests/tempest/api/test_routers_negative.py
@@ -0,0 +1,43 @@
+# Copyright 2013 OpenStack Foundation
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from tempest.lib.common.utils import data_utils
+from tempest.lib import exceptions as lib_exc
+from tempest import test
+import testtools
+
+from neutron.tests.tempest.api import base_routers as base
+
+
+class DvrRoutersNegativeTest(base.BaseRouterTest):
+
+ @classmethod
+ @test.requires_ext(extension="dvr", service="network")
+ def skip_checks(cls):
+ super(DvrRoutersNegativeTest, cls).skip_checks()
+
+ @classmethod
+ def resource_setup(cls):
+ super(DvrRoutersNegativeTest, cls).resource_setup()
+ cls.router = cls.create_router(data_utils.rand_name('router'))
+ cls.network = cls.create_network()
+ cls.subnet = cls.create_subnet(cls.network)
+
+ @test.attr(type=['negative', 'smoke'])
+ @test.idempotent_id('4990b055-8fc7-48ab-bba7-aa28beaad0b9')
+ def test_router_create_tenant_distributed_returns_forbidden(self):
+ with testtools.ExpectedException(lib_exc.Forbidden):
+ self.create_router(
+ data_utils.rand_name('router'), distributed=True)
diff --git a/neutron/tests/tempest/api/test_security_groups.py b/neutron/tests/tempest/api/test_security_groups.py
new file mode 100644
index 0000000..49cd1c2
--- /dev/null
+++ b/neutron/tests/tempest/api/test_security_groups.py
@@ -0,0 +1,56 @@
+# Copyright 2013 OpenStack Foundation
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from tempest.lib.common.utils import data_utils
+from tempest import test
+
+from neutron.tests.tempest.api import base_security_groups as base
+
+
+class SecGroupTest(base.BaseSecGroupTest):
+
+ @classmethod
+ @test.requires_ext(extension="security-group", service="network")
+ def resource_setup(cls):
+ super(SecGroupTest, cls).resource_setup()
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('bfd128e5-3c92-44b6-9d66-7fe29d22c802')
+ def test_create_list_update_show_delete_security_group(self):
+ group_create_body, name = self._create_security_group()
+
+ # List security groups and verify if created group is there in response
+ list_body = self.client.list_security_groups()
+ secgroup_list = list()
+ for secgroup in list_body['security_groups']:
+ secgroup_list.append(secgroup['id'])
+ self.assertIn(group_create_body['security_group']['id'], secgroup_list)
+ # Update the security group
+ new_name = data_utils.rand_name('security-')
+ new_description = data_utils.rand_name('security-description')
+ update_body = self.client.update_security_group(
+ group_create_body['security_group']['id'],
+ name=new_name,
+ description=new_description)
+ # Verify if security group is updated
+ self.assertEqual(update_body['security_group']['name'], new_name)
+ self.assertEqual(update_body['security_group']['description'],
+ new_description)
+ # Show details of the updated security group
+ show_body = self.client.show_security_group(
+ group_create_body['security_group']['id'])
+ self.assertEqual(show_body['security_group']['name'], new_name)
+ self.assertEqual(show_body['security_group']['description'],
+ new_description)
diff --git a/neutron/tests/tempest/api/test_security_groups_negative.py b/neutron/tests/tempest/api/test_security_groups_negative.py
new file mode 100644
index 0000000..b06601d
--- /dev/null
+++ b/neutron/tests/tempest/api/test_security_groups_negative.py
@@ -0,0 +1,74 @@
+# Copyright 2013 OpenStack Foundation
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from tempest.lib import exceptions as lib_exc
+from tempest import test
+
+from neutron.tests.tempest.api import base_security_groups as base
+from neutron.tests.tempest import config
+
+CONF = config.CONF
+
+
+class NegativeSecGroupTest(base.BaseSecGroupTest):
+
+ @classmethod
+ @test.requires_ext(extension="security-group", service="network")
+ def resource_setup(cls):
+ super(NegativeSecGroupTest, cls).resource_setup()
+
+ @test.attr(type=['negative', 'gate'])
+ @test.idempotent_id('0d9c7791-f2ad-4e2f-ac73-abf2373b0d2d')
+ def test_create_security_group_rule_with_invalid_ports(self):
+ group_create_body, _ = self._create_security_group()
+
+ # Create rule for tcp protocol with invalid ports
+ states = [(-16, 80, 'Invalid value for port -16'),
+ (80, 79, 'port_range_min must be <= port_range_max'),
+ (80, 65536, 'Invalid value for port 65536'),
+ (None, 6, 'port_range_min must be <= port_range_max'),
+ (-16, 65536, 'Invalid value for port')]
+ for pmin, pmax, msg in states:
+ ex = self.assertRaises(
+ lib_exc.BadRequest, self.client.create_security_group_rule,
+ security_group_id=group_create_body['security_group']['id'],
+ protocol='tcp', port_range_min=pmin, port_range_max=pmax,
+ direction='ingress', ethertype=self.ethertype)
+ self.assertIn(msg, str(ex))
+
+ # Create rule for icmp protocol with invalid ports
+ states = [(1, 256, 'Invalid value for ICMP code'),
+ (-1, 25, 'Invalid value'),
+ (None, 6, 'ICMP type (port-range-min) is missing'),
+ (300, 1, 'Invalid value for ICMP type')]
+ for pmin, pmax, msg in states:
+ ex = self.assertRaises(
+ lib_exc.BadRequest, self.client.create_security_group_rule,
+ security_group_id=group_create_body['security_group']['id'],
+ protocol='icmp', port_range_min=pmin, port_range_max=pmax,
+ direction='ingress', ethertype=self.ethertype)
+ self.assertIn(msg, str(ex))
+
+ @test.attr(type=['negative', 'smoke'])
+ @test.idempotent_id('55100aa8-b24f-333c-0bef-64eefd85f15c')
+ def test_update_default_security_group_name(self):
+ sg_list = self.client.list_security_groups(name='default')
+ sg = sg_list['security_groups'][0]
+ self.assertRaises(lib_exc.Conflict, self.client.update_security_group,
+ sg['id'], name='test')
+
+
+class NegativeSecGroupIPv6Test(NegativeSecGroupTest):
+ _ip_version = 6
diff --git a/neutron/tests/tempest/api/test_service_type_management.py b/neutron/tests/tempest/api/test_service_type_management.py
new file mode 100644
index 0000000..71c8f5c
--- /dev/null
+++ b/neutron/tests/tempest/api/test_service_type_management.py
@@ -0,0 +1,29 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from tempest import test
+
+from neutron.tests.tempest.api import base
+
+
+class ServiceTypeManagementTest(base.BaseNetworkTest):
+
+ @classmethod
+ @test.requires_ext(extension="service-type", service="network")
+ def resource_setup(cls):
+ super(ServiceTypeManagementTest, cls).resource_setup()
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('2cbbeea9-f010-40f6-8df5-4eaa0c918ea6')
+ def test_service_provider_list(self):
+ body = self.client.list_service_providers()
+ self.assertIsInstance(body['service_providers'], list)
diff --git a/neutron/tests/tempest/api/test_subnetpools.py b/neutron/tests/tempest/api/test_subnetpools.py
new file mode 100644
index 0000000..a6c48b9
--- /dev/null
+++ b/neutron/tests/tempest/api/test_subnetpools.py
@@ -0,0 +1,357 @@
+# Copyright 2015 Hewlett-Packard Development Company, L.P.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from tempest.lib.common.utils import data_utils
+from tempest import test
+
+from neutron.tests.tempest.api import base
+
+SUBNETPOOL_NAME = 'smoke-subnetpool'
+SUBNET_NAME = 'smoke-subnet'
+
+
+class SubnetPoolsTestBase(base.BaseAdminNetworkTest):
+
+ @classmethod
+ def resource_setup(cls):
+ super(SubnetPoolsTestBase, cls).resource_setup()
+ min_prefixlen = '29'
+ prefixes = [u'10.11.12.0/24']
+ cls._subnetpool_data = {'prefixes': prefixes,
+ 'min_prefixlen': min_prefixlen}
+
+ def _create_subnetpool(self, is_admin=False, **kwargs):
+ if 'name' not in kwargs:
+ name = data_utils.rand_name(SUBNETPOOL_NAME)
+ else:
+ name = kwargs.pop('name')
+
+ if 'prefixes' not in kwargs:
+ kwargs['prefixes'] = self._subnetpool_data['prefixes']
+
+ if 'min_prefixlen' not in kwargs:
+ kwargs['min_prefixlen'] = self._subnetpool_data['min_prefixlen']
+
+ return self.create_subnetpool(name=name, is_admin=is_admin, **kwargs)
+
+
+class SubnetPoolsTest(SubnetPoolsTestBase):
+
+ min_prefixlen = '28'
+ max_prefixlen = '31'
+ _ip_version = 4
+ subnet_cidr = u'10.11.12.0/31'
+ new_prefix = u'10.11.15.0/24'
+ larger_prefix = u'10.11.0.0/16'
+
+ """
+ Tests the following operations in the Neutron API using the REST client for
+ Neutron:
+
+ create a subnetpool for a tenant
+ list tenant's subnetpools
+ show a tenant subnetpool details
+ subnetpool update
+ delete a subnetpool
+
+ All subnetpool tests are run once with ipv4 and once with ipv6.
+
+ v2.0 of the Neutron API is assumed.
+
+ """
+
+ def _new_subnetpool_attributes(self):
+ new_name = data_utils.rand_name(SUBNETPOOL_NAME)
+ return {'name': new_name, 'min_prefixlen': self.min_prefixlen,
+ 'max_prefixlen': self.max_prefixlen}
+
+ def _check_equality_updated_subnetpool(self, expected_values,
+ updated_pool):
+ self.assertEqual(expected_values['name'],
+ updated_pool['name'])
+ self.assertEqual(expected_values['min_prefixlen'],
+ updated_pool['min_prefixlen'])
+ self.assertEqual(expected_values['max_prefixlen'],
+ updated_pool['max_prefixlen'])
+ # expected_values may not contains all subnetpool values
+ if 'prefixes' in expected_values:
+ self.assertEqual(expected_values['prefixes'],
+ updated_pool['prefixes'])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('6e1781ec-b45b-4042-aebe-f485c022996e')
+ def test_create_list_subnetpool(self):
+ created_subnetpool = self._create_subnetpool()
+ body = self.client.list_subnetpools()
+ subnetpools = body['subnetpools']
+ self.assertIn(created_subnetpool['id'],
+ [sp['id'] for sp in subnetpools],
+ "Created subnetpool id should be in the list")
+ self.assertIn(created_subnetpool['name'],
+ [sp['name'] for sp in subnetpools],
+ "Created subnetpool name should be in the list")
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('c72c1c0c-2193-4aca-ddd4-b1442640bbbb')
+ @test.requires_ext(extension="standard-attr-description",
+ service="network")
+ def test_create_update_subnetpool_description(self):
+ body = self._create_subnetpool(description='d1')
+ self.assertEqual('d1', body['description'])
+ sub_id = body['id']
+ body = filter(lambda x: x['id'] == sub_id,
+ self.client.list_subnetpools()['subnetpools'])[0]
+ self.assertEqual('d1', body['description'])
+ body = self.client.update_subnetpool(sub_id, description='d2')
+ self.assertEqual('d2', body['subnetpool']['description'])
+ body = filter(lambda x: x['id'] == sub_id,
+ self.client.list_subnetpools()['subnetpools'])[0]
+ self.assertEqual('d2', body['description'])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('741d08c2-1e3f-42be-99c7-0ea93c5b728c')
+ def test_get_subnetpool(self):
+ created_subnetpool = self._create_subnetpool()
+ prefixlen = self._subnetpool_data['min_prefixlen']
+ body = self.client.show_subnetpool(created_subnetpool['id'])
+ subnetpool = body['subnetpool']
+ self.assertEqual(created_subnetpool['name'], subnetpool['name'])
+ self.assertEqual(created_subnetpool['id'], subnetpool['id'])
+ self.assertEqual(prefixlen, subnetpool['min_prefixlen'])
+ self.assertEqual(prefixlen, subnetpool['default_prefixlen'])
+ self.assertFalse(subnetpool['shared'])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('764f1b93-1c4a-4513-9e7b-6c2fc5e9270c')
+ def test_tenant_update_subnetpool(self):
+ created_subnetpool = self._create_subnetpool()
+ pool_id = created_subnetpool['id']
+ subnetpool_data = self._new_subnetpool_attributes()
+ self.client.update_subnetpool(created_subnetpool['id'],
+ **subnetpool_data)
+
+ body = self.client.show_subnetpool(pool_id)
+ subnetpool = body['subnetpool']
+ self._check_equality_updated_subnetpool(subnetpool_data,
+ subnetpool)
+ self.assertFalse(subnetpool['shared'])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('4b496082-c992-4319-90be-d4a7ce646290')
+ def test_update_subnetpool_prefixes_append(self):
+ # We can append new prefixes to subnetpool
+ create_subnetpool = self._create_subnetpool()
+ pool_id = create_subnetpool['id']
+ old_prefixes = self._subnetpool_data['prefixes']
+ new_prefixes = old_prefixes[:]
+ new_prefixes.append(self.new_prefix)
+ subnetpool_data = {'prefixes': new_prefixes}
+ self.client.update_subnetpool(pool_id, **subnetpool_data)
+ body = self.client.show_subnetpool(pool_id)
+ prefixes = body['subnetpool']['prefixes']
+ self.assertIn(self.new_prefix, prefixes)
+ self.assertIn(old_prefixes[0], prefixes)
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('2cae5d6a-9d32-42d8-8067-f13970ae13bb')
+ def test_update_subnetpool_prefixes_extend(self):
+ # We can extend current subnetpool prefixes
+ created_subnetpool = self._create_subnetpool()
+ pool_id = created_subnetpool['id']
+ old_prefixes = self._subnetpool_data['prefixes']
+ subnetpool_data = {'prefixes': [self.larger_prefix]}
+ self.client.update_subnetpool(pool_id, **subnetpool_data)
+ body = self.client.show_subnetpool(pool_id)
+ prefixes = body['subnetpool']['prefixes']
+ self.assertIn(self.larger_prefix, prefixes)
+ self.assertNotIn(old_prefixes[0], prefixes)
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('d70c6c35-913b-4f24-909f-14cd0d29b2d2')
+ def test_admin_create_shared_subnetpool(self):
+ created_subnetpool = self._create_subnetpool(is_admin=True,
+ shared=True)
+ pool_id = created_subnetpool['id']
+ # Shared subnetpool can be retrieved by tenant user.
+ body = self.client.show_subnetpool(pool_id)
+ subnetpool = body['subnetpool']
+ self.assertEqual(created_subnetpool['name'], subnetpool['name'])
+ self.assertTrue(subnetpool['shared'])
+
+ def _create_subnet_from_pool(self, subnet_values=None, pool_values=None):
+ if pool_values is None:
+ pool_values = {}
+
+ created_subnetpool = self._create_subnetpool(**pool_values)
+ pool_id = created_subnetpool['id']
+ subnet_name = data_utils.rand_name(SUBNETPOOL_NAME)
+ network = self.create_network()
+ subnet_kwargs = {'name': subnet_name,
+ 'subnetpool_id': pool_id}
+ if subnet_values:
+ subnet_kwargs.update(subnet_values)
+ # not creating the subnet using the base.create_subnet because
+ # that function needs to be enhanced to support subnet_create when
+ # prefixlen and subnetpool_id is specified.
+ body = self.client.create_subnet(
+ network_id=network['id'],
+ ip_version=self._ip_version,
+ **subnet_kwargs)
+ subnet = body['subnet']
+ return pool_id, subnet
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('1362ed7d-3089-42eb-b3a5-d6cb8398ee77')
+ def test_create_subnet_from_pool_with_prefixlen(self):
+ subnet_values = {"prefixlen": self.max_prefixlen}
+ pool_id, subnet = self._create_subnet_from_pool(
+ subnet_values=subnet_values)
+ cidr = str(subnet['cidr'])
+ self.assertEqual(pool_id, subnet['subnetpool_id'])
+ self.assertTrue(cidr.endswith(str(self.max_prefixlen)))
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('86b86189-9789-4582-9c3b-7e2bfe5735ee')
+ def test_create_subnet_from_pool_with_subnet_cidr(self):
+ subnet_values = {"cidr": self.subnet_cidr}
+ pool_id, subnet = self._create_subnet_from_pool(
+ subnet_values=subnet_values)
+ cidr = str(subnet['cidr'])
+ self.assertEqual(pool_id, subnet['subnetpool_id'])
+ self.assertEqual(cidr, self.subnet_cidr)
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('83f76e3a-9c40-40c2-a015-b7c5242178d8')
+ def test_create_subnet_from_pool_with_default_prefixlen(self):
+ # If neither cidr nor prefixlen is specified,
+ # subnet will use subnetpool default_prefixlen for cidr.
+ pool_id, subnet = self._create_subnet_from_pool()
+ cidr = str(subnet['cidr'])
+ self.assertEqual(pool_id, subnet['subnetpool_id'])
+ prefixlen = self._subnetpool_data['min_prefixlen']
+ self.assertTrue(cidr.endswith(str(prefixlen)))
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('a64af292-ec52-4bde-b654-a6984acaf477')
+ def test_create_subnet_from_pool_with_quota(self):
+ pool_values = {'default_quota': 4}
+ subnet_values = {"prefixlen": self.max_prefixlen}
+ pool_id, subnet = self._create_subnet_from_pool(
+ subnet_values=subnet_values, pool_values=pool_values)
+ cidr = str(subnet['cidr'])
+ self.assertEqual(pool_id, subnet['subnetpool_id'])
+ self.assertTrue(cidr.endswith(str(self.max_prefixlen)))
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('49b44c64-1619-4b29-b527-ffc3c3115dc4')
+ @test.requires_ext(extension='address-scope', service='network')
+ def test_create_subnetpool_associate_address_scope(self):
+ address_scope = self.create_address_scope(
+ name=data_utils.rand_name('smoke-address-scope'),
+ ip_version=self._ip_version)
+ created_subnetpool = self._create_subnetpool(
+ address_scope_id=address_scope['id'])
+ body = self.client.show_subnetpool(created_subnetpool['id'])
+ self.assertEqual(address_scope['id'],
+ body['subnetpool']['address_scope_id'])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('910b6393-db24-4f6f-87dc-b36892ad6c8c')
+ @test.requires_ext(extension='address-scope', service='network')
+ def test_update_subnetpool_associate_address_scope(self):
+ address_scope = self.create_address_scope(
+ name=data_utils.rand_name('smoke-address-scope'),
+ ip_version=self._ip_version)
+ created_subnetpool = self._create_subnetpool()
+ pool_id = created_subnetpool['id']
+ body = self.client.show_subnetpool(pool_id)
+ self.assertIsNone(body['subnetpool']['address_scope_id'])
+ self.client.update_subnetpool(pool_id,
+ address_scope_id=address_scope['id'])
+ body = self.client.show_subnetpool(pool_id)
+ self.assertEqual(address_scope['id'],
+ body['subnetpool']['address_scope_id'])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('18302e80-46a3-4563-82ac-ccd1dd57f652')
+ @test.requires_ext(extension='address-scope', service='network')
+ def test_update_subnetpool_associate_another_address_scope(self):
+ address_scope = self.create_address_scope(
+ name=data_utils.rand_name('smoke-address-scope'),
+ ip_version=self._ip_version)
+ another_address_scope = self.create_address_scope(
+ name=data_utils.rand_name('smoke-address-scope'),
+ ip_version=self._ip_version)
+ created_subnetpool = self._create_subnetpool(
+ address_scope_id=address_scope['id'])
+ pool_id = created_subnetpool['id']
+ body = self.client.show_subnetpool(pool_id)
+ self.assertEqual(address_scope['id'],
+ body['subnetpool']['address_scope_id'])
+ self.client.update_subnetpool(
+ pool_id, address_scope_id=another_address_scope['id'])
+ body = self.client.show_subnetpool(pool_id)
+ self.assertEqual(another_address_scope['id'],
+ body['subnetpool']['address_scope_id'])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('f8970048-e41b-42d6-934b-a1297b07706a')
+ @test.requires_ext(extension='address-scope', service='network')
+ def test_update_subnetpool_disassociate_address_scope(self):
+ address_scope = self.create_address_scope(
+ name=data_utils.rand_name('smoke-address-scope'),
+ ip_version=self._ip_version)
+ created_subnetpool = self._create_subnetpool(
+ address_scope_id=address_scope['id'])
+ pool_id = created_subnetpool['id']
+ body = self.client.show_subnetpool(pool_id)
+ self.assertEqual(address_scope['id'],
+ body['subnetpool']['address_scope_id'])
+ self.client.update_subnetpool(pool_id,
+ address_scope_id=None)
+ body = self.client.show_subnetpool(pool_id)
+ self.assertIsNone(body['subnetpool']['address_scope_id'])
+
+
+class SubnetPoolsTestV6(SubnetPoolsTest):
+
+ min_prefixlen = '48'
+ max_prefixlen = '64'
+ _ip_version = 6
+ subnet_cidr = '2001:db8:3::/64'
+ new_prefix = u'2001:db8:5::/64'
+ larger_prefix = u'2001:db8::/32'
+
+ @classmethod
+ def resource_setup(cls):
+ super(SubnetPoolsTestV6, cls).resource_setup()
+ min_prefixlen = '64'
+ prefixes = [u'2001:db8:3::/48']
+ cls._subnetpool_data = {'min_prefixlen': min_prefixlen,
+ 'prefixes': prefixes}
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('f62d73dc-cf6f-4879-b94b-dab53982bf3b')
+ def test_create_dual_stack_subnets_from_subnetpools(self):
+ pool_id_v6, subnet_v6 = self._create_subnet_from_pool()
+ pool_values_v4 = {'prefixes': ['192.168.0.0/16'],
+ 'min_prefixlen': 21,
+ 'max_prefixlen': 32}
+ create_v4_subnetpool = self._create_subnetpool(**pool_values_v4)
+ pool_id_v4 = create_v4_subnetpool['id']
+ subnet_v4 = self.client.create_subnet(
+ network_id=subnet_v6['network_id'], ip_version=4,
+ subnetpool_id=pool_id_v4)['subnet']
+ self.assertEqual(subnet_v4['network_id'], subnet_v6['network_id'])
diff --git a/neutron/tests/tempest/api/test_subnetpools_negative.py b/neutron/tests/tempest/api/test_subnetpools_negative.py
new file mode 100644
index 0000000..713ef32
--- /dev/null
+++ b/neutron/tests/tempest/api/test_subnetpools_negative.py
@@ -0,0 +1,268 @@
+# Copyright 2015 Hewlett-Packard Development Company, L.P.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import uuid
+
+import netaddr
+from tempest.lib.common.utils import data_utils
+from tempest.lib import exceptions as lib_exc
+from tempest import test
+
+from neutron.tests.tempest.api import test_subnetpools
+
+
+SUBNETPOOL_NAME = 'smoke-subnetpool'
+
+
+class SubnetPoolsNegativeTestJSON(test_subnetpools.SubnetPoolsTestBase):
+
+ smaller_prefix = u'10.11.12.0/26'
+
+ @test.attr(type=['negative', 'smoke'])
+ @test.idempotent_id('0212a042-603a-4f46-99e0-e37de9374d30')
+ def test_get_non_existent_subnetpool(self):
+ non_exist_id = data_utils.rand_name('subnetpool')
+ self.assertRaises(lib_exc.NotFound, self.client.show_subnetpool,
+ non_exist_id)
+
+ @test.attr(type=['negative', 'smoke'])
+ @test.idempotent_id('dc9336e5-f28f-4658-a0b0-cc79e607007d')
+ def test_tenant_get_not_shared_admin_subnetpool(self):
+ created_subnetpool = self._create_subnetpool(is_admin=True)
+ # None-shared admin subnetpool cannot be retrieved by tenant user.
+ self.assertRaises(lib_exc.NotFound, self.client.show_subnetpool,
+ created_subnetpool['id'])
+
+ @test.attr(type=['negative', 'smoke'])
+ @test.idempotent_id('5e1f2f86-d81a-498c-82ed-32a49f4dc4d3')
+ def test_delete_non_existent_subnetpool(self):
+ non_exist_id = data_utils.rand_name('subnetpool')
+ self.assertRaises(lib_exc.NotFound, self.client.delete_subnetpool,
+ non_exist_id)
+
+ @test.attr(type=['negative', 'smoke'])
+ @test.idempotent_id('d1143fe2-212b-4e23-a308-d18f7d8d78d6')
+ def test_tenant_create_shared_subnetpool(self):
+ # 'shared' subnetpool can only be created by admin.
+ self.assertRaises(lib_exc.Forbidden, self._create_subnetpool,
+ is_admin=False, shared=True)
+
+ @test.attr(type=['negative', 'smoke'])
+ @test.idempotent_id('4be84d30-60ca-4bd3-8512-db5b36ce1378')
+ def test_update_non_existent_subnetpool(self):
+ non_exist_id = data_utils.rand_name('subnetpool')
+ self.assertRaises(lib_exc.NotFound, self.client.update_subnetpool,
+ non_exist_id, name='foo-name')
+
+ @test.attr(type=['negative', 'smoke'])
+ @test.idempotent_id('e6cd6d87-6173-45dd-bf04-c18ea7ec7537')
+ def test_update_subnetpool_not_modifiable_shared(self):
+ # 'shared' attributes can be specified during creation.
+ # But this attribute is not modifiable after creation.
+ created_subnetpool = self._create_subnetpool(is_admin=True)
+ pool_id = created_subnetpool['id']
+ self.assertRaises(lib_exc.BadRequest, self.client.update_subnetpool,
+ pool_id, shared=True)
+
+ @test.attr(type=['negative', 'smoke'])
+ @test.idempotent_id('62f7c43b-bff1-4def-8bb7-4754b840aaad')
+ def test_update_subnetpool_prefixes_shrink(self):
+ # Shrink current subnetpool prefixes is not supported
+ created_subnetpool = self._create_subnetpool()
+ self.assertRaises(lib_exc.BadRequest,
+ self.client.update_subnetpool,
+ created_subnetpool['id'],
+ prefixes=[self.smaller_prefix])
+
+ @test.attr(type=['negative', 'smoke'])
+ @test.idempotent_id('fc011824-153e-4469-97ad-9808eb88cae1')
+ def test_create_subnet_different_pools_same_network(self):
+ network = self.create_network(network_name='smoke-network')
+ created_subnetpool = self._create_subnetpool(
+ is_admin=True, prefixes=['192.168.0.0/16'])
+ subnet = self.create_subnet(
+ network, cidr=netaddr.IPNetwork('10.10.10.0/24'), ip_version=4,
+ gateway=None, client=self.admin_client)
+ # add the subnet created by admin to the cleanUp because
+ # the base.py doesn't delete it using the admin client
+ self.addCleanup(self.admin_client.delete_subnet, subnet['id'])
+ self.assertRaises(lib_exc.BadRequest, self.create_subnet, network,
+ ip_version=4,
+ subnetpool_id=created_subnetpool['id'],
+ client=self.admin_client)
+
+ @test.attr(type=['negative', 'smoke'])
+ @test.idempotent_id('9589e332-638e-476e-81bd-013d964aa3cb')
+ @test.requires_ext(extension='address-scope', service='network')
+ def test_create_subnetpool_associate_invalid_address_scope(self):
+ self.assertRaises(lib_exc.BadRequest, self._create_subnetpool,
+ address_scope_id='foo-addr-scope')
+
+ @test.attr(type=['negative', 'smoke'])
+ @test.idempotent_id('3b6c5942-485d-4964-a560-55608af020b5')
+ @test.requires_ext(extension='address-scope', service='network')
+ def test_create_subnetpool_associate_non_exist_address_scope(self):
+ self.assertRaises(lib_exc.NotFound, self._create_subnetpool,
+ address_scope_id=str(uuid.uuid4()))
+
+ @test.attr(type=['negative', 'smoke'])
+ @test.idempotent_id('2dfb4269-8657-485a-a053-b022e911456e')
+ @test.requires_ext(extension='address-scope', service='network')
+ def test_create_subnetpool_associate_address_scope_prefix_intersect(self):
+ address_scope = self.create_address_scope(
+ name=data_utils.rand_name('smoke-address-scope'),
+ ip_version=4)
+ addr_scope_id = address_scope['id']
+ self._create_subnetpool(address_scope_id=addr_scope_id)
+ subnetpool_data = {'name': 'foo-subnetpool',
+ 'prefixes': [u'10.11.12.13/24'],
+ 'min_prefixlen': '29',
+ 'address_scope_id': addr_scope_id}
+ self.assertRaises(lib_exc.Conflict, self._create_subnetpool,
+ **subnetpool_data)
+
+ @test.attr(type=['negative', 'smoke'])
+ @test.idempotent_id('83a19a13-5384-42e2-b579-43fc69c80914')
+ @test.requires_ext(extension='address-scope', service='network')
+ def test_create_sp_associate_address_scope_multiple_prefix_intersect(self):
+ address_scope = self.create_address_scope(
+ name=data_utils.rand_name('smoke-address-scope'),
+ ip_version=4)
+ addr_scope_id = address_scope['id']
+ self._create_subnetpool(prefixes=[u'20.0.0.0/18', u'30.0.0.0/18'],
+ address_scope_id=addr_scope_id)
+ prefixes = [u'40.0.0.0/18', u'50.0.0.0/18', u'30.0.0.0/12']
+ subnetpool_data = {'name': 'foo-subnetpool',
+ 'prefixes': prefixes,
+ 'min_prefixlen': '29',
+ 'address_scope_id': addr_scope_id}
+ self.assertRaises(lib_exc.Conflict, self._create_subnetpool,
+ **subnetpool_data)
+
+ @test.attr(type=['negative', 'smoke'])
+ @test.idempotent_id('f06d8e7b-908b-4e94-b570-8156be6a4bf1')
+ @test.requires_ext(extension='address-scope', service='network')
+ def test_create_subnetpool_associate_address_scope_of_other_owner(self):
+ address_scope = self.create_address_scope(
+ name=data_utils.rand_name('smoke-address-scope'), is_admin=True,
+ ip_version=4)
+ self.assertRaises(lib_exc.NotFound, self._create_subnetpool,
+ address_scope_id=address_scope['id'])
+
+ @test.attr(type=['negative', 'smoke'])
+ @test.idempotent_id('3396ec6c-cb80-4ebe-b897-84e904580bdf')
+ @test.requires_ext(extension='address-scope', service='network')
+ def test_tenant_create_subnetpool_associate_shared_address_scope(self):
+ address_scope = self.create_address_scope(
+ name=data_utils.rand_name('smoke-address-scope'), is_admin=True,
+ shared=True, ip_version=4)
+ self.assertRaises(lib_exc.BadRequest, self._create_subnetpool,
+ address_scope_id=address_scope['id'])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('6d3d9ad5-32d4-4d63-aa00-8c62f73e2881')
+ @test.requires_ext(extension='address-scope', service='network')
+ def test_update_subnetpool_associate_address_scope_of_other_owner(self):
+ address_scope = self.create_address_scope(
+ name=data_utils.rand_name('smoke-address-scope'), is_admin=True,
+ ip_version=4)
+ address_scope_id = address_scope['id']
+ created_subnetpool = self._create_subnetpool(self.client)
+ self.assertRaises(lib_exc.NotFound, self.client.update_subnetpool,
+ created_subnetpool['id'],
+ address_scope_id=address_scope_id)
+
+ def _test_update_subnetpool_prefix_intersect_helper(
+ self, pool_1_prefixes, pool_2_prefixes, pool_1_updated_prefixes):
+ # create two subnet pools associating to an address scope.
+ # Updating the first subnet pool with the prefix intersecting
+ # with the second one should be a failure
+ address_scope = self.create_address_scope(
+ name=data_utils.rand_name('smoke-address-scope'), ip_version=4)
+ addr_scope_id = address_scope['id']
+ pool_values = {'address_scope_id': addr_scope_id,
+ 'prefixes': pool_1_prefixes}
+ created_subnetpool_1 = self._create_subnetpool(**pool_values)
+ pool_id_1 = created_subnetpool_1['id']
+ pool_values = {'address_scope_id': addr_scope_id,
+ 'prefixes': pool_2_prefixes}
+ self._create_subnetpool(**pool_values)
+ # now update the pool_id_1 with the prefix intersecting with
+ # pool_id_2
+ self.assertRaises(lib_exc.Conflict, self.client.update_subnetpool,
+ pool_id_1, prefixes=pool_1_updated_prefixes)
+
+ @test.attr(type=['negative', 'smoke'])
+ @test.idempotent_id('96006292-7214-40e0-a471-153fb76e6b31')
+ @test.requires_ext(extension='address-scope', service='network')
+ def test_update_subnetpool_prefix_intersect(self):
+ pool_1_prefix = [u'20.0.0.0/18']
+ pool_2_prefix = [u'20.10.0.0/24']
+ pool_1_updated_prefix = [u'20.0.0.0/12']
+ self._test_update_subnetpool_prefix_intersect_helper(
+ pool_1_prefix, pool_2_prefix, pool_1_updated_prefix)
+
+ @test.attr(type=['negative', 'smoke'])
+ @test.idempotent_id('4d3f8a79-c530-4e59-9acf-6c05968adbfe')
+ @test.requires_ext(extension='address-scope', service='network')
+ def test_update_subnetpool_multiple_prefix_intersect(self):
+ pool_1_prefixes = [u'20.0.0.0/18', u'30.0.0.0/18']
+ pool_2_prefixes = [u'20.10.0.0/24', u'40.0.0.0/18', '50.0.0.0/18']
+ pool_1_updated_prefixes = [u'20.0.0.0/18', u'30.0.0.0/18',
+ u'50.0.0.0/12']
+ self._test_update_subnetpool_prefix_intersect_helper(
+ pool_1_prefixes, pool_2_prefixes, pool_1_updated_prefixes)
+
+ @test.attr(type=['negative', 'smoke'])
+ @test.idempotent_id('7438e49e-1351-45d8-937b-892059fb97f5')
+ @test.requires_ext(extension='address-scope', service='network')
+ def test_tenant_update_sp_prefix_associated_with_shared_addr_scope(self):
+ address_scope = self.create_address_scope(
+ name=data_utils.rand_name('smoke-address-scope'), is_admin=True,
+ shared=True, ip_version=4)
+ addr_scope_id = address_scope['id']
+ pool_values = {'prefixes': [u'20.0.0.0/18', u'30.0.0.0/18']}
+
+ created_subnetpool = self._create_subnetpool(**pool_values)
+ pool_id = created_subnetpool['id']
+ # associate the subnetpool to the address scope as an admin
+ self.admin_client.update_subnetpool(pool_id,
+ address_scope_id=addr_scope_id)
+ body = self.admin_client.show_subnetpool(pool_id)
+ self.assertEqual(addr_scope_id,
+ body['subnetpool']['address_scope_id'])
+
+ # updating the subnetpool prefix by the tenant user should fail
+ # since the tenant is not the owner of address scope
+ update_prefixes = [u'20.0.0.0/18', u'30.0.0.0/18', u'40.0.0.0/18']
+ self.assertRaises(lib_exc.BadRequest, self.client.update_subnetpool,
+ pool_id, prefixes=update_prefixes)
+
+ # admin can update the prefixes
+ self.admin_client.update_subnetpool(pool_id, prefixes=update_prefixes)
+ body = self.admin_client.show_subnetpool(pool_id)
+ self.assertEqual(update_prefixes,
+ body['subnetpool']['prefixes'])
+
+ @test.attr(type='smoke')
+ @test.idempotent_id('648fee7d-a909-4ced-bad3-3a169444c0a8')
+ def test_update_subnetpool_associate_address_scope_wrong_ip_version(self):
+ address_scope = self.create_address_scope(
+ name=data_utils.rand_name('smoke-address-scope'),
+ ip_version=6)
+ created_subnetpool = self._create_subnetpool()
+ self.assertRaises(lib_exc.BadRequest, self.client.update_subnetpool,
+ created_subnetpool['id'],
+ address_scope_id=address_scope['id'])
diff --git a/neutron/tests/tempest/api/test_timestamp.py b/neutron/tests/tempest/api/test_timestamp.py
new file mode 100644
index 0000000..6c36cf6
--- /dev/null
+++ b/neutron/tests/tempest/api/test_timestamp.py
@@ -0,0 +1,176 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import copy
+
+from tempest.lib.common.utils import data_utils
+from tempest import test
+
+from neutron.tests.tempest.api import base
+
+
+class TestTimeStamp(base.BaseAdminNetworkTest):
+
+ ## attributes for subnetpool
+ min_prefixlen = '28'
+ max_prefixlen = '31'
+ _ip_version = 4
+ subnet_cidr = '10.11.12.0/31'
+ new_prefix = '10.11.15.0/24'
+ larger_prefix = '10.11.0.0/16'
+
+ @classmethod
+ @test.requires_ext(extension="timestamp_core", service="network")
+ def skip_checks(cls):
+ super(TestTimeStamp, cls).skip_checks()
+
+ @classmethod
+ def resource_setup(cls):
+ super(TestTimeStamp, cls).resource_setup()
+ prefixes = ['10.11.12.0/24']
+ cls._subnetpool_data = {'min_prefixlen': '29', 'prefixes': prefixes}
+
+ def _create_subnetpool(self, is_admin=False, **kwargs):
+ name = data_utils.rand_name('subnetpool-')
+ subnetpool_data = copy.deepcopy(self._subnetpool_data)
+ for key in subnetpool_data.keys():
+ kwargs[key] = subnetpool_data[key]
+ return self.create_subnetpool(name=name, is_admin=is_admin, **kwargs)
+
+ @test.idempotent_id('462be770-b310-4df9-9c42-773217e4c8b1')
+ def test_create_network_with_timestamp(self):
+ network = self.create_network()
+ # Verifies body contains timestamp fields
+ self.assertIsNotNone(network['created_at'])
+ self.assertIsNotNone(network['updated_at'])
+
+ @test.idempotent_id('4db5417a-e11c-474d-a361-af00ebef57c5')
+ def test_update_network_with_timestamp(self):
+ network = self.create_network()
+ origin_updated_at = network['updated_at']
+ update_body = {'name': network['name'] + 'new'}
+ body = self.admin_client.update_network(network['id'],
+ **update_body)
+ updated_network = body['network']
+ new_updated_at = updated_network['updated_at']
+ self.assertEqual(network['created_at'], updated_network['created_at'])
+ # Verify that origin_updated_at is not same with new_updated_at
+ self.assertIsNot(origin_updated_at, new_updated_at)
+
+ @test.idempotent_id('2ac50ab2-7ebd-4e27-b3ce-a9e399faaea2')
+ def test_show_networks_attribute_with_timestamp(self):
+ network = self.create_network()
+ body = self.client.show_network(network['id'])
+ show_net = body['network']
+ # verify the timestamp from creation and showed is same
+ self.assertEqual(network['created_at'],
+ show_net['created_at'])
+ self.assertEqual(network['updated_at'],
+ show_net['updated_at'])
+
+ @test.idempotent_id('8ee55186-454f-4b97-9f9f-eb2772ee891c')
+ def test_create_subnet_with_timestamp(self):
+ network = self.create_network()
+ subnet = self.create_subnet(network)
+ # Verifies body contains timestamp fields
+ self.assertIsNotNone(subnet['created_at'])
+ self.assertIsNotNone(subnet['updated_at'])
+
+ @test.idempotent_id('a490215a-6f4c-4af9-9a4c-57c41f1c4fa1')
+ def test_update_subnet_with_timestamp(self):
+ network = self.create_network()
+ subnet = self.create_subnet(network)
+ origin_updated_at = subnet['updated_at']
+ update_body = {'name': subnet['name'] + 'new'}
+ body = self.admin_client.update_subnet(subnet['id'],
+ **update_body)
+ updated_subnet = body['subnet']
+ new_updated_at = updated_subnet['updated_at']
+ self.assertEqual(subnet['created_at'], updated_subnet['created_at'])
+ # Verify that origin_updated_at is not same with new_updated_at
+ self.assertIsNot(origin_updated_at, new_updated_at)
+
+ @test.idempotent_id('1836a086-e7cf-4141-bf57-0cfe79e8051e')
+ def test_show_subnet_attribute_with_timestamp(self):
+ network = self.create_network()
+ subnet = self.create_subnet(network)
+ body = self.client.show_subnet(subnet['id'])
+ show_subnet = body['subnet']
+ # verify the timestamp from creation and showed is same
+ self.assertEqual(subnet['created_at'],
+ show_subnet['created_at'])
+ self.assertEqual(subnet['updated_at'],
+ show_subnet['updated_at'])
+
+ @test.idempotent_id('e2450a7b-d84f-4600-a093-45e78597bbac')
+ def test_create_port_with_timestamp(self):
+ network = self.create_network()
+ port = self.create_port(network)
+ # Verifies body contains timestamp fields
+ self.assertIsNotNone(port['created_at'])
+ self.assertIsNotNone(port['updated_at'])
+
+ @test.idempotent_id('4241e0d3-54b4-46ce-a9a7-093fc764161b')
+ def test_update_port_with_timestamp(self):
+ network = self.create_network()
+ port = self.create_port(network)
+ origin_updated_at = port['updated_at']
+ update_body = {'name': port['name'] + 'new'}
+ body = self.admin_client.update_port(port['id'],
+ **update_body)
+ updated_port = body['port']
+ new_updated_at = updated_port['updated_at']
+ self.assertEqual(port['created_at'], updated_port['created_at'])
+ # Verify that origin_updated_at is not same with new_updated_at
+ self.assertIsNot(origin_updated_at, new_updated_at)
+
+ @test.idempotent_id('584c6723-40b6-4f26-81dd-f508f9d9fb51')
+ def test_show_port_attribute_with_timestamp(self):
+ network = self.create_network()
+ port = self.create_port(network)
+ body = self.client.show_port(port['id'])
+ show_port = body['port']
+ # verify the timestamp from creation and showed is same
+ self.assertEqual(port['created_at'],
+ show_port['created_at'])
+ self.assertEqual(port['updated_at'],
+ show_port['updated_at'])
+
+ @test.idempotent_id('87a8b196-4b90-44f0-b7f3-d2057d7d658e')
+ def test_create_subnetpool_with_timestamp(self):
+ sp = self._create_subnetpool()
+ # Verifies body contains timestamp fields
+ self.assertIsNotNone(sp['created_at'])
+ self.assertIsNotNone(sp['updated_at'])
+
+ @test.idempotent_id('d48c7578-c3d2-4f9b-a7a1-be2008c770a0')
+ def test_update_subnetpool_with_timestamp(self):
+ sp = self._create_subnetpool()
+ origin_updated_at = sp['updated_at']
+ update_body = {'name': sp['name'] + 'new',
+ 'min_prefixlen': self.min_prefixlen,
+ 'max_prefixlen': self.max_prefixlen}
+ body = self.client.update_subnetpool(sp['id'], **update_body)
+ updated_sp = body['subnetpool']
+ new_updated_at = updated_sp['updated_at']
+ self.assertEqual(sp['created_at'], updated_sp['created_at'])
+ # Verify that origin_updated_at is not same with new_updated_at
+ self.assertIsNot(origin_updated_at, new_updated_at)
+
+ @test.idempotent_id('1d3970e6-bcf7-46cd-b7d7-0807759c73b4')
+ def test_show_subnetpool_attribute_with_timestamp(self):
+ sp = self._create_subnetpool()
+ body = self.client.show_subnetpool(sp['id'])
+ show_sp = body['subnetpool']
+ # verify the timestamp from creation and showed is same
+ self.assertEqual(sp['created_at'], show_sp['created_at'])
+ self.assertEqual(sp['updated_at'], show_sp['updated_at'])