Merge "Refactoring - Use existing Tempest APIs in "QoS bandwidth limit rule" tests"
diff --git a/neutron_tempest_plugin/api/admin/test_ports.py b/neutron_tempest_plugin/api/admin/test_ports.py
index 6f51b1c..b277fac 100644
--- a/neutron_tempest_plugin/api/admin/test_ports.py
+++ b/neutron_tempest_plugin/api/admin/test_ports.py
@@ -94,16 +94,27 @@
cls.prov_network = cls.create_provider_network(
physnet_name=cls.physnet_name, start_segmentation_id=base_segm)
+ @classmethod
+ def setup_clients(cls):
+ super(PortTestCasesResourceRequest, cls).setup_clients()
+ cls.qos_minimum_bandwidth_rules_client = \
+ cls.os_admin.qos_minimum_bandwidth_rules_client
+ cls.qos_bw_limit_rule_client = \
+ cls.os_admin.qos_limit_bandwidth_rules_client
+
def _create_qos_policy_and_port(self, network, vnic_type,
network_policy=False):
qos_policy = self.create_qos_policy(
name=data_utils.rand_name('test_policy'), shared=True)
- self.create_qos_minimum_bandwidth_rule(qos_policy['id'],
- self.EGRESS_KBPS,
- const.EGRESS_DIRECTION)
- self.create_qos_minimum_bandwidth_rule(qos_policy['id'],
- self.INGRESS_KBPS,
- const.INGRESS_DIRECTION)
+ self.qos_minimum_bandwidth_rules_client.create_minimum_bandwidth_rule(
+ qos_policy_id=qos_policy['id'],
+ **{'direction': const.EGRESS_DIRECTION,
+ 'min_kbps': self.EGRESS_KBPS})
+
+ self.qos_minimum_bandwidth_rules_client.create_minimum_bandwidth_rule(
+ qos_policy_id=qos_policy['id'],
+ **{'direction': const.INGRESS_DIRECTION,
+ 'min_kbps': self.INGRESS_KBPS})
port_policy_id = qos_policy['id'] if not network_policy else None
port_kwargs = {
@@ -163,9 +174,11 @@
# Note(lajoskatona): Add a non-minimum-bandwidth-rule to the policy
# to make sure that the resource request is not filled with it.
- self.create_qos_bandwidth_limit_rule(qos_policy['id'],
- self.EGRESS_KBPS, 800,
- const.EGRESS_DIRECTION)
+ self.qos_bw_limit_rule_client.create_limit_bandwidth_rule(
+ qos_policy['id'],
+ **{'max_kbps': self.EGRESS_KBPS,
+ 'max_burst_kbps': 800,
+ 'direction': const.EGRESS_DIRECTION})
port_kwargs = {
'qos_policy_id': qos_policy['id'],
diff --git a/neutron_tempest_plugin/api/base.py b/neutron_tempest_plugin/api/base.py
index 024fe43..ecdd00a 100644
--- a/neutron_tempest_plugin/api/base.py
+++ b/neutron_tempest_plugin/api/base.py
@@ -763,27 +763,6 @@
return qos_policy
@classmethod
- def create_qos_bandwidth_limit_rule(cls, policy_id, max_kbps,
- max_burst_kbps,
- direction=const.EGRESS_DIRECTION):
- """Wrapper utility that returns a test QoS bandwidth limit rule."""
- body = cls.admin_client.create_bandwidth_limit_rule(
- policy_id, max_kbps, max_burst_kbps, direction)
- qos_rule = body['bandwidth_limit_rule']
- cls.qos_rules.append(qos_rule)
- return qos_rule
-
- @classmethod
- def create_qos_minimum_bandwidth_rule(cls, policy_id, min_kbps,
- direction=const.EGRESS_DIRECTION):
- """Wrapper utility that creates and returns a QoS min bw rule."""
- body = cls.admin_client.create_minimum_bandwidth_rule(
- policy_id, direction, min_kbps)
- qos_rule = body['minimum_bandwidth_rule']
- cls.qos_rules.append(qos_rule)
- return qos_rule
-
- @classmethod
def create_qos_dscp_marking_rule(cls, policy_id, dscp_mark):
"""Wrapper utility that creates and returns a QoS dscp rule."""
body = cls.admin_client.create_dscp_marking_rule(
diff --git a/neutron_tempest_plugin/api/clients.py b/neutron_tempest_plugin/api/clients.py
index 8f5256e..6565dcb 100644
--- a/neutron_tempest_plugin/api/clients.py
+++ b/neutron_tempest_plugin/api/clients.py
@@ -22,6 +22,8 @@
from tempest.lib.services.compute import servers_client
from tempest.lib.services.identity.v2 import tenants_client
from tempest.lib.services.identity.v3 import projects_client
+from tempest.lib.services.network import qos_limit_bandwidth_rules_client
+from tempest.lib.services.network import qos_minimum_bandwidth_rules_client
from neutron_tempest_plugin import config
from neutron_tempest_plugin.services.network.json import network_client
@@ -92,6 +94,26 @@
self.az_client = availability_zone_client.AvailabilityZoneClient(
self.auth_provider, **params)
+ self.qos_limit_bandwidth_rules_client = \
+ qos_limit_bandwidth_rules_client.QosLimitBandwidthRulesClient(
+ self.auth_provider,
+ CONF.network.catalog_type,
+ CONF.network.region or CONF.identity.region,
+ endpoint_type=CONF.network.endpoint_type,
+ build_interval=CONF.network.build_interval,
+ build_timeout=CONF.network.build_timeout,
+ **self.default_params)
+
+ self.qos_minimum_bandwidth_rules_client = \
+ qos_minimum_bandwidth_rules_client.QosMinimumBandwidthRulesClient(
+ self.auth_provider,
+ CONF.network.catalog_type,
+ CONF.network.region or CONF.identity.region,
+ endpoint_type=CONF.network.endpoint_type,
+ build_interval=CONF.network.build_interval,
+ build_timeout=CONF.network.build_timeout,
+ **self.default_params)
+
def _set_identity_clients(self):
params = {
'service': CONF.identity.catalog_type,
diff --git a/neutron_tempest_plugin/api/test_qos.py b/neutron_tempest_plugin/api/test_qos.py
index 5fb0511..5284688 100644
--- a/neutron_tempest_plugin/api/test_qos.py
+++ b/neutron_tempest_plugin/api/test_qos.py
@@ -17,6 +17,7 @@
from neutron_lib.services.qos import constants as qos_consts
from tempest.common import utils
from tempest.lib.common.utils import data_utils
+from tempest.lib.common.utils import test_utils
from tempest.lib import decorators
from tempest.lib import exceptions
@@ -34,12 +35,28 @@
required_extensions = [qos_apidef.ALIAS]
+ @classmethod
+ def setup_clients(cls):
+ super(QosTestJSON, cls).setup_clients()
+ cls.qos_bw_limit_rule_client = \
+ cls.os_admin.qos_limit_bandwidth_rules_client
+
@staticmethod
def _get_driver_details(rule_type_details, driver_name):
for driver in rule_type_details['drivers']:
if driver['name'] == driver_name:
return driver
+ def _create_qos_bw_limit_rule(self, policy_id, rule_data):
+ rule = self.qos_bw_limit_rule_client.create_limit_bandwidth_rule(
+ qos_policy_id=policy_id,
+ **rule_data)['bandwidth_limit_rule']
+ self.addCleanup(
+ test_utils.call_and_ignore_notfound_exc,
+ self.qos_bw_limit_rule_client.delete_limit_bandwidth_rule,
+ policy_id, rule['id'])
+ return rule
+
@decorators.idempotent_id('108fbdf7-3463-4e47-9871-d07f3dcf5bbb')
def test_create_policy(self):
policy = self.create_qos_policy(name='test-policy',
@@ -361,11 +378,9 @@
policy = self.create_qos_policy(name='test-policy',
description='test policy',
shared=False)
- self.admin_client.create_bandwidth_limit_rule(
- policy['id'], 200, 1337)['bandwidth_limit_rule']
-
+ self._create_qos_bw_limit_rule(
+ policy['id'], {'max_kbps': 200, 'max_burst_kbps': 1337})
self.admin_client.delete_qos_policy(policy['id'])
-
with testtools.ExpectedException(exceptions.NotFound):
self.admin_client.show_qos_policy(policy['id'])
@@ -429,14 +444,33 @@
class QosBandwidthLimitRuleTestJSON(base.BaseAdminNetworkTest):
+ credentials = ['primary', 'admin']
direction = None
required_extensions = [qos_apidef.ALIAS]
@classmethod
+ def setup_clients(cls):
+ super(QosBandwidthLimitRuleTestJSON, cls).setup_clients()
+ cls.qos_bw_limit_rule_client = \
+ cls.os_admin.qos_limit_bandwidth_rules_client
+ cls.qos_bw_limit_rule_client_primary = \
+ cls.os_primary.qos_limit_bandwidth_rules_client
+
+ @classmethod
@base.require_qos_rule_type(qos_consts.RULE_TYPE_BANDWIDTH_LIMIT)
def resource_setup(cls):
super(QosBandwidthLimitRuleTestJSON, cls).resource_setup()
+ def _create_qos_bw_limit_rule(self, policy_id, rule_data):
+ rule = self.qos_bw_limit_rule_client.create_limit_bandwidth_rule(
+ qos_policy_id=policy_id,
+ **rule_data)['bandwidth_limit_rule']
+ self.addCleanup(
+ test_utils.call_and_ignore_notfound_exc,
+ self.qos_bw_limit_rule_client.delete_limit_bandwidth_rule,
+ policy_id, rule['id'])
+ return rule
+
@property
def opposite_direction(self):
if self.direction == "ingress":
@@ -451,24 +485,24 @@
policy = self.create_qos_policy(name='test-policy',
description='test policy',
shared=False)
- rule = self.create_qos_bandwidth_limit_rule(
- policy_id=policy['id'],
- max_kbps=200,
- max_burst_kbps=1337,
- direction=self.direction)
+ rule = self._create_qos_bw_limit_rule(
+ policy['id'],
+ {'max_kbps': 200, 'max_burst_kbps': 1337, 'direction': 'ingress'})
# Test 'show rule'
- retrieved_rule = self.admin_client.show_bandwidth_limit_rule(
- policy['id'], rule['id'])
+ retrieved_rule = \
+ self.qos_bw_limit_rule_client.show_limit_bandwidth_rule(
+ policy['id'], rule['id'])
+
retrieved_rule = retrieved_rule['bandwidth_limit_rule']
self.assertEqual(rule['id'], retrieved_rule['id'])
self.assertEqual(200, retrieved_rule['max_kbps'])
self.assertEqual(1337, retrieved_rule['max_burst_kbps'])
- if self.direction:
- self.assertEqual(self.direction, retrieved_rule['direction'])
+ self.assertEqual('ingress', retrieved_rule['direction'])
# Test 'list rules'
- rules = self.admin_client.list_bandwidth_limit_rules(policy['id'])
+ rules = self.qos_bw_limit_rule_client.list_limit_bandwidth_rules(
+ policy['id'])
rules = rules['bandwidth_limit_rules']
rules_ids = [r['id'] for r in rules]
self.assertIn(rule['id'], rules_ids)
@@ -486,36 +520,34 @@
policy = self.create_qos_policy(name='test-policy',
description='test policy',
shared=False)
- self.create_qos_bandwidth_limit_rule(policy_id=policy['id'],
- max_kbps=200,
- max_burst_kbps=1337,
- direction=self.direction)
+ self._create_qos_bw_limit_rule(
+ policy['id'], {'max_kbps': 200, 'max_burst_kbps': 1337})
- self.assertRaises(exceptions.Conflict,
- self.create_qos_bandwidth_limit_rule,
- policy_id=policy['id'],
- max_kbps=201, max_burst_kbps=1338,
- direction=self.direction)
+ self.assertRaises(
+ exceptions.Conflict,
+ self._create_qos_bw_limit_rule,
+ policy['id'],
+ {'max_kbps': 201, 'max_burst_kbps': 1338})
@decorators.idempotent_id('149a6988-2568-47d2-931e-2dbc858943b3')
def test_rule_update(self):
policy = self.create_qos_policy(name='test-policy',
description='test policy',
shared=False)
- rule = self.create_qos_bandwidth_limit_rule(policy_id=policy['id'],
- max_kbps=1,
- max_burst_kbps=1,
- direction=self.direction)
+ rule = self._create_qos_bw_limit_rule(
+ policy['id'], {'max_kbps': 1, 'max_burst_kbps': 1})
- self.admin_client.update_bandwidth_limit_rule(
- policy['id'],
- rule['id'],
- max_kbps=200,
- max_burst_kbps=1337,
- direction=self.opposite_direction)
-
- retrieved_policy = self.admin_client.show_bandwidth_limit_rule(
- policy['id'], rule['id'])
+ if self.opposite_direction:
+ self.qos_bw_limit_rule_client.update_limit_bandwidth_rule(
+ policy['id'], rule['id'],
+ **{'max_kbps': 200, 'max_burst_kbps': 1337,
+ 'direction': self.opposite_direction})
+ else:
+ self.qos_bw_limit_rule_client.update_limit_bandwidth_rule(
+ policy['id'], rule['id'],
+ **{'max_kbps': 200, 'max_burst_kbps': 1337})
+ retrieved_policy = self.qos_bw_limit_rule_client.\
+ show_limit_bandwidth_rule(policy['id'], rule['id'])
retrieved_policy = retrieved_policy['bandwidth_limit_rule']
self.assertEqual(200, retrieved_policy['max_kbps'])
self.assertEqual(1337, retrieved_policy['max_burst_kbps'])
@@ -528,32 +560,33 @@
policy = self.create_qos_policy(name='test-policy',
description='test policy',
shared=False)
- rule = self.admin_client.create_bandwidth_limit_rule(
- policy['id'], 200, 1337, self.direction)['bandwidth_limit_rule']
-
- retrieved_policy = self.admin_client.show_bandwidth_limit_rule(
- policy['id'], rule['id'])
+ rule = self._create_qos_bw_limit_rule(
+ policy['id'], {'max_kbps': 200, 'max_burst_kbps': 1337})
+ retrieved_policy = \
+ self.qos_bw_limit_rule_client.show_limit_bandwidth_rule(
+ policy['id'], rule['id'])
retrieved_policy = retrieved_policy['bandwidth_limit_rule']
self.assertEqual(rule['id'], retrieved_policy['id'])
-
- self.admin_client.delete_bandwidth_limit_rule(policy['id'], rule['id'])
- self.assertRaises(exceptions.NotFound,
- self.admin_client.show_bandwidth_limit_rule,
- policy['id'], rule['id'])
+ self.qos_bw_limit_rule_client.delete_limit_bandwidth_rule(
+ policy['id'], rule['id'])
+ self.assertRaises(
+ exceptions.NotFound,
+ self.qos_bw_limit_rule_client.show_limit_bandwidth_rule,
+ policy['id'], rule['id'])
@decorators.idempotent_id('f211222c-5808-46cb-a961-983bbab6b852')
def test_rule_create_rule_nonexistent_policy(self):
self.assertRaises(
exceptions.NotFound,
- self.create_qos_bandwidth_limit_rule,
- 'policy', 200, 1337, self.direction)
+ self._create_qos_bw_limit_rule,
+ 'policy', {'max_kbps': 200, 'max_burst_kbps': 1337})
@decorators.idempotent_id('a4a2e7ad-786f-4927-a85a-e545a93bd274')
def test_rule_create_forbidden_for_regular_tenants(self):
self.assertRaises(
exceptions.Forbidden,
- self.client.create_bandwidth_limit_rule,
- 'policy', 1, 2, self.direction)
+ self.qos_bw_limit_rule_client_primary.create_limit_bandwidth_rule,
+ 'policy', **{'max_kbps': 1, 'max_burst_kbps': 2})
@decorators.idempotent_id('1bfc55d9-6fd8-4293-ab3a-b1d69bf7cd2e')
def test_rule_update_forbidden_for_regular_tenants_own_policy(self):
@@ -561,50 +594,47 @@
description='test policy',
shared=False,
project_id=self.client.tenant_id)
- rule = self.create_qos_bandwidth_limit_rule(policy_id=policy['id'],
- max_kbps=1,
- max_burst_kbps=1,
- direction=self.direction)
+ rule = self._create_qos_bw_limit_rule(
+ policy['id'],
+ {'max_kbps': 1, 'max_burst_kbps': 1})
self.assertRaises(
exceptions.Forbidden,
- self.client.update_bandwidth_limit_rule,
- policy['id'], rule['id'], max_kbps=2, max_burst_kbps=4)
+ self.qos_bw_limit_rule_client_primary.update_limit_bandwidth_rule,
+ policy['id'], rule['id'], **{'max_kbps': 2, 'max_burst_kbps': 4})
@decorators.idempotent_id('9a607936-4b6f-4c2f-ad21-bd5b3d4fc91f')
def test_rule_update_forbidden_for_regular_tenants_foreign_policy(self):
- policy = self.create_qos_policy(name='test-policy',
- description='test policy',
- shared=False,
- project_id=self.admin_client.tenant_id)
- rule = self.create_qos_bandwidth_limit_rule(policy_id=policy['id'],
- max_kbps=1,
- max_burst_kbps=1,
- direction=self.direction)
+ policy = self.create_qos_policy(
+ name='test-policy',
+ description='test policy',
+ shared=False,
+ project_id=self.admin_client.tenant_id)
+ rule = self._create_qos_bw_limit_rule(
+ policy['id'], {'max_kbps': 1, 'max_burst_kbps': 1})
self.assertRaises(
exceptions.NotFound,
- self.client.update_bandwidth_limit_rule,
- policy['id'], rule['id'], max_kbps=2, max_burst_kbps=4)
+ self.qos_bw_limit_rule_client_primary.update_limit_bandwidth_rule,
+ policy['id'], rule['id'], **{'max_kbps': 2, 'max_burst_kbps': 4})
@decorators.idempotent_id('ce0bd0c2-54d9-4e29-85f1-cfb36ac3ebe2')
def test_get_rules_by_policy(self):
- policy1 = self.create_qos_policy(name='test-policy1',
- description='test policy1',
- shared=False)
- rule1 = self.create_qos_bandwidth_limit_rule(policy_id=policy1['id'],
- max_kbps=200,
- max_burst_kbps=1337,
- direction=self.direction)
+ policy1 = self.create_qos_policy(
+ name='test-policy1',
+ description='test policy1',
+ shared=False)
+ rule1 = self._create_qos_bw_limit_rule(
+ policy1['id'], {'max_kbps': 200, 'max_burst_kbps': 1337})
- policy2 = self.create_qos_policy(name='test-policy2',
- description='test policy2',
- shared=False)
- rule2 = self.create_qos_bandwidth_limit_rule(policy_id=policy2['id'],
- max_kbps=5000,
- max_burst_kbps=2523,
- direction=self.direction)
+ policy2 = self.create_qos_policy(
+ name='test-policy2',
+ description='test policy2',
+ shared=False)
+ rule2 = self._create_qos_bw_limit_rule(
+ policy2['id'], {'max_kbps': 5000, 'max_burst_kbps': 2523})
# Test 'list rules'
- rules = self.admin_client.list_bandwidth_limit_rules(policy1['id'])
+ rules = self.qos_bw_limit_rule_client.list_limit_bandwidth_rules(
+ policy1['id'])
rules = rules['bandwidth_limit_rules']
rules_ids = [r['id'] for r in rules]
self.assertIn(rule1['id'], rules_ids)
@@ -622,9 +652,8 @@
policy = self.create_qos_policy(name='test-policy',
description='test policy for attach',
shared=False)
-
- self.admin_client.create_bandwidth_limit_rule(
- policy['id'], 1024, 1024)
+ self._create_qos_bw_limit_rule(
+ policy['id'], {'max_kbps': 1024, 'max_burst_kbps': 1024})
self.admin_client.update_network(
self.network['id'], qos_policy_id=policy['id'])
@@ -678,37 +707,34 @@
description='test policy1',
shared=False)
- rule1 = self.create_qos_bandwidth_limit_rule(policy_id=policy['id'],
- max_kbps=1024,
- max_burst_kbps=1024,
- direction=n_constants.
- EGRESS_DIRECTION)
- rule2 = self.create_qos_bandwidth_limit_rule(policy_id=policy['id'],
- max_kbps=1024,
- max_burst_kbps=1024,
- direction=n_constants.
- INGRESS_DIRECTION)
+ rule1 = self._create_qos_bw_limit_rule(
+ policy['id'], {'max_kbps': 1024, 'max_burst_kbps': 1024,
+ 'direction': n_constants.EGRESS_DIRECTION})
+ rule2 = self._create_qos_bw_limit_rule(
+ policy['id'], {'max_kbps': 1024, 'max_burst_kbps': 1024,
+ 'direction': n_constants.INGRESS_DIRECTION})
# Check that the rules were added to the policy
- rules = self.admin_client.list_bandwidth_limit_rules(
+ rules = self.qos_bw_limit_rule_client.list_limit_bandwidth_rules(
policy['id'])['bandwidth_limit_rules']
+
rules_ids = [rule['id'] for rule in rules]
self.assertIn(rule1['id'], rules_ids)
self.assertIn(rule2['id'], rules_ids)
# Check that the rules creation fails for the same rule types
- self.assertRaises(exceptions.Conflict,
- self.create_qos_bandwidth_limit_rule,
- policy_id=policy['id'],
- max_kbps=1025,
- max_burst_kbps=1025,
- direction=n_constants.EGRESS_DIRECTION)
+ self.assertRaises(
+ exceptions.Conflict,
+ self._create_qos_bw_limit_rule,
+ policy['id'],
+ {'max_kbps': 1025, 'max_burst_kbps': 1025,
+ 'direction': n_constants.EGRESS_DIRECTION})
- self.assertRaises(exceptions.Conflict,
- self.create_qos_bandwidth_limit_rule,
- policy_id=policy['id'],
- max_kbps=1025,
- max_burst_kbps=1025,
- direction=n_constants.INGRESS_DIRECTION)
+ self.assertRaises(
+ exceptions.Conflict,
+ self._create_qos_bw_limit_rule,
+ policy['id'],
+ {'max_kbps': 1025, 'max_burst_kbps': 1025,
+ 'direction': n_constants.INGRESS_DIRECTION})
class RbacSharedQosPoliciesTest(base.BaseAdminNetworkTest):
@@ -1175,26 +1201,36 @@
def resource_setup(cls):
super(QosMinimumBandwidthRuleTestJSON, cls).resource_setup()
+ @classmethod
+ def setup_clients(cls):
+ super(QosMinimumBandwidthRuleTestJSON, cls).setup_clients()
+ cls.qos_min_bw_rules_client = \
+ cls.os_admin.qos_minimum_bandwidth_rules_client
+ cls.qos_min_bw_rules_client_primary = \
+ cls.os_primary.qos_minimum_bandwidth_rules_client
+
@decorators.idempotent_id('aa59b00b-3e9c-4787-92f8-93a5cdf5e378')
def test_rule_create(self):
policy = self.create_qos_policy(name='test-policy',
description='test policy',
shared=False)
- rule = self.admin_client.create_minimum_bandwidth_rule(
- policy_id=policy['id'],
- direction=self.DIRECTION_EGRESS,
- min_kbps=1138)[self.RULE_NAME]
+ rule = self.qos_min_bw_rules_client.create_minimum_bandwidth_rule(
+ qos_policy_id=policy['id'],
+ **{'direction': self.DIRECTION_EGRESS,
+ 'min_kbps': 1138})[self.RULE_NAME]
# Test 'show rule'
- retrieved_rule = self.admin_client.show_minimum_bandwidth_rule(
- policy['id'], rule['id'])
+ retrieved_rule = \
+ self.qos_min_bw_rules_client.show_minimum_bandwidth_rule(
+ policy['id'], rule['id'])
retrieved_rule = retrieved_rule[self.RULE_NAME]
self.assertEqual(rule['id'], retrieved_rule['id'])
self.assertEqual(1138, retrieved_rule['min_kbps'])
self.assertEqual(self.DIRECTION_EGRESS, retrieved_rule['direction'])
# Test 'list rules'
- rules = self.admin_client.list_minimum_bandwidth_rules(policy['id'])
+ rules = self.qos_min_bw_rules_client.list_minimum_bandwidth_rules(
+ policy['id'])
rules = rules[self.RULES_NAME]
rules_ids = [r['id'] for r in rules]
self.assertIn(rule['id'], rules_ids)
@@ -1212,24 +1248,28 @@
policy = self.create_qos_policy(name='test-policy',
description='test policy',
shared=False)
- self.assertRaises(exceptions.BadRequest,
- self.admin_client.create_minimum_bandwidth_rule,
- policy_id=policy['id'],
- direction=self.DIRECTION_EGRESS)
+ self.assertRaises(
+ exceptions.BadRequest,
+ self.qos_min_bw_rules_client.create_minimum_bandwidth_rule,
+ qos_policy_id=policy['id'],
+ **{'direction': self.DIRECTION_EGRESS})
@decorators.idempotent_id('aa59b00b-ab01-4787-92f8-93a5cdf5e378')
def test_rule_create_fail_for_the_same_type(self):
policy = self.create_qos_policy(name='test-policy',
description='test policy',
shared=False)
- self.admin_client.create_minimum_bandwidth_rule(
- policy_id=policy['id'],
- direction=self.DIRECTION_EGRESS, min_kbps=200)
+ self.qos_min_bw_rules_client.create_minimum_bandwidth_rule(
+ qos_policy_id=policy['id'],
+ **{'direction': self.DIRECTION_EGRESS,
+ 'min_kbps': 200})
- self.assertRaises(exceptions.Conflict,
- self.admin_client.create_minimum_bandwidth_rule,
- policy_id=policy['id'],
- direction=self.DIRECTION_EGRESS, min_kbps=201)
+ self.assertRaises(
+ exceptions.Conflict,
+ self.qos_min_bw_rules_client.create_minimum_bandwidth_rule,
+ qos_policy_id=policy['id'],
+ **{'direction': self.DIRECTION_EGRESS,
+ 'min_kbps': 201})
@decorators.idempotent_id('35baf998-ae65-495c-9902-35a0d11e8936')
@utils.requires_ext(extension="qos-bw-minimum-ingress",
@@ -1238,10 +1278,10 @@
policy = self.create_qos_policy(name='test-policy',
description='test policy',
shared=False)
- self.admin_client.create_minimum_bandwidth_rule(
- policy_id=policy['id'],
- direction=self.DIRECTION_INGRESS,
- min_kbps=201)
+ self.qos_min_bw_rules_client.create_minimum_bandwidth_rule(
+ qos_policy_id=policy['id'],
+ **{'direction': self.DIRECTION_INGRESS,
+ 'min_kbps': 201})
retrieved_policy = self.admin_client.show_qos_policy(policy['id'])
policy_rules = retrieved_policy['policy']['rules']
@@ -1255,16 +1295,18 @@
policy = self.create_qos_policy(name='test-policy',
description='test policy',
shared=False)
- rule = self.admin_client.create_minimum_bandwidth_rule(
- policy_id=policy['id'],
- direction=self.DIRECTION_EGRESS,
- min_kbps=300)[self.RULE_NAME]
+ rule = self.qos_min_bw_rules_client.create_minimum_bandwidth_rule(
+ qos_policy_id=policy['id'],
+ **{'direction': self.DIRECTION_EGRESS,
+ 'min_kbps': 300})[self.RULE_NAME]
- self.admin_client.update_minimum_bandwidth_rule(policy['id'],
- rule['id'], min_kbps=350, direction=self.DIRECTION_EGRESS)
+ self.qos_min_bw_rules_client.update_minimum_bandwidth_rule(
+ policy['id'], rule['id'],
+ **{'min_kbps': 350, 'direction': self.DIRECTION_EGRESS})
- retrieved_policy = self.admin_client.show_minimum_bandwidth_rule(
- policy['id'], rule['id'])
+ retrieved_policy = \
+ self.qos_min_bw_rules_client.show_minimum_bandwidth_rule(
+ policy['id'], rule['id'])
retrieved_policy = retrieved_policy[self.RULE_NAME]
self.assertEqual(350, retrieved_policy['min_kbps'])
self.assertEqual(self.DIRECTION_EGRESS, retrieved_policy['direction'])
@@ -1274,54 +1316,60 @@
policy = self.create_qos_policy(name='test-policy',
description='test policy',
shared=False)
- rule = self.admin_client.create_minimum_bandwidth_rule(
- policy['id'], self.DIRECTION_EGRESS, min_kbps=200)[self.RULE_NAME]
+ rule = self.qos_min_bw_rules_client.create_minimum_bandwidth_rule(
+ policy['id'],
+ **{'direction': self.DIRECTION_EGRESS,
+ 'min_kbps': 200})[self.RULE_NAME]
- retrieved_policy = self.admin_client.show_minimum_bandwidth_rule(
- policy['id'], rule['id'])
+ retrieved_policy = \
+ self.qos_min_bw_rules_client.show_minimum_bandwidth_rule(
+ policy['id'], rule['id'])
retrieved_policy = retrieved_policy[self.RULE_NAME]
self.assertEqual(rule['id'], retrieved_policy['id'])
- self.admin_client.delete_minimum_bandwidth_rule(policy['id'],
- rule['id'])
- self.assertRaises(exceptions.NotFound,
- self.admin_client.show_minimum_bandwidth_rule,
- policy['id'], rule['id'])
+ self.qos_min_bw_rules_client.delete_minimum_bandwidth_rule(
+ policy['id'], rule['id'])
+ self.assertRaises(
+ exceptions.NotFound,
+ self.qos_min_bw_rules_client.show_minimum_bandwidth_rule,
+ policy['id'], rule['id'])
@decorators.idempotent_id('a211222c-5808-46cb-a961-983bbab6b852')
def test_rule_create_rule_nonexistent_policy(self):
self.assertRaises(
exceptions.NotFound,
- self.admin_client.create_minimum_bandwidth_rule,
- 'policy', self.DIRECTION_EGRESS, min_kbps=200)
+ self.qos_min_bw_rules_client.create_minimum_bandwidth_rule,
+ 'policy',
+ **{'direction': self.DIRECTION_EGRESS, 'min_kbps': 200})
@decorators.idempotent_id('b4a2e7ad-786f-4927-a85a-e545a93bd274')
def test_rule_create_forbidden_for_regular_tenants(self):
self.assertRaises(
exceptions.Forbidden,
- self.client.create_minimum_bandwidth_rule,
- 'policy', self.DIRECTION_EGRESS, min_kbps=300)
+ self.qos_min_bw_rules_client_primary.create_minimum_bandwidth_rule,
+ 'policy', **{'direction': self.DIRECTION_EGRESS, 'min_kbps': 300})
@decorators.idempotent_id('de0bd0c2-54d9-4e29-85f1-cfb36ac3ebe2')
def test_get_rules_by_policy(self):
policy1 = self.create_qos_policy(name='test-policy1',
description='test policy1',
shared=False)
- rule1 = self.admin_client.create_minimum_bandwidth_rule(
- policy_id=policy1['id'],
- direction=self.DIRECTION_EGRESS,
- min_kbps=200)[self.RULE_NAME]
+ rule1 = self.qos_min_bw_rules_client.create_minimum_bandwidth_rule(
+ qos_policy_id=policy1['id'],
+ **{'direction': self.DIRECTION_EGRESS,
+ 'min_kbps': 200})[self.RULE_NAME]
policy2 = self.create_qos_policy(name='test-policy2',
description='test policy2',
shared=False)
- rule2 = self.admin_client.create_minimum_bandwidth_rule(
- policy_id=policy2['id'],
- direction=self.DIRECTION_EGRESS,
- min_kbps=5000)[self.RULE_NAME]
+ rule2 = self.qos_min_bw_rules_client.create_minimum_bandwidth_rule(
+ qos_policy_id=policy2['id'],
+ **{'direction': self.DIRECTION_EGRESS,
+ 'min_kbps': 5000})[self.RULE_NAME]
# Test 'list rules'
- rules = self.admin_client.list_minimum_bandwidth_rules(policy1['id'])
+ rules = self.qos_min_bw_rules_client.list_minimum_bandwidth_rules(
+ policy1['id'])
rules = rules[self.RULES_NAME]
rules_ids = [r['id'] for r in rules]
self.assertIn(rule1['id'], rules_ids)
diff --git a/neutron_tempest_plugin/api/test_qos_negative.py b/neutron_tempest_plugin/api/test_qos_negative.py
index 2d06d11..3e80129 100644
--- a/neutron_tempest_plugin/api/test_qos_negative.py
+++ b/neutron_tempest_plugin/api/test_qos_negative.py
@@ -100,11 +100,19 @@
policy = self.create_qos_policy(name='test-policy',
description='test policy',
shared=False)
- rule = self.rule_create_m(policy_id=policy['id'], **create_params)
+ rule = self.rule_create_m(policy['id'], **create_params)
+ # Client responses wrap the rule under a type-named key; anything else
+ # is treated as a bare rule body (e.g. one carrying 'dscp_mark').
+ if "minimum_bandwidth_rule" in rule:
+ rule_id = rule['minimum_bandwidth_rule']['id']
+ elif "bandwidth_limit_rule" in rule:
+ rule_id = rule['bandwidth_limit_rule']['id']
+ else:
+ rule_id = rule['id']
self.assertRaises(
lib_exc.NotFound,
self.rule_update_m,
- non_exist_id, rule['id'], **update_params)
+ non_exist_id, rule_id, **update_params)
def _test_rule_create_rule_non_existent_policy(self, create_params):
non_exist_id = data_utils.rand_name('qos_policy')
@@ -127,9 +133,17 @@
class QosBandwidthLimitRuleNegativeTestJSON(QosRuleNegativeBaseTestJSON):
@classmethod
+ def setup_clients(cls):
+ super(QosBandwidthLimitRuleNegativeTestJSON, cls).setup_clients()
+ cls.qos_bw_limit_rule_client = \
+ cls.os_admin.qos_limit_bandwidth_rules_client
+
+ @classmethod
def resource_setup(cls):
- cls.rule_create_m = cls.create_qos_bandwidth_limit_rule
- cls.rule_update_m = cls.admin_client.update_bandwidth_limit_rule
+ cls.rule_create_m = \
+ cls.qos_bw_limit_rule_client.create_limit_bandwidth_rule
+ cls.rule_update_m = \
+ cls.qos_bw_limit_rule_client.update_limit_bandwidth_rule
super(QosBandwidthLimitRuleNegativeTestJSON, cls).resource_setup()
@decorators.attr(type='negative')
@@ -157,8 +171,10 @@
@classmethod
def resource_setup(cls):
- cls.rule_create_m = cls.create_qos_minimum_bandwidth_rule
- cls.rule_update_m = cls.admin_client.update_minimum_bandwidth_rule
+ cls.rule_create_m = cls.os_admin.qos_minimum_bandwidth_rules_client.\
+ create_minimum_bandwidth_rule
+ cls.rule_update_m = cls.os_admin.qos_minimum_bandwidth_rules_client.\
+ update_minimum_bandwidth_rule
super(QosMinimumBandwidthRuleNegativeTestJSON, cls).resource_setup()
@decorators.attr(type='negative')
diff --git a/neutron_tempest_plugin/scenario/test_floatingip.py b/neutron_tempest_plugin/scenario/test_floatingip.py
index 9902b68..a5afc73 100644
--- a/neutron_tempest_plugin/scenario/test_floatingip.py
+++ b/neutron_tempest_plugin/scenario/test_floatingip.py
@@ -344,6 +344,8 @@
def setup_clients(cls):
super(FloatingIPQosTest, cls).setup_clients()
cls.admin_client = cls.os_admin.network_client
+ cls.qos_bw_limit_rule_client = \
+ cls.os_admin.qos_limit_bandwidth_rules_client
@decorators.idempotent_id('5eb48aea-eaba-4c20-8a6f-7740070a0aa3')
def test_qos(self):
@@ -364,16 +366,19 @@
ssh_client = self._create_ssh_client()
# As admin user create a new QoS rules
- self.os_admin.network_client.create_bandwidth_limit_rule(
- policy_id, max_kbps=constants.LIMIT_KILO_BITS_PER_SECOND,
- max_burst_kbps=constants.LIMIT_KILO_BYTES,
- direction=lib_constants.INGRESS_DIRECTION)
- self.os_admin.network_client.create_bandwidth_limit_rule(
- policy_id, max_kbps=constants.LIMIT_KILO_BITS_PER_SECOND,
- max_burst_kbps=constants.LIMIT_KILO_BYTES,
- direction=lib_constants.EGRESS_DIRECTION)
+ rule_data = {'max_kbps': constants.LIMIT_KILO_BITS_PER_SECOND,
+ 'max_burst_kbps': constants.LIMIT_KILO_BYTES,
+ 'direction': lib_constants.INGRESS_DIRECTION}
+ self.qos_bw_limit_rule_client.create_limit_bandwidth_rule(
+ qos_policy_id=policy_id, **rule_data)
- rules = self.os_admin.network_client.list_bandwidth_limit_rules(
+ rule_data = {'max_kbps': constants.LIMIT_KILO_BITS_PER_SECOND,
+ 'max_burst_kbps': constants.LIMIT_KILO_BYTES,
+ 'direction': lib_constants.EGRESS_DIRECTION}
+ self.qos_bw_limit_rule_client.create_limit_bandwidth_rule(
+ qos_policy_id=policy_id, **rule_data)
+
+ rules = self.qos_bw_limit_rule_client.list_limit_bandwidth_rules(
policy_id)
self.assertEqual(2, len(rules['bandwidth_limit_rules']))
@@ -404,11 +409,10 @@
# As admin user update QoS rules
for rule in rules['bandwidth_limit_rules']:
- self.os_admin.network_client.update_bandwidth_limit_rule(
- policy_id,
- rule['id'],
- max_kbps=constants.LIMIT_KILO_BITS_PER_SECOND * 2,
- max_burst_kbps=constants.LIMIT_KILO_BITS_PER_SECOND * 2)
+ self.qos_bw_limit_rule_client.update_limit_bandwidth_rule(
+ policy_id, rule['id'],
+ **{'max_kbps': constants.LIMIT_KILO_BITS_PER_SECOND * 2,
+ 'max_burst_kbps': constants.LIMIT_KILO_BITS_PER_SECOND * 2})
# Check that actual BW while downloading file
# is as expected (Update BW)
diff --git a/neutron_tempest_plugin/scenario/test_qos.py b/neutron_tempest_plugin/scenario/test_qos.py
index d00210c..74be216 100644
--- a/neutron_tempest_plugin/scenario/test_qos.py
+++ b/neutron_tempest_plugin/scenario/test_qos.py
@@ -20,6 +20,7 @@
from oslo_log import log as logging
from tempest.common import utils as tutils
from tempest.common import waiters
+from tempest.lib.common.utils import test_utils
from tempest.lib import decorators
from neutron_tempest_plugin.api import base as base_api
@@ -138,8 +139,22 @@
description='test-qos-policy',
shared=True)
self.qos_policies.append(policy['policy'])
+ # Clean up by policy id; 'policy' itself is the whole response body.
+ self.addCleanup(test_utils.call_and_ignore_notfound_exc,
+ self.os_admin.network_client.delete_qos_policy,
+ policy['policy']['id'])
return policy['policy']['id']
+ def _create_qos_bw_limit_rule(self, policy_id, rule_data):
+ rule = self.qos_bw_limit_rule_client.create_limit_bandwidth_rule(
+ qos_policy_id=policy_id,
+ **rule_data)['bandwidth_limit_rule']
+ self.addCleanup(
+ test_utils.call_and_ignore_notfound_exc,
+ self.qos_bw_limit_rule_client.delete_limit_bandwidth_rule,
+ policy_id, rule['id'])
+ return rule
+
def _create_server_by_port(self, port=None):
"""Launch an instance using a port interface;
@@ -194,6 +207,8 @@
def setup_clients(cls):
super(QoSTest, cls).setup_clients()
cls.admin_client = cls.os_admin.network_client
+ cls.qos_bw_limit_rule_client = \
+ cls.os_admin.qos_limit_bandwidth_rules_client
@decorators.idempotent_id('00682a0c-b72e-11e8-b81e-8c16450ea513')
def test_qos_basic_and_update(self):
@@ -227,11 +242,11 @@
bw_limit_policy_id = self._create_qos_policy()
# As admin user create QoS rule
- rule_id = self.os_admin.network_client.create_bandwidth_limit_rule(
- policy_id=bw_limit_policy_id,
- max_kbps=constants.LIMIT_KILO_BITS_PER_SECOND,
- max_burst_kbps=constants.LIMIT_KILO_BITS_PER_SECOND)[
- 'bandwidth_limit_rule']['id']
+ rule_data = {
+ 'max_kbps': constants.LIMIT_KILO_BITS_PER_SECOND,
+ 'max_burst_kbps': constants.LIMIT_KILO_BITS_PER_SECOND}
+ rule_id = self._create_qos_bw_limit_rule(
+ bw_limit_policy_id, rule_data)['id']
# Associate QoS to the network
self.os_admin.network_client.update_network(
@@ -250,11 +265,12 @@
' the network" Actual BW is not as expected!'))
# As admin user update QoS rule
- self.os_admin.network_client.update_bandwidth_limit_rule(
- bw_limit_policy_id,
- rule_id,
- max_kbps=constants.LIMIT_KILO_BITS_PER_SECOND * 2,
- max_burst_kbps=constants.LIMIT_KILO_BITS_PER_SECOND * 2)
+ rule_update_data = {
+ 'max_kbps': constants.LIMIT_KILO_BITS_PER_SECOND * 2,
+ 'max_burst_kbps': constants.LIMIT_KILO_BITS_PER_SECOND * 2}
+ self.qos_bw_limit_rule_client.update_limit_bandwidth_rule(
+ qos_policy_id=bw_limit_policy_id, rule_id=rule_id,
+ **rule_update_data)
# Check that actual BW while downloading file
# is as expected (Update BW)
@@ -273,11 +289,11 @@
bw_limit_policy_id_new = self._create_qos_policy()
# As admin user create a new QoS rule
- rule_id_new = self.os_admin.network_client.create_bandwidth_limit_rule(
- policy_id=bw_limit_policy_id_new,
- max_kbps=constants.LIMIT_KILO_BITS_PER_SECOND,
- max_burst_kbps=constants.LIMIT_KILO_BITS_PER_SECOND)[
- 'bandwidth_limit_rule']['id']
+ rule_data_new = {
+ 'max_kbps': constants.LIMIT_KILO_BITS_PER_SECOND,
+ 'max_burst_kbps': constants.LIMIT_KILO_BITS_PER_SECOND}
+ rule_id_new = self._create_qos_bw_limit_rule(
+ bw_limit_policy_id_new, rule_data_new)['id']
# Associate a new QoS policy to Neutron port
self.os_admin.network_client.update_port(
@@ -296,11 +312,12 @@
' the VM port" Actual BW is not as expected!'))
# As admin user update QoS rule
- self.os_admin.network_client.update_bandwidth_limit_rule(
- bw_limit_policy_id_new,
- rule_id_new,
- max_kbps=constants.LIMIT_KILO_BITS_PER_SECOND * 3,
- max_burst_kbps=constants.LIMIT_KILO_BITS_PER_SECOND * 3)
+ rule_update_data = {
+ 'max_kbps': constants.LIMIT_KILO_BITS_PER_SECOND * 3,
+ 'max_burst_kbps': constants.LIMIT_KILO_BITS_PER_SECOND * 3}
+ self.qos_bw_limit_rule_client.update_limit_bandwidth_rule(
+ qos_policy_id=bw_limit_policy_id_new, rule_id=rule_id_new,
+ **rule_update_data)
# Check that actual BW while downloading file
# is as expected (Update BW)
@@ -334,11 +351,10 @@
description='policy for attach',
shared=False)['policy']
- rule = self.os_admin.network_client.create_bandwidth_limit_rule(
- policy_id=port_policy['id'],
- max_kbps=constants.LIMIT_KILO_BITS_PER_SECOND,
- max_burst_kbps=constants.LIMIT_KILO_BITS_PER_SECOND)[
- 'bandwidth_limit_rule']
+ rule_data = {
+ 'max_kbps': constants.LIMIT_KILO_BITS_PER_SECOND,
+ 'max_burst_kbps': constants.LIMIT_KILO_BITS_PER_SECOND}
+ rule = self._create_qos_bw_limit_rule(port_policy['id'], rule_data)
self.os_admin.network_client.update_port(
vm_port['id'], qos_policy_id=port_policy['id'])
@@ -378,10 +394,10 @@
name='network-policy',
shared=False)['policy']
- rule = self.os_admin.network_client.create_bandwidth_limit_rule(
- policy_id=qos_policy['id'],
- max_kbps=constants.LIMIT_KILO_BITS_PER_SECOND,
- max_burst_kbps=constants.LIMIT_KILO_BITS_PER_SECOND)
+ rule_data = {
+ 'max_kbps': constants.LIMIT_KILO_BITS_PER_SECOND,
+ 'max_burst_kbps': constants.LIMIT_KILO_BITS_PER_SECOND}
+ rule = self._create_qos_bw_limit_rule(qos_policy['id'], rule_data)
network = self.os_admin.network_client.update_network(
network['id'],
@@ -399,9 +415,9 @@
retrieved_net['network']['qos_policy_id'])
retrieved_rule_id = retrieved_policy['policy']['rules'][0]['id']
- self.assertEqual(rule['bandwidth_limit_rule']['id'],
+ self.assertEqual(rule['id'],
retrieved_rule_id,
"""The expected rule ID is {0},
the actual value is {1}""".
- format(rule['bandwidth_limit_rule']['id'],
+ format(rule['id'],
retrieved_rule_id))
diff --git a/neutron_tempest_plugin/services/network/json/network_client.py b/neutron_tempest_plugin/services/network/json/network_client.py
index d4eff84..a4c809e 100644
--- a/neutron_tempest_plugin/services/network/json/network_client.py
+++ b/neutron_tempest_plugin/services/network/json/network_client.py
@@ -627,57 +627,6 @@
self.expected_success(200, resp.status)
return service_client.ResponseBody(resp, body)
- def create_bandwidth_limit_rule(self, policy_id, max_kbps,
- max_burst_kbps, direction=None):
- uri = '%s/qos/policies/%s/bandwidth_limit_rules' % (
- self.uri_prefix, policy_id)
- post_data = {
- 'bandwidth_limit_rule': {
- 'max_kbps': max_kbps,
- 'max_burst_kbps': max_burst_kbps
- }
- }
- if direction:
- post_data['bandwidth_limit_rule']['direction'] = direction
- resp, body = self.post(uri, self.serialize(post_data))
- self.expected_success(201, resp.status)
- body = jsonutils.loads(body)
- return service_client.ResponseBody(resp, body)
-
- def list_bandwidth_limit_rules(self, policy_id):
- uri = '%s/qos/policies/%s/bandwidth_limit_rules' % (
- self.uri_prefix, policy_id)
- resp, body = self.get(uri)
- body = self.deserialize_single(body)
- self.expected_success(200, resp.status)
- return service_client.ResponseBody(resp, body)
-
- def show_bandwidth_limit_rule(self, policy_id, rule_id):
- uri = '%s/qos/policies/%s/bandwidth_limit_rules/%s' % (
- self.uri_prefix, policy_id, rule_id)
- resp, body = self.get(uri)
- body = self.deserialize_single(body)
- self.expected_success(200, resp.status)
- return service_client.ResponseBody(resp, body)
-
- def update_bandwidth_limit_rule(self, policy_id, rule_id, **kwargs):
- uri = '%s/qos/policies/%s/bandwidth_limit_rules/%s' % (
- self.uri_prefix, policy_id, rule_id)
- if "direction" in kwargs and kwargs['direction'] is None:
- kwargs.pop('direction')
- post_data = {'bandwidth_limit_rule': kwargs}
- resp, body = self.put(uri, jsonutils.dumps(post_data))
- body = self.deserialize_single(body)
- self.expected_success(200, resp.status)
- return service_client.ResponseBody(resp, body)
-
- def delete_bandwidth_limit_rule(self, policy_id, rule_id):
- uri = '%s/qos/policies/%s/bandwidth_limit_rules/%s' % (
- self.uri_prefix, policy_id, rule_id)
- resp, body = self.delete(uri)
- self.expected_success(204, resp.status)
- return service_client.ResponseBody(resp, body)
-
def create_dscp_marking_rule(self, policy_id, dscp_mark):
uri = '%s/qos/policies/%s/dscp_marking_rules' % (
self.uri_prefix, policy_id)
@@ -723,53 +672,6 @@
self.expected_success(204, resp.status)
return service_client.ResponseBody(resp, body)
- def create_minimum_bandwidth_rule(self, policy_id, direction,
- min_kbps=None):
- uri = '%s/qos/policies/%s/minimum_bandwidth_rules' % (
- self.uri_prefix, policy_id)
- data = {
- 'direction': direction,
- }
- if min_kbps is not None:
- data['min_kbps'] = min_kbps
- post_data = self.serialize({'minimum_bandwidth_rule': data})
- resp, body = self.post(uri, post_data)
- self.expected_success(201, resp.status)
- body = jsonutils.loads(body)
- return service_client.ResponseBody(resp, body)
-
- def list_minimum_bandwidth_rules(self, policy_id):
- uri = '%s/qos/policies/%s/minimum_bandwidth_rules' % (
- self.uri_prefix, policy_id)
- resp, body = self.get(uri)
- body = self.deserialize_single(body)
- self.expected_success(200, resp.status)
- return service_client.ResponseBody(resp, body)
-
- def show_minimum_bandwidth_rule(self, policy_id, rule_id):
- uri = '%s/qos/policies/%s/minimum_bandwidth_rules/%s' % (
- self.uri_prefix, policy_id, rule_id)
- resp, body = self.get(uri)
- body = self.deserialize_single(body)
- self.expected_success(200, resp.status)
- return service_client.ResponseBody(resp, body)
-
- def update_minimum_bandwidth_rule(self, policy_id, rule_id, **kwargs):
- uri = '%s/qos/policies/%s/minimum_bandwidth_rules/%s' % (
- self.uri_prefix, policy_id, rule_id)
- post_data = {'minimum_bandwidth_rule': kwargs}
- resp, body = self.put(uri, jsonutils.dumps(post_data))
- body = self.deserialize_single(body)
- self.expected_success(200, resp.status)
- return service_client.ResponseBody(resp, body)
-
- def delete_minimum_bandwidth_rule(self, policy_id, rule_id):
- uri = '%s/qos/policies/%s/minimum_bandwidth_rules/%s' % (
- self.uri_prefix, policy_id, rule_id)
- resp, body = self.delete(uri)
- self.expected_success(204, resp.status)
- return service_client.ResponseBody(resp, body)
-
def list_qos_rule_types(self):
uri = '%s/qos/rule-types' % self.uri_prefix
resp, body = self.get(uri)