Move API tests to neutron.test.api
To make api test development simpler, move the tests to
neutron.tests.api. The neutron.tests.tempest subtree will remain
while work continues to transition the required functionality to
tempest-lib.
Change-Id: Ie90671fbfe2f633e851da82728e152482133fd87
diff --git a/neutron/tests/tempest/README.rst b/neutron/tests/tempest/README.rst
index 6f50970..5b3600a 100644
--- a/neutron/tests/tempest/README.rst
+++ b/neutron/tests/tempest/README.rst
@@ -1,10 +1,10 @@
WARNING
=======
-The files under this path are maintained automatically by the script
-tools/copy_api_tests_from_tempest.sh. It's contents should not be
-manually modified until further notice.
-
-Note that neutron.tests.tempest.config uses the global cfg.CONF
-instance for now and importing it outside of the api tests has the
-potential to break Neutron's use of cfg.CONF.
+The files under this path were copied from tempest as part of the move
+of the api tests, and they will be removed as the required
+functionality is transitioned from tempest to tempest-lib. While it
+exists, only neutron.tests.api and neutron.tests.retargetable should
+be importing files from this path. neutron.tests.tempest.config uses
+the global cfg.CONF instance and importing it outside of the api tests
+has the potential to break Neutron's use of cfg.CONF.
diff --git a/neutron/tests/tempest/api/__init__.py b/neutron/tests/tempest/api/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/neutron/tests/tempest/api/__init__.py
+++ /dev/null
diff --git a/neutron/tests/tempest/api/network/__init__.py b/neutron/tests/tempest/api/network/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/neutron/tests/tempest/api/network/__init__.py
+++ /dev/null
diff --git a/neutron/tests/tempest/api/network/admin/__init__.py b/neutron/tests/tempest/api/network/admin/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/neutron/tests/tempest/api/network/admin/__init__.py
+++ /dev/null
diff --git a/neutron/tests/tempest/api/network/admin/test_agent_management.py b/neutron/tests/tempest/api/network/admin/test_agent_management.py
deleted file mode 100644
index d875515..0000000
--- a/neutron/tests/tempest/api/network/admin/test_agent_management.py
+++ /dev/null
@@ -1,89 +0,0 @@
-# Copyright 2013 IBM Corp.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from neutron.tests.tempest.api.network import base
-from neutron.tests.tempest.common import tempest_fixtures as fixtures
-from neutron.tests.tempest import test
-
-
-class AgentManagementTestJSON(base.BaseAdminNetworkTest):
-
- @classmethod
- def resource_setup(cls):
- super(AgentManagementTestJSON, cls).resource_setup()
- if not test.is_extension_enabled('agent', 'network'):
- msg = "agent extension not enabled."
- raise cls.skipException(msg)
- body = cls.admin_client.list_agents()
- agents = body['agents']
- cls.agent = agents[0]
-
- @test.attr(type='smoke')
- @test.idempotent_id('9c80f04d-11f3-44a4-8738-ed2f879b0ff4')
- def test_list_agent(self):
- body = self.admin_client.list_agents()
- agents = body['agents']
- # Hearthbeats must be excluded from comparison
- self.agent.pop('heartbeat_timestamp', None)
- self.agent.pop('configurations', None)
- for agent in agents:
- agent.pop('heartbeat_timestamp', None)
- agent.pop('configurations', None)
- self.assertIn(self.agent, agents)
-
- @test.attr(type=['smoke'])
- @test.idempotent_id('e335be47-b9a1-46fd-be30-0874c0b751e6')
- def test_list_agents_non_admin(self):
- body = self.client.list_agents()
- self.assertEqual(len(body["agents"]), 0)
-
- @test.attr(type='smoke')
- @test.idempotent_id('869bc8e8-0fda-4a30-9b71-f8a7cf58ca9f')
- def test_show_agent(self):
- body = self.admin_client.show_agent(self.agent['id'])
- agent = body['agent']
- self.assertEqual(agent['id'], self.agent['id'])
-
- @test.attr(type='smoke')
- @test.idempotent_id('371dfc5b-55b9-4cb5-ac82-c40eadaac941')
- def test_update_agent_status(self):
- origin_status = self.agent['admin_state_up']
- # Try to update the 'admin_state_up' to the original
- # one to avoid the negative effect.
- agent_status = {'admin_state_up': origin_status}
- body = self.admin_client.update_agent(agent_id=self.agent['id'],
- agent_info=agent_status)
- updated_status = body['agent']['admin_state_up']
- self.assertEqual(origin_status, updated_status)
-
- @test.attr(type='smoke')
- @test.idempotent_id('68a94a14-1243-46e6-83bf-157627e31556')
- def test_update_agent_description(self):
- self.useFixture(fixtures.LockFixture('agent_description'))
- description = 'description for update agent.'
- agent_description = {'description': description}
- body = self.admin_client.update_agent(agent_id=self.agent['id'],
- agent_info=agent_description)
- self.addCleanup(self._restore_agent)
- updated_description = body['agent']['description']
- self.assertEqual(updated_description, description)
-
- def _restore_agent(self):
- """
- Restore the agent description after update test.
- """
- description = self.agent['description'] or ''
- origin_agent = {'description': description}
- self.admin_client.update_agent(agent_id=self.agent['id'],
- agent_info=origin_agent)
diff --git a/neutron/tests/tempest/api/network/admin/test_dhcp_agent_scheduler.py b/neutron/tests/tempest/api/network/admin/test_dhcp_agent_scheduler.py
deleted file mode 100644
index 1812062..0000000
--- a/neutron/tests/tempest/api/network/admin/test_dhcp_agent_scheduler.py
+++ /dev/null
@@ -1,97 +0,0 @@
-# Copyright 2013 IBM Corp.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from neutron.tests.tempest.api.network import base
-from neutron.tests.tempest import test
-
-
-class DHCPAgentSchedulersTestJSON(base.BaseAdminNetworkTest):
-
- @classmethod
- def resource_setup(cls):
- super(DHCPAgentSchedulersTestJSON, cls).resource_setup()
- if not test.is_extension_enabled('dhcp_agent_scheduler', 'network'):
- msg = "dhcp_agent_scheduler extension not enabled."
- raise cls.skipException(msg)
- # Create a network and make sure it will be hosted by a
- # dhcp agent: this is done by creating a regular port
- cls.network = cls.create_network()
- cls.subnet = cls.create_subnet(cls.network)
- cls.cidr = cls.subnet['cidr']
- cls.port = cls.create_port(cls.network)
-
- @test.attr(type='smoke')
- @test.idempotent_id('5032b1fe-eb42-4a64-8f3b-6e189d8b5c7d')
- def test_list_dhcp_agent_hosting_network(self):
- self.admin_client.list_dhcp_agent_hosting_network(
- self.network['id'])
-
- @test.attr(type='smoke')
- @test.idempotent_id('30c48f98-e45d-4ffb-841c-b8aad57c7587')
- def test_list_networks_hosted_by_one_dhcp(self):
- body = self.admin_client.list_dhcp_agent_hosting_network(
- self.network['id'])
- agents = body['agents']
- self.assertIsNotNone(agents)
- agent = agents[0]
- self.assertTrue(self._check_network_in_dhcp_agent(
- self.network['id'], agent))
-
- def _check_network_in_dhcp_agent(self, network_id, agent):
- network_ids = []
- body = self.admin_client.list_networks_hosted_by_one_dhcp_agent(
- agent['id'])
- networks = body['networks']
- for network in networks:
- network_ids.append(network['id'])
- return network_id in network_ids
-
- @test.attr(type='smoke')
- @test.idempotent_id('a0856713-6549-470c-a656-e97c8df9a14d')
- def test_add_remove_network_from_dhcp_agent(self):
- # The agent is now bound to the network, we can free the port
- self.client.delete_port(self.port['id'])
- self.ports.remove(self.port)
- agent = dict()
- agent['agent_type'] = None
- body = self.admin_client.list_agents()
- agents = body['agents']
- for a in agents:
- if a['agent_type'] == 'DHCP agent':
- agent = a
- break
- self.assertEqual(agent['agent_type'], 'DHCP agent', 'Could not find '
- 'DHCP agent in agent list though dhcp_agent_scheduler'
- ' is enabled.')
- network = self.create_network()
- network_id = network['id']
- if self._check_network_in_dhcp_agent(network_id, agent):
- self._remove_network_from_dhcp_agent(network_id, agent)
- self._add_dhcp_agent_to_network(network_id, agent)
- else:
- self._add_dhcp_agent_to_network(network_id, agent)
- self._remove_network_from_dhcp_agent(network_id, agent)
-
- def _remove_network_from_dhcp_agent(self, network_id, agent):
- self.admin_client.remove_network_from_dhcp_agent(
- agent_id=agent['id'],
- network_id=network_id)
- self.assertFalse(self._check_network_in_dhcp_agent(
- network_id, agent))
-
- def _add_dhcp_agent_to_network(self, network_id, agent):
- self.admin_client.add_dhcp_agent_to_network(agent['id'],
- network_id)
- self.assertTrue(self._check_network_in_dhcp_agent(
- network_id, agent))
diff --git a/neutron/tests/tempest/api/network/admin/test_external_network_extension.py b/neutron/tests/tempest/api/network/admin/test_external_network_extension.py
deleted file mode 100644
index 7935a8d..0000000
--- a/neutron/tests/tempest/api/network/admin/test_external_network_extension.py
+++ /dev/null
@@ -1,127 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from tempest_lib.common.utils import data_utils
-
-from neutron.tests.tempest.api.network import base
-from neutron.tests.tempest import test
-
-
-class ExternalNetworksTestJSON(base.BaseAdminNetworkTest):
-
- @classmethod
- def resource_setup(cls):
- super(ExternalNetworksTestJSON, cls).resource_setup()
- cls.network = cls.create_network()
-
- def _create_network(self, external=True):
- post_body = {'name': data_utils.rand_name('network-')}
- if external:
- post_body['router:external'] = external
- body = self.admin_client.create_network(**post_body)
- network = body['network']
- self.addCleanup(self.admin_client.delete_network, network['id'])
- return network
-
- @test.idempotent_id('462be770-b310-4df9-9c42-773217e4c8b1')
- def test_create_external_network(self):
- # Create a network as an admin user specifying the
- # external network extension attribute
- ext_network = self._create_network()
- # Verifies router:external parameter
- self.assertIsNotNone(ext_network['id'])
- self.assertTrue(ext_network['router:external'])
-
- @test.idempotent_id('4db5417a-e11c-474d-a361-af00ebef57c5')
- def test_update_external_network(self):
- # Update a network as an admin user specifying the
- # external network extension attribute
- network = self._create_network(external=False)
- self.assertFalse(network.get('router:external', False))
- update_body = {'router:external': True}
- body = self.admin_client.update_network(network['id'],
- **update_body)
- updated_network = body['network']
- # Verify that router:external parameter was updated
- self.assertTrue(updated_network['router:external'])
-
- @test.idempotent_id('39be4c9b-a57e-4ff9-b7c7-b218e209dfcc')
- def test_list_external_networks(self):
- # Create external_net
- external_network = self._create_network()
- # List networks as a normal user and confirm the external
- # network extension attribute is returned for those networks
- # that were created as external
- body = self.client.list_networks()
- networks_list = [net['id'] for net in body['networks']]
- self.assertIn(external_network['id'], networks_list)
- self.assertIn(self.network['id'], networks_list)
- for net in body['networks']:
- if net['id'] == self.network['id']:
- self.assertFalse(net['router:external'])
- elif net['id'] == external_network['id']:
- self.assertTrue(net['router:external'])
-
- @test.idempotent_id('2ac50ab2-7ebd-4e27-b3ce-a9e399faaea2')
- def test_show_external_networks_attribute(self):
- # Create external_net
- external_network = self._create_network()
- # Show an external network as a normal user and confirm the
- # external network extension attribute is returned.
- body = self.client.show_network(external_network['id'])
- show_ext_net = body['network']
- self.assertEqual(external_network['name'], show_ext_net['name'])
- self.assertEqual(external_network['id'], show_ext_net['id'])
- self.assertTrue(show_ext_net['router:external'])
- body = self.client.show_network(self.network['id'])
- show_net = body['network']
- # Verify with show that router:external is False for network
- self.assertEqual(self.network['name'], show_net['name'])
- self.assertEqual(self.network['id'], show_net['id'])
- self.assertFalse(show_net['router:external'])
-
- @test.idempotent_id('82068503-2cf2-4ed4-b3be-ecb89432e4bb')
- def test_delete_external_networks_with_floating_ip(self):
- """Verifies external network can be deleted while still holding
- (unassociated) floating IPs
-
- """
- # Set cls.client to admin to use base.create_subnet()
- client = self.admin_client
- body = client.create_network(**{'router:external': True})
- external_network = body['network']
- self.addCleanup(self._try_delete_resource,
- client.delete_network,
- external_network['id'])
- subnet = self.create_subnet(external_network, client=client,
- enable_dhcp=False)
- body = client.create_floatingip(
- floating_network_id=external_network['id'])
- created_floating_ip = body['floatingip']
- self.addCleanup(self._try_delete_resource,
- client.delete_floatingip,
- created_floating_ip['id'])
- floatingip_list = client.list_floatingips(
- network=external_network['id'])
- self.assertIn(created_floating_ip['id'],
- (f['id'] for f in floatingip_list['floatingips']))
- client.delete_network(external_network['id'])
- # Verifies floating ip is deleted
- floatingip_list = client.list_floatingips()
- self.assertNotIn(created_floating_ip['id'],
- (f['id'] for f in floatingip_list['floatingips']))
- # Verifies subnet is deleted
- subnet_list = client.list_subnets()
- self.assertNotIn(subnet['id'],
- (s['id'] for s in subnet_list))
- # Removes subnet from the cleanup list
- self.subnets.remove(subnet)
diff --git a/neutron/tests/tempest/api/network/admin/test_external_networks_negative.py b/neutron/tests/tempest/api/network/admin/test_external_networks_negative.py
deleted file mode 100644
index d9135a1..0000000
--- a/neutron/tests/tempest/api/network/admin/test_external_networks_negative.py
+++ /dev/null
@@ -1,54 +0,0 @@
-# Copyright 2014 OpenStack Foundation
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from tempest_lib import exceptions as lib_exc
-
-from neutron.tests.tempest.api.network import base
-from neutron.tests.tempest import config
-from neutron.tests.tempest import test
-
-CONF = config.CONF
-
-
-class ExternalNetworksAdminNegativeTestJSON(base.BaseAdminNetworkTest):
-
- @test.attr(type=['negative'])
- @test.idempotent_id('d402ae6c-0be0-4d8e-833b-a738895d98d0')
- def test_create_port_with_precreated_floatingip_as_fixed_ip(self):
- """
- External networks can be used to create both floating-ip as well
- as instance-ip. So, creating an instance-ip with a value of a
- pre-created floating-ip should be denied.
- """
-
- # create a floating ip
- client = self.admin_client
- body = client.create_floatingip(
- floating_network_id=CONF.network.public_network_id)
- created_floating_ip = body['floatingip']
- self.addCleanup(self._try_delete_resource,
- client.delete_floatingip,
- created_floating_ip['id'])
- floating_ip_address = created_floating_ip['floating_ip_address']
- self.assertIsNotNone(floating_ip_address)
-
- # use the same value of floatingip as fixed-ip to create_port()
- fixed_ips = [{'ip_address': floating_ip_address}]
-
- # create a port which will internally create an instance-ip
- self.assertRaises(lib_exc.Conflict,
- client.create_port,
- network_id=CONF.network.public_network_id,
- fixed_ips=fixed_ips)
diff --git a/neutron/tests/tempest/api/network/admin/test_floating_ips_admin_actions.py b/neutron/tests/tempest/api/network/admin/test_floating_ips_admin_actions.py
deleted file mode 100644
index be1e8d6..0000000
--- a/neutron/tests/tempest/api/network/admin/test_floating_ips_admin_actions.py
+++ /dev/null
@@ -1,111 +0,0 @@
-# Copyright 2014 OpenStack Foundation
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from tempest_lib.common.utils import data_utils
-
-from neutron.tests.tempest.api.network import base
-from neutron.tests.api.contrib import clients
-from neutron.tests.tempest import config
-from neutron.tests.tempest import test
-
-CONF = config.CONF
-
-
-class FloatingIPAdminTestJSON(base.BaseAdminNetworkTest):
-
- force_tenant_isolation = True
-
- @classmethod
- def resource_setup(cls):
- super(FloatingIPAdminTestJSON, cls).resource_setup()
- cls.ext_net_id = CONF.network.public_network_id
- cls.floating_ip = cls.create_floatingip(cls.ext_net_id)
- cls.alt_manager = clients.Manager(cls.isolated_creds.get_alt_creds())
- cls.alt_client = cls.alt_manager.network_client
- cls.network = cls.create_network()
- cls.subnet = cls.create_subnet(cls.network)
- cls.router = cls.create_router(data_utils.rand_name('router-'),
- external_network_id=cls.ext_net_id)
- cls.create_router_interface(cls.router['id'], cls.subnet['id'])
- cls.port = cls.create_port(cls.network)
-
- @test.attr(type='smoke')
- @test.idempotent_id('64f2100b-5471-4ded-b46c-ddeeeb4f231b')
- def test_list_floating_ips_from_admin_and_nonadmin(self):
- # Create floating ip from admin user
- floating_ip_admin = self.admin_client.create_floatingip(
- floating_network_id=self.ext_net_id)
- self.addCleanup(self.admin_client.delete_floatingip,
- floating_ip_admin['floatingip']['id'])
- # Create floating ip from alt user
- body = self.alt_client.create_floatingip(
- floating_network_id=self.ext_net_id)
- floating_ip_alt = body['floatingip']
- self.addCleanup(self.alt_client.delete_floatingip,
- floating_ip_alt['id'])
- # List floating ips from admin
- body = self.admin_client.list_floatingips()
- floating_ip_ids_admin = [f['id'] for f in body['floatingips']]
- # Check that admin sees all floating ips
- self.assertIn(self.floating_ip['id'], floating_ip_ids_admin)
- self.assertIn(floating_ip_admin['floatingip']['id'],
- floating_ip_ids_admin)
- self.assertIn(floating_ip_alt['id'], floating_ip_ids_admin)
- # List floating ips from nonadmin
- body = self.client.list_floatingips()
- floating_ip_ids = [f['id'] for f in body['floatingips']]
- # Check that nonadmin user doesn't see floating ip created from admin
- # and floating ip that is created in another tenant (alt user)
- self.assertIn(self.floating_ip['id'], floating_ip_ids)
- self.assertNotIn(floating_ip_admin['floatingip']['id'],
- floating_ip_ids)
- self.assertNotIn(floating_ip_alt['id'], floating_ip_ids)
-
- @test.attr(type='smoke')
- @test.idempotent_id('32727cc3-abe2-4485-a16e-48f2d54c14f2')
- def test_create_list_show_floating_ip_with_tenant_id_by_admin(self):
- # Creates a floating IP
- body = self.admin_client.create_floatingip(
- floating_network_id=self.ext_net_id,
- tenant_id=self.network['tenant_id'],
- port_id=self.port['id'])
- created_floating_ip = body['floatingip']
- self.addCleanup(self.client.delete_floatingip,
- created_floating_ip['id'])
- self.assertIsNotNone(created_floating_ip['id'])
- self.assertIsNotNone(created_floating_ip['tenant_id'])
- self.assertIsNotNone(created_floating_ip['floating_ip_address'])
- self.assertEqual(created_floating_ip['port_id'], self.port['id'])
- self.assertEqual(created_floating_ip['floating_network_id'],
- self.ext_net_id)
- port = self.port['fixed_ips']
- self.assertEqual(created_floating_ip['fixed_ip_address'],
- port[0]['ip_address'])
- # Verifies the details of a floating_ip
- floating_ip = self.admin_client.show_floatingip(
- created_floating_ip['id'])
- shown_floating_ip = floating_ip['floatingip']
- self.assertEqual(shown_floating_ip['id'], created_floating_ip['id'])
- self.assertEqual(shown_floating_ip['floating_network_id'],
- self.ext_net_id)
- self.assertEqual(shown_floating_ip['tenant_id'],
- self.network['tenant_id'])
- self.assertEqual(shown_floating_ip['floating_ip_address'],
- created_floating_ip['floating_ip_address'])
- self.assertEqual(shown_floating_ip['port_id'], self.port['id'])
- # Verify the floating ip exists in the list of all floating_ips
- floating_ips = self.admin_client.list_floatingips()
- floatingip_id_list = [f['id'] for f in floating_ips['floatingips']]
- self.assertIn(created_floating_ip['id'], floatingip_id_list)
diff --git a/neutron/tests/tempest/api/network/admin/test_l3_agent_scheduler.py b/neutron/tests/tempest/api/network/admin/test_l3_agent_scheduler.py
deleted file mode 100644
index a17b9b4..0000000
--- a/neutron/tests/tempest/api/network/admin/test_l3_agent_scheduler.py
+++ /dev/null
@@ -1,80 +0,0 @@
-# Copyright 2013 IBM Corp.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from tempest_lib.common.utils import data_utils
-
-from neutron.tests.tempest.api.network import base
-from neutron.tests.tempest import test
-
-
-class L3AgentSchedulerTestJSON(base.BaseAdminNetworkTest):
-
- """
- Tests the following operations in the Neutron API using the REST client for
- Neutron:
-
- List routers that the given L3 agent is hosting.
- List L3 agents hosting the given router.
- Add and Remove Router to L3 agent
-
- v2.0 of the Neutron API is assumed.
-
- The l3_agent_scheduler extension is required for these tests.
- """
-
- @classmethod
- def resource_setup(cls):
- super(L3AgentSchedulerTestJSON, cls).resource_setup()
- if not test.is_extension_enabled('l3_agent_scheduler', 'network'):
- msg = "L3 Agent Scheduler Extension not enabled."
- raise cls.skipException(msg)
- # Trying to get agent details for L3 Agent
- body = cls.admin_client.list_agents()
- agents = body['agents']
- for agent in agents:
- if agent['agent_type'] == 'L3 agent':
- cls.agent = agent
- break
- else:
- msg = "L3 Agent not found"
- raise cls.skipException(msg)
-
- @test.attr(type='smoke')
- @test.idempotent_id('b7ce6e89-e837-4ded-9b78-9ed3c9c6a45a')
- def test_list_routers_on_l3_agent(self):
- self.admin_client.list_routers_on_l3_agent(self.agent['id'])
-
- @test.attr(type='smoke')
- @test.idempotent_id('9464e5e7-8625-49c3-8fd1-89c52be59d66')
- def test_add_list_remove_router_on_l3_agent(self):
- l3_agent_ids = list()
- name = data_utils.rand_name('router1-')
- router = self.client.create_router(name)
- self.addCleanup(self.client.delete_router, router['router']['id'])
- self.admin_client.add_router_to_l3_agent(
- self.agent['id'],
- router['router']['id'])
- body = self.admin_client.list_l3_agents_hosting_router(
- router['router']['id'])
- for agent in body['agents']:
- l3_agent_ids.append(agent['id'])
- self.assertIn('agent_type', agent)
- self.assertEqual('L3 agent', agent['agent_type'])
- self.assertIn(self.agent['id'], l3_agent_ids)
- del l3_agent_ids[:]
- body = self.admin_client.remove_router_from_l3_agent(
- self.agent['id'],
- router['router']['id'])
- # NOTE(afazekas): The deletion not asserted, because neutron
- # is not forbidden to reschedule the router to the same agent
diff --git a/neutron/tests/tempest/api/network/admin/test_lbaas_agent_scheduler.py b/neutron/tests/tempest/api/network/admin/test_lbaas_agent_scheduler.py
deleted file mode 100644
index 0cd9239..0000000
--- a/neutron/tests/tempest/api/network/admin/test_lbaas_agent_scheduler.py
+++ /dev/null
@@ -1,73 +0,0 @@
-# Copyright 2013 IBM Corp.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from tempest_lib.common.utils import data_utils
-
-from neutron.tests.tempest.api.network import base
-from neutron.tests.tempest import test
-
-
-class LBaaSAgentSchedulerTestJSON(base.BaseAdminNetworkTest):
-
- """
- Tests the following operations in the Neutron API using the REST client for
- Neutron:
-
- List pools the given LBaaS agent is hosting.
- Show a LBaaS agent hosting the given pool.
-
- v2.0 of the Neutron API is assumed. It is also assumed that the following
- options are defined in the [networki-feature-enabled] section of
- etc/tempest.conf:
-
- api_extensions
- """
-
- @classmethod
- def resource_setup(cls):
- super(LBaaSAgentSchedulerTestJSON, cls).resource_setup()
- if not test.is_extension_enabled('lbaas_agent_scheduler', 'network'):
- msg = "LBaaS Agent Scheduler Extension not enabled."
- raise cls.skipException(msg)
- cls.network = cls.create_network()
- cls.subnet = cls.create_subnet(cls.network)
- pool_name = data_utils.rand_name('pool-')
- cls.pool = cls.create_pool(pool_name, "ROUND_ROBIN",
- "HTTP", cls.subnet)
-
- @test.attr(type='smoke')
- @test.idempotent_id('e5ea8b15-4f44-4350-963c-e0fcb533ee79')
- def test_list_pools_on_lbaas_agent(self):
- found = False
- body = self.admin_client.list_agents(
- agent_type="Loadbalancer agent")
- agents = body['agents']
- for a in agents:
- msg = 'Load Balancer agent expected'
- self.assertEqual(a['agent_type'], 'Loadbalancer agent', msg)
- body = (
- self.admin_client.list_pools_hosted_by_one_lbaas_agent(
- a['id']))
- pools = body['pools']
- if self.pool['id'] in [p['id'] for p in pools]:
- found = True
- msg = 'Unable to find Load Balancer agent hosting pool'
- self.assertTrue(found, msg)
-
- @test.attr(type='smoke')
- @test.idempotent_id('e2745593-fd79-4b98-a262-575fd7865796')
- def test_show_lbaas_agent_hosting_pool(self):
- body = self.admin_client.show_lbaas_agent_hosting_pool(
- self.pool['id'])
- self.assertEqual('Loadbalancer agent', body['agent']['agent_type'])
diff --git a/neutron/tests/tempest/api/network/admin/test_load_balancer_admin_actions.py b/neutron/tests/tempest/api/network/admin/test_load_balancer_admin_actions.py
deleted file mode 100644
index d05061d..0000000
--- a/neutron/tests/tempest/api/network/admin/test_load_balancer_admin_actions.py
+++ /dev/null
@@ -1,115 +0,0 @@
-# Copyright 2014 Mirantis.inc
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from tempest_lib.common.utils import data_utils
-
-from neutron.tests.tempest.api.network import base
-from neutron.tests.tempest import test
-
-
-class LoadBalancerAdminTestJSON(base.BaseAdminNetworkTest):
-
- """
- Test admin actions for load balancer.
-
- Create VIP for another tenant
- Create health monitor for another tenant
- """
-
- @classmethod
- def resource_setup(cls):
- super(LoadBalancerAdminTestJSON, cls).resource_setup()
- if not test.is_extension_enabled('lbaas', 'network'):
- msg = "lbaas extension not enabled."
- raise cls.skipException(msg)
- cls.force_tenant_isolation = True
- manager = cls.get_client_manager()
- cls.client = manager.network_client
- cls.tenant_id = cls.isolated_creds.get_primary_creds().tenant_id
- cls.network = cls.create_network()
- cls.subnet = cls.create_subnet(cls.network)
- cls.pool = cls.create_pool(data_utils.rand_name('pool-'),
- "ROUND_ROBIN", "HTTP", cls.subnet)
-
- @test.attr(type='smoke')
- @test.idempotent_id('6b0a20d8-4fcd-455e-b54f-ec4db5199518')
- def test_create_vip_as_admin_for_another_tenant(self):
- name = data_utils.rand_name('vip-')
- body = self.admin_client.create_pool(
- name=data_utils.rand_name('pool-'),
- lb_method="ROUND_ROBIN",
- protocol="HTTP",
- subnet_id=self.subnet['id'],
- tenant_id=self.tenant_id)
- pool = body['pool']
- self.addCleanup(self.admin_client.delete_pool, pool['id'])
- body = self.admin_client.create_vip(name=name,
- protocol="HTTP",
- protocol_port=80,
- subnet_id=self.subnet['id'],
- pool_id=pool['id'],
- tenant_id=self.tenant_id)
- vip = body['vip']
- self.addCleanup(self.admin_client.delete_vip, vip['id'])
- self.assertIsNotNone(vip['id'])
- self.assertEqual(self.tenant_id, vip['tenant_id'])
- body = self.client.show_vip(vip['id'])
- show_vip = body['vip']
- self.assertEqual(vip['id'], show_vip['id'])
- self.assertEqual(vip['name'], show_vip['name'])
-
- @test.attr(type='smoke')
- @test.idempotent_id('74552cfc-ab78-4fb6-825b-f67bca379921')
- def test_create_health_monitor_as_admin_for_another_tenant(self):
- body = (
- self.admin_client.create_health_monitor(delay=4,
- max_retries=3,
- type="TCP",
- timeout=1,
- tenant_id=self.tenant_id))
- health_monitor = body['health_monitor']
- self.addCleanup(self.admin_client.delete_health_monitor,
- health_monitor['id'])
- self.assertIsNotNone(health_monitor['id'])
- self.assertEqual(self.tenant_id, health_monitor['tenant_id'])
- body = self.client.show_health_monitor(health_monitor['id'])
- show_health_monitor = body['health_monitor']
- self.assertEqual(health_monitor['id'], show_health_monitor['id'])
-
- @test.attr(type='smoke')
- @test.idempotent_id('266a192d-3c22-46c4-a8fb-802450301e82')
- def test_create_pool_from_admin_user_other_tenant(self):
- body = self.admin_client.create_pool(
- name=data_utils.rand_name('pool-'),
- lb_method="ROUND_ROBIN",
- protocol="HTTP",
- subnet_id=self.subnet['id'],
- tenant_id=self.tenant_id)
- pool = body['pool']
- self.addCleanup(self.admin_client.delete_pool, pool['id'])
- self.assertIsNotNone(pool['id'])
- self.assertEqual(self.tenant_id, pool['tenant_id'])
-
- @test.attr(type='smoke')
- @test.idempotent_id('158bb272-b9ed-4cfc-803c-661dac46f783')
- def test_create_member_from_admin_user_other_tenant(self):
- body = self.admin_client.create_member(address="10.0.9.47",
- protocol_port=80,
- pool_id=self.pool['id'],
- tenant_id=self.tenant_id)
- member = body['member']
- self.addCleanup(self.admin_client.delete_member, member['id'])
- self.assertIsNotNone(member['id'])
- self.assertEqual(self.tenant_id, member['tenant_id'])
diff --git a/neutron/tests/tempest/api/network/admin/test_quotas.py b/neutron/tests/tempest/api/network/admin/test_quotas.py
deleted file mode 100644
index 6bac991..0000000
--- a/neutron/tests/tempest/api/network/admin/test_quotas.py
+++ /dev/null
@@ -1,97 +0,0 @@
-# Copyright 2013 OpenStack Foundation
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from tempest_lib.common.utils import data_utils
-
-from neutron.tests.tempest.api.network import base
-from neutron.tests.tempest import test
-
-
-class QuotasTest(base.BaseAdminNetworkTest):
-
- """
- Tests the following operations in the Neutron API using the REST client for
- Neutron:
-
- list quotas for tenants who have non-default quota values
- show quotas for a specified tenant
- update quotas for a specified tenant
- reset quotas to default values for a specified tenant
-
- v2.0 of the API is assumed.
- It is also assumed that the per-tenant quota extension API is configured
- in /etc/neutron/neutron.conf as follows:
-
- quota_driver = neutron.db.quota_db.DbQuotaDriver
- """
-
- @classmethod
- def resource_setup(cls):
- super(QuotasTest, cls).resource_setup()
- if not test.is_extension_enabled('quotas', 'network'):
- msg = "quotas extension not enabled."
- raise cls.skipException(msg)
- cls.identity_admin_client = cls.os_adm.identity_client
-
- def _check_quotas(self, new_quotas):
- # Add a tenant to conduct the test
- test_tenant = data_utils.rand_name('test_tenant_')
- test_description = data_utils.rand_name('desc_')
- tenant = self.identity_admin_client.create_tenant(
- name=test_tenant,
- description=test_description)
- tenant_id = tenant['id']
- self.addCleanup(self.identity_admin_client.delete_tenant, tenant_id)
-
- # Change quotas for tenant
- quota_set = self.admin_client.update_quotas(tenant_id,
- **new_quotas)
- self.addCleanup(self.admin_client.reset_quotas, tenant_id)
- for key, value in new_quotas.iteritems():
- self.assertEqual(value, quota_set[key])
-
- # Confirm our tenant is listed among tenants with non default quotas
- non_default_quotas = self.admin_client.list_quotas()
- found = False
- for qs in non_default_quotas['quotas']:
- if qs['tenant_id'] == tenant_id:
- found = True
- self.assertTrue(found)
-
- # Confirm from API quotas were changed as requested for tenant
- quota_set = self.admin_client.show_quotas(tenant_id)
- quota_set = quota_set['quota']
- for key, value in new_quotas.iteritems():
- self.assertEqual(value, quota_set[key])
-
- # Reset quotas to default and confirm
- self.admin_client.reset_quotas(tenant_id)
- non_default_quotas = self.admin_client.list_quotas()
- for q in non_default_quotas['quotas']:
- self.assertNotEqual(tenant_id, q['tenant_id'])
-
- @test.attr(type='gate')
- @test.idempotent_id('2390f766-836d-40ef-9aeb-e810d78207fb')
- def test_quotas(self):
- new_quotas = {'network': 0, 'security_group': 0}
- self._check_quotas(new_quotas)
-
- @test.idempotent_id('a7add2b1-691e-44d6-875f-697d9685f091')
- @test.requires_ext(extension='lbaas', service='network')
- @test.attr(type='gate')
- def test_lbaas_quotas(self):
- new_quotas = {'vip': 1, 'pool': 2,
- 'member': 3, 'health_monitor': 4}
- self._check_quotas(new_quotas)
diff --git a/neutron/tests/tempest/api/network/admin/test_routers_dvr.py b/neutron/tests/tempest/api/network/admin/test_routers_dvr.py
deleted file mode 100644
index 54db066..0000000
--- a/neutron/tests/tempest/api/network/admin/test_routers_dvr.py
+++ /dev/null
@@ -1,102 +0,0 @@
-# Copyright 2015 OpenStack Foundation
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from tempest_lib.common.utils import data_utils
-
-from neutron.tests.tempest.api.network import base_routers as base
-from neutron.tests.tempest import test
-
-
-class RoutersTestDVR(base.BaseRouterTest):
-
- @classmethod
- def resource_setup(cls):
- for ext in ['router', 'dvr']:
- if not test.is_extension_enabled(ext, 'network'):
- msg = "%s extension not enabled." % ext
- raise cls.skipException(msg)
- # The check above will pass if api_extensions=all, which does
- # not mean DVR extension itself is present.
- # Instead, we have to check whether DVR is actually present by using
- # admin credentials to create router with distributed=True attribute
- # and checking for BadRequest exception and that the resulting router
- # has a distributed attribute.
- super(RoutersTestDVR, cls).resource_setup()
- name = data_utils.rand_name('pretest-check')
- router = cls.admin_client.create_router(name)
- if 'distributed' not in router['router']:
- msg = "'distributed' attribute not found. DVR Possibly not enabled"
- raise cls.skipException(msg)
- cls.admin_client.delete_router(router['router']['id'])
-
- @test.attr(type='smoke')
- @test.idempotent_id('08a2a0a8-f1e4-4b34-8e30-e522e836c44e')
- def test_distributed_router_creation(self):
- """
- Test uses administrative credentials to creates a
- DVR (Distributed Virtual Routing) router using the
- distributed=True.
-
- Acceptance
- The router is created and the "distributed" attribute is
- set to True
- """
- name = data_utils.rand_name('router')
- router = self.admin_client.create_router(name, distributed=True)
- self.addCleanup(self.admin_client.delete_router,
- router['router']['id'])
- self.assertTrue(router['router']['distributed'])
-
- @test.attr(type='smoke')
- @test.idempotent_id('8a0a72b4-7290-4677-afeb-b4ffe37bc352')
- def test_centralized_router_creation(self):
- """
- Test uses administrative credentials to creates a
- CVR (Centralized Virtual Routing) router using the
- distributed=False.
-
- Acceptance
- The router is created and the "distributed" attribute is
- set to False, thus making it a "Centralized Virtual Router"
- as opposed to a "Distributed Virtual Router"
- """
- name = data_utils.rand_name('router')
- router = self.admin_client.create_router(name, distributed=False)
- self.addCleanup(self.admin_client.delete_router,
- router['router']['id'])
- self.assertFalse(router['router']['distributed'])
-
- @test.attr(type='smoke')
- @test.idempotent_id('acd43596-c1fb-439d-ada8-31ad48ae3c2e')
- def test_centralized_router_update_to_dvr(self):
- """
- Test uses administrative credentials to creates a
- CVR (Centralized Virtual Routing) router using the
- distributed=False.Then it will "update" the router
- distributed attribute to True
-
- Acceptance
- The router is created and the "distributed" attribute is
- set to False. Once the router is updated, the distributed
- attribute will be set to True
- """
- name = data_utils.rand_name('router')
- router = self.admin_client.create_router(name, distributed=False)
- self.addCleanup(self.admin_client.delete_router,
- router['router']['id'])
- self.assertFalse(router['router']['distributed'])
- router = self.admin_client.update_router(router['router']['id'],
- distributed=True)
- self.assertTrue(router['router']['distributed'])
diff --git a/neutron/tests/tempest/api/network/base.py b/neutron/tests/tempest/api/network/base.py
deleted file mode 100644
index 1efec96..0000000
--- a/neutron/tests/tempest/api/network/base.py
+++ /dev/null
@@ -1,465 +0,0 @@
-# Copyright 2012 OpenStack Foundation
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import netaddr
-from oslo_log import log as logging
-from tempest_lib.common.utils import data_utils
-from tempest_lib import exceptions as lib_exc
-
-from neutron.tests.api.contrib import clients
-from neutron.tests.tempest import config
-from neutron.tests.tempest import exceptions
-import neutron.tests.tempest.test
-
-CONF = config.CONF
-
-LOG = logging.getLogger(__name__)
-
-
-class BaseNetworkTest(neutron.tests.tempest.test.BaseTestCase):
-
- """
- Base class for the Neutron tests that use the Tempest Neutron REST client
-
- Per the Neutron API Guide, API v1.x was removed from the source code tree
- (docs.openstack.org/api/openstack-network/2.0/content/Overview-d1e71.html)
- Therefore, v2.x of the Neutron API is assumed. It is also assumed that the
- following options are defined in the [network] section of etc/tempest.conf:
-
- tenant_network_cidr with a block of cidr's from which smaller blocks
- can be allocated for tenant networks
-
- tenant_network_mask_bits with the mask bits to be used to partition the
- block defined by tenant-network_cidr
-
- Finally, it is assumed that the following option is defined in the
- [service_available] section of etc/tempest.conf
-
- neutron as True
- """
-
- force_tenant_isolation = False
-
- # Default to ipv4.
- _ip_version = 4
-
- @classmethod
- def resource_setup(cls):
- # Create no network resources for these test.
- cls.set_network_resources()
- super(BaseNetworkTest, cls).resource_setup()
- if not CONF.service_available.neutron:
- raise cls.skipException("Neutron support is required")
- if cls._ip_version == 6 and not CONF.network_feature_enabled.ipv6:
- raise cls.skipException("IPv6 Tests are disabled.")
-
- os = cls.get_client_manager()
-
- cls.network_cfg = CONF.network
- cls.client = os.network_client
- cls.networks = []
- cls.shared_networks = []
- cls.subnets = []
- cls.ports = []
- cls.routers = []
- cls.pools = []
- cls.vips = []
- cls.members = []
- cls.health_monitors = []
- cls.vpnservices = []
- cls.ikepolicies = []
- cls.floating_ips = []
- cls.metering_labels = []
- cls.metering_label_rules = []
- cls.fw_rules = []
- cls.fw_policies = []
- cls.ipsecpolicies = []
- cls.ethertype = "IPv" + str(cls._ip_version)
-
- @classmethod
- def resource_cleanup(cls):
- if CONF.service_available.neutron:
- # Clean up ipsec policies
- for ipsecpolicy in cls.ipsecpolicies:
- cls._try_delete_resource(cls.client.delete_ipsecpolicy,
- ipsecpolicy['id'])
- # Clean up firewall policies
- for fw_policy in cls.fw_policies:
- cls._try_delete_resource(cls.client.delete_firewall_policy,
- fw_policy['id'])
- # Clean up firewall rules
- for fw_rule in cls.fw_rules:
- cls._try_delete_resource(cls.client.delete_firewall_rule,
- fw_rule['id'])
- # Clean up ike policies
- for ikepolicy in cls.ikepolicies:
- cls._try_delete_resource(cls.client.delete_ikepolicy,
- ikepolicy['id'])
- # Clean up vpn services
- for vpnservice in cls.vpnservices:
- cls._try_delete_resource(cls.client.delete_vpnservice,
- vpnservice['id'])
- # Clean up floating IPs
- for floating_ip in cls.floating_ips:
- cls._try_delete_resource(cls.client.delete_floatingip,
- floating_ip['id'])
- # Clean up routers
- for router in cls.routers:
- cls._try_delete_resource(cls.delete_router,
- router)
-
- # Clean up health monitors
- for health_monitor in cls.health_monitors:
- cls._try_delete_resource(cls.client.delete_health_monitor,
- health_monitor['id'])
- # Clean up members
- for member in cls.members:
- cls._try_delete_resource(cls.client.delete_member,
- member['id'])
- # Clean up vips
- for vip in cls.vips:
- cls._try_delete_resource(cls.client.delete_vip,
- vip['id'])
- # Clean up pools
- for pool in cls.pools:
- cls._try_delete_resource(cls.client.delete_pool,
- pool['id'])
- # Clean up metering label rules
- for metering_label_rule in cls.metering_label_rules:
- cls._try_delete_resource(
- cls.admin_client.delete_metering_label_rule,
- metering_label_rule['id'])
- # Clean up metering labels
- for metering_label in cls.metering_labels:
- cls._try_delete_resource(
- cls.admin_client.delete_metering_label,
- metering_label['id'])
- # Clean up ports
- for port in cls.ports:
- cls._try_delete_resource(cls.client.delete_port,
- port['id'])
- # Clean up subnets
- for subnet in cls.subnets:
- cls._try_delete_resource(cls.client.delete_subnet,
- subnet['id'])
- # Clean up networks
- for network in cls.networks:
- cls._try_delete_resource(cls.client.delete_network,
- network['id'])
-
- # Clean up shared networks
- for network in cls.shared_networks:
- cls._try_delete_resource(cls.admin_client.delete_network,
- network['id'])
-
- cls.clear_isolated_creds()
- super(BaseNetworkTest, cls).resource_cleanup()
-
- @classmethod
- def _try_delete_resource(self, delete_callable, *args, **kwargs):
- """Cleanup resources in case of test-failure
-
- Some resources are explicitly deleted by the test.
- If the test failed to delete a resource, this method will execute
- the appropriate delete methods. Otherwise, the method ignores NotFound
- exceptions thrown for resources that were correctly deleted by the
- test.
-
- :param delete_callable: delete method
- :param args: arguments for delete method
- :param kwargs: keyword arguments for delete method
- """
- try:
- delete_callable(*args, **kwargs)
- # if resource is not found, this means it was deleted in the test
- except lib_exc.NotFound:
- pass
-
- @classmethod
- def create_network(cls, network_name=None):
- """Wrapper utility that returns a test network."""
- network_name = network_name or data_utils.rand_name('test-network-')
-
- body = cls.client.create_network(name=network_name)
- network = body['network']
- cls.networks.append(network)
- return network
-
- @classmethod
- def create_shared_network(cls, network_name=None):
- network_name = network_name or data_utils.rand_name('sharednetwork-')
- post_body = {'name': network_name, 'shared': True}
- body = cls.admin_client.create_network(**post_body)
- network = body['network']
- cls.shared_networks.append(network)
- return network
-
- @classmethod
- def create_subnet(cls, network, gateway='', cidr=None, mask_bits=None,
- ip_version=None, client=None, **kwargs):
- """Wrapper utility that returns a test subnet."""
-
- # allow tests to use admin client
- if not client:
- client = cls.client
-
- # The cidr and mask_bits depend on the ip version.
- ip_version = ip_version if ip_version is not None else cls._ip_version
- gateway_not_set = gateway == ''
- if ip_version == 4:
- cidr = cidr or netaddr.IPNetwork(CONF.network.tenant_network_cidr)
- mask_bits = mask_bits or CONF.network.tenant_network_mask_bits
- elif ip_version == 6:
- cidr = (
- cidr or netaddr.IPNetwork(CONF.network.tenant_network_v6_cidr))
- mask_bits = mask_bits or CONF.network.tenant_network_v6_mask_bits
- # Find a cidr that is not in use yet and create a subnet with it
- for subnet_cidr in cidr.subnet(mask_bits):
- if gateway_not_set:
- gateway_ip = str(netaddr.IPAddress(subnet_cidr) + 1)
- else:
- gateway_ip = gateway
- try:
- body = client.create_subnet(
- network_id=network['id'],
- cidr=str(subnet_cidr),
- ip_version=ip_version,
- gateway_ip=gateway_ip,
- **kwargs)
- break
- except lib_exc.BadRequest as e:
- is_overlapping_cidr = 'overlaps with another subnet' in str(e)
- if not is_overlapping_cidr:
- raise
- else:
- message = 'Available CIDR for subnet creation could not be found'
- raise exceptions.BuildErrorException(message)
- subnet = body['subnet']
- cls.subnets.append(subnet)
- return subnet
-
- @classmethod
- def create_port(cls, network, **kwargs):
- """Wrapper utility that returns a test port."""
- body = cls.client.create_port(network_id=network['id'],
- **kwargs)
- port = body['port']
- cls.ports.append(port)
- return port
-
- @classmethod
- def update_port(cls, port, **kwargs):
- """Wrapper utility that updates a test port."""
- body = cls.client.update_port(port['id'],
- **kwargs)
- return body['port']
-
- @classmethod
- def create_router(cls, router_name=None, admin_state_up=False,
- external_network_id=None, enable_snat=None):
- ext_gw_info = {}
- if external_network_id:
- ext_gw_info['network_id'] = external_network_id
- if enable_snat:
- ext_gw_info['enable_snat'] = enable_snat
- body = cls.client.create_router(
- router_name, external_gateway_info=ext_gw_info,
- admin_state_up=admin_state_up)
- router = body['router']
- cls.routers.append(router)
- return router
-
- @classmethod
- def create_floatingip(cls, external_network_id):
- """Wrapper utility that returns a test floating IP."""
- body = cls.client.create_floatingip(
- floating_network_id=external_network_id)
- fip = body['floatingip']
- cls.floating_ips.append(fip)
- return fip
-
- @classmethod
- def create_pool(cls, name, lb_method, protocol, subnet):
- """Wrapper utility that returns a test pool."""
- body = cls.client.create_pool(
- name=name,
- lb_method=lb_method,
- protocol=protocol,
- subnet_id=subnet['id'])
- pool = body['pool']
- cls.pools.append(pool)
- return pool
-
- @classmethod
- def update_pool(cls, name):
- """Wrapper utility that returns a test pool."""
- body = cls.client.update_pool(name=name)
- pool = body['pool']
- return pool
-
- @classmethod
- def create_vip(cls, name, protocol, protocol_port, subnet, pool):
- """Wrapper utility that returns a test vip."""
- body = cls.client.create_vip(name=name,
- protocol=protocol,
- protocol_port=protocol_port,
- subnet_id=subnet['id'],
- pool_id=pool['id'])
- vip = body['vip']
- cls.vips.append(vip)
- return vip
-
- @classmethod
- def update_vip(cls, name):
- body = cls.client.update_vip(name=name)
- vip = body['vip']
- return vip
-
- @classmethod
- def create_member(cls, protocol_port, pool, ip_version=None):
- """Wrapper utility that returns a test member."""
- ip_version = ip_version if ip_version is not None else cls._ip_version
- member_address = "fd00::abcd" if ip_version == 6 else "10.0.9.46"
- body = cls.client.create_member(address=member_address,
- protocol_port=protocol_port,
- pool_id=pool['id'])
- member = body['member']
- cls.members.append(member)
- return member
-
- @classmethod
- def update_member(cls, admin_state_up):
- body = cls.client.update_member(admin_state_up=admin_state_up)
- member = body['member']
- return member
-
- @classmethod
- def create_health_monitor(cls, delay, max_retries, Type, timeout):
- """Wrapper utility that returns a test health monitor."""
- body = cls.client.create_health_monitor(delay=delay,
- max_retries=max_retries,
- type=Type,
- timeout=timeout)
- health_monitor = body['health_monitor']
- cls.health_monitors.append(health_monitor)
- return health_monitor
-
- @classmethod
- def update_health_monitor(cls, admin_state_up):
- body = cls.client.update_vip(admin_state_up=admin_state_up)
- health_monitor = body['health_monitor']
- return health_monitor
-
- @classmethod
- def create_router_interface(cls, router_id, subnet_id):
- """Wrapper utility that returns a router interface."""
- interface = cls.client.add_router_interface_with_subnet_id(
- router_id, subnet_id)
- return interface
-
- @classmethod
- def create_vpnservice(cls, subnet_id, router_id):
- """Wrapper utility that returns a test vpn service."""
- body = cls.client.create_vpnservice(
- subnet_id=subnet_id, router_id=router_id, admin_state_up=True,
- name=data_utils.rand_name("vpnservice-"))
- vpnservice = body['vpnservice']
- cls.vpnservices.append(vpnservice)
- return vpnservice
-
- @classmethod
- def create_ikepolicy(cls, name):
- """Wrapper utility that returns a test ike policy."""
- body = cls.client.create_ikepolicy(name=name)
- ikepolicy = body['ikepolicy']
- cls.ikepolicies.append(ikepolicy)
- return ikepolicy
-
- @classmethod
- def create_firewall_rule(cls, action, protocol):
- """Wrapper utility that returns a test firewall rule."""
- body = cls.client.create_firewall_rule(
- name=data_utils.rand_name("fw-rule"),
- action=action,
- protocol=protocol)
- fw_rule = body['firewall_rule']
- cls.fw_rules.append(fw_rule)
- return fw_rule
-
- @classmethod
- def create_firewall_policy(cls):
- """Wrapper utility that returns a test firewall policy."""
- body = cls.client.create_firewall_policy(
- name=data_utils.rand_name("fw-policy"))
- fw_policy = body['firewall_policy']
- cls.fw_policies.append(fw_policy)
- return fw_policy
-
- @classmethod
- def delete_router(cls, router):
- body = cls.client.list_router_interfaces(router['id'])
- interfaces = body['ports']
- for i in interfaces:
- try:
- cls.client.remove_router_interface_with_subnet_id(
- router['id'], i['fixed_ips'][0]['subnet_id'])
- except lib_exc.NotFound:
- pass
- cls.client.delete_router(router['id'])
-
- @classmethod
- def create_ipsecpolicy(cls, name):
- """Wrapper utility that returns a test ipsec policy."""
- body = cls.client.create_ipsecpolicy(name=name)
- ipsecpolicy = body['ipsecpolicy']
- cls.ipsecpolicies.append(ipsecpolicy)
- return ipsecpolicy
-
-
-class BaseAdminNetworkTest(BaseNetworkTest):
-
- @classmethod
- def resource_setup(cls):
- super(BaseAdminNetworkTest, cls).resource_setup()
-
- try:
- creds = cls.isolated_creds.get_admin_creds()
- cls.os_adm = clients.Manager(credentials=creds)
- except NotImplementedError:
- msg = ("Missing Administrative Network API credentials "
- "in configuration.")
- raise cls.skipException(msg)
- cls.admin_client = cls.os_adm.network_client
-
- @classmethod
- def create_metering_label(cls, name, description):
- """Wrapper utility that returns a test metering label."""
- body = cls.admin_client.create_metering_label(
- description=description,
- name=data_utils.rand_name("metering-label"))
- metering_label = body['metering_label']
- cls.metering_labels.append(metering_label)
- return metering_label
-
- @classmethod
- def create_metering_label_rule(cls, remote_ip_prefix, direction,
- metering_label_id):
- """Wrapper utility that returns a test metering label rule."""
- body = cls.admin_client.create_metering_label_rule(
- remote_ip_prefix=remote_ip_prefix, direction=direction,
- metering_label_id=metering_label_id)
- metering_label_rule = body['metering_label_rule']
- cls.metering_label_rules.append(metering_label_rule)
- return metering_label_rule
diff --git a/neutron/tests/tempest/api/network/base_routers.py b/neutron/tests/tempest/api/network/base_routers.py
deleted file mode 100644
index 4f02501..0000000
--- a/neutron/tests/tempest/api/network/base_routers.py
+++ /dev/null
@@ -1,54 +0,0 @@
-# Copyright 2013 OpenStack Foundation
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from neutron.tests.tempest.api.network import base
-
-
-class BaseRouterTest(base.BaseAdminNetworkTest):
- # NOTE(salv-orlando): This class inherits from BaseAdminNetworkTest
- # as some router operations, such as enabling or disabling SNAT
- # require admin credentials by default
-
- @classmethod
- def resource_setup(cls):
- super(BaseRouterTest, cls).resource_setup()
-
- def _delete_router(self, router_id):
- self.client.delete_router(router_id)
- # Asserting that the router is not found in the list
- # after deletion
- list_body = self.client.list_routers()
- routers_list = list()
- for router in list_body['routers']:
- routers_list.append(router['id'])
- self.assertNotIn(router_id, routers_list)
-
- def _add_router_interface_with_subnet_id(self, router_id, subnet_id):
- interface = self.client.add_router_interface_with_subnet_id(
- router_id, subnet_id)
- self.addCleanup(self._remove_router_interface_with_subnet_id,
- router_id, subnet_id)
- self.assertEqual(subnet_id, interface['subnet_id'])
- return interface
-
- def _remove_router_interface_with_subnet_id(self, router_id, subnet_id):
- body = self.client.remove_router_interface_with_subnet_id(
- router_id, subnet_id)
- self.assertEqual(subnet_id, body['subnet_id'])
-
- def _remove_router_interface_with_port_id(self, router_id, port_id):
- body = self.client.remove_router_interface_with_port_id(router_id,
- port_id)
- self.assertEqual(port_id, body['port_id'])
diff --git a/neutron/tests/tempest/api/network/base_security_groups.py b/neutron/tests/tempest/api/network/base_security_groups.py
deleted file mode 100644
index 019eca4..0000000
--- a/neutron/tests/tempest/api/network/base_security_groups.py
+++ /dev/null
@@ -1,54 +0,0 @@
-# Copyright 2013 OpenStack Foundation
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from tempest_lib.common.utils import data_utils
-
-from neutron.tests.tempest.api.network import base
-
-
-class BaseSecGroupTest(base.BaseNetworkTest):
-
- @classmethod
- def resource_setup(cls):
- super(BaseSecGroupTest, cls).resource_setup()
-
- def _create_security_group(self):
- # Create a security group
- name = data_utils.rand_name('secgroup-')
- group_create_body = self.client.create_security_group(name=name)
- self.addCleanup(self._delete_security_group,
- group_create_body['security_group']['id'])
- self.assertEqual(group_create_body['security_group']['name'], name)
- return group_create_body, name
-
- def _delete_security_group(self, secgroup_id):
- self.client.delete_security_group(secgroup_id)
- # Asserting that the security group is not found in the list
- # after deletion
- list_body = self.client.list_security_groups()
- secgroup_list = list()
- for secgroup in list_body['security_groups']:
- secgroup_list.append(secgroup['id'])
- self.assertNotIn(secgroup_id, secgroup_list)
-
- def _delete_security_group_rule(self, rule_id):
- self.client.delete_security_group_rule(rule_id)
- # Asserting that the security group is not found in the list
- # after deletion
- list_body = self.client.list_security_group_rules()
- rules_list = list()
- for rule in list_body['security_group_rules']:
- rules_list.append(rule['id'])
- self.assertNotIn(rule_id, rules_list)
diff --git a/neutron/tests/tempest/api/network/test_allowed_address_pair.py b/neutron/tests/tempest/api/network/test_allowed_address_pair.py
deleted file mode 100644
index 641223e..0000000
--- a/neutron/tests/tempest/api/network/test_allowed_address_pair.py
+++ /dev/null
@@ -1,134 +0,0 @@
-# Copyright 2014 OpenStack Foundation
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import netaddr
-
-from neutron.tests.tempest.api.network import base
-from neutron.tests.tempest import config
-from neutron.tests.tempest import test
-
-CONF = config.CONF
-
-
-class AllowedAddressPairTestJSON(base.BaseNetworkTest):
-
- """
- Tests the Neutron Allowed Address Pair API extension using the Tempest
- ReST client. The following API operations are tested with this extension:
-
- create port
- list ports
- update port
- show port
-
- v2.0 of the Neutron API is assumed. It is also assumed that the following
- options are defined in the [network-feature-enabled] section of
- etc/tempest.conf
-
- api_extensions
- """
-
- @classmethod
- def resource_setup(cls):
- super(AllowedAddressPairTestJSON, cls).resource_setup()
- if not test.is_extension_enabled('allowed-address-pairs', 'network'):
- msg = "Allowed Address Pairs extension not enabled."
- raise cls.skipException(msg)
- cls.network = cls.create_network()
- cls.create_subnet(cls.network)
- port = cls.create_port(cls.network)
- cls.ip_address = port['fixed_ips'][0]['ip_address']
- cls.mac_address = port['mac_address']
-
- @test.attr(type='smoke')
- @test.idempotent_id('86c3529b-1231-40de-803c-00e40882f043')
- def test_create_list_port_with_address_pair(self):
- # Create port with allowed address pair attribute
- allowed_address_pairs = [{'ip_address': self.ip_address,
- 'mac_address': self.mac_address}]
- body = self.client.create_port(
- network_id=self.network['id'],
- allowed_address_pairs=allowed_address_pairs)
- port_id = body['port']['id']
- self.addCleanup(self.client.delete_port, port_id)
-
- # Confirm port was created with allowed address pair attribute
- body = self.client.list_ports()
- ports = body['ports']
- port = [p for p in ports if p['id'] == port_id]
- msg = 'Created port not found in list of ports returned by Neutron'
- self.assertTrue(port, msg)
- self._confirm_allowed_address_pair(port[0], self.ip_address)
-
- @test.attr(type='smoke')
- def _update_port_with_address(self, address, mac_address=None, **kwargs):
- # Create a port without allowed address pair
- body = self.client.create_port(network_id=self.network['id'])
- port_id = body['port']['id']
- self.addCleanup(self.client.delete_port, port_id)
- if mac_address is None:
- mac_address = self.mac_address
-
- # Update allowed address pair attribute of port
- allowed_address_pairs = [{'ip_address': address,
- 'mac_address': mac_address}]
- if kwargs:
- allowed_address_pairs.append(kwargs['allowed_address_pairs'])
- body = self.client.update_port(
- port_id, allowed_address_pairs=allowed_address_pairs)
- allowed_address_pair = body['port']['allowed_address_pairs']
- self.assertEqual(allowed_address_pair, allowed_address_pairs)
-
- @test.attr(type='smoke')
- @test.idempotent_id('9599b337-272c-47fd-b3cf-509414414ac4')
- def test_update_port_with_address_pair(self):
- # Update port with allowed address pair
- self._update_port_with_address(self.ip_address)
-
- @test.attr(type='smoke')
- @test.idempotent_id('4d6d178f-34f6-4bff-a01c-0a2f8fe909e4')
- def test_update_port_with_cidr_address_pair(self):
- # Update allowed address pair with cidr
- cidr = str(netaddr.IPNetwork(CONF.network.tenant_network_cidr))
- self._update_port_with_address(cidr)
-
- @test.attr(type='smoke')
- @test.idempotent_id('b3f20091-6cd5-472b-8487-3516137df933')
- def test_update_port_with_multiple_ip_mac_address_pair(self):
- # Create an ip _address and mac_address through port create
- resp = self.client.create_port(network_id=self.network['id'])
- newportid = resp['port']['id']
- self.addCleanup(self.client.delete_port, newportid)
- ipaddress = resp['port']['fixed_ips'][0]['ip_address']
- macaddress = resp['port']['mac_address']
-
- # Update allowed address pair port with multiple ip and mac
- allowed_address_pairs = {'ip_address': ipaddress,
- 'mac_address': macaddress}
- self._update_port_with_address(
- self.ip_address, self.mac_address,
- allowed_address_pairs=allowed_address_pairs)
-
- def _confirm_allowed_address_pair(self, port, ip):
- msg = 'Port allowed address pairs should not be empty'
- self.assertTrue(port['allowed_address_pairs'], msg)
- ip_address = port['allowed_address_pairs'][0]['ip_address']
- mac_address = port['allowed_address_pairs'][0]['mac_address']
- self.assertEqual(ip_address, ip)
- self.assertEqual(mac_address, self.mac_address)
-
-
-class AllowedAddressPairIpV6TestJSON(AllowedAddressPairTestJSON):
- _ip_version = 6
diff --git a/neutron/tests/tempest/api/network/test_dhcp_ipv6.py b/neutron/tests/tempest/api/network/test_dhcp_ipv6.py
deleted file mode 100644
index cb292b8..0000000
--- a/neutron/tests/tempest/api/network/test_dhcp_ipv6.py
+++ /dev/null
@@ -1,402 +0,0 @@
-# Copyright 2014 OpenStack Foundation
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import netaddr
-import random
-
-from tempest_lib.common.utils import data_utils
-from tempest_lib import exceptions as lib_exc
-
-from neutron.tests.tempest.api.network import base
-from neutron.tests.tempest import config
-from neutron.tests.tempest import test
-
-CONF = config.CONF
-
-
-class NetworksTestDHCPv6(base.BaseNetworkTest):
- _ip_version = 6
-
- """ Test DHCPv6 specific features using SLAAC, stateless and
- stateful settings for subnets. Also it shall check dual-stack
- functionality (IPv4 + IPv6 together).
- The tests include:
- generating of SLAAC EUI-64 address in subnets with various settings
- receiving SLAAC addresses in combinations of various subnets
- receiving stateful IPv6 addresses
- addressing in subnets with router
- """
-
- @classmethod
- def skip_checks(cls):
- msg = None
- if not CONF.network_feature_enabled.ipv6:
- msg = "IPv6 is not enabled"
- elif not CONF.network_feature_enabled.ipv6_subnet_attributes:
- msg = "DHCPv6 attributes are not enabled."
- if msg:
- raise cls.skipException(msg)
-
- @classmethod
- def resource_setup(cls):
- super(NetworksTestDHCPv6, cls).resource_setup()
- cls.network = cls.create_network()
-
- def _remove_from_list_by_index(self, things_list, elem):
- for index, i in enumerate(things_list):
- if i['id'] == elem['id']:
- break
- del things_list[index]
-
- def _clean_network(self):
- body = self.client.list_ports()
- ports = body['ports']
- for port in ports:
- if (port['device_owner'].startswith('network:router_interface')
- and port['device_id'] in [r['id'] for r in self.routers]):
- self.client.remove_router_interface_with_port_id(
- port['device_id'], port['id']
- )
- else:
- if port['id'] in [p['id'] for p in self.ports]:
- self.client.delete_port(port['id'])
- self._remove_from_list_by_index(self.ports, port)
- body = self.client.list_subnets()
- subnets = body['subnets']
- for subnet in subnets:
- if subnet['id'] in [s['id'] for s in self.subnets]:
- self.client.delete_subnet(subnet['id'])
- self._remove_from_list_by_index(self.subnets, subnet)
- body = self.client.list_routers()
- routers = body['routers']
- for router in routers:
- if router['id'] in [r['id'] for r in self.routers]:
- self.client.delete_router(router['id'])
- self._remove_from_list_by_index(self.routers, router)
-
- def _get_ips_from_subnet(self, **kwargs):
- subnet = self.create_subnet(self.network, **kwargs)
- port_mac = data_utils.rand_mac_address()
- port = self.create_port(self.network, mac_address=port_mac)
- real_ip = next(iter(port['fixed_ips']), None)['ip_address']
- eui_ip = data_utils.get_ipv6_addr_by_EUI64(subnet['cidr'],
- port_mac).format()
- return real_ip, eui_ip
-
- @test.idempotent_id('e5517e62-6f16-430d-a672-f80875493d4c')
- def test_dhcpv6_stateless_eui64(self):
- """When subnets configured with RAs SLAAC (AOM=100) and DHCP stateless
- (AOM=110) both for radvd and dnsmasq, port shall receive IP address
- calculated from its MAC.
- """
- for ra_mode, add_mode in (
- ('slaac', 'slaac'),
- ('dhcpv6-stateless', 'dhcpv6-stateless'),
- ):
- kwargs = {'ipv6_ra_mode': ra_mode,
- 'ipv6_address_mode': add_mode}
- real_ip, eui_ip = self._get_ips_from_subnet(**kwargs)
- self._clean_network()
- self.assertEqual(eui_ip, real_ip,
- ('Real port IP is %s, but shall be %s when '
- 'ipv6_ra_mode=%s and ipv6_address_mode=%s') % (
- real_ip, eui_ip, ra_mode, add_mode))
-
- @test.idempotent_id('ae2f4a5d-03ff-4c42-a3b0-ce2fcb7ea832')
- def test_dhcpv6_stateless_no_ra(self):
- """When subnets configured with dnsmasq SLAAC and DHCP stateless
- and there is no radvd, port shall receive IP address calculated
- from its MAC and mask of subnet.
- """
- for ra_mode, add_mode in (
- (None, 'slaac'),
- (None, 'dhcpv6-stateless'),
- ):
- kwargs = {'ipv6_ra_mode': ra_mode,
- 'ipv6_address_mode': add_mode}
- kwargs = {k: v for k, v in kwargs.iteritems() if v}
- real_ip, eui_ip = self._get_ips_from_subnet(**kwargs)
- self._clean_network()
- self.assertEqual(eui_ip, real_ip,
- ('Real port IP %s shall be equal to EUI-64 %s'
- 'when ipv6_ra_mode=%s,ipv6_address_mode=%s') % (
- real_ip, eui_ip,
- ra_mode if ra_mode else "Off",
- add_mode if add_mode else "Off"))
-
- @test.idempotent_id('81f18ef6-95b5-4584-9966-10d480b7496a')
- def test_dhcpv6_invalid_options(self):
- """Different configurations for radvd and dnsmasq are not allowed"""
- for ra_mode, add_mode in (
- ('dhcpv6-stateless', 'dhcpv6-stateful'),
- ('dhcpv6-stateless', 'slaac'),
- ('slaac', 'dhcpv6-stateful'),
- ('dhcpv6-stateful', 'dhcpv6-stateless'),
- ('dhcpv6-stateful', 'slaac'),
- ('slaac', 'dhcpv6-stateless'),
- ):
- kwargs = {'ipv6_ra_mode': ra_mode,
- 'ipv6_address_mode': add_mode}
- self.assertRaises(lib_exc.BadRequest,
- self.create_subnet,
- self.network,
- **kwargs)
-
- @test.idempotent_id('21635b6f-165a-4d42-bf49-7d195e47342f')
- def test_dhcpv6_stateless_no_ra_no_dhcp(self):
- """If no radvd option and no dnsmasq option is configured
- port shall receive IP from fixed IPs list of subnet.
- """
- real_ip, eui_ip = self._get_ips_from_subnet()
- self._clean_network()
- self.assertNotEqual(eui_ip, real_ip,
- ('Real port IP %s equal to EUI-64 %s when '
- 'ipv6_ra_mode=Off and ipv6_address_mode=Off,'
- 'but shall be taken from fixed IPs') % (
- real_ip, eui_ip))
-
- @test.idempotent_id('4544adf7-bb5f-4bdc-b769-b3e77026cef2')
- def test_dhcpv6_two_subnets(self):
- """When one IPv6 subnet configured with dnsmasq SLAAC or DHCP stateless
- and other IPv6 is with DHCP stateful, port shall receive EUI-64 IP
- addresses from first subnet and DHCP address from second one.
- Order of subnet creating should be unimportant.
- """
- for order in ("slaac_first", "dhcp_first"):
- for ra_mode, add_mode in (
- ('slaac', 'slaac'),
- ('dhcpv6-stateless', 'dhcpv6-stateless'),
- ):
- kwargs = {'ipv6_ra_mode': ra_mode,
- 'ipv6_address_mode': add_mode}
- kwargs_dhcp = {'ipv6_address_mode': 'dhcpv6-stateful'}
- if order == "slaac_first":
- subnet_slaac = self.create_subnet(self.network, **kwargs)
- subnet_dhcp = self.create_subnet(
- self.network, **kwargs_dhcp)
- else:
- subnet_dhcp = self.create_subnet(
- self.network, **kwargs_dhcp)
- subnet_slaac = self.create_subnet(self.network, **kwargs)
- port_mac = data_utils.rand_mac_address()
- dhcp_ip = subnet_dhcp["allocation_pools"][0]["start"]
- eui_ip = data_utils.get_ipv6_addr_by_EUI64(
- subnet_slaac['cidr'],
- port_mac
- ).format()
- # TODO(sergsh): remove this when 1219795 is fixed
- dhcp_ip = [dhcp_ip, (netaddr.IPAddress(dhcp_ip) + 1).format()]
- port = self.create_port(self.network, mac_address=port_mac)
- real_ips = dict([(k['subnet_id'], k['ip_address'])
- for k in port['fixed_ips']])
- real_dhcp_ip, real_eui_ip = [real_ips[sub['id']]
- for sub in subnet_dhcp,
- subnet_slaac]
- self.client.delete_port(port['id'])
- self.ports.pop()
- body = self.client.list_ports()
- ports_id_list = [i['id'] for i in body['ports']]
- self.assertNotIn(port['id'], ports_id_list)
- self._clean_network()
- self.assertEqual(real_eui_ip,
- eui_ip,
- 'Real IP is {0}, but shall be {1}'.format(
- real_eui_ip,
- eui_ip))
- self.assertIn(
- real_dhcp_ip, dhcp_ip,
- 'Real IP is {0}, but shall be one from {1}'.format(
- real_dhcp_ip,
- str(dhcp_ip)))
-
- @test.idempotent_id('4256c61d-c538-41ea-9147-3c450c36669e')
- def test_dhcpv6_64_subnets(self):
- """When one IPv6 subnet configured with dnsmasq SLAAC or DHCP stateless
- and other IPv4 is with DHCP of IPv4, port shall receive EUI-64 IP
- addresses from first subnet and IPv4 DHCP address from second one.
- Order of subnet creating should be unimportant.
- """
- for order in ("slaac_first", "dhcp_first"):
- for ra_mode, add_mode in (
- ('slaac', 'slaac'),
- ('dhcpv6-stateless', 'dhcpv6-stateless'),
- ):
- kwargs = {'ipv6_ra_mode': ra_mode,
- 'ipv6_address_mode': add_mode}
- if order == "slaac_first":
- subnet_slaac = self.create_subnet(self.network, **kwargs)
- subnet_dhcp = self.create_subnet(
- self.network, ip_version=4)
- else:
- subnet_dhcp = self.create_subnet(
- self.network, ip_version=4)
- subnet_slaac = self.create_subnet(self.network, **kwargs)
- port_mac = data_utils.rand_mac_address()
- dhcp_ip = subnet_dhcp["allocation_pools"][0]["start"]
- eui_ip = data_utils.get_ipv6_addr_by_EUI64(
- subnet_slaac['cidr'],
- port_mac
- ).format()
- # TODO(sergsh): remove this when 1219795 is fixed
- dhcp_ip = [dhcp_ip, (netaddr.IPAddress(dhcp_ip) + 1).format()]
- port = self.create_port(self.network, mac_address=port_mac)
- real_ips = dict([(k['subnet_id'], k['ip_address'])
- for k in port['fixed_ips']])
- real_dhcp_ip, real_eui_ip = [real_ips[sub['id']]
- for sub in subnet_dhcp,
- subnet_slaac]
- self._clean_network()
- self.assertTrue({real_eui_ip,
- real_dhcp_ip}.issubset([eui_ip] + dhcp_ip))
- self.assertEqual(real_eui_ip,
- eui_ip,
- 'Real IP is {0}, but shall be {1}'.format(
- real_eui_ip,
- eui_ip))
- self.assertIn(
- real_dhcp_ip, dhcp_ip,
- 'Real IP is {0}, but shall be one from {1}'.format(
- real_dhcp_ip,
- str(dhcp_ip)))
-
- @test.idempotent_id('4ab211a0-276f-4552-9070-51e27f58fecf')
- def test_dhcp_stateful(self):
- """With all options below, DHCPv6 shall allocate first
- address from subnet pool to port.
- """
- for ra_mode, add_mode in (
- ('dhcpv6-stateful', 'dhcpv6-stateful'),
- ('dhcpv6-stateful', None),
- (None, 'dhcpv6-stateful'),
- ):
- kwargs = {'ipv6_ra_mode': ra_mode,
- 'ipv6_address_mode': add_mode}
- kwargs = {k: v for k, v in kwargs.iteritems() if v}
- subnet = self.create_subnet(self.network, **kwargs)
- port = self.create_port(self.network)
- port_ip = next(iter(port['fixed_ips']), None)['ip_address']
- dhcp_ip = subnet["allocation_pools"][0]["start"]
- # TODO(sergsh): remove this when 1219795 is fixed
- dhcp_ip = [dhcp_ip, (netaddr.IPAddress(dhcp_ip) + 1).format()]
- self._clean_network()
- self.assertIn(
- port_ip, dhcp_ip,
- 'Real IP is {0}, but shall be one from {1}'.format(
- port_ip,
- str(dhcp_ip)))
-
- @test.idempotent_id('51a5e97f-f02e-4e4e-9a17-a69811d300e3')
- def test_dhcp_stateful_fixedips(self):
- """With all options below, port shall be able to get
- requested IP from fixed IP range not depending on
- DHCP stateful (not SLAAC!) settings configured.
- """
- for ra_mode, add_mode in (
- ('dhcpv6-stateful', 'dhcpv6-stateful'),
- ('dhcpv6-stateful', None),
- (None, 'dhcpv6-stateful'),
- ):
- kwargs = {'ipv6_ra_mode': ra_mode,
- 'ipv6_address_mode': add_mode}
- kwargs = {k: v for k, v in kwargs.iteritems() if v}
- subnet = self.create_subnet(self.network, **kwargs)
- ip_range = netaddr.IPRange(subnet["allocation_pools"][0]["start"],
- subnet["allocation_pools"][0]["end"])
- ip = netaddr.IPAddress(random.randrange(ip_range.first,
- ip_range.last)).format()
- port = self.create_port(self.network,
- fixed_ips=[{'subnet_id': subnet['id'],
- 'ip_address': ip}])
- port_ip = next(iter(port['fixed_ips']), None)['ip_address']
- self._clean_network()
- self.assertEqual(port_ip, ip,
- ("Port IP %s is not as fixed IP from "
- "port create request: %s") % (
- port_ip, ip))
-
- @test.idempotent_id('98244d88-d990-4570-91d4-6b25d70d08af')
- def test_dhcp_stateful_fixedips_outrange(self):
- """When port gets IP address from fixed IP range it
- shall be checked if it's from subnets range.
- """
- kwargs = {'ipv6_ra_mode': 'dhcpv6-stateful',
- 'ipv6_address_mode': 'dhcpv6-stateful'}
- subnet = self.create_subnet(self.network, **kwargs)
- ip_range = netaddr.IPRange(subnet["allocation_pools"][0]["start"],
- subnet["allocation_pools"][0]["end"])
- ip = netaddr.IPAddress(random.randrange(
- ip_range.last + 1, ip_range.last + 10)).format()
- self.assertRaises(lib_exc.BadRequest,
- self.create_port,
- self.network,
- fixed_ips=[{'subnet_id': subnet['id'],
- 'ip_address': ip}])
-
- @test.idempotent_id('57b8302b-cba9-4fbb-8835-9168df029051')
- def test_dhcp_stateful_fixedips_duplicate(self):
- """When port gets IP address from fixed IP range it
- shall be checked if it's not duplicate.
- """
- kwargs = {'ipv6_ra_mode': 'dhcpv6-stateful',
- 'ipv6_address_mode': 'dhcpv6-stateful'}
- subnet = self.create_subnet(self.network, **kwargs)
- ip_range = netaddr.IPRange(subnet["allocation_pools"][0]["start"],
- subnet["allocation_pools"][0]["end"])
- ip = netaddr.IPAddress(random.randrange(
- ip_range.first, ip_range.last)).format()
- self.create_port(self.network,
- fixed_ips=[
- {'subnet_id': subnet['id'],
- 'ip_address': ip}])
- self.assertRaisesRegexp(lib_exc.Conflict,
- "object with that identifier already exists",
- self.create_port,
- self.network,
- fixed_ips=[{'subnet_id': subnet['id'],
- 'ip_address': ip}])
-
- def _create_subnet_router(self, kwargs):
- subnet = self.create_subnet(self.network, **kwargs)
- router = self.create_router(
- router_name=data_utils.rand_name("routerv6-"),
- admin_state_up=True)
- port = self.create_router_interface(router['id'],
- subnet['id'])
- body = self.client.show_port(port['port_id'])
- return subnet, body['port']
-
- @test.idempotent_id('e98f65db-68f4-4330-9fea-abd8c5192d4d')
- def test_dhcp_stateful_router(self):
- """With all options below the router interface shall
- receive DHCPv6 IP address from allocation pool.
- """
- for ra_mode, add_mode in (
- ('dhcpv6-stateful', 'dhcpv6-stateful'),
- ('dhcpv6-stateful', None),
- ):
- kwargs = {'ipv6_ra_mode': ra_mode,
- 'ipv6_address_mode': add_mode}
- kwargs = {k: v for k, v in kwargs.iteritems() if v}
- subnet, port = self._create_subnet_router(kwargs)
- port_ip = next(iter(port['fixed_ips']), None)['ip_address']
- self._clean_network()
- self.assertEqual(port_ip, subnet['gateway_ip'],
- ("Port IP %s is not as first IP from "
- "subnets allocation pool: %s") % (
- port_ip, subnet['gateway_ip']))
-
- def tearDown(self):
- self._clean_network()
- super(NetworksTestDHCPv6, self).tearDown()
diff --git a/neutron/tests/tempest/api/network/test_extensions.py b/neutron/tests/tempest/api/network/test_extensions.py
deleted file mode 100644
index 455d7e2..0000000
--- a/neutron/tests/tempest/api/network/test_extensions.py
+++ /dev/null
@@ -1,75 +0,0 @@
-# Copyright 2013 OpenStack, Foundation
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-
-from neutron.tests.tempest.api.network import base
-from neutron.tests.tempest import test
-
-
-class ExtensionsTestJSON(base.BaseNetworkTest):
-
- """
- Tests the following operations in the Neutron API using the REST client for
- Neutron:
-
- List all available extensions
-
- v2.0 of the Neutron API is assumed. It is also assumed that the following
- options are defined in the [network] section of etc/tempest.conf:
-
- """
-
- @classmethod
- def resource_setup(cls):
- super(ExtensionsTestJSON, cls).resource_setup()
-
- @test.attr(type='smoke')
- @test.idempotent_id('ef28c7e6-e646-4979-9d67-deb207bc5564')
- def test_list_show_extensions(self):
- # List available extensions for the tenant
- expected_alias = ['security-group', 'l3_agent_scheduler',
- 'ext-gw-mode', 'binding', 'quotas',
- 'agent', 'dhcp_agent_scheduler', 'provider',
- 'router', 'extraroute', 'external-net',
- 'allowed-address-pairs', 'extra_dhcp_opt']
- expected_alias = [ext for ext in expected_alias if
- test.is_extension_enabled(ext, 'network')]
- actual_alias = list()
- extensions = self.client.list_extensions()
- list_extensions = extensions['extensions']
- # Show and verify the details of the available extensions
- for ext in list_extensions:
- ext_name = ext['name']
- ext_alias = ext['alias']
- actual_alias.append(ext['alias'])
- ext_details = self.client.show_extension(ext_alias)
- ext_details = ext_details['extension']
-
- self.assertIsNotNone(ext_details)
- self.assertIn('updated', ext_details.keys())
- self.assertIn('name', ext_details.keys())
- self.assertIn('description', ext_details.keys())
- self.assertIn('namespace', ext_details.keys())
- self.assertIn('links', ext_details.keys())
- self.assertIn('alias', ext_details.keys())
- self.assertEqual(ext_details['name'], ext_name)
- self.assertEqual(ext_details['alias'], ext_alias)
- self.assertEqual(ext_details, ext)
- # Verify if expected extensions are present in the actual list
- # of extensions returned, but only for those that have been
- # enabled via configuration
- for e in expected_alias:
- if test.is_extension_enabled(e, 'network'):
- self.assertIn(e, actual_alias)
diff --git a/neutron/tests/tempest/api/network/test_extra_dhcp_options.py b/neutron/tests/tempest/api/network/test_extra_dhcp_options.py
deleted file mode 100644
index 1907ccf..0000000
--- a/neutron/tests/tempest/api/network/test_extra_dhcp_options.py
+++ /dev/null
@@ -1,101 +0,0 @@
-# Copyright 2013 OpenStack Foundation
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from tempest_lib.common.utils import data_utils
-
-from neutron.tests.tempest.api.network import base
-from neutron.tests.tempest import test
-
-
-class ExtraDHCPOptionsTestJSON(base.BaseNetworkTest):
-
- """
- Tests the following operations with the Extra DHCP Options Neutron API
- extension:
-
- port create
- port list
- port show
- port update
-
- v2.0 of the Neutron API is assumed. It is also assumed that the Extra
- DHCP Options extension is enabled in the [network-feature-enabled]
- section of etc/tempest.conf
- """
-
- @classmethod
- def resource_setup(cls):
- super(ExtraDHCPOptionsTestJSON, cls).resource_setup()
- if not test.is_extension_enabled('extra_dhcp_opt', 'network'):
- msg = "Extra DHCP Options extension not enabled."
- raise cls.skipException(msg)
- cls.network = cls.create_network()
- cls.subnet = cls.create_subnet(cls.network)
- cls.port = cls.create_port(cls.network)
- cls.ip_tftp = ('123.123.123.123' if cls._ip_version == 4
- else '2015::dead')
- cls.ip_server = ('123.123.123.45' if cls._ip_version == 4
- else '2015::badd')
- cls.extra_dhcp_opts = [
- {'opt_value': 'pxelinux.0', 'opt_name': 'bootfile-name'},
- {'opt_value': cls.ip_tftp, 'opt_name': 'tftp-server'},
- {'opt_value': cls.ip_server, 'opt_name': 'server-ip-address'}
- ]
-
- @test.attr(type='smoke')
- @test.idempotent_id('d2c17063-3767-4a24-be4f-a23dbfa133c9')
- def test_create_list_port_with_extra_dhcp_options(self):
- # Create a port with Extra DHCP Options
- body = self.client.create_port(
- network_id=self.network['id'],
- extra_dhcp_opts=self.extra_dhcp_opts)
- port_id = body['port']['id']
- self.addCleanup(self.client.delete_port, port_id)
-
- # Confirm port created has Extra DHCP Options
- body = self.client.list_ports()
- ports = body['ports']
- port = [p for p in ports if p['id'] == port_id]
- self.assertTrue(port)
- self._confirm_extra_dhcp_options(port[0], self.extra_dhcp_opts)
-
- @test.attr(type='smoke')
- @test.idempotent_id('9a6aebf4-86ee-4f47-b07a-7f7232c55607')
- def test_update_show_port_with_extra_dhcp_options(self):
- # Update port with extra dhcp options
- name = data_utils.rand_name('new-port-name')
- body = self.client.update_port(
- self.port['id'],
- name=name,
- extra_dhcp_opts=self.extra_dhcp_opts)
- # Confirm extra dhcp options were added to the port
- body = self.client.show_port(self.port['id'])
- self._confirm_extra_dhcp_options(body['port'], self.extra_dhcp_opts)
-
- def _confirm_extra_dhcp_options(self, port, extra_dhcp_opts):
- retrieved = port['extra_dhcp_opts']
- self.assertEqual(len(retrieved), len(extra_dhcp_opts))
- for retrieved_option in retrieved:
- for option in extra_dhcp_opts:
- if (retrieved_option['opt_value'] == option['opt_value'] and
- retrieved_option['opt_name'] == option['opt_name']):
- break
- else:
- self.fail('Extra DHCP option not found in port %s' %
- str(retrieved_option))
-
-
-class ExtraDHCPOptionsIpV6TestJSON(ExtraDHCPOptionsTestJSON):
- _ip_version = 6
diff --git a/neutron/tests/tempest/api/network/test_floating_ips.py b/neutron/tests/tempest/api/network/test_floating_ips.py
deleted file mode 100644
index 3fb652d..0000000
--- a/neutron/tests/tempest/api/network/test_floating_ips.py
+++ /dev/null
@@ -1,219 +0,0 @@
-# Copyright 2013 OpenStack Foundation
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import netaddr
-from tempest_lib.common.utils import data_utils
-
-from neutron.tests.tempest.api.network import base
-from neutron.tests.tempest import config
-from neutron.tests.tempest import test
-
-CONF = config.CONF
-
-
-class FloatingIPTestJSON(base.BaseNetworkTest):
-
- """
- Tests the following operations in the Quantum API using the REST client for
- Neutron:
-
- Create a Floating IP
- Update a Floating IP
- Delete a Floating IP
- List all Floating IPs
- Show Floating IP details
- Associate a Floating IP with a port and then delete that port
- Associate a Floating IP with a port and then with a port on another
- router
-
- v2.0 of the Neutron API is assumed. It is also assumed that the following
- options are defined in the [network] section of etc/tempest.conf:
-
- public_network_id which is the id for the external network present
- """
-
- @classmethod
- def resource_setup(cls):
- super(FloatingIPTestJSON, cls).resource_setup()
- if not test.is_extension_enabled('router', 'network'):
- msg = "router extension not enabled."
- raise cls.skipException(msg)
- cls.ext_net_id = CONF.network.public_network_id
-
- # Create network, subnet, router and add interface
- cls.network = cls.create_network()
- cls.subnet = cls.create_subnet(cls.network)
- cls.router = cls.create_router(data_utils.rand_name('router-'),
- external_network_id=cls.ext_net_id)
- cls.create_router_interface(cls.router['id'], cls.subnet['id'])
- cls.port = list()
- # Create two ports one each for Creation and Updating of floatingIP
- for i in range(2):
- cls.create_port(cls.network)
-
- @test.attr(type='smoke')
- @test.idempotent_id('62595970-ab1c-4b7f-8fcc-fddfe55e8718')
- def test_create_list_show_update_delete_floating_ip(self):
- # Creates a floating IP
- body = self.client.create_floatingip(
- floating_network_id=self.ext_net_id,
- port_id=self.ports[0]['id'])
- created_floating_ip = body['floatingip']
- self.addCleanup(self.client.delete_floatingip,
- created_floating_ip['id'])
- self.assertIsNotNone(created_floating_ip['id'])
- self.assertIsNotNone(created_floating_ip['tenant_id'])
- self.assertIsNotNone(created_floating_ip['floating_ip_address'])
- self.assertEqual(created_floating_ip['port_id'], self.ports[0]['id'])
- self.assertEqual(created_floating_ip['floating_network_id'],
- self.ext_net_id)
- self.assertIn(created_floating_ip['fixed_ip_address'],
- [ip['ip_address'] for ip in self.ports[0]['fixed_ips']])
- # Verifies the details of a floating_ip
- floating_ip = self.client.show_floatingip(created_floating_ip['id'])
- shown_floating_ip = floating_ip['floatingip']
- self.assertEqual(shown_floating_ip['id'], created_floating_ip['id'])
- self.assertEqual(shown_floating_ip['floating_network_id'],
- self.ext_net_id)
- self.assertEqual(shown_floating_ip['tenant_id'],
- created_floating_ip['tenant_id'])
- self.assertEqual(shown_floating_ip['floating_ip_address'],
- created_floating_ip['floating_ip_address'])
- self.assertEqual(shown_floating_ip['port_id'], self.ports[0]['id'])
-
- # Verify the floating ip exists in the list of all floating_ips
- floating_ips = self.client.list_floatingips()
- floatingip_id_list = list()
- for f in floating_ips['floatingips']:
- floatingip_id_list.append(f['id'])
- self.assertIn(created_floating_ip['id'], floatingip_id_list)
- # Associate floating IP to the other port
- floating_ip = self.client.update_floatingip(
- created_floating_ip['id'],
- port_id=self.ports[1]['id'])
- updated_floating_ip = floating_ip['floatingip']
- self.assertEqual(updated_floating_ip['port_id'], self.ports[1]['id'])
- self.assertEqual(updated_floating_ip['fixed_ip_address'],
- self.ports[1]['fixed_ips'][0]['ip_address'])
- self.assertEqual(updated_floating_ip['router_id'], self.router['id'])
-
- # Disassociate floating IP from the port
- floating_ip = self.client.update_floatingip(
- created_floating_ip['id'],
- port_id=None)
- updated_floating_ip = floating_ip['floatingip']
- self.assertIsNone(updated_floating_ip['port_id'])
- self.assertIsNone(updated_floating_ip['fixed_ip_address'])
- self.assertIsNone(updated_floating_ip['router_id'])
-
- @test.attr(type='smoke')
- @test.idempotent_id('e1f6bffd-442f-4668-b30e-df13f2705e77')
- def test_floating_ip_delete_port(self):
- # Create a floating IP
- body = self.client.create_floatingip(
- floating_network_id=self.ext_net_id)
- created_floating_ip = body['floatingip']
- self.addCleanup(self.client.delete_floatingip,
- created_floating_ip['id'])
- # Create a port
- port = self.client.create_port(network_id=self.network['id'])
- created_port = port['port']
- floating_ip = self.client.update_floatingip(
- created_floating_ip['id'],
- port_id=created_port['id'])
- # Delete port
- self.client.delete_port(created_port['id'])
- # Verifies the details of the floating_ip
- floating_ip = self.client.show_floatingip(created_floating_ip['id'])
- shown_floating_ip = floating_ip['floatingip']
- # Confirm the fields are back to None
- self.assertEqual(shown_floating_ip['id'], created_floating_ip['id'])
- self.assertIsNone(shown_floating_ip['port_id'])
- self.assertIsNone(shown_floating_ip['fixed_ip_address'])
- self.assertIsNone(shown_floating_ip['router_id'])
-
- @test.attr(type='smoke')
- @test.idempotent_id('1bb2f731-fe5a-4b8c-8409-799ade1bed4d')
- def test_floating_ip_update_different_router(self):
- # Associate a floating IP to a port on a router
- body = self.client.create_floatingip(
- floating_network_id=self.ext_net_id,
- port_id=self.ports[1]['id'])
- created_floating_ip = body['floatingip']
- self.addCleanup(self.client.delete_floatingip,
- created_floating_ip['id'])
- self.assertEqual(created_floating_ip['router_id'], self.router['id'])
- network2 = self.create_network()
- subnet2 = self.create_subnet(network2)
- router2 = self.create_router(data_utils.rand_name('router-'),
- external_network_id=self.ext_net_id)
- self.create_router_interface(router2['id'], subnet2['id'])
- port_other_router = self.create_port(network2)
- # Associate floating IP to the other port on another router
- floating_ip = self.client.update_floatingip(
- created_floating_ip['id'],
- port_id=port_other_router['id'])
- updated_floating_ip = floating_ip['floatingip']
- self.assertEqual(updated_floating_ip['router_id'], router2['id'])
- self.assertEqual(updated_floating_ip['port_id'],
- port_other_router['id'])
- self.assertIsNotNone(updated_floating_ip['fixed_ip_address'])
-
- @test.attr(type='smoke')
- @test.idempotent_id('36de4bd0-f09c-43e3-a8e1-1decc1ffd3a5')
- def test_create_floating_ip_specifying_a_fixed_ip_address(self):
- body = self.client.create_floatingip(
- floating_network_id=self.ext_net_id,
- port_id=self.ports[1]['id'],
- fixed_ip_address=self.ports[1]['fixed_ips'][0]['ip_address'])
- created_floating_ip = body['floatingip']
- self.addCleanup(self.client.delete_floatingip,
- created_floating_ip['id'])
- self.assertIsNotNone(created_floating_ip['id'])
- self.assertEqual(created_floating_ip['fixed_ip_address'],
- self.ports[1]['fixed_ips'][0]['ip_address'])
- floating_ip = self.client.update_floatingip(
- created_floating_ip['id'],
- port_id=None)
- self.assertIsNone(floating_ip['floatingip']['port_id'])
-
- @test.attr(type='smoke')
- @test.idempotent_id('45c4c683-ea97-41ef-9c51-5e9802f2f3d7')
- def test_create_update_floatingip_with_port_multiple_ip_address(self):
- # Find out ips that can be used for tests
- ips = list(netaddr.IPNetwork(self.subnet['cidr']))
- list_ips = [str(ip) for ip in ips[-3:-1]]
- fixed_ips = [{'ip_address': list_ips[0]}, {'ip_address': list_ips[1]}]
- # Create port
- body = self.client.create_port(network_id=self.network['id'],
- fixed_ips=fixed_ips)
- port = body['port']
- self.addCleanup(self.client.delete_port, port['id'])
- # Create floating ip
- body = self.client.create_floatingip(
- floating_network_id=self.ext_net_id,
- port_id=port['id'],
- fixed_ip_address=list_ips[0])
- floating_ip = body['floatingip']
- self.addCleanup(self.client.delete_floatingip, floating_ip['id'])
- self.assertIsNotNone(floating_ip['id'])
- self.assertEqual(floating_ip['fixed_ip_address'], list_ips[0])
- # Update floating ip
- body = self.client.update_floatingip(floating_ip['id'],
- port_id=port['id'],
- fixed_ip_address=list_ips[1])
- update_floating_ip = body['floatingip']
- self.assertEqual(update_floating_ip['fixed_ip_address'],
- list_ips[1])
diff --git a/neutron/tests/tempest/api/network/test_floating_ips_negative.py b/neutron/tests/tempest/api/network/test_floating_ips_negative.py
deleted file mode 100644
index 6037a84..0000000
--- a/neutron/tests/tempest/api/network/test_floating_ips_negative.py
+++ /dev/null
@@ -1,82 +0,0 @@
-# Copyright 2014 Hewlett-Packard Development Company, L.P.
-# Copyright 2014 OpenStack Foundation
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from tempest_lib.common.utils import data_utils
-from tempest_lib import exceptions as lib_exc
-
-from neutron.tests.tempest.api.network import base
-from neutron.tests.tempest import config
-from neutron.tests.tempest import test
-
-CONF = config.CONF
-
-
-class FloatingIPNegativeTestJSON(base.BaseNetworkTest):
-
- """
- Test the following negative operations for floating ips:
-
- Create floatingip with a port that is unreachable to external network
- Create floatingip in private network
- Associate floatingip with port that is unreachable to external network
- """
-
- @classmethod
- def resource_setup(cls):
- super(FloatingIPNegativeTestJSON, cls).resource_setup()
- if not test.is_extension_enabled('router', 'network'):
- msg = "router extension not enabled."
- raise cls.skipException(msg)
- cls.ext_net_id = CONF.network.public_network_id
- # Create a network with a subnet connected to a router.
- cls.network = cls.create_network()
- cls.subnet = cls.create_subnet(cls.network)
- cls.router = cls.create_router(data_utils.rand_name('router'))
- cls.create_router_interface(cls.router['id'], cls.subnet['id'])
- cls.port = cls.create_port(cls.network)
-
- @test.attr(type=['negative', 'smoke'])
- @test.idempotent_id('22996ea8-4a81-4b27-b6e1-fa5df92fa5e8')
- def test_create_floatingip_with_port_ext_net_unreachable(self):
- self.assertRaises(lib_exc.NotFound, self.client.create_floatingip,
- floating_network_id=self.ext_net_id,
- port_id=self.port['id'],
- fixed_ip_address=self.port['fixed_ips'][0]
- ['ip_address'])
-
- @test.attr(type=['negative', 'smoke'])
- @test.idempotent_id('50b9aeb4-9f0b-48ee-aa31-fa955a48ff54')
- def test_create_floatingip_in_private_network(self):
- self.assertRaises(lib_exc.BadRequest,
- self.client.create_floatingip,
- floating_network_id=self.network['id'],
- port_id=self.port['id'],
- fixed_ip_address=self.port['fixed_ips'][0]
- ['ip_address'])
-
- @test.attr(type=['negative', 'smoke'])
- @test.idempotent_id('6b3b8797-6d43-4191-985c-c48b773eb429')
- def test_associate_floatingip_port_ext_net_unreachable(self):
- # Create floating ip
- body = self.client.create_floatingip(
- floating_network_id=self.ext_net_id)
- floating_ip = body['floatingip']
- self.addCleanup(self.client.delete_floatingip, floating_ip['id'])
- # Associate floating IP to the other port
- self.assertRaises(lib_exc.NotFound, self.client.update_floatingip,
- floating_ip['id'], port_id=self.port['id'],
- fixed_ip_address=self.port['fixed_ips'][0]
- ['ip_address'])
diff --git a/neutron/tests/tempest/api/network/test_fwaas_extensions.py b/neutron/tests/tempest/api/network/test_fwaas_extensions.py
deleted file mode 100644
index 5989c0a..0000000
--- a/neutron/tests/tempest/api/network/test_fwaas_extensions.py
+++ /dev/null
@@ -1,325 +0,0 @@
-# Copyright 2014 NEC Corporation. All rights reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from tempest_lib.common.utils import data_utils
-from tempest_lib import exceptions as lib_exc
-
-from neutron.tests.tempest.api.network import base
-from neutron.tests.tempest import config
-from neutron.tests.tempest import exceptions
-from neutron.tests.tempest import test
-
-CONF = config.CONF
-
-
-class FWaaSExtensionTestJSON(base.BaseNetworkTest):
-
- """
- Tests the following operations in the Neutron API using the REST client for
- Neutron:
-
- List firewall rules
- Create firewall rule
- Update firewall rule
- Delete firewall rule
- Show firewall rule
- List firewall policies
- Create firewall policy
- Update firewall policy
- Insert firewall rule to policy
- Remove firewall rule from policy
- Insert firewall rule after/before rule in policy
- Update firewall policy audited attribute
- Delete firewall policy
- Show firewall policy
- List firewall
- Create firewall
- Update firewall
- Delete firewall
- Show firewall
- """
-
- @classmethod
- def resource_setup(cls):
- super(FWaaSExtensionTestJSON, cls).resource_setup()
- if not test.is_extension_enabled('fwaas', 'network'):
- msg = "FWaaS Extension not enabled."
- raise cls.skipException(msg)
- cls.fw_rule = cls.create_firewall_rule("allow", "tcp")
- cls.fw_policy = cls.create_firewall_policy()
-
- def _try_delete_policy(self, policy_id):
- # delete policy, if it exists
- try:
- self.client.delete_firewall_policy(policy_id)
- # if policy is not found, this means it was deleted in the test
- except lib_exc.NotFound:
- pass
-
- def _try_delete_rule(self, rule_id):
- # delete rule, if it exists
- try:
- self.client.delete_firewall_rule(rule_id)
- # if rule is not found, this means it was deleted in the test
- except lib_exc.NotFound:
- pass
-
- def _try_delete_firewall(self, fw_id):
- # delete firewall, if it exists
- try:
- self.client.delete_firewall(fw_id)
- # if firewall is not found, this means it was deleted in the test
- except lib_exc.NotFound:
- pass
-
- self.client.wait_for_resource_deletion('firewall', fw_id)
-
- def _wait_until_ready(self, fw_id):
- target_states = ('ACTIVE', 'CREATED')
-
- def _wait():
- firewall = self.client.show_firewall(fw_id)
- firewall = firewall['firewall']
- return firewall['status'] in target_states
-
- if not test.call_until_true(_wait, CONF.network.build_timeout,
- CONF.network.build_interval):
- m = ("Timed out waiting for firewall %s to reach %s state(s)" %
- (fw_id, target_states))
- raise exceptions.TimeoutException(m)
-
- @test.idempotent_id('1b84cf01-9c09-4ce7-bc72-b15e39076468')
- def test_list_firewall_rules(self):
- # List firewall rules
- fw_rules = self.client.list_firewall_rules()
- fw_rules = fw_rules['firewall_rules']
- self.assertIn((self.fw_rule['id'],
- self.fw_rule['name'],
- self.fw_rule['action'],
- self.fw_rule['protocol'],
- self.fw_rule['ip_version'],
- self.fw_rule['enabled']),
- [(m['id'],
- m['name'],
- m['action'],
- m['protocol'],
- m['ip_version'],
- m['enabled']) for m in fw_rules])
-
- @test.idempotent_id('563564f7-7077-4f5e-8cdc-51f37ae5a2b9')
- def test_create_update_delete_firewall_rule(self):
- # Create firewall rule
- body = self.client.create_firewall_rule(
- name=data_utils.rand_name("fw-rule"),
- action="allow",
- protocol="tcp")
- fw_rule_id = body['firewall_rule']['id']
-
- # Update firewall rule
- body = self.client.update_firewall_rule(fw_rule_id,
- shared=True)
- self.assertTrue(body["firewall_rule"]['shared'])
-
- # Delete firewall rule
- self.client.delete_firewall_rule(fw_rule_id)
- # Confirm deletion
- fw_rules = self.client.list_firewall_rules()
- self.assertNotIn(fw_rule_id,
- [m['id'] for m in fw_rules['firewall_rules']])
-
- @test.idempotent_id('3ff8c08e-26ff-4034-ae48-810ed213a998')
- def test_show_firewall_rule(self):
- # show a created firewall rule
- fw_rule = self.client.show_firewall_rule(self.fw_rule['id'])
- for key, value in fw_rule['firewall_rule'].iteritems():
- self.assertEqual(self.fw_rule[key], value)
-
- @test.idempotent_id('1086dd93-a4c0-4bbb-a1bd-6d4bc62c199f')
- def test_list_firewall_policies(self):
- fw_policies = self.client.list_firewall_policies()
- fw_policies = fw_policies['firewall_policies']
- self.assertIn((self.fw_policy['id'],
- self.fw_policy['name'],
- self.fw_policy['firewall_rules']),
- [(m['id'],
- m['name'],
- m['firewall_rules']) for m in fw_policies])
-
- @test.idempotent_id('bbf37b6c-498c-421e-9c95-45897d3ed775')
- def test_create_update_delete_firewall_policy(self):
- # Create firewall policy
- body = self.client.create_firewall_policy(
- name=data_utils.rand_name("fw-policy"))
- fw_policy_id = body['firewall_policy']['id']
- self.addCleanup(self._try_delete_policy, fw_policy_id)
-
- # Update firewall policy
- body = self.client.update_firewall_policy(fw_policy_id,
- shared=True,
- name="updated_policy")
- updated_fw_policy = body["firewall_policy"]
- self.assertTrue(updated_fw_policy['shared'])
- self.assertEqual("updated_policy", updated_fw_policy['name'])
-
- # Delete firewall policy
- self.client.delete_firewall_policy(fw_policy_id)
- # Confirm deletion
- fw_policies = self.client.list_firewall_policies()
- fw_policies = fw_policies['firewall_policies']
- self.assertNotIn(fw_policy_id, [m['id'] for m in fw_policies])
-
- @test.idempotent_id('1df59b3a-517e-41d4-96f6-fc31cf4ecff2')
- def test_show_firewall_policy(self):
- # show a created firewall policy
- fw_policy = self.client.show_firewall_policy(self.fw_policy['id'])
- fw_policy = fw_policy['firewall_policy']
- for key, value in fw_policy.iteritems():
- self.assertEqual(self.fw_policy[key], value)
-
- @test.idempotent_id('02082a03-3cdd-4789-986a-1327dd80bfb7')
- def test_create_show_delete_firewall(self):
- # Create tenant network resources required for an ACTIVE firewall
- network = self.create_network()
- subnet = self.create_subnet(network)
- router = self.create_router(
- data_utils.rand_name('router-'),
- admin_state_up=True)
- self.client.add_router_interface_with_subnet_id(
- router['id'], subnet['id'])
-
- # Create firewall
- body = self.client.create_firewall(
- name=data_utils.rand_name("firewall"),
- firewall_policy_id=self.fw_policy['id'])
- created_firewall = body['firewall']
- firewall_id = created_firewall['id']
- self.addCleanup(self._try_delete_firewall, firewall_id)
-
- # Wait for the firewall resource to become ready
- self._wait_until_ready(firewall_id)
-
- # show a created firewall
- firewall = self.client.show_firewall(firewall_id)
- firewall = firewall['firewall']
-
- for key, value in firewall.iteritems():
- if key == 'status':
- continue
- self.assertEqual(created_firewall[key], value)
-
- # list firewall
- firewalls = self.client.list_firewalls()
- firewalls = firewalls['firewalls']
- self.assertIn((created_firewall['id'],
- created_firewall['name'],
- created_firewall['firewall_policy_id']),
- [(m['id'],
- m['name'],
- m['firewall_policy_id']) for m in firewalls])
-
- # Delete firewall
- self.client.delete_firewall(firewall_id)
-
- @test.attr(type='smoke')
- @test.idempotent_id('53305b4b-9897-4e01-87c0-2ae386083180')
- def test_firewall_rule_insertion_position_removal_rule_from_policy(self):
- # Create firewall rule
- body = self.client.create_firewall_rule(
- name=data_utils.rand_name("fw-rule"),
- action="allow",
- protocol="tcp")
- fw_rule_id1 = body['firewall_rule']['id']
- self.addCleanup(self._try_delete_rule, fw_rule_id1)
- # Create firewall policy
- body = self.client.create_firewall_policy(
- name=data_utils.rand_name("fw-policy"))
- fw_policy_id = body['firewall_policy']['id']
- self.addCleanup(self._try_delete_policy, fw_policy_id)
-
- # Insert rule to firewall policy
- self.client.insert_firewall_rule_in_policy(
- fw_policy_id, fw_rule_id1, '', '')
-
- # Verify insertion of rule in policy
- self.assertIn(fw_rule_id1, self._get_list_fw_rule_ids(fw_policy_id))
- # Create another firewall rule
- body = self.client.create_firewall_rule(
- name=data_utils.rand_name("fw-rule"),
- action="allow",
- protocol="icmp")
- fw_rule_id2 = body['firewall_rule']['id']
- self.addCleanup(self._try_delete_rule, fw_rule_id2)
-
- # Insert rule to firewall policy after the first rule
- self.client.insert_firewall_rule_in_policy(
- fw_policy_id, fw_rule_id2, fw_rule_id1, '')
-
- # Verify the posiition of rule after insertion
- fw_rule = self.client.show_firewall_rule(
- fw_rule_id2)
-
- self.assertEqual(int(fw_rule['firewall_rule']['position']), 2)
- # Remove rule from the firewall policy
- self.client.remove_firewall_rule_from_policy(
- fw_policy_id, fw_rule_id2)
- # Insert rule to firewall policy before the first rule
- self.client.insert_firewall_rule_in_policy(
- fw_policy_id, fw_rule_id2, '', fw_rule_id1)
- # Verify the posiition of rule after insertion
- fw_rule = self.client.show_firewall_rule(
- fw_rule_id2)
- self.assertEqual(int(fw_rule['firewall_rule']['position']), 1)
- # Remove rule from the firewall policy
- self.client.remove_firewall_rule_from_policy(
- fw_policy_id, fw_rule_id2)
- # Verify removal of rule from firewall policy
- self.assertNotIn(fw_rule_id2, self._get_list_fw_rule_ids(fw_policy_id))
-
- # Remove rule from the firewall policy
- self.client.remove_firewall_rule_from_policy(
- fw_policy_id, fw_rule_id1)
-
- # Verify removal of rule from firewall policy
- self.assertNotIn(fw_rule_id1, self._get_list_fw_rule_ids(fw_policy_id))
-
- def _get_list_fw_rule_ids(self, fw_policy_id):
- fw_policy = self.client.show_firewall_policy(
- fw_policy_id)
- return [ruleid for ruleid in fw_policy['firewall_policy']
- ['firewall_rules']]
-
- @test.idempotent_id('8515ca8a-0d2f-4298-b5ff-6f924e4587ca')
- def test_update_firewall_policy_audited_attribute(self):
- # Create firewall rule
- body = self.client.create_firewall_rule(
- name=data_utils.rand_name("fw-rule"),
- action="allow",
- protocol="icmp")
- fw_rule_id = body['firewall_rule']['id']
- self.addCleanup(self._try_delete_rule, fw_rule_id)
- # Create firewall policy
- body = self.client.create_firewall_policy(
- name=data_utils.rand_name('fw-policy'))
- fw_policy_id = body['firewall_policy']['id']
- self.addCleanup(self._try_delete_policy, fw_policy_id)
- self.assertFalse(body['firewall_policy']['audited'])
- # Update firewall policy audited attribute to ture
- self.client.update_firewall_policy(fw_policy_id,
- audited=True)
- # Insert Firewall rule to firewall policy
- self.client.insert_firewall_rule_in_policy(
- fw_policy_id, fw_rule_id, '', '')
- body = self.client.show_firewall_policy(
- fw_policy_id)
- self.assertFalse(body['firewall_policy']['audited'])
diff --git a/neutron/tests/tempest/api/network/test_load_balancer.py b/neutron/tests/tempest/api/network/test_load_balancer.py
deleted file mode 100644
index 31082fd..0000000
--- a/neutron/tests/tempest/api/network/test_load_balancer.py
+++ /dev/null
@@ -1,453 +0,0 @@
-# Copyright 2013 OpenStack Foundation
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from tempest_lib.common.utils import data_utils
-from tempest_lib import decorators
-
-from neutron.tests.tempest.api.network import base
-from neutron.tests.tempest import test
-
-
-class LoadBalancerTestJSON(base.BaseNetworkTest):
-
- """
- Tests the following operations in the Neutron API using the REST client for
- Neutron:
-
- create vIP, and Pool
- show vIP
- list vIP
- update vIP
- delete vIP
- update pool
- delete pool
- show pool
- list pool
- health monitoring operations
- """
-
- @classmethod
- def resource_setup(cls):
- super(LoadBalancerTestJSON, cls).resource_setup()
- if not test.is_extension_enabled('lbaas', 'network'):
- msg = "lbaas extension not enabled."
- raise cls.skipException(msg)
- cls.network = cls.create_network()
- cls.name = cls.network['name']
- cls.subnet = cls.create_subnet(cls.network)
- pool_name = data_utils.rand_name('pool-')
- vip_name = data_utils.rand_name('vip-')
- cls.pool = cls.create_pool(pool_name, "ROUND_ROBIN",
- "HTTP", cls.subnet)
- cls.vip = cls.create_vip(name=vip_name,
- protocol="HTTP",
- protocol_port=80,
- subnet=cls.subnet,
- pool=cls.pool)
- cls.member = cls.create_member(80, cls.pool, cls._ip_version)
- cls.member_address = ("10.0.9.47" if cls._ip_version == 4
- else "2015::beef")
- cls.health_monitor = cls.create_health_monitor(delay=4,
- max_retries=3,
- Type="TCP",
- timeout=1)
-
- def _check_list_with_filter(self, obj_name, attr_exceptions, **kwargs):
- create_obj = getattr(self.client, 'create_' + obj_name)
- delete_obj = getattr(self.client, 'delete_' + obj_name)
- list_objs = getattr(self.client, 'list_' + obj_name + 's')
-
- body = create_obj(**kwargs)
- obj = body[obj_name]
- self.addCleanup(delete_obj, obj['id'])
- for key, value in obj.iteritems():
- # It is not relevant to filter by all arguments. That is why
- # there is a list of attr to except
- if key not in attr_exceptions:
- body = list_objs(**{key: value})
- objs = [v[key] for v in body[obj_name + 's']]
- self.assertIn(value, objs)
-
- @test.attr(type='smoke')
- @test.idempotent_id('c96dbfab-4a80-4e74-a535-e950b5bedd47')
- def test_list_vips(self):
- # Verify the vIP exists in the list of all vIPs
- body = self.client.list_vips()
- vips = body['vips']
- self.assertIn(self.vip['id'], [v['id'] for v in vips])
-
- @test.attr(type='smoke')
- @test.idempotent_id('b8853f65-5089-4e69-befd-041a143427ff')
- def test_list_vips_with_filter(self):
- name = data_utils.rand_name('vip-')
- body = self.client.create_pool(name=data_utils.rand_name("pool-"),
- lb_method="ROUND_ROBIN",
- protocol="HTTPS",
- subnet_id=self.subnet['id'])
- pool = body['pool']
- self.addCleanup(self.client.delete_pool, pool['id'])
- attr_exceptions = ['status', 'session_persistence',
- 'status_description']
- self._check_list_with_filter(
- 'vip', attr_exceptions, name=name, protocol="HTTPS",
- protocol_port=81, subnet_id=self.subnet['id'], pool_id=pool['id'],
- description=data_utils.rand_name('description-'),
- admin_state_up=False)
-
- @test.attr(type='smoke')
- @test.idempotent_id('27f56083-9af9-4a48-abe9-ca1bcc6c9035')
- def test_create_update_delete_pool_vip(self):
- # Creates a vip
- name = data_utils.rand_name('vip-')
- address = self.subnet['allocation_pools'][0]['end']
- body = self.client.create_pool(
- name=data_utils.rand_name("pool-"),
- lb_method='ROUND_ROBIN',
- protocol='HTTP',
- subnet_id=self.subnet['id'])
- pool = body['pool']
- body = self.client.create_vip(name=name,
- protocol="HTTP",
- protocol_port=80,
- subnet_id=self.subnet['id'],
- pool_id=pool['id'],
- address=address)
- vip = body['vip']
- vip_id = vip['id']
- # Confirm VIP's address correctness with a show
- body = self.client.show_vip(vip_id)
- vip = body['vip']
- self.assertEqual(address, vip['address'])
- # Verification of vip update
- new_name = "New_vip"
- new_description = "New description"
- persistence_type = "HTTP_COOKIE"
- update_data = {"session_persistence": {
- "type": persistence_type}}
- body = self.client.update_vip(vip_id,
- name=new_name,
- description=new_description,
- connection_limit=10,
- admin_state_up=False,
- **update_data)
- updated_vip = body['vip']
- self.assertEqual(new_name, updated_vip['name'])
- self.assertEqual(new_description, updated_vip['description'])
- self.assertEqual(10, updated_vip['connection_limit'])
- self.assertFalse(updated_vip['admin_state_up'])
- self.assertEqual(persistence_type,
- updated_vip['session_persistence']['type'])
- self.client.delete_vip(vip['id'])
- self.client.wait_for_resource_deletion('vip', vip['id'])
- # Verification of pool update
- new_name = "New_pool"
- body = self.client.update_pool(pool['id'],
- name=new_name,
- description="new_description",
- lb_method='LEAST_CONNECTIONS')
- updated_pool = body['pool']
- self.assertEqual(new_name, updated_pool['name'])
- self.assertEqual('new_description', updated_pool['description'])
- self.assertEqual('LEAST_CONNECTIONS', updated_pool['lb_method'])
- self.client.delete_pool(pool['id'])
-
- @test.attr(type='smoke')
- @test.idempotent_id('0435a95e-1d19-4d90-9e9f-3b979e9ad089')
- def test_show_vip(self):
- # Verifies the details of a vip
- body = self.client.show_vip(self.vip['id'])
- vip = body['vip']
- for key, value in vip.iteritems():
- # 'status' should not be confirmed in api tests
- if key != 'status':
- self.assertEqual(self.vip[key], value)
-
- @test.attr(type='smoke')
- @test.idempotent_id('6e7a7d31-8451-456d-b24a-e50479ce42a7')
- def test_show_pool(self):
- # Here we need to new pool without any dependence with vips
- body = self.client.create_pool(name=data_utils.rand_name("pool-"),
- lb_method='ROUND_ROBIN',
- protocol='HTTP',
- subnet_id=self.subnet['id'])
- pool = body['pool']
- self.addCleanup(self.client.delete_pool, pool['id'])
- # Verifies the details of a pool
- body = self.client.show_pool(pool['id'])
- shown_pool = body['pool']
- for key, value in pool.iteritems():
- # 'status' should not be confirmed in api tests
- if key != 'status':
- self.assertEqual(value, shown_pool[key])
-
- @test.attr(type='smoke')
- @test.idempotent_id('d1ab1ffa-e06a-487f-911f-56418cb27727')
- def test_list_pools(self):
- # Verify the pool exists in the list of all pools
- body = self.client.list_pools()
- pools = body['pools']
- self.assertIn(self.pool['id'], [p['id'] for p in pools])
-
- @test.attr(type='smoke')
- @test.idempotent_id('27cc4c1a-caac-4273-b983-2acb4afaad4f')
- def test_list_pools_with_filters(self):
- attr_exceptions = ['status', 'vip_id', 'members', 'provider',
- 'status_description']
- self._check_list_with_filter(
- 'pool', attr_exceptions, name=data_utils.rand_name("pool-"),
- lb_method="ROUND_ROBIN", protocol="HTTPS",
- subnet_id=self.subnet['id'],
- description=data_utils.rand_name('description-'),
- admin_state_up=False)
-
- @test.attr(type='smoke')
- @test.idempotent_id('282d0dfd-5c3a-4c9b-b39c-c99782f39193')
- def test_list_members(self):
- # Verify the member exists in the list of all members
- body = self.client.list_members()
- members = body['members']
- self.assertIn(self.member['id'], [m['id'] for m in members])
-
- @test.attr(type='smoke')
- @test.idempotent_id('243b5126-24c6-4879-953e-7c7e32d8a57f')
- def test_list_members_with_filters(self):
- attr_exceptions = ['status', 'status_description']
- self._check_list_with_filter('member', attr_exceptions,
- address=self.member_address,
- protocol_port=80,
- pool_id=self.pool['id'])
-
- @test.attr(type='smoke')
- @test.idempotent_id('fb833ee8-9e69-489f-b540-a409762b78b2')
- def test_create_update_delete_member(self):
- # Creates a member
- body = self.client.create_member(address=self.member_address,
- protocol_port=80,
- pool_id=self.pool['id'])
- member = body['member']
- # Verification of member update
- body = self.client.update_member(member['id'],
- admin_state_up=False)
- updated_member = body['member']
- self.assertFalse(updated_member['admin_state_up'])
- # Verification of member delete
- self.client.delete_member(member['id'])
-
- @test.attr(type='smoke')
- @test.idempotent_id('893cd71f-a7dd-4485-b162-f6ab9a534914')
- def test_show_member(self):
- # Verifies the details of a member
- body = self.client.show_member(self.member['id'])
- member = body['member']
- for key, value in member.iteritems():
- # 'status' should not be confirmed in api tests
- if key != 'status':
- self.assertEqual(self.member[key], value)
-
- @test.attr(type='smoke')
- @test.idempotent_id('8e5822c5-68a4-4224-8d6c-a617741ebc2d')
- def test_list_health_monitors(self):
- # Verify the health monitor exists in the list of all health monitors
- body = self.client.list_health_monitors()
- health_monitors = body['health_monitors']
- self.assertIn(self.health_monitor['id'],
- [h['id'] for h in health_monitors])
-
- @test.attr(type='smoke')
- @test.idempotent_id('49bac58a-511c-4875-b794-366698211d25')
- def test_list_health_monitors_with_filters(self):
- attr_exceptions = ['status', 'status_description', 'pools']
- self._check_list_with_filter('health_monitor', attr_exceptions,
- delay=5, max_retries=4, type="TCP",
- timeout=2)
-
- @test.attr(type='smoke')
- @test.idempotent_id('e8ce05c4-d554-4d1e-a257-ad32ce134bb5')
- def test_create_update_delete_health_monitor(self):
- # Creates a health_monitor
- body = self.client.create_health_monitor(delay=4,
- max_retries=3,
- type="TCP",
- timeout=1)
- health_monitor = body['health_monitor']
- # Verification of health_monitor update
- body = (self.client.update_health_monitor
- (health_monitor['id'],
- admin_state_up=False))
- updated_health_monitor = body['health_monitor']
- self.assertFalse(updated_health_monitor['admin_state_up'])
- # Verification of health_monitor delete
- body = self.client.delete_health_monitor(health_monitor['id'])
-
- @test.attr(type='smoke')
- @test.idempotent_id('d3e1aebc-06c2-49b3-9816-942af54012eb')
- def test_create_health_monitor_http_type(self):
- hm_type = "HTTP"
- body = self.client.create_health_monitor(delay=4,
- max_retries=3,
- type=hm_type,
- timeout=1)
- health_monitor = body['health_monitor']
- self.addCleanup(self.client.delete_health_monitor,
- health_monitor['id'])
- self.assertEqual(hm_type, health_monitor['type'])
-
- @test.attr(type='smoke')
- @test.idempotent_id('0eff9f67-90fb-4bb1-b4ed-c5fda99fff0c')
- def test_update_health_monitor_http_method(self):
- body = self.client.create_health_monitor(delay=4,
- max_retries=3,
- type="HTTP",
- timeout=1)
- health_monitor = body['health_monitor']
- self.addCleanup(self.client.delete_health_monitor,
- health_monitor['id'])
- body = (self.client.update_health_monitor
- (health_monitor['id'],
- http_method="POST",
- url_path="/home/user",
- expected_codes="290"))
- updated_health_monitor = body['health_monitor']
- self.assertEqual("POST", updated_health_monitor['http_method'])
- self.assertEqual("/home/user", updated_health_monitor['url_path'])
- self.assertEqual("290", updated_health_monitor['expected_codes'])
-
- @test.attr(type='smoke')
- @test.idempotent_id('08e126ab-1407-483f-a22e-b11cc032ca7c')
- def test_show_health_monitor(self):
- # Verifies the details of a health_monitor
- body = self.client.show_health_monitor(self.health_monitor['id'])
- health_monitor = body['health_monitor']
- for key, value in health_monitor.iteritems():
- # 'status' should not be confirmed in api tests
- if key != 'status':
- self.assertEqual(self.health_monitor[key], value)
-
- @test.attr(type='smoke')
- @test.idempotent_id('87f7628e-8918-493d-af50-0602845dbb5b')
- def test_associate_disassociate_health_monitor_with_pool(self):
- # Verify that a health monitor can be associated with a pool
- self.client.associate_health_monitor_with_pool(
- self.health_monitor['id'], self.pool['id'])
- body = self.client.show_health_monitor(
- self.health_monitor['id'])
- health_monitor = body['health_monitor']
- body = self.client.show_pool(self.pool['id'])
- pool = body['pool']
- self.assertIn(pool['id'],
- [p['pool_id'] for p in health_monitor['pools']])
- self.assertIn(health_monitor['id'], pool['health_monitors'])
- # Verify that a health monitor can be disassociated from a pool
- (self.client.disassociate_health_monitor_with_pool
- (self.health_monitor['id'], self.pool['id']))
- body = self.client.show_pool(self.pool['id'])
- pool = body['pool']
- body = self.client.show_health_monitor(
- self.health_monitor['id'])
- health_monitor = body['health_monitor']
- self.assertNotIn(health_monitor['id'], pool['health_monitors'])
- self.assertNotIn(pool['id'],
- [p['pool_id'] for p in health_monitor['pools']])
-
- @test.attr(type='smoke')
- @test.idempotent_id('525fc7dc-be24-408d-938d-822e9783e027')
- def test_get_lb_pool_stats(self):
- # Verify the details of pool stats
- body = self.client.list_lb_pool_stats(self.pool['id'])
- stats = body['stats']
- self.assertIn("bytes_in", stats)
- self.assertIn("total_connections", stats)
- self.assertIn("active_connections", stats)
- self.assertIn("bytes_out", stats)
-
- @test.attr(type='smoke')
- @test.idempotent_id('66236be2-5121-4047-8cde-db4b83b110a5')
- def test_update_list_of_health_monitors_associated_with_pool(self):
- (self.client.associate_health_monitor_with_pool
- (self.health_monitor['id'], self.pool['id']))
- self.client.update_health_monitor(
- self.health_monitor['id'], admin_state_up=False)
- body = self.client.show_pool(self.pool['id'])
- health_monitors = body['pool']['health_monitors']
- for health_monitor_id in health_monitors:
- body = self.client.show_health_monitor(health_monitor_id)
- self.assertFalse(body['health_monitor']['admin_state_up'])
- (self.client.disassociate_health_monitor_with_pool
- (self.health_monitor['id'], self.pool['id']))
-
- @test.attr(type='smoke')
- @test.idempotent_id('44ec9b40-b501-41e2-951f-4fc673b15ac0')
- def test_update_admin_state_up_of_pool(self):
- self.client.update_pool(self.pool['id'],
- admin_state_up=False)
- body = self.client.show_pool(self.pool['id'])
- pool = body['pool']
- self.assertFalse(pool['admin_state_up'])
-
- @test.attr(type='smoke')
- @test.idempotent_id('466a9d4c-37c6-4ea2-b807-133437beb48c')
- def test_show_vip_associated_with_pool(self):
- body = self.client.show_pool(self.pool['id'])
- pool = body['pool']
- body = self.client.show_vip(pool['vip_id'])
- vip = body['vip']
- self.assertEqual(self.vip['name'], vip['name'])
- self.assertEqual(self.vip['id'], vip['id'])
-
- @test.attr(type='smoke')
- @test.idempotent_id('7b97694e-69d0-4151-b265-e1052a465aa8')
- def test_show_members_associated_with_pool(self):
- body = self.client.show_pool(self.pool['id'])
- members = body['pool']['members']
- for member_id in members:
- body = self.client.show_member(member_id)
- self.assertIsNotNone(body['member']['status'])
- self.assertEqual(member_id, body['member']['id'])
- self.assertIsNotNone(body['member']['admin_state_up'])
-
- @test.attr(type='smoke')
- @test.idempotent_id('73ed6f27-595b-4b2c-969c-dbdda6b8ab34')
- def test_update_pool_related_to_member(self):
- # Create new pool
- body = self.client.create_pool(name=data_utils.rand_name("pool-"),
- lb_method='ROUND_ROBIN',
- protocol='HTTP',
- subnet_id=self.subnet['id'])
- new_pool = body['pool']
- self.addCleanup(self.client.delete_pool, new_pool['id'])
- # Update member with new pool's id
- body = self.client.update_member(self.member['id'],
- pool_id=new_pool['id'])
- # Confirm with show that pool_id change
- body = self.client.show_member(self.member['id'])
- member = body['member']
- self.assertEqual(member['pool_id'], new_pool['id'])
- # Update member with old pool id, this is needed for clean up
- body = self.client.update_member(self.member['id'],
- pool_id=self.pool['id'])
-
- @test.attr(type='smoke')
- @test.idempotent_id('cf63f071-bbe3-40ba-97a0-a33e11923162')
- def test_update_member_weight(self):
- self.client.update_member(self.member['id'],
- weight=2)
- body = self.client.show_member(self.member['id'])
- member = body['member']
- self.assertEqual(2, member['weight'])
-
-
-@decorators.skip_because(bug="1402007")
-class LoadBalancerIpV6TestJSON(LoadBalancerTestJSON):
- _ip_version = 6
diff --git a/neutron/tests/tempest/api/network/test_metering_extensions.py b/neutron/tests/tempest/api/network/test_metering_extensions.py
deleted file mode 100644
index 7f2a3fa..0000000
--- a/neutron/tests/tempest/api/network/test_metering_extensions.py
+++ /dev/null
@@ -1,151 +0,0 @@
-# Copyright (C) 2014 eNovance SAS <licensing@enovance.com>
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from oslo_log import log as logging
-from tempest_lib.common.utils import data_utils
-
-from neutron.tests.tempest.api.network import base
-from neutron.tests.tempest import test
-
-
-LOG = logging.getLogger(__name__)
-
-
-class MeteringTestJSON(base.BaseAdminNetworkTest):
-
- """
- Tests the following operations in the Neutron API using the REST client for
- Neutron:
-
- List, Show, Create, Delete Metering labels
- List, Show, Create, Delete Metering labels rules
- """
-
- @classmethod
- def resource_setup(cls):
- super(MeteringTestJSON, cls).resource_setup()
- if not test.is_extension_enabled('metering', 'network'):
- msg = "metering extension not enabled."
- raise cls.skipException(msg)
- description = "metering label created by tempest"
- name = data_utils.rand_name("metering-label")
- cls.metering_label = cls.create_metering_label(name, description)
- remote_ip_prefix = ("10.0.0.0/24" if cls._ip_version == 4
- else "fd02::/64")
- direction = "ingress"
- cls.metering_label_rule = cls.create_metering_label_rule(
- remote_ip_prefix, direction,
- metering_label_id=cls.metering_label['id'])
-
- def _delete_metering_label(self, metering_label_id):
- # Deletes a label and verifies if it is deleted or not
- self.admin_client.delete_metering_label(metering_label_id)
- # Asserting that the label is not found in list after deletion
- labels = self.admin_client.list_metering_labels(id=metering_label_id)
- self.assertEqual(len(labels['metering_labels']), 0)
-
- def _delete_metering_label_rule(self, metering_label_rule_id):
- # Deletes a rule and verifies if it is deleted or not
- self.admin_client.delete_metering_label_rule(
- metering_label_rule_id)
- # Asserting that the rule is not found in list after deletion
- rules = (self.admin_client.list_metering_label_rules(
- id=metering_label_rule_id))
- self.assertEqual(len(rules['metering_label_rules']), 0)
-
- @test.attr(type='smoke')
- @test.idempotent_id('e2fb2f8c-45bf-429a-9f17-171c70444612')
- def test_list_metering_labels(self):
- # Verify label filtering
- body = self.admin_client.list_metering_labels(id=33)
- metering_labels = body['metering_labels']
- self.assertEqual(0, len(metering_labels))
-
- @test.attr(type='smoke')
- @test.idempotent_id('ec8e15ff-95d0-433b-b8a6-b466bddb1e50')
- def test_create_delete_metering_label_with_filters(self):
- # Creates a label
- name = data_utils.rand_name('metering-label-')
- description = "label created by tempest"
- body = self.admin_client.create_metering_label(name=name,
- description=description)
- metering_label = body['metering_label']
- self.addCleanup(self._delete_metering_label,
- metering_label['id'])
- # Assert whether created labels are found in labels list or fail
- # if created labels are not found in labels list
- labels = (self.admin_client.list_metering_labels(
- id=metering_label['id']))
- self.assertEqual(len(labels['metering_labels']), 1)
-
- @test.attr(type='smoke')
- @test.idempotent_id('30abb445-0eea-472e-bd02-8649f54a5968')
- def test_show_metering_label(self):
- # Verifies the details of a label
- body = self.admin_client.show_metering_label(self.metering_label['id'])
- metering_label = body['metering_label']
- self.assertEqual(self.metering_label['id'], metering_label['id'])
- self.assertEqual(self.metering_label['tenant_id'],
- metering_label['tenant_id'])
- self.assertEqual(self.metering_label['name'], metering_label['name'])
- self.assertEqual(self.metering_label['description'],
- metering_label['description'])
-
- @test.attr(type='smoke')
- @test.idempotent_id('cc832399-6681-493b-9d79-0202831a1281')
- def test_list_metering_label_rules(self):
- # Verify rule filtering
- body = self.admin_client.list_metering_label_rules(id=33)
- metering_label_rules = body['metering_label_rules']
- self.assertEqual(0, len(metering_label_rules))
-
- @test.attr(type='smoke')
- @test.idempotent_id('f4d547cd-3aee-408f-bf36-454f8825e045')
- def test_create_delete_metering_label_rule_with_filters(self):
- # Creates a rule
- remote_ip_prefix = ("10.0.1.0/24" if self._ip_version == 4
- else "fd03::/64")
- body = (self.admin_client.create_metering_label_rule(
- remote_ip_prefix=remote_ip_prefix,
- direction="ingress",
- metering_label_id=self.metering_label['id']))
- metering_label_rule = body['metering_label_rule']
- self.addCleanup(self._delete_metering_label_rule,
- metering_label_rule['id'])
- # Assert whether created rules are found in rules list or fail
- # if created rules are not found in rules list
- rules = (self.admin_client.list_metering_label_rules(
- id=metering_label_rule['id']))
- self.assertEqual(len(rules['metering_label_rules']), 1)
-
- @test.attr(type='smoke')
- @test.idempotent_id('b7354489-96ea-41f3-9452-bace120fb4a7')
- def test_show_metering_label_rule(self):
- # Verifies the details of a rule
- body = (self.admin_client.show_metering_label_rule(
- self.metering_label_rule['id']))
- metering_label_rule = body['metering_label_rule']
- self.assertEqual(self.metering_label_rule['id'],
- metering_label_rule['id'])
- self.assertEqual(self.metering_label_rule['remote_ip_prefix'],
- metering_label_rule['remote_ip_prefix'])
- self.assertEqual(self.metering_label_rule['direction'],
- metering_label_rule['direction'])
- self.assertEqual(self.metering_label_rule['metering_label_id'],
- metering_label_rule['metering_label_id'])
- self.assertFalse(metering_label_rule['excluded'])
-
-
-class MeteringIpV6TestJSON(MeteringTestJSON):
- _ip_version = 6
diff --git a/neutron/tests/tempest/api/network/test_networks.py b/neutron/tests/tempest/api/network/test_networks.py
deleted file mode 100644
index b838ce9..0000000
--- a/neutron/tests/tempest/api/network/test_networks.py
+++ /dev/null
@@ -1,675 +0,0 @@
-# Copyright 2012 OpenStack Foundation
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-import itertools
-
-import netaddr
-from tempest_lib.common.utils import data_utils
-from tempest_lib import exceptions as lib_exc
-
-from neutron.tests.tempest.api.network import base
-from neutron.tests.tempest.common import custom_matchers
-from neutron.tests.tempest import config
-from neutron.tests.tempest import test
-
-CONF = config.CONF
-
-
-class NetworksTestJSON(base.BaseNetworkTest):
-
- """
- Tests the following operations in the Neutron API using the REST client for
- Neutron:
-
- create a network for a tenant
- list tenant's networks
- show a tenant network details
- create a subnet for a tenant
- list tenant's subnets
- show a tenant subnet details
- network update
- subnet update
- delete a network also deletes its subnets
- list external networks
-
- All subnet tests are run once with ipv4 and once with ipv6.
-
- v2.0 of the Neutron API is assumed. It is also assumed that the following
- options are defined in the [network] section of etc/tempest.conf:
-
- tenant_network_cidr with a block of cidr's from which smaller blocks
- can be allocated for tenant ipv4 subnets
-
- tenant_network_v6_cidr is the equivalent for ipv6 subnets
-
- tenant_network_mask_bits with the mask bits to be used to partition the
- block defined by tenant_network_cidr
-
- tenant_network_v6_mask_bits is the equivalent for ipv6 subnets
- """
-
- @classmethod
- def resource_setup(cls):
- super(NetworksTestJSON, cls).resource_setup()
- cls.network = cls.create_network()
- cls.name = cls.network['name']
- cls.subnet = cls._create_subnet_with_last_subnet_block(cls.network,
- cls._ip_version)
- cls.cidr = cls.subnet['cidr']
- cls._subnet_data = {6: {'gateway':
- str(cls._get_gateway_from_tempest_conf(6)),
- 'allocation_pools':
- cls._get_allocation_pools_from_gateway(6),
- 'dns_nameservers': ['2001:4860:4860::8844',
- '2001:4860:4860::8888'],
- 'host_routes': [{'destination': '2001::/64',
- 'nexthop': '2003::1'}],
- 'new_host_routes': [{'destination':
- '2001::/64',
- 'nexthop': '2005::1'}],
- 'new_dns_nameservers':
- ['2001:4860:4860::7744',
- '2001:4860:4860::7888']},
- 4: {'gateway':
- str(cls._get_gateway_from_tempest_conf(4)),
- 'allocation_pools':
- cls._get_allocation_pools_from_gateway(4),
- 'dns_nameservers': ['8.8.4.4', '8.8.8.8'],
- 'host_routes': [{'destination': '10.20.0.0/32',
- 'nexthop': '10.100.1.1'}],
- 'new_host_routes': [{'destination':
- '10.20.0.0/32',
- 'nexthop':
- '10.100.1.2'}],
- 'new_dns_nameservers': ['7.8.8.8', '7.8.4.4']}}
-
- @classmethod
- def _create_subnet_with_last_subnet_block(cls, network, ip_version):
- """Derive last subnet CIDR block from tenant CIDR and
- create the subnet with that derived CIDR
- """
- if ip_version == 4:
- cidr = netaddr.IPNetwork(CONF.network.tenant_network_cidr)
- mask_bits = CONF.network.tenant_network_mask_bits
- elif ip_version == 6:
- cidr = netaddr.IPNetwork(CONF.network.tenant_network_v6_cidr)
- mask_bits = CONF.network.tenant_network_v6_mask_bits
-
- subnet_cidr = list(cidr.subnet(mask_bits))[-1]
- gateway_ip = str(netaddr.IPAddress(subnet_cidr) + 1)
- return cls.create_subnet(network, gateway=gateway_ip,
- cidr=subnet_cidr, mask_bits=mask_bits)
-
- @classmethod
- def _get_gateway_from_tempest_conf(cls, ip_version):
- """Return first subnet gateway for configured CIDR """
- if ip_version == 4:
- cidr = netaddr.IPNetwork(CONF.network.tenant_network_cidr)
- mask_bits = CONF.network.tenant_network_mask_bits
- elif ip_version == 6:
- cidr = netaddr.IPNetwork(CONF.network.tenant_network_v6_cidr)
- mask_bits = CONF.network.tenant_network_v6_mask_bits
-
- if mask_bits >= cidr.prefixlen:
- return netaddr.IPAddress(cidr) + 1
- else:
- for subnet in cidr.subnet(mask_bits):
- return netaddr.IPAddress(subnet) + 1
-
- @classmethod
- def _get_allocation_pools_from_gateway(cls, ip_version):
- """Return allocation range for subnet of given gateway"""
- gateway = cls._get_gateway_from_tempest_conf(ip_version)
- return [{'start': str(gateway + 2), 'end': str(gateway + 3)}]
-
- def subnet_dict(self, include_keys):
- """Return a subnet dict which has include_keys and their corresponding
- value from self._subnet_data
- """
- return dict((key, self._subnet_data[self._ip_version][key])
- for key in include_keys)
-
- def _compare_resource_attrs(self, actual, expected):
- exclude_keys = set(actual).symmetric_difference(expected)
- self.assertThat(actual, custom_matchers.MatchesDictExceptForKeys(
- expected, exclude_keys))
-
- def _delete_network(self, network):
- # Deleting network also deletes its subnets if exists
- self.client.delete_network(network['id'])
- if network in self.networks:
- self.networks.remove(network)
- for subnet in self.subnets:
- if subnet['network_id'] == network['id']:
- self.subnets.remove(subnet)
-
- def _create_verify_delete_subnet(self, cidr=None, mask_bits=None,
- **kwargs):
- network = self.create_network()
- net_id = network['id']
- gateway = kwargs.pop('gateway', None)
- subnet = self.create_subnet(network, gateway, cidr, mask_bits,
- **kwargs)
- compare_args_full = dict(gateway_ip=gateway, cidr=cidr,
- mask_bits=mask_bits, **kwargs)
- compare_args = dict((k, v) for k, v in compare_args_full.iteritems()
- if v is not None)
-
- if 'dns_nameservers' in set(subnet).intersection(compare_args):
- self.assertEqual(sorted(compare_args['dns_nameservers']),
- sorted(subnet['dns_nameservers']))
- del subnet['dns_nameservers'], compare_args['dns_nameservers']
-
- self._compare_resource_attrs(subnet, compare_args)
- self.client.delete_network(net_id)
- self.networks.pop()
- self.subnets.pop()
-
- @test.attr(type='smoke')
- @test.idempotent_id('0e269138-0da6-4efc-a46d-578161e7b221')
- def test_create_update_delete_network_subnet(self):
- # Create a network
- name = data_utils.rand_name('network-')
- network = self.create_network(network_name=name)
- self.addCleanup(self._delete_network, network)
- net_id = network['id']
- self.assertEqual('ACTIVE', network['status'])
- # Verify network update
- new_name = "New_network"
- body = self.client.update_network(net_id, name=new_name)
- updated_net = body['network']
- self.assertEqual(updated_net['name'], new_name)
- # Find a cidr that is not in use yet and create a subnet with it
- subnet = self.create_subnet(network)
- subnet_id = subnet['id']
- # Verify subnet update
- new_name = "New_subnet"
- body = self.client.update_subnet(subnet_id, name=new_name)
- updated_subnet = body['subnet']
- self.assertEqual(updated_subnet['name'], new_name)
-
- @test.attr(type='smoke')
- @test.idempotent_id('2bf13842-c93f-4a69-83ed-717d2ec3b44e')
- def test_show_network(self):
- # Verify the details of a network
- body = self.client.show_network(self.network['id'])
- network = body['network']
- for key in ['id', 'name', 'mtu']:
- self.assertEqual(network[key], self.network[key])
-
- @test.attr(type='smoke')
- @test.idempotent_id('867819bb-c4b6-45f7-acf9-90edcf70aa5e')
- def test_show_network_fields(self):
- # Verify specific fields of a network
- fields = ['id', 'name', 'mtu']
- body = self.client.show_network(self.network['id'],
- fields=fields)
- network = body['network']
- self.assertEqual(sorted(network.keys()), sorted(fields))
- for field_name in fields:
- self.assertEqual(network[field_name], self.network[field_name])
-
- @test.attr(type='smoke')
- @test.idempotent_id('f7ffdeda-e200-4a7a-bcbe-05716e86bf43')
- def test_list_networks(self):
- # Verify the network exists in the list of all networks
- body = self.client.list_networks()
- networks = [network['id'] for network in body['networks']
- if network['id'] == self.network['id']]
- self.assertNotEmpty(networks, "Created network not found in the list")
-
- @test.attr(type='smoke')
- @test.idempotent_id('6ae6d24f-9194-4869-9c85-c313cb20e080')
- def test_list_networks_fields(self):
- # Verify specific fields of the networks
- fields = ['id', 'name', 'mtu']
- body = self.client.list_networks(fields=fields)
- networks = body['networks']
- self.assertNotEmpty(networks, "Network list returned is empty")
- for network in networks:
- self.assertEqual(sorted(network.keys()), sorted(fields))
-
- @test.attr(type='smoke')
- @test.idempotent_id('bd635d81-6030-4dd1-b3b9-31ba0cfdf6cc')
- def test_show_subnet(self):
- # Verify the details of a subnet
- body = self.client.show_subnet(self.subnet['id'])
- subnet = body['subnet']
- self.assertNotEmpty(subnet, "Subnet returned has no fields")
- for key in ['id', 'cidr']:
- self.assertIn(key, subnet)
- self.assertEqual(subnet[key], self.subnet[key])
-
- @test.attr(type='smoke')
- @test.idempotent_id('270fff0b-8bfc-411f-a184-1e8fd35286f0')
- def test_show_subnet_fields(self):
- # Verify specific fields of a subnet
- fields = ['id', 'network_id']
- body = self.client.show_subnet(self.subnet['id'],
- fields=fields)
- subnet = body['subnet']
- self.assertEqual(sorted(subnet.keys()), sorted(fields))
- for field_name in fields:
- self.assertEqual(subnet[field_name], self.subnet[field_name])
-
- @test.attr(type='smoke')
- @test.idempotent_id('db68ba48-f4ea-49e9-81d1-e367f6d0b20a')
- def test_list_subnets(self):
- # Verify the subnet exists in the list of all subnets
- body = self.client.list_subnets()
- subnets = [subnet['id'] for subnet in body['subnets']
- if subnet['id'] == self.subnet['id']]
- self.assertNotEmpty(subnets, "Created subnet not found in the list")
-
- @test.attr(type='smoke')
- @test.idempotent_id('842589e3-9663-46b0-85e4-7f01273b0412')
- def test_list_subnets_fields(self):
- # Verify specific fields of subnets
- fields = ['id', 'network_id']
- body = self.client.list_subnets(fields=fields)
- subnets = body['subnets']
- self.assertNotEmpty(subnets, "Subnet list returned is empty")
- for subnet in subnets:
- self.assertEqual(sorted(subnet.keys()), sorted(fields))
-
- def _try_delete_network(self, net_id):
- # delete network, if it exists
- try:
- self.client.delete_network(net_id)
- # if network is not found, this means it was deleted in the test
- except lib_exc.NotFound:
- pass
-
- @test.attr(type='smoke')
- @test.idempotent_id('f04f61a9-b7f3-4194-90b2-9bcf660d1bfe')
- def test_delete_network_with_subnet(self):
- # Creates a network
- name = data_utils.rand_name('network-')
- body = self.client.create_network(name=name)
- network = body['network']
- net_id = network['id']
- self.addCleanup(self._try_delete_network, net_id)
-
- # Find a cidr that is not in use yet and create a subnet with it
- subnet = self.create_subnet(network)
- subnet_id = subnet['id']
-
- # Delete network while the subnet still exists
- body = self.client.delete_network(net_id)
-
- # Verify that the subnet got automatically deleted.
- self.assertRaises(lib_exc.NotFound, self.client.show_subnet,
- subnet_id)
-
- # Since create_subnet adds the subnet to the delete list, and it is
- # is actually deleted here - this will create and issue, hence remove
- # it from the list.
- self.subnets.pop()
-
- @test.attr(type='smoke')
- @test.idempotent_id('d2d596e2-8e76-47a9-ac51-d4648009f4d3')
- def test_create_delete_subnet_without_gateway(self):
- self._create_verify_delete_subnet()
-
- @test.attr(type='smoke')
- @test.idempotent_id('9393b468-186d-496d-aa36-732348cd76e7')
- def test_create_delete_subnet_with_gw(self):
- self._create_verify_delete_subnet(
- **self.subnet_dict(['gateway']))
-
- @test.attr(type='smoke')
- @test.idempotent_id('bec949c4-3147-4ba6-af5f-cd2306118404')
- def test_create_delete_subnet_with_allocation_pools(self):
- self._create_verify_delete_subnet(
- **self.subnet_dict(['allocation_pools']))
-
- @test.attr(type='smoke')
- @test.idempotent_id('8217a149-0c6c-4cfb-93db-0486f707d13f')
- def test_create_delete_subnet_with_gw_and_allocation_pools(self):
- self._create_verify_delete_subnet(**self.subnet_dict(
- ['gateway', 'allocation_pools']))
-
- @test.attr(type='smoke')
- @test.idempotent_id('d830de0a-be47-468f-8f02-1fd996118289')
- def test_create_delete_subnet_with_host_routes_and_dns_nameservers(self):
- self._create_verify_delete_subnet(
- **self.subnet_dict(['host_routes', 'dns_nameservers']))
-
- @test.attr(type='smoke')
- @test.idempotent_id('94ce038d-ff0a-4a4c-a56b-09da3ca0b55d')
- def test_create_delete_subnet_with_dhcp_enabled(self):
- self._create_verify_delete_subnet(enable_dhcp=True)
-
- @test.attr(type='smoke')
- @test.idempotent_id('3d3852eb-3009-49ec-97ac-5ce83b73010a')
- def test_update_subnet_gw_dns_host_routes_dhcp(self):
- network = self.create_network()
- self.addCleanup(self._delete_network, network)
-
- subnet = self.create_subnet(
- network, **self.subnet_dict(['gateway', 'host_routes',
- 'dns_nameservers',
- 'allocation_pools']))
- subnet_id = subnet['id']
- new_gateway = str(netaddr.IPAddress(
- self._subnet_data[self._ip_version]['gateway']) + 1)
- # Verify subnet update
- new_host_routes = self._subnet_data[self._ip_version][
- 'new_host_routes']
-
- new_dns_nameservers = self._subnet_data[self._ip_version][
- 'new_dns_nameservers']
- kwargs = {'host_routes': new_host_routes,
- 'dns_nameservers': new_dns_nameservers,
- 'gateway_ip': new_gateway, 'enable_dhcp': True}
-
- new_name = "New_subnet"
- body = self.client.update_subnet(subnet_id, name=new_name,
- **kwargs)
- updated_subnet = body['subnet']
- kwargs['name'] = new_name
- self.assertEqual(sorted(updated_subnet['dns_nameservers']),
- sorted(kwargs['dns_nameservers']))
- del subnet['dns_nameservers'], kwargs['dns_nameservers']
-
- self._compare_resource_attrs(updated_subnet, kwargs)
-
- @test.attr(type='smoke')
- @test.idempotent_id('a4d9ec4c-0306-4111-a75c-db01a709030b')
- def test_create_delete_subnet_all_attributes(self):
- self._create_verify_delete_subnet(
- enable_dhcp=True,
- **self.subnet_dict(['gateway', 'host_routes', 'dns_nameservers']))
-
- @test.attr(type='smoke')
- @test.idempotent_id('af774677-42a9-4e4b-bb58-16fe6a5bc1ec')
- def test_external_network_visibility(self):
- """Verifies user can see external networks but not subnets."""
- body = self.client.list_networks(**{'router:external': True})
- networks = [network['id'] for network in body['networks']]
- self.assertNotEmpty(networks, "No external networks found")
-
- nonexternal = [net for net in body['networks'] if
- not net['router:external']]
- self.assertEmpty(nonexternal, "Found non-external networks"
- " in filtered list (%s)." % nonexternal)
- self.assertIn(CONF.network.public_network_id, networks)
-
- subnets_iter = (network['subnets'] for network in body['networks'])
- # subnets_iter is a list (iterator) of lists. This flattens it to a
- # list of UUIDs
- public_subnets_iter = itertools.chain(*subnets_iter)
- body = self.client.list_subnets()
- subnets = [sub['id'] for sub in body['subnets']
- if sub['id'] in public_subnets_iter]
- self.assertEmpty(subnets, "Public subnets visible")
-
-
-class BulkNetworkOpsTestJSON(base.BaseNetworkTest):
-
- """
- Tests the following operations in the Neutron API using the REST client for
- Neutron:
-
- bulk network creation
- bulk subnet creation
- bulk port creation
- list tenant's networks
-
- v2.0 of the Neutron API is assumed. It is also assumed that the following
- options are defined in the [network] section of etc/tempest.conf:
-
- tenant_network_cidr with a block of cidr's from which smaller blocks
- can be allocated for tenant networks
-
- tenant_network_mask_bits with the mask bits to be used to partition the
- block defined by tenant-network_cidr
- """
-
- def _delete_networks(self, created_networks):
- for n in created_networks:
- self.client.delete_network(n['id'])
- # Asserting that the networks are not found in the list after deletion
- body = self.client.list_networks()
- networks_list = [network['id'] for network in body['networks']]
- for n in created_networks:
- self.assertNotIn(n['id'], networks_list)
-
- def _delete_subnets(self, created_subnets):
- for n in created_subnets:
- self.client.delete_subnet(n['id'])
- # Asserting that the subnets are not found in the list after deletion
- body = self.client.list_subnets()
- subnets_list = [subnet['id'] for subnet in body['subnets']]
- for n in created_subnets:
- self.assertNotIn(n['id'], subnets_list)
-
- def _delete_ports(self, created_ports):
- for n in created_ports:
- self.client.delete_port(n['id'])
- # Asserting that the ports are not found in the list after deletion
- body = self.client.list_ports()
- ports_list = [port['id'] for port in body['ports']]
- for n in created_ports:
- self.assertNotIn(n['id'], ports_list)
-
- @test.attr(type='smoke')
- @test.idempotent_id('d4f9024d-1e28-4fc1-a6b1-25dbc6fa11e2')
- def test_bulk_create_delete_network(self):
- # Creates 2 networks in one request
- network_names = [data_utils.rand_name('network-'),
- data_utils.rand_name('network-')]
- body = self.client.create_bulk_network(network_names)
- created_networks = body['networks']
- self.addCleanup(self._delete_networks, created_networks)
- # Asserting that the networks are found in the list after creation
- body = self.client.list_networks()
- networks_list = [network['id'] for network in body['networks']]
- for n in created_networks:
- self.assertIsNotNone(n['id'])
- self.assertIn(n['id'], networks_list)
-
- @test.attr(type='smoke')
- @test.idempotent_id('8936533b-c0aa-4f29-8e53-6cc873aec489')
- def test_bulk_create_delete_subnet(self):
- networks = [self.create_network(), self.create_network()]
- # Creates 2 subnets in one request
- if self._ip_version == 4:
- cidr = netaddr.IPNetwork(CONF.network.tenant_network_cidr)
- mask_bits = CONF.network.tenant_network_mask_bits
- else:
- cidr = netaddr.IPNetwork(CONF.network.tenant_network_v6_cidr)
- mask_bits = CONF.network.tenant_network_v6_mask_bits
-
- cidrs = [subnet_cidr for subnet_cidr in cidr.subnet(mask_bits)]
-
- names = [data_utils.rand_name('subnet-') for i in range(len(networks))]
- subnets_list = []
- for i in range(len(names)):
- p1 = {
- 'network_id': networks[i]['id'],
- 'cidr': str(cidrs[(i)]),
- 'name': names[i],
- 'ip_version': self._ip_version
- }
- subnets_list.append(p1)
- del subnets_list[1]['name']
- body = self.client.create_bulk_subnet(subnets_list)
- created_subnets = body['subnets']
- self.addCleanup(self._delete_subnets, created_subnets)
- # Asserting that the subnets are found in the list after creation
- body = self.client.list_subnets()
- subnets_list = [subnet['id'] for subnet in body['subnets']]
- for n in created_subnets:
- self.assertIsNotNone(n['id'])
- self.assertIn(n['id'], subnets_list)
-
- @test.attr(type='smoke')
- @test.idempotent_id('48037ff2-e889-4c3b-b86a-8e3f34d2d060')
- def test_bulk_create_delete_port(self):
- networks = [self.create_network(), self.create_network()]
- # Creates 2 ports in one request
- names = [data_utils.rand_name('port-') for i in range(len(networks))]
- port_list = []
- state = [True, False]
- for i in range(len(names)):
- p1 = {
- 'network_id': networks[i]['id'],
- 'name': names[i],
- 'admin_state_up': state[i],
- }
- port_list.append(p1)
- del port_list[1]['name']
- body = self.client.create_bulk_port(port_list)
- created_ports = body['ports']
- self.addCleanup(self._delete_ports, created_ports)
- # Asserting that the ports are found in the list after creation
- body = self.client.list_ports()
- ports_list = [port['id'] for port in body['ports']]
- for n in created_ports:
- self.assertIsNotNone(n['id'])
- self.assertIn(n['id'], ports_list)
-
-
-class BulkNetworkOpsIpV6TestJSON(BulkNetworkOpsTestJSON):
- _ip_version = 6
-
-
-class NetworksIpV6TestJSON(NetworksTestJSON):
- _ip_version = 6
-
- @test.attr(type='smoke')
- @test.idempotent_id('e41a4888-65a6-418c-a095-f7c2ef4ad59a')
- def test_create_delete_subnet_with_gw(self):
- net = netaddr.IPNetwork(CONF.network.tenant_network_v6_cidr)
- gateway = str(netaddr.IPAddress(net.first + 2))
- name = data_utils.rand_name('network-')
- network = self.create_network(network_name=name)
- subnet = self.create_subnet(network, gateway)
- # Verifies Subnet GW in IPv6
- self.assertEqual(subnet['gateway_ip'], gateway)
-
- @test.attr(type='smoke')
- @test.idempotent_id('ebb4fd95-524f-46af-83c1-0305b239338f')
- def test_create_delete_subnet_with_default_gw(self):
- net = netaddr.IPNetwork(CONF.network.tenant_network_v6_cidr)
- gateway_ip = str(netaddr.IPAddress(net.first + 1))
- name = data_utils.rand_name('network-')
- network = self.create_network(network_name=name)
- subnet = self.create_subnet(network)
- # Verifies Subnet GW in IPv6
- self.assertEqual(subnet['gateway_ip'], gateway_ip)
-
- @test.attr(type='smoke')
- @test.idempotent_id('a9653883-b2a4-469b-8c3c-4518430a7e55')
- def test_create_list_subnet_with_no_gw64_one_network(self):
- name = data_utils.rand_name('network-')
- network = self.create_network(name)
- ipv6_gateway = self.subnet_dict(['gateway'])['gateway']
- subnet1 = self.create_subnet(network,
- ip_version=6,
- gateway=ipv6_gateway)
- self.assertEqual(netaddr.IPNetwork(subnet1['cidr']).version, 6,
- 'The created subnet is not IPv6')
- subnet2 = self.create_subnet(network,
- gateway=None,
- ip_version=4)
- self.assertEqual(netaddr.IPNetwork(subnet2['cidr']).version, 4,
- 'The created subnet is not IPv4')
- # Verifies Subnet GW is set in IPv6
- self.assertEqual(subnet1['gateway_ip'], ipv6_gateway)
- # Verifies Subnet GW is None in IPv4
- self.assertEqual(subnet2['gateway_ip'], None)
- # Verifies all 2 subnets in the same network
- body = self.client.list_subnets()
- subnets = [sub['id'] for sub in body['subnets']
- if sub['network_id'] == network['id']]
- test_subnet_ids = [sub['id'] for sub in (subnet1, subnet2)]
- self.assertItemsEqual(subnets,
- test_subnet_ids,
- 'Subnet are not in the same network')
-
-
-class NetworksIpV6TestAttrs(NetworksIpV6TestJSON):
-
- @classmethod
- def resource_setup(cls):
- if not CONF.network_feature_enabled.ipv6_subnet_attributes:
- raise cls.skipException("IPv6 extended attributes for "
- "subnets not available")
- super(NetworksIpV6TestAttrs, cls).resource_setup()
-
- @test.attr(type='smoke')
- @test.idempotent_id('da40cd1b-a833-4354-9a85-cd9b8a3b74ca')
- def test_create_delete_subnet_with_v6_attributes_stateful(self):
- self._create_verify_delete_subnet(
- gateway=self._subnet_data[self._ip_version]['gateway'],
- ipv6_ra_mode='dhcpv6-stateful',
- ipv6_address_mode='dhcpv6-stateful')
-
- @test.attr(type='smoke')
- @test.idempotent_id('176b030f-a923-4040-a755-9dc94329e60c')
- def test_create_delete_subnet_with_v6_attributes_slaac(self):
- self._create_verify_delete_subnet(
- ipv6_ra_mode='slaac',
- ipv6_address_mode='slaac')
-
- @test.attr(type='smoke')
- @test.idempotent_id('7d410310-8c86-4902-adf9-865d08e31adb')
- def test_create_delete_subnet_with_v6_attributes_stateless(self):
- self._create_verify_delete_subnet(
- ipv6_ra_mode='dhcpv6-stateless',
- ipv6_address_mode='dhcpv6-stateless')
-
- def _test_delete_subnet_with_ports(self, mode):
- """Create subnet and delete it with existing ports"""
- slaac_network = self.create_network()
- subnet_slaac = self.create_subnet(slaac_network,
- **{'ipv6_ra_mode': mode,
- 'ipv6_address_mode': mode})
- port = self.create_port(slaac_network)
- self.assertIsNotNone(port['fixed_ips'][0]['ip_address'])
- self.client.delete_subnet(subnet_slaac['id'])
- self.subnets.pop()
- subnets = self.client.list_subnets()
- subnet_ids = [subnet['id'] for subnet in subnets['subnets']]
- self.assertNotIn(subnet_slaac['id'], subnet_ids,
- "Subnet wasn't deleted")
- self.assertRaisesRegexp(
- lib_exc.Conflict,
- "There are one or more ports still in use on the network",
- self.client.delete_network,
- slaac_network['id'])
-
- @test.attr(type='smoke')
- @test.idempotent_id('88554555-ebf8-41ef-9300-4926d45e06e9')
- def test_create_delete_slaac_subnet_with_ports(self):
- """Test deleting subnet with SLAAC ports
-
- Create subnet with SLAAC, create ports in network
- and then you shall be able to delete subnet without port
- deletion. But you still can not delete the network.
- """
- self._test_delete_subnet_with_ports("slaac")
-
- @test.attr(type='smoke')
- @test.idempotent_id('2de6ab5a-fcf0-4144-9813-f91a940291f1')
- def test_create_delete_stateless_subnet_with_ports(self):
- """Test deleting subnet with DHCPv6 stateless ports
-
- Create subnet with DHCPv6 stateless, create ports in network
- and then you shall be able to delete subnet without port
- deletion. But you still can not delete the network.
- """
- self._test_delete_subnet_with_ports("dhcpv6-stateless")
diff --git a/neutron/tests/tempest/api/network/test_networks_negative.py b/neutron/tests/tempest/api/network/test_networks_negative.py
deleted file mode 100644
index 7fada6e..0000000
--- a/neutron/tests/tempest/api/network/test_networks_negative.py
+++ /dev/null
@@ -1,59 +0,0 @@
-# Copyright 2013 Huawei Technologies Co.,LTD.
-# Copyright 2012 OpenStack Foundation
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from tempest_lib.common.utils import data_utils
-from tempest_lib import exceptions as lib_exc
-
-from neutron.tests.tempest.api.network import base
-from neutron.tests.tempest import test
-
-
-class NetworksNegativeTestJSON(base.BaseNetworkTest):
-
- @test.attr(type=['negative', 'smoke'])
- @test.idempotent_id('9293e937-824d-42d2-8d5b-e985ea67002a')
- def test_show_non_existent_network(self):
- non_exist_id = data_utils.rand_name('network')
- self.assertRaises(lib_exc.NotFound, self.client.show_network,
- non_exist_id)
-
- @test.attr(type=['negative', 'smoke'])
- @test.idempotent_id('d746b40c-5e09-4043-99f7-cba1be8b70df')
- def test_show_non_existent_subnet(self):
- non_exist_id = data_utils.rand_name('subnet')
- self.assertRaises(lib_exc.NotFound, self.client.show_subnet,
- non_exist_id)
-
- @test.attr(type=['negative', 'smoke'])
- @test.idempotent_id('a954861d-cbfd-44e8-b0a9-7fab111f235d')
- def test_show_non_existent_port(self):
- non_exist_id = data_utils.rand_name('port')
- self.assertRaises(lib_exc.NotFound, self.client.show_port,
- non_exist_id)
-
- @test.attr(type=['negative', 'smoke'])
- @test.idempotent_id('98bfe4e3-574e-4012-8b17-b2647063de87')
- def test_update_non_existent_network(self):
- non_exist_id = data_utils.rand_name('network')
- self.assertRaises(lib_exc.NotFound, self.client.update_network,
- non_exist_id, name="new_name")
-
- @test.attr(type=['negative', 'smoke'])
- @test.idempotent_id('03795047-4a94-4120-a0a1-bd376e36fd4e')
- def test_delete_non_existent_network(self):
- non_exist_id = data_utils.rand_name('network')
- self.assertRaises(lib_exc.NotFound, self.client.delete_network,
- non_exist_id)
diff --git a/neutron/tests/tempest/api/network/test_ports.py b/neutron/tests/tempest/api/network/test_ports.py
deleted file mode 100644
index 044f109..0000000
--- a/neutron/tests/tempest/api/network/test_ports.py
+++ /dev/null
@@ -1,403 +0,0 @@
-# Copyright 2014 OpenStack Foundation
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import socket
-
-import netaddr
-from tempest_lib.common.utils import data_utils
-
-from neutron.tests.tempest.api.network import base
-from neutron.tests.tempest.api.network import base_security_groups as sec_base
-from neutron.tests.tempest.common import custom_matchers
-from neutron.tests.tempest import config
-from neutron.tests.tempest import test
-
-CONF = config.CONF
-
-
-class PortsTestJSON(sec_base.BaseSecGroupTest):
-
- """
- Test the following operations for ports:
-
- port create
- port delete
- port list
- port show
- port update
- """
-
- @classmethod
- def resource_setup(cls):
- super(PortsTestJSON, cls).resource_setup()
- cls.network = cls.create_network()
- cls.port = cls.create_port(cls.network)
-
- def _delete_port(self, port_id):
- self.client.delete_port(port_id)
- body = self.client.list_ports()
- ports_list = body['ports']
- self.assertFalse(port_id in [n['id'] for n in ports_list])
-
- @test.attr(type='smoke')
- @test.idempotent_id('c72c1c0c-2193-4aca-aaa4-b1442640f51c')
- def test_create_update_delete_port(self):
- # Verify port creation
- body = self.client.create_port(network_id=self.network['id'])
- port = body['port']
- # Schedule port deletion with verification upon test completion
- self.addCleanup(self._delete_port, port['id'])
- self.assertTrue(port['admin_state_up'])
- # Verify port update
- new_name = "New_Port"
- body = self.client.update_port(port['id'],
- name=new_name,
- admin_state_up=False)
- updated_port = body['port']
- self.assertEqual(updated_port['name'], new_name)
- self.assertFalse(updated_port['admin_state_up'])
-
- @test.idempotent_id('67f1b811-f8db-43e2-86bd-72c074d4a42c')
- def test_create_bulk_port(self):
- network1 = self.network
- name = data_utils.rand_name('network-')
- network2 = self.create_network(network_name=name)
- network_list = [network1['id'], network2['id']]
- port_list = [{'network_id': net_id} for net_id in network_list]
- body = self.client.create_bulk_port(port_list)
- created_ports = body['ports']
- port1 = created_ports[0]
- port2 = created_ports[1]
- self.addCleanup(self._delete_port, port1['id'])
- self.addCleanup(self._delete_port, port2['id'])
- self.assertEqual(port1['network_id'], network1['id'])
- self.assertEqual(port2['network_id'], network2['id'])
- self.assertTrue(port1['admin_state_up'])
- self.assertTrue(port2['admin_state_up'])
-
- @classmethod
- def _get_ipaddress_from_tempest_conf(cls):
- """Return first subnet gateway for configured CIDR """
- if cls._ip_version == 4:
- cidr = netaddr.IPNetwork(CONF.network.tenant_network_cidr)
-
- elif cls._ip_version == 6:
- cidr = netaddr.IPNetwork(CONF.network.tenant_network_v6_cidr)
-
- return netaddr.IPAddress(cidr)
-
- @test.attr(type='smoke')
- @test.idempotent_id('0435f278-40ae-48cb-a404-b8a087bc09b1')
- def test_create_port_in_allowed_allocation_pools(self):
- network = self.create_network()
- net_id = network['id']
- address = self._get_ipaddress_from_tempest_conf()
- allocation_pools = {'allocation_pools': [{'start': str(address + 4),
- 'end': str(address + 6)}]}
- subnet = self.create_subnet(network, **allocation_pools)
- self.addCleanup(self.client.delete_subnet, subnet['id'])
- body = self.client.create_port(network_id=net_id)
- self.addCleanup(self.client.delete_port, body['port']['id'])
- port = body['port']
- ip_address = port['fixed_ips'][0]['ip_address']
- start_ip_address = allocation_pools['allocation_pools'][0]['start']
- end_ip_address = allocation_pools['allocation_pools'][0]['end']
- ip_range = netaddr.IPRange(start_ip_address, end_ip_address)
- self.assertIn(ip_address, ip_range)
-
- @test.attr(type='smoke')
- @test.idempotent_id('c9a685bd-e83f-499c-939f-9f7863ca259f')
- def test_show_port(self):
- # Verify the details of port
- body = self.client.show_port(self.port['id'])
- port = body['port']
- self.assertIn('id', port)
- # TODO(Santosh)- This is a temporary workaround to compare create_port
- # and show_port dict elements.Remove this once extra_dhcp_opts issue
- # gets fixed in neutron.( bug - 1365341.)
- self.assertThat(self.port,
- custom_matchers.MatchesDictExceptForKeys
- (port, excluded_keys=['extra_dhcp_opts']))
-
- @test.attr(type='smoke')
- @test.idempotent_id('45fcdaf2-dab0-4c13-ac6c-fcddfb579dbd')
- def test_show_port_fields(self):
- # Verify specific fields of a port
- fields = ['id', 'mac_address']
- body = self.client.show_port(self.port['id'],
- fields=fields)
- port = body['port']
- self.assertEqual(sorted(port.keys()), sorted(fields))
- for field_name in fields:
- self.assertEqual(port[field_name], self.port[field_name])
-
- @test.attr(type='smoke')
- @test.idempotent_id('cf95b358-3e92-4a29-a148-52445e1ac50e')
- def test_list_ports(self):
- # Verify the port exists in the list of all ports
- body = self.client.list_ports()
- ports = [port['id'] for port in body['ports']
- if port['id'] == self.port['id']]
- self.assertNotEmpty(ports, "Created port not found in the list")
-
- @test.attr(type='smoke')
- @test.idempotent_id('5ad01ed0-0e6e-4c5d-8194-232801b15c72')
- def test_port_list_filter_by_router_id(self):
- # Create a router
- network = self.create_network()
- self.addCleanup(self.client.delete_network, network['id'])
- subnet = self.create_subnet(network)
- self.addCleanup(self.client.delete_subnet, subnet['id'])
- router = self.create_router(data_utils.rand_name('router-'))
- self.addCleanup(self.client.delete_router, router['id'])
- port = self.client.create_port(network_id=network['id'])
- # Add router interface to port created above
- self.client.add_router_interface_with_port_id(
- router['id'], port['port']['id'])
- self.addCleanup(self.client.remove_router_interface_with_port_id,
- router['id'], port['port']['id'])
- # List ports filtered by router_id
- port_list = self.client.list_ports(device_id=router['id'])
- ports = port_list['ports']
- self.assertEqual(len(ports), 1)
- self.assertEqual(ports[0]['id'], port['port']['id'])
- self.assertEqual(ports[0]['device_id'], router['id'])
-
- @test.attr(type='smoke')
- @test.idempotent_id('ff7f117f-f034-4e0e-abff-ccef05c454b4')
- def test_list_ports_fields(self):
- # Verify specific fields of ports
- fields = ['id', 'mac_address']
- body = self.client.list_ports(fields=fields)
- ports = body['ports']
- self.assertNotEmpty(ports, "Port list returned is empty")
- # Asserting the fields returned are correct
- for port in ports:
- self.assertEqual(sorted(fields), sorted(port.keys()))
-
- @test.attr(type='smoke')
- @test.idempotent_id('63aeadd4-3b49-427f-a3b1-19ca81f06270')
- def test_create_update_port_with_second_ip(self):
- # Create a network with two subnets
- network = self.create_network()
- self.addCleanup(self.client.delete_network, network['id'])
- subnet_1 = self.create_subnet(network)
- self.addCleanup(self.client.delete_subnet, subnet_1['id'])
- subnet_2 = self.create_subnet(network)
- self.addCleanup(self.client.delete_subnet, subnet_2['id'])
- fixed_ip_1 = [{'subnet_id': subnet_1['id']}]
- fixed_ip_2 = [{'subnet_id': subnet_2['id']}]
-
- fixed_ips = fixed_ip_1 + fixed_ip_2
-
- # Create a port with multiple IP addresses
- port = self.create_port(network,
- fixed_ips=fixed_ips)
- self.addCleanup(self.client.delete_port, port['id'])
- self.assertEqual(2, len(port['fixed_ips']))
- check_fixed_ips = [subnet_1['id'], subnet_2['id']]
- for item in port['fixed_ips']:
- self.assertIn(item['subnet_id'], check_fixed_ips)
-
- # Update the port to return to a single IP address
- port = self.update_port(port, fixed_ips=fixed_ip_1)
- self.assertEqual(1, len(port['fixed_ips']))
-
- # Update the port with a second IP address from second subnet
- port = self.update_port(port, fixed_ips=fixed_ips)
- self.assertEqual(2, len(port['fixed_ips']))
-
- def _update_port_with_security_groups(self, security_groups_names):
- subnet_1 = self.create_subnet(self.network)
- self.addCleanup(self.client.delete_subnet, subnet_1['id'])
- fixed_ip_1 = [{'subnet_id': subnet_1['id']}]
-
- security_groups_list = list()
- for name in security_groups_names:
- group_create_body = self.client.create_security_group(
- name=name)
- self.addCleanup(self.client.delete_security_group,
- group_create_body['security_group']['id'])
- security_groups_list.append(group_create_body['security_group']
- ['id'])
- # Create a port
- sec_grp_name = data_utils.rand_name('secgroup')
- security_group = self.client.create_security_group(name=sec_grp_name)
- self.addCleanup(self.client.delete_security_group,
- security_group['security_group']['id'])
- post_body = {
- "name": data_utils.rand_name('port-'),
- "security_groups": [security_group['security_group']['id']],
- "network_id": self.network['id'],
- "admin_state_up": True,
- "fixed_ips": fixed_ip_1}
- body = self.client.create_port(**post_body)
- self.addCleanup(self.client.delete_port, body['port']['id'])
- port = body['port']
-
- # Update the port with security groups
- subnet_2 = self.create_subnet(self.network)
- fixed_ip_2 = [{'subnet_id': subnet_2['id']}]
- update_body = {"name": data_utils.rand_name('port-'),
- "admin_state_up": False,
- "fixed_ips": fixed_ip_2,
- "security_groups": security_groups_list}
- body = self.client.update_port(port['id'], **update_body)
- port_show = body['port']
- # Verify the security groups and other attributes updated to port
- exclude_keys = set(port_show).symmetric_difference(update_body)
- exclude_keys.add('fixed_ips')
- exclude_keys.add('security_groups')
- self.assertThat(port_show, custom_matchers.MatchesDictExceptForKeys(
- update_body, exclude_keys))
- self.assertEqual(fixed_ip_2[0]['subnet_id'],
- port_show['fixed_ips'][0]['subnet_id'])
-
- for security_group in security_groups_list:
- self.assertIn(security_group, port_show['security_groups'])
-
- @test.attr(type='smoke')
- @test.idempotent_id('58091b66-4ff4-4cc1-a549-05d60c7acd1a')
- def test_update_port_with_security_group_and_extra_attributes(self):
- self._update_port_with_security_groups(
- [data_utils.rand_name('secgroup')])
-
- @test.attr(type='smoke')
- @test.idempotent_id('edf6766d-3d40-4621-bc6e-2521a44c257d')
- def test_update_port_with_two_security_groups_and_extra_attributes(self):
- self._update_port_with_security_groups(
- [data_utils.rand_name('secgroup'),
- data_utils.rand_name('secgroup')])
-
- @test.attr(type='smoke')
- @test.idempotent_id('13e95171-6cbd-489c-9d7c-3f9c58215c18')
- def test_create_show_delete_port_user_defined_mac(self):
- # Create a port for a legal mac
- body = self.client.create_port(network_id=self.network['id'])
- old_port = body['port']
- free_mac_address = old_port['mac_address']
- self.client.delete_port(old_port['id'])
- # Create a new port with user defined mac
- body = self.client.create_port(network_id=self.network['id'],
- mac_address=free_mac_address)
- self.addCleanup(self.client.delete_port, body['port']['id'])
- port = body['port']
- body = self.client.show_port(port['id'])
- show_port = body['port']
- self.assertEqual(free_mac_address,
- show_port['mac_address'])
-
- @test.attr(type='smoke')
- @test.idempotent_id('4179dcb9-1382-4ced-84fe-1b91c54f5735')
- def test_create_port_with_no_securitygroups(self):
- network = self.create_network()
- self.addCleanup(self.client.delete_network, network['id'])
- subnet = self.create_subnet(network)
- self.addCleanup(self.client.delete_subnet, subnet['id'])
- port = self.create_port(network, security_groups=[])
- self.addCleanup(self.client.delete_port, port['id'])
- self.assertIsNotNone(port['security_groups'])
- self.assertEmpty(port['security_groups'])
-
-
-class PortsAdminExtendedAttrsTestJSON(base.BaseAdminNetworkTest):
-
- @classmethod
- def resource_setup(cls):
- super(PortsAdminExtendedAttrsTestJSON, cls).resource_setup()
- cls.identity_client = cls._get_identity_admin_client()
- cls.tenant = cls.identity_client.get_tenant_by_name(
- CONF.identity.tenant_name)
- cls.network = cls.create_network()
- cls.host_id = socket.gethostname()
-
- @test.attr(type='smoke')
- @test.idempotent_id('8e8569c1-9ac7-44db-8bc1-f5fb2814f29b')
- def test_create_port_binding_ext_attr(self):
- post_body = {"network_id": self.network['id'],
- "binding:host_id": self.host_id}
- body = self.admin_client.create_port(**post_body)
- port = body['port']
- self.addCleanup(self.admin_client.delete_port, port['id'])
- host_id = port['binding:host_id']
- self.assertIsNotNone(host_id)
- self.assertEqual(self.host_id, host_id)
-
- @test.attr(type='smoke')
- @test.idempotent_id('6f6c412c-711f-444d-8502-0ac30fbf5dd5')
- def test_update_port_binding_ext_attr(self):
- post_body = {"network_id": self.network['id']}
- body = self.admin_client.create_port(**post_body)
- port = body['port']
- self.addCleanup(self.admin_client.delete_port, port['id'])
- update_body = {"binding:host_id": self.host_id}
- body = self.admin_client.update_port(port['id'], **update_body)
- updated_port = body['port']
- host_id = updated_port['binding:host_id']
- self.assertIsNotNone(host_id)
- self.assertEqual(self.host_id, host_id)
-
- @test.attr(type='smoke')
- @test.idempotent_id('1c82a44a-6c6e-48ff-89e1-abe7eaf8f9f8')
- def test_list_ports_binding_ext_attr(self):
- # Create a new port
- post_body = {"network_id": self.network['id']}
- body = self.admin_client.create_port(**post_body)
- port = body['port']
- self.addCleanup(self.admin_client.delete_port, port['id'])
-
- # Update the port's binding attributes so that is now 'bound'
- # to a host
- update_body = {"binding:host_id": self.host_id}
- self.admin_client.update_port(port['id'], **update_body)
-
- # List all ports, ensure new port is part of list and its binding
- # attributes are set and accurate
- body = self.admin_client.list_ports()
- ports_list = body['ports']
- pids_list = [p['id'] for p in ports_list]
- self.assertIn(port['id'], pids_list)
- listed_port = [p for p in ports_list if p['id'] == port['id']]
- self.assertEqual(1, len(listed_port),
- 'Multiple ports listed with id %s in ports listing: '
- '%s' % (port['id'], ports_list))
- self.assertEqual(self.host_id, listed_port[0]['binding:host_id'])
-
- @test.attr(type='smoke')
- @test.idempotent_id('b54ac0ff-35fc-4c79-9ca3-c7dbd4ea4f13')
- def test_show_port_binding_ext_attr(self):
- body = self.admin_client.create_port(network_id=self.network['id'])
- port = body['port']
- self.addCleanup(self.admin_client.delete_port, port['id'])
- body = self.admin_client.show_port(port['id'])
- show_port = body['port']
- self.assertEqual(port['binding:host_id'],
- show_port['binding:host_id'])
- self.assertEqual(port['binding:vif_type'],
- show_port['binding:vif_type'])
- self.assertEqual(port['binding:vif_details'],
- show_port['binding:vif_details'])
-
-
-class PortsIpV6TestJSON(PortsTestJSON):
- _ip_version = 6
- _tenant_network_cidr = CONF.network.tenant_network_v6_cidr
- _tenant_network_mask_bits = CONF.network.tenant_network_v6_mask_bits
-
-
-class PortsAdminExtendedAttrsIpV6TestJSON(PortsAdminExtendedAttrsTestJSON):
- _ip_version = 6
- _tenant_network_cidr = CONF.network.tenant_network_v6_cidr
- _tenant_network_mask_bits = CONF.network.tenant_network_v6_mask_bits
diff --git a/neutron/tests/tempest/api/network/test_routers.py b/neutron/tests/tempest/api/network/test_routers.py
deleted file mode 100644
index dcaba22..0000000
--- a/neutron/tests/tempest/api/network/test_routers.py
+++ /dev/null
@@ -1,359 +0,0 @@
-# Copyright 2013 OpenStack Foundation
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import netaddr
-from tempest_lib.common.utils import data_utils
-
-from neutron.tests.tempest.api.network import base_routers as base
-from neutron.tests.api.contrib import clients
-from neutron.tests.tempest import config
-from neutron.tests.tempest import test
-
-CONF = config.CONF
-
-
-class RoutersTest(base.BaseRouterTest):
-
- @classmethod
- def resource_setup(cls):
- super(RoutersTest, cls).resource_setup()
- if not test.is_extension_enabled('router', 'network'):
- msg = "router extension not enabled."
- raise cls.skipException(msg)
- admin_manager = clients.AdminManager()
- cls.identity_admin_client = admin_manager.identity_client
- cls.tenant_cidr = (CONF.network.tenant_network_cidr
- if cls._ip_version == 4 else
- CONF.network.tenant_network_v6_cidr)
-
- def _cleanup_router(self, router):
- self.delete_router(router)
- self.routers.remove(router)
-
- def _create_router(self, name, admin_state_up=False,
- external_network_id=None, enable_snat=None):
- # associate a cleanup with created routers to avoid quota limits
- router = self.create_router(name, admin_state_up,
- external_network_id, enable_snat)
- self.addCleanup(self._cleanup_router, router)
- return router
-
- @test.attr(type='smoke')
- @test.idempotent_id('f64403e2-8483-4b34-8ccd-b09a87bcc68c')
- def test_create_show_list_update_delete_router(self):
- # Create a router
- # NOTE(salv-orlando): Do not invoke self.create_router
- # as we need to check the response code
- name = data_utils.rand_name('router-')
- create_body = self.client.create_router(
- name, external_gateway_info={
- "network_id": CONF.network.public_network_id},
- admin_state_up=False)
- self.addCleanup(self._delete_router, create_body['router']['id'])
- self.assertEqual(create_body['router']['name'], name)
- self.assertEqual(
- create_body['router']['external_gateway_info']['network_id'],
- CONF.network.public_network_id)
- self.assertEqual(create_body['router']['admin_state_up'], False)
- # Show details of the created router
- show_body = self.client.show_router(create_body['router']['id'])
- self.assertEqual(show_body['router']['name'], name)
- self.assertEqual(
- show_body['router']['external_gateway_info']['network_id'],
- CONF.network.public_network_id)
- self.assertEqual(show_body['router']['admin_state_up'], False)
- # List routers and verify if created router is there in response
- list_body = self.client.list_routers()
- routers_list = list()
- for router in list_body['routers']:
- routers_list.append(router['id'])
- self.assertIn(create_body['router']['id'], routers_list)
- # Update the name of router and verify if it is updated
- updated_name = 'updated ' + name
- update_body = self.client.update_router(create_body['router']['id'],
- name=updated_name)
- self.assertEqual(update_body['router']['name'], updated_name)
- show_body = self.client.show_router(
- create_body['router']['id'])
- self.assertEqual(show_body['router']['name'], updated_name)
-
- @test.attr(type='smoke')
- @test.idempotent_id('e54dd3a3-4352-4921-b09d-44369ae17397')
- def test_create_router_setting_tenant_id(self):
- # Test creating router from admin user setting tenant_id.
- test_tenant = data_utils.rand_name('test_tenant_')
- test_description = data_utils.rand_name('desc_')
- tenant = self.identity_admin_client.create_tenant(
- name=test_tenant, description=test_description)
- tenant_id = tenant['id']
- self.addCleanup(self.identity_admin_client.delete_tenant, tenant_id)
-
- name = data_utils.rand_name('router-')
- create_body = self.admin_client.create_router(name,
- tenant_id=tenant_id)
- self.addCleanup(self.admin_client.delete_router,
- create_body['router']['id'])
- self.assertEqual(tenant_id, create_body['router']['tenant_id'])
-
- @test.idempotent_id('847257cc-6afd-4154-b8fb-af49f5670ce8')
- @test.requires_ext(extension='ext-gw-mode', service='network')
- @test.attr(type='smoke')
- def test_create_router_with_default_snat_value(self):
- # Create a router with default snat rule
- name = data_utils.rand_name('router')
- router = self._create_router(
- name, external_network_id=CONF.network.public_network_id)
- self._verify_router_gateway(
- router['id'], {'network_id': CONF.network.public_network_id,
- 'enable_snat': True})
-
- @test.idempotent_id('ea74068d-09e9-4fd7-8995-9b6a1ace920f')
- @test.requires_ext(extension='ext-gw-mode', service='network')
- @test.attr(type='smoke')
- def test_create_router_with_snat_explicit(self):
- name = data_utils.rand_name('snat-router')
- # Create a router enabling snat attributes
- enable_snat_states = [False, True]
- for enable_snat in enable_snat_states:
- external_gateway_info = {
- 'network_id': CONF.network.public_network_id,
- 'enable_snat': enable_snat}
- create_body = self.admin_client.create_router(
- name, external_gateway_info=external_gateway_info)
- self.addCleanup(self.admin_client.delete_router,
- create_body['router']['id'])
- # Verify snat attributes after router creation
- self._verify_router_gateway(create_body['router']['id'],
- exp_ext_gw_info=external_gateway_info)
-
- @test.attr(type='smoke')
- @test.idempotent_id('b42e6e39-2e37-49cc-a6f4-8467e940900a')
- def test_add_remove_router_interface_with_subnet_id(self):
- network = self.create_network()
- subnet = self.create_subnet(network)
- router = self._create_router(data_utils.rand_name('router-'))
- # Add router interface with subnet id
- interface = self.client.add_router_interface_with_subnet_id(
- router['id'], subnet['id'])
- self.addCleanup(self._remove_router_interface_with_subnet_id,
- router['id'], subnet['id'])
- self.assertIn('subnet_id', interface.keys())
- self.assertIn('port_id', interface.keys())
- # Verify router id is equal to device id in port details
- show_port_body = self.client.show_port(
- interface['port_id'])
- self.assertEqual(show_port_body['port']['device_id'],
- router['id'])
-
- @test.attr(type='smoke')
- @test.idempotent_id('2b7d2f37-6748-4d78-92e5-1d590234f0d5')
- def test_add_remove_router_interface_with_port_id(self):
- network = self.create_network()
- self.create_subnet(network)
- router = self._create_router(data_utils.rand_name('router-'))
- port_body = self.client.create_port(
- network_id=network['id'])
- # add router interface to port created above
- interface = self.client.add_router_interface_with_port_id(
- router['id'], port_body['port']['id'])
- self.addCleanup(self._remove_router_interface_with_port_id,
- router['id'], port_body['port']['id'])
- self.assertIn('subnet_id', interface.keys())
- self.assertIn('port_id', interface.keys())
- # Verify router id is equal to device id in port details
- show_port_body = self.client.show_port(
- interface['port_id'])
- self.assertEqual(show_port_body['port']['device_id'],
- router['id'])
-
- def _verify_router_gateway(self, router_id, exp_ext_gw_info=None):
- show_body = self.admin_client.show_router(router_id)
- actual_ext_gw_info = show_body['router']['external_gateway_info']
- if exp_ext_gw_info is None:
- self.assertIsNone(actual_ext_gw_info)
- return
- # Verify only keys passed in exp_ext_gw_info
- for k, v in exp_ext_gw_info.iteritems():
- self.assertEqual(v, actual_ext_gw_info[k])
-
- def _verify_gateway_port(self, router_id):
- list_body = self.admin_client.list_ports(
- network_id=CONF.network.public_network_id,
- device_id=router_id)
- self.assertEqual(len(list_body['ports']), 1)
- gw_port = list_body['ports'][0]
- fixed_ips = gw_port['fixed_ips']
- self.assertGreaterEqual(len(fixed_ips), 1)
- public_net_body = self.admin_client.show_network(
- CONF.network.public_network_id)
- public_subnet_id = public_net_body['network']['subnets'][0]
- self.assertIn(public_subnet_id,
- map(lambda x: x['subnet_id'], fixed_ips))
-
- @test.attr(type='smoke')
- @test.idempotent_id('6cc285d8-46bf-4f36-9b1a-783e3008ba79')
- def test_update_router_set_gateway(self):
- router = self._create_router(data_utils.rand_name('router-'))
- self.client.update_router(
- router['id'],
- external_gateway_info={
- 'network_id': CONF.network.public_network_id})
- # Verify operation - router
- self._verify_router_gateway(
- router['id'],
- {'network_id': CONF.network.public_network_id})
- self._verify_gateway_port(router['id'])
-
- @test.idempotent_id('b386c111-3b21-466d-880c-5e72b01e1a33')
- @test.requires_ext(extension='ext-gw-mode', service='network')
- @test.attr(type='smoke')
- def test_update_router_set_gateway_with_snat_explicit(self):
- router = self._create_router(data_utils.rand_name('router-'))
- self.admin_client.update_router_with_snat_gw_info(
- router['id'],
- external_gateway_info={
- 'network_id': CONF.network.public_network_id,
- 'enable_snat': True})
- self._verify_router_gateway(
- router['id'],
- {'network_id': CONF.network.public_network_id,
- 'enable_snat': True})
- self._verify_gateway_port(router['id'])
-
- @test.idempotent_id('96536bc7-8262-4fb2-9967-5c46940fa279')
- @test.requires_ext(extension='ext-gw-mode', service='network')
- @test.attr(type='smoke')
- def test_update_router_set_gateway_without_snat(self):
- router = self._create_router(data_utils.rand_name('router-'))
- self.admin_client.update_router_with_snat_gw_info(
- router['id'],
- external_gateway_info={
- 'network_id': CONF.network.public_network_id,
- 'enable_snat': False})
- self._verify_router_gateway(
- router['id'],
- {'network_id': CONF.network.public_network_id,
- 'enable_snat': False})
- self._verify_gateway_port(router['id'])
-
- @test.attr(type='smoke')
- @test.idempotent_id('ad81b7ee-4f81-407b-a19c-17e623f763e8')
- def test_update_router_unset_gateway(self):
- router = self._create_router(
- data_utils.rand_name('router-'),
- external_network_id=CONF.network.public_network_id)
- self.client.update_router(router['id'], external_gateway_info={})
- self._verify_router_gateway(router['id'])
- # No gateway port expected
- list_body = self.admin_client.list_ports(
- network_id=CONF.network.public_network_id,
- device_id=router['id'])
- self.assertFalse(list_body['ports'])
-
- @test.idempotent_id('f2faf994-97f4-410b-a831-9bc977b64374')
- @test.requires_ext(extension='ext-gw-mode', service='network')
- @test.attr(type='smoke')
- def test_update_router_reset_gateway_without_snat(self):
- router = self._create_router(
- data_utils.rand_name('router-'),
- external_network_id=CONF.network.public_network_id)
- self.admin_client.update_router_with_snat_gw_info(
- router['id'],
- external_gateway_info={
- 'network_id': CONF.network.public_network_id,
- 'enable_snat': False})
- self._verify_router_gateway(
- router['id'],
- {'network_id': CONF.network.public_network_id,
- 'enable_snat': False})
- self._verify_gateway_port(router['id'])
-
- @test.idempotent_id('c86ac3a8-50bd-4b00-a6b8-62af84a0765c')
- @test.requires_ext(extension='extraroute', service='network')
- @test.attr(type='smoke')
- def test_update_extra_route(self):
- self.network = self.create_network()
- self.name = self.network['name']
- self.subnet = self.create_subnet(self.network)
- # Add router interface with subnet id
- self.router = self._create_router(
- data_utils.rand_name('router-'), True)
- self.create_router_interface(self.router['id'], self.subnet['id'])
- self.addCleanup(
- self._delete_extra_routes,
- self.router['id'])
- # Update router extra route, second ip of the range is
- # used as next hop
- cidr = netaddr.IPNetwork(self.subnet['cidr'])
- next_hop = str(cidr[2])
- destination = str(self.subnet['cidr'])
- extra_route = self.client.update_extra_routes(self.router['id'],
- next_hop, destination)
- self.assertEqual(1, len(extra_route['router']['routes']))
- self.assertEqual(destination,
- extra_route['router']['routes'][0]['destination'])
- self.assertEqual(next_hop,
- extra_route['router']['routes'][0]['nexthop'])
- show_body = self.client.show_router(self.router['id'])
- self.assertEqual(destination,
- show_body['router']['routes'][0]['destination'])
- self.assertEqual(next_hop,
- show_body['router']['routes'][0]['nexthop'])
-
- def _delete_extra_routes(self, router_id):
- self.client.delete_extra_routes(router_id)
-
- @test.attr(type='smoke')
- @test.idempotent_id('a8902683-c788-4246-95c7-ad9c6d63a4d9')
- def test_update_router_admin_state(self):
- self.router = self._create_router(data_utils.rand_name('router-'))
- self.assertFalse(self.router['admin_state_up'])
- # Update router admin state
- update_body = self.client.update_router(self.router['id'],
- admin_state_up=True)
- self.assertTrue(update_body['router']['admin_state_up'])
- show_body = self.client.show_router(self.router['id'])
- self.assertTrue(show_body['router']['admin_state_up'])
-
- @test.attr(type='smoke')
- @test.idempotent_id('802c73c9-c937-4cef-824b-2191e24a6aab')
- def test_add_multiple_router_interfaces(self):
- network01 = self.create_network(
- network_name=data_utils.rand_name('router-network01-'))
- network02 = self.create_network(
- network_name=data_utils.rand_name('router-network02-'))
- subnet01 = self.create_subnet(network01)
- sub02_cidr = netaddr.IPNetwork(self.tenant_cidr).next()
- subnet02 = self.create_subnet(network02, cidr=sub02_cidr)
- router = self._create_router(data_utils.rand_name('router-'))
- interface01 = self._add_router_interface_with_subnet_id(router['id'],
- subnet01['id'])
- self._verify_router_interface(router['id'], subnet01['id'],
- interface01['port_id'])
- interface02 = self._add_router_interface_with_subnet_id(router['id'],
- subnet02['id'])
- self._verify_router_interface(router['id'], subnet02['id'],
- interface02['port_id'])
-
- def _verify_router_interface(self, router_id, subnet_id, port_id):
- show_port_body = self.client.show_port(port_id)
- interface_port = show_port_body['port']
- self.assertEqual(router_id, interface_port['device_id'])
- self.assertEqual(subnet_id,
- interface_port['fixed_ips'][0]['subnet_id'])
-
-
-class RoutersIpV6Test(RoutersTest):
- _ip_version = 6
diff --git a/neutron/tests/tempest/api/network/test_routers_negative.py b/neutron/tests/tempest/api/network/test_routers_negative.py
deleted file mode 100644
index c62467d..0000000
--- a/neutron/tests/tempest/api/network/test_routers_negative.py
+++ /dev/null
@@ -1,112 +0,0 @@
-# Copyright 2013 OpenStack Foundation
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import netaddr
-from tempest_lib.common.utils import data_utils
-from tempest_lib import exceptions as lib_exc
-
-from neutron.tests.tempest.api.network import base_routers as base
-from neutron.tests.tempest import config
-from neutron.tests.tempest import test
-
-CONF = config.CONF
-
-
-class RoutersNegativeTest(base.BaseRouterTest):
-
- @classmethod
- def resource_setup(cls):
- super(RoutersNegativeTest, cls).resource_setup()
- if not test.is_extension_enabled('router', 'network'):
- msg = "router extension not enabled."
- raise cls.skipException(msg)
- cls.router = cls.create_router(data_utils.rand_name('router-'))
- cls.network = cls.create_network()
- cls.subnet = cls.create_subnet(cls.network)
- cls.tenant_cidr = (CONF.network.tenant_network_cidr
- if cls._ip_version == 4 else
- CONF.network.tenant_network_v6_cidr)
-
- @test.attr(type=['negative', 'smoke'])
- @test.idempotent_id('37a94fc0-a834-45b9-bd23-9a81d2fd1e22')
- def test_router_add_gateway_invalid_network_returns_404(self):
- self.assertRaises(lib_exc.NotFound,
- self.client.update_router,
- self.router['id'],
- external_gateway_info={
- 'network_id': self.router['id']})
-
- @test.attr(type=['negative', 'smoke'])
- @test.idempotent_id('11836a18-0b15-4327-a50b-f0d9dc66bddd')
- def test_router_add_gateway_net_not_external_returns_400(self):
- alt_network = self.create_network(
- network_name=data_utils.rand_name('router-negative-'))
- sub_cidr = netaddr.IPNetwork(self.tenant_cidr).next()
- self.create_subnet(alt_network, cidr=sub_cidr)
- self.assertRaises(lib_exc.BadRequest,
- self.client.update_router,
- self.router['id'],
- external_gateway_info={
- 'network_id': alt_network['id']})
-
- @test.attr(type=['negative', 'smoke'])
- @test.idempotent_id('957751a3-3c68-4fa2-93b6-eb52ea10db6e')
- def test_add_router_interfaces_on_overlapping_subnets_returns_400(self):
- network01 = self.create_network(
- network_name=data_utils.rand_name('router-network01-'))
- network02 = self.create_network(
- network_name=data_utils.rand_name('router-network02-'))
- subnet01 = self.create_subnet(network01)
- subnet02 = self.create_subnet(network02)
- self._add_router_interface_with_subnet_id(self.router['id'],
- subnet01['id'])
- self.assertRaises(lib_exc.BadRequest,
- self._add_router_interface_with_subnet_id,
- self.router['id'],
- subnet02['id'])
-
- @test.attr(type=['negative', 'smoke'])
- @test.idempotent_id('04df80f9-224d-47f5-837a-bf23e33d1c20')
- def test_router_remove_interface_in_use_returns_409(self):
- self.client.add_router_interface_with_subnet_id(
- self.router['id'], self.subnet['id'])
- self.assertRaises(lib_exc.Conflict,
- self.client.delete_router,
- self.router['id'])
-
- @test.attr(type=['negative', 'smoke'])
- @test.idempotent_id('c2a70d72-8826-43a7-8208-0209e6360c47')
- def test_show_non_existent_router_returns_404(self):
- router = data_utils.rand_name('non_exist_router')
- self.assertRaises(lib_exc.NotFound, self.client.show_router,
- router)
-
- @test.attr(type=['negative', 'smoke'])
- @test.idempotent_id('b23d1569-8b0c-4169-8d4b-6abd34fad5c7')
- def test_update_non_existent_router_returns_404(self):
- router = data_utils.rand_name('non_exist_router')
- self.assertRaises(lib_exc.NotFound, self.client.update_router,
- router, name="new_name")
-
- @test.attr(type=['negative', 'smoke'])
- @test.idempotent_id('c7edc5ad-d09d-41e6-a344-5c0c31e2e3e4')
- def test_delete_non_existent_router_returns_404(self):
- router = data_utils.rand_name('non_exist_router')
- self.assertRaises(lib_exc.NotFound, self.client.delete_router,
- router)
-
-
-class RoutersNegativeIpV6Test(RoutersNegativeTest):
- _ip_version = 6
diff --git a/neutron/tests/tempest/api/network/test_security_groups.py b/neutron/tests/tempest/api/network/test_security_groups.py
deleted file mode 100644
index a2d3a7c..0000000
--- a/neutron/tests/tempest/api/network/test_security_groups.py
+++ /dev/null
@@ -1,244 +0,0 @@
-# Copyright 2013 OpenStack Foundation
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import six
-from tempest_lib.common.utils import data_utils
-
-from neutron.tests.tempest.api.network import base_security_groups as base
-from neutron.tests.tempest import config
-from neutron.tests.tempest import test
-
-CONF = config.CONF
-
-
-class SecGroupTest(base.BaseSecGroupTest):
-
- _tenant_network_cidr = CONF.network.tenant_network_cidr
-
- @classmethod
- def resource_setup(cls):
- super(SecGroupTest, cls).resource_setup()
- if not test.is_extension_enabled('security-group', 'network'):
- msg = "security-group extension not enabled."
- raise cls.skipException(msg)
-
- def _create_verify_security_group_rule(self, sg_id, direction,
- ethertype, protocol,
- port_range_min,
- port_range_max,
- remote_group_id=None,
- remote_ip_prefix=None):
- # Create Security Group rule with the input params and validate
- # that SG rule is created with the same parameters.
- rule_create_body = self.client.create_security_group_rule(
- security_group_id=sg_id,
- direction=direction,
- ethertype=ethertype,
- protocol=protocol,
- port_range_min=port_range_min,
- port_range_max=port_range_max,
- remote_group_id=remote_group_id,
- remote_ip_prefix=remote_ip_prefix
- )
-
- sec_group_rule = rule_create_body['security_group_rule']
- self.addCleanup(self._delete_security_group_rule,
- sec_group_rule['id'])
-
- expected = {'direction': direction, 'protocol': protocol,
- 'ethertype': ethertype, 'port_range_min': port_range_min,
- 'port_range_max': port_range_max,
- 'remote_group_id': remote_group_id,
- 'remote_ip_prefix': remote_ip_prefix}
- for key, value in six.iteritems(expected):
- self.assertEqual(value, sec_group_rule[key],
- "Field %s of the created security group "
- "rule does not match with %s." %
- (key, value))
-
- @test.attr(type='smoke')
- @test.idempotent_id('e30abd17-fef9-4739-8617-dc26da88e686')
- def test_list_security_groups(self):
- # Verify the that security group belonging to tenant exist in list
- body = self.client.list_security_groups()
- security_groups = body['security_groups']
- found = None
- for n in security_groups:
- if (n['name'] == 'default'):
- found = n['id']
- msg = "Security-group list doesn't contain default security-group"
- self.assertIsNotNone(found, msg)
-
- @test.attr(type='smoke')
- @test.idempotent_id('bfd128e5-3c92-44b6-9d66-7fe29d22c802')
- def test_create_list_update_show_delete_security_group(self):
- group_create_body, name = self._create_security_group()
-
- # List security groups and verify if created group is there in response
- list_body = self.client.list_security_groups()
- secgroup_list = list()
- for secgroup in list_body['security_groups']:
- secgroup_list.append(secgroup['id'])
- self.assertIn(group_create_body['security_group']['id'], secgroup_list)
- # Update the security group
- new_name = data_utils.rand_name('security-')
- new_description = data_utils.rand_name('security-description')
- update_body = self.client.update_security_group(
- group_create_body['security_group']['id'],
- name=new_name,
- description=new_description)
- # Verify if security group is updated
- self.assertEqual(update_body['security_group']['name'], new_name)
- self.assertEqual(update_body['security_group']['description'],
- new_description)
- # Show details of the updated security group
- show_body = self.client.show_security_group(
- group_create_body['security_group']['id'])
- self.assertEqual(show_body['security_group']['name'], new_name)
- self.assertEqual(show_body['security_group']['description'],
- new_description)
-
- @test.attr(type='smoke')
- @test.idempotent_id('cfb99e0e-7410-4a3d-8a0c-959a63ee77e9')
- def test_create_show_delete_security_group_rule(self):
- group_create_body, _ = self._create_security_group()
-
- # Create rules for each protocol
- protocols = ['tcp', 'udp', 'icmp']
- for protocol in protocols:
- rule_create_body = self.client.create_security_group_rule(
- security_group_id=group_create_body['security_group']['id'],
- protocol=protocol,
- direction='ingress',
- ethertype=self.ethertype
- )
-
- # Show details of the created security rule
- show_rule_body = self.client.show_security_group_rule(
- rule_create_body['security_group_rule']['id']
- )
- create_dict = rule_create_body['security_group_rule']
- for key, value in six.iteritems(create_dict):
- self.assertEqual(value,
- show_rule_body['security_group_rule'][key],
- "%s does not match." % key)
-
- # List rules and verify created rule is in response
- rule_list_body = self.client.list_security_group_rules()
- rule_list = [rule['id']
- for rule in rule_list_body['security_group_rules']]
- self.assertIn(rule_create_body['security_group_rule']['id'],
- rule_list)
-
- @test.attr(type='smoke')
- @test.idempotent_id('87dfbcf9-1849-43ea-b1e4-efa3eeae9f71')
- def test_create_security_group_rule_with_additional_args(self):
- """Verify security group rule with additional arguments works.
-
- direction:ingress, ethertype:[IPv4/IPv6],
- protocol:tcp, port_range_min:77, port_range_max:77
- """
- group_create_body, _ = self._create_security_group()
- sg_id = group_create_body['security_group']['id']
- direction = 'ingress'
- protocol = 'tcp'
- port_range_min = 77
- port_range_max = 77
- self._create_verify_security_group_rule(sg_id, direction,
- self.ethertype, protocol,
- port_range_min,
- port_range_max)
-
- @test.attr(type='smoke')
- @test.idempotent_id('c9463db8-b44d-4f52-b6c0-8dbda99f26ce')
- def test_create_security_group_rule_with_icmp_type_code(self):
- """Verify security group rule for icmp protocol works.
-
- Specify icmp type (port_range_min) and icmp code
- (port_range_max) with different values. A separate testcase
- is added for icmp protocol as icmp validation would be
- different from tcp/udp.
- """
- group_create_body, _ = self._create_security_group()
-
- sg_id = group_create_body['security_group']['id']
- direction = 'ingress'
- protocol = 'icmp'
- icmp_type_codes = [(3, 2), (3, 0), (8, 0), (0, 0), (11, None)]
- for icmp_type, icmp_code in icmp_type_codes:
- self._create_verify_security_group_rule(sg_id, direction,
- self.ethertype, protocol,
- icmp_type, icmp_code)
-
- @test.attr(type='smoke')
- @test.idempotent_id('c2ed2deb-7a0c-44d8-8b4c-a5825b5c310b')
- def test_create_security_group_rule_with_remote_group_id(self):
- # Verify creating security group rule with remote_group_id works
- sg1_body, _ = self._create_security_group()
- sg2_body, _ = self._create_security_group()
-
- sg_id = sg1_body['security_group']['id']
- direction = 'ingress'
- protocol = 'udp'
- port_range_min = 50
- port_range_max = 55
- remote_id = sg2_body['security_group']['id']
- self._create_verify_security_group_rule(sg_id, direction,
- self.ethertype, protocol,
- port_range_min,
- port_range_max,
- remote_group_id=remote_id)
-
- @test.attr(type='smoke')
- @test.idempotent_id('16459776-5da2-4634-bce4-4b55ee3ec188')
- def test_create_security_group_rule_with_remote_ip_prefix(self):
- # Verify creating security group rule with remote_ip_prefix works
- sg1_body, _ = self._create_security_group()
-
- sg_id = sg1_body['security_group']['id']
- direction = 'ingress'
- protocol = 'tcp'
- port_range_min = 76
- port_range_max = 77
- ip_prefix = self._tenant_network_cidr
- self._create_verify_security_group_rule(sg_id, direction,
- self.ethertype, protocol,
- port_range_min,
- port_range_max,
- remote_ip_prefix=ip_prefix)
-
- @test.attr(type='smoke')
- @test.idempotent_id('0a307599-6655-4220-bebc-fd70c64f2290')
- def test_create_security_group_rule_with_protocol_integer_value(self):
- # Verify creating security group rule with the
- # protocol as integer value
- # arguments : "protocol": 17
- group_create_body, _ = self._create_security_group()
- direction = 'ingress'
- protocol = 17
- security_group_id = group_create_body['security_group']['id']
- rule_create_body = self.client.create_security_group_rule(
- security_group_id=security_group_id,
- direction=direction,
- protocol=protocol
- )
- sec_group_rule = rule_create_body['security_group_rule']
- self.assertEqual(sec_group_rule['direction'], direction)
- self.assertEqual(int(sec_group_rule['protocol']), protocol)
-
-
-class SecGroupIPv6Test(SecGroupTest):
- _ip_version = 6
- _tenant_network_cidr = CONF.network.tenant_network_v6_cidr
diff --git a/neutron/tests/tempest/api/network/test_security_groups_negative.py b/neutron/tests/tempest/api/network/test_security_groups_negative.py
deleted file mode 100644
index b36eec6..0000000
--- a/neutron/tests/tempest/api/network/test_security_groups_negative.py
+++ /dev/null
@@ -1,228 +0,0 @@
-# Copyright 2013 OpenStack Foundation
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import uuid
-
-from tempest_lib import exceptions as lib_exc
-
-from neutron.tests.tempest.api.network import base_security_groups as base
-from neutron.tests.tempest import config
-from neutron.tests.tempest import test
-
-CONF = config.CONF
-
-
-class NegativeSecGroupTest(base.BaseSecGroupTest):
-
- _tenant_network_cidr = CONF.network.tenant_network_cidr
-
- @classmethod
- def resource_setup(cls):
- super(NegativeSecGroupTest, cls).resource_setup()
- if not test.is_extension_enabled('security-group', 'network'):
- msg = "security-group extension not enabled."
- raise cls.skipException(msg)
-
- @test.attr(type=['negative', 'gate'])
- @test.idempotent_id('424fd5c3-9ddc-486a-b45f-39bf0c820fc6')
- def test_show_non_existent_security_group(self):
- non_exist_id = str(uuid.uuid4())
- self.assertRaises(lib_exc.NotFound, self.client.show_security_group,
- non_exist_id)
-
- @test.attr(type=['negative', 'gate'])
- @test.idempotent_id('4c094c09-000b-4e41-8100-9617600c02a6')
- def test_show_non_existent_security_group_rule(self):
- non_exist_id = str(uuid.uuid4())
- self.assertRaises(lib_exc.NotFound,
- self.client.show_security_group_rule,
- non_exist_id)
-
- @test.attr(type=['negative', 'gate'])
- @test.idempotent_id('1f1bb89d-5664-4956-9fcd-83ee0fa603df')
- def test_delete_non_existent_security_group(self):
- non_exist_id = str(uuid.uuid4())
- self.assertRaises(lib_exc.NotFound,
- self.client.delete_security_group,
- non_exist_id
- )
-
- @test.attr(type=['negative', 'gate'])
- @test.idempotent_id('981bdc22-ce48-41ed-900a-73148b583958')
- def test_create_security_group_rule_with_bad_protocol(self):
- group_create_body, _ = self._create_security_group()
-
- # Create rule with bad protocol name
- pname = 'bad_protocol_name'
- self.assertRaises(
- lib_exc.BadRequest, self.client.create_security_group_rule,
- security_group_id=group_create_body['security_group']['id'],
- protocol=pname, direction='ingress', ethertype=self.ethertype)
-
- @test.attr(type=['negative', 'gate'])
- @test.idempotent_id('5f8daf69-3c5f-4aaa-88c9-db1d66f68679')
- def test_create_security_group_rule_with_bad_remote_ip_prefix(self):
- group_create_body, _ = self._create_security_group()
-
- # Create rule with bad remote_ip_prefix
- prefix = ['192.168.1./24', '192.168.1.1/33', 'bad_prefix', '256']
- for remote_ip_prefix in prefix:
- self.assertRaises(
- lib_exc.BadRequest, self.client.create_security_group_rule,
- security_group_id=group_create_body['security_group']['id'],
- protocol='tcp', direction='ingress', ethertype=self.ethertype,
- remote_ip_prefix=remote_ip_prefix)
-
- @test.attr(type=['negative', 'gate'])
- @test.idempotent_id('4bf786fd-2f02-443c-9716-5b98e159a49a')
- def test_create_security_group_rule_with_non_existent_remote_groupid(self):
- group_create_body, _ = self._create_security_group()
- non_exist_id = str(uuid.uuid4())
-
- # Create rule with non existent remote_group_id
- group_ids = ['bad_group_id', non_exist_id]
- for remote_group_id in group_ids:
- self.assertRaises(
- lib_exc.NotFound, self.client.create_security_group_rule,
- security_group_id=group_create_body['security_group']['id'],
- protocol='tcp', direction='ingress', ethertype=self.ethertype,
- remote_group_id=remote_group_id)
-
- @test.attr(type=['negative', 'gate'])
- @test.idempotent_id('b5c4b247-6b02-435b-b088-d10d45650881')
- def test_create_security_group_rule_with_remote_ip_and_group(self):
- sg1_body, _ = self._create_security_group()
- sg2_body, _ = self._create_security_group()
-
- # Create rule specifying both remote_ip_prefix and remote_group_id
- prefix = self._tenant_network_cidr
- self.assertRaises(
- lib_exc.BadRequest, self.client.create_security_group_rule,
- security_group_id=sg1_body['security_group']['id'],
- protocol='tcp', direction='ingress',
- ethertype=self.ethertype, remote_ip_prefix=prefix,
- remote_group_id=sg2_body['security_group']['id'])
-
- @test.attr(type=['negative', 'gate'])
- @test.idempotent_id('5666968c-fff3-40d6-9efc-df1c8bd01abb')
- def test_create_security_group_rule_with_bad_ethertype(self):
- group_create_body, _ = self._create_security_group()
-
- # Create rule with bad ethertype
- ethertype = 'bad_ethertype'
- self.assertRaises(
- lib_exc.BadRequest, self.client.create_security_group_rule,
- security_group_id=group_create_body['security_group']['id'],
- protocol='udp', direction='ingress', ethertype=ethertype)
-
- @test.attr(type=['negative', 'gate'])
- @test.idempotent_id('0d9c7791-f2ad-4e2f-ac73-abf2373b0d2d')
- def test_create_security_group_rule_with_invalid_ports(self):
- group_create_body, _ = self._create_security_group()
-
- # Create rule for tcp protocol with invalid ports
- states = [(-16, 80, 'Invalid value for port -16'),
- (80, 79, 'port_range_min must be <= port_range_max'),
- (80, 65536, 'Invalid value for port 65536'),
- (None, 6, 'port_range_min must be <= port_range_max'),
- (-16, 65536, 'Invalid value for port')]
- for pmin, pmax, msg in states:
- ex = self.assertRaises(
- lib_exc.BadRequest, self.client.create_security_group_rule,
- security_group_id=group_create_body['security_group']['id'],
- protocol='tcp', port_range_min=pmin, port_range_max=pmax,
- direction='ingress', ethertype=self.ethertype)
- self.assertIn(msg, str(ex))
-
- # Create rule for icmp protocol with invalid ports
- states = [(1, 256, 'Invalid value for ICMP code'),
- (None, 6, 'ICMP type (port-range-min) is missing'),
- (300, 1, 'Invalid value for ICMP type')]
- for pmin, pmax, msg in states:
- ex = self.assertRaises(
- lib_exc.BadRequest, self.client.create_security_group_rule,
- security_group_id=group_create_body['security_group']['id'],
- protocol='icmp', port_range_min=pmin, port_range_max=pmax,
- direction='ingress', ethertype=self.ethertype)
- self.assertIn(msg, str(ex))
-
- @test.attr(type=['negative', 'smoke'])
- @test.idempotent_id('2323061e-9fbf-4eb0-b547-7e8fafc90849')
- def test_create_additional_default_security_group_fails(self):
- # Create security group named 'default', it should be failed.
- name = 'default'
- self.assertRaises(lib_exc.Conflict,
- self.client.create_security_group,
- name=name)
-
- @test.attr(type=['negative', 'smoke'])
- @test.idempotent_id('8fde898f-ce88-493b-adc9-4e4692879fc5')
- def test_create_duplicate_security_group_rule_fails(self):
- # Create duplicate security group rule, it should fail.
- body, _ = self._create_security_group()
-
- min_port = 66
- max_port = 67
- # Create a rule with valid params
- self.client.create_security_group_rule(
- security_group_id=body['security_group']['id'],
- direction='ingress',
- ethertype=self.ethertype,
- protocol='tcp',
- port_range_min=min_port,
- port_range_max=max_port
- )
-
- # Try creating the same security group rule, it should fail
- self.assertRaises(
- lib_exc.Conflict, self.client.create_security_group_rule,
- security_group_id=body['security_group']['id'],
- protocol='tcp', direction='ingress', ethertype=self.ethertype,
- port_range_min=min_port, port_range_max=max_port)
-
- @test.attr(type=['negative', 'smoke'])
- @test.idempotent_id('be308db6-a7cf-4d5c-9baf-71bafd73f35e')
- def test_create_security_group_rule_with_non_existent_security_group(self):
- # Create security group rules with not existing security group.
- non_existent_sg = str(uuid.uuid4())
- self.assertRaises(lib_exc.NotFound,
- self.client.create_security_group_rule,
- security_group_id=non_existent_sg,
- direction='ingress', ethertype=self.ethertype)
-
-
-class NegativeSecGroupIPv6Test(NegativeSecGroupTest):
- _ip_version = 6
- _tenant_network_cidr = CONF.network.tenant_network_v6_cidr
-
- @test.attr(type=['negative', 'gate'])
- @test.idempotent_id('7607439c-af73-499e-bf64-f687fd12a842')
- def test_create_security_group_rule_wrong_ip_prefix_version(self):
- group_create_body, _ = self._create_security_group()
-
- # Create rule with bad remote_ip_prefix
- pairs = ({'ethertype': 'IPv6',
- 'ip_prefix': CONF.network.tenant_network_cidr},
- {'ethertype': 'IPv4',
- 'ip_prefix': CONF.network.tenant_network_v6_cidr})
- for pair in pairs:
- self.assertRaisesRegexp(
- lib_exc.BadRequest,
- "Conflicting value ethertype",
- self.client.create_security_group_rule,
- security_group_id=group_create_body['security_group']['id'],
- protocol='tcp', direction='ingress',
- ethertype=pair['ethertype'],
- remote_ip_prefix=pair['ip_prefix'])
diff --git a/neutron/tests/tempest/api/network/test_service_type_management.py b/neutron/tests/tempest/api/network/test_service_type_management.py
deleted file mode 100644
index cef620b..0000000
--- a/neutron/tests/tempest/api/network/test_service_type_management.py
+++ /dev/null
@@ -1,33 +0,0 @@
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from tempest_lib import decorators
-
-from neutron.tests.tempest.api.network import base
-from neutron.tests.tempest import test
-
-
-class ServiceTypeManagementTestJSON(base.BaseNetworkTest):
-
- @classmethod
- def resource_setup(cls):
- super(ServiceTypeManagementTestJSON, cls).resource_setup()
- if not test.is_extension_enabled('service-type', 'network'):
- msg = "Neutron Service Type Management not enabled."
- raise cls.skipException(msg)
-
- @decorators.skip_because(bug="1400370")
- @test.attr(type='smoke')
- @test.idempotent_id('2cbbeea9-f010-40f6-8df5-4eaa0c918ea6')
- def test_service_provider_list(self):
- body = self.client.list_service_providers()
- self.assertIsInstance(body['service_providers'], list)
diff --git a/neutron/tests/tempest/api/network/test_vpnaas_extensions.py b/neutron/tests/tempest/api/network/test_vpnaas_extensions.py
deleted file mode 100644
index 6abe0a9..0000000
--- a/neutron/tests/tempest/api/network/test_vpnaas_extensions.py
+++ /dev/null
@@ -1,327 +0,0 @@
-# Copyright 2013 OpenStack Foundation
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from tempest_lib.common.utils import data_utils
-from tempest_lib import exceptions as lib_exc
-
-from neutron.tests.tempest.api.network import base
-from neutron.tests.tempest import config
-from neutron.tests.tempest import test
-
-CONF = config.CONF
-
-
-class VPNaaSTestJSON(base.BaseAdminNetworkTest):
-
- """
- Tests the following operations in the Neutron API using the REST client for
- Neutron:
- List, Show, Create, Delete, and Update VPN Service
- List, Show, Create, Delete, and Update IKE policy
- List, Show, Create, Delete, and Update IPSec policy
- """
-
- @classmethod
- def resource_setup(cls):
- if not test.is_extension_enabled('vpnaas', 'network'):
- msg = "vpnaas extension not enabled."
- raise cls.skipException(msg)
- super(VPNaaSTestJSON, cls).resource_setup()
- cls.ext_net_id = CONF.network.public_network_id
- cls.network = cls.create_network()
- cls.subnet = cls.create_subnet(cls.network)
- cls.router = cls.create_router(
- data_utils.rand_name("router"),
- external_network_id=CONF.network.public_network_id)
- cls.create_router_interface(cls.router['id'], cls.subnet['id'])
- cls.vpnservice = cls.create_vpnservice(cls.subnet['id'],
- cls.router['id'])
-
- cls.ikepolicy = cls.create_ikepolicy(
- data_utils.rand_name("ike-policy-"))
- cls.ipsecpolicy = cls.create_ipsecpolicy(
- data_utils.rand_name("ipsec-policy-"))
-
- def _delete_ike_policy(self, ike_policy_id):
- # Deletes a ike policy and verifies if it is deleted or not
- ike_list = list()
- all_ike = self.client.list_ikepolicies()
- for ike in all_ike['ikepolicies']:
- ike_list.append(ike['id'])
- if ike_policy_id in ike_list:
- self.client.delete_ikepolicy(ike_policy_id)
- # Asserting that the policy is not found in list after deletion
- ikepolicies = self.client.list_ikepolicies()
- ike_id_list = list()
- for i in ikepolicies['ikepolicies']:
- ike_id_list.append(i['id'])
- self.assertNotIn(ike_policy_id, ike_id_list)
-
- def _delete_ipsec_policy(self, ipsec_policy_id):
- # Deletes an ike policy if it exists
- try:
- self.client.delete_ipsecpolicy(ipsec_policy_id)
-
- except lib_exc.NotFound:
- pass
-
- def _assertExpected(self, expected, actual):
- # Check if not expected keys/values exists in actual response body
- for key, value in expected.iteritems():
- self.assertIn(key, actual)
- self.assertEqual(value, actual[key])
-
- def _delete_vpn_service(self, vpn_service_id):
- self.client.delete_vpnservice(vpn_service_id)
- # Asserting if vpn service is found in the list after deletion
- body = self.client.list_vpnservices()
- vpn_services = [vs['id'] for vs in body['vpnservices']]
- self.assertNotIn(vpn_service_id, vpn_services)
-
- def _get_tenant_id(self):
- """
- Returns the tenant_id of the client current user
- """
- # TODO(jroovers) This is a temporary workaround to get the tenant_id
- # of the the current client. Replace this once tenant_isolation for
- # neutron is fixed.
- body = self.client.show_network(self.network['id'])
- return body['network']['tenant_id']
-
- @test.attr(type='smoke')
- @test.idempotent_id('14311574-0737-4e53-ac05-f7ae27742eed')
- def test_admin_create_ipsec_policy_for_tenant(self):
- tenant_id = self._get_tenant_id()
- # Create IPSec policy for the newly created tenant
- name = data_utils.rand_name('ipsec-policy')
- body = (self.admin_client.
- create_ipsecpolicy(name=name, tenant_id=tenant_id))
- ipsecpolicy = body['ipsecpolicy']
- self.assertIsNotNone(ipsecpolicy['id'])
- self.addCleanup(self.admin_client.delete_ipsecpolicy,
- ipsecpolicy['id'])
-
- # Assert that created ipsec policy is found in API list call
- body = self.client.list_ipsecpolicies()
- ipsecpolicies = [policy['id'] for policy in body['ipsecpolicies']]
- self.assertIn(ipsecpolicy['id'], ipsecpolicies)
-
- @test.attr(type='smoke')
- @test.idempotent_id('b62acdc6-0c53-4d84-84aa-859b22b79799')
- def test_admin_create_vpn_service_for_tenant(self):
- tenant_id = self._get_tenant_id()
-
- # Create vpn service for the newly created tenant
- network2 = self.create_network()
- subnet2 = self.create_subnet(network2)
- router2 = self.create_router(data_utils.rand_name('router-'),
- external_network_id=self.ext_net_id)
- self.create_router_interface(router2['id'], subnet2['id'])
- name = data_utils.rand_name('vpn-service')
- body = self.admin_client.create_vpnservice(
- subnet_id=subnet2['id'],
- router_id=router2['id'],
- name=name,
- admin_state_up=True,
- tenant_id=tenant_id)
- vpnservice = body['vpnservice']
- self.assertIsNotNone(vpnservice['id'])
- self.addCleanup(self.admin_client.delete_vpnservice, vpnservice['id'])
- # Assert that created vpnservice is found in API list call
- body = self.client.list_vpnservices()
- vpn_services = [vs['id'] for vs in body['vpnservices']]
- self.assertIn(vpnservice['id'], vpn_services)
-
- @test.attr(type='smoke')
- @test.idempotent_id('58cc4a1c-443b-4f39-8fb6-c19d39f343ab')
- def test_admin_create_ike_policy_for_tenant(self):
- tenant_id = self._get_tenant_id()
-
- # Create IKE policy for the newly created tenant
- name = data_utils.rand_name('ike-policy')
- body = (self.admin_client.
- create_ikepolicy(name=name, ike_version="v1",
- encryption_algorithm="aes-128",
- auth_algorithm="sha1",
- tenant_id=tenant_id))
- ikepolicy = body['ikepolicy']
- self.assertIsNotNone(ikepolicy['id'])
- self.addCleanup(self.admin_client.delete_ikepolicy, ikepolicy['id'])
-
- # Assert that created ike policy is found in API list call
- body = self.client.list_ikepolicies()
- ikepolicies = [ikp['id'] for ikp in body['ikepolicies']]
- self.assertIn(ikepolicy['id'], ikepolicies)
-
- @test.attr(type='smoke')
- @test.idempotent_id('de5bb04c-3a1f-46b1-b329-7a8abba5c7f1')
- def test_list_vpn_services(self):
- # Verify the VPN service exists in the list of all VPN services
- body = self.client.list_vpnservices()
- vpnservices = body['vpnservices']
- self.assertIn(self.vpnservice['id'], [v['id'] for v in vpnservices])
-
- @test.attr(type='smoke')
- @test.idempotent_id('aacb13b1-fdc7-41fd-bab2-32621aee1878')
- def test_create_update_delete_vpn_service(self):
- # Creates a VPN service and sets up deletion
- network1 = self.create_network()
- subnet1 = self.create_subnet(network1)
- router1 = self.create_router(data_utils.rand_name('router-'),
- external_network_id=self.ext_net_id)
- self.create_router_interface(router1['id'], subnet1['id'])
- name = data_utils.rand_name('vpn-service1')
- body = self.client.create_vpnservice(subnet_id=subnet1['id'],
- router_id=router1['id'],
- name=name,
- admin_state_up=True)
- vpnservice = body['vpnservice']
- self.addCleanup(self._delete_vpn_service, vpnservice['id'])
- # Assert if created vpnservices are not found in vpnservices list
- body = self.client.list_vpnservices()
- vpn_services = [vs['id'] for vs in body['vpnservices']]
- self.assertIsNotNone(vpnservice['id'])
- self.assertIn(vpnservice['id'], vpn_services)
-
- # TODO(raies): implement logic to update vpnservice
- # VPNaaS client function to update is implemented.
- # But precondition is that current state of vpnservice
- # should be "ACTIVE" not "PENDING*"
-
- @test.attr(type='smoke')
- @test.idempotent_id('0dedfc1d-f8ee-4e2a-bfd4-7997b9dc17ff')
- def test_show_vpn_service(self):
- # Verifies the details of a vpn service
- body = self.client.show_vpnservice(self.vpnservice['id'])
- vpnservice = body['vpnservice']
- self.assertEqual(self.vpnservice['id'], vpnservice['id'])
- self.assertEqual(self.vpnservice['name'], vpnservice['name'])
- self.assertEqual(self.vpnservice['description'],
- vpnservice['description'])
- self.assertEqual(self.vpnservice['router_id'], vpnservice['router_id'])
- self.assertEqual(self.vpnservice['subnet_id'], vpnservice['subnet_id'])
- self.assertEqual(self.vpnservice['tenant_id'], vpnservice['tenant_id'])
- valid_status = ["ACTIVE", "DOWN", "BUILD", "ERROR", "PENDING_CREATE",
- "PENDING_UPDATE", "PENDING_DELETE"]
- self.assertIn(vpnservice['status'], valid_status)
-
- @test.attr(type='smoke')
- @test.idempotent_id('e0fb6200-da3d-4869-8340-a8c1956ca618')
- def test_list_ike_policies(self):
- # Verify the ike policy exists in the list of all IKE policies
- body = self.client.list_ikepolicies()
- ikepolicies = body['ikepolicies']
- self.assertIn(self.ikepolicy['id'], [i['id'] for i in ikepolicies])
-
- @test.attr(type='smoke')
- @test.idempotent_id('d61f29a5-160c-487d-bc0d-22e32e731b44')
- def test_create_update_delete_ike_policy(self):
- # Creates a IKE policy
- name = data_utils.rand_name('ike-policy')
- body = (self.client.create_ikepolicy(
- name=name,
- ike_version="v1",
- encryption_algorithm="aes-128",
- auth_algorithm="sha1"))
- ikepolicy = body['ikepolicy']
- self.assertIsNotNone(ikepolicy['id'])
- self.addCleanup(self._delete_ike_policy, ikepolicy['id'])
-
- # Update IKE Policy
- new_ike = {'name': data_utils.rand_name("New-IKE"),
- 'description': "Updated ike policy",
- 'encryption_algorithm': "aes-256",
- 'ike_version': "v2",
- 'pfs': "group14",
- 'lifetime': {'units': "seconds", 'value': 2000}}
- self.client.update_ikepolicy(ikepolicy['id'], **new_ike)
- # Confirm that update was successful by verifying using 'show'
- body = self.client.show_ikepolicy(ikepolicy['id'])
- ike_policy = body['ikepolicy']
- for key, value in new_ike.iteritems():
- self.assertIn(key, ike_policy)
- self.assertEqual(value, ike_policy[key])
-
- # Verification of ike policy delete
- self.client.delete_ikepolicy(ikepolicy['id'])
- body = self.client.list_ikepolicies()
- ikepolicies = [ikp['id'] for ikp in body['ikepolicies']]
- self.assertNotIn(ike_policy['id'], ikepolicies)
-
- @test.attr(type='smoke')
- @test.idempotent_id('b5fcf3a3-9407-452d-b8a8-e7c6c32baea8')
- def test_show_ike_policy(self):
- # Verifies the details of a ike policy
- body = self.client.show_ikepolicy(self.ikepolicy['id'])
- ikepolicy = body['ikepolicy']
- self.assertEqual(self.ikepolicy['id'], ikepolicy['id'])
- self.assertEqual(self.ikepolicy['name'], ikepolicy['name'])
- self.assertEqual(self.ikepolicy['description'],
- ikepolicy['description'])
- self.assertEqual(self.ikepolicy['encryption_algorithm'],
- ikepolicy['encryption_algorithm'])
- self.assertEqual(self.ikepolicy['auth_algorithm'],
- ikepolicy['auth_algorithm'])
- self.assertEqual(self.ikepolicy['tenant_id'],
- ikepolicy['tenant_id'])
- self.assertEqual(self.ikepolicy['pfs'],
- ikepolicy['pfs'])
- self.assertEqual(self.ikepolicy['phase1_negotiation_mode'],
- ikepolicy['phase1_negotiation_mode'])
- self.assertEqual(self.ikepolicy['ike_version'],
- ikepolicy['ike_version'])
-
- @test.attr(type='smoke')
- @test.idempotent_id('19ea0a2f-add9-44be-b732-ffd8a7b42f37')
- def test_list_ipsec_policies(self):
- # Verify the ipsec policy exists in the list of all ipsec policies
- body = self.client.list_ipsecpolicies()
- ipsecpolicies = body['ipsecpolicies']
- self.assertIn(self.ipsecpolicy['id'], [i['id'] for i in ipsecpolicies])
-
- @test.attr(type='smoke')
- @test.idempotent_id('9c1701c9-329a-4e5d-930a-1ead1b3f86ad')
- def test_create_update_delete_ipsec_policy(self):
- # Creates an ipsec policy
- ipsec_policy_body = {'name': data_utils.rand_name('ipsec-policy'),
- 'pfs': 'group5',
- 'encryption_algorithm': "aes-128",
- 'auth_algorithm': 'sha1'}
- resp_body = self.client.create_ipsecpolicy(**ipsec_policy_body)
- ipsecpolicy = resp_body['ipsecpolicy']
- self.addCleanup(self._delete_ipsec_policy, ipsecpolicy['id'])
- self._assertExpected(ipsec_policy_body, ipsecpolicy)
- # Verification of ipsec policy update
- new_ipsec = {'description': 'Updated ipsec policy',
- 'pfs': 'group2',
- 'name': data_utils.rand_name("New-IPSec"),
- 'encryption_algorithm': "aes-256",
- 'lifetime': {'units': "seconds", 'value': '2000'}}
- body = self.client.update_ipsecpolicy(ipsecpolicy['id'],
- **new_ipsec)
- updated_ipsec_policy = body['ipsecpolicy']
- self._assertExpected(new_ipsec, updated_ipsec_policy)
- # Verification of ipsec policy delete
- self.client.delete_ipsecpolicy(ipsecpolicy['id'])
- self.assertRaises(lib_exc.NotFound,
- self.client.delete_ipsecpolicy, ipsecpolicy['id'])
-
- @test.attr(type='smoke')
- @test.idempotent_id('601f8a05-9d3c-4539-a400-1c4b3a21b03b')
- def test_show_ipsec_policy(self):
- # Verifies the details of an ipsec policy
- body = self.client.show_ipsecpolicy(self.ipsecpolicy['id'])
- ipsecpolicy = body['ipsecpolicy']
- self._assertExpected(self.ipsecpolicy, ipsecpolicy)
diff --git a/neutron/tests/tempest/common/isolated_creds.py b/neutron/tests/tempest/common/isolated_creds.py
index 5c94289..b4de93b 100644
--- a/neutron/tests/tempest/common/isolated_creds.py
+++ b/neutron/tests/tempest/common/isolated_creds.py
@@ -17,7 +17,7 @@
from tempest_lib.common.utils import data_utils
from tempest_lib import exceptions as lib_exc
-from neutron.tests.api.contrib import clients
+from neutron.tests.api import clients
from neutron.tests.tempest.common import cred_provider
from neutron.tests.tempest import config
from neutron.tests.tempest import exceptions
diff --git a/neutron/tests/tempest/test.py b/neutron/tests/tempest/test.py
index 2f35710..4bb1960 100644
--- a/neutron/tests/tempest/test.py
+++ b/neutron/tests/tempest/test.py
@@ -30,7 +30,7 @@
import testscenarios
import testtools
-from neutron.tests.api.contrib import clients
+from neutron.tests.api import clients
from neutron.tests.tempest.common import credentials
import neutron.tests.tempest.common.generator.valid_generator as valid
from neutron.tests.tempest import config