blob: f232d09874e97a82b2b7446205c3be0f8d3c47ce [file] [log] [blame]
Daniel Mellado3c0aeab2016-01-29 11:30:25 +00001# Copyright 2015 Red Hat, Inc.
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may
4# not use this file except in compliance with the License. You may obtain
5# a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations
13# under the License.
14
15from tempest.lib.common.utils import data_utils
16from tempest.lib import exceptions
17from tempest import test
18
19import testtools
20
21from neutron.services.qos import qos_consts
22from neutron.tests.tempest.api import base
23
24
25class QosTestJSON(base.BaseAdminNetworkTest):
26 @classmethod
27 @test.requires_ext(extension="qos", service="network")
28 def resource_setup(cls):
29 super(QosTestJSON, cls).resource_setup()
30
Daniel Mellado3c0aeab2016-01-29 11:30:25 +000031 @test.idempotent_id('108fbdf7-3463-4e47-9871-d07f3dcf5bbb')
32 def test_create_policy(self):
33 policy = self.create_qos_policy(name='test-policy',
34 description='test policy desc1',
35 shared=False)
36
37 # Test 'show policy'
38 retrieved_policy = self.admin_client.show_qos_policy(policy['id'])
39 retrieved_policy = retrieved_policy['policy']
40 self.assertEqual('test-policy', retrieved_policy['name'])
41 self.assertEqual('test policy desc1', retrieved_policy['description'])
42 self.assertFalse(retrieved_policy['shared'])
43
44 # Test 'list policies'
45 policies = self.admin_client.list_qos_policies()['policies']
46 policies_ids = [p['id'] for p in policies]
47 self.assertIn(policy['id'], policies_ids)
48
Daniel Mellado3c0aeab2016-01-29 11:30:25 +000049 @test.idempotent_id('f8d20e92-f06d-4805-b54f-230f77715815')
50 def test_list_policy_filter_by_name(self):
51 self.create_qos_policy(name='test', description='test policy',
52 shared=False)
53 self.create_qos_policy(name='test2', description='test policy',
54 shared=False)
55
56 policies = (self.admin_client.
57 list_qos_policies(name='test')['policies'])
58 self.assertEqual(1, len(policies))
59
60 retrieved_policy = policies[0]
61 self.assertEqual('test', retrieved_policy['name'])
62
Daniel Mellado3c0aeab2016-01-29 11:30:25 +000063 @test.idempotent_id('8e88a54b-f0b2-4b7d-b061-a15d93c2c7d6')
64 def test_policy_update(self):
65 policy = self.create_qos_policy(name='test-policy',
66 description='',
67 shared=False)
68 self.admin_client.update_qos_policy(policy['id'],
69 description='test policy desc2',
70 shared=True)
71
72 retrieved_policy = self.admin_client.show_qos_policy(policy['id'])
73 retrieved_policy = retrieved_policy['policy']
74 self.assertEqual('test policy desc2', retrieved_policy['description'])
75 self.assertTrue(retrieved_policy['shared'])
76 self.assertEqual([], retrieved_policy['rules'])
77
Daniel Mellado3c0aeab2016-01-29 11:30:25 +000078 @test.idempotent_id('1cb42653-54bd-4a9a-b888-c55e18199201')
79 def test_delete_policy(self):
80 policy = self.admin_client.create_qos_policy(
81 'test-policy', 'desc', True)['policy']
82
83 retrieved_policy = self.admin_client.show_qos_policy(policy['id'])
84 retrieved_policy = retrieved_policy['policy']
85 self.assertEqual('test-policy', retrieved_policy['name'])
86
87 self.admin_client.delete_qos_policy(policy['id'])
88 self.assertRaises(exceptions.NotFound,
89 self.admin_client.show_qos_policy, policy['id'])
90
Daniel Mellado3c0aeab2016-01-29 11:30:25 +000091 @test.idempotent_id('cf776f77-8d3d-49f2-8572-12d6a1557224')
92 def test_list_admin_rule_types(self):
93 self._test_list_rule_types(self.admin_client)
94
Daniel Mellado3c0aeab2016-01-29 11:30:25 +000095 @test.idempotent_id('49c8ea35-83a9-453a-bd23-239cf3b13929')
96 def test_list_regular_rule_types(self):
97 self._test_list_rule_types(self.client)
98
99 def _test_list_rule_types(self, client):
100 # List supported rule types
101 # TODO(QoS): since in gate we run both ovs and linuxbridge ml2 drivers,
102 # and since Linux Bridge ml2 driver does not have QoS support yet, ml2
103 # plugin reports no rule types are supported. Once linuxbridge will
104 # receive support for QoS, the list of expected rule types will change.
105 #
106 # In theory, we could make the test conditional on which ml2 drivers
107 # are enabled in gate (or more specifically, on which supported qos
108 # rules are claimed by core plugin), but that option doesn't seem to be
109 # available thru tempest.lib framework
110 expected_rule_types = []
111 expected_rule_details = ['type']
112
113 rule_types = client.list_qos_rule_types()
114 actual_list_rule_types = rule_types['rule_types']
115 actual_rule_types = [rule['type'] for rule in actual_list_rule_types]
116
117 # Verify that only required fields present in rule details
118 for rule in actual_list_rule_types:
119 self.assertEqual(tuple(rule.keys()), tuple(expected_rule_details))
120
121 # Verify if expected rules are present in the actual rules list
122 for rule in expected_rule_types:
123 self.assertIn(rule, actual_rule_types)
124
125 def _disassociate_network(self, client, network_id):
126 client.update_network(network_id, qos_policy_id=None)
127 updated_network = self.admin_client.show_network(network_id)
128 self.assertIsNone(updated_network['network']['qos_policy_id'])
129
Daniel Mellado3c0aeab2016-01-29 11:30:25 +0000130 @test.idempotent_id('65b9ef75-1911-406a-bbdb-ca1d68d528b0')
131 def test_policy_association_with_admin_network(self):
132 policy = self.create_qos_policy(name='test-policy',
133 description='test policy',
134 shared=False)
135 network = self.create_shared_network('test network',
136 qos_policy_id=policy['id'])
137
138 retrieved_network = self.admin_client.show_network(network['id'])
139 self.assertEqual(
140 policy['id'], retrieved_network['network']['qos_policy_id'])
141
142 self._disassociate_network(self.admin_client, network['id'])
143
Daniel Mellado3c0aeab2016-01-29 11:30:25 +0000144 @test.idempotent_id('1738de5d-0476-4163-9022-5e1b548c208e')
145 def test_policy_association_with_tenant_network(self):
146 policy = self.create_qos_policy(name='test-policy',
147 description='test policy',
148 shared=True)
149 network = self.create_network('test network',
150 qos_policy_id=policy['id'])
151
152 retrieved_network = self.admin_client.show_network(network['id'])
153 self.assertEqual(
154 policy['id'], retrieved_network['network']['qos_policy_id'])
155
156 self._disassociate_network(self.client, network['id'])
157
Daniel Mellado3c0aeab2016-01-29 11:30:25 +0000158 @test.idempotent_id('9efe63d0-836f-4cc2-b00c-468e63aa614e')
159 def test_policy_association_with_network_nonexistent_policy(self):
160 self.assertRaises(
161 exceptions.NotFound,
162 self.create_network,
163 'test network',
164 qos_policy_id='9efe63d0-836f-4cc2-b00c-468e63aa614e')
165
Daniel Mellado3c0aeab2016-01-29 11:30:25 +0000166 @test.idempotent_id('1aa55a79-324f-47d9-a076-894a8fc2448b')
167 def test_policy_association_with_network_non_shared_policy(self):
168 policy = self.create_qos_policy(name='test-policy',
169 description='test policy',
170 shared=False)
171 self.assertRaises(
172 exceptions.NotFound,
173 self.create_network,
174 'test network', qos_policy_id=policy['id'])
175
Daniel Mellado3c0aeab2016-01-29 11:30:25 +0000176 @test.idempotent_id('09a9392c-1359-4cbb-989f-fb768e5834a8')
177 def test_policy_update_association_with_admin_network(self):
178 policy = self.create_qos_policy(name='test-policy',
179 description='test policy',
180 shared=False)
181 network = self.create_shared_network('test network')
182 retrieved_network = self.admin_client.show_network(network['id'])
183 self.assertIsNone(retrieved_network['network']['qos_policy_id'])
184
185 self.admin_client.update_network(network['id'],
186 qos_policy_id=policy['id'])
187 retrieved_network = self.admin_client.show_network(network['id'])
188 self.assertEqual(
189 policy['id'], retrieved_network['network']['qos_policy_id'])
190
191 self._disassociate_network(self.admin_client, network['id'])
192
193 def _disassociate_port(self, port_id):
194 self.client.update_port(port_id, qos_policy_id=None)
195 updated_port = self.admin_client.show_port(port_id)
196 self.assertIsNone(updated_port['port']['qos_policy_id'])
197
Daniel Mellado3c0aeab2016-01-29 11:30:25 +0000198 @test.idempotent_id('98fcd95e-84cf-4746-860e-44692e674f2e')
199 def test_policy_association_with_port_shared_policy(self):
200 policy = self.create_qos_policy(name='test-policy',
201 description='test policy',
202 shared=True)
203 network = self.create_shared_network('test network')
204 port = self.create_port(network, qos_policy_id=policy['id'])
205
206 retrieved_port = self.admin_client.show_port(port['id'])
207 self.assertEqual(
208 policy['id'], retrieved_port['port']['qos_policy_id'])
209
210 self._disassociate_port(port['id'])
211
Daniel Mellado3c0aeab2016-01-29 11:30:25 +0000212 @test.idempotent_id('49e02f5a-e1dd-41d5-9855-cfa37f2d195e')
213 def test_policy_association_with_port_nonexistent_policy(self):
214 network = self.create_shared_network('test network')
215 self.assertRaises(
216 exceptions.NotFound,
217 self.create_port,
218 network,
219 qos_policy_id='49e02f5a-e1dd-41d5-9855-cfa37f2d195e')
220
Daniel Mellado3c0aeab2016-01-29 11:30:25 +0000221 @test.idempotent_id('f53d961c-9fe5-4422-8b66-7add972c6031')
222 def test_policy_association_with_port_non_shared_policy(self):
223 policy = self.create_qos_policy(name='test-policy',
224 description='test policy',
225 shared=False)
226 network = self.create_shared_network('test network')
227 self.assertRaises(
228 exceptions.NotFound,
229 self.create_port,
230 network, qos_policy_id=policy['id'])
231
Daniel Mellado3c0aeab2016-01-29 11:30:25 +0000232 @test.idempotent_id('f8163237-fba9-4db5-9526-bad6d2343c76')
233 def test_policy_update_association_with_port_shared_policy(self):
234 policy = self.create_qos_policy(name='test-policy',
235 description='test policy',
236 shared=True)
237 network = self.create_shared_network('test network')
238 port = self.create_port(network)
239 retrieved_port = self.admin_client.show_port(port['id'])
240 self.assertIsNone(retrieved_port['port']['qos_policy_id'])
241
242 self.client.update_port(port['id'], qos_policy_id=policy['id'])
243 retrieved_port = self.admin_client.show_port(port['id'])
244 self.assertEqual(
245 policy['id'], retrieved_port['port']['qos_policy_id'])
246
247 self._disassociate_port(port['id'])
248
Daniel Mellado3c0aeab2016-01-29 11:30:25 +0000249 @test.idempotent_id('18163237-8ba9-4db5-9525-bad6d2343c75')
250 def test_delete_not_allowed_if_policy_in_use_by_network(self):
251 policy = self.create_qos_policy(name='test-policy',
252 description='test policy',
253 shared=True)
254 network = self.create_shared_network(
255 'test network', qos_policy_id=policy['id'])
256 self.assertRaises(
257 exceptions.Conflict,
258 self.admin_client.delete_qos_policy, policy['id'])
259
260 self._disassociate_network(self.admin_client, network['id'])
261 self.admin_client.delete_qos_policy(policy['id'])
262
Daniel Mellado3c0aeab2016-01-29 11:30:25 +0000263 @test.idempotent_id('24153230-84a9-4dd5-9525-bad6d2343c75')
264 def test_delete_not_allowed_if_policy_in_use_by_port(self):
265 policy = self.create_qos_policy(name='test-policy',
266 description='test policy',
267 shared=True)
268 network = self.create_shared_network('test network')
269 port = self.create_port(network, qos_policy_id=policy['id'])
270 self.assertRaises(
271 exceptions.Conflict,
272 self.admin_client.delete_qos_policy, policy['id'])
273
274 self._disassociate_port(port['id'])
275 self.admin_client.delete_qos_policy(policy['id'])
276
Daniel Mellado3c0aeab2016-01-29 11:30:25 +0000277 @test.idempotent_id('a2a5849b-dd06-4b18-9664-0b6828a1fc27')
278 def test_qos_policy_delete_with_rules(self):
279 policy = self.create_qos_policy(name='test-policy',
280 description='test policy',
281 shared=False)
282 self.admin_client.create_bandwidth_limit_rule(
283 policy['id'], 200, 1337)['bandwidth_limit_rule']
284
285 self.admin_client.delete_qos_policy(policy['id'])
286
287 with testtools.ExpectedException(exceptions.NotFound):
288 self.admin_client.show_qos_policy(policy['id'])
289
290
291class QosBandwidthLimitRuleTestJSON(base.BaseAdminNetworkTest):
292 @classmethod
293 @test.requires_ext(extension="qos", service="network")
294 def resource_setup(cls):
295 super(QosBandwidthLimitRuleTestJSON, cls).resource_setup()
296
Daniel Mellado3c0aeab2016-01-29 11:30:25 +0000297 @test.idempotent_id('8a59b00b-3e9c-4787-92f8-93a5cdf5e378')
298 def test_rule_create(self):
299 policy = self.create_qos_policy(name='test-policy',
300 description='test policy',
301 shared=False)
302 rule = self.create_qos_bandwidth_limit_rule(policy_id=policy['id'],
303 max_kbps=200,
304 max_burst_kbps=1337)
305
306 # Test 'show rule'
307 retrieved_rule = self.admin_client.show_bandwidth_limit_rule(
308 policy['id'], rule['id'])
309 retrieved_rule = retrieved_rule['bandwidth_limit_rule']
310 self.assertEqual(rule['id'], retrieved_rule['id'])
311 self.assertEqual(200, retrieved_rule['max_kbps'])
312 self.assertEqual(1337, retrieved_rule['max_burst_kbps'])
313
314 # Test 'list rules'
315 rules = self.admin_client.list_bandwidth_limit_rules(policy['id'])
316 rules = rules['bandwidth_limit_rules']
317 rules_ids = [r['id'] for r in rules]
318 self.assertIn(rule['id'], rules_ids)
319
320 # Test 'show policy'
321 retrieved_policy = self.admin_client.show_qos_policy(policy['id'])
322 policy_rules = retrieved_policy['policy']['rules']
323 self.assertEqual(1, len(policy_rules))
324 self.assertEqual(rule['id'], policy_rules[0]['id'])
325 self.assertEqual(qos_consts.RULE_TYPE_BANDWIDTH_LIMIT,
326 policy_rules[0]['type'])
327
Daniel Mellado3c0aeab2016-01-29 11:30:25 +0000328 @test.idempotent_id('8a59b00b-ab01-4787-92f8-93a5cdf5e378')
329 def test_rule_create_fail_for_the_same_type(self):
330 policy = self.create_qos_policy(name='test-policy',
331 description='test policy',
332 shared=False)
333 self.create_qos_bandwidth_limit_rule(policy_id=policy['id'],
334 max_kbps=200,
335 max_burst_kbps=1337)
336
337 self.assertRaises(exceptions.Conflict,
338 self.create_qos_bandwidth_limit_rule,
339 policy_id=policy['id'],
340 max_kbps=201, max_burst_kbps=1338)
341
Daniel Mellado3c0aeab2016-01-29 11:30:25 +0000342 @test.idempotent_id('149a6988-2568-47d2-931e-2dbc858943b3')
343 def test_rule_update(self):
344 policy = self.create_qos_policy(name='test-policy',
345 description='test policy',
346 shared=False)
347 rule = self.create_qos_bandwidth_limit_rule(policy_id=policy['id'],
348 max_kbps=1,
349 max_burst_kbps=1)
350
351 self.admin_client.update_bandwidth_limit_rule(policy['id'],
352 rule['id'],
353 max_kbps=200,
354 max_burst_kbps=1337)
355
356 retrieved_policy = self.admin_client.show_bandwidth_limit_rule(
357 policy['id'], rule['id'])
358 retrieved_policy = retrieved_policy['bandwidth_limit_rule']
359 self.assertEqual(200, retrieved_policy['max_kbps'])
360 self.assertEqual(1337, retrieved_policy['max_burst_kbps'])
361
Daniel Mellado3c0aeab2016-01-29 11:30:25 +0000362 @test.idempotent_id('67ee6efd-7b33-4a68-927d-275b4f8ba958')
363 def test_rule_delete(self):
364 policy = self.create_qos_policy(name='test-policy',
365 description='test policy',
366 shared=False)
367 rule = self.admin_client.create_bandwidth_limit_rule(
368 policy['id'], 200, 1337)['bandwidth_limit_rule']
369
370 retrieved_policy = self.admin_client.show_bandwidth_limit_rule(
371 policy['id'], rule['id'])
372 retrieved_policy = retrieved_policy['bandwidth_limit_rule']
373 self.assertEqual(rule['id'], retrieved_policy['id'])
374
375 self.admin_client.delete_bandwidth_limit_rule(policy['id'], rule['id'])
376 self.assertRaises(exceptions.NotFound,
377 self.admin_client.show_bandwidth_limit_rule,
378 policy['id'], rule['id'])
379
Daniel Mellado3c0aeab2016-01-29 11:30:25 +0000380 @test.idempotent_id('f211222c-5808-46cb-a961-983bbab6b852')
381 def test_rule_create_rule_nonexistent_policy(self):
382 self.assertRaises(
383 exceptions.NotFound,
384 self.create_qos_bandwidth_limit_rule,
385 'policy', 200, 1337)
386
Daniel Mellado3c0aeab2016-01-29 11:30:25 +0000387 @test.idempotent_id('eed8e2a6-22da-421b-89b9-935a2c1a1b50')
388 def test_policy_create_forbidden_for_regular_tenants(self):
389 self.assertRaises(
390 exceptions.Forbidden,
391 self.client.create_qos_policy,
392 'test-policy', 'test policy', False)
393
Daniel Mellado3c0aeab2016-01-29 11:30:25 +0000394 @test.idempotent_id('a4a2e7ad-786f-4927-a85a-e545a93bd274')
395 def test_rule_create_forbidden_for_regular_tenants(self):
396 self.assertRaises(
397 exceptions.Forbidden,
398 self.client.create_bandwidth_limit_rule,
399 'policy', 1, 2)
400
Daniel Mellado3c0aeab2016-01-29 11:30:25 +0000401 @test.idempotent_id('ce0bd0c2-54d9-4e29-85f1-cfb36ac3ebe2')
402 def test_get_rules_by_policy(self):
403 policy1 = self.create_qos_policy(name='test-policy1',
404 description='test policy1',
405 shared=False)
406 rule1 = self.create_qos_bandwidth_limit_rule(policy_id=policy1['id'],
407 max_kbps=200,
408 max_burst_kbps=1337)
409
410 policy2 = self.create_qos_policy(name='test-policy2',
411 description='test policy2',
412 shared=False)
413 rule2 = self.create_qos_bandwidth_limit_rule(policy_id=policy2['id'],
414 max_kbps=5000,
415 max_burst_kbps=2523)
416
417 # Test 'list rules'
418 rules = self.admin_client.list_bandwidth_limit_rules(policy1['id'])
419 rules = rules['bandwidth_limit_rules']
420 rules_ids = [r['id'] for r in rules]
421 self.assertIn(rule1['id'], rules_ids)
422 self.assertNotIn(rule2['id'], rules_ids)
423
424
425class RbacSharedQosPoliciesTest(base.BaseAdminNetworkTest):
426
427 force_tenant_isolation = True
428 credentials = ['primary', 'alt', 'admin']
429
430 @classmethod
431 @test.requires_ext(extension="qos", service="network")
432 def resource_setup(cls):
433 super(RbacSharedQosPoliciesTest, cls).resource_setup()
434 cls.client2 = cls.alt_manager.network_client
435
436 def _create_qos_policy(self, tenant_id=None):
437 args = {'name': data_utils.rand_name('test-policy'),
438 'description': 'test policy',
439 'shared': False,
440 'tenant_id': tenant_id}
441 qos_policy = self.admin_client.create_qos_policy(**args)['policy']
442 self.addCleanup(self.admin_client.delete_qos_policy, qos_policy['id'])
443
444 return qos_policy
445
446 def _make_admin_policy_shared_to_tenant_id(self, tenant_id):
447 policy = self._create_qos_policy()
448 rbac_policy = self.admin_client.create_rbac_policy(
449 object_type='qos_policy',
450 object_id=policy['id'],
451 action='access_as_shared',
452 target_tenant=tenant_id,
453 )['rbac_policy']
454
455 return {'policy': policy, 'rbac_policy': rbac_policy}
456
457 def _create_network(self, qos_policy_id, client, should_cleanup=True):
458 net = client.create_network(
459 name=data_utils.rand_name('test-network'),
460 qos_policy_id=qos_policy_id)['network']
461 if should_cleanup:
462 self.addCleanup(client.delete_network, net['id'])
463
464 return net
465
466 @test.idempotent_id('b9dcf582-d3b3-11e5-950a-54ee756c66df')
467 def test_policy_sharing_with_wildcard(self):
468 qos_pol = self.create_qos_policy(
469 name=data_utils.rand_name('test-policy'),
470 description='test-shared-policy', shared=False)
471 self.assertNotIn(qos_pol, self.client2.list_qos_policies()['policies'])
472
473 # test update shared False -> True
474 self.admin_client.update_qos_policy(qos_pol['id'], shared=True)
475 qos_pol['shared'] = True
476 self.client2.show_qos_policy(qos_pol['id'])
477 rbac_pol = {'target_tenant': '*',
478 'tenant_id': self.admin_client.tenant_id,
479 'object_type': 'qos_policy',
480 'object_id': qos_pol['id'],
481 'action': 'access_as_shared'}
482
483 rbac_policies = self.admin_client.list_rbac_policies()['rbac_policies']
484 rbac_policies = [r for r in rbac_policies if r.pop('id')]
485 self.assertIn(rbac_pol, rbac_policies)
486
487 # update shared True -> False should fail because the policy is bound
488 # to a network
489 net = self._create_network(qos_pol['id'], self.admin_client, False)
490 with testtools.ExpectedException(exceptions.Conflict):
491 self.admin_client.update_qos_policy(qos_pol['id'], shared=False)
492
493 # delete the network, and update shared True -> False should pass now
494 self.admin_client.delete_network(net['id'])
495 self.admin_client.update_qos_policy(qos_pol['id'], shared=False)
496 qos_pol['shared'] = False
497 self.assertNotIn(qos_pol, self.client2.list_qos_policies()['policies'])
498
499 def _create_net_bound_qos_rbacs(self):
500 res = self._make_admin_policy_shared_to_tenant_id(
501 self.client.tenant_id)
502 qos_policy, rbac_for_client_tenant = res['policy'], res['rbac_policy']
503
504 # add a wildcard rbac rule - now the policy globally shared
505 rbac_wildcard = self.admin_client.create_rbac_policy(
506 object_type='qos_policy',
507 object_id=qos_policy['id'],
508 action='access_as_shared',
509 target_tenant='*',
510 )['rbac_policy']
511
512 # tenant1 now uses qos policy for net
513 self._create_network(qos_policy['id'], self.client)
514
515 return rbac_for_client_tenant, rbac_wildcard
516
517 @test.idempotent_id('328b1f70-d424-11e5-a57f-54ee756c66df')
518 def test_net_bound_shared_policy_wildcard_and_tenant_id_wild_remove(self):
519 client_rbac, wildcard_rbac = self._create_net_bound_qos_rbacs()
520 # globally unshare the qos-policy, the specific share should remain
521 self.admin_client.delete_rbac_policy(wildcard_rbac['id'])
522 self.client.list_rbac_policies(id=client_rbac['id'])
523
Miguel Angel Ajocda3f072016-04-01 15:24:54 +0200524 @test.idempotent_id('1997b00c-0c75-4e43-8ce2-999f9fa555ee')
Daniel Mellado3c0aeab2016-01-29 11:30:25 +0000525 def test_net_bound_shared_policy_wildcard_and_tenant_id_wild_remains(self):
526 client_rbac, wildcard_rbac = self._create_net_bound_qos_rbacs()
527 # remove client_rbac policy the wildcard share should remain
528 self.admin_client.delete_rbac_policy(client_rbac['id'])
529 self.client.list_rbac_policies(id=wildcard_rbac['id'])
530
531 @test.idempotent_id('2ace9adc-da6e-11e5-aafe-54ee756c66df')
532 def test_policy_sharing_with_wildcard_and_tenant_id(self):
533 res = self._make_admin_policy_shared_to_tenant_id(
534 self.client.tenant_id)
535 qos_policy, rbac = res['policy'], res['rbac_policy']
536 qos_pol = self.client.show_qos_policy(qos_policy['id'])['policy']
537 self.assertTrue(qos_pol['shared'])
538 with testtools.ExpectedException(exceptions.NotFound):
539 self.client2.show_qos_policy(qos_policy['id'])
540
541 # make the qos-policy globally shared
542 self.admin_client.update_qos_policy(qos_policy['id'], shared=True)
543 qos_pol = self.client2.show_qos_policy(qos_policy['id'])['policy']
544 self.assertTrue(qos_pol['shared'])
545
546 # globally unshare the qos-policy, the specific share should remain
547 self.admin_client.update_qos_policy(qos_policy['id'], shared=False)
548 self.client.show_qos_policy(qos_policy['id'])
549 with testtools.ExpectedException(exceptions.NotFound):
550 self.client2.show_qos_policy(qos_policy['id'])
551 self.assertIn(rbac,
552 self.admin_client.list_rbac_policies()['rbac_policies'])
553
554 @test.idempotent_id('9f85c76a-a350-11e5-8ae5-54ee756c66df')
555 def test_policy_target_update(self):
556 res = self._make_admin_policy_shared_to_tenant_id(
557 self.client.tenant_id)
558 # change to client2
559 update_res = self.admin_client.update_rbac_policy(
560 res['rbac_policy']['id'], target_tenant=self.client2.tenant_id)
561 self.assertEqual(self.client2.tenant_id,
562 update_res['rbac_policy']['target_tenant'])
563 # make sure everything else stayed the same
564 res['rbac_policy'].pop('target_tenant')
565 update_res['rbac_policy'].pop('target_tenant')
566 self.assertEqual(res['rbac_policy'], update_res['rbac_policy'])
567
568 @test.idempotent_id('a9b39f46-a350-11e5-97c7-54ee756c66df')
569 def test_network_presence_prevents_policy_rbac_policy_deletion(self):
570 res = self._make_admin_policy_shared_to_tenant_id(
571 self.client2.tenant_id)
572 qos_policy_id = res['policy']['id']
573 self._create_network(qos_policy_id, self.client2)
574 # a network with shared qos-policy should prevent the deletion of an
575 # rbac-policy required for it to be shared
576 with testtools.ExpectedException(exceptions.Conflict):
577 self.admin_client.delete_rbac_policy(res['rbac_policy']['id'])
578
579 # a wildcard policy should allow the specific policy to be deleted
580 # since it allows the remaining port
581 wild = self.admin_client.create_rbac_policy(
582 object_type='qos_policy', object_id=res['policy']['id'],
583 action='access_as_shared', target_tenant='*')['rbac_policy']
584 self.admin_client.delete_rbac_policy(res['rbac_policy']['id'])
585
586 # now that wildcard is the only remaining, it should be subjected to
587 # the same restriction
588 with testtools.ExpectedException(exceptions.Conflict):
589 self.admin_client.delete_rbac_policy(wild['id'])
590
591 # we can't update the policy to a different tenant
592 with testtools.ExpectedException(exceptions.Conflict):
593 self.admin_client.update_rbac_policy(
594 wild['id'], target_tenant=self.client2.tenant_id)
595
596 @test.idempotent_id('b0fe87e8-a350-11e5-9f08-54ee756c66df')
597 def test_regular_client_shares_to_another_regular_client(self):
598 # owned by self.admin_client
599 policy = self._create_qos_policy()
600 with testtools.ExpectedException(exceptions.NotFound):
601 self.client.show_qos_policy(policy['id'])
602 rbac_policy = self.admin_client.create_rbac_policy(
603 object_type='qos_policy', object_id=policy['id'],
604 action='access_as_shared',
605 target_tenant=self.client.tenant_id)['rbac_policy']
606 self.client.show_qos_policy(policy['id'])
607
608 self.assertIn(rbac_policy,
609 self.admin_client.list_rbac_policies()['rbac_policies'])
610 # ensure that 'client2' can't see the rbac-policy sharing the
611 # qos-policy to it because the rbac-policy belongs to 'client'
612 self.assertNotIn(rbac_policy['id'], [p['id'] for p in
613 self.client2.list_rbac_policies()['rbac_policies']])
614
615 @test.idempotent_id('ba88d0ca-a350-11e5-a06f-54ee756c66df')
616 def test_filter_fields(self):
617 policy = self._create_qos_policy()
618 self.admin_client.create_rbac_policy(
619 object_type='qos_policy', object_id=policy['id'],
620 action='access_as_shared', target_tenant=self.client2.tenant_id)
621 field_args = (('id',), ('id', 'action'), ('object_type', 'object_id'),
622 ('tenant_id', 'target_tenant'))
623 for fields in field_args:
624 res = self.admin_client.list_rbac_policies(fields=fields)
625 self.assertEqual(set(fields), set(res['rbac_policies'][0].keys()))
626
627 @test.idempotent_id('c10d993a-a350-11e5-9c7a-54ee756c66df')
628 def test_rbac_policy_show(self):
629 res = self._make_admin_policy_shared_to_tenant_id(
630 self.client.tenant_id)
631 p1 = res['rbac_policy']
632 p2 = self.admin_client.create_rbac_policy(
633 object_type='qos_policy', object_id=res['policy']['id'],
634 action='access_as_shared',
635 target_tenant='*')['rbac_policy']
636
637 self.assertEqual(
638 p1, self.admin_client.show_rbac_policy(p1['id'])['rbac_policy'])
639 self.assertEqual(
640 p2, self.admin_client.show_rbac_policy(p2['id'])['rbac_policy'])
641
642 @test.idempotent_id('c7496f86-a350-11e5-b380-54ee756c66df')
643 def test_filter_rbac_policies(self):
644 policy = self._create_qos_policy()
645 rbac_pol1 = self.admin_client.create_rbac_policy(
646 object_type='qos_policy', object_id=policy['id'],
647 action='access_as_shared',
648 target_tenant=self.client2.tenant_id)['rbac_policy']
649 rbac_pol2 = self.admin_client.create_rbac_policy(
650 object_type='qos_policy', object_id=policy['id'],
651 action='access_as_shared',
652 target_tenant=self.admin_client.tenant_id)['rbac_policy']
653 res1 = self.admin_client.list_rbac_policies(id=rbac_pol1['id'])[
654 'rbac_policies']
655 res2 = self.admin_client.list_rbac_policies(id=rbac_pol2['id'])[
656 'rbac_policies']
657 self.assertEqual(1, len(res1))
658 self.assertEqual(1, len(res2))
659 self.assertEqual(rbac_pol1['id'], res1[0]['id'])
660 self.assertEqual(rbac_pol2['id'], res2[0]['id'])
661
662 @test.idempotent_id('cd7d755a-a350-11e5-a344-54ee756c66df')
663 def test_regular_client_blocked_from_sharing_anothers_policy(self):
664 qos_policy = self._make_admin_policy_shared_to_tenant_id(
665 self.client.tenant_id)['policy']
666 with testtools.ExpectedException(exceptions.BadRequest):
667 self.client.create_rbac_policy(
668 object_type='qos_policy', object_id=qos_policy['id'],
669 action='access_as_shared',
670 target_tenant=self.client2.tenant_id)
671
672 # make sure the rbac-policy is invisible to the tenant for which it's
673 # being shared
674 self.assertFalse(self.client.list_rbac_policies()['rbac_policies'])
675
676
677class QosDscpMarkingRuleTestJSON(base.BaseAdminNetworkTest):
678 VALID_DSCP_MARK1 = 56
679 VALID_DSCP_MARK2 = 48
680
681 @classmethod
682 @test.requires_ext(extension="qos", service="network")
683 def resource_setup(cls):
684 super(QosDscpMarkingRuleTestJSON, cls).resource_setup()
685
Miguel Angel Ajocda3f072016-04-01 15:24:54 +0200686 @test.idempotent_id('f5cbaceb-5829-497c-9c60-ad70969e9a08')
Daniel Mellado3c0aeab2016-01-29 11:30:25 +0000687 def test_rule_create(self):
688 policy = self.create_qos_policy(name='test-policy',
689 description='test policy',
690 shared=False)
691 rule = self.admin_client.create_dscp_marking_rule(
692 policy['id'], self.VALID_DSCP_MARK1)['dscp_marking_rule']
693
694 # Test 'show rule'
695 retrieved_rule = self.admin_client.show_dscp_marking_rule(
696 policy['id'], rule['id'])
697 retrieved_rule = retrieved_rule['dscp_marking_rule']
698 self.assertEqual(rule['id'], retrieved_rule['id'])
699 self.assertEqual(self.VALID_DSCP_MARK1, retrieved_rule['dscp_mark'])
700
701 # Test 'list rules'
702 rules = self.admin_client.list_dscp_marking_rules(policy['id'])
703 rules = rules['dscp_marking_rules']
704 rules_ids = [r['id'] for r in rules]
705 self.assertIn(rule['id'], rules_ids)
706
707 # Test 'show policy'
708 retrieved_policy = self.admin_client.show_qos_policy(policy['id'])
709 policy_rules = retrieved_policy['policy']['rules']
710 self.assertEqual(1, len(policy_rules))
711 self.assertEqual(rule['id'], policy_rules[0]['id'])
712 self.assertEqual(qos_consts.RULE_TYPE_DSCP_MARK,
713 policy_rules[0]['type'])
714
Miguel Angel Ajocda3f072016-04-01 15:24:54 +0200715 @test.idempotent_id('08553ffe-030f-4037-b486-7e0b8fb9385a')
Daniel Mellado3c0aeab2016-01-29 11:30:25 +0000716 def test_rule_create_fail_for_the_same_type(self):
717 policy = self.create_qos_policy(name='test-policy',
718 description='test policy',
719 shared=False)
720 self.admin_client.create_dscp_marking_rule(
721 policy['id'], self.VALID_DSCP_MARK1)['dscp_marking_rule']
722
723 self.assertRaises(exceptions.Conflict,
724 self.admin_client.create_dscp_marking_rule,
725 policy_id=policy['id'],
726 dscp_mark=self.VALID_DSCP_MARK2)
727
Miguel Angel Ajocda3f072016-04-01 15:24:54 +0200728 @test.idempotent_id('76f632e5-3175-4408-9a32-3625e599c8a2')
Daniel Mellado3c0aeab2016-01-29 11:30:25 +0000729 def test_rule_update(self):
730 policy = self.create_qos_policy(name='test-policy',
731 description='test policy',
732 shared=False)
733 rule = self.admin_client.create_dscp_marking_rule(
734 policy['id'], self.VALID_DSCP_MARK1)['dscp_marking_rule']
735
736 self.admin_client.update_dscp_marking_rule(
737 policy['id'], rule['id'], dscp_mark=self.VALID_DSCP_MARK2)
738
739 retrieved_policy = self.admin_client.show_dscp_marking_rule(
740 policy['id'], rule['id'])
741 retrieved_policy = retrieved_policy['dscp_marking_rule']
742 self.assertEqual(self.VALID_DSCP_MARK2, retrieved_policy['dscp_mark'])
743
Miguel Angel Ajocda3f072016-04-01 15:24:54 +0200744 @test.idempotent_id('74f81904-c35f-48a3-adae-1f5424cb3c18')
Daniel Mellado3c0aeab2016-01-29 11:30:25 +0000745 def test_rule_delete(self):
746 policy = self.create_qos_policy(name='test-policy',
747 description='test policy',
748 shared=False)
749 rule = self.admin_client.create_dscp_marking_rule(
750 policy['id'], self.VALID_DSCP_MARK1)['dscp_marking_rule']
751
752 retrieved_policy = self.admin_client.show_dscp_marking_rule(
753 policy['id'], rule['id'])
754 retrieved_policy = retrieved_policy['dscp_marking_rule']
755 self.assertEqual(rule['id'], retrieved_policy['id'])
756
757 self.admin_client.delete_dscp_marking_rule(policy['id'], rule['id'])
758 self.assertRaises(exceptions.NotFound,
759 self.admin_client.show_dscp_marking_rule,
760 policy['id'], rule['id'])
761
Miguel Angel Ajocda3f072016-04-01 15:24:54 +0200762 @test.idempotent_id('9cb8ef5c-96fc-4978-9ee0-e3b02bab628a')
Daniel Mellado3c0aeab2016-01-29 11:30:25 +0000763 def test_rule_create_rule_nonexistent_policy(self):
764 self.assertRaises(
765 exceptions.NotFound,
766 self.admin_client.create_dscp_marking_rule,
767 'policy', self.VALID_DSCP_MARK1)
768
Miguel Angel Ajocda3f072016-04-01 15:24:54 +0200769 @test.idempotent_id('bf6002ea-29de-486f-b65d-08aea6d4c4e2')
Daniel Mellado3c0aeab2016-01-29 11:30:25 +0000770 def test_rule_create_forbidden_for_regular_tenants(self):
771 self.assertRaises(
772 exceptions.Forbidden,
773 self.client.create_dscp_marking_rule,
774 'policy', self.VALID_DSCP_MARK1)
775
Daniel Mellado3c0aeab2016-01-29 11:30:25 +0000776 @test.idempotent_id('33646b08-4f05-4493-a48a-bde768a18533')
777 def test_invalid_rule_create(self):
778 policy = self.create_qos_policy(name='test-policy',
779 description='test policy',
780 shared=False)
781 self.assertRaises(
782 exceptions.BadRequest,
783 self.admin_client.create_dscp_marking_rule,
784 policy['id'], 58)
785
Miguel Angel Ajocda3f072016-04-01 15:24:54 +0200786 @test.idempotent_id('c565131d-4c80-4231-b0f3-9ae2be4de129')
Daniel Mellado3c0aeab2016-01-29 11:30:25 +0000787 def test_get_rules_by_policy(self):
788 policy1 = self.create_qos_policy(name='test-policy1',
789 description='test policy1',
790 shared=False)
791 rule1 = self.admin_client.create_dscp_marking_rule(
792 policy1['id'], self.VALID_DSCP_MARK1)['dscp_marking_rule']
793
794 policy2 = self.create_qos_policy(name='test-policy2',
795 description='test policy2',
796 shared=False)
797 rule2 = self.admin_client.create_dscp_marking_rule(
798 policy2['id'], self.VALID_DSCP_MARK2)['dscp_marking_rule']
799
800 # Test 'list rules'
801 rules = self.admin_client.list_dscp_marking_rules(policy1['id'])
802 rules = rules['dscp_marking_rules']
803 rules_ids = [r['id'] for r in rules]
804 self.assertIn(rule1['id'], rules_ids)
805 self.assertNotIn(rule2['id'], rules_ids)