Merge "Check target_tenant when create rbac policy"
diff --git a/neutron/tests/tempest/api/test_auto_allocated_topology.py b/neutron/tests/tempest/api/test_auto_allocated_topology.py
index afdfe8c..65c3057 100644
--- a/neutron/tests/tempest/api/test_auto_allocated_topology.py
+++ b/neutron/tests/tempest/api/test_auto_allocated_topology.py
@@ -22,11 +22,14 @@
class TestAutoAllocatedTopology(base.BaseAdminNetworkTest):
"""
- NOTE: This test may eventually migrate to Tempest.
-
- Tests the Get-Me-A-Network operation in the Neutron API
+ Tests the Get-Me-A-Network operations in the Neutron API
using the REST client for Neutron.
"""
+ # NOTE(armax): this is a precaution to avoid interference
+ # from other tests exercising this extension. So long as
+ # all tests are added under TestAutoAllocatedTopology,
+ # nothing bad should happen.
+ force_tenant_isolation = True
@classmethod
@test.requires_ext(extension="auto-allocated-topology", service="network")
@@ -101,3 +104,14 @@
# After the initial GET, the API should be idempotent
self.assertEqual(network_id1, network_id2)
self.assertEqual(resources_after1, resources_after2)
+
+    @test.idempotent_id('aabc0b02-cee4-11e5-9f3c-091127605a2b')
+    def test_delete_allocated_net_topology_as_tenant(self):
+        """Get the auto-allocated topology, delete it, expect no leftovers.
+
+        Resource counts come from ``_count_topology_resources`` and must be
+        ``(0, 0, 0)`` both before the GET and after the DELETE.
+        """
+        self.assertEqual((0, 0, 0), self._count_topology_resources())
+        response = self.client.get_auto_allocated_topology()
+        self.assertIsNotNone(response['auto_allocated_topology'])
+        # No tenant is passed, so the client's default argument is used.
+        self.client.delete_auto_allocated_topology()
+        self.assertEqual((0, 0, 0), self._count_topology_resources())
diff --git a/neutron/tests/tempest/api/test_floating_ips.py b/neutron/tests/tempest/api/test_floating_ips.py
index 570ca54..8ccdd44 100644
--- a/neutron/tests/tempest/api/test_floating_ips.py
+++ b/neutron/tests/tempest/api/test_floating_ips.py
@@ -41,6 +41,20 @@
for i in range(2):
cls.create_port(cls.network)
+    @test.idempotent_id('f6a0fb6c-cb64-4b81-b0d5-f41d8f69d22d')
+    def test_blank_update_clears_association(self):
+        """Check that an update with an empty body clears the association.
+
+        Originally the floating IP had no attributes other than its
+        association, so an update with an empty body was the signal to
+        clear it. This test ensures that behavior is maintained.
+        """
+        # NOTE(review): the floating IP is created through the raw client
+        # and never deleted here, and description='d1' is sent without the
+        # standard-attr-description guard the next test uses -- confirm
+        # both are intended.
+        port_id = self.ports[0]['id']
+        fip = self.client.create_floatingip(
+            floating_network_id=self.ext_net_id,
+            port_id=port_id,
+            description='d1')['floatingip']
+        self.assertEqual(port_id, fip['port_id'])
+        # Update with no attributes at all: the port must be released.
+        fip_after = self.client.update_floatingip(fip['id'])['floatingip']
+        self.assertFalse(fip_after['port_id'])
+
@test.idempotent_id('c72c1c0c-2193-4aca-eeee-b1442641ffff')
@test.requires_ext(extension="standard-attr-description",
service="network")
diff --git a/neutron/tests/tempest/api/test_networks.py b/neutron/tests/tempest/api/test_networks.py
index 16fe81b..d3967ea 100644
--- a/neutron/tests/tempest/api/test_networks.py
+++ b/neutron/tests/tempest/api/test_networks.py
@@ -95,7 +95,7 @@
resource = 'network'
- list_kwargs = {'shared': False}
+ list_kwargs = {'shared': False, 'router:external': False}
@classmethod
def resource_setup(cls):
diff --git a/neutron/tests/tempest/api/test_qos.py b/neutron/tests/tempest/api/test_qos.py
index 313715d..bece71c 100644
--- a/neutron/tests/tempest/api/test_qos.py
+++ b/neutron/tests/tempest/api/test_qos.py
@@ -75,6 +75,28 @@
self.assertTrue(retrieved_policy['shared'])
self.assertEqual([], retrieved_policy['rules'])
+    @test.idempotent_id('6e880e0f-bbfc-4e54-87c6-680f90e1b618')
+    def test_policy_update_forbidden_for_regular_tenants_own_policy(self):
+        """Updating a policy of self.client's own tenant raises Forbidden."""
+        own_policy = self.create_qos_policy(
+            name='test-policy', description='', shared=False,
+            tenant_id=self.client.tenant_id)
+        # The update goes through self.client, not the creating helper.
+        self.assertRaises(exceptions.Forbidden,
+                          self.client.update_qos_policy,
+                          own_policy['id'], description='test policy')
+
+    @test.idempotent_id('4ecfd7e7-47b6-4702-be38-be9235901a87')
+    def test_policy_update_forbidden_for_regular_tenants_foreign_policy(self):
+        """Updating a policy of self.admin_client's tenant raises NotFound."""
+        foreign_policy = self.create_qos_policy(
+            name='test-policy', description='', shared=False,
+            tenant_id=self.admin_client.tenant_id)
+        # The policy is unshared and owned by another tenant than the caller.
+        self.assertRaises(exceptions.NotFound,
+                          self.client.update_qos_policy,
+                          foreign_policy['id'], description='test policy')
+
@test.idempotent_id('ee263db4-009a-4641-83e5-d0e83506ba4c')
def test_shared_policy_update(self):
policy = self.create_qos_policy(name='test-policy',
@@ -426,6 +448,34 @@
self.client.create_bandwidth_limit_rule,
'policy', 1, 2)
+    @test.idempotent_id('1bfc55d9-6fd8-4293-ab3a-b1d69bf7cd2e')
+    def test_rule_update_forbidden_for_regular_tenants_own_policy(self):
+        """Updating a rule on self.client's own policy must raise NotFound."""
+        # NOTE(review): the name says "forbidden" yet NotFound is asserted;
+        # confirm which error the API is meant to return here.
+        own_policy = self.create_qos_policy(
+            name='test-policy', description='test policy', shared=False,
+            tenant_id=self.client.tenant_id)
+        bw_rule = self.create_qos_bandwidth_limit_rule(
+            policy_id=own_policy['id'], max_kbps=1, max_burst_kbps=1)
+        self.assertRaises(
+            exceptions.NotFound, self.client.update_bandwidth_limit_rule,
+            own_policy['id'], bw_rule['id'], max_kbps=2, max_burst_kbps=4)
+
+    @test.idempotent_id('9a607936-4b6f-4c2f-ad21-bd5b3d4fc91f')
+    def test_rule_update_forbidden_for_regular_tenants_foreign_policy(self):
+        """Updating a rule on another tenant's policy must raise NotFound."""
+        # The policy is created unshared under self.admin_client's tenant.
+        foreign_policy = self.create_qos_policy(
+            name='test-policy', description='test policy', shared=False,
+            tenant_id=self.admin_client.tenant_id)
+        bw_rule = self.create_qos_bandwidth_limit_rule(
+            policy_id=foreign_policy['id'], max_kbps=1, max_burst_kbps=1)
+        # The update itself is attempted through self.client.
+        self.assertRaises(
+            exceptions.NotFound, self.client.update_bandwidth_limit_rule,
+            foreign_policy['id'], bw_rule['id'], max_kbps=2, max_burst_kbps=4)
+
@test.idempotent_id('ce0bd0c2-54d9-4e29-85f1-cfb36ac3ebe2')
def test_get_rules_by_policy(self):
policy1 = self.create_qos_policy(name='test-policy1',
diff --git a/neutron/tests/tempest/services/network/json/network_client.py b/neutron/tests/tempest/services/network/json/network_client.py
index 9c8cb05..54a1fc3 100644
--- a/neutron/tests/tempest/services/network/json/network_client.py
+++ b/neutron/tests/tempest/services/network/json/network_client.py
@@ -738,6 +738,12 @@
body = jsonutils.loads(body)
return service_client.ResponseBody(resp, body)
+    def delete_auto_allocated_topology(self, tenant_id=None):
+        """Send DELETE to ``<uri_prefix>/auto-allocated-topology/<tenant_id>``.
+
+        :param tenant_id: value interpolated as the last URI segment. The
+            default ``None`` is formatted as the text ``None``.
+        :returns: ``service_client.ResponseBody`` wrapping the response and
+            body returned by ``self.delete``.
+        """
+        # NOTE(review): presumably the server maps a "None" segment to the
+        # calling tenant -- confirm before changing the default.
+        target = '%s/auto-allocated-topology/%s' % (self.uri_prefix, tenant_id)
+        response, payload = self.delete(target)
+        # 204 is the only status accepted as success for this call.
+        self.expected_success(204, response.status)
+        return service_client.ResponseBody(response, payload)
+
def create_security_group_rule(self, direction, security_group_id,
**kwargs):
post_body = {'security_group_rule': kwargs}