qos basic scenario
Adding a basic test to check enforcement of QoS policy with
bandwidth limit rule QoS
Spec:
http://specs.openstack.org/openstack/neutron-specs/specs/liberty/qos-api-extension.html
Change-Id: Ifb319aefeee27b3e5f1bbe16ebb99bf73f4465e7
diff --git a/neutron/tests/tempest/scenario/base.py b/neutron/tests/tempest/scenario/base.py
index cf7686f..8c45f4b 100644
--- a/neutron/tests/tempest/scenario/base.py
+++ b/neutron/tests/tempest/scenario/base.py
@@ -66,7 +66,7 @@
return body['keypair']
@classmethod
- def create_loginable_secgroup_rule(cls, secgroup_id=None):
+ def create_secgroup_rules(cls, rule_list, secgroup_id=None):
client = cls.manager.network_client
if not secgroup_id:
sgs = client.list_security_groups()['security_groups']
@@ -75,18 +75,29 @@
secgroup_id = sg['id']
break
- # This rule is intended to permit inbound ssh
- # traffic from all sources, so no group_id is provided.
- # Setting a group_id would only permit traffic from ports
- # belonging to the same security group.
- ruleset = {'protocol': 'tcp',
- 'port_range_min': 22,
- 'port_range_max': 22,
- 'remote_ip_prefix': '0.0.0.0/0'}
- rules = [client.create_security_group_rule(
- direction='ingress', security_group_id=secgroup_id,
- **ruleset)['security_group_rule']]
- return rules
+ for rule in rule_list:
+ direction = rule.pop('direction')
+ client.create_security_group_rule(
+ direction=direction,
+ security_group_id=secgroup_id,
+ **rule)
+
+ @classmethod
+ def create_loginable_secgroup_rule(cls, secgroup_id=None):
+ """This rule is intended to permit inbound ssh
+
+ Allowing ssh traffic traffic from all sources, so no group_id is
+ provided.
+ Setting a group_id would only permit traffic from ports
+ belonging to the same security group.
+ """
+
+ rule_list = [{'protocol': 'tcp',
+ 'direction': 'ingress',
+ 'port_range_min': 22,
+ 'port_range_max': 22,
+ 'remote_ip_prefix': '0.0.0.0/0'}]
+ cls.create_secgroup_rules(rule_list, secgroup_id=secgroup_id)
@classmethod
def create_router_and_interface(cls, subnet_id):
@@ -109,3 +120,24 @@
def check_connectivity(cls, host, ssh_user, ssh_key=None):
ssh_client = ssh.Client(host, ssh_user, pkey=ssh_key)
ssh_client.test_connection_auth()
+
+ @classmethod
+ def setup_network_and_server(cls):
+ cls.network = cls.create_network()
+ cls.subnet = cls.create_subnet(cls.network)
+
+ cls.create_router_and_interface(cls.subnet['id'])
+ cls.keypair = cls.create_keypair()
+ cls.create_loginable_secgroup_rule()
+ cls.server = cls.create_server(
+ flavor_ref=CONF.compute.flavor_ref,
+ image_ref=CONF.compute.image_ref,
+ key_name=cls.keypair['name'],
+ networks=[{'uuid': cls.network['id']}])
+ waiters.wait_for_server_status(cls.manager.servers_client,
+ cls.server['server']['id'],
+ constants.SERVER_STATUS_ACTIVE)
+ port = cls.client.list_ports(network_id=cls.network['id'],
+ device_id=cls.server[
+ 'server']['id'])['ports'][0]
+ cls.fip = cls.create_and_associate_floatingip(port['id'])