blob: e8a8686b9f0e1b40dc603be57ae71aa61bc18112 [file] [log] [blame]
# Copyright 2015 Red Hat, Inc.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.
14
15from tempest.lib.common.utils import data_utils
16from tempest.lib import exceptions
17from tempest import test
18
19import testtools
20
21from neutron.services.qos import qos_consts
22from neutron.tests.tempest.api import base
23
24
class QosTestJSON(base.BaseAdminNetworkTest):
    """API tests for QoS policies and their network/port associations."""

    @classmethod
    @test.requires_ext(extension="qos", service="network")
    def resource_setup(cls):
        super(QosTestJSON, cls).resource_setup()

    @test.attr(type='smoke')
    @test.idempotent_id('108fbdf7-3463-4e47-9871-d07f3dcf5bbb')
    def test_create_policy(self):
        """A created policy is visible through both 'show' and 'list'."""
        policy_id = self.create_qos_policy(name='test-policy',
                                           description='test policy desc1',
                                           shared=False)['id']

        # 'show policy' must echo the attributes given at creation time.
        shown = self.admin_client.show_qos_policy(policy_id)['policy']
        self.assertEqual('test-policy', shown['name'])
        self.assertEqual('test policy desc1', shown['description'])
        self.assertFalse(shown['shared'])

        # 'list policies' must include the new policy.
        listed = self.admin_client.list_qos_policies()['policies']
        self.assertIn(policy_id, [entry['id'] for entry in listed])

    @test.attr(type='smoke')
    @test.idempotent_id('f8d20e92-f06d-4805-b54f-230f77715815')
    def test_list_policy_filter_by_name(self):
        """Filtering the list by name returns exactly one matching policy."""
        # 'test2' checks that the filter does not also return other names.
        for policy_name in ('test', 'test2'):
            self.create_qos_policy(name=policy_name,
                                   description='test policy',
                                   shared=False)

        matches = self.admin_client.list_qos_policies(name='test')['policies']
        self.assertEqual(1, len(matches))
        self.assertEqual('test', matches[0]['name'])

    @test.attr(type='smoke')
    @test.idempotent_id('8e88a54b-f0b2-4b7d-b061-a15d93c2c7d6')
    def test_policy_update(self):
        """An updated description and shared flag are returned by 'show'."""
        policy_id = self.create_qos_policy(name='test-policy',
                                           description='',
                                           shared=False)['id']
        self.admin_client.update_qos_policy(policy_id,
                                            description='test policy desc2',
                                            shared=True)

        shown = self.admin_client.show_qos_policy(policy_id)['policy']
        self.assertEqual('test policy desc2', shown['description'])
        self.assertTrue(shown['shared'])
        self.assertEqual([], shown['rules'])

    @test.attr(type='smoke')
    @test.idempotent_id('1cb42653-54bd-4a9a-b888-c55e18199201')
    def test_delete_policy(self):
        """A deleted policy can no longer be shown."""
        # Created directly through the admin client; this test deletes the
        # policy itself.
        policy_id = self.admin_client.create_qos_policy(
            'test-policy', 'desc', True)['policy']['id']

        shown = self.admin_client.show_qos_policy(policy_id)['policy']
        self.assertEqual('test-policy', shown['name'])

        self.admin_client.delete_qos_policy(policy_id)
        self.assertRaises(exceptions.NotFound,
                          self.admin_client.show_qos_policy, policy_id)

    @test.attr(type='smoke')
    @test.idempotent_id('cf776f77-8d3d-49f2-8572-12d6a1557224')
    def test_list_admin_rule_types(self):
        """Rule types can be listed with the admin client."""
        self._test_list_rule_types(self.admin_client)

    @test.attr(type='smoke')
    @test.idempotent_id('49c8ea35-83a9-453a-bd23-239cf3b13929')
    def test_list_regular_rule_types(self):
        """Rule types can be listed with the regular client."""
        self._test_list_rule_types(self.client)

    def _test_list_rule_types(self, client):
        """Check the rule types that ``client`` reports as supported."""
        # List supported rule types
        # TODO(QoS): since in gate we run both ovs and linuxbridge ml2 drivers,
        # and since Linux Bridge ml2 driver does not have QoS support yet, ml2
        # plugin reports no rule types are supported. Once linuxbridge will
        # receive support for QoS, the list of expected rule types will change.
        #
        # In theory, we could make the test conditional on which ml2 drivers
        # are enabled in gate (or more specifically, on which supported qos
        # rules are claimed by core plugin), but that option doesn't seem to be
        # available thru tempest.lib framework
        expected_rule_types = []
        expected_rule_details = ['type']

        listed_rule_types = client.list_qos_rule_types()['rule_types']
        reported_types = [entry['type'] for entry in listed_rule_types]

        # Every entry must expose only the expected fields.
        for entry in listed_rule_types:
            self.assertEqual(tuple(entry.keys()),
                             tuple(expected_rule_details))

        # Every expected rule type must be among the reported ones.
        for rule_type in expected_rule_types:
            self.assertIn(rule_type, reported_types)

    def _disassociate_network(self, client, network_id):
        """Clear the network's QoS policy via ``client`` and verify it."""
        client.update_network(network_id, qos_policy_id=None)
        network = self.admin_client.show_network(network_id)['network']
        self.assertIsNone(network['qos_policy_id'])

    @test.attr(type='smoke')
    @test.idempotent_id('65b9ef75-1911-406a-bbdb-ca1d68d528b0')
    def test_policy_association_with_admin_network(self):
        """A non-shared policy can be set on a shared network at creation."""
        policy_id = self.create_qos_policy(name='test-policy',
                                           description='test policy',
                                           shared=False)['id']
        network_id = self.create_shared_network(
            'test network', qos_policy_id=policy_id)['id']

        shown = self.admin_client.show_network(network_id)['network']
        self.assertEqual(policy_id, shown['qos_policy_id'])

        self._disassociate_network(self.admin_client, network_id)

    @test.attr(type='smoke')
    @test.idempotent_id('1738de5d-0476-4163-9022-5e1b548c208e')
    def test_policy_association_with_tenant_network(self):
        """A shared policy can be set on a tenant network at creation."""
        policy_id = self.create_qos_policy(name='test-policy',
                                           description='test policy',
                                           shared=True)['id']
        network_id = self.create_network(
            'test network', qos_policy_id=policy_id)['id']

        shown = self.admin_client.show_network(network_id)['network']
        self.assertEqual(policy_id, shown['qos_policy_id'])

        self._disassociate_network(self.client, network_id)

    @test.attr(type='smoke')
    @test.idempotent_id('9efe63d0-836f-4cc2-b00c-468e63aa614e')
    def test_policy_association_with_network_nonexistent_policy(self):
        """Creating a network with an unknown policy id raises NotFound."""
        missing_policy_id = '9efe63d0-836f-4cc2-b00c-468e63aa614e'
        self.assertRaises(exceptions.NotFound,
                          self.create_network,
                          'test network',
                          qos_policy_id=missing_policy_id)

    @test.attr(type='smoke')
    @test.idempotent_id('1aa55a79-324f-47d9-a076-894a8fc2448b')
    def test_policy_association_with_network_non_shared_policy(self):
        """Creating a tenant network with a non-shared policy raises NotFound.
        """
        policy_id = self.create_qos_policy(name='test-policy',
                                           description='test policy',
                                           shared=False)['id']
        self.assertRaises(exceptions.NotFound,
                          self.create_network,
                          'test network', qos_policy_id=policy_id)

    @test.attr(type='smoke')
    @test.idempotent_id('09a9392c-1359-4cbb-989f-fb768e5834a8')
    def test_policy_update_association_with_admin_network(self):
        """A policy can be attached to an existing network by update."""
        policy_id = self.create_qos_policy(name='test-policy',
                                           description='test policy',
                                           shared=False)['id']
        network_id = self.create_shared_network('test network')['id']

        before = self.admin_client.show_network(network_id)['network']
        self.assertIsNone(before['qos_policy_id'])

        self.admin_client.update_network(network_id, qos_policy_id=policy_id)
        after = self.admin_client.show_network(network_id)['network']
        self.assertEqual(policy_id, after['qos_policy_id'])

        self._disassociate_network(self.admin_client, network_id)

    def _disassociate_port(self, port_id):
        """Clear the port's QoS policy with the regular client and verify."""
        self.client.update_port(port_id, qos_policy_id=None)
        port = self.admin_client.show_port(port_id)['port']
        self.assertIsNone(port['qos_policy_id'])

    @test.attr(type='smoke')
    @test.idempotent_id('98fcd95e-84cf-4746-860e-44692e674f2e')
    def test_policy_association_with_port_shared_policy(self):
        """A shared policy can be set on a port at creation."""
        policy_id = self.create_qos_policy(name='test-policy',
                                           description='test policy',
                                           shared=True)['id']
        network = self.create_shared_network('test network')
        port_id = self.create_port(network, qos_policy_id=policy_id)['id']

        shown = self.admin_client.show_port(port_id)['port']
        self.assertEqual(policy_id, shown['qos_policy_id'])

        self._disassociate_port(port_id)

    @test.attr(type='smoke')
    @test.idempotent_id('49e02f5a-e1dd-41d5-9855-cfa37f2d195e')
    def test_policy_association_with_port_nonexistent_policy(self):
        """Creating a port with an unknown policy id raises NotFound."""
        missing_policy_id = '49e02f5a-e1dd-41d5-9855-cfa37f2d195e'
        network = self.create_shared_network('test network')
        self.assertRaises(exceptions.NotFound,
                          self.create_port,
                          network,
                          qos_policy_id=missing_policy_id)

    @test.attr(type='smoke')
    @test.idempotent_id('f53d961c-9fe5-4422-8b66-7add972c6031')
    def test_policy_association_with_port_non_shared_policy(self):
        """Creating a port with a non-shared policy raises NotFound."""
        policy_id = self.create_qos_policy(name='test-policy',
                                           description='test policy',
                                           shared=False)['id']
        network = self.create_shared_network('test network')
        self.assertRaises(exceptions.NotFound,
                          self.create_port,
                          network, qos_policy_id=policy_id)

    @test.attr(type='smoke')
    @test.idempotent_id('f8163237-fba9-4db5-9526-bad6d2343c76')
    def test_policy_update_association_with_port_shared_policy(self):
        """A shared policy can be attached to an existing port by update."""
        policy_id = self.create_qos_policy(name='test-policy',
                                           description='test policy',
                                           shared=True)['id']
        network = self.create_shared_network('test network')
        port_id = self.create_port(network)['id']

        before = self.admin_client.show_port(port_id)['port']
        self.assertIsNone(before['qos_policy_id'])

        self.client.update_port(port_id, qos_policy_id=policy_id)
        after = self.admin_client.show_port(port_id)['port']
        self.assertEqual(policy_id, after['qos_policy_id'])

        self._disassociate_port(port_id)

    @test.attr(type='smoke')
    @test.idempotent_id('18163237-8ba9-4db5-9525-bad6d2343c75')
    def test_delete_not_allowed_if_policy_in_use_by_network(self):
        """Deleting a policy bound to a network conflicts until unbound."""
        policy_id = self.create_qos_policy(name='test-policy',
                                           description='test policy',
                                           shared=True)['id']
        network_id = self.create_shared_network(
            'test network', qos_policy_id=policy_id)['id']
        self.assertRaises(exceptions.Conflict,
                          self.admin_client.delete_qos_policy, policy_id)

        # Once the network no longer references it, deletion must succeed.
        self._disassociate_network(self.admin_client, network_id)
        self.admin_client.delete_qos_policy(policy_id)

    @test.attr(type='smoke')
    @test.idempotent_id('24153230-84a9-4dd5-9525-bad6d2343c75')
    def test_delete_not_allowed_if_policy_in_use_by_port(self):
        """Deleting a policy bound to a port conflicts until unbound."""
        policy_id = self.create_qos_policy(name='test-policy',
                                           description='test policy',
                                           shared=True)['id']
        network = self.create_shared_network('test network')
        port_id = self.create_port(network, qos_policy_id=policy_id)['id']
        self.assertRaises(exceptions.Conflict,
                          self.admin_client.delete_qos_policy, policy_id)

        # Once the port no longer references it, deletion must succeed.
        self._disassociate_port(port_id)
        self.admin_client.delete_qos_policy(policy_id)

    @test.attr(type='smoke')
    @test.idempotent_id('a2a5849b-dd06-4b18-9664-0b6828a1fc27')
    def test_qos_policy_delete_with_rules(self):
        """A policy that still has a rule can be deleted."""
        policy_id = self.create_qos_policy(name='test-policy',
                                           description='test policy',
                                           shared=False)['id']
        # The subscript only checks that the response carries the rule.
        self.admin_client.create_bandwidth_limit_rule(
            policy_id, 200, 1337)['bandwidth_limit_rule']

        self.admin_client.delete_qos_policy(policy_id)

        with testtools.ExpectedException(exceptions.NotFound):
            self.admin_client.show_qos_policy(policy_id)
307
308
class QosBandwidthLimitRuleTestJSON(base.BaseAdminNetworkTest):
    """API tests for bandwidth limit rules of QoS policies."""

    @classmethod
    @test.requires_ext(extension="qos", service="network")
    def resource_setup(cls):
        super(QosBandwidthLimitRuleTestJSON, cls).resource_setup()

    @test.attr(type='smoke')
    @test.idempotent_id('8a59b00b-3e9c-4787-92f8-93a5cdf5e378')
    def test_rule_create(self):
        """A created rule shows up in rule 'show'/'list' and in its policy."""
        policy_id = self.create_qos_policy(name='test-policy',
                                           description='test policy',
                                           shared=False)['id']
        rule_id = self.create_qos_bandwidth_limit_rule(
            policy_id=policy_id, max_kbps=200, max_burst_kbps=1337)['id']

        # 'show rule' returns the values the rule was created with.
        shown_rule = self.admin_client.show_bandwidth_limit_rule(
            policy_id, rule_id)['bandwidth_limit_rule']
        self.assertEqual(rule_id, shown_rule['id'])
        self.assertEqual(200, shown_rule['max_kbps'])
        self.assertEqual(1337, shown_rule['max_burst_kbps'])

        # 'list rules' includes the new rule.
        listed_rules = self.admin_client.list_bandwidth_limit_rules(
            policy_id)['bandwidth_limit_rules']
        self.assertIn(rule_id, [entry['id'] for entry in listed_rules])

        # 'show policy' embeds exactly this one rule, tagged with its type.
        policy_rules = self.admin_client.show_qos_policy(
            policy_id)['policy']['rules']
        self.assertEqual(1, len(policy_rules))
        embedded_rule = policy_rules[0]
        self.assertEqual(rule_id, embedded_rule['id'])
        self.assertEqual(qos_consts.RULE_TYPE_BANDWIDTH_LIMIT,
                         embedded_rule['type'])

    @test.attr(type='smoke')
    @test.idempotent_id('8a59b00b-ab01-4787-92f8-93a5cdf5e378')
    def test_rule_create_fail_for_the_same_type(self):
        """A second bandwidth limit rule on the same policy conflicts."""
        policy_id = self.create_qos_policy(name='test-policy',
                                           description='test policy',
                                           shared=False)['id']
        self.create_qos_bandwidth_limit_rule(policy_id=policy_id,
                                             max_kbps=200,
                                             max_burst_kbps=1337)

        self.assertRaises(exceptions.Conflict,
                          self.create_qos_bandwidth_limit_rule,
                          policy_id=policy_id,
                          max_kbps=201, max_burst_kbps=1338)

    @test.attr(type='smoke')
    @test.idempotent_id('149a6988-2568-47d2-931e-2dbc858943b3')
    def test_rule_update(self):
        """Updated rule limits are returned by 'show rule'."""
        policy_id = self.create_qos_policy(name='test-policy',
                                           description='test policy',
                                           shared=False)['id']
        rule_id = self.create_qos_bandwidth_limit_rule(
            policy_id=policy_id, max_kbps=1, max_burst_kbps=1)['id']

        self.admin_client.update_bandwidth_limit_rule(policy_id,
                                                      rule_id,
                                                      max_kbps=200,
                                                      max_burst_kbps=1337)

        shown_rule = self.admin_client.show_bandwidth_limit_rule(
            policy_id, rule_id)['bandwidth_limit_rule']
        self.assertEqual(200, shown_rule['max_kbps'])
        self.assertEqual(1337, shown_rule['max_burst_kbps'])

    @test.attr(type='smoke')
    @test.idempotent_id('67ee6efd-7b33-4a68-927d-275b4f8ba958')
    def test_rule_delete(self):
        """A deleted rule can no longer be shown."""
        policy_id = self.create_qos_policy(name='test-policy',
                                           description='test policy',
                                           shared=False)['id']
        # Created directly through the admin client; this test deletes the
        # rule itself.
        rule_id = self.admin_client.create_bandwidth_limit_rule(
            policy_id, 200, 1337)['bandwidth_limit_rule']['id']

        shown_rule = self.admin_client.show_bandwidth_limit_rule(
            policy_id, rule_id)['bandwidth_limit_rule']
        self.assertEqual(rule_id, shown_rule['id'])

        self.admin_client.delete_bandwidth_limit_rule(policy_id, rule_id)
        self.assertRaises(exceptions.NotFound,
                          self.admin_client.show_bandwidth_limit_rule,
                          policy_id, rule_id)

    @test.attr(type='smoke')
    @test.idempotent_id('f211222c-5808-46cb-a961-983bbab6b852')
    def test_rule_create_rule_nonexistent_policy(self):
        """Creating a rule for an unknown policy raises NotFound."""
        self.assertRaises(exceptions.NotFound,
                          self.create_qos_bandwidth_limit_rule,
                          'policy', 200, 1337)

    @test.attr(type='smoke')
    @test.idempotent_id('eed8e2a6-22da-421b-89b9-935a2c1a1b50')
    def test_policy_create_forbidden_for_regular_tenants(self):
        """The regular client is forbidden from creating a policy."""
        self.assertRaises(exceptions.Forbidden,
                          self.client.create_qos_policy,
                          'test-policy', 'test policy', False)

    @test.attr(type='smoke')
    @test.idempotent_id('a4a2e7ad-786f-4927-a85a-e545a93bd274')
    def test_rule_create_forbidden_for_regular_tenants(self):
        """The regular client is forbidden from creating a rule."""
        self.assertRaises(exceptions.Forbidden,
                          self.client.create_bandwidth_limit_rule,
                          'policy', 1, 2)

    @test.attr(type='smoke')
    @test.idempotent_id('ce0bd0c2-54d9-4e29-85f1-cfb36ac3ebe2')
    def test_get_rules_by_policy(self):
        """Listing a policy's rules does not return another policy's rule."""
        first_policy_id = self.create_qos_policy(name='test-policy1',
                                                 description='test policy1',
                                                 shared=False)['id']
        first_rule_id = self.create_qos_bandwidth_limit_rule(
            policy_id=first_policy_id,
            max_kbps=200,
            max_burst_kbps=1337)['id']

        second_policy_id = self.create_qos_policy(name='test-policy2',
                                                  description='test policy2',
                                                  shared=False)['id']
        second_rule_id = self.create_qos_bandwidth_limit_rule(
            policy_id=second_policy_id,
            max_kbps=5000,
            max_burst_kbps=2523)['id']

        # Only the first policy's rule may appear in its rule list.
        listed_rules = self.admin_client.list_bandwidth_limit_rules(
            first_policy_id)['bandwidth_limit_rules']
        listed_ids = [entry['id'] for entry in listed_rules]
        self.assertIn(first_rule_id, listed_ids)
        self.assertNotIn(second_rule_id, listed_ids)
449
450
class RbacSharedQosPoliciesTest(base.BaseAdminNetworkTest):
    """API tests for sharing QoS policies through RBAC policies."""

    force_tenant_isolation = True
    credentials = ['primary', 'alt', 'admin']

    @classmethod
    @test.requires_ext(extension="qos", service="network")
    def resource_setup(cls):
        super(RbacSharedQosPoliciesTest, cls).resource_setup()
        # Network client of the alternate ('alt') credentials.
        cls.client2 = cls.alt_manager.network_client

    def _create_qos_policy(self, tenant_id=None):
        """Create a non-shared QoS policy as admin and schedule its deletion.

        :param tenant_id: value passed as the policy's ``tenant_id``.
        :returns: the created policy dict.
        """
        args = {'name': data_utils.rand_name('test-policy'),
                'description': 'test policy',
                'shared': False,
                'tenant_id': tenant_id}
        qos_policy = self.admin_client.create_qos_policy(**args)['policy']
        self.addCleanup(self.admin_client.delete_qos_policy, qos_policy['id'])

        return qos_policy

    def _make_admin_policy_shared_to_tenant_id(self, tenant_id):
        """Create a policy and share it with ``tenant_id`` through RBAC.

        :returns: dict with the created ``'policy'`` and ``'rbac_policy'``.
        """
        policy = self._create_qos_policy()
        rbac_policy = self.admin_client.create_rbac_policy(
            object_type='qos_policy',
            object_id=policy['id'],
            action='access_as_shared',
            target_tenant=tenant_id,
        )['rbac_policy']

        return {'policy': policy, 'rbac_policy': rbac_policy}

    def _create_network(self, qos_policy_id, client, should_cleanup=True):
        """Create a network bound to ``qos_policy_id`` using ``client``.

        :param should_cleanup: when true, the network is deleted at the end
            of the test; otherwise the caller is responsible for it.
        :returns: the created network dict.
        """
        net = client.create_network(
            name=data_utils.rand_name('test-network'),
            qos_policy_id=qos_policy_id)['network']
        if should_cleanup:
            self.addCleanup(client.delete_network, net['id'])

        return net

    def _delete_network_ignoring_not_found(self, client, network_id):
        """Delete a network, tolerating that it is already gone."""
        try:
            client.delete_network(network_id)
        except exceptions.NotFound:
            pass

    @test.idempotent_id('b9dcf582-d3b3-11e5-950a-54ee756c66df')
    def test_policy_sharing_with_wildcard(self):
        """Toggling 'shared' on a policy creates a wildcard RBAC entry."""
        qos_pol = self.create_qos_policy(
            name=data_utils.rand_name('test-policy'),
            description='test-shared-policy', shared=False)
        self.assertNotIn(qos_pol, self.client2.list_qos_policies()['policies'])

        # test update shared False -> True
        self.admin_client.update_qos_policy(qos_pol['id'], shared=True)
        qos_pol['shared'] = True
        self.client2.show_qos_policy(qos_pol['id'])
        rbac_pol = {'target_tenant': '*',
                    'tenant_id': self.admin_client.tenant_id,
                    'object_type': 'qos_policy',
                    'object_id': qos_pol['id'],
                    'action': 'access_as_shared'}

        rbac_policies = self.admin_client.list_rbac_policies()['rbac_policies']
        # The expected entry above has no 'id', so strip it from the listed
        # entries before comparing.
        for rbac_policy in rbac_policies:
            rbac_policy.pop('id')
        self.assertIn(rbac_pol, rbac_policies)

        # update shared True -> False should fail because the policy is bound
        # to a network
        net = self._create_network(qos_pol['id'], self.admin_client, False)
        # The network is deleted explicitly below; this fallback only keeps
        # it from leaking (still bound to the policy) if a step in between
        # fails.
        self.addCleanup(self._delete_network_ignoring_not_found,
                        self.admin_client, net['id'])
        with testtools.ExpectedException(exceptions.Conflict):
            self.admin_client.update_qos_policy(qos_pol['id'], shared=False)

        # delete the network, and update shared True -> False should pass now
        self.admin_client.delete_network(net['id'])
        self.admin_client.update_qos_policy(qos_pol['id'], shared=False)
        qos_pol['shared'] = False
        self.assertNotIn(qos_pol, self.client2.list_qos_policies()['policies'])

    def _create_net_bound_qos_rbacs(self):
        """Share a policy to the primary tenant and globally, then bind it.

        :returns: tuple ``(tenant-specific rbac policy, wildcard rbac
            policy)`` for a QoS policy used by a network of ``self.client``.
        """
        res = self._make_admin_policy_shared_to_tenant_id(
            self.client.tenant_id)
        qos_policy, rbac_for_client_tenant = res['policy'], res['rbac_policy']

        # add a wildcard rbac rule - now the policy globally shared
        rbac_wildcard = self.admin_client.create_rbac_policy(
            object_type='qos_policy',
            object_id=qos_policy['id'],
            action='access_as_shared',
            target_tenant='*',
        )['rbac_policy']

        # tenant1 now uses qos policy for net
        self._create_network(qos_policy['id'], self.client)

        return rbac_for_client_tenant, rbac_wildcard

    @test.idempotent_id('328b1f70-d424-11e5-a57f-54ee756c66df')
    def test_net_bound_shared_policy_wildcard_and_tenant_id_wild_remove(self):
        client_rbac, wildcard_rbac = self._create_net_bound_qos_rbacs()
        # globally unshare the qos-policy, the specific share should remain
        self.admin_client.delete_rbac_policy(wildcard_rbac['id'])
        self.client.list_rbac_policies(id=client_rbac['id'])

    # NOTE: this id used to duplicate the one of the '..._wild_remove' test
    # above; idempotent ids have to be unique per test.
    @test.idempotent_id('e3f1c6d2-7a4b-4c8e-9f25-6b0d1a7c4e93')
    def test_net_bound_shared_policy_wildcard_and_tenant_id_wild_remains(self):
        client_rbac, wildcard_rbac = self._create_net_bound_qos_rbacs()
        # remove client_rbac policy the wildcard share should remain
        self.admin_client.delete_rbac_policy(client_rbac['id'])
        self.client.list_rbac_policies(id=wildcard_rbac['id'])

    @test.idempotent_id('2ace9adc-da6e-11e5-aafe-54ee756c66df')
    def test_policy_sharing_with_wildcard_and_tenant_id(self):
        """A tenant-specific share survives global share and unshare."""
        res = self._make_admin_policy_shared_to_tenant_id(
            self.client.tenant_id)
        qos_policy, rbac = res['policy'], res['rbac_policy']
        qos_pol = self.client.show_qos_policy(qos_policy['id'])['policy']
        self.assertTrue(qos_pol['shared'])
        with testtools.ExpectedException(exceptions.NotFound):
            self.client2.show_qos_policy(qos_policy['id'])

        # make the qos-policy globally shared
        self.admin_client.update_qos_policy(qos_policy['id'], shared=True)
        qos_pol = self.client2.show_qos_policy(qos_policy['id'])['policy']
        self.assertTrue(qos_pol['shared'])

        # globally unshare the qos-policy, the specific share should remain
        self.admin_client.update_qos_policy(qos_policy['id'], shared=False)
        self.client.show_qos_policy(qos_policy['id'])
        with testtools.ExpectedException(exceptions.NotFound):
            self.client2.show_qos_policy(qos_policy['id'])
        self.assertIn(rbac,
                      self.admin_client.list_rbac_policies()['rbac_policies'])

    @test.idempotent_id('9f85c76a-a350-11e5-8ae5-54ee756c66df')
    def test_policy_target_update(self):
        """Updating target_tenant changes only that field."""
        res = self._make_admin_policy_shared_to_tenant_id(
            self.client.tenant_id)
        # change to client2
        update_res = self.admin_client.update_rbac_policy(
            res['rbac_policy']['id'], target_tenant=self.client2.tenant_id)
        self.assertEqual(self.client2.tenant_id,
                         update_res['rbac_policy']['target_tenant'])
        # make sure everything else stayed the same
        res['rbac_policy'].pop('target_tenant')
        update_res['rbac_policy'].pop('target_tenant')
        self.assertEqual(res['rbac_policy'], update_res['rbac_policy'])

    @test.idempotent_id('a9b39f46-a350-11e5-97c7-54ee756c66df')
    def test_network_presence_prevents_policy_rbac_policy_deletion(self):
        res = self._make_admin_policy_shared_to_tenant_id(
            self.client2.tenant_id)
        qos_policy_id = res['policy']['id']
        self._create_network(qos_policy_id, self.client2)
        # a network with shared qos-policy should prevent the deletion of an
        # rbac-policy required for it to be shared
        with testtools.ExpectedException(exceptions.Conflict):
            self.admin_client.delete_rbac_policy(res['rbac_policy']['id'])

        # a wildcard policy should allow the specific policy to be deleted
        # since it allows the remaining network
        wild = self.admin_client.create_rbac_policy(
            object_type='qos_policy', object_id=res['policy']['id'],
            action='access_as_shared', target_tenant='*')['rbac_policy']
        self.admin_client.delete_rbac_policy(res['rbac_policy']['id'])

        # now that wildcard is the only remaining, it should be subjected to
        # the same restriction
        with testtools.ExpectedException(exceptions.Conflict):
            self.admin_client.delete_rbac_policy(wild['id'])

        # we can't update the policy to a different tenant
        with testtools.ExpectedException(exceptions.Conflict):
            self.admin_client.update_rbac_policy(
                wild['id'], target_tenant=self.client2.tenant_id)

    @test.idempotent_id('b0fe87e8-a350-11e5-9f08-54ee756c66df')
    def test_regular_client_shares_to_another_regular_client(self):
        # owned by self.admin_client
        policy = self._create_qos_policy()
        with testtools.ExpectedException(exceptions.NotFound):
            self.client.show_qos_policy(policy['id'])
        rbac_policy = self.admin_client.create_rbac_policy(
            object_type='qos_policy', object_id=policy['id'],
            action='access_as_shared',
            target_tenant=self.client.tenant_id)['rbac_policy']
        self.client.show_qos_policy(policy['id'])

        self.assertIn(rbac_policy,
                      self.admin_client.list_rbac_policies()['rbac_policies'])
        # ensure that 'client2' can't see the rbac-policy sharing the
        # qos-policy to it because the rbac-policy belongs to 'client'
        self.assertNotIn(rbac_policy['id'], [p['id'] for p in
                         self.client2.list_rbac_policies()['rbac_policies']])

    @test.idempotent_id('ba88d0ca-a350-11e5-a06f-54ee756c66df')
    def test_filter_fields(self):
        """Listing with 'fields' returns exactly the requested keys."""
        policy = self._create_qos_policy()
        self.admin_client.create_rbac_policy(
            object_type='qos_policy', object_id=policy['id'],
            action='access_as_shared', target_tenant=self.client2.tenant_id)
        field_args = (('id',), ('id', 'action'), ('object_type', 'object_id'),
                      ('tenant_id', 'target_tenant'))
        for fields in field_args:
            res = self.admin_client.list_rbac_policies(fields=fields)
            self.assertEqual(set(fields), set(res['rbac_policies'][0].keys()))

    @test.idempotent_id('c10d993a-a350-11e5-9c7a-54ee756c66df')
    def test_rbac_policy_show(self):
        """'show' returns the same data as was returned on creation."""
        res = self._make_admin_policy_shared_to_tenant_id(
            self.client.tenant_id)
        p1 = res['rbac_policy']
        p2 = self.admin_client.create_rbac_policy(
            object_type='qos_policy', object_id=res['policy']['id'],
            action='access_as_shared',
            target_tenant='*')['rbac_policy']

        self.assertEqual(
            p1, self.admin_client.show_rbac_policy(p1['id'])['rbac_policy'])
        self.assertEqual(
            p2, self.admin_client.show_rbac_policy(p2['id'])['rbac_policy'])

    @test.idempotent_id('c7496f86-a350-11e5-b380-54ee756c66df')
    def test_filter_rbac_policies(self):
        """Filtering the list by id returns only the matching RBAC policy."""
        policy = self._create_qos_policy()
        rbac_pol1 = self.admin_client.create_rbac_policy(
            object_type='qos_policy', object_id=policy['id'],
            action='access_as_shared',
            target_tenant=self.client2.tenant_id)['rbac_policy']
        rbac_pol2 = self.admin_client.create_rbac_policy(
            object_type='qos_policy', object_id=policy['id'],
            action='access_as_shared',
            target_tenant=self.admin_client.tenant_id)['rbac_policy']
        res1 = self.admin_client.list_rbac_policies(id=rbac_pol1['id'])[
            'rbac_policies']
        res2 = self.admin_client.list_rbac_policies(id=rbac_pol2['id'])[
            'rbac_policies']
        self.assertEqual(1, len(res1))
        self.assertEqual(1, len(res2))
        self.assertEqual(rbac_pol1['id'], res1[0]['id'])
        self.assertEqual(rbac_pol2['id'], res2[0]['id'])

    @test.idempotent_id('cd7d755a-a350-11e5-a344-54ee756c66df')
    def test_regular_client_blocked_from_sharing_anothers_policy(self):
        qos_policy = self._make_admin_policy_shared_to_tenant_id(
            self.client.tenant_id)['policy']
        with testtools.ExpectedException(exceptions.BadRequest):
            self.client.create_rbac_policy(
                object_type='qos_policy', object_id=qos_policy['id'],
                action='access_as_shared',
                target_tenant=self.client2.tenant_id)

        # make sure the rbac-policy is invisible to the tenant for which it's
        # being shared
        self.assertFalse(self.client.list_rbac_policies()['rbac_policies'])
701
702
class QosDscpMarkingRuleTestJSON(base.BaseAdminNetworkTest):
    """API tests for DSCP marking rules of QoS policies.

    NOTE: several idempotent ids here used to be copies of the ids of the
    equivalent bandwidth limit rule tests; idempotent ids have to be unique
    per test, so those tests carry their own ids now.
    """

    VALID_DSCP_MARK1 = 56
    VALID_DSCP_MARK2 = 48
    # Mark that rule creation is expected to reject with BadRequest.
    INVALID_DSCP_MARK = 58

    @classmethod
    @test.requires_ext(extension="qos", service="network")
    def resource_setup(cls):
        super(QosDscpMarkingRuleTestJSON, cls).resource_setup()

    @test.attr(type='smoke')
    @test.idempotent_id('f5cbaceb-5829-497c-9c60-ad70969e9a08')
    def test_rule_create(self):
        """A created rule shows up in rule 'show'/'list' and in its policy."""
        policy = self.create_qos_policy(name='test-policy',
                                        description='test policy',
                                        shared=False)
        rule = self.admin_client.create_dscp_marking_rule(
            policy['id'], self.VALID_DSCP_MARK1)['dscp_marking_rule']

        # Test 'show rule'
        retrieved_rule = self.admin_client.show_dscp_marking_rule(
            policy['id'], rule['id'])
        retrieved_rule = retrieved_rule['dscp_marking_rule']
        self.assertEqual(rule['id'], retrieved_rule['id'])
        self.assertEqual(self.VALID_DSCP_MARK1, retrieved_rule['dscp_mark'])

        # Test 'list rules'
        rules = self.admin_client.list_dscp_marking_rules(policy['id'])
        rules = rules['dscp_marking_rules']
        rules_ids = [r['id'] for r in rules]
        self.assertIn(rule['id'], rules_ids)

        # Test 'show policy'
        retrieved_policy = self.admin_client.show_qos_policy(policy['id'])
        policy_rules = retrieved_policy['policy']['rules']
        self.assertEqual(1, len(policy_rules))
        self.assertEqual(rule['id'], policy_rules[0]['id'])
        self.assertEqual(qos_consts.RULE_TYPE_DSCP_MARK,
                         policy_rules[0]['type'])

    @test.attr(type='smoke')
    @test.idempotent_id('08553ffe-030f-4037-b486-7e0b8fb9385a')
    def test_rule_create_fail_for_the_same_type(self):
        """A second DSCP marking rule on the same policy conflicts."""
        policy = self.create_qos_policy(name='test-policy',
                                        description='test policy',
                                        shared=False)
        self.admin_client.create_dscp_marking_rule(
            policy['id'], self.VALID_DSCP_MARK1)['dscp_marking_rule']

        self.assertRaises(exceptions.Conflict,
                          self.admin_client.create_dscp_marking_rule,
                          policy_id=policy['id'],
                          dscp_mark=self.VALID_DSCP_MARK2)

    @test.attr(type='smoke')
    @test.idempotent_id('76f632e5-3175-4408-9a32-3625e599c8a2')
    def test_rule_update(self):
        """An updated DSCP mark is returned by 'show rule'."""
        policy = self.create_qos_policy(name='test-policy',
                                        description='test policy',
                                        shared=False)
        rule = self.admin_client.create_dscp_marking_rule(
            policy['id'], self.VALID_DSCP_MARK1)['dscp_marking_rule']

        self.admin_client.update_dscp_marking_rule(
            policy['id'], rule['id'], dscp_mark=self.VALID_DSCP_MARK2)

        retrieved_rule = self.admin_client.show_dscp_marking_rule(
            policy['id'], rule['id'])
        retrieved_rule = retrieved_rule['dscp_marking_rule']
        self.assertEqual(self.VALID_DSCP_MARK2, retrieved_rule['dscp_mark'])

    @test.attr(type='smoke')
    @test.idempotent_id('74f81904-c35f-48a3-adae-1f5424cb3c18')
    def test_rule_delete(self):
        """A deleted rule can no longer be shown."""
        policy = self.create_qos_policy(name='test-policy',
                                        description='test policy',
                                        shared=False)
        rule = self.admin_client.create_dscp_marking_rule(
            policy['id'], self.VALID_DSCP_MARK1)['dscp_marking_rule']

        retrieved_rule = self.admin_client.show_dscp_marking_rule(
            policy['id'], rule['id'])
        retrieved_rule = retrieved_rule['dscp_marking_rule']
        self.assertEqual(rule['id'], retrieved_rule['id'])

        self.admin_client.delete_dscp_marking_rule(policy['id'], rule['id'])
        self.assertRaises(exceptions.NotFound,
                          self.admin_client.show_dscp_marking_rule,
                          policy['id'], rule['id'])

    @test.attr(type='smoke')
    @test.idempotent_id('9cb8ef5c-96fc-4978-9ee0-e3b02bab628a')
    def test_rule_create_rule_nonexistent_policy(self):
        """Creating a rule for an unknown policy raises NotFound."""
        self.assertRaises(
            exceptions.NotFound,
            self.admin_client.create_dscp_marking_rule,
            'policy', self.VALID_DSCP_MARK1)

    @test.attr(type='smoke')
    @test.idempotent_id('fde6f6a1-2b8c-4e0d-9d5a-3c7b1e9f4a62')
    def test_rule_create_forbidden_for_regular_tenants(self):
        """The regular client is forbidden from creating a rule."""
        self.assertRaises(
            exceptions.Forbidden,
            self.client.create_dscp_marking_rule,
            'policy', self.VALID_DSCP_MARK1)

    @test.attr(type='smoke')
    @test.idempotent_id('33646b08-4f05-4493-a48a-bde768a18533')
    def test_invalid_rule_create(self):
        """Creating a rule with an invalid DSCP mark raises BadRequest."""
        policy = self.create_qos_policy(name='test-policy',
                                        description='test policy',
                                        shared=False)
        self.assertRaises(
            exceptions.BadRequest,
            self.admin_client.create_dscp_marking_rule,
            policy['id'], self.INVALID_DSCP_MARK)

    @test.attr(type='smoke')
    @test.idempotent_id('c565131d-4c80-4231-b0f3-9ae2be4de129')
    def test_get_rules_by_policy(self):
        """Listing a policy's rules does not return another policy's rule."""
        policy1 = self.create_qos_policy(name='test-policy1',
                                         description='test policy1',
                                         shared=False)
        rule1 = self.admin_client.create_dscp_marking_rule(
            policy1['id'], self.VALID_DSCP_MARK1)['dscp_marking_rule']

        policy2 = self.create_qos_policy(name='test-policy2',
                                         description='test policy2',
                                         shared=False)
        rule2 = self.admin_client.create_dscp_marking_rule(
            policy2['id'], self.VALID_DSCP_MARK2)['dscp_marking_rule']

        # Test 'list rules'
        rules = self.admin_client.list_dscp_marking_rules(policy1['id'])
        rules = rules['dscp_marking_rules']
        rules_ids = [r['id'] for r in rules]
        self.assertIn(rule1['id'], rules_ids)
        self.assertNotIn(rule2['id'], rules_ids)