blob: ff5799b5d71cdabcb1693cd700ac2639cd8c0bc3 [file] [log] [blame]
Daniel Mellado3c0aeab2016-01-29 11:30:25 +00001# Copyright 2015 Red Hat, Inc.
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may
4# not use this file except in compliance with the License. You may obtain
5# a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations
13# under the License.
14
15from tempest.lib.common.utils import data_utils
16from tempest.lib import exceptions
17from tempest import test
18
19import testtools
20
21from neutron.services.qos import qos_consts
22from neutron.tests.tempest.api import base
23
24
class QosTestJSON(base.BaseAdminNetworkTest):
    """API tests for QoS policies and their binding to networks and ports.

    Policies are created through the admin client (directly or via the
    ``create_qos_policy`` helper inherited from the base class); some
    association tests additionally exercise the regular ``self.client``.
    """

    @classmethod
    @test.requires_ext(extension="qos", service="network")
    def resource_setup(cls):
        """Run the base resource setup; gated on the 'qos' extension."""
        super(QosTestJSON, cls).resource_setup()

    def _delete_policy_ignoring_not_found(self, policy_id):
        """Delete a QoS policy, tolerating one that is already gone.

        Used as a cleanup for policies a test deletes itself, so that the
        cleanup is a no-op on the success path and a safety net otherwise.
        """
        try:
            self.admin_client.delete_qos_policy(policy_id)
        except exceptions.NotFound:
            pass

    @test.idempotent_id('108fbdf7-3463-4e47-9871-d07f3dcf5bbb')
    def test_create_policy(self):
        """Create a policy and find it again through show and list."""
        policy = self.create_qos_policy(name='test-policy',
                                        description='test policy desc1',
                                        shared=False)

        # Test 'show policy'
        retrieved_policy = self.admin_client.show_qos_policy(policy['id'])
        retrieved_policy = retrieved_policy['policy']
        self.assertEqual('test-policy', retrieved_policy['name'])
        self.assertEqual('test policy desc1', retrieved_policy['description'])
        self.assertFalse(retrieved_policy['shared'])

        # Test 'list policies'
        policies = self.admin_client.list_qos_policies()['policies']
        policies_ids = [p['id'] for p in policies]
        self.assertIn(policy['id'], policies_ids)

    @test.idempotent_id('f8d20e92-f06d-4805-b54f-230f77715815')
    def test_list_policy_filter_by_name(self):
        """Listing with name='test' returns only the policy named 'test'."""
        self.create_qos_policy(name='test', description='test policy',
                               shared=False)
        self.create_qos_policy(name='test2', description='test policy',
                               shared=False)

        policies = (self.admin_client.
                    list_qos_policies(name='test')['policies'])
        self.assertEqual(1, len(policies))

        retrieved_policy = policies[0]
        self.assertEqual('test', retrieved_policy['name'])

    @test.idempotent_id('8e88a54b-f0b2-4b7d-b061-a15d93c2c7d6')
    def test_policy_update(self):
        """Update description and shared flag; the rule list stays empty."""
        policy = self.create_qos_policy(name='test-policy',
                                        description='',
                                        shared=False)
        self.admin_client.update_qos_policy(policy['id'],
                                            description='test policy desc2',
                                            shared=True)

        retrieved_policy = self.admin_client.show_qos_policy(policy['id'])
        retrieved_policy = retrieved_policy['policy']
        self.assertEqual('test policy desc2', retrieved_policy['description'])
        self.assertTrue(retrieved_policy['shared'])
        self.assertEqual([], retrieved_policy['rules'])

    @test.idempotent_id('ee263db4-009a-4641-83e5-d0e83506ba4c')
    def test_shared_policy_update(self):
        """Updating another field keeps 'shared'; it can then be unset."""
        policy = self.create_qos_policy(name='test-policy',
                                        description='',
                                        shared=True)

        # An update that does not mention 'shared' must leave it True.
        self.admin_client.update_qos_policy(policy['id'],
                                            description='test policy desc2')
        retrieved_policy = self.admin_client.show_qos_policy(policy['id'])
        retrieved_policy = retrieved_policy['policy']
        self.assertTrue(retrieved_policy['shared'])

        self.admin_client.update_qos_policy(policy['id'],
                                            shared=False)
        retrieved_policy = self.admin_client.show_qos_policy(policy['id'])
        retrieved_policy = retrieved_policy['policy']
        self.assertFalse(retrieved_policy['shared'])

    @test.idempotent_id('1cb42653-54bd-4a9a-b888-c55e18199201')
    def test_delete_policy(self):
        """Delete a policy and check that showing it raises NotFound."""
        policy = self.admin_client.create_qos_policy(
            'test-policy', 'desc', True)['policy']
        # The policy is created directly on the client, so register a
        # cleanup here: if an assertion fails before the explicit delete
        # below, the policy would otherwise be left behind.
        self.addCleanup(self._delete_policy_ignoring_not_found, policy['id'])

        retrieved_policy = self.admin_client.show_qos_policy(policy['id'])
        retrieved_policy = retrieved_policy['policy']
        self.assertEqual('test-policy', retrieved_policy['name'])

        self.admin_client.delete_qos_policy(policy['id'])
        self.assertRaises(exceptions.NotFound,
                          self.admin_client.show_qos_policy, policy['id'])

    @test.idempotent_id('cf776f77-8d3d-49f2-8572-12d6a1557224')
    def test_list_admin_rule_types(self):
        """List rule types using the admin client."""
        self._test_list_rule_types(self.admin_client)

    @test.idempotent_id('49c8ea35-83a9-453a-bd23-239cf3b13929')
    def test_list_regular_rule_types(self):
        """List rule types using the regular client."""
        self._test_list_rule_types(self.client)

    def _test_list_rule_types(self, client):
        """Check the shape and content of the rule type listing.

        :param client: network client used to call ``list_qos_rule_types``.
        """
        # List supported rule types
        # TODO(QoS): since in gate we run both ovs and linuxbridge ml2 drivers,
        # and since Linux Bridge ml2 driver does not have QoS support yet, ml2
        # plugin reports no rule types are supported. Once linuxbridge will
        # receive support for QoS, the list of expected rule types will change.
        #
        # In theory, we could make the test conditional on which ml2 drivers
        # are enabled in gate (or more specifically, on which supported qos
        # rules are claimed by core plugin), but that option doesn't seem to be
        # available thru tempest.lib framework
        expected_rule_types = []
        expected_rule_details = ['type']

        rule_types = client.list_qos_rule_types()
        actual_list_rule_types = rule_types['rule_types']
        actual_rule_types = [rule['type'] for rule in actual_list_rule_types]

        # Verify that only required fields present in rule details.
        # Compare sorted key lists so the check does not depend on dict key
        # ordering, and pass the expected value first like the rest of the
        # assertions in this module.
        for rule in actual_list_rule_types:
            self.assertEqual(sorted(expected_rule_details), sorted(rule))

        # Verify if expected rules are present in the actual rules list
        for rule in expected_rule_types:
            self.assertIn(rule, actual_rule_types)

    def _disassociate_network(self, client, network_id):
        """Clear the network's qos_policy_id and verify it reads back None.

        :param client: client used to perform the update.
        :param network_id: ID of the network to detach from its policy.
        """
        client.update_network(network_id, qos_policy_id=None)
        updated_network = self.admin_client.show_network(network_id)
        self.assertIsNone(updated_network['network']['qos_policy_id'])

    @test.idempotent_id('65b9ef75-1911-406a-bbdb-ca1d68d528b0')
    def test_policy_association_with_admin_network(self):
        """Create a shared network bound to a non-shared policy."""
        policy = self.create_qos_policy(name='test-policy',
                                        description='test policy',
                                        shared=False)
        network = self.create_shared_network('test network',
                                             qos_policy_id=policy['id'])

        retrieved_network = self.admin_client.show_network(network['id'])
        self.assertEqual(
            policy['id'], retrieved_network['network']['qos_policy_id'])

        self._disassociate_network(self.admin_client, network['id'])

    @test.idempotent_id('1738de5d-0476-4163-9022-5e1b548c208e')
    def test_policy_association_with_tenant_network(self):
        """Create a network (via create_network) bound to a shared policy."""
        policy = self.create_qos_policy(name='test-policy',
                                        description='test policy',
                                        shared=True)
        network = self.create_network('test network',
                                      qos_policy_id=policy['id'])

        retrieved_network = self.admin_client.show_network(network['id'])
        self.assertEqual(
            policy['id'], retrieved_network['network']['qos_policy_id'])

        self._disassociate_network(self.client, network['id'])

    @test.idempotent_id('9efe63d0-836f-4cc2-b00c-468e63aa614e')
    def test_policy_association_with_network_nonexistent_policy(self):
        """Binding a network to an unknown policy ID raises NotFound."""
        self.assertRaises(
            exceptions.NotFound,
            self.create_network,
            'test network',
            qos_policy_id='9efe63d0-836f-4cc2-b00c-468e63aa614e')

    @test.idempotent_id('1aa55a79-324f-47d9-a076-894a8fc2448b')
    def test_policy_association_with_network_non_shared_policy(self):
        """create_network with a non-shared policy raises NotFound."""
        policy = self.create_qos_policy(name='test-policy',
                                        description='test policy',
                                        shared=False)
        self.assertRaises(
            exceptions.NotFound,
            self.create_network,
            'test network', qos_policy_id=policy['id'])

    @test.idempotent_id('09a9392c-1359-4cbb-989f-fb768e5834a8')
    def test_policy_update_association_with_admin_network(self):
        """Attach a policy to an existing shared network via update."""
        policy = self.create_qos_policy(name='test-policy',
                                        description='test policy',
                                        shared=False)
        network = self.create_shared_network('test network')
        retrieved_network = self.admin_client.show_network(network['id'])
        self.assertIsNone(retrieved_network['network']['qos_policy_id'])

        self.admin_client.update_network(network['id'],
                                         qos_policy_id=policy['id'])
        retrieved_network = self.admin_client.show_network(network['id'])
        self.assertEqual(
            policy['id'], retrieved_network['network']['qos_policy_id'])

        self._disassociate_network(self.admin_client, network['id'])

    def _disassociate_port(self, port_id):
        """Clear the port's qos_policy_id and verify it reads back None.

        The update goes through ``self.client``; the verification read goes
        through ``self.admin_client``.
        """
        self.client.update_port(port_id, qos_policy_id=None)
        updated_port = self.admin_client.show_port(port_id)
        self.assertIsNone(updated_port['port']['qos_policy_id'])

    @test.idempotent_id('98fcd95e-84cf-4746-860e-44692e674f2e')
    def test_policy_association_with_port_shared_policy(self):
        """Create a port bound to a shared policy."""
        policy = self.create_qos_policy(name='test-policy',
                                        description='test policy',
                                        shared=True)
        network = self.create_shared_network('test network')
        port = self.create_port(network, qos_policy_id=policy['id'])

        retrieved_port = self.admin_client.show_port(port['id'])
        self.assertEqual(
            policy['id'], retrieved_port['port']['qos_policy_id'])

        self._disassociate_port(port['id'])

    @test.idempotent_id('49e02f5a-e1dd-41d5-9855-cfa37f2d195e')
    def test_policy_association_with_port_nonexistent_policy(self):
        """Binding a port to an unknown policy ID raises NotFound."""
        network = self.create_shared_network('test network')
        self.assertRaises(
            exceptions.NotFound,
            self.create_port,
            network,
            qos_policy_id='49e02f5a-e1dd-41d5-9855-cfa37f2d195e')

    @test.idempotent_id('f53d961c-9fe5-4422-8b66-7add972c6031')
    def test_policy_association_with_port_non_shared_policy(self):
        """create_port with a non-shared policy raises NotFound."""
        policy = self.create_qos_policy(name='test-policy',
                                        description='test policy',
                                        shared=False)
        network = self.create_shared_network('test network')
        self.assertRaises(
            exceptions.NotFound,
            self.create_port,
            network, qos_policy_id=policy['id'])

    @test.idempotent_id('f8163237-fba9-4db5-9526-bad6d2343c76')
    def test_policy_update_association_with_port_shared_policy(self):
        """Attach a shared policy to an existing port via update."""
        policy = self.create_qos_policy(name='test-policy',
                                        description='test policy',
                                        shared=True)
        network = self.create_shared_network('test network')
        port = self.create_port(network)
        retrieved_port = self.admin_client.show_port(port['id'])
        self.assertIsNone(retrieved_port['port']['qos_policy_id'])

        self.client.update_port(port['id'], qos_policy_id=policy['id'])
        retrieved_port = self.admin_client.show_port(port['id'])
        self.assertEqual(
            policy['id'], retrieved_port['port']['qos_policy_id'])

        self._disassociate_port(port['id'])

    @test.idempotent_id('18163237-8ba9-4db5-9525-bad6d2343c75')
    def test_delete_not_allowed_if_policy_in_use_by_network(self):
        """Deleting a policy bound to a network raises Conflict."""
        policy = self.create_qos_policy(name='test-policy',
                                        description='test policy',
                                        shared=True)
        network = self.create_shared_network(
            'test network', qos_policy_id=policy['id'])
        self.assertRaises(
            exceptions.Conflict,
            self.admin_client.delete_qos_policy, policy['id'])

        # Once the network no longer references the policy, deletion works.
        self._disassociate_network(self.admin_client, network['id'])
        self.admin_client.delete_qos_policy(policy['id'])

    @test.idempotent_id('24153230-84a9-4dd5-9525-bad6d2343c75')
    def test_delete_not_allowed_if_policy_in_use_by_port(self):
        """Deleting a policy bound to a port raises Conflict."""
        policy = self.create_qos_policy(name='test-policy',
                                        description='test policy',
                                        shared=True)
        network = self.create_shared_network('test network')
        port = self.create_port(network, qos_policy_id=policy['id'])
        self.assertRaises(
            exceptions.Conflict,
            self.admin_client.delete_qos_policy, policy['id'])

        # Once the port no longer references the policy, deletion works.
        self._disassociate_port(port['id'])
        self.admin_client.delete_qos_policy(policy['id'])

    @test.idempotent_id('a2a5849b-dd06-4b18-9664-0b6828a1fc27')
    def test_qos_policy_delete_with_rules(self):
        """A policy that still has a rule attached can be deleted."""
        policy = self.create_qos_policy(name='test-policy',
                                        description='test policy',
                                        shared=False)
        self.admin_client.create_bandwidth_limit_rule(
            policy['id'], 200, 1337)['bandwidth_limit_rule']

        self.admin_client.delete_qos_policy(policy['id'])

        with testtools.ExpectedException(exceptions.NotFound):
            self.admin_client.show_qos_policy(policy['id'])
307
308
class QosBandwidthLimitRuleTestJSON(base.BaseAdminNetworkTest):
    """API tests for bandwidth limit rules attached to QoS policies."""

    @classmethod
    @test.requires_ext(extension="qos", service="network")
    def resource_setup(cls):
        """Run the base resource setup; gated on the 'qos' extension."""
        super(QosBandwidthLimitRuleTestJSON, cls).resource_setup()

    @test.idempotent_id('8a59b00b-3e9c-4787-92f8-93a5cdf5e378')
    def test_rule_create(self):
        """Create a rule and find it via rule show, rule list, policy show."""
        policy_id = self.create_qos_policy(name='test-policy',
                                           description='test policy',
                                           shared=False)['id']
        rule_id = self.create_qos_bandwidth_limit_rule(
            policy_id=policy_id, max_kbps=200, max_burst_kbps=1337)['id']

        # Test 'show rule'
        shown_rule = self.admin_client.show_bandwidth_limit_rule(
            policy_id, rule_id)['bandwidth_limit_rule']
        self.assertEqual(rule_id, shown_rule['id'])
        self.assertEqual(200, shown_rule['max_kbps'])
        self.assertEqual(1337, shown_rule['max_burst_kbps'])

        # Test 'list rules'
        listed_rules = self.admin_client.list_bandwidth_limit_rules(
            policy_id)['bandwidth_limit_rules']
        self.assertIn(rule_id, [entry['id'] for entry in listed_rules])

        # Test 'show policy'
        policy_rules = self.admin_client.show_qos_policy(
            policy_id)['policy']['rules']
        self.assertEqual(1, len(policy_rules))
        only_rule = policy_rules[0]
        self.assertEqual(rule_id, only_rule['id'])
        self.assertEqual(qos_consts.RULE_TYPE_BANDWIDTH_LIMIT,
                         only_rule['type'])

    @test.idempotent_id('8a59b00b-ab01-4787-92f8-93a5cdf5e378')
    def test_rule_create_fail_for_the_same_type(self):
        """A second bandwidth limit rule on one policy raises Conflict."""
        policy_id = self.create_qos_policy(name='test-policy',
                                           description='test policy',
                                           shared=False)['id']
        self.create_qos_bandwidth_limit_rule(
            policy_id=policy_id, max_kbps=200, max_burst_kbps=1337)

        self.assertRaises(
            exceptions.Conflict,
            self.create_qos_bandwidth_limit_rule,
            policy_id=policy_id, max_kbps=201, max_burst_kbps=1338)

    @test.idempotent_id('149a6988-2568-47d2-931e-2dbc858943b3')
    def test_rule_update(self):
        """Update both limits of a rule and read the new values back."""
        policy_id = self.create_qos_policy(name='test-policy',
                                           description='test policy',
                                           shared=False)['id']
        rule_id = self.create_qos_bandwidth_limit_rule(
            policy_id=policy_id, max_kbps=1, max_burst_kbps=1)['id']

        self.admin_client.update_bandwidth_limit_rule(
            policy_id, rule_id, max_kbps=200, max_burst_kbps=1337)

        updated_rule = self.admin_client.show_bandwidth_limit_rule(
            policy_id, rule_id)['bandwidth_limit_rule']
        self.assertEqual(200, updated_rule['max_kbps'])
        self.assertEqual(1337, updated_rule['max_burst_kbps'])

    @test.idempotent_id('67ee6efd-7b33-4a68-927d-275b4f8ba958')
    def test_rule_delete(self):
        """Delete a rule and check that showing it raises NotFound."""
        policy_id = self.create_qos_policy(name='test-policy',
                                           description='test policy',
                                           shared=False)['id']
        created_rule = self.admin_client.create_bandwidth_limit_rule(
            policy_id, 200, 1337)['bandwidth_limit_rule']
        rule_id = created_rule['id']

        # The rule must be visible before it is removed.
        shown_rule = self.admin_client.show_bandwidth_limit_rule(
            policy_id, rule_id)['bandwidth_limit_rule']
        self.assertEqual(rule_id, shown_rule['id'])

        self.admin_client.delete_bandwidth_limit_rule(policy_id, rule_id)
        self.assertRaises(
            exceptions.NotFound,
            self.admin_client.show_bandwidth_limit_rule,
            policy_id, rule_id)

    @test.idempotent_id('f211222c-5808-46cb-a961-983bbab6b852')
    def test_rule_create_rule_nonexistent_policy(self):
        """Creating a rule under an unknown policy raises NotFound."""
        self.assertRaises(exceptions.NotFound,
                          self.create_qos_bandwidth_limit_rule,
                          'policy', 200, 1337)

    @test.idempotent_id('eed8e2a6-22da-421b-89b9-935a2c1a1b50')
    def test_policy_create_forbidden_for_regular_tenants(self):
        """Policy creation through the regular client raises Forbidden."""
        self.assertRaises(exceptions.Forbidden,
                          self.client.create_qos_policy,
                          'test-policy', 'test policy', False)

    @test.idempotent_id('a4a2e7ad-786f-4927-a85a-e545a93bd274')
    def test_rule_create_forbidden_for_regular_tenants(self):
        """Rule creation through the regular client raises Forbidden."""
        self.assertRaises(exceptions.Forbidden,
                          self.client.create_bandwidth_limit_rule,
                          'policy', 1, 2)

    @test.idempotent_id('ce0bd0c2-54d9-4e29-85f1-cfb36ac3ebe2')
    def test_get_rules_by_policy(self):
        """Listing rules of one policy omits rules of another policy."""
        first_policy_id = self.create_qos_policy(
            name='test-policy1',
            description='test policy1',
            shared=False)['id']
        first_rule = self.create_qos_bandwidth_limit_rule(
            policy_id=first_policy_id, max_kbps=200, max_burst_kbps=1337)

        second_policy_id = self.create_qos_policy(
            name='test-policy2',
            description='test policy2',
            shared=False)['id']
        second_rule = self.create_qos_bandwidth_limit_rule(
            policy_id=second_policy_id, max_kbps=5000, max_burst_kbps=2523)

        # Test 'list rules'
        listed_rules = self.admin_client.list_bandwidth_limit_rules(
            first_policy_id)['bandwidth_limit_rules']
        listed_ids = [entry['id'] for entry in listed_rules]
        self.assertIn(first_rule['id'], listed_ids)
        self.assertNotIn(second_rule['id'], listed_ids)
441
442
class RbacSharedQosPoliciesTest(base.BaseAdminNetworkTest):
    """API tests for sharing QoS policies through RBAC policies.

    Three clients are involved: ``admin_client``, the primary ``client`` and
    ``client2`` (taken from the 'alt' credentials in ``resource_setup``).
    """

    force_tenant_isolation = True
    credentials = ['primary', 'alt', 'admin']

    @classmethod
    @test.requires_ext(extension="qos", service="network")
    def resource_setup(cls):
        """Run the base setup and expose the alt network client."""
        super(RbacSharedQosPoliciesTest, cls).resource_setup()
        cls.client2 = cls.alt_manager.network_client

    def _create_qos_policy(self, tenant_id=None):
        """Create a non-shared policy as admin and schedule its deletion.

        :param tenant_id: value passed through as the policy's tenant_id.
        :returns: the 'policy' dict from the create response.
        """
        args = {'name': data_utils.rand_name('test-policy'),
                'description': 'test policy',
                'shared': False,
                'tenant_id': tenant_id}
        qos_policy = self.admin_client.create_qos_policy(**args)['policy']
        self.addCleanup(self.admin_client.delete_qos_policy, qos_policy['id'])

        return qos_policy

    def _make_admin_policy_shared_to_tenant_id(self, tenant_id):
        """Create an admin policy plus an RBAC entry sharing it to a tenant.

        :param tenant_id: target tenant of the 'access_as_shared' entry.
        :returns: dict with the created 'policy' and 'rbac_policy'.
        """
        policy = self._create_qos_policy()
        rbac_policy = self.admin_client.create_rbac_policy(
            object_type='qos_policy',
            object_id=policy['id'],
            action='access_as_shared',
            target_tenant=tenant_id,
        )['rbac_policy']

        return {'policy': policy, 'rbac_policy': rbac_policy}

    def _create_network(self, qos_policy_id, client, should_cleanup=True):
        """Create a randomly named network bound to the given policy.

        :param qos_policy_id: policy ID to bind the network to.
        :param client: client used to create (and clean up) the network.
        :param should_cleanup: when True, schedule deletion of the network.
        :returns: the 'network' dict from the create response.
        """
        net = client.create_network(
            name=data_utils.rand_name('test-network'),
            qos_policy_id=qos_policy_id)['network']
        if should_cleanup:
            self.addCleanup(client.delete_network, net['id'])

        return net

    def _delete_network_ignoring_not_found(self, network_id):
        """Delete a network as admin, tolerating one that is already gone.

        Used as a cleanup for networks a test deletes itself, so the cleanup
        is a no-op on the success path and a safety net otherwise.
        """
        try:
            self.admin_client.delete_network(network_id)
        except exceptions.NotFound:
            pass

    @test.idempotent_id('b9dcf582-d3b3-11e5-950a-54ee756c66df')
    def test_policy_sharing_with_wildcard(self):
        """Toggle 'shared' and check the wildcard RBAC entry and visibility."""
        qos_pol = self.create_qos_policy(
            name=data_utils.rand_name('test-policy'),
            description='test-shared-policy', shared=False)
        self.assertNotIn(qos_pol, self.client2.list_qos_policies()['policies'])

        # test update shared False -> True
        self.admin_client.update_qos_policy(qos_pol['id'], shared=True)
        qos_pol['shared'] = True
        self.client2.show_qos_policy(qos_pol['id'])
        rbac_pol = {'target_tenant': '*',
                    'tenant_id': self.admin_client.tenant_id,
                    'object_type': 'qos_policy',
                    'object_id': qos_pol['id'],
                    'action': 'access_as_shared'}

        rbac_policies = self.admin_client.list_rbac_policies()['rbac_policies']
        # The expected entry has no 'id', so compare against copies of the
        # listed entries with 'id' left out.
        rbac_policies = [
            {key: value for key, value in entry.items() if key != 'id'}
            for entry in rbac_policies]
        self.assertIn(rbac_pol, rbac_policies)

        # update shared True -> False should fail because the policy is bound
        # to a network
        net = self._create_network(qos_pol['id'], self.admin_client, False)
        # The network is deleted explicitly below; this cleanup only matters
        # if the test fails before reaching that point.
        self.addCleanup(self._delete_network_ignoring_not_found, net['id'])
        with testtools.ExpectedException(exceptions.Conflict):
            self.admin_client.update_qos_policy(qos_pol['id'], shared=False)

        # delete the network, and update shared True -> False should pass now
        self.admin_client.delete_network(net['id'])
        self.admin_client.update_qos_policy(qos_pol['id'], shared=False)
        qos_pol['shared'] = False
        self.assertNotIn(qos_pol, self.client2.list_qos_policies()['policies'])

    def _create_net_bound_qos_rbacs(self):
        """Share a policy to the client tenant and to '*', then bind a net.

        :returns: tuple of (tenant-specific rbac policy, wildcard rbac
                  policy).
        """
        res = self._make_admin_policy_shared_to_tenant_id(
            self.client.tenant_id)
        qos_policy, rbac_for_client_tenant = res['policy'], res['rbac_policy']

        # add a wildcard rbac rule - now the policy globally shared
        rbac_wildcard = self.admin_client.create_rbac_policy(
            object_type='qos_policy',
            object_id=qos_policy['id'],
            action='access_as_shared',
            target_tenant='*',
        )['rbac_policy']

        # tenant1 now uses qos policy for net
        self._create_network(qos_policy['id'], self.client)

        return rbac_for_client_tenant, rbac_wildcard

    @test.idempotent_id('328b1f70-d424-11e5-a57f-54ee756c66df')
    def test_net_bound_shared_policy_wildcard_and_tenant_id_wild_remove(self):
        """Remove the wildcard share while the tenant share still exists."""
        client_rbac, wildcard_rbac = self._create_net_bound_qos_rbacs()
        # globally unshare the qos-policy, the specific share should remain
        self.admin_client.delete_rbac_policy(wildcard_rbac['id'])
        self.client.list_rbac_policies(id=client_rbac['id'])

    @test.idempotent_id('1997b00c-0c75-4e43-8ce2-999f9fa555ee')
    def test_net_bound_shared_policy_wildcard_and_tenant_id_wild_remains(self):
        """Remove the tenant share while the wildcard share still exists."""
        client_rbac, wildcard_rbac = self._create_net_bound_qos_rbacs()
        # remove client_rbac policy the wildcard share should remain
        self.admin_client.delete_rbac_policy(client_rbac['id'])
        self.client.list_rbac_policies(id=wildcard_rbac['id'])

    @test.idempotent_id('2ace9adc-da6e-11e5-aafe-54ee756c66df')
    def test_policy_sharing_with_wildcard_and_tenant_id(self):
        """Global share on/off must not disturb a tenant-specific share."""
        res = self._make_admin_policy_shared_to_tenant_id(
            self.client.tenant_id)
        qos_policy, rbac = res['policy'], res['rbac_policy']
        qos_pol = self.client.show_qos_policy(qos_policy['id'])['policy']
        self.assertTrue(qos_pol['shared'])
        with testtools.ExpectedException(exceptions.NotFound):
            self.client2.show_qos_policy(qos_policy['id'])

        # make the qos-policy globally shared
        self.admin_client.update_qos_policy(qos_policy['id'], shared=True)
        qos_pol = self.client2.show_qos_policy(qos_policy['id'])['policy']
        self.assertTrue(qos_pol['shared'])

        # globally unshare the qos-policy, the specific share should remain
        self.admin_client.update_qos_policy(qos_policy['id'], shared=False)
        self.client.show_qos_policy(qos_policy['id'])
        with testtools.ExpectedException(exceptions.NotFound):
            self.client2.show_qos_policy(qos_policy['id'])
        self.assertIn(rbac,
                      self.admin_client.list_rbac_policies()['rbac_policies'])

    @test.idempotent_id('9f85c76a-a350-11e5-8ae5-54ee756c66df')
    def test_policy_target_update(self):
        """Retarget an RBAC entry; only 'target_tenant' may change."""
        res = self._make_admin_policy_shared_to_tenant_id(
            self.client.tenant_id)
        # change to client2
        update_res = self.admin_client.update_rbac_policy(
            res['rbac_policy']['id'], target_tenant=self.client2.tenant_id)
        self.assertEqual(self.client2.tenant_id,
                         update_res['rbac_policy']['target_tenant'])
        # make sure everything else stayed the same
        res['rbac_policy'].pop('target_tenant')
        update_res['rbac_policy'].pop('target_tenant')
        self.assertEqual(res['rbac_policy'], update_res['rbac_policy'])

    @test.idempotent_id('a9b39f46-a350-11e5-97c7-54ee756c66df')
    def test_network_presence_prevents_policy_rbac_policy_deletion(self):
        """A bound network blocks removal of the share it depends on."""
        res = self._make_admin_policy_shared_to_tenant_id(
            self.client2.tenant_id)
        qos_policy_id = res['policy']['id']
        self._create_network(qos_policy_id, self.client2)
        # a network with shared qos-policy should prevent the deletion of an
        # rbac-policy required for it to be shared
        with testtools.ExpectedException(exceptions.Conflict):
            self.admin_client.delete_rbac_policy(res['rbac_policy']['id'])

        # a wildcard policy should allow the specific policy to be deleted
        # since it allows the remaining network
        wild = self.admin_client.create_rbac_policy(
            object_type='qos_policy', object_id=res['policy']['id'],
            action='access_as_shared', target_tenant='*')['rbac_policy']
        self.admin_client.delete_rbac_policy(res['rbac_policy']['id'])

        # now that wildcard is the only remaining, it should be subjected to
        # the same restriction
        with testtools.ExpectedException(exceptions.Conflict):
            self.admin_client.delete_rbac_policy(wild['id'])

        # we can't update the policy to a different tenant
        with testtools.ExpectedException(exceptions.Conflict):
            self.admin_client.update_rbac_policy(
                wild['id'], target_tenant=self.client2.tenant_id)

    @test.idempotent_id('b0fe87e8-a350-11e5-9f08-54ee756c66df')
    def test_regular_client_shares_to_another_regular_client(self):
        """Share an admin policy to 'client'; 'client2' must not see it."""
        # owned by self.admin_client
        policy = self._create_qos_policy()
        with testtools.ExpectedException(exceptions.NotFound):
            self.client.show_qos_policy(policy['id'])
        rbac_policy = self.admin_client.create_rbac_policy(
            object_type='qos_policy', object_id=policy['id'],
            action='access_as_shared',
            target_tenant=self.client.tenant_id)['rbac_policy']
        self.client.show_qos_policy(policy['id'])

        self.assertIn(rbac_policy,
                      self.admin_client.list_rbac_policies()['rbac_policies'])
        # ensure that 'client2' can't see the rbac-policy: it was created by
        # the admin client and targets 'client', not 'client2'
        self.assertNotIn(rbac_policy['id'], [p['id'] for p in
                         self.client2.list_rbac_policies()['rbac_policies']])

    @test.idempotent_id('ba88d0ca-a350-11e5-a06f-54ee756c66df')
    def test_filter_fields(self):
        """Listing with 'fields' returns exactly the requested keys."""
        policy = self._create_qos_policy()
        self.admin_client.create_rbac_policy(
            object_type='qos_policy', object_id=policy['id'],
            action='access_as_shared', target_tenant=self.client2.tenant_id)
        field_args = (('id',), ('id', 'action'), ('object_type', 'object_id'),
                      ('tenant_id', 'target_tenant'))
        for fields in field_args:
            res = self.admin_client.list_rbac_policies(fields=fields)
            self.assertEqual(set(fields), set(res['rbac_policies'][0].keys()))

    @test.idempotent_id('c10d993a-a350-11e5-9c7a-54ee756c66df')
    def test_rbac_policy_show(self):
        """Show returns the same dict that create returned."""
        res = self._make_admin_policy_shared_to_tenant_id(
            self.client.tenant_id)
        p1 = res['rbac_policy']
        p2 = self.admin_client.create_rbac_policy(
            object_type='qos_policy', object_id=res['policy']['id'],
            action='access_as_shared',
            target_tenant='*')['rbac_policy']

        self.assertEqual(
            p1, self.admin_client.show_rbac_policy(p1['id'])['rbac_policy'])
        self.assertEqual(
            p2, self.admin_client.show_rbac_policy(p2['id'])['rbac_policy'])

    @test.idempotent_id('c7496f86-a350-11e5-b380-54ee756c66df')
    def test_filter_rbac_policies(self):
        """Filtering the RBAC list by id returns exactly that entry."""
        policy = self._create_qos_policy()
        rbac_pol1 = self.admin_client.create_rbac_policy(
            object_type='qos_policy', object_id=policy['id'],
            action='access_as_shared',
            target_tenant=self.client2.tenant_id)['rbac_policy']
        rbac_pol2 = self.admin_client.create_rbac_policy(
            object_type='qos_policy', object_id=policy['id'],
            action='access_as_shared',
            target_tenant=self.admin_client.tenant_id)['rbac_policy']
        res1 = self.admin_client.list_rbac_policies(id=rbac_pol1['id'])[
            'rbac_policies']
        res2 = self.admin_client.list_rbac_policies(id=rbac_pol2['id'])[
            'rbac_policies']
        self.assertEqual(1, len(res1))
        self.assertEqual(1, len(res2))
        self.assertEqual(rbac_pol1['id'], res1[0]['id'])
        self.assertEqual(rbac_pol2['id'], res2[0]['id'])

    @test.idempotent_id('cd7d755a-a350-11e5-a344-54ee756c66df')
    def test_regular_client_blocked_from_sharing_anothers_policy(self):
        """A tenant cannot re-share a policy that was shared to it."""
        qos_policy = self._make_admin_policy_shared_to_tenant_id(
            self.client.tenant_id)['policy']
        with testtools.ExpectedException(exceptions.BadRequest):
            self.client.create_rbac_policy(
                object_type='qos_policy', object_id=qos_policy['id'],
                action='access_as_shared',
                target_tenant=self.client2.tenant_id)

        # make sure the rbac-policy is invisible to the tenant for which it's
        # being shared
        self.assertFalse(self.client.list_rbac_policies()['rbac_policies'])
693
694
class QosDscpMarkingRuleTestJSON(base.BaseAdminNetworkTest):
    """API tests for DSCP marking rules attached to QoS policies."""

    # Two distinct DSCP mark values the tests treat as accepted by the API.
    VALID_DSCP_MARK1 = 56
    VALID_DSCP_MARK2 = 48

    @classmethod
    @test.requires_ext(extension="qos", service="network")
    def resource_setup(cls):
        """Run the base resource setup; gated on the 'qos' extension."""
        super(QosDscpMarkingRuleTestJSON, cls).resource_setup()

    @test.idempotent_id('f5cbaceb-5829-497c-9c60-ad70969e9a08')
    def test_rule_create(self):
        """Create a rule and find it via rule show, rule list, policy show."""
        policy_id = self.create_qos_policy(name='test-policy',
                                           description='test policy',
                                           shared=False)['id']
        created_rule = self.admin_client.create_dscp_marking_rule(
            policy_id, self.VALID_DSCP_MARK1)['dscp_marking_rule']
        rule_id = created_rule['id']

        # Test 'show rule'
        shown_rule = self.admin_client.show_dscp_marking_rule(
            policy_id, rule_id)['dscp_marking_rule']
        self.assertEqual(rule_id, shown_rule['id'])
        self.assertEqual(self.VALID_DSCP_MARK1, shown_rule['dscp_mark'])

        # Test 'list rules'
        listed_rules = self.admin_client.list_dscp_marking_rules(
            policy_id)['dscp_marking_rules']
        self.assertIn(rule_id, [entry['id'] for entry in listed_rules])

        # Test 'show policy'
        policy_rules = self.admin_client.show_qos_policy(
            policy_id)['policy']['rules']
        self.assertEqual(1, len(policy_rules))
        only_rule = policy_rules[0]
        self.assertEqual(rule_id, only_rule['id'])
        self.assertEqual(qos_consts.RULE_TYPE_DSCP_MARK, only_rule['type'])

    @test.idempotent_id('08553ffe-030f-4037-b486-7e0b8fb9385a')
    def test_rule_create_fail_for_the_same_type(self):
        """A second DSCP marking rule on one policy raises Conflict."""
        policy_id = self.create_qos_policy(name='test-policy',
                                           description='test policy',
                                           shared=False)['id']
        self.admin_client.create_dscp_marking_rule(
            policy_id, self.VALID_DSCP_MARK1)['dscp_marking_rule']

        self.assertRaises(
            exceptions.Conflict,
            self.admin_client.create_dscp_marking_rule,
            policy_id=policy_id,
            dscp_mark=self.VALID_DSCP_MARK2)

    @test.idempotent_id('76f632e5-3175-4408-9a32-3625e599c8a2')
    def test_rule_update(self):
        """Change a rule's DSCP mark and read the new value back."""
        policy_id = self.create_qos_policy(name='test-policy',
                                           description='test policy',
                                           shared=False)['id']
        created_rule = self.admin_client.create_dscp_marking_rule(
            policy_id, self.VALID_DSCP_MARK1)['dscp_marking_rule']
        rule_id = created_rule['id']

        self.admin_client.update_dscp_marking_rule(
            policy_id, rule_id, dscp_mark=self.VALID_DSCP_MARK2)

        updated_rule = self.admin_client.show_dscp_marking_rule(
            policy_id, rule_id)['dscp_marking_rule']
        self.assertEqual(self.VALID_DSCP_MARK2, updated_rule['dscp_mark'])

    @test.idempotent_id('74f81904-c35f-48a3-adae-1f5424cb3c18')
    def test_rule_delete(self):
        """Delete a rule and check that showing it raises NotFound."""
        policy_id = self.create_qos_policy(name='test-policy',
                                           description='test policy',
                                           shared=False)['id']
        created_rule = self.admin_client.create_dscp_marking_rule(
            policy_id, self.VALID_DSCP_MARK1)['dscp_marking_rule']
        rule_id = created_rule['id']

        # The rule must be visible before it is removed.
        shown_rule = self.admin_client.show_dscp_marking_rule(
            policy_id, rule_id)['dscp_marking_rule']
        self.assertEqual(rule_id, shown_rule['id'])

        self.admin_client.delete_dscp_marking_rule(policy_id, rule_id)
        self.assertRaises(
            exceptions.NotFound,
            self.admin_client.show_dscp_marking_rule,
            policy_id, rule_id)

    @test.idempotent_id('9cb8ef5c-96fc-4978-9ee0-e3b02bab628a')
    def test_rule_create_rule_nonexistent_policy(self):
        """Creating a rule under an unknown policy raises NotFound."""
        self.assertRaises(exceptions.NotFound,
                          self.admin_client.create_dscp_marking_rule,
                          'policy', self.VALID_DSCP_MARK1)

    @test.idempotent_id('bf6002ea-29de-486f-b65d-08aea6d4c4e2')
    def test_rule_create_forbidden_for_regular_tenants(self):
        """Rule creation through the regular client raises Forbidden."""
        self.assertRaises(exceptions.Forbidden,
                          self.client.create_dscp_marking_rule,
                          'policy', self.VALID_DSCP_MARK1)

    @test.idempotent_id('33646b08-4f05-4493-a48a-bde768a18533')
    def test_invalid_rule_create(self):
        """Creating a rule with DSCP mark 58 raises BadRequest."""
        policy_id = self.create_qos_policy(name='test-policy',
                                           description='test policy',
                                           shared=False)['id']
        self.assertRaises(exceptions.BadRequest,
                          self.admin_client.create_dscp_marking_rule,
                          policy_id, 58)

    @test.idempotent_id('c565131d-4c80-4231-b0f3-9ae2be4de129')
    def test_get_rules_by_policy(self):
        """Listing rules of one policy omits rules of another policy."""
        first_policy_id = self.create_qos_policy(
            name='test-policy1',
            description='test policy1',
            shared=False)['id']
        first_rule = self.admin_client.create_dscp_marking_rule(
            first_policy_id, self.VALID_DSCP_MARK1)['dscp_marking_rule']

        second_policy_id = self.create_qos_policy(
            name='test-policy2',
            description='test policy2',
            shared=False)['id']
        second_rule = self.admin_client.create_dscp_marking_rule(
            second_policy_id, self.VALID_DSCP_MARK2)['dscp_marking_rule']

        # Test 'list rules'
        listed_rules = self.admin_client.list_dscp_marking_rules(
            first_policy_id)['dscp_marking_rules']
        listed_ids = [entry['id'] for entry in listed_rules]
        self.assertIn(first_rule['id'], listed_ids)
        self.assertNotIn(second_rule['id'], listed_ids)