Merge "New API call to get details of supported QoS rule type"
diff --git a/neutron/tests/tempest/api/test_revisions.py b/neutron/tests/tempest/api/test_revisions.py
index d94aede..83c8410 100644
--- a/neutron/tests/tempest/api/test_revisions.py
+++ b/neutron/tests/tempest/api/test_revisions.py
@@ -13,6 +13,7 @@
import netaddr
from tempest.lib import decorators
+from tempest.lib import exceptions
from tempest import test
from neutron.tests.tempest.api import base
@@ -33,6 +34,35 @@
self.assertGreater(updated['network']['revision_number'],
net['revision_number'])
+ @decorators.idempotent_id('4a26a4be-9c53-483c-bc50-b11111113333')
+ def test_update_network_constrained_by_revision(self):
+ net = self.create_network()
+ current = net['revision_number']
+ stale = current - 1
+ # using a stale number should fail
+ self.assertRaises(
+ exceptions.PreconditionFailed,
+ self.client.update_network,
+ net['id'], name='newnet',
+ headers={'If-Match': 'revision_number=%s' % stale}
+ )
+
+ # using current should pass. in case something is updating the network
+ # on the server at the same time, we have to re-read and update to be
+ # safe
+ for i in range(100):
+ current = (self.client.show_network(net['id'])
+ ['network']['revision_number'])
+ try:
+ self.client.update_network(
+ net['id'], name='newnet',
+ headers={'If-Match': 'revision_number=%s' % current})
+ except exceptions.UnexpectedResponseCode:
+ continue
+ break
+ else:
+ self.fail("Failed to update network after 100 tries.")
+
@decorators.idempotent_id('cac7ecde-12d5-4331-9a03-420899dea077')
def test_update_port_bumps_revision(self):
net = self.create_network()
diff --git a/neutron/tests/tempest/api/test_subnetpools.py b/neutron/tests/tempest/api/test_subnetpools.py
index ec47606..1132d54 100644
--- a/neutron/tests/tempest/api/test_subnetpools.py
+++ b/neutron/tests/tempest/api/test_subnetpools.py
@@ -324,6 +324,74 @@
body = self.client.show_subnetpool(pool_id)
self.assertIsNone(body['subnetpool']['address_scope_id'])
+ @decorators.idempotent_id('4c6963c2-f54c-4347-b288-75d18421c4c4')
+ @test.requires_ext(extension='default-subnetpools', service='network')
+ def test_tenant_create_non_default_subnetpool(self):
+ """
+ Test creates a subnetpool, the "is_default" attribute is False.
+ """
+ created_subnetpool = self._create_subnetpool()
+ self.assertFalse(created_subnetpool['is_default'])
+
+
+class DefaultSubnetPoolsTest(SubnetPoolsTestBase):
+
+ @classmethod
+ def resource_setup(cls):
+ super(DefaultSubnetPoolsTest, cls).resource_setup()
+ body = cls.admin_client.list_subnetpools()
+ subnetpools = body['subnetpools']
+ for subnetpool in subnetpools:
+ if subnetpool.get('is_default'):
+ msg = 'Default subnetpool already exists. Only one is allowed.'
+ raise cls.skipException(msg)
+
+ @decorators.idempotent_id('cb839106-6184-4332-b292-5d07c074de4f')
+ @test.requires_ext(extension='default-subnetpools', service='network')
+ def test_admin_create_default_subnetpool(self):
+ """
+ Test uses administrative credentials to create a default subnetpool,
+ using the is_default=True.
+ """
+ created_subnetpool = self._create_subnetpool(is_admin=True,
+ is_default=True)
+ self.assertTrue(created_subnetpool['is_default'])
+
+ @decorators.idempotent_id('9e79730c-29b6-44a4-9504-bf3c7cedc56c')
+ @test.requires_ext(extension='default-subnetpools', service='network')
+ def test_convert_subnetpool_to_default_subnetpool(self):
+ """
+ Test creates a subnetpool, which is non default subnetpool.
+ Then it will update to a default subnetpool, by setting "is_default"
+ attribute to True.
+ """
+ created_subnetpool = self._create_subnetpool()
+ subnetpool_id = created_subnetpool['id']
+ self.assertFalse(created_subnetpool['is_default'])
+ subnetpool_data = {'is_default': True}
+ self.admin_client.update_subnetpool(subnetpool_id,
+ **subnetpool_data)
+ show_body = self.client.show_subnetpool(subnetpool_id)
+ self.assertTrue(show_body['subnetpool']['is_default'])
+
+ @decorators.idempotent_id('39687561-7a37-47b8-91ce-f9143ae26969')
+ @test.requires_ext(extension='default-subnetpools', service='network')
+ def test_convert_default_subnetpool_to_non_default(self):
+ """
+ Test uses administrative credentials to create a default subnetpool,
+ using the is_default=True.
+ Then it will update "is_default" attribute to False.
+ """
+ created_subnetpool = self._create_subnetpool(is_admin=True,
+ is_default=True)
+ subnetpool_id = created_subnetpool['id']
+ self.assertTrue(created_subnetpool['is_default'])
+ subnetpool_data = {'is_default': False}
+ self.admin_client.update_subnetpool(subnetpool_id,
+ **subnetpool_data)
+ show_body = self.client.show_subnetpool(subnetpool_id)
+ self.assertFalse(show_body['subnetpool']['is_default'])
+
class SubnetPoolsTestV6(SubnetPoolsTest):
diff --git a/neutron/tests/tempest/api/test_subnetpools_negative.py b/neutron/tests/tempest/api/test_subnetpools_negative.py
index c973d76..017eca4 100644
--- a/neutron/tests/tempest/api/test_subnetpools_negative.py
+++ b/neutron/tests/tempest/api/test_subnetpools_negative.py
@@ -60,6 +60,14 @@
is_admin=False, shared=True)
@test.attr(type='negative')
+ @decorators.idempotent_id('6ae09d8f-95be-40ed-b1cf-8b850d45bab5')
+ @test.requires_ext(extension='default-subnetpools', service='network')
+ def test_tenant_create_default_subnetpool(self):
+ # 'default' subnetpool can only be created by admin.
+ self.assertRaises(lib_exc.Forbidden, self._create_subnetpool,
+ is_admin=False, is_default=True)
+
+ @test.attr(type='negative')
@decorators.idempotent_id('4be84d30-60ca-4bd3-8512-db5b36ce1378')
def test_update_non_existent_subnetpool(self):
non_exist_id = data_utils.rand_name('subnetpool')
diff --git a/neutron/tests/tempest/scenario/base.py b/neutron/tests/tempest/scenario/base.py
index 5ccbd1c..ac18ab5 100644
--- a/neutron/tests/tempest/scenario/base.py
+++ b/neutron/tests/tempest/scenario/base.py
@@ -192,10 +192,10 @@
waiters.wait_for_server_status(self.os_primary.servers_client,
self.server['server']['id'],
constants.SERVER_STATUS_ACTIVE)
- port = self.client.list_ports(network_id=self.network['id'],
- device_id=self.server[
- 'server']['id'])['ports'][0]
- self.fip = self.create_and_associate_floatingip(port['id'])
+ self.port = self.client.list_ports(network_id=self.network['id'],
+ device_id=self.server[
+ 'server']['id'])['ports'][0]
+ self.fip = self.create_and_associate_floatingip(self.port['id'])
def check_connectivity(self, host, ssh_user, ssh_key, servers=None):
ssh_client = ssh.Client(host, ssh_user, pkey=ssh_key)
diff --git a/neutron/tests/tempest/scenario/test_portsecurity.py b/neutron/tests/tempest/scenario/test_portsecurity.py
new file mode 100644
index 0000000..76b23a4
--- /dev/null
+++ b/neutron/tests/tempest/scenario/test_portsecurity.py
@@ -0,0 +1,53 @@
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from tempest.lib import decorators
+
+from neutron.tests.tempest import config
+from neutron.tests.tempest.scenario import base
+
+CONF = config.CONF
+
+
+class PortSecurityTest(base.BaseTempestTestCase):
+ credentials = ['primary']
+ required_extensions = ['port-security']
+
+ @decorators.idempotent_id('61ab176e-d48b-42b7-b38a-1ba571ecc033')
+ def test_port_security_removed_added(self):
+ """Test connection works after port security has been removed
+
+ Initial test that vm is accessible. Then port security is removed,
+ checked connectivity. Port security is added back and checked
+ connectivity again.
+ """
+ self.setup_network_and_server()
+ self.check_connectivity(self.fip['floating_ip_address'],
+ CONF.validation.image_ssh_user,
+ self.keypair['private_key'])
+ sec_group_id = self.security_groups[0]['id']
+
+ self.port = self.update_port(port=self.port,
+ port_security_enabled=False,
+ security_groups=[])
+ self.check_connectivity(self.fip['floating_ip_address'],
+ CONF.validation.image_ssh_user,
+ self.keypair['private_key'])
+
+ self.port = self.update_port(port=self.port,
+ port_security_enabled=True,
+ security_groups=[sec_group_id])
+ self.check_connectivity(self.fip['floating_ip_address'],
+ CONF.validation.image_ssh_user,
+ self.keypair['private_key'])
diff --git a/neutron/tests/tempest/services/network/json/network_client.py b/neutron/tests/tempest/services/network/json/network_client.py
index 7c311f4..39f2c38 100644
--- a/neutron/tests/tempest/services/network/json/network_client.py
+++ b/neutron/tests/tempest/services/network/json/network_client.py
@@ -148,10 +148,11 @@
def _updater(self, resource_name):
def _update(res_id, **kwargs):
+ headers = kwargs.pop('headers', {})
plural = self.pluralize(resource_name)
uri = '%s/%s' % (self.get_uri(plural), res_id)
post_data = self.serialize({resource_name: kwargs})
- resp, body = self.put(uri, post_data)
+ resp, body = self.put(uri, post_data, headers=headers)
body = self.deserialize_single(body)
self.expected_success(200, resp.status)
return service_client.ResponseBody(resp, body)